Browse Source

改成英文逗号

master^2
wangwei_123 2 days ago
parent
commit
0a98e1e802
  1. 1
      cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/kafka/KafkaConfig.java
  2. 21
      cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/kafka/KafkaMessageConsumer.java
  3. 1
      cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/mqtt/OnMessageCallback.java
  4. 2
      cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/BatchDeviceMessageProcessor.java
  5. 5
      cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/HandAlarmMessageProcessor.java

1
cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/kafka/KafkaConfig.java

@ -47,7 +47,6 @@ public class KafkaConfig {
return new TopicPartition(record.topic() + ".DLT", 0); return new TopicPartition(record.topic() + ".DLT", 0);
}); });
// 【问题4 FIX】批量场景下重试次数设为 0,失败立即进 DLT
// 避免整批阻塞消费线程,由 DLT 消费者做单条重试 // 避免整批阻塞消费线程,由 DLT 消费者做单条重试
// 如果业务可以接受短暂阻塞,可改为 FixedBackOff(1000L, 1L) 最多重试1次 // 如果业务可以接受短暂阻塞,可改为 FixedBackOff(1000L, 1L) 最多重试1次
DefaultErrorHandler errorHandler = new DefaultErrorHandler( DefaultErrorHandler errorHandler = new DefaultErrorHandler(

21
cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/kafka/KafkaMessageConsumer.java

@ -1,38 +1,27 @@
package cn.iocoder.yudao.module.mqtt.kafka; package cn.iocoder.yudao.module.mqtt.kafka;
import cn.iocoder.yudao.module.mqtt.processor.BatchDeviceMessageProcessor; import cn.iocoder.yudao.module.mqtt.processor.BatchDeviceMessageProcessor;
import cn.iocoder.yudao.module.mqtt.processor.DeviceMessageProcessor;
import cn.iocoder.yudao.module.mqtt.processor.HandAlarmMessageProcess;
import cn.iocoder.yudao.module.mqtt.processor.HandAlarmMessageProcessor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.core.task.TaskExecutor;
import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.util.List; import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@Slf4j @Slf4j
@Service // <--- 添加这个注解! @Service // <--- 添加这个注解!
public class KafkaMessageConsumer { public class KafkaMessageConsumer {
private final HandAlarmMessageProcess handAlarmMessageProcess;
private final HandAlarmMessageProcessor handAlarmMessageProcessor;
private final BatchDeviceMessageProcessor batchDeviceMessageProcessor; private final BatchDeviceMessageProcessor batchDeviceMessageProcessor;
public KafkaMessageConsumer(HandAlarmMessageProcess handAlarmMessageProcess,
public KafkaMessageConsumer(HandAlarmMessageProcessor handAlarmMessageProcessor,
BatchDeviceMessageProcessor batchDeviceMessageProcessor) { BatchDeviceMessageProcessor batchDeviceMessageProcessor) {
this.handAlarmMessageProcess = handAlarmMessageProcess;
this.handAlarmMessageProcessor = handAlarmMessageProcessor;
this.batchDeviceMessageProcessor = batchDeviceMessageProcessor; this.batchDeviceMessageProcessor = batchDeviceMessageProcessor;
} }
@ -67,7 +56,7 @@ public class KafkaMessageConsumer {
// 遍历 List,一条条处理 // 遍历 List,一条条处理
for (ConsumerRecord<String, String> record : records) { for (ConsumerRecord<String, String> record : records) {
try { try {
handAlarmMessageProcess.processSingle(record.value());
handAlarmMessageProcessor.processSingle(record.value());
} catch (Exception e) { } catch (Exception e) {
log.error("单条消息处理失败,已跳过。Key: {}, Value: {}", record.key(), record.value(), e); log.error("单条消息处理失败,已跳过。Key: {}, Value: {}", record.key(), record.value(), e);

1
cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/mqtt/OnMessageCallback.java

@ -60,7 +60,6 @@ public class OnMessageCallback implements MqttCallbackExtended {
if (from == null) return; if (from == null) return;
// 3. 转发 Kafka (异步 IO) // 3. 转发 Kafka (异步 IO)
// 配合 application.yml 的 linger.ms,底层会自动批量发送
String payload = new String(message.getPayload()); String payload = new String(message.getPayload());
kafkaTemplate.send(suffix, sn, payload); kafkaTemplate.send(suffix, sn, payload);

2
cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/BatchDeviceMessageProcessor.java

@ -921,7 +921,7 @@ public class BatchDeviceMessageProcessor {
: String.valueOf(value); : String.valueOf(value);
String statusText = isAlarming ? "报警" : "报警结束"; String statusText = isAlarming ? "报警" : "报警结束";
String msgContent = String.format("%s%s%s气体浓度为%s",
String msgContent = String.format("%s%s,%s气体浓度为%s",
handVo.getUserName(), statusText, gasName, valueStr); handVo.getUserName(), statusText, gasName, valueStr);
try { try {

5
cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/HandAlarmMessageProcess.java → cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/HandAlarmMessageProcessor.java

@ -20,7 +20,7 @@ import java.util.Map;
@Slf4j @Slf4j
@Component @Component
public class HandAlarmMessageProcess {
public class HandAlarmMessageProcessor {
/** /**
* 问题6 FIX限流速率从配置文件读取不再硬编码 * 问题6 FIX限流速率从配置文件读取不再硬编码
@ -35,7 +35,7 @@ public class HandAlarmMessageProcess {
@Resource @Resource
private TdengineService tdengineService; private TdengineService tdengineService;
public HandAlarmMessageProcess(
public HandAlarmMessageProcessor(
@Value("${mqtt.alarm.rate-limit:3000.0}") double rateLimit) { @Value("${mqtt.alarm.rate-limit:3000.0}") double rateLimit) {
this.mqttRateLimiter = RateLimiter.create(rateLimit); this.mqttRateLimiter = RateLimiter.create(rateLimit);
} }
@ -106,7 +106,6 @@ public class HandAlarmMessageProcess {
mqttRateLimiter.acquire(); mqttRateLimiter.acquire();
try { try {
// 【BUG-2 FIX】JSON 构造移至 try 块内,确保异常不会绕过限流语义
String topic = sn + "/zds_down"; String topic = sn + "/zds_down";
String jsonPayload = JsonUtils.toJsonString(Map.of("message", message)); String jsonPayload = JsonUtils.toJsonString(Map.of("message", message));
mqttClient.publish(topic, jsonPayload); mqttClient.publish(topic, jsonPayload);
Loading…
Cancel
Save