From 92f88ad4cdfab9ade03d163979d551fa34083781 Mon Sep 17 00:00:00 2001
From: wangwei_123 <1255324804@qq.com>
Date: Fri, 26 Dec 2025 17:10:23 +0800
Subject: [PATCH 1/2] =?UTF-8?q?=E6=89=8B=E6=8C=81=E8=A1=A8=E6=89=B9?=
=?UTF-8?q?=E5=A4=84=E7=90=86?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../module/mqtt/processor/BatchDeviceMessageProcessor.java | 10 ----------
.../src/main/resources/mapper/TdengineMapper.xml | 2 +-
.../yudao-server/src/main/resources/application.yaml | 2 +-
3 files changed, 2 insertions(+), 12 deletions(-)
diff --git a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/BatchDeviceMessageProcessor.java b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/BatchDeviceMessageProcessor.java
index 7750784..11fcaff 100644
--- a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/BatchDeviceMessageProcessor.java
+++ b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/BatchDeviceMessageProcessor.java
@@ -31,20 +31,10 @@ import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
-/**
- * 批量设备消息处理器
- *
- * 核心优化:
- * 1. 批量获取基础数据(租户信息、设备信息、报警规则)
- * 2. 内存中完成所有业务逻辑计算
- * 3. 批量执行所有数据库写操作
- */
@Slf4j
@Component
public class BatchDeviceMessageProcessor {
- private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
@Resource
private RedisUtil redisUtil;
@Resource
diff --git a/cc-admin-master/yudao-module-hand/src/main/resources/mapper/TdengineMapper.xml b/cc-admin-master/yudao-module-hand/src/main/resources/mapper/TdengineMapper.xml
index e85e79c..1642688 100644
--- a/cc-admin-master/yudao-module-hand/src/main/resources/mapper/TdengineMapper.xml
+++ b/cc-admin-master/yudao-module-hand/src/main/resources/mapper/TdengineMapper.xml
@@ -168,7 +168,7 @@
AND ts <= #{vo.endTime}
- AND dept_id; = #{vo.deptId}
+ AND dept_id = #{vo.deptId}
diff --git a/cc-admin-master/yudao-server/src/main/resources/application.yaml b/cc-admin-master/yudao-server/src/main/resources/application.yaml
index be33b8a..628b13f 100644
--- a/cc-admin-master/yudao-server/src/main/resources/application.yaml
+++ b/cc-admin-master/yudao-server/src/main/resources/application.yaml
@@ -2,7 +2,7 @@ spring:
application:
name: gas_mobile
profiles:
- active: prod
+ active: local
main:
allow-circular-references: true # 允许循环依赖,因为项目是三层架构,无法避免这个情况。
From b50acac1d84b3bd75c76afd75a74d7298dec8d04 Mon Sep 17 00:00:00 2001
From: wangwei_123 <1255324804@qq.com>
Date: Tue, 13 Jan 2026 14:26:24 +0800
Subject: [PATCH 2/2] =?UTF-8?q?=E5=AD=90=E8=A1=A8=E6=97=A0=E6=B3=95?=
=?UTF-8?q?=E6=96=B0=E5=A2=9E=20fix?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../yudao/module/mqtt/config/BatchProcessorConfig.java | 4 ++--
.../yudao/module/mqtt/config/TdengineBatchConfig.java | 17 ++++++++++++++---
.../mqtt/processor/BatchDeviceMessageProcessor.java | 10 ++++++----
.../src/main/resources/mapper/TdengineMapper.xml | 12 ++++++------
.../yudao-server/src/main/resources/application.yaml | 2 +-
5 files changed, 29 insertions(+), 16 deletions(-)
diff --git a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/config/BatchProcessorConfig.java b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/config/BatchProcessorConfig.java
index 7cba2c9..ae414cd 100644
--- a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/config/BatchProcessorConfig.java
+++ b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/config/BatchProcessorConfig.java
@@ -24,7 +24,7 @@ public class BatchProcessorConfig {
"HandLogProcessor",
list -> tdengineService.saveHandLogBatch(list),
50000,
- 1000,
+ 2000,
5000 // 内部每 5秒 执行一次
);
}
@@ -35,7 +35,7 @@ public class BatchProcessorConfig {
"handLogBatchDataProcessor",
list -> tdengineService.saveDataLogBatch(list),
50000,
- 1000,
+ 2000,
5000 // 内部每 5秒 执行一次
);
}
diff --git a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/config/TdengineBatchConfig.java b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/config/TdengineBatchConfig.java
index 0a9bc43..25400aa 100644
--- a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/config/TdengineBatchConfig.java
+++ b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/config/TdengineBatchConfig.java
@@ -51,21 +51,31 @@ public class TdengineBatchConfig {
try {
long now = System.currentTimeMillis();
long waitTime = maxWaitTimeMs - (now - lastFlushTime);
+
+ // 保护性逻辑:如果计算出的等待时间<=0,说明已经超时了,设为1ms避免 poll 报错或死等
if (waitTime <= 0) waitTime = 1;
+ // 1. 尝试获取数据
T firstItem = dataQueue.poll(waitTime, TimeUnit.MILLISECONDS);
if (firstItem != null) {
buffer.add(firstItem);
+ // 2. 如果拿到了第一个,尝试把队列里剩下的都捞出来(直到填满 batchSize)
dataQueue.drainTo(buffer, batchSize - buffer.size());
}
+ // 3. 判断条件
boolean sizeReached = buffer.size() >= batchSize;
boolean timeReached = (System.currentTimeMillis() - lastFlushTime) >= maxWaitTimeMs;
- if ((sizeReached || timeReached) && !buffer.isEmpty()) {
- doFlush(buffer);
- buffer.clear();
+ if (sizeReached || timeReached) {
+ // 只有当 buffer 有数据时才执行入库
+ if (!buffer.isEmpty()) {
+ doFlush(buffer);
+ buffer.clear();
+ }
+
+ //无论是否执行了入库,只要时间到了(timeReached)或者数量够了(sizeReached)
lastFlushTime = System.currentTimeMillis();
}
@@ -77,6 +87,7 @@ public class TdengineBatchConfig {
log.error("[{}] Loop异常", processorName, e);
}
}
+ // 停机时刷出剩余数据
if (!buffer.isEmpty()) doFlush(buffer);
}
diff --git a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/BatchDeviceMessageProcessor.java b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/BatchDeviceMessageProcessor.java
index 11fcaff..069ebb9 100644
--- a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/BatchDeviceMessageProcessor.java
+++ b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/BatchDeviceMessageProcessor.java
@@ -184,7 +184,7 @@ public class BatchDeviceMessageProcessor {
processFenceAlarm(handVo, context);
// 9. 保存处理后的数据日志
- context.processedLogs.add(createTdengineDataVo(handVo));
+ context.processedLogs.add(createTdengineDataVo(handVo,tenantId));
// 10. 记录需要更新到 Redis 的数据
context.redisUpdates.put(sn, handVo);
@@ -546,7 +546,7 @@ public class BatchDeviceMessageProcessor {
Integer newLevel = alarmRule.getAlarmLevel();
// 首次报警
- if (!isCurrentlyAlarming) {
+ if (!isCurrentlyAlarming && newLevel != null) {
handVo.setFirstValue(handVo.getValue());
handVo.setTAlarmStart(now);
handVo.setMaxValue(handVo.getValue());
@@ -642,6 +642,8 @@ public class BatchDeviceMessageProcessor {
newAlarm.setTenantId(handVo.getTenantId());
newAlarm.setCreator("system");
newAlarm.setCreateTime(now);
+ newAlarm.setDeptId(handVo.getDeptId());
+ newAlarm.setAlarmLevel(HandAlarmType.ALARM.getType());
context.gasAlarmsToCreate.add(newAlarm);
handVo.setBatteryStatus(EnableStatus.ENABLED.value());
@@ -911,9 +913,9 @@ public class BatchDeviceMessageProcessor {
/**
* 创建处理后的日志对象
*/
- private TdengineDataVo createTdengineDataVo(HandDataVo handVo) {
+ private TdengineDataVo createTdengineDataVo(HandDataVo handVo, Long tenantId) {
TdengineDataVo vo = BeanUtils.toBean(handVo, TdengineDataVo.class);
- vo.setTenantId(handVo.getTenantId());
+ vo.setTenantId(tenantId);
vo.setTs(new Timestamp(System.currentTimeMillis()));
return vo;
}
diff --git a/cc-admin-master/yudao-module-hand/src/main/resources/mapper/TdengineMapper.xml b/cc-admin-master/yudao-module-hand/src/main/resources/mapper/TdengineMapper.xml
index 1642688..364466d 100644
--- a/cc-admin-master/yudao-module-hand/src/main/resources/mapper/TdengineMapper.xml
+++ b/cc-admin-master/yudao-module-hand/src/main/resources/mapper/TdengineMapper.xml
@@ -27,15 +27,15 @@
INSERT INTO
-
- device_data_log_${log.sn}
+
+ `device_data_log_${log.sn}`
+
USING device_data_log
-
+
+
TAGS(#{log.sn}, #{log.tenantId})
-
- (ts, battery, `value`, longitude, latitude, `name`)
-
+
VALUES
(#{log.ts}, #{log.battery}, #{log.value}, #{log.longitude}, #{log.latitude}, #{log.name})
diff --git a/cc-admin-master/yudao-server/src/main/resources/application.yaml b/cc-admin-master/yudao-server/src/main/resources/application.yaml
index 628b13f..be33b8a 100644
--- a/cc-admin-master/yudao-server/src/main/resources/application.yaml
+++ b/cc-admin-master/yudao-server/src/main/resources/application.yaml
@@ -2,7 +2,7 @@ spring:
application:
name: gas_mobile
profiles:
- active: local
+ active: prod
main:
allow-circular-references: true # 允许循环依赖,因为项目是三层架构,无法避免这个情况。