diff --git a/stwzhj-modules/wzhj-consumer/src/main/java/org/dromara/kafka/consumer/config/AsyncConfig.java b/stwzhj-modules/wzhj-consumer/src/main/java/org/dromara/kafka/consumer/config/AsyncConfig.java index ecdfc330..afae926d 100644 --- a/stwzhj-modules/wzhj-consumer/src/main/java/org/dromara/kafka/consumer/config/AsyncConfig.java +++ b/stwzhj-modules/wzhj-consumer/src/main/java/org/dromara/kafka/consumer/config/AsyncConfig.java @@ -27,7 +27,7 @@ public class AsyncConfig { taskExecutor.setMaxPoolSize(20); taskExecutor.setQueueCapacity(200); taskExecutor.setKeepAliveSeconds(60); - taskExecutor.setThreadNamePrefix("hfapp--kafkaConsumer--"); + taskExecutor.setThreadNamePrefix("wzhj--kafkaConsumer--"); taskExecutor.setWaitForTasksToCompleteOnShutdown(true); taskExecutor.setAwaitTerminationSeconds(60); taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy()); diff --git a/stwzhj-modules/wzhj-consumer/src/main/java/org/dromara/kafka/consumer/entity/EsGpsInfo.java b/stwzhj-modules/wzhj-consumer/src/main/java/org/dromara/kafka/consumer/entity/EsGpsInfo.java index 376bd816..e5c90542 100644 --- a/stwzhj-modules/wzhj-consumer/src/main/java/org/dromara/kafka/consumer/entity/EsGpsInfo.java +++ b/stwzhj-modules/wzhj-consumer/src/main/java/org/dromara/kafka/consumer/entity/EsGpsInfo.java @@ -15,7 +15,6 @@ import java.util.Date; @Data public class EsGpsInfo implements Serializable { - private static final long serialVersionUID = 7455495841680488351L; /** * 唯一码(外部系统)合肥版本不需要 21位id, * 到时候上传省厅的时候 需要在kafka发送端处理,生成一个省厅需要的21位id @@ -25,8 +24,8 @@ public class EsGpsInfo implements Serializable { * 类型 */ private String deviceType; - private String latitude; - private String longitude; + private String lat; + private String lng; //方向 private String orientation; //高程 diff --git a/stwzhj-modules/wzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/ConsumerWorker.java b/stwzhj-modules/wzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/ConsumerWorker.java index dcd05131..bf8b17f6 100644 --- a/stwzhj-modules/wzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/ConsumerWorker.java +++ b/stwzhj-modules/wzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/ConsumerWorker.java @@ -19,9 +19,11 @@ import org.dromara.data2es.api.RemoteDataToEsService; import org.dromara.data2es.api.domain.RemoteGpsInfo; import org.dromara.kafka.consumer.entity.EsGpsInfo; import org.dromara.kafka.consumer.entity.EsGpsInfoVO; +import org.dromara.kafka.consumer.util.KafkaAsyncUtil; import org.dromara.system.api.domain.bo.RemoteDeviceBo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.KafkaListener; @@ -47,6 +49,9 @@ public class ConsumerWorker { public static LinkedBlockingDeque basedataDeque = new LinkedBlockingDeque<>(5000); + @Autowired + private KafkaAsyncUtil asyncUtils; + @KafkaListener(topics = "#{'${spring.kafka.consumer.topics}'.split(',')}",properties = { "auto.offset.reset:latest"}) @@ -54,7 +59,7 @@ public class ConsumerWorker { Object value = record.value(); EsGpsInfo esGpsInfo = JSONUtil.toBean((String) value, EsGpsInfo.class); Date gpsTime = esGpsInfo.getGpsTime(); -// log.info("value={}",value); + log.info("value={}",value); if(Objects.isNull(gpsTime)){ log.error("gpsTime == null,deviceCode={}",esGpsInfo.getDeviceCode()); return; @@ -69,11 +74,13 @@ public class ConsumerWorker { } logger.info("esGpsInfo={}",esGpsInfo); - boolean offer = linkedBlockingDeque.offer(esGpsInfo); - R response = R.ok(offer); - if(Objects.isNull(response)){ - logger.info("response == null"); + RemoteGpsInfo gpsInfo = BeanUtil.toBean(esGpsInfo, RemoteGpsInfo.class); + try { + asyncUtils.saveData(gpsInfo); + } catch (Exception e) { + e.printStackTrace(); } +// boolean offer = linkedBlockingDeque.offer(esGpsInfo); } diff --git a/stwzhj-modules/wzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/DataInsertBatchHandler.java b/stwzhj-modules/wzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/DataInsertBatchHandler.java index d168b763..e99a0d2c 100644 --- a/stwzhj-modules/wzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/DataInsertBatchHandler.java +++ b/stwzhj-modules/wzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/DataInsertBatchHandler.java @@ -33,8 +33,6 @@ public class DataInsertBatchHandler implements CommandLineRunner { @DubboReference private RemoteDataToEsService gpsService; - @DubboReference - private RemoteDeviceService deviceService; @Override public void run(String... args) throws Exception { diff --git a/stwzhj-modules/wzhj-consumer/src/main/java/org/dromara/kafka/consumer/util/KafkaAsyncUtil.java b/stwzhj-modules/wzhj-consumer/src/main/java/org/dromara/kafka/consumer/util/KafkaAsyncUtil.java new file mode 100644 index 00000000..aeda2090 --- /dev/null +++ b/stwzhj-modules/wzhj-consumer/src/main/java/org/dromara/kafka/consumer/util/KafkaAsyncUtil.java @@ -0,0 +1,37 @@ +package org.dromara.kafka.consumer.util; + +import cn.hutool.core.date.DateUnit; +import cn.hutool.core.date.DateUtil; +import org.apache.dubbo.config.annotation.DubboReference; +import org.dromara.common.core.domain.R; +import org.dromara.data2es.api.RemoteDataToEsService; +import org.dromara.data2es.api.domain.RemoteGpsInfo; +import org.springframework.scheduling.annotation.Async; +import org.springframework.scheduling.annotation.EnableAsync; +import org.springframework.stereotype.Component; + +import java.text.ParseException; +import java.util.Date; + +@EnableAsync +@Component +public class KafkaAsyncUtil { + + @DubboReference + RemoteDataToEsService dataEsService; + + /** + * 保存data + * @param bytes + * @throws ParseException + */ + @Async(value = "taskExecutor") + public void saveData(RemoteGpsInfo esGpsInfo) throws Exception { + //logger.info("当前线程名={}",Thread.currentThread().getName()); + //和redis过期监听时间一定要一致 + + R response = dataEsService.saveData(esGpsInfo); + //logger.error("位置信息接口={},失败信息={},失败设备={}",response.getCode(),response.getMessage(),esGpsInfo.getDeviceId()); + + } +} diff --git a/stwzhj-modules/wzhj-consumer/src/main/resources/application.yml b/stwzhj-modules/wzhj-consumer/src/main/resources/application.yml index d7fda697..e50c04f4 100644 --- a/stwzhj-modules/wzhj-consumer/src/main/resources/application.yml +++ b/stwzhj-modules/wzhj-consumer/src/main/resources/application.yml @@ -1,12 +1,12 @@ # Tomcat server: - port: 9214 + port: 9114 # Spring spring: application: # 应用名称 - name: stwzhj-consumer + name: wzhj-consumer profiles: # 环境配置 active: @profiles.active@ @@ -30,4 +30,5 @@ spring: config: import: - optional:nacos:application-common.yml + - optional:nacos:datasource.yml - optional:nacos:${spring.application.name}.yml diff --git a/stwzhj-modules/wzhj-consumer/src/main/resources/logback-plus.xml b/stwzhj-modules/wzhj-consumer/src/main/resources/logback-plus.xml index efdad6ff..d71cb666 100644 --- a/stwzhj-modules/wzhj-consumer/src/main/resources/logback-plus.xml +++ b/stwzhj-modules/wzhj-consumer/src/main/resources/logback-plus.xml @@ -1,86 +1,49 @@ - - - - - - - - - - - - - - - - + + + + + + + + ${log.path}/info.${log.file}.log - - ERROR - - DENY - - ACCEPT + INFO + ACCEPT + DENY - - ${LOG_PATH}${LOG_FILE} - - UTF-8 - %date [%level] [%thread] %logger{60} [%file : %line] %msg%n - - ${LOG_PATH}info/${LOG_FILE}.%d{yyyy-MM-dd}.%i.gz - 50MB - 20 + ${log.path}/info/info.${log.file}.%d{yyyy-MM-dd}.%i.log.gz + ${MAX_FILE_SIZE} + ${MAX_HISTORY} - - - - - - - Error - - - ${LOG_PATH}error.${LOG_FILE} - - - - ${LOG_PATH}error/${LOG_FILE}.%d{yyyy-MM-dd}.%i.gz - - 50MB - 180 - - - - - UTF-8 - %date [%level] [%thread] %logger{60} [%file : %line] %msg%n + %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + ${log.path}/error.${log.file}.log + + ERROR + + + ${log.path}/error/error.${log.file}.%d{yyyy-MM-dd}.%i.log.gz + ${MAX_FILE_SIZE} + ${MAX_HISTORY} + + + %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + - - - + + + diff --git a/stwzhj-modules/wzhj-data2es/src/main/java/org/dromara/data2es/config/ElasticsearchConfig.java b/stwzhj-modules/wzhj-data2es/src/main/java/org/dromara/data2es/config/ElasticsearchConfig.java index a41e0289..44217c19 100644 --- a/stwzhj-modules/wzhj-data2es/src/main/java/org/dromara/data2es/config/ElasticsearchConfig.java +++ b/stwzhj-modules/wzhj-data2es/src/main/java/org/dromara/data2es/config/ElasticsearchConfig.java @@ -68,7 +68,7 @@ public class ElasticsearchConfig { RestClientBuilder builder = RestClient.builder(httpHost); // 设置用户名、密码 CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); -// credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(userName, password)); + credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(userName, password)); // 连接延时配置 builder.setRequestConfigCallback(requestConfigBuilder -> { requestConfigBuilder.setConnectTimeout(connectTimeOut); diff --git a/stwzhj-modules/wzhj-data2es/src/main/java/org/dromara/data2es/service/impl/GpsServiceImpl.java b/stwzhj-modules/wzhj-data2es/src/main/java/org/dromara/data2es/service/impl/GpsServiceImpl.java index 472ce1bd..30c4dacb 100644 --- a/stwzhj-modules/wzhj-data2es/src/main/java/org/dromara/data2es/service/impl/GpsServiceImpl.java +++ b/stwzhj-modules/wzhj-data2es/src/main/java/org/dromara/data2es/service/impl/GpsServiceImpl.java @@ -144,6 +144,9 @@ public class GpsServiceImpl implements IGpsService { public R saveData(EsGpsInfoVO2 info) throws ExecutionException, InterruptedException{ //设置地市zzjgdm info = getInfo(info); + if (null == info){ + return R.fail("暂无设备相关信息"); + } IndexRequest indexRequest = buildEsIndexRequest(info); //存es CompletableFuture esFuture = doRequest(info); diff --git a/stwzhj-modules/wzhj-data2es/src/main/resources/logback-plus.xml b/stwzhj-modules/wzhj-data2es/src/main/resources/logback-plus.xml index caaa3455..b144d683 100644 --- a/stwzhj-modules/wzhj-data2es/src/main/resources/logback-plus.xml +++ b/stwzhj-modules/wzhj-data2es/src/main/resources/logback-plus.xml @@ -1,28 +1,49 @@ - - - - - - + + + + + + + + ${log.path}/info.${log.file}.log + + INFO + ACCEPT + DENY + + + ${log.path}/info/info.${log.file}.%d{yyyy-MM-dd}.%i.log.gz + ${MAX_FILE_SIZE} + ${MAX_HISTORY} + - ${console.log.pattern} - utf-8 + %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n - + + + ${log.path}/error.${log.file}.log + + ERROR + + + ${log.path}/error/error.${log.file}.%d{yyyy-MM-dd}.%i.log.gz + ${MAX_FILE_SIZE} + ${MAX_HISTORY} + + + %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + - - - - - - - - + + + + + diff --git a/stwzhj-modules/wzhj-extract/src/main/resources/logback-plus.xml b/stwzhj-modules/wzhj-extract/src/main/resources/logback-plus.xml index caaa3455..6b92c630 100644 --- a/stwzhj-modules/wzhj-extract/src/main/resources/logback-plus.xml +++ b/stwzhj-modules/wzhj-extract/src/main/resources/logback-plus.xml @@ -1,28 +1,49 @@ - - - - - - + + + + + + + + ${log.path}/info.${log.file}.log + + INFO + ACCEPT + DENY + + + ${log.path}/info/info.${log.file}.%d{yyyy-MM-dd}.%i.log.gz + ${MAX_FILE_SIZE} + ${MAX_HISTORY} + - ${console.log.pattern} - utf-8 + %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n - + + + ${log.path}/error.${log.file}.log + + ERROR + + + ${log.path}/error/error.${log.file}.%d{yyyy-MM-dd}.%i.log.gz + ${MAX_FILE_SIZE} + ${MAX_HISTORY} + + + %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + - - - - - - - - + + + + + diff --git a/stwzhj-modules/wzhj-location/src/main/resources/logback-plus.xml b/stwzhj-modules/wzhj-location/src/main/resources/logback-plus.xml index caaa3455..fcb89dcf 100644 --- a/stwzhj-modules/wzhj-location/src/main/resources/logback-plus.xml +++ b/stwzhj-modules/wzhj-location/src/main/resources/logback-plus.xml @@ -1,28 +1,49 @@ - - - - - - + + + + + + + + ${log.path}/info.${log.file}.log + + INFO + ACCEPT + DENY + + + ${log.path}/info/info.${log.file}.%d{yyyy-MM-dd}.%i.log.gz + ${MAX_FILE_SIZE} + ${MAX_HISTORY} + - ${console.log.pattern} - utf-8 + %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n - + + + ${log.path}/error.${log.file}.log + + ERROR + + + ${log.path}/error/error.${log.file}.%d{yyyy-MM-dd}.%i.log.gz + ${MAX_FILE_SIZE} + ${MAX_HISTORY} + + + %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + - - - - - - - - + + + + + diff --git a/stwzhj-modules/wzhj-system/src/main/resources/logback-plus.xml b/stwzhj-modules/wzhj-system/src/main/resources/logback-plus.xml index caaa3455..ef7fff34 100644 --- a/stwzhj-modules/wzhj-system/src/main/resources/logback-plus.xml +++ b/stwzhj-modules/wzhj-system/src/main/resources/logback-plus.xml @@ -1,28 +1,49 @@ - - - - - - + + + + + + + + ${log.path}/info.${log.file}.log + + INFO + ACCEPT + DENY + + + ${log.path}/info/info.${log.file}.%d{yyyy-MM-dd}.%i.log.gz + ${MAX_FILE_SIZE} + ${MAX_HISTORY} + - ${console.log.pattern} - utf-8 + %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n - + + + ${log.path}/error.${log.file}.log + + ERROR + + + ${log.path}/error/error.${log.file}.%d{yyyy-MM-dd}.%i.log.gz + ${MAX_FILE_SIZE} + ${MAX_HISTORY} + + + %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + - - - - - - - - + + + + + diff --git a/stwzhj-modules/wzhj-udp/src/main/java/org/dromara/udp/controller/OriginalUdpReceiver.java b/stwzhj-modules/wzhj-udp/src/main/java/org/dromara/udp/controller/OriginalUdpReceiver.java index f702b648..59601748 100644 --- a/stwzhj-modules/wzhj-udp/src/main/java/org/dromara/udp/controller/OriginalUdpReceiver.java +++ b/stwzhj-modules/wzhj-udp/src/main/java/org/dromara/udp/controller/OriginalUdpReceiver.java @@ -77,7 +77,7 @@ public class OriginalUdpReceiver implements ApplicationListener - - - - - - + + + + + + + + ${log.path}/info.${log.file}.log + + INFO + ACCEPT + DENY + + + ${log.path}/info/info.${log.file}.%d{yyyy-MM-dd}.%i.log.gz + ${MAX_FILE_SIZE} + ${MAX_HISTORY} + - ${console.log.pattern} - utf-8 + %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n - + + + ${log.path}/error.${log.file}.log + + ERROR + + + ${log.path}/error/error.${log.file}.%d{yyyy-MM-dd}.%i.log.gz + ${MAX_FILE_SIZE} + ${MAX_HISTORY} + + + %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + - - - - - - - - + + + + +