18 changed files with 279 additions and 150 deletions
@ -0,0 +1,82 @@ |
|||||
|
package cn.iocoder.yudao.module.mqtt.processor; |
||||
|
|
||||
|
import cn.iocoder.yudao.framework.common.util.json.JsonUtils; |
||||
|
import cn.iocoder.yudao.module.hand.service.HandDetectorService; |
||||
|
import cn.iocoder.yudao.module.hand.vo.AlarmDispatchEvent; |
||||
|
import cn.iocoder.yudao.module.mqtt.mqtt.Client; |
||||
|
import com.fasterxml.jackson.core.JsonProcessingException; |
||||
|
import com.google.common.util.concurrent.RateLimiter; |
||||
|
import jakarta.annotation.Resource; |
||||
|
import lombok.extern.slf4j.Slf4j; |
||||
|
import org.apache.commons.lang3.StringUtils; |
||||
|
import org.springframework.stereotype.Component; |
||||
|
|
||||
|
import java.util.List; |
||||
|
import java.util.Map; |
||||
|
@Slf4j |
||||
|
@Component |
||||
|
public class HandAlarmMessageProcess { |
||||
|
|
||||
|
@Resource |
||||
|
private Client mqttClient; |
||||
|
@Resource |
||||
|
private HandDetectorService handDetectorService; |
||||
|
|
||||
|
private final RateLimiter mqttRateLimiter = RateLimiter.create(3000.0); |
||||
|
|
||||
|
public void processSingle(String jsonValue) { |
||||
|
// 1. 解析 Kafka 消息
|
||||
|
AlarmDispatchEvent event = JsonUtils.parseObject(jsonValue, AlarmDispatchEvent.class); |
||||
|
if (event == null) { |
||||
|
log.warn("[报警推送] 消息解析为空,忽略: {}", jsonValue); |
||||
|
return; |
||||
|
} |
||||
|
|
||||
|
try { |
||||
|
// 2. 【查库】根据部门/租户/源设备SN,查询需要推送的目标设备列表
|
||||
|
List<String> targetSns = handDetectorService.getSnListByDept( |
||||
|
event.getDeptId(), |
||||
|
event.getTenantId(), |
||||
|
event.getSourceSn() |
||||
|
); |
||||
|
|
||||
|
if (targetSns == null || targetSns.isEmpty()) { |
||||
|
log.info("[报警推送] 无需推送,目标列表为空. SourceSN: {}", event.getSourceSn()); |
||||
|
return; |
||||
|
} |
||||
|
// 3. 执行推送逻辑
|
||||
|
this.publishAlarmToMqtt(targetSns, event.getMsgContent()); |
||||
|
|
||||
|
} catch (Exception e) { |
||||
|
log.error("[报警推送] 处理异常 SourceSN: {}", event.getSourceSn(), e); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 执行 MQTT 推送 (包含限流) |
||||
|
*/ |
||||
|
private void publishAlarmToMqtt(List<String> targetSns, String message) { |
||||
|
if (message != null) { |
||||
|
return; |
||||
|
} |
||||
|
// 构造 MQTT 消息体
|
||||
|
Map<String, String> payload = Map.of("message", message); |
||||
|
String jsonPayload = JsonUtils.toJsonString(payload); |
||||
|
|
||||
|
for (String sn : targetSns) { |
||||
|
if (StringUtils.isBlank(sn)) continue; |
||||
|
|
||||
|
|
||||
|
mqttRateLimiter.acquire(); |
||||
|
try { |
||||
|
String topic = sn + "/zds_down"; |
||||
|
// 发送
|
||||
|
mqttClient.publish(topic, jsonPayload); |
||||
|
|
||||
|
} catch (Exception e) { |
||||
|
// 5. 【隔离】单个设备推送失败,不要影响列表里的下一个设备
|
||||
|
log.error("[MQTT推送] 单个发送失败,SN: {}", sn, e); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,17 @@ |
|||||
|
package cn.iocoder.yudao.module.hand.vo; |
||||
|
|
||||
|
import lombok.AllArgsConstructor; |
||||
|
import lombok.Data; |
||||
|
import lombok.NoArgsConstructor; |
||||
|
|
||||
|
@Data |
||||
|
@NoArgsConstructor |
||||
|
@AllArgsConstructor |
||||
|
public class AlarmDispatchEvent { |
||||
|
private Long deptId; |
||||
|
private Long tenantId; |
||||
|
private String sourceSn; |
||||
|
|
||||
|
// 消息内容
|
||||
|
private String msgContent; |
||||
|
} |
||||
Loading…
Reference in new issue