diff --git a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/config/BatchProcessorConfig.java b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/config/BatchProcessorConfig.java index 7cba2c9..ae414cd 100644 --- a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/config/BatchProcessorConfig.java +++ b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/config/BatchProcessorConfig.java @@ -24,7 +24,7 @@ public class BatchProcessorConfig { "HandLogProcessor", list -> tdengineService.saveHandLogBatch(list), 50000, - 1000, + 2000, 5000 // 内部每 5秒 执行一次 ); } @@ -35,7 +35,7 @@ public class BatchProcessorConfig { "handLogBatchDataProcessor", list -> tdengineService.saveDataLogBatch(list), 50000, - 1000, + 2000, 5000 // 内部每 5秒 执行一次 ); } diff --git a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/config/TdengineBatchConfig.java b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/config/TdengineBatchConfig.java index 0a9bc43..25400aa 100644 --- a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/config/TdengineBatchConfig.java +++ b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/config/TdengineBatchConfig.java @@ -51,21 +51,31 @@ public class TdengineBatchConfig { try { long now = System.currentTimeMillis(); long waitTime = maxWaitTimeMs - (now - lastFlushTime); + + // 保护性逻辑:如果计算出的等待时间<=0,说明已经超时了,设为1ms避免 poll 报错或死等 if (waitTime <= 0) waitTime = 1; + // 1. 尝试获取数据 T firstItem = dataQueue.poll(waitTime, TimeUnit.MILLISECONDS); if (firstItem != null) { buffer.add(firstItem); + // 2. 如果拿到了第一个,尝试把队列里剩下的都捞出来(直到填满 batchSize) dataQueue.drainTo(buffer, batchSize - buffer.size()); } + // 3. 判断条件 boolean sizeReached = buffer.size() >= batchSize; boolean timeReached = (System.currentTimeMillis() - lastFlushTime) >= maxWaitTimeMs; - if ((sizeReached || timeReached) && !buffer.isEmpty()) { - doFlush(buffer); - buffer.clear(); + if (sizeReached || timeReached) { + // 只有当 buffer 有数据时才执行入库 + if (!buffer.isEmpty()) { + doFlush(buffer); + buffer.clear(); + } + + //无论是否执行了入库,只要时间到了(timeReached)或者数量够了(sizeReached) lastFlushTime = System.currentTimeMillis(); } @@ -77,6 +87,7 @@ public class TdengineBatchConfig { log.error("[{}] Loop异常", processorName, e); } } + // 停机时刷出剩余数据 if (!buffer.isEmpty()) doFlush(buffer); } diff --git a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/BatchDeviceMessageProcessor.java b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/BatchDeviceMessageProcessor.java index 11fcaff..069ebb9 100644 --- a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/BatchDeviceMessageProcessor.java +++ b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/BatchDeviceMessageProcessor.java @@ -184,7 +184,7 @@ public class BatchDeviceMessageProcessor { processFenceAlarm(handVo, context); // 9. 保存处理后的数据日志 - context.processedLogs.add(createTdengineDataVo(handVo)); + context.processedLogs.add(createTdengineDataVo(handVo,tenantId)); // 10. 记录需要更新到 Redis 的数据 context.redisUpdates.put(sn, handVo); @@ -546,7 +546,7 @@ public class BatchDeviceMessageProcessor { Integer newLevel = alarmRule.getAlarmLevel(); // 首次报警 - if (!isCurrentlyAlarming) { + if (!isCurrentlyAlarming && newLevel != null) { handVo.setFirstValue(handVo.getValue()); handVo.setTAlarmStart(now); handVo.setMaxValue(handVo.getValue()); @@ -642,6 +642,8 @@ public class BatchDeviceMessageProcessor { newAlarm.setTenantId(handVo.getTenantId()); newAlarm.setCreator("system"); newAlarm.setCreateTime(now); + newAlarm.setDeptId(handVo.getDeptId()); + newAlarm.setAlarmLevel(HandAlarmType.ALARM.getType()); context.gasAlarmsToCreate.add(newAlarm); handVo.setBatteryStatus(EnableStatus.ENABLED.value()); @@ -911,9 +913,9 @@ public class BatchDeviceMessageProcessor { /** * 创建处理后的日志对象 */ - private TdengineDataVo createTdengineDataVo(HandDataVo handVo) { + private TdengineDataVo createTdengineDataVo(HandDataVo handVo, Long tenantId) { TdengineDataVo vo = BeanUtils.toBean(handVo, TdengineDataVo.class); - vo.setTenantId(handVo.getTenantId()); + vo.setTenantId(tenantId); vo.setTs(new Timestamp(System.currentTimeMillis())); return vo; } diff --git a/cc-admin-master/yudao-module-hand/src/main/resources/mapper/TdengineMapper.xml b/cc-admin-master/yudao-module-hand/src/main/resources/mapper/TdengineMapper.xml index 1642688..364466d 100644 --- a/cc-admin-master/yudao-module-hand/src/main/resources/mapper/TdengineMapper.xml +++ b/cc-admin-master/yudao-module-hand/src/main/resources/mapper/TdengineMapper.xml @@ -27,15 +27,15 @@ INSERT INTO - - device_data_log_${log.sn} + + `device_data_log_${log.sn}` + USING device_data_log - + + TAGS(#{log.sn}, #{log.tenantId}) - - (ts, battery, `value`, longitude, latitude, `name`) - + VALUES (#{log.ts}, #{log.battery}, #{log.value}, #{log.longitude}, #{log.latitude}, #{log.name}) diff --git a/cc-admin-master/yudao-server/src/main/resources/application.yaml b/cc-admin-master/yudao-server/src/main/resources/application.yaml index 628b13f..be33b8a 100644 --- a/cc-admin-master/yudao-server/src/main/resources/application.yaml +++ b/cc-admin-master/yudao-server/src/main/resources/application.yaml @@ -2,7 +2,7 @@ spring: application: name: gas_mobile profiles: - active: local + active: prod main: allow-circular-references: true # 允许循环依赖,因为项目是三层架构,无法避免这个情况。