Commit 7c13140a by 胡懿

修复线程执行一段时候后不执行了的问题

parent f8feb3ef
......@@ -26,6 +26,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@Component
public class PumpStationKafkaConsumer {
......@@ -44,7 +45,7 @@ public class PumpStationKafkaConsumer {
*/
@KafkaListener(
topics = {
"BZHSZM_JCSJ", "BZPSZM_JCSJ", "MKLDYL_JCSJ", "BZYYZ_JCSJ", "BZPSQ_JCSJ", "BZXZY_JCSJ",
"MKLDYL_JCSJ", "BZYYZ_JCSJ", "BZPSQ_JCSJ", "BZXZY_JCSJ",
"BZZLB_JCSJ", "BZYZJ_JCSJ", "BZJSZM_JCSJ", "BZYLZM_JCSJ", "BZCSZM_JCSJ",
"BZSCZM_JCSJ", "BZTXZM_JCSJ", "BZTFFJ_JCSJ", "BZSFFJ_JCSJ", "BZPFFJ_JCSJ",
"BZFJ_JCSJ", "BZFSJ_JCSJ", "BZYWCJ_JCSJ_2", "BZQBJ_JCSJ", "BZYDYHQT_JCSJ",
......@@ -66,14 +67,14 @@ public class PumpStationKafkaConsumer {
ack.acknowledge();
// 异步处理业务逻辑
processMessageAsync(message);
processMessageContent(topic, message);
} catch (Exception e) {
// logger.error("处理泵站数据时发生错误, topic: {}, 错误: {}", topic, e.getMessage(), e);
logger.error("处理泵站数据时发生错误, topic: {}, 错误: {}", topic, e.getMessage(), e);
}
}
/*@KafkaListener(
@KafkaListener(
topics = {"BZHSZM_JCSJ"},
containerFactory = "kafkaListenerContainerFactory2",
groupId = "pump_station_bzhszm_group" // 专用组
......@@ -82,8 +83,12 @@ public class PumpStationKafkaConsumer {
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
Acknowledgment ack) {
// logger.info("BZHSZM_JCSJ专用消费者收到数据, topic: {}, 消息长度: {}", topic, message);
ack.acknowledge();
processMessageAsync(message);
try {
ack.acknowledge();
processMessageContent(topic, message);
} catch (Exception e) {
e.printStackTrace();
}
}
@KafkaListener(
topics = {"BZPSZM_JCSJ"},
......@@ -94,67 +99,233 @@ public class PumpStationKafkaConsumer {
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
Acknowledgment ack) {
// logger.info("BZPSZM_JCSJ专用消费者收到数据, topic: {}, 消息长度: {}", topic, message);
ack.acknowledge();
processMessageAsync(message);
}*/
try {
ack.acknowledge();
processMessageContent(topic, message);
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 异步处理消息,避免阻塞消费者
* 处理消息内容
*/
private void processMessageAsync(String message) {
CompletableFuture.runAsync(() -> {
try {
// semaphore.acquire();
semaphore.acquire();
boolean b = false;
JSONObject msgJsonObject = JSON.parseObject(message);
String snCode = msgJsonObject.get("serialNo").toString();
JSONArray services = msgJsonObject.getJSONArray("services");
if (null != services) {
JSONObject paramsObject = services.getJSONObject(0).getJSONObject("params");
if (null != paramsObject) {
String ts = paramsObject.getString("ts");
if (null != ts) {
long l = MyDateUtils.stringToLong(ts);
if (new Date().getTime() - l < 24 * 60 * 60 * 1000) {
b = true;
}
}
}
}
private void processMessageContent(String topic, String message) {
try {
// 先进行基本的消息验证
if (message == null || message.trim().isEmpty()) {
logger.warn("收到空消息, topic: {}", topic);
return;
}
if (!b) {
// logger.info("异步处理完成, topic: {}, SN: {}", topic, snCode);
return;
}
// 解析JSON消息
JSONObject msgJsonObject = parseMessageJson(topic, message);
if (msgJsonObject == null) {
return;
}
Date oldDate = snCodeDateMap.get(snCode);
if (null != oldDate && (new Date().getTime() - oldDate.getTime() < 20 * 60 * 1000)) {
return;
}
// 获取序列号
String snCode = getSnCode(msgJsonObject, topic);
if (snCode == null) {
return;
}
// 检查时间有效性
if (!isMessageTimeValid(msgJsonObject)) {
logger.debug("消息时间无效, topic: {}, SN: {}", topic, snCode);
return;
}
// 检查消息频率
if (shouldSkipMessage(snCode)) {
logger.debug("消息频率过高被跳过, topic: {}, SN: {}", topic, snCode);
return;
}
// 更新设备配置状态
updateDeviceConfigStatus(snCode, topic);
} catch (Exception e) {
logger.error("处理消息内容失败, topic: {}, 错误: {}", topic, e.getMessage(), e);
}
}
/**
* 解析JSON消息 - 修复版本
*/
private JSONObject parseMessageJson(String topic, String message) {
// 先检查消息是否以 { 开头,以 } 结尾
if (!isValidJsonFormat(message)) {
logger.error("消息格式无效,不是合法的JSON, topic: {}, message: {}",
topic, getMessagePreview(message));
return null;
}
try {
JSONObject jsonObject = JSON.parseObject(message);
if (jsonObject == null) {
logger.error("JSON解析结果为null, topic: {}, message: {}",
topic, getMessagePreview(message));
return null;
}
return jsonObject;
} catch (Exception e) {
// 安全的日志记录,避免消息内容本身导致的问题
logger.error("JSON转换错误, topic: {}, message预览: {}, 错误类型: {}, 错误详情: {}",
topic,
getMessagePreview(message),
e.getClass().getSimpleName(),
e.getMessage());
return null;
}
}
/**
* 安全的获取消息预览
*/
private String getMessagePreview(String message) {
if (message == null) {
return "null";
}
try {
// 限制预览长度,避免日志过大
String preview = message.trim();
if (preview.length() > 200) {
preview = preview.substring(0, 200) + "...";
}
// 移除可能影响日志格式的字符
preview = preview.replace("\n", " ").replace("\r", " ");
return preview;
} catch (Exception e) {
return "无法解析消息内容";
}
}
/**
* 检查消息是否是合法的JSON格式
*/
private boolean isValidJsonFormat(String message) {
if (message == null || message.trim().isEmpty()) {
return false;
}
String trimmed = message.trim();
return trimmed.startsWith("{") && trimmed.endsWith("}");
}
/**
* 获取序列号
*/
private String getSnCode(JSONObject msgJsonObject, String topic) {
try {
return msgJsonObject.getString("serialNo");
} catch (Exception e) {
logger.error("获取序列号失败, topic: {}", topic, e);
return null;
}
}
/**
* 检查消息时间是否有效(24小时内)
*/
private boolean isMessageTimeValid(JSONObject msgJsonObject) {
try {
JSONArray services = msgJsonObject.getJSONArray("services");
if (services == null || services.isEmpty()) {
return false;
}
JSONObject paramsObject = services.getJSONObject(0).getJSONObject("params");
if (paramsObject == null) {
return false;
}
String ts = paramsObject.getString("ts");
if (ts == null) {
return false;
}
long messageTime = MyDateUtils.stringToLong(ts);
long currentTime = System.currentTimeMillis();
// 检查消息时间是否在24小时内
return (currentTime - messageTime) < (24 * 60 * 60 * 1000);
snCodeDateMap.put(snCode, new Date());
DeviceReqConfListReqVO temDRCLRV = new DeviceReqConfListReqVO();
temDRCLRV.setSnCode(snCode);
List<DeviceReqConfDO> deviceReqConfList = deviceReqConfMapper.selectList(temDRCLRV);
if (null != deviceReqConfList && deviceReqConfList.size() > 0) {
for (DeviceReqConfDO drcd : deviceReqConfList) {
DeviceReqConfDO deviceReqConfSaveReqVO = new DeviceReqConfDO();
deviceReqConfSaveReqVO.setId(drcd.getId());
deviceReqConfSaveReqVO.setState(1);
deviceReqConfSaveReqVO.setSycnDate(new Date());
deviceReqConfMapper.update(new UpdateWrapper<DeviceReqConfDO>().lambda()
.eq(DeviceReqConfDO::getId, deviceReqConfSaveReqVO.getId())
.set(DeviceReqConfDO::getState, 1)
.set(DeviceReqConfDO::getSycnDate, deviceReqConfSaveReqVO.getSycnDate())
);
}
// logger.info("异步处理完成, SN: {}", snCode);
} catch (Exception e) {
logger.error("检查消息时间有效性失败", e);
return false;
}
}
/**
* 检查是否应该跳过该消息(基于频率控制)
*/
private boolean shouldSkipMessage(String snCode) {
Date oldDate = snCodeDateMap.get(snCode);
if (oldDate == null) {
return false;
}
long timeDiff = System.currentTimeMillis() - oldDate.getTime();
// 20分钟内重复消息被跳过
return timeDiff < (20 * 60 * 1000);
}
/**
* 更新设备配置状态
*/
private void updateDeviceConfigStatus(String snCode, String topic) {
try {
// 更新SN码的时间戳
snCodeDateMap.put(snCode, new Date());
// 查询设备配置
DeviceReqConfListReqVO query = new DeviceReqConfListReqVO();
query.setSnCode(snCode);
List<DeviceReqConfDO> deviceReqConfList = deviceReqConfMapper.selectList(query);
if (deviceReqConfList == null || deviceReqConfList.isEmpty()) {
logger.debug("未找到设备配置, SN: {}", snCode);
return;
}
// 批量更新设备配置状态
int updateCount = updateDeviceConfigBatch(deviceReqConfList);
logger.info("设备配置状态更新完成, topic: {}, SN: {}, 更新记录数: {}",
topic, snCode, updateCount);
} catch (Exception e) {
logger.error("更新设备配置状态失败, SN: {}", snCode, e);
throw new RuntimeException("更新设备配置状态失败", e);
}
}
/**
* 批量更新设备配置
*/
private int updateDeviceConfigBatch(List<DeviceReqConfDO> deviceReqConfList) {
int updateCount = 0;
Date currentDate = new Date();
for (DeviceReqConfDO deviceConf : deviceReqConfList) {
try {
int result = deviceReqConfMapper.update(new UpdateWrapper<DeviceReqConfDO>().lambda()
.eq(DeviceReqConfDO::getId, deviceConf.getId())
.set(DeviceReqConfDO::getState, 1)
.set(DeviceReqConfDO::getSycnDate, currentDate)
);
if (result > 0) {
updateCount++;
}
} catch (Exception e) {
logger.error("异步处理消息失败, 错误: {}", e.getMessage(), e);
logger.error("更新设备配置失败, ID: {}", deviceConf.getId(), e);
}
});
}
return updateCount;
}
}
\ No newline at end of file
......@@ -166,7 +166,7 @@ kafka:
# 添加消费者稳定性配置
session-timeout-ms: 30000
heartbeat-interval-ms: 10000
max-poll-records: 500
max-poll-records: 10
--- #################### 服务保障相关配置 ####################
# Lock4j 配置项
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment