宣城新版位置汇聚

ds-bozhou
luyya 2025-03-03 11:00:15 +08:00
parent 569c86fe18
commit 692c2a31bd
30 changed files with 341 additions and 1101 deletions

View File

@ -89,12 +89,12 @@
<id>prod</id>
<properties>
<profiles.active>prod</profiles.active>
<nacos.server>127.0.0.1:8848</nacos.server>
<nacos.server>53.16.17.13:8848</nacos.server>
<nacos.discovery.group>DEFAULT_GROUP</nacos.discovery.group>
<nacos.config.group>DEFAULT_GROUP</nacos.config.group>
<nacos.username>nacos</nacos.username>
<nacos.password>nacos</nacos.password>
<logstash.address>127.0.0.1:4560</logstash.address>
<logstash.address>53.16.17.13:4560</logstash.address>
</properties>
</profile>
</profiles>

View File

@ -22,8 +22,8 @@ public class RemoteGpsInfo implements Serializable {
*
*/
private String deviceType;
private String lat;
private String lng;
private String latitude;
private String longitude;
//方向
private String orientation;
//高程

View File

@ -83,6 +83,10 @@ public class RemoteDeviceBo implements Serializable {
*/
private String remark1;
private String createTime;
private String updateTime;
/**
* 2
*/

View File

@ -573,6 +573,73 @@ public class RedisUtils {
System.out.println("redis:"+list);
}
/**
 * Writes all entries of {@code data} into the Redis hash "myMap" in a single
 * batched round-trip, then applies a TTL to the whole hash.
 *
 * @param data     entries to store (key -> value)
 * @param timeout  time-to-live applied to the hash after the batch completes
 * @param timeUnit unit of {@code timeout}
 */
public static void batchPutWithExpire(Map<String, String> data, long timeout, TimeUnit timeUnit) {
    // Stage all puts on one RBatch so they travel to the server together.
    RBatch batch = CLIENT.createBatch();
    RMapAsync<Object, Object> asyncMap = batch.getMap("myMap");
    data.forEach(asyncMap::putAsync);
    batch.execute();
    // The TTL is applied afterwards through the synchronous map handle.
    RMap<Object, Object> syncMap = CLIENT.getMap("myMap");
    syncMap.expire(timeout, timeUnit);
}
/**
 * Writes all entries of {@code data} into the Redis hash "myMap" using a
 * single batched round-trip. No TTL is applied.
 *
 * @param data entries to store (key -> value)
 */
public static void batchPut(Map<String, String> data) {
    // One RBatch collects every put and sends them in a single request.
    RBatch batch = CLIENT.createBatch();
    RMapAsync<Object, Object> asyncMap = batch.getMap("myMap");
    data.forEach(asyncMap::putAsync);
    batch.execute();
}
/**
 * Reads one field from the Redis hash "myMap" and parses it as JSON.
 *
 * @param key hash field to look up
 * @return the parsed JSON object, or {@code null} when the field is absent
 */
public static JSONObject getData(String key) {
    Object raw = CLIENT.getMap("myMap").get(key);
    return raw == null ? null : JSONUtil.parseObj(raw.toString());
}
/*
*
* */
@ -596,12 +663,41 @@ public class RedisUtils {
return list;
}
/**
 * Scans Redis for keys matching {@code pattern} (e.g. "user:*") and returns
 * the matching entries of the hash "myMap" as a key -> value map.
 *
 * <p>Bug fix: the previous version accumulated parsed values into a local
 * list that was discarded and always returned an empty map; the result map
 * is now actually populated. Keys with no corresponding hash value are
 * skipped.
 *
 * @param pattern key pattern, e.g. "user:*"
 * @return map of matching keys to their raw string values (never {@code null})
 */
public Map<String, String> getMatchingKeysAndValues(String pattern) {
    RKeys rKeys = CLIENT.getKeys();
    // Matching key names from the whole keyspace.
    Iterable<String> matchingKeys = rKeys.getKeysByPattern(pattern);
    RMap<String, String> map = CLIENT.getMap("myMap");
    Map<String, String> result = new java.util.HashMap<>();
    for (String key : matchingKeys) {
        String value = map.get(key);
        if (value != null) {
            result.put(key, value);
        }
    }
    return result;
}
/*
 * Reads the plain string value stored under {@code key} (an RBucket) and
 * parses it as JSON; returns null when the key does not exist.
 * */
public static JSONObject getBucket(String key) {
    Object raw = CLIENT.getBucket(key).get();
    if (raw == null) {
        return null;
    }
    return JSONUtil.parseObj(raw.toString());
}

View File

@ -16,6 +16,9 @@
<module>stwzhj-workflow</module>
<module>stwzhj-data2es</module>
<module>stwzhj-baseToSt</module>
<module>stwzhj-consumer</module>
<module>stwzhj-location</module>
<module>stwzhj-dataToGas</module>
</modules>
<artifactId>stwzhj-modules</artifactId>

View File

@ -1,20 +1,9 @@
package org.dromara.kafka.consumer;
import com.ruansee.redis.JedisConfig;
import com.ruansee.redis.RedisConfig;
import com.ruansee.redis.RedisUtil;
import com.ruansee.redis.RedissionLockUtil;
import org.dromara.kafka.consumer.config.KafkaPropertiesConfig;
import org.redisson.spring.starter.RedissonAutoConfiguration;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration;
import org.springframework.boot.autoconfigure.data.redis.RedisReactiveAutoConfiguration;
import org.springframework.boot.autoconfigure.data.redis.RedisRepositoriesAutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.web.servlet.ServletComponentScan;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.FilterType;
import org.springframework.scheduling.annotation.EnableAsync;
/**
@ -25,7 +14,6 @@ import org.springframework.scheduling.annotation.EnableAsync;
*/
@SpringBootApplication
@EnableAsync
@EnableConfigurationProperties({KafkaPropertiesConfig.class})
@ServletComponentScan
public class KafkaConsumerApplication {
public static void main(String[] args){

View File

@ -1,136 +0,0 @@
package org.dromara.kafka.consumer.config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;
/**
 * Lazily-initialized singleton that loads Kafka configuration from the
 * optional files producer/consumer/server/client.properties under
 * src/main/resources and resolves keys across them in a fixed precedence
 * order (server, producer, consumer, client).
 */
public final class KafkaProperties
{
    private static final Logger LOG = LoggerFactory.getLogger(KafkaProperties.class);

    // Topic名称安全模式下需要以管理员用户添加当前用户的访问权限
    public final static String TOPIC = "t_gps_realtime";

    private static Properties serverProps = new Properties();
    private static Properties producerProps = new Properties();
    private static Properties consumerProps = new Properties();
    private static Properties clientProps = new Properties();

    private static KafkaProperties instance = null;

    private KafkaProperties()
    {
        String filePath = System.getProperty("user.dir") + File.separator + "src" + File.separator
            + "main" + File.separator + "resources" + File.separator;
        // Each file is optional; a missing file simply leaves its Properties empty.
        loadIfPresent(producerProps, filePath + "producer.properties");
        // Bug fix: the existence check previously tested "producer.properties"
        // before loading "consumer.properties" (copy-paste error).
        loadIfPresent(consumerProps, filePath + "consumer.properties");
        loadIfPresent(serverProps, filePath + "server.properties");
        loadIfPresent(clientProps, filePath + "client.properties");
    }

    /**
     * Loads {@code path} into {@code target} when the file exists.
     * The input stream is always closed (the old code leaked it).
     */
    private static void loadIfPresent(Properties target, String path)
    {
        File file = new File(path);
        if (!file.exists())
        {
            return;
        }
        try (FileInputStream in = new FileInputStream(file))
        {
            target.load(in);
        }
        catch (IOException e)
        {
            LOG.info("The Exception occured.", e);
        }
    }

    public synchronized static KafkaProperties getInstance()
    {
        if (null == instance)
        {
            instance = new KafkaProperties();
        }
        return instance;
    }

    /**
     * Resolves {@code key} across all loaded properties files, falling back
     * to {@code defValue} when it is defined nowhere.
     *
     * @param key      properties key to resolve; a null key logs an error
     * @param defValue value returned when the key cannot be resolved
     * @return the resolved value, or {@code defValue}
     */
    public String getValues(String key, String defValue)
    {
        String rtValue = null;
        if (null == key)
        {
            LOG.error("key is null");
        }
        else
        {
            rtValue = getPropertiesValue(key);
        }
        if (null == rtValue)
        {
            LOG.warn("KafkaProperties.getValues return null, key is {}", key);
            rtValue = defValue;
        }
        LOG.info("KafkaProperties.getValues: key is {}; Value is {}", key, rtValue);
        return rtValue;
    }

    /**
     * Looks {@code key} up in precedence order: server.properties first,
     * then producer, consumer, and finally client.properties.
     */
    private String getPropertiesValue(String key)
    {
        String rtValue = serverProps.getProperty(key);
        // server.properties中没有则再向producer.properties中获取
        if (null == rtValue)
        {
            rtValue = producerProps.getProperty(key);
        }
        // producer中没有则再向consumer.properties中获取
        if (null == rtValue)
        {
            rtValue = consumerProps.getProperty(key);
        }
        // consumer没有则再向client.properties中获取
        if (null == rtValue)
        {
            rtValue = clientProps.getProperty(key);
        }
        return rtValue;
    }
}

View File

@ -1,35 +0,0 @@
package org.dromara.kafka.consumer.config;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Profile;
/**
 * Binds custom Kafka settings from configuration entries prefixed with
 * "mykafka"; only active under the "dev" profile.
 *
 * @author chenle
 * @date 2021-09-06 15:13
 */
@ConfigurationProperties(prefix = "mykafka")
@Profile(value = "dev")
public class KafkaPropertiesConfig {

    // Kafka bootstrap server address.
    private String serverUrl;

    // Nested consumer settings (client id / group id).
    private MyConsumerProperties consumerProperties = new MyConsumerProperties();

    public String getServerUrl() {
        return serverUrl;
    }

    public void setServerUrl(String serverUrl) {
        this.serverUrl = serverUrl;
    }

    public MyConsumerProperties getConsumerProperties() {
        return consumerProperties;
    }

    public void setConsumerProperties(MyConsumerProperties consumerProperties) {
        this.consumerProperties = consumerProperties;
    }
}

View File

@ -1,28 +0,0 @@
package org.dromara.kafka.consumer.config;
/**
 * Plain holder for consumer-level Kafka settings: the client id (no
 * default) and the consumer group id (defaults to "222").
 *
 * @author chenle
 * @date 2021-09-07 14:54
 */
public class MyConsumerProperties {

    // Kafka client id; null until set.
    private String clientId;

    // Consumer group id; defaults to "222".
    private String groupId = "222";

    public String getClientId() {
        return clientId;
    }

    public void setClientId(String clientId) {
        this.clientId = clientId;
    }

    public String getGroupId() {
        return groupId;
    }

    public void setGroupId(String groupId) {
        this.groupId = groupId;
    }
}

View File

@ -1,159 +0,0 @@
package org.dromara.kafka.consumer.config;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.dromara.kafka.consumer.handler.KafkaSecurityUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collections;
import java.util.Properties;
/**
 * Kafka consumer running on its own thread: subscribes to one topic and
 * logs every record received until {@link #shutdown()} is called.
 *
 * <p>Fixes over the previous version: {@code start()} no longer runs the
 * poll synchronously on the caller's thread (the work now lives in
 * {@code run()}, so {@code Thread.start()} actually spawns a consumer
 * thread); {@code shutdown()} signals the consumer thread via a flag
 * instead of interrupting whichever thread happened to call it; and the
 * KafkaConsumer is closed on its own thread, as its single-thread access
 * contract requires.
 */
public class NewConsumer extends Thread {

    private static final Logger LOG = LoggerFactory.getLogger(NewConsumer.class);

    private final KafkaConsumer<Integer, String> consumer;

    private final String topic;

    // 一次请求的最大等待时间 (ms)
    private final int waitTime = 10000;

    // Broker连接地址
    private final String bootstrapServers = "bootstrap.servers";

    // Group id
    private final String groupId = "group.id";

    // 消息内容使用的反序列化类
    private final String valueDeserializer = "value.deserializer";

    // 消息Key值使用的反序列化类
    private final String keyDeserializer = "key.deserializer";

    // 协议类型:当前支持配置为SASL_PLAINTEXT或者PLAINTEXT
    private final String securityProtocol = "security.protocol";

    // 服务名
    private final String saslKerberosServiceName = "sasl.kerberos.service.name";

    // 域名
    private final String kerberosDomainName = "kerberos.domain.name";

    // 是否自动提交offset
    private final String enableAutoCommit = "enable.auto.commit";

    // 自动提交offset的时间间隔
    private final String autoCommitIntervalMs = "auto.commit.interval.ms";

    // 会话超时时间
    private final String sessionTimeoutMs = "session.timeout.ms";

    // Set by shutdown(); the poll loop exits within one poll interval.
    private volatile boolean stopped = false;

    /**
     * Builds the consumer from {@link KafkaProperties} with local defaults.
     *
     * @param topic topic to subscribe to
     */
    public NewConsumer(String topic) {
        Properties props = new Properties();
        KafkaProperties kafkaProc = KafkaProperties.getInstance();
        props.put(bootstrapServers,
            kafkaProc.getValues(bootstrapServers, "localhost:21007"));
        props.put(groupId, "DemoConsumer");
        props.put(enableAutoCommit, "true");
        props.put(autoCommitIntervalMs, "1000");
        props.put(sessionTimeoutMs, "30000");
        props.put(keyDeserializer,
            "org.apache.kafka.common.serialization.IntegerDeserializer");
        props.put(valueDeserializer,
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(securityProtocol, kafkaProc.getValues(securityProtocol, "SASL_PLAINTEXT"));
        props.put(saslKerberosServiceName, "kafka");
        props.put(kerberosDomainName, kafkaProc.getValues(kerberosDomainName, "hadoop.hadoop.com"));
        consumer = new KafkaConsumer<Integer, String>(props);
        this.topic = topic;
    }

    /**
     * Subscribes to the topic (idempotent) and performs a single poll,
     * logging each received record.
     */
    public void doWork()
    {
        consumer.subscribe(Collections.singletonList(this.topic));
        ConsumerRecords<Integer, String> records = consumer.poll(waitTime);
        for (ConsumerRecord<Integer, String> record : records)
        {
            LOG.info("[NewConsumerExample], Received message: (" + record.key() + ", " + record.value()
                + ") at offset " + record.offset());
        }
    }

    /** Poll loop; runs until shutdown() is requested, then closes the consumer. */
    @Override
    public void run() {
        try {
            while (!stopped) {
                doWork();
            }
        } finally {
            // Close on this thread: KafkaConsumer is not safe for multi-threaded access.
            consumer.close();
        }
    }

    /** Asks the poll loop to stop; it exits within one poll interval (waitTime). */
    public void shutdown() {
        stopped = true;
    }

    public static void main(String[] args)
    {
        if (KafkaSecurityUtil.isSecurityModel())
        {
            try
            {
                LOG.info("Securitymode start.");
                // NOTE: in security mode this must be the machine account you applied for.
                KafkaSecurityUtil.securityPrepare();
            }
            catch (IOException e)
            {
                LOG.error("Security prepare failure.");
                LOG.error("The IOException occured : {}.", e);
                return;
            }
            LOG.info("Security prepare success.");
        }
        NewConsumer consumerThread = new NewConsumer(KafkaProperties.TOPIC);
        consumerThread.start();
        // Let the consumer run for roughly 60 seconds, then stop it.
        try
        {
            Thread.sleep(60000);
        }
        catch (InterruptedException e)
        {
            LOG.info("The InterruptedException occured : {}.", e);
            Thread.currentThread().interrupt();
        }
        finally
        {
            consumerThread.shutdown();
            try
            {
                // Wait for the poll loop to drain and close the consumer.
                consumerThread.join();
            }
            catch (InterruptedException e)
            {
                Thread.currentThread().interrupt();
            }
        }
    }
}

View File

@ -25,8 +25,8 @@ public class EsGpsInfo implements Serializable {
*
*/
private String deviceType;
private String lat;
private String lng;
private String latitude;
private String longitude;
//方向
private String orientation;
//高程

View File

@ -4,11 +4,13 @@ import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.bean.copier.CopyOptions;
import cn.hutool.core.convert.ConvertException;
import cn.hutool.core.date.DateTime;
import cn.hutool.core.date.DateUnit;
import cn.hutool.core.date.DateUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.alibaba.fastjson.JSON;
import com.ruansee.response.ApiResponse;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.apache.dubbo.config.annotation.DubboReference;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@ -17,9 +19,13 @@ import org.dromara.data2es.api.RemoteDataToEsService;
import org.dromara.data2es.api.domain.RemoteGpsInfo;
import org.dromara.kafka.consumer.entity.EsGpsInfo;
import org.dromara.kafka.consumer.entity.EsGpsInfoVO;
import org.dromara.system.api.domain.bo.RemoteDeviceBo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
import java.text.DateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
@ -32,88 +38,36 @@ import java.util.concurrent.LinkedBlockingDeque;
* @author chenle
* @date 2021-09-06 16:44
*/
public class ConsumerWorker implements Runnable {
private ConsumerRecord<String, Object> record;
@Slf4j
@Configuration
public class ConsumerWorker {
private Logger logger = LoggerFactory.getLogger(ConsumerWorker.class);
public static LinkedBlockingDeque linkedBlockingDeque = new LinkedBlockingDeque<>(5000);
private String cityCode ;
public static LinkedBlockingDeque basedataDeque = new LinkedBlockingDeque<>(5000);
ConsumerWorker(ConsumerRecord<String, Object> record, String cityCode) {
this.record = record;
this.cityCode = cityCode;
}
@Override
public void run() {
    // City codes longer than four digits mark feeds produced by other
    // vendors' cities: truncate to the four-digit code and use the
    // generic handling path.
    if (cityCode.length() > 4) {
        cityCode = cityCode.substring(0, 4);
        normalRequest();
        return;
    }
    // Cities developed in-house (Lu'an, Anqing, ...) use the dedicated path.
    luanrequest();
    // luanrequestBatch();
}
/*
 * Batch variant of the Lu'an-style handler: parses the record value as a
 * JSON array of GPS points, validates each one, and collects the valid
 * points into a list (the downstream save call is currently disabled).
 *
 * Bug fix: invalid records previously triggered an early `return` that
 * threw away the whole batch; each invalid record is now skipped with
 * `continue` and the rest of the batch is still processed.
 * */
private void luanrequestBatch() {
    Object value = record.value();
    String topic = record.topic();
    List<EsGpsInfo> list = new ArrayList<>();
    logger.info("offset={},topic={},value={}", record.offset(), topic, value);
    List<JSONObject> jsonObjects = JSON.parseArray((String) value, JSONObject.class);
    for (JSONObject jsonObject : jsonObjects) {
        EsGpsInfo esGpsInfo;
        try {
            esGpsInfo = JSONUtil.toBean(jsonObject, EsGpsInfo.class);
        } catch (ConvertException e) {
            logger.info("EsGpsInfo=null:error={}", e.getMessage());
            continue;
        }
        if (Objects.isNull(esGpsInfo)) {
            logger.info("esGpsInfo=null no error");
            continue;
        }
        String deviceCode = esGpsInfo.getDeviceCode();
        if (StringUtils.isEmpty(deviceCode) || deviceCode.length() > 100) {
            logger.info("deviceCode:{} is null or is too long ", deviceCode);
            continue;
        }
        String latitude = esGpsInfo.getLat();
        if (StringUtils.isEmpty(latitude) || "0.0".equals(latitude)) {
            logger.info("latitude:{} is null or is zero ", latitude);
            continue;
        }
        String longitude = esGpsInfo.getLng();
        if (StringUtils.isEmpty(longitude) || "0.0".equals(longitude)) {
            logger.info("longitude:{} is null or is zero ", longitude);
            continue;
        }
        esGpsInfo.setInfoSource(cityCode);
        esGpsInfo.setGpsTime(new Date(Long.valueOf(jsonObject.getStr("gpsTime"))));
        list.add(esGpsInfo);
    }
    // dataToEsService.saveGpsInfoBatch(list);
}
private void luanrequest() {
@KafkaListener(topics = "#{'${spring.kafka.consumer.topics}'.split(',')}",properties = {
"auto.offset.reset:latest"})
public void consumer(ConsumerRecord<String,Object> record) {
Object value = record.value();
String topic = record.topic();
logger.info("offset={},topic={},value={}", record.offset(), topic,value);
if ("jysb_dwxx".equals(topic)){ //定位信息
// logger.info("offset={},topic={},value={}", record.offset(), topic,value);
luanrequest(value);
} else if ("jysb_sbxx".equals(topic)) { //基础信息
// logger.info("offset={},topic={},value={}", record.offset(), topic,value);
baseDataRequest(value);
}
}
private void luanrequest(Object value) {
RemoteGpsInfo esGpsInfo;
JSONObject jsonObject;
try {
@ -138,17 +92,16 @@ public class ConsumerWorker implements Runnable {
logger.info("deviceCode:{} is null or is too long ",deviceCode);
return;
}
String latitude = esGpsInfo.getLat();
String latitude = esGpsInfo.getLatitude();
if(StringUtils.isEmpty(latitude) || "0.0".equals(latitude)){
logger.info("latitude:{} is null or is zero ",latitude);
return;
}
String longitude = esGpsInfo.getLng();
String longitude = esGpsInfo.getLongitude();
if(StringUtils.isEmpty(longitude) || "0.0".equals(longitude)){
logger.info("longitude:{} is null or is zero ",longitude);
return;
}
esGpsInfo.setInfoSource(cityCode);
try {
esGpsInfo.setGpsTime(new Date(Long.valueOf(jsonObject.getStr("gpsTime"))));
}catch (Exception e){
@ -162,73 +115,73 @@ public class ConsumerWorker implements Runnable {
}
logger.info("code={},msg={}",response.getCode(),response.getMsg());
if(200 == response.getCode()){
logger.info("topic={},data2es={},gpsTime={}",topic,"success",esGpsInfo.getGpsTime());
logger.info("topic=jysb_dwxx,data2es={},gpsTime={}","success",esGpsInfo.getGpsTime());
}else{
logger.info("topic={},data2es={}",topic,response.getMsg());
logger.info("topic=jysb_dwxx,data2es={}",response.getMsg());
}
}
/**
*
*/
private void normalRequest() {
Object value = record.value();
String topic = record.topic();
logger.info("offset={},topic={},value={}", record.offset(), topic,value);
RemoteGpsInfo esGpsInfo = new RemoteGpsInfo();
EsGpsInfoVO esGpsInfoVO;
/*
*
* */
private void baseDataRequest(Object value){
RemoteDeviceBo deviceBo;
JSONObject jsonObject;
try {
esGpsInfoVO = JSONUtil.toBean(((String) value), EsGpsInfoVO.class);
jsonObject = JSONUtil.parseObj(((String) value));
}catch (ConvertException e){
logger.info("esGpsInfoVO=null:error={}",e.getMessage());
logger.info("jsonObject=null:error={}",e.getMessage());
return;
}
if(Objects.isNull(esGpsInfoVO)){
logger.info("esGpsInfoVO=null no error");
return;
}
try {
DateTime parse = DateUtil.parse(esGpsInfoVO.getGpsTime(), "yyyy-MM-dd HH:mm:ss");
}catch (Exception e){
logger.info("gpsTime:{} format error", esGpsInfoVO.getGpsTime());
deviceBo = JSONUtil.toBean(jsonObject, RemoteDeviceBo.class);
}catch (ConvertException e){
logger.info("Device=null:error={}",e.getMessage());
return;
}
String deviceCode = esGpsInfoVO.getDeviceCode();
if(StringUtils.isEmpty(deviceCode) || deviceCode.length() > 100){
logger.info("deviceCode:{} is null or is too long ",deviceCode);
if(Objects.isNull(deviceBo)){
logger.info("deviceBo=null no error");
return;
}
String latitude = esGpsInfoVO.getLatitude();
if(StringUtils.isEmpty(latitude) || "0.0".equals(latitude)){
logger.info("latitude:{} is null or is zero ",latitude);
if (StringUtils.isEmpty(deviceBo.getDeviceCode())){
logger.info("deviceCode is null");
return;
}
String longitude = esGpsInfoVO.getLongitude();
if(StringUtils.isEmpty(longitude) || "0.0".equals(longitude)){
logger.info("longitude:{} is null or is zero ",longitude);
if (StringUtils.isEmpty(deviceBo.getInfoSource())){
logger.info("infoSource is null");
return;
}
BeanUtil.copyProperties(esGpsInfoVO,esGpsInfo,new CopyOptions());
esGpsInfo.setLat(latitude);
esGpsInfo.setLng(esGpsInfoVO.getLongitude());
esGpsInfo.setOrientation(esGpsInfoVO.getDirection());
esGpsInfo.setInfoSource(cityCode);
boolean offer = linkedBlockingDeque.offer(esGpsInfo);
if (!StringUtils.isEmpty(deviceBo.getCreateTime())){
try {
Date createTime = new Date(Long.valueOf(jsonObject.getStr("createTime")));
deviceBo.setCreateTime(DateUtil.format(createTime, "yyyy-MM-dd HH:mm:ss"));
}catch (Exception e){
logger.error("error_msg={}",e.getMessage());
}
}
if (!StringUtils.isEmpty(deviceBo.getUpdateTime())){
try {
Date updateTime = new Date(Long.valueOf(jsonObject.getStr("updateTime")));
deviceBo.setUpdateTime(DateUtil.format(updateTime, "yyyy-MM-dd HH:mm:ss"));
}catch (Exception e){
logger.error("error_msg={}",e.getMessage());
}
}
logger.info("deviceBo={}",deviceBo);
boolean offer = basedataDeque.offer(deviceBo);
R response = R.ok(offer);
if(Objects.isNull(response)){
logger.info("response == null");
}
logger.info("code={},msg={}",response.getCode(),response.getMsg());
if(200 == response.getCode()){
logger.info("topic={},data2es={}",topic,"success");
logger.info("topic=jysb_sbxx,data2es={},deviceCode={}","success",deviceBo.getDeviceCode());
}else{
logger.error("topic={},data2es={}",topic,"fail");
logger.info("topic=jysb_sbxx,data2es={}",response.getMsg());
}
}
}

View File

@ -7,6 +7,8 @@ import org.apache.dubbo.config.annotation.DubboReference;
import org.dromara.data2es.api.RemoteDataToEsService;
import org.dromara.data2es.api.domain.RemoteGpsInfo;
import org.dromara.kafka.consumer.entity.EsGpsInfo;
import org.dromara.system.api.RemoteDeviceService;
import org.dromara.system.api.domain.bo.RemoteDeviceBo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.annotation.Configuration;
@ -31,21 +33,31 @@ public class DataInsertBatchHandler implements CommandLineRunner {
@DubboReference
private RemoteDataToEsService gpsService;
@DubboReference
private RemoteDeviceService deviceService;
@Override
public void run(String... args) throws Exception {
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
LinkedBlockingDeque linkedBlockingDeque = ConsumerWorker.linkedBlockingDeque;
LinkedBlockingDeque linkedBlockingDeque = ConsumerWorker.linkedBlockingDeque; //定位信息队列
LinkedBlockingDeque baseDataDeque = ConsumerWorker.basedataDeque; //基础信息队列
singleThreadExecutor.execute(new Runnable() {
@Override
public void run() {
while (true) {
try {
List<RemoteGpsInfo> list = new ArrayList<>();
List<RemoteDeviceBo> bases = new ArrayList<>();
Queues.drain(linkedBlockingDeque, list, 200, 5, TimeUnit.SECONDS);
Queues.drain(baseDataDeque, bases, 100, 5, TimeUnit.SECONDS);
log.info("batch size={}", list.size());
log.info("basedata size={}", bases.size());
if(CollectionUtil.isNotEmpty(list)) {
gpsService.saveDataBatch(list);
}
if(CollectionUtil.isNotEmpty(bases)) {
deviceService.batchSaveDevice(bases);
}
} catch (Exception e) {
log.error("缓存队列批量消费异常:{}", e.getMessage());
}

View File

@ -1,98 +0,0 @@
package org.dromara.kafka.consumer.handler;
import org.apache.dubbo.config.annotation.DubboReference;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.dromara.data2es.api.RemoteDataToEsService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.MessageListener;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadPoolExecutor;
/**
 * Runnable that owns one KafkaConsumer: subscribes to the topics listed
 * under the "topics" entry of the supplied config map, seeks every
 * subscribed partition to its end (skipping the backlog), then polls
 * forever, handing each record to the worker pool as a ConsumerWorker.
 *
 * <p>Cleanup: removed the private Spring listener-factory helpers
 * (buildConsumerFactory / containerProperties / buildListenerFactory)
 * that were never referenced.
 *
 * @author chenle
 * @date 2021-09-06 16:39
 */
public class KafkaConsumerRunnable implements Runnable {

    private Logger logger = LoggerFactory.getLogger(KafkaConsumerRunnable.class);

    /** Consumer configuration; must also carry a "topics" entry (a List of topic names). */
    private Map props;
    /** Pool that executes one ConsumerWorker per received record. */
    private ThreadPoolExecutor taskExecutor;
    /** City code forwarded to every worker. */
    private String cityCode;

    public KafkaConsumerRunnable(Map props, ThreadPoolExecutor taskExecutor,
                                 String cityCode) {
        this.props = props;
        this.taskExecutor = taskExecutor;
        this.cityCode = cityCode;
    }

    @Override
    public void run() {
        KafkaConsumer<String, Object> consumer = new KafkaConsumer<>(props);
        List topics = (List) props.get("topics");
        consumer.subscribe(topics);
        // poll(0) makes the subscription/assignment take effect before seeking.
        consumer.poll(0);
        List<TopicPartition> topicPartitions = new ArrayList<>();
        Map<String, List<PartitionInfo>> topicToPartitions = consumer.listTopics();
        for (Object topic : topics) {
            String topicName = (String) topic;
            // NOTE(review): assumes every configured topic exists in the cluster;
            // a missing topic would yield null here — confirm upstream guarantees.
            List<PartitionInfo> partitionInfos = topicToPartitions.get(topicName);
            for (PartitionInfo partitionInfo : partitionInfos) {
                topicPartitions.add(new TopicPartition(topicName, partitionInfo.partition()));
            }
        }
        // 如果传Collections.emptyList()表示移动所有订阅topic分区offset到最末端
        consumer.seekToEnd(topicPartitions);
        while (true) {
            ConsumerRecords<String, Object> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, Object> record : records) {
                taskExecutor.submit(new ConsumerWorker(record, cityCode));
            }
        }
    }
}

View File

@ -1,108 +0,0 @@
package org.dromara.kafka.consumer.handler;
import cn.hutool.core.date.DateTime;
import cn.hutool.core.date.DateUtil;
import org.dromara.kafka.consumer.entity.EsGpsInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
/**
 * Helpers for enabling Kerberos security on the Kafka client: detects the
 * security mode from the "kafkaSecurityMode" classpath resource and wires
 * up the krb5/JAAS system properties via LoginUtil.
 *
 * @author chenle
 * @date 2021-10-28 14:48
 */
public class KafkaSecurityUtil {
    static Logger logger = LoggerFactory.getLogger(KafkaSecurityUtil.class);

    /** Ad-hoc manual check of date parsing into EsGpsInfo; not used in production. */
    public static void main(String[] args) {
        EsGpsInfo esGpsInfo = new EsGpsInfo();
        String realtime = "2021/11/04 12:00:11";
        DateTime dateTime = DateUtil.parse(realtime);
        esGpsInfo.setGpsTime(dateTime.toJdkDate());
        logger.info("esGpsInfo:{},deviceType={},gpsTime={}", esGpsInfo.toString(),
            esGpsInfo.getDeviceType(), dateTime.toJdkDate().toString());
    }

    /** keytab file name */
    private static final String USER_KEYTAB_FILE = "user.keytab";

    /** machine-account principal used for Kerberos login */
    private static final String USER_PRINCIPAL = "aqdsj_ruansi@HADOOP.COM";

    /**
     * Wires up the Kerberos configuration: krb5.conf location, Zookeeper
     * server principal and the generated JAAS file. Paths are fixed under
     * /gpsstore on the deployment host.
     *
     * @throws IOException when the JAAS file cannot be (re)written
     */
    public static void securityPrepare() throws IOException
    {
        logger.error("进入了---securityPrepare");
        String krbFile = "/gpsstore/krb5.conf";
        String userKeyTableFile = "/gpsstore/user.keytab";
        // windows路径下分隔符替换
        userKeyTableFile = userKeyTableFile.replace("\\", "\\\\");
        krbFile = krbFile.replace("\\", "\\\\");
        LoginUtil.setKrb5Config(krbFile);
        LoginUtil.setZookeeperServerPrincipal("zookeeper/hadoop.hadoop.com");
        logger.error("userKeyTableFile路径---{}", userKeyTableFile);
        LoginUtil.setJaasFile(USER_PRINCIPAL, userKeyTableFile);
    }

    /**
     * Reads the "kafkaSecurityMode" classpath resource and reports whether
     * kafka.client.security.mode is set to "yes".
     *
     * <p>Fixes: the resource stream is now closed (try-with-resources), and a
     * missing resource no longer triggers a NullPointerException inside
     * Properties.load — it is treated as "security disabled".
     *
     * @return true when security mode is enabled; false otherwise
     */
    public static Boolean isSecurityModel()
    {
        Properties securityProps = new Properties();
        try (InputStream inputStream = Thread.currentThread().getContextClassLoader()
                .getResourceAsStream("kafkaSecurityMode"))
        {
            if (inputStream == null)
            {
                // Resource absent: treat as a non-secure deployment.
                return false;
            }
            securityProps.load(inputStream);
            return "yes".equalsIgnoreCase(securityProps.getProperty("kafka.client.security.mode"));
        }
        catch (Exception e)
        {
            logger.info("The Exception occured : {}.", e);
            return false;
        }
    }
}

View File

@ -1,215 +0,0 @@
package org.dromara.kafka.consumer.handler;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
/**
* <p>description: </p>
*
* @author chenle
* @date 2021-10-28 15:40
*/
public class LoginUtil
{
/** JAAS section names for each component we may log into. */
public enum Module
{
    STORM("StormClient"), KAFKA("KafkaClient"), ZOOKEEPER("Client");

    // Section name as it appears in the generated jaas.conf.
    private String name;

    private Module(String name)
    {
        this.name = name;
    }

    public String getName()
    {
        return name;
    }
}
/**
* line operator string
*/
private static final String LINE_SEPARATOR = System.getProperty("line.separator");
/**
* jaas file postfix
*/
private static final String JAAS_POSTFIX = ".jaas.conf";
/**
* is IBM jdk or not
*/
private static final boolean IS_IBM_JDK = System.getProperty("java.vendor").contains("IBM");
/**
* IBM jdk login module
*/
private static final String IBM_LOGIN_MODULE = "com.ibm.security.auth.module.Krb5LoginModule required";
/**
* oracle jdk login module
*/
private static final String SUN_LOGIN_MODULE = "com.sun.security.auth.module.Krb5LoginModule required";
/**
* Zookeeper quorum principal.
*/
public static final String ZOOKEEPER_AUTH_PRINCIPAL = "zookeeper.server.principal";
/**
* java security krb5 file path
*/
public static final String JAVA_SECURITY_KRB5_CONF = "java.security.krb5.conf";
/**
* java security login file path
*/
public static final String JAVA_SECURITY_LOGIN_CONF = "java.security.auth.login.config";
/**
 * Generates a fresh JAAS configuration file in the system temp directory
 * for the given principal/keytab and points
 * java.security.auth.login.config at it.
 *
 * @param principal  Kerberos principal to log in as
 * @param keytabPath path to the keytab file
 * @throws IOException when the stale file cannot be deleted or the new one written
 */
public static void setJaasFile(String principal, String keytabPath)
    throws IOException
{
    String jaasPath = new File(System.getProperty("java.io.tmpdir")) + File.separator
        + System.getProperty("user.name") + JAAS_POSTFIX;
    // windows路径下分隔符替换
    jaasPath = jaasPath.replace("\\", "\\\\");
    // Replace any stale JAAS file left over from a previous run.
    deleteJaasFile(jaasPath);
    writeJaasFile(jaasPath, principal, keytabPath);
    System.setProperty(JAVA_SECURITY_LOGIN_CONF, jaasPath);
}
/**
 * Publishes the Zookeeper server principal through the
 * zookeeper.server.principal system property and verifies the write
 * took effect.
 *
 * @param zkServerPrincipal principal of the Zookeeper quorum
 * @throws IOException when the property could not be set as requested
 */
public static void setZookeeperServerPrincipal(String zkServerPrincipal)
    throws IOException
{
    System.setProperty(ZOOKEEPER_AUTH_PRINCIPAL, zkServerPrincipal);
    String actual = System.getProperty(ZOOKEEPER_AUTH_PRINCIPAL);
    if (actual == null)
    {
        throw new IOException(ZOOKEEPER_AUTH_PRINCIPAL + " is null.");
    }
    if (!actual.equals(zkServerPrincipal))
    {
        throw new IOException(ZOOKEEPER_AUTH_PRINCIPAL + " is " + actual + " is not " + zkServerPrincipal + ".");
    }
}
/**
* krb5
*
* @param krb5ConfFile
* @throws IOException
*/
/**
 * Points the JVM at the given krb5 configuration file via the
 * {@code java.security.krb5.conf} system property and verifies the value
 * actually took effect.
 *
 * @param krb5ConfFile path of the krb5 configuration file
 * @throws IOException if the property is absent or differs after being set
 */
public static void setKrb5Config(String krb5ConfFile)
    throws IOException
{
    System.setProperty(JAVA_SECURITY_KRB5_CONF, krb5ConfFile);
    String actual = System.getProperty(JAVA_SECURITY_KRB5_CONF);
    if (actual == null)
    {
        throw new IOException(JAVA_SECURITY_KRB5_CONF + " is null.");
    }
    if (!actual.equals(krb5ConfFile))
    {
        throw new IOException(JAVA_SECURITY_KRB5_CONF + " is " + actual + " is not " + krb5ConfFile + ".");
    }
}
/**
* jaas
*
* @throws IOException
*
*/
/**
 * Writes the generated JAAS configuration content for the given
 * principal/keytab to {@code jaasPath}, overwriting any existing file.
 *
 * @param jaasPath   destination path of the JAAS file
 * @param principal  Kerberos principal written into the login sections
 * @param keytabPath keytab path written into the login sections
 * @throws IOException if the file cannot be created or written
 */
private static void writeJaasFile(String jaasPath, String principal, String keytabPath)
    throws IOException
{
    // try-with-resources guarantees the writer is closed on every path,
    // replacing the previous manual finally block.
    try (FileWriter writer = new FileWriter(new File(jaasPath)))
    {
        writer.write(getJaasConfContext(principal, keytabPath));
        writer.flush();
    }
    catch (IOException e)
    {
        // Preserve the underlying cause instead of discarding it, so the
        // real failure reason is visible in stack traces.
        throw new IOException("Failed to create jaas.conf File", e);
    }
}
/**
 * Removes a previously generated JAAS file when one exists.
 *
 * @param jaasPath path of the JAAS file to remove
 * @throws IOException if an existing file cannot be deleted
 */
private static void deleteJaasFile(String jaasPath)
    throws IOException
{
    File jaasFile = new File(jaasPath);
    // Nothing to do when no previous file exists; a failed delete is fatal
    // because a stale file would be picked up by the JAAS machinery.
    if (jaasFile.exists() && !jaasFile.delete())
    {
        throw new IOException("Failed to delete exists jaas file.");
    }
}
/**
 * Builds the full JAAS file content: one login section per {@link Module} value.
 *
 * @param principal  Kerberos principal for each section
 * @param keytabPath keytab path for each section
 * @return concatenated JAAS login sections
 */
private static String getJaasConfContext(String principal, String keytabPath)
{
    StringBuilder content = new StringBuilder();
    for (Module module : Module.values())
    {
        content.append(getModuleContext(principal, keytabPath, module));
    }
    return content.toString();
}
/**
 * Renders one JAAS login section for the given module, choosing the
 * IBM or Sun Krb5 login module variant based on the running JDK vendor.
 *
 * @param userPrincipal Kerberos principal written into the section
 * @param keyTabPath    keytab path written into the section
 * @param module        module whose name heads the section
 * @return the section text, terminated by "};" and a line separator
 */
private static String getModuleContext(String userPrincipal, String keyTabPath, Module module)
{
    StringBuilder section = new StringBuilder();
    // Common section header: "<name> {".
    section.append(module.getName()).append(" {").append(LINE_SEPARATOR);
    if (IS_IBM_JDK)
    {
        // IBM JDK uses its own login module and option names (useKeytab, credsType).
        section.append(IBM_LOGIN_MODULE).append(LINE_SEPARATOR)
               .append("credsType=both").append(LINE_SEPARATOR)
               .append("principal=\"").append(userPrincipal).append("\"").append(LINE_SEPARATOR)
               .append("useKeytab=\"").append(keyTabPath).append("\"").append(LINE_SEPARATOR)
               .append("debug=true;").append(LINE_SEPARATOR);
    }
    else
    {
        // Sun/OpenJDK module: authenticate from the keytab only, no ticket cache.
        section.append(SUN_LOGIN_MODULE).append(LINE_SEPARATOR)
               .append("useKeyTab=true").append(LINE_SEPARATOR)
               .append("keyTab=\"").append(keyTabPath).append("\"").append(LINE_SEPARATOR)
               .append("principal=\"").append(userPrincipal).append("\"").append(LINE_SEPARATOR)
               .append("useTicketCache=false").append(LINE_SEPARATOR)
               .append("storeKey=true").append(LINE_SEPARATOR)
               .append("debug=true;").append(LINE_SEPARATOR);
    }
    // Common section footer.
    section.append("};").append(LINE_SEPARATOR);
    return section.toString();
}
}

View File

@ -1,130 +0,0 @@
package org.dromara.kafka.consumer.handler;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.dromara.kafka.consumer.config.KafkaPropertiesConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
/**
* <p>description: </p>
*
* @author chenle
* @date 2021-09-06 11:15
*/
/**
 * Boots a single background Kafka consumer at application startup
 * ({@link CommandLineRunner}) and, when the cluster runs in security mode,
 * performs the Kerberos preparation first.
 *
 * <p>description: </p>
 *
 * @author chenle
 * @date 2021-09-06 11:15
 */
@Component
public class RealConsumer implements CommandLineRunner {
    private String kafkaServers;
    private String groupId;
    private String topics;
    // Default city code; overwritten unconditionally in run().
    private String cityCode = "3400";
    @Autowired
    KafkaPropertiesConfig kafkaPropertiesConfig;
    @Autowired
    ThreadPoolExecutor dtpExecutor2;
    private final Logger logger = LoggerFactory.getLogger(RealConsumer.class);

    /**
     * Entry point invoked by Spring Boot after the context is ready:
     * builds the consumer properties, prepares Kerberos security when
     * enabled, and hands one {@link KafkaConsumerRunnable} to a
     * single-thread executor.
     *
     * @param args command-line args; currently ignored (see the
     *             commented-out block — NOTE(review): the settings below
     *             are hard-coded and were presumably meant to come from
     *             these args; confirm before re-enabling)
     */
    @Override
    public void run(String... args) throws Exception {
        kafkaServers = "127.0.0.1:9092";
        topics = "topic.send.2,topic.send.3,topic.send.4,topic.send.5,topic.send.8";
        groupId = "group_ruansi_xuancheng";
        cityCode = "3418";
        if (args.length > 0) {
            /*kafkaServers = args[0];
            topics = args[1];
            groupId = args[2];
            cityCode = args[3];*/
        }
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        // Typed map instead of the previous raw Map (unchecked usage).
        Map<String, Object> kafkaProp = getKafkaProp();
        if (KafkaSecurityUtil.isSecurityModel())
        {
            try
            {
                logger.info("Securitymode start.");
                // Security mode requires the machine-machine account applied for
                // by the operator; protocol is SASL_PLAINTEXT (or PLAINTEXT).
                kafkaProp.put("security.protocol", "SASL_PLAINTEXT");
                // Kerberos service name.
                kafkaProp.put("sasl.kerberos.service.name", "kafka");
                // Kerberos domain name.
                kafkaProp.put("kerberos.domain.name", "hadoop.hadoop.com");
                KafkaSecurityUtil.securityPrepare();
            }
            catch (IOException e)
            {
                logger.error("Security prepare failure.");
                logger.error("The IOException occured.", e);
                // Abort startup of the consumer: without credentials the
                // connection would fail anyway.
                return;
            }
            logger.info("Security prepare success.");
        }
        KafkaConsumerRunnable runnable = new KafkaConsumerRunnable(kafkaProp, dtpExecutor2, cityCode);
        executorService.execute(runnable);
    }

    /**
     * Assembles the Kafka consumer configuration from the hard-coded
     * connection settings, plus the topic list under the "topics" key.
     *
     * @return consumer property map (includes the non-Kafka "topics" entry
     *         consumed by KafkaConsumerRunnable)
     */
    private Map<String, Object> getKafkaProp() {
        Map<String, Object> map = new HashMap<>();
        map.put("bootstrap.servers", kafkaServers);
        map.put("group.id", groupId);
        map.put("enable.auto.commit", "true");
        map.put("auto.commit.interval.ms", "1000");
        map.put("session.timeout.ms", "30000");
        map.put("key.deserializer", StringDeserializer.class);
        map.put("value.deserializer", StringDeserializer.class);
        map.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5);
        String[] split = topics.split(",");
        // Spring's arrayToList returns a raw List; the target type is String.
        List<String> list = CollectionUtils.arrayToList(split);
        map.put("topics", list);
        return map;
    }
}

View File

@ -30,3 +30,4 @@ spring:
config:
import:
- optional:nacos:application-common.yml
- optional:nacos:${spring.application.name}.yml

View File

@ -68,7 +68,7 @@ public class ElasticsearchConfig {
RestClientBuilder builder = RestClient.builder(httpHost);
// 设置用户名、密码
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
// credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(userName, password));
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(userName, password));
// 连接延时配置
builder.setRequestConfigCallback(requestConfigBuilder -> {
requestConfigBuilder.setConnectTimeout(connectTimeOut);

View File

@ -92,8 +92,8 @@ public class DataToEsController extends BaseController {
esGpsInfo.setInfoSource("3401");
esGpsInfo.setGpsTime(new Date());
esGpsInfo.setLat("31.1" + (a + i));
esGpsInfo.setLng("117.2" + (b + i));
esGpsInfo.setLatitude("31.1" + (a + i));
esGpsInfo.setLongitude("117.2" + (b + i));
esGpsInfo.setZzjgdm("340100000000");
esGpsInfo.setZzjgmc("合肥市公安局");
esGpsInfo.setCarNum("霍邱看守所01");

View File

@ -15,8 +15,8 @@ public class EsGpsInfo implements Serializable {
*/
private String deviceCode;
private String deviceType;
private String lat;
private String lng;
private String latitude;
private String longitude;
//方向
private String orientation;
//高程

View File

@ -1,5 +1,6 @@
package org.dromara.data2es.dubbo;
import cn.hutool.core.bean.BeanUtil;
import lombok.RequiredArgsConstructor;
import org.apache.dubbo.config.annotation.DubboService;
import org.dromara.common.core.domain.R;
@ -21,6 +22,6 @@ public class RemoteDataToEsServiceImpl implements RemoteDataToEsService {
@Override
public R saveDataBatch(List<RemoteGpsInfo> gpsInfoList) {
return gpsService.saveDataBatch(MapstructUtils.convert(gpsInfoList, EsGpsInfoVO2.class));
return gpsService.saveDataBatch(BeanUtil.copyToList(gpsInfoList, EsGpsInfoVO2.class));
}
}

View File

@ -29,6 +29,7 @@ import org.springframework.scheduling.annotation.Async;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@Configuration
public class RequestHandler {
@ -93,6 +94,18 @@ public class RequestHandler {
RedisUtils.batchInsert(map,time);
}
/**
* 线
* @param map
*/
public void batchPut(Map<String,String> map){
RedisUtils.batchPut(map);
}
public void batchPutWithExpire(Map<String,String> map,long time){
RedisUtils.batchPutWithExpire(map,time, TimeUnit.SECONDS);
}
@Async
public void redisDeleteBatch(List<String> deleteKeys){
RedisUtils.deleteObject(deleteKeys);

View File

@ -45,9 +45,9 @@ public class GpsTaskTest {
map.put("gpsTime",new Date());
map.put("locationDesc","合肥市公安局");
esGpsInfo.setLat("31.3" + (a + i));
esGpsInfo.setLatitude("31.3" + (a + i));
map.put("lat","31." + (a + i));
esGpsInfo.setLng("117.2" + (b + i));
esGpsInfo.setLongitude("117.2" + (b + i));
map.put("lng","117." + (b + i));
//gpsService.saveData(map);

View File

@ -4,12 +4,14 @@ import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.date.DateTime;
import cn.hutool.core.date.DateUnit;
import cn.hutool.core.date.DateUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import lombok.RequiredArgsConstructor;
import org.apache.commons.lang.StringUtils;
import org.apache.dubbo.config.annotation.DubboReference;
import org.dromara.common.core.domain.R;
import org.dromara.common.core.utils.RedisConstants;
import org.dromara.common.redis.utils.RedisUtils;
import org.dromara.data2es.domain.EsGpsInfo;
import org.dromara.data2es.domain.EsGpsInfoVO2;
import org.dromara.data2es.domain.entity.GpsInfoEntity;
@ -63,10 +65,10 @@ public class GpsServiceImpl implements IGpsService {
double lng ;
double lat ;
try {
lng = Double.parseDouble(esGpsInfo.getLng());
lat = Double.parseDouble(esGpsInfo.getLat());
lng = Double.parseDouble(esGpsInfo.getLongitude());
lat = Double.parseDouble(esGpsInfo.getLatitude());
}catch (NumberFormatException e){
throw new MyBusinessException("经纬度转double异常经度为"+esGpsInfo.getLng() +"纬度为"+esGpsInfo.getLat());
throw new MyBusinessException("经纬度转double异常经度为"+esGpsInfo.getLongitude() +"纬度为"+esGpsInfo.getLatitude());
}
gpsInfoEntity.setLocation(new Double[]{lng,lat});
@ -103,7 +105,6 @@ public class GpsServiceImpl implements IGpsService {
List<String> deleteKeys = new ArrayList<>();
BulkRequest bulkRequest = new BulkRequest();
for (EsGpsInfoVO2 info : list) {
if(StringUtils.isBlank(info.getInfoSource())){
logger.info("infoSource 为空");
continue;
@ -112,7 +113,7 @@ public class GpsServiceImpl implements IGpsService {
info = getInfoByInfoSource(info);
//redis
buildRedisMap(info,onlineUserDataMap,orgCodeDataMap,deleteKeys);
// logger.error("接收数据={},deviceCode={},gpsTime={}",info,info.getDeviceCode(),info.getGpsTime());
IndexRequest indexRequest = buildEsIndexRequest(info);
bulkRequest.add(indexRequest);
@ -121,7 +122,9 @@ public class GpsServiceImpl implements IGpsService {
}
requestHandler.redisOnlineUserBatch(onlineUserDataMap, RedisConstants.REDIS_ONLINE_USER_NEVER_EXPIRE);
requestHandler.redisOnlineUserBatch(orgCodeDataMap, 300);
requestHandler.redisOnlineUserBatch(orgCodeDataMap, 600);
// requestHandler.batchPut(onlineUserDataMap);
// requestHandler.batchPutWithExpire(orgCodeDataMap,600);
requestHandler.redisDeleteBatch(deleteKeys);
requestHandler.esRealBulkSave(bulkRequest);
@ -138,33 +141,31 @@ public class GpsServiceImpl implements IGpsService {
private EsGpsInfoVO2 getInfoByInfoSource(EsGpsInfo esGpsInfo) {
EsGpsInfoVO2 esGpsInfoVO2 = new EsGpsInfoVO2();
BeanUtil.copyProperties(esGpsInfo,esGpsInfoVO2);
if(null == esGpsInfoVO2.getZzjgdm() || "".equals(esGpsInfoVO2.getZzjgdm())){
RemoteDeviceVo vo = deviceService.getDeviceInfo(esGpsInfoVO2.getDeviceCode(),esGpsInfoVO2.getDeviceType());
if (null != vo){
esGpsInfoVO2.setZzjgdm(vo.getZzjgdm());
esGpsInfoVO2.setZzjgmc(vo.getZzjgmc());
esGpsInfoVO2.setPoliceName(vo.getPoliceName());
esGpsInfoVO2.setPoliceNo(vo.getPoliceNo());
esGpsInfoVO2.setCarNum(vo.getCarNum());
String deviceType = vo.getDeviceType();
if(StringUtils.isNotBlank(deviceType)){
deviceType = deviceType.replaceAll("\"", "");
if(deviceType.charAt(0) == '0' && deviceType.length() > 1){
deviceType = deviceType.substring(1);
if(deviceType.equals("1")){
deviceType = "2";
}
JSONObject object = RedisUtils.getBucket("deviceInfo:"+esGpsInfo.getInfoSource()+":"+esGpsInfo.getDeviceCode());
// RemoteDeviceVo vo = deviceService.getDeviceInfo(esGpsInfoVO2.getDeviceCode(),esGpsInfoVO2.getInfoSource());
if (null != object){
RemoteDeviceVo vo = BeanUtil.toBean(object,RemoteDeviceVo.class);
esGpsInfoVO2.setZzjgdm(vo.getZzjgdm());
esGpsInfoVO2.setZzjgmc(vo.getZzjgmc());
esGpsInfoVO2.setPoliceName(vo.getPoliceName());
esGpsInfoVO2.setPoliceNo(vo.getPoliceNo());
esGpsInfoVO2.setCarNum(vo.getCarNum());
String deviceType = vo.getDeviceType();
if(StringUtils.isNotBlank(deviceType)){
deviceType = deviceType.replaceAll("\"", "");
if(deviceType.charAt(0) == '0' && deviceType.length() > 1){
deviceType = deviceType.substring(1);
if(deviceType.equals("1")){
deviceType = "2";
}
}
esGpsInfoVO2.setDeviceType(deviceType);
}else {
esGpsInfoVO2.setDeviceType("99");
esGpsInfoVO2.setZzjgdm(esGpsInfo.getInfoSource()+"00000000");
}
esGpsInfoVO2.setDeviceType(deviceType);
}else {
esGpsInfoVO2.setDeviceType("99");
esGpsInfoVO2.setZzjgdm(esGpsInfo.getInfoSource()+"00000000");
}
return esGpsInfoVO2;
}
@ -246,10 +247,10 @@ public class GpsServiceImpl implements IGpsService {
double lng ;
double lat ;
try {
lng = Double.parseDouble(esGpsInfo.getLng());
lat = Double.parseDouble(esGpsInfo.getLat());
lng = Double.parseDouble(esGpsInfo.getLongitude());
lat = Double.parseDouble(esGpsInfo.getLatitude());
}catch (NumberFormatException e){
throw new MyBusinessException("经纬度转double异常经度为"+esGpsInfo.getLng() +"纬度为"+esGpsInfo.getLat());
throw new MyBusinessException("经纬度转double异常经度为"+esGpsInfo.getLongitude() +"纬度为"+esGpsInfo.getLatitude());
}
gpsInfoEntity.setLocation(new Double[]{lng,lat});

View File

@ -92,10 +92,8 @@ public class TDevice {
*/
private String remark2;
@TableField(fill = FieldFill.INSERT)
private String createTime;
@TableField(fill = FieldFill.INSERT_UPDATE)
private String updateTime;
private String lrdwdm;

View File

@ -57,10 +57,10 @@ public class RemoteDeviceImpl implements RemoteDeviceService {
}
@Override
public RemoteDeviceVo getDeviceInfo(String deviceCode, String deviceType) {
public RemoteDeviceVo getDeviceInfo(String deviceCode, String infoSource) {
TDeviceBo bo = new TDeviceBo();
bo.setDeviceCode(deviceCode);
bo.setDeviceType(deviceType);
bo.setInfoSource(infoSource);
return BeanUtil.toBean(deviceService.queryOne(bo), RemoteDeviceVo.class) ;
}
}

View File

@ -1,29 +1,71 @@
package org.dromara.system.schedule;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.date.DateUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import lombok.extern.slf4j.Slf4j;
import org.dromara.common.core.utils.RedisConstants;
import org.dromara.common.redis.utils.RedisUtils;
import org.dromara.system.domain.DeviceRedis;
import org.dromara.system.domain.bo.TDeviceBo;
import org.dromara.system.domain.vo.TDeviceVo;
import org.dromara.system.service.IDeviceRedisService;
import org.dromara.system.service.ITDeviceService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.Scheduled;
import javax.annotation.PostConstruct;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Configuration
@Slf4j
public class DeviceRedisSchedule {
@Autowired
IDeviceRedisService redisService;
@Autowired
ITDeviceService deviceService;
@Value("${deviceInfo.lastUpdateTime}")
private String lastUpdateTime;
/*
* Redis online_usert_device_redis
* */
@Scheduled(cron = "0/30 * * * * ?")
// @Scheduled(cron = "0/30 * * * * ?")
/**
 * Snapshots every cached "online_users:*" Redis entry and persists the
 * whole batch through the device-redis service.
 */
public void handleDeviceRedis(){
    List<JSONObject> cached = RedisUtils.searchAndGetKeysValues("online_users:*");
    List<DeviceRedis> rows = BeanUtil.copyToList(cached, DeviceRedis.class);
    redisService.insertBatch(rows);
}
// @Scheduled(cron = "0 0 0/1 * * ?")
/**
 * Incrementally pushes device records updated since {@code lastUpdateTime}
 * into Redis under "deviceInfo:{infoSource}:{deviceCode}" keys, then
 * advances the watermark to the newest update time seen.
 */
public void handleDeviceInfoToRedis(){
    if (null == lastUpdateTime || "".equals(lastUpdateTime)){
        log.error("lastUpdateTime=null");
        // Fix: without a watermark the incremental window is undefined —
        // bail out instead of querying with a blank begin time.
        return;
    }
    TDeviceBo bo = new TDeviceBo();
    bo.setBeginTime(lastUpdateTime);
    bo.setEndTime(DateUtil.formatDateTime(new Date()));
    List<TDeviceVo> list = deviceService.queryList(bo);
    if (list.isEmpty()){
        // Nothing changed in the window; keep the current watermark.
        return;
    }
    // First row carries the new watermark — assumes the query orders by
    // updateTime descending (TODO confirm against queryList's wrapper).
    lastUpdateTime = list.get(0).getUpdateTime();
    Map<String, String> deviceInfoDataMap = new HashMap<>(list.size());
    for (TDeviceVo vo : list) {
        String infoKey = "deviceInfo:" + vo.getInfoSource() + ":" + vo.getDeviceCode();
        deviceInfoDataMap.put(infoKey, JSONUtil.toJsonStr(vo));
    }
    // -1 = no expiry: device metadata stays cached until overwritten.
    RedisUtils.batchInsert(deviceInfoDataMap, -1);
}
}

View File

@ -25,6 +25,7 @@ import org.springframework.util.CollectionUtils;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
/**
* deviceService
@ -128,6 +129,7 @@ public class TDeviceServiceImpl implements ITDeviceService {
lqw.eq(StringUtils.isNotBlank(bo.getRemark2()), TDevice::getRemark2, bo.getRemark2());
lqw.between(bo.getBeginTime() != null && bo.getEndTime() != null,
TDevice::getUpdateTime, bo.getBeginTime(), bo.getEndTime());
lqw.orderByDesc(TDevice::getUpdateTime);
return lqw;
}
@ -184,8 +186,44 @@ public class TDeviceServiceImpl implements ITDeviceService {
}
@Override
public Boolean batchSaveOrUpdate(List<TDevice> List) {
return baseMapper.insertOrUpdateBatch(List);
/**
 * Upserts the given devices keyed by their logical identity
 * (deviceCode, infoSource): rows that already exist are updated by id,
 * the rest are inserted in batch.
 *
 * @param list devices to persist; null or empty is a no-op
 * @return true when every issued batch statement succeeded
 */
public Boolean batchSaveOrUpdate(List<TDevice> list) {
    if (list == null || list.isEmpty()) {
        // Fix: an empty IN (...) clause below would generate invalid SQL.
        return true;
    }
    // NOTE(review): the two independent IN clauses over-fetch — they match any
    // cross combination of the listed deviceCodes and infoSources — but the
    // exact-pair lookup below keeps the update/insert classification correct.
    List<TDevice> existingEntities = baseMapper.selectList(new QueryWrapper<TDevice>()
        .in("device_code", list.stream().map(TDevice::getDeviceCode).collect(Collectors.toList()))
        .in("info_source", list.stream().map(TDevice::getInfoSource).collect(Collectors.toList())));
    // Index existing rows by their logical key: O(1) lookups instead of the
    // previous O(n*m) nested scan.
    Map<String, TDevice> existingByKey = new HashMap<>(existingEntities.size());
    for (TDevice existing : existingEntities) {
        existingByKey.put(existing.getDeviceCode() + "|" + existing.getInfoSource(), existing);
    }
    List<TDevice> toUpdate = new ArrayList<>();
    List<TDevice> toInsert = new ArrayList<>();
    for (TDevice entity : list) {
        TDevice existing = existingByKey.get(entity.getDeviceCode() + "|" + entity.getInfoSource());
        if (existing != null) {
            // Carry the primary key so updateBatchById targets the right row.
            entity.setId(existing.getId());
            toUpdate.add(entity);
        } else {
            toInsert.add(entity);
        }
    }
    boolean flag = true;
    if (!toUpdate.isEmpty()) {
        flag = baseMapper.updateBatchById(toUpdate);
    }
    if (!toInsert.isEmpty()) {
        flag = baseMapper.insertBatch(toInsert);
    }
    return flag;
}
@Override

View File

@ -35,15 +35,14 @@ spring.application.name=ruoyi-nacos
### Deprecated configuration property, it is recommended to use `spring.sql.init.platform` replaced.
# spring.datasource.platform=mysql
nacos.plugin.datasource.log.enabled=false
spring.sql.init.platform=postgresql
spring.sql.init.platform=mysql
### Count of DB:
db.num=1
### Connect URL of DB:
db.url.0=jdbc:postgresql://localhost:5432/ypc-config?tcpKeepAlive=true&reWriteBatchedInserts=true&ApplicationName=ruoyi-nacos
db.user.0=postgres
db.password.0=ycgis
db.pool.config.driverClassName=org.postgresql.Driver
db.url.0=jdbc:mysql://127.0.0.1:3306/ry-config?characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true&useUnicode=true&useSSL=false&serverTimezone=UTC&allowPublicKeyRetrieval=true
db.user.0=root
db.password.0=root
### the maximum retry times for push
nacos.config.push.maxRetryTime=50