Browse Source

remove jobrunr and fix thread leaks

master
gsd 2 years ago
parent
commit
d33f211b8b
  1. 10
      pom.xml
  2. 17
      src/main/java/app/configs/JobRunrConfig.java
  3. 42
      src/main/java/app/configs/ProtocolA2S.java
  4. 6
      src/main/java/app/entities/Stats.java
  5. 16
      src/main/java/app/updates/BanCountUpdater.java
  6. 27
      src/main/java/app/updates/BaseUpdater.java
  7. 16
      src/main/java/app/updates/CountriesUpdater.java
  8. 27
      src/main/java/app/updates/PlayersUpdater.java
  9. 27
      src/main/java/app/updates/SocialUpdater.java
  10. 45
      src/main/java/app/updates/UniqueUpdater.java
  11. 29
      src/main/java/app/updates/VipCountUpdater.java

10
pom.xml

@ -52,16 +52,6 @@
<artifactId>mysql-connector-j</artifactId>
<version>8.0.32</version>
</dependency>
<dependency>
<groupId>org.jobrunr</groupId>
<artifactId>jobrunr</artifactId>
<version>6.0.0</version>
</dependency>
<dependency>
<groupId>org.jobrunr</groupId>
<artifactId>jobrunr-spring-boot-starter</artifactId>
<version>5.3.3</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>

17
src/main/java/app/configs/JobRunrConfig.java

@ -1,17 +0,0 @@
package app.configs;
import org.jobrunr.jobs.mappers.JobMapper;
import org.jobrunr.storage.InMemoryStorageProvider;
import org.jobrunr.storage.StorageProvider;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class JobRunrConfig {
@Bean
public StorageProvider storageProvider(JobMapper jobMapper) {
InMemoryStorageProvider storageProvider = new InMemoryStorageProvider();
storageProvider.setJobMapper(jobMapper);
return storageProvider;
}
}

42
src/main/java/app/configs/ProtocolA2S.java

@ -1,42 +0,0 @@
package app.configs;
import com.ibasco.agql.core.enums.RateLimitType;
import com.ibasco.agql.core.util.FailsafeOptions;
import com.ibasco.agql.core.util.GeneralOptions;
import com.ibasco.agql.protocols.valve.source.query.SourceQueryClient;
import com.ibasco.agql.protocols.valve.source.query.SourceQueryOptions;
import com.ibasco.agql.protocols.valve.source.query.rcon.SourceRconClient;
import com.ibasco.agql.protocols.valve.source.query.rcon.SourceRconOptions;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
//@Configuration
public class ProtocolA2S {
//https://ribasco.github.io/async-gamequery-lib/examples/source_query_example.html#blocking-example
//@Scope("prototype")
/*@Bean
public SourceQueryClient GetSourceQueryClient() {
ExecutorService customExecutor = Executors.newCachedThreadPool();
SourceQueryOptions options = SourceQueryOptions.builder()
.option(FailsafeOptions.FAILSAFE_RATELIMIT_TYPE, RateLimitType.BURST)
.option(GeneralOptions.THREAD_EXECUTOR_SERVICE, customExecutor)
.build();
return new SourceQueryClient(options);
}
*/
//@Scope("prototype")
/*
@Bean
public SourceRconClient GetSourceRconClient() {
ExecutorService customExecutor = Executors.newCachedThreadPool();
SourceRconOptions options = SourceRconOptions.builder()
//.option(FailsafeOptions.FAILSAFE_RATELIMIT_TYPE, RateLimitType.BURST)
.option(GeneralOptions.THREAD_EXECUTOR_SERVICE, customExecutor)
.build();
return new SourceRconClient(options);
}*/
}

6
src/main/java/app/entities/Stats.java

