#!/usr/bin/python3.6 VERSION = 1.0 #SYS IMPORTS from genericpath import exists, isfile from logging import root import sys import os import types import hashlib import shutil from os.path import isfile as check_file from os.path import isdir as check_dir from os import remove as remove_file from glob import glob from json import dumps import argparse from subprocess import call as SCall from time import time FNULL = open(os.devnull,'w') #VOLVO import a2s from valve.rcon import RCON from valve.rcon import log as RCON_LOGGING RCON_LOGGING.setLevel(40) import traceback DEBUG = 0 ALL_YES = 0 class colors: HEADER = '\033[95m' OKBLUE = '\033[94m' OKGREEN = '\033[92m' WARNING = '\033[93m' FAIL = '\033[91m' ENDC = '\033[0m' BOLD = '\033[1m' UNDERLINE = '\033[4m' class new_o: def __init__(self, server_name): self.prefix = "[{:^25}]".format(server_name) def info(self, text, end = "\n"): print(self.prefix + "[ {}INFO{}]{}".format(colors.OKBLUE, colors.ENDC, text), end = end) return True def error(self, text): print(self.prefix + "[{}ERROR{}]{}".format(colors.FAIL, colors.ENDC, text)) return True def warning(self, text): print(self.prefix + "[ {}WARN{}]{}".format(colors.WARNING, colors.ENDC, text)) return True def debug(self, text): if DEBUG: print(self.prefix + "[{}DEBUG{}]{}".format(colors.HEADER, colors.ENDC, text)) return True class o: def info(text): print("[ {}INFO{}]{}".format(colors.OKBLUE, colors.ENDC, text)) return True def error(text): print("[{}ERROR{}]{}".format(colors.FAIL, colors.ENDC, text)) return True def warning(text): print("[ {}WARN{}]{}".format(colors.WARNING, colors.ENDC, text)) return True def debug(text): if DEBUG: print("[{}DEBUG{}]{}".format(colors.HEADER, colors.ENDC, text)) return True def hashfile(filepath, blocksize = 65536): file_hash = hashlib.sha256() with open(filepath, "rb") as f: fb = f.read(blocksize) while len(fb) > 0: file_hash.update(fb) fb = f.read(blocksize) return file_hash.hexdigest() def create_symbolic_file(source_file, symbolic_file): cmd = f"ln -s {source_file} {symbolic_file}" print(f"Execute system cmd: {cmd}") SCall(cmd.split()) def compress_files(files, place, archive_name = None): if archive_name is None: archive_name = f"{time()}.tar.gz" else: archive_name += ".tar.gz" import tarfile with tarfile.open(f"{place}/{archive_name}", "w:gz") as tar: for file in files: tar.add(file) #cmd = f"find app -name {files} | tar cvzf {place}/{archive_name} -T -" #print(f"Execute system cmd: {cmd}") #SCall(cmd.split()) class tf2idb: def __init__(self, TF_FOLDER, ITEMS_GAME, DB_FILE): import vdf import sqlite3 import collections import copy def dict_merge(dct, merge_dct): """ Recursive dict merge. Inspired by :meth:``dict.update()``, instead of updating only top-level keys, dict_merge recurses down into dicts nested to an arbitrary depth, updating keys. The ``merge_dct`` is merged into ``dct``. :param dct: dict onto which the merge is executed :param merge_dct: dct merged into dct :return: None """ for k, v in merge_dct.items(): if (k == 'used_by_classes' or k == 'model_player_per_class'): #handles Demoman vs demoman... Valve pls v = dict((k2.lower(), v2) for k2, v2 in v.items()) if (k in dct and isinstance(dct[k], dict) and isinstance(v, collections.abc.Mapping)): dict_merge(dct[k], v) else: dct[k] = copy.deepcopy(v) def resolve_prefabs(item, prefabs): # generate list of prefabs prefab_list = item.get('prefab', '').split() for prefab in prefab_list: subprefabs = prefabs[prefab].get('prefab', '').split() prefab_list.extend(p for p in subprefabs if p not in prefab_list) # iterate over prefab list and merge, nested prefabs first result = {} for prefab in ( prefabs[p] for p in reversed(prefab_list) ): dict_merge(result, prefab) dict_merge(result, item) return result, prefab_list data = None ITEMS_GAME = TF_FOLDER + "/" + ITEMS_GAME DB_FILE = TF_FOLDER + "/" + DB_FILE db = sqlite3.connect(DB_FILE) dbc = db.cursor() with open(ITEMS_GAME) as f: data = vdf.parse(f) data = data['items_game'] dbc.execute('DROP TABLE IF EXISTS new_tf2idb_class') dbc.execute('DROP TABLE IF EXISTS new_tf2idb_item_attributes') dbc.execute('DROP TABLE IF EXISTS new_tf2idb_item') dbc.execute('DROP TABLE IF EXISTS new_tf2idb_particles') dbc.execute('DROP TABLE IF EXISTS new_tf2idb_equip_conflicts') dbc.execute('DROP TABLE IF EXISTS new_tf2idb_equip_regions') dbc.execute('DROP TABLE IF EXISTS new_tf2idb_capabilities') dbc.execute('DROP TABLE IF EXISTS new_tf2idb_attributes') dbc.execute('DROP TABLE IF EXISTS new_tf2idb_qualities') dbc.execute('CREATE TABLE "new_tf2idb_class" ("id" INTEGER NOT NULL , "class" TEXT NOT NULL , "slot" TEXT , PRIMARY KEY ("id", "class"))') dbc.execute('CREATE TABLE "new_tf2idb_item_attributes" (' '"id" INTEGER NOT NULL,' '"attribute" INTEGER NOT NULL,' '"value" TEXT NOT NULL,' '"static" INTEGER,' 'PRIMARY KEY ("id", "attribute")' ')' ) dbc.execute('CREATE TABLE "new_tf2idb_item" (' '"id" INTEGER PRIMARY KEY NOT NULL,' '"name" TEXT NOT NULL,' '"item_name" TEXT,' '"class" TEXT NOT NULL,' '"slot" TEXT,' '"quality" TEXT NOT NULL,' '"tool_type" TEXT,' '"min_ilevel" INTEGER,' '"max_ilevel" INTEGER,' '"baseitem" INTEGER,' '"holiday_restriction" TEXT,' '"has_string_attribute" INTEGER,' '"propername" INTEGER' ')' ) dbc.execute('CREATE TABLE "new_tf2idb_particles" ("id" INTEGER PRIMARY KEY NOT NULL , "name" TEXT NOT NULL )') dbc.execute('CREATE TABLE "new_tf2idb_equip_conflicts" ("name" TEXT NOT NULL , "region" TEXT NOT NULL , PRIMARY KEY ("name", "region"))') dbc.execute('CREATE TABLE "new_tf2idb_equip_regions" ("id" INTEGER NOT NULL , "region" TEXT NOT NULL , PRIMARY KEY ("id", "region"))') dbc.execute('CREATE TABLE "new_tf2idb_capabilities" ("id" INTEGER NOT NULL , "capability" TEXT NOT NULL )') dbc.execute('CREATE TABLE "new_tf2idb_attributes" (' '"id" INTEGER PRIMARY KEY NOT NULL,' '"name" TEXT NOT NULL,' '"attribute_class" TEXT,' '"attribute_type" TEXT,' '"description_string" TEXT,' '"description_format" TEXT,' '"effect_type" TEXT,' '"hidden" INTEGER,' '"stored_as_integer" INTEGER,' '"armory_desc" TEXT,' '"is_set_bonus" INTEGER,' '"is_user_generated" INTEGER,' '"can_affect_recipe_component_name" INTEGER,' '"apply_tag_to_item_definition" TEXT' ')' ) dbc.execute('CREATE TABLE "new_tf2idb_qualities" ("name" TEXT PRIMARY KEY NOT NULL , "value" INTEGER NOT NULL )') nonce = int(time()) dbc.execute('CREATE INDEX "tf2idb_item_attributes_%i" ON "new_tf2idb_item_attributes" ("attribute" ASC)' % nonce) dbc.execute('CREATE INDEX "tf2idb_class_%i" ON "new_tf2idb_class" ("class" ASC)' % nonce) dbc.execute('CREATE INDEX "tf2idb_item_%i" ON "new_tf2idb_item" ("slot" ASC)' % nonce) # qualities for qname,qdata in data['qualities'].items(): dbc.execute('INSERT INTO new_tf2idb_qualities (name, value) VALUES (?,?)', (qname, qdata['value'])) # particles for particle_type,particle_list in data['attribute_controlled_attached_particles'].items(): for k,v in particle_list.items(): dbc.execute('INSERT INTO new_tf2idb_particles (id,name) VALUES (?,?)', (k, v['system']) ) #TODO add the other fields too # attributes attribute_type = {} for k,v in data['attributes'].items(): at = v.get('attribute_type') if at: atype = at else: if v.get('stored_as_integer'): atype = 'integer' else: atype = 'float' attribute_type[v['name'].lower()] = (k, atype) dbc.execute('INSERT INTO new_tf2idb_attributes ' '(id,name,attribute_class,attribute_type,description_string,description_format,effect_type,hidden,stored_as_integer,armory_desc,is_set_bonus,' 'is_user_generated,can_affect_recipe_component_name,apply_tag_to_item_definition) ' 'VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?)', (k,v.get('name'),v.get('attribute_class'),v.get('attribute_type'),v.get('description_string'),v.get('description_format'), v.get('effect_type'),v.get('hidden'),v.get('stored_as_integer'),v.get('armory_desc'),v.get('is_set_bonus'), v.get('is_user_generated'),v.get('can_affect_recipe_component_name'),v.get('apply_tag_to_item_definition') ) ) # conflicts for k,v in data['equip_conflicts'].items(): for region in v.keys(): dbc.execute('INSERT INTO new_tf2idb_equip_conflicts (name,region) VALUES (?,?)', (k, region)) # items for id,v in data['items'].items(): if id == 'default': continue i, prefabs_used = resolve_prefabs(v, data['prefabs']) baseitem = 'baseitem' in i try: tool = None if 'tool' in i: tool = i['tool'].get('type') has_string_attribute = False if 'static_attrs' in i: for name,value in i['static_attrs'].items(): aid,atype = attribute_type[name.lower()] if atype == 'string': has_string_attribute = True dbc.execute('INSERT INTO new_tf2idb_item_attributes (id,attribute,value,static) VALUES (?,?,?,?)', (id,aid,value,1)) if 'attributes' in i: for name,info in i['attributes'].items(): aid,atype = attribute_type[name.lower()] if atype == 'string': has_string_attribute = True dbc.execute('INSERT INTO new_tf2idb_item_attributes (id,attribute,value,static) VALUES (?,?,?,?)', (id,aid,info['value'],0)) dbc.execute('INSERT INTO new_tf2idb_item ' '(id,name,item_name,class,slot,quality,tool_type,min_ilevel,max_ilevel,baseitem,holiday_restriction,has_string_attribute,propername) ' 'VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)', (id,i['name'],i.get('item_name'),i['item_class'],i.get('item_slot'),i.get('item_quality', ''), tool, i.get('min_ilevel'), i.get('max_ilevel'),baseitem, i.get('holiday_restriction'), has_string_attribute, i.get('propername')) ) if 'used_by_classes' in i: for prof, val in i['used_by_classes'].items(): dbc.execute('INSERT INTO new_tf2idb_class (id,class,slot) VALUES (?,?,?)', (id, prof.lower(), val if val != '1' else None)) region_field = i.get('equip_region') or i.get('equip_regions') if region_field: if type(region_field) is str: region_field = {region_field: 1} for region in region_field.keys(): dbc.execute('INSERT INTO new_tf2idb_equip_regions (id,region) VALUES (?,?)', (id, region)) # capabilties for capability,val in i.get('capabilities', {}).items(): dbc.execute('INSERT INTO new_tf2idb_capabilities (id,capability) VALUES (?,?)', (id, (capability if val != '0' else '!'+capability))) except: traceback.print_exc() print(id) raise def replace_table(name): dbc.execute('DROP TABLE IF EXISTS %s' % name) dbc.execute('ALTER TABLE new_%s RENAME TO %s' % (name,name)) replace_table('tf2idb_class') replace_table('tf2idb_item_attributes') replace_table('tf2idb_item') replace_table('tf2idb_particles') replace_table('tf2idb_equip_conflicts') replace_table('tf2idb_equip_regions') replace_table('tf2idb_capabilities') replace_table('tf2idb_attributes') replace_table('tf2idb_qualities') db.commit() dbc.execute('VACUUM') class Manager: servers = [] max_lines = 4 current_server = None server_choices = [] def __init__(self, servers_file = "./servers.txt"): self.load_servers(servers_file) def select_server(self, choice): for server in self.servers: if server.select(choice): self.current_server = server o.debug("Selected: {}".format(self.current_server)) break if not self.current_server: o.error("Cannot choice server! Choice is invalid!") sys.exit(1) def load_servers(self, server_list_path): o.debug("Load servers file: {}".format(server_list_path)) ################################################################################ if check_file(server_list_path): with open(server_list_path, "r") as f_servers: readed_lines = 0 line = f_servers.readline() while line: if line and not line[0] == "#": if readed_lines == self.max_lines: readed_lines = 0 line = line.split("\n")[0] if not readed_lines: name = line elif readed_lines == 1: ip, port = line.split(":") elif readed_lines == 2: rcon_password = line elif readed_lines == 3: root_directory = line self.servers.append(Server(ip, port, rcon_password, name, root_directory)) ################################################################ readed_lines += 1 line = f_servers.readline() ############################################################################ if not self.servers: o.error("Servers not found! Insert server in {}".format(server_list_path)) sys.exit(2) else: o.error("Cannot find server file. App create him. Insert values!") with open(server_list_path, "w") as f_servers: f_servers.write(""" #////////////////////// #//PLACE HERE #////////////////////// #//ServerName #//IP:PORT #//RCON PASSWORD #//tf root folder #... """) sys.exit(1) ################################################################################ def execute(self, func, *args): """if self.current_server: getattr(self.current_server, func, None)(*args) else: for server in self.servers: for choice in self.server_choices: if server.choice(choice): getattr(server, func, None)(*args) break """ if self.current_server: getattr(self.current_server, func, None)(*args) elif self.server_choices: for server in self.servers: for choice in self.server_choices: if server.select(choice): getattr(server, func, None)(*args) break else: for server in self.servers: getattr(server, func, None)(*args) class Server: obj = None def __init__(self, ip, port, rcon_pass, name, root_directory): self.address = (ip, int(port)) self.password = rcon_pass self.name = name self.o = new_o(name) self.root = root_directory o.debug(self) def __str__(self): return "{:^25} on {}".format(self.name, self.address) def ln_vpk_files(self, vpk_directory): vpk_files = glob(f"{self.root}/*.vpk") vpk_directory_files = glob(f"{vpk_directory}/*.vpk") for server_vpk_file in vpk_files: for other_vpk_file in vpk_directory_files: if server_vpk_file.split("/")[-1] == other_vpk_file.split("/")[-1]: if hashfile(server_vpk_file) == hashfile(other_vpk_file): print(f"Create symbolic link from {other_vpk_file} to {server_vpk_file}") if self.wait_input(): remove_file(server_vpk_file) create_symbolic_file(other_vpk_file, server_vpk_file) return True def add_plugin(self, plugin_path, need_reloads_plugins = True): plugins_list = glob(f"{self.root}/addons/sourcemod/plugins/*.smx") + glob(f"{self.root}/addons/sourcemod/plugins/*/*.smx") plugin_file = plugin_path.split("/")[-1] for plugin in plugins_list: if plugin.split("/")[-1] == plugin_file: self.o.info("Plugin already added!") return True ############################################ new_plugin_path = f"{self.root}/addons/sourcemod/plugins/{plugin_file}" self.o.info(f"Add new plugins ({plugin_path}) to ({new_plugin_path})?") if self.wait_input(): shutil.copyfile(plugin_path, new_plugin_path) if need_reloads_plugins: self.o.info("Send sm plugins refresh command...", end = "\t") self.rcon("sm plugins refresh", hide_server_name=True) return True def remove_plugin(self, plugin_name, need_reloads_plugins = True): plugins_list = glob(f"{self.root}/addons/sourcemod/plugins/*.smx") + glob(f"{self.root}/addons/sourcemod/plugins/*/*.smx") for plugin in plugins_list: if plugin.split("/")[-1] == plugin_name + ".smx": self.o.info(f"Remove plugin: {plugin}?") if self.wait_input(): remove_file(plugin) if need_reloads_plugins: self.o.info("Send sm plugins refresh command...", end = "\t") self.rcon("sm plugins refresh", hide_server_name=True) return True def clear_maps(self, exclude = "itemtest.bsp"): map_list = glob(f"{self.root}/maps/*.bsp") self.o.info(f"Delete {len(map_list) - 1} maps?") if self.wait_input(): for map_path in map_list: if exclude in map_path: continue remove_file(map_path) return True def upgrade_metamod(self, metamod_arch): from tarfile import open from time import time import os backup_directory = os.path.join(self.root, "addons_backup") if not os.path.exists(backup_directory): print("Backup dir not created, create: ", backup_directory) os.mkdir(backup_directory) class File: def __init__(self, root, directory, file, new_root = None) -> None: self.root = root self.directory = directory self.file = file self.new_root = new_root def __str__(self) -> str: return str(self.internalPath) def __repr__(self) -> str: return self.__str__() @property def fullPath(self): #self.directory.replace(self.root, "." if self.new_root == None else self.new_root) return os.path.join(self.directory if self.new_root == None else self.directory.replace(self.root, "." if self.new_root == None else self.new_root), self.file) @property def internalPath(self): return os.path.join(self.directory.replace(self.root, "."), self.file) def copyPath(self, root):########TODO return File(root, self.directory, self.file) class UpgradedFile: config_exts = [".ini", ".cfg"] ignore_files = ["basevotes.smx", "admin-flatfile.smx"] def __init__(self, new, old, equals_path = True) -> None: self.new : File = new self.old : File|None = old self.equals_path = equals_path @property def isUpgrade(self): return self.old != None @property def isConfig(self): return "."+os.path.split(self.new.fullPath)[-1].split(".")[-1] in self.config_exts @property def isIgnore(self): return os.path.split(self.new.fullPath)[-1] in self.ignore_files def __str__(self): return f"[{'~' if self.isUpgrade else '+'}] [{'P' if self.equals_path else 'F'}] {self.new.fullPath} -> {self.old.fullPath if self.isUpgrade else 'new'}" def __repr__(self) -> str: return self.__str__() def Process(self, game_root, check = True): source = self.new.fullPath dest = self.old.fullPath if self.old != None else File(self.new.root, self.new.directory, self.new.file, game_root).fullPath if os.path.exists(dest): print("[-]", dest) if not check: os.remove(dest) print("[+]", source, "->", dest) dir_check = "/".join(os.path.split(dest)[:-1]) if not os.path.exists(dir_check): print("[!] dest directory is not exist, create: ", dir_check) os.makedirs(dir_check, exist_ok = True) if not check: shutil.copy2(source, dest) print() #extract sourcemod/metamod arch_directory = f"/tmp/mm.arch.{time()}" with open(metamod_arch) as arch: os.mkdir(arch_directory) arch.extractall(arch_directory) #new files new_filelist = [] for (dirpath, dirnames, filenames) in os.walk(arch_directory): #print(dirpath, dirnames, filenames) for file in filenames: new_filelist.append(File(arch_directory, dirpath, file)) print(new_filelist[0]) #old files old_filelist = [] for (dirpath, dirnames, filenames) in os.walk(os.path.join(self.root, "addons")): #print(dirpath, dirnames, filenames) for file in filenames: old_filelist.append(File(self.root, dirpath, file)) print(old_filelist[0]) class FilesToUpgrade: def __init__(self): self.files_to_upgrade = [] def append(self, el): self.files_to_upgrade.append(el) def Upgraded(self): return [f for f in self.files_to_upgrade if f.isUpgrade and not f.isConfig and not f.isIgnore] def Appended(self): return [f for f in self.files_to_upgrade if not f.isUpgrade and not f.isConfig and not f.isIgnore] def Configs(self): return [f for f in self.files_to_upgrade if f.isConfig] def PrintListing(self): print("Files to upgrade") for f in self.Upgraded(): print(f) print("Files to append") for f in self.Appended(): print(f) print("Updated config files") for f in self.Configs(): print(f) def BackupUpgradedFiles(self, path = f"/tmp/smmmbackup.{time()}.zip"): print("Backup files to:", path) import zipfile zf = zipfile.ZipFile(path, "w") for f in self.Upgraded(): zf.write(f.old.fullPath) zf.close() def Process(self, gameroot, check = True): print(f"Processing {'check to upgrade' if check else 'upgrade'}...") for f in self.Upgraded(): f.Process(gameroot, check) print(f"Processing {'check to append' if check else 'append'}...") for f in self.Appended(): f.Process(gameroot, check) print("Config files dont touched!") #collect print("") i = 0 files_to_upgrade = FilesToUpgrade() for nf in new_filelist: print(f"[{i}/{len(new_filelist)}] search...", end="\r") founded = False for of in old_filelist: if nf.internalPath == of.internalPath: files_to_upgrade.append(UpgradedFile(nf, of)) founded = True break if not founded: for of in old_filelist: if os.path.split(nf.internalPath)[-1] == os.path.split(of.internalPath)[-1]: if os.path.split(nf.internalPath)[0][-2:] != "64": #print(os.path.split(nf.internalPath), os.path.split(of.internalPath)) files_to_upgrade.append(UpgradedFile(nf, of, False)) founded = True break if not founded: files_to_upgrade.append(UpgradedFile(nf, None)) i+=1 #show file list files_to_upgrade.PrintListing() files_to_upgrade.Process(self.root, check=True) if self.wait_input(): files_to_upgrade.BackupUpgradedFiles(os.path.join(backup_directory, f"{time()}.zip")) files_to_upgrade.Process(self.root, check=False) #remove unzip archive from shutil import rmtree rmtree(arch_directory) def clear_download_cache(self): cache_files = glob(f"{self.root}/download/user_custom/*/*.dat") self.o.info(f"Delete {len(cache_files)} cache files?") if not self.wait_input(): return True count = 0 for cache_file in cache_files: try: remove_file(cache_file) count += 1 except Exception as delete_error: print(cache_file, delete_error) sys.exit(1) self.o.info(f"Deleted {count} cache files") return True def clear_logs(self): touch_if_date_lt = 60 * 60 * 24 default_logs_place = f"{self.root}/logs/*.log" sourcemod_logs_place = f"{self.root}/addons/sourcemod/logs/*.log" lgsm_logs_place = f"{self.root}/../../log/console/*.log" ###search default logs try: default_logs = [f for f in glob(default_logs_place) if time() - os.path.getmtime(f) > touch_if_date_lt] self.o.info(f"Found {len(default_logs)} logs in default directory") except: default_logs = [] self.o.error(f"Cannot find logs in {default_logs_place}") ###search sourcemod logs try: sourcemod_logs = [f for f in glob(sourcemod_logs_place) if time() - os.path.getmtime(f) > touch_if_date_lt] self.o.info(f"Found {len(sourcemod_logs)} logs in sourcemod directory") except: sourcemod_logs = [] self.o.error(f"Cannot find logs in {sourcemod_logs_place}") ###search lgsm logs try: lgsm_logs = [f for f in glob(lgsm_logs_place) if time() - os.path.getmtime(f) > touch_if_date_lt] self.o.info(f"Found {len(lgsm_logs)} logs in lgsm directory") except: lgsm_logs = [] self.o.error(f"Cannot find logs in {lgsm_logs_place}") logs_files = default_logs + sourcemod_logs + lgsm_logs self.o.info(f"Compress {len(logs_files)} logs files and delete?") if not self.wait_input(): return True ###compress stage archive_name = f"dl_{time()}" if default_logs: self.o.info(f"Compress {default_logs_place}") compress_files(default_logs, default_logs_place.replace("/*.log",""), archive_name) if sourcemod_logs: self.o.info(f"Compress {sourcemod_logs_place}") compress_files(sourcemod_logs, sourcemod_logs_place.replace("/*.log",""), archive_name) if lgsm_logs: self.o.info(f"Compress {lgsm_logs_place}") compress_files(lgsm_logs, lgsm_logs_place.replace("/*.log",""), archive_name) ###delete stage count = 0 for log_file in logs_files: try: remove_file(log_file) count += 1 except Exception as delete_error: print(log_file, delete_error) sys.exit(1) self.o.info(f"Deleted {count} logs files") return True def upgrade_plugin(self, fresh_path, need_reloads_plugins = True): new_hash = hashfile(fresh_path) fresh_plugin_name = fresh_path.split("/")[-1:][0] plugins_list = glob(f"{self.root}/addons/sourcemod/plugins/*.smx") + glob(f"{self.root}/addons/sourcemod/plugins/*/*.smx") for plugin_path in plugins_list: if plugin_path.split("/")[-1:][0] == fresh_plugin_name: old_hash = hashfile(plugin_path) if old_hash == new_hash: self.o.info(f"Plugin {fresh_plugin_name} currenty updated!") return False else: self.o.info(f"Upgrade plugin {fresh_plugin_name}...") self.o.info(f"copy {fresh_path} to {plugin_path}") shutil.copyfile(fresh_path, plugin_path) if need_reloads_plugins: self.o.info("Send sm plugins refresh command...", end = "\t") self.rcon("sm plugins refresh", hide_server_name=True) return True self.o.info("Upgraded plugin not found in server...") return False def update_tf2idb(self): from sqlite3 import OperationalError self.o.info("Updating TF2IDB") try: tf2idb(self.root, "scripts/items/items_game.txt", "addons/sourcemod/data/sqlite/tf2idb.sq3") except OperationalError: self.o.error("Cannot find database file") def wait_input(self, yes = "y", no = "n"): if ALL_YES: return True while True: print("Enter \"{}\" to accept or \"{}\" to negative response:".format(yes, no) ,end="\t") response = input() if response.lower() == yes: return True if response.lower() == no: return False def select(self, request): if self.name == request: return True elif self.address == request: return True elif str(self.address[1]) == request: return True elif request in self.root: return True else: return False def status(self): player_count, max_count, ping = self.count_players() if player_count < 0: print("{} | Not responsed(((".format(self)) else: print("{} | {}/{} players | {} ms".format(self, player_count, max_count, ping)) @property def map_config(self): map = self.current_map() if "/" in map: sub_dir, map = map.split("/") configs_directory = f"{self.root}/cfg/{sub_dir}" else: configs_directory = f"{self.root}/cfg" if not check_dir(configs_directory): os.mkdir(configs_directory) return f"{configs_directory}/{map}.cfg" def check2map_config(self, cfg_line): config = self.map_config try: with open(config, "r") as config_file: for line in config_file: line = line.replace("\n", "") if cfg_line == line: return True return False except IOError: return False def remove2map_config(self, line): config = self.map_config if not self.check2map_config(line): print(f"{self} | Line not currently in: {config}") buffer = [] with open(config, "r") as config_file: pass def add2map_config(self, line): config = self.map_config if self.check2map_config(line): print(f"{self} | Line currently in: {config}") return False with open(config, "a") as config_file: config_file.write(f"{line}\n") if self.check2map_config(line): print(f"{self} | Line: {line}\nAppend to: {config}") return True print(f"{self} | Line: {line}\nNot append in: {config}") return False def executemap_config(self): config = self.map() if config: self.rcon(f"sm_execcfg {config}.cfg", hide_server_name=True) def map(self): current_map = self.current_map() if current_map: print("{} | {}".format(self, current_map)) else: print("{} | {}".format(self, "not running")) def current_map(self): try: with RCON(self.address, self.password) as rcon: rcon_response = rcon.execute("status") return rcon_response.body.decode("utf8","ignore").split("\n")[5].split(": ")[1].split(" ")[0] except: traceback.print_exc() return None def count_players(self): try: ping = 0.0 try: start_time = time() server = a2s.info(tuple(self.address)) count, max_count = server.player_count, server.max_players ping = server.ping except NameError: ping = time() - start_time except ValueError: return -1, -1, -1 return int(count), int(max_count), round(ping, 3) * 1000 except: #traceback.print_exc() return -1, -1, -1 def net_status_json(self): rcon_response = "" json_result = {} try: with RCON(self.address, self.password) as rcon: rcon_response = rcon.execute("net_status") response = rcon_response.body.decode("utf8","ignore") response = response.split("\n") ##################################################### gamemode, server_type, connections = response[1].split(": ")[1].split(",") connections = int(connections.split()[0]) ##################################################### client_port, server_port, hltv_port, matchmaking_port, systemlink_port, lan_port = response[2].split(": ")[1].split(",") client_port = int(client_port.split()[1]) server_port = int(server_port.split()[1]) hltv_port = int(hltv_port.split()[1]) ##################################################### try: latency_avg_out, latency_avg_in = response[3].split(": ")[1].split(",") latency_avg_out = float(latency_avg_out.split()[2][:-1]) latency_avg_in = float(latency_avg_in.split()[1][:-1]) except IndexError: print(dumps({ "config":{ "gamemode":gamemode, "server_type":server_type, "connections":connections }, "ports":{ "client":client_port, "server":server_port, "hltv":hltv_port }, "latency":{ "avg_out":0.0, "avg_in":0.0 }, "loss":{ "avg_out":0.0, "avg_in":0.0 }, "packets":{ "total":{ "out":0, "in":0 }, "client":{ "out":0, "in":0 } }, "data":{ "total":{ "out":0.0, "in":0.0 }, "client":{ "out":0.0, "in":0.0 } } })) return ##################################################### loss_avg_out, loss_avg_in = response[4].split(": ")[1].split(",") loss_avg_out = float(loss_avg_out.split()[2]) loss_avg_in = float(loss_avg_in.split()[1]) ##################################################### packets_total_out, packets_total_in = response[5].split(": ")[1].split(",") packets_total_out = float(packets_total_out.split()[3][:-2]) packets_total_in = float(packets_total_in.split()[1][:-2]) ##################################################### packets_per_client_out, packets_per_client_in = response[6].split(",") packets_per_client_out = float(packets_per_client_out.split()[3][:-2]) packets_per_client_in = float(packets_per_client_in.split()[1][:-2]) ##################################################### data_total_out = response[7].split(": ")[1].split(",")[0].split() if len(data_total_out) > 4: if data_total_out[4] == "kB/s": data_total_out = float(data_total_out[3]) * 1024 elif data_total_out[4] == "MB/s": data_total_out = float(data_total_out[3]) * 1024 * 1024 else: data_total_out = float(data_total_out[3]) # data_total_in = response[7].split(": ")[1].split(",")[1].split() if len(data_total_in) > 2: if data_total_in[2] == "kB/s": data_total_in = float(data_total_in[1]) * 1024 elif data_total_in[2] == "MB/s": data_total_in = float(data_total_in[1]) * 1024 * 1024 else: data_total_in = float(data_total_in[1]) ##################################################### data_per_client_out = response[8].split(",")[0].split() if len(data_per_client_out) > 4: if data_per_client_out[4] == "kB/s": data_per_client_out = float(data_per_client_out[3]) * 1024 elif data_per_client_out[4] == "MB/s": data_per_client_out = float(data_per_client_out[3]) * 1024 * 1024 else: data_per_client_out = float(data_per_client_out[3]) # data_per_client_in = response[8].split(",")[1].split() if len(data_per_client_in) > 2: if data_per_client_in[2] == "kB/s": data_per_client_in = float(data_per_client_in[1]) * 1024 elif data_per_client_in[2] == "MB/s": data_per_client_in = float(data_per_client_in[1]) * 1024 * 1024 else: data_per_client_in = float(data_per_client_in[1]) ###################################################### json_result = { "config":{ "gamemode":gamemode, "server_type":server_type, "connections":connections }, "ports":{ "client":client_port, "server":server_port, "hltv":hltv_port }, "latency":{ "avg_out":latency_avg_out, "avg_in":latency_avg_in }, "loss":{ "avg_out":loss_avg_out, "avg_in":loss_avg_in }, "packets":{ "total":{ "out":packets_total_out, "in":packets_total_in }, "client":{ "out":packets_per_client_out, "in":packets_per_client_in } }, "data":{ "total":{ "out":data_total_out, "in":data_total_in }, "client":{ "out":data_per_client_out, "in":data_per_client_in } } } #################################################### print(dumps(json_result)) except Exception as rcon_error: traceback.print_exc() print(rcon_error) def rcon(self, command, result = False, hide_server_name = False): if not hide_server_name: self.o.info("{obj.name:^25}: ".format(obj = self), end="\t") rcon_response = "" try: with RCON(self.address, self.password) as rcon: rcon_response = rcon.execute(command) response = rcon_response.body.decode("utf8","ignore") if not result: print(response if response else "ok") else: return response except Exception as rcon_error: rcon_response = "Rcon execute error: {}".format(rcon_error) print(rcon_response) def show_directory(self, path): full_path = "{}/{}".format(self.root, path) print("Current directory: {}".format(full_path)) if check_dir(full_path): SCall("ls -all {}".format(full_path).split()) else: print("Directory does not exist") def copy_file(self, source_file, path): full_path = "{}/{}".format(self.root, path) print("Copy in > {}".format(full_path)) new_file = source_file.split("/")[-1:][0] if check_dir(full_path): SCall("cp {0} {1}/{2}".format(source_file, full_path, new_file).split()) else: print("Destonation directory doesn't exists!") def symbolic_file(self, source_file, path): full_path = "{}/{}".format(self.root, path) print("Create link in > {}".format(full_path)) new_file = source_file.split("/")[-1:][0] if not "/" in source_file: source_file = os.getcwd() + "/" + source_file destonation_file = "{}/{}".format(full_path, new_file) if check_file(destonation_file): print("Destonation file is current created, override him?") if self.wait_input(): remove_file("{}/{}".format(full_path, new_file)) else: print("Abort operation!") return if check_dir(full_path): cmd = "ln -s {0} {1}/{2}".format(source_file, full_path, new_file) print("Execute: ", cmd) SCall(cmd.split()) else: print("Destonation directory doesn't exists!") if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("--serverslist", help="Path to servers list", default = "./servers.txt", type = str) parser.add_argument("--rcon", "-r", help = "Command to execute", type = str, nargs="+", default = "") parser.add_argument("--choice", "-c", help = "Choice server, aka: part name in directory", type = str, default = ["global.choice"], nargs = "+") parser.add_argument("--status", "-s", help = "Show current number players on server", default = False, action = "store_true") parser.add_argument("--yes", "-y", help = "Say YES to all response", default = False, action = "store_true") parser.add_argument("--netstatus","-ns", help = "Show json net_status", default = False, action = "store_true") parser.add_argument("--map","-m", help = "Show map on server", default = False, action = "store_true") parser.add_argument("--add_map_config", "-amc", help = "Add line to config map", default = "", type = str, nargs="+") parser.add_argument("--execute_map_config", default=False, action="store_true") ################################################################################################################################################ parser.add_argument("--CopyFile", "-cp", help = "Path of file to copy in root directory\nNeed second argument: --DestDir", type = str, default = "") parser.add_argument("--SymLinkFile", "-ln", help = "Path of file to create symbolic link in root directory\nNeed second argument: --DestDir", type = str, default = "") parser.add_argument("--DestDir", "-dd", help = "Destonation directory, aka: addons/sourcemod/plugins", type = str, default = "/") parser.add_argument("--ShowDir", "-ls", help = "Show ls command on dest directory", default = False, action = "store_true") ################################################################################################################################################ parser.add_argument("--AddPlugin", "-ap", help = "Path to new plugin.", type = str, default = "") parser.add_argument("--RemovePlugin", "-rp", help = "Name plugin to remove. Names must match.", type = str, default = "") parser.add_argument("--UpgradePlugin", "-upg", help = "Path of file to uprade. Names must match.", type = str, default = "") parser.add_argument("--NoReloadPlugins", "-nrp", help = "Upgrade plugins without send sm plugins refresh command", default = False, action = "store_true") ################################################################################################################################################ parser.add_argument("--DeleteDownloadCache", "-ddc", help = "Clear download cache", default = False, action = "store_true") parser.add_argument("--DeleteUnusedMaps", "-dum", help = "Delete maps from maps folder", default = False, action = "store_true") parser.add_argument("--DeleteLogsFiles", "--dlf", help = "Delete logs file if older 1 day", default = False, action = "store_true") ################################################################################################################################################ parser.add_argument("--UpdateTF2IDB", help = "Update tf2idb database", default = False, action = "store_true") ################################################################################################################################################ parser.add_argument("--UpgradeMetaMod", help = "Upgrade current version of metamod", type = str, default="") ################################################################################################################################################ args = parser.parse_args() ALL_YES = 1 if args.yes else 0 ################################## manager = Manager(args.serverslist) #if len(args.choice) > 0 and args.choice[0] == "global.choice": if not args.choice == ["global.choice"]: if len(args.choice) == 1: manager.select_server(args.choice[0]) else: manager.server_choices = args.choice ################################## if args.rcon: command = "" for word in args.rcon: command += word + " " command = command[:-1] manager.execute("rcon", command) sys.exit(0) ################################## if args.add_map_config: append_line = "" for word in args.add_map_config: append_line += word + " " append_line = append_line[:-1] manager.execute("add2map_config", append_line) sys.exit(0) if args.execute_map_config: manager.execute("executemap_config") sys.exit(0) ################################## if args.netstatus: manager.execute("net_status_json") sys.exit(0) ################################## if args.status: manager.execute("status") sys.exit(0) ################################## if args.map: manager.execute("map") sys.exit(0) ################################## if args.ShowDir: if args.DestDir: manager.execute("show_directory", args.DestDir) sys.exit(0) else: o.error("Need --DestDir argument!") sys.exit(1) ################################## if args.CopyFile and args.DestDir: if check_file(args.CopyFile): manager.execute("copy_file", args.CopyFile, args.DestDir) sys.exit(0) else: o.error("Invalid path of source file!") sys.exit(1) ################################## if args.SymLinkFile and args.DestDir: if check_file(args.SymLinkFile): manager.execute("symbolic_file", args.SymLinkFile, args.DestDir) sys.exit(0) else: o.error("Invalid path of source file!") sys.exit(1) ################################### if args.AddPlugin: if check_file(args.AddPlugin): manager.execute("add_plugin", args.AddPlugin, not args.NoReloadPlugins) sys.exit(0) else: o.error("Invalid path of upgraded plugin!") sys.exit(1) ################################### if args.UpgradePlugin: if check_file(args.UpgradePlugin): manager.execute("upgrade_plugin", args.UpgradePlugin, not args.NoReloadPlugins) #manager.execute("rcon", "sm plugins refresh") sys.exit(0) else: o.error("Invalid path of upgraded plugin!") sys.exit(1) ################################### if args.RemovePlugin: if True: manager.execute("remove_plugin", args.RemovePlugin, not args.NoReloadPlugins) #manager.execute("rcon", "sm plugins refresh") sys.exit(0) else: o.error("Invalid name of remove plugin!") sys.exit(1) ################################### if args.DeleteDownloadCache: manager.execute("clear_download_cache") sys.exit(0) ################################### if args.DeleteUnusedMaps: manager.execute("clear_maps") sys.exit(0) ################################### if args.UpdateTF2IDB: manager.execute("update_tf2idb") sys.exit(0) ################################### if args.DeleteLogsFiles: manager.execute("clear_logs") sys.exit(0) ################################### if args.UpgradeMetaMod: manager.execute("upgrade_metamod", args.UpgradeMetaMod) sys.exit(0)