mirror of
https://codeup.aliyun.com/64f7d6b8ce01efaafef1e678/coal/coal.git
synced 2026-01-25 07:46:40 +08:00
完善车辆预约
This commit is contained in:
@@ -3,8 +3,6 @@ package cn.lihongjie.coal.acAppointment.entity;
|
||||
import cn.lihongjie.coal.acDevice.entity.AcDeviceEntity;
|
||||
import cn.lihongjie.coal.base.entity.OrgCommonEntity;
|
||||
|
||||
import io.hypersistence.utils.hibernate.type.array.ListArrayType;
|
||||
|
||||
import jakarta.persistence.Entity;
|
||||
import jakarta.persistence.ManyToOne;
|
||||
|
||||
@@ -13,9 +11,6 @@ import lombok.Data;
|
||||
import org.hibernate.annotations.ColumnDefault;
|
||||
import org.hibernate.annotations.Comment;
|
||||
import org.hibernate.annotations.Formula;
|
||||
import org.hibernate.annotations.Type;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@Data
|
||||
@Entity
|
||||
@@ -63,10 +58,10 @@ public class AcAppointmentEntity extends OrgCommonEntity {
|
||||
private java.time.LocalDateTime sendFinishTime;
|
||||
|
||||
|
||||
|
||||
@Formula("(select array_agg(d.id) from t_ac_device_data d where d.plate_no = plate_no and d.pass_time between start_time and end_time)")
|
||||
@Type(ListArrayType.class)
|
||||
private List<String> acDeviceDataIds;
|
||||
//
|
||||
// @Formula("(select array_agg(d.id) from t_ac_device_data d where d.plate_no = plate_no and d.pass_time between start_time and end_time)")
|
||||
// @Type(ListArrayType.class)
|
||||
// private List<String> acDeviceDataIds;
|
||||
|
||||
|
||||
|
||||
|
||||
@@ -0,0 +1,114 @@
|
||||
package cn.lihongjie.coal.acAppointment.entity;
|
||||
|
||||
import cn.lihongjie.coal.acDevice.entity.AcDeviceEntity;
|
||||
import cn.lihongjie.coal.base.entity.OrgCommonEntity;
|
||||
|
||||
import io.hypersistence.utils.hibernate.type.array.ListArrayType;
|
||||
|
||||
import jakarta.persistence.Entity;
|
||||
import jakarta.persistence.ManyToOne;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
import org.hibernate.annotations.*;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@Data
|
||||
@Entity
|
||||
@Subselect("select * from t_ac_appointment")
|
||||
public class AcAppointmentViewEntity extends OrgCommonEntity {
|
||||
|
||||
@ManyToOne
|
||||
private AcDeviceEntity device;
|
||||
|
||||
@Comment("预约开始时间")
|
||||
private java.time.LocalDateTime startTime;
|
||||
|
||||
@Comment("预约结束时间")
|
||||
private java.time.LocalDateTime endTime;
|
||||
|
||||
@Comment("车牌号")
|
||||
private String plateNo;
|
||||
|
||||
@Comment("姓名")
|
||||
private String driverName;
|
||||
|
||||
@Comment("联系电话")
|
||||
private String driverPhone;
|
||||
|
||||
@Comment("事由")
|
||||
private String reason;
|
||||
|
||||
|
||||
|
||||
@Comment("下发时间")
|
||||
private java.time.LocalDateTime sendTime;
|
||||
|
||||
@Comment("下发状态")
|
||||
private String sendStatus;
|
||||
|
||||
@Formula(
|
||||
"(select i.name\n"
|
||||
+ "from t_dictionary d,\n"
|
||||
+ " t_dictionary_item i\n"
|
||||
+ "where d.id = i.dictionary_id\n"
|
||||
+ " and d.code = 'common.step.status'\n"
|
||||
+ " and i.code = send_status)")
|
||||
private String sendStatusName;
|
||||
|
||||
@Comment("下发完成时间")
|
||||
private java.time.LocalDateTime sendFinishTime;
|
||||
|
||||
|
||||
|
||||
@Formula("(select array_agg(d.id) from t_ac_device_data d where d.plate_no = plate_no and d.pass_time between start_time and end_time and d.device_id = device_id)")
|
||||
@Type(ListArrayType.class)
|
||||
private List<String> acDeviceDataIds;
|
||||
|
||||
|
||||
|
||||
@Comment("删除时间")
|
||||
private java.time.LocalDateTime deleteTime;
|
||||
|
||||
|
||||
@Comment("下发状态")
|
||||
private String deleteStatus;
|
||||
|
||||
@Formula(
|
||||
"(select i.name\n"
|
||||
+ "from t_dictionary d,\n"
|
||||
+ " t_dictionary_item i\n"
|
||||
+ "where d.id = i.dictionary_id\n"
|
||||
+ " and d.code = 'common.step.status'\n"
|
||||
+ " and i.code = delete_status)")
|
||||
private String deleteStatusName;
|
||||
|
||||
@Comment("删除完成时间")
|
||||
private java.time.LocalDateTime deleteFinishTime;
|
||||
|
||||
@Comment("预约状态")
|
||||
private String appointmentStatus;
|
||||
|
||||
@Formula(
|
||||
"(select i.name\n"
|
||||
+ "from t_dictionary d,\n"
|
||||
+ " t_dictionary_item i\n"
|
||||
+ "where d.id = i.dictionary_id\n"
|
||||
+ " and d.code = 'appointment.status'\n"
|
||||
+ " and i.code = appointment_status)")
|
||||
private String appointmentStatusName;
|
||||
|
||||
@Comment("归档状态")
|
||||
@ColumnDefault("'0'")
|
||||
private String archiveStatus = "0";
|
||||
|
||||
@Formula(
|
||||
"(select i.name\n"
|
||||
+ "from t_dictionary d,\n"
|
||||
+ " t_dictionary_item i\n"
|
||||
+ "where d.id = i.dictionary_id\n"
|
||||
+ " and d.code = 'archiveStatus'\n"
|
||||
+ " and i.code = archive_status)")
|
||||
private String archiveStatusName;
|
||||
}
|
||||
@@ -18,12 +18,11 @@ import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.springframework.amqp.core.AmqpAdmin;
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.amqp.rabbit.annotation.Exchange;
|
||||
import org.springframework.amqp.rabbit.annotation.QueueBinding;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
import org.springframework.amqp.support.AmqpHeaders;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.messaging.handler.annotation.Headers;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
@@ -93,7 +92,7 @@ public class AcAppointmentListener {
|
||||
map.put("matchedhandling", "0");
|
||||
map.put("timestamp", LocalDateTime.now().format(RabbitMQConfiguration.DATE_TIME_FORMATTER));
|
||||
map.put("reserve1", "");
|
||||
map.put("reserve2", appointment.getId());
|
||||
map.put("reserve2", "");
|
||||
map.put("parkpermission", "1,2,3,4,5");
|
||||
map.put("vehicleinfo_cardtype", "0");
|
||||
map.put("groupbelonged", "0");
|
||||
@@ -130,10 +129,12 @@ public class AcAppointmentListener {
|
||||
declare = Exchange.FALSE),
|
||||
key = "acAppointment.*")
|
||||
})
|
||||
@Transactional
|
||||
public void handlePmsMessage(String body, @Headers Map<String, Object> headers) {
|
||||
@Transactional()
|
||||
public void handlePmsMessage(Message message) {
|
||||
|
||||
var rk = headers.get(AmqpHeaders.RECEIVED_ROUTING_KEY).toString();
|
||||
String body = new String(message.getBody());
|
||||
|
||||
var rk = message.getMessageProperties().getReceivedRoutingKey();
|
||||
|
||||
switch (rk) {
|
||||
case "acAppointment.create":
|
||||
@@ -169,8 +170,9 @@ public class AcAppointmentListener {
|
||||
String appointId = node.get("appointId").asText();
|
||||
String eventType = node.get("eventType").asText();
|
||||
String status = node.get("status").asText();
|
||||
LocalDateTime time = LocalDateTime.parse(node.get("time").asText(), RabbitMQConfiguration.DATE_TIME_FORMATTER);
|
||||
|
||||
LocalDateTime time =
|
||||
LocalDateTime.parse(
|
||||
node.get("time").asText(), RabbitMQConfiguration.DATE_TIME_FORMATTER);
|
||||
|
||||
AcAppointmentEntity appointment;
|
||||
try {
|
||||
@@ -182,60 +184,39 @@ public class AcAppointmentListener {
|
||||
return;
|
||||
}
|
||||
|
||||
if (StringUtils.equals(status, "3")){
|
||||
log.info("{} fail: {}\n{}", eventType, appointId, node.get("message").asText());
|
||||
}else {
|
||||
if (StringUtils.equals(status, "3")) {
|
||||
log.info("{} fail: {}\n{}", eventType, appointId, node.get("message").asText());
|
||||
} else {
|
||||
|
||||
log.info("{} success: {}", eventType, appointId);
|
||||
log.info("{} success: {}", eventType, appointId);
|
||||
}
|
||||
|
||||
|
||||
switch (eventType){
|
||||
|
||||
|
||||
case "acAppointment.create" ->{
|
||||
|
||||
|
||||
switch (eventType) {
|
||||
case "acAppointment.create" -> {
|
||||
appointment.setSendFinishTime(time);
|
||||
appointment.setSendStatus(status);
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
||||
case "acAppointment.update" ->{
|
||||
|
||||
case "acAppointment.update" -> {
|
||||
appointment.setSendFinishTime(time);
|
||||
appointment.setSendStatus(status);
|
||||
|
||||
}
|
||||
|
||||
case "acAppointment.delete" ->{
|
||||
|
||||
|
||||
case "acAppointment.delete" -> {
|
||||
log.info("appointment delete: {}", appointId);
|
||||
|
||||
}
|
||||
|
||||
case "acAppointment.cancel" ->{
|
||||
|
||||
case "acAppointment.cancel" -> {
|
||||
appointment.setDeleteFinishTime(time);
|
||||
appointment.setDeleteStatus(status);
|
||||
|
||||
}
|
||||
case "acAppointment.expire" ->{
|
||||
|
||||
case "acAppointment.expire" -> {
|
||||
appointment.setDeleteFinishTime(time);
|
||||
appointment.setDeleteStatus(status);
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
this.acAppointmentService.save(appointment);
|
||||
|
||||
}
|
||||
|
||||
@SneakyThrows
|
||||
@@ -274,6 +255,11 @@ public class AcAppointmentListener {
|
||||
|
||||
String queueName = "pms.client." + appKey;
|
||||
|
||||
AcAppointmentEntity app = acAppointmentService.get(id);
|
||||
app.setDeleteStatus("1");
|
||||
app.setDeleteTime(LocalDateTime.now());
|
||||
app.setDeleteFinishTime(null);
|
||||
acAppointmentService.save(app);
|
||||
rabbitMQService.send("", queueName, map, new HashMap<>());
|
||||
}
|
||||
}
|
||||
@@ -294,6 +280,11 @@ public class AcAppointmentListener {
|
||||
|
||||
String queueName = "pms.client." + appKey;
|
||||
|
||||
AcAppointmentEntity app = acAppointmentService.get(id);
|
||||
app.setDeleteStatus("1");
|
||||
app.setDeleteTime(LocalDateTime.now());
|
||||
app.setDeleteFinishTime(null);
|
||||
acAppointmentService.save(app);
|
||||
rabbitMQService.send("", queueName, map, new HashMap<>());
|
||||
}
|
||||
}
|
||||
@@ -346,7 +337,9 @@ public class AcAppointmentListener {
|
||||
|
||||
private void handleCreate(String body) {
|
||||
|
||||
AcAppointmentEntity appointment = acAppointmentService.get(body);
|
||||
AcAppointmentEntity appointment = null;
|
||||
|
||||
appointment = acAppointmentService.get(body);
|
||||
|
||||
if (StringUtils.isEmpty(appointment.getSendStatus())
|
||||
|| "0".equals(appointment.getSendStatus())) {
|
||||
|
||||
@@ -4,6 +4,7 @@ import cn.lihongjie.coal.acAppointment.dto.AcAppointmentDto;
|
||||
import cn.lihongjie.coal.acAppointment.dto.CreateAcAppointmentDto;
|
||||
import cn.lihongjie.coal.acAppointment.dto.UpdateAcAppointmentDto;
|
||||
import cn.lihongjie.coal.acAppointment.entity.AcAppointmentEntity;
|
||||
import cn.lihongjie.coal.acAppointment.entity.AcAppointmentViewEntity;
|
||||
import cn.lihongjie.coal.base.mapper.BaseMapper;
|
||||
import cn.lihongjie.coal.base.mapper.CommonEntityMapper;
|
||||
import cn.lihongjie.coal.base.mapper.CommonMapper;
|
||||
@@ -20,4 +21,8 @@ public interface AcAppointmentMapper
|
||||
AcAppointmentEntity,
|
||||
AcAppointmentDto,
|
||||
CreateAcAppointmentDto,
|
||||
UpdateAcAppointmentDto> {}
|
||||
UpdateAcAppointmentDto> {
|
||||
|
||||
AcAppointmentDto toDto(AcAppointmentViewEntity entity);
|
||||
|
||||
}
|
||||
|
||||
@@ -0,0 +1,9 @@
|
||||
package cn.lihongjie.coal.acAppointment.repository;
|
||||
|
||||
import cn.lihongjie.coal.acAppointment.entity.AcAppointmentViewEntity;
|
||||
import cn.lihongjie.coal.base.dao.BaseRepository;
|
||||
|
||||
import org.springframework.stereotype.Repository;
|
||||
|
||||
@Repository
|
||||
public interface AcAppointmentViewRepository extends BaseRepository<AcAppointmentViewEntity> {}
|
||||
@@ -5,8 +5,10 @@ import cn.lihongjie.coal.acAppointment.dto.BatchCreateAcAppointmentDto;
|
||||
import cn.lihongjie.coal.acAppointment.dto.CreateAcAppointmentDto;
|
||||
import cn.lihongjie.coal.acAppointment.dto.UpdateAcAppointmentDto;
|
||||
import cn.lihongjie.coal.acAppointment.entity.AcAppointmentEntity;
|
||||
import cn.lihongjie.coal.acAppointment.entity.AcAppointmentViewEntity;
|
||||
import cn.lihongjie.coal.acAppointment.mapper.AcAppointmentMapper;
|
||||
import cn.lihongjie.coal.acAppointment.repository.AcAppointmentRepository;
|
||||
import cn.lihongjie.coal.acAppointment.repository.AcAppointmentViewRepository;
|
||||
import cn.lihongjie.coal.base.dto.CommonQuery;
|
||||
import cn.lihongjie.coal.base.dto.IdRequest;
|
||||
import cn.lihongjie.coal.base.service.BaseService;
|
||||
@@ -32,10 +34,7 @@ import org.springframework.stereotype.Service;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@Service
|
||||
@@ -49,6 +48,9 @@ public class AcAppointmentService
|
||||
@Autowired private ConversionService conversionService;
|
||||
@Autowired private DbFunctionService dbFunctionService;
|
||||
|
||||
@Autowired
|
||||
private AcAppointmentViewRepository viewRepository;
|
||||
|
||||
public AcAppointmentDto create(CreateAcAppointmentDto request) {
|
||||
|
||||
AcAppointmentEntity entity = mapper.toEntity(request);
|
||||
@@ -57,7 +59,8 @@ public class AcAppointmentService
|
||||
entity.setDeleteStatus("0");
|
||||
this.repository.save(entity);
|
||||
|
||||
rabbitMQService.sendToSysExchange("acAppointment.create", entity.getId(), new HashMap<String, String>());
|
||||
rabbitMQService.sendToSysExchange(
|
||||
"acAppointment.create", entity.getId(), new HashMap<String, String>());
|
||||
|
||||
return getById(entity.getId());
|
||||
}
|
||||
@@ -99,7 +102,8 @@ public class AcAppointmentService
|
||||
this.mapper.updateEntity(entity, request);
|
||||
|
||||
this.repository.save(entity);
|
||||
rabbitMQService.sendToSysExchange("acAppointment.update", entity.getId(), new HashMap<String, String>());
|
||||
rabbitMQService.sendToSysExchange(
|
||||
"acAppointment.update", entity.getId(), new HashMap<String, String>());
|
||||
|
||||
return getById(entity.getId());
|
||||
}
|
||||
@@ -116,23 +120,24 @@ public class AcAppointmentService
|
||||
|
||||
List<AcAppointmentEntity> all = this.repository.findAllById(request.getIds());
|
||||
|
||||
List<Map<String, String>> payload = all.stream()
|
||||
.map(
|
||||
x -> {
|
||||
return Map.of(
|
||||
"id",
|
||||
x.getId(),
|
||||
"appKey",
|
||||
x.getDevice().getDataCollector().getAppKey());
|
||||
}).collect(Collectors.toList());
|
||||
List<Map<String, String>> payload =
|
||||
all.stream()
|
||||
.map(
|
||||
x -> {
|
||||
return Map.of(
|
||||
"id",
|
||||
x.getId(),
|
||||
"appKey",
|
||||
x.getDevice().getDataCollector().getAppKey());
|
||||
})
|
||||
.collect(Collectors.toList());
|
||||
|
||||
|
||||
rabbitMQService.sendToSysExchange("acAppointment.delete", payload, new HashMap<String, String>());
|
||||
rabbitMQService.sendToSysExchange(
|
||||
"acAppointment.delete", payload, new HashMap<String, String>());
|
||||
|
||||
this.repository.deleteAllById(request.getIds());
|
||||
}
|
||||
|
||||
|
||||
public void cancel(IdRequest request) {
|
||||
ArchiveUtils.checkArchiveStatus(
|
||||
this.repository::findAllById,
|
||||
@@ -145,48 +150,53 @@ public class AcAppointmentService
|
||||
|
||||
List<AcAppointmentEntity> all = this.repository.findAllById(request.getIds());
|
||||
|
||||
List<Map<String, String>> payload = all.stream()
|
||||
.map(
|
||||
x -> {
|
||||
return Map.of(
|
||||
"id",
|
||||
x.getId(),
|
||||
"appKey",
|
||||
x.getDevice().getDataCollector().getAppKey());
|
||||
}).collect(Collectors.toList());
|
||||
List<Map<String, String>> payload =
|
||||
all.stream()
|
||||
.map(
|
||||
x -> {
|
||||
return Map.of(
|
||||
"id",
|
||||
x.getId(),
|
||||
"appKey",
|
||||
x.getDevice().getDataCollector().getAppKey());
|
||||
})
|
||||
.collect(Collectors.toList());
|
||||
|
||||
all.forEach(x -> x.setAppointmentStatus("1"));
|
||||
all.forEach(x -> x.setArchiveStatus("1"));
|
||||
this.repository.saveAll(all);
|
||||
rabbitMQService.sendToSysExchange("acAppointment.cancel", payload, new HashMap<String, String>());
|
||||
|
||||
|
||||
|
||||
|
||||
rabbitMQService.sendToSysExchange(
|
||||
"acAppointment.cancel", payload, new HashMap<String, String>());
|
||||
}
|
||||
|
||||
public void expire() {
|
||||
|
||||
public void expire(){
|
||||
List<AcAppointmentEntity> all =
|
||||
this.repository.findAll(
|
||||
new Specification<AcAppointmentEntity>() {
|
||||
@Override
|
||||
public Predicate toPredicate(
|
||||
Root<AcAppointmentEntity> root,
|
||||
CriteriaQuery<?> query,
|
||||
CriteriaBuilder criteriaBuilder) {
|
||||
return criteriaBuilder.and(
|
||||
criteriaBuilder.lessThan(
|
||||
root.get("endTime"), LocalDateTime.now()),
|
||||
criteriaBuilder.equal(root.get("appointmentStatus"), "0"));
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
List<AcAppointmentEntity> all = this.repository.findAll(new Specification<AcAppointmentEntity>() {
|
||||
@Override
|
||||
public Predicate toPredicate(Root<AcAppointmentEntity> root, CriteriaQuery<?> query, CriteriaBuilder criteriaBuilder) {
|
||||
return criteriaBuilder.and(
|
||||
criteriaBuilder.lessThan(root.get("endTime"), LocalDateTime.now()), criteriaBuilder.equal(root.get("appointmentStatus"), "0"))
|
||||
;
|
||||
}
|
||||
});
|
||||
|
||||
List<Map<String, String>> payload = all.stream()
|
||||
.map(
|
||||
x -> {
|
||||
return Map.of(
|
||||
"id",
|
||||
x.getId(),
|
||||
"appKey",
|
||||
x.getDevice().getDataCollector().getAppKey());
|
||||
}).collect(Collectors.toList());
|
||||
List<Map<String, String>> payload =
|
||||
all.stream()
|
||||
.map(
|
||||
x -> {
|
||||
return Map.of(
|
||||
"id",
|
||||
x.getId(),
|
||||
"appKey",
|
||||
x.getDevice().getDataCollector().getAppKey());
|
||||
})
|
||||
.collect(Collectors.toList());
|
||||
|
||||
all.forEach(x -> x.setAppointmentStatus("2"));
|
||||
|
||||
@@ -194,9 +204,8 @@ public class AcAppointmentService
|
||||
|
||||
this.repository.saveAll(all);
|
||||
|
||||
rabbitMQService.sendToSysExchange("acAppointment.expire", payload, new HashMap<String, String>());
|
||||
|
||||
|
||||
rabbitMQService.sendToSysExchange(
|
||||
"acAppointment.expire", payload, new HashMap<String, String>());
|
||||
}
|
||||
|
||||
public AcAppointmentDto getById(String id) {
|
||||
@@ -206,8 +215,8 @@ public class AcAppointmentService
|
||||
}
|
||||
|
||||
public Page<AcAppointmentDto> list(CommonQuery query) {
|
||||
Page<AcAppointmentEntity> page =
|
||||
repository.findAll(
|
||||
Page<AcAppointmentViewEntity> page =
|
||||
viewRepository.findAll(
|
||||
query.specification(conversionService),
|
||||
PageRequest.of(
|
||||
query.getPageNo(),
|
||||
|
||||
@@ -14,7 +14,6 @@ import org.apache.commons.lang3.StringUtils;
|
||||
import org.aspectj.lang.ProceedingJoinPoint;
|
||||
import org.aspectj.lang.annotation.Around;
|
||||
import org.aspectj.lang.annotation.Aspect;
|
||||
import org.aspectj.lang.annotation.Before;
|
||||
import org.aspectj.lang.reflect.MethodSignature;
|
||||
import org.hibernate.Filter;
|
||||
import org.hibernate.Session;
|
||||
@@ -78,12 +77,5 @@ public class OrgScopeAop {
|
||||
return pjp.proceed();
|
||||
}
|
||||
|
||||
@Before(value = "@annotation(org.springframework.transaction.annotation.Transactional))")
|
||||
public void beforeTransactionMethod() {
|
||||
|
||||
if (orgScope.get() != null) {
|
||||
|
||||
Session session = entityManager.unwrap(Session.class);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -11,12 +11,11 @@ import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.codec.digest.HmacAlgorithms;
|
||||
import org.apache.commons.codec.digest.HmacUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.amqp.rabbit.annotation.Exchange;
|
||||
import org.springframework.amqp.rabbit.annotation.QueueBinding;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
import org.springframework.amqp.support.AmqpHeaders;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.messaging.handler.annotation.Headers;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
@@ -45,8 +44,11 @@ public class DataCollectorListener {
|
||||
key = "dataCollector.*")
|
||||
})
|
||||
@Transactional
|
||||
public void handleDataCollectorMessage(String body, @Headers Map<String, Object> headers) {
|
||||
public void handleDataCollectorMessage(Message message) {
|
||||
|
||||
var body = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
|
||||
var headers = message.getMessageProperties().getHeaders();
|
||||
|
||||
Object key = headers.get("appKey");
|
||||
|
||||
if (key == null) {
|
||||
@@ -69,7 +71,7 @@ public class DataCollectorListener {
|
||||
log.setDataCollector(dataCollector);
|
||||
log.setOrganizationId(dataCollector.getOrganizationId());
|
||||
log.setLogTime(LocalDateTime.now());
|
||||
Object rk = headers.get(AmqpHeaders.RECEIVED_ROUTING_KEY);
|
||||
Object rk = message.getMessageProperties().getReceivedRoutingKey();
|
||||
|
||||
switch (rk.toString()) {
|
||||
case "dataCollector.online":
|
||||
|
||||
@@ -15,7 +15,6 @@ import jakarta.annotation.PostConstruct;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
import org.springframework.amqp.core.AmqpAdmin;
|
||||
import org.springframework.amqp.core.Queue;
|
||||
@@ -48,9 +47,9 @@ public class DataCollectorService
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
|
||||
findAll().stream()
|
||||
.filter(x -> StringUtils.isNotEmpty(x.getAppKey()))
|
||||
.forEach(this::createQueue);
|
||||
// findAll().stream()
|
||||
// .filter(x -> StringUtils.isNotEmpty(x.getAppKey()))
|
||||
// .forEach(this::createQueue);
|
||||
}
|
||||
|
||||
public DataCollectorDto create(CreateDataCollectorDto request) {
|
||||
|
||||
@@ -8,6 +8,9 @@ import org.springframework.amqp.rabbit.annotation.EnableRabbit;
|
||||
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
|
||||
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
|
||||
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
|
||||
import org.springframework.amqp.support.converter.MessageConversionException;
|
||||
import org.springframework.amqp.support.converter.MessageConverter;
|
||||
import org.springframework.amqp.support.converter.SimpleMessageConverter;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.autoconfigure.amqp.RabbitTemplateCustomizer;
|
||||
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
|
||||
@@ -22,7 +25,6 @@ import java.util.*;
|
||||
@Configuration
|
||||
public class RabbitMQConfiguration {
|
||||
|
||||
|
||||
public static final String SYS_EXCHANGE = "sysExchange";
|
||||
public static final String SYS_DELAY_EXCHANGE = "sysDelayExchange";
|
||||
public static final String DELAY_QUEUE = "delayQueue";
|
||||
@@ -34,21 +36,23 @@ public class RabbitMQConfiguration {
|
||||
"1d"
|
||||
};
|
||||
|
||||
public static final Long[] DELAY_QUEUES_TIME = Arrays.stream(DELAY_QUEUES).map(x -> DurationStyle.detectAndParse(x).toMillis()).toArray(Long[]::new);
|
||||
public static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
|
||||
public static final Long[] DELAY_QUEUES_TIME =
|
||||
Arrays.stream(DELAY_QUEUES)
|
||||
.map(x -> DurationStyle.detectAndParse(x).toMillis())
|
||||
.toArray(Long[]::new);
|
||||
public static final DateTimeFormatter DATE_TIME_FORMATTER =
|
||||
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
|
||||
|
||||
@Autowired private ObjectMapper objectMapper;
|
||||
|
||||
|
||||
|
||||
@Bean
|
||||
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
|
||||
ConnectionFactory connectionFactory,
|
||||
ConnectionFactory connectionFactory,
|
||||
SimpleRabbitListenerContainerFactoryConfigurer configurer) {
|
||||
SimpleRabbitListenerContainerFactory factory =
|
||||
new SimpleRabbitListenerContainerFactory();
|
||||
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
|
||||
configurer.configure(factory, connectionFactory);
|
||||
factory.setDefaultRequeueRejected(false);
|
||||
factory.setMessageConverter(new Jackson2JsonMessageConverter(new ObjectMapper()));
|
||||
return factory;
|
||||
}
|
||||
|
||||
@@ -56,11 +60,34 @@ public class RabbitMQConfiguration {
|
||||
RabbitTemplateCustomizer rabbitTemplateCustomizer() {
|
||||
|
||||
return template -> {
|
||||
template.setMessageConverter(new Jackson2JsonMessageConverter(objectMapper));
|
||||
Jackson2JsonMessageConverter messageConverter =
|
||||
new Jackson2JsonMessageConverter(objectMapper);
|
||||
|
||||
SimpleMessageConverter simpleMessageConverter = new SimpleMessageConverter();
|
||||
|
||||
template.setMessageConverter(
|
||||
new MessageConverter() {
|
||||
@Override
|
||||
public Message toMessage(Object object, MessageProperties messageProperties)
|
||||
throws MessageConversionException {
|
||||
|
||||
if (object instanceof String || object instanceof byte[]) {
|
||||
return simpleMessageConverter.toMessage(object, messageProperties);
|
||||
}
|
||||
|
||||
|
||||
return messageConverter.toMessage(object, messageProperties);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object fromMessage(Message message)
|
||||
throws MessageConversionException {
|
||||
return messageConverter.fromMessage(message);
|
||||
}
|
||||
});
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
@Bean
|
||||
public TopicExchange topicExchange() {
|
||||
TopicExchange sysExchange = new TopicExchange(SYS_EXCHANGE, true, false, new HashMap<>());
|
||||
@@ -68,7 +95,6 @@ public class RabbitMQConfiguration {
|
||||
return sysExchange;
|
||||
}
|
||||
|
||||
|
||||
@Bean
|
||||
public HeadersExchange delayExchange() {
|
||||
HashMap<String, Object> arguments = new HashMap<>();
|
||||
@@ -100,8 +126,6 @@ public class RabbitMQConfiguration {
|
||||
bindArgs));
|
||||
}
|
||||
|
||||
|
||||
|
||||
return new Declarables(declarables);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,30 +1,74 @@
|
||||
package cn.lihongjie.coal.rabbitmq;
|
||||
|
||||
import jakarta.persistence.EntityManager;
|
||||
import jakarta.persistence.PersistenceContext;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import org.apache.commons.collections4.MapUtils;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.transaction.support.TransactionSynchronization;
|
||||
import org.springframework.transaction.support.TransactionSynchronizationManager;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
@Service
|
||||
@Slf4j
|
||||
public class RabbitMQService {
|
||||
|
||||
@Autowired RabbitTemplate rabbitTemplate;
|
||||
|
||||
@PersistenceContext EntityManager em;
|
||||
|
||||
private static void execCommand(Runnable command) {
|
||||
if (TransactionSynchronizationManager.isActualTransactionActive()) {
|
||||
|
||||
TransactionSynchronizationManager.registerSynchronization(
|
||||
new TransactionSynchronization() {
|
||||
|
||||
@Override
|
||||
public void afterCompletion(int status) {
|
||||
if (status == STATUS_COMMITTED){
|
||||
command.run();
|
||||
}else {
|
||||
|
||||
log.info("transaction is not committed, skip send message");
|
||||
|
||||
}
|
||||
}
|
||||
});
|
||||
} else {
|
||||
command.run();
|
||||
}
|
||||
}
|
||||
|
||||
public void send(String exchange, String routingKey, Object data, Map<String, String> headers) {
|
||||
|
||||
rabbitTemplate.convertAndSend(
|
||||
exchange,
|
||||
routingKey,
|
||||
data,
|
||||
message -> {
|
||||
if (MapUtils.isNotEmpty(headers))
|
||||
message.getMessageProperties().getHeaders().putAll(headers);
|
||||
return message;
|
||||
});
|
||||
Runnable command =
|
||||
() -> {
|
||||
log.info("send to exchange: {} {}", exchange, routingKey);
|
||||
// try {
|
||||
// Thread.sleep(1000);
|
||||
// } catch (InterruptedException e) {
|
||||
// throw new RuntimeException(e);
|
||||
// }
|
||||
log.info("send to exchange: {} {}", exchange, routingKey);
|
||||
|
||||
rabbitTemplate.convertAndSend(
|
||||
exchange,
|
||||
routingKey,
|
||||
data,
|
||||
message -> {
|
||||
if (MapUtils.isNotEmpty(headers))
|
||||
message.getMessageProperties().getHeaders().putAll(headers);
|
||||
return message;
|
||||
});
|
||||
};
|
||||
|
||||
execCommand(command);
|
||||
}
|
||||
|
||||
public void sendToSysDelayExchange(String routingKey, Object data, long delay) {
|
||||
@@ -33,16 +77,22 @@ public class RabbitMQService {
|
||||
|
||||
public void send(String exchange, String routingKey, Object data, long delay) {
|
||||
|
||||
rabbitTemplate.convertAndSend(
|
||||
exchange,
|
||||
routingKey,
|
||||
data,
|
||||
message -> {
|
||||
message.getMessageProperties()
|
||||
.getHeaders()
|
||||
.put("x-delay-queue-name", getBestDelayQueueName(delay));
|
||||
return message;
|
||||
});
|
||||
Runnable command =
|
||||
() ->
|
||||
rabbitTemplate.convertAndSend(
|
||||
exchange,
|
||||
routingKey,
|
||||
data,
|
||||
message -> {
|
||||
message.getMessageProperties()
|
||||
.getHeaders()
|
||||
.put(
|
||||
"x-delay-queue-name",
|
||||
getBestDelayQueueName(delay));
|
||||
return message;
|
||||
});
|
||||
|
||||
execCommand(command);
|
||||
}
|
||||
|
||||
private String getBestDelayQueueName(long delay) {
|
||||
@@ -54,7 +104,7 @@ public class RabbitMQService {
|
||||
return RabbitMQConfiguration.DELAY_QUEUES[i == 0 ? 0 : i - 1];
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
throw new IllegalArgumentException("delay is too long: " + delay);
|
||||
}
|
||||
|
||||
@@ -63,6 +113,8 @@ public class RabbitMQService {
|
||||
}
|
||||
|
||||
public void sendToSysExchange(String routingKey, Object data, Map<String, String> headers) {
|
||||
log.info("send to sys exchange: {}", routingKey);
|
||||
|
||||
send(RabbitMQConfiguration.SYS_EXCHANGE, routingKey, data, headers);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -9,7 +9,8 @@
|
||||
<include resource="org/springframework/boot/logging/logback/console-appender.xml"/>
|
||||
<include resource="org/springframework/boot/logging/logback/file-appender.xml"/>
|
||||
<logger name="org.quartz" level="WARN"/>
|
||||
<root level="INFO">
|
||||
|
||||
<root level="info">
|
||||
<appender-ref ref="CONSOLE"/>
|
||||
<appender-ref ref="FILE"/>
|
||||
</root>
|
||||
|
||||
Reference in New Issue
Block a user