From c1152bf7d8d367be06f11d98688fac026f40b12d Mon Sep 17 00:00:00 2001 From: luyya Date: Tue, 24 Mar 2026 15:49:51 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=E6=B6=88=E8=B4=B9scram?= =?UTF-8?q?=E8=AE=A4=E8=AF=81kafka=E5=92=8C=E5=8F=91=E9=80=81=E5=8D=8E?= =?UTF-8?q?=E4=B8=BA=E8=AE=A4=E8=AF=81kafka=E6=9C=8D=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- stwzhj-modules/pom.xml | 2 + .../data2kafka/consumer/RealConsumer.java | 12 +- .../stwzhj-kafka-consumer/README.md | 50 ++++ stwzhj-modules/stwzhj-kafka-consumer/pom.xml | 86 ++++++ .../consumer/KafkaConsumerApplication.java | 65 +++++ .../consumer/config/HttpClientConfig.java | 54 ++++ .../consumer/config/KafkaConsumerConfig.java | 104 +++++++ .../consumer/config/MemoryQueueConfig.java | 22 ++ .../kafka/consumer/domain/EsGpsInfo.java | 50 ++++ .../service/BatchMessageProcessor.java | 150 ++++++++++ .../service/KafkaConsumerService.java | 163 +++++++++++ .../kafka/consumer/util/HttpClientUtil.java | 117 ++++++++ .../src/main/resources/application.yml | 72 +++++ .../src/main/resources/logback-plus.xml | 49 ++++ .../stwzhj-kafka-producer/README.md | 51 ++++ stwzhj-modules/stwzhj-kafka-producer/pom.xml | 118 ++++++++ .../producer/KafkaProducerApplication.java | 15 + .../controller/KafkaProducerController.java | 54 ++++ .../kafka/producer/producer/Producer.java | 212 ++++++++++++++ .../service/KafkaProducerService.java | 94 +++++++ .../kafka/producer/util/KafkaProperties.java | 138 ++++++++++ .../kafka/producer/util/LoginUtil.java | 259 ++++++++++++++++++ .../src/main/resources/application.yml | 20 ++ .../src/main/resources/logback-plus.xml | 59 ++++ 24 files changed, 2010 insertions(+), 6 deletions(-) create mode 100644 stwzhj-modules/stwzhj-kafka-consumer/README.md create mode 100644 stwzhj-modules/stwzhj-kafka-consumer/pom.xml create mode 100644 stwzhj-modules/stwzhj-kafka-consumer/src/main/java/org/dromara/kafka/consumer/KafkaConsumerApplication.java create mode 100644 stwzhj-modules/stwzhj-kafka-consumer/src/main/java/org/dromara/kafka/consumer/config/HttpClientConfig.java create mode 100644 stwzhj-modules/stwzhj-kafka-consumer/src/main/java/org/dromara/kafka/consumer/config/KafkaConsumerConfig.java create mode 100644 stwzhj-modules/stwzhj-kafka-consumer/src/main/java/org/dromara/kafka/consumer/config/MemoryQueueConfig.java create mode 100644 stwzhj-modules/stwzhj-kafka-consumer/src/main/java/org/dromara/kafka/consumer/domain/EsGpsInfo.java create mode 100644 stwzhj-modules/stwzhj-kafka-consumer/src/main/java/org/dromara/kafka/consumer/service/BatchMessageProcessor.java create mode 100644 stwzhj-modules/stwzhj-kafka-consumer/src/main/java/org/dromara/kafka/consumer/service/KafkaConsumerService.java create mode 100644 stwzhj-modules/stwzhj-kafka-consumer/src/main/java/org/dromara/kafka/consumer/util/HttpClientUtil.java create mode 100644 stwzhj-modules/stwzhj-kafka-consumer/src/main/resources/application.yml create mode 100644 stwzhj-modules/stwzhj-kafka-consumer/src/main/resources/logback-plus.xml create mode 100644 stwzhj-modules/stwzhj-kafka-producer/README.md create mode 100644 stwzhj-modules/stwzhj-kafka-producer/pom.xml create mode 100644 stwzhj-modules/stwzhj-kafka-producer/src/main/java/org/dromara/kafka/producer/KafkaProducerApplication.java create mode 100644 stwzhj-modules/stwzhj-kafka-producer/src/main/java/org/dromara/kafka/producer/controller/KafkaProducerController.java create mode 100644 stwzhj-modules/stwzhj-kafka-producer/src/main/java/org/dromara/kafka/producer/producer/Producer.java create mode 100644 stwzhj-modules/stwzhj-kafka-producer/src/main/java/org/dromara/kafka/producer/service/KafkaProducerService.java create mode 100644 stwzhj-modules/stwzhj-kafka-producer/src/main/java/org/dromara/kafka/producer/util/KafkaProperties.java create mode 100644 stwzhj-modules/stwzhj-kafka-producer/src/main/java/org/dromara/kafka/producer/util/LoginUtil.java create mode 100644 stwzhj-modules/stwzhj-kafka-producer/src/main/resources/application.yml create mode 100644 stwzhj-modules/stwzhj-kafka-producer/src/main/resources/logback-plus.xml diff --git a/stwzhj-modules/pom.xml b/stwzhj-modules/pom.xml index 21b478f5..7a0d1ffb 100644 --- a/stwzhj-modules/pom.xml +++ b/stwzhj-modules/pom.xml @@ -19,6 +19,8 @@ stwzhj-data2StKafka stwzhj-extract stwzhj-data2gas + stwzhj-kafka-consumer + stwzhj-kafka-producer stwzhj-modules diff --git a/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/consumer/RealConsumer.java b/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/consumer/RealConsumer.java index ccaeca55..5818d1be 100644 --- a/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/consumer/RealConsumer.java +++ b/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/consumer/RealConsumer.java @@ -96,12 +96,12 @@ public class RealConsumer implements CommandLineRunner { } logger.info("Security prepare success."); } -// kafkaProp.put("socket.connection.setup.timeout.ms", "60000"); -// -// kafkaProp.put("security.protocol", "SASL_PLAINTEXT"); -// kafkaProp.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"rsoft\" password=\"rsoft-2026\";"); -// kafkaProp.put("sasl.mechanism", "SCRAM-SHA-256"); -// kafkaProp.put("metadata.max.age.ms", Long.MAX_VALUE); // 彻底禁用元数据更新 + kafkaProp.put("socket.connection.setup.timeout.ms", "60000"); + + kafkaProp.put("security.protocol", "SASL_PLAINTEXT"); + kafkaProp.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"rsoft\" password=\"rsoft-2026\";"); + kafkaProp.put("sasl.mechanism", "SCRAM-SHA-256"); + kafkaProp.put("metadata.max.age.ms", Long.MAX_VALUE); // 彻底禁用元数据更新 KafkaConsumerRunnable runnable = new KafkaConsumerRunnable(kafkaProp,dtpExecutor2,cityCode); executorService.execute(runnable); } diff --git a/stwzhj-modules/stwzhj-kafka-consumer/README.md b/stwzhj-modules/stwzhj-kafka-consumer/README.md new file mode 100644 index 00000000..2b08f9ea --- /dev/null +++ b/stwzhj-modules/stwzhj-kafka-consumer/README.md @@ -0,0 +1,50 @@ +# stwzhj-kafka-consumer 服务说明 + +## 功能描述 + +消费SCRAM认证的Kafka消息,并通过HTTP批量发送给生产者服务。 + +## 启动命令格式 + +```bash +java -jar stwzhj-kafka-consumer.jar +``` + +### 参数说明 + +- `port`: 服务端口 +- `cityName`: 城市名称 +- `bootstrapServers`: Kafka服务器地址,多个地址用逗号分隔 +- `topics`: 订阅的主题,多个主题用逗号分隔 +- `groupId`: 消费者组ID +- `cityCode`: 城市代码 + +### 环境变量 + +- `KAFKA_USERNAME`: Kafka用户名(必填) +- `KAFKA_PASSWORD`: Kafka密码(必填) + +## 启动示例 + +```bash +# 设置环境变量 +export KAFKA_USERNAME=your_username +export KAFKA_PASSWORD=your_password + +# 启动服务 +java -jar stwzhj-kafka-consumer.jar 11000 huangshan 192.168.10.81:9092,192.168.10.82:9092,192.168.10.83:9092 topic.send.2,topic.send.3,topic.send.4,topic.send.5 hs_gp 3410 +``` + +## 高吞吐量优化 + +1. **批量发送**: 将多条消息打包成一批发送,减少HTTP请求次数 +2. **连接池**: 使用Apache HttpClient连接池,复用TCP连接 +3. **压缩传输**: 支持Gzip压缩,减少网络传输数据量 +4. **多线程处理**: 使用3个线程处理消息队列 + +## 注意事项 + +1. 确保生产者服务已启动并正常运行 +2. 根据实际负载情况调整批量发送大小和超时时间 +3. 监控消息队列容量,避免消息丢失 +4. Kafka密码需要在启动脚本中通过环境变量或配置文件传入 diff --git a/stwzhj-modules/stwzhj-kafka-consumer/pom.xml b/stwzhj-modules/stwzhj-kafka-consumer/pom.xml new file mode 100644 index 00000000..2a4db48d --- /dev/null +++ b/stwzhj-modules/stwzhj-kafka-consumer/pom.xml @@ -0,0 +1,86 @@ + + + + org.dromara + stwzhj-modules + ${revision} + + 4.0.0 + + stwzhj-kafka-consumer + + + stwzhj-kafka-consumer 消费SCRAM认证的Kafka消息,并通过内存队列转发给生产者服务 + + + + + + org.springframework.boot + spring-boot-starter-web + + + + + org.apache.kafka + kafka-clients + 3.6.1 + + + + + com.alibaba + fastjson + + + + + org.apache.commons + commons-lang3 + + + + + cn.hutool + hutool-all + 5.4.0 + + + + + org.projectlombok + lombok + + + + + cn.dynamictp + dynamic-tp-spring-boot-starter-common + 1.1.0 + + + org.apache.httpcomponents.client5 + httpclient5 + + + + + ${project.artifactId} + + + org.springframework.boot + spring-boot-maven-plugin + ${spring-boot.version} + + + + repackage + + + + + + + diff --git a/stwzhj-modules/stwzhj-kafka-consumer/src/main/java/org/dromara/kafka/consumer/KafkaConsumerApplication.java b/stwzhj-modules/stwzhj-kafka-consumer/src/main/java/org/dromara/kafka/consumer/KafkaConsumerApplication.java new file mode 100644 index 00000000..73ee126d --- /dev/null +++ b/stwzhj-modules/stwzhj-kafka-consumer/src/main/java/org/dromara/kafka/consumer/KafkaConsumerApplication.java @@ -0,0 +1,65 @@ +package org.dromara.kafka.consumer; + +import org.dromara.kafka.consumer.config.KafkaConsumerConfig; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.core.env.Environment; + +/** + * Kafka消费者服务启动类 + * 消费SCRAM认证的Kafka消息,并通过HTTP批量发送给生产者服务 + * + * 启动命令格式: + * java -jar stwzhj-kafka-consumer.jar + * 例如: + * java -jar stwzhj-kafka-consumer.jar 11000 huangshan 192.168.10.81:9092,192.168.10.82:9092,192.168.10.83:9092 topic.send.2,topic.send.3,topic.send.4,topic.send.5 hs_gp 3410 + */ +@SpringBootApplication +public class KafkaConsumerApplication { + + public static void main(String[] args) { + if (args.length < 6) { + System.err.println("启动参数不足,请使用以下格式:"); + System.err.println("java -jar stwzhj-kafka-consumer.jar "); + System.err.println("例如:"); + System.err.println("java -jar stwzhj-kafka-consumer.jar 11000 huangshan 192.168.10.81:9092,192.168.10.82:9092,192.168.10.83:9092 topic.send.2,topic.send.3,topic.send.4,topic.send.5 hs_gp 3410"); + System.exit(1); + } + + // 解析启动参数 + String port = args[0]; + String cityName = args[1]; + String bootstrapServers = args[2]; + String topics = args[3]; + String groupId = args[4]; + String cityCode = args[5]; + + // 从环境变量获取密码 + String password = System.getenv("KAFKA_PASSWORD"); + if (password == null || password.isEmpty()) { + System.err.println("错误:未设置KAFKA_PASSWORD环境变量"); + System.exit(1); + } + + // 从环境变量获取用户名 + String username = System.getenv("KAFKA_USERNAME"); + if (username == null || username.isEmpty()) { + System.err.println("错误:未设置KAFKA_USERNAME环境变量"); + System.exit(1); + } + + // 设置系统属性,供Spring Boot读取 + System.setProperty("server.port", port); + System.setProperty("kafka.consumer.port", port); + System.setProperty("kafka.consumer.cityName", cityName); + System.setProperty("kafka.consumer.bootstrapServers", bootstrapServers); + System.setProperty("kafka.consumer.topics", topics); + System.setProperty("kafka.consumer.groupId", groupId); + System.setProperty("kafka.consumer.cityCode", cityCode); + System.setProperty("kafka.consumer.username", username); + System.setProperty("kafka.consumer.password", password); + + SpringApplication.run(KafkaConsumerApplication.class, args); + } +} diff --git a/stwzhj-modules/stwzhj-kafka-consumer/src/main/java/org/dromara/kafka/consumer/config/HttpClientConfig.java b/stwzhj-modules/stwzhj-kafka-consumer/src/main/java/org/dromara/kafka/consumer/config/HttpClientConfig.java new file mode 100644 index 00000000..4a5e772f --- /dev/null +++ b/stwzhj-modules/stwzhj-kafka-consumer/src/main/java/org/dromara/kafka/consumer/config/HttpClientConfig.java @@ -0,0 +1,54 @@ +package org.dromara.kafka.consumer.config; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Configuration; + +/** + * HTTP客户端配置 + */ +@Data +@Configuration +@ConfigurationProperties(prefix = "http.client") +public class HttpClientConfig { + + /** + * 生产者服务地址 + */ + private String producerUrl = "http://localhost:9214"; + + /** + * 连接超时时间(ms) + */ + private Integer connectTimeout = 5000; + + /** + * 读取超时时间(ms) + */ + private Integer readTimeout = 10000; + + /** + * 最大连接数 + */ + private Integer maxConnections = 200; + + /** + * 每个路由的最大连接数 + */ + private Integer maxConnectionsPerRoute = 50; + + /** + * 批量发送大小 + */ + private Integer batchSize = 100; + + /** + * 批量发送超时时间(ms) + */ + private Integer batchTimeout = 100; + + /** + * 是否启用压缩 + */ + private Boolean enableCompression = true; +} diff --git a/stwzhj-modules/stwzhj-kafka-consumer/src/main/java/org/dromara/kafka/consumer/config/KafkaConsumerConfig.java b/stwzhj-modules/stwzhj-kafka-consumer/src/main/java/org/dromara/kafka/consumer/config/KafkaConsumerConfig.java new file mode 100644 index 00000000..cd1d989c --- /dev/null +++ b/stwzhj-modules/stwzhj-kafka-consumer/src/main/java/org/dromara/kafka/consumer/config/KafkaConsumerConfig.java @@ -0,0 +1,104 @@ +package org.dromara.kafka.consumer.config; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Configuration; + +/** + * Kafka消费者配置 + */ +@Data +@Configuration +@ConfigurationProperties(prefix = "kafka.consumer") +public class KafkaConsumerConfig { + + /** + * Kafka服务器地址 + */ + private String bootstrapServers; + + /** + * 消费者组ID + */ + private String groupId; + + /** + * 消费者组ID(从启动命令获取) + */ + private String groupName; + + /** + * 订阅的主题,多个主题用逗号分隔 + */ + private String topics; + + /** + * 安全协议:SASL_PLAINTEXT 或 PLAINTEXT + */ + private String securityProtocol = "SASL_PLAINTEXT"; + + /** + * SASL机制:SCRAM-SHA-256 或 PLAIN + */ + private String saslMechanism = "SCRAM-SHA-256"; + + /** + * SASL JAAS配置 + */ + private String saslJaasConfig; + + /** + * 是否自动提交offset + */ + private Boolean enableAutoCommit = true; + + /** + * 自动提交offset的时间间隔(ms) + */ + private Integer autoCommitIntervalMs = 1000; + + /** + * 会话超时时间(ms) + */ + private Integer sessionTimeoutMs = 30000; + + /** + * 一次poll的最大等待时间(ms) + */ + private Integer pollTimeoutMs = 10000; + + /** + * 最大poll记录数 + */ + private Integer maxPollRecords = 500; + + /** + * 城市代码 + */ + private String cityCode = "3408"; + + /** + * 消息队列容量 + */ + private Integer queueCapacity = 100000; + + /** + * 用户名(从启动命令获取) + */ + private String username; + + /** + * 密码(从启动命令获取) + */ + private String password; + + /** + * 城市名称(从启动命令获取) + */ + private String cityName; + + /** + * 服务端口(从启动命令获取) + */ + private Integer port = 9213; +} diff --git a/stwzhj-modules/stwzhj-kafka-consumer/src/main/java/org/dromara/kafka/consumer/config/MemoryQueueConfig.java b/stwzhj-modules/stwzhj-kafka-consumer/src/main/java/org/dromara/kafka/consumer/config/MemoryQueueConfig.java new file mode 100644 index 00000000..6bdd6c26 --- /dev/null +++ b/stwzhj-modules/stwzhj-kafka-consumer/src/main/java/org/dromara/kafka/consumer/config/MemoryQueueConfig.java @@ -0,0 +1,22 @@ +package org.dromara.kafka.consumer.config; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; + +/** + * 内存队列配置 + */ +@Configuration +public class MemoryQueueConfig { + + /** + * 创建内存队列,用于存储从Kafka消费的消息 + */ + @Bean + public BlockingQueue messageQueue(KafkaConsumerConfig kafkaConsumerConfig) { + return new ArrayBlockingQueue<>(kafkaConsumerConfig.getQueueCapacity()); + } +} diff --git a/stwzhj-modules/stwzhj-kafka-consumer/src/main/java/org/dromara/kafka/consumer/domain/EsGpsInfo.java b/stwzhj-modules/stwzhj-kafka-consumer/src/main/java/org/dromara/kafka/consumer/domain/EsGpsInfo.java new file mode 100644 index 00000000..ca5c471b --- /dev/null +++ b/stwzhj-modules/stwzhj-kafka-consumer/src/main/java/org/dromara/kafka/consumer/domain/EsGpsInfo.java @@ -0,0 +1,50 @@ +package org.dromara.kafka.consumer.domain; + +import com.fasterxml.jackson.annotation.JsonFormat; +import lombok.Data; + +import java.io.Serializable; +import java.util.Date; + +/** + *

