Provincial location aggregation: add feature to send city positioning data to the provincial Kafka

stwzhj
luyya 2025-07-23 14:42:27 +08:00
parent 5f9ecaa366
commit caf0ee6c5b
14 changed files with 1268 additions and 3 deletions

View File

@ -16,6 +16,7 @@
<module>stwzhj-workflow</module>
<module>stwzhj-data2es</module>
<module>stwzhj-baseToSt</module>
+ <module>stwzhj-data2StKafka</module>
</modules>
<artifactId>stwzhj-modules</artifactId>

View File

@ -3,7 +3,6 @@ package org.dromara.kafka.consumer.config;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
- import org.dromara.kafka.consumer.handler.KafkaSecurityUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -111,14 +110,14 @@ public class NewConsumer extends Thread{
public static void main(String[] args)
{
- if (KafkaSecurityUtil.isSecurityModel())
+ if (LoginUtil.isSecurityModel())
{
try
{
LOG.info("Security mode start.");
//!! Note: for security authentication, manually replace this with the machine-machine account you applied for
- KafkaSecurityUtil.securityPrepare();
+ LoginUtil.setJaasFile(USER_PRINCIPAL, USER_KEYTAB_FILE);
}
catch (IOException e)
{

View File

@ -0,0 +1,208 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.dromara</groupId>
<artifactId>stwzhj-modules</artifactId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>stwzhj-data2StKafka</artifactId>
<description>
stwzhj-data2StKafka consumes the city Kafka and forwards positioning data to the provincial Kafka
</description>
<dependencies>
<dependency>
<groupId>org.dromara</groupId>
<artifactId>stwzhj-common-nacos</artifactId>
</dependency>
<dependency>
<groupId>org.dromara</groupId>
<artifactId>stwzhj-common-sentinel</artifactId>
</dependency>
<!-- RuoYi Common Log -->
<dependency>
<groupId>org.dromara</groupId>
<artifactId>stwzhj-common-log</artifactId>
</dependency>
<dependency>
<groupId>org.dromara</groupId>
<artifactId>stwzhj-common-dict</artifactId>
</dependency>
<dependency>
<groupId>org.dromara</groupId>
<artifactId>stwzhj-common-doc</artifactId>
</dependency>
<dependency>
<groupId>org.dromara</groupId>
<artifactId>stwzhj-common-web</artifactId>
</dependency>
<dependency>
<groupId>org.dromara</groupId>
<artifactId>stwzhj-common-mybatis</artifactId>
</dependency>
<dependency>
<groupId>org.dromara</groupId>
<artifactId>stwzhj-common-dubbo</artifactId>
</dependency>
<dependency>
<groupId>org.dromara</groupId>
<artifactId>stwzhj-common-seata</artifactId>
</dependency>
<dependency>
<groupId>org.dromara</groupId>
<artifactId>stwzhj-common-idempotent</artifactId>
</dependency>
<dependency>
<groupId>org.dromara</groupId>
<artifactId>stwzhj-common-tenant</artifactId>
</dependency>
<dependency>
<groupId>org.dromara</groupId>
<artifactId>stwzhj-common-security</artifactId>
</dependency>
<dependency>
<groupId>org.dromara</groupId>
<artifactId>stwzhj-common-translation</artifactId>
</dependency>
<dependency>
<groupId>org.dromara</groupId>
<artifactId>stwzhj-common-sensitive</artifactId>
</dependency>
<dependency>
<groupId>org.dromara</groupId>
<artifactId>stwzhj-common-encrypt</artifactId>
</dependency>
<dependency>
<groupId>org.dromara</groupId>
<artifactId>stwzhj-common-redis</artifactId>
</dependency>
<!-- RuoYi Api System -->
<dependency>
<groupId>org.dromara</groupId>
<artifactId>stwzhj-api-system</artifactId>
</dependency>
<dependency>
<groupId>org.dromara</groupId>
<artifactId>stwzhj-api-resource</artifactId>
</dependency>
<dependency>
<groupId>org.dromara</groupId>
<artifactId>stwzhj-api-data2es</artifactId>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
<version>7.10.2-h0.cbu.mrs.350.r11</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.10.2-h0.cbu.mrs.350.r11</version>
<exclusions>
<exclusion>
<groupId>org.elasticsearch.plugin</groupId>
<artifactId>parent-join-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.elasticsearch.plugin</groupId>
<artifactId>aggs-matrix-stats-client</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>7.10.2-h0.cbu.mrs.350.r11</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>3.6.1-h0.cbu.mrs.350.r11</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<groupId>net.sf.jopt-simple</groupId>
<artifactId>jopt-simple</artifactId>
</exclusion>
<exclusion>
<groupId>com.huawei.mrs</groupId>
<artifactId>manager-wc2frm</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</exclusion>
<exclusion>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
</exclusion>
<exclusion>
<groupId>com.huawei.mrs</groupId>
<artifactId>om-controller-api</artifactId>
</exclusion>
<exclusion>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.6.1-h0.cbu.mrs.350.r11</version>
</dependency>
</dependencies>
<build>
<finalName>${project.artifactId}</finalName>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${spring-boot.version}</version>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,22 @@
package org.dromara.data2kafka;
import org.apache.dubbo.config.spring.context.annotation.EnableDubbo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.metrics.buffering.BufferingApplicationStartup;
import org.springframework.scheduling.annotation.EnableScheduling;
@EnableDubbo
@EnableScheduling
@SpringBootApplication
public class Data2KafkaApplication {
public static void main(String[] args) {
SpringApplication application = new SpringApplication(Data2KafkaApplication.class);
application.setApplicationStartup(new BufferingApplicationStartup(2048));
application.run(args);
System.out.println("(♥◠‿◠)ノ゙ Consume-and-forward-to-provincial-Kafka service started successfully ლ(´ڡ`ლ)゙ ");
}
}

View File

@ -0,0 +1,134 @@
package org.dromara.data2kafka.config;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Properties;
/**
 * <p>description: producer configuration for the provincial Kafka</p>
 *
 * @author chenle
 * @date 2021-11-03 14:15
 */
@Component
public class KafkaConfig {
private Logger logger = LoggerFactory.getLogger(KafkaConfig.class);
private String kafkaServers = "53.1.212.25:21007,53.1.212.26:21007,53.1.212.27:21007"; // provincial Kafka
private String groupId = "ruansiProducer";
// Broker address list
private final String bootstrapServers = "bootstrap.servers";
// Client ID
private final String clientId = "client.id";
// Key serializer class
private final String keySerializer = "key.serializer";
// Value serializer class
private final String valueSerializer = "value.serializer";
// Protocol type: currently SASL_PLAINTEXT or PLAINTEXT
private final String securityProtocol = "security.protocol";
// Service name
private final String saslKerberosServiceName = "sasl.kerberos.service.name";
// Domain name
private final String kerberosDomainName = "kerberos.domain.name";
// Default number of messages to send
private final int messageNumToSend = 100;
/**
 * keytab file name
 */
private static final String USER_KEYTAB_FILE = "user.keytab";
/**
 * machine-machine account principal
 */
private static final String USER_PRINCIPAL = "yhy_ahrs_rcw@A528C942_01A6_1BEF_7A75_0187DC82C40F.COM";
/**
 * Security mode is currently hard-coded on: the provincial Kafka requires SASL authentication
 */
private static final boolean SECURITY_MODE = true;
/**
 * Producer bean for the provincial Kafka
 */
@Bean(name = "myKafkaProducer")
public KafkaProducer<Integer, String> newProducer() {
Properties props = new Properties();
if (SECURITY_MODE)
{
try
{
logger.info("Security mode start.");
//!! Note: for security authentication, manually replace this with the machine-machine account you applied for
LoginUtil.securityPrepare(USER_PRINCIPAL, USER_KEYTAB_FILE);
props.put(securityProtocol, "SASL_PLAINTEXT");
// props.put("sasl.mechanism", "GSSAPI");
// Service name
props.put(saslKerberosServiceName, "kafka");
// Domain name
props.put(kerberosDomainName, "A528C942_01A6_1BEF_7A75_0187DC82C40F.COM");
}
catch (IOException e)
{
logger.error("Security prepare failure.");
logger.error("The IOException occurred.", e);
return null;
}
logger.info("Security prepare success.");
}else{
props.put(securityProtocol, "PLAINTEXT");
}
// Broker address list
props.put(bootstrapServers,kafkaServers);
// Client ID
props.put(clientId, "ruansiProducer");
// Key serializer class
props.put(keySerializer,
"org.apache.kafka.common.serialization.IntegerSerializer");
// Value serializer class
props.put(valueSerializer,
"org.apache.kafka.common.serialization.StringSerializer");
// Batch send settings
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
// Protocol type: currently SASL_PLAINTEXT or PLAINTEXT
//props.put(securityProtocol, "SASL_PLAINTEXT");
// // Service name
// props.put(saslKerberosServiceName, "kafka");
// // Domain name
// props.put(kerberosDomainName, "hadoop.hadoop.com");
// Custom partitioner class: by default, when no key is passed, the sticky partitioner keeps sending to one partition;
// when the key is not null, the partition defaults to the key's hashcode modulo the partition count
//props.put("partitioner.class","com.kafka.myparitioner.CidPartitioner");
// props.put(securityProtocol, "SASL_PLAINTEXT");
// props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"zkxc\" password=\"zkxcKafka07252023\";");
// props.put("sasl.mechanism", "SCRAM-SHA-256");
KafkaProducer<Integer, String> producer = new KafkaProducer<>(props);
return producer;
}
}

View File

@ -0,0 +1,259 @@
package org.dromara.data2kafka.config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Properties;
public class LoginUtil {
private static final Logger LOG = LoggerFactory.getLogger(LoginUtil.class);
/**
 * JAAS login modules to generate
 */
public enum Module {
KAFKA("KafkaClient"), ZOOKEEPER("Client");
private String name;
private Module(String name)
{
this.name = name;
}
public String getName()
{
return name;
}
}
/**
 * line separator string
 */
private static final String LINE_SEPARATOR = System.getProperty("line.separator");
/**
* jaas file postfix
*/
private static final String JAAS_POSTFIX = ".jaas.conf";
/**
* is IBM jdk or not
*/
private static final boolean IS_IBM_JDK = System.getProperty("java.vendor").contains("IBM");
/**
* IBM jdk login module
*/
private static final String IBM_LOGIN_MODULE = "com.ibm.security.auth.module.Krb5LoginModule required";
/**
* oracle jdk login module
*/
private static final String SUN_LOGIN_MODULE = "com.sun.security.auth.module.Krb5LoginModule required";
/**
* Zookeeper quorum principal.
*/
public static final String ZOOKEEPER_AUTH_PRINCIPAL = "zookeeper.server.principal";
/**
* java security krb5 file path
*/
public static final String JAVA_SECURITY_KRB5_CONF = "java.security.krb5.conf";
/**
* java security login file path
*/
public static final String JAVA_SECURITY_LOGIN_CONF = "java.security.auth.login.config";
/**
 * Write the jaas.conf file and register it via java.security.auth.login.config
 *
 * @param principal
 * @param keytabPath
 * @throws IOException
 */
public static void setJaasFile(String principal, String keytabPath)
throws IOException {
String jaasPath =
new File(System.getProperty("java.io.tmpdir")) + File.separator + System.getProperty("user.name")
+ JAAS_POSTFIX;
// Escape path separators on Windows
jaasPath = jaasPath.replace("\\", "\\\\");
// Delete any existing jaas file
deleteJaasFile(jaasPath);
writeJaasFile(jaasPath, principal, keytabPath);
System.setProperty(JAVA_SECURITY_LOGIN_CONF, jaasPath);
}
/**
 * Set the zookeeper server principal
 *
 * @param zkServerPrincipal
 * @throws IOException
 */
public static void setZookeeperServerPrincipal(String zkServerPrincipal)
throws IOException {
System.setProperty(ZOOKEEPER_AUTH_PRINCIPAL, zkServerPrincipal);
String ret = System.getProperty(ZOOKEEPER_AUTH_PRINCIPAL);
if (ret == null)
{
throw new IOException(ZOOKEEPER_AUTH_PRINCIPAL + " is null.");
}
if (!ret.equals(zkServerPrincipal))
{
throw new IOException(ZOOKEEPER_AUTH_PRINCIPAL + " is " + ret + " is not " + zkServerPrincipal + ".");
}
}
/**
 * Set the krb5 config file
 *
 * @param krb5ConfFile
 * @throws IOException
 */
public static void setKrb5Config(String krb5ConfFile)
throws IOException {
System.setProperty(JAVA_SECURITY_KRB5_CONF, krb5ConfFile);
String ret = System.getProperty(JAVA_SECURITY_KRB5_CONF);
if (ret == null)
{
throw new IOException(JAVA_SECURITY_KRB5_CONF + " is null.");
}
if (!ret.equals(krb5ConfFile))
{
throw new IOException(JAVA_SECURITY_KRB5_CONF + " is " + ret + " is not " + krb5ConfFile + ".");
}
}
/**
 * Write the jaas file content
 *
 * @throws IOException
 */
private static void writeJaasFile(String jaasPath, String principal, String keytabPath)
throws IOException {
FileWriter writer = new FileWriter(new File(jaasPath));
try
{
writer.write(getJaasConfContext(principal, keytabPath));
writer.flush();
}
catch (IOException e)
{
throw new IOException("Failed to create jaas.conf File");
}
finally
{
writer.close();
}
}
private static void deleteJaasFile(String jaasPath)
throws IOException {
File jaasFile = new File(jaasPath);
if (jaasFile.exists())
{
if (!jaasFile.delete())
{
throw new IOException("Failed to delete exists jaas file.");
}
}
}
private static String getJaasConfContext(String principal, String keytabPath) {
Module[] allModule = Module.values();
StringBuilder builder = new StringBuilder();
for (Module module : allModule)
{
builder.append(getModuleContext(principal, keytabPath, module));
}
return builder.toString();
}
private static String getModuleContext(String userPrincipal, String keyTabPath, Module module) {
StringBuilder builder = new StringBuilder();
if (IS_IBM_JDK) {
builder.append(module.getName()).append(" {").append(LINE_SEPARATOR);
builder.append(IBM_LOGIN_MODULE).append(LINE_SEPARATOR);
builder.append("credsType=both").append(LINE_SEPARATOR);
builder.append("principal=\"" + userPrincipal + "\"").append(LINE_SEPARATOR);
builder.append("useKeytab=\"" + keyTabPath + "\"").append(LINE_SEPARATOR);
builder.append("debug=true;").append(LINE_SEPARATOR);
builder.append("};").append(LINE_SEPARATOR);
} else {
builder.append(module.getName()).append(" {").append(LINE_SEPARATOR);
builder.append(SUN_LOGIN_MODULE).append(LINE_SEPARATOR);
builder.append("useKeyTab=true").append(LINE_SEPARATOR);
builder.append("keyTab=\"" + keyTabPath + "\"").append(LINE_SEPARATOR);
builder.append("principal=\"" + userPrincipal + "\"").append(LINE_SEPARATOR);
builder.append("useTicketCache=false").append(LINE_SEPARATOR);
builder.append("storeKey=true").append(LINE_SEPARATOR);
builder.append("debug=true;").append(LINE_SEPARATOR);
builder.append("};").append(LINE_SEPARATOR);
}
return builder.toString();
}
public static void securityPrepare(String principal, String keyTabFile) throws IOException {
// String filePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator;
String filePath = "/rsoft/config/";
String krbFile = filePath + "krb5.conf";
String userKeyTableFile = filePath + keyTabFile;
// Escape path separators on Windows
userKeyTableFile = userKeyTableFile.replace("\\", "\\\\");
krbFile = krbFile.replace("\\", "\\\\");
LoginUtil.setKrb5Config(krbFile);
LoginUtil.setZookeeperServerPrincipal("zookeeper/hadoop.hadoop.com");
LoginUtil.setJaasFile(principal, userKeyTableFile);
}
/**
* Check security mode
*
* @return boolean
*/
public static Boolean isSecurityModel() {
Boolean isSecurity = false;
String krbFilePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator + "kafkaSecurityMode";
Properties securityProps = new Properties();
// file does not exist.
if (!isFileExists(krbFilePath)) {
return isSecurity;
}
try {
securityProps.load(new FileInputStream(krbFilePath));
if ("yes".equalsIgnoreCase(securityProps.getProperty("kafka.client.security.mode")))
{
isSecurity = true;
}
} catch (Exception e) {
LOG.error("Exception occurred while reading the kafkaSecurityMode file.", e);
}
return isSecurity;
}
/**
 * Check whether a file exists
 */
private static boolean isFileExists(String fileName) {
File file = new File(fileName);
return file.exists();
}
}
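For reference, on a SUN/Oracle JDK the jaas file generated by writeJaasFile has the following shape, assuming the principal and the /rsoft/config keytab path that securityPrepare passes in (setJaasFile writes the file into the JVM temp dir):

KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/rsoft/config/user.keytab"
principal="yhy_ahrs_rcw@A528C942_01A6_1BEF_7A75_0187DC82C40F.COM"
useTicketCache=false
storeKey=true
debug=true;
};
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/rsoft/config/user.keytab"
principal="yhy_ahrs_rcw@A528C942_01A6_1BEF_7A75_0187DC82C40F.COM"
useTicketCache=false
storeKey=true
debug=true;
};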

View File

@ -0,0 +1,220 @@
package org.dromara.data2kafka.consumer;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.bean.copier.CopyOptions;
import cn.hutool.core.convert.ConvertException;
import cn.hutool.core.date.DateTime;
import cn.hutool.core.date.DateUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.alibaba.fastjson.JSON;
import org.apache.commons.lang.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.dromara.data2es.api.domain.RemoteGpsInfo;
import org.dromara.data2kafka.domain.EsGpsInfo;
import org.dromara.data2kafka.domain.EsGpsInfoVO;
import org.dromara.data2kafka.producer.Producer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Objects;
/**
 * <p>description: per-record worker that validates a city positioning message and forwards it to the provincial Kafka</p>
 *
 * @author chenle
 * @date 2021-09-06 16:44
 */
public class ConsumerWorker implements Runnable {
private ConsumerRecord<String, Object> record;
// Passed in through the constructor: instances are created with `new`, outside the Spring
// context, so field injection via @Autowired would leave this null
private Producer producer;
private Logger logger = LoggerFactory.getLogger(ConsumerWorker.class);
private String cityCode ;
ConsumerWorker(ConsumerRecord<String, Object> record, String cityCode, Producer producer) {
this.record = record;
this.cityCode = cityCode;
this.producer = producer;
}
@Override
public void run() {
// Path for other cities. A simple trick is used here: the cities we developed pass a
// 4-digit cityCode, while other cities pass more than 4 digits, which is truncated below.
if(cityCode.length() > 4){
cityCode = cityCode.substring(0,4);
normalRequest();
}else {
// Path for Lu'an, Anqing and similar cities, whose systems our own company developed.
luanrequest();
// luanrequestBatch();
}
}
/**
 * Batch variant of luanrequest (currently unused)
 */
private void luanrequestBatch() {
Object value = record.value();
String topic = record.topic();
List<EsGpsInfo> list = new ArrayList<>();
logger.info("offset={},topic={},value={}", record.offset(), topic,value);
List<JSONObject> jsonObjects = JSON.parseArray((String) value, JSONObject.class);
for (JSONObject jsonObject : jsonObjects) {
EsGpsInfo esGpsInfo;
/*try {
jsonObject = JSONUtil.parseObj(((String) value));
}catch (ConvertException e){
logger.info("jsonObject=null:error={}",e.getMessage());
return;
}*/
try {
esGpsInfo = JSONUtil.toBean(jsonObject, EsGpsInfo.class);
}catch (ConvertException e){
logger.info("EsGpsInfo=null:error={}",e.getMessage());
return;
}
if(Objects.isNull(esGpsInfo)){
logger.info("esGpsInfo=null no error");
return;
}
String deviceCode = esGpsInfo.getDeviceCode();
if(StringUtils.isEmpty(deviceCode) || deviceCode.length() > 100){
logger.info("deviceCode:{} is null or is too long ",deviceCode);
return;
}
String latitude = esGpsInfo.getLat();
if(StringUtils.isEmpty(latitude) || "0.0".equals(latitude)){
logger.info("latitude:{} is null or is zero ",latitude);
return;
}
String longitude = esGpsInfo.getLng();
if(StringUtils.isEmpty(longitude) || "0.0".equals(longitude)){
logger.info("longitude:{} is null or is zero ",longitude);
return;
}
esGpsInfo.setInfoSource(cityCode);
esGpsInfo.setGpsTime(new Date(Long.valueOf(jsonObject.getStr("gpsTime"))));
list.add(esGpsInfo);
}
// dataToEsService.saveGpsInfoBatch(list);
}
private void luanrequest() {
Object value = record.value();
String topic = record.topic();
logger.info("offset={},topic={},value={}", record.offset(), topic,value);
RemoteGpsInfo esGpsInfo;
JSONObject jsonObject;
try {
jsonObject = JSONUtil.parseObj(((String) value));
}catch (ConvertException e){
logger.info("jsonObject=null:error={}",e.getMessage());
return;
}
try {
esGpsInfo = JSONUtil.toBean(jsonObject, RemoteGpsInfo.class);
}catch (ConvertException e){
logger.info("EsGpsInfo=null:error={}",e.getMessage());
return;
}
if(Objects.isNull(esGpsInfo)){
logger.info("esGpsInfo=null no error");
return;
}
String deviceCode = esGpsInfo.getDeviceCode();
if(StringUtils.isEmpty(deviceCode) || deviceCode.length() > 100){
logger.info("deviceCode:{} is null or is too long ",deviceCode);
return;
}
String latitude = esGpsInfo.getLat();
if(StringUtils.isEmpty(latitude) || "0.0".equals(latitude)){
logger.info("latitude:{} is null or is zero ",latitude);
return;
}
String longitude = esGpsInfo.getLng();
if(StringUtils.isEmpty(longitude) || "0.0".equals(longitude)){
logger.info("longitude:{} is null or is zero ",longitude);
return;
}
esGpsInfo.setInfoSource(cityCode);
try {
esGpsInfo.setGpsTime(new Date(Long.valueOf(jsonObject.getStr("gpsTime"))));
}catch (Exception e){
logger.error("error_msg={}",e.getMessage());
}
logger.info("esGpsInfo={}",esGpsInfo);
producer.send(esGpsInfo,"jysb_dwxx");
}
/**
 * Handle messages from other cities' platforms (EsGpsInfoVO format)
 */
private void normalRequest() {
Object value = record.value();
String topic = record.topic();
logger.info("offset={},topic={},value={}", record.offset(), topic,value);
EsGpsInfo gpsInfo = new EsGpsInfo();
EsGpsInfoVO esGpsInfoVO;
try {
esGpsInfoVO = JSONUtil.toBean(((String) value), EsGpsInfoVO.class);
}catch (ConvertException e){
logger.info("esGpsInfoVO=null:error={}",e.getMessage());
return;
}
if(Objects.isNull(esGpsInfoVO)){
logger.info("esGpsInfoVO=null no error");
return;
}
try {
DateTime parse = DateUtil.parse(esGpsInfoVO.getGpsTime(), "yyyy-MM-dd HH:mm:ss");
}catch (Exception e){
logger.info("gpsTime:{} format error", esGpsInfoVO.getGpsTime());
return;
}
String deviceCode = esGpsInfoVO.getDeviceCode();
if(StringUtils.isEmpty(deviceCode) || deviceCode.length() > 100){
logger.info("deviceCode:{} is null or is too long ",deviceCode);
return;
}
String latitude = esGpsInfoVO.getLatitude();
if(StringUtils.isEmpty(latitude) || "0.0".equals(latitude)){
logger.info("latitude:{} is null or is zero ",latitude);
return;
}
String longitude = esGpsInfoVO.getLongitude();
if(StringUtils.isEmpty(longitude) || "0.0".equals(longitude)){
logger.info("longitude:{} is null or is zero ",longitude);
return;
}
BeanUtil.copyProperties(esGpsInfoVO,gpsInfo,new CopyOptions());
gpsInfo.setLat(latitude);
gpsInfo.setLng(esGpsInfoVO.getLongitude());
gpsInfo.setOrientation(esGpsInfoVO.getDirection());
gpsInfo.setInfoSource(cityCode);
producer.send(gpsInfo,"jysb_dwxx");
}
}
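For illustration, a minimal sketch of the two message shapes the worker distinguishes — the field names come from RemoteGpsInfo/EsGpsInfo and EsGpsInfoVO, but the values here are hypothetical:

// Hypothetical payloads, for illustration only.
// luanrequest() path (4-digit cityCode): gpsTime is an epoch-millisecond string.
String luanMessage = "{\"deviceCode\":\"DEV000000000000000001\",\"lat\":\"31.82\",\"lng\":\"117.22\",\"gpsTime\":\"1753252947000\"}";
// normalRequest() path (cityCode longer than 4 digits): gpsTime must parse as yyyy-MM-dd HH:mm:ss.
String normalMessage = "{\"deviceCode\":\"DEV000000000000000001\",\"latitude\":\"31.82\",\"longitude\":\"117.22\",\"direction\":\"90\",\"gpsTime\":\"2025-07-23 14:42:27\"}";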

View File

@ -0,0 +1,67 @@
package org.dromara.data2kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.dromara.data2kafka.producer.Producer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadPoolExecutor;
/**
 * <p>description: consumer loop that subscribes to the city topics, seeks to the end, and hands records to the worker pool</p>
 *
 * @author chenle
 * @date 2021-09-06 16:39
 */
public class KafkaConsumerRunnable implements Runnable {
private Map<String, Object> props;
private ThreadPoolExecutor taskExecutor;
private String cityCode;
private Producer producer;
private Logger logger = LoggerFactory.getLogger(KafkaConsumerRunnable.class);
public KafkaConsumerRunnable(Map<String, Object> props, ThreadPoolExecutor taskExecutor,
String cityCode, Producer producer) {
this.props = props;
this.taskExecutor = taskExecutor;
this.cityCode = cityCode;
this.producer = producer;
}
@Override
public void run() {
KafkaConsumer<String,Object> consumer = new KafkaConsumer<>(props);
List<String> topics = (List<String>) props.get("topics");
consumer.subscribe(topics);
// poll(long) was removed from newer kafka-clients; poll briefly until the subscription/assignment takes effect
while (consumer.assignment().isEmpty()) {
consumer.poll(Duration.ofMillis(100));
}
List<TopicPartition> topicPartitions = new ArrayList<>();
Map<String, List<PartitionInfo>> stringListMap = consumer.listTopics();
for (String topic : topics) {
List<PartitionInfo> partitionInfos = stringListMap.get(topic);
for (PartitionInfo partitionInfo : partitionInfos) {
TopicPartition partition = new TopicPartition(topic, partitionInfo.partition());
topicPartitions.add(partition);
}
}
consumer.seekToEnd(topicPartitions); // passing Collections.emptyList() would instead move the offsets of all subscribed topic partitions to the end
while (true) {
ConsumerRecords<String, Object> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, Object> record : records) {
taskExecutor.submit(new ConsumerWorker(record, cityCode, producer));
}
}
}
}

View File

@ -0,0 +1,125 @@
package org.dromara.data2kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.dromara.data2kafka.config.LoginUtil;
import org.dromara.data2kafka.producer.Producer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
/**
 * <p>description: entry point that boots one consumer thread against the city Kafka on startup</p>
 *
 * @author chenle
 * @date 2021-09-06 11:15
 */
@Component
public class RealConsumer implements CommandLineRunner {
private String kafkaServers;
private String groupId;
private String topics;
private String cityCode = "3400";
// Security mode is currently off: the city Kafka is accessed in plain text
private static final boolean SECURITY_MODE = false;
@Autowired
ThreadPoolExecutor dtpExecutor2;
@Autowired
Producer producer;
private Logger logger = LoggerFactory.getLogger(RealConsumer.class);
@Override
public void run(String... args) throws Exception {
kafkaServers = "127.0.0.1:9092";
topics = "topic.send.2,topic.send.3,topic.send.4,topic.send.5,topic.send.8";
groupId = "group_ruansi_xuancheng";
cityCode = "3418";
// Override the defaults only when all four arguments are supplied: kafkaServers topics groupId cityCode
if(args.length >= 4){
kafkaServers = args[0];
topics = args[1];
groupId = args[2];
cityCode = args[3];
}
ExecutorService executorService = Executors.newSingleThreadExecutor();
Map<String, Object> kafkaProp = getKafkaProp();
if (SECURITY_MODE)
{
try
{
logger.info("Security mode start.");
//!! Note: for security authentication, manually replace this with the machine-machine account you applied for
// Protocol type: SASL_PLAINTEXT or PLAINTEXT
kafkaProp.put("security.protocol","SASL_PLAINTEXT");
// Service name
kafkaProp.put("sasl.kerberos.service.name","kafka");
// Domain name
kafkaProp.put("kerberos.domain.name","hadoop.hadoop.com");
LoginUtil.setJaasFile("","");
}
catch (IOException e)
{
logger.error("Security prepare failure.");
logger.error("The IOException occurred.", e);
return;
}
logger.info("Security prepare success.");
}
KafkaConsumerRunnable runnable = new KafkaConsumerRunnable(kafkaProp, dtpExecutor2, cityCode, producer);
executorService.execute(runnable);
}
/**
 * Build the Kafka consumer configuration
 * @return consumer properties, including the subscribed topic list
 */
private Map<String, Object> getKafkaProp() {
// Properties map = new Properties();
Map<String, Object> map = new HashMap<>();
map.put("bootstrap.servers",kafkaServers);
map.put("group.id",groupId);
map.put("enable.auto.commit", "true");
map.put("auto.commit.interval.ms", "1000");
map.put("session.timeout.ms", "30000");
map.put("key.deserializer", StringDeserializer.class);
map.put("value.deserializer", StringDeserializer.class);
map.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,5);
// map.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,1000 * 5);
// map.put("ack.mode", "manual_immediate");
// // Protocol type: SASL_PLAINTEXT or PLAINTEXT
// map.put("security.protocol","SASL_PLAINTEXT");
// // Service name
// map.put("sasl.kerberos.service.name","kafka");
// // Domain name
// map.put("kerberos.domain.name","hadoop.hadoop.com");
String[] split = topics.split(",");
List list = CollectionUtils.arrayToList(split);
map.put("topics", list);
return map;
}
}
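Launch-time overrides, for reference: the four positional arguments map to kafkaServers, topics, groupId and cityCode, so a typical (hypothetical) invocation of the repackaged jar would be:

java -jar stwzhj-data2StKafka.jar 127.0.0.1:9092 topic.send.2,topic.send.3 group_ruansi_xuancheng 3418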

View File

@ -0,0 +1,52 @@
package org.dromara.data2kafka.domain;
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.Data;
import java.io.Serializable;
import java.util.Date;
/**
 * <p>description: GPS positioning entity (ES document / provincial Kafka message)</p>
 *
 * @author chenle
 * @date 2021-05-14 9:39
 */
@Data
public class EsGpsInfo implements Serializable {
private static final long serialVersionUID = 7455495841680488351L;
/**
 * 21-digit device id
 * (the 21-digit id carried in the Kafka message)
 */
private String deviceCode;
/**
 * device type
 */
private String deviceType;
private String lat;
private String lng;
// direction
private String orientation;
// elevation
private String height;
// accuracy
private String deltaH;
private String speed;
private String zzjgdm;
private String zzjgmc;
private String policeNo;
private String policeName;
private String phoneNum;
private String carNum;
private Integer online;
@JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone="GMT+8")
private Date gpsTime;
// city code such as 3401, 3402
private String infoSource;
}

View File

@ -0,0 +1,41 @@
package org.dromara.data2kafka.domain;
import lombok.Data;
import java.io.Serializable;
/**
 * <p>description: incoming positioning message from other cities' platforms</p>
 *
 * @author chenle
 * @date 2022-04-16 14:59
 */
@Data
public class EsGpsInfoVO implements Serializable {
/**
 * device code
 */
private String deviceCode;
private String latitude;
private String longitude;
// direction
private String direction;
// elevation
private String height;
private String speed;
private String gpsTime;
private String zzjgdm;
private String zzjgmc;
private String policeNo;
private String policeName;
private String carNum;
private Integer online;
}

View File

@ -0,0 +1,54 @@
package org.dromara.data2kafka.producer;
import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
 * <p>description: sends objects to the provincial Kafka as JSON</p>
 *
 * @author chenle
 * @date 2021-11-01 17:20
 */
@Component
public class Producer {
@Resource(name = "myKafkaProducer")
KafkaProducer<Integer, String> kafkaProducer;
private Logger LOG = LoggerFactory.getLogger(Producer.class);
/**
 * Asynchronously send a message to the given topic; KafkaProducer is thread-safe
 */
public void send(Object obj,String topic) {
String obj2String = JSONObject.toJSONString(obj);
// Build the message record
ProducerRecord<Integer, String> record = new ProducerRecord<>(topic, obj2String);
kafkaProducer.send(record, (recordMetadata, e) -> {
if (e != null) {
LOG.error("send--The Exception occured.", e);
}
if (recordMetadata != null)
{
LOG.info("sent to partition(" + recordMetadata.partition() + "), "
+ "offset(" + recordMetadata.offset()+"),topic="+recordMetadata.topic());
}
});
}
}

View File

@ -0,0 +1,34 @@
# Tomcat
server:
port: 9212
# Spring
spring:
application:
# Application name
name: stwzhj-data2StKafka
profiles:
# Environment profile
active: @profiles.active@
autoconfigure:
exclude: org.springframework.boot.autoconfigure.elasticsearch.ElasticsearchRestClientAutoConfiguration
--- # Nacos configuration
spring:
cloud:
nacos:
# Nacos server address
server-addr: @nacos.server@
username: @nacos.username@
password: @nacos.password@
discovery:
# Registration group
group: @nacos.discovery.group@
namespace: ${spring.profiles.active}
config:
# Configuration group
group: @nacos.config.group@
namespace: ${spring.profiles.active}
config:
import:
- optional:nacos:application-common.yml
- optional:nacos:${spring.application.name}.yml
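The @...@ placeholders above are Maven resource-filtering delimiters, resolved at build time rather than at runtime; assuming profiles.active were set to dev in the build, the filtered section would resolve to:

spring:
profiles:
active: dev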

View File

@ -0,0 +1,49 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration scan="true" scanPeriod="60 seconds" debug="false">
<!-- Log file path -->
<property name="log.path" value="logs" />
<property name="log.file" value="data2stKafka" />
<property name="MAX_FILE_SIZE" value="50MB" />
<property name="MAX_HISTORY" value="30" />
<!-- Log output pattern -->
<!-- INFO log appender -->
<appender name="FILE_INFO" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${log.path}/info.${log.file}.log</file>
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<level>INFO</level>
<onMatch>ACCEPT</onMatch>
<onMismatch>DENY</onMismatch>
</filter>
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<fileNamePattern>${log.path}/info/info.${log.file}.%d{yyyy-MM-dd}.%i.log.gz</fileNamePattern>
<maxFileSize>${MAX_FILE_SIZE}</maxFileSize>
<maxHistory>${MAX_HISTORY}</maxHistory>
</rollingPolicy>
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<!-- ERROR log appender -->
<appender name="FILE_ERROR" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${log.path}/error.${log.file}.log</file>
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>ERROR</level>
</filter>
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<fileNamePattern>${log.path}/error/error.${log.file}.%d{yyyy-MM-dd}.%i.log.gz</fileNamePattern>
<maxFileSize>${MAX_FILE_SIZE}</maxFileSize>
<maxHistory>${MAX_HISTORY}</maxHistory>
</rollingPolicy>
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<!-- Root logger: console output disabled -->
<root level="INFO">
<appender-ref ref="FILE_INFO" />
<appender-ref ref="FILE_ERROR" />
</root>
</configuration>