新增消费scram认证kafka和发送华为认证kafka服务

stwzhj
luyya 2026-03-24 15:49:51 +08:00
parent fd95d19e9c
commit c1152bf7d8
24 changed files with 2010 additions and 6 deletions

View File

@ -19,6 +19,8 @@
<module>stwzhj-data2StKafka</module>
<module>stwzhj-extract</module>
<module>stwzhj-data2gas</module>
<module>stwzhj-kafka-consumer</module>
<module>stwzhj-kafka-producer</module>
</modules>
<artifactId>stwzhj-modules</artifactId>

View File

@ -96,12 +96,12 @@ public class RealConsumer implements CommandLineRunner {
}
logger.info("Security prepare success.");
}
// kafkaProp.put("socket.connection.setup.timeout.ms", "60000");
//
// kafkaProp.put("security.protocol", "SASL_PLAINTEXT");
// kafkaProp.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"rsoft\" password=\"rsoft-2026\";");
// kafkaProp.put("sasl.mechanism", "SCRAM-SHA-256");
// kafkaProp.put("metadata.max.age.ms", Long.MAX_VALUE); // 彻底禁用元数据更新
kafkaProp.put("socket.connection.setup.timeout.ms", "60000");
kafkaProp.put("security.protocol", "SASL_PLAINTEXT");
kafkaProp.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"rsoft\" password=\"rsoft-2026\";");
kafkaProp.put("sasl.mechanism", "SCRAM-SHA-256");
kafkaProp.put("metadata.max.age.ms", Long.MAX_VALUE); // 彻底禁用元数据更新
KafkaConsumerRunnable runnable = new KafkaConsumerRunnable(kafkaProp,dtpExecutor2,cityCode);
executorService.execute(runnable);
}

View File

@ -0,0 +1,50 @@
# stwzhj-kafka-consumer 服务说明
## 功能描述
消费SCRAM认证的Kafka消息并通过HTTP批量发送给生产者服务。
## 启动命令格式
```bash
java -jar stwzhj-kafka-consumer.jar <port> <cityName> <bootstrapServers> <topics> <groupId> <cityCode>
```
### 参数说明
- `port`: 服务端口
- `cityName`: 城市名称
- `bootstrapServers`: Kafka服务器地址多个地址用逗号分隔
- `topics`: 订阅的主题,多个主题用逗号分隔
- `groupId`: 消费者组ID
- `cityCode`: 城市代码
### 环境变量
- `KAFKA_USERNAME`: Kafka用户名必填
- `KAFKA_PASSWORD`: Kafka密码必填
## 启动示例
```bash
# 设置环境变量
export KAFKA_USERNAME=your_username
export KAFKA_PASSWORD=your_password
# 启动服务
java -jar stwzhj-kafka-consumer.jar 11000 huangshan 192.168.10.81:9092,192.168.10.82:9092,192.168.10.83:9092 topic.send.2,topic.send.3,topic.send.4,topic.send.5 hs_gp 3410
```
## 高吞吐量优化
1. **批量发送**: 将多条消息打包成一批发送减少HTTP请求次数
2. **连接池**: 使用Apache HttpClient连接池复用TCP连接
3. **压缩传输**: 支持Gzip压缩减少网络传输数据量
4. **多线程处理**: 使用3个线程处理消息队列
## 注意事项
1. 确保生产者服务已启动并正常运行
2. 根据实际负载情况调整批量发送大小和超时时间
3. 监控消息队列容量,避免消息丢失
4. Kafka密码需要在启动脚本中通过环境变量或配置文件传入

View File

@ -0,0 +1,86 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.dromara</groupId>
<artifactId>stwzhj-modules</artifactId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>stwzhj-kafka-consumer</artifactId>
<description>
stwzhj-kafka-consumer 消费SCRAM认证的Kafka消息并通过内存队列转发给生产者服务
</description>
<dependencies>
<!-- Spring Boot Web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Kafka Client -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.6.1</version>
</dependency>
<!-- FastJSON -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</dependency>
<!-- Apache Commons Lang3 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<!-- Hutool -->
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.4.0</version>
</dependency>
<!-- Lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!-- 动态线程池 -->
<dependency>
<groupId>cn.dynamictp</groupId>
<artifactId>dynamic-tp-spring-boot-starter-common</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents.client5</groupId>
<artifactId>httpclient5</artifactId>
</dependency>
</dependencies>
<build>
<finalName>${project.artifactId}</finalName>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${spring-boot.version}</version>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,65 @@
package org.dromara.kafka.consumer;
import org.dromara.kafka.consumer.config.KafkaConsumerConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.core.env.Environment;
/**
 * Kafka consumer service entry point.
 * Consumes messages from a SCRAM-authenticated Kafka cluster and forwards them
 * over HTTP to the producer service.
 *
 * Usage:
 * java -jar stwzhj-kafka-consumer.jar <port> <cityName> <bootstrapServers> <topics> <groupId> <cityCode>
 *
 * Example:
 * java -jar stwzhj-kafka-consumer.jar 11000 huangshan 192.168.10.81:9092,192.168.10.82:9092,192.168.10.83:9092 topic.send.2,topic.send.3,topic.send.4,topic.send.5 hs_gp 3410
 *
 * Credentials are NOT positional arguments: they are read from the
 * KAFKA_USERNAME / KAFKA_PASSWORD environment variables.
 */
@SpringBootApplication
public class KafkaConsumerApplication {

    /** Number of required positional command-line arguments. */
    private static final int REQUIRED_ARGS = 6;

    public static void main(String[] args) {
        if (args.length < REQUIRED_ARGS) {
            // The 5th positional argument is the consumer group id (args[4]);
            // the previous usage text wrongly advertised it as <username>.
            System.err.println("启动参数不足,请使用以下格式:");
            System.err.println("java -jar stwzhj-kafka-consumer.jar <port> <cityName> <bootstrapServers> <topics> <groupId> <cityCode>");
            System.err.println("例如:");
            System.err.println("java -jar stwzhj-kafka-consumer.jar 11000 huangshan 192.168.10.81:9092,192.168.10.82:9092,192.168.10.83:9092 topic.send.2,topic.send.3,topic.send.4,topic.send.5 hs_gp 3410");
            System.exit(1);
        }
        // Positional arguments, in order.
        String port = args[0];
        String cityName = args[1];
        String bootstrapServers = args[2];
        String topics = args[3];
        String groupId = args[4];
        String cityCode = args[5];
        // SASL password must come from the environment, never the command line.
        String password = System.getenv("KAFKA_PASSWORD");
        if (password == null || password.isEmpty()) {
            System.err.println("错误未设置KAFKA_PASSWORD环境变量");
            System.exit(1);
        }
        // SASL username must come from the environment as well.
        String username = System.getenv("KAFKA_USERNAME");
        if (username == null || username.isEmpty()) {
            System.err.println("错误未设置KAFKA_USERNAME环境变量");
            System.exit(1);
        }
        // Expose everything as system properties so application.yml placeholders
        // (${kafka.consumer.*}) can resolve them.
        System.setProperty("server.port", port);
        System.setProperty("kafka.consumer.port", port);
        System.setProperty("kafka.consumer.cityName", cityName);
        System.setProperty("kafka.consumer.bootstrapServers", bootstrapServers);
        System.setProperty("kafka.consumer.topics", topics);
        System.setProperty("kafka.consumer.groupId", groupId);
        System.setProperty("kafka.consumer.cityCode", cityCode);
        System.setProperty("kafka.consumer.username", username);
        System.setProperty("kafka.consumer.password", password);
        SpringApplication.run(KafkaConsumerApplication.class, args);
    }
}

View File

@ -0,0 +1,54 @@
package org.dromara.kafka.consumer.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
/**
 * HTTP client configuration.
 * Bound from the "http.client" section of application.yml; controls the
 * connection pool and batching behavior used when forwarding messages
 * to the producer service.
 */
