feat(coal): 添加煤洗日报分析模块的 Server-Sent Events (SSE) 功能

- 新增 SSE 控制器方法和相关服务类
- 实现 SSE 的注册和广播功能
- 修改现有 API,支持 SSE 请求
- 优化过滤器和响应处理,以适应 SSE
This commit is contained in:
2025-02-26 20:47:50 +08:00
parent c052ad24c9
commit 8433c09977
11 changed files with 138 additions and 3 deletions

View File

@@ -10,6 +10,7 @@ import cn.lihongjie.coal.coalWashingDailyAnalysis.dto.CoalWashingDailyAnalysisDt
import cn.lihongjie.coal.coalWashingDailyAnalysis.dto.CreateCoalWashingDailyAnalysisDto;
import cn.lihongjie.coal.coalWashingDailyAnalysis.dto.UpdateCoalWashingDailyAnalysisDto;
import cn.lihongjie.coal.coalWashingDailyAnalysis.service.CoalWashingDailyAnalysisService;
import cn.lihongjie.coal.sse.service.SseService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
@@ -17,6 +18,7 @@ import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
@RequestMapping("/coalWashingDailyAnalysis")
@RestController
@@ -82,10 +84,20 @@ public class CoalWashingDailyAnalysisController extends BaseController {
return this.service.getById(dto.getId());
}
@PostMapping("/report")
public Object report(@RequestBody CoalWashingDailyAnalysisCoalReportRequest request) {
return this.service.report(request);
}
@Autowired SseService sseService;
@PostMapping("/sse")
public SseEmitter sse(@RequestBody IdRequest request) {
SseEmitter emitter = new SseEmitter();
sseService.register("coalWashingDailyAnalysis." + request.getId(), emitter);
return emitter;
}
}

View File

@@ -16,8 +16,10 @@ import cn.lihongjie.coal.common.FreeMakerUtils;
import cn.lihongjie.coal.common.JpaUtils;
import cn.lihongjie.coal.exception.BizException;
import cn.lihongjie.coal.product.service.ProductService;
import cn.lihongjie.coal.sse.service.SseService;
import com.google.common.base.CaseFormat;
import com.google.common.collect.ImmutableMap;
import jakarta.annotation.PostConstruct;
import jakarta.persistence.EntityManager;
@@ -103,6 +105,8 @@ public class CoalWashingDailyAnalysisService
}
}
@Autowired SseService sseService;
public CoalWashingDailyAnalysisDto update(UpdateCoalWashingDailyAnalysisDto request) {
CoalWashingDailyAnalysisEntity entity = this.repository.get(request.getId());
@@ -121,6 +125,8 @@ public class CoalWashingDailyAnalysisService
}
this.repository.save(entity);
sseService.broadcast("coalWashingDailyAnalysis." + entity.getId(), ImmutableMap.of("id", entity.getId(), "event", "update"));
return getById(entity.getId());
}
@@ -131,6 +137,13 @@ public class CoalWashingDailyAnalysisService
}
this.repository.deleteAllById(request.getIds());
for (String id : request.getIds()) {
sseService.broadcast("coalWashingDailyAnalysis." + id, ImmutableMap.of("id", id, "event", "delete"));
}
}
public CoalWashingDailyAnalysisDto getById(String id) {

View File

@@ -95,4 +95,11 @@ public class RequestUtils {
return request.getRequestURI().replace(contextPath1, "/");
}
}
public static boolean isSse(HttpServletRequest request) {
return StringUtils.equalsIgnoreCase(request.getHeader("Accept"), "text/event-stream");
}
}

View File

