@ -9,13 +9,20 @@ import cn.iocoder.yudao.module.hand.service.*;
import cn.iocoder.yudao.module.hand.util.* ;
import cn.iocoder.yudao.module.hand.vo.* ;
import cn.iocoder.yudao.module.mqtt.config.TdengineBatchConfig ;
import cn.iocoder.yudao.module.mqtt.mqtt.Client ;
import com.fasterxml.jackson.core.JsonProcessingException ;
import com.fasterxml.jackson.databind.ObjectMapper ;
import jakarta.annotation.Resource ;
import lombok.extern.slf4j.Slf4j ;
import org.apache.commons.lang3.StringUtils ;
import org.eclipse.paho.client.mqttv3.IMqttClient ;
import org.eclipse.paho.client.mqttv3.MqttMessage ;
import org.redisson.api.RLock ;
import org.redisson.api.RedissonClient ;
import org.springframework.core.task.TaskExecutor ;
import org.springframework.stereotype.Component ;
import java.nio.charset.StandardCharsets ;
import java.sql.Timestamp ;
import java.time.LocalDateTime ;
import java.util.* ;
@ -25,6 +32,7 @@ import java.util.concurrent.TimeUnit;
@Component
public class DeviceMessageProcessor {
private static final ObjectMapper objectMapper = new ObjectMapper ( ) ;
@Resource
private RedisUtil redisUtil ;
@Resource
@ -33,21 +41,22 @@ public class DeviceMessageProcessor {
private TdengineBatchConfig < TdengineDataVo > tdengineBatchConfig ;
@Resource
private HandDetectorService handDetectorService ;
@Resource
private HandAlarmService handAlarmService ;
@Resource
private AlarmRuleService alarmRuleService ;
@Resource
private RedissonClient redissonClient ;
@Resource
private FenceService fenceService ;
@Resource
private FenceAlarmService fenceAlarmService ;
@Resource
private Client client ;
@Resource ( name = "mqttAlarmExecutor" )
private TaskExecutor alarmExecutor ;
public void process ( String topic , String payload ) {
log . debug ( "[设备上报] 开始处理 -> 主题: {}, 内容: {}" , topic , payload ) ;
@ -92,6 +101,7 @@ public class DeviceMessageProcessor {
log . info ( "未启用的手持探测器 sn: {}" , topic ) ;
return ;
}
// 数据解析与转换
HandDataVo handVo = dataConvert ( topic , payload , detector ) ;
@ -272,7 +282,7 @@ public class DeviceMessageProcessor {
private void fanceAlarm ( HandDataVo handVo ) {
if ( StringUtils . isBlank ( handVo . getFenceIds ( ) ) ) {
handVo . setFenceStatus ( HandAlarmType . NORMAL . getType ( ) ) ;
log . info ( "当前设备未绑定围栏,sn{}" , handVo . getSn ( ) ) ;
log . error ( "当前设备未绑定围栏,sn{}" , handVo . getSn ( ) ) ;
return ;
}
List < Long > list = Arrays . stream ( handVo . getFenceIds ( ) . split ( "," ) )
@ -404,6 +414,9 @@ public class DeviceMessageProcessor {
private AlarmRuleDO getAlarmRule ( HandDataVo handVo , Map < Long , List < AlarmRuleDO > > ruleMap ) {
if ( null = = handVo . getValue ( ) ) {
return null ;
}
double gasValue = handVo . getValue ( ) ;
Long gasTypeId = handVo . getGasTypeId ( ) ;
@ -445,6 +458,9 @@ public class DeviceMessageProcessor {
return redisData ;
}
String sn = redisData . getSn ( ) ;
String gasName = alarmRule . getGasTypeName ( ) ;
LocalDateTime now = LocalDateTime . now ( ) ;
//离线报警结束
if ( OnlineStatusType . ONLINE . getType ( ) . equals ( redisData . getOnlineStatus ( ) )
@ -452,6 +468,7 @@ public class DeviceMessageProcessor {
HandAlarmDO handAlarmDO = new HandAlarmDO ( ) ;
handAlarmDO . setId ( redisData . getAlarmId ( ) ) ;
handAlarmDO . setTAlarmEnd ( now ) ;
handAlarmDO . setStatus ( EnableStatus . HANDLE . value ( ) ) ;
handAlarmService . updateById ( handAlarmDO ) ;
//删除离线报警
redisData . setAlarmId ( null ) ;
@ -465,6 +482,12 @@ public class DeviceMessageProcessor {
redisData . setAlarmLevel ( 0 ) ;
redisData . setTAlarmEnd ( now ) ;
redisData . setGasStatus ( HandAlarmType . NORMAL . getType ( ) ) ;
// 【新增逻辑】推送报警结束
sendGroupMessage ( redisData , gasName , redisData . getValue ( ) , false ) ;
// 清除上一次推送的数值记录,防止下次报警数值一样时不推送(如果需要)
redisData . setLastPushValue ( null ) ;
return redisData ;
}
@ -498,11 +521,60 @@ public class DeviceMessageProcessor {
// 更新最大值
redisData . setMaxValue ( redisData . getValue ( ) ) ;
}
if ( redisData . getLastPushValue ( ) = = null | | ! redisData . getLastPushValue ( ) . equals ( redisData . getValue ( ) ) ) {
sendGroupMessage ( redisData , gasName , redisData . getValue ( ) , true ) ;
// 更新最后推送的值
redisData . setLastPushValue ( redisData . getValue ( ) ) ;
}
}
redisData . setGasStatus ( HandAlarmType . ALARM . getType ( ) ) ;
return redisData ;
}
/ * *
* 辅助方法 : 发送同组人消息
*
* @param gasName 气体名称
* @param value 当前浓度值
* @param isAlarming true = 报警中 , false = 报警结束
* /
private void sendGroupMessage ( HandDataVo redisData , String gasName , Double value , boolean isAlarming ) {
if ( null = = redisData . getDeptId ( ) ) return ;
// 1. 准备数据(在主线程做,只做一次)
String valueStr = ( value ! = null & & value % 1 = = 0 ) ? String . valueOf ( value . intValue ( ) ) : String . valueOf ( value ) ;
String statusText = isAlarming ? "报警" : "报警结束" ;
String msgContent = String . format ( "%s%s,%s气体浓度为%s" , redisData . getName ( ) , statusText , gasName , valueStr ) ;
Map < String , String > map = new HashMap < > ( ) ;
map . put ( "message" , msgContent ) ;
final String payload ; // 声明为 final 供内部类使用
try {
payload = objectMapper . writeValueAsString ( map ) ;
} catch ( JsonProcessingException e ) {
log . error ( "JSON转换失败" , e ) ;
return ;
}
// 2. 查人(查同组所有设备的SN)
List < HandDetectorDO > listAll = handDetectorService . getListAll ( redisData . getDeptId ( ) , redisData . getTenantId ( ) ) ;
if ( listAll = = null | | listAll . isEmpty ( ) ) return ;
alarmExecutor . execute ( ( ) - > {
for ( HandDetectorDO device : listAll ) {
String deviceSn = device . getSn ( ) ;
if ( deviceSn = = null | | deviceSn . isEmpty ( ) ) continue ;
String topic = deviceSn + "/zds_down" ;
try {
client . publish ( topic , payload ) ;
} catch ( Exception e ) {
log . error ( "异步报警推送失败, SN: {}" , deviceSn , e ) ;
}
}
} ) ;
}
/ * *
* 数据转换
@ -543,7 +615,6 @@ public class DeviceMessageProcessor {
detector . setGpsType ( Integer . parseInt ( type ) ) ;
return detector ;
}