@Data
@Configuration
@ConfigurationProperties(prefix = "http.client")
public class HttpClientConfig {
    /**
     * Base URL of the producer service that receives forwarded batches.
     */
    private String producerUrl = "http://localhost:9214";
    /**
     * Connect timeout in milliseconds.
     */
    private Integer connectTimeout = 5000;
    /**
     * Read (socket) timeout in milliseconds.
     */
    private Integer readTimeout = 10000;
    /**
     * Maximum total pooled HTTP connections.
     */
    private Integer maxConnections = 200;
    /**
     * Maximum pooled connections per route.
     */
    private Integer maxConnectionsPerRoute = 50;
    /**
     * Maximum number of messages sent per HTTP batch.
     */
    private Integer batchSize = 100;
    /**
     * Maximum time (ms) a partial batch may wait before being flushed.
     */
    private Integer batchTimeout = 100;
    /**
     * Whether to request compressed transfer.
     * NOTE(review): verify that the HTTP sender actually compresses the
     * payload when this is enabled — confirm before relying on it.
     */
    private Boolean enableCompression = true;
}

View File

@ -0,0 +1,104 @@
package org.dromara.kafka.consumer.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
/**
 * Kafka consumer configuration.
 * Bound from the "kafka.consumer" section of application.yml; values are
 * supplied via system properties set from the command line at startup.
 */
@Data
@Configuration
@ConfigurationProperties(prefix = "kafka.consumer")
public class KafkaConsumerConfig {
    /**
     * Kafka broker list (comma-separated host:port pairs).
     */
    private String bootstrapServers;
    /**
     * Consumer group ID.
     */
    private String groupId;
    /**
     * Consumer group name.
     * NOTE(review): no binding for this appears in application.yml — confirm
     * whether it is still used anywhere.
     */
    private String groupName;
    /**
     * Subscribed topics, comma-separated.
     */
    private String topics;
    /**
     * Security protocol: SASL_PLAINTEXT or PLAINTEXT.
     */
    private String securityProtocol = "SASL_PLAINTEXT";
    /**
     * SASL mechanism: SCRAM-SHA-256 or PLAIN.
     */
    private String saslMechanism = "SCRAM-SHA-256";
    /**
     * Explicit SASL JAAS config string; when unset, it is built from
     * username/password at runtime.
     */
    private String saslJaasConfig;
    /**
     * Whether offsets are committed automatically.
     */
    private Boolean enableAutoCommit = true;
    /**
     * Auto-commit interval (ms).
     */
    private Integer autoCommitIntervalMs = 1000;
    /**
     * Consumer session timeout (ms).
     */
    private Integer sessionTimeoutMs = 30000;
    /**
     * Maximum time a single poll() blocks (ms).
     */
    private Integer pollTimeoutMs = 10000;
    /**
     * Maximum records returned per poll().
     */
    private Integer maxPollRecords = 500;
    /**
     * City code stamped onto each message as its info source.
     */
    private String cityCode = "3408";
    /**
     * Capacity of the in-memory message buffer.
     */
    private Integer queueCapacity = 100000;
    /**
     * SASL username (from the KAFKA_USERNAME environment variable).
     */
    private String username;
    /**
     * SASL password (from the KAFKA_PASSWORD environment variable).
     */
    private String password;
    /**
     * Human-readable city name (informational).
     */
    private String cityName;
    /**
     * Service port (defaults to 9213 when not overridden at startup).
     */
    private Integer port = 9213;
}

View File

@ -0,0 +1,22 @@
package org.dromara.kafka.consumer.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/**
 * In-memory queue configuration.
 * Exposes a bounded queue bean intended to buffer messages between the Kafka
 * consumer and the HTTP batch sender.
 */
@Configuration
public class MemoryQueueConfig {
    /**
     * Bounded buffer for raw Kafka message payloads.
     * Capacity comes from kafka.consumer.queue-capacity.
     * NOTE(review): confirm this bean is actually injected by its intended
     * consumer rather than shadowed by a locally created queue.
     */
    @Bean
    public BlockingQueue<String> messageQueue(KafkaConsumerConfig kafkaConsumerConfig) {
        return new ArrayBlockingQueue<>(kafkaConsumerConfig.getQueueCapacity());
    }
}

View File

@ -0,0 +1,50 @@
package org.dromara.kafka.consumer.domain;
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.Data;
import java.io.Serializable;
import java.util.Date;
/**
 * <p>description: GPS record (Elasticsearch-oriented DTO).</p>
 * GPS info entity deserialized from Kafka messages and forwarded downstream.
 */
@Data
public class EsGpsInfo implements Serializable {
    private static final long serialVersionUID = 7455495841680488351L;
    /**
     * Device identifier carried in the Kafka message.
     * NOTE(review): original comments suggest a fixed-length (21-char) id — confirm.
     */
    private String deviceCode;
    /**
     * Device type.
     */
    private String deviceType;
    // Latitude / longitude, kept as strings exactly as received.
    private String lat;
    private String lng;
    // Heading / direction
    private String orientation;
    // Elevation
    private String height;
    // Positioning accuracy
    private String deltaH;
    private String speed;
    private String zzjgdm;      // presumably organization code — TODO confirm
    private String zzjgmc;      // presumably organization name — TODO confirm
    private String policeNo;
    private String policeName;
    private String phoneNum;
    private String carNum;
    private Integer online;
    // Serialized as "yyyy-MM-dd HH:mm:ss" in GMT+8.
    @JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone="GMT+8")
    private Date gpsTime;
    // City code such as 3401, 3402 — set by the consumer from its configured cityCode.
    private String infoSource;
}

View File

@ -0,0 +1,150 @@
package org.dromara.kafka.consumer.service;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import org.dromara.kafka.consumer.config.HttpClientConfig;
import org.dromara.kafka.consumer.util.HttpClientUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
 * Batch message processor.
 * Buffers messages consumed from Kafka in a shared in-memory queue and
 * forwards them to the producer service in HTTP batches. A batch is flushed
 * when it reaches the configured size or when the batch timeout elapses.
 */
@Component
public class BatchMessageProcessor {
    private static final Logger logger = LoggerFactory.getLogger(BatchMessageProcessor.class);

    /** Number of worker threads draining the queue. */
    private static final int WORKER_THREADS = 3;

    @Autowired
    private HttpClientConfig httpClientConfig;
    @Autowired
    private HttpClientUtil httpClientUtil;

    /**
     * Shared message buffer, injected from MemoryQueueConfig so the configured
     * queue capacity is honored. (Previously a locally created
     * LinkedBlockingQueue(500) silently ignored kafka.consumer.queue-capacity.)
     */
    @Autowired
    private BlockingQueue<String> messageQueue;

    private ExecutorService executorService;
    private final AtomicBoolean running = new AtomicBoolean(false);

    /**
     * Starts the worker threads that drain the queue and send batches.
     */
    @PostConstruct
    public void init() {
        logger.info("初始化批量消息处理器...");
        executorService = Executors.newFixedThreadPool(WORKER_THREADS);
        running.set(true);
        for (int i = 0; i < WORKER_THREADS; i++) {
            executorService.submit(this::processMessages);
        }
        logger.info("批量消息处理器启动成功");
    }

