diff --git a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/kafka/KafkaConfig.java b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/kafka/KafkaConfig.java index 8e39084..43f4d87 100644 --- a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/kafka/KafkaConfig.java +++ b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/kafka/KafkaConfig.java @@ -38,24 +38,37 @@ public class KafkaConfig { } @Bean("batchFactory") - public KafkaListenerContainerFactory batchFactory(ConsumerFactory consumerFactory, - KafkaTemplate kafkaTemplate) { + public KafkaListenerContainerFactory batchFactory( + ConsumerFactory consumerFactory, + KafkaTemplate kafkaTemplate) { + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory); - // 【核心设置】开启批量监听!没有这就还是单条处理 + // 1. 开启批量监听 factory.setBatchListener(true); - // 【并发设置】根据你的分区数设置,比如你有 3 个分区就设为 3 + // 2. 并发数 (按分区数调整) factory.setConcurrency(6); + + // 3. 批量手动 ACK (配合 enable-auto-commit: false) factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH); factory.getContainerProperties().setPollTimeout(3000); - // 创建一个 DeadLetterPublishingRecoverer,它知道如何将失败消息发送到 DLT + // 4. 错误处理与死信队列 + // 定义死信发送器 DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(kafkaTemplate, - (record, ex) -> new TopicPartition(KafkaTopicType.DEAD_LETTER_TOPIC.getValue(), record.partition())); + (record, ex) -> { + // 策略:原 Topic + ".DLT" (例如 zds_up -> zds_up.DLT) + // 这样比写死 KafkaTopicType.DEAD_LETTER_TOPIC 更灵活,不同业务互不干扰 + return new TopicPartition(record.topic() + ".DLT", record.partition()); + }); + // 定义回退策略:间隔 1秒,重试 2次 + // 注意:Batch 模式下,如果重试失败,整批数据都会进死信 DefaultErrorHandler errorHandler = new DefaultErrorHandler(recoverer, new FixedBackOff(1000L, 2L)); + + // 配置给工厂 factory.setCommonErrorHandler(errorHandler); return factory; diff --git a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/kafka/KafkaMessageConsumer.java b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/kafka/KafkaMessageConsumer.java index da9b533..8633b61 100644 --- a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/kafka/KafkaMessageConsumer.java +++ b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/kafka/KafkaMessageConsumer.java @@ -2,6 +2,7 @@ package cn.iocoder.yudao.module.mqtt.kafka; import cn.iocoder.yudao.module.mqtt.processor.BatchDeviceMessageProcessor; import cn.iocoder.yudao.module.mqtt.processor.DeviceMessageProcessor; +import cn.iocoder.yudao.module.mqtt.processor.HandAlarmMessageProcess; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.beans.factory.annotation.Qualifier; @@ -19,20 +20,19 @@ import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; import java.util.stream.IntStream; -import static cn.iocoder.yudao.module.mqtt.kafka.KafkaTopicType.DEAD_LETTER_TOPIC; @Slf4j @Service // <--- 添加这个注解! public class KafkaMessageConsumer { - private final DeviceMessageProcessor deviceMessageProcessor; + private final HandAlarmMessageProcess handAlarmMessageProcess; private final BatchDeviceMessageProcessor batchDeviceMessageProcessor; - public KafkaMessageConsumer(DeviceMessageProcessor deviceMessageProcessor, + public KafkaMessageConsumer(HandAlarmMessageProcess handAlarmMessageProcess, BatchDeviceMessageProcessor batchDeviceMessageProcessor) { - this.deviceMessageProcessor = deviceMessageProcessor; + this.handAlarmMessageProcess = handAlarmMessageProcess; this.batchDeviceMessageProcessor = batchDeviceMessageProcessor; } @@ -57,6 +57,24 @@ public class KafkaMessageConsumer { throw e; } } + @KafkaListener(topics = "zds_up_alarm", containerFactory = "batchFactory") + public void HandAlarmMessage(List> records) { + if (records.isEmpty()) { + return; + } + log.info(">>> 开始逐条处理 Batch,共 {} 条", records.size()); + + // 遍历 List,一条条处理 + for (ConsumerRecord record : records) { + try { + handAlarmMessageProcess.processSingle(record.value()); + } catch (Exception e) { + log.error("单条消息处理失败,已跳过。Key: {}, Value: {}", record.key(), record.value(), e); + } + } + + log.info("Batch 处理完毕"); + } } diff --git a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/kafka/KafkaTopicType.java b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/kafka/KafkaTopicType.java index f7964f8..0d44f06 100644 --- a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/kafka/KafkaTopicType.java +++ b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/kafka/KafkaTopicType.java @@ -9,34 +9,25 @@ import java.util.stream.Collectors; /** * Kafka 主题枚举类,用于统一管理和引用 Topic 名称 */ -@Getter // Lombok 注解,自动为所有字段生成 public 的 getter 方法,例如 getValue() +@Getter public enum KafkaTopicType { - // 1. 常量名和值的含义保持一致,更清晰 DEVICE_STATUS_UP("zds_up"), - DEAD_LETTER_TOPIC("zds_up_dlt"); + ALARM_TOPIC("zds_up_alarm"); + - // 2. 添加了 final 字段 private final String value; - // 3. 【核心修复】添加了私有构造函数,用于初始化 final 字段 KafkaTopicType(String value) { this.value = value; } - // --- 以下是更高效和健壮的 from 方法实现 --- // 4. 使用静态 Map 缓存,提高查找性能,避免每次调用都遍历 private static final Map LOOKUP_MAP = Arrays.stream(values()) .collect(Collectors.toMap(KafkaTopicType::getValue, Function.identity())); - /** - * 根据字符串值查找对应的枚举常量. - * - * @param value topic 字符串值 - * @return 对应的枚举常量 - * @throws IllegalArgumentException 如果找不到匹配的常量 - */ + public static KafkaTopicType from(String value) { return LOOKUP_MAP.get(value); diff --git a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/mqtt/OnMessageCallback.java b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/mqtt/OnMessageCallback.java index 448de26..5bea96b 100644 --- a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/mqtt/OnMessageCallback.java +++ b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/mqtt/OnMessageCallback.java @@ -9,6 +9,7 @@ import org.eclipse.paho.client.mqttv3.MqttCallbackExtended; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.ApplicationEventPublisher; +import org.springframework.core.task.TaskExecutor; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; @@ -19,12 +20,14 @@ import java.util.regex.Pattern; @Slf4j @Component public class OnMessageCallback implements MqttCallbackExtended { - private final Pattern topicPattern = Pattern.compile("([^/]+)/?(.+)?"); @Resource private KafkaTemplate kafkaTemplate; @Resource private ApplicationEventPublisher eventPublisher; + @Resource(name = "mqttExecutor") + private TaskExecutor kafkaForwardExecutor; + @Override public void connectComplete(boolean reconnect, String serverURI) { log.info("MQTT 连接成功。是否为重连: {}, 服务器 URI: {}", reconnect, serverURI); @@ -38,29 +41,34 @@ public class OnMessageCallback implements MqttCallbackExtended { } @Override - public void messageArrived(String MqttTopic, MqttMessage message) { - String payload = new String(message.getPayload()); - List groups = ReUtil.getAllGroups(topicPattern, MqttTopic, false); + public void messageArrived(String mqttTopic, MqttMessage message) { - if (groups.get(1) == null) { - log.warn("无法从topic {} 中获取消息发送者ID", MqttTopic); - return; - } - String sn = groups.get(0); - String topic = groups.get(1); - KafkaTopicType from = KafkaTopicType.from(groups.get(1)); - if (null == from) { - log.warn("发送的topic无效{}", topic); - return; - } + // 【关键】使用注入的线程池异步处理 + kafkaForwardExecutor.execute(() -> processAndSend(mqttTopic, message)); + + } + private void processAndSend(String mqttTopic, MqttMessage message) { try { - kafkaTemplate.send(topic, sn, payload); - log.debug("成功将消息转发到 Kafka Topic [{}]: payload=[{}]", topic, payload); + int slashIndex = mqttTopic.indexOf('/'); + if (slashIndex == -1) return; + + String sn = mqttTopic.substring(0, slashIndex); + String suffix = mqttTopic.substring(slashIndex + 1); + + // 2. 校验 Topic + KafkaTopicType from = KafkaTopicType.from(suffix); + if (from == null) return; + + // 3. 转发 Kafka (异步 IO) + // 配合 application.yml 的 linger.ms,底层会自动批量发送 + String payload = new String(message.getPayload()); + kafkaTemplate.send(suffix, sn, payload); + } catch (Exception e) { - log.error("转发消息到 Kafka Topic [{}] 时发生错误!", topic, e); + // 生产环境建议降低日志级别或采样打印,防止日志把磁盘打满 + log.error("转发异常 Topic: {}", mqttTopic, e); } } - @Override public void deliveryComplete(IMqttDeliveryToken token) { // no-op diff --git a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/mqtt/ThreadPoolConfig.java b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/mqtt/ThreadPoolConfig.java index 99936b1..b13b3ac 100644 --- a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/mqtt/ThreadPoolConfig.java +++ b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/mqtt/ThreadPoolConfig.java @@ -15,8 +15,8 @@ public class ThreadPoolConfig { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // 核心线程数:CPU核心数 executor.setCorePoolSize(Runtime.getRuntime().availableProcessors()); - // 最大线程数:CPU核心数 * 2 - executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 2); + // 最大线程数:CPU核心数 * 4 + executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 4); // 队列大小 executor.setQueueCapacity(2000); // 线程名称前缀 @@ -31,8 +31,10 @@ public class ThreadPoolConfig { @Bean("mqttAlarmExecutor") public TaskExecutor mqttAlarmExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); - executor.setCorePoolSize(5); - executor.setMaxPoolSize(10); + // 核心线程数:CPU核心数 + executor.setCorePoolSize(Runtime.getRuntime().availableProcessors()); + // 最大线程数:CPU核心数 * 2 + executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 2); executor.setQueueCapacity(10000); // 队列大一点,应对瞬间并发 executor.setThreadNamePrefix("alarm-push-"); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); diff --git a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/BatchDeviceMessageProcessor.java b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/BatchDeviceMessageProcessor.java index 705f883..8202beb 100644 --- a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/BatchDeviceMessageProcessor.java +++ b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/BatchDeviceMessageProcessor.java @@ -2,6 +2,7 @@ package cn.iocoder.yudao.module.mqtt.processor; import cn.hutool.json.JSONObject; import cn.hutool.json.JSONUtil; +import cn.iocoder.yudao.framework.common.util.json.JsonUtils; import cn.iocoder.yudao.framework.common.util.object.BeanUtils; import cn.iocoder.yudao.module.hand.dal.*; import cn.iocoder.yudao.module.hand.enums.*; @@ -9,6 +10,7 @@ import cn.iocoder.yudao.module.hand.service.*; import cn.iocoder.yudao.module.hand.util.*; import cn.iocoder.yudao.module.hand.vo.*; import cn.iocoder.yudao.module.mqtt.config.TdengineBatchConfig; +import cn.iocoder.yudao.module.mqtt.kafka.KafkaTopicType; import cn.iocoder.yudao.module.mqtt.mqtt.Client; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.toolkit.CollectionUtils; @@ -20,6 +22,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.core.task.TaskExecutor; +import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; import java.sql.Timestamp; @@ -61,11 +64,7 @@ public class BatchDeviceMessageProcessor { @Resource private FenceAlarmService fenceAlarmService; @Resource - private Client client; - @Resource(name = "mqttAlarmExecutor") - private TaskExecutor alarmExecutor; - - private final RateLimiter mqttRateLimiter = RateLimiter.create(50.0); + private KafkaTemplate kafkaTemplate; /** @@ -445,10 +444,17 @@ public class BatchDeviceMessageProcessor { */ private HandDataVo parseAndConvertData(String sn, String payload, HandDataVo device) { try { + // 1. 【防御性编程】处理双重序列化问题 + // 如果 payload 首尾有引号,说明是 "{\"a\":1}" 格式,需要先解开一层 + if (payload.startsWith("\"") && payload.endsWith("\"")) { + // 去掉首尾引号,并把转义字符 \" 变成 " + payload = payload.substring(1, payload.length() - 1).replace("\\\"", "\""); + } + JSONObject json = JSONUtil.parseObj(payload); - Double gasValue = json.getDouble("gas", null); - String loc = json.getStr("loc"); + Double gasValue = json.getDouble("gas"); + String loc = json.getStr("loc"); // Hutool 会把数组转成字符串 "[115, 39, 0]" String battery = json.getStr("battery"); device.setValue(gasValue); @@ -459,21 +465,29 @@ public class BatchDeviceMessageProcessor { // 解析位置信息 if (StringUtils.isNotBlank(loc)) { - String coords = loc.substring(1, loc.length() - 1); + // 去掉首尾的中括号 + String coords = loc.replace("[", "").replace("]", ""); String[] parts = coords.split(","); - if (parts.length == 3) { + if (parts.length >= 2) { + // 2. 【关键修正】使用 trim() 去除空格 + // Double.parseDouble 实际上可以处理空格,但 trim 一下更安全 Map converted = CoordinateTransferUtils.wgs84ToGcj02( - Double.parseDouble(parts[0]), - Double.parseDouble(parts[1]) + Double.parseDouble(parts[0].trim()), + Double.parseDouble(parts[1].trim()) ); device.setLongitude(converted.get("lon")); device.setLatitude(converted.get("lat")); - device.setGpsType(Integer.parseInt(parts[2])); + + if (parts.length >= 3) { + // 3. 【关键修正】Integer.parseInt 必须 trim(),否则 " 0" 会报错 + device.setGpsType(Integer.parseInt(parts[2].trim())); + } } } } catch (Exception e) { + // 打印完整的 payload 以便排查 log.error("[数据转换] 解析失败,SN: {}, payload: {}", sn, payload, e); } @@ -838,63 +852,37 @@ public class BatchDeviceMessageProcessor { handVo.getName(), statusText, gasName, valueStr); // 记录报警消息日志 - AlarmMessageLog log = new AlarmMessageLog(); - log.setDetectorId(handVo.getId()); - log.setHolderName(handVo.getName()); - log.setSn(handVo.getSn()); - log.setDeptId(handVo.getDeptId()); - log.setTenantId(handVo.getTenantId()); - log.setMessage(msgContent); - log.setRemark("系统自动触发报警推送"); + AlarmMessageLog alarmMessageLog = new AlarmMessageLog(); + alarmMessageLog.setDetectorId(handVo.getId()); + alarmMessageLog.setHolderName(handVo.getName()); + alarmMessageLog.setSn(handVo.getSn()); + alarmMessageLog.setDeptId(handVo.getDeptId()); + alarmMessageLog.setTenantId(handVo.getTenantId()); + alarmMessageLog.setMessage(msgContent); + alarmMessageLog.setRemark("系统自动触发报警推送"); try { - List targetSns = handDetectorService.getSnListByDept( + AlarmDispatchEvent event = new AlarmDispatchEvent( handVo.getDeptId(), handVo.getTenantId(), - handVo.getSn() - ); + handVo.getSn(), + msgContent + ); - if (targetSns != null && !targetSns.isEmpty()) { - log.setPushSnList(String.join(",", targetSns)); - // 异步推送 MQTT 消息 - publishAlarmToMqtt(targetSns, msgContent); - } + // 异步发送 Kafka,耗时极短 (<1ms) + // 使用 sourceSn 作为 Key,保证同一个部门/设备的报警排队有序处理 + String key = handVo.getSn(); + String json = JsonUtils.toJsonString(event); + kafkaTemplate.send(KafkaTopicType.ALARM_TOPIC.getValue(), key, json); } catch (Exception e) { - this.log.error("[报警推送] 准备推送数据失败", e); + log.error("[报警推送] 准备推送数据失败", e); } - context.alarmMessageLogs.add(log); + context.alarmMessageLogs.add(alarmMessageLog); } - /** - * 异步推送 MQTT 报警消息 - */ - private void publishAlarmToMqtt(List targetSns, String message) { - alarmExecutor.execute(() -> { - Map payload = Map.of("message", message); - try { - String json = OBJECT_MAPPER.writeValueAsString(payload); - - for (String sn : targetSns) { - if (StringUtils.isBlank(sn)) continue; - - // 限流 - mqttRateLimiter.acquire(); - - try { - String topic = sn + "/zds_down"; - client.publish(topic, json); - } catch (Exception e) { - log.error("[MQTT推送] 失败,SN: {}", sn, e); - } - } - } catch (JsonProcessingException e) { - log.error("[MQTT推送] JSON序列化失败", e); - } - }); - } /** * 批量更新 Redis diff --git a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/HandAlarmMessageProcess.java b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/HandAlarmMessageProcess.java new file mode 100644 index 0000000..6fea5b9 --- /dev/null +++ b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/HandAlarmMessageProcess.java @@ -0,0 +1,82 @@ +package cn.iocoder.yudao.module.mqtt.processor; + +import cn.iocoder.yudao.framework.common.util.json.JsonUtils; +import cn.iocoder.yudao.module.hand.service.HandDetectorService; +import cn.iocoder.yudao.module.hand.vo.AlarmDispatchEvent; +import cn.iocoder.yudao.module.mqtt.mqtt.Client; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.google.common.util.concurrent.RateLimiter; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.springframework.stereotype.Component; + +import java.util.List; +import java.util.Map; +@Slf4j +@Component +public class HandAlarmMessageProcess { + + @Resource + private Client mqttClient; + @Resource + private HandDetectorService handDetectorService; + + private final RateLimiter mqttRateLimiter = RateLimiter.create(3000.0); + + public void processSingle(String jsonValue) { + // 1. 解析 Kafka 消息 + AlarmDispatchEvent event = JsonUtils.parseObject(jsonValue, AlarmDispatchEvent.class); + if (event == null) { + log.warn("[报警推送] 消息解析为空,忽略: {}", jsonValue); + return; + } + + try { + // 2. 【查库】根据部门/租户/源设备SN,查询需要推送的目标设备列表 + List targetSns = handDetectorService.getSnListByDept( + event.getDeptId(), + event.getTenantId(), + event.getSourceSn() + ); + + if (targetSns == null || targetSns.isEmpty()) { + log.info("[报警推送] 无需推送,目标列表为空. SourceSN: {}", event.getSourceSn()); + return; + } + // 3. 执行推送逻辑 + this.publishAlarmToMqtt(targetSns, event.getMsgContent()); + + } catch (Exception e) { + log.error("[报警推送] 处理异常 SourceSN: {}", event.getSourceSn(), e); + } + } + + /** + * 执行 MQTT 推送 (包含限流) + */ + private void publishAlarmToMqtt(List targetSns, String message) { + if (message != null) { + return; + } + // 构造 MQTT 消息体 + Map payload = Map.of("message", message); + String jsonPayload = JsonUtils.toJsonString(payload); + + for (String sn : targetSns) { + if (StringUtils.isBlank(sn)) continue; + + + mqttRateLimiter.acquire(); + try { + String topic = sn + "/zds_down"; + // 发送 + mqttClient.publish(topic, jsonPayload); + + } catch (Exception e) { + // 5. 【隔离】单个设备推送失败,不要影响列表里的下一个设备 + log.error("[MQTT推送] 单个发送失败,SN: {}", sn, e); + } + } + } +} diff --git a/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/dal/FenceAlarmDO.java b/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/dal/FenceAlarmDO.java index 599502b..0d4a2ce 100644 --- a/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/dal/FenceAlarmDO.java +++ b/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/dal/FenceAlarmDO.java @@ -84,7 +84,7 @@ public class FenceAlarmDO extends BaseDO { /** * 租户id */ - private Integer tenantId; + private Long tenantId; /** * 部门id diff --git a/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/FenceAlarmService.java b/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/FenceAlarmService.java index eb17e72..de483d9 100644 --- a/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/FenceAlarmService.java +++ b/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/FenceAlarmService.java @@ -64,7 +64,7 @@ public interface FenceAlarmService { void update( FenceAlarmDO updateReqVO); - void batchCreateFenceAlarm(List fenceAlarmsToCreate); + void batchCreateFenceAlarm(List fenceAlarmsToCreate); void batchUpdateById(List fenceAlarmsToUpdate); diff --git a/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/HandAlarmService.java b/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/HandAlarmService.java index e8f77b5..8afdcc8 100644 --- a/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/HandAlarmService.java +++ b/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/HandAlarmService.java @@ -64,7 +64,7 @@ public interface HandAlarmService { void insertBatch(List doList); - void batchCreateHandAlarm(List gasAlarmsToCreate); + void batchCreateHandAlarm(List gasAlarmsToCreate); void batchUpdateById(List gasAlarmsToUpdate); } \ No newline at end of file diff --git a/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/impl/FenceAlarmServiceImpl.java b/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/impl/FenceAlarmServiceImpl.java index 4a35d5d..79e2c07 100644 --- a/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/impl/FenceAlarmServiceImpl.java +++ b/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/impl/FenceAlarmServiceImpl.java @@ -94,10 +94,9 @@ public class FenceAlarmServiceImpl implements FenceAlarmService { } @Override - public void batchCreateFenceAlarm(List fenceAlarmsToCreate) { + public void batchCreateFenceAlarm(List fenceAlarmsToCreate) { - List fenceAlarmDOs = BeanUtils.toBean(fenceAlarmsToCreate, FenceAlarmDO.class); - fenceAlarmMapper.insertBatch(fenceAlarmDOs); + fenceAlarmMapper.insertBatch(fenceAlarmsToCreate); } @Override diff --git a/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/impl/HandAlarmServiceImpl.java b/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/impl/HandAlarmServiceImpl.java index 9ab539d..e2c24f9 100644 --- a/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/impl/HandAlarmServiceImpl.java +++ b/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/service/impl/HandAlarmServiceImpl.java @@ -99,10 +99,9 @@ public class HandAlarmServiceImpl implements HandAlarmService { @Override @Transactional(rollbackFor = Exception.class) @TenantIgnore - public void batchCreateHandAlarm(List gasAlarmsToCreate) { + public void batchCreateHandAlarm(List gasAlarmsToCreate) { - List bean = BeanUtils.toBean(gasAlarmsToCreate, HandAlarmDO.class); - handAlarmMapper.insertBatch(bean); + handAlarmMapper.insertBatch(gasAlarmsToCreate); } @Override diff --git a/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/vo/AlarmDispatchEvent.java b/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/vo/AlarmDispatchEvent.java new file mode 100644 index 0000000..3cd7f7b --- /dev/null +++ b/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/vo/AlarmDispatchEvent.java @@ -0,0 +1,17 @@ +package cn.iocoder.yudao.module.hand.vo; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@NoArgsConstructor +@AllArgsConstructor +public class AlarmDispatchEvent { + private Long deptId; + private Long tenantId; + private String sourceSn; + + // 消息内容 + private String msgContent; +} diff --git a/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/vo/HandDataVo.java b/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/vo/HandDataVo.java index 91a7c6b..31c6506 100644 --- a/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/vo/HandDataVo.java +++ b/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/vo/HandDataVo.java @@ -112,4 +112,5 @@ public class HandDataVo { @Schema(description = "最后推送数据") private Double lastPushValue; + } diff --git a/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/vo/HandTdenginePageVO.java b/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/vo/HandTdenginePageVO.java index 36a5812..7b0eefd 100644 --- a/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/vo/HandTdenginePageVO.java +++ b/cc-admin-master/yudao-module-hand/src/main/java/cn/iocoder/yudao/module/hand/vo/HandTdenginePageVO.java @@ -27,5 +27,6 @@ public class HandTdenginePageVO extends PageParam { private Long tenantId; + private Long deptId; } diff --git a/cc-admin-master/yudao-module-hand/src/main/resources/mapper/TdengineMapper.xml b/cc-admin-master/yudao-module-hand/src/main/resources/mapper/TdengineMapper.xml index 6169ef9..e85e79c 100644 --- a/cc-admin-master/yudao-module-hand/src/main/resources/mapper/TdengineMapper.xml +++ b/cc-admin-master/yudao-module-hand/src/main/resources/mapper/TdengineMapper.xml @@ -162,14 +162,17 @@ AND tenant_id = #{vo.tenantId} - AND ts >= #{po.startTime} + AND ts >= #{vo.startTime} - AND ts <= #{po.endTime} + AND ts <= #{vo.endTime} + + + AND dept_id; = #{vo.deptId} - AND holder_name LIKE #{holderNamePattern} + AND holder_name LIKE #{vo.holderNamePattern} diff --git a/cc-admin-master/yudao-server/src/main/resources/application.yaml b/cc-admin-master/yudao-server/src/main/resources/application.yaml index ed0c593..be33b8a 100644 --- a/cc-admin-master/yudao-server/src/main/resources/application.yaml +++ b/cc-admin-master/yudao-server/src/main/resources/application.yaml @@ -125,25 +125,31 @@ rocketmq: group: ${spring.application.name}_PRODUCER # 生产者分组 spring: - # Kafka 配置项,对应 KafkaProperties 配置类 kafka: - # Kafka Producer 配置项 producer: - acks: 1 # 0-不应答。1-leader 应答。all-所有 leader 和 follower 应答。 - retries: 3 # 发送失败时,重试发送的次数 - value-serializer: org.springframework.kafka.support.serializer.JsonSerializer # 消息的 value 的序列化 - # Kafka Consumer 配置项 - consumer: - auto-offset-reset: earliest # 设置消费者分组最初的消费进度为 earliest 。可参考博客 https://blog.csdn.net/lishuangzhe7047/article/details/74530417 理解 - value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer - max-poll-records: 300 - fetch-max-wait: 3000 # 等待拉取消息的最大时间(毫秒) + acks: 1 + retries: 3 + batch-size: 16384 + key-serializer: org.apache.kafka.common.serialization.StringSerializer + value-serializer: org.apache.kafka.common.serialization.StringSerializer properties: - spring.json.trusted.packages: '*' - group-id: consumer-${spring.application.name} # 消费者分组 - # Kafka Consumer Listener 监听器配置 + linger.ms: 10 + buffer.memory: 33554432 + consumer: + enable-auto-commit: false + auto-offset-reset: earliest + max-poll-records: 1000 + fetch-max-wait: 3000 + + # 【修正3】改为 StringDeserializer + key-deserializer: org.apache.kafka.common.serialization.StringDeserializer + value-deserializer: org.apache.kafka.common.serialization.StringDeserializer + + group-id: consumer-${spring.application.name} + listener: - missing-topics-fatal: false # 消费监听接口监听的主题不存在时,默认会报错。所以通过设置为 false ,解决报错 + missing-topics-fatal: false + --- #################### 芋道相关配置 #################### yudao: