From 16f8b6dec58b78359aceac03098ce6f3f99b3c74 Mon Sep 17 00:00:00 2001 From: luyya Date: Tue, 2 Sep 2025 16:16:23 +0800 Subject: [PATCH] =?UTF-8?q?=E7=9C=81=E5=8E=85=E4=BD=8D=E7=BD=AE=E6=B1=87?= =?UTF-8?q?=E8=81=9A20250902=E7=89=88=E6=9C=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../dromara/data2kafka/consumer/Consumer.java | 73 +++------ .../data2kafka/consumer/KafkaProperties.java | 138 ++++++++++++++++++ .../data2kafka/consumer/RealConsumer.java | 2 +- .../src/main/resources/application.yml | 14 ++ .../schedule/RedisOnlineUserSchedule.java | 42 ++++++ .../dromara/data2es/service/IGpsService.java | 2 + .../data2es/service/impl/GpsServiceImpl.java | 20 ++- .../service/impl/DeviceRedisServiceImpl.java | 1 - .../mapper/system/DeviceRedisMapper.xml | 2 +- 9 files changed, 237 insertions(+), 57 deletions(-) create mode 100644 stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/consumer/KafkaProperties.java diff --git a/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/consumer/Consumer.java b/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/consumer/Consumer.java index 9ed151d2..4ec817a0 100644 --- a/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/consumer/Consumer.java +++ b/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/consumer/Consumer.java @@ -1,35 +1,23 @@ package org.dromara.data2kafka.consumer; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.consumer.NoOffsetForPartitionException; -import org.apache.kafka.clients.consumer.OffsetOutOfRangeException; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.common.KafkaException; -import org.apache.kafka.common.errors.AuthorizationException; -import 
org.apache.kafka.common.errors.RecordDeserializationException; -import org.apache.kafka.common.errors.UnsupportedVersionException; -import org.dromara.data2kafka.config.KafkaProperties; -import org.dromara.data2kafka.config.LoginUtil; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; import java.io.IOException; -import java.time.Duration; -import java.util.Collections; +import java.util.List; import java.util.Properties; -import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadPoolExecutor; -@Component +//@Component public class Consumer extends Thread { private static final Logger LOG = LoggerFactory.getLogger(Consumer.class); - private final KafkaConsumer consumer; - - - private volatile boolean closed; // 一次请求的最大等待时间(S) @@ -75,6 +63,9 @@ public class Consumer extends Thread { */ private static final String USER_PRINCIPAL = "aqdsj_ruansi@HADOOP.COM"; + @Autowired + ThreadPoolExecutor dtpExecutor2; + /** * Consumer构造函数 * @@ -83,7 +74,10 @@ public class Consumer extends Thread { public Consumer() { initSecurity(); Properties props = initProperties(); - this.consumer = new KafkaConsumer<>(props); + ExecutorService executorService = Executors.newSingleThreadExecutor(); + KafkaConsumerRunnable runnable = new KafkaConsumerRunnable(props,dtpExecutor2,"3408"); + executorService.execute(runnable); + } public static Properties initProperties() { @@ -113,42 +107,15 @@ public class Consumer extends Thread { // 域名 props.put(KERBEROS_DOMAIN_NAME, kafkaProc.getValues(KERBEROS_DOMAIN_NAME, "hadoop.hadoop.com")); + String topics = "topic.send.2,topic.send.3,topic.send.4,topic.send.5,topic.send.8"; + String[] split = topics.split(","); + List list = CollectionUtils.arrayToList(split); + 
props.put("topics",list); + return props; } - /** - * 订阅Topic的消息处理函数 - */ - public void run() { - while (!closed) { - try { - // 消息消费请求 - ConsumerRecords records = consumer.poll(Duration.ofSeconds(waitTime)); - // 消息处理 - for (ConsumerRecord record : records) { - LOG.info("[ConsumerExample], Received message: (" + record.key() + ", " + record.value() - + ") at offset " + record.offset()); - } - } catch (AuthorizationException | UnsupportedVersionException - | RecordDeserializationException e) { - LOG.error(e.getMessage()); - // 无法从异常中恢复 - closeThread(); - } catch (OffsetOutOfRangeException | NoOffsetForPartitionException e) { - LOG.error("Invalid or no offset found, using latest"); - consumer.seekToEnd(e.partitions()); - consumer.commitSync(); - } catch (KafkaException e) { - LOG.error(e.getMessage()); - } - } - } - public void closeThread() { - if (!closed) { - closed = true; - } - } /** * 初始化安全认证 diff --git a/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/consumer/KafkaProperties.java b/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/consumer/KafkaProperties.java new file mode 100644 index 00000000..ee3943d3 --- /dev/null +++ b/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/consumer/KafkaProperties.java @@ -0,0 +1,138 @@ +package org.dromara.data2kafka.consumer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.util.Properties; + +public final class KafkaProperties +{ + private static final Logger LOG = LoggerFactory.getLogger(KafkaProperties.class); + + // Topic名称,安全模式下,需要以管理员用户添加当前用户的访问权限 + public final static String TOPIC = "jysb_dwxx"; + + private static Properties serverProps = new Properties(); + + private static Properties producerProps = new Properties(); + + private static Properties consumerProps = new Properties(); + + private static Properties clientProps = new 
Properties(); + + private static KafkaProperties instance = null; + + private KafkaProperties() + { +// String filePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator; + String filePath = "/shengting/gpsstore/"; + LOG.info("路径=={}",filePath); + try + { + File proFile = new File(filePath + "producer.properties"); + + if (proFile.exists()) + { + producerProps.load(new FileInputStream(filePath + "producer.properties")); + } + + File conFile = new File(filePath + "consumer.properties"); + + if (conFile.exists()) + { + consumerProps.load(new FileInputStream(filePath + "consumer.properties")); + } + + File serFile = new File(filePath + "server.properties"); + + if (serFile.exists()) + { + serverProps.load(new FileInputStream(filePath + "server.properties")); + } + + File cliFile = new File(filePath + "client.properties"); + + if (cliFile.exists()) + { + clientProps.load(new FileInputStream(filePath + "client.properties")); + } + } + catch (IOException e) + { + LOG.info("The Exception occured.", e); + } + } + + public synchronized static KafkaProperties getInstance() + { + if (null == instance) + { + instance = new KafkaProperties(); + } + + return instance; + } + + /** + * 获取参数值 + * @param key properites的key值 + * @param defValue 默认值 + * @return + */ + public String getValues(String key, String defValue) + { + String rtValue = null; + + if (null == key) + { + LOG.error("key is null"); + } + else + { + rtValue = getPropertiesValue(key); + } + + if (null == rtValue) + { + LOG.warn("KafkaProperties.getValues return null, key is " + key); + rtValue = defValue; + } + + LOG.info("KafkaProperties.getValues: key is " + key + "; Value is " + rtValue); + + return rtValue; + } + + /** + * 根据key值获取server.properties的值 + * @param key + * @return + */ + private String getPropertiesValue(String key) + { + String rtValue = serverProps.getProperty(key); + + // server.properties中没有,则再向producer.properties中获取 
+ if (null == rtValue) + { + rtValue = producerProps.getProperty(key); + } + + // producer中没有,则再向consumer.properties中获取 + if (null == rtValue) + { + rtValue = consumerProps.getProperty(key); + } + + // consumer没有,则再向client.properties中获取 + if (null == rtValue) + { + rtValue = clientProps.getProperty(key); + } + + return rtValue; + } +} diff --git a/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/consumer/RealConsumer.java b/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/consumer/RealConsumer.java index 6fdca5d8..84c7356d 100644 --- a/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/consumer/RealConsumer.java +++ b/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/consumer/RealConsumer.java @@ -72,7 +72,7 @@ public class RealConsumer implements CommandLineRunner { Map kafkaProp = getKafkaProp(); checkNetworkConnection("53.1.213.25",21007); - if (LoginUtil.isSecurityModel()) + if (false) { try { diff --git a/stwzhj-modules/stwzhj-data2StKafka/src/main/resources/application.yml b/stwzhj-modules/stwzhj-data2StKafka/src/main/resources/application.yml index 147b89cb..a1ca2e20 100644 --- a/stwzhj-modules/stwzhj-data2StKafka/src/main/resources/application.yml +++ b/stwzhj-modules/stwzhj-data2StKafka/src/main/resources/application.yml @@ -12,3 +12,17 @@ spring: active: @profiles.active@ autoconfigure: exclude: org.springframework.boot.autoconfigure.elasticsearch.ElasticsearchRestClientAutoConfiguration + +# 日志配置 +logging: + level: + org.springframework: warn + org.apache.dubbo: warn + com.alibaba.nacos: warn + org.mybatis.spring.mapper: error + org.apache.dubbo.config: error + org.apache.kafka: DEBUG + org.springframework.kafka: DEBUG + # 临时处理 spring 调整日志级别导致启动警告问题 不影响使用等待 alibaba 适配 + org.springframework.context.support.PostProcessorRegistrationDelegate: error + config: classpath:logback-plus.xml diff --git 
a/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/schedule/RedisOnlineUserSchedule.java b/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/schedule/RedisOnlineUserSchedule.java index 48740302..632133a2 100644 --- a/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/schedule/RedisOnlineUserSchedule.java +++ b/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/schedule/RedisOnlineUserSchedule.java @@ -1,5 +1,20 @@ package org.dromara.data2es.schedule; +import cn.hutool.core.bean.BeanUtil; +import cn.hutool.core.date.DateUnit; +import cn.hutool.core.date.DateUtil; +import cn.hutool.json.JSONObject; +import org.dromara.common.redis.utils.RedisUtils; +import org.dromara.data2es.domain.EsGpsInfoVO2; +import org.dromara.data2es.service.IGpsService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.annotation.Scheduled; + +import java.util.ArrayList; +import java.util.Date; +import java.util.List; + /** *

description:

* @@ -7,7 +22,34 @@ package org.dromara.data2es.schedule; * @date 2021-05-18 18:23 */ +@Configuration public class RedisOnlineUserSchedule { + @Autowired + IGpsService gpsService; + + @Scheduled(cron = "0 0/20 * * * ?") + public void redisTimeOutRemove(){ + List jlist = RedisUtils.searchAndGetKeysValues("online_users:*"); + List gpsInfoVO2s = new ArrayList<>(); + for (JSONObject job : jlist) { + String deviceType = job.getStr("deviceType"); + if ("05".equals(deviceType)){ + continue; + } + Integer online = job.getInt("online"); + if (0 == online){ + continue; + } + EsGpsInfoVO2 vo2 = BeanUtil.toBean(job, EsGpsInfoVO2.class); + if (1 == vo2.getOnline() && DateUtil.between(vo2.getGpsTime(), new Date(), DateUnit.SECOND) > 1800L){ + gpsInfoVO2s.add(vo2); + } + } + if (gpsInfoVO2s.size() > 0){ + gpsService.updateDataStatus(gpsInfoVO2s); + } + } + } diff --git a/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/service/IGpsService.java b/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/service/IGpsService.java index 2b2f326d..3e0a5302 100644 --- a/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/service/IGpsService.java +++ b/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/service/IGpsService.java @@ -14,4 +14,6 @@ public interface IGpsService { R saveDataBatch(List esGpsInfoVO2s); + R updateDataStatus(List esGpsInfoVO2s); + } diff --git a/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/service/impl/GpsServiceImpl.java b/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/service/impl/GpsServiceImpl.java index 66996c16..662cae16 100644 --- a/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/service/impl/GpsServiceImpl.java +++ b/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/service/impl/GpsServiceImpl.java @@ -124,7 +124,7 @@ public class GpsServiceImpl implements IGpsService { requestHandler.sendToKafka(info); } - 
requestHandler.redisOnlineUserBatch(onlineUserDataMap, 2592000); //存放30天 + requestHandler.redisOnlineUserBatch(onlineUserDataMap, 864000); //存放10天 requestHandler.redisOnlineUserBatch(orgCodeDataMap, 1800); //此处和buildRedisMap方法判断在线的时间一直 // requestHandler.redisDeleteBatch(deleteKeys); //此处换成了数据库的模式 所以这个就不用了 @@ -133,6 +133,24 @@ public class GpsServiceImpl implements IGpsService { return R.ok(); } + @Override + public R updateDataStatus(List esGpsInfoVO2s) { + Map onlineUserDataMap = new HashMap<>(); + for (EsGpsInfoVO2 info : esGpsInfoVO2s) { + String zzjgdm = info.getZzjgdm(); + String deviceCode = info.getDeviceCode(); + String deviceType = info.getDeviceType(); + String jsonValue = JSONUtil.toJsonStr(info); + + String onlineUsersKey = RedisConstants.ONLINE_USERS + + zzjgdm + ":" + deviceType + + ":" + deviceCode; + onlineUserDataMap.put(onlineUsersKey, jsonValue); + requestHandler.sendToKafka(info); + } + requestHandler.redisOnlineUserBatch(onlineUserDataMap, 864000); //存放10天 + return R.ok(); + } /** * 获取基本信息(主要是组织机构) 不查库 否者对库压力过大 diff --git a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/service/impl/DeviceRedisServiceImpl.java b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/service/impl/DeviceRedisServiceImpl.java index ba8a4ae1..6a3c411d 100644 --- a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/service/impl/DeviceRedisServiceImpl.java +++ b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/service/impl/DeviceRedisServiceImpl.java @@ -46,7 +46,6 @@ public class DeviceRedisServiceImpl implements IDeviceRedisService { Map uniqueMap = new LinkedHashMap<>(); for (DeviceRedis device : list) { String key = device.getDeviceCode() + "|" + - device.getDeviceType() + "|" + device.getInfoSource(); // 保留最后一次出现的记录(根据业务需求调整) uniqueMap.put(key, device); diff --git a/stwzhj-modules/stwzhj-system/src/main/resources/mapper/system/DeviceRedisMapper.xml 
b/stwzhj-modules/stwzhj-system/src/main/resources/mapper/system/DeviceRedisMapper.xml index 37568851..2851a471 100644 --- a/stwzhj-modules/stwzhj-system/src/main/resources/mapper/system/DeviceRedisMapper.xml +++ b/stwzhj-modules/stwzhj-system/src/main/resources/mapper/system/DeviceRedisMapper.xml @@ -20,7 +20,7 @@ #{entity.deviceCode},#{entity.deviceType},#{entity.online},#{entity.zzjgdm},#{entity.gpsTime},#{entity.infoSource} ) - ON conflict(device_code,device_type,info_source) do update set + ON conflict(device_code,info_source) do update set (online,zzjgdm,gps_time) =(EXCLUDED.online,EXCLUDED.zzjgdm,EXCLUDED.gps_time)