亳州位置汇聚改动

ds-bozhou
luyya 2025-07-10 18:03:58 +08:00
parent ac0080fe7f
commit 638c54684f
9 changed files with 81 additions and 65 deletions

View File

@ -19,31 +19,31 @@ dubbo:
password: ${spring.cloud.nacos.password}
parameters:
namespace: ${spring.profiles.active}
metadata-report:
address: redis://${spring.data.redis.host}:${spring.data.redis.port}
group: DUBBO_GROUP
username: dubbo
password: Ycgis@2509
# 集群开关
sentinel: false
parameters:
namespace: ${spring.profiles.active}
database: ${spring.data.redis.database}
timeout: ${spring.data.redis.timeout}
backup: 53.176.146.98:26380,53.176.146.99:26380,53.176.146.100:26380
# metadata-report:
# address: redis://${spring.data.redis.host}:${spring.data.redis.port}
# group: DUBBO_GROUP
# username: dubbo
# password: ${spring.data.redis.password}
# password: Ycgis@2509
# # 集群开关
# cluster: false
# sentinel: false
# parameters:
# namespace: ${spring.profiles.active}
# database: ${spring.data.redis.database}
# timeout: ${spring.data.redis.timeout}
# # 集群地址 cluster 为 true 生效
# backup: 127.0.0.1:6379,127.0.0.1:6381
# backup: 53.176.146.98:26380,53.176.146.99:26380,53.176.146.100:26380
metadata-report:
address: redis://${spring.data.redis.host}:${spring.data.redis.port}
group: DUBBO_GROUP
username: dubbo
password: ${spring.data.redis.password}
# 集群开关
cluster: false
parameters:
namespace: ${spring.profiles.active}
database: ${spring.data.redis.database}
timeout: ${spring.data.redis.timeout}
# 集群地址 cluster 为 true 生效
backup: 127.0.0.1:6379,127.0.0.1:6381
# 消费者相关配置
consumer:
# 结果缓存(LRU算法)

View File

