From 3de061a84323bcb5044b815b62c31952b885fd6f Mon Sep 17 00:00:00 2001 From: luyya Date: Fri, 16 Jan 2026 16:23:37 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A4=84=E7=90=86=E9=9F=A6=E6=80=BB=E5=9C=B0?= =?UTF-8?q?=E5=9D=80=E7=BB=93=E6=9E=84=E5=8C=96=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../extract/controller/AddressController.java | 50 +- .../controller/ExcelPreviewController.java | 354 +++++ .../extract/domain/AddressParseResult.java | 50 + .../dromara/extract/domain/ColumnIndices.java | 26 + .../extract/domain/ExcelProcessResult.java | 70 + .../extract/domain/ProcessingStrategy.java | 45 + .../extract/domain/RowProcessResult.java | 28 + .../extract/domain/RowProcessingTask.java | 17 + .../org/dromara/extract/domain/SdJqd.java | 37 + .../org/dromara/extract/domain/ShJqd.java | 37 + .../dromara/extract/mapper/SdJqdMapper.java | 11 + .../dromara/extract/mapper/ShJqdMapper.java | 17 + .../service/impl/DataMigrationService.java | 470 ++++++- .../impl/LargeExcelAddressParseService.java | 1176 +++++++++++++++++ .../extract/util/ExcelProcessTask.java | 73 + .../resources/mapper/extract/SdJqdMapper.xml | 13 + .../resources/mapper/extract/ShJqdMapper.xml | 13 + 17 files changed, 2466 insertions(+), 21 deletions(-) create mode 100644 stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/controller/ExcelPreviewController.java create mode 100644 stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/domain/AddressParseResult.java create mode 100644 stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/domain/ColumnIndices.java create mode 100644 stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/domain/ExcelProcessResult.java create mode 100644 stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/domain/ProcessingStrategy.java create mode 100644 stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/domain/RowProcessResult.java create mode 100644 stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/domain/RowProcessingTask.java create mode 100644 stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/domain/SdJqd.java create mode 100644 stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/domain/ShJqd.java create mode 100644 stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/mapper/SdJqdMapper.java create mode 100644 stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/mapper/ShJqdMapper.java create mode 100644 stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/service/impl/LargeExcelAddressParseService.java create mode 100644 stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/util/ExcelProcessTask.java create mode 100644 stwzhj-modules/stwzhj-extract/src/main/resources/mapper/extract/SdJqdMapper.xml create mode 100644 stwzhj-modules/stwzhj-extract/src/main/resources/mapper/extract/ShJqdMapper.xml diff --git a/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/controller/AddressController.java b/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/controller/AddressController.java index c906e105..106ea687 100644 --- a/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/controller/AddressController.java +++ b/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/controller/AddressController.java @@ -2,14 +2,18 @@ package org.dromara.extract.controller; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.dromara.common.core.domain.R; import org.dromara.common.core.utils.StringUtils; import org.dromara.common.web.core.BaseController; import org.dromara.extract.service.impl.DataMigrationService; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.multipart.MultipartFile; import java.util.HashMap; import java.util.Map; @@ -52,6 +56,41 @@ public class AddressController extends BaseController { return ResponseEntity.ok("✅ 迁移任务已提交,正在后台执行..."); } + + + @GetMapping("/startsh") + public ResponseEntity startshMigration() { + try { + log.info("🚀 开始执行涉黄数据迁移任务..."); + dataMigrationService.startShMigration(); + } catch (Exception e) { + log.error("❌ 迁移任务执行异常", e); + } finally { + isRunning = false; + } + return ResponseEntity.ok("✅ 迁移任务已提交,正在后台执行..."); + } + + + @GetMapping("/startsd") + public ResponseEntity startsdMigration() { + + try { + log.info("🚀 开始执行涉赌数据迁移任务..."); + dataMigrationService.startSdMigration(); + } catch (Exception e) { + log.error("❌ 迁移任务执行异常", e); + } finally { + isRunning = false; + } + + return ResponseEntity.ok("✅ 迁移任务已提交,正在后台执行..."); + } + + + + + @GetMapping("/status") public ResponseEntity> getStatus() { Map status = new HashMap<>(); @@ -60,17 +99,6 @@ public class AddressController extends BaseController { return ResponseEntity.ok(status); } - @GetMapping("/stop") - public ResponseEntity stopMigration() { - // 注意:当前线程池没有实现优雅关闭 - // 这里只是一个提示,实际需要在 Service 中支持中断 - if (!isRunning) { - return ResponseEntity.ok("任务未运行,无需停止"); - } - - // ⚠️ 提示用户:无法强制停止,建议手动关闭应用或增强 Service - return ResponseEntity.ok("⚠️ 停止功能暂不支持。请重启应用或增强线程控制逻辑。"); - } diff --git a/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/controller/ExcelPreviewController.java b/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/controller/ExcelPreviewController.java new file mode 100644 index 00000000..f89df1c1 --- /dev/null +++ b/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/controller/ExcelPreviewController.java @@ -0,0 +1,354 @@ +package org.dromara.extract.controller; + +import lombok.extern.slf4j.Slf4j; +import org.dromara.extract.service.impl.LargeExcelAddressParseService; +import org.dromara.extract.util.ExcelProcessTask; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.core.io.ByteArrayResource; +import org.springframework.core.io.Resource; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.*; +import org.springframework.web.multipart.MultipartFile; + +import java.io.File; +import java.nio.file.Files; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; + +/** + * Excel地址解析控制器 - 大文件并发处理 + * 支持多请求并发处理,保留预览50条功能 + */ + +@Slf4j +@RestController +@RequestMapping("/api/excel") +public class ExcelPreviewController { + + @Autowired + private LargeExcelAddressParseService excelService; + + /** + * 上传并预览处理(前50行) - 核心功能 + */ + @PostMapping("/upload-preview") + public ResponseEntity uploadAndPreview(@RequestParam("file") MultipartFile file) { + try { + validateLargeExcelFile(file); + + String requestId = UUID.randomUUID().toString(); + log.info("收到大文件预览请求 [{}]: {},大小: {} MB", + requestId, file.getOriginalFilename(), + file.getSize() / (1024.0 * 1024.0)); + + // 异步处理预览50行 + CompletableFuture.supplyAsync(() -> { + return excelService.processLargeExcelFile(file, requestId, true); // previewOnly=true + }); + + // 立即返回任务信息,用户可轮询状态 + Map response = createPreviewResponse(requestId, file); + + return ResponseEntity.accepted().body(response); + + } catch (IllegalArgumentException e) { + return ResponseEntity.badRequest().body(createErrorResponse(e.getMessage())); + } catch (Exception e) { + log.error("大文件预览请求失败", e); + return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR) + .body(createErrorResponse("预览处理失败: " + e.getMessage())); + } + } + + + + + /** + * 完整处理(一次性处理所有行,跳过预览) + */ + @PostMapping("/upload-full") + public ResponseEntity uploadAndFullProcess(@RequestParam("file") MultipartFile file) { + try { + validateLargeExcelFile(file); + + String requestId = UUID.randomUUID().toString(); + log.info("收到大文件完整处理请求 [{}]: {},大小: {} MB", + requestId, file.getOriginalFilename(), + file.getSize() / (1024.0 * 1024.0)); + + // 异步完整处理 + CompletableFuture.supplyAsync(() -> { + return excelService.processLargeExcelFile(file, requestId, false); // previewOnly=false + }); + + Map response = new HashMap<>(); + response.put("requestId", requestId); + response.put("filename", file.getOriginalFilename()); + response.put("fileSize", formatFileSize(file.getSize())); + response.put("status", "FULL_PROCESSING"); + response.put("message", "正在完整处理文件..."); + response.put("estimatedTime", estimateFullProcessTime(file.getSize())); + response.put("statusUrl", "/api/excel/status/" + requestId); + response.put("isPreview", false); + + return ResponseEntity.accepted().body(response); + + } catch (IllegalArgumentException e) { + return ResponseEntity.badRequest().body(createErrorResponse(e.getMessage())); + } catch (Exception e) { + log.error("大文件完整处理请求失败", e); + return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR) + .body(createErrorResponse("完整处理失败: " + e.getMessage())); + } + } + + + + + /** + * 下载完整处理文件 + */ + @GetMapping("/download/{requestId}") + public ResponseEntity downloadFile(@PathVariable String requestId) { + try { + // 获取任务信息 + ExcelProcessTask task = excelService.getTaskStatus(requestId); + if (task == null) { + return ResponseEntity.status(HttpStatus.NOT_FOUND).build(); + } + + // 从任务信息中获取文件路径 + // 注意:LargeExcelAddressParseService需要提供获取输出文件的方法 + File outputFile = getOutputFileFromTask(task); + + if (outputFile == null || !outputFile.exists()) { + log.warn("输出文件不存在 [{}]", requestId); + return ResponseEntity.status(HttpStatus.NOT_FOUND).build(); + } + + byte[] content = Files.readAllBytes(outputFile.toPath()); + + HttpHeaders headers = new HttpHeaders(); + headers.setContentType(MediaType.APPLICATION_OCTET_STREAM); + headers.setContentDispositionFormData("attachment", + "processed_" + task.getFilename()); + headers.setContentLength(content.length); + + if (task.isPreviewMode()) { + headers.add("X-Processing-Type", "preview-and-continue"); + headers.add("X-Preview-Completed", String.valueOf(task.getPreviewCompletedRows())); + } else { + headers.add("X-Processing-Type", "full-process"); + } + + return new ResponseEntity<>(new ByteArrayResource(content), headers, HttpStatus.OK); + + } catch (Exception e) { + log.error("下载文件失败 [{}]", requestId, e); + return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build(); + } + } + + + + /** + * 取消任务 + */ + @PostMapping("/cancel/{requestId}") + public ResponseEntity cancelTask(@PathVariable String requestId) { + try { + // 这里需要服务层支持取消任务的功能 + // excelService.cancelTask(requestId); + + Map response = new HashMap<>(); + response.put("requestId", requestId); + response.put("status", "CANCELLED"); + response.put("message", "任务取消请求已提交"); + response.put("timestamp", System.currentTimeMillis()); + + return ResponseEntity.ok(response); + + } catch (Exception e) { + log.error("取消任务失败 [{}]", requestId, e); + return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR) + .body(createErrorResponse("取消任务失败: " + e.getMessage())); + } + } + + /** + * 清理过期任务(管理员接口) + */ + @PostMapping("/admin/cleanup") + public ResponseEntity cleanupOldTasks(@RequestHeader(value = "X-Admin-Token", required = false) String adminToken) { + try { + // 简单的管理员验证 + if (!"your-admin-token".equals(adminToken)) { + return ResponseEntity.status(HttpStatus.UNAUTHORIZED) + .body(createErrorResponse("未授权")); + } + + // 获取清理前的任务数量 + int beforeCount = excelService.getAllActiveTasks().size(); + + // 清理操作 + // excelService.cleanupExpiredTasks(); + + Map response = new HashMap<>(); + response.put("action", "cleanup"); + response.put("beforeCount", beforeCount); + response.put("afterCount", excelService.getAllActiveTasks().size()); + response.put("timestamp", System.currentTimeMillis()); + + log.info("管理员清理过期任务,清理前: {},清理后: {}", beforeCount, response.get("afterCount")); + + return ResponseEntity.ok(response); + + } catch (Exception e) { + log.error("清理任务失败", e); + return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR) + .body(createErrorResponse("清理失败: " + e.getMessage())); + } + } + + // ============ 辅助方法 ============ + + /** + * 创建预览响应 + */ + private Map createPreviewResponse(String requestId, MultipartFile file) { + Map response = new HashMap<>(); + response.put("requestId", requestId); + response.put("filename", file.getOriginalFilename()); + response.put("fileSize", formatFileSize(file.getSize())); + response.put("status", "PREVIEW_PROCESSING"); + response.put("message", "正在处理前50行预览..."); + response.put("previewRows", 50); // 明确显示预览行数 + response.put("previewStatusUrl", "/api/excel/preview/status/" + requestId); + response.put("fullStatusUrl", "/api/excel/status/" + requestId); + response.put("estimatedTime", estimatePreviewTime(file.getSize())); + response.put("isPreview", true); + response.put("timestamp", System.currentTimeMillis()); + return response; + } + + + /** + * 创建错误响应 + */ + private Map createErrorResponse(String error) { + Map response = new HashMap<>(); + response.put("error", error); + response.put("timestamp", System.currentTimeMillis()); + return response; + } + + /** + * 验证大文件 + */ + private void validateLargeExcelFile(MultipartFile file) { + if (file.isEmpty()) { + throw new IllegalArgumentException("文件不能为空"); + } + + String filename = file.getOriginalFilename(); + if (filename == null) { + throw new IllegalArgumentException("文件名不能为空"); + } + + if (!filename.toLowerCase().endsWith(".xlsx") && + !filename.toLowerCase().endsWith(".xls")) { + throw new IllegalArgumentException("只支持Excel文件(.xlsx, .xls)"); + } + + // 大文件大小限制(100MB) + if (file.getSize() > 100 * 1024 * 1024) { + throw new IllegalArgumentException("文件大小不能超过100MB"); + } + } + + /** + * 格式化文件大小 + */ + private String formatFileSize(long size) { + if (size < 1024) { + return size + " B"; + } else if (size < 1024 * 1024) { + return String.format("%.1f KB", size / 1024.0); + } else { + return String.format("%.1f MB", size / (1024.0 * 1024.0)); + } + } + + /** + * 估算预览时间 + */ + private String estimatePreviewTime(long fileSize) { + // 简单估算:每MB约1秒,最小5秒,最大60秒 + long estimatedSeconds = Math.min(60, Math.max(5, fileSize / (1024 * 1024))); + return estimatedSeconds + "秒"; + } + + /** + * 估算继续处理时间 + */ + private String estimateContinueTime(int remainingRows) { + // 简单估算:每行约2秒,最小30秒 + long estimatedSeconds = Math.max(30, remainingRows * 2); + long minutes = estimatedSeconds / 60; + long seconds = estimatedSeconds % 60; + + if (minutes > 0) { + return String.format("%d分%d秒", minutes, seconds); + } else { + return estimatedSeconds + "秒"; + } + } + + /** + * 估算完整处理时间 + */ + private String estimateFullProcessTime(long fileSize) { + // 简单估算:每MB约3秒,最小30秒 + long estimatedSeconds = Math.max(30, (fileSize / (1024 * 1024)) * 3); + long minutes = estimatedSeconds / 60; + long seconds = estimatedSeconds % 60; + + if (minutes > 60) { + long hours = minutes / 60; + minutes = minutes % 60; + return String.format("%d小时%d分", hours, minutes); + } else if (minutes > 0) { + return String.format("%d分%d秒", minutes, seconds); + } else { + return estimatedSeconds + "秒"; + } + } + + /** + * 获取任务总行数 + */ + private int getTaskTotalRows(String requestId) { + try { + ExcelProcessTask task = excelService.getTaskStatus(requestId); + return task != null ? task.getTotalRows() : 0; + } catch (Exception e) { + return 0; + } + } + + /** + * 从任务获取输出文件 + */ + private File getOutputFileFromTask(ExcelProcessTask task) { + // 这里需要根据你的实际实现来获取文件 + // 假设文件保存在固定目录下 + String outputDir = "/home/rsoft/excel_output"; + String filename = task.getRequestId() + "_processed.xlsx"; + return new File(outputDir, filename); + } +} diff --git a/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/domain/AddressParseResult.java b/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/domain/AddressParseResult.java new file mode 100644 index 00000000..93228025 --- /dev/null +++ b/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/domain/AddressParseResult.java @@ -0,0 +1,50 @@ +package org.dromara.extract.domain; + +import lombok.Data; + +// 新增解析结果类 +@Data +public class AddressParseResult { + private String aoi; + private String aoiType; + private String poi; + private String poiType; + private String street; + private String community; + private String address; + + + + public AddressParseResult(String aoi, String aoiType, String poi, String poiType, + String street, String community, String address) { + this.aoi = aoi; + this.aoiType = aoiType; + this.poi = poi; + this.poiType = poiType; + this.street = street; + this.community = community; + this.address = address; + } + + // getter和setter方法 + public String getAoi() { return aoi; } + public void setAoi(String aoi) { this.aoi = aoi; } + + public String getAoiType() { return aoiType; } + public void setAoiType(String aoiType) { this.aoiType = aoiType; } + + public String getPoi() { return poi; } + public void setPoi(String poi) { this.poi = poi; } + + public String getPoiType() { return poiType; } + public void setPoiType(String poiType) { this.poiType = poiType; } + + public String getStreet() { return street; } + public void setStreet(String street) { this.street = street; } + + public String getCommunity() { return community; } + public void setCommunity(String community) { this.community = community; } + + public String getAddress() { return address; } + public void setAddress(String address) { this.address = address; } +} diff --git a/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/domain/ColumnIndices.java b/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/domain/ColumnIndices.java new file mode 100644 index 00000000..15624edb --- /dev/null +++ b/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/domain/ColumnIndices.java @@ -0,0 +1,26 @@ +package org.dromara.extract.domain; + +import lombok.Data; + +/** + * 列索引容器类 + */ +@Data +public class ColumnIndices { + public int addressColIndex; + public int poiColIndex; + public int poiTypeColIndex; + public int aoiColIndex; + public int aoiTypeColIndex; + public int streetColIndex; + public int communityColIndex; + + public ColumnIndices() {} + + @Override + public String toString() { + return String.format("地址列:%d, POI:%d, POI类型:%d, AOI:%d, AOI类型:%d, 街道:%d, 社区:%d", + addressColIndex, poiColIndex, poiTypeColIndex, aoiColIndex, + aoiTypeColIndex, streetColIndex, communityColIndex); + } +} diff --git a/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/domain/ExcelProcessResult.java b/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/domain/ExcelProcessResult.java new file mode 100644 index 00000000..a0c36175 --- /dev/null +++ b/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/domain/ExcelProcessResult.java @@ -0,0 +1,70 @@ +package org.dromara.extract.domain; + +import lombok.Data; + +@Data +public class ExcelProcessResult { + private String requestId; + private String filename; + private String outputPath; + private int successCount; + private int failCount; + private int totalRows; + private String status; + private long elapsedTime; + private boolean isPreview; + private int previewRows; + private int previewCompletedRows; + private String downloadUrl; + private String continueUrl; + + public ExcelProcessResult(String requestId, String filename, String outputPath, + int successCount, int failCount, int totalRows, + String status, long elapsedTime, + boolean isPreview, int previewRows, int previewCompletedRows) { + this.requestId = requestId; + this.filename = filename; + this.outputPath = outputPath; + this.successCount = successCount; + this.failCount = failCount; + this.totalRows = totalRows; + this.status = status; + this.elapsedTime = elapsedTime; + this.isPreview = isPreview; + this.previewRows = previewRows; + this.previewCompletedRows = previewCompletedRows; + + // 自动生成URL + if (outputPath != null) { + this.downloadUrl = "/api/excel/download/" + requestId; + } + + if ("PREVIEW_COMPLETED".equals(status)) { + this.continueUrl = "/api/excel/continue/" + requestId; + } + } + + // Getters + public String getRequestId() { return requestId; } + public String getFilename() { return filename; } + public String getOutputPath() { return outputPath; } + public int getSuccessCount() { return successCount; } + public int getFailCount() { return failCount; } + public int getTotalRows() { return totalRows; } + public String getStatus() { return status; } + public long getElapsedTime() { return elapsedTime; } + public boolean isPreview() { return isPreview; } + public int getPreviewRows() { return previewRows; } + public int getPreviewCompletedRows() { return previewCompletedRows; } + public String getDownloadUrl() { return downloadUrl; } + public String getContinueUrl() { return continueUrl; } + + public double getSuccessRate() { + int total = successCount + failCount; + return total > 0 ? (successCount * 100.0 / total) : 0; + } + + public int getRemainingRows() { + return totalRows - previewCompletedRows; + } +} diff --git a/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/domain/ProcessingStrategy.java b/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/domain/ProcessingStrategy.java new file mode 100644 index 00000000..d7359a17 --- /dev/null +++ b/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/domain/ProcessingStrategy.java @@ -0,0 +1,45 @@ +package org.dromara.extract.domain; + +/** + * 处理策略枚举 + */ +public enum ProcessingStrategy { + FAST("快速模式", 1000), // 适合小文件 + BALANCED("平衡模式", 5000), // 适合中等文件 + STREAMING("流式模式", -1); // 适合大文件 + + private final String name; + private final int batchSize; // 批处理大小 + + ProcessingStrategy(String name, int batchSize) { + this.name = name; + this.batchSize = batchSize; + } + + public String getName() { return name; } + public int getBatchSize() { return batchSize; } + + /** + * 获取推荐的API并发数 + */ + public int getApiConcurrency() { + switch (this) { + case FAST: return 20; + case BALANCED: return 15; + case STREAMING: return 10; + default: return 10; + } + } + + /** + * 获取内存中的行数限制 + */ + public int getMemoryRowLimit() { + switch (this) { + case FAST: return 100000; // 内存保持10万行 + case BALANCED: return 50000; // 内存保持5万行 + case STREAMING: return 1000; // 内存只保持1000行 + default: return 10000; + } + } +} diff --git a/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/domain/RowProcessResult.java b/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/domain/RowProcessResult.java new file mode 100644 index 00000000..822a8db3 --- /dev/null +++ b/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/domain/RowProcessResult.java @@ -0,0 +1,28 @@ +package org.dromara.extract.domain; + +/** + * 行处理结果类 + */ +public class RowProcessResult { + private final int rowIndex; + private final boolean success; + private final String errorMessage; + + private RowProcessResult(int rowIndex, boolean success, String errorMessage) { + this.rowIndex = rowIndex; + this.success = success; + this.errorMessage = errorMessage; + } + + public static RowProcessResult success(int rowIndex) { + return new RowProcessResult(rowIndex, true, null); + } + + public static RowProcessResult failed(int rowIndex, String errorMessage) { + return new RowProcessResult(rowIndex, false, errorMessage); + } + + public boolean isSuccess() { return success; } + public String getErrorMessage() { return errorMessage; } + public int getRowIndex() { return rowIndex; } +} diff --git a/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/domain/RowProcessingTask.java b/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/domain/RowProcessingTask.java new file mode 100644 index 00000000..2bd574e5 --- /dev/null +++ b/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/domain/RowProcessingTask.java @@ -0,0 +1,17 @@ +package org.dromara.extract.domain; + +import lombok.Data; +import org.apache.poi.ss.usermodel.Row; + +/** + * 行处理任务包装类 + */ +public class RowProcessingTask { + public final Row row; + public final int rowIndex; + + public RowProcessingTask(Row row, int rowIndex) { + this.row = row; + this.rowIndex = rowIndex; + } +} diff --git a/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/domain/SdJqd.java b/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/domain/SdJqd.java new file mode 100644 index 00000000..6206bb33 --- /dev/null +++ b/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/domain/SdJqd.java @@ -0,0 +1,37 @@ +package org.dromara.extract.domain; + +import com.baomidou.mybatisplus.annotation.IdType; +import com.baomidou.mybatisplus.annotation.TableId; +import lombok.Data; +import org.springframework.data.annotation.Id; + +import java.io.Serial; +import java.io.Serializable; + +@Data +public class SdJqd implements Serializable { + + @Serial + private static final long serialVersionUID = 1L; + + @TableId(type = IdType.AUTO) + private String jqd; + + private String sfdz; + + private String scbjsj; + + private String poi; + + private String aoi; + + private String street; + + private String aoiType; + + private String poiType; + + private String community; + + +} diff --git a/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/domain/ShJqd.java b/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/domain/ShJqd.java new file mode 100644 index 00000000..815abc80 --- /dev/null +++ b/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/domain/ShJqd.java @@ -0,0 +1,37 @@ +package org.dromara.extract.domain; + +import com.baomidou.mybatisplus.annotation.IdType; +import com.baomidou.mybatisplus.annotation.TableId; +import lombok.Data; +import org.springframework.data.annotation.Id; + +import java.io.Serial; +import java.io.Serializable; + +@Data +public class ShJqd implements Serializable { + + @Serial + private static final long serialVersionUID = 1L; + + @TableId(type = IdType.AUTO) + private String jqd; + + private String sfdz; + + private String scbjsj; + + private String poi; + + private String aoi; + + private String street; + + private String aoiType; + + private String poiType; + + private String community; + + +} diff --git a/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/mapper/SdJqdMapper.java b/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/mapper/SdJqdMapper.java new file mode 100644 index 00000000..27db5e39 --- /dev/null +++ b/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/mapper/SdJqdMapper.java @@ -0,0 +1,11 @@ +package org.dromara.extract.mapper; + +import org.dromara.common.mybatis.core.mapper.BaseMapperPlus; +import org.dromara.extract.domain.SdJqd; +import org.dromara.extract.domain.ShJqd; + + +public interface SdJqdMapper extends BaseMapperPlus { + + +} diff --git a/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/mapper/ShJqdMapper.java b/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/mapper/ShJqdMapper.java new file mode 100644 index 00000000..4bec8a2e --- /dev/null +++ b/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/mapper/ShJqdMapper.java @@ -0,0 +1,17 @@ +package org.dromara.extract.mapper; + +import com.baomidou.dynamic.datasource.annotation.DS; +import com.baomidou.mybatisplus.core.conditions.Wrapper; +import com.baomidou.mybatisplus.core.toolkit.Constants; +import org.apache.ibatis.annotations.Param; +import org.dromara.common.mybatis.core.mapper.BaseMapperPlus; +import org.dromara.extract.domain.Jqd; +import org.dromara.extract.domain.ShJqd; + +import java.util.List; + + +public interface ShJqdMapper extends BaseMapperPlus { + + +} diff --git a/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/service/impl/DataMigrationService.java b/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/service/impl/DataMigrationService.java index b4c2870c..bdf3f347 100644 --- a/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/service/impl/DataMigrationService.java +++ b/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/service/impl/DataMigrationService.java @@ -10,22 +10,22 @@ import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.metadata.IPage; import jakarta.annotation.PreDestroy; import lombok.extern.slf4j.Slf4j; +import org.apache.poi.ss.usermodel.*; +import org.apache.poi.xssf.usermodel.XSSFWorkbook; import org.dromara.common.mybatis.core.page.PageQuery; -import org.dromara.extract.domain.Jqd; -import org.dromara.extract.domain.PoiAddress; -import org.dromara.extract.mapper.AoiAddressMapper; -import org.dromara.extract.mapper.JqdMapper; -import org.dromara.extract.mapper.PoiAddressMapper; +import org.dromara.extract.domain.*; +import org.dromara.extract.mapper.*; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.dao.DuplicateKeyException; import org.springframework.stereotype.Service; +import org.springframework.web.multipart.MultipartFile; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; import java.util.*; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.concurrent.*; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -45,6 +45,12 @@ public class DataMigrationService { @Autowired private PoiAddressMapper poiAddressMapper; + @Autowired + private ShJqdMapper shJqdMapper; + + @Autowired + private SdJqdMapper sdJqdMapper; + @Autowired private AoiAddressMapper aoiAddressMapper; @@ -58,7 +64,7 @@ public class DataMigrationService { private static final int BATCH_SIZE = 1000; // 分页大小 - private static final int PAGE_SIZE = 500; + private static final int PAGE_SIZE = 200; private static final String SYSTEM_PROMPT = """ 你是一个地址解析专家,必须严格、逐条遵守以下规则处理输入地址。直接输出JSON,不思考、不解释。注意:你的任务是执行规则,不是依赖常识或猜测。以下规则优先级高于任何通用知识。 @@ -114,6 +120,74 @@ public class DataMigrationService { 注意:输出必须是纯JSON格式,不要有任何额外的解释、思考过程或markdown代码块标记。 """; + + private static final String SYSTEM_PROMPT_TYPE = """ +你是一个地址解析专家,必须严格、逐条遵守以下规则处理输入地址。直接输出JSON,不思考、不解释。注意:你的任务是执行规则,不是依赖常识或猜测。以下规则优先级高于任何通用知识。 + +1. AOI定义:指区域状地理实体,如学校、居民小区、商场、广场、大厦、产业园区、公园、市场等具有明确边界和名称的非行政区划实体。 +✅示例:世纪新都、安徽农业大学教学基地、万达广场、高新银泰、凤凰城二期、九溪江南小区、大摩广场。 +⚠️必须完整保留"期数"或"分区"信息:如"二期"、"B区"、"南区"、"东苑"等,不得简化为母体名称; +❌严格排除:上级行政区划(如省、市、县、区、镇、乡、街道); +类行政区划或功能区:`政务区`、`新区`、`高新区`、`经开区`、`工业区`、`示范区`、`片区`、`功能区`、`园区`(无具体名称时)等; +无明确边界的泛称区域;学校、医院、政府机关、公司等机构内部的分区(如"西区""南楼""B座"),即使带"区""期"字眼,也不视为AOI; +任何包含"社区""片区"等字眼但无独立命名的区域。 + +2. POI定义:指具体的、可独立定位的地理点位。 +允许以下类型:具体商户("庐江百大超市"); +连锁品牌(如"蜜雪冰城""雅韵皖酒厂"); +机构营业网点("中国人寿保险(江淮路营业部)"、"工商银行XX支行"); +具体建筑("邮政大楼"、"科技馆"、"中医院康复楼"); +基层机构(如"杨楼中队""XX派出所""XX卫生院"); +基层地理实体(如"宋庄""葛庄""大王庄"); +交通枢纽(如"孙湾火车站""合肥客运中心""汽车站"); +政府及公共服务机构(如"观堂镇政府""张村老镇政府""车管所""政务服务中心"); +道路交口:格式为"[道路A]与[道路B]交口"或"[道路A]与[道路B]交叉口",且道路名称具体; +红绿灯:仅当命名为"`[地标]红绿灯`"(如"县政府红绿灯")或"`[交口]红绿灯`"(如"南一环与长江路交口红绿灯")时,可作为POI; +功能设施:`加油站`、`公交站`、`地铁站`、`停车场`,但必须依附于一个可定位的主体,且有具体名称(如"亚珠加油站""中石化加油站")。 +严格排除(以下情况 POI 必须为空字符串):任何孤立的内部设施:`广场`、`花园`、`凉亭`、`健身区`;任何泛称区域词;加油站等设施无具体品牌名(如"南加油站""加油站")。 +⚠️ 方位/模糊词处理原则:如果地址仅由排除词构成(如"门口""旁边"),或POI完全依赖排除词成立(如"大摩广场对面"),则POI=""; +但如果地址中包含一个独立、可定位的命名实体(如"三阳路小学""观堂镇政府""客运中心""孙湾火车站""车管所"),即使后接"门口""院里""出站口""院内""后面""东北角"等词,仍应提取该实体作为POI; +"出站口""进站口""候车厅""院内""门口"等属于内部位置描述,应在address中去除,但不影响主体POI提取。 + +3. 语义优先原则:若地址中包含"AOI + 方位词"(如"大摩广场对面"),则AOI = 主体名称,POI = ""; +若地址为"道路交口"且无任何方位词,则POI = 交口名称; +若地址为"XX街道红绿灯",则POI = ""。 + +4. address清洗规则:目标:输出一个最简但具备地理可定位性的核心地址; +保留:上一级行政区划;道路交口;可定位设施或地标(如"高新银泰"、"亚珠加油站"); +小区"期数"或"分区"信息; +基层地名(如"宋庄""葛庄"); +去除:模糊方位词本身(包括方言表达如`北头`、`南头`、`桥头`、`靠近`、`旁边`、`附近`、`对面`、`里面`、`内`、`东南角`、`东北角`、`门口`、`院里`、`出站口`、`院内`、`往北`等),但不删除这些词所修饰的有效实体; +✅正确示例:"祁门路与庐州大道路口大摩广场对面" → address: "祁门路与庐州大道路口大摩广场";"三阳路小学门口(西区)" → address: "三阳路小学(西区)"; +"红星美凯龙东红绿灯" → address: "红星美凯龙";"城西大市场南加油站" → address: "城西大市场";"亚珠加油站桥南" → address: "亚珠加油站"; +"张寨汽贸城南门雅韵皖酒厂后面" → aoi: "张寨汽贸城", poi: "雅韵皖酒厂", address: "张寨汽贸城雅韵皖酒厂"; +"高铁南站南边交控集团" → poi: "交控集团", address: "交控集团";"店集镇蜜雪冰城门口" → poi: "蜜雪冰城", address: "店集镇蜜雪冰城"; +"二桥北头加油站" → aoi: "", poi: "", address: "";"中医院康复楼(庄周西区)" → aoi: "", poi: "中医院康复楼", address: "中医院康复楼(庄周西区)"; +"吉峰农机大市场西门(庄周西区)" → aoi: "吉峰农机大市场", poi: "", address: "吉峰农机大市场(庄周西区)";"陈大镇杨楼中队后面" → poi: "杨楼中队", address: "陈大镇杨楼中队"; +"芦庙镇宋庄" → poi: "宋庄", address: "芦庙镇宋庄";"涡南镇葛庄东北角" → poi: "葛庄", address: "涡南镇葛庄"; +"板桥双陆大王庄" → poi: "大王庄", address: "板桥双陆大王庄";"观堂镇政府门口" → poi: "观堂镇政府", address: "观堂镇政府"; +"张村老镇政府院里" → poi: "张村老镇政府", address: "张村老镇政府";"客运中心门口" → poi: "客运中心", address: "客运中心"; +"孙湾火车站出站口" → poi: "孙湾火车站", address: "孙湾火车站";"车管所院内" → poi: "车管所", address: "车管所"; +"火车站西路口往北" → poi: "火车站西路口", address: "火车站西路口"; +❌错误示例:address为"桥南"、"东红绿灯"、"北头",poi为"南加油站"或"加油站",aoi为"庄周西区"或"西区"(当主体是学校或行政区时)。 + +5. 新增字段提取规则:街道 (street):提取以"街道"结尾的行政区划名称(如"蜀山街道"、"三里庵街道")。当地址中明确包含此类街道信息时提取,否则为空字符串。 +社区 (community):仅当地址中明确包含"社区"一词(如"XX社区")时提取该完整社区名称,否则为空字符串。注意区分"社区"(行政区划)与"小区"(AOI)。 +POI类型:根据POI名称推断其业态或类别(如"超市"、"加油站"、"银行"、"学校"、"医院"、"政府"、"火车站"、"小区"、"民宿"等)。若POI为空,则类型也为空字符串。 +AOI类型:根据AOI名称推断其类别(如"小区"、"商场"、"园区"、"市场"、"广场"、"大厦"等)。若AOI为空,则类型也为空字符串。 + +6. 输出格式:仅输出JSON,结构如下:{ +"aoi": "匹配的AOI名称,无则为空字符串", +"aoi_type": "AOI类型,无则为空字符串", +"poi": "最可能的POI名称,无有效POI则为空字符串", +"poi_type": "POI类型,无则为空字符串", +"street": "街道名称(行政区划),无则为空字符串", +"community": "社区名称,无则为空字符串", +"address": "清洗后的核心地址,若无有效定位信息则为空字符串" +} 注意:输出必须是纯JSON格式,不要有任何额外的解释、思考过程或markdown代码块标记。 +"""; + + // 启动迁移 public void startMigration(String xzqh, String startTime) { log.info("参数xzqh={},startTime={}",xzqh,startTime); @@ -163,6 +237,99 @@ public class DataMigrationService { log.info("✅ 数据迁移任务全部提交完成"); } + + //涉黄警情 + public void startShMigration() { + log.info("涉黄警情开始迁移数据..."); + + + // 2. 分页读取源数据 + + LambdaQueryWrapper jqlqw = new LambdaQueryWrapper<>(); + jqlqw.le(ShJqd::getScbjsj,"2025-01-03 15:30:10") + .isNotNull(ShJqd::getSfdz) + .ne(ShJqd::getSfdz,"地址不详") + .ne(ShJqd::getSfdz,"地址未知"); + long totalCount = shJqdMapper.selectCount(jqlqw); + log.info("涉黄查询数据大小={}",totalCount); + long totalPages = (totalCount + PAGE_SIZE - 1) / PAGE_SIZE; + PageQuery pageQuery = new PageQuery(); + jqlqw.orderByDesc(ShJqd::getScbjsj); + pageQuery.setPageSize(PAGE_SIZE); + for (int pageNum = 0; pageNum < totalPages; pageNum++) { + long offset = pageNum * PAGE_SIZE; + pageQuery.setPageNum(pageNum); + + + log.info("正在处理涉黄第 {}/{} 页, offset={}", pageNum + 1, totalPages, offset); + + List pageData; + try { + IPage jqPage = shJqdMapper.selectPage(pageQuery.build(), jqlqw); + pageData = jqPage.getRecords(); + } catch (Exception e) { + log.error("读取涉黄第 {} 页失败", pageNum, e); + continue; + } + + if (pageData.isEmpty()) continue; + + // 并发处理当前页 + processShPageAsync(pageData); + } + + log.info("✅ 涉黄数据迁移任务全部提交完成"); + } + + //涉赌警情 + public void startSdMigration() { + log.info("涉赌警情开始迁移数据..."); + + + // 2. 分页读取源数据 + + LambdaQueryWrapper jqlqw = new LambdaQueryWrapper<>(); + jqlqw.le(SdJqd::getScbjsj,"2025-01-01 14:01:41") + .isNotNull(SdJqd::getSfdz) + .ne(SdJqd::getSfdz,"地址不详") + .ne(SdJqd::getSfdz,"地址未知"); + long totalCount = sdJqdMapper.selectCount(jqlqw); + log.info("涉赌查询数据大小={}",totalCount); + long totalPages = (totalCount + PAGE_SIZE - 1) / PAGE_SIZE; + PageQuery pageQuery = new PageQuery(); + jqlqw.orderByDesc(SdJqd::getScbjsj); + pageQuery.setPageSize(PAGE_SIZE); + for (int pageNum = 0; pageNum < totalPages; pageNum++) { + long offset = pageNum * PAGE_SIZE; + pageQuery.setPageNum(pageNum); + + + log.info("正在处理涉赌第 {}/{} 页, offset={}", pageNum + 1, totalPages, offset); + + List pageData; + try { + IPage jqPage = sdJqdMapper.selectPage(pageQuery.build(), jqlqw); + pageData = jqPage.getRecords(); + } catch (Exception e) { + log.error("读取涉赌第 {} 页失败", pageNum, e); + continue; + } + + if (pageData.isEmpty()) continue; + + // 并发处理当前页 + processSdPageAsync(pageData); + } + + log.info("✅ 涉赌数据迁移任务全部提交完成"); + } + + + + + + + // 加载目标库中已存在的 name 到内存(避免重复插入) private void loadExistingNamesToMemory() { Long count = poiAddressMapper.selectCount(null); @@ -321,6 +488,96 @@ public class DataMigrationService { }); } + //涉黄 + private void processShPageAsync(List pageData) { + log.info("✅ 提交异步任务,页数据量: {}", pageData.size()); + CompletableFuture.runAsync(() -> { + try { + List batch = new ArrayList<>(); + + for (ShJqd src : pageData) { + try { + String query = src.getSfdz(); + if (StrUtil.isBlank(query)) continue; + log.debug("📡 正在调用第三方 API 查询: {}", query); // ✅ 加日志 + // 调用接口获取解析结果 + AddressParseResult result = parseAddressWithLLM(query); + if (result == null ) { + continue; + } + src.setAoi(result.getAoi()); + src.setPoi(result.getPoi()); + src.setAoiType(result.getAoiType()); + src.setPoiType(result.getPoiType()); + src.setStreet(result.getStreet()); + src.setCommunity(result.getCommunity()); + batch.add(src); + + + } catch (Exception e) { + log.error("处理地址失败: {}", src.getSfdz(), e); + } + } + + shJqdMapper.updateBatchById(batch); + log.info("涉黄调用LLM模型修改数据: {}", batch.size()); + + }catch (Throwable t) { + // 🚨 捕获所有错误(包括 Error) + log.error("❌ processPageAsync 任务内部发生致命错误", t); + } + + }, executorService).exceptionally(throwable -> { + // 🚨 捕获 CompletableFuture 本身的异常 + log.error("❌ CompletableFuture 执行失败", throwable); + return null; + }); + } + + private void processSdPageAsync(List pageData) { + log.info("✅ 提交异步任务,页数据量: {}", pageData.size()); + CompletableFuture.runAsync(() -> { + try { + List batch = new ArrayList<>(); + + for (SdJqd src : pageData) { + try { + String query = src.getSfdz(); + if (StrUtil.isBlank(query)) continue; + log.debug("📡 正在调用第三方 API 查询: {}", query); // ✅ 加日志 + // 调用接口获取解析结果 + AddressParseResult result = parseAddressWithLLM(query); + if (result == null ) { + continue; + } + src.setAoi(result.getAoi()); + src.setPoi(result.getPoi()); + src.setAoiType(result.getAoiType()); + src.setPoiType(result.getPoiType()); + src.setStreet(result.getStreet()); + src.setCommunity(result.getCommunity()); + batch.add(src); + + + } catch (Exception e) { + log.error("处理地址失败: {}", src.getSfdz(), e); + } + } + sdJqdMapper.updateBatchById(batch); + log.info("涉赌调用LLM模型修改数据: {}", batch.size()); + }catch (Throwable t) { + // 🚨 捕获所有错误(包括 Error) + log.error("❌ processPageAsync 任务内部发生致命错误", t); + } + + }, executorService).exceptionally(throwable -> { + // 🚨 捕获 CompletableFuture 本身的异常 + log.error("❌ CompletableFuture 执行失败", throwable); + return null; + }); + } + + // 调用第三方接口 private ParsedResult callThirdPartyApi(String query) { try { @@ -521,11 +778,204 @@ public class DataMigrationService { batch.clear(); } + + @PreDestroy public void shutdown() { executorService.shutdown(); } + + + + + + /** + * 处理列名别名 + */ + private void handleColumnAliases(Map columnMap, String mainName, List aliases) { + if (!columnMap.containsKey(mainName)) { + for (String alias : aliases) { + if (columnMap.containsKey(alias)) { + columnMap.put(mainName, columnMap.get(alias)); + log.info("使用别名 '{}' 作为 '{}'", alias, mainName); + break; + } + } + } + } + + + /** + * 调用大模型解析地址 + */ + private AddressParseResult parseAddressWithLLM(String address) { + try { + // 构造消息 + JSONArray messages = new JSONArray(); + messages.add(new JSONObject().set("role", "system").set("content", SYSTEM_PROMPT_TYPE)); + messages.add(new JSONObject().set("role", "user").set("content", address)); + + // 构建请求体 + JSONObject requestBody = new JSONObject() + .set("model", "Qwen3-32B") + .set("messages", messages) + .set("max_tokens", 3000) + .set("temperature", 0) + .set("seed",42) + .set("top_p", 0.9); + + // 发送请求 + String response = HttpUtil.createPost(apiUrl) + .header("Authorization", "Bearer " + bearerToken) + .header("Content-Type", "application/json") + .timeout(30000) // 增加超时时间 + .body(requestBody.toString()) + .execute() + .body(); + + log.debug("大模型响应: {}", response); + + // 解析响应 + JSONObject jsonResponse = JSONUtil.parseObj(response); + JSONArray choices = jsonResponse.getJSONArray("choices"); + if (choices == null || choices.isEmpty()) { + log.warn("API返回无choices"); + return null; + } + + String content = choices.get(0, JSONObject.class) + .getJSONObject("message") + .getStr("content"); + + if (StrUtil.isBlank(content)) { + log.warn("content为空"); + return null; + } + + // 提取JSON + String cleanJson = extractPureJsonFromContentForExcel(content); + if (StrUtil.isBlank(cleanJson)) { + log.warn("无法提取有效JSON"); + return null; + } + + JSONObject resultJson = JSONUtil.parseObj(cleanJson); + + return new AddressParseResult( + resultJson.getStr("aoi", ""), + resultJson.getStr("aoi_type", ""), + resultJson.getStr("poi", ""), + resultJson.getStr("poi_type", ""), + resultJson.getStr("street", ""), + resultJson.getStr("community", ""), + resultJson.getStr("address", "") + ); + + } catch (Exception e) { + log.error("调用大模型解析地址失败: {}", address, e); + return null; + } + } + + /** + * 专为Excel处理优化的JSON提取方法 + */ + private String extractPureJsonFromContentForExcel(String content) { + if (StrUtil.isBlank(content)) return null; + + // 1. 移除 ... + content = content.replaceAll("(?s).*?", ""); + content = content.trim(); + + // 2. 尝试直接解析 + if (JSONUtil.isJson(content)) { + return content; + } + + // 3. 查找第一个 { 和最后一个 } + int start = content.indexOf('{'); + int end = content.lastIndexOf('}'); + + if (start >= 0 && end > start) { + String candidate = content.substring(start, end + 1); + if (JSONUtil.isJson(candidate)) { + return candidate; + } + } + + // 4. 尝试提取JSON对象 + Pattern pattern = Pattern.compile("\\{(?:[^{}]|\\{(?:[^{}]|\\{[^{}]*\\})*\\})*\\}", Pattern.DOTALL); + Matcher matcher = pattern.matcher(content); + + while (matcher.find()) { + String candidate = matcher.group(); + if (JSONUtil.isJson(candidate) && + (candidate.contains("\"aoi\"") || candidate.contains("\"poi\"") || + candidate.contains("\"aoi_type\"") || candidate.contains("\"poi_type\""))) { + return candidate; + } + } + + return null; + } + + + /** + * 获取或创建列 + */ + private int getOrCreateColumn(Sheet sheet, Map columnMap, + String columnName, String defaultColumn) { + if (columnMap.containsKey(columnName)) { + return columnMap.get(columnName); + } + + // 创建新列 + Row headerRow = sheet.getRow(0); + int newColIndex = headerRow.getLastCellNum(); + + Cell newHeaderCell = headerRow.createCell(newColIndex); + newHeaderCell.setCellValue(columnName); + + columnMap.put(columnName, newColIndex); + return newColIndex; + } + + + + + /** + * 提取纯JSON内容(复用原有方法) + */ + private String extractPureJsonFromContentnew(String content) { + // 使用原有的 extractPureJsonFromContent 方法 + // 或者可以稍作修改以适应新的JSON结构 + if (StrUtil.isBlank(content)) return null; + + // 移除 ... + content = content.replaceAll("(?s).*?", ""); + content = content.trim(); + + // 尝试直接解析 + if (JSONUtil.isJson(content)) { + return content; + } + + // 尝试提取 {...} 模式 + Pattern pattern = Pattern.compile("\\{.*?\\}", Pattern.DOTALL); + Matcher matcher = pattern.matcher(content); + + while (matcher.find()) { + String candidate = matcher.group(); + if (JSONUtil.isJson(candidate) && + (candidate.contains("\"aoi\"") || candidate.contains("\"poi\""))) { + return candidate; + } + } + + return null; + } + // 辅助类 private static class ParsedResult { private final String name, aoi, poi, address; diff --git a/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/service/impl/LargeExcelAddressParseService.java b/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/service/impl/LargeExcelAddressParseService.java new file mode 100644 index 00000000..f03892a7 --- /dev/null +++ b/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/service/impl/LargeExcelAddressParseService.java @@ -0,0 +1,1176 @@ +package org.dromara.extract.service.impl; + +import cn.hutool.core.util.StrUtil; +import cn.hutool.http.HttpUtil; +import cn.hutool.json.JSONArray; +import cn.hutool.json.JSONObject; +import cn.hutool.json.JSONUtil; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.io.IOUtils; +import org.apache.poi.ss.usermodel.*; +import org.apache.poi.xssf.streaming.SXSSFWorkbook; +import org.apache.poi.xssf.usermodel.XSSFWorkbook; +import org.dromara.extract.domain.AddressParseResult; +import org.dromara.extract.domain.ExcelProcessResult; +import org.dromara.extract.util.ExcelProcessTask; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; +import org.springframework.web.multipart.MultipartFile; + +import java.io.*; +import java.nio.file.Files; // 新增,用于更安全的临时文件处理 +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import java.util.zip.ZipEntry; +import java.util.zip.ZipInputStream; + +/** + * Excel地址解析服务 - 支持大文件并发处理(改造版本) + */ +@Service +@Slf4j +public class LargeExcelAddressParseService { + + static { + // 修复POI大文件处理限制,允许更大内存分配 + // 注意:这只是一个上限,实际内存使用取决于JVM配置和文件大小 + org.apache.poi.util.IOUtils.setByteArrayMaxOverride(Integer.MAX_VALUE - 8); + } + + @Value("${api.thirdparty.url}") + private String apiUrl; + + @Value("${api.thirdparty.token}") + private String bearerToken; + + // 系统提示词 - 优化,提供清晰的LLM指令 + private static final String SYSTEM_PROMPT = """ +你是一个地址解析专家,必须严格、逐条遵守以下规则处理输入地址。直接输出JSON,不思考、不解释。注意:你的任务是执行规则,不是依赖常识或猜测。以下规则优先级高于任何通用知识。 + +1. AOI定义:指区域状地理实体,如学校、居民小区、商场、广场、大厦、产业园区、公园、市场等具有明确边界和名称的非行政区划实体。 +✅示例:世纪新都、安徽农业大学教学基地、万达广场、高新银泰、凤凰城二期、九溪江南小区、大摩广场。 +⚠️必须完整保留"期数"或"分区"信息:如"二期"、"B区"、"南区"、"东苑"等,不得简化为母体名称; +❌严格排除:上级行政区划(如省、市、县、区、镇、乡、街道); +类行政区划或功能区:`政务区`、`新区`、`高新区`、`经开区`、`工业区`、`示范区`、`片区`、`功能区`、`园区`(无具体名称时)等; +无明确边界的泛称区域;学校、医院、政府机关、公司等机构内部的分区(如"西区""南楼""B座"),即使带"区""期"字眼,也不视为AOI; +任何包含"社区""片区"等字眼但无独立命名的区域。 + +2. POI定义:指具体的、可独立定位的地理点位。 +允许以下类型:具体商户("庐江百大超市"); +连锁品牌(如"蜜雪冰城""雅韵皖酒厂"); +机构营业网点("中国人寿保险(江淮路营业部)"、"工商银行XX支行"); +具体建筑("邮政大楼"、"科技馆"、"中医院康复楼"); +基层机构(如"杨楼中队""XX派出所""XX卫生院"); +基层地理实体(如"宋庄""葛庄""大王庄"); +交通枢纽(如"孙湾火车站""合肥客运中心""汽车站"); +政府及公共服务机构(如"观堂镇政府""张村老镇政府""车管所""政务服务中心"); +道路交口:格式为"[道路A]与[道路B]交口"或"[道路A]与[道路B]交叉口",且道路名称具体; +红绿灯:仅当命名为"`[地标]红绿灯`"(如"县政府红绿灯")或"`[交口]红绿灯`"(如"南一环与长江路交口红绿灯")时,可作为POI; +功能设施:`加油站`、`公交站`、`地铁站`、`停车场`,但必须依附于一个可定位的主体,且有具体名称(如"亚珠加油站""中石化加油站")。 +严格排除(以下情况 POI 必须为空字符串):任何孤立的内部设施:`广场`、`花园`、`凉亭`、`健身区`;任何泛称区域词;加油站等设施无具体品牌名(如"南加油站""加油站")。 +⚠️ 方位/模糊词处理原则:如果地址仅由排除词构成(如"门口""旁边"),或POI完全依赖排除词成立(如"大摩广场对面"),则POI=""; +但如果地址中包含一个独立、可定位的命名实体(如"三阳路小学""观堂镇政府""客运中心""孙湾火车站""车管所"),即使后接"门口""院里""出站口""院内""后面""东北角"等词,仍应提取该实体作为POI; +"出站口""进站口""候车厅""院内""门口"等属于内部位置描述,应在address中去除,但不影响主体POI提取。 + +3. 语义优先原则:若地址中包含"AOI + 方位词"(如"大摩广场对面"),则AOI = 主体名称,POI = ""; +若地址为"道路交口"且无任何方位词,则POI = 交口名称; +若地址为"XX街道红绿灯",则POI = ""。 + +4. address清洗规则:目标:输出一个最简但具备地理可定位性的核心地址; +保留:上一级行政区划;道路交口;可定位设施或地标(如"高新银泰"、"亚珠加油站"); +小区"期数"或"分区"信息; +基层地名(如"宋庄""葛庄"); +去除:模糊方位词本身(包括方言表达如`北头`、`南头`、`桥头`、`靠近`、`旁边`、`附近`、`对面`、`里面`、`内`、`东南角`、`东北角`、`门口`、`院里`、`出站口`、`院内`、`往北`等),但不删除这些词所修饰的有效实体; +✅正确示例:"祁门路与庐州大道路口大摩广场对面" → address: "祁门路与庐州大道路口大摩广场";"三阳路小学门口(西区)" → address: "三阳路小学(西区)"; +"红星美凯龙东红绿灯" → address: "红星美凯龙";"城西大市场南加油站" → address: "城西大市场";"亚珠加油站桥南" → address: "亚珠加油站"; +"张寨汽贸城南门雅韵皖酒厂后面" → aoi: "张寨汽贸城", poi: "雅韵皖酒厂", address: "张寨汽贸城雅韵皖酒厂"; +"高铁南站南边交控集团" → poi: "交控集团", address: "交控集团";"店集镇蜜雪冰城门口" → poi: "蜜雪冰城", address: "店集镇蜜雪冰城"; +"二桥北头加油站" → aoi: "", poi: "", address: "";"中医院康复楼(庄周西区)" → aoi: "", poi: "中医院康复楼", address: "中医院康复楼(庄周西区)"; +"吉峰农机大市场西门(庄周西区)" → aoi: "吉峰农机大市场", poi: "", address: "吉峰农机大市场(庄周西区)";"陈大镇杨楼中队后面" → poi: "杨楼中队", address: "陈大镇杨楼中队"; +"芦庙镇宋庄" → poi: "宋庄", address: "芦庙镇宋庄";"涡南镇葛庄东北角" → poi: "葛庄", address: "涡南镇葛庄"; +"板桥双陆大王庄" → poi: "大王庄", address: "板桥双陆大王庄";"观堂镇政府门口" → poi: "观堂镇政府", address: "观堂镇政府"; +"张村老镇政府院里" → poi: "张村老镇政府", address: "张村老镇政府";"客运中心门口" → poi: "客运中心", address: "客运中心"; +"孙湾火车站出站口" → poi: "孙湾火车站", address: "孙湾火车站";"车管所院内" → poi: "车管所", address: "车管所"; +"火车站西路口往北" → poi: "火车站西路口", address: "火车站西路口"; +❌错误示例:address为"桥南"、"东红绿灯"、"北头",poi为"南加油站"或"加油站",aoi为"庄周西区"或"西区"(当主体是学校或行政区时)。 + +5. 新增字段提取规则:街道 (street):提取以"街道"结尾的行政区划名称(如"蜀山街道"、"三里庵街道")。当地址中明确包含此类街道信息时提取,否则为空字符串。 +社区 (community):仅当地址中明确包含"社区"一词(如"XX社区")时提取该完整社区名称,否则为空字符串。注意区分"社区"(行政区划)与"小区"(AOI)。 +POI类型:根据POI名称推断其业态或类别(如"超市"、"加油站"、"银行"、"学校"、"医院"、"政府"、"火车站"、"小区"、"民宿"、"ktv"等)。若POI为空,则类型也为空字符串。 +AOI类型:根据AOI名称推断其类别(如"小区"、"商场"、"园区"、"市场"、"广场"、"大厦"等)。若AOI为空,则类型也为空字符串。 + +6. 输出格式:仅输出JSON,结构如下:{ +"aoi": "匹配的AOI名称,无则为空字符串", +"aoi_type": "AOI类型,无则为空字符串", +"poi": "最可能的POI名称,无有效POI则为空字符串", +"poi_type": "POI类型,无则为空字符串", +"street": "街道名称(行政区划),无则为空字符串", +"community": "社区名称,无则为空字符串", +"address": "清洗后的核心地址,若无有效定位信息则为空字符串" +} 注意:输出必须是纯JSON格式,不要有任何额外的解释、思考过程或markdown代码块标记。 +"""; + + // 配置参数 + private static final int PREVIEW_ROWS = 50; + private static final int MAX_RETRY_COUNT = 3; // API调用最大重试次数 + private static final long RETRY_DELAY_MS = 1000; // API重试间隔 + private static final int BATCH_SIZE = 100; // 每次从Excel读取并并发调用API的行数 + private static final int MAX_CONCURRENT_API_CALLS = 20; // 最大并发API调用数 + private static final int MAX_CONCURRENT_BATCHES = 5; // 最大并发处理的批次数量 + private static final long ROW_PROCESS_TIMEOUT_SECONDS = 30; // 单行API调用超时时间 + + // 使用SXSSF的行窗口大小(内存中保留的行数),超出此值会写入磁盘 + private static final int SXSSF_WINDOW_SIZE = 1000; + + // 任务管理 + private final Map activeTasks = new ConcurrentHashMap<>(); + // 存储临时输入文件路径,以便在"继续处理"时使用 + private final Map tempInputFiles = new ConcurrentHashMap<>(); + + // 临时文件存放目录,建议在应用部署目录下,而不是系统/tmp + private static final String TEMP_FILE_DIR = "/home/rsoft/excel_processing_temp"; + private static final String OUTPUT_FILE_DIR = "/home/rsoft/excel_processed_output"; + + + // 线程池配置 + private final ExecutorService batchExecutor; // 用于并行处理Excel批次和API调用 + private final ExecutorService previewExecutor; // 专门用于预览模式的批处理 + private final ExecutorService cleanupExecutor; // 用于异步清理文件 + private final ScheduledExecutorService taskScheduler; // 用于定期任务(如清理过期任务) + + // API限流器 + private final Semaphore apiSemaphore = new Semaphore(MAX_CONCURRENT_API_CALLS); + + public LargeExcelAddressParseService() { + // 创建临时文件目录 + try { + Files.createDirectories(Paths.get(TEMP_FILE_DIR)); + Files.createDirectories(Paths.get(OUTPUT_FILE_DIR)); + } catch (IOException e) { + log.error("无法创建临时文件目录: {}", TEMP_FILE_DIR, e); + throw new RuntimeException("初始化服务失败,无法创建临时文件目录", e); + } + + // 初始化线程池 + this.batchExecutor = Executors.newFixedThreadPool(MAX_CONCURRENT_BATCHES, + new ThreadFactory() { + private final AtomicInteger counter = new AtomicInteger(0); + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(r, "ExcelBatchWorker-" + counter.incrementAndGet()); + t.setDaemon(true); // 设置为守护线程,JVM关闭时自动退出 + return t; + } + }); + + this.previewExecutor = Executors.newFixedThreadPool(10, // 预览模式可以多一点并发,因为通常数据量不大 + new ThreadFactory() { + private final AtomicInteger counter = new AtomicInteger(0); + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(r, "ExcelPreviewWorker-" + counter.incrementAndGet()); + t.setDaemon(true); + return t; + } + }); + + this.cleanupExecutor = Executors.newSingleThreadExecutor( // 单线程清理,避免资源竞争 + new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(r, "CleanupWorker"); + t.setDaemon(true); + return t; + } + }); + + this.taskScheduler = Executors.newSingleThreadScheduledExecutor( // 单线程调度 + new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(r, "TaskScheduler"); + t.setDaemon(true); + return t; + } + }); + + // 定期清理任务(每小时执行一次) + taskScheduler.scheduleAtFixedRate(this::cleanupOldTasks, 1, 1, TimeUnit.HOURS); + } + + /** + * 主处理方法 - 流式处理大文件 + * + * @param file 上传的Excel文件 + * @param requestId 请求ID + * @param previewOnly 是否只进行预览 + * @param existingInputFilePath 如果是继续处理,则传入已保存的输入文件路径 + * @return 初始的ExcelProcessResult,实际处理结果通过轮询任务状态获取 + */ + private ExcelProcessResult doProcessLargeExcelFile(MultipartFile file, Path existingInputFilePath, String requestId, boolean previewOnly) { + ExcelProcessTask task = activeTasks.get(requestId); // 确保获取到当前任务实例 + + log.info("[{}] 开始处理文件: {}, 预览模式: {}", requestId, task.getFilename(), previewOnly); + + Path tempInputFilePath; + try { + // 1. 创建或使用临时输入文件 + if (existingInputFilePath != null) { + log.info("[{}] 使用现有临时文件: {}", requestId, existingInputFilePath.toAbsolutePath()); + tempInputFilePath = existingInputFilePath; + } else if (file != null) { + log.info("[{}] 创建新的临时文件", requestId); + tempInputFilePath = createTempFile(file, requestId); + log.info("[{}] 临时文件创建完成: {}", requestId, tempInputFilePath.toAbsolutePath()); + } else { + throw new IllegalArgumentException("既没有MultipartFile也没有existingInputFile提供"); + } + tempInputFiles.put(requestId, tempInputFilePath); // 记录输入文件路径 + task.setStatus("FILE_SAVED"); + + // 2. 创建输出文件路径 + Path outputFilePath = createOutputFilePath(requestId); + + // 3. 处理文件 + log.info("[{}] 开始解析和处理Excel数据...", requestId); + ExcelProcessResult finalResult; + if (previewOnly) { + finalResult = processPreviewOnly(task, tempInputFilePath, outputFilePath); + } else { + finalResult = processFullFile(task, tempInputFilePath, outputFilePath); + } + return finalResult; + + } catch (Exception e) { + log.error("[{}] 处理失败", requestId, e); + task.setStatus("FAILED"); + task.setErrorMessage(e.getMessage()); + task.setEndTime(System.currentTimeMillis()); + return createErrorResult(task, e); + } finally { + // 清理临时文件(仅在完整处理完成后,如果是预览,不清理输入文件) + if (!previewOnly && tempInputFiles.containsKey(requestId)) { + Path fileToClean = tempInputFiles.remove(requestId); + cleanupExecutor.execute(() -> cleanupTempFile(fileToClean)); + } + } + } + + /** + * 对外暴露的异步处理入口 + */ + public ExcelProcessResult processLargeExcelFile(MultipartFile file, String requestId, boolean previewOnly) { + ExcelProcessTask task = new ExcelProcessTask(requestId, file.getOriginalFilename()); + task.setPreviewMode(previewOnly); + task.setStatus("INITIALIZING"); + activeTasks.put(requestId, task); + + CompletableFuture processFuture = CompletableFuture.supplyAsync(() -> { + return doProcessLargeExcelFile(file, null, requestId, previewOnly); // 首次调用,existingInputFilePath为null + }, previewOnly ? previewExecutor : batchExecutor); // 根据模式选择线程池 + + task.setProcessFuture(processFuture); // 保存Future,客户端可用于检查是否完成 + + return new ExcelProcessResult( + requestId, + file.getOriginalFilename(), + null, // 输出文件路径暂时未知 + 0, + 0, + 0, + "PROCESSING", + 0, + previewOnly, + previewOnly ? PREVIEW_ROWS : 0, + 0 + ); + } + + /** + * 只预览处理 + */ + private ExcelProcessResult processPreviewOnly(ExcelProcessTask task, Path inputFilePath, Path outputFilePath) throws Exception { + String requestId = task.getRequestId(); + task.setStatus("PROCESSING_PREVIEW"); + + try (Workbook workbook = createWorkbookForRead(inputFilePath)) { + Sheet sheet = workbook.getSheetAt(0); + + // 定位列 + ColumnIndices indices = locateColumns(sheet, requestId); + // 物理行数-1是数据行数,因为有表头 + int totalRowsInFile = Math.max(0, sheet.getPhysicalNumberOfRows() - 1); + task.setTotalRows(totalRowsInFile); + task.setPreviewRows(PREVIEW_ROWS); // 设置期望预览的行数 + + log.info("[{}] 预览模式: 文件总行数(不含表头): {},将预览前 {} 行", requestId, totalRowsInFile, PREVIEW_ROWS); + + // 创建SXSSF工作簿用于写入(流式写入) + try (SXSSFWorkbook outputWorkbook = new SXSSFWorkbook(SXSSF_WINDOW_SIZE)) { + Sheet outputSheet = outputWorkbook.createSheet(); + + // 复制表头并添加结果列 + copyHeaderRow(sheet, outputSheet, indices); + + // 处理预览行 (从第1行开始,到 PREVIEW_ROWS 或文件末尾) + PreviewProcessor processor = new PreviewProcessor(task, indices); + int endRowExclusive = Math.min(PREVIEW_ROWS + 1, sheet.getPhysicalNumberOfRows()); // +1是因为API是exclusive + int processedRowsInSegment = processRowsWithProgress( + sheet, outputSheet, indices, processor, + 1, endRowExclusive, // 从第二行(索引1)开始处理 + task + ); + + // 保存结果 + saveWorkbook(outputWorkbook, outputFilePath); + + // 更新任务状态 + task.setStatus("PREVIEW_COMPLETED"); + // 预览模式下,processedRows代表实际处理的行数 + task.setPreviewCompletedRows(processedRowsInSegment); + task.incrementProcessedRows(processedRowsInSegment); // 更新总处理行数 + task.setEndTime(System.currentTimeMillis()); + + log.info("[{}] 预览完成,实际处理 {} 行,成功: {},失败: {}", + requestId, processedRowsInSegment, task.getSuccessCount(), task.getFailCount()); + + return createSuccessResult(task, outputFilePath.toFile(), true); + } + } + } + + public void createTest() throws IOException { + String tempDir = "/home/rsoft/excel_processing_temp/test_poi_temp"; + Files.createDirectories(Paths.get(tempDir)); + System.setProperty("java.io.tmpdir", tempDir); + log.info("POI temp dir set to: " + System.getProperty("java.io.tmpdir")); + + log.info("Creating new SXSSFWorkbook..."); + try (SXSSFWorkbook outputWorkbook = new SXSSFWorkbook(100)) { + log.info("SXSSFWorkbook created. Now creating sheet..."); + Sheet outputSheet = outputWorkbook.createSheet("TestSheet"); + log.info("Sheet 'TestSheet' created successfully."); + // Optionally write to a file to ensure it's fully functional + // Files.write(Paths.get("test.xlsx"), b -> outputWorkbook.write(b)); + } + log.info("Workbook closed."); + } + + + /** + * 完整文件处理 + */ + private ExcelProcessResult processFullFile(ExcelProcessTask task, Path inputFilePath, Path outputFilePath) throws Exception { + String requestId = task.getRequestId(); + task.setStatus("PROCESSING_FULL"); + + try (Workbook workbook = createWorkbookForRead(inputFilePath)) { + Sheet sheet = workbook.getSheetAt(0); + + // 定位列 + ColumnIndices indices = locateColumns(sheet, requestId); + int totalRowsInFile = Math.max(0, sheet.getPhysicalNumberOfRows() - 1); + task.setTotalRows(totalRowsInFile); + + // 确定起始行 + int startRow = 1; // 默认从第二行(索引1)开始 + if (task.isPreviewCompleted()) { + // 如果是继续处理,则从预览完成的下一行开始 + startRow = task.getPreviewCompletedRows() + 1; + log.info("[{}] 继续处理模式: 从第 {} 行开始处理剩余数据", requestId, startRow); + } else { + log.info("[{}] 完整处理模式: 从第 {} 行开始处理", requestId, startRow); + } + createTest(); + + // 创建SXSSF工作簿用于写入 + try (SXSSFWorkbook outputWorkbook = new SXSSFWorkbook(SXSSF_WINDOW_SIZE)) { + log.info("进入new SXSSFWorkbook(SXSSF_WINDOW_SIZE)"); + Sheet outputSheet = outputWorkbook.createSheet(); + + // 复制表头 + log.info("进入复制表头"); + copyHeaderRow(sheet, outputSheet, indices); + + // 处理所有行 + log.info("处理所有行"); + FullFileProcessor processor = new FullFileProcessor(task, indices); + // processRowsWithProgress 会更新 task 中的 successCount, failCount, processedRows + int processedRowsInSegment = processRowsWithProgress( + sheet, outputSheet, indices, processor, + startRow, sheet.getPhysicalNumberOfRows(), // 处理到文件末尾 + task + ); + + log.info("processor执行结束"); + // 保存结果 + saveWorkbook(outputWorkbook, outputFilePath); + + // 更新任务状态 + task.setStatus("COMPLETED"); + task.setEndTime(System.currentTimeMillis()); + + log.info("[{}] 完整处理完成,总计成功: {},失败: {},总行数(不含表头): {}", + requestId, task.getSuccessCount(), task.getFailCount(), task.getTotalRows()); + + return createSuccessResult(task, outputFilePath.toFile(), false); + } + } + } + + /** + * 逐行处理并更新进度 (以批次为单位) + * + * @param inputSheet 输入Excel的Sheet + * @param outputSheet 输出Excel的Sheet + * @param indices 列索引信息 + * @param processor 处理器(PreviewProcessor或FullFileProcessor) + * @param startRow 开始处理的行索引 (包含) + * @param endRow 结束处理的行索引 (不包含) + * @param task 当前任务 + * @return 当前处理片段中实际处理的行数 (排除空行或无效地址) + */ + private int processRowsWithProgress(Sheet inputSheet, Sheet outputSheet, + ColumnIndices indices, BaseProcessor processor, + int startRow, int endRow, ExcelProcessTask task) { + + String requestId = task.getRequestId(); + int totalRowsToProcess = endRow - startRow; // 当前片段需要处理的逻辑行数 + int currentSegmentProcessed = 0; // 当前处理片段中,已调用API的行数 + + log.info("[{}] 开始处理片段: 从行 {} 到行 {} (总计 {} 行)", requestId, startRow, endRow - 1, totalRowsToProcess); + + // 使用分批处理 + for (int batchStart = startRow; batchStart < endRow; batchStart += BATCH_SIZE) { + int batchEnd = Math.min(batchStart + BATCH_SIZE, endRow); + + List batchData = new ArrayList<>(); + for (int i = batchStart; i < batchEnd; i++) { + Row row = inputSheet.getRow(i); + if (row == null) continue; // 跳过空行 + + String address = getCellValue(row.getCell(indices.addressColIndex)); + // 仅处理包含有效地址的行 + if (isValidAddress(address)) { + batchData.add(new RowData(i, address)); // RowData只存储必要信息,避免持有整个POI Row对象 + } else { + // 如果地址无效,也算作处理了,但可能需要记录到失败或跳过 + // 这里我们选择跳过API调用,但在统计processedRows时需要考虑 + // 实际处理的行数应该是指成功或失败调用API的行数。 + // 故此处不计入 currentSegmentProcessed,但如果需求是统计所有遍历过的行,则需调整 + log.debug("[{}] 行 {} 地址无效或为空,跳过API调用", requestId, i); + // 也可以选择在这里直接把原始行写入输出Excel,并在结果列标记“地址无效” + } + } + + if (!batchData.isEmpty()) { + // 并发处理当前批次,并获取结果 + List batchResults = processBatchConcurrently(batchData, requestId); + + int batchSuccess = 0; + int batchFail = 0; + // 确保对 outputSheet 的写入是同步的,防止并发修改 + synchronized (outputSheet) { + for (ParsedRowResult result : batchResults) { + if (result.isSuccess()) { + // 只有成功解析的才写入结果列 + writeRowResult(outputSheet, result.getRowIndex(), result.getParseResult(), indices); + batchSuccess++; + } else { + // 可以选择在原始Excel的特定列写入错误信息 + // 例如:writeCellSafely(outputSheet.getRow(result.getRowIndex()), indices.errorColIndex, result.getErrorMessage()); + log.warn("[{}] 行 {} 解析失败: {}", requestId, result.getRowIndex(), result.getErrorMessage()); + batchFail++; + } + } + } + currentSegmentProcessed += batchResults.size(); // 统计实际调用API的行数 + processor.incrementSuccessCount(batchSuccess); + processor.incrementFailCount(batchFail); + + log.info("[{}] 批次 [{}-{}] 完成. 实际处理 {} 行, 成功: {}, 失败: {}. 总进度: {}/{}({}%)", + requestId, batchStart, batchEnd - 1, batchResults.size(), batchSuccess, batchFail, + task.getProcessedRows() + currentSegmentProcessed, task.getTotalRows(), + String.format("%.2f", (double)(task.getProcessedRows() + currentSegmentProcessed) * 100 / task.getTotalRows())); + } else { + log.debug("[{}] 批次 [{}-{}] 无有效地址数据,跳过", requestId, batchStart, batchEnd - 1); + } + + // 更新任务整体进度 + // task.getProcessedRows() 已经包含之前的批次处理量 + // currentSegmentProcessed 是当前processRowsWithProgress函数中处理的量 + double totalProgress = (double)(task.getProcessedRows() + currentSegmentProcessed) / task.getTotalRows(); + task.setProgress((int) (totalProgress * 100)); + + // 控制处理速度,防止API限流或CPU过载 + try { + Thread.sleep(100); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.warn("[{}] 处理线程被中断", requestId); + break; + } + } + return currentSegmentProcessed; // 返回当前片段处理的行数 + } + + /** + * 并发处理批次API调用 + * + * @param batchData 批次数据 (包含行号和地址) + * @param requestId 请求ID + * @return 包含解析结果的列表 + */ + private List processBatchConcurrently(List batchData, String requestId) { + List> futures = batchData.stream() + .map(rowData -> CompletableFuture.supplyAsync(() -> { + try { + AddressParseResult parseResult = callLLMForAddress(rowData.address, requestId); + if (parseResult != null) { + return ParsedRowResult.success(rowData.rowIndex, parseResult); + } else { + return ParsedRowResult.failed(rowData.rowIndex, "LLM解析失败或返回空结果"); + } + } catch (Exception e) { + log.error("[{}] 处理行 {} (地址: {}) 异常: {}", requestId, rowData.rowIndex, rowData.address, e.getMessage()); + return ParsedRowResult.failed(rowData.rowIndex, e.getMessage()); + } + }, batchExecutor)) // 使用 batchExecutor 执行API调用 + .collect(Collectors.toList()); + + // 等待批次中的所有API调用完成,并设置超时 + // ROW_PROCESS_TIMEOUT_SECONDS * futures.size() 可能会很长,如果batch_size大。 + // 考虑更合理的整体批次超时 + long batchTimeout = ROW_PROCESS_TIMEOUT_SECONDS * BATCH_SIZE; // 例如,每行30秒,一个批次100行,总共3000秒 + + try { + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])) + .orTimeout(batchTimeout, TimeUnit.SECONDS) + .join(); // 等待所有future完成或超时 + } catch (CompletionException e) { + log.error("[{}] 批次处理超时或发生异常,部分任务可能未完成", requestId, e.getCause()); + } + + // 收集所有结果 (包括超时或异常的,getNow会返回默认值) + return futures.stream() + .map(future -> future.getNow(ParsedRowResult.failed(-1, "任务未完成或获取结果异常"))) + .collect(Collectors.toList()); + } + + + /** + * 流式写入结果行到SXSSFWorkbook + * 注意:SXSSFWorkbook 不是线程安全的,此方法应该在同步块中调用或由单线程调用 + */ + private void writeRowResult(Sheet sheet, int rowIndex, AddressParseResult result, ColumnIndices indices) { + // 获取或创建行。SXSSFWorkbook 的createRow方法在流式模式下是安全的,但多个线程同时调用可能会有问题 + Row row = sheet.getRow(rowIndex); // 获取已有的行(如果之前读过并复制了原始内容) + if (row == null) { + row = sheet.createRow(rowIndex); // 如果是新行,则创建 + } + + // 写入解析结果到特定列 + writeCellSafely(row, indices.poiColIndex, result.getPoi()); + writeCellSafely(row, indices.poiTypeColIndex, result.getPoiType()); + writeCellSafely(row, indices.aoiColIndex, result.getAoi()); + writeCellSafely(row, indices.aoiTypeColIndex, result.getAoiType()); + writeCellSafely(row, indices.streetColIndex, result.getStreet()); + writeCellSafely(row, indices.communityColIndex, result.getCommunity()); + } + + /** + * 安全写入单元格 + */ + private void writeCellSafely(Row row, int colIndex, String value) { + if (row == null || colIndex < 0) return; + Cell cell = row.getCell(colIndex); + if (cell == null) { + cell = row.createCell(colIndex); + } + cell.setCellValue(value != null ? value.trim() : ""); + } + + /** + * 复制表头行,并添加新的结果列 + */ + private void copyHeaderRow(Sheet sourceSheet, Sheet targetSheet, ColumnIndices indices) { + Row sourceHeader = sourceSheet.getRow(0); + Row targetHeader = targetSheet.createRow(0); // 始终在目标Sheet创建第一行作为表头 + + if (sourceHeader != null) { + // 复制所有原有单元格样式和值 + for (Cell sourceCell : sourceHeader) { + Cell targetCell = targetHeader.createCell(sourceCell.getColumnIndex()); + targetCell.setCellValue(getCellValue(sourceCell)); + if (sourceCell.getCellStyle() != null) { + targetCell.setCellStyle(sourceCell.getCellStyle()); // 尝试复制样式 + } + } + } + + // 确保目标列存在并设置标题 + String[] headers = {"POI", "POI类型", "AOI", "AOI类型", "街道", "社区"}; + int[] columns = { + indices.poiColIndex, indices.poiTypeColIndex, + indices.aoiColIndex, indices.aoiTypeColIndex, + indices.streetColIndex, indices.communityColIndex + }; + + for (int i = 0; i < headers.length; i++) { + Cell cell = targetHeader.getCell(columns[i]); + if (cell == null) { + cell = targetHeader.createCell(columns[i]); + } + cell.setCellValue(headers[i]); + // 可以添加一个默认的样式,比如加粗 + // CellStyle headerStyle = targetWorkbook.createCellStyle(); + // Font font = targetWorkbook.createFont(); + // font.setBold(true); + // headerStyle.setFont(font); + // cell.setCellStyle(headerStyle); + } + } + + /** + * 优化的大模型调用方法,包含重试机制 + */ + private AddressParseResult callLLMForAddress(String address, String requestId) { + log.info("进入LLM调用"); + for (int retry = 0; retry < MAX_RETRY_COUNT; retry++) { + try { + if (!apiSemaphore.tryAcquire(ROW_PROCESS_TIMEOUT_SECONDS, TimeUnit.SECONDS)) { + log.error("[{}] 获取API许可超时", requestId); + return null; // 超时未获取到许可 + } + // log.debug("[{}] API调用许可已获取 (剩余: {})", requestId, apiSemaphore.availablePermits()); + + // 构造请求 + JSONArray messages = new JSONArray(); + messages.add(new JSONObject().set("role", "system").set("content", SYSTEM_PROMPT)); + messages.add(new JSONObject().set("role", "user").set("content", address)); + + JSONObject requestBody = new JSONObject() + .set("model", "Qwen3-32B") + .set("messages", messages) + .set("max_tokens", 3000) + .set("temperature", 0) // 0表示确定性最高 + .set("seed", 42) + .set("top_p", 0.9); + + // 发送请求(带超时) + String response = HttpUtil.createPost(apiUrl) + .header("Authorization", "Bearer " + bearerToken) + .header("Content-Type", "application/json") + .timeout((int) (ROW_PROCESS_TIMEOUT_SECONDS * 1000)) // 设置连接和读取超时 + .body(requestBody.toString()) + .execute() + .body(); + + if (StrUtil.isBlank(response)) { + log.error("[{}] API返回空响应,地址: {}", requestId, address); + continue; // 尝试重试 + } + + JSONObject jsonResponse = JSONUtil.parseObj(response); + JSONArray choices = jsonResponse.getJSONArray("choices"); + + if (choices == null || choices.isEmpty()) { + log.error("[{}] API返回无choices,地址: {},响应: {}", requestId, address, response); + continue; // 尝试重试 + } + + String content = choices.get(0, JSONObject.class) + .getJSONObject("message") + .getStr("content"); + + if (StrUtil.isBlank(content)) { + log.error("[{}] content为空,地址: {},响应: {}", requestId, address, response); + continue; // 尝试重试 + } + + // 提取JSON + String cleanJson = extractJsonFromContent(content); + if (StrUtil.isBlank(cleanJson)) { + log.error("[{}] 无法从LLM响应中提取JSON,地址: {},内容: {}", requestId, address, content); + continue; // 尝试重试 + } + + JSONObject resultJson = JSONUtil.parseObj(cleanJson); + + return new AddressParseResult( + safeGetString(resultJson, "aoi"), + safeGetString(resultJson, "aoi_type"), + safeGetString(resultJson, "poi"), + safeGetString(resultJson, "poi_type"), + safeGetString(resultJson, "street"), + safeGetString(resultJson, "community"), + // LLM返回的地址可能经过标准化,但这里通常是希望得到原始地址 + // 如果需要标准化后的地址,LLM可以返回在address字段 + safeGetString(resultJson, "address", address) // 如果LLM未返回address,则使用原始输入 + ); + + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.warn("[{}] API调用线程被中断,地址: {}", requestId, address); + return null; + } catch (Exception e) { + log.error("[{}] 调用大模型失败 (重试 {}/{}),地址: {}", requestId, retry + 1, MAX_RETRY_COUNT, address, e); + if (retry < MAX_RETRY_COUNT - 1) { + try { + Thread.sleep(RETRY_DELAY_MS); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + return null; + } + } + } finally { + apiSemaphore.release(); // 释放许可 + // log.debug("[{}] API调用许可已释放 (剩余: {})", requestId, apiSemaphore.availablePermits()); + + // 控制请求频率,避免触发API限制 + try { + Thread.sleep(200); // 每次调用后间隔200ms + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + log.error("[{}] 地址 {} 经过 {} 次重试后仍未能成功解析", requestId, address, MAX_RETRY_COUNT); + return null; // 所有重试均失败 + } + + /** + * 创建用于读取的Workbook(优化大文件读取) + */ + private Workbook createWorkbookForRead(Path filePath) throws IOException { + // XSSFWorkbook适用于xlsx格式 + try (InputStream is = Files.newInputStream(filePath)) { + // 使用XSSFWorkbook (支持流式读取,但可能会加载整个文件到内存,对于超大文件仍需注意) + // 对于非常大的文件,可以通过XSSFReader来流式读取,但这里为了简便,先用这个。 + // 考虑使用 Apache POI SAX 方式处理超大文件,但这会增加代码复杂性。 + // 当前的 XSSFWorkbook(InputStream) 会在内存中构建XMLDOM,对内存有消耗。 + return new XSSFWorkbook(is); + } + } + + /** + * 保存Workbook(SXSSFWorkbook会流式写入) + */ + private void saveWorkbook(SXSSFWorkbook workbook, Path outputFilePath) throws IOException { + try (OutputStream fos = Files.newOutputStream(outputFilePath)) { + workbook.write(fos); + workbook.dispose(); // 清理SXSSFWorkbook产生的临时文件 + } + log.info("文件保存成功: {},大小: {} MB", + outputFilePath.toAbsolutePath(), (double)Files.size(outputFilePath) / (1024.0 * 1024.0)); + } + + // ============ 辅助类 ============ + + /** + * 行数据包装类,只包含处理所需信息,避免持有整个POI Row对象 + */ + private static class RowData { + final int rowIndex; + final String address; + + RowData(int rowIndex, String address) { + this.rowIndex = rowIndex; + this.address = address; + } + } + + /** + * 解析结果包装类,用于在 CompletableFuture 之间传递结果和错误信息 + */ + @Data + @NoArgsConstructor + @AllArgsConstructor + private static class ParsedRowResult { + private int rowIndex; + private AddressParseResult parseResult; // 成功时的解析结果 + private String errorMessage; // 失败时的错误信息 + + public static ParsedRowResult success(int rowIndex, AddressParseResult parseResult) { + return new ParsedRowResult(rowIndex, parseResult, null); + } + + public static ParsedRowResult failed(int rowIndex, String errorMessage) { + return new ParsedRowResult(rowIndex, null, errorMessage); + } + + public boolean isSuccess() { + return parseResult != null; + } + } + + /** + * 基础处理器 + */ + private abstract class BaseProcessor { + protected final ExcelProcessTask task; + protected final ColumnIndices indices; + + public BaseProcessor(ExcelProcessTask task, ColumnIndices indices) { + this.task = task; + this.indices = indices; + } + + public ColumnIndices getIndices() { + return indices; + } + + // 使用 task 的原子计数器更新 + public void incrementSuccessCount(int count) { + task.incrementSuccessCount(count); + } + + public void incrementFailCount(int count) { + task.incrementFailCount(count); + } + } + + /** + * 预览处理器 + */ + private class PreviewProcessor extends BaseProcessor { + public PreviewProcessor(ExcelProcessTask task, ColumnIndices indices) { + super(task, indices); + } + } + + /** + * 完整文件处理器 + */ + private class FullFileProcessor extends BaseProcessor { + public FullFileProcessor(ExcelProcessTask task, ColumnIndices indices) { + super(task, indices); + } + } + + // ============ 工具方法 ============ + + /** + * 创建临时文件,将MultipartFile保存到文件系统 + */ + private Path createTempFile(MultipartFile file, String requestId) throws IOException { + Path tempDir = Paths.get(TEMP_FILE_DIR); + if (!Files.exists(tempDir)) { + Files.createDirectories(tempDir); + } + + String originalFilename = file.getOriginalFilename(); + String suffix = ""; + int dotIndex = originalFilename.lastIndexOf('.'); + if (dotIndex > 0) { + suffix = originalFilename.substring(dotIndex); + } + // 使用 requestId + 原始文件名作为临时文件名称,确保唯一性 + Path tempFilePath = tempDir.resolve(requestId + "_" + originalFilename); + + try (InputStream inputStream = file.getInputStream()) { + Files.copy(inputStream, tempFilePath); + } + return tempFilePath; + } + + /** + * 创建输出文件路径 + */ + private Path createOutputFilePath(String requestId) throws IOException { + Path outputDir = Paths.get(OUTPUT_FILE_DIR); + if (!Files.exists(outputDir)) { + Files.createDirectories(outputDir); + } + return outputDir.resolve(requestId + "_processed.xlsx"); + } + + /** + * 清理临时文件 + */ + private void cleanupTempFile(Path filePath) { + if (filePath != null && Files.exists(filePath)) { + try { + // 确保只删除我们自己创建的临时目录下的文件 + if (filePath.startsWith(Paths.get(TEMP_FILE_DIR))) { + Files.delete(filePath); + log.info("成功删除临时文件: {}", filePath); + } else { + log.warn("尝试删除非预期的文件路径,已阻止: {}", filePath); + } + } catch (IOException e) { + log.error("删除临时文件失败: {}", filePath, e); + } + } + } + + /** + * 定期清理过期的任务和关联的临时文件 + */ + private void cleanupOldTasks() { + long cutoff = System.currentTimeMillis() - (24 * 60 * 60 * 1000); // 24小时前的任务 + + Iterator> it = activeTasks.entrySet().iterator(); + while (it.hasNext()) { + Map.Entry entry = it.next(); + ExcelProcessTask task = entry.getValue(); + + // 如果任务已结束(COMPLETED, FAILED, PREVIEW_COMPLETED)且结束时间已超过截止点 + if (task.getEndTime() > 0 && task.getEndTime() < cutoff) { + // 确保异步任务已完成,否则get()会阻塞 + if (task.getProcessFuture() != null && task.getProcessFuture().isDone()) { + it.remove(); + log.info("清理过期任务: {}", entry.getKey()); + + // 清理相关临时输入文件 + Path inputFile = tempInputFiles.remove(entry.getKey()); + if (inputFile != null) { + cleanupExecutor.execute(() -> cleanupTempFile(inputFile)); + } + // 清理输出文件(如果任务完成且有输出文件) + /*if (task.getOutputFile() != null && !task.getOutputFile().isEmpty()) { + Path outputFile = Paths.get(task.getOutputFile()); + cleanupExecutor.execute(() -> cleanupTempFile(outputFile)); // 重用 cleanupTempFile,但需要注意目录判断 + }*/ + } + } + } + } + + + // ============ 公开API方法 ============ + + public ExcelProcessTask getTaskStatus(String requestId) { + return activeTasks.get(requestId); + } + + public Map getAllActiveTasks() { + return new HashMap<>(activeTasks); // 返回副本,防止外部修改 + } + + /** + * 继续处理预览后的剩余部分 + */ + public ExcelProcessResult continueProcessing(String requestId) { + ExcelProcessTask task = activeTasks.get(requestId); + if (task == null) { + throw new IllegalArgumentException("任务不存在: " + requestId); + } + + if (!"PREVIEW_COMPLETED".equals(task.getStatus())) { + throw new IllegalStateException("任务状态不正确,无法继续处理,当前状态: " + task.getStatus()); + } + + Path tempInputFilePath = tempInputFiles.get(requestId); + if (tempInputFilePath == null || !Files.exists(tempInputFilePath)) { + throw new IllegalStateException("任务对应的临时输入文件不存在或已丢失,无法继续处理。请重新上传文件。"); + } + + // 重置任务状态以便进行完整处理 + task.setStatus("RESUMING_FULL_PROCESSING"); + task.setPreviewMode(false); // 切换到非预览模式 + task.setEndTime(0); // 重置结束时间 + + // 异步执行剩余部分的完整处理 + CompletableFuture processFuture = CompletableFuture.supplyAsync(() -> { + return doProcessLargeExcelFile(null, tempInputFilePath, requestId, false); // 传入null MultipartFile,使用existingInputFilePath + }, batchExecutor); + + task.setProcessFuture(processFuture); // 更新任务的Future + + // 立即返回一个表示“继续处理中”的结果 + return new ExcelProcessResult( + requestId, + task.getFilename(), + null, + task.getSuccessCount(), // 沿用预览时的计数 + task.getFailCount(), // 沿用预览时的计数 + task.getTotalRows(), + "RESUMING_FULL_PROCESSING", + task.getElapsedTime(), + false, // 现在是完整处理模式 + task.getPreviewRows(), + task.getPreviewCompletedRows() + ); + } + + // ============ 创建结果对象的方法 ============ + + private ExcelProcessResult createSuccessResult(ExcelProcessTask task, File outputFile, boolean isPreview) { + // 如果 outputFilePath 为空,表示可能是在预览模式下取消了保存 + String outputFilePath = (outputFile != null && outputFile.exists()) ? outputFile.getAbsolutePath() : null; + + return new ExcelProcessResult( + task.getRequestId(), + task.getFilename(), + outputFilePath, + task.getSuccessCount(), + task.getFailCount(), + task.getTotalRows(), + task.getStatus(), // 直接使用任务的最终状态 + task.getElapsedTime(), + isPreview, + task.getPreviewRows(), + task.getPreviewCompletedRows() + ); + } + + private ExcelProcessResult createErrorResult(ExcelProcessTask task, Exception e) { + return new ExcelProcessResult( + task.getRequestId(), + task.getFilename(), + null, + task.getSuccessCount(), + task.getFailCount(), + task.getTotalRows(), + "FAILED: " + e.getMessage(), + task.getElapsedTime(), + task.isPreviewMode(), + task.getPreviewRows(), + task.getPreviewCompletedRows() + ); + } + + // ============ 其他辅助方法(与之前相同) ============ + + private String extractJsonFromContent(String content) { + if (StrUtil.isBlank(content)) return null; + + try { + // 移除 ... 标签及其内容 + content = content.replaceAll("(?s).*?", "").trim(); + + if (JSONUtil.isJson(content)) { + return content; + } + + int start = content.indexOf('{'); + int end = content.lastIndexOf('}'); + + if (start >= 0 && end > start) { + String candidate = content.substring(start, end + 1); + if (JSONUtil.isJson(candidate)) { + return candidate; + } + } + log.warn("无法从内容中提取JSON: {}", content); + return null; + + } catch (Exception e) { + log.error("提取JSON失败,内容: {}", content, e); + return null; + } + } + + private String safeGetString(JSONObject json, String key) { + return safeGetString(json, key, ""); + } + + private String safeGetString(JSONObject json, String key, String defaultValue) { + try { + Object value = json.get(key); + return value == null ? defaultValue : value.toString().trim(); + } catch (Exception e) { + log.debug("获取JSON字段 {} 失败,使用默认值: {}", key, defaultValue, e); + return defaultValue; + } + } + + private String getCellValue(Cell cell) { + if (cell == null) return ""; + try { + DataFormatter formatter = new DataFormatter(); + return formatter.formatCellValue(cell).trim(); + } catch (Exception e) { + log.warn("获取单元格值失败: {}", e.getMessage()); + return ""; + } + } + + private boolean isValidAddress(String address) { + if (StrUtil.isBlank(address)) return false; + + String lowerCaseAddress = address.trim().toLowerCase(); + // 常见的无效地址关键词 + String[] invalidKeywords = {"不详", "未知", "无地址", "无", "null", "nan", "无信息", "待补充", "暂无", "空"}; + for (String keyword : invalidKeywords) { + if (lowerCaseAddress.contains(keyword)) { + return false; + } + } + + // 地址长度至少大于1个字符 + return address.trim().length() > 1; + } + + /** + * 列索引信息 + */ + private static class ColumnIndices { + int addressColIndex = -1; // 地址列索引 + int poiColIndex = -1; + int poiTypeColIndex = -1; + int aoiColIndex = -1; + int aoiTypeColIndex = -1; + int streetColIndex = -1; + int communityColIndex = -1; + + @Override + public String toString() { + return "ColumnIndices{" + + "addressColIndex=" + addressColIndex + + ", poiColIndex=" + poiColIndex + + ", poiTypeColIndex=" + poiTypeColIndex + + ", aoiColIndex=" + aoiColIndex + + ", aoiTypeColIndex=" + aoiTypeColIndex + + ", streetColIndex=" + streetColIndex + + ", communityColIndex=" + communityColIndex + + '}'; + } + } + + /** + * 定位列索引 + */ + private ColumnIndices locateColumns(Sheet sheet, String requestId) { + ColumnIndices indices = new ColumnIndices(); + Row headerRow = sheet.getRow(0); + + if (headerRow != null) { + for (Cell cell : headerRow) { + String cellValue = getCellValue(cell); + if (cellValue != null) { + if (cellValue.contains("事发地址") || cellValue.contains("地址") || cellValue.contains("事发地")) { + indices.addressColIndex = cell.getColumnIndex(); + break; // 找到地址列后即可退出 + } + } + } + } + + // 如果未找到地址列,则使用默认值 + if (indices.addressColIndex == -1) { + log.warn("[{}] 未在表头中找到明确的地址列,将使用默认索引 3", requestId); + indices.addressColIndex = 3; // 默认使用第D列 + } + + // 目标结果列,保持固定位置 + indices.poiColIndex = 28; // AC列 + indices.poiTypeColIndex = 29; // AD列 + indices.aoiColIndex = 30; // AE列 + indices.aoiTypeColIndex = 31; // AF列 + indices.streetColIndex = 32; // AG列 + indices.communityColIndex = 33; // AH列 + + log.info("[{}] 列索引定位完成: {}", requestId, indices); + + return indices; + } +} diff --git a/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/util/ExcelProcessTask.java b/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/util/ExcelProcessTask.java new file mode 100644 index 00000000..2351e59e --- /dev/null +++ b/stwzhj-modules/stwzhj-extract/src/main/java/org/dromara/extract/util/ExcelProcessTask.java @@ -0,0 +1,73 @@ +package org.dromara.extract.util; + +import lombok.Data; +import org.dromara.extract.domain.ExcelProcessResult; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; + +@Data +public class ExcelProcessTask { + private String requestId; + private String filename; + private volatile String status; // 使用 volatile 确保多线程可见性 + private volatile int progress; // 使用 volatile + private AtomicInteger totalRows = new AtomicInteger(0); + private AtomicInteger processedRows = new AtomicInteger(0); // 实际处理的行数(不含表头) + private AtomicInteger successCount = new AtomicInteger(0); + private AtomicInteger failCount = new AtomicInteger(0); + private String errorMessage; + private long startTime; + private long endTime; + private CompletableFuture processFuture; + private boolean previewMode; + private boolean previewCompleted = false; // 预览是否已完成 + private int previewRows; // 预览的行数限制 + private int previewCompletedRows = 0; // 实际完成预览的行数 + + public ExcelProcessTask(String requestId, String filename) { + this.requestId = requestId; + this.filename = filename; + this.startTime = System.currentTimeMillis(); + } + + public long getElapsedTime() { + if (endTime == 0) { + return System.currentTimeMillis() - startTime; + } + return endTime - startTime; + } + + // 提供原子操作来更新计数器 + public void incrementProcessedRows(int count) { + this.processedRows.getAndAdd(count); + } + + public void incrementSuccessCount(int count) { + this.successCount.getAndAdd(count); + } + + public void incrementFailCount(int count) { + this.failCount.getAndAdd(count); + } + + public int getProcessedRows() { + return this.processedRows.get(); + } + + public int getSuccessCount() { + return this.successCount.get(); + } + + public int getFailCount() { + return this.failCount.get(); + } + + public int getTotalRows() { + return this.totalRows.get(); + } + + public void setTotalRows(int totalRows) { + this.totalRows.set(totalRows); + } +} diff --git a/stwzhj-modules/stwzhj-extract/src/main/resources/mapper/extract/SdJqdMapper.xml b/stwzhj-modules/stwzhj-extract/src/main/resources/mapper/extract/SdJqdMapper.xml new file mode 100644 index 00000000..0ff8fa05 --- /dev/null +++ b/stwzhj-modules/stwzhj-extract/src/main/resources/mapper/extract/SdJqdMapper.xml @@ -0,0 +1,13 @@ + + + + + + + + + + + diff --git a/stwzhj-modules/stwzhj-extract/src/main/resources/mapper/extract/ShJqdMapper.xml b/stwzhj-modules/stwzhj-extract/src/main/resources/mapper/extract/ShJqdMapper.xml new file mode 100644 index 00000000..84f430aa --- /dev/null +++ b/stwzhj-modules/stwzhj-extract/src/main/resources/mapper/extract/ShJqdMapper.xml @@ -0,0 +1,13 @@ + + + + + + + + + + +