@ -77,7 +77,7 @@ public class Stats {
uniq.merge(key, value, (x, y) -> y);
}
public void RefreshServerA2SData(String server_name) throws IOException {
public void RefreshServerA2SData(String server_name) {
//try (SourceQueryClient sourceQueryClient = context.getBean(SourceQueryClient.class)) {
try (SourceQueryClient sourceQueryClient = GetSourceQueryClient()) {
sourceQueryClient.getInfo(getServers().get(server_name).getInetAddress()).whenComplete((info, error) -> {
@ -88,7 +88,7 @@ public class Stats {
}
getServers().get(server_name).UpdateStatusFromA2S(info);
}).join();
} catch (CompletionException err) {
} catch (CompletionException | IOException err) {
}
if (!getServers().get(server_name).isStatus() || getServers().get(server_name).getPlayer_count() < 1) {
@ -104,7 +104,7 @@ public class Stats {
if (error != null) return;
getServers().get(server_name).UpdatePlayersFromA2S(players);
}).join();
}
} catch (CompletionException | IOException err) {}
///////////////////////////////////////////////////////////////////////
//Extend current players of rcon result
//////////////////////////////////////////////////////////////////////

16
src/main/java/app/updates/BanCountUpdater.java

@ -4,8 +4,6 @@ import app.entities.Stats;
import jakarta.annotation.PostConstruct;
import jakarta.persistence.EntityManager;
import jakarta.persistence.PersistenceContext;
import org.jobrunr.jobs.annotations.Job;
import org.jobrunr.scheduling.JobScheduler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@ -13,32 +11,28 @@ import org.springframework.stereotype.Component;
import java.time.Instant;
@Component
public class BanCountUpdater {
public class BanCountUpdater extends BaseUpdater{
private Stats stats;
private JobScheduler jobScheduler;
@Value("${backend.updates.ban_count}")
boolean update = false;
@PersistenceContext
EntityManager entityManager;
@Autowired
public BanCountUpdater(Stats stats,
JobScheduler jobScheduler) {
public BanCountUpdater(Stats stats) {
this.stats = stats;
this.jobScheduler = jobScheduler;
}
@PostConstruct
public void SetUpdater(){
if(update) {
jobScheduler.enqueue(() -> UpdateBanCount());
jobScheduler.scheduleRecurrently("*/5 * * * *", () -> UpdateBanCount());
CreateTaskUpdater(this::UpdateBanCount, 5 * 60 * 1000);
}
}
@Job(name = "Get ban count", retries = 0)
public void UpdateBanCount(){
public boolean UpdateBanCount(){
stats.setBan_count((Long) entityManager.createNativeQuery("SELECT COUNT(*) as count FROM `light_bans` WHERE active = 1").getSingleResult());
stats.getUpdates().merge("ban_count", Instant.now().getEpochSecond(), (x, y) -> y);
return true;
}
}

27
src/main/java/app/updates/BaseUpdater.java

@ -0,0 +1,27 @@
package app.updates;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
public abstract class BaseUpdater {
public void CreateTaskUpdater(Supplier function, int timeout) {
System.out.printf("Create task: %s, update after %d sec\n", function.toString(), timeout / 1000);
CompletableFuture.supplyAsync(() -> {
while (true) {
try {
System.out.printf("Call: %s\n", function.toString());
function.get();
} catch (Exception err) {
err.printStackTrace();
} finally {
try {
Thread.sleep(timeout);
} catch (InterruptedException e) {}
}
}
});
}
}

16
src/main/java/app/updates/CountriesUpdater.java

@ -7,8 +7,6 @@ import com.maxmind.geoip2.exception.GeoIp2Exception;
import jakarta.annotation.PostConstruct;
import jakarta.persistence.EntityManager;
import jakarta.persistence.PersistenceContext;
import org.jobrunr.jobs.annotations.Job;
import org.jobrunr.scheduling.JobScheduler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@ -18,10 +16,9 @@ import java.net.UnknownHostException;
import java.util.Map;
@Component
public class CountriesUpdater {
public class CountriesUpdater extends BaseUpdater{
Stats stats;
GeoIP geoIP;
JobScheduler jobScheduler;
@PersistenceContext
EntityManager entityManager;
@ -32,23 +29,19 @@ public class CountriesUpdater {
@Autowired
public CountriesUpdater(Stats stats,
GeoIP geoIP,
JobScheduler jobScheduler) {
GeoIP geoIP) {
this.stats = stats;
this.geoIP = geoIP;
this.jobScheduler = jobScheduler;
}
@PostConstruct
public void UpdateCountries(){
if (update) {
jobScheduler.enqueue(() -> UpdateCountriesStatistic());
jobScheduler.scheduleRecurrently("backend.stats.countries.update", "*/15 * * * *", () -> UpdateCountriesStatistic());
CreateTaskUpdater(this::UpdateCountriesStatistic, 30 * 60 * 1000);
}
}
@Job(name = "Update countries statistic", retries = 0)
public void UpdateCountriesStatistic() {
public boolean UpdateCountriesStatistic() {
stats.getCountries().clear();
String query = "";
@ -64,5 +57,6 @@ public class CountriesUpdater {
} catch (GeoIp2Exception e) {
}
}
return true;
}
}

27
src/main/java/app/updates/PlayersUpdater.java

@ -2,8 +2,6 @@ package app.updates;
import app.entities.Stats;
import jakarta.annotation.PostConstruct;
import org.jobrunr.jobs.annotations.Job;
import org.jobrunr.scheduling.JobScheduler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
@ -11,38 +9,31 @@ import org.springframework.stereotype.Component;
import java.io.IOException;
import java.time.Instant;
import java.util.concurrent.CompletableFuture;
@Component
public class PlayersUpdater {
public class PlayersUpdater extends BaseUpdater{
Stats stats;
ApplicationContext context;
JobScheduler jobScheduler;
@Value("${backend.updates.a2s}")
private boolean update = false;
private int timeout = 60 * 1000;
@Autowired
public PlayersUpdater(Stats stats,
ApplicationContext context,
JobScheduler jobScheduler) {
public PlayersUpdater(Stats stats) {
this.stats = stats;
this.context = context;
this.jobScheduler = jobScheduler;
}
@PostConstruct
public void updateValues() {
if (update) {
stats.getServers().forEach((server_name, server) -> {
jobScheduler.enqueue(() -> UpdatePlayersOnServer(server_name));
jobScheduler.scheduleRecurrently("backend.stats.info.update." + server_name, "* * * * *", () -> UpdatePlayersOnServer(server_name));
CreateTaskUpdater(() -> {
stats.RefreshServerA2SData(server_name);
stats.getUpdates().merge(server_name, Instant.now().getEpochSecond(), (x, y) -> y);
return null;
}, timeout);
});
}
}
@Job(name = "Update A2S data on: %0", retries = 0)
public void UpdatePlayersOnServer(String server_name) throws IOException {
stats.RefreshServerA2SData(server_name);
stats.getUpdates().merge("servers", Instant.now().getEpochSecond(), (x, y) -> y);
}
}

27
src/main/java/app/updates/SocialUpdater.java

@ -4,8 +4,6 @@ import app.entities.Stats;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.annotation.PostConstruct;
import org.jobrunr.jobs.annotations.Job;
import org.jobrunr.scheduling.JobScheduler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@ -17,11 +15,10 @@ import java.net.URL;
import java.time.Instant;
@Component
public class SocialUpdater {
public class SocialUpdater extends BaseUpdater{
RestTemplate restTemplate;
ObjectMapper objectMapper;
Stats stats;
JobScheduler jobScheduler;
@Value("${backend.social.discord}")
private String discord_url = "";
@ -30,32 +27,31 @@ public class SocialUpdater {
private String vk_url;
@Autowired
public SocialUpdater(Stats stats,
JobScheduler jobScheduler) {
public SocialUpdater(Stats stats) {
restTemplate = new RestTemplate();
objectMapper = new ObjectMapper();
this.stats = stats;
this.jobScheduler = jobScheduler;
}
@PostConstruct
public void SetUpdater(){
if(!discord_url.isEmpty()) {
jobScheduler.enqueue(() -> UpdateDiscordCount());
jobScheduler.scheduleRecurrently("*/5 * * * *", () -> UpdateDiscordCount());
jobScheduler.enqueue(() -> UpdateVKCount());
jobScheduler.scheduleRecurrently("*/5 * * * *", () -> UpdateVKCount());
CreateTaskUpdater(this::UpdateDiscordCount, 5 * 60 * 1000);
}
if(!vk_url.isEmpty()){
CreateTaskUpdater(this::UpdateVKCount, 5 * 60 * 1000);
}
}
@Job(name = "Get Discord users count", retries = 0)
public void UpdateDiscordCount() throws IOException {
public boolean UpdateDiscordCount(){
try {
stats.setDiscord_users(objectMapper.readTree(new URL(discord_url)).get("approximate_member_count").asInt());
stats.getUpdates().merge("discord_count", Instant.now().getEpochSecond(), (x, y) -> y);
} catch (IOException err) {}
return true;
}
@Job(name = "Get VK users count", retries = 0)
public void UpdateVKCount() {
public boolean UpdateVKCount() {
int count = 0;
String response = restTemplate.getForEntity(vk_url, String.class).getBody();
int k_start = response.indexOf("<h3 class=\"slim_header\">") + "<h3 class=\"slim_header\">".length();
@ -66,5 +62,6 @@ public class SocialUpdater {
count += Integer.valueOf(response.substring(s_start, s_end));
stats.setVk_users(count);
stats.getUpdates().merge("vk_count", Instant.now().getEpochSecond(), (x, y) -> y);
return true;
}
}

45
src/main/java/app/updates/UniqueUpdater.java

@ -5,8 +5,6 @@ import app.entities.Stats;
import jakarta.annotation.PostConstruct;
import jakarta.persistence.EntityManager;
import jakarta.persistence.PersistenceContext;
import org.jobrunr.jobs.annotations.Job;
import org.jobrunr.scheduling.JobScheduler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@ -14,9 +12,8 @@ import org.springframework.stereotype.Component;
import static java.time.Instant.now;
@Component
public class UniqueUpdater {
public class UniqueUpdater extends BaseUpdater{
Stats stats;
JobScheduler jobScheduler;
@PersistenceContext
EntityManager entityManager;
@ -32,31 +29,23 @@ public class UniqueUpdater {
boolean server_update = false;
@Autowired
public UniqueUpdater(Stats stats,
JobScheduler jobScheduler) {
public UniqueUpdater(Stats stats) {
this.stats = stats;
this.jobScheduler = jobScheduler;
}
@PostConstruct
public void updateValues() throws InterruptedException {
if(global_update) {
Thread.sleep(2000);
jobScheduler.enqueue(() -> UpdateServerUniqueTotal());
jobScheduler.enqueue(() -> UpdateServerUniqueYear());
jobScheduler.enqueue(() -> UpdateServerUniqueMonth());
jobScheduler.enqueue(() -> UpdateServerUniqueDay());
///////////////////////////////////////////////////////////////////////////////////////
jobScheduler.scheduleRecurrently("0 0 */1 * *", () -> UpdateServerUniqueTotal());
jobScheduler.scheduleRecurrently("0 0 */1 * *", () -> UpdateServerUniqueYear());
jobScheduler.scheduleRecurrently("0 0 */1 * *", () -> UpdateServerUniqueMonth());
jobScheduler.scheduleRecurrently("0 */1 * * *", () -> UpdateServerUniqueDay());
CreateTaskUpdater(this::UpdateServerUniqueTotal, 24 * 60 * 60 * 1000);
CreateTaskUpdater(this::UpdateServerUniqueYear, 24 * 60 * 60 * 1000);
CreateTaskUpdater(this::UpdateServerUniqueMonth, 24 * 60 * 60 * 1000);
CreateTaskUpdater(this::UpdateServerUniqueDay, 60 * 60 * 1000);
}
///////////////////////////////////////////////////////////////////////////////////////
if(server_update) {
stats.getServers().forEach((server_name, server) -> {
jobScheduler.enqueue(() -> getServerUnique(server_name, server.getDb()));
jobScheduler.scheduleRecurrently("backend.stats.unique.update." + server_name, "*/5 * * * *", () -> getServerUnique(server_name, server.getDb()));
CreateTaskUpdater(() -> getServerUnique(server_name, server.getDb()), 5 * 60 * 1000);
});
}
}
@ -77,31 +66,31 @@ public class UniqueUpdater {
}
///////////////////////////////////////////////////////////////////////////////////////////
@Job(name = "Get server unique statistic %0", retries = 0)
public void getServerUnique(String server_name, String db) {
public boolean getServerUnique(String server_name, String db) {
stats.getServers().get(server_name).UpdateUniq("total", getServerUniqueFromQuery(query_total, db));
stats.getServers().get(server_name).UpdateUniq("day", getServerUniqueFromQuery(query_day, db));
stats.getServers().get(server_name).UpdateUniq("month", getServerUniqueFromQuery(query_month, db));
stats.getServers().get(server_name).UpdateUniq("year", getServerUniqueFromQuery(query_year, db));
return true;
}
@Job(name = "Get total count unique players on all server", retries = 0)
public void UpdateServerUniqueTotal() {
public boolean UpdateServerUniqueTotal() {
stats.UpdateUniq("total", getServerUniqueFromQuery(query_total));
return true;
}
@Job(name = "Get year count unique players on all server", retries = 0)
public void UpdateServerUniqueYear() {
public boolean UpdateServerUniqueYear() {
stats.UpdateUniq("year", getServerUniqueFromQuery(query_year));
return true;
}
@Job(name = "Get month count unique players on all server", retries = 0)
public void UpdateServerUniqueMonth() {
public boolean UpdateServerUniqueMonth() {
stats.UpdateUniq("month", getServerUniqueFromQuery(query_month));
return true;
}
@Job(name = "Get day count unique players on all server", retries = 0)
public void UpdateServerUniqueDay() {
public boolean UpdateServerUniqueDay() {
stats.UpdateUniq("day", getServerUniqueFromQuery(query_day));
return true;
}
}

29
src/main/java/app/updates/VipCountUpdater.java

@ -4,54 +4,49 @@ import app.entities.Stats;
import jakarta.annotation.PostConstruct;
import jakarta.persistence.EntityManager;
import jakarta.persistence.PersistenceContext;
import org.jobrunr.jobs.annotations.Job;
import org.jobrunr.scheduling.JobScheduler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.time.Instant;
import java.util.concurrent.CompletableFuture;
@Component
public class VipCountUpdater {
public class VipCountUpdater extends BaseUpdater{
private Stats stats;
private JobScheduler jobScheduler;
@PersistenceContext
EntityManager entityManager;
@Value("${backend.updates.vip_count}")
boolean update = false;
private boolean update = false;
private int timeout = 5 * 60 * 1000;
@Autowired
public VipCountUpdater(Stats stats,
JobScheduler jobScheduler) {
public VipCountUpdater(Stats stats) {
this.stats = stats;
this.jobScheduler = jobScheduler;
}
@PostConstruct
public void SetUpdater(){
if(update) {
jobScheduler.enqueue(() -> UpdateFreeVIPCount());
jobScheduler.enqueue(() -> UpdateVIPCount());
jobScheduler.scheduleRecurrently("*/5 * * * *", () -> UpdateFreeVIPCount());
jobScheduler.scheduleRecurrently("*/5 * * * *", () -> UpdateVIPCount());
if (update) {
CreateTaskUpdater(this::UpdateVIPCount, timeout);
CreateTaskUpdater(this::UpdateFreeVIPCount, timeout);
}
}
@Job(name = "Get VIP count", retries = 0)
public void UpdateVIPCount() {
public boolean UpdateVIPCount() {
stats.setVip_players(
(Long) entityManager.createNativeQuery("SELECT COUNT(*) as count FROM `sm_admins` WHERE `status` LIKE 'VIP' AND (`comment` LIKE 'Donate.User' OR `comment` LIKE 'f13bot.User')")
.getSingleResult());
stats.getUpdates().merge("vip_count", Instant.now().getEpochSecond(), (x, y) -> y);
return true;
}
@Job(name = "Get FreeVIP count", retries = 0)
public void UpdateFreeVIPCount() {
public boolean UpdateFreeVIPCount() {
stats.setFreevip_players(
(Long) entityManager.createNativeQuery("SELECT COUNT(*) as count FROM `sm_admins` WHERE `status` LIKE 'VIP' AND `comment` LIKE 'f13bot.FreeVIP'")
.getSingleResult());
stats.getUpdates().merge("freevip_count", Instant.now().getEpochSecond(), (x, y) -> y);
return true;
}
}

Loading…
Cancel
Save