省厅位置汇聚添加华为认证kafka和ES

stwzhj
luyya 2025-07-22 17:27:14 +08:00
parent 91a54e2d26
commit 5f9ecaa366
18 changed files with 736 additions and 550 deletions

View File

@ -89,12 +89,12 @@
<id>prod</id> <id>prod</id>
<properties> <properties>
<profiles.active>prod</profiles.active> <profiles.active>prod</profiles.active>
<nacos.server>53.16.17.16:8848</nacos.server> <nacos.server>53.16.17.13:8848</nacos.server>
<nacos.discovery.group>DEFAULT_GROUP</nacos.discovery.group> <nacos.discovery.group>DEFAULT_GROUP</nacos.discovery.group>
<nacos.config.group>DEFAULT_GROUP</nacos.config.group> <nacos.config.group>DEFAULT_GROUP</nacos.config.group>
<nacos.username>nacos</nacos.username> <nacos.username>nacos</nacos.username>
<nacos.password>Ycgis!2509</nacos.password> <nacos.password>Ycgis!2509</nacos.password>
<logstash.address>53.16.17.16:4560</logstash.address> <logstash.address>53.16.17.13:4560</logstash.address>
</properties> </properties>
</profile> </profile>
</profiles> </profiles>

View File

@ -23,14 +23,14 @@ dubbo:
address: redis://${spring.data.redis.host}:${spring.data.redis.port} address: redis://${spring.data.redis.host}:${spring.data.redis.port}
group: DUBBO_GROUP group: DUBBO_GROUP
username: dubbo username: dubbo
password: ruoyi123 password: ${spring.data.redis.password}
# 集群开关 # 集群开关
sentinel: true sentinel: true
parameters: parameters:
namespace: ${spring.profiles.active} namespace: ${spring.profiles.active}
database: ${spring.data.redis.database} database: ${spring.data.redis.database}
timeout: ${spring.data.redis.timeout} timeout: ${spring.data.redis.timeout}
backup: 53.16.17.13:26380,53.16.17.14:26380,53.16.17.16:26380 backup: ${spring.data.redis.sentinel.nodes}
# metadata-report: # metadata-report:
# address: redis://${spring.data.redis.host}:${spring.data.redis.port} # address: redis://${spring.data.redis.host}:${spring.data.redis.port}
# group: DUBBO_GROUP # group: DUBBO_GROUP

View File

@ -0,0 +1,138 @@
package org.dromara.kafka.consumer.config;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Properties;
/**
* <p>description: </p>
*
* @author chenle
* @date 2021-11-03 14:15
*/
@Component
public class KafkaConfig {
private Logger logger = LoggerFactory.getLogger(KafkaConfig.class);
private String kafkaServers = "53.1.212.25:21007,53.1.212.26:21007,53.1.212.27:21007"; //省厅 kafka
// private String kafkaServers = "53.208.61.105:6667,53.208.61.106:6667,53.208.61.107:6667";//六安GA网
// private String kafkaServers = "34.72.62.93:9092";//六安视频网
// private String kafkaServers = "127.0.0.1:9092";//本地
// private String kafkaServers = "53.207.8.71:9092,53.193.3.15:9092,53.160.0.237:9092,53.104.56.58:9092,53.128.22.61:9092";//省厅 马伟提供
private String groupId = "ruansiProducer";
// Broker地址列表
private final String bootstrapServers = "bootstrap.servers";
// 客户端ID
private final String clientId = "client.id";
// Key序列化类
private final String keySerializer = "key.serializer";
// Value序列化类
private final String valueSerializer = "value.serializer";
// 协议类型:当前支持配置为SASL_PLAINTEXT或者PLAINTEXT
private final String securityProtocol = "security.protocol";
// 服务名
private final String saslKerberosServiceName = "sasl.kerberos.service.name";
// 域名
private final String kerberosDomainName = "kerberos.domain.name";
//默认发送20条消息
private final int messageNumToSend = 100;
/**
* keytab
*/
private static final String USER_KEYTAB_FILE = "user.keytab";
/**
*
*/
private static final String USER_PRINCIPAL = "yhy_ahrs_rcw@A528C942_01A6_1BEF_7A75_0187DC82C40F.COM";
/**
* Producer
* @param
* @param
*/
@Bean(name = "myKafkaProducer")
public KafkaProducer newProducer() {
Properties props = new Properties();
if (true)
{
try
{
logger.info("Securitymode start.");
//!!注意,安全认证时,需要用户手动修改为自己申请的机机账号
LoginUtil.securityPrepare(USER_PRINCIPAL, USER_KEYTAB_FILE);
props.put(securityProtocol, "SASL_PLAINTEXT");
// props.put("sasl.mechanism", "GSSAPI");
// 服务名
props.put(saslKerberosServiceName, "kafka");
// 域名
props.put(kerberosDomainName, "A528C942_01A6_1BEF_7A75_0187DC82C40F.COM");
}
catch (IOException e)
{
logger.error("Security prepare failure.");
logger.error("The IOException occured.", e);
return null;
}
logger.info("Security prepare success.");
}else{
props.put(securityProtocol, "PLAINTEXT");
}
// Broker地址列表
props.put(bootstrapServers,kafkaServers);
// 客户端ID
props.put(clientId, "ruansiProducer");
// Key序列化类
props.put(keySerializer,
"org.apache.kafka.common.serialization.IntegerSerializer");
// Value序列化类
props.put(valueSerializer,
"org.apache.kafka.common.serialization.StringSerializer");
//批量发送信息配置
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
// 协议类型:当前支持配置为SASL_PLAINTEXT或者PLAINTEXT
//props.put(securityProtocol, "SASL_PLAINTEXT");
// // 服务名
// props.put(saslKerberosServiceName, "kafka");
// // 域名
// props.put(kerberosDomainName, "hadoop.hadoop.com");
//设置自定义的分区策略类默认不传key是粘性分区尽量往一个分区中发消息。如果key不为null则默认是按照key的hashcode与 partition的取余来决定哪个partition
//props.put("partitioner.class","com.kafka.myparitioner.CidPartitioner");
// props.put(securityProtocol, "SASL_PLAINTEXT");
// props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"zkxc\" password=\"zkxcKafka07252023\";");
// props.put("sasl.mechanism", "SCRAM-SHA-256");
// KafkaProducer<String, String> producer = new KafkaProducer<>(props);
KafkaProducer producer = new KafkaProducer<>(props);
return producer;
}
}

View File

@ -4,17 +4,19 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.File; import java.io.File;
import java.io.FileInputStream;
import java.io.FileWriter; import java.io.FileWriter;
import java.io.IOException; import java.io.IOException;
import java.util.Properties;
public class LoginUtil public class LoginUtil {
{ private static final Logger LOG = LoggerFactory.getLogger(LoginUtil.class);
static Logger logger = LoggerFactory.getLogger(LoginUtil.class); /**
* no JavaDoc
public enum Module */
{ public enum Module {
STORM("StormClient"), KAFKA("KafkaClient"), ZOOKEEPER("Client"); KAFKA("KafkaClient"), ZOOKEEPER("Client");
private String name; private String name;
@ -77,8 +79,7 @@ public class LoginUtil
* @throws IOException * @throws IOException
*/ */
public static void setJaasFile(String principal, String keytabPath) public static void setJaasFile(String principal, String keytabPath)
throws IOException throws IOException {
{
String jaasPath = String jaasPath =
new File(System.getProperty("java.io.tmpdir")) + File.separator + System.getProperty("user.name") new File(System.getProperty("java.io.tmpdir")) + File.separator + System.getProperty("user.name")
+ JAAS_POSTFIX; + JAAS_POSTFIX;
@ -88,7 +89,6 @@ public class LoginUtil
// 删除jaas文件 // 删除jaas文件
deleteJaasFile(jaasPath); deleteJaasFile(jaasPath);
writeJaasFile(jaasPath, principal, keytabPath); writeJaasFile(jaasPath, principal, keytabPath);
logger.error("jaasPath--{}",jaasPath);
System.setProperty(JAVA_SECURITY_LOGIN_CONF, jaasPath); System.setProperty(JAVA_SECURITY_LOGIN_CONF, jaasPath);
} }
@ -99,8 +99,7 @@ public class LoginUtil
* @throws IOException * @throws IOException
*/ */
public static void setZookeeperServerPrincipal(String zkServerPrincipal) public static void setZookeeperServerPrincipal(String zkServerPrincipal)
throws IOException throws IOException {
{
System.setProperty(ZOOKEEPER_AUTH_PRINCIPAL, zkServerPrincipal); System.setProperty(ZOOKEEPER_AUTH_PRINCIPAL, zkServerPrincipal);
String ret = System.getProperty(ZOOKEEPER_AUTH_PRINCIPAL); String ret = System.getProperty(ZOOKEEPER_AUTH_PRINCIPAL);
if (ret == null) if (ret == null)
@ -120,8 +119,7 @@ public class LoginUtil
* @throws IOException * @throws IOException
*/ */
public static void setKrb5Config(String krb5ConfFile) public static void setKrb5Config(String krb5ConfFile)
throws IOException throws IOException {
{
System.setProperty(JAVA_SECURITY_KRB5_CONF, krb5ConfFile); System.setProperty(JAVA_SECURITY_KRB5_CONF, krb5ConfFile);
String ret = System.getProperty(JAVA_SECURITY_KRB5_CONF); String ret = System.getProperty(JAVA_SECURITY_KRB5_CONF);
if (ret == null) if (ret == null)
@ -141,8 +139,7 @@ public class LoginUtil
* *
*/ */
private static void writeJaasFile(String jaasPath, String principal, String keytabPath) private static void writeJaasFile(String jaasPath, String principal, String keytabPath)
throws IOException throws IOException {
{
FileWriter writer = new FileWriter(new File(jaasPath)); FileWriter writer = new FileWriter(new File(jaasPath));
try try
{ {
@ -160,8 +157,7 @@ public class LoginUtil
} }
private static void deleteJaasFile(String jaasPath) private static void deleteJaasFile(String jaasPath)
throws IOException throws IOException {
{
File jaasFile = new File(jaasPath); File jaasFile = new File(jaasPath);
if (jaasFile.exists()) if (jaasFile.exists())
{ {
@ -172,8 +168,7 @@ public class LoginUtil
} }
} }
private static String getJaasConfContext(String principal, String keytabPath) private static String getJaasConfContext(String principal, String keytabPath) {
{
Module[] allModule = Module.values(); Module[] allModule = Module.values();
StringBuilder builder = new StringBuilder(); StringBuilder builder = new StringBuilder();
for (Module modlue : allModule) for (Module modlue : allModule)
@ -183,11 +178,9 @@ public class LoginUtil
return builder.toString(); return builder.toString();
} }
private static String getModuleContext(String userPrincipal, String keyTabPath, Module module) private static String getModuleContext(String userPrincipal, String keyTabPath, Module module) {
{
StringBuilder builder = new StringBuilder(); StringBuilder builder = new StringBuilder();
if (IS_IBM_JDK) if (IS_IBM_JDK) {
{
builder.append(module.getName()).append(" {").append(LINE_SEPARATOR); builder.append(module.getName()).append(" {").append(LINE_SEPARATOR);
builder.append(IBM_LOGIN_MODULE).append(LINE_SEPARATOR); builder.append(IBM_LOGIN_MODULE).append(LINE_SEPARATOR);
builder.append("credsType=both").append(LINE_SEPARATOR); builder.append("credsType=both").append(LINE_SEPARATOR);
@ -195,9 +188,7 @@ public class LoginUtil
builder.append("useKeytab=\"" + keyTabPath + "\"").append(LINE_SEPARATOR); builder.append("useKeytab=\"" + keyTabPath + "\"").append(LINE_SEPARATOR);
builder.append("debug=true;").append(LINE_SEPARATOR); builder.append("debug=true;").append(LINE_SEPARATOR);
builder.append("};").append(LINE_SEPARATOR); builder.append("};").append(LINE_SEPARATOR);
} } else {
else
{
builder.append(module.getName()).append(" {").append(LINE_SEPARATOR); builder.append(module.getName()).append(" {").append(LINE_SEPARATOR);
builder.append(SUN_LOGIN_MODULE).append(LINE_SEPARATOR); builder.append(SUN_LOGIN_MODULE).append(LINE_SEPARATOR);
builder.append("useKeyTab=true").append(LINE_SEPARATOR); builder.append("useKeyTab=true").append(LINE_SEPARATOR);
@ -211,4 +202,58 @@ public class LoginUtil
return builder.toString(); return builder.toString();
} }
public static void securityPrepare(String principal, String keyTabFile) throws IOException {
// String filePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator;
String filePath = "/rsoft/config/";
String krbFile = filePath + "krb5.conf";
String userKeyTableFile = filePath + keyTabFile;
// windows路径下分隔符替换
userKeyTableFile = userKeyTableFile.replace("\\", "\\\\");
krbFile = krbFile.replace("\\", "\\\\");
LoginUtil.setKrb5Config(krbFile);
LoginUtil.setZookeeperServerPrincipal("zookeeper/hadoop.hadoop.com");
LoginUtil.setJaasFile(principal, userKeyTableFile);
}
/**
* Check security mode
*
* @return boolean
*/
public static Boolean isSecurityModel() {
Boolean isSecurity = false;
String krbFilePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator + "kafkaSecurityMode";
Properties securityProps = new Properties();
// file does not exist.
if (!isFileExists(krbFilePath)) {
return isSecurity;
}
try {
securityProps.load(new FileInputStream(krbFilePath));
if ("yes".equalsIgnoreCase(securityProps.getProperty("kafka.client.security.mode")))
{
isSecurity = true;
}
} catch (Exception e) {
LOG.info("The Exception occured : {}.", e);
}
return isSecurity;
}
/*
*
*/
private static boolean isFileExists(String fileName) {
File file = new File(fileName);
return file.exists();
}
} }

View File

@ -1,108 +0,0 @@
package org.dromara.kafka.consumer.handler;
import cn.hutool.core.date.DateTime;
import cn.hutool.core.date.DateUtil;
import org.dromara.kafka.consumer.entity.EsGpsInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
/**
* <p>description: </p>
*
* @author chenle
* @date 2021-10-28 14:48
*/
public class KafkaSecurityUtil {
static Logger logger = LoggerFactory.getLogger(KafkaSecurityUtil.class);
public static void main(String[] args) {
EsGpsInfo esGpsInfo = new EsGpsInfo();
String realtime = "2021/11/04 12:00:11";
DateTime dateTime = DateUtil.parse(realtime);
esGpsInfo.setGpsTime(dateTime.toJdkDate());
logger.info("esGpsInfo:{},deviceType={},gpsTime={}",esGpsInfo.toString(),
esGpsInfo.getDeviceType(),dateTime.toJdkDate().toString());
}
/**
* keytab
*/
private static final String USER_KEYTAB_FILE = "user.keytab";
/**
*
*/
private static final String USER_PRINCIPAL = "aqdsj_ruansi@HADOOP.COM";
public static void securityPrepare() throws IOException
{
logger.error("进入了---securityPrepare");
//String filePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator;
//String krbFile = filePath + "krb5.conf";
//ClassPathResource classPathResource = new ClassPathResource("krb5.conf");
//String krbFile = classPathResource.getAbsolutePath();
String krbFile = "/gpsstore/krb5.conf";
// String userKeyTableFile = filePath + USER_KEYTAB_FILE;
//ClassPathResource classPathResource1 = new ClassPathResource(USER_KEYTAB_FILE);
String userKeyTableFile = "/gpsstore/user.keytab";
//windows路径下分隔符替换
userKeyTableFile = userKeyTableFile.replace("\\", "\\\\");
krbFile = krbFile.replace("\\", "\\\\");
LoginUtil.setKrb5Config(krbFile);
LoginUtil.setZookeeperServerPrincipal("zookeeper/hadoop.hadoop.com");
logger.error("userKeyTableFile路径---{}",userKeyTableFile);
LoginUtil.setJaasFile(USER_PRINCIPAL, userKeyTableFile);
}
public static Boolean isSecurityModel()
{
Boolean isSecurity = false;
//String krbFilePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator + "kafkaSecurityMode";
//ClassPathResource classPathResource = new ClassPathResource("kafkaSecurityMode");
InputStream inputStream = Thread.currentThread().getContextClassLoader().getResourceAsStream("kafkaSecurityMode");
/*File file = classPathResource.getFile();
if(!file.exists()){
return isSecurity;
}*/
Properties securityProps = new Properties();
try
{
securityProps.load(inputStream);
if ("yes".equalsIgnoreCase(securityProps.getProperty("kafka.client.security.mode")))
{
isSecurity = true;
}
}
catch (Exception e)
{
logger.info("The Exception occured : {}.", e);
}
return isSecurity;
}
/*
*
*/
private static boolean isFileExists(String fileName)
{
File file = new File(fileName);
return file.exists();
}
}

View File

@ -1,215 +0,0 @@
package org.dromara.kafka.consumer.handler;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
/**
* <p>description: </p>
*
* @author chenle
* @date 2021-10-28 15:40
*/
public class LoginUtil
{
public enum Module
{
STORM("StormClient"), KAFKA("KafkaClient"), ZOOKEEPER("Client");
private String name;
private Module(String name)
{
this.name = name;
}
public String getName()
{
return name;
}
}
/**
* line operator string
*/
private static final String LINE_SEPARATOR = System.getProperty("line.separator");
/**
* jaas file postfix
*/
private static final String JAAS_POSTFIX = ".jaas.conf";
/**
* is IBM jdk or not
*/
private static final boolean IS_IBM_JDK = System.getProperty("java.vendor").contains("IBM");
/**
* IBM jdk login module
*/
private static final String IBM_LOGIN_MODULE = "com.ibm.security.auth.module.Krb5LoginModule required";
/**
* oracle jdk login module
*/
private static final String SUN_LOGIN_MODULE = "com.sun.security.auth.module.Krb5LoginModule required";
/**
* Zookeeper quorum principal.
*/
public static final String ZOOKEEPER_AUTH_PRINCIPAL = "zookeeper.server.principal";
/**
* java security krb5 file path
*/
public static final String JAVA_SECURITY_KRB5_CONF = "java.security.krb5.conf";
/**
* java security login file path
*/
public static final String JAVA_SECURITY_LOGIN_CONF = "java.security.auth.login.config";
/**
* jaas.conf
*
* @param principal
* @param keytabPath
* @throws IOException
*/
public static void setJaasFile(String principal, String keytabPath)
throws IOException
{
String jaasPath =
new File(System.getProperty("java.io.tmpdir")) + File.separator + System.getProperty("user.name")
+ JAAS_POSTFIX;
// windows路径下分隔符替换
jaasPath = jaasPath.replace("\\", "\\\\");
// 删除jaas文件
deleteJaasFile(jaasPath);
writeJaasFile(jaasPath, principal, keytabPath);
System.setProperty(JAVA_SECURITY_LOGIN_CONF, jaasPath);
}
/**
* zookeeperprincipal
*
* @param zkServerPrincipal
* @throws IOException
*/
public static void setZookeeperServerPrincipal(String zkServerPrincipal)
throws IOException
{
System.setProperty(ZOOKEEPER_AUTH_PRINCIPAL, zkServerPrincipal);
String ret = System.getProperty(ZOOKEEPER_AUTH_PRINCIPAL);
if (ret == null)
{
throw new IOException(ZOOKEEPER_AUTH_PRINCIPAL + " is null.");
}
if (!ret.equals(zkServerPrincipal))
{
throw new IOException(ZOOKEEPER_AUTH_PRINCIPAL + " is " + ret + " is not " + zkServerPrincipal + ".");
}
}
/**
* krb5
*
* @param krb5ConfFile
* @throws IOException
*/
public static void setKrb5Config(String krb5ConfFile)
throws IOException
{
System.setProperty(JAVA_SECURITY_KRB5_CONF, krb5ConfFile);
String ret = System.getProperty(JAVA_SECURITY_KRB5_CONF);
if (ret == null)
{
throw new IOException(JAVA_SECURITY_KRB5_CONF + " is null.");
}
if (!ret.equals(krb5ConfFile))
{
throw new IOException(JAVA_SECURITY_KRB5_CONF + " is " + ret + " is not " + krb5ConfFile + ".");
}
}
/**
* jaas
*
* @throws IOException
*
*/
private static void writeJaasFile(String jaasPath, String principal, String keytabPath)
throws IOException
{
FileWriter writer = new FileWriter(new File(jaasPath));
try
{
writer.write(getJaasConfContext(principal, keytabPath));
writer.flush();
}
catch (IOException e)
{
throw new IOException("Failed to create jaas.conf File");
}
finally
{
writer.close();
}
}
private static void deleteJaasFile(String jaasPath)
throws IOException
{
File jaasFile = new File(jaasPath);
if (jaasFile.exists())
{
if (!jaasFile.delete())
{
throw new IOException("Failed to delete exists jaas file.");
}
}
}
private static String getJaasConfContext(String principal, String keytabPath)
{
Module[] allModule = Module.values();
StringBuilder builder = new StringBuilder();
for (Module modlue : allModule)
{
builder.append(getModuleContext(principal, keytabPath, modlue));
}
return builder.toString();
}
private static String getModuleContext(String userPrincipal, String keyTabPath, Module module)
{
StringBuilder builder = new StringBuilder();
if (IS_IBM_JDK)
{
builder.append(module.getName()).append(" {").append(LINE_SEPARATOR);
builder.append(IBM_LOGIN_MODULE).append(LINE_SEPARATOR);
builder.append("credsType=both").append(LINE_SEPARATOR);
builder.append("principal=\"" + userPrincipal + "\"").append(LINE_SEPARATOR);
builder.append("useKeytab=\"" + keyTabPath + "\"").append(LINE_SEPARATOR);
builder.append("debug=true;").append(LINE_SEPARATOR);
builder.append("};").append(LINE_SEPARATOR);
}
else
{
builder.append(module.getName()).append(" {").append(LINE_SEPARATOR);
builder.append(SUN_LOGIN_MODULE).append(LINE_SEPARATOR);
builder.append("useKeyTab=true").append(LINE_SEPARATOR);
builder.append("keyTab=\"" + keyTabPath + "\"").append(LINE_SEPARATOR);
builder.append("principal=\"" + userPrincipal + "\"").append(LINE_SEPARATOR);
builder.append("useTicketCache=false").append(LINE_SEPARATOR);
builder.append("storeKey=true").append(LINE_SEPARATOR);
builder.append("debug=true;").append(LINE_SEPARATOR);
builder.append("};").append(LINE_SEPARATOR);
}
return builder.toString();
}
}

View File

@ -4,6 +4,7 @@ package org.dromara.kafka.consumer.handler;
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringDeserializer;
import org.dromara.kafka.consumer.config.KafkaPropertiesConfig; import org.dromara.kafka.consumer.config.KafkaPropertiesConfig;
import org.dromara.kafka.consumer.config.LoginUtil;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -55,17 +56,17 @@ public class RealConsumer implements CommandLineRunner {
groupId = "group_ruansi_xuancheng"; groupId = "group_ruansi_xuancheng";
cityCode = "3418"; cityCode = "3418";
if(args.length > 0){ if(args.length > 0){
/*kafkaServers = args[0]; kafkaServers = args[0];
topics = args[1]; topics = args[1];
groupId = args[2]; groupId = args[2];
cityCode = args[3];*/ cityCode = args[3];
} }
ExecutorService executorService = Executors.newSingleThreadExecutor(); ExecutorService executorService = Executors.newSingleThreadExecutor();
Map kafkaProp = getKafkaProp(); Map kafkaProp = getKafkaProp();
if (KafkaSecurityUtil.isSecurityModel()) if (LoginUtil.isSecurityModel())
{ {
try try
{ {
@ -78,7 +79,7 @@ public class RealConsumer implements CommandLineRunner {
kafkaProp.put("sasl.kerberos.service.name","kafka"); kafkaProp.put("sasl.kerberos.service.name","kafka");
//域名 //域名
kafkaProp.put("kerberos.domain.name","hadoop.hadoop.com"); kafkaProp.put("kerberos.domain.name","hadoop.hadoop.com");
KafkaSecurityUtil.securityPrepare(); LoginUtil.setJaasFile("","");
} }
catch (IOException e) catch (IOException e)
{ {

View File

@ -114,42 +114,66 @@
<artifactId>stwzhj-api-data2es</artifactId> <artifactId>stwzhj-api-data2es</artifactId>
</dependency> </dependency>
<!--elasticsearch-->
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>7.14.0</version>
<exclusions>
<exclusion>
<artifactId>log4j-api</artifactId>
<groupId>org.apache.logging.log4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency> <dependency>
<groupId>org.elasticsearch.client</groupId> <groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId> <artifactId>elasticsearch-rest-client</artifactId>
<version>7.14.0</version> <version>7.10.2-h0.cbu.mrs.350.r11</version>
</dependency> </dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
<version>7.6.0-hw-ei-302002</version>
</dependency>
<dependency> <dependency>
<groupId>org.elasticsearch.client</groupId> <groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId> <artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.14.0</version> <version>7.10.2-h0.cbu.mrs.350.r11</version>
<exclusions> <exclusions>
<exclusion> <exclusion>
<groupId>org.elasticsearch</groupId> <groupId>org.elasticsearch.plugin</groupId>
<artifactId>elasticsearch</artifactId> <artifactId>parent-join-client</artifactId>
</exclusion> </exclusion>
<exclusion> <exclusion>
<artifactId>elasticsearch-rest-client</artifactId> <groupId>org.elasticsearch.plugin</groupId>
<groupId>org.elasticsearch.client</groupId> <artifactId>aggs-matrix-stats-client</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>7.10.2-h0.cbu.mrs.350.r11</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>3.6.1-h0.cbu.mrs.350.r11</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<groupId>net.sf.jopt-simple</groupId>
<artifactId>jopt-simple</artifactId>
</exclusion>
<exclusion>
<groupId>com.huawei.mrs</groupId>
<artifactId>manager-wc2frm</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</exclusion>
<exclusion>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
</exclusion>
<exclusion>
<groupId>com.huawei.mrs</groupId>
<artifactId>om-controller-api</artifactId>
</exclusion>
<exclusion>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
</exclusion> </exclusion>
</exclusions> </exclusions>
</dependency> </dependency>
@ -157,16 +181,10 @@
<dependency> <dependency>
<groupId>org.apache.kafka</groupId> <groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId> <artifactId>kafka-clients</artifactId>
<version>2.4.0-hw-ei-302002</version> <version>3.6.1-h0.cbu.mrs.350.r11</version>
</dependency> </dependency>
<!-- kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
</dependencies> </dependencies>
<build> <build>

View File

@ -27,11 +27,11 @@ public class EsConfig {
@Bean(destroyMethod = "close",name = "restHighLevelClient") @Bean(destroyMethod = "close",name = "restHighLevelClient")
public RestHighLevelClient restClient() { public RestHighLevelClient restClient() {
// String configPath = System.getProperty("user.dir") + File.separator+ "app_data2es_aq" + File.separator + "conf" + File.separator; // String configPath = System.getProperty("user.dir") + File.separator+ "app_data2es_aq" + File.separator + "conf" + File.separator;
String configPath = "/rsoft/"; String configPath = "/rsoft/config/";
// KAFKA("KafkaClient"), ZOOKEEPER("Client"); // KAFKA("KafkaClient"), ZOOKEEPER("Client");
GenerateEnumUtil.addEnum(LoginUtil.Module.class,"KAFKA","KafkaClient"); // GenerateEnumUtil.addEnum(LoginUtil.Module.class,"KAFKA","KafkaClient");
GenerateEnumUtil.addEnum(LoginUtil.Module.class,"ZOOKEEPER","Client"); // GenerateEnumUtil.addEnum(LoginUtil.Module.class,"ZOOKEEPER","Client");
HwRestClient hwRestClient = new HwRestClient(configPath); HwRestClient hwRestClient = new HwRestClient(configPath);
RestHighLevelClient highLevelClient = new RestHighLevelClient(hwRestClient.getRestClientBuilder()); RestHighLevelClient highLevelClient = new RestHighLevelClient(hwRestClient.getRestClientBuilder());
return highLevelClient; return highLevelClient;

View File

@ -0,0 +1,227 @@
package org.dromara.data2es.config;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.Header;
import org.apache.http.HttpHost;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.message.BasicHeader;
import org.elasticsearch.client.Node;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.springframework.core.io.ClassPathResource;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
/**
* <p>description: </p>
*
* @author chenle
* @date 2021-11-01 14:25
*/
public class HwRestClient {
private static final Log LOG = LogFactory.getLog(HwRestClient.class);
private static String isSecureMode;
private static String esServerHost;
private static int connectTimeout;
private static int socketTimeout;
private static int connectionRequestTimeout;
private static int maxConnTotal;
private static int maxConnPerRoute;
private static String principal;
private static final String UN_SECURITY_MODE = "false";
private static final String COLON = ":";
private static final String COMMA = ",";
private static HttpHost[] hostArray;
private String configPath;
private static boolean SNIFFER_ENABLE = false;
private static final String username = "yhy_ahrs_rcw@HADOOP.COM";
public static final String password = "Ycgis!2509";
public HwRestClient(String configPath) {
this.configPath = configPath;
if (!this.getConfig()) {
LOG.error("Get config failed.");
}
}
public HwRestClient() {
/*this.configPath = System.getProperty("user.dir") + File.separator + "conf" + File.separator;
if (!this.getConfig()) {
LOG.error("Get config failed.");
}*/
if (!this.getConfig()) {
LOG.error("Get config failed.");
}
this.configPath = "/rsoft/config/";
}
private boolean getConfig() {
Properties properties = new Properties();
ClassPathResource classPathResource = new ClassPathResource("/esParams.properties");
try {
properties.load(classPathResource.getInputStream());
} catch (IOException var5) {
LOG.error("Failed to load properties file : " + var5.getMessage());
return false;
}
try {
esServerHost = properties.getProperty("esServerHost");
connectTimeout = Integer.valueOf(properties.getProperty("connectTimeout"));
socketTimeout = Integer.valueOf(properties.getProperty("socketTimeout"));
connectionRequestTimeout = Integer.valueOf(properties.getProperty("connectionRequestTimeout"));
maxConnPerRoute = Integer.valueOf(properties.getProperty("maxConnPerRoute"));
maxConnTotal = Integer.valueOf(properties.getProperty("maxConnTotal"));
isSecureMode = properties.getProperty("isSecureMode");
principal = properties.getProperty("principal");
SNIFFER_ENABLE = Boolean.valueOf(properties.getProperty("snifferEnable"));
LOG.info("esServerHost:" + esServerHost);
LOG.info("connectTimeout:" + connectTimeout);
LOG.info("socketTimeout:" + socketTimeout);
LOG.info("connectionRequestTimeout:" + connectionRequestTimeout);
LOG.info("maxConnPerRouteTotal:" + maxConnPerRoute);
LOG.info("maxConnTotal:" + maxConnTotal);
LOG.info("isSecureMode:" + isSecureMode);
LOG.info("principal:" + principal);
return true;
} catch (NumberFormatException var4) {
LOG.error("Failed to get parameters !", var4);
return false;
}
}
public boolean isSnifferEnable() {
return SNIFFER_ENABLE;
}
private HttpHost[] getHostArray() {
String schema;
if ("false".equals(isSecureMode)) {
schema = "http";
} else {
schema = "https";
}
List<HttpHost> hosts = new ArrayList();
String[] hostArray1 = esServerHost.split(",");
String[] var4 = hostArray1;
int var5 = hostArray1.length;
for (int var6 = 0; var6 < var5; ++var6) {
String host = var4[var6];
String[] ipPort = host.split(":");
HttpHost hostNew = new HttpHost(ipPort[0], Integer.valueOf(ipPort[1]), schema);
hosts.add(hostNew);
}
return (HttpHost[]) hosts.toArray(new HttpHost[0]);
}
private void setSecConfig() {
try {
LOG.info("Config path is " + this.configPath);
LoginUtil.setJaasFile(principal, this.configPath + "user.keytab");
LoginUtil.setKrb5Config(this.configPath + "krb5.conf");
System.setProperty("elasticsearch.kerberos.jaas.appname", "EsClient");
System.setProperty("es.security.indication", "true");
LOG.info("es.security.indication is " + System.getProperty("es.security.indication"));
} catch (Exception var2) {
LOG.error("Failed to set security conf", var2);
}
}
public RestClientBuilder getRestClientBuilder() {
hostArray = getHostArray();
if ("false".equals(isSecureMode)) {
System.setProperty("es.security.indication", "false");
} else {
setSecConfig();
}
RestClientBuilder builder = RestClient.builder(hostArray);
Header[] defaultHeaders = new Header[]{new BasicHeader("Accept", "application/json"), new BasicHeader("Content-type", "application/json")};
builder.setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
@Override
public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder requestConfigBuilder) {
return requestConfigBuilder.setConnectTimeout(HwRestClient.connectTimeout).
setSocketTimeout(HwRestClient.socketTimeout).
setConnectionRequestTimeout(HwRestClient.connectionRequestTimeout);
}
});
builder.setDefaultHeaders(defaultHeaders);
/* es https */
/*try {
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
new UsernamePasswordCredentials(username, password));
SSLContext sslContext = new SSLContextBuilder().loadTrustMaterial(null, new TrustStrategy() {
// 信任所有
@Override
public boolean isTrusted(X509Certificate[] chain, String authType) throws CertificateException {
return true;
}
}).build();
SSLIOSessionStrategy sessionStrategy = new SSLIOSessionStrategy(sslContext, NoopHostnameVerifier.INSTANCE);
builder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
@Override
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
httpClientBuilder.disableAuthCaching();
//httpClientBuilder.setSSLStrategy(sessionStrategy);
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
return httpClientBuilder;
}
});
}catch (Exception e){
e.printStackTrace();
}*/
/* es https */
return builder;
}
public RestClient getRestClient() {
if (this.ifConfigWasWrong()) {
return null;
} else {
RestClientBuilder restClientBuilder = this.getRestClientBuilder();
RestClient restClient = restClientBuilder.build();
this.setNodes(restClient);
LOG.info("The Low Level Rest Client has been created.");
return restClient;
}
}
private boolean ifConfigWasWrong() {
if (this.configPath != null && this.configPath.length() != 0) {
return false;
} else {
LOG.info("Config path is not allowed to be empty.");
return true;
}
}
private void setNodes(RestClient restClient) {
List<Node> nodes = new ArrayList();
HttpHost[] var3 = hostArray;
int var4 = var3.length;
for (int var5 = 0; var5 < var4; ++var5) {
HttpHost httpHost = var3[var5];
nodes.add(new Node(httpHost));
}
restClient.setNodes(nodes);
}
}

View File

@ -7,7 +7,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.io.IOException; import java.io.IOException;
@ -19,12 +18,12 @@ import java.util.Properties;
* @author chenle * @author chenle
* @date 2021-11-03 14:15 * @date 2021-11-03 14:15
*/ */
//@Component @Component
public class KafkaConfig { public class KafkaConfig {
private Logger logger = LoggerFactory.getLogger(KafkaConfig.class); private Logger logger = LoggerFactory.getLogger(KafkaConfig.class);
private String kafkaServers = "140.168.2.31:21007,140.168.2.32:21007,140.168.2.33:21007"; //省厅 kafka private String kafkaServers = "53.1.212.25:21007,53.1.212.26:21007,53.1.212.27:21007"; //省厅 kafka
// private String kafkaServers = "53.208.61.105:6667,53.208.61.106:6667,53.208.61.107:6667";//六安GA网 // private String kafkaServers = "53.208.61.105:6667,53.208.61.106:6667,53.208.61.107:6667";//六安GA网
// private String kafkaServers = "34.72.62.93:9092";//六安视频网 // private String kafkaServers = "34.72.62.93:9092";//六安视频网
// private String kafkaServers = "127.0.0.1:9092";//本地 // private String kafkaServers = "127.0.0.1:9092";//本地
@ -63,12 +62,12 @@ public class KafkaConfig {
/** /**
* keytab * keytab
*/ */
private static final String USER_KEYTAB_FILE = "请修改为真实keytab文件名"; private static final String USER_KEYTAB_FILE = "user.keytab";
/** /**
* *
*/ */
private static final String USER_PRINCIPAL = "请修改为真实用户名称"; private static final String USER_PRINCIPAL = "yhy_ahrs_rcw@A528C942_01A6_1BEF_7A75_0187DC82C40F.COM";
/** /**
* Producer * Producer
@ -80,18 +79,19 @@ public class KafkaConfig {
public KafkaProducer newProducer() { public KafkaProducer newProducer() {
Properties props = new Properties(); Properties props = new Properties();
if (KafkaSecurityUtil.isSecurityModel()) if (true)
{ {
try try
{ {
logger.info("Securitymode start."); logger.info("Securitymode start.");
//!!注意,安全认证时,需要用户手动修改为自己申请的机机账号 //!!注意,安全认证时,需要用户手动修改为自己申请的机机账号
KafkaSecurityUtil.securityPrepare(); LoginUtil.securityPrepare(USER_PRINCIPAL, USER_KEYTAB_FILE);
props.put(securityProtocol, "SASL_PLAINTEXT"); props.put(securityProtocol, "SASL_PLAINTEXT");
// props.put("sasl.mechanism", "GSSAPI");
// 服务名 // 服务名
props.put(saslKerberosServiceName, "kafka"); props.put(saslKerberosServiceName, "kafka");
// 域名 // 域名
props.put(kerberosDomainName, "hadoop.hadoop.com"); props.put(kerberosDomainName, "A528C942_01A6_1BEF_7A75_0187DC82C40F.COM");
} }
catch (IOException e) catch (IOException e)
{ {
@ -109,7 +109,7 @@ public class KafkaConfig {
// Broker地址列表 // Broker地址列表
props.put(bootstrapServers,kafkaServers); props.put(bootstrapServers,kafkaServers);
// 客户端ID // 客户端ID
// props.put(clientId, "ruansiProducer"); props.put(clientId, "ruansiProducer");
// Key序列化类 // Key序列化类
props.put(keySerializer, props.put(keySerializer,
"org.apache.kafka.common.serialization.IntegerSerializer"); "org.apache.kafka.common.serialization.IntegerSerializer");
@ -137,12 +137,6 @@ public class KafkaConfig {
return producer; return producer;
} }
@Bean
public KafkaAdmin admin(KafkaProperties properties){
KafkaAdmin admin = new KafkaAdmin(properties.buildAdminProperties());
admin.setFatalIfBrokerNotAvailable(true);
return admin;
}
} }

View File

@ -25,7 +25,7 @@ public class KafkaSecurityUtil {
/** /**
* *
*/ */
private static final String USER_PRINCIPAL = "yhy_ahrs_rcw@HADOOP.COM"; private static final String USER_PRINCIPAL = "yhy_ahrs_rcw@A528C942_01A6_1BEF_7A75_0187DC82C40F.COM";
public static void securityPrepare() throws IOException public static void securityPrepare() throws IOException
{ {
@ -34,17 +34,17 @@ public class KafkaSecurityUtil {
//String krbFile = filePath + "krb5.conf"; //String krbFile = filePath + "krb5.conf";
//ClassPathResource classPathResource = new ClassPathResource("krb5.conf"); //ClassPathResource classPathResource = new ClassPathResource("krb5.conf");
//String krbFile = classPathResource.getAbsolutePath(); //String krbFile = classPathResource.getAbsolutePath();
String krbFile = "/rsoft/krb5.conf"; String krbFile = "/rsoft/config/krb5.conf";
// String userKeyTableFile = filePath + USER_KEYTAB_FILE; // String userKeyTableFile = filePath + USER_KEYTAB_FILE;
//ClassPathResource classPathResource1 = new ClassPathResource(USER_KEYTAB_FILE); //ClassPathResource classPathResource1 = new ClassPathResource(USER_KEYTAB_FILE);
String userKeyTableFile = "/rsoft/user.keytab"; String userKeyTableFile = "/rsoft/config/user.keytab";
//windows路径下分隔符替换 //windows路径下分隔符替换
userKeyTableFile = userKeyTableFile.replace("\\", "\\\\"); userKeyTableFile = userKeyTableFile.replace("\\", "\\\\");
krbFile = krbFile.replace("\\", "\\\\"); krbFile = krbFile.replace("\\", "\\\\");
LoginUtil.setKrb5Config(krbFile); LoginUtil.setKrb5Config(krbFile);
LoginUtil.setZookeeperServerPrincipal("zookeeper/hadoop.hadoop.com"); LoginUtil.setZookeeperServerPrincipal("zookeeper/A528C942_01A6_1BEF_7A75_0187DC82C40F.COM");
//logger.error("userKeyTableFile路径---{}",userKeyTableFile); //logger.error("userKeyTableFile路径---{}",userKeyTableFile);
LoginUtil.setJaasFile(USER_PRINCIPAL, userKeyTableFile); LoginUtil.setJaasFile(USER_PRINCIPAL, userKeyTableFile);
} }

View File

@ -2,143 +2,97 @@ package org.dromara.data2es.config;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File; import java.io.File;
import java.io.FileInputStream;
import java.io.FileWriter; import java.io.FileWriter;
import java.io.IOException; import java.io.IOException;
import java.util.Properties;
public class LoginUtil {
private static final Logger LOG = LoggerFactory.getLogger(LoginUtil.class);
/** /**
* <p>description: </p> * no JavaDoc
* */
* @author chenle public enum Module {
* @date 2021-11-01 14:30 KAFKA("KafkaClient"), ZOOKEEPER("Client");
private String name;
private Module(String name)
{
this.name = name;
}
public String getName()
{
return name;
}
}
/**
* line operator string
*/ */
public class LoginUtil {
private static final Log LOG = LogFactory.getLog(LoginUtil.class);
private static final String LINE_SEPARATOR = System.getProperty("line.separator"); private static final String LINE_SEPARATOR = System.getProperty("line.separator");
private static final String ES = "es.";
/**
* jaas file postfix
*/
private static final String JAAS_POSTFIX = ".jaas.conf"; private static final String JAAS_POSTFIX = ".jaas.conf";
private static final String IBM_LOGIN_MODULE = "com.ibm.security.auth.module.Krb5LoginModule required";
private static final String SUN_LOGIN_MODULE = "com.sun.security.auth.module.Krb5LoginModule required"; /**
private static final String JAVA_SECURITY_LOGIN_CONF_KEY = "java.security.auth.login.config"; * is IBM jdk or not
private static final String JAVA_SECURITY_KRB5_CONF_KEY = "java.security.krb5.conf"; */
private static final boolean IS_IBM_JDK = System.getProperty("java.vendor").contains("IBM"); private static final boolean IS_IBM_JDK = System.getProperty("java.vendor").contains("IBM");
private static final String ZOOKEEPER_AUTH_PRINCIPAL = "zookeeper.server.principal";
private static boolean WriteFlag = false;
public LoginUtil() { /**
} * IBM jdk login module
*/
private static final String IBM_LOGIN_MODULE = "com.ibm.security.auth.module.Krb5LoginModule required";
static void setKrb5Config(String krb5ConfFile) throws IOException { /**
System.setProperty("java.security.krb5.conf", krb5ConfFile); * oracle jdk login module
String ret = System.getProperty("java.security.krb5.conf"); */
if (ret == null) { private static final String SUN_LOGIN_MODULE = "com.sun.security.auth.module.Krb5LoginModule required";
LOG.error("java.security.krb5.conf is null.");
throw new IOException("java.security.krb5.conf is null.");
} else if (!ret.equals(krb5ConfFile)) {
LOG.error("java.security.krb5.conf is " + ret + " is not " + krb5ConfFile + ".");
throw new IOException("java.security.krb5.conf is " + ret + " is not " + krb5ConfFile + ".");
}
}
/**
* Zookeeper quorum principal.
*/
public static final String ZOOKEEPER_AUTH_PRINCIPAL = "zookeeper.server.principal";
/**
* java security krb5 file path
*/
public static final String JAVA_SECURITY_KRB5_CONF = "java.security.krb5.conf";
static synchronized void setJaasFile(String principal, String keytabPath) throws IOException { /**
//String filePath = keytabPath.substring(0, keytabPath.lastIndexOf(File.separator)); * java security login file path
// String jaasPath = filePath + File.separator + "es." + System.getProperty("user.name") + ".jaas.conf"; */
// jaasPath = jaasPath.replace("\\", "\\\\"); public static final String JAVA_SECURITY_LOGIN_CONF = "java.security.auth.login.config";
/**
* jaas.conf
*
* @param principal
* @param keytabPath
* @throws IOException
*/
public static void setJaasFile(String principal, String keytabPath)
throws IOException {
String jaasPath = String jaasPath =
new File(System.getProperty("java.io.tmpdir")) + File.separator + System.getProperty("user.name") new File(System.getProperty("java.io.tmpdir")) + File.separator + System.getProperty("user.name")
+ JAAS_POSTFIX; + JAAS_POSTFIX;
keytabPath = keytabPath.replace("\\", "\\\\"); // windows路径下分隔符替换
LOG.info("jaasPath is {} " + jaasPath); jaasPath = jaasPath.replace("\\", "\\\\");
LOG.info("keytabPath is " + keytabPath); // 删除jaas文件
if ((new File(jaasPath)).exists()) {
if (!WriteFlag) {
deleteJaasFile(jaasPath); deleteJaasFile(jaasPath);
writeJaasFile(jaasPath, principal, keytabPath); writeJaasFile(jaasPath, principal, keytabPath);
System.setProperty("java.security.auth.login.config", jaasPath); System.setProperty(JAVA_SECURITY_LOGIN_CONF, jaasPath);
WriteFlag = true;
} }
} else {
writeJaasFile(jaasPath, principal, keytabPath);
System.setProperty("java.security.auth.login.config", jaasPath);
WriteFlag = true;
}
}
private static void writeJaasFile(String jaasPath, String principal, String keytabPath) throws IOException {
try {
FileWriter writer = new FileWriter(new File(jaasPath));
try {
writer.write(getJaasConfContext(principal, keytabPath));
writer.flush();
} catch (Throwable var7) {
try {
writer.close();
} catch (Throwable var6) {
var7.addSuppressed(var6);
}
throw var7;
}
writer.close();
} catch (IOException var8) {
throw new IOException("Failed to create jaas.conf File");
}
}
private static void deleteJaasFile(String jaasPath) throws IOException {
File jaasFile = new File(jaasPath);
if (jaasFile.exists() && !jaasFile.delete()) {
throw new IOException("Failed to delete exists jaas file.");
}
}
private static String getJaasConfContext(String principal, String keytabPath) {
Module[] allModule = Module.values();
StringBuilder builder = new StringBuilder();
Module[] var4 = allModule;
int var5 = allModule.length;
for(int var6 = 0; var6 < var5; ++var6) {
Module modlue = var4[var6];
builder.append(getModuleContext(principal, keytabPath, modlue));
}
return builder.toString();
}
private static String getModuleContext(String userPrincipal, String keyTabPath, Module module) {
StringBuilder builder = new StringBuilder();
if (IS_IBM_JDK) {
builder.append(module.getName()).append(" {").append(LINE_SEPARATOR);
builder.append("com.ibm.security.auth.module.Krb5LoginModule required").append(LINE_SEPARATOR);
builder.append("credsType=both").append(LINE_SEPARATOR);
builder.append("principal=\"").append(userPrincipal).append("\"").append(LINE_SEPARATOR);
builder.append("useKeytab=\"").append(keyTabPath).append("\"").append(LINE_SEPARATOR);
builder.append("debug=true;").append(LINE_SEPARATOR);
builder.append("};").append(LINE_SEPARATOR);
} else {
builder.append(module.getName()).append(" {").append(LINE_SEPARATOR);
builder.append("com.sun.security.auth.module.Krb5LoginModule required").append(LINE_SEPARATOR);
builder.append("useKeyTab=true").append(LINE_SEPARATOR);
builder.append("keyTab=\"").append(keyTabPath).append("\"").append(LINE_SEPARATOR);
builder.append("principal=\"").append(userPrincipal).append("\"").append(LINE_SEPARATOR);
builder.append("useTicketCache=false").append(LINE_SEPARATOR);
builder.append("storeKey=true").append(LINE_SEPARATOR);
builder.append("debug=true;").append(LINE_SEPARATOR);
builder.append("};").append(LINE_SEPARATOR);
}
return builder.toString();
}
/** /**
* zookeeperprincipal * zookeeperprincipal
@ -147,8 +101,7 @@ public class LoginUtil {
* @throws IOException * @throws IOException
*/ */
public static void setZookeeperServerPrincipal(String zkServerPrincipal) public static void setZookeeperServerPrincipal(String zkServerPrincipal)
throws IOException throws IOException {
{
System.setProperty(ZOOKEEPER_AUTH_PRINCIPAL, zkServerPrincipal); System.setProperty(ZOOKEEPER_AUTH_PRINCIPAL, zkServerPrincipal);
String ret = System.getProperty(ZOOKEEPER_AUTH_PRINCIPAL); String ret = System.getProperty(ZOOKEEPER_AUTH_PRINCIPAL);
if (ret == null) if (ret == null)
@ -161,17 +114,148 @@ public class LoginUtil {
} }
} }
public static enum Module { /**
Elasticsearch("EsClient"); * krb5
*
private String name; * @param krb5ConfFile
* @throws IOException
private Module(String name) { */
this.name = name; public static void setKrb5Config(String krb5ConfFile)
throws IOException {
System.setProperty(JAVA_SECURITY_KRB5_CONF, krb5ConfFile);
String ret = System.getProperty(JAVA_SECURITY_KRB5_CONF);
if (ret == null)
{
throw new IOException(JAVA_SECURITY_KRB5_CONF + " is null.");
}
if (!ret.equals(krb5ConfFile))
{
throw new IOException(JAVA_SECURITY_KRB5_CONF + " is " + ret + " is not " + krb5ConfFile + ".");
}
} }
public String getName() { /**
return this.name; * jaas
*
* @throws IOException
*
*/
private static void writeJaasFile(String jaasPath, String principal, String keytabPath)
throws IOException {
FileWriter writer = new FileWriter(new File(jaasPath));
try
{
writer.write(getJaasConfContext(principal, keytabPath));
writer.flush();
}
catch (IOException e)
{
throw new IOException("Failed to create jaas.conf File");
}
finally
{
writer.close();
}
}
private static void deleteJaasFile(String jaasPath)
throws IOException {
File jaasFile = new File(jaasPath);
if (jaasFile.exists())
{
if (!jaasFile.delete())
{
throw new IOException("Failed to delete exists jaas file.");
} }
} }
} }
private static String getJaasConfContext(String principal, String keytabPath) {
Module[] allModule = Module.values();
StringBuilder builder = new StringBuilder();
for (Module modlue : allModule)
{
builder.append(getModuleContext(principal, keytabPath, modlue));
}
return builder.toString();
}
private static String getModuleContext(String userPrincipal, String keyTabPath, Module module) {
StringBuilder builder = new StringBuilder();
if (IS_IBM_JDK) {
builder.append(module.getName()).append(" {").append(LINE_SEPARATOR);
builder.append(IBM_LOGIN_MODULE).append(LINE_SEPARATOR);
builder.append("credsType=both").append(LINE_SEPARATOR);
builder.append("principal=\"" + userPrincipal + "\"").append(LINE_SEPARATOR);
builder.append("useKeytab=\"" + keyTabPath + "\"").append(LINE_SEPARATOR);
builder.append("debug=true;").append(LINE_SEPARATOR);
builder.append("};").append(LINE_SEPARATOR);
} else {
builder.append(module.getName()).append(" {").append(LINE_SEPARATOR);
builder.append(SUN_LOGIN_MODULE).append(LINE_SEPARATOR);
builder.append("useKeyTab=true").append(LINE_SEPARATOR);
builder.append("keyTab=\"" + keyTabPath + "\"").append(LINE_SEPARATOR);
builder.append("principal=\"" + userPrincipal + "\"").append(LINE_SEPARATOR);
builder.append("useTicketCache=false").append(LINE_SEPARATOR);
builder.append("storeKey=true").append(LINE_SEPARATOR);
builder.append("debug=true;").append(LINE_SEPARATOR);
builder.append("};").append(LINE_SEPARATOR);
}
return builder.toString();
}
public static void securityPrepare(String principal, String keyTabFile) throws IOException {
// String filePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator;
String filePath = "/rsoft/config/";
String krbFile = filePath + "krb5.conf";
String userKeyTableFile = filePath + keyTabFile;
// windows路径下分隔符替换
userKeyTableFile = userKeyTableFile.replace("\\", "\\\\");
krbFile = krbFile.replace("\\", "\\\\");
LoginUtil.setKrb5Config(krbFile);
LoginUtil.setZookeeperServerPrincipal("zookeeper/hadoop.hadoop.com");
LoginUtil.setJaasFile(principal, userKeyTableFile);
}
/**
* Check security mode
*
* @return boolean
*/
public static Boolean isSecurityModel() {
Boolean isSecurity = false;
String krbFilePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator + "kafkaSecurityMode";
Properties securityProps = new Properties();
// file does not exist.
if (!isFileExists(krbFilePath)) {
return isSecurity;
}
try {
securityProps.load(new FileInputStream(krbFilePath));
if ("yes".equalsIgnoreCase(securityProps.getProperty("kafka.client.security.mode")))
{
isSecurity = true;
}
} catch (Exception e) {
LOG.info("The Exception occured : {}.", e);
}
return isSecurity;
}
/*
*
*/
private static boolean isFileExists(String fileName) {
File file = new File(fileName);
return file.exists();
}
}

View File

@ -13,6 +13,7 @@ import org.apache.commons.lang.StringUtils;
import org.dromara.common.redis.utils.RedisUtils; import org.dromara.common.redis.utils.RedisUtils;
import org.dromara.data2es.domain.EsGpsInfo; import org.dromara.data2es.domain.EsGpsInfo;
import org.dromara.data2es.domain.EsGpsInfoVO2; import org.dromara.data2es.domain.EsGpsInfoVO2;
import org.dromara.data2es.producer.NewProducer;
import org.dromara.data2es.service.IGpsService; import org.dromara.data2es.service.IGpsService;
import org.dromara.data2es.util.ConfigConstants; import org.dromara.data2es.util.ConfigConstants;
import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkRequest;
@ -23,7 +24,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.Async;
import java.io.IOException; import java.io.IOException;
@ -34,7 +34,7 @@ import java.util.concurrent.CompletableFuture;
public class RequestHandler { public class RequestHandler {
@Autowired @Autowired
private KafkaTemplate<String, String> kafkaTemplate; private NewProducer producer;
@Autowired @Autowired
private RestHighLevelClient restHighLevelClient; private RestHighLevelClient restHighLevelClient;
@ -71,11 +71,11 @@ public class RequestHandler {
//kafkaProducer.send(esGpsInfo, ConfigConstants.KAFKA_TOPIC_SEND_PRE+"."+ infoSource); //kafkaProducer.send(esGpsInfo, ConfigConstants.KAFKA_TOPIC_SEND_PRE+"."+ infoSource);
//todo 2023年3月30日 cpu过载暂时隐藏 //todo 2023年3月30日 cpu过载暂时隐藏
kafkaTemplate.send(ConfigConstants.KAFKA_TOPIC_SEND_PRE+"."+deviceType, JSON.toJSONString(esGpsInfoVO2)); producer.send(ConfigConstants.KAFKA_TOPIC_SEND_PRE+"."+deviceType, JSON.toJSONString(esGpsInfoVO2));
//kafkaProducer.send(esGpsInfoVO2, ConfigConstants.KAFKA_TOPIC_SEND_PRE+"."+deviceType); //kafkaProducer.send(esGpsInfoVO2, ConfigConstants.KAFKA_TOPIC_SEND_PRE+"."+deviceType);
//地市的kafka数据如接收地市某个设备的数据可以对接此kafka topic //地市的kafka数据如接收地市某个设备的数据可以对接此kafka topic
//todo 暂时隐藏 //todo 暂时隐藏
kafkaTemplate.send(ConfigConstants.KAFKA_TOPIC_SEND_PRE+"."+infoSource+"."+deviceType,JSON.toJSONString(esGpsInfoVO2)); producer.send(ConfigConstants.KAFKA_TOPIC_SEND_PRE+"."+infoSource+"."+deviceType,JSON.toJSONString(esGpsInfoVO2));
} }
} }

View File

@ -16,7 +16,7 @@ import javax.annotation.Resource;
* @author chenle * @author chenle
* @date 2021-11-01 17:20 * @date 2021-11-01 17:20
*/ */
//@Component @Component
public class NewProducer { public class NewProducer {
@Autowired @Autowired

View File

@ -10,7 +10,8 @@ spring:
profiles: profiles:
# 环境配置 # 环境配置
active: @profiles.active@ active: @profiles.active@
autoconfigure:
exclude: org.springframework.boot.autoconfigure.elasticsearch.ElasticsearchRestClientAutoConfiguration
--- # nacos 配置 --- # nacos 配置
spring: spring:
cloud: cloud:

View File

@ -0,0 +1 @@
kafka.client.security.mode = yes

View File

@ -39,7 +39,7 @@ public class DeviceRedisSchedule {
/* /*
* Redis online_usert_device_redis * Redis online_usert_device_redis
* */ * */
// @Scheduled(cron = "0/30 * * * * ?") @Scheduled(cron = "0/30 * * * * ?")
public void handleDeviceRedis(){ public void handleDeviceRedis(){
List<JSONObject> jlist = RedisUtils.searchAndGetKeysValues("online_users:*"); List<JSONObject> jlist = RedisUtils.searchAndGetKeysValues("online_users:*");
redisService.insertBatch(BeanUtil.copyToList(jlist, DeviceRedis.class)); redisService.insertBatch(BeanUtil.copyToList(jlist, DeviceRedis.class));