diff --git a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/mqtt/Client.java b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/mqtt/Client.java index 5d236b7..5c52662 100644 --- a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/mqtt/Client.java +++ b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/mqtt/Client.java @@ -20,23 +20,7 @@ import java.util.concurrent.BlockingQueue; @Slf4j @Component public class Client { - private final OnMessageCallback onMessageCallback; // 1. 注入我们新的 Kafka 生产者回调 - // 【修改点2】定义全局通用的发送回调,避免每次发送都创建对象,减少GC - private final IMqttActionListener globalPublishCallback = new IMqttActionListener() { - @Override - public void onSuccess(IMqttToken asyncActionToken) { - // 高并发下,成功通常不打印日志,否则磁盘IO会爆炸 - // log.debug("发送成功: {}", asyncActionToken.getTopics()); - } - @Override - public void onFailure(IMqttToken asyncActionToken, Throwable exception) { - // 只有失败才打印日志 - log.error("MQTT异步发送失败, Topic: {}", - asyncActionToken.getTopics() != null ? asyncActionToken.getTopics()[0] : "null", - exception); - } - }; // --- 1. 配置注入 (保持不变) --- @Value("${mqtt.enable:false}") private Boolean enable; @@ -63,6 +47,24 @@ public class Client { //使用异步客户端接口 private IMqttAsyncClient mqttClient; + private final OnMessageCallback onMessageCallback; // 1. 注入我们新的 Kafka 生产者回调 + private final IMqttActionListener globalPublishCallback = new IMqttActionListener() { + @Override + public void onSuccess(IMqttToken asyncActionToken) { + + // log.debug("发送成功: {}", asyncActionToken.getTopics()); + } + + @Override + public void onFailure(IMqttToken asyncActionToken, Throwable exception) { + // 只有失败才打印日志 + log.error("MQTT异步发送失败, Topic: {}", + asyncActionToken.getTopics() != null ? asyncActionToken.getTopics()[0] : "null", + exception); + } + }; + + public Client(OnMessageCallback onMessageCallback) { this.onMessageCallback = onMessageCallback; } @@ -170,7 +172,6 @@ public class Client { public void publishAsync(String topic, byte[] payload, int qos, boolean retained) { if (mqttClient == null || !mqttClient.isConnected()) { // 降级:只打印日志,不抛异常,避免中断业务循环 - // 在高并发场景下,这里可以考虑加一个计数器,每N次打印一次日志 log.warn("MQTT未连接,丢弃消息: {}", topic); return; } diff --git a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/mqtt/ThreadPoolConfig.java b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/mqtt/ThreadPoolConfig.java index b13b3ac..7398fb6 100644 --- a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/mqtt/ThreadPoolConfig.java +++ b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/mqtt/ThreadPoolConfig.java @@ -14,11 +14,11 @@ public class ThreadPoolConfig { public TaskExecutor taskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // 核心线程数:CPU核心数 - executor.setCorePoolSize(Runtime.getRuntime().availableProcessors()); + executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 2); // 最大线程数:CPU核心数 * 4 executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 4); // 队列大小 - executor.setQueueCapacity(2000); + executor.setQueueCapacity(50000); // 线程名称前缀 executor.setThreadNamePrefix("mqtt-executor-"); // 拒绝策略:由调用者线程处理 diff --git a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/BatchDeviceMessageProcessor.java b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/BatchDeviceMessageProcessor.java index 8202beb..7750784 100644 --- a/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/BatchDeviceMessageProcessor.java +++ b/cc-admin-master/yudao-module-hand-mqtt/src/main/java/cn/iocoder/yudao/module/mqtt/processor/BatchDeviceMessageProcessor.java @@ -712,7 +712,7 @@ public class BatchDeviceMessageProcessor { } /** - * 处理围栏报警生命周期 + * 处理围栏报警生命周期· */ private void handleFenceAlarmLifecycle(boolean isViolating, HandDataVo handVo, FenceType fenceType, List fenceList, BatchContext context) {