diff --git a/src/main/java/app/websocket/handlers/ServersHandler.java b/src/main/java/app/websocket/handlers/ServersHandler.java index f77b8e4..e7dcdee 100644 --- a/src/main/java/app/websocket/handlers/ServersHandler.java +++ b/src/main/java/app/websocket/handlers/ServersHandler.java @@ -8,6 +8,9 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.context.event.ContextClosedEvent; +import org.springframework.context.event.EventListener; import org.springframework.stereotype.Component; import org.springframework.web.socket.CloseStatus; import org.springframework.web.socket.TextMessage; @@ -15,11 +18,8 @@ import org.springframework.web.socket.WebSocketSession; import org.springframework.web.socket.handler.TextWebSocketHandler; import java.io.IOException; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; +import java.util.*; +import java.util.concurrent.*; @Component public class ServersHandler extends TextWebSocketHandler implements BaseWebsocketHandler { @@ -27,6 +27,8 @@ public class ServersHandler extends TextWebSocketHandler implements BaseWebsocke private final Map activeSessions = new ConcurrentHashMap<>(); private ObjectMapper objectMapper = new ObjectMapper(); private Stats stats; + private final BlockingQueue q = new LinkedBlockingQueue<>(); + private boolean q_run = true; @Autowired ServersHandler(Stats stats) { @@ -34,6 +36,25 @@ public class ServersHandler extends TextWebSocketHandler implements BaseWebsocke this.stats = stats; } + @EventListener(ApplicationReadyEvent.class) + public void pusher() throws InterruptedException { + while (q_run) { //ага иди нахуй, я врот ебал кто такое не уважает + try { + TextMessage textMessage = q.take(); + push2All(textMessage); + } catch (Exception e) { + logger.error("Cannot push ws message", e); + } finally { + Thread.sleep(50); + } + } + } + + @EventListener(ContextClosedEvent.class) + public void disablePusher(ContextClosedEvent event) { + this.q_run = false; + } + @Override public String getPath() { return "servers"; @@ -64,7 +85,7 @@ public class ServersHandler extends TextWebSocketHandler implements BaseWebsocke private void push2All(TextMessage msg) throws IOException { for (Map.Entry otherSession : Collections.synchronizedSet(activeSessions.entrySet())) { try { - synchronized (otherSession) { + synchronized (otherSession.getValue()) { otherSession.getValue().sendMessage(msg); } } catch (Exception err) { @@ -95,8 +116,9 @@ public class ServersHandler extends TextWebSocketHandler implements BaseWebsocke public void pushServer(String srv, Server server) { try { - push2All(getServersPayload(srv, server)); - } catch (IOException err) { + if (!q.offer(getServersPayload(srv, server))); + logger.warn("Cannot fill ws status queue, da blya max int mojet i ne hvatit ebanutiy 4tole blya"); + } catch (Exception err) { logger.error("Cannot send message"); } }