diff --git a/stwzhj-modules/stwzhj-consumer/pom.xml b/stwzhj-modules/stwzhj-consumer/pom.xml
index 7b704482..749c6a15 100644
--- a/stwzhj-modules/stwzhj-consumer/pom.xml
+++ b/stwzhj-modules/stwzhj-consumer/pom.xml
@@ -96,11 +96,6 @@
             <artifactId>stwzhj-api-data2es</artifactId>
         </dependency>
-        <dependency>
-            <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka-clients</artifactId>
-            <version>2.4.0-hw-ei-302002</version>
-        </dependency>
         <dependency>
             <groupId>com.ruansee.app</groupId>
@@ -137,16 +132,48 @@
-        <dependency>
-            <groupId>org.springframework.kafka</groupId>
-            <artifactId>spring-kafka</artifactId>
-        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka_2.12</artifactId>
+            <version>3.6.1-h0.cbu.mrs.350.r11</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.zookeeper</groupId>
+                    <artifactId>zookeeper</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>net.sf.jopt-simple</groupId>
+                    <artifactId>jopt-simple</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.huawei.mrs</groupId>
+                    <artifactId>manager-wc2frm</artifactId>
+                </exclusion>
+                <exclusion>
                     <groupId>org.apache.kafka</groupId>
                     <artifactId>kafka-clients</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.xerial.snappy</groupId>
+                    <artifactId>snappy-java</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.huawei.mrs</groupId>
+                    <artifactId>om-controller-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.101tec</groupId>
+                    <artifactId>zkclient</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>3.6.1-h0.cbu.mrs.350.r11</version>
+        </dependency>
diff --git a/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/KafkaConsumerApplication.java b/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/KafkaConsumerApplication.java
index 0fe37b81..6f96a401 100644
--- a/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/KafkaConsumerApplication.java
+++ b/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/KafkaConsumerApplication.java
@@ -4,7 +4,6 @@ import com.ruansee.redis.JedisConfig;
import com.ruansee.redis.RedisConfig;
import com.ruansee.redis.RedisUtil;
import com.ruansee.redis.RedissionLockUtil;
-import org.dromara.kafka.consumer.config.KafkaPropertiesConfig;
import org.redisson.spring.starter.RedissonAutoConfiguration;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@@ -25,7 +24,6 @@ import org.springframework.scheduling.annotation.EnableAsync;
*/
@SpringBootApplication
@EnableAsync
-@EnableConfigurationProperties({KafkaPropertiesConfig.class})
@ServletComponentScan
public class KafkaConsumerApplication {
public static void main(String[] args){
diff --git a/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/config/KafkaConfig.java b/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/config/KafkaConfig.java
deleted file mode 100644
index b74d31de..00000000
--- a/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/config/KafkaConfig.java
+++ /dev/null
@@ -1,138 +0,0 @@
-package org.dromara.kafka.consumer.config;
-
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.context.annotation.Bean;
-import org.springframework.stereotype.Component;
-
-import java.io.IOException;
-import java.util.Properties;
-
-/**
- * description:
- *
- * @author chenle
- * @date 2021-11-03 14:15
- */
-@Component
-public class KafkaConfig {
-
- private Logger logger = LoggerFactory.getLogger(KafkaConfig.class);
-
- private String kafkaServers = "53.1.212.25:21007,53.1.212.26:21007,53.1.212.27:21007"; //省厅 kafka
-// private String kafkaServers = "53.208.61.105:6667,53.208.61.106:6667,53.208.61.107:6667";//六安GA网
-// private String kafkaServers = "34.72.62.93:9092";//六安视频网
-// private String kafkaServers = "127.0.0.1:9092";//本地
-// private String kafkaServers = "53.207.8.71:9092,53.193.3.15:9092,53.160.0.237:9092,53.104.56.58:9092,53.128.22.61:9092";//省厅 马伟提供
-
- private String groupId = "ruansiProducer";
-
-
-
-
-    // Broker address list
-    private final String bootstrapServers = "bootstrap.servers";
-
-    // Client ID
-    private final String clientId = "client.id";
-
-    // Key serializer class
-    private final String keySerializer = "key.serializer";
-
-    // Value serializer class
-    private final String valueSerializer = "value.serializer";
-
-    // Protocol type: currently SASL_PLAINTEXT or PLAINTEXT
-    private final String securityProtocol = "security.protocol";
-
-    // Service name
-    private final String saslKerberosServiceName = "sasl.kerberos.service.name";
-
-    // Domain name
-    private final String kerberosDomainName = "kerberos.domain.name";
-
-    // Number of messages to send by default
-    private final int messageNumToSend = 100;
-
- /**
-     * Keytab file name of the machine-machine account requested by the user
- */
- private static final String USER_KEYTAB_FILE = "user.keytab";
-
- /**
-     * Name of the machine-machine account requested by the user
- */
- private static final String USER_PRINCIPAL = "yhy_ahrs_rcw@A528C942_01A6_1BEF_7A75_0187DC82C40F.COM";
-
- /**
-     * Constructor for the new Producer
- */
-
- @Bean(name = "myKafkaProducer")
- public KafkaProducer newProducer() {
- Properties props = new Properties();
-
- if (true)
- {
- try
- {
- logger.info("Securitymode start.");
-                //!! NOTE: for security authentication, manually replace this with the machine-machine account you applied for
- LoginUtil.securityPrepare(USER_PRINCIPAL, USER_KEYTAB_FILE);
- props.put(securityProtocol, "SASL_PLAINTEXT");
-// props.put("sasl.mechanism", "GSSAPI");
-                // Service name
-                props.put(saslKerberosServiceName, "kafka");
-                // Domain name
-                props.put(kerberosDomainName, "A528C942_01A6_1BEF_7A75_0187DC82C40F.COM");
- }
- catch (IOException e)
- {
- logger.error("Security prepare failure.");
- logger.error("The IOException occured.", e);
- return null;
- }
- logger.info("Security prepare success.");
- }else{
- props.put(securityProtocol, "PLAINTEXT");
- }
-
-
-
-        // Broker address list
-        props.put(bootstrapServers,kafkaServers);
-        // Client ID
-        props.put(clientId, "ruansiProducer");
-        // Key serializer class
-        props.put(keySerializer,
-            "org.apache.kafka.common.serialization.IntegerSerializer");
-        // Value serializer class
-        props.put(valueSerializer,
-            "org.apache.kafka.common.serialization.StringSerializer");
-        // Batch-send configuration
-        props.put("batch.size", 16384);
-        props.put("linger.ms", 1);
-        props.put("buffer.memory", 33554432);
-        // Protocol type: currently SASL_PLAINTEXT or PLAINTEXT
-        //props.put(securityProtocol, "SASL_PLAINTEXT");
-//        // Service name
-//        props.put(saslKerberosServiceName, "kafka");
-//        // Domain name
-//        props.put(kerberosDomainName, "hadoop.hadoop.com");
-        // Custom partitioner class. By default, with a null key the sticky partitioner keeps sending to one partition; with a non-null key the partition is chosen by the key's hashcode modulo the partition count.
-        //props.put("partitioner.class","com.kafka.myparitioner.CidPartitioner");
-// props.put(securityProtocol, "SASL_PLAINTEXT");
-// props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"zkxc\" password=\"zkxcKafka07252023\";");
-// props.put("sasl.mechanism", "SCRAM-SHA-256");
-// KafkaProducer producer = new KafkaProducer<>(props);
- KafkaProducer producer = new KafkaProducer<>(props);
-
- return producer;
- }
-
-
-
-}
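Note: the partitioner comment in the deleted producer above only references a custom partitioner.class (com.kafka.myparitioner.CidPartitioner) that is not part of this repository. For reference, a minimal sketch of such a partitioner; the class name and hashing choice here are illustrative, not the actual CidPartitioner:

package org.dromara.kafka.consumer.config;

import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

public class ExamplePartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        // Same behaviour the deleted comment describes for non-null keys:
        // hash of the serialized key modulo the partition count.
        return keyBytes == null ? 0
                : Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    @Override
    public void close() { }

    @Override
    public void configure(Map<String, ?> configs) { }
}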
diff --git a/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/config/KafkaProperties.java b/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/config/KafkaProperties.java
index 45606d28..5c4bdd0c 100644
--- a/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/config/KafkaProperties.java
+++ b/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/config/KafkaProperties.java
@@ -13,9 +13,10 @@ public final class KafkaProperties
private static final Logger LOG = LoggerFactory.getLogger(KafkaProperties.class);
    // Topic name; in security mode an administrator must grant the current user access to it
- public final static String TOPIC = "t_gps_realtime";
+ public final static String TOPIC = "jysb_dwxx";
private static Properties serverProps = new Properties();
+
private static Properties producerProps = new Properties();
private static Properties consumerProps = new Properties();
@@ -26,8 +27,9 @@ public final class KafkaProperties
private KafkaProperties()
{
- String filePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator;
-
+// String filePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator;
+ String filePath = "/home/rsoft/config/";
+ LOG.info("路径=={}",filePath);
try
{
File proFile = new File(filePath + "producer.properties");
@@ -48,14 +50,14 @@ public final class KafkaProperties
if (serFile.exists())
{
- serverProps.load(new FileInputStream(filePath + "server.properties"));
+ serverProps.load(new FileInputStream(filePath + "server.properties"));
}
File cliFile = new File(filePath + "client.properties");
if (cliFile.exists())
{
- clientProps.load(new FileInputStream(filePath + "client.properties"));
+ clientProps.load(new FileInputStream(filePath + "client.properties"));
}
}
catch (IOException e)
@@ -75,11 +77,11 @@ public final class KafkaProperties
}
/**
-    * Get a parameter value
-    * @param key the properties key
-    * @param defValue the default value
-    * @return
-    */
+     * Get a parameter value
+     * @param key the properties key
+     * @param defValue the default value
+     * @return
+     */
public String getValues(String key, String defValue)
{
String rtValue = null;
@@ -105,10 +107,10 @@ public final class KafkaProperties
}
/**
-    * Get a value from server.properties by key
-    * @param key
-    * @return
-    */
+     * Get a value from server.properties by key
+     * @param key
+     * @return
+     */
private String getPropertiesValue(String key)
{
String rtValue = serverProps.getProperty(key);
@@ -128,7 +130,7 @@ public final class KafkaProperties
        // not in consumer.properties either, fall back to client.properties
if (null == rtValue)
{
- rtValue = clientProps.getProperty(key);
+ rtValue = clientProps.getProperty(key);
}
return rtValue;
diff --git a/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/config/KafkaPropertiesConfig.java b/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/config/KafkaPropertiesConfig.java
deleted file mode 100644
index 5f4edabf..00000000
--- a/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/config/KafkaPropertiesConfig.java
+++ /dev/null
@@ -1,35 +0,0 @@
-package org.dromara.kafka.consumer.config;
-
-import org.springframework.boot.context.properties.ConfigurationProperties;
-import org.springframework.context.annotation.Profile;
-
-/**
- * description:
- *
- * @author chenle
- * @date 2021-09-06 15:13
- */
-@ConfigurationProperties(prefix = "mykafka")
-@Profile(value = "dev")
-public
-class KafkaPropertiesConfig {
- private String serverUrl;
-
- private MyConsumerProperties consumerProperties = new MyConsumerProperties();
-
- public String getServerUrl() {
- return serverUrl;
- }
-
- public void setServerUrl(String serverUrl) {
- this.serverUrl = serverUrl;
- }
-
- public MyConsumerProperties getConsumerProperties() {
- return consumerProperties;
- }
-
- public void setConsumerProperties(MyConsumerProperties consumerProperties) {
- this.consumerProperties = consumerProperties;
- }
-}
diff --git a/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/config/LoginUtil.java b/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/config/LoginUtil.java
index 2697dbff..a6f5c719 100644
--- a/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/config/LoginUtil.java
+++ b/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/config/LoginUtil.java
@@ -205,7 +205,7 @@ public class LoginUtil {
public static void securityPrepare(String principal, String keyTabFile) throws IOException {
// String filePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator;
- String filePath = "/rsoft/config/";
+ String filePath = "/home/rsoft/config/";
String krbFile = filePath + "krb5.conf";
String userKeyTableFile = filePath + keyTabFile;
@@ -225,8 +225,8 @@ public class LoginUtil {
*/
public static Boolean isSecurityModel() {
Boolean isSecurity = false;
- String krbFilePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator + "kafkaSecurityMode";
-
+// String krbFilePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator + "kafkaSecurityMode";
+ String krbFilePath = "/home/rsoft/config/kafkaSecurityMode";
Properties securityProps = new Properties();
// file does not exist.
diff --git a/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/config/MyConsumerProperties.java b/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/config/MyConsumerProperties.java
deleted file mode 100644
index 040def94..00000000
--- a/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/config/MyConsumerProperties.java
+++ /dev/null
@@ -1,28 +0,0 @@
-package org.dromara.kafka.consumer.config;
-
-/**
- * description:
- *
- * @author chenle
- * @date 2021-09-07 14:54
- */
-public class MyConsumerProperties {
- private String clientId;
- private String groupId = "222";
-
- public String getClientId() {
- return clientId;
- }
-
- public void setClientId(String clientId) {
- this.clientId = clientId;
- }
-
- public String getGroupId() {
- return groupId;
- }
-
- public void setGroupId(String groupId) {
- this.groupId = groupId;
- }
-}
diff --git a/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/config/NewConsumer.java b/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/config/NewConsumer.java
deleted file mode 100644
index 4bb0d44c..00000000
--- a/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/config/NewConsumer.java
+++ /dev/null
@@ -1,158 +0,0 @@
-package org.dromara.kafka.consumer.config;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Properties;
-
-
-public class NewConsumer extends Thread{
- private static final Logger LOG = LoggerFactory.getLogger(NewConsumer.class);
-
- private final KafkaConsumer consumer;
-
- private final String topic;
-
-    // Maximum wait time for a single poll request
-    private final int waitTime = 10000;
-
-    // Broker connection address
-    private final String bootstrapServers = "bootstrap.servers";
-    // Group id
-    private final String groupId = "group.id";
-    // Deserializer class for message values
-    private final String valueDeserializer = "value.deserializer";
-    // Deserializer class for message keys
-    private final String keyDeserializer = "key.deserializer";
-    // Protocol type: currently SASL_PLAINTEXT or PLAINTEXT
-    private final String securityProtocol = "security.protocol";
-    // Service name
-    private final String saslKerberosServiceName = "sasl.kerberos.service.name";
-    // Domain name
-    private final String kerberosDomainName = "kerberos.domain.name";
-    // Whether to auto-commit offsets
-    private final String enableAutoCommit = "enable.auto.commit";
-    // Auto-commit interval for offsets
-    private final String autoCommitIntervalMs = "auto.commit.interval.ms";
-
-    // Session timeout
-    private final String sessionTimeoutMs = "session.timeout.ms";
-
- /**
-     * Keytab file name of the machine-machine account requested by the user
- */
- private static final String USER_KEYTAB_FILE = "user.keytab";
-
- /**
-     * Name of the machine-machine account requested by the user
- */
- private static final String USER_PRINCIPAL = "aqdsj_ruansi";
-
- /**
-     * NewConsumer constructor
-     * @param topic name of the topic to subscribe to
- */
- public NewConsumer(String topic) {
-
- Properties props = new Properties();
-
- KafkaProperties kafkaProc = KafkaProperties.getInstance();
-        // Broker connection address
-        props.put(bootstrapServers,
-            kafkaProc.getValues(bootstrapServers, "localhost:21007"));
-        // Group id
-        props.put(groupId, "DemoConsumer");
-        // Whether to auto-commit offsets
-        props.put(enableAutoCommit, "true");
-        // Auto-commit interval for offsets
-        props.put(autoCommitIntervalMs, "1000");
-        // Session timeout
-        props.put(sessionTimeoutMs, "30000");
-        // Deserializer class for message keys
-        props.put(keyDeserializer,
-            "org.apache.kafka.common.serialization.IntegerDeserializer");
-        // Deserializer class for message values
-        props.put(valueDeserializer,
-            "org.apache.kafka.common.serialization.StringDeserializer");
-        // Security protocol type
-        props.put(securityProtocol, kafkaProc.getValues(securityProtocol, "SASL_PLAINTEXT"));
-        // Service name
-        props.put(saslKerberosServiceName, "kafka");
-        // Domain name
-        props.put(kerberosDomainName, kafkaProc.getValues(kerberosDomainName, "hadoop.hadoop.com"));
- consumer = new KafkaConsumer(props);
- this.topic = topic;
- }
-
- /**
-     * Message-processing routine for the subscribed topic
- */
- public void doWork()
- {
-        // subscribe
-        consumer.subscribe(Collections.singletonList(this.topic));
-        // poll for messages
-        ConsumerRecords records = consumer.poll(waitTime);
-        // process messages
-        for (ConsumerRecord record : records)
- {
- LOG.info("[NewConsumerExample], Received message: (" + record.key() + ", " + record.value()
- + ") at offset " + record.offset());
- }
- }
-
-
-
- public static void main(String[] args)
- {
- if (LoginUtil.isSecurityModel())
- {
- try
- {
- LOG.info("Securitymode start.");
-
-                //!! NOTE: for security authentication, manually replace this with the machine-machine account you applied for
- LoginUtil.setJaasFile(USER_PRINCIPAL,USER_KEYTAB_FILE);
- }
- catch (IOException e)
- {
- LOG.error("Security prepare failure.");
- LOG.error("The IOException occured : {}.", e);
- return;
- }
- LOG.info("Security prepare success.");
- }
-
- NewConsumer consumerThread = new NewConsumer(KafkaProperties.TOPIC);
- consumerThread.start();
-
-        // close the consumer after 60s; adjust as needed in real runs
- try
- {
- Thread.sleep(60000);
- }
- catch (InterruptedException e)
- {
- LOG.info("The InterruptedException occured : {}.", e);
- }
- finally
- {
- consumerThread.shutdown();
- consumerThread.consumer.close();
- }
- }
-
- @Override
- public synchronized void start() {
- doWork();
- }
-
- private void shutdown(){
- Thread.currentThread().interrupt();
- }
-}
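Note: the deleted NewConsumer above overrides Thread.start() to call doWork() directly, so it polls exactly once on the caller's thread, and shutdown() interrupts the calling thread rather than the consumer. For contrast, a conventional poll loop under the same Integer/String deserializer setup; the closed flag and the Duration/AtomicBoolean imports are assumed additions, not code from this repository:

private final java.util.concurrent.atomic.AtomicBoolean closed =
        new java.util.concurrent.atomic.AtomicBoolean(false);

@Override
public void run() {
    try {
        consumer.subscribe(Collections.singletonList(topic));
        while (!closed.get()) {
            ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(waitTime));
            for (ConsumerRecord<Integer, String> record : records) {
                LOG.info("Received message: ({}, {}) at offset {}",
                        record.key(), record.value(), record.offset());
            }
        }
    } finally {
        consumer.close();  // close on the same thread that polls
    }
}

public void shutdown() {
    closed.set(true);      // signal the poll loop instead of interrupting the caller
}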
diff --git a/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/filters/MyFilter.java b/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/filters/MyFilter.java
deleted file mode 100644
index fcae2d78..00000000
--- a/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/filters/MyFilter.java
+++ /dev/null
@@ -1,30 +0,0 @@
-package org.dromara.kafka.consumer.filters;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.servlet.*;
-import javax.servlet.annotation.WebFilter;
-import javax.servlet.http.HttpServletRequest;
-import java.io.IOException;
-
-/**
- * description:
- *
- * @author chenle
- * @date 2021-09-08 15:40
- */
-@WebFilter(filterName="MyFilter",urlPatterns = "/*")
-public class MyFilter implements Filter {
-
- private Logger logger = LoggerFactory.getLogger(MyFilter.class);
-
- @Override
- public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {
- HttpServletRequest request = (HttpServletRequest) servletRequest;
- String queryString = request.getQueryString();
-// logger.error("pre,queryString={}",queryString);
- filterChain.doFilter(servletRequest,servletResponse);
-// logger.error("queryString={}",queryString);
- }
-}
diff --git a/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/ConsumerWorker.java b/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/ConsumerWorker.java
index 37634b12..23daaa9d 100644
--- a/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/ConsumerWorker.java
+++ b/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/ConsumerWorker.java
@@ -27,7 +27,8 @@ import java.util.Objects;
import java.util.concurrent.LinkedBlockingDeque;
/**
- * description:
+ * description: processes Kafka records and forwards them to data2es
+ *
*
* @author chenle
* @date 2021-09-06 16:44
@@ -36,78 +37,20 @@ public class ConsumerWorker implements Runnable {
private ConsumerRecord record;
private Logger logger = LoggerFactory.getLogger(ConsumerWorker.class);
- public static LinkedBlockingDeque linkedBlockingDeque = new LinkedBlockingDeque<>(5000);
- private String cityCode ;
+ public static LinkedBlockingDeque linkedBlockingDeque = new LinkedBlockingDeque<>(1000);
- ConsumerWorker(ConsumerRecord record, String cityCode) {
+
+ ConsumerWorker(ConsumerRecord record) {
this.record = record;
- this.cityCode = cityCode;
}
@Override
public void run() {
        // Approach for other cities. A small trick is used here: cities we develop for pass a 4-digit cityCode, while other cities pass more than 4 digits, which are then truncated to 4.
- if(cityCode.length() > 4){
- cityCode = cityCode.substring(0,4);
- normalRequest();
- }else {
-            // Approach for cities such as Lu'an and Anqing, whose systems were developed by our own company.
- luanrequest();
-// luanrequestBatch();
- }
+ luanrequest();
}
- /*
-     * Obsolete method
- * */
- private void luanrequestBatch() {
- Object value = record.value();
- String topic = record.topic();
- List list = new ArrayList<>();
- logger.info("offset={},topic={},value={}", record.offset(), topic,value);
- List jsonObjects = JSON.parseArray((String) value, JSONObject.class);
- for (JSONObject jsonObject : jsonObjects) {
- EsGpsInfo esGpsInfo;
- /*try {
- jsonObject = JSONUtil.parseObj(((String) value));
- }catch (ConvertException e){
- logger.info("jsonObject=null:error={}",e.getMessage());
- return;
- }*/
- try {
- esGpsInfo = JSONUtil.toBean(jsonObject, EsGpsInfo.class);
- }catch (ConvertException e){
- logger.info("EsGpsInfo=null:error={}",e.getMessage());
- return;
- }
-
- if(Objects.isNull(esGpsInfo)){
- logger.info("esGpsInfo=null no error");
- return;
- }
- String deviceCode = esGpsInfo.getDeviceCode();
- if(StringUtils.isEmpty(deviceCode) || deviceCode.length() > 100){
- logger.info("deviceCode:{} is null or is too long ",deviceCode);
- return;
- }
- String latitude = esGpsInfo.getLat();
- if(StringUtils.isEmpty(latitude) || "0.0".equals(latitude)){
- logger.info("latitude:{} is null or is zero ",latitude);
- return;
- }
- String longitude = esGpsInfo.getLng();
- if(StringUtils.isEmpty(longitude) || "0.0".equals(longitude)){
- logger.info("longitude:{} is null or is zero ",longitude);
- return;
- }
- esGpsInfo.setInfoSource(cityCode);
-
- esGpsInfo.setGpsTime(new Date(Long.valueOf(jsonObject.getStr("gpsTime"))));
- list.add(esGpsInfo);
- }
-// dataToEsService.saveGpsInfoBatch(list);
- }
private void luanrequest() {
Object value = record.value();
@@ -148,7 +91,12 @@ public class ConsumerWorker implements Runnable {
logger.info("longitude:{} is null or is zero ",longitude);
return;
}
- esGpsInfo.setInfoSource(cityCode);
+ String infoSource = esGpsInfo.getInfoSource();
+ if(StringUtils.isEmpty(infoSource) ){
+ logger.info("infoSource:{} is null ",infoSource);
+ return;
+ }
+
try {
esGpsInfo.setGpsTime(new Date(Long.valueOf(jsonObject.getStr("gpsTime"))));
}catch (Exception e){
@@ -169,66 +117,4 @@ public class ConsumerWorker implements Runnable {
}
-
- /**
-     * Generic request handling (used by most cities)
- */
- private void normalRequest() {
- Object value = record.value();
- String topic = record.topic();
-
- logger.info("offset={},topic={},value={}", record.offset(), topic,value);
-
- RemoteGpsInfo esGpsInfo = new RemoteGpsInfo();
- EsGpsInfoVO esGpsInfoVO;
- try {
- esGpsInfoVO = JSONUtil.toBean(((String) value), EsGpsInfoVO.class);
- }catch (ConvertException e){
- logger.info("esGpsInfoVO=null:error={}",e.getMessage());
- return;
- }
- if(Objects.isNull(esGpsInfoVO)){
- logger.info("esGpsInfoVO=null no error");
- return;
- }
-
-
- try {
- DateTime parse = DateUtil.parse(esGpsInfoVO.getGpsTime(), "yyyy-MM-dd HH:mm:ss");
- }catch (Exception e){
- logger.info("gpsTime:{} format error", esGpsInfoVO.getGpsTime());
- return;
- }
-
- String deviceCode = esGpsInfoVO.getDeviceCode();
- if(StringUtils.isEmpty(deviceCode) || deviceCode.length() > 100){
- logger.info("deviceCode:{} is null or is too long ",deviceCode);
- return;
- }
- String latitude = esGpsInfoVO.getLatitude();
- if(StringUtils.isEmpty(latitude) || "0.0".equals(latitude)){
- logger.info("latitude:{} is null or is zero ",latitude);
- return;
- }
- String longitude = esGpsInfoVO.getLongitude();
- if(StringUtils.isEmpty(longitude) || "0.0".equals(longitude)){
- logger.info("longitude:{} is null or is zero ",longitude);
- return;
- }
- BeanUtil.copyProperties(esGpsInfoVO,esGpsInfo,new CopyOptions());
- esGpsInfo.setLat(latitude);
- esGpsInfo.setLng(esGpsInfoVO.getLongitude());
- esGpsInfo.setOrientation(esGpsInfoVO.getDirection());
- esGpsInfo.setInfoSource(cityCode);
-
- boolean offer = linkedBlockingDeque.offer(esGpsInfo);
- R response = R.ok(offer);
- if(200 == response.getCode()){
- logger.info("topic={},data2es={}",topic,"success");
- }else{
- logger.error("topic={},data2es={}",topic,"fail");
- }
- }
-
-
}
diff --git a/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/KafkaConsumerRunnable.java b/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/KafkaConsumerRunnable.java
index 6d21ea5f..532984f0 100644
--- a/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/KafkaConsumerRunnable.java
+++ b/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/KafkaConsumerRunnable.java
@@ -1,22 +1,16 @@
package org.dromara.kafka.consumer.handler;
-import org.apache.dubbo.config.annotation.DubboReference;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
-import org.dromara.data2es.api.RemoteDataToEsService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
-import org.springframework.kafka.config.KafkaListenerContainerFactory;
-import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
-import org.springframework.kafka.listener.ContainerProperties;
-import org.springframework.kafka.listener.MessageListener;
import java.time.Duration;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadPoolExecutor;
@@ -29,68 +23,39 @@ import java.util.concurrent.ThreadPoolExecutor;
*/
public class KafkaConsumerRunnable implements Runnable {
- private Map props;
+ private final KafkaConsumer consumer;
private ThreadPoolExecutor taskExecutor;
private String cityCode;
private Logger logger = LoggerFactory.getLogger(KafkaConsumerRunnable.class);
- public KafkaConsumerRunnable(Map props, ThreadPoolExecutor taskExecutor,
- String cityCode) {
- this.props = props;
+ public KafkaConsumerRunnable(KafkaConsumer consumer, ThreadPoolExecutor taskExecutor) {
+ this.consumer = consumer;
this.taskExecutor = taskExecutor;
- this.cityCode = cityCode;
}
- private DefaultKafkaConsumerFactory buildConsumerFactory(){
- return new DefaultKafkaConsumerFactory(props);
- }
-
- private ContainerProperties containerProperties(String[] topic, MessageListener messageListener) {
- ContainerProperties containerProperties = new ContainerProperties(topic);
- containerProperties.setMessageListener(messageListener);
- return containerProperties;
- }
-
- private KafkaListenerContainerFactory buildListenerFactory(){
- ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
- factory.setConsumerFactory(buildConsumerFactory());
- factory.setConcurrency(4);
- factory.setBatchListener(true);
-
- factory.getContainerProperties().setPollTimeout(3000);
- return factory;
- }
-
-
-
-
-
@Override
public void run() {
- KafkaConsumer consumer = new KafkaConsumer<>(props);
-
- List topics = (List) props.get("topics");
- consumer.subscribe(topics);
+ consumer.subscribe(Collections.singletonList("jysb_dwxx"));
        consumer.poll(0); // make the subscription take effect
List topicPartitions = new ArrayList<>();
        Map<String, List<PartitionInfo>> stringListMap = consumer.listTopics();
- for (Object topic : topics) {
- String topic1 = (String) topic;
-            List<PartitionInfo> partitionInfos = stringListMap.get(topic1);
- for (PartitionInfo partitionInfo : partitionInfos) {
- TopicPartition partition = new TopicPartition(topic1, partitionInfo.partition());
- topicPartitions.add(partition);
- }
+ String topic1 ="jysb_dwxx";
+        List<PartitionInfo> partitionInfos = stringListMap.get(topic1);
+ for (PartitionInfo partitionInfo : partitionInfos) {
+ TopicPartition partition = new TopicPartition(topic1, partitionInfo.partition());
+ topicPartitions.add(partition);
}
        consumer.seekToEnd(topicPartitions); // passing Collections.emptyList() would move the offsets of all subscribed topic partitions to the end
while (true) {
ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord record : records) {
- taskExecutor.submit(new ConsumerWorker(record, cityCode));
+ logger.info("[Consumer], Received message: (" + record.key() + ", " + record.value()
+ + ") at offset " + record.offset());
+ taskExecutor.submit(new ConsumerWorker(record));
}
}
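Note: the pattern above (subscribe, one poll to force assignment, then seekToEnd) deliberately skips any backlog so consumption starts at the current log end. Condensed, with the topic hard-coded as in the new code:

consumer.subscribe(Collections.singletonList("jysb_dwxx"));
consumer.poll(0);                                   // first poll makes the assignment effective
List<TopicPartition> partitions = new ArrayList<>();
for (PartitionInfo info : consumer.listTopics().get("jysb_dwxx")) {
    partitions.add(new TopicPartition("jysb_dwxx", info.partition()));
}
consumer.seekToEnd(partitions);                     // Collections.emptyList() would mean "all assigned partitions"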
diff --git a/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/RealConsumer.java b/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/RealConsumer.java
index c294cd08..673dc55e 100644
--- a/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/RealConsumer.java
+++ b/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/RealConsumer.java
@@ -1,9 +1,13 @@
package org.dromara.kafka.consumer.handler;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.RecordDeserializationException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.serialization.StringDeserializer;
-import org.dromara.kafka.consumer.config.KafkaPropertiesConfig;
+import org.dromara.kafka.consumer.config.KafkaProperties;
import org.dromara.kafka.consumer.config.LoginUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -13,10 +17,11 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
+import javax.annotation.Resource;
import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.time.Duration;
+import java.util.*;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
@@ -30,102 +35,156 @@ import java.util.concurrent.ThreadPoolExecutor;
@Component
public class RealConsumer implements CommandLineRunner {
- private String kafkaServers;
+ private Logger logger = LoggerFactory.getLogger(RealConsumer.class);
- private String groupId;
+ private final KafkaConsumer consumer;
- private String topics;
-
- private String cityCode = "3400";
-
-
-
- @Autowired
- KafkaPropertiesConfig kafkaPropertiesConfig;
-
- @Autowired
+ @Resource
ThreadPoolExecutor dtpExecutor2;
- private Logger logger = LoggerFactory.getLogger(RealConsumer.class);
-
- @Override
- public void run(String... args) throws Exception {
- kafkaServers = "127.0.0.1:9092";
- topics = "topic.send.2,topic.send.3,topic.send.4,topic.send.5,topic.send.8";
- groupId = "group_ruansi_xuancheng";
- cityCode = "3418";
- if(args.length > 0){
- kafkaServers = args[0];
- topics = args[1];
- groupId = args[2];
- cityCode = args[3];
-
- }
- ExecutorService executorService = Executors.newSingleThreadExecutor();
- Map kafkaProp = getKafkaProp();
+ private volatile boolean closed;
+    // Maximum wait time for a single poll request (seconds)
+ private final int waitTime = 1;
+
+    // Broker connection address
+    private final static String BOOTSTRAP_SERVER = "bootstrap.servers";
+
+    // Group id
+    private final static String GROUP_ID = "group.id";
+
+    // Deserializer class for message values
+    private final static String VALUE_DESERIALIZER = "value.deserializer";
+
+    // Deserializer class for message keys
+    private final static String KEY_DESERIALIZER = "key.deserializer";
+
+    // Protocol type: currently SASL_PLAINTEXT or PLAINTEXT
+    private final static String SECURITY_PROTOCOL = "security.protocol";
+
+    // Service name
+    private final static String SASL_KERBEROS_SERVICE_NAME = "sasl.kerberos.service.name";
+
+    // Domain name
+    private final static String KERBEROS_DOMAIN_NAME = "kerberos.domain.name";
+
+    // Whether to auto-commit offsets
+    private final static String ENABLE_AUTO_COMMIT = "enable.auto.commit";
+
+    // Auto-commit interval for offsets
+    private final static String AUTO_COMMIT_INTERVAL_MS = "auto.commit.interval.ms";
+
+    // Session timeout
+    private final static String SESSION_TIMEOUT_MS = "session.timeout.ms";
+
+ /**
+     * Keytab file name of the machine-machine account requested by the user
+ */
+ private static final String USER_KEYTAB_FILE = "user.keytab";
+
+ /**
+     * Name of the machine-machine account requested by the user
+ */
+ private static final String USER_PRINCIPAL = "yhy_ahrs_rcw";
+
+ /**
+     * Consumer constructor
+     *
+ */
+ public RealConsumer() {
+ initSecurity();
+ Properties props = initProperties();
+ consumer = new KafkaConsumer(props);
+        // subscribe
+// consumer.subscribe(Collections.singletonList("jysb_dwxx"));
+ }
+
+ public static Properties initProperties() {
+ Properties props = new Properties();
+ KafkaProperties kafkaProc = KafkaProperties.getInstance();
+
+        // Broker connection address
+        props.put(BOOTSTRAP_SERVER, kafkaProc.getValues(BOOTSTRAP_SERVER, "localhost:21007"));
+        // Group id
+        props.put(GROUP_ID, kafkaProc.getValues(GROUP_ID, "DemoConsumer"));
+        // Whether to auto-commit offsets
+        props.put(ENABLE_AUTO_COMMIT, kafkaProc.getValues(ENABLE_AUTO_COMMIT, "true"));
+        // Auto-commit interval for offsets
+        props.put(AUTO_COMMIT_INTERVAL_MS, kafkaProc.getValues(AUTO_COMMIT_INTERVAL_MS,"1000"));
+        // Session timeout
+        props.put(SESSION_TIMEOUT_MS, kafkaProc.getValues(SESSION_TIMEOUT_MS, "30000"));
+        // Deserializer class for message keys
+        props.put(KEY_DESERIALIZER,
+            kafkaProc.getValues(KEY_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer"));
+        // Deserializer class for message values
+        props.put(VALUE_DESERIALIZER,
+            kafkaProc.getValues(VALUE_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer"));
+        // Security protocol type
+        props.put(SECURITY_PROTOCOL, kafkaProc.getValues(SECURITY_PROTOCOL, "SASL_PLAINTEXT"));
+        // Service name
+        props.put(SASL_KERBEROS_SERVICE_NAME, "kafka");
+        // Domain name
+        props.put(KERBEROS_DOMAIN_NAME, kafkaProc.getValues(KERBEROS_DOMAIN_NAME, "hadoop.hadoop.com"));
+
+ return props;
+ }
+
+ /**
+     * Initialize security authentication
+ */
+ public void initSecurity() {
if (LoginUtil.isSecurityModel())
{
- try
- {
+ try {
logger.info("Securitymode start.");
-                //!! NOTE: for security authentication, manually replace this with the machine-machine account you applied for
-                // authentication mode: SASL_PLAINTEXT or PLAINTEXT
-                kafkaProp.put("security.protocol","SASL_PLAINTEXT");
-                // service name
-                kafkaProp.put("sasl.kerberos.service.name","kafka");
-                // domain name
-                kafkaProp.put("kerberos.domain.name","hadoop.hadoop.com");
- LoginUtil.setJaasFile("","");
- }
- catch (IOException e)
- {
+                // !! NOTE: for security authentication, manually replace this with the machine-machine account you applied for
+ LoginUtil.securityPrepare(USER_PRINCIPAL, USER_KEYTAB_FILE);
+ } catch (IOException e) {
logger.error("Security prepare failure.");
logger.error("The IOException occured.", e);
- return;
}
logger.info("Security prepare success.");
}
-
- KafkaConsumerRunnable runnable = new KafkaConsumerRunnable(kafkaProp,dtpExecutor2,cityCode);
- executorService.execute(runnable);
}
-
/**
-     * Get Kafka configuration
- * @return
+     * Message-processing entry for the subscribed topic
*/
- private Map getKafkaProp() {
-// Properties map = new Properties();
- Map map = new HashMap<>();
- map.put("bootstrap.servers",kafkaServers);
- map.put("group.id",groupId);
- map.put("enable.auto.commit", "true");
- map.put("auto.commit.interval.ms", "1000");
- map.put("session.timeout.ms", "30000");
- map.put("key.deserializer", StringDeserializer.class);
- map.put("value.deserializer", StringDeserializer.class);
- map.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,5);
-// map.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,1000 * 5);
-// map.put("ack.mode", "manual_immediate");
+ public void run(String... args) throws Exception{
+ try {
+ logger.info("进入消费");
+ ExecutorService executorService = Executors.newSingleThreadExecutor();
+// realConsumer.run();
+ KafkaConsumerRunnable runnable = new KafkaConsumerRunnable(consumer,dtpExecutor2);
+ executorService.execute(runnable);
+            // poll for messages
+ /* ConsumerRecords records = consumer.poll(Duration.ofSeconds(waitTime));
+            // process messages
+ for (ConsumerRecord record : records) {
+ logger.info("[Consumer], Received message: (" + record.key() + ", " + record.value()
+ + ") at offset " + record.offset());
+ dtpExecutor2.submit(new ConsumerWorker(record));
-//        // authentication mode: SASL_PLAINTEXT or PLAINTEXT
-//        map.put("security.protocol","SASL_PLAINTEXT");
-//        // service name
-//        map.put("sasl.kerberos.service.name","kafka");
-//        // domain name
-//        map.put("kerberos.domain.name","hadoop.hadoop.com");
- String[] split = topics.split(",");
- List list = CollectionUtils.arrayToList(split);
- map.put("topics", list);
- return map;
+ }*/
+ } catch (AuthorizationException | UnsupportedVersionException
+ | RecordDeserializationException e) {
+ logger.error(e.getMessage());
+            // cannot recover from these exceptions
+ } catch (OffsetOutOfRangeException | NoOffsetForPartitionException e) {
+ logger.error("Invalid or no offset found, using latest");
+ consumer.seekToEnd(e.partitions());
+ consumer.commitSync();
+ } catch (KafkaException e) {
+ logger.error(e.getMessage());
+ }
}
+
}
diff --git a/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/interceptors/MyInterceptor.java b/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/interceptors/MyInterceptor.java
index bcae27a2..c53485f9 100644
--- a/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/interceptors/MyInterceptor.java
+++ b/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/interceptors/MyInterceptor.java
@@ -7,8 +7,6 @@ import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.HandlerInterceptor;
import org.springframework.web.servlet.ModelAndView;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
/**
* description:
diff --git a/stwzhj-modules/stwzhj-data2StKafka/pom.xml b/stwzhj-modules/stwzhj-data2StKafka/pom.xml
index 40034489..0a2bf2a3 100644
--- a/stwzhj-modules/stwzhj-data2StKafka/pom.xml
+++ b/stwzhj-modules/stwzhj-data2StKafka/pom.xml
@@ -48,11 +48,6 @@
             <artifactId>stwzhj-common-web</artifactId>
         </dependency>
-        <dependency>
-            <groupId>org.dromara</groupId>
-            <artifactId>stwzhj-common-mybatis</artifactId>
-        </dependency>
-
         <dependency>
             <groupId>org.dromara</groupId>
             <artifactId>stwzhj-common-dubbo</artifactId>
@@ -93,53 +88,18 @@
             <artifactId>stwzhj-common-encrypt</artifactId>
         </dependency>
-        <dependency>
-            <groupId>org.dromara</groupId>
-            <artifactId>stwzhj-common-redis</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>org.dromara</groupId>
-            <artifactId>stwzhj-api-system</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>org.dromara</groupId>
-            <artifactId>stwzhj-api-resource</artifactId>
-        </dependency>
         <dependency>
             <groupId>org.dromara</groupId>
             <artifactId>stwzhj-api-data2es</artifactId>
         </dependency>
+
-        <dependency>
-            <groupId>org.elasticsearch.client</groupId>
-            <artifactId>elasticsearch-rest-client</artifactId>
-            <version>7.10.2-h0.cbu.mrs.350.r11</version>
-        </dependency>
-        <dependency>
-            <groupId>org.elasticsearch.client</groupId>
-            <artifactId>elasticsearch-rest-high-level-client</artifactId>
-            <version>7.10.2-h0.cbu.mrs.350.r11</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.elasticsearch.plugin</groupId>
-                    <artifactId>parent-join-client</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.elasticsearch.plugin</groupId>
-                    <artifactId>aggs-matrix-stats-client</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-        <dependency>
-            <groupId>org.elasticsearch</groupId>
-            <artifactId>elasticsearch</artifactId>
-            <version>7.10.2-h0.cbu.mrs.350.r11</version>
-        </dependency>
+        <dependency>
+            <groupId>cn.dynamictp</groupId>
+            <artifactId>dynamic-tp-spring-boot-starter-common</artifactId>
+            <version>1.1.0</version>
+        </dependency>
diff --git a/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/config/AsyncConfig.java b/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/config/AsyncConfig.java
new file mode 100644
index 00000000..2d4e6b4d
--- /dev/null
+++ b/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/config/AsyncConfig.java
@@ -0,0 +1,69 @@
+package org.dromara.data2kafka.config;
+
+import com.dtp.common.em.QueueTypeEnum;
+import com.dtp.common.em.RejectedTypeEnum;
+import com.dtp.core.support.ThreadPoolBuilder;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * description:
+ *
+ * @author chenle
+ * @date 2021-09-06 16:31
+ */
+@Configuration
+public class AsyncConfig {
+
+ @Bean("taskExecutor")
+ public ThreadPoolTaskExecutor taskExecutor(){
+ ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
+ taskExecutor.setCorePoolSize(8);
+ taskExecutor.setMaxPoolSize(20);
+ taskExecutor.setQueueCapacity(200);
+ taskExecutor.setKeepAliveSeconds(60);
+ taskExecutor.setThreadNamePrefix("hfapp--kafkaConsumer--");
+ taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
+ taskExecutor.setAwaitTerminationSeconds(60);
+ taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
+ return taskExecutor;
+ }
+
+ /**
+     * Tip: prefer configuring this pool in the config center instead of declaring it with @Bean
+ * @return 线程池实例
+ */
+// @Bean(name = "dtpExecutor2")
+ public ThreadPoolExecutor dtpExecutor2() {
+ return ThreadPoolBuilder.newBuilder()
+ .threadPoolName("dtpExecutor2")
+ .corePoolSize(8)
+ .maximumPoolSize(20)
+ .keepAliveTime(60)
+ .timeUnit(TimeUnit.MILLISECONDS)
+ .workQueue(QueueTypeEnum.VARIABLE_LINKED_BLOCKING_QUEUE.getName(), 1024, false)
+ .waitForTasksToCompleteOnShutdown(true)
+ .awaitTerminationSeconds(60)
+ .rejectedExecutionHandler(RejectedTypeEnum.CALLER_RUNS_POLICY.getName())
+ .buildDynamic();
+ }
+
+ @Bean(name = "threadPoolExecutor")
+ public ThreadPoolExecutor threadPoolExecutor() {
+ return new ThreadPoolExecutor(
+                8, // core pool size
+                20, // maximum pool size
+                60, // keep-alive time: 60 seconds
+                TimeUnit.SECONDS,
+                new LinkedBlockingQueue<>(10000), // maximum task queue length
+                new ThreadPoolExecutor.CallerRunsPolicy() // rejection policy: run the task in the caller's thread
+ );
+ }
+
+}
+
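Note: the three pools above differ mainly in their rejection policy — taskExecutor silently discards the oldest queued task, while dtpExecutor2 and threadPoolExecutor run rejected tasks on the submitting thread. A self-contained sketch of the CallerRunsPolicy behaviour (names and sizes are illustrative, not from this repository):

import java.util.concurrent.*;

public class CallerRunsDemo {
    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(1),              // tiny queue to force rejection
                new ThreadPoolExecutor.CallerRunsPolicy());
        for (int i = 0; i < 4; i++) {
            final int n = i;
            // once the queue is full, the submitting thread runs the task itself:
            // backpressure instead of dropped records
            pool.execute(() -> System.out.println("task " + n + " on " + Thread.currentThread().getName()));
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}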
diff --git a/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/config/KafkaConfig.java b/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/config/KafkaConfig.java
deleted file mode 100644
index bc3563ec..00000000
--- a/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/config/KafkaConfig.java
+++ /dev/null
@@ -1,134 +0,0 @@
-package org.dromara.data2kafka.config;
-
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.context.annotation.Bean;
-import org.springframework.stereotype.Component;
-
-import java.io.IOException;
-import java.util.Properties;
-
-/**
- * description:
- *
- * @author chenle
- * @date 2021-11-03 14:15
- */
-@Component
-public class KafkaConfig {
-
- private Logger logger = LoggerFactory.getLogger(KafkaConfig.class);
-
- private String kafkaServers = "53.1.212.25:21007,53.1.212.26:21007,53.1.212.27:21007"; //省厅 kafka
-
- private String groupId = "ruansiProducer";
-
-
-
-
-    // Broker address list
-    private final String bootstrapServers = "bootstrap.servers";
-
-    // Client ID
-    private final String clientId = "client.id";
-
-    // Key serializer class
-    private final String keySerializer = "key.serializer";
-
-    // Value serializer class
-    private final String valueSerializer = "value.serializer";
-
-    // Protocol type: currently SASL_PLAINTEXT or PLAINTEXT
-    private final String securityProtocol = "security.protocol";
-
-    // Service name
-    private final String saslKerberosServiceName = "sasl.kerberos.service.name";
-
-    // Domain name
-    private final String kerberosDomainName = "kerberos.domain.name";
-
-    // Number of messages to send by default
-    private final int messageNumToSend = 100;
-
- /**
-     * Keytab file name of the machine-machine account requested by the user
- */
- private static final String USER_KEYTAB_FILE = "user.keytab";
-
- /**
-     * Name of the machine-machine account requested by the user
- */
- private static final String USER_PRINCIPAL = "yhy_ahrs_rcw@A528C942_01A6_1BEF_7A75_0187DC82C40F.COM";
-
- /**
-     * Constructor for the new Producer
- */
-
- @Bean(name = "myKafkaProducer")
- public KafkaProducer newProducer() {
- Properties props = new Properties();
-
- if (true)
- {
- try
- {
- logger.info("Securitymode start.");
-                //!! NOTE: for security authentication, manually replace this with the machine-machine account you applied for
- LoginUtil.securityPrepare(USER_PRINCIPAL, USER_KEYTAB_FILE);
- props.put(securityProtocol, "SASL_PLAINTEXT");
-// props.put("sasl.mechanism", "GSSAPI");
-                // Service name
-                props.put(saslKerberosServiceName, "kafka");
-                // Domain name
-                props.put(kerberosDomainName, "A528C942_01A6_1BEF_7A75_0187DC82C40F.COM");
- }
- catch (IOException e)
- {
- logger.error("Security prepare failure.");
- logger.error("The IOException occured.", e);
- return null;
- }
- logger.info("Security prepare success.");
- }else{
- props.put(securityProtocol, "PLAINTEXT");
- }
-
-
-
-        // Broker address list
-        props.put(bootstrapServers,kafkaServers);
-        // Client ID
-        props.put(clientId, "ruansiProducer");
-        // Key serializer class
-        props.put(keySerializer,
-            "org.apache.kafka.common.serialization.IntegerSerializer");
-        // Value serializer class
-        props.put(valueSerializer,
-            "org.apache.kafka.common.serialization.StringSerializer");
-        // Batch-send configuration
-        props.put("batch.size", 16384);
-        props.put("linger.ms", 1);
-        props.put("buffer.memory", 33554432);
-        // Protocol type: currently SASL_PLAINTEXT or PLAINTEXT
-        //props.put(securityProtocol, "SASL_PLAINTEXT");
-//        // Service name
-//        props.put(saslKerberosServiceName, "kafka");
-//        // Domain name
-//        props.put(kerberosDomainName, "hadoop.hadoop.com");
-        // Custom partitioner class. By default, with a null key the sticky partitioner keeps sending to one partition; with a non-null key the partition is chosen by the key's hashcode modulo the partition count.
-        //props.put("partitioner.class","com.kafka.myparitioner.CidPartitioner");
-// props.put(securityProtocol, "SASL_PLAINTEXT");
-// props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"zkxc\" password=\"zkxcKafka07252023\";");
-// props.put("sasl.mechanism", "SCRAM-SHA-256");
-// KafkaProducer producer = new KafkaProducer<>(props);
- KafkaProducer producer = new KafkaProducer<>(props);
-
- return producer;
- }
-
-
-
-}
diff --git a/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/config/KafkaProperties.java b/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/config/KafkaProperties.java
new file mode 100644
index 00000000..de4ef6fd
--- /dev/null
+++ b/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/config/KafkaProperties.java
@@ -0,0 +1,138 @@
+package org.dromara.data2kafka.config;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Properties;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class KafkaProperties
+{
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaProperties.class);
+
+    // Topic name; in security mode an administrator must grant the current user access to it
+ public final static String TOPIC = "jysb_dwxx";
+
+ private static Properties serverProps = new Properties();
+
+ private static Properties producerProps = new Properties();
+
+ private static Properties consumerProps = new Properties();
+
+ private static Properties clientProps = new Properties();
+
+ private static KafkaProperties instance = null;
+
+ private KafkaProperties()
+ {
+// String filePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator;
+ String filePath = "/home/rsoft/config/";
+ LOG.info("路径=={}",filePath);
+ try
+ {
+ File proFile = new File(filePath + "producer.properties");
+
+ if (proFile.exists())
+ {
+ producerProps.load(new FileInputStream(filePath + "producer.properties"));
+ }
+
+ File conFile = new File(filePath + "producer.properties");
+
+ if (conFile.exists())
+ {
+ consumerProps.load(new FileInputStream(filePath + "consumer.properties"));
+ }
+
+ File serFile = new File(filePath + "server.properties");
+
+ if (serFile.exists())
+ {
+ serverProps.load(new FileInputStream(filePath + "server.properties"));
+ }
+
+ File cliFile = new File(filePath + "client.properties");
+
+ if (cliFile.exists())
+ {
+ clientProps.load(new FileInputStream(filePath + "client.properties"));
+ }
+ }
+ catch (IOException e)
+ {
+ LOG.info("The Exception occured.", e);
+ }
+ }
+
+ public synchronized static KafkaProperties getInstance()
+ {
+ if (null == instance)
+ {
+ instance = new KafkaProperties();
+ }
+
+ return instance;
+ }
+
+ /**
+     * Get a parameter value
+     * @param key the properties key
+     * @param defValue the default value
+ * @return
+ */
+ public String getValues(String key, String defValue)
+ {
+ String rtValue = null;
+
+ if (null == key)
+ {
+ LOG.error("key is null");
+ }
+ else
+ {
+ rtValue = getPropertiesValue(key);
+ }
+
+ if (null == rtValue)
+ {
+ LOG.warn("KafkaProperties.getValues return null, key is " + key);
+ rtValue = defValue;
+ }
+
+ LOG.info("KafkaProperties.getValues: key is " + key + "; Value is " + rtValue);
+
+ return rtValue;
+ }
+
+ /**
+     * Get a value from server.properties by key
+ * @param key
+ * @return
+ */
+ private String getPropertiesValue(String key)
+ {
+ String rtValue = serverProps.getProperty(key);
+
+        // not in server.properties, fall back to producer.properties
+ if (null == rtValue)
+ {
+ rtValue = producerProps.getProperty(key);
+ }
+
+        // not in producer.properties, fall back to consumer.properties
+ if (null == rtValue)
+ {
+ rtValue = consumerProps.getProperty(key);
+ }
+
+        // not in consumer.properties either, fall back to client.properties
+ if (null == rtValue)
+ {
+ rtValue = clientProps.getProperty(key);
+ }
+
+ return rtValue;
+ }
+}
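Note: the lookup order implemented above is server.properties, then producer.properties, then consumer.properties, then client.properties, with the caller-supplied default as the last resort. Typical use, as in the producer and consumer classes in this change:

KafkaProperties kafkaProc = KafkaProperties.getInstance();
// falls back to "localhost:21007" only if no file under /home/rsoft/config/ defines the key
String servers = kafkaProc.getValues("bootstrap.servers", "localhost:21007");
String groupId = kafkaProc.getValues("group.id", "DemoConsumer");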
diff --git a/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/config/LoginUtil.java b/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/config/LoginUtil.java
index 6ce7a18a..421c0c1d 100644
--- a/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/config/LoginUtil.java
+++ b/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/config/LoginUtil.java
@@ -205,7 +205,7 @@ public class LoginUtil {
public static void securityPrepare(String principal, String keyTabFile) throws IOException {
// String filePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator;
- String filePath = "/rsoft/config/";
+ String filePath = "/home/rsoft/config/";
String krbFile = filePath + "krb5.conf";
String userKeyTableFile = filePath + keyTabFile;
@@ -225,8 +225,8 @@ public class LoginUtil {
*/
public static Boolean isSecurityModel() {
Boolean isSecurity = false;
- String krbFilePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator + "kafkaSecurityMode";
-
+// String krbFilePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator + "kafkaSecurityMode";
+ String krbFilePath = "/home/rsoft/config/kafkaSecurityMode";
Properties securityProps = new Properties();
// file does not exist.
diff --git a/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/consumer/ConsumerWorker.java b/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/consumer/ConsumerWorker.java
index ebde63ff..10845aaa 100644
--- a/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/consumer/ConsumerWorker.java
+++ b/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/consumer/ConsumerWorker.java
@@ -13,6 +13,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.dromara.data2es.api.domain.RemoteGpsInfo;
import org.dromara.data2kafka.domain.EsGpsInfo;
import org.dromara.data2kafka.domain.EsGpsInfoVO;
+import org.dromara.data2kafka.producer.NewProducer;
import org.dromara.data2kafka.producer.Producer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -22,7 +23,6 @@ import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Objects;
-import java.util.concurrent.LinkedBlockingDeque;
/**
* description:
@@ -33,8 +33,8 @@ import java.util.concurrent.LinkedBlockingDeque;
public class ConsumerWorker implements Runnable {
private ConsumerRecord record;
- @Autowired
- private Producer producer;
+
+ private final Producer producer;
private Logger logger = LoggerFactory.getLogger(ConsumerWorker.class);
@@ -44,6 +44,7 @@ public class ConsumerWorker implements Runnable {
private String cityCode ;
ConsumerWorker(ConsumerRecord record, String cityCode) {
+ this.producer = Producer.getInstance();
this.record = record;
this.cityCode = cityCode;
}
@@ -116,7 +117,7 @@ public class ConsumerWorker implements Runnable {
Object value = record.value();
String topic = record.topic();
- logger.info("offset={},topic={},value={}", record.offset(), topic,value);
+// logger.info("offset={},topic={},value={}", record.offset(), topic,value);
RemoteGpsInfo esGpsInfo;
JSONObject jsonObject;
try {
@@ -157,8 +158,7 @@ public class ConsumerWorker implements Runnable {
}catch (Exception e){
logger.error("error_msg={}",e.getMessage());
}
- logger.info("esGpsInfo={}",esGpsInfo);
- producer.send(esGpsInfo,"jysb_dwxx");
+ producer.sendMessage("jysb_dwxx",JSONUtil.toJsonStr(esGpsInfo));
}
@@ -213,7 +213,7 @@ public class ConsumerWorker implements Runnable {
gpsInfo.setLng(esGpsInfoVO.getLongitude());
gpsInfo.setOrientation(esGpsInfoVO.getDirection());
gpsInfo.setInfoSource(cityCode);
- producer.send(gpsInfo,"jysb_dwxx");
+ producer.sendMessage("jysb_dwxx",JSONUtil.toJsonStr(gpsInfo));
}
diff --git a/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/consumer/RealConsumer.java b/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/consumer/RealConsumer.java
index a3e78759..0dc95102 100644
--- a/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/consumer/RealConsumer.java
+++ b/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/consumer/RealConsumer.java
@@ -11,7 +11,12 @@ import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
+import javax.annotation.Resource;
import java.io.IOException;
+import java.net.ConnectException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -36,7 +41,6 @@ public class RealConsumer implements CommandLineRunner {
private String cityCode = "3400";
-
@Autowired
ThreadPoolExecutor dtpExecutor2;
@@ -59,7 +63,7 @@ public class RealConsumer implements CommandLineRunner {
ExecutorService executorService = Executors.newSingleThreadExecutor();
Map kafkaProp = getKafkaProp();
-
+ checkNetworkConnection("53.1.213.25",21007);
if (false)
{
try
@@ -89,6 +93,21 @@ public class RealConsumer implements CommandLineRunner {
}
+ private void checkNetworkConnection(String host, int port) {
+ try (Socket socket = new Socket()) {
+ socket.connect(new InetSocketAddress(host, port), 3000);
+ logger.info("✅ 网络连接正常: {}:{}", host, port);
+ } catch (IOException e) {
+ logger.error("🚨 无法连接到 {}:{} - {}", host, port, e.getMessage());
+ // 详细错误分析
+ if (e instanceof ConnectException) {
+ logger.error("请检查: 1. Kafka服务状态 2. 防火墙设置 3. 端口是否正确");
+ } else if (e instanceof UnknownHostException) {
+ logger.error("主机名解析失败,请检查DNS或hosts文件");
+ }
+ }
+ }
+
/**
     * Get Kafka configuration
diff --git a/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/producer/NewProducer.java b/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/producer/NewProducer.java
new file mode 100644
index 00000000..869cbfae
--- /dev/null
+++ b/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/producer/NewProducer.java
@@ -0,0 +1,66 @@
+package org.dromara.data2kafka.producer;
+
+import com.alibaba.fastjson.JSONObject;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.Resource;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * description:
+ *
+ * @author chenle
+ * @date 2021-11-01 17:20
+ */
+//@Component
+public class NewProducer {
+
+ @Autowired
+ @Resource(name = "myKafkaProducer")
+ KafkaProducer kafkaProducer;
+
+ private Logger LOG = LoggerFactory.getLogger(NewProducer.class);
+
+
+ /**
+     * Producer send routine: serializes the object and sends it to the topic.
+ */
+ public void send(Object obj,String topic) {
+ String obj2String = JSONObject.toJSONString(obj);
+
+        // build the message record
+ ProducerRecord record = new ProducerRecord(topic, obj2String);
+ try {
+            // synchronous send
+            Object o = kafkaProducer.send(record).get();
+            LOG.info("synchronous send succeeded: Object={}", JSONObject.toJSONString(o));
+        } catch (InterruptedException ie) {
+            ie.printStackTrace();
+            LOG.error("The InterruptedException occurred : {}.", ie);
+        } catch (ExecutionException ee) {
+            ee.printStackTrace();
+            LOG.error("The ExecutionException occurred : {}.", ee);
+ }
+
+ /*kafkaProducer.send(record, (recordMetadata, e) -> {
+ if (e != null) {
+ LOG.error("send--The Exception occured.", e);
+ }
+ if (recordMetadata != null)
+ {
+ LOG.info("sent to partition(" + recordMetadata.partition() + "), "
+ + "offset(" + recordMetadata.offset()+"),topic="+recordMetadata.topic());
+ }
+ });*/
+
+ }
+
+
+
+
+}
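Note: NewProducer above sends synchronously (send(...).get()) and keeps the callback-based variant commented out. Both styles side by side, as a sketch — producer, payload and log are assumed to be in scope, not code from this repository:

ProducerRecord<String, String> record = new ProducerRecord<>("jysb_dwxx", payload);
// 1) synchronous: block until the broker acks, so failures surface immediately
RecordMetadata meta = producer.send(record).get();
// 2) asynchronous: higher throughput; errors are delivered to the callback
producer.send(record, (metadata, e) -> {
    if (e != null) {
        log.error("send failed", e);
    }
});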
diff --git a/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/producer/Producer.java b/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/producer/Producer.java
index 1ca6debd..ab1a51f3 100644
--- a/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/producer/Producer.java
+++ b/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/producer/Producer.java
@@ -1,54 +1,215 @@
package org.dromara.data2kafka.producer;
import com.alibaba.fastjson.JSONObject;
+import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.dromara.data2kafka.config.KafkaProperties;
+import org.dromara.data2kafka.config.LoginUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-import javax.annotation.Resource;
+import java.io.IOException;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
/**
- * description:
+ * description: singleton Kafka producer with Kerberos (SASL) security support
*
* @author chenle
- * @date 2021-11-01 17:20
+ * @date 2021-11-03 14:15
*/
@Component
public class Producer {
- @Autowired
- @Resource(name = "myKafkaProducer")
- KafkaProducer kafkaProducer;
+ private static final Logger logger = LoggerFactory.getLogger(Producer.class);
+
+ private final KafkaProducer<String, String> producer;
+
+ // 私有静态实例(volatile 保证可见性和有序性)
+ private static volatile Producer instance;
+
+ // 是否异步发送
+ private final boolean isAsync = true;
+
+ // Broker地址列表
+ private final static String BOOTSTRAP_SERVER = "bootstrap.servers";
+
+ // 客户端ID
+ private final static String CLIENT_ID = "client.id";
+
+ // Key序列化类
+ private final static String KEY_SERIALIZER = "key.serializer";
+
+ // Value序列化类
+ private final static String VALUE_SERIALIZER = "value.serializer";
+
+ // 协议类型:当前支持配置为SASL_PLAINTEXT或者PLAINTEXT
+ private final static String SECURITY_PROTOCOL = "security.protocol";
+
+ // 服务名
+ private final static String SASL_KERBEROS_SERVICE_NAME = "sasl.kerberos.service.name";
+
+ // 域名
+ private final static String KERBEROS_DOMAIN_NAME = "kerberos.domain.name";
+
+ // 分区类名
+ private final static String PARTITIONER_NAME = "partitioner.class";
+
+ // 默认发送100条消息
+ private final static int MESSAGE_NUM = 100;
+
+ /**
+ * 用户自己申请的机机账号keytab文件名称
+ */
+ private static final String USER_KEYTAB_FILE = "user.keytab";
+
+ /**
+ * 用户自己申请的机机账号名称
+ */
+ private static final String USER_PRINCIPAL = "yhy_ahrs_rcw";
- private Logger LOG = LoggerFactory.getLogger(Producer.class);
/**
- * 生产者线程执行函数,循环发送消息。
+ * Producer constructor
+ *
*/
- public void send(Object obj,String topic) {
- String obj2String = JSONObject.toJSONString(obj);
+ public Producer() {
+ initSecurity();
+ Properties props = initProperties();
+ this.producer = new KafkaProducer<>(props);
+ }
- // 构造消息记录
- ProducerRecord record = new ProducerRecord(topic, obj2String);
-
- kafkaProducer.send(record, (recordMetadata, e) -> {
- if (e != null) {
- LOG.error("send--The Exception occured.", e);
+ // 获取单例实例的公共方法(双重校验锁)
+ public static Producer getInstance() {
+ if (instance == null) {
+ synchronized (Producer.class) {
+ if (instance == null) {
+ instance = new Producer();
+ }
+ }
- if (recordMetadata != null)
- {
- LOG.info("sent to partition(" + recordMetadata.partition() + "), "
- + "offset(" + recordMetadata.offset()+"),topic="+recordMetadata.topic());
- }
- });
+ }
+ return instance;
+ }
+ // 添加 ShutdownHook 确保资源释放(推荐)
+ static {
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ if (instance != null && instance.producer != null) {
+ instance.producer.close();
+ }
+ }));
+ }
+
+
+ /**
+ * 初始化安全认证
+ */
+ public void initSecurity() {
+ if (LoginUtil.isSecurityModel())
+ {
+ try {
+ logger.info("Securitymode start.");
+
+ // !!注意,安全认证时,需要用户手动修改为自己申请的机机账号
+ LoginUtil.securityPrepare(USER_PRINCIPAL, USER_KEYTAB_FILE);
+ logger.info("Security prepare success.");
+ } catch (IOException e) {
+ logger.error("Security prepare failure.", e);
+ }
+ }
+ }
+
+ public static Properties initProperties() {
+ Properties props = new Properties();
+ KafkaProperties kafkaProc = KafkaProperties.getInstance();
+
+ // Broker地址列表
+ props.put(BOOTSTRAP_SERVER, kafkaProc.getValues(BOOTSTRAP_SERVER, "localhost:21007"));
+ // 客户端ID
+ props.put(CLIENT_ID, kafkaProc.getValues(CLIENT_ID, "DemoProducer"));
+ // Key序列化类
+ props.put(KEY_SERIALIZER,
+ kafkaProc.getValues(KEY_SERIALIZER, "org.apache.kafka.common.serialization.StringSerializer"));
+ // Value序列化类
+ props.put(VALUE_SERIALIZER,
+ kafkaProc.getValues(VALUE_SERIALIZER, "org.apache.kafka.common.serialization.StringSerializer"));
+ // 协议类型:当前支持配置为SASL_PLAINTEXT或者PLAINTEXT
+ props.put(SECURITY_PROTOCOL, kafkaProc.getValues(SECURITY_PROTOCOL, "SASL_PLAINTEXT"));
+ // 服务名
+ props.put(SASL_KERBEROS_SERVICE_NAME, "kafka");
+ // 域名
+ props.put(KERBEROS_DOMAIN_NAME, kafkaProc.getValues(KERBEROS_DOMAIN_NAME, "hadoop.hadoop.com"));
+ // 分区类名
+// props.put(PARTITIONER_NAME, kafkaProc.getValues(PARTITIONER_NAME, "com.huawei.bigdata.kafka.example.SimplePartitioner"));
+ return props;
+ }
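+
+ // Note (sketch, assumed behavior): KafkaProperties.getValues(key, defaultValue) is
+ // expected to resolve the key from the bundled *.properties files and fall back to
+ // defaultValue when the key is missing.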
+
+ /**
+ * 发送消息(核心方法)
+ *
+ * @param topic 目标 topic
+ * @param message 消息内容
+ * @return 同步发送时返回 RecordMetadata,异步发送返回 null
+ */
+ public RecordMetadata sendMessage(String topic, String message) {
+ try {
+ logger.info("调用发送:topic={}, message={}", topic, message);
+ long startTime = System.currentTimeMillis();
+ ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);
+ if (isAsync) {
+ // 异步发送,结果由 DemoCallBack 回调记录
+ producer.send(record, new DemoCallBack(startTime, topic, message));
+ return null;
+ } else {
+ // 同步发送,get() 阻塞直到 broker 确认
+ Future<RecordMetadata> future = producer.send(record);
+ RecordMetadata metadata = future.get();
+ logger.info("同步发送成功: topic={}, offset={}", metadata.topic(), metadata.offset());
+ return metadata;
+ }
+ } catch (Exception e) {
+ logger.error("消息发送失败: topic={}", topic, e);
+ }
+ return null;
+ }
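+
+ // Usage sketch (illustrative; the topic name and payload are assumptions):
+ //   RecordMetadata meta = Producer.getInstance().sendMessage("demo-topic", jsonPayload);
+ // With isAsync = true the call returns null immediately and DemoCallBack logs the result.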
+ /**
+ * 内部回调类
+ */
+ private static class DemoCallBack implements Callback {
+ private final Logger logger = LoggerFactory.getLogger(DemoCallBack.class);
+ private final long startTime;
+
+ private final String topic;
+ private final String message;
+
+ public DemoCallBack(long startTime, String topic, String message) {
+ this.startTime = startTime;
+ this.topic = topic;
+ this.message = message;
+ }
+
+ @Override
+ public void onCompletion(RecordMetadata metadata, Exception exception) {
+ long elapsedTime = System.currentTimeMillis() - startTime;
+ if (metadata != null) {
+ logger.info("topic=({}, {}) sent to partition({}), offset({}) in {} ms",
+ topic, message, metadata.partition(), metadata.offset(), elapsedTime);
+ } else if (exception != null) {
+ logger.error("Message sending failed", exception);
+ }
+ }
+ }
}
diff --git a/stwzhj-modules/stwzhj-data2StKafka/src/main/resources/client.properties b/stwzhj-modules/stwzhj-data2StKafka/src/main/resources/client.properties
new file mode 100644
index 00000000..4fb412f2
--- /dev/null
+++ b/stwzhj-modules/stwzhj-data2StKafka/src/main/resources/client.properties
@@ -0,0 +1,6 @@
+security.protocol = SASL_PLAINTEXT
+kerberos.domain.name = hadoop.a528c942_01a6_1bef_7a75_0187dc82c40f.com
+kafka.client.zookeeper.principal = zookeeper/hadoop.a528c942_01a6_1bef_7a75_0187dc82c40f.com
+bootstrap.servers = 53.1.213.27:21007,53.1.213.26:21007,53.1.213.25:21007
+zookeeper.ssl.enable = false
+sasl.kerberos.service.name = kafka
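+
+# Note (sketch): Kerberos (SASL_PLAINTEXT) client defaults for the cluster; the keys
+# mirror those read in Producer.initProperties() via KafkaProperties.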
diff --git a/stwzhj-modules/stwzhj-data2StKafka/src/main/resources/config/client.properties b/stwzhj-modules/stwzhj-data2StKafka/src/main/resources/config/client.properties
new file mode 100644
index 00000000..4fb412f2
--- /dev/null
+++ b/stwzhj-modules/stwzhj-data2StKafka/src/main/resources/config/client.properties
@@ -0,0 +1,6 @@
+security.protocol = SASL_PLAINTEXT
+kerberos.domain.name = hadoop.a528c942_01a6_1bef_7a75_0187dc82c40f.com
+kafka.client.zookeeper.principal = zookeeper/hadoop.a528c942_01a6_1bef_7a75_0187dc82c40f.com
+bootstrap.servers = 53.1.213.27:21007,53.1.213.26:21007,53.1.213.25:21007
+zookeeper.ssl.enable = false
+sasl.kerberos.service.name = kafka
diff --git a/stwzhj-modules/stwzhj-data2StKafka/src/main/resources/config/connect-distributed.properties b/stwzhj-modules/stwzhj-data2StKafka/src/main/resources/config/connect-distributed.properties
new file mode 100644
index 00000000..66ba83d9
--- /dev/null
+++ b/stwzhj-modules/stwzhj-data2StKafka/src/main/resources/config/connect-distributed.properties
@@ -0,0 +1,21 @@
+config.storage.topic = connect-configs
+group.id = connect-cluster
+status.storage.topic = connect-status
+bootstrap.servers = 53.1.213.27:21007,53.1.213.26:21007,53.1.213.25:21007
+internal.key.converter.schemas.enable = false
+sasl.kerberos.service.name = kafka
+rest.port = 21010
+config.storage.replication.factor = 3
+offset.flush.interval.ms = 10000
+security.protocol = SASL_PLAINTEXT
+key.converter.schemas.enable = false
+internal.key.converter = org.apache.kafka.connect.storage.StringConverter
+kerberos.domain.name = hadoop.a528c942_01a6_1bef_7a75_0187dc82c40f.com
+status.storage.replication.factor = 3
+internal.value.converter.schemas.enable = false
+value.converter.schemas.enable = false
+internal.value.converter = org.apache.kafka.connect.storage.StringConverter
+offset.storage.replication.factor = 3
+offset.storage.topic = connect-offsets
+value.converter = org.apache.kafka.connect.storage.StringConverter
+key.converter = org.apache.kafka.connect.storage.StringConverter
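+
+# Note: the internal.*.converter settings were deprecated in Kafka 2.0 (KIP-174) and
+# recent Connect runtimes ignore them; they are kept here as shipped.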
diff --git a/stwzhj-modules/stwzhj-data2StKafka/src/main/resources/config/connect-standalone.properties b/stwzhj-modules/stwzhj-data2StKafka/src/main/resources/config/connect-standalone.properties
new file mode 100644
index 00000000..c2bbd1bf
--- /dev/null
+++ b/stwzhj-modules/stwzhj-data2StKafka/src/main/resources/config/connect-standalone.properties
@@ -0,0 +1,20 @@
+consumer.sasl.kerberos.service.name = kafka
+producer.security.protocol = SASL_PLAINTEXT
+standalone1.key.converter.schemas.enable = false
+bootstrap.servers = 53.1.213.27:21007,53.1.213.26:21007,53.1.213.25:21007
+internal.key.converter.schemas.enable = false
+sasl.kerberos.service.name = kafka
+offset.flush.interval.ms = 10000
+security.protocol = SASL_PLAINTEXT
+internal.key.converter = org.apache.kafka.connect.storage.StringConverter
+kerberos.domain.name = hadoop.a528c942_01a6_1bef_7a75_0187dc82c40f.com
+offset.storage.file.filename = /tmp/connect.offsets
+producer.kerberos.domain.name = hadoop.a528c942_01a6_1bef_7a75_0187dc82c40f.com
+internal.value.converter.schemas.enable = false
+internal.value.converter = org.apache.kafka.connect.storage.StringConverter
+value.converter.schemas.enable = false
+consumer.security.protocol = SASL_PLAINTEXT
+value.converter = org.apache.kafka.connect.storage.StringConverter
+key.converter = org.apache.kafka.connect.storage.StringConverter
+producer.sasl.kerberos.service.name = kafka
+consumer.kerberos.domain.name = hadoop.a528c942_01a6_1bef_7a75_0187dc82c40f.com
diff --git a/stwzhj-modules/stwzhj-data2StKafka/src/main/resources/config/consumer.properties b/stwzhj-modules/stwzhj-data2StKafka/src/main/resources/config/consumer.properties
new file mode 100644
index 00000000..752fcf76
--- /dev/null
+++ b/stwzhj-modules/stwzhj-data2StKafka/src/main/resources/config/consumer.properties
@@ -0,0 +1,5 @@
+security.protocol = SASL_PLAINTEXT
+kerberos.domain.name = hadoop.a528c942_01a6_1bef_7a75_0187dc82c40f.com
+group.id = example-group1
+auto.commit.interval.ms = 60000
+sasl.kerberos.service.name = kafka
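+
+# auto.commit.interval.ms only takes effect while enable.auto.commit=true (the client default).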
diff --git a/stwzhj-modules/stwzhj-data2StKafka/src/main/resources/config/ip.properties b/stwzhj-modules/stwzhj-data2StKafka/src/main/resources/config/ip.properties
new file mode 100644
index 00000000..66450ad5
--- /dev/null
+++ b/stwzhj-modules/stwzhj-data2StKafka/src/main/resources/config/ip.properties
@@ -0,0 +1 @@
+cluster.ip.model = IPV4
diff --git a/stwzhj-modules/stwzhj-data2StKafka/src/main/resources/config/kafkaSecurityMode b/stwzhj-modules/stwzhj-data2StKafka/src/main/resources/config/kafkaSecurityMode
new file mode 100644
index 00000000..ed59a5e4
--- /dev/null
+++ b/stwzhj-modules/stwzhj-data2StKafka/src/main/resources/config/kafkaSecurityMode
@@ -0,0 +1 @@
+kafka.client.security.mode = yes
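+
+# Assumed to be the flag LoginUtil.isSecurityModel() checks; "yes" enables the Kerberos login flow.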
diff --git a/stwzhj-modules/stwzhj-data2StKafka/src/main/resources/config/krb5.conf b/stwzhj-modules/stwzhj-data2StKafka/src/main/resources/config/krb5.conf
new file mode 100644
index 00000000..3382d176
--- /dev/null
+++ b/stwzhj-modules/stwzhj-data2StKafka/src/main/resources/config/krb5.conf
@@ -0,0 +1,49 @@
+[kdcdefaults]
+kdc_ports = 53.1.213.23:21732
+kdc_tcp_ports = 53.1.213.23:21732
+
+[libdefaults]
+default_realm = A528C942_01A6_1BEF_7A75_0187DC82C40F.COM
+kdc_timeout = 2500
+clockskew = 300
+use_dns_lookup = 0
+udp_preference_limit = 1465
+max_retries = 5
+dns_lookup_kdc = false
+dns_lookup_realm = false
+renewable = false
+forwardable = false
+renew_lifetime = 0m
+max_renewable_life = 30m
+allow_extend_version = false
+default_ccache_name = FILE:/tmp//krb5cc_%{uid}
+
+[realms]
+A528C942_01A6_1BEF_7A75_0187DC82C40F.COM = {
+kdc = 53.1.213.23:21732
+kdc = 53.1.213.22:21732
+admin_server = 53.1.213.22:21730
+admin_server = 53.1.213.23:21730
+kpasswd_server = 53.1.213.22:21731
+kpasswd_server = 53.1.213.23:21731
+supported_enctypes = aes256-cts-hmac-sha1-96:special aes128-cts-hmac-sha1-96:special
+kpasswd_port = 21731
+kadmind_port = 21730
+kadmind_listen = 53.1.213.23:21730
+kpasswd_listen = 53.1.213.23:21731
+renewable = false
+forwardable = false
+renew_lifetime = 0m
+max_renewable_life = 30m
+acl_file = /opt/huawei/Bigdata/FusionInsight_BASE_8.5.0/install/FusionInsight-kerberos-1.20/kerberos/var/krb5kdc/kadm5.acl
+dict_file = /opt/huawei/Bigdata/common/runtime0/security/weakPasswdDic/weakPasswdForKdc.ini
+key_stash_file = /opt/huawei/Bigdata/FusionInsight_BASE_8.5.0/install/FusionInsight-kerberos-1.20/kerberos/var/krb5kdc/.k5.A528C942_01A6_1BEF_7A75_0187DC82C40F.COM
+}
+
+[domain_realm]
+.a528c942_01a6_1bef_7a75_0187dc82c40f.com = A528C942_01A6_1BEF_7A75_0187DC82C40F.COM
+
+[logging]
+kdc = SYSLOG:INFO:DAEMON
+admin_server = SYSLOG:INFO:DAEMON
+default = SYSLOG:NOTICE:DAEMON
diff --git a/stwzhj-modules/stwzhj-data2StKafka/src/main/resources/config/producer.properties b/stwzhj-modules/stwzhj-data2StKafka/src/main/resources/config/producer.properties
new file mode 100644
index 00000000..78309a1d
--- /dev/null
+++ b/stwzhj-modules/stwzhj-data2StKafka/src/main/resources/config/producer.properties
@@ -0,0 +1,5 @@
+security.protocol = SASL_PLAINTEXT
+kerberos.domain.name = hadoop.a528c942_01a6_1bef_7a75_0187dc82c40f.com
+acks = 1
+bootstrap.servers = 53.1.213.27:21007,53.1.213.26:21007,53.1.213.25:21007
+sasl.kerberos.service.name = kafka
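+
+# acks=1: the partition leader's write counts as success (no wait for the full ISR).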
diff --git a/stwzhj-modules/stwzhj-data2StKafka/src/main/resources/config/server.properties b/stwzhj-modules/stwzhj-data2StKafka/src/main/resources/config/server.properties
new file mode 100644
index 00000000..0ff90593
--- /dev/null
+++ b/stwzhj-modules/stwzhj-data2StKafka/src/main/resources/config/server.properties
@@ -0,0 +1,192 @@
+log.cleaner.min.compaction.lag.ms = 0
+quota.producer.default = 9223372036854775807
+metric.reporters = com.huawei.bigdata.kafka.kafkabalancer.reporter.plugin.CoreMetricReporter
+offsets.topic.num.partitions = 50
+log.flush.interval.messages = 9223372036854775807
+controller.socket.timeout.ms = 30000
+auto.create.topics.enable = true
+log.flush.interval.ms = 9223372036854775807
+actual.broker.id.ip.map =
+listener.name.sasl_plaintext.plain.sasl.server.callback.handler.class = com.huawei.kafka.plain.PlainCallBackHandler
+replica.socket.receive.buffer.bytes = 65536
+min.insync.replicas = 1
+ssl.enable = false
+replica.fetch.wait.max.ms = 500
+num.recovery.threads.per.data.dir = 10
+ssl.keystore.type = JKS
+super.users = User:kafka
+sasl.mechanism.inter.broker.protocol = GSSAPI
+default.replication.factor = 2
+log.preallocate = false
+sasl.kerberos.principal.to.local.rules = RULE:[2:$1@$0](.*@.*)s/@.*//,RULE:[1:$1@$0](.*@*.COM)s/@.*//,DEFAULT
+metrics.reporter.topic.replicas = 3
+actual.broker.id.port.map =
+fetch.purgatory.purge.interval.requests = 1000
+replica.socket.timeout.ms = 30000
+message.max.bytes = 100001200
+max.connections.per.user = 2147483647
+transactional.id.expiration.ms = 604800000
+control.plane.listener.name = TRACE
+transaction.state.log.replication.factor = 3
+num.io.threads = 8
+monitor.zk.ssl.connect = 53.1.213.24:24002,53.1.213.23:24002,53.1.213.22:24002
+offsets.commit.required.acks = -1
+log.flush.offset.checkpoint.interval.ms = 60000
+quota.window.size.seconds = 1
+delete.topic.enable = true
+ssl.truststore.type = JKS
+offsets.commit.timeout.ms = 5000
+quota.window.num = 11
+log.partition.strategy = count
+zookeeper.connect = 53.1.213.24:24002,53.1.213.23:24002,53.1.213.22:24002/kafka
+authorizer.class.name = org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer
+auto.reassign.check.interval.ms = 600000
+user.group.cache.timeout.sec = 300
+auto.reassign.enable = true
+num.replica.fetchers = 1
+alter.log.dirs.replication.quota.window.size.seconds = 1
+allow.everyone.if.no.acl.found = false
+ip.mode = IPV4
+alter.log.dirs.replication.quota.window.num = 11
+log.roll.jitter.hours = 0
+tmp.zookeeper.connect = 53.1.213.24:24002,53.1.213.23:24002,53.1.213.22:24002
+log.cleaner.enable = true
+offsets.load.buffer.size = 5242880
+log.cleaner.delete.retention.ms = 86400000
+ssl.client.auth = none
+controlled.shutdown.max.retries = 3
+queued.max.requests = 500
+metrics.reporter.max.request.size = 104857600
+offsets.topic.replication.factor = 3
+log.cleaner.threads = 1
+transaction.state.log.min.isr = 2
+sasl.kerberos.service.name = kafka
+sasl.kerberos.ticket.renew.jitter = 0.05
+socket.request.max.bytes = 104857600
+zookeeper.session.timeout.ms = 45000
+log.retention.bytes = -1
+log.message.timestamp.type = CreateTime
+request.total.time.ms.threshold = 30000
+sasl.kerberos.min.time.before.relogin = 60000
+zookeeper.set.acl = true
+connections.max.idle.ms = 600000
+offsets.retention.minutes = 10080
+delegation.token.expiry.time.ms = 86400000
+max.connections = 2147483647
+is.security.mode = yes
+transaction.state.log.num.partitions = 50
+inter.broker.protocol.version = 3.6-IV1
+replica.fetch.backoff.ms = 1000
+kafka.metrics.reporters = com.huawei.kafka.PartitionStatusReporter
+listener.security.protocol.map = PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL,TRACE:SASL_PLAINTEXT
+log.retention.hours = 168
+num.partitions = 2
+listeners = SASL_PLAINTEXT://53.1.213.25:21007,PLAINTEXT://53.1.213.25:21005,SSL://53.1.213.25:21008,SASL_SSL://53.1.213.25:21009,TRACE://53.1.213.25:21013
+ssl.enabled.protocols = TLSv1.2
+delete.records.purgatory.purge.interval.requests = 1
+monitor.zk.normal.connect = 53.1.213.24:24002,53.1.213.23:24002,53.1.213.22:24002
+ssl.cipher.suites = TLS_DHE_DSS_WITH_AES_128_GCM_SHA256,TLS_DHE_DSS_WITH_AES_256_GCM_SHA384,TLS_DHE_RSA_WITH_AES_128_GCM_SHA256,TLS_DHE_RSA_WITH_AES_256_GCM_SHA384,TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384
+log.flush.scheduler.interval.ms = 9223372036854775807
+sasl.port = 21007
+ssl.mode.enable = true
+security.protocol = SASL_PLAINTEXT
+log.index.size.max.bytes = 10485760
+rack.aware.enable = false
+security.inter.broker.protocol = SASL_PLAINTEXT
+replica.fetch.max.bytes = 104857600
+log.cleaner.dedupe.buffer.size = 134217728
+replica.high.watermark.checkpoint.interval.ms = 5000
+replication.quota.window.size.seconds = 1
+log.cleaner.io.buffer.size = 524288
+sasl.kerberos.ticket.renew.window.factor = 0.8
+metrics.reporter.zookeeper.url = 53.1.213.24:24002,53.1.213.23:24002,53.1.213.22:24002/kafka
+max.connections.per.user.enable = true
+bootstrap.servers = 53.1.213.27:21007,53.1.213.26:21007,53.1.213.25:21007
+metrics.reporter.sasl.kerberos.service.name = kafka
+zookeeper.connection.timeout.ms = 45000
+metrics.recording.level = INFO
+metrics.reporter.bootstrap.servers = 53.1.213.27:21009,53.1.213.26:21009,53.1.213.25:21009
+controlled.shutdown.retry.backoff.ms = 5000
+sasl-ssl.port = 21009
+advertised.broker.id.port.map =
+listener.name.sasl_ssl.plain.sasl.server.callback.handler.class = com.huawei.kafka.plain.PlainCallBackHandler
+log.roll.hours = 168
+log.cleanup.policy = delete
+log.flush.start.offset.checkpoint.interval.ms = 60000
+host.name = 53.1.213.25
+max.connections.per.user.overrides =
+max.connections.per.user.whitelist = kafka,default#principal
+transaction.state.log.segment.bytes = 104857600
+max.connections.per.ip = 2147483647
+offsets.topic.segment.bytes = 104857600
+background.threads = 10
+quota.consumer.default = 9223372036854775807
+request.timeout.ms = 30000
+log.message.format.version = 3.6-IV1
+group.initial.rebalance.delay.ms = 3000
+log.index.interval.bytes = 4096
+log.segment.bytes = 1073741824
+log.cleaner.backoff.ms = 15000
+kafka.zookeeper.root = /kafka
+offset.metadata.max.bytes = 4096
+ssl.truststore.location = #{conf_dir}/truststore.jks
+group.max.session.timeout.ms = 1800000
+replica.fetch.response.max.bytes = 104857600
+port = 21005
+zookeeper.sync.time.ms = 2000
+log.segment.delete.delay.ms = 60000
+ssl.port = 21008
+fetch.max.bytes = 115343360
+user.group.query.retry.backoff.ms = 300
+log.dirs = /srv/BigData/kafka/data1/kafka-logs,/srv/BigData/kafka/data2/kafka-logs,/srv/BigData/kafka/data3/kafka-logs,/srv/BigData/kafka/data4/kafka-logs
+monitor.keytab = /opt/huawei/Bigdata/om-agent/nodeagent/etc/agent/omm.keytab
+controlled.shutdown.enable = true
+az.aware.enable = false
+compression.type = producer
+max.connections.per.ip.overrides =
+log.message.timestamp.difference.max.ms = 9223372036854775807
+metrics.reporter.kerberos.domain.name = hadoop.a528c942_01a6_1bef_7a75_0187dc82c40f.com
+kafka.metrics.polling.interval.secs = 60
+advertised.listeners.protocol = SASL_SSL
+sasl.kerberos.kinit.cmd = /opt/huawei/Bigdata/FusionInsight_BASE_8.5.0/install/FusionInsight-kerberos-1.20/kerberos/bin/kinit
+transaction.abort.timed.out.transaction.cleanup.interval.ms = 60000
+log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
+auto.leader.rebalance.enable = true
+leader.imbalance.check.interval.seconds = 3600
+log.cleaner.min.cleanable.ratio = 0.5
+user.group.query.retry = 10
+replica.lag.time.max.ms = 60000
+max.incremental.fetch.session.cache.slots = 1000
+delegation.token.master.key = null
+num.network.threads = 6
+reserved.broker.max.id = 65535
+listener.name.external_sasl_plaintext.plain.sasl.server.callback.handler.class = com.huawei.kafka.plain.PlainCallBackHandler
+monitor.principal = oms/manager@A528C942_01A6_1BEF_7A75_0187DC82C40F.COM
+transaction.remove.expired.transaction.cleanup.interval.ms = 3600000
+socket.send.buffer.bytes = 1024000
+log.message.downconversion.enable = true
+advertised.broker.id.ip.map =
+metrics.reporter.security.protocol = SASL_SSL
+transaction.state.log.load.buffer.size = 5242880
+socket.receive.buffer.bytes = 1024000
+ssl.keystore.location = #{conf_dir}/kafka_broker.jks
+replica.fetch.min.bytes = 1
+broker.rack = /default/rack0
+controller.port = 21013
+unclean.leader.election.enable = false
+sasl.enabled.mechanisms = GSSAPI,PLAIN
+group.min.session.timeout.ms = 6000
+offsets.retention.check.interval.ms = 600000
+log.cleaner.io.buffer.load.factor = 0.9
+transaction.max.timeout.ms = 900000
+kerberos.domain.name = hadoop.a528c942_01a6_1bef_7a75_0187dc82c40f.com
+producer.purgatory.purge.interval.requests = 1000
+group.max.size = 2147483647
+broker.id = 1
+offsets.topic.compression.codec = 0
+delegation.token.max.lifetime.ms = 604800000
+replication.quota.window.num = 11
+enable.advertised.listener = false
+log.retention.check.interval.ms = 300000
+leader.imbalance.per.broker.percentage = 10
+queued.max.request.bytes = -1
diff --git a/stwzhj-modules/stwzhj-data2StKafka/src/main/resources/config/user.keytab b/stwzhj-modules/stwzhj-data2StKafka/src/main/resources/config/user.keytab
new file mode 100644
index 00000000..d2beed6b
Binary files /dev/null and b/stwzhj-modules/stwzhj-data2StKafka/src/main/resources/config/user.keytab differ
diff --git a/stwzhj-modules/stwzhj-data2StKafka/src/main/resources/connect-distributed.properties b/stwzhj-modules/stwzhj-data2StKafka/src/main/resources/connect-distributed.properties
new file mode 100644
index 00000000..66ba83d9
--- /dev/null
+++ b/stwzhj-modules/stwzhj-data2StKafka/src/main/resources/connect-distributed.properties
@@ -0,0 +1,21 @@
+config.storage.topic = connect-configs
+group.id = connect-cluster
+status.storage.topic = connect-status
+bootstrap.servers = 53.1.213.27:21007,53.1.213.26:21007,53.1.213.25:21007
+internal.key.converter.schemas.enable = false
+sasl.kerberos.service.name = kafka
+rest.port = 21010
+config.storage.replication.factor = 3
+offset.flush.interval.ms = 10000
+security.protocol = SASL_PLAINTEXT
+key.converter.schemas.enable = false
+internal.key.converter = org.apache.kafka.connect.storage.StringConverter
+kerberos.domain.name = hadoop.a528c942_01a6_1bef_7a75_0187dc82c40f.com
+status.storage.replication.factor = 3
+internal.value.converter.schemas.enable = false
+value.converter.schemas.enable = false
+internal.value.converter = org.apache.kafka.connect.storage.StringConverter
+offset.storage.replication.factor = 3
+offset.storage.topic = connect-offsets
+value.converter = org.apache.kafka.connect.storage.StringConverter
+key.converter = org.apache.kafka.connect.storage.StringConverter
diff --git a/stwzhj-modules/stwzhj-data2StKafka/src/main/resources/connect-standalone.properties b/stwzhj-modules/stwzhj-data2StKafka/src/main/resources/connect-standalone.properties
new file mode 100644
index 00000000..c2bbd1bf
--- /dev/null
+++ b/stwzhj-modules/stwzhj-data2StKafka/src/main/resources/connect-standalone.properties
@@ -0,0 +1,20 @@
+consumer.sasl.kerberos.service.name = kafka
+producer.security.protocol = SASL_PLAINTEXT
+standalone1.key.converter.schemas.enable = false
+bootstrap.servers = 53.1.213.27:21007,53.1.213.26:21007,53.1.213.25:21007
+internal.key.converter.schemas.enable = false
+sasl.kerberos.service.name = kafka
+offset.flush.interval.ms = 10000
+security.protocol = SASL_PLAINTEXT
+internal.key.converter = org.apache.kafka.connect.storage.StringConverter
+kerberos.domain.name = hadoop.a528c942_01a6_1bef_7a75_0187dc82c40f.com
+offset.storage.file.filename = /tmp/connect.offsets
+producer.kerberos.domain.name = hadoop.a528c942_01a6_1bef_7a75_0187dc82c40f.com
+internal.value.converter.schemas.enable = false
+internal.value.converter = org.apache.kafka.connect.storage.StringConverter
+value.converter.schemas.enable = false
+consumer.security.protocol = SASL_PLAINTEXT
+value.converter = org.apache.kafka.connect.storage.StringConverter
+key.converter = org.apache.kafka.connect.storage.StringConverter
+producer.sasl.kerberos.service.name = kafka
+consumer.kerberos.domain.name = hadoop.a528c942_01a6_1bef_7a75_0187dc82c40f.com
diff --git a/stwzhj-modules/stwzhj-data2StKafka/src/main/resources/consumer.properties b/stwzhj-modules/stwzhj-data2StKafka/src/main/resources/consumer.properties
new file mode 100644
index 00000000..752fcf76
--- /dev/null
+++ b/stwzhj-modules/stwzhj-data2StKafka/src/main/resources/consumer.properties
@@ -0,0 +1,5 @@
+security.protocol = SASL_PLAINTEXT
+kerberos.domain.name = hadoop.a528c942_01a6_1bef_7a75_0187dc82c40f.com
+group.id = example-group1
+auto.commit.interval.ms = 60000
+sasl.kerberos.service.name = kafka
diff --git a/stwzhj-modules/stwzhj-data2StKafka/src/main/resources/ip.properties b/stwzhj-modules/stwzhj-data2StKafka/src/main/resources/ip.properties
new file mode 100644
index 00000000..66450ad5
--- /dev/null
+++ b/stwzhj-modules/stwzhj-data2StKafka/src/main/resources/ip.properties
@@ -0,0 +1 @@
+cluster.ip.model = IPV4
diff --git a/stwzhj-modules/stwzhj-data2StKafka/src/main/resources/kafkaSecurityMode b/stwzhj-modules/stwzhj-data2StKafka/src/main/resources/kafkaSecurityMode
new file mode 100644
index 00000000..ed59a5e4
--- /dev/null
+++ b/stwzhj-modules/stwzhj-data2StKafka/src/main/resources/kafkaSecurityMode
@@ -0,0 +1 @@
+kafka.client.security.mode = yes
diff --git a/stwzhj-modules/stwzhj-data2StKafka/src/main/resources/krb5.conf b/stwzhj-modules/stwzhj-data2StKafka/src/main/resources/krb5.conf
new file mode 100644
index 00000000..3382d176
--- /dev/null
+++ b/stwzhj-modules/stwzhj-data2StKafka/src/main/resources/krb5.conf
@@ -0,0 +1,49 @@
+[kdcdefaults]
+kdc_ports = 53.1.213.23:21732
+kdc_tcp_ports = 53.1.213.23:21732
+
+[libdefaults]
+default_realm = A528C942_01A6_1BEF_7A75_0187DC82C40F.COM
+kdc_timeout = 2500
+clockskew = 300
+use_dns_lookup = 0
+udp_preference_limit = 1465
+max_retries = 5
+dns_lookup_kdc = false
+dns_lookup_realm = false
+renewable = false
+forwardable = false
+renew_lifetime = 0m
+max_renewable_life = 30m
+allow_extend_version = false
+default_ccache_name = FILE:/tmp//krb5cc_%{uid}
+
+[realms]
+A528C942_01A6_1BEF_7A75_0187DC82C40F.COM = {
+kdc = 53.1.213.23:21732
+kdc = 53.1.213.22:21732
+admin_server = 53.1.213.22:21730
+admin_server = 53.1.213.23:21730
+kpasswd_server = 53.1.213.22:21731
+kpasswd_server = 53.1.213.23:21731
+supported_enctypes = aes256-cts-hmac-sha1-96:special aes128-cts-hmac-sha1-96:special
+kpasswd_port = 21731
+kadmind_port = 21730
+kadmind_listen = 53.1.213.23:21730
+kpasswd_listen = 53.1.213.23:21731
+renewable = false
+forwardable = false
+renew_lifetime = 0m
+max_renewable_life = 30m
+acl_file = /opt/huawei/Bigdata/FusionInsight_BASE_8.5.0/install/FusionInsight-kerberos-1.20/kerberos/var/krb5kdc/kadm5.acl
+dict_file = /opt/huawei/Bigdata/common/runtime0/security/weakPasswdDic/weakPasswdForKdc.ini
+key_stash_file = /opt/huawei/Bigdata/FusionInsight_BASE_8.5.0/install/FusionInsight-kerberos-1.20/kerberos/var/krb5kdc/.k5.A528C942_01A6_1BEF_7A75_0187DC82C40F.COM
+}
+
+[domain_realm]
+.a528c942_01a6_1bef_7a75_0187dc82c40f.com = A528C942_01A6_1BEF_7A75_0187DC82C40F.COM
+
+[logging]
+kdc = SYSLOG:INFO:DAEMON
+admin_server = SYSLOG:INFO:DAEMON
+default = SYSLOG:NOTICE:DAEMON
diff --git a/stwzhj-modules/stwzhj-data2StKafka/src/main/resources/producer.properties b/stwzhj-modules/stwzhj-data2StKafka/src/main/resources/producer.properties
new file mode 100644
index 00000000..78309a1d
--- /dev/null
+++ b/stwzhj-modules/stwzhj-data2StKafka/src/main/resources/producer.properties
@@ -0,0 +1,5 @@
+security.protocol = SASL_PLAINTEXT
+kerberos.domain.name = hadoop.a528c942_01a6_1bef_7a75_0187dc82c40f.com
+acks = 1
+bootstrap.servers = 53.1.213.27:21007,53.1.213.26:21007,53.1.213.25:21007
+sasl.kerberos.service.name = kafka
diff --git a/stwzhj-modules/stwzhj-data2StKafka/src/main/resources/server.properties b/stwzhj-modules/stwzhj-data2StKafka/src/main/resources/server.properties
new file mode 100644
index 00000000..0ff90593
--- /dev/null
+++ b/stwzhj-modules/stwzhj-data2StKafka/src/main/resources/server.properties
@@ -0,0 +1,192 @@
+log.cleaner.min.compaction.lag.ms = 0
+quota.producer.default = 9223372036854775807
+metric.reporters = com.huawei.bigdata.kafka.kafkabalancer.reporter.plugin.CoreMetricReporter
+offsets.topic.num.partitions = 50
+log.flush.interval.messages = 9223372036854775807
+controller.socket.timeout.ms = 30000
+auto.create.topics.enable = true
+log.flush.interval.ms = 9223372036854775807
+actual.broker.id.ip.map =
+listener.name.sasl_plaintext.plain.sasl.server.callback.handler.class = com.huawei.kafka.plain.PlainCallBackHandler
+replica.socket.receive.buffer.bytes = 65536
+min.insync.replicas = 1
+ssl.enable = false
+replica.fetch.wait.max.ms = 500
+num.recovery.threads.per.data.dir = 10
+ssl.keystore.type = JKS
+super.users = User:kafka
+sasl.mechanism.inter.broker.protocol = GSSAPI
+default.replication.factor = 2
+log.preallocate = false
+sasl.kerberos.principal.to.local.rules = RULE:[2:$1@$0](.*@.*)s/@.*//,RULE:[1:$1@$0](.*@*.COM)s/@.*//,DEFAULT
+metrics.reporter.topic.replicas = 3
+actual.broker.id.port.map =
+fetch.purgatory.purge.interval.requests = 1000
+replica.socket.timeout.ms = 30000
+message.max.bytes = 100001200
+max.connections.per.user = 2147483647
+transactional.id.expiration.ms = 604800000
+control.plane.listener.name = TRACE
+transaction.state.log.replication.factor = 3
+num.io.threads = 8
+monitor.zk.ssl.connect = 53.1.213.24:24002,53.1.213.23:24002,53.1.213.22:24002
+offsets.commit.required.acks = -1
+log.flush.offset.checkpoint.interval.ms = 60000
+quota.window.size.seconds = 1
+delete.topic.enable = true
+ssl.truststore.type = JKS
+offsets.commit.timeout.ms = 5000
+quota.window.num = 11
+log.partition.strategy = count
+zookeeper.connect = 53.1.213.24:24002,53.1.213.23:24002,53.1.213.22:24002/kafka
+authorizer.class.name = org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer
+auto.reassign.check.interval.ms = 600000
+user.group.cache.timeout.sec = 300
+auto.reassign.enable = true
+num.replica.fetchers = 1
+alter.log.dirs.replication.quota.window.size.seconds = 1
+allow.everyone.if.no.acl.found = false
+ip.mode = IPV4
+alter.log.dirs.replication.quota.window.num = 11
+log.roll.jitter.hours = 0
+tmp.zookeeper.connect = 53.1.213.24:24002,53.1.213.23:24002,53.1.213.22:24002
+log.cleaner.enable = true
+offsets.load.buffer.size = 5242880
+log.cleaner.delete.retention.ms = 86400000
+ssl.client.auth = none
+controlled.shutdown.max.retries = 3
+queued.max.requests = 500
+metrics.reporter.max.request.size = 104857600
+offsets.topic.replication.factor = 3
+log.cleaner.threads = 1
+transaction.state.log.min.isr = 2
+sasl.kerberos.service.name = kafka
+sasl.kerberos.ticket.renew.jitter = 0.05
+socket.request.max.bytes = 104857600
+zookeeper.session.timeout.ms = 45000
+log.retention.bytes = -1
+log.message.timestamp.type = CreateTime
+request.total.time.ms.threshold = 30000
+sasl.kerberos.min.time.before.relogin = 60000
+zookeeper.set.acl = true
+connections.max.idle.ms = 600000
+offsets.retention.minutes = 10080
+delegation.token.expiry.time.ms = 86400000
+max.connections = 2147483647
+is.security.mode = yes
+transaction.state.log.num.partitions = 50
+inter.broker.protocol.version = 3.6-IV1
+replica.fetch.backoff.ms = 1000
+kafka.metrics.reporters = com.huawei.kafka.PartitionStatusReporter
+listener.security.protocol.map = PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL,TRACE:SASL_PLAINTEXT
+log.retention.hours = 168
+num.partitions = 2
+listeners = SASL_PLAINTEXT://53.1.213.25:21007,PLAINTEXT://53.1.213.25:21005,SSL://53.1.213.25:21008,SASL_SSL://53.1.213.25:21009,TRACE://53.1.213.25:21013
+ssl.enabled.protocols = TLSv1.2
+delete.records.purgatory.purge.interval.requests = 1
+monitor.zk.normal.connect = 53.1.213.24:24002,53.1.213.23:24002,53.1.213.22:24002
+ssl.cipher.suites = TLS_DHE_DSS_WITH_AES_128_GCM_SHA256,TLS_DHE_DSS_WITH_AES_256_GCM_SHA384,TLS_DHE_RSA_WITH_AES_128_GCM_SHA256,TLS_DHE_RSA_WITH_AES_256_GCM_SHA384,TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384
+log.flush.scheduler.interval.ms = 9223372036854775807
+sasl.port = 21007
+ssl.mode.enable = true
+security.protocol = SASL_PLAINTEXT
+log.index.size.max.bytes = 10485760
+rack.aware.enable = false
+security.inter.broker.protocol = SASL_PLAINTEXT
+replica.fetch.max.bytes = 104857600
+log.cleaner.dedupe.buffer.size = 134217728
+replica.high.watermark.checkpoint.interval.ms = 5000
+replication.quota.window.size.seconds = 1
+log.cleaner.io.buffer.size = 524288
+sasl.kerberos.ticket.renew.window.factor = 0.8
+metrics.reporter.zookeeper.url = 53.1.213.24:24002,53.1.213.23:24002,53.1.213.22:24002/kafka
+max.connections.per.user.enable = true
+bootstrap.servers = 53.1.213.27:21007,53.1.213.26:21007,53.1.213.25:21007
+metrics.reporter.sasl.kerberos.service.name = kafka
+zookeeper.connection.timeout.ms = 45000
+metrics.recording.level = INFO
+metrics.reporter.bootstrap.servers = 53.1.213.27:21009,53.1.213.26:21009,53.1.213.25:21009
+controlled.shutdown.retry.backoff.ms = 5000
+sasl-ssl.port = 21009
+advertised.broker.id.port.map =
+listener.name.sasl_ssl.plain.sasl.server.callback.handler.class = com.huawei.kafka.plain.PlainCallBackHandler
+log.roll.hours = 168
+log.cleanup.policy = delete
+log.flush.start.offset.checkpoint.interval.ms = 60000
+host.name = 53.1.213.25
+max.connections.per.user.overrides =
+max.connections.per.user.whitelist = kafka,default#principal
+transaction.state.log.segment.bytes = 104857600
+max.connections.per.ip = 2147483647
+offsets.topic.segment.bytes = 104857600
+background.threads = 10
+quota.consumer.default = 9223372036854775807
+request.timeout.ms = 30000
+log.message.format.version = 3.6-IV1
+group.initial.rebalance.delay.ms = 3000
+log.index.interval.bytes = 4096
+log.segment.bytes = 1073741824
+log.cleaner.backoff.ms = 15000
+kafka.zookeeper.root = /kafka
+offset.metadata.max.bytes = 4096
+ssl.truststore.location = #{conf_dir}/truststore.jks
+group.max.session.timeout.ms = 1800000
+replica.fetch.response.max.bytes = 104857600
+port = 21005
+zookeeper.sync.time.ms = 2000
+log.segment.delete.delay.ms = 60000
+ssl.port = 21008
+fetch.max.bytes = 115343360
+user.group.query.retry.backoff.ms = 300
+log.dirs = /srv/BigData/kafka/data1/kafka-logs,/srv/BigData/kafka/data2/kafka-logs,/srv/BigData/kafka/data3/kafka-logs,/srv/BigData/kafka/data4/kafka-logs
+monitor.keytab = /opt/huawei/Bigdata/om-agent/nodeagent/etc/agent/omm.keytab
+controlled.shutdown.enable = true
+az.aware.enable = false
+compression.type = producer
+max.connections.per.ip.overrides =
+log.message.timestamp.difference.max.ms = 9223372036854775807
+metrics.reporter.kerberos.domain.name = hadoop.a528c942_01a6_1bef_7a75_0187dc82c40f.com
+kafka.metrics.polling.interval.secs = 60
+advertised.listeners.protocol = SASL_SSL
+sasl.kerberos.kinit.cmd = /opt/huawei/Bigdata/FusionInsight_BASE_8.5.0/install/FusionInsight-kerberos-1.20/kerberos/bin/kinit
+transaction.abort.timed.out.transaction.cleanup.interval.ms = 60000
+log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
+auto.leader.rebalance.enable = true
+leader.imbalance.check.interval.seconds = 3600
+log.cleaner.min.cleanable.ratio = 0.5
+user.group.query.retry = 10
+replica.lag.time.max.ms = 60000
+max.incremental.fetch.session.cache.slots = 1000
+delegation.token.master.key = null
+num.network.threads = 6
+reserved.broker.max.id = 65535
+listener.name.external_sasl_plaintext.plain.sasl.server.callback.handler.class = com.huawei.kafka.plain.PlainCallBackHandler
+monitor.principal = oms/manager@A528C942_01A6_1BEF_7A75_0187DC82C40F.COM
+transaction.remove.expired.transaction.cleanup.interval.ms = 3600000
+socket.send.buffer.bytes = 1024000
+log.message.downconversion.enable = true
+advertised.broker.id.ip.map =
+metrics.reporter.security.protocol = SASL_SSL
+transaction.state.log.load.buffer.size = 5242880
+socket.receive.buffer.bytes = 1024000
+ssl.keystore.location = #{conf_dir}/kafka_broker.jks
+replica.fetch.min.bytes = 1
+broker.rack = /default/rack0
+controller.port = 21013
+unclean.leader.election.enable = false
+sasl.enabled.mechanisms = GSSAPI,PLAIN
+group.min.session.timeout.ms = 6000
+offsets.retention.check.interval.ms = 600000
+log.cleaner.io.buffer.load.factor = 0.9
+transaction.max.timeout.ms = 900000
+kerberos.domain.name = hadoop.a528c942_01a6_1bef_7a75_0187dc82c40f.com
+producer.purgatory.purge.interval.requests = 1000
+group.max.size = 2147483647
+broker.id = 1
+offsets.topic.compression.codec = 0
+delegation.token.max.lifetime.ms = 604800000
+replication.quota.window.num = 11
+enable.advertised.listener = false
+log.retention.check.interval.ms = 300000
+leader.imbalance.per.broker.percentage = 10
+queued.max.request.bytes = -1
diff --git a/stwzhj-modules/stwzhj-data2StKafka/src/main/resources/user.keytab b/stwzhj-modules/stwzhj-data2StKafka/src/main/resources/user.keytab
new file mode 100644
index 00000000..d2beed6b
Binary files /dev/null and b/stwzhj-modules/stwzhj-data2StKafka/src/main/resources/user.keytab differ
diff --git a/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/config/KafkaConfig.java b/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/config/KafkaConfig.java
index 5f529e51..331a90de 100644
--- a/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/config/KafkaConfig.java
+++ b/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/config/KafkaConfig.java
@@ -2,6 +2,7 @@ package org.dromara.data2es.config;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.common.config.SslConfigs;
import org.dromara.data2es.producer.NewProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -23,7 +24,7 @@ public class KafkaConfig {
private Logger logger = LoggerFactory.getLogger(KafkaConfig.class);
- private String kafkaServers = "53.1.212.25:21007,53.1.212.26:21007,53.1.212.27:21007"; //省厅 kafka
+ private String kafkaServers = "53.1.212.25:21009,53.1.212.26:21009,53.1.212.27:21009"; //省厅 kafka
// private String kafkaServers = "53.208.61.105:6667,53.208.61.106:6667,53.208.61.107:6667";//六安GA网
// private String kafkaServers = "34.72.62.93:9092";//六安视频网
// private String kafkaServers = "127.0.0.1:9092";//本地
@@ -69,6 +70,10 @@ public class KafkaConfig {
*/
private static final String USER_PRINCIPAL = "yhy_ahrs_rcw@A528C942_01A6_1BEF_7A75_0187DC82C40F.COM";
+ private static final String USER_NAME = "yhy_ahrs_rcw";
+
+ private static final String PASS_WORD = "Ycgis@2509";
+
/**
* 新Producer 构造函数
* @param
@@ -85,15 +90,23 @@ public class KafkaConfig {
{
logger.info("Securitymode start.");
//!!注意,安全认证时,需要用户手动修改为自己申请的机机账号
- LoginUtil.securityPrepare(USER_PRINCIPAL, USER_KEYTAB_FILE);
- props.put(securityProtocol, "SASL_PLAINTEXT");
-// props.put("sasl.mechanism", "GSSAPI");
- // 服务名
- props.put(saslKerberosServiceName, "kafka");
- // 域名
- props.put(kerberosDomainName, "A528C942_01A6_1BEF_7A75_0187DC82C40F.COM");
+// LoginUtil.securityPrepare(USER_PRINCIPAL, USER_KEYTAB_FILE);
+ props.put(securityProtocol, "SASL_SSL");
+ props.put("sasl.mechanism", "PLAIN"); // 使用 PLAIN 机制
+
+ // SSL 配置 - 指定集群的 truststore
+ props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/home/kafka.truststore.jks");
+ props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "Ycgis@2509");
+ props.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "JKS");
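+
+ // The truststore can be inspected with, e.g. (shell, illustrative):
+ //   keytool -list -keystore /home/kafka.truststore.jks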
+
+ // PLAIN 机制的 JAAS 配置
+ String jaasConfig = "org.apache.kafka.common.security.plain.PlainLoginModule required "
+ + "username=\"" + USER_NAME + "\" "
+ + "password=\"" + PASS_WORD + "\";";
+
+ props.put("sasl.jaas.config", jaasConfig);
}
- catch (IOException e)
+ catch (Exception e)
{
logger.error("Security prepare failure.");
logger.error("The IOException occured.", e);