省厅位置汇聚统计、redis值为空判断、location新增点线面查询

stwzhj
luyya 2025-08-05 17:44:48 +08:00
parent 1f906ebb39
commit bd7fbfbde4
38 changed files with 1346 additions and 341 deletions

View File

@ -602,6 +602,9 @@ public class RedisUtils {
public static JSONObject getBucket(String key){
RBucket<Object> bucket = CLIENT.getBucket(key);
Object value = bucket.get();
if (null == value){
return null;
}
return JSONUtil.parseObj(value.toString());
}

View File

@ -1,155 +0,0 @@
package org.dromara.data2es.config;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.config.SslConfigs;
import org.dromara.data2es.producer.NewProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Properties;
/**
* <p>description: </p>
*
* @author chenle
* @date 2021-11-03 14:15
*/
/**
 * Kafka producer configuration for the provincial (省厅) cluster, authenticating
 * via SASL_SSL with the PLAIN mechanism.
 *
 * @author chenle
 * @date 2021-11-03 14:15
 */
@Component
public class KafkaConfig {
    private final Logger logger = LoggerFactory.getLogger(KafkaConfig.class);
    // 省厅 kafka broker list. Alternative environments kept for reference:
    //   53.208.61.105:6667,53.208.61.106:6667,53.208.61.107:6667  六安GA网
    //   34.72.62.93:9092                                          六安视频网
    //   127.0.0.1:9092                                            本地
    //   53.207.8.71:9092,...                                      省厅 马伟提供
    private String kafkaServers = "53.1.212.25:21009,53.1.212.26:21009,53.1.212.27:21009";
    // Kafka client property keys (unused Kerberos-related fields removed).
    private final String bootstrapServers = "bootstrap.servers";
    private final String clientId = "client.id";
    private final String keySerializer = "key.serializer";
    private final String valueSerializer = "value.serializer";
    private final String securityProtocol = "security.protocol";
    // SECURITY NOTE(review): machine-account credentials are hard-coded in source.
    // They should be externalized to protected configuration.
    private static final String USER_NAME = "yhy_ahrs_rcw";
    private static final String PASS_WORD = "Ycgis@2509";
    /**
     * Builds the shared {@link KafkaProducer} bean.
     *
     * @return the configured producer, or {@code null} when security setup fails
     *         (behavior preserved from the original; callers must tolerate a null bean)
     */
    @Bean(name = "myKafkaProducer")
    public KafkaProducer newProducer() {
        Properties props = new Properties();
        // The original wrapped this in "if (true) { ... } else { PLAINTEXT }"; the
        // else-branch was unreachable dead code, so security setup is unconditional.
        try {
            logger.info("Securitymode start.");
            props.put(securityProtocol, "SASL_SSL");
            props.put("sasl.mechanism", "PLAIN"); // 使用 PLAIN 机制
            // SSL 配置: trust store for the broker's TLS certificate.
            props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/home/kafka.truststore.jks");
            props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "Ycgis@2509");
            // NOTE(review): DEFAULT_SSL_KEYSTORE_TYPE is the *value* constant "JKS", not a
            // property key; the original effectively sets props["JKS"]="JKS" (a no-op).
            // Kept byte-identical here — SSL_KEYSTORE_TYPE_CONFIG was probably intended.
            props.put(SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE, "JKS");
            // Inline JAAS configuration for the PLAIN mechanism.
            String jaasConfig = "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"" + USER_NAME + "\" "
                + "password=\"" + PASS_WORD + "\";";
            props.put("sasl.jaas.config", jaasConfig);
        } catch (Exception e) {
            logger.error("Security prepare failure.");
            logger.error("The IOException occured.", e);
            return null;
        }
        logger.info("Security prepare success.");
        // Broker地址列表
        props.put(bootstrapServers, kafkaServers);
        // 客户端ID
        props.put(clientId, "ruansiProducer");
        // Key/Value 序列化类
        props.put(keySerializer, "org.apache.kafka.common.serialization.IntegerSerializer");
        props.put(valueSerializer, "org.apache.kafka.common.serialization.StringSerializer");
        // 批量发送信息配置
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        return new KafkaProducer<>(props);
    }
}

View File

@ -0,0 +1,138 @@
package org.dromara.data2es.config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;
/**
 * Singleton loader for Kafka client configuration. Reads server / producer /
 * consumer / client property files from the fixed deployment directory
 * {@code /home/rsoft/config/} and resolves values across them with a defined
 * search order (see {@link #getPropertiesValue}).
 */
public final class KafkaProperties
{
    private static final Logger LOG = LoggerFactory.getLogger(KafkaProperties.class);
    // Topic名称,安全模式下需要以管理员用户添加当前用户的访问权限
    public final static String TOPIC = "jysb_dwxx";
    private static Properties serverProps = new Properties();
    private static Properties producerProps = new Properties();
    private static Properties consumerProps = new Properties();
    private static Properties clientProps = new Properties();
    private static KafkaProperties instance = null;
    private KafkaProperties()
    {
        // Fixed deployment path (the original resources-relative path is retired).
        String filePath = "/home/rsoft/config/";
        LOG.info("路径=={}", filePath);
        loadIfExists(producerProps, filePath + "producer.properties");
        // BUG FIX: the original re-checked producer.properties before loading
        // consumer.properties, so consumer settings were silently skipped unless
        // producer.properties also existed.
        loadIfExists(consumerProps, filePath + "consumer.properties");
        loadIfExists(serverProps, filePath + "server.properties");
        loadIfExists(clientProps, filePath + "client.properties");
    }
    /**
     * Loads {@code fileName} into {@code target} when the file exists.
     * Streams are closed via try-with-resources (the original leaked FileInputStreams);
     * a failure in one file no longer aborts loading of the remaining files.
     */
    private static void loadIfExists(Properties target, String fileName)
    {
        File file = new File(fileName);
        if (!file.exists())
        {
            return;
        }
        try (FileInputStream in = new FileInputStream(file))
        {
            target.load(in);
        }
        catch (IOException e)
        {
            LOG.info("The Exception occured.", e);
        }
    }
    /** Lazily creates and returns the singleton instance. */
    public synchronized static KafkaProperties getInstance()
    {
        if (null == instance)
        {
            instance = new KafkaProperties();
        }
        return instance;
    }
    /**
     * Resolves {@code key} across all loaded property files.
     *
     * @param key      property key; {@code null} is tolerated and yields the default
     * @param defValue value returned when the key is missing everywhere
     * @return the resolved value or {@code defValue}
     */
    public String getValues(String key, String defValue)
    {
        String rtValue = null;
        if (null == key)
        {
            LOG.error("key is null");
        }
        else
        {
            rtValue = getPropertiesValue(key);
        }
        if (null == rtValue)
        {
            LOG.warn("KafkaProperties.getValues return null, key is {}", key);
            rtValue = defValue;
        }
        LOG.info("KafkaProperties.getValues: key is {}; Value is {}", key, rtValue);
        return rtValue;
    }
    /**
     * Looks the key up in search order: server -> producer -> consumer -> client.
     *
     * @return the first non-null value found, or {@code null}
     */
    private String getPropertiesValue(String key)
    {
        String rtValue = serverProps.getProperty(key);
        // server.properties中没有,则再向producer.properties中获取
        if (null == rtValue)
        {
            rtValue = producerProps.getProperty(key);
        }
        // producer中没有,则再向consumer.properties中获取
        if (null == rtValue)
        {
            rtValue = consumerProps.getProperty(key);
        }
        // consumer没有,则再向client.properties中获取
        if (null == rtValue)
        {
            rtValue = clientProps.getProperty(key);
        }
        return rtValue;
    }
}

View File

@ -1,93 +0,0 @@
package org.dromara.data2es.config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
/**
* <p>description: </p>
*
* @author chenle
* @date 2021-10-28 14:48
*/
/**
 * Kerberos security helpers for the Kafka client: prepares krb5/keytab/JAAS
 * settings and reports whether the client runs in security mode.
 *
 * @author chenle
 * @date 2021-10-28 14:48
 */
