From bd7fbfbde4f5b3ac9845ae442b7ab7134c456b5f Mon Sep 17 00:00:00 2001 From: luyya Date: Tue, 5 Aug 2025 17:44:48 +0800 Subject: [PATCH] =?UTF-8?q?=E7=9C=81=E5=8E=85=E4=BD=8D=E7=BD=AE=E6=B1=87?= =?UTF-8?q?=E8=81=9A=E7=BB=9F=E8=AE=A1=E3=80=81redis=E5=80=BC=E4=B8=BA?= =?UTF-8?q?=E7=A9=BA=E5=88=A4=E6=96=AD=E3=80=81location=E6=96=B0=E5=A2=9E?= =?UTF-8?q?=E7=82=B9=E7=BA=BF=E9=9D=A2=E6=9F=A5=E8=AF=A2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../common/redis/utils/RedisUtils.java | 3 + .../dromara/data2es/config/KafkaConfig.java | 155 ------------- .../data2es/config/KafkaProperties.java | 138 ++++++++++++ .../data2es/config/KafkaSecurityUtil.java | 93 -------- .../org/dromara/data2es/config/LoginUtil.java | 8 +- .../data2es/config/RedisListenerConfig.java | 28 ++- .../dubbo/RemoteDataToEsServiceImpl.java | 3 +- .../data2es/handler/RedisExpireListener.java | 132 ++++++++--- .../handler/RedisExpireRecoveryHandler.java | 36 +++ .../data2es/handler/RequestHandler.java | 10 +- .../dromara/data2es/producer/NewProducer.java | 2 +- .../dromara/data2es/producer/Producer.java | 213 ++++++++++++++++++ .../data2es/service/impl/GpsServiceImpl.java | 68 ++++-- .../org/dromara/data2es/util/Base64Utils.java | 44 ++++ .../data2es/util/HwRestClientUtils.java | 80 +++++++ stwzhj-modules/stwzhj-location/pom.xml | 44 ++-- .../location/config/ElasticsearchConfig.java | 6 +- .../org/dromara/location/config/EsConfig.java | 39 ++++ .../controller/ElasticSearchController.java | 47 ++++ .../controller/LocationController.java | 1 - .../dromara/location/domain/EsGpsInfo.java | 36 +++ .../dromara/location/domain/EsGpsInfoVO2.java | 21 ++ .../location/domain/SpatialQueryRequest.java | 53 +++++ .../location/domain/entity/GpsInfoEntity.java | 49 ++++ .../location/service/ISearchService.java | 6 + .../service/impl/CorrectGeoQueryService.java | 172 ++++++++++++++ .../service/impl/SearchServiceImpl.java | 70 +++++- .../system/IndexStaticsController.java | 10 +- .../dromara/system/domain/DeviceRedis.java | 3 + .../system/domain/vo/DeviceStaticsVo.java | 12 + .../system/mapper/DeviceRedisMapper.java | 3 + .../dromara/system/mapper/TDeviceMapper.java | 2 + .../system/schedule/DeviceRedisSchedule.java | 2 +- .../system/service/IDeviceRedisService.java | 3 + .../service/impl/DeviceRedisServiceImpl.java | 56 +++++ .../service/impl/SysDeptServiceImpl.java | 1 + .../mapper/system/DeviceRedisMapper.xml | 31 ++- .../resources/mapper/system/TDeviceMapper.xml | 7 + 38 files changed, 1346 insertions(+), 341 deletions(-) delete mode 100644 stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/config/KafkaConfig.java create mode 100644 stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/config/KafkaProperties.java delete mode 100644 stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/config/KafkaSecurityUtil.java create mode 100644 stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/handler/RedisExpireRecoveryHandler.java create mode 100644 stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/producer/Producer.java create mode 100644 stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/util/Base64Utils.java create mode 100644 stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/util/HwRestClientUtils.java create mode 100644 stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/config/EsConfig.java create mode 100644 stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/domain/EsGpsInfo.java create mode 100644 stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/domain/EsGpsInfoVO2.java create mode 100644 stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/domain/SpatialQueryRequest.java create mode 100644 stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/domain/entity/GpsInfoEntity.java create mode 100644 stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/service/impl/CorrectGeoQueryService.java diff --git a/stwzhj-common/stwzhj-common-redis/src/main/java/org/dromara/common/redis/utils/RedisUtils.java b/stwzhj-common/stwzhj-common-redis/src/main/java/org/dromara/common/redis/utils/RedisUtils.java index bff861b8..a7367e4e 100644 --- a/stwzhj-common/stwzhj-common-redis/src/main/java/org/dromara/common/redis/utils/RedisUtils.java +++ b/stwzhj-common/stwzhj-common-redis/src/main/java/org/dromara/common/redis/utils/RedisUtils.java @@ -602,6 +602,9 @@ public class RedisUtils { public static JSONObject getBucket(String key){ RBucket bucket = CLIENT.getBucket(key); Object value = bucket.get(); + if (null == value){ + return null; + } return JSONUtil.parseObj(value.toString()); } diff --git a/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/config/KafkaConfig.java b/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/config/KafkaConfig.java deleted file mode 100644 index 331a90de..00000000 --- a/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/config/KafkaConfig.java +++ /dev/null @@ -1,155 +0,0 @@ -package org.dromara.data2es.config; - -import org.apache.kafka.clients.admin.NewTopic; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.common.config.SslConfigs; -import org.dromara.data2es.producer.NewProducer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.boot.autoconfigure.kafka.KafkaProperties; -import org.springframework.context.annotation.Bean; -import org.springframework.stereotype.Component; - -import java.io.IOException; -import java.util.Properties; - -/** - *

description:

- * - * @author chenle - * @date 2021-11-03 14:15 - */ -@Component -public class KafkaConfig { - - private Logger logger = LoggerFactory.getLogger(KafkaConfig.class); - - private String kafkaServers = "53.1.212.25:21009,53.1.212.26:21009,53.1.212.27:21009"; //省厅 kafka -// private String kafkaServers = "53.208.61.105:6667,53.208.61.106:6667,53.208.61.107:6667";//六安GA网 -// private String kafkaServers = "34.72.62.93:9092";//六安视频网 -// private String kafkaServers = "127.0.0.1:9092";//本地 -// private String kafkaServers = "53.207.8.71:9092,53.193.3.15:9092,53.160.0.237:9092,53.104.56.58:9092,53.128.22.61:9092";//省厅 马伟提供 - - private String groupId = "ruansiProducer"; - - - private static final Logger LOG = LoggerFactory.getLogger(NewProducer.class); - - - // Broker地址列表 - private final String bootstrapServers = "bootstrap.servers"; - - // 客户端ID - private final String clientId = "client.id"; - - // Key序列化类 - private final String keySerializer = "key.serializer"; - - // Value序列化类 - private final String valueSerializer = "value.serializer"; - - // 协议类型:当前支持配置为SASL_PLAINTEXT或者PLAINTEXT - private final String securityProtocol = "security.protocol"; - - // 服务名 - private final String saslKerberosServiceName = "sasl.kerberos.service.name"; - - // 域名 - private final String kerberosDomainName = "kerberos.domain.name"; - - //默认发送20条消息 - private final int messageNumToSend = 100; - - /** - * 用户自己申请的机机账号keytab文件名称 - */ - private static final String USER_KEYTAB_FILE = "user.keytab"; - - /** - * 用户自己申请的机机账号名称 - */ - private static final String USER_PRINCIPAL = "yhy_ahrs_rcw@A528C942_01A6_1BEF_7A75_0187DC82C40F.COM"; - - private static final String USER_NAME = "yhy_ahrs_rcw"; - - private static final String PASS_WORD = "Ycgis@2509"; - - /** - * 新Producer 构造函数 - * @param - * @param - */ - - @Bean(name = "myKafkaProducer") - public KafkaProducer newProducer() { - Properties props = new Properties(); - - if (true) - { - try - { - logger.info("Securitymode start."); - //!!注意,安全认证时,需要用户手动修改为自己申请的机机账号 -// LoginUtil.securityPrepare(USER_PRINCIPAL, USER_KEYTAB_FILE); - props.put(securityProtocol, "SASL_SSL"); - props.put("sasl.mechanism", "PLAIN"); // 使用 PLAIN 机制 - - // SSL 配置 - 使用系统默认信任库 - props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/home/kafka.truststore.jks"); - props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "Ycgis@2509"); - props.put(SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE, "JKS"); - - // PLAIN 机制的 JAAS 配置 - String jaasConfig = "org.apache.kafka.common.security.plain.PlainLoginModule required " - + "username=\"" + USER_NAME + "\" " - + "password=\"" + PASS_WORD + "\";"; - - props.put("sasl.jaas.config", jaasConfig); - } - catch (Exception e) - { - logger.error("Security prepare failure."); - logger.error("The IOException occured.", e); - return null; - } - logger.info("Security prepare success."); - }else{ - props.put(securityProtocol, "PLAINTEXT"); - } - - - - // Broker地址列表 - props.put(bootstrapServers,kafkaServers); - // 客户端ID - props.put(clientId, "ruansiProducer"); - // Key序列化类 - props.put(keySerializer, - "org.apache.kafka.common.serialization.IntegerSerializer"); - // Value序列化类 - props.put(valueSerializer, - "org.apache.kafka.common.serialization.StringSerializer"); - //批量发送信息配置 - props.put("batch.size", 16384); - props.put("linger.ms", 1); - props.put("buffer.memory", 33554432); - // 协议类型:当前支持配置为SASL_PLAINTEXT或者PLAINTEXT - //props.put(securityProtocol, "SASL_PLAINTEXT"); -// // 服务名 -// props.put(saslKerberosServiceName, "kafka"); -// // 域名 -// props.put(kerberosDomainName, "hadoop.hadoop.com"); - //设置自定义的分区策略类,默认不传key,是粘性分区,尽量往一个分区中发消息。如果key不为null,则默认是按照key的hashcode与 partition的取余来决定哪个partition - //props.put("partitioner.class","com.kafka.myparitioner.CidPartitioner"); -// props.put(securityProtocol, "SASL_PLAINTEXT"); -// props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"zkxc\" password=\"zkxcKafka07252023\";"); -// props.put("sasl.mechanism", "SCRAM-SHA-256"); -// KafkaProducer producer = new KafkaProducer<>(props); - KafkaProducer producer = new KafkaProducer<>(props); - - return producer; - } - - - -} diff --git a/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/config/KafkaProperties.java b/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/config/KafkaProperties.java new file mode 100644 index 00000000..4a157bfc --- /dev/null +++ b/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/config/KafkaProperties.java @@ -0,0 +1,138 @@ +package org.dromara.data2es.config; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.util.Properties; + +public final class KafkaProperties +{ + private static final Logger LOG = LoggerFactory.getLogger(KafkaProperties.class); + + // Topic名称,安全模式下,需要以管理员用户添加当前用户的访问权限 + public final static String TOPIC = "jysb_dwxx"; + + private static Properties serverProps = new Properties(); + + private static Properties producerProps = new Properties(); + + private static Properties consumerProps = new Properties(); + + private static Properties clientProps = new Properties(); + + private static KafkaProperties instance = null; + + private KafkaProperties() + { +// String filePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator; + String filePath = "/home/rsoft/config/"; + LOG.info("路径=={}",filePath); + try + { + File proFile = new File(filePath + "producer.properties"); + + if (proFile.exists()) + { + producerProps.load(new FileInputStream(filePath + "producer.properties")); + } + + File conFile = new File(filePath + "producer.properties"); + + if (conFile.exists()) + { + consumerProps.load(new FileInputStream(filePath + "consumer.properties")); + } + + File serFile = new File(filePath + "server.properties"); + + if (serFile.exists()) + { + serverProps.load(new FileInputStream(filePath + "server.properties")); + } + + File cliFile = new File(filePath + "client.properties"); + + if (cliFile.exists()) + { + clientProps.load(new FileInputStream(filePath + "client.properties")); + } + } + catch (IOException e) + { + LOG.info("The Exception occured.", e); + } + } + + public synchronized static KafkaProperties getInstance() + { + if (null == instance) + { + instance = new KafkaProperties(); + } + + return instance; + } + + /** + * 获取参数值 + * @param key properites的key值 + * @param defValue 默认值 + * @return + */ + public String getValues(String key, String defValue) + { + String rtValue = null; + + if (null == key) + { + LOG.error("key is null"); + } + else + { + rtValue = getPropertiesValue(key); + } + + if (null == rtValue) + { + LOG.warn("KafkaProperties.getValues return null, key is " + key); + rtValue = defValue; + } + + LOG.info("KafkaProperties.getValues: key is " + key + "; Value is " + rtValue); + + return rtValue; + } + + /** + * 根据key值获取server.properties的值 + * @param key + * @return + */ + private String getPropertiesValue(String key) + { + String rtValue = serverProps.getProperty(key); + + // server.properties中没有,则再向producer.properties中获取 + if (null == rtValue) + { + rtValue = producerProps.getProperty(key); + } + + // producer中没有,则再向consumer.properties中获取 + if (null == rtValue) + { + rtValue = consumerProps.getProperty(key); + } + + // consumer没有,则再向client.properties中获取 + if (null == rtValue) + { + rtValue = clientProps.getProperty(key); + } + + return rtValue; + } +} diff --git a/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/config/KafkaSecurityUtil.java b/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/config/KafkaSecurityUtil.java deleted file mode 100644 index acbc5468..00000000 --- a/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/config/KafkaSecurityUtil.java +++ /dev/null @@ -1,93 +0,0 @@ -package org.dromara.data2es.config; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.util.Properties; - -/** - *

description:

- * - * @author chenle - * @date 2021-10-28 14:48 - */ -public class KafkaSecurityUtil { - - static Logger logger = LoggerFactory.getLogger(KafkaSecurityUtil.class); - /** - * 用户自己申请的机机账号keytab文件名称 - */ - private static final String USER_KEYTAB_FILE = "user.keytab"; - - /** - * 用户自己申请的机机账号名称 - */ - private static final String USER_PRINCIPAL = "yhy_ahrs_rcw@A528C942_01A6_1BEF_7A75_0187DC82C40F.COM"; - - public static void securityPrepare() throws IOException - { - //logger.error("进入了---securityPrepare"); - //String filePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator; - //String krbFile = filePath + "krb5.conf"; - //ClassPathResource classPathResource = new ClassPathResource("krb5.conf"); - //String krbFile = classPathResource.getAbsolutePath(); - String krbFile = "/rsoft/config/krb5.conf"; -// String userKeyTableFile = filePath + USER_KEYTAB_FILE; - //ClassPathResource classPathResource1 = new ClassPathResource(USER_KEYTAB_FILE); - String userKeyTableFile = "/rsoft/config/user.keytab"; - - //windows路径下分隔符替换 - userKeyTableFile = userKeyTableFile.replace("\\", "\\\\"); - krbFile = krbFile.replace("\\", "\\\\"); - - LoginUtil.setKrb5Config(krbFile); - LoginUtil.setZookeeperServerPrincipal("zookeeper/A528C942_01A6_1BEF_7A75_0187DC82C40F.COM"); - //logger.error("userKeyTableFile路径---{}",userKeyTableFile); - LoginUtil.setJaasFile(USER_PRINCIPAL, userKeyTableFile); - } - - public static Boolean isSecurityModel() - { - Boolean isSecurity = false; - //String krbFilePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator + "kafkaSecurityMode"; - //ClassPathResource classPathResource = new ClassPathResource("kafkaSecurityMode"); - InputStream inputStream = Thread.currentThread().getContextClassLoader().getResourceAsStream("kafkaSecurityMode"); - - /*File file = classPathResource.getFile(); - - if(!file.exists()){ - return isSecurity; - }*/ - - Properties securityProps = new Properties(); - - - try - { - securityProps.load(inputStream); - if ("yes".equalsIgnoreCase(securityProps.getProperty("kafka.client.security.mode"))) - { - isSecurity = true; - } - } - catch (Exception e) - { - logger.info("The Exception occured : {}.", e); - } - - return isSecurity; - } - - /* - * 判断文件是否存在 - */ - private static boolean isFileExists(String fileName) - { - File file = new File(fileName); - - return file.exists(); - } -} diff --git a/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/config/LoginUtil.java b/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/config/LoginUtil.java index 9f451f53..556f4076 100644 --- a/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/config/LoginUtil.java +++ b/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/config/LoginUtil.java @@ -1,7 +1,5 @@ package org.dromara.data2es.config; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -207,7 +205,7 @@ public class LoginUtil { public static void securityPrepare(String principal, String keyTabFile) throws IOException { // String filePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator; - String filePath = "/rsoft/config/"; + String filePath = "/home/rsoft/config/"; String krbFile = filePath + "krb5.conf"; String userKeyTableFile = filePath + keyTabFile; @@ -227,8 +225,8 @@ public class LoginUtil { */ public static Boolean isSecurityModel() { Boolean isSecurity = false; - String krbFilePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator + "kafkaSecurityMode"; - +// String krbFilePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator + "kafkaSecurityMode"; + String krbFilePath = "/home/rsoft/config/kafkaSecurityMode"; Properties securityProps = new Properties(); // file does not exist. diff --git a/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/config/RedisListenerConfig.java b/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/config/RedisListenerConfig.java index 586af34e..de22c389 100644 --- a/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/config/RedisListenerConfig.java +++ b/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/config/RedisListenerConfig.java @@ -1,24 +1,40 @@ package org.dromara.data2es.config; import org.dromara.data2es.handler.RedisExpireListener; +import org.dromara.data2es.handler.RedisExpireRecoveryHandler; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.listener.KeyExpirationEventMessageListener; +import org.springframework.data.redis.listener.PatternTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; @Configuration public class RedisListenerConfig { @Bean - RedisMessageListenerContainer listenerContainer(RedisConnectionFactory connectionFactory) { - RedisMessageListenerContainer listenerContainer = new RedisMessageListenerContainer(); - listenerContainer.setConnectionFactory(connectionFactory); - return listenerContainer; + RedisMessageListenerContainer listenerContainer( + RedisConnectionFactory connectionFactory, + RedisExpireRecoveryHandler recoveryHandler) { + + RedisMessageListenerContainer container = new RedisMessageListenerContainer(); + container.setConnectionFactory(connectionFactory); + + // 添加连接监听器用于故障转移恢复 + container.addMessageListener(recoveryHandler, new PatternTopic("__keyspace@*__:expired")); + return container; } @Bean - KeyExpirationEventMessageListener redisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) { - return new RedisExpireListener(listenerContainer); + KeyExpirationEventMessageListener redisKeyExpirationListener( + RedisMessageListenerContainer listenerContainer, + RedisExpireRecoveryHandler recoveryHandler) { + + return new RedisExpireListener(listenerContainer, recoveryHandler); + } + + @Bean + RedisExpireRecoveryHandler redisExpireRecoveryHandler() { + return new RedisExpireRecoveryHandler(); } } diff --git a/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/dubbo/RemoteDataToEsServiceImpl.java b/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/dubbo/RemoteDataToEsServiceImpl.java index 716d6600..449f62b4 100644 --- a/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/dubbo/RemoteDataToEsServiceImpl.java +++ b/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/dubbo/RemoteDataToEsServiceImpl.java @@ -1,5 +1,6 @@ package org.dromara.data2es.dubbo; +import cn.hutool.core.bean.BeanUtil; import lombok.RequiredArgsConstructor; import org.apache.dubbo.config.annotation.DubboService; import org.dromara.common.core.domain.R; @@ -21,6 +22,6 @@ public class RemoteDataToEsServiceImpl implements RemoteDataToEsService { @Override public R saveDataBatch(List gpsInfoList) { - return gpsService.saveDataBatch(MapstructUtils.convert(gpsInfoList, EsGpsInfoVO2.class)); + return gpsService.saveDataBatch(BeanUtil.copyToList(gpsInfoList, EsGpsInfoVO2.class)); } } diff --git a/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/handler/RedisExpireListener.java b/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/handler/RedisExpireListener.java index 4430f4ff..82626656 100644 --- a/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/handler/RedisExpireListener.java +++ b/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/handler/RedisExpireListener.java @@ -8,59 +8,86 @@ import org.dromara.common.core.utils.RedisConstants; import org.dromara.common.redis.utils.RedisUtils; import org.dromara.data2es.controller.DataToEsController; import org.dromara.data2es.domain.EsGpsInfoVO2; +import org.redisson.Redisson; import org.redisson.api.RLock; +import org.redisson.api.RTopic; import org.redisson.api.RedissonClient; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.redisson.connection.ConnectionListener; +import org.redisson.connection.ConnectionManager; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.data.redis.connection.Message; -import org.springframework.data.redis.connection.MessageListener; import org.springframework.data.redis.listener.KeyExpirationEventMessageListener; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.stereotype.Component; -import java.util.Date; +import javax.annotation.PostConstruct; +import java.net.InetSocketAddress; import java.util.Objects; import java.util.concurrent.TimeUnit; -/** - *

description:

- * - * @author chenle - * @date 2021-11-08 16:40 - */ @Component @Slf4j public class RedisExpireListener extends KeyExpirationEventMessageListener { + private final RedisExpireRecoveryHandler recoveryHandler; + @Autowired DataToEsController dataToEsController; + private volatile boolean active = true; - Logger logger = LoggerFactory.getLogger(RedisExpireListener.class); + public RedisExpireListener( + RedisMessageListenerContainer listenerContainer, + RedisExpireRecoveryHandler recoveryHandler) { - - /** - * Creates new {@link MessageListener} for {@code __keyevent@*__:expired} messages. - * - * @param listenerContainer must not be {@literal null}. - */ - public RedisExpireListener(RedisMessageListenerContainer listenerContainer) { super(listenerContainer); + this.recoveryHandler = recoveryHandler; + recoveryHandler.registerListener(this); + } + + @Override + public void init() { + try { + super.init(); + log.info("Redis过期监听器初始化成功"); + } catch (Exception e) { + log.error("监听器初始化失败", e); + } + } + + public void reconnect() { + if (!active) return; + + try { + log.info("尝试重新注册过期事件监听器..."); + // 停止当前监听 + super.destroy(); + // 重新初始化 + super.init(); + log.info("过期事件监听器重新注册成功"); + } catch (Exception e) { + log.error("重新注册监听器失败", e); + } } @Override public void onMessage(Message message, byte[] pattern) { + if (!active) return; + String expireKey = message.toString(); - if(StringUtils.isNotEmpty(expireKey) && - expireKey.startsWith(RedisConstants.ORG_CODE_PRE)){ + log.info("过期的Key={}", expireKey); + + if (StringUtils.isNotEmpty(expireKey) && + expireKey.startsWith(RedisConstants.ORG_CODE_PRE)) { + + log.info("在线定位过期的Key={}", expireKey); handleExpiredEvent(expireKey); } } - private void handleExpiredEvent(String expiredKey) { + private void handleExpiredEvent(String expiredKey) { RedissonClient redisson = RedisUtils.getClient(); RLock lock = redisson.getLock("LOCK:" + expiredKey); + try { if (lock.tryLock(0, 30, TimeUnit.SECONDS)) { // 实际业务逻辑 @@ -68,22 +95,69 @@ public class RedisExpireListener extends KeyExpirationEventMessageListener { String zzjgdm = split[1]; String deviceType = split[2]; String deviceCode = split[3]; - log.error("redis key expired:key={}",expiredKey); - JSONObject object = RedisUtils.getBucket(RedisConstants.ONLINE_USERS+ zzjgdm +":" + deviceType+":"+deviceCode); - if (Objects.isNull(object)) { - log.info("redis key={},Object=null,deviceType={},deviceCode={}", expiredKey,deviceType,deviceCode); + + if ("5".equals(deviceType) ) { return; } + + log.info("处理过期Key: {}", expiredKey); + JSONObject object = RedisUtils.getBucket(RedisConstants.ONLINE_USERS +zzjgdm +":"+ deviceType + ":" + deviceCode); + + if (Objects.isNull(object)) { + log.info("redis key={},Object=null,deviceType={},deviceCode={}", + expiredKey, deviceType, deviceCode); + return; + } + EsGpsInfoVO2 gpsInfo = BeanUtil.toBean(object, EsGpsInfoVO2.class); gpsInfo.setOnline(0); dataToEsController.saveGpsInfo(gpsInfo); - log.info("redis key expired:key={}", expiredKey); + log.info("处理完成: key={}", expiredKey); } } catch (InterruptedException e) { - e.printStackTrace(); + Thread.currentThread().interrupt(); + log.error("处理过期事件被中断", e); + } catch (Exception e) { + log.error("处理过期事件异常", e); } finally { - lock.unlock(); + if (lock.isHeldByCurrentThread()) { + lock.unlock(); + } } } + @Override + public void destroy() { + active = false; + try { + super.destroy(); + } catch (Exception e) { + throw new RuntimeException(e); + } + log.info("Redis过期监听器已停止"); + } + + // 添加连接状态监听(使用Redisson事件总线) + @PostConstruct + public void addSentinelConnectionListener() { + try { + RedissonClient redisson = RedisUtils.getClient(); + + // 订阅Redisson连接事件 + RTopic connectionEvents = redisson.getTopic("__redisson_connection_event"); + connectionEvents.addListener(String.class, (channel, msg) -> { + if ("CONNECTED".equals(msg)) { + log.info("Redis连接已建立: {}", msg); + // 标记需要恢复监听 + recoveryHandler.markReconnected(); + } else if ("DISCONNECTED".equals(msg)) { + log.warn("Redis连接断开: {}", msg); + } + }); + + log.info("已注册Redisson连接事件监听器"); + } catch (Exception e) { + log.warn("无法添加Redisson连接事件监听器", e); + } + } } diff --git a/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/handler/RedisExpireRecoveryHandler.java b/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/handler/RedisExpireRecoveryHandler.java new file mode 100644 index 00000000..81bc87a0 --- /dev/null +++ b/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/handler/RedisExpireRecoveryHandler.java @@ -0,0 +1,36 @@ +package org.dromara.data2es.handler; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.data.redis.connection.Message; +import org.springframework.data.redis.connection.MessageListener; +import org.springframework.stereotype.Component; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +@Component +public class RedisExpireRecoveryHandler implements MessageListener { + + private static final Logger log = LoggerFactory.getLogger(RedisExpireRecoveryHandler.class); + + private final AtomicBoolean reconnected = new AtomicBoolean(false); + private final AtomicReference listenerRef = new AtomicReference<>(); + + public void registerListener(RedisExpireListener listener) { + this.listenerRef.set(listener); + } + + @Override + public void onMessage(Message message, byte[] pattern) { + // 检测到任何事件时,检查是否需要恢复监听 + if (reconnected.compareAndSet(true, false) && listenerRef.get() != null) { + log.warn("检测到Redis事件,尝试重新注册主监听器..."); + listenerRef.get().reconnect(); + } + } + + public void markReconnected() { + reconnected.set(true); + } +} diff --git a/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/handler/RequestHandler.java b/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/handler/RequestHandler.java index 438bc8ab..48141846 100644 --- a/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/handler/RequestHandler.java +++ b/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/handler/RequestHandler.java @@ -13,7 +13,7 @@ import org.apache.commons.lang.StringUtils; import org.dromara.common.redis.utils.RedisUtils; import org.dromara.data2es.domain.EsGpsInfo; import org.dromara.data2es.domain.EsGpsInfoVO2; -import org.dromara.data2es.producer.NewProducer; +import org.dromara.data2es.producer.Producer; import org.dromara.data2es.service.IGpsService; import org.dromara.data2es.util.ConfigConstants; import org.elasticsearch.action.bulk.BulkRequest; @@ -34,7 +34,7 @@ import java.util.concurrent.CompletableFuture; public class RequestHandler { @Autowired - private NewProducer producer; + private Producer producer; @Autowired private RestHighLevelClient restHighLevelClient; @@ -70,12 +70,12 @@ public class RequestHandler { //kafkaProducer.send(esGpsInfo, ConfigConstants.KAFKA_TOPIC_SEND_PRE+"."+ infoSource); //todo 2023年3月30日 cpu过载暂时隐藏 - - producer.send(ConfigConstants.KAFKA_TOPIC_SEND_PRE+"."+deviceType, JSON.toJSONString(esGpsInfoVO2)); + logger.info("发送消息topic={}",esGpsInfoVO2); + producer.sendMessage(ConfigConstants.KAFKA_TOPIC_SEND_PRE+"."+deviceType, JSON.toJSONString(esGpsInfoVO2)); //kafkaProducer.send(esGpsInfoVO2, ConfigConstants.KAFKA_TOPIC_SEND_PRE+"."+deviceType); //地市的kafka数据,如接收地市某个设备的数据可以对接此kafka topic //todo 暂时隐藏 - producer.send(ConfigConstants.KAFKA_TOPIC_SEND_PRE+"."+infoSource+"."+deviceType,JSON.toJSONString(esGpsInfoVO2)); +// producer.sendMessage(ConfigConstants.KAFKA_TOPIC_SEND_PRE+"."+infoSource+"."+deviceType,JSON.toJSONString(esGpsInfoVO2)); } } diff --git a/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/producer/NewProducer.java b/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/producer/NewProducer.java index 74d996f0..462df090 100644 --- a/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/producer/NewProducer.java +++ b/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/producer/NewProducer.java @@ -16,7 +16,7 @@ import javax.annotation.Resource; * @author chenle * @date 2021-11-01 17:20 */ -@Component +//@Component public class NewProducer { @Autowired diff --git a/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/producer/Producer.java b/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/producer/Producer.java new file mode 100644 index 00000000..d43a67c1 --- /dev/null +++ b/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/producer/Producer.java @@ -0,0 +1,213 @@ +package org.dromara.data2es.producer; + +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; + +import org.dromara.data2es.config.KafkaProperties; +import org.dromara.data2es.config.LoginUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import java.io.IOException; +import java.util.Properties; +import java.util.concurrent.Future; + +/** + *

description:

+ * + * @author chenle + * @date 2021-11-03 14:15 + */ +@Component +public class Producer { + + private static final Logger logger = LoggerFactory.getLogger(Producer.class); + + private final KafkaProducer producer; + + // 私有静态实例(volatile 保证可见性和有序性) + private static volatile Producer instance; + + + + private final Boolean isAsync = true; + + + + // Broker地址列表 + private final static String BOOTSTRAP_SERVER = "bootstrap.servers"; + + // 客户端ID + private final static String CLIENT_ID = "client.id"; + + // Key序列化类 + private final static String KEY_SERIALIZER = "key.serializer"; + + // Value序列化类 + private final static String VALUE_SERIALIZER = "value.serializer"; + + // 协议类型:当前支持配置为SASL_PLAINTEXT或者PLAINTEXT + private final static String SECURITY_PROTOCOL = "security.protocol"; + + // 服务名 + private final static String SASL_KERBEROS_SERVICE_NAME = "sasl.kerberos.service.name"; + + // 域名 + private final static String KERBEROS_DOMAIN_NAME = "kerberos.domain.name"; + + // 分区类名 + private final static String PARTITIONER_NAME = "partitioner.class"; + + // 默认发送100条消息 + private final static int MESSAGE_NUM = 100; + + /** + * 用户自己申请的机机账号keytab文件名称 + */ + private static final String USER_KEYTAB_FILE = "user.keytab"; + + /** + * 用户自己申请的机机账号名称 + */ + private static final String USER_PRINCIPAL = "yhy_ahrs_rcw"; + + + + /** + * Producer constructor + * + */ + public Producer() { + initSecurity(); + Properties props = initProperties(); + this.producer = new KafkaProducer<>(props); + } + + // 获取单例实例的公共方法(双重校验锁) + public static Producer getInstance() { + if (instance == null) { + synchronized (Producer.class) { + if (instance == null) { + instance = new Producer(); + } + } + } + return instance; + } + + // 添加 ShutdownHook 确保资源释放(推荐) + static { + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + if (instance != null && instance.producer != null) { + instance.producer.close(); + } + })); + } + + + /** + * 初始化安全认证 + */ + public void initSecurity() { + if (LoginUtil.isSecurityModel()) + { + try { + logger.info("Securitymode start."); + + // !!注意,安全认证时,需要用户手动修改为自己申请的机机账号 + LoginUtil.securityPrepare(USER_PRINCIPAL, USER_KEYTAB_FILE); + } catch (IOException e) { + logger.error("Security prepare failure."); + logger.error("The IOException occured.", e); + } + logger.info("Security prepare success."); + } + } + + public static Properties initProperties() { + Properties props = new Properties(); + KafkaProperties kafkaProc = KafkaProperties.getInstance(); + + // Broker地址列表 + props.put(BOOTSTRAP_SERVER, kafkaProc.getValues(BOOTSTRAP_SERVER, "localhost:21007")); + // 客户端ID + props.put(CLIENT_ID, kafkaProc.getValues(CLIENT_ID, "DemoProducer")); + // Key序列化类 + props.put(KEY_SERIALIZER, + kafkaProc.getValues(KEY_SERIALIZER, "org.apache.kafka.common.serialization.StringSerializer")); + // Value序列化类 + props.put(VALUE_SERIALIZER, + kafkaProc.getValues(VALUE_SERIALIZER, "org.apache.kafka.common.serialization.StringSerializer")); + // 协议类型:当前支持配置为SASL_PLAINTEXT或者PLAINTEXT + props.put(SECURITY_PROTOCOL, kafkaProc.getValues(SECURITY_PROTOCOL, "SASL_PLAINTEXT")); + // 服务名 + props.put(SASL_KERBEROS_SERVICE_NAME, "kafka"); + // 域名 + props.put(KERBEROS_DOMAIN_NAME, kafkaProc.getValues(KERBEROS_DOMAIN_NAME, "hadoop.hadoop.com")); + // 分区类名 +// props.put(PARTITIONER_NAME, kafkaProc.getValues(PARTITIONER_NAME, "com.huawei.bigdata.kafka.example.SimplePartitioner")); + return props; + } + + /** + * 发送消息(核心方法) + * + * @param topic + * @param message 消息内容 + * @return 同步发送时返回 RecordMetadata,异步发送返回 null + */ + public RecordMetadata sendMessage(String topic, String message) { + try { + logger.info("发送消息topic={},info={}",topic,message); + long startTime = System.currentTimeMillis(); + ProducerRecord record = new ProducerRecord<>(topic, message); + if (isAsync) { + // 异步发送 + producer.send(record, new DemoCallBack(startTime,topic, message)); + return null; + } else { + Future future = producer.send(record); + logger.info("同步发送成功: Object={}", future.get().topic()); + return future.get(); + + } + }catch (Exception e){ + e.printStackTrace(); + } + return null; + } + + + + /** + * 内部回调类 + */ + private static class DemoCallBack implements Callback { + private final Logger logger = LoggerFactory.getLogger(DemoCallBack.class); + private final long startTime; + + private final String topic; + private final String message; + + public DemoCallBack(long startTime, String topic, String message) { + this.startTime = startTime; + this.topic = topic; + this.message = message; + } + + @Override + public void onCompletion(RecordMetadata metadata, Exception exception) { + long elapsedTime = System.currentTimeMillis() - startTime; + if (metadata != null) { + logger.info("topic=({}, {}) sent to partition({}), offset({}) in {} ms", + topic, message, metadata.partition(), metadata.offset(), elapsedTime); + } else if (exception != null) { + logger.error("Message sending failed", exception); + } + } + } + +} diff --git a/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/service/impl/GpsServiceImpl.java b/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/service/impl/GpsServiceImpl.java index d2860b42..d2fe27b8 100644 --- a/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/service/impl/GpsServiceImpl.java +++ b/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/service/impl/GpsServiceImpl.java @@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import javax.annotation.Resource; import java.io.IOException; import java.time.LocalDate; import java.time.format.DateTimeFormatter; @@ -47,7 +48,7 @@ import java.util.*; @Service public class GpsServiceImpl implements IGpsService { - @Autowired + @Resource(name = "restHighLevelClient") private RestHighLevelClient restHighLevelClient; @DubboReference @@ -112,6 +113,7 @@ public class GpsServiceImpl implements IGpsService { } //设置地市zzjgdm info = getInfoByInfoSource(info); + //redis buildRedisMap(info,onlineUserDataMap,orgCodeDataMap,deleteKeys); @@ -122,9 +124,9 @@ public class GpsServiceImpl implements IGpsService { requestHandler.sendToKafka(info); } - requestHandler.redisOnlineUserBatch(onlineUserDataMap, RedisConstants.REDIS_ONLINE_USER_NEVER_EXPIRE); - requestHandler.redisOnlineUserBatch(orgCodeDataMap, 300); - requestHandler.redisDeleteBatch(deleteKeys); + requestHandler.redisOnlineUserBatch(onlineUserDataMap, 2592000); //存放30天 + requestHandler.redisOnlineUserBatch(orgCodeDataMap, 3600); //此处和buildRedisMap方法判断在线的时间一直 +// requestHandler.redisDeleteBatch(deleteKeys); //此处换成了数据库的模式 所以这个就不用了 requestHandler.esRealBulkSave(bulkRequest); @@ -151,7 +153,7 @@ public class GpsServiceImpl implements IGpsService { esGpsInfoVO2.setPoliceNo(vo.getPoliceNo()); esGpsInfoVO2.setCarNum(vo.getCarNum()); String deviceType = vo.getDeviceType(); - if(StringUtils.isNotBlank(deviceType)){ + /*if(StringUtils.isNotBlank(deviceType)){ deviceType = deviceType.replaceAll("\"", ""); if(deviceType.charAt(0) == '0' && deviceType.length() > 1){ deviceType = deviceType.substring(1); @@ -159,13 +161,37 @@ public class GpsServiceImpl implements IGpsService { deviceType = "2"; } } - } + }*/ esGpsInfoVO2.setDeviceType(deviceType); }else { esGpsInfoVO2.setDeviceType("99"); esGpsInfoVO2.setZzjgdm(esGpsInfo.getInfoSource()+"00000000"); } + }else { + String deviceType = esGpsInfoVO2.getDeviceType(); + switch (deviceType){ + case "1" : + esGpsInfoVO2.setDeviceType("03"); + break; + case "2" : + esGpsInfoVO2.setDeviceType("01"); + break; + case "3" : + esGpsInfoVO2.setDeviceType("03"); + break; + case "4" : + esGpsInfoVO2.setDeviceType("04"); + break; + case "5" : + esGpsInfoVO2.setDeviceType("05"); + break; + case "8" : + esGpsInfoVO2.setDeviceType("01"); + break; + default: + esGpsInfoVO2.setDeviceType("99"); + } } @@ -207,6 +233,7 @@ public class GpsServiceImpl implements IGpsService { return todayIndexName; } + //存入数据到索引 private IndexRequest getIndexRequest(String indexName, GpsInfoEntity gpsInfoEntity) { Date gpsTime = gpsInfoEntity.getGpsTime(); @@ -215,6 +242,14 @@ public class GpsServiceImpl implements IGpsService { gpsInfoEntity.setGpsTime(dateTime.toJdkDate()); } Map map = BeanUtil.beanToMap(gpsInfoEntity); + + Map geoShape = new HashMap<>(); + geoShape.put("type", "Point"); // 注意首字母大写! + geoShape.put("coordinates", Arrays.asList( + gpsInfoEntity.getLocation()[1], // 经度 (X) + gpsInfoEntity.getLocation()[0] // 纬度 (Y) + )); + map.put("location_shape", geoShape); // geo_shape格式 UUID uuid = UUID.randomUUID(); gpsInfoEntity.setId(uuid.toString()); IndexRequest indexRequest = new IndexRequest(indexName,"_doc",uuid.toString()).source(map); @@ -232,7 +267,10 @@ public class GpsServiceImpl implements IGpsService { boolean exists = false; try { exists = restHighLevelClient.indices().exists(request, RequestOptions.DEFAULT); - } catch (IOException e) { + } catch (Exception e) { + if (e.getMessage().contains("403 Forbidden")){ + return false; + } e.printStackTrace(); } // logger.error("exists,{}",exists); @@ -296,13 +334,7 @@ public class GpsServiceImpl implements IGpsService { //2、计算某个类型的 keys *:deviceType:*,计算某个市局某个类型 keys org_code:3401*:2* String orgCodeKey = RedisConstants.ORG_CODE_PRE + zzjgdm + ":" + deviceType + ":" + deviceCode; - - if(esGpsInfoVo2.getOnline() == 1) { - orgCodeDataMap.put(orgCodeKey, jsonValue); - }else{ - deleteKeys.add(orgCodeKey); //离线的删除 - } - + orgCodeDataMap.put(orgCodeKey, jsonValue); } } } @@ -319,12 +351,18 @@ public class GpsServiceImpl implements IGpsService { .startObject("deviceCode") .field("type", "text") .endObject() + .startObject("infoSource") + .field("type", "text") + .endObject() .startObject("deviceType") .field("type", "text") .endObject() .startObject("location") .field("type", "geo_point") .endObject() + .startObject("location_shape") + .field("type", "geo_shape") + .endObject() .startObject("orientation") .field("type", "text") .endObject() @@ -349,7 +387,7 @@ public class GpsServiceImpl implements IGpsService { .put("index.number_of_replicas", 1)); logger.error("generateMappingRequest-index创建成功"); - } catch (IOException e) { + } catch (Exception e) { e.printStackTrace(); logger.error("generateMappingRequest-index创建失败"); } diff --git a/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/util/Base64Utils.java b/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/util/Base64Utils.java new file mode 100644 index 00000000..da1a0505 --- /dev/null +++ b/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/util/Base64Utils.java @@ -0,0 +1,44 @@ +package org.dromara.data2es.util; + +import org.apache.commons.codec.binary.Base64; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +/** + * base64编码工具 + * + * @since 2020-09-30 + */ +public class Base64Utils { + private static final Logger LOG = LogManager.getLogger(Base64Utils.class); + + public static void main(String[] args) { + if (args != null + && args.length >= 1 + && args[0] != null + && !args[0].isEmpty()) { + System.out.println(encodeBase64(args[0])); + } + } + + /** + * base64加密 + * + * @param needEncodeString 需要加密的内容 + * @return 加密结果 + */ + private static String encodeBase64(String needEncodeString) { + return Base64.encodeBase64String(needEncodeString.getBytes()); + } + + /** + * base64解密 + * + * @param needDecodeBase64Str 需要解密的内容 + * @return 解密结果 + */ + private static String decodeBase64(String needDecodeBase64Str) { + byte[] result = Base64.decodeBase64(needDecodeBase64Str.getBytes()); + return new String(result); + } +} diff --git a/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/util/HwRestClientUtils.java b/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/util/HwRestClientUtils.java new file mode 100644 index 00000000..54d8335b --- /dev/null +++ b/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/util/HwRestClientUtils.java @@ -0,0 +1,80 @@ +package org.dromara.data2es.util; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.client.indices.GetIndexRequest; +import org.elasticsearch.hwclient.HwRestClient; + +import java.io.File; +import java.io.IOException; + +/** + * 客户端工具 + * + * @since 2020-09-30 + */ +public class HwRestClientUtils { + private static final Logger LOG = LogManager.getLogger(HwRestClientUtils.class); + /** + * 配置文件路径位置 + */ + private static final int CONFIG_PATH_ARGUMENT_INDEX = 0; + + /** + * 获取HwRestClient + * + * @param args 配置参数 + * @return HwRestClient + */ + public static HwRestClient getHwRestClient(String[] args) { + HwRestClient hwRestClient; + if (args == null + || args.length < 1 + || args[CONFIG_PATH_ARGUMENT_INDEX] == null + || args[CONFIG_PATH_ARGUMENT_INDEX].isEmpty()) { + hwRestClient = new HwRestClient(); + } else { + String configPath = args[CONFIG_PATH_ARGUMENT_INDEX]; + File configFile = new File(configPath); + if (configFile.exists()) { + if (configFile.isDirectory()) { + hwRestClient = new HwRestClient(configPath); + } else { + try { + hwRestClient = + new HwRestClient( + configFile + .getCanonicalPath() + .substring( + 0, + configFile.getCanonicalPath().lastIndexOf(File.separator) + 1), + configFile.getName()); + } catch (IOException e) { + hwRestClient = new HwRestClient(); + } + } + } else { + hwRestClient = new HwRestClient(); + } + } + return hwRestClient; + } + + /** + * high level 客户端,判断索引是否存在 + * + * @param highLevelClient high level 客户端 + * @return 索引是否存在 + */ + public static boolean isExistIndexForHighLevel(RestHighLevelClient highLevelClient, String indexName) { + GetIndexRequest isExistsRequest = new GetIndexRequest(indexName); + try { + return highLevelClient.indices().exists(isExistsRequest, RequestOptions.DEFAULT); + } catch (IOException e) { + LOG.error("Judge index exist {} failed", indexName, e); + } + return false; + } +} diff --git a/stwzhj-modules/stwzhj-location/pom.xml b/stwzhj-modules/stwzhj-location/pom.xml index 17f45728..536bb98f 100644 --- a/stwzhj-modules/stwzhj-location/pom.xml +++ b/stwzhj-modules/stwzhj-location/pom.xml @@ -104,38 +104,48 @@ stwzhj-api-resource - - org.elasticsearch - elasticsearch - 7.14.0 - - - log4j-api - org.apache.logging.log4j - - - org.elasticsearch.client elasticsearch-rest-client - 7.14.0 + 7.10.2-h0.cbu.mrs.350.r11 org.elasticsearch.client elasticsearch-rest-high-level-client - 7.14.0 + 7.10.2-h0.cbu.mrs.350.r11 - org.elasticsearch - elasticsearch + org.elasticsearch.plugin + parent-join-client - elasticsearch-rest-client - org.elasticsearch.client + org.elasticsearch.plugin + aggs-matrix-stats-client + + org.elasticsearch + elasticsearch + 7.10.2-h0.cbu.mrs.350.r11 + + + + + + org.locationtech.jts + jts-core + 1.18.2 + + + + + org.locationtech.spatial4j + spatial4j + 0.8 + + org.springframework.kafka diff --git a/stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/config/ElasticsearchConfig.java b/stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/config/ElasticsearchConfig.java index 5012741c..0d0f185f 100644 --- a/stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/config/ElasticsearchConfig.java +++ b/stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/config/ElasticsearchConfig.java @@ -19,9 +19,9 @@ import java.util.List; * restHighLevelClient 客户端配置类 */ @Slf4j -@Data -@Configuration -@ConfigurationProperties(prefix = "elasticsearch") +//@Data +//@Configuration +//@ConfigurationProperties(prefix = "elasticsearch") public class ElasticsearchConfig { // es host ip 地址(集群) diff --git a/stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/config/EsConfig.java b/stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/config/EsConfig.java new file mode 100644 index 00000000..cf4324bc --- /dev/null +++ b/stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/config/EsConfig.java @@ -0,0 +1,39 @@ +package org.dromara.location.config; + +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.hwclient.HwRestClient; +import org.springframework.context.annotation.Bean; +import org.springframework.stereotype.Component; + +import java.time.LocalDate; +import java.time.format.DateTimeFormatter; + +/** + *

description:

+ * + * @author chenle + * @date 2021-07-05 18:22 + */ +@Component(value = "esConfig") +public class EsConfig { + + private String prefix = "gpsinfo"; + + public String indexNameByDay(){ + return prefix+ LocalDate.now().format(DateTimeFormatter.ofPattern("yyyyMMdd")); + } + + @Bean(destroyMethod = "close",name = "restHighLevelClient") + public RestHighLevelClient restClient() { +// String configPath = System.getProperty("user.dir") + File.separator+ "app_data2es_aq" + File.separator + "conf" + File.separator; + String configPath = "/home/rsoft/config/"; + +// KAFKA("KafkaClient"), ZOOKEEPER("Client"); +// GenerateEnumUtil.addEnum(LoginUtil.Module.class,"KAFKA","KafkaClient"); +// GenerateEnumUtil.addEnum(LoginUtil.Module.class,"ZOOKEEPER","Client"); + HwRestClient hwRestClient = new HwRestClient(configPath); + RestHighLevelClient highLevelClient = new RestHighLevelClient(hwRestClient.getRestClientBuilder()); + return highLevelClient; + } + +} diff --git a/stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/controller/ElasticSearchController.java b/stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/controller/ElasticSearchController.java index 1fe047e1..c3a2effa 100644 --- a/stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/controller/ElasticSearchController.java +++ b/stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/controller/ElasticSearchController.java @@ -4,12 +4,21 @@ package org.dromara.location.controller; import lombok.RequiredArgsConstructor; import org.dromara.common.core.domain.R; import org.dromara.common.web.core.BaseController; +import org.dromara.location.domain.EsGpsInfoVO2; +import org.dromara.location.domain.SpatialQueryRequest; import org.dromara.location.service.ISearchService; +import org.dromara.location.service.impl.CorrectGeoQueryService; +import org.elasticsearch.index.query.BoolQueryBuilder; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; import org.springframework.util.CollectionUtils; +import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; +import java.time.LocalDate; +import java.time.format.DateTimeFormatter; import java.util.List; import java.util.Map; @@ -20,6 +29,8 @@ public class ElasticSearchController extends BaseController { private final ISearchService searchService; + private final CorrectGeoQueryService geoQueryService; + @RequestMapping("/searchCar") public R searchByType(@RequestBody Map params){ //String startTime,String endTime,String deviceId @@ -39,4 +50,40 @@ public class ElasticSearchController extends BaseController { return R.ok(gpsInfoEntities); } + /* + * 点周边查询 + * */ + @PostMapping("/spatial-query") + public R spatialQuery(@RequestBody SpatialQueryRequest request) { + String todayIndexName = "rs_gpsinfo"+ LocalDate.now().format(DateTimeFormatter.ofPattern("yyyyMMdd")); + try { + long startTime = System.currentTimeMillis(); + + // 1. 构建空间查询 + QueryBuilder spatialQuery = geoQueryService.buildSpatialQuery(request); + + // 2. 添加时间范围过滤 + BoolQueryBuilder boolQuery = QueryBuilders.boolQuery() + .filter(spatialQuery); + + if (request.getStartTime() != null && request.getEndTime() != null) { + boolQuery.filter(QueryBuilders.rangeQuery("gpsTime") + .gte(request.getStartTime()) + .lte(request.getEndTime())); + } + + // 3. 执行轨迹查询(按设备去重) + List results = searchService.queryDistinctDevicesNearPoint(boolQuery,todayIndexName); + + // 4. 构建响应 + + return R.ok(results); + + } catch (IllegalArgumentException e) { + return R.fail(e.getMessage()); + } catch (Exception e) { + return R.fail("服务器内部错误"); + } + } + } diff --git a/stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/controller/LocationController.java b/stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/controller/LocationController.java index ff53efa7..c55dbf3f 100644 --- a/stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/controller/LocationController.java +++ b/stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/controller/LocationController.java @@ -40,7 +40,6 @@ public class LocationController { * */ @PostMapping("/getAllLocation") public R getAllLocaltion(@RequestBody Map params){ - String now = DateUtil.format(new Date(),"YYYY-MM-dd"); String keys = "online_users:"; String key = null; // 在不同条件下赋值 最后根据此key取值 if(CollectionUtils.isEmpty(params)){ diff --git a/stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/domain/EsGpsInfo.java b/stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/domain/EsGpsInfo.java new file mode 100644 index 00000000..bde0e4dd --- /dev/null +++ b/stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/domain/EsGpsInfo.java @@ -0,0 +1,36 @@ +package org.dromara.location.domain; + +import com.fasterxml.jackson.annotation.JsonFormat; +import lombok.Data; + +import java.io.Serializable; +import java.util.Date; + +@Data +public class EsGpsInfo implements Serializable { + private static final long serialVersionUID = 6429544067398830194L; + + /** + * 设备串号,设备唯一值 + */ + private String deviceCode; + private String deviceType; + private String lat; + private String lng; + //方向 + private String orientation; + //高程 + private String height; + //精度 + private String deltaH; + private String speed; + + @JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone="GMT+8") + private Date gpsTime; + //3401 ,3402 地市代码 + private String infoSource; + + private Integer online; + + +} diff --git a/stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/domain/EsGpsInfoVO2.java b/stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/domain/EsGpsInfoVO2.java new file mode 100644 index 00000000..17468335 --- /dev/null +++ b/stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/domain/EsGpsInfoVO2.java @@ -0,0 +1,21 @@ +package org.dromara.location.domain; + +import lombok.Data; + +/** + *

description:

+ * + * @author chenle + * @date 2021-10-11 15:14 + */ +@Data +public class EsGpsInfoVO2 extends EsGpsInfo { + private static final long serialVersionUID = -4252583194984423318L; + + private String zzjgdm; + private String zzjgmc; + private String policeNo; + private String policeName; + private String phoneNum; + private String carNum; +} diff --git a/stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/domain/SpatialQueryRequest.java b/stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/domain/SpatialQueryRequest.java new file mode 100644 index 00000000..d502b277 --- /dev/null +++ b/stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/domain/SpatialQueryRequest.java @@ -0,0 +1,53 @@ +package org.dromara.location.domain; + +import lombok.Data; +import java.util.List; + +@Data +public class SpatialQueryRequest { + public enum QueryType { POINT, LINE, POLYGON } + + private QueryType queryType; + + // 点查询参数 + private Point center; + private Double radius; + private DistanceUnit unit = DistanceUnit.KILOMETERS; + + // 线查询参数 + private List line; + private Double buffer; // 缓冲区距离(单位:米) + + // 面查询参数 + private List polygon; + + // 时间范围过滤(可选) + private String startTime; + private String endTime; + + // 分页参数(可选) + private Integer page = 0; + private Integer size = 100; + + @Data + public static class Point { + private double lon; + private double lat; + } + + public enum DistanceUnit { + METERS(org.elasticsearch.common.unit.DistanceUnit.METERS), + KILOMETERS(org.elasticsearch.common.unit.DistanceUnit.KILOMETERS), + MILES(org.elasticsearch.common.unit.DistanceUnit.MILES); + + private final org.elasticsearch.common.unit.DistanceUnit esUnit; + + DistanceUnit(org.elasticsearch.common.unit.DistanceUnit esUnit) { + this.esUnit = esUnit; + } + + public org.elasticsearch.common.unit.DistanceUnit getEsUnit() { + return esUnit; + } + } +} diff --git a/stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/domain/entity/GpsInfoEntity.java b/stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/domain/entity/GpsInfoEntity.java new file mode 100644 index 00000000..e6c9faac --- /dev/null +++ b/stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/domain/entity/GpsInfoEntity.java @@ -0,0 +1,49 @@ +package org.dromara.location.domain.entity; + +import com.fasterxml.jackson.annotation.JsonFormat; +import lombok.Data; +import org.springframework.data.annotation.Id; + +import java.io.Serializable; +import java.util.Date; + +/** + *

description:

+ * gps定位信息(es表) + * @author chenle + * @date 2021-05-14 9:39 + */ +@Data +//@Document(indexName = "#{esConfig.indexNameByDay()}" ,shards = 3,replicas = 1,createIndex = false) +public class GpsInfoEntity implements Serializable { + + private static final long serialVersionUID = 7233463305371277306L; + @Id + private String id; + + private String deviceCode; + /** + * 设备串号,设备唯一值 + */ + private String deviceType; + + + + private Double[] location; + + //方向 + private String orientation; + //高程 + private String height; + //精度 + private String deltaH; + private String speed; + + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8") + private Date gpsTime; + + //地市代码 3401,3402 + private String infoSource; + + +} diff --git a/stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/service/ISearchService.java b/stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/service/ISearchService.java index 55915b5d..b57e17b1 100644 --- a/stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/service/ISearchService.java +++ b/stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/service/ISearchService.java @@ -1,5 +1,9 @@ package org.dromara.location.service; +import org.dromara.location.domain.EsGpsInfoVO2; +import org.elasticsearch.index.query.QueryBuilder; + +import java.io.IOException; import java.util.List; import java.util.Map; @@ -7,5 +11,7 @@ import java.util.Map; public interface ISearchService { public List searchCar(String deviceCode, String startTime, String endTime,String deviceType) ; + List queryDistinctDevicesNearPoint(QueryBuilder spatialQuery, String indexName) throws IOException; + } diff --git a/stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/service/impl/CorrectGeoQueryService.java b/stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/service/impl/CorrectGeoQueryService.java new file mode 100644 index 00000000..0a5d7ca8 --- /dev/null +++ b/stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/service/impl/CorrectGeoQueryService.java @@ -0,0 +1,172 @@ +package org.dromara.location.service.impl; + +import org.dromara.location.domain.SpatialQueryRequest; +import org.elasticsearch.common.geo.ShapeRelation; +import org.elasticsearch.common.geo.builders.*; +import org.elasticsearch.common.unit.DistanceUnit; +import org.elasticsearch.index.query.*; +import org.locationtech.jts.geom.*; +import org.springframework.stereotype.Service; + +import java.io.IOException; +import java.util.List; + +@Service +public class CorrectGeoQueryService { + + private static final GeometryFactory GEOMETRY_FACTORY = new GeometryFactory(); + + public QueryBuilder buildSpatialQuery(SpatialQueryRequest request) throws IOException { + switch (request.getQueryType()) { + case POINT: + return buildPointQuery(request); + case LINE: + return buildLineQuery(request); + case POLYGON: + return buildPolygonQuery(request); + default: + throw new IllegalArgumentException("不支持的查询类型: " + request.getQueryType()); + } + } + + /** + * 点查询(圆形区域) + */ + private QueryBuilder buildPointQuery(SpatialQueryRequest request) { + if (request.getCenter() == null || request.getRadius() == null) { + throw new IllegalArgumentException("点查询需要center和radius参数"); + } + + return QueryBuilders.geoDistanceQuery("location") + .point(request.getCenter().getLat(), request.getCenter().getLon()) + .distance(request.getRadius(), request.getUnit().getEsUnit()); + } + + /** + * 线查询(带缓冲区) + */ + private QueryBuilder buildLineQuery(SpatialQueryRequest request) throws IOException { + if (request.getLine() == null || request.getLine().size() < 2) { + throw new IllegalArgumentException("线查询需要至少两个点"); + } + if (request.getBuffer() == null || request.getBuffer() <= 0) { + throw new IllegalArgumentException("线查询需要有效的缓冲区距离"); + } + + // 创建JTS线 + Coordinate[] coords = new Coordinate[request.getLine().size()]; + for (int i = 0; i < request.getLine().size(); i++) { + SpatialQueryRequest.Point p = request.getLine().get(i); + coords[i] = new Coordinate(p.getLon(), p.getLat()); + } + LineString jtsLine = GEOMETRY_FACTORY.createLineString(coords); + + // 计算缓冲区 + double bufferInDegrees = metersToDegrees(request.getBuffer()); + Geometry buffer = jtsLine.buffer(bufferInDegrees); + + // 创建ES形状 + ShapeBuilder shapeBuilder = convertGeometryToEsShape(buffer); + + return QueryBuilders.geoShapeQuery("location_shape", shapeBuilder) + .relation(ShapeRelation.INTERSECTS); + } + + /** + * 面查询 - 100%正确的实现 + */ + private QueryBuilder buildPolygonQuery(SpatialQueryRequest request) { + if (request.getPolygon() == null || request.getPolygon().size() < 3) { + throw new IllegalArgumentException("面查询需要至少三个点"); + } + + // 1. 创建CoordinatesBuilder + CoordinatesBuilder coordsBuilder = new CoordinatesBuilder(); + + // 2. 添加所有点 + for (SpatialQueryRequest.Point p : request.getPolygon()) { + coordsBuilder.coordinate(p.getLon(), p.getLat()); + } + // 添加第一个点闭合多边形 + coordsBuilder.coordinate( + request.getPolygon().get(0).getLon(), + request.getPolygon().get(0).getLat() + ); + + // 3. 创建多边形 - 直接使用CoordinatesBuilder + PolygonBuilder polygonBuilder = new PolygonBuilder(coordsBuilder); + + // 4. 构建几何对象 + try { + return QueryBuilders.geoShapeQuery("location_shape", polygonBuilder.buildGeometry()) + .relation(ShapeRelation.INTERSECTS); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + /** + * 将米转换为近似度数 + */ + private double metersToDegrees(double meters) { + return meters / 111000.0; // 1度 ≈ 111公里 + } + + /** + * 将JTS Geometry转换为ES ShapeBuilder + */ + private ShapeBuilder convertGeometryToEsShape(Geometry geometry) { + if (geometry instanceof Polygon) { + return convertJtsPolygon((Polygon) geometry); + } else if (geometry instanceof MultiPolygon) { + return convertJtsMultiPolygon((MultiPolygon) geometry); + } else { + throw new IllegalArgumentException("不支持的几何类型: " + geometry.getGeometryType()); + } + } + + /** + * 将JTS Polygon转换为ES ShapeBuilder - 100%正确的实现 + */ + private ShapeBuilder convertJtsPolygon(Polygon polygon) { + // 1. 创建外环坐标 + CoordinatesBuilder shellCoords = new CoordinatesBuilder(); + for (Coordinate coord : polygon.getExteriorRing().getCoordinates()) { + shellCoords.coordinate(coord.x, coord.y); + } + + // 2. 创建多边形构建器 + PolygonBuilder polygonBuilder = new PolygonBuilder(shellCoords); + + // 3. 添加内环(孔) - 正确使用LineStringBuilder + for (int i = 0; i < polygon.getNumInteriorRing(); i++) { + // 创建孔洞的坐标 + CoordinatesBuilder holeCoords = new CoordinatesBuilder(); + for (Coordinate coord : polygon.getInteriorRingN(i).getCoordinates()) { + holeCoords.coordinate(coord.x, coord.y); + } + + // 创建LineStringBuilder + LineStringBuilder holeLine = new LineStringBuilder(holeCoords); + + // 添加孔洞 - 使用LineStringBuilder + polygonBuilder.hole(holeLine); + } + + return polygonBuilder; + } + + /** + * 将JTS MultiPolygon转换为ES ShapeBuilder + */ + private ShapeBuilder convertJtsMultiPolygon(MultiPolygon multiPolygon) { + MultiPolygonBuilder builder = new MultiPolygonBuilder(); + + for (int i = 0; i < multiPolygon.getNumGeometries(); i++) { + Polygon polygon = (Polygon) multiPolygon.getGeometryN(i); + builder.polygon((PolygonBuilder) convertJtsPolygon(polygon)); + } + + return builder; + } +} diff --git a/stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/service/impl/SearchServiceImpl.java b/stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/service/impl/SearchServiceImpl.java index 2e60d742..5fe4bc1b 100644 --- a/stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/service/impl/SearchServiceImpl.java +++ b/stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/service/impl/SearchServiceImpl.java @@ -1,20 +1,25 @@ package org.dromara.location.service.impl; +import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.date.DateField; import cn.hutool.core.date.DateTime; import cn.hutool.core.date.DateUtil; import lombok.RequiredArgsConstructor; +import org.dromara.location.domain.EsGpsInfoVO2; import org.dromara.location.service.ISearchService; import org.elasticsearch.action.search.*; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestHighLevelClient; -import org.elasticsearch.core.TimeValue; -import org.elasticsearch.index.query.BoolQueryBuilder; -import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.index.query.TermQueryBuilder; +import org.elasticsearch.common.unit.DistanceUnit; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.query.*; import org.elasticsearch.search.Scroll; import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.aggregations.AggregationBuilders; +import org.elasticsearch.search.aggregations.bucket.terms.Terms; +import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; +import org.elasticsearch.search.aggregations.metrics.TopHits; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.sort.SortOrder; import org.springframework.beans.factory.annotation.Autowired; @@ -35,7 +40,7 @@ import java.util.function.Consumer; @Service public class SearchServiceImpl implements ISearchService { - @Autowired + @Resource(name = "restHighLevelClient") private RestHighLevelClient restHighLevelClient; @@ -139,6 +144,61 @@ public class SearchServiceImpl implements ISearchService { } + /** + * 查询周边设备的最新轨迹点(根据deviceCode去重) + *点线面 + * @param spatialQuery 点线面构造查询 + * @param indexName 索引名称 + * @return 去重后的设备列表(每个设备只返回最新的一条记录) + */ + public List queryDistinctDevicesNearPoint(QueryBuilder spatialQuery, String indexName) throws IOException { + + // 构建聚合 + TermsAggregationBuilder aggregation = AggregationBuilders.terms("distinct_devices") + .field("deviceCode.keyword") + .size(100) + .subAggregation( + AggregationBuilders.topHits("latest_record") + .size(1) + .sort("gpsTime", SortOrder.DESC) + ); + + SearchSourceBuilder sourceBuilder = new SearchSourceBuilder() + .query(spatialQuery) + .aggregation(aggregation) + .size(0); + + SearchRequest request = new SearchRequest(indexName) + .source(sourceBuilder); + + // 4. 执行查询 + SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT); + + // 5. 解析聚合结果 + Terms terms = response.getAggregations().get("distinct_devices"); + List results = new ArrayList<>(); + + for (Terms.Bucket bucket : terms.getBuckets()) { + String deviceCode = bucket.getKeyAsString(); + + // 获取每个设备的最新记录 + TopHits topHits = bucket.getAggregations().get("latest_record"); + SearchHit[] hits = topHits.getHits().getHits(); + + if (hits.length > 0) { + SearchHit latestHit = hits[0]; + Map source = latestHit.getSourceAsMap(); + + EsGpsInfoVO2 gpsInfoVO2 = BeanUtil.copyProperties(source, EsGpsInfoVO2.class); + + results.add(gpsInfoVO2); + } + } + + return results; + } + + // 工具方法 public static Consumer consumerWithIndex(BiConsumer consumer) { diff --git a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/controller/system/IndexStaticsController.java b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/controller/system/IndexStaticsController.java index 9d28f3ff..e70fdf82 100644 --- a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/controller/system/IndexStaticsController.java +++ b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/controller/system/IndexStaticsController.java @@ -141,7 +141,15 @@ public class IndexStaticsController extends BaseController { return R.ok(list); } - + /* + * 各地市总数、在线数和各类终端数量 + * + * */ + @PostMapping("/dsqkStatics") + public R dsqkStatics(){ + List list = redisService.dsStatics(); + return R.ok(list); + } diff --git a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/domain/DeviceRedis.java b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/domain/DeviceRedis.java index ccdc6e6c..542a6acb 100644 --- a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/domain/DeviceRedis.java +++ b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/domain/DeviceRedis.java @@ -5,6 +5,7 @@ import lombok.Data; import lombok.EqualsAndHashCode; import java.io.Serial; +import java.util.Date; @Data @TableName("t_device_redis") @@ -21,4 +22,6 @@ public class DeviceRedis { private String zzjgdm; + private Date gpsTime; + } diff --git a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/domain/vo/DeviceStaticsVo.java b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/domain/vo/DeviceStaticsVo.java index 066241a9..15821915 100644 --- a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/domain/vo/DeviceStaticsVo.java +++ b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/domain/vo/DeviceStaticsVo.java @@ -19,4 +19,16 @@ public class DeviceStaticsVo implements Serializable { private Integer onlineCo; + private String deviceType; + + private Integer jcco; + + private Integer stco; + + private Integer jlyco; + + private Integer ydjwco; + + private Integer qtco; + } diff --git a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/mapper/DeviceRedisMapper.java b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/mapper/DeviceRedisMapper.java index 308e93d8..a95a3902 100644 --- a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/mapper/DeviceRedisMapper.java +++ b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/mapper/DeviceRedisMapper.java @@ -3,6 +3,7 @@ package org.dromara.system.mapper; import org.dromara.common.mybatis.core.mapper.BaseMapperPlus; import org.dromara.system.domain.DeviceRedis; import org.dromara.system.domain.vo.DeviceRedisVo; +import org.dromara.system.domain.vo.DeviceStaticsVo; import java.util.List; @@ -11,4 +12,6 @@ public interface DeviceRedisMapper extends BaseMapperPlus list); List countByCondition(DeviceRedis redis); + + List dsStatics(); } diff --git a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/mapper/TDeviceMapper.java b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/mapper/TDeviceMapper.java index 591e203c..fbd0aac4 100644 --- a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/mapper/TDeviceMapper.java +++ b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/mapper/TDeviceMapper.java @@ -17,4 +17,6 @@ public interface TDeviceMapper extends BaseMapperPlus { List countByDs(); + List countByDsAndType(); + } diff --git a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/schedule/DeviceRedisSchedule.java b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/schedule/DeviceRedisSchedule.java index 5ad6118b..802b2531 100644 --- a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/schedule/DeviceRedisSchedule.java +++ b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/schedule/DeviceRedisSchedule.java @@ -39,7 +39,7 @@ public class DeviceRedisSchedule { /* * 把Redis中 online_user数据存入t_device_redis表中 * */ - @Scheduled(cron = "0/30 * * * * ?") + @Scheduled(cron = "0 0/5 * * * ?") public void handleDeviceRedis(){ List jlist = RedisUtils.searchAndGetKeysValues("online_users:*"); redisService.insertBatch(BeanUtil.copyToList(jlist, DeviceRedis.class)); diff --git a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/service/IDeviceRedisService.java b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/service/IDeviceRedisService.java index 8870892c..efc527b8 100644 --- a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/service/IDeviceRedisService.java +++ b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/service/IDeviceRedisService.java @@ -2,6 +2,7 @@ package org.dromara.system.service; import org.dromara.system.domain.DeviceRedis; import org.dromara.system.domain.vo.DeviceRedisVo; +import org.dromara.system.domain.vo.DeviceStaticsVo; import java.util.List; @@ -9,4 +10,6 @@ public interface IDeviceRedisService { int insertBatch(List list); List countByCondition(DeviceRedis redis); + + List dsStatics(); } diff --git a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/service/impl/DeviceRedisServiceImpl.java b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/service/impl/DeviceRedisServiceImpl.java index a66adf19..bca2366e 100644 --- a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/service/impl/DeviceRedisServiceImpl.java +++ b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/service/impl/DeviceRedisServiceImpl.java @@ -8,7 +8,11 @@ import org.apache.ibatis.session.SqlSession; import org.apache.ibatis.session.SqlSessionFactory; import org.dromara.system.domain.DeviceRedis; import org.dromara.system.domain.vo.DeviceRedisVo; +import org.dromara.system.domain.vo.DeviceStaticsVo; +import org.dromara.system.domain.vo.SysDictDataVo; import org.dromara.system.mapper.DeviceRedisMapper; +import org.dromara.system.mapper.SysDictDataMapper; +import org.dromara.system.mapper.TDeviceMapper; import org.dromara.system.service.IDeviceRedisService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -23,6 +27,10 @@ public class DeviceRedisServiceImpl implements IDeviceRedisService { private final DeviceRedisMapper baseMapper; + private final TDeviceMapper deviceMapper; + + private final SysDictDataMapper dictDataMapper; + @Autowired SqlSessionFactory sqlSessionFactory; @@ -61,4 +69,52 @@ public class DeviceRedisServiceImpl implements IDeviceRedisService { public List countByCondition(DeviceRedis redis) { return baseMapper.countByCondition(redis); } + + @Override + public List dsStatics() { + //1、查询各地市终端总数和在线数 + List list = baseMapper.dsStatics(); + //2、查询各地市各类终端数量 + List dsvo = deviceMapper.countByDsAndType(); + //3、查询车辆类型字典 + List dictDataVos = dictDataMapper.selectDictDataByType("zd_device_type"); + //4、根据1 2和3的数据匹配出各类终端中文名 + for (DeviceStaticsVo vo : list) { + for (DeviceStaticsVo staticsVo : dsvo) { + //如果 vo的机构代码和staticsVo一样 就查询字典值并设置到对应的字段 + if (vo.getZzjgdm().equals(staticsVo.getZzjgdm())){ + String deviceType = staticsVo.getDeviceType(); + for (SysDictDataVo dataVo : dictDataVos) { + if (staticsVo.getDeviceType().equals(dataVo.getDictValue())){ //如果匹配设置值 + switch (deviceType){ + case "01" : + vo.setJcco(staticsVo.getCo()); + break; + case "03" : + vo.setStco(staticsVo.getCo()); + break; + case "04" : + vo.setYdjwco(staticsVo.getCo()); + break; + case "05" : + vo.setJlyco(staticsVo.getCo()); + break; + case "99" : + vo.setQtco(staticsVo.getCo()); + break; + default: + vo.setQtco(staticsVo.getCo()+vo.getQtco()); + } + + } + } + } + } + } + return list; + } + + + + } diff --git a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/service/impl/SysDeptServiceImpl.java b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/service/impl/SysDeptServiceImpl.java index 07cc150e..05786975 100644 --- a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/service/impl/SysDeptServiceImpl.java +++ b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/service/impl/SysDeptServiceImpl.java @@ -345,6 +345,7 @@ public class SysDeptServiceImpl implements ISysDeptService { LambdaQueryWrapper lqw = new LambdaQueryWrapper<>(); lqw.eq(SysDept::getParentId,"0"); lqw.ne(SysDept::getDeptId,"340000000000"); + lqw.orderByAsc(SysDept::getDeptId); return baseMapper.selectDeptList(lqw); } diff --git a/stwzhj-modules/stwzhj-system/src/main/resources/mapper/system/DeviceRedisMapper.xml b/stwzhj-modules/stwzhj-system/src/main/resources/mapper/system/DeviceRedisMapper.xml index 79d7d2ed..2e7ba5b1 100644 --- a/stwzhj-modules/stwzhj-system/src/main/resources/mapper/system/DeviceRedisMapper.xml +++ b/stwzhj-modules/stwzhj-system/src/main/resources/mapper/system/DeviceRedisMapper.xml @@ -8,16 +8,20 @@ + + + + - insert into t_device_redis (device_code,device_type,online,zzjgdm) + insert into t_device_redis (device_code,device_type,online,zzjgdm,gps_time) values ( - #{entity.deviceCode},#{entity.deviceType},#{entity.online},#{entity.zzjgdm} + #{entity.deviceCode},#{entity.deviceType},#{entity.online},#{entity.zzjgdm},#{entity.gpsTime} ) ON conflict(device_code,device_type) do update set - (online,zzjgdm) =(EXCLUDED.online,EXCLUDED.zzjgdm) + (online,zzjgdm,gps_time) =(EXCLUDED.online,EXCLUDED.zzjgdm,EXCLUDED.gpsTime) @@ -44,4 +48,25 @@ WHERE d.dict_type = 'zd_device_type' + + + + diff --git a/stwzhj-modules/stwzhj-system/src/main/resources/mapper/system/TDeviceMapper.xml b/stwzhj-modules/stwzhj-system/src/main/resources/mapper/system/TDeviceMapper.xml index 3e8426d0..bd50034b 100644 --- a/stwzhj-modules/stwzhj-system/src/main/resources/mapper/system/TDeviceMapper.xml +++ b/stwzhj-modules/stwzhj-system/src/main/resources/mapper/system/TDeviceMapper.xml @@ -12,4 +12,11 @@ SELECT SUBSTR(zzjgdm,1,4) zzjgdm,count(*) co from t_device GROUP BY SUBSTR(zzjgdm,1,4) HAVING SUBSTR(zzjgdm,1,4) is not null + +