Browse Source

子表无法新增 fix

master
wangwei_123 3 weeks ago
parent
commit
b50acac1d8
  1. 4
      cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/config/BatchProcessorConfig.java
  2. 17
      cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/config/TdengineBatchConfig.java
  3. 10
      cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/BatchDeviceMessageProcessor.java
  4. 12
      cc-admin-master/yudao-module-hand/src/main/resources/mapper/TdengineMapper.xml
  5. 2
      cc-admin-master/yudao-server/src/main/resources/application.yaml

4
cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/config/BatchProcessorConfig.java

@ -24,7 +24,7 @@ public class BatchProcessorConfig {
"HandLogProcessor", "HandLogProcessor",
list -> tdengineService.saveHandLogBatch(list), list -> tdengineService.saveHandLogBatch(list),
50000, 50000,
1000,
2000,
5000 // 内部每 5秒 执行一次 5000 // 内部每 5秒 执行一次
); );
} }
@ -35,7 +35,7 @@ public class BatchProcessorConfig {
"handLogBatchDataProcessor", "handLogBatchDataProcessor",
list -> tdengineService.saveDataLogBatch(list), list -> tdengineService.saveDataLogBatch(list),
50000, 50000,
1000,
2000,
5000 // 内部每 5秒 执行一次 5000 // 内部每 5秒 执行一次
); );
} }

17
cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/config/TdengineBatchConfig.java

@ -51,21 +51,31 @@ public class TdengineBatchConfig<T> {
try { try {
long now = System.currentTimeMillis(); long now = System.currentTimeMillis();
long waitTime = maxWaitTimeMs - (now - lastFlushTime); long waitTime = maxWaitTimeMs - (now - lastFlushTime);
// 保护性逻辑:如果计算出的等待时间<=0,说明已经超时了,设为1ms避免 poll 报错或死等
if (waitTime <= 0) waitTime = 1; if (waitTime <= 0) waitTime = 1;
// 1. 尝试获取数据
T firstItem = dataQueue.poll(waitTime, TimeUnit.MILLISECONDS); T firstItem = dataQueue.poll(waitTime, TimeUnit.MILLISECONDS);
if (firstItem != null) { if (firstItem != null) {
buffer.add(firstItem); buffer.add(firstItem);
// 2. 如果拿到了第一个,尝试把队列里剩下的都捞出来(直到填满 batchSize)
dataQueue.drainTo(buffer, batchSize - buffer.size()); dataQueue.drainTo(buffer, batchSize - buffer.size());
} }
// 3. 判断条件
boolean sizeReached = buffer.size() >= batchSize; boolean sizeReached = buffer.size() >= batchSize;
boolean timeReached = (System.currentTimeMillis() - lastFlushTime) >= maxWaitTimeMs; boolean timeReached = (System.currentTimeMillis() - lastFlushTime) >= maxWaitTimeMs;
if ((sizeReached || timeReached) && !buffer.isEmpty()) {
doFlush(buffer);
buffer.clear();
if (sizeReached || timeReached) {
// 只有当 buffer 有数据时才执行入库
if (!buffer.isEmpty()) {
doFlush(buffer);
buffer.clear();
}
//无论是否执行了入库,只要时间到了(timeReached)或者数量够了(sizeReached)
lastFlushTime = System.currentTimeMillis(); lastFlushTime = System.currentTimeMillis();
} }
@ -77,6 +87,7 @@ public class TdengineBatchConfig<T> {
log.error("[{}] Loop异常", processorName, e); log.error("[{}] Loop异常", processorName, e);
} }
} }
// 停机时刷出剩余数据
if (!buffer.isEmpty()) doFlush(buffer); if (!buffer.isEmpty()) doFlush(buffer);
} }

10
cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/BatchDeviceMessageProcessor.java

