diff --git a/pom.xml b/pom.xml index 407524fe..7ce73d73 100644 --- a/pom.xml +++ b/pom.xml @@ -89,12 +89,12 @@ prod prod - 127.0.0.1:8848 + 53.16.17.13:8848 DEFAULT_GROUP DEFAULT_GROUP nacos nacos - 127.0.0.1:4560 + 53.16.17.13:4560 diff --git a/stwzhj-api/stwzhj-api-data2es/src/main/java/org/dromara/data2es/api/domain/RemoteGpsInfo.java b/stwzhj-api/stwzhj-api-data2es/src/main/java/org/dromara/data2es/api/domain/RemoteGpsInfo.java index 598c0a2f..a1ed9279 100644 --- a/stwzhj-api/stwzhj-api-data2es/src/main/java/org/dromara/data2es/api/domain/RemoteGpsInfo.java +++ b/stwzhj-api/stwzhj-api-data2es/src/main/java/org/dromara/data2es/api/domain/RemoteGpsInfo.java @@ -22,8 +22,8 @@ public class RemoteGpsInfo implements Serializable { * 类型 */ private String deviceType; - private String lat; - private String lng; + private String latitude; + private String longitude; //方向 private String orientation; //高程 diff --git a/stwzhj-api/stwzhj-api-system/src/main/java/org/dromara/system/api/domain/bo/RemoteDeviceBo.java b/stwzhj-api/stwzhj-api-system/src/main/java/org/dromara/system/api/domain/bo/RemoteDeviceBo.java index eead2ac0..2a49aa1e 100644 --- a/stwzhj-api/stwzhj-api-system/src/main/java/org/dromara/system/api/domain/bo/RemoteDeviceBo.java +++ b/stwzhj-api/stwzhj-api-system/src/main/java/org/dromara/system/api/domain/bo/RemoteDeviceBo.java @@ -83,6 +83,10 @@ public class RemoteDeviceBo implements Serializable { */ private String remark1; + private String createTime; + + private String updateTime; + /** * 备注字段2 */ diff --git a/stwzhj-common/stwzhj-common-redis/src/main/java/org/dromara/common/redis/utils/RedisUtils.java b/stwzhj-common/stwzhj-common-redis/src/main/java/org/dromara/common/redis/utils/RedisUtils.java index bff861b8..30d6b5c5 100644 --- a/stwzhj-common/stwzhj-common-redis/src/main/java/org/dromara/common/redis/utils/RedisUtils.java +++ b/stwzhj-common/stwzhj-common-redis/src/main/java/org/dromara/common/redis/utils/RedisUtils.java @@ -573,6 +573,73 @@ public class RedisUtils { System.out.println("redis:"+list); } + /** + * 批量插入数据,并为整个 RMap 设置过期时间 + * + * @param data 需要批量插入的数据 + * @param timeout 设置的过期时间 + * @param timeUnit 过期时间的单位 + */ + public static void batchPutWithExpire(Map data, long timeout, TimeUnit timeUnit) { + // 创建 RBatch 实例 + RBatch batch = CLIENT.createBatch(); + + // 获取 RMapAsync 对象 + RMapAsync mapAsync = batch.getMap("myMap"); + + // 批量操作:将多个数据添加到 map 中 + for (Map.Entry entry : data.entrySet()) { + mapAsync.putAsync(entry.getKey(), entry.getValue()); + } + + // 执行批量操作 + batch.execute(); + + // 获取同步的 RMap 对象并设置过期时间 + RMap mapSync = CLIENT.getMap("myMap"); + mapSync.expire(timeout, timeUnit); + } + + /** + * 批量插入数据 + * + * @param data 需要批量插入的数据 + */ + public static void batchPut(Map data) { + // 创建 RBatch 实例 + RBatch batch = CLIENT.createBatch(); + + // 获取 RMapAsync 对象 + RMapAsync mapAsync = batch.getMap("myMap"); + + // 批量操作:将多个数据添加到 map 中 + for (Map.Entry entry : data.entrySet()) { + mapAsync.putAsync(entry.getKey(), entry.getValue()); + } + + // 执行批量操作 + batch.execute(); + + } + + /** + * 根据 key 获取指定的值 + * + * @param key 要查询的 key + * @return 查询到的值 + */ + public static JSONObject getData(String key) { + // 获取同步的 RMap 对象 + RMap map = CLIENT.getMap("myMap"); + + // 根据 key 获取数据 + Object value = map.get(key); + if (null == value){ + return null; + } + return JSONUtil.parseObj(value.toString()); + } + /* * 模糊查询 * */ @@ -596,12 +663,41 @@ public class RedisUtils { return list; } + /** + * 根据模式(例如前缀)模糊匹配 Redis 中的 keys,并获取每个 key 的值 + * + * @param pattern 模糊匹配的模式,例如 "user:*" + * @return 匹配到的 key 和 value 对 + */ + public Map getMatchingKeysAndValues(String pattern) { + RKeys rKeys = CLIENT.getKeys(); + Iterable keysIterable = rKeys.getKeysByPattern(pattern); // 获取匹配的 key + + // 获取匹配的键值对 + RMap map = CLIENT.getMap("myMap"); + Map result = new java.util.HashMap<>(); + + List list = new ArrayList<>(); +// RBatch batch = CLIENT.createBatch(); + // 批量获取这些key的值 + for (String key : keysIterable) { + String value = map.get(key); // 获取每个 key 对应的 value + JSONObject jsonObject = JSONUtil.parseObj(value); + list.add(jsonObject); + } + + return result; + } + /* * 根据key获取RBucket值 * */ public static JSONObject getBucket(String key){ RBucket bucket = CLIENT.getBucket(key); Object value = bucket.get(); + if (null == value){ + return null; + } return JSONUtil.parseObj(value.toString()); } diff --git a/stwzhj-modules/pom.xml b/stwzhj-modules/pom.xml index c3b249e9..f057905b 100644 --- a/stwzhj-modules/pom.xml +++ b/stwzhj-modules/pom.xml @@ -16,6 +16,9 @@ stwzhj-workflow stwzhj-data2es stwzhj-baseToSt + stwzhj-consumer + stwzhj-location + stwzhj-dataToGas stwzhj-modules diff --git a/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/KafkaConsumerApplication.java b/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/KafkaConsumerApplication.java index 0fe37b81..85a0e040 100644 --- a/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/KafkaConsumerApplication.java +++ b/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/KafkaConsumerApplication.java @@ -1,20 +1,9 @@ package org.dromara.kafka.consumer; -import com.ruansee.redis.JedisConfig; -import com.ruansee.redis.RedisConfig; -import com.ruansee.redis.RedisUtil; -import com.ruansee.redis.RedissionLockUtil; -import org.dromara.kafka.consumer.config.KafkaPropertiesConfig; -import org.redisson.spring.starter.RedissonAutoConfiguration; + import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration; -import org.springframework.boot.autoconfigure.data.redis.RedisReactiveAutoConfiguration; -import org.springframework.boot.autoconfigure.data.redis.RedisRepositoriesAutoConfiguration; -import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.boot.web.servlet.ServletComponentScan; -import org.springframework.context.annotation.ComponentScan; -import org.springframework.context.annotation.FilterType; import org.springframework.scheduling.annotation.EnableAsync; /** @@ -25,7 +14,6 @@ import org.springframework.scheduling.annotation.EnableAsync; */ @SpringBootApplication @EnableAsync -@EnableConfigurationProperties({KafkaPropertiesConfig.class}) @ServletComponentScan public class KafkaConsumerApplication { public static void main(String[] args){ diff --git a/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/config/KafkaProperties.java b/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/config/KafkaProperties.java deleted file mode 100644 index 45606d28..00000000 --- a/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/config/KafkaProperties.java +++ /dev/null @@ -1,136 +0,0 @@ -package org.dromara.kafka.consumer.config; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.util.Properties; - -public final class KafkaProperties -{ - private static final Logger LOG = LoggerFactory.getLogger(KafkaProperties.class); - - // Topic名称,安全模式下,需要以管理员用户添加当前用户的访问权限 - public final static String TOPIC = "t_gps_realtime"; - - private static Properties serverProps = new Properties(); - private static Properties producerProps = new Properties(); - - private static Properties consumerProps = new Properties(); - - private static Properties clientProps = new Properties(); - - private static KafkaProperties instance = null; - - private KafkaProperties() - { - String filePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator; - - try - { - File proFile = new File(filePath + "producer.properties"); - - if (proFile.exists()) - { - producerProps.load(new FileInputStream(filePath + "producer.properties")); - } - - File conFile = new File(filePath + "producer.properties"); - - if (conFile.exists()) - { - consumerProps.load(new FileInputStream(filePath + "consumer.properties")); - } - - File serFile = new File(filePath + "server.properties"); - - if (serFile.exists()) - { - serverProps.load(new FileInputStream(filePath + "server.properties")); - } - - File cliFile = new File(filePath + "client.properties"); - - if (cliFile.exists()) - { - clientProps.load(new FileInputStream(filePath + "client.properties")); - } - } - catch (IOException e) - { - LOG.info("The Exception occured.", e); - } - } - - public synchronized static KafkaProperties getInstance() - { - if (null == instance) - { - instance = new KafkaProperties(); - } - - return instance; - } - - /** - * 获取参数值 - * @param key properites的key值 - * @param defValue 默认值 - * @return - */ - public String getValues(String key, String defValue) - { - String rtValue = null; - - if (null == key) - { - LOG.error("key is null"); - } - else - { - rtValue = getPropertiesValue(key); - } - - if (null == rtValue) - { - LOG.warn("KafkaProperties.getValues return null, key is " + key); - rtValue = defValue; - } - - LOG.info("KafkaProperties.getValues: key is " + key + "; Value is " + rtValue); - - return rtValue; - } - - /** - * 根据key值获取server.properties的值 - * @param key - * @return - */ - private String getPropertiesValue(String key) - { - String rtValue = serverProps.getProperty(key); - - // server.properties中没有,则再向producer.properties中获取 - if (null == rtValue) - { - rtValue = producerProps.getProperty(key); - } - - // producer中没有,则再向consumer.properties中获取 - if (null == rtValue) - { - rtValue = consumerProps.getProperty(key); - } - - // consumer没有,则再向client.properties中获取 - if (null == rtValue) - { - rtValue = clientProps.getProperty(key); - } - - return rtValue; - } -} diff --git a/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/config/KafkaPropertiesConfig.java b/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/config/KafkaPropertiesConfig.java deleted file mode 100644 index 5f4edabf..00000000 --- a/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/config/KafkaPropertiesConfig.java +++ /dev/null @@ -1,35 +0,0 @@ -package org.dromara.kafka.consumer.config; - -import org.springframework.boot.context.properties.ConfigurationProperties; -import org.springframework.context.annotation.Profile; - -/** - *

description:

- * - * @author chenle - * @date 2021-09-06 15:13 - */ -@ConfigurationProperties(prefix = "mykafka") -@Profile(value = "dev") -public -class KafkaPropertiesConfig { - private String serverUrl; - - private MyConsumerProperties consumerProperties = new MyConsumerProperties(); - - public String getServerUrl() { - return serverUrl; - } - - public void setServerUrl(String serverUrl) { - this.serverUrl = serverUrl; - } - - public MyConsumerProperties getConsumerProperties() { - return consumerProperties; - } - - public void setConsumerProperties(MyConsumerProperties consumerProperties) { - this.consumerProperties = consumerProperties; - } -} diff --git a/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/config/MyConsumerProperties.java b/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/config/MyConsumerProperties.java deleted file mode 100644 index 040def94..00000000 --- a/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/config/MyConsumerProperties.java +++ /dev/null @@ -1,28 +0,0 @@ -package org.dromara.kafka.consumer.config; - -/** - *

description:

- * - * @author chenle - * @date 2021-09-07 14:54 - */ -public class MyConsumerProperties { - private String clientId; - private String groupId = "222"; - - public String getClientId() { - return clientId; - } - - public void setClientId(String clientId) { - this.clientId = clientId; - } - - public String getGroupId() { - return groupId; - } - - public void setGroupId(String groupId) { - this.groupId = groupId; - } -} diff --git a/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/config/NewConsumer.java b/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/config/NewConsumer.java deleted file mode 100644 index 2205ee31..00000000 --- a/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/config/NewConsumer.java +++ /dev/null @@ -1,159 +0,0 @@ -package org.dromara.kafka.consumer.config; - -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.dromara.kafka.consumer.handler.KafkaSecurityUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.Collections; -import java.util.Properties; - - -public class NewConsumer extends Thread{ - private static final Logger LOG = LoggerFactory.getLogger(NewConsumer.class); - - private final KafkaConsumer consumer; - - private final String topic; - - // 一次请求的最大等待时间 - private final int waitTime = 10000; - - // Broker连接地址 - private final String bootstrapServers = "bootstrap.servers"; - // Group id - private final String groupId = "group.id"; - // 消息内容使用的反序列化类 - private final String valueDeserializer = "value.deserializer"; - // 消息Key值使用的反序列化类 - private final String keyDeserializer = "key.deserializer"; - // 协议类型:当前支持配置为SASL_PLAINTEXT或者PLAINTEXT - private final String securityProtocol = "security.protocol"; - // 服务名 - private final String saslKerberosServiceName = "sasl.kerberos.service.name"; - // 域名 - private final String kerberosDomainName = "kerberos.domain.name"; - // 是否自动提交offset - private final String enableAutoCommit = "enable.auto.commit"; - // 自动提交offset的时间间隔 - private final String autoCommitIntervalMs = "auto.commit.interval.ms"; - - // 会话超时时间 - private final String sessionTimeoutMs = "session.timeout.ms"; - - /** - * 用户自己申请的机机账号keytab文件名称 - */ - private static final String USER_KEYTAB_FILE = "user.keytab"; - - /** - * 用户自己申请的机机账号名称 - */ - private static final String USER_PRINCIPAL = "aqdsj_ruansi"; - - /** - * NewConsumer构造函数 - * @param topic 订阅的Topic名称 - */ - public NewConsumer(String topic) { - - Properties props = new Properties(); - - KafkaProperties kafkaProc = KafkaProperties.getInstance(); - // Broker连接地址 - props.put(bootstrapServers, - kafkaProc.getValues(bootstrapServers, "localhost:21007")); - // Group id - props.put(groupId, "DemoConsumer"); - // 是否自动提交offset - props.put(enableAutoCommit, "true"); - // 自动提交offset的时间间隔 - props.put(autoCommitIntervalMs, "1000"); - // 会话超时时间 - props.put(sessionTimeoutMs, "30000"); - // 消息Key值使用的反序列化类 - props.put(keyDeserializer, - "org.apache.kafka.common.serialization.IntegerDeserializer"); - // 消息内容使用的反序列化类 - props.put(valueDeserializer, - "org.apache.kafka.common.serialization.StringDeserializer"); - // 安全协议类型 - props.put(securityProtocol, kafkaProc.getValues(securityProtocol, "SASL_PLAINTEXT")); - // 服务名 - props.put(saslKerberosServiceName, "kafka"); - // 域名 - props.put(kerberosDomainName, kafkaProc.getValues(kerberosDomainName, "hadoop.hadoop.com")); - consumer = new KafkaConsumer(props); - this.topic = topic; - } - - /** - * 订阅Topic的消息处理函数 - */ - public void doWork() - { - // 订阅 - consumer.subscribe(Collections.singletonList(this.topic)); - // 消息消费请求 - ConsumerRecords records = consumer.poll(waitTime); - // 消息处理 - for (ConsumerRecord record : records) - { - LOG.info("[NewConsumerExample], Received message: (" + record.key() + ", " + record.value() - + ") at offset " + record.offset()); - } - } - - - - public static void main(String[] args) - { - if (KafkaSecurityUtil.isSecurityModel()) - { - try - { - LOG.info("Securitymode start."); - - //!!注意,安全认证时,需要用户手动修改为自己申请的机机账号 - KafkaSecurityUtil.securityPrepare(); - } - catch (IOException e) - { - LOG.error("Security prepare failure."); - LOG.error("The IOException occured : {}.", e); - return; - } - LOG.info("Security prepare success."); - } - - NewConsumer consumerThread = new NewConsumer(KafkaProperties.TOPIC); - consumerThread.start(); - - // 等到60s后将consumer关闭,实际执行过程中可修改 - try - { - Thread.sleep(60000); - } - catch (InterruptedException e) - { - LOG.info("The InterruptedException occured : {}.", e); - } - finally - { - consumerThread.shutdown(); - consumerThread.consumer.close(); - } - } - - @Override - public synchronized void start() { - doWork(); - } - - private void shutdown(){ - Thread.currentThread().interrupt(); - } -} diff --git a/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/entity/EsGpsInfo.java b/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/entity/EsGpsInfo.java index 93b5fa56..376bd816 100644 --- a/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/entity/EsGpsInfo.java +++ b/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/entity/EsGpsInfo.java @@ -25,8 +25,8 @@ public class EsGpsInfo implements Serializable { * 类型 */ private String deviceType; - private String lat; - private String lng; + private String latitude; + private String longitude; //方向 private String orientation; //高程 diff --git a/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/ConsumerWorker.java b/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/ConsumerWorker.java index 37634b12..0aa0bc3a 100644 --- a/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/ConsumerWorker.java +++ b/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/ConsumerWorker.java @@ -4,11 +4,13 @@ import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.bean.copier.CopyOptions; import cn.hutool.core.convert.ConvertException; import cn.hutool.core.date.DateTime; +import cn.hutool.core.date.DateUnit; import cn.hutool.core.date.DateUtil; import cn.hutool.json.JSONObject; import cn.hutool.json.JSONUtil; import com.alibaba.fastjson.JSON; import com.ruansee.response.ApiResponse; +import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang.StringUtils; import org.apache.dubbo.config.annotation.DubboReference; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -17,9 +19,13 @@ import org.dromara.data2es.api.RemoteDataToEsService; import org.dromara.data2es.api.domain.RemoteGpsInfo; import org.dromara.kafka.consumer.entity.EsGpsInfo; import org.dromara.kafka.consumer.entity.EsGpsInfoVO; +import org.dromara.system.api.domain.bo.RemoteDeviceBo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.KafkaListener; +import java.text.DateFormat; import java.util.ArrayList; import java.util.Date; import java.util.List; @@ -32,88 +38,36 @@ import java.util.concurrent.LinkedBlockingDeque; * @author chenle * @date 2021-09-06 16:44 */ -public class ConsumerWorker implements Runnable { - private ConsumerRecord record; +@Slf4j +@Configuration +public class ConsumerWorker { private Logger logger = LoggerFactory.getLogger(ConsumerWorker.class); public static LinkedBlockingDeque linkedBlockingDeque = new LinkedBlockingDeque<>(5000); - private String cityCode ; + public static LinkedBlockingDeque basedataDeque = new LinkedBlockingDeque<>(5000); - ConsumerWorker(ConsumerRecord record, String cityCode) { - this.record = record; - this.cityCode = cityCode; - } - @Override - public void run() { - //其他地市使用的方法,这里使用了一个巧妙的方法,我们开发的地市都是传4位,这种其他地市的cityCode传大于4位,然后截取 - if(cityCode.length() > 4){ - cityCode = cityCode.substring(0,4); - normalRequest(); - }else { - //六安、安庆等地市的方法,这些地市都是我们自己公司开发的东西。 - luanrequest(); -// luanrequestBatch(); - } - } - - /* - * 废弃方法 - * */ - private void luanrequestBatch() { - Object value = record.value(); - String topic = record.topic(); - List list = new ArrayList<>(); - logger.info("offset={},topic={},value={}", record.offset(), topic,value); - List jsonObjects = JSON.parseArray((String) value, JSONObject.class); - for (JSONObject jsonObject : jsonObjects) { - EsGpsInfo esGpsInfo; - /*try { - jsonObject = JSONUtil.parseObj(((String) value)); - }catch (ConvertException e){ - logger.info("jsonObject=null:error={}",e.getMessage()); - return; - }*/ - try { - esGpsInfo = JSONUtil.toBean(jsonObject, EsGpsInfo.class); - }catch (ConvertException e){ - logger.info("EsGpsInfo=null:error={}",e.getMessage()); - return; - } - - if(Objects.isNull(esGpsInfo)){ - logger.info("esGpsInfo=null no error"); - return; - } - String deviceCode = esGpsInfo.getDeviceCode(); - if(StringUtils.isEmpty(deviceCode) || deviceCode.length() > 100){ - logger.info("deviceCode:{} is null or is too long ",deviceCode); - return; - } - String latitude = esGpsInfo.getLat(); - if(StringUtils.isEmpty(latitude) || "0.0".equals(latitude)){ - logger.info("latitude:{} is null or is zero ",latitude); - return; - } - String longitude = esGpsInfo.getLng(); - if(StringUtils.isEmpty(longitude) || "0.0".equals(longitude)){ - logger.info("longitude:{} is null or is zero ",longitude); - return; - } - esGpsInfo.setInfoSource(cityCode); - - esGpsInfo.setGpsTime(new Date(Long.valueOf(jsonObject.getStr("gpsTime")))); - list.add(esGpsInfo); - } -// dataToEsService.saveGpsInfoBatch(list); - } - - private void luanrequest() { + @KafkaListener(topics = "#{'${spring.kafka.consumer.topics}'.split(',')}",properties = { + "auto.offset.reset:latest"}) + public void consumer(ConsumerRecord record) { Object value = record.value(); String topic = record.topic(); - logger.info("offset={},topic={},value={}", record.offset(), topic,value); + if ("jysb_dwxx".equals(topic)){ //定位信息 +// logger.info("offset={},topic={},value={}", record.offset(), topic,value); + luanrequest(value); + } else if ("jysb_sbxx".equals(topic)) { //基础信息 +// logger.info("offset={},topic={},value={}", record.offset(), topic,value); + baseDataRequest(value); + } + + + } + + + + private void luanrequest(Object value) { RemoteGpsInfo esGpsInfo; JSONObject jsonObject; try { @@ -138,17 +92,16 @@ public class ConsumerWorker implements Runnable { logger.info("deviceCode:{} is null or is too long ",deviceCode); return; } - String latitude = esGpsInfo.getLat(); + String latitude = esGpsInfo.getLatitude(); if(StringUtils.isEmpty(latitude) || "0.0".equals(latitude)){ logger.info("latitude:{} is null or is zero ",latitude); return; } - String longitude = esGpsInfo.getLng(); + String longitude = esGpsInfo.getLongitude(); if(StringUtils.isEmpty(longitude) || "0.0".equals(longitude)){ logger.info("longitude:{} is null or is zero ",longitude); return; } - esGpsInfo.setInfoSource(cityCode); try { esGpsInfo.setGpsTime(new Date(Long.valueOf(jsonObject.getStr("gpsTime")))); }catch (Exception e){ @@ -162,73 +115,73 @@ public class ConsumerWorker implements Runnable { } logger.info("code={},msg={}",response.getCode(),response.getMsg()); if(200 == response.getCode()){ - logger.info("topic={},data2es={},gpsTime={}",topic,"success",esGpsInfo.getGpsTime()); + logger.info("topic=jysb_dwxx,data2es={},gpsTime={}","success",esGpsInfo.getGpsTime()); }else{ - logger.info("topic={},data2es={}",topic,response.getMsg()); + logger.info("topic=jysb_dwxx,data2es={}",response.getMsg()); } } - - - /** - * 通用的请求(一般地市采用这个方法) - */ - private void normalRequest() { - Object value = record.value(); - String topic = record.topic(); - - logger.info("offset={},topic={},value={}", record.offset(), topic,value); - - RemoteGpsInfo esGpsInfo = new RemoteGpsInfo(); - EsGpsInfoVO esGpsInfoVO; + /* + * 处理基础数据上传 + * */ + private void baseDataRequest(Object value){ + RemoteDeviceBo deviceBo; + JSONObject jsonObject; try { - esGpsInfoVO = JSONUtil.toBean(((String) value), EsGpsInfoVO.class); + jsonObject = JSONUtil.parseObj(((String) value)); }catch (ConvertException e){ - logger.info("esGpsInfoVO=null:error={}",e.getMessage()); + logger.info("jsonObject=null:error={}",e.getMessage()); return; } - if(Objects.isNull(esGpsInfoVO)){ - logger.info("esGpsInfoVO=null no error"); - return; - } - - try { - DateTime parse = DateUtil.parse(esGpsInfoVO.getGpsTime(), "yyyy-MM-dd HH:mm:ss"); - }catch (Exception e){ - logger.info("gpsTime:{} format error", esGpsInfoVO.getGpsTime()); + deviceBo = JSONUtil.toBean(jsonObject, RemoteDeviceBo.class); + }catch (ConvertException e){ + logger.info("Device=null:error={}",e.getMessage()); return; } - - String deviceCode = esGpsInfoVO.getDeviceCode(); - if(StringUtils.isEmpty(deviceCode) || deviceCode.length() > 100){ - logger.info("deviceCode:{} is null or is too long ",deviceCode); + if(Objects.isNull(deviceBo)){ + logger.info("deviceBo=null no error"); return; } - String latitude = esGpsInfoVO.getLatitude(); - if(StringUtils.isEmpty(latitude) || "0.0".equals(latitude)){ - logger.info("latitude:{} is null or is zero ",latitude); + if (StringUtils.isEmpty(deviceBo.getDeviceCode())){ + logger.info("deviceCode is null"); return; } - String longitude = esGpsInfoVO.getLongitude(); - if(StringUtils.isEmpty(longitude) || "0.0".equals(longitude)){ - logger.info("longitude:{} is null or is zero ",longitude); + if (StringUtils.isEmpty(deviceBo.getInfoSource())){ + logger.info("infoSource is null"); return; } - BeanUtil.copyProperties(esGpsInfoVO,esGpsInfo,new CopyOptions()); - esGpsInfo.setLat(latitude); - esGpsInfo.setLng(esGpsInfoVO.getLongitude()); - esGpsInfo.setOrientation(esGpsInfoVO.getDirection()); - esGpsInfo.setInfoSource(cityCode); - - boolean offer = linkedBlockingDeque.offer(esGpsInfo); + if (!StringUtils.isEmpty(deviceBo.getCreateTime())){ + try { + Date createTime = new Date(Long.valueOf(jsonObject.getStr("createTime"))); + deviceBo.setCreateTime(DateUtil.format(createTime, "yyyy-MM-dd HH:mm:ss")); + }catch (Exception e){ + logger.error("error_msg={}",e.getMessage()); + } + } + if (!StringUtils.isEmpty(deviceBo.getUpdateTime())){ + try { + Date updateTime = new Date(Long.valueOf(jsonObject.getStr("updateTime"))); + deviceBo.setUpdateTime(DateUtil.format(updateTime, "yyyy-MM-dd HH:mm:ss")); + }catch (Exception e){ + logger.error("error_msg={}",e.getMessage()); + } + } + logger.info("deviceBo={}",deviceBo); + boolean offer = basedataDeque.offer(deviceBo); R response = R.ok(offer); + if(Objects.isNull(response)){ + logger.info("response == null"); + } + logger.info("code={},msg={}",response.getCode(),response.getMsg()); if(200 == response.getCode()){ - logger.info("topic={},data2es={}",topic,"success"); + logger.info("topic=jysb_sbxx,data2es={},deviceCode={}","success",deviceBo.getDeviceCode()); }else{ - logger.error("topic={},data2es={}",topic,"fail"); + logger.info("topic=jysb_sbxx,data2es={}",response.getMsg()); } } + + } diff --git a/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/DataInsertBatchHandler.java b/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/DataInsertBatchHandler.java index cc798e00..e43afdd5 100644 --- a/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/DataInsertBatchHandler.java +++ b/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/DataInsertBatchHandler.java @@ -7,6 +7,8 @@ import org.apache.dubbo.config.annotation.DubboReference; import org.dromara.data2es.api.RemoteDataToEsService; import org.dromara.data2es.api.domain.RemoteGpsInfo; import org.dromara.kafka.consumer.entity.EsGpsInfo; +import org.dromara.system.api.RemoteDeviceService; +import org.dromara.system.api.domain.bo.RemoteDeviceBo; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.context.annotation.Configuration; @@ -31,21 +33,31 @@ public class DataInsertBatchHandler implements CommandLineRunner { @DubboReference private RemoteDataToEsService gpsService; + @DubboReference + private RemoteDeviceService deviceService; + @Override public void run(String... args) throws Exception { ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor(); - LinkedBlockingDeque linkedBlockingDeque = ConsumerWorker.linkedBlockingDeque; + LinkedBlockingDeque linkedBlockingDeque = ConsumerWorker.linkedBlockingDeque; //定位信息队列 + LinkedBlockingDeque baseDataDeque = ConsumerWorker.basedataDeque; //基础信息队列 singleThreadExecutor.execute(new Runnable() { @Override public void run() { while (true) { try { List list = new ArrayList<>(); + List bases = new ArrayList<>(); Queues.drain(linkedBlockingDeque, list, 200, 5, TimeUnit.SECONDS); + Queues.drain(baseDataDeque, bases, 100, 5, TimeUnit.SECONDS); log.info("batch size={}", list.size()); + log.info("basedata size={}", bases.size()); if(CollectionUtil.isNotEmpty(list)) { gpsService.saveDataBatch(list); } + if(CollectionUtil.isNotEmpty(bases)) { + deviceService.batchSaveDevice(bases); + } } catch (Exception e) { log.error("缓存队列批量消费异常:{}", e.getMessage()); } diff --git a/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/KafkaConsumerRunnable.java b/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/KafkaConsumerRunnable.java deleted file mode 100644 index 6d21ea5f..00000000 --- a/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/KafkaConsumerRunnable.java +++ /dev/null @@ -1,98 +0,0 @@ -package org.dromara.kafka.consumer.handler; - -import org.apache.dubbo.config.annotation.DubboReference; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.PartitionInfo; -import org.apache.kafka.common.TopicPartition; -import org.dromara.data2es.api.RemoteDataToEsService; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; -import org.springframework.kafka.config.KafkaListenerContainerFactory; -import org.springframework.kafka.core.DefaultKafkaConsumerFactory; -import org.springframework.kafka.listener.ContainerProperties; -import org.springframework.kafka.listener.MessageListener; - -import java.time.Duration; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ThreadPoolExecutor; - -/** - *

description:

- * - * @author chenle - * @date 2021-09-06 16:39 - */ -public class KafkaConsumerRunnable implements Runnable { - - private Map props; - private ThreadPoolExecutor taskExecutor; - - private String cityCode; - private Logger logger = LoggerFactory.getLogger(KafkaConsumerRunnable.class); - - public KafkaConsumerRunnable(Map props, ThreadPoolExecutor taskExecutor, - String cityCode) { - this.props = props; - this.taskExecutor = taskExecutor; - this.cityCode = cityCode; - } - - private DefaultKafkaConsumerFactory buildConsumerFactory(){ - return new DefaultKafkaConsumerFactory(props); - } - - private ContainerProperties containerProperties(String[] topic, MessageListener messageListener) { - ContainerProperties containerProperties = new ContainerProperties(topic); - containerProperties.setMessageListener(messageListener); - return containerProperties; - } - - private KafkaListenerContainerFactory buildListenerFactory(){ - ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); - factory.setConsumerFactory(buildConsumerFactory()); - factory.setConcurrency(4); - factory.setBatchListener(true); - - factory.getContainerProperties().setPollTimeout(3000); - return factory; - } - - - - - - - @Override - public void run() { - KafkaConsumer consumer = new KafkaConsumer<>(props); - - List topics = (List) props.get("topics"); - consumer.subscribe(topics); - consumer.poll(0); // 令订阅生效 - - List topicPartitions = new ArrayList<>(); - Map> stringListMap = consumer.listTopics(); - for (Object topic : topics) { - String topic1 = (String) topic; - List partitionInfos = stringListMap.get(topic1); - for (PartitionInfo partitionInfo : partitionInfos) { - TopicPartition partition = new TopicPartition(topic1, partitionInfo.partition()); - topicPartitions.add(partition); - } - } - consumer.seekToEnd(topicPartitions); // 如果传Collections.emptyList()表示移动所有订阅topic分区offset到最末端 - - while (true) { - ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); - for (ConsumerRecord record : records) { - taskExecutor.submit(new ConsumerWorker(record, cityCode)); - } - - } - } -} diff --git a/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/KafkaSecurityUtil.java b/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/KafkaSecurityUtil.java deleted file mode 100644 index cf4a3238..00000000 --- a/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/KafkaSecurityUtil.java +++ /dev/null @@ -1,108 +0,0 @@ -package org.dromara.kafka.consumer.handler; - -import cn.hutool.core.date.DateTime; -import cn.hutool.core.date.DateUtil; -import org.dromara.kafka.consumer.entity.EsGpsInfo; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.util.Properties; - -/** - *

description:

- * - * @author chenle - * @date 2021-10-28 14:48 - */ -public class KafkaSecurityUtil { - - - - - static Logger logger = LoggerFactory.getLogger(KafkaSecurityUtil.class); - - public static void main(String[] args) { - EsGpsInfo esGpsInfo = new EsGpsInfo(); - String realtime = "2021/11/04 12:00:11"; - DateTime dateTime = DateUtil.parse(realtime); - esGpsInfo.setGpsTime(dateTime.toJdkDate()); - logger.info("esGpsInfo:{},deviceType={},gpsTime={}",esGpsInfo.toString(), - esGpsInfo.getDeviceType(),dateTime.toJdkDate().toString()); - } - /** - * 用户自己申请的机机账号keytab文件名称 - */ - private static final String USER_KEYTAB_FILE = "user.keytab"; - - /** - * 用户自己申请的机机账号名称 - */ - private static final String USER_PRINCIPAL = "aqdsj_ruansi@HADOOP.COM"; - - public static void securityPrepare() throws IOException - { - logger.error("进入了---securityPrepare"); - //String filePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator; - //String krbFile = filePath + "krb5.conf"; - //ClassPathResource classPathResource = new ClassPathResource("krb5.conf"); - //String krbFile = classPathResource.getAbsolutePath(); - String krbFile = "/gpsstore/krb5.conf"; -// String userKeyTableFile = filePath + USER_KEYTAB_FILE; - //ClassPathResource classPathResource1 = new ClassPathResource(USER_KEYTAB_FILE); - String userKeyTableFile = "/gpsstore/user.keytab"; - - //windows路径下分隔符替换 - userKeyTableFile = userKeyTableFile.replace("\\", "\\\\"); - krbFile = krbFile.replace("\\", "\\\\"); - - LoginUtil.setKrb5Config(krbFile); - LoginUtil.setZookeeperServerPrincipal("zookeeper/hadoop.hadoop.com"); - logger.error("userKeyTableFile路径---{}",userKeyTableFile); - LoginUtil.setJaasFile(USER_PRINCIPAL, userKeyTableFile); - } - - public static Boolean isSecurityModel() - { - Boolean isSecurity = false; - //String krbFilePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator + "kafkaSecurityMode"; - //ClassPathResource classPathResource = new ClassPathResource("kafkaSecurityMode"); - InputStream inputStream = Thread.currentThread().getContextClassLoader().getResourceAsStream("kafkaSecurityMode"); - - /*File file = classPathResource.getFile(); - - if(!file.exists()){ - return isSecurity; - }*/ - - Properties securityProps = new Properties(); - - - try - { - securityProps.load(inputStream); - if ("yes".equalsIgnoreCase(securityProps.getProperty("kafka.client.security.mode"))) - { - isSecurity = true; - } - } - catch (Exception e) - { - logger.info("The Exception occured : {}.", e); - } - - return isSecurity; - } - - /* - * 判断文件是否存在 - */ - private static boolean isFileExists(String fileName) - { - File file = new File(fileName); - - return file.exists(); - } -} diff --git a/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/LoginUtil.java b/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/LoginUtil.java deleted file mode 100644 index 8fa4b0bf..00000000 --- a/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/LoginUtil.java +++ /dev/null @@ -1,215 +0,0 @@ -package org.dromara.kafka.consumer.handler; - -import java.io.File; -import java.io.FileWriter; -import java.io.IOException; - -/** - *

description:

- * - * @author chenle - * @date 2021-10-28 15:40 - */ -public class LoginUtil -{ - - public enum Module - { - STORM("StormClient"), KAFKA("KafkaClient"), ZOOKEEPER("Client"); - - private String name; - - private Module(String name) - { - this.name = name; - } - - public String getName() - { - return name; - } - } - - /** - * line operator string - */ - private static final String LINE_SEPARATOR = System.getProperty("line.separator"); - - /** - * jaas file postfix - */ - private static final String JAAS_POSTFIX = ".jaas.conf"; - - /** - * is IBM jdk or not - */ - private static final boolean IS_IBM_JDK = System.getProperty("java.vendor").contains("IBM"); - - /** - * IBM jdk login module - */ - private static final String IBM_LOGIN_MODULE = "com.ibm.security.auth.module.Krb5LoginModule required"; - - /** - * oracle jdk login module - */ - private static final String SUN_LOGIN_MODULE = "com.sun.security.auth.module.Krb5LoginModule required"; - - /** - * Zookeeper quorum principal. - */ - public static final String ZOOKEEPER_AUTH_PRINCIPAL = "zookeeper.server.principal"; - - /** - * java security krb5 file path - */ - public static final String JAVA_SECURITY_KRB5_CONF = "java.security.krb5.conf"; - - /** - * java security login file path - */ - public static final String JAVA_SECURITY_LOGIN_CONF = "java.security.auth.login.config"; - - /** - * 设置jaas.conf文件 - * - * @param principal - * @param keytabPath - * @throws IOException - */ - public static void setJaasFile(String principal, String keytabPath) - throws IOException - { - String jaasPath = - new File(System.getProperty("java.io.tmpdir")) + File.separator + System.getProperty("user.name") - + JAAS_POSTFIX; - - // windows路径下分隔符替换 - jaasPath = jaasPath.replace("\\", "\\\\"); - // 删除jaas文件 - deleteJaasFile(jaasPath); - writeJaasFile(jaasPath, principal, keytabPath); - System.setProperty(JAVA_SECURITY_LOGIN_CONF, jaasPath); - } - - /** - * 设置zookeeper服务端principal - * - * @param zkServerPrincipal - * @throws IOException - */ - public static void setZookeeperServerPrincipal(String zkServerPrincipal) - throws IOException - { - System.setProperty(ZOOKEEPER_AUTH_PRINCIPAL, zkServerPrincipal); - String ret = System.getProperty(ZOOKEEPER_AUTH_PRINCIPAL); - if (ret == null) - { - throw new IOException(ZOOKEEPER_AUTH_PRINCIPAL + " is null."); - } - if (!ret.equals(zkServerPrincipal)) - { - throw new IOException(ZOOKEEPER_AUTH_PRINCIPAL + " is " + ret + " is not " + zkServerPrincipal + "."); - } - } - - /** - * 设置krb5文件 - * - * @param krb5ConfFile - * @throws IOException - */ - public static void setKrb5Config(String krb5ConfFile) - throws IOException - { - System.setProperty(JAVA_SECURITY_KRB5_CONF, krb5ConfFile); - String ret = System.getProperty(JAVA_SECURITY_KRB5_CONF); - if (ret == null) - { - throw new IOException(JAVA_SECURITY_KRB5_CONF + " is null."); - } - if (!ret.equals(krb5ConfFile)) - { - throw new IOException(JAVA_SECURITY_KRB5_CONF + " is " + ret + " is not " + krb5ConfFile + "."); - } - } - - /** - * 写入jaas文件 - * - * @throws IOException - * 写文件异常 - */ - private static void writeJaasFile(String jaasPath, String principal, String keytabPath) - throws IOException - { - FileWriter writer = new FileWriter(new File(jaasPath)); - try - { - writer.write(getJaasConfContext(principal, keytabPath)); - writer.flush(); - } - catch (IOException e) - { - throw new IOException("Failed to create jaas.conf File"); - } - finally - { - writer.close(); - } - } - - private static void deleteJaasFile(String jaasPath) - throws IOException - { - File jaasFile = new File(jaasPath); - if (jaasFile.exists()) - { - if (!jaasFile.delete()) - { - throw new IOException("Failed to delete exists jaas file."); - } - } - } - - private static String getJaasConfContext(String principal, String keytabPath) - { - Module[] allModule = Module.values(); - StringBuilder builder = new StringBuilder(); - for (Module modlue : allModule) - { - builder.append(getModuleContext(principal, keytabPath, modlue)); - } - return builder.toString(); - } - - private static String getModuleContext(String userPrincipal, String keyTabPath, Module module) - { - StringBuilder builder = new StringBuilder(); - if (IS_IBM_JDK) - { - builder.append(module.getName()).append(" {").append(LINE_SEPARATOR); - builder.append(IBM_LOGIN_MODULE).append(LINE_SEPARATOR); - builder.append("credsType=both").append(LINE_SEPARATOR); - builder.append("principal=\"" + userPrincipal + "\"").append(LINE_SEPARATOR); - builder.append("useKeytab=\"" + keyTabPath + "\"").append(LINE_SEPARATOR); - builder.append("debug=true;").append(LINE_SEPARATOR); - builder.append("};").append(LINE_SEPARATOR); - } - else - { - builder.append(module.getName()).append(" {").append(LINE_SEPARATOR); - builder.append(SUN_LOGIN_MODULE).append(LINE_SEPARATOR); - builder.append("useKeyTab=true").append(LINE_SEPARATOR); - builder.append("keyTab=\"" + keyTabPath + "\"").append(LINE_SEPARATOR); - builder.append("principal=\"" + userPrincipal + "\"").append(LINE_SEPARATOR); - builder.append("useTicketCache=false").append(LINE_SEPARATOR); - builder.append("storeKey=true").append(LINE_SEPARATOR); - builder.append("debug=true;").append(LINE_SEPARATOR); - builder.append("};").append(LINE_SEPARATOR); - } - - return builder.toString(); - } -} - diff --git a/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/RealConsumer.java b/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/RealConsumer.java deleted file mode 100644 index 29307c6f..00000000 --- a/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/RealConsumer.java +++ /dev/null @@ -1,130 +0,0 @@ -package org.dromara.kafka.consumer.handler; - - -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.dromara.kafka.consumer.config.KafkaPropertiesConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.CommandLineRunner; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; -import org.springframework.stereotype.Component; -import org.springframework.util.CollectionUtils; - -import java.io.IOException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadPoolExecutor; - -/** - *

description:

- * - * @author chenle - * @date 2021-09-06 11:15 - */ -@Component -public class RealConsumer implements CommandLineRunner { - - private String kafkaServers; - - private String groupId; - - private String topics; - - private String cityCode = "3400"; - - - - @Autowired - KafkaPropertiesConfig kafkaPropertiesConfig; - - @Autowired - ThreadPoolExecutor dtpExecutor2; - - - private Logger logger = LoggerFactory.getLogger(RealConsumer.class); - - @Override - public void run(String... args) throws Exception { - kafkaServers = "127.0.0.1:9092"; - topics = "topic.send.2,topic.send.3,topic.send.4,topic.send.5,topic.send.8"; - groupId = "group_ruansi_xuancheng"; - cityCode = "3418"; - if(args.length > 0){ - /*kafkaServers = args[0]; - topics = args[1]; - groupId = args[2]; - cityCode = args[3];*/ - - } - ExecutorService executorService = Executors.newSingleThreadExecutor(); - Map kafkaProp = getKafkaProp(); - - - if (KafkaSecurityUtil.isSecurityModel()) - { - try - { - logger.info("Securitymode start."); - - //!!注意,安全认证时,需要用户手动修改为自己申请的机机账号 - //认证方式 SASL_PLAINTEXT 或者 PLAINTEXT - kafkaProp.put("security.protocol","SASL_PLAINTEXT"); - //服务名 - kafkaProp.put("sasl.kerberos.service.name","kafka"); - //域名 - kafkaProp.put("kerberos.domain.name","hadoop.hadoop.com"); - KafkaSecurityUtil.securityPrepare(); - } - catch (IOException e) - { - logger.error("Security prepare failure."); - logger.error("The IOException occured.", e); - return; - } - logger.info("Security prepare success."); - } - - KafkaConsumerRunnable runnable = new KafkaConsumerRunnable(kafkaProp,dtpExecutor2,cityCode); - executorService.execute(runnable); - } - - - - /** - * 获取kafka配置 - * @return - */ - private Map getKafkaProp() { -// Properties map = new Properties(); - Map map = new HashMap<>(); - map.put("bootstrap.servers",kafkaServers); - map.put("group.id",groupId); - map.put("enable.auto.commit", "true"); - map.put("auto.commit.interval.ms", "1000"); - map.put("session.timeout.ms", "30000"); - map.put("key.deserializer", StringDeserializer.class); - map.put("value.deserializer", StringDeserializer.class); - map.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,5); -// map.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,1000 * 5); -// map.put("ack.mode", "manual_immediate"); - -// //认证方式 SASL_PLAINTEXT 或者 PLAINTEXT -// map.put("security.protocol","SASL_PLAINTEXT"); -// //服务名 -// map.put("sasl.kerberos.service.name","kafka"); -// //域名 -// map.put("kerberos.domain.name","hadoop.hadoop.com"); - String[] split = topics.split(","); - List list = CollectionUtils.arrayToList(split); - map.put("topics", list); - return map; - } - - - -} diff --git a/stwzhj-modules/stwzhj-consumer/src/main/resources/application.yml b/stwzhj-modules/stwzhj-consumer/src/main/resources/application.yml index 760a3a62..d7fda697 100644 --- a/stwzhj-modules/stwzhj-consumer/src/main/resources/application.yml +++ b/stwzhj-modules/stwzhj-consumer/src/main/resources/application.yml @@ -30,3 +30,4 @@ spring: config: import: - optional:nacos:application-common.yml + - optional:nacos:${spring.application.name}.yml diff --git a/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/config/ElasticsearchConfig.java b/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/config/ElasticsearchConfig.java index a41e0289..44217c19 100644 --- a/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/config/ElasticsearchConfig.java +++ b/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/config/ElasticsearchConfig.java @@ -68,7 +68,7 @@ public class ElasticsearchConfig { RestClientBuilder builder = RestClient.builder(httpHost); // 设置用户名、密码 CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); -// credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(userName, password)); + credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(userName, password)); // 连接延时配置 builder.setRequestConfigCallback(requestConfigBuilder -> { requestConfigBuilder.setConnectTimeout(connectTimeOut); diff --git a/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/controller/DataToEsController.java b/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/controller/DataToEsController.java index e0f666a2..66a88528 100644 --- a/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/controller/DataToEsController.java +++ b/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/controller/DataToEsController.java @@ -92,8 +92,8 @@ public class DataToEsController extends BaseController { esGpsInfo.setInfoSource("3401"); esGpsInfo.setGpsTime(new Date()); - esGpsInfo.setLat("31.1" + (a + i)); - esGpsInfo.setLng("117.2" + (b + i)); + esGpsInfo.setLatitude("31.1" + (a + i)); + esGpsInfo.setLongitude("117.2" + (b + i)); esGpsInfo.setZzjgdm("340100000000"); esGpsInfo.setZzjgmc("合肥市公安局"); esGpsInfo.setCarNum("霍邱看守所01"); diff --git a/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/domain/EsGpsInfo.java b/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/domain/EsGpsInfo.java index f38a5369..342d0623 100644 --- a/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/domain/EsGpsInfo.java +++ b/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/domain/EsGpsInfo.java @@ -15,8 +15,8 @@ public class EsGpsInfo implements Serializable { */ private String deviceCode; private String deviceType; - private String lat; - private String lng; + private String latitude; + private String longitude; //方向 private String orientation; //高程 diff --git a/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/dubbo/RemoteDataToEsServiceImpl.java b/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/dubbo/RemoteDataToEsServiceImpl.java index 716d6600..449f62b4 100644 --- a/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/dubbo/RemoteDataToEsServiceImpl.java +++ b/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/dubbo/RemoteDataToEsServiceImpl.java @@ -1,5 +1,6 @@ package org.dromara.data2es.dubbo; +import cn.hutool.core.bean.BeanUtil; import lombok.RequiredArgsConstructor; import org.apache.dubbo.config.annotation.DubboService; import org.dromara.common.core.domain.R; @@ -21,6 +22,6 @@ public class RemoteDataToEsServiceImpl implements RemoteDataToEsService { @Override public R saveDataBatch(List gpsInfoList) { - return gpsService.saveDataBatch(MapstructUtils.convert(gpsInfoList, EsGpsInfoVO2.class)); + return gpsService.saveDataBatch(BeanUtil.copyToList(gpsInfoList, EsGpsInfoVO2.class)); } } diff --git a/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/handler/RequestHandler.java b/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/handler/RequestHandler.java index 438d8507..0396544c 100644 --- a/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/handler/RequestHandler.java +++ b/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/handler/RequestHandler.java @@ -29,6 +29,7 @@ import org.springframework.scheduling.annotation.Async; import java.io.IOException; import java.util.*; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; @Configuration public class RequestHandler { @@ -93,6 +94,18 @@ public class RequestHandler { RedisUtils.batchInsert(map,time); } + /** + * 批量在线用户存入 + * @param map + */ + public void batchPut(Map map){ + RedisUtils.batchPut(map); + } + + public void batchPutWithExpire(Map map,long time){ + RedisUtils.batchPutWithExpire(map,time, TimeUnit.SECONDS); + } + @Async public void redisDeleteBatch(List deleteKeys){ RedisUtils.deleteObject(deleteKeys); diff --git a/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/schedule/GpsTaskTest.java b/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/schedule/GpsTaskTest.java index bbbaf47a..fbdb6a13 100644 --- a/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/schedule/GpsTaskTest.java +++ b/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/schedule/GpsTaskTest.java @@ -45,9 +45,9 @@ public class GpsTaskTest { map.put("gpsTime",new Date()); map.put("locationDesc","合肥市公安局"); - esGpsInfo.setLat("31.3" + (a + i)); + esGpsInfo.setLatitude("31.3" + (a + i)); map.put("lat","31." + (a + i)); - esGpsInfo.setLng("117.2" + (b + i)); + esGpsInfo.setLongitude("117.2" + (b + i)); map.put("lng","117." + (b + i)); //gpsService.saveData(map); diff --git a/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/service/impl/GpsServiceImpl.java b/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/service/impl/GpsServiceImpl.java index 9def2bf2..a8ba7fca 100644 --- a/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/service/impl/GpsServiceImpl.java +++ b/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/service/impl/GpsServiceImpl.java @@ -4,12 +4,14 @@ import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.date.DateTime; import cn.hutool.core.date.DateUnit; import cn.hutool.core.date.DateUtil; +import cn.hutool.json.JSONObject; import cn.hutool.json.JSONUtil; import lombok.RequiredArgsConstructor; import org.apache.commons.lang.StringUtils; import org.apache.dubbo.config.annotation.DubboReference; import org.dromara.common.core.domain.R; import org.dromara.common.core.utils.RedisConstants; +import org.dromara.common.redis.utils.RedisUtils; import org.dromara.data2es.domain.EsGpsInfo; import org.dromara.data2es.domain.EsGpsInfoVO2; import org.dromara.data2es.domain.entity.GpsInfoEntity; @@ -63,10 +65,10 @@ public class GpsServiceImpl implements IGpsService { double lng ; double lat ; try { - lng = Double.parseDouble(esGpsInfo.getLng()); - lat = Double.parseDouble(esGpsInfo.getLat()); + lng = Double.parseDouble(esGpsInfo.getLongitude()); + lat = Double.parseDouble(esGpsInfo.getLatitude()); }catch (NumberFormatException e){ - throw new MyBusinessException("经纬度转double异常,经度为:"+esGpsInfo.getLng() +"纬度为"+esGpsInfo.getLat()); + throw new MyBusinessException("经纬度转double异常,经度为:"+esGpsInfo.getLongitude() +"纬度为"+esGpsInfo.getLatitude()); } gpsInfoEntity.setLocation(new Double[]{lng,lat}); @@ -103,7 +105,6 @@ public class GpsServiceImpl implements IGpsService { List deleteKeys = new ArrayList<>(); BulkRequest bulkRequest = new BulkRequest(); for (EsGpsInfoVO2 info : list) { - if(StringUtils.isBlank(info.getInfoSource())){ logger.info("infoSource 为空"); continue; @@ -112,7 +113,7 @@ public class GpsServiceImpl implements IGpsService { info = getInfoByInfoSource(info); //redis buildRedisMap(info,onlineUserDataMap,orgCodeDataMap,deleteKeys); - +// logger.error("接收数据={},deviceCode={},gpsTime={}",info,info.getDeviceCode(),info.getGpsTime()); IndexRequest indexRequest = buildEsIndexRequest(info); bulkRequest.add(indexRequest); @@ -121,7 +122,9 @@ public class GpsServiceImpl implements IGpsService { } requestHandler.redisOnlineUserBatch(onlineUserDataMap, RedisConstants.REDIS_ONLINE_USER_NEVER_EXPIRE); - requestHandler.redisOnlineUserBatch(orgCodeDataMap, 300); + requestHandler.redisOnlineUserBatch(orgCodeDataMap, 600); +// requestHandler.batchPut(onlineUserDataMap); +// requestHandler.batchPutWithExpire(orgCodeDataMap,600); requestHandler.redisDeleteBatch(deleteKeys); requestHandler.esRealBulkSave(bulkRequest); @@ -138,33 +141,31 @@ public class GpsServiceImpl implements IGpsService { private EsGpsInfoVO2 getInfoByInfoSource(EsGpsInfo esGpsInfo) { EsGpsInfoVO2 esGpsInfoVO2 = new EsGpsInfoVO2(); BeanUtil.copyProperties(esGpsInfo,esGpsInfoVO2); - if(null == esGpsInfoVO2.getZzjgdm() || "".equals(esGpsInfoVO2.getZzjgdm())){ - RemoteDeviceVo vo = deviceService.getDeviceInfo(esGpsInfoVO2.getDeviceCode(),esGpsInfoVO2.getDeviceType()); - if (null != vo){ - esGpsInfoVO2.setZzjgdm(vo.getZzjgdm()); - esGpsInfoVO2.setZzjgmc(vo.getZzjgmc()); - esGpsInfoVO2.setPoliceName(vo.getPoliceName()); - esGpsInfoVO2.setPoliceNo(vo.getPoliceNo()); - esGpsInfoVO2.setCarNum(vo.getCarNum()); - String deviceType = vo.getDeviceType(); - if(StringUtils.isNotBlank(deviceType)){ - deviceType = deviceType.replaceAll("\"", ""); - if(deviceType.charAt(0) == '0' && deviceType.length() > 1){ - deviceType = deviceType.substring(1); - if(deviceType.equals("1")){ - deviceType = "2"; - } + JSONObject object = RedisUtils.getBucket("deviceInfo:"+esGpsInfo.getInfoSource()+":"+esGpsInfo.getDeviceCode()); +// RemoteDeviceVo vo = deviceService.getDeviceInfo(esGpsInfoVO2.getDeviceCode(),esGpsInfoVO2.getInfoSource()); + if (null != object){ + RemoteDeviceVo vo = BeanUtil.toBean(object,RemoteDeviceVo.class); + esGpsInfoVO2.setZzjgdm(vo.getZzjgdm()); + esGpsInfoVO2.setZzjgmc(vo.getZzjgmc()); + esGpsInfoVO2.setPoliceName(vo.getPoliceName()); + esGpsInfoVO2.setPoliceNo(vo.getPoliceNo()); + esGpsInfoVO2.setCarNum(vo.getCarNum()); + String deviceType = vo.getDeviceType(); + if(StringUtils.isNotBlank(deviceType)){ + deviceType = deviceType.replaceAll("\"", ""); + if(deviceType.charAt(0) == '0' && deviceType.length() > 1){ + deviceType = deviceType.substring(1); + if(deviceType.equals("1")){ + deviceType = "2"; } } - esGpsInfoVO2.setDeviceType(deviceType); - }else { - esGpsInfoVO2.setDeviceType("99"); - esGpsInfoVO2.setZzjgdm(esGpsInfo.getInfoSource()+"00000000"); } - + esGpsInfoVO2.setDeviceType(deviceType); + }else { + esGpsInfoVO2.setDeviceType("99"); + esGpsInfoVO2.setZzjgdm(esGpsInfo.getInfoSource()+"00000000"); } - return esGpsInfoVO2; } @@ -246,10 +247,10 @@ public class GpsServiceImpl implements IGpsService { double lng ; double lat ; try { - lng = Double.parseDouble(esGpsInfo.getLng()); - lat = Double.parseDouble(esGpsInfo.getLat()); + lng = Double.parseDouble(esGpsInfo.getLongitude()); + lat = Double.parseDouble(esGpsInfo.getLatitude()); }catch (NumberFormatException e){ - throw new MyBusinessException("经纬度转double异常,经度为:"+esGpsInfo.getLng() +"纬度为"+esGpsInfo.getLat()); + throw new MyBusinessException("经纬度转double异常,经度为:"+esGpsInfo.getLongitude() +"纬度为"+esGpsInfo.getLatitude()); } gpsInfoEntity.setLocation(new Double[]{lng,lat}); diff --git a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/domain/TDevice.java b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/domain/TDevice.java index a87818ec..0b81b76f 100644 --- a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/domain/TDevice.java +++ b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/domain/TDevice.java @@ -92,10 +92,8 @@ public class TDevice { */ private String remark2; - @TableField(fill = FieldFill.INSERT) private String createTime; - @TableField(fill = FieldFill.INSERT_UPDATE) private String updateTime; private String lrdwdm; diff --git a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/dubbo/RemoteDeviceImpl.java b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/dubbo/RemoteDeviceImpl.java index ed7b2b20..cfd66a8e 100644 --- a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/dubbo/RemoteDeviceImpl.java +++ b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/dubbo/RemoteDeviceImpl.java @@ -57,10 +57,10 @@ public class RemoteDeviceImpl implements RemoteDeviceService { } @Override - public RemoteDeviceVo getDeviceInfo(String deviceCode, String deviceType) { + public RemoteDeviceVo getDeviceInfo(String deviceCode, String infoSource) { TDeviceBo bo = new TDeviceBo(); bo.setDeviceCode(deviceCode); - bo.setDeviceType(deviceType); + bo.setInfoSource(infoSource); return BeanUtil.toBean(deviceService.queryOne(bo), RemoteDeviceVo.class) ; } } diff --git a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/schedule/DeviceRedisSchedule.java b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/schedule/DeviceRedisSchedule.java index 07f90884..e19dbc9e 100644 --- a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/schedule/DeviceRedisSchedule.java +++ b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/schedule/DeviceRedisSchedule.java @@ -1,29 +1,71 @@ package org.dromara.system.schedule; import cn.hutool.core.bean.BeanUtil; +import cn.hutool.core.date.DateUtil; import cn.hutool.json.JSONObject; +import cn.hutool.json.JSONUtil; +import lombok.extern.slf4j.Slf4j; +import org.dromara.common.core.utils.RedisConstants; import org.dromara.common.redis.utils.RedisUtils; import org.dromara.system.domain.DeviceRedis; +import org.dromara.system.domain.bo.TDeviceBo; +import org.dromara.system.domain.vo.TDeviceVo; import org.dromara.system.service.IDeviceRedisService; +import org.dromara.system.service.ITDeviceService; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.Scheduled; +import javax.annotation.PostConstruct; +import java.util.Date; +import java.util.HashMap; import java.util.List; +import java.util.Map; @Configuration +@Slf4j public class DeviceRedisSchedule { @Autowired IDeviceRedisService redisService; + @Autowired + ITDeviceService deviceService; + + @Value("${deviceInfo.lastUpdateTime}") + private String lastUpdateTime; + /* * 把Redis中 online_user数据存入t_device_redis表中 * */ - @Scheduled(cron = "0/30 * * * * ?") +// @Scheduled(cron = "0/30 * * * * ?") public void handleDeviceRedis(){ List jlist = RedisUtils.searchAndGetKeysValues("online_users:*"); redisService.insertBatch(BeanUtil.copyToList(jlist, DeviceRedis.class)); } +// @Scheduled(cron = "0 0 0/1 * * ?") + public void handleDeviceInfoToRedis(){ + if (null == lastUpdateTime || "".equals(lastUpdateTime)){ + log.error("lastUpdateTime=null"); + } + TDeviceBo bo = new TDeviceBo(); + bo.setBeginTime(lastUpdateTime); + bo.setEndTime(DateUtil.formatDateTime(new Date())); + List list = deviceService.queryList(bo); + if (list.size() >0){ + lastUpdateTime = list.get(0).getUpdateTime(); + } + Map deviceInfoDataMap = new HashMap<>(); + for (TDeviceVo vo : list) { + String jsonValue = JSONUtil.toJsonStr(vo); + + String infoKey = "deviceInfo:" + vo.getInfoSource()+":"+vo.getDeviceCode(); + deviceInfoDataMap.put(infoKey, jsonValue); + + } + RedisUtils.batchInsert(deviceInfoDataMap,-1); + } + } diff --git a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/service/impl/TDeviceServiceImpl.java b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/service/impl/TDeviceServiceImpl.java index 607c9f1b..8ee6e27d 100644 --- a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/service/impl/TDeviceServiceImpl.java +++ b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/service/impl/TDeviceServiceImpl.java @@ -25,6 +25,7 @@ import org.springframework.util.CollectionUtils; import java.util.*; import java.util.regex.Matcher; import java.util.regex.Pattern; +import java.util.stream.Collectors; /** * deviceService业务层处理 @@ -128,6 +129,7 @@ public class TDeviceServiceImpl implements ITDeviceService { lqw.eq(StringUtils.isNotBlank(bo.getRemark2()), TDevice::getRemark2, bo.getRemark2()); lqw.between(bo.getBeginTime() != null && bo.getEndTime() != null, TDevice::getUpdateTime, bo.getBeginTime(), bo.getEndTime()); + lqw.orderByDesc(TDevice::getUpdateTime); return lqw; } @@ -184,8 +186,44 @@ public class TDeviceServiceImpl implements ITDeviceService { } @Override - public Boolean batchSaveOrUpdate(List List) { - return baseMapper.insertOrUpdateBatch(List); + public Boolean batchSaveOrUpdate(List list) { + boolean flag = true; + // 先根据 field1 和 field2 查询出已存在的记录 + List existingEntities = baseMapper.selectList(new QueryWrapper() + .in("device_code", list.stream().map(TDevice::getDeviceCode).collect(Collectors.toList())) + .in("info_source", list.stream().map(TDevice::getInfoSource).collect(Collectors.toList()))); + + // 找到需要更新的记录 + List toUpdate = new ArrayList<>(); + // 找到需要插入的记录 + List toInsert = new ArrayList<>(); + + for (TDevice entity : list) { + boolean exists = false; + for (TDevice existingEntity : existingEntities) { + if (entity.getDeviceCode().equals(existingEntity.getDeviceCode()) && entity.getInfoSource().equals(existingEntity.getInfoSource())) { + entity.setId(existingEntity.getId()); // 设置 ID 以便更新 + toUpdate.add(entity); + exists = true; + break; + } + } + if (!exists) { + toInsert.add(entity); + } + } + + // 批量更新 + if (!toUpdate.isEmpty()) { + flag = baseMapper.updateBatchById(toUpdate); + } + + // 批量插入 + if (!toInsert.isEmpty()) { + flag = baseMapper.insertBatch(toInsert); // insertBatchSomeColumn 是 MyBatis-Plus 提供的批量插入方法 + } + return flag; +// return baseMapper.insertOrUpdateBatch(List); } @Override diff --git a/stwzhj-visual/stwzhj-nacos/src/main/resources/application.properties b/stwzhj-visual/stwzhj-nacos/src/main/resources/application.properties index 12560508..7d30f1ae 100644 --- a/stwzhj-visual/stwzhj-nacos/src/main/resources/application.properties +++ b/stwzhj-visual/stwzhj-nacos/src/main/resources/application.properties @@ -35,15 +35,14 @@ spring.application.name=ruoyi-nacos ### Deprecated configuration property, it is recommended to use `spring.sql.init.platform` replaced. # spring.datasource.platform=mysql nacos.plugin.datasource.log.enabled=false -spring.sql.init.platform=postgresql +spring.sql.init.platform=mysql ### Count of DB: db.num=1 ### Connect URL of DB: -db.url.0=jdbc:postgresql://localhost:5432/ypc-config?tcpKeepAlive=true&reWriteBatchedInserts=true&ApplicationName=ruoyi-nacos -db.user.0=postgres -db.password.0=ycgis -db.pool.config.driverClassName=org.postgresql.Driver +db.url.0=jdbc:mysql://127.0.0.1:3306/ry-config?characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true&useUnicode=true&useSSL=false&serverTimezone=UTC&allowPublicKeyRetrieval=true +db.user.0=root +db.password.0=root ### the maximum retry times for push nacos.config.push.maxRetryTime=50