From 18b0ff501651acd2ad7d26a0efad01f5db518a87 Mon Sep 17 00:00:00 2001 From: lihongjie0209 Date: Wed, 28 Aug 2024 09:35:15 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0ybiot=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E5=AF=BC=E5=85=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../dataCollector/listener/YbIotListener.java | 194 ++++++++++++++++++ .../coal/meter/service/MeterService.java | 17 ++ 2 files changed, 211 insertions(+) create mode 100644 src/main/java/cn/lihongjie/coal/dataCollector/listener/YbIotListener.java diff --git a/src/main/java/cn/lihongjie/coal/dataCollector/listener/YbIotListener.java b/src/main/java/cn/lihongjie/coal/dataCollector/listener/YbIotListener.java new file mode 100644 index 00000000..483fbf62 --- /dev/null +++ b/src/main/java/cn/lihongjie/coal/dataCollector/listener/YbIotListener.java @@ -0,0 +1,194 @@ +package cn.lihongjie.coal.dataCollector.listener; + +import cn.hutool.core.date.LocalDateTimeUtil; +import cn.lihongjie.coal.acDevice.service.AcDeviceService; +import cn.lihongjie.coal.acDeviceData.service.AcDeviceDataService; +import cn.lihongjie.coal.dataCollector.entity.DataCollectorEntity; +import cn.lihongjie.coal.dataCollector.service.DataCollectorService; +import cn.lihongjie.coal.dataCollectorLog.entity.DataCollectorLogEntity; +import cn.lihongjie.coal.dataCollectorLog.service.DataCollectorLogService; +import cn.lihongjie.coal.meter.entity.MeterEntity; +import cn.lihongjie.coal.meter.service.MeterService; +import cn.lihongjie.coal.meterLog.entity.MeterLogEntity; +import cn.lihongjie.coal.meterLog.service.MeterLogService; +import cn.lihongjie.coal.rabbitmq.RabbitMQConfiguration; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; + +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; + +import org.apache.commons.codec.digest.HmacAlgorithms; +import org.apache.commons.codec.digest.HmacUtils; +import org.apache.commons.lang3.StringUtils; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.annotation.Exchange; +import org.springframework.amqp.rabbit.annotation.QueueBinding; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; + +import java.nio.charset.StandardCharsets; +import java.time.LocalDateTime; +import java.util.*; + +/** + * {"code":200,"msg":"","data":{"list":[{"MeterID":"N\/A","MeterPureFlow":"N\/D","MeterPositiveFlow":"4990.1","MeterNegativeFlow":"0","MeterFlowRate":"0","MeterFlowSpeed":"N\/A","AINPressure":"N\/D","AINPressure2":"N\/D","IMEI":"868558065671560","ComType":"CAT1","CSQ":"16","BatteryVoltage":"3.66","DCVoltage":"0","Model":"NT800-BK","Remarks":"\u827a\u949f\u517b\u6b96\u573a","ProductID":"1119221642","LastOnlineDate":"2024-08-27 + * 21:51:37","Online":1},{"MeterID":"N\/A","MeterPureFlow":"N\/D","MeterPositiveFlow":"15903.9","MeterNegativeFlow":"0","MeterFlowRate":"0","MeterFlowSpeed":"N\/A","AINPressure":"N\/D","AINPressure2":"N\/D","IMEI":"868558065674945","ComType":"CAT1","CSQ":"15","BatteryVoltage":"3.65","DCVoltage":"0","Model":"NT800-BK","Remarks":"\u827a\u949f\u6d17\u7164\u5382\u30011\u53f7","ProductID":"1119221629","LastOnlineDate":"2024-08-27 + * 13:55:16","Online":1},{"MeterID":"N\/A","MeterPureFlow":"N\/D","MeterPositiveFlow":"0.4","MeterNegativeFlow":"0","MeterFlowRate":"0","MeterFlowSpeed":"N\/A","AINPressure":"N\/D","AINPressure2":"N\/D","IMEI":"868558066304708","ComType":"CAT1","CSQ":"16","BatteryVoltage":"3.67","DCVoltage":"0","Model":"NT800-BK","Remarks":"\u827a\u949f\u6d17\u7164\u5382\u30012\u53f7","ProductID":"1119221628","LastOnlineDate":"2024-08-27 + * 17:16:14","Online":1}],"total":3,"show":["MeterID","MeterPureFlow","MeterPositiveFlow","MeterNegativeFlow","MeterFlowRate","MeterFlowSpeed","AINPressure","AINPressure2","IMEI","ComType","CSQ","BatteryVoltage","DCVoltage","Model","Remarks","ProductID","LastOnlineDate","HourlyInterval","PeriodicalInterval","HourlyRandom","LastOnlineTime","AINPressureMeterRange","AINPressure2MeterRange","Online"],"config":[{"column":"Remarks","name":"\u4ea7\u54c1\u5907\u6ce8"},{"column":"ProductID","name":"\u7ec8\u7aefID"},{"column":"Online","name":"\u662f\u5426\u5728\u7ebf"},{"column":"MeterID","name":"\u57fa\u8868ID"},{"column":"LastOnlineDate","name":"\u6700\u65b0\u6570\u636e\u65f6\u95f4"},{"column":"MeterFlowRate","name":"\u77ac\u65f6\u6d41\u91cf"},{"column":"MeterPositiveFlow","name":"\u6b63\u7d2f\u79ef\u6d41\u91cf"},{"column":"MeterNegativeFlow","name":"\u8d1f\u7d2f\u79ef\u6d41\u91cf"},{"column":"MeterPureFlow","name":"\u51c0\u7d2f\u79ef\u6d41\u91cf"},{"column":"MeterFlowSpeed","name":"\u6d41\u901f"},{"column":"AINPressure","name":"\u6a21\u62df\u91cf\u538b\u529b"},{"column":"AINPressure2","name":"\u6a21\u62df\u91cf\u538b\u529b2"},{"column":"ComType","name":"\u7ec8\u7aef\u65e0\u7ebf\u5236\u5f0f"},{"column":"CSQ","name":"\u7ec8\u7aef\u65e0\u7ebf\u4fe1\u53f7\u8d28\u91cf"},{"column":"BatteryVoltage","name":"\u7ec8\u7aef\u7535\u6c60\u7535\u538b"},{"column":"DCVoltage","name":"\u7535\u6e90\u7535\u538b"},{"column":"Model","name":"\u7ec8\u7aef\u578b\u53f7"},{"column":"IMEI","name":"IMEI"}]}} + */ +@Component +@Slf4j +public class YbIotListener { + + @Autowired DataCollectorService dataCollectorService; + + @Autowired DataCollectorLogService dataCollectorLogService; + + @Autowired ObjectMapper objectMapper; + + @Autowired AcDeviceDataService acDeviceDataService; + + @Autowired AcDeviceService acDeviceService; + + @Autowired MeterLogService meterLogService; + @Autowired MeterService meterService; + + @SneakyThrows + @RabbitListener( + bindings = { + @QueueBinding( + value = + @org.springframework.amqp.rabbit.annotation.Queue( + value = "ybiot.data", + durable = "true"), + exchange = + @Exchange( + value = RabbitMQConfiguration.SYS_EXCHANGE, + declare = Exchange.FALSE), + key = "ybiot.*") + }) + @Transactional + public void handlePmsMessage(Message message) { + + var body = new String(message.getBody(), StandardCharsets.UTF_8); + + Map headers = message.getMessageProperties().getHeaders(); + + Object key = headers.get("appKey"); + + if (key == null) { + log.error("appKey is null"); + return; + } + + DataCollectorEntity dataCollector = dataCollectorService.findByAppKey(key.toString()); + + var sign = + new HmacUtils(HmacAlgorithms.HMAC_SHA_256, dataCollector.getAppSecret()) + .hmacHex(body.getBytes(StandardCharsets.UTF_8)); + + if (!StringUtils.equals(sign, headers.get("sign").toString())) { + log.error("sign not match {} {}", sign, headers.get("sign")); + return; + } + + JsonNode jsonNode = objectMapper.readTree(body); + + DataCollectorLogEntity logEntity = new DataCollectorLogEntity(); + logEntity.setOrganizationId(dataCollector.getOrganizationId()); + logEntity.setDataCollector(dataCollector); + + Object rk = message.getMessageProperties().getReceivedRoutingKey(); + + logEntity.setLogTime(LocalDateTime.now()); + logEntity.setType(rk.toString()); + + switch (rk.toString()) { + case "ybiot.realTimeData" -> { + JsonNode code = jsonNode.get("code"); + JsonNode msg = jsonNode.get("msg"); + JsonNode data = jsonNode.get("data"); + + if (code.asInt() != 200) { + log.error("ybiot code is not 200: {} {}", code, msg.asText()); + return; + } + + if (data == null) { + log.error("ybiot data is null"); + return; + } + + JsonNode list = data.get("list"); + + if (list == null) { + log.error("ybiot list is null"); + return; + } + + ArrayList entities = new ArrayList<>(); + + for (JsonNode node : list) { + + MeterLogEntity entity = new MeterLogEntity(); + + String productID = node.get("ProductID").asText(); + + if (StringUtils.isBlank(productID)) { + log.error("productID is blank"); + continue; + } + + String imei = node.get("IMEI").asText(); + + if (!node.has("MeterPositiveFlow")) { + + log.error("MeterPositiveFlow is null"); + continue; + } + + double meterPositiveFlow = node.get("MeterPositiveFlow").asDouble(); + + if (!node.has("LastOnlineDate")) { + + log.error("LastOnlineDate is null"); + continue; + } + + LocalDateTime lastOnlineDate = + LocalDateTimeUtil.parse( + node.get("LastOnlineDate").asText(), "yyyy-MM-dd HH:mm:ss"); + + entity.setTime(lastOnlineDate); + + entity.setSource("2"); + + entity.setValue(meterPositiveFlow); + + MeterEntity meter = meterService.findByProductID(productID); + + if (meter == null) { + log.error("meter not found: {}", productID); + continue; + } + + entity.setMeter(meter); + + entities.add(entity); + } + meterLogService.saveWithoutDup(entities); + } + default -> { + log.warn("unknown message type: {}", rk); + } + } + + dataCollectorLogService.save(logEntity); + + // log.info("new message from pms: {}", body); + } +} diff --git a/src/main/java/cn/lihongjie/coal/meter/service/MeterService.java b/src/main/java/cn/lihongjie/coal/meter/service/MeterService.java index 54a88790..d5441cc2 100644 --- a/src/main/java/cn/lihongjie/coal/meter/service/MeterService.java +++ b/src/main/java/cn/lihongjie/coal/meter/service/MeterService.java @@ -11,6 +11,9 @@ import cn.lihongjie.coal.meter.entity.MeterEntity; import cn.lihongjie.coal.meter.mapper.MeterMapper; import cn.lihongjie.coal.meter.repository.MeterRepository; +import jakarta.persistence.EntityManager; +import jakarta.persistence.PersistenceContext; + import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -21,6 +24,8 @@ import org.springframework.data.domain.Sort; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; +import java.util.List; + @Service @Slf4j @Transactional @@ -74,4 +79,16 @@ class MeterService extends BaseService { return page.map(this.mapper::toDto); } + + @PersistenceContext + EntityManager em; + + public MeterEntity findByProductID(String productID) { + + List list = em.createQuery("select m from MeterEntity m where m.ybiotDeviceId = :productID", MeterEntity.class) + .setParameter("productID", productID) + .getResultList(); + return list.isEmpty() ? null : list.get(0); + } + }