From 3fd5bc2f17677d37649a602409f4fe2403126c4b Mon Sep 17 00:00:00 2001 From: luyya Date: Wed, 16 Jul 2025 20:02:55 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=A3=E5=9F=8E=E5=9F=B9=E8=AE=AD=E4=BC=9ABU?= =?UTF-8?q?G=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../data2es/config/RedisListenerConfig.java | 28 +++- .../data2es/handler/RedisExpireListener.java | 131 +++++++++++++----- .../handler/RedisExpireRecoveryHandler.java | 36 +++++ .../system/domain/vo/TDeviceExportVo.java | 5 +- .../system/domain/vo/TDeviceImportVo.java | 5 +- .../service/impl/TDeviceServiceImpl.java | 6 +- .../resources/mapper/system/TDeviceMapper.xml | 2 +- 7 files changed, 170 insertions(+), 43 deletions(-) create mode 100644 stwzhj-modules/wzhj-data2es/src/main/java/org/dromara/data2es/handler/RedisExpireRecoveryHandler.java diff --git a/stwzhj-modules/wzhj-data2es/src/main/java/org/dromara/data2es/config/RedisListenerConfig.java b/stwzhj-modules/wzhj-data2es/src/main/java/org/dromara/data2es/config/RedisListenerConfig.java index 586af34e..de22c389 100644 --- a/stwzhj-modules/wzhj-data2es/src/main/java/org/dromara/data2es/config/RedisListenerConfig.java +++ b/stwzhj-modules/wzhj-data2es/src/main/java/org/dromara/data2es/config/RedisListenerConfig.java @@ -1,24 +1,40 @@ package org.dromara.data2es.config; import org.dromara.data2es.handler.RedisExpireListener; +import org.dromara.data2es.handler.RedisExpireRecoveryHandler; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.listener.KeyExpirationEventMessageListener; +import org.springframework.data.redis.listener.PatternTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; @Configuration public class RedisListenerConfig { @Bean - RedisMessageListenerContainer listenerContainer(RedisConnectionFactory connectionFactory) { - RedisMessageListenerContainer listenerContainer = new RedisMessageListenerContainer(); - listenerContainer.setConnectionFactory(connectionFactory); - return listenerContainer; + RedisMessageListenerContainer listenerContainer( + RedisConnectionFactory connectionFactory, + RedisExpireRecoveryHandler recoveryHandler) { + + RedisMessageListenerContainer container = new RedisMessageListenerContainer(); + container.setConnectionFactory(connectionFactory); + + // 添加连接监听器用于故障转移恢复 + container.addMessageListener(recoveryHandler, new PatternTopic("__keyspace@*__:expired")); + return container; } @Bean - KeyExpirationEventMessageListener redisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) { - return new RedisExpireListener(listenerContainer); + KeyExpirationEventMessageListener redisKeyExpirationListener( + RedisMessageListenerContainer listenerContainer, + RedisExpireRecoveryHandler recoveryHandler) { + + return new RedisExpireListener(listenerContainer, recoveryHandler); + } + + @Bean + RedisExpireRecoveryHandler redisExpireRecoveryHandler() { + return new RedisExpireRecoveryHandler(); } } diff --git a/stwzhj-modules/wzhj-data2es/src/main/java/org/dromara/data2es/handler/RedisExpireListener.java b/stwzhj-modules/wzhj-data2es/src/main/java/org/dromara/data2es/handler/RedisExpireListener.java index b1ba8fdc..4edebeaa 100644 --- a/stwzhj-modules/wzhj-data2es/src/main/java/org/dromara/data2es/handler/RedisExpireListener.java +++ b/stwzhj-modules/wzhj-data2es/src/main/java/org/dromara/data2es/handler/RedisExpireListener.java @@ -1,95 +1,164 @@ package org.dromara.data2es.handler; + import cn.hutool.core.bean.BeanUtil; import cn.hutool.json.JSONObject; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang.StringUtils; import org.dromara.common.core.utils.RedisConstants; +import org.dromara.common.core.utils.StringUtils; import org.dromara.common.redis.utils.RedisUtils; import org.dromara.data2es.controller.DataToEsController; import org.dromara.data2es.domain.EsGpsInfoVO2; +import org.redisson.Redisson; import org.redisson.api.RLock; +import org.redisson.api.RTopic; import org.redisson.api.RedissonClient; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.redisson.connection.ConnectionListener; +import org.redisson.connection.ConnectionManager; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.data.redis.connection.Message; -import org.springframework.data.redis.connection.MessageListener; import org.springframework.data.redis.listener.KeyExpirationEventMessageListener; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.stereotype.Component; -import java.util.Date; +import javax.annotation.PostConstruct; +import java.net.InetSocketAddress; import java.util.Objects; import java.util.concurrent.TimeUnit; -/** - *

description:

- * - * @author chenle - * @date 2021-11-08 16:40 - */ @Component @Slf4j public class RedisExpireListener extends KeyExpirationEventMessageListener { + private final RedisExpireRecoveryHandler recoveryHandler; + @Autowired DataToEsController dataToEsController; + private volatile boolean active = true; + public RedisExpireListener( + RedisMessageListenerContainer listenerContainer, + RedisExpireRecoveryHandler recoveryHandler) { - /** - * Creates new {@link MessageListener} for {@code __keyevent@*__:expired} messages. - * - * @param listenerContainer must not be {@literal null}. - */ - public RedisExpireListener(RedisMessageListenerContainer listenerContainer) { super(listenerContainer); + this.recoveryHandler = recoveryHandler; + recoveryHandler.registerListener(this); + } + + @Override + public void init() { + try { + super.init(); + log.info("Redis过期监听器初始化成功"); + } catch (Exception e) { + log.error("监听器初始化失败", e); + } + } + + public void reconnect() { + if (!active) return; + + try { + log.info("尝试重新注册过期事件监听器..."); + // 停止当前监听 + super.destroy(); + // 重新初始化 + super.init(); + log.info("过期事件监听器重新注册成功"); + } catch (Exception e) { + log.error("重新注册监听器失败", e); + } } @Override public void onMessage(Message message, byte[] pattern) { + if (!active) return; + String expireKey = message.toString(); - log.info("过期的Key={}",expireKey); - if(StringUtils.isNotEmpty(expireKey) && - expireKey.startsWith(RedisConstants.ONLINE_USERS_TEN)){ - log.info("在线定位过期的Key={}",expireKey); + log.info("过期的Key={}", expireKey); + + if (StringUtils.isNotEmpty(expireKey) && + expireKey.startsWith(RedisConstants.ONLINE_USERS_TEN)) { + + log.info("在线定位过期的Key={}", expireKey); handleExpiredEvent(expireKey); } } - private void handleExpiredEvent(String expiredKey) { + private void handleExpiredEvent(String expiredKey) { RedissonClient redisson = RedisUtils.getClient(); RLock lock = redisson.getLock("LOCK:" + expiredKey); + try { if (lock.tryLock(0, 30, TimeUnit.SECONDS)) { // 实际业务逻辑 String[] split = expiredKey.split(":"); - String deviceType = split[2]; String deviceCode = split[3]; - if ("5".equals(deviceType) || "9".equals(deviceType) || "8".equals(deviceType) || "7".equals(deviceType)){ + + if ("5".equals(deviceType) || "9".equals(deviceType) || + "8".equals(deviceType) || "7".equals(deviceType)) { return; } - log.error("redis key expired:key={}",expiredKey); - JSONObject object = RedisUtils.getBucket(RedisConstants.ONLINE_USERS + deviceType+":"+deviceCode); + + log.info("处理过期Key: {}", expiredKey); + JSONObject object = RedisUtils.getBucket(RedisConstants.ONLINE_USERS + deviceType + ":" + deviceCode); + if (Objects.isNull(object)) { - log.info("redis key={},Object=null,deviceType={},deviceCode={}", expiredKey,deviceType,deviceCode); + log.info("redis key={},Object=null,deviceType={},deviceCode={}", + expiredKey, deviceType, deviceCode); return; } + EsGpsInfoVO2 gpsInfo = BeanUtil.toBean(object, EsGpsInfoVO2.class); gpsInfo.setOnline(0); dataToEsController.saveGpsInfo(gpsInfo); - log.info("redis key expired:key={}", expiredKey); + log.info("处理完成: key={}", expiredKey); } } catch (InterruptedException e) { - e.printStackTrace(); + Thread.currentThread().interrupt(); + log.error("处理过期事件被中断", e); + } catch (Exception e) { + log.error("处理过期事件异常", e); } finally { - // 仅在当前线程持有锁时释放 if (lock.isHeldByCurrentThread()) { lock.unlock(); } } } + @Override + public void destroy() { + active = false; + try { + super.destroy(); + } catch (Exception e) { + throw new RuntimeException(e); + } + log.info("Redis过期监听器已停止"); + } + + // 添加连接状态监听(使用Redisson事件总线) + @PostConstruct + public void addSentinelConnectionListener() { + try { + RedissonClient redisson = RedisUtils.getClient(); + + // 订阅Redisson连接事件 + RTopic connectionEvents = redisson.getTopic("__redisson_connection_event"); + connectionEvents.addListener(String.class, (channel, msg) -> { + if ("CONNECTED".equals(msg)) { + log.info("Redis连接已建立: {}", msg); + // 标记需要恢复监听 + recoveryHandler.markReconnected(); + } else if ("DISCONNECTED".equals(msg)) { + log.warn("Redis连接断开: {}", msg); + } + }); + + log.info("已注册Redisson连接事件监听器"); + } catch (Exception e) { + log.warn("无法添加Redisson连接事件监听器", e); + } + } } diff --git a/stwzhj-modules/wzhj-data2es/src/main/java/org/dromara/data2es/handler/RedisExpireRecoveryHandler.java b/stwzhj-modules/wzhj-data2es/src/main/java/org/dromara/data2es/handler/RedisExpireRecoveryHandler.java new file mode 100644 index 00000000..81bc87a0 --- /dev/null +++ b/stwzhj-modules/wzhj-data2es/src/main/java/org/dromara/data2es/handler/RedisExpireRecoveryHandler.java @@ -0,0 +1,36 @@ +package org.dromara.data2es.handler; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.data.redis.connection.Message; +import org.springframework.data.redis.connection.MessageListener; +import org.springframework.stereotype.Component; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +@Component +public class RedisExpireRecoveryHandler implements MessageListener { + + private static final Logger log = LoggerFactory.getLogger(RedisExpireRecoveryHandler.class); + + private final AtomicBoolean reconnected = new AtomicBoolean(false); + private final AtomicReference listenerRef = new AtomicReference<>(); + + public void registerListener(RedisExpireListener listener) { + this.listenerRef.set(listener); + } + + @Override + public void onMessage(Message message, byte[] pattern) { + // 检测到任何事件时,检查是否需要恢复监听 + if (reconnected.compareAndSet(true, false) && listenerRef.get() != null) { + log.warn("检测到Redis事件,尝试重新注册主监听器..."); + listenerRef.get().reconnect(); + } + } + + public void markReconnected() { + reconnected.set(true); + } +} diff --git a/stwzhj-modules/wzhj-system/src/main/java/org/dromara/system/domain/vo/TDeviceExportVo.java b/stwzhj-modules/wzhj-system/src/main/java/org/dromara/system/domain/vo/TDeviceExportVo.java index a611044c..22793e2c 100644 --- a/stwzhj-modules/wzhj-system/src/main/java/org/dromara/system/domain/vo/TDeviceExportVo.java +++ b/stwzhj-modules/wzhj-system/src/main/java/org/dromara/system/domain/vo/TDeviceExportVo.java @@ -16,7 +16,7 @@ public class TDeviceExportVo implements Serializable { @Serial private static final long serialVersionUID = 1L; - @ExcelProperty(value = "设备编号") + @ExcelProperty(value = "设备编码") private String deviceCode; /** @@ -65,6 +65,9 @@ public class TDeviceExportVo implements Serializable { @ExcelProperty(value = "证件号码", converter = ExcelDictConvert.class) private String cardNum; + @ExcelProperty(value = "设备名称") + private String deviceName; + /** * 0无效,1有效 */ diff --git a/stwzhj-modules/wzhj-system/src/main/java/org/dromara/system/domain/vo/TDeviceImportVo.java b/stwzhj-modules/wzhj-system/src/main/java/org/dromara/system/domain/vo/TDeviceImportVo.java index 69b1ecc8..70fe44fa 100644 --- a/stwzhj-modules/wzhj-system/src/main/java/org/dromara/system/domain/vo/TDeviceImportVo.java +++ b/stwzhj-modules/wzhj-system/src/main/java/org/dromara/system/domain/vo/TDeviceImportVo.java @@ -16,7 +16,7 @@ public class TDeviceImportVo implements Serializable { @Serial private static final long serialVersionUID = 1L; - @ExcelProperty(value = "设备编号") + @ExcelProperty(value = "设备编码") private String deviceCode; /** @@ -65,6 +65,9 @@ public class TDeviceImportVo implements Serializable { @ExcelProperty(value = "证件号码") private String cardNum; + @ExcelProperty(value = "设备名称") + private String deviceName; + /** * 0无效,1有效 */ diff --git a/stwzhj-modules/wzhj-system/src/main/java/org/dromara/system/service/impl/TDeviceServiceImpl.java b/stwzhj-modules/wzhj-system/src/main/java/org/dromara/system/service/impl/TDeviceServiceImpl.java index a3ee4f3b..cffe24c0 100644 --- a/stwzhj-modules/wzhj-system/src/main/java/org/dromara/system/service/impl/TDeviceServiceImpl.java +++ b/stwzhj-modules/wzhj-system/src/main/java/org/dromara/system/service/impl/TDeviceServiceImpl.java @@ -81,11 +81,11 @@ public class TDeviceServiceImpl implements ITDeviceService { */ @Override public TableDataInfo queryPageList(TDeviceBo bo, PageQuery pageQuery) { - bo.setValid(1); +// bo.setValid(1); LambdaQueryWrapper lqw = buildQueryWrapper(bo); Page result = baseMapper.selectPageDevicetList(pageQuery.build(), lqw); List list = result.getRecords(); - for (TDeviceVo vo : list) { + /*for (TDeviceVo vo : list) { if ("".equals(vo.getPoliceName()) || null == vo.getPoliceName()){ vo.setDeviceName(vo.getCarNum()); } @@ -98,7 +98,7 @@ public class TDeviceServiceImpl implements ITDeviceService { } - } + }*/ return TableDataInfo.build(result); } diff --git a/stwzhj-modules/wzhj-system/src/main/resources/mapper/system/TDeviceMapper.xml b/stwzhj-modules/wzhj-system/src/main/resources/mapper/system/TDeviceMapper.xml index 23e996b7..c4df6384 100644 --- a/stwzhj-modules/wzhj-system/src/main/resources/mapper/system/TDeviceMapper.xml +++ b/stwzhj-modules/wzhj-system/src/main/resources/mapper/system/TDeviceMapper.xml @@ -21,7 +21,7 @@