diff --git a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/config/BatchProcessorConfig.java b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/config/BatchProcessorConfig.java index 0de167a..90c8ea6 100644 --- a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/config/BatchProcessorConfig.java +++ b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/config/BatchProcessorConfig.java @@ -40,16 +40,4 @@ public class BatchProcessorConfig { ); } - @Bean - public TdengineBatchConfig messageLogBatchProcessor() { - return new TdengineBatchConfig<>( - "messageLogBatchProcessor", - list -> tdengineService.createAlarmRecord(list), - 50000, - 1000, - 30000 // 内部每 30秒 执行一次 - ); - - - } } diff --git a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/BatchDeviceMessageProcessor.java b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/BatchDeviceMessageProcessor.java index f4970ae..c99b48a 100644 --- a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/BatchDeviceMessageProcessor.java +++ b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/BatchDeviceMessageProcessor.java @@ -42,8 +42,6 @@ public class BatchDeviceMessageProcessor { @Resource private TdengineBatchConfig tdengineBatchConfig; @Resource - private TdengineBatchConfig alarmProcessor; - @Resource private HandDetectorService handDetectorService; @Resource private HandAlarmService handAlarmService; @@ -207,11 +205,6 @@ public class BatchDeviceMessageProcessor { log.debug("[批量持久化] 处理日志: {} 条", context.processedLogs.size()); } - // 3. 批量保存报警消息日志 - if (!context.alarmMessageLogs.isEmpty()) { - alarmProcessor.addToBatch(context.alarmMessageLogs); - log.debug("[批量持久化] 报警消息: {} 条", context.alarmMessageLogs.size()); - } // 4. 批量创建气体报警 if (!context.gasAlarmsToCreate.isEmpty()) { // 2. 批量保存 @@ -851,25 +844,16 @@ public class BatchDeviceMessageProcessor { String msgContent = String.format("%s%s,%s气体浓度为%s", handVo.getUserName(), statusText, gasName, valueStr); - // 记录报警消息日志 - AlarmMessageLog alarmMessageLog = new AlarmMessageLog(); - alarmMessageLog.setDetectorId(handVo.getId()); - alarmMessageLog.setHolderName(handVo.getUserName()); - alarmMessageLog.setSn(handVo.getSn()); - alarmMessageLog.setDeptId(handVo.getDeptId()); - alarmMessageLog.setTenantId(handVo.getTenantId()); - alarmMessageLog.setMessage(msgContent); - alarmMessageLog.setRemark("系统自动触发报警推送"); - try { AlarmDispatchEvent event = new AlarmDispatchEvent( + handVo.getId(), handVo.getDeptId(), handVo.getTenantId(), handVo.getSn(), + handVo.getUserName(), msgContent ); - // 使用 sourceSn 作为 Key,保证同一个部门/设备的报警排队有序处理 String key = handVo.getSn(); String json = JsonUtils.toJsonString(event); @@ -878,7 +862,6 @@ public class BatchDeviceMessageProcessor { log.error("[报警推送] 准备推送数据失败", e); } - context.alarmMessageLogs.add(alarmMessageLog); } @@ -942,7 +925,6 @@ public class BatchDeviceMessageProcessor { // 待保存的日志 List originalLogs = new ArrayList<>(); List processedLogs = new ArrayList<>(); - List alarmMessageLogs = new ArrayList<>(); // 待保存的报警 List gasAlarmsToCreate = new ArrayList<>(); diff --git a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/HandAlarmMessageProcess.java b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/HandAlarmMessageProcess.java index bedcb72..3e1e8f9 100644 --- a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/HandAlarmMessageProcess.java +++ b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/HandAlarmMessageProcess.java @@ -2,7 +2,10 @@ package cn.iocoder.yudao.module.mqtt.processor; import cn.iocoder.yudao.framework.common.util.json.JsonUtils; import cn.iocoder.yudao.module.hand.service.HandDetectorService; +import cn.iocoder.yudao.module.hand.service.TdengineService; import cn.iocoder.yudao.module.hand.vo.AlarmDispatchEvent; +import cn.iocoder.yudao.module.hand.vo.AlarmMessageLog; +import cn.iocoder.yudao.module.mqtt.config.TdengineBatchConfig; import cn.iocoder.yudao.module.mqtt.mqtt.Client; import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.util.concurrent.RateLimiter; @@ -11,18 +14,22 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.stereotype.Component; +import java.util.ArrayList; +import java.util.Date; import java.util.List; import java.util.Map; + @Slf4j @Component public class HandAlarmMessageProcess { + private final RateLimiter mqttRateLimiter = RateLimiter.create(3000.0); @Resource private Client mqttClient; @Resource private HandDetectorService handDetectorService; - - private final RateLimiter mqttRateLimiter = RateLimiter.create(3000.0); + @Resource + private TdengineService tdengineService; public void processSingle(String jsonValue) { // 1. 解析 Kafka 消息 @@ -47,6 +54,21 @@ public class HandAlarmMessageProcess { // 3. 执行推送逻辑 this.publishAlarmToMqtt(targetSns, event.getMsgContent()); + // 记录报警消息日志 + AlarmMessageLog alarmMessageLog = new AlarmMessageLog(); + alarmMessageLog.setDetectorId(event.getId()); + alarmMessageLog.setHolderName(event.getUserName()); + alarmMessageLog.setSn(event.getSourceSn()); + alarmMessageLog.setDeptId(event.getDeptId()); + alarmMessageLog.setTenantId(event.getTenantId()); + alarmMessageLog.setMessage(event.getMsgContent()); + alarmMessageLog.setRemark("系统自动触发报警推送"); + alarmMessageLog.setPushSnList(StringUtils.join(targetSns, ",")); + + ArrayList objects = new ArrayList<>(); + objects.add(alarmMessageLog); + tdengineService.createAlarmRecord(objects); + } catch (Exception e) { log.error("[报警推送] 处理异常 SourceSN: {}", event.getSourceSn(), e); } @@ -58,7 +80,7 @@ public class HandAlarmMessageProcess { private void publishAlarmToMqtt(List targetSns, String message) { if (message == null) { return; - } + } // 构造 MQTT 消息体 Map payload = Map.of("message", message); String jsonPayload = JsonUtils.toJsonString(payload); diff --git a/cc-admin-master/yudao-server/src/main/resources/application-local.yaml b/cc-admin-master/yudao-server/src/main/resources/application-local.yaml index 20498f5..809e2d7 100644 --- a/cc-admin-master/yudao-server/src/main/resources/application-local.yaml +++ b/cc-admin-master/yudao-server/src/main/resources/application-local.yaml @@ -48,7 +48,7 @@ spring: primary: master datasource: master: - url: jdbc:mysql://192.168.0.180:3306/hand_alarm?useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true&nullCatalogMeansCurrent=true&rewriteBatchedStatements=true # MySQL Connector/J 8.X 连接的示例 + url: jdbc:mysql://192.168.0.180:3306/hand_alarm_dev?useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true&nullCatalogMeansCurrent=true&rewriteBatchedStatements=true # MySQL Connector/J 8.X 连接的示例 username: root password: Gsking164411 driver-class-name: com.mysql.cj.jdbc.Driver # MySQL Connector/J 8.X 连接的示例