亳州位置汇聚改动
parent
8443d5e58f
commit
be599f9768
|
|
@ -31,5 +31,4 @@ public class EsGpsInfo implements Serializable {
|
|||
|
||||
private Integer online;
|
||||
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -8,6 +8,7 @@ import org.dromara.common.core.utils.RedisConstants;
|
|||
import org.dromara.common.redis.utils.RedisUtils;
|
||||
import org.dromara.data2es.controller.DataToEsController;
|
||||
import org.dromara.data2es.domain.EsGpsInfoVO2;
|
||||
import org.dromara.data2es.service.IGpsService;
|
||||
import org.redisson.Redisson;
|
||||
import org.redisson.api.RLock;
|
||||
import org.redisson.api.RTopic;
|
||||
|
|
@ -22,6 +23,7 @@ import org.springframework.stereotype.Component;
|
|||
|
||||
import javax.annotation.PostConstruct;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.time.Duration;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
|
|
@ -32,7 +34,9 @@ public class RedisExpireListener extends KeyExpirationEventMessageListener {
|
|||
private final RedisExpireRecoveryHandler recoveryHandler;
|
||||
|
||||
@Autowired
|
||||
DataToEsController dataToEsController;
|
||||
IGpsService gpsService;
|
||||
|
||||
|
||||
private volatile boolean active = true;
|
||||
|
||||
public RedisExpireListener(
|
||||
|
|
@ -89,40 +93,66 @@ public class RedisExpireListener extends KeyExpirationEventMessageListener {
|
|||
RLock lock = redisson.getLock("LOCK:" + expiredKey);
|
||||
|
||||
try {
|
||||
if (lock.tryLock(0, 30, TimeUnit.SECONDS)) {
|
||||
// 实际业务逻辑
|
||||
String[] split = expiredKey.split(":");
|
||||
String deviceType = split[2];
|
||||
String deviceCode = split[3];
|
||||
// 等待5秒获取锁,确保集群中只有一个实例处理
|
||||
if (lock.tryLock(5, 30, TimeUnit.SECONDS)) {
|
||||
try {
|
||||
String[] split = expiredKey.split(":");
|
||||
String deviceType = split[2];
|
||||
String deviceCode = split[3];
|
||||
|
||||
if ("5".equals(deviceType) || "9".equals(deviceType) ||
|
||||
"8".equals(deviceType) || "7".equals(deviceType)) {
|
||||
return;
|
||||
if ("5".equals(deviceType) || "9".equals(deviceType) ||
|
||||
"8".equals(deviceType) || "7".equals(deviceType)) {
|
||||
return;
|
||||
}
|
||||
|
||||
// 检查1:过期key如果重新存在了,说明有新数据刷新,不处理离线
|
||||
boolean exists = RedisUtils.isExistsObject(expiredKey);
|
||||
if (exists) {
|
||||
log.info("过期key已刷新,跳过离线处理: {}", expiredKey);
|
||||
return;
|
||||
}
|
||||
|
||||
// 检查2:设置处理标记,防止其他实例重复处理(标记保留60秒)
|
||||
String processedKey = "PROCESSED:" + expiredKey;
|
||||
boolean isNew = RedisUtils.setObjectIfAbsent(processedKey, "1", Duration.ofSeconds(60));
|
||||
if (!isNew) {
|
||||
log.info("已被其他实例处理,跳过: {}", expiredKey);
|
||||
return;
|
||||
}
|
||||
|
||||
log.info("处理过期Key: {}", expiredKey);
|
||||
JSONObject object = RedisUtils.getBucket(RedisConstants.ONLINE_USERS + deviceType + ":" + deviceCode);
|
||||
|
||||
if (Objects.isNull(object)) {
|
||||
log.info("redis key={},Object=null,deviceType={},deviceCode={}",
|
||||
expiredKey, deviceType, deviceCode);
|
||||
return;
|
||||
}
|
||||
|
||||
EsGpsInfoVO2 gpsInfo = BeanUtil.toBean(object, EsGpsInfoVO2.class);
|
||||
|
||||
// 检查3:如果已经是离线状态,不重复处理
|
||||
if (gpsInfo.getOnline() != null && gpsInfo.getOnline() == 0) {
|
||||
log.info("设备已离线,跳过重复处理: {}", expiredKey);
|
||||
return;
|
||||
}
|
||||
|
||||
gpsInfo.setOnline(0);
|
||||
gpsService.updateOnlineStatus(gpsInfo);
|
||||
log.info("处理完成: key={}", expiredKey);
|
||||
} finally {
|
||||
if (lock.isHeldByCurrentThread()) {
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
log.info("处理过期Key: {}", expiredKey);
|
||||
JSONObject object = RedisUtils.getBucket(RedisConstants.ONLINE_USERS + deviceType + ":" + deviceCode);
|
||||
|
||||
if (Objects.isNull(object)) {
|
||||
log.info("redis key={},Object=null,deviceType={},deviceCode={}",
|
||||
expiredKey, deviceType, deviceCode);
|
||||
return;
|
||||
}
|
||||
|
||||
EsGpsInfoVO2 gpsInfo = BeanUtil.toBean(object, EsGpsInfoVO2.class);
|
||||
gpsInfo.setOnline(0);
|
||||
dataToEsController.saveGpsInfo(gpsInfo);
|
||||
log.info("处理完成: key={}", expiredKey);
|
||||
} else {
|
||||
log.info("获取锁超时,可能其他实例正在处理: {}", expiredKey);
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
log.error("处理过期事件被中断", e);
|
||||
} catch (Exception e) {
|
||||
log.error("处理过期事件异常", e);
|
||||
} finally {
|
||||
if (lock.isHeldByCurrentThread()) {
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -165,14 +165,17 @@ public class RequestHandler {
|
|||
long b = RedisUtils.geoAdd(Double.valueOf(esGpsInfoVo2.getLng()),
|
||||
Double.valueOf(esGpsInfoVo2.getLat()), deviceCode +"#"+ deviceType);
|
||||
|
||||
if(onlineTime > 0) {
|
||||
// 只有在线状态才设置过期监听key,离线状态不设置,避免循环触发过期事件
|
||||
if(onlineTime > 0 && (esGpsInfoVo2.getOnline() == null || esGpsInfoVo2.getOnline() == 1)) {
|
||||
//设置一个过期时间,方便key自动过期监听,设置离线 [RedisExpireListener]
|
||||
RedisUtils.set(RedisConstants.ONLINE_USERS_TEN +
|
||||
deviceType
|
||||
+ ":" + deviceCode, jsonValue, onlineTime);
|
||||
|
||||
|
||||
|
||||
} else if (esGpsInfoVo2.getOnline() != null && esGpsInfoVo2.getOnline() == 0) {
|
||||
// 离线状态,删除过期监听key
|
||||
RedisUtils.del(RedisConstants.ONLINE_USERS_TEN +
|
||||
deviceType
|
||||
+ ":" + deviceCode);
|
||||
}
|
||||
//方便根据组织机构计算数量
|
||||
|
||||
|
|
|
|||
|
|
@ -100,8 +100,8 @@ public class GpsServiceImpl implements IGpsService {
|
|||
}
|
||||
|
||||
/*
|
||||
* 批量插入
|
||||
* */
|
||||
* 批量插入
|
||||
* */
|
||||
@Override
|
||||
public R saveDataBatch(List<EsGpsInfoVO2> list) {
|
||||
Map<String, String> onlineUserDataMap = new HashMap<>();
|
||||
|
|
@ -112,7 +112,7 @@ public class GpsServiceImpl implements IGpsService {
|
|||
String deviceType = info.getDeviceType();
|
||||
String deviceCode = info.getDeviceCode();
|
||||
//设置地市zzjgdm
|
||||
info = getInfo(info);
|
||||
info = getInfo(info);
|
||||
if (Objects.isNull(info)) {
|
||||
logger.error("redis或者mysql中的Object=null,deviceType={},deviceCode={}",deviceType,deviceCode);
|
||||
continue;
|
||||
|
|
@ -365,7 +365,7 @@ public class GpsServiceImpl implements IGpsService {
|
|||
if (DateUtil.between(gpsTime, new Date(), DateUnit.SECOND) <= 3600L) {
|
||||
|
||||
if (null == esGpsInfoVo2.getOnline()){
|
||||
esGpsInfoVo2.setOnline(1);
|
||||
esGpsInfoVo2.setOnline(1);
|
||||
}
|
||||
String jsonValue = JSONUtil.toJsonStr(esGpsInfoVo2);
|
||||
|
||||
|
|
@ -413,14 +413,14 @@ public class GpsServiceImpl implements IGpsService {
|
|||
deviceEntityV2.setDeviceType(deviceType);
|
||||
RemoteDeviceVo deviceEntityV21 = new RemoteDeviceVo();
|
||||
if ("5".equals(deviceType)){
|
||||
deviceEntityV21 = BeanUtil.toBean(RedisUtils.getBucket("deviceInfo:" + deviceType+":"+deviceCode), RemoteDeviceVo.class) ;
|
||||
if (null == deviceEntityV21){
|
||||
deviceEntityV21 = BeanUtil.toBean(RedisUtils.getBucket("deviceInfo:8" +":"+deviceCode), RemoteDeviceVo.class) ;
|
||||
if (null == deviceEntityV21){
|
||||
deviceEntityV21 = BeanUtil.toBean(RedisUtils.getBucket("deviceInfo:7" +":"+deviceCode), RemoteDeviceVo.class) ;
|
||||
deviceEntityV21 = BeanUtil.toBean(RedisUtils.getBucket("deviceInfo:" + deviceType+":"+deviceCode), RemoteDeviceVo.class) ;
|
||||
if (null == deviceEntityV21){
|
||||
deviceEntityV21 = BeanUtil.toBean(RedisUtils.getBucket("deviceInfo:8" +":"+deviceCode), RemoteDeviceVo.class) ;
|
||||
if (null == deviceEntityV21){
|
||||
deviceEntityV21 = BeanUtil.toBean(RedisUtils.getBucket("deviceInfo:7" +":"+deviceCode), RemoteDeviceVo.class) ;
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}else {
|
||||
deviceEntityV21 = BeanUtil.toBean(RedisUtils.getBucket("deviceInfo:" + deviceType+":"+deviceCode), RemoteDeviceVo.class) ;
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue