From 0ff37e767eb29f56b21a89c2a573004bd62099a0 Mon Sep 17 00:00:00 2001 From: luyya Date: Mon, 26 May 2025 15:43:30 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0websocket=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 4 +- stwzhj-api/pom.xml | 1 + stwzhj-api/stwzhj-api-bom/pom.xml | 6 + stwzhj-api/stwzhj-api-location/pom.xml | 33 ++++ .../api/RemoteElasticSearchService.java | 8 + .../src/main/resources/logback-plus.xml | 80 +++++++-- .../src/main/resources/common-dubbo.yml | 4 +- .../common/redis/utils/RedisUtils.java | 8 + .../handler/PlusWebSocketHandler.java | 5 + .../holder/WebSocketSessionHolder.java | 8 +- .../gateway/filter/GlobalCorsFilter.java | 50 +++++- .../gateway/filter/GlobalLogFilter.java | 1 - .../gateway/filter/WebSocketAuthFilter.java | 65 +++++++ .../src/main/resources/logback-plus.xml | 168 ++++++++---------- stwzhj-modules/pom.xml | 1 + .../consumer/handler/ConsumerWorker.java | 27 ++- .../handler/DataInsertBatchHandler.java | 10 +- .../src/main/resources/logback-plus.xml | 68 ++++++- .../data2es/handler/RedisExpireListener.java | 7 +- .../src/main/resources/logback-plus.xml | 82 +++++++-- .../stwzhj-kafkaToWebsocket/pom.xml | 139 +++++++++++++++ .../KafkaToSocketApplication.java | 26 +++ .../kafka2Websocket/config/KafkaConfig.java | 66 +++++++ .../config/KafkaProperties.java | 17 ++ .../config/WebSocketConfig.java | 58 ++++++ .../config/WebSocketProperties.java | 26 +++ .../consumer/KafkaConsumerManager.java | 113 ++++++++++++ .../consumer/KafkaConsumerService.java | 39 ++++ .../consumer/MessageBufferManager.java | 71 ++++++++ .../kafka2Websocket/dto/SharedState.java | 33 ++++ .../dto/WebSocketMessageDto.java | 29 +++ .../handle/KafkaWebSocketHandler.java | 113 ++++++++++++ .../holder/WebSocketSessionHolder.java | 116 ++++++++++++ .../interceptor/AuthInterceptor.java | 42 +++++ .../kafka2Websocket/utils/SocketUtils.java | 153 ++++++++++++++++ .../src/main/resources/application.yml | 34 ++++ .../src/main/resources/banner.txt | 10 ++ .../src/main/resources/logback-plus.xml | 86 +++++++++ stwzhj-modules/stwzhj-location/pom.xml | 11 ++ .../dromara/location/LocationApplication.java | 1 + .../controller/LocationController.java | 24 ++- .../dubbo/RemoteElasticSearchServiceImpl.java | 23 +++ .../org/dromara/location/package-info.java | 1 - .../location/service/ISearchService.java | 7 + .../service/impl/SearchServiceImpl.java | 64 ++++++- .../src/main/resources/logback-plus.xml | 82 +++++++-- .../src/main/resources/application.yml | 2 +- stwzhj-modules/stwzhj-system/pom.xml | 5 + .../system/IndexStaticsController.java | 33 ++++ .../org/dromara/system/domain/SysDept.java | 4 + .../dromara/system/domain/bo/SysDeptBo.java | 6 + .../dromara/system/domain/bo/TDeviceBo.java | 4 + .../dromara/system/domain/vo/SysDeptVo.java | 4 + .../system/domain/vo/TDeviceExportVo.java | 24 ++- .../system/domain/vo/TDeviceImportVo.java | 8 +- .../dromara/system/domain/vo/TDeviceVo.java | 4 + .../listener/TDeviceImportListener.java | 78 +++++--- .../system/service/ISysNoticeService.java | 2 + .../service/impl/SysDeptServiceImpl.java | 20 ++- .../service/impl/SysNoticeServiceImpl.java | 10 ++ .../service/impl/TDeviceServiceImpl.java | 1 + .../src/main/resources/logback-plus.xml | 82 +++++++-- .../resources/mapper/system/SysDeptMapper.xml | 20 +-- .../src/main/resources/logback-plus.xml | 82 +++++++-- .../udp/controller/OriginalUdpReceiver.java | 2 +- .../src/main/resources/logback-plus.xml | 82 +++++++-- .../src/main/resources/logback-plus.xml | 86 +++++++++ .../src/main/resources/application.properties | 5 +- 68 files changed, 2316 insertions(+), 268 deletions(-) create mode 100644 stwzhj-api/stwzhj-api-location/pom.xml create mode 100644 stwzhj-api/stwzhj-api-location/src/main/java/org/dromara/location/api/RemoteElasticSearchService.java create mode 100644 stwzhj-gateway/src/main/java/org/dromara/gateway/filter/WebSocketAuthFilter.java create mode 100644 stwzhj-modules/stwzhj-kafkaToWebsocket/pom.xml create mode 100644 stwzhj-modules/stwzhj-kafkaToWebsocket/src/main/java/org/dromara/kafka2Websocket/KafkaToSocketApplication.java create mode 100644 stwzhj-modules/stwzhj-kafkaToWebsocket/src/main/java/org/dromara/kafka2Websocket/config/KafkaConfig.java create mode 100644 stwzhj-modules/stwzhj-kafkaToWebsocket/src/main/java/org/dromara/kafka2Websocket/config/KafkaProperties.java create mode 100644 stwzhj-modules/stwzhj-kafkaToWebsocket/src/main/java/org/dromara/kafka2Websocket/config/WebSocketConfig.java create mode 100644 stwzhj-modules/stwzhj-kafkaToWebsocket/src/main/java/org/dromara/kafka2Websocket/config/WebSocketProperties.java create mode 100644 stwzhj-modules/stwzhj-kafkaToWebsocket/src/main/java/org/dromara/kafka2Websocket/consumer/KafkaConsumerManager.java create mode 100644 stwzhj-modules/stwzhj-kafkaToWebsocket/src/main/java/org/dromara/kafka2Websocket/consumer/KafkaConsumerService.java create mode 100644 stwzhj-modules/stwzhj-kafkaToWebsocket/src/main/java/org/dromara/kafka2Websocket/consumer/MessageBufferManager.java create mode 100644 stwzhj-modules/stwzhj-kafkaToWebsocket/src/main/java/org/dromara/kafka2Websocket/dto/SharedState.java create mode 100644 stwzhj-modules/stwzhj-kafkaToWebsocket/src/main/java/org/dromara/kafka2Websocket/dto/WebSocketMessageDto.java create mode 100644 stwzhj-modules/stwzhj-kafkaToWebsocket/src/main/java/org/dromara/kafka2Websocket/handle/KafkaWebSocketHandler.java create mode 100644 stwzhj-modules/stwzhj-kafkaToWebsocket/src/main/java/org/dromara/kafka2Websocket/holder/WebSocketSessionHolder.java create mode 100644 stwzhj-modules/stwzhj-kafkaToWebsocket/src/main/java/org/dromara/kafka2Websocket/interceptor/AuthInterceptor.java create mode 100644 stwzhj-modules/stwzhj-kafkaToWebsocket/src/main/java/org/dromara/kafka2Websocket/utils/SocketUtils.java create mode 100644 stwzhj-modules/stwzhj-kafkaToWebsocket/src/main/resources/application.yml create mode 100644 stwzhj-modules/stwzhj-kafkaToWebsocket/src/main/resources/banner.txt create mode 100644 stwzhj-modules/stwzhj-kafkaToWebsocket/src/main/resources/logback-plus.xml create mode 100644 stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/dubbo/RemoteElasticSearchServiceImpl.java delete mode 100644 stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/package-info.java create mode 100644 stwzhj-modules/wzhj-webscoket/src/main/resources/logback-plus.xml diff --git a/pom.xml b/pom.xml index bacecbde..912ad929 100644 --- a/pom.xml +++ b/pom.xml @@ -89,12 +89,12 @@ prod prod - 10.129.128.114:8848 + 53.176.146.99:8848 DEFAULT_GROUP DEFAULT_GROUP nacos nacos - 10.129.128.114:4560 + 53.176.146.99:4560 diff --git a/stwzhj-api/pom.xml b/stwzhj-api/pom.xml index 38a3e4a9..7628b723 100644 --- a/stwzhj-api/pom.xml +++ b/stwzhj-api/pom.xml @@ -14,6 +14,7 @@ stwzhj-api-resource stwzhj-api-workflow stwzhj-api-data2es + stwzhj-api-location stwzhj-api diff --git a/stwzhj-api/stwzhj-api-bom/pom.xml b/stwzhj-api/stwzhj-api-bom/pom.xml index f1e1c317..97770712 100644 --- a/stwzhj-api/stwzhj-api-bom/pom.xml +++ b/stwzhj-api/stwzhj-api-bom/pom.xml @@ -47,6 +47,12 @@ ${revision} + + org.dromara + stwzhj-api-location + ${revision} + + diff --git a/stwzhj-api/stwzhj-api-location/pom.xml b/stwzhj-api/stwzhj-api-location/pom.xml new file mode 100644 index 00000000..08f5be64 --- /dev/null +++ b/stwzhj-api/stwzhj-api-location/pom.xml @@ -0,0 +1,33 @@ + + + + org.dromara + stwzhj-api + ${revision} + + 4.0.0 + + stwzhj-api-location + + + stwzhj-api-location + + + + + + + org.dromara + stwzhj-common-core + + + + org.dromara + stwzhj-common-excel + + + + + diff --git a/stwzhj-api/stwzhj-api-location/src/main/java/org/dromara/location/api/RemoteElasticSearchService.java b/stwzhj-api/stwzhj-api-location/src/main/java/org/dromara/location/api/RemoteElasticSearchService.java new file mode 100644 index 00000000..fcaa62fb --- /dev/null +++ b/stwzhj-api/stwzhj-api-location/src/main/java/org/dromara/location/api/RemoteElasticSearchService.java @@ -0,0 +1,8 @@ +package org.dromara.location.api; + +import java.util.List; + +public interface RemoteElasticSearchService { + + List linstenDataStatus(); +} diff --git a/stwzhj-auth/src/main/resources/logback-plus.xml b/stwzhj-auth/src/main/resources/logback-plus.xml index a2e187f8..909b3020 100644 --- a/stwzhj-auth/src/main/resources/logback-plus.xml +++ b/stwzhj-auth/src/main/resources/logback-plus.xml @@ -1,18 +1,18 @@ - + - - - - ${console.log.pattern} - utf-8 - - + @@ -21,8 +21,66 @@ - - - + + + + + + ERROR + + DENY + + ACCEPT + + + ${LOG_PATH}${LOG_FILE} + + UTF-8 + %date [%level] [%thread] %logger{60} [%file : %line] %msg%n + + + ${LOG_PATH}info/${LOG_FILE}.%d{yyyy-MM-dd}.%i.gz + 50MB + 20 + + + + + + + + Error + + + ${LOG_PATH}error.${LOG_FILE} + + + + ${LOG_PATH}error/${LOG_FILE}.%d{yyyy-MM-dd}.%i.gz + + 50MB + 180 + + + + + + UTF-8 + %date [%level] [%thread] %logger{60} [%file : %line] %msg%n + + + + + + + + diff --git a/stwzhj-common/stwzhj-common-dubbo/src/main/resources/common-dubbo.yml b/stwzhj-common/stwzhj-common-dubbo/src/main/resources/common-dubbo.yml index 02638409..70227a0a 100644 --- a/stwzhj-common/stwzhj-common-dubbo/src/main/resources/common-dubbo.yml +++ b/stwzhj-common/stwzhj-common-dubbo/src/main/resources/common-dubbo.yml @@ -25,12 +25,12 @@ dubbo: username: dubbo password: ruoyi123 # 集群开关 - sentinel: false + sentinel: true parameters: namespace: ${spring.profiles.active} database: ${spring.data.redis.database} timeout: ${spring.data.redis.timeout} - backup: 10.129.128.116:26380,10.129.128.115:26380,10.129.128.114:26380 + backup: 53.176.146.98:26380,53.176.146.99:26380,53.176.146.100:26380 # metadata-report: # address: redis://${spring.data.redis.host}:${spring.data.redis.port} # group: DUBBO_GROUP diff --git a/stwzhj-common/stwzhj-common-redis/src/main/java/org/dromara/common/redis/utils/RedisUtils.java b/stwzhj-common/stwzhj-common-redis/src/main/java/org/dromara/common/redis/utils/RedisUtils.java index a7f0e8af..99df478b 100644 --- a/stwzhj-common/stwzhj-common-redis/src/main/java/org/dromara/common/redis/utils/RedisUtils.java +++ b/stwzhj-common/stwzhj-common-redis/src/main/java/org/dromara/common/redis/utils/RedisUtils.java @@ -751,4 +751,12 @@ public class RedisUtils { return count1; } + // 查询半径周边 米内的成员 + public static List nearByXYReadonly(double centerLon,double centerLat,double distance){ + RGeo geo = CLIENT.getGeo(RedisConstants.ONLINE_USERS_GEO); + List members = geo.radius(centerLon, centerLat, distance, GeoUnit.METERS); + return members; + } + + } diff --git a/stwzhj-common/stwzhj-common-websocket/src/main/java/org/dromara/common/websocket/handler/PlusWebSocketHandler.java b/stwzhj-common/stwzhj-common-websocket/src/main/java/org/dromara/common/websocket/handler/PlusWebSocketHandler.java index 557ed8eb..847156af 100644 --- a/stwzhj-common/stwzhj-common-websocket/src/main/java/org/dromara/common/websocket/handler/PlusWebSocketHandler.java +++ b/stwzhj-common/stwzhj-common-websocket/src/main/java/org/dromara/common/websocket/handler/PlusWebSocketHandler.java @@ -56,6 +56,11 @@ public class PlusWebSocketHandler extends AbstractWebSocketHandler { WebSocketUtils.publishMessage(webSocketMessageDto); } + protected void handleStringMessage(WebSocketSession session, String message) throws Exception { + // 从WebSocket会话中获取登录用户信息 + WebSocketUtils.sendMessage(session,message); + } + /** * 处理接收到的二进制消息 * diff --git a/stwzhj-common/stwzhj-common-websocket/src/main/java/org/dromara/common/websocket/holder/WebSocketSessionHolder.java b/stwzhj-common/stwzhj-common-websocket/src/main/java/org/dromara/common/websocket/holder/WebSocketSessionHolder.java index 368801c3..9c2372b8 100644 --- a/stwzhj-common/stwzhj-common-websocket/src/main/java/org/dromara/common/websocket/holder/WebSocketSessionHolder.java +++ b/stwzhj-common/stwzhj-common-websocket/src/main/java/org/dromara/common/websocket/holder/WebSocketSessionHolder.java @@ -2,6 +2,7 @@ package org.dromara.common.websocket.holder; import lombok.AccessLevel; import lombok.NoArgsConstructor; +import org.springframework.web.socket.CloseStatus; import org.springframework.web.socket.WebSocketSession; import java.util.Map; @@ -25,6 +26,7 @@ public class WebSocketSessionHolder { * @param session 要添加的WebSocket会话 */ public static void addSession(Long sessionKey, WebSocketSession session) { + removeSession(sessionKey); USER_SESSION_MAP.put(sessionKey, session); } @@ -34,8 +36,10 @@ public class WebSocketSessionHolder { * @param sessionKey 要移除的会话键 */ public static void removeSession(Long sessionKey) { - if (USER_SESSION_MAP.containsKey(sessionKey)) { - USER_SESSION_MAP.remove(sessionKey); + WebSocketSession session = USER_SESSION_MAP.remove(sessionKey); + try { + session.close(CloseStatus.BAD_DATA); + } catch (Exception ignored) { } } diff --git a/stwzhj-gateway/src/main/java/org/dromara/gateway/filter/GlobalCorsFilter.java b/stwzhj-gateway/src/main/java/org/dromara/gateway/filter/GlobalCorsFilter.java index a2482e79..54b25038 100644 --- a/stwzhj-gateway/src/main/java/org/dromara/gateway/filter/GlobalCorsFilter.java +++ b/stwzhj-gateway/src/main/java/org/dromara/gateway/filter/GlobalCorsFilter.java @@ -1,5 +1,7 @@ package org.dromara.gateway.filter; +import cn.dev33.satoken.stp.StpUtil; +import lombok.extern.slf4j.Slf4j; import org.springframework.core.Ordered; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; @@ -20,6 +22,7 @@ import reactor.core.publisher.Mono; * @author Lion Li */ @Component +@Slf4j public class GlobalCorsFilter implements WebFilter, Ordered { /** @@ -60,8 +63,18 @@ public class GlobalCorsFilter implements WebFilter, Ordered { @Override public Mono filter(ServerWebExchange exchange, WebFilterChain chain) { ServerHttpRequest request = exchange.getRequest(); - // 判断请求是否为跨域请求 - if (CorsUtils.isCorsRequest(request)) { + + // 仅拦截 WebSocket 握手请求 + /*if (isWebSocketHandshake(exchange.getRequest())) { + // 从请求中提取 Token(支持 URL 参数或 Header) + String token = extractToken(exchange.getRequest()); + + // 验证 Token 有效性 + if (!validateToken(token)) { + log.info("webSocket认证过期Token={}", token); + return unauthorizedResponse(exchange, "Invalid token"); + } + }else */if (CorsUtils.isCorsRequest(request)) { // 判断请求是否为跨域请求 ServerHttpResponse response = exchange.getResponse(); HttpHeaders headers = response.getHeaders(); headers.add("Access-Control-Allow-Headers", ALLOWED_HEADERS); @@ -83,4 +96,37 @@ public class GlobalCorsFilter implements WebFilter, Ordered { public int getOrder() { return Ordered.HIGHEST_PRECEDENCE; } + + // 判断是否为 WebSocket 握手请求 + private boolean isWebSocketHandshake(ServerHttpRequest request) { + String upgradeHeader = request.getHeaders().getFirst("Upgrade"); + return "websocket".equalsIgnoreCase(upgradeHeader); + } + + // 提取 Token(优先从 Header,其次 URL 参数) + private String extractToken(ServerHttpRequest request) { + // 从 Header 获取 + String authHeader = request.getHeaders().getFirst(HttpHeaders.AUTHORIZATION); + if (authHeader != null && authHeader.startsWith("Bearer ")) { + return authHeader.substring(7); + } + // 从 URL 参数获取 + return request.getQueryParams().getFirst("token"); + } + + // Token 验证逻辑(示例:调用认证服务或本地解析 JWT) + private boolean validateToken(String token) { + + // 示例:调用认证服务接口或本地校验 JWT + return token != null && !token.isEmpty() && StpUtil.getTokenTimeout(token) > 0; + } + + // 返回 401 未授权响应 + private Mono unauthorizedResponse(ServerWebExchange exchange, String message) { + exchange.getResponse().setStatusCode(HttpStatus.UNAUTHORIZED); + exchange.getResponse().getHeaders().add("Content-Type", "text/plain"); + return exchange.getResponse().writeWith( + Mono.just(exchange.getResponse().bufferFactory().wrap(message.getBytes())) + ); + } } diff --git a/stwzhj-gateway/src/main/java/org/dromara/gateway/filter/GlobalLogFilter.java b/stwzhj-gateway/src/main/java/org/dromara/gateway/filter/GlobalLogFilter.java index fbfa329a..3810a5b4 100644 --- a/stwzhj-gateway/src/main/java/org/dromara/gateway/filter/GlobalLogFilter.java +++ b/stwzhj-gateway/src/main/java/org/dromara/gateway/filter/GlobalLogFilter.java @@ -43,7 +43,6 @@ public class GlobalLogFilter implements GlobalFilter, Ordered { ServerHttpRequest request = exchange.getRequest(); String path = WebFluxUtils.getOriginalRequestUrl(exchange); String url = request.getMethod().name() + " " + path; - // 打印请求参数 if (WebFluxUtils.isJsonRequest(exchange)) { if (apiDecryptProperties.getEnabled() diff --git a/stwzhj-gateway/src/main/java/org/dromara/gateway/filter/WebSocketAuthFilter.java b/stwzhj-gateway/src/main/java/org/dromara/gateway/filter/WebSocketAuthFilter.java new file mode 100644 index 00000000..c8abe445 --- /dev/null +++ b/stwzhj-gateway/src/main/java/org/dromara/gateway/filter/WebSocketAuthFilter.java @@ -0,0 +1,65 @@ +package org.dromara.gateway.filter; + +import org.springframework.cloud.gateway.filter.GatewayFilterChain; +import org.springframework.cloud.gateway.filter.GlobalFilter; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.annotation.Order; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpStatus; +import org.springframework.http.server.reactive.ServerHttpRequest; +import org.springframework.stereotype.Component; + +import org.springframework.web.server.ServerWebExchange; +import reactor.core.publisher.Mono; + +//@Configuration +//@Order(-1) // 高优先级 +public class WebSocketAuthFilter implements GlobalFilter { + + @Override + public Mono filter(ServerWebExchange exchange, GatewayFilterChain chain) { + // 仅拦截 WebSocket 握手请求 + if (isWebSocketHandshake(exchange.getRequest())) { + // 从请求中提取 Token(支持 URL 参数或 Header) + String token = extractToken(exchange.getRequest()); + + // 验证 Token 有效性 + if (!validateToken(token)) { + return unauthorizedResponse(exchange, "Invalid token"); + } + } + return chain.filter(exchange); + } + + // 判断是否为 WebSocket 握手请求 + private boolean isWebSocketHandshake(ServerHttpRequest request) { + String upgradeHeader = request.getHeaders().getFirst("Upgrade"); + return "websocket".equalsIgnoreCase(upgradeHeader); + } + + // 提取 Token(优先从 Header,其次 URL 参数) + private String extractToken(ServerHttpRequest request) { + // 从 Header 获取 + String authHeader = request.getHeaders().getFirst(HttpHeaders.AUTHORIZATION); + if (authHeader != null && authHeader.startsWith("Bearer ")) { + return authHeader.substring(7); + } + // 从 URL 参数获取 + return request.getQueryParams().getFirst("token"); + } + + // Token 验证逻辑(示例:调用认证服务或本地解析 JWT) + private boolean validateToken(String token) { + // 示例:调用认证服务接口或本地校验 JWT + return token != null && !token.isEmpty(); + } + + // 返回 401 未授权响应 + private Mono unauthorizedResponse(ServerWebExchange exchange, String message) { + exchange.getResponse().setStatusCode(HttpStatus.UNAUTHORIZED); + exchange.getResponse().getHeaders().add("Content-Type", "text/plain"); + return exchange.getResponse().writeWith( + Mono.just(exchange.getResponse().bufferFactory().wrap(message.getBytes())) + ); + } +} diff --git a/stwzhj-gateway/src/main/resources/logback-plus.xml b/stwzhj-gateway/src/main/resources/logback-plus.xml index 4d66014c..909b3020 100644 --- a/stwzhj-gateway/src/main/resources/logback-plus.xml +++ b/stwzhj-gateway/src/main/resources/logback-plus.xml @@ -1,114 +1,86 @@ - + - - - - - ${console.log.pattern} - utf-8 - - + - - - ${log.path}/console.log - - - ${log.path}/console.%d{yyyy-MM-dd}.log - - 1 - - - ${log.pattern} - utf-8 - - - - INFO - - - - - - ${log.path}/info.log - - - - ${log.path}/info.%d{yyyy-MM-dd}.log - - 60 - - - ${log.pattern} - - - - INFO - - ACCEPT - - DENY - - - - - ${log.path}/error.log - - - - ${log.path}/error.%d{yyyy-MM-dd}.log - - 60 - - - ${log.pattern} - - - - ERROR - - ACCEPT - - DENY - - - - - - - 0 - - 512 - - - - - - - - 0 - - 512 - - - + - - - - - - + + + + + + ERROR + + DENY + + ACCEPT + + + ${LOG_PATH}${LOG_FILE} + + UTF-8 + %date [%level] [%thread] %logger{60} [%file : %line] %msg%n + + + ${LOG_PATH}info/${LOG_FILE}.%d{yyyy-MM-dd}.%i.gz + 50MB + 20 + + + + + + + + Error + + + ${LOG_PATH}error.${LOG_FILE} + + + + ${LOG_PATH}error/${LOG_FILE}.%d{yyyy-MM-dd}.%i.gz + + 50MB + 180 + + + + + + UTF-8 + %date [%level] [%thread] %logger{60} [%file : %line] %msg%n + + + + + + + + diff --git a/stwzhj-modules/pom.xml b/stwzhj-modules/pom.xml index 132b6144..54edfd46 100644 --- a/stwzhj-modules/pom.xml +++ b/stwzhj-modules/pom.xml @@ -19,6 +19,7 @@ stwzhj-consumer stwzhj-location stwzhj-dataToGas + stwzhj-kafkaToWebsocket wzhj-webscoket wzhj-extract wzhj-udp diff --git a/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/ConsumerWorker.java b/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/ConsumerWorker.java index b906fcd5..f1bd9073 100644 --- a/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/ConsumerWorker.java +++ b/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/ConsumerWorker.java @@ -52,14 +52,27 @@ public class ConsumerWorker { "auto.offset.reset:latest"}) public void consumer(ConsumerRecord record) { Object value = record.value(); - String topic = record.topic(); + EsGpsInfo esGpsInfo = JSONUtil.toBean((String) value, EsGpsInfo.class); + Date gpsTime = esGpsInfo.getGpsTime(); +// log.info("value={}",value); + if(Objects.isNull(gpsTime)){ + log.error("gpsTime == null,deviceCode={}",esGpsInfo.getDeviceCode()); + return; + } + String deviceType = esGpsInfo.getDeviceType(); + if(StringUtils.isBlank(deviceType)){ + log.error("deviceType is null, deviceCode={}",esGpsInfo.getDeviceCode()); + return; + } + if(DateUtil.between(gpsTime,new Date(), DateUnit.MINUTE) < 30){ + esGpsInfo.setOnline(1); + } - if ("jysb_dwxx".equals(topic)){ //定位信息 -// logger.info("offset={},topic={},value={}", record.offset(), topic,value); - luanrequest(value); - } else if ("jysb_sbxx".equals(topic)) { //基础信息 -// logger.info("offset={},topic={},value={}", record.offset(), topic,value); - baseDataRequest(value); + logger.info("esGpsInfo={}",esGpsInfo); + boolean offer = linkedBlockingDeque.offer(esGpsInfo); + R response = R.ok(offer); + if(Objects.isNull(response)){ + logger.info("response == null"); } diff --git a/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/DataInsertBatchHandler.java b/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/DataInsertBatchHandler.java index e43afdd5..3dccf021 100644 --- a/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/DataInsertBatchHandler.java +++ b/stwzhj-modules/stwzhj-consumer/src/main/java/org/dromara/kafka/consumer/handler/DataInsertBatchHandler.java @@ -40,7 +40,7 @@ public class DataInsertBatchHandler implements CommandLineRunner { public void run(String... args) throws Exception { ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor(); LinkedBlockingDeque linkedBlockingDeque = ConsumerWorker.linkedBlockingDeque; //定位信息队列 - LinkedBlockingDeque baseDataDeque = ConsumerWorker.basedataDeque; //基础信息队列 +// LinkedBlockingDeque baseDataDeque = ConsumerWorker.basedataDeque; //基础信息队列 singleThreadExecutor.execute(new Runnable() { @Override public void run() { @@ -49,15 +49,15 @@ public class DataInsertBatchHandler implements CommandLineRunner { List list = new ArrayList<>(); List bases = new ArrayList<>(); Queues.drain(linkedBlockingDeque, list, 200, 5, TimeUnit.SECONDS); - Queues.drain(baseDataDeque, bases, 100, 5, TimeUnit.SECONDS); +// Queues.drain(baseDataDeque, bases, 100, 5, TimeUnit.SECONDS); log.info("batch size={}", list.size()); - log.info("basedata size={}", bases.size()); +// log.info("basedata size={}", bases.size()); if(CollectionUtil.isNotEmpty(list)) { gpsService.saveDataBatch(list); } - if(CollectionUtil.isNotEmpty(bases)) { + /*if(CollectionUtil.isNotEmpty(bases)) { deviceService.batchSaveDevice(bases); - } + }*/ } catch (Exception e) { log.error("缓存队列批量消费异常:{}", e.getMessage()); } diff --git a/stwzhj-modules/stwzhj-consumer/src/main/resources/logback-plus.xml b/stwzhj-modules/stwzhj-consumer/src/main/resources/logback-plus.xml index caaa3455..efdad6ff 100644 --- a/stwzhj-modules/stwzhj-consumer/src/main/resources/logback-plus.xml +++ b/stwzhj-modules/stwzhj-consumer/src/main/resources/logback-plus.xml @@ -6,13 +6,13 @@ - + @@ -21,8 +21,66 @@ - - - + + + + + + ERROR + + DENY + + ACCEPT + + + ${LOG_PATH}${LOG_FILE} + + UTF-8 + %date [%level] [%thread] %logger{60} [%file : %line] %msg%n + + + ${LOG_PATH}info/${LOG_FILE}.%d{yyyy-MM-dd}.%i.gz + 50MB + 20 + + + + + + + + Error + + + ${LOG_PATH}error.${LOG_FILE} + + + + ${LOG_PATH}error/${LOG_FILE}.%d{yyyy-MM-dd}.%i.gz + + 50MB + 180 + + + + + + UTF-8 + %date [%level] [%thread] %logger{60} [%file : %line] %msg%n + + + + + + + + diff --git a/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/handler/RedisExpireListener.java b/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/handler/RedisExpireListener.java index 73ba4e6d..750c97cc 100644 --- a/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/handler/RedisExpireListener.java +++ b/stwzhj-modules/stwzhj-data2es/src/main/java/org/dromara/data2es/handler/RedisExpireListener.java @@ -53,7 +53,7 @@ public class RedisExpireListener extends KeyExpirationEventMessageListener { public void onMessage(Message message, byte[] pattern) { String expireKey = message.toString(); if(StringUtils.isNotEmpty(expireKey) && - expireKey.startsWith(RedisConstants.ORG_CODE_PRE)){ + expireKey.startsWith(RedisConstants.ONLINE_USERS_TEN)){ handleExpiredEvent(expireKey); } } @@ -85,7 +85,10 @@ public class RedisExpireListener extends KeyExpirationEventMessageListener { } catch (InterruptedException e) { e.printStackTrace(); } finally { - lock.unlock(); + // 仅在当前线程持有锁时释放 + if (lock.isHeldByCurrentThread()) { + lock.unlock(); + } } } diff --git a/stwzhj-modules/stwzhj-data2es/src/main/resources/logback-plus.xml b/stwzhj-modules/stwzhj-data2es/src/main/resources/logback-plus.xml index caaa3455..909b3020 100644 --- a/stwzhj-modules/stwzhj-data2es/src/main/resources/logback-plus.xml +++ b/stwzhj-modules/stwzhj-data2es/src/main/resources/logback-plus.xml @@ -1,18 +1,18 @@ - - + + - - - - ${console.log.pattern} - utf-8 - - + @@ -21,8 +21,66 @@ - - - + + + + + + ERROR + + DENY + + ACCEPT + + + ${LOG_PATH}${LOG_FILE} + + UTF-8 + %date [%level] [%thread] %logger{60} [%file : %line] %msg%n + + + ${LOG_PATH}info/${LOG_FILE}.%d{yyyy-MM-dd}.%i.gz + 50MB + 20 + + + + + + + + Error + + + ${LOG_PATH}error.${LOG_FILE} + + + + ${LOG_PATH}error/${LOG_FILE}.%d{yyyy-MM-dd}.%i.gz + + 50MB + 180 + + + + + + UTF-8 + %date [%level] [%thread] %logger{60} [%file : %line] %msg%n + + + + + + + + diff --git a/stwzhj-modules/stwzhj-kafkaToWebsocket/pom.xml b/stwzhj-modules/stwzhj-kafkaToWebsocket/pom.xml new file mode 100644 index 00000000..d0f0f85e --- /dev/null +++ b/stwzhj-modules/stwzhj-kafkaToWebsocket/pom.xml @@ -0,0 +1,139 @@ + + + + org.dromara + stwzhj-modules + ${revision} + + 4.0.0 + + stwzhj-kafkaToWebsocket + + + stwzhj-kafkaToWebsocket kafka消息发送到Websocket + + + + + + org.dromara + stwzhj-common-nacos + + + + org.dromara + stwzhj-common-sentinel + + + + + org.dromara + stwzhj-common-log + + + + org.dromara + stwzhj-common-dict + + + + org.dromara + stwzhj-common-doc + + + + org.dromara + stwzhj-common-web + + + + org.dromara + stwzhj-common-dubbo + + + + org.dromara + stwzhj-common-seata + + + + org.dromara + stwzhj-common-idempotent + + + + org.dromara + stwzhj-common-tenant + + + + org.dromara + stwzhj-common-security + + + + org.dromara + stwzhj-common-translation + + + + org.dromara + stwzhj-common-sensitive + + + + org.dromara + stwzhj-common-encrypt + + + + + org.dromara + stwzhj-api-system + + + + org.dromara + stwzhj-api-resource + + + + org.dromara + stwzhj-api-location + + + + org.springframework.boot + spring-boot-starter-websocket + + + + + + org.springframework.kafka + spring-kafka + + + + + + ${project.artifactId} + + + org.springframework.boot + spring-boot-maven-plugin + ${spring-boot.version} + + + + repackage + + + + + + + + diff --git a/stwzhj-modules/stwzhj-kafkaToWebsocket/src/main/java/org/dromara/kafka2Websocket/KafkaToSocketApplication.java b/stwzhj-modules/stwzhj-kafkaToWebsocket/src/main/java/org/dromara/kafka2Websocket/KafkaToSocketApplication.java new file mode 100644 index 00000000..8bba6be0 --- /dev/null +++ b/stwzhj-modules/stwzhj-kafkaToWebsocket/src/main/java/org/dromara/kafka2Websocket/KafkaToSocketApplication.java @@ -0,0 +1,26 @@ +package org.dromara.kafka2Websocket; + + +import org.apache.dubbo.config.spring.context.annotation.EnableDubbo; + +import org.dromara.kafka2Websocket.config.KafkaProperties; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.context.metrics.buffering.BufferingApplicationStartup; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.scheduling.annotation.EnableScheduling; + +@EnableDubbo +@EnableScheduling +@SpringBootApplication +@EnableConfigurationProperties(KafkaProperties.class) +public class KafkaToSocketApplication { + + public static void main(String[] args) { + SpringApplication application = new SpringApplication(KafkaToSocketApplication.class); + application.setApplicationStartup(new BufferingApplicationStartup(2048)); + application.run(args); + System.out.println("(♥◠‿◠)ノ゙ Socket启动成功 ლ(´ڡ`ლ)゙ "); + } + +} diff --git a/stwzhj-modules/stwzhj-kafkaToWebsocket/src/main/java/org/dromara/kafka2Websocket/config/KafkaConfig.java b/stwzhj-modules/stwzhj-kafkaToWebsocket/src/main/java/org/dromara/kafka2Websocket/config/KafkaConfig.java new file mode 100644 index 00000000..e3b0651c --- /dev/null +++ b/stwzhj-modules/stwzhj-kafkaToWebsocket/src/main/java/org/dromara/kafka2Websocket/config/KafkaConfig.java @@ -0,0 +1,66 @@ +package org.dromara.kafka2Websocket.config; + +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.config.KafkaListenerContainerFactory; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; +import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler; +import org.springframework.kafka.listener.ContainerProperties; + +import java.util.HashMap; +import java.util.Map; + +// 1、kafka配置类 +@Configuration +@Slf4j +public class KafkaConfig { + + @Bean + public ConsumerFactory consumerFactory(KafkaProperties properties) { + Map config = new HashMap<>(); + config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers()); + config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); + config.put(ConsumerConfig.GROUP_ID_CONFIG,properties.getGroupId()); + return new DefaultKafkaConsumerFactory<>(config); + } + + @Bean + public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory( + ConsumerFactory consumerFactory) { + ConcurrentKafkaListenerContainerFactory factory = + new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory); + factory.setBatchListener(true); + factory.setConcurrency(3); + // 核心配置:设置手动提交模式 +// factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL); + return factory; + } + + + + + /** + * 异常处理器 + * + * @return + */ + @Bean + public ConsumerAwareListenerErrorHandler consumerAwareListenerErrorHandler() { + return (message, exception, consumer) -> { + log.error("消息{} , 异常原因{}", message, exception.getMessage()); + log.error("consumerAwareListenerErrorHandler called"); + + return null; + }; + } + +} diff --git a/stwzhj-modules/stwzhj-kafkaToWebsocket/src/main/java/org/dromara/kafka2Websocket/config/KafkaProperties.java b/stwzhj-modules/stwzhj-kafkaToWebsocket/src/main/java/org/dromara/kafka2Websocket/config/KafkaProperties.java new file mode 100644 index 00000000..b6983db4 --- /dev/null +++ b/stwzhj-modules/stwzhj-kafkaToWebsocket/src/main/java/org/dromara/kafka2Websocket/config/KafkaProperties.java @@ -0,0 +1,17 @@ +package org.dromara.kafka2Websocket.config; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; + +import java.util.List; + +// 2. 配置属性(KafkaProperties.java) +@Data +@ConfigurationProperties(prefix = "app.kafka") +public class KafkaProperties { + private String bootstrapServers; + private List topics; + private String groupId; + private int concurrency = 3; +} + diff --git a/stwzhj-modules/stwzhj-kafkaToWebsocket/src/main/java/org/dromara/kafka2Websocket/config/WebSocketConfig.java b/stwzhj-modules/stwzhj-kafkaToWebsocket/src/main/java/org/dromara/kafka2Websocket/config/WebSocketConfig.java new file mode 100644 index 00000000..26895891 --- /dev/null +++ b/stwzhj-modules/stwzhj-kafkaToWebsocket/src/main/java/org/dromara/kafka2Websocket/config/WebSocketConfig.java @@ -0,0 +1,58 @@ +package org.dromara.kafka2Websocket.config; + +import cn.hutool.core.util.StrUtil; +import jakarta.websocket.OnOpen; +import org.dromara.kafka2Websocket.dto.SharedState; +import org.dromara.kafka2Websocket.handle.KafkaWebSocketHandler; +import org.dromara.kafka2Websocket.interceptor.AuthInterceptor; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.web.socket.WebSocketHandler; +import org.springframework.web.socket.config.annotation.EnableWebSocket; +import org.springframework.web.socket.config.annotation.WebSocketConfigurer; +import org.springframework.web.socket.server.HandshakeInterceptor; + +// 3. WebSocket配置(WebSocketConfig.java) +@Configuration +@EnableWebSocket +public class WebSocketConfig { + + + @Bean + public WebSocketConfigurer webSocketConfigurer(HandshakeInterceptor handshakeInterceptor, + WebSocketHandler webSocketHandler) { + // 如果WebSocket的路径为空,则设置默认路径为 "/websocket" +// webSocketProperties.setPath("ws/websocket"); +// webSocketProperties.setAllowedOrigins("*"); + /* if (StrUtil.isBlank(webSocketProperties.getPath())) { + webSocketProperties.setPath("/websocket"); + }*/ + + // 如果允许跨域访问的地址为空,则设置为 "*",表示允许所有来源的跨域请求 + /* if (StrUtil.isBlank(webSocketProperties.getAllowedOrigins())) { + webSocketProperties.setAllowedOrigins("*"); + } +*/ + // 返回一个WebSocketConfigurer对象,用于配置WebSocket + return registry -> registry + // 添加WebSocket处理程序和拦截器到指定路径,设置允许的跨域来源 + .addHandler(webSocketHandler,"ws/websocket") + .addInterceptors(handshakeInterceptor) + .setAllowedOrigins("*"); + } + + @Bean + public HandshakeInterceptor handshakeInterceptor() { + return new AuthInterceptor(); + } + + + + + @Bean + public WebSocketHandler webSocketHandler() { + return new KafkaWebSocketHandler(); + } + + +} diff --git a/stwzhj-modules/stwzhj-kafkaToWebsocket/src/main/java/org/dromara/kafka2Websocket/config/WebSocketProperties.java b/stwzhj-modules/stwzhj-kafkaToWebsocket/src/main/java/org/dromara/kafka2Websocket/config/WebSocketProperties.java new file mode 100644 index 00000000..ba680173 --- /dev/null +++ b/stwzhj-modules/stwzhj-kafkaToWebsocket/src/main/java/org/dromara/kafka2Websocket/config/WebSocketProperties.java @@ -0,0 +1,26 @@ +package org.dromara.kafka2Websocket.config; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; + +/** + * WebSocket 配置项 + * + * @author zendwang + */ +//@ConfigurationProperties("websocket") +@Data +public class WebSocketProperties { + + private Boolean enabled; + + /** + * 路径 + */ + private String path; + + /** + * 设置访问源地址 + */ + private String allowedOrigins; +} diff --git a/stwzhj-modules/stwzhj-kafkaToWebsocket/src/main/java/org/dromara/kafka2Websocket/consumer/KafkaConsumerManager.java b/stwzhj-modules/stwzhj-kafkaToWebsocket/src/main/java/org/dromara/kafka2Websocket/consumer/KafkaConsumerManager.java new file mode 100644 index 00000000..520af126 --- /dev/null +++ b/stwzhj-modules/stwzhj-kafkaToWebsocket/src/main/java/org/dromara/kafka2Websocket/consumer/KafkaConsumerManager.java @@ -0,0 +1,113 @@ +package org.dromara.kafka2Websocket.consumer; + +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.dromara.common.satoken.utils.LoginHelper; +import org.dromara.kafka2Websocket.config.KafkaProperties; +import org.dromara.kafka2Websocket.handle.KafkaWebSocketHandler; +import org.dromara.system.api.model.LoginUser; +import org.springframework.context.SmartLifecycle; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.KafkaTemplate; + +import java.time.Duration; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicBoolean; + +//@Configuration +//@Slf4j +public class KafkaConsumerManager implements SmartLifecycle { + private final AtomicBoolean running = new AtomicBoolean(false); + private final KafkaProperties properties; + private final KafkaWebSocketHandler webSocketHandler; + private final KafkaTemplate kafkaTemplate; + + private KafkaConsumer consumer; + private Thread consumerThread; + + public KafkaConsumerManager(KafkaProperties properties, + KafkaWebSocketHandler webSocketHandler, + KafkaTemplate kafkaTemplate) { + this.properties = properties; + this.webSocketHandler = webSocketHandler; + this.kafkaTemplate = kafkaTemplate; + } + + @Override + public void start() { + if (running.compareAndSet(false, true)) { + initializeConsumer(); + consumerThread = new Thread(this::consumeMessages); + consumerThread.start(); + } + } + + private void initializeConsumer() { + Properties props = new Properties(); + props.put("bootstrap.servers", properties.getBootstrapServers()); + props.put("group.id", properties.getGroupId()); + props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + props.put("enable.auto.commit", "false"); + props.put("auto.offset.reset", "earliest"); + + consumer = new KafkaConsumer<>(props); + consumer.subscribe(properties.getTopics()); + } + + private void consumeMessages() { + try { + //running.get() && webSocketHandler.hasConnections() + while (true) { + ConsumerRecords records = consumer.poll(Duration.ofMillis(1000)); + processRecords(records); + consumer.commitSync(); + } + } catch (Exception e) { + // 处理异常 + } finally { + consumer.close(); + } + } + + private void processRecords(ConsumerRecords records) { + LoginUser user = LoginHelper.getLoginUser(); + records.forEach(record -> { + try { +// String formatted = formatMessage(record.topic(), record.value()); + webSocketHandler.broadcast(record.value(), user.getManageDeptId()); + } catch (Exception e) { + handleFailedMessage(record); + } + }); + } + + private String formatMessage(String topic, String payload) { + return String.format("{\"topic\":\"%s\",\"data\":%s}", topic, payload); + } + + private void handleFailedMessage(ConsumerRecord record) { + kafkaTemplate.send("dlq-" + record.topic(), record.value()); + } + + @Override + public void stop() { + if (running.compareAndSet(true, false)) { + if (consumer != null) consumer.wakeup(); + if (consumerThread != null) { + try { + consumerThread.join(5000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + } + + @Override + public boolean isRunning() { + return running.get(); + } +} diff --git a/stwzhj-modules/stwzhj-kafkaToWebsocket/src/main/java/org/dromara/kafka2Websocket/consumer/KafkaConsumerService.java b/stwzhj-modules/stwzhj-kafkaToWebsocket/src/main/java/org/dromara/kafka2Websocket/consumer/KafkaConsumerService.java new file mode 100644 index 00000000..a46649f1 --- /dev/null +++ b/stwzhj-modules/stwzhj-kafkaToWebsocket/src/main/java/org/dromara/kafka2Websocket/consumer/KafkaConsumerService.java @@ -0,0 +1,39 @@ +package org.dromara.kafka2Websocket.consumer; + +import lombok.extern.slf4j.Slf4j; +import org.dromara.kafka2Websocket.dto.SharedState; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Service; + +import java.util.List; + + +@Service +@Slf4j +public class KafkaConsumerService { + + @Autowired + private SharedState state; + + @Autowired + private MessageBufferManager bufferManager; + + + @KafkaListener(topics = "${spring.kafka.topics}") + public void processMessage(List records) { + log.error("flag的值={}",state.getFlag()); + try { + records.stream().forEach(record -> { + if (state.getFlag()){ + bufferManager.bufferMessage("3413",record); + } + + }); + + } catch (Exception e){ + e.printStackTrace(); + } + } + +} diff --git a/stwzhj-modules/stwzhj-kafkaToWebsocket/src/main/java/org/dromara/kafka2Websocket/consumer/MessageBufferManager.java b/stwzhj-modules/stwzhj-kafkaToWebsocket/src/main/java/org/dromara/kafka2Websocket/consumer/MessageBufferManager.java new file mode 100644 index 00000000..0421ca03 --- /dev/null +++ b/stwzhj-modules/stwzhj-kafkaToWebsocket/src/main/java/org/dromara/kafka2Websocket/consumer/MessageBufferManager.java @@ -0,0 +1,71 @@ +package org.dromara.kafka2Websocket.consumer; + +import com.alibaba.nacos.shaded.com.google.gson.Gson; +import jakarta.websocket.Session; +import org.dromara.kafka2Websocket.holder.WebSocketSessionHolder; +import org.dromara.kafka2Websocket.utils.SocketUtils; +import org.springframework.scheduling.annotation.Async; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; +import org.springframework.web.socket.WebSocketSession; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.*; + +@Component +public class MessageBufferManager { + // 配置参数:批量大小和缓冲时间 + private static final int BATCH_SIZE = 500; + private static final long BUFFER_TIME_MS = 100; + + // 用户ID -> 消息队列(线程安全) + private final ConcurrentHashMap> userMessageQueues = new ConcurrentHashMap<>(); + + // 添加消息到缓冲区 + public void bufferMessage(String userId, String message) { + LinkedBlockingQueue queue = userMessageQueues.computeIfAbsent(userId, k -> new LinkedBlockingQueue<>()); + queue.offer(message); + + // 触发批量发送(如果达到阈值) + if (queue.size() >= BATCH_SIZE) { + sendBufferedMessages(userId); + } + } + + // 定时任务:按时间窗口发送 + @Scheduled(fixedRate = BUFFER_TIME_MS) + @Async + public void flushBufferedMessages() { + userMessageQueues.keySet().forEach(this::sendBufferedMessages); + } + + // 发送并清空缓冲区 + private void sendBufferedMessages(String userId) { + LinkedBlockingQueue queue = userMessageQueues.get(userId); + if (queue == null || queue.isEmpty()) return; + + List messages = new ArrayList<>(); + queue.drainTo(messages); // 原子操作:取出所有消息并清空队列 + + if (!messages.isEmpty()) { + String mergedMessage = mergeMessages(messages); + Map> allOnlineSessions = WebSocketSessionHolder.getAllOnlineSessions(); + allOnlineSessions.forEach((key, value) -> { + SocketUtils.sendMessage(key,mergedMessage); + }); + /*WebSocketSessionHolder.getSessions(userId).forEach(session -> { + if (session.isOpen()) { + // todo 发送消息 + SocketUtils.sendMessage(session,mergedMessage); + } + });*/ + } + } + + // 合并消息(示例:JSON 数组) + private String mergeMessages(List messages) { + return "{\"type\":\"batch\", \"data\":" + new Gson().toJson(messages) + "}"; + } +} diff --git a/stwzhj-modules/stwzhj-kafkaToWebsocket/src/main/java/org/dromara/kafka2Websocket/dto/SharedState.java b/stwzhj-modules/stwzhj-kafkaToWebsocket/src/main/java/org/dromara/kafka2Websocket/dto/SharedState.java new file mode 100644 index 00000000..807c13ca --- /dev/null +++ b/stwzhj-modules/stwzhj-kafkaToWebsocket/src/main/java/org/dromara/kafka2Websocket/dto/SharedState.java @@ -0,0 +1,33 @@ +package org.dromara.kafka2Websocket.dto; + +import org.springframework.stereotype.Component; + +import java.util.concurrent.atomic.AtomicBoolean; + +@Component +public class SharedState { + + // 定义 AtomicBoolean 实例 + private final AtomicBoolean flag = new AtomicBoolean(false); + + // 修改标志位(原子操作) + public void setFlag(boolean value) { + flag.set(value); + } + + //获取 + public boolean getFlag() { + return flag.get(); + } + + + // 原子性地从 false 修改为 true(类似锁的获取) + public boolean tryActivate() { + return flag.compareAndSet(false, true); + } + + public boolean tryActivateFalse() { + return flag.compareAndSet(true, false); + } + +} diff --git a/stwzhj-modules/stwzhj-kafkaToWebsocket/src/main/java/org/dromara/kafka2Websocket/dto/WebSocketMessageDto.java b/stwzhj-modules/stwzhj-kafkaToWebsocket/src/main/java/org/dromara/kafka2Websocket/dto/WebSocketMessageDto.java new file mode 100644 index 00000000..f30c86c8 --- /dev/null +++ b/stwzhj-modules/stwzhj-kafkaToWebsocket/src/main/java/org/dromara/kafka2Websocket/dto/WebSocketMessageDto.java @@ -0,0 +1,29 @@ +package org.dromara.kafka2Websocket.dto; + +import lombok.Data; + +import java.io.Serial; +import java.io.Serializable; +import java.util.List; + +/** + * 消息的dto + * + * @author zendwang + */ +@Data +public class WebSocketMessageDto implements Serializable { + + @Serial + private static final long serialVersionUID = 1L; + + /** + * 需要推送到的session key 列表 + */ + private List sessionKeys; + + /** + * 需要发送的消息 + */ + private String message; +} diff --git a/stwzhj-modules/stwzhj-kafkaToWebsocket/src/main/java/org/dromara/kafka2Websocket/handle/KafkaWebSocketHandler.java b/stwzhj-modules/stwzhj-kafkaToWebsocket/src/main/java/org/dromara/kafka2Websocket/handle/KafkaWebSocketHandler.java new file mode 100644 index 00000000..ef116546 --- /dev/null +++ b/stwzhj-modules/stwzhj-kafkaToWebsocket/src/main/java/org/dromara/kafka2Websocket/handle/KafkaWebSocketHandler.java @@ -0,0 +1,113 @@ +package org.dromara.kafka2Websocket.handle; + +import cn.dev33.satoken.session.SaSession; +import cn.dev33.satoken.stp.StpUtil; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import jakarta.annotation.PreDestroy; +import lombok.extern.slf4j.Slf4j; +import org.dromara.common.satoken.utils.LoginHelper; +import org.dromara.kafka2Websocket.dto.SharedState; +import org.dromara.kafka2Websocket.holder.WebSocketSessionHolder; +import org.dromara.system.api.model.LoginUser; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.config.KafkaListenerEndpointRegistry; +import org.springframework.kafka.listener.MessageListenerContainer; +import org.springframework.web.socket.CloseStatus; +import org.springframework.web.socket.TextMessage; +import org.springframework.web.socket.WebSocketSession; +import org.springframework.web.socket.handler.TextWebSocketHandler; + +import java.io.IOException; +import java.util.Collections; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import static org.dromara.common.satoken.utils.LoginHelper.LOGIN_USER_KEY; + +// 4. WebSocket处理器(WebSocketHandler.java) +@Configuration +@Slf4j +public class KafkaWebSocketHandler extends TextWebSocketHandler { + + private final ConcurrentHashMap sessions = new ConcurrentHashMap<>(); + private final AtomicInteger connectionCount = new AtomicInteger(0); + + @Autowired + private SharedState state; + + + @Override + public void afterConnectionEstablished(WebSocketSession session) { +// session.getUri().getQuery(); + +// LoginUser loginUser = (LoginUser) session.getAttributes().get(LOGIN_USER_KEY); + sessions.put(session.getId(), session); + WebSocketSessionHolder.addSession(session.getId(),session); + connectionCount.incrementAndGet(); + if(connectionCount.get() >0){ //有在线用户时 + state.setFlag(true); + log.info("resume over "); + } + log.info("连接建立: {} 当前连接数: {}", session.getId(), sessions.size()); + } + + @Override + public void afterConnectionClosed(WebSocketSession session, CloseStatus status) { + sessions.remove(session.getId()); + WebSocketSessionHolder.removeSession(session.getId()); + connectionCount.decrementAndGet(); + if(connectionCount.get() ==0){ + // 暂停监听 + state.setFlag(false); + log.info("pause"); + } + log.info("连接关闭: {} 剩余连接数: {}", session.getId(), sessions.size()); + } + + public void broadcast(String message,String deptId) { + sessions.values().parallelStream() + .filter(WebSocketSession::isOpen) + .forEach(session -> sendMessage(session, deptId, message)); + } + + private void sendMessage(WebSocketSession session, String deptId, String message) { + try { + synchronized (session) { // 保证线程安全 + JSONObject job = JSON.parseObject(message); + String zzjgdm = job.getString("zzjgdm"); + if (deptId.endsWith("00000000")){ //如果管理机构是市局 不过滤 + session.sendMessage(new TextMessage(message)); + } else if (zzjgdm.substring(0,6).equals(deptId.substring(0,6))) { // 分局登录 分局接收 + session.sendMessage(new TextMessage(message)); + }else if (zzjgdm.substring(0,8).equals(deptId.substring(0,8))) { // 支队登录 支队接收 + session.sendMessage(new TextMessage(message)); + } + } + } catch (IOException e) { + // 处理发送失败 + } + } + + public boolean hasConnections() { + return connectionCount.get() > 0; + } + + @PreDestroy + public void cleanUp() { + sessions.values().forEach(session -> { + try { + if (session.isOpen()) session.close(); + } catch (IOException e) { + // 处理关闭异常 + } + }); + } + + + + +} diff --git a/stwzhj-modules/stwzhj-kafkaToWebsocket/src/main/java/org/dromara/kafka2Websocket/holder/WebSocketSessionHolder.java b/stwzhj-modules/stwzhj-kafkaToWebsocket/src/main/java/org/dromara/kafka2Websocket/holder/WebSocketSessionHolder.java new file mode 100644 index 00000000..205ad260 --- /dev/null +++ b/stwzhj-modules/stwzhj-kafkaToWebsocket/src/main/java/org/dromara/kafka2Websocket/holder/WebSocketSessionHolder.java @@ -0,0 +1,116 @@ +package org.dromara.kafka2Websocket.holder; + +import jakarta.websocket.Session; +import lombok.AccessLevel; +import lombok.NoArgsConstructor; +import org.springframework.web.socket.CloseStatus; +import org.springframework.web.socket.WebSocketSession; + +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArraySet; + +/** + * WebSocketSession 用于保存当前所有在线的会话信息 + * + * @author zendwang + */ +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public class WebSocketSessionHolder { + + private static final Map USER_SESSION_MAP = new ConcurrentHashMap<>(); + + // 用户ID -> 会话集合(支持多设备) + private static final ConcurrentHashMap> userSessions = new ConcurrentHashMap<>(); + + /** + * 将WebSocket会话添加到用户会话Map中 + * + * @param sessionKey 会话键,用于检索会话 + * @param session 要添加的WebSocket会话 + */ + public static void addSession(String sessionKey, WebSocketSession session) { + removeSession(sessionKey); + userSessions.computeIfAbsent(sessionKey, k -> new CopyOnWriteArraySet<>()).add(session); + USER_SESSION_MAP.put(sessionKey, session); + } + + /** + * 从用户会话Map中移除指定会话键对应的WebSocket会话 + * + * @param sessionKey 要移除的会话键 + */ + public static void removeSession(String sessionKey) { + WebSocketSession session = USER_SESSION_MAP.remove(sessionKey); + CopyOnWriteArraySet sessions = userSessions.get(sessionKey); + if (sessions != null) { + sessions.remove(session); + if (sessions.isEmpty()) { + userSessions.remove(sessionKey); + } + } + try { + session.close(CloseStatus.BAD_DATA); + } catch (Exception ignored) { + } + } + + /** + * 根据会话键从用户会话Map中获取WebSocket会话 + * + * @param sessionKey 要获取的会话键 + * @return 与给定会话键对应的WebSocket会话,如果不存在则返回null + */ + public static WebSocketSession getSessions(Long sessionKey) { + return USER_SESSION_MAP.get(sessionKey); + } + + + // 获取用户所有会话(过滤已关闭连接) + public static CopyOnWriteArraySet getSessions(String userId) { + CopyOnWriteArraySet sessions = userSessions.get(userId); + if (sessions == null) return new CopyOnWriteArraySet<>(); + + // 清理已关闭的会话 + sessions.removeIf(session -> !session.isOpen()); + return sessions; + } + + /** + * 获取存储在用户会话Map中所有WebSocket会话的会话键集合 + * + * @return 所有WebSocket会话的会话键集合 + */ + public static Set getSessionsAll() { + return USER_SESSION_MAP.keySet(); + } + + /** + * 检查给定的会话键是否存在于用户会话Map中 + * + * @param sessionKey 要检查的会话键 + * @return 如果存在对应的会话键,则返回true;否则返回false + */ + public static Boolean existSession(Long sessionKey) { + return USER_SESSION_MAP.containsKey(sessionKey); + } + + // 获取所有在线用户ID列表 + public static List getAllOnlineUserIds() { + List userIds = new ArrayList<>(userSessions.keySet()); + userIds.removeIf(userId -> getSessions(userId).isEmpty()); + return Collections.unmodifiableList(userIds); + } + + // 获取所有在线会话(跨用户) + public static Map> getAllOnlineSessions() { + Map> copy = new ConcurrentHashMap<>(); + userSessions.forEach((userId, sessions) -> { + CopyOnWriteArraySet activeSessions = getSessions(userId); + if (!activeSessions.isEmpty()) { + copy.put(userId, activeSessions); + } + }); + return Collections.unmodifiableMap(copy); + } +} diff --git a/stwzhj-modules/stwzhj-kafkaToWebsocket/src/main/java/org/dromara/kafka2Websocket/interceptor/AuthInterceptor.java b/stwzhj-modules/stwzhj-kafkaToWebsocket/src/main/java/org/dromara/kafka2Websocket/interceptor/AuthInterceptor.java new file mode 100644 index 00000000..45903ae6 --- /dev/null +++ b/stwzhj-modules/stwzhj-kafkaToWebsocket/src/main/java/org/dromara/kafka2Websocket/interceptor/AuthInterceptor.java @@ -0,0 +1,42 @@ +package org.dromara.kafka2Websocket.interceptor; + +import cn.dev33.satoken.exception.NotLoginException; +import cn.hutool.http.server.HttpServerRequest; +import com.sun.net.httpserver.HttpExchange; +import lombok.extern.slf4j.Slf4j; +import org.dromara.common.satoken.utils.LoginHelper; +import org.dromara.system.api.model.LoginUser; +import org.springframework.http.server.ServerHttpRequest; +import org.springframework.http.server.ServerHttpResponse; +import org.springframework.web.socket.WebSocketHandler; +import org.springframework.web.socket.server.HandshakeInterceptor; + +import java.util.Map; + + +// 7. 认证拦截器(AuthInterceptor.java) +@Slf4j +public class AuthInterceptor implements HandshakeInterceptor { + + @Override + public boolean beforeHandshake(ServerHttpRequest request, + ServerHttpResponse response, + WebSocketHandler wsHandler, + Map attributes) { + try { + +// LoginUser loginUser = LoginHelper.getLoginUser(); +// attributes.put("loginUser", loginUser); + return true; + } catch (NotLoginException e) { + log.error("WebSocket 认证失败'{}',无法访问系统资源", e.getMessage()); + return false; + } + } + + @Override + public void afterHandshake(ServerHttpRequest request, + ServerHttpResponse response, + WebSocketHandler wsHandler, + Exception exception) {} +} diff --git a/stwzhj-modules/stwzhj-kafkaToWebsocket/src/main/java/org/dromara/kafka2Websocket/utils/SocketUtils.java b/stwzhj-modules/stwzhj-kafkaToWebsocket/src/main/java/org/dromara/kafka2Websocket/utils/SocketUtils.java new file mode 100644 index 00000000..6429d2d6 --- /dev/null +++ b/stwzhj-modules/stwzhj-kafkaToWebsocket/src/main/java/org/dromara/kafka2Websocket/utils/SocketUtils.java @@ -0,0 +1,153 @@ +package org.dromara.kafka2Websocket.utils; + +import cn.hutool.core.collection.CollUtil; +import lombok.AccessLevel; +import lombok.NoArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.dromara.common.redis.utils.RedisUtils; + +import org.dromara.kafka2Websocket.dto.WebSocketMessageDto; +import org.dromara.kafka2Websocket.holder.WebSocketSessionHolder; +import org.springframework.web.socket.PongMessage; +import org.springframework.web.socket.TextMessage; +import org.springframework.web.socket.WebSocketMessage; +import org.springframework.web.socket.WebSocketSession; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.function.Consumer; + + +/** + * WebSocket工具类 + * + * @author zendwang + */ +@Slf4j +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public class SocketUtils { + + /** + * 向指定的WebSocket会话发送消息 + * + * @param sessionKey 要发送消息的用户id + * @param message 要发送的消息内容 + */ + public static void sendMessage(Long sessionKey, String message) { + WebSocketSession session = WebSocketSessionHolder.getSessions(sessionKey); + sendMessage(session, message); + } + + /** + * 订阅WebSocket消息主题,并提供一个消费者函数来处理接收到的消息 + * + * @param consumer 处理WebSocket消息的消费者函数 + */ + public static void subscribeMessage(Consumer consumer) { + RedisUtils.subscribe("global:websocket", WebSocketMessageDto.class, consumer); + } + + /** + * 发布WebSocket订阅消息 + * + * @param webSocketMessage 要发布的WebSocket消息对象 + */ + public static void publishMessage(WebSocketMessageDto webSocketMessage) { + List unsentSessionKeys = new ArrayList<>(); + // 当前服务内session,直接发送消息 + for (Long sessionKey : webSocketMessage.getSessionKeys()) { + if (WebSocketSessionHolder.existSession(sessionKey)) { + SocketUtils.sendMessage(sessionKey, webSocketMessage.getMessage()); + continue; + } + unsentSessionKeys.add(sessionKey); + } + // 不在当前服务内session,发布订阅消息 + if (CollUtil.isNotEmpty(unsentSessionKeys)) { + WebSocketMessageDto broadcastMessage = new WebSocketMessageDto(); + broadcastMessage.setMessage(webSocketMessage.getMessage()); + broadcastMessage.setSessionKeys(unsentSessionKeys); + RedisUtils.publish("global:websocket", broadcastMessage, consumer -> { + log.info("WebSocket发送主题订阅消息topic:{} session keys:{} message:{}", + "global:websocket", unsentSessionKeys, webSocketMessage.getMessage()); + }); + } + } + + /** + * 向所有的WebSocket会话发布订阅的消息(群发) + * + * @param message 要发布的消息内容 + */ + public static void publishAll(String message) { + WebSocketMessageDto broadcastMessage = new WebSocketMessageDto(); + broadcastMessage.setMessage(message); + RedisUtils.publish("global:websocket", broadcastMessage, consumer -> { + log.info("WebSocket发送主题订阅消息topic:{} message:{}", "global:websocket", message); + }); + } + + /** + * 向指定的WebSocket会话发送Pong消息 + * + * @param session 要发送Pong消息的WebSocket会话 + */ + public static void sendPongMessage(WebSocketSession session) { + sendMessage(session, new PongMessage()); + } + + /** + * 向指定的WebSocket会话发送文本消息 + * + * @param session WebSocket会话 + * @param message 要发送的文本消息内容 + */ + public static void sendMessage(WebSocketSession session, String message) { + sendMessage(session, new TextMessage(message)); + } + + public static void sendMessage(String sid, String message) { + WebSocketSessionHolder.getSessions(sid).forEach(session ->{ + sendMessage(session, new TextMessage(message)); + }); + + } + + + /** + * 向指定的WebSocket会话发送WebSocket消息对象 + * + * @param session WebSocket会话 + * @param message 要发送的WebSocket消息对象 + */ + private synchronized static void sendMessage(WebSocketSession session, WebSocketMessage message) { + if (session == null || !session.isOpen()) { + log.warn("[send] session会话已经关闭"); + } else { + try { + session.sendMessage(message); + } catch (IOException e) { + log.error("[send] session({}) 发送消息({}) 异常", session, message, e); + } + } + } + + /** + * 向所有的WebSocket会话发送WebSocket消息对象 + * + * @param session WebSocket会话 + * @param message 要发送的WebSocket消息对象 + */ + private synchronized static void sendAllMessage(WebSocketSession session, WebSocketMessage message) { + if (session == null || !session.isOpen()) { + log.warn("[send] session会话已经关闭"); + } else { + try { + session.sendMessage(message); + } catch (IOException e) { + log.error("[send] session({}) 发送消息({}) 异常", session, message, e); + } + } + } +} diff --git a/stwzhj-modules/stwzhj-kafkaToWebsocket/src/main/resources/application.yml b/stwzhj-modules/stwzhj-kafkaToWebsocket/src/main/resources/application.yml new file mode 100644 index 00000000..44a1bcb0 --- /dev/null +++ b/stwzhj-modules/stwzhj-kafkaToWebsocket/src/main/resources/application.yml @@ -0,0 +1,34 @@ +# Tomcat +server: + port: 9216 + +# Spring +spring: + application: + # 应用名称 + name: wzhj-kafka2websocket + profiles: + # 环境配置 + active: @profiles.active@ + +--- # nacos 配置 +spring: + cloud: + nacos: + # nacos 服务地址 + server-addr: @nacos.server@ + username: @nacos.username@ + password: @nacos.password@ + discovery: + # 注册组 + group: @nacos.discovery.group@ + namespace: ${spring.profiles.active} + config: + # 配置组 + group: @nacos.config.group@ + namespace: ${spring.profiles.active} + config: + import: + - optional:nacos:application-common.yml + - optional:nacos:datasource.yml + - optional:nacos:${spring.application.name}.yml diff --git a/stwzhj-modules/stwzhj-kafkaToWebsocket/src/main/resources/banner.txt b/stwzhj-modules/stwzhj-kafkaToWebsocket/src/main/resources/banner.txt new file mode 100644 index 00000000..fbd45f53 --- /dev/null +++ b/stwzhj-modules/stwzhj-kafkaToWebsocket/src/main/resources/banner.txt @@ -0,0 +1,10 @@ +Spring Boot Version: ${spring-boot.version} +Spring Application Name: ${spring.application.name} + _ _ + (_) | | + _ __ _ _ ___ _ _ _ ______ ___ _ _ ___ | |_ ___ _ __ ___ +| '__|| | | | / _ \ | | | || ||______|/ __|| | | |/ __|| __| / _ \| '_ ` _ \ +| | | |_| || (_) || |_| || | \__ \| |_| |\__ \| |_ | __/| | | | | | +|_| \__,_| \___/ \__, ||_| |___/ \__, ||___/ \__| \___||_| |_| |_| + __/ | __/ | + |___/ |___/ \ No newline at end of file diff --git a/stwzhj-modules/stwzhj-kafkaToWebsocket/src/main/resources/logback-plus.xml b/stwzhj-modules/stwzhj-kafkaToWebsocket/src/main/resources/logback-plus.xml new file mode 100644 index 00000000..909b3020 --- /dev/null +++ b/stwzhj-modules/stwzhj-kafkaToWebsocket/src/main/resources/logback-plus.xml @@ -0,0 +1,86 @@ + + + + + + + + + + + + + + + + + + + + + + ERROR + + DENY + + ACCEPT + + + ${LOG_PATH}${LOG_FILE} + + UTF-8 + %date [%level] [%thread] %logger{60} [%file : %line] %msg%n + + + ${LOG_PATH}info/${LOG_FILE}.%d{yyyy-MM-dd}.%i.gz + 50MB + 20 + + + + + + + + Error + + + ${LOG_PATH}error.${LOG_FILE} + + + + ${LOG_PATH}error/${LOG_FILE}.%d{yyyy-MM-dd}.%i.gz + + 50MB + 180 + + + + + + UTF-8 + %date [%level] [%thread] %logger{60} [%file : %line] %msg%n + + + + + + + + + + diff --git a/stwzhj-modules/stwzhj-location/pom.xml b/stwzhj-modules/stwzhj-location/pom.xml index 17f45728..63703e66 100644 --- a/stwzhj-modules/stwzhj-location/pom.xml +++ b/stwzhj-modules/stwzhj-location/pom.xml @@ -103,6 +103,17 @@ org.dromara stwzhj-api-resource + + + org.dromara + stwzhj-api-location + + + + org.springframework.boot + spring-boot-starter-websocket + + org.elasticsearch diff --git a/stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/LocationApplication.java b/stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/LocationApplication.java index d77f8d29..f8df182a 100644 --- a/stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/LocationApplication.java +++ b/stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/LocationApplication.java @@ -5,6 +5,7 @@ import org.apache.dubbo.config.spring.context.annotation.EnableDubbo; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.context.metrics.buffering.BufferingApplicationStartup; +import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.scheduling.annotation.EnableScheduling; @EnableDubbo diff --git a/stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/controller/LocationController.java b/stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/controller/LocationController.java index 1cf210af..7082cd53 100644 --- a/stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/controller/LocationController.java +++ b/stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/controller/LocationController.java @@ -9,6 +9,7 @@ import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import org.apache.dubbo.config.annotation.DubboReference; import org.dromara.common.core.domain.R; +import org.dromara.common.core.utils.RedisConstants; import org.dromara.common.redis.utils.RedisUtils; import org.dromara.system.api.RemoteDeviceService; import org.dromara.system.api.domain.bo.RemoteDeviceBo; @@ -177,23 +178,20 @@ public class LocationController { String lat = params.get("lat").toString(); String lng = params.get("lng").toString(); String dist = params.get("distance").toString(); - /* List geoRadiusResponses = redisUtil.nearByXYReadonly(RedisConstants.ONLINE_USERS_GEO, + List geoRadiusResponses = RedisUtils.nearByXYReadonly( Double.parseDouble(lng), Double.parseDouble(lat), Double.parseDouble(dist)); - List list = new ArrayList<>(); - for (GeoRadiusResponse geoRadiusRespons : geoRadiusResponses) { - String memberByString = geoRadiusRespons.getMemberByString(); - logger.info("member:"+memberByString); - String[] strs = memberByString.split("#"); - logger.info("key值:"+keys+":"+strs[0]+":"+strs[1]+":"+strs[2]); - Object object = redisUtil.get(keys+":"+strs[0]+":"+strs[1]+":"+strs[2]); + List list = new ArrayList<>(); + for (String geoRadiusRespons : geoRadiusResponses) { + logger.info("member:"+geoRadiusRespons); + String[] strs = geoRadiusRespons.split("#"); + logger.info("key值:"+keys+":"+strs[0]+":"+strs[1]); + JSONObject object = RedisUtils.getBucket(keys+":"+strs[0]+":"+strs[1]); if (null != object){ - Device device = FastJSONUtil.parsePojo(object.toString(), Device.class); - //device = rebuildDevice(device); - list.add(device); + list.add(object); } - }*/ + } - return R.ok(); + return R.ok(list); } /*+ diff --git a/stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/dubbo/RemoteElasticSearchServiceImpl.java b/stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/dubbo/RemoteElasticSearchServiceImpl.java new file mode 100644 index 00000000..020a27a3 --- /dev/null +++ b/stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/dubbo/RemoteElasticSearchServiceImpl.java @@ -0,0 +1,23 @@ +package org.dromara.location.dubbo; + +import lombok.RequiredArgsConstructor; +import org.apache.dubbo.config.annotation.DubboService; +import org.dromara.location.api.RemoteElasticSearchService; +import org.dromara.location.service.ISearchService; +import org.springframework.stereotype.Service; + +import java.util.List; + +@RequiredArgsConstructor +@Service +@DubboService +public class RemoteElasticSearchServiceImpl implements RemoteElasticSearchService { + + private final ISearchService searchService; + + + @Override + public List linstenDataStatus() { + return searchService.linstenDataStatus(); + } +} diff --git a/stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/package-info.java b/stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/package-info.java deleted file mode 100644 index abbfa76b..00000000 --- a/stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/package-info.java +++ /dev/null @@ -1 +0,0 @@ -package org.dromara.location; diff --git a/stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/service/ISearchService.java b/stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/service/ISearchService.java index 55915b5d..068d08e3 100644 --- a/stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/service/ISearchService.java +++ b/stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/service/ISearchService.java @@ -1,5 +1,7 @@ package org.dromara.location.service; +import org.dromara.system.api.domain.vo.RemoteDictDataVo; + import java.util.List; import java.util.Map; @@ -7,5 +9,10 @@ import java.util.Map; public interface ISearchService { public List searchCar(String deviceCode, String startTime, String endTime,String deviceType) ; + /* + * 监听ES不同设备数据状态 + * */ + public List linstenDataStatus(); + } diff --git a/stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/service/impl/SearchServiceImpl.java b/stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/service/impl/SearchServiceImpl.java index 2e60d742..4938a7d2 100644 --- a/stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/service/impl/SearchServiceImpl.java +++ b/stwzhj-modules/stwzhj-location/src/main/java/org/dromara/location/service/impl/SearchServiceImpl.java @@ -2,9 +2,14 @@ package org.dromara.location.service.impl; import cn.hutool.core.date.DateField; import cn.hutool.core.date.DateTime; +import cn.hutool.core.date.DateUnit; import cn.hutool.core.date.DateUtil; +import com.alibaba.fastjson.JSON; import lombok.RequiredArgsConstructor; +import org.apache.dubbo.config.annotation.DubboReference; import org.dromara.location.service.ISearchService; +import org.dromara.system.api.RemoteDictService; +import org.dromara.system.api.domain.vo.RemoteDictDataVo; import org.elasticsearch.action.search.*; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.RequestOptions; @@ -13,20 +18,22 @@ import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.TermQueryBuilder; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.Scroll; import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.sort.SortBuilders; import org.elasticsearch.search.sort.SortOrder; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.domain.PageRequest; +import org.springframework.data.domain.Pageable; import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.io.IOException; import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Date; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -38,6 +45,11 @@ public class SearchServiceImpl implements ISearchService { @Autowired private RestHighLevelClient restHighLevelClient; + @DubboReference + RemoteDictService dictService; + + + @Override public List searchCar(String deviceCode, String startTime, String endTime,String deviceType) throws RuntimeException{ @@ -117,8 +129,52 @@ public class SearchServiceImpl implements ISearchService { return sourceList; } + @Override + public List linstenDataStatus() { + List list = dictService.selectDictDataByType("zd_device_type"); + List maps = new ArrayList<>(); + for (RemoteDictDataVo dataVo : list) { + try { + BoolQueryBuilder boolBuilder = QueryBuilders.boolQuery(); + // 匹配第二个 + TermQueryBuilder termTerminalBuilder2 = QueryBuilders.termQuery("deviceType", dataVo.getDictValue()); + boolBuilder.must(termTerminalBuilder2); + SearchRequest searchRequest = new SearchRequest(); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + //排序条件 + searchSourceBuilder.sort("gpsTime", SortOrder.DESC); + searchSourceBuilder.query(boolBuilder); + searchSourceBuilder.size(1); + searchRequest.source(searchSourceBuilder); + searchRequest.indicesOptions(IndicesOptions.lenientExpandOpen()); + String index = "gpsinfo"+DateUtil.format(new Date(),"YYYYMMdd"); + searchRequest.indices(index); + // 执行查询,然后处理响应结果 + SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT); + // 根据状态和数据条数验证是否返回了数据 + if (RestStatus.OK.equals(searchResponse.status()) && searchResponse.getHits().getTotalHits().value > 0) { + SearchHits hits = searchResponse.getHits(); + for (SearchHit hit : hits) { + // 将 JSON 转换成对象 + Map sourceAsMap = hit.getSourceAsMap(); + String time = sourceAsMap.get("gpsTime").toString(); + if (DateUtil.between(new Date(),DateUtil.parse(time) , DateUnit.MINUTE)>=30){ + maps.add(dataVo.getDictLabel()); + } + } + }else { + maps.add(dataVo.getDictLabel()); + } + }catch (Exception e){ + e.printStackTrace(); + } + + } + return maps; + } + private List findEsIndexByTime(String startTime, String endTime) { startTime = startTime.substring(0, 10).replaceAll("-","");//yyyyMMdd diff --git a/stwzhj-modules/stwzhj-location/src/main/resources/logback-plus.xml b/stwzhj-modules/stwzhj-location/src/main/resources/logback-plus.xml index caaa3455..909b3020 100644 --- a/stwzhj-modules/stwzhj-location/src/main/resources/logback-plus.xml +++ b/stwzhj-modules/stwzhj-location/src/main/resources/logback-plus.xml @@ -1,18 +1,18 @@ - - + + - - - - ${console.log.pattern} - utf-8 - - + @@ -21,8 +21,66 @@ - - - + + + + + + ERROR + + DENY + + ACCEPT + + + ${LOG_PATH}${LOG_FILE} + + UTF-8 + %date [%level] [%thread] %logger{60} [%file : %line] %msg%n + + + ${LOG_PATH}info/${LOG_FILE}.%d{yyyy-MM-dd}.%i.gz + 50MB + 20 + + + + + + + + Error + + + ${LOG_PATH}error.${LOG_FILE} + + + + ${LOG_PATH}error/${LOG_FILE}.%d{yyyy-MM-dd}.%i.gz + + 50MB + 180 + + + + + + UTF-8 + %date [%level] [%thread] %logger{60} [%file : %line] %msg%n + + + + + + + + diff --git a/stwzhj-modules/stwzhj-resource/src/main/resources/application.yml b/stwzhj-modules/stwzhj-resource/src/main/resources/application.yml index ff8e56ba..0144e3a3 100644 --- a/stwzhj-modules/stwzhj-resource/src/main/resources/application.yml +++ b/stwzhj-modules/stwzhj-resource/src/main/resources/application.yml @@ -6,7 +6,7 @@ server: spring: application: # 应用名称 - name: ruoyi-resource + name: wzhj-resource profiles: # 环境配置 active: @profiles.active@ diff --git a/stwzhj-modules/stwzhj-system/pom.xml b/stwzhj-modules/stwzhj-system/pom.xml index 2f608feb..e638f8c2 100644 --- a/stwzhj-modules/stwzhj-system/pom.xml +++ b/stwzhj-modules/stwzhj-system/pom.xml @@ -104,6 +104,11 @@ stwzhj-api-resource + + org.dromara + stwzhj-api-location + + com.github.jeffreyning mybatisplus-plus diff --git a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/controller/system/IndexStaticsController.java b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/controller/system/IndexStaticsController.java index 0fb2b008..0d5049d5 100644 --- a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/controller/system/IndexStaticsController.java +++ b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/controller/system/IndexStaticsController.java @@ -4,14 +4,20 @@ package org.dromara.system.controller.system; import cn.hutool.core.date.DateUtil; import jdk.dynalink.linker.LinkerServices; import lombok.RequiredArgsConstructor; +import org.apache.dubbo.config.annotation.DubboReference; import org.dromara.common.core.domain.R; import org.dromara.common.redis.utils.RedisUtils; import org.dromara.common.web.core.BaseController; +import org.dromara.location.api.RemoteElasticSearchService; import org.dromara.system.domain.DeviceRedis; +import org.dromara.system.domain.SysNotice; import org.dromara.system.domain.bo.SysDeptBo; +import org.dromara.system.domain.bo.SysNoticeBo; import org.dromara.system.domain.bo.TDeviceBo; import org.dromara.system.domain.vo.*; import org.dromara.system.service.*; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.web.bind.annotation.*; import java.util.*; @@ -28,6 +34,12 @@ public class IndexStaticsController extends BaseController { private final IDeviceRedisService redisService; + private final ISysNoticeService noticeService; + + @DubboReference + RemoteElasticSearchService elasticSearchService; + + /* @@ -75,6 +87,27 @@ public class IndexStaticsController extends BaseController { return R.ok(maps); } + /* + * 通过ES来监听各类设备定位是否正常 + * */ + @Scheduled(cron = "0 */30 * * * ?") + public void listen(){ + List strs = elasticSearchService.linstenDataStatus(); + if (strs.size() >0){ + List nlist = noticeService.selectTodayNoticeList(); + if (nlist.size() <=2){ + // -- todo 发送短信 + SysNoticeBo noticeBo = new SysNoticeBo(); + noticeBo.setNoticeTitle("手机号码"); + noticeBo.setNoticeType("3"); + noticeBo.setNoticeContent(strs.toString()+"数据不正常,请检查服务是否正常"); + noticeBo.setCreateTime(DateUtil.date()); + noticeService.insertNotice(noticeBo); + } + + } + } + /* * 各地市总数和在线数 * */ diff --git a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/domain/SysDept.java b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/domain/SysDept.java index 2113f91d..0ab84dd1 100644 --- a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/domain/SysDept.java +++ b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/domain/SysDept.java @@ -40,6 +40,8 @@ public class SysDept extends TenantEntity { */ private String deptName; + private String shortName; + /** * 显示顺序 @@ -79,4 +81,6 @@ public class SysDept extends TenantEntity { private String fullName; + private String isVisible; + } diff --git a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/domain/bo/SysDeptBo.java b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/domain/bo/SysDeptBo.java index e0812819..297a69bc 100644 --- a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/domain/bo/SysDeptBo.java +++ b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/domain/bo/SysDeptBo.java @@ -24,6 +24,7 @@ public class SysDeptBo extends BaseEntity { /** * 部门id */ + @Size(min = 0, max = 12, message = "部门机构代码长度不能超过{max}个字符") private String deptId; /** @@ -38,6 +39,9 @@ public class SysDeptBo extends BaseEntity { @Size(min = 0, max = 30, message = "部门名称长度不能超过{max}个字符") private String deptName; + @NotBlank(message = "部门简称不能为空") + private String shortName; + /** * 部门类别编码 */ @@ -75,4 +79,6 @@ public class SysDeptBo extends BaseEntity { private String fullName; + private String isVisible; + } diff --git a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/domain/bo/TDeviceBo.java b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/domain/bo/TDeviceBo.java index 2334e292..ccdc3b25 100644 --- a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/domain/bo/TDeviceBo.java +++ b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/domain/bo/TDeviceBo.java @@ -77,6 +77,10 @@ public class TDeviceBo extends BaseEntity { private String cardNum; + private String tdbm; + + private String gbbm; + /** * 0无效,1有效 */ diff --git a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/domain/vo/SysDeptVo.java b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/domain/vo/SysDeptVo.java index 75d16f5b..0730b7ca 100644 --- a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/domain/vo/SysDeptVo.java +++ b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/domain/vo/SysDeptVo.java @@ -52,6 +52,8 @@ public class SysDeptVo implements Serializable { @ExcelProperty(value = "部门名称") private String deptName; + private String shortName; + /** * 部门类别编码 */ @@ -93,6 +95,8 @@ public class SysDeptVo implements Serializable { @ExcelDictFormat(dictType = "sys_normal_disable") private String status; + private String isVisible; + /** * 创建时间 */ diff --git a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/domain/vo/TDeviceExportVo.java b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/domain/vo/TDeviceExportVo.java index a611044c..fd2fa5c5 100644 --- a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/domain/vo/TDeviceExportVo.java +++ b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/domain/vo/TDeviceExportVo.java @@ -16,13 +16,13 @@ public class TDeviceExportVo implements Serializable { @Serial private static final long serialVersionUID = 1L; - @ExcelProperty(value = "设备编号") + @ExcelProperty(value = "设备编码") private String deviceCode; /** * 设备类型 */ - @ExcelProperty(value = "设备类型") + @ExcelProperty(value = "设备类型", converter = ExcelDictConvert.class) @ExcelDictFormat(dictType = "zd_device_type") private String deviceType; @@ -41,35 +41,41 @@ public class TDeviceExportVo implements Serializable { /** * 警号(若有) */ - @ExcelProperty(value = "警号", converter = ExcelDictConvert.class) + @ExcelProperty(value = "警号") private String policeNo; /** * 姓名(若有) */ - @ExcelProperty(value = "警员姓名", converter = ExcelDictConvert.class) + @ExcelProperty(value = "警员姓名") private String policeName; /** * 联系电话(若有) */ - @ExcelProperty(value = "电话号码", converter = ExcelDictConvert.class) + @ExcelProperty(value = "电话号码") private String phoneNum; /** * 车牌号(若有) */ - @ExcelProperty(value = "车牌号", converter = ExcelDictConvert.class) + @ExcelProperty(value = "车牌号") private String carNum; - @ExcelProperty(value = "证件号码", converter = ExcelDictConvert.class) + @ExcelProperty(value = "证件号码") private String cardNum; + @ExcelProperty(value = "通道编码") + private String tdbm; + + @ExcelProperty(value = "国标编码") + private String gbbm; + /** * 0无效,1有效 */ - @ExcelProperty(value = "有效性") - @ExcelDictFormat(readConverterExp = "1=有效,0=无效") + @ExcelProperty(value = "有效性", converter = ExcelDictConvert.class) + @ExcelDictFormat(dictType = "zd_device_status") private Integer valid; /** diff --git a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/domain/vo/TDeviceImportVo.java b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/domain/vo/TDeviceImportVo.java index 69b1ecc8..77592b1f 100644 --- a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/domain/vo/TDeviceImportVo.java +++ b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/domain/vo/TDeviceImportVo.java @@ -16,7 +16,7 @@ public class TDeviceImportVo implements Serializable { @Serial private static final long serialVersionUID = 1L; - @ExcelProperty(value = "设备编号") + @ExcelProperty(value = "设备编码") private String deviceCode; /** @@ -65,6 +65,12 @@ public class TDeviceImportVo implements Serializable { @ExcelProperty(value = "证件号码") private String cardNum; + @ExcelProperty(value = "通道编码") + private String tdbm; + + @ExcelProperty(value = "国标编码") + private String gbbm; + /** * 0无效,1有效 */ diff --git a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/domain/vo/TDeviceVo.java b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/domain/vo/TDeviceVo.java index c812ca5e..6b6f6a25 100644 --- a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/domain/vo/TDeviceVo.java +++ b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/domain/vo/TDeviceVo.java @@ -91,6 +91,10 @@ public class TDeviceVo implements Serializable { private String cardNum; + private String tdbm; + + private String gbbm; + /** * 0无效,1有效 */ diff --git a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/listener/TDeviceImportListener.java b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/listener/TDeviceImportListener.java index 046e5b8a..87162912 100644 --- a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/listener/TDeviceImportListener.java +++ b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/listener/TDeviceImportListener.java @@ -16,8 +16,10 @@ import org.dromara.common.excel.core.ExcelListener; import org.dromara.common.excel.core.ExcelResult; import org.dromara.common.satoken.utils.LoginHelper; import org.dromara.system.domain.bo.TDeviceBo; +import org.dromara.system.domain.vo.SysDeptVo; import org.dromara.system.domain.vo.TDeviceImportVo; import org.dromara.system.domain.vo.TDeviceVo; +import org.dromara.system.service.ISysDeptService; import org.dromara.system.service.ITDeviceService; import java.util.List; @@ -27,6 +29,8 @@ public class TDeviceImportListener extends AnalysisEventListener").append(successNum).append("、设备 ").append(deviceBo.getDeviceCode()).append(" 导入成功"); - } else if (isUpdateSupport) { - Long id = deviceVo.getId(); - TDeviceBo deviceBo = BeanUtil.toBean(deviceVo, TDeviceBo.class); - deviceBo.setId(id); - ValidatorUtils.validate(deviceBo); - deviceService.updateByBo(deviceBo); - successNum++; - successMsg.append("
").append(successNum).append("、设备 ").append(deviceImportVo.getDeviceCode()).append(" 更新成功"); - } else { + + if (null != deviceImportVo.getDeviceCode() && deviceImportVo.getDeviceCode().contains("设备编码为国标编码")){ + failureNum = 1; + failureMsg.append("请删除Excel中提示必须删除的行后重新导入!"); + }else { + String deviceCode = deviceImportVo.getDeviceCode().replaceAll("[\\p{Zs}\\s]", ""); + TDeviceVo deviceVo = this.deviceService.queryByDeviceCode(deviceCode); + try { + // 验证是否存在这个设备 + if (ObjectUtil.isNull(deviceVo)) { + deviceImportVo.setDeviceCode(deviceCode); + SysDeptVo deptVo = deptService.selectDeptById(deviceImportVo.getZzjgdm()); + if(null != deptVo){ + deviceImportVo.setZzjgmc(deptVo.getShortName()); + } + TDeviceBo deviceBo = BeanUtil.toBean(deviceImportVo, TDeviceBo.class); + ValidatorUtils.validate(deviceBo); + deviceService.insertByBo(deviceBo); + successNum++; + successMsg.append("
").append(successNum).append("、设备 ").append(deviceBo.getDeviceCode()).append(" 导入成功"); + } else if (isUpdateSupport) { + Long id = deviceVo.getId(); + SysDeptVo deptVo = deptService.selectDeptById(deviceVo.getZzjgdm()); + if(null != deptVo){ + deviceVo.setZzjgmc(deptVo.getShortName()); + } + TDeviceBo deviceBo = BeanUtil.toBean(deviceVo, TDeviceBo.class); + deviceBo.setId(id); + ValidatorUtils.validate(deviceBo); + deviceService.updateByBo(deviceBo); + successNum++; + successMsg.append("
").append(successNum).append("、设备 ").append(deviceImportVo.getDeviceCode()).append(" 更新成功"); + } else { + failureNum++; + failureMsg.append("
").append(failureNum).append("、设备 ").append(deviceImportVo.getDeviceCode()).append(" 已存在"); + } + } catch (Exception e) { failureNum++; - failureMsg.append("
").append(failureNum).append("、设备 ").append(deviceImportVo.getDeviceCode()).append(" 已存在"); + String msg = "
" + failureNum + "、设备 " + HtmlUtil.cleanHtmlTag(deviceImportVo.getDeviceCode()) + " 导入失败:"; + String message = e.getMessage(); + if (e instanceof ConstraintViolationException cvException) { + message = StreamUtils.join(cvException.getConstraintViolations(), ConstraintViolation::getMessage, ", "); + } + failureMsg.append(msg).append(message); + log.error(msg, e); } - } catch (Exception e) { - failureNum++; - String msg = "
" + failureNum + "、设备 " + HtmlUtil.cleanHtmlTag(deviceImportVo.getDeviceCode()) + " 导入失败:"; - String message = e.getMessage(); - if (e instanceof ConstraintViolationException cvException) { - message = StreamUtils.join(cvException.getConstraintViolations(), ConstraintViolation::getMessage, ", "); - } - failureMsg.append(msg).append(message); - log.error(msg, e); } + } @Override diff --git a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/service/ISysNoticeService.java b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/service/ISysNoticeService.java index 8ec999d0..f24596ec 100644 --- a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/service/ISysNoticeService.java +++ b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/service/ISysNoticeService.java @@ -33,6 +33,8 @@ public interface ISysNoticeService { */ List selectNoticeList(SysNoticeBo notice); + List selectTodayNoticeList(); + /** * 新增公告 * diff --git a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/service/impl/SysDeptServiceImpl.java b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/service/impl/SysDeptServiceImpl.java index 879ce35e..577dfa52 100644 --- a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/service/impl/SysDeptServiceImpl.java +++ b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/service/impl/SysDeptServiceImpl.java @@ -82,6 +82,7 @@ public class SysDeptServiceImpl implements ISysDeptService { lqw.eq(StringUtils.isNotBlank(bo.getParentId()), SysDept::getParentId, bo.getParentId()); lqw.like(StringUtils.isNotBlank(bo.getDeptName()), SysDept::getDeptName, bo.getDeptName()); lqw.eq(StringUtils.isNotBlank(bo.getStatus()), SysDept::getStatus, bo.getStatus()); + lqw.eq(StringUtils.isNotBlank(bo.getIsVisible()),SysDept::getIsVisible,bo.getIsVisible()); lqw.eq(StringUtils.isNotBlank(bo.getFullName()), SysDept::getFullName, bo.getFullName()); lqw.orderByAsc(SysDept::getAncestors); lqw.orderByAsc(SysDept::getParentId); @@ -104,7 +105,7 @@ public class SysDeptServiceImpl implements ISysDeptService { return TreeBuildUtils.build(depts, (dept, tree) -> tree.setId(dept.getDeptId()) .setParentId(dept.getParentId()) - .setName(dept.getDeptName()) + .setName(dept.getShortName()) .setWeight(dept.getOrderNum())); } @@ -289,6 +290,10 @@ public class SysDeptServiceImpl implements ISysDeptService { // 如果该部门是启用状态,则启用该部门的所有上级部门 updateParentDeptStatusNormal(dept); } + if ("0".equals(dept.getIsVisible())){ //如果隐藏该部门则其子部门全部隐藏 + updateDeptChildrenVisiable(dept.getDeptId(),"0"); + } + return result; } @@ -305,6 +310,19 @@ public class SysDeptServiceImpl implements ISysDeptService { .in(SysDept::getDeptId, Arrays.asList(deptIds))); } + /* + * 修改子部门为隐藏 + * + * */ + private void updateDeptChildrenVisiable(String deptId,String visible){ + List children = baseMapper.selectList(new LambdaQueryWrapper() + .apply(DataBaseHelper.findInSet(deptId, "ancestors"))); + for (SysDept child : children) { + child.setIsVisible(visible); + baseMapper.updateById(child); + } + } + /** * 修改子元素关系 * diff --git a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/service/impl/SysNoticeServiceImpl.java b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/service/impl/SysNoticeServiceImpl.java index db63e61d..226a6fb3 100644 --- a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/service/impl/SysNoticeServiceImpl.java +++ b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/service/impl/SysNoticeServiceImpl.java @@ -1,5 +1,6 @@ package org.dromara.system.service.impl; +import cn.hutool.core.date.DateUtil; import cn.hutool.core.util.ObjectUtil; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.toolkit.Wrappers; @@ -20,6 +21,7 @@ import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Service; import java.util.Arrays; +import java.util.Date; import java.util.List; /** @@ -64,6 +66,14 @@ public class SysNoticeServiceImpl implements ISysNoticeService { return baseMapper.selectVoList(lqw); } + @Override + public List selectTodayNoticeList() { + LambdaQueryWrapper lqw = new LambdaQueryWrapper<>(); + lqw.likeRight(SysNotice::getCreateTime, DateUtil.formatDate(new Date())); + lqw.eq(SysNotice::getNoticeType,"3"); + return baseMapper.selectVoList(lqw); + } + private LambdaQueryWrapper buildQueryWrapper(SysNoticeBo bo) { LambdaQueryWrapper lqw = Wrappers.lambdaQuery(); lqw.like(StringUtils.isNotBlank(bo.getNoticeTitle()), SysNotice::getNoticeTitle, bo.getNoticeTitle()); diff --git a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/service/impl/TDeviceServiceImpl.java b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/service/impl/TDeviceServiceImpl.java index bcbf2257..9a143aa9 100644 --- a/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/service/impl/TDeviceServiceImpl.java +++ b/stwzhj-modules/stwzhj-system/src/main/java/org/dromara/system/service/impl/TDeviceServiceImpl.java @@ -166,6 +166,7 @@ public class TDeviceServiceImpl implements ITDeviceService { */ @Override public Boolean insertByBo(TDeviceBo bo) { + bo.setDeviceCode(bo.getDeviceCode().replaceAll("[\\p{Zs}\\s]", "")); TDevice add = MapstructUtils.convert(bo, TDevice.class); validEntityBeforeSave(add); add.setCreateTime(DateUtil.now()); diff --git a/stwzhj-modules/stwzhj-system/src/main/resources/logback-plus.xml b/stwzhj-modules/stwzhj-system/src/main/resources/logback-plus.xml index caaa3455..909b3020 100644 --- a/stwzhj-modules/stwzhj-system/src/main/resources/logback-plus.xml +++ b/stwzhj-modules/stwzhj-system/src/main/resources/logback-plus.xml @@ -1,18 +1,18 @@ - - + + - - - - ${console.log.pattern} - utf-8 - - + @@ -21,8 +21,66 @@ - - - + + + + + + ERROR + + DENY + + ACCEPT + + + ${LOG_PATH}${LOG_FILE} + + UTF-8 + %date [%level] [%thread] %logger{60} [%file : %line] %msg%n + + + ${LOG_PATH}info/${LOG_FILE}.%d{yyyy-MM-dd}.%i.gz + 50MB + 20 + + + + + + + + Error + + + ${LOG_PATH}error.${LOG_FILE} + + + + ${LOG_PATH}error/${LOG_FILE}.%d{yyyy-MM-dd}.%i.gz + + 50MB + 180 + + + + + + UTF-8 + %date [%level] [%thread] %logger{60} [%file : %line] %msg%n + + + + + + + + diff --git a/stwzhj-modules/stwzhj-system/src/main/resources/mapper/system/SysDeptMapper.xml b/stwzhj-modules/stwzhj-system/src/main/resources/mapper/system/SysDeptMapper.xml index 98fa431b..ffcc40ae 100644 --- a/stwzhj-modules/stwzhj-system/src/main/resources/mapper/system/SysDeptMapper.xml +++ b/stwzhj-modules/stwzhj-system/src/main/resources/mapper/system/SysDeptMapper.xml @@ -70,7 +70,7 @@ group by SUBSTRING(zzjgdm,1,8)) a on SUBSTRING(d.dept_id,1,8) = SUBSTRING(a.zzjgdm,1,8) - where `status` = '0' and del_flag =0 and parent_id = '341800000000' and dept_id like '341800%' + where `status` = '0' and del_flag =0 and is_visible = '1' and parent_id = '341300000000' and dept_id like '341300%' union @@ -92,7 +92,7 @@ group by SUBSTRING(zzjgdm,1,6)) a on SUBSTRING(d.dept_id,1,6) = SUBSTRING(a.zzjgdm,1,6) - where `status` = '0' and del_flag =0 and parent_id = '341800000000' and dept_id not like '341800%' + where `status` = '0' and del_flag =0 and is_visible = '1' and parent_id = '341300000000' and dept_id not like '341300%' union @@ -113,7 +113,7 @@ group by SUBSTRING(zzjgdm,1,8)) a on SUBSTRING(d.dept_id,1,8) = SUBSTRING(a.zzjgdm,1,8) - where `status` = '0' and del_flag =0 and parent_id != '341800000000' and dept_id not like '341800%' and dept_id like '%0000' + where `status` = '0' and del_flag =0 and is_visible = '1' and parent_id != '341300000000' and parent_id != '0' and dept_id like '%0000' UNION select dept_id as deptId,short_name as deptName,parent_id as parentId,IFNULL(onlineCount,0) as onlineCount,IFNULL(allCount,0) as allCount from sys_dept d @@ -133,7 +133,7 @@ group by SUBSTRING(zzjgdm,1,10)) a on SUBSTRING(d.dept_id,1,10) = SUBSTRING(a.zzjgdm,1,10) - where `status` = '0' and del_flag =0 and parent_id != '341800000000' and parent_id != '0' and dept_id like '341800%' and dept_id like '%00' + where `status` = '0' and del_flag =0 and is_visible = '1' and parent_id != '341300000000' and parent_id != '0' and dept_id not like '%0000' and dept_id like '%00' union @@ -154,7 +154,7 @@ group by zzjgdm) a on d.dept_id = a.zzjgdm - where `status` = '0' and del_flag =0 and LENGTH(ancestors) = 40 + where `status` = '0' and is_visible = '1' and del_flag =0 and LENGTH(ancestors) = 53 ) a order by a.deptId asc @@ -175,7 +175,7 @@ LEFT JOIN (select substring(zzjgdm,1,8) as zzjgdm ,count(*) as allCount from t_device where device_type = #{deviceType} and valid =1 group by SUBSTRING(zzjgdm,1,8)) a on SUBSTRING(d.dept_id,1,8) = SUBSTRING(a.zzjgdm,1,8) - where `status` = '0' and del_flag =0 and parent_id = '340100000000' and dept_id like '340100%' + where `status` = '0' and del_flag =0 and is_visible = '1' and parent_id = '341300000000' and dept_id like '341300%' union @@ -187,7 +187,7 @@ LEFT JOIN (select substring(zzjgdm,1,6) as zzjgdm ,count(*) as allCount from t_device where device_type = #{deviceType} and valid =1 group by SUBSTRING(zzjgdm,1,6)) a on SUBSTRING(d.dept_id,1,6) = SUBSTRING(a.zzjgdm,1,6) - where `status` = '0' and del_flag =0 and parent_id = '340100000000' and dept_id not like '340100%' + where `status` = '0' and del_flag =0 and is_visible = '1' and parent_id = '341300000000' and dept_id not like '341300%' union @@ -198,7 +198,7 @@ LEFT JOIN (select substring(zzjgdm,1,8) as zzjgdm ,count(*) as allCount from t_device where device_type = #{deviceType} and valid = 1 group by SUBSTRING(zzjgdm,1,8)) a on SUBSTRING(d.dept_id,1,8) = SUBSTRING(a.zzjgdm,1,8) - where `status` = '0' and del_flag =0 and parent_id != '340100000000' and dept_id not like '340100%' and dept_id like '%0000' + where `status` = '0' and del_flag =0 and is_visible = '1' and parent_id != '341300000000' and parent_id != '0' and dept_id like '%0000' UNION select dept_id as deptId,short_name as deptName,parent_id as parentId,IFNULL(onlineCount,0) as onlineCount,IFNULL(allCount,0) as allCount from sys_dept d @@ -208,7 +208,7 @@ LEFT JOIN (select substring(zzjgdm,1,10) as zzjgdm ,count(*) as allCount from t_device where device_type = #{deviceType} and valid = 1 group by SUBSTRING(zzjgdm,1,10)) a on SUBSTRING(d.dept_id,1,10) = SUBSTRING(a.zzjgdm,1,10) - where `status` = '0' and del_flag =0 and parent_id != '340100000000' and parent_id != '0' and dept_id like '340100%' and dept_id like '%00' + where `status` = '0' and del_flag =0 and is_visible = '1' and parent_id != '341300000000' and parent_id != '0' and dept_id not like '%0000' and dept_id like '%00' union @@ -219,7 +219,7 @@ LEFT JOIN (select zzjgdm as zzjgdm ,count(*) as allCount from t_device where device_type = #{deviceType} and valid = 1 group by zzjgdm) a on d.dept_id = a.zzjgdm - where `status` = '0' and del_flag =0 and LENGTH(ancestors) = 40 + where `status` = '0' and del_flag =0 and is_visible = '1' and LENGTH(ancestors) = 53 ) count_temp diff --git a/stwzhj-modules/wzhj-extract/src/main/resources/logback-plus.xml b/stwzhj-modules/wzhj-extract/src/main/resources/logback-plus.xml index caaa3455..909b3020 100644 --- a/stwzhj-modules/wzhj-extract/src/main/resources/logback-plus.xml +++ b/stwzhj-modules/wzhj-extract/src/main/resources/logback-plus.xml @@ -1,18 +1,18 @@ - - + + - - - - ${console.log.pattern} - utf-8 - - + @@ -21,8 +21,66 @@ - - - + + + + + + ERROR + + DENY + + ACCEPT + + + ${LOG_PATH}${LOG_FILE} + + UTF-8 + %date [%level] [%thread] %logger{60} [%file : %line] %msg%n + + + ${LOG_PATH}info/${LOG_FILE}.%d{yyyy-MM-dd}.%i.gz + 50MB + 20 + + + + + + + + Error + + + ${LOG_PATH}error.${LOG_FILE} + + + + ${LOG_PATH}error/${LOG_FILE}.%d{yyyy-MM-dd}.%i.gz + + 50MB + 180 + + + + + + UTF-8 + %date [%level] [%thread] %logger{60} [%file : %line] %msg%n + + + + + + + + diff --git a/stwzhj-modules/wzhj-udp/src/main/java/org/dromara/udp/controller/OriginalUdpReceiver.java b/stwzhj-modules/wzhj-udp/src/main/java/org/dromara/udp/controller/OriginalUdpReceiver.java index f702b648..daa461e7 100644 --- a/stwzhj-modules/wzhj-udp/src/main/java/org/dromara/udp/controller/OriginalUdpReceiver.java +++ b/stwzhj-modules/wzhj-udp/src/main/java/org/dromara/udp/controller/OriginalUdpReceiver.java @@ -77,7 +77,7 @@ public class OriginalUdpReceiver implements ApplicationListener - - + + - - - - ${console.log.pattern} - utf-8 - - + @@ -21,8 +21,66 @@ - - - + + + + + + ERROR + + DENY + + ACCEPT + + + ${LOG_PATH}${LOG_FILE} + + UTF-8 + %date [%level] [%thread] %logger{60} [%file : %line] %msg%n + + + ${LOG_PATH}info/${LOG_FILE}.%d{yyyy-MM-dd}.%i.gz + 50MB + 20 + + + + + + + + Error + + + ${LOG_PATH}error.${LOG_FILE} + + + + ${LOG_PATH}error/${LOG_FILE}.%d{yyyy-MM-dd}.%i.gz + + 50MB + 180 + + + + + + UTF-8 + %date [%level] [%thread] %logger{60} [%file : %line] %msg%n + + + + + + + + diff --git a/stwzhj-modules/wzhj-webscoket/src/main/resources/logback-plus.xml b/stwzhj-modules/wzhj-webscoket/src/main/resources/logback-plus.xml new file mode 100644 index 00000000..909b3020 --- /dev/null +++ b/stwzhj-modules/wzhj-webscoket/src/main/resources/logback-plus.xml @@ -0,0 +1,86 @@ + + + + + + + + + + + + + + + + + + + + + + ERROR + + DENY + + ACCEPT + + + ${LOG_PATH}${LOG_FILE} + + UTF-8 + %date [%level] [%thread] %logger{60} [%file : %line] %msg%n + + + ${LOG_PATH}info/${LOG_FILE}.%d{yyyy-MM-dd}.%i.gz + 50MB + 20 + + + + + + + + Error + + + ${LOG_PATH}error.${LOG_FILE} + + + + ${LOG_PATH}error/${LOG_FILE}.%d{yyyy-MM-dd}.%i.gz + + 50MB + 180 + + + + + + UTF-8 + %date [%level] [%thread] %logger{60} [%file : %line] %msg%n + + + + + + + + + + diff --git a/stwzhj-visual/stwzhj-nacos/src/main/resources/application.properties b/stwzhj-visual/stwzhj-nacos/src/main/resources/application.properties index 7d30f1ae..58dafa10 100644 --- a/stwzhj-visual/stwzhj-nacos/src/main/resources/application.properties +++ b/stwzhj-visual/stwzhj-nacos/src/main/resources/application.properties @@ -40,9 +40,10 @@ spring.sql.init.platform=mysql db.num=1 ### Connect URL of DB: -db.url.0=jdbc:mysql://127.0.0.1:3306/ry-config?characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true&useUnicode=true&useSSL=false&serverTimezone=UTC&allowPublicKeyRetrieval=true +#db.url.0=jdbc:mysql://127.0.0.1:3306/ry-config?characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true&useUnicode=true&useSSL=false&serverTimezone=UTC&allowPublicKeyRetrieval=true +db.url.0=jdbc:mysql://53.176.146.98:3306/wzhj-config?characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true&useUnicode=true&useSSL=false&serverTimezone=UTC&allowPublicKeyRetrieval=true db.user.0=root -db.password.0=root +db.password.0=Ycgis!2509 ### the maximum retry times for push nacos.config.push.maxRetryTime=50