Compare commits
No commits in common. "ds-xuancheng" and "2.X" have entirely different histories.
ds-xuanche
...
2.X
8
pom.xml
8
pom.xml
|
|
@ -89,12 +89,12 @@
|
|||
<id>prod</id>
|
||||
<properties>
|
||||
<profiles.active>prod</profiles.active>
|
||||
<nacos.server>53.238.79.33:8848</nacos.server>
|
||||
<nacos.server>127.0.0.1:8848</nacos.server>
|
||||
<nacos.discovery.group>DEFAULT_GROUP</nacos.discovery.group>
|
||||
<nacos.config.group>DEFAULT_GROUP</nacos.config.group>
|
||||
<nacos.username>nacos</nacos.username>
|
||||
<nacos.password>Ycgis!2509</nacos.password>
|
||||
<logstash.address>53.238.79.33:4560</logstash.address>
|
||||
<nacos.password>nacos</nacos.password>
|
||||
<logstash.address>127.0.0.1:4560</logstash.address>
|
||||
</properties>
|
||||
</profile>
|
||||
</profiles>
|
||||
|
|
@ -376,7 +376,7 @@
|
|||
|
||||
<modules>
|
||||
<module>stwzhj-auth</module>
|
||||
<module>wzhj-gateway</module>
|
||||
<module>stwzhj-gateway</module>
|
||||
<module>stwzhj-visual</module>
|
||||
<module>stwzhj-modules</module>
|
||||
<module>stwzhj-api</module>
|
||||
|
|
|
|||
|
|
@ -14,7 +14,6 @@
|
|||
<module>stwzhj-api-resource</module>
|
||||
<module>stwzhj-api-workflow</module>
|
||||
<module>stwzhj-api-data2es</module>
|
||||
<module>stwzhj-api-location</module>
|
||||
</modules>
|
||||
|
||||
<artifactId>stwzhj-api</artifactId>
|
||||
|
|
|
|||
|
|
@ -47,12 +47,6 @@
|
|||
<version>${revision}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.dromara</groupId>
|
||||
<artifactId>stwzhj-api-location</artifactId>
|
||||
<version>${revision}</version>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
</dependencyManagement>
|
||||
</project>
|
||||
|
|
|
|||
|
|
@ -4,15 +4,8 @@ import org.dromara.common.core.domain.R;
|
|||
import org.dromara.data2es.api.domain.RemoteGpsInfo;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
||||
public interface RemoteDataToEsService {
|
||||
|
||||
R saveDataBatch(List<RemoteGpsInfo> gpsInfoList);
|
||||
|
||||
R saveData(RemoteGpsInfo gpsInfo) throws Exception;
|
||||
|
||||
R updateOnlineStatusBatch(List<RemoteGpsInfo> gpsInfoList);
|
||||
|
||||
R updateOnlineStatus(RemoteGpsInfo gpsInfo);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -37,7 +37,6 @@ public class RemoteGpsInfo implements Serializable {
|
|||
private String policeNo;
|
||||
private String policeName;
|
||||
private String phoneNum;
|
||||
private String deviceName;
|
||||
private String carNum;
|
||||
|
||||
private Integer online;
|
||||
|
|
|
|||
|
|
@ -1,33 +0,0 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<parent>
|
||||
<groupId>org.dromara</groupId>
|
||||
<artifactId>stwzhj-api</artifactId>
|
||||
<version>${revision}</version>
|
||||
</parent>
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<artifactId>stwzhj-api-location</artifactId>
|
||||
|
||||
<description>
|
||||
stwzhj-api-location
|
||||
</description>
|
||||
|
||||
<dependencies>
|
||||
|
||||
<!-- stwzhj Common Core-->
|
||||
<dependency>
|
||||
<groupId>org.dromara</groupId>
|
||||
<artifactId>stwzhj-common-core</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.dromara</groupId>
|
||||
<artifactId>stwzhj-common-excel</artifactId>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
|
||||
</project>
|
||||
|
|
@ -1,8 +0,0 @@
|
|||
package org.dromara.location.api;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public interface RemoteElasticSearchService {
|
||||
|
||||
List<String> linstenDataStatus();
|
||||
}
|
||||
|
|
@ -1,10 +1,5 @@
|
|||
package org.dromara.system.api;
|
||||
|
||||
import org.dromara.system.api.domain.bo.RemoteDeptBo;
|
||||
import org.dromara.system.api.domain.vo.RemoteDeptVo;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* 部门服务
|
||||
*
|
||||
|
|
@ -20,6 +15,4 @@ public interface RemoteDeptService {
|
|||
*/
|
||||
String selectDeptNameByIds(String deptIds);
|
||||
|
||||
List<RemoteDeptVo> selectDept(RemoteDeptBo bo);
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,70 +0,0 @@
|
|||
package org.dromara.system.api.domain.bo;
|
||||
|
||||
import io.github.linpeilie.annotations.AutoMapper;
|
||||
import jakarta.validation.constraints.Email;
|
||||
import jakarta.validation.constraints.NotBlank;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
import jakarta.validation.constraints.Size;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
/**
|
||||
* 部门业务对象 sys_dept
|
||||
*
|
||||
* @author Michelle.Chung
|
||||
*/
|
||||
|
||||
@Data
|
||||
@NoArgsConstructor
|
||||
public class RemoteDeptBo implements Serializable {
|
||||
|
||||
/**
|
||||
* 部门id
|
||||
*/
|
||||
private String deptId;
|
||||
|
||||
/**
|
||||
* 父部门ID
|
||||
*/
|
||||
private String parentId;
|
||||
|
||||
/**
|
||||
* 部门名称
|
||||
*/
|
||||
private String deptName;
|
||||
|
||||
/**
|
||||
* 部门类别编码
|
||||
*/
|
||||
private String deptCategory;
|
||||
|
||||
/**
|
||||
* 显示顺序
|
||||
*/
|
||||
private Integer orderNum;
|
||||
|
||||
/**
|
||||
* 负责人
|
||||
*/
|
||||
private Long leader;
|
||||
|
||||
/**
|
||||
* 联系电话
|
||||
*/
|
||||
private String phone;
|
||||
|
||||
/**
|
||||
* 邮箱
|
||||
*/
|
||||
private String email;
|
||||
|
||||
/**
|
||||
* 部门状态(0正常 1停用)
|
||||
*/
|
||||
private String status;
|
||||
|
||||
private String fullName;
|
||||
|
||||
}
|
||||
|
|
@ -83,12 +83,6 @@ public class RemoteDeviceBo implements Serializable {
|
|||
*/
|
||||
private String remark1;
|
||||
|
||||
private String createTime;
|
||||
|
||||
private String updateTime;
|
||||
|
||||
private String[] zzjgdms;
|
||||
|
||||
/**
|
||||
* 备注字段2
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -1,96 +0,0 @@
|
|||
package org.dromara.system.api.domain.vo;
|
||||
|
||||
import com.alibaba.excel.annotation.ExcelIgnoreUnannotated;
|
||||
import com.alibaba.excel.annotation.ExcelProperty;
|
||||
import io.github.linpeilie.annotations.AutoMapper;
|
||||
import lombok.Data;
|
||||
import org.dromara.common.excel.annotation.ExcelDictFormat;
|
||||
import org.dromara.common.excel.convert.ExcelDictConvert;
|
||||
|
||||
import java.io.Serial;
|
||||
import java.io.Serializable;
|
||||
import java.util.Date;
|
||||
|
||||
/**
|
||||
* 部门视图对象 sys_dept
|
||||
*
|
||||
* @author Michelle.Chung
|
||||
*/
|
||||
@Data
|
||||
public class RemoteDeptVo implements Serializable {
|
||||
|
||||
@Serial
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
/**
|
||||
* 部门id
|
||||
*/
|
||||
private String deptId;
|
||||
|
||||
/**
|
||||
* 父部门id
|
||||
*/
|
||||
private String parentId;
|
||||
|
||||
/**
|
||||
* 父部门名称
|
||||
*/
|
||||
private String parentName;
|
||||
|
||||
/**
|
||||
* 祖级列表
|
||||
*/
|
||||
private String ancestors;
|
||||
|
||||
/**
|
||||
* 部门名称
|
||||
*/
|
||||
private String deptName;
|
||||
|
||||
/**
|
||||
* 部门类别编码
|
||||
*/
|
||||
private String deptCategory;
|
||||
|
||||
/**
|
||||
* 显示顺序
|
||||
*/
|
||||
private Integer orderNum;
|
||||
|
||||
/**
|
||||
* 负责人ID
|
||||
*/
|
||||
private Long leader;
|
||||
|
||||
/**
|
||||
* 负责人
|
||||
*/
|
||||
private String leaderName;
|
||||
|
||||
/**
|
||||
* 联系电话
|
||||
*/
|
||||
private String phone;
|
||||
|
||||
/**
|
||||
* 邮箱
|
||||
*/
|
||||
private String email;
|
||||
|
||||
/**
|
||||
* 部门状态(0正常 1停用)
|
||||
*/
|
||||
private String status;
|
||||
|
||||
/**
|
||||
* 创建时间
|
||||
*/
|
||||
private Date createTime;
|
||||
|
||||
private Integer allCount;
|
||||
|
||||
private Integer onlineCount;
|
||||
|
||||
private String fullName;
|
||||
|
||||
}
|
||||
|
|
@ -70,8 +70,6 @@ public class RemoteDeviceVo implements Serializable {
|
|||
|
||||
private String cardNum;
|
||||
|
||||
private String deviceName;
|
||||
|
||||
/**
|
||||
* 0无效,1有效
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -130,8 +130,6 @@ public class LoginUser implements Serializable {
|
|||
*/
|
||||
private String deviceType;
|
||||
|
||||
private String manageDeptId;
|
||||
|
||||
/**
|
||||
* 获取登录id
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -6,7 +6,7 @@ server:
|
|||
spring:
|
||||
application:
|
||||
# 应用名称
|
||||
name: wzhj-auth
|
||||
name: stwzhj-auth
|
||||
profiles:
|
||||
# 环境配置
|
||||
active: @profiles.active@
|
||||
|
|
|
|||
|
|
@ -1,49 +1,28 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<configuration scan="true" scanPeriod="60 seconds" debug="false">
|
||||
<!-- 日志存放路径 -->
|
||||
<property name="log.path" value="logs" />
|
||||
<property name="log.file" value="auth" />
|
||||
<property name="MAX_FILE_SIZE" value="30MB" />
|
||||
<property name="MAX_HISTORY" value="30" />
|
||||
<property name="log.path" value="logs/${project.artifactId}"/>
|
||||
<!-- 日志输出格式 -->
|
||||
<!-- INFO日志Appender -->
|
||||
<appender name="FILE_INFO" class="ch.qos.logback.core.rolling.RollingFileAppender">
|
||||
<file>${log.path}/info.${log.file}.log</file>
|
||||
<filter class="ch.qos.logback.classic.filter.LevelFilter">
|
||||
<level>INFO</level>
|
||||
<onMatch>ACCEPT</onMatch>
|
||||
<onMismatch>DENY</onMismatch>
|
||||
</filter>
|
||||
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
|
||||
<fileNamePattern>${log.path}/info/info.${log.file}.%d{yyyy-MM-dd}.%i.log.gz</fileNamePattern>
|
||||
<maxFileSize>${MAX_FILE_SIZE}</maxFileSize>
|
||||
<maxHistory>${MAX_HISTORY}</maxHistory>
|
||||
</rollingPolicy>
|
||||
<property name="console.log.pattern"
|
||||
value="%red(%d{yyyy-MM-dd HH:mm:ss}) %green([%thread]) %highlight(%-5level) %boldMagenta(%logger{36}%n) - %msg%n"/>
|
||||
|
||||
<!-- 控制台输出 -->
|
||||
<appender name="console" class="ch.qos.logback.core.ConsoleAppender">
|
||||
<encoder>
|
||||
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
|
||||
<pattern>${console.log.pattern}</pattern>
|
||||
<charset>utf-8</charset>
|
||||
</encoder>
|
||||
</appender>
|
||||
|
||||
<!-- ERROR日志Appender -->
|
||||
<appender name="FILE_ERROR" class="ch.qos.logback.core.rolling.RollingFileAppender">
|
||||
<file>${log.path}/error.${log.file}.log</file>
|
||||
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
|
||||
<level>ERROR</level>
|
||||
</filter>
|
||||
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
|
||||
<fileNamePattern>${log.path}/error/error.${log.file}.%d{yyyy-MM-dd}.%i.log.gz</fileNamePattern>
|
||||
<maxFileSize>${MAX_FILE_SIZE}</maxFileSize>
|
||||
<maxHistory>${MAX_HISTORY}</maxHistory>
|
||||
</rollingPolicy>
|
||||
<encoder>
|
||||
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
|
||||
</encoder>
|
||||
</appender>
|
||||
<include resource="logback-common.xml" />
|
||||
|
||||
<!-- 根Logger配置(禁用控制台输出) -->
|
||||
<root level="INFO">
|
||||
<appender-ref ref="FILE_INFO" />
|
||||
<appender-ref ref="FILE_ERROR" />
|
||||
<include resource="logback-logstash.xml" />
|
||||
|
||||
<!-- 开启 skywalking 日志收集 -->
|
||||
<include resource="logback-skylog.xml" />
|
||||
|
||||
<!--系统操作日志-->
|
||||
<root level="info">
|
||||
<appender-ref ref="console"/>
|
||||
</root>
|
||||
|
||||
</configuration>
|
||||
|
|
|
|||
|
|
@ -21,12 +21,8 @@ public class RedisConstants {
|
|||
|
||||
public static final long REDIS_ONLINE_USER_NEVER_EXPIRE = -1;
|
||||
|
||||
public static final long REDIS_NEVER_EXPIRE = 0L;
|
||||
|
||||
public static final long FIVE_MINUTES_REDIS_ONLINE_USER_EXPIRE_TIME = 60 * 5;
|
||||
|
||||
public static final String ONLINE_USERS_TEN = "ten:online_users:";
|
||||
|
||||
|
||||
public static String getUserTokenKey(String token) {
|
||||
return CCL_CODING_SSO_TOKEN + token;
|
||||
|
|
|
|||
|
|
@ -49,15 +49,10 @@ import static org.apache.dubbo.metadata.report.support.Constants.DEFAULT_METADAT
|
|||
public class RedisMetadataReport extends AbstractMetadataReport {
|
||||
|
||||
private static final String REDIS_DATABASE_KEY = "database";
|
||||
|
||||
private static final String SENTINEL_KEY = "sentinel";
|
||||
private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(RedisMetadataReport.class);
|
||||
|
||||
// protected , for test
|
||||
protected JedisPool pool;
|
||||
|
||||
protected JedisSentinelPool sentinelPool;
|
||||
|
||||
private Set<HostAndPort> jedisClusterNodes;
|
||||
private int timeout;
|
||||
private String password;
|
||||
|
|
@ -80,14 +75,6 @@ public class RedisMetadataReport extends AbstractMetadataReport {
|
|||
for (URL tmpUrl : urls) {
|
||||
jedisClusterNodes.add(new HostAndPort(tmpUrl.getHost(), tmpUrl.getPort()));
|
||||
}
|
||||
} else if (url.getParameter(SENTINEL_KEY,false)) {
|
||||
Set<String> sentinels = new HashSet<>();
|
||||
List<URL> urls = url.getBackupUrls();
|
||||
for (URL tmpUrl : urls) {
|
||||
sentinels.add(tmpUrl.getHost()+":"+ tmpUrl.getPort());
|
||||
}
|
||||
int database = url.getParameter(REDIS_DATABASE_KEY, 0);
|
||||
sentinelPool = new JedisSentinelPool("mymaster",sentinels ,new GenericObjectPoolConfig<>(), timeout, password, database);
|
||||
} else {
|
||||
int database = url.getParameter(REDIS_DATABASE_KEY, 0);
|
||||
pool = new JedisPool(new JedisPoolConfig(), url.getHost(), url.getPort(), timeout, password, database);
|
||||
|
|
@ -141,25 +128,11 @@ public class RedisMetadataReport extends AbstractMetadataReport {
|
|||
private void storeMetadata(BaseMetadataIdentifier metadataIdentifier, String v) {
|
||||
if (pool != null) {
|
||||
storeMetadataStandalone(metadataIdentifier, v);
|
||||
}else if(sentinelPool != null) {
|
||||
storeMetadataInSentinel(metadataIdentifier, v);
|
||||
} else {
|
||||
storeMetadataInCluster(metadataIdentifier, v);
|
||||
}
|
||||
}
|
||||
|
||||
private void storeMetadataInSentinel(BaseMetadataIdentifier metadataIdentifier, String v) {
|
||||
try (Jedis jedisSentinel = sentinelPool.getResource()) {
|
||||
jedisSentinel.set(metadataIdentifier.getUniqueKey(KeyTypeEnum.UNIQUE_KEY), v, jedisParams);
|
||||
} catch (Throwable e) {
|
||||
String msg =
|
||||
"Failed to put " + metadataIdentifier + " to redis cluster " + v + ", cause: " + e.getMessage();
|
||||
logger.error(TRANSPORT_FAILED_RESPONSE, "", "", msg, e);
|
||||
throw new RpcException(msg, e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private void storeMetadataInCluster(BaseMetadataIdentifier metadataIdentifier, String v) {
|
||||
try (JedisCluster jedisCluster =
|
||||
new JedisCluster(jedisClusterNodes, timeout, timeout, 2, password, new GenericObjectPoolConfig<>())) {
|
||||
|
|
@ -185,24 +158,11 @@ public class RedisMetadataReport extends AbstractMetadataReport {
|
|||
private void deleteMetadata(BaseMetadataIdentifier metadataIdentifier) {
|
||||
if (pool != null) {
|
||||
deleteMetadataStandalone(metadataIdentifier);
|
||||
}else if(sentinelPool != null) {
|
||||
deleteMetadataSentinel(metadataIdentifier);
|
||||
} else {
|
||||
deleteMetadataInCluster(metadataIdentifier);
|
||||
}
|
||||
}
|
||||
|
||||
private void deleteMetadataSentinel(BaseMetadataIdentifier metadataIdentifier) {
|
||||
try (Jedis jedisSentinel = sentinelPool.getResource()) {
|
||||
jedisSentinel.del(metadataIdentifier.getUniqueKey(KeyTypeEnum.UNIQUE_KEY));
|
||||
} catch (Throwable e) {
|
||||
String msg = "Failed to delete " + metadataIdentifier + " from redis , cause: " + e.getMessage();
|
||||
logger.error(TRANSPORT_FAILED_RESPONSE, "", "", msg, e);
|
||||
throw new RpcException(msg, e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private void deleteMetadataInCluster(BaseMetadataIdentifier metadataIdentifier) {
|
||||
try (JedisCluster jedisCluster =
|
||||
new JedisCluster(jedisClusterNodes, timeout, timeout, 2, password, new GenericObjectPoolConfig<>())) {
|
||||
|
|
@ -227,24 +187,11 @@ public class RedisMetadataReport extends AbstractMetadataReport {
|
|||
private String getMetadata(BaseMetadataIdentifier metadataIdentifier) {
|
||||
if (pool != null) {
|
||||
return getMetadataStandalone(metadataIdentifier);
|
||||
}else if(sentinelPool != null) {
|
||||
return getMetadataSentinel(metadataIdentifier);
|
||||
} else {
|
||||
return getMetadataInCluster(metadataIdentifier);
|
||||
}
|
||||
}
|
||||
|
||||
private String getMetadataSentinel(BaseMetadataIdentifier metadataIdentifier) {
|
||||
try (Jedis jedisSentinel = sentinelPool.getResource()) {
|
||||
return jedisSentinel.get(metadataIdentifier.getUniqueKey(KeyTypeEnum.UNIQUE_KEY));
|
||||
} catch (Throwable e) {
|
||||
String msg = "Failed to get " + metadataIdentifier + " from redis , cause: " + e.getMessage();
|
||||
logger.error(TRANSPORT_FAILED_RESPONSE, "", "", msg, e);
|
||||
throw new RpcException(msg, e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private String getMetadataInCluster(BaseMetadataIdentifier metadataIdentifier) {
|
||||
try (JedisCluster jedisCluster =
|
||||
new JedisCluster(jedisClusterNodes, timeout, timeout, 2, password, new GenericObjectPoolConfig<>())) {
|
||||
|
|
@ -296,8 +243,6 @@ public class RedisMetadataReport extends AbstractMetadataReport {
|
|||
private boolean storeMapping(String key, String field, String value, String ticket) {
|
||||
if (pool != null) {
|
||||
return storeMappingStandalone(key, field, value, ticket);
|
||||
}else if(sentinelPool != null) {
|
||||
return storeMappingSentinel(key, field, value, ticket);
|
||||
} else {
|
||||
return storeMappingInCluster(key, field, value, ticket);
|
||||
}
|
||||
|
|
@ -333,33 +278,6 @@ public class RedisMetadataReport extends AbstractMetadataReport {
|
|||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* use 'watch' to implement cas.
|
||||
* Find information about slot distribution by key.
|
||||
*/
|
||||
private boolean storeMappingSentinel(String key, String field, String value, String ticket) {
|
||||
try (Jedis jedisSentinel = sentinelPool.getResource()) {
|
||||
jedisSentinel.watch(key);
|
||||
String oldValue = jedisSentinel.hget(key, field);
|
||||
if (null == oldValue || null == ticket || oldValue.equals(ticket)) {
|
||||
Transaction transaction = jedisSentinel.multi();
|
||||
transaction.hset(key, field, value);
|
||||
List<Object> result = transaction.exec();
|
||||
if (null != result) {
|
||||
jedisSentinel.publish(buildPubSubKey(), field);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
jedisSentinel.unwatch();
|
||||
} catch (Throwable e) {
|
||||
String msg = "Failed to put " + key + ":" + field + " to redis " + value + ", cause: " + e.getMessage();
|
||||
logger.error(TRANSPORT_FAILED_RESPONSE, "", "", msg, e);
|
||||
throw new RpcException(msg, e);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* use 'watch' to implement cas.
|
||||
* Find information about slot distribution by key.
|
||||
|
|
@ -421,8 +339,6 @@ public class RedisMetadataReport extends AbstractMetadataReport {
|
|||
private String getMappingData(String key, String field) {
|
||||
if (pool != null) {
|
||||
return getMappingDataStandalone(key, field);
|
||||
}else if(sentinelPool != null) {
|
||||
return getMappingDataSentinel(key, field);
|
||||
} else {
|
||||
return getMappingDataInCluster(key, field);
|
||||
}
|
||||
|
|
@ -439,17 +355,6 @@ public class RedisMetadataReport extends AbstractMetadataReport {
|
|||
}
|
||||
}
|
||||
|
||||
private String getMappingDataSentinel(String key, String field) {
|
||||
try (Jedis jedisSentinel = sentinelPool.getResource()) {
|
||||
return jedisSentinel.hget(key, field);
|
||||
} catch (Throwable e) {
|
||||
String msg = "Failed to get " + key + ":" + field + " from redis , cause: " + e.getMessage();
|
||||
logger.error(TRANSPORT_FAILED_RESPONSE, "", "", msg, e);
|
||||
throw new RpcException(msg, e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private String getMappingDataStandalone(String key, String field) {
|
||||
try (Jedis jedis = pool.getResource()) {
|
||||
return jedis.hget(key, field);
|
||||
|
|
@ -597,14 +502,6 @@ public class RedisMetadataReport extends AbstractMetadataReport {
|
|||
logger.error(TRANSPORT_FAILED_RESPONSE, "", "", msg, e);
|
||||
throw new RpcException(msg, e);
|
||||
}
|
||||
} else if (sentinelPool != null) {
|
||||
try (Jedis jedisSentinel = sentinelPool.getResource()) {
|
||||
jedisSentinel.subscribe(notifySub, path);
|
||||
} catch (Throwable e) {
|
||||
String msg = "Failed to subscribe " + path + ", cause: " + e.getMessage();
|
||||
logger.error(TRANSPORT_FAILED_RESPONSE, "", "", msg, e);
|
||||
throw new RpcException(msg, e);
|
||||
}
|
||||
} else {
|
||||
try (JedisCluster jedisCluster = new JedisCluster(
|
||||
jedisClusterNodes, timeout, timeout, 2, password, new GenericObjectPoolConfig<>())) {
|
||||
|
|
|
|||
|
|
@ -23,27 +23,15 @@ dubbo:
|
|||
address: redis://${spring.data.redis.host}:${spring.data.redis.port}
|
||||
group: DUBBO_GROUP
|
||||
username: dubbo
|
||||
password: ruoyi123
|
||||
password: ${spring.data.redis.password}
|
||||
# 集群开关
|
||||
sentinel: true
|
||||
cluster: false
|
||||
parameters:
|
||||
namespace: ${spring.profiles.active}
|
||||
database: ${spring.data.redis.database}
|
||||
timeout: ${spring.data.redis.timeout}
|
||||
backup: 53.238.79.33:26380,53.238.79.34:26380,53.238.79.35:26380
|
||||
# metadata-report:
|
||||
# address: redis://${spring.data.redis.host}:${spring.data.redis.port}
|
||||
# group: DUBBO_GROUP
|
||||
# username: dubbo
|
||||
# password: ${spring.data.redis.password}
|
||||
# # 集群开关
|
||||
# cluster: false
|
||||
# parameters:
|
||||
# namespace: ${spring.profiles.active}
|
||||
# database: ${spring.data.redis.database}
|
||||
# timeout: ${spring.data.redis.timeout}
|
||||
# # 集群地址 cluster 为 true 生效
|
||||
# backup: 127.0.0.1:6379,127.0.0.1:6381
|
||||
# 集群地址 cluster 为 true 生效
|
||||
backup: 127.0.0.1:6379,127.0.0.1:6381
|
||||
# 消费者相关配置
|
||||
consumer:
|
||||
# 结果缓存(LRU算法)
|
||||
|
|
@ -55,12 +43,3 @@ dubbo:
|
|||
retries: 0
|
||||
# 初始化检查
|
||||
check: false
|
||||
|
||||
logging:
|
||||
level:
|
||||
# 设置 Dubbo 核心包的日志级别为 DEBUG
|
||||
org.apache.dubbo: DEBUG
|
||||
# 如果需要更细粒度的调试,可指定元数据报告模块
|
||||
org.apache.dubbo.metadata: DEBUG
|
||||
# Redis 客户端日志(可选)
|
||||
io.lettuce.core: WARN # 避免 Redis 连接日志过多
|
||||
|
|
|
|||
|
|
@ -38,12 +38,12 @@ public enum DataScopeType {
|
|||
/**
|
||||
* 部门数据权限
|
||||
*/
|
||||
DEPT("3", " #{#deptName} = #{#user.manageDeptId} ", " 1 = 0 "),
|
||||
DEPT("3", " #{#deptName} = #{#user.deptId} ", " 1 = 0 "),
|
||||
|
||||
/**
|
||||
* 部门及以下数据权限
|
||||
*/
|
||||
DEPT_AND_CHILD("4", " #{#deptName} IN ( #{@sdss.getDeptAndChild( #user.manageDeptId )} )", " 1 = 0 "),
|
||||
DEPT_AND_CHILD("4", " #{#deptName} IN ( #{@sdss.getDeptAndChild( #user.deptId )} )", " 1 = 0 "),
|
||||
|
||||
/**
|
||||
* 仅本人数据权限
|
||||
|
|
|
|||
|
|
@ -25,7 +25,6 @@ import org.springframework.core.task.VirtualThreadTaskExecutor;
|
|||
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.util.Objects;
|
||||
import java.util.TimeZone;
|
||||
|
||||
/**
|
||||
|
|
@ -95,22 +94,6 @@ public class RedisConfiguration {
|
|||
.setReadMode(clusterServersConfig.getReadMode())
|
||||
.setSubscriptionMode(clusterServersConfig.getSubscriptionMode());
|
||||
}
|
||||
// 哨兵模式
|
||||
RedissonProperties.Sentinel sentinel = redissonProperties.getSentinel();
|
||||
if (Objects.nonNull(sentinel)) {
|
||||
config.useSentinelServers()
|
||||
.setNameMapper(new KeyPrefixHandler(redissonProperties.getKeyPrefix()))
|
||||
.setTimeout(sentinel.getTimeout())
|
||||
.setClientName(sentinel.getClientName())
|
||||
.setIdleConnectionTimeout(sentinel.getIdleConnectionTimeout())
|
||||
.setSubscriptionConnectionPoolSize(sentinel.getSubscriptionConnectionPoolSize())
|
||||
.setMasterConnectionMinimumIdleSize(sentinel.getMasterConnectionMinimumIdleSize())
|
||||
.setMasterConnectionPoolSize(sentinel.getMasterConnectionPoolSize())
|
||||
.setSlaveConnectionMinimumIdleSize(sentinel.getSlaveConnectionMinimumIdleSize())
|
||||
.setSlaveConnectionPoolSize(sentinel.getSlaveConnectionPoolSize())
|
||||
.setReadMode(sentinel.getReadMode())
|
||||
.setSubscriptionMode(sentinel.getSubscriptionMode());
|
||||
}
|
||||
log.info("初始化 redis 配置");
|
||||
};
|
||||
}
|
||||
|
|
|
|||
|
|
@ -40,8 +40,6 @@ public class RedissonProperties {
|
|||
*/
|
||||
private ClusterServersConfig clusterServersConfig;
|
||||
|
||||
private Sentinel sentinel;
|
||||
|
||||
@Data
|
||||
@NoArgsConstructor
|
||||
public static class SingleServerConfig {
|
||||
|
|
@ -134,60 +132,4 @@ public class RedissonProperties {
|
|||
|
||||
}
|
||||
|
||||
@Data
|
||||
@NoArgsConstructor
|
||||
public static class Sentinel {
|
||||
|
||||
/**
|
||||
* 客户端名称
|
||||
*/
|
||||
private String clientName;
|
||||
|
||||
/**
|
||||
* master最小空闲连接数
|
||||
*/
|
||||
private int masterConnectionMinimumIdleSize;
|
||||
|
||||
/**
|
||||
* master连接池大小
|
||||
*/
|
||||
private int masterConnectionPoolSize;
|
||||
|
||||
/**
|
||||
* slave最小空闲连接数
|
||||
*/
|
||||
private int slaveConnectionMinimumIdleSize;
|
||||
|
||||
/**
|
||||
* slave连接池大小
|
||||
*/
|
||||
private int slaveConnectionPoolSize;
|
||||
|
||||
/**
|
||||
* 连接空闲超时,单位:毫秒
|
||||
*/
|
||||
private int idleConnectionTimeout;
|
||||
|
||||
/**
|
||||
* 命令等待超时,单位:毫秒
|
||||
*/
|
||||
private int timeout;
|
||||
|
||||
/**
|
||||
* 发布和订阅连接池大小
|
||||
*/
|
||||
private int subscriptionConnectionPoolSize;
|
||||
|
||||
/**
|
||||
* 读取模式
|
||||
*/
|
||||
private ReadMode readMode;
|
||||
|
||||
/**
|
||||
* 订阅模式
|
||||
*/
|
||||
private SubscriptionMode subscriptionMode;
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,11 +1,9 @@
|
|||
package org.dromara.common.redis.utils;
|
||||
|
||||
import cn.hutool.core.date.DateUnit;
|
||||
import cn.hutool.json.JSONObject;
|
||||
import cn.hutool.json.JSONUtil;
|
||||
import lombok.AccessLevel;
|
||||
import lombok.NoArgsConstructor;
|
||||
import org.dromara.common.core.utils.RedisConstants;
|
||||
import org.dromara.common.core.utils.SpringUtils;
|
||||
import org.redisson.api.*;
|
||||
import org.springframework.dao.DataAccessException;
|
||||
|
|
@ -347,29 +345,6 @@ public class RedisUtils {
|
|||
return rSet.add(data);
|
||||
}
|
||||
|
||||
public static <T> void set(final String key, String data,long time) {
|
||||
if (time > 0){
|
||||
CLIENT.getBucket(key).set(data, time, TimeUnit.SECONDS);
|
||||
}else {
|
||||
CLIENT.getBucket(key).set(data);
|
||||
}
|
||||
|
||||
/*RSet<T> rSet = CLIENT.getSet(key);
|
||||
if (time > 0){
|
||||
rSet.expireAsync(time,TimeUnit.SECONDS);
|
||||
}
|
||||
return rSet.add(data);*/
|
||||
}
|
||||
|
||||
public static <T> void del(final String key) {
|
||||
CLIENT.getBucket(key).delete();
|
||||
/*RSet<T> rSet = CLIENT.getSet(key);
|
||||
if (time > 0){
|
||||
rSet.expireAsync(time,TimeUnit.SECONDS);
|
||||
}
|
||||
return rSet.add(data);*/
|
||||
}
|
||||
|
||||
/**
|
||||
* 注册Set监听器
|
||||
* <p>
|
||||
|
|
@ -598,73 +573,6 @@ public class RedisUtils {
|
|||
System.out.println("redis:"+list);
|
||||
}
|
||||
|
||||
/**
|
||||
* 批量插入数据,并为整个 RMap 设置过期时间
|
||||
*
|
||||
* @param data 需要批量插入的数据
|
||||
* @param timeout 设置的过期时间
|
||||
* @param timeUnit 过期时间的单位
|
||||
*/
|
||||
public static void batchPutWithExpire(Map<String, String> data, long timeout, TimeUnit timeUnit) {
|
||||
// 创建 RBatch 实例
|
||||
RBatch batch = CLIENT.createBatch();
|
||||
|
||||
// 获取 RMapAsync 对象
|
||||
RMapAsync<Object, Object> mapAsync = batch.getMap("myMap");
|
||||
|
||||
// 批量操作:将多个数据添加到 map 中
|
||||
for (Map.Entry<String, String> entry : data.entrySet()) {
|
||||
mapAsync.putAsync(entry.getKey(), entry.getValue());
|
||||
}
|
||||
|
||||
// 执行批量操作
|
||||
batch.execute();
|
||||
|
||||
// 获取同步的 RMap 对象并设置过期时间
|
||||
RMap<Object, Object> mapSync = CLIENT.getMap("myMap");
|
||||
mapSync.expire(timeout, timeUnit);
|
||||
}
|
||||
|
||||
/**
|
||||
* 批量插入数据
|
||||
*
|
||||
* @param data 需要批量插入的数据
|
||||
*/
|
||||
public static void batchPut(Map<String, String> data) {
|
||||
// 创建 RBatch 实例
|
||||
RBatch batch = CLIENT.createBatch();
|
||||
|
||||
// 获取 RMapAsync 对象
|
||||
RMapAsync<Object, Object> mapAsync = batch.getMap("myMap");
|
||||
|
||||
// 批量操作:将多个数据添加到 map 中
|
||||
for (Map.Entry<String, String> entry : data.entrySet()) {
|
||||
mapAsync.putAsync(entry.getKey(), entry.getValue());
|
||||
}
|
||||
|
||||
// 执行批量操作
|
||||
batch.execute();
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据 key 获取指定的值
|
||||
*
|
||||
* @param key 要查询的 key
|
||||
* @return 查询到的值
|
||||
*/
|
||||
public static JSONObject getData(String key) {
|
||||
// 获取同步的 RMap 对象
|
||||
RMap<Object, Object> map = CLIENT.getMap("myMap");
|
||||
|
||||
// 根据 key 获取数据
|
||||
Object value = map.get(key);
|
||||
if (null == value){
|
||||
return null;
|
||||
}
|
||||
return JSONUtil.parseObj(value.toString());
|
||||
}
|
||||
|
||||
/*
|
||||
* 模糊查询
|
||||
* */
|
||||
|
|
@ -688,42 +596,12 @@ public class RedisUtils {
|
|||
return list;
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据模式(例如前缀)模糊匹配 Redis 中的 keys,并获取每个 key 的值
|
||||
*
|
||||
* @param pattern 模糊匹配的模式,例如 "user:*"
|
||||
* @return 匹配到的 key 和 value 对
|
||||
*/
|
||||
public static List<JSONObject> getMatchingKeysAndValues(String pattern) {
|
||||
RKeys rKeys = CLIENT.getKeys();
|
||||
Iterable<String> keysIterable = rKeys.getKeysByPattern(pattern); // 获取匹配的 key
|
||||
|
||||
// 获取匹配的键值对
|
||||
RMap<String, String> map = CLIENT.getMap("myMap");
|
||||
|
||||
List<JSONObject> list = new ArrayList<>();
|
||||
// RBatch batch = CLIENT.createBatch();
|
||||
// 批量获取这些key的值
|
||||
for (String key : keysIterable) {
|
||||
String value = map.get(key); // 获取每个 key 对应的 value
|
||||
if (null != value){
|
||||
JSONObject jsonObject = JSONUtil.parseObj(value);
|
||||
list.add(jsonObject);
|
||||
}
|
||||
}
|
||||
|
||||
return list;
|
||||
}
|
||||
|
||||
/*
|
||||
* 根据key获取RBucket值
|
||||
* */
|
||||
public static JSONObject getBucket(String key){
|
||||
RBucket<Object> bucket = CLIENT.getBucket(key);
|
||||
Object value = bucket.get();
|
||||
if (null == value){
|
||||
return null;
|
||||
}
|
||||
return JSONUtil.parseObj(value.toString());
|
||||
}
|
||||
|
||||
|
|
@ -745,18 +623,12 @@ public class RedisUtils {
|
|||
/*
|
||||
* 批量插入GEO数据
|
||||
* */
|
||||
public static long geoAdd(Double lng,Double lat,String member){
|
||||
RGeo<String> geo = CLIENT.getGeo(RedisConstants.ONLINE_USERS_GEO);
|
||||
long count1 = geo.add(lng, lat, member);
|
||||
return count1;
|
||||
}
|
||||
|
||||
// 查询半径周边 米内的成员
|
||||
public static List<String> nearByXYReadonly(double centerLon,double centerLat,double distance){
|
||||
RGeo<String> geo = CLIENT.getGeo(RedisConstants.ONLINE_USERS_GEO);
|
||||
List<String> members = geo.radius(centerLon, centerLat, distance, GeoUnit.METERS);
|
||||
return members;
|
||||
}
|
||||
|
||||
/*public static void batchGeoAdd(Map<String, GeoEntry> entryMap){
|
||||
RGeo<RMap<String, String>> geo = CLIENT.getGeo("myGeo");
|
||||
Map<String, GeoEntry> entries = new HashMap<>();
|
||||
entries.put("place1", new GeoEntry(13.361389, 38.115556, "Palermo"));
|
||||
entries.put("place2", new GeoEntry(15.087269, 37.502669, "Catania"));
|
||||
geo.p(entries);
|
||||
}*/
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -36,8 +36,6 @@ public class LoginHelper {
|
|||
public static final String USER_NAME_KEY = "userName";
|
||||
public static final String DEPT_KEY = "deptId";
|
||||
public static final String DEPT_NAME_KEY = "deptName";
|
||||
|
||||
public static final String MANAGE_DEPT__KEY = "manageDeptId";
|
||||
public static final String DEPT_CATEGORY_KEY = "deptCategory";
|
||||
public static final String CLIENT_KEY = "clientid";
|
||||
|
||||
|
|
@ -55,7 +53,6 @@ public class LoginHelper {
|
|||
.setExtra(USER_KEY, loginUser.getUserId())
|
||||
.setExtra(USER_NAME_KEY, loginUser.getUsername())
|
||||
.setExtra(DEPT_KEY, loginUser.getDeptId())
|
||||
.setExtra(MANAGE_DEPT__KEY,loginUser.getManageDeptId())
|
||||
.setExtra(DEPT_NAME_KEY, loginUser.getDeptName())
|
||||
.setExtra(DEPT_CATEGORY_KEY, loginUser.getDeptCategory())
|
||||
);
|
||||
|
|
|
|||
|
|
@ -8,10 +8,10 @@
|
|||
</parent>
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<artifactId>wzhj-gateway</artifactId>
|
||||
<artifactId>stwzhj-gateway</artifactId>
|
||||
|
||||
<description>
|
||||
wzhj-gateway网关模块
|
||||
stwzhj-gateway网关模块
|
||||
</description>
|
||||
|
||||
<dependencies>
|
||||
|
|
@ -23,7 +23,6 @@ public class GlobalCacheRequestFilter implements GlobalFilter, Ordered {
|
|||
if (!WebFluxUtils.isJsonRequest(exchange)) {
|
||||
return chain.filter(exchange);
|
||||
}
|
||||
|
||||
return ServerWebExchangeUtils.cacheRequestBody(exchange, (serverHttpRequest) -> {
|
||||
if (serverHttpRequest == exchange.getRequest()) {
|
||||
return chain.filter(exchange);
|
||||
|
|
@ -8,7 +8,7 @@ server:
|
|||
spring:
|
||||
application:
|
||||
# 应用名称
|
||||
name: wzhj-gateway
|
||||
name: stwzhj-gateway
|
||||
profiles:
|
||||
# 环境配置
|
||||
active: @profiles.active@
|
||||
|
|
@ -0,0 +1,114 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<configuration scan="true" scanPeriod="60 seconds" debug="false">
|
||||
<!-- 日志存放路径 -->
|
||||
<property name="log.path" value="logs/${project.artifactId}"/>
|
||||
<!-- 日志输出格式 -->
|
||||
<property name="console.log.pattern"
|
||||
value="%red(%d{yyyy-MM-dd HH:mm:ss}) %green([%thread]) %highlight(%-5level) %boldMagenta(%logger{36}%n) - %msg%n"/>
|
||||
<property name="log.pattern" value="%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n"/>
|
||||
|
||||
<!-- 控制台输出 -->
|
||||
<appender name="console" class="ch.qos.logback.core.ConsoleAppender">
|
||||
<encoder>
|
||||
<pattern>${console.log.pattern}</pattern>
|
||||
<charset>utf-8</charset>
|
||||
</encoder>
|
||||
</appender>
|
||||
|
||||
<!-- 控制台输出 -->
|
||||
<appender name="file_console" class="ch.qos.logback.core.rolling.RollingFileAppender">
|
||||
<file>${log.path}/console.log</file>
|
||||
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
|
||||
<!-- 日志文件名格式 -->
|
||||
<fileNamePattern>${log.path}/console.%d{yyyy-MM-dd}.log</fileNamePattern>
|
||||
<!-- 日志最大 1天 -->
|
||||
<maxHistory>1</maxHistory>
|
||||
</rollingPolicy>
|
||||
<encoder>
|
||||
<pattern>${log.pattern}</pattern>
|
||||
<charset>utf-8</charset>
|
||||
</encoder>
|
||||
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
|
||||
<!-- 过滤的级别 -->
|
||||
<level>INFO</level>
|
||||
</filter>
|
||||
</appender>
|
||||
|
||||
<!-- 系统日志输出 -->
|
||||
<appender name="file_info" class="ch.qos.logback.core.rolling.RollingFileAppender">
|
||||
<file>${log.path}/info.log</file>
|
||||
<!-- 循环政策:基于时间创建日志文件 -->
|
||||
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
|
||||
<!-- 日志文件名格式 -->
|
||||
<fileNamePattern>${log.path}/info.%d{yyyy-MM-dd}.log</fileNamePattern>
|
||||
<!-- 日志最大的历史 60天 -->
|
||||
<maxHistory>60</maxHistory>
|
||||
</rollingPolicy>
|
||||
<encoder>
|
||||
<pattern>${log.pattern}</pattern>
|
||||
</encoder>
|
||||
<filter class="ch.qos.logback.classic.filter.LevelFilter">
|
||||
<!-- 过滤的级别 -->
|
||||
<level>INFO</level>
|
||||
<!-- 匹配时的操作:接收(记录) -->
|
||||
<onMatch>ACCEPT</onMatch>
|
||||
<!-- 不匹配时的操作:拒绝(不记录) -->
|
||||
<onMismatch>DENY</onMismatch>
|
||||
</filter>
|
||||
</appender>
|
||||
|
||||
<appender name="file_error" class="ch.qos.logback.core.rolling.RollingFileAppender">
|
||||
<file>${log.path}/error.log</file>
|
||||
<!-- 循环政策:基于时间创建日志文件 -->
|
||||
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
|
||||
<!-- 日志文件名格式 -->
|
||||
<fileNamePattern>${log.path}/error.%d{yyyy-MM-dd}.log</fileNamePattern>
|
||||
<!-- 日志最大的历史 60天 -->
|
||||
<maxHistory>60</maxHistory>
|
||||
</rollingPolicy>
|
||||
<encoder>
|
||||
<pattern>${log.pattern}</pattern>
|
||||
</encoder>
|
||||
<filter class="ch.qos.logback.classic.filter.LevelFilter">
|
||||
<!-- 过滤的级别 -->
|
||||
<level>ERROR</level>
|
||||
<!-- 匹配时的操作:接收(记录) -->
|
||||
<onMatch>ACCEPT</onMatch>
|
||||
<!-- 不匹配时的操作:拒绝(不记录) -->
|
||||
<onMismatch>DENY</onMismatch>
|
||||
</filter>
|
||||
</appender>
|
||||
|
||||
<!-- info异步输出 -->
|
||||
<appender name="async_info" class="ch.qos.logback.classic.AsyncAppender">
|
||||
<!-- 不丢失日志.默认的,如果队列的80%已满,则会丢弃TRACT、DEBUG、INFO级别的日志 -->
|
||||
<discardingThreshold>0</discardingThreshold>
|
||||
<!-- 更改默认的队列的深度,该值会影响性能.默认值为256 -->
|
||||
<queueSize>512</queueSize>
|
||||
<!-- 添加附加的appender,最多只能添加一个 -->
|
||||
<appender-ref ref="file_info"/>
|
||||
</appender>
|
||||
|
||||
<!-- error异步输出 -->
|
||||
<appender name="async_error" class="ch.qos.logback.classic.AsyncAppender">
|
||||
<!-- 不丢失日志.默认的,如果队列的80%已满,则会丢弃TRACT、DEBUG、INFO级别的日志 -->
|
||||
<discardingThreshold>0</discardingThreshold>
|
||||
<!-- 更改默认的队列的深度,该值会影响性能.默认值为256 -->
|
||||
<queueSize>512</queueSize>
|
||||
<!-- 添加附加的appender,最多只能添加一个 -->
|
||||
<appender-ref ref="file_error"/>
|
||||
</appender>
|
||||
|
||||
<include resource="logback-logstash.xml" />
|
||||
|
||||
<!-- 开启 skywalking 日志收集 -->
|
||||
<include resource="logback-skylog.xml" />
|
||||
|
||||
<!--系统操作日志-->
|
||||
<root level="info">
|
||||
<appender-ref ref="console"/>
|
||||
<appender-ref ref="async_info"/>
|
||||
<appender-ref ref="async_error"/>
|
||||
<appender-ref ref="file_console"/>
|
||||
</root>
|
||||
</configuration>
|
||||
|
|
@ -9,19 +9,13 @@
|
|||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<modules>
|
||||
<module>wzhj-system</module>
|
||||
<module>stwzhj-system</module>
|
||||
<module>stwzhj-gen</module>
|
||||
<module>stwzhj-job</module>
|
||||
<module>stwzhj-resource</module>
|
||||
<module>stwzhj-workflow</module>
|
||||
<module>wzhj-data2es</module>
|
||||
<module>stwzhj-data2es</module>
|
||||
<module>stwzhj-baseToSt</module>
|
||||
<module>wzhj-consumer</module>
|
||||
<module>wzhj-location</module>
|
||||
<module>stwzhj-dataToGas</module>
|
||||
<module>wzhj-websocket</module>
|
||||
<module>wzhj-extract</module>
|
||||
<module>wzhj-udp</module>
|
||||
</modules>
|
||||
|
||||
<artifactId>stwzhj-modules</artifactId>
|
||||
|
|
|
|||
|
|
@ -9,7 +9,7 @@
|
|||
</parent>
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<artifactId>wzhj-consumer</artifactId>
|
||||
<artifactId>stwzhj-consumer</artifactId>
|
||||
|
||||
<dependencies>
|
||||
|
||||
|
|
@ -0,0 +1,34 @@
|
|||
package org.dromara.kafka.consumer;
|
||||
|
||||
import com.ruansee.redis.JedisConfig;
|
||||
import com.ruansee.redis.RedisConfig;
|
||||
import com.ruansee.redis.RedisUtil;
|
||||
import com.ruansee.redis.RedissionLockUtil;
|
||||
import org.dromara.kafka.consumer.config.KafkaPropertiesConfig;
|
||||
import org.redisson.spring.starter.RedissonAutoConfiguration;
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration;
|
||||
import org.springframework.boot.autoconfigure.data.redis.RedisReactiveAutoConfiguration;
|
||||
import org.springframework.boot.autoconfigure.data.redis.RedisRepositoriesAutoConfiguration;
|
||||
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||
import org.springframework.boot.web.servlet.ServletComponentScan;
|
||||
import org.springframework.context.annotation.ComponentScan;
|
||||
import org.springframework.context.annotation.FilterType;
|
||||
import org.springframework.scheduling.annotation.EnableAsync;
|
||||
|
||||
/**
|
||||
* <p>description: </p>
|
||||
*
|
||||
* @author chenle
|
||||
* @date 2021-09-06 11:12
|
||||
*/
|
||||
@SpringBootApplication
|
||||
@EnableAsync
|
||||
@EnableConfigurationProperties({KafkaPropertiesConfig.class})
|
||||
@ServletComponentScan
|
||||
public class KafkaConsumerApplication {
|
||||
public static void main(String[] args){
|
||||
SpringApplication.run(KafkaConsumerApplication.class,args);
|
||||
}
|
||||
}
|
||||
|
|
@ -27,7 +27,7 @@ public class AsyncConfig {
|
|||
taskExecutor.setMaxPoolSize(20);
|
||||
taskExecutor.setQueueCapacity(200);
|
||||
taskExecutor.setKeepAliveSeconds(60);
|
||||
taskExecutor.setThreadNamePrefix("wzhj--kafkaConsumer--");
|
||||
taskExecutor.setThreadNamePrefix("hfapp--kafkaConsumer--");
|
||||
taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
|
||||
taskExecutor.setAwaitTerminationSeconds(60);
|
||||
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
|
||||
|
|
@ -0,0 +1,136 @@
|
|||
package org.dromara.kafka.consumer.config;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.IOException;
|
||||
import java.util.Properties;
|
||||
|
||||
public final class KafkaProperties
|
||||
{
|
||||
private static final Logger LOG = LoggerFactory.getLogger(KafkaProperties.class);
|
||||
|
||||
// Topic名称,安全模式下,需要以管理员用户添加当前用户的访问权限
|
||||
public final static String TOPIC = "t_gps_realtime";
|
||||
|
||||
private static Properties serverProps = new Properties();
|
||||
private static Properties producerProps = new Properties();
|
||||
|
||||
private static Properties consumerProps = new Properties();
|
||||
|
||||
private static Properties clientProps = new Properties();
|
||||
|
||||
private static KafkaProperties instance = null;
|
||||
|
||||
private KafkaProperties()
|
||||
{
|
||||
String filePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator;
|
||||
|
||||
try
|
||||
{
|
||||
File proFile = new File(filePath + "producer.properties");
|
||||
|
||||
if (proFile.exists())
|
||||
{
|
||||
producerProps.load(new FileInputStream(filePath + "producer.properties"));
|
||||
}
|
||||
|
||||
File conFile = new File(filePath + "producer.properties");
|
||||
|
||||
if (conFile.exists())
|
||||
{
|
||||
consumerProps.load(new FileInputStream(filePath + "consumer.properties"));
|
||||
}
|
||||
|
||||
File serFile = new File(filePath + "server.properties");
|
||||
|
||||
if (serFile.exists())
|
||||
{
|
||||
serverProps.load(new FileInputStream(filePath + "server.properties"));
|
||||
}
|
||||
|
||||
File cliFile = new File(filePath + "client.properties");
|
||||
|
||||
if (cliFile.exists())
|
||||
{
|
||||
clientProps.load(new FileInputStream(filePath + "client.properties"));
|
||||
}
|
||||
}
|
||||
catch (IOException e)
|
||||
{
|
||||
LOG.info("The Exception occured.", e);
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized static KafkaProperties getInstance()
|
||||
{
|
||||
if (null == instance)
|
||||
{
|
||||
instance = new KafkaProperties();
|
||||
}
|
||||
|
||||
return instance;
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取参数值
|
||||
* @param key properites的key值
|
||||
* @param defValue 默认值
|
||||
* @return
|
||||
*/
|
||||
public String getValues(String key, String defValue)
|
||||
{
|
||||
String rtValue = null;
|
||||
|
||||
if (null == key)
|
||||
{
|
||||
LOG.error("key is null");
|
||||
}
|
||||
else
|
||||
{
|
||||
rtValue = getPropertiesValue(key);
|
||||
}
|
||||
|
||||
if (null == rtValue)
|
||||
{
|
||||
LOG.warn("KafkaProperties.getValues return null, key is " + key);
|
||||
rtValue = defValue;
|
||||
}
|
||||
|
||||
LOG.info("KafkaProperties.getValues: key is " + key + "; Value is " + rtValue);
|
||||
|
||||
return rtValue;
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据key值获取server.properties的值
|
||||
* @param key
|
||||
* @return
|
||||
*/
|
||||
private String getPropertiesValue(String key)
|
||||
{
|
||||
String rtValue = serverProps.getProperty(key);
|
||||
|
||||
// server.properties中没有,则再向producer.properties中获取
|
||||
if (null == rtValue)
|
||||
{
|
||||
rtValue = producerProps.getProperty(key);
|
||||
}
|
||||
|
||||
// producer中没有,则再向consumer.properties中获取
|
||||
if (null == rtValue)
|
||||
{
|
||||
rtValue = consumerProps.getProperty(key);
|
||||
}
|
||||
|
||||
// consumer没有,则再向client.properties中获取
|
||||
if (null == rtValue)
|
||||
{
|
||||
rtValue = clientProps.getProperty(key);
|
||||
}
|
||||
|
||||
return rtValue;
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,35 @@
|
|||
package org.dromara.kafka.consumer.config;
|
||||
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
import org.springframework.context.annotation.Profile;
|
||||
|
||||
/**
|
||||
* <p>description: </p>
|
||||
*
|
||||
* @author chenle
|
||||
* @date 2021-09-06 15:13
|
||||
*/
|
||||
@ConfigurationProperties(prefix = "mykafka")
|
||||
@Profile(value = "dev")
|
||||
public
|
||||
class KafkaPropertiesConfig {
|
||||
private String serverUrl;
|
||||
|
||||
private MyConsumerProperties consumerProperties = new MyConsumerProperties();
|
||||
|
||||
public String getServerUrl() {
|
||||
return serverUrl;
|
||||
}
|
||||
|
||||
public void setServerUrl(String serverUrl) {
|
||||
this.serverUrl = serverUrl;
|
||||
}
|
||||
|
||||
public MyConsumerProperties getConsumerProperties() {
|
||||
return consumerProperties;
|
||||
}
|
||||
|
||||
public void setConsumerProperties(MyConsumerProperties consumerProperties) {
|
||||
this.consumerProperties = consumerProperties;
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,28 @@
|
|||
package org.dromara.kafka.consumer.config;
|
||||
|
||||
/**
|
||||
* <p>description: </p>
|
||||
*
|
||||
* @author chenle
|
||||
* @date 2021-09-07 14:54
|
||||
*/
|
||||
public class MyConsumerProperties {
|
||||
private String clientId;
|
||||
private String groupId = "222";
|
||||
|
||||
public String getClientId() {
|
||||
return clientId;
|
||||
}
|
||||
|
||||
public void setClientId(String clientId) {
|
||||
this.clientId = clientId;
|
||||
}
|
||||
|
||||
public String getGroupId() {
|
||||
return groupId;
|
||||
}
|
||||
|
||||
public void setGroupId(String groupId) {
|
||||
this.groupId = groupId;
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,159 @@
|
|||
package org.dromara.kafka.consumer.config;
|
||||
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecords;
|
||||
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
||||
import org.dromara.kafka.consumer.handler.KafkaSecurityUtil;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.Properties;
|
||||
|
||||
|
||||
public class NewConsumer extends Thread{
|
||||
private static final Logger LOG = LoggerFactory.getLogger(NewConsumer.class);
|
||||
|
||||
private final KafkaConsumer<Integer, String> consumer;
|
||||
|
||||
private final String topic;
|
||||
|
||||
// 一次请求的最大等待时间
|
||||
private final int waitTime = 10000;
|
||||
|
||||
// Broker连接地址
|
||||
private final String bootstrapServers = "bootstrap.servers";
|
||||
// Group id
|
||||
private final String groupId = "group.id";
|
||||
// 消息内容使用的反序列化类
|
||||
private final String valueDeserializer = "value.deserializer";
|
||||
// 消息Key值使用的反序列化类
|
||||
private final String keyDeserializer = "key.deserializer";
|
||||
// 协议类型:当前支持配置为SASL_PLAINTEXT或者PLAINTEXT
|
||||
private final String securityProtocol = "security.protocol";
|
||||
// 服务名
|
||||
private final String saslKerberosServiceName = "sasl.kerberos.service.name";
|
||||
// 域名
|
||||
private final String kerberosDomainName = "kerberos.domain.name";
|
||||
// 是否自动提交offset
|
||||
private final String enableAutoCommit = "enable.auto.commit";
|
||||
// 自动提交offset的时间间隔
|
||||
private final String autoCommitIntervalMs = "auto.commit.interval.ms";
|
||||
|
||||
// 会话超时时间
|
||||
private final String sessionTimeoutMs = "session.timeout.ms";
|
||||
|
||||
/**
|
||||
* 用户自己申请的机机账号keytab文件名称
|
||||
*/
|
||||
private static final String USER_KEYTAB_FILE = "user.keytab";
|
||||
|
||||
/**
|
||||
* 用户自己申请的机机账号名称
|
||||
*/
|
||||
private static final String USER_PRINCIPAL = "aqdsj_ruansi";
|
||||
|
||||
/**
|
||||
* NewConsumer构造函数
|
||||
* @param topic 订阅的Topic名称
|
||||
*/
|
||||
public NewConsumer(String topic) {
|
||||
|
||||
Properties props = new Properties();
|
||||
|
||||
KafkaProperties kafkaProc = KafkaProperties.getInstance();
|
||||
// Broker连接地址
|
||||
props.put(bootstrapServers,
|
||||
kafkaProc.getValues(bootstrapServers, "localhost:21007"));
|
||||
// Group id
|
||||
props.put(groupId, "DemoConsumer");
|
||||
// 是否自动提交offset
|
||||
props.put(enableAutoCommit, "true");
|
||||
// 自动提交offset的时间间隔
|
||||
props.put(autoCommitIntervalMs, "1000");
|
||||
// 会话超时时间
|
||||
props.put(sessionTimeoutMs, "30000");
|
||||
// 消息Key值使用的反序列化类
|
||||
props.put(keyDeserializer,
|
||||
"org.apache.kafka.common.serialization.IntegerDeserializer");
|
||||
// 消息内容使用的反序列化类
|
||||
props.put(valueDeserializer,
|
||||
"org.apache.kafka.common.serialization.StringDeserializer");
|
||||
// 安全协议类型
|
||||
props.put(securityProtocol, kafkaProc.getValues(securityProtocol, "SASL_PLAINTEXT"));
|
||||
// 服务名
|
||||
props.put(saslKerberosServiceName, "kafka");
|
||||
// 域名
|
||||
props.put(kerberosDomainName, kafkaProc.getValues(kerberosDomainName, "hadoop.hadoop.com"));
|
||||
consumer = new KafkaConsumer<Integer, String>(props);
|
||||
this.topic = topic;
|
||||
}
|
||||
|
||||
/**
|
||||
* 订阅Topic的消息处理函数
|
||||
*/
|
||||
public void doWork()
|
||||
{
|
||||
// 订阅
|
||||
consumer.subscribe(Collections.singletonList(this.topic));
|
||||
// 消息消费请求
|
||||
ConsumerRecords<Integer, String> records = consumer.poll(waitTime);
|
||||
// 消息处理
|
||||
for (ConsumerRecord<Integer, String> record : records)
|
||||
{
|
||||
LOG.info("[NewConsumerExample], Received message: (" + record.key() + ", " + record.value()
|
||||
+ ") at offset " + record.offset());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
public static void main(String[] args)
|
||||
{
|
||||
if (KafkaSecurityUtil.isSecurityModel())
|
||||
{
|
||||
try
|
||||
{
|
||||
LOG.info("Securitymode start.");
|
||||
|
||||
//!!注意,安全认证时,需要用户手动修改为自己申请的机机账号
|
||||
KafkaSecurityUtil.securityPrepare();
|
||||
}
|
||||
catch (IOException e)
|
||||
{
|
||||
LOG.error("Security prepare failure.");
|
||||
LOG.error("The IOException occured : {}.", e);
|
||||
return;
|
||||
}
|
||||
LOG.info("Security prepare success.");
|
||||
}
|
||||
|
||||
NewConsumer consumerThread = new NewConsumer(KafkaProperties.TOPIC);
|
||||
consumerThread.start();
|
||||
|
||||
// 等到60s后将consumer关闭,实际执行过程中可修改
|
||||
try
|
||||
{
|
||||
Thread.sleep(60000);
|
||||
}
|
||||
catch (InterruptedException e)
|
||||
{
|
||||
LOG.info("The InterruptedException occured : {}.", e);
|
||||
}
|
||||
finally
|
||||
{
|
||||
consumerThread.shutdown();
|
||||
consumerThread.consumer.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void start() {
|
||||
doWork();
|
||||
}
|
||||
|
||||
private void shutdown(){
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
|
|
@ -15,6 +15,7 @@ import java.util.Date;
|
|||
@Data
|
||||
public class EsGpsInfo implements Serializable {
|
||||
|
||||
private static final long serialVersionUID = 7455495841680488351L;
|
||||
/**
|
||||
* 唯一码(外部系统)合肥版本不需要 21位id,
|
||||
* 到时候上传省厅的时候 需要在kafka发送端处理,生成一个省厅需要的21位id
|
||||
|
|
@ -0,0 +1,234 @@
|
|||
package org.dromara.kafka.consumer.handler;
|
||||
|
||||
import cn.hutool.core.bean.BeanUtil;
|
||||
import cn.hutool.core.bean.copier.CopyOptions;
|
||||
import cn.hutool.core.convert.ConvertException;
|
||||
import cn.hutool.core.date.DateTime;
|
||||
import cn.hutool.core.date.DateUtil;
|
||||
import cn.hutool.json.JSONObject;
|
||||
import cn.hutool.json.JSONUtil;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.ruansee.response.ApiResponse;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.dubbo.config.annotation.DubboReference;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
import org.dromara.common.core.domain.R;
|
||||
import org.dromara.data2es.api.RemoteDataToEsService;
|
||||
import org.dromara.data2es.api.domain.RemoteGpsInfo;
|
||||
import org.dromara.kafka.consumer.entity.EsGpsInfo;
|
||||
import org.dromara.kafka.consumer.entity.EsGpsInfoVO;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.LinkedBlockingDeque;
|
||||
|
||||
/**
|
||||
* <p>description: </p>
|
||||
*
|
||||
* @author chenle
|
||||
* @date 2021-09-06 16:44
|
||||
*/
|
||||
public class ConsumerWorker implements Runnable {
|
||||
private ConsumerRecord<String, Object> record;
|
||||
private Logger logger = LoggerFactory.getLogger(ConsumerWorker.class);
|
||||
|
||||
public static LinkedBlockingDeque linkedBlockingDeque = new LinkedBlockingDeque<>(5000);
|
||||
|
||||
private String cityCode ;
|
||||
|
||||
ConsumerWorker(ConsumerRecord<String, Object> record, String cityCode) {
|
||||
this.record = record;
|
||||
this.cityCode = cityCode;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
//其他地市使用的方法,这里使用了一个巧妙的方法,我们开发的地市都是传4位,这种其他地市的cityCode传大于4位,然后截取
|
||||
if(cityCode.length() > 4){
|
||||
cityCode = cityCode.substring(0,4);
|
||||
normalRequest();
|
||||
}else {
|
||||
//六安、安庆等地市的方法,这些地市都是我们自己公司开发的东西。
|
||||
luanrequest();
|
||||
// luanrequestBatch();
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* 废弃方法
|
||||
* */
|
||||
private void luanrequestBatch() {
|
||||
Object value = record.value();
|
||||
String topic = record.topic();
|
||||
List<EsGpsInfo> list = new ArrayList<>();
|
||||
logger.info("offset={},topic={},value={}", record.offset(), topic,value);
|
||||
List<JSONObject> jsonObjects = JSON.parseArray((String) value, JSONObject.class);
|
||||
for (JSONObject jsonObject : jsonObjects) {
|
||||
EsGpsInfo esGpsInfo;
|
||||
/*try {
|
||||
jsonObject = JSONUtil.parseObj(((String) value));
|
||||
}catch (ConvertException e){
|
||||
logger.info("jsonObject=null:error={}",e.getMessage());
|
||||
return;
|
||||
}*/
|
||||
try {
|
||||
esGpsInfo = JSONUtil.toBean(jsonObject, EsGpsInfo.class);
|
||||
}catch (ConvertException e){
|
||||
logger.info("EsGpsInfo=null:error={}",e.getMessage());
|
||||
return;
|
||||
}
|
||||
|
||||
if(Objects.isNull(esGpsInfo)){
|
||||
logger.info("esGpsInfo=null no error");
|
||||
return;
|
||||
}
|
||||
String deviceCode = esGpsInfo.getDeviceCode();
|
||||
if(StringUtils.isEmpty(deviceCode) || deviceCode.length() > 100){
|
||||
logger.info("deviceCode:{} is null or is too long ",deviceCode);
|
||||
return;
|
||||
}
|
||||
String latitude = esGpsInfo.getLat();
|
||||
if(StringUtils.isEmpty(latitude) || "0.0".equals(latitude)){
|
||||
logger.info("latitude:{} is null or is zero ",latitude);
|
||||
return;
|
||||
}
|
||||
String longitude = esGpsInfo.getLng();
|
||||
if(StringUtils.isEmpty(longitude) || "0.0".equals(longitude)){
|
||||
logger.info("longitude:{} is null or is zero ",longitude);
|
||||
return;
|
||||
}
|
||||
esGpsInfo.setInfoSource(cityCode);
|
||||
|
||||
esGpsInfo.setGpsTime(new Date(Long.valueOf(jsonObject.getStr("gpsTime"))));
|
||||
list.add(esGpsInfo);
|
||||
}
|
||||
// dataToEsService.saveGpsInfoBatch(list);
|
||||
}
|
||||
|
||||
private void luanrequest() {
|
||||
Object value = record.value();
|
||||
String topic = record.topic();
|
||||
|
||||
logger.info("offset={},topic={},value={}", record.offset(), topic,value);
|
||||
RemoteGpsInfo esGpsInfo;
|
||||
JSONObject jsonObject;
|
||||
try {
|
||||
jsonObject = JSONUtil.parseObj(((String) value));
|
||||
}catch (ConvertException e){
|
||||
logger.info("jsonObject=null:error={}",e.getMessage());
|
||||
return;
|
||||
}
|
||||
try {
|
||||
esGpsInfo = JSONUtil.toBean(jsonObject, RemoteGpsInfo.class);
|
||||
}catch (ConvertException e){
|
||||
logger.info("EsGpsInfo=null:error={}",e.getMessage());
|
||||
return;
|
||||
}
|
||||
|
||||
if(Objects.isNull(esGpsInfo)){
|
||||
logger.info("esGpsInfo=null no error");
|
||||
return;
|
||||
}
|
||||
String deviceCode = esGpsInfo.getDeviceCode();
|
||||
if(StringUtils.isEmpty(deviceCode) || deviceCode.length() > 100){
|
||||
logger.info("deviceCode:{} is null or is too long ",deviceCode);
|
||||
return;
|
||||
}
|
||||
String latitude = esGpsInfo.getLat();
|
||||
if(StringUtils.isEmpty(latitude) || "0.0".equals(latitude)){
|
||||
logger.info("latitude:{} is null or is zero ",latitude);
|
||||
return;
|
||||
}
|
||||
String longitude = esGpsInfo.getLng();
|
||||
if(StringUtils.isEmpty(longitude) || "0.0".equals(longitude)){
|
||||
logger.info("longitude:{} is null or is zero ",longitude);
|
||||
return;
|
||||
}
|
||||
esGpsInfo.setInfoSource(cityCode);
|
||||
try {
|
||||
esGpsInfo.setGpsTime(new Date(Long.valueOf(jsonObject.getStr("gpsTime"))));
|
||||
}catch (Exception e){
|
||||
logger.error("error_msg={}",e.getMessage());
|
||||
}
|
||||
logger.info("esGpsInfo={}",esGpsInfo);
|
||||
boolean offer = linkedBlockingDeque.offer(esGpsInfo);
|
||||
R response = R.ok(offer);
|
||||
if(Objects.isNull(response)){
|
||||
logger.info("response == null");
|
||||
}
|
||||
logger.info("code={},msg={}",response.getCode(),response.getMsg());
|
||||
if(200 == response.getCode()){
|
||||
logger.info("topic={},data2es={},gpsTime={}",topic,"success",esGpsInfo.getGpsTime());
|
||||
}else{
|
||||
logger.info("topic={},data2es={}",topic,response.getMsg());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* 通用的请求(一般地市采用这个方法)
|
||||
*/
|
||||
private void normalRequest() {
|
||||
Object value = record.value();
|
||||
String topic = record.topic();
|
||||
|
||||
logger.info("offset={},topic={},value={}", record.offset(), topic,value);
|
||||
|
||||
RemoteGpsInfo esGpsInfo = new RemoteGpsInfo();
|
||||
EsGpsInfoVO esGpsInfoVO;
|
||||
try {
|
||||
esGpsInfoVO = JSONUtil.toBean(((String) value), EsGpsInfoVO.class);
|
||||
}catch (ConvertException e){
|
||||
logger.info("esGpsInfoVO=null:error={}",e.getMessage());
|
||||
return;
|
||||
}
|
||||
if(Objects.isNull(esGpsInfoVO)){
|
||||
logger.info("esGpsInfoVO=null no error");
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
try {
|
||||
DateTime parse = DateUtil.parse(esGpsInfoVO.getGpsTime(), "yyyy-MM-dd HH:mm:ss");
|
||||
}catch (Exception e){
|
||||
logger.info("gpsTime:{} format error", esGpsInfoVO.getGpsTime());
|
||||
return;
|
||||
}
|
||||
|
||||
String deviceCode = esGpsInfoVO.getDeviceCode();
|
||||
if(StringUtils.isEmpty(deviceCode) || deviceCode.length() > 100){
|
||||
logger.info("deviceCode:{} is null or is too long ",deviceCode);
|
||||
return;
|
||||
}
|
||||
String latitude = esGpsInfoVO.getLatitude();
|
||||
if(StringUtils.isEmpty(latitude) || "0.0".equals(latitude)){
|
||||
logger.info("latitude:{} is null or is zero ",latitude);
|
||||
return;
|
||||
}
|
||||
String longitude = esGpsInfoVO.getLongitude();
|
||||
if(StringUtils.isEmpty(longitude) || "0.0".equals(longitude)){
|
||||
logger.info("longitude:{} is null or is zero ",longitude);
|
||||
return;
|
||||
}
|
||||
BeanUtil.copyProperties(esGpsInfoVO,esGpsInfo,new CopyOptions());
|
||||
esGpsInfo.setLat(latitude);
|
||||
esGpsInfo.setLng(esGpsInfoVO.getLongitude());
|
||||
esGpsInfo.setOrientation(esGpsInfoVO.getDirection());
|
||||
esGpsInfo.setInfoSource(cityCode);
|
||||
|
||||
boolean offer = linkedBlockingDeque.offer(esGpsInfo);
|
||||
R response = R.ok(offer);
|
||||
if(200 == response.getCode()){
|
||||
logger.info("topic={},data2es={}",topic,"success");
|
||||
}else{
|
||||
logger.error("topic={},data2es={}",topic,"fail");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
@ -7,8 +7,6 @@ import org.apache.dubbo.config.annotation.DubboReference;
|
|||
import org.dromara.data2es.api.RemoteDataToEsService;
|
||||
import org.dromara.data2es.api.domain.RemoteGpsInfo;
|
||||
import org.dromara.kafka.consumer.entity.EsGpsInfo;
|
||||
import org.dromara.system.api.RemoteDeviceService;
|
||||
import org.dromara.system.api.domain.bo.RemoteDeviceBo;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.CommandLineRunner;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
|
@ -33,28 +31,21 @@ public class DataInsertBatchHandler implements CommandLineRunner {
|
|||
@DubboReference
|
||||
private RemoteDataToEsService gpsService;
|
||||
|
||||
|
||||
@Override
|
||||
public void run(String... args) throws Exception {
|
||||
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
|
||||
LinkedBlockingDeque linkedBlockingDeque = ConsumerWorker.linkedBlockingDeque; //定位信息队列
|
||||
// LinkedBlockingDeque baseDataDeque = ConsumerWorker.basedataDeque; //基础信息队列
|
||||
LinkedBlockingDeque linkedBlockingDeque = ConsumerWorker.linkedBlockingDeque;
|
||||
singleThreadExecutor.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
while (true) {
|
||||
try {
|
||||
List<RemoteGpsInfo> list = new ArrayList<>();
|
||||
// List<RemoteDeviceBo> bases = new ArrayList<>();
|
||||
Queues.drain(linkedBlockingDeque, list, 200, 5, TimeUnit.SECONDS);
|
||||
// Queues.drain(baseDataDeque, bases, 100, 5, TimeUnit.SECONDS);
|
||||
log.info("batch size={}", list.size());
|
||||
if(CollectionUtil.isNotEmpty(list)) {
|
||||
gpsService.saveDataBatch(list);
|
||||
}
|
||||
/*if(CollectionUtil.isNotEmpty(bases)) {
|
||||
deviceService.batchSaveDevice(bases);
|
||||
}*/
|
||||
} catch (Exception e) {
|
||||
log.error("缓存队列批量消费异常:{}", e.getMessage());
|
||||
}
|
||||
|
|
@ -0,0 +1,98 @@
|
|||
package org.dromara.kafka.consumer.handler;
|
||||
|
||||
import org.apache.dubbo.config.annotation.DubboReference;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecords;
|
||||
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
||||
import org.apache.kafka.common.PartitionInfo;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.dromara.data2es.api.RemoteDataToEsService;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
|
||||
import org.springframework.kafka.config.KafkaListenerContainerFactory;
|
||||
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
|
||||
import org.springframework.kafka.listener.ContainerProperties;
|
||||
import org.springframework.kafka.listener.MessageListener;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
|
||||
/**
|
||||
* <p>description: </p>
|
||||
*
|
||||
* @author chenle
|
||||
* @date 2021-09-06 16:39
|
||||
*/
|
||||
public class KafkaConsumerRunnable implements Runnable {
|
||||
|
||||
private Map props;
|
||||
private ThreadPoolExecutor taskExecutor;
|
||||
|
||||
private String cityCode;
|
||||
private Logger logger = LoggerFactory.getLogger(KafkaConsumerRunnable.class);
|
||||
|
||||
public KafkaConsumerRunnable(Map props, ThreadPoolExecutor taskExecutor,
|
||||
String cityCode) {
|
||||
this.props = props;
|
||||
this.taskExecutor = taskExecutor;
|
||||
this.cityCode = cityCode;
|
||||
}
|
||||
|
||||
private DefaultKafkaConsumerFactory buildConsumerFactory(){
|
||||
return new DefaultKafkaConsumerFactory<String, String>(props);
|
||||
}
|
||||
|
||||
private ContainerProperties containerProperties(String[] topic, MessageListener<String, Object> messageListener) {
|
||||
ContainerProperties containerProperties = new ContainerProperties(topic);
|
||||
containerProperties.setMessageListener(messageListener);
|
||||
return containerProperties;
|
||||
}
|
||||
|
||||
private KafkaListenerContainerFactory buildListenerFactory(){
|
||||
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory();
|
||||
factory.setConsumerFactory(buildConsumerFactory());
|
||||
factory.setConcurrency(4);
|
||||
factory.setBatchListener(true);
|
||||
|
||||
factory.getContainerProperties().setPollTimeout(3000);
|
||||
return factory;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
KafkaConsumer<String,Object> consumer = new KafkaConsumer<>(props);
|
||||
|
||||
List topics = (List) props.get("topics");
|
||||
consumer.subscribe(topics);
|
||||
consumer.poll(0); // 令订阅生效
|
||||
|
||||
List<TopicPartition> topicPartitions = new ArrayList<>();
|
||||
Map<String, List<PartitionInfo>> stringListMap = consumer.listTopics();
|
||||
for (Object topic : topics) {
|
||||
String topic1 = (String) topic;
|
||||
List<PartitionInfo> partitionInfos = stringListMap.get(topic1);
|
||||
for (PartitionInfo partitionInfo : partitionInfos) {
|
||||
TopicPartition partition = new TopicPartition(topic1, partitionInfo.partition());
|
||||
topicPartitions.add(partition);
|
||||
}
|
||||
}
|
||||
consumer.seekToEnd(topicPartitions); // 如果传Collections.emptyList()表示移动所有订阅topic分区offset到最末端
|
||||
|
||||
while (true) {
|
||||
ConsumerRecords<String, Object> records = consumer.poll(Duration.ofMillis(100));
|
||||
for (ConsumerRecord<String, Object> record : records) {
|
||||
taskExecutor.submit(new ConsumerWorker(record, cityCode));
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,108 @@
|
|||
package org.dromara.kafka.consumer.handler;
|
||||
|
||||
import cn.hutool.core.date.DateTime;
|
||||
import cn.hutool.core.date.DateUtil;
|
||||
import org.dromara.kafka.consumer.entity.EsGpsInfo;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.Properties;
|
||||
|
||||
/**
|
||||
* <p>description: </p>
|
||||
*
|
||||
* @author chenle
|
||||
* @date 2021-10-28 14:48
|
||||
*/
|
||||
public class KafkaSecurityUtil {
|
||||
|
||||
|
||||
|
||||
|
||||
static Logger logger = LoggerFactory.getLogger(KafkaSecurityUtil.class);
|
||||
|
||||
public static void main(String[] args) {
|
||||
EsGpsInfo esGpsInfo = new EsGpsInfo();
|
||||
String realtime = "2021/11/04 12:00:11";
|
||||
DateTime dateTime = DateUtil.parse(realtime);
|
||||
esGpsInfo.setGpsTime(dateTime.toJdkDate());
|
||||
logger.info("esGpsInfo:{},deviceType={},gpsTime={}",esGpsInfo.toString(),
|
||||
esGpsInfo.getDeviceType(),dateTime.toJdkDate().toString());
|
||||
}
|
||||
/**
|
||||
* 用户自己申请的机机账号keytab文件名称
|
||||
*/
|
||||
private static final String USER_KEYTAB_FILE = "user.keytab";
|
||||
|
||||
/**
|
||||
* 用户自己申请的机机账号名称
|
||||
*/
|
||||
private static final String USER_PRINCIPAL = "aqdsj_ruansi@HADOOP.COM";
|
||||
|
||||
public static void securityPrepare() throws IOException
|
||||
{
|
||||
logger.error("进入了---securityPrepare");
|
||||
//String filePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator;
|
||||
//String krbFile = filePath + "krb5.conf";
|
||||
//ClassPathResource classPathResource = new ClassPathResource("krb5.conf");
|
||||
//String krbFile = classPathResource.getAbsolutePath();
|
||||
String krbFile = "/gpsstore/krb5.conf";
|
||||
// String userKeyTableFile = filePath + USER_KEYTAB_FILE;
|
||||
//ClassPathResource classPathResource1 = new ClassPathResource(USER_KEYTAB_FILE);
|
||||
String userKeyTableFile = "/gpsstore/user.keytab";
|
||||
|
||||
//windows路径下分隔符替换
|
||||
userKeyTableFile = userKeyTableFile.replace("\\", "\\\\");
|
||||
krbFile = krbFile.replace("\\", "\\\\");
|
||||
|
||||
LoginUtil.setKrb5Config(krbFile);
|
||||
LoginUtil.setZookeeperServerPrincipal("zookeeper/hadoop.hadoop.com");
|
||||
logger.error("userKeyTableFile路径---{}",userKeyTableFile);
|
||||
LoginUtil.setJaasFile(USER_PRINCIPAL, userKeyTableFile);
|
||||
}
|
||||
|
||||
public static Boolean isSecurityModel()
|
||||
{
|
||||
Boolean isSecurity = false;
|
||||
//String krbFilePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator + "kafkaSecurityMode";
|
||||
//ClassPathResource classPathResource = new ClassPathResource("kafkaSecurityMode");
|
||||
InputStream inputStream = Thread.currentThread().getContextClassLoader().getResourceAsStream("kafkaSecurityMode");
|
||||
|
||||
/*File file = classPathResource.getFile();
|
||||
|
||||
if(!file.exists()){
|
||||
return isSecurity;
|
||||
}*/
|
||||
|
||||
Properties securityProps = new Properties();
|
||||
|
||||
|
||||
try
|
||||
{
|
||||
securityProps.load(inputStream);
|
||||
if ("yes".equalsIgnoreCase(securityProps.getProperty("kafka.client.security.mode")))
|
||||
{
|
||||
isSecurity = true;
|
||||
}
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
logger.info("The Exception occured : {}.", e);
|
||||
}
|
||||
|
||||
return isSecurity;
|
||||
}
|
||||
|
||||
/*
|
||||
* 判断文件是否存在
|
||||
*/
|
||||
private static boolean isFileExists(String fileName)
|
||||
{
|
||||
File file = new File(fileName);
|
||||
|
||||
return file.exists();
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,215 @@
|
|||
package org.dromara.kafka.consumer.handler;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileWriter;
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* <p>description: </p>
|
||||
*
|
||||
* @author chenle
|
||||
* @date 2021-10-28 15:40
|
||||
*/
|
||||
public class LoginUtil
|
||||
{
|
||||
|
||||
public enum Module
|
||||
{
|
||||
STORM("StormClient"), KAFKA("KafkaClient"), ZOOKEEPER("Client");
|
||||
|
||||
private String name;
|
||||
|
||||
private Module(String name)
|
||||
{
|
||||
this.name = name;
|
||||
}
|
||||
|
||||
public String getName()
|
||||
{
|
||||
return name;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* line operator string
|
||||
*/
|
||||
private static final String LINE_SEPARATOR = System.getProperty("line.separator");
|
||||
|
||||
/**
|
||||
* jaas file postfix
|
||||
*/
|
||||
private static final String JAAS_POSTFIX = ".jaas.conf";
|
||||
|
||||
/**
|
||||
* is IBM jdk or not
|
||||
*/
|
||||
private static final boolean IS_IBM_JDK = System.getProperty("java.vendor").contains("IBM");
|
||||
|
||||
/**
|
||||
* IBM jdk login module
|
||||
*/
|
||||
private static final String IBM_LOGIN_MODULE = "com.ibm.security.auth.module.Krb5LoginModule required";
|
||||
|
||||
/**
|
||||
* oracle jdk login module
|
||||
*/
|
||||
private static final String SUN_LOGIN_MODULE = "com.sun.security.auth.module.Krb5LoginModule required";
|
||||
|
||||
/**
|
||||
* Zookeeper quorum principal.
|
||||
*/
|
||||
public static final String ZOOKEEPER_AUTH_PRINCIPAL = "zookeeper.server.principal";
|
||||
|
||||
/**
|
||||
* java security krb5 file path
|
||||
*/
|
||||
public static final String JAVA_SECURITY_KRB5_CONF = "java.security.krb5.conf";
|
||||
|
||||
/**
|
||||
* java security login file path
|
||||
*/
|
||||
public static final String JAVA_SECURITY_LOGIN_CONF = "java.security.auth.login.config";
|
||||
|
||||
/**
|
||||
* 设置jaas.conf文件
|
||||
*
|
||||
* @param principal
|
||||
* @param keytabPath
|
||||
* @throws IOException
|
||||
*/
|
||||
public static void setJaasFile(String principal, String keytabPath)
|
||||
throws IOException
|
||||
{
|
||||
String jaasPath =
|
||||
new File(System.getProperty("java.io.tmpdir")) + File.separator + System.getProperty("user.name")
|
||||
+ JAAS_POSTFIX;
|
||||
|
||||
// windows路径下分隔符替换
|
||||
jaasPath = jaasPath.replace("\\", "\\\\");
|
||||
// 删除jaas文件
|
||||
deleteJaasFile(jaasPath);
|
||||
writeJaasFile(jaasPath, principal, keytabPath);
|
||||
System.setProperty(JAVA_SECURITY_LOGIN_CONF, jaasPath);
|
||||
}
|
||||
|
||||
/**
|
||||
* 设置zookeeper服务端principal
|
||||
*
|
||||
* @param zkServerPrincipal
|
||||
* @throws IOException
|
||||
*/
|
||||
public static void setZookeeperServerPrincipal(String zkServerPrincipal)
|
||||
throws IOException
|
||||
{
|
||||
System.setProperty(ZOOKEEPER_AUTH_PRINCIPAL, zkServerPrincipal);
|
||||
String ret = System.getProperty(ZOOKEEPER_AUTH_PRINCIPAL);
|
||||
if (ret == null)
|
||||
{
|
||||
throw new IOException(ZOOKEEPER_AUTH_PRINCIPAL + " is null.");
|
||||
}
|
||||
if (!ret.equals(zkServerPrincipal))
|
||||
{
|
||||
throw new IOException(ZOOKEEPER_AUTH_PRINCIPAL + " is " + ret + " is not " + zkServerPrincipal + ".");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 设置krb5文件
|
||||
*
|
||||
* @param krb5ConfFile
|
||||
* @throws IOException
|
||||
*/
|
||||
public static void setKrb5Config(String krb5ConfFile)
|
||||
throws IOException
|
||||
{
|
||||
System.setProperty(JAVA_SECURITY_KRB5_CONF, krb5ConfFile);
|
||||
String ret = System.getProperty(JAVA_SECURITY_KRB5_CONF);
|
||||
if (ret == null)
|
||||
{
|
||||
throw new IOException(JAVA_SECURITY_KRB5_CONF + " is null.");
|
||||
}
|
||||
if (!ret.equals(krb5ConfFile))
|
||||
{
|
||||
throw new IOException(JAVA_SECURITY_KRB5_CONF + " is " + ret + " is not " + krb5ConfFile + ".");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 写入jaas文件
|
||||
*
|
||||
* @throws IOException
|
||||
* 写文件异常
|
||||
*/
|
||||
private static void writeJaasFile(String jaasPath, String principal, String keytabPath)
|
||||
throws IOException
|
||||
{
|
||||
FileWriter writer = new FileWriter(new File(jaasPath));
|
||||
try
|
||||
{
|
||||
writer.write(getJaasConfContext(principal, keytabPath));
|
||||
writer.flush();
|
||||
}
|
||||
catch (IOException e)
|
||||
{
|
||||
throw new IOException("Failed to create jaas.conf File");
|
||||
}
|
||||
finally
|
||||
{
|
||||
writer.close();
|
||||
}
|
||||
}
|
||||
|
||||
private static void deleteJaasFile(String jaasPath)
|
||||
throws IOException
|
||||
{
|
||||
File jaasFile = new File(jaasPath);
|
||||
if (jaasFile.exists())
|
||||
{
|
||||
if (!jaasFile.delete())
|
||||
{
|
||||
throw new IOException("Failed to delete exists jaas file.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static String getJaasConfContext(String principal, String keytabPath)
|
||||
{
|
||||
Module[] allModule = Module.values();
|
||||
StringBuilder builder = new StringBuilder();
|
||||
for (Module modlue : allModule)
|
||||
{
|
||||
builder.append(getModuleContext(principal, keytabPath, modlue));
|
||||
}
|
||||
return builder.toString();
|
||||
}
|
||||
|
||||
private static String getModuleContext(String userPrincipal, String keyTabPath, Module module)
|
||||
{
|
||||
StringBuilder builder = new StringBuilder();
|
||||
if (IS_IBM_JDK)
|
||||
{
|
||||
builder.append(module.getName()).append(" {").append(LINE_SEPARATOR);
|
||||
builder.append(IBM_LOGIN_MODULE).append(LINE_SEPARATOR);
|
||||
builder.append("credsType=both").append(LINE_SEPARATOR);
|
||||
builder.append("principal=\"" + userPrincipal + "\"").append(LINE_SEPARATOR);
|
||||
builder.append("useKeytab=\"" + keyTabPath + "\"").append(LINE_SEPARATOR);
|
||||
builder.append("debug=true;").append(LINE_SEPARATOR);
|
||||
builder.append("};").append(LINE_SEPARATOR);
|
||||
}
|
||||
else
|
||||
{
|
||||
builder.append(module.getName()).append(" {").append(LINE_SEPARATOR);
|
||||
builder.append(SUN_LOGIN_MODULE).append(LINE_SEPARATOR);
|
||||
builder.append("useKeyTab=true").append(LINE_SEPARATOR);
|
||||
builder.append("keyTab=\"" + keyTabPath + "\"").append(LINE_SEPARATOR);
|
||||
builder.append("principal=\"" + userPrincipal + "\"").append(LINE_SEPARATOR);
|
||||
builder.append("useTicketCache=false").append(LINE_SEPARATOR);
|
||||
builder.append("storeKey=true").append(LINE_SEPARATOR);
|
||||
builder.append("debug=true;").append(LINE_SEPARATOR);
|
||||
builder.append("};").append(LINE_SEPARATOR);
|
||||
}
|
||||
|
||||
return builder.toString();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -0,0 +1,130 @@
|
|||
package org.dromara.kafka.consumer.handler;
|
||||
|
||||
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.common.serialization.StringDeserializer;
|
||||
import org.dromara.kafka.consumer.config.KafkaPropertiesConfig;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.CommandLineRunner;
|
||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
|
||||
/**
|
||||
* <p>description: </p>
|
||||
*
|
||||
* @author chenle
|
||||
* @date 2021-09-06 11:15
|
||||
*/
|
||||
@Component
|
||||
public class RealConsumer implements CommandLineRunner {
|
||||
|
||||
private String kafkaServers;
|
||||
|
||||
private String groupId;
|
||||
|
||||
private String topics;
|
||||
|
||||
private String cityCode = "3400";
|
||||
|
||||
|
||||
|
||||
@Autowired
|
||||
KafkaPropertiesConfig kafkaPropertiesConfig;
|
||||
|
||||
@Autowired
|
||||
ThreadPoolExecutor dtpExecutor2;
|
||||
|
||||
|
||||
private Logger logger = LoggerFactory.getLogger(RealConsumer.class);
|
||||
|
||||
@Override
|
||||
public void run(String... args) throws Exception {
|
||||
kafkaServers = "127.0.0.1:9092";
|
||||
topics = "topic.send.2,topic.send.3,topic.send.4,topic.send.5,topic.send.8";
|
||||
groupId = "group_ruansi_xuancheng";
|
||||
cityCode = "3418";
|
||||
if(args.length > 0){
|
||||
/*kafkaServers = args[0];
|
||||
topics = args[1];
|
||||
groupId = args[2];
|
||||
cityCode = args[3];*/
|
||||
|
||||
}
|
||||
ExecutorService executorService = Executors.newSingleThreadExecutor();
|
||||
Map kafkaProp = getKafkaProp();
|
||||
|
||||
|
||||
if (KafkaSecurityUtil.isSecurityModel())
|
||||
{
|
||||
try
|
||||
{
|
||||
logger.info("Securitymode start.");
|
||||
|
||||
//!!注意,安全认证时,需要用户手动修改为自己申请的机机账号
|
||||
//认证方式 SASL_PLAINTEXT 或者 PLAINTEXT
|
||||
kafkaProp.put("security.protocol","SASL_PLAINTEXT");
|
||||
//服务名
|
||||
kafkaProp.put("sasl.kerberos.service.name","kafka");
|
||||
//域名
|
||||
kafkaProp.put("kerberos.domain.name","hadoop.hadoop.com");
|
||||
KafkaSecurityUtil.securityPrepare();
|
||||
}
|
||||
catch (IOException e)
|
||||
{
|
||||
logger.error("Security prepare failure.");
|
||||
logger.error("The IOException occured.", e);
|
||||
return;
|
||||
}
|
||||
logger.info("Security prepare success.");
|
||||
}
|
||||
|
||||
KafkaConsumerRunnable runnable = new KafkaConsumerRunnable(kafkaProp,dtpExecutor2,cityCode);
|
||||
executorService.execute(runnable);
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* 获取kafka配置
|
||||
* @return
|
||||
*/
|
||||
private Map<String, Object> getKafkaProp() {
|
||||
// Properties map = new Properties();
|
||||
Map<String, Object> map = new HashMap<>();
|
||||
map.put("bootstrap.servers",kafkaServers);
|
||||
map.put("group.id",groupId);
|
||||
map.put("enable.auto.commit", "true");
|
||||
map.put("auto.commit.interval.ms", "1000");
|
||||
map.put("session.timeout.ms", "30000");
|
||||
map.put("key.deserializer", StringDeserializer.class);
|
||||
map.put("value.deserializer", StringDeserializer.class);
|
||||
map.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,5);
|
||||
// map.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,1000 * 5);
|
||||
// map.put("ack.mode", "manual_immediate");
|
||||
|
||||
// //认证方式 SASL_PLAINTEXT 或者 PLAINTEXT
|
||||
// map.put("security.protocol","SASL_PLAINTEXT");
|
||||
// //服务名
|
||||
// map.put("sasl.kerberos.service.name","kafka");
|
||||
// //域名
|
||||
// map.put("kerberos.domain.name","hadoop.hadoop.com");
|
||||
String[] split = topics.split(",");
|
||||
List list = CollectionUtils.arrayToList(split);
|
||||
map.put("topics", list);
|
||||
return map;
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
|
@ -6,7 +6,7 @@ server:
|
|||
spring:
|
||||
application:
|
||||
# 应用名称
|
||||
name: wzhj-websocket
|
||||
name: stwzhj-consumer
|
||||
profiles:
|
||||
# 环境配置
|
||||
active: @profiles.active@
|
||||
|
|
@ -30,5 +30,3 @@ spring:
|
|||
config:
|
||||
import:
|
||||
- optional:nacos:application-common.yml
|
||||
- optional:nacos:datasource.yml
|
||||
- optional:nacos:${spring.application.name}.yml
|
||||
|
|
@ -0,0 +1,28 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<configuration scan="true" scanPeriod="60 seconds" debug="false">
|
||||
<!-- 日志存放路径 -->
|
||||
<property name="log.path" value="logs/${project.artifactId}" />
|
||||
<!-- 日志输出格式 -->
|
||||
<property name="console.log.pattern"
|
||||
value="%red(%d{yyyy-MM-dd HH:mm:ss}) %green([%thread]) %highlight(%-5level) %boldMagenta(%logger{36}%n) - %msg%n"/>
|
||||
|
||||
<!-- 控制台输出 -->
|
||||
<appender name="console" class="ch.qos.logback.core.ConsoleAppender">
|
||||
<encoder>
|
||||
<pattern>${console.log.pattern}</pattern>
|
||||
<charset>utf-8</charset>
|
||||
</encoder>
|
||||
</appender>
|
||||
|
||||
<include resource="logback-common.xml" />
|
||||
|
||||
<include resource="logback-logstash.xml" />
|
||||
|
||||
<!-- 开启 skywalking 日志收集 -->
|
||||
<include resource="logback-skylog.xml" />
|
||||
|
||||
<!--系统操作日志-->
|
||||
<root level="info">
|
||||
<appender-ref ref="console" />
|
||||
</root>
|
||||
</configuration>
|
||||
|
|
@ -9,10 +9,10 @@
|
|||
</parent>
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<artifactId>wzhj-data2es</artifactId>
|
||||
<artifactId>stwzhj-data2es</artifactId>
|
||||
|
||||
<description>
|
||||
wzhj-data2es位置汇聚数据处理
|
||||
stwzhj-data2es位置汇聚数据处理
|
||||
</description>
|
||||
|
||||
<dependencies>
|
||||
|
|
@ -68,7 +68,7 @@ public class ElasticsearchConfig {
|
|||
RestClientBuilder builder = RestClient.builder(httpHost);
|
||||
// 设置用户名、密码
|
||||
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
|
||||
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(userName, password));
|
||||
// credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(userName, password));
|
||||
// 连接延时配置
|
||||
builder.setRequestConfigCallback(requestConfigBuilder -> {
|
||||
requestConfigBuilder.setConnectTimeout(connectTimeOut);
|
||||
|
|
@ -1,6 +1,5 @@
|
|||
package org.dromara.data2es.config;
|
||||
|
||||
import org.apache.kafka.clients.admin.AdminClientConfig;
|
||||
import org.apache.kafka.clients.admin.NewTopic;
|
||||
import org.apache.kafka.clients.producer.KafkaProducer;
|
||||
import org.dromara.data2es.producer.NewProducer;
|
||||
|
|
@ -12,8 +11,6 @@ import org.springframework.kafka.core.KafkaAdmin;
|
|||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
|
||||
/**
|
||||
|
|
@ -30,8 +27,8 @@ public class KafkaConfig {
|
|||
// private String kafkaServers = "140.168.2.31:21007,140.168.2.32:21007,140.168.2.33:21007";
|
||||
// private String kafkaServers = "53.208.61.105:6667,53.208.61.106:6667,53.208.61.107:6667";//六安GA网
|
||||
// private String kafkaServers = "34.72.62.93:9092";//六安视频网
|
||||
private String kafkaServers = "53.238.79.33:9092,53.238.79.34:9092,53.238.79.35:9092";//本地
|
||||
// private String kafkaServers = "53.238.79.4:9092,53.238.79.5:9092,53.238.79.6:9092";//省厅 马伟提供
|
||||
// private String kafkaServers = "127.0.0.1:9092";//本地
|
||||
private String kafkaServers = "53.207.8.71:9092,53.193.3.15:9092,53.160.0.237:9092,53.104.56.58:9092,53.128.22.61:9092";//省厅 马伟提供
|
||||
|
||||
private String groupId = "ruansiProducer";
|
||||
|
||||
|
|
@ -132,7 +129,7 @@ public class KafkaConfig {
|
|||
//设置自定义的分区策略类,默认不传key,是粘性分区,尽量往一个分区中发消息。如果key不为null,则默认是按照key的hashcode与 partition的取余来决定哪个partition
|
||||
//props.put("partitioner.class","com.kafka.myparitioner.CidPartitioner");
|
||||
props.put(securityProtocol, "SASL_PLAINTEXT");
|
||||
props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"rsoft\" password=\"rsoft-2026\";");
|
||||
props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"zkxc\" password=\"zkxcKafka07252023\";");
|
||||
props.put("sasl.mechanism", "SCRAM-SHA-256");
|
||||
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
|
||||
// KafkaProducer producer = new KafkaProducer<>(props);
|
||||
|
|
@ -142,21 +139,7 @@ public class KafkaConfig {
|
|||
|
||||
@Bean
|
||||
public KafkaAdmin admin(KafkaProperties properties){
|
||||
Map<String, Object> configs = new HashMap<>();
|
||||
// 1. 集群地址
|
||||
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
|
||||
kafkaServers);
|
||||
// 2. SASL认证(和命令行的client.properties完全一致)
|
||||
configs.put("security.protocol", "SASL_PLAINTEXT");
|
||||
configs.put("sasl.mechanism", "SCRAM-SHA-256");
|
||||
configs.put("sasl.jaas.config",
|
||||
"org.apache.kafka.common.security.scram.ScramLoginModule required " +
|
||||
"username=\"rsoft\" password=\"rsoft-2026\";");
|
||||
// 3. 解决超时核心配置
|
||||
configs.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000); // 60s超时
|
||||
configs.put(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, 2000); // 重试间隔
|
||||
configs.put(AdminClientConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, 300000);
|
||||
KafkaAdmin admin = new KafkaAdmin(configs);
|
||||
KafkaAdmin admin = new KafkaAdmin(properties.buildAdminProperties());
|
||||
admin.setFatalIfBrokerNotAvailable(true);
|
||||
return admin;
|
||||
}
|
||||
|
|
@ -0,0 +1,24 @@
|
|||
package org.dromara.data2es.config;
|
||||
|
||||
import org.dromara.data2es.handler.RedisExpireListener;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.data.redis.connection.RedisConnectionFactory;
|
||||
import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
|
||||
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
|
||||
|
||||
@Configuration
|
||||
public class RedisListenerConfig {
|
||||
|
||||
@Bean
|
||||
RedisMessageListenerContainer listenerContainer(RedisConnectionFactory connectionFactory) {
|
||||
RedisMessageListenerContainer listenerContainer = new RedisMessageListenerContainer();
|
||||
listenerContainer.setConnectionFactory(connectionFactory);
|
||||
return listenerContainer;
|
||||
}
|
||||
|
||||
@Bean
|
||||
KeyExpirationEventMessageListener redisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
|
||||
return new RedisExpireListener(listenerContainer);
|
||||
}
|
||||
}
|
||||
|
|
@ -32,6 +32,11 @@ public class DataToEsController extends BaseController {
|
|||
public R saveGpsInfo(@RequestBody EsGpsInfoVO2 esGpsInfo ){
|
||||
R apiResponse = new R<>();
|
||||
try {
|
||||
if(StringUtils.isBlank(esGpsInfo.getInfoSource())){
|
||||
apiResponse.setCode(500);
|
||||
apiResponse.setMsg("infoSource为空");
|
||||
return apiResponse;
|
||||
}
|
||||
boolean offer = linkedBlockingDeque.offer(esGpsInfo);
|
||||
apiResponse = R.ok(offer);
|
||||
} catch (Exception e) {
|
||||
|
|
@ -82,15 +87,16 @@ public class DataToEsController extends BaseController {
|
|||
EsGpsInfoVO2 esGpsInfo = new EsGpsInfoVO2();
|
||||
|
||||
HashMap<String, Object> map = new HashMap<>();
|
||||
esGpsInfo.setDeviceCode("34180201001310000071");
|
||||
esGpsInfo.setDeviceType("5");
|
||||
esGpsInfo.setDeviceCode("34153800001320000101");
|
||||
esGpsInfo.setDeviceType("05");
|
||||
esGpsInfo.setInfoSource("3401");
|
||||
esGpsInfo.setGpsTime(new Date());
|
||||
|
||||
esGpsInfo.setLat("30.68" + (a + i));
|
||||
esGpsInfo.setLng("118.40" + (b + i));
|
||||
esGpsInfo.setZzjgdm("341802400000");
|
||||
esGpsInfo.setZzjgmc("宣州分局济川派出所");
|
||||
esGpsInfo.setPoliceName("057486_郭超");
|
||||
esGpsInfo.setLat("31.1" + (a + i));
|
||||
esGpsInfo.setLng("117.2" + (b + i));
|
||||
esGpsInfo.setZzjgdm("340100000000");
|
||||
esGpsInfo.setZzjgmc("合肥市公安局");
|
||||
esGpsInfo.setCarNum("霍邱看守所01");
|
||||
saveGpsInfo(esGpsInfo);
|
||||
//gpsService.saveData(map);
|
||||
|
||||
|
|
@ -28,6 +28,7 @@ public class EsGpsInfo implements Serializable {
|
|||
@JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone="GMT+8")
|
||||
private Date gpsTime;
|
||||
//3401 ,3402 地市代码
|
||||
private String infoSource;
|
||||
|
||||
private Integer online;
|
||||
|
||||
|
|
@ -18,5 +18,4 @@ public class EsGpsInfoVO2 extends EsGpsInfo {
|
|||
private String policeName;
|
||||
private String phoneNum;
|
||||
private String carNum;
|
||||
private String deviceName;
|
||||
}
|
||||
|
|
@ -1,6 +1,5 @@
|
|||
package org.dromara.data2es.dubbo;
|
||||
|
||||
import cn.hutool.core.bean.BeanUtil;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.apache.dubbo.config.annotation.DubboService;
|
||||
import org.dromara.common.core.domain.R;
|
||||
|
|
@ -22,21 +21,6 @@ public class RemoteDataToEsServiceImpl implements RemoteDataToEsService {
|
|||
|
||||
@Override
|
||||
public R saveDataBatch(List<RemoteGpsInfo> gpsInfoList) {
|
||||
return gpsService.saveDataBatch(BeanUtil.copyToList(gpsInfoList, EsGpsInfoVO2.class));
|
||||
}
|
||||
|
||||
@Override
|
||||
public R saveData(RemoteGpsInfo gpsInfo) throws Exception {
|
||||
return gpsService.saveData(BeanUtil.toBean(gpsInfo, EsGpsInfoVO2.class));
|
||||
}
|
||||
|
||||
@Override
|
||||
public R updateOnlineStatusBatch(List<RemoteGpsInfo> gpsInfoList) {
|
||||
return gpsService.updateOnlineStatusBatch(BeanUtil.copyToList(gpsInfoList, EsGpsInfoVO2.class));
|
||||
}
|
||||
|
||||
@Override
|
||||
public R updateOnlineStatus(RemoteGpsInfo gpsInfo) {
|
||||
return null;
|
||||
return gpsService.saveDataBatch(MapstructUtils.convert(gpsInfoList, EsGpsInfoVO2.class));
|
||||
}
|
||||
}
|
||||
|
|
@ -7,7 +7,6 @@ import org.dromara.data2es.controller.DataToEsController;
|
|||
import org.dromara.data2es.domain.EsGpsInfo;
|
||||
import org.dromara.data2es.domain.EsGpsInfoVO2;
|
||||
import org.dromara.data2es.service.IGpsService;
|
||||
import org.dromara.data2es.service.StoreDataService;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.CommandLineRunner;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
|
@ -32,9 +31,6 @@ public class DataInsertBatchHandler implements CommandLineRunner {
|
|||
@Autowired
|
||||
IGpsService gpsService;
|
||||
|
||||
@Autowired
|
||||
StoreDataService storeDataService;
|
||||
|
||||
@Override
|
||||
public void run(String... args) throws Exception {
|
||||
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
|
||||
|
|
@ -49,7 +45,6 @@ public class DataInsertBatchHandler implements CommandLineRunner {
|
|||
log.info("batch size={}", list.size());
|
||||
if(CollectionUtil.isNotEmpty(list)) {
|
||||
gpsService.saveDataBatch(list);
|
||||
storeDataService.saveDataByPersonTypeBatch(list);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("缓存队列批量消费异常:{}", e.getMessage());
|
||||
|
|
@ -0,0 +1,70 @@
|
|||
package org.dromara.data2es.handler;
|
||||
|
||||
import cn.hutool.core.bean.BeanUtil;
|
||||
import cn.hutool.json.JSONObject;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.dromara.common.core.utils.RedisConstants;
|
||||
import org.dromara.common.redis.utils.RedisUtils;
|
||||
import org.dromara.data2es.controller.DataToEsController;
|
||||
import org.dromara.data2es.domain.EsGpsInfoVO2;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.data.redis.connection.Message;
|
||||
import org.springframework.data.redis.connection.MessageListener;
|
||||
import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
|
||||
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.Date;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* <p>description: </p>
|
||||
*
|
||||
* @author chenle
|
||||
* @date 2021-11-08 16:40
|
||||
*/
|
||||
@Component
|
||||
public class RedisExpireListener extends KeyExpirationEventMessageListener {
|
||||
|
||||
@Autowired
|
||||
DataToEsController dataToEsController;
|
||||
|
||||
Logger logger = LoggerFactory.getLogger(RedisExpireListener.class);
|
||||
|
||||
|
||||
/**
|
||||
* Creates new {@link MessageListener} for {@code __keyevent@*__:expired} messages.
|
||||
*
|
||||
* @param listenerContainer must not be {@literal null}.
|
||||
*/
|
||||
public RedisExpireListener(RedisMessageListenerContainer listenerContainer) {
|
||||
super(listenerContainer);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onMessage(Message message, byte[] pattern) {
|
||||
String expireKey = message.toString();
|
||||
if(StringUtils.isNotEmpty(expireKey) &&
|
||||
expireKey.startsWith(RedisConstants.ORG_CODE_PRE)){
|
||||
String[] split = expireKey.split(":");
|
||||
EsGpsInfoVO2 esGpsInfoVO2 = new EsGpsInfoVO2();
|
||||
esGpsInfoVO2.setDeviceType(split[2]);
|
||||
esGpsInfoVO2.setDeviceCode(split[3]);
|
||||
String zzjgdm = split[1];
|
||||
String deviceType = split[2];
|
||||
String deviceCode = split[3];
|
||||
if(StringUtils.isNotEmpty(zzjgdm)) {
|
||||
JSONObject object = RedisUtils.getBucket(RedisConstants.ONLINE_USERS + zzjgdm + ":"
|
||||
+ deviceType+":"+deviceCode);
|
||||
EsGpsInfoVO2 gpsInfo = BeanUtil.toBean(object, EsGpsInfoVO2.class);
|
||||
gpsInfo.setGpsTime(new Date());
|
||||
gpsInfo.setOnline(0);
|
||||
dataToEsController.saveGpsInfo(gpsInfo);
|
||||
}
|
||||
}
|
||||
logger.info("redis key expired:key={}", expireKey);
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,120 @@
|
|||
package org.dromara.data2es.handler;
|
||||
|
||||
/*
|
||||
* 处理数据存入各类中间件
|
||||
* es redis kafka
|
||||
* */
|
||||
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.alibaba.fastjson.JSONPObject;
|
||||
import com.alibaba.fastjson2.util.JSONObject1O;
|
||||
import jodd.util.StringUtil;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.dromara.common.redis.utils.RedisUtils;
|
||||
import org.dromara.data2es.domain.EsGpsInfo;
|
||||
import org.dromara.data2es.domain.EsGpsInfoVO2;
|
||||
import org.dromara.data2es.service.IGpsService;
|
||||
import org.dromara.data2es.util.ConfigConstants;
|
||||
import org.elasticsearch.action.bulk.BulkRequest;
|
||||
import org.elasticsearch.action.bulk.BulkResponse;
|
||||
import org.elasticsearch.client.RequestOptions;
|
||||
import org.elasticsearch.client.RestHighLevelClient;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.kafka.core.KafkaTemplate;
|
||||
import org.springframework.scheduling.annotation.Async;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
||||
@Configuration
|
||||
public class RequestHandler {
|
||||
|
||||
@Autowired
|
||||
private KafkaTemplate<String, String> kafkaTemplate;
|
||||
|
||||
@Autowired
|
||||
private RestHighLevelClient restHighLevelClient;
|
||||
|
||||
|
||||
private Logger logger = LoggerFactory.getLogger(RequestHandler.class);
|
||||
|
||||
|
||||
/*@Async
|
||||
public CompletableFuture<EsGpsInfo> doRequest(EsGpsInfo esGpsInfo){
|
||||
EsGpsInfo entity = gpsService.createEntity(esGpsInfo);
|
||||
|
||||
return CompletableFuture.completedFuture(entity);
|
||||
}*/
|
||||
|
||||
@Async
|
||||
public void sendToKafka(EsGpsInfoVO2 esGpsInfoVO2) {
|
||||
if (!Objects.isNull(esGpsInfoVO2)) {
|
||||
|
||||
String deviceType = esGpsInfoVO2.getDeviceType();
|
||||
if(StringUtil.isEmpty(deviceType)){
|
||||
deviceType = "99";
|
||||
}
|
||||
String infoSource = esGpsInfoVO2.getInfoSource();
|
||||
if(StringUtils.isEmpty(infoSource)){
|
||||
infoSource = "other";
|
||||
}
|
||||
/**
|
||||
* 获取该条记录的基本信息推送给第三方使用
|
||||
*/
|
||||
//EsGpsInfoVO esGpsInfoVO = deviceInfoContext.doGetInfo(esGpsInfo);
|
||||
//EsGpsInfoVO2 esGpsInfoVO2 = getInfo(esGpsInfo);
|
||||
|
||||
//kafkaProducer.send(esGpsInfo, ConfigConstants.KAFKA_TOPIC_SEND_PRE+"."+ infoSource);
|
||||
//todo 2023年3月30日 cpu过载暂时隐藏
|
||||
|
||||
kafkaTemplate.send(ConfigConstants.KAFKA_TOPIC_SEND_PRE+"."+deviceType, JSON.toJSONString(esGpsInfoVO2));
|
||||
//kafkaProducer.send(esGpsInfoVO2, ConfigConstants.KAFKA_TOPIC_SEND_PRE+"."+deviceType);
|
||||
//地市的kafka数据,如接收地市某个设备的数据可以对接此kafka topic
|
||||
//todo 暂时隐藏
|
||||
kafkaTemplate.send(ConfigConstants.KAFKA_TOPIC_SEND_PRE+"."+infoSource+"."+deviceType,JSON.toJSONString(esGpsInfoVO2));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* 批量在线用户存入
|
||||
* @param map
|
||||
*/
|
||||
@Async
|
||||
public void redisOnlineUserBatch(Map<String,String> map, long time){
|
||||
RedisUtils.batchInsert(map,time);
|
||||
}
|
||||
|
||||
@Async
|
||||
public void redisDeleteBatch(List<String> deleteKeys){
|
||||
RedisUtils.deleteObject(deleteKeys);
|
||||
}
|
||||
|
||||
/**
|
||||
* 批量存入es
|
||||
* @param bulkRequest
|
||||
*/
|
||||
@Async
|
||||
public void esRealBulkSave(BulkRequest bulkRequest){
|
||||
try {
|
||||
BulkResponse response = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
|
||||
boolean b = response.hasFailures();
|
||||
String bulkErrorMsg = response.buildFailureMessage();
|
||||
logger.info("b={}", bulkErrorMsg);
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
logger.error("batchInsert error={}",e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,4 @@
|
|||
package org.dromara.data2es.mapper;
|
||||
|
||||
public class TDeviceMapper {
|
||||
}
|
||||
|
|
@ -24,7 +24,7 @@ public class GpsTaskTest {
|
|||
IGpsService gpsService;
|
||||
|
||||
|
||||
// @Scheduled(cron = "0/10 * * * * ?")
|
||||
@Scheduled(cron = "0/10 * * * * ?")
|
||||
public void produceGps() throws InvocationTargetException, IllegalAccessException, ExecutionException, InterruptedException {
|
||||
int a = 10000;
|
||||
int b = 20000;
|
||||
|
|
@ -0,0 +1,13 @@
|
|||
package org.dromara.data2es.schedule;
|
||||
|
||||
/**
|
||||
* <p>description: </p>
|
||||
*
|
||||
* @author chenle
|
||||
* @date 2021-05-18 18:23
|
||||
*/
|
||||
|
||||
public class RedisOnlineUserSchedule {
|
||||
|
||||
|
||||
}
|
||||
|
|
@ -14,10 +14,4 @@ public interface IGpsService {
|
|||
|
||||
R saveDataBatch(List<EsGpsInfoVO2> esGpsInfoVO2s);
|
||||
|
||||
R saveData(EsGpsInfoVO2 esGpsInfoVO2) throws ExecutionException, InterruptedException;
|
||||
|
||||
R updateOnlineStatusBatch(List<EsGpsInfoVO2> esGpsInfoVO2s);
|
||||
|
||||
R updateOnlineStatus(EsGpsInfoVO2 gpsInfoVO2);
|
||||
|
||||
}
|
||||
|
|
@ -4,21 +4,18 @@ import cn.hutool.core.bean.BeanUtil;
|
|||
import cn.hutool.core.date.DateTime;
|
||||
import cn.hutool.core.date.DateUnit;
|
||||
import cn.hutool.core.date.DateUtil;
|
||||
import cn.hutool.json.JSONObject;
|
||||
import cn.hutool.json.JSONUtil;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.dubbo.config.annotation.DubboReference;
|
||||
import org.dromara.common.core.domain.R;
|
||||
import org.dromara.common.core.utils.RedisConstants;
|
||||
import org.dromara.common.redis.utils.RedisUtils;
|
||||
import org.dromara.data2es.domain.EsGpsInfo;
|
||||
import org.dromara.data2es.domain.EsGpsInfoVO2;
|
||||
import org.dromara.data2es.domain.entity.GpsInfoEntity;
|
||||
import org.dromara.data2es.exception.MyBusinessException;
|
||||
import org.dromara.data2es.handler.RequestHandler;
|
||||
import org.dromara.data2es.service.IGpsService;
|
||||
import org.dromara.data2es.service.StoreDataService;
|
||||
import org.dromara.system.api.RemoteDeviceService;
|
||||
import org.dromara.system.api.domain.vo.RemoteDeviceVo;
|
||||
import org.elasticsearch.action.bulk.BulkRequest;
|
||||
|
|
@ -36,15 +33,12 @@ import org.elasticsearch.rest.RestStatus;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.scheduling.annotation.Async;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.time.LocalDate;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
||||
|
||||
@RequiredArgsConstructor
|
||||
|
|
@ -54,8 +48,8 @@ public class GpsServiceImpl implements IGpsService {
|
|||
@Autowired
|
||||
private RestHighLevelClient restHighLevelClient;
|
||||
|
||||
@Autowired
|
||||
StoreDataService storeDataService;
|
||||
@DubboReference
|
||||
private RemoteDeviceService deviceService;
|
||||
|
||||
private final RequestHandler requestHandler;
|
||||
|
||||
|
|
@ -109,157 +103,69 @@ public class GpsServiceImpl implements IGpsService {
|
|||
List<String> deleteKeys = new ArrayList<>();
|
||||
BulkRequest bulkRequest = new BulkRequest();
|
||||
for (EsGpsInfoVO2 info : list) {
|
||||
String deviceType = info.getDeviceType();
|
||||
String deviceCode = info.getDeviceCode();
|
||||
//设置地市zzjgdm
|
||||
info = getInfo(info);
|
||||
if (Objects.isNull(info)) {
|
||||
logger.error("redis或者mysql中的Object=null,deviceType={},deviceCode={}",deviceType,deviceCode);
|
||||
|
||||
if(StringUtils.isBlank(info.getInfoSource())){
|
||||
logger.info("infoSource 为空");
|
||||
continue;
|
||||
}
|
||||
//设置地市zzjgdm
|
||||
info = getInfoByInfoSource(info);
|
||||
//redis
|
||||
// buildRedisMap(info,onlineUserDataMap,orgCodeDataMap,deleteKeys);
|
||||
// logger.error("接收数据={},deviceCode={},gpsTime={}",info,info.getDeviceCode(),info.getGpsTime());
|
||||
buildRedisMap(info,onlineUserDataMap,orgCodeDataMap,deleteKeys);
|
||||
|
||||
IndexRequest indexRequest = buildEsIndexRequest(info);
|
||||
bulkRequest.add(indexRequest);
|
||||
|
||||
// 发送到 kafka
|
||||
requestHandler.sendToKafka(info);
|
||||
//地市版本没用批量插入
|
||||
requestHandler.redisOnlineUser(info);
|
||||
}
|
||||
|
||||
// requestHandler.redisOnlineUserBatch(onlineUserDataMap, RedisConstants.REDIS_ONLINE_USER_NEVER_EXPIRE);
|
||||
// requestHandler.redisOnlineUserBatch(orgCodeDataMap, 600);
|
||||
// requestHandler.batchPut(onlineUserDataMap);
|
||||
// requestHandler.batchPutWithExpire(orgCodeDataMap,600);
|
||||
// requestHandler.redisDeleteBatch(deleteKeys);
|
||||
requestHandler.redisOnlineUserBatch(onlineUserDataMap, RedisConstants.REDIS_ONLINE_USER_NEVER_EXPIRE);
|
||||
requestHandler.redisOnlineUserBatch(orgCodeDataMap, 300);
|
||||
requestHandler.redisDeleteBatch(deleteKeys);
|
||||
|
||||
requestHandler.esRealBulkSave(bulkRequest);
|
||||
|
||||
return R.ok();
|
||||
}
|
||||
|
||||
@Override
|
||||
public R saveData(EsGpsInfoVO2 info) throws ExecutionException, InterruptedException{
|
||||
//设置地市zzjgdm
|
||||
info = getInfo(info);
|
||||
if (null == info){
|
||||
return R.fail("暂无设备相关信息");
|
||||
}
|
||||
IndexRequest indexRequest = buildEsIndexRequest(info);
|
||||
//存es
|
||||
CompletableFuture<EsGpsInfo> esFuture = doRequest(info);
|
||||
|
||||
// 发送到 kafka
|
||||
requestHandler.sendToKafka(info);
|
||||
//地市版本没用批量插入
|
||||
requestHandler.redisOnlineUser(info);
|
||||
//发送到勤务
|
||||
storeDataService.saveDataByPersonType(info);
|
||||
|
||||
CompletableFuture.allOf(esFuture);
|
||||
EsGpsInfo esGpsInfo1 = esFuture.get();
|
||||
if(Objects.isNull(esGpsInfo1)){
|
||||
return R.fail(-1,"保存失败");
|
||||
}else{
|
||||
return R.ok("保存成功");
|
||||
/**
|
||||
* 获取基本信息(主要是组织机构) 不查库 否者对库压力过大
|
||||
* @param esGpsInfo
|
||||
* @return
|
||||
*/
|
||||
private EsGpsInfoVO2 getInfoByInfoSource(EsGpsInfo esGpsInfo) {
|
||||
EsGpsInfoVO2 esGpsInfoVO2 = new EsGpsInfoVO2();
|
||||
BeanUtil.copyProperties(esGpsInfo,esGpsInfoVO2);
|
||||
if(null == esGpsInfoVO2.getZzjgdm() || "".equals(esGpsInfoVO2.getZzjgdm())){
|
||||
RemoteDeviceVo vo = deviceService.getDeviceInfo(esGpsInfoVO2.getDeviceCode(),esGpsInfoVO2.getDeviceType());
|
||||
if (null != vo){
|
||||
esGpsInfoVO2.setZzjgdm(vo.getZzjgdm());
|
||||
esGpsInfoVO2.setZzjgmc(vo.getZzjgmc());
|
||||
esGpsInfoVO2.setPoliceName(vo.getPoliceName());
|
||||
esGpsInfoVO2.setPoliceNo(vo.getPoliceNo());
|
||||
esGpsInfoVO2.setCarNum(vo.getCarNum());
|
||||
String deviceType = vo.getDeviceType();
|
||||
if(StringUtils.isNotBlank(deviceType)){
|
||||
deviceType = deviceType.replaceAll("\"", "");
|
||||
if(deviceType.charAt(0) == '0' && deviceType.length() > 1){
|
||||
deviceType = deviceType.substring(1);
|
||||
if(deviceType.equals("1")){
|
||||
deviceType = "2";
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public R updateOnlineStatusBatch(List<EsGpsInfoVO2> list) {
|
||||
logger.error("记录仪状态修改设备数量={}",list.size());
|
||||
int num = 0;
|
||||
for (EsGpsInfo originEsGpsInfo : list) {
|
||||
String deviceCode = originEsGpsInfo.getDeviceCode();
|
||||
String deviceType = originEsGpsInfo.getDeviceType();
|
||||
// DeviceEntityV2 de = deviceService.checkDeviceExists(info);
|
||||
JSONObject o = RedisUtils.getBucket(RedisConstants.ONLINE_USERS + deviceType+":" + deviceCode);
|
||||
if (Objects.isNull(o)) {
|
||||
|
||||
if ("5".equals(deviceType) || "7".equals(deviceType) || "8".equals(deviceType)){
|
||||
logger.error("记录仪设备不在online_users中,deviceCode={},deviceType={}",deviceCode,deviceType);
|
||||
o = JSONUtil.parseObj(originEsGpsInfo);
|
||||
}
|
||||
esGpsInfoVO2.setDeviceType(deviceType);
|
||||
}else {
|
||||
logger.error("redis中的Object=null,deviceType={},deviceCode={}",deviceType,deviceCode);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
EsGpsInfoVO2 esGpsInfoVO2 = JSONUtil.toBean(o, EsGpsInfoVO2.class);
|
||||
//更新在线状态和时间,经纬度不变
|
||||
esGpsInfoVO2.setOnline(originEsGpsInfo.getOnline());
|
||||
|
||||
if(!Objects.isNull(originEsGpsInfo.getGpsTime())) {
|
||||
esGpsInfoVO2.setGpsTime(originEsGpsInfo.getGpsTime());
|
||||
}
|
||||
|
||||
EsGpsInfoVO2 esGpsInfo = new EsGpsInfoVO2();
|
||||
esGpsInfo.setOnline(esGpsInfoVO2.getOnline());
|
||||
esGpsInfo.setGpsTime(esGpsInfoVO2.getGpsTime());
|
||||
esGpsInfo.setLat(esGpsInfoVO2.getLat());
|
||||
esGpsInfo.setLng(esGpsInfoVO2.getLng());
|
||||
esGpsInfo.setHeight(esGpsInfoVO2.getHeight());
|
||||
esGpsInfo.setDeltaH(esGpsInfoVO2.getDeltaH());
|
||||
esGpsInfo.setOrientation(esGpsInfoVO2.getOrientation());
|
||||
esGpsInfo.setSpeed(esGpsInfoVO2.getSpeed());
|
||||
esGpsInfo.setDeviceType(esGpsInfoVO2.getDeviceType());
|
||||
esGpsInfo.setDeviceCode(esGpsInfoVO2.getDeviceCode());
|
||||
try {
|
||||
saveData(esGpsInfo);
|
||||
num++;
|
||||
// storeDataService.saveDataByPersonType(esGpsInfo);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
num--;
|
||||
logger.error(e.getMessage());
|
||||
// return response.error(500,e.getMessage());
|
||||
esGpsInfoVO2.setDeviceType("99");
|
||||
esGpsInfoVO2.setZzjgdm(esGpsInfo.getInfoSource()+"00000000");
|
||||
}
|
||||
|
||||
}
|
||||
logger.error("update status,设备数量={}",num);
|
||||
return R.ok();
|
||||
}
|
||||
|
||||
@Override
|
||||
public R updateOnlineStatus(EsGpsInfoVO2 gpsInfoVO2) {
|
||||
String deviceCode = gpsInfoVO2.getDeviceCode();
|
||||
String deviceType = gpsInfoVO2.getDeviceType();
|
||||
// DeviceEntityV2 de = deviceService.checkDeviceExists(info);
|
||||
JSONObject o = RedisUtils.getBucket(RedisConstants.ONLINE_USERS + deviceType + ":" + deviceCode);
|
||||
if (Objects.isNull(o)) {
|
||||
logger.error("redis中的Object=null,deviceType={},deviceCode={}", deviceType, deviceCode);
|
||||
return null;
|
||||
}
|
||||
|
||||
EsGpsInfoVO2 esGpsInfoVO2 = JSONUtil.toBean(o, EsGpsInfoVO2.class);
|
||||
//更新在线状态和时间,经纬度不变
|
||||
esGpsInfoVO2.setOnline(gpsInfoVO2.getOnline());
|
||||
|
||||
if (!Objects.isNull(gpsInfoVO2.getGpsTime())) {
|
||||
esGpsInfoVO2.setGpsTime(gpsInfoVO2.getGpsTime());
|
||||
}
|
||||
|
||||
EsGpsInfoVO2 esGpsInfo = new EsGpsInfoVO2();
|
||||
esGpsInfo.setOnline(esGpsInfoVO2.getOnline());
|
||||
esGpsInfo.setGpsTime(esGpsInfoVO2.getGpsTime());
|
||||
esGpsInfo.setLat(esGpsInfoVO2.getLat());
|
||||
esGpsInfo.setLng(esGpsInfoVO2.getLng());
|
||||
esGpsInfo.setHeight(esGpsInfoVO2.getHeight());
|
||||
esGpsInfo.setDeltaH(esGpsInfoVO2.getDeltaH());
|
||||
esGpsInfo.setOrientation(esGpsInfoVO2.getOrientation());
|
||||
esGpsInfo.setSpeed(esGpsInfoVO2.getSpeed());
|
||||
esGpsInfo.setDeviceType(esGpsInfoVO2.getDeviceType());
|
||||
esGpsInfo.setDeviceCode(esGpsInfoVO2.getDeviceCode());
|
||||
try {
|
||||
return saveData(esGpsInfo);
|
||||
|
||||
} catch (Exception e) {
|
||||
logger.error(e.getMessage());
|
||||
return R.fail(e.getMessage());
|
||||
// return response.error(500,e.getMessage());
|
||||
}
|
||||
|
||||
return esGpsInfoVO2;
|
||||
}
|
||||
|
||||
private IndexRequest buildEsIndexRequest(EsGpsInfo esGpsInfo) {
|
||||
|
|
@ -272,7 +178,7 @@ public class GpsServiceImpl implements IGpsService {
|
|||
* @param
|
||||
*/
|
||||
private String checkIndexExist() {
|
||||
String todayIndexName = "gpsinfo"+ LocalDate.now().format(DateTimeFormatter.ofPattern("yyyyMMdd"));
|
||||
String todayIndexName = "rs_gpsinfo"+ LocalDate.now().format(DateTimeFormatter.ofPattern("yyyyMMdd"));
|
||||
CreateIndexRequest createIndexRequest ;
|
||||
if(!existsIndex(todayIndexName)){
|
||||
logger.error("进入了exist");
|
||||
|
|
@ -376,7 +282,9 @@ public class GpsServiceImpl implements IGpsService {
|
|||
}
|
||||
String jsonValue = JSONUtil.toJsonStr(esGpsInfoVo2);
|
||||
|
||||
String onlineUsersKey = RedisConstants.ONLINE_USERS + deviceType + ":" + deviceCode;
|
||||
String onlineUsersKey = RedisConstants.ONLINE_USERS +
|
||||
zzjgdm + ":" + deviceType +
|
||||
":" + deviceCode;
|
||||
onlineUserDataMap.put(onlineUsersKey, jsonValue);
|
||||
//todo 省厅左侧只需要展示到市级以及区县级数量,所以需要查询出该单位的上级单位进行存储。
|
||||
//如:key = "org_code:340800260000:deviceType:deviceCode 安庆交通警察支队
|
||||
|
|
@ -395,79 +303,6 @@ public class GpsServiceImpl implements IGpsService {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
@Async
|
||||
public CompletableFuture<EsGpsInfo> doRequest(EsGpsInfoVO2 esGpsInfo){
|
||||
EsGpsInfo entity = createEntity(esGpsInfo);
|
||||
|
||||
return CompletableFuture.completedFuture(entity);
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取基本信息(主要是组织机构)
|
||||
* @param esGpsInfo
|
||||
* @return
|
||||
*/
|
||||
private EsGpsInfoVO2 getInfo(EsGpsInfo esGpsInfo) {
|
||||
RemoteDeviceVo deviceEntityV2 = new RemoteDeviceVo();
|
||||
String deviceCode = esGpsInfo.getDeviceCode();
|
||||
String deviceType = esGpsInfo.getDeviceType();
|
||||
/*if(StringUtils.isEmpty(deviceCode) || StringUtils.isEmpty(deviceType)){
|
||||
logger.error("deviceCode or deviceType = null{},{}",deviceCode,deviceType);
|
||||
return null;
|
||||
}*/
|
||||
deviceEntityV2.setDeviceCode(deviceCode);
|
||||
deviceEntityV2.setDeviceType(deviceType);
|
||||
RemoteDeviceVo deviceEntityV21 = new RemoteDeviceVo();
|
||||
if ("5".equals(deviceType)){
|
||||
deviceEntityV21 = BeanUtil.toBean(RedisUtils.getBucket("deviceInfo:" + deviceType+":"+deviceCode), RemoteDeviceVo.class) ;
|
||||
if (null == deviceEntityV21){
|
||||
deviceEntityV21 = BeanUtil.toBean(RedisUtils.getBucket("deviceInfo:8" +":"+deviceCode), RemoteDeviceVo.class) ;
|
||||
if (null == deviceEntityV21){
|
||||
deviceEntityV21 = BeanUtil.toBean(RedisUtils.getBucket("deviceInfo:7" +":"+deviceCode), RemoteDeviceVo.class) ;
|
||||
|
||||
}
|
||||
}
|
||||
}else {
|
||||
deviceEntityV21 = BeanUtil.toBean(RedisUtils.getBucket("deviceInfo:" + deviceType+":"+deviceCode), RemoteDeviceVo.class) ;
|
||||
}
|
||||
if(Objects.isNull(deviceEntityV21)){
|
||||
logger.error("库里没有这个数据,deviceCode={}",deviceCode);
|
||||
return null;
|
||||
}
|
||||
EsGpsInfoVO2 esGpsInfoVO2 = new EsGpsInfoVO2();
|
||||
BeanUtil.copyProperties(esGpsInfo,esGpsInfoVO2);
|
||||
if(!Objects.isNull(deviceEntityV21)){
|
||||
deviceType = deviceEntityV21.getDeviceType()+"";
|
||||
esGpsInfoVO2.setDeviceType(deviceEntityV21.getDeviceType()+"");
|
||||
esGpsInfoVO2.setZzjgdm(deviceEntityV21.getZzjgdm());
|
||||
esGpsInfoVO2.setZzjgmc(deviceEntityV21.getZzjgmc());
|
||||
esGpsInfoVO2.setPoliceNo(deviceEntityV21.getPoliceNo());
|
||||
esGpsInfoVO2.setPoliceName(deviceEntityV21.getPoliceName());
|
||||
esGpsInfoVO2.setCarNum(deviceEntityV21.getCarNum());
|
||||
esGpsInfoVO2.setPhoneNum(deviceEntityV21.getPhoneNum());
|
||||
esGpsInfoVO2.setDeviceName(deviceEntityV21.getDeviceName());
|
||||
}
|
||||
String lat = esGpsInfo.getLat();
|
||||
String lng = esGpsInfo.getLng();
|
||||
|
||||
//如果定位是0的话 ,则上传最后一次有定位的坐标,如果最后一次是0的话,那就上传0
|
||||
boolean nonLatLng = isNonLatLng(lat, lng);
|
||||
if(nonLatLng){
|
||||
JSONObject o = RedisUtils.getBucket(RedisConstants.ONLINE_USERS + deviceType + ":" + deviceCode);
|
||||
if(!Objects.isNull(o)) {
|
||||
|
||||
EsGpsInfoVO2 esGpsInfoVO3 = JSONUtil.toBean(o, EsGpsInfoVO2.class);
|
||||
String lat1 = esGpsInfoVO3.getLat();
|
||||
String lng1 = esGpsInfoVO3.getLng();
|
||||
esGpsInfoVO2.setLat(lat1);
|
||||
esGpsInfoVO2.setLng(lng1);
|
||||
}
|
||||
|
||||
}
|
||||
return esGpsInfoVO2;
|
||||
}
|
||||
|
||||
private void generateMappingRequest(CreateIndexRequest createIndexRequest) {
|
||||
XContentBuilder builder = null;
|
||||
try {
|
||||
|
|
@ -516,16 +351,5 @@ public class GpsServiceImpl implements IGpsService {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 判断一个定位是否是0的方法
|
||||
* @param lat
|
||||
* @param lng
|
||||
* @return
|
||||
*/
|
||||
private boolean isNonLatLng(String lat, String lng) {
|
||||
return StringUtils.isBlank(lat) || StringUtils.isBlank(lng)
|
||||
|| "0".equals(lat) || "0".equals(lng)
|
||||
|| "0.0".equals(lat) || "0.0".equals(lng);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -3,5 +3,5 @@ package org.dromara.data2es.util;
|
|||
public class ConfigConstants {
|
||||
public static final String HTTP_HEADER_AUTH_TOKEN = "Auth-Token";
|
||||
|
||||
public static final String KAFKA_TOPIC_SEND_PRE = "topic.send";
|
||||
public static final String KAFKA_TOPIC_SEND_PRE = "rs_topic.send";
|
||||
}
|
||||
|
|
@ -6,7 +6,7 @@ server:
|
|||
spring:
|
||||
application:
|
||||
# 应用名称
|
||||
name: wzhj-data2es
|
||||
name: stwzhj-data2es
|
||||
profiles:
|
||||
# 环境配置
|
||||
active: @profiles.active@
|
||||
|
|
@ -0,0 +1,28 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<configuration scan="true" scanPeriod="60 seconds" debug="false">
|
||||
<!-- 日志存放路径 -->
|
||||
<property name="log.path" value="logs/${project.artifactId}" />
|
||||
<!-- 日志输出格式 -->
|
||||
<property name="console.log.pattern"
|
||||
value="%red(%d{yyyy-MM-dd HH:mm:ss}) %green([%thread]) %highlight(%-5level) %boldMagenta(%logger{36}%n) - %msg%n"/>
|
||||
|
||||
<!-- 控制台输出 -->
|
||||
<appender name="console" class="ch.qos.logback.core.ConsoleAppender">
|
||||
<encoder>
|
||||
<pattern>${console.log.pattern}</pattern>
|
||||
<charset>utf-8</charset>
|
||||
</encoder>
|
||||
</appender>
|
||||
|
||||
<include resource="logback-common.xml" />
|
||||
|
||||
<include resource="logback-logstash.xml" />
|
||||
|
||||
<!-- 开启 skywalking 日志收集 -->
|
||||
<include resource="logback-skylog.xml" />
|
||||
|
||||
<!--系统操作日志-->
|
||||
<root level="info">
|
||||
<appender-ref ref="console" />
|
||||
</root>
|
||||
</configuration>
|
||||
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in New Issue