This commit is contained in:
2024-05-21 22:46:48 +08:00
parent 33b513e09e
commit 59e330b608
6 changed files with 297 additions and 57 deletions

View File

@@ -100,7 +100,7 @@ rabbitmq add_vhost /coal
docker compose exec -it rabbitmq bash
rabbitmqctl add_user datacollector datacollector
rabbitmqctl set_permissions --vhost /coal/test datacollector '' 'sysExchange|amq.default' 'pms\.client\.*|dataCollector\.*'
rabbitmqctl set_permissions --vhost /coal/test datacollector '' 'sysExchange|amq.default' 'pms\.client\.*|dataCollector\.*|weight20\.*'
```

View File

@@ -16,6 +16,7 @@ import lombok.experimental.UtilityClass;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.RegExUtils;
import org.apache.commons.lang3.StringUtils;
import org.intellij.lang.annotations.Language;
import org.springframework.util.Assert;
import org.springframework.util.DigestUtils;
@@ -127,7 +128,7 @@ public class FreeMakerUtils {
}
@SneakyThrows
public static String render( String template, Object model) {
public static String render(@Language("freemarker") String template, Object model) {
if (StringUtils.isBlank(template)) {
return "";

View File

@@ -15,11 +15,15 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.annotation.PostConstruct;
import jakarta.persistence.EntityManager;
import jakarta.persistence.PersistenceContext;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.jetbrains.annotations.Nullable;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
@@ -28,18 +32,25 @@ import org.springframework.core.convert.ConversionService;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Sort;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@Service
@Slf4j
@Transactional
public class DataCollectorService
extends BaseService<DataCollectorEntity, DataCollectorRepository> {
public static final MessagePostProcessor MESSAGE_POST_PROCESSOR =
m -> {
m.getMessageProperties().setExpiration(TimeUnit.SECONDS.toMillis(5) + "");
return m;
};
@Autowired private DataCollectorRepository repository;
@Autowired private DataCollectorMapper mapper;
@@ -52,14 +63,7 @@ public class DataCollectorService
@Autowired RabbitTemplate rabbitTemplate;
@Autowired ObjectMapper objectMapper;
@PostConstruct
public void init() {
// findAll().stream()
// .filter(x -> StringUtils.isNotEmpty(x.getAppKey()))
// .forEach(this::createQueue);
}
@PersistenceContext EntityManager em;
public DataCollectorDto create(CreateDataCollectorDto request) {
DataCollectorEntity entity = mapper.toEntity(request);
@@ -72,15 +76,25 @@ public class DataCollectorService
return getById(entity.getId());
}
@Autowired private RabbitMQService rabbitMQService;
@PostConstruct
@Async
public void init() {
findAll().stream()
.filter(x -> StringUtils.isNotEmpty(x.getAppKey()))
.forEach(this::createQueue);
}
public Object getLocalStatus(GetLocalStatusRequest getLocalStatusRequest) {
var o =
rabbitTemplate.convertSendAndReceive(
"dataCollector." + getLocalStatusRequest.appKey(),
Map.of("action", "getLocalStatus")
);
Map.of("action", "getLocalStatus"),
MESSAGE_POST_PROCESSOR);
return o;
}
@@ -89,56 +103,50 @@ public class DataCollectorService
return rabbitTemplate.convertSendAndReceive(
"dataCollector." + updateLocalStatus.appKey(),
Map.of("action", "updateLocalStatus", "data", updateLocalStatus.status())
);
Map.of("action", "updateLocalStatus", "data", updateLocalStatus.status()),
MESSAGE_POST_PROCESSOR);
}
public Object deleteLocalStatus(DeleteLocalStatusRequest deleteLocalStatusRequest) {
return rabbitTemplate.convertSendAndReceive(
"dataCollector." + deleteLocalStatusRequest.appKey(), Map.of("action", "deleteLocalStatus", "data", deleteLocalStatusRequest.keys())
);
"dataCollector." + deleteLocalStatusRequest.appKey(),
Map.of("action", "deleteLocalStatus", "data", deleteLocalStatusRequest.keys()),
MESSAGE_POST_PROCESSOR);
}
public Object getXml(GetXmlRequest request) {
return rabbitTemplate.convertSendAndReceive(
"dataCollector." + request.appKey(), Map.of("action", "getXML")
);
"dataCollector." + request.appKey(),
Map.of("action", "getXML"),
MESSAGE_POST_PROCESSOR);
}
public Object getVersion(GetXmlRequest request) {
return rabbitTemplate.convertSendAndReceive(
"dataCollector." + request.appKey(), Map.of("action", "getVersion")
);
"dataCollector." + request.appKey(),
Map.of("action", "getVersion"),
MESSAGE_POST_PROCESSOR);
}
public Object restart(GetXmlRequest request) {
return rabbitTemplate.convertSendAndReceive(
"dataCollector." + request.appKey(), Map.of("action", "restart")
);
"dataCollector." + request.appKey(),
Map.of("action", "restart"),
MESSAGE_POST_PROCESSOR);
}
public Object sendMsg(String id, String action, Object data, Map<String, String> headers) {
DataCollectorEntity collector = get(id);
if (data == null){
if (data == null) {
data = new HashMap<>();
}
if (collector == null) {
throw new BizException("数据采集器不存在");
}
@@ -153,20 +161,6 @@ public class DataCollectorService
new CorrelationData(UUID.randomUUID().toString()));
}
@Nullable
private void createQueue(DataCollectorEntity entity) {
amqpAdmin.declareQueue(
new Queue("pms.client." + entity.getAppKey(), true, false, false, new HashMap<>()));
amqpAdmin.declareQueue(
new Queue(
"dataCollector." + entity.getAppKey(),
true,
false,
false,
new HashMap<>()));
}
public DataCollectorDto update(UpdateDataCollectorDto request) {
DataCollectorEntity entity = this.repository.get(request.getId());
this.mapper.updateEntity(entity, request);
@@ -211,4 +205,36 @@ public class DataCollectorService
return repository.findByAppKey(appKey);
}
@Nullable
private void createQueue(DataCollectorEntity entity) {
amqpAdmin.declareQueue(
new Queue("pms.client." + entity.getAppKey(), true, false, false, new HashMap<>()));
amqpAdmin.declareQueue(
new Queue(
"dataCollector." + entity.getAppKey(),
true,
false,
false,
new HashMap<>()));
amqpAdmin.declareQueue(
new Queue(
"weight20." + entity.getAppKey(),
true,
false,
false,
new HashMap<>()));
}
public DataCollectorEntity getByAppKey(String appKey) {
return em.createQuery(
"select x from DataCollectorEntity x where x.appKey = :appKey",
DataCollectorEntity.class)
.setParameter("appKey", appKey)
.getSingleResult();
}
}

View File

@@ -12,6 +12,9 @@ import cn.lihongjie.coal.weightDevice.entity.WeightDeviceEntity;
import cn.lihongjie.coal.weightDevice.mapper.WeightDeviceMapper;
import cn.lihongjie.coal.weightDevice.repository.WeightDeviceRepository;
import jakarta.persistence.EntityManager;
import jakarta.persistence.PersistenceContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@@ -76,4 +79,16 @@ public class WeightDeviceService extends BaseService<WeightDeviceEntity, WeightD
return page.map(this.mapper::toDto);
}
@PersistenceContext EntityManager em;
public WeightDeviceEntity getDeviceByAppKey(String appKey) {
return em.createQuery(
"select x from WeightDeviceEntity x where x.dataCollector.appKey = :appKey",
WeightDeviceEntity.class)
.setParameter("appKey", appKey)
.getSingleResult();
}
}

View File

@@ -58,10 +58,17 @@ public class WeightDeviceDataController {
@PostMapping("/reSync")
public Object reSync(@RequestBody ReSyncRequest request) {
this.service.reSync(request);
return true;
return this.service.reSync(request);
}
@PostMapping("/checkLocalData")
public Object checkLocalData(@RequestBody ReSyncRequest request) {
return this.service.checkLocalData(request);
}
@PostMapping("/archive")
public Object archive(@RequestBody IdRequest request) {
this.service.archive(request);

View File

@@ -10,20 +10,27 @@ import cn.lihongjie.coal.dataCollectorLog.service.DataCollectorLogService;
import cn.lihongjie.coal.dbFunctions.DbFunctionService;
import cn.lihongjie.coal.exception.BizException;
import cn.lihongjie.coal.rabbitmq.RabbitMQService;
import cn.lihongjie.coal.weightDevice.entity.WeightDeviceEntity;
import cn.lihongjie.coal.weightDevice.service.WeightDeviceService;
import cn.lihongjie.coal.weightDeviceData.dto.*;
import cn.lihongjie.coal.weightDeviceData.entity.WeightDeviceDataEntity;
import cn.lihongjie.coal.weightDeviceData.mapper.WeightDeviceDataMapper;
import cn.lihongjie.coal.weightDeviceData.repository.WeightDeviceDataRepository;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.CaseFormat;
import io.vavr.collection.Stream;
import jakarta.persistence.*;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.convert.ConversionService;
import org.springframework.data.domain.Page;
@@ -33,6 +40,9 @@ import org.springframework.data.domain.Sort;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -55,6 +65,7 @@ public class WeightDeviceDataService
@Autowired RabbitMQService rabbitMQService;
private DataCollectorLogService dataCollectorLogService;
@Autowired RabbitTemplate rabbitTemplate;
public WeightDeviceDataDto create(CreateWeightDeviceDataDto request) {
WeightDeviceDataEntity entity = mapper.toEntity(request);
@@ -62,14 +73,17 @@ public class WeightDeviceDataService
this.repository.save(entity);
return getById(entity.getId());
}
@Autowired WeightDeviceService weightDeviceService;
@Autowired private ObjectMapper jacksonObjectMapper;
public void reSync(ReSyncRequest requst) {
@SneakyThrows
public List<Map> checkLocalData(ReSyncRequest requst) {
TypedQuery<WeightDeviceDataEntity> query =
TypedQuery<String> query =
em.createQuery(
FreeMakerUtils.render(
"""
select distinct d.device.dataCollector.appKey
from WeightDeviceDataEntity d where 1=1
@@ -97,6 +111,142 @@ public class WeightDeviceDataService
""",
requst),
String.class);
JpaUtils.setQueryParameter(query, requst);
var datas = query.getResultList();
List<Map> result = new ArrayList<>();
for (var appKey : datas) {
WeightDeviceEntity device = weightDeviceService.getDeviceByAppKey(appKey);
TypedQuery<Tuple> sysSum = em.createQuery(
"""
select sum(d.mz) as mz,
sum(d.pz) as pz,
sum(d.jz) as jz,
sum(d.kz) as kz,
sum(d.sz) as sz,
count(1) as cnt
from WeightDeviceDataEntity d where d.dataUpdateTime >= :start and d.dataUpdateTime <= :end and d.device.id = :deviceId
""", Tuple.class);
sysSum.setParameter("start", requst.getStart());
sysSum.setParameter("end", requst.getEnd());
sysSum.setParameter("deviceId", device.getId());
List<Tuple> resultList = sysSum.getResultList();
java.util.Map<String, Object> sysSumResult = JpaUtils.convertTuplesToMap(resultList).get(0);
var sql =
"""
select
sum(毛重) as mz
sum(皮重) as pz
sum(净重) as jz
sum(扣重) as kz
sum(实重) as sz,
count(1) as cnt
from 称重信息
where 更新时间>='%s' and 更新时间<='%s'
"""
.formatted(
requst.getStart()
.format(
DateTimeFormatter.ofPattern(
"yyyy-MM-dd HH:mm:ss.SSS")),
requst.getEnd()
.format(
DateTimeFormatter.ofPattern(
"yyyy-MM-dd HH:mm:ss.SSS")));
Map<String, Object> req =
Map.of(
"action",
"check",
"data",
Map.of("start", requst.getStart(), "end", requst.getEnd(), "sql", sql));
Object resp = rabbitTemplate.convertSendAndReceive("weight20." + appKey, req);
Map<String, Object> resultData = Map.of("appKey", appKey, "device", device, "bf", resp, "sys", sysSumResult);
log.info("check result: {}", jacksonObjectMapper.writeValueAsString(resultData));
result.add(resultData);
}
return result;
}
@SneakyThrows
public List<Map> reSync(ReSyncRequest requst) {
requst.setFlowNumbers(
Stream.ofAll(requst.getFlowNumbers())
.filter(x -> StringUtils.isNotBlank(x))
.toJavaList());
if (CollectionUtils.isEmpty(requst.getFlowNumbers())) {
requst.setFlowNumbers(null);
}
if (requst.getStart() == null) {
requst.setStart(LocalDateTime.now().minusYears(1));
}
if (requst.getEnd() == null) {
requst.setEnd(LocalDateTime.now());
}
TypedQuery<WeightDeviceDataEntity> query =
em.createQuery(
FreeMakerUtils.render(
"""
from WeightDeviceDataEntity d where 1=1
<#if ids??>
and d.id in (:ids)
</#if>
<#if start??>
and d.minTime >= :start
</#if>
<#if end??>
and d.minTime <= :end
</#if>
<#if flowNumbers?? >
and d.flowNumber in (:flowNumbers)
</#if>
""",
requst),
WeightDeviceDataEntity.class);
@@ -109,15 +259,56 @@ public class WeightDeviceDataService
datas.stream()
.collect(
Collectors.groupingBy(
e -> e.getDevice().getDataCollector().getId()));
e -> e.getDevice().getDataCollector().getAppKey()));
List<Map> result = new ArrayList<>();
for (Map.Entry<String, List<WeightDeviceDataEntity>> entry : groupByDc.entrySet()) {
Object object =
dataCollectorService.sendMsg(entry.getKey(), "weigth20.sync", requst, Map.of());
LocalDateTime minTime =
Stream.ofAll(entry.getValue())
.minBy(x -> x.getMinTime())
.map(WeightDeviceDataEntity::getMinTime)
.get()
.minusHours(1);
log.info("reSync result: {}", object);
LocalDateTime maxTime =
Stream.ofAll(entry.getValue())
.maxBy(x -> x.getMinTime())
.map(WeightDeviceDataEntity::getMinTime)
.get()
.plusHours(1);
List<String> flowNumbers =
requst.getFlowNumbers() == null
? new ArrayList<>()
: Stream.ofAll(entry.getValue())
.map(WeightDeviceDataEntity::getFlowNumber)
.toJavaList();
for (Stream<String> flowNumber : Stream.ofAll(flowNumbers).grouped(1000).toJavaList()) {
Map<String, Object> req =
Map.of(
"action",
"reSync",
"data",
Map.of(
"start",
minTime,
"end",
maxTime,
"flowNumbers",
flowNumber.toJavaList()));
Object resp =
rabbitTemplate.convertSendAndReceive("weight20." + entry.getKey(), req);
Map<String, Object> resultData =
Map.of("appKey", entry.getKey(), "resp", resp, "req", req);
log.info("reSync result: {}", jacksonObjectMapper.writeValueAsString(resultData));
result.add(resultData);
}
}
return result;
}
public WeightDeviceDataDto update(UpdateWeightDeviceDataDto request) {