宣城培训会BUG修改
parent
8c53620702
commit
3fd5bc2f17
|
|
@ -1,24 +1,40 @@
|
|||
package org.dromara.data2es.config;
|
||||
|
||||
import org.dromara.data2es.handler.RedisExpireListener;
|
||||
import org.dromara.data2es.handler.RedisExpireRecoveryHandler;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.data.redis.connection.RedisConnectionFactory;
|
||||
import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
|
||||
import org.springframework.data.redis.listener.PatternTopic;
|
||||
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
|
||||
|
||||
@Configuration
|
||||
public class RedisListenerConfig {
|
||||
|
||||
@Bean
|
||||
RedisMessageListenerContainer listenerContainer(RedisConnectionFactory connectionFactory) {
|
||||
RedisMessageListenerContainer listenerContainer = new RedisMessageListenerContainer();
|
||||
listenerContainer.setConnectionFactory(connectionFactory);
|
||||
return listenerContainer;
|
||||
RedisMessageListenerContainer listenerContainer(
|
||||
RedisConnectionFactory connectionFactory,
|
||||
RedisExpireRecoveryHandler recoveryHandler) {
|
||||
|
||||
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
|
||||
container.setConnectionFactory(connectionFactory);
|
||||
|
||||
// 添加连接监听器用于故障转移恢复
|
||||
container.addMessageListener(recoveryHandler, new PatternTopic("__keyspace@*__:expired"));
|
||||
return container;
|
||||
}
|
||||
|
||||
@Bean
|
||||
KeyExpirationEventMessageListener redisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
|
||||
return new RedisExpireListener(listenerContainer);
|
||||
KeyExpirationEventMessageListener redisKeyExpirationListener(
|
||||
RedisMessageListenerContainer listenerContainer,
|
||||
RedisExpireRecoveryHandler recoveryHandler) {
|
||||
|
||||
return new RedisExpireListener(listenerContainer, recoveryHandler);
|
||||
}
|
||||
|
||||
@Bean
|
||||
RedisExpireRecoveryHandler redisExpireRecoveryHandler() {
|
||||
return new RedisExpireRecoveryHandler();
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,95 +1,164 @@
|
|||
package org.dromara.data2es.handler;
|
||||
|
||||
|
||||
import cn.hutool.core.bean.BeanUtil;
|
||||
import cn.hutool.json.JSONObject;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.dromara.common.core.utils.RedisConstants;
|
||||
import org.dromara.common.core.utils.StringUtils;
|
||||
import org.dromara.common.redis.utils.RedisUtils;
|
||||
import org.dromara.data2es.controller.DataToEsController;
|
||||
import org.dromara.data2es.domain.EsGpsInfoVO2;
|
||||
import org.redisson.Redisson;
|
||||
import org.redisson.api.RLock;
|
||||
import org.redisson.api.RTopic;
|
||||
import org.redisson.api.RedissonClient;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.redisson.connection.ConnectionListener;
|
||||
import org.redisson.connection.ConnectionManager;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.data.redis.connection.Message;
|
||||
import org.springframework.data.redis.connection.MessageListener;
|
||||
import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
|
||||
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.Date;
|
||||
import javax.annotation.PostConstruct;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* <p>description: </p>
|
||||
*
|
||||
* @author chenle
|
||||
* @date 2021-11-08 16:40
|
||||
*/
|
||||
@Component
|
||||
@Slf4j
|
||||
public class RedisExpireListener extends KeyExpirationEventMessageListener {
|
||||
|
||||
private final RedisExpireRecoveryHandler recoveryHandler;
|
||||
|
||||
@Autowired
|
||||
DataToEsController dataToEsController;
|
||||
private volatile boolean active = true;
|
||||
|
||||
public RedisExpireListener(
|
||||
RedisMessageListenerContainer listenerContainer,
|
||||
RedisExpireRecoveryHandler recoveryHandler) {
|
||||
|
||||
/**
|
||||
* Creates new {@link MessageListener} for {@code __keyevent@*__:expired} messages.
|
||||
*
|
||||
* @param listenerContainer must not be {@literal null}.
|
||||
*/
|
||||
public RedisExpireListener(RedisMessageListenerContainer listenerContainer) {
|
||||
super(listenerContainer);
|
||||
this.recoveryHandler = recoveryHandler;
|
||||
recoveryHandler.registerListener(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init() {
|
||||
try {
|
||||
super.init();
|
||||
log.info("Redis过期监听器初始化成功");
|
||||
} catch (Exception e) {
|
||||
log.error("监听器初始化失败", e);
|
||||
}
|
||||
}
|
||||
|
||||
public void reconnect() {
|
||||
if (!active) return;
|
||||
|
||||
try {
|
||||
log.info("尝试重新注册过期事件监听器...");
|
||||
// 停止当前监听
|
||||
super.destroy();
|
||||
// 重新初始化
|
||||
super.init();
|
||||
log.info("过期事件监听器重新注册成功");
|
||||
} catch (Exception e) {
|
||||
log.error("重新注册监听器失败", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onMessage(Message message, byte[] pattern) {
|
||||
if (!active) return;
|
||||
|
||||
String expireKey = message.toString();
|
||||
log.info("过期的Key={}",expireKey);
|
||||
if(StringUtils.isNotEmpty(expireKey) &&
|
||||
expireKey.startsWith(RedisConstants.ONLINE_USERS_TEN)){
|
||||
log.info("在线定位过期的Key={}",expireKey);
|
||||
log.info("过期的Key={}", expireKey);
|
||||
|
||||
if (StringUtils.isNotEmpty(expireKey) &&
|
||||
expireKey.startsWith(RedisConstants.ONLINE_USERS_TEN)) {
|
||||
|
||||
log.info("在线定位过期的Key={}", expireKey);
|
||||
handleExpiredEvent(expireKey);
|
||||
}
|
||||
}
|
||||
|
||||
private void handleExpiredEvent(String expiredKey) {
|
||||
private void handleExpiredEvent(String expiredKey) {
|
||||
RedissonClient redisson = RedisUtils.getClient();
|
||||
RLock lock = redisson.getLock("LOCK:" + expiredKey);
|
||||
|
||||
try {
|
||||
if (lock.tryLock(0, 30, TimeUnit.SECONDS)) {
|
||||
// 实际业务逻辑
|
||||
String[] split = expiredKey.split(":");
|
||||
|
||||
String deviceType = split[2];
|
||||
String deviceCode = split[3];
|
||||
if ("5".equals(deviceType) || "9".equals(deviceType) || "8".equals(deviceType) || "7".equals(deviceType)){
|
||||
|
||||
if ("5".equals(deviceType) || "9".equals(deviceType) ||
|
||||
"8".equals(deviceType) || "7".equals(deviceType)) {
|
||||
return;
|
||||
}
|
||||
log.error("redis key expired:key={}",expiredKey);
|
||||
JSONObject object = RedisUtils.getBucket(RedisConstants.ONLINE_USERS + deviceType+":"+deviceCode);
|
||||
|
||||
log.info("处理过期Key: {}", expiredKey);
|
||||
JSONObject object = RedisUtils.getBucket(RedisConstants.ONLINE_USERS + deviceType + ":" + deviceCode);
|
||||
|
||||
if (Objects.isNull(object)) {
|
||||
log.info("redis key={},Object=null,deviceType={},deviceCode={}", expiredKey,deviceType,deviceCode);
|
||||
log.info("redis key={},Object=null,deviceType={},deviceCode={}",
|
||||
expiredKey, deviceType, deviceCode);
|
||||
return;
|
||||
}
|
||||
|
||||
EsGpsInfoVO2 gpsInfo = BeanUtil.toBean(object, EsGpsInfoVO2.class);
|
||||
gpsInfo.setOnline(0);
|
||||
dataToEsController.saveGpsInfo(gpsInfo);
|
||||
log.info("redis key expired:key={}", expiredKey);
|
||||
log.info("处理完成: key={}", expiredKey);
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
Thread.currentThread().interrupt();
|
||||
log.error("处理过期事件被中断", e);
|
||||
} catch (Exception e) {
|
||||
log.error("处理过期事件异常", e);
|
||||
} finally {
|
||||
// 仅在当前线程持有锁时释放
|
||||
if (lock.isHeldByCurrentThread()) {
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void destroy() {
|
||||
active = false;
|
||||
try {
|
||||
super.destroy();
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
log.info("Redis过期监听器已停止");
|
||||
}
|
||||
|
||||
// 添加连接状态监听(使用Redisson事件总线)
|
||||
@PostConstruct
|
||||
public void addSentinelConnectionListener() {
|
||||
try {
|
||||
RedissonClient redisson = RedisUtils.getClient();
|
||||
|
||||
// 订阅Redisson连接事件
|
||||
RTopic connectionEvents = redisson.getTopic("__redisson_connection_event");
|
||||
connectionEvents.addListener(String.class, (channel, msg) -> {
|
||||
if ("CONNECTED".equals(msg)) {
|
||||
log.info("Redis连接已建立: {}", msg);
|
||||
// 标记需要恢复监听
|
||||
recoveryHandler.markReconnected();
|
||||
} else if ("DISCONNECTED".equals(msg)) {
|
||||
log.warn("Redis连接断开: {}", msg);
|
||||
}
|
||||
});
|
||||
|
||||
log.info("已注册Redisson连接事件监听器");
|
||||
} catch (Exception e) {
|
||||
log.warn("无法添加Redisson连接事件监听器", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,36 @@
|
|||
package org.dromara.data2es.handler;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.data.redis.connection.Message;
|
||||
import org.springframework.data.redis.connection.MessageListener;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
@Component
|
||||
public class RedisExpireRecoveryHandler implements MessageListener {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(RedisExpireRecoveryHandler.class);
|
||||
|
||||
private final AtomicBoolean reconnected = new AtomicBoolean(false);
|
||||
private final AtomicReference<RedisExpireListener> listenerRef = new AtomicReference<>();
|
||||
|
||||
public void registerListener(RedisExpireListener listener) {
|
||||
this.listenerRef.set(listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onMessage(Message message, byte[] pattern) {
|
||||
// 检测到任何事件时,检查是否需要恢复监听
|
||||
if (reconnected.compareAndSet(true, false) && listenerRef.get() != null) {
|
||||
log.warn("检测到Redis事件,尝试重新注册主监听器...");
|
||||
listenerRef.get().reconnect();
|
||||
}
|
||||
}
|
||||
|
||||
public void markReconnected() {
|
||||
reconnected.set(true);
|
||||
}
|
||||
}
|
||||
|
|
@ -16,7 +16,7 @@ public class TDeviceExportVo implements Serializable {
|
|||
@Serial
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
@ExcelProperty(value = "设备编号")
|
||||
@ExcelProperty(value = "设备编码")
|
||||
private String deviceCode;
|
||||
|
||||
/**
|
||||
|
|
@ -65,6 +65,9 @@ public class TDeviceExportVo implements Serializable {
|
|||
@ExcelProperty(value = "证件号码", converter = ExcelDictConvert.class)
|
||||
private String cardNum;
|
||||
|
||||
@ExcelProperty(value = "设备名称")
|
||||
private String deviceName;
|
||||
|
||||
/**
|
||||
* 0无效,1有效
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -16,7 +16,7 @@ public class TDeviceImportVo implements Serializable {
|
|||
@Serial
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
@ExcelProperty(value = "设备编号")
|
||||
@ExcelProperty(value = "设备编码")
|
||||
private String deviceCode;
|
||||
|
||||
/**
|
||||
|
|
@ -65,6 +65,9 @@ public class TDeviceImportVo implements Serializable {
|
|||
@ExcelProperty(value = "证件号码")
|
||||
private String cardNum;
|
||||
|
||||
@ExcelProperty(value = "设备名称")
|
||||
private String deviceName;
|
||||
|
||||
/**
|
||||
* 0无效,1有效
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -81,11 +81,11 @@ public class TDeviceServiceImpl implements ITDeviceService {
|
|||
*/
|
||||
@Override
|
||||
public TableDataInfo<TDeviceVo> queryPageList(TDeviceBo bo, PageQuery pageQuery) {
|
||||
bo.setValid(1);
|
||||
// bo.setValid(1);
|
||||
LambdaQueryWrapper<TDevice> lqw = buildQueryWrapper(bo);
|
||||
Page<TDeviceVo> result = baseMapper.selectPageDevicetList(pageQuery.build(), lqw);
|
||||
List<TDeviceVo> list = result.getRecords();
|
||||
for (TDeviceVo vo : list) {
|
||||
/*for (TDeviceVo vo : list) {
|
||||
if ("".equals(vo.getPoliceName()) || null == vo.getPoliceName()){
|
||||
vo.setDeviceName(vo.getCarNum());
|
||||
}
|
||||
|
|
@ -98,7 +98,7 @@ public class TDeviceServiceImpl implements ITDeviceService {
|
|||
|
||||
}
|
||||
|
||||
}
|
||||
}*/
|
||||
return TableDataInfo.build(result);
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -21,7 +21,7 @@
|
|||
</select>
|
||||
|
||||
<select id="selectDeviceExportList" resultMap="DeviceExportResult">
|
||||
select u.device_code, u.device_type, u.zzjgdm, u.zzjgmc, u.police_no, u.police_name, u.phone_num, u.car_num, u.valid,
|
||||
select u.device_code, u.device_type, u.zzjgdm, u.zzjgmc, u.police_no, u.police_name, u.phone_num, u.car_num,u.device_name, u.valid,
|
||||
u.remark1, u.remark2, u.card_num, u.create_time, u.update_time
|
||||
from t_device u
|
||||
${ew.getCustomSqlSegment}
|
||||
|
|
|
|||
Loading…
Reference in New Issue