diff --git a/stwzhj-api/stwzhj-api-system/src/main/java/org/dromara/system/api/domain/bo/RemoteDeviceBo.java b/stwzhj-api/stwzhj-api-system/src/main/java/org/dromara/system/api/domain/bo/RemoteDeviceBo.java index eead2ac0..11b25005 100644 --- a/stwzhj-api/stwzhj-api-system/src/main/java/org/dromara/system/api/domain/bo/RemoteDeviceBo.java +++ b/stwzhj-api/stwzhj-api-system/src/main/java/org/dromara/system/api/domain/bo/RemoteDeviceBo.java @@ -88,6 +88,10 @@ public class RemoteDeviceBo implements Serializable { */ private String remark2; + private String gbbm; + + private String tdbm; + private String lrdwdm; private String lrdwmc; diff --git a/stwzhj-modules/stwzhj-data2StKafka/pom.xml b/stwzhj-modules/stwzhj-data2StKafka/pom.xml index 0a2bf2a3..f17a578f 100644 --- a/stwzhj-modules/stwzhj-data2StKafka/pom.xml +++ b/stwzhj-modules/stwzhj-data2StKafka/pom.xml @@ -18,83 +18,18 @@ - org.dromara - stwzhj-common-nacos + com.alibaba + fastjson - - org.dromara - stwzhj-common-sentinel - - - - - org.dromara - stwzhj-common-log - - - - org.dromara - stwzhj-common-dict - - - - org.dromara - stwzhj-common-doc - - - - org.dromara - stwzhj-common-web - - - - org.dromara - stwzhj-common-dubbo - - - - org.dromara - stwzhj-common-seata - - - - org.dromara - stwzhj-common-idempotent - - - - org.dromara - stwzhj-common-tenant - - - - org.dromara - stwzhj-common-security - - - - org.dromara - stwzhj-common-translation - - - - org.dromara - stwzhj-common-sensitive - - - - org.dromara - stwzhj-common-encrypt - - - - org.dromara - stwzhj-api-data2es + org.apache.commons + commons-lang3 + + cn.dynamictp diff --git a/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/Data2KafkaApplication.java b/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/Data2KafkaApplication.java index 8fa85496..1ab8f775 100644 --- a/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/Data2KafkaApplication.java +++ b/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/Data2KafkaApplication.java @@ -1,13 +1,13 @@ package org.dromara.data2kafka; -import org.apache.dubbo.config.spring.context.annotation.EnableDubbo; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration; +import org.springframework.boot.autoconfigure.data.redis.RedisRepositoriesAutoConfiguration; import org.springframework.boot.context.metrics.buffering.BufferingApplicationStartup; import org.springframework.scheduling.annotation.EnableScheduling; -@EnableDubbo @EnableScheduling @SpringBootApplication public class Data2KafkaApplication { diff --git a/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/consumer/Consumer.java b/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/consumer/Consumer.java new file mode 100644 index 00000000..9ed151d2 --- /dev/null +++ b/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/consumer/Consumer.java @@ -0,0 +1,172 @@ +package org.dromara.data2kafka.consumer; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.NoOffsetForPartitionException; +import org.apache.kafka.clients.consumer.OffsetOutOfRangeException; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.AuthorizationException; +import org.apache.kafka.common.errors.RecordDeserializationException; +import org.apache.kafka.common.errors.UnsupportedVersionException; +import org.dromara.data2kafka.config.KafkaProperties; +import org.dromara.data2kafka.config.LoginUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import java.io.IOException; +import java.time.Duration; +import java.util.Collections; +import java.util.Properties; +import java.util.concurrent.CountDownLatch; + +@Component +public class Consumer extends Thread { + private static final Logger LOG = LoggerFactory.getLogger(Consumer.class); + + private final KafkaConsumer consumer; + + + private volatile boolean closed; + + + // 一次请求的最大等待时间(S) + private final int waitTime = 1; + + // Broker连接地址 + private final static String BOOTSTRAP_SERVER = "bootstrap.servers"; + + // Group id + private final static String GROUP_ID = "group.id"; + + // 消息内容使用的反序列化类 + private final static String VALUE_DESERIALIZER = "value.deserializer"; + + // 消息Key值使用的反序列化类 + private final static String KEY_DESERIALIZER = "key.deserializer"; + + // 协议类型:当前支持配置为SASL_PLAINTEXT或者PLAINTEXT + private final static String SECURITY_PROTOCOL = "security.protocol"; + + // 服务名 + private final static String SASL_KERBEROS_SERVICE_NAME = "sasl.kerberos.service.name"; + + // 域名 + private final static String KERBEROS_DOMAIN_NAME = "kerberos.domain.name"; + + // 是否自动提交offset + private final static String ENABLE_AUTO_COMMIT = "enable.auto.commit"; + + // 自动提交offset的时间间隔 + private final static String AUTO_COMMIT_INTERVAL_MS = "auto.commit.interval.ms"; + + // 会话超时时间 + private final static String SESSION_TIMEOUT_MS = "session.timeout.ms"; + + /** + * 用户自己申请的机机账号keytab文件名称 + */ + private static final String USER_KEYTAB_FILE = "user.keytab"; + + /** + * 用户自己申请的机机账号名称 + */ + private static final String USER_PRINCIPAL = "aqdsj_ruansi@HADOOP.COM"; + + /** + * Consumer构造函数 + * + * @param + */ + public Consumer() { + initSecurity(); + Properties props = initProperties(); + this.consumer = new KafkaConsumer<>(props); + } + + public static Properties initProperties() { + Properties props = new Properties(); + KafkaProperties kafkaProc = KafkaProperties.getInstance(); + + // Broker连接地址 + props.put(BOOTSTRAP_SERVER, kafkaProc.getValues(BOOTSTRAP_SERVER, "localhost:21007")); + // Group id + props.put(GROUP_ID, kafkaProc.getValues(GROUP_ID, "DemoConsumer")); + // 是否自动提交offset + props.put(ENABLE_AUTO_COMMIT, kafkaProc.getValues(ENABLE_AUTO_COMMIT, "true")); + // 自动提交offset的时间间隔 + props.put(AUTO_COMMIT_INTERVAL_MS, kafkaProc.getValues(AUTO_COMMIT_INTERVAL_MS,"1000")); + // 会话超时时间 + props.put(SESSION_TIMEOUT_MS, kafkaProc.getValues(SESSION_TIMEOUT_MS, "30000")); + // 消息Key值使用的反序列化类 + props.put(KEY_DESERIALIZER, + kafkaProc.getValues(KEY_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer")); + // 消息内容使用的反序列化类 + props.put(VALUE_DESERIALIZER, + kafkaProc.getValues(VALUE_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer")); + // 安全协议类型 + props.put(SECURITY_PROTOCOL, kafkaProc.getValues(SECURITY_PROTOCOL, "SASL_PLAINTEXT")); + // 服务名 + props.put(SASL_KERBEROS_SERVICE_NAME, "kafka"); + // 域名 + props.put(KERBEROS_DOMAIN_NAME, kafkaProc.getValues(KERBEROS_DOMAIN_NAME, "hadoop.hadoop.com")); + + return props; + } + + /** + * 订阅Topic的消息处理函数 + */ + public void run() { + while (!closed) { + try { + // 消息消费请求 + ConsumerRecords records = consumer.poll(Duration.ofSeconds(waitTime)); + // 消息处理 + for (ConsumerRecord record : records) { + LOG.info("[ConsumerExample], Received message: (" + record.key() + ", " + record.value() + + ") at offset " + record.offset()); + } + } catch (AuthorizationException | UnsupportedVersionException + | RecordDeserializationException e) { + LOG.error(e.getMessage()); + // 无法从异常中恢复 + closeThread(); + } catch (OffsetOutOfRangeException | NoOffsetForPartitionException e) { + LOG.error("Invalid or no offset found, using latest"); + consumer.seekToEnd(e.partitions()); + consumer.commitSync(); + } catch (KafkaException e) { + LOG.error(e.getMessage()); + } + } + } + + public void closeThread() { + if (!closed) { + closed = true; + } + } + + /** + * 初始化安全认证 + */ + public void initSecurity() { + if (org.dromara.data2kafka.config.LoginUtil.isSecurityModel()) + { + try { + LOG.info("Securitymode start."); + + // !!注意,安全认证时,需要用户手动修改为自己申请的机机账号 + LoginUtil.securityPrepare(USER_PRINCIPAL, USER_KEYTAB_FILE); + } catch (IOException e) { + LOG.error("Security prepare failure."); + LOG.error("The IOException occured.", e); + } + LOG.info("Security prepare success."); + } + } + +} diff --git a/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/consumer/ConsumerWorker.java b/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/consumer/ConsumerWorker.java index 10845aaa..63b766e6 100644 --- a/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/consumer/ConsumerWorker.java +++ b/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/consumer/ConsumerWorker.java @@ -8,12 +8,11 @@ import cn.hutool.core.date.DateUtil; import cn.hutool.json.JSONObject; import cn.hutool.json.JSONUtil; import com.alibaba.fastjson.JSON; -import org.apache.commons.lang.StringUtils; + +import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.dromara.data2es.api.domain.RemoteGpsInfo; import org.dromara.data2kafka.domain.EsGpsInfo; import org.dromara.data2kafka.domain.EsGpsInfoVO; -import org.dromara.data2kafka.producer.NewProducer; import org.dromara.data2kafka.producer.Producer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -118,7 +117,7 @@ public class ConsumerWorker implements Runnable { String topic = record.topic(); // logger.info("offset={},topic={},value={}", record.offset(), topic,value); - RemoteGpsInfo esGpsInfo; + EsGpsInfo esGpsInfo; JSONObject jsonObject; try { jsonObject = JSONUtil.parseObj(((String) value)); @@ -127,7 +126,7 @@ public class ConsumerWorker implements Runnable { return; } try { - esGpsInfo = JSONUtil.toBean(jsonObject, RemoteGpsInfo.class); + esGpsInfo = JSONUtil.toBean(jsonObject, EsGpsInfo.class); }catch (ConvertException e){ logger.info("EsGpsInfo=null:error={}",e.getMessage()); return; diff --git a/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/consumer/LoginUtil.java b/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/consumer/LoginUtil.java new file mode 100644 index 00000000..83c63b1c --- /dev/null +++ b/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/consumer/LoginUtil.java @@ -0,0 +1,259 @@ +package org.dromara.data2kafka.consumer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileWriter; +import java.io.IOException; +import java.util.Properties; + +public class LoginUtil { + private static final Logger LOG = LoggerFactory.getLogger(LoginUtil.class); + + /** + * no JavaDoc + */ + public enum Module { + KAFKA("KafkaClient"), ZOOKEEPER("Client"); + + private String name; + + private Module(String name) + { + this.name = name; + } + + public String getName() + { + return name; + } + } + + /** + * line operator string + */ + private static final String LINE_SEPARATOR = System.getProperty("line.separator"); + + /** + * jaas file postfix + */ + private static final String JAAS_POSTFIX = ".jaas.conf"; + + /** + * is IBM jdk or not + */ + private static final boolean IS_IBM_JDK = System.getProperty("java.vendor").contains("IBM"); + + /** + * IBM jdk login module + */ + private static final String IBM_LOGIN_MODULE = "com.ibm.security.auth.module.Krb5LoginModule required"; + + /** + * oracle jdk login module + */ + private static final String SUN_LOGIN_MODULE = "com.sun.security.auth.module.Krb5LoginModule required"; + + /** + * Zookeeper quorum principal. + */ + public static final String ZOOKEEPER_AUTH_PRINCIPAL = "zookeeper.server.principal"; + + /** + * java security krb5 file path + */ + public static final String JAVA_SECURITY_KRB5_CONF = "java.security.krb5.conf"; + + /** + * java security login file path + */ + public static final String JAVA_SECURITY_LOGIN_CONF = "java.security.auth.login.config"; + + /** + * 设置jaas.conf文件 + * + * @param principal + * @param keytabPath + * @throws IOException + */ + public static void setJaasFile(String principal, String keytabPath) + throws IOException { + String jaasPath = + new File(System.getProperty("java.io.tmpdir")) + File.separator + System.getProperty("user.name") + + JAAS_POSTFIX; + + // windows路径下分隔符替换 + jaasPath = jaasPath.replace("\\", "\\\\"); + // 删除jaas文件 + deleteJaasFile(jaasPath); + writeJaasFile(jaasPath, principal, keytabPath); + System.setProperty(JAVA_SECURITY_LOGIN_CONF, jaasPath); + } + + /** + * 设置zookeeper服务端principal + * + * @param zkServerPrincipal + * @throws IOException + */ + public static void setZookeeperServerPrincipal(String zkServerPrincipal) + throws IOException { + System.setProperty(ZOOKEEPER_AUTH_PRINCIPAL, zkServerPrincipal); + String ret = System.getProperty(ZOOKEEPER_AUTH_PRINCIPAL); + if (ret == null) + { + throw new IOException(ZOOKEEPER_AUTH_PRINCIPAL + " is null."); + } + if (!ret.equals(zkServerPrincipal)) + { + throw new IOException(ZOOKEEPER_AUTH_PRINCIPAL + " is " + ret + " is not " + zkServerPrincipal + "."); + } + } + + /** + * 设置krb5文件 + * + * @param krb5ConfFile + * @throws IOException + */ + public static void setKrb5Config(String krb5ConfFile) + throws IOException { + System.setProperty(JAVA_SECURITY_KRB5_CONF, krb5ConfFile); + String ret = System.getProperty(JAVA_SECURITY_KRB5_CONF); + if (ret == null) + { + throw new IOException(JAVA_SECURITY_KRB5_CONF + " is null."); + } + if (!ret.equals(krb5ConfFile)) + { + throw new IOException(JAVA_SECURITY_KRB5_CONF + " is " + ret + " is not " + krb5ConfFile + "."); + } + } + + /** + * 写入jaas文件 + * + * @throws IOException + * 写文件异常 + */ + private static void writeJaasFile(String jaasPath, String principal, String keytabPath) + throws IOException { + FileWriter writer = new FileWriter(new File(jaasPath)); + try + { + writer.write(getJaasConfContext(principal, keytabPath)); + writer.flush(); + } + catch (IOException e) + { + throw new IOException("Failed to create jaas.conf File"); + } + finally + { + writer.close(); + } + } + + private static void deleteJaasFile(String jaasPath) + throws IOException { + File jaasFile = new File(jaasPath); + if (jaasFile.exists()) + { + if (!jaasFile.delete()) + { + throw new IOException("Failed to delete exists jaas file."); + } + } + } + + private static String getJaasConfContext(String principal, String keytabPath) { + Module[] allModule = Module.values(); + StringBuilder builder = new StringBuilder(); + for (Module modlue : allModule) + { + builder.append(getModuleContext(principal, keytabPath, modlue)); + } + return builder.toString(); + } + + private static String getModuleContext(String userPrincipal, String keyTabPath, Module module) { + StringBuilder builder = new StringBuilder(); + if (IS_IBM_JDK) { + builder.append(module.getName()).append(" {").append(LINE_SEPARATOR); + builder.append(IBM_LOGIN_MODULE).append(LINE_SEPARATOR); + builder.append("credsType=both").append(LINE_SEPARATOR); + builder.append("principal=\"" + userPrincipal + "\"").append(LINE_SEPARATOR); + builder.append("useKeytab=\"" + keyTabPath + "\"").append(LINE_SEPARATOR); + builder.append("debug=true;").append(LINE_SEPARATOR); + builder.append("};").append(LINE_SEPARATOR); + } else { + builder.append(module.getName()).append(" {").append(LINE_SEPARATOR); + builder.append(SUN_LOGIN_MODULE).append(LINE_SEPARATOR); + builder.append("useKeyTab=true").append(LINE_SEPARATOR); + builder.append("keyTab=\"" + keyTabPath + "\"").append(LINE_SEPARATOR); + builder.append("principal=\"" + userPrincipal + "\"").append(LINE_SEPARATOR); + builder.append("useTicketCache=false").append(LINE_SEPARATOR); + builder.append("storeKey=true").append(LINE_SEPARATOR); + builder.append("debug=true;").append(LINE_SEPARATOR); + builder.append("};").append(LINE_SEPARATOR); + } + + return builder.toString(); + } + + public static void securityPrepare(String principal, String keyTabFile) throws IOException { +// String filePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator; + String filePath = "/shengting/gpsstore/"; + String krbFile = filePath + "krb5.conf"; + String userKeyTableFile = filePath + keyTabFile; + + // windows路径下分隔符替换 + userKeyTableFile = userKeyTableFile.replace("\\", "\\\\"); + krbFile = krbFile.replace("\\", "\\\\"); + + LoginUtil.setKrb5Config(krbFile); + LoginUtil.setZookeeperServerPrincipal("zookeeper/hadoop.hadoop.com"); + LoginUtil.setJaasFile(principal, userKeyTableFile); + } + + /** + * Check security mode + * + * @return boolean + */ + public static Boolean isSecurityModel() { + Boolean isSecurity = false; +// String krbFilePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator + "kafkaSecurityMode"; + String krbFilePath = "/shengting/gpsstore/kafkaSecurityMode"; + Properties securityProps = new Properties(); + + // file does not exist. + if (!isFileExists(krbFilePath)) { + return isSecurity; + } + + try { + securityProps.load(new FileInputStream(krbFilePath)); + + if ("yes".equalsIgnoreCase(securityProps.getProperty("kafka.client.security.mode"))) + { + isSecurity = true; + } + } catch (Exception e) { + LOG.info("The Exception occured : {}.", e); + } + + return isSecurity; + } + + /* + * 判断文件是否存在 + */ + private static boolean isFileExists(String fileName) { + File file = new File(fileName); + + return file.exists(); + } +} diff --git a/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/consumer/RealConsumer.java b/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/consumer/RealConsumer.java index 2d6ae101..6fdca5d8 100644 --- a/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/consumer/RealConsumer.java +++ b/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/consumer/RealConsumer.java @@ -3,7 +3,6 @@ package org.dromara.data2kafka.consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; -import org.dromara.data2kafka.config.LoginUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -11,7 +10,6 @@ import org.springframework.boot.CommandLineRunner; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; -import javax.annotation.Resource; import java.io.IOException; import java.net.ConnectException; import java.net.InetSocketAddress; @@ -41,6 +39,16 @@ public class RealConsumer implements CommandLineRunner { private String cityCode = "3400"; + /** + * 用户自己申请的机机账号keytab文件名称 + */ + private static final String USER_KEYTAB_FILE = "user.keytab"; + + /** + * 用户自己申请的机机账号名称 + */ + private static final String USER_PRINCIPAL = "aqdsj_ruansi@HADOOP.COM"; + @Autowired ThreadPoolExecutor dtpExecutor2; @@ -64,11 +72,11 @@ public class RealConsumer implements CommandLineRunner { Map kafkaProp = getKafkaProp(); checkNetworkConnection("53.1.213.25",21007); - if (false) + if (LoginUtil.isSecurityModel()) { try { - logger.info("Securitymode start."); + logger.info("consumer Securitymode start."); //!!注意,安全认证时,需要用户手动修改为自己申请的机机账号 //认证方式 SASL_PLAINTEXT 或者 PLAINTEXT @@ -77,7 +85,8 @@ public class RealConsumer implements CommandLineRunner { kafkaProp.put("sasl.kerberos.service.name","kafka"); //域名 kafkaProp.put("kerberos.domain.name","hadoop.hadoop.com"); - LoginUtil.setJaasFile("",""); + LoginUtil.securityPrepare(USER_PRINCIPAL,USER_KEYTAB_FILE); +// LoginUtil.setJaasFile("",""); } catch (IOException e) { @@ -87,11 +96,10 @@ public class RealConsumer implements CommandLineRunner { } logger.info("Security prepare success."); } - kafkaProp.put("security.protocol","SASL_PLAINTEXT"); -// System.setProperty("java.security.auth.login.config","/gpsstore/kafka_client_scram_consumer_jaas.conf"); - kafkaProp.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin-2024\";"); - kafkaProp.put("sasl.mechanism", "PLAIN"); + /*kafkaProp.put("security.protocol", "SASL_PLAINTEXT"); + kafkaProp.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"rsoft\" password=\"rsoft-2024\";"); + kafkaProp.put("sasl.mechanism", "SCRAM-SHA-256");*/ KafkaConsumerRunnable runnable = new KafkaConsumerRunnable(kafkaProp,dtpExecutor2,cityCode); executorService.execute(runnable); } diff --git a/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/producer/NewProducer.java b/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/producer/NewProducer.java deleted file mode 100644 index 869cbfae..00000000 --- a/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/producer/NewProducer.java +++ /dev/null @@ -1,66 +0,0 @@ -package org.dromara.data2kafka.producer; - -import com.alibaba.fastjson.JSONObject; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -import javax.annotation.Resource; -import java.util.concurrent.ExecutionException; - -/** - *

description:

- * - * @author chenle - * @date 2021-11-01 17:20 - */ -//@Component -public class NewProducer { - - @Autowired - @Resource(name = "myKafkaProducer") - KafkaProducer kafkaProducer; - - private Logger LOG = LoggerFactory.getLogger(NewProducer.class); - - - /** - * 生产者线程执行函数,循环发送消息。 - */ - public void send(Object obj,String topic) { - String obj2String = JSONObject.toJSONString(obj); - - // 构造消息记录 - ProducerRecord record = new ProducerRecord(topic, obj2String); - try { - // 同步发送 - Object o = kafkaProducer.send(record).get(); - LOG.info("同步发送成功: Object={}", JSONObject.toJSONString(o)); - } catch (InterruptedException ie) { - ie.printStackTrace(); - LOG.error("The InterruptedException occured : {}.", ie); - } catch (ExecutionException ee) { - ee.printStackTrace(); - LOG.error("The ExecutionException occured : {}.", ee); - } - - /*kafkaProducer.send(record, (recordMetadata, e) -> { - if (e != null) { - LOG.error("send--The Exception occured.", e); - } - if (recordMetadata != null) - { - LOG.info("sent to partition(" + recordMetadata.partition() + "), " - + "offset(" + recordMetadata.offset()+"),topic="+recordMetadata.topic()); - } - });*/ - - } - - - - -} diff --git a/stwzhj-modules/stwzhj-data2StKafka/src/main/resources/application.yml b/stwzhj-modules/stwzhj-data2StKafka/src/main/resources/application.yml index 01621de8..147b89cb 100644 --- a/stwzhj-modules/stwzhj-data2StKafka/src/main/resources/application.yml +++ b/stwzhj-modules/stwzhj-data2StKafka/src/main/resources/application.yml @@ -12,23 +12,3 @@ spring: active: @profiles.active@ autoconfigure: exclude: org.springframework.boot.autoconfigure.elasticsearch.ElasticsearchRestClientAutoConfiguration ---- # nacos 配置 -spring: - cloud: - nacos: - # nacos 服务地址 - server-addr: @nacos.server@ - username: @nacos.username@ - password: @nacos.password@ - discovery: - # 注册组 - group: @nacos.discovery.group@ - namespace: ${spring.profiles.active} - config: - # 配置组 - group: @nacos.config.group@ - namespace: ${spring.profiles.active} - config: - import: - - optional:nacos:application-common.yml - - optional:nacos:${spring.application.name}.yml diff --git a/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/controller/DeviceInfoController.java b/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/controller/DeviceInfoController.java index 58ac27b8..4a60d6f0 100644 --- a/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/controller/DeviceInfoController.java +++ b/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/controller/DeviceInfoController.java @@ -1,6 +1,7 @@ package org.dromara.data2es.controller; import cn.hutool.core.bean.BeanUtil; +import lombok.extern.slf4j.Slf4j; import org.apache.dubbo.config.annotation.DubboReference; import org.dromara.common.core.domain.R; import org.dromara.common.web.core.BaseController; @@ -21,6 +22,7 @@ import java.util.Objects; @RequestMapping("device") @RestController +@Slf4j public class DeviceInfoController extends BaseController { @DubboReference @@ -34,6 +36,10 @@ public class DeviceInfoController extends BaseController { return R.fail("参数为空"); } Object dataList = params.get("dataList"); + Object infoSource = params.get("infoSource"); + if(Objects.isNull(infoSource)){ + return R.fail("参数 [infoSource] 为空"); + } if(Objects.isNull(dataList)){ return R.fail("参数 [dataList] 为空"); } @@ -42,7 +48,10 @@ public class DeviceInfoController extends BaseController { return R.fail("单次数据超过了100条"); } List list = BeanUtil.copyToList(dataList1, RemoteDeviceBo.class); - + for (RemoteDeviceBo deviceBo : list) { + deviceBo.setInfoSource(params.get("infoSource").toString()); + } + log.error("插入设备记录={}",list.toString()); boolean inserted = deviceService.batchSaveDevice(list); if(inserted) { diff --git a/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/handler/RedisExpireListener.java b/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/handler/RedisExpireListener.java index 82626656..012fc1bb 100644 --- a/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/handler/RedisExpireListener.java +++ b/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/handler/RedisExpireListener.java @@ -96,7 +96,7 @@ public class RedisExpireListener extends KeyExpirationEventMessageListener { String deviceType = split[2]; String deviceCode = split[3]; - if ("5".equals(deviceType) ) { + if ("05".equals(deviceType) ) { return; } diff --git a/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/service/impl/GpsServiceImpl.java b/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/service/impl/GpsServiceImpl.java index c87cfb99..66996c16 100644 --- a/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/service/impl/GpsServiceImpl.java +++ b/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/service/impl/GpsServiceImpl.java @@ -125,7 +125,7 @@ public class GpsServiceImpl implements IGpsService { } requestHandler.redisOnlineUserBatch(onlineUserDataMap, 2592000); //存放30天 - requestHandler.redisOnlineUserBatch(orgCodeDataMap, 3600); //此处和buildRedisMap方法判断在线的时间一直 + requestHandler.redisOnlineUserBatch(orgCodeDataMap, 1800); //此处和buildRedisMap方法判断在线的时间一直 // requestHandler.redisDeleteBatch(deleteKeys); //此处换成了数据库的模式 所以这个就不用了 requestHandler.esRealBulkSave(bulkRequest); @@ -317,7 +317,7 @@ public class GpsServiceImpl implements IGpsService { } if (!Objects.isNull(gpsTime)) { - if (DateUtil.between(gpsTime, new Date(), DateUnit.SECOND) <= 3600L) { + if (DateUtil.between(gpsTime, new Date(), DateUnit.SECOND) <= 1800L) { if (null == esGpsInfoVo2.getOnline()){ esGpsInfoVo2.setOnline(1); diff --git a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/controller/system/IndexStaticsController.java b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/controller/system/IndexStaticsController.java index e70fdf82..c596428d 100644 --- a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/controller/system/IndexStaticsController.java +++ b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/controller/system/IndexStaticsController.java @@ -49,52 +49,20 @@ public class IndexStaticsController extends BaseController { * */ @PostMapping("/onLineBar") public R onLineBar(){ - List deptVoList = deptService.getDsList(); - List staticsVoList = deviceService.countByDs(); - List list = new ArrayList<>(); //用来接收处理后的统计结果 - for (SysDeptVo deptVo : deptVoList) { - boolean bl = false; //用来统计结果是否有当前这个机构 - for (DeviceStaticsVo staticsVo : staticsVoList) { - String deptId = staticsVo.getZzjgdm()+"00000000"; - if (deptId.equals(deptVo.getDeptId())){ - staticsVo.setZzjgdm(deptId); - staticsVo.setZzjgmc(deptVo.getDeptName().replaceAll("公安局","")); - int onlineCo = RedisUtils.searchKeys("org_code:"+staticsVo.getZzjgdm()+"*"); - staticsVo.setOnlineCo(onlineCo); - list.add(staticsVo); - bl = true; - break; - } - } - if (!bl){ - DeviceStaticsVo staticsVo = new DeviceStaticsVo(); - staticsVo.setZzjgdm(deptVo.getDeptId()); - staticsVo.setZzjgmc(deptVo.getDeptName().replaceAll("公安局","")); - staticsVo.setCo(0); - staticsVo.setOnlineCo(0); - list.add(staticsVo); - } - } + List list = redisService.dsqk(); return R.ok(list); } /* * 根据Code查询终端总数和在线数 + * 数据总览 * */ @GetMapping("/dsOnlineCount") public R dsOnlineCount(String code){ - TDeviceBo bo = new TDeviceBo(); - bo.setInfoSource(code); - Long co = deviceService.countByCondition(bo); - int onlineCo = 0; - if (null == code || "".equals(code)){ - onlineCo = RedisUtils.searchKeys("org_code:*"); - }else { - onlineCo = RedisUtils.searchKeys("org_code:"+code+"*"); - } + DeviceStaticsVo vo = redisService.qszl(); HashMap map = new HashMap(); - map.put("co",co); - map.put("onlineCo",onlineCo); + map.put("co",vo.getCo()); + map.put("onlineCo",vo.getOnlineCo()); return R.ok(map); } diff --git a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/domain/TDevice.java b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/domain/TDevice.java index a87818ec..b2971fcc 100644 --- a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/domain/TDevice.java +++ b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/domain/TDevice.java @@ -92,6 +92,10 @@ public class TDevice { */ private String remark2; + private String gbbm; + + private String tdbm; + @TableField(fill = FieldFill.INSERT) private String createTime; diff --git a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/domain/bo/TDeviceBo.java b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/domain/bo/TDeviceBo.java index 2c7f1324..d2eae574 100644 --- a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/domain/bo/TDeviceBo.java +++ b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/domain/bo/TDeviceBo.java @@ -88,6 +88,10 @@ public class TDeviceBo extends BaseEntity { */ private String remark1; + private String gbbm; + + private String tdbm; + /** * 备注字段2 */ diff --git a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/domain/vo/DeviceRedisVo.java b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/domain/vo/DeviceRedisVo.java index fa9f329f..a6a4b2be 100644 --- a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/domain/vo/DeviceRedisVo.java +++ b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/domain/vo/DeviceRedisVo.java @@ -14,14 +14,12 @@ public class DeviceRedisVo { private String deviceType; - private String online; - private String zzjgdm; private String typeName; private Integer co; - private Integer onlien; + private Integer online; } diff --git a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/domain/vo/TDeviceVo.java b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/domain/vo/TDeviceVo.java index 38215862..ff084373 100644 --- a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/domain/vo/TDeviceVo.java +++ b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/domain/vo/TDeviceVo.java @@ -116,6 +116,10 @@ public class TDeviceVo implements Serializable { @ExcelProperty(value = "备注字段2") private String remark2; + private String gbbm; + + private String tdbm; + private String createTime; diff --git a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/dubbo/RemoteDeviceImpl.java b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/dubbo/RemoteDeviceImpl.java index ed7b2b20..5dac710a 100644 --- a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/dubbo/RemoteDeviceImpl.java +++ b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/dubbo/RemoteDeviceImpl.java @@ -3,6 +3,7 @@ package org.dromara.system.dubbo; import cn.hutool.core.bean.BeanUtil; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; import org.apache.dubbo.config.annotation.DubboService; import org.dromara.common.core.domain.R; import org.dromara.common.core.utils.MapstructUtils; @@ -20,17 +21,29 @@ import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Service; import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; @RequiredArgsConstructor @Service @DubboService +@Slf4j public class RemoteDeviceImpl implements RemoteDeviceService { private final ITDeviceService deviceService; @Override public boolean batchSaveDevice(List boList) { - List devices = BeanUtil.copyToList(boList, TDevice.class); + List devices = boList.stream().map(bo -> { + log.info("RemoteDeviceBo gbbm = " + bo.getGbbm()); + // 先转成 Map + Map map = BeanUtil.beanToMap(bo); + log.info("Map gbbm = " + map.get("gbbm")); // 看 map 里有没有 + + // 再从 Map 转 TDevice + TDevice device = BeanUtil.toBean(map, TDevice.class); + return device; + }).collect(Collectors.toList()); boolean flag = deviceService.batchSaveOrUpdate(devices); return flag; } diff --git a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/mapper/DeviceRedisMapper.java b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/mapper/DeviceRedisMapper.java index a95a3902..73d95821 100644 --- a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/mapper/DeviceRedisMapper.java +++ b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/mapper/DeviceRedisMapper.java @@ -14,4 +14,6 @@ public interface DeviceRedisMapper extends BaseMapperPlus countByCondition(DeviceRedis redis); List dsStatics(); + + DeviceStaticsVo qszl(); } diff --git a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/mapper/TDeviceMapper.java b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/mapper/TDeviceMapper.java index fbd0aac4..2490afb0 100644 --- a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/mapper/TDeviceMapper.java +++ b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/mapper/TDeviceMapper.java @@ -19,4 +19,6 @@ public interface TDeviceMapper extends BaseMapperPlus { List countByDsAndType(); + boolean insertOrUpdateByUpsert(List list); + } diff --git a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/service/IDeviceRedisService.java b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/service/IDeviceRedisService.java index efc527b8..6a0d30db 100644 --- a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/service/IDeviceRedisService.java +++ b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/service/IDeviceRedisService.java @@ -12,4 +12,8 @@ public interface IDeviceRedisService { List countByCondition(DeviceRedis redis); List dsStatics(); + + List dsqk(); + + DeviceStaticsVo qszl(); } diff --git a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/service/ITDeviceService.java b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/service/ITDeviceService.java index 8cc93964..c8645d84 100644 --- a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/service/ITDeviceService.java +++ b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/service/ITDeviceService.java @@ -71,7 +71,7 @@ public interface ITDeviceService { */ Boolean deleteWithValidByIds(Collection ids, Boolean isValid); - Boolean batchSaveOrUpdate(List List); + Boolean batchSaveOrUpdate(List list); List countByDs(); diff --git a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/service/impl/DeviceRedisServiceImpl.java b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/service/impl/DeviceRedisServiceImpl.java index d86b6cf5..ba8a4ae1 100644 --- a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/service/impl/DeviceRedisServiceImpl.java +++ b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/service/impl/DeviceRedisServiceImpl.java @@ -116,6 +116,16 @@ public class DeviceRedisServiceImpl implements IDeviceRedisService { return baseMapper.countByCondition(redis); } + @Override + public List dsqk() { + return baseMapper.dsStatics(); + } + + @Override + public DeviceStaticsVo qszl() { + return baseMapper.qszl(); + } + @Override public List dsStatics() { //1、查询各地市终端总数和在线数 @@ -128,10 +138,13 @@ public class DeviceRedisServiceImpl implements IDeviceRedisService { for (DeviceStaticsVo vo : list) { for (DeviceStaticsVo staticsVo : dsvo) { //如果 vo的机构代码和staticsVo一样 就查询字典值并设置到对应的字段 - if (vo.getZzjgdm().equals(staticsVo.getZzjgdm())){ + if (vo.getZzjgdm().equals(staticsVo.getZzjgdm()+"00000000")){ String deviceType = staticsVo.getDeviceType(); + if(null == deviceType){ + continue; + } for (SysDictDataVo dataVo : dictDataVos) { - if (staticsVo.getDeviceType().equals(dataVo.getDictValue())){ //如果匹配设置值 + if (dataVo.getDictValue().equals(deviceType)){ //如果匹配设置值 switch (deviceType){ case "01" : vo.setJcco(staticsVo.getCo()); @@ -145,11 +158,8 @@ public class DeviceRedisServiceImpl implements IDeviceRedisService { case "05" : vo.setJlyco(staticsVo.getCo()); break; - case "99" : - vo.setQtco(staticsVo.getCo()); - break; default: - vo.setQtco(staticsVo.getCo()+vo.getQtco()); + vo.setQtco(staticsVo.getCo()); } } diff --git a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/service/impl/TDeviceServiceImpl.java b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/service/impl/TDeviceServiceImpl.java index 607c9f1b..39435e71 100644 --- a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/service/impl/TDeviceServiceImpl.java +++ b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/service/impl/TDeviceServiceImpl.java @@ -2,6 +2,7 @@ package org.dromara.system.service.impl; import cn.hutool.core.date.DateUtil; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; +import lombok.extern.slf4j.Slf4j; import org.apache.dubbo.config.annotation.DubboReference; import org.dromara.common.core.domain.R; import org.dromara.common.core.utils.MapstructUtils; @@ -34,6 +35,7 @@ import java.util.regex.Pattern; */ @RequiredArgsConstructor @Service +@Slf4j public class TDeviceServiceImpl implements ITDeviceService { private final TDeviceMapper baseMapper; @@ -184,8 +186,9 @@ public class TDeviceServiceImpl implements ITDeviceService { } @Override - public Boolean batchSaveOrUpdate(List List) { - return baseMapper.insertOrUpdateBatch(List); + public Boolean batchSaveOrUpdate(List list) { + log.info("查询设备={}",list.toString()); + return baseMapper.insertOrUpdateByUpsert(list); } @Override diff --git a/stwzhj-modules/stwzhj-system/src/main/resources/mapper/system/DeviceRedisMapper.xml b/stwzhj-modules/stwzhj-system/src/main/resources/mapper/system/DeviceRedisMapper.xml index 58432343..37568851 100644 --- a/stwzhj-modules/stwzhj-system/src/main/resources/mapper/system/DeviceRedisMapper.xml +++ b/stwzhj-modules/stwzhj-system/src/main/resources/mapper/system/DeviceRedisMapper.xml @@ -24,6 +24,11 @@ (online,zzjgdm,gps_time) =(EXCLUDED.online,EXCLUDED.zzjgdm,EXCLUDED.gps_time) + + + - SELECT d.dept_id zzjgdm,short_name zzjgmc,COALESCE(td.co,0) co,COALESCE(rd.online,0) online FROM + SELECT d.dept_id zzjgdm,short_name zzjgmc,COALESCE(td.co,0) co,COALESCE(rd.online,0) onlineCo FROM sys_dept d LEFT JOIN (SELECT substr(zzjgdm, 1, 4) dept_id,count(*) co from (SELECT * FROM t_device @@ -60,10 +65,10 @@ on substr(d.dept_id,1,4) = td.dept_id LEFT JOIN - (SELECT substr(zzjgdm, 1, 4) dept_id,count(*) online from (SELECT * FROM t_device_redis + (SELECT info_source dept_id,count(*) online from (SELECT * FROM t_device_redis WHERE online = '1' ) r - GROUP BY substr(zzjgdm,1, 4) ) rd + GROUP BY info_source ) rd on substr(d.dept_id,1,4) = rd.dept_id WHERE d.parent_id = '0' and d.dept_id != '340000000000' ORDER BY zzjgdm diff --git a/stwzhj-modules/stwzhj-system/src/main/resources/mapper/system/TDeviceMapper.xml b/stwzhj-modules/stwzhj-system/src/main/resources/mapper/system/TDeviceMapper.xml index bd50034b..a172aef4 100644 --- a/stwzhj-modules/stwzhj-system/src/main/resources/mapper/system/TDeviceMapper.xml +++ b/stwzhj-modules/stwzhj-system/src/main/resources/mapper/system/TDeviceMapper.xml @@ -19,4 +19,75 @@ GROUP BY substr(zzjgdm,1, 4),device_type HAVING substr(zzjgdm,1,4) is not null ORDER BY zzjgdm + + INSERT INTO t_device ( + device_code, device_type, info_source, zzjgdm, zzjgmc, + police_no, police_name, remark1, remark2, create_time, update_time, + car_num, sbpp, sbxh, card_num, phone_num, gbbm, tdbm + ) VALUES + + ( + #{item.deviceCode}, #{item.deviceType}, #{item.infoSource}, + #{item.zzjgdm}, #{item.zzjgmc}, #{item.policeNo}, #{item.policeName}, + #{item.remark1}, #{item.remark2}, + COALESCE(#{item.createTime}, NOW()), + NOW(), + #{item.carNum}, #{item.sbpp}, #{item.sbxh}, + #{item.cardNum}, #{item.phoneNum}, #{item.gbbm}, #{item.tdbm} + ) + + ON CONFLICT (device_code, info_source) + DO UPDATE SET + device_type = EXCLUDED.device_type, + zzjgdm = CASE + WHEN EXCLUDED.zzjgdm IS NULL OR EXCLUDED.zzjgdm = '' + THEN t_device.zzjgdm + ELSE EXCLUDED.zzjgdm + END, + zzjgmc = CASE + WHEN EXCLUDED.zzjgmc IS NULL OR EXCLUDED.zzjgmc = '' + THEN t_device.zzjgmc + ELSE EXCLUDED.zzjgmc + END, + police_no = CASE + WHEN EXCLUDED.police_no IS NULL OR EXCLUDED.police_no = '' + THEN t_device.police_no + ELSE EXCLUDED.police_no + END, + police_name = CASE + WHEN EXCLUDED.police_name IS NULL OR EXCLUDED.police_name = '' + THEN t_device.police_name + ELSE EXCLUDED.police_name + END, + remark1 = EXCLUDED.remark1, + remark2 = EXCLUDED.remark2, + update_time = NOW(), + car_num = CASE + WHEN EXCLUDED.car_num IS NULL OR EXCLUDED.car_num = '' + THEN t_device.car_num + ELSE EXCLUDED.car_num + END, + sbpp = EXCLUDED.sbpp, + sbxh = EXCLUDED.sbxh, + card_num = CASE + WHEN EXCLUDED.card_num IS NULL OR EXCLUDED.card_num = '' + THEN t_device.card_num + ELSE EXCLUDED.card_num + END, + phone_num = CASE + WHEN EXCLUDED.phone_num IS NULL OR EXCLUDED.phone_num = '' + THEN t_device.phone_num + ELSE EXCLUDED.phone_num + END, + gbbm = CASE + WHEN EXCLUDED.gbbm IS NULL OR EXCLUDED.gbbm = '' + THEN t_device.gbbm + ELSE EXCLUDED.gbbm + END, + tdbm = CASE + WHEN EXCLUDED.tdbm IS NULL OR EXCLUDED.tdbm = '' + THEN t_device.tdbm + ELSE EXCLUDED.tdbm + END +