diff --git a/stwzhj-modules/wzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/ConsumerWorker.java b/stwzhj-modules/wzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/ConsumerWorker.java index b906fcd5..dcd05131 100644 --- a/stwzhj-modules/wzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/ConsumerWorker.java +++ b/stwzhj-modules/wzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/ConsumerWorker.java @@ -52,21 +52,33 @@ public class ConsumerWorker { "auto.offset.reset:latest"}) public void consumer(ConsumerRecord record) { Object value = record.value(); - String topic = record.topic(); + EsGpsInfo esGpsInfo = JSONUtil.toBean((String) value, EsGpsInfo.class); + Date gpsTime = esGpsInfo.getGpsTime(); +// log.info("value={}",value); + if(Objects.isNull(gpsTime)){ + log.error("gpsTime == null,deviceCode={}",esGpsInfo.getDeviceCode()); + return; + } + String deviceType = esGpsInfo.getDeviceType(); + if(StringUtils.isBlank(deviceType)){ + log.error("deviceType is null, deviceCode={}",esGpsInfo.getDeviceCode()); + return; + } + if(DateUtil.between(gpsTime,new Date(), DateUnit.MINUTE) < 30){ + esGpsInfo.setOnline(1); + } - if ("jysb_dwxx".equals(topic)){ //定位信息 -// logger.info("offset={},topic={},value={}", record.offset(), topic,value); - luanrequest(value); - } else if ("jysb_sbxx".equals(topic)) { //基础信息 -// logger.info("offset={},topic={},value={}", record.offset(), topic,value); - baseDataRequest(value); + logger.info("esGpsInfo={}",esGpsInfo); + boolean offer = linkedBlockingDeque.offer(esGpsInfo); + R response = R.ok(offer); + if(Objects.isNull(response)){ + logger.info("response == null"); } } - private void luanrequest(Object value) { RemoteGpsInfo esGpsInfo; JSONObject jsonObject; diff --git a/stwzhj-modules/wzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/DataInsertBatchHandler.java b/stwzhj-modules/wzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/DataInsertBatchHandler.java index e43afdd5..d168b763 100644 --- a/stwzhj-modules/wzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/DataInsertBatchHandler.java +++ b/stwzhj-modules/wzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/DataInsertBatchHandler.java @@ -40,24 +40,23 @@ public class DataInsertBatchHandler implements CommandLineRunner { public void run(String... args) throws Exception { ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor(); LinkedBlockingDeque linkedBlockingDeque = ConsumerWorker.linkedBlockingDeque; //定位信息队列 - LinkedBlockingDeque baseDataDeque = ConsumerWorker.basedataDeque; //基础信息队列 +// LinkedBlockingDeque baseDataDeque = ConsumerWorker.basedataDeque; //基础信息队列 singleThreadExecutor.execute(new Runnable() { @Override public void run() { while (true) { try { List list = new ArrayList<>(); - List bases = new ArrayList<>(); +// List bases = new ArrayList<>(); Queues.drain(linkedBlockingDeque, list, 200, 5, TimeUnit.SECONDS); - Queues.drain(baseDataDeque, bases, 100, 5, TimeUnit.SECONDS); +// Queues.drain(baseDataDeque, bases, 100, 5, TimeUnit.SECONDS); log.info("batch size={}", list.size()); - log.info("basedata size={}", bases.size()); if(CollectionUtil.isNotEmpty(list)) { gpsService.saveDataBatch(list); } - if(CollectionUtil.isNotEmpty(bases)) { + /*if(CollectionUtil.isNotEmpty(bases)) { deviceService.batchSaveDevice(bases); - } + }*/ } catch (Exception e) { log.error("缓存队列批量消费异常:{}", e.getMessage()); } diff --git a/stwzhj-modules/wzhj-websocket/src/main/java/org/dromara/webscoket/exception/MyBusinessException.java b/stwzhj-modules/wzhj-websocket/src/main/java/org/dromara/webscoket/exception/MyBusinessException.java new file mode 100644 index 00000000..010009bc --- /dev/null +++ b/stwzhj-modules/wzhj-websocket/src/main/java/org/dromara/webscoket/exception/MyBusinessException.java @@ -0,0 +1,54 @@ +package org.dromara.webscoket.exception; + +/** + *

description:

+ * + * @author chenle + * @date 2021-06-07 10:56 + */ +public class MyBusinessException extends RuntimeException { + private static final long serialVersionUID = 1L; + + private String msg; + private int code = 500; + + public MyBusinessException(String msg) { + super(msg); + this.msg = msg; + } + + public MyBusinessException(String msg, Throwable e) { + super(msg, e); + this.msg = msg; + } + + public MyBusinessException(String msg, int code) { + super(msg); + this.msg = msg; + this.code = code; + } + + public MyBusinessException(String msg, int code, Throwable e) { + super(msg, e); + this.msg = msg; + this.code = code; + } + + public String getMsg() { + return msg; + } + + public void setMsg(String msg) { + this.msg = msg; + } + + public int getCode() { + return code; + } + + public void setCode(int code) { + this.code = code; + } + + +}