diff --git a/stwzhj-modules/stwzhj-baseToSt/README.md b/stwzhj-modules/stwzhj-baseToSt/README.md new file mode 100644 index 00000000..ff60412c --- /dev/null +++ b/stwzhj-modules/stwzhj-baseToSt/README.md @@ -0,0 +1,131 @@ + +# 设备数据同步功能 + +## 功能介绍 + +本模块实现了从本地MySQL数据库定时抽取设备数据到目标PostgreSQL数据库的功能。每个地市部署一个程序实例,从本地MySQL数据库抽取数据到统一的目标PostgreSQL数据库。 + +## 主要特性 + +1. **轻量化设计**:移除了nacos等微服务相关依赖,只保留了必要的功能 +2. **增量同步**:根据目标表中的info_source查询该地市最新更新的数据,然后从源表中抽取 +3. **设备类型映射**:通过sys_dict_data字典表进行设备类型转换 +4. **批量处理**:支持批量插入和更新,提高同步效率 +5. **可配置**:所有配置通过application.yml文件进行配置 + +## 部署架构 + +每个地市部署一个程序实例,从本地MySQL数据库抽取数据到统一的目标PostgreSQL数据库: + +``` +地市A实例 -> 本地MySQL A -> 目标PostgreSQL +地市B实例 -> 本地MySQL B -> 目标PostgreSQL +地市C实例 -> 本地MySQL C -> 目标PostgreSQL +``` + +## 配置说明 + +### 数据源配置 + +在application.yml中配置源数据库和目标数据库: + +```yaml +spring: + datasource: + dynamic: + primary: target #设置默认数据源为目标数据库 + strict: false #严格匹配数据源 + datasource: + # 源数据库(MySQL) - 每个地市配置不同的源数据库 + source: + url: jdbc:mysql://localhost:3306/wzhj_hs + username: root + password: root + driver-class-name: com.mysql.cj.jdbc.Driver + # 目标数据库(PostgreSQL) - 所有地市使用相同的目标数据库 + target: + url: jdbc:postgresql://localhost:5432/your_database + username: postgres + password: postgres + driver-class-name: org.postgresql.Driver +``` + +### 设备同步配置 + +```yaml +device-sync: + # 信息来源标识 - 每个地市配置不同的info_source + info-source: "3418" + # 源表结构类型 - v1或v2 + # v1: 第一种表结构,字段使用下划线命名(如police_no) + # v2: 第二种表结构,部分字段使用驼峰命名(如policeNo) + source-table-type: v1 + # 设备类型映射字典类型 + dict-type: device_type_tost + # 批量插入大小 + batch-size: 50 + # 定时任务cron表达式 + cron: 0 0/10 * * * ? +``` + +### 源表结构类型说明 + +本系统支持两种不同的源表结构: + +1. **v1类型**:第一种表结构 + - 字段使用下划线命名,如police_no、police_name、phone_num等 + - 包含所有字段,包括录入单位、修改单位、设备品牌、设备型号等详细信息 + +2. **v2类型**:第二种表结构 + - 部分字段使用驼峰命名,如policeNo、policeName、phoneNum等 + - 只包含核心字段,不包含录入单位、修改单位、设备品牌、设备型号等详细信息 + +根据源数据库的表结构,在配置文件中设置相应的source-table-type值。 + +## 数据同步逻辑 + +1. 根据目标表中的info_source查询该地市最新更新的时间 +2. 从本地MySQL数据库查询更新时间大于该时间的所有设备数据 +3. 通过sys_dict_data字典表进行设备类型映射 +4. 根据device_code和info_source判断设备是否存在,存在则更新,不存在则新增 +5. 批量处理数据,提高同步效率 + +## 设备类型映射 + +设备类型映射通过sys_dict_data字典表实现,配置规则如下: + +- dict_type: device_type_tost +- dict_value: 源表的device_type值 +- dict_label: 目标表的device_type值 + +示例: +| dict_type | dict_value | dict_label | +|-----------|------------|------------| +| device_type_tost | 1 | 1 | +| device_type_tost | 2 | 2 | +| device_type_tost | 北斗 | 1 | +| device_type_tost | 车载 | 2 | + +## 定时任务 + +系统默认配置了两个定时任务: + +1. sendToSt:设备数据同步到省厅(原有功能) +2. syncDevicesFromSource:从源数据库同步设备数据(新增功能) + +定时任务的执行时间可以通过application.yml中的cron表达式进行配置。 + +## 使用说明 + +1. 修改application.yml配置文件,配置本地MySQL数据库和目标PostgreSQL数据库的连接信息 +2. 配置该地市的info_source标识 +3. 配置设备类型映射字典数据 +4. 启动应用,系统将自动执行定时同步任务 + +## 注意事项 + +1. 确保源数据库和目标数据库的t_device表结构正确 +2. 确保sys_dict_data表中配置了正确的设备类型映射 +3. 首次同步时,如果目标表中无数据,将同步所有源数据 +4. 建议在非高峰期执行同步任务,避免影响业务系统性能 +5. 不同地市的info_source必须唯一,否则会导致数据冲突 diff --git a/stwzhj-modules/stwzhj-baseToSt/pom.xml b/stwzhj-modules/stwzhj-baseToSt/pom.xml index 9f8016c1..c8b07318 100644 --- a/stwzhj-modules/stwzhj-baseToSt/pom.xml +++ b/stwzhj-modules/stwzhj-baseToSt/pom.xml @@ -16,14 +16,17 @@ + - org.dromara - stwzhj-common-nacos + org.postgresql + postgresql + - org.dromara - stwzhj-common-sentinel + com.baomidou + dynamic-datasource-spring-boot3-starter + ${dynamic-ds.version} @@ -37,65 +40,15 @@ stwzhj-common-dict - - org.dromara - stwzhj-common-doc - - - - org.dromara - stwzhj-common-web - - com.mysql mysql-connector-j + - org.dromara - stwzhj-common-dubbo - - - - org.dromara - stwzhj-common-seata - - - - org.dromara - stwzhj-common-idempotent - - - - org.dromara - stwzhj-common-tenant - - - - org.dromara - stwzhj-common-security - - - - org.dromara - stwzhj-common-translation - - - - org.dromara - stwzhj-common-sensitive - - - - org.dromara - stwzhj-common-encrypt - - - - - org.dromara - stwzhj-api-system + org.springframework.boot + spring-boot-starter-web diff --git a/stwzhj-modules/stwzhj-baseToSt/src/main/java/org/dromara/basetost/BaseToSTApplication.java b/stwzhj-modules/stwzhj-baseToSt/src/main/java/org/dromara/basetost/BaseToSTApplication.java index 0379070f..62176b4e 100644 --- a/stwzhj-modules/stwzhj-baseToSt/src/main/java/org/dromara/basetost/BaseToSTApplication.java +++ b/stwzhj-modules/stwzhj-baseToSt/src/main/java/org/dromara/basetost/BaseToSTApplication.java @@ -2,20 +2,17 @@ package org.dromara.basetost; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.cloud.client.discovery.EnableDiscoveryClient; - import org.springframework.data.jpa.repository.config.EnableJpaAuditing; import org.springframework.scheduling.annotation.EnableScheduling; /** - *