@ -184,7 +184,7 @@ public class BatchDeviceMessageProcessor {
processFenceAlarm(handVo, context); processFenceAlarm(handVo, context);
// 9. 保存处理后的数据日志 // 9. 保存处理后的数据日志
context.processedLogs.add(createTdengineDataVo(handVo));
context.processedLogs.add(createTdengineDataVo(handVo,tenantId));
// 10. 记录需要更新到 Redis 的数据 // 10. 记录需要更新到 Redis 的数据
context.redisUpdates.put(sn, handVo); context.redisUpdates.put(sn, handVo);
@ -546,7 +546,7 @@ public class BatchDeviceMessageProcessor {
Integer newLevel = alarmRule.getAlarmLevel(); Integer newLevel = alarmRule.getAlarmLevel();
// 首次报警 // 首次报警
if (!isCurrentlyAlarming) {
if (!isCurrentlyAlarming && newLevel != null) {
handVo.setFirstValue(handVo.getValue()); handVo.setFirstValue(handVo.getValue());
handVo.setTAlarmStart(now); handVo.setTAlarmStart(now);
handVo.setMaxValue(handVo.getValue()); handVo.setMaxValue(handVo.getValue());
@ -642,6 +642,8 @@ public class BatchDeviceMessageProcessor {
newAlarm.setTenantId(handVo.getTenantId()); newAlarm.setTenantId(handVo.getTenantId());
newAlarm.setCreator("system"); newAlarm.setCreator("system");
newAlarm.setCreateTime(now); newAlarm.setCreateTime(now);
newAlarm.setDeptId(handVo.getDeptId());
newAlarm.setAlarmLevel(HandAlarmType.ALARM.getType());
context.gasAlarmsToCreate.add(newAlarm); context.gasAlarmsToCreate.add(newAlarm);
handVo.setBatteryStatus(EnableStatus.ENABLED.value()); handVo.setBatteryStatus(EnableStatus.ENABLED.value());
@ -911,9 +913,9 @@ public class BatchDeviceMessageProcessor {
/** /**
* 创建处理后的日志对象 * 创建处理后的日志对象
*/ */
private TdengineDataVo createTdengineDataVo(HandDataVo handVo) {
private TdengineDataVo createTdengineDataVo(HandDataVo handVo, Long tenantId) {
TdengineDataVo vo = BeanUtils.toBean(handVo, TdengineDataVo.class); TdengineDataVo vo = BeanUtils.toBean(handVo, TdengineDataVo.class);
vo.setTenantId(handVo.getTenantId());
vo.setTenantId(tenantId);
vo.setTs(new Timestamp(System.currentTimeMillis())); vo.setTs(new Timestamp(System.currentTimeMillis()));
return vo; return vo;
} }

12
cc-admin-master/yudao-module-hand/src/main/resources/mapper/TdengineMapper.xml

@ -27,15 +27,15 @@
<insert id="saveDataLogBatch"> <insert id="saveDataLogBatch">
INSERT INTO INSERT INTO
<foreach collection="dataVoList" item="log" separator=" "> <foreach collection="dataVoList" item="log" separator=" ">
<!-- 1. 动态指定子表名 -->
device_data_log_${log.sn}
<!-- 1. 表名加反引号 -->
`device_data_log_${log.sn}`
<!-- 2. 指定超级表 --> <!-- 2. 指定超级表 -->
USING device_data_log USING device_data_log
<!-- 3. 指定标签 (用于自动建表) -->
<!-- 3. 标签 -->
TAGS(#{log.sn}, #{log.tenantId}) TAGS(#{log.sn}, #{log.tenantId})
<!-- 4. 指定列名 -->
(ts, battery, `value`, longitude, latitude, `name`)
<!-- 5. 插入具体值 -->
VALUES VALUES
(#{log.ts}, #{log.battery}, #{log.value}, #{log.longitude}, #{log.latitude}, #{log.name}) (#{log.ts}, #{log.battery}, #{log.value}, #{log.longitude}, #{log.latitude}, #{log.name})
</foreach> </foreach>

2
cc-admin-master/yudao-server/src/main/resources/application.yaml

@ -2,7 +2,7 @@ spring:
application: application:
name: gas_mobile name: gas_mobile
profiles: profiles:
active: local
active: prod
main: main:
allow-circular-references: true # 允许循环依赖,因为项目是三层架构,无法避免这个情况。 allow-circular-references: true # 允许循环依赖,因为项目是三层架构,无法避免这个情况。

Loading…
Cancel
Save