description:

+ * gps定位信息(es表) + */ +@Data +public class EsGpsInfo implements Serializable { + + private static final long serialVersionUID = 7455495841680488351L; + /** + * 唯一码(外部系统)合肥版本不需要 21位id, + * 到时候上传省厅的时候 需要在kafka发送端处理,生成一个省厅需要的21位id + */ + private String deviceCode; + /** + * 类型 + */ + private String deviceType; + private String lat; + private String lng; + //方向 + private String orientation; + //高程 + private String height; + //精度 + private String deltaH; + private String speed; + + private String zzjgdm; + private String zzjgmc; + private String policeNo; + private String policeName; + private String phoneNum; + private String carNum; + + private Integer online; + + @JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone="GMT+8") + private Date gpsTime; + //3401,3402等地市代码 + private String infoSource; + +} diff --git a/stwzhj-modules/stwzhj-kafka-consumer/src/main/java/org/dromara/kafka/consumer/service/BatchMessageProcessor.java b/stwzhj-modules/stwzhj-kafka-consumer/src/main/java/org/dromara/kafka/consumer/service/BatchMessageProcessor.java new file mode 100644 index 00000000..a046fa8d --- /dev/null +++ b/stwzhj-modules/stwzhj-kafka-consumer/src/main/java/org/dromara/kafka/consumer/service/BatchMessageProcessor.java @@ -0,0 +1,150 @@ +package org.dromara.kafka.consumer.service; + +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import org.dromara.kafka.consumer.config.HttpClientConfig; +import org.dromara.kafka.consumer.util.HttpClientUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * 批量消息处理器 + * 将从Kafka消费的消息批量发送到生产者服务 + */ +@Component +public class BatchMessageProcessor { + + private static final Logger logger = LoggerFactory.getLogger(BatchMessageProcessor.class); + + @Autowired + private HttpClientConfig httpClientConfig; + + @Autowired + private HttpClientUtil httpClientUtil; + + private BlockingQueue messageQueue; + + private ExecutorService executorService; + + private final AtomicBoolean running = new AtomicBoolean(false); + + @PostConstruct + public void init() { + logger.info("初始化批量消息处理器..."); + + // 创建内存队列 + messageQueue = new LinkedBlockingQueue<>(500); + + // 启动处理线程 + executorService = Executors.newFixedThreadPool(3); // 使用3个线程处理消息 + running.set(true); + + for (int i = 0; i < 3; i++) { + executorService.submit(this::processMessages); + } + + logger.info("批量消息处理器启动成功"); + } + + @PreDestroy + public void destroy() { + logger.info("关闭批量消息处理器..."); + running.set(false); + if (executorService != null) { + executorService.shutdown(); + try { + if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) { + executorService.shutdownNow(); + } + } catch (InterruptedException e) { + executorService.shutdownNow(); + Thread.currentThread().interrupt(); + } + } + logger.info("批量消息处理器已关闭"); + } + + /** + * 添加消息到队列 + * + * @param message 消息内容 + * @return 是否添加成功 + */ + public boolean addMessage(String message) { + try { + return messageQueue.offer(message); + } catch (Exception e) { + logger.error("添加消息到队列异常: {}", e.getMessage(), e); + return false; + } + } + + /** + * 处理消息 + */ + private void processMessages() { + logger.info("启动消息处理线程..."); + List batch = new ArrayList<>(httpClientConfig.getBatchSize()); + long lastSendTime = System.currentTimeMillis(); + + while (running.get()) { + try { + // 从队列中获取消息 + String message = messageQueue.poll(100, TimeUnit.MILLISECONDS); + + if (message != null) { + batch.add(message); + } + + // 检查是否需要发送批次 + long currentTime = System.currentTimeMillis(); + boolean shouldSend = batch.size() >= httpClientConfig.getBatchSize() || + (batch.size() > 0 && (currentTime - lastSendTime) >= httpClientConfig.getBatchTimeout()); + + if (shouldSend) { + if (!batch.isEmpty()) { + logger.debug("准备发送批量消息,消息数: {}", batch.size()); + boolean success = httpClientUtil.sendBatchMessages(batch); + + if (success) { + logger.debug("批量发送消息成功,消息数: {}", batch.size()); + } else { + logger.warn("批量发送消息失败,消息数: {}", batch.size()); + // 发送失败,将消息重新放回队列 + for (String msg : batch) { + messageQueue.offer(msg); + } + } + + batch.clear(); + lastSendTime = currentTime; + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + break; + } catch (Exception e) { + logger.error("处理消息异常: {}", e.getMessage(), e); + } + } + + // 发送剩余消息 + if (!batch.isEmpty()) { + logger.info("发送剩余消息,消息数: {}", batch.size()); + httpClientUtil.sendBatchMessages(batch); + } + + logger.info("消息处理线程结束"); + } +} diff --git a/stwzhj-modules/stwzhj-kafka-consumer/src/main/java/org/dromara/kafka/consumer/service/KafkaConsumerService.java b/stwzhj-modules/stwzhj-kafka-consumer/src/main/java/org/dromara/kafka/consumer/service/KafkaConsumerService.java new file mode 100644 index 00000000..4a4e765c --- /dev/null +++ b/stwzhj-modules/stwzhj-kafka-consumer/src/main/java/org/dromara/kafka/consumer/service/KafkaConsumerService.java @@ -0,0 +1,163 @@ +package org.dromara.kafka.consumer.service; + +import cn.hutool.core.convert.ConvertException; +import cn.hutool.json.JSONObject; +import cn.hutool.json.JSONUtil; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.dromara.kafka.consumer.config.KafkaConsumerConfig; +import org.dromara.kafka.consumer.domain.EsGpsInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Properties; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * Kafka消费者服务 + * 消费SCRAM认证的Kafka消息,并通过内存队列转发给生产者服务 + */ +@Service +public class KafkaConsumerService { + + private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerService.class); + + @Autowired + private KafkaConsumerConfig kafkaConsumerConfig; + + @Autowired + private BatchMessageProcessor batchMessageProcessor; + + private KafkaConsumer consumer; + + private ExecutorService executorService; + + private volatile boolean running = false; + + @PostConstruct + public void init() { + logger.info("初始化Kafka消费者服务..."); + Properties props = createConsumerProperties(); + consumer = new KafkaConsumer<>(props); + + // 订阅主题 + String[] topics = kafkaConsumerConfig.getTopics().split(","); + consumer.subscribe(Arrays.asList(topics)); + logger.info("订阅Kafka主题成功: {}", Arrays.toString(topics)); + + // 启动消费者线程 + executorService = Executors.newSingleThreadExecutor(); + running = true; + executorService.submit(this::consumeMessages); + + logger.info("Kafka消费者服务启动成功"); + } + + @PreDestroy + public void destroy() { + logger.info("关闭Kafka消费者服务..."); + running = false; + if (executorService != null) { + executorService.shutdown(); + } + if (consumer != null) { + consumer.close(); + } + logger.info("Kafka消费者服务已关闭"); + } + + /** + * 创建消费者配置属性 + */ + private Properties createConsumerProperties() { + Properties props = new Properties(); + + // 基本配置 + props.put("bootstrap.servers", kafkaConsumerConfig.getBootstrapServers()); + // 使用从启动命令获取的消费者组ID + props.put("group.id", kafkaConsumerConfig.getGroupId()); + props.put("enable.auto.commit", kafkaConsumerConfig.getEnableAutoCommit()); + props.put("auto.commit.interval.ms", kafkaConsumerConfig.getAutoCommitIntervalMs()); + props.put("session.timeout.ms", kafkaConsumerConfig.getSessionTimeoutMs()); + props.put("max.poll.records", kafkaConsumerConfig.getMaxPollRecords()); + + // 序列化配置 + props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + + // 安全配置 + props.put("security.protocol", kafkaConsumerConfig.getSecurityProtocol()); + props.put("sasl.mechanism", kafkaConsumerConfig.getSaslMechanism()); + + // 构建SASL JAAS配置 + String saslJaasConfig = String.format( + "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";", + kafkaConsumerConfig.getUsername(), + kafkaConsumerConfig.getPassword() + ); + props.put("sasl.jaas.config", saslJaasConfig); + props.put("sasl.jaas.config", saslJaasConfig); + + return props; + } + + /** + * 消费消息 + */ + private void consumeMessages() { + logger.info("开始消费Kafka消息..."); + while (running) { + try { + ConsumerRecords records = consumer.poll(Duration.ofMillis(kafkaConsumerConfig.getPollTimeoutMs())); + logger.info("Poll到消息数: {}", records.count()); + + for (ConsumerRecord record : records) { + try { + // 将消息添加到批量处理器 + EsGpsInfo esGpsInfo; + JSONObject jsonObject; + try { + jsonObject = JSONUtil.parseObj(((String) record.value())); + }catch (ConvertException e){ + logger.info("jsonObject=null:error={}",e.getMessage()); + return; + } + try { + esGpsInfo = JSONUtil.toBean(jsonObject, EsGpsInfo.class); + }catch (ConvertException e){ + logger.info("EsGpsInfo=null:error={}",e.getMessage()); + return; + } + esGpsInfo.setInfoSource(kafkaConsumerConfig.getCityCode()); + boolean added = batchMessageProcessor.addMessage(esGpsInfo.toString()); + if (!added) { + logger.warn("消息队列已满,丢弃消息: topic={}, partition={}, offset={}", + record.topic(), record.partition(), record.offset()); + } + } catch (Exception e) { + logger.error("处理消息异常: topic={}, partition={}, offset={}, error={}", + record.topic(), record.partition(), record.offset(), e.getMessage(), e); + } + } + } catch (Exception e) { + logger.error("消费Kafka消息异常: {}", e.getMessage(), e); + try { + Thread.sleep(1000); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + break; + } + } + } + logger.info("停止消费Kafka消息"); + } +} diff --git a/stwzhj-modules/stwzhj-kafka-consumer/src/main/java/org/dromara/kafka/consumer/util/HttpClientUtil.java b/stwzhj-modules/stwzhj-kafka-consumer/src/main/java/org/dromara/kafka/consumer/util/HttpClientUtil.java new file mode 100644 index 00000000..e09b67c1 --- /dev/null +++ b/stwzhj-modules/stwzhj-kafka-consumer/src/main/java/org/dromara/kafka/consumer/util/HttpClientUtil.java @@ -0,0 +1,117 @@ +package org.dromara.kafka.consumer.util; + +import com.alibaba.fastjson.JSON; +import jakarta.annotation.PostConstruct; +import org.apache.hc.client5.http.classic.HttpClient; +import org.apache.hc.client5.http.config.ConnectionConfig; +import org.apache.hc.client5.http.impl.classic.HttpClients; +import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager; +import org.apache.hc.core5.http.io.SocketConfig; +import org.apache.hc.core5.util.Timeout; +import org.dromara.kafka.consumer.config.HttpClientConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.client.HttpComponentsClientHttpRequestFactory; +import org.springframework.stereotype.Component; +import org.springframework.web.client.RestTemplate; +import org.springframework.http.*; +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** + * HTTP客户端工具类 + * 用于发送批量消息到生产者服务 + */ +@Component +public class HttpClientUtil { + + private static final Logger logger = LoggerFactory.getLogger(HttpClientUtil.class); + + @Autowired + private HttpClientConfig httpClientConfig; + + private RestTemplate restTemplate; + + @PostConstruct + public void init() { + try { + logger.info("初始化HTTP客户端..."); + + // 创建连接管理器(HttpClient 5.x) + PoolingHttpClientConnectionManager connectionManager = + new PoolingHttpClientConnectionManager(); + + // 设置最大连接数 + connectionManager.setMaxTotal(httpClientConfig.getMaxConnections()); + // 设置每个路由的最大连接数 + connectionManager.setDefaultMaxPerRoute(httpClientConfig.getMaxConnectionsPerRoute()); + + // 配置连接超时 + ConnectionConfig connectionConfig = ConnectionConfig.custom() + .setConnectTimeout(Timeout.of(httpClientConfig.getConnectTimeout(), TimeUnit.MILLISECONDS)) + .build(); + connectionManager.setDefaultConnectionConfig(connectionConfig); + + // 配置 Socket 超时(读写超时) + SocketConfig socketConfig = SocketConfig.custom() + .setSoTimeout(Timeout.of(httpClientConfig.getReadTimeout(), TimeUnit.MILLISECONDS)) + .build(); + connectionManager.setDefaultSocketConfig(socketConfig); + + // 创建 HttpClient + HttpClient httpClient = HttpClients.custom() + .setConnectionManager(connectionManager) + .build(); + + // 创建 HttpComponentsClientHttpRequestFactory + HttpComponentsClientHttpRequestFactory factory = + new HttpComponentsClientHttpRequestFactory(httpClient); + + // 设置请求超时 + factory.setConnectTimeout(httpClientConfig.getConnectTimeout()); + + restTemplate = new RestTemplate(factory); + + logger.info("HTTP客户端初始化成功,连接池配置:最大连接数={}, 每路由最大连接数={}", + httpClientConfig.getMaxConnections(), + httpClientConfig.getMaxConnectionsPerRoute()); + + } catch (Exception e) { + logger.error("HTTP客户端初始化失败", e); + throw new RuntimeException("HTTP客户端初始化失败", e); + } + } + + /** + * 批量发送消息到生产者服务 + * + * @param messages 消息列表 + * @return 是否发送成功 + */ + public boolean sendBatchMessages(List messages) { + try { + String url = httpClientConfig.getProducerUrl() + "/api/kafka/batch"; + + HttpHeaders headers = new HttpHeaders(); + headers.setContentType(MediaType.APPLICATION_JSON); + if (httpClientConfig.getEnableCompression()) { + headers.add("Content-Encoding", "gzip"); + } + + HttpEntity> request = new HttpEntity<>(messages, headers); + ResponseEntity response = restTemplate.exchange(url, HttpMethod.POST, request, String.class); + + if (response.getStatusCode() == HttpStatus.OK) { + logger.debug("批量发送消息成功,消息数: {}", messages.size()); + return true; + } else { + logger.warn("批量发送消息失败,状态码: {}, 响应: {}", response.getStatusCode(), response.getBody()); + return false; + } + } catch (Exception e) { + logger.error("批量发送消息异常,消息数: {}, 错误: {}", messages.size(), e.getMessage(), e); + return false; + } + } +} diff --git a/stwzhj-modules/stwzhj-kafka-consumer/src/main/resources/application.yml b/stwzhj-modules/stwzhj-kafka-consumer/src/main/resources/application.yml new file mode 100644 index 00000000..45aba0a3 --- /dev/null +++ b/stwzhj-modules/stwzhj-kafka-consumer/src/main/resources/application.yml @@ -0,0 +1,72 @@ +# Tomcat +server: + port: ${kafka.consumer.port:9213} + +# Spring +spring: + application: + # 应用名称 + name: stwzhj-kafka-consumer + profiles: + # 环境配置 + active: @profiles.active@ + +# Kafka消费者配置 +kafka: + consumer: + # Kafka服务器地址(通过启动命令传入) + bootstrap-servers: ${kafka.consumer.bootstrapServers:} + # 消费者组ID(通过启动命令传入) + group-id: ${kafka.consumer.groupId:} + # 订阅的主题,多个主题用逗号分隔(通过启动命令传入) + topics: ${kafka.consumer.topics:} + # 安全协议 + security-protocol: SASL_PLAINTEXT + # SASL机制 + sasl-mechanism: SCRAM-SHA-256 + # 是否自动提交offset + enable-auto-commit: true + # 自动提交offset的时间间隔(ms) + auto-commit-interval-ms: 1000 + # 会话超时时间(ms) + session-timeout-ms: 30000 + # 一次poll的最大等待时间(ms) + poll-timeout-ms: 10000 + # 最大poll记录数 + max-poll-records: 500 + # 城市代码(通过启动命令传入) + city-code: ${kafka.consumer.cityCode:3408} + # 用户名(通过启动命令传入) + username: ${kafka.consumer.username:} + # 密码(通过启动命令传入) + password: ${kafka.consumer.password:} + # 消息队列容量 + queue-capacity: 100000 + +# HTTP客户端配置 +http: + client: + # 生产者服务地址 + producer-url: http://localhost:9214 + # 连接超时时间(ms) + connect-timeout: 5000 + # 读取超时时间(ms) + read-timeout: 10000 + # 最大连接数 + max-connections: 200 + # 每个路由的最大连接数 + max-connections-per-route: 50 + # 批量发送大小 + batch-size: 100 + # 批量发送超时时间(ms) + batch-timeout: 100 + # 是否启用压缩 + enable-compression: true + +# 日志配置 +logging: + level: + org.springframework: warn + org.apache.kafka: INFO + org.dromara.kafka.consumer: INFO + config: classpath:logback-plus.xml diff --git a/stwzhj-modules/stwzhj-kafka-consumer/src/main/resources/logback-plus.xml b/stwzhj-modules/stwzhj-kafka-consumer/src/main/resources/logback-plus.xml new file mode 100644 index 00000000..c2ec3dad --- /dev/null +++ b/stwzhj-modules/stwzhj-kafka-consumer/src/main/resources/logback-plus.xml @@ -0,0 +1,49 @@ + + + + + + + + + + + ${log.path}/info.${log.file}.log + + INFO + ACCEPT + DENY + + + ${log.path}/info/info.${log.file}.%d{yyyy-MM-dd}.%i.log.gz + ${MAX_FILE_SIZE} + ${MAX_HISTORY} + + + %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + ${log.path}/error.${log.file}.log + + ERROR + + + ${log.path}/error/error.${log.file}.%d{yyyy-MM-dd}.%i.log.gz + ${MAX_FILE_SIZE} + ${MAX_HISTORY} + + + %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + + + + diff --git a/stwzhj-modules/stwzhj-kafka-producer/README.md b/stwzhj-modules/stwzhj-kafka-producer/README.md new file mode 100644 index 00000000..d43d118c --- /dev/null +++ b/stwzhj-modules/stwzhj-kafka-producer/README.md @@ -0,0 +1,51 @@ +# stwzhj-kafka-producer 服务说明 + +## 功能描述 + +接收来自消费者服务的HTTP请求,并将消息发送到华为认证的Kafka。 + +## 配置说明 + +本服务的华为Kafka认证配置从服务器上的以下文件读取: +- `/shengting/gpsstore/producer.properties`: Kafka生产者配置 +- `/shengting/gpsstore/krb5.conf`: Kerberos配置文件 +- `/shengting/gpsstore/user.keytab`: Kerberos认证keytab文件 +- `/shengting/gpsstore/kafkaSecurityMode`: 安全模式开关 + +这些配置文件与原服务保持一致,确保华为Kafka认证完全兼容。 + +## 启动示例 + +```bash +java -jar stwzhj-kafka-producer.jar +``` + +## 高吞吐量优化 + +1. **异步发送**: 支持异步发送消息,提高并发处理能力 +2. **批量发送**: 配合Kafka的批量发送机制,提高吞吐量 +3. **连接复用**: HTTP连接复用,减少连接建立开销 + +## 注意事项 + +1. 确保服务器上的华为Kafka认证配置文件存在且配置正确 +2. 服务启动时会自动读取服务器上的配置文件,无需手动配置 +3. 监控Kafka发送成功率,及时发现和处理异常 + +## API接口 + +### 批量接收消息 + +**URL**: `/api/kafka/batch` + +**方法**: POST + +**请求体**: JSON数组,包含多条消息 + +```json +["message1", "message2", "message3"] +``` + +**响应**: +- 成功: 200 OK, "发送成功" +- 失败: 500 Internal Server Error, "发送失败" 或 "处理异常: xxx" diff --git a/stwzhj-modules/stwzhj-kafka-producer/pom.xml b/stwzhj-modules/stwzhj-kafka-producer/pom.xml new file mode 100644 index 00000000..acc2dd75 --- /dev/null +++ b/stwzhj-modules/stwzhj-kafka-producer/pom.xml @@ -0,0 +1,118 @@ + + + + org.dromara + stwzhj-modules + ${revision} + + 4.0.0 + + stwzhj-kafka-producer + + + stwzhj-kafka-producer 从内存队列获取消息,并发送到华为认证的Kafka + + + + + + org.springframework.boot + spring-boot-starter-web + + + + org.apache.kafka + kafka_2.12 + 3.6.1-h0.cbu.mrs.350.r11 + + + org.apache.zookeeper + zookeeper + + + net.sf.jopt-simple + jopt-simple + + + com.huawei.mrs + manager-wc2frm + + + org.apache.kafka + kafka-clients + + + org.xerial.snappy + snappy-java + + + com.huawei.mrs + om-controller-api + + + com.101tec + zkclient + + + + + + + org.apache.kafka + kafka-clients + 3.6.1-h0.cbu.mrs.350.r11 + + + + + com.alibaba + fastjson + + + + + org.apache.commons + commons-lang3 + + + + + cn.hutool + hutool-all + 5.4.0 + + + + + org.projectlombok + lombok + + + + + cn.dynamictp + dynamic-tp-spring-boot-starter-common + 1.1.0 + + + + + ${project.artifactId} + + + org.springframework.boot + spring-boot-maven-plugin + ${spring-boot.version} + + + + repackage + + + + + + + diff --git a/stwzhj-modules/stwzhj-kafka-producer/src/main/java/org/dromara/kafka/producer/KafkaProducerApplication.java b/stwzhj-modules/stwzhj-kafka-producer/src/main/java/org/dromara/kafka/producer/KafkaProducerApplication.java new file mode 100644 index 00000000..32785981 --- /dev/null +++ b/stwzhj-modules/stwzhj-kafka-producer/src/main/java/org/dromara/kafka/producer/KafkaProducerApplication.java @@ -0,0 +1,15 @@ +package org.dromara.kafka.producer; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +/** + * Kafka生产者服务启动类 + * 从内存队列获取消息,并发送到华为认证的Kafka + */ +@SpringBootApplication +public class KafkaProducerApplication { + public static void main(String[] args) { + SpringApplication.run(KafkaProducerApplication.class, args); + } +} diff --git a/stwzhj-modules/stwzhj-kafka-producer/src/main/java/org/dromara/kafka/producer/controller/KafkaProducerController.java b/stwzhj-modules/stwzhj-kafka-producer/src/main/java/org/dromara/kafka/producer/controller/KafkaProducerController.java new file mode 100644 index 00000000..7673bdc0 --- /dev/null +++ b/stwzhj-modules/stwzhj-kafka-producer/src/main/java/org/dromara/kafka/producer/controller/KafkaProducerController.java @@ -0,0 +1,54 @@ +package org.dromara.kafka.producer.controller; + +import org.dromara.kafka.producer.service.KafkaProducerService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.util.List; + +/** + * Kafka生产者控制器 + * 接收来自消费者服务的HTTP请求,并将消息发送到华为Kafka + */ +@RestController +@RequestMapping("/api/kafka") +public class KafkaProducerController { + + private static final Logger logger = LoggerFactory.getLogger(KafkaProducerController.class); + + @Autowired + private KafkaProducerService kafkaProducerService; + + /** + * 批量接收消息并发送到Kafka + * + * @param messages 消息列表 + * @return 处理结果 + */ + @PostMapping("/batch") + public ResponseEntity batchSend(@RequestBody List messages) { + try { + logger.info("接收到批量消息,消息数: {}", messages.size()); + + // 批量发送消息到Kafka + boolean success = kafkaProducerService.sendBatchMessages(messages); + + if (success) { + logger.info("批量发送消息成功,消息数: {}", messages.size()); + return ResponseEntity.ok("发送成功"); + } else { + logger.warn("批量发送消息失败,消息数: {}", messages.size()); + return ResponseEntity.status(500).body("发送失败"); + } + } catch (Exception e) { + logger.error("处理批量消息异常,消息数: {}, 错误: {}", messages.size(), e.getMessage(), e); + return ResponseEntity.status(500).body("处理异常: " + e.getMessage()); + } + } +} diff --git a/stwzhj-modules/stwzhj-kafka-producer/src/main/java/org/dromara/kafka/producer/producer/Producer.java b/stwzhj-modules/stwzhj-kafka-producer/src/main/java/org/dromara/kafka/producer/producer/Producer.java new file mode 100644 index 00000000..0fca9553 --- /dev/null +++ b/stwzhj-modules/stwzhj-kafka-producer/src/main/java/org/dromara/kafka/producer/producer/Producer.java @@ -0,0 +1,212 @@ +package org.dromara.kafka.producer.producer; + +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.dromara.kafka.producer.util.KafkaProperties; +import org.dromara.kafka.producer.util.LoginUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import java.io.IOException; +import java.util.Properties; +import java.util.concurrent.Future; + +/** + *

description:

+ * + * @author chenle + * @date 2021-11-03 14:15 + */ +@Component +public class Producer { + + private static final Logger logger = LoggerFactory.getLogger(Producer.class); + + private final KafkaProducer producer; + + // 私有静态实例(volatile 保证可见性和有序性) + private static volatile Producer instance; + + + + private final Boolean isAsync = true; + + + + // Broker地址列表 + private final static String BOOTSTRAP_SERVER = "bootstrap.servers"; + + // 客户端ID + private final static String CLIENT_ID = "client.id"; + + // Key序列化类 + private final static String KEY_SERIALIZER = "key.serializer"; + + // Value序列化类 + private final static String VALUE_SERIALIZER = "value.serializer"; + + // 协议类型:当前支持配置为SASL_PLAINTEXT或者PLAINTEXT + private final static String SECURITY_PROTOCOL = "security.protocol"; + + // 服务名 + private final static String SASL_KERBEROS_SERVICE_NAME = "sasl.kerberos.service.name"; + + // 域名 + private final static String KERBEROS_DOMAIN_NAME = "kerberos.domain.name"; + + // 分区类名 + private final static String PARTITIONER_NAME = "partitioner.class"; + + // 默认发送100条消息 + private final static int MESSAGE_NUM = 100; + + /** + * 用户自己申请的机机账号keytab文件名称 + */ + private static final String USER_KEYTAB_FILE = "user.keytab"; + + /** + * 用户自己申请的机机账号名称 + */ + private static final String USER_PRINCIPAL = "yhy_ahrs_rcw"; + + + + /** + * Producer constructor + * + */ + public Producer() { + initSecurity(); + Properties props = initProperties(); + this.producer = new KafkaProducer<>(props); + } + + // 获取单例实例的公共方法(双重校验锁) + public static Producer getInstance() { + if (instance == null) { + synchronized (Producer.class) { + if (instance == null) { + instance = new Producer(); + } + } + } + return instance; + } + + // 添加 ShutdownHook 确保资源释放(推荐) + static { + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + if (instance != null && instance.producer != null) { + instance.producer.close(); + } + })); + } + + + /** + * 初始化安全认证 + */ + public void initSecurity() { + if (LoginUtil.isSecurityModel()) + { + try { + logger.info("Securitymode start."); + + // !!注意,安全认证时,需要用户手动修改为自己申请的机机账号 + LoginUtil.securityPrepare(USER_PRINCIPAL, USER_KEYTAB_FILE); + } catch (IOException e) { + logger.error("Security prepare failure."); + logger.error("The IOException occured.", e); + } + logger.info("Security prepare success."); + } + } + + public static Properties initProperties() { + Properties props = new Properties(); + KafkaProperties kafkaProc = KafkaProperties.getInstance(); + + // Broker地址列表 + props.put(BOOTSTRAP_SERVER, kafkaProc.getValues(BOOTSTRAP_SERVER, "localhost:21007")); + // 客户端ID + props.put(CLIENT_ID, kafkaProc.getValues(CLIENT_ID, "DemoProducer")); + // Key序列化类 + props.put(KEY_SERIALIZER, + kafkaProc.getValues(KEY_SERIALIZER, "org.apache.kafka.common.serialization.StringSerializer")); + // Value序列化类 + props.put(VALUE_SERIALIZER, + kafkaProc.getValues(VALUE_SERIALIZER, "org.apache.kafka.common.serialization.StringSerializer")); + // 协议类型:当前支持配置为SASL_PLAINTEXT或者PLAINTEXT + props.put(SECURITY_PROTOCOL, kafkaProc.getValues(SECURITY_PROTOCOL, "SASL_PLAINTEXT")); + // 服务名 + props.put(SASL_KERBEROS_SERVICE_NAME, "kafka"); + // 域名 + props.put(KERBEROS_DOMAIN_NAME, kafkaProc.getValues(KERBEROS_DOMAIN_NAME, "hadoop.hadoop.com")); + // 分区类名 +// props.put(PARTITIONER_NAME, kafkaProc.getValues(PARTITIONER_NAME, "com.huawei.bigdata.kafka.example.SimplePartitioner")); + return props; + } + + /** + * 发送消息(核心方法) + * + * @param topic + * @param message 消息内容 + * @return 同步发送时返回 RecordMetadata,异步发送返回 null + */ + public RecordMetadata sendMessage(String topic, String message) { + try { + logger.info("调用发送:topic={}, Object={}",topic,message ); + long startTime = System.currentTimeMillis(); + ProducerRecord record = new ProducerRecord<>(topic, message); + if (isAsync) { + // 异步发送 + producer.send(record, new DemoCallBack(startTime,topic, message)); + return null; + } else { + Future future = producer.send(record); + logger.info("同步发送成功: Object={}", future.get().topic()); + return future.get(); + + } + }catch (Exception e){ + e.printStackTrace(); + } + return null; + } + + + + /** + * 内部回调类 + */ + private static class DemoCallBack implements Callback { + private final Logger logger = LoggerFactory.getLogger(DemoCallBack.class); + private final long startTime; + + private final String topic; + private final String message; + + public DemoCallBack(long startTime, String topic, String message) { + this.startTime = startTime; + this.topic = topic; + this.message = message; + } + + @Override + public void onCompletion(RecordMetadata metadata, Exception exception) { + long elapsedTime = System.currentTimeMillis() - startTime; + if (metadata != null) { + logger.info("topic=({}, {}) sent to partition({}), offset({}) in {} ms", + topic, message, metadata.partition(), metadata.offset(), elapsedTime); + } else if (exception != null) { + logger.error("Message sending failed", exception); + } + } + } + +} diff --git a/stwzhj-modules/stwzhj-kafka-producer/src/main/java/org/dromara/kafka/producer/service/KafkaProducerService.java b/stwzhj-modules/stwzhj-kafka-producer/src/main/java/org/dromara/kafka/producer/service/KafkaProducerService.java new file mode 100644 index 00000000..0440463b --- /dev/null +++ b/stwzhj-modules/stwzhj-kafka-producer/src/main/java/org/dromara/kafka/producer/service/KafkaProducerService.java @@ -0,0 +1,94 @@ +package org.dromara.kafka.producer.service; + +import jakarta.annotation.PostConstruct; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.dromara.kafka.producer.producer.Producer; +import org.dromara.kafka.producer.util.KafkaProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; + +import java.util.List; + + +/** + * Kafka生产者服务 + * 将消息发送到华为认证的Kafka + */ +@Service +public class KafkaProducerService { + + private static final Logger logger = LoggerFactory.getLogger(KafkaProducerService.class); + private Producer producer; + + + + @PostConstruct + public void init() { + logger.info("初始化Kafka生产者服务..."); + producer = Producer.getInstance(); + logger.info("Kafka生产者服务初始化成功"); + } + + /** + * 批量发送消息到Kafka + * + * @param messages 消息列表 + * @return 是否全部发送成功 + */ + /** + * 批量发送消息到Kafka + * + * @param messages 消息列表 + * @return 是否全部发送成功 + */ + public boolean sendBatchMessages(List messages) { + if (messages == null || messages.isEmpty()) { + logger.warn("消息列表为空,不发送"); + return true; + } + + try { + String topic = KafkaProperties.TOPIC; + boolean allSuccess = true; + int successCount = 0; + int failCount = 0; + + for (String message : messages) { + try { + // 使用Producer单例发送消息(异步发送) + RecordMetadata metadata = producer.sendMessage(topic, message); + // 异步发送时metadata为null,表示发送请求已提交 + if (metadata == null) { + successCount++; + } else { + // 同步发送的情况(但当前配置是异步) + successCount++; + logger.info("同步发送成功: topic={}, offset={}", metadata.topic(), metadata.offset()); + } + } catch (Exception e) { + logger.error("发送消息异常: {}", e.getMessage(), e); + allSuccess = false; + failCount++; + } + } + + logger.info("批量发送消息完成,总数: {}, 成功: {}, 失败: {}", messages.size(), successCount, failCount); + + // 异步发送需要等待一段时间确保消息发送完成 + if (successCount > 0) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.warn("等待消息发送完成时被中断"); + } + } + + return allSuccess; + } catch (Exception e) { + logger.error("批量发送消息异常: {}", e.getMessage(), e); + return false; + } + } +} diff --git a/stwzhj-modules/stwzhj-kafka-producer/src/main/java/org/dromara/kafka/producer/util/KafkaProperties.java b/stwzhj-modules/stwzhj-kafka-producer/src/main/java/org/dromara/kafka/producer/util/KafkaProperties.java new file mode 100644 index 00000000..0439e1b8 --- /dev/null +++ b/stwzhj-modules/stwzhj-kafka-producer/src/main/java/org/dromara/kafka/producer/util/KafkaProperties.java @@ -0,0 +1,138 @@ +package org.dromara.kafka.producer.util; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.util.Properties; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class KafkaProperties +{ + private static final Logger LOG = LoggerFactory.getLogger(KafkaProperties.class); + + // Topic名称,安全模式下,需要以管理员用户添加当前用户的访问权限 + public final static String TOPIC = "jysb_dwxx"; + + private static Properties serverProps = new Properties(); + + private static Properties producerProps = new Properties(); + + private static Properties consumerProps = new Properties(); + + private static Properties clientProps = new Properties(); + + private static KafkaProperties instance = null; + + private KafkaProperties() + { +// String filePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator; + String filePath = "/home/rsoft/config/"; + LOG.info("路径=={}",filePath); + try + { + File proFile = new File(filePath + "producer.properties"); + + if (proFile.exists()) + { + producerProps.load(new FileInputStream(filePath + "producer.properties")); + } + + File conFile = new File(filePath + "producer.properties"); + + if (conFile.exists()) + { + consumerProps.load(new FileInputStream(filePath + "consumer.properties")); + } + + File serFile = new File(filePath + "server.properties"); + + if (serFile.exists()) + { + serverProps.load(new FileInputStream(filePath + "server.properties")); + } + + File cliFile = new File(filePath + "client.properties"); + + if (cliFile.exists()) + { + clientProps.load(new FileInputStream(filePath + "client.properties")); + } + } + catch (IOException e) + { + LOG.info("The Exception occured.", e); + } + } + + public synchronized static KafkaProperties getInstance() + { + if (null == instance) + { + instance = new KafkaProperties(); + } + + return instance; + } + + /** + * 获取参数值 + * @param key properites的key值 + * @param defValue 默认值 + * @return + */ + public String getValues(String key, String defValue) + { + String rtValue = null; + + if (null == key) + { + LOG.error("key is null"); + } + else + { + rtValue = getPropertiesValue(key); + } + + if (null == rtValue) + { + LOG.warn("KafkaProperties.getValues return null, key is " + key); + rtValue = defValue; + } + + LOG.info("KafkaProperties.getValues: key is " + key + "; Value is " + rtValue); + + return rtValue; + } + + /** + * 根据key值获取server.properties的值 + * @param key + * @return + */ + private String getPropertiesValue(String key) + { + String rtValue = serverProps.getProperty(key); + + // server.properties中没有,则再向producer.properties中获取 + if (null == rtValue) + { + rtValue = producerProps.getProperty(key); + } + + // producer中没有,则再向consumer.properties中获取 + if (null == rtValue) + { + rtValue = consumerProps.getProperty(key); + } + + // consumer没有,则再向client.properties中获取 + if (null == rtValue) + { + rtValue = clientProps.getProperty(key); + } + + return rtValue; + } +} diff --git a/stwzhj-modules/stwzhj-kafka-producer/src/main/java/org/dromara/kafka/producer/util/LoginUtil.java b/stwzhj-modules/stwzhj-kafka-producer/src/main/java/org/dromara/kafka/producer/util/LoginUtil.java new file mode 100644 index 00000000..6bfb825e --- /dev/null +++ b/stwzhj-modules/stwzhj-kafka-producer/src/main/java/org/dromara/kafka/producer/util/LoginUtil.java @@ -0,0 +1,259 @@ +package org.dromara.kafka.producer.util; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileWriter; +import java.io.IOException; +import java.util.Properties; + +public class LoginUtil { + private static final Logger LOG = LoggerFactory.getLogger(LoginUtil.class); + + /** + * no JavaDoc + */ + public enum Module { + KAFKA("KafkaClient"), ZOOKEEPER("Client"); + + private String name; + + private Module(String name) + { + this.name = name; + } + + public String getName() + { + return name; + } + } + + /** + * line operator string + */ + private static final String LINE_SEPARATOR = System.getProperty("line.separator"); + + /** + * jaas file postfix + */ + private static final String JAAS_POSTFIX = ".jaas.conf"; + + /** + * is IBM jdk or not + */ + private static final boolean IS_IBM_JDK = System.getProperty("java.vendor").contains("IBM"); + + /** + * IBM jdk login module + */ + private static final String IBM_LOGIN_MODULE = "com.ibm.security.auth.module.Krb5LoginModule required"; + + /** + * oracle jdk login module + */ + private static final String SUN_LOGIN_MODULE = "com.sun.security.auth.module.Krb5LoginModule required"; + + /** + * Zookeeper quorum principal. + */ + public static final String ZOOKEEPER_AUTH_PRINCIPAL = "zookeeper.server.principal"; + + /** + * java security krb5 file path + */ + public static final String JAVA_SECURITY_KRB5_CONF = "java.security.krb5.conf"; + + /** + * java security login file path + */ + public static final String JAVA_SECURITY_LOGIN_CONF = "java.security.auth.login.config"; + + /** + * 设置jaas.conf文件 + * + * @param principal + * @param keytabPath + * @throws IOException + */ + public static void setJaasFile(String principal, String keytabPath) + throws IOException { + String jaasPath = + new File(System.getProperty("java.io.tmpdir")) + File.separator + System.getProperty("user.name") + + JAAS_POSTFIX; + + // windows路径下分隔符替换 + jaasPath = jaasPath.replace("\\", "\\\\"); + // 删除jaas文件 + deleteJaasFile(jaasPath); + writeJaasFile(jaasPath, principal, keytabPath); + System.setProperty(JAVA_SECURITY_LOGIN_CONF, jaasPath); + } + + /** + * 设置zookeeper服务端principal + * + * @param zkServerPrincipal + * @throws IOException + */ + public static void setZookeeperServerPrincipal(String zkServerPrincipal) + throws IOException { + System.setProperty(ZOOKEEPER_AUTH_PRINCIPAL, zkServerPrincipal); + String ret = System.getProperty(ZOOKEEPER_AUTH_PRINCIPAL); + if (ret == null) + { + throw new IOException(ZOOKEEPER_AUTH_PRINCIPAL + " is null."); + } + if (!ret.equals(zkServerPrincipal)) + { + throw new IOException(ZOOKEEPER_AUTH_PRINCIPAL + " is " + ret + " is not " + zkServerPrincipal + "."); + } + } + + /** + * 设置krb5文件 + * + * @param krb5ConfFile + * @throws IOException + */ + public static void setKrb5Config(String krb5ConfFile) + throws IOException { + System.setProperty(JAVA_SECURITY_KRB5_CONF, krb5ConfFile); + String ret = System.getProperty(JAVA_SECURITY_KRB5_CONF); + if (ret == null) + { + throw new IOException(JAVA_SECURITY_KRB5_CONF + " is null."); + } + if (!ret.equals(krb5ConfFile)) + { + throw new IOException(JAVA_SECURITY_KRB5_CONF + " is " + ret + " is not " + krb5ConfFile + "."); + } + } + + /** + * 写入jaas文件 + * + * @throws IOException + * 写文件异常 + */ + private static void writeJaasFile(String jaasPath, String principal, String keytabPath) + throws IOException { + FileWriter writer = new FileWriter(new File(jaasPath)); + try + { + writer.write(getJaasConfContext(principal, keytabPath)); + writer.flush(); + } + catch (IOException e) + { + throw new IOException("Failed to create jaas.conf File"); + } + finally + { + writer.close(); + } + } + + private static void deleteJaasFile(String jaasPath) + throws IOException { + File jaasFile = new File(jaasPath); + if (jaasFile.exists()) + { + if (!jaasFile.delete()) + { + throw new IOException("Failed to delete exists jaas file."); + } + } + } + + private static String getJaasConfContext(String principal, String keytabPath) { + Module[] allModule = Module.values(); + StringBuilder builder = new StringBuilder(); + for (Module modlue : allModule) + { + builder.append(getModuleContext(principal, keytabPath, modlue)); + } + return builder.toString(); + } + + private static String getModuleContext(String userPrincipal, String keyTabPath, Module module) { + StringBuilder builder = new StringBuilder(); + if (IS_IBM_JDK) { + builder.append(module.getName()).append(" {").append(LINE_SEPARATOR); + builder.append(IBM_LOGIN_MODULE).append(LINE_SEPARATOR); + builder.append("credsType=both").append(LINE_SEPARATOR); + builder.append("principal=\"" + userPrincipal + "\"").append(LINE_SEPARATOR); + builder.append("useKeytab=\"" + keyTabPath + "\"").append(LINE_SEPARATOR); + builder.append("debug=true;").append(LINE_SEPARATOR); + builder.append("};").append(LINE_SEPARATOR); + } else { + builder.append(module.getName()).append(" {").append(LINE_SEPARATOR); + builder.append(SUN_LOGIN_MODULE).append(LINE_SEPARATOR); + builder.append("useKeyTab=true").append(LINE_SEPARATOR); + builder.append("keyTab=\"" + keyTabPath + "\"").append(LINE_SEPARATOR); + builder.append("principal=\"" + userPrincipal + "\"").append(LINE_SEPARATOR); + builder.append("useTicketCache=false").append(LINE_SEPARATOR); + builder.append("storeKey=true").append(LINE_SEPARATOR); + builder.append("debug=true;").append(LINE_SEPARATOR); + builder.append("};").append(LINE_SEPARATOR); + } + + return builder.toString(); + } + + public static void securityPrepare(String principal, String keyTabFile) throws IOException { +// String filePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator; + String filePath = "/home/rsoft/config/"; + String krbFile = filePath + "krb5.conf"; + String userKeyTableFile = filePath + keyTabFile; + + // windows路径下分隔符替换 + userKeyTableFile = userKeyTableFile.replace("\\", "\\\\"); + krbFile = krbFile.replace("\\", "\\\\"); + + LoginUtil.setKrb5Config(krbFile); + LoginUtil.setZookeeperServerPrincipal("zookeeper/hadoop.hadoop.com"); + LoginUtil.setJaasFile(principal, userKeyTableFile); + } + + /** + * Check security mode + * + * @return boolean + */ + public static Boolean isSecurityModel() { + Boolean isSecurity = false; +// String krbFilePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator + "kafkaSecurityMode"; + String krbFilePath = "/home/rsoft/config/kafkaSecurityMode"; + Properties securityProps = new Properties(); + + // file does not exist. + if (!isFileExists(krbFilePath)) { + return isSecurity; + } + + try { + securityProps.load(new FileInputStream(krbFilePath)); + + if ("yes".equalsIgnoreCase(securityProps.getProperty("kafka.client.security.mode"))) + { + isSecurity = true; + } + } catch (Exception e) { + LOG.info("The Exception occured : {}.", e); + } + + return isSecurity; + } + + /* + * 判断文件是否存在 + */ + private static boolean isFileExists(String fileName) { + File file = new File(fileName); + + return file.exists(); + } +} diff --git a/stwzhj-modules/stwzhj-kafka-producer/src/main/resources/application.yml b/stwzhj-modules/stwzhj-kafka-producer/src/main/resources/application.yml new file mode 100644 index 00000000..445be730 --- /dev/null +++ b/stwzhj-modules/stwzhj-kafka-producer/src/main/resources/application.yml @@ -0,0 +1,20 @@ +# Tomcat +server: + port: 9214 + +# Spring +spring: + application: + # 应用名称 + name: stwzhj-kafka-producer + profiles: + # 环境配置 + active: @profiles.active@ + +# 日志配置 +logging: + level: + org.springframework: warn + org.apache.kafka: INFO + org.dromara.kafka.producer: INFO + config: classpath:logback-plus.xml diff --git a/stwzhj-modules/stwzhj-kafka-producer/src/main/resources/logback-plus.xml b/stwzhj-modules/stwzhj-kafka-producer/src/main/resources/logback-plus.xml new file mode 100644 index 00000000..5a49af14 --- /dev/null +++ b/stwzhj-modules/stwzhj-kafka-producer/src/main/resources/logback-plus.xml @@ -0,0 +1,59 @@ + + + + + + + + + + + %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n + + + + + + ${log.path}/info.log + + ${log.path}/info-%d{yyyy-MM-dd}.%i.log + ${log.maxHistory} + ${log.maxSize} + ${log.totalSizeCap} + + + %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n + + + INFO + ACCEPT + DENY + + + + + + ${log.path}/error.log + + ${log.path}/error-%d{yyyy-MM-dd}.%i.log + ${log.maxHistory} + ${log.maxSize} + ${log.totalSizeCap} + + + %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n + + + ERROR + ACCEPT + DENY + + + + + + + + + +