From 59e330b6088e4b466d9c6ba2b878b66d29541b55 Mon Sep 17 00:00:00 2001 From: lihongjie0209 Date: Tue, 21 May 2024 22:46:48 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E5=96=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 2 +- .../lihongjie/coal/common/FreeMakerUtils.java | 3 +- .../service/DataCollectorService.java | 116 ++++++---- .../service/WeightDeviceService.java | 15 ++ .../WeightDeviceDataController.java | 11 +- .../service/WeightDeviceDataService.java | 207 +++++++++++++++++- 6 files changed, 297 insertions(+), 57 deletions(-) diff --git a/README.md b/README.md index 1b82ce95..7fd79ccf 100644 --- a/README.md +++ b/README.md @@ -100,7 +100,7 @@ rabbitmq add_vhost /coal docker compose exec -it rabbitmq bash rabbitmqctl add_user datacollector datacollector -rabbitmqctl set_permissions --vhost /coal/test datacollector '' 'sysExchange|amq.default' 'pms\.client\.*|dataCollector\.*' +rabbitmqctl set_permissions --vhost /coal/test datacollector '' 'sysExchange|amq.default' 'pms\.client\.*|dataCollector\.*|weight20\.*' ``` diff --git a/src/main/java/cn/lihongjie/coal/common/FreeMakerUtils.java b/src/main/java/cn/lihongjie/coal/common/FreeMakerUtils.java index 28ae1f9b..9845dc56 100644 --- a/src/main/java/cn/lihongjie/coal/common/FreeMakerUtils.java +++ b/src/main/java/cn/lihongjie/coal/common/FreeMakerUtils.java @@ -16,6 +16,7 @@ import lombok.experimental.UtilityClass; import org.apache.commons.lang3.ObjectUtils; import org.apache.commons.lang3.RegExUtils; import org.apache.commons.lang3.StringUtils; +import org.intellij.lang.annotations.Language; import org.springframework.util.Assert; import org.springframework.util.DigestUtils; @@ -127,7 +128,7 @@ public class FreeMakerUtils { } @SneakyThrows - public static String render( String template, Object model) { + public static String render(@Language("freemarker") String template, Object model) { if (StringUtils.isBlank(template)) { return ""; diff --git a/src/main/java/cn/lihongjie/coal/dataCollector/service/DataCollectorService.java b/src/main/java/cn/lihongjie/coal/dataCollector/service/DataCollectorService.java index 84b82839..d6eb99e8 100644 --- a/src/main/java/cn/lihongjie/coal/dataCollector/service/DataCollectorService.java +++ b/src/main/java/cn/lihongjie/coal/dataCollector/service/DataCollectorService.java @@ -15,11 +15,15 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import jakarta.annotation.PostConstruct; +import jakarta.persistence.EntityManager; +import jakarta.persistence.PersistenceContext; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import org.jetbrains.annotations.Nullable; import org.springframework.amqp.core.AmqpAdmin; +import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; @@ -28,18 +32,25 @@ import org.springframework.core.convert.ConversionService; import org.springframework.data.domain.Page; import org.springframework.data.domain.PageRequest; import org.springframework.data.domain.Sort; +import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import java.util.HashMap; import java.util.Map; import java.util.UUID; +import java.util.concurrent.TimeUnit; @Service @Slf4j @Transactional public class DataCollectorService extends BaseService { + public static final MessagePostProcessor MESSAGE_POST_PROCESSOR = + m -> { + m.getMessageProperties().setExpiration(TimeUnit.SECONDS.toMillis(5) + ""); + return m; + }; @Autowired private DataCollectorRepository repository; @Autowired private DataCollectorMapper mapper; @@ -52,14 +63,7 @@ public class DataCollectorService @Autowired RabbitTemplate rabbitTemplate; @Autowired ObjectMapper objectMapper; - - @PostConstruct - public void init() { - - // findAll().stream() - // .filter(x -> StringUtils.isNotEmpty(x.getAppKey())) - // .forEach(this::createQueue); - } + @PersistenceContext EntityManager em; public DataCollectorDto create(CreateDataCollectorDto request) { DataCollectorEntity entity = mapper.toEntity(request); @@ -72,15 +76,25 @@ public class DataCollectorService return getById(entity.getId()); } + @Autowired private RabbitMQService rabbitMQService; + @PostConstruct + @Async + public void init() { + + findAll().stream() + .filter(x -> StringUtils.isNotEmpty(x.getAppKey())) + .forEach(this::createQueue); + } + public Object getLocalStatus(GetLocalStatusRequest getLocalStatusRequest) { var o = rabbitTemplate.convertSendAndReceive( "dataCollector." + getLocalStatusRequest.appKey(), - Map.of("action", "getLocalStatus") - ); + Map.of("action", "getLocalStatus"), + MESSAGE_POST_PROCESSOR); return o; } @@ -89,56 +103,50 @@ public class DataCollectorService return rabbitTemplate.convertSendAndReceive( "dataCollector." + updateLocalStatus.appKey(), - Map.of("action", "updateLocalStatus", "data", updateLocalStatus.status()) - ); + Map.of("action", "updateLocalStatus", "data", updateLocalStatus.status()), + MESSAGE_POST_PROCESSOR); } public Object deleteLocalStatus(DeleteLocalStatusRequest deleteLocalStatusRequest) { return rabbitTemplate.convertSendAndReceive( - "dataCollector." + deleteLocalStatusRequest.appKey(), Map.of("action", "deleteLocalStatus", "data", deleteLocalStatusRequest.keys()) - ); - + "dataCollector." + deleteLocalStatusRequest.appKey(), + Map.of("action", "deleteLocalStatus", "data", deleteLocalStatusRequest.keys()), + MESSAGE_POST_PROCESSOR); } - public Object getXml(GetXmlRequest request) { return rabbitTemplate.convertSendAndReceive( - "dataCollector." + request.appKey(), Map.of("action", "getXML") - ); - + "dataCollector." + request.appKey(), + Map.of("action", "getXML"), + MESSAGE_POST_PROCESSOR); } - public Object getVersion(GetXmlRequest request) { return rabbitTemplate.convertSendAndReceive( - "dataCollector." + request.appKey(), Map.of("action", "getVersion") - ); - + "dataCollector." + request.appKey(), + Map.of("action", "getVersion"), + MESSAGE_POST_PROCESSOR); } - public Object restart(GetXmlRequest request) { return rabbitTemplate.convertSendAndReceive( - "dataCollector." + request.appKey(), Map.of("action", "restart") - ); - + "dataCollector." + request.appKey(), + Map.of("action", "restart"), + MESSAGE_POST_PROCESSOR); } public Object sendMsg(String id, String action, Object data, Map headers) { DataCollectorEntity collector = get(id); - if (data == null){ + if (data == null) { data = new HashMap<>(); } - - - if (collector == null) { throw new BizException("数据采集器不存在"); } @@ -153,20 +161,6 @@ public class DataCollectorService new CorrelationData(UUID.randomUUID().toString())); } - @Nullable - private void createQueue(DataCollectorEntity entity) { - amqpAdmin.declareQueue( - new Queue("pms.client." + entity.getAppKey(), true, false, false, new HashMap<>())); - - amqpAdmin.declareQueue( - new Queue( - "dataCollector." + entity.getAppKey(), - true, - false, - false, - new HashMap<>())); - } - public DataCollectorDto update(UpdateDataCollectorDto request) { DataCollectorEntity entity = this.repository.get(request.getId()); this.mapper.updateEntity(entity, request); @@ -211,4 +205,36 @@ public class DataCollectorService return repository.findByAppKey(appKey); } + + @Nullable + private void createQueue(DataCollectorEntity entity) { + amqpAdmin.declareQueue( + new Queue("pms.client." + entity.getAppKey(), true, false, false, new HashMap<>())); + + amqpAdmin.declareQueue( + new Queue( + "dataCollector." + entity.getAppKey(), + true, + false, + false, + new HashMap<>())); + + + amqpAdmin.declareQueue( + new Queue( + "weight20." + entity.getAppKey(), + true, + false, + false, + new HashMap<>())); + } + + public DataCollectorEntity getByAppKey(String appKey) { + + return em.createQuery( + "select x from DataCollectorEntity x where x.appKey = :appKey", + DataCollectorEntity.class) + .setParameter("appKey", appKey) + .getSingleResult(); + } } diff --git a/src/main/java/cn/lihongjie/coal/weightDevice/service/WeightDeviceService.java b/src/main/java/cn/lihongjie/coal/weightDevice/service/WeightDeviceService.java index d940f557..ed480cb8 100644 --- a/src/main/java/cn/lihongjie/coal/weightDevice/service/WeightDeviceService.java +++ b/src/main/java/cn/lihongjie/coal/weightDevice/service/WeightDeviceService.java @@ -12,6 +12,9 @@ import cn.lihongjie.coal.weightDevice.entity.WeightDeviceEntity; import cn.lihongjie.coal.weightDevice.mapper.WeightDeviceMapper; import cn.lihongjie.coal.weightDevice.repository.WeightDeviceRepository; +import jakarta.persistence.EntityManager; +import jakarta.persistence.PersistenceContext; + import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -76,4 +79,16 @@ public class WeightDeviceService extends BaseService checkLocalData(ReSyncRequest requst) { - TypedQuery query = + TypedQuery query = em.createQuery( FreeMakerUtils.render( """ - + select distinct d.device.dataCollector.appKey from WeightDeviceDataEntity d where 1=1 @@ -97,6 +111,142 @@ public class WeightDeviceDataService + """, + requst), + String.class); + + JpaUtils.setQueryParameter(query, requst); + + var datas = query.getResultList(); + + List result = new ArrayList<>(); + + for (var appKey : datas) { + + WeightDeviceEntity device = weightDeviceService.getDeviceByAppKey(appKey); + + TypedQuery sysSum = em.createQuery( + """ + + select sum(d.mz) as mz, + + sum(d.pz) as pz, + sum(d.jz) as jz, + sum(d.kz) as kz, + sum(d.sz) as sz, + count(1) as cnt + + from WeightDeviceDataEntity d where d.dataUpdateTime >= :start and d.dataUpdateTime <= :end and d.device.id = :deviceId + + """, Tuple.class); + + + + sysSum.setParameter("start", requst.getStart()); + sysSum.setParameter("end", requst.getEnd()); + sysSum.setParameter("deviceId", device.getId()); + + List resultList = sysSum.getResultList(); + + + java.util.Map sysSumResult = JpaUtils.convertTuplesToMap(resultList).get(0); + + + + + var sql = + """ + +select +sum(毛重) as mz +sum(皮重) as pz +sum(净重) as jz +sum(扣重) as kz +sum(实重) as sz, +count(1) as cnt +from 称重信息 +where 更新时间>='%s' and 更新时间<='%s' +""" + .formatted( + requst.getStart() + .format( + DateTimeFormatter.ofPattern( + "yyyy-MM-dd HH:mm:ss.SSS")), + requst.getEnd() + .format( + DateTimeFormatter.ofPattern( + "yyyy-MM-dd HH:mm:ss.SSS"))); + + Map req = + Map.of( + "action", + "check", + "data", + Map.of("start", requst.getStart(), "end", requst.getEnd(), "sql", sql)); + + Object resp = rabbitTemplate.convertSendAndReceive("weight20." + appKey, req); + + Map resultData = Map.of("appKey", appKey, "device", device, "bf", resp, "sys", sysSumResult); + log.info("check result: {}", jacksonObjectMapper.writeValueAsString(resultData)); + + result.add(resultData); + } + + return result; + } + + @SneakyThrows + public List reSync(ReSyncRequest requst) { + + requst.setFlowNumbers( + Stream.ofAll(requst.getFlowNumbers()) + .filter(x -> StringUtils.isNotBlank(x)) + .toJavaList()); + + if (CollectionUtils.isEmpty(requst.getFlowNumbers())) { + requst.setFlowNumbers(null); + } + + if (requst.getStart() == null) { + requst.setStart(LocalDateTime.now().minusYears(1)); + } + + if (requst.getEnd() == null) { + requst.setEnd(LocalDateTime.now()); + } + + TypedQuery query = + em.createQuery( + FreeMakerUtils.render( + """ + + + from WeightDeviceDataEntity d where 1=1 + + <#if ids??> + and d.id in (:ids) + + + <#if start??> + + and d.minTime >= :start + + + + <#if end??> + + and d.minTime <= :end + + + + <#if flowNumbers?? > + + and d.flowNumber in (:flowNumbers) + + + + + """, requst), WeightDeviceDataEntity.class); @@ -109,15 +259,56 @@ public class WeightDeviceDataService datas.stream() .collect( Collectors.groupingBy( - e -> e.getDevice().getDataCollector().getId())); - + e -> e.getDevice().getDataCollector().getAppKey())); + List result = new ArrayList<>(); for (Map.Entry> entry : groupByDc.entrySet()) { - Object object = - dataCollectorService.sendMsg(entry.getKey(), "weigth20.sync", requst, Map.of()); + LocalDateTime minTime = + Stream.ofAll(entry.getValue()) + .minBy(x -> x.getMinTime()) + .map(WeightDeviceDataEntity::getMinTime) + .get() + .minusHours(1); - log.info("reSync result: {}", object); + LocalDateTime maxTime = + Stream.ofAll(entry.getValue()) + .maxBy(x -> x.getMinTime()) + .map(WeightDeviceDataEntity::getMinTime) + .get() + .plusHours(1); + + List flowNumbers = + requst.getFlowNumbers() == null + ? new ArrayList<>() + : Stream.ofAll(entry.getValue()) + .map(WeightDeviceDataEntity::getFlowNumber) + .toJavaList(); + + for (Stream flowNumber : Stream.ofAll(flowNumbers).grouped(1000).toJavaList()) { + + Map req = + Map.of( + "action", + "reSync", + "data", + Map.of( + "start", + minTime, + "end", + maxTime, + "flowNumbers", + flowNumber.toJavaList())); + Object resp = + rabbitTemplate.convertSendAndReceive("weight20." + entry.getKey(), req); + + Map resultData = + Map.of("appKey", entry.getKey(), "resp", resp, "req", req); + log.info("reSync result: {}", jacksonObjectMapper.writeValueAsString(resultData)); + result.add(resultData); + } } + + return result; } public WeightDeviceDataDto update(UpdateWeightDeviceDataDto request) {