diff --git a/stwzhj-modules/stwzhj-location/pom.xml b/stwzhj-modules/stwzhj-location/pom.xml index 536bb98f..f3fcd9db 100644 --- a/stwzhj-modules/stwzhj-location/pom.xml +++ b/stwzhj-modules/stwzhj-location/pom.xml @@ -93,6 +93,11 @@ stwzhj-common-encrypt + + org.dromara + stwzhj-common-redis + + org.dromara @@ -103,7 +108,12 @@ org.dromara stwzhj-api-resource - + + + org.dromara + stwzhj-api-data2es + + org.elasticsearch.client elasticsearch-rest-client @@ -132,6 +142,48 @@ + + org.apache.kafka + kafka_2.12 + 3.6.1-h0.cbu.mrs.350.r11 + + + org.apache.zookeeper + zookeeper + + + net.sf.jopt-simple + jopt-simple + + + com.huawei.mrs + manager-wc2frm + + + org.apache.kafka + kafka-clients + + + org.xerial.snappy + snappy-java + + + com.huawei.mrs + om-controller-api + + + com.101tec + zkclient + + + + + + org.apache.kafka + kafka-clients + 3.6.1-h0.cbu.mrs.350.r11 + + org.locationtech.jts @@ -146,12 +198,6 @@ 0.8 - - - org.springframework.kafka - spring-kafka - - diff --git a/stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/config/ElasticsearchConfig.java b/stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/config/ElasticsearchConfig.java index 0d0f185f..075ca4ef 100644 --- a/stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/config/ElasticsearchConfig.java +++ b/stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/config/ElasticsearchConfig.java @@ -1,16 +1,12 @@ package org.dromara.location.config; -import lombok.Data; -import lombok.extern.slf4j.Slf4j; import org.apache.http.HttpHost; import org.apache.http.client.CredentialsProvider; import org.apache.http.impl.client.BasicCredentialsProvider; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestClientBuilder; import org.elasticsearch.client.RestHighLevelClient; -import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; import java.util.ArrayList; import java.util.List; @@ -18,10 +14,11 @@ import java.util.List; /** * restHighLevelClient 客户端配置类 */ -@Slf4j -//@Data -//@Configuration -//@ConfigurationProperties(prefix = "elasticsearch") + +/*@Slf4j +@Data +@Configuration +@ConfigurationProperties(prefix = "elasticsearch")*/ public class ElasticsearchConfig { // es host ip 地址(集群) @@ -83,7 +80,7 @@ public class ElasticsearchConfig { }); restHighLevelClient = new RestHighLevelClient(builder); } catch (NumberFormatException e) { - log.error("ES 连接池初始化异常"); +// log.error("ES 连接池初始化异常"); } return restHighLevelClient; } diff --git a/stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/config/EsConfig.java b/stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/config/EsConfig.java index cf4324bc..242dde1f 100644 --- a/stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/config/EsConfig.java +++ b/stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/config/EsConfig.java @@ -26,7 +26,7 @@ public class EsConfig { @Bean(destroyMethod = "close",name = "restHighLevelClient") public RestHighLevelClient restClient() { // String configPath = System.getProperty("user.dir") + File.separator+ "app_data2es_aq" + File.separator + "conf" + File.separator; - String configPath = "/home/rsoft/config/"; + String configPath = "/rsoft/config/"; // KAFKA("KafkaClient"), ZOOKEEPER("Client"); // GenerateEnumUtil.addEnum(LoginUtil.Module.class,"KAFKA","KafkaClient"); diff --git a/stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/config/HwRestClient.java b/stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/config/HwRestClient.java new file mode 100644 index 00000000..763be69f --- /dev/null +++ b/stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/config/HwRestClient.java @@ -0,0 +1,227 @@ +package org.dromara.location.config; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.http.Header; +import org.apache.http.HttpHost; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.message.BasicHeader; +import org.elasticsearch.client.Node; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestClientBuilder; +import org.springframework.core.io.ClassPathResource; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +/** + *

description:

+ * + * @author chenle + * @date 2021-11-01 14:25 + */ +public class HwRestClient { + private static final Log LOG = LogFactory.getLog(HwRestClient.class); + private static String isSecureMode; + private static String esServerHost; + private static int connectTimeout; + private static int socketTimeout; + private static int connectionRequestTimeout; + private static int maxConnTotal; + private static int maxConnPerRoute; + private static String principal; + private static final String UN_SECURITY_MODE = "false"; + private static final String COLON = ":"; + private static final String COMMA = ","; + private static HttpHost[] hostArray; + private String configPath; + private static boolean SNIFFER_ENABLE = false; + + private static final String username = "yhy_ahrs_rcw@HADOOP.COM"; + + public static final String password = "Ycgis!2509"; + + public HwRestClient(String configPath) { + this.configPath = configPath; + if (!this.getConfig()) { + LOG.error("Get config failed."); + } + + } + + public HwRestClient() { + /*this.configPath = System.getProperty("user.dir") + File.separator + "conf" + File.separator; + if (!this.getConfig()) { + LOG.error("Get config failed."); + }*/ + if (!this.getConfig()) { + LOG.error("Get config failed."); + } + this.configPath = "/rsoft/config/"; + } + + private boolean getConfig() { + Properties properties = new Properties(); + ClassPathResource classPathResource = new ClassPathResource("/esParams.properties"); + + try { + properties.load(classPathResource.getInputStream()); + } catch (IOException var5) { + LOG.error("Failed to load properties file : " + var5.getMessage()); + return false; + } + + try { + esServerHost = properties.getProperty("esServerHost"); + connectTimeout = Integer.valueOf(properties.getProperty("connectTimeout")); + socketTimeout = Integer.valueOf(properties.getProperty("socketTimeout")); + connectionRequestTimeout = Integer.valueOf(properties.getProperty("connectionRequestTimeout")); + maxConnPerRoute = Integer.valueOf(properties.getProperty("maxConnPerRoute")); + maxConnTotal = Integer.valueOf(properties.getProperty("maxConnTotal")); + isSecureMode = properties.getProperty("isSecureMode"); + principal = properties.getProperty("principal"); + SNIFFER_ENABLE = Boolean.valueOf(properties.getProperty("snifferEnable")); + LOG.info("esServerHost:" + esServerHost); + LOG.info("connectTimeout:" + connectTimeout); + LOG.info("socketTimeout:" + socketTimeout); + LOG.info("connectionRequestTimeout:" + connectionRequestTimeout); + LOG.info("maxConnPerRouteTotal:" + maxConnPerRoute); + LOG.info("maxConnTotal:" + maxConnTotal); + LOG.info("isSecureMode:" + isSecureMode); + LOG.info("principal:" + principal); + return true; + } catch (NumberFormatException var4) { + LOG.error("Failed to get parameters !", var4); + return false; + } + } + + public boolean isSnifferEnable() { + return SNIFFER_ENABLE; + } + + private HttpHost[] getHostArray() { + String schema; + if ("false".equals(isSecureMode)) { + schema = "http"; + } else { + schema = "https"; + } + + List hosts = new ArrayList(); + String[] hostArray1 = esServerHost.split(","); + String[] var4 = hostArray1; + int var5 = hostArray1.length; + + for (int var6 = 0; var6 < var5; ++var6) { + String host = var4[var6]; + String[] ipPort = host.split(":"); + HttpHost hostNew = new HttpHost(ipPort[0], Integer.valueOf(ipPort[1]), schema); + hosts.add(hostNew); + } + + return (HttpHost[]) hosts.toArray(new HttpHost[0]); + } + + private void setSecConfig() { + try { + LOG.info("Config path is " + this.configPath); + LoginUtil.setJaasFile(principal, this.configPath + "user.keytab"); + LoginUtil.setKrb5Config(this.configPath + "krb5.conf"); + System.setProperty("elasticsearch.kerberos.jaas.appname", "EsClient"); + System.setProperty("es.security.indication", "true"); + LOG.info("es.security.indication is " + System.getProperty("es.security.indication")); + } catch (Exception var2) { + LOG.error("Failed to set security conf", var2); + } + + } + + public RestClientBuilder getRestClientBuilder() { + hostArray = getHostArray(); + if ("false".equals(isSecureMode)) { + System.setProperty("es.security.indication", "false"); + } else { + setSecConfig(); + } + + RestClientBuilder builder = RestClient.builder(hostArray); + Header[] defaultHeaders = new Header[]{new BasicHeader("Accept", "application/json"), new BasicHeader("Content-type", "application/json")}; + builder.setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() { + @Override + public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder requestConfigBuilder) { + + return requestConfigBuilder.setConnectTimeout(HwRestClient.connectTimeout). + setSocketTimeout(HwRestClient.socketTimeout). + setConnectionRequestTimeout(HwRestClient.connectionRequestTimeout); + } + }); + builder.setDefaultHeaders(defaultHeaders); + + /* es https */ + /*try { + final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials(AuthScope.ANY, + new UsernamePasswordCredentials(username, password)); + + SSLContext sslContext = new SSLContextBuilder().loadTrustMaterial(null, new TrustStrategy() { + // 信任所有 + @Override + public boolean isTrusted(X509Certificate[] chain, String authType) throws CertificateException { + return true; + } + }).build(); + SSLIOSessionStrategy sessionStrategy = new SSLIOSessionStrategy(sslContext, NoopHostnameVerifier.INSTANCE); + builder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() { + @Override + public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) { + httpClientBuilder.disableAuthCaching(); + //httpClientBuilder.setSSLStrategy(sessionStrategy); + httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider); + return httpClientBuilder; + } + }); + + }catch (Exception e){ + e.printStackTrace(); + }*/ + /* es https */ + return builder; + } + + public RestClient getRestClient() { + if (this.ifConfigWasWrong()) { + return null; + } else { + RestClientBuilder restClientBuilder = this.getRestClientBuilder(); + RestClient restClient = restClientBuilder.build(); + this.setNodes(restClient); + LOG.info("The Low Level Rest Client has been created."); + return restClient; + } + } + + private boolean ifConfigWasWrong() { + if (this.configPath != null && this.configPath.length() != 0) { + return false; + } else { + LOG.info("Config path is not allowed to be empty."); + return true; + } + } + + private void setNodes(RestClient restClient) { + List nodes = new ArrayList(); + HttpHost[] var3 = hostArray; + int var4 = var3.length; + + for (int var5 = 0; var5 < var4; ++var5) { + HttpHost httpHost = var3[var5]; + nodes.add(new Node(httpHost)); + } + + restClient.setNodes(nodes); + } +} diff --git a/stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/config/KafkaProperties.java b/stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/config/KafkaProperties.java new file mode 100644 index 00000000..05a08c97 --- /dev/null +++ b/stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/config/KafkaProperties.java @@ -0,0 +1,138 @@ +package org.dromara.location.config; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.util.Properties; + +public final class KafkaProperties +{ + private static final Logger LOG = LoggerFactory.getLogger(KafkaProperties.class); + + // Topic名称,安全模式下,需要以管理员用户添加当前用户的访问权限 + public final static String TOPIC = "jysb_dwxx"; + + private static Properties serverProps = new Properties(); + + private static Properties producerProps = new Properties(); + + private static Properties consumerProps = new Properties(); + + private static Properties clientProps = new Properties(); + + private static KafkaProperties instance = null; + + private KafkaProperties() + { +// String filePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator; + String filePath = "/home/rsoft/config/"; + LOG.info("路径=={}",filePath); + try + { + File proFile = new File(filePath + "producer.properties"); + + if (proFile.exists()) + { + producerProps.load(new FileInputStream(filePath + "producer.properties")); + } + + File conFile = new File(filePath + "producer.properties"); + + if (conFile.exists()) + { + consumerProps.load(new FileInputStream(filePath + "consumer.properties")); + } + + File serFile = new File(filePath + "server.properties"); + + if (serFile.exists()) + { + serverProps.load(new FileInputStream(filePath + "server.properties")); + } + + File cliFile = new File(filePath + "client.properties"); + + if (cliFile.exists()) + { + clientProps.load(new FileInputStream(filePath + "client.properties")); + } + } + catch (IOException e) + { + LOG.info("The Exception occured.", e); + } + } + + public synchronized static KafkaProperties getInstance() + { + if (null == instance) + { + instance = new KafkaProperties(); + } + + return instance; + } + + /** + * 获取参数值 + * @param key properites的key值 + * @param defValue 默认值 + * @return + */ + public String getValues(String key, String defValue) + { + String rtValue = null; + + if (null == key) + { + LOG.error("key is null"); + } + else + { + rtValue = getPropertiesValue(key); + } + + if (null == rtValue) + { + LOG.warn("KafkaProperties.getValues return null, key is " + key); + rtValue = defValue; + } + + LOG.info("KafkaProperties.getValues: key is " + key + "; Value is " + rtValue); + + return rtValue; + } + + /** + * 根据key值获取server.properties的值 + * @param key + * @return + */ + private String getPropertiesValue(String key) + { + String rtValue = serverProps.getProperty(key); + + // server.properties中没有,则再向producer.properties中获取 + if (null == rtValue) + { + rtValue = producerProps.getProperty(key); + } + + // producer中没有,则再向consumer.properties中获取 + if (null == rtValue) + { + rtValue = consumerProps.getProperty(key); + } + + // consumer没有,则再向client.properties中获取 + if (null == rtValue) + { + rtValue = clientProps.getProperty(key); + } + + return rtValue; + } +} diff --git a/stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/config/LoginUtil.java b/stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/config/LoginUtil.java new file mode 100644 index 00000000..1de053aa --- /dev/null +++ b/stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/config/LoginUtil.java @@ -0,0 +1,259 @@ +package org.dromara.location.config; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileWriter; +import java.io.IOException; +import java.util.Properties; + +public class LoginUtil { + private static final Logger LOG = LoggerFactory.getLogger(LoginUtil.class); + + /** + * no JavaDoc + */ + public enum Module { + KAFKA("KafkaClient"), ZOOKEEPER("Client"); + + private String name; + + private Module(String name) + { + this.name = name; + } + + public String getName() + { + return name; + } + } + + /** + * line operator string + */ + private static final String LINE_SEPARATOR = System.getProperty("line.separator"); + + /** + * jaas file postfix + */ + private static final String JAAS_POSTFIX = ".jaas.conf"; + + /** + * is IBM jdk or not + */ + private static final boolean IS_IBM_JDK = System.getProperty("java.vendor").contains("IBM"); + + /** + * IBM jdk login module + */ + private static final String IBM_LOGIN_MODULE = "com.ibm.security.auth.module.Krb5LoginModule required"; + + /** + * oracle jdk login module + */ + private static final String SUN_LOGIN_MODULE = "com.sun.security.auth.module.Krb5LoginModule required"; + + /** + * Zookeeper quorum principal. + */ + public static final String ZOOKEEPER_AUTH_PRINCIPAL = "zookeeper.server.principal"; + + /** + * java security krb5 file path + */ + public static final String JAVA_SECURITY_KRB5_CONF = "java.security.krb5.conf"; + + /** + * java security login file path + */ + public static final String JAVA_SECURITY_LOGIN_CONF = "java.security.auth.login.config"; + + /** + * 设置jaas.conf文件 + * + * @param principal + * @param keytabPath + * @throws IOException + */ + public static void setJaasFile(String principal, String keytabPath) + throws IOException { + String jaasPath = + new File(System.getProperty("java.io.tmpdir")) + File.separator + System.getProperty("user.name") + + JAAS_POSTFIX; + + // windows路径下分隔符替换 + jaasPath = jaasPath.replace("\\", "\\\\"); + // 删除jaas文件 + deleteJaasFile(jaasPath); + writeJaasFile(jaasPath, principal, keytabPath); + System.setProperty(JAVA_SECURITY_LOGIN_CONF, jaasPath); + } + + /** + * 设置zookeeper服务端principal + * + * @param zkServerPrincipal + * @throws IOException + */ + public static void setZookeeperServerPrincipal(String zkServerPrincipal) + throws IOException { + System.setProperty(ZOOKEEPER_AUTH_PRINCIPAL, zkServerPrincipal); + String ret = System.getProperty(ZOOKEEPER_AUTH_PRINCIPAL); + if (ret == null) + { + throw new IOException(ZOOKEEPER_AUTH_PRINCIPAL + " is null."); + } + if (!ret.equals(zkServerPrincipal)) + { + throw new IOException(ZOOKEEPER_AUTH_PRINCIPAL + " is " + ret + " is not " + zkServerPrincipal + "."); + } + } + + /** + * 设置krb5文件 + * + * @param krb5ConfFile + * @throws IOException + */ + public static void setKrb5Config(String krb5ConfFile) + throws IOException { + System.setProperty(JAVA_SECURITY_KRB5_CONF, krb5ConfFile); + String ret = System.getProperty(JAVA_SECURITY_KRB5_CONF); + if (ret == null) + { + throw new IOException(JAVA_SECURITY_KRB5_CONF + " is null."); + } + if (!ret.equals(krb5ConfFile)) + { + throw new IOException(JAVA_SECURITY_KRB5_CONF + " is " + ret + " is not " + krb5ConfFile + "."); + } + } + + /** + * 写入jaas文件 + * + * @throws IOException + * 写文件异常 + */ + private static void writeJaasFile(String jaasPath, String principal, String keytabPath) + throws IOException { + FileWriter writer = new FileWriter(new File(jaasPath)); + try + { + writer.write(getJaasConfContext(principal, keytabPath)); + writer.flush(); + } + catch (IOException e) + { + throw new IOException("Failed to create jaas.conf File"); + } + finally + { + writer.close(); + } + } + + private static void deleteJaasFile(String jaasPath) + throws IOException { + File jaasFile = new File(jaasPath); + if (jaasFile.exists()) + { + if (!jaasFile.delete()) + { + throw new IOException("Failed to delete exists jaas file."); + } + } + } + + private static String getJaasConfContext(String principal, String keytabPath) { + Module[] allModule = Module.values(); + StringBuilder builder = new StringBuilder(); + for (Module modlue : allModule) + { + builder.append(getModuleContext(principal, keytabPath, modlue)); + } + return builder.toString(); + } + + private static String getModuleContext(String userPrincipal, String keyTabPath, Module module) { + StringBuilder builder = new StringBuilder(); + if (IS_IBM_JDK) { + builder.append(module.getName()).append(" {").append(LINE_SEPARATOR); + builder.append(IBM_LOGIN_MODULE).append(LINE_SEPARATOR); + builder.append("credsType=both").append(LINE_SEPARATOR); + builder.append("principal=\"" + userPrincipal + "\"").append(LINE_SEPARATOR); + builder.append("useKeytab=\"" + keyTabPath + "\"").append(LINE_SEPARATOR); + builder.append("debug=true;").append(LINE_SEPARATOR); + builder.append("};").append(LINE_SEPARATOR); + } else { + builder.append(module.getName()).append(" {").append(LINE_SEPARATOR); + builder.append(SUN_LOGIN_MODULE).append(LINE_SEPARATOR); + builder.append("useKeyTab=true").append(LINE_SEPARATOR); + builder.append("keyTab=\"" + keyTabPath + "\"").append(LINE_SEPARATOR); + builder.append("principal=\"" + userPrincipal + "\"").append(LINE_SEPARATOR); + builder.append("useTicketCache=false").append(LINE_SEPARATOR); + builder.append("storeKey=true").append(LINE_SEPARATOR); + builder.append("debug=true;").append(LINE_SEPARATOR); + builder.append("};").append(LINE_SEPARATOR); + } + + return builder.toString(); + } + + public static void securityPrepare(String principal, String keyTabFile) throws IOException { +// String filePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator; + String filePath = "/home/rsoft/config/"; + String krbFile = filePath + "krb5.conf"; + String userKeyTableFile = filePath + keyTabFile; + + // windows路径下分隔符替换 + userKeyTableFile = userKeyTableFile.replace("\\", "\\\\"); + krbFile = krbFile.replace("\\", "\\\\"); + + LoginUtil.setKrb5Config(krbFile); + LoginUtil.setZookeeperServerPrincipal("zookeeper/hadoop.hadoop.com"); + LoginUtil.setJaasFile(principal, userKeyTableFile); + } + + /** + * Check security mode + * + * @return boolean + */ + public static Boolean isSecurityModel() { + Boolean isSecurity = false; +// String krbFilePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator + "kafkaSecurityMode"; + String krbFilePath = "/home/rsoft/config/kafkaSecurityMode"; + Properties securityProps = new Properties(); + + // file does not exist. + if (!isFileExists(krbFilePath)) { + return isSecurity; + } + + try { + securityProps.load(new FileInputStream(krbFilePath)); + + if ("yes".equalsIgnoreCase(securityProps.getProperty("kafka.client.security.mode"))) + { + isSecurity = true; + } + } catch (Exception e) { + LOG.info("The Exception occured : {}.", e); + } + + return isSecurity; + } + + /* + * 判断文件是否存在 + */ + private static boolean isFileExists(String fileName) { + File file = new File(fileName); + + return file.exists(); + } +} diff --git a/stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/service/impl/SearchServiceImpl.java b/stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/service/impl/SearchServiceImpl.java index 5fe4bc1b..4d283538 100644 --- a/stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/service/impl/SearchServiceImpl.java +++ b/stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/service/impl/SearchServiceImpl.java @@ -41,7 +41,7 @@ import java.util.function.Consumer; public class SearchServiceImpl implements ISearchService { @Resource(name = "restHighLevelClient") - private RestHighLevelClient restHighLevelClient; + private RestHighLevelClient restHighLevelClient; @Override diff --git a/stwzhj-modules/stwzhj-location/src/main/resources/application.yml b/stwzhj-modules/stwzhj-location/src/main/resources/application.yml index 7ab487c2..525847aa 100644 --- a/stwzhj-modules/stwzhj-location/src/main/resources/application.yml +++ b/stwzhj-modules/stwzhj-location/src/main/resources/application.yml @@ -10,6 +10,8 @@ spring: profiles: # 环境配置 active: @profiles.active@ + autoconfigure: + exclude: org.springframework.boot.autoconfigure.elasticsearch.ElasticsearchRestClientAutoConfiguration --- # nacos 配置 spring: diff --git a/stwzhj-modules/stwzhj-location/src/main/resources/kafkaSecurityMode b/stwzhj-modules/stwzhj-location/src/main/resources/kafkaSecurityMode new file mode 100644 index 00000000..ed59a5e4 --- /dev/null +++ b/stwzhj-modules/stwzhj-location/src/main/resources/kafkaSecurityMode @@ -0,0 +1 @@ +kafka.client.security.mode = yes diff --git a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/domain/DeviceRedis.java b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/domain/DeviceRedis.java index 542a6acb..f08ae1bc 100644 --- a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/domain/DeviceRedis.java +++ b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/domain/DeviceRedis.java @@ -24,4 +24,6 @@ public class DeviceRedis { private Date gpsTime; + private String infoSource; + } diff --git a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/service/impl/DeviceRedisServiceImpl.java b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/service/impl/DeviceRedisServiceImpl.java index bca2366e..d86b6cf5 100644 --- a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/service/impl/DeviceRedisServiceImpl.java +++ b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/service/impl/DeviceRedisServiceImpl.java @@ -3,6 +3,7 @@ package org.dromara.system.service.impl; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.apache.ibatis.executor.BatchResult; import org.apache.ibatis.session.ExecutorType; import org.apache.ibatis.session.SqlSession; import org.apache.ibatis.session.SqlSessionFactory; @@ -18,7 +19,10 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; +import java.util.ArrayList; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; @Slf4j @RequiredArgsConstructor @@ -35,34 +39,76 @@ public class DeviceRedisServiceImpl implements IDeviceRedisService { SqlSessionFactory sqlSessionFactory; @Override - @Transactional(rollbackFor = Exception.class) public int insertBatch(List list) { + log.error("本次查询离线在线数据大小={}", list.size()); + + // 在插入前对数据进行去重(基于唯一约束) + Map uniqueMap = new LinkedHashMap<>(); + for (DeviceRedis device : list) { + String key = device.getDeviceCode() + "|" + + device.getDeviceType() + "|" + + device.getInfoSource(); + // 保留最后一次出现的记录(根据业务需求调整) + uniqueMap.put(key, device); + } + List uniqueList = new ArrayList<>(uniqueMap.values()); + + log.error("去重后数据大小={}", uniqueList.size()); + int groupSize = 2000; - int groupNo = list.size() / groupSize; - SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH); - int num = 0; + int totalSize = uniqueList.size(); + SqlSession sqlSession = null; + int totalAffectedRows = 0; + try { - if (list.size() <= groupSize) { - num += baseMapper.insertBatch(list); - } else { - List subList=null; - for (int i = 0; i < groupNo; i++) { - subList = list.subList(0, groupSize); - num += baseMapper.insertBatch(subList); - list.subList(0, groupSize).clear(); - } - if (list.size() > 0) { - num += baseMapper.insertBatch(list); + sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH, false); // 手动控制事务 + DeviceRedisMapper batchMapper = sqlSession.getMapper(DeviceRedisMapper.class); + + // 详细记录分组信息 + log.error("数据分组信息: 总大小={}, 分组大小={}, 预计分组数={}", + totalSize, groupSize, (int) Math.ceil((double) totalSize / groupSize)); + + // 分组处理(不修改原始list) + for (int i = 0; i < totalSize; i += groupSize) { + int toIndex = Math.min(i + groupSize, totalSize); + List subList = uniqueList.subList(i, toIndex); + + try { + batchMapper.insertBatch(subList); + log.error("分组 {} 成功提交: {}~{} 条", + i/groupSize+1, i, toIndex); + } catch (Exception e) { + // 详细记录每个分组的异常 + log.error("分组 {} 插入失败 ({}~{} 条): {}", + i/groupSize+1, i, toIndex, e.getMessage(), e); } } - sqlSession.flushStatements(); - return num; - }catch (Exception e){ - e.printStackTrace(); - }finally { - sqlSession.close(); - return num; + + // 显式提交事务并获取影响行数 + List batchResults = sqlSession.flushStatements(); + sqlSession.commit(); + + // 统计实际影响行数 + for (BatchResult br : batchResults) { + for (int count : br.getUpdateCounts()) { + totalAffectedRows += count; + } + } + } catch (Exception e) { + // 记录全局异常 + log.error("批量插入整体失败: {}", e.getMessage(), e); + if (sqlSession != null) { + sqlSession.rollback(); + log.error("事务已回滚"); + } + } finally { + if (sqlSession != null) { + sqlSession.close(); + } + log.error("实际入库数量={}/{}", totalAffectedRows, totalSize); } + + return totalAffectedRows; } @Override diff --git a/stwzhj-modules/stwzhj-system/src/main/resources/mapper/system/DeviceRedisMapper.xml b/stwzhj-modules/stwzhj-system/src/main/resources/mapper/system/DeviceRedisMapper.xml index 2e7ba5b1..58432343 100644 --- a/stwzhj-modules/stwzhj-system/src/main/resources/mapper/system/DeviceRedisMapper.xml +++ b/stwzhj-modules/stwzhj-system/src/main/resources/mapper/system/DeviceRedisMapper.xml @@ -13,15 +13,15 @@ - insert into t_device_redis (device_code,device_type,online,zzjgdm,gps_time) + insert into t_device_redis (device_code,device_type,online,zzjgdm,gps_time,info_source) values ( - #{entity.deviceCode},#{entity.deviceType},#{entity.online},#{entity.zzjgdm},#{entity.gpsTime} + #{entity.deviceCode},#{entity.deviceType},#{entity.online},#{entity.zzjgdm},#{entity.gpsTime},#{entity.infoSource} ) - ON conflict(device_code,device_type) do update set - (online,zzjgdm,gps_time) =(EXCLUDED.online,EXCLUDED.zzjgdm,EXCLUDED.gpsTime) + ON conflict(device_code,device_type,info_source) do update set + (online,zzjgdm,gps_time) =(EXCLUDED.online,EXCLUDED.zzjgdm,EXCLUDED.gps_time)