diff --git a/stwzhj-modules/pom.xml b/stwzhj-modules/pom.xml
index 21b478f5..7a0d1ffb 100644
--- a/stwzhj-modules/pom.xml
+++ b/stwzhj-modules/pom.xml
@@ -19,6 +19,8 @@
stwzhj-data2StKafka
stwzhj-extract
stwzhj-data2gas
+ stwzhj-kafka-consumer
+ stwzhj-kafka-producer
stwzhj-modules
diff --git a/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/consumer/RealConsumer.java b/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/consumer/RealConsumer.java
index ccaeca55..5818d1be 100644
--- a/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/consumer/RealConsumer.java
+++ b/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/consumer/RealConsumer.java
@@ -96,12 +96,12 @@ public class RealConsumer implements CommandLineRunner {
}
logger.info("Security prepare success.");
}
-// kafkaProp.put("socket.connection.setup.timeout.ms", "60000");
-//
-// kafkaProp.put("security.protocol", "SASL_PLAINTEXT");
-// kafkaProp.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"rsoft\" password=\"rsoft-2026\";");
-// kafkaProp.put("sasl.mechanism", "SCRAM-SHA-256");
-// kafkaProp.put("metadata.max.age.ms", Long.MAX_VALUE); // 彻底禁用元数据更新
+ kafkaProp.put("socket.connection.setup.timeout.ms", "60000");
+
+ kafkaProp.put("security.protocol", "SASL_PLAINTEXT");
+ kafkaProp.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"rsoft\" password=\"rsoft-2026\";");
+ kafkaProp.put("sasl.mechanism", "SCRAM-SHA-256");
+ kafkaProp.put("metadata.max.age.ms", Long.MAX_VALUE); // 彻底禁用元数据更新
KafkaConsumerRunnable runnable = new KafkaConsumerRunnable(kafkaProp,dtpExecutor2,cityCode);
executorService.execute(runnable);
}
diff --git a/stwzhj-modules/stwzhj-kafka-consumer/README.md b/stwzhj-modules/stwzhj-kafka-consumer/README.md
new file mode 100644
index 00000000..2b08f9ea
--- /dev/null
+++ b/stwzhj-modules/stwzhj-kafka-consumer/README.md
@@ -0,0 +1,50 @@
+# stwzhj-kafka-consumer Service Notes
+
+## Overview
+
+Consumes messages from SCRAM-authenticated Kafka and forwards them to the producer service in HTTP batches.
+
+## Launch Command Format
+
+```bash
+java -jar stwzhj-kafka-consumer.jar <port> <cityName> <bootstrapServers> <topics> <groupId> <cityCode>
+```
+
+### Arguments
+
+- `port`: service port
+- `cityName`: city name
+- `bootstrapServers`: Kafka broker addresses, comma-separated
+- `topics`: topics to subscribe to, comma-separated
+- `groupId`: consumer group ID
+- `cityCode`: city code
+
+### Environment Variables
+
+- `KAFKA_USERNAME`: Kafka username (required)
+- `KAFKA_PASSWORD`: Kafka password (required)
+
+## Launch Example
+
+```bash
+# Set the environment variables
+export KAFKA_USERNAME=your_username
+export KAFKA_PASSWORD=your_password
+
+# Start the service
+java -jar stwzhj-kafka-consumer.jar 11000 huangshan 192.168.10.81:9092,192.168.10.82:9092,192.168.10.83:9092 topic.send.2,topic.send.3,topic.send.4,topic.send.5 hs_gp 3410
+```
+
+## High-Throughput Optimizations
+
+1. **Batched sending**: messages are packed into batches before sending, cutting the number of HTTP requests (see the sketch below)
+2. **Connection pooling**: an Apache HttpClient connection pool reuses TCP connections
+3. **Compressed transfer**: Gzip compression is supported to reduce the amount of data on the wire
+4. **Multi-threaded processing**: three threads drain the message queue
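+
+The batching logic in `BatchMessageProcessor` comes down to a size-or-timeout flush loop: collect until the batch is full or the timeout has elapsed, then hand the batch to the HTTP sender. A minimal, self-contained sketch of that loop (the 100-message batch size and 100 ms timeout mirror the defaults in `HttpClientConfig`; the `send` callback stands in for the real HTTP call):
+
+```java
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+public class BatchFlushLoop {
+    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>(500);
+    private final int batchSize = 100;       // http.client.batch-size
+    private final long batchTimeoutMs = 100; // http.client.batch-timeout
+
+    /** Called by the Kafka poll loop. */
+    public boolean offer(String message) {
+        return queue.offer(message);
+    }
+
+    /** Drains the queue, flushing when the batch is full or the timeout elapses. */
+    public void run(Consumer<List<String>> send) throws InterruptedException {
+        List<String> batch = new ArrayList<>(batchSize);
+        long lastSend = System.currentTimeMillis();
+        while (!Thread.currentThread().isInterrupted()) {
+            String msg = queue.poll(100, TimeUnit.MILLISECONDS); // brief wait for the next message
+            if (msg != null) {
+                batch.add(msg);
+            }
+            long now = System.currentTimeMillis();
+            if (batch.size() >= batchSize || (!batch.isEmpty() && now - lastSend >= batchTimeoutMs)) {
+                send.accept(new ArrayList<>(batch)); // hand a copy of the batch to the HTTP sender
+                batch.clear();
+                lastSend = now;
+            }
+        }
+    }
+}
+```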
+
+## Notes
+
+1. Make sure the producer service is up and running first
+2. Tune the batch size and batch timeout to the actual load
+3. Monitor the queue capacity to avoid dropping messages
+4. The Kafka password must be supplied to the launch script via an environment variable or a configuration file
diff --git a/stwzhj-modules/stwzhj-kafka-consumer/pom.xml b/stwzhj-modules/stwzhj-kafka-consumer/pom.xml
new file mode 100644
index 00000000..2a4db48d
--- /dev/null
+++ b/stwzhj-modules/stwzhj-kafka-consumer/pom.xml
@@ -0,0 +1,86 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.dromara</groupId>
+        <artifactId>stwzhj-modules</artifactId>
+        <version>${revision}</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>stwzhj-kafka-consumer</artifactId>
+
+    <description>
+        stwzhj-kafka-consumer consumes messages from SCRAM-authenticated Kafka and forwards them to the producer service via an in-memory queue
+    </description>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-web</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>3.6.1</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>fastjson</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>cn.hutool</groupId>
+            <artifactId>hutool-all</artifactId>
+            <version>5.4.0</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>cn.dynamictp</groupId>
+            <artifactId>dynamic-tp-spring-boot-starter-common</artifactId>
+            <version>1.1.0</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.httpcomponents.client5</groupId>
+            <artifactId>httpclient5</artifactId>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <finalName>${project.artifactId}</finalName>
+        <plugins>
+            <plugin>
+                <groupId>org.springframework.boot</groupId>
+                <artifactId>spring-boot-maven-plugin</artifactId>
+                <version>${spring-boot.version}</version>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>repackage</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/stwzhj-modules/stwzhj-kafka-consumer/src/main/java/org/dromara/kafka/consumer/KafkaConsumerApplication.java b/stwzhj-modules/stwzhj-kafka-consumer/src/main/java/org/dromara/kafka/consumer/KafkaConsumerApplication.java
new file mode 100644
index 00000000..73ee126d
--- /dev/null
+++ b/stwzhj-modules/stwzhj-kafka-consumer/src/main/java/org/dromara/kafka/consumer/KafkaConsumerApplication.java
@@ -0,0 +1,65 @@
+package org.dromara.kafka.consumer;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+/**
+ * Entry point for the Kafka consumer service.
+ * Consumes messages from SCRAM-authenticated Kafka and forwards them to the producer service in HTTP batches.
+ *
+ * Launch command format:
+ * java -jar stwzhj-kafka-consumer.jar <port> <cityName> <bootstrapServers> <topics> <groupId> <cityCode>
+ * Example:
+ * java -jar stwzhj-kafka-consumer.jar 11000 huangshan 192.168.10.81:9092,192.168.10.82:9092,192.168.10.83:9092 topic.send.2,topic.send.3,topic.send.4,topic.send.5 hs_gp 3410
+ */
+@SpringBootApplication
+public class KafkaConsumerApplication {
+
+ public static void main(String[] args) {
+        if (args.length < 6) {
+            System.err.println("Not enough launch arguments; use the following format:");
+            System.err.println("java -jar stwzhj-kafka-consumer.jar <port> <cityName> <bootstrapServers> <topics> <groupId> <cityCode>");
+            System.err.println("Example:");
+            System.err.println("java -jar stwzhj-kafka-consumer.jar 11000 huangshan 192.168.10.81:9092,192.168.10.82:9092,192.168.10.83:9092 topic.send.2,topic.send.3,topic.send.4,topic.send.5 hs_gp 3410");
+            System.exit(1);
+        }
+
+        // Parse the launch arguments
+        String port = args[0];
+        String cityName = args[1];
+        String bootstrapServers = args[2];
+        String topics = args[3];
+        String groupId = args[4];
+        String cityCode = args[5];
+
+        // Read the password from the environment
+        String password = System.getenv("KAFKA_PASSWORD");
+        if (password == null || password.isEmpty()) {
+            System.err.println("Error: the KAFKA_PASSWORD environment variable is not set");
+            System.exit(1);
+        }
+
+        // Read the username from the environment
+        String username = System.getenv("KAFKA_USERNAME");
+        if (username == null || username.isEmpty()) {
+            System.err.println("Error: the KAFKA_USERNAME environment variable is not set");
+            System.exit(1);
+        }
+
+        // Expose the values as system properties for Spring Boot to read
+        System.setProperty("server.port", port);
+        System.setProperty("kafka.consumer.port", port);
+        System.setProperty("kafka.consumer.cityName", cityName);
+        System.setProperty("kafka.consumer.bootstrapServers", bootstrapServers);
+        System.setProperty("kafka.consumer.topics", topics);
+        System.setProperty("kafka.consumer.groupId", groupId);
+        System.setProperty("kafka.consumer.cityCode", cityCode);
+        System.setProperty("kafka.consumer.username", username);
+        System.setProperty("kafka.consumer.password", password);
+
+ SpringApplication.run(KafkaConsumerApplication.class, args);
+ }
+}
diff --git a/stwzhj-modules/stwzhj-kafka-consumer/src/main/java/org/dromara/kafka/consumer/config/HttpClientConfig.java b/stwzhj-modules/stwzhj-kafka-consumer/src/main/java/org/dromara/kafka/consumer/config/HttpClientConfig.java
new file mode 100644
index 00000000..4a5e772f
--- /dev/null
+++ b/stwzhj-modules/stwzhj-kafka-consumer/src/main/java/org/dromara/kafka/consumer/config/HttpClientConfig.java
@@ -0,0 +1,54 @@
+package org.dromara.kafka.consumer.config;
+
+import lombok.Data;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * HTTP client configuration
+ */
+@Data
+@Configuration
+@ConfigurationProperties(prefix = "http.client")
+public class HttpClientConfig {
+
+    /**
+     * Producer service address
+     */
+    private String producerUrl = "http://localhost:9214";
+
+    /**
+     * Connect timeout (ms)
+     */
+    private Integer connectTimeout = 5000;
+
+    /**
+     * Read timeout (ms)
+     */
+    private Integer readTimeout = 10000;
+
+    /**
+     * Maximum number of connections
+     */
+    private Integer maxConnections = 200;
+
+    /**
+     * Maximum number of connections per route
+     */
+    private Integer maxConnectionsPerRoute = 50;
+
+    /**
+     * Batch size for sending
+     */
+    private Integer batchSize = 100;
+
+    /**
+     * Batch send timeout (ms)
+     */
+    private Integer batchTimeout = 100;
+
+    /**
+     * Whether compression is enabled
+     */
+    private Boolean enableCompression = true;
+}
diff --git a/stwzhj-modules/stwzhj-kafka-consumer/src/main/java/org/dromara/kafka/consumer/config/KafkaConsumerConfig.java b/stwzhj-modules/stwzhj-kafka-consumer/src/main/java/org/dromara/kafka/consumer/config/KafkaConsumerConfig.java
new file mode 100644
index 00000000..cd1d989c
--- /dev/null
+++ b/stwzhj-modules/stwzhj-kafka-consumer/src/main/java/org/dromara/kafka/consumer/config/KafkaConsumerConfig.java
@@ -0,0 +1,104 @@
+package org.dromara.kafka.consumer.config;
+
+import lombok.Data;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * Kafka consumer configuration
+ */
+@Data
+@Configuration
+@ConfigurationProperties(prefix = "kafka.consumer")
+public class KafkaConsumerConfig {
+
+    /**
+     * Kafka broker addresses
+     */
+    private String bootstrapServers;
+
+    /**
+     * Consumer group ID
+     */
+    private String groupId;
+
+    /**
+     * Consumer group name (taken from the launch command)
+     */
+    private String groupName;
+
+    /**
+     * Topics to subscribe to, comma-separated
+     */
+    private String topics;
+
+    /**
+     * Security protocol: SASL_PLAINTEXT or PLAINTEXT
+     */
+    private String securityProtocol = "SASL_PLAINTEXT";
+
+    /**
+     * SASL mechanism: SCRAM-SHA-256 or PLAIN
+     */
+    private String saslMechanism = "SCRAM-SHA-256";
+
+    /**
+     * SASL JAAS configuration
+     */
+    private String saslJaasConfig;
+
+    /**
+     * Whether offsets are committed automatically
+     */
+    private Boolean enableAutoCommit = true;
+
+    /**
+     * Auto-commit interval (ms)
+     */
+    private Integer autoCommitIntervalMs = 1000;
+
+    /**
+     * Session timeout (ms)
+     */
+    private Integer sessionTimeoutMs = 30000;
+
+    /**
+     * Maximum wait per poll (ms)
+     */
+    private Integer pollTimeoutMs = 10000;
+
+    /**
+     * Maximum records per poll
+     */
+    private Integer maxPollRecords = 500;
+
+    /**
+     * City code
+     */
+    private String cityCode = "3408";
+
+    /**
+     * Message queue capacity
+     */
+    private Integer queueCapacity = 100000;
+
+    /**
+     * Username (taken from the launch command)
+     */
+    private String username;
+
+    /**
+     * Password (taken from the launch command)
+     */
+    private String password;
+
+    /**
+     * City name (taken from the launch command)
+     */
+    private String cityName;
+
+    /**
+     * Service port (taken from the launch command)
+     */
+    private Integer port = 9213;
+}
diff --git a/stwzhj-modules/stwzhj-kafka-consumer/src/main/java/org/dromara/kafka/consumer/config/MemoryQueueConfig.java b/stwzhj-modules/stwzhj-kafka-consumer/src/main/java/org/dromara/kafka/consumer/config/MemoryQueueConfig.java
new file mode 100644
index 00000000..6bdd6c26
--- /dev/null
+++ b/stwzhj-modules/stwzhj-kafka-consumer/src/main/java/org/dromara/kafka/consumer/config/MemoryQueueConfig.java
@@ -0,0 +1,22 @@
+package org.dromara.kafka.consumer.config;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * In-memory queue configuration
+ */
+@Configuration
+public class MemoryQueueConfig {
+
+    /**
+     * Creates the in-memory queue that buffers messages consumed from Kafka
+     */
+    @Bean
+    public BlockingQueue<String> messageQueue(KafkaConsumerConfig kafkaConsumerConfig) {
+        return new ArrayBlockingQueue<>(kafkaConsumerConfig.getQueueCapacity());
+    }
+}
diff --git a/stwzhj-modules/stwzhj-kafka-consumer/src/main/java/org/dromara/kafka/consumer/domain/EsGpsInfo.java b/stwzhj-modules/stwzhj-kafka-consumer/src/main/java/org/dromara/kafka/consumer/domain/EsGpsInfo.java
new file mode 100644
index 00000000..ca5c471b
--- /dev/null
+++ b/stwzhj-modules/stwzhj-kafka-consumer/src/main/java/org/dromara/kafka/consumer/domain/EsGpsInfo.java
@@ -0,0 +1,50 @@
+package org.dromara.kafka.consumer.domain;
+
+import com.fasterxml.jackson.annotation.JsonFormat;
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.Date;
+
+/**
+ * GPS location record (ES table)
+ */
+@Data
+public class EsGpsInfo implements Serializable {
+
+    private static final long serialVersionUID = 7455495841680488351L;
+    /**
+     * Unique code (from the external system). The Hefei build does not need the 21-digit id;
+     * when data is uploaded to the provincial department, the Kafka sender must generate
+     * the 21-digit id the province requires.
+     */
+    private String deviceCode;
+    /**
+     * Device type
+     */
+    private String deviceType;
+    private String lat;
+    private String lng;
+    // heading
+    private String orientation;
+    // elevation
+    private String height;
+    // accuracy
+    private String deltaH;
+    private String speed;
+
+    private String zzjgdm;
+    private String zzjgmc;
+    private String policeNo;
+    private String policeName;
+    private String phoneNum;
+    private String carNum;
+
+    private Integer online;
+
+    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
+    private Date gpsTime;
+    // city codes such as 3401, 3402
+    private String infoSource;
+
+}
diff --git a/stwzhj-modules/stwzhj-kafka-consumer/src/main/java/org/dromara/kafka/consumer/service/BatchMessageProcessor.java b/stwzhj-modules/stwzhj-kafka-consumer/src/main/java/org/dromara/kafka/consumer/service/BatchMessageProcessor.java
new file mode 100644
index 00000000..a046fa8d
--- /dev/null
+++ b/stwzhj-modules/stwzhj-kafka-consumer/src/main/java/org/dromara/kafka/consumer/service/BatchMessageProcessor.java
@@ -0,0 +1,150 @@
+package org.dromara.kafka.consumer.service;
+
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import org.dromara.kafka.consumer.config.HttpClientConfig;
+import org.dromara.kafka.consumer.util.HttpClientUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Batch message processor.
+ * Sends messages consumed from Kafka to the producer service in batches.
+ */
+@Component
+public class BatchMessageProcessor {
+
+ private static final Logger logger = LoggerFactory.getLogger(BatchMessageProcessor.class);
+
+ @Autowired
+ private HttpClientConfig httpClientConfig;
+
+ @Autowired
+ private HttpClientUtil httpClientUtil;
+
+    private BlockingQueue<String> messageQueue;
+
+ private ExecutorService executorService;
+
+ private final AtomicBoolean running = new AtomicBoolean(false);
+
+    @PostConstruct
+    public void init() {
+        logger.info("Initializing the batch message processor...");
+
+        // Create the in-memory queue
+        messageQueue = new LinkedBlockingQueue<>(500);
+
+        // Start the worker threads
+        executorService = Executors.newFixedThreadPool(3); // three threads drain the queue
+        running.set(true);
+
+        for (int i = 0; i < 3; i++) {
+            executorService.submit(this::processMessages);
+        }
+
+        logger.info("Batch message processor started");
+    }
+
+    @PreDestroy
+    public void destroy() {
+        logger.info("Shutting down the batch message processor...");
+        running.set(false);
+        if (executorService != null) {
+            executorService.shutdown();
+            try {
+                if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
+                    executorService.shutdownNow();
+                }
+            } catch (InterruptedException e) {
+                executorService.shutdownNow();
+                Thread.currentThread().interrupt();
+            }
+        }
+        logger.info("Batch message processor stopped");
+    }
+
+    /**
+     * Adds a message to the queue
+     *
+     * @param message message payload
+     * @return whether the message was enqueued
+     */
+    public boolean addMessage(String message) {
+        try {
+            return messageQueue.offer(message);
+        } catch (Exception e) {
+            logger.error("Failed to enqueue message: {}", e.getMessage(), e);
+            return false;
+        }
+    }
+
+    /**
+     * Drains the queue and sends batches
+     */
+    private void processMessages() {
+        logger.info("Message worker thread started...");
+        List<String> batch = new ArrayList<>(httpClientConfig.getBatchSize());
+        long lastSendTime = System.currentTimeMillis();
+
+        while (running.get()) {
+            try {
+                // Take a message from the queue
+                String message = messageQueue.poll(100, TimeUnit.MILLISECONDS);
+
+                if (message != null) {
+                    batch.add(message);
+                }
+
+                // Check whether the batch should be flushed
+                long currentTime = System.currentTimeMillis();
+                boolean shouldSend = batch.size() >= httpClientConfig.getBatchSize() ||
+                    (batch.size() > 0 && (currentTime - lastSendTime) >= httpClientConfig.getBatchTimeout());
+
+                if (shouldSend) {
+                    if (!batch.isEmpty()) {
+                        logger.debug("About to send a batch, size: {}", batch.size());
+                        boolean success = httpClientUtil.sendBatchMessages(batch);
+
+                        if (success) {
+                            logger.debug("Batch sent successfully, size: {}", batch.size());
+                        } else {
+                            logger.warn("Batch send failed, size: {}", batch.size());
+                            // On failure, put the messages back into the queue
+                            for (String msg : batch) {
+                                messageQueue.offer(msg);
+                            }
+                        }
+
+                        batch.clear();
+                        lastSendTime = currentTime;
+                    }
+                }
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                break;
+            } catch (Exception e) {
+                logger.error("Error while processing messages: {}", e.getMessage(), e);
+            }
+        }
+
+        // Flush whatever is left
+        if (!batch.isEmpty()) {
+            logger.info("Sending remaining messages, size: {}", batch.size());
+            httpClientUtil.sendBatchMessages(batch);
+        }
+
+        logger.info("Message worker thread finished");
+    }
+}
diff --git a/stwzhj-modules/stwzhj-kafka-consumer/src/main/java/org/dromara/kafka/consumer/service/KafkaConsumerService.java b/stwzhj-modules/stwzhj-kafka-consumer/src/main/java/org/dromara/kafka/consumer/service/KafkaConsumerService.java
new file mode 100644
index 00000000..4a4e765c
--- /dev/null
+++ b/stwzhj-modules/stwzhj-kafka-consumer/src/main/java/org/dromara/kafka/consumer/service/KafkaConsumerService.java
@@ -0,0 +1,163 @@
+package org.dromara.kafka.consumer.service;
+
+import cn.hutool.core.convert.ConvertException;
+import cn.hutool.json.JSONObject;
+import cn.hutool.json.JSONUtil;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.dromara.kafka.consumer.config.KafkaConsumerConfig;
+import org.dromara.kafka.consumer.domain.EsGpsInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Kafka consumer service.
+ * Consumes messages from SCRAM-authenticated Kafka and forwards them to the producer service via the in-memory queue.
+ */
+@Service
+public class KafkaConsumerService {
+
+ private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerService.class);
+
+ @Autowired
+ private KafkaConsumerConfig kafkaConsumerConfig;
+
+ @Autowired
+ private BatchMessageProcessor batchMessageProcessor;
+
+    private KafkaConsumer<String, String> consumer;
+
+ private ExecutorService executorService;
+
+ private volatile boolean running = false;
+
+    @PostConstruct
+    public void init() {
+        logger.info("Initializing the Kafka consumer service...");
+        Properties props = createConsumerProperties();
+        consumer = new KafkaConsumer<>(props);
+
+        // Subscribe to the topics
+        String[] topics = kafkaConsumerConfig.getTopics().split(",");
+        consumer.subscribe(Arrays.asList(topics));
+        logger.info("Subscribed to Kafka topics: {}", Arrays.toString(topics));
+
+        // Start the consumer thread
+        executorService = Executors.newSingleThreadExecutor();
+        running = true;
+        executorService.submit(this::consumeMessages);
+
+        logger.info("Kafka consumer service started");
+    }
+
+    @PreDestroy
+    public void destroy() {
+        logger.info("Shutting down the Kafka consumer service...");
+        running = false;
+        if (executorService != null) {
+            executorService.shutdown();
+        }
+        if (consumer != null) {
+            consumer.close();
+        }
+        logger.info("Kafka consumer service stopped");
+    }
+
+    /**
+     * Builds the consumer configuration properties
+     */
+    private Properties createConsumerProperties() {
+        Properties props = new Properties();
+
+        // Basic settings
+        props.put("bootstrap.servers", kafkaConsumerConfig.getBootstrapServers());
+        // Consumer group ID taken from the launch command
+        props.put("group.id", kafkaConsumerConfig.getGroupId());
+        props.put("enable.auto.commit", kafkaConsumerConfig.getEnableAutoCommit());
+        props.put("auto.commit.interval.ms", kafkaConsumerConfig.getAutoCommitIntervalMs());
+        props.put("session.timeout.ms", kafkaConsumerConfig.getSessionTimeoutMs());
+        props.put("max.poll.records", kafkaConsumerConfig.getMaxPollRecords());
+
+        // Deserializers
+        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+
+        // Security settings
+        props.put("security.protocol", kafkaConsumerConfig.getSecurityProtocol());
+        props.put("sasl.mechanism", kafkaConsumerConfig.getSaslMechanism());
+
+        // Build the SASL JAAS configuration
+        String saslJaasConfig = String.format(
+            "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";",
+            kafkaConsumerConfig.getUsername(),
+            kafkaConsumerConfig.getPassword()
+        );
+        props.put("sasl.jaas.config", saslJaasConfig);
+
+        return props;
+    }
+
+    /**
+     * Poll loop
+     */
+    private void consumeMessages() {
+        logger.info("Starting to consume Kafka messages...");
+        while (running) {
+            try {
+                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(kafkaConsumerConfig.getPollTimeoutMs()));
+                logger.info("Polled {} records", records.count());
+
+                for (ConsumerRecord<String, String> record : records) {
+                    try {
+                        // Parse the record and hand it to the batch processor
+                        EsGpsInfo esGpsInfo;
+                        JSONObject jsonObject;
+                        try {
+                            jsonObject = JSONUtil.parseObj(record.value());
+                        } catch (ConvertException e) {
+                            logger.info("jsonObject=null: error={}", e.getMessage());
+                            continue; // skip the malformed record instead of killing the poll loop
+                        }
+                        try {
+                            esGpsInfo = JSONUtil.toBean(jsonObject, EsGpsInfo.class);
+                        } catch (ConvertException e) {
+                            logger.info("EsGpsInfo=null: error={}", e.getMessage());
+                            continue; // skip the record that cannot be mapped
+                        }
+                        esGpsInfo.setInfoSource(kafkaConsumerConfig.getCityCode());
+                        // Serialize back to JSON; Lombok's generated toString() is not valid JSON
+                        boolean added = batchMessageProcessor.addMessage(JSONUtil.toJsonStr(esGpsInfo));
+                        if (!added) {
+                            logger.warn("Message queue full, dropping message: topic={}, partition={}, offset={}",
+                                record.topic(), record.partition(), record.offset());
+                        }
+                    } catch (Exception e) {
+                        logger.error("Failed to process record: topic={}, partition={}, offset={}, error={}",
+                            record.topic(), record.partition(), record.offset(), e.getMessage(), e);
+                    }
+                }
+            } catch (Exception e) {
+                logger.error("Error while consuming Kafka messages: {}", e.getMessage(), e);
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException ie) {
+                    Thread.currentThread().interrupt();
+                    break;
+                }
+            }
+        }
+        logger.info("Stopped consuming Kafka messages");
+    }
+}
diff --git a/stwzhj-modules/stwzhj-kafka-consumer/src/main/java/org/dromara/kafka/consumer/util/HttpClientUtil.java b/stwzhj-modules/stwzhj-kafka-consumer/src/main/java/org/dromara/kafka/consumer/util/HttpClientUtil.java
new file mode 100644
index 00000000..e09b67c1
--- /dev/null
+++ b/stwzhj-modules/stwzhj-kafka-consumer/src/main/java/org/dromara/kafka/consumer/util/HttpClientUtil.java
@@ -0,0 +1,117 @@
+package org.dromara.kafka.consumer.util;
+
+import jakarta.annotation.PostConstruct;
+import org.apache.hc.client5.http.classic.HttpClient;
+import org.apache.hc.client5.http.config.ConnectionConfig;
+import org.apache.hc.client5.http.impl.classic.HttpClients;
+import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
+import org.apache.hc.core5.http.io.SocketConfig;
+import org.apache.hc.core5.util.Timeout;
+import org.dromara.kafka.consumer.config.HttpClientConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
+import org.springframework.stereotype.Component;
+import org.springframework.web.client.RestTemplate;
+import org.springframework.http.*;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * HTTP client helper.
+ * Sends message batches to the producer service.
+ */
+@Component
+public class HttpClientUtil {
+
+ private static final Logger logger = LoggerFactory.getLogger(HttpClientUtil.class);
+
+ @Autowired
+ private HttpClientConfig httpClientConfig;
+
+ private RestTemplate restTemplate;
+
+    @PostConstruct
+    public void init() {
+        try {
+            logger.info("Initializing the HTTP client...");
+
+            // Create the connection manager (HttpClient 5.x)
+            PoolingHttpClientConnectionManager connectionManager =
+                new PoolingHttpClientConnectionManager();
+
+            // Maximum total connections
+            connectionManager.setMaxTotal(httpClientConfig.getMaxConnections());
+            // Maximum connections per route
+            connectionManager.setDefaultMaxPerRoute(httpClientConfig.getMaxConnectionsPerRoute());
+
+            // Connect timeout
+            ConnectionConfig connectionConfig = ConnectionConfig.custom()
+                .setConnectTimeout(Timeout.of(httpClientConfig.getConnectTimeout(), TimeUnit.MILLISECONDS))
+                .build();
+            connectionManager.setDefaultConnectionConfig(connectionConfig);
+
+            // Socket (read/write) timeout
+            SocketConfig socketConfig = SocketConfig.custom()
+                .setSoTimeout(Timeout.of(httpClientConfig.getReadTimeout(), TimeUnit.MILLISECONDS))
+                .build();
+            connectionManager.setDefaultSocketConfig(socketConfig);
+
+            // Create the HttpClient
+            HttpClient httpClient = HttpClients.custom()
+                .setConnectionManager(connectionManager)
+                .build();
+
+            // Wrap it in a HttpComponentsClientHttpRequestFactory
+            HttpComponentsClientHttpRequestFactory factory =
+                new HttpComponentsClientHttpRequestFactory(httpClient);
+
+            // Request connect timeout
+            factory.setConnectTimeout(httpClientConfig.getConnectTimeout());
+
+            restTemplate = new RestTemplate(factory);
+
+            logger.info("HTTP client initialized, pool settings: maxConnections={}, maxConnectionsPerRoute={}",
+                httpClientConfig.getMaxConnections(),
+                httpClientConfig.getMaxConnectionsPerRoute());
+
+        } catch (Exception e) {
+            logger.error("HTTP client initialization failed", e);
+            throw new RuntimeException("HTTP client initialization failed", e);
+        }
+    }
+
+    /**
+     * Sends a batch of messages to the producer service
+     *
+     * @param messages message batch
+     * @return whether the batch was sent successfully
+     */
+    public boolean sendBatchMessages(List<String> messages) {
+        try {
+            String url = httpClientConfig.getProducerUrl() + "/api/kafka/batch";
+
+            HttpHeaders headers = new HttpHeaders();
+            headers.setContentType(MediaType.APPLICATION_JSON);
+            if (httpClientConfig.getEnableCompression()) {
+                // Advertise gzip; note that RestTemplate does not compress the body itself
+                headers.add("Content-Encoding", "gzip");
+            }
+
+            HttpEntity<List<String>> request = new HttpEntity<>(messages, headers);
+            ResponseEntity<String> response = restTemplate.exchange(url, HttpMethod.POST, request, String.class);
+
+            if (response.getStatusCode() == HttpStatus.OK) {
+                logger.debug("Batch sent successfully, size: {}", messages.size());
+                return true;
+            } else {
+                logger.warn("Batch send failed, status: {}, response: {}", response.getStatusCode(), response.getBody());
+                return false;
+            }
+        } catch (Exception e) {
+            logger.error("Error sending batch, size: {}, error: {}", messages.size(), e.getMessage(), e);
+            return false;
+        }
+    }
+}
diff --git a/stwzhj-modules/stwzhj-kafka-consumer/src/main/resources/application.yml b/stwzhj-modules/stwzhj-kafka-consumer/src/main/resources/application.yml
new file mode 100644
index 00000000..45aba0a3
--- /dev/null
+++ b/stwzhj-modules/stwzhj-kafka-consumer/src/main/resources/application.yml
@@ -0,0 +1,72 @@
+# Tomcat
+server:
+  port: ${kafka.consumer.port:9213}
+
+# Spring
+spring:
+  application:
+    # Application name
+    name: stwzhj-kafka-consumer
+  profiles:
+    # Active profile
+    active: @profiles.active@
+
+# Kafka consumer configuration
+kafka:
+  consumer:
+    # Kafka broker addresses (passed in via the launch command)
+    bootstrap-servers: ${kafka.consumer.bootstrapServers:}
+    # Consumer group ID (passed in via the launch command)
+    group-id: ${kafka.consumer.groupId:}
+    # Topics to subscribe to, comma-separated (passed in via the launch command)
+    topics: ${kafka.consumer.topics:}
+    # Security protocol
+    security-protocol: SASL_PLAINTEXT
+    # SASL mechanism
+    sasl-mechanism: SCRAM-SHA-256
+    # Whether offsets are committed automatically
+    enable-auto-commit: true
+    # Auto-commit interval (ms)
+    auto-commit-interval-ms: 1000
+    # Session timeout (ms)
+    session-timeout-ms: 30000
+    # Maximum wait per poll (ms)
+    poll-timeout-ms: 10000
+    # Maximum records per poll
+    max-poll-records: 500
+    # City code (passed in via the launch command)
+    city-code: ${kafka.consumer.cityCode:3408}
+    # Username (passed in via the launch command)
+    username: ${kafka.consumer.username:}
+    # Password (passed in via the launch command)
+    password: ${kafka.consumer.password:}
+    # Message queue capacity
+    queue-capacity: 100000
+
+# HTTP client configuration
+http:
+  client:
+    # Producer service address
+    producer-url: http://localhost:9214
+    # Connect timeout (ms)
+    connect-timeout: 5000
+    # Read timeout (ms)
+    read-timeout: 10000
+    # Maximum number of connections
+    max-connections: 200
+    # Maximum connections per route
+    max-connections-per-route: 50
+    # Batch size for sending
+    batch-size: 100
+    # Batch send timeout (ms)
+    batch-timeout: 100
+    # Whether compression is enabled
+    enable-compression: true
+
+# Logging configuration
+logging:
+  level:
+    org.springframework: warn
+    org.apache.kafka: INFO
+    org.dromara.kafka.consumer: INFO
+  config: classpath:logback-plus.xml
diff --git a/stwzhj-modules/stwzhj-kafka-consumer/src/main/resources/logback-plus.xml b/stwzhj-modules/stwzhj-kafka-consumer/src/main/resources/logback-plus.xml
new file mode 100644
index 00000000..c2ec3dad
--- /dev/null
+++ b/stwzhj-modules/stwzhj-kafka-consumer/src/main/resources/logback-plus.xml
@@ -0,0 +1,49 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<configuration>
+
+    <!-- assumed default values -->
+    <property name="log.path" value="./logs"/>
+    <property name="log.file" value="stwzhj-kafka-consumer"/>
+    <property name="MAX_FILE_SIZE" value="100MB"/>
+    <property name="MAX_HISTORY" value="30"/>
+
+    <!-- INFO log file -->
+    <appender name="file_info" class="ch.qos.logback.core.rolling.RollingFileAppender">
+        <file>${log.path}/info.${log.file}.log</file>
+        <filter class="ch.qos.logback.classic.filter.LevelFilter">
+            <level>INFO</level>
+            <onMatch>ACCEPT</onMatch>
+            <onMismatch>DENY</onMismatch>
+        </filter>
+        <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
+            <fileNamePattern>${log.path}/info/info.${log.file}.%d{yyyy-MM-dd}.%i.log.gz</fileNamePattern>
+            <maxFileSize>${MAX_FILE_SIZE}</maxFileSize>
+            <maxHistory>${MAX_HISTORY}</maxHistory>
+        </rollingPolicy>
+        <encoder>
+            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <!-- ERROR log file -->
+    <appender name="file_error" class="ch.qos.logback.core.rolling.RollingFileAppender">
+        <file>${log.path}/error.${log.file}.log</file>
+        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
+            <level>ERROR</level>
+        </filter>
+        <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
+            <fileNamePattern>${log.path}/error/error.${log.file}.%d{yyyy-MM-dd}.%i.log.gz</fileNamePattern>
+            <maxFileSize>${MAX_FILE_SIZE}</maxFileSize>
+            <maxHistory>${MAX_HISTORY}</maxHistory>
+        </rollingPolicy>
+        <encoder>
+            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <root level="INFO">
+        <appender-ref ref="file_info"/>
+        <appender-ref ref="file_error"/>
+    </root>
+
+</configuration>
diff --git a/stwzhj-modules/stwzhj-kafka-producer/README.md b/stwzhj-modules/stwzhj-kafka-producer/README.md
new file mode 100644
index 00000000..d43d118c
--- /dev/null
+++ b/stwzhj-modules/stwzhj-kafka-producer/README.md
@@ -0,0 +1,51 @@
+# stwzhj-kafka-producer Service Notes
+
+## Overview
+
+Receives HTTP requests from the consumer service and sends the messages on to Huawei-authenticated Kafka.
+
+## Configuration
+
+The Huawei Kafka authentication settings are read from the following files on the server:
+- `/shengting/gpsstore/producer.properties`: Kafka producer configuration
+- `/shengting/gpsstore/krb5.conf`: Kerberos configuration file
+- `/shengting/gpsstore/user.keytab`: Kerberos keytab file
+- `/shengting/gpsstore/kafkaSecurityMode`: security mode switch
+
+These files are the same ones the original service used, so Huawei Kafka authentication stays fully compatible; a sketch of how the security-mode switch is read follows.
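+
+The check mirrors `LoginUtil.isSecurityModel()` in this module: load the file as Java properties and test a single flag. A minimal sketch (the file path is the one listed above; note that `LoginUtil` itself hard-codes its own config directory):
+
+```java
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Properties;
+
+public class SecurityModeCheck {
+    public static void main(String[] args) throws IOException {
+        Properties props = new Properties();
+        try (FileInputStream in = new FileInputStream("/shengting/gpsstore/kafkaSecurityMode")) {
+            props.load(in);
+        }
+        // Security mode is on when kafka.client.security.mode=yes
+        boolean secure = "yes".equalsIgnoreCase(props.getProperty("kafka.client.security.mode"));
+        System.out.println("Kafka security mode: " + (secure ? "enabled" : "disabled"));
+    }
+}
+```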
+
+## Launch Example
+
+```bash
+java -jar stwzhj-kafka-producer.jar
+```
+
+## High-Throughput Optimizations
+
+1. **Asynchronous sending**: messages can be sent asynchronously for higher concurrency (see the callback sketch below)
+2. **Batched sending**: rides on Kafka's own batching to raise throughput
+3. **Connection reuse**: HTTP connections are reused, reducing connection-setup overhead
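+
+Asynchronous sending hands each record to `KafkaProducer.send` together with a callback, which is what this module's `Producer` class does. A minimal sketch (the broker address and the plain, non-Kerberos setup are illustrative stand-ins; the real settings come from `producer.properties`, and the topic is the module's `jysb_dwxx`):
+
+```java
+import java.util.Properties;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+public class AsyncSendSketch {
+    public static void main(String[] args) {
+        Properties props = new Properties();
+        props.put("bootstrap.servers", "localhost:21007"); // placeholder broker address
+        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+
+        long start = System.currentTimeMillis();
+        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
+            // send() returns immediately; the callback fires when the broker acknowledges
+            producer.send(new ProducerRecord<>("jysb_dwxx", "{\"deviceCode\":\"demo\"}"), (metadata, exception) -> {
+                if (exception != null) {
+                    exception.printStackTrace();
+                } else {
+                    System.out.printf("partition=%d offset=%d in %d ms%n",
+                        metadata.partition(), metadata.offset(), System.currentTimeMillis() - start);
+                }
+            });
+        } // close() flushes any in-flight sends
+    }
+}
+```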
+
+## Notes
+
+1. Make sure the Huawei Kafka authentication files exist on the server and are configured correctly
+2. The service reads those files automatically at startup; no manual configuration is needed
+3. Monitor the Kafka send success rate so problems are caught early
+
+## API
+
+### Batch message intake
+
+**URL**: `/api/kafka/batch`
+
+**Method**: POST
+
+**Request body**: a JSON array holding the messages
+
+```json
+["message1", "message2", "message3"]
+```
+
+**Response**:
+- Success: 200 OK, "send succeeded"
+- Failure: 500 Internal Server Error, "send failed" or "processing error: xxx"
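+
+For reference, a minimal Java client for this endpoint (host and port are illustrative; 9214 matches the default `server.port` in this module's application.yml):
+
+```java
+import java.util.List;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.client.RestTemplate;
+
+public class BatchClientExample {
+    public static void main(String[] args) {
+        RestTemplate rest = new RestTemplate();
+        List<String> batch = List.of("message1", "message2", "message3");
+        // POST the JSON array to the batch endpoint
+        ResponseEntity<String> resp = rest.postForEntity(
+            "http://localhost:9214/api/kafka/batch", batch, String.class);
+        System.out.println(resp.getStatusCode() + " " + resp.getBody());
+    }
+}
+```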
diff --git a/stwzhj-modules/stwzhj-kafka-producer/pom.xml b/stwzhj-modules/stwzhj-kafka-producer/pom.xml
new file mode 100644
index 00000000..acc2dd75
--- /dev/null
+++ b/stwzhj-modules/stwzhj-kafka-producer/pom.xml
@@ -0,0 +1,118 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.dromara</groupId>
+        <artifactId>stwzhj-modules</artifactId>
+        <version>${revision}</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>stwzhj-kafka-producer</artifactId>
+
+    <description>
+        stwzhj-kafka-producer takes messages from the in-memory queue and sends them to Huawei-authenticated Kafka
+    </description>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-web</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka_2.12</artifactId>
+            <version>3.6.1-h0.cbu.mrs.350.r11</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.zookeeper</groupId>
+                    <artifactId>zookeeper</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>net.sf.jopt-simple</groupId>
+                    <artifactId>jopt-simple</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.huawei.mrs</groupId>
+                    <artifactId>manager-wc2frm</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.kafka</groupId>
+                    <artifactId>kafka-clients</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.xerial.snappy</groupId>
+                    <artifactId>snappy-java</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.huawei.mrs</groupId>
+                    <artifactId>om-controller-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.101tec</groupId>
+                    <artifactId>zkclient</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>3.6.1-h0.cbu.mrs.350.r11</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>fastjson</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>cn.hutool</groupId>
+            <artifactId>hutool-all</artifactId>
+            <version>5.4.0</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>cn.dynamictp</groupId>
+            <artifactId>dynamic-tp-spring-boot-starter-common</artifactId>
+            <version>1.1.0</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <finalName>${project.artifactId}</finalName>
+        <plugins>
+            <plugin>
+                <groupId>org.springframework.boot</groupId>
+                <artifactId>spring-boot-maven-plugin</artifactId>
+                <version>${spring-boot.version}</version>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>repackage</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/stwzhj-modules/stwzhj-kafka-producer/src/main/java/org/dromara/kafka/producer/KafkaProducerApplication.java b/stwzhj-modules/stwzhj-kafka-producer/src/main/java/org/dromara/kafka/producer/KafkaProducerApplication.java
new file mode 100644
index 00000000..32785981
--- /dev/null
+++ b/stwzhj-modules/stwzhj-kafka-producer/src/main/java/org/dromara/kafka/producer/KafkaProducerApplication.java
@@ -0,0 +1,15 @@
+package org.dromara.kafka.producer;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+/**
+ * Entry point for the Kafka producer service.
+ * Takes messages from the in-memory queue and sends them to Huawei-authenticated Kafka.
+ */
+@SpringBootApplication
+public class KafkaProducerApplication {
+ public static void main(String[] args) {
+ SpringApplication.run(KafkaProducerApplication.class, args);
+ }
+}
diff --git a/stwzhj-modules/stwzhj-kafka-producer/src/main/java/org/dromara/kafka/producer/controller/KafkaProducerController.java b/stwzhj-modules/stwzhj-kafka-producer/src/main/java/org/dromara/kafka/producer/controller/KafkaProducerController.java
new file mode 100644
index 00000000..7673bdc0
--- /dev/null
+++ b/stwzhj-modules/stwzhj-kafka-producer/src/main/java/org/dromara/kafka/producer/controller/KafkaProducerController.java
@@ -0,0 +1,54 @@
+package org.dromara.kafka.producer.controller;
+
+import org.dromara.kafka.producer.service.KafkaProducerService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.util.List;
+
+/**
+ * Kafka producer controller.
+ * Receives HTTP requests from the consumer service and forwards the messages to Huawei Kafka.
+ */
+@RestController
+@RequestMapping("/api/kafka")
+public class KafkaProducerController {
+
+ private static final Logger logger = LoggerFactory.getLogger(KafkaProducerController.class);
+
+ @Autowired
+ private KafkaProducerService kafkaProducerService;
+
+    /**
+     * Receives a batch of messages and sends them to Kafka
+     *
+     * @param messages message batch
+     * @return processing result
+     */
+    @PostMapping("/batch")
+    public ResponseEntity<String> batchSend(@RequestBody List<String> messages) {
+        try {
+            logger.info("Received a batch, size: {}", messages.size());
+
+            // Send the batch on to Kafka
+            boolean success = kafkaProducerService.sendBatchMessages(messages);
+
+            if (success) {
+                logger.info("Batch sent successfully, size: {}", messages.size());
+                return ResponseEntity.ok("send succeeded");
+            } else {
+                logger.warn("Batch send failed, size: {}", messages.size());
+                return ResponseEntity.status(500).body("send failed");
+            }
+        } catch (Exception e) {
+            logger.error("Error while handling batch, size: {}, error: {}", messages.size(), e.getMessage(), e);
+            return ResponseEntity.status(500).body("processing error: " + e.getMessage());
+        }
+    }
+}
diff --git a/stwzhj-modules/stwzhj-kafka-producer/src/main/java/org/dromara/kafka/producer/producer/Producer.java b/stwzhj-modules/stwzhj-kafka-producer/src/main/java/org/dromara/kafka/producer/producer/Producer.java
new file mode 100644
index 00000000..0fca9553
--- /dev/null
+++ b/stwzhj-modules/stwzhj-kafka-producer/src/main/java/org/dromara/kafka/producer/producer/Producer.java
@@ -0,0 +1,212 @@
+package org.dromara.kafka.producer.producer;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.dromara.kafka.producer.util.KafkaProperties;
+import org.dromara.kafka.producer.util.LoginUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import java.io.IOException;
+import java.util.Properties;
+import java.util.concurrent.Future;
+
+/**
+ * description: Kafka producer wrapper; sends messages to Huawei Kafka, with optional Kerberos security
+ *
+ * @author chenle
+ * @date 2021-11-03 14:15
+ */
+@Component
+public class Producer {
+
+ private static final Logger logger = LoggerFactory.getLogger(Producer.class);
+
+    private final KafkaProducer<String, String> producer;
+
+    // Singleton instance (volatile for visibility and ordering)
+ private static volatile Producer instance;
+
+
+
+ private final Boolean isAsync = true;
+
+
+
+    // Broker address list
+    private final static String BOOTSTRAP_SERVER = "bootstrap.servers";
+
+    // Client ID
+    private final static String CLIENT_ID = "client.id";
+
+    // Key serializer class
+    private final static String KEY_SERIALIZER = "key.serializer";
+
+    // Value serializer class
+    private final static String VALUE_SERIALIZER = "value.serializer";
+
+    // Protocol type: currently SASL_PLAINTEXT or PLAINTEXT
+    private final static String SECURITY_PROTOCOL = "security.protocol";
+
+    // Service name
+    private final static String SASL_KERBEROS_SERVICE_NAME = "sasl.kerberos.service.name";
+
+    // Domain name
+    private final static String KERBEROS_DOMAIN_NAME = "kerberos.domain.name";
+
+    // Partitioner class name
+    private final static String PARTITIONER_NAME = "partitioner.class";
+
+    // Default number of messages to send
+    private final static int MESSAGE_NUM = 100;
+
+    /**
+     * Keytab file name of the machine-machine account the user applied for
+     */
+    private static final String USER_KEYTAB_FILE = "user.keytab";
+
+    /**
+     * Name of the machine-machine account the user applied for
+     */
+    private static final String USER_PRINCIPAL = "yhy_ahrs_rcw";
+
+
+
+    /**
+     * Producer constructor
+     */
+    public Producer() {
+        initSecurity();
+        Properties props = initProperties();
+        this.producer = new KafkaProducer<>(props);
+    }
+
+    // Public accessor for the singleton (double-checked locking)
+ public static Producer getInstance() {
+ if (instance == null) {
+ synchronized (Producer.class) {
+ if (instance == null) {
+ instance = new Producer();
+ }
+ }
+ }
+ return instance;
+ }
+
+    // Register a shutdown hook so the producer is closed on exit
+ static {
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ if (instance != null && instance.producer != null) {
+ instance.producer.close();
+ }
+ }));
+ }
+
+
+    /**
+     * Initializes security authentication
+     */
+    public void initSecurity() {
+        if (LoginUtil.isSecurityModel()) {
+            try {
+                logger.info("Security mode start.");
+
+                // NOTE: in security mode this must be changed to the machine-machine account you applied for
+                LoginUtil.securityPrepare(USER_PRINCIPAL, USER_KEYTAB_FILE);
+            } catch (IOException e) {
+                logger.error("Security prepare failure.");
+                logger.error("The IOException occurred.", e);
+            }
+            logger.info("Security prepare success.");
+        }
+    }
+
+    public static Properties initProperties() {
+        Properties props = new Properties();
+        KafkaProperties kafkaProc = KafkaProperties.getInstance();
+
+        // Broker address list
+        props.put(BOOTSTRAP_SERVER, kafkaProc.getValues(BOOTSTRAP_SERVER, "localhost:21007"));
+        // Client ID
+        props.put(CLIENT_ID, kafkaProc.getValues(CLIENT_ID, "DemoProducer"));
+        // Key serializer class
+        props.put(KEY_SERIALIZER,
+            kafkaProc.getValues(KEY_SERIALIZER, "org.apache.kafka.common.serialization.StringSerializer"));
+        // Value serializer class
+        props.put(VALUE_SERIALIZER,
+            kafkaProc.getValues(VALUE_SERIALIZER, "org.apache.kafka.common.serialization.StringSerializer"));
+        // Protocol type: currently SASL_PLAINTEXT or PLAINTEXT
+        props.put(SECURITY_PROTOCOL, kafkaProc.getValues(SECURITY_PROTOCOL, "SASL_PLAINTEXT"));
+        // Service name
+        props.put(SASL_KERBEROS_SERVICE_NAME, "kafka");
+        // Domain name
+        props.put(KERBEROS_DOMAIN_NAME, kafkaProc.getValues(KERBEROS_DOMAIN_NAME, "hadoop.hadoop.com"));
+        // Partitioner class name
+//        props.put(PARTITIONER_NAME, kafkaProc.getValues(PARTITIONER_NAME, "com.huawei.bigdata.kafka.example.SimplePartitioner"));
+        return props;
+    }
+
+    /**
+     * Sends a message (core method)
+     *
+     * @param topic   target topic
+     * @param message message payload
+     * @return RecordMetadata for synchronous sends, null for asynchronous sends
+     */
+    public RecordMetadata sendMessage(String topic, String message) {
+        try {
+            logger.info("Send requested: topic={}, message={}", topic, message);
+            long startTime = System.currentTimeMillis();
+            ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);
+            if (isAsync) {
+                // Asynchronous send
+                producer.send(record, new DemoCallBack(startTime, topic, message));
+                return null;
+            } else {
+                Future<RecordMetadata> future = producer.send(record);
+                RecordMetadata metadata = future.get();
+                logger.info("Synchronous send succeeded: topic={}", metadata.topic());
+                return metadata;
+            }
+        } catch (Exception e) {
+            logger.error("Failed to send message", e);
+        }
+        return null;
+    }
+
+
+
+    /**
+     * Internal callback class
+     */
+ private static class DemoCallBack implements Callback {
+ private final Logger logger = LoggerFactory.getLogger(DemoCallBack.class);
+ private final long startTime;
+
+ private final String topic;
+ private final String message;
+
+ public DemoCallBack(long startTime, String topic, String message) {
+ this.startTime = startTime;
+ this.topic = topic;
+ this.message = message;
+ }
+
+ @Override
+ public void onCompletion(RecordMetadata metadata, Exception exception) {
+ long elapsedTime = System.currentTimeMillis() - startTime;
+ if (metadata != null) {
+ logger.info("topic=({}, {}) sent to partition({}), offset({}) in {} ms",
+ topic, message, metadata.partition(), metadata.offset(), elapsedTime);
+ } else if (exception != null) {
+ logger.error("Message sending failed", exception);
+ }
+ }
+ }
+
+}
diff --git a/stwzhj-modules/stwzhj-kafka-producer/src/main/java/org/dromara/kafka/producer/service/KafkaProducerService.java b/stwzhj-modules/stwzhj-kafka-producer/src/main/java/org/dromara/kafka/producer/service/KafkaProducerService.java
new file mode 100644
index 00000000..0440463b
--- /dev/null
+++ b/stwzhj-modules/stwzhj-kafka-producer/src/main/java/org/dromara/kafka/producer/service/KafkaProducerService.java
@@ -0,0 +1,94 @@
+package org.dromara.kafka.producer.service;
+
+import jakarta.annotation.PostConstruct;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.dromara.kafka.producer.producer.Producer;
+import org.dromara.kafka.producer.util.KafkaProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+import java.util.List;
+
+
+/**
+ * Kafka producer service.
+ * Sends messages to Huawei-authenticated Kafka.
+ */
+@Service
+public class KafkaProducerService {
+
+ private static final Logger logger = LoggerFactory.getLogger(KafkaProducerService.class);
+ private Producer producer;
+
+
+
+    @PostConstruct
+    public void init() {
+        logger.info("Initializing the Kafka producer service...");
+        producer = Producer.getInstance();
+        logger.info("Kafka producer service initialized");
+    }
+
+    /**
+     * Sends a batch of messages to Kafka
+     *
+     * @param messages message batch
+     * @return whether every message was sent successfully
+     */
+    public boolean sendBatchMessages(List<String> messages) {
+        if (messages == null || messages.isEmpty()) {
+            logger.warn("Message batch is empty, nothing to send");
+            return true;
+        }
+
+        try {
+            String topic = KafkaProperties.TOPIC;
+            boolean allSuccess = true;
+            int successCount = 0;
+            int failCount = 0;
+
+            for (String message : messages) {
+                try {
+                    // Send via the Producer singleton (asynchronous send)
+                    RecordMetadata metadata = producer.sendMessage(topic, message);
+                    // For asynchronous sends metadata is null, meaning the request was submitted
+                    if (metadata == null) {
+                        successCount++;
+                    } else {
+                        // Synchronous send (the current configuration is asynchronous)
+                        successCount++;
+                        logger.info("Synchronous send succeeded: topic={}, offset={}", metadata.topic(), metadata.offset());
+                    }
+                } catch (Exception e) {
+                    logger.error("Failed to send message: {}", e.getMessage(), e);
+                    allSuccess = false;
+                    failCount++;
+                }
+            }
+
+            logger.info("Batch finished, total: {}, succeeded: {}, failed: {}", messages.size(), successCount, failCount);
+
+            // Give asynchronous sends a moment to complete
+            if (successCount > 0) {
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    logger.warn("Interrupted while waiting for sends to complete");
+                }
+            }
+
+            return allSuccess;
+        } catch (Exception e) {
+            logger.error("Batch send failed: {}", e.getMessage(), e);
+            return false;
+        }
+    }
+}
diff --git a/stwzhj-modules/stwzhj-kafka-producer/src/main/java/org/dromara/kafka/producer/util/KafkaProperties.java b/stwzhj-modules/stwzhj-kafka-producer/src/main/java/org/dromara/kafka/producer/util/KafkaProperties.java
new file mode 100644
index 00000000..0439e1b8
--- /dev/null
+++ b/stwzhj-modules/stwzhj-kafka-producer/src/main/java/org/dromara/kafka/producer/util/KafkaProperties.java
@@ -0,0 +1,138 @@
+package org.dromara.kafka.producer.util;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Properties;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class KafkaProperties
+{
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaProperties.class);
+
+    // Topic name; in security mode an administrator must grant the current user access to it
+ public final static String TOPIC = "jysb_dwxx";
+
+ private static Properties serverProps = new Properties();
+
+ private static Properties producerProps = new Properties();
+
+ private static Properties consumerProps = new Properties();
+
+ private static Properties clientProps = new Properties();
+
+ private static KafkaProperties instance = null;
+
+    private KafkaProperties()
+    {
+//        String filePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator;
+        String filePath = "/home/rsoft/config/";
+        LOG.info("Config path == {}", filePath);
+        try
+        {
+            File proFile = new File(filePath + "producer.properties");
+
+            if (proFile.exists())
+            {
+                producerProps.load(new FileInputStream(filePath + "producer.properties"));
+            }
+
+            File conFile = new File(filePath + "consumer.properties");
+
+            if (conFile.exists())
+            {
+                consumerProps.load(new FileInputStream(filePath + "consumer.properties"));
+            }
+
+            File serFile = new File(filePath + "server.properties");
+
+            if (serFile.exists())
+            {
+                serverProps.load(new FileInputStream(filePath + "server.properties"));
+            }
+
+            File cliFile = new File(filePath + "client.properties");
+
+            if (cliFile.exists())
+            {
+                clientProps.load(new FileInputStream(filePath + "client.properties"));
+            }
+        }
+        catch (IOException e)
+        {
+            LOG.info("The Exception occurred.", e);
+        }
+    }
+
+ public synchronized static KafkaProperties getInstance()
+ {
+ if (null == instance)
+ {
+ instance = new KafkaProperties();
+ }
+
+ return instance;
+ }
+
+    /**
+     * Gets a configuration value
+     * @param key properties key
+     * @param defValue default value
+     * @return the configured value, or the default when the key is absent
+     */
+ public String getValues(String key, String defValue)
+ {
+ String rtValue = null;
+
+ if (null == key)
+ {
+ LOG.error("key is null");
+ }
+ else
+ {
+ rtValue = getPropertiesValue(key);
+ }
+
+ if (null == rtValue)
+ {
+ LOG.warn("KafkaProperties.getValues return null, key is " + key);
+ rtValue = defValue;
+ }
+
+ LOG.info("KafkaProperties.getValues: key is " + key + "; Value is " + rtValue);
+
+ return rtValue;
+ }
+
+    /**
+     * Looks a key up across the loaded properties files
+     * @param key properties key
+     * @return the value, or null when not found
+     */
+    private String getPropertiesValue(String key)
+    {
+        String rtValue = serverProps.getProperty(key);
+
+        // Not in server.properties; try producer.properties next
+        if (null == rtValue)
+        {
+            rtValue = producerProps.getProperty(key);
+        }
+
+        // Not in producer.properties; try consumer.properties next
+        if (null == rtValue)
+        {
+            rtValue = consumerProps.getProperty(key);
+        }
+
+        // Not in consumer.properties; try client.properties last
+        if (null == rtValue)
+        {
+            rtValue = clientProps.getProperty(key);
+        }
+
+        return rtValue;
+    }
+}
diff --git a/stwzhj-modules/stwzhj-kafka-producer/src/main/java/org/dromara/kafka/producer/util/LoginUtil.java b/stwzhj-modules/stwzhj-kafka-producer/src/main/java/org/dromara/kafka/producer/util/LoginUtil.java
new file mode 100644
index 00000000..6bfb825e
--- /dev/null
+++ b/stwzhj-modules/stwzhj-kafka-producer/src/main/java/org/dromara/kafka/producer/util/LoginUtil.java
@@ -0,0 +1,259 @@
+package org.dromara.kafka.producer.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.Properties;
+
+public class LoginUtil {
+ private static final Logger LOG = LoggerFactory.getLogger(LoginUtil.class);
+
+ /**
+ * no JavaDoc
+ */
+ public enum Module {
+ KAFKA("KafkaClient"), ZOOKEEPER("Client");
+
+ private String name;
+
+ private Module(String name)
+ {
+ this.name = name;
+ }
+
+ public String getName()
+ {
+ return name;
+ }
+ }
+
+ /**
+ * line operator string
+ */
+ private static final String LINE_SEPARATOR = System.getProperty("line.separator");
+
+ /**
+ * jaas file postfix
+ */
+ private static final String JAAS_POSTFIX = ".jaas.conf";
+
+ /**
+ * is IBM jdk or not
+ */
+ private static final boolean IS_IBM_JDK = System.getProperty("java.vendor").contains("IBM");
+
+ /**
+ * IBM jdk login module
+ */
+ private static final String IBM_LOGIN_MODULE = "com.ibm.security.auth.module.Krb5LoginModule required";
+
+ /**
+ * oracle jdk login module
+ */
+ private static final String SUN_LOGIN_MODULE = "com.sun.security.auth.module.Krb5LoginModule required";
+
+ /**
+ * Zookeeper quorum principal.
+ */
+ public static final String ZOOKEEPER_AUTH_PRINCIPAL = "zookeeper.server.principal";
+
+ /**
+ * java security krb5 file path
+ */
+ public static final String JAVA_SECURITY_KRB5_CONF = "java.security.krb5.conf";
+
+ /**
+ * java security login file path
+ */
+ public static final String JAVA_SECURITY_LOGIN_CONF = "java.security.auth.login.config";
+
+ /**
+     * Sets up the jaas.conf file
+ *
+ * @param principal
+ * @param keytabPath
+ * @throws IOException
+ */
+ public static void setJaasFile(String principal, String keytabPath)
+ throws IOException {
+ String jaasPath =
+ new File(System.getProperty("java.io.tmpdir")) + File.separator + System.getProperty("user.name")
+ + JAAS_POSTFIX;
+
+        // Escape Windows path separators
+ jaasPath = jaasPath.replace("\\", "\\\\");
+        // Delete any existing jaas file
+ deleteJaasFile(jaasPath);
+ writeJaasFile(jaasPath, principal, keytabPath);
+ System.setProperty(JAVA_SECURITY_LOGIN_CONF, jaasPath);
+ }
+
+ /**
+     * Sets the ZooKeeper server principal
+ *
+ * @param zkServerPrincipal
+ * @throws IOException
+ */
+ public static void setZookeeperServerPrincipal(String zkServerPrincipal)
+ throws IOException {
+ System.setProperty(ZOOKEEPER_AUTH_PRINCIPAL, zkServerPrincipal);
+ String ret = System.getProperty(ZOOKEEPER_AUTH_PRINCIPAL);
+ if (ret == null)
+ {
+ throw new IOException(ZOOKEEPER_AUTH_PRINCIPAL + " is null.");
+ }
+ if (!ret.equals(zkServerPrincipal))
+ {
+ throw new IOException(ZOOKEEPER_AUTH_PRINCIPAL + " is " + ret + " is not " + zkServerPrincipal + ".");
+ }
+ }
+
+ /**
+     * Sets the krb5 configuration file
+ *
+ * @param krb5ConfFile
+ * @throws IOException
+ */
+ public static void setKrb5Config(String krb5ConfFile)
+ throws IOException {
+ System.setProperty(JAVA_SECURITY_KRB5_CONF, krb5ConfFile);
+ String ret = System.getProperty(JAVA_SECURITY_KRB5_CONF);
+ if (ret == null)
+ {
+ throw new IOException(JAVA_SECURITY_KRB5_CONF + " is null.");
+ }
+ if (!ret.equals(krb5ConfFile))
+ {
+ throw new IOException(JAVA_SECURITY_KRB5_CONF + " is " + ret + " is not " + krb5ConfFile + ".");
+ }
+ }
+
+ /**
+     * Writes the jaas file
+     *
+     * @throws IOException
+     *             if the file cannot be written
+ */
+ private static void writeJaasFile(String jaasPath, String principal, String keytabPath)
+ throws IOException {
+ FileWriter writer = new FileWriter(new File(jaasPath));
+ try
+ {
+ writer.write(getJaasConfContext(principal, keytabPath));
+ writer.flush();
+ }
+ catch (IOException e)
+ {
+ throw new IOException("Failed to create jaas.conf File");
+ }
+ finally
+ {
+ writer.close();
+ }
+ }
+
+ private static void deleteJaasFile(String jaasPath)
+ throws IOException {
+ File jaasFile = new File(jaasPath);
+ if (jaasFile.exists())
+ {
+ if (!jaasFile.delete())
+ {
+ throw new IOException("Failed to delete exists jaas file.");
+ }
+ }
+ }
+
+ private static String getJaasConfContext(String principal, String keytabPath) {
+ Module[] allModule = Module.values();
+ StringBuilder builder = new StringBuilder();
+        for (Module module : allModule)
+        {
+            builder.append(getModuleContext(principal, keytabPath, module));
+        }
+ return builder.toString();
+ }
+
+ private static String getModuleContext(String userPrincipal, String keyTabPath, Module module) {
+ StringBuilder builder = new StringBuilder();
+ if (IS_IBM_JDK) {
+ builder.append(module.getName()).append(" {").append(LINE_SEPARATOR);
+ builder.append(IBM_LOGIN_MODULE).append(LINE_SEPARATOR);
+ builder.append("credsType=both").append(LINE_SEPARATOR);
+ builder.append("principal=\"" + userPrincipal + "\"").append(LINE_SEPARATOR);
+ builder.append("useKeytab=\"" + keyTabPath + "\"").append(LINE_SEPARATOR);
+ builder.append("debug=true;").append(LINE_SEPARATOR);
+ builder.append("};").append(LINE_SEPARATOR);
+ } else {
+ builder.append(module.getName()).append(" {").append(LINE_SEPARATOR);
+ builder.append(SUN_LOGIN_MODULE).append(LINE_SEPARATOR);
+ builder.append("useKeyTab=true").append(LINE_SEPARATOR);
+ builder.append("keyTab=\"" + keyTabPath + "\"").append(LINE_SEPARATOR);
+ builder.append("principal=\"" + userPrincipal + "\"").append(LINE_SEPARATOR);
+ builder.append("useTicketCache=false").append(LINE_SEPARATOR);
+ builder.append("storeKey=true").append(LINE_SEPARATOR);
+ builder.append("debug=true;").append(LINE_SEPARATOR);
+ builder.append("};").append(LINE_SEPARATOR);
+ }
+
+ return builder.toString();
+ }
+
+ public static void securityPrepare(String principal, String keyTabFile) throws IOException {
+// String filePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator;
+ String filePath = "/home/rsoft/config/";
+ String krbFile = filePath + "krb5.conf";
+ String userKeyTableFile = filePath + keyTabFile;
+
+        // Escape Windows path separators
+ userKeyTableFile = userKeyTableFile.replace("\\", "\\\\");
+ krbFile = krbFile.replace("\\", "\\\\");
+
+ LoginUtil.setKrb5Config(krbFile);
+ LoginUtil.setZookeeperServerPrincipal("zookeeper/hadoop.hadoop.com");
+ LoginUtil.setJaasFile(principal, userKeyTableFile);
+ }
+
+ /**
+ * Check security mode
+ *
+ * @return boolean
+ */
+ public static Boolean isSecurityModel() {
+ Boolean isSecurity = false;
+// String krbFilePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator + "kafkaSecurityMode";
+ String krbFilePath = "/home/rsoft/config/kafkaSecurityMode";
+ Properties securityProps = new Properties();
+
+ // file does not exist.
+ if (!isFileExists(krbFilePath)) {
+ return isSecurity;
+ }
+
+ try {
+ securityProps.load(new FileInputStream(krbFilePath));
+
+ if ("yes".equalsIgnoreCase(securityProps.getProperty("kafka.client.security.mode")))
+ {
+ isSecurity = true;
+ }
+ } catch (Exception e) {
+ LOG.info("The Exception occured : {}.", e);
+ }
+
+ return isSecurity;
+ }
+
+ /*
+     * Checks whether a file exists
+ */
+ private static boolean isFileExists(String fileName) {
+ File file = new File(fileName);
+
+ return file.exists();
+ }
+}
diff --git a/stwzhj-modules/stwzhj-kafka-producer/src/main/resources/application.yml b/stwzhj-modules/stwzhj-kafka-producer/src/main/resources/application.yml
new file mode 100644
index 00000000..445be730
--- /dev/null
+++ b/stwzhj-modules/stwzhj-kafka-producer/src/main/resources/application.yml
@@ -0,0 +1,20 @@
+# Tomcat
+server:
+  port: 9214
+
+# Spring
+spring:
+  application:
+    # Application name
+    name: stwzhj-kafka-producer
+  profiles:
+    # Active profile
+    active: @profiles.active@
+
+# Logging configuration
+logging:
+  level:
+    org.springframework: warn
+    org.apache.kafka: INFO
+    org.dromara.kafka.producer: INFO
+  config: classpath:logback-plus.xml
diff --git a/stwzhj-modules/stwzhj-kafka-producer/src/main/resources/logback-plus.xml b/stwzhj-modules/stwzhj-kafka-producer/src/main/resources/logback-plus.xml
new file mode 100644
index 00000000..5a49af14
--- /dev/null
+++ b/stwzhj-modules/stwzhj-kafka-producer/src/main/resources/logback-plus.xml
@@ -0,0 +1,59 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<configuration>
+
+    <!-- assumed default values -->
+    <property name="log.path" value="./logs"/>
+    <property name="log.maxHistory" value="30"/>
+    <property name="log.maxSize" value="100MB"/>
+    <property name="log.totalSizeCap" value="10GB"/>
+
+    <!-- Console output -->
+    <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <!-- INFO log file -->
+    <appender name="file_info" class="ch.qos.logback.core.rolling.RollingFileAppender">
+        <file>${log.path}/info.log</file>
+        <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
+            <fileNamePattern>${log.path}/info-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
+            <maxHistory>${log.maxHistory}</maxHistory>
+            <maxFileSize>${log.maxSize}</maxFileSize>
+            <totalSizeCap>${log.totalSizeCap}</totalSizeCap>
+        </rollingPolicy>
+        <encoder>
+            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
+        </encoder>
+        <filter class="ch.qos.logback.classic.filter.LevelFilter">
+            <level>INFO</level>
+            <onMatch>ACCEPT</onMatch>
+            <onMismatch>DENY</onMismatch>
+        </filter>
+    </appender>
+
+    <!-- ERROR log file -->
+    <appender name="file_error" class="ch.qos.logback.core.rolling.RollingFileAppender">
+        <file>${log.path}/error.log</file>
+        <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
+            <fileNamePattern>${log.path}/error-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
+            <maxHistory>${log.maxHistory}</maxHistory>
+            <maxFileSize>${log.maxSize}</maxFileSize>
+            <totalSizeCap>${log.totalSizeCap}</totalSizeCap>
+        </rollingPolicy>
+        <encoder>
+            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
+        </encoder>
+        <filter class="ch.qos.logback.classic.filter.LevelFilter">
+            <level>ERROR</level>
+            <onMatch>ACCEPT</onMatch>
+            <onMismatch>DENY</onMismatch>
+        </filter>
+    </appender>
+
+    <root level="INFO">
+        <appender-ref ref="console"/>
+        <appender-ref ref="file_info"/>
+        <appender-ref ref="file_error"/>
+    </root>
+
+</configuration>
+