cosumer和websocket修改

ds-xuancheng
luyya 2025-06-27 11:05:53 +08:00
parent 86bf08da8c
commit 7553cd9d56
3 changed files with 79 additions and 14 deletions

View File

@ -52,21 +52,33 @@ public class ConsumerWorker {
"auto.offset.reset:latest"})
public void consumer(ConsumerRecord<String,Object> record) {
Object value = record.value();
String topic = record.topic();
EsGpsInfo esGpsInfo = JSONUtil.toBean((String) value, EsGpsInfo.class);
Date gpsTime = esGpsInfo.getGpsTime();
// log.info("value={}",value);
if(Objects.isNull(gpsTime)){
log.error("gpsTime == null,deviceCode={}",esGpsInfo.getDeviceCode());
return;
}
String deviceType = esGpsInfo.getDeviceType();
if(StringUtils.isBlank(deviceType)){
log.error("deviceType is null, deviceCode={}",esGpsInfo.getDeviceCode());
return;
}
if(DateUtil.between(gpsTime,new Date(), DateUnit.MINUTE) < 30){
esGpsInfo.setOnline(1);
}
if ("jysb_dwxx".equals(topic)){ //定位信息
// logger.info("offset={},topic={},value={}", record.offset(), topic,value);
luanrequest(value);
} else if ("jysb_sbxx".equals(topic)) { //基础信息
// logger.info("offset={},topic={},value={}", record.offset(), topic,value);
baseDataRequest(value);
logger.info("esGpsInfo={}",esGpsInfo);
boolean offer = linkedBlockingDeque.offer(esGpsInfo);
R response = R.ok(offer);
if(Objects.isNull(response)){
logger.info("response == null");
}
}
private void luanrequest(Object value) {
RemoteGpsInfo esGpsInfo;
JSONObject jsonObject;

View File

@ -40,24 +40,23 @@ public class DataInsertBatchHandler implements CommandLineRunner {
public void run(String... args) throws Exception {
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
LinkedBlockingDeque linkedBlockingDeque = ConsumerWorker.linkedBlockingDeque; //定位信息队列
LinkedBlockingDeque baseDataDeque = ConsumerWorker.basedataDeque; //基础信息队列
// LinkedBlockingDeque baseDataDeque = ConsumerWorker.basedataDeque; //基础信息队列
singleThreadExecutor.execute(new Runnable() {
@Override
public void run() {
while (true) {
try {
List<RemoteGpsInfo> list = new ArrayList<>();
List<RemoteDeviceBo> bases = new ArrayList<>();
// List<RemoteDeviceBo> bases = new ArrayList<>();
Queues.drain(linkedBlockingDeque, list, 200, 5, TimeUnit.SECONDS);
Queues.drain(baseDataDeque, bases, 100, 5, TimeUnit.SECONDS);
// Queues.drain(baseDataDeque, bases, 100, 5, TimeUnit.SECONDS);
log.info("batch size={}", list.size());
log.info("basedata size={}", bases.size());
if(CollectionUtil.isNotEmpty(list)) {
gpsService.saveDataBatch(list);
}
if(CollectionUtil.isNotEmpty(bases)) {
/*if(CollectionUtil.isNotEmpty(bases)) {
deviceService.batchSaveDevice(bases);
}
}*/
} catch (Exception e) {
log.error("缓存队列批量消费异常:{}", e.getMessage());
}

View File

@ -0,0 +1,54 @@
package org.dromara.webscoket.exception;
/**
* <p>description: </p>
*
* @author chenle
* @date 2021-06-07 10:56
*/
public class MyBusinessException extends RuntimeException {
private static final long serialVersionUID = 1L;
private String msg;
private int code = 500;
public MyBusinessException(String msg) {
super(msg);
this.msg = msg;
}
public MyBusinessException(String msg, Throwable e) {
super(msg, e);
this.msg = msg;
}
public MyBusinessException(String msg, int code) {
super(msg);
this.msg = msg;
this.code = code;
}
public MyBusinessException(String msg, int code, Throwable e) {
super(msg, e);
this.msg = msg;
this.code = code;
}
public String getMsg() {
return msg;
}
public void setMsg(String msg) {
this.msg = msg;
}
public int getCode() {
return code;
}
public void setCode(int code) {
this.code = code;
}
}