华为认证和SCRAM认证不能共存在一个里面切记
parent
45167e5874
commit
fd95d19e9c
|
|
@ -38,30 +38,33 @@ public class KafkaConsumerRunnable implements Runnable {
|
|||
|
||||
@Override
|
||||
public void run() {
|
||||
KafkaConsumer<String,Object> consumer = new KafkaConsumer<>(props);
|
||||
KafkaConsumer<String, Object> consumer = new KafkaConsumer<>(props);
|
||||
|
||||
List topics = (List) props.get("topics");
|
||||
consumer.subscribe(topics);
|
||||
consumer.poll(0); // 令订阅生效
|
||||
|
||||
List<TopicPartition> topicPartitions = new ArrayList<>();
|
||||
Map<String, List<PartitionInfo>> stringListMap = consumer.listTopics();
|
||||
for (Object topic : topics) {
|
||||
String topic1 = (String) topic;
|
||||
List<PartitionInfo> partitionInfos = stringListMap.get(topic1);
|
||||
for (PartitionInfo partitionInfo : partitionInfos) {
|
||||
TopicPartition partition = new TopicPartition(topic1, partitionInfo.partition());
|
||||
topicPartitions.add(partition);
|
||||
}
|
||||
}
|
||||
consumer.seekToEnd(topicPartitions); // 如果传Collections.emptyList()表示移动所有订阅topic分区offset到最末端
|
||||
// consumer.poll(0); // 令订阅生效
|
||||
//
|
||||
// List<TopicPartition> topicPartitions = new ArrayList<>();
|
||||
// Map<String, List<PartitionInfo>> stringListMap = consumer.listTopics();
|
||||
// for (Object topic : topics) {
|
||||
// String topic1 = (String) topic;
|
||||
// List<PartitionInfo> partitionInfos = stringListMap.get(topic1);
|
||||
// for (PartitionInfo partitionInfo : partitionInfos) {
|
||||
// TopicPartition partition = new TopicPartition(topic1, partitionInfo.partition());
|
||||
// topicPartitions.add(partition);
|
||||
// }
|
||||
// }
|
||||
// consumer.seekToEnd(topicPartitions); // 如果传Collections.emptyList()表示移动所有订阅topic分区offset到最末端
|
||||
|
||||
logger.info("SCRAM认证Kafka订阅Topic成功:{}", topics);
|
||||
while (true) {
|
||||
ConsumerRecords<String, Object> records = consumer.poll(Duration.ofMillis(100));
|
||||
// 延长poll超时到10秒(跨网段足够)
|
||||
ConsumerRecords<String, Object> records = consumer.poll(Duration.ofMillis(10000));
|
||||
logger.info("Poll到消息数:{}", records.count());
|
||||
for (ConsumerRecord<String, Object> record : records) {
|
||||
logger.info("offset={}, topic={}, value={}", record.offset(), record.topic(), record.value());
|
||||
taskExecutor.submit(new ConsumerWorker(record, cityCode));
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -96,10 +96,12 @@ public class RealConsumer implements CommandLineRunner {
|
|||
}
|
||||
logger.info("Security prepare success.");
|
||||
}
|
||||
|
||||
kafkaProp.put("security.protocol", "SASL_PLAINTEXT");
|
||||
kafkaProp.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"rsoft\" password=\"rsoft-2024\";");
|
||||
kafkaProp.put("sasl.mechanism", "SCRAM-SHA-256");
|
||||
// kafkaProp.put("socket.connection.setup.timeout.ms", "60000");
|
||||
//
|
||||
// kafkaProp.put("security.protocol", "SASL_PLAINTEXT");
|
||||
// kafkaProp.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"rsoft\" password=\"rsoft-2026\";");
|
||||
// kafkaProp.put("sasl.mechanism", "SCRAM-SHA-256");
|
||||
// kafkaProp.put("metadata.max.age.ms", Long.MAX_VALUE); // 彻底禁用元数据更新
|
||||
KafkaConsumerRunnable runnable = new KafkaConsumerRunnable(kafkaProp,dtpExecutor2,cityCode);
|
||||
executorService.execute(runnable);
|
||||
}
|
||||
|
|
@ -130,21 +132,12 @@ public class RealConsumer implements CommandLineRunner {
|
|||
Map<String, Object> map = new HashMap<>();
|
||||
map.put("bootstrap.servers",kafkaServers);
|
||||
map.put("group.id",groupId);
|
||||
map.put("enable.auto.commit", "true");
|
||||
map.put("auto.commit.interval.ms", "1000");
|
||||
map.put("enable.auto.commit", true);
|
||||
map.put("auto.commit.interval.ms", 1000);
|
||||
map.put("session.timeout.ms", "30000");
|
||||
map.put("key.deserializer", StringDeserializer.class);
|
||||
map.put("value.deserializer", StringDeserializer.class);
|
||||
map.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,5);
|
||||
// map.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,1000 * 5);
|
||||
// map.put("ack.mode", "manual_immediate");
|
||||
|
||||
// //认证方式 SASL_PLAINTEXT 或者 PLAINTEXT
|
||||
// map.put("security.protocol","SASL_PLAINTEXT");
|
||||
// //服务名
|
||||
// map.put("sasl.kerberos.service.name","kafka");
|
||||
// //域名
|
||||
// map.put("kerberos.domain.name","hadoop.hadoop.com");
|
||||
String[] split = topics.split(",");
|
||||
List list = CollectionUtils.arrayToList(split);
|
||||
map.put("topics", list);
|
||||
|
|
|
|||
Loading…
Reference in New Issue