|
|
@ -19,6 +19,7 @@ import org.springframework.web.client.RestTemplate; |
|
|
|
import java.io.ByteArrayInputStream; |
|
|
|
import java.io.ObjectInputStream; |
|
|
|
import java.time.Instant; |
|
|
|
import java.util.HashMap; |
|
|
|
import java.util.HashSet; |
|
|
|
import java.util.Set; |
|
|
|
import java.util.concurrent.Callable; |
|
|
@ -32,10 +33,12 @@ public class ClusterMethodAspect extends BaseUpdater { |
|
|
|
|
|
|
|
@Autowired |
|
|
|
private HttpServletRequest httpServletRequest; |
|
|
|
private final HashMap<String, Long> lastClusterPulse = new HashMap<>(); |
|
|
|
|
|
|
|
private final RestTemplate restTemplate; |
|
|
|
private final String[] clusters; |
|
|
|
private final Logger logger = LoggerFactory.getLogger(getClass()); |
|
|
|
private final Long max_range = 300L; |
|
|
|
|
|
|
|
@Autowired |
|
|
|
public ClusterMethodAspect(Environment environment) { |
|
|
@ -69,6 +72,11 @@ public class ClusterMethodAspect extends BaseUpdater { |
|
|
|
for(int i = 0; i < clusters.length; i++) { |
|
|
|
int finalI = i; |
|
|
|
callables.add(() -> { |
|
|
|
if (lastClusterPulse.containsKey(clusters[finalI]) && Instant.now().getEpochSecond() - lastClusterPulse.get(clusters[finalI]) > max_range) { |
|
|
|
this.logger.warn("Cannot redirect packet to cluster {}, he is ded more {} seconds", clusters[finalI], max_range); |
|
|
|
return null; |
|
|
|
} |
|
|
|
|
|
|
|
try { |
|
|
|
restTemplate.exchange("http://" + clusters[finalI] + clusterRequest.getUrl(), clusterRequest.getHttpMethod(), clusterRequest.getHttpEntity(clusters[finalI]), byte[].class); |
|
|
|
} catch (Exception e) { |
|
|
@ -100,10 +108,18 @@ public class ClusterMethodAspect extends BaseUpdater { |
|
|
|
private boolean checkCluster(String address) { |
|
|
|
try { |
|
|
|
restTemplate.getForEntity("http://" + address + "/api/pulse/db", Long.class).getBody(); |
|
|
|
lastClusterPulse.put(address, Instant.now().getEpochSecond()); |
|
|
|
return true; |
|
|
|
} catch (Exception err) { |
|
|
|
this.logger.error("Cannot pulse {}", address); |
|
|
|
return false; |
|
|
|
} finally { |
|
|
|
if (lastClusterPulse.containsKey(address)) { |
|
|
|
long range = Instant.now().getEpochSecond() - lastClusterPulse.get(address); |
|
|
|
if (range > max_range) { |
|
|
|
this.logger.warn("Cannot pulse {} more {} seconds", address, range); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|