Compare commits
3 Commits
a7ff8bb742
...
3d59d61a04
| Author | SHA1 | Date |
|---|---|---|
|
|
3d59d61a04 | |
|
|
d02dca81c0 | |
|
|
bc47f53306 |
|
|
@ -87,6 +87,7 @@ public class ExcelUtil {
|
||||||
ServletOutputStream os = response.getOutputStream();
|
ServletOutputStream os = response.getOutputStream();
|
||||||
exportExcel(list, sheetName, clazz, false, os, null);
|
exportExcel(list, sheetName, clazz, false, os, null);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
|
e.printStackTrace();
|
||||||
throw new RuntimeException("导出Excel异常");
|
throw new RuntimeException("导出Excel异常");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -25,8 +25,8 @@ public class EsGpsInfo implements Serializable {
|
||||||
* 类型
|
* 类型
|
||||||
*/
|
*/
|
||||||
private String deviceType;
|
private String deviceType;
|
||||||
private String latitude;
|
private String lat;
|
||||||
private String longitude;
|
private String lng;
|
||||||
//方向
|
//方向
|
||||||
private String orientation;
|
private String orientation;
|
||||||
//高程
|
//高程
|
||||||
|
|
|
||||||
|
|
@ -47,8 +47,11 @@ public class ConsumerWorker {
|
||||||
|
|
||||||
public static LinkedBlockingDeque basedataDeque = new LinkedBlockingDeque<>(5000);
|
public static LinkedBlockingDeque basedataDeque = new LinkedBlockingDeque<>(5000);
|
||||||
|
|
||||||
|
@DubboReference
|
||||||
|
private RemoteDataToEsService gpsService;
|
||||||
|
|
||||||
@KafkaListener(topics = "#{'${spring.kafka.consumer.topics}'.split(',')}",properties = {
|
|
||||||
|
@KafkaListener(topics = "${spring.kafka.consumer.topics}",properties = {
|
||||||
"auto.offset.reset:latest"})
|
"auto.offset.reset:latest"})
|
||||||
public void consumer(ConsumerRecord<String,Object> record) {
|
public void consumer(ConsumerRecord<String,Object> record) {
|
||||||
Object value = record.value();
|
Object value = record.value();
|
||||||
|
|
@ -69,11 +72,19 @@ public class ConsumerWorker {
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.info("esGpsInfo={}",esGpsInfo);
|
logger.info("esGpsInfo={}",esGpsInfo);
|
||||||
boolean offer = linkedBlockingDeque.offer(esGpsInfo);
|
try {
|
||||||
R response = R.ok(offer);
|
R r = gpsService.saveData(BeanUtil.toBean(esGpsInfo, RemoteGpsInfo.class) );
|
||||||
if(Objects.isNull(response)){
|
if(Objects.isNull(r)){
|
||||||
logger.info("response == null");
|
logger.error("response == null");
|
||||||
|
}else {
|
||||||
|
logger.info(r.getMsg());
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
e.printStackTrace();
|
||||||
}
|
}
|
||||||
|
// boolean offer = linkedBlockingDeque.offer(esGpsInfo);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,24 +1,40 @@
|
||||||
package org.dromara.data2es.config;
|
package org.dromara.data2es.config;
|
||||||
|
|
||||||
import org.dromara.data2es.handler.RedisExpireListener;
|
import org.dromara.data2es.handler.RedisExpireListener;
|
||||||
|
import org.dromara.data2es.handler.RedisExpireRecoveryHandler;
|
||||||
import org.springframework.context.annotation.Bean;
|
import org.springframework.context.annotation.Bean;
|
||||||
import org.springframework.context.annotation.Configuration;
|
import org.springframework.context.annotation.Configuration;
|
||||||
import org.springframework.data.redis.connection.RedisConnectionFactory;
|
import org.springframework.data.redis.connection.RedisConnectionFactory;
|
||||||
import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
|
import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
|
||||||
|
import org.springframework.data.redis.listener.PatternTopic;
|
||||||
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
|
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
|
||||||
|
|
||||||
@Configuration
|
@Configuration
|
||||||
public class RedisListenerConfig {
|
public class RedisListenerConfig {
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
RedisMessageListenerContainer listenerContainer(RedisConnectionFactory connectionFactory) {
|
RedisMessageListenerContainer listenerContainer(
|
||||||
RedisMessageListenerContainer listenerContainer = new RedisMessageListenerContainer();
|
RedisConnectionFactory connectionFactory,
|
||||||
listenerContainer.setConnectionFactory(connectionFactory);
|
RedisExpireRecoveryHandler recoveryHandler) {
|
||||||
return listenerContainer;
|
|
||||||
|
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
|
||||||
|
container.setConnectionFactory(connectionFactory);
|
||||||
|
|
||||||
|
// 添加连接监听器用于故障转移恢复
|
||||||
|
container.addMessageListener(recoveryHandler, new PatternTopic("__keyspace@*__:expired"));
|
||||||
|
return container;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
KeyExpirationEventMessageListener redisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
|
KeyExpirationEventMessageListener redisKeyExpirationListener(
|
||||||
return new RedisExpireListener(listenerContainer);
|
RedisMessageListenerContainer listenerContainer,
|
||||||
|
RedisExpireRecoveryHandler recoveryHandler) {
|
||||||
|
|
||||||
|
return new RedisExpireListener(listenerContainer, recoveryHandler);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
RedisExpireRecoveryHandler redisExpireRecoveryHandler() {
|
||||||
|
return new RedisExpireRecoveryHandler();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -8,88 +8,156 @@ import org.dromara.common.core.utils.RedisConstants;
|
||||||
import org.dromara.common.redis.utils.RedisUtils;
|
import org.dromara.common.redis.utils.RedisUtils;
|
||||||
import org.dromara.data2es.controller.DataToEsController;
|
import org.dromara.data2es.controller.DataToEsController;
|
||||||
import org.dromara.data2es.domain.EsGpsInfoVO2;
|
import org.dromara.data2es.domain.EsGpsInfoVO2;
|
||||||
|
import org.redisson.Redisson;
|
||||||
import org.redisson.api.RLock;
|
import org.redisson.api.RLock;
|
||||||
|
import org.redisson.api.RTopic;
|
||||||
import org.redisson.api.RedissonClient;
|
import org.redisson.api.RedissonClient;
|
||||||
import org.slf4j.Logger;
|
import org.redisson.connection.ConnectionListener;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.redisson.connection.ConnectionManager;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.beans.factory.annotation.Qualifier;
|
|
||||||
import org.springframework.data.redis.connection.Message;
|
import org.springframework.data.redis.connection.Message;
|
||||||
import org.springframework.data.redis.connection.MessageListener;
|
|
||||||
import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
|
import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
|
||||||
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
|
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
import java.util.Date;
|
import javax.annotation.PostConstruct;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
/**
|
|
||||||
* <p>description: </p>
|
|
||||||
*
|
|
||||||
* @author chenle
|
|
||||||
* @date 2021-11-08 16:40
|
|
||||||
*/
|
|
||||||
@Component
|
@Component
|
||||||
@Slf4j
|
@Slf4j
|
||||||
public class RedisExpireListener extends KeyExpirationEventMessageListener {
|
public class RedisExpireListener extends KeyExpirationEventMessageListener {
|
||||||
|
|
||||||
|
private final RedisExpireRecoveryHandler recoveryHandler;
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
DataToEsController dataToEsController;
|
DataToEsController dataToEsController;
|
||||||
|
private volatile boolean active = true;
|
||||||
|
|
||||||
Logger logger = LoggerFactory.getLogger(RedisExpireListener.class);
|
public RedisExpireListener(
|
||||||
|
RedisMessageListenerContainer listenerContainer,
|
||||||
|
RedisExpireRecoveryHandler recoveryHandler) {
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Creates new {@link MessageListener} for {@code __keyevent@*__:expired} messages.
|
|
||||||
*
|
|
||||||
* @param listenerContainer must not be {@literal null}.
|
|
||||||
*/
|
|
||||||
public RedisExpireListener(RedisMessageListenerContainer listenerContainer) {
|
|
||||||
super(listenerContainer);
|
super(listenerContainer);
|
||||||
|
this.recoveryHandler = recoveryHandler;
|
||||||
|
recoveryHandler.registerListener(this);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void init() {
|
||||||
|
try {
|
||||||
|
super.init();
|
||||||
|
log.info("Redis过期监听器初始化成功");
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("监听器初始化失败", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void reconnect() {
|
||||||
|
if (!active) return;
|
||||||
|
|
||||||
|
try {
|
||||||
|
log.info("尝试重新注册过期事件监听器...");
|
||||||
|
// 停止当前监听
|
||||||
|
super.destroy();
|
||||||
|
// 重新初始化
|
||||||
|
super.init();
|
||||||
|
log.info("过期事件监听器重新注册成功");
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("重新注册监听器失败", e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onMessage(Message message, byte[] pattern) {
|
public void onMessage(Message message, byte[] pattern) {
|
||||||
|
if (!active) return;
|
||||||
|
|
||||||
String expireKey = message.toString();
|
String expireKey = message.toString();
|
||||||
if(StringUtils.isNotEmpty(expireKey) &&
|
log.info("过期的Key={}", expireKey);
|
||||||
expireKey.startsWith(RedisConstants.ONLINE_USERS_TEN)){
|
|
||||||
|
if (StringUtils.isNotEmpty(expireKey) &&
|
||||||
|
expireKey.startsWith(RedisConstants.ONLINE_USERS_TEN)) {
|
||||||
|
|
||||||
|
log.info("在线定位过期的Key={}", expireKey);
|
||||||
handleExpiredEvent(expireKey);
|
handleExpiredEvent(expireKey);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void handleExpiredEvent(String expiredKey) {
|
private void handleExpiredEvent(String expiredKey) {
|
||||||
RedissonClient redisson = RedisUtils.getClient();
|
RedissonClient redisson = RedisUtils.getClient();
|
||||||
RLock lock = redisson.getLock("LOCK:" + expiredKey);
|
RLock lock = redisson.getLock("LOCK:" + expiredKey);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
if (lock.tryLock(0, 30, TimeUnit.SECONDS)) {
|
if (lock.tryLock(0, 30, TimeUnit.SECONDS)) {
|
||||||
// 实际业务逻辑
|
// 实际业务逻辑
|
||||||
String[] split = expiredKey.split(":");
|
String[] split = expiredKey.split(":");
|
||||||
|
|
||||||
String deviceType = split[2];
|
String deviceType = split[2];
|
||||||
String deviceCode = split[3];
|
String deviceCode = split[3];
|
||||||
if ("5".equals(deviceType) || "9".equals(deviceType) || "8".equals(deviceType) || "7".equals(deviceType)){
|
|
||||||
|
if ("5".equals(deviceType) || "9".equals(deviceType) ||
|
||||||
|
"8".equals(deviceType) || "7".equals(deviceType)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
log.error("redis key expired:key={}",expiredKey);
|
|
||||||
JSONObject object = RedisUtils.getBucket(RedisConstants.ONLINE_USERS + deviceType+":"+deviceCode);
|
log.info("处理过期Key: {}", expiredKey);
|
||||||
|
JSONObject object = RedisUtils.getBucket(RedisConstants.ONLINE_USERS + deviceType + ":" + deviceCode);
|
||||||
|
|
||||||
if (Objects.isNull(object)) {
|
if (Objects.isNull(object)) {
|
||||||
log.info("redis key={},Object=null,deviceType={},deviceCode={}", expiredKey,deviceType,deviceCode);
|
log.info("redis key={},Object=null,deviceType={},deviceCode={}",
|
||||||
|
expiredKey, deviceType, deviceCode);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
EsGpsInfoVO2 gpsInfo = BeanUtil.toBean(object, EsGpsInfoVO2.class);
|
EsGpsInfoVO2 gpsInfo = BeanUtil.toBean(object, EsGpsInfoVO2.class);
|
||||||
gpsInfo.setOnline(0);
|
gpsInfo.setOnline(0);
|
||||||
dataToEsController.saveGpsInfo(gpsInfo);
|
dataToEsController.saveGpsInfo(gpsInfo);
|
||||||
log.info("redis key expired:key={}", expiredKey);
|
log.info("处理完成: key={}", expiredKey);
|
||||||
}
|
}
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
e.printStackTrace();
|
Thread.currentThread().interrupt();
|
||||||
|
log.error("处理过期事件被中断", e);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("处理过期事件异常", e);
|
||||||
} finally {
|
} finally {
|
||||||
// 仅在当前线程持有锁时释放
|
|
||||||
if (lock.isHeldByCurrentThread()) {
|
if (lock.isHeldByCurrentThread()) {
|
||||||
lock.unlock();
|
lock.unlock();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void destroy() {
|
||||||
|
active = false;
|
||||||
|
try {
|
||||||
|
super.destroy();
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
log.info("Redis过期监听器已停止");
|
||||||
|
}
|
||||||
|
|
||||||
|
// 添加连接状态监听(使用Redisson事件总线)
|
||||||
|
@PostConstruct
|
||||||
|
public void addSentinelConnectionListener() {
|
||||||
|
try {
|
||||||
|
RedissonClient redisson = RedisUtils.getClient();
|
||||||
|
|
||||||
|
// 订阅Redisson连接事件
|
||||||
|
RTopic connectionEvents = redisson.getTopic("__redisson_connection_event");
|
||||||
|
connectionEvents.addListener(String.class, (channel, msg) -> {
|
||||||
|
if ("CONNECTED".equals(msg)) {
|
||||||
|
log.info("Redis连接已建立: {}", msg);
|
||||||
|
// 标记需要恢复监听
|
||||||
|
recoveryHandler.markReconnected();
|
||||||
|
} else if ("DISCONNECTED".equals(msg)) {
|
||||||
|
log.warn("Redis连接断开: {}", msg);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
log.info("已注册Redisson连接事件监听器");
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.warn("无法添加Redisson连接事件监听器", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,36 @@
|
||||||
|
package org.dromara.data2es.handler;
|
||||||
|
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.springframework.data.redis.connection.Message;
|
||||||
|
import org.springframework.data.redis.connection.MessageListener;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
|
@Component
|
||||||
|
public class RedisExpireRecoveryHandler implements MessageListener {
|
||||||
|
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(RedisExpireRecoveryHandler.class);
|
||||||
|
|
||||||
|
private final AtomicBoolean reconnected = new AtomicBoolean(false);
|
||||||
|
private final AtomicReference<RedisExpireListener> listenerRef = new AtomicReference<>();
|
||||||
|
|
||||||
|
public void registerListener(RedisExpireListener listener) {
|
||||||
|
this.listenerRef.set(listener);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onMessage(Message message, byte[] pattern) {
|
||||||
|
// 检测到任何事件时,检查是否需要恢复监听
|
||||||
|
if (reconnected.compareAndSet(true, false) && listenerRef.get() != null) {
|
||||||
|
log.warn("检测到Redis事件,尝试重新注册主监听器...");
|
||||||
|
listenerRef.get().reconnect();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void markReconnected() {
|
||||||
|
reconnected.set(true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -151,7 +151,7 @@ public class RequestHandler {
|
||||||
|
|
||||||
long betweenS = DateUtil.between(gpsTime, new Date(), DateUnit.SECOND);
|
long betweenS = DateUtil.between(gpsTime, new Date(), DateUnit.SECOND);
|
||||||
//过期时间应该是20分钟减去设备定位时间与当前时间的差值,比如设备已经比当前时间晚15分钟了,那么设备的过期时间应该只剩5分钟了
|
//过期时间应该是20分钟减去设备定位时间与当前时间的差值,比如设备已经比当前时间晚15分钟了,那么设备的过期时间应该只剩5分钟了
|
||||||
long onlineTime = 60*10 - betweenS;
|
long onlineTime = 60*30 - betweenS;
|
||||||
|
|
||||||
String deviceCode = esGpsInfoVo2.getDeviceCode();
|
String deviceCode = esGpsInfoVo2.getDeviceCode();
|
||||||
String deviceType = esGpsInfoVo2.getDeviceType();
|
String deviceType = esGpsInfoVo2.getDeviceType();
|
||||||
|
|
|
||||||
|
|
@ -8,6 +8,7 @@ import lombok.RequiredArgsConstructor;
|
||||||
import jakarta.servlet.http.HttpServletResponse;
|
import jakarta.servlet.http.HttpServletResponse;
|
||||||
import jakarta.validation.constraints.*;
|
import jakarta.validation.constraints.*;
|
||||||
import cn.dev33.satoken.annotation.SaCheckPermission;
|
import cn.dev33.satoken.annotation.SaCheckPermission;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.dromara.common.excel.core.ExcelResult;
|
import org.dromara.common.excel.core.ExcelResult;
|
||||||
import org.dromara.system.domain.vo.SysUserImportVo;
|
import org.dromara.system.domain.vo.SysUserImportVo;
|
||||||
import org.dromara.system.domain.vo.TDeviceExportVo;
|
import org.dromara.system.domain.vo.TDeviceExportVo;
|
||||||
|
|
@ -44,6 +45,7 @@ import org.springframework.web.multipart.MultipartFile;
|
||||||
@RequiredArgsConstructor
|
@RequiredArgsConstructor
|
||||||
@RestController
|
@RestController
|
||||||
@RequestMapping("/device")
|
@RequestMapping("/device")
|
||||||
|
@Slf4j
|
||||||
public class TDeviceController extends BaseController {
|
public class TDeviceController extends BaseController {
|
||||||
|
|
||||||
private final ITDeviceService tDeviceService;
|
private final ITDeviceService tDeviceService;
|
||||||
|
|
@ -76,6 +78,7 @@ public class TDeviceController extends BaseController {
|
||||||
@PostMapping("/export")
|
@PostMapping("/export")
|
||||||
public void export(TDeviceBo bo, HttpServletResponse response) {
|
public void export(TDeviceBo bo, HttpServletResponse response) {
|
||||||
List<TDeviceExportVo> list = tDeviceService.selectDeviceExportList(bo);
|
List<TDeviceExportVo> list = tDeviceService.selectDeviceExportList(bo);
|
||||||
|
log.info("进入了导出方法");
|
||||||
ExcelUtil.exportExcel(list, "device", TDeviceExportVo.class, response);
|
ExcelUtil.exportExcel(list, "device", TDeviceExportVo.class, response);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -76,7 +76,6 @@ public class TDeviceServiceImpl implements ITDeviceService {
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public TableDataInfo<TDeviceVo> queryPageList(TDeviceBo bo, PageQuery pageQuery) {
|
public TableDataInfo<TDeviceVo> queryPageList(TDeviceBo bo, PageQuery pageQuery) {
|
||||||
bo.setValid(1);
|
|
||||||
LambdaQueryWrapper<TDevice> lqw = buildQueryWrapper(bo);
|
LambdaQueryWrapper<TDevice> lqw = buildQueryWrapper(bo);
|
||||||
Page<TDeviceVo> result = baseMapper.selectPageDevicetList(pageQuery.build(), lqw);
|
Page<TDeviceVo> result = baseMapper.selectPageDevicetList(pageQuery.build(), lqw);
|
||||||
List<TDeviceVo> list = result.getRecords();
|
List<TDeviceVo> list = result.getRecords();
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue