From ddc8d4a8bdad50a29a0c9292c4de0a09ea66ab38 Mon Sep 17 00:00:00 2001 From: luyya Date: Tue, 24 Mar 2026 19:32:42 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A4=84=E7=90=86=E6=B6=88=E8=B4=B9=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E6=A0=BC=E5=BC=8F=E4=B8=8D=E5=AF=B9=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/main/resources/application.yml | 13 ++++++++++++ .../consumer/handler/ConsumerWorker.java | 14 ++++++------- .../service/KafkaConsumerService.java | 21 +++++-------------- 3 files changed, 25 insertions(+), 23 deletions(-) diff --git a/stwzhj-modules/stwzhj-baseToSt/src/main/resources/application.yml b/stwzhj-modules/stwzhj-baseToSt/src/main/resources/application.yml index ccd0c18e..5eddcb36 100644 --- a/stwzhj-modules/stwzhj-baseToSt/src/main/resources/application.yml +++ b/stwzhj-modules/stwzhj-baseToSt/src/main/resources/application.yml @@ -19,12 +19,25 @@ spring: username: root password: Ycgis!2509 driver-class-name: com.mysql.cj.jdbc.Driver + hikari: + maximum-pool-size: 3 # 每个实例最多3个连接 + minimum-idle: 1 # 最少1个空闲连接 + idle-timeout: 300000 # 5分钟空闲超时 + connection-timeout: 30000 + max-lifetime: 600000 # 10分钟生命周期 + # 目标数据库(PostgreSQL) - 所有地市使用相同的目标数据库 target: url: jdbc:postgresql://53.16.17.15:5432/wzhj?useUnicode=true&characterEncoding=utf8&useSSL=true&autoReconnect=true&reWriteBatchedInserts=true&stringtype=unspecified username: pgsql password: ycgis driver-class-name: org.postgresql.Driver + hikari: + maximum-pool-size: 3 # 每个实例最多3个连接 + minimum-idle: 1 + idle-timeout: 300000 + connection-timeout: 30000 + max-lifetime: 600000 jpa: show-sql: false hibernate: diff --git a/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/ConsumerWorker.java b/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/ConsumerWorker.java index 7ca38b8d..2cbd2278 100644 --- a/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/ConsumerWorker.java +++ b/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/ConsumerWorker.java @@ -63,38 +63,38 @@ public class ConsumerWorker implements Runnable { try { jsonObject = JSONUtil.parseObj(((String) value)); }catch (ConvertException e){ - logger.info("jsonObject=null:error={}",e.getMessage()); + logger.error("jsonObject=null:error={}",e.getMessage()); return; } try { esGpsInfo = JSONUtil.toBean(jsonObject, RemoteGpsInfo.class); }catch (ConvertException e){ - logger.info("EsGpsInfo=null:error={}",e.getMessage()); + logger.error("EsGpsInfo=null:error={}",e.getMessage()); return; } if(Objects.isNull(esGpsInfo)){ - logger.info("esGpsInfo=null no error"); + logger.error("esGpsInfo=null no error"); return; } String deviceCode = esGpsInfo.getDeviceCode(); if(StringUtils.isEmpty(deviceCode) || deviceCode.length() > 100){ - logger.info("deviceCode:{} is null or is too long ",deviceCode); + logger.error("deviceCode:{} is null or is too long ",deviceCode); return; } String latitude = esGpsInfo.getLat(); if(StringUtils.isEmpty(latitude) || "0.0".equals(latitude)){ - logger.info("latitude:{} is null or is zero ",latitude); + logger.error("latitude:{} is null or is zero ",latitude); return; } String longitude = esGpsInfo.getLng(); if(StringUtils.isEmpty(longitude) || "0.0".equals(longitude)){ - logger.info("longitude:{} is null or is zero ",longitude); + logger.error("longitude:{} is null or is zero ",longitude); return; } String infoSource = esGpsInfo.getInfoSource(); if(StringUtils.isEmpty(infoSource) ){ - logger.info("infoSource:{} is null ",infoSource); + logger.error("infoSource:{} is null ",infoSource); return; } diff --git a/stwzhj-modules/stwzhj-kafka-consumer/src/main/java/org/dromara/kafka/consumer/service/KafkaConsumerService.java b/stwzhj-modules/stwzhj-kafka-consumer/src/main/java/org/dromara/kafka/consumer/service/KafkaConsumerService.java index 4a4e765c..1127b7a8 100644 --- a/stwzhj-modules/stwzhj-kafka-consumer/src/main/java/org/dromara/kafka/consumer/service/KafkaConsumerService.java +++ b/stwzhj-modules/stwzhj-kafka-consumer/src/main/java/org/dromara/kafka/consumer/service/KafkaConsumerService.java @@ -123,22 +123,11 @@ public class KafkaConsumerService { for (ConsumerRecord record : records) { try { // 将消息添加到批量处理器 - EsGpsInfo esGpsInfo; - JSONObject jsonObject; - try { - jsonObject = JSONUtil.parseObj(((String) record.value())); - }catch (ConvertException e){ - logger.info("jsonObject=null:error={}",e.getMessage()); - return; - } - try { - esGpsInfo = JSONUtil.toBean(jsonObject, EsGpsInfo.class); - }catch (ConvertException e){ - logger.info("EsGpsInfo=null:error={}",e.getMessage()); - return; - } - esGpsInfo.setInfoSource(kafkaConsumerConfig.getCityCode()); - boolean added = batchMessageProcessor.addMessage(esGpsInfo.toString()); + String rawMessage = record.value(); + JSONObject jsonObject = JSONUtil.parseObj(rawMessage); + jsonObject.set("infoSource", kafkaConsumerConfig.getCityCode()); + String finalJson = jsonObject.toString(); + boolean added = batchMessageProcessor.addMessage(finalJson); if (!added) { logger.warn("消息队列已满,丢弃消息: topic={}, partition={}, offset={}", record.topic(), record.partition(), record.offset());