20 changed files with 257 additions and 28 deletions
@ -0,0 +1,154 @@ |
|||
package app.annotations.impl; |
|||
|
|||
import app.updates.BaseUpdater; |
|||
import jakarta.annotation.PostConstruct; |
|||
import jakarta.servlet.http.HttpServletRequest; |
|||
import org.aspectj.lang.annotation.After; |
|||
import org.aspectj.lang.annotation.Aspect; |
|||
import org.aspectj.lang.annotation.Before; |
|||
import org.slf4j.Logger; |
|||
import org.slf4j.LoggerFactory; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.context.annotation.Configuration; |
|||
import org.springframework.core.env.Environment; |
|||
import org.springframework.http.HttpEntity; |
|||
import org.springframework.http.HttpHeaders; |
|||
import org.springframework.http.HttpMethod; |
|||
import org.springframework.web.client.RestTemplate; |
|||
|
|||
import java.io.ByteArrayInputStream; |
|||
import java.io.ObjectInputStream; |
|||
import java.time.Instant; |
|||
import java.util.HashSet; |
|||
import java.util.Set; |
|||
import java.util.concurrent.Callable; |
|||
import java.util.concurrent.ExecutorService; |
|||
import java.util.concurrent.Executors; |
|||
import java.util.function.Supplier; |
|||
|
|||
@Aspect |
|||
@Configuration |
|||
public class ClusterMethodAspect extends BaseUpdater { |
|||
|
|||
@Autowired |
|||
private HttpServletRequest httpServletRequest; |
|||
|
|||
private final RestTemplate restTemplate; |
|||
private final String[] clusters; |
|||
private final Logger logger = LoggerFactory.getLogger(getClass()); |
|||
|
|||
@Autowired |
|||
public ClusterMethodAspect(Environment environment) { |
|||
this.restTemplate = new RestTemplate(); |
|||
this.clusters = environment.getProperty("CLUSTERS","").isEmpty() ? new String[0] : environment.getProperty("CLUSTERS","").split(","); |
|||
if (this.clusters.length > 0) { |
|||
for (int i = 0; i < this.clusters.length; i++) { |
|||
this.logger.info("[Clusters init] found {} host", this.clusters[i]); |
|||
} |
|||
} else { |
|||
this.logger.warn("[Clusters init] not found clusters"); |
|||
} |
|||
} |
|||
|
|||
@After("@annotation(app.annotations.interfaces.ClusterMethod)") |
|||
public void after() { |
|||
try { |
|||
this.sendData(httpServletRequest); |
|||
} catch (Exception e) { |
|||
logger.error("Cannot create data to send to other clusters", e); |
|||
} |
|||
} |
|||
|
|||
private void sendData(HttpServletRequest request) { |
|||
if (clusters.length > 0) { |
|||
ExecutorService executor = Executors.newFixedThreadPool(clusters.length); |
|||
Set<Callable<Supplier>> callables = new HashSet<>(clusters.length); |
|||
|
|||
ClusterRequest clusterRequest = new ClusterRequest(request); |
|||
|
|||
for(int i = 0; i < clusters.length; i++) { |
|||
int finalI = i; |
|||
callables.add(() -> { |
|||
try { |
|||
restTemplate.exchange("http://" + clusters[finalI] + clusterRequest.getUrl(), clusterRequest.getHttpMethod(), clusterRequest.getHttpEntity(clusters[finalI]), byte[].class); |
|||
} catch (Exception e) { |
|||
logger.error("Cannot send info to cluster {}", clusters[finalI], e); |
|||
} |
|||
|
|||
return null; |
|||
}); |
|||
} |
|||
|
|||
try { |
|||
executor.invokeAll(callables); |
|||
} catch (InterruptedException ie) {} |
|||
finally { |
|||
executor.shutdown(); |
|||
} |
|||
} |
|||
} |
|||
|
|||
@PostConstruct |
|||
private void createClustersChecker() { |
|||
for(int i = 0; i < clusters.length; i++) { |
|||
int finalI = i; |
|||
this.logger.info("Create cluster pulse check on {}", clusters[finalI]); |
|||
CreateTaskUpdater(() -> checkCluster(clusters[finalI]), 60 * 1000, getClass().getName()); |
|||
} |
|||
} |
|||
|
|||
private boolean checkCluster(String address) { |
|||
try { |
|||
restTemplate.getForEntity("http://" + address + "/api/pulse/db", Long.class).getBody(); |
|||
return true; |
|||
} catch (Exception err) { |
|||
this.logger.error("Cannot pulse {}", address); |
|||
return false; |
|||
} |
|||
} |
|||
|
|||
class ClusterRequest { |
|||
private final String url; |
|||
private final HttpHeaders httpHeaders; |
|||
private Object body; |
|||
private final HttpMethod httpMethod; |
|||
|
|||
public ClusterRequest(HttpServletRequest httpServletRequest) { |
|||
StringBuilder url = new StringBuilder(); |
|||
url.append(httpServletRequest.getRequestURI()); |
|||
if (httpServletRequest.getQueryString() != null) { |
|||
url.append("?"); |
|||
url.append(httpServletRequest.getQueryString()); |
|||
} |
|||
this.url = url.toString(); |
|||
|
|||
httpHeaders = new HttpHeaders(); |
|||
httpServletRequest.getHeaderNames().asIterator().forEachRemaining( |
|||
(headerName) -> httpHeaders.set(headerName, httpServletRequest.getHeader(headerName)) |
|||
); |
|||
|
|||
try (ObjectInputStream is = new ObjectInputStream(httpServletRequest.getInputStream())) { |
|||
body = is.readObject(); |
|||
} catch (Exception e) {} |
|||
|
|||
httpMethod = HttpMethod.valueOf(httpServletRequest.getMethod()); |
|||
} |
|||
|
|||
public String getUrl() { |
|||
return url; |
|||
} |
|||
|
|||
public Object getBody() { |
|||
return body; |
|||
} |
|||
|
|||
public HttpMethod getHttpMethod() { |
|||
return httpMethod; |
|||
} |
|||
|
|||
public HttpEntity getHttpEntity(String cluster) { |
|||
httpHeaders.set("F13-TO-CLUSTER", cluster); |
|||
return new HttpEntity(body, httpHeaders); |
|||
} |
|||
} |
|||
} |
@ -0,0 +1,15 @@ |
|||
package app.annotations.interfaces; |
|||
|
|||
import java.lang.annotation.ElementType; |
|||
import java.lang.annotation.Retention; |
|||
import java.lang.annotation.RetentionPolicy; |
|||
import java.lang.annotation.Target; |
|||
|
|||
|
|||
/** |
|||
* штука для дублирования данных на другой аналогичный сервис |
|||
*/ |
|||
@Retention(RetentionPolicy.RUNTIME) |
|||
@Target(ElementType.METHOD) |
|||
public @interface ClusterMethod { |
|||
} |
Loading…
Reference in new issue