diff --git a/stwzhj-modules/wzhj-consumer/src/main/java/org/dromara/kafka/consumer/entity/EsGpsInfo.java b/stwzhj-modules/wzhj-consumer/src/main/java/org/dromara/kafka/consumer/entity/EsGpsInfo.java index 376bd816..93b5fa56 100644 --- a/stwzhj-modules/wzhj-consumer/src/main/java/org/dromara/kafka/consumer/entity/EsGpsInfo.java +++ b/stwzhj-modules/wzhj-consumer/src/main/java/org/dromara/kafka/consumer/entity/EsGpsInfo.java @@ -25,8 +25,8 @@ public class EsGpsInfo implements Serializable { * 类型 */ private String deviceType; - private String latitude; - private String longitude; + private String lat; + private String lng; //方向 private String orientation; //高程 diff --git a/stwzhj-modules/wzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/ConsumerWorker.java b/stwzhj-modules/wzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/ConsumerWorker.java index f1bd9073..980d0312 100644 --- a/stwzhj-modules/wzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/ConsumerWorker.java +++ b/stwzhj-modules/wzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/ConsumerWorker.java @@ -47,8 +47,11 @@ public class ConsumerWorker { public static LinkedBlockingDeque basedataDeque = new LinkedBlockingDeque<>(5000); + @DubboReference + private RemoteDataToEsService gpsService; - @KafkaListener(topics = "#{'${spring.kafka.consumer.topics}'.split(',')}",properties = { + + @KafkaListener(topics = "${spring.kafka.consumer.topics}",properties = { "auto.offset.reset:latest"}) public void consumer(ConsumerRecord record) { Object value = record.value(); @@ -69,11 +72,19 @@ public class ConsumerWorker { } logger.info("esGpsInfo={}",esGpsInfo); - boolean offer = linkedBlockingDeque.offer(esGpsInfo); - R response = R.ok(offer); - if(Objects.isNull(response)){ - logger.info("response == null"); + try { + R r = gpsService.saveData(BeanUtil.toBean(esGpsInfo, RemoteGpsInfo.class) ); + if(Objects.isNull(r)){ + logger.error("response == null"); + }else { + logger.info(r.getMsg()); + } + } catch (Exception e) { + e.printStackTrace(); } +// boolean offer = linkedBlockingDeque.offer(esGpsInfo); + + } diff --git a/stwzhj-modules/wzhj-consumer/src/main/resources/application.yml b/stwzhj-modules/wzhj-consumer/src/main/resources/application.yml index d7fda697..55af8995 100644 --- a/stwzhj-modules/wzhj-consumer/src/main/resources/application.yml +++ b/stwzhj-modules/wzhj-consumer/src/main/resources/application.yml @@ -6,7 +6,7 @@ server: spring: application: # 应用名称 - name: stwzhj-consumer + name: wzhj-consumer profiles: # 环境配置 active: @profiles.active@ diff --git a/stwzhj-modules/wzhj-data2es/src/main/java/org/dromara/data2es/config/RedisListenerConfig.java b/stwzhj-modules/wzhj-data2es/src/main/java/org/dromara/data2es/config/RedisListenerConfig.java index 586af34e..de22c389 100644 --- a/stwzhj-modules/wzhj-data2es/src/main/java/org/dromara/data2es/config/RedisListenerConfig.java +++ b/stwzhj-modules/wzhj-data2es/src/main/java/org/dromara/data2es/config/RedisListenerConfig.java @@ -1,24 +1,40 @@ package org.dromara.data2es.config; import org.dromara.data2es.handler.RedisExpireListener; +import org.dromara.data2es.handler.RedisExpireRecoveryHandler; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.listener.KeyExpirationEventMessageListener; +import org.springframework.data.redis.listener.PatternTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; @Configuration public class RedisListenerConfig { @Bean - RedisMessageListenerContainer listenerContainer(RedisConnectionFactory connectionFactory) { - RedisMessageListenerContainer listenerContainer = new RedisMessageListenerContainer(); - listenerContainer.setConnectionFactory(connectionFactory); - return listenerContainer; + RedisMessageListenerContainer listenerContainer( + RedisConnectionFactory connectionFactory, + RedisExpireRecoveryHandler recoveryHandler) { + + RedisMessageListenerContainer container = new RedisMessageListenerContainer(); + container.setConnectionFactory(connectionFactory); + + // 添加连接监听器用于故障转移恢复 + container.addMessageListener(recoveryHandler, new PatternTopic("__keyspace@*__:expired")); + return container; } @Bean - KeyExpirationEventMessageListener redisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) { - return new RedisExpireListener(listenerContainer); + KeyExpirationEventMessageListener redisKeyExpirationListener( + RedisMessageListenerContainer listenerContainer, + RedisExpireRecoveryHandler recoveryHandler) { + + return new RedisExpireListener(listenerContainer, recoveryHandler); + } + + @Bean + RedisExpireRecoveryHandler redisExpireRecoveryHandler() { + return new RedisExpireRecoveryHandler(); } } diff --git a/stwzhj-modules/wzhj-data2es/src/main/java/org/dromara/data2es/handler/RedisExpireListener.java b/stwzhj-modules/wzhj-data2es/src/main/java/org/dromara/data2es/handler/RedisExpireListener.java index 614ce36a..a24e308d 100644 --- a/stwzhj-modules/wzhj-data2es/src/main/java/org/dromara/data2es/handler/RedisExpireListener.java +++ b/stwzhj-modules/wzhj-data2es/src/main/java/org/dromara/data2es/handler/RedisExpireListener.java @@ -8,88 +8,156 @@ import org.dromara.common.core.utils.RedisConstants; import org.dromara.common.redis.utils.RedisUtils; import org.dromara.data2es.controller.DataToEsController; import org.dromara.data2es.domain.EsGpsInfoVO2; +import org.redisson.Redisson; import org.redisson.api.RLock; +import org.redisson.api.RTopic; import org.redisson.api.RedissonClient; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.redisson.connection.ConnectionListener; +import org.redisson.connection.ConnectionManager; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.data.redis.connection.Message; -import org.springframework.data.redis.connection.MessageListener; import org.springframework.data.redis.listener.KeyExpirationEventMessageListener; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.stereotype.Component; -import java.util.Date; +import javax.annotation.PostConstruct; +import java.net.InetSocketAddress; import java.util.Objects; import java.util.concurrent.TimeUnit; -/** - *

description:

- * - * @author chenle - * @date 2021-11-08 16:40 - */ @Component @Slf4j public class RedisExpireListener extends KeyExpirationEventMessageListener { + private final RedisExpireRecoveryHandler recoveryHandler; + @Autowired DataToEsController dataToEsController; + private volatile boolean active = true; - Logger logger = LoggerFactory.getLogger(RedisExpireListener.class); + public RedisExpireListener( + RedisMessageListenerContainer listenerContainer, + RedisExpireRecoveryHandler recoveryHandler) { - - /** - * Creates new {@link MessageListener} for {@code __keyevent@*__:expired} messages. - * - * @param listenerContainer must not be {@literal null}. - */ - public RedisExpireListener(RedisMessageListenerContainer listenerContainer) { super(listenerContainer); + this.recoveryHandler = recoveryHandler; + recoveryHandler.registerListener(this); + } + + @Override + public void init() { + try { + super.init(); + log.info("Redis过期监听器初始化成功"); + } catch (Exception e) { + log.error("监听器初始化失败", e); + } + } + + public void reconnect() { + if (!active) return; + + try { + log.info("尝试重新注册过期事件监听器..."); + // 停止当前监听 + super.destroy(); + // 重新初始化 + super.init(); + log.info("过期事件监听器重新注册成功"); + } catch (Exception e) { + log.error("重新注册监听器失败", e); + } } @Override public void onMessage(Message message, byte[] pattern) { + if (!active) return; + String expireKey = message.toString(); - if(StringUtils.isNotEmpty(expireKey) && - expireKey.startsWith(RedisConstants.ONLINE_USERS_TEN)){ + log.info("过期的Key={}", expireKey); + + if (StringUtils.isNotEmpty(expireKey) && + expireKey.startsWith(RedisConstants.ONLINE_USERS_TEN)) { + + log.info("在线定位过期的Key={}", expireKey); handleExpiredEvent(expireKey); } } - private void handleExpiredEvent(String expiredKey) { + private void handleExpiredEvent(String expiredKey) { RedissonClient redisson = RedisUtils.getClient(); RLock lock = redisson.getLock("LOCK:" + expiredKey); + try { if (lock.tryLock(0, 30, TimeUnit.SECONDS)) { // 实际业务逻辑 String[] split = expiredKey.split(":"); - String deviceType = split[2]; String deviceCode = split[3]; - /*if ("5".equals(deviceType) || "9".equals(deviceType) || "8".equals(deviceType) || "7".equals(deviceType)){ - return; - }*/ - log.error("redis key expired:key={}",expiredKey); - JSONObject object = RedisUtils.getBucket(RedisConstants.ONLINE_USERS + deviceType+":"+deviceCode); - if (Objects.isNull(object)) { - log.info("redis key={},Object=null,deviceType={},deviceCode={}", expiredKey,deviceType,deviceCode); + + /* if ("5".equals(deviceType) || "9".equals(deviceType) || + "8".equals(deviceType) || "7".equals(deviceType)) { return; } +*/ + log.info("处理过期Key: {}", expiredKey); + JSONObject object = RedisUtils.getBucket(RedisConstants.ONLINE_USERS + deviceType + ":" + deviceCode); + + if (Objects.isNull(object)) { + log.info("redis key={},Object=null,deviceType={},deviceCode={}", + expiredKey, deviceType, deviceCode); + return; + } + EsGpsInfoVO2 gpsInfo = BeanUtil.toBean(object, EsGpsInfoVO2.class); gpsInfo.setOnline(0); dataToEsController.saveGpsInfo(gpsInfo); - log.info("redis key expired:key={}", expiredKey); + log.info("处理完成: key={}", expiredKey); } } catch (InterruptedException e) { - e.printStackTrace(); + Thread.currentThread().interrupt(); + log.error("处理过期事件被中断", e); + } catch (Exception e) { + log.error("处理过期事件异常", e); } finally { - // 仅在当前线程持有锁时释放 if (lock.isHeldByCurrentThread()) { lock.unlock(); } } } + @Override + public void destroy() { + active = false; + try { + super.destroy(); + } catch (Exception e) { + throw new RuntimeException(e); + } + log.info("Redis过期监听器已停止"); + } + + // 添加连接状态监听(使用Redisson事件总线) + @PostConstruct + public void addSentinelConnectionListener() { + try { + RedissonClient redisson = RedisUtils.getClient(); + + // 订阅Redisson连接事件 + RTopic connectionEvents = redisson.getTopic("__redisson_connection_event"); + connectionEvents.addListener(String.class, (channel, msg) -> { + if ("CONNECTED".equals(msg)) { + log.info("Redis连接已建立: {}", msg); + // 标记需要恢复监听 + recoveryHandler.markReconnected(); + } else if ("DISCONNECTED".equals(msg)) { + log.warn("Redis连接断开: {}", msg); + } + }); + + log.info("已注册Redisson连接事件监听器"); + } catch (Exception e) { + log.warn("无法添加Redisson连接事件监听器", e); + } + } } diff --git a/stwzhj-modules/wzhj-data2es/src/main/java/org/dromara/data2es/handler/RedisExpireRecoveryHandler.java b/stwzhj-modules/wzhj-data2es/src/main/java/org/dromara/data2es/handler/RedisExpireRecoveryHandler.java new file mode 100644 index 00000000..81bc87a0 --- /dev/null +++ b/stwzhj-modules/wzhj-data2es/src/main/java/org/dromara/data2es/handler/RedisExpireRecoveryHandler.java @@ -0,0 +1,36 @@ +package org.dromara.data2es.handler; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.data.redis.connection.Message; +import org.springframework.data.redis.connection.MessageListener; +import org.springframework.stereotype.Component; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +@Component +public class RedisExpireRecoveryHandler implements MessageListener { + + private static final Logger log = LoggerFactory.getLogger(RedisExpireRecoveryHandler.class); + + private final AtomicBoolean reconnected = new AtomicBoolean(false); + private final AtomicReference listenerRef = new AtomicReference<>(); + + public void registerListener(RedisExpireListener listener) { + this.listenerRef.set(listener); + } + + @Override + public void onMessage(Message message, byte[] pattern) { + // 检测到任何事件时,检查是否需要恢复监听 + if (reconnected.compareAndSet(true, false) && listenerRef.get() != null) { + log.warn("检测到Redis事件,尝试重新注册主监听器..."); + listenerRef.get().reconnect(); + } + } + + public void markReconnected() { + reconnected.set(true); + } +} diff --git a/stwzhj-modules/wzhj-data2es/src/main/java/org/dromara/data2es/schedule/RedisOnlineUserSchedule.java b/stwzhj-modules/wzhj-data2es/src/main/java/org/dromara/data2es/schedule/RedisOnlineUserSchedule.java new file mode 100644 index 00000000..8d99d3c5 --- /dev/null +++ b/stwzhj-modules/wzhj-data2es/src/main/java/org/dromara/data2es/schedule/RedisOnlineUserSchedule.java @@ -0,0 +1,56 @@ +package org.dromara.data2es.schedule; + +import cn.hutool.core.bean.BeanUtil; +import cn.hutool.core.date.DateUnit; +import cn.hutool.core.date.DateUtil; +import cn.hutool.json.JSONObject; +import org.dromara.common.redis.utils.RedisUtils; +import org.dromara.data2es.domain.EsGpsInfoVO2; +import org.dromara.data2es.service.IGpsService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.annotation.Scheduled; + +import java.util.ArrayList; +import java.util.Date; +import java.util.List; + +/** + *

description:

+ * + * @author chenle + * @date 2021-05-18 18:23 + */ + +@Configuration +public class RedisOnlineUserSchedule { + + @Autowired + IGpsService gpsService; + + @Scheduled(cron = "0 0/20 * * * ?") + public void redisTimeOutRemove(){ + List jlist = RedisUtils.searchAndGetKeysValues("online_users:*"); + List gpsInfoVO2s = new ArrayList<>(); + for (JSONObject job : jlist) { + String deviceType = job.getStr("deviceType"); + if ("05".equals(deviceType)){ + continue; + } + Integer online = job.getInt("online"); + if (0 == online){ + continue; + } + EsGpsInfoVO2 vo2 = BeanUtil.toBean(job, EsGpsInfoVO2.class); + if (1 == vo2.getOnline() && DateUtil.between(vo2.getGpsTime(), new Date(), DateUnit.SECOND) > 1800L){ + vo2.setOnline(0); + gpsInfoVO2s.add(vo2); + } + } + if (gpsInfoVO2s.size() > 0){ + gpsService.updateOnlineStatusBatch(gpsInfoVO2s); + } + } + + +}