    /**
     * Stops the workers and waits briefly for in-flight batches to finish.
     */
    @PreDestroy
    public void destroy() {
        logger.info("关闭批量消息处理器...");
        running.set(false);
        if (executorService != null) {
            executorService.shutdown();
            try {
                if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
                    executorService.shutdownNow();
                }
            } catch (InterruptedException e) {
                executorService.shutdownNow();
                Thread.currentThread().interrupt();
            }
        }
        logger.info("批量消息处理器已关闭");
    }

    /**
     * Enqueues a message for batched forwarding.
     *
     * @param message raw message payload
     * @return true if accepted, false if the queue is full
     */
    public boolean addMessage(String message) {
        try {
            return messageQueue.offer(message);
        } catch (Exception e) {
            logger.error("添加消息到队列异常: {}", e.getMessage(), e);
            return false;
        }
    }

    /**
     * Worker loop: accumulates messages into a batch and flushes it when the
     * batch is full or the batch timeout has elapsed.
     */
    private void processMessages() {
        logger.info("启动消息处理线程...");
        List<String> batch = new ArrayList<>(httpClientConfig.getBatchSize());
        long lastSendTime = System.currentTimeMillis();
        while (running.get()) {
            try {
                // Short poll so the timeout-based flush stays responsive.
                String message = messageQueue.poll(100, TimeUnit.MILLISECONDS);
                if (message != null) {
                    batch.add(message);
                }
                long currentTime = System.currentTimeMillis();
                boolean shouldSend = !batch.isEmpty()
                    && (batch.size() >= httpClientConfig.getBatchSize()
                        || (currentTime - lastSendTime) >= httpClientConfig.getBatchTimeout());
                if (shouldSend) {
                    sendBatch(batch);
                    lastSendTime = currentTime;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            } catch (Exception e) {
                logger.error("处理消息异常: {}", e.getMessage(), e);
            }
        }
        // Flush whatever is left when shutting down.
        if (!batch.isEmpty()) {
            logger.info("发送剩余消息,消息数: {}", batch.size());
            httpClientUtil.sendBatchMessages(batch);
        }
        logger.info("消息处理线程结束");
    }

    /**
     * Sends one batch; on failure, re-queues the messages and logs any that
     * are dropped because the queue is full (previously failed re-offers were
     * silently ignored, losing messages). The batch is cleared afterwards.
     */
    private void sendBatch(List<String> batch) {
        logger.debug("准备发送批量消息,消息数: {}", batch.size());
        boolean success = httpClientUtil.sendBatchMessages(batch);
        if (success) {
            logger.debug("批量发送消息成功,消息数: {}", batch.size());
        } else {
            logger.warn("批量发送消息失败,消息数: {}", batch.size());
            int requeued = 0;
            for (String msg : batch) {
                if (messageQueue.offer(msg)) {
                    requeued++;
                }
            }
            if (requeued < batch.size()) {
                logger.error("重新入队失败,丢弃消息数: {}", batch.size() - requeued);
            }
        }
        batch.clear();
    }
}

View File

@ -0,0 +1,163 @@
package org.dromara.kafka.consumer.service;
import cn.hutool.core.convert.ConvertException;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.dromara.kafka.consumer.config.KafkaConsumerConfig;
import org.dromara.kafka.consumer.domain.EsGpsInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Kafka consumer service.
 * Connects to a SCRAM-authenticated Kafka cluster in a dedicated thread,
 * parses each record into an EsGpsInfo and hands it to BatchMessageProcessor
 * for batched HTTP forwarding.
 */
@Service
public class KafkaConsumerService {
    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerService.class);

    @Autowired
    private KafkaConsumerConfig kafkaConsumerConfig;
    @Autowired
    private BatchMessageProcessor batchMessageProcessor;

    private KafkaConsumer<String, String> consumer;
    private ExecutorService executorService;
    private volatile boolean running = false;

    /**
     * Builds the consumer, subscribes to the configured topics and starts the
     * single polling thread.
     */
    @PostConstruct
    public void init() {
        logger.info("初始化Kafka消费者服务...");
        Properties props = createConsumerProperties();
        consumer = new KafkaConsumer<>(props);
        String[] topics = kafkaConsumerConfig.getTopics().split(",");
        consumer.subscribe(Arrays.asList(topics));
        logger.info("订阅Kafka主题成功: {}", Arrays.toString(topics));
        executorService = Executors.newSingleThreadExecutor();
        running = true;
        executorService.submit(this::consumeMessages);
        logger.info("Kafka消费者服务启动成功");
    }

    /**
     * Signals the polling loop to stop.
     * KafkaConsumer is NOT thread-safe: wakeup() is the only method that may be
     * called from another thread, so close() is performed inside the polling
     * thread (previously close() here raced with the active poll loop).
     */
    @PreDestroy
    public void destroy() {
        logger.info("关闭Kafka消费者服务...");
        running = false;
        if (consumer != null) {
            consumer.wakeup();
        }
        if (executorService != null) {
            executorService.shutdown();
        }
        logger.info("Kafka消费者服务已关闭");
    }

    /**
     * Assembles the Kafka client Properties: connection, offsets,
     * deserialization and SCRAM security settings.
     */
    private Properties createConsumerProperties() {
        Properties props = new Properties();
        // Connection and group settings.
        props.put("bootstrap.servers", kafkaConsumerConfig.getBootstrapServers());
        props.put("group.id", kafkaConsumerConfig.getGroupId());
        props.put("enable.auto.commit", kafkaConsumerConfig.getEnableAutoCommit());
        props.put("auto.commit.interval.ms", kafkaConsumerConfig.getAutoCommitIntervalMs());
        props.put("session.timeout.ms", kafkaConsumerConfig.getSessionTimeoutMs());
        props.put("max.poll.records", kafkaConsumerConfig.getMaxPollRecords());
        // Plain-string deserialization for both key and value.
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // SCRAM security settings.
        props.put("security.protocol", kafkaConsumerConfig.getSecurityProtocol());
        props.put("sasl.mechanism", kafkaConsumerConfig.getSaslMechanism());
        // Build the SASL JAAS config from the credentials supplied at startup.
        String saslJaasConfig = String.format(
            "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";",
            kafkaConsumerConfig.getUsername(),
            kafkaConsumerConfig.getPassword()
        );
        // Set exactly once (the duplicated put was removed).
        props.put("sasl.jaas.config", saslJaasConfig);
        return props;
    }

    /**
     * Polling loop: runs until destroy() flips the flag, then closes the
     * consumer in this thread.
     */
    private void consumeMessages() {
        logger.info("开始消费Kafka消息...");
        try {
            while (running) {
                try {
                    ConsumerRecords<String, String> records =
                        consumer.poll(Duration.ofMillis(kafkaConsumerConfig.getPollTimeoutMs()));
                    logger.info("Poll到消息数: {}", records.count());
                    for (ConsumerRecord<String, String> record : records) {
                        handleRecord(record);
                    }
                } catch (Exception e) {
                    if (!running) {
                        // wakeup() during shutdown surfaces here as an exception.
                        break;
                    }
                    logger.error("消费Kafka消息异常: {}", e.getMessage(), e);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        } finally {
            // Close in the polling thread — KafkaConsumer is not thread-safe.
            consumer.close();
        }
        logger.info("停止消费Kafka消息");
    }

    /**
     * Parses a single record and enqueues it for batched forwarding.
     * Malformed records are logged and skipped — the original code used
     * `return` inside the record loop, which terminated the whole consumer
     * thread on the first unparseable message.
     */
    private void handleRecord(ConsumerRecord<String, String> record) {
        try {
            JSONObject jsonObject;
            try {
                jsonObject = JSONUtil.parseObj(record.value());
            } catch (ConvertException e) {
                logger.info("jsonObject=null:error={}", e.getMessage());
                return; // skip this record only
            }
            EsGpsInfo esGpsInfo;
            try {
                esGpsInfo = JSONUtil.toBean(jsonObject, EsGpsInfo.class);
            } catch (ConvertException e) {
                logger.info("EsGpsInfo=null:error={}", e.getMessage());
                return; // skip this record only
            }
            esGpsInfo.setInfoSource(kafkaConsumerConfig.getCityCode());
            // Forward as JSON rather than Lombok toString().
            // NOTE(review): confirm the producer side expects a JSON payload —
            // the previous toString() produced "EsGpsInfo(field=value, ...)".
            boolean added = batchMessageProcessor.addMessage(JSONUtil.toJsonStr(esGpsInfo));
            if (!added) {
                logger.warn("消息队列已满,丢弃消息: topic={}, partition={}, offset={}",
                    record.topic(), record.partition(), record.offset());
            }
        } catch (Exception e) {
            logger.error("处理消息异常: topic={}, partition={}, offset={}, error={}",
                record.topic(), record.partition(), record.offset(), e.getMessage(), e);
        }
    }
}

View File

@ -0,0 +1,117 @@
package org.dromara.kafka.consumer.util;
import com.alibaba.fastjson.JSON;
import jakarta.annotation.PostConstruct;
import org.apache.hc.client5.http.classic.HttpClient;
import org.apache.hc.client5.http.config.ConnectionConfig;
import org.apache.hc.client5.http.impl.classic.HttpClients;
import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
import org.apache.hc.core5.http.io.SocketConfig;
import org.apache.hc.core5.util.Timeout;
import org.dromara.kafka.consumer.config.HttpClientConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;
import org.springframework.http.*;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
 * HTTP client utility.
 * Maintains a pooled RestTemplate and posts message batches to the
 * producer service.
 */
@Component
public class HttpClientUtil {
    private static final Logger logger = LoggerFactory.getLogger(HttpClientUtil.class);

    @Autowired
    private HttpClientConfig httpClientConfig;

    private RestTemplate restTemplate;

    /**
     * Builds the pooled HttpClient 5.x stack and wraps it in a RestTemplate.
     * Pool sizes and timeouts come from HttpClientConfig.
     */
    @PostConstruct
    public void init() {
        try {
            logger.info("初始化HTTP客户端...");
            PoolingHttpClientConnectionManager connectionManager =
                new PoolingHttpClientConnectionManager();
            // Pool sizing.
            connectionManager.setMaxTotal(httpClientConfig.getMaxConnections());
            connectionManager.setDefaultMaxPerRoute(httpClientConfig.getMaxConnectionsPerRoute());
            // Connect timeout.
            ConnectionConfig connectionConfig = ConnectionConfig.custom()
                .setConnectTimeout(Timeout.of(httpClientConfig.getConnectTimeout(), TimeUnit.MILLISECONDS))
                .build();
            connectionManager.setDefaultConnectionConfig(connectionConfig);
            // Socket (read/write) timeout.
            SocketConfig socketConfig = SocketConfig.custom()
                .setSoTimeout(Timeout.of(httpClientConfig.getReadTimeout(), TimeUnit.MILLISECONDS))
                .build();
            connectionManager.setDefaultSocketConfig(socketConfig);
            HttpClient httpClient = HttpClients.custom()
                .setConnectionManager(connectionManager)
                .build();
            HttpComponentsClientHttpRequestFactory factory =
                new HttpComponentsClientHttpRequestFactory(httpClient);
            factory.setConnectTimeout(httpClientConfig.getConnectTimeout());
            restTemplate = new RestTemplate(factory);
            logger.info("HTTP客户端初始化成功连接池配置最大连接数={}, 每路由最大连接数={}",
                httpClientConfig.getMaxConnections(),
                httpClientConfig.getMaxConnectionsPerRoute());
        } catch (Exception e) {
            logger.error("HTTP客户端初始化失败", e);
            throw new RuntimeException("HTTP客户端初始化失败", e);
        }
    }

    /**
     * Posts a batch of messages as a JSON array to the producer service.
     *
     * @param messages message payloads to forward; an empty/null batch is a no-op
     * @return true on HTTP 200, false otherwise
     */
    public boolean sendBatchMessages(List<String> messages) {
        if (messages == null || messages.isEmpty()) {
            return true; // nothing to send
        }
        try {
            String url = httpClientConfig.getProducerUrl() + "/api/kafka/batch";
            HttpHeaders headers = new HttpHeaders();
            headers.setContentType(MediaType.APPLICATION_JSON);
            // NOTE(review): the previous code set a "Content-Encoding: gzip"
            // request header when enableCompression was on, but the body was
            // never actually gzipped — a false encoding claim that can break
            // servers honoring the header. The header is omitted until real
            // compression is implemented end-to-end.
            HttpEntity<List<String>> request = new HttpEntity<>(messages, headers);
            ResponseEntity<String> response = restTemplate.exchange(url, HttpMethod.POST, request, String.class);
            if (response.getStatusCode() == HttpStatus.OK) {
                logger.debug("批量发送消息成功,消息数: {}", messages.size());
                return true;
            } else {
                logger.warn("批量发送消息失败,状态码: {}, 响应: {}", response.getStatusCode(), response.getBody());
                return false;
            }
        } catch (Exception e) {
            logger.error("批量发送消息异常,消息数: {}, 错误: {}", messages.size(), e.getMessage(), e);
            return false;
        }
    }
}

View File

@ -0,0 +1,72 @@
# Tomcat
server:
port: ${kafka.consumer.port:9213}
# Spring
spring:
application:
# 应用名称
name: stwzhj-kafka-consumer
profiles:
# 环境配置
active: @profiles.active@
# Kafka消费者配置
kafka:
consumer:
# Kafka服务器地址通过启动命令传入
bootstrap-servers: ${kafka.consumer.bootstrapServers:}
# 消费者组ID通过启动命令传入
group-id: ${kafka.consumer.groupId:}
# 订阅的主题,多个主题用逗号分隔(通过启动命令传入)
topics: ${kafka.consumer.topics:}
# 安全协议
security-protocol: SASL_PLAINTEXT
# SASL机制
sasl-mechanism: SCRAM-SHA-256
# 是否自动提交offset
enable-auto-commit: true
# 自动提交offset的时间间隔(ms)
auto-commit-interval-ms: 1000
# 会话超时时间(ms)
session-timeout-ms: 30000
# 一次poll的最大等待时间(ms)
poll-timeout-ms: 10000
# 最大poll记录数
max-poll-records: 500
# 城市代码(通过启动命令传入)
city-code: ${kafka.consumer.cityCode:3408}
# 用户名(通过启动命令传入)
username: ${kafka.consumer.username:}
# 密码(通过启动命令传入)
password: ${kafka.consumer.password:}
# 消息队列容量
queue-capacity: 100000
# HTTP客户端配置
http:
client:
# 生产者服务地址
producer-url: http://localhost:9214
# 连接超时时间(ms)
connect-timeout: 5000
# 读取超时时间(ms)
read-timeout: 10000
# 最大连接数
max-connections: 200
# 每个路由的最大连接数
max-connections-per-route: 50
# 批量发送大小
batch-size: 100
# 批量发送超时时间(ms)
batch-timeout: 100
# 是否启用压缩
enable-compression: true
# 日志配置
logging:
level:
org.springframework: warn
org.apache.kafka: INFO
org.dromara.kafka.consumer: INFO
config: classpath:logback-plus.xml

View File

@ -0,0 +1,49 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration scan="true" scanPeriod="60 seconds" debug="false">
<!-- 日志存放路径 -->
<property name="log.path" value="logs" />
<property name="log.file" value="consumer" />
<property name="MAX_FILE_SIZE" value="50MB" />
<property name="MAX_HISTORY" value="30" />
<!-- 日志输出格式 -->
<!-- INFO日志Appender -->
<appender name="FILE_INFO" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${log.path}/info.${log.file}.log</file>
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<level>INFO</level>
<onMatch>ACCEPT</onMatch>
<onMismatch>DENY</onMismatch>
</filter>
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<fileNamePattern>${log.path}/info/info.${log.file}.%d{yyyy-MM-dd}.%i.log.gz</fileNamePattern>
<maxFileSize>${MAX_FILE_SIZE}</maxFileSize>
<maxHistory>${MAX_HISTORY}</maxHistory>
</rollingPolicy>
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<!-- ERROR日志Appender -->
<appender name="FILE_ERROR" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${log.path}/error.${log.file}.log</file>
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>ERROR</level>
</filter>
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<fileNamePattern>${log.path}/error/error.${log.file}.%d{yyyy-MM-dd}.%i.log.gz</fileNamePattern>
<maxFileSize>${MAX_FILE_SIZE}</maxFileSize>
<maxHistory>${MAX_HISTORY}</maxHistory>
</rollingPolicy>
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<!-- 根Logger配置禁用控制台输出 -->
<root level="INFO">
<appender-ref ref="FILE_INFO" />
<appender-ref ref="FILE_ERROR" />
</root>
</configuration>

View File

@ -0,0 +1,51 @@
# stwzhj-kafka-producer 服务说明
## 功能描述
接收来自消费者服务的HTTP请求并将消息发送到华为认证的Kafka。
## 配置说明
本服务的华为Kafka认证配置从服务器上的以下文件读取
- `/shengting/gpsstore/producer.properties`: Kafka生产者配置
- `/shengting/gpsstore/krb5.conf`: Kerberos配置文件
- `/shengting/gpsstore/user.keytab`: Kerberos认证keytab文件
- `/shengting/gpsstore/kafkaSecurityMode`: 安全模式开关
这些配置文件与原服务保持一致确保华为Kafka认证完全兼容。
## 启动示例
```bash
java -jar stwzhj-kafka-producer.jar
```
## 高吞吐量优化
1. **异步发送**: 支持异步发送消息,提高并发处理能力
2. **批量发送**: 配合Kafka的批量发送机制提高吞吐量
3. **连接复用**: HTTP连接复用减少连接建立开销
## 注意事项
1. 确保服务器上的华为Kafka认证配置文件存在且配置正确
2. 服务启动时会自动读取服务器上的配置文件,无需手动配置
3. 监控Kafka发送成功率及时发现和处理异常
## API接口
### 批量接收消息
**URL**: `/api/kafka/batch`
**方法**: POST
**请求体**: JSON数组包含多条消息
```json
["message1", "message2", "message3"]
```
**响应**:
- 成功: 200 OK, "发送成功"
- 失败: 500 Internal Server Error, "发送失败" 或 "处理异常: xxx"

View File

@ -0,0 +1,118 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.dromara</groupId>
<artifactId>stwzhj-modules</artifactId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>stwzhj-kafka-producer</artifactId>
<description>
stwzhj-kafka-producer 从内存队列获取消息并发送到华为认证的Kafka
</description>
<dependencies>
<!-- Spring Boot Web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>3.6.1-h0.cbu.mrs.350.r11</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<groupId>net.sf.jopt-simple</groupId>
<artifactId>jopt-simple</artifactId>
</exclusion>
<exclusion>
<groupId>com.huawei.mrs</groupId>
<artifactId>manager-wc2frm</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</exclusion>
<exclusion>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
</exclusion>
<exclusion>
<groupId>com.huawei.mrs</groupId>
<artifactId>om-controller-api</artifactId>
</exclusion>
<exclusion>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Kafka Client -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.6.1-h0.cbu.mrs.350.r11</version>
</dependency>
<!-- FastJSON -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</dependency>
<!-- Apache Commons Lang3 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<!-- Hutool -->
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.4.0</version>
</dependency>
<!-- Lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!-- 动态线程池 -->
<dependency>
<groupId>cn.dynamictp</groupId>
<artifactId>dynamic-tp-spring-boot-starter-common</artifactId>
<version>1.1.0</version>
</dependency>
</dependencies>
<build>
<finalName>${project.artifactId}</finalName>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${spring-boot.version}</version>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,15 @@
package org.dromara.kafka.producer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * Kafka producer service entry point.
 * Receives message batches over HTTP (see KafkaProducerController) and sends
 * them on to the Huawei-authenticated Kafka cluster.
 */
@SpringBootApplication
public class KafkaProducerApplication {
    public static void main(String[] args) {
        SpringApplication.run(KafkaProducerApplication.class, args);
    }
}

View File

@ -0,0 +1,54 @@
package org.dromara.kafka.producer.controller;
import org.dromara.kafka.producer.service.KafkaProducerService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
/**
 * Kafka producer REST controller.
 * Accepts message batches over HTTP and delegates delivery to
 * KafkaProducerService.
 */
@RestController
@RequestMapping("/api/kafka")
public class KafkaProducerController {
    private static final Logger logger = LoggerFactory.getLogger(KafkaProducerController.class);

    @Autowired
    private KafkaProducerService kafkaProducerService;

    /**
     * Receives a batch of messages and forwards them to Kafka.
     *
     * @param messages JSON array of message payloads
     * @return 200 with "发送成功" on success; 500 with "发送失败" or "处理异常: ..." on failure
     */
    @PostMapping("/batch")
    public ResponseEntity<String> batchSend(@RequestBody List<String> messages) {
        try {
            final int count = messages.size();
            logger.info("接收到批量消息,消息数: {}", count);
            // Delegate the actual Kafka delivery.
            if (kafkaProducerService.sendBatchMessages(messages)) {
                logger.info("批量发送消息成功,消息数: {}", count);
                return ResponseEntity.ok("发送成功");
            }
            logger.warn("批量发送消息失败,消息数: {}", count);
            return ResponseEntity.status(500).body("发送失败");
        } catch (Exception e) {
            logger.error("处理批量消息异常,消息数: {}, 错误: {}", messages.size(), e.getMessage(), e);
            return ResponseEntity.status(500).body("处理异常: " + e.getMessage());
        }
    }
}

View File

@ -0,0 +1,212 @@
package org.dromara.kafka.producer.producer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.dromara.kafka.producer.util.KafkaProperties;
import org.dromara.kafka.producer.util.LoginUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.Future;
/**
 * Kafka producer for the Kerberos-secured (Huawei FusionInsight style) cluster.
 *
 * <p>Performs the security login (when {@link LoginUtil#isSecurityModel()} says the
 * cluster runs in security mode) and then creates a single shared
 * {@link KafkaProducer}, which is thread-safe and reused for all sends.</p>
 *
 * <p>NOTE(review): this class is both a Spring {@code @Component} and a hand-rolled
 * {@code getInstance()} singleton; both entry points are kept for backward
 * compatibility, but each construction path creates its own KafkaProducer —
 * callers should stick to one of the two.</p>
 *
 * @author chenle
 * @date 2021-11-03 14:15
 */
@Component
public class Producer {
    private static final Logger logger = LoggerFactory.getLogger(Producer.class);
    /** Shared Kafka client; thread-safe, closed by the shutdown hook below. */
    private final KafkaProducer<String, String> producer;
    // Singleton instance; volatile so double-checked locking is safe.
    private static volatile Producer instance;
    // true: asynchronous send with callback; false: block on the send future.
    private final Boolean isAsync = true;
    // Broker地址列表
    private final static String BOOTSTRAP_SERVER = "bootstrap.servers";
    // 客户端ID
    private final static String CLIENT_ID = "client.id";
    // Key序列化类
    private final static String KEY_SERIALIZER = "key.serializer";
    // Value序列化类
    private final static String VALUE_SERIALIZER = "value.serializer";
    // 协议类型:当前支持配置为SASL_PLAINTEXT或者PLAINTEXT
    private final static String SECURITY_PROTOCOL = "security.protocol";
    // 服务名
    private final static String SASL_KERBEROS_SERVICE_NAME = "sasl.kerberos.service.name";
    // 域名
    private final static String KERBEROS_DOMAIN_NAME = "kerberos.domain.name";
    // 分区类名 (kept for the commented-out partitioner configuration below)
    private final static String PARTITIONER_NAME = "partitioner.class";
    /**
     * Keytab file name, resolved by {@link LoginUtil#securityPrepare} under its config dir.
     */
    private static final String USER_KEYTAB_FILE = "user.keytab";
    /**
     * Machine-to-machine principal used for the Kerberos login.
     */
    private static final String USER_PRINCIPAL = "yhy_ahrs_rcw";
    /**
     * Creates the producer: runs the security login first, then builds the
     * Kafka client from the merged properties files.
     */
    public Producer() {
        initSecurity();
        Properties props = initProperties();
        this.producer = new KafkaProducer<>(props);
    }
    /**
     * Returns the lazily-created singleton (double-checked locking).
     */
    public static Producer getInstance() {
        if (instance == null) {
            synchronized (Producer.class) {
                if (instance == null) {
                    instance = new Producer();
                }
            }
        }
        return instance;
    }
    // Shutdown hook so the client flushes and releases sockets on JVM exit.
    static {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            if (instance != null && instance.producer != null) {
                instance.producer.close();
            }
        }));
    }
    /**
     * Runs the Kerberos security preparation when the cluster is in security mode.
     * Failures are logged and swallowed so construction can proceed (best-effort,
     * matching the original behaviour).
     */
    public void initSecurity() {
        if (LoginUtil.isSecurityModel()) {
            try {
                logger.info("Securitymode start.");
                // !!注意,安全认证时,需要用户手动修改为自己申请的机机账号
                LoginUtil.securityPrepare(USER_PRINCIPAL, USER_KEYTAB_FILE);
                // Only report success when securityPrepare actually completed
                // (previously this was logged even after an IOException).
                logger.info("Security prepare success.");
            } catch (IOException e) {
                logger.error("Security prepare failure.", e);
            }
        }
    }
    /**
     * Builds the producer configuration from the external properties files,
     * falling back to the defaults shown below.
     *
     * @return producer {@link Properties}
     */
    public static Properties initProperties() {
        Properties props = new Properties();
        KafkaProperties kafkaProc = KafkaProperties.getInstance();
        // Broker地址列表
        props.put(BOOTSTRAP_SERVER, kafkaProc.getValues(BOOTSTRAP_SERVER, "localhost:21007"));
        // 客户端ID
        props.put(CLIENT_ID, kafkaProc.getValues(CLIENT_ID, "DemoProducer"));
        // Key序列化类
        props.put(KEY_SERIALIZER,
            kafkaProc.getValues(KEY_SERIALIZER, "org.apache.kafka.common.serialization.StringSerializer"));
        // Value序列化类
        props.put(VALUE_SERIALIZER,
            kafkaProc.getValues(VALUE_SERIALIZER, "org.apache.kafka.common.serialization.StringSerializer"));
        // 协议类型:当前支持配置为SASL_PLAINTEXT或者PLAINTEXT
        props.put(SECURITY_PROTOCOL, kafkaProc.getValues(SECURITY_PROTOCOL, "SASL_PLAINTEXT"));
        // 服务名
        props.put(SASL_KERBEROS_SERVICE_NAME, "kafka");
        // 域名
        props.put(KERBEROS_DOMAIN_NAME, kafkaProc.getValues(KERBEROS_DOMAIN_NAME, "hadoop.hadoop.com"));
        // 分区类名
        // props.put(PARTITIONER_NAME, kafkaProc.getValues(PARTITIONER_NAME, "com.huawei.bigdata.kafka.example.SimplePartitioner"));
        return props;
    }
    /**
     * Sends one message to the given topic.
     *
     * @param topic   target topic
     * @param message message payload
     * @return the broker-acknowledged {@link RecordMetadata} for a synchronous
     *         send; {@code null} for an asynchronous send (the outcome is then
     *         reported by {@link DemoCallBack}) or on failure
     */
    public RecordMetadata sendMessage(String topic, String message) {
        try {
            logger.info("调用发送:topic={}, Object={}", topic, message);
            long startTime = System.currentTimeMillis();
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);
            if (isAsync) {
                // Fire-and-forget; success/failure is logged in the callback.
                producer.send(record, new DemoCallBack(startTime, topic, message));
                return null;
            }
            // Synchronous path: resolve the future once (previously get() was called twice).
            Future<RecordMetadata> future = producer.send(record);
            RecordMetadata metadata = future.get();
            logger.info("同步发送成功: Object={}", metadata.topic());
            return metadata;
        } catch (InterruptedException e) {
            // Restore the interrupt status instead of swallowing it.
            Thread.currentThread().interrupt();
            logger.error("发送被中断: topic={}", topic, e);
        } catch (Exception e) {
            // Previously e.printStackTrace(): route through the logger so failures reach log files.
            logger.error("发送消息异常: topic={}", topic, e);
        }
        return null;
    }
    /**
     * Callback for asynchronous sends: logs the partition/offset on success or
     * the failure cause on error, with the elapsed time since submission.
     */
    private static class DemoCallBack implements Callback {
        private final Logger logger = LoggerFactory.getLogger(DemoCallBack.class);
        private final long startTime;
        private final String topic;
        private final String message;
        public DemoCallBack(long startTime, String topic, String message) {
            this.startTime = startTime;
            this.topic = topic;
            this.message = message;
        }
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            long elapsedTime = System.currentTimeMillis() - startTime;
            if (metadata != null) {
                logger.info("topic=({}, {}) sent to partition({}), offset({}) in {} ms",
                    topic, message, metadata.partition(), metadata.offset(), elapsedTime);
            } else if (exception != null) {
                logger.error("Message sending failed", exception);
            }
        }
    }
}

View File

@ -0,0 +1,94 @@
package org.dromara.kafka.producer.service;
import jakarta.annotation.PostConstruct;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.dromara.kafka.producer.producer.Producer;
import org.dromara.kafka.producer.util.KafkaProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import java.util.List;
/**
 * Kafka producer service.
 *
 * <p>Forwards batches of raw string messages to the fixed topic
 * {@link KafkaProperties#TOPIC} through the {@link Producer} singleton.</p>
 */
@Service
public class KafkaProducerService {
    private static final Logger logger = LoggerFactory.getLogger(KafkaProducerService.class);
    private Producer producer;
    /**
     * Acquires the shared {@link Producer} once Spring has built this bean.
     */
    @PostConstruct
    public void init() {
        logger.info("初始化Kafka生产者服务...");
        producer = Producer.getInstance();
        logger.info("Kafka生产者服务初始化成功");
    }
    /**
     * Sends a batch of messages to Kafka.
     *
     * <p>NOTE(review): sends are asynchronous, so a {@code null} metadata only
     * means the send request was submitted — callback failures are not reflected
     * in the returned flag. The trailing sleep is a crude grace period for
     * in-flight async sends, kept for backward compatibility; a
     * {@code producer.flush()} would be the proper mechanism — confirm before
     * changing.</p>
     *
     * @param messages messages to send; {@code null} or empty is treated as success
     * @return {@code true} when every send request was submitted without throwing
     */
    public boolean sendBatchMessages(List<String> messages) {
        if (messages == null || messages.isEmpty()) {
            logger.warn("消息列表为空,不发送");
            return true;
        }
        try {
            String topic = KafkaProperties.TOPIC;
            boolean allSuccess = true;
            int successCount = 0;
            int failCount = 0;
            for (String message : messages) {
                try {
                    // Producer is configured for async send: a null metadata means
                    // the request was handed to the client successfully.
                    RecordMetadata metadata = producer.sendMessage(topic, message);
                    if (metadata == null) {
                        successCount++;
                    } else {
                        // Synchronous send path (not used with the current config).
                        successCount++;
                        logger.info("同步发送成功: topic={}, offset={}", metadata.topic(), metadata.offset());
                    }
                } catch (Exception e) {
                    logger.error("发送消息异常: {}", e.getMessage(), e);
                    allSuccess = false;
                    failCount++;
                }
            }
            logger.info("批量发送消息完成,总数: {}, 成功: {}, 失败: {}", messages.size(), successCount, failCount);
            // Grace period so in-flight async sends can complete before returning.
            if (successCount > 0) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    logger.warn("等待消息发送完成时被中断");
                }
            }
            return allSuccess;
        } catch (Exception e) {
            logger.error("批量发送消息异常: {}", e.getMessage(), e);
            return false;
        }
    }
}

