增加延时队列支持

This commit is contained in:
2024-03-29 14:12:16 +08:00
parent 8f8f0ca83d
commit f6fdbc5380
4 changed files with 103 additions and 5 deletions

View File

@@ -12,6 +12,7 @@ import cn.lihongjie.coal.coalParameterDef.repository.CoalParameterDefRepository;
import cn.lihongjie.coal.common.Ctx;
import cn.lihongjie.coal.common.GroovyScriptUtils;
import cn.lihongjie.coal.organization.service.OrganizationService;
import cn.lihongjie.coal.rabbitmq.RabbitMQConfiguration;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationFeature;
@@ -189,7 +190,7 @@ public class CoalParameterDefService
durable = "true"),
exchange =
@org.springframework.amqp.rabbit.annotation.Exchange(
value = "sysExchange", declare = Exchange.FALSE),
value = RabbitMQConfiguration.SYS_EXCHANGE, declare = Exchange.FALSE),
key = "organization.create")
})
public void onOrganizationCreate(String orgId) {

View File

@@ -1,7 +1,9 @@
package cn.lihongjie.coal.rabbitmq;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.core.*;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.boot.convert.DurationStyle;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@ -12,9 +14,61 @@ import java.util.*;
public class RabbitMQConfiguration {
public static final String SYS_EXCHANGE = "sysExchange";
public static final String SYS_DELAY_EXCHANGE = "sysDelayExchange";
public static final String DELAY_QUEUE = "delayQueue";
public static final String[] DELAY_QUEUES =
new String[] {
"1s", "5s", "10s", "30s", "1m", "2m", "3m", "4m", "5m", "10m", "15m", "20m", "25m",
"30m", "35m", "40m", "45m", "50m", "55m", "1h", "2h", "3h", "4h", "5h", "6h", "12h",
"1d"
};
public static final Long[] DELAY_QUEUES_TIME = Arrays.stream(DELAY_QUEUES).map(x -> DurationStyle.detectAndParse(x).toMillis()).toArray(Long[]::new);
@Bean
public TopicExchange topicExchange() {
return new TopicExchange("sysExchange", true, false);
TopicExchange sysExchange = new TopicExchange(SYS_EXCHANGE, true, false, new HashMap<>());
return sysExchange;
}
@Bean
public HeadersExchange delayExchange() {
HashMap<String, Object> arguments = new HashMap<>();
HeadersExchange sysDelayExchange =
new HeadersExchange(SYS_DELAY_EXCHANGE, true, false, arguments);
return sysDelayExchange;
}
@Bean
public Declarables delayQueues() {
List<Declarable> declarables = new ArrayList<>();
for (String delayQueue : DELAY_QUEUES) {
HashMap<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", SYS_EXCHANGE);
arguments.put("x-message-ttl", DurationStyle.detectAndParse(delayQueue).toMillis());
declarables.add(new Queue("delayQueue." + delayQueue, true, false, false, arguments));
HashMap<String, Object> bindArgs = new HashMap<>();
bindArgs.put("x-match", "all");
bindArgs.put("x-delay-queue-name", delayQueue);
declarables.add(
new Binding(
"delayQueue." + delayQueue,
Binding.DestinationType.QUEUE,
SYS_DELAY_EXCHANGE,
null,
bindArgs));
}
return new Declarables(declarables);
}
}

View File

@@ -9,12 +9,43 @@ public class RabbitMQService {
@Autowired RabbitTemplate rabbitTemplate;
public void send(String exchange, String routingKey, String data){
public void send(String exchange, String routingKey, String data) {
rabbitTemplate.convertAndSend(exchange, routingKey, data);
}
public void sendToSysDelayExchange(String routingKey, String data, long delay) {
send(RabbitMQConfiguration.SYS_DELAY_EXCHANGE, routingKey, data, delay);
}
public void send(String exchange, String routingKey, String data, long delay) {
rabbitTemplate.convertAndSend(
exchange,
routingKey,
data,
message -> {
message.getMessageProperties()
.getHeaders()
.put("x-delay-queue-name", getBestDelayQueueName(delay));
return message;
});
}
private String getBestDelayQueueName(long delay) {
int length = RabbitMQConfiguration.DELAY_QUEUES_TIME.length;
for (int i = 0; i < length; i++) {
if (delay < (RabbitMQConfiguration.DELAY_QUEUES_TIME[i])) {
return RabbitMQConfiguration.DELAY_QUEUES[i == 0 ? 0 : i - 1];
}
}
throw new IllegalArgumentException("delay is too long: " + delay);
}
public void sendToSysExchange(String routingKey, String data) {
send("sysExchange", routingKey, data);
send(RabbitMQConfiguration.SYS_EXCHANGE, routingKey, data);
}
}

View File

@@ -5,6 +5,7 @@ import cn.lihongjie.coal.annotation.SysLog;
import cn.lihongjie.coal.base.controller.BaseController;
import cn.lihongjie.coal.base.dto.IdRequest;
import cn.lihongjie.coal.common.Ctx;
import cn.lihongjie.coal.rabbitmq.RabbitMQService;
import cn.lihongjie.coal.user.dto.UserDto;
import cn.lihongjie.coal.user.service.UserService;
@@ -22,15 +23,26 @@ public class LoginController extends BaseController {
@Autowired UserService userService;
@Autowired RabbitMQService rabbitMQService;
@PostMapping("/login")
@SysLog(action = "登录")
@Anonymous
public UserDto login(@RequestBody LoginDto dto) {
this.service.login(dto);
return userService.getById(Ctx.getUserId());
}
@PostMapping("/genCaptcha")
@Anonymous
public CaptchaDto genCaptcha() {