基础数据同步省厅服务

stwzhj
luyya 2026-03-16 18:08:30 +08:00
parent 3de061a843
commit 8364d79d05
19 changed files with 1038 additions and 138 deletions

View File

@ -0,0 +1,131 @@
# 设备数据同步功能
## 功能介绍
本模块实现了从本地MySQL数据库定时抽取设备数据到目标PostgreSQL数据库的功能。每个地市部署一个程序实例从本地MySQL数据库抽取数据到统一的目标PostgreSQL数据库。
## 主要特性
1. **轻量化设计**移除了nacos等微服务相关依赖只保留了必要的功能
2. **增量同步**根据目标表中的info_source查询该地市最新更新的数据然后从源表中抽取
3. **设备类型映射**通过sys_dict_data字典表进行设备类型转换
4. **批量处理**:支持批量插入和更新,提高同步效率
5. **可配置**所有配置通过application.yml文件进行配置
## 部署架构
每个地市部署一个程序实例从本地MySQL数据库抽取数据到统一的目标PostgreSQL数据库
```
地市A实例 -> 本地MySQL A -> 目标PostgreSQL
地市B实例 -> 本地MySQL B -> 目标PostgreSQL
地市C实例 -> 本地MySQL C -> 目标PostgreSQL
```
## 配置说明
### 数据源配置
在application.yml中配置源数据库和目标数据库
```yaml
spring:
datasource:
dynamic:
primary: target #设置默认数据源为目标数据库
strict: false #严格匹配数据源
datasource:
# 源数据库(MySQL) - 每个地市配置不同的源数据库
source:
url: jdbc:mysql://localhost:3306/wzhj_hs
username: root
password: root
driver-class-name: com.mysql.cj.jdbc.Driver
# 目标数据库(PostgreSQL) - 所有地市使用相同的目标数据库
target:
url: jdbc:postgresql://localhost:5432/your_database
username: postgres
password: postgres
driver-class-name: org.postgresql.Driver
```
### 设备同步配置
```yaml
device-sync:
# 信息来源标识 - 每个地市配置不同的info_source
info-source: "3418"
# 源表结构类型 - v1或v2
# v1: 第一种表结构字段使用下划线命名如police_no
# v2: 第二种表结构部分字段使用驼峰命名如policeNo
source-table-type: v1
# 设备类型映射字典类型
dict-type: device_type_tost
# 批量插入大小
batch-size: 50
# 定时任务cron表达式
cron: 0 0/10 * * * ?
```
### 源表结构类型说明
本系统支持两种不同的源表结构:
1. **v1类型**:第一种表结构
- 字段使用下划线命名如police_no、police_name、phone_num等
- 包含所有字段,包括录入单位、修改单位、设备品牌、设备型号等详细信息
2. **v2类型**:第二种表结构
- 部分字段使用驼峰命名如policeNo、policeName、phoneNum等
- 只包含核心字段,不包含录入单位、修改单位、设备品牌、设备型号等详细信息
根据源数据库的表结构在配置文件中设置相应的source-table-type值。
## 数据同步逻辑
1. 根据目标表中的info_source查询该地市最新更新的时间
2. 从本地MySQL数据库查询更新时间大于该时间的所有设备数据
3. 通过sys_dict_data字典表进行设备类型映射
4. 根据device_code和info_source判断设备是否存在存在则更新不存在则新增
5. 批量处理数据,提高同步效率
## 设备类型映射
设备类型映射通过sys_dict_data字典表实现配置规则如下
- dict_type: device_type_tost
- dict_value: 源表的device_type值
- dict_label: 目标表的device_type值
示例:
| dict_type | dict_value | dict_label |
|-----------|------------|------------|
| device_type_tost | 1 | 1 |
| device_type_tost | 2 | 2 |
| device_type_tost | 北斗 | 1 |
| device_type_tost | 车载 | 2 |
## 定时任务
系统默认配置了两个定时任务:
1. sendToSt设备数据同步到省厅原有功能
2. syncDevicesFromSource从源数据库同步设备数据新增功能
定时任务的执行时间可以通过application.yml中的cron表达式进行配置。
## 使用说明
1. 修改application.yml配置文件配置本地MySQL数据库和目标PostgreSQL数据库的连接信息
2. 配置该地市的info_source标识
3. 配置设备类型映射字典数据
4. 启动应用,系统将自动执行定时同步任务
## 注意事项
1. 确保源数据库和目标数据库的t_device表结构正确
2. 确保sys_dict_data表中配置了正确的设备类型映射
3. 首次同步时,如果目标表中无数据,将同步所有源数据
4. 建议在非高峰期执行同步任务,避免影响业务系统性能
5. 不同地市的info_source必须唯一否则会导致数据冲突

View File

@ -16,14 +16,17 @@
</description>
<dependencies>
<!-- PostgreSQL Driver -->
<dependency>
<groupId>org.dromara</groupId>
<artifactId>stwzhj-common-nacos</artifactId>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
</dependency>
<!-- 动态数据源 -->
<dependency>
<groupId>org.dromara</groupId>
<artifactId>stwzhj-common-sentinel</artifactId>
<groupId>com.baomidou</groupId>
<artifactId>dynamic-datasource-spring-boot3-starter</artifactId>
<version>${dynamic-ds.version}</version>
</dependency>
<!-- RuoYi Common Log -->
@ -37,65 +40,15 @@
<artifactId>stwzhj-common-dict</artifactId>
</dependency>
<dependency>
<groupId>org.dromara</groupId>
<artifactId>stwzhj-common-doc</artifactId>
</dependency>
<dependency>
<groupId>org.dromara</groupId>
<artifactId>stwzhj-common-web</artifactId>
</dependency>
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
</dependency>
<!-- Spring Boot Starter Web -->
<dependency>
<groupId>org.dromara</groupId>
<artifactId>stwzhj-common-dubbo</artifactId>
</dependency>
<dependency>
<groupId>org.dromara</groupId>
<artifactId>stwzhj-common-seata</artifactId>
</dependency>
<dependency>
<groupId>org.dromara</groupId>
<artifactId>stwzhj-common-idempotent</artifactId>
</dependency>
<dependency>
<groupId>org.dromara</groupId>
<artifactId>stwzhj-common-tenant</artifactId>
</dependency>
<dependency>
<groupId>org.dromara</groupId>
<artifactId>stwzhj-common-security</artifactId>
</dependency>
<dependency>
<groupId>org.dromara</groupId>
<artifactId>stwzhj-common-translation</artifactId>
</dependency>
<dependency>
<groupId>org.dromara</groupId>
<artifactId>stwzhj-common-sensitive</artifactId>
</dependency>
<dependency>
<groupId>org.dromara</groupId>
<artifactId>stwzhj-common-encrypt</artifactId>
</dependency>
<!-- RuoYi Api System -->
<dependency>
<groupId>org.dromara</groupId>
<artifactId>stwzhj-api-system</artifactId>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

View File