View File

@ -0,0 +1,138 @@
package org.dromara.kafka.producer.util;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Loads Kafka configuration from the external properties files under
 * {@code /home/rsoft/config/} (server / producer / consumer / client), and
 * resolves lookups across them in that priority order.
 */
public final class KafkaProperties
{
    private static final Logger LOG = LoggerFactory.getLogger(KafkaProperties.class);
    // Topic名称安全模式下需要以管理员用户添加当前用户的访问权限
    public final static String TOPIC = "jysb_dwxx";
    private static Properties serverProps = new Properties();
    private static Properties producerProps = new Properties();
    private static Properties consumerProps = new Properties();
    private static Properties clientProps = new Properties();
    private static KafkaProperties instance = null;
    /**
     * Loads each properties file that exists; missing files are silently skipped.
     * Streams are closed via try-with-resources (previously they leaked).
     */
    private KafkaProperties()
    {
        // String filePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator;
        String filePath = "/home/rsoft/config/";
        LOG.info("路径=={}", filePath);
        try
        {
            File proFile = new File(filePath + "producer.properties");
            if (proFile.exists())
            {
                try (FileInputStream in = new FileInputStream(proFile))
                {
                    producerProps.load(in);
                }
            }
            // BUGFIX: the existence check previously tested producer.properties
            // while loading consumer.properties.
            File conFile = new File(filePath + "consumer.properties");
            if (conFile.exists())
            {
                try (FileInputStream in = new FileInputStream(conFile))
                {
                    consumerProps.load(in);
                }
            }
            File serFile = new File(filePath + "server.properties");
            if (serFile.exists())
            {
                try (FileInputStream in = new FileInputStream(serFile))
                {
                    serverProps.load(in);
                }
            }
            File cliFile = new File(filePath + "client.properties");
            if (cliFile.exists())
            {
                try (FileInputStream in = new FileInputStream(cliFile))
                {
                    clientProps.load(in);
                }
            }
        }
        catch (IOException e)
        {
            // Log at ERROR with the stack trace (previously INFO).
            LOG.error("The Exception occured.", e);
        }
    }
    /**
     * Returns the lazily-created singleton.
     */
    public synchronized static KafkaProperties getInstance()
    {
        if (null == instance)
        {
            instance = new KafkaProperties();
        }
        return instance;
    }
    /**
     * Looks up a key across all loaded properties files.
     *
     * @param key      properties key; {@code null} yields the default
     * @param defValue value returned when the key is absent everywhere
     * @return resolved value or {@code defValue}
     */
    public String getValues(String key, String defValue)
    {
        String rtValue = null;
        if (null == key)
        {
            LOG.error("key is null");
        }
        else
        {
            rtValue = getPropertiesValue(key);
        }
        if (null == rtValue)
        {
            LOG.warn("KafkaProperties.getValues return null, key is " + key);
            rtValue = defValue;
        }
        // NOTE(review): this logs every resolved value — may leak credentials if
        // secrets are ever stored in these files; consider masking.
        LOG.info("KafkaProperties.getValues: key is " + key + "; Value is " + rtValue);
        return rtValue;
    }
    /**
     * Resolves a key in priority order: server → producer → consumer → client.
     *
     * @param key properties key
     * @return value or {@code null} when absent in every file
     */
    private String getPropertiesValue(String key)
    {
        String rtValue = serverProps.getProperty(key);
        // server.properties中没有则再向producer.properties中获取
        if (null == rtValue)
        {
            rtValue = producerProps.getProperty(key);
        }
        // producer中没有则再向consumer.properties中获取
        if (null == rtValue)
        {
            rtValue = consumerProps.getProperty(key);
        }
        // consumer没有则再向client.properties中获取
        if (null == rtValue)
        {
            rtValue = clientProps.getProperty(key);
        }
        return rtValue;
    }
}

