diff --git a/pom.xml b/pom.xml
index db274477..bf7c4a07 100644
--- a/pom.xml
+++ b/pom.xml
@@ -89,12 +89,12 @@
description:
+ * + * @author chenle + * @date 2021-11-03 14:15 + */ +@Component +public class KafkaConfig { + + private Logger logger = LoggerFactory.getLogger(KafkaConfig.class); + + private String kafkaServers = "53.1.212.25:21007,53.1.212.26:21007,53.1.212.27:21007"; //省厅 kafka +// private String kafkaServers = "53.208.61.105:6667,53.208.61.106:6667,53.208.61.107:6667";//六安GA网 +// private String kafkaServers = "34.72.62.93:9092";//六安视频网 +// private String kafkaServers = "127.0.0.1:9092";//本地 +// private String kafkaServers = "53.207.8.71:9092,53.193.3.15:9092,53.160.0.237:9092,53.104.56.58:9092,53.128.22.61:9092";//省厅 马伟提供 + + private String groupId = "ruansiProducer"; + + + + + // Broker地址列表 + private final String bootstrapServers = "bootstrap.servers"; + + // 客户端ID + private final String clientId = "client.id"; + + // Key序列化类 + private final String keySerializer = "key.serializer"; + + // Value序列化类 + private final String valueSerializer = "value.serializer"; + + // 协议类型:当前支持配置为SASL_PLAINTEXT或者PLAINTEXT + private final String securityProtocol = "security.protocol"; + + // 服务名 + private final String saslKerberosServiceName = "sasl.kerberos.service.name"; + + // 域名 + private final String kerberosDomainName = "kerberos.domain.name"; + + //默认发送20条消息 + private final int messageNumToSend = 100; + + /** + * 用户自己申请的机机账号keytab文件名称 + */ + private static final String USER_KEYTAB_FILE = "user.keytab"; + + /** + * 用户自己申请的机机账号名称 + */ + private static final String USER_PRINCIPAL = "yhy_ahrs_rcw@A528C942_01A6_1BEF_7A75_0187DC82C40F.COM"; + + /** + * 新Producer 构造函数 + * @param + * @param + */ + + @Bean(name = "myKafkaProducer") + public KafkaProducer newProducer() { + Properties props = new Properties(); + + if (true) + { + try + { + logger.info("Securitymode start."); + //!!注意,安全认证时,需要用户手动修改为自己申请的机机账号 + LoginUtil.securityPrepare(USER_PRINCIPAL, USER_KEYTAB_FILE); + props.put(securityProtocol, "SASL_PLAINTEXT"); +// props.put("sasl.mechanism", "GSSAPI"); + // 服务名 + props.put(saslKerberosServiceName, "kafka"); + // 域名 + props.put(kerberosDomainName, "A528C942_01A6_1BEF_7A75_0187DC82C40F.COM"); + } + catch (IOException e) + { + logger.error("Security prepare failure."); + logger.error("The IOException occured.", e); + return null; + } + logger.info("Security prepare success."); + }else{ + props.put(securityProtocol, "PLAINTEXT"); + } + + + + // Broker地址列表 + props.put(bootstrapServers,kafkaServers); + // 客户端ID + props.put(clientId, "ruansiProducer"); + // Key序列化类 + props.put(keySerializer, + "org.apache.kafka.common.serialization.IntegerSerializer"); + // Value序列化类 + props.put(valueSerializer, + "org.apache.kafka.common.serialization.StringSerializer"); + //批量发送信息配置 + props.put("batch.size", 16384); + props.put("linger.ms", 1); + props.put("buffer.memory", 33554432); + // 协议类型:当前支持配置为SASL_PLAINTEXT或者PLAINTEXT + //props.put(securityProtocol, "SASL_PLAINTEXT"); +// // 服务名 +// props.put(saslKerberosServiceName, "kafka"); +// // 域名 +// props.put(kerberosDomainName, "hadoop.hadoop.com"); + //设置自定义的分区策略类,默认不传key,是粘性分区,尽量往一个分区中发消息。如果key不为null,则默认是按照key的hashcode与 partition的取余来决定哪个partition + //props.put("partitioner.class","com.kafka.myparitioner.CidPartitioner"); +// props.put(securityProtocol, "SASL_PLAINTEXT"); +// props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"zkxc\" password=\"zkxcKafka07252023\";"); +// props.put("sasl.mechanism", "SCRAM-SHA-256"); +// KafkaProducerdescription:
- * - * @author chenle - * @date 2021-10-28 14:48 - */ -public class KafkaSecurityUtil { - - - - - static Logger logger = LoggerFactory.getLogger(KafkaSecurityUtil.class); - - public static void main(String[] args) { - EsGpsInfo esGpsInfo = new EsGpsInfo(); - String realtime = "2021/11/04 12:00:11"; - DateTime dateTime = DateUtil.parse(realtime); - esGpsInfo.setGpsTime(dateTime.toJdkDate()); - logger.info("esGpsInfo:{},deviceType={},gpsTime={}",esGpsInfo.toString(), - esGpsInfo.getDeviceType(),dateTime.toJdkDate().toString()); - } - /** - * 用户自己申请的机机账号keytab文件名称 - */ - private static final String USER_KEYTAB_FILE = "user.keytab"; - - /** - * 用户自己申请的机机账号名称 - */ - private static final String USER_PRINCIPAL = "aqdsj_ruansi@HADOOP.COM"; - - public static void securityPrepare() throws IOException - { - logger.error("进入了---securityPrepare"); - //String filePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator; - //String krbFile = filePath + "krb5.conf"; - //ClassPathResource classPathResource = new ClassPathResource("krb5.conf"); - //String krbFile = classPathResource.getAbsolutePath(); - String krbFile = "/gpsstore/krb5.conf"; -// String userKeyTableFile = filePath + USER_KEYTAB_FILE; - //ClassPathResource classPathResource1 = new ClassPathResource(USER_KEYTAB_FILE); - String userKeyTableFile = "/gpsstore/user.keytab"; - - //windows路径下分隔符替换 - userKeyTableFile = userKeyTableFile.replace("\\", "\\\\"); - krbFile = krbFile.replace("\\", "\\\\"); - - LoginUtil.setKrb5Config(krbFile); - LoginUtil.setZookeeperServerPrincipal("zookeeper/hadoop.hadoop.com"); - logger.error("userKeyTableFile路径---{}",userKeyTableFile); - LoginUtil.setJaasFile(USER_PRINCIPAL, userKeyTableFile); - } - - public static Boolean isSecurityModel() - { - Boolean isSecurity = false; - //String krbFilePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator + "kafkaSecurityMode"; - //ClassPathResource classPathResource = new ClassPathResource("kafkaSecurityMode"); - InputStream inputStream = Thread.currentThread().getContextClassLoader().getResourceAsStream("kafkaSecurityMode"); - - /*File file = classPathResource.getFile(); - - if(!file.exists()){ - return isSecurity; - }*/ - - Properties securityProps = new Properties(); - - - try - { - securityProps.load(inputStream); - if ("yes".equalsIgnoreCase(securityProps.getProperty("kafka.client.security.mode"))) - { - isSecurity = true; - } - } - catch (Exception e) - { - logger.info("The Exception occured : {}.", e); - } - - return isSecurity; - } - - /* - * 判断文件是否存在 - */ - private static boolean isFileExists(String fileName) - { - File file = new File(fileName); - - return file.exists(); - } -} diff --git a/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/LoginUtil.java b/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/LoginUtil.java deleted file mode 100644 index 8fa4b0bf..00000000 --- a/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/LoginUtil.java +++ /dev/null @@ -1,215 +0,0 @@ -package org.dromara.kafka.consumer.handler; - -import java.io.File; -import java.io.FileWriter; -import java.io.IOException; - -/** - *description:
- * - * @author chenle - * @date 2021-10-28 15:40 - */ -public class LoginUtil -{ - - public enum Module - { - STORM("StormClient"), KAFKA("KafkaClient"), ZOOKEEPER("Client"); - - private String name; - - private Module(String name) - { - this.name = name; - } - - public String getName() - { - return name; - } - } - - /** - * line operator string - */ - private static final String LINE_SEPARATOR = System.getProperty("line.separator"); - - /** - * jaas file postfix - */ - private static final String JAAS_POSTFIX = ".jaas.conf"; - - /** - * is IBM jdk or not - */ - private static final boolean IS_IBM_JDK = System.getProperty("java.vendor").contains("IBM"); - - /** - * IBM jdk login module - */ - private static final String IBM_LOGIN_MODULE = "com.ibm.security.auth.module.Krb5LoginModule required"; - - /** - * oracle jdk login module - */ - private static final String SUN_LOGIN_MODULE = "com.sun.security.auth.module.Krb5LoginModule required"; - - /** - * Zookeeper quorum principal. - */ - public static final String ZOOKEEPER_AUTH_PRINCIPAL = "zookeeper.server.principal"; - - /** - * java security krb5 file path - */ - public static final String JAVA_SECURITY_KRB5_CONF = "java.security.krb5.conf"; - - /** - * java security login file path - */ - public static final String JAVA_SECURITY_LOGIN_CONF = "java.security.auth.login.config"; - - /** - * 设置jaas.conf文件 - * - * @param principal - * @param keytabPath - * @throws IOException - */ - public static void setJaasFile(String principal, String keytabPath) - throws IOException - { - String jaasPath = - new File(System.getProperty("java.io.tmpdir")) + File.separator + System.getProperty("user.name") - + JAAS_POSTFIX; - - // windows路径下分隔符替换 - jaasPath = jaasPath.replace("\\", "\\\\"); - // 删除jaas文件 - deleteJaasFile(jaasPath); - writeJaasFile(jaasPath, principal, keytabPath); - System.setProperty(JAVA_SECURITY_LOGIN_CONF, jaasPath); - } - - /** - * 设置zookeeper服务端principal - * - * @param zkServerPrincipal - * @throws IOException - */ - public static void setZookeeperServerPrincipal(String zkServerPrincipal) - throws IOException - { - System.setProperty(ZOOKEEPER_AUTH_PRINCIPAL, zkServerPrincipal); - String ret = System.getProperty(ZOOKEEPER_AUTH_PRINCIPAL); - if (ret == null) - { - throw new IOException(ZOOKEEPER_AUTH_PRINCIPAL + " is null."); - } - if (!ret.equals(zkServerPrincipal)) - { - throw new IOException(ZOOKEEPER_AUTH_PRINCIPAL + " is " + ret + " is not " + zkServerPrincipal + "."); - } - } - - /** - * 设置krb5文件 - * - * @param krb5ConfFile - * @throws IOException - */ - public static void setKrb5Config(String krb5ConfFile) - throws IOException - { - System.setProperty(JAVA_SECURITY_KRB5_CONF, krb5ConfFile); - String ret = System.getProperty(JAVA_SECURITY_KRB5_CONF); - if (ret == null) - { - throw new IOException(JAVA_SECURITY_KRB5_CONF + " is null."); - } - if (!ret.equals(krb5ConfFile)) - { - throw new IOException(JAVA_SECURITY_KRB5_CONF + " is " + ret + " is not " + krb5ConfFile + "."); - } - } - - /** - * 写入jaas文件 - * - * @throws IOException - * 写文件异常 - */ - private static void writeJaasFile(String jaasPath, String principal, String keytabPath) - throws IOException - { - FileWriter writer = new FileWriter(new File(jaasPath)); - try - { - writer.write(getJaasConfContext(principal, keytabPath)); - writer.flush(); - } - catch (IOException e) - { - throw new IOException("Failed to create jaas.conf File"); - } - finally - { - writer.close(); - } - } - - private static void deleteJaasFile(String jaasPath) - throws IOException - { - File jaasFile = new File(jaasPath); - if (jaasFile.exists()) - { - if (!jaasFile.delete()) - { - throw new IOException("Failed to delete exists jaas file."); - } - } - } - - private static String getJaasConfContext(String principal, String keytabPath) - { - Module[] allModule = Module.values(); - StringBuilder builder = new StringBuilder(); - for (Module modlue : allModule) - { - builder.append(getModuleContext(principal, keytabPath, modlue)); - } - return builder.toString(); - } - - private static String getModuleContext(String userPrincipal, String keyTabPath, Module module) - { - StringBuilder builder = new StringBuilder(); - if (IS_IBM_JDK) - { - builder.append(module.getName()).append(" {").append(LINE_SEPARATOR); - builder.append(IBM_LOGIN_MODULE).append(LINE_SEPARATOR); - builder.append("credsType=both").append(LINE_SEPARATOR); - builder.append("principal=\"" + userPrincipal + "\"").append(LINE_SEPARATOR); - builder.append("useKeytab=\"" + keyTabPath + "\"").append(LINE_SEPARATOR); - builder.append("debug=true;").append(LINE_SEPARATOR); - builder.append("};").append(LINE_SEPARATOR); - } - else - { - builder.append(module.getName()).append(" {").append(LINE_SEPARATOR); - builder.append(SUN_LOGIN_MODULE).append(LINE_SEPARATOR); - builder.append("useKeyTab=true").append(LINE_SEPARATOR); - builder.append("keyTab=\"" + keyTabPath + "\"").append(LINE_SEPARATOR); - builder.append("principal=\"" + userPrincipal + "\"").append(LINE_SEPARATOR); - builder.append("useTicketCache=false").append(LINE_SEPARATOR); - builder.append("storeKey=true").append(LINE_SEPARATOR); - builder.append("debug=true;").append(LINE_SEPARATOR); - builder.append("};").append(LINE_SEPARATOR); - } - - return builder.toString(); - } -} - diff --git a/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/RealConsumer.java b/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/RealConsumer.java index 29307c6f..c294cd08 100644 --- a/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/RealConsumer.java +++ b/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/RealConsumer.java @@ -4,6 +4,7 @@ package org.dromara.kafka.consumer.handler; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.dromara.kafka.consumer.config.KafkaPropertiesConfig; +import org.dromara.kafka.consumer.config.LoginUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -55,17 +56,17 @@ public class RealConsumer implements CommandLineRunner { groupId = "group_ruansi_xuancheng"; cityCode = "3418"; if(args.length > 0){ - /*kafkaServers = args[0]; + kafkaServers = args[0]; topics = args[1]; groupId = args[2]; - cityCode = args[3];*/ + cityCode = args[3]; } ExecutorService executorService = Executors.newSingleThreadExecutor(); Map kafkaProp = getKafkaProp(); - if (KafkaSecurityUtil.isSecurityModel()) + if (LoginUtil.isSecurityModel()) { try { @@ -78,7 +79,7 @@ public class RealConsumer implements CommandLineRunner { kafkaProp.put("sasl.kerberos.service.name","kafka"); //域名 kafkaProp.put("kerberos.domain.name","hadoop.hadoop.com"); - KafkaSecurityUtil.securityPrepare(); + LoginUtil.setJaasFile("",""); } catch (IOException e) { diff --git a/stwzhj-modules/stwzhj-data2es/pom.xml b/stwzhj-modules/stwzhj-data2es/pom.xml index 06133650..51890e0c 100644 --- a/stwzhj-modules/stwzhj-data2es/pom.xml +++ b/stwzhj-modules/stwzhj-data2es/pom.xml @@ -114,42 +114,66 @@description:
+ * + * @author chenle + * @date 2021-11-01 14:25 + */ +public class HwRestClient { + private static final Log LOG = LogFactory.getLog(HwRestClient.class); + private static String isSecureMode; + private static String esServerHost; + private static int connectTimeout; + private static int socketTimeout; + private static int connectionRequestTimeout; + private static int maxConnTotal; + private static int maxConnPerRoute; + private static String principal; + private static final String UN_SECURITY_MODE = "false"; + private static final String COLON = ":"; + private static final String COMMA = ","; + private static HttpHost[] hostArray; + private String configPath; + private static boolean SNIFFER_ENABLE = false; + + private static final String username = "yhy_ahrs_rcw@HADOOP.COM"; + + public static final String password = "Ycgis!2509"; + + public HwRestClient(String configPath) { + this.configPath = configPath; + if (!this.getConfig()) { + LOG.error("Get config failed."); + } + + } + + public HwRestClient() { + /*this.configPath = System.getProperty("user.dir") + File.separator + "conf" + File.separator; + if (!this.getConfig()) { + LOG.error("Get config failed."); + }*/ + if (!this.getConfig()) { + LOG.error("Get config failed."); + } + this.configPath = "/rsoft/config/"; + } + + private boolean getConfig() { + Properties properties = new Properties(); + ClassPathResource classPathResource = new ClassPathResource("/esParams.properties"); + + try { + properties.load(classPathResource.getInputStream()); + } catch (IOException var5) { + LOG.error("Failed to load properties file : " + var5.getMessage()); + return false; + } + + try { + esServerHost = properties.getProperty("esServerHost"); + connectTimeout = Integer.valueOf(properties.getProperty("connectTimeout")); + socketTimeout = Integer.valueOf(properties.getProperty("socketTimeout")); + connectionRequestTimeout = Integer.valueOf(properties.getProperty("connectionRequestTimeout")); + maxConnPerRoute = Integer.valueOf(properties.getProperty("maxConnPerRoute")); + maxConnTotal = Integer.valueOf(properties.getProperty("maxConnTotal")); + isSecureMode = properties.getProperty("isSecureMode"); + principal = properties.getProperty("principal"); + SNIFFER_ENABLE = Boolean.valueOf(properties.getProperty("snifferEnable")); + LOG.info("esServerHost:" + esServerHost); + LOG.info("connectTimeout:" + connectTimeout); + LOG.info("socketTimeout:" + socketTimeout); + LOG.info("connectionRequestTimeout:" + connectionRequestTimeout); + LOG.info("maxConnPerRouteTotal:" + maxConnPerRoute); + LOG.info("maxConnTotal:" + maxConnTotal); + LOG.info("isSecureMode:" + isSecureMode); + LOG.info("principal:" + principal); + return true; + } catch (NumberFormatException var4) { + LOG.error("Failed to get parameters !", var4); + return false; + } + } + + public boolean isSnifferEnable() { + return SNIFFER_ENABLE; + } + + private HttpHost[] getHostArray() { + String schema; + if ("false".equals(isSecureMode)) { + schema = "http"; + } else { + schema = "https"; + } + + Listdescription:
- * - * @author chenle - * @date 2021-11-01 14:30 - */ public class LoginUtil { - private static final Log LOG = LogFactory.getLog(LoginUtil.class); + private static final Logger LOG = LoggerFactory.getLogger(LoginUtil.class); + + /** + * no JavaDoc + */ + public enum Module { + KAFKA("KafkaClient"), ZOOKEEPER("Client"); + + private String name; + + private Module(String name) + { + this.name = name; + } + + public String getName() + { + return name; + } + } + + /** + * line operator string + */ private static final String LINE_SEPARATOR = System.getProperty("line.separator"); - private static final String ES = "es."; + + /** + * jaas file postfix + */ private static final String JAAS_POSTFIX = ".jaas.conf"; - private static final String IBM_LOGIN_MODULE = "com.ibm.security.auth.module.Krb5LoginModule required"; - private static final String SUN_LOGIN_MODULE = "com.sun.security.auth.module.Krb5LoginModule required"; - private static final String JAVA_SECURITY_LOGIN_CONF_KEY = "java.security.auth.login.config"; - private static final String JAVA_SECURITY_KRB5_CONF_KEY = "java.security.krb5.conf"; + + /** + * is IBM jdk or not + */ private static final boolean IS_IBM_JDK = System.getProperty("java.vendor").contains("IBM"); - private static final String ZOOKEEPER_AUTH_PRINCIPAL = "zookeeper.server.principal"; - private static boolean WriteFlag = false; - public LoginUtil() { - } + /** + * IBM jdk login module + */ + private static final String IBM_LOGIN_MODULE = "com.ibm.security.auth.module.Krb5LoginModule required"; - static void setKrb5Config(String krb5ConfFile) throws IOException { - System.setProperty("java.security.krb5.conf", krb5ConfFile); - String ret = System.getProperty("java.security.krb5.conf"); - if (ret == null) { - LOG.error("java.security.krb5.conf is null."); - throw new IOException("java.security.krb5.conf is null."); - } else if (!ret.equals(krb5ConfFile)) { - LOG.error("java.security.krb5.conf is " + ret + " is not " + krb5ConfFile + "."); - throw new IOException("java.security.krb5.conf is " + ret + " is not " + krb5ConfFile + "."); - } - } + /** + * oracle jdk login module + */ + private static final String SUN_LOGIN_MODULE = "com.sun.security.auth.module.Krb5LoginModule required"; + /** + * Zookeeper quorum principal. + */ + public static final String ZOOKEEPER_AUTH_PRINCIPAL = "zookeeper.server.principal"; + /** + * java security krb5 file path + */ + public static final String JAVA_SECURITY_KRB5_CONF = "java.security.krb5.conf"; - static synchronized void setJaasFile(String principal, String keytabPath) throws IOException { - //String filePath = keytabPath.substring(0, keytabPath.lastIndexOf(File.separator)); -// String jaasPath = filePath + File.separator + "es." + System.getProperty("user.name") + ".jaas.conf"; -// jaasPath = jaasPath.replace("\\", "\\\\"); + /** + * java security login file path + */ + public static final String JAVA_SECURITY_LOGIN_CONF = "java.security.auth.login.config"; + /** + * 设置jaas.conf文件 + * + * @param principal + * @param keytabPath + * @throws IOException + */ + public static void setJaasFile(String principal, String keytabPath) + throws IOException { String jaasPath = - new File(System.getProperty("java.io.tmpdir")) + File.separator + System.getProperty("user.name") - + JAAS_POSTFIX; - - keytabPath = keytabPath.replace("\\", "\\\\"); - LOG.info("jaasPath is {} " + jaasPath); - LOG.info("keytabPath is " + keytabPath); - if ((new File(jaasPath)).exists()) { - if (!WriteFlag) { - deleteJaasFile(jaasPath); - writeJaasFile(jaasPath, principal, keytabPath); - System.setProperty("java.security.auth.login.config", jaasPath); - WriteFlag = true; - } - } else { - writeJaasFile(jaasPath, principal, keytabPath); - System.setProperty("java.security.auth.login.config", jaasPath); - WriteFlag = true; - } + new File(System.getProperty("java.io.tmpdir")) + File.separator + System.getProperty("user.name") + + JAAS_POSTFIX; + // windows路径下分隔符替换 + jaasPath = jaasPath.replace("\\", "\\\\"); + // 删除jaas文件 + deleteJaasFile(jaasPath); + writeJaasFile(jaasPath, principal, keytabPath); + System.setProperty(JAVA_SECURITY_LOGIN_CONF, jaasPath); } - private static void writeJaasFile(String jaasPath, String principal, String keytabPath) throws IOException { - try { - FileWriter writer = new FileWriter(new File(jaasPath)); - - try { - writer.write(getJaasConfContext(principal, keytabPath)); - writer.flush(); - } catch (Throwable var7) { - try { - writer.close(); - } catch (Throwable var6) { - var7.addSuppressed(var6); - } - - throw var7; - } - - writer.close(); - } catch (IOException var8) { - throw new IOException("Failed to create jaas.conf File"); - } - } - - private static void deleteJaasFile(String jaasPath) throws IOException { - File jaasFile = new File(jaasPath); - if (jaasFile.exists() && !jaasFile.delete()) { - throw new IOException("Failed to delete exists jaas file."); - } - } - - private static String getJaasConfContext(String principal, String keytabPath) { - Module[] allModule = Module.values(); - StringBuilder builder = new StringBuilder(); - Module[] var4 = allModule; - int var5 = allModule.length; - - for(int var6 = 0; var6 < var5; ++var6) { - Module modlue = var4[var6]; - builder.append(getModuleContext(principal, keytabPath, modlue)); - } - - return builder.toString(); - } - - private static String getModuleContext(String userPrincipal, String keyTabPath, Module module) { - StringBuilder builder = new StringBuilder(); - if (IS_IBM_JDK) { - builder.append(module.getName()).append(" {").append(LINE_SEPARATOR); - builder.append("com.ibm.security.auth.module.Krb5LoginModule required").append(LINE_SEPARATOR); - builder.append("credsType=both").append(LINE_SEPARATOR); - builder.append("principal=\"").append(userPrincipal).append("\"").append(LINE_SEPARATOR); - builder.append("useKeytab=\"").append(keyTabPath).append("\"").append(LINE_SEPARATOR); - builder.append("debug=true;").append(LINE_SEPARATOR); - builder.append("};").append(LINE_SEPARATOR); - } else { - builder.append(module.getName()).append(" {").append(LINE_SEPARATOR); - builder.append("com.sun.security.auth.module.Krb5LoginModule required").append(LINE_SEPARATOR); - builder.append("useKeyTab=true").append(LINE_SEPARATOR); - builder.append("keyTab=\"").append(keyTabPath).append("\"").append(LINE_SEPARATOR); - builder.append("principal=\"").append(userPrincipal).append("\"").append(LINE_SEPARATOR); - builder.append("useTicketCache=false").append(LINE_SEPARATOR); - builder.append("storeKey=true").append(LINE_SEPARATOR); - builder.append("debug=true;").append(LINE_SEPARATOR); - builder.append("};").append(LINE_SEPARATOR); - } - - return builder.toString(); - } - - /** * 设置zookeeper服务端principal * @@ -147,8 +101,7 @@ public class LoginUtil { * @throws IOException */ public static void setZookeeperServerPrincipal(String zkServerPrincipal) - throws IOException - { + throws IOException { System.setProperty(ZOOKEEPER_AUTH_PRINCIPAL, zkServerPrincipal); String ret = System.getProperty(ZOOKEEPER_AUTH_PRINCIPAL); if (ret == null) @@ -161,17 +114,148 @@ public class LoginUtil { } } - public static enum Module { - Elasticsearch("EsClient"); - - private String name; - - private Module(String name) { - this.name = name; + /** + * 设置krb5文件 + * + * @param krb5ConfFile + * @throws IOException + */ + public static void setKrb5Config(String krb5ConfFile) + throws IOException { + System.setProperty(JAVA_SECURITY_KRB5_CONF, krb5ConfFile); + String ret = System.getProperty(JAVA_SECURITY_KRB5_CONF); + if (ret == null) + { + throw new IOException(JAVA_SECURITY_KRB5_CONF + " is null."); } - - public String getName() { - return this.name; + if (!ret.equals(krb5ConfFile)) + { + throw new IOException(JAVA_SECURITY_KRB5_CONF + " is " + ret + " is not " + krb5ConfFile + "."); } } + + /** + * 写入jaas文件 + * + * @throws IOException + * 写文件异常 + */ + private static void writeJaasFile(String jaasPath, String principal, String keytabPath) + throws IOException { + FileWriter writer = new FileWriter(new File(jaasPath)); + try + { + writer.write(getJaasConfContext(principal, keytabPath)); + writer.flush(); + } + catch (IOException e) + { + throw new IOException("Failed to create jaas.conf File"); + } + finally + { + writer.close(); + } + } + + private static void deleteJaasFile(String jaasPath) + throws IOException { + File jaasFile = new File(jaasPath); + if (jaasFile.exists()) + { + if (!jaasFile.delete()) + { + throw new IOException("Failed to delete exists jaas file."); + } + } + } + + private static String getJaasConfContext(String principal, String keytabPath) { + Module[] allModule = Module.values(); + StringBuilder builder = new StringBuilder(); + for (Module modlue : allModule) + { + builder.append(getModuleContext(principal, keytabPath, modlue)); + } + return builder.toString(); + } + + private static String getModuleContext(String userPrincipal, String keyTabPath, Module module) { + StringBuilder builder = new StringBuilder(); + if (IS_IBM_JDK) { + builder.append(module.getName()).append(" {").append(LINE_SEPARATOR); + builder.append(IBM_LOGIN_MODULE).append(LINE_SEPARATOR); + builder.append("credsType=both").append(LINE_SEPARATOR); + builder.append("principal=\"" + userPrincipal + "\"").append(LINE_SEPARATOR); + builder.append("useKeytab=\"" + keyTabPath + "\"").append(LINE_SEPARATOR); + builder.append("debug=true;").append(LINE_SEPARATOR); + builder.append("};").append(LINE_SEPARATOR); + } else { + builder.append(module.getName()).append(" {").append(LINE_SEPARATOR); + builder.append(SUN_LOGIN_MODULE).append(LINE_SEPARATOR); + builder.append("useKeyTab=true").append(LINE_SEPARATOR); + builder.append("keyTab=\"" + keyTabPath + "\"").append(LINE_SEPARATOR); + builder.append("principal=\"" + userPrincipal + "\"").append(LINE_SEPARATOR); + builder.append("useTicketCache=false").append(LINE_SEPARATOR); + builder.append("storeKey=true").append(LINE_SEPARATOR); + builder.append("debug=true;").append(LINE_SEPARATOR); + builder.append("};").append(LINE_SEPARATOR); + } + + return builder.toString(); + } + + public static void securityPrepare(String principal, String keyTabFile) throws IOException { +// String filePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator; + String filePath = "/rsoft/config/"; + String krbFile = filePath + "krb5.conf"; + String userKeyTableFile = filePath + keyTabFile; + + // windows路径下分隔符替换 + userKeyTableFile = userKeyTableFile.replace("\\", "\\\\"); + krbFile = krbFile.replace("\\", "\\\\"); + + LoginUtil.setKrb5Config(krbFile); + LoginUtil.setZookeeperServerPrincipal("zookeeper/hadoop.hadoop.com"); + LoginUtil.setJaasFile(principal, userKeyTableFile); + } + + /** + * Check security mode + * + * @return boolean + */ + public static Boolean isSecurityModel() { + Boolean isSecurity = false; + String krbFilePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator + "kafkaSecurityMode"; + + Properties securityProps = new Properties(); + + // file does not exist. + if (!isFileExists(krbFilePath)) { + return isSecurity; + } + + try { + securityProps.load(new FileInputStream(krbFilePath)); + + if ("yes".equalsIgnoreCase(securityProps.getProperty("kafka.client.security.mode"))) + { + isSecurity = true; + } + } catch (Exception e) { + LOG.info("The Exception occured : {}.", e); + } + + return isSecurity; + } + + /* + * 判断文件是否存在 + */ + private static boolean isFileExists(String fileName) { + File file = new File(fileName); + + return file.exists(); + } } diff --git a/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/handler/RequestHandler.java b/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/handler/RequestHandler.java index 438d8507..438bc8ab 100644 --- a/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/handler/RequestHandler.java +++ b/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/handler/RequestHandler.java @@ -13,6 +13,7 @@ import org.apache.commons.lang.StringUtils; import org.dromara.common.redis.utils.RedisUtils; import org.dromara.data2es.domain.EsGpsInfo; import org.dromara.data2es.domain.EsGpsInfoVO2; +import org.dromara.data2es.producer.NewProducer; import org.dromara.data2es.service.IGpsService; import org.dromara.data2es.util.ConfigConstants; import org.elasticsearch.action.bulk.BulkRequest; @@ -23,7 +24,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration; -import org.springframework.kafka.core.KafkaTemplate; import org.springframework.scheduling.annotation.Async; import java.io.IOException; @@ -34,7 +34,7 @@ import java.util.concurrent.CompletableFuture; public class RequestHandler { @Autowired - private KafkaTemplate