diff --git a/stwzhj-modules/wzhj-data2es/src/main/java/org/dromara/data2es/config/RedisListenerConfig.java b/stwzhj-modules/wzhj-data2es/src/main/java/org/dromara/data2es/config/RedisListenerConfig.java index 586af34e..de22c389 100644 --- a/stwzhj-modules/wzhj-data2es/src/main/java/org/dromara/data2es/config/RedisListenerConfig.java +++ b/stwzhj-modules/wzhj-data2es/src/main/java/org/dromara/data2es/config/RedisListenerConfig.java @@ -1,24 +1,40 @@ package org.dromara.data2es.config; import org.dromara.data2es.handler.RedisExpireListener; +import org.dromara.data2es.handler.RedisExpireRecoveryHandler; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.listener.KeyExpirationEventMessageListener; +import org.springframework.data.redis.listener.PatternTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; @Configuration public class RedisListenerConfig { @Bean - RedisMessageListenerContainer listenerContainer(RedisConnectionFactory connectionFactory) { - RedisMessageListenerContainer listenerContainer = new RedisMessageListenerContainer(); - listenerContainer.setConnectionFactory(connectionFactory); - return listenerContainer; + RedisMessageListenerContainer listenerContainer( + RedisConnectionFactory connectionFactory, + RedisExpireRecoveryHandler recoveryHandler) { + + RedisMessageListenerContainer container = new RedisMessageListenerContainer(); + container.setConnectionFactory(connectionFactory); + + // 添加连接监听器用于故障转移恢复 + container.addMessageListener(recoveryHandler, new PatternTopic("__keyspace@*__:expired")); + return container; } @Bean - KeyExpirationEventMessageListener redisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) { - return new RedisExpireListener(listenerContainer); + KeyExpirationEventMessageListener redisKeyExpirationListener( + RedisMessageListenerContainer listenerContainer, + RedisExpireRecoveryHandler recoveryHandler) { + + return new RedisExpireListener(listenerContainer, recoveryHandler); + } + + @Bean + RedisExpireRecoveryHandler redisExpireRecoveryHandler() { + return new RedisExpireRecoveryHandler(); } } diff --git a/stwzhj-modules/wzhj-data2es/src/main/java/org/dromara/data2es/handler/RedisExpireListener.java b/stwzhj-modules/wzhj-data2es/src/main/java/org/dromara/data2es/handler/RedisExpireListener.java index b1ba8fdc..4edebeaa 100644 --- a/stwzhj-modules/wzhj-data2es/src/main/java/org/dromara/data2es/handler/RedisExpireListener.java +++ b/stwzhj-modules/wzhj-data2es/src/main/java/org/dromara/data2es/handler/RedisExpireListener.java @@ -1,95 +1,164 @@ package org.dromara.data2es.handler; + import cn.hutool.core.bean.BeanUtil; import cn.hutool.json.JSONObject; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang.StringUtils; import org.dromara.common.core.utils.RedisConstants; +import org.dromara.common.core.utils.StringUtils; import org.dromara.common.redis.utils.RedisUtils; import org.dromara.data2es.controller.DataToEsController; import org.dromara.data2es.domain.EsGpsInfoVO2; +import org.redisson.Redisson; import org.redisson.api.RLock; +import org.redisson.api.RTopic; import org.redisson.api.RedissonClient; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.redisson.connection.ConnectionListener; +import org.redisson.connection.ConnectionManager; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.data.redis.connection.Message; -import org.springframework.data.redis.connection.MessageListener; import org.springframework.data.redis.listener.KeyExpirationEventMessageListener; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.stereotype.Component; -import java.util.Date; +import javax.annotation.PostConstruct; +import java.net.InetSocketAddress; import java.util.Objects; import java.util.concurrent.TimeUnit; -/** - *
description:
- * - * @author chenle - * @date 2021-11-08 16:40 - */ @Component @Slf4j public class RedisExpireListener extends KeyExpirationEventMessageListener { + private final RedisExpireRecoveryHandler recoveryHandler; + @Autowired DataToEsController dataToEsController; + private volatile boolean active = true; + public RedisExpireListener( + RedisMessageListenerContainer listenerContainer, + RedisExpireRecoveryHandler recoveryHandler) { - /** - * Creates new {@link MessageListener} for {@code __keyevent@*__:expired} messages. - * - * @param listenerContainer must not be {@literal null}. - */ - public RedisExpireListener(RedisMessageListenerContainer listenerContainer) { super(listenerContainer); + this.recoveryHandler = recoveryHandler; + recoveryHandler.registerListener(this); + } + + @Override + public void init() { + try { + super.init(); + log.info("Redis过期监听器初始化成功"); + } catch (Exception e) { + log.error("监听器初始化失败", e); + } + } + + public void reconnect() { + if (!active) return; + + try { + log.info("尝试重新注册过期事件监听器..."); + // 停止当前监听 + super.destroy(); + // 重新初始化 + super.init(); + log.info("过期事件监听器重新注册成功"); + } catch (Exception e) { + log.error("重新注册监听器失败", e); + } } @Override public void onMessage(Message message, byte[] pattern) { + if (!active) return; + String expireKey = message.toString(); - log.info("过期的Key={}",expireKey); - if(StringUtils.isNotEmpty(expireKey) && - expireKey.startsWith(RedisConstants.ONLINE_USERS_TEN)){ - log.info("在线定位过期的Key={}",expireKey); + log.info("过期的Key={}", expireKey); + + if (StringUtils.isNotEmpty(expireKey) && + expireKey.startsWith(RedisConstants.ONLINE_USERS_TEN)) { + + log.info("在线定位过期的Key={}", expireKey); handleExpiredEvent(expireKey); } } - private void handleExpiredEvent(String expiredKey) { + private void handleExpiredEvent(String expiredKey) { RedissonClient redisson = RedisUtils.getClient(); RLock lock = redisson.getLock("LOCK:" + expiredKey); + try { if (lock.tryLock(0, 30, TimeUnit.SECONDS)) { // 实际业务逻辑 String[] split = expiredKey.split(":"); - String deviceType = split[2]; String deviceCode = split[3]; - if ("5".equals(deviceType) || "9".equals(deviceType) || "8".equals(deviceType) || "7".equals(deviceType)){ + + if ("5".equals(deviceType) || "9".equals(deviceType) || + "8".equals(deviceType) || "7".equals(deviceType)) { return; } - log.error("redis key expired:key={}",expiredKey); - JSONObject object = RedisUtils.getBucket(RedisConstants.ONLINE_USERS + deviceType+":"+deviceCode); + + log.info("处理过期Key: {}", expiredKey); + JSONObject object = RedisUtils.getBucket(RedisConstants.ONLINE_USERS + deviceType + ":" + deviceCode); + if (Objects.isNull(object)) { - log.info("redis key={},Object=null,deviceType={},deviceCode={}", expiredKey,deviceType,deviceCode); + log.info("redis key={},Object=null,deviceType={},deviceCode={}", + expiredKey, deviceType, deviceCode); return; } + EsGpsInfoVO2 gpsInfo = BeanUtil.toBean(object, EsGpsInfoVO2.class); gpsInfo.setOnline(0); dataToEsController.saveGpsInfo(gpsInfo); - log.info("redis key expired:key={}", expiredKey); + log.info("处理完成: key={}", expiredKey); } } catch (InterruptedException e) { - e.printStackTrace(); + Thread.currentThread().interrupt(); + log.error("处理过期事件被中断", e); + } catch (Exception e) { + log.error("处理过期事件异常", e); } finally { - // 仅在当前线程持有锁时释放 if (lock.isHeldByCurrentThread()) { lock.unlock(); } } } + @Override + public void destroy() { + active = false; + try { + super.destroy(); + } catch (Exception e) { + throw new RuntimeException(e); + } + log.info("Redis过期监听器已停止"); + } + + // 添加连接状态监听(使用Redisson事件总线) + @PostConstruct + public void addSentinelConnectionListener() { + try { + RedissonClient redisson = RedisUtils.getClient(); + + // 订阅Redisson连接事件 + RTopic connectionEvents = redisson.getTopic("__redisson_connection_event"); + connectionEvents.addListener(String.class, (channel, msg) -> { + if ("CONNECTED".equals(msg)) { + log.info("Redis连接已建立: {}", msg); + // 标记需要恢复监听 + recoveryHandler.markReconnected(); + } else if ("DISCONNECTED".equals(msg)) { + log.warn("Redis连接断开: {}", msg); + } + }); + + log.info("已注册Redisson连接事件监听器"); + } catch (Exception e) { + log.warn("无法添加Redisson连接事件监听器", e); + } + } } diff --git a/stwzhj-modules/wzhj-data2es/src/main/java/org/dromara/data2es/handler/RedisExpireRecoveryHandler.java b/stwzhj-modules/wzhj-data2es/src/main/java/org/dromara/data2es/handler/RedisExpireRecoveryHandler.java new file mode 100644 index 00000000..81bc87a0 --- /dev/null +++ b/stwzhj-modules/wzhj-data2es/src/main/java/org/dromara/data2es/handler/RedisExpireRecoveryHandler.java @@ -0,0 +1,36 @@ +package org.dromara.data2es.handler; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.data.redis.connection.Message; +import org.springframework.data.redis.connection.MessageListener; +import org.springframework.stereotype.Component; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +@Component +public class RedisExpireRecoveryHandler implements MessageListener { + + private static final Logger log = LoggerFactory.getLogger(RedisExpireRecoveryHandler.class); + + private final AtomicBoolean reconnected = new AtomicBoolean(false); + private final AtomicReference