View File

@ -0,0 +1,259 @@
package org.dromara.kafka.producer.util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Properties;
/**
 * Kerberos login helper: generates the JAAS configuration file and sets the
 * JVM system properties required for a secured Kafka/ZooKeeper connection.
 */
public class LoginUtil {
    private static final Logger LOG = LoggerFactory.getLogger(LoginUtil.class);
    /**
     * JAAS login contexts written into the generated jaas.conf.
     */
    public enum Module {
        KAFKA("KafkaClient"), ZOOKEEPER("Client");
        private String name;
        private Module(String name)
        {
            this.name = name;
        }
        public String getName()
        {
            return name;
        }
    }
    /**
     * line operator string
     */
    private static final String LINE_SEPARATOR = System.getProperty("line.separator");
    /**
     * jaas file postfix
     */
    private static final String JAAS_POSTFIX = ".jaas.conf";
    /**
     * is IBM jdk or not
     */
    private static final boolean IS_IBM_JDK = System.getProperty("java.vendor").contains("IBM");
    /**
     * IBM jdk login module
     */
    private static final String IBM_LOGIN_MODULE = "com.ibm.security.auth.module.Krb5LoginModule required";
    /**
     * oracle jdk login module
     */
    private static final String SUN_LOGIN_MODULE = "com.sun.security.auth.module.Krb5LoginModule required";
    /**
     * Zookeeper quorum principal.
     */
    public static final String ZOOKEEPER_AUTH_PRINCIPAL = "zookeeper.server.principal";
    /**
     * java security krb5 file path
     */
    public static final String JAVA_SECURITY_KRB5_CONF = "java.security.krb5.conf";
    /**
     * java security login file path
     */
    public static final String JAVA_SECURITY_LOGIN_CONF = "java.security.auth.login.config";
    /**
     * Writes a fresh jaas.conf into the temp directory and points
     * {@code java.security.auth.login.config} at it.
     *
     * @param principal  Kerberos principal
     * @param keytabPath absolute path of the keytab file
     * @throws IOException when the old file cannot be deleted or the new one written
     */
    public static void setJaasFile(String principal, String keytabPath)
        throws IOException {
        String jaasPath =
            new File(System.getProperty("java.io.tmpdir")) + File.separator + System.getProperty("user.name")
                + JAAS_POSTFIX;
        // windows路径下分隔符替换
        jaasPath = jaasPath.replace("\\", "\\\\");
        // 删除jaas文件
        deleteJaasFile(jaasPath);
        writeJaasFile(jaasPath, principal, keytabPath);
        System.setProperty(JAVA_SECURITY_LOGIN_CONF, jaasPath);
    }
    /**
     * Sets the ZooKeeper server principal system property and verifies it stuck.
     *
     * @param zkServerPrincipal principal of the ZooKeeper quorum
     * @throws IOException when the property could not be set to the given value
     */
    public static void setZookeeperServerPrincipal(String zkServerPrincipal)
        throws IOException {
        System.setProperty(ZOOKEEPER_AUTH_PRINCIPAL, zkServerPrincipal);
        String ret = System.getProperty(ZOOKEEPER_AUTH_PRINCIPAL);
        if (ret == null)
        {
            throw new IOException(ZOOKEEPER_AUTH_PRINCIPAL + " is null.");
        }
        if (!ret.equals(zkServerPrincipal))
        {
            throw new IOException(ZOOKEEPER_AUTH_PRINCIPAL + " is " + ret + " is not " + zkServerPrincipal + ".");
        }
    }
    /**
     * Sets the krb5.conf system property and verifies it stuck.
     *
     * @param krb5ConfFile absolute path of krb5.conf
     * @throws IOException when the property could not be set to the given value
     */
    public static void setKrb5Config(String krb5ConfFile)
        throws IOException {
        System.setProperty(JAVA_SECURITY_KRB5_CONF, krb5ConfFile);
        String ret = System.getProperty(JAVA_SECURITY_KRB5_CONF);
        if (ret == null)
        {
            throw new IOException(JAVA_SECURITY_KRB5_CONF + " is null.");
        }
        if (!ret.equals(krb5ConfFile))
        {
            throw new IOException(JAVA_SECURITY_KRB5_CONF + " is " + ret + " is not " + krb5ConfFile + ".");
        }
    }
    /**
     * Writes the generated JAAS content to the given path.
     * Uses try-with-resources (previously a manual close) and preserves the
     * original cause when wrapping the exception (previously dropped).
     *
     * @throws IOException when the file cannot be written
     */
    private static void writeJaasFile(String jaasPath, String principal, String keytabPath)
        throws IOException {
        try (FileWriter writer = new FileWriter(new File(jaasPath)))
        {
            writer.write(getJaasConfContext(principal, keytabPath));
            writer.flush();
        }
        catch (IOException e)
        {
            throw new IOException("Failed to create jaas.conf File", e);
        }
    }
    /**
     * Deletes a previously generated jaas file, if any.
     */
    private static void deleteJaasFile(String jaasPath)
        throws IOException {
        File jaasFile = new File(jaasPath);
        if (jaasFile.exists())
        {
            if (!jaasFile.delete())
            {
                throw new IOException("Failed to delete exists jaas file.");
            }
        }
    }
    /**
     * Concatenates the login-context sections for every {@link Module}.
     */
    private static String getJaasConfContext(String principal, String keytabPath) {
        Module[] allModule = Module.values();
        StringBuilder builder = new StringBuilder();
        for (Module module : allModule)
        {
            builder.append(getModuleContext(principal, keytabPath, module));
        }
        return builder.toString();
    }
    /**
     * Renders one JAAS login-context section, using the IBM or Sun Krb5 login
     * module depending on the running JDK vendor.
     */
    private static String getModuleContext(String userPrincipal, String keyTabPath, Module module) {
        StringBuilder builder = new StringBuilder();
        if (IS_IBM_JDK) {
            builder.append(module.getName()).append(" {").append(LINE_SEPARATOR);
            builder.append(IBM_LOGIN_MODULE).append(LINE_SEPARATOR);
            builder.append("credsType=both").append(LINE_SEPARATOR);
            builder.append("principal=\"" + userPrincipal + "\"").append(LINE_SEPARATOR);
            builder.append("useKeytab=\"" + keyTabPath + "\"").append(LINE_SEPARATOR);
            builder.append("debug=true;").append(LINE_SEPARATOR);
            builder.append("};").append(LINE_SEPARATOR);
        } else {
            builder.append(module.getName()).append(" {").append(LINE_SEPARATOR);
            builder.append(SUN_LOGIN_MODULE).append(LINE_SEPARATOR);
            builder.append("useKeyTab=true").append(LINE_SEPARATOR);
            builder.append("keyTab=\"" + keyTabPath + "\"").append(LINE_SEPARATOR);
            builder.append("principal=\"" + userPrincipal + "\"").append(LINE_SEPARATOR);
            builder.append("useTicketCache=false").append(LINE_SEPARATOR);
            builder.append("storeKey=true").append(LINE_SEPARATOR);
            builder.append("debug=true;").append(LINE_SEPARATOR);
            builder.append("};").append(LINE_SEPARATOR);
        }
        return builder.toString();
    }
    /**
     * Performs the full security preparation: krb5.conf, ZooKeeper principal,
     * and the generated jaas.conf, all rooted at /home/rsoft/config/.
     *
     * @param principal  Kerberos principal
     * @param keyTabFile keytab file name relative to the config directory
     * @throws IOException when any system property cannot be applied
     */
    public static void securityPrepare(String principal, String keyTabFile) throws IOException {
        // String filePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator;
        String filePath = "/home/rsoft/config/";
        String krbFile = filePath + "krb5.conf";
        String userKeyTableFile = filePath + keyTabFile;
        // windows路径下分隔符替换
        userKeyTableFile = userKeyTableFile.replace("\\", "\\\\");
        krbFile = krbFile.replace("\\", "\\\\");
        LoginUtil.setKrb5Config(krbFile);
        LoginUtil.setZookeeperServerPrincipal("zookeeper/hadoop.hadoop.com");
        LoginUtil.setJaasFile(principal, userKeyTableFile);
    }
    /**
     * Check security mode: reads /home/rsoft/config/kafkaSecurityMode and
     * returns true when kafka.client.security.mode=yes.
     *
     * @return boolean
     */
    public static Boolean isSecurityModel() {
        Boolean isSecurity = false;
        // String krbFilePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator + "kafkaSecurityMode";
        String krbFilePath = "/home/rsoft/config/kafkaSecurityMode";
        Properties securityProps = new Properties();
        // file does not exist.
        if (!isFileExists(krbFilePath)) {
            return isSecurity;
        }
        // Close the stream via try-with-resources (previously leaked) and log the
        // exception at ERROR with its stack trace (previously INFO via "{}").
        try (FileInputStream in = new FileInputStream(krbFilePath)) {
            securityProps.load(in);
            if ("yes".equalsIgnoreCase(securityProps.getProperty("kafka.client.security.mode")))
            {
                isSecurity = true;
            }
        } catch (Exception e) {
            LOG.error("The Exception occured.", e);
        }
        return isSecurity;
    }
    /**
     * Returns true when the given path exists on disk.
     */
    private static boolean isFileExists(String fileName) {
        File file = new File(fileName);
        return file.exists();
    }
}

View File

@ -0,0 +1,20 @@
# Tomcat
server:
port: 9214
# Spring
spring:
application:
# 应用名称
name: stwzhj-kafka-producer
profiles:
# 环境配置
active: @profiles.active@
# 日志配置
logging:
level:
org.springframework: warn
org.apache.kafka: INFO
org.dromara.kafka.producer: INFO
config: classpath:logback-plus.xml

View File

@ -0,0 +1,59 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<property name="log.path" value="logs/stwzhj-kafka-producer"/>
<property name="log.maxHistory" value="30"/>
<property name="log.maxSize" value="100MB"/>
<property name="log.totalSizeCap" value="10GB"/>
<!-- 控制台输出 -->
<appender name="console" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
</encoder>
</appender>
<!-- 系统日志输出 -->
<appender name="file_info" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${log.path}/info.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<fileNamePattern>${log.path}/info-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
<maxHistory>${log.maxHistory}</maxHistory>
<maxFileSize>${log.maxSize}</maxFileSize>
<totalSizeCap>${log.totalSizeCap}</totalSizeCap>
</rollingPolicy>
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
</encoder>
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<level>INFO</level>
<onMatch>ACCEPT</onMatch>
<onMismatch>DENY</onMismatch>
</filter>
</appender>
<!-- 错误日志输出 -->
<appender name="file_error" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${log.path}/error.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<fileNamePattern>${log.path}/error-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
<maxHistory>${log.maxHistory}</maxHistory>
<maxFileSize>${log.maxSize}</maxFileSize>
<totalSizeCap>${log.totalSizeCap}</totalSizeCap>
</rollingPolicy>
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
</encoder>
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<level>ERROR</level>
<onMatch>ACCEPT</onMatch>
<onMismatch>DENY</onMismatch>
</filter>
</appender>
<!-- 日志输出级别 -->
<root level="INFO">
<appender-ref ref="console"/>
<appender-ref ref="file_info"/>
<appender-ref ref="file_error"/>
</root>
</configuration>