From 3be8f33ba4d902d69bfad50d6aa8710687a5c310 Mon Sep 17 00:00:00 2001 From: luyya Date: Tue, 24 Mar 2026 18:24:01 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=A3=E5=9F=8Ekafka=E6=B7=BB=E5=8A=A0?= =?UTF-8?q?=E8=AE=A4=E8=AF=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../dromara/data2es/config/KafkaConfig.java | 25 ++++++++++++++++--- .../system/SysDictDataController.java | 4 +-- 2 files changed, 23 insertions(+), 6 deletions(-) diff --git a/stwzhj-modules/wzhj-data2es/src/main/java/org/dromara/data2es/config/KafkaConfig.java b/stwzhj-modules/wzhj-data2es/src/main/java/org/dromara/data2es/config/KafkaConfig.java index 464531b0..b3692b03 100644 --- a/stwzhj-modules/wzhj-data2es/src/main/java/org/dromara/data2es/config/KafkaConfig.java +++ b/stwzhj-modules/wzhj-data2es/src/main/java/org/dromara/data2es/config/KafkaConfig.java @@ -1,5 +1,6 @@ package org.dromara.data2es.config; +import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.producer.KafkaProducer; import org.dromara.data2es.producer.NewProducer; @@ -11,6 +12,8 @@ import org.springframework.kafka.core.KafkaAdmin; import org.springframework.stereotype.Component; import java.io.IOException; +import java.util.HashMap; +import java.util.Map; import java.util.Properties; /** @@ -128,9 +131,9 @@ public class KafkaConfig { // props.put(kerberosDomainName, "hadoop.hadoop.com"); //设置自定义的分区策略类,默认不传key,是粘性分区,尽量往一个分区中发消息。如果key不为null,则默认是按照key的hashcode与 partition的取余来决定哪个partition //props.put("partitioner.class","com.kafka.myparitioner.CidPartitioner"); - /*props.put(securityProtocol, "SASL_PLAINTEXT"); - props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"zkxc\" password=\"zkxcKafka07252023\";"); - props.put("sasl.mechanism", "SCRAM-SHA-256");*/ + props.put(securityProtocol, "SASL_PLAINTEXT"); + props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"rsoft\" password=\"rsoft-2026\";"); + props.put("sasl.mechanism", "SCRAM-SHA-256"); KafkaProducer producer = new KafkaProducer<>(props); // KafkaProducer producer = new KafkaProducer<>(props); @@ -139,7 +142,21 @@ public class KafkaConfig { @Bean public KafkaAdmin admin(KafkaProperties properties){ - KafkaAdmin admin = new KafkaAdmin(properties.buildAdminProperties()); + Map configs = new HashMap<>(); + // 1. 集群地址 + configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, + kafkaServers); + // 2. SASL认证(和命令行的client.properties完全一致) + configs.put("security.protocol", "SASL_PLAINTEXT"); + configs.put("sasl.mechanism", "SCRAM-SHA-256"); + configs.put("sasl.jaas.config", + "org.apache.kafka.common.security.scram.ScramLoginModule required " + + "username=\"rsoft\" password=\"rsoft-2026\";"); + // 3. 解决超时核心配置 + configs.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000); // 60s超时 + configs.put(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, 2000); // 重试间隔 + configs.put(AdminClientConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, 300000); + KafkaAdmin admin = new KafkaAdmin(configs); admin.setFatalIfBrokerNotAvailable(true); return admin; } diff --git a/stwzhj-modules/wzhj-system/src/main/java/org/dromara/system/controller/system/SysDictDataController.java b/stwzhj-modules/wzhj-system/src/main/java/org/dromara/system/controller/system/SysDictDataController.java index 917d9931..47a8add6 100644 --- a/stwzhj-modules/wzhj-system/src/main/java/org/dromara/system/controller/system/SysDictDataController.java +++ b/stwzhj-modules/wzhj-system/src/main/java/org/dromara/system/controller/system/SysDictDataController.java @@ -87,9 +87,9 @@ public class SysDictDataController extends BaseController { @Log(title = "字典数据", businessType = BusinessType.INSERT) @PostMapping public R add(@Validated @RequestBody SysDictDataBo dict) { - if (!dictDataService.checkDictDataUnique(dict)) { + /*if (!dictDataService.checkDictDataUnique(dict)) { return R.fail("新增字典数据'" + dict.getDictValue() + "'失败,字典键值已存在"); - } + }*/ dictDataService.insertDictData(dict); return R.ok(); }