@@ -146,7 +146,7 @@ public class AuthFilter extends OncePerRequestFilter {
stopWatch.start("hasResource");
boolean hasPermission = userService.hasResource(user, resource.get().getId());
boolean hasPermission = userService.hasResource(user, resource.get().getId()) && !RequestUtils.isSse(request);
stopWatch.stop();

View File

@@ -1,6 +1,7 @@
package cn.lihongjie.coal.filter;
import cn.lihongjie.coal.common.Constants;
import cn.lihongjie.coal.common.RequestUtils;
import jakarta.servlet.FilterChain;
import jakarta.servlet.ServletException;
@@ -28,6 +29,12 @@ public class CacheFilter extends OncePerRequestFilter {
@Override
protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain) throws ServletException, IOException {
if (RequestUtils.isSse(request)){
doFilter(request, response, filterChain);
return;
}
long start = System.currentTimeMillis();
HttpServletRequest req = request;

View File

@@ -53,6 +53,13 @@ public class RateLimitFilter extends OncePerRequestFilter {
HttpServletRequest request, HttpServletResponse response, FilterChain filterChain)
throws ServletException, IOException {
if (RequestUtils.isSse(request)){
doFilter(request, response, filterChain);
return;
}
String sessionId = request.getHeader(Constants.HTTP_HEADER_TOKEN);
String ip = RequestUtils.getIp(request);

View File

@@ -39,6 +39,10 @@ public class ResourceFilter extends OncePerRequestFilter {
@Override
protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain) throws ServletException, IOException {
if (RequestUtils.isSse(request)){
doFilter(request, response, filterChain);
}
Optional<ResourceDto> resource =
resourceService.findUrlFromCache2(RequestUtils.getRequestURI(request, contextPath));

View File

@@ -70,6 +70,11 @@ public class SignFilter extends OncePerRequestFilter {
return;
}
if (RequestUtils.isSse(request)){
doFilter(request, response, filterChain);
return;
}
if (request.getAttribute(Constants.HTTP_ATTR_RESOURCE) != null) {
if (BooleanUtils.isFalse(

View File

@@ -59,6 +59,11 @@ public class SubmitTokenFilter extends OncePerRequestFilter {
return;
}
if (RequestUtils.isSse(request)){
doFilter(request, response, filterChain);
return;
}
if (request.getAttribute(Constants.HTTP_ATTR_RESOURCE) != null) {
Boolean submitToken = ((ResourceDto) request.getAttribute(Constants.HTTP_ATTR_RESOURCE))
.getSubmitToken();

View File

@@ -11,6 +11,7 @@ import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.web.bind.annotation.RestControllerAdvice;
import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyAdvice;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.List;
@@ -37,10 +38,15 @@ public class RResponseBodyAdvice implements ResponseBodyAdvice<Object> {
return body;
}
if (body == null) {
return R.success();
}
if (body instanceof SseEmitter){
return body;
}
if (body instanceof byte[]) {
return body;
}

View File

@@ -0,0 +1,69 @@
package cn.lihongjie.coal.sse.service;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
@Service
@Slf4j
public class SseService {
@Autowired RedissonClient redissonClient;
@Autowired ObjectMapper objectMapper;
public void register(String topicId, SseEmitter emitter) {
RTopic topic = redissonClient.getTopic(topicId);
int addListener =
topic.addListener(
String.class,
(channel, msg) -> {
try {
emitter.send(msg);
log.info("{} send msg to client:{}", topicId, msg);
} catch (IOException e) {
throw new RuntimeException(e);
}
});
emitter.onCompletion(
() -> {
topic.removeListener(addListener);
log.info("{} remove listener:{} onCompletion ", topicId, addListener);
});
emitter.onTimeout(
() -> {
topic.removeListener(addListener);
log.info("{} remove listener:{} onTimeout ", topicId, addListener);
});
emitter.onError(
(e) -> {
topic.removeListener(addListener);
log.info("{} remove listener:{} onError ", topicId, addListener, e);
});
}
@SneakyThrows
public void broadcast(String s, Object event) {
RTopic topic = redissonClient.getTopic(s);
topic.publish(event instanceof String ? event : objectMapper.writeValueAsString(event));
}
}