From d33f211b8ba9f3737cf1a190d67386da87458231 Mon Sep 17 00:00:00 2001 From: gsd Date: Fri, 24 Feb 2023 21:23:14 +0300 Subject: [PATCH] remove jobrunr and fix thread leaks --- pom.xml | 10 ----- src/main/java/app/configs/JobRunrConfig.java | 17 ------- src/main/java/app/configs/ProtocolA2S.java | 42 ----------------- src/main/java/app/entities/Stats.java | 6 +-- .../java/app/updates/BanCountUpdater.java | 16 +++---- src/main/java/app/updates/BaseUpdater.java | 27 +++++++++++ .../java/app/updates/CountriesUpdater.java | 16 +++---- src/main/java/app/updates/PlayersUpdater.java | 27 ++++------- src/main/java/app/updates/SocialUpdater.java | 31 ++++++------- src/main/java/app/updates/UniqueUpdater.java | 45 +++++++------------ .../java/app/updates/VipCountUpdater.java | 29 +++++------- 11 files changed, 92 insertions(+), 174 deletions(-) delete mode 100644 src/main/java/app/configs/JobRunrConfig.java delete mode 100644 src/main/java/app/configs/ProtocolA2S.java create mode 100644 src/main/java/app/updates/BaseUpdater.java diff --git a/pom.xml b/pom.xml index bf96f0d..d0aef31 100644 --- a/pom.xml +++ b/pom.xml @@ -52,16 +52,6 @@ mysql-connector-j 8.0.32 - - org.jobrunr - jobrunr - 6.0.0 - - - org.jobrunr - jobrunr-spring-boot-starter - 5.3.3 - org.springframework.boot spring-boot-starter-actuator diff --git a/src/main/java/app/configs/JobRunrConfig.java b/src/main/java/app/configs/JobRunrConfig.java deleted file mode 100644 index a9d15b0..0000000 --- a/src/main/java/app/configs/JobRunrConfig.java +++ /dev/null @@ -1,17 +0,0 @@ -package app.configs; - -import org.jobrunr.jobs.mappers.JobMapper; -import org.jobrunr.storage.InMemoryStorageProvider; -import org.jobrunr.storage.StorageProvider; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; - -@Configuration -public class JobRunrConfig { - @Bean - public StorageProvider storageProvider(JobMapper jobMapper) { - InMemoryStorageProvider storageProvider = new InMemoryStorageProvider(); - storageProvider.setJobMapper(jobMapper); - return storageProvider; - } -} diff --git a/src/main/java/app/configs/ProtocolA2S.java b/src/main/java/app/configs/ProtocolA2S.java deleted file mode 100644 index cba48ee..0000000 --- a/src/main/java/app/configs/ProtocolA2S.java +++ /dev/null @@ -1,42 +0,0 @@ -package app.configs; - -import com.ibasco.agql.core.enums.RateLimitType; -import com.ibasco.agql.core.util.FailsafeOptions; -import com.ibasco.agql.core.util.GeneralOptions; -import com.ibasco.agql.protocols.valve.source.query.SourceQueryClient; -import com.ibasco.agql.protocols.valve.source.query.SourceQueryOptions; -import com.ibasco.agql.protocols.valve.source.query.rcon.SourceRconClient; -import com.ibasco.agql.protocols.valve.source.query.rcon.SourceRconOptions; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.context.annotation.Scope; - -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - -//@Configuration -public class ProtocolA2S { - //https://ribasco.github.io/async-gamequery-lib/examples/source_query_example.html#blocking-example - //@Scope("prototype") - /*@Bean - public SourceQueryClient GetSourceQueryClient() { - ExecutorService customExecutor = Executors.newCachedThreadPool(); - SourceQueryOptions options = SourceQueryOptions.builder() - .option(FailsafeOptions.FAILSAFE_RATELIMIT_TYPE, RateLimitType.BURST) - .option(GeneralOptions.THREAD_EXECUTOR_SERVICE, customExecutor) - .build(); - return new SourceQueryClient(options); - } -*/ - //@Scope("prototype") - /* - @Bean - public SourceRconClient GetSourceRconClient() { - ExecutorService customExecutor = Executors.newCachedThreadPool(); - SourceRconOptions options = SourceRconOptions.builder() - //.option(FailsafeOptions.FAILSAFE_RATELIMIT_TYPE, RateLimitType.BURST) - .option(GeneralOptions.THREAD_EXECUTOR_SERVICE, customExecutor) - .build(); - return new SourceRconClient(options); - }*/ -} \ No newline at end of file diff --git a/src/main/java/app/entities/Stats.java b/src/main/java/app/entities/Stats.java index 9ab41fa..ef586f3 100644 --- a/src/main/java/app/entities/Stats.java +++ b/src/main/java/app/entities/Stats.java @@ -77,7 +77,7 @@ public class Stats { uniq.merge(key, value, (x, y) -> y); } - public void RefreshServerA2SData(String server_name) throws IOException { + public void RefreshServerA2SData(String server_name) { //try (SourceQueryClient sourceQueryClient = context.getBean(SourceQueryClient.class)) { try (SourceQueryClient sourceQueryClient = GetSourceQueryClient()) { sourceQueryClient.getInfo(getServers().get(server_name).getInetAddress()).whenComplete((info, error) -> { @@ -88,7 +88,7 @@ public class Stats { } getServers().get(server_name).UpdateStatusFromA2S(info); }).join(); - } catch (CompletionException err) { + } catch (CompletionException | IOException err) { } if (!getServers().get(server_name).isStatus() || getServers().get(server_name).getPlayer_count() < 1) { @@ -104,7 +104,7 @@ public class Stats { if (error != null) return; getServers().get(server_name).UpdatePlayersFromA2S(players); }).join(); - } + } catch (CompletionException | IOException err) {} /////////////////////////////////////////////////////////////////////// //Extend current players of rcon result ////////////////////////////////////////////////////////////////////// diff --git a/src/main/java/app/updates/BanCountUpdater.java b/src/main/java/app/updates/BanCountUpdater.java index e3fc3a6..c7b6563 100644 --- a/src/main/java/app/updates/BanCountUpdater.java +++ b/src/main/java/app/updates/BanCountUpdater.java @@ -4,8 +4,6 @@ import app.entities.Stats; import jakarta.annotation.PostConstruct; import jakarta.persistence.EntityManager; import jakarta.persistence.PersistenceContext; -import org.jobrunr.jobs.annotations.Job; -import org.jobrunr.scheduling.JobScheduler; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; @@ -13,32 +11,28 @@ import org.springframework.stereotype.Component; import java.time.Instant; @Component -public class BanCountUpdater { +public class BanCountUpdater extends BaseUpdater{ private Stats stats; - private JobScheduler jobScheduler; @Value("${backend.updates.ban_count}") boolean update = false; @PersistenceContext EntityManager entityManager; @Autowired - public BanCountUpdater(Stats stats, - JobScheduler jobScheduler) { + public BanCountUpdater(Stats stats) { this.stats = stats; - this.jobScheduler = jobScheduler; } @PostConstruct public void SetUpdater(){ if(update) { - jobScheduler.enqueue(() -> UpdateBanCount()); - jobScheduler.scheduleRecurrently("*/5 * * * *", () -> UpdateBanCount()); + CreateTaskUpdater(this::UpdateBanCount, 5 * 60 * 1000); } } - @Job(name = "Get ban count", retries = 0) - public void UpdateBanCount(){ + public boolean UpdateBanCount(){ stats.setBan_count((Long) entityManager.createNativeQuery("SELECT COUNT(*) as count FROM `light_bans` WHERE active = 1").getSingleResult()); stats.getUpdates().merge("ban_count", Instant.now().getEpochSecond(), (x, y) -> y); + return true; } } diff --git a/src/main/java/app/updates/BaseUpdater.java b/src/main/java/app/updates/BaseUpdater.java new file mode 100644 index 0000000..eeb425a --- /dev/null +++ b/src/main/java/app/updates/BaseUpdater.java @@ -0,0 +1,27 @@ +package app.updates; + +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; + +public abstract class BaseUpdater { + public void CreateTaskUpdater(Supplier function, int timeout) { + System.out.printf("Create task: %s, update after %d sec\n", function.toString(), timeout / 1000); + CompletableFuture.supplyAsync(() -> { + while (true) { + try { + System.out.printf("Call: %s\n", function.toString()); + function.get(); + } catch (Exception err) { + err.printStackTrace(); + } finally { + try { + Thread.sleep(timeout); + } catch (InterruptedException e) {} + } + } + }); + } +} diff --git a/src/main/java/app/updates/CountriesUpdater.java b/src/main/java/app/updates/CountriesUpdater.java index 6181f08..74dbe3d 100644 --- a/src/main/java/app/updates/CountriesUpdater.java +++ b/src/main/java/app/updates/CountriesUpdater.java @@ -7,8 +7,6 @@ import com.maxmind.geoip2.exception.GeoIp2Exception; import jakarta.annotation.PostConstruct; import jakarta.persistence.EntityManager; import jakarta.persistence.PersistenceContext; -import org.jobrunr.jobs.annotations.Job; -import org.jobrunr.scheduling.JobScheduler; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; @@ -18,10 +16,9 @@ import java.net.UnknownHostException; import java.util.Map; @Component -public class CountriesUpdater { +public class CountriesUpdater extends BaseUpdater{ Stats stats; GeoIP geoIP; - JobScheduler jobScheduler; @PersistenceContext EntityManager entityManager; @@ -32,23 +29,19 @@ public class CountriesUpdater { @Autowired public CountriesUpdater(Stats stats, - GeoIP geoIP, - JobScheduler jobScheduler) { + GeoIP geoIP) { this.stats = stats; this.geoIP = geoIP; - this.jobScheduler = jobScheduler; } @PostConstruct public void UpdateCountries(){ if (update) { - jobScheduler.enqueue(() -> UpdateCountriesStatistic()); - jobScheduler.scheduleRecurrently("backend.stats.countries.update", "*/15 * * * *", () -> UpdateCountriesStatistic()); + CreateTaskUpdater(this::UpdateCountriesStatistic, 30 * 60 * 1000); } } - @Job(name = "Update countries statistic", retries = 0) - public void UpdateCountriesStatistic() { + public boolean UpdateCountriesStatistic() { stats.getCountries().clear(); String query = ""; @@ -64,5 +57,6 @@ public class CountriesUpdater { } catch (GeoIp2Exception e) { } } + return true; } } diff --git a/src/main/java/app/updates/PlayersUpdater.java b/src/main/java/app/updates/PlayersUpdater.java index e9308fd..5524843 100644 --- a/src/main/java/app/updates/PlayersUpdater.java +++ b/src/main/java/app/updates/PlayersUpdater.java @@ -2,8 +2,6 @@ package app.updates; import app.entities.Stats; import jakarta.annotation.PostConstruct; -import org.jobrunr.jobs.annotations.Job; -import org.jobrunr.scheduling.JobScheduler; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.ApplicationContext; @@ -11,38 +9,31 @@ import org.springframework.stereotype.Component; import java.io.IOException; import java.time.Instant; +import java.util.concurrent.CompletableFuture; @Component -public class PlayersUpdater { +public class PlayersUpdater extends BaseUpdater{ Stats stats; - ApplicationContext context; - JobScheduler jobScheduler; @Value("${backend.updates.a2s}") private boolean update = false; + private int timeout = 60 * 1000; @Autowired - public PlayersUpdater(Stats stats, - ApplicationContext context, - JobScheduler jobScheduler) { + public PlayersUpdater(Stats stats) { this.stats = stats; - this.context = context; - this.jobScheduler = jobScheduler; } @PostConstruct public void updateValues() { if (update) { stats.getServers().forEach((server_name, server) -> { - jobScheduler.enqueue(() -> UpdatePlayersOnServer(server_name)); - jobScheduler.scheduleRecurrently("backend.stats.info.update." + server_name, "* * * * *", () -> UpdatePlayersOnServer(server_name)); + CreateTaskUpdater(() -> { + stats.RefreshServerA2SData(server_name); + stats.getUpdates().merge(server_name, Instant.now().getEpochSecond(), (x, y) -> y); + return null; + }, timeout); }); } } - - @Job(name = "Update A2S data on: %0", retries = 0) - public void UpdatePlayersOnServer(String server_name) throws IOException { - stats.RefreshServerA2SData(server_name); - stats.getUpdates().merge("servers", Instant.now().getEpochSecond(), (x, y) -> y); - } } diff --git a/src/main/java/app/updates/SocialUpdater.java b/src/main/java/app/updates/SocialUpdater.java index 8ab7f15..f1b8905 100644 --- a/src/main/java/app/updates/SocialUpdater.java +++ b/src/main/java/app/updates/SocialUpdater.java @@ -4,8 +4,6 @@ import app.entities.Stats; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import jakarta.annotation.PostConstruct; -import org.jobrunr.jobs.annotations.Job; -import org.jobrunr.scheduling.JobScheduler; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; @@ -17,11 +15,10 @@ import java.net.URL; import java.time.Instant; @Component -public class SocialUpdater { +public class SocialUpdater extends BaseUpdater{ RestTemplate restTemplate; ObjectMapper objectMapper; Stats stats; - JobScheduler jobScheduler; @Value("${backend.social.discord}") private String discord_url = ""; @@ -30,32 +27,31 @@ public class SocialUpdater { private String vk_url; @Autowired - public SocialUpdater(Stats stats, - JobScheduler jobScheduler) { + public SocialUpdater(Stats stats) { restTemplate = new RestTemplate(); objectMapper = new ObjectMapper(); this.stats = stats; - this.jobScheduler = jobScheduler; } @PostConstruct public void SetUpdater(){ if(!discord_url.isEmpty()) { - jobScheduler.enqueue(() -> UpdateDiscordCount()); - jobScheduler.scheduleRecurrently("*/5 * * * *", () -> UpdateDiscordCount()); - jobScheduler.enqueue(() -> UpdateVKCount()); - jobScheduler.scheduleRecurrently("*/5 * * * *", () -> UpdateVKCount()); + CreateTaskUpdater(this::UpdateDiscordCount, 5 * 60 * 1000); + } + if(!vk_url.isEmpty()){ + CreateTaskUpdater(this::UpdateVKCount, 5 * 60 * 1000); } } - @Job(name = "Get Discord users count", retries = 0) - public void UpdateDiscordCount() throws IOException { - stats.setDiscord_users(objectMapper.readTree(new URL(discord_url)).get("approximate_member_count").asInt()); - stats.getUpdates().merge("discord_count", Instant.now().getEpochSecond(), (x, y) -> y); + public boolean UpdateDiscordCount(){ + try { + stats.setDiscord_users(objectMapper.readTree(new URL(discord_url)).get("approximate_member_count").asInt()); + stats.getUpdates().merge("discord_count", Instant.now().getEpochSecond(), (x, y) -> y); + } catch (IOException err) {} + return true; } - @Job(name = "Get VK users count", retries = 0) - public void UpdateVKCount() { + public boolean UpdateVKCount() { int count = 0; String response = restTemplate.getForEntity(vk_url, String.class).getBody(); int k_start = response.indexOf("

") + "

".length(); @@ -66,5 +62,6 @@ public class SocialUpdater { count += Integer.valueOf(response.substring(s_start, s_end)); stats.setVk_users(count); stats.getUpdates().merge("vk_count", Instant.now().getEpochSecond(), (x, y) -> y); + return true; } } diff --git a/src/main/java/app/updates/UniqueUpdater.java b/src/main/java/app/updates/UniqueUpdater.java index fa62954..0b04117 100644 --- a/src/main/java/app/updates/UniqueUpdater.java +++ b/src/main/java/app/updates/UniqueUpdater.java @@ -5,8 +5,6 @@ import app.entities.Stats; import jakarta.annotation.PostConstruct; import jakarta.persistence.EntityManager; import jakarta.persistence.PersistenceContext; -import org.jobrunr.jobs.annotations.Job; -import org.jobrunr.scheduling.JobScheduler; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; @@ -14,9 +12,8 @@ import org.springframework.stereotype.Component; import static java.time.Instant.now; @Component -public class UniqueUpdater { +public class UniqueUpdater extends BaseUpdater{ Stats stats; - JobScheduler jobScheduler; @PersistenceContext EntityManager entityManager; @@ -32,31 +29,23 @@ public class UniqueUpdater { boolean server_update = false; @Autowired - public UniqueUpdater(Stats stats, - JobScheduler jobScheduler) { + public UniqueUpdater(Stats stats) { this.stats = stats; - this.jobScheduler = jobScheduler; } @PostConstruct public void updateValues() throws InterruptedException { if(global_update) { Thread.sleep(2000); - jobScheduler.enqueue(() -> UpdateServerUniqueTotal()); - jobScheduler.enqueue(() -> UpdateServerUniqueYear()); - jobScheduler.enqueue(() -> UpdateServerUniqueMonth()); - jobScheduler.enqueue(() -> UpdateServerUniqueDay()); - /////////////////////////////////////////////////////////////////////////////////////// - jobScheduler.scheduleRecurrently("0 0 */1 * *", () -> UpdateServerUniqueTotal()); - jobScheduler.scheduleRecurrently("0 0 */1 * *", () -> UpdateServerUniqueYear()); - jobScheduler.scheduleRecurrently("0 0 */1 * *", () -> UpdateServerUniqueMonth()); - jobScheduler.scheduleRecurrently("0 */1 * * *", () -> UpdateServerUniqueDay()); + CreateTaskUpdater(this::UpdateServerUniqueTotal, 24 * 60 * 60 * 1000); + CreateTaskUpdater(this::UpdateServerUniqueYear, 24 * 60 * 60 * 1000); + CreateTaskUpdater(this::UpdateServerUniqueMonth, 24 * 60 * 60 * 1000); + CreateTaskUpdater(this::UpdateServerUniqueDay, 60 * 60 * 1000); } /////////////////////////////////////////////////////////////////////////////////////// if(server_update) { stats.getServers().forEach((server_name, server) -> { - jobScheduler.enqueue(() -> getServerUnique(server_name, server.getDb())); - jobScheduler.scheduleRecurrently("backend.stats.unique.update." + server_name, "*/5 * * * *", () -> getServerUnique(server_name, server.getDb())); + CreateTaskUpdater(() -> getServerUnique(server_name, server.getDb()), 5 * 60 * 1000); }); } } @@ -77,31 +66,31 @@ public class UniqueUpdater { } /////////////////////////////////////////////////////////////////////////////////////////// - @Job(name = "Get server unique statistic %0", retries = 0) - public void getServerUnique(String server_name, String db) { + public boolean getServerUnique(String server_name, String db) { stats.getServers().get(server_name).UpdateUniq("total", getServerUniqueFromQuery(query_total, db)); stats.getServers().get(server_name).UpdateUniq("day", getServerUniqueFromQuery(query_day, db)); stats.getServers().get(server_name).UpdateUniq("month", getServerUniqueFromQuery(query_month, db)); stats.getServers().get(server_name).UpdateUniq("year", getServerUniqueFromQuery(query_year, db)); + return true; } - @Job(name = "Get total count unique players on all server", retries = 0) - public void UpdateServerUniqueTotal() { + public boolean UpdateServerUniqueTotal() { stats.UpdateUniq("total", getServerUniqueFromQuery(query_total)); + return true; } - @Job(name = "Get year count unique players on all server", retries = 0) - public void UpdateServerUniqueYear() { + public boolean UpdateServerUniqueYear() { stats.UpdateUniq("year", getServerUniqueFromQuery(query_year)); + return true; } - @Job(name = "Get month count unique players on all server", retries = 0) - public void UpdateServerUniqueMonth() { + public boolean UpdateServerUniqueMonth() { stats.UpdateUniq("month", getServerUniqueFromQuery(query_month)); + return true; } - @Job(name = "Get day count unique players on all server", retries = 0) - public void UpdateServerUniqueDay() { + public boolean UpdateServerUniqueDay() { stats.UpdateUniq("day", getServerUniqueFromQuery(query_day)); + return true; } } diff --git a/src/main/java/app/updates/VipCountUpdater.java b/src/main/java/app/updates/VipCountUpdater.java index 6539d00..0fdf967 100644 --- a/src/main/java/app/updates/VipCountUpdater.java +++ b/src/main/java/app/updates/VipCountUpdater.java @@ -4,54 +4,49 @@ import app.entities.Stats; import jakarta.annotation.PostConstruct; import jakarta.persistence.EntityManager; import jakarta.persistence.PersistenceContext; -import org.jobrunr.jobs.annotations.Job; -import org.jobrunr.scheduling.JobScheduler; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import java.time.Instant; +import java.util.concurrent.CompletableFuture; @Component -public class VipCountUpdater { +public class VipCountUpdater extends BaseUpdater{ private Stats stats; - private JobScheduler jobScheduler; @PersistenceContext EntityManager entityManager; @Value("${backend.updates.vip_count}") - boolean update = false; + private boolean update = false; + private int timeout = 5 * 60 * 1000; @Autowired - public VipCountUpdater(Stats stats, - JobScheduler jobScheduler) { + public VipCountUpdater(Stats stats) { this.stats = stats; - this.jobScheduler = jobScheduler; } @PostConstruct public void SetUpdater(){ - if(update) { - jobScheduler.enqueue(() -> UpdateFreeVIPCount()); - jobScheduler.enqueue(() -> UpdateVIPCount()); - jobScheduler.scheduleRecurrently("*/5 * * * *", () -> UpdateFreeVIPCount()); - jobScheduler.scheduleRecurrently("*/5 * * * *", () -> UpdateVIPCount()); + if (update) { + CreateTaskUpdater(this::UpdateVIPCount, timeout); + CreateTaskUpdater(this::UpdateFreeVIPCount, timeout); } } - @Job(name = "Get VIP count", retries = 0) - public void UpdateVIPCount() { + public boolean UpdateVIPCount() { stats.setVip_players( (Long) entityManager.createNativeQuery("SELECT COUNT(*) as count FROM `sm_admins` WHERE `status` LIKE 'VIP' AND (`comment` LIKE 'Donate.User' OR `comment` LIKE 'f13bot.User')") .getSingleResult()); stats.getUpdates().merge("vip_count", Instant.now().getEpochSecond(), (x, y) -> y); + return true; } - @Job(name = "Get FreeVIP count", retries = 0) - public void UpdateFreeVIPCount() { + public boolean UpdateFreeVIPCount() { stats.setFreevip_players( (Long) entityManager.createNativeQuery("SELECT COUNT(*) as count FROM `sm_admins` WHERE `status` LIKE 'VIP' AND `comment` LIKE 'f13bot.FreeVIP'") .getSingleResult()); stats.getUpdates().merge("freevip_count", Instant.now().getEpochSecond(), (x, y) -> y); + return true; } }