亳州位置汇聚对接DS警务通定位并添加定时下线任务
parent
2e1676aa36
commit
00498de68e
|
|
@ -25,8 +25,8 @@ public class EsGpsInfo implements Serializable {
|
|||
* 类型
|
||||
*/
|
||||
private String deviceType;
|
||||
private String latitude;
|
||||
private String longitude;
|
||||
private String lat;
|
||||
private String lng;
|
||||
//方向
|
||||
private String orientation;
|
||||
//高程
|
||||
|
|
|
|||
|
|
@ -47,8 +47,11 @@ public class ConsumerWorker {
|
|||
|
||||
public static LinkedBlockingDeque basedataDeque = new LinkedBlockingDeque<>(5000);
|
||||
|
||||
@DubboReference
|
||||
private RemoteDataToEsService gpsService;
|
||||
|
||||
@KafkaListener(topics = "#{'${spring.kafka.consumer.topics}'.split(',')}",properties = {
|
||||
|
||||
@KafkaListener(topics = "${spring.kafka.consumer.topics}",properties = {
|
||||
"auto.offset.reset:latest"})
|
||||
public void consumer(ConsumerRecord<String,Object> record) {
|
||||
Object value = record.value();
|
||||
|
|
@ -69,11 +72,19 @@ public class ConsumerWorker {
|
|||
}
|
||||
|
||||
logger.info("esGpsInfo={}",esGpsInfo);
|
||||
boolean offer = linkedBlockingDeque.offer(esGpsInfo);
|
||||
R response = R.ok(offer);
|
||||
if(Objects.isNull(response)){
|
||||
logger.info("response == null");
|
||||
try {
|
||||
R r = gpsService.saveData(BeanUtil.toBean(esGpsInfo, RemoteGpsInfo.class) );
|
||||
if(Objects.isNull(r)){
|
||||
logger.error("response == null");
|
||||
}else {
|
||||
logger.info(r.getMsg());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
// boolean offer = linkedBlockingDeque.offer(esGpsInfo);
|
||||
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -6,7 +6,7 @@ server:
|
|||
spring:
|
||||
application:
|
||||
# 应用名称
|
||||
name: stwzhj-consumer
|
||||
name: wzhj-consumer
|
||||
profiles:
|
||||
# 环境配置
|
||||
active: @profiles.active@
|
||||
|
|
|
|||
|
|
@ -1,24 +1,40 @@
|
|||
package org.dromara.data2es.config;
|
||||
|
||||
import org.dromara.data2es.handler.RedisExpireListener;
|
||||
import org.dromara.data2es.handler.RedisExpireRecoveryHandler;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.data.redis.connection.RedisConnectionFactory;
|
||||
import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
|
||||
import org.springframework.data.redis.listener.PatternTopic;
|
||||
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
|
||||
|
||||
@Configuration
|
||||
public class RedisListenerConfig {
|
||||
|
||||
@Bean
|
||||
RedisMessageListenerContainer listenerContainer(RedisConnectionFactory connectionFactory) {
|
||||
RedisMessageListenerContainer listenerContainer = new RedisMessageListenerContainer();
|
||||
listenerContainer.setConnectionFactory(connectionFactory);
|
||||
return listenerContainer;
|
||||
RedisMessageListenerContainer listenerContainer(
|
||||
RedisConnectionFactory connectionFactory,
|
||||
RedisExpireRecoveryHandler recoveryHandler) {
|
||||
|
||||
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
|
||||
container.setConnectionFactory(connectionFactory);
|
||||
|
||||
// 添加连接监听器用于故障转移恢复
|
||||
container.addMessageListener(recoveryHandler, new PatternTopic("__keyspace@*__:expired"));
|
||||
return container;
|
||||
}
|
||||
|
||||
@Bean
|
||||
KeyExpirationEventMessageListener redisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
|
||||
return new RedisExpireListener(listenerContainer);
|
||||
KeyExpirationEventMessageListener redisKeyExpirationListener(
|
||||
RedisMessageListenerContainer listenerContainer,
|
||||
RedisExpireRecoveryHandler recoveryHandler) {
|
||||
|
||||
return new RedisExpireListener(listenerContainer, recoveryHandler);
|
||||
}
|
||||
|
||||
@Bean
|
||||
RedisExpireRecoveryHandler redisExpireRecoveryHandler() {
|
||||
return new RedisExpireRecoveryHandler();
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -8,88 +8,156 @@ import org.dromara.common.core.utils.RedisConstants;
|
|||
import org.dromara.common.redis.utils.RedisUtils;
|
||||
import org.dromara.data2es.controller.DataToEsController;
|
||||
import org.dromara.data2es.domain.EsGpsInfoVO2;
|
||||
import org.redisson.Redisson;
|
||||
import org.redisson.api.RLock;
|
||||
import org.redisson.api.RTopic;
|
||||
import org.redisson.api.RedissonClient;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.redisson.connection.ConnectionListener;
|
||||
import org.redisson.connection.ConnectionManager;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.data.redis.connection.Message;
|
||||
import org.springframework.data.redis.connection.MessageListener;
|
||||
import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
|
||||
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.Date;
|
||||
import javax.annotation.PostConstruct;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* <p>description: </p>
|
||||
*
|
||||
* @author chenle
|
||||
* @date 2021-11-08 16:40
|
||||
*/
|
||||
@Component
|
||||
@Slf4j
|
||||
public class RedisExpireListener extends KeyExpirationEventMessageListener {
|
||||
|
||||
private final RedisExpireRecoveryHandler recoveryHandler;
|
||||
|
||||
@Autowired
|
||||
DataToEsController dataToEsController;
|
||||
private volatile boolean active = true;
|
||||
|
||||
Logger logger = LoggerFactory.getLogger(RedisExpireListener.class);
|
||||
public RedisExpireListener(
|
||||
RedisMessageListenerContainer listenerContainer,
|
||||
RedisExpireRecoveryHandler recoveryHandler) {
|
||||
|
||||
|
||||
/**
|
||||
* Creates new {@link MessageListener} for {@code __keyevent@*__:expired} messages.
|
||||
*
|
||||
* @param listenerContainer must not be {@literal null}.
|
||||
*/
|
||||
public RedisExpireListener(RedisMessageListenerContainer listenerContainer) {
|
||||
super(listenerContainer);
|
||||
this.recoveryHandler = recoveryHandler;
|
||||
recoveryHandler.registerListener(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init() {
|
||||
try {
|
||||
super.init();
|
||||
log.info("Redis过期监听器初始化成功");
|
||||
} catch (Exception e) {
|
||||
log.error("监听器初始化失败", e);
|
||||
}
|
||||
}
|
||||
|
||||
public void reconnect() {
|
||||
if (!active) return;
|
||||
|
||||
try {
|
||||
log.info("尝试重新注册过期事件监听器...");
|
||||
// 停止当前监听
|
||||
super.destroy();
|
||||
// 重新初始化
|
||||
super.init();
|
||||
log.info("过期事件监听器重新注册成功");
|
||||
} catch (Exception e) {
|
||||
log.error("重新注册监听器失败", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onMessage(Message message, byte[] pattern) {
|
||||
if (!active) return;
|
||||
|
||||
String expireKey = message.toString();
|
||||
if(StringUtils.isNotEmpty(expireKey) &&
|
||||
expireKey.startsWith(RedisConstants.ONLINE_USERS_TEN)){
|
||||
log.info("过期的Key={}", expireKey);
|
||||
|
||||
if (StringUtils.isNotEmpty(expireKey) &&
|
||||
expireKey.startsWith(RedisConstants.ONLINE_USERS_TEN)) {
|
||||
|
||||
log.info("在线定位过期的Key={}", expireKey);
|
||||
handleExpiredEvent(expireKey);
|
||||
}
|
||||
}
|
||||
|
||||
private void handleExpiredEvent(String expiredKey) {
|
||||
private void handleExpiredEvent(String expiredKey) {
|
||||
RedissonClient redisson = RedisUtils.getClient();
|
||||
RLock lock = redisson.getLock("LOCK:" + expiredKey);
|
||||
|
||||
try {
|
||||
if (lock.tryLock(0, 30, TimeUnit.SECONDS)) {
|
||||
// 实际业务逻辑
|
||||
String[] split = expiredKey.split(":");
|
||||
|
||||
String deviceType = split[2];
|
||||
String deviceCode = split[3];
|
||||
/*if ("5".equals(deviceType) || "9".equals(deviceType) || "8".equals(deviceType) || "7".equals(deviceType)){
|
||||
return;
|
||||
}*/
|
||||
log.error("redis key expired:key={}",expiredKey);
|
||||
JSONObject object = RedisUtils.getBucket(RedisConstants.ONLINE_USERS + deviceType+":"+deviceCode);
|
||||
if (Objects.isNull(object)) {
|
||||
log.info("redis key={},Object=null,deviceType={},deviceCode={}", expiredKey,deviceType,deviceCode);
|
||||
|
||||
/* if ("5".equals(deviceType) || "9".equals(deviceType) ||
|
||||
"8".equals(deviceType) || "7".equals(deviceType)) {
|
||||
return;
|
||||
}
|
||||
*/
|
||||
log.info("处理过期Key: {}", expiredKey);
|
||||
JSONObject object = RedisUtils.getBucket(RedisConstants.ONLINE_USERS + deviceType + ":" + deviceCode);
|
||||
|
||||
if (Objects.isNull(object)) {
|
||||
log.info("redis key={},Object=null,deviceType={},deviceCode={}",
|
||||
expiredKey, deviceType, deviceCode);
|
||||
return;
|
||||
}
|
||||
|
||||
EsGpsInfoVO2 gpsInfo = BeanUtil.toBean(object, EsGpsInfoVO2.class);
|
||||
gpsInfo.setOnline(0);
|
||||
dataToEsController.saveGpsInfo(gpsInfo);
|
||||
log.info("redis key expired:key={}", expiredKey);
|
||||
log.info("处理完成: key={}", expiredKey);
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
Thread.currentThread().interrupt();
|
||||
log.error("处理过期事件被中断", e);
|
||||
} catch (Exception e) {
|
||||
log.error("处理过期事件异常", e);
|
||||
} finally {
|
||||
// 仅在当前线程持有锁时释放
|
||||
if (lock.isHeldByCurrentThread()) {
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void destroy() {
|
||||
active = false;
|
||||
try {
|
||||
super.destroy();
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
log.info("Redis过期监听器已停止");
|
||||
}
|
||||
|
||||
// 添加连接状态监听(使用Redisson事件总线)
|
||||
@PostConstruct
|
||||
public void addSentinelConnectionListener() {
|
||||
try {
|
||||
RedissonClient redisson = RedisUtils.getClient();
|
||||
|
||||
// 订阅Redisson连接事件
|
||||
RTopic connectionEvents = redisson.getTopic("__redisson_connection_event");
|
||||
connectionEvents.addListener(String.class, (channel, msg) -> {
|
||||
if ("CONNECTED".equals(msg)) {
|
||||
log.info("Redis连接已建立: {}", msg);
|
||||
// 标记需要恢复监听
|
||||
recoveryHandler.markReconnected();
|
||||
} else if ("DISCONNECTED".equals(msg)) {
|
||||
log.warn("Redis连接断开: {}", msg);
|
||||
}
|
||||
});
|
||||
|
||||
log.info("已注册Redisson连接事件监听器");
|
||||
} catch (Exception e) {
|
||||
log.warn("无法添加Redisson连接事件监听器", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,36 @@
|
|||
package org.dromara.data2es.handler;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.data.redis.connection.Message;
|
||||
import org.springframework.data.redis.connection.MessageListener;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
@Component
|
||||
public class RedisExpireRecoveryHandler implements MessageListener {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(RedisExpireRecoveryHandler.class);
|
||||
|
||||
private final AtomicBoolean reconnected = new AtomicBoolean(false);
|
||||
private final AtomicReference<RedisExpireListener> listenerRef = new AtomicReference<>();
|
||||
|
||||
public void registerListener(RedisExpireListener listener) {
|
||||
this.listenerRef.set(listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onMessage(Message message, byte[] pattern) {
|
||||
// 检测到任何事件时,检查是否需要恢复监听
|
||||
if (reconnected.compareAndSet(true, false) && listenerRef.get() != null) {
|
||||
log.warn("检测到Redis事件,尝试重新注册主监听器...");
|
||||
listenerRef.get().reconnect();
|
||||
}
|
||||
}
|
||||
|
||||
public void markReconnected() {
|
||||
reconnected.set(true);
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,56 @@
|
|||
package org.dromara.data2es.schedule;
|
||||
|
||||
import cn.hutool.core.bean.BeanUtil;
|
||||
import cn.hutool.core.date.DateUnit;
|
||||
import cn.hutool.core.date.DateUtil;
|
||||
import cn.hutool.json.JSONObject;
|
||||
import org.dromara.common.redis.utils.RedisUtils;
|
||||
import org.dromara.data2es.domain.EsGpsInfoVO2;
|
||||
import org.dromara.data2es.service.IGpsService;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* <p>description: </p>
|
||||
*
|
||||
* @author chenle
|
||||
* @date 2021-05-18 18:23
|
||||
*/
|
||||
|
||||
@Configuration
|
||||
public class RedisOnlineUserSchedule {
|
||||
|
||||
@Autowired
|
||||
IGpsService gpsService;
|
||||
|
||||
@Scheduled(cron = "0 0/20 * * * ?")
|
||||
public void redisTimeOutRemove(){
|
||||
List<JSONObject> jlist = RedisUtils.searchAndGetKeysValues("online_users:*");
|
||||
List<EsGpsInfoVO2> gpsInfoVO2s = new ArrayList<>();
|
||||
for (JSONObject job : jlist) {
|
||||
String deviceType = job.getStr("deviceType");
|
||||
if ("05".equals(deviceType)){
|
||||
continue;
|
||||
}
|
||||
Integer online = job.getInt("online");
|
||||
if (0 == online){
|
||||
continue;
|
||||
}
|
||||
EsGpsInfoVO2 vo2 = BeanUtil.toBean(job, EsGpsInfoVO2.class);
|
||||
if (1 == vo2.getOnline() && DateUtil.between(vo2.getGpsTime(), new Date(), DateUnit.SECOND) > 1800L){
|
||||
vo2.setOnline(0);
|
||||
gpsInfoVO2s.add(vo2);
|
||||
}
|
||||
}
|
||||
if (gpsInfoVO2s.size() > 0){
|
||||
gpsService.updateOnlineStatusBatch(gpsInfoVO2s);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
Loading…
Reference in New Issue