From 9201cec965e605c923829f75c06bc6279ac2360c Mon Sep 17 00:00:00 2001 From: lihongjie0209 Date: Sat, 6 Apr 2024 10:52:52 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E5=96=84=E8=BD=A6=E8=BE=86=E9=A2=84?= =?UTF-8?q?=E7=BA=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../entity/AcAppointmentEntity.java | 13 +- .../entity/AcAppointmentViewEntity.java | 114 ++++++++++++++++ .../listener/AcAppointmentListener.java | 73 +++++------ .../mapper/AcAppointmentMapper.java | 7 +- .../AcAppointmentViewRepository.java | 9 ++ .../service/AcAppointmentService.java | 123 ++++++++++-------- .../cn/lihongjie/coal/aop/OrgScopeAop.java | 8 -- .../listener/DataCollectorListener.java | 10 +- .../service/DataCollectorService.java | 7 +- .../coal/rabbitmq/RabbitMQConfiguration.java | 50 +++++-- .../coal/rabbitmq/RabbitMQService.java | 92 ++++++++++--- src/main/resources/logback-spring.xml | 3 +- 12 files changed, 352 insertions(+), 157 deletions(-) create mode 100644 src/main/java/cn/lihongjie/coal/acAppointment/entity/AcAppointmentViewEntity.java create mode 100644 src/main/java/cn/lihongjie/coal/acAppointment/repository/AcAppointmentViewRepository.java diff --git a/src/main/java/cn/lihongjie/coal/acAppointment/entity/AcAppointmentEntity.java b/src/main/java/cn/lihongjie/coal/acAppointment/entity/AcAppointmentEntity.java index ed298e82..ede474e2 100644 --- a/src/main/java/cn/lihongjie/coal/acAppointment/entity/AcAppointmentEntity.java +++ b/src/main/java/cn/lihongjie/coal/acAppointment/entity/AcAppointmentEntity.java @@ -3,8 +3,6 @@ package cn.lihongjie.coal.acAppointment.entity; import cn.lihongjie.coal.acDevice.entity.AcDeviceEntity; import cn.lihongjie.coal.base.entity.OrgCommonEntity; -import io.hypersistence.utils.hibernate.type.array.ListArrayType; - import jakarta.persistence.Entity; import jakarta.persistence.ManyToOne; @@ -13,9 +11,6 @@ import lombok.Data; import org.hibernate.annotations.ColumnDefault; import org.hibernate.annotations.Comment; import org.hibernate.annotations.Formula; -import org.hibernate.annotations.Type; - -import java.util.List; @Data @Entity @@ -63,10 +58,10 @@ public class AcAppointmentEntity extends OrgCommonEntity { private java.time.LocalDateTime sendFinishTime; - - @Formula("(select array_agg(d.id) from t_ac_device_data d where d.plate_no = plate_no and d.pass_time between start_time and end_time)") - @Type(ListArrayType.class) - private List acDeviceDataIds; +// +// @Formula("(select array_agg(d.id) from t_ac_device_data d where d.plate_no = plate_no and d.pass_time between start_time and end_time)") +// @Type(ListArrayType.class) +// private List acDeviceDataIds; diff --git a/src/main/java/cn/lihongjie/coal/acAppointment/entity/AcAppointmentViewEntity.java b/src/main/java/cn/lihongjie/coal/acAppointment/entity/AcAppointmentViewEntity.java new file mode 100644 index 00000000..9b0c9cc8 --- /dev/null +++ b/src/main/java/cn/lihongjie/coal/acAppointment/entity/AcAppointmentViewEntity.java @@ -0,0 +1,114 @@ +package cn.lihongjie.coal.acAppointment.entity; + +import cn.lihongjie.coal.acDevice.entity.AcDeviceEntity; +import cn.lihongjie.coal.base.entity.OrgCommonEntity; + +import io.hypersistence.utils.hibernate.type.array.ListArrayType; + +import jakarta.persistence.Entity; +import jakarta.persistence.ManyToOne; + +import lombok.Data; + +import org.hibernate.annotations.*; + +import java.util.List; + +@Data +@Entity +@Subselect("select * from t_ac_appointment") +public class AcAppointmentViewEntity extends OrgCommonEntity { + + @ManyToOne + private AcDeviceEntity device; + + @Comment("预约开始时间") + private java.time.LocalDateTime startTime; + + @Comment("预约结束时间") + private java.time.LocalDateTime endTime; + + @Comment("车牌号") + private String plateNo; + + @Comment("姓名") + private String driverName; + + @Comment("联系电话") + private String driverPhone; + + @Comment("事由") + private String reason; + + + + @Comment("下发时间") + private java.time.LocalDateTime sendTime; + + @Comment("下发状态") + private String sendStatus; + + @Formula( + "(select i.name\n" + + "from t_dictionary d,\n" + + " t_dictionary_item i\n" + + "where d.id = i.dictionary_id\n" + + " and d.code = 'common.step.status'\n" + + " and i.code = send_status)") + private String sendStatusName; + + @Comment("下发完成时间") + private java.time.LocalDateTime sendFinishTime; + + + + @Formula("(select array_agg(d.id) from t_ac_device_data d where d.plate_no = plate_no and d.pass_time between start_time and end_time and d.device_id = device_id)") + @Type(ListArrayType.class) + private List acDeviceDataIds; + + + + @Comment("删除时间") + private java.time.LocalDateTime deleteTime; + + + @Comment("下发状态") + private String deleteStatus; + + @Formula( + "(select i.name\n" + + "from t_dictionary d,\n" + + " t_dictionary_item i\n" + + "where d.id = i.dictionary_id\n" + + " and d.code = 'common.step.status'\n" + + " and i.code = delete_status)") + private String deleteStatusName; + + @Comment("删除完成时间") + private java.time.LocalDateTime deleteFinishTime; + + @Comment("预约状态") + private String appointmentStatus; + + @Formula( + "(select i.name\n" + + "from t_dictionary d,\n" + + " t_dictionary_item i\n" + + "where d.id = i.dictionary_id\n" + + " and d.code = 'appointment.status'\n" + + " and i.code = appointment_status)") + private String appointmentStatusName; + + @Comment("归档状态") + @ColumnDefault("'0'") + private String archiveStatus = "0"; + + @Formula( + "(select i.name\n" + + "from t_dictionary d,\n" + + " t_dictionary_item i\n" + + "where d.id = i.dictionary_id\n" + + " and d.code = 'archiveStatus'\n" + + " and i.code = archive_status)") + private String archiveStatusName; +} diff --git a/src/main/java/cn/lihongjie/coal/acAppointment/listener/AcAppointmentListener.java b/src/main/java/cn/lihongjie/coal/acAppointment/listener/AcAppointmentListener.java index 818644f4..20f50de4 100644 --- a/src/main/java/cn/lihongjie/coal/acAppointment/listener/AcAppointmentListener.java +++ b/src/main/java/cn/lihongjie/coal/acAppointment/listener/AcAppointmentListener.java @@ -18,12 +18,11 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.jetbrains.annotations.NotNull; import org.springframework.amqp.core.AmqpAdmin; +import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; -import org.springframework.amqp.support.AmqpHeaders; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.messaging.handler.annotation.Headers; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; @@ -93,7 +92,7 @@ public class AcAppointmentListener { map.put("matchedhandling", "0"); map.put("timestamp", LocalDateTime.now().format(RabbitMQConfiguration.DATE_TIME_FORMATTER)); map.put("reserve1", ""); - map.put("reserve2", appointment.getId()); + map.put("reserve2", ""); map.put("parkpermission", "1,2,3,4,5"); map.put("vehicleinfo_cardtype", "0"); map.put("groupbelonged", "0"); @@ -130,10 +129,12 @@ public class AcAppointmentListener { declare = Exchange.FALSE), key = "acAppointment.*") }) - @Transactional - public void handlePmsMessage(String body, @Headers Map headers) { + @Transactional() + public void handlePmsMessage(Message message) { - var rk = headers.get(AmqpHeaders.RECEIVED_ROUTING_KEY).toString(); + String body = new String(message.getBody()); + + var rk = message.getMessageProperties().getReceivedRoutingKey(); switch (rk) { case "acAppointment.create": @@ -169,8 +170,9 @@ public class AcAppointmentListener { String appointId = node.get("appointId").asText(); String eventType = node.get("eventType").asText(); String status = node.get("status").asText(); - LocalDateTime time = LocalDateTime.parse(node.get("time").asText(), RabbitMQConfiguration.DATE_TIME_FORMATTER); - + LocalDateTime time = + LocalDateTime.parse( + node.get("time").asText(), RabbitMQConfiguration.DATE_TIME_FORMATTER); AcAppointmentEntity appointment; try { @@ -182,60 +184,39 @@ public class AcAppointmentListener { return; } - if (StringUtils.equals(status, "3")){ - log.info("{} fail: {}\n{}", eventType, appointId, node.get("message").asText()); - }else { + if (StringUtils.equals(status, "3")) { + log.info("{} fail: {}\n{}", eventType, appointId, node.get("message").asText()); + } else { - log.info("{} success: {}", eventType, appointId); + log.info("{} success: {}", eventType, appointId); } - - switch (eventType){ - - - case "acAppointment.create" ->{ - - + switch (eventType) { + case "acAppointment.create" -> { appointment.setSendFinishTime(time); appointment.setSendStatus(status); - - - } - - case "acAppointment.update" ->{ - + case "acAppointment.update" -> { appointment.setSendFinishTime(time); appointment.setSendStatus(status); - } - case "acAppointment.delete" ->{ - - + case "acAppointment.delete" -> { log.info("appointment delete: {}", appointId); - } - case "acAppointment.cancel" ->{ - + case "acAppointment.cancel" -> { appointment.setDeleteFinishTime(time); appointment.setDeleteStatus(status); - } - case "acAppointment.expire" ->{ - + case "acAppointment.expire" -> { appointment.setDeleteFinishTime(time); appointment.setDeleteStatus(status); - } - } - this.acAppointmentService.save(appointment); - } @SneakyThrows @@ -274,6 +255,11 @@ public class AcAppointmentListener { String queueName = "pms.client." + appKey; + AcAppointmentEntity app = acAppointmentService.get(id); + app.setDeleteStatus("1"); + app.setDeleteTime(LocalDateTime.now()); + app.setDeleteFinishTime(null); + acAppointmentService.save(app); rabbitMQService.send("", queueName, map, new HashMap<>()); } } @@ -294,6 +280,11 @@ public class AcAppointmentListener { String queueName = "pms.client." + appKey; + AcAppointmentEntity app = acAppointmentService.get(id); + app.setDeleteStatus("1"); + app.setDeleteTime(LocalDateTime.now()); + app.setDeleteFinishTime(null); + acAppointmentService.save(app); rabbitMQService.send("", queueName, map, new HashMap<>()); } } @@ -346,7 +337,9 @@ public class AcAppointmentListener { private void handleCreate(String body) { - AcAppointmentEntity appointment = acAppointmentService.get(body); + AcAppointmentEntity appointment = null; + + appointment = acAppointmentService.get(body); if (StringUtils.isEmpty(appointment.getSendStatus()) || "0".equals(appointment.getSendStatus())) { diff --git a/src/main/java/cn/lihongjie/coal/acAppointment/mapper/AcAppointmentMapper.java b/src/main/java/cn/lihongjie/coal/acAppointment/mapper/AcAppointmentMapper.java index ad6a6d0b..44c606aa 100644 --- a/src/main/java/cn/lihongjie/coal/acAppointment/mapper/AcAppointmentMapper.java +++ b/src/main/java/cn/lihongjie/coal/acAppointment/mapper/AcAppointmentMapper.java @@ -4,6 +4,7 @@ import cn.lihongjie.coal.acAppointment.dto.AcAppointmentDto; import cn.lihongjie.coal.acAppointment.dto.CreateAcAppointmentDto; import cn.lihongjie.coal.acAppointment.dto.UpdateAcAppointmentDto; import cn.lihongjie.coal.acAppointment.entity.AcAppointmentEntity; +import cn.lihongjie.coal.acAppointment.entity.AcAppointmentViewEntity; import cn.lihongjie.coal.base.mapper.BaseMapper; import cn.lihongjie.coal.base.mapper.CommonEntityMapper; import cn.lihongjie.coal.base.mapper.CommonMapper; @@ -20,4 +21,8 @@ public interface AcAppointmentMapper AcAppointmentEntity, AcAppointmentDto, CreateAcAppointmentDto, - UpdateAcAppointmentDto> {} + UpdateAcAppointmentDto> { + + AcAppointmentDto toDto(AcAppointmentViewEntity entity); + +} diff --git a/src/main/java/cn/lihongjie/coal/acAppointment/repository/AcAppointmentViewRepository.java b/src/main/java/cn/lihongjie/coal/acAppointment/repository/AcAppointmentViewRepository.java new file mode 100644 index 00000000..b2f990f2 --- /dev/null +++ b/src/main/java/cn/lihongjie/coal/acAppointment/repository/AcAppointmentViewRepository.java @@ -0,0 +1,9 @@ +package cn.lihongjie.coal.acAppointment.repository; + +import cn.lihongjie.coal.acAppointment.entity.AcAppointmentViewEntity; +import cn.lihongjie.coal.base.dao.BaseRepository; + +import org.springframework.stereotype.Repository; + +@Repository +public interface AcAppointmentViewRepository extends BaseRepository {} diff --git a/src/main/java/cn/lihongjie/coal/acAppointment/service/AcAppointmentService.java b/src/main/java/cn/lihongjie/coal/acAppointment/service/AcAppointmentService.java index eaf846a9..79bdbac9 100644 --- a/src/main/java/cn/lihongjie/coal/acAppointment/service/AcAppointmentService.java +++ b/src/main/java/cn/lihongjie/coal/acAppointment/service/AcAppointmentService.java @@ -5,8 +5,10 @@ import cn.lihongjie.coal.acAppointment.dto.BatchCreateAcAppointmentDto; import cn.lihongjie.coal.acAppointment.dto.CreateAcAppointmentDto; import cn.lihongjie.coal.acAppointment.dto.UpdateAcAppointmentDto; import cn.lihongjie.coal.acAppointment.entity.AcAppointmentEntity; +import cn.lihongjie.coal.acAppointment.entity.AcAppointmentViewEntity; import cn.lihongjie.coal.acAppointment.mapper.AcAppointmentMapper; import cn.lihongjie.coal.acAppointment.repository.AcAppointmentRepository; +import cn.lihongjie.coal.acAppointment.repository.AcAppointmentViewRepository; import cn.lihongjie.coal.base.dto.CommonQuery; import cn.lihongjie.coal.base.dto.IdRequest; import cn.lihongjie.coal.base.service.BaseService; @@ -32,10 +34,7 @@ import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import java.time.LocalDateTime; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.stream.Collectors; @Service @@ -49,6 +48,9 @@ public class AcAppointmentService @Autowired private ConversionService conversionService; @Autowired private DbFunctionService dbFunctionService; + @Autowired + private AcAppointmentViewRepository viewRepository; + public AcAppointmentDto create(CreateAcAppointmentDto request) { AcAppointmentEntity entity = mapper.toEntity(request); @@ -57,7 +59,8 @@ public class AcAppointmentService entity.setDeleteStatus("0"); this.repository.save(entity); - rabbitMQService.sendToSysExchange("acAppointment.create", entity.getId(), new HashMap()); + rabbitMQService.sendToSysExchange( + "acAppointment.create", entity.getId(), new HashMap()); return getById(entity.getId()); } @@ -99,7 +102,8 @@ public class AcAppointmentService this.mapper.updateEntity(entity, request); this.repository.save(entity); - rabbitMQService.sendToSysExchange("acAppointment.update", entity.getId(), new HashMap()); + rabbitMQService.sendToSysExchange( + "acAppointment.update", entity.getId(), new HashMap()); return getById(entity.getId()); } @@ -116,23 +120,24 @@ public class AcAppointmentService List all = this.repository.findAllById(request.getIds()); - List> payload = all.stream() - .map( - x -> { - return Map.of( - "id", - x.getId(), - "appKey", - x.getDevice().getDataCollector().getAppKey()); - }).collect(Collectors.toList()); + List> payload = + all.stream() + .map( + x -> { + return Map.of( + "id", + x.getId(), + "appKey", + x.getDevice().getDataCollector().getAppKey()); + }) + .collect(Collectors.toList()); - - rabbitMQService.sendToSysExchange("acAppointment.delete", payload, new HashMap()); + rabbitMQService.sendToSysExchange( + "acAppointment.delete", payload, new HashMap()); this.repository.deleteAllById(request.getIds()); } - public void cancel(IdRequest request) { ArchiveUtils.checkArchiveStatus( this.repository::findAllById, @@ -145,48 +150,53 @@ public class AcAppointmentService List all = this.repository.findAllById(request.getIds()); - List> payload = all.stream() - .map( - x -> { - return Map.of( - "id", - x.getId(), - "appKey", - x.getDevice().getDataCollector().getAppKey()); - }).collect(Collectors.toList()); + List> payload = + all.stream() + .map( + x -> { + return Map.of( + "id", + x.getId(), + "appKey", + x.getDevice().getDataCollector().getAppKey()); + }) + .collect(Collectors.toList()); all.forEach(x -> x.setAppointmentStatus("1")); all.forEach(x -> x.setArchiveStatus("1")); this.repository.saveAll(all); - rabbitMQService.sendToSysExchange("acAppointment.cancel", payload, new HashMap()); - - - - + rabbitMQService.sendToSysExchange( + "acAppointment.cancel", payload, new HashMap()); } + public void expire() { - public void expire(){ + List all = + this.repository.findAll( + new Specification() { + @Override + public Predicate toPredicate( + Root root, + CriteriaQuery query, + CriteriaBuilder criteriaBuilder) { + return criteriaBuilder.and( + criteriaBuilder.lessThan( + root.get("endTime"), LocalDateTime.now()), + criteriaBuilder.equal(root.get("appointmentStatus"), "0")); + } + }); - - List all = this.repository.findAll(new Specification() { - @Override - public Predicate toPredicate(Root root, CriteriaQuery query, CriteriaBuilder criteriaBuilder) { - return criteriaBuilder.and( - criteriaBuilder.lessThan(root.get("endTime"), LocalDateTime.now()), criteriaBuilder.equal(root.get("appointmentStatus"), "0")) - ; - } - }); - - List> payload = all.stream() - .map( - x -> { - return Map.of( - "id", - x.getId(), - "appKey", - x.getDevice().getDataCollector().getAppKey()); - }).collect(Collectors.toList()); + List> payload = + all.stream() + .map( + x -> { + return Map.of( + "id", + x.getId(), + "appKey", + x.getDevice().getDataCollector().getAppKey()); + }) + .collect(Collectors.toList()); all.forEach(x -> x.setAppointmentStatus("2")); @@ -194,9 +204,8 @@ public class AcAppointmentService this.repository.saveAll(all); - rabbitMQService.sendToSysExchange("acAppointment.expire", payload, new HashMap()); - - + rabbitMQService.sendToSysExchange( + "acAppointment.expire", payload, new HashMap()); } public AcAppointmentDto getById(String id) { @@ -206,8 +215,8 @@ public class AcAppointmentService } public Page list(CommonQuery query) { - Page page = - repository.findAll( + Page page = + viewRepository.findAll( query.specification(conversionService), PageRequest.of( query.getPageNo(), diff --git a/src/main/java/cn/lihongjie/coal/aop/OrgScopeAop.java b/src/main/java/cn/lihongjie/coal/aop/OrgScopeAop.java index 002904a8..e4bc04cd 100644 --- a/src/main/java/cn/lihongjie/coal/aop/OrgScopeAop.java +++ b/src/main/java/cn/lihongjie/coal/aop/OrgScopeAop.java @@ -14,7 +14,6 @@ import org.apache.commons.lang3.StringUtils; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; -import org.aspectj.lang.annotation.Before; import org.aspectj.lang.reflect.MethodSignature; import org.hibernate.Filter; import org.hibernate.Session; @@ -78,12 +77,5 @@ public class OrgScopeAop { return pjp.proceed(); } - @Before(value = "@annotation(org.springframework.transaction.annotation.Transactional))") - public void beforeTransactionMethod() { - if (orgScope.get() != null) { - - Session session = entityManager.unwrap(Session.class); - } - } } diff --git a/src/main/java/cn/lihongjie/coal/dataCollector/listener/DataCollectorListener.java b/src/main/java/cn/lihongjie/coal/dataCollector/listener/DataCollectorListener.java index a0ea7b75..44c301aa 100644 --- a/src/main/java/cn/lihongjie/coal/dataCollector/listener/DataCollectorListener.java +++ b/src/main/java/cn/lihongjie/coal/dataCollector/listener/DataCollectorListener.java @@ -11,12 +11,11 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.codec.digest.HmacAlgorithms; import org.apache.commons.codec.digest.HmacUtils; import org.apache.commons.lang3.StringUtils; +import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; -import org.springframework.amqp.support.AmqpHeaders; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.messaging.handler.annotation.Headers; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; @@ -45,8 +44,11 @@ public class DataCollectorListener { key = "dataCollector.*") }) @Transactional - public void handleDataCollectorMessage(String body, @Headers Map headers) { + public void handleDataCollectorMessage(Message message) { + var body = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8); + var headers = message.getMessageProperties().getHeaders(); + Object key = headers.get("appKey"); if (key == null) { @@ -69,7 +71,7 @@ public class DataCollectorListener { log.setDataCollector(dataCollector); log.setOrganizationId(dataCollector.getOrganizationId()); log.setLogTime(LocalDateTime.now()); - Object rk = headers.get(AmqpHeaders.RECEIVED_ROUTING_KEY); + Object rk = message.getMessageProperties().getReceivedRoutingKey(); switch (rk.toString()) { case "dataCollector.online": diff --git a/src/main/java/cn/lihongjie/coal/dataCollector/service/DataCollectorService.java b/src/main/java/cn/lihongjie/coal/dataCollector/service/DataCollectorService.java index 6e9900a9..a5248fa4 100644 --- a/src/main/java/cn/lihongjie/coal/dataCollector/service/DataCollectorService.java +++ b/src/main/java/cn/lihongjie/coal/dataCollector/service/DataCollectorService.java @@ -15,7 +15,6 @@ import jakarta.annotation.PostConstruct; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.StringUtils; import org.jetbrains.annotations.Nullable; import org.springframework.amqp.core.AmqpAdmin; import org.springframework.amqp.core.Queue; @@ -48,9 +47,9 @@ public class DataCollectorService @PostConstruct public void init() { - findAll().stream() - .filter(x -> StringUtils.isNotEmpty(x.getAppKey())) - .forEach(this::createQueue); +// findAll().stream() +// .filter(x -> StringUtils.isNotEmpty(x.getAppKey())) +// .forEach(this::createQueue); } public DataCollectorDto create(CreateDataCollectorDto request) { diff --git a/src/main/java/cn/lihongjie/coal/rabbitmq/RabbitMQConfiguration.java b/src/main/java/cn/lihongjie/coal/rabbitmq/RabbitMQConfiguration.java index 2e590ae5..0cfc9259 100644 --- a/src/main/java/cn/lihongjie/coal/rabbitmq/RabbitMQConfiguration.java +++ b/src/main/java/cn/lihongjie/coal/rabbitmq/RabbitMQConfiguration.java @@ -8,6 +8,9 @@ import org.springframework.amqp.rabbit.annotation.EnableRabbit; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; +import org.springframework.amqp.support.converter.MessageConversionException; +import org.springframework.amqp.support.converter.MessageConverter; +import org.springframework.amqp.support.converter.SimpleMessageConverter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.amqp.RabbitTemplateCustomizer; import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer; @@ -22,7 +25,6 @@ import java.util.*; @Configuration public class RabbitMQConfiguration { - public static final String SYS_EXCHANGE = "sysExchange"; public static final String SYS_DELAY_EXCHANGE = "sysDelayExchange"; public static final String DELAY_QUEUE = "delayQueue"; @@ -34,21 +36,23 @@ public class RabbitMQConfiguration { "1d" }; - public static final Long[] DELAY_QUEUES_TIME = Arrays.stream(DELAY_QUEUES).map(x -> DurationStyle.detectAndParse(x).toMillis()).toArray(Long[]::new); - public static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + public static final Long[] DELAY_QUEUES_TIME = + Arrays.stream(DELAY_QUEUES) + .map(x -> DurationStyle.detectAndParse(x).toMillis()) + .toArray(Long[]::new); + public static final DateTimeFormatter DATE_TIME_FORMATTER = + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); @Autowired private ObjectMapper objectMapper; - - @Bean public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory( - ConnectionFactory connectionFactory, + ConnectionFactory connectionFactory, SimpleRabbitListenerContainerFactoryConfigurer configurer) { - SimpleRabbitListenerContainerFactory factory = - new SimpleRabbitListenerContainerFactory(); + SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); configurer.configure(factory, connectionFactory); factory.setDefaultRequeueRejected(false); + factory.setMessageConverter(new Jackson2JsonMessageConverter(new ObjectMapper())); return factory; } @@ -56,11 +60,34 @@ public class RabbitMQConfiguration { RabbitTemplateCustomizer rabbitTemplateCustomizer() { return template -> { - template.setMessageConverter(new Jackson2JsonMessageConverter(objectMapper)); + Jackson2JsonMessageConverter messageConverter = + new Jackson2JsonMessageConverter(objectMapper); + + SimpleMessageConverter simpleMessageConverter = new SimpleMessageConverter(); + + template.setMessageConverter( + new MessageConverter() { + @Override + public Message toMessage(Object object, MessageProperties messageProperties) + throws MessageConversionException { + + if (object instanceof String || object instanceof byte[]) { + return simpleMessageConverter.toMessage(object, messageProperties); + } + + + return messageConverter.toMessage(object, messageProperties); + } + + @Override + public Object fromMessage(Message message) + throws MessageConversionException { + return messageConverter.fromMessage(message); + } + }); }; } - @Bean public TopicExchange topicExchange() { TopicExchange sysExchange = new TopicExchange(SYS_EXCHANGE, true, false, new HashMap<>()); @@ -68,7 +95,6 @@ public class RabbitMQConfiguration { return sysExchange; } - @Bean public HeadersExchange delayExchange() { HashMap arguments = new HashMap<>(); @@ -100,8 +126,6 @@ public class RabbitMQConfiguration { bindArgs)); } - - return new Declarables(declarables); } } diff --git a/src/main/java/cn/lihongjie/coal/rabbitmq/RabbitMQService.java b/src/main/java/cn/lihongjie/coal/rabbitmq/RabbitMQService.java index 92ccfbe0..501f487b 100644 --- a/src/main/java/cn/lihongjie/coal/rabbitmq/RabbitMQService.java +++ b/src/main/java/cn/lihongjie/coal/rabbitmq/RabbitMQService.java @@ -1,30 +1,74 @@ package cn.lihongjie.coal.rabbitmq; +import jakarta.persistence.EntityManager; +import jakarta.persistence.PersistenceContext; + +import lombok.extern.slf4j.Slf4j; + import org.apache.commons.collections4.MapUtils; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import org.springframework.transaction.support.TransactionSynchronization; +import org.springframework.transaction.support.TransactionSynchronizationManager; import java.util.HashMap; import java.util.Map; @Service +@Slf4j public class RabbitMQService { @Autowired RabbitTemplate rabbitTemplate; + @PersistenceContext EntityManager em; + + private static void execCommand(Runnable command) { + if (TransactionSynchronizationManager.isActualTransactionActive()) { + + TransactionSynchronizationManager.registerSynchronization( + new TransactionSynchronization() { + + @Override + public void afterCompletion(int status) { + if (status == STATUS_COMMITTED){ + command.run(); + }else { + + log.info("transaction is not committed, skip send message"); + + } + } + }); + } else { + command.run(); + } + } + public void send(String exchange, String routingKey, Object data, Map headers) { - rabbitTemplate.convertAndSend( - exchange, - routingKey, - data, - message -> { - if (MapUtils.isNotEmpty(headers)) - message.getMessageProperties().getHeaders().putAll(headers); - return message; - }); + Runnable command = + () -> { + log.info("send to exchange: {} {}", exchange, routingKey); +// try { +// Thread.sleep(1000); +// } catch (InterruptedException e) { +// throw new RuntimeException(e); +// } + log.info("send to exchange: {} {}", exchange, routingKey); + rabbitTemplate.convertAndSend( + exchange, + routingKey, + data, + message -> { + if (MapUtils.isNotEmpty(headers)) + message.getMessageProperties().getHeaders().putAll(headers); + return message; + }); + }; + + execCommand(command); } public void sendToSysDelayExchange(String routingKey, Object data, long delay) { @@ -33,16 +77,22 @@ public class RabbitMQService { public void send(String exchange, String routingKey, Object data, long delay) { - rabbitTemplate.convertAndSend( - exchange, - routingKey, - data, - message -> { - message.getMessageProperties() - .getHeaders() - .put("x-delay-queue-name", getBestDelayQueueName(delay)); - return message; - }); + Runnable command = + () -> + rabbitTemplate.convertAndSend( + exchange, + routingKey, + data, + message -> { + message.getMessageProperties() + .getHeaders() + .put( + "x-delay-queue-name", + getBestDelayQueueName(delay)); + return message; + }); + + execCommand(command); } private String getBestDelayQueueName(long delay) { @@ -54,7 +104,7 @@ public class RabbitMQService { return RabbitMQConfiguration.DELAY_QUEUES[i == 0 ? 0 : i - 1]; } } - + throw new IllegalArgumentException("delay is too long: " + delay); } @@ -63,6 +113,8 @@ public class RabbitMQService { } public void sendToSysExchange(String routingKey, Object data, Map headers) { + log.info("send to sys exchange: {}", routingKey); + send(RabbitMQConfiguration.SYS_EXCHANGE, routingKey, data, headers); } } diff --git a/src/main/resources/logback-spring.xml b/src/main/resources/logback-spring.xml index 214bc722..fa57e878 100644 --- a/src/main/resources/logback-spring.xml +++ b/src/main/resources/logback-spring.xml @@ -9,7 +9,8 @@ - + +