@ -2,20 +2,17 @@ package org.dromara.basetost;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.data.jpa.repository.config.EnableJpaAuditing;
import org.springframework.scheduling.annotation.EnableScheduling;
/**
* <p>description: </p>
*
*
* @author chenle
* @date 2023-05-20 12:01
*/
@SpringBootApplication
@EnableJpaAuditing
@EnableDiscoveryClient
@EnableScheduling
public class BaseToSTApplication {
public static void main(String[] args) {

View File

@ -0,0 +1,37 @@
package org.dromara.basetost.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
*
*/
@Data
@Component
@ConfigurationProperties(prefix = "device-sync")
public class DeviceSyncConfig {
/**
* - info_source
*/
private String infoSource = "3418";
/**
*
*/
private String dictType = "device_type_tost";
/**
*
*/
private int batchSize = 50;
/**
* - v1v2
* v1: 使线police_no
* v2: 使policeNo
*/
private String sourceTableType = "v1";
}

View File

@ -0,0 +1,14 @@
package org.dromara.basetost.config;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
/**
* JPA
*/
@Configuration
@EnableJpaRepositories(basePackages = "org.dromara.basetost.repository")
public class JpaConfig {
// JPA配置启用JPA仓库
}

View File

@ -105,4 +105,88 @@ public class Device implements AbstractGpsEntity, Serializable {
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone = "GMT+8")
private Date updateTime;
/**
*
*/
@Column(name = "info_source")
private String infoSource;
/**
*
*/
@Column(name = "lrdwdm")
private String lrdwdm;
/**
*
*/
@Column(name = "lrdwmc")
private String lrdwmc;
/**
*
*/
@Column(name = "lrrxm")
private String lrrxm;
/**
*
*/
@Column(name = "lrrsfzh")
private String lrrsfzh;
/**
*
*/
@Column(name = "xgdwdm")
private String xgdwdm;
/**
*
*/
@Column(name = "xgdwmc")
private String xgdwmc;
/**
*
*/
@Column(name = "xgrxm")
private String xgrxm;
/**
*
*/
@Column(name = "xgrsfzh")
private String xgrsfzh;
/**
*
*/
@Column(name = "sbpp")
private String sbpp;
/**
*
*/
@Column(name = "sbxh")
private String sbxh;
/**
*
*/
@Column(name = "card_num")
private String cardNum;
/**
*
*/
@Column(name = "tdbm")
private String tdbm;
/**
*
*/
@Column(name = "gbbm")
private String gbbm;
}

View File

@ -49,54 +49,8 @@ public class DictData implements Serializable {
@Column(name = "dict_type")
private String dictType;
/**
*
*/
@Column(name = "css_class")
private String cssClass;
/**
*
*/
@Column(name = "list_class")
private String listClass;
/**
* 0 1
*/
@Column(name = "status")
private String status;
/**
*
*/
@Column(name = "create_by")
private String createBy;
/**
*
*/
@Column(name = "create_time")
private Date createTime;
/**
*
*/
@Column(name = "update_by")
private String updateBy;
/**
*
*/
@Column(name = "update_time")
private Date updateTime;
/**
*
*/
@Column(name = "remark")
private String remark;
}

View File

@ -0,0 +1,186 @@
package org.dromara.basetost.entity;
import jakarta.persistence.*;
import lombok.Data;
import java.io.Serializable;
import java.util.Date;
/**
*
*/
@Data
@Entity
@Table(name = "t_device")
public class SourceDevice implements Serializable {
private static final long serialVersionUID = 1L;
@Id
@Column(name = "id", nullable = false)
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Integer id;
/**
* 21
*/
@Column(name = "device_code")
private String deviceCode;
/**
*
*/
@Column(name = "device_type")
private String deviceType;
/**
*
*/
@Column(name = "zzjgdm")
private String zzjgdm;
/**
*
*/
@Column(name = "zzjgmc")
private String zzjgmc;
/**
*
*/
@Column(name = "police_no")
private String policeNo;
/**
*
*/
@Column(name = "police_name")
private String policeName;
/**
*
*/
@Column(name = "phone_num")
private String phoneNum;
/**
*
*/
@Column(name = "car_num")
private String carNum;
/**
* 01
*/
@Column(name = "valid")
private Integer valid;
/**
* 1
*/
@Column(name = "remark1")
private String remark1;
/**
* 2
*/
@Column(name = "remark2")
private String remark2;
/**
*
*/
@Column(name = "create_time")
private Date createTime;
/**
*
*/
@Column(name = "update_time")
private Date updateTime;
/**
*
*/
@Column(name = "info_source")
private String infoSource;
/**
*
*/
@Column(name = "lrdwdm")
private String lrdwdm;
/**
*
*/
@Column(name = "lrdwmc")
private String lrdwmc;
/**
*
*/
@Column(name = "lrrxm")
private String lrrxm;
/**
*
*/
@Column(name = "lrrsfzh")
private String lrrsfzh;
/**
*
*/
@Column(name = "xgdwdm")
private String xgdwdm;
/**
*
*/
@Column(name = "xgdwmc")
private String xgdwmc;
/**
*
*/
@Column(name = "xgrxm")
private String xgrxm;
/**
*
*/
@Column(name = "xgrsfzh")
private String xgrsfzh;
/**
*
*/
@Column(name = "sbpp")
private String sbpp;
/**
*
*/
@Column(name = "sbxh")
private String sbxh;
/**
*
*/
@Column(name = "card_num")
private String cardNum;
/**
*
*/
@Column(name = "tdbm")
private String tdbm;
/**
*
*/
@Column(name = "gbbm")
private String gbbm;
}

View File

@ -0,0 +1,114 @@
package org.dromara.basetost.entity;
import jakarta.persistence.*;
import lombok.Data;
import java.io.Serializable;
import java.util.Date;
/**
* V2 -
*/
@Data
@Entity
@Table(name = "t_device")
public class SourceDeviceV2 implements Serializable {
private static final long serialVersionUID = 1L;
@Id
@Column(name = "id", nullable = false)
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Integer id;
/**
* 21
*/
@Column(name = "device_code")
private String deviceCode;
/**
*
*/
@Column(name = "device_type")
private String deviceType;
/**
*
*/
@Column(name = "zzjgdm")
private String zzjgdm;
/**
*
*/
@Column(name = "zzjgmc")
private String zzjgmc;
/**
* - 使
*/
@Column(name = "policeNo")
private String policeNo;
/**
* - 使
*/
@Column(name = "policeName")
private String policeName;
/**
* - 使
*/
@Column(name = "phoneNum")
private String phoneNum;
/**
*
*/
@Column(name = "car_num")
private String carNum;
/**
* 01
*/
@Column(name = "valid")
private Integer valid;
/**
* 1
*/
@Column(name = "remark1")
private String remark1;
/**
* 2
*/
@Column(name = "remark2")
private String remark2;
/**
*
*/
@Column(name = "create_time")
private Date createTime;
/**
*
*/
@Column(name = "update_time")
private Date updateTime;
/**
*
*/
@Column(name = "info_source")
private String infoSource;
/**
* - 使
*/
@Column(name = "card_num")
private String cardNum;
}

View File

@ -1,6 +1,5 @@
package org.dromara.basetost.handler;
import org.apache.dubbo.config.annotation.DubboReference;
import org.dromara.common.core.domain.R;
import org.dromara.system.api.RemoteDeviceService;
import org.dromara.system.api.domain.bo.RemoteDeviceBo;
@ -23,7 +22,6 @@ public class AsyncHandler {
private Logger logger = LoggerFactory.getLogger(AsyncHandler.class);
@DubboReference
private RemoteDeviceService deviceService;

View File

@ -3,6 +3,8 @@ package org.dromara.basetost.repository;
import org.dromara.basetost.entity.Device;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.JpaSpecificationExecutor;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
import java.util.Date;
import java.util.List;
@ -10,6 +12,21 @@ import java.util.List;
public interface DeviceRepository extends JpaRepository<Device, Integer>, JpaSpecificationExecutor<Device> {
List<Device> findDeviceByUpdateTimeAfterOrderByUpdateTimeAsc(Date updateTime);
/**
*
* @param deviceCode
* @param infoSource
* @return
*/
Device findByDeviceCodeAndInfoSource(String deviceCode, String infoSource);
/**
*
* @param infoSource
* @return
*/
@Query("SELECT MAX(d.updateTime) FROM Device d WHERE d.infoSource = :infoSource")
Date findMaxUpdateTimeByInfoSource(@Param("infoSource") String infoSource);
// @Query(nativeQuery = true,value = "select * from t_device t1 " +
// " where t1.update_time>?1 order by update_time asc limit 50")

View File

@ -6,4 +6,12 @@ import org.springframework.data.jpa.repository.JpaSpecificationExecutor;
public interface DictDataRepository extends JpaRepository<DictData, Long>, JpaSpecificationExecutor<DictData> {
DictData findDictDataByDictTypeAndDictLabel(String dictType, String dictLabel);
/**
*
* @param dictType
* @param dictValue
* @return
*/
DictData findDictDataByDictTypeAndDictValue(String dictType, String dictValue);
}

View File

@ -0,0 +1,40 @@
package org.dromara.basetost.repository;
import org.dromara.basetost.entity.SourceDevice;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.JpaSpecificationExecutor;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
import java.util.Date;
import java.util.List;
/**
* 访
*/
public interface SourceDeviceRepository extends JpaRepository<SourceDevice, Integer>, JpaSpecificationExecutor<SourceDevice> {
/**
*
* @param updateTime
* @return
*/
List<SourceDevice> findByUpdateTimeAfterOrderByUpdateTimeAsc(Date updateTime);
/**
*
* @param deviceCode
* @param infoSource
* @return
*/
SourceDevice findByDeviceCodeAndInfoSource(String deviceCode, String infoSource);
/**
*
* @param infoSource
* @return
*/
@Query("SELECT MAX(d.updateTime) FROM SourceDevice d WHERE d.infoSource = :infoSource")
Date findMaxUpdateTimeByInfoSource(@Param("infoSource") String infoSource);
}

View File

@ -0,0 +1,40 @@
package org.dromara.basetost.repository;
import org.dromara.basetost.entity.SourceDeviceV2;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.JpaSpecificationExecutor;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
import java.util.Date;
import java.util.List;
/**
* 访V2 -
*/
public interface SourceDeviceV2Repository extends JpaRepository<SourceDeviceV2, Integer>, JpaSpecificationExecutor<SourceDeviceV2> {
/**
*
* @param updateTime
* @return
*/
List<SourceDeviceV2> findByUpdateTimeAfterOrderByUpdateTimeAsc(Date updateTime);
/**
*
* @param deviceCode
* @param infoSource
* @return
*/
SourceDeviceV2 findByDeviceCodeAndInfoSource(String deviceCode, String infoSource);
/**
*
* @param infoSource
* @return
*/
@Query("SELECT MAX(d.updateTime) FROM SourceDeviceV2 d WHERE d.infoSource = :infoSource")
Date findMaxUpdateTimeByInfoSource(@Param("infoSource") String infoSource);
}

View File

@ -1,13 +1,14 @@
package org.dromara.basetost.schedule;
import org.dromara.basetost.handler.AbstractAsyncHandler;
import org.dromara.basetost.service.DeviceSyncService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
/**
* <p>description: </p>
*
*
* @author chenle
* @date 2023-05-22 9:24
@ -15,14 +16,26 @@ import org.springframework.stereotype.Component;
@Component
public class BaseToSTSchedule {
@Autowired
@Qualifier(value = "pdthandler")
AbstractAsyncHandler abstractAsyncHandler;
private AbstractAsyncHandler abstractAsyncHandler;
@Autowired
private DeviceSyncService deviceSyncService;
/**
*
*/
@Scheduled(cron = "${devicecorn:0/30 * * * * ?}")
public void sendToSt(){
public void sendToSt() {
abstractAsyncHandler.saveBaseToST();
}
/**
*
*/
@Scheduled(cron = "${device-sync.cron:0 0/10 * * * ?}")
public void syncDevicesFromSource() {
deviceSyncService.syncDevices();
}
}

View File

@ -0,0 +1,206 @@
package org.dromara.basetost.service;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.date.DateUtil;
import com.baomidou.dynamic.datasource.annotation.DS;
import org.dromara.basetost.config.DeviceSyncConfig;
import org.dromara.basetost.entity.Device;
import org.dromara.basetost.entity.DictData;
import org.dromara.basetost.entity.SourceDevice;
import org.dromara.basetost.entity.SourceDeviceV2;
import org.dromara.basetost.repository.DeviceRepository;
import org.dromara.basetost.repository.DictDataRepository;
import org.dromara.basetost.repository.SourceDeviceRepository;
import org.dromara.basetost.repository.SourceDeviceV2Repository;
import org.dromara.basetost.util.DataSourceContextHolder;
import org.dromara.basetost.util.FieldMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Objects;
/**
*
*/
@Service
public class DeviceSyncService {
private static final Logger logger = LoggerFactory.getLogger(DeviceSyncService.class);
@Autowired
private DeviceSyncConfig deviceSyncConfig;
@Autowired
private DeviceRepository deviceRepository;
@Autowired
private SourceDeviceRepository sourceDeviceRepository;
@Autowired
private SourceDeviceV2Repository sourceDeviceV2Repository;
@Autowired
private DictDataRepository dictDataRepository;
/**
*
*/
@DS("target")
@Transactional(rollbackFor = Exception.class)
public void syncDevices() {
logger.info("开始同步设备数据infoSource: {}", deviceSyncConfig.getInfoSource());
try {
// 1. 查询目标表中该info_source的最新更新时间
Date lastUpdateTime = deviceRepository.findMaxUpdateTimeByInfoSource(deviceSyncConfig.getInfoSource());
if (lastUpdateTime == null) {
// 如果没有数据,使用一个较早的时间
lastUpdateTime = DateUtil.parse("2020-01-01 00:00:00");
logger.info("目标表中无该info_source的数据使用初始时间: {}", lastUpdateTime);
} else {
logger.info("目标表中该info_source的最新更新时间: {}", lastUpdateTime);
}
// 2. 从源数据库查询更新的数据
List<SourceDevice> sourceDevices = getUpdatedDevicesFromSource(lastUpdateTime);
logger.info("从源数据库查询到 {} 条更新的数据", sourceDevices.size());
if (CollectionUtils.isEmpty(sourceDevices)) {
logger.info("源数据库无更新的数据");
return;
}
// 3. 处理设备类型映射
mapDeviceTypes(sourceDevices);
// 4. 同步数据到目标表
syncDevicesToTarget(sourceDevices);
logger.info("同步设备数据完成infoSource: {}", deviceSyncConfig.getInfoSource());
} catch (Exception e) {
logger.error("同步设备数据失败: {}", e.getMessage(), e);
throw new RuntimeException("同步设备数据失败", e);
}
}
/**
*
* @param lastUpdateTime
* @return
*/
public List<SourceDevice> getUpdatedDevicesFromSource(Date lastUpdateTime) {
// 切换到源数据源
DataSourceContextHolder.setDataSource("source");
try {
// 根据源表结构类型查询更新的数据
if ("v2".equals(deviceSyncConfig.getSourceTableType())) {
// 使用第二种表结构
List<SourceDeviceV2> v2Devices = sourceDeviceV2Repository.findByUpdateTimeAfterOrderByUpdateTimeAsc(lastUpdateTime);
// 转换为SourceDevice
return convertV2ToV1(v2Devices);
} else {
// 使用第一种表结构
return sourceDeviceRepository.findByUpdateTimeAfterOrderByUpdateTimeAsc(lastUpdateTime);
}
} finally {
// 恢复到默认数据源
DataSourceContextHolder.clearDataSource();
}
}
/**
* SourceDeviceV2SourceDevice
* @param v2Devices SourceDeviceV2
* @return SourceDevice
*/
private List<SourceDevice> convertV2ToV1(List<SourceDeviceV2> v2Devices) {
if (CollectionUtils.isEmpty(v2Devices)) {
return new ArrayList<>();
}
List<SourceDevice> v1Devices = new ArrayList<>();
for (SourceDeviceV2 v2Device : v2Devices) {
SourceDevice v1Device = new SourceDevice();
// 复制相同名称的字段
BeanUtil.copyProperties(v2Device, v1Device);
v1Devices.add(v1Device);
}
return v1Devices;
}
/**
*
* @param sourceDevices
*/
private void mapDeviceTypes(List<SourceDevice> sourceDevices) {
for (SourceDevice sourceDevice : sourceDevices) {
String sourceDeviceType = sourceDevice.getDeviceType();
if (sourceDeviceType == null || sourceDeviceType.isEmpty()) {
continue;
}
// 从字典表查询设备类型映射
// dict_value是源表的device_type值dict_label是目标表的device_type值
DictData dictData = dictDataRepository.findDictDataByDictTypeAndDictLabel(
deviceSyncConfig.getDictType(), sourceDeviceType);
String targetDeviceType = "99"; // 默认值为"99"(其他)
if (!Objects.isNull(dictData) && dictData.getDictValue() != null) {
targetDeviceType = dictData.getDictValue();
}
sourceDevice.setDeviceType(targetDeviceType);
}
}
/**
*
* @param sourceDevices
*/
@DS("target")
@Transactional(rollbackFor = Exception.class)
public void syncDevicesToTarget(List<SourceDevice> sourceDevices) {
if (CollectionUtils.isEmpty(sourceDevices)) {
return;
}
int batchSize = deviceSyncConfig.getBatchSize();
int size = sourceDevices.size();
int batchCount = (size + batchSize - 1) / batchSize;
for (int i = 0; i < batchCount; i++) {
int fromIndex = i * batchSize;
int toIndex = Math.min((i + 1) * batchSize, size);
List<SourceDevice> batch = sourceDevices.subList(fromIndex, toIndex);
// 批量处理
for (SourceDevice sourceDevice : batch) {
// 查询目标表中是否存在该设备
Device existingDevice = deviceRepository.findByDeviceCodeAndInfoSource(
sourceDevice.getDeviceCode(), deviceSyncConfig.getInfoSource());
// 转换为目标设备
Device targetDevice = FieldMapper.mapToTargetDevice(sourceDevice, deviceSyncConfig.getInfoSource());
if (existingDevice != null) {
// 更新现有设备
targetDevice.setId(existingDevice.getId());
deviceRepository.save(targetDevice);
} else {
// 插入新设备
deviceRepository.save(targetDevice);
}
}
logger.info("已处理 {}/{} 批次数据", i + 1, batchCount);
}
}
}

View File

@ -0,0 +1,33 @@
package org.dromara.basetost.util;
import com.baomidou.dynamic.datasource.toolkit.DynamicDataSourceContextHolder;
/**
*
*/
public class DataSourceContextHolder {
/**
*
* @param dataSourceName
*/
public static void setDataSource(String dataSourceName) {
DynamicDataSourceContextHolder.push(dataSourceName);
}
/**
*
* @return
*/
public static String getDataSource() {
return DynamicDataSourceContextHolder.peek();
}
/**
*
*/
public static void clearDataSource() {
DynamicDataSourceContextHolder.clear();
}
}

View File

@ -0,0 +1,66 @@
package org.dromara.basetost.util;
import cn.hutool.core.bean.BeanUtil;
import org.dromara.basetost.entity.Device;
import org.dromara.basetost.entity.SourceDevice;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
*
*/
public class FieldMapper {
private static final Logger logger = LoggerFactory.getLogger(FieldMapper.class);
/**
*
* @param sourceDevice
* @param infoSource
* @return
*/
public static Device mapToTargetDevice(SourceDevice sourceDevice, String infoSource) {
Device targetDevice = new Device();
try {
// 复制相同名称的字段
BeanUtil.copyProperties(sourceDevice, targetDevice);
// 设置info_source
targetDevice.setInfoSource(infoSource);
// 处理字段名差异
if (sourceDevice.getPoliceNo() != null) {
targetDevice.setPoliceNo(sourceDevice.getPoliceNo());
}
if (sourceDevice.getPoliceName() != null) {
targetDevice.setPoliceName(sourceDevice.getPoliceName());
}
if (sourceDevice.getPhoneNum() != null) {
targetDevice.setPhoneNum(sourceDevice.getPhoneNum());
}
if (sourceDevice.getCarNum() != null) {
targetDevice.setCarNum(sourceDevice.getCarNum());
}
} catch (Exception e) {
logger.error("字段映射失败: {}", e.getMessage(), e);
}
return targetDevice;
}
/**
*
* @param sourceDevices
* @param infoSource
* @return
*/
public static java.util.List<Device> mapToTargetDevices(java.util.List<SourceDevice> sourceDevices, String infoSource) {
return sourceDevices.stream()
.map(sourceDevice -> mapToTargetDevice(sourceDevice, infoSource))
.collect(java.util.stream.Collectors.toList());
}
}

View File

@ -11,35 +11,44 @@ spring:
profiles:
# 环境配置
active: dev
--- # nacos 配置
spring:
datasource:
url: jdbc:mysql://localhost:3306/wzhj_hs?useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai&useSSL=false
username: root
password: root
driver-class-name: com.mysql.jdbc.Driver
type: com.zaxxer.hikari.HikariDataSource
dynamic:
primary: target #设置默认数据源为目标数据库
strict: false #严格匹配数据源
datasource:
# 源数据库(MySQL) - 每个地市配置不同的源数据库
source:
url: jdbc:mysql://53.248.2.141:3306/wzhj-bz?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false&serverTimezone=GMT%2B8&rewriteBatchedStatements=true&allowPublicKeyRetrieval=true
username: root
password: Ycgis!2509
driver-class-name: com.mysql.cj.jdbc.Driver
# 目标数据库(PostgreSQL) - 所有地市使用相同的目标数据库
target:
url: jdbc:postgresql://53.16.17.15:5432/wzhj?useUnicode=true&characterEncoding=utf8&useSSL=true&autoReconnect=true&reWriteBatchedInserts=true&stringtype=unspecified
username: pgsql
password: ycgis
driver-class-name: org.postgresql.Driver
jpa:
show-sql: true
hibernate:
naming:
physical-strategy: org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl
cloud:
nacos:
# nacos 服务地址
server-addr: 127.0.0.1:8848
username: nacos
password: nacos
discovery:
# 注册组
group: DEFAULT_GROUP
namespace: ${spring.profiles.active}
config:
# 配置组
group: DEFAULT_GROUP
namespace: ${spring.profiles.active}
config:
import:
- optional:nacos:application-common.yml
properties:
hibernate:
dialect: org.hibernate.dialect.PostgreSQLDialect
# 设备数据同步配置
device-sync:
# 信息来源标识 - 每个地市配置不同的info_source
info-source: "3416"
# 源表结构类型 - v1或v2
# v1: 第一种表结构字段使用下划线命名如police_no
# v2: 第二种表结构部分字段使用驼峰命名如policeNo
source-table-type: v1
# 设备类型映射字典类型
dict-type: device_type_tost
# 批量插入大小
batch-size: 50
# 定时任务cron表达式
cron: 0 0/10 * * * ?