|
|
@ -54,6 +54,7 @@ public class BatchDeviceMessageProcessor { |
|
|
private FenceAlarmService fenceAlarmService; |
|
|
private FenceAlarmService fenceAlarmService; |
|
|
@Resource |
|
|
@Resource |
|
|
private KafkaTemplate<String, String> kafkaTemplate; |
|
|
private KafkaTemplate<String, String> kafkaTemplate; |
|
|
|
|
|
private List<ConsumerRecord<String, String>> records; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
/** |
|
|
@ -93,6 +94,7 @@ public class BatchDeviceMessageProcessor { |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
private BatchContext prepareBatchContext(List<ConsumerRecord<String, String>> records) { |
|
|
private BatchContext prepareBatchContext(List<ConsumerRecord<String, String>> records) { |
|
|
|
|
|
this.records = records; |
|
|
BatchContext context = new BatchContext(); |
|
|
BatchContext context = new BatchContext(); |
|
|
|
|
|
|
|
|
// 1. 提取所有有效的 SNs
|
|
|
// 1. 提取所有有效的 SNs
|
|
|
@ -109,6 +111,76 @@ public class BatchDeviceMessageProcessor { |
|
|
// 2. 批量获取租户信息
|
|
|
// 2. 批量获取租户信息
|
|
|
context.snToTenantMap = getTenantIdsInBatch(sns); |
|
|
context.snToTenantMap = getTenantIdsInBatch(sns); |
|
|
|
|
|
|
|
|
|
|
|
// ======================= 新增:处理缺失租户/未注册设备的 SN =======================
|
|
|
|
|
|
List<String> missingSns = sns.stream() |
|
|
|
|
|
.filter(sn -> !context.snToTenantMap.containsKey(sn)) |
|
|
|
|
|
.toList(); |
|
|
|
|
|
if (CollectionUtils.isNotEmpty(missingSns)) { |
|
|
|
|
|
// 2.1 查库判断设备是否真的不存在(可能只是 Redis 中丢失了映射)
|
|
|
|
|
|
QueryWrapper<HandDetectorDO> query = new QueryWrapper<>(); |
|
|
|
|
|
query.in("sn", missingSns); |
|
|
|
|
|
List<HandDetectorDO> dbDetectors = handDetectorService.listAll(query); |
|
|
|
|
|
|
|
|
|
|
|
Set<String> existingDbSns = new HashSet<>(); |
|
|
|
|
|
for (HandDetectorDO dbDetector : dbDetectors) { |
|
|
|
|
|
existingDbSns.add(dbDetector.getSn()); |
|
|
|
|
|
if (dbDetector.getTenantId() != null) { |
|
|
|
|
|
context.snToTenantMap.put(dbDetector.getSn(), Long.valueOf(dbDetector.getTenantId())); |
|
|
|
|
|
// 顺便回填 Redis,防止下次继续穿透查库
|
|
|
|
|
|
try { |
|
|
|
|
|
redisUtil.hset(RedisKeyUtil.getDeviceTenantMappingKey(), dbDetector.getSn(), dbDetector.getTenantId()); |
|
|
|
|
|
} catch (Exception e) { |
|
|
|
|
|
log.warn("[准备数据] 回填 Redis 租户映射失败,SN: {}", dbDetector.getSn(), e); |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// 2.2 数据库中确实没有的,直接自动注册,默认租户为 1
|
|
|
|
|
|
List<String> trulyNewSns = missingSns.stream() |
|
|
|
|
|
.filter(sn -> !existingDbSns.contains(sn)) |
|
|
|
|
|
.toList(); |
|
|
|
|
|
|
|
|
|
|
|
if (CollectionUtils.isNotEmpty(trulyNewSns)) { |
|
|
|
|
|
List<HandDetectorDO> newDetectors = new ArrayList<>(); |
|
|
|
|
|
LocalDateTime now = LocalDateTime.now(); |
|
|
|
|
|
for (String sn : trulyNewSns) { |
|
|
|
|
|
HandDetectorDO newDetector = new HandDetectorDO(); |
|
|
|
|
|
newDetector.setSn(sn); |
|
|
|
|
|
newDetector.setTenantId(1); // 默认租户ID设为 1
|
|
|
|
|
|
newDetector.setName(sn); // 默认使用 SN 作为名称
|
|
|
|
|
|
newDetector.setEnableStatus(EnableStatus.ENABLED.value()); // 默认启用
|
|
|
|
|
|
newDetector.setCreator("system"); |
|
|
|
|
|
newDetector.setCreateTime(now); |
|
|
|
|
|
newDetector.setGasTypeId(1L); |
|
|
|
|
|
newDetector.setGasChemical("CO"); |
|
|
|
|
|
newDetectors.add(newDetector); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
try { |
|
|
|
|
|
// 保存到数据库。注意: 这里使用 save 单条保存,如果项目中存在 saveBatch(newDetectors) 则推荐替换成批量保存以提升性能。
|
|
|
|
|
|
|
|
|
|
|
|
handDetectorService.saveList(newDetectors); |
|
|
|
|
|
log.info("[自动注册] 自动保存未知设备 {} 个,默认租户ID为 1", newDetectors.size()); |
|
|
|
|
|
|
|
|
|
|
|
// 将新注册的设备信息回填进上下文并更新 Redis,这样后续步骤就能流畅处理了
|
|
|
|
|
|
for (HandDetectorDO nd : newDetectors) { |
|
|
|
|
|
context.snToTenantMap.put(nd.getSn(), 1L); |
|
|
|
|
|
try { |
|
|
|
|
|
redisUtil.hset(RedisKeyUtil.getDeviceTenantMappingKey(), nd.getSn(), "1"); |
|
|
|
|
|
} catch (Exception e) { |
|
|
|
|
|
log.warn("[自动注册] 回填 Redis 租户映射失败,SN: {}", nd.getSn(), e); |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
} catch (Exception e) { |
|
|
|
|
|
log.error("[自动注册] 自动注册新设备流程发生异常", e); |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
// ======================= 新增逻辑结束 =======================
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// 3. 按租户分组 SN
|
|
|
// 3. 按租户分组 SN
|
|
|
Map<Long, List<String>> tenantToSnsMap = sns.stream() |
|
|
Map<Long, List<String>> tenantToSnsMap = sns.stream() |
|
|
.filter(context.snToTenantMap::containsKey) |
|
|
.filter(context.snToTenantMap::containsKey) |
|
|
@ -875,7 +947,6 @@ public class BatchDeviceMessageProcessor { |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
/** |
|
|
* 批量更新 Redis |
|
|
* 批量更新 Redis |
|
|
*/ |
|
|
*/ |
|
|
|