安庆认证版

stwzhj
luyya 2025-09-01 16:09:47 +08:00
parent 3bd524964b
commit 81187efdbd
26 changed files with 618 additions and 230 deletions

View File

@ -88,6 +88,10 @@ public class RemoteDeviceBo implements Serializable {
*/
private String remark2;
private String gbbm;
private String tdbm;
private String lrdwdm;
private String lrdwmc;

View File

@ -18,83 +18,18 @@
<dependencies>
<dependency>
<groupId>org.dromara</groupId>
<artifactId>stwzhj-common-nacos</artifactId>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</dependency>
<dependency>
<groupId>org.dromara</groupId>
<artifactId>stwzhj-common-sentinel</artifactId>
</dependency>
<!-- RuoYi Common Log -->
<dependency>
<groupId>org.dromara</groupId>
<artifactId>stwzhj-common-log</artifactId>
</dependency>
<dependency>
<groupId>org.dromara</groupId>
<artifactId>stwzhj-common-dict</artifactId>
</dependency>
<dependency>
<groupId>org.dromara</groupId>
<artifactId>stwzhj-common-doc</artifactId>
</dependency>
<dependency>
<groupId>org.dromara</groupId>
<artifactId>stwzhj-common-web</artifactId>
</dependency>
<dependency>
<groupId>org.dromara</groupId>
<artifactId>stwzhj-common-dubbo</artifactId>
</dependency>
<dependency>
<groupId>org.dromara</groupId>
<artifactId>stwzhj-common-seata</artifactId>
</dependency>
<dependency>
<groupId>org.dromara</groupId>
<artifactId>stwzhj-common-idempotent</artifactId>
</dependency>
<dependency>
<groupId>org.dromara</groupId>
<artifactId>stwzhj-common-tenant</artifactId>
</dependency>
<dependency>
<groupId>org.dromara</groupId>
<artifactId>stwzhj-common-security</artifactId>
</dependency>
<dependency>
<groupId>org.dromara</groupId>
<artifactId>stwzhj-common-translation</artifactId>
</dependency>
<dependency>
<groupId>org.dromara</groupId>
<artifactId>stwzhj-common-sensitive</artifactId>
</dependency>
<dependency>
<groupId>org.dromara</groupId>
<artifactId>stwzhj-common-encrypt</artifactId>
</dependency>
<dependency>
<groupId>org.dromara</groupId>
<artifactId>stwzhj-api-data2es</artifactId>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<!--动态线程池-->
<dependency>
<groupId>cn.dynamictp</groupId>

View File

@ -1,13 +1,13 @@
package org.dromara.data2kafka;
import org.apache.dubbo.config.spring.context.annotation.EnableDubbo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration;
import org.springframework.boot.autoconfigure.data.redis.RedisRepositoriesAutoConfiguration;
import org.springframework.boot.context.metrics.buffering.BufferingApplicationStartup;
import org.springframework.scheduling.annotation.EnableScheduling;
@EnableDubbo
@EnableScheduling
@SpringBootApplication
public class Data2KafkaApplication {

View File

@ -0,0 +1,172 @@
package org.dromara.data2kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.RecordDeserializationException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.dromara.data2kafka.config.KafkaProperties;
import org.dromara.data2kafka.config.LoginUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
@Component
public class Consumer extends Thread {
    private static final Logger LOG = LoggerFactory.getLogger(Consumer.class);

    /** Kafka consumer handle; created once in the constructor. */
    private final KafkaConsumer<String, String> consumer;

    /** Set by closeThread() to stop the poll loop; volatile for cross-thread visibility. */
    private volatile boolean closed;

    // 一次请求的最大等待时间(S) — max wait per poll request, in seconds
    private final int waitTime = 1;

    // Broker连接地址
    private static final String BOOTSTRAP_SERVER = "bootstrap.servers";
    // Group id
    private static final String GROUP_ID = "group.id";
    // 消息内容使用的反序列化类
    private static final String VALUE_DESERIALIZER = "value.deserializer";
    // 消息Key值使用的反序列化类
    private static final String KEY_DESERIALIZER = "key.deserializer";
    // 协议类型:当前支持配置为SASL_PLAINTEXT或者PLAINTEXT
    private static final String SECURITY_PROTOCOL = "security.protocol";
    // 服务名
    private static final String SASL_KERBEROS_SERVICE_NAME = "sasl.kerberos.service.name";
    // 域名
    private static final String KERBEROS_DOMAIN_NAME = "kerberos.domain.name";
    // 是否自动提交offset
    private static final String ENABLE_AUTO_COMMIT = "enable.auto.commit";
    // 自动提交offset的时间间隔
    private static final String AUTO_COMMIT_INTERVAL_MS = "auto.commit.interval.ms";
    // 会话超时时间
    private static final String SESSION_TIMEOUT_MS = "session.timeout.ms";

    /**
     * keytab file name used for Kerberos login.
     */
    private static final String USER_KEYTAB_FILE = "user.keytab";

    /**
     * Machine-machine account principal used for Kerberos login.
     */
    private static final String USER_PRINCIPAL = "aqdsj_ruansi@HADOOP.COM";

    /**
     * Prepares Kerberos security (when enabled) and builds the KafkaConsumer.
     *
     * NOTE(review): no topic subscription is visible in this class — confirm that
     * consumer.subscribe(...) happens elsewhere before run() starts polling.
     */
    public Consumer() {
        initSecurity();
        Properties props = initProperties();
        this.consumer = new KafkaConsumer<>(props);
    }

    /**
     * Builds the consumer Properties, taking each value from KafkaProperties
     * with a hard-coded fallback default.
     *
     * @return fully-populated consumer configuration
     */
    public static Properties initProperties() {
        Properties props = new Properties();
        KafkaProperties kafkaProc = KafkaProperties.getInstance();
        // Broker连接地址
        props.put(BOOTSTRAP_SERVER, kafkaProc.getValues(BOOTSTRAP_SERVER, "localhost:21007"));
        // Group id
        props.put(GROUP_ID, kafkaProc.getValues(GROUP_ID, "DemoConsumer"));
        // 是否自动提交offset
        props.put(ENABLE_AUTO_COMMIT, kafkaProc.getValues(ENABLE_AUTO_COMMIT, "true"));
        // 自动提交offset的时间间隔
        props.put(AUTO_COMMIT_INTERVAL_MS, kafkaProc.getValues(AUTO_COMMIT_INTERVAL_MS, "1000"));
        // 会话超时时间
        props.put(SESSION_TIMEOUT_MS, kafkaProc.getValues(SESSION_TIMEOUT_MS, "30000"));
        // 消息Key值使用的反序列化类
        props.put(KEY_DESERIALIZER,
            kafkaProc.getValues(KEY_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer"));
        // 消息内容使用的反序列化类
        props.put(VALUE_DESERIALIZER,
            kafkaProc.getValues(VALUE_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer"));
        // 安全协议类型
        props.put(SECURITY_PROTOCOL, kafkaProc.getValues(SECURITY_PROTOCOL, "SASL_PLAINTEXT"));
        // 服务名
        props.put(SASL_KERBEROS_SERVICE_NAME, "kafka");
        // 域名
        props.put(KERBEROS_DOMAIN_NAME, kafkaProc.getValues(KERBEROS_DOMAIN_NAME, "hadoop.hadoop.com"));
        return props;
    }

    /**
     * Poll loop: consumes records until closeThread() is called, then releases
     * the consumer (the original leaked it on exit).
     */
    @Override
    public void run() {
        try {
            while (!closed) {
                try {
                    // 消息消费请求 — wait at most waitTime seconds for a batch
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(waitTime));
                    // 消息处理
                    for (ConsumerRecord<String, String> record : records) {
                        LOG.info("[ConsumerExample], Received message: (" + record.key() + ", " + record.value()
                            + ") at offset " + record.offset());
                    }
                } catch (AuthorizationException | UnsupportedVersionException
                    | RecordDeserializationException e) {
                    // 无法从异常中恢复 — unrecoverable: stop the loop.
                    // Log the full exception, not just getMessage(), to keep the stack trace.
                    LOG.error("Unrecoverable consumer error, stopping.", e);
                    closeThread();
                } catch (OffsetOutOfRangeException | NoOffsetForPartitionException e) {
                    // Offset invalid or missing: jump to the latest offset and persist it.
                    LOG.error("Invalid or no offset found, using latest");
                    consumer.seekToEnd(e.partitions());
                    consumer.commitSync();
                } catch (KafkaException e) {
                    // Transient Kafka error: log with stack trace and keep polling.
                    LOG.error("Kafka error while polling.", e);
                }
            }
        } finally {
            // Release network connections and buffers held by the consumer.
            consumer.close();
        }
    }

    /**
     * Signals the poll loop to stop; safe to call multiple times.
     */
    public void closeThread() {
        closed = true;
    }

    /**
     * Runs Kerberos security preparation when security mode is enabled.
     * Success is logged only after securityPrepare completes — the original
     * logged "Security prepare success." even when an IOException had occurred.
     */
    public void initSecurity() {
        if (LoginUtil.isSecurityModel()) {
            try {
                LOG.info("Securitymode start.");
                // !!注意,安全认证时,需要用户手动修改为自己申请的机机账号
                LoginUtil.securityPrepare(USER_PRINCIPAL, USER_KEYTAB_FILE);
                LOG.info("Security prepare success.");
            } catch (IOException e) {
                LOG.error("Security prepare failure.", e);
            }
        }
    }
}

View File

@ -8,12 +8,11 @@ import cn.hutool.core.date.DateUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.alibaba.fastjson.JSON;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.dromara.data2es.api.domain.RemoteGpsInfo;
import org.dromara.data2kafka.domain.EsGpsInfo;
import org.dromara.data2kafka.domain.EsGpsInfoVO;
import org.dromara.data2kafka.producer.NewProducer;
import org.dromara.data2kafka.producer.Producer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -118,7 +117,7 @@ public class ConsumerWorker implements Runnable {
String topic = record.topic();
// logger.info("offset={},topic={},value={}", record.offset(), topic,value);
RemoteGpsInfo esGpsInfo;
EsGpsInfo esGpsInfo;
JSONObject jsonObject;
try {
jsonObject = JSONUtil.parseObj(((String) value));
@ -127,7 +126,7 @@ public class ConsumerWorker implements Runnable {
return;
}
try {
esGpsInfo = JSONUtil.toBean(jsonObject, RemoteGpsInfo.class);
esGpsInfo = JSONUtil.toBean(jsonObject, EsGpsInfo.class);
}catch (ConvertException e){
logger.info("EsGpsInfo=null:error={}",e.getMessage());
return;

View File

@ -0,0 +1,259 @@
package org.dromara.data2kafka.consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Properties;
public class LoginUtil {
    private static final Logger LOG = LoggerFactory.getLogger(LoginUtil.class);

    /**
     * JAAS modules that need a login section written into jaas.conf.
     */
    public enum Module {
        KAFKA("KafkaClient"), ZOOKEEPER("Client");

        private final String name;

        Module(String name) {
            this.name = name;
        }

        public String getName() {
            return name;
        }
    }

    /**
     * line operator string
     */
    private static final String LINE_SEPARATOR = System.getProperty("line.separator");

    /**
     * jaas file postfix
     */
    private static final String JAAS_POSTFIX = ".jaas.conf";

    /**
     * is IBM jdk or not
     */
    private static final boolean IS_IBM_JDK = System.getProperty("java.vendor").contains("IBM");

    /**
     * IBM jdk login module
     */
    private static final String IBM_LOGIN_MODULE = "com.ibm.security.auth.module.Krb5LoginModule required";

    /**
     * oracle jdk login module
     */
    private static final String SUN_LOGIN_MODULE = "com.sun.security.auth.module.Krb5LoginModule required";

    /**
     * Zookeeper quorum principal.
     */
    public static final String ZOOKEEPER_AUTH_PRINCIPAL = "zookeeper.server.principal";

    /**
     * java security krb5 file path
     */
    public static final String JAVA_SECURITY_KRB5_CONF = "java.security.krb5.conf";

    /**
     * java security login file path
     */
    public static final String JAVA_SECURITY_LOGIN_CONF = "java.security.auth.login.config";

    /**
     * Writes a fresh jaas.conf into the JVM temp directory and points
     * java.security.auth.login.config at it.
     *
     * @param principal  Kerberos principal to authenticate as
     * @param keytabPath path to the keytab file for that principal
     * @throws IOException if the stale jaas file cannot be deleted or the new one written
     */
    public static void setJaasFile(String principal, String keytabPath)
        throws IOException {
        String jaasPath =
            new File(System.getProperty("java.io.tmpdir")) + File.separator + System.getProperty("user.name")
                + JAAS_POSTFIX;
        // windows路径下分隔符替换
        jaasPath = jaasPath.replace("\\", "\\\\");
        // 删除jaas文件 — remove any stale file before writing the new one
        deleteJaasFile(jaasPath);
        writeJaasFile(jaasPath, principal, keytabPath);
        System.setProperty(JAVA_SECURITY_LOGIN_CONF, jaasPath);
    }

    /**
     * Sets the zookeeper server principal system property and verifies the set
     * actually took effect.
     *
     * @param zkServerPrincipal zookeeper quorum principal value
     * @throws IOException if the property could not be set to the requested value
     */
    public static void setZookeeperServerPrincipal(String zkServerPrincipal)
        throws IOException {
        System.setProperty(ZOOKEEPER_AUTH_PRINCIPAL, zkServerPrincipal);
        String ret = System.getProperty(ZOOKEEPER_AUTH_PRINCIPAL);
        if (ret == null) {
            throw new IOException(ZOOKEEPER_AUTH_PRINCIPAL + " is null.");
        }
        if (!ret.equals(zkServerPrincipal)) {
            throw new IOException(ZOOKEEPER_AUTH_PRINCIPAL + " is " + ret + " is not " + zkServerPrincipal + ".");
        }
    }

    /**
     * Sets the krb5 configuration file system property and verifies the set
     * actually took effect.
     *
     * @param krb5ConfFile path to krb5.conf
     * @throws IOException if the property could not be set to the requested value
     */
    public static void setKrb5Config(String krb5ConfFile)
        throws IOException {
        System.setProperty(JAVA_SECURITY_KRB5_CONF, krb5ConfFile);
        String ret = System.getProperty(JAVA_SECURITY_KRB5_CONF);
        if (ret == null) {
            throw new IOException(JAVA_SECURITY_KRB5_CONF + " is null.");
        }
        if (!ret.equals(krb5ConfFile)) {
            throw new IOException(JAVA_SECURITY_KRB5_CONF + " is " + ret + " is not " + krb5ConfFile + ".");
        }
    }

    /**
     * Writes the generated jaas configuration to jaasPath.
     *
     * @throws IOException if the file cannot be written (original cause preserved)
     */
    private static void writeJaasFile(String jaasPath, String principal, String keytabPath)
        throws IOException {
        // try-with-resources closes the writer on every path (original used a manual finally)
        try (FileWriter writer = new FileWriter(new File(jaasPath))) {
            writer.write(getJaasConfContext(principal, keytabPath));
            writer.flush();
        } catch (IOException e) {
            // Keep the underlying cause instead of discarding it (original dropped e).
            throw new IOException("Failed to create jaas.conf File", e);
        }
    }

    /**
     * Deletes an existing jaas file, failing loudly if deletion is refused.
     */
    private static void deleteJaasFile(String jaasPath)
        throws IOException {
        File jaasFile = new File(jaasPath);
        if (jaasFile.exists()) {
            if (!jaasFile.delete()) {
                throw new IOException("Failed to delete exists jaas file.");
            }
        }
    }

    /**
     * Concatenates one login section per Module into the full jaas.conf text.
     */
    private static String getJaasConfContext(String principal, String keytabPath) {
        Module[] allModule = Module.values();
        StringBuilder builder = new StringBuilder();
        for (Module module : allModule) {
            builder.append(getModuleContext(principal, keytabPath, module));
        }
        return builder.toString();
    }

    /**
     * Renders one jaas.conf login section; IBM and Sun JDKs use different
     * Krb5LoginModule classes and option names.
     */
    private static String getModuleContext(String userPrincipal, String keyTabPath, Module module) {
        StringBuilder builder = new StringBuilder();
        if (IS_IBM_JDK) {
            builder.append(module.getName()).append(" {").append(LINE_SEPARATOR);
            builder.append(IBM_LOGIN_MODULE).append(LINE_SEPARATOR);
            builder.append("credsType=both").append(LINE_SEPARATOR);
            builder.append("principal=\"" + userPrincipal + "\"").append(LINE_SEPARATOR);
            builder.append("useKeytab=\"" + keyTabPath + "\"").append(LINE_SEPARATOR);
            builder.append("debug=true;").append(LINE_SEPARATOR);
            builder.append("};").append(LINE_SEPARATOR);
        } else {
            builder.append(module.getName()).append(" {").append(LINE_SEPARATOR);
            builder.append(SUN_LOGIN_MODULE).append(LINE_SEPARATOR);
            builder.append("useKeyTab=true").append(LINE_SEPARATOR);
            builder.append("keyTab=\"" + keyTabPath + "\"").append(LINE_SEPARATOR);
            builder.append("principal=\"" + userPrincipal + "\"").append(LINE_SEPARATOR);
            builder.append("useTicketCache=false").append(LINE_SEPARATOR);
            builder.append("storeKey=true").append(LINE_SEPARATOR);
            builder.append("debug=true;").append(LINE_SEPARATOR);
            builder.append("};").append(LINE_SEPARATOR);
        }
        return builder.toString();
    }

    /**
     * Full security preparation: krb5 config, zookeeper principal and jaas file,
     * all rooted at the fixed deployment directory /shengting/gpsstore/.
     *
     * @param principal  Kerberos principal to authenticate as
     * @param keyTabFile keytab file name (resolved under /shengting/gpsstore/)
     * @throws IOException if any of the three preparation steps fails
     */
    public static void securityPrepare(String principal, String keyTabFile) throws IOException {
        String filePath = "/shengting/gpsstore/";
        String krbFile = filePath + "krb5.conf";
        String userKeyTableFile = filePath + keyTabFile;
        // windows路径下分隔符替换
        userKeyTableFile = userKeyTableFile.replace("\\", "\\\\");
        krbFile = krbFile.replace("\\", "\\\\");
        LoginUtil.setKrb5Config(krbFile);
        LoginUtil.setZookeeperServerPrincipal("zookeeper/hadoop.hadoop.com");
        LoginUtil.setJaasFile(principal, userKeyTableFile);
    }

    /**
     * Check security mode: reads /shengting/gpsstore/kafkaSecurityMode and
     * returns true when kafka.client.security.mode=yes.
     *
     * @return boolean (boxed Boolean kept for signature compatibility)
     */
    public static Boolean isSecurityModel() {
        Boolean isSecurity = false;
        String krbFilePath = "/shengting/gpsstore/kafkaSecurityMode";
        Properties securityProps = new Properties();
        // file does not exist.
        if (!isFileExists(krbFilePath)) {
            return isSecurity;
        }
        // try-with-resources: the original leaked this FileInputStream.
        try (FileInputStream in = new FileInputStream(krbFilePath)) {
            securityProps.load(in);
            if ("yes".equalsIgnoreCase(securityProps.getProperty("kafka.client.security.mode"))) {
                isSecurity = true;
            }
        } catch (Exception e) {
            // Log at error with the full stack trace (original logged at info
            // with a {} placeholder, which swallows the trace).
            LOG.error("The Exception occured.", e);
        }
        return isSecurity;
    }

    /**
     * @return true when the given path exists on disk
     */
    private static boolean isFileExists(String fileName) {
        File file = new File(fileName);
        return file.exists();
    }
}

View File

@ -3,7 +3,6 @@ package org.dromara.data2kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.dromara.data2kafka.config.LoginUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@ -11,7 +10,6 @@ import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
@ -41,6 +39,16 @@ public class RealConsumer implements CommandLineRunner {
private String cityCode = "3400";
/**
* keytab
*/
private static final String USER_KEYTAB_FILE = "user.keytab";
/**
*
*/
private static final String USER_PRINCIPAL = "aqdsj_ruansi@HADOOP.COM";
@Autowired
ThreadPoolExecutor dtpExecutor2;
@ -64,11 +72,11 @@ public class RealConsumer implements CommandLineRunner {
Map kafkaProp = getKafkaProp();
checkNetworkConnection("53.1.213.25",21007);
if (false)
if (LoginUtil.isSecurityModel())
{
try
{
logger.info("Securitymode start.");
logger.info("consumer Securitymode start.");
//!!注意,安全认证时,需要用户手动修改为自己申请的机机账号
//认证方式 SASL_PLAINTEXT 或者 PLAINTEXT
@ -77,7 +85,8 @@ public class RealConsumer implements CommandLineRunner {
kafkaProp.put("sasl.kerberos.service.name","kafka");
//域名
kafkaProp.put("kerberos.domain.name","hadoop.hadoop.com");
LoginUtil.setJaasFile("","");
LoginUtil.securityPrepare(USER_PRINCIPAL,USER_KEYTAB_FILE);
// LoginUtil.setJaasFile("","");
}
catch (IOException e)
{
@ -87,11 +96,10 @@ public class RealConsumer implements CommandLineRunner {
}
logger.info("Security prepare success.");
}
kafkaProp.put("security.protocol","SASL_PLAINTEXT");
// System.setProperty("java.security.auth.login.config","/gpsstore/kafka_client_scram_consumer_jaas.conf");
kafkaProp.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin-2024\";");
kafkaProp.put("sasl.mechanism", "PLAIN");
/*kafkaProp.put("security.protocol", "SASL_PLAINTEXT");
kafkaProp.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"rsoft\" password=\"rsoft-2024\";");
kafkaProp.put("sasl.mechanism", "SCRAM-SHA-256");*/
KafkaConsumerRunnable runnable = new KafkaConsumerRunnable(kafkaProp,dtpExecutor2,cityCode);
executorService.execute(runnable);
}

View File

@ -1,66 +0,0 @@
package org.dromara.data2kafka.producer;
import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.concurrent.ExecutionException;
/**
 * <p>description: Synchronous Kafka sender — serializes an object to JSON with
 * fastjson and sends it to the given topic, blocking until the send completes.</p>
 *
 * @author chenle
 * @date 2021-11-01 17:20
 */
//@Component
public class NewProducer {

    // Injected by bean name; @Resource alone is sufficient — the original also
    // carried @Autowired on the same field, which is redundant and conflicting.
    @Resource(name = "myKafkaProducer")
    KafkaProducer<String, String> kafkaProducer;

    private static final Logger LOG = LoggerFactory.getLogger(NewProducer.class);

    /**
     * Serializes obj to JSON and sends it synchronously to topic.
     *
     * @param obj   payload to serialize with fastjson
     * @param topic destination Kafka topic
     */
    public void send(Object obj, String topic) {
        String obj2String = JSONObject.toJSONString(obj);
        // 构造消息记录
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, obj2String);
        try {
            // 同步发送 — get() blocks until the broker acknowledges the record
            Object o = kafkaProducer.send(record).get();
            LOG.info("同步发送成功: Object={}", JSONObject.toJSONString(o));
        } catch (InterruptedException ie) {
            // Restore the interrupt flag instead of swallowing it, and log the
            // exception as the throwable argument so the stack trace is kept
            // (the original used printStackTrace plus a {} placeholder).
            Thread.currentThread().interrupt();
            LOG.error("The InterruptedException occured.", ie);
        } catch (ExecutionException ee) {
            LOG.error("The ExecutionException occured.", ee);
        }
    }
}

View File

@ -12,23 +12,3 @@ spring:
active: @profiles.active@
autoconfigure:
exclude: org.springframework.boot.autoconfigure.elasticsearch.ElasticsearchRestClientAutoConfiguration
--- # nacos 配置
spring:
cloud:
nacos:
# nacos 服务地址
server-addr: @nacos.server@
username: @nacos.username@
password: @nacos.password@
discovery:
# 注册组
group: @nacos.discovery.group@
namespace: ${spring.profiles.active}
config:
# 配置组
group: @nacos.config.group@
namespace: ${spring.profiles.active}
config:
import:
- optional:nacos:application-common.yml
- optional:nacos:${spring.application.name}.yml

View File

@ -1,6 +1,7 @@
package org.dromara.data2es.controller;
import cn.hutool.core.bean.BeanUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.dubbo.config.annotation.DubboReference;
import org.dromara.common.core.domain.R;
import org.dromara.common.web.core.BaseController;
@ -21,6 +22,7 @@ import java.util.Objects;
@RequestMapping("device")
@RestController
@Slf4j
public class DeviceInfoController extends BaseController {
@DubboReference
@ -34,6 +36,10 @@ public class DeviceInfoController extends BaseController {
return R.fail("参数为空");
}
Object dataList = params.get("dataList");
Object infoSource = params.get("infoSource");
if(Objects.isNull(infoSource)){
return R.fail("参数 [infoSource] 为空");
}
if(Objects.isNull(dataList)){
return R.fail("参数 [dataList] 为空");
}
@ -42,7 +48,10 @@ public class DeviceInfoController extends BaseController {
return R.fail("单次数据超过了100条");
}
List<RemoteDeviceBo> list = BeanUtil.copyToList(dataList1, RemoteDeviceBo.class);
for (RemoteDeviceBo deviceBo : list) {
deviceBo.setInfoSource(params.get("infoSource").toString());
}
log.error("插入设备记录={}",list.toString());
boolean inserted = deviceService.batchSaveDevice(list);
if(inserted) {

View File

@ -96,7 +96,7 @@ public class RedisExpireListener extends KeyExpirationEventMessageListener {
String deviceType = split[2];
String deviceCode = split[3];
if ("5".equals(deviceType) ) {
if ("05".equals(deviceType) ) {
return;
}

View File

@ -125,7 +125,7 @@ public class GpsServiceImpl implements IGpsService {
}
requestHandler.redisOnlineUserBatch(onlineUserDataMap, 2592000); //存放30天
requestHandler.redisOnlineUserBatch(orgCodeDataMap, 3600); //此处和buildRedisMap方法判断在线的时间一直
requestHandler.redisOnlineUserBatch(orgCodeDataMap, 1800); //此处和buildRedisMap方法判断在线的时间一直
// requestHandler.redisDeleteBatch(deleteKeys); //此处换成了数据库的模式 所以这个就不用了
requestHandler.esRealBulkSave(bulkRequest);
@ -317,7 +317,7 @@ public class GpsServiceImpl implements IGpsService {
}
if (!Objects.isNull(gpsTime)) {
if (DateUtil.between(gpsTime, new Date(), DateUnit.SECOND) <= 3600L) {
if (DateUtil.between(gpsTime, new Date(), DateUnit.SECOND) <= 1800L) {
if (null == esGpsInfoVo2.getOnline()){
esGpsInfoVo2.setOnline(1);

View File

@ -49,52 +49,20 @@ public class IndexStaticsController extends BaseController {
* */
@PostMapping("/onLineBar")
public R onLineBar(){
List<SysDeptVo> deptVoList = deptService.getDsList();
List<DeviceStaticsVo> staticsVoList = deviceService.countByDs();
List<DeviceStaticsVo> list = new ArrayList<>(); //用来接收处理后的统计结果
for (SysDeptVo deptVo : deptVoList) {
boolean bl = false; //用来统计结果是否有当前这个机构
for (DeviceStaticsVo staticsVo : staticsVoList) {
String deptId = staticsVo.getZzjgdm()+"00000000";
if (deptId.equals(deptVo.getDeptId())){
staticsVo.setZzjgdm(deptId);
staticsVo.setZzjgmc(deptVo.getDeptName().replaceAll("公安局",""));
int onlineCo = RedisUtils.searchKeys("org_code:"+staticsVo.getZzjgdm()+"*");
staticsVo.setOnlineCo(onlineCo);
list.add(staticsVo);
bl = true;
break;
}
}
if (!bl){
DeviceStaticsVo staticsVo = new DeviceStaticsVo();
staticsVo.setZzjgdm(deptVo.getDeptId());
staticsVo.setZzjgmc(deptVo.getDeptName().replaceAll("公安局",""));
staticsVo.setCo(0);
staticsVo.setOnlineCo(0);
list.add(staticsVo);
}
}
List<DeviceStaticsVo> list = redisService.dsqk();
return R.ok(list);
}
/*
* Code线
*
* */
@GetMapping("/dsOnlineCount")
public R dsOnlineCount(String code){
TDeviceBo bo = new TDeviceBo();
bo.setInfoSource(code);
Long co = deviceService.countByCondition(bo);
int onlineCo = 0;
if (null == code || "".equals(code)){
onlineCo = RedisUtils.searchKeys("org_code:*");
}else {
onlineCo = RedisUtils.searchKeys("org_code:"+code+"*");
}
DeviceStaticsVo vo = redisService.qszl();
HashMap map = new HashMap();
map.put("co",co);
map.put("onlineCo",onlineCo);
map.put("co",vo.getCo());
map.put("onlineCo",vo.getOnlineCo());
return R.ok(map);
}

View File

@ -92,6 +92,10 @@ public class TDevice {
*/
private String remark2;
private String gbbm;
private String tdbm;
@TableField(fill = FieldFill.INSERT)
private String createTime;

View File

@ -88,6 +88,10 @@ public class TDeviceBo extends BaseEntity {
*/
private String remark1;
private String gbbm;
private String tdbm;
/**
* 2
*/

View File

@ -14,14 +14,12 @@ public class DeviceRedisVo {
private String deviceType;
private String online;
private String zzjgdm;
private String typeName;
private Integer co;
private Integer onlien;
private Integer online;
}

View File

@ -116,6 +116,10 @@ public class TDeviceVo implements Serializable {
@ExcelProperty(value = "备注字段2")
private String remark2;
private String gbbm;
private String tdbm;
private String createTime;

View File

@ -3,6 +3,7 @@ package org.dromara.system.dubbo;
import cn.hutool.core.bean.BeanUtil;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.dubbo.config.annotation.DubboService;
import org.dromara.common.core.domain.R;
import org.dromara.common.core.utils.MapstructUtils;
@ -20,17 +21,29 @@ import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@RequiredArgsConstructor
@Service
@DubboService
@Slf4j
public class RemoteDeviceImpl implements RemoteDeviceService {
private final ITDeviceService deviceService;
@Override
public boolean batchSaveDevice(List<RemoteDeviceBo> boList) {
List<TDevice> devices = BeanUtil.copyToList(boList, TDevice.class);
List<TDevice> devices = boList.stream().map(bo -> {
log.info("RemoteDeviceBo gbbm = " + bo.getGbbm());
// 先转成 Map
Map<String, Object> map = BeanUtil.beanToMap(bo);
log.info("Map gbbm = " + map.get("gbbm")); // 看 map 里有没有
// 再从 Map 转 TDevice
TDevice device = BeanUtil.toBean(map, TDevice.class);
return device;
}).collect(Collectors.toList());
boolean flag = deviceService.batchSaveOrUpdate(devices);
return flag;
}

View File

@ -14,4 +14,6 @@ public interface DeviceRedisMapper extends BaseMapperPlus<DeviceRedis,DeviceRedi
List<DeviceRedisVo> countByCondition(DeviceRedis redis);
List<DeviceStaticsVo> dsStatics();
DeviceStaticsVo qszl();
}

View File

@ -19,4 +19,6 @@ public interface TDeviceMapper extends BaseMapperPlus<TDevice, TDeviceVo> {
List<DeviceStaticsVo> countByDsAndType();
boolean insertOrUpdateByUpsert(List<TDevice> list);
}

View File

@ -12,4 +12,8 @@ public interface IDeviceRedisService {
List<DeviceRedisVo> countByCondition(DeviceRedis redis);
List<DeviceStaticsVo> dsStatics();
List<DeviceStaticsVo> dsqk();
DeviceStaticsVo qszl();
}

View File

@ -71,7 +71,7 @@ public interface ITDeviceService {
*/
Boolean deleteWithValidByIds(Collection<Long> ids, Boolean isValid);
Boolean batchSaveOrUpdate(List<TDevice> List);
Boolean batchSaveOrUpdate(List<TDevice> list);
List<DeviceStaticsVo> countByDs();

View File

@ -116,6 +116,16 @@ public class DeviceRedisServiceImpl implements IDeviceRedisService {
return baseMapper.countByCondition(redis);
}
@Override
public List<DeviceStaticsVo> dsqk() {
return baseMapper.dsStatics();
}
@Override
public DeviceStaticsVo qszl() {
return baseMapper.qszl();
}
@Override
public List<DeviceStaticsVo> dsStatics() {
//1、查询各地市终端总数和在线数
@ -128,10 +138,13 @@ public class DeviceRedisServiceImpl implements IDeviceRedisService {
for (DeviceStaticsVo vo : list) {
for (DeviceStaticsVo staticsVo : dsvo) {
//如果 vo的机构代码和staticsVo一样 就查询字典值并设置到对应的字段
if (vo.getZzjgdm().equals(staticsVo.getZzjgdm())){
if (vo.getZzjgdm().equals(staticsVo.getZzjgdm()+"00000000")){
String deviceType = staticsVo.getDeviceType();
if(null == deviceType){
continue;
}
for (SysDictDataVo dataVo : dictDataVos) {
if (staticsVo.getDeviceType().equals(dataVo.getDictValue())){ //如果匹配设置值
if (dataVo.getDictValue().equals(deviceType)){ //如果匹配设置值
switch (deviceType){
case "01" :
vo.setJcco(staticsVo.getCo());
@ -145,11 +158,8 @@ public class DeviceRedisServiceImpl implements IDeviceRedisService {
case "05" :
vo.setJlyco(staticsVo.getCo());
break;
case "99" :
vo.setQtco(staticsVo.getCo());
break;
default:
vo.setQtco(staticsVo.getCo()+vo.getQtco());
vo.setQtco(staticsVo.getCo());
}
}

View File

@ -2,6 +2,7 @@ package org.dromara.system.service.impl;
import cn.hutool.core.date.DateUtil;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.dubbo.config.annotation.DubboReference;
import org.dromara.common.core.domain.R;
import org.dromara.common.core.utils.MapstructUtils;
@ -34,6 +35,7 @@ import java.util.regex.Pattern;
*/
@RequiredArgsConstructor
@Service
@Slf4j
public class TDeviceServiceImpl implements ITDeviceService {
private final TDeviceMapper baseMapper;
@ -184,8 +186,9 @@ public class TDeviceServiceImpl implements ITDeviceService {
}
@Override
public Boolean batchSaveOrUpdate(List<TDevice> List) {
return baseMapper.insertOrUpdateBatch(List);
public Boolean batchSaveOrUpdate(List<TDevice> list) {
log.info("查询设备={}",list.toString());
return baseMapper.insertOrUpdateByUpsert(list);
}
@Override

View File

@ -24,6 +24,11 @@
(online,zzjgdm,gps_time) =(EXCLUDED.online,EXCLUDED.zzjgdm,EXCLUDED.gps_time)
</insert>
<!-- 全省在线、总数统计-->
<select id="qszl" resultMap="deviceStaticsResult">
SELECT count(*) co ,(SELECT count(*) onlineCo FROM t_device_redis WHERE online = '1' ) onlineCo FROM t_device td WHERE "valid" = 1 and td.zzjgdm is not null
</select>
<!-- 全省各类设备总数、在线数 -->
<select id="countByCondition" resultMap="deviceRedisResult">
SELECT d.dict_label type_name,dict_value,COALESCE(r.co,0) online,COALESCE(td.co,0) co from sys_dict_data d
@ -50,7 +55,7 @@
<!-- 各地市设备总数和在线数 -->
<select id="dsStatics" resultMap="deviceStaticsResult">
SELECT d.dept_id zzjgdm,short_name zzjgmc,COALESCE(td.co,0) co,COALESCE(rd.online,0) online FROM
SELECT d.dept_id zzjgdm,short_name zzjgmc,COALESCE(td.co,0) co,COALESCE(rd.online,0) onlineCo FROM
sys_dept d
LEFT JOIN
(SELECT substr(zzjgdm, 1, 4) dept_id,count(*) co from (SELECT * FROM t_device
@ -60,10 +65,10 @@
on substr(d.dept_id,1,4) = td.dept_id
LEFT JOIN
(SELECT substr(zzjgdm, 1, 4) dept_id,count(*) online from (SELECT * FROM t_device_redis
(SELECT info_source dept_id,count(*) online from (SELECT * FROM t_device_redis
WHERE online = '1'
) r
GROUP BY substr(zzjgdm,1, 4) ) rd
GROUP BY info_source ) rd
on substr(d.dept_id,1,4) = rd.dept_id
WHERE d.parent_id = '0' and d.dept_id != '340000000000' ORDER BY zzjgdm
</select>

View File

@ -19,4 +19,75 @@
GROUP BY substr(zzjgdm,1, 4),device_type HAVING substr(zzjgdm,1,4) is not null ORDER BY zzjgdm
</select>
<insert id="insertOrUpdateByUpsert">
INSERT INTO t_device (
device_code, device_type, info_source, zzjgdm, zzjgmc,
police_no, police_name, remark1, remark2, create_time, update_time,
car_num, sbpp, sbxh, card_num, phone_num, gbbm, tdbm
) VALUES
<foreach collection="list" item="item" separator=",">
(
#{item.deviceCode}, #{item.deviceType}, #{item.infoSource},
#{item.zzjgdm}, #{item.zzjgmc}, #{item.policeNo}, #{item.policeName},
#{item.remark1}, #{item.remark2},
COALESCE(#{item.createTime}, NOW()),
NOW(),
#{item.carNum}, #{item.sbpp}, #{item.sbxh},
#{item.cardNum}, #{item.phoneNum}, #{item.gbbm}, #{item.tdbm}
)
</foreach>
ON CONFLICT (device_code, info_source)
DO UPDATE SET
device_type = EXCLUDED.device_type,
zzjgdm = CASE
WHEN EXCLUDED.zzjgdm IS NULL OR EXCLUDED.zzjgdm = ''
THEN t_device.zzjgdm
ELSE EXCLUDED.zzjgdm
END,
zzjgmc = CASE
WHEN EXCLUDED.zzjgmc IS NULL OR EXCLUDED.zzjgmc = ''
THEN t_device.zzjgmc
ELSE EXCLUDED.zzjgmc
END,
police_no = CASE
WHEN EXCLUDED.police_no IS NULL OR EXCLUDED.police_no = ''
THEN t_device.police_no
ELSE EXCLUDED.police_no
END,
police_name = CASE
WHEN EXCLUDED.police_name IS NULL OR EXCLUDED.police_name = ''
THEN t_device.police_name
ELSE EXCLUDED.police_name
END,
remark1 = EXCLUDED.remark1,
remark2 = EXCLUDED.remark2,
update_time = NOW(),
car_num = CASE
WHEN EXCLUDED.car_num IS NULL OR EXCLUDED.car_num = ''
THEN t_device.car_num
ELSE EXCLUDED.car_num
END,
sbpp = EXCLUDED.sbpp,
sbxh = EXCLUDED.sbxh,
card_num = CASE
WHEN EXCLUDED.card_num IS NULL OR EXCLUDED.card_num = ''
THEN t_device.card_num
ELSE EXCLUDED.card_num
END,
phone_num = CASE
WHEN EXCLUDED.phone_num IS NULL OR EXCLUDED.phone_num = ''
THEN t_device.phone_num
ELSE EXCLUDED.phone_num
END,
gbbm = CASE
WHEN EXCLUDED.gbbm IS NULL OR EXCLUDED.gbbm = ''
THEN t_device.gbbm
ELSE EXCLUDED.gbbm
END,
tdbm = CASE
WHEN EXCLUDED.tdbm IS NULL OR EXCLUDED.tdbm = ''
THEN t_device.tdbm
ELSE EXCLUDED.tdbm
END
</insert>
</mapper>