xh 1 week ago
parent
commit
f5d0920ac3
  1. 4
      cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/config/BatchProcessorConfig.java
  2. 13
      cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/config/TdengineBatchConfig.java
  3. 20
      cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/BatchDeviceMessageProcessor.java
  4. 14
      cc-admin-master/yudao-module-hand/src/main/resources/mapper/TdengineMapper.xml

4
cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/config/BatchProcessorConfig.java

@ -24,7 +24,7 @@ public class BatchProcessorConfig {
"HandLogProcessor",
list -> tdengineService.saveHandLogBatch(list),
50000,
1000,
2000,
5000 // 内部每 5秒 执行一次
);
}
@ -35,7 +35,7 @@ public class BatchProcessorConfig {
"handLogBatchDataProcessor",
list -> tdengineService.saveDataLogBatch(list),
50000,
1000,
2000,
5000 // 内部每 5秒 执行一次
);
}

13
cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/config/TdengineBatchConfig.java

@ -51,21 +51,31 @@ public class TdengineBatchConfig<T> {
try {
long now = System.currentTimeMillis();
long waitTime = maxWaitTimeMs - (now - lastFlushTime);
// 保护性逻辑:如果计算出的等待时间<=0,说明已经超时了,设为1ms避免 poll 报错或死等
if (waitTime <= 0) waitTime = 1;
// 1. 尝试获取数据
T firstItem = dataQueue.poll(waitTime, TimeUnit.MILLISECONDS);
if (firstItem != null) {
buffer.add(firstItem);
// 2. 如果拿到了第一个,尝试把队列里剩下的都捞出来(直到填满 batchSize)
dataQueue.drainTo(buffer, batchSize - buffer.size());
}
// 3. 判断条件
boolean sizeReached = buffer.size() >= batchSize;
boolean timeReached = (System.currentTimeMillis() - lastFlushTime) >= maxWaitTimeMs;
if ((sizeReached || timeReached) && !buffer.isEmpty()) {
if (sizeReached || timeReached) {
// 只有当 buffer 有数据时才执行入库
if (!buffer.isEmpty()) {
doFlush(buffer);
buffer.clear();
}
//无论是否执行了入库,只要时间到了(timeReached)或者数量够了(sizeReached)
lastFlushTime = System.currentTimeMillis();
}
@ -77,6 +87,7 @@ public class TdengineBatchConfig<T> {
log.error("[{}] Loop异常", processorName, e);
}
}
// 停机时刷出剩余数据
if (!buffer.isEmpty()) doFlush(buffer);
}

20
cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/BatchDeviceMessageProcessor.java

@ -31,20 +31,10 @@ import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
* 批量设备消息处理器
* <p>
* 核心优化
* 1. 批量获取基础数据租户信息设备信息报警规则
* 2. 内存中完成所有业务逻辑计算
* 3. 批量执行所有数据库写操作
*/
@Slf4j
@Component
public class BatchDeviceMessageProcessor {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@Resource
private RedisUtil redisUtil;
@Resource
@ -194,7 +184,7 @@ public class BatchDeviceMessageProcessor {
processFenceAlarm(handVo, context);
// 9. 保存处理后的数据日志
context.processedLogs.add(createTdengineDataVo(handVo));
context.processedLogs.add(createTdengineDataVo(handVo,tenantId));
// 10. 记录需要更新到 Redis 的数据
context.redisUpdates.put(sn, handVo);
@ -556,7 +546,7 @@ public class BatchDeviceMessageProcessor {
Integer newLevel = alarmRule.getAlarmLevel();
// 首次报警
if (!isCurrentlyAlarming) {
if (!isCurrentlyAlarming && newLevel != null) {
handVo.setFirstValue(handVo.getValue());
handVo.setTAlarmStart(now);
handVo.setMaxValue(handVo.getValue());
@ -652,6 +642,8 @@ public class BatchDeviceMessageProcessor {
newAlarm.setTenantId(handVo.getTenantId());
newAlarm.setCreator("system");
newAlarm.setCreateTime(now);
newAlarm.setDeptId(handVo.getDeptId());
newAlarm.setAlarmLevel(HandAlarmType.ALARM.getType());
context.gasAlarmsToCreate.add(newAlarm);
handVo.setBatteryStatus(EnableStatus.ENABLED.value());
@ -921,9 +913,9 @@ public class BatchDeviceMessageProcessor {
/**
* 创建处理后的日志对象
*/
private TdengineDataVo createTdengineDataVo(HandDataVo handVo) {
private TdengineDataVo createTdengineDataVo(HandDataVo handVo, Long tenantId) {
TdengineDataVo vo = BeanUtils.toBean(handVo, TdengineDataVo.class);
vo.setTenantId(handVo.getTenantId());
vo.setTenantId(tenantId);
vo.setTs(new Timestamp(System.currentTimeMillis()));
return vo;
}

14
cc-admin-master/yudao-module-hand/src/main/resources/mapper/TdengineMapper.xml

@ -27,15 +27,15 @@
<insert id="saveDataLogBatch">
INSERT INTO
<foreach collection="dataVoList" item="log" separator=" ">
<!-- 1. 动态指定子表名 -->
device_data_log_${log.sn}
<!-- 1. 表名加反引号 -->
`device_data_log_${log.sn}`
<!-- 2. 指定超级表 -->
USING device_data_log
<!-- 3. 指定标签 (用于自动建表) -->
<!-- 3. 标签 -->
TAGS(#{log.sn}, #{log.tenantId})
<!-- 4. 指定列名 -->
(ts, battery, `value`, longitude, latitude, `name`)
<!-- 5. 插入具体值 -->
VALUES
(#{log.ts}, #{log.battery}, #{log.value}, #{log.longitude}, #{log.latitude}, #{log.name})
</foreach>
@ -168,7 +168,7 @@
AND ts &lt;= #{vo.endTime}
</if>
<if test="vo.deptId != null">
AND dept_id; = #{vo.deptId}
AND dept_id = #{vo.deptId}
</if>
<if test="vo.holderName != null and vo.holderName != ''">
<bind name="holderNamePattern" value="'%' + vo.holderName + '%'" />

Loading…
Cancel
Save