@ -68,9 +68,9 @@ public class RedisExpireListener extends KeyExpirationEventMessageListener {
String deviceType = split[2];
String deviceCode = split[3];
if ("5".equals(deviceType) || "9".equals(deviceType) || "8".equals(deviceType) || "7".equals(deviceType)){
/*if ("5".equals(deviceType) || "9".equals(deviceType) || "8".equals(deviceType) || "7".equals(deviceType)){
return;
}
}*/
log.error("redis key expired:key={}",expiredKey);
JSONObject object = RedisUtils.getBucket(RedisConstants.ONLINE_USERS + deviceType+":"+deviceCode);
if (Objects.isNull(object)) {

View File

@ -33,13 +33,13 @@ public class BaseDataSchedule {
@Autowired
DSQinwuService dsQinwuService;
@Value("${ruansi.ruansi-kafka.send-to-third-enabled}")
// @Value("${ruansi.ruansi-kafka.send-to-third-enabled}")
private boolean sendToThirdEnabled;
@Value("${ruansi.ruansi-kafka.start-update-time}")
// @Value("${ruansi.ruansi-kafka.start-update-time}")
private String startUpdateTime;
@Value("${ruansi.ruansi-kafka.ds-preurl}")
// @Value("${ruansi.ruansi-kafka.ds-preurl}")
private String dsPreurl;
/**
*

View File

@ -8,6 +8,7 @@ import org.apache.commons.lang.StringUtils;
import org.apache.dubbo.config.annotation.DubboReference;
import org.dromara.data2es.api.RemoteDataToEsService;
import org.dromara.data2es.api.domain.RemoteGpsInfo;
import org.dromara.extract.WzhjExtractApplication;
import org.dromara.extract.domain.EsGpsInfo;
import org.dromara.extract.exception.MyBusinessException;
import org.dromara.extract.service.ITDeviceGpsService;
@ -16,6 +17,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.context.metrics.buffering.BufferingApplicationStartup;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
@ -42,6 +45,7 @@ public class DeviceGPSController {
@Value("${ruansi.start_update_time}")
private String startUpdateTime;
@Value("${ruansi.jly_update_time}")
private String jlyUpdateTime;
@Value("${ruansi.last_update_time}")
@ -60,8 +64,8 @@ public class DeviceGPSController {
return DateUtil.formatDateTime(info.getGpsTime());
}
@Scheduled(cron = "0/10 * * * * ?")
@Async
// @Scheduled(cron = "0/10 * * * * ?")
// @Async
public void bdgcGps(){
if(StringUtils.isBlank(lastUpdateTime)){
try {
@ -106,8 +110,8 @@ public class DeviceGPSController {
}
@Scheduled(cron = "0/8 * * * * ?")
@Async
@Scheduled(cron = "0/5 * * * * ?")
// @Async
public void jlyGps(){
if(StringUtils.isBlank(jlyUpdateTime)){
jlyUpdateTime = startUpdateTime;
@ -141,7 +145,7 @@ public class DeviceGPSController {
}
@Scheduled(cron = "0/30 * * * * ?")
// @Scheduled(cron = "0/30 * * * * ?")
public void jlyGpsStatus(){
if(StringUtils.isBlank(jlyUpdateTime)){
jlyUpdateTime = startUpdateTime;
@ -193,4 +197,12 @@ public class DeviceGPSController {
}
}
/*public static void main(String[] args) {
String time = "2025-07-10T10:25:40";
EsGpsInfo gpsInfo = new EsGpsInfo();
gpsInfo.setGpsTime(DateUtil.parse(time));
System.out.println(gpsInfo.toString());
}
*/
}

View File

@ -29,13 +29,13 @@ public class JJDeviceSchedule {
@DubboReference
RemoteDeviceService remoteDeviceService;
@Value("${ruansi.start_jlyupdate_time}")
// @Value("${ruansi.start_jlyupdate_time}")
private String startUpdateTime;
private String lastUpdateTime = "";
@Scheduled(cron = "0 0/10 * * * ?")
// @Scheduled(cron = "0 0/10 * * * ?")
public void saveORUpdateJly(){
if(StringUtils.isBlank(lastUpdateTime)){
lastUpdateTime = startUpdateTime;

View File

@ -25,6 +25,7 @@ public class TDeviceGpsServiceImpl implements ITDeviceGpsService {
}
@Override
@DS("zfjly")
public List<EsGpsInfo> selectJlyGPS(EsGpsInfo esGpsInfo) {
return deviceMapper.selectJlyGPS(esGpsInfo);
}

View File

@ -49,11 +49,11 @@
</select>
<select id="selectJlyGPS" parameterType="org.dromara.extract.domain.EsGpsInfo" resultMap="DeviceResult">
select zdbh deviceCode,'5' deviceType, longitude lng, latitude lat,gpstime gpsTime,speed from bas_police_gps
select indexcode deviceCode,'5' deviceType, longitude lng, latitude lat,to_timestamp(updatatime, 'YYYY-MM-DD HH24:MI:SS') gpsTime from public.mobileposition_view
<where>
<if test="gpsTime != null "> and gpstime >= #{gpsTime}</if>
<if test="gpsTime != null "> and to_timestamp(updatatime, 'YYYY-MM-DD HH24:MI:SS') > #{gpsTime} </if>
</where>
order by gpstime desc
order by updatatime desc
</select>
<select id="selectJlyStatus" parameterType="org.dromara.extract.domain.EsGpsInfo" resultMap="DeviceResult">

View File

@ -47,18 +47,25 @@ public class AsyncUtils {
* @throws ParseException
*/
@Async(value = "taskExecutor")
public void saveData(byte[] bytes) throws Exception {
public void saveData(byte[] bytes, String deviceType) {
//logger.info("当前线程名={}",Thread.currentThread().getName());
RemoteGpsInfo esGpsInfo = parseBytes(bytes);
esGpsInfo.setDeviceType("3");
//和redis过期监听时间一定要一致
if (DateUtil.between(new Date(),esGpsInfo.getGpsTime(),DateUnit.MINUTE)> 10){
esGpsInfo.setOnline(0);
}else {
esGpsInfo.setOnline(1);
RemoteGpsInfo esGpsInfo = null;
try {
logger.error("接收类型={}",deviceType);
esGpsInfo = parseBytes(bytes);
esGpsInfo.setDeviceType(deviceType);
//和redis过期监听时间一定要一致
if (DateUtil.between(new Date(),esGpsInfo.getGpsTime(),DateUnit.MINUTE)> 10){
esGpsInfo.setOnline(0);
}else {
esGpsInfo.setOnline(1);
}
logger.error("转换后的实体类={}",esGpsInfo.toString());
R response = dataEsService.saveData(esGpsInfo);
} catch (Exception e) {
e.printStackTrace();
}
logger.error(esGpsInfo.toString());
R response = dataEsService.saveData(esGpsInfo);
//logger.error("位置信息接口={},失败信息={},失败设备={}",response.getCode(),response.getMessage(),esGpsInfo.getDeviceId());
}
@ -67,7 +74,7 @@ public class AsyncUtils {
* AA--10101010 10101010 2^15+2^13+2^11+2^9 + 170=
* @param bytes
*/
private RemoteGpsInfo parseBytes(byte[] bytes) throws ParseException {
private RemoteGpsInfo parseBytes(byte[] bytes) {
checkHeaderByte(bytes);
@ -101,7 +108,6 @@ public class AsyncUtils {
logger.error("jingdu:"+jingdu);
long year = BitConverter.byteArrayToShort(Arrays.copyOfRange(copyByte, 44, 46), true);
logger.error("year:"+year);
//月日时分秒
byte[] sfm = Arrays.copyOfRange(copyByte, 46, 51);
String time = singleByteToString2(sfm);
@ -126,7 +132,7 @@ public class AsyncUtils {
* @param time 20210616123208
*/
private RemoteGpsInfo BuilderEsGpsInfo(String gpsId, double lonD, double latD, long speed,
long angle, long height, long jingdu, String time) throws ParseException {
long angle, long height, long jingdu, String time) {
RemoteGpsInfo gpsInfo = new RemoteGpsInfo();
gpsInfo.setDeviceCode(parseGpsId(gpsId));
gpsInfo.setDeviceType(parseDeviceType(gpsId));
@ -141,11 +147,16 @@ public class AsyncUtils {
return gpsInfo;
}
private Date parseGpsTime(String time) throws ParseException {
private Date parseGpsTime(String time) {
if(StringUtils.isEmpty(time)){
throw new MyBusinessException("时间为空");
logger.error("时间为空");
}
Date date = null;
try {
date = DateUtils.parseDate(time, new String[]{"yyyy-MM-dd HH:mm:ss", "yyyyMMddHHmmss"});
} catch (ParseException e) {
e.printStackTrace();
}
Date date = DateUtils.parseDate(time, new String[]{"yyyy-MM-dd HH:mm:ss", "yyyyMMddHHmmss"});
return date;
}
@ -166,7 +177,7 @@ public class AsyncUtils {
* @return
*/
private String parseDeviceType(String gpsId) {
checkGpsId(gpsId);
// checkGpsId(gpsId);
return gpsId;
}
@ -185,12 +196,13 @@ public class AsyncUtils {
* @return gpsId
*/
private String parseGpsId(String gpsId) {
checkGpsId(gpsId);
// checkGpsId(gpsId); 公车的id长度大于8
return gpsId;
}
private void checkGpsId(String gpsId) {
if(gpsId.length() != 8){
logger.error("gpsId为空");
throw new MyBusinessException("gpsId为空");
}
}

View File

@ -23,10 +23,7 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.net.SocketException;
import java.net.*;
import java.text.ParseException;
import java.util.Arrays;
import java.util.Date;
@ -76,14 +73,16 @@ public class OriginalUdpReceiver implements ApplicationListener<ApplicationReady
public void onApplicationEvent(ApplicationReadyEvent event) {
logger.info("原生方式启动");
try {
// Executors.newSingleThreadExecutor().execute(new UDPProcess("53.238.79.4",Integer.parseInt("10013")));
Executors.newSingleThreadExecutor().execute(new UDPProcess("53.176.146.100",Integer.parseInt("10013")));
// Executors.newSingleThreadExecutor().execute(new UDPProcess("10.129.221.10",Integer.parseInt("10033")));
Executors.newSingleThreadExecutor().execute(new UDPProcess("10.129.221.10",Integer.parseInt("10034")));
} catch (SocketException e) {
e.printStackTrace();
}
}
class UDPProcess implements Runnable {
DatagramSocket socket = null;
@ -101,18 +100,10 @@ public class OriginalUdpReceiver implements ApplicationListener<ApplicationReady
try {
socket.receive(packet);
byte[] data = packet.getData();
String unSendData = new String(buffer,0,data.length);
//String sendData = bytes2hexStr(data);
logger.error("接收到了数据:"+data);
asyncUtils.saveData(data);
/*taskExecutor.execute(() -> {
try {
saveData(data);
} catch (ParseException e) {
e.printStackTrace();
}
});*/
logger.error("接收到了数据"+data);
asyncUtils.saveData(data,"3");
} catch (Exception e) {
e.printStackTrace();
}