diff --git a/src/main/java/app/annotations/impl/ClusterMethodAspect.java b/src/main/java/app/annotations/impl/ClusterMethodAspect.java index 65ffb72..7aaa5e0 100644 --- a/src/main/java/app/annotations/impl/ClusterMethodAspect.java +++ b/src/main/java/app/annotations/impl/ClusterMethodAspect.java @@ -19,6 +19,7 @@ import org.springframework.web.client.RestTemplate; import java.io.ByteArrayInputStream; import java.io.ObjectInputStream; import java.time.Instant; +import java.util.HashMap; import java.util.HashSet; import java.util.Set; import java.util.concurrent.Callable; @@ -32,10 +33,12 @@ public class ClusterMethodAspect extends BaseUpdater { @Autowired private HttpServletRequest httpServletRequest; + private final HashMap lastClusterPulse = new HashMap<>(); private final RestTemplate restTemplate; private final String[] clusters; private final Logger logger = LoggerFactory.getLogger(getClass()); + private final Long max_range = 300L; @Autowired public ClusterMethodAspect(Environment environment) { @@ -69,6 +72,11 @@ public class ClusterMethodAspect extends BaseUpdater { for(int i = 0; i < clusters.length; i++) { int finalI = i; callables.add(() -> { + if (lastClusterPulse.containsKey(clusters[finalI]) && Instant.now().getEpochSecond() - lastClusterPulse.get(clusters[finalI]) > max_range) { + this.logger.warn("Cannot redirect packet to cluster {}, he is ded more {} seconds", clusters[finalI], max_range); + return null; + } + try { restTemplate.exchange("http://" + clusters[finalI] + clusterRequest.getUrl(), clusterRequest.getHttpMethod(), clusterRequest.getHttpEntity(clusters[finalI]), byte[].class); } catch (Exception e) { @@ -100,10 +108,18 @@ public class ClusterMethodAspect extends BaseUpdater { private boolean checkCluster(String address) { try { restTemplate.getForEntity("http://" + address + "/api/pulse/db", Long.class).getBody(); + lastClusterPulse.put(address, Instant.now().getEpochSecond()); return true; } catch (Exception err) { this.logger.error("Cannot pulse {}", address); return false; + } finally { + if (lastClusterPulse.containsKey(address)) { + long range = Instant.now().getEpochSecond() - lastClusterPulse.get(address); + if (range > max_range) { + this.logger.warn("Cannot pulse {} more {} seconds", address, range); + } + } } }