添加ybiot数据导入

This commit is contained in:
2024-08-28 09:35:15 +08:00
parent 6e5abeb9e7
commit 18b0ff5016
2 changed files with 211 additions and 0 deletions

View File

@@ -0,0 +1,194 @@
package cn.lihongjie.coal.dataCollector.listener;
import cn.hutool.core.date.LocalDateTimeUtil;
import cn.lihongjie.coal.acDevice.service.AcDeviceService;
import cn.lihongjie.coal.acDeviceData.service.AcDeviceDataService;
import cn.lihongjie.coal.dataCollector.entity.DataCollectorEntity;
import cn.lihongjie.coal.dataCollector.service.DataCollectorService;
import cn.lihongjie.coal.dataCollectorLog.entity.DataCollectorLogEntity;
import cn.lihongjie.coal.dataCollectorLog.service.DataCollectorLogService;
import cn.lihongjie.coal.meter.entity.MeterEntity;
import cn.lihongjie.coal.meter.service.MeterService;
import cn.lihongjie.coal.meterLog.entity.MeterLogEntity;
import cn.lihongjie.coal.meterLog.service.MeterLogService;
import cn.lihongjie.coal.rabbitmq.RabbitMQConfiguration;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.digest.HmacAlgorithms;
import org.apache.commons.codec.digest.HmacUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.util.*;
/**
* {"code":200,"msg":"","data":{"list":[{"MeterID":"N\/A","MeterPureFlow":"N\/D","MeterPositiveFlow":"4990.1","MeterNegativeFlow":"0","MeterFlowRate":"0","MeterFlowSpeed":"N\/A","AINPressure":"N\/D","AINPressure2":"N\/D","IMEI":"868558065671560","ComType":"CAT1","CSQ":"16","BatteryVoltage":"3.66","DCVoltage":"0","Model":"NT800-BK","Remarks":"\u827a\u949f\u517b\u6b96\u573a","ProductID":"1119221642","LastOnlineDate":"2024-08-27
* 21:51:37","Online":1},{"MeterID":"N\/A","MeterPureFlow":"N\/D","MeterPositiveFlow":"15903.9","MeterNegativeFlow":"0","MeterFlowRate":"0","MeterFlowSpeed":"N\/A","AINPressure":"N\/D","AINPressure2":"N\/D","IMEI":"868558065674945","ComType":"CAT1","CSQ":"15","BatteryVoltage":"3.65","DCVoltage":"0","Model":"NT800-BK","Remarks":"\u827a\u949f\u6d17\u7164\u5382\u30011\u53f7","ProductID":"1119221629","LastOnlineDate":"2024-08-27
* 13:55:16","Online":1},{"MeterID":"N\/A","MeterPureFlow":"N\/D","MeterPositiveFlow":"0.4","MeterNegativeFlow":"0","MeterFlowRate":"0","MeterFlowSpeed":"N\/A","AINPressure":"N\/D","AINPressure2":"N\/D","IMEI":"868558066304708","ComType":"CAT1","CSQ":"16","BatteryVoltage":"3.67","DCVoltage":"0","Model":"NT800-BK","Remarks":"\u827a\u949f\u6d17\u7164\u5382\u30012\u53f7","ProductID":"1119221628","LastOnlineDate":"2024-08-27
* 17:16:14","Online":1}],"total":3,"show":["MeterID","MeterPureFlow","MeterPositiveFlow","MeterNegativeFlow","MeterFlowRate","MeterFlowSpeed","AINPressure","AINPressure2","IMEI","ComType","CSQ","BatteryVoltage","DCVoltage","Model","Remarks","ProductID","LastOnlineDate","HourlyInterval","PeriodicalInterval","HourlyRandom","LastOnlineTime","AINPressureMeterRange","AINPressure2MeterRange","Online"],"config":[{"column":"Remarks","name":"\u4ea7\u54c1\u5907\u6ce8"},{"column":"ProductID","name":"\u7ec8\u7aefID"},{"column":"Online","name":"\u662f\u5426\u5728\u7ebf"},{"column":"MeterID","name":"\u57fa\u8868ID"},{"column":"LastOnlineDate","name":"\u6700\u65b0\u6570\u636e\u65f6\u95f4"},{"column":"MeterFlowRate","name":"\u77ac\u65f6\u6d41\u91cf"},{"column":"MeterPositiveFlow","name":"\u6b63\u7d2f\u79ef\u6d41\u91cf"},{"column":"MeterNegativeFlow","name":"\u8d1f\u7d2f\u79ef\u6d41\u91cf"},{"column":"MeterPureFlow","name":"\u51c0\u7d2f\u79ef\u6d41\u91cf"},{"column":"MeterFlowSpeed","name":"\u6d41\u901f"},{"column":"AINPressure","name":"\u6a21\u62df\u91cf\u538b\u529b"},{"column":"AINPressure2","name":"\u6a21\u62df\u91cf\u538b\u529b2"},{"column":"ComType","name":"\u7ec8\u7aef\u65e0\u7ebf\u5236\u5f0f"},{"column":"CSQ","name":"\u7ec8\u7aef\u65e0\u7ebf\u4fe1\u53f7\u8d28\u91cf"},{"column":"BatteryVoltage","name":"\u7ec8\u7aef\u7535\u6c60\u7535\u538b"},{"column":"DCVoltage","name":"\u7535\u6e90\u7535\u538b"},{"column":"Model","name":"\u7ec8\u7aef\u578b\u53f7"},{"column":"IMEI","name":"IMEI"}]}}
*/
@Component
@Slf4j
public class YbIotListener {
@Autowired DataCollectorService dataCollectorService;
@Autowired DataCollectorLogService dataCollectorLogService;
@Autowired ObjectMapper objectMapper;
@Autowired AcDeviceDataService acDeviceDataService;
@Autowired AcDeviceService acDeviceService;
@Autowired MeterLogService meterLogService;
@Autowired MeterService meterService;
@SneakyThrows
@RabbitListener(
bindings = {
@QueueBinding(
value =
@org.springframework.amqp.rabbit.annotation.Queue(
value = "ybiot.data",
durable = "true"),
exchange =
@Exchange(
value = RabbitMQConfiguration.SYS_EXCHANGE,
declare = Exchange.FALSE),
key = "ybiot.*")
})
@Transactional
public void handlePmsMessage(Message message) {
var body = new String(message.getBody(), StandardCharsets.UTF_8);
Map<String, Object> headers = message.getMessageProperties().getHeaders();
Object key = headers.get("appKey");
if (key == null) {
log.error("appKey is null");
return;
}
DataCollectorEntity dataCollector = dataCollectorService.findByAppKey(key.toString());
var sign =
new HmacUtils(HmacAlgorithms.HMAC_SHA_256, dataCollector.getAppSecret())
.hmacHex(body.getBytes(StandardCharsets.UTF_8));
if (!StringUtils.equals(sign, headers.get("sign").toString())) {
log.error("sign not match {} {}", sign, headers.get("sign"));
return;
}
JsonNode jsonNode = objectMapper.readTree(body);
DataCollectorLogEntity logEntity = new DataCollectorLogEntity();
logEntity.setOrganizationId(dataCollector.getOrganizationId());
logEntity.setDataCollector(dataCollector);
Object rk = message.getMessageProperties().getReceivedRoutingKey();
logEntity.setLogTime(LocalDateTime.now());
logEntity.setType(rk.toString());
switch (rk.toString()) {
case "ybiot.realTimeData" -> {
JsonNode code = jsonNode.get("code");
JsonNode msg = jsonNode.get("msg");
JsonNode data = jsonNode.get("data");
if (code.asInt() != 200) {
log.error("ybiot code is not 200: {} {}", code, msg.asText());
return;
}
if (data == null) {
log.error("ybiot data is null");
return;
}
JsonNode list = data.get("list");
if (list == null) {
log.error("ybiot list is null");
return;
}
ArrayList<MeterLogEntity> entities = new ArrayList<>();
for (JsonNode node : list) {
MeterLogEntity entity = new MeterLogEntity();
String productID = node.get("ProductID").asText();
if (StringUtils.isBlank(productID)) {
log.error("productID is blank");
continue;
}
String imei = node.get("IMEI").asText();
if (!node.has("MeterPositiveFlow")) {
log.error("MeterPositiveFlow is null");
continue;
}
double meterPositiveFlow = node.get("MeterPositiveFlow").asDouble();
if (!node.has("LastOnlineDate")) {
log.error("LastOnlineDate is null");
continue;
}
LocalDateTime lastOnlineDate =
LocalDateTimeUtil.parse(
node.get("LastOnlineDate").asText(), "yyyy-MM-dd HH:mm:ss");
entity.setTime(lastOnlineDate);
entity.setSource("2");
entity.setValue(meterPositiveFlow);
MeterEntity meter = meterService.findByProductID(productID);
if (meter == null) {
log.error("meter not found: {}", productID);
continue;
}
entity.setMeter(meter);
entities.add(entity);
}
meterLogService.saveWithoutDup(entities);
}
default -> {
log.warn("unknown message type: {}", rk);
}
}
dataCollectorLogService.save(logEntity);
// log.info("new message from pms: {}", body);
}
}

View File

@@ -11,6 +11,9 @@ import cn.lihongjie.coal.meter.entity.MeterEntity;
import cn.lihongjie.coal.meter.mapper.MeterMapper;
import cn.lihongjie.coal.meter.repository.MeterRepository;
import jakarta.persistence.EntityManager;
import jakarta.persistence.PersistenceContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@@ -21,6 +24,8 @@ import org.springframework.data.domain.Sort;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.List;
@Service
@Slf4j
@Transactional
@@ -74,4 +79,16 @@ class MeterService extends BaseService<MeterEntity, MeterRepository> {
return page.map(this.mapper::toDto);
}
@PersistenceContext
EntityManager em;
public MeterEntity findByProductID(String productID) {
List<MeterEntity> list = em.createQuery("select m from MeterEntity m where m.ybiotDeviceId = :productID", MeterEntity.class)
.setParameter("productID", productID)
.getResultList();
return list.isEmpty() ? null : list.get(0);
}
}