省厅版本位置汇聚对接华为认证的kafka ES
parent
8febeeb5e4
commit
040885e507
6
pom.xml
6
pom.xml
|
|
@ -89,12 +89,12 @@
|
|||
<id>prod</id>
|
||||
<properties>
|
||||
<profiles.active>prod</profiles.active>
|
||||
<nacos.server>127.0.0.1:8848</nacos.server>
|
||||
<nacos.server>53.16.17.16:8848</nacos.server>
|
||||
<nacos.discovery.group>DEFAULT_GROUP</nacos.discovery.group>
|
||||
<nacos.config.group>DEFAULT_GROUP</nacos.config.group>
|
||||
<nacos.username>nacos</nacos.username>
|
||||
<nacos.password>nacos</nacos.password>
|
||||
<logstash.address>127.0.0.1:4560</logstash.address>
|
||||
<nacos.password>Ycgis!2509</nacos.password>
|
||||
<logstash.address>53.16.17.16:4560</logstash.address>
|
||||
</properties>
|
||||
</profile>
|
||||
</profiles>
|
||||
|
|
|
|||
|
|
@ -131,6 +131,13 @@
|
|||
<artifactId>elasticsearch-rest-client</artifactId>
|
||||
<version>7.14.0</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.elasticsearch.client</groupId>
|
||||
<artifactId>elasticsearch-rest-client</artifactId>
|
||||
<version>7.6.0-hw-ei-302002</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.elasticsearch.client</groupId>
|
||||
<artifactId>elasticsearch-rest-high-level-client</artifactId>
|
||||
|
|
@ -147,6 +154,12 @@
|
|||
</exclusions>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka-clients</artifactId>
|
||||
<version>2.4.0-hw-ei-302002</version>
|
||||
</dependency>
|
||||
|
||||
|
||||
<!-- kafka -->
|
||||
<dependency>
|
||||
|
|
|
|||
|
|
@ -20,10 +20,11 @@ import java.util.List;
|
|||
/**
|
||||
* restHighLevelClient 客户端配置类
|
||||
*/
|
||||
@Slf4j
|
||||
|
||||
/*@Slf4j
|
||||
@Data
|
||||
@Configuration
|
||||
@ConfigurationProperties(prefix = "elasticsearch")
|
||||
@ConfigurationProperties(prefix = "elasticsearch")*/
|
||||
public class ElasticsearchConfig {
|
||||
|
||||
// es host ip 地址(集群)
|
||||
|
|
@ -85,7 +86,7 @@ public class ElasticsearchConfig {
|
|||
});
|
||||
restHighLevelClient = new RestHighLevelClient(builder);
|
||||
} catch (NumberFormatException e) {
|
||||
log.error("ES 连接池初始化异常");
|
||||
// log.error("ES 连接池初始化异常");
|
||||
}
|
||||
return restHighLevelClient;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,40 @@
|
|||
package org.dromara.data2es.config;
|
||||
|
||||
import org.dromara.data2es.util.GenerateEnumUtil;
|
||||
import org.elasticsearch.client.RestHighLevelClient;
|
||||
import org.elasticsearch.hwclient.HwRestClient;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.time.LocalDate;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
|
||||
/**
|
||||
* <p>description: </p>
|
||||
*
|
||||
* @author chenle
|
||||
* @date 2021-07-05 18:22
|
||||
*/
|
||||
@Component(value = "esConfig")
|
||||
public class EsConfig {
|
||||
|
||||
private String prefix = "gpsinfo";
|
||||
|
||||
public String indexNameByDay(){
|
||||
return prefix+ LocalDate.now().format(DateTimeFormatter.ofPattern("yyyyMMdd"));
|
||||
}
|
||||
|
||||
@Bean(destroyMethod = "close",name = "restHighLevelClient")
|
||||
public RestHighLevelClient restClient() {
|
||||
// String configPath = System.getProperty("user.dir") + File.separator+ "app_data2es_aq" + File.separator + "conf" + File.separator;
|
||||
String configPath = "/rsoft/";
|
||||
|
||||
// KAFKA("KafkaClient"), ZOOKEEPER("Client");
|
||||
GenerateEnumUtil.addEnum(LoginUtil.Module.class,"KAFKA","KafkaClient");
|
||||
GenerateEnumUtil.addEnum(LoginUtil.Module.class,"ZOOKEEPER","Client");
|
||||
HwRestClient hwRestClient = new HwRestClient(configPath);
|
||||
RestHighLevelClient highLevelClient = new RestHighLevelClient(hwRestClient.getRestClientBuilder());
|
||||
return highLevelClient;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -24,11 +24,11 @@ public class KafkaConfig {
|
|||
|
||||
private Logger logger = LoggerFactory.getLogger(KafkaConfig.class);
|
||||
|
||||
// private String kafkaServers = "140.168.2.31:21007,140.168.2.32:21007,140.168.2.33:21007";
|
||||
private String kafkaServers = "140.168.2.31:21007,140.168.2.32:21007,140.168.2.33:21007"; //省厅 kafka
|
||||
// private String kafkaServers = "53.208.61.105:6667,53.208.61.106:6667,53.208.61.107:6667";//六安GA网
|
||||
// private String kafkaServers = "34.72.62.93:9092";//六安视频网
|
||||
// private String kafkaServers = "127.0.0.1:9092";//本地
|
||||
private String kafkaServers = "53.207.8.71:9092,53.193.3.15:9092,53.160.0.237:9092,53.104.56.58:9092,53.128.22.61:9092";//省厅 马伟提供
|
||||
// private String kafkaServers = "53.207.8.71:9092,53.193.3.15:9092,53.160.0.237:9092,53.104.56.58:9092,53.128.22.61:9092";//省厅 马伟提供
|
||||
|
||||
private String groupId = "ruansiProducer";
|
||||
|
||||
|
|
@ -128,11 +128,11 @@ public class KafkaConfig {
|
|||
// props.put(kerberosDomainName, "hadoop.hadoop.com");
|
||||
//设置自定义的分区策略类,默认不传key,是粘性分区,尽量往一个分区中发消息。如果key不为null,则默认是按照key的hashcode与 partition的取余来决定哪个partition
|
||||
//props.put("partitioner.class","com.kafka.myparitioner.CidPartitioner");
|
||||
props.put(securityProtocol, "SASL_PLAINTEXT");
|
||||
props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"zkxc\" password=\"zkxcKafka07252023\";");
|
||||
props.put("sasl.mechanism", "SCRAM-SHA-256");
|
||||
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
|
||||
// KafkaProducer producer = new KafkaProducer<>(props);
|
||||
// props.put(securityProtocol, "SASL_PLAINTEXT");
|
||||
// props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"zkxc\" password=\"zkxcKafka07252023\";");
|
||||
// props.put("sasl.mechanism", "SCRAM-SHA-256");
|
||||
// KafkaProducer<String, String> producer = new KafkaProducer<>(props);
|
||||
KafkaProducer producer = new KafkaProducer<>(props);
|
||||
|
||||
return producer;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -25,7 +25,7 @@ public class KafkaSecurityUtil {
|
|||
/**
|
||||
* 用户自己申请的机机账号名称
|
||||
*/
|
||||
private static final String USER_PRINCIPAL = "aqdsj_ruansi@HADOOP.COM";
|
||||
private static final String USER_PRINCIPAL = "yhy_ahrs_rcw@HADOOP.COM";
|
||||
|
||||
public static void securityPrepare() throws IOException
|
||||
{
|
||||
|
|
@ -34,10 +34,10 @@ public class KafkaSecurityUtil {
|
|||
//String krbFile = filePath + "krb5.conf";
|
||||
//ClassPathResource classPathResource = new ClassPathResource("krb5.conf");
|
||||
//String krbFile = classPathResource.getAbsolutePath();
|
||||
String krbFile = "/gpsstore/krb5.conf";
|
||||
String krbFile = "/rsoft/krb5.conf";
|
||||
// String userKeyTableFile = filePath + USER_KEYTAB_FILE;
|
||||
//ClassPathResource classPathResource1 = new ClassPathResource(USER_KEYTAB_FILE);
|
||||
String userKeyTableFile = "/gpsstore/user.keytab";
|
||||
String userKeyTableFile = "/rsoft/user.keytab";
|
||||
|
||||
//windows路径下分隔符替换
|
||||
userKeyTableFile = userKeyTableFile.replace("\\", "\\\\");
|
||||
|
|
|
|||
|
|
@ -0,0 +1,146 @@
|
|||
package org.dromara.data2es.util;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.lang.reflect.*;
|
||||
import java.util.*;
|
||||
|
||||
/**
|
||||
* <p>description: </p>
|
||||
*
|
||||
* @author luya
|
||||
* @date 2025-07-08
|
||||
*/
|
||||
public class GenerateEnumUtil {
|
||||
|
||||
static Logger log = LoggerFactory.getLogger(GenerateEnumUtil.class);
|
||||
|
||||
/**
|
||||
* 添加枚举类
|
||||
*
|
||||
* @param enumClass 枚举类
|
||||
* @param enumName 枚举名称 (不可重复)
|
||||
* @param params 属性参数,按顺序写入
|
||||
*/
|
||||
public static <T extends Enum<?>> void addEnum(Class<T> enumClass, String enumName, Object... params) {
|
||||
sanityChecks(enumClass, enumName);
|
||||
Field valuesField = null;
|
||||
Field[] fields = enumClass.getDeclaredFields();
|
||||
List<Class<?>> paramTypes = new LinkedList<>();
|
||||
|
||||
for (Field field : fields) {
|
||||
if (field.isEnumConstant() && field.getName().equals(enumName)) {
|
||||
log.warn("该枚举类已经存在!");
|
||||
return;
|
||||
}
|
||||
if (field.isSynthetic() && field.getName().contains("$VALUES")) {
|
||||
valuesField = field;
|
||||
}
|
||||
if (!field.isSynthetic() && !field.isEnumConstant()) {
|
||||
paramTypes.add(field.getType());
|
||||
}
|
||||
}
|
||||
|
||||
if (valuesField == null) {
|
||||
throw new RuntimeException("未获取到合成类型");
|
||||
}
|
||||
|
||||
try {
|
||||
// 设置可访问
|
||||
valuesField.setAccessible(true);
|
||||
T[] previousValues = (T[]) valuesField.get(null);
|
||||
List<T> values = new ArrayList<>(Arrays.asList(previousValues));
|
||||
|
||||
// 创建新枚举实例
|
||||
T newValue = makeEnum(enumClass, enumName, values.size(), paramTypes.toArray(new Class[0]), params);
|
||||
values.add(newValue);
|
||||
|
||||
// 更新枚举数组
|
||||
setFailsafeFieldValue(valuesField, null, values.toArray((T[]) Array.newInstance(enumClass, 0)));
|
||||
|
||||
// 清理枚举缓存
|
||||
cleanEnumCache(enumClass);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException("添加枚举失败: " + e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 校验参数
|
||||
*/
|
||||
private static <T extends Enum<?>> void sanityChecks(Class<T> enumClass, String enumName) {
|
||||
if (!Enum.class.isAssignableFrom(enumClass)) {
|
||||
throw new RuntimeException(enumClass + " 不是一个枚举类。");
|
||||
}
|
||||
if (enumName == null || enumName.trim().isEmpty()) {
|
||||
throw new RuntimeException("枚举名称不能为空");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建枚举实例
|
||||
*/
|
||||
private static <T> T makeEnum(Class<T> enumClass, String enumName, int ordinal,
|
||||
Class<?>[] additionalTypes, Object[] additionalValues) throws Exception {
|
||||
Class<?>[] paramTypes = new Class[additionalTypes.length + 2];
|
||||
paramTypes[0] = String.class;
|
||||
paramTypes[1] = int.class;
|
||||
System.arraycopy(additionalTypes, 0, paramTypes, 2, additionalTypes.length);
|
||||
|
||||
// 获取并调用构造器
|
||||
Constructor<T> constructor = enumClass.getDeclaredConstructor(paramTypes);
|
||||
constructor.setAccessible(true);
|
||||
|
||||
Object[] params = new Object[additionalValues.length + 2];
|
||||
params[0] = enumName;
|
||||
params[1] = ordinal;
|
||||
System.arraycopy(additionalValues, 0, params, 2, additionalValues.length);
|
||||
|
||||
return constructor.newInstance(params);
|
||||
}
|
||||
|
||||
/**
|
||||
* 设置字段值(包括final字段)
|
||||
*/
|
||||
private static void setFailsafeFieldValue(Field field, Object target, Object value)
|
||||
throws NoSuchFieldException, IllegalAccessException {
|
||||
|
||||
field.setAccessible(true);
|
||||
|
||||
// 处理final修饰符
|
||||
Field modifiersField = Field.class.getDeclaredField("modifiers");
|
||||
modifiersField.setAccessible(true);
|
||||
|
||||
int originalModifiers = field.getModifiers();
|
||||
modifiersField.setInt(field, originalModifiers & ~Modifier.FINAL);
|
||||
|
||||
// 设置字段值
|
||||
field.set(target, value);
|
||||
|
||||
// 恢复原始修饰符(可选)
|
||||
modifiersField.setInt(field, originalModifiers);
|
||||
}
|
||||
|
||||
/**
|
||||
* 清理枚举缓存
|
||||
*/
|
||||
private static void cleanEnumCache(Class<?> enumClass)
|
||||
throws NoSuchFieldException, IllegalAccessException {
|
||||
|
||||
blankField(enumClass, "enumConstantDirectory");
|
||||
blankField(enumClass, "enumConstants");
|
||||
}
|
||||
|
||||
private static void blankField(Class<?> enumClass, String fieldName)
|
||||
throws NoSuchFieldException, IllegalAccessException {
|
||||
|
||||
for (Field field : Class.class.getDeclaredFields()) {
|
||||
if (field.getName().contains(fieldName)) {
|
||||
field.setAccessible(true);
|
||||
setFailsafeFieldValue(field, enumClass, null);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue