diff --git a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/config/BatchProcessorConfig.java b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/config/BatchProcessorConfig.java index 0de167a..b064b49 100644 --- a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/config/BatchProcessorConfig.java +++ b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/config/BatchProcessorConfig.java @@ -39,17 +39,4 @@ public class BatchProcessorConfig { 5000 // 内部每 5秒 执行一次 ); } - - @Bean - public TdengineBatchConfig messageLogBatchProcessor() { - return new TdengineBatchConfig<>( - "messageLogBatchProcessor", - list -> tdengineService.createAlarmRecord(list), - 50000, - 1000, - 30000 // 内部每 30秒 执行一次 - ); - - - } } diff --git a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/config/TdengineBatchConfig.java b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/config/TdengineBatchConfig.java index 25400aa..07b2cf1 100644 --- a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/config/TdengineBatchConfig.java +++ b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/config/TdengineBatchConfig.java @@ -3,6 +3,7 @@ package cn.iocoder.yudao.module.mqtt.config; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; import lombok.extern.slf4j.Slf4j; +import org.springframework.context.annotation.Configuration; import java.util.ArrayList; import java.util.Collection; diff --git a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/BatchDeviceMessageProcessor.java b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/BatchDeviceMessageProcessor.java index f4970ae..f5d30f2 100644 --- a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/BatchDeviceMessageProcessor.java +++ b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/BatchDeviceMessageProcessor.java @@ -25,6 +25,7 @@ import org.springframework.core.task.TaskExecutor; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; +import java.math.BigInteger; import java.sql.Timestamp; import java.time.LocalDateTime; import java.util.*; @@ -42,8 +43,6 @@ public class BatchDeviceMessageProcessor { @Resource private TdengineBatchConfig tdengineBatchConfig; @Resource - private TdengineBatchConfig alarmProcessor; - @Resource private HandDetectorService handDetectorService; @Resource private HandAlarmService handAlarmService; @@ -207,11 +206,6 @@ public class BatchDeviceMessageProcessor { log.debug("[批量持久化] 处理日志: {} 条", context.processedLogs.size()); } - // 3. 批量保存报警消息日志 - if (!context.alarmMessageLogs.isEmpty()) { - alarmProcessor.addToBatch(context.alarmMessageLogs); - log.debug("[批量持久化] 报警消息: {} 条", context.alarmMessageLogs.size()); - } // 4. 批量创建气体报警 if (!context.gasAlarmsToCreate.isEmpty()) { // 2. 批量保存 @@ -450,6 +444,15 @@ public class BatchDeviceMessageProcessor { String battery = json.getStr("battery"); String user = json.getStr("user"); + + Integer boot = json.getInt("boot"); + String sig = json.getStr("sig"); + Long mem = json.getLong("mem"); + + + device.setBoot(boot); + device.setSig(sig); + device.setMem(mem); device.setUserName(user); device.setValue(gasValue); device.setSn(sn); @@ -851,25 +854,16 @@ public class BatchDeviceMessageProcessor { String msgContent = String.format("%s%s,%s气体浓度为%s", handVo.getUserName(), statusText, gasName, valueStr); - // 记录报警消息日志 - AlarmMessageLog alarmMessageLog = new AlarmMessageLog(); - alarmMessageLog.setDetectorId(handVo.getId()); - alarmMessageLog.setHolderName(handVo.getUserName()); - alarmMessageLog.setSn(handVo.getSn()); - alarmMessageLog.setDeptId(handVo.getDeptId()); - alarmMessageLog.setTenantId(handVo.getTenantId()); - alarmMessageLog.setMessage(msgContent); - alarmMessageLog.setRemark("系统自动触发报警推送"); - try { AlarmDispatchEvent event = new AlarmDispatchEvent( + handVo.getId(), handVo.getDeptId(), handVo.getTenantId(), handVo.getSn(), + handVo.getUserName(), msgContent ); - // 使用 sourceSn 作为 Key,保证同一个部门/设备的报警排队有序处理 String key = handVo.getSn(); String json = JsonUtils.toJsonString(event); @@ -878,7 +872,6 @@ public class BatchDeviceMessageProcessor { log.error("[报警推送] 准备推送数据失败", e); } - context.alarmMessageLogs.add(alarmMessageLog); } @@ -942,7 +935,6 @@ public class BatchDeviceMessageProcessor { // 待保存的日志 List originalLogs = new ArrayList<>(); List processedLogs = new ArrayList<>(); - List alarmMessageLogs = new ArrayList<>(); // 待保存的报警 List gasAlarmsToCreate = new ArrayList<>(); diff --git a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/DeviceMessageProcessor.java b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/DeviceMessageProcessor.java index e36e3e2..d14d2e5 100644 --- a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/DeviceMessageProcessor.java +++ b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/DeviceMessageProcessor.java @@ -31,7 +31,7 @@ import java.util.stream.Collectors; @Slf4j @Component -public class DeviceMessageProcessor { +public class DeviceMessageProcessor { private static final ObjectMapper objectMapper = new ObjectMapper(); @Resource @@ -42,16 +42,12 @@ public class DeviceMessageProcessor { private TdengineBatchConfig tdengineBatchConfig; @Resource - private TdengineBatchConfig alarmProcessor; - @Resource private HandDetectorService handDetectorService; @Resource private HandAlarmService handAlarmService; @Resource private AlarmRuleService alarmRuleService; @Resource - private RedissonClient redissonClient; - @Resource private FenceService fenceService; @Resource private FenceAlarmService fenceAlarmService; @@ -554,18 +550,6 @@ public class DeviceMessageProcessor { } } }); - //报警推送消息 - AlarmMessageLog alarmMessageLog = new AlarmMessageLog(); - alarmMessageLog.setDetectorId(redisData.getId()); - alarmMessageLog.setHolderName(redisData.getName()); - alarmMessageLog.setSn(redisData.getSn()); - alarmMessageLog.setDeptId(redisData.getDeptId()); - alarmMessageLog.setTenantId(redisData.getTenantId()); - alarmMessageLog.setMessage(msgContent); - alarmMessageLog.setPushSnList(StringUtils.join(listAll, ",")); - alarmMessageLog.setMessage(msgContent); - alarmMessageLog.setRemark("系统自动触发报警推送"); - alarmProcessor.addToBatch(alarmMessageLog); } /** diff --git a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/HandAlarmMessageProcess.java b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/HandAlarmMessageProcess.java index bedcb72..720daa0 100644 --- a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/HandAlarmMessageProcess.java +++ b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/HandAlarmMessageProcess.java @@ -2,7 +2,9 @@ package cn.iocoder.yudao.module.mqtt.processor; import cn.iocoder.yudao.framework.common.util.json.JsonUtils; import cn.iocoder.yudao.module.hand.service.HandDetectorService; +import cn.iocoder.yudao.module.hand.service.TdengineService; import cn.iocoder.yudao.module.hand.vo.AlarmDispatchEvent; +import cn.iocoder.yudao.module.hand.vo.AlarmMessageLog; import cn.iocoder.yudao.module.mqtt.mqtt.Client; import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.util.concurrent.RateLimiter; @@ -11,18 +13,22 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.stereotype.Component; +import java.util.ArrayList; +import java.util.Date; import java.util.List; import java.util.Map; + @Slf4j @Component public class HandAlarmMessageProcess { + private final RateLimiter mqttRateLimiter = RateLimiter.create(3000.0); @Resource private Client mqttClient; @Resource private HandDetectorService handDetectorService; - - private final RateLimiter mqttRateLimiter = RateLimiter.create(3000.0); + @Resource + private TdengineService tdengineService; public void processSingle(String jsonValue) { // 1. 解析 Kafka 消息 @@ -47,6 +53,21 @@ public class HandAlarmMessageProcess { // 3. 执行推送逻辑 this.publishAlarmToMqtt(targetSns, event.getMsgContent()); + // 记录报警消息日志 + AlarmMessageLog alarmMessageLog = new AlarmMessageLog(); + alarmMessageLog.setDetectorId(event.getId()); + alarmMessageLog.setHolderName(event.getUserName()); + alarmMessageLog.setSn(event.getSourceSn()); + alarmMessageLog.setDeptId(event.getDeptId()); + alarmMessageLog.setTenantId(event.getTenantId()); + alarmMessageLog.setMessage(event.getMsgContent()); + alarmMessageLog.setRemark("系统自动触发报警推送"); + alarmMessageLog.setPushSnList(StringUtils.join(targetSns, ",")); + + ArrayList objects = new ArrayList<>(); + objects.add(alarmMessageLog); + tdengineService.createAlarmRecord(objects); + } catch (Exception e) { log.error("[报警推送] 处理异常 SourceSN: {}", event.getSourceSn(), e); } @@ -58,7 +79,7 @@ public class HandAlarmMessageProcess { private void publishAlarmToMqtt(List targetSns, String message) { if (message == null) { return; - } + } // 构造 MQTT 消息体 Map payload = Map.of("message", message); String jsonPayload = JsonUtils.toJsonString(payload); diff --git a/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/controller/admin/HandDetectorController.java b/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/controller/admin/HandDetectorController.java index f9d4714..5275b2a 100644 --- a/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/controller/admin/HandDetectorController.java +++ b/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/controller/admin/HandDetectorController.java @@ -1,6 +1,7 @@ package cn.iocoder.yudao.module.hand.controller.admin; import cn.iocoder.yudao.framework.security.core.LoginUser; +import cn.iocoder.yudao.framework.tenant.core.context.TenantContextHolder; import cn.iocoder.yudao.module.hand.dal.HandDetectorDO; import cn.iocoder.yudao.module.hand.service.HandDetectorService; import cn.iocoder.yudao.module.hand.util.RedisKeyUtil; @@ -139,7 +140,8 @@ public class HandDetectorController { if (null == loginUser) { throw exception(HAND_DETECTOR_REDIS_NOT_EXISTS); } - String tenantDeviceHashKey = RedisKeyUtil.getTenantDeviceHashKey(loginUser.getTenantId()); + Long tenantId = TenantContextHolder.getTenantId(); + String tenantDeviceHashKey = RedisKeyUtil.getTenantDeviceHashKey(tenantId); Map handData = handDetectorService.getHandData(tenantDeviceHashKey); return CommonResult.success(handData); } diff --git a/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/impl/HandDetectorServiceImpl.java b/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/impl/HandDetectorServiceImpl.java index 1d1164d..8479829 100644 --- a/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/impl/HandDetectorServiceImpl.java +++ b/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/impl/HandDetectorServiceImpl.java @@ -267,21 +267,17 @@ public class HandDetectorServiceImpl implements HandDetectorService { @Transactional(rollbackFor = Exception.class) @TenantIgnore public void dataMigrate(HandDetectorSaveReqVO migrateReqVO) { - // 1. --- 输入校验 --- if (CollectionUtils.isEmpty(migrateReqVO.getIdList()) || migrateReqVO.getTenantId() == null) { - // 如果没有提供设备ID或目标租户ID,则直接返回或抛出异常 return; } - // 2. --- 数据准备 --- // 一次性查询出所有待迁移的设备实体 List devicesToMigrate = handDetectorMapper.selectByIds(migrateReqVO.getIdList()); if (CollectionUtils.isEmpty(devicesToMigrate)) { - // 如果根据ID没有查到任何设备,直接返回 return; } - // 获取旧的租户ID(假设一批迁移的设备都来自同一个租户) + // 获取旧的租户ID Integer oldTenantId = devicesToMigrate.get(0).getTenantId(); // 新的租户ID Integer newTenantId = migrateReqVO.getTenantId(); @@ -296,7 +292,6 @@ public class HandDetectorServiceImpl implements HandDetectorService { .map(HandDetectorDO::getSn) .toList(); - // 3. --- 数据库更新 --- // 遍历设备列表,更新租户ID并清空特定于旧租户的关联数据 for (HandDetectorDO device : devicesToMigrate) { device.setTenantId(newTenantId); diff --git a/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/impl/TdengineServiceImpl.java b/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/impl/TdengineServiceImpl.java index ce3ca3f..d4605fe 100644 --- a/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/impl/TdengineServiceImpl.java +++ b/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/impl/TdengineServiceImpl.java @@ -4,6 +4,7 @@ import cn.hutool.core.collection.CollUtil; import cn.hutool.core.util.StrUtil; import cn.iocoder.yudao.framework.common.pojo.PageResult; import cn.iocoder.yudao.framework.security.core.LoginUser; +import cn.iocoder.yudao.framework.tenant.core.context.TenantContextHolder; import cn.iocoder.yudao.module.hand.vo.AlarmMessageLog; import cn.iocoder.yudao.module.hand.dal.HandDetectorDO; import cn.iocoder.yudao.module.hand.mapper.TdengineMapper; @@ -77,10 +78,8 @@ public class TdengineServiceImpl implements TdengineService { @Override public PageResult getHandDataLog(HandTdenginePageVO pageReqVO) { IPage page = new Page<>(pageReqVO.getPageNo(), pageReqVO.getPageSize()); - LoginUser loginUser = getLoginUser(); - if (loginUser != null){ - pageReqVO.setTenantId(loginUser.getTenantId()); - } + Long tenantId = TenantContextHolder.getTenantId(); + pageReqVO.setTenantId(tenantId); IPage resultPage = tdengineMapper.selectPage(page, pageReqVO); List doList = resultPage.getRecords(); @@ -96,10 +95,8 @@ public class TdengineServiceImpl implements TdengineService { public PageResult getOriginalLogPage(HandTdenginePageVO pageReqVO) { IPage page = new Page<>(pageReqVO.getPageNo(), pageReqVO.getPageSize()); - LoginUser loginUser = getLoginUser(); - if (loginUser != null){ - pageReqVO.setTenantId(loginUser.getTenantId()); - } + Long tenantId = TenantContextHolder.getTenantId(); + pageReqVO.setTenantId(tenantId); IPage resultPage = tdengineMapper.selectOriginalPage(page, pageReqVO); List doList = resultPage.getRecords(); @@ -113,10 +110,8 @@ public class TdengineServiceImpl implements TdengineService { @Override public List HistoricalSn(HandTdenginePo po) { - LoginUser loginUser = getLoginUser(); - if (loginUser != null){ - po.setTenantId(loginUser.getTenantId()); - } + Long tenantId = TenantContextHolder.getTenantId(); + po.setTenantId(tenantId); return tdengineMapper.HistoricalSn(po); } @@ -134,10 +129,8 @@ public class TdengineServiceImpl implements TdengineService { IPage page = new Page<>(pageReqVO.getPageNo(), pageReqVO.getPageSize()); - LoginUser loginUser = getLoginUser(); - if (loginUser != null){ - pageReqVO.setTenantId(loginUser.getTenantId()); - } + Long tenantId = TenantContextHolder.getTenantId(); + pageReqVO.setTenantId(tenantId); IPage resultPage = tdengineMapper.selectMessagePage(page, pageReqVO); List doList = resultPage.getRecords(); diff --git a/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/vo/AlarmDispatchEvent.java b/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/vo/AlarmDispatchEvent.java index 3cd7f7b..bd62abc 100644 --- a/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/vo/AlarmDispatchEvent.java +++ b/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/vo/AlarmDispatchEvent.java @@ -8,9 +8,11 @@ import lombok.NoArgsConstructor; @NoArgsConstructor @AllArgsConstructor public class AlarmDispatchEvent { + private Long id; //设备id private Long deptId; private Long tenantId; private String sourceSn; + private String userName; // 消息内容 private String msgContent; diff --git a/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/vo/HandDataVo.java b/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/vo/HandDataVo.java index 338f109..cf077ce 100644 --- a/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/vo/HandDataVo.java +++ b/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/vo/HandDataVo.java @@ -118,4 +118,13 @@ public class HandDataVo { @Schema(description = "最后推送等级") private Double lastPushValue; + + @Schema(description = "开机时间(秒)") + private Integer boot; + + @Schema(description = "信号强度") + private String sig; + + @Schema(description = "内存") + private Long mem; } diff --git a/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/vo/TdengineDataVo.java b/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/vo/TdengineDataVo.java index 0a6269b..2ade741 100644 --- a/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/vo/TdengineDataVo.java +++ b/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/vo/TdengineDataVo.java @@ -34,6 +34,16 @@ public class TdengineDataVo { @Schema(description = "持有人姓名") private String name; + @Schema(description = "开机时间(秒)") + private Integer boot; + + @Schema(description = "信号强度") + private String sig; + + @Schema(description = "内存") + private Long mem; + + @Schema(description = "租户id") private Long tenantId; diff --git a/cc-admin-master/yudao-module-hand/src/main/resources/mapper/TdengineMapper.xml b/cc-admin-master/yudao-module-hand/src/main/resources/mapper/TdengineMapper.xml index e5a4735..c572c3e 100644 --- a/cc-admin-master/yudao-module-hand/src/main/resources/mapper/TdengineMapper.xml +++ b/cc-admin-master/yudao-module-hand/src/main/resources/mapper/TdengineMapper.xml @@ -37,14 +37,16 @@ TAGS(#{log.sn}, #{log.tenantId}) + (ts,battery,value,longitude,latitude,name,boot_seconds,signal_str,mem_bytes) + VALUES - (#{log.ts}, #{log.battery}, #{log.value}, #{log.longitude}, #{log.latitude}, #{log.name}) + (#{log.ts}, #{log.battery}, #{log.value}, #{log.longitude}, #{log.latitude}, #{log.name}, #{log.boot}, #{log.sig},#{log.mem})