省厅位置汇聚发送kafka修改

stwzhj
luyya 2025-09-10 09:27:05 +08:00
parent 16f8b6dec5
commit e5a166d4a3
3 changed files with 4 additions and 3 deletions

View File

@ -116,7 +116,7 @@ public class ConsumerWorker implements Runnable {
Object value = record.value(); Object value = record.value();
String topic = record.topic(); String topic = record.topic();
// logger.info("offset={},topic={},value={}", record.offset(), topic,value); logger.info("offset={},topic={},value={}", record.offset(), topic,value);
EsGpsInfo esGpsInfo; EsGpsInfo esGpsInfo;
JSONObject jsonObject; JSONObject jsonObject;
try { try {

View File

@ -97,9 +97,9 @@ public class RealConsumer implements CommandLineRunner {
logger.info("Security prepare success."); logger.info("Security prepare success.");
} }
/*kafkaProp.put("security.protocol", "SASL_PLAINTEXT"); kafkaProp.put("security.protocol", "SASL_PLAINTEXT");
kafkaProp.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"rsoft\" password=\"rsoft-2024\";"); kafkaProp.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"rsoft\" password=\"rsoft-2024\";");
kafkaProp.put("sasl.mechanism", "SCRAM-SHA-256");*/ kafkaProp.put("sasl.mechanism", "SCRAM-SHA-256");
KafkaConsumerRunnable runnable = new KafkaConsumerRunnable(kafkaProp,dtpExecutor2,cityCode); KafkaConsumerRunnable runnable = new KafkaConsumerRunnable(kafkaProp,dtpExecutor2,cityCode);
executorService.execute(runnable); executorService.execute(runnable);
} }

View File

@ -43,6 +43,7 @@ public class RedisOnlineUserSchedule {
} }
EsGpsInfoVO2 vo2 = BeanUtil.toBean(job, EsGpsInfoVO2.class); EsGpsInfoVO2 vo2 = BeanUtil.toBean(job, EsGpsInfoVO2.class);
if (1 == vo2.getOnline() && DateUtil.between(vo2.getGpsTime(), new Date(), DateUnit.SECOND) > 1800L){ if (1 == vo2.getOnline() && DateUtil.between(vo2.getGpsTime(), new Date(), DateUnit.SECOND) > 1800L){
vo2.setOnline(0);
gpsInfoVO2s.add(vo2); gpsInfoVO2s.add(vo2);
} }
} }