From fd95d19e9c31c9d97d0edd9f4a059384b64a14d8 Mon Sep 17 00:00:00 2001 From: luyya Date: Mon, 23 Mar 2026 20:06:06 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8D=8E=E4=B8=BA=E8=AE=A4=E8=AF=81=E5=92=8CSC?= =?UTF-8?q?RAM=E8=AE=A4=E8=AF=81=E4=B8=8D=E8=83=BD=E5=85=B1=E5=AD=98?= =?UTF-8?q?=E5=9C=A8=E4=B8=80=E4=B8=AA=E9=87=8C=E9=9D=A2=E5=88=87=E8=AE=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../consumer/KafkaConsumerRunnable.java | 35 ++++++++++--------- .../data2kafka/consumer/RealConsumer.java | 23 +++++------- 2 files changed, 27 insertions(+), 31 deletions(-) diff --git a/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/consumer/KafkaConsumerRunnable.java b/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/consumer/KafkaConsumerRunnable.java index 81272dc0..2d21de7b 100644 --- a/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/consumer/KafkaConsumerRunnable.java +++ b/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/consumer/KafkaConsumerRunnable.java @@ -38,30 +38,33 @@ public class KafkaConsumerRunnable implements Runnable { @Override public void run() { - KafkaConsumer consumer = new KafkaConsumer<>(props); + KafkaConsumer consumer = new KafkaConsumer<>(props); List topics = (List) props.get("topics"); consumer.subscribe(topics); - consumer.poll(0); // 令订阅生效 - - List topicPartitions = new ArrayList<>(); - Map> stringListMap = consumer.listTopics(); - for (Object topic : topics) { - String topic1 = (String) topic; - List partitionInfos = stringListMap.get(topic1); - for (PartitionInfo partitionInfo : partitionInfos) { - TopicPartition partition = new TopicPartition(topic1, partitionInfo.partition()); - topicPartitions.add(partition); - } - } - consumer.seekToEnd(topicPartitions); // 如果传Collections.emptyList()表示移动所有订阅topic分区offset到最末端 +// consumer.poll(0); // 令订阅生效 +// +// List topicPartitions = new ArrayList<>(); +// Map> stringListMap = consumer.listTopics(); +// for (Object topic : topics) { +// String topic1 = (String) topic; +// List partitionInfos = stringListMap.get(topic1); +// for (PartitionInfo partitionInfo : partitionInfos) { +// TopicPartition partition = new TopicPartition(topic1, partitionInfo.partition()); +// topicPartitions.add(partition); +// } +// } +// consumer.seekToEnd(topicPartitions); // 如果传Collections.emptyList()表示移动所有订阅topic分区offset到最末端 + logger.info("SCRAM认证Kafka订阅Topic成功:{}", topics); while (true) { - ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); + // 延长poll超时到10秒(跨网段足够) + ConsumerRecords records = consumer.poll(Duration.ofMillis(10000)); + logger.info("Poll到消息数:{}", records.count()); for (ConsumerRecord record : records) { + logger.info("offset={}, topic={}, value={}", record.offset(), record.topic(), record.value()); taskExecutor.submit(new ConsumerWorker(record, cityCode)); } - } } } diff --git a/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/consumer/RealConsumer.java b/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/consumer/RealConsumer.java index 3d3c639d..ccaeca55 100644 --- a/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/consumer/RealConsumer.java +++ b/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/consumer/RealConsumer.java @@ -96,10 +96,12 @@ public class RealConsumer implements CommandLineRunner { } logger.info("Security prepare success."); } - - kafkaProp.put("security.protocol", "SASL_PLAINTEXT"); - kafkaProp.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"rsoft\" password=\"rsoft-2024\";"); - kafkaProp.put("sasl.mechanism", "SCRAM-SHA-256"); +// kafkaProp.put("socket.connection.setup.timeout.ms", "60000"); +// +// kafkaProp.put("security.protocol", "SASL_PLAINTEXT"); +// kafkaProp.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"rsoft\" password=\"rsoft-2026\";"); +// kafkaProp.put("sasl.mechanism", "SCRAM-SHA-256"); +// kafkaProp.put("metadata.max.age.ms", Long.MAX_VALUE); // 彻底禁用元数据更新 KafkaConsumerRunnable runnable = new KafkaConsumerRunnable(kafkaProp,dtpExecutor2,cityCode); executorService.execute(runnable); } @@ -130,21 +132,12 @@ public class RealConsumer implements CommandLineRunner { Map map = new HashMap<>(); map.put("bootstrap.servers",kafkaServers); map.put("group.id",groupId); - map.put("enable.auto.commit", "true"); - map.put("auto.commit.interval.ms", "1000"); + map.put("enable.auto.commit", true); + map.put("auto.commit.interval.ms", 1000); map.put("session.timeout.ms", "30000"); map.put("key.deserializer", StringDeserializer.class); map.put("value.deserializer", StringDeserializer.class); map.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,5); -// map.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,1000 * 5); -// map.put("ack.mode", "manual_immediate"); - -// //认证方式 SASL_PLAINTEXT 或者 PLAINTEXT -// map.put("security.protocol","SASL_PLAINTEXT"); -// //服务名 -// map.put("sasl.kerberos.service.name","kafka"); -// //域名 -// map.put("kerberos.domain.name","hadoop.hadoop.com"); String[] split = topics.split(","); List list = CollectionUtils.arrayToList(split); map.put("topics", list);