ES认证报错修改
parent
bd7fbfbde4
commit
549e085df8
|
|
@ -93,6 +93,11 @@
|
||||||
<artifactId>stwzhj-common-encrypt</artifactId>
|
<artifactId>stwzhj-common-encrypt</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.dromara</groupId>
|
||||||
|
<artifactId>stwzhj-common-redis</artifactId>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
<!-- RuoYi Api System -->
|
<!-- RuoYi Api System -->
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.dromara</groupId>
|
<groupId>org.dromara</groupId>
|
||||||
|
|
@ -103,7 +108,12 @@
|
||||||
<groupId>org.dromara</groupId>
|
<groupId>org.dromara</groupId>
|
||||||
<artifactId>stwzhj-api-resource</artifactId>
|
<artifactId>stwzhj-api-resource</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
<!--elasticsearch-->
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.dromara</groupId>
|
||||||
|
<artifactId>stwzhj-api-data2es</artifactId>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.elasticsearch.client</groupId>
|
<groupId>org.elasticsearch.client</groupId>
|
||||||
<artifactId>elasticsearch-rest-client</artifactId>
|
<artifactId>elasticsearch-rest-client</artifactId>
|
||||||
|
|
@ -132,6 +142,48 @@
|
||||||
|
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.kafka</groupId>
|
||||||
|
<artifactId>kafka_2.12</artifactId>
|
||||||
|
<version>3.6.1-h0.cbu.mrs.350.r11</version>
|
||||||
|
<exclusions>
|
||||||
|
<exclusion>
|
||||||
|
<groupId>org.apache.zookeeper</groupId>
|
||||||
|
<artifactId>zookeeper</artifactId>
|
||||||
|
</exclusion>
|
||||||
|
<exclusion>
|
||||||
|
<groupId>net.sf.jopt-simple</groupId>
|
||||||
|
<artifactId>jopt-simple</artifactId>
|
||||||
|
</exclusion>
|
||||||
|
<exclusion>
|
||||||
|
<groupId>com.huawei.mrs</groupId>
|
||||||
|
<artifactId>manager-wc2frm</artifactId>
|
||||||
|
</exclusion>
|
||||||
|
<exclusion>
|
||||||
|
<groupId>org.apache.kafka</groupId>
|
||||||
|
<artifactId>kafka-clients</artifactId>
|
||||||
|
</exclusion>
|
||||||
|
<exclusion>
|
||||||
|
<groupId>org.xerial.snappy</groupId>
|
||||||
|
<artifactId>snappy-java</artifactId>
|
||||||
|
</exclusion>
|
||||||
|
<exclusion>
|
||||||
|
<groupId>com.huawei.mrs</groupId>
|
||||||
|
<artifactId>om-controller-api</artifactId>
|
||||||
|
</exclusion>
|
||||||
|
<exclusion>
|
||||||
|
<groupId>com.101tec</groupId>
|
||||||
|
<artifactId>zkclient</artifactId>
|
||||||
|
</exclusion>
|
||||||
|
</exclusions>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.kafka</groupId>
|
||||||
|
<artifactId>kafka-clients</artifactId>
|
||||||
|
<version>3.6.1-h0.cbu.mrs.350.r11</version>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
<!-- JTS 几何库 -->
|
<!-- JTS 几何库 -->
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.locationtech.jts</groupId>
|
<groupId>org.locationtech.jts</groupId>
|
||||||
|
|
@ -146,12 +198,6 @@
|
||||||
<version>0.8</version>
|
<version>0.8</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
<!-- kafka -->
|
|
||||||
<dependency>
|
|
||||||
<groupId>org.springframework.kafka</groupId>
|
|
||||||
<artifactId>spring-kafka</artifactId>
|
|
||||||
</dependency>
|
|
||||||
|
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
<build>
|
<build>
|
||||||
|
|
|
||||||
|
|
@ -1,16 +1,12 @@
|
||||||
package org.dromara.location.config;
|
package org.dromara.location.config;
|
||||||
|
|
||||||
import lombok.Data;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
|
||||||
import org.apache.http.HttpHost;
|
import org.apache.http.HttpHost;
|
||||||
import org.apache.http.client.CredentialsProvider;
|
import org.apache.http.client.CredentialsProvider;
|
||||||
import org.apache.http.impl.client.BasicCredentialsProvider;
|
import org.apache.http.impl.client.BasicCredentialsProvider;
|
||||||
import org.elasticsearch.client.RestClient;
|
import org.elasticsearch.client.RestClient;
|
||||||
import org.elasticsearch.client.RestClientBuilder;
|
import org.elasticsearch.client.RestClientBuilder;
|
||||||
import org.elasticsearch.client.RestHighLevelClient;
|
import org.elasticsearch.client.RestHighLevelClient;
|
||||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
|
||||||
import org.springframework.context.annotation.Bean;
|
import org.springframework.context.annotation.Bean;
|
||||||
import org.springframework.context.annotation.Configuration;
|
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
@ -18,10 +14,11 @@ import java.util.List;
|
||||||
/**
|
/**
|
||||||
* restHighLevelClient 客户端配置类
|
* restHighLevelClient 客户端配置类
|
||||||
*/
|
*/
|
||||||
@Slf4j
|
|
||||||
//@Data
|
/*@Slf4j
|
||||||
//@Configuration
|
@Data
|
||||||
//@ConfigurationProperties(prefix = "elasticsearch")
|
@Configuration
|
||||||
|
@ConfigurationProperties(prefix = "elasticsearch")*/
|
||||||
public class ElasticsearchConfig {
|
public class ElasticsearchConfig {
|
||||||
|
|
||||||
// es host ip 地址(集群)
|
// es host ip 地址(集群)
|
||||||
|
|
@ -83,7 +80,7 @@ public class ElasticsearchConfig {
|
||||||
});
|
});
|
||||||
restHighLevelClient = new RestHighLevelClient(builder);
|
restHighLevelClient = new RestHighLevelClient(builder);
|
||||||
} catch (NumberFormatException e) {
|
} catch (NumberFormatException e) {
|
||||||
log.error("ES 连接池初始化异常");
|
// log.error("ES 连接池初始化异常");
|
||||||
}
|
}
|
||||||
return restHighLevelClient;
|
return restHighLevelClient;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -26,7 +26,7 @@ public class EsConfig {
|
||||||
@Bean(destroyMethod = "close",name = "restHighLevelClient")
|
@Bean(destroyMethod = "close",name = "restHighLevelClient")
|
||||||
public RestHighLevelClient restClient() {
|
public RestHighLevelClient restClient() {
|
||||||
// String configPath = System.getProperty("user.dir") + File.separator+ "app_data2es_aq" + File.separator + "conf" + File.separator;
|
// String configPath = System.getProperty("user.dir") + File.separator+ "app_data2es_aq" + File.separator + "conf" + File.separator;
|
||||||
String configPath = "/home/rsoft/config/";
|
String configPath = "/rsoft/config/";
|
||||||
|
|
||||||
// KAFKA("KafkaClient"), ZOOKEEPER("Client");
|
// KAFKA("KafkaClient"), ZOOKEEPER("Client");
|
||||||
// GenerateEnumUtil.addEnum(LoginUtil.Module.class,"KAFKA","KafkaClient");
|
// GenerateEnumUtil.addEnum(LoginUtil.Module.class,"KAFKA","KafkaClient");
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,227 @@
|
||||||
|
package org.dromara.location.config;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.http.Header;
|
||||||
|
import org.apache.http.HttpHost;
|
||||||
|
import org.apache.http.client.config.RequestConfig;
|
||||||
|
import org.apache.http.message.BasicHeader;
|
||||||
|
import org.elasticsearch.client.Node;
|
||||||
|
import org.elasticsearch.client.RestClient;
|
||||||
|
import org.elasticsearch.client.RestClientBuilder;
|
||||||
|
import org.springframework.core.io.ClassPathResource;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Properties;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>description: </p>
|
||||||
|
*
|
||||||
|
* @author chenle
|
||||||
|
* @date 2021-11-01 14:25
|
||||||
|
*/
|
||||||
|
public class HwRestClient {
|
||||||
|
private static final Log LOG = LogFactory.getLog(HwRestClient.class);
|
||||||
|
private static String isSecureMode;
|
||||||
|
private static String esServerHost;
|
||||||
|
private static int connectTimeout;
|
||||||
|
private static int socketTimeout;
|
||||||
|
private static int connectionRequestTimeout;
|
||||||
|
private static int maxConnTotal;
|
||||||
|
private static int maxConnPerRoute;
|
||||||
|
private static String principal;
|
||||||
|
private static final String UN_SECURITY_MODE = "false";
|
||||||
|
private static final String COLON = ":";
|
||||||
|
private static final String COMMA = ",";
|
||||||
|
private static HttpHost[] hostArray;
|
||||||
|
private String configPath;
|
||||||
|
private static boolean SNIFFER_ENABLE = false;
|
||||||
|
|
||||||
|
private static final String username = "yhy_ahrs_rcw@HADOOP.COM";
|
||||||
|
|
||||||
|
public static final String password = "Ycgis!2509";
|
||||||
|
|
||||||
|
public HwRestClient(String configPath) {
|
||||||
|
this.configPath = configPath;
|
||||||
|
if (!this.getConfig()) {
|
||||||
|
LOG.error("Get config failed.");
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public HwRestClient() {
|
||||||
|
/*this.configPath = System.getProperty("user.dir") + File.separator + "conf" + File.separator;
|
||||||
|
if (!this.getConfig()) {
|
||||||
|
LOG.error("Get config failed.");
|
||||||
|
}*/
|
||||||
|
if (!this.getConfig()) {
|
||||||
|
LOG.error("Get config failed.");
|
||||||
|
}
|
||||||
|
this.configPath = "/rsoft/config/";
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean getConfig() {
|
||||||
|
Properties properties = new Properties();
|
||||||
|
ClassPathResource classPathResource = new ClassPathResource("/esParams.properties");
|
||||||
|
|
||||||
|
try {
|
||||||
|
properties.load(classPathResource.getInputStream());
|
||||||
|
} catch (IOException var5) {
|
||||||
|
LOG.error("Failed to load properties file : " + var5.getMessage());
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
esServerHost = properties.getProperty("esServerHost");
|
||||||
|
connectTimeout = Integer.valueOf(properties.getProperty("connectTimeout"));
|
||||||
|
socketTimeout = Integer.valueOf(properties.getProperty("socketTimeout"));
|
||||||
|
connectionRequestTimeout = Integer.valueOf(properties.getProperty("connectionRequestTimeout"));
|
||||||
|
maxConnPerRoute = Integer.valueOf(properties.getProperty("maxConnPerRoute"));
|
||||||
|
maxConnTotal = Integer.valueOf(properties.getProperty("maxConnTotal"));
|
||||||
|
isSecureMode = properties.getProperty("isSecureMode");
|
||||||
|
principal = properties.getProperty("principal");
|
||||||
|
SNIFFER_ENABLE = Boolean.valueOf(properties.getProperty("snifferEnable"));
|
||||||
|
LOG.info("esServerHost:" + esServerHost);
|
||||||
|
LOG.info("connectTimeout:" + connectTimeout);
|
||||||
|
LOG.info("socketTimeout:" + socketTimeout);
|
||||||
|
LOG.info("connectionRequestTimeout:" + connectionRequestTimeout);
|
||||||
|
LOG.info("maxConnPerRouteTotal:" + maxConnPerRoute);
|
||||||
|
LOG.info("maxConnTotal:" + maxConnTotal);
|
||||||
|
LOG.info("isSecureMode:" + isSecureMode);
|
||||||
|
LOG.info("principal:" + principal);
|
||||||
|
return true;
|
||||||
|
} catch (NumberFormatException var4) {
|
||||||
|
LOG.error("Failed to get parameters !", var4);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isSnifferEnable() {
|
||||||
|
return SNIFFER_ENABLE;
|
||||||
|
}
|
||||||
|
|
||||||
|
private HttpHost[] getHostArray() {
|
||||||
|
String schema;
|
||||||
|
if ("false".equals(isSecureMode)) {
|
||||||
|
schema = "http";
|
||||||
|
} else {
|
||||||
|
schema = "https";
|
||||||
|
}
|
||||||
|
|
||||||
|
List<HttpHost> hosts = new ArrayList();
|
||||||
|
String[] hostArray1 = esServerHost.split(",");
|
||||||
|
String[] var4 = hostArray1;
|
||||||
|
int var5 = hostArray1.length;
|
||||||
|
|
||||||
|
for (int var6 = 0; var6 < var5; ++var6) {
|
||||||
|
String host = var4[var6];
|
||||||
|
String[] ipPort = host.split(":");
|
||||||
|
HttpHost hostNew = new HttpHost(ipPort[0], Integer.valueOf(ipPort[1]), schema);
|
||||||
|
hosts.add(hostNew);
|
||||||
|
}
|
||||||
|
|
||||||
|
return (HttpHost[]) hosts.toArray(new HttpHost[0]);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void setSecConfig() {
|
||||||
|
try {
|
||||||
|
LOG.info("Config path is " + this.configPath);
|
||||||
|
LoginUtil.setJaasFile(principal, this.configPath + "user.keytab");
|
||||||
|
LoginUtil.setKrb5Config(this.configPath + "krb5.conf");
|
||||||
|
System.setProperty("elasticsearch.kerberos.jaas.appname", "EsClient");
|
||||||
|
System.setProperty("es.security.indication", "true");
|
||||||
|
LOG.info("es.security.indication is " + System.getProperty("es.security.indication"));
|
||||||
|
} catch (Exception var2) {
|
||||||
|
LOG.error("Failed to set security conf", var2);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public RestClientBuilder getRestClientBuilder() {
|
||||||
|
hostArray = getHostArray();
|
||||||
|
if ("false".equals(isSecureMode)) {
|
||||||
|
System.setProperty("es.security.indication", "false");
|
||||||
|
} else {
|
||||||
|
setSecConfig();
|
||||||
|
}
|
||||||
|
|
||||||
|
RestClientBuilder builder = RestClient.builder(hostArray);
|
||||||
|
Header[] defaultHeaders = new Header[]{new BasicHeader("Accept", "application/json"), new BasicHeader("Content-type", "application/json")};
|
||||||
|
builder.setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
|
||||||
|
@Override
|
||||||
|
public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder requestConfigBuilder) {
|
||||||
|
|
||||||
|
return requestConfigBuilder.setConnectTimeout(HwRestClient.connectTimeout).
|
||||||
|
setSocketTimeout(HwRestClient.socketTimeout).
|
||||||
|
setConnectionRequestTimeout(HwRestClient.connectionRequestTimeout);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
builder.setDefaultHeaders(defaultHeaders);
|
||||||
|
|
||||||
|
/* es https */
|
||||||
|
/*try {
|
||||||
|
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
|
||||||
|
credentialsProvider.setCredentials(AuthScope.ANY,
|
||||||
|
new UsernamePasswordCredentials(username, password));
|
||||||
|
|
||||||
|
SSLContext sslContext = new SSLContextBuilder().loadTrustMaterial(null, new TrustStrategy() {
|
||||||
|
// 信任所有
|
||||||
|
@Override
|
||||||
|
public boolean isTrusted(X509Certificate[] chain, String authType) throws CertificateException {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}).build();
|
||||||
|
SSLIOSessionStrategy sessionStrategy = new SSLIOSessionStrategy(sslContext, NoopHostnameVerifier.INSTANCE);
|
||||||
|
builder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
|
||||||
|
@Override
|
||||||
|
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
|
||||||
|
httpClientBuilder.disableAuthCaching();
|
||||||
|
//httpClientBuilder.setSSLStrategy(sessionStrategy);
|
||||||
|
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
|
||||||
|
return httpClientBuilder;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
}catch (Exception e){
|
||||||
|
e.printStackTrace();
|
||||||
|
}*/
|
||||||
|
/* es https */
|
||||||
|
return builder;
|
||||||
|
}
|
||||||
|
|
||||||
|
public RestClient getRestClient() {
|
||||||
|
if (this.ifConfigWasWrong()) {
|
||||||
|
return null;
|
||||||
|
} else {
|
||||||
|
RestClientBuilder restClientBuilder = this.getRestClientBuilder();
|
||||||
|
RestClient restClient = restClientBuilder.build();
|
||||||
|
this.setNodes(restClient);
|
||||||
|
LOG.info("The Low Level Rest Client has been created.");
|
||||||
|
return restClient;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean ifConfigWasWrong() {
|
||||||
|
if (this.configPath != null && this.configPath.length() != 0) {
|
||||||
|
return false;
|
||||||
|
} else {
|
||||||
|
LOG.info("Config path is not allowed to be empty.");
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void setNodes(RestClient restClient) {
|
||||||
|
List<Node> nodes = new ArrayList();
|
||||||
|
HttpHost[] var3 = hostArray;
|
||||||
|
int var4 = var3.length;
|
||||||
|
|
||||||
|
for (int var5 = 0; var5 < var4; ++var5) {
|
||||||
|
HttpHost httpHost = var3[var5];
|
||||||
|
nodes.add(new Node(httpHost));
|
||||||
|
}
|
||||||
|
|
||||||
|
restClient.setNodes(nodes);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,138 @@
|
||||||
|
package org.dromara.location.config;
|
||||||
|
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.FileInputStream;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Properties;
|
||||||
|
|
||||||
|
public final class KafkaProperties
|
||||||
|
{
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(KafkaProperties.class);
|
||||||
|
|
||||||
|
// Topic名称,安全模式下,需要以管理员用户添加当前用户的访问权限
|
||||||
|
public final static String TOPIC = "jysb_dwxx";
|
||||||
|
|
||||||
|
private static Properties serverProps = new Properties();
|
||||||
|
|
||||||
|
private static Properties producerProps = new Properties();
|
||||||
|
|
||||||
|
private static Properties consumerProps = new Properties();
|
||||||
|
|
||||||
|
private static Properties clientProps = new Properties();
|
||||||
|
|
||||||
|
private static KafkaProperties instance = null;
|
||||||
|
|
||||||
|
private KafkaProperties()
|
||||||
|
{
|
||||||
|
// String filePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator;
|
||||||
|
String filePath = "/home/rsoft/config/";
|
||||||
|
LOG.info("路径=={}",filePath);
|
||||||
|
try
|
||||||
|
{
|
||||||
|
File proFile = new File(filePath + "producer.properties");
|
||||||
|
|
||||||
|
if (proFile.exists())
|
||||||
|
{
|
||||||
|
producerProps.load(new FileInputStream(filePath + "producer.properties"));
|
||||||
|
}
|
||||||
|
|
||||||
|
File conFile = new File(filePath + "producer.properties");
|
||||||
|
|
||||||
|
if (conFile.exists())
|
||||||
|
{
|
||||||
|
consumerProps.load(new FileInputStream(filePath + "consumer.properties"));
|
||||||
|
}
|
||||||
|
|
||||||
|
File serFile = new File(filePath + "server.properties");
|
||||||
|
|
||||||
|
if (serFile.exists())
|
||||||
|
{
|
||||||
|
serverProps.load(new FileInputStream(filePath + "server.properties"));
|
||||||
|
}
|
||||||
|
|
||||||
|
File cliFile = new File(filePath + "client.properties");
|
||||||
|
|
||||||
|
if (cliFile.exists())
|
||||||
|
{
|
||||||
|
clientProps.load(new FileInputStream(filePath + "client.properties"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (IOException e)
|
||||||
|
{
|
||||||
|
LOG.info("The Exception occured.", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized static KafkaProperties getInstance()
|
||||||
|
{
|
||||||
|
if (null == instance)
|
||||||
|
{
|
||||||
|
instance = new KafkaProperties();
|
||||||
|
}
|
||||||
|
|
||||||
|
return instance;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 获取参数值
|
||||||
|
* @param key properites的key值
|
||||||
|
* @param defValue 默认值
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
public String getValues(String key, String defValue)
|
||||||
|
{
|
||||||
|
String rtValue = null;
|
||||||
|
|
||||||
|
if (null == key)
|
||||||
|
{
|
||||||
|
LOG.error("key is null");
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
rtValue = getPropertiesValue(key);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (null == rtValue)
|
||||||
|
{
|
||||||
|
LOG.warn("KafkaProperties.getValues return null, key is " + key);
|
||||||
|
rtValue = defValue;
|
||||||
|
}
|
||||||
|
|
||||||
|
LOG.info("KafkaProperties.getValues: key is " + key + "; Value is " + rtValue);
|
||||||
|
|
||||||
|
return rtValue;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 根据key值获取server.properties的值
|
||||||
|
* @param key
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
private String getPropertiesValue(String key)
|
||||||
|
{
|
||||||
|
String rtValue = serverProps.getProperty(key);
|
||||||
|
|
||||||
|
// server.properties中没有,则再向producer.properties中获取
|
||||||
|
if (null == rtValue)
|
||||||
|
{
|
||||||
|
rtValue = producerProps.getProperty(key);
|
||||||
|
}
|
||||||
|
|
||||||
|
// producer中没有,则再向consumer.properties中获取
|
||||||
|
if (null == rtValue)
|
||||||
|
{
|
||||||
|
rtValue = consumerProps.getProperty(key);
|
||||||
|
}
|
||||||
|
|
||||||
|
// consumer没有,则再向client.properties中获取
|
||||||
|
if (null == rtValue)
|
||||||
|
{
|
||||||
|
rtValue = clientProps.getProperty(key);
|
||||||
|
}
|
||||||
|
|
||||||
|
return rtValue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,259 @@
|
||||||
|
package org.dromara.location.config;
|
||||||
|
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.FileInputStream;
|
||||||
|
import java.io.FileWriter;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Properties;
|
||||||
|
|
||||||
|
public class LoginUtil {
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(LoginUtil.class);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* no JavaDoc
|
||||||
|
*/
|
||||||
|
public enum Module {
|
||||||
|
KAFKA("KafkaClient"), ZOOKEEPER("Client");
|
||||||
|
|
||||||
|
private String name;
|
||||||
|
|
||||||
|
private Module(String name)
|
||||||
|
{
|
||||||
|
this.name = name;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getName()
|
||||||
|
{
|
||||||
|
return name;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* line operator string
|
||||||
|
*/
|
||||||
|
private static final String LINE_SEPARATOR = System.getProperty("line.separator");
|
||||||
|
|
||||||
|
/**
|
||||||
|
* jaas file postfix
|
||||||
|
*/
|
||||||
|
private static final String JAAS_POSTFIX = ".jaas.conf";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* is IBM jdk or not
|
||||||
|
*/
|
||||||
|
private static final boolean IS_IBM_JDK = System.getProperty("java.vendor").contains("IBM");
|
||||||
|
|
||||||
|
/**
|
||||||
|
* IBM jdk login module
|
||||||
|
*/
|
||||||
|
private static final String IBM_LOGIN_MODULE = "com.ibm.security.auth.module.Krb5LoginModule required";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* oracle jdk login module
|
||||||
|
*/
|
||||||
|
private static final String SUN_LOGIN_MODULE = "com.sun.security.auth.module.Krb5LoginModule required";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Zookeeper quorum principal.
|
||||||
|
*/
|
||||||
|
public static final String ZOOKEEPER_AUTH_PRINCIPAL = "zookeeper.server.principal";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* java security krb5 file path
|
||||||
|
*/
|
||||||
|
public static final String JAVA_SECURITY_KRB5_CONF = "java.security.krb5.conf";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* java security login file path
|
||||||
|
*/
|
||||||
|
public static final String JAVA_SECURITY_LOGIN_CONF = "java.security.auth.login.config";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 设置jaas.conf文件
|
||||||
|
*
|
||||||
|
* @param principal
|
||||||
|
* @param keytabPath
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public static void setJaasFile(String principal, String keytabPath)
|
||||||
|
throws IOException {
|
||||||
|
String jaasPath =
|
||||||
|
new File(System.getProperty("java.io.tmpdir")) + File.separator + System.getProperty("user.name")
|
||||||
|
+ JAAS_POSTFIX;
|
||||||
|
|
||||||
|
// windows路径下分隔符替换
|
||||||
|
jaasPath = jaasPath.replace("\\", "\\\\");
|
||||||
|
// 删除jaas文件
|
||||||
|
deleteJaasFile(jaasPath);
|
||||||
|
writeJaasFile(jaasPath, principal, keytabPath);
|
||||||
|
System.setProperty(JAVA_SECURITY_LOGIN_CONF, jaasPath);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 设置zookeeper服务端principal
|
||||||
|
*
|
||||||
|
* @param zkServerPrincipal
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public static void setZookeeperServerPrincipal(String zkServerPrincipal)
|
||||||
|
throws IOException {
|
||||||
|
System.setProperty(ZOOKEEPER_AUTH_PRINCIPAL, zkServerPrincipal);
|
||||||
|
String ret = System.getProperty(ZOOKEEPER_AUTH_PRINCIPAL);
|
||||||
|
if (ret == null)
|
||||||
|
{
|
||||||
|
throw new IOException(ZOOKEEPER_AUTH_PRINCIPAL + " is null.");
|
||||||
|
}
|
||||||
|
if (!ret.equals(zkServerPrincipal))
|
||||||
|
{
|
||||||
|
throw new IOException(ZOOKEEPER_AUTH_PRINCIPAL + " is " + ret + " is not " + zkServerPrincipal + ".");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 设置krb5文件
|
||||||
|
*
|
||||||
|
* @param krb5ConfFile
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public static void setKrb5Config(String krb5ConfFile)
|
||||||
|
throws IOException {
|
||||||
|
System.setProperty(JAVA_SECURITY_KRB5_CONF, krb5ConfFile);
|
||||||
|
String ret = System.getProperty(JAVA_SECURITY_KRB5_CONF);
|
||||||
|
if (ret == null)
|
||||||
|
{
|
||||||
|
throw new IOException(JAVA_SECURITY_KRB5_CONF + " is null.");
|
||||||
|
}
|
||||||
|
if (!ret.equals(krb5ConfFile))
|
||||||
|
{
|
||||||
|
throw new IOException(JAVA_SECURITY_KRB5_CONF + " is " + ret + " is not " + krb5ConfFile + ".");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 写入jaas文件
|
||||||
|
*
|
||||||
|
* @throws IOException
|
||||||
|
* 写文件异常
|
||||||
|
*/
|
||||||
|
private static void writeJaasFile(String jaasPath, String principal, String keytabPath)
|
||||||
|
throws IOException {
|
||||||
|
FileWriter writer = new FileWriter(new File(jaasPath));
|
||||||
|
try
|
||||||
|
{
|
||||||
|
writer.write(getJaasConfContext(principal, keytabPath));
|
||||||
|
writer.flush();
|
||||||
|
}
|
||||||
|
catch (IOException e)
|
||||||
|
{
|
||||||
|
throw new IOException("Failed to create jaas.conf File");
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
writer.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void deleteJaasFile(String jaasPath)
|
||||||
|
throws IOException {
|
||||||
|
File jaasFile = new File(jaasPath);
|
||||||
|
if (jaasFile.exists())
|
||||||
|
{
|
||||||
|
if (!jaasFile.delete())
|
||||||
|
{
|
||||||
|
throw new IOException("Failed to delete exists jaas file.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static String getJaasConfContext(String principal, String keytabPath) {
|
||||||
|
Module[] allModule = Module.values();
|
||||||
|
StringBuilder builder = new StringBuilder();
|
||||||
|
for (Module modlue : allModule)
|
||||||
|
{
|
||||||
|
builder.append(getModuleContext(principal, keytabPath, modlue));
|
||||||
|
}
|
||||||
|
return builder.toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
private static String getModuleContext(String userPrincipal, String keyTabPath, Module module) {
|
||||||
|
StringBuilder builder = new StringBuilder();
|
||||||
|
if (IS_IBM_JDK) {
|
||||||
|
builder.append(module.getName()).append(" {").append(LINE_SEPARATOR);
|
||||||
|
builder.append(IBM_LOGIN_MODULE).append(LINE_SEPARATOR);
|
||||||
|
builder.append("credsType=both").append(LINE_SEPARATOR);
|
||||||
|
builder.append("principal=\"" + userPrincipal + "\"").append(LINE_SEPARATOR);
|
||||||
|
builder.append("useKeytab=\"" + keyTabPath + "\"").append(LINE_SEPARATOR);
|
||||||
|
builder.append("debug=true;").append(LINE_SEPARATOR);
|
||||||
|
builder.append("};").append(LINE_SEPARATOR);
|
||||||
|
} else {
|
||||||
|
builder.append(module.getName()).append(" {").append(LINE_SEPARATOR);
|
||||||
|
builder.append(SUN_LOGIN_MODULE).append(LINE_SEPARATOR);
|
||||||
|
builder.append("useKeyTab=true").append(LINE_SEPARATOR);
|
||||||
|
builder.append("keyTab=\"" + keyTabPath + "\"").append(LINE_SEPARATOR);
|
||||||
|
builder.append("principal=\"" + userPrincipal + "\"").append(LINE_SEPARATOR);
|
||||||
|
builder.append("useTicketCache=false").append(LINE_SEPARATOR);
|
||||||
|
builder.append("storeKey=true").append(LINE_SEPARATOR);
|
||||||
|
builder.append("debug=true;").append(LINE_SEPARATOR);
|
||||||
|
builder.append("};").append(LINE_SEPARATOR);
|
||||||
|
}
|
||||||
|
|
||||||
|
return builder.toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void securityPrepare(String principal, String keyTabFile) throws IOException {
|
||||||
|
// String filePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator;
|
||||||
|
String filePath = "/home/rsoft/config/";
|
||||||
|
String krbFile = filePath + "krb5.conf";
|
||||||
|
String userKeyTableFile = filePath + keyTabFile;
|
||||||
|
|
||||||
|
// windows路径下分隔符替换
|
||||||
|
userKeyTableFile = userKeyTableFile.replace("\\", "\\\\");
|
||||||
|
krbFile = krbFile.replace("\\", "\\\\");
|
||||||
|
|
||||||
|
LoginUtil.setKrb5Config(krbFile);
|
||||||
|
LoginUtil.setZookeeperServerPrincipal("zookeeper/hadoop.hadoop.com");
|
||||||
|
LoginUtil.setJaasFile(principal, userKeyTableFile);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check security mode
|
||||||
|
*
|
||||||
|
* @return boolean
|
||||||
|
*/
|
||||||
|
public static Boolean isSecurityModel() {
|
||||||
|
Boolean isSecurity = false;
|
||||||
|
// String krbFilePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator + "kafkaSecurityMode";
|
||||||
|
String krbFilePath = "/home/rsoft/config/kafkaSecurityMode";
|
||||||
|
Properties securityProps = new Properties();
|
||||||
|
|
||||||
|
// file does not exist.
|
||||||
|
if (!isFileExists(krbFilePath)) {
|
||||||
|
return isSecurity;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
securityProps.load(new FileInputStream(krbFilePath));
|
||||||
|
|
||||||
|
if ("yes".equalsIgnoreCase(securityProps.getProperty("kafka.client.security.mode")))
|
||||||
|
{
|
||||||
|
isSecurity = true;
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.info("The Exception occured : {}.", e);
|
||||||
|
}
|
||||||
|
|
||||||
|
return isSecurity;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* 判断文件是否存在
|
||||||
|
*/
|
||||||
|
private static boolean isFileExists(String fileName) {
|
||||||
|
File file = new File(fileName);
|
||||||
|
|
||||||
|
return file.exists();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -41,7 +41,7 @@ import java.util.function.Consumer;
|
||||||
public class SearchServiceImpl implements ISearchService {
|
public class SearchServiceImpl implements ISearchService {
|
||||||
|
|
||||||
@Resource(name = "restHighLevelClient")
|
@Resource(name = "restHighLevelClient")
|
||||||
private RestHighLevelClient restHighLevelClient;
|
private RestHighLevelClient restHighLevelClient;
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
||||||
|
|
@ -10,6 +10,8 @@ spring:
|
||||||
profiles:
|
profiles:
|
||||||
# 环境配置
|
# 环境配置
|
||||||
active: @profiles.active@
|
active: @profiles.active@
|
||||||
|
autoconfigure:
|
||||||
|
exclude: org.springframework.boot.autoconfigure.elasticsearch.ElasticsearchRestClientAutoConfiguration
|
||||||
|
|
||||||
--- # nacos 配置
|
--- # nacos 配置
|
||||||
spring:
|
spring:
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1 @@
|
||||||
|
kafka.client.security.mode = yes
|
||||||
|
|
@ -24,4 +24,6 @@ public class DeviceRedis {
|
||||||
|
|
||||||
private Date gpsTime;
|
private Date gpsTime;
|
||||||
|
|
||||||
|
private String infoSource;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -3,6 +3,7 @@ package org.dromara.system.service.impl;
|
||||||
|
|
||||||
import lombok.RequiredArgsConstructor;
|
import lombok.RequiredArgsConstructor;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.apache.ibatis.executor.BatchResult;
|
||||||
import org.apache.ibatis.session.ExecutorType;
|
import org.apache.ibatis.session.ExecutorType;
|
||||||
import org.apache.ibatis.session.SqlSession;
|
import org.apache.ibatis.session.SqlSession;
|
||||||
import org.apache.ibatis.session.SqlSessionFactory;
|
import org.apache.ibatis.session.SqlSessionFactory;
|
||||||
|
|
@ -18,7 +19,10 @@ import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
import org.springframework.transaction.annotation.Transactional;
|
import org.springframework.transaction.annotation.Transactional;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.LinkedHashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@RequiredArgsConstructor
|
@RequiredArgsConstructor
|
||||||
|
|
@ -35,34 +39,76 @@ public class DeviceRedisServiceImpl implements IDeviceRedisService {
|
||||||
SqlSessionFactory sqlSessionFactory;
|
SqlSessionFactory sqlSessionFactory;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@Transactional(rollbackFor = Exception.class)
|
|
||||||
public int insertBatch(List<DeviceRedis> list) {
|
public int insertBatch(List<DeviceRedis> list) {
|
||||||
|
log.error("本次查询离线在线数据大小={}", list.size());
|
||||||
|
|
||||||
|
// 在插入前对数据进行去重(基于唯一约束)
|
||||||
|
Map<String, DeviceRedis> uniqueMap = new LinkedHashMap<>();
|
||||||
|
for (DeviceRedis device : list) {
|
||||||
|
String key = device.getDeviceCode() + "|" +
|
||||||
|
device.getDeviceType() + "|" +
|
||||||
|
device.getInfoSource();
|
||||||
|
// 保留最后一次出现的记录(根据业务需求调整)
|
||||||
|
uniqueMap.put(key, device);
|
||||||
|
}
|
||||||
|
List<DeviceRedis> uniqueList = new ArrayList<>(uniqueMap.values());
|
||||||
|
|
||||||
|
log.error("去重后数据大小={}", uniqueList.size());
|
||||||
|
|
||||||
int groupSize = 2000;
|
int groupSize = 2000;
|
||||||
int groupNo = list.size() / groupSize;
|
int totalSize = uniqueList.size();
|
||||||
SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH);
|
SqlSession sqlSession = null;
|
||||||
int num = 0;
|
int totalAffectedRows = 0;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
if (list.size() <= groupSize) {
|
sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH, false); // 手动控制事务
|
||||||
num += baseMapper.insertBatch(list);
|
DeviceRedisMapper batchMapper = sqlSession.getMapper(DeviceRedisMapper.class);
|
||||||
} else {
|
|
||||||
List<DeviceRedis> subList=null;
|
// 详细记录分组信息
|
||||||
for (int i = 0; i < groupNo; i++) {
|
log.error("数据分组信息: 总大小={}, 分组大小={}, 预计分组数={}",
|
||||||
subList = list.subList(0, groupSize);
|
totalSize, groupSize, (int) Math.ceil((double) totalSize / groupSize));
|
||||||
num += baseMapper.insertBatch(subList);
|
|
||||||
list.subList(0, groupSize).clear();
|
// 分组处理(不修改原始list)
|
||||||
}
|
for (int i = 0; i < totalSize; i += groupSize) {
|
||||||
if (list.size() > 0) {
|
int toIndex = Math.min(i + groupSize, totalSize);
|
||||||
num += baseMapper.insertBatch(list);
|
List<DeviceRedis> subList = uniqueList.subList(i, toIndex);
|
||||||
|
|
||||||
|
try {
|
||||||
|
batchMapper.insertBatch(subList);
|
||||||
|
log.error("分组 {} 成功提交: {}~{} 条",
|
||||||
|
i/groupSize+1, i, toIndex);
|
||||||
|
} catch (Exception e) {
|
||||||
|
// 详细记录每个分组的异常
|
||||||
|
log.error("分组 {} 插入失败 ({}~{} 条): {}",
|
||||||
|
i/groupSize+1, i, toIndex, e.getMessage(), e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
sqlSession.flushStatements();
|
|
||||||
return num;
|
// 显式提交事务并获取影响行数
|
||||||
}catch (Exception e){
|
List<BatchResult> batchResults = sqlSession.flushStatements();
|
||||||
e.printStackTrace();
|
sqlSession.commit();
|
||||||
}finally {
|
|
||||||
sqlSession.close();
|
// 统计实际影响行数
|
||||||
return num;
|
for (BatchResult br : batchResults) {
|
||||||
|
for (int count : br.getUpdateCounts()) {
|
||||||
|
totalAffectedRows += count;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
// 记录全局异常
|
||||||
|
log.error("批量插入整体失败: {}", e.getMessage(), e);
|
||||||
|
if (sqlSession != null) {
|
||||||
|
sqlSession.rollback();
|
||||||
|
log.error("事务已回滚");
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
if (sqlSession != null) {
|
||||||
|
sqlSession.close();
|
||||||
|
}
|
||||||
|
log.error("实际入库数量={}/{}", totalAffectedRows, totalSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return totalAffectedRows;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
||||||
|
|
@ -13,15 +13,15 @@
|
||||||
</resultMap>
|
</resultMap>
|
||||||
|
|
||||||
<insert id="insertBatch">
|
<insert id="insertBatch">
|
||||||
insert into t_device_redis (device_code,device_type,online,zzjgdm,gps_time)
|
insert into t_device_redis (device_code,device_type,online,zzjgdm,gps_time,info_source)
|
||||||
values
|
values
|
||||||
<foreach collection="list" item="entity" separator=",">
|
<foreach collection="list" item="entity" separator=",">
|
||||||
(
|
(
|
||||||
#{entity.deviceCode},#{entity.deviceType},#{entity.online},#{entity.zzjgdm},#{entity.gpsTime}
|
#{entity.deviceCode},#{entity.deviceType},#{entity.online},#{entity.zzjgdm},#{entity.gpsTime},#{entity.infoSource}
|
||||||
)
|
)
|
||||||
</foreach>
|
</foreach>
|
||||||
ON conflict(device_code,device_type) do update set
|
ON conflict(device_code,device_type,info_source) do update set
|
||||||
(online,zzjgdm,gps_time) =(EXCLUDED.online,EXCLUDED.zzjgdm,EXCLUDED.gpsTime)
|
(online,zzjgdm,gps_time) =(EXCLUDED.online,EXCLUDED.zzjgdm,EXCLUDED.gps_time)
|
||||||
</insert>
|
</insert>
|
||||||
|
|
||||||
<!-- 全省各类设备总数、在线数 -->
|
<!-- 全省各类设备总数、在线数 -->
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue