diff --git a/stwzhj-modules/wzhj-data2es/src/main/java/org/dromara/data2es/domain/EsGpsInfo.java b/stwzhj-modules/wzhj-data2es/src/main/java/org/dromara/data2es/domain/EsGpsInfo.java index d585b0ba..c1ba8b6d 100644 --- a/stwzhj-modules/wzhj-data2es/src/main/java/org/dromara/data2es/domain/EsGpsInfo.java +++ b/stwzhj-modules/wzhj-data2es/src/main/java/org/dromara/data2es/domain/EsGpsInfo.java @@ -31,5 +31,4 @@ public class EsGpsInfo implements Serializable { private Integer online; - } diff --git a/stwzhj-modules/wzhj-data2es/src/main/java/org/dromara/data2es/handler/RedisExpireListener.java b/stwzhj-modules/wzhj-data2es/src/main/java/org/dromara/data2es/handler/RedisExpireListener.java index 31da1e42..5692ba44 100644 --- a/stwzhj-modules/wzhj-data2es/src/main/java/org/dromara/data2es/handler/RedisExpireListener.java +++ b/stwzhj-modules/wzhj-data2es/src/main/java/org/dromara/data2es/handler/RedisExpireListener.java @@ -8,6 +8,7 @@ import org.dromara.common.core.utils.RedisConstants; import org.dromara.common.redis.utils.RedisUtils; import org.dromara.data2es.controller.DataToEsController; import org.dromara.data2es.domain.EsGpsInfoVO2; +import org.dromara.data2es.service.IGpsService; import org.redisson.Redisson; import org.redisson.api.RLock; import org.redisson.api.RTopic; @@ -22,6 +23,7 @@ import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.net.InetSocketAddress; +import java.time.Duration; import java.util.Objects; import java.util.concurrent.TimeUnit; @@ -32,7 +34,9 @@ public class RedisExpireListener extends KeyExpirationEventMessageListener { private final RedisExpireRecoveryHandler recoveryHandler; @Autowired - DataToEsController dataToEsController; + IGpsService gpsService; + + private volatile boolean active = true; public RedisExpireListener( @@ -89,40 +93,66 @@ public class RedisExpireListener extends KeyExpirationEventMessageListener { RLock lock = redisson.getLock("LOCK:" + expiredKey); try { - if (lock.tryLock(0, 30, TimeUnit.SECONDS)) { - // 实际业务逻辑 - String[] split = expiredKey.split(":"); - String deviceType = split[2]; - String deviceCode = split[3]; + // 等待5秒获取锁,确保集群中只有一个实例处理 + if (lock.tryLock(5, 30, TimeUnit.SECONDS)) { + try { + String[] split = expiredKey.split(":"); + String deviceType = split[2]; + String deviceCode = split[3]; - if ("5".equals(deviceType) || "9".equals(deviceType) || - "8".equals(deviceType) || "7".equals(deviceType)) { - return; + if ("5".equals(deviceType) || "9".equals(deviceType) || + "8".equals(deviceType) || "7".equals(deviceType)) { + return; + } + + // 检查1:过期key如果重新存在了,说明有新数据刷新,不处理离线 + boolean exists = RedisUtils.isExistsObject(expiredKey); + if (exists) { + log.info("过期key已刷新,跳过离线处理: {}", expiredKey); + return; + } + + // 检查2:设置处理标记,防止其他实例重复处理(标记保留60秒) + String processedKey = "PROCESSED:" + expiredKey; + boolean isNew = RedisUtils.setObjectIfAbsent(processedKey, "1", Duration.ofSeconds(60)); + if (!isNew) { + log.info("已被其他实例处理,跳过: {}", expiredKey); + return; + } + + log.info("处理过期Key: {}", expiredKey); + JSONObject object = RedisUtils.getBucket(RedisConstants.ONLINE_USERS + deviceType + ":" + deviceCode); + + if (Objects.isNull(object)) { + log.info("redis key={},Object=null,deviceType={},deviceCode={}", + expiredKey, deviceType, deviceCode); + return; + } + + EsGpsInfoVO2 gpsInfo = BeanUtil.toBean(object, EsGpsInfoVO2.class); + + // 检查3:如果已经是离线状态,不重复处理 + if (gpsInfo.getOnline() != null && gpsInfo.getOnline() == 0) { + log.info("设备已离线,跳过重复处理: {}", expiredKey); + return; + } + + gpsInfo.setOnline(0); + gpsService.updateOnlineStatus(gpsInfo); + log.info("处理完成: key={}", expiredKey); + } finally { + if (lock.isHeldByCurrentThread()) { + lock.unlock(); + } } - - log.info("处理过期Key: {}", expiredKey); - JSONObject object = RedisUtils.getBucket(RedisConstants.ONLINE_USERS + deviceType + ":" + deviceCode); - - if (Objects.isNull(object)) { - log.info("redis key={},Object=null,deviceType={},deviceCode={}", - expiredKey, deviceType, deviceCode); - return; - } - - EsGpsInfoVO2 gpsInfo = BeanUtil.toBean(object, EsGpsInfoVO2.class); - gpsInfo.setOnline(0); - dataToEsController.saveGpsInfo(gpsInfo); - log.info("处理完成: key={}", expiredKey); + } else { + log.info("获取锁超时,可能其他实例正在处理: {}", expiredKey); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.error("处理过期事件被中断", e); } catch (Exception e) { log.error("处理过期事件异常", e); - } finally { - if (lock.isHeldByCurrentThread()) { - lock.unlock(); - } } } diff --git a/stwzhj-modules/wzhj-data2es/src/main/java/org/dromara/data2es/handler/RequestHandler.java b/stwzhj-modules/wzhj-data2es/src/main/java/org/dromara/data2es/handler/RequestHandler.java index f186ee90..19f0a74e 100644 --- a/stwzhj-modules/wzhj-data2es/src/main/java/org/dromara/data2es/handler/RequestHandler.java +++ b/stwzhj-modules/wzhj-data2es/src/main/java/org/dromara/data2es/handler/RequestHandler.java @@ -165,14 +165,17 @@ public class RequestHandler { long b = RedisUtils.geoAdd(Double.valueOf(esGpsInfoVo2.getLng()), Double.valueOf(esGpsInfoVo2.getLat()), deviceCode +"#"+ deviceType); - if(onlineTime > 0) { + // 只有在线状态才设置过期监听key,离线状态不设置,避免循环触发过期事件 + if(onlineTime > 0 && (esGpsInfoVo2.getOnline() == null || esGpsInfoVo2.getOnline() == 1)) { //设置一个过期时间,方便key自动过期监听,设置离线 [RedisExpireListener] RedisUtils.set(RedisConstants.ONLINE_USERS_TEN + deviceType + ":" + deviceCode, jsonValue, onlineTime); - - - + } else if (esGpsInfoVo2.getOnline() != null && esGpsInfoVo2.getOnline() == 0) { + // 离线状态,删除过期监听key + RedisUtils.del(RedisConstants.ONLINE_USERS_TEN + + deviceType + + ":" + deviceCode); } //方便根据组织机构计算数量 diff --git a/stwzhj-modules/wzhj-data2es/src/main/java/org/dromara/data2es/service/impl/GpsServiceImpl.java b/stwzhj-modules/wzhj-data2es/src/main/java/org/dromara/data2es/service/impl/GpsServiceImpl.java index eeb7c520..4f96a215 100644 --- a/stwzhj-modules/wzhj-data2es/src/main/java/org/dromara/data2es/service/impl/GpsServiceImpl.java +++ b/stwzhj-modules/wzhj-data2es/src/main/java/org/dromara/data2es/service/impl/GpsServiceImpl.java @@ -100,8 +100,8 @@ public class GpsServiceImpl implements IGpsService { } /* - * 批量插入 - * */ + * 批量插入 + * */ @Override public R saveDataBatch(List list) { Map onlineUserDataMap = new HashMap<>(); @@ -112,7 +112,7 @@ public class GpsServiceImpl implements IGpsService { String deviceType = info.getDeviceType(); String deviceCode = info.getDeviceCode(); //设置地市zzjgdm - info = getInfo(info); + info = getInfo(info); if (Objects.isNull(info)) { logger.error("redis或者mysql中的Object=null,deviceType={},deviceCode={}",deviceType,deviceCode); continue; @@ -365,7 +365,7 @@ public class GpsServiceImpl implements IGpsService { if (DateUtil.between(gpsTime, new Date(), DateUnit.SECOND) <= 3600L) { if (null == esGpsInfoVo2.getOnline()){ - esGpsInfoVo2.setOnline(1); + esGpsInfoVo2.setOnline(1); } String jsonValue = JSONUtil.toJsonStr(esGpsInfoVo2); @@ -413,14 +413,14 @@ public class GpsServiceImpl implements IGpsService { deviceEntityV2.setDeviceType(deviceType); RemoteDeviceVo deviceEntityV21 = new RemoteDeviceVo(); if ("5".equals(deviceType)){ - deviceEntityV21 = BeanUtil.toBean(RedisUtils.getBucket("deviceInfo:" + deviceType+":"+deviceCode), RemoteDeviceVo.class) ; - if (null == deviceEntityV21){ - deviceEntityV21 = BeanUtil.toBean(RedisUtils.getBucket("deviceInfo:8" +":"+deviceCode), RemoteDeviceVo.class) ; - if (null == deviceEntityV21){ - deviceEntityV21 = BeanUtil.toBean(RedisUtils.getBucket("deviceInfo:7" +":"+deviceCode), RemoteDeviceVo.class) ; + deviceEntityV21 = BeanUtil.toBean(RedisUtils.getBucket("deviceInfo:" + deviceType+":"+deviceCode), RemoteDeviceVo.class) ; + if (null == deviceEntityV21){ + deviceEntityV21 = BeanUtil.toBean(RedisUtils.getBucket("deviceInfo:8" +":"+deviceCode), RemoteDeviceVo.class) ; + if (null == deviceEntityV21){ + deviceEntityV21 = BeanUtil.toBean(RedisUtils.getBucket("deviceInfo:7" +":"+deviceCode), RemoteDeviceVo.class) ; - } - } + } + } }else { deviceEntityV21 = BeanUtil.toBean(RedisUtils.getBucket("deviceInfo:" + deviceType+":"+deviceCode), RemoteDeviceVo.class) ; }