From 13f46756c89f66353fb8c4610cd4cba57dbc624b Mon Sep 17 00:00:00 2001 From: luyya Date: Thu, 6 Nov 2025 11:53:47 +0800 Subject: [PATCH] =?UTF-8?q?=E7=9C=81=E5=8E=85=E4=BD=8D=E7=BD=AE=E6=B1=87?= =?UTF-8?q?=E8=81=9A=E6=B7=BB=E5=8A=A0=E6=8A=BD=E5=8F=96=E8=AD=A6=E6=83=85?= =?UTF-8?q?=E5=8A=9F=E8=83=BD=EF=BC=88=E6=9A=82=E7=94=A8=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- stwzhj-modules/pom.xml | 1 + stwzhj-modules/stwzhj-extract/pom.xml | 91 +++++ .../dromara/extract/ExtractApplication.java | 24 ++ .../dromara/extract/config/AsyncConfig.java | 27 ++ .../extract/controller/AddressController.java | 72 ++++ .../dromara/extract/domain/AoiAddress.java | 60 +++ .../java/org/dromara/extract/domain/Jqd.java | 35 ++ .../dromara/extract/domain/PoiAddress.java | 75 ++++ .../dromara/extract/domain/dto/Inputs.java | 8 + .../extract/domain/dto/ParsedAnswer.java | 12 + .../extract/domain/dto/QueryRequest.java | 23 ++ .../extract/mapper/AoiAddressMapper.java | 6 + .../org/dromara/extract/mapper/JqdMapper.java | 16 + .../extract/mapper/PoiAddressMapper.java | 8 + .../extract/service/IAoiAddressService.java | 4 + .../dromara/extract/service/IJqdService.java | 4 + .../extract/service/IPoiAddressService.java | 4 + .../service/impl/AoiAddressServiceImpl.java | 13 + .../service/impl/DataMigrationService.java | 363 ++++++++++++++++++ .../extract/service/impl/JqdServiceImpl.java | 13 + .../service/impl/PoiAddressServiceImpl.java | 13 + .../src/main/resources/application.yml | 34 ++ .../src/main/resources/logback-plus.xml | 49 +++ .../mapper/extract/AoiAddressMapper.xml | 7 + .../resources/mapper/extract/JqdMapper.xml | 16 + .../mapper/extract/PoiAddressMapper.xml | 7 + .../src/main/resources/mapper/package-info.md | 3 + 27 files changed, 988 insertions(+) create mode 100644 stwzhj-modules/stwzhj-extract/pom.xml create mode 100644 stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/ExtractApplication.java create mode 100644 stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/config/AsyncConfig.java create mode 100644 stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/controller/AddressController.java create mode 100644 stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/domain/AoiAddress.java create mode 100644 stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/domain/Jqd.java create mode 100644 stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/domain/PoiAddress.java create mode 100644 stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/domain/dto/Inputs.java create mode 100644 stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/domain/dto/ParsedAnswer.java create mode 100644 stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/domain/dto/QueryRequest.java create mode 100644 stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/mapper/AoiAddressMapper.java create mode 100644 stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/mapper/JqdMapper.java create mode 100644 stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/mapper/PoiAddressMapper.java create mode 100644 stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/service/IAoiAddressService.java create mode 100644 stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/service/IJqdService.java create mode 100644 stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/service/IPoiAddressService.java create mode 100644 stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/service/impl/AoiAddressServiceImpl.java create mode 100644 stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/service/impl/DataMigrationService.java create mode 100644 stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/service/impl/JqdServiceImpl.java create mode 100644 stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/service/impl/PoiAddressServiceImpl.java create mode 100644 stwzhj-modules/stwzhj-extract/src/main/resources/application.yml create mode 100644 stwzhj-modules/stwzhj-extract/src/main/resources/logback-plus.xml create mode 100644 stwzhj-modules/stwzhj-extract/src/main/resources/mapper/extract/AoiAddressMapper.xml create mode 100644 stwzhj-modules/stwzhj-extract/src/main/resources/mapper/extract/JqdMapper.xml create mode 100644 stwzhj-modules/stwzhj-extract/src/main/resources/mapper/extract/PoiAddressMapper.xml create mode 100644 stwzhj-modules/stwzhj-extract/src/main/resources/mapper/package-info.md diff --git a/stwzhj-modules/pom.xml b/stwzhj-modules/pom.xml index 863f3a80..1523ef4c 100644 --- a/stwzhj-modules/pom.xml +++ b/stwzhj-modules/pom.xml @@ -17,6 +17,7 @@ stwzhj-data2es stwzhj-baseToSt stwzhj-data2StKafka + stwzhj-extract stwzhj-modules diff --git a/stwzhj-modules/stwzhj-extract/pom.xml b/stwzhj-modules/stwzhj-extract/pom.xml new file mode 100644 index 00000000..c749ffc4 --- /dev/null +++ b/stwzhj-modules/stwzhj-extract/pom.xml @@ -0,0 +1,91 @@ + + + + stwzhj-modules + org.dromara + 2.2.2 + + 4.0.0 + + stwzhj-extract + + 抽取警情拆分 + + + + 17 + 17 + UTF-8 + + + + + org.dromara + stwzhj-common-nacos + + + + org.dromara + stwzhj-common-sentinel + + + + + org.dromara + stwzhj-common-log + + + + org.dromara + stwzhj-common-dict + + + + org.dromara + stwzhj-common-doc + + + + org.dromara + stwzhj-common-web + + + + org.dromara + stwzhj-common-mybatis + + + + org.dromara + stwzhj-common-dubbo + + + + com.github.jeffreyning + mybatisplus-plus + 1.5.1-RELEASE + + + + + + ${project.artifactId} + + + org.springframework.boot + spring-boot-maven-plugin + ${spring-boot.version} + + + + repackage + + + + + + + + diff --git a/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/ExtractApplication.java b/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/ExtractApplication.java new file mode 100644 index 00000000..5f628177 --- /dev/null +++ b/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/ExtractApplication.java @@ -0,0 +1,24 @@ +package org.dromara.extract; + +import org.apache.dubbo.config.spring.context.annotation.EnableDubbo; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.context.metrics.buffering.BufferingApplicationStartup; +import org.springframework.scheduling.annotation.EnableScheduling; + +/** + * 系统模块 + * + * @author ruoyi + */ +@EnableDubbo +@EnableScheduling +@SpringBootApplication +public class ExtractApplication { + public static void main(String[] args) { + SpringApplication application = new SpringApplication(ExtractApplication.class); + application.setApplicationStartup(new BufferingApplicationStartup(2048)); + application.run(args); + System.out.println("(♥◠‿◠)ノ゙ 抽取模块启动成功 ლ(´ڡ`ლ)゙ "); + } +} diff --git a/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/config/AsyncConfig.java b/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/config/AsyncConfig.java new file mode 100644 index 00000000..ada3cd7c --- /dev/null +++ b/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/config/AsyncConfig.java @@ -0,0 +1,27 @@ +package org.dromara.extract.config; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.annotation.EnableAsync; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +import java.util.concurrent.Executor; +import java.util.concurrent.ThreadPoolExecutor; + +@Configuration +@EnableAsync +public class AsyncConfig { + + + @Bean("migrationExecutor") + public Executor migrationExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(2); + executor.setMaxPoolSize(4); + executor.setQueueCapacity(100); + executor.setThreadNamePrefix("migration-task-"); + executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // 防止丢任务 + executor.initialize(); + return executor; + } +} diff --git a/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/controller/AddressController.java b/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/controller/AddressController.java new file mode 100644 index 00000000..c970bafd --- /dev/null +++ b/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/controller/AddressController.java @@ -0,0 +1,72 @@ +package org.dromara.extract.controller; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.dromara.common.web.core.BaseController; +import org.dromara.extract.service.impl.DataMigrationService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.util.HashMap; +import java.util.Map; + +@RequiredArgsConstructor +@RestController +@Slf4j +public class AddressController extends BaseController { + + + @Autowired + private DataMigrationService dataMigrationService; + + // 用于监控任务状态 + private boolean isRunning = false; + + @GetMapping("/start") + public ResponseEntity startMigration() { + if (isRunning) { + return ResponseEntity.badRequest() + .body("迁移任务已在运行中,请勿重复提交!"); + } + + // 异步启动任务 + isRunning = true; + new Thread(() -> { + try { + log.info("🚀 开始异步执行数据迁移任务..."); + dataMigrationService.startMigration(); + } catch (Exception e) { + log.error("❌ 迁移任务执行异常", e); + } finally { + isRunning = false; + } + }).start(); + + return ResponseEntity.ok("✅ 迁移任务已提交,正在后台执行..."); + } + + @GetMapping("/status") + public ResponseEntity> getStatus() { + Map status = new HashMap<>(); + status.put("isRunning", isRunning); + status.put("message", isRunning ? "任务运行中" : "空闲"); + return ResponseEntity.ok(status); + } + + @GetMapping("/stop") + public ResponseEntity stopMigration() { + // 注意:当前线程池没有实现优雅关闭 + // 这里只是一个提示,实际需要在 Service 中支持中断 + if (!isRunning) { + return ResponseEntity.ok("任务未运行,无需停止"); + } + + // ⚠️ 提示用户:无法强制停止,建议手动关闭应用或增强 Service + return ResponseEntity.ok("⚠️ 停止功能暂不支持。请重启应用或增强线程控制逻辑。"); + } + + + +} diff --git a/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/domain/AoiAddress.java b/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/domain/AoiAddress.java new file mode 100644 index 00000000..5f4382a9 --- /dev/null +++ b/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/domain/AoiAddress.java @@ -0,0 +1,60 @@ +package org.dromara.extract.domain; + + +import com.baomidou.mybatisplus.annotation.TableName; +import lombok.Data; + +import java.io.Serial; +import java.io.Serializable; + +@Data +@TableName("aoi_address") +public class AoiAddress implements Serializable { + + @Serial + private static final long serialVersionUID = 1L; + + //ID + private String id; + + //地址名称 + private String name; + + //详细地址 + private String address; + + //省 + private String provinceName; + + //市 + private String cityName; + + //区县 + private String districtName; + + //开发区 + private String devzone; + + //乡镇街道 + private String townName; + + //社区村 + private String communityName; + + // 纬度 + private String latitude; + + //经度 + private String longitude; + + + private String poiTypeCode; + + //地址类型 + private String poiTypeName; + + + //数据来源 + private String source; + +} diff --git a/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/domain/Jqd.java b/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/domain/Jqd.java new file mode 100644 index 00000000..92749b9e --- /dev/null +++ b/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/domain/Jqd.java @@ -0,0 +1,35 @@ +package org.dromara.extract.domain; + +import lombok.Data; + +import java.io.Serial; +import java.io.Serializable; + +@Data +public class Jqd implements Serializable { + + @Serial + private static final long serialVersionUID = 1L; + + private String sfdz; + + private String xzqh; + + private String yxbz; + + private String sfdzfldm; + + private String sfdzflmc; + + private String xzb; + + private String yzb; + + private String jzxqdwdm; + + private String jzxqdwmc; + + private String scbjsj; + + +} diff --git a/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/domain/PoiAddress.java b/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/domain/PoiAddress.java new file mode 100644 index 00000000..c70c81f8 --- /dev/null +++ b/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/domain/PoiAddress.java @@ -0,0 +1,75 @@ +package org.dromara.extract.domain; + + +import com.baomidou.mybatisplus.annotation.TableName; +import lombok.Data; +import lombok.EqualsAndHashCode; + +import java.io.Serial; +import java.io.Serializable; + +@Data +@TableName("poi_address") +public class PoiAddress implements Serializable { + @Serial + private static final long serialVersionUID = 1L; + + //ID + private String id; + + //地址名称 + private String name; + + //详细地址 + private String address; + + //省 + private String provinceName; + + //市 + private String cityName; + + //区县 + private String districtName; + + //开发区 + private String devzone; + + //乡镇街道 + private String townName; + + //社区村 + private String communityName; + + // 纬度 + private String latitude; + + //经度 + private String longitude; + + + private String fullName; + + + private String poiTypeCode; + + //地址类型 + private String poiTypeName; + + //兴趣面 + private String aoi; + + private Integer status; + + //数据来源 + private String source; + + //类型 1 poi 2 aoi + private String type; + + private String gadwdm; + + private String gadwmc; + + +} diff --git a/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/domain/dto/Inputs.java b/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/domain/dto/Inputs.java new file mode 100644 index 00000000..628cf42a --- /dev/null +++ b/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/domain/dto/Inputs.java @@ -0,0 +1,8 @@ +package org.dromara.extract.domain.dto; + +public class Inputs { + private String query; + public Inputs(String query) { this.query = query; } + public String getQuery() { return query; } + public void setQuery(String query) { this.query = query; } +} diff --git a/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/domain/dto/ParsedAnswer.java b/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/domain/dto/ParsedAnswer.java new file mode 100644 index 00000000..ca022ba9 --- /dev/null +++ b/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/domain/dto/ParsedAnswer.java @@ -0,0 +1,12 @@ +package org.dromara.extract.domain.dto; + +import lombok.Data; + +// 内部 JSON 结构 +@Data +public class ParsedAnswer { + private String aoi; + private String poi; + private String address; + +} diff --git a/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/domain/dto/QueryRequest.java b/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/domain/dto/QueryRequest.java new file mode 100644 index 00000000..027e414a --- /dev/null +++ b/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/domain/dto/QueryRequest.java @@ -0,0 +1,23 @@ +package org.dromara.extract.domain.dto; + +// 请求体 +public class QueryRequest { + private Inputs inputs; + private String response_mode = "blocking"; + private String user = ""; + + // 构造函数 + public QueryRequest(String query) { + this.inputs = new Inputs(query); + } + + // Getters and Setters + public Inputs getInputs() { return inputs; } + public void setInputs(Inputs inputs) { this.inputs = inputs; } + public String getResponse_mode() { return response_mode; } + public void setResponse_mode(String response_mode) { this.response_mode = response_mode; } + public String getUser() { return user; } + public void setUser(String user) { this.user = user; } +} + + diff --git a/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/mapper/AoiAddressMapper.java b/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/mapper/AoiAddressMapper.java new file mode 100644 index 00000000..acf8953a --- /dev/null +++ b/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/mapper/AoiAddressMapper.java @@ -0,0 +1,6 @@ +package org.dromara.extract.mapper; + +import org.dromara.common.mybatis.core.mapper.BaseMapperPlus; + +public interface AoiAddressMapper extends BaseMapperPlus { +} diff --git a/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/mapper/JqdMapper.java b/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/mapper/JqdMapper.java new file mode 100644 index 00000000..02e22eaf --- /dev/null +++ b/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/mapper/JqdMapper.java @@ -0,0 +1,16 @@ +package org.dromara.extract.mapper; + +import com.baomidou.dynamic.datasource.annotation.DS; +import com.baomidou.mybatisplus.core.conditions.Wrapper; +import com.baomidou.mybatisplus.core.toolkit.Constants; +import org.apache.ibatis.annotations.Param; +import org.dromara.common.mybatis.core.mapper.BaseMapperPlus; +import org.dromara.extract.domain.Jqd; + +import java.util.List; + +@DS("slave") +public interface JqdMapper extends BaseMapperPlus { + + List selectJq(@Param(Constants.WRAPPER) Wrapper queryWrapper); +} diff --git a/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/mapper/PoiAddressMapper.java b/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/mapper/PoiAddressMapper.java new file mode 100644 index 00000000..d995e526 --- /dev/null +++ b/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/mapper/PoiAddressMapper.java @@ -0,0 +1,8 @@ +package org.dromara.extract.mapper; + +import org.dromara.common.mybatis.core.mapper.BaseMapperPlus; +import org.dromara.extract.domain.PoiAddress; + +public interface PoiAddressMapper extends BaseMapperPlus { + +} diff --git a/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/service/IAoiAddressService.java b/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/service/IAoiAddressService.java new file mode 100644 index 00000000..ab3c262a --- /dev/null +++ b/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/service/IAoiAddressService.java @@ -0,0 +1,4 @@ +package org.dromara.extract.service; + +public interface IAoiAddressService { +} diff --git a/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/service/IJqdService.java b/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/service/IJqdService.java new file mode 100644 index 00000000..2a251566 --- /dev/null +++ b/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/service/IJqdService.java @@ -0,0 +1,4 @@ +package org.dromara.extract.service; + +public interface IJqdService { +} diff --git a/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/service/IPoiAddressService.java b/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/service/IPoiAddressService.java new file mode 100644 index 00000000..131a0b12 --- /dev/null +++ b/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/service/IPoiAddressService.java @@ -0,0 +1,4 @@ +package org.dromara.extract.service; + +public interface IPoiAddressService { +} diff --git a/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/service/impl/AoiAddressServiceImpl.java b/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/service/impl/AoiAddressServiceImpl.java new file mode 100644 index 00000000..b52b3e01 --- /dev/null +++ b/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/service/impl/AoiAddressServiceImpl.java @@ -0,0 +1,13 @@ +package org.dromara.extract.service.impl; + + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.dromara.extract.service.IAoiAddressService; +import org.springframework.stereotype.Service; + +@Slf4j +@RequiredArgsConstructor +@Service +public class AoiAddressServiceImpl implements IAoiAddressService { +} diff --git a/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/service/impl/DataMigrationService.java b/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/service/impl/DataMigrationService.java new file mode 100644 index 00000000..faad8a3b --- /dev/null +++ b/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/service/impl/DataMigrationService.java @@ -0,0 +1,363 @@ +package org.dromara.extract.service.impl; + +import cn.hutool.core.util.StrUtil; +import cn.hutool.http.HttpUtil; +import cn.hutool.json.JSONObject; +import cn.hutool.json.JSONUtil; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; +import com.baomidou.mybatisplus.core.metadata.IPage; +import jakarta.annotation.PreDestroy; +import lombok.extern.slf4j.Slf4j; +import org.dromara.common.mybatis.core.page.PageQuery; +import org.dromara.extract.domain.Jqd; +import org.dromara.extract.domain.PoiAddress; +import org.dromara.extract.mapper.AoiAddressMapper; +import org.dromara.extract.mapper.JqdMapper; +import org.dromara.extract.mapper.PoiAddressMapper; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.dao.DuplicateKeyException; +import org.springframework.stereotype.Service; + +import java.util.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +@Service +@Slf4j +public class DataMigrationService { + + @Value("${api.thirdparty.url}") + private String apiUrl; + + @Value("${api.thirdparty.token}") + private String bearerToken; + + @Autowired + private JqdMapper sourceMapper; + + @Autowired + private PoiAddressMapper poiAddressMapper; + + @Autowired + private AoiAddressMapper aoiAddressMapper; + + // 内存去重集合(所有已成功入库的 name) + private final Set processedNames = ConcurrentHashMap.newKeySet(); + + // 固定线程池(建议:CPU 核数 * 2 ~ 4) + private final ExecutorService executorService = Executors.newFixedThreadPool(4); + + // 批量插入大小 + private static final int BATCH_SIZE = 1000; + + // 分页大小 + private static final int PAGE_SIZE = 500; + + // 启动迁移 + public void startMigration() { + log.info("开始迁移数据..."); + + // 1. 先加载目标库中已存在的 name(只需一次) + loadExistingNamesToMemory(); + + // 2. 分页读取源数据 + + LambdaQueryWrapper jqlqw = new LambdaQueryWrapper<>(); + jqlqw.likeRight(Jqd::getXzqh,"3401") + .ge(Jqd::getScbjsj,"2025-07-01") + .isNotNull(Jqd::getSfdz) + .ne(Jqd::getSfdz,"地址不详") + .ne(Jqd::getSfdz,"地址未知") + .ne(Jqd::getXzb,0) + .eq(Jqd::getYxbz,"1"); + long totalCount = sourceMapper.selectCount(jqlqw); + log.info("查询数据大小={}",totalCount); + long totalPages = (totalCount + PAGE_SIZE - 1) / PAGE_SIZE; + PageQuery pageQuery = new PageQuery(); + jqlqw.orderByDesc(Jqd::getScbjsj); + pageQuery.setPageSize(PAGE_SIZE); + for (int pageNum = 0; pageNum < totalPages; pageNum++) { + long offset = pageNum * PAGE_SIZE; + pageQuery.setPageNum(pageNum); + + + log.info("正在处理第 {}/{} 页, offset={}", pageNum + 1, totalPages, offset); + + List pageData; + try { + IPage jqPage = sourceMapper.selectPage(pageQuery.build(), jqlqw); + pageData = jqPage.getRecords(); + } catch (Exception e) { + log.error("读取第 {} 页失败", pageNum, e); + continue; + } + + if (pageData.isEmpty()) continue; + + // 并发处理当前页 + processPageAsync(pageData); + } + + log.info("✅ 数据迁移任务全部提交完成"); + } + + // 加载目标库中已存在的 name 到内存(避免重复插入) + private void loadExistingNamesToMemory() { + Long count = poiAddressMapper.selectCount(null); + if (count == 0) { + log.info("目标表为空,首次运行,跳过加载"); + return; + } + + log.info("正在加载已有 name 到内存..."); + + // ✅ 改用 selectList + LambdaQueryWrapper + List list = poiAddressMapper.selectList( + new LambdaQueryWrapper() + .select(PoiAddress::getName) + ); + + List names = list.stream() + .map(PoiAddress::getName) + .filter(Objects::nonNull) + .toList(); + + processedNames.addAll(names); + log.info("✅ 已加载 {} 个 name", names.size()); + } + + // 异步处理一页数据 + private void processPageAsync(List pageData) { + log.info("✅ 提交异步任务,页数据量: {}", pageData.size()); + CompletableFuture.runAsync(() -> { + try { + List batch = new ArrayList<>(BATCH_SIZE); + + for (Jqd src : pageData) { + try { + String query = src.getSfdz(); + if (StrUtil.isBlank(query)) continue; + log.debug("📡 正在调用第三方 API 查询: {}", query); // ✅ 加日志 + // 调用接口获取解析结果 + ParsedResult result = callThirdPartyApi(query); + if (result == null || StrUtil.isBlank(result.getName())) { + continue; + } + + // 内存去重:跳过已处理的 name + if (processedNames.contains(result.getName())) { + log.debug("跳过已存在 name: {}", result.getName()); + continue; + } + + // 如果POI和AOI都为空 则为垃圾地址 不进行入库 + if ("".equals(result.getAoi()) && !"".equals(result.getPoi())) { + // 如果AOI为空 构造POI目标对象 则不会再入一条 AOI地址 即poi、aoi都相同的地址 + PoiAddress poi = new PoiAddress(); + poi.setName(result.getPoi()); + poi.setAoi(result.getAoi()); + poi.setAddress(result.getAddress()); + poi.setLatitude(src.getYzb()); + poi.setLongitude(src.getXzb()); + poi.setGadwdm(src.getJzxqdwdm()); + poi.setGadwmc(src.getJzxqdwmc()); + poi.setPoiTypeCode(src.getSfdzfldm()); + poi.setPoiTypeName(src.getSfdzflmc()); + poi.setType("1"); + poi.setSource("警情"); + + batch.add(poi); + } else if (!"".equals(result.getAoi()) && "".equals(result.getPoi())) { + // 如果AOI不为空 poi为空 则入一条 AOI地址 即poi、aoi都相同的地址 + PoiAddress poi = new PoiAddress(); + poi.setName(result.getAoi()); + poi.setAoi(result.getAoi()); + poi.setAddress(result.getAddress()); + poi.setLatitude(src.getYzb()); + poi.setLongitude(src.getXzb()); + poi.setGadwdm(src.getJzxqdwdm()); + poi.setGadwmc(src.getJzxqdwmc()); + poi.setPoiTypeCode(src.getSfdzfldm()); + poi.setPoiTypeName(src.getSfdzflmc()); + poi.setType("2"); + poi.setSource("警情"); + + batch.add(poi); + } else if (!"".equals(result.getAoi()) && !"".equals(result.getPoi())) { + // 如果AOI、POI都不为空 则会入两条数据 一条AOI数据address中把poi替换为空 然后入一条AOI、POI相同的数据、一条POI数据 + PoiAddress poi = new PoiAddress(); + poi.setName(result.getPoi()); + poi.setAoi(result.getAoi()); + poi.setAddress(result.getAddress()); + poi.setLatitude(src.getYzb()); + poi.setLongitude(src.getXzb()); + poi.setGadwdm(src.getJzxqdwdm()); + poi.setGadwmc(src.getJzxqdwmc()); + poi.setPoiTypeCode(src.getSfdzfldm()); + poi.setPoiTypeName(src.getSfdzflmc()); + poi.setType("1"); + poi.setSource("警情"); + + batch.add(poi); + + PoiAddress aoi = new PoiAddress(); + aoi.setName(result.getAoi()); + aoi.setAoi(result.getAoi()); + aoi.setAddress(result.getAddress().replaceAll(result.getPoi(),"")); + aoi.setLatitude(src.getYzb()); + aoi.setLongitude(src.getXzb()); + aoi.setGadwdm(src.getJzxqdwdm()); + aoi.setGadwmc(src.getJzxqdwmc()); + aoi.setPoiTypeCode(src.getSfdzfldm()); + aoi.setPoiTypeName(src.getSfdzflmc()); + aoi.setType("2"); + aoi.setSource("警情"); + + batch.add(aoi); + } + + // 达到批次大小,尝试入库 + if (batch.size() >= BATCH_SIZE) { + flushBatch(batch); + } + + } catch (Exception e) { + log.error("处理地址失败: {}", src.getSfdz(), e); + } + } + + // 处理最后一批 + if (!batch.isEmpty()) { + flushBatch(batch); + } + + }catch (Throwable t) { + // 🚨 捕获所有错误(包括 Error) + log.error("❌ processPageAsync 任务内部发生致命错误", t); + } + + }, executorService).exceptionally(throwable -> { + // 🚨 捕获 CompletableFuture 本身的异常 + log.error("❌ CompletableFuture 执行失败", throwable); + return null; + }); + } + + // 调用第三方接口 + private ParsedResult callThirdPartyApi(String query) { + try { + Map headers = Map.of( + "Authorization", "Bearer " + bearerToken, + "Content-Type", "application/json" + ); + + JSONObject requestBody = JSONUtil.createObj() + .set("response_mode", "blocking") + .set("user", "qiaobin@cmcc.com") + .set("inputs", JSONUtil.createObj().set("query", query)); + + String response = HttpUtil.createPost(apiUrl) + .addHeaders(headers) + .body(requestBody.toString()) + .timeout(10000) + .execute() + .body(); + + JSONObject json = JSONUtil.parseObj(response); + String answer = json.getStr("answer"); + + if (StrUtil.isBlank(answer)) return null; + + String jsonStr = extractJsonFromMarkdown(answer); + if (jsonStr == null) return null; + + JSONObject data = JSONUtil.parseObj(jsonStr); + String aoi = data.getStr("aoi"); + String poi = data.getStr("poi"); + String addr = data.getStr("address"); + + String name = StrUtil.isNotBlank(aoi) ? aoi : + (StrUtil.isNotBlank(addr) ? addr : null); + + if (name == null) return null; + + return new ParsedResult(name, aoi, poi, addr); + + } catch (Exception e) { + log.warn("调用接口失败: {}", query, e); + return null; // 可加入重试机制 + } + } + + // 提取 ```json{...}``` 中内容 + private String extractJsonFromMarkdown(String answer) { + Pattern pattern = Pattern.compile("```json\\s*([\\s\\S]*?)\\s*```", Pattern.CASE_INSENSITIVE); + Matcher matcher = pattern.matcher(answer); + if (matcher.find()) { + return matcher.group(1).trim(); + } + if (JSONUtil.isJson(answer.trim())) { + return answer.trim(); + } + return null; + } + + // 批量入库 + 更新内存集合 + private void flushBatch(List batch) { + if (batch.isEmpty()) return; + + try { + // 使用 MyBatis-Plus 批量插入(注意:不是 update) + boolean saved = poiAddressMapper.insertBatch(batch); + if (saved) { + // 成功后更新内存去重集合 + batch.forEach(loc -> processedNames.add(loc.getName())); + log.info("✅ 批量插入 {} 条数据", batch.size()); + } + } catch (DuplicateKeyException e) { + // 如果有重复,逐条插入(可选) + for (PoiAddress loc : batch) { + if (processedNames.contains(loc.getName())) continue; + try { + poiAddressMapper.insert(loc); + processedNames.add(loc.getName()); + } catch (DuplicateKeyException ex) { + // 忽略 + } catch (Exception ex) { + log.error("单条插入失败: {}", loc.getName(), ex); + } + } + } catch (Exception e) { + log.error("批量插入失败,降级为单条处理", e); + // 降级处理... + } + + batch.clear(); + } + + @PreDestroy + public void shutdown() { + executorService.shutdown(); + } + + // 辅助类 + private static class ParsedResult { + private final String name, aoi, poi, address; + public ParsedResult(String name, String aoi, String poi, String address) { + this.name = name; + this.aoi = aoi; + this.poi = poi; + this.address = address; + } + public String getName() { return name; } + public String getAoi() { return aoi; } + public String getPoi() { return poi; } + public String getAddress() { return address; } + } +} diff --git a/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/service/impl/JqdServiceImpl.java b/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/service/impl/JqdServiceImpl.java new file mode 100644 index 00000000..8f4f2bf9 --- /dev/null +++ b/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/service/impl/JqdServiceImpl.java @@ -0,0 +1,13 @@ +package org.dromara.extract.service.impl; + + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.dromara.extract.service.IJqdService; +import org.springframework.stereotype.Service; + +@Slf4j +@RequiredArgsConstructor +@Service +public class JqdServiceImpl implements IJqdService { +} diff --git a/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/service/impl/PoiAddressServiceImpl.java b/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/service/impl/PoiAddressServiceImpl.java new file mode 100644 index 00000000..f72494f5 --- /dev/null +++ b/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/service/impl/PoiAddressServiceImpl.java @@ -0,0 +1,13 @@ +package org.dromara.extract.service.impl; + + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.dromara.extract.service.IPoiAddressService; +import org.springframework.stereotype.Service; + +@Slf4j +@RequiredArgsConstructor +@Service +public class PoiAddressServiceImpl implements IPoiAddressService { +} diff --git a/stwzhj-modules/stwzhj-extract/src/main/resources/application.yml b/stwzhj-modules/stwzhj-extract/src/main/resources/application.yml new file mode 100644 index 00000000..fa2e68ea --- /dev/null +++ b/stwzhj-modules/stwzhj-extract/src/main/resources/application.yml @@ -0,0 +1,34 @@ +# Tomcat +server: + port: 9205 + +# Spring +spring: + application: + # 应用名称 + name: stwzhj-extract + profiles: + # 环境配置 + active: @profiles.active@ + +--- # nacos 配置 +spring: + cloud: + nacos: + # nacos 服务地址 + server-addr: @nacos.server@ + username: @nacos.username@ + password: @nacos.password@ + discovery: + # 注册组 + group: @nacos.discovery.group@ + namespace: ${spring.profiles.active} + config: + # 配置组 + group: @nacos.config.group@ + namespace: ${spring.profiles.active} + config: + import: + - optional:nacos:application-common.yml + - optional:nacos:datasource.yml + - optional:nacos:${spring.application.name}.yml diff --git a/stwzhj-modules/stwzhj-extract/src/main/resources/logback-plus.xml b/stwzhj-modules/stwzhj-extract/src/main/resources/logback-plus.xml new file mode 100644 index 00000000..35182fea --- /dev/null +++ b/stwzhj-modules/stwzhj-extract/src/main/resources/logback-plus.xml @@ -0,0 +1,49 @@ + + + + + + + + + + + ${log.path}/info.${log.file}.log + + INFO + ACCEPT + DENY + + + ${log.path}/info/info.${log.file}.%d{yyyy-MM-dd}.%i.log.gz + ${MAX_FILE_SIZE} + ${MAX_HISTORY} + + + %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + ${log.path}/error.${log.file}.log + + ERROR + + + ${log.path}/error/error.${log.file}.%d{yyyy-MM-dd}.%i.log.gz + ${MAX_FILE_SIZE} + ${MAX_HISTORY} + + + %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + + + + diff --git a/stwzhj-modules/stwzhj-extract/src/main/resources/mapper/extract/AoiAddressMapper.xml b/stwzhj-modules/stwzhj-extract/src/main/resources/mapper/extract/AoiAddressMapper.xml new file mode 100644 index 00000000..18478e86 --- /dev/null +++ b/stwzhj-modules/stwzhj-extract/src/main/resources/mapper/extract/AoiAddressMapper.xml @@ -0,0 +1,7 @@ + + + + + diff --git a/stwzhj-modules/stwzhj-extract/src/main/resources/mapper/extract/JqdMapper.xml b/stwzhj-modules/stwzhj-extract/src/main/resources/mapper/extract/JqdMapper.xml new file mode 100644 index 00000000..cc1a03ef --- /dev/null +++ b/stwzhj-modules/stwzhj-extract/src/main/resources/mapper/extract/JqdMapper.xml @@ -0,0 +1,16 @@ + + + + + + + + + + + diff --git a/stwzhj-modules/stwzhj-extract/src/main/resources/mapper/extract/PoiAddressMapper.xml b/stwzhj-modules/stwzhj-extract/src/main/resources/mapper/extract/PoiAddressMapper.xml new file mode 100644 index 00000000..c75c5460 --- /dev/null +++ b/stwzhj-modules/stwzhj-extract/src/main/resources/mapper/extract/PoiAddressMapper.xml @@ -0,0 +1,7 @@ + + + + + diff --git a/stwzhj-modules/stwzhj-extract/src/main/resources/mapper/package-info.md b/stwzhj-modules/stwzhj-extract/src/main/resources/mapper/package-info.md new file mode 100644 index 00000000..c938b1e5 --- /dev/null +++ b/stwzhj-modules/stwzhj-extract/src/main/resources/mapper/package-info.md @@ -0,0 +1,3 @@ +java包使用 `.` 分割 resource 目录使用 `/` 分割 +
+此文件目的 防止文件夹粘连找不到 `xml` 文件 \ No newline at end of file