You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
172 lines
6.3 KiB
172 lines
6.3 KiB
package app.annotations.impl;
|
|
|
|
import app.updates.BaseUpdater;
import jakarta.annotation.PostConstruct;
import jakarta.servlet.http.HttpServletRequest;
import org.aspectj.lang.annotation.After;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.EventListener;
import org.springframework.core.env.Environment;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.web.client.RestTemplate;

import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
import java.net.URI;
import java.time.Instant;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;
|
|
|
|
@Aspect
|
|
@Configuration
|
|
public class ClusterMethodAspect extends BaseUpdater {
|
|
|
|
@Autowired
|
|
private HttpServletRequest httpServletRequest;
|
|
private final HashMap<String, Long> lastClusterPulse = new HashMap<>();
|
|
|
|
private final RestTemplate restTemplate;
|
|
private final String[] clusters;
|
|
private final Logger logger = LoggerFactory.getLogger(getClass());
|
|
private final Long max_range = 15L;
|
|
|
|
@Autowired
|
|
public ClusterMethodAspect(Environment environment) {
|
|
this.restTemplate = new RestTemplate();
|
|
this.clusters = environment.getProperty("CLUSTERS","").isEmpty() ? new String[0] : environment.getProperty("CLUSTERS","").split(",");
|
|
if (this.clusters.length > 0) {
|
|
for (int i = 0; i < this.clusters.length; i++) {
|
|
this.logger.info("[Clusters init] found {} host", this.clusters[i]);
|
|
}
|
|
} else {
|
|
this.logger.warn("[Clusters init] not found clusters");
|
|
}
|
|
}
|
|
|
|
@After("@annotation(app.annotations.interfaces.ClusterMethod)")
|
|
public void after() {
|
|
try {
|
|
this.sendData(httpServletRequest);
|
|
} catch (Exception e) {
|
|
logger.error("Cannot create data to send to other clusters", e);
|
|
}
|
|
}
|
|
|
|
private void sendData(HttpServletRequest request) {
|
|
if (clusters.length > 0) {
|
|
ExecutorService executor = Executors.newFixedThreadPool(clusters.length);
|
|
Set<Callable<Supplier>> callables = new HashSet<>(clusters.length);
|
|
|
|
ClusterRequest clusterRequest = new ClusterRequest(request);
|
|
|
|
for(int i = 0; i < clusters.length; i++) {
|
|
int finalI = i;
|
|
callables.add(() -> {
|
|
if (lastClusterPulse.containsKey(clusters[finalI]) && Instant.now().getEpochSecond() - lastClusterPulse.get(clusters[finalI]) > max_range) {
|
|
this.logger.warn("Cannot redirect packet to cluster {}, he is ded more {} seconds", clusters[finalI], max_range);
|
|
return null;
|
|
}
|
|
|
|
try {
|
|
restTemplate.exchange("http://" + clusters[finalI] + clusterRequest.getUrl(), clusterRequest.getHttpMethod(), clusterRequest.getHttpEntity(clusters[finalI]), byte[].class);
|
|
} catch (Exception e) {
|
|
logger.error("Cannot send info to cluster {}", clusters[finalI], e);
|
|
}
|
|
|
|
return null;
|
|
});
|
|
}
|
|
|
|
try {
|
|
executor.invokeAll(callables);
|
|
} catch (InterruptedException ie) {}
|
|
finally {
|
|
executor.shutdown();
|
|
}
|
|
}
|
|
}
|
|
|
|
@EventListener(ApplicationReadyEvent.class)
|
|
private void createClustersChecker() {
|
|
for(int i = 0; i < clusters.length; i++) {
|
|
int finalI = i;
|
|
this.logger.info("Create cluster pulse check on {}", clusters[finalI]);
|
|
CreateTaskUpdater(() -> checkCluster(clusters[finalI]), 1000, getClass().getName());
|
|
}
|
|
}
|
|
|
|
private boolean checkCluster(String address) {
|
|
try {
|
|
restTemplate.getForEntity("http://" + address + "/api/pulse/db", Long.class).getBody();
|
|
lastClusterPulse.put(address, Instant.now().getEpochSecond());
|
|
return true;
|
|
} catch (Exception err) {
|
|
this.logger.error("Cannot pulse {}", address);
|
|
return false;
|
|
} finally {
|
|
if (lastClusterPulse.containsKey(address)) {
|
|
long range = Instant.now().getEpochSecond() - lastClusterPulse.get(address);
|
|
if (range > max_range) {
|
|
this.logger.warn("Cannot pulse {} more {} seconds", address, range);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
class ClusterRequest {
|
|
private final String url;
|
|
private final HttpHeaders httpHeaders;
|
|
private Object body;
|
|
private final HttpMethod httpMethod;
|
|
|
|
public ClusterRequest(HttpServletRequest httpServletRequest) {
|
|
StringBuilder url = new StringBuilder();
|
|
url.append(httpServletRequest.getRequestURI());
|
|
if (httpServletRequest.getQueryString() != null) {
|
|
url.append("?");
|
|
url.append(httpServletRequest.getQueryString());
|
|
}
|
|
this.url = url.toString();
|
|
|
|
httpHeaders = new HttpHeaders();
|
|
httpServletRequest.getHeaderNames().asIterator().forEachRemaining(
|
|
(headerName) -> httpHeaders.set(headerName, httpServletRequest.getHeader(headerName))
|
|
);
|
|
|
|
try (ObjectInputStream is = new ObjectInputStream(httpServletRequest.getInputStream())) {
|
|
body = is.readObject();
|
|
} catch (Exception e) {}
|
|
|
|
httpMethod = HttpMethod.valueOf(httpServletRequest.getMethod());
|
|
}
|
|
|
|
public String getUrl() {
|
|
return url;
|
|
}
|
|
|
|
public Object getBody() {
|
|
return body;
|
|
}
|
|
|
|
public HttpMethod getHttpMethod() {
|
|
return httpMethod;
|
|
}
|
|
|
|
public HttpEntity getHttpEntity(String cluster) {
|
|
httpHeaders.set("F13-TO-CLUSTER", cluster);
|
|
return new HttpEntity(body, httpHeaders);
|
|
}
|
|
}
|
|
}
|
|
|