diff --git a/stwzhj-modules/pom.xml b/stwzhj-modules/pom.xml index c3b249e9..863f3a80 100644 --- a/stwzhj-modules/pom.xml +++ b/stwzhj-modules/pom.xml @@ -16,6 +16,7 @@ stwzhj-workflow stwzhj-data2es stwzhj-baseToSt + stwzhj-data2StKafka stwzhj-modules diff --git a/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/config/NewConsumer.java b/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/config/NewConsumer.java index 2205ee31..4bb0d44c 100644 --- a/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/config/NewConsumer.java +++ b/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/config/NewConsumer.java @@ -3,7 +3,6 @@ package org.dromara.kafka.consumer.config; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.dromara.kafka.consumer.handler.KafkaSecurityUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -111,14 +110,14 @@ public class NewConsumer extends Thread{ public static void main(String[] args) { - if (KafkaSecurityUtil.isSecurityModel()) + if (LoginUtil.isSecurityModel()) { try { LOG.info("Securitymode start."); //!!注意,安全认证时,需要用户手动修改为自己申请的机机账号 - KafkaSecurityUtil.securityPrepare(); + LoginUtil.setJaasFile(USER_PRINCIPAL,USER_KEYTAB_FILE); } catch (IOException e) { diff --git a/stwzhj-modules/stwzhj-data2StKafka/pom.xml b/stwzhj-modules/stwzhj-data2StKafka/pom.xml new file mode 100644 index 00000000..40034489 --- /dev/null +++ b/stwzhj-modules/stwzhj-data2StKafka/pom.xml @@ -0,0 +1,208 @@ + + + + org.dromara + stwzhj-modules + ${revision} + + 4.0.0 + + stwzhj-data2StKafka + + + stwzhj-data2StKafka 消费地市kafka发送到省厅kafka + + + + + + org.dromara + stwzhj-common-nacos + + + + org.dromara + stwzhj-common-sentinel + + + + + org.dromara + stwzhj-common-log + + + + org.dromara + stwzhj-common-dict + + + + org.dromara + stwzhj-common-doc + + + + org.dromara + stwzhj-common-web + + + + org.dromara + stwzhj-common-mybatis + + + + org.dromara + stwzhj-common-dubbo + + + + org.dromara + stwzhj-common-seata + + + + org.dromara + stwzhj-common-idempotent + + + + org.dromara + stwzhj-common-tenant + + + + org.dromara + stwzhj-common-security + + + + org.dromara + stwzhj-common-translation + + + + org.dromara + stwzhj-common-sensitive + + + + org.dromara + stwzhj-common-encrypt + + + + org.dromara + stwzhj-common-redis + + + + + org.dromara + stwzhj-api-system + + + + org.dromara + stwzhj-api-resource + + + + org.dromara + stwzhj-api-data2es + + + + org.elasticsearch.client + elasticsearch-rest-client + 7.10.2-h0.cbu.mrs.350.r11 + + + org.elasticsearch.client + elasticsearch-rest-high-level-client + 7.10.2-h0.cbu.mrs.350.r11 + + + org.elasticsearch.plugin + parent-join-client + + + org.elasticsearch.plugin + aggs-matrix-stats-client + + + + + + org.elasticsearch + elasticsearch + 7.10.2-h0.cbu.mrs.350.r11 + + + + + org.apache.kafka + kafka_2.12 + 3.6.1-h0.cbu.mrs.350.r11 + + + org.apache.zookeeper + zookeeper + + + net.sf.jopt-simple + jopt-simple + + + com.huawei.mrs + manager-wc2frm + + + org.apache.kafka + kafka-clients + + + org.xerial.snappy + snappy-java + + + com.huawei.mrs + om-controller-api + + + com.101tec + zkclient + + + + + + org.apache.kafka + kafka-clients + 3.6.1-h0.cbu.mrs.350.r11 + + + + + + + ${project.artifactId} + + + org.springframework.boot + spring-boot-maven-plugin + ${spring-boot.version} + + + + repackage + + + + + + + + diff --git a/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/Data2KafkaApplication.java b/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/Data2KafkaApplication.java new file mode 100644 index 00000000..8fa85496 --- /dev/null +++ b/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/Data2KafkaApplication.java @@ -0,0 +1,22 @@ +package org.dromara.data2kafka; + + +import org.apache.dubbo.config.spring.context.annotation.EnableDubbo; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.context.metrics.buffering.BufferingApplicationStartup; +import org.springframework.scheduling.annotation.EnableScheduling; + +@EnableDubbo +@EnableScheduling +@SpringBootApplication +public class Data2KafkaApplication { + + public static void main(String[] args) { + SpringApplication application = new SpringApplication(Data2KafkaApplication.class); + application.setApplicationStartup(new BufferingApplicationStartup(2048)); + application.run(args); + System.out.println("(♥◠‿◠)ノ゙ 消费数据发送至省厅启动成功 ლ(´ڡ`ლ)゙ "); + } + +} diff --git a/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/config/KafkaConfig.java b/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/config/KafkaConfig.java new file mode 100644 index 00000000..bc3563ec --- /dev/null +++ b/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/config/KafkaConfig.java @@ -0,0 +1,134 @@ +package org.dromara.data2kafka.config; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.context.annotation.Bean; +import org.springframework.stereotype.Component; + +import java.io.IOException; +import java.util.Properties; + +/** + *

description:

+ * + * @author chenle + * @date 2021-11-03 14:15 + */ +@Component +public class KafkaConfig { + + private Logger logger = LoggerFactory.getLogger(KafkaConfig.class); + + private String kafkaServers = "53.1.212.25:21007,53.1.212.26:21007,53.1.212.27:21007"; //省厅 kafka + + private String groupId = "ruansiProducer"; + + + + + // Broker地址列表 + private final String bootstrapServers = "bootstrap.servers"; + + // 客户端ID + private final String clientId = "client.id"; + + // Key序列化类 + private final String keySerializer = "key.serializer"; + + // Value序列化类 + private final String valueSerializer = "value.serializer"; + + // 协议类型:当前支持配置为SASL_PLAINTEXT或者PLAINTEXT + private final String securityProtocol = "security.protocol"; + + // 服务名 + private final String saslKerberosServiceName = "sasl.kerberos.service.name"; + + // 域名 + private final String kerberosDomainName = "kerberos.domain.name"; + + //默认发送20条消息 + private final int messageNumToSend = 100; + + /** + * 用户自己申请的机机账号keytab文件名称 + */ + private static final String USER_KEYTAB_FILE = "user.keytab"; + + /** + * 用户自己申请的机机账号名称 + */ + private static final String USER_PRINCIPAL = "yhy_ahrs_rcw@A528C942_01A6_1BEF_7A75_0187DC82C40F.COM"; + + /** + * 新Producer 构造函数 + * @param + * @param + */ + + @Bean(name = "myKafkaProducer") + public KafkaProducer newProducer() { + Properties props = new Properties(); + + if (true) + { + try + { + logger.info("Securitymode start."); + //!!注意,安全认证时,需要用户手动修改为自己申请的机机账号 + LoginUtil.securityPrepare(USER_PRINCIPAL, USER_KEYTAB_FILE); + props.put(securityProtocol, "SASL_PLAINTEXT"); +// props.put("sasl.mechanism", "GSSAPI"); + // 服务名 + props.put(saslKerberosServiceName, "kafka"); + // 域名 + props.put(kerberosDomainName, "A528C942_01A6_1BEF_7A75_0187DC82C40F.COM"); + } + catch (IOException e) + { + logger.error("Security prepare failure."); + logger.error("The IOException occured.", e); + return null; + } + logger.info("Security prepare success."); + }else{ + props.put(securityProtocol, "PLAINTEXT"); + } + + + + // Broker地址列表 + props.put(bootstrapServers,kafkaServers); + // 客户端ID + props.put(clientId, "ruansiProducer"); + // Key序列化类 + props.put(keySerializer, + "org.apache.kafka.common.serialization.IntegerSerializer"); + // Value序列化类 + props.put(valueSerializer, + "org.apache.kafka.common.serialization.StringSerializer"); + //批量发送信息配置 + props.put("batch.size", 16384); + props.put("linger.ms", 1); + props.put("buffer.memory", 33554432); + // 协议类型:当前支持配置为SASL_PLAINTEXT或者PLAINTEXT + //props.put(securityProtocol, "SASL_PLAINTEXT"); +// // 服务名 +// props.put(saslKerberosServiceName, "kafka"); +// // 域名 +// props.put(kerberosDomainName, "hadoop.hadoop.com"); + //设置自定义的分区策略类,默认不传key,是粘性分区,尽量往一个分区中发消息。如果key不为null,则默认是按照key的hashcode与 partition的取余来决定哪个partition + //props.put("partitioner.class","com.kafka.myparitioner.CidPartitioner"); +// props.put(securityProtocol, "SASL_PLAINTEXT"); +// props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"zkxc\" password=\"zkxcKafka07252023\";"); +// props.put("sasl.mechanism", "SCRAM-SHA-256"); +// KafkaProducer producer = new KafkaProducer<>(props); + KafkaProducer producer = new KafkaProducer<>(props); + + return producer; + } + + + +} diff --git a/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/config/LoginUtil.java b/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/config/LoginUtil.java new file mode 100644 index 00000000..6ce7a18a --- /dev/null +++ b/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/config/LoginUtil.java @@ -0,0 +1,259 @@ +package org.dromara.data2kafka.config; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileWriter; +import java.io.IOException; +import java.util.Properties; + +public class LoginUtil { + private static final Logger LOG = LoggerFactory.getLogger(LoginUtil.class); + + /** + * no JavaDoc + */ + public enum Module { + KAFKA("KafkaClient"), ZOOKEEPER("Client"); + + private String name; + + private Module(String name) + { + this.name = name; + } + + public String getName() + { + return name; + } + } + + /** + * line operator string + */ + private static final String LINE_SEPARATOR = System.getProperty("line.separator"); + + /** + * jaas file postfix + */ + private static final String JAAS_POSTFIX = ".jaas.conf"; + + /** + * is IBM jdk or not + */ + private static final boolean IS_IBM_JDK = System.getProperty("java.vendor").contains("IBM"); + + /** + * IBM jdk login module + */ + private static final String IBM_LOGIN_MODULE = "com.ibm.security.auth.module.Krb5LoginModule required"; + + /** + * oracle jdk login module + */ + private static final String SUN_LOGIN_MODULE = "com.sun.security.auth.module.Krb5LoginModule required"; + + /** + * Zookeeper quorum principal. + */ + public static final String ZOOKEEPER_AUTH_PRINCIPAL = "zookeeper.server.principal"; + + /** + * java security krb5 file path + */ + public static final String JAVA_SECURITY_KRB5_CONF = "java.security.krb5.conf"; + + /** + * java security login file path + */ + public static final String JAVA_SECURITY_LOGIN_CONF = "java.security.auth.login.config"; + + /** + * 设置jaas.conf文件 + * + * @param principal + * @param keytabPath + * @throws IOException + */ + public static void setJaasFile(String principal, String keytabPath) + throws IOException { + String jaasPath = + new File(System.getProperty("java.io.tmpdir")) + File.separator + System.getProperty("user.name") + + JAAS_POSTFIX; + + // windows路径下分隔符替换 + jaasPath = jaasPath.replace("\\", "\\\\"); + // 删除jaas文件 + deleteJaasFile(jaasPath); + writeJaasFile(jaasPath, principal, keytabPath); + System.setProperty(JAVA_SECURITY_LOGIN_CONF, jaasPath); + } + + /** + * 设置zookeeper服务端principal + * + * @param zkServerPrincipal + * @throws IOException + */ + public static void setZookeeperServerPrincipal(String zkServerPrincipal) + throws IOException { + System.setProperty(ZOOKEEPER_AUTH_PRINCIPAL, zkServerPrincipal); + String ret = System.getProperty(ZOOKEEPER_AUTH_PRINCIPAL); + if (ret == null) + { + throw new IOException(ZOOKEEPER_AUTH_PRINCIPAL + " is null."); + } + if (!ret.equals(zkServerPrincipal)) + { + throw new IOException(ZOOKEEPER_AUTH_PRINCIPAL + " is " + ret + " is not " + zkServerPrincipal + "."); + } + } + + /** + * 设置krb5文件 + * + * @param krb5ConfFile + * @throws IOException + */ + public static void setKrb5Config(String krb5ConfFile) + throws IOException { + System.setProperty(JAVA_SECURITY_KRB5_CONF, krb5ConfFile); + String ret = System.getProperty(JAVA_SECURITY_KRB5_CONF); + if (ret == null) + { + throw new IOException(JAVA_SECURITY_KRB5_CONF + " is null."); + } + if (!ret.equals(krb5ConfFile)) + { + throw new IOException(JAVA_SECURITY_KRB5_CONF + " is " + ret + " is not " + krb5ConfFile + "."); + } + } + + /** + * 写入jaas文件 + * + * @throws IOException + * 写文件异常 + */ + private static void writeJaasFile(String jaasPath, String principal, String keytabPath) + throws IOException { + FileWriter writer = new FileWriter(new File(jaasPath)); + try + { + writer.write(getJaasConfContext(principal, keytabPath)); + writer.flush(); + } + catch (IOException e) + { + throw new IOException("Failed to create jaas.conf File"); + } + finally + { + writer.close(); + } + } + + private static void deleteJaasFile(String jaasPath) + throws IOException { + File jaasFile = new File(jaasPath); + if (jaasFile.exists()) + { + if (!jaasFile.delete()) + { + throw new IOException("Failed to delete exists jaas file."); + } + } + } + + private static String getJaasConfContext(String principal, String keytabPath) { + Module[] allModule = Module.values(); + StringBuilder builder = new StringBuilder(); + for (Module modlue : allModule) + { + builder.append(getModuleContext(principal, keytabPath, modlue)); + } + return builder.toString(); + } + + private static String getModuleContext(String userPrincipal, String keyTabPath, Module module) { + StringBuilder builder = new StringBuilder(); + if (IS_IBM_JDK) { + builder.append(module.getName()).append(" {").append(LINE_SEPARATOR); + builder.append(IBM_LOGIN_MODULE).append(LINE_SEPARATOR); + builder.append("credsType=both").append(LINE_SEPARATOR); + builder.append("principal=\"" + userPrincipal + "\"").append(LINE_SEPARATOR); + builder.append("useKeytab=\"" + keyTabPath + "\"").append(LINE_SEPARATOR); + builder.append("debug=true;").append(LINE_SEPARATOR); + builder.append("};").append(LINE_SEPARATOR); + } else { + builder.append(module.getName()).append(" {").append(LINE_SEPARATOR); + builder.append(SUN_LOGIN_MODULE).append(LINE_SEPARATOR); + builder.append("useKeyTab=true").append(LINE_SEPARATOR); + builder.append("keyTab=\"" + keyTabPath + "\"").append(LINE_SEPARATOR); + builder.append("principal=\"" + userPrincipal + "\"").append(LINE_SEPARATOR); + builder.append("useTicketCache=false").append(LINE_SEPARATOR); + builder.append("storeKey=true").append(LINE_SEPARATOR); + builder.append("debug=true;").append(LINE_SEPARATOR); + builder.append("};").append(LINE_SEPARATOR); + } + + return builder.toString(); + } + + public static void securityPrepare(String principal, String keyTabFile) throws IOException { +// String filePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator; + String filePath = "/rsoft/config/"; + String krbFile = filePath + "krb5.conf"; + String userKeyTableFile = filePath + keyTabFile; + + // windows路径下分隔符替换 + userKeyTableFile = userKeyTableFile.replace("\\", "\\\\"); + krbFile = krbFile.replace("\\", "\\\\"); + + LoginUtil.setKrb5Config(krbFile); + LoginUtil.setZookeeperServerPrincipal("zookeeper/hadoop.hadoop.com"); + LoginUtil.setJaasFile(principal, userKeyTableFile); + } + + /** + * Check security mode + * + * @return boolean + */ + public static Boolean isSecurityModel() { + Boolean isSecurity = false; + String krbFilePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator + "kafkaSecurityMode"; + + Properties securityProps = new Properties(); + + // file does not exist. + if (!isFileExists(krbFilePath)) { + return isSecurity; + } + + try { + securityProps.load(new FileInputStream(krbFilePath)); + + if ("yes".equalsIgnoreCase(securityProps.getProperty("kafka.client.security.mode"))) + { + isSecurity = true; + } + } catch (Exception e) { + LOG.info("The Exception occured : {}.", e); + } + + return isSecurity; + } + + /* + * 判断文件是否存在 + */ + private static boolean isFileExists(String fileName) { + File file = new File(fileName); + + return file.exists(); + } +} diff --git a/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/consumer/ConsumerWorker.java b/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/consumer/ConsumerWorker.java new file mode 100644 index 00000000..ebde63ff --- /dev/null +++ b/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/consumer/ConsumerWorker.java @@ -0,0 +1,220 @@ +package org.dromara.data2kafka.consumer; + +import cn.hutool.core.bean.BeanUtil; +import cn.hutool.core.bean.copier.CopyOptions; +import cn.hutool.core.convert.ConvertException; +import cn.hutool.core.date.DateTime; +import cn.hutool.core.date.DateUtil; +import cn.hutool.json.JSONObject; +import cn.hutool.json.JSONUtil; +import com.alibaba.fastjson.JSON; +import org.apache.commons.lang.StringUtils; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.dromara.data2es.api.domain.RemoteGpsInfo; +import org.dromara.data2kafka.domain.EsGpsInfo; +import org.dromara.data2kafka.domain.EsGpsInfoVO; +import org.dromara.data2kafka.producer.Producer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; + +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.LinkedBlockingDeque; + +/** + *

description:

+ * + * @author chenle + * @date 2021-09-06 16:44 + */ +public class ConsumerWorker implements Runnable { + private ConsumerRecord record; + + @Autowired + private Producer producer; + + private Logger logger = LoggerFactory.getLogger(ConsumerWorker.class); + + + + + private String cityCode ; + + ConsumerWorker(ConsumerRecord record, String cityCode) { + this.record = record; + this.cityCode = cityCode; + } + + @Override + public void run() { + //其他地市使用的方法,这里使用了一个巧妙的方法,我们开发的地市都是传4位,这种其他地市的cityCode传大于4位,然后截取 + if(cityCode.length() > 4){ + cityCode = cityCode.substring(0,4); + normalRequest(); + }else { + //六安、安庆等地市的方法,这些地市都是我们自己公司开发的东西。 + luanrequest(); +// luanrequestBatch(); + } + } + + /* + * 废弃方法 + * */ + private void luanrequestBatch() { + Object value = record.value(); + String topic = record.topic(); + List list = new ArrayList<>(); + logger.info("offset={},topic={},value={}", record.offset(), topic,value); + List jsonObjects = JSON.parseArray((String) value, JSONObject.class); + for (JSONObject jsonObject : jsonObjects) { + EsGpsInfo esGpsInfo; + /*try { + jsonObject = JSONUtil.parseObj(((String) value)); + }catch (ConvertException e){ + logger.info("jsonObject=null:error={}",e.getMessage()); + return; + }*/ + try { + esGpsInfo = JSONUtil.toBean(jsonObject, EsGpsInfo.class); + }catch (ConvertException e){ + logger.info("EsGpsInfo=null:error={}",e.getMessage()); + return; + } + + if(Objects.isNull(esGpsInfo)){ + logger.info("esGpsInfo=null no error"); + return; + } + String deviceCode = esGpsInfo.getDeviceCode(); + if(StringUtils.isEmpty(deviceCode) || deviceCode.length() > 100){ + logger.info("deviceCode:{} is null or is too long ",deviceCode); + return; + } + String latitude = esGpsInfo.getLat(); + if(StringUtils.isEmpty(latitude) || "0.0".equals(latitude)){ + logger.info("latitude:{} is null or is zero ",latitude); + return; + } + String longitude = esGpsInfo.getLng(); + if(StringUtils.isEmpty(longitude) || "0.0".equals(longitude)){ + logger.info("longitude:{} is null or is zero ",longitude); + return; + } + esGpsInfo.setInfoSource(cityCode); + + esGpsInfo.setGpsTime(new Date(Long.valueOf(jsonObject.getStr("gpsTime")))); + list.add(esGpsInfo); + } +// dataToEsService.saveGpsInfoBatch(list); + } + + private void luanrequest() { + Object value = record.value(); + String topic = record.topic(); + + logger.info("offset={},topic={},value={}", record.offset(), topic,value); + RemoteGpsInfo esGpsInfo; + JSONObject jsonObject; + try { + jsonObject = JSONUtil.parseObj(((String) value)); + }catch (ConvertException e){ + logger.info("jsonObject=null:error={}",e.getMessage()); + return; + } + try { + esGpsInfo = JSONUtil.toBean(jsonObject, RemoteGpsInfo.class); + }catch (ConvertException e){ + logger.info("EsGpsInfo=null:error={}",e.getMessage()); + return; + } + + if(Objects.isNull(esGpsInfo)){ + logger.info("esGpsInfo=null no error"); + return; + } + String deviceCode = esGpsInfo.getDeviceCode(); + if(StringUtils.isEmpty(deviceCode) || deviceCode.length() > 100){ + logger.info("deviceCode:{} is null or is too long ",deviceCode); + return; + } + String latitude = esGpsInfo.getLat(); + if(StringUtils.isEmpty(latitude) || "0.0".equals(latitude)){ + logger.info("latitude:{} is null or is zero ",latitude); + return; + } + String longitude = esGpsInfo.getLng(); + if(StringUtils.isEmpty(longitude) || "0.0".equals(longitude)){ + logger.info("longitude:{} is null or is zero ",longitude); + return; + } + esGpsInfo.setInfoSource(cityCode); + try { + esGpsInfo.setGpsTime(new Date(Long.valueOf(jsonObject.getStr("gpsTime")))); + }catch (Exception e){ + logger.error("error_msg={}",e.getMessage()); + } + logger.info("esGpsInfo={}",esGpsInfo); + producer.send(esGpsInfo,"jysb_dwxx"); + } + + + + /** + * 通用的请求(一般地市采用这个方法) + */ + private void normalRequest() { + Object value = record.value(); + String topic = record.topic(); + + logger.info("offset={},topic={},value={}", record.offset(), topic,value); + + EsGpsInfo gpsInfo = new EsGpsInfo(); + EsGpsInfoVO esGpsInfoVO; + try { + esGpsInfoVO = JSONUtil.toBean(((String) value), EsGpsInfoVO.class); + }catch (ConvertException e){ + logger.info("esGpsInfoVO=null:error={}",e.getMessage()); + return; + } + if(Objects.isNull(esGpsInfoVO)){ + logger.info("esGpsInfoVO=null no error"); + return; + } + + + try { + DateTime parse = DateUtil.parse(esGpsInfoVO.getGpsTime(), "yyyy-MM-dd HH:mm:ss"); + }catch (Exception e){ + logger.info("gpsTime:{} format error", esGpsInfoVO.getGpsTime()); + return; + } + + String deviceCode = esGpsInfoVO.getDeviceCode(); + if(StringUtils.isEmpty(deviceCode) || deviceCode.length() > 100){ + logger.info("deviceCode:{} is null or is too long ",deviceCode); + return; + } + String latitude = esGpsInfoVO.getLatitude(); + if(StringUtils.isEmpty(latitude) || "0.0".equals(latitude)){ + logger.info("latitude:{} is null or is zero ",latitude); + return; + } + String longitude = esGpsInfoVO.getLongitude(); + if(StringUtils.isEmpty(longitude) || "0.0".equals(longitude)){ + logger.info("longitude:{} is null or is zero ",longitude); + return; + } + BeanUtil.copyProperties(esGpsInfoVO,gpsInfo,new CopyOptions()); + gpsInfo.setLat(latitude); + gpsInfo.setLng(esGpsInfoVO.getLongitude()); + gpsInfo.setOrientation(esGpsInfoVO.getDirection()); + gpsInfo.setInfoSource(cityCode); + producer.send(gpsInfo,"jysb_dwxx"); + } + + +} diff --git a/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/consumer/KafkaConsumerRunnable.java b/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/consumer/KafkaConsumerRunnable.java new file mode 100644 index 00000000..81272dc0 --- /dev/null +++ b/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/consumer/KafkaConsumerRunnable.java @@ -0,0 +1,67 @@ +package org.dromara.data2kafka.consumer; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ThreadPoolExecutor; + +/** + *

description:

+ * + * @author chenle + * @date 2021-09-06 16:39 + */ +public class KafkaConsumerRunnable implements Runnable { + + private Map props; + private ThreadPoolExecutor taskExecutor; + + private String cityCode; + private Logger logger = LoggerFactory.getLogger(KafkaConsumerRunnable.class); + + public KafkaConsumerRunnable(Map props, ThreadPoolExecutor taskExecutor, + String cityCode) { + this.props = props; + this.taskExecutor = taskExecutor; + this.cityCode = cityCode; + } + + + @Override + public void run() { + KafkaConsumer consumer = new KafkaConsumer<>(props); + + List topics = (List) props.get("topics"); + consumer.subscribe(topics); + consumer.poll(0); // 令订阅生效 + + List topicPartitions = new ArrayList<>(); + Map> stringListMap = consumer.listTopics(); + for (Object topic : topics) { + String topic1 = (String) topic; + List partitionInfos = stringListMap.get(topic1); + for (PartitionInfo partitionInfo : partitionInfos) { + TopicPartition partition = new TopicPartition(topic1, partitionInfo.partition()); + topicPartitions.add(partition); + } + } + consumer.seekToEnd(topicPartitions); // 如果传Collections.emptyList()表示移动所有订阅topic分区offset到最末端 + + while (true) { + ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); + for (ConsumerRecord record : records) { + taskExecutor.submit(new ConsumerWorker(record, cityCode)); + } + + } + } +} diff --git a/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/consumer/RealConsumer.java b/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/consumer/RealConsumer.java new file mode 100644 index 00000000..a3e78759 --- /dev/null +++ b/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/consumer/RealConsumer.java @@ -0,0 +1,125 @@ +package org.dromara.data2kafka.consumer; + + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.dromara.data2kafka.config.LoginUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.CommandLineRunner; +import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadPoolExecutor; + +/** + *

description:

+ * + * @author chenle + * @date 2021-09-06 11:15 + */ +@Component +public class RealConsumer implements CommandLineRunner { + + private String kafkaServers; + + private String groupId; + + private String topics; + + private String cityCode = "3400"; + + + @Autowired + ThreadPoolExecutor dtpExecutor2; + + + private Logger logger = LoggerFactory.getLogger(RealConsumer.class); + + @Override + public void run(String... args) throws Exception { + kafkaServers = "127.0.0.1:9092"; + topics = "topic.send.2,topic.send.3,topic.send.4,topic.send.5,topic.send.8"; + groupId = "group_ruansi_xuancheng"; + cityCode = "3418"; + if(args.length > 0){ + kafkaServers = args[0]; + topics = args[1]; + groupId = args[2]; + cityCode = args[3]; + + } + ExecutorService executorService = Executors.newSingleThreadExecutor(); + Map kafkaProp = getKafkaProp(); + + + if (false) + { + try + { + logger.info("Securitymode start."); + + //!!注意,安全认证时,需要用户手动修改为自己申请的机机账号 + //认证方式 SASL_PLAINTEXT 或者 PLAINTEXT + kafkaProp.put("security.protocol","SASL_PLAINTEXT"); + //服务名 + kafkaProp.put("sasl.kerberos.service.name","kafka"); + //域名 + kafkaProp.put("kerberos.domain.name","hadoop.hadoop.com"); + LoginUtil.setJaasFile("",""); + } + catch (IOException e) + { + logger.error("Security prepare failure."); + logger.error("The IOException occured.", e); + return; + } + logger.info("Security prepare success."); + } + + KafkaConsumerRunnable runnable = new KafkaConsumerRunnable(kafkaProp,dtpExecutor2,cityCode); + executorService.execute(runnable); + } + + + + /** + * 获取kafka配置 + * @return + */ + private Map getKafkaProp() { +// Properties map = new Properties(); + Map map = new HashMap<>(); + map.put("bootstrap.servers",kafkaServers); + map.put("group.id",groupId); + map.put("enable.auto.commit", "true"); + map.put("auto.commit.interval.ms", "1000"); + map.put("session.timeout.ms", "30000"); + map.put("key.deserializer", StringDeserializer.class); + map.put("value.deserializer", StringDeserializer.class); + map.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,5); +// map.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,1000 * 5); +// map.put("ack.mode", "manual_immediate"); + +// //认证方式 SASL_PLAINTEXT 或者 PLAINTEXT +// map.put("security.protocol","SASL_PLAINTEXT"); +// //服务名 +// map.put("sasl.kerberos.service.name","kafka"); +// //域名 +// map.put("kerberos.domain.name","hadoop.hadoop.com"); + String[] split = topics.split(","); + List list = CollectionUtils.arrayToList(split); + map.put("topics", list); + return map; + } + + + +} diff --git a/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/domain/EsGpsInfo.java b/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/domain/EsGpsInfo.java new file mode 100644 index 00000000..8bc6c6a1 --- /dev/null +++ b/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/domain/EsGpsInfo.java @@ -0,0 +1,52 @@ +package org.dromara.data2kafka.domain; + +import com.fasterxml.jackson.annotation.JsonFormat; +import lombok.Data; + +import java.io.Serializable; +import java.util.Date; + +/** + *

description:

+ * gps定位信息(es表) + * @author chenle + * @date 2021-05-14 9:39 + */ +@Data +public class EsGpsInfo implements Serializable { + + private static final long serialVersionUID = 7455495841680488351L; + /** + * 唯一码(外部系统)合肥版本不需要 21位id, + * 到时候上传省厅的时候 需要在kafka发送端处理,生成一个省厅需要的21位id + */ + private String deviceCode; + /** + * 类型 + */ + private String deviceType; + private String lat; + private String lng; + //方向 + private String orientation; + //高程 + private String height; + //精度 + private String deltaH; + private String speed; + + private String zzjgdm; + private String zzjgmc; + private String policeNo; + private String policeName; + private String phoneNum; + private String carNum; + + private Integer online; + + @JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone="GMT+8") + private Date gpsTime; + //3401,3402等地市代码 + private String infoSource; + +} diff --git a/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/domain/EsGpsInfoVO.java b/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/domain/EsGpsInfoVO.java new file mode 100644 index 00000000..5f416cf6 --- /dev/null +++ b/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/domain/EsGpsInfoVO.java @@ -0,0 +1,41 @@ +package org.dromara.data2kafka.domain; + +import lombok.Data; + +import java.io.Serializable; + +/** + *

description:

+ * + * @author chenle + * @date 2022-04-16 14:59 + */ +@Data +public class EsGpsInfoVO implements Serializable { + /** + * 设备串号,设备唯一值 + */ + private String deviceCode; + private String latitude; + private String longitude; + //方向 + private String direction; + //高程 + private String height; + //精度 + private String speed; + + private String gpsTime; + + private String zzjgdm; + + private String zzjgmc; + + private String policeNo; + + private String policeName; + + private String carNum; + + private Integer online; +} diff --git a/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/producer/Producer.java b/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/producer/Producer.java new file mode 100644 index 00000000..1ca6debd --- /dev/null +++ b/stwzhj-modules/stwzhj-data2StKafka/src/main/java/org/dromara/data2kafka/producer/Producer.java @@ -0,0 +1,54 @@ +package org.dromara.data2kafka.producer; + +import com.alibaba.fastjson.JSONObject; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; + +/** + *

description:

+ * + * @author chenle + * @date 2021-11-01 17:20 + */ +@Component +public class Producer { + + @Autowired + @Resource(name = "myKafkaProducer") + KafkaProducer kafkaProducer; + + private Logger LOG = LoggerFactory.getLogger(Producer.class); + + + /** + * 生产者线程执行函数,循环发送消息。 + */ + public void send(Object obj,String topic) { + String obj2String = JSONObject.toJSONString(obj); + + // 构造消息记录 + ProducerRecord record = new ProducerRecord(topic, obj2String); + + kafkaProducer.send(record, (recordMetadata, e) -> { + if (e != null) { + LOG.error("send--The Exception occured.", e); + } + if (recordMetadata != null) + { + LOG.info("sent to partition(" + recordMetadata.partition() + "), " + + "offset(" + recordMetadata.offset()+"),topic="+recordMetadata.topic()); + } + }); + + } + + + + +} diff --git a/stwzhj-modules/stwzhj-data2StKafka/src/main/resources/application.yml b/stwzhj-modules/stwzhj-data2StKafka/src/main/resources/application.yml new file mode 100644 index 00000000..01621de8 --- /dev/null +++ b/stwzhj-modules/stwzhj-data2StKafka/src/main/resources/application.yml @@ -0,0 +1,34 @@ +# Tomcat +server: + port: 9212 + +# Spring +spring: + application: + # 应用名称 + name: stwzhj-data2StKafka + profiles: + # 环境配置 + active: @profiles.active@ + autoconfigure: + exclude: org.springframework.boot.autoconfigure.elasticsearch.ElasticsearchRestClientAutoConfiguration +--- # nacos 配置 +spring: + cloud: + nacos: + # nacos 服务地址 + server-addr: @nacos.server@ + username: @nacos.username@ + password: @nacos.password@ + discovery: + # 注册组 + group: @nacos.discovery.group@ + namespace: ${spring.profiles.active} + config: + # 配置组 + group: @nacos.config.group@ + namespace: ${spring.profiles.active} + config: + import: + - optional:nacos:application-common.yml + - optional:nacos:${spring.application.name}.yml diff --git a/stwzhj-modules/stwzhj-data2StKafka/src/main/resources/logback-plus.xml b/stwzhj-modules/stwzhj-data2StKafka/src/main/resources/logback-plus.xml new file mode 100644 index 00000000..14ae5c53 --- /dev/null +++ b/stwzhj-modules/stwzhj-data2StKafka/src/main/resources/logback-plus.xml @@ -0,0 +1,49 @@ + + + + + + + + + + + ${log.path}/info.${log.file}.log + + INFO + ACCEPT + DENY + + + ${log.path}/info/info.${log.file}.%d{yyyy-MM-dd}.%i.log.gz + ${MAX_FILE_SIZE} + ${MAX_HISTORY} + + + %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + ${log.path}/error.${log.file}.log + + ERROR + + + ${log.path}/error/error.${log.file}.%d{yyyy-MM-dd}.%i.log.gz + ${MAX_FILE_SIZE} + ${MAX_HISTORY} + + + %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + + + +