Browse Source

报警推送修改

master
wangwei_123 1 week ago
parent
commit
61803382ee
  1. 12
      cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/config/BatchProcessorConfig.java
  2. 22
      cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/BatchDeviceMessageProcessor.java
  3. 26
      cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/HandAlarmMessageProcess.java
  4. 2
      cc-admin-master/yudao-server/src/main/resources/application-local.yaml

12
cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/config/BatchProcessorConfig.java

@ -40,16 +40,4 @@ public class BatchProcessorConfig {
);
}
@Bean
public TdengineBatchConfig<AlarmMessageLog> messageLogBatchProcessor() {
return new TdengineBatchConfig<>(
"messageLogBatchProcessor",
list -> tdengineService.createAlarmRecord(list),
50000,
1000,
30000 // 内部每 30秒 执行一次
);
}
}

22
cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/BatchDeviceMessageProcessor.java

@ -42,8 +42,6 @@ public class BatchDeviceMessageProcessor {
@Resource
private TdengineBatchConfig<TdengineDataVo> tdengineBatchConfig;
@Resource
private TdengineBatchConfig<AlarmMessageLog> alarmProcessor;
@Resource
private HandDetectorService handDetectorService;
@Resource
private HandAlarmService handAlarmService;
@ -207,11 +205,6 @@ public class BatchDeviceMessageProcessor {
log.debug("[批量持久化] 处理日志: {} 条", context.processedLogs.size());
}
// 3. 批量保存报警消息日志
if (!context.alarmMessageLogs.isEmpty()) {
alarmProcessor.addToBatch(context.alarmMessageLogs);
log.debug("[批量持久化] 报警消息: {} 条", context.alarmMessageLogs.size());
}
// 4. 批量创建气体报警
if (!context.gasAlarmsToCreate.isEmpty()) {
// 2. 批量保存
@ -851,25 +844,16 @@ public class BatchDeviceMessageProcessor {
String msgContent = String.format("%s%s,%s气体浓度为%s",
handVo.getUserName(), statusText, gasName, valueStr);
// 记录报警消息日志
AlarmMessageLog alarmMessageLog = new AlarmMessageLog();
alarmMessageLog.setDetectorId(handVo.getId());
alarmMessageLog.setHolderName(handVo.getUserName());
alarmMessageLog.setSn(handVo.getSn());
alarmMessageLog.setDeptId(handVo.getDeptId());
alarmMessageLog.setTenantId(handVo.getTenantId());
alarmMessageLog.setMessage(msgContent);
alarmMessageLog.setRemark("系统自动触发报警推送");
try {
AlarmDispatchEvent event = new AlarmDispatchEvent(
handVo.getId(),
handVo.getDeptId(),
handVo.getTenantId(),
handVo.getSn(),
handVo.getUserName(),
msgContent
);
// 使用 sourceSn 作为 Key,保证同一个部门/设备的报警排队有序处理
String key = handVo.getSn();
String json = JsonUtils.toJsonString(event);
@ -878,7 +862,6 @@ public class BatchDeviceMessageProcessor {
log.error("[报警推送] 准备推送数据失败", e);
}
context.alarmMessageLogs.add(alarmMessageLog);
}
@ -942,7 +925,6 @@ public class BatchDeviceMessageProcessor {
// 待保存的日志
List<HandOriginalLog> originalLogs = new ArrayList<>();
List<TdengineDataVo> processedLogs = new ArrayList<>();
List<AlarmMessageLog> alarmMessageLogs = new ArrayList<>();
// 待保存的报警
List<HandAlarmDO> gasAlarmsToCreate = new ArrayList<>();

26
cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/HandAlarmMessageProcess.java

@ -2,7 +2,10 @@ package cn.iocoder.yudao.module.mqtt.processor;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.module.hand.service.HandDetectorService;
import cn.iocoder.yudao.module.hand.service.TdengineService;
import cn.iocoder.yudao.module.hand.vo.AlarmDispatchEvent;
import cn.iocoder.yudao.module.hand.vo.AlarmMessageLog;
import cn.iocoder.yudao.module.mqtt.config.TdengineBatchConfig;
import cn.iocoder.yudao.module.mqtt.mqtt.Client;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.util.concurrent.RateLimiter;
@ -11,18 +14,22 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
@Slf4j
@Component
public class HandAlarmMessageProcess {
private final RateLimiter mqttRateLimiter = RateLimiter.create(3000.0);
@Resource
private Client mqttClient;
@Resource
private HandDetectorService handDetectorService;
private final RateLimiter mqttRateLimiter = RateLimiter.create(3000.0);
@Resource
private TdengineService tdengineService;
public void processSingle(String jsonValue) {
// 1. 解析 Kafka 消息
@ -47,6 +54,21 @@ public class HandAlarmMessageProcess {
// 3. 执行推送逻辑
this.publishAlarmToMqtt(targetSns, event.getMsgContent());
// 记录报警消息日志
AlarmMessageLog alarmMessageLog = new AlarmMessageLog();
alarmMessageLog.setDetectorId(event.getId());
alarmMessageLog.setHolderName(event.getUserName());
alarmMessageLog.setSn(event.getSourceSn());
alarmMessageLog.setDeptId(event.getDeptId());
alarmMessageLog.setTenantId(event.getTenantId());
alarmMessageLog.setMessage(event.getMsgContent());
alarmMessageLog.setRemark("系统自动触发报警推送");
alarmMessageLog.setPushSnList(StringUtils.join(targetSns, ","));
ArrayList<AlarmMessageLog> objects = new ArrayList<>();
objects.add(alarmMessageLog);
tdengineService.createAlarmRecord(objects);
} catch (Exception e) {
log.error("[报警推送] 处理异常 SourceSN: {}", event.getSourceSn(), e);
}

2
cc-admin-master/yudao-server/src/main/resources/application-local.yaml

@ -48,7 +48,7 @@ spring:
primary: master
datasource:
master:
url: jdbc:mysql://192.168.0.180:3306/hand_alarm?useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true&nullCatalogMeansCurrent=true&rewriteBatchedStatements=true # MySQL Connector/J 8.X 连接的示例
url: jdbc:mysql://192.168.0.180:3306/hand_alarm_dev?useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true&nullCatalogMeansCurrent=true&rewriteBatchedStatements=true # MySQL Connector/J 8.X 连接的示例
username: root
password: Gsking164411
driver-class-name: com.mysql.cj.jdbc.Driver # MySQL Connector/J 8.X 连接的示例

Loading…
Cancel
Save