省厅位置汇聚添加抽取警情功能(暂用)
parent
e5a166d4a3
commit
13f46756c8
|
|
@ -17,6 +17,7 @@
|
||||||
<module>stwzhj-data2es</module>
|
<module>stwzhj-data2es</module>
|
||||||
<module>stwzhj-baseToSt</module>
|
<module>stwzhj-baseToSt</module>
|
||||||
<module>stwzhj-data2StKafka</module>
|
<module>stwzhj-data2StKafka</module>
|
||||||
|
<module>stwzhj-extract</module>
|
||||||
</modules>
|
</modules>
|
||||||
|
|
||||||
<artifactId>stwzhj-modules</artifactId>
|
<artifactId>stwzhj-modules</artifactId>
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,91 @@
|
||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||||
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||||
|
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||||
|
<parent>
|
||||||
|
<artifactId>stwzhj-modules</artifactId>
|
||||||
|
<groupId>org.dromara</groupId>
|
||||||
|
<version>2.2.2</version>
|
||||||
|
</parent>
|
||||||
|
<modelVersion>4.0.0</modelVersion>
|
||||||
|
|
||||||
|
<artifactId>stwzhj-extract</artifactId>
|
||||||
|
<description>
|
||||||
|
抽取警情拆分
|
||||||
|
</description>
|
||||||
|
|
||||||
|
<properties>
|
||||||
|
<maven.compiler.source>17</maven.compiler.source>
|
||||||
|
<maven.compiler.target>17</maven.compiler.target>
|
||||||
|
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||||
|
</properties>
|
||||||
|
|
||||||
|
<dependencies>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.dromara</groupId>
|
||||||
|
<artifactId>stwzhj-common-nacos</artifactId>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.dromara</groupId>
|
||||||
|
<artifactId>stwzhj-common-sentinel</artifactId>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<!-- RuoYi Common Log -->
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.dromara</groupId>
|
||||||
|
<artifactId>stwzhj-common-log</artifactId>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.dromara</groupId>
|
||||||
|
<artifactId>stwzhj-common-dict</artifactId>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.dromara</groupId>
|
||||||
|
<artifactId>stwzhj-common-doc</artifactId>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.dromara</groupId>
|
||||||
|
<artifactId>stwzhj-common-web</artifactId>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.dromara</groupId>
|
||||||
|
<artifactId>stwzhj-common-mybatis</artifactId>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.dromara</groupId>
|
||||||
|
<artifactId>stwzhj-common-dubbo</artifactId>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.github.jeffreyning</groupId>
|
||||||
|
<artifactId>mybatisplus-plus</artifactId>
|
||||||
|
<version>1.5.1-RELEASE</version>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
</dependencies>
|
||||||
|
|
||||||
|
<build>
|
||||||
|
<finalName>${project.artifactId}</finalName>
|
||||||
|
<plugins>
|
||||||
|
<plugin>
|
||||||
|
<groupId>org.springframework.boot</groupId>
|
||||||
|
<artifactId>spring-boot-maven-plugin</artifactId>
|
||||||
|
<version>${spring-boot.version}</version>
|
||||||
|
<executions>
|
||||||
|
<execution>
|
||||||
|
<goals>
|
||||||
|
<goal>repackage</goal>
|
||||||
|
</goals>
|
||||||
|
</execution>
|
||||||
|
</executions>
|
||||||
|
</plugin>
|
||||||
|
</plugins>
|
||||||
|
</build>
|
||||||
|
|
||||||
|
</project>
|
||||||
|
|
@ -0,0 +1,24 @@
|
||||||
|
package org.dromara.extract;
|
||||||
|
|
||||||
|
import org.apache.dubbo.config.spring.context.annotation.EnableDubbo;
|
||||||
|
import org.springframework.boot.SpringApplication;
|
||||||
|
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||||
|
import org.springframework.boot.context.metrics.buffering.BufferingApplicationStartup;
|
||||||
|
import org.springframework.scheduling.annotation.EnableScheduling;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 系统模块
|
||||||
|
*
|
||||||
|
* @author ruoyi
|
||||||
|
*/
|
||||||
|
@EnableDubbo
|
||||||
|
@EnableScheduling
|
||||||
|
@SpringBootApplication
|
||||||
|
public class ExtractApplication {
|
||||||
|
public static void main(String[] args) {
|
||||||
|
SpringApplication application = new SpringApplication(ExtractApplication.class);
|
||||||
|
application.setApplicationStartup(new BufferingApplicationStartup(2048));
|
||||||
|
application.run(args);
|
||||||
|
System.out.println("(♥◠‿◠)ノ゙ 抽取模块启动成功 ლ(´ڡ`ლ)゙ ");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,27 @@
|
||||||
|
package org.dromara.extract.config;
|
||||||
|
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
import org.springframework.scheduling.annotation.EnableAsync;
|
||||||
|
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||||
|
|
||||||
|
import java.util.concurrent.Executor;
|
||||||
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
|
|
||||||
|
@Configuration
|
||||||
|
@EnableAsync
|
||||||
|
public class AsyncConfig {
|
||||||
|
|
||||||
|
|
||||||
|
@Bean("migrationExecutor")
|
||||||
|
public Executor migrationExecutor() {
|
||||||
|
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
|
||||||
|
executor.setCorePoolSize(2);
|
||||||
|
executor.setMaxPoolSize(4);
|
||||||
|
executor.setQueueCapacity(100);
|
||||||
|
executor.setThreadNamePrefix("migration-task-");
|
||||||
|
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // 防止丢任务
|
||||||
|
executor.initialize();
|
||||||
|
return executor;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,72 @@
|
||||||
|
package org.dromara.extract.controller;
|
||||||
|
|
||||||
|
import lombok.RequiredArgsConstructor;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.dromara.common.web.core.BaseController;
|
||||||
|
import org.dromara.extract.service.impl.DataMigrationService;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.http.ResponseEntity;
|
||||||
|
import org.springframework.web.bind.annotation.GetMapping;
|
||||||
|
import org.springframework.web.bind.annotation.RestController;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
@RequiredArgsConstructor
|
||||||
|
@RestController
|
||||||
|
@Slf4j
|
||||||
|
public class AddressController extends BaseController {
|
||||||
|
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private DataMigrationService dataMigrationService;
|
||||||
|
|
||||||
|
// 用于监控任务状态
|
||||||
|
private boolean isRunning = false;
|
||||||
|
|
||||||
|
@GetMapping("/start")
|
||||||
|
public ResponseEntity<String> startMigration() {
|
||||||
|
if (isRunning) {
|
||||||
|
return ResponseEntity.badRequest()
|
||||||
|
.body("迁移任务已在运行中,请勿重复提交!");
|
||||||
|
}
|
||||||
|
|
||||||
|
// 异步启动任务
|
||||||
|
isRunning = true;
|
||||||
|
new Thread(() -> {
|
||||||
|
try {
|
||||||
|
log.info("🚀 开始异步执行数据迁移任务...");
|
||||||
|
dataMigrationService.startMigration();
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("❌ 迁移任务执行异常", e);
|
||||||
|
} finally {
|
||||||
|
isRunning = false;
|
||||||
|
}
|
||||||
|
}).start();
|
||||||
|
|
||||||
|
return ResponseEntity.ok("✅ 迁移任务已提交,正在后台执行...");
|
||||||
|
}
|
||||||
|
|
||||||
|
@GetMapping("/status")
|
||||||
|
public ResponseEntity<Map<String, Object>> getStatus() {
|
||||||
|
Map<String, Object> status = new HashMap<>();
|
||||||
|
status.put("isRunning", isRunning);
|
||||||
|
status.put("message", isRunning ? "任务运行中" : "空闲");
|
||||||
|
return ResponseEntity.ok(status);
|
||||||
|
}
|
||||||
|
|
||||||
|
@GetMapping("/stop")
|
||||||
|
public ResponseEntity<String> stopMigration() {
|
||||||
|
// 注意:当前线程池没有实现优雅关闭
|
||||||
|
// 这里只是一个提示,实际需要在 Service 中支持中断
|
||||||
|
if (!isRunning) {
|
||||||
|
return ResponseEntity.ok("任务未运行,无需停止");
|
||||||
|
}
|
||||||
|
|
||||||
|
// ⚠️ 提示用户:无法强制停止,建议手动关闭应用或增强 Service
|
||||||
|
return ResponseEntity.ok("⚠️ 停止功能暂不支持。请重启应用或增强线程控制逻辑。");
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,60 @@
|
||||||
|
package org.dromara.extract.domain;
|
||||||
|
|
||||||
|
|
||||||
|
import com.baomidou.mybatisplus.annotation.TableName;
|
||||||
|
import lombok.Data;
|
||||||
|
|
||||||
|
import java.io.Serial;
|
||||||
|
import java.io.Serializable;
|
||||||
|
|
||||||
|
@Data
|
||||||
|
@TableName("aoi_address")
|
||||||
|
public class AoiAddress implements Serializable {
|
||||||
|
|
||||||
|
@Serial
|
||||||
|
private static final long serialVersionUID = 1L;
|
||||||
|
|
||||||
|
//ID
|
||||||
|
private String id;
|
||||||
|
|
||||||
|
//地址名称
|
||||||
|
private String name;
|
||||||
|
|
||||||
|
//详细地址
|
||||||
|
private String address;
|
||||||
|
|
||||||
|
//省
|
||||||
|
private String provinceName;
|
||||||
|
|
||||||
|
//市
|
||||||
|
private String cityName;
|
||||||
|
|
||||||
|
//区县
|
||||||
|
private String districtName;
|
||||||
|
|
||||||
|
//开发区
|
||||||
|
private String devzone;
|
||||||
|
|
||||||
|
//乡镇街道
|
||||||
|
private String townName;
|
||||||
|
|
||||||
|
//社区村
|
||||||
|
private String communityName;
|
||||||
|
|
||||||
|
// 纬度
|
||||||
|
private String latitude;
|
||||||
|
|
||||||
|
//经度
|
||||||
|
private String longitude;
|
||||||
|
|
||||||
|
|
||||||
|
private String poiTypeCode;
|
||||||
|
|
||||||
|
//地址类型
|
||||||
|
private String poiTypeName;
|
||||||
|
|
||||||
|
|
||||||
|
//数据来源
|
||||||
|
private String source;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,35 @@
|
||||||
|
package org.dromara.extract.domain;
|
||||||
|
|
||||||
|
import lombok.Data;
|
||||||
|
|
||||||
|
import java.io.Serial;
|
||||||
|
import java.io.Serializable;
|
||||||
|
|
||||||
|
@Data
|
||||||
|
public class Jqd implements Serializable {
|
||||||
|
|
||||||
|
@Serial
|
||||||
|
private static final long serialVersionUID = 1L;
|
||||||
|
|
||||||
|
private String sfdz;
|
||||||
|
|
||||||
|
private String xzqh;
|
||||||
|
|
||||||
|
private String yxbz;
|
||||||
|
|
||||||
|
private String sfdzfldm;
|
||||||
|
|
||||||
|
private String sfdzflmc;
|
||||||
|
|
||||||
|
private String xzb;
|
||||||
|
|
||||||
|
private String yzb;
|
||||||
|
|
||||||
|
private String jzxqdwdm;
|
||||||
|
|
||||||
|
private String jzxqdwmc;
|
||||||
|
|
||||||
|
private String scbjsj;
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,75 @@
|
||||||
|
package org.dromara.extract.domain;
|
||||||
|
|
||||||
|
|
||||||
|
import com.baomidou.mybatisplus.annotation.TableName;
|
||||||
|
import lombok.Data;
|
||||||
|
import lombok.EqualsAndHashCode;
|
||||||
|
|
||||||
|
import java.io.Serial;
|
||||||
|
import java.io.Serializable;
|
||||||
|
|
||||||
|
@Data
|
||||||
|
@TableName("poi_address")
|
||||||
|
public class PoiAddress implements Serializable {
|
||||||
|
@Serial
|
||||||
|
private static final long serialVersionUID = 1L;
|
||||||
|
|
||||||
|
//ID
|
||||||
|
private String id;
|
||||||
|
|
||||||
|
//地址名称
|
||||||
|
private String name;
|
||||||
|
|
||||||
|
//详细地址
|
||||||
|
private String address;
|
||||||
|
|
||||||
|
//省
|
||||||
|
private String provinceName;
|
||||||
|
|
||||||
|
//市
|
||||||
|
private String cityName;
|
||||||
|
|
||||||
|
//区县
|
||||||
|
private String districtName;
|
||||||
|
|
||||||
|
//开发区
|
||||||
|
private String devzone;
|
||||||
|
|
||||||
|
//乡镇街道
|
||||||
|
private String townName;
|
||||||
|
|
||||||
|
//社区村
|
||||||
|
private String communityName;
|
||||||
|
|
||||||
|
// 纬度
|
||||||
|
private String latitude;
|
||||||
|
|
||||||
|
//经度
|
||||||
|
private String longitude;
|
||||||
|
|
||||||
|
|
||||||
|
private String fullName;
|
||||||
|
|
||||||
|
|
||||||
|
private String poiTypeCode;
|
||||||
|
|
||||||
|
//地址类型
|
||||||
|
private String poiTypeName;
|
||||||
|
|
||||||
|
//兴趣面
|
||||||
|
private String aoi;
|
||||||
|
|
||||||
|
private Integer status;
|
||||||
|
|
||||||
|
//数据来源
|
||||||
|
private String source;
|
||||||
|
|
||||||
|
//类型 1 poi 2 aoi
|
||||||
|
private String type;
|
||||||
|
|
||||||
|
private String gadwdm;
|
||||||
|
|
||||||
|
private String gadwmc;
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,8 @@
|
||||||
|
package org.dromara.extract.domain.dto;
|
||||||
|
|
||||||
|
public class Inputs {
|
||||||
|
private String query;
|
||||||
|
public Inputs(String query) { this.query = query; }
|
||||||
|
public String getQuery() { return query; }
|
||||||
|
public void setQuery(String query) { this.query = query; }
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,12 @@
|
||||||
|
package org.dromara.extract.domain.dto;
|
||||||
|
|
||||||
|
import lombok.Data;
|
||||||
|
|
||||||
|
// 内部 JSON 结构
|
||||||
|
@Data
|
||||||
|
public class ParsedAnswer {
|
||||||
|
private String aoi;
|
||||||
|
private String poi;
|
||||||
|
private String address;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,23 @@
|
||||||
|
package org.dromara.extract.domain.dto;
|
||||||
|
|
||||||
|
// 请求体
|
||||||
|
public class QueryRequest {
|
||||||
|
private Inputs inputs;
|
||||||
|
private String response_mode = "blocking";
|
||||||
|
private String user = "";
|
||||||
|
|
||||||
|
// 构造函数
|
||||||
|
public QueryRequest(String query) {
|
||||||
|
this.inputs = new Inputs(query);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Getters and Setters
|
||||||
|
public Inputs getInputs() { return inputs; }
|
||||||
|
public void setInputs(Inputs inputs) { this.inputs = inputs; }
|
||||||
|
public String getResponse_mode() { return response_mode; }
|
||||||
|
public void setResponse_mode(String response_mode) { this.response_mode = response_mode; }
|
||||||
|
public String getUser() { return user; }
|
||||||
|
public void setUser(String user) { this.user = user; }
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -0,0 +1,6 @@
|
||||||
|
package org.dromara.extract.mapper;
|
||||||
|
|
||||||
|
import org.dromara.common.mybatis.core.mapper.BaseMapperPlus;
|
||||||
|
|
||||||
|
public interface AoiAddressMapper extends BaseMapperPlus {
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,16 @@
|
||||||
|
package org.dromara.extract.mapper;
|
||||||
|
|
||||||
|
import com.baomidou.dynamic.datasource.annotation.DS;
|
||||||
|
import com.baomidou.mybatisplus.core.conditions.Wrapper;
|
||||||
|
import com.baomidou.mybatisplus.core.toolkit.Constants;
|
||||||
|
import org.apache.ibatis.annotations.Param;
|
||||||
|
import org.dromara.common.mybatis.core.mapper.BaseMapperPlus;
|
||||||
|
import org.dromara.extract.domain.Jqd;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
@DS("slave")
|
||||||
|
public interface JqdMapper extends BaseMapperPlus<Jqd,Jqd> {
|
||||||
|
|
||||||
|
List<Jqd> selectJq(@Param(Constants.WRAPPER) Wrapper<Jqd> queryWrapper);
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,8 @@
|
||||||
|
package org.dromara.extract.mapper;
|
||||||
|
|
||||||
|
import org.dromara.common.mybatis.core.mapper.BaseMapperPlus;
|
||||||
|
import org.dromara.extract.domain.PoiAddress;
|
||||||
|
|
||||||
|
public interface PoiAddressMapper extends BaseMapperPlus<PoiAddress,PoiAddress> {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,4 @@
|
||||||
|
package org.dromara.extract.service;
|
||||||
|
|
||||||
|
public interface IAoiAddressService {
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,4 @@
|
||||||
|
package org.dromara.extract.service;
|
||||||
|
|
||||||
|
public interface IJqdService {
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,4 @@
|
||||||
|
package org.dromara.extract.service;
|
||||||
|
|
||||||
|
public interface IPoiAddressService {
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,13 @@
|
||||||
|
package org.dromara.extract.service.impl;
|
||||||
|
|
||||||
|
|
||||||
|
import lombok.RequiredArgsConstructor;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.dromara.extract.service.IAoiAddressService;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
@RequiredArgsConstructor
|
||||||
|
@Service
|
||||||
|
public class AoiAddressServiceImpl implements IAoiAddressService {
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,363 @@
|
||||||
|
package org.dromara.extract.service.impl;
|
||||||
|
|
||||||
|
import cn.hutool.core.util.StrUtil;
|
||||||
|
import cn.hutool.http.HttpUtil;
|
||||||
|
import cn.hutool.json.JSONObject;
|
||||||
|
import cn.hutool.json.JSONUtil;
|
||||||
|
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||||
|
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
|
||||||
|
import com.baomidou.mybatisplus.core.metadata.IPage;
|
||||||
|
import jakarta.annotation.PreDestroy;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.dromara.common.mybatis.core.page.PageQuery;
|
||||||
|
import org.dromara.extract.domain.Jqd;
|
||||||
|
import org.dromara.extract.domain.PoiAddress;
|
||||||
|
import org.dromara.extract.mapper.AoiAddressMapper;
|
||||||
|
import org.dromara.extract.mapper.JqdMapper;
|
||||||
|
import org.dromara.extract.mapper.PoiAddressMapper;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.dao.DuplicateKeyException;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
import java.util.*;
|
||||||
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.regex.Matcher;
|
||||||
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
|
@Service
|
||||||
|
@Slf4j
|
||||||
|
public class DataMigrationService {
|
||||||
|
|
||||||
|
@Value("${api.thirdparty.url}")
|
||||||
|
private String apiUrl;
|
||||||
|
|
||||||
|
@Value("${api.thirdparty.token}")
|
||||||
|
private String bearerToken;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private JqdMapper sourceMapper;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private PoiAddressMapper poiAddressMapper;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private AoiAddressMapper aoiAddressMapper;
|
||||||
|
|
||||||
|
// 内存去重集合(所有已成功入库的 name)
|
||||||
|
private final Set<String> processedNames = ConcurrentHashMap.newKeySet();
|
||||||
|
|
||||||
|
// 固定线程池(建议:CPU 核数 * 2 ~ 4)
|
||||||
|
private final ExecutorService executorService = Executors.newFixedThreadPool(4);
|
||||||
|
|
||||||
|
// 批量插入大小
|
||||||
|
private static final int BATCH_SIZE = 1000;
|
||||||
|
|
||||||
|
// 分页大小
|
||||||
|
private static final int PAGE_SIZE = 500;
|
||||||
|
|
||||||
|
// 启动迁移
|
||||||
|
public void startMigration() {
|
||||||
|
log.info("开始迁移数据...");
|
||||||
|
|
||||||
|
// 1. 先加载目标库中已存在的 name(只需一次)
|
||||||
|
loadExistingNamesToMemory();
|
||||||
|
|
||||||
|
// 2. 分页读取源数据
|
||||||
|
|
||||||
|
LambdaQueryWrapper<Jqd> jqlqw = new LambdaQueryWrapper<>();
|
||||||
|
jqlqw.likeRight(Jqd::getXzqh,"3401")
|
||||||
|
.ge(Jqd::getScbjsj,"2025-07-01")
|
||||||
|
.isNotNull(Jqd::getSfdz)
|
||||||
|
.ne(Jqd::getSfdz,"地址不详")
|
||||||
|
.ne(Jqd::getSfdz,"地址未知")
|
||||||
|
.ne(Jqd::getXzb,0)
|
||||||
|
.eq(Jqd::getYxbz,"1");
|
||||||
|
long totalCount = sourceMapper.selectCount(jqlqw);
|
||||||
|
log.info("查询数据大小={}",totalCount);
|
||||||
|
long totalPages = (totalCount + PAGE_SIZE - 1) / PAGE_SIZE;
|
||||||
|
PageQuery pageQuery = new PageQuery();
|
||||||
|
jqlqw.orderByDesc(Jqd::getScbjsj);
|
||||||
|
pageQuery.setPageSize(PAGE_SIZE);
|
||||||
|
for (int pageNum = 0; pageNum < totalPages; pageNum++) {
|
||||||
|
long offset = pageNum * PAGE_SIZE;
|
||||||
|
pageQuery.setPageNum(pageNum);
|
||||||
|
|
||||||
|
|
||||||
|
log.info("正在处理第 {}/{} 页, offset={}", pageNum + 1, totalPages, offset);
|
||||||
|
|
||||||
|
List<Jqd> pageData;
|
||||||
|
try {
|
||||||
|
IPage jqPage = sourceMapper.selectPage(pageQuery.build(), jqlqw);
|
||||||
|
pageData = jqPage.getRecords();
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("读取第 {} 页失败", pageNum, e);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pageData.isEmpty()) continue;
|
||||||
|
|
||||||
|
// 并发处理当前页
|
||||||
|
processPageAsync(pageData);
|
||||||
|
}
|
||||||
|
|
||||||
|
log.info("✅ 数据迁移任务全部提交完成");
|
||||||
|
}
|
||||||
|
|
||||||
|
// 加载目标库中已存在的 name 到内存(避免重复插入)
|
||||||
|
private void loadExistingNamesToMemory() {
|
||||||
|
Long count = poiAddressMapper.selectCount(null);
|
||||||
|
if (count == 0) {
|
||||||
|
log.info("目标表为空,首次运行,跳过加载");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
log.info("正在加载已有 name 到内存...");
|
||||||
|
|
||||||
|
// ✅ 改用 selectList + LambdaQueryWrapper
|
||||||
|
List<PoiAddress> list = poiAddressMapper.selectList(
|
||||||
|
new LambdaQueryWrapper<PoiAddress>()
|
||||||
|
.select(PoiAddress::getName)
|
||||||
|
);
|
||||||
|
|
||||||
|
List<String> names = list.stream()
|
||||||
|
.map(PoiAddress::getName)
|
||||||
|
.filter(Objects::nonNull)
|
||||||
|
.toList();
|
||||||
|
|
||||||
|
processedNames.addAll(names);
|
||||||
|
log.info("✅ 已加载 {} 个 name", names.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
// 异步处理一页数据
|
||||||
|
private void processPageAsync(List<Jqd> pageData) {
|
||||||
|
log.info("✅ 提交异步任务,页数据量: {}", pageData.size());
|
||||||
|
CompletableFuture.runAsync(() -> {
|
||||||
|
try {
|
||||||
|
List<PoiAddress> batch = new ArrayList<>(BATCH_SIZE);
|
||||||
|
|
||||||
|
for (Jqd src : pageData) {
|
||||||
|
try {
|
||||||
|
String query = src.getSfdz();
|
||||||
|
if (StrUtil.isBlank(query)) continue;
|
||||||
|
log.debug("📡 正在调用第三方 API 查询: {}", query); // ✅ 加日志
|
||||||
|
// 调用接口获取解析结果
|
||||||
|
ParsedResult result = callThirdPartyApi(query);
|
||||||
|
if (result == null || StrUtil.isBlank(result.getName())) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 内存去重:跳过已处理的 name
|
||||||
|
if (processedNames.contains(result.getName())) {
|
||||||
|
log.debug("跳过已存在 name: {}", result.getName());
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 如果POI和AOI都为空 则为垃圾地址 不进行入库
|
||||||
|
if ("".equals(result.getAoi()) && !"".equals(result.getPoi())) {
|
||||||
|
// 如果AOI为空 构造POI目标对象 则不会再入一条 AOI地址 即poi、aoi都相同的地址
|
||||||
|
PoiAddress poi = new PoiAddress();
|
||||||
|
poi.setName(result.getPoi());
|
||||||
|
poi.setAoi(result.getAoi());
|
||||||
|
poi.setAddress(result.getAddress());
|
||||||
|
poi.setLatitude(src.getYzb());
|
||||||
|
poi.setLongitude(src.getXzb());
|
||||||
|
poi.setGadwdm(src.getJzxqdwdm());
|
||||||
|
poi.setGadwmc(src.getJzxqdwmc());
|
||||||
|
poi.setPoiTypeCode(src.getSfdzfldm());
|
||||||
|
poi.setPoiTypeName(src.getSfdzflmc());
|
||||||
|
poi.setType("1");
|
||||||
|
poi.setSource("警情");
|
||||||
|
|
||||||
|
batch.add(poi);
|
||||||
|
} else if (!"".equals(result.getAoi()) && "".equals(result.getPoi())) {
|
||||||
|
// 如果AOI不为空 poi为空 则入一条 AOI地址 即poi、aoi都相同的地址
|
||||||
|
PoiAddress poi = new PoiAddress();
|
||||||
|
poi.setName(result.getAoi());
|
||||||
|
poi.setAoi(result.getAoi());
|
||||||
|
poi.setAddress(result.getAddress());
|
||||||
|
poi.setLatitude(src.getYzb());
|
||||||
|
poi.setLongitude(src.getXzb());
|
||||||
|
poi.setGadwdm(src.getJzxqdwdm());
|
||||||
|
poi.setGadwmc(src.getJzxqdwmc());
|
||||||
|
poi.setPoiTypeCode(src.getSfdzfldm());
|
||||||
|
poi.setPoiTypeName(src.getSfdzflmc());
|
||||||
|
poi.setType("2");
|
||||||
|
poi.setSource("警情");
|
||||||
|
|
||||||
|
batch.add(poi);
|
||||||
|
} else if (!"".equals(result.getAoi()) && !"".equals(result.getPoi())) {
|
||||||
|
// 如果AOI、POI都不为空 则会入两条数据 一条AOI数据address中把poi替换为空 然后入一条AOI、POI相同的数据、一条POI数据
|
||||||
|
PoiAddress poi = new PoiAddress();
|
||||||
|
poi.setName(result.getPoi());
|
||||||
|
poi.setAoi(result.getAoi());
|
||||||
|
poi.setAddress(result.getAddress());
|
||||||
|
poi.setLatitude(src.getYzb());
|
||||||
|
poi.setLongitude(src.getXzb());
|
||||||
|
poi.setGadwdm(src.getJzxqdwdm());
|
||||||
|
poi.setGadwmc(src.getJzxqdwmc());
|
||||||
|
poi.setPoiTypeCode(src.getSfdzfldm());
|
||||||
|
poi.setPoiTypeName(src.getSfdzflmc());
|
||||||
|
poi.setType("1");
|
||||||
|
poi.setSource("警情");
|
||||||
|
|
||||||
|
batch.add(poi);
|
||||||
|
|
||||||
|
PoiAddress aoi = new PoiAddress();
|
||||||
|
aoi.setName(result.getAoi());
|
||||||
|
aoi.setAoi(result.getAoi());
|
||||||
|
aoi.setAddress(result.getAddress().replaceAll(result.getPoi(),""));
|
||||||
|
aoi.setLatitude(src.getYzb());
|
||||||
|
aoi.setLongitude(src.getXzb());
|
||||||
|
aoi.setGadwdm(src.getJzxqdwdm());
|
||||||
|
aoi.setGadwmc(src.getJzxqdwmc());
|
||||||
|
aoi.setPoiTypeCode(src.getSfdzfldm());
|
||||||
|
aoi.setPoiTypeName(src.getSfdzflmc());
|
||||||
|
aoi.setType("2");
|
||||||
|
aoi.setSource("警情");
|
||||||
|
|
||||||
|
batch.add(aoi);
|
||||||
|
}
|
||||||
|
|
||||||
|
// 达到批次大小,尝试入库
|
||||||
|
if (batch.size() >= BATCH_SIZE) {
|
||||||
|
flushBatch(batch);
|
||||||
|
}
|
||||||
|
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("处理地址失败: {}", src.getSfdz(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 处理最后一批
|
||||||
|
if (!batch.isEmpty()) {
|
||||||
|
flushBatch(batch);
|
||||||
|
}
|
||||||
|
|
||||||
|
}catch (Throwable t) {
|
||||||
|
// 🚨 捕获所有错误(包括 Error)
|
||||||
|
log.error("❌ processPageAsync 任务内部发生致命错误", t);
|
||||||
|
}
|
||||||
|
|
||||||
|
}, executorService).exceptionally(throwable -> {
|
||||||
|
// 🚨 捕获 CompletableFuture 本身的异常
|
||||||
|
log.error("❌ CompletableFuture 执行失败", throwable);
|
||||||
|
return null;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// 调用第三方接口
|
||||||
|
private ParsedResult callThirdPartyApi(String query) {
|
||||||
|
try {
|
||||||
|
Map<String, String> headers = Map.of(
|
||||||
|
"Authorization", "Bearer " + bearerToken,
|
||||||
|
"Content-Type", "application/json"
|
||||||
|
);
|
||||||
|
|
||||||
|
JSONObject requestBody = JSONUtil.createObj()
|
||||||
|
.set("response_mode", "blocking")
|
||||||
|
.set("user", "qiaobin@cmcc.com")
|
||||||
|
.set("inputs", JSONUtil.createObj().set("query", query));
|
||||||
|
|
||||||
|
String response = HttpUtil.createPost(apiUrl)
|
||||||
|
.addHeaders(headers)
|
||||||
|
.body(requestBody.toString())
|
||||||
|
.timeout(10000)
|
||||||
|
.execute()
|
||||||
|
.body();
|
||||||
|
|
||||||
|
JSONObject json = JSONUtil.parseObj(response);
|
||||||
|
String answer = json.getStr("answer");
|
||||||
|
|
||||||
|
if (StrUtil.isBlank(answer)) return null;
|
||||||
|
|
||||||
|
String jsonStr = extractJsonFromMarkdown(answer);
|
||||||
|
if (jsonStr == null) return null;
|
||||||
|
|
||||||
|
JSONObject data = JSONUtil.parseObj(jsonStr);
|
||||||
|
String aoi = data.getStr("aoi");
|
||||||
|
String poi = data.getStr("poi");
|
||||||
|
String addr = data.getStr("address");
|
||||||
|
|
||||||
|
String name = StrUtil.isNotBlank(aoi) ? aoi :
|
||||||
|
(StrUtil.isNotBlank(addr) ? addr : null);
|
||||||
|
|
||||||
|
if (name == null) return null;
|
||||||
|
|
||||||
|
return new ParsedResult(name, aoi, poi, addr);
|
||||||
|
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.warn("调用接口失败: {}", query, e);
|
||||||
|
return null; // 可加入重试机制
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 提取 ```json{...}``` 中内容
|
||||||
|
private String extractJsonFromMarkdown(String answer) {
|
||||||
|
Pattern pattern = Pattern.compile("```json\\s*([\\s\\S]*?)\\s*```", Pattern.CASE_INSENSITIVE);
|
||||||
|
Matcher matcher = pattern.matcher(answer);
|
||||||
|
if (matcher.find()) {
|
||||||
|
return matcher.group(1).trim();
|
||||||
|
}
|
||||||
|
if (JSONUtil.isJson(answer.trim())) {
|
||||||
|
return answer.trim();
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 批量入库 + 更新内存集合
|
||||||
|
private void flushBatch(List<PoiAddress> batch) {
|
||||||
|
if (batch.isEmpty()) return;
|
||||||
|
|
||||||
|
try {
|
||||||
|
// 使用 MyBatis-Plus 批量插入(注意:不是 update)
|
||||||
|
boolean saved = poiAddressMapper.insertBatch(batch);
|
||||||
|
if (saved) {
|
||||||
|
// 成功后更新内存去重集合
|
||||||
|
batch.forEach(loc -> processedNames.add(loc.getName()));
|
||||||
|
log.info("✅ 批量插入 {} 条数据", batch.size());
|
||||||
|
}
|
||||||
|
} catch (DuplicateKeyException e) {
|
||||||
|
// 如果有重复,逐条插入(可选)
|
||||||
|
for (PoiAddress loc : batch) {
|
||||||
|
if (processedNames.contains(loc.getName())) continue;
|
||||||
|
try {
|
||||||
|
poiAddressMapper.insert(loc);
|
||||||
|
processedNames.add(loc.getName());
|
||||||
|
} catch (DuplicateKeyException ex) {
|
||||||
|
// 忽略
|
||||||
|
} catch (Exception ex) {
|
||||||
|
log.error("单条插入失败: {}", loc.getName(), ex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("批量插入失败,降级为单条处理", e);
|
||||||
|
// 降级处理...
|
||||||
|
}
|
||||||
|
|
||||||
|
batch.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
@PreDestroy
|
||||||
|
public void shutdown() {
|
||||||
|
executorService.shutdown();
|
||||||
|
}
|
||||||
|
|
||||||
|
// 辅助类
|
||||||
|
private static class ParsedResult {
|
||||||
|
private final String name, aoi, poi, address;
|
||||||
|
public ParsedResult(String name, String aoi, String poi, String address) {
|
||||||
|
this.name = name;
|
||||||
|
this.aoi = aoi;
|
||||||
|
this.poi = poi;
|
||||||
|
this.address = address;
|
||||||
|
}
|
||||||
|
public String getName() { return name; }
|
||||||
|
public String getAoi() { return aoi; }
|
||||||
|
public String getPoi() { return poi; }
|
||||||
|
public String getAddress() { return address; }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,13 @@
|
||||||
|
package org.dromara.extract.service.impl;
|
||||||
|
|
||||||
|
|
||||||
|
import lombok.RequiredArgsConstructor;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.dromara.extract.service.IJqdService;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
@RequiredArgsConstructor
|
||||||
|
@Service
|
||||||
|
public class JqdServiceImpl implements IJqdService {
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,13 @@
|
||||||
|
package org.dromara.extract.service.impl;
|
||||||
|
|
||||||
|
|
||||||
|
import lombok.RequiredArgsConstructor;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.dromara.extract.service.IPoiAddressService;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
@RequiredArgsConstructor
|
||||||
|
@Service
|
||||||
|
public class PoiAddressServiceImpl implements IPoiAddressService {
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,34 @@
|
||||||
|
# Tomcat
|
||||||
|
server:
|
||||||
|
port: 9205
|
||||||
|
|
||||||
|
# Spring
|
||||||
|
spring:
|
||||||
|
application:
|
||||||
|
# 应用名称
|
||||||
|
name: stwzhj-extract
|
||||||
|
profiles:
|
||||||
|
# 环境配置
|
||||||
|
active: @profiles.active@
|
||||||
|
|
||||||
|
--- # nacos 配置
|
||||||
|
spring:
|
||||||
|
cloud:
|
||||||
|
nacos:
|
||||||
|
# nacos 服务地址
|
||||||
|
server-addr: @nacos.server@
|
||||||
|
username: @nacos.username@
|
||||||
|
password: @nacos.password@
|
||||||
|
discovery:
|
||||||
|
# 注册组
|
||||||
|
group: @nacos.discovery.group@
|
||||||
|
namespace: ${spring.profiles.active}
|
||||||
|
config:
|
||||||
|
# 配置组
|
||||||
|
group: @nacos.config.group@
|
||||||
|
namespace: ${spring.profiles.active}
|
||||||
|
config:
|
||||||
|
import:
|
||||||
|
- optional:nacos:application-common.yml
|
||||||
|
- optional:nacos:datasource.yml
|
||||||
|
- optional:nacos:${spring.application.name}.yml
|
||||||
|
|
@ -0,0 +1,49 @@
|
||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<configuration scan="true" scanPeriod="60 seconds" debug="false">
|
||||||
|
<!-- 日志存放路径 -->
|
||||||
|
<property name="log.path" value="logs" />
|
||||||
|
<property name="log.file" value="extract" />
|
||||||
|
<property name="MAX_FILE_SIZE" value="50MB" />
|
||||||
|
<property name="MAX_HISTORY" value="30" />
|
||||||
|
<!-- 日志输出格式 -->
|
||||||
|
<!-- INFO日志Appender -->
|
||||||
|
<appender name="FILE_INFO" class="ch.qos.logback.core.rolling.RollingFileAppender">
|
||||||
|
<file>${log.path}/info.${log.file}.log</file>
|
||||||
|
<filter class="ch.qos.logback.classic.filter.LevelFilter">
|
||||||
|
<level>INFO</level>
|
||||||
|
<onMatch>ACCEPT</onMatch>
|
||||||
|
<onMismatch>DENY</onMismatch>
|
||||||
|
</filter>
|
||||||
|
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
|
||||||
|
<fileNamePattern>${log.path}/info/info.${log.file}.%d{yyyy-MM-dd}.%i.log.gz</fileNamePattern>
|
||||||
|
<maxFileSize>${MAX_FILE_SIZE}</maxFileSize>
|
||||||
|
<maxHistory>${MAX_HISTORY}</maxHistory>
|
||||||
|
</rollingPolicy>
|
||||||
|
<encoder>
|
||||||
|
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
|
||||||
|
</encoder>
|
||||||
|
</appender>
|
||||||
|
|
||||||
|
<!-- ERROR日志Appender -->
|
||||||
|
<appender name="FILE_ERROR" class="ch.qos.logback.core.rolling.RollingFileAppender">
|
||||||
|
<file>${log.path}/error.${log.file}.log</file>
|
||||||
|
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
|
||||||
|
<level>ERROR</level>
|
||||||
|
</filter>
|
||||||
|
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
|
||||||
|
<fileNamePattern>${log.path}/error/error.${log.file}.%d{yyyy-MM-dd}.%i.log.gz</fileNamePattern>
|
||||||
|
<maxFileSize>${MAX_FILE_SIZE}</maxFileSize>
|
||||||
|
<maxHistory>${MAX_HISTORY}</maxHistory>
|
||||||
|
</rollingPolicy>
|
||||||
|
<encoder>
|
||||||
|
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
|
||||||
|
</encoder>
|
||||||
|
</appender>
|
||||||
|
|
||||||
|
<!-- 根Logger配置(禁用控制台输出) -->
|
||||||
|
<root level="INFO">
|
||||||
|
<appender-ref ref="FILE_INFO" />
|
||||||
|
<appender-ref ref="FILE_ERROR" />
|
||||||
|
</root>
|
||||||
|
|
||||||
|
</configuration>
|
||||||
|
|
@ -0,0 +1,7 @@
|
||||||
|
<?xml version="1.0" encoding="UTF-8" ?>
|
||||||
|
<!DOCTYPE mapper
|
||||||
|
PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
|
||||||
|
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
|
||||||
|
<mapper namespace="org.dromara.extract.mapper.AoiAddressMapper">
|
||||||
|
|
||||||
|
</mapper>
|
||||||
|
|
@ -0,0 +1,16 @@
|
||||||
|
<?xml version="1.0" encoding="UTF-8" ?>
|
||||||
|
<!DOCTYPE mapper
|
||||||
|
PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
|
||||||
|
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
|
||||||
|
<mapper namespace="org.dromara.extract.mapper.JqdMapper">
|
||||||
|
|
||||||
|
<resultMap id="jqdResult" type="org.dromara.extract.domain.Jqd">
|
||||||
|
|
||||||
|
</resultMap>
|
||||||
|
|
||||||
|
<select id="selectJq" resultMap="jqdResult">
|
||||||
|
select sfdz,sfdzfldm,sfdzflmc,xzb,yzb,jzxqdwdm,jzxqdwmc,scbjsj from jqd
|
||||||
|
${ew.getCustomSqlSegment}
|
||||||
|
</select>
|
||||||
|
|
||||||
|
</mapper>
|
||||||
|
|
@ -0,0 +1,7 @@
|
||||||
|
<?xml version="1.0" encoding="UTF-8" ?>
|
||||||
|
<!DOCTYPE mapper
|
||||||
|
PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
|
||||||
|
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
|
||||||
|
<mapper namespace="org.dromara.extract.mapper.PoiAddressMapper">
|
||||||
|
|
||||||
|
</mapper>
|
||||||
|
|
@ -0,0 +1,3 @@
|
||||||
|
java包使用 `.` 分割 resource 目录使用 `/` 分割
|
||||||
|
<br>
|
||||||
|
此文件目的 防止文件夹粘连找不到 `xml` 文件
|
||||||
Loading…
Reference in New Issue