public class KafkaSecurityUtil {
    static Logger logger = LoggerFactory.getLogger(KafkaSecurityUtil.class);
    /**
     * keytab file name of the machine account.
     */
    private static final String USER_KEYTAB_FILE = "user.keytab";
    /**
     * Kerberos principal of the machine account.
     */
    private static final String USER_PRINCIPAL = "yhy_ahrs_rcw@A528C942_01A6_1BEF_7A75_0187DC82C40F.COM";
    /**
     * Points the JVM at the krb5.conf / keytab files under the fixed deployment
     * directory {@code /rsoft/config/} and writes the JAAS configuration.
     *
     * @throws IOException if the JAAS file cannot be written by LoginUtil
     */
    public static void securityPrepare() throws IOException
    {
        String krbFile = "/rsoft/config/krb5.conf";
        String userKeyTableFile = "/rsoft/config/user.keytab";
        // windows路径下分隔符替换 (escape backslashes for the JAAS file format)
        userKeyTableFile = userKeyTableFile.replace("\\", "\\\\");
        krbFile = krbFile.replace("\\", "\\\\");
        LoginUtil.setKrb5Config(krbFile);
        LoginUtil.setZookeeperServerPrincipal("zookeeper/A528C942_01A6_1BEF_7A75_0187DC82C40F.COM");
        LoginUtil.setJaasFile(USER_PRINCIPAL, userKeyTableFile);
    }
    /**
     * Reads the classpath resource {@code kafkaSecurityMode} and reports whether
     * {@code kafka.client.security.mode=yes}.
     *
     * @return {@code true} only when the resource exists and the flag is "yes";
     *         any failure falls back to {@code false} (behavior preserved)
     */
    public static Boolean isSecurityModel()
    {
        Boolean isSecurity = false;
        InputStream inputStream = Thread.currentThread().getContextClassLoader().getResourceAsStream("kafkaSecurityMode");
        if (inputStream == null)
        {
            // BUG FIX: the original passed null to Properties.load and relied on the
            // resulting NPE being swallowed; handle the missing resource explicitly.
            logger.info("kafkaSecurityMode resource not found; assuming non-security mode.");
            return isSecurity;
        }
        Properties securityProps = new Properties();
        // try-with-resources: the original never closed the stream.
        try (InputStream in = inputStream)
        {
            securityProps.load(in);
            if ("yes".equalsIgnoreCase(securityProps.getProperty("kafka.client.security.mode")))
            {
                isSecurity = true;
            }
        }
        catch (Exception e)
        {
            // Keep the swallow-and-default behavior, but log at error (was info).
            logger.error("The Exception occured.", e);
        }
        return isSecurity;
    }
    /**
     * @return whether {@code fileName} exists on disk (currently unused helper).
     */
    private static boolean isFileExists(String fileName)
    {
        return new File(fileName).exists();
    }
}

View File

@ -1,7 +1,5 @@
package org.dromara.data2es.config;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -207,7 +205,7 @@ public class LoginUtil {
public static void securityPrepare(String principal, String keyTabFile) throws IOException {
// String filePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator;
String filePath = "/rsoft/config/";
String filePath = "/home/rsoft/config/";
String krbFile = filePath + "krb5.conf";
String userKeyTableFile = filePath + keyTabFile;
@ -227,8 +225,8 @@ public class LoginUtil {
*/
public static Boolean isSecurityModel() {
Boolean isSecurity = false;
String krbFilePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator + "kafkaSecurityMode";
// String krbFilePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator + "kafkaSecurityMode";
String krbFilePath = "/home/rsoft/config/kafkaSecurityMode";
Properties securityProps = new Properties();
// file does not exist.

View File

@ -1,24 +1,40 @@
package org.dromara.data2es.config;
import org.dromara.data2es.handler.RedisExpireListener;
import org.dromara.data2es.handler.RedisExpireRecoveryHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
@Configuration
public class RedisListenerConfig {
@Bean
RedisMessageListenerContainer listenerContainer(RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer listenerContainer = new RedisMessageListenerContainer();
listenerContainer.setConnectionFactory(connectionFactory);
return listenerContainer;
RedisMessageListenerContainer listenerContainer(
RedisConnectionFactory connectionFactory,
RedisExpireRecoveryHandler recoveryHandler) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
// 添加连接监听器用于故障转移恢复
container.addMessageListener(recoveryHandler, new PatternTopic("__keyspace@*__:expired"));
return container;
}
@Bean
KeyExpirationEventMessageListener redisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
return new RedisExpireListener(listenerContainer);
KeyExpirationEventMessageListener redisKeyExpirationListener(
RedisMessageListenerContainer listenerContainer,
RedisExpireRecoveryHandler recoveryHandler) {
return new RedisExpireListener(listenerContainer, recoveryHandler);
}
@Bean
RedisExpireRecoveryHandler redisExpireRecoveryHandler() {
return new RedisExpireRecoveryHandler();
}
}

View File

@ -1,5 +1,6 @@
package org.dromara.data2es.dubbo;
import cn.hutool.core.bean.BeanUtil;
import lombok.RequiredArgsConstructor;
import org.apache.dubbo.config.annotation.DubboService;
import org.dromara.common.core.domain.R;
@ -21,6 +22,6 @@ public class RemoteDataToEsServiceImpl implements RemoteDataToEsService {
@Override
public R saveDataBatch(List<RemoteGpsInfo> gpsInfoList) {
return gpsService.saveDataBatch(MapstructUtils.convert(gpsInfoList, EsGpsInfoVO2.class));
return gpsService.saveDataBatch(BeanUtil.copyToList(gpsInfoList, EsGpsInfoVO2.class));
}
}

View File

@ -8,52 +8,78 @@ import org.dromara.common.core.utils.RedisConstants;
import org.dromara.common.redis.utils.RedisUtils;
import org.dromara.data2es.controller.DataToEsController;
import org.dromara.data2es.domain.EsGpsInfoVO2;
import org.redisson.Redisson;
import org.redisson.api.RLock;
import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.redisson.connection.ConnectionListener;
import org.redisson.connection.ConnectionManager;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;
import java.util.Date;
import javax.annotation.PostConstruct;
import java.net.InetSocketAddress;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
/**
* <p>description: </p>
*
* @author chenle
* @date 2021-11-08 16:40
*/
@Component
@Slf4j
public class RedisExpireListener extends KeyExpirationEventMessageListener {
private final RedisExpireRecoveryHandler recoveryHandler;
@Autowired
DataToEsController dataToEsController;
private volatile boolean active = true;
Logger logger = LoggerFactory.getLogger(RedisExpireListener.class);
public RedisExpireListener(
RedisMessageListenerContainer listenerContainer,
RedisExpireRecoveryHandler recoveryHandler) {
/**
* Creates new {@link MessageListener} for {@code __keyevent@*__:expired} messages.
*
* @param listenerContainer must not be {@literal null}.
*/
public RedisExpireListener(RedisMessageListenerContainer listenerContainer) {
super(listenerContainer);
this.recoveryHandler = recoveryHandler;
recoveryHandler.registerListener(this);
}
@Override
public void init() {
try {
super.init();
log.info("Redis过期监听器初始化成功");
} catch (Exception e) {
log.error("监听器初始化失败", e);
}
}
public void reconnect() {
if (!active) return;
try {
log.info("尝试重新注册过期事件监听器...");
// 停止当前监听
super.destroy();
// 重新初始化
super.init();
log.info("过期事件监听器重新注册成功");
} catch (Exception e) {
log.error("重新注册监听器失败", e);
}
}
@Override
public void onMessage(Message message, byte[] pattern) {
if (!active) return;
String expireKey = message.toString();
log.info("过期的Key={}", expireKey);
if (StringUtils.isNotEmpty(expireKey) &&
expireKey.startsWith(RedisConstants.ORG_CODE_PRE)) {
log.info("在线定位过期的Key={}", expireKey);
handleExpiredEvent(expireKey);
}
}
@ -61,6 +87,7 @@ public class RedisExpireListener extends KeyExpirationEventMessageListener {
private void handleExpiredEvent(String expiredKey) {
RedissonClient redisson = RedisUtils.getClient();
RLock lock = redisson.getLock("LOCK:" + expiredKey);
try {
if (lock.tryLock(0, 30, TimeUnit.SECONDS)) {
// 实际业务逻辑
@ -68,22 +95,69 @@ public class RedisExpireListener extends KeyExpirationEventMessageListener {
String zzjgdm = split[1];
String deviceType = split[2];
String deviceCode = split[3];
log.error("redis key expired:key={}",expiredKey);
JSONObject object = RedisUtils.getBucket(RedisConstants.ONLINE_USERS+ zzjgdm +":" + deviceType+":"+deviceCode);
if (Objects.isNull(object)) {
log.info("redis key={},Object=nulldeviceType={},deviceCode={}", expiredKey,deviceType,deviceCode);
if ("5".equals(deviceType) ) {
return;
}
log.info("处理过期Key: {}", expiredKey);
JSONObject object = RedisUtils.getBucket(RedisConstants.ONLINE_USERS +zzjgdm +":"+ deviceType + ":" + deviceCode);
if (Objects.isNull(object)) {
log.info("redis key={},Object=nulldeviceType={},deviceCode={}",
expiredKey, deviceType, deviceCode);
return;
}
EsGpsInfoVO2 gpsInfo = BeanUtil.toBean(object, EsGpsInfoVO2.class);
gpsInfo.setOnline(0);
dataToEsController.saveGpsInfo(gpsInfo);
log.info("redis key expired:key={}", expiredKey);
log.info("处理完成: key={}", expiredKey);
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
log.error("处理过期事件被中断", e);
} catch (Exception e) {
log.error("处理过期事件异常", e);
} finally {
if (lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
}
@Override
public void destroy() {
active = false;
try {
super.destroy();
} catch (Exception e) {
throw new RuntimeException(e);
}
log.info("Redis过期监听器已停止");
}
// 添加连接状态监听使用Redisson事件总线
@PostConstruct
public void addSentinelConnectionListener() {
try {
RedissonClient redisson = RedisUtils.getClient();
// 订阅Redisson连接事件
RTopic connectionEvents = redisson.getTopic("__redisson_connection_event");
connectionEvents.addListener(String.class, (channel, msg) -> {
if ("CONNECTED".equals(msg)) {
log.info("Redis连接已建立: {}", msg);
// 标记需要恢复监听
recoveryHandler.markReconnected();
} else if ("DISCONNECTED".equals(msg)) {
log.warn("Redis连接断开: {}", msg);
}
});
log.info("已注册Redisson连接事件监听器");
} catch (Exception e) {
log.warn("无法添加Redisson连接事件监听器", e);
}
}
}

View File

@ -0,0 +1,36 @@
package org.dromara.data2es.handler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
/**
 * Recovery hook for the Redis key-expiration listener: when a reconnect has been
 * flagged via {@link #markReconnected()}, the next incoming keyspace event triggers
 * a one-shot re-registration of the main {@code RedisExpireListener}.
 * Thread-safe via atomics; the flag is consumed exactly once per reconnect.
 */
@Component
public class RedisExpireRecoveryHandler implements MessageListener {
    private static final Logger log = LoggerFactory.getLogger(RedisExpireRecoveryHandler.class);
    // Set when a reconnect is detected; cleared when the recovery runs.
    private final AtomicBoolean reconnected = new AtomicBoolean(false);
    // The main expiration listener to re-register after a reconnect.
    private final AtomicReference<RedisExpireListener> listenerRef = new AtomicReference<>();
    /** Registers the main listener that should be re-initialized after reconnects. */
    public void registerListener(RedisExpireListener listener) {
        this.listenerRef.set(listener);
    }
    /**
     * On any observed Redis event, re-register the main listener if a reconnect
     * was flagged.
     * BUG FIX: the listener is read into a local and checked BEFORE the flag is
     * consumed. The original evaluated compareAndSet first, so when no listener was
     * registered yet the reconnect flag was cleared and the recovery silently lost;
     * it also called listenerRef.get() twice (check-then-act race).
     */
    @Override
    public void onMessage(Message message, byte[] pattern) {
        RedisExpireListener listener = listenerRef.get();
        if (listener != null && reconnected.compareAndSet(true, false)) {
            log.warn("检测到Redis事件,尝试重新注册主监听器...");
            listener.reconnect();
        }
    }
    /** Flags that the Redis connection was re-established (e.g. after failover). */
    public void markReconnected() {
        reconnected.set(true);
    }
}

View File

@ -13,7 +13,7 @@ import org.apache.commons.lang.StringUtils;
import org.dromara.common.redis.utils.RedisUtils;
import org.dromara.data2es.domain.EsGpsInfo;
import org.dromara.data2es.domain.EsGpsInfoVO2;
import org.dromara.data2es.producer.NewProducer;
import org.dromara.data2es.producer.Producer;
import org.dromara.data2es.service.IGpsService;
import org.dromara.data2es.util.ConfigConstants;
import org.elasticsearch.action.bulk.BulkRequest;
@ -34,7 +34,7 @@ import java.util.concurrent.CompletableFuture;
public class RequestHandler {
@Autowired
private NewProducer producer;
private Producer producer;
@Autowired
private RestHighLevelClient restHighLevelClient;
@ -70,12 +70,12 @@ public class RequestHandler {
//kafkaProducer.send(esGpsInfo, ConfigConstants.KAFKA_TOPIC_SEND_PRE+"."+ infoSource);
//todo 2023年3月30日 cpu过载暂时隐藏
producer.send(ConfigConstants.KAFKA_TOPIC_SEND_PRE+"."+deviceType, JSON.toJSONString(esGpsInfoVO2));
logger.info("发送消息topic={}",esGpsInfoVO2);
producer.sendMessage(ConfigConstants.KAFKA_TOPIC_SEND_PRE+"."+deviceType, JSON.toJSONString(esGpsInfoVO2));
//kafkaProducer.send(esGpsInfoVO2, ConfigConstants.KAFKA_TOPIC_SEND_PRE+"."+deviceType);
//地市的kafka数据如接收地市某个设备的数据可以对接此kafka topic
//todo 暂时隐藏
producer.send(ConfigConstants.KAFKA_TOPIC_SEND_PRE+"."+infoSource+"."+deviceType,JSON.toJSONString(esGpsInfoVO2));
// producer.sendMessage(ConfigConstants.KAFKA_TOPIC_SEND_PRE+"."+infoSource+"."+deviceType,JSON.toJSONString(esGpsInfoVO2));
}
}

View File

@ -16,7 +16,7 @@ import javax.annotation.Resource;
* @author chenle
* @date 2021-11-01 17:20
*/
@Component
//@Component
public class NewProducer {
@Autowired

View File

@ -0,0 +1,213 @@
package org.dromara.data2es.producer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.dromara.data2es.config.KafkaProperties;
import org.dromara.data2es.config.LoginUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.Future;
/**
* <p>description: </p>
*
* @author chenle
* @date 2021-11-03 14:15
*/
/**
 * Kafka producer wrapper used to publish location messages. Connection
 * properties come from {@link KafkaProperties}; Kerberos login runs first when
 * the security model is enabled. Exposed both as a Spring bean ({@code @Component})
 * and as a lazily-created singleton via {@link #getInstance()}.
 *
 * @author chenle
 * @date 2021-11-03 14:15
 */
@Component
public class Producer {
    private static final Logger logger = LoggerFactory.getLogger(Producer.class);
    private final KafkaProducer<String, String> producer;
    // 私有静态实例(volatile 保证可见性和有序性)
    private static volatile Producer instance;
    // true: fire-and-forget with callback; false: block on the send future.
    private final Boolean isAsync = true;
    // Kafka client property keys.
    private final static String BOOTSTRAP_SERVER = "bootstrap.servers";
    private final static String CLIENT_ID = "client.id";
    private final static String KEY_SERIALIZER = "key.serializer";
    private final static String VALUE_SERIALIZER = "value.serializer";
    // 协议类型:当前支持配置为SASL_PLAINTEXT或者PLAINTEXT
    private final static String SECURITY_PROTOCOL = "security.protocol";
    private final static String SASL_KERBEROS_SERVICE_NAME = "sasl.kerberos.service.name";
    private final static String KERBEROS_DOMAIN_NAME = "kerberos.domain.name";
    /**
     * keytab file name of the machine account.
     */
    private static final String USER_KEYTAB_FILE = "user.keytab";
    /**
     * Kerberos principal (machine account).
     */
    private static final String USER_PRINCIPAL = "yhy_ahrs_rcw";
    /**
     * Performs security preparation (when enabled) and creates the producer.
     */
    public Producer() {
        initSecurity();
        this.producer = new KafkaProducer<>(initProperties());
    }
    /**
     * Double-checked-locking singleton accessor.
     */
    public static Producer getInstance() {
        if (instance == null) {
            synchronized (Producer.class) {
                if (instance == null) {
                    instance = new Producer();
                }
            }
        }
        return instance;
    }
    // Close the singleton's producer on JVM shutdown so buffered records flush.
    static {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            if (instance != null && instance.producer != null) {
                instance.producer.close();
            }
        }));
    }
    /**
     * Runs the Kerberos login sequence when the security model is enabled.
     * Failures are logged and swallowed (behavior preserved from the original).
     */
    public void initSecurity() {
        if (LoginUtil.isSecurityModel())
        {
            try {
                logger.info("Securitymode start.");
                // !!注意,安全认证时,需要用户手动修改为自己申请的机机账号
                LoginUtil.securityPrepare(USER_PRINCIPAL, USER_KEYTAB_FILE);
            } catch (IOException e) {
                logger.error("Security prepare failure.");
                logger.error("The IOException occured.", e);
            }
            logger.info("Security prepare success.");
        }
    }
    /**
     * Builds producer properties from {@link KafkaProperties} with defaults.
     */
    public static Properties initProperties() {
        Properties props = new Properties();
        KafkaProperties kafkaProc = KafkaProperties.getInstance();
        // Broker地址列表
        props.put(BOOTSTRAP_SERVER, kafkaProc.getValues(BOOTSTRAP_SERVER, "localhost:21007"));
        // 客户端ID
        props.put(CLIENT_ID, kafkaProc.getValues(CLIENT_ID, "DemoProducer"));
        // Key序列化类
        props.put(KEY_SERIALIZER,
            kafkaProc.getValues(KEY_SERIALIZER, "org.apache.kafka.common.serialization.StringSerializer"));
        // Value序列化类
        props.put(VALUE_SERIALIZER,
            kafkaProc.getValues(VALUE_SERIALIZER, "org.apache.kafka.common.serialization.StringSerializer"));
        // 协议类型:当前支持配置为SASL_PLAINTEXT或者PLAINTEXT
        props.put(SECURITY_PROTOCOL, kafkaProc.getValues(SECURITY_PROTOCOL, "SASL_PLAINTEXT"));
        // 服务名
        props.put(SASL_KERBEROS_SERVICE_NAME, "kafka");
        // 域名
        props.put(KERBEROS_DOMAIN_NAME, kafkaProc.getValues(KERBEROS_DOMAIN_NAME, "hadoop.hadoop.com"));
        return props;
    }
    /**
     * Sends a message to Kafka.
     *
     * @param topic   destination topic
     * @param message payload
     * @return broker metadata for a synchronous send; {@code null} for async sends
     *         or on error (behavior preserved)
     */
    public RecordMetadata sendMessage(String topic, String message) {
        try {
            logger.info("发送消息topic={},info={}",topic,message);
            long startTime = System.currentTimeMillis();
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);
            if (isAsync) {
                // 异步发送
                producer.send(record, new DemoCallBack(startTime, topic, message));
                return null;
            }
            // Synchronous send. BUG FIX: the original called future.get() twice;
            // block once and reuse the result.
            Future<RecordMetadata> future = producer.send(record);
            RecordMetadata metadata = future.get();
            logger.info("同步发送成功: Object={}", metadata.topic());
            return metadata;
        } catch (InterruptedException e) {
            // BUG FIX: restore the interrupt flag instead of discarding it.
            Thread.currentThread().interrupt();
            logger.error("Message sending interrupted", e);
        } catch (Exception e) {
            // BUG FIX: was e.printStackTrace(); log with context and the cause.
            logger.error("Message sending failed: topic={}", topic, e);
        }
        return null;
    }
    /**
     * Async-send callback that logs delivery metadata or the failure.
     */
    private static class DemoCallBack implements Callback {
        private final Logger logger = LoggerFactory.getLogger(DemoCallBack.class);
        private final long startTime;
        private final String topic;
        private final String message;
        public DemoCallBack(long startTime, String topic, String message) {
            this.startTime = startTime;
            this.topic = topic;
            this.message = message;
        }
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            long elapsedTime = System.currentTimeMillis() - startTime;
            if (metadata != null) {
                logger.info("topic=({}, {}) sent to partition({}), offset({}) in {} ms",
                    topic, message, metadata.partition(), metadata.offset(), elapsedTime);
            } else if (exception != null) {
                logger.error("Message sending failed", exception);
            }
        }
    }
}

View File

@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.io.IOException;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
@ -47,7 +48,7 @@ import java.util.*;
@Service
public class GpsServiceImpl implements IGpsService {
@Autowired
@Resource(name = "restHighLevelClient")
private RestHighLevelClient restHighLevelClient;
@DubboReference
@ -112,6 +113,7 @@ public class GpsServiceImpl implements IGpsService {
}
//设置地市zzjgdm
info = getInfoByInfoSource(info);
//redis
buildRedisMap(info,onlineUserDataMap,orgCodeDataMap,deleteKeys);
@ -122,9 +124,9 @@ public class GpsServiceImpl implements IGpsService {
requestHandler.sendToKafka(info);
}
requestHandler.redisOnlineUserBatch(onlineUserDataMap, RedisConstants.REDIS_ONLINE_USER_NEVER_EXPIRE);
requestHandler.redisOnlineUserBatch(orgCodeDataMap, 300);
requestHandler.redisDeleteBatch(deleteKeys);
requestHandler.redisOnlineUserBatch(onlineUserDataMap, 2592000); //存放30天
requestHandler.redisOnlineUserBatch(orgCodeDataMap, 3600); //此处和buildRedisMap方法判断在线的时间一直
// requestHandler.redisDeleteBatch(deleteKeys); //此处换成了数据库的模式 所以这个就不用了
requestHandler.esRealBulkSave(bulkRequest);
@ -151,7 +153,7 @@ public class GpsServiceImpl implements IGpsService {
esGpsInfoVO2.setPoliceNo(vo.getPoliceNo());
esGpsInfoVO2.setCarNum(vo.getCarNum());
String deviceType = vo.getDeviceType();
if(StringUtils.isNotBlank(deviceType)){
/*if(StringUtils.isNotBlank(deviceType)){
deviceType = deviceType.replaceAll("\"", "");
if(deviceType.charAt(0) == '0' && deviceType.length() > 1){
deviceType = deviceType.substring(1);
@ -159,13 +161,37 @@ public class GpsServiceImpl implements IGpsService {
deviceType = "2";
}
}
}
}*/
esGpsInfoVO2.setDeviceType(deviceType);
}else {
esGpsInfoVO2.setDeviceType("99");
esGpsInfoVO2.setZzjgdm(esGpsInfo.getInfoSource()+"00000000");
}
}else {
String deviceType = esGpsInfoVO2.getDeviceType();
switch (deviceType){
case "1" :
esGpsInfoVO2.setDeviceType("03");
break;
case "2" :
esGpsInfoVO2.setDeviceType("01");
break;
case "3" :
esGpsInfoVO2.setDeviceType("03");
break;
case "4" :
esGpsInfoVO2.setDeviceType("04");
break;
case "5" :
esGpsInfoVO2.setDeviceType("05");
break;
case "8" :
esGpsInfoVO2.setDeviceType("01");
break;
default:
esGpsInfoVO2.setDeviceType("99");
}
}
@ -207,6 +233,7 @@ public class GpsServiceImpl implements IGpsService {
return todayIndexName;
}
//存入数据到索引
private IndexRequest getIndexRequest(String indexName, GpsInfoEntity gpsInfoEntity) {
Date gpsTime = gpsInfoEntity.getGpsTime();
@ -215,6 +242,14 @@ public class GpsServiceImpl implements IGpsService {
gpsInfoEntity.setGpsTime(dateTime.toJdkDate());
}
Map<String, Object> map = BeanUtil.beanToMap(gpsInfoEntity);
Map<String, Object> geoShape = new HashMap<>();
geoShape.put("type", "Point"); // 注意首字母大写!
geoShape.put("coordinates", Arrays.asList(
gpsInfoEntity.getLocation()[1], // 经度 (X)
gpsInfoEntity.getLocation()[0] // 纬度 (Y)
));
map.put("location_shape", geoShape); // geo_shape格式
UUID uuid = UUID.randomUUID();
gpsInfoEntity.setId(uuid.toString());
IndexRequest indexRequest = new IndexRequest(indexName,"_doc",uuid.toString()).source(map);
@ -232,7 +267,10 @@ public class GpsServiceImpl implements IGpsService {
boolean exists = false;
try {
exists = restHighLevelClient.indices().exists(request, RequestOptions.DEFAULT);
} catch (IOException e) {
} catch (Exception e) {
if (e.getMessage().contains("403 Forbidden")){
return false;
}
e.printStackTrace();
}
// logger.error("exists,{}",exists);
@ -296,13 +334,7 @@ public class GpsServiceImpl implements IGpsService {
//2、计算某个类型的 keys *:deviceType:*,计算某个市局某个类型 keys org_code:3401*:2*
String orgCodeKey = RedisConstants.ORG_CODE_PRE + zzjgdm + ":" + deviceType
+ ":" + deviceCode;
if(esGpsInfoVo2.getOnline() == 1) {
orgCodeDataMap.put(orgCodeKey, jsonValue);
}else{
deleteKeys.add(orgCodeKey); //离线的删除
}
}
}
}
@ -319,12 +351,18 @@ public class GpsServiceImpl implements IGpsService {
.startObject("deviceCode")
.field("type", "text")
.endObject()
.startObject("infoSource")
.field("type", "text")
.endObject()
.startObject("deviceType")
.field("type", "text")
.endObject()
.startObject("location")
.field("type", "geo_point")
.endObject()
.startObject("location_shape")
.field("type", "geo_shape")
.endObject()
.startObject("orientation")
.field("type", "text")
.endObject()
@ -349,7 +387,7 @@ public class GpsServiceImpl implements IGpsService {
.put("index.number_of_replicas", 1));
logger.error("generateMappingRequest-index创建成功");
} catch (IOException e) {
} catch (Exception e) {
e.printStackTrace();
logger.error("generateMappingRequest-index创建失败");
}

View File

@ -0,0 +1,44 @@
package org.dromara.data2es.util;
import org.apache.commons.codec.binary.Base64;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* base64
*
* @since 2020-09-30
*/
/**
 * Base64 encode/decode helpers using an explicit UTF-8 charset.
 * Reworked to use {@code java.util.Base64} (standard library) instead of
 * commons-codec; fully-qualified names avoid a clash with the legacy
 * commons-codec import at the top of this file. The unused log4j logger
 * field was removed. Helpers widened to public for reuse (backward compatible).
 *
 * @since 2020-09-30
 */
public class Base64Utils {
    /**
     * Command-line entry point: prints the Base64 encoding of the first argument.
     *
     * @param args args[0] is the plain text to encode; blank/missing input prints nothing
     */
    public static void main(String[] args) {
        if (args != null
            && args.length >= 1
            && args[0] != null
            && !args[0].isEmpty()) {
            System.out.println(encodeBase64(args[0]));
        }
    }
    /**
     * Encodes plain text as Base64.
     * BUG FIX: the original used the platform default charset via getBytes();
     * UTF-8 is now explicit so output is identical across platforms.
     *
     * @param needEncodeString plain text to encode
     * @return Base64 representation of the UTF-8 bytes
     */
    public static String encodeBase64(String needEncodeString) {
        return java.util.Base64.getEncoder()
            .encodeToString(needEncodeString.getBytes(java.nio.charset.StandardCharsets.UTF_8));
    }
    /**
     * Decodes a Base64 string back to UTF-8 text.
     *
     * @param needDecodeBase64Str Base64 text to decode
     * @return the decoded plain text
     */
    public static String decodeBase64(String needDecodeBase64Str) {
        byte[] result = java.util.Base64.getDecoder().decode(needDecodeBase64Str);
        return new String(result, java.nio.charset.StandardCharsets.UTF_8);
    }
}

View File

@ -0,0 +1,80 @@
package org.dromara.data2es.util;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.hwclient.HwRestClient;
import java.io.File;
import java.io.IOException;
/**
*
*
* @since 2020-09-30
*/
/**
 * Helpers for building and probing the Huawei MRS secured Elasticsearch REST client.
 *
 * @since 2020-09-30
 */
public class HwRestClientUtils {
    private static final Logger LOG = LogManager.getLogger(HwRestClientUtils.class);

    /** Position of the configuration-path argument on the command line. */
    private static final int CONFIG_PATH_ARGUMENT_INDEX = 0;

    /**
     * Builds a {@link HwRestClient} from an optional configuration-path argument.
     *
     * <p>The argument may point to a configuration directory or to a single
     * configuration file; when it is missing, empty or does not exist, the
     * default client configuration is used.</p>
     *
     * @param args command-line arguments; {@code args[0]} is the optional config path
     * @return a configured {@link HwRestClient}
     */
    public static HwRestClient getHwRestClient(String[] args) {
        // No usable argument -> default configuration.
        if (args == null
            || args.length < 1
            || args[CONFIG_PATH_ARGUMENT_INDEX] == null
            || args[CONFIG_PATH_ARGUMENT_INDEX].isEmpty()) {
            return new HwRestClient();
        }
        String configPath = args[CONFIG_PATH_ARGUMENT_INDEX];
        File configFile = new File(configPath);
        if (!configFile.exists()) {
            // Non-existent path -> fall back to defaults.
            return new HwRestClient();
        }
        if (configFile.isDirectory()) {
            return new HwRestClient(configPath);
        }
        // A single file: split into parent directory + file name.
        try {
            String canonicalPath = configFile.getCanonicalPath();
            String parentDir =
                canonicalPath.substring(0, canonicalPath.lastIndexOf(File.separator) + 1);
            return new HwRestClient(parentDir, configFile.getName());
        } catch (IOException e) {
            return new HwRestClient();
        }
    }

    /**
     * Checks whether an index exists using the high-level REST client.
     *
     * @param highLevelClient the high-level client to query with
     * @param indexName       index to probe
     * @return true if the index exists; false if it does not or the call failed
     */
    public static boolean isExistIndexForHighLevel(RestHighLevelClient highLevelClient, String indexName) {
        GetIndexRequest existsRequest = new GetIndexRequest(indexName);
        try {
            return highLevelClient.indices().exists(existsRequest, RequestOptions.DEFAULT);
        } catch (IOException e) {
            LOG.error("Judge index exist {} failed", indexName, e);
        }
        return false;
    }
}

View File

@ -104,38 +104,48 @@
<artifactId>stwzhj-api-resource</artifactId>
</dependency>
<!--elasticsearch-->
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>7.14.0</version>
<exclusions>
<exclusion>
<artifactId>log4j-api</artifactId>
<groupId>org.apache.logging.log4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
<version>7.14.0</version>
<version>7.10.2-h0.cbu.mrs.350.r11</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.14.0</version>
<version>7.10.2-h0.cbu.mrs.350.r11</version>
<exclusions>
<exclusion>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<groupId>org.elasticsearch.plugin</groupId>
<artifactId>parent-join-client</artifactId>
</exclusion>
<exclusion>
<artifactId>elasticsearch-rest-client</artifactId>
<groupId>org.elasticsearch.client</groupId>
<groupId>org.elasticsearch.plugin</groupId>
<artifactId>aggs-matrix-stats-client</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>7.10.2-h0.cbu.mrs.350.r11</version>
</dependency>
<!-- JTS 几何库 -->
<dependency>
<groupId>org.locationtech.jts</groupId>
<artifactId>jts-core</artifactId>
<version>1.18.2</version>
</dependency>
<!-- Spatial4j 用于空间计算 -->
<dependency>
<groupId>org.locationtech.spatial4j</groupId>
<artifactId>spatial4j</artifactId>
<version>0.8</version>
</dependency>
<!-- kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>

View File

@ -19,9 +19,9 @@ import java.util.List;
* restHighLevelClient
*/
@Slf4j
@Data
@Configuration
@ConfigurationProperties(prefix = "elasticsearch")
//@Data
//@Configuration
//@ConfigurationProperties(prefix = "elasticsearch")
public class ElasticsearchConfig {
// es host ip 地址(集群)

View File

@ -0,0 +1,39 @@
package org.dromara.location.config;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.hwclient.HwRestClient;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
/**
 * <p>description: Elasticsearch client configuration backed by the Huawei MRS
 * secured {@code HwRestClient}.</p>
 *
 * @author chenle
 * @date 2021-07-05 18:22
 */
@Component(value = "esConfig")
public class EsConfig {

    /** System property that overrides the security-config directory at deploy time. */
    private static final String CONFIG_PATH_PROPERTY = "es.config.path";

    /** Historical default location of the HwRestClient security configuration. */
    private static final String DEFAULT_CONFIG_PATH = "/home/rsoft/config/";

    /** Thread-safe, cached formatter for the daily index suffix. */
    private static final DateTimeFormatter DAY_FORMATTER = DateTimeFormatter.ofPattern("yyyyMMdd");

    private String prefix = "gpsinfo";

    /**
     * @return the per-day index name, e.g. {@code gpsinfo20250805}
     */
    public String indexNameByDay(){
        return prefix + LocalDate.now().format(DAY_FORMATTER);
    }

    /**
     * Builds the shared {@link RestHighLevelClient}; closed automatically on shutdown.
     *
     * @return the application-wide high-level ES client
     */
    @Bean(destroyMethod = "close",name = "restHighLevelClient")
    public RestHighLevelClient restClient() {
        // Configurable via -Des.config.path=...; defaults to the original hard-coded path.
        String configPath = System.getProperty(CONFIG_PATH_PROPERTY, DEFAULT_CONFIG_PATH);
        HwRestClient hwRestClient = new HwRestClient(configPath);
        return new RestHighLevelClient(hwRestClient.getRestClientBuilder());
    }
}

View File

@ -4,12 +4,21 @@ package org.dromara.location.controller;
import lombok.RequiredArgsConstructor;
import org.dromara.common.core.domain.R;
import org.dromara.common.web.core.BaseController;
import org.dromara.location.domain.EsGpsInfoVO2;
import org.dromara.location.domain.SpatialQueryRequest;
import org.dromara.location.service.ISearchService;
import org.dromara.location.service.impl.CorrectGeoQueryService;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.springframework.util.CollectionUtils;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Map;
@ -20,6 +29,8 @@ public class ElasticSearchController extends BaseController {
private final ISearchService searchService;
private final CorrectGeoQueryService geoQueryService;
@RequestMapping("/searchCar")
public R searchByType(@RequestBody Map<String,Object> params){
//String startTime,String endTime,String deviceId
@ -39,4 +50,40 @@ public class ElasticSearchController extends BaseController {
return R.ok(gpsInfoEntities);
}
/**
 * Spatial query (point / line / polygon) over today's GPS index, returning the
 * latest record per distinct device inside the requested area.
 *
 * @param request spatial parameters plus an optional gpsTime range
 * @return R.ok with the matched records, or R.fail on invalid input / internal error
 */
@PostMapping("/spatial-query")
public R spatialQuery(@RequestBody SpatialQueryRequest request) {
    // Index is rolled daily: rs_gpsinfoYYYYMMDD.
    String todayIndexName = "rs_gpsinfo"+ LocalDate.now().format(DateTimeFormatter.ofPattern("yyyyMMdd"));
    try {
        // NOTE(review): startTime is never read — looks like leftover timing code.
        long startTime = System.currentTimeMillis();
        // 1. Build the spatial part (shape depends on request.queryType).
        QueryBuilder spatialQuery = geoQueryService.buildSpatialQuery(request);
        // 2. Wrap in a bool filter and add the optional gpsTime range.
        BoolQueryBuilder boolQuery = QueryBuilders.boolQuery()
            .filter(spatialQuery);
        if (request.getStartTime() != null && request.getEndTime() != null) {
            boolQuery.filter(QueryBuilders.rangeQuery("gpsTime")
                .gte(request.getStartTime())
                .lte(request.getEndTime()));
        }
        // 3. One latest record per device (deduplicated by deviceCode).
        List<EsGpsInfoVO2> results = searchService.queryDistinctDevicesNearPoint(boolQuery,todayIndexName);
        // 4. Standard response envelope.
        return R.ok(results);
    } catch (IllegalArgumentException e) {
        // Invalid request parameters reported by the query builder.
        return R.fail(e.getMessage());
    } catch (Exception e) {
        // NOTE(review): exception is swallowed with no logging — consider logging it.
        return R.fail("服务器内部错误");
    }
}
}

View File

@ -40,7 +40,6 @@ public class LocationController {
* */
@PostMapping("/getAllLocation")
public R getAllLocaltion(@RequestBody Map<String,Object> params){
String now = DateUtil.format(new Date(),"YYYY-MM-dd");
String keys = "online_users:";
String key = null; // 在不同条件下赋值 最后根据此key取值
if(CollectionUtils.isEmpty(params)){

View File

@ -0,0 +1,36 @@
package org.dromara.location.domain;
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.Data;
import java.io.Serializable;
import java.util.Date;
@Data
public class EsGpsInfo implements Serializable {
    private static final long serialVersionUID = 6429544067398830194L;
    /**
     * Device identifier (terminal code).
     */
    private String deviceCode;
    // Device type dictionary code — presumably dict type zd_device_type; confirm against caller.
    private String deviceType;
    // Latitude / longitude kept as strings, as received from the source feed.
    private String lat;
    private String lng;
    // Heading / bearing.
    private String orientation;
    // Elevation.
    private String height;
    // Positioning accuracy (original comment: "精度") — TODO confirm exact semantics.
    private String deltaH;
    private String speed;
    // GPS fix timestamp, serialized as "yyyy-MM-dd HH:mm:ss" in GMT+8.
    @JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone="GMT+8")
    private Date gpsTime;
    // City code of the reporting source, e.g. 3401, 3402.
    private String infoSource;
    // Online flag — presumably 1 = online; confirm with producer.
    private Integer online;
}

View File

@ -0,0 +1,21 @@
package org.dromara.location.domain;
import lombok.Data;
import lombok.EqualsAndHashCode;
/**
 * <p>description: GPS record enriched with organization / officer / vehicle attributes.</p>
 *
 * <p>{@code callSuper = true} makes Lombok include the inherited GPS fields
 * (deviceCode, gpsTime, ...) in equals/hashCode; the default would compare only
 * the fields declared here, so two records for different devices could compare equal.</p>
 *
 * @author chenle
 * @date 2021-10-11 15:14
 */
@Data
@EqualsAndHashCode(callSuper = true)
public class EsGpsInfoVO2 extends EsGpsInfo {
    private static final long serialVersionUID = -4252583194984423318L;
    /** Organization code. */
    private String zzjgdm;
    /** Organization name. */
    private String zzjgmc;
    /** Police officer number. */
    private String policeNo;
    /** Police officer name. */
    private String policeName;
    private String phoneNum;
    private String carNum;
}

View File

@ -0,0 +1,53 @@
package org.dromara.location.domain;
import lombok.Data;
import java.util.List;
@Data
public class SpatialQueryRequest {
    /** Kind of spatial filter to build. */
    public enum QueryType { POINT, LINE, POLYGON }
    private QueryType queryType;
    // Point query: circle around `center` with the given radius.
    private Point center;
    private Double radius;
    private DistanceUnit unit = DistanceUnit.KILOMETERS;
    // Line query: polyline vertices.
    private List<Point> line;
    private Double buffer; // buffer distance around the line, in meters
    // Polygon query: ring vertices (the service re-appends the first point to close the ring).
    private List<Point> polygon;
    // Optional gpsTime range filter.
    private String startTime;
    private String endTime;
    // Optional paging — NOTE(review): not currently honored by the aggregation-based search.
    private Integer page = 0;
    private Integer size = 100;
    /** Longitude/latitude pair — assumed WGS-84 decimal degrees; TODO confirm datum. */
    @Data
    public static class Point {
        private double lon;
        private double lat;
    }
    /** Distance units, mapped onto the Elasticsearch {@code DistanceUnit} enum. */
    public enum DistanceUnit {
        METERS(org.elasticsearch.common.unit.DistanceUnit.METERS),
        KILOMETERS(org.elasticsearch.common.unit.DistanceUnit.KILOMETERS),
        MILES(org.elasticsearch.common.unit.DistanceUnit.MILES);
        private final org.elasticsearch.common.unit.DistanceUnit esUnit;
        DistanceUnit(org.elasticsearch.common.unit.DistanceUnit esUnit) {
            this.esUnit = esUnit;
        }
        public org.elasticsearch.common.unit.DistanceUnit getEsUnit() {
            return esUnit;
        }
    }
}

View File

@ -0,0 +1,49 @@
package org.dromara.location.domain.entity;
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.Data;
import org.springframework.data.annotation.Id;
import java.io.Serializable;
import java.util.Date;
/**
 * <p>description: GPS record document stored in Elasticsearch (one index per day).</p>
 *
 * @author chenle
 * @date 2021-05-14 9:39
 */
@Data
//@Document(indexName = "#{esConfig.indexNameByDay()}" ,shards = 3,replicas = 1,createIndex = false)
public class GpsInfoEntity implements Serializable {
    private static final long serialVersionUID = 7233463305371277306L;
    @Id
    private String id;
    private String deviceCode;
    /**
     * Device type dictionary code.
     */
    private String deviceType;
    // Coordinate pair for the ES geo_point field — assumed [lon, lat]; TODO confirm against the mapping.
    private Double[] location;
    // Heading / bearing.
    private String orientation;
    // Elevation.
    private String height;
    // Positioning accuracy.
    private String deltaH;
    private String speed;
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
    private Date gpsTime;
    // City code of the reporting source, e.g. 3401, 3402.
    private String infoSource;
}

View File

@ -1,5 +1,9 @@
package org.dromara.location.service;
import org.dromara.location.domain.EsGpsInfoVO2;
import org.elasticsearch.index.query.QueryBuilder;
import java.io.IOException;
import java.util.List;
import java.util.Map;
@ -7,5 +11,7 @@ import java.util.Map;
public interface ISearchService {
    /**
     * Queries raw GPS track records for one device within a time window.
     *
     * @param deviceCode device identifier
     * @param startTime  range start (inclusive)
     * @param endTime    range end (inclusive)
     * @param deviceType device type dictionary code
     * @return matched records as raw source maps
     */
    public List<Map> searchCar(String deviceCode, String startTime, String endTime,String deviceType) ;
    /**
     * Returns the latest record per distinct device matching the given spatial query.
     *
     * @param spatialQuery point/line/polygon filter (optionally combined with a time range)
     * @param indexName    Elasticsearch index to search
     * @return one latest record per device
     * @throws IOException on transport errors talking to Elasticsearch
     */
    List<EsGpsInfoVO2> queryDistinctDevicesNearPoint(QueryBuilder spatialQuery, String indexName) throws IOException;
}

View File

@ -0,0 +1,172 @@
package org.dromara.location.service.impl;
import org.dromara.location.domain.SpatialQueryRequest;
import org.elasticsearch.common.geo.ShapeRelation;
import org.elasticsearch.common.geo.builders.*;
import org.elasticsearch.common.unit.DistanceUnit;
import org.elasticsearch.index.query.*;
import org.locationtech.jts.geom.*;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.List;
@Service
public class CorrectGeoQueryService {

    /** Shared JTS factory used to build line geometries and their buffers. */
    private static final GeometryFactory GEOMETRY_FACTORY = new GeometryFactory();

    /**
     * Builds the spatial part of an Elasticsearch query from the request.
     *
     * @param request spatial parameters; {@code queryType} selects the builder
     * @return a geo_distance or geo_shape query
     * @throws IllegalArgumentException for unsupported types or invalid parameters
     * @throws IOException if ES shape construction fails
     */
    public QueryBuilder buildSpatialQuery(SpatialQueryRequest request) throws IOException {
        switch (request.getQueryType()) {
            case POINT:
                return buildPointQuery(request);
            case LINE:
                return buildLineQuery(request);
            case POLYGON:
                return buildPolygonQuery(request);
            default:
                throw new IllegalArgumentException("不支持的查询类型: " + request.getQueryType());
        }
    }

    /**
     * Point query: geo_distance circle on the "location" geo_point field.
     */
    private QueryBuilder buildPointQuery(SpatialQueryRequest request) {
        if (request.getCenter() == null || request.getRadius() == null) {
            throw new IllegalArgumentException("点查询需要center和radius参数");
        }
        return QueryBuilders.geoDistanceQuery("location")
            .point(request.getCenter().getLat(), request.getCenter().getLon())
            .distance(request.getRadius(), request.getUnit().getEsUnit());
    }

    /**
     * Line query: buffers the polyline (JTS) and intersects the resulting polygon
     * with the "location_shape" geo_shape field.
     */
    private QueryBuilder buildLineQuery(SpatialQueryRequest request) throws IOException {
        if (request.getLine() == null || request.getLine().size() < 2) {
            throw new IllegalArgumentException("线查询需要至少两个点");
        }
        if (request.getBuffer() == null || request.getBuffer() <= 0) {
            throw new IllegalArgumentException("线查询需要有效的缓冲区距离");
        }
        // Build the JTS polyline (x = lon, y = lat).
        Coordinate[] coords = new Coordinate[request.getLine().size()];
        for (int i = 0; i < request.getLine().size(); i++) {
            SpatialQueryRequest.Point p = request.getLine().get(i);
            coords[i] = new Coordinate(p.getLon(), p.getLat());
        }
        LineString jtsLine = GEOMETRY_FACTORY.createLineString(coords);
        // Buffer distance is given in meters but geometry is in degrees — approximate conversion.
        double bufferInDegrees = metersToDegrees(request.getBuffer());
        Geometry buffer = jtsLine.buffer(bufferInDegrees);
        // Convert the buffered JTS geometry into an ES shape.
        ShapeBuilder shapeBuilder = convertGeometryToEsShape(buffer);
        return QueryBuilders.geoShapeQuery("location_shape", shapeBuilder)
            .relation(ShapeRelation.INTERSECTS);
    }

    /**
     * Polygon query: intersects a closed ring with the "location_shape" field.
     * NOTE(review): the first vertex is always re-appended; if callers already
     * send a closed ring this duplicates a point — confirm ES tolerates that.
     */
    private QueryBuilder buildPolygonQuery(SpatialQueryRequest request) {
        if (request.getPolygon() == null || request.getPolygon().size() < 3) {
            throw new IllegalArgumentException("面查询需要至少三个点");
        }
        // 1. Collect the ring coordinates.
        CoordinatesBuilder coordsBuilder = new CoordinatesBuilder();
        // 2. Add every vertex in order.
        for (SpatialQueryRequest.Point p : request.getPolygon()) {
            coordsBuilder.coordinate(p.getLon(), p.getLat());
        }
        // Close the ring by repeating the first vertex.
        coordsBuilder.coordinate(
            request.getPolygon().get(0).getLon(),
            request.getPolygon().get(0).getLat()
        );
        // 3. Build the polygon straight from the coordinates.
        PolygonBuilder polygonBuilder = new PolygonBuilder(coordsBuilder);
        // 4. Wrap in a geo_shape query.
        try {
            return QueryBuilders.geoShapeQuery("location_shape", polygonBuilder.buildGeometry())
                .relation(ShapeRelation.INTERSECTS);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Rough meters-to-degrees conversion (1 degree ~ 111 km at the equator);
     * ignores latitude-dependent longitude scaling.
     */
    private double metersToDegrees(double meters) {
        return meters / 111000.0; // 1度 ≈ 111公里
    }

    /**
     * Converts a JTS Geometry (buffer result) into an ES ShapeBuilder.
     * Only Polygon and MultiPolygon can come out of a line buffer.
     */
    private ShapeBuilder convertGeometryToEsShape(Geometry geometry) {
        if (geometry instanceof Polygon) {
            return convertJtsPolygon((Polygon) geometry);
        } else if (geometry instanceof MultiPolygon) {
            return convertJtsMultiPolygon((MultiPolygon) geometry);
        } else {
            throw new IllegalArgumentException("不支持的几何类型: " + geometry.getGeometryType());
        }
    }

    /**
     * Converts a JTS Polygon (shell + optional holes) into an ES PolygonBuilder.
     */
    private ShapeBuilder convertJtsPolygon(Polygon polygon) {
        // 1. Exterior ring coordinates.
        CoordinatesBuilder shellCoords = new CoordinatesBuilder();
        for (Coordinate coord : polygon.getExteriorRing().getCoordinates()) {
            shellCoords.coordinate(coord.x, coord.y);
        }
        // 2. Polygon builder from the shell.
        PolygonBuilder polygonBuilder = new PolygonBuilder(shellCoords);
        // 3. Add each interior ring (hole) as a LineStringBuilder.
        for (int i = 0; i < polygon.getNumInteriorRing(); i++) {
            // Hole coordinates.
            CoordinatesBuilder holeCoords = new CoordinatesBuilder();
            for (Coordinate coord : polygon.getInteriorRingN(i).getCoordinates()) {
                holeCoords.coordinate(coord.x, coord.y);
            }
            // ES represents holes as line strings.
            LineStringBuilder holeLine = new LineStringBuilder(holeCoords);
            // Register the hole on the polygon.
            polygonBuilder.hole(holeLine);
        }
        return polygonBuilder;
    }

    /**
     * Converts a JTS MultiPolygon into an ES MultiPolygonBuilder, polygon by polygon.
     */
    private ShapeBuilder convertJtsMultiPolygon(MultiPolygon multiPolygon) {
        MultiPolygonBuilder builder = new MultiPolygonBuilder();
        for (int i = 0; i < multiPolygon.getNumGeometries(); i++) {
            Polygon polygon = (Polygon) multiPolygon.getGeometryN(i);
            builder.polygon((PolygonBuilder) convertJtsPolygon(polygon));
        }
        return builder;
    }
}

View File

@ -1,20 +1,25 @@
package org.dromara.location.service.impl;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.date.DateField;
import cn.hutool.core.date.DateTime;
import cn.hutool.core.date.DateUtil;
import lombok.RequiredArgsConstructor;
import org.dromara.location.domain.EsGpsInfoVO2;
import org.dromara.location.service.ISearchService;
import org.elasticsearch.action.search.*;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.common.unit.DistanceUnit;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.*;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.TopHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.springframework.beans.factory.annotation.Autowired;
@ -35,7 +40,7 @@ import java.util.function.Consumer;
@Service
public class SearchServiceImpl implements ISearchService {
@Autowired
@Resource(name = "restHighLevelClient")
private RestHighLevelClient restHighLevelClient;
@ -139,6 +144,61 @@ public class SearchServiceImpl implements ISearchService {
}
/**
 * Returns the most recent GPS record for each distinct device that matches the
 * given spatial (point/line/polygon) query.
 *
 * @param spatialQuery spatial filter, optionally already combined with a time range
 * @param indexName    Elasticsearch index to search
 * @return one latest record per device
 * @throws IOException on transport errors talking to Elasticsearch
 */
public List<EsGpsInfoVO2> queryDistinctDevicesNearPoint(QueryBuilder spatialQuery, String indexName) throws IOException {
    // Group by device; NOTE(review): terms size caps the result at 100 distinct devices.
    TermsAggregationBuilder aggregation = AggregationBuilders.terms("distinct_devices")
        .field("deviceCode.keyword")
        .size(100)
        .subAggregation(
            AggregationBuilders.topHits("latest_record")
                .size(1)
                .sort("gpsTime", SortOrder.DESC)
        );
    // size(0): we only need the aggregation buckets, not the raw hits.
    SearchSourceBuilder sourceBuilder = new SearchSourceBuilder()
        .query(spatialQuery)
        .aggregation(aggregation)
        .size(0);
    SearchRequest request = new SearchRequest(indexName)
        .source(sourceBuilder);
    // Execute the search.
    SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
    // Unpack: one bucket per device, each carrying its single latest hit.
    Terms terms = response.getAggregations().get("distinct_devices");
    List<EsGpsInfoVO2> results = new ArrayList<>();
    for (Terms.Bucket bucket : terms.getBuckets()) {
        // NOTE(review): deviceCode is never used below — the value comes from the hit source.
        String deviceCode = bucket.getKeyAsString();
        // Newest record for this device.
        TopHits topHits = bucket.getAggregations().get("latest_record");
        SearchHit[] hits = topHits.getHits().getHits();
        if (hits.length > 0) {
            SearchHit latestHit = hits[0];
            Map<String, Object> source = latestHit.getSourceAsMap();
            EsGpsInfoVO2 gpsInfoVO2 = BeanUtil.copyProperties(source, EsGpsInfoVO2.class);
            results.add(gpsInfoVO2);
        }
    }
    return results;
}
// 工具方法
public static <T> Consumer<T> consumerWithIndex(BiConsumer<T, Integer> consumer) {

View File

@ -141,7 +141,15 @@ public class IndexStaticsController extends BaseController {
return R.ok(list);
}
/**
 * Per-city device statistics: totals, online counts and per-type breakdown.
 *
 * @return R.ok wrapping one {@code DeviceStaticsVo} per city
 */
@PostMapping("/dsqkStatics")
public R dsqkStatics(){
    return R.ok(redisService.dsStatics());
}

View File

@ -5,6 +5,7 @@ import lombok.Data;
import lombok.EqualsAndHashCode;
import java.io.Serial;
import java.util.Date;
@Data
@TableName("t_device_redis")
@ -21,4 +22,6 @@ public class DeviceRedis {
private String zzjgdm;
private Date gpsTime;
}

View File

@ -19,4 +19,16 @@ public class DeviceStaticsVo implements Serializable {
// Number of devices currently online.
private Integer onlineCo;
// Device type dictionary code (dict type zd_device_type).
private String deviceType;
// Count of devices whose type code is "01" — business label TODO: confirm from dictionary.
private Integer jcco;
// Count of devices whose type code is "03".
private Integer stco;
// Count of devices whose type code is "05".
private Integer jlyco;
// Count of devices whose type code is "04".
private Integer ydjwco;
// Count of devices whose type code is "99", plus any unmapped type codes.
private Integer qtco;
}

View File

@ -3,6 +3,7 @@ package org.dromara.system.mapper;
import org.dromara.common.mybatis.core.mapper.BaseMapperPlus;
import org.dromara.system.domain.DeviceRedis;
import org.dromara.system.domain.vo.DeviceRedisVo;
import org.dromara.system.domain.vo.DeviceStaticsVo;
import java.util.List;
@ -11,4 +12,6 @@ public interface DeviceRedisMapper extends BaseMapperPlus<DeviceRedis,DeviceRedi
int insertBatch(List<DeviceRedis> list);
List<DeviceRedisVo> countByCondition(DeviceRedis redis);
List<DeviceStaticsVo> dsStatics();
}

View File

@ -17,4 +17,6 @@ public interface TDeviceMapper extends BaseMapperPlus<TDevice, TDeviceVo> {
List<DeviceStaticsVo> countByDs();
List<DeviceStaticsVo> countByDsAndType();
}

View File

@ -39,7 +39,7 @@ public class DeviceRedisSchedule {
/*
* Redis online_usert_device_redis
* */
@Scheduled(cron = "0/30 * * * * ?")
@Scheduled(cron = "0 0/5 * * * ?")
public void handleDeviceRedis(){
List<JSONObject> jlist = RedisUtils.searchAndGetKeysValues("online_users:*");
redisService.insertBatch(BeanUtil.copyToList(jlist, DeviceRedis.class));

View File

@ -2,6 +2,7 @@ package org.dromara.system.service;
import org.dromara.system.domain.DeviceRedis;
import org.dromara.system.domain.vo.DeviceRedisVo;
import org.dromara.system.domain.vo.DeviceStaticsVo;
import java.util.List;
@ -9,4 +10,6 @@ public interface IDeviceRedisService {
int insertBatch(List<DeviceRedis> list);
List<DeviceRedisVo> countByCondition(DeviceRedis redis);
List<DeviceStaticsVo> dsStatics();
}

View File

@ -8,7 +8,11 @@ import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.dromara.system.domain.DeviceRedis;
import org.dromara.system.domain.vo.DeviceRedisVo;
import org.dromara.system.domain.vo.DeviceStaticsVo;
import org.dromara.system.domain.vo.SysDictDataVo;
import org.dromara.system.mapper.DeviceRedisMapper;
import org.dromara.system.mapper.SysDictDataMapper;
import org.dromara.system.mapper.TDeviceMapper;
import org.dromara.system.service.IDeviceRedisService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@ -23,6 +27,10 @@ public class DeviceRedisServiceImpl implements IDeviceRedisService {
private final DeviceRedisMapper baseMapper;
private final TDeviceMapper deviceMapper;
private final SysDictDataMapper dictDataMapper;
@Autowired
SqlSessionFactory sqlSessionFactory;
@ -61,4 +69,52 @@ public class DeviceRedisServiceImpl implements IDeviceRedisService {
public List<DeviceRedisVo> countByCondition(DeviceRedis redis) {
return baseMapper.countByCondition(redis);
}
@Override
public List<DeviceStaticsVo> dsStatics() {
    // 1. Per-city totals and online counts (one row per city).
    List<DeviceStaticsVo> list = baseMapper.dsStatics();
    // 2. Per-city, per-type device counts.
    List<DeviceStaticsVo> dsvo = deviceMapper.countByDsAndType();
    // 3. Device-type dictionary; only counts whose type code exists here are merged.
    List<SysDictDataVo> dictDataVos = dictDataMapper.selectDictDataByType("zd_device_type");
    // 4. Merge the per-type counts into the matching per-city row.
    for (DeviceStaticsVo vo : list) {
        for (DeviceStaticsVo staticsVo : dsvo) {
            // Match the per-type row to the city row by organization code.
            if (!vo.getZzjgdm().equals(staticsVo.getZzjgdm())) {
                continue;
            }
            String deviceType = staticsVo.getDeviceType();
            for (SysDictDataVo dataVo : dictDataVos) {
                if (deviceType.equals(dataVo.getDictValue())) {
                    switch (deviceType) {
                        case "01":
                            vo.setJcco(staticsVo.getCo());
                            break;
                        case "03":
                            vo.setStco(staticsVo.getCo());
                            break;
                        case "04":
                            vo.setYdjwco(staticsVo.getCo());
                            break;
                        case "05":
                            vo.setJlyco(staticsVo.getCo());
                            break;
                        case "99":
                            vo.setQtco(staticsVo.getCo());
                            break;
                        default:
                            // Any other dictionary code accumulates into "other".
                            // Guard against NPE: qtco starts as null (Integer) and the
                            // previous code unboxed it directly.
                            int previousQt = vo.getQtco() == null ? 0 : vo.getQtco();
                            vo.setQtco(staticsVo.getCo() + previousQt);
                    }
                    // Dictionary values are unique per type; stop scanning once matched.
                    break;
                }
            }
        }
    }
    return list;
}
}

View File

@ -345,6 +345,7 @@ public class SysDeptServiceImpl implements ISysDeptService {
LambdaQueryWrapper<SysDept> lqw = new LambdaQueryWrapper<>();
lqw.eq(SysDept::getParentId,"0");
lqw.ne(SysDept::getDeptId,"340000000000");
lqw.orderByAsc(SysDept::getDeptId);
return baseMapper.selectDeptList(lqw);
}

View File

@ -8,16 +8,20 @@
</resultMap>
<resultMap id="deviceStaticsResult" type="org.dromara.system.domain.vo.DeviceStaticsVo">
</resultMap>
<!-- Batch upsert of device online-status snapshots. On a (device_code, device_type)
     conflict the online flag, org code and GPS time are refreshed from the incoming
     row via PostgreSQL's EXCLUDED pseudo-table. Fix: EXCLUDED.gpsTime referenced a
     non-existent column (unquoted identifiers fold to lower case); the column is gps_time. -->
<insert id="insertBatch">
    insert into t_device_redis (device_code,device_type,online,zzjgdm,gps_time)
    values
    <foreach collection="list" item="entity" separator=",">
    (
        #{entity.deviceCode},#{entity.deviceType},#{entity.online},#{entity.zzjgdm},#{entity.gpsTime}
    )
    </foreach>
    ON conflict(device_code,device_type) do update set
    (online,zzjgdm,gps_time) =(EXCLUDED.online,EXCLUDED.zzjgdm,EXCLUDED.gps_time)
</insert>
<!-- 全省各类设备总数、在线数 -->
@ -44,4 +48,25 @@
WHERE d.dict_type = 'zd_device_type'
</select>
<!-- Per-city device totals and online counts: one row per prefecture-level dept
     (parent_id = '0', provincial dept excluded). LEFT JOINs keep cities with no
     devices, COALESCE turns missing counts into 0. -->
<select id="dsStatics" resultMap="deviceStaticsResult">
    SELECT d.dept_id zzjgdm,short_name zzjgmc,COALESCE(td.co,0) co,COALESCE(rd.online,0) online FROM
    sys_dept d
    LEFT JOIN
    (SELECT substr(zzjgdm, 1, 4) dept_id,count(*) co from (SELECT * FROM t_device
    WHERE "valid" = 1
    ) r
    GROUP BY substr(zzjgdm,1, 4) HAVING substr(zzjgdm,1,4) is not null ) td
    on substr(d.dept_id,1,4) = td.dept_id
    LEFT JOIN
    (SELECT substr(zzjgdm, 1, 4) dept_id,count(*) online from (SELECT * FROM t_device_redis
    WHERE online = '1'
    ) r
    GROUP BY substr(zzjgdm,1, 4) ) rd
    on substr(d.dept_id,1,4) = rd.dept_id
    WHERE d.parent_id = '0' and d.dept_id != '340000000000' ORDER BY zzjgdm
</select>
</mapper>

View File

@ -12,4 +12,11 @@
SELECT SUBSTR(zzjgdm,1,4) zzjgdm,count(*) co from t_device GROUP BY SUBSTR(zzjgdm,1,4) HAVING SUBSTR(zzjgdm,1,4) is not null
</select>
<!-- Per-city, per-type device counts (valid devices within province code 34 only). -->
<select id="countByDsAndType" resultMap="deviceStaticsResult">
    SELECT substr(zzjgdm, 1, 4) zzjgdm,device_type,count(*) co from (SELECT * FROM t_device
    WHERE "valid" = 1 and zzjgdm like '34%'
    ) r
    GROUP BY substr(zzjgdm,1, 4),device_type HAVING substr(zzjgdm,1,4) is not null ORDER BY zzjgdm
</select>
</select>
</mapper>