省厅位置汇聚20250902版本

stwzhj
luyya 2025-09-02 16:16:23 +08:00
parent 81187efdbd
commit 16f8b6dec5
9 changed files with 237 additions and 57 deletions

View File

@ -1,35 +1,23 @@
package org.dromara.data2kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.RecordDeserializationException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.dromara.data2kafka.config.KafkaProperties;
import org.dromara.data2kafka.config.LoginUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
@Component
//@Component
public class Consumer extends Thread {
private static final Logger LOG = LoggerFactory.getLogger(Consumer.class);
private final KafkaConsumer<String, String> consumer;
private volatile boolean closed;
// 一次请求的最大等待时间(S)
@ -75,6 +63,9 @@ public class Consumer extends Thread {
*/
private static final String USER_PRINCIPAL = "aqdsj_ruansi@HADOOP.COM";
@Autowired
ThreadPoolExecutor dtpExecutor2;
/**
* Consumer
*
@ -83,7 +74,10 @@ public class Consumer extends Thread {
/**
 * Builds the consumer: performs the Kerberos security login, loads the Kafka
 * properties and hands consumption off to a KafkaConsumerRunnable running on
 * a dedicated single-thread executor.
 */
public Consumer() {
initSecurity();
Properties props = initProperties();
this.consumer = new KafkaConsumer<>(props);
ExecutorService executorService = Executors.newSingleThreadExecutor();
// NOTE(review): dtpExecutor2 is @Autowired field-injected AFTER this constructor
// runs, so it is still null here and KafkaConsumerRunnable receives a null
// executor — verify, or move this startup into a @PostConstruct method.
KafkaConsumerRunnable runnable = new KafkaConsumerRunnable(props,dtpExecutor2,"3408");
// NOTE(review): executorService is never shut down; the runnable thread will
// keep the JVM alive unless it is a daemon — confirm intended lifecycle.
executorService.execute(runnable);
}
public static Properties initProperties() {
@ -113,42 +107,15 @@ public class Consumer extends Thread {
// 域名
props.put(KERBEROS_DOMAIN_NAME, kafkaProc.getValues(KERBEROS_DOMAIN_NAME, "hadoop.hadoop.com"));
String topics = "topic.send.2,topic.send.3,topic.send.4,topic.send.5,topic.send.8";
String[] split = topics.split(",");
List list = CollectionUtils.arrayToList(split);
props.put("topics",list);
return props;
}
/**
* Topic
*/
/**
 * Poll loop: repeatedly fetches records from Kafka and logs them until
 * closeThread() flips the volatile 'closed' flag.
 */
public void run() {
while (!closed) {
try {
// Fetch the next batch of records (blocks for up to waitTime seconds).
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(waitTime));
// Process each record; processing here is just logging key/value/offset.
for (ConsumerRecord<String, String> record : records) {
LOG.info("[ConsumerExample], Received message: (" + record.key() + ", " + record.value()
+ ") at offset " + record.offset());
}
} catch (AuthorizationException | UnsupportedVersionException
| RecordDeserializationException e) {
LOG.error(e.getMessage());
// Unrecoverable errors: stop the poll loop for good.
closeThread();
} catch (OffsetOutOfRangeException | NoOffsetForPartitionException e) {
LOG.error("Invalid or no offset found, using latest");
// Recover by jumping the affected partitions to the latest offset.
consumer.seekToEnd(e.partitions());
consumer.commitSync();
} catch (KafkaException e) {
// Any other Kafka error is logged and the loop simply retries.
LOG.error(e.getMessage());
}
}
}
/**
 * Signals the poll loop in run() to terminate. Safe to call more than once.
 */
public void closeThread() {
    // Writing the volatile flag is idempotent, so no "already closed" guard
    // is required; the loop observes the new value on its next iteration.
    closed = true;
}
/**
*

View File

@ -0,0 +1,138 @@
package org.dromara.data2kafka.consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;
/**
 * Singleton holder for the Kafka configuration, merged from four optional
 * property files under the fixed deployment directory /shengting/gpsstore/:
 * server.properties, producer.properties, consumer.properties and
 * client.properties. Lookup order in {@link #getValues} is
 * server → producer → consumer → client.
 */
public final class KafkaProperties {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaProperties.class);

    // Topic name; in security mode an administrator must grant the current user access.
    public final static String TOPIC = "jysb_dwxx";

    private static Properties serverProps = new Properties();
    private static Properties producerProps = new Properties();
    private static Properties consumerProps = new Properties();
    private static Properties clientProps = new Properties();

    private static KafkaProperties instance = null;

    /**
     * Loads every property file that exists; missing files are skipped silently.
     */
    private KafkaProperties() {
        // Previous dev-time location, kept for reference:
        // String filePath = System.getProperty("user.dir") + File.separator + "src"
        //     + File.separator + "main" + File.separator + "resources" + File.separator;
        String filePath = "/shengting/gpsstore/";
        LOG.info("路径=={}", filePath);
        loadIfExists(producerProps, filePath + "producer.properties");
        // BUG FIX: the original tested producer.properties for existence but then
        // loaded consumer.properties, so consumer settings were only loaded when
        // the *producer* file happened to exist.
        loadIfExists(consumerProps, filePath + "consumer.properties");
        loadIfExists(serverProps, filePath + "server.properties");
        loadIfExists(clientProps, filePath + "client.properties");
    }

    /**
     * Loads {@code path} into {@code target} if the file exists.
     * The input stream is always closed (try-with-resources); the original
     * leaked a FileInputStream per file.
     */
    private static void loadIfExists(Properties target, String path) {
        File file = new File(path);
        if (!file.exists()) {
            return;
        }
        try (FileInputStream in = new FileInputStream(file)) {
            target.load(in);
        } catch (IOException e) {
            // A present-but-unreadable config file is a real problem: log at ERROR.
            LOG.error("The Exception occured.", e);
        }
    }

    /**
     * @return the lazily created singleton instance (synchronized for thread safety).
     */
    public synchronized static KafkaProperties getInstance() {
        if (null == instance) {
            instance = new KafkaProperties();
        }
        return instance;
    }

    /**
     * Looks up {@code key} across all loaded property files.
     *
     * @param key      the property key; a null key is logged and yields defValue
     * @param defValue the fallback returned when the key is absent everywhere
     * @return the resolved value, or defValue if not found
     */
    public String getValues(String key, String defValue) {
        String rtValue = null;
        if (null == key) {
            LOG.error("key is null");
        } else {
            rtValue = getPropertiesValue(key);
        }
        if (null == rtValue) {
            LOG.warn("KafkaProperties.getValues return null, key is {}", key);
            rtValue = defValue;
        }
        LOG.info("KafkaProperties.getValues: key is {}; Value is {}", key, rtValue);
        return rtValue;
    }

    /**
     * Resolves a key through the fallback chain:
     * server.properties → producer.properties → consumer.properties → client.properties.
     *
     * @param key the property key to resolve
     * @return the first non-null value found, or null
     */
    private String getPropertiesValue(String key) {
        String rtValue = serverProps.getProperty(key);
        if (null == rtValue) {
            rtValue = producerProps.getProperty(key);
        }
        if (null == rtValue) {
            rtValue = consumerProps.getProperty(key);
        }
        if (null == rtValue) {
            rtValue = clientProps.getProperty(key);
        }
        return rtValue;
    }
}

View File

@ -72,7 +72,7 @@ public class RealConsumer implements CommandLineRunner {
Map kafkaProp = getKafkaProp();
checkNetworkConnection("53.1.213.25",21007);
if (LoginUtil.isSecurityModel())
if (false)
{
try
{

View File

@ -12,3 +12,17 @@ spring:
active: @profiles.active@
autoconfigure:
exclude: org.springframework.boot.autoconfigure.elasticsearch.ElasticsearchRestClientAutoConfiguration
# 日志配置
logging:
level:
org.springframework: warn
org.apache.dubbo: warn
com.alibaba.nacos: warn
org.mybatis.spring.mapper: error
org.apache.dubbo.config: error
org.apache.kafka: DEBUG
org.springframework.kafka: DEBUG
# 临时处理 spring 调整日志级别导致启动警告问题 不影响使用等待 alibaba 适配
org.springframework.context.support.PostProcessorRegistrationDelegate: error
config: classpath:logback-plus.xml

View File

@ -1,5 +1,20 @@
package org.dromara.data2es.schedule;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.date.DateUnit;
import cn.hutool.core.date.DateUtil;
import cn.hutool.json.JSONObject;
import org.dromara.common.redis.utils.RedisUtils;
import org.dromara.data2es.domain.EsGpsInfoVO2;
import org.dromara.data2es.service.IGpsService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.Scheduled;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
/**
* <p>description: </p>
*
@ -7,7 +22,34 @@ package org.dromara.data2es.schedule;
* @date 2021-05-18 18:23
*/
@Configuration
public class RedisOnlineUserSchedule {

    @Autowired
    IGpsService gpsService;

    /**
     * Every 20 minutes, scans the "online_users:*" keys in Redis and collects
     * devices that are marked online but whose last GPS fix is older than
     * 1800 seconds (30 minutes); those are handed to
     * {@link IGpsService#updateDataStatus} to be flagged offline.
     * Devices of type "05" are exempt, as are entries already offline.
     */
    @Scheduled(cron = "0 0/20 * * * ?")
    public void redisTimeOutRemove() {
        List<JSONObject> jlist = RedisUtils.searchAndGetKeysValues("online_users:*");
        List<EsGpsInfoVO2> gpsInfoVO2s = new ArrayList<>();
        for (JSONObject job : jlist) {
            // Device type "05" is exempt from the timeout rule.
            if ("05".equals(job.getStr("deviceType"))) {
                continue;
            }
            // BUG FIX: getInt returns null when the field is absent; the original
            // `0 == online` unboxed it and threw NullPointerException. Treat a
            // missing/zero flag as offline and skip.
            Integer online = job.getInt("online");
            if (online == null || 0 == online) {
                continue;
            }
            EsGpsInfoVO2 vo2 = BeanUtil.toBean(job, EsGpsInfoVO2.class);
            // Guard gpsTime: DateUtil.between would NPE on a null timestamp.
            if (vo2.getGpsTime() == null) {
                continue;
            }
            // Online but silent for more than 1800 s -> candidate for offline flagging.
            if (1 == vo2.getOnline()
                && DateUtil.between(vo2.getGpsTime(), new Date(), DateUnit.SECOND) > 1800L) {
                gpsInfoVO2s.add(vo2);
            }
        }
        if (!gpsInfoVO2s.isEmpty()) {
            gpsService.updateDataStatus(gpsInfoVO2s);
        }
    }
}

View File

@ -14,4 +14,6 @@ public interface IGpsService {
/** Persists a batch of GPS records. */
R saveDataBatch(List<EsGpsInfoVO2> esGpsInfoVO2s);
/** Updates the stored online status for the given GPS records — presumably used to flag timed-out devices offline; confirm against callers. */
R updateDataStatus(List<EsGpsInfoVO2> esGpsInfoVO2s);
}

View File

@ -124,7 +124,7 @@ public class GpsServiceImpl implements IGpsService {
requestHandler.sendToKafka(info);
}
requestHandler.redisOnlineUserBatch(onlineUserDataMap, 2592000); //存放30天
requestHandler.redisOnlineUserBatch(onlineUserDataMap, 864000); //存放10天（864000秒）
requestHandler.redisOnlineUserBatch(orgCodeDataMap, 1800); //此处和buildRedisMap方法判断在线的时间保持一致
// requestHandler.redisDeleteBatch(deleteKeys); //此处换成了数据库的模式 所以这个就不用了
@ -133,6 +133,24 @@ public class GpsServiceImpl implements IGpsService {
return R.ok();
}
@Override
public R updateDataStatus(List<EsGpsInfoVO2> esGpsInfoVO2s) {
    // Collect one Redis entry per record, then write them all in one batch call.
    Map<String, String> redisBatch = new HashMap<>();
    for (EsGpsInfoVO2 gpsInfo : esGpsInfoVO2s) {
        // Key layout: ONLINE_USERS prefix + zzjgdm:deviceType:deviceCode
        String redisKey = RedisConstants.ONLINE_USERS
            + gpsInfo.getZzjgdm() + ":" + gpsInfo.getDeviceType()
            + ":" + gpsInfo.getDeviceCode();
        redisBatch.put(redisKey, JSONUtil.toJsonStr(gpsInfo));
        // Each record is also forwarded to Kafka individually.
        requestHandler.sendToKafka(gpsInfo);
    }
    requestHandler.redisOnlineUserBatch(redisBatch, 864000); // TTL: 10 days (864000 s)
    return R.ok();
}
/**
*

View File

@ -46,7 +46,6 @@ public class DeviceRedisServiceImpl implements IDeviceRedisService {
Map<String, DeviceRedis> uniqueMap = new LinkedHashMap<>();
for (DeviceRedis device : list) {
String key = device.getDeviceCode() + "|" +
device.getDeviceType() + "|" +
device.getInfoSource();
// 保留最后一次出现的记录(根据业务需求调整)
uniqueMap.put(key, device);

View File

@ -20,7 +20,7 @@
#{entity.deviceCode},#{entity.deviceType},#{entity.online},#{entity.zzjgdm},#{entity.gpsTime},#{entity.infoSource}
)
</foreach>
ON conflict(device_code,device_type,info_source) do update set
ON conflict(device_code,info_source) do update set
(online,zzjgdm,gps_time) =(EXCLUDED.online,EXCLUDED.zzjgdm,EXCLUDED.gps_time)
</insert>