description:

+ * 设备数据同步应用 * * @author chenle * @date 2023-05-20 12:01 */ @SpringBootApplication @EnableJpaAuditing -@EnableDiscoveryClient @EnableScheduling public class BaseToSTApplication { public static void main(String[] args) { diff --git a/stwzhj-modules/stwzhj-baseToSt/src/main/java/org/dromara/basetost/config/DeviceSyncConfig.java b/stwzhj-modules/stwzhj-baseToSt/src/main/java/org/dromara/basetost/config/DeviceSyncConfig.java new file mode 100644 index 00000000..367f3dda --- /dev/null +++ b/stwzhj-modules/stwzhj-baseToSt/src/main/java/org/dromara/basetost/config/DeviceSyncConfig.java @@ -0,0 +1,37 @@ + +package org.dromara.basetost.config; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; + +/** + * 设备数据同步配置 + */ +@Data +@Component +@ConfigurationProperties(prefix = "device-sync") +public class DeviceSyncConfig { + + /** + * 信息来源标识 - 每个地市配置不同的info_source + */ + private String infoSource = "3418"; + + /** + * 设备类型映射字典类型 + */ + private String dictType = "device_type_tost"; + + /** + * 批量插入大小 + */ + private int batchSize = 50; + + /** + * 源表结构类型 - v1或v2 + * v1: 第一种表结构,字段使用下划线命名(如police_no) + * v2: 第二种表结构,部分字段使用驼峰命名(如policeNo) + */ + private String sourceTableType = "v1"; +} diff --git a/stwzhj-modules/stwzhj-baseToSt/src/main/java/org/dromara/basetost/config/JpaConfig.java b/stwzhj-modules/stwzhj-baseToSt/src/main/java/org/dromara/basetost/config/JpaConfig.java new file mode 100644 index 00000000..dcc27822 --- /dev/null +++ b/stwzhj-modules/stwzhj-baseToSt/src/main/java/org/dromara/basetost/config/JpaConfig.java @@ -0,0 +1,14 @@ + +package org.dromara.basetost.config; + +import org.springframework.context.annotation.Configuration; +import org.springframework.data.jpa.repository.config.EnableJpaRepositories; + +/** + * JPA配置类 + */ +@Configuration +@EnableJpaRepositories(basePackages = "org.dromara.basetost.repository") +public class JpaConfig { + // JPA配置,启用JPA仓库 +} diff --git a/stwzhj-modules/stwzhj-baseToSt/src/main/java/org/dromara/basetost/entity/Device.java b/stwzhj-modules/stwzhj-baseToSt/src/main/java/org/dromara/basetost/entity/Device.java index bec24982..6865f5ec 100644 --- a/stwzhj-modules/stwzhj-baseToSt/src/main/java/org/dromara/basetost/entity/Device.java +++ b/stwzhj-modules/stwzhj-baseToSt/src/main/java/org/dromara/basetost/entity/Device.java @@ -105,4 +105,88 @@ public class Device implements AbstractGpsEntity, Serializable { @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone = "GMT+8") private Date updateTime; + /** + * 地市 + */ + @Column(name = "info_source") + private String infoSource; + + /** + * 录入单位代码 + */ + @Column(name = "lrdwdm") + private String lrdwdm; + + /** + * 录入单位名称 + */ + @Column(name = "lrdwmc") + private String lrdwmc; + + /** + * 录入人姓名 + */ + @Column(name = "lrrxm") + private String lrrxm; + + /** + * 录入人身份证 + */ + @Column(name = "lrrsfzh") + private String lrrsfzh; + + /** + * 修改单位代码 + */ + @Column(name = "xgdwdm") + private String xgdwdm; + + /** + * 修改单位名称 + */ + @Column(name = "xgdwmc") + private String xgdwmc; + + /** + * 修改人姓名 + */ + @Column(name = "xgrxm") + private String xgrxm; + + /** + * 修改人身份证 + */ + @Column(name = "xgrsfzh") + private String xgrsfzh; + + /** + * 设备品牌 + */ + @Column(name = "sbpp") + private String sbpp; + + /** + * 设备型号 + */ + @Column(name = "sbxh") + private String sbxh; + + /** + * 警员身份证号 + */ + @Column(name = "card_num") + private String cardNum; + + /** + * 通道编码 + */ + @Column(name = "tdbm") + private String tdbm; + + /** + * 国标编码 + */ + @Column(name = "gbbm") + private String gbbm; + } diff --git a/stwzhj-modules/stwzhj-baseToSt/src/main/java/org/dromara/basetost/entity/DictData.java b/stwzhj-modules/stwzhj-baseToSt/src/main/java/org/dromara/basetost/entity/DictData.java index c3f1236d..d77fdf08 100644 --- a/stwzhj-modules/stwzhj-baseToSt/src/main/java/org/dromara/basetost/entity/DictData.java +++ b/stwzhj-modules/stwzhj-baseToSt/src/main/java/org/dromara/basetost/entity/DictData.java @@ -49,54 +49,8 @@ public class DictData implements Serializable { @Column(name = "dict_type") private String dictType; - /** - * 样式属性(其他样式扩展) - */ - @Column(name = "css_class") - private String cssClass; - - /** - * 表格回显样式 - */ - @Column(name = "list_class") - private String listClass; - /** - * 状态(0正常 1停用) - */ - @Column(name = "status") - private String status; - - /** - * 创建者 - */ - @Column(name = "create_by") - private String createBy; - - /** - * 创建时间 - */ - @Column(name = "create_time") - private Date createTime; - - /** - * 更新者 - */ - @Column(name = "update_by") - private String updateBy; - - /** - * 更新时间 - */ - @Column(name = "update_time") - private Date updateTime; - - /** - * 备注 - */ - @Column(name = "remark") - private String remark; } diff --git a/stwzhj-modules/stwzhj-baseToSt/src/main/java/org/dromara/basetost/entity/SourceDevice.java b/stwzhj-modules/stwzhj-baseToSt/src/main/java/org/dromara/basetost/entity/SourceDevice.java new file mode 100644 index 00000000..31fd048e --- /dev/null +++ b/stwzhj-modules/stwzhj-baseToSt/src/main/java/org/dromara/basetost/entity/SourceDevice.java @@ -0,0 +1,186 @@ + +package org.dromara.basetost.entity; + +import jakarta.persistence.*; +import lombok.Data; + +import java.io.Serializable; +import java.util.Date; + +/** + * 源设备实体类 + */ +@Data +@Entity +@Table(name = "t_device") +public class SourceDevice implements Serializable { + + private static final long serialVersionUID = 1L; + + @Id + @Column(name = "id", nullable = false) + @GeneratedValue(strategy = GenerationType.IDENTITY) + private Integer id; + + /** + * 外部系统设备编号建议21位 + */ + @Column(name = "device_code") + private String deviceCode; + + /** + * 设备类型 + */ + @Column(name = "device_type") + private String deviceType; + + /** + * 组织机构代码 + */ + @Column(name = "zzjgdm") + private String zzjgdm; + + /** + * 组织机构名称 + */ + @Column(name = "zzjgmc") + private String zzjgmc; + + /** + * 警号(若有) + */ + @Column(name = "police_no") + private String policeNo; + + /** + * 姓名(若有) + */ + @Column(name = "police_name") + private String policeName; + + /** + * 联系电话(若有) + */ + @Column(name = "phone_num") + private String phoneNum; + + /** + * 车牌号(若有) + */ + @Column(name = "car_num") + private String carNum; + + /** + * 0无效,1有效 + */ + @Column(name = "valid") + private Integer valid; + + /** + * 备注字段1 + */ + @Column(name = "remark1") + private String remark1; + + /** + * 备注字段2 + */ + @Column(name = "remark2") + private String remark2; + + /** + * 创建时间 + */ + @Column(name = "create_time") + private Date createTime; + + /** + * 最后更新时间 + */ + @Column(name = "update_time") + private Date updateTime; + + /** + * 地市 + */ + @Column(name = "info_source") + private String infoSource; + + /** + * 录入单位代码 + */ + @Column(name = "lrdwdm") + private String lrdwdm; + + /** + * 录入单位名称 + */ + @Column(name = "lrdwmc") + private String lrdwmc; + + /** + * 录入人姓名 + */ + @Column(name = "lrrxm") + private String lrrxm; + + /** + * 录入人身份证 + */ + @Column(name = "lrrsfzh") + private String lrrsfzh; + + /** + * 修改单位代码 + */ + @Column(name = "xgdwdm") + private String xgdwdm; + + /** + * 修改单位名称 + */ + @Column(name = "xgdwmc") + private String xgdwmc; + + /** + * 修改人姓名 + */ + @Column(name = "xgrxm") + private String xgrxm; + + /** + * 修改人身份证 + */ + @Column(name = "xgrsfzh") + private String xgrsfzh; + + /** + * 设备品牌 + */ + @Column(name = "sbpp") + private String sbpp; + + /** + * 设备型号 + */ + @Column(name = "sbxh") + private String sbxh; + + /** + * 警员身份证号 + */ + @Column(name = "card_num") + private String cardNum; + + /** + * 通道编码 + */ + @Column(name = "tdbm") + private String tdbm; + + /** + * 国标编码 + */ + @Column(name = "gbbm") + private String gbbm; +} diff --git a/stwzhj-modules/stwzhj-baseToSt/src/main/java/org/dromara/basetost/entity/SourceDeviceV2.java b/stwzhj-modules/stwzhj-baseToSt/src/main/java/org/dromara/basetost/entity/SourceDeviceV2.java new file mode 100644 index 00000000..bc444c58 --- /dev/null +++ b/stwzhj-modules/stwzhj-baseToSt/src/main/java/org/dromara/basetost/entity/SourceDeviceV2.java @@ -0,0 +1,114 @@ + +package org.dromara.basetost.entity; + +import jakarta.persistence.*; +import lombok.Data; + +import java.io.Serializable; +import java.util.Date; + +/** + * 源设备实体类V2 - 第二种表结构 + */ +@Data +@Entity +@Table(name = "t_device") +public class SourceDeviceV2 implements Serializable { + + private static final long serialVersionUID = 1L; + + @Id + @Column(name = "id", nullable = false) + @GeneratedValue(strategy = GenerationType.IDENTITY) + private Integer id; + + /** + * 外部系统设备编号建议21位 + */ + @Column(name = "device_code") + private String deviceCode; + + /** + * 设备类型 + */ + @Column(name = "device_type") + private String deviceType; + + /** + * 组织机构代码 + */ + @Column(name = "zzjgdm") + private String zzjgdm; + + /** + * 组织机构名称 + */ + @Column(name = "zzjgmc") + private String zzjgmc; + + /** + * 警号(若有)- 使用驼峰命名 + */ + @Column(name = "policeNo") + private String policeNo; + + /** + * 姓名(若有)- 使用驼峰命名 + */ + @Column(name = "policeName") + private String policeName; + + /** + * 联系电话(若有)- 使用驼峰命名 + */ + @Column(name = "phoneNum") + private String phoneNum; + + /** + * 车牌号(若有) + */ + @Column(name = "car_num") + private String carNum; + + /** + * 0无效,1有效 + */ + @Column(name = "valid") + private Integer valid; + + /** + * 备注字段1 + */ + @Column(name = "remark1") + private String remark1; + + /** + * 备注字段2 + */ + @Column(name = "remark2") + private String remark2; + + /** + * 创建时间 + */ + @Column(name = "create_time") + private Date createTime; + + /** + * 最后更新时间 + */ + @Column(name = "update_time") + private Date updateTime; + + /** + * 地市 + */ + @Column(name = "info_source") + private String infoSource; + + /** + * 警员身份证号 - 使用驼峰命名 + */ + @Column(name = "card_num") + private String cardNum; +} diff --git a/stwzhj-modules/stwzhj-baseToSt/src/main/java/org/dromara/basetost/handler/AsyncHandler.java b/stwzhj-modules/stwzhj-baseToSt/src/main/java/org/dromara/basetost/handler/AsyncHandler.java index d743d5f8..4fb1a980 100644 --- a/stwzhj-modules/stwzhj-baseToSt/src/main/java/org/dromara/basetost/handler/AsyncHandler.java +++ b/stwzhj-modules/stwzhj-baseToSt/src/main/java/org/dromara/basetost/handler/AsyncHandler.java @@ -1,6 +1,5 @@ package org.dromara.basetost.handler; -import org.apache.dubbo.config.annotation.DubboReference; import org.dromara.common.core.domain.R; import org.dromara.system.api.RemoteDeviceService; import org.dromara.system.api.domain.bo.RemoteDeviceBo; @@ -23,7 +22,6 @@ public class AsyncHandler { private Logger logger = LoggerFactory.getLogger(AsyncHandler.class); - @DubboReference private RemoteDeviceService deviceService; diff --git a/stwzhj-modules/stwzhj-baseToSt/src/main/java/org/dromara/basetost/repository/DeviceRepository.java b/stwzhj-modules/stwzhj-baseToSt/src/main/java/org/dromara/basetost/repository/DeviceRepository.java index dfbe058a..4cc80876 100644 --- a/stwzhj-modules/stwzhj-baseToSt/src/main/java/org/dromara/basetost/repository/DeviceRepository.java +++ b/stwzhj-modules/stwzhj-baseToSt/src/main/java/org/dromara/basetost/repository/DeviceRepository.java @@ -3,6 +3,8 @@ package org.dromara.basetost.repository; import org.dromara.basetost.entity.Device; import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.data.jpa.repository.JpaSpecificationExecutor; +import org.springframework.data.jpa.repository.Query; +import org.springframework.data.repository.query.Param; import java.util.Date; import java.util.List; @@ -10,6 +12,21 @@ import java.util.List; public interface DeviceRepository extends JpaRepository, JpaSpecificationExecutor { List findDeviceByUpdateTimeAfterOrderByUpdateTimeAsc(Date updateTime); + /** + * 根据设备编码和信息来源查询设备 + * @param deviceCode 设备编码 + * @param infoSource 信息来源 + * @return 设备 + */ + Device findByDeviceCodeAndInfoSource(String deviceCode, String infoSource); + + /** + * 查询指定信息来源的最新更新时间 + * @param infoSource 信息来源 + * @return 最新更新时间 + */ + @Query("SELECT MAX(d.updateTime) FROM Device d WHERE d.infoSource = :infoSource") + Date findMaxUpdateTimeByInfoSource(@Param("infoSource") String infoSource); // @Query(nativeQuery = true,value = "select * from t_device t1 " + // " where t1.update_time>?1 order by update_time asc limit 50") diff --git a/stwzhj-modules/stwzhj-baseToSt/src/main/java/org/dromara/basetost/repository/DictDataRepository.java b/stwzhj-modules/stwzhj-baseToSt/src/main/java/org/dromara/basetost/repository/DictDataRepository.java index de2fb433..e8db242e 100644 --- a/stwzhj-modules/stwzhj-baseToSt/src/main/java/org/dromara/basetost/repository/DictDataRepository.java +++ b/stwzhj-modules/stwzhj-baseToSt/src/main/java/org/dromara/basetost/repository/DictDataRepository.java @@ -6,4 +6,12 @@ import org.springframework.data.jpa.repository.JpaSpecificationExecutor; public interface DictDataRepository extends JpaRepository, JpaSpecificationExecutor { DictData findDictDataByDictTypeAndDictLabel(String dictType, String dictLabel); + + /** + * 根据字典类型和字典值查询字典数据 + * @param dictType 字典类型 + * @param dictValue 字典值 + * @return 字典数据 + */ + DictData findDictDataByDictTypeAndDictValue(String dictType, String dictValue); } diff --git a/stwzhj-modules/stwzhj-baseToSt/src/main/java/org/dromara/basetost/repository/SourceDeviceRepository.java b/stwzhj-modules/stwzhj-baseToSt/src/main/java/org/dromara/basetost/repository/SourceDeviceRepository.java new file mode 100644 index 00000000..cc97d79b --- /dev/null +++ b/stwzhj-modules/stwzhj-baseToSt/src/main/java/org/dromara/basetost/repository/SourceDeviceRepository.java @@ -0,0 +1,40 @@ + +package org.dromara.basetost.repository; + +import org.dromara.basetost.entity.SourceDevice; +import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.data.jpa.repository.JpaSpecificationExecutor; +import org.springframework.data.jpa.repository.Query; +import org.springframework.data.repository.query.Param; + +import java.util.Date; +import java.util.List; + +/** + * 源设备数据访问接口 + */ +public interface SourceDeviceRepository extends JpaRepository, JpaSpecificationExecutor { + + /** + * 根据更新时间查询设备数据 + * @param updateTime 更新时间 + * @return 设备列表 + */ + List findByUpdateTimeAfterOrderByUpdateTimeAsc(Date updateTime); + + /** + * 根据设备编码和信息来源查询设备 + * @param deviceCode 设备编码 + * @param infoSource 信息来源 + * @return 设备 + */ + SourceDevice findByDeviceCodeAndInfoSource(String deviceCode, String infoSource); + + /** + * 查询指定信息来源的最新更新时间 + * @param infoSource 信息来源 + * @return 最新更新时间 + */ + @Query("SELECT MAX(d.updateTime) FROM SourceDevice d WHERE d.infoSource = :infoSource") + Date findMaxUpdateTimeByInfoSource(@Param("infoSource") String infoSource); +} diff --git a/stwzhj-modules/stwzhj-baseToSt/src/main/java/org/dromara/basetost/repository/SourceDeviceV2Repository.java b/stwzhj-modules/stwzhj-baseToSt/src/main/java/org/dromara/basetost/repository/SourceDeviceV2Repository.java new file mode 100644 index 00000000..fcd85a3f --- /dev/null +++ b/stwzhj-modules/stwzhj-baseToSt/src/main/java/org/dromara/basetost/repository/SourceDeviceV2Repository.java @@ -0,0 +1,40 @@ + +package org.dromara.basetost.repository; + +import org.dromara.basetost.entity.SourceDeviceV2; +import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.data.jpa.repository.JpaSpecificationExecutor; +import org.springframework.data.jpa.repository.Query; +import org.springframework.data.repository.query.Param; + +import java.util.Date; +import java.util.List; + +/** + * 源设备数据访问接口V2 - 第二种表结构 + */ +public interface SourceDeviceV2Repository extends JpaRepository, JpaSpecificationExecutor { + + /** + * 根据更新时间查询设备数据 + * @param updateTime 更新时间 + * @return 设备列表 + */ + List findByUpdateTimeAfterOrderByUpdateTimeAsc(Date updateTime); + + /** + * 根据设备编码和信息来源查询设备 + * @param deviceCode 设备编码 + * @param infoSource 信息来源 + * @return 设备 + */ + SourceDeviceV2 findByDeviceCodeAndInfoSource(String deviceCode, String infoSource); + + /** + * 查询指定信息来源的最新更新时间 + * @param infoSource 信息来源 + * @return 最新更新时间 + */ + @Query("SELECT MAX(d.updateTime) FROM SourceDeviceV2 d WHERE d.infoSource = :infoSource") + Date findMaxUpdateTimeByInfoSource(@Param("infoSource") String infoSource); +} diff --git a/stwzhj-modules/stwzhj-baseToSt/src/main/java/org/dromara/basetost/schedule/BaseToSTSchedule.java b/stwzhj-modules/stwzhj-baseToSt/src/main/java/org/dromara/basetost/schedule/BaseToSTSchedule.java index 2f291fc6..453cabf6 100644 --- a/stwzhj-modules/stwzhj-baseToSt/src/main/java/org/dromara/basetost/schedule/BaseToSTSchedule.java +++ b/stwzhj-modules/stwzhj-baseToSt/src/main/java/org/dromara/basetost/schedule/BaseToSTSchedule.java @@ -1,13 +1,14 @@ package org.dromara.basetost.schedule; import org.dromara.basetost.handler.AbstractAsyncHandler; +import org.dromara.basetost.service.DeviceSyncService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; /** - *

description:

+ * 数据同步调度任务 * * @author chenle * @date 2023-05-22 9:24 @@ -15,14 +16,26 @@ import org.springframework.stereotype.Component; @Component public class BaseToSTSchedule { - @Autowired @Qualifier(value = "pdthandler") - AbstractAsyncHandler abstractAsyncHandler; + private AbstractAsyncHandler abstractAsyncHandler; + @Autowired + private DeviceSyncService deviceSyncService; + /** + * 设备数据同步到省厅 + */ @Scheduled(cron = "${devicecorn:0/30 * * * * ?}") - public void sendToSt(){ + public void sendToSt() { abstractAsyncHandler.saveBaseToST(); } + + /** + * 从源数据库同步设备数据 + */ + @Scheduled(cron = "${device-sync.cron:0 0/10 * * * ?}") + public void syncDevicesFromSource() { + deviceSyncService.syncDevices(); + } } diff --git a/stwzhj-modules/stwzhj-baseToSt/src/main/java/org/dromara/basetost/service/DeviceSyncService.java b/stwzhj-modules/stwzhj-baseToSt/src/main/java/org/dromara/basetost/service/DeviceSyncService.java new file mode 100644 index 00000000..1f2f04a8 --- /dev/null +++ b/stwzhj-modules/stwzhj-baseToSt/src/main/java/org/dromara/basetost/service/DeviceSyncService.java @@ -0,0 +1,206 @@ + +package org.dromara.basetost.service; + +import cn.hutool.core.bean.BeanUtil; +import cn.hutool.core.date.DateUtil; +import com.baomidou.dynamic.datasource.annotation.DS; +import org.dromara.basetost.config.DeviceSyncConfig; +import org.dromara.basetost.entity.Device; +import org.dromara.basetost.entity.DictData; +import org.dromara.basetost.entity.SourceDevice; +import org.dromara.basetost.entity.SourceDeviceV2; +import org.dromara.basetost.repository.DeviceRepository; +import org.dromara.basetost.repository.DictDataRepository; +import org.dromara.basetost.repository.SourceDeviceRepository; +import org.dromara.basetost.repository.SourceDeviceV2Repository; +import org.dromara.basetost.util.DataSourceContextHolder; +import org.dromara.basetost.util.FieldMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; +import org.springframework.util.CollectionUtils; + +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.Objects; + +/** + * 设备数据同步服务 + */ +@Service +public class DeviceSyncService { + + private static final Logger logger = LoggerFactory.getLogger(DeviceSyncService.class); + + @Autowired + private DeviceSyncConfig deviceSyncConfig; + + @Autowired + private DeviceRepository deviceRepository; + + @Autowired + private SourceDeviceRepository sourceDeviceRepository; + + @Autowired + private SourceDeviceV2Repository sourceDeviceV2Repository; + + @Autowired + private DictDataRepository dictDataRepository; + + /** + * 从源数据库同步设备数据 + */ + @DS("target") + @Transactional(rollbackFor = Exception.class) + public void syncDevices() { + logger.info("开始同步设备数据,infoSource: {}", deviceSyncConfig.getInfoSource()); + + try { + // 1. 查询目标表中该info_source的最新更新时间 + Date lastUpdateTime = deviceRepository.findMaxUpdateTimeByInfoSource(deviceSyncConfig.getInfoSource()); + if (lastUpdateTime == null) { + // 如果没有数据,使用一个较早的时间 + lastUpdateTime = DateUtil.parse("2020-01-01 00:00:00"); + logger.info("目标表中无该info_source的数据,使用初始时间: {}", lastUpdateTime); + } else { + logger.info("目标表中该info_source的最新更新时间: {}", lastUpdateTime); + } + + // 2. 从源数据库查询更新的数据 + List sourceDevices = getUpdatedDevicesFromSource(lastUpdateTime); + logger.info("从源数据库查询到 {} 条更新的数据", sourceDevices.size()); + + if (CollectionUtils.isEmpty(sourceDevices)) { + logger.info("源数据库无更新的数据"); + return; + } + + // 3. 处理设备类型映射 + mapDeviceTypes(sourceDevices); + + // 4. 同步数据到目标表 + syncDevicesToTarget(sourceDevices); + + logger.info("同步设备数据完成,infoSource: {}", deviceSyncConfig.getInfoSource()); + } catch (Exception e) { + logger.error("同步设备数据失败: {}", e.getMessage(), e); + throw new RuntimeException("同步设备数据失败", e); + } + } + + /** + * 从源数据库查询更新的数据 + * @param lastUpdateTime 最后更新时间 + * @return 更新的设备列表 + */ + public List getUpdatedDevicesFromSource(Date lastUpdateTime) { + // 切换到源数据源 + DataSourceContextHolder.setDataSource("source"); + try { + // 根据源表结构类型查询更新的数据 + if ("v2".equals(deviceSyncConfig.getSourceTableType())) { + // 使用第二种表结构 + List v2Devices = sourceDeviceV2Repository.findByUpdateTimeAfterOrderByUpdateTimeAsc(lastUpdateTime); + // 转换为SourceDevice + return convertV2ToV1(v2Devices); + } else { + // 使用第一种表结构 + return sourceDeviceRepository.findByUpdateTimeAfterOrderByUpdateTimeAsc(lastUpdateTime); + } + } finally { + // 恢复到默认数据源 + DataSourceContextHolder.clearDataSource(); + } + } + + /** + * 将SourceDeviceV2转换为SourceDevice + * @param v2Devices SourceDeviceV2列表 + * @return SourceDevice列表 + */ + private List convertV2ToV1(List v2Devices) { + if (CollectionUtils.isEmpty(v2Devices)) { + return new ArrayList<>(); + } + + List v1Devices = new ArrayList<>(); + for (SourceDeviceV2 v2Device : v2Devices) { + SourceDevice v1Device = new SourceDevice(); + // 复制相同名称的字段 + BeanUtil.copyProperties(v2Device, v1Device); + v1Devices.add(v1Device); + } + return v1Devices; + } + + /** + * 处理设备类型映射 + * @param sourceDevices 源设备列表 + */ + private void mapDeviceTypes(List sourceDevices) { + for (SourceDevice sourceDevice : sourceDevices) { + String sourceDeviceType = sourceDevice.getDeviceType(); + if (sourceDeviceType == null || sourceDeviceType.isEmpty()) { + continue; + } + + // 从字典表查询设备类型映射 + // dict_value是源表的device_type值,dict_label是目标表的device_type值 + DictData dictData = dictDataRepository.findDictDataByDictTypeAndDictLabel( + deviceSyncConfig.getDictType(), sourceDeviceType); + + String targetDeviceType = "99"; // 默认值为"99"(其他) + if (!Objects.isNull(dictData) && dictData.getDictValue() != null) { + targetDeviceType = dictData.getDictValue(); + } + + sourceDevice.setDeviceType(targetDeviceType); + } + } + + /** + * 同步设备数据到目标表 + * @param sourceDevices 源设备列表 + */ + @DS("target") + @Transactional(rollbackFor = Exception.class) + public void syncDevicesToTarget(List sourceDevices) { + if (CollectionUtils.isEmpty(sourceDevices)) { + return; + } + + int batchSize = deviceSyncConfig.getBatchSize(); + int size = sourceDevices.size(); + int batchCount = (size + batchSize - 1) / batchSize; + + for (int i = 0; i < batchCount; i++) { + int fromIndex = i * batchSize; + int toIndex = Math.min((i + 1) * batchSize, size); + List batch = sourceDevices.subList(fromIndex, toIndex); + + // 批量处理 + for (SourceDevice sourceDevice : batch) { + // 查询目标表中是否存在该设备 + Device existingDevice = deviceRepository.findByDeviceCodeAndInfoSource( + sourceDevice.getDeviceCode(), deviceSyncConfig.getInfoSource()); + + // 转换为目标设备 + Device targetDevice = FieldMapper.mapToTargetDevice(sourceDevice, deviceSyncConfig.getInfoSource()); + + if (existingDevice != null) { + // 更新现有设备 + targetDevice.setId(existingDevice.getId()); + deviceRepository.save(targetDevice); + } else { + // 插入新设备 + deviceRepository.save(targetDevice); + } + } + + logger.info("已处理 {}/{} 批次数据", i + 1, batchCount); + } + } +} diff --git a/stwzhj-modules/stwzhj-baseToSt/src/main/java/org/dromara/basetost/util/DataSourceContextHolder.java b/stwzhj-modules/stwzhj-baseToSt/src/main/java/org/dromara/basetost/util/DataSourceContextHolder.java new file mode 100644 index 00000000..12ba0e22 --- /dev/null +++ b/stwzhj-modules/stwzhj-baseToSt/src/main/java/org/dromara/basetost/util/DataSourceContextHolder.java @@ -0,0 +1,33 @@ + +package org.dromara.basetost.util; + +import com.baomidou.dynamic.datasource.toolkit.DynamicDataSourceContextHolder; + +/** + * 数据源切换工具类 + */ +public class DataSourceContextHolder { + + /** + * 切换数据源 + * @param dataSourceName 数据源名称 + */ + public static void setDataSource(String dataSourceName) { + DynamicDataSourceContextHolder.push(dataSourceName); + } + + /** + * 获取当前数据源 + * @return 数据源名称 + */ + public static String getDataSource() { + return DynamicDataSourceContextHolder.peek(); + } + + /** + * 清除数据源 + */ + public static void clearDataSource() { + DynamicDataSourceContextHolder.clear(); + } +} diff --git a/stwzhj-modules/stwzhj-baseToSt/src/main/java/org/dromara/basetost/util/FieldMapper.java b/stwzhj-modules/stwzhj-baseToSt/src/main/java/org/dromara/basetost/util/FieldMapper.java new file mode 100644 index 00000000..15920be4 --- /dev/null +++ b/stwzhj-modules/stwzhj-baseToSt/src/main/java/org/dromara/basetost/util/FieldMapper.java @@ -0,0 +1,66 @@ + +package org.dromara.basetost.util; + +import cn.hutool.core.bean.BeanUtil; +import org.dromara.basetost.entity.Device; +import org.dromara.basetost.entity.SourceDevice; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * 字段映射工具类 + * 用于处理源表和目标表之间的字段映射 + */ +public class FieldMapper { + + private static final Logger logger = LoggerFactory.getLogger(FieldMapper.class); + + /** + * 将源设备转换为目标设备 + * @param sourceDevice 源设备 + * @param infoSource 信息来源 + * @return 目标设备 + */ + public static Device mapToTargetDevice(SourceDevice sourceDevice, String infoSource) { + Device targetDevice = new Device(); + + try { + // 复制相同名称的字段 + BeanUtil.copyProperties(sourceDevice, targetDevice); + + // 设置info_source + targetDevice.setInfoSource(infoSource); + + // 处理字段名差异 + if (sourceDevice.getPoliceNo() != null) { + targetDevice.setPoliceNo(sourceDevice.getPoliceNo()); + } + if (sourceDevice.getPoliceName() != null) { + targetDevice.setPoliceName(sourceDevice.getPoliceName()); + } + if (sourceDevice.getPhoneNum() != null) { + targetDevice.setPhoneNum(sourceDevice.getPhoneNum()); + } + if (sourceDevice.getCarNum() != null) { + targetDevice.setCarNum(sourceDevice.getCarNum()); + } + + } catch (Exception e) { + logger.error("字段映射失败: {}", e.getMessage(), e); + } + + return targetDevice; + } + + /** + * 将源设备列表转换为目标设备列表 + * @param sourceDevices 源设备列表 + * @param infoSource 信息来源 + * @return 目标设备列表 + */ + public static java.util.List mapToTargetDevices(java.util.List sourceDevices, String infoSource) { + return sourceDevices.stream() + .map(sourceDevice -> mapToTargetDevice(sourceDevice, infoSource)) + .collect(java.util.stream.Collectors.toList()); + } +} diff --git a/stwzhj-modules/stwzhj-baseToSt/src/main/resources/application.yml b/stwzhj-modules/stwzhj-baseToSt/src/main/resources/application.yml index 6f7b9f71..f4332b22 100644 --- a/stwzhj-modules/stwzhj-baseToSt/src/main/resources/application.yml +++ b/stwzhj-modules/stwzhj-baseToSt/src/main/resources/application.yml @@ -11,35 +11,44 @@ spring: profiles: # 环境配置 active: dev - ---- # nacos 配置 -spring: datasource: - url: jdbc:mysql://localhost:3306/wzhj_hs?useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai&useSSL=false - username: root - password: root - driver-class-name: com.mysql.jdbc.Driver - type: com.zaxxer.hikari.HikariDataSource + dynamic: + primary: target #设置默认数据源为目标数据库 + strict: false #严格匹配数据源 + datasource: + # 源数据库(MySQL) - 每个地市配置不同的源数据库 + source: + url: jdbc:mysql://53.248.2.141:3306/wzhj-bz?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false&serverTimezone=GMT%2B8&rewriteBatchedStatements=true&allowPublicKeyRetrieval=true + username: root + password: Ycgis!2509 + driver-class-name: com.mysql.cj.jdbc.Driver + # 目标数据库(PostgreSQL) - 所有地市使用相同的目标数据库 + target: + url: jdbc:postgresql://53.16.17.15:5432/wzhj?useUnicode=true&characterEncoding=utf8&useSSL=true&autoReconnect=true&reWriteBatchedInserts=true&stringtype=unspecified + username: pgsql + password: ycgis + driver-class-name: org.postgresql.Driver jpa: show-sql: true hibernate: naming: physical-strategy: org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl - cloud: - nacos: - # nacos 服务地址 - server-addr: 127.0.0.1:8848 - username: nacos - password: nacos - discovery: - # 注册组 - group: DEFAULT_GROUP - namespace: ${spring.profiles.active} - config: - # 配置组 - group: DEFAULT_GROUP - namespace: ${spring.profiles.active} - config: - import: - - optional:nacos:application-common.yml + properties: + hibernate: + dialect: org.hibernate.dialect.PostgreSQLDialect + +# 设备数据同步配置 +device-sync: + # 信息来源标识 - 每个地市配置不同的info_source + info-source: "3416" + # 源表结构类型 - v1或v2 + # v1: 第一种表结构,字段使用下划线命名(如police_no) + # v2: 第二种表结构,部分字段使用驼峰命名(如policeNo) + source-table-type: v1 + # 设备类型映射字典类型 + dict-type: device_type_tost + # 批量插入大小 + batch-size: 50 + # 定时任务cron表达式 + cron: 0 0/10 * * * ?