处理消费数据格式不对问题

stwzhj
luyya 2026-03-24 19:32:42 +08:00
parent c1152bf7d8
commit ddc8d4a8bd
3 changed files with 25 additions and 23 deletions

View File

@ -19,12 +19,25 @@ spring:
username: root username: root
password: Ycgis!2509 password: Ycgis!2509
driver-class-name: com.mysql.cj.jdbc.Driver driver-class-name: com.mysql.cj.jdbc.Driver
hikari:
maximum-pool-size: 3 # 每个实例最多3个连接
minimum-idle: 1 # 最少1个空闲连接
idle-timeout: 300000 # 5分钟空闲超时
connection-timeout: 30000
max-lifetime: 600000 # 10分钟生命周期
# 目标数据库(PostgreSQL) - 所有地市使用相同的目标数据库 # 目标数据库(PostgreSQL) - 所有地市使用相同的目标数据库
target: target:
url: jdbc:postgresql://53.16.17.15:5432/wzhj?useUnicode=true&characterEncoding=utf8&useSSL=true&autoReconnect=true&reWriteBatchedInserts=true&stringtype=unspecified url: jdbc:postgresql://53.16.17.15:5432/wzhj?useUnicode=true&characterEncoding=utf8&useSSL=true&autoReconnect=true&reWriteBatchedInserts=true&stringtype=unspecified
username: pgsql username: pgsql
password: ycgis password: ycgis
driver-class-name: org.postgresql.Driver driver-class-name: org.postgresql.Driver
hikari:
maximum-pool-size: 3 # 每个实例最多3个连接
minimum-idle: 1
idle-timeout: 300000
connection-timeout: 30000
max-lifetime: 600000
jpa: jpa:
show-sql: false show-sql: false
hibernate: hibernate:

View File

@ -63,38 +63,38 @@ public class ConsumerWorker implements Runnable {
try { try {
jsonObject = JSONUtil.parseObj(((String) value)); jsonObject = JSONUtil.parseObj(((String) value));
}catch (ConvertException e){ }catch (ConvertException e){
logger.info("jsonObject=null:error={}",e.getMessage()); logger.error("jsonObject=null:error={}",e.getMessage());
return; return;
} }
try { try {
esGpsInfo = JSONUtil.toBean(jsonObject, RemoteGpsInfo.class); esGpsInfo = JSONUtil.toBean(jsonObject, RemoteGpsInfo.class);
}catch (ConvertException e){ }catch (ConvertException e){
logger.info("EsGpsInfo=null:error={}",e.getMessage()); logger.error("EsGpsInfo=null:error={}",e.getMessage());
return; return;
} }
if(Objects.isNull(esGpsInfo)){ if(Objects.isNull(esGpsInfo)){
logger.info("esGpsInfo=null no error"); logger.error("esGpsInfo=null no error");
return; return;
} }
String deviceCode = esGpsInfo.getDeviceCode(); String deviceCode = esGpsInfo.getDeviceCode();
if(StringUtils.isEmpty(deviceCode) || deviceCode.length() > 100){ if(StringUtils.isEmpty(deviceCode) || deviceCode.length() > 100){
logger.info("deviceCode:{} is null or is too long ",deviceCode); logger.error("deviceCode:{} is null or is too long ",deviceCode);
return; return;
} }
String latitude = esGpsInfo.getLat(); String latitude = esGpsInfo.getLat();
if(StringUtils.isEmpty(latitude) || "0.0".equals(latitude)){ if(StringUtils.isEmpty(latitude) || "0.0".equals(latitude)){
logger.info("latitude:{} is null or is zero ",latitude); logger.error("latitude:{} is null or is zero ",latitude);
return; return;
} }
String longitude = esGpsInfo.getLng(); String longitude = esGpsInfo.getLng();
if(StringUtils.isEmpty(longitude) || "0.0".equals(longitude)){ if(StringUtils.isEmpty(longitude) || "0.0".equals(longitude)){
logger.info("longitude:{} is null or is zero ",longitude); logger.error("longitude:{} is null or is zero ",longitude);
return; return;
} }
String infoSource = esGpsInfo.getInfoSource(); String infoSource = esGpsInfo.getInfoSource();
if(StringUtils.isEmpty(infoSource) ){ if(StringUtils.isEmpty(infoSource) ){
logger.info("infoSource:{} is null ",infoSource); logger.error("infoSource:{} is null ",infoSource);
return; return;
} }

View File

@ -123,22 +123,11 @@ public class KafkaConsumerService {
for (ConsumerRecord<String, String> record : records) { for (ConsumerRecord<String, String> record : records) {
try { try {
// 将消息添加到批量处理器 // 将消息添加到批量处理器
EsGpsInfo esGpsInfo; String rawMessage = record.value();
JSONObject jsonObject; JSONObject jsonObject = JSONUtil.parseObj(rawMessage);
try { jsonObject.set("infoSource", kafkaConsumerConfig.getCityCode());
jsonObject = JSONUtil.parseObj(((String) record.value())); String finalJson = jsonObject.toString();
}catch (ConvertException e){ boolean added = batchMessageProcessor.addMessage(finalJson);
logger.info("jsonObject=null:error={}",e.getMessage());
return;
}
try {
esGpsInfo = JSONUtil.toBean(jsonObject, EsGpsInfo.class);
}catch (ConvertException e){
logger.info("EsGpsInfo=null:error={}",e.getMessage());
return;
}
esGpsInfo.setInfoSource(kafkaConsumerConfig.getCityCode());
boolean added = batchMessageProcessor.addMessage(esGpsInfo.toString());
if (!added) { if (!added) {
logger.warn("消息队列已满,丢弃消息: topic={}, partition={}, offset={}", logger.warn("消息队列已满,丢弃消息: topic={}, partition={}, offset={}",
record.topic(), record.partition(), record.offset()); record.topic(), record.partition(), record.offset());