点位查询换成从elasticsearch中查询

stwzhj
luyya 2026-04-10 10:50:48 +08:00
parent a8f0b96a75
commit 6b5aa718d2
5 changed files with 502 additions and 23 deletions

View File

@ -27,7 +27,17 @@ public class GpsInfoEntity implements Serializable {
*/
private String deviceType;
private String zzjgdm;
private String zzjgmc;
private String policeNo;
private String policeName;
private String phoneNum;
private String carNum;
private Double[] location;
@ -45,5 +55,7 @@ public class GpsInfoEntity implements Serializable {
//地市代码 34013402
private String infoSource;
private Integer online;
}

View File

@ -136,6 +136,7 @@ public class GpsServiceImpl implements IGpsService {
@Override
public R updateDataStatus(List<EsGpsInfoVO2> esGpsInfoVO2s) {
Map<String, String> onlineUserDataMap = new HashMap<>();
BulkRequest bulkRequest = new BulkRequest();
for (EsGpsInfoVO2 info : esGpsInfoVO2s) {
String zzjgdm = info.getZzjgdm();
String deviceCode = info.getDeviceCode();
@ -147,7 +148,10 @@ public class GpsServiceImpl implements IGpsService {
":" + deviceCode;
onlineUserDataMap.put(onlineUsersKey, jsonValue);
requestHandler.sendToKafka(info);
IndexRequest indexRequest = buildEsIndexRequest(info);
bulkRequest.add(indexRequest);
}
requestHandler.esRealBulkSave(bulkRequest);
requestHandler.redisOnlineUserBatch(onlineUserDataMap, 864000); //存放10天
return R.ok();
}
@ -216,7 +220,7 @@ public class GpsServiceImpl implements IGpsService {
return esGpsInfoVO2;
}
private IndexRequest buildEsIndexRequest(EsGpsInfo esGpsInfo) {
private IndexRequest buildEsIndexRequest(EsGpsInfoVO2 esGpsInfo) {
IndexRequest request = getIndexRequest(esGpsInfo);
return request;
}
@ -300,7 +304,7 @@ public class GpsServiceImpl implements IGpsService {
* @param esGpsInfo
* @return
*/
public IndexRequest getIndexRequest(EsGpsInfo esGpsInfo){
public IndexRequest getIndexRequest(EsGpsInfoVO2 esGpsInfo){
GpsInfoEntity gpsInfoEntity = new GpsInfoEntity();
BeanUtil.copyProperties(esGpsInfo,gpsInfoEntity);
double lng ;

View File

@ -8,6 +8,7 @@ import com.alibaba.fastjson.JSONArray;
import org.apache.dubbo.config.annotation.DubboReference;
import org.dromara.common.core.domain.R;
import org.dromara.common.redis.utils.RedisUtils;
import org.dromara.location.service.ISearchService;
import org.dromara.system.api.RemoteDeviceService;
import org.dromara.system.api.domain.bo.RemoteDeviceBo;
import org.dromara.system.api.domain.vo.RemoteDeviceVo;
@ -16,12 +17,17 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.CollectionUtils;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import redis.clients.jedis.resps.GeoRadiusResponse;
import javax.annotation.Resource;
import jakarta.annotation.Resource;
import jakarta.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.*;
@Configuration
@ -32,6 +38,9 @@ public class LocationController {
@DubboReference
private RemoteDeviceService deviceService;
@Resource
private ISearchService searchService;
Logger logger = LoggerFactory.getLogger(LocationController.class);
@ -149,21 +158,6 @@ public class LocationController {
String lat = params.get("lat").toString();
String lng = params.get("lng").toString();
String dist = params.get("distance").toString();
/* List<GeoRadiusResponse> geoRadiusResponses = redisUtil.nearByXYReadonly(RedisConstants.ONLINE_USERS_GEO,
Double.parseDouble(lng), Double.parseDouble(lat), Double.parseDouble(dist));
List<Device> list = new ArrayList<>();
for (GeoRadiusResponse geoRadiusRespons : geoRadiusResponses) {
String memberByString = geoRadiusRespons.getMemberByString();
logger.info("member:"+memberByString);
String[] strs = memberByString.split("#");
logger.info("key值:"+keys+":"+strs[0]+":"+strs[1]+":"+strs[2]);
Object object = redisUtil.get(keys+":"+strs[0]+":"+strs[1]+":"+strs[2]);
if (null != object){
Device device = FastJSONUtil.parsePojo(object.toString(), Device.class);
//device = rebuildDevice(device);
list.add(device);
}
}*/
return R.ok();
}
@ -190,11 +184,6 @@ public class LocationController {
device.setValid(1);
device.setDeviceType(type);
device.setPoliceName(name);
/*if("01".equals(type) || "02".equals(type) || "06".equals(type) ||"07".equals(type) ||"08".equals(type) || "09".equals(type)){
device.setCarNum(name);
}else{
device.setPoliceNo(name);
}*/
List<RemoteDeviceVo> devices = new ArrayList<>();
if (!"".equals(zzjgdms)){ //前端选择机构时
String[] zzjgdm = zzjgdms.split(",");
@ -230,6 +219,290 @@ public class LocationController {
return zzjgdm;
}
/**
* Elasticsearch
* type()deptId(ID)zzjgdm(deptId)online(线1-线0-线)
* minLng()minLat()maxLng()maxLat()
* deviceCode()
*/
@PostMapping("/getAllLocationFromEs")
public R getAllLocationFromEs(@RequestBody Map<String,Object> params){
logger.info("从ES获取设备最新位置参数: {}", params);
String type = params.get("type") != null ? params.get("type").toString() : null;
String deptId = params.get("deptId") != null ? params.get("deptId").toString() : null;
String zzjgdm = params.get("zzjgdm") != null ? params.get("zzjgdm").toString() : null;
Integer online = params.get("online") != null ? Convert.toInt(params.get("online")) : null;
Double minLng = params.get("minLng") != null ? Convert.toDouble(params.get("minLng")) : null;
Double minLat = params.get("minLat") != null ? Convert.toDouble(params.get("minLat")) : null;
Double maxLng = params.get("maxLng") != null ? Convert.toDouble(params.get("maxLng")) : null;
Double maxLat = params.get("maxLat") != null ? Convert.toDouble(params.get("maxLat")) : null;
String deviceCode = params.get("deviceCode") != null ? params.get("deviceCode").toString() : null;
List<Map<String, Object>> list = searchService.getAllLatestLocationFromEs(type, deptId, zzjgdm, online, minLng, minLat, maxLng, maxLat, deviceCode);
list.removeAll(Collections.singleton(null));
logger.info("从ES查询到设备数量: {}", list.size());
return R.ok(list);
}
/**
* ASCII - GET
* ASCII
*
* (ASCII):
* type: ()
* deptId: ID(zzjgdm)
* zzjgdm: (deptId)
* online: 线(1-线0-线)
* minLng: ()
* minLat: ()
* maxLng: ()
* maxLat: ()
* deviceCode: ()
*
* (ASCII):
* STATUS=OK|COUNT=10
* DEVICE:deviceCode=001,deviceName=,lng=116.397,lat=39.908,online=1,time=2024-01-01 12:00:00
* DEVICE:deviceCode=002,deviceName=2,lng=116.398,lat=39.909,online=1,time=2024-01-01 12:00:01
* ...
* END
*
* :
* STATUS=ERROR|MESSAGE=
*/
@GetMapping("/getAllLocationAscii")
public void getAllLocationAscii(
@RequestParam(required = false) String type,
@RequestParam(required = false) String deptId,
@RequestParam(required = false) String zzjgdm,
@RequestParam(required = false) Integer online,
@RequestParam(required = false) Double minLng,
@RequestParam(required = false) Double minLat,
@RequestParam(required = false) Double maxLng,
@RequestParam(required = false) Double maxLat,
@RequestParam(required = false) String deviceCode,
HttpServletResponse response) {
response.setContentType("text/plain;charset=US-ASCII");
response.setCharacterEncoding("US-ASCII");
try (PrintWriter writer = response.getWriter()) {
logger.info("ASCII协议获取设备位置 - type={}, deptId={}, zzjgdm={}, online={}, deviceCode={}, bounds=[{},{},{},{}]",
type, deptId, zzjgdm, online, deviceCode, minLng, minLat, maxLng, maxLat);
List<Map<String, Object>> list = searchService.getAllLatestLocationFromEs(type, deptId, zzjgdm, online, minLng, minLat, maxLng, maxLat, deviceCode);
list.removeAll(Collections.singleton(null));
// ASCII格式响应头
writer.print("STATUS=OK|COUNT=");
writer.println(list.size());
// 每条设备数据
for (Map<String, Object> item : list) {
StringBuilder sb = new StringBuilder();
sb.append("DEVICE:");
boolean first = true;
for (Map.Entry<String, Object> entry : item.entrySet()) {
if (!first) {
sb.append(",");
}
first = false;
sb.append(escapeAscii(entry.getKey()));
sb.append("=");
Object value = entry.getValue();
sb.append(value != null ? escapeAscii(value.toString()) : "");
}
writer.println(sb.toString());
}
// 结束标记
writer.println("END");
writer.flush();
logger.info("ASCII协议返回设备数量: {}", list.size());
} catch (Exception e) {
logger.error("ASCII协议接口异常", e);
try (PrintWriter writer = response.getWriter()) {
writer.print("STATUS=ERROR|MESSAGE=");
writer.println(escapeAscii(e.getMessage() != null ? e.getMessage() : "Unknown error"));
writer.flush();
} catch (IOException ioEx) {
logger.error("写入错误响应失败", ioEx);
}
}
}
/**
* ASCII - POST
* POSTJSONJSONASCII
*
* (JSON):
* {
* "ASCII": "type=01,02\nonline=1\nminLng=116.0\nminLat=39.0\nmaxLng=117.0\nmaxLat=40.0"
* }
*
* GET
*/
@PostMapping(value = "/getAllLocationAscii")
public R<List<Map<String, Object>>> getAllLocationAsciiPost(@RequestBody Map<String, Object> params) {
// 解析JSON中的ASCII字段使用数组包装以支持lambda赋值
String[] typeHolder = new String[1];
String[] deptIdHolder = new String[1];
String[] zzjgdmHolder = new String[1];
Integer[] onlineHolder = new Integer[1];
Double[] minLngHolder = new Double[1];
Double[] minLatHolder = new Double[1];
Double[] maxLngHolder = new Double[1];
Double[] maxLatHolder = new Double[1];
String[] deviceCodeHolder = new String[1];
logger.info("接收到的原始参数: {}", params);
// 支持 ASCII 和 ascii 两种key
Object asciiObj = params.get("ASCII");
if (asciiObj == null) {
asciiObj = params.get("ascii");
}
String asciiBody = asciiObj != null ? asciiObj.toString() : null;
logger.info("解析到的ASCII内容: [{}]", asciiBody);
if (asciiBody != null && !asciiBody.isEmpty()) {
// 逐个解析已知的key
parseAndSet(asciiBody, "type", v -> typeHolder[0] = v);
parseAndSet(asciiBody, "deptId", v -> deptIdHolder[0] = v);
parseAndSet(asciiBody, "zzjgdm", v -> zzjgdmHolder[0] = v);
parseAndSetInt(asciiBody, "online", v -> onlineHolder[0] = v);
parseAndSetDouble(asciiBody, "minLng", v -> minLngHolder[0] = v);
parseAndSetDouble(asciiBody, "minLat", v -> minLatHolder[0] = v);
parseAndSetDouble(asciiBody, "maxLng", v -> maxLngHolder[0] = v);
parseAndSetDouble(asciiBody, "maxLat", v -> maxLatHolder[0] = v);
parseAndSet(asciiBody, "deviceCode", v -> deviceCodeHolder[0] = v);
}
String type = typeHolder[0];
String deptId = deptIdHolder[0];
String zzjgdm = zzjgdmHolder[0];
Integer online = onlineHolder[0];
Double minLng = minLngHolder[0];
Double minLat = minLatHolder[0];
Double maxLng = maxLngHolder[0];
Double maxLat = maxLatHolder[0];
String deviceCode = deviceCodeHolder[0];
logger.info("ASCII协议POST获取设备位置 - type={}, deptId={}, zzjgdm={}, online={}, deviceCode={}", type, deptId, zzjgdm, online, deviceCode);
List<Map<String, Object>> list = searchService.getAllLatestLocationFromEs(type, deptId, zzjgdm, online, minLng, minLat, maxLng, maxLat, deviceCode);
list.removeAll(Collections.singleton(null));
// 为每个对象添加ASCII字段
for (Map<String, Object> item : list) {
StringBuilder sb = new StringBuilder();
sb.append("DEVICE:");
boolean first = true;
for (Map.Entry<String, Object> entry : item.entrySet()) {
if (!first) sb.append(",");
first = false;
sb.append(escapeAscii(entry.getKey()));
sb.append("=");
Object val = entry.getValue();
sb.append(val != null ? escapeAscii(val.toString()) : "");
}
item.put("ASCII", sb.toString());
}
logger.info("ASCII协议POST返回设备数量: {}", list.size());
return R.ok(list);
}
/**
* ASCII
* ASCII
*/
private String escapeAscii(String input) {
if (input == null) {
return "";
}
StringBuilder sb = new StringBuilder();
for (char c : input.toCharArray()) {
if (c >= 32 && c <= 126 && c != '=' && c != ',' && c != '|' && c != ':') {
sb.append(c);
} else {
// 转义特殊字符: = , | : 以及非ASCII字符
sb.append(String.format("%%%02X", (int) c));
}
}
return sb.toString();
}
/**
* ASCIIkey
*/
private void parseAndSet(String content, String key, java.util.function.Consumer<String> setter) {
int idx = content.indexOf(key + "=");
if (idx >= 0) {
int start = idx + key.length() + 1;
// 找到value的结束位置换行、分号、空格或字符串结尾
int end = content.length();
for (int i = start; i < content.length(); i++) {
char c = content.charAt(i);
if (c == '\n' || c == '\r' || c == ';' || c == ' ') {
end = i;
break;
}
}
String value = content.substring(start, end).trim();
logger.info("解析到key=[{}], value=[{}]", key, value);
setter.accept(value);
}
}
/**
* Integer
*/
private void parseAndSetInt(String content, String key, java.util.function.Consumer<Integer> setter) {
int idx = content.indexOf(key + "=");
if (idx >= 0) {
int start = idx + key.length() + 1;
int end = content.length();
for (int i = start; i < content.length(); i++) {
char c = content.charAt(i);
if (c == '\n' || c == '\r' || c == ';' || c == ' ') {
end = i;
break;
}
}
String value = content.substring(start, end).trim();
logger.info("解析到key=[{}], value=[{}]", key, value);
if (!value.isEmpty()) {
setter.accept(Convert.toInt(value));
}
}
}
/**
* Double
*/
private void parseAndSetDouble(String content, String key, java.util.function.Consumer<Double> setter) {
int idx = content.indexOf(key + "=");
if (idx >= 0) {
int start = idx + key.length() + 1;
int end = content.length();
for (int i = start; i < content.length(); i++) {
char c = content.charAt(i);
if (c == '\n' || c == '\r' || c == ';' || c == ' ') {
end = i;
break;
}
}
String value = content.substring(start, end).trim();
logger.info("解析到key=[{}], value=[{}]", key, value);
if (!value.isEmpty()) {
setter.accept(Convert.toDouble(value));
}
}
}
}

View File

@ -13,5 +13,19 @@ public interface ISearchService {
List<EsGpsInfoVO2> queryDistinctDevicesNearPoint(QueryBuilder spatialQuery, String indexName) throws IOException;
/**
* Elasticsearch
* @param type
* @param deptId IDzzjgdm
* @param zzjgdm deptId
* @param online 线1-线0-线null-
* @param minLng
* @param minLat
* @param maxLng
* @param maxLat
* @param deviceCode
* @return
*/
List<Map<String, Object>> getAllLatestLocationFromEs(String type, String deptId, String zzjgdm, Integer online, Double minLng, Double minLat, Double maxLng, Double maxLat, String deviceCode);
}

View File

@ -227,4 +227,180 @@ public class SearchServiceImpl implements ISearchService {
};
}
@Override
public List<Map<String, Object>> getAllLatestLocationFromEs(String type, String deptId, String zzjgdm, Integer online, Double minLng, Double minLat, Double maxLng, Double maxLat, String deviceCode) {
List<Map<String, Object>> resultList = new ArrayList<>();
List<String> indexList = getRecentIndexList();
log.info("查询ES索引列表: {}", indexList);
if (indexList.isEmpty()) {
return resultList;
}
try {
// 构建查询条件
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
// 设备类型条件 - 支持单个、多个逗号分隔或不传
if (type != null && !type.isEmpty()) {
String[] types = type.split(",");
BoolQueryBuilder typeQuery = QueryBuilders.boolQuery();
typeQuery.must(QueryBuilders.existsQuery("deviceType"));
if (types.length == 1) {
typeQuery.must(QueryBuilders.termQuery("deviceType", types[0]));
} else {
typeQuery.must(QueryBuilders.termsQuery("deviceType", types));
}
boolQuery.must(typeQuery);
}
// 设备编码条件 - 支持单个、多个逗号分隔或不传
if (deviceCode != null && !deviceCode.isEmpty()) {
String[] deviceCodes = deviceCode.split(",");
BoolQueryBuilder deviceCodeQuery = QueryBuilders.boolQuery();
deviceCodeQuery.must(QueryBuilders.existsQuery("deviceCode"));
if (deviceCodes.length == 1) {
deviceCodeQuery.must(QueryBuilders.termQuery("deviceCode", deviceCodes[0]));
} else {
deviceCodeQuery.must(QueryBuilders.termsQuery("deviceCode", deviceCodes));
}
boolQuery.must(deviceCodeQuery);
}
// 部门ID条件单个- 使用zzjgdm字段
if (deptId != null && !deptId.isEmpty()) {
String deptPattern = deptIdSub(deptId);
BoolQueryBuilder deptQuery = QueryBuilders.boolQuery();
deptQuery.must(QueryBuilders.existsQuery("zzjgdm"));
if (deptPattern.endsWith("*")) {
deptPattern = deptPattern.substring(0, deptPattern.length() - 1);
deptQuery.must(QueryBuilders.prefixQuery("zzjgdm", deptPattern));
} else {
deptQuery.must(QueryBuilders.termQuery("zzjgdm", deptId));
}
boolQuery.must(deptQuery);
}
// 组织机构代码条件(多个用逗号分隔)- 使用zzjgdm字段
if (zzjgdm != null && !zzjgdm.isEmpty()) {
String[] zzjgdms = zzjgdm.split(",");
BoolQueryBuilder zzjgdmBoolQuery = QueryBuilders.boolQuery();
zzjgdmBoolQuery.must(QueryBuilders.existsQuery("zzjgdm"));
if (zzjgdms.length == 1) {
String deptPattern = deptIdSub(zzjgdms[0]);
if (deptPattern.endsWith("*")) {
deptPattern = deptPattern.substring(0, deptPattern.length() - 1);
zzjgdmBoolQuery.must(QueryBuilders.prefixQuery("zzjgdm", deptPattern));
} else {
zzjgdmBoolQuery.must(QueryBuilders.termQuery("zzjgdm", zzjgdms[0]));
}
} else {
BoolQueryBuilder shouldQuery = QueryBuilders.boolQuery();
for (String zzjg : zzjgdms) {
String deptPattern = deptIdSub(zzjg);
if (deptPattern.endsWith("*")) {
deptPattern = deptPattern.substring(0, deptPattern.length() - 1);
shouldQuery.should(QueryBuilders.prefixQuery("zzjgdm", deptPattern));
} else {
shouldQuery.should(QueryBuilders.termQuery("zzjgdm", zzjg));
}
}
shouldQuery.minimumShouldMatch(1);
zzjgdmBoolQuery.must(shouldQuery);
}
boolQuery.must(zzjgdmBoolQuery);
}
// 在线状态条件
if (online != null) {
BoolQueryBuilder onlineQuery = QueryBuilders.boolQuery();
onlineQuery.must(QueryBuilders.existsQuery("online"));
onlineQuery.must(QueryBuilders.termQuery("online", online));
boolQuery.must(onlineQuery);
}
// 可视范围(边界框)查询 - 当四个边界参数都不为空时添加
if (minLng != null && minLat != null && maxLng != null && maxLat != null) {
BoolQueryBuilder locationQuery = QueryBuilders.boolQuery();
locationQuery.must(QueryBuilders.existsQuery("location"));
locationQuery.must(QueryBuilders.geoBoundingBoxQuery("location")
.setCorners(maxLat, minLng, minLat, maxLng));
boolQuery.must(locationQuery);
}
// 使用聚合查询获取每个设备的最新记录
TermsAggregationBuilder aggregation = AggregationBuilders.terms("distinct_devices")
.field("deviceCode")
.size(10000)
.shardSize(20000)
.subAggregation(
AggregationBuilders.topHits("latest_record")
.size(1)
.sort("gpsTime", SortOrder.DESC)
);
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder()
.query(boolQuery)
.aggregation(aggregation)
.size(0);
SearchRequest request = new SearchRequest(indexList.toArray(new String[0]))
.source(sourceBuilder)
.indicesOptions(IndicesOptions.lenientExpandOpen());
log.info("ES查询DSL: {}", sourceBuilder.toString());
// 执行查询
SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
// 解析聚合结果
Terms terms = response.getAggregations().get("distinct_devices");
log.info("查询到设备数量: {}", terms.getBuckets().size());
for (Terms.Bucket bucket : terms.getBuckets()) {
TopHits topHits = bucket.getAggregations().get("latest_record");
SearchHit[] hits = topHits.getHits().getHits();
if (hits.length > 0) {
Map<String, Object> source = hits[0].getSourceAsMap();
resultList.add(source);
}
}
} catch (Exception e) {
log.error("从ES查询设备最新位置异常", e);
}
return resultList;
}
/**
* 7
*/
private List<String> getRecentIndexList() {
List<String> indexList = new ArrayList<>();
DateTime now = DateUtil.date();
for (int i = 0; i < 1; i++) {
DateTime dateTime = DateUtil.offsetDay(now, -i);
String dateStr = dateTime.toString("yyyyMMdd");
indexList.add("rs_gpsinfo" + dateStr);
}
return indexList;
}
/**
* IDcontrollerdeptIdSub
*/
private String deptIdSub(String zzjgdm) {
if (zzjgdm.endsWith("0000000000")) { // 省厅 即全部
zzjgdm = zzjgdm.substring(0, 2) + "*";
} else if (zzjgdm.endsWith("00000000")) { //地市
zzjgdm = zzjgdm.substring(0, 4) + "*";
} else if (zzjgdm.endsWith("000000")) { // 分局
zzjgdm = zzjgdm.substring(0, 6) + "*";
} else { // 支队
zzjgdm = zzjgdm.substring(0, 8) + "*";
}
return zzjgdm;
}
}