Commit f8feb3ef by 胡懿

增加消息20分钟内取出重复设备的消息,增加线程控制,保证同一时间只有5条线程处理消息

parent e8fe8c18
......@@ -20,9 +20,12 @@ import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
@Component
public class PumpStationKafkaConsumer {
......@@ -32,12 +35,16 @@ public class PumpStationKafkaConsumer {
private static final Logger logger = LoggerFactory.getLogger(PumpStationKafkaConsumer.class);
private HashMap<String, Date> snCodeDateMap = new HashMap<>();
private final Semaphore semaphore = new Semaphore(5);
/**
* 主消费者 - 处理大部分topic
*/
/*@KafkaListener(
@KafkaListener(
topics = {
"MKLDYL_JCSJ", "BZYYZ_JCSJ", "BZPSQ_JCSJ", "BZXZY_JCSJ",
"BZHSZM_JCSJ", "BZPSZM_JCSJ", "MKLDYL_JCSJ", "BZYYZ_JCSJ", "BZPSQ_JCSJ", "BZXZY_JCSJ",
"BZZLB_JCSJ", "BZYZJ_JCSJ", "BZJSZM_JCSJ", "BZYLZM_JCSJ", "BZCSZM_JCSJ",
"BZSCZM_JCSJ", "BZTXZM_JCSJ", "BZTFFJ_JCSJ", "BZSFFJ_JCSJ", "BZPFFJ_JCSJ",
"BZFJ_JCSJ", "BZFSJ_JCSJ", "BZYWCJ_JCSJ_2", "BZQBJ_JCSJ", "BZYDYHQT_JCSJ",
......@@ -53,22 +60,19 @@ public class PumpStationKafkaConsumer {
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
Acknowledgment ack) {
try {
logger.info("主消费者收到泵站数据, topic: {}, 消息长度: {}", topic, message);
// logger.info("主消费者收到泵站数据, topic: {}, 消息长度: {}", topic, message);
// 立即提交偏移量,避免阻塞
ack.acknowledge();
// 异步处理业务逻辑
processMessageAsync(message, topic);
processMessageAsync(message);
} catch (Exception e) {
logger.error("处理泵站数据时发生错误, topic: {}, 错误: {}", topic, e.getMessage(), e);
// logger.error("处理泵站数据时发生错误, topic: {}, 错误: {}", topic, e.getMessage(), e);
}
}*/
}
/**
* BZHSZM_JCSJ专用消费者
*/
/*@KafkaListener(
topics = {"BZHSZM_JCSJ"},
containerFactory = "kafkaListenerContainerFactory2",
......@@ -77,15 +81,11 @@ public class PumpStationKafkaConsumer {
public void consumePumpStationData_BZHSZM_JCSJ(String message,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
Acknowledgment ack) {
logger.info("BZHSZM_JCSJ专用消费者收到数据, topic: {}, 消息长度: {}", topic, message);
// logger.info("BZHSZM_JCSJ专用消费者收到数据, topic: {}, 消息长度: {}", topic, message);
ack.acknowledge();
processMessageAsync(message, topic);
}*/
/**
* BZPSZM_JCSJ专用消费者
*/
/*@KafkaListener(
processMessageAsync(message);
}
@KafkaListener(
topics = {"BZPSZM_JCSJ"},
containerFactory = "kafkaListenerContainerFactory2",
groupId = "pump_station_bzpszm_group" // 专用组
......@@ -93,17 +93,19 @@ public class PumpStationKafkaConsumer {
public void consumePumpStationData_BZPSZM_JCSJ(String message,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
Acknowledgment ack) {
logger.info("BZPSZM_JCSJ专用消费者收到数据, topic: {}, 消息长度: {}", topic, message);
// logger.info("BZPSZM_JCSJ专用消费者收到数据, topic: {}, 消息长度: {}", topic, message);
ack.acknowledge();
processMessageAsync(message, topic);
processMessageAsync(message);
}*/
/**
* 异步处理消息,避免阻塞消费者
*/
private void processMessageAsync(String message, String topic) {
private void processMessageAsync(String message) {
CompletableFuture.runAsync(() -> {
try {
// semaphore.acquire();
semaphore.acquire();
boolean b = false;
JSONObject msgJsonObject = JSON.parseObject(message);
String snCode = msgJsonObject.get("serialNo").toString();
......@@ -122,9 +124,16 @@ public class PumpStationKafkaConsumer {
}
if (!b) {
logger.info("异步处理完成, topic: {}, SN: {}", topic, snCode);
// logger.info("异步处理完成, topic: {}, SN: {}", topic, snCode);
return;
}
Date oldDate = snCodeDateMap.get(snCode);
if (null != oldDate && (new Date().getTime() - oldDate.getTime() < 20 * 60 * 1000)) {
return;
}
snCodeDateMap.put(snCode, new Date());
DeviceReqConfListReqVO temDRCLRV = new DeviceReqConfListReqVO();
temDRCLRV.setSnCode(snCode);
List<DeviceReqConfDO> deviceReqConfList = deviceReqConfMapper.selectList(temDRCLRV);
......@@ -141,10 +150,10 @@ public class PumpStationKafkaConsumer {
.set(DeviceReqConfDO::getSycnDate, deviceReqConfSaveReqVO.getSycnDate())
);
}
logger.info("异步处理完成, topic: {}, SN: {}", topic, snCode);
// logger.info("异步处理完成, SN: {}", snCode);
}
} catch (Exception e) {
logger.error("异步处理消息失败, topic: {}, 错误: {}", topic, e.getMessage(), e);
logger.error("异步处理消息失败, 错误: {}", e.getMessage(), e);
}
});
}
......
......@@ -818,7 +818,11 @@ public class MyIotDbUtils {
values.add(deviceIllLogInfo.getDeviceTypeBs());
values.add(deviceIllLogInfo.getDeviceTypeName());
values.add(deviceIllLogInfo.getDeviceSn());
values.add(MyDateUtils.stringToLong(deviceIllLogInfo.getAccessed()));
if (null != deviceIllLogInfo.getAccessed()) {
values.add(MyDateUtils.stringToLong(deviceIllLogInfo.getAccessed()));
} else {
values.add(null);
}
values.add(deviceIllLogInfo.getIllType());
values.add(deviceIllLogInfo.getClientIp());
values.add(deviceIllLogInfo.getRemark());
......@@ -1143,7 +1147,15 @@ public class MyIotDbUtils {
Field field8 = fields.get(8);
String deviceSn = field8.getStringValue();
Field field9 = fields.get(9);
long accessed = field9.getLongV();
Long accessed = null;
if (null != field9) {
try {
accessed = field9.getLongV();
} catch (Exception e) {
System.out.println("accessed为空");
}
}
Field field10 = fields.get(10);
String illType = field10.getStringValue();
Field field11 = fields.get(11);
......
......@@ -5,6 +5,7 @@ import cn.gintone.controller.vo.DeviceReqConfListReqVO;
import cn.gintone.dal.DeviceReqConfMapper;
import cn.gintone.dto.DeviceIllLogInfo;
import cn.gintone.entity.DeviceReqConfDO;
import cn.gintone.iotdbUtils.MyDateUtils;
import cn.gintone.iotdbUtils.MyIotDbUtils;
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
import org.springframework.beans.factory.annotation.Autowired;
......@@ -44,6 +45,7 @@ public class MyScheduledTasks {
deviceIllLogInfo.setDeviceName(deviceReqConfDO.getDeviceName());
deviceIllLogInfo.setDeviceTypeId(deviceReqConfDO.getDeviceTypeId() + "");
deviceIllLogInfo.setDeviceTypeName(deviceReqConfDO.getDeviceTypeName());
deviceIllLogInfo.setAccessed(MyDateUtils.longToString(new Date().getTime()));
deviceIllLogInfo.setIllType("disconnected");
MyIotDbUtils.inserOneDeviceLogInfo_ill(iotDbConfig, deviceIllLogInfo);
}
......
......@@ -54,10 +54,12 @@ spring:
# url: jdbc:oracle:thin:@127.0.0.1:1521:xe # Oracle 连接的示例
# url: jdbc:sqlserver://127.0.0.1:1433;DatabaseName=ruoyi-vue-pro;SelectMethod=cursor;encrypt=false;rewriteBatchedStatements=true;useUnicode=true;characterEncoding=utf-8 # SQLServer 连接的示例
# url: jdbc:dm://127.0.0.1:5236?schema=RUOYI_VUE_PRO # DM 连接的示例
url: jdbc:kingbase8://127.0.0.1:54321/test # 人大金仓 KingbaseES 连接的示例
# url: jdbc:postgresql://192.168.19.128:54321/test # OpenGauss 连接的示例
username: kingbase
password: kingbase
# url: jdbc:kingbase8://127.0.0.1:54321/test # 人大金仓 KingbaseES 连接的示例
# username: kingbase
# password: kingbase
url: jdbc:kingbase8://59.195.13.251:54321/ht_aq # 人大金仓 KingbaseES 连接的示例
username: ht_aq
password: ht@1234
# username: sa # SQL Server 连接的示例
# password: Yudao@2024 # SQL Server 连接的示例
# username: SYSDBA # DM 连接的示例
......
......@@ -54,10 +54,11 @@ spring:
# url: jdbc:oracle:thin:@127.0.0.1:1521:xe # Oracle 连接的示例
# url: jdbc:sqlserver://127.0.0.1:1433;DatabaseName=ruoyi-vue-pro;SelectMethod=cursor;encrypt=false;rewriteBatchedStatements=true;useUnicode=true;characterEncoding=utf-8 # SQLServer 连接的示例
# url: jdbc:dm://127.0.0.1:5236?schema=RUOYI_VUE_PRO # DM 连接的示例
url: jdbc:kingbase8://127.0.0.1:54321/ht_aq # 人大金仓 KingbaseES 连接的示例
# url: jdbc:postgresql://192.168.19.128:54321/test # OpenGauss 连接的示例
url: jdbc:kingbase8://59.195.13.251:54321/ht_aq # 人大金仓 KingbaseES 连接的示例
username: ht_aq
password: ht@1234
# username: ht_aq
# password: ht@1234
# username: sa # SQL Server 连接的示例
# password: Yudao@2024 # SQL Server 连接的示例
# username: SYSDBA # DM 连接的示例
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment