From 5f9ecaa366a6d4663118b57ddf59b5cf229053bb Mon Sep 17 00:00:00 2001 From: luyya Date: Tue, 22 Jul 2025 17:27:14 +0800 Subject: [PATCH] =?UTF-8?q?=E7=9C=81=E5=8E=85=E4=BD=8D=E7=BD=AE=E6=B1=87?= =?UTF-8?q?=E8=81=9A=E6=B7=BB=E5=8A=A0=E5=8D=8E=E4=B8=BA=E8=AE=A4=E8=AF=81?= =?UTF-8?q?kafka=E5=92=8CES?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 4 +- .../src/main/resources/common-dubbo.yml | 4 +- .../kafka/consumer/config/KafkaConfig.java | 138 +++++++ .../kafka/consumer/config/LoginUtil.java | 99 +++-- .../consumer/handler/KafkaSecurityUtil.java | 108 ------ .../kafka/consumer/handler/LoginUtil.java | 215 ----------- .../kafka/consumer/handler/RealConsumer.java | 9 +- stwzhj-modules/stwzhj-data2es/pom.xml | 82 +++-- .../org/dromara/data2es/config/EsConfig.java | 6 +- .../dromara/data2es/config/HwRestClient.java | 227 ++++++++++++ .../dromara/data2es/config/KafkaConfig.java | 24 +- .../data2es/config/KafkaSecurityUtil.java | 8 +- .../org/dromara/data2es/config/LoginUtil.java | 346 +++++++++++------- .../data2es/handler/RequestHandler.java | 8 +- .../dromara/data2es/producer/NewProducer.java | 2 +- .../src/main/resources/application.yml | 3 +- .../src/main/resources/kafkaSecurityMode | 1 + .../system/schedule/DeviceRedisSchedule.java | 2 +- 18 files changed, 736 insertions(+), 550 deletions(-) create mode 100644 stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/config/KafkaConfig.java delete mode 100644 stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/KafkaSecurityUtil.java delete mode 100644 stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/LoginUtil.java create mode 100644 stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/config/HwRestClient.java create mode 100644 stwzhj-modules/stwzhj-data2es/src/main/resources/kafkaSecurityMode diff --git a/pom.xml b/pom.xml index db274477..bf7c4a07 100644 --- a/pom.xml +++ b/pom.xml @@ -89,12 +89,12 @@ prod prod - 53.16.17.16:8848 + 53.16.17.13:8848 DEFAULT_GROUP DEFAULT_GROUP nacos Ycgis!2509 - 53.16.17.16:4560 + 53.16.17.13:4560 diff --git a/stwzhj-common/stwzhj-common-dubbo/src/main/resources/common-dubbo.yml b/stwzhj-common/stwzhj-common-dubbo/src/main/resources/common-dubbo.yml index 3d114d95..84c40385 100644 --- a/stwzhj-common/stwzhj-common-dubbo/src/main/resources/common-dubbo.yml +++ b/stwzhj-common/stwzhj-common-dubbo/src/main/resources/common-dubbo.yml @@ -23,14 +23,14 @@ dubbo: address: redis://${spring.data.redis.host}:${spring.data.redis.port} group: DUBBO_GROUP username: dubbo - password: ruoyi123 + password: ${spring.data.redis.password} # 集群开关 sentinel: true parameters: namespace: ${spring.profiles.active} database: ${spring.data.redis.database} timeout: ${spring.data.redis.timeout} - backup: 53.16.17.13:26380,53.16.17.14:26380,53.16.17.16:26380 + backup: ${spring.data.redis.sentinel.nodes} # metadata-report: # address: redis://${spring.data.redis.host}:${spring.data.redis.port} # group: DUBBO_GROUP diff --git a/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/config/KafkaConfig.java b/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/config/KafkaConfig.java new file mode 100644 index 00000000..b74d31de --- /dev/null +++ b/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/config/KafkaConfig.java @@ -0,0 +1,138 @@ +package org.dromara.kafka.consumer.config; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.context.annotation.Bean; +import org.springframework.stereotype.Component; + +import java.io.IOException; +import java.util.Properties; + +/** + *

description:

+ * + * @author chenle + * @date 2021-11-03 14:15 + */ +@Component +public class KafkaConfig { + + private Logger logger = LoggerFactory.getLogger(KafkaConfig.class); + + private String kafkaServers = "53.1.212.25:21007,53.1.212.26:21007,53.1.212.27:21007"; //省厅 kafka +// private String kafkaServers = "53.208.61.105:6667,53.208.61.106:6667,53.208.61.107:6667";//六安GA网 +// private String kafkaServers = "34.72.62.93:9092";//六安视频网 +// private String kafkaServers = "127.0.0.1:9092";//本地 +// private String kafkaServers = "53.207.8.71:9092,53.193.3.15:9092,53.160.0.237:9092,53.104.56.58:9092,53.128.22.61:9092";//省厅 马伟提供 + + private String groupId = "ruansiProducer"; + + + + + // Broker地址列表 + private final String bootstrapServers = "bootstrap.servers"; + + // 客户端ID + private final String clientId = "client.id"; + + // Key序列化类 + private final String keySerializer = "key.serializer"; + + // Value序列化类 + private final String valueSerializer = "value.serializer"; + + // 协议类型:当前支持配置为SASL_PLAINTEXT或者PLAINTEXT + private final String securityProtocol = "security.protocol"; + + // 服务名 + private final String saslKerberosServiceName = "sasl.kerberos.service.name"; + + // 域名 + private final String kerberosDomainName = "kerberos.domain.name"; + + //默认发送20条消息 + private final int messageNumToSend = 100; + + /** + * 用户自己申请的机机账号keytab文件名称 + */ + private static final String USER_KEYTAB_FILE = "user.keytab"; + + /** + * 用户自己申请的机机账号名称 + */ + private static final String USER_PRINCIPAL = "yhy_ahrs_rcw@A528C942_01A6_1BEF_7A75_0187DC82C40F.COM"; + + /** + * 新Producer 构造函数 + * @param + * @param + */ + + @Bean(name = "myKafkaProducer") + public KafkaProducer newProducer() { + Properties props = new Properties(); + + if (true) + { + try + { + logger.info("Securitymode start."); + //!!注意,安全认证时,需要用户手动修改为自己申请的机机账号 + LoginUtil.securityPrepare(USER_PRINCIPAL, USER_KEYTAB_FILE); + props.put(securityProtocol, "SASL_PLAINTEXT"); +// props.put("sasl.mechanism", "GSSAPI"); + // 服务名 + props.put(saslKerberosServiceName, "kafka"); + // 域名 + props.put(kerberosDomainName, "A528C942_01A6_1BEF_7A75_0187DC82C40F.COM"); + } + catch (IOException e) + { + logger.error("Security prepare failure."); + logger.error("The IOException occured.", e); + return null; + } + logger.info("Security prepare success."); + }else{ + props.put(securityProtocol, "PLAINTEXT"); + } + + + + // Broker地址列表 + props.put(bootstrapServers,kafkaServers); + // 客户端ID + props.put(clientId, "ruansiProducer"); + // Key序列化类 + props.put(keySerializer, + "org.apache.kafka.common.serialization.IntegerSerializer"); + // Value序列化类 + props.put(valueSerializer, + "org.apache.kafka.common.serialization.StringSerializer"); + //批量发送信息配置 + props.put("batch.size", 16384); + props.put("linger.ms", 1); + props.put("buffer.memory", 33554432); + // 协议类型:当前支持配置为SASL_PLAINTEXT或者PLAINTEXT + //props.put(securityProtocol, "SASL_PLAINTEXT"); +// // 服务名 +// props.put(saslKerberosServiceName, "kafka"); +// // 域名 +// props.put(kerberosDomainName, "hadoop.hadoop.com"); + //设置自定义的分区策略类,默认不传key,是粘性分区,尽量往一个分区中发消息。如果key不为null,则默认是按照key的hashcode与 partition的取余来决定哪个partition + //props.put("partitioner.class","com.kafka.myparitioner.CidPartitioner"); +// props.put(securityProtocol, "SASL_PLAINTEXT"); +// props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"zkxc\" password=\"zkxcKafka07252023\";"); +// props.put("sasl.mechanism", "SCRAM-SHA-256"); +// KafkaProducer producer = new KafkaProducer<>(props); + KafkaProducer producer = new KafkaProducer<>(props); + + return producer; + } + + + +} diff --git a/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/config/LoginUtil.java b/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/config/LoginUtil.java index 2f7ae3fe..2697dbff 100644 --- a/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/config/LoginUtil.java +++ b/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/config/LoginUtil.java @@ -4,17 +4,19 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; +import java.io.FileInputStream; import java.io.FileWriter; import java.io.IOException; +import java.util.Properties; -public class LoginUtil -{ +public class LoginUtil { + private static final Logger LOG = LoggerFactory.getLogger(LoginUtil.class); - static Logger logger = LoggerFactory.getLogger(LoginUtil.class); - - public enum Module - { - STORM("StormClient"), KAFKA("KafkaClient"), ZOOKEEPER("Client"); + /** + * no JavaDoc + */ + public enum Module { + KAFKA("KafkaClient"), ZOOKEEPER("Client"); private String name; @@ -77,8 +79,7 @@ public class LoginUtil * @throws IOException */ public static void setJaasFile(String principal, String keytabPath) - throws IOException - { + throws IOException { String jaasPath = new File(System.getProperty("java.io.tmpdir")) + File.separator + System.getProperty("user.name") + JAAS_POSTFIX; @@ -88,7 +89,6 @@ public class LoginUtil // 删除jaas文件 deleteJaasFile(jaasPath); writeJaasFile(jaasPath, principal, keytabPath); - logger.error("jaasPath--{}",jaasPath); System.setProperty(JAVA_SECURITY_LOGIN_CONF, jaasPath); } @@ -99,8 +99,7 @@ public class LoginUtil * @throws IOException */ public static void setZookeeperServerPrincipal(String zkServerPrincipal) - throws IOException - { + throws IOException { System.setProperty(ZOOKEEPER_AUTH_PRINCIPAL, zkServerPrincipal); String ret = System.getProperty(ZOOKEEPER_AUTH_PRINCIPAL); if (ret == null) @@ -120,8 +119,7 @@ public class LoginUtil * @throws IOException */ public static void setKrb5Config(String krb5ConfFile) - throws IOException - { + throws IOException { System.setProperty(JAVA_SECURITY_KRB5_CONF, krb5ConfFile); String ret = System.getProperty(JAVA_SECURITY_KRB5_CONF); if (ret == null) @@ -141,8 +139,7 @@ public class LoginUtil * 写文件异常 */ private static void writeJaasFile(String jaasPath, String principal, String keytabPath) - throws IOException - { + throws IOException { FileWriter writer = new FileWriter(new File(jaasPath)); try { @@ -160,8 +157,7 @@ public class LoginUtil } private static void deleteJaasFile(String jaasPath) - throws IOException - { + throws IOException { File jaasFile = new File(jaasPath); if (jaasFile.exists()) { @@ -172,8 +168,7 @@ public class LoginUtil } } - private static String getJaasConfContext(String principal, String keytabPath) - { + private static String getJaasConfContext(String principal, String keytabPath) { Module[] allModule = Module.values(); StringBuilder builder = new StringBuilder(); for (Module modlue : allModule) @@ -183,11 +178,9 @@ public class LoginUtil return builder.toString(); } - private static String getModuleContext(String userPrincipal, String keyTabPath, Module module) - { + private static String getModuleContext(String userPrincipal, String keyTabPath, Module module) { StringBuilder builder = new StringBuilder(); - if (IS_IBM_JDK) - { + if (IS_IBM_JDK) { builder.append(module.getName()).append(" {").append(LINE_SEPARATOR); builder.append(IBM_LOGIN_MODULE).append(LINE_SEPARATOR); builder.append("credsType=both").append(LINE_SEPARATOR); @@ -195,9 +188,7 @@ public class LoginUtil builder.append("useKeytab=\"" + keyTabPath + "\"").append(LINE_SEPARATOR); builder.append("debug=true;").append(LINE_SEPARATOR); builder.append("};").append(LINE_SEPARATOR); - } - else - { + } else { builder.append(module.getName()).append(" {").append(LINE_SEPARATOR); builder.append(SUN_LOGIN_MODULE).append(LINE_SEPARATOR); builder.append("useKeyTab=true").append(LINE_SEPARATOR); @@ -211,4 +202,58 @@ public class LoginUtil return builder.toString(); } + + public static void securityPrepare(String principal, String keyTabFile) throws IOException { +// String filePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator; + String filePath = "/rsoft/config/"; + String krbFile = filePath + "krb5.conf"; + String userKeyTableFile = filePath + keyTabFile; + + // windows路径下分隔符替换 + userKeyTableFile = userKeyTableFile.replace("\\", "\\\\"); + krbFile = krbFile.replace("\\", "\\\\"); + + LoginUtil.setKrb5Config(krbFile); + LoginUtil.setZookeeperServerPrincipal("zookeeper/hadoop.hadoop.com"); + LoginUtil.setJaasFile(principal, userKeyTableFile); + } + + /** + * Check security mode + * + * @return boolean + */ + public static Boolean isSecurityModel() { + Boolean isSecurity = false; + String krbFilePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator + "kafkaSecurityMode"; + + Properties securityProps = new Properties(); + + // file does not exist. + if (!isFileExists(krbFilePath)) { + return isSecurity; + } + + try { + securityProps.load(new FileInputStream(krbFilePath)); + + if ("yes".equalsIgnoreCase(securityProps.getProperty("kafka.client.security.mode"))) + { + isSecurity = true; + } + } catch (Exception e) { + LOG.info("The Exception occured : {}.", e); + } + + return isSecurity; + } + + /* + * 判断文件是否存在 + */ + private static boolean isFileExists(String fileName) { + File file = new File(fileName); + + return file.exists(); + } } diff --git a/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/KafkaSecurityUtil.java b/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/KafkaSecurityUtil.java deleted file mode 100644 index cf4a3238..00000000 --- a/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/KafkaSecurityUtil.java +++ /dev/null @@ -1,108 +0,0 @@ -package org.dromara.kafka.consumer.handler; - -import cn.hutool.core.date.DateTime; -import cn.hutool.core.date.DateUtil; -import org.dromara.kafka.consumer.entity.EsGpsInfo; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.util.Properties; - -/** - *

description:

- * - * @author chenle - * @date 2021-10-28 14:48 - */ -public class KafkaSecurityUtil { - - - - - static Logger logger = LoggerFactory.getLogger(KafkaSecurityUtil.class); - - public static void main(String[] args) { - EsGpsInfo esGpsInfo = new EsGpsInfo(); - String realtime = "2021/11/04 12:00:11"; - DateTime dateTime = DateUtil.parse(realtime); - esGpsInfo.setGpsTime(dateTime.toJdkDate()); - logger.info("esGpsInfo:{},deviceType={},gpsTime={}",esGpsInfo.toString(), - esGpsInfo.getDeviceType(),dateTime.toJdkDate().toString()); - } - /** - * 用户自己申请的机机账号keytab文件名称 - */ - private static final String USER_KEYTAB_FILE = "user.keytab"; - - /** - * 用户自己申请的机机账号名称 - */ - private static final String USER_PRINCIPAL = "aqdsj_ruansi@HADOOP.COM"; - - public static void securityPrepare() throws IOException - { - logger.error("进入了---securityPrepare"); - //String filePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator; - //String krbFile = filePath + "krb5.conf"; - //ClassPathResource classPathResource = new ClassPathResource("krb5.conf"); - //String krbFile = classPathResource.getAbsolutePath(); - String krbFile = "/gpsstore/krb5.conf"; -// String userKeyTableFile = filePath + USER_KEYTAB_FILE; - //ClassPathResource classPathResource1 = new ClassPathResource(USER_KEYTAB_FILE); - String userKeyTableFile = "/gpsstore/user.keytab"; - - //windows路径下分隔符替换 - userKeyTableFile = userKeyTableFile.replace("\\", "\\\\"); - krbFile = krbFile.replace("\\", "\\\\"); - - LoginUtil.setKrb5Config(krbFile); - LoginUtil.setZookeeperServerPrincipal("zookeeper/hadoop.hadoop.com"); - logger.error("userKeyTableFile路径---{}",userKeyTableFile); - LoginUtil.setJaasFile(USER_PRINCIPAL, userKeyTableFile); - } - - public static Boolean isSecurityModel() - { - Boolean isSecurity = false; - //String krbFilePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator + "kafkaSecurityMode"; - //ClassPathResource classPathResource = new ClassPathResource("kafkaSecurityMode"); - InputStream inputStream = Thread.currentThread().getContextClassLoader().getResourceAsStream("kafkaSecurityMode"); - - /*File file = classPathResource.getFile(); - - if(!file.exists()){ - return isSecurity; - }*/ - - Properties securityProps = new Properties(); - - - try - { - securityProps.load(inputStream); - if ("yes".equalsIgnoreCase(securityProps.getProperty("kafka.client.security.mode"))) - { - isSecurity = true; - } - } - catch (Exception e) - { - logger.info("The Exception occured : {}.", e); - } - - return isSecurity; - } - - /* - * 判断文件是否存在 - */ - private static boolean isFileExists(String fileName) - { - File file = new File(fileName); - - return file.exists(); - } -} diff --git a/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/LoginUtil.java b/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/LoginUtil.java deleted file mode 100644 index 8fa4b0bf..00000000 --- a/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/LoginUtil.java +++ /dev/null @@ -1,215 +0,0 @@ -package org.dromara.kafka.consumer.handler; - -import java.io.File; -import java.io.FileWriter; -import java.io.IOException; - -/** - *

description:

- * - * @author chenle - * @date 2021-10-28 15:40 - */ -public class LoginUtil -{ - - public enum Module - { - STORM("StormClient"), KAFKA("KafkaClient"), ZOOKEEPER("Client"); - - private String name; - - private Module(String name) - { - this.name = name; - } - - public String getName() - { - return name; - } - } - - /** - * line operator string - */ - private static final String LINE_SEPARATOR = System.getProperty("line.separator"); - - /** - * jaas file postfix - */ - private static final String JAAS_POSTFIX = ".jaas.conf"; - - /** - * is IBM jdk or not - */ - private static final boolean IS_IBM_JDK = System.getProperty("java.vendor").contains("IBM"); - - /** - * IBM jdk login module - */ - private static final String IBM_LOGIN_MODULE = "com.ibm.security.auth.module.Krb5LoginModule required"; - - /** - * oracle jdk login module - */ - private static final String SUN_LOGIN_MODULE = "com.sun.security.auth.module.Krb5LoginModule required"; - - /** - * Zookeeper quorum principal. - */ - public static final String ZOOKEEPER_AUTH_PRINCIPAL = "zookeeper.server.principal"; - - /** - * java security krb5 file path - */ - public static final String JAVA_SECURITY_KRB5_CONF = "java.security.krb5.conf"; - - /** - * java security login file path - */ - public static final String JAVA_SECURITY_LOGIN_CONF = "java.security.auth.login.config"; - - /** - * 设置jaas.conf文件 - * - * @param principal - * @param keytabPath - * @throws IOException - */ - public static void setJaasFile(String principal, String keytabPath) - throws IOException - { - String jaasPath = - new File(System.getProperty("java.io.tmpdir")) + File.separator + System.getProperty("user.name") - + JAAS_POSTFIX; - - // windows路径下分隔符替换 - jaasPath = jaasPath.replace("\\", "\\\\"); - // 删除jaas文件 - deleteJaasFile(jaasPath); - writeJaasFile(jaasPath, principal, keytabPath); - System.setProperty(JAVA_SECURITY_LOGIN_CONF, jaasPath); - } - - /** - * 设置zookeeper服务端principal - * - * @param zkServerPrincipal - * @throws IOException - */ - public static void setZookeeperServerPrincipal(String zkServerPrincipal) - throws IOException - { - System.setProperty(ZOOKEEPER_AUTH_PRINCIPAL, zkServerPrincipal); - String ret = System.getProperty(ZOOKEEPER_AUTH_PRINCIPAL); - if (ret == null) - { - throw new IOException(ZOOKEEPER_AUTH_PRINCIPAL + " is null."); - } - if (!ret.equals(zkServerPrincipal)) - { - throw new IOException(ZOOKEEPER_AUTH_PRINCIPAL + " is " + ret + " is not " + zkServerPrincipal + "."); - } - } - - /** - * 设置krb5文件 - * - * @param krb5ConfFile - * @throws IOException - */ - public static void setKrb5Config(String krb5ConfFile) - throws IOException - { - System.setProperty(JAVA_SECURITY_KRB5_CONF, krb5ConfFile); - String ret = System.getProperty(JAVA_SECURITY_KRB5_CONF); - if (ret == null) - { - throw new IOException(JAVA_SECURITY_KRB5_CONF + " is null."); - } - if (!ret.equals(krb5ConfFile)) - { - throw new IOException(JAVA_SECURITY_KRB5_CONF + " is " + ret + " is not " + krb5ConfFile + "."); - } - } - - /** - * 写入jaas文件 - * - * @throws IOException - * 写文件异常 - */ - private static void writeJaasFile(String jaasPath, String principal, String keytabPath) - throws IOException - { - FileWriter writer = new FileWriter(new File(jaasPath)); - try - { - writer.write(getJaasConfContext(principal, keytabPath)); - writer.flush(); - } - catch (IOException e) - { - throw new IOException("Failed to create jaas.conf File"); - } - finally - { - writer.close(); - } - } - - private static void deleteJaasFile(String jaasPath) - throws IOException - { - File jaasFile = new File(jaasPath); - if (jaasFile.exists()) - { - if (!jaasFile.delete()) - { - throw new IOException("Failed to delete exists jaas file."); - } - } - } - - private static String getJaasConfContext(String principal, String keytabPath) - { - Module[] allModule = Module.values(); - StringBuilder builder = new StringBuilder(); - for (Module modlue : allModule) - { - builder.append(getModuleContext(principal, keytabPath, modlue)); - } - return builder.toString(); - } - - private static String getModuleContext(String userPrincipal, String keyTabPath, Module module) - { - StringBuilder builder = new StringBuilder(); - if (IS_IBM_JDK) - { - builder.append(module.getName()).append(" {").append(LINE_SEPARATOR); - builder.append(IBM_LOGIN_MODULE).append(LINE_SEPARATOR); - builder.append("credsType=both").append(LINE_SEPARATOR); - builder.append("principal=\"" + userPrincipal + "\"").append(LINE_SEPARATOR); - builder.append("useKeytab=\"" + keyTabPath + "\"").append(LINE_SEPARATOR); - builder.append("debug=true;").append(LINE_SEPARATOR); - builder.append("};").append(LINE_SEPARATOR); - } - else - { - builder.append(module.getName()).append(" {").append(LINE_SEPARATOR); - builder.append(SUN_LOGIN_MODULE).append(LINE_SEPARATOR); - builder.append("useKeyTab=true").append(LINE_SEPARATOR); - builder.append("keyTab=\"" + keyTabPath + "\"").append(LINE_SEPARATOR); - builder.append("principal=\"" + userPrincipal + "\"").append(LINE_SEPARATOR); - builder.append("useTicketCache=false").append(LINE_SEPARATOR); - builder.append("storeKey=true").append(LINE_SEPARATOR); - builder.append("debug=true;").append(LINE_SEPARATOR); - builder.append("};").append(LINE_SEPARATOR); - } - - return builder.toString(); - } -} - diff --git a/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/RealConsumer.java b/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/RealConsumer.java index 29307c6f..c294cd08 100644 --- a/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/RealConsumer.java +++ b/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/RealConsumer.java @@ -4,6 +4,7 @@ package org.dromara.kafka.consumer.handler; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.dromara.kafka.consumer.config.KafkaPropertiesConfig; +import org.dromara.kafka.consumer.config.LoginUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -55,17 +56,17 @@ public class RealConsumer implements CommandLineRunner { groupId = "group_ruansi_xuancheng"; cityCode = "3418"; if(args.length > 0){ - /*kafkaServers = args[0]; + kafkaServers = args[0]; topics = args[1]; groupId = args[2]; - cityCode = args[3];*/ + cityCode = args[3]; } ExecutorService executorService = Executors.newSingleThreadExecutor(); Map kafkaProp = getKafkaProp(); - if (KafkaSecurityUtil.isSecurityModel()) + if (LoginUtil.isSecurityModel()) { try { @@ -78,7 +79,7 @@ public class RealConsumer implements CommandLineRunner { kafkaProp.put("sasl.kerberos.service.name","kafka"); //域名 kafkaProp.put("kerberos.domain.name","hadoop.hadoop.com"); - KafkaSecurityUtil.securityPrepare(); + LoginUtil.setJaasFile("",""); } catch (IOException e) { diff --git a/stwzhj-modules/stwzhj-data2es/pom.xml b/stwzhj-modules/stwzhj-data2es/pom.xml index 06133650..51890e0c 100644 --- a/stwzhj-modules/stwzhj-data2es/pom.xml +++ b/stwzhj-modules/stwzhj-data2es/pom.xml @@ -114,42 +114,66 @@ stwzhj-api-data2es - - - org.elasticsearch - elasticsearch - 7.14.0 - - - log4j-api - org.apache.logging.log4j - - - org.elasticsearch.client elasticsearch-rest-client - 7.14.0 + 7.10.2-h0.cbu.mrs.350.r11 - - - org.elasticsearch.client - elasticsearch-rest-client - 7.6.0-hw-ei-302002 - - org.elasticsearch.client elasticsearch-rest-high-level-client - 7.14.0 + 7.10.2-h0.cbu.mrs.350.r11 - org.elasticsearch - elasticsearch + org.elasticsearch.plugin + parent-join-client - elasticsearch-rest-client - org.elasticsearch.client + org.elasticsearch.plugin + aggs-matrix-stats-client + + + + + + org.elasticsearch + elasticsearch + 7.10.2-h0.cbu.mrs.350.r11 + + + + + org.apache.kafka + kafka_2.12 + 3.6.1-h0.cbu.mrs.350.r11 + + + org.apache.zookeeper + zookeeper + + + net.sf.jopt-simple + jopt-simple + + + com.huawei.mrs + manager-wc2frm + + + org.apache.kafka + kafka-clients + + + org.xerial.snappy + snappy-java + + + com.huawei.mrs + om-controller-api + + + com.101tec + zkclient @@ -157,16 +181,10 @@ org.apache.kafka kafka-clients - 2.4.0-hw-ei-302002 + 3.6.1-h0.cbu.mrs.350.r11 - - - org.springframework.kafka - spring-kafka - - diff --git a/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/config/EsConfig.java b/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/config/EsConfig.java index 586dec4a..26664d73 100644 --- a/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/config/EsConfig.java +++ b/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/config/EsConfig.java @@ -27,11 +27,11 @@ public class EsConfig { @Bean(destroyMethod = "close",name = "restHighLevelClient") public RestHighLevelClient restClient() { // String configPath = System.getProperty("user.dir") + File.separator+ "app_data2es_aq" + File.separator + "conf" + File.separator; - String configPath = "/rsoft/"; + String configPath = "/rsoft/config/"; // KAFKA("KafkaClient"), ZOOKEEPER("Client"); - GenerateEnumUtil.addEnum(LoginUtil.Module.class,"KAFKA","KafkaClient"); - GenerateEnumUtil.addEnum(LoginUtil.Module.class,"ZOOKEEPER","Client"); +// GenerateEnumUtil.addEnum(LoginUtil.Module.class,"KAFKA","KafkaClient"); +// GenerateEnumUtil.addEnum(LoginUtil.Module.class,"ZOOKEEPER","Client"); HwRestClient hwRestClient = new HwRestClient(configPath); RestHighLevelClient highLevelClient = new RestHighLevelClient(hwRestClient.getRestClientBuilder()); return highLevelClient; diff --git a/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/config/HwRestClient.java b/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/config/HwRestClient.java new file mode 100644 index 00000000..fb8f4209 --- /dev/null +++ b/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/config/HwRestClient.java @@ -0,0 +1,227 @@ +package org.dromara.data2es.config; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.http.Header; +import org.apache.http.HttpHost; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.message.BasicHeader; +import org.elasticsearch.client.Node; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestClientBuilder; +import org.springframework.core.io.ClassPathResource; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +/** + *

description:

+ * + * @author chenle + * @date 2021-11-01 14:25 + */ +public class HwRestClient { + private static final Log LOG = LogFactory.getLog(HwRestClient.class); + private static String isSecureMode; + private static String esServerHost; + private static int connectTimeout; + private static int socketTimeout; + private static int connectionRequestTimeout; + private static int maxConnTotal; + private static int maxConnPerRoute; + private static String principal; + private static final String UN_SECURITY_MODE = "false"; + private static final String COLON = ":"; + private static final String COMMA = ","; + private static HttpHost[] hostArray; + private String configPath; + private static boolean SNIFFER_ENABLE = false; + + private static final String username = "yhy_ahrs_rcw@HADOOP.COM"; + + public static final String password = "Ycgis!2509"; + + public HwRestClient(String configPath) { + this.configPath = configPath; + if (!this.getConfig()) { + LOG.error("Get config failed."); + } + + } + + public HwRestClient() { + /*this.configPath = System.getProperty("user.dir") + File.separator + "conf" + File.separator; + if (!this.getConfig()) { + LOG.error("Get config failed."); + }*/ + if (!this.getConfig()) { + LOG.error("Get config failed."); + } + this.configPath = "/rsoft/config/"; + } + + private boolean getConfig() { + Properties properties = new Properties(); + ClassPathResource classPathResource = new ClassPathResource("/esParams.properties"); + + try { + properties.load(classPathResource.getInputStream()); + } catch (IOException var5) { + LOG.error("Failed to load properties file : " + var5.getMessage()); + return false; + } + + try { + esServerHost = properties.getProperty("esServerHost"); + connectTimeout = Integer.valueOf(properties.getProperty("connectTimeout")); + socketTimeout = Integer.valueOf(properties.getProperty("socketTimeout")); + connectionRequestTimeout = Integer.valueOf(properties.getProperty("connectionRequestTimeout")); + maxConnPerRoute = Integer.valueOf(properties.getProperty("maxConnPerRoute")); + maxConnTotal = Integer.valueOf(properties.getProperty("maxConnTotal")); + isSecureMode = properties.getProperty("isSecureMode"); + principal = properties.getProperty("principal"); + SNIFFER_ENABLE = Boolean.valueOf(properties.getProperty("snifferEnable")); + LOG.info("esServerHost:" + esServerHost); + LOG.info("connectTimeout:" + connectTimeout); + LOG.info("socketTimeout:" + socketTimeout); + LOG.info("connectionRequestTimeout:" + connectionRequestTimeout); + LOG.info("maxConnPerRouteTotal:" + maxConnPerRoute); + LOG.info("maxConnTotal:" + maxConnTotal); + LOG.info("isSecureMode:" + isSecureMode); + LOG.info("principal:" + principal); + return true; + } catch (NumberFormatException var4) { + LOG.error("Failed to get parameters !", var4); + return false; + } + } + + public boolean isSnifferEnable() { + return SNIFFER_ENABLE; + } + + private HttpHost[] getHostArray() { + String schema; + if ("false".equals(isSecureMode)) { + schema = "http"; + } else { + schema = "https"; + } + + List hosts = new ArrayList(); + String[] hostArray1 = esServerHost.split(","); + String[] var4 = hostArray1; + int var5 = hostArray1.length; + + for (int var6 = 0; var6 < var5; ++var6) { + String host = var4[var6]; + String[] ipPort = host.split(":"); + HttpHost hostNew = new HttpHost(ipPort[0], Integer.valueOf(ipPort[1]), schema); + hosts.add(hostNew); + } + + return (HttpHost[]) hosts.toArray(new HttpHost[0]); + } + + private void setSecConfig() { + try { + LOG.info("Config path is " + this.configPath); + LoginUtil.setJaasFile(principal, this.configPath + "user.keytab"); + LoginUtil.setKrb5Config(this.configPath + "krb5.conf"); + System.setProperty("elasticsearch.kerberos.jaas.appname", "EsClient"); + System.setProperty("es.security.indication", "true"); + LOG.info("es.security.indication is " + System.getProperty("es.security.indication")); + } catch (Exception var2) { + LOG.error("Failed to set security conf", var2); + } + + } + + public RestClientBuilder getRestClientBuilder() { + hostArray = getHostArray(); + if ("false".equals(isSecureMode)) { + System.setProperty("es.security.indication", "false"); + } else { + setSecConfig(); + } + + RestClientBuilder builder = RestClient.builder(hostArray); + Header[] defaultHeaders = new Header[]{new BasicHeader("Accept", "application/json"), new BasicHeader("Content-type", "application/json")}; + builder.setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() { + @Override + public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder requestConfigBuilder) { + + return requestConfigBuilder.setConnectTimeout(HwRestClient.connectTimeout). + setSocketTimeout(HwRestClient.socketTimeout). + setConnectionRequestTimeout(HwRestClient.connectionRequestTimeout); + } + }); + builder.setDefaultHeaders(defaultHeaders); + + /* es https */ + /*try { + final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials(AuthScope.ANY, + new UsernamePasswordCredentials(username, password)); + + SSLContext sslContext = new SSLContextBuilder().loadTrustMaterial(null, new TrustStrategy() { + // 信任所有 + @Override + public boolean isTrusted(X509Certificate[] chain, String authType) throws CertificateException { + return true; + } + }).build(); + SSLIOSessionStrategy sessionStrategy = new SSLIOSessionStrategy(sslContext, NoopHostnameVerifier.INSTANCE); + builder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() { + @Override + public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) { + httpClientBuilder.disableAuthCaching(); + //httpClientBuilder.setSSLStrategy(sessionStrategy); + httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider); + return httpClientBuilder; + } + }); + + }catch (Exception e){ + e.printStackTrace(); + }*/ + /* es https */ + return builder; + } + + public RestClient getRestClient() { + if (this.ifConfigWasWrong()) { + return null; + } else { + RestClientBuilder restClientBuilder = this.getRestClientBuilder(); + RestClient restClient = restClientBuilder.build(); + this.setNodes(restClient); + LOG.info("The Low Level Rest Client has been created."); + return restClient; + } + } + + private boolean ifConfigWasWrong() { + if (this.configPath != null && this.configPath.length() != 0) { + return false; + } else { + LOG.info("Config path is not allowed to be empty."); + return true; + } + } + + private void setNodes(RestClient restClient) { + List nodes = new ArrayList(); + HttpHost[] var3 = hostArray; + int var4 = var3.length; + + for (int var5 = 0; var5 < var4; ++var5) { + HttpHost httpHost = var3[var5]; + nodes.add(new Node(httpHost)); + } + + restClient.setNodes(nodes); + } +} diff --git a/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/config/KafkaConfig.java b/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/config/KafkaConfig.java index 162d631c..5f529e51 100644 --- a/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/config/KafkaConfig.java +++ b/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/config/KafkaConfig.java @@ -7,7 +7,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.context.annotation.Bean; -import org.springframework.kafka.core.KafkaAdmin; import org.springframework.stereotype.Component; import java.io.IOException; @@ -19,12 +18,12 @@ import java.util.Properties; * @author chenle * @date 2021-11-03 14:15 */ -//@Component +@Component public class KafkaConfig { private Logger logger = LoggerFactory.getLogger(KafkaConfig.class); - private String kafkaServers = "140.168.2.31:21007,140.168.2.32:21007,140.168.2.33:21007"; //省厅 kafka + private String kafkaServers = "53.1.212.25:21007,53.1.212.26:21007,53.1.212.27:21007"; //省厅 kafka // private String kafkaServers = "53.208.61.105:6667,53.208.61.106:6667,53.208.61.107:6667";//六安GA网 // private String kafkaServers = "34.72.62.93:9092";//六安视频网 // private String kafkaServers = "127.0.0.1:9092";//本地 @@ -63,12 +62,12 @@ public class KafkaConfig { /** * 用户自己申请的机机账号keytab文件名称 */ - private static final String USER_KEYTAB_FILE = "请修改为真实keytab文件名"; + private static final String USER_KEYTAB_FILE = "user.keytab"; /** * 用户自己申请的机机账号名称 */ - private static final String USER_PRINCIPAL = "请修改为真实用户名称"; + private static final String USER_PRINCIPAL = "yhy_ahrs_rcw@A528C942_01A6_1BEF_7A75_0187DC82C40F.COM"; /** * 新Producer 构造函数 @@ -80,18 +79,19 @@ public class KafkaConfig { public KafkaProducer newProducer() { Properties props = new Properties(); - if (KafkaSecurityUtil.isSecurityModel()) + if (true) { try { logger.info("Securitymode start."); //!!注意,安全认证时,需要用户手动修改为自己申请的机机账号 - KafkaSecurityUtil.securityPrepare(); + LoginUtil.securityPrepare(USER_PRINCIPAL, USER_KEYTAB_FILE); props.put(securityProtocol, "SASL_PLAINTEXT"); +// props.put("sasl.mechanism", "GSSAPI"); // 服务名 props.put(saslKerberosServiceName, "kafka"); // 域名 - props.put(kerberosDomainName, "hadoop.hadoop.com"); + props.put(kerberosDomainName, "A528C942_01A6_1BEF_7A75_0187DC82C40F.COM"); } catch (IOException e) { @@ -109,7 +109,7 @@ public class KafkaConfig { // Broker地址列表 props.put(bootstrapServers,kafkaServers); // 客户端ID -// props.put(clientId, "ruansiProducer"); + props.put(clientId, "ruansiProducer"); // Key序列化类 props.put(keySerializer, "org.apache.kafka.common.serialization.IntegerSerializer"); @@ -137,12 +137,6 @@ public class KafkaConfig { return producer; } - @Bean - public KafkaAdmin admin(KafkaProperties properties){ - KafkaAdmin admin = new KafkaAdmin(properties.buildAdminProperties()); - admin.setFatalIfBrokerNotAvailable(true); - return admin; - } } diff --git a/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/config/KafkaSecurityUtil.java b/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/config/KafkaSecurityUtil.java index 48730ce7..acbc5468 100644 --- a/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/config/KafkaSecurityUtil.java +++ b/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/config/KafkaSecurityUtil.java @@ -25,7 +25,7 @@ public class KafkaSecurityUtil { /** * 用户自己申请的机机账号名称 */ - private static final String USER_PRINCIPAL = "yhy_ahrs_rcw@HADOOP.COM"; + private static final String USER_PRINCIPAL = "yhy_ahrs_rcw@A528C942_01A6_1BEF_7A75_0187DC82C40F.COM"; public static void securityPrepare() throws IOException { @@ -34,17 +34,17 @@ public class KafkaSecurityUtil { //String krbFile = filePath + "krb5.conf"; //ClassPathResource classPathResource = new ClassPathResource("krb5.conf"); //String krbFile = classPathResource.getAbsolutePath(); - String krbFile = "/rsoft/krb5.conf"; + String krbFile = "/rsoft/config/krb5.conf"; // String userKeyTableFile = filePath + USER_KEYTAB_FILE; //ClassPathResource classPathResource1 = new ClassPathResource(USER_KEYTAB_FILE); - String userKeyTableFile = "/rsoft/user.keytab"; + String userKeyTableFile = "/rsoft/config/user.keytab"; //windows路径下分隔符替换 userKeyTableFile = userKeyTableFile.replace("\\", "\\\\"); krbFile = krbFile.replace("\\", "\\\\"); LoginUtil.setKrb5Config(krbFile); - LoginUtil.setZookeeperServerPrincipal("zookeeper/hadoop.hadoop.com"); + LoginUtil.setZookeeperServerPrincipal("zookeeper/A528C942_01A6_1BEF_7A75_0187DC82C40F.COM"); //logger.error("userKeyTableFile路径---{}",userKeyTableFile); LoginUtil.setJaasFile(USER_PRINCIPAL, userKeyTableFile); } diff --git a/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/config/LoginUtil.java b/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/config/LoginUtil.java index 2b5f2aac..9f451f53 100644 --- a/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/config/LoginUtil.java +++ b/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/config/LoginUtil.java @@ -2,144 +2,98 @@ package org.dromara.data2es.config; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.File; +import java.io.FileInputStream; import java.io.FileWriter; import java.io.IOException; +import java.util.Properties; -/** - *

description:

- * - * @author chenle - * @date 2021-11-01 14:30 - */ public class LoginUtil { - private static final Log LOG = LogFactory.getLog(LoginUtil.class); + private static final Logger LOG = LoggerFactory.getLogger(LoginUtil.class); + + /** + * no JavaDoc + */ + public enum Module { + KAFKA("KafkaClient"), ZOOKEEPER("Client"); + + private String name; + + private Module(String name) + { + this.name = name; + } + + public String getName() + { + return name; + } + } + + /** + * line operator string + */ private static final String LINE_SEPARATOR = System.getProperty("line.separator"); - private static final String ES = "es."; + + /** + * jaas file postfix + */ private static final String JAAS_POSTFIX = ".jaas.conf"; - private static final String IBM_LOGIN_MODULE = "com.ibm.security.auth.module.Krb5LoginModule required"; - private static final String SUN_LOGIN_MODULE = "com.sun.security.auth.module.Krb5LoginModule required"; - private static final String JAVA_SECURITY_LOGIN_CONF_KEY = "java.security.auth.login.config"; - private static final String JAVA_SECURITY_KRB5_CONF_KEY = "java.security.krb5.conf"; + + /** + * is IBM jdk or not + */ private static final boolean IS_IBM_JDK = System.getProperty("java.vendor").contains("IBM"); - private static final String ZOOKEEPER_AUTH_PRINCIPAL = "zookeeper.server.principal"; - private static boolean WriteFlag = false; - public LoginUtil() { - } + /** + * IBM jdk login module + */ + private static final String IBM_LOGIN_MODULE = "com.ibm.security.auth.module.Krb5LoginModule required"; - static void setKrb5Config(String krb5ConfFile) throws IOException { - System.setProperty("java.security.krb5.conf", krb5ConfFile); - String ret = System.getProperty("java.security.krb5.conf"); - if (ret == null) { - LOG.error("java.security.krb5.conf is null."); - throw new IOException("java.security.krb5.conf is null."); - } else if (!ret.equals(krb5ConfFile)) { - LOG.error("java.security.krb5.conf is " + ret + " is not " + krb5ConfFile + "."); - throw new IOException("java.security.krb5.conf is " + ret + " is not " + krb5ConfFile + "."); - } - } + /** + * oracle jdk login module + */ + private static final String SUN_LOGIN_MODULE = "com.sun.security.auth.module.Krb5LoginModule required"; + /** + * Zookeeper quorum principal. + */ + public static final String ZOOKEEPER_AUTH_PRINCIPAL = "zookeeper.server.principal"; + /** + * java security krb5 file path + */ + public static final String JAVA_SECURITY_KRB5_CONF = "java.security.krb5.conf"; - static synchronized void setJaasFile(String principal, String keytabPath) throws IOException { - //String filePath = keytabPath.substring(0, keytabPath.lastIndexOf(File.separator)); -// String jaasPath = filePath + File.separator + "es." + System.getProperty("user.name") + ".jaas.conf"; -// jaasPath = jaasPath.replace("\\", "\\\\"); + /** + * java security login file path + */ + public static final String JAVA_SECURITY_LOGIN_CONF = "java.security.auth.login.config"; + /** + * 设置jaas.conf文件 + * + * @param principal + * @param keytabPath + * @throws IOException + */ + public static void setJaasFile(String principal, String keytabPath) + throws IOException { String jaasPath = - new File(System.getProperty("java.io.tmpdir")) + File.separator + System.getProperty("user.name") - + JAAS_POSTFIX; - - keytabPath = keytabPath.replace("\\", "\\\\"); - LOG.info("jaasPath is {} " + jaasPath); - LOG.info("keytabPath is " + keytabPath); - if ((new File(jaasPath)).exists()) { - if (!WriteFlag) { - deleteJaasFile(jaasPath); - writeJaasFile(jaasPath, principal, keytabPath); - System.setProperty("java.security.auth.login.config", jaasPath); - WriteFlag = true; - } - } else { - writeJaasFile(jaasPath, principal, keytabPath); - System.setProperty("java.security.auth.login.config", jaasPath); - WriteFlag = true; - } + new File(System.getProperty("java.io.tmpdir")) + File.separator + System.getProperty("user.name") + + JAAS_POSTFIX; + // windows路径下分隔符替换 + jaasPath = jaasPath.replace("\\", "\\\\"); + // 删除jaas文件 + deleteJaasFile(jaasPath); + writeJaasFile(jaasPath, principal, keytabPath); + System.setProperty(JAVA_SECURITY_LOGIN_CONF, jaasPath); } - private static void writeJaasFile(String jaasPath, String principal, String keytabPath) throws IOException { - try { - FileWriter writer = new FileWriter(new File(jaasPath)); - - try { - writer.write(getJaasConfContext(principal, keytabPath)); - writer.flush(); - } catch (Throwable var7) { - try { - writer.close(); - } catch (Throwable var6) { - var7.addSuppressed(var6); - } - - throw var7; - } - - writer.close(); - } catch (IOException var8) { - throw new IOException("Failed to create jaas.conf File"); - } - } - - private static void deleteJaasFile(String jaasPath) throws IOException { - File jaasFile = new File(jaasPath); - if (jaasFile.exists() && !jaasFile.delete()) { - throw new IOException("Failed to delete exists jaas file."); - } - } - - private static String getJaasConfContext(String principal, String keytabPath) { - Module[] allModule = Module.values(); - StringBuilder builder = new StringBuilder(); - Module[] var4 = allModule; - int var5 = allModule.length; - - for(int var6 = 0; var6 < var5; ++var6) { - Module modlue = var4[var6]; - builder.append(getModuleContext(principal, keytabPath, modlue)); - } - - return builder.toString(); - } - - private static String getModuleContext(String userPrincipal, String keyTabPath, Module module) { - StringBuilder builder = new StringBuilder(); - if (IS_IBM_JDK) { - builder.append(module.getName()).append(" {").append(LINE_SEPARATOR); - builder.append("com.ibm.security.auth.module.Krb5LoginModule required").append(LINE_SEPARATOR); - builder.append("credsType=both").append(LINE_SEPARATOR); - builder.append("principal=\"").append(userPrincipal).append("\"").append(LINE_SEPARATOR); - builder.append("useKeytab=\"").append(keyTabPath).append("\"").append(LINE_SEPARATOR); - builder.append("debug=true;").append(LINE_SEPARATOR); - builder.append("};").append(LINE_SEPARATOR); - } else { - builder.append(module.getName()).append(" {").append(LINE_SEPARATOR); - builder.append("com.sun.security.auth.module.Krb5LoginModule required").append(LINE_SEPARATOR); - builder.append("useKeyTab=true").append(LINE_SEPARATOR); - builder.append("keyTab=\"").append(keyTabPath).append("\"").append(LINE_SEPARATOR); - builder.append("principal=\"").append(userPrincipal).append("\"").append(LINE_SEPARATOR); - builder.append("useTicketCache=false").append(LINE_SEPARATOR); - builder.append("storeKey=true").append(LINE_SEPARATOR); - builder.append("debug=true;").append(LINE_SEPARATOR); - builder.append("};").append(LINE_SEPARATOR); - } - - return builder.toString(); - } - - /** * 设置zookeeper服务端principal * @@ -147,8 +101,7 @@ public class LoginUtil { * @throws IOException */ public static void setZookeeperServerPrincipal(String zkServerPrincipal) - throws IOException - { + throws IOException { System.setProperty(ZOOKEEPER_AUTH_PRINCIPAL, zkServerPrincipal); String ret = System.getProperty(ZOOKEEPER_AUTH_PRINCIPAL); if (ret == null) @@ -161,17 +114,148 @@ public class LoginUtil { } } - public static enum Module { - Elasticsearch("EsClient"); - - private String name; - - private Module(String name) { - this.name = name; + /** + * 设置krb5文件 + * + * @param krb5ConfFile + * @throws IOException + */ + public static void setKrb5Config(String krb5ConfFile) + throws IOException { + System.setProperty(JAVA_SECURITY_KRB5_CONF, krb5ConfFile); + String ret = System.getProperty(JAVA_SECURITY_KRB5_CONF); + if (ret == null) + { + throw new IOException(JAVA_SECURITY_KRB5_CONF + " is null."); } - - public String getName() { - return this.name; + if (!ret.equals(krb5ConfFile)) + { + throw new IOException(JAVA_SECURITY_KRB5_CONF + " is " + ret + " is not " + krb5ConfFile + "."); } } + + /** + * 写入jaas文件 + * + * @throws IOException + * 写文件异常 + */ + private static void writeJaasFile(String jaasPath, String principal, String keytabPath) + throws IOException { + FileWriter writer = new FileWriter(new File(jaasPath)); + try + { + writer.write(getJaasConfContext(principal, keytabPath)); + writer.flush(); + } + catch (IOException e) + { + throw new IOException("Failed to create jaas.conf File"); + } + finally + { + writer.close(); + } + } + + private static void deleteJaasFile(String jaasPath) + throws IOException { + File jaasFile = new File(jaasPath); + if (jaasFile.exists()) + { + if (!jaasFile.delete()) + { + throw new IOException("Failed to delete exists jaas file."); + } + } + } + + private static String getJaasConfContext(String principal, String keytabPath) { + Module[] allModule = Module.values(); + StringBuilder builder = new StringBuilder(); + for (Module modlue : allModule) + { + builder.append(getModuleContext(principal, keytabPath, modlue)); + } + return builder.toString(); + } + + private static String getModuleContext(String userPrincipal, String keyTabPath, Module module) { + StringBuilder builder = new StringBuilder(); + if (IS_IBM_JDK) { + builder.append(module.getName()).append(" {").append(LINE_SEPARATOR); + builder.append(IBM_LOGIN_MODULE).append(LINE_SEPARATOR); + builder.append("credsType=both").append(LINE_SEPARATOR); + builder.append("principal=\"" + userPrincipal + "\"").append(LINE_SEPARATOR); + builder.append("useKeytab=\"" + keyTabPath + "\"").append(LINE_SEPARATOR); + builder.append("debug=true;").append(LINE_SEPARATOR); + builder.append("};").append(LINE_SEPARATOR); + } else { + builder.append(module.getName()).append(" {").append(LINE_SEPARATOR); + builder.append(SUN_LOGIN_MODULE).append(LINE_SEPARATOR); + builder.append("useKeyTab=true").append(LINE_SEPARATOR); + builder.append("keyTab=\"" + keyTabPath + "\"").append(LINE_SEPARATOR); + builder.append("principal=\"" + userPrincipal + "\"").append(LINE_SEPARATOR); + builder.append("useTicketCache=false").append(LINE_SEPARATOR); + builder.append("storeKey=true").append(LINE_SEPARATOR); + builder.append("debug=true;").append(LINE_SEPARATOR); + builder.append("};").append(LINE_SEPARATOR); + } + + return builder.toString(); + } + + public static void securityPrepare(String principal, String keyTabFile) throws IOException { +// String filePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator; + String filePath = "/rsoft/config/"; + String krbFile = filePath + "krb5.conf"; + String userKeyTableFile = filePath + keyTabFile; + + // windows路径下分隔符替换 + userKeyTableFile = userKeyTableFile.replace("\\", "\\\\"); + krbFile = krbFile.replace("\\", "\\\\"); + + LoginUtil.setKrb5Config(krbFile); + LoginUtil.setZookeeperServerPrincipal("zookeeper/hadoop.hadoop.com"); + LoginUtil.setJaasFile(principal, userKeyTableFile); + } + + /** + * Check security mode + * + * @return boolean + */ + public static Boolean isSecurityModel() { + Boolean isSecurity = false; + String krbFilePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator + "kafkaSecurityMode"; + + Properties securityProps = new Properties(); + + // file does not exist. + if (!isFileExists(krbFilePath)) { + return isSecurity; + } + + try { + securityProps.load(new FileInputStream(krbFilePath)); + + if ("yes".equalsIgnoreCase(securityProps.getProperty("kafka.client.security.mode"))) + { + isSecurity = true; + } + } catch (Exception e) { + LOG.info("The Exception occured : {}.", e); + } + + return isSecurity; + } + + /* + * 判断文件是否存在 + */ + private static boolean isFileExists(String fileName) { + File file = new File(fileName); + + return file.exists(); + } } diff --git a/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/handler/RequestHandler.java b/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/handler/RequestHandler.java index 438d8507..438bc8ab 100644 --- a/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/handler/RequestHandler.java +++ b/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/handler/RequestHandler.java @@ -13,6 +13,7 @@ import org.apache.commons.lang.StringUtils; import org.dromara.common.redis.utils.RedisUtils; import org.dromara.data2es.domain.EsGpsInfo; import org.dromara.data2es.domain.EsGpsInfoVO2; +import org.dromara.data2es.producer.NewProducer; import org.dromara.data2es.service.IGpsService; import org.dromara.data2es.util.ConfigConstants; import org.elasticsearch.action.bulk.BulkRequest; @@ -23,7 +24,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration; -import org.springframework.kafka.core.KafkaTemplate; import org.springframework.scheduling.annotation.Async; import java.io.IOException; @@ -34,7 +34,7 @@ import java.util.concurrent.CompletableFuture; public class RequestHandler { @Autowired - private KafkaTemplate kafkaTemplate; + private NewProducer producer; @Autowired private RestHighLevelClient restHighLevelClient; @@ -71,11 +71,11 @@ public class RequestHandler { //kafkaProducer.send(esGpsInfo, ConfigConstants.KAFKA_TOPIC_SEND_PRE+"."+ infoSource); //todo 2023年3月30日 cpu过载暂时隐藏 - kafkaTemplate.send(ConfigConstants.KAFKA_TOPIC_SEND_PRE+"."+deviceType, JSON.toJSONString(esGpsInfoVO2)); + producer.send(ConfigConstants.KAFKA_TOPIC_SEND_PRE+"."+deviceType, JSON.toJSONString(esGpsInfoVO2)); //kafkaProducer.send(esGpsInfoVO2, ConfigConstants.KAFKA_TOPIC_SEND_PRE+"."+deviceType); //地市的kafka数据,如接收地市某个设备的数据可以对接此kafka topic //todo 暂时隐藏 - kafkaTemplate.send(ConfigConstants.KAFKA_TOPIC_SEND_PRE+"."+infoSource+"."+deviceType,JSON.toJSONString(esGpsInfoVO2)); + producer.send(ConfigConstants.KAFKA_TOPIC_SEND_PRE+"."+infoSource+"."+deviceType,JSON.toJSONString(esGpsInfoVO2)); } } diff --git a/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/producer/NewProducer.java b/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/producer/NewProducer.java index 462df090..74d996f0 100644 --- a/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/producer/NewProducer.java +++ b/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/producer/NewProducer.java @@ -16,7 +16,7 @@ import javax.annotation.Resource; * @author chenle * @date 2021-11-01 17:20 */ -//@Component +@Component public class NewProducer { @Autowired diff --git a/stwzhj-modules/stwzhj-data2es/src/main/resources/application.yml b/stwzhj-modules/stwzhj-data2es/src/main/resources/application.yml index ea12e957..141dcfbe 100644 --- a/stwzhj-modules/stwzhj-data2es/src/main/resources/application.yml +++ b/stwzhj-modules/stwzhj-data2es/src/main/resources/application.yml @@ -10,7 +10,8 @@ spring: profiles: # 环境配置 active: @profiles.active@ - + autoconfigure: + exclude: org.springframework.boot.autoconfigure.elasticsearch.ElasticsearchRestClientAutoConfiguration --- # nacos 配置 spring: cloud: diff --git a/stwzhj-modules/stwzhj-data2es/src/main/resources/kafkaSecurityMode b/stwzhj-modules/stwzhj-data2es/src/main/resources/kafkaSecurityMode new file mode 100644 index 00000000..ed59a5e4 --- /dev/null +++ b/stwzhj-modules/stwzhj-data2es/src/main/resources/kafkaSecurityMode @@ -0,0 +1 @@ +kafka.client.security.mode = yes diff --git a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/schedule/DeviceRedisSchedule.java b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/schedule/DeviceRedisSchedule.java index 222f08aa..5ad6118b 100644 --- a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/schedule/DeviceRedisSchedule.java +++ b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/schedule/DeviceRedisSchedule.java @@ -39,7 +39,7 @@ public class DeviceRedisSchedule { /* * 把Redis中 online_user数据存入t_device_redis表中 * */ -// @Scheduled(cron = "0/30 * * * * ?") + @Scheduled(cron = "0/30 * * * * ?") public void handleDeviceRedis(){ List jlist = RedisUtils.searchAndGetKeysValues("online_users:*"); redisService.insertBatch(BeanUtil.copyToList(jlist, DeviceRedis.class));