宣城kafka添加认证
parent
359b92a7f1
commit
3be8f33ba4
|
|
@ -1,5 +1,6 @@
|
||||||
package org.dromara.data2es.config;
|
package org.dromara.data2es.config;
|
||||||
|
|
||||||
|
import org.apache.kafka.clients.admin.AdminClientConfig;
|
||||||
import org.apache.kafka.clients.admin.NewTopic;
|
import org.apache.kafka.clients.admin.NewTopic;
|
||||||
import org.apache.kafka.clients.producer.KafkaProducer;
|
import org.apache.kafka.clients.producer.KafkaProducer;
|
||||||
import org.dromara.data2es.producer.NewProducer;
|
import org.dromara.data2es.producer.NewProducer;
|
||||||
|
|
@ -11,6 +12,8 @@ import org.springframework.kafka.core.KafkaAdmin;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.Properties;
|
import java.util.Properties;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -128,9 +131,9 @@ public class KafkaConfig {
|
||||||
// props.put(kerberosDomainName, "hadoop.hadoop.com");
|
// props.put(kerberosDomainName, "hadoop.hadoop.com");
|
||||||
//设置自定义的分区策略类,默认不传key,是粘性分区,尽量往一个分区中发消息。如果key不为null,则默认是按照key的hashcode与 partition的取余来决定哪个partition
|
//设置自定义的分区策略类,默认不传key,是粘性分区,尽量往一个分区中发消息。如果key不为null,则默认是按照key的hashcode与 partition的取余来决定哪个partition
|
||||||
//props.put("partitioner.class","com.kafka.myparitioner.CidPartitioner");
|
//props.put("partitioner.class","com.kafka.myparitioner.CidPartitioner");
|
||||||
/*props.put(securityProtocol, "SASL_PLAINTEXT");
|
props.put(securityProtocol, "SASL_PLAINTEXT");
|
||||||
props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"zkxc\" password=\"zkxcKafka07252023\";");
|
props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"rsoft\" password=\"rsoft-2026\";");
|
||||||
props.put("sasl.mechanism", "SCRAM-SHA-256");*/
|
props.put("sasl.mechanism", "SCRAM-SHA-256");
|
||||||
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
|
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
|
||||||
// KafkaProducer producer = new KafkaProducer<>(props);
|
// KafkaProducer producer = new KafkaProducer<>(props);
|
||||||
|
|
||||||
|
|
@ -139,7 +142,21 @@ public class KafkaConfig {
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
public KafkaAdmin admin(KafkaProperties properties){
|
public KafkaAdmin admin(KafkaProperties properties){
|
||||||
KafkaAdmin admin = new KafkaAdmin(properties.buildAdminProperties());
|
Map<String, Object> configs = new HashMap<>();
|
||||||
|
// 1. 集群地址
|
||||||
|
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
|
||||||
|
kafkaServers);
|
||||||
|
// 2. SASL认证(和命令行的client.properties完全一致)
|
||||||
|
configs.put("security.protocol", "SASL_PLAINTEXT");
|
||||||
|
configs.put("sasl.mechanism", "SCRAM-SHA-256");
|
||||||
|
configs.put("sasl.jaas.config",
|
||||||
|
"org.apache.kafka.common.security.scram.ScramLoginModule required " +
|
||||||
|
"username=\"rsoft\" password=\"rsoft-2026\";");
|
||||||
|
// 3. 解决超时核心配置
|
||||||
|
configs.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000); // 60s超时
|
||||||
|
configs.put(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, 2000); // 重试间隔
|
||||||
|
configs.put(AdminClientConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, 300000);
|
||||||
|
KafkaAdmin admin = new KafkaAdmin(configs);
|
||||||
admin.setFatalIfBrokerNotAvailable(true);
|
admin.setFatalIfBrokerNotAvailable(true);
|
||||||
return admin;
|
return admin;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -87,9 +87,9 @@ public class SysDictDataController extends BaseController {
|
||||||
@Log(title = "字典数据", businessType = BusinessType.INSERT)
|
@Log(title = "字典数据", businessType = BusinessType.INSERT)
|
||||||
@PostMapping
|
@PostMapping
|
||||||
public R<Void> add(@Validated @RequestBody SysDictDataBo dict) {
|
public R<Void> add(@Validated @RequestBody SysDictDataBo dict) {
|
||||||
if (!dictDataService.checkDictDataUnique(dict)) {
|
/*if (!dictDataService.checkDictDataUnique(dict)) {
|
||||||
return R.fail("新增字典数据'" + dict.getDictValue() + "'失败,字典键值已存在");
|
return R.fail("新增字典数据'" + dict.getDictValue() + "'失败,字典键值已存在");
|
||||||
}
|
}*/
|
||||||
dictDataService.insertDictData(dict);
|
dictDataService.insertDictData(dict);
|
||||||
return R.ok();
|
return R.ok();
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue