You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1261 lines
42 KiB

#!/usr/bin/python3.6
VERSION = 1.0
#SYS IMPORTS
from genericpath import exists, isfile
from logging import root
import sys
import os
import types
import hashlib
import shutil
from os.path import isfile as check_file
from os.path import isdir as check_dir
from os import remove as remove_file
from glob import glob
from json import dumps
import argparse
from subprocess import call as SCall
from time import time
FNULL = open(os.devnull,'w')
#VOLVO
import a2s
from valve.rcon import RCON
from valve.rcon import log as RCON_LOGGING
RCON_LOGGING.setLevel(40)
import traceback
DEBUG = 0
ALL_YES = 0
class colors:
HEADER = '\033[95m'
OKBLUE = '\033[94m'
OKGREEN = '\033[92m'
WARNING = '\033[93m'
FAIL = '\033[91m'
ENDC = '\033[0m'
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
class new_o:
def __init__(self, server_name):
self.prefix = "[{:^25}]".format(server_name)
def info(self, text, end = "\n"):
print(self.prefix + "[ {}INFO{}]{}".format(colors.OKBLUE, colors.ENDC, text), end = end)
return True
def error(self, text):
print(self.prefix + "[{}ERROR{}]{}".format(colors.FAIL, colors.ENDC, text))
return True
def warning(self, text):
print(self.prefix + "[ {}WARN{}]{}".format(colors.WARNING, colors.ENDC, text))
return True
def debug(self, text):
if DEBUG:
print(self.prefix + "[{}DEBUG{}]{}".format(colors.HEADER, colors.ENDC, text))
return True
class o:
def info(text):
print("[ {}INFO{}]{}".format(colors.OKBLUE, colors.ENDC, text))
return True
def error(text):
print("[{}ERROR{}]{}".format(colors.FAIL, colors.ENDC, text))
return True
def warning(text):
print("[ {}WARN{}]{}".format(colors.WARNING, colors.ENDC, text))
return True
def debug(text):
if DEBUG:
print("[{}DEBUG{}]{}".format(colors.HEADER, colors.ENDC, text))
return True
def hashfile(filepath, blocksize = 65536):
file_hash = hashlib.sha256()
with open(filepath, "rb") as f:
fb = f.read(blocksize)
while len(fb) > 0:
file_hash.update(fb)
fb = f.read(blocksize)
return file_hash.hexdigest()
def create_symbolic_file(source_file, symbolic_file):
cmd = f"ln -s {source_file} {symbolic_file}"
print(f"Execute system cmd: {cmd}")
SCall(cmd.split())
def compress_files(files, place, archive_name = None):
if archive_name is None:
archive_name = f"{time()}.tar.gz"
else:
archive_name += ".tar.gz"
import tarfile
with tarfile.open(f"{place}/{archive_name}", "w:gz") as tar:
for file in files:
tar.add(file)
#cmd = f"find app -name {files} | tar cvzf {place}/{archive_name} -T -"
#print(f"Execute system cmd: {cmd}")
#SCall(cmd.split())
class tf2idb:
def __init__(self, TF_FOLDER, ITEMS_GAME, DB_FILE):
import vdf
import sqlite3
import collections
import copy
def dict_merge(dct, merge_dct):
""" Recursive dict merge. Inspired by :meth:``dict.update()``, instead of
updating only top-level keys, dict_merge recurses down into dicts nested
to an arbitrary depth, updating keys. The ``merge_dct`` is merged into
``dct``.
:param dct: dict onto which the merge is executed
:param merge_dct: dct merged into dct
:return: None
"""
for k, v in merge_dct.items():
if (k == 'used_by_classes' or k == 'model_player_per_class'): #handles Demoman vs demoman... Valve pls
v = dict((k2.lower(), v2) for k2, v2 in v.items())
if (k in dct and isinstance(dct[k], dict) and isinstance(v, collections.abc.Mapping)):
dict_merge(dct[k], v)
else:
dct[k] = copy.deepcopy(v)
def resolve_prefabs(item, prefabs):
# generate list of prefabs
prefab_list = item.get('prefab', '').split()
for prefab in prefab_list:
subprefabs = prefabs[prefab].get('prefab', '').split()
prefab_list.extend(p for p in subprefabs if p not in prefab_list)
# iterate over prefab list and merge, nested prefabs first
result = {}
for prefab in ( prefabs[p] for p in reversed(prefab_list) ):
dict_merge(result, prefab)
dict_merge(result, item)
return result, prefab_list
data = None
ITEMS_GAME = TF_FOLDER + "/" + ITEMS_GAME
DB_FILE = TF_FOLDER + "/" + DB_FILE
db = sqlite3.connect(DB_FILE)
dbc = db.cursor()
with open(ITEMS_GAME) as f:
data = vdf.parse(f)
data = data['items_game']
dbc.execute('DROP TABLE IF EXISTS new_tf2idb_class')
dbc.execute('DROP TABLE IF EXISTS new_tf2idb_item_attributes')
dbc.execute('DROP TABLE IF EXISTS new_tf2idb_item')
dbc.execute('DROP TABLE IF EXISTS new_tf2idb_particles')
dbc.execute('DROP TABLE IF EXISTS new_tf2idb_equip_conflicts')
dbc.execute('DROP TABLE IF EXISTS new_tf2idb_equip_regions')
dbc.execute('DROP TABLE IF EXISTS new_tf2idb_capabilities')
dbc.execute('DROP TABLE IF EXISTS new_tf2idb_attributes')
dbc.execute('DROP TABLE IF EXISTS new_tf2idb_qualities')
dbc.execute('CREATE TABLE "new_tf2idb_class" ("id" INTEGER NOT NULL , "class" TEXT NOT NULL , "slot" TEXT , PRIMARY KEY ("id", "class"))')
dbc.execute('CREATE TABLE "new_tf2idb_item_attributes" ('
'"id" INTEGER NOT NULL,'
'"attribute" INTEGER NOT NULL,'
'"value" TEXT NOT NULL,'
'"static" INTEGER,'
'PRIMARY KEY ("id", "attribute")'
')'
)
dbc.execute('CREATE TABLE "new_tf2idb_item" ('
'"id" INTEGER PRIMARY KEY NOT NULL,'
'"name" TEXT NOT NULL,'
'"item_name" TEXT,'
'"class" TEXT NOT NULL,'
'"slot" TEXT,'
'"quality" TEXT NOT NULL,'
'"tool_type" TEXT,'
'"min_ilevel" INTEGER,'
'"max_ilevel" INTEGER,'
'"baseitem" INTEGER,'
'"holiday_restriction" TEXT,'
'"has_string_attribute" INTEGER,'
'"propername" INTEGER'
')'
)
dbc.execute('CREATE TABLE "new_tf2idb_particles" ("id" INTEGER PRIMARY KEY NOT NULL , "name" TEXT NOT NULL )')
dbc.execute('CREATE TABLE "new_tf2idb_equip_conflicts" ("name" TEXT NOT NULL , "region" TEXT NOT NULL , PRIMARY KEY ("name", "region"))')
dbc.execute('CREATE TABLE "new_tf2idb_equip_regions" ("id" INTEGER NOT NULL , "region" TEXT NOT NULL , PRIMARY KEY ("id", "region"))')
dbc.execute('CREATE TABLE "new_tf2idb_capabilities" ("id" INTEGER NOT NULL , "capability" TEXT NOT NULL )')
dbc.execute('CREATE TABLE "new_tf2idb_attributes" ('
'"id" INTEGER PRIMARY KEY NOT NULL,'
'"name" TEXT NOT NULL,'
'"attribute_class" TEXT,'
'"attribute_type" TEXT,'
'"description_string" TEXT,'
'"description_format" TEXT,'
'"effect_type" TEXT,'
'"hidden" INTEGER,'
'"stored_as_integer" INTEGER,'
'"armory_desc" TEXT,'
'"is_set_bonus" INTEGER,'
'"is_user_generated" INTEGER,'
'"can_affect_recipe_component_name" INTEGER,'
'"apply_tag_to_item_definition" TEXT'
')'
)
dbc.execute('CREATE TABLE "new_tf2idb_qualities" ("name" TEXT PRIMARY KEY NOT NULL , "value" INTEGER NOT NULL )')
nonce = int(time())
dbc.execute('CREATE INDEX "tf2idb_item_attributes_%i" ON "new_tf2idb_item_attributes" ("attribute" ASC)' % nonce)
dbc.execute('CREATE INDEX "tf2idb_class_%i" ON "new_tf2idb_class" ("class" ASC)' % nonce)
dbc.execute('CREATE INDEX "tf2idb_item_%i" ON "new_tf2idb_item" ("slot" ASC)' % nonce)
# qualities
for qname,qdata in data['qualities'].items():
dbc.execute('INSERT INTO new_tf2idb_qualities (name, value) VALUES (?,?)', (qname, qdata['value']))
# particles
for particle_type,particle_list in data['attribute_controlled_attached_particles'].items():
for k,v in particle_list.items():
dbc.execute('INSERT INTO new_tf2idb_particles (id,name) VALUES (?,?)', (k, v['system']) ) #TODO add the other fields too
# attributes
attribute_type = {}
for k,v in data['attributes'].items():
at = v.get('attribute_type')
if at:
atype = at
else:
if v.get('stored_as_integer'):
atype = 'integer'
else:
atype = 'float'
attribute_type[v['name'].lower()] = (k, atype)
dbc.execute('INSERT INTO new_tf2idb_attributes '
'(id,name,attribute_class,attribute_type,description_string,description_format,effect_type,hidden,stored_as_integer,armory_desc,is_set_bonus,'
'is_user_generated,can_affect_recipe_component_name,apply_tag_to_item_definition) '
'VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?)',
(k,v.get('name'),v.get('attribute_class'),v.get('attribute_type'),v.get('description_string'),v.get('description_format'),
v.get('effect_type'),v.get('hidden'),v.get('stored_as_integer'),v.get('armory_desc'),v.get('is_set_bonus'),
v.get('is_user_generated'),v.get('can_affect_recipe_component_name'),v.get('apply_tag_to_item_definition')
)
)
# conflicts
for k,v in data['equip_conflicts'].items():
for region in v.keys():
dbc.execute('INSERT INTO new_tf2idb_equip_conflicts (name,region) VALUES (?,?)', (k, region))
# items
for id,v in data['items'].items():
if id == 'default':
continue
i, prefabs_used = resolve_prefabs(v, data['prefabs'])
baseitem = 'baseitem' in i
try:
tool = None
if 'tool' in i:
tool = i['tool'].get('type')
has_string_attribute = False
if 'static_attrs' in i:
for name,value in i['static_attrs'].items():
aid,atype = attribute_type[name.lower()]
if atype == 'string':
has_string_attribute = True
dbc.execute('INSERT INTO new_tf2idb_item_attributes (id,attribute,value,static) VALUES (?,?,?,?)', (id,aid,value,1))
if 'attributes' in i:
for name,info in i['attributes'].items():
aid,atype = attribute_type[name.lower()]
if atype == 'string':
has_string_attribute = True
dbc.execute('INSERT INTO new_tf2idb_item_attributes (id,attribute,value,static) VALUES (?,?,?,?)', (id,aid,info['value'],0))
dbc.execute('INSERT INTO new_tf2idb_item '
'(id,name,item_name,class,slot,quality,tool_type,min_ilevel,max_ilevel,baseitem,holiday_restriction,has_string_attribute,propername) '
'VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)',
(id,i['name'],i.get('item_name'),i['item_class'],i.get('item_slot'),i.get('item_quality', ''), tool, i.get('min_ilevel'), i.get('max_ilevel'),baseitem,
i.get('holiday_restriction'), has_string_attribute, i.get('propername'))
)
if 'used_by_classes' in i:
for prof, val in i['used_by_classes'].items():
dbc.execute('INSERT INTO new_tf2idb_class (id,class,slot) VALUES (?,?,?)', (id, prof.lower(), val if val != '1' else None))
region_field = i.get('equip_region') or i.get('equip_regions')
if region_field:
if type(region_field) is str:
region_field = {region_field: 1}
for region in region_field.keys():
dbc.execute('INSERT INTO new_tf2idb_equip_regions (id,region) VALUES (?,?)', (id, region))
# capabilties
for capability,val in i.get('capabilities', {}).items():
dbc.execute('INSERT INTO new_tf2idb_capabilities (id,capability) VALUES (?,?)', (id, (capability if val != '0' else '!'+capability)))
except:
traceback.print_exc()
print(id)
raise
def replace_table(name):
dbc.execute('DROP TABLE IF EXISTS %s' % name)
dbc.execute('ALTER TABLE new_%s RENAME TO %s' % (name,name))
replace_table('tf2idb_class')
replace_table('tf2idb_item_attributes')
replace_table('tf2idb_item')
replace_table('tf2idb_particles')
replace_table('tf2idb_equip_conflicts')
replace_table('tf2idb_equip_regions')
replace_table('tf2idb_capabilities')
replace_table('tf2idb_attributes')
replace_table('tf2idb_qualities')
db.commit()
dbc.execute('VACUUM')
class Manager:
servers = []
max_lines = 4
current_server = None
server_choices = []
def __init__(self, servers_file = "./servers.txt"):
self.load_servers(servers_file)
def select_server(self, choice):
for server in self.servers:
if server.select(choice):
self.current_server = server
o.debug("Selected: {}".format(self.current_server))
break
if not self.current_server:
o.error("Cannot choice server! Choice is invalid!")
sys.exit(1)
def load_servers(self, server_list_path):
o.debug("Load servers file: {}".format(server_list_path))
################################################################################
if check_file(server_list_path):
with open(server_list_path, "r") as f_servers:
readed_lines = 0
line = f_servers.readline()
while line:
if line and not line[0] == "#":
if readed_lines == self.max_lines:
readed_lines = 0
line = line.split("\n")[0]
if not readed_lines:
name = line
elif readed_lines == 1:
ip, port = line.split(":")
elif readed_lines == 2:
rcon_password = line
elif readed_lines == 3:
root_directory = line
self.servers.append(Server(ip, port, rcon_password, name, root_directory))
################################################################
readed_lines += 1
line = f_servers.readline()
############################################################################
if not self.servers:
o.error("Servers not found! Insert server in {}".format(server_list_path))
sys.exit(2)
else:
o.error("Cannot find server file. App create him. Insert values!")
with open(server_list_path, "w") as f_servers:
f_servers.write("""
#//////////////////////
#//PLACE HERE
#//////////////////////
#//ServerName
#//IP:PORT
#//RCON PASSWORD
#//tf root folder
#...
""")
sys.exit(1)
################################################################################
def execute(self, func, *args):
"""if self.current_server:
getattr(self.current_server, func, None)(*args)
else:
for server in self.servers:
for choice in self.server_choices:
if server.choice(choice):
getattr(server, func, None)(*args)
break
"""
if self.current_server:
getattr(self.current_server, func, None)(*args)
elif self.server_choices:
for server in self.servers:
for choice in self.server_choices:
if server.select(choice):
getattr(server, func, None)(*args)
break
else:
for server in self.servers:
getattr(server, func, None)(*args)
class Server:
obj = None
def __init__(self, ip, port, rcon_pass, name, root_directory):
self.address = (ip, int(port))
self.password = rcon_pass
self.name = name
self.o = new_o(name)
self.root = root_directory
o.debug(self)
def __str__(self):
return "{:^25} on {}".format(self.name, self.address)
def ln_vpk_files(self, vpk_directory):
vpk_files = glob(f"{self.root}/*.vpk")
vpk_directory_files = glob(f"{vpk_directory}/*.vpk")
for server_vpk_file in vpk_files:
for other_vpk_file in vpk_directory_files:
if server_vpk_file.split("/")[-1] == other_vpk_file.split("/")[-1]:
if hashfile(server_vpk_file) == hashfile(other_vpk_file):
print(f"Create symbolic link from {other_vpk_file} to {server_vpk_file}")
if self.wait_input():
remove_file(server_vpk_file)
create_symbolic_file(other_vpk_file, server_vpk_file)
return True
def add_plugin(self, plugin_path, need_reloads_plugins = True):
plugins_list = glob(f"{self.root}/addons/sourcemod/plugins/*.smx") + glob(f"{self.root}/addons/sourcemod/plugins/*/*.smx")
plugin_file = plugin_path.split("/")[-1]
for plugin in plugins_list:
if plugin.split("/")[-1] == plugin_file:
self.o.info("Plugin already added!")
return True
############################################
new_plugin_path = f"{self.root}/addons/sourcemod/plugins/{plugin_file}"
self.o.info(f"Add new plugins ({plugin_path}) to ({new_plugin_path})?")
if self.wait_input():
shutil.copyfile(plugin_path, new_plugin_path)
if need_reloads_plugins:
self.o.info("Send sm plugins refresh command...", end = "\t")
self.rcon("sm plugins refresh", hide_server_name=True)
return True
def remove_plugin(self, plugin_name, need_reloads_plugins = True):
plugins_list = glob(f"{self.root}/addons/sourcemod/plugins/*.smx") + glob(f"{self.root}/addons/sourcemod/plugins/*/*.smx")
for plugin in plugins_list:
if plugin.split("/")[-1] == plugin_name + ".smx":
self.o.info(f"Remove plugin: {plugin}?")
if self.wait_input():
remove_file(plugin)
if need_reloads_plugins:
self.o.info("Send sm plugins refresh command...", end = "\t")
self.rcon("sm plugins refresh", hide_server_name=True)
return True
def clear_maps(self, exclude = "itemtest.bsp"):
map_list = glob(f"{self.root}/maps/*.bsp")
self.o.info(f"Delete {len(map_list) - 1} maps?")
if self.wait_input():
for map_path in map_list:
if exclude in map_path:
continue
remove_file(map_path)
return True
def upgrade_metamod(self, metamod_arch):
from tarfile import open
from time import time
import os
backup_directory = os.path.join(self.root, "addons_backup")
if not os.path.exists(backup_directory):
print("Backup dir not created, create: ", backup_directory)
os.mkdir(backup_directory)
class File:
def __init__(self, root, directory, file, new_root = None) -> None:
self.root = root
self.directory = directory
self.file = file
self.new_root = new_root
def __str__(self) -> str:
return str(self.internalPath)
def __repr__(self) -> str:
return self.__str__()
@property
def fullPath(self):
#self.directory.replace(self.root, "." if self.new_root == None else self.new_root)
return os.path.join(self.directory if self.new_root == None else self.directory.replace(self.root, "." if self.new_root == None else self.new_root), self.file)
@property
def internalPath(self):
return os.path.join(self.directory.replace(self.root, "."), self.file)
def copyPath(self, root):########TODO
return File(root, self.directory, self.file)
class UpgradedFile:
config_exts = [".ini", ".cfg"]
ignore_files = ["basevotes.smx", "admin-flatfile.smx"]
def __init__(self, new, old, equals_path = True) -> None:
self.new : File = new
self.old : File|None = old
self.equals_path = equals_path
@property
def isUpgrade(self):
return self.old != None
@property
def isConfig(self):
return "."+os.path.split(self.new.fullPath)[-1].split(".")[-1] in self.config_exts
@property
def isIgnore(self):
return os.path.split(self.new.fullPath)[-1] in self.ignore_files
def __str__(self):
return f"[{'~' if self.isUpgrade else '+'}] [{'P' if self.equals_path else 'F'}] {self.new.fullPath} -> {self.old.fullPath if self.isUpgrade else 'new'}"
def __repr__(self) -> str:
return self.__str__()
def Process(self, game_root, check = True):
source = self.new.fullPath
dest = self.old.fullPath if self.old != None else File(self.new.root, self.new.directory, self.new.file, game_root).fullPath
if os.path.exists(dest):
print("[-]", dest)
if not check:
os.remove(dest)
print("[+]", source, "->", dest)
dir_check = "/".join(os.path.split(dest)[:-1])
if not os.path.exists(dir_check):
print("[!] dest directory is not exist, create: ", dir_check)
os.makedirs(dir_check, exist_ok = True)
if not check:
shutil.copy2(source, dest)
print()
#extract sourcemod/metamod
arch_directory = f"/tmp/mm.arch.{time()}"
with open(metamod_arch) as arch:
os.mkdir(arch_directory)
arch.extractall(arch_directory)
#new files
new_filelist = []
for (dirpath, dirnames, filenames) in os.walk(arch_directory):
#print(dirpath, dirnames, filenames)
for file in filenames:
new_filelist.append(File(arch_directory, dirpath, file))
print(new_filelist[0])
#old files
old_filelist = []
for (dirpath, dirnames, filenames) in os.walk(os.path.join(self.root, "addons")):
#print(dirpath, dirnames, filenames)
for file in filenames:
old_filelist.append(File(self.root, dirpath, file))
print(old_filelist[0])
class FilesToUpgrade:
def __init__(self):
self.files_to_upgrade = []
def append(self, el):
self.files_to_upgrade.append(el)
def Upgraded(self):
return [f for f in self.files_to_upgrade if f.isUpgrade and not f.isConfig and not f.isIgnore]
def Appended(self):
return [f for f in self.files_to_upgrade if not f.isUpgrade and not f.isConfig and not f.isIgnore]
def Configs(self):
return [f for f in self.files_to_upgrade if f.isConfig]
def PrintListing(self):
print("Files to upgrade")
for f in self.Upgraded():
print(f)
print("Files to append")
for f in self.Appended():
print(f)
print("Updated config files")
for f in self.Configs():
print(f)
def BackupUpgradedFiles(self, path = f"/tmp/smmmbackup.{time()}.zip"):
print("Backup files to:", path)
import zipfile
zf = zipfile.ZipFile(path, "w")
for f in self.Upgraded():
zf.write(f.old.fullPath)
zf.close()
def Process(self, gameroot, check = True):
print(f"Processing {'check to upgrade' if check else 'upgrade'}...")
for f in self.Upgraded():
f.Process(gameroot, check)
print(f"Processing {'check to append' if check else 'append'}...")
for f in self.Appended():
f.Process(gameroot, check)
print("Config files dont touched!")
#collect
print("")
i = 0
files_to_upgrade = FilesToUpgrade()
for nf in new_filelist:
print(f"[{i}/{len(new_filelist)}] search...", end="\r")
founded = False
for of in old_filelist:
if nf.internalPath == of.internalPath:
files_to_upgrade.append(UpgradedFile(nf, of))
founded = True
break
if not founded:
for of in old_filelist:
if os.path.split(nf.internalPath)[-1] == os.path.split(of.internalPath)[-1]:
if os.path.split(nf.internalPath)[0][-2:] != "64":
#print(os.path.split(nf.internalPath), os.path.split(of.internalPath))
files_to_upgrade.append(UpgradedFile(nf, of, False))
founded = True
break
if not founded:
files_to_upgrade.append(UpgradedFile(nf, None))
i+=1
#show file list
files_to_upgrade.PrintListing()
files_to_upgrade.Process(self.root, check=True)
if self.wait_input():
files_to_upgrade.BackupUpgradedFiles(os.path.join(backup_directory, f"{time()}.zip"))
files_to_upgrade.Process(self.root, check=False)
#remove unzip archive
from shutil import rmtree
rmtree(arch_directory)
def clear_download_cache(self):
cache_files = glob(f"{self.root}/download/user_custom/*/*.dat")
self.o.info(f"Delete {len(cache_files)} cache files?")
if not self.wait_input():
return True
count = 0
for cache_file in cache_files:
try:
remove_file(cache_file)
count += 1
except Exception as delete_error:
print(cache_file, delete_error)
sys.exit(1)
self.o.info(f"Deleted {count} cache files")
return True
def clear_logs(self):
touch_if_date_lt = 60 * 60 * 24
default_logs_place = f"{self.root}/logs/*.log"
sourcemod_logs_place = f"{self.root}/addons/sourcemod/logs/*.log"
lgsm_logs_place = f"{self.root}/../../log/console/*.log"
###search default logs
try:
default_logs = [f for f in glob(default_logs_place) if time() - os.path.getmtime(f) > touch_if_date_lt]
self.o.info(f"Found {len(default_logs)} logs in default directory")
except:
default_logs = []
self.o.error(f"Cannot find logs in {default_logs_place}")
###search sourcemod logs
try:
sourcemod_logs = [f for f in glob(sourcemod_logs_place) if time() - os.path.getmtime(f) > touch_if_date_lt]
self.o.info(f"Found {len(sourcemod_logs)} logs in sourcemod directory")
except:
sourcemod_logs = []
self.o.error(f"Cannot find logs in {sourcemod_logs_place}")
###search lgsm logs
try:
lgsm_logs = [f for f in glob(lgsm_logs_place) if time() - os.path.getmtime(f) > touch_if_date_lt]
self.o.info(f"Found {len(lgsm_logs)} logs in lgsm directory")
except:
lgsm_logs = []
self.o.error(f"Cannot find logs in {lgsm_logs_place}")
logs_files = default_logs + sourcemod_logs + lgsm_logs
self.o.info(f"Compress {len(logs_files)} logs files and delete?")
if not self.wait_input():
return True
###compress stage
archive_name = f"dl_{time()}"
if default_logs:
self.o.info(f"Compress {default_logs_place}")
compress_files(default_logs, default_logs_place.replace("/*.log",""), archive_name)
if sourcemod_logs:
self.o.info(f"Compress {sourcemod_logs_place}")
compress_files(sourcemod_logs, sourcemod_logs_place.replace("/*.log",""), archive_name)
if lgsm_logs:
self.o.info(f"Compress {lgsm_logs_place}")
compress_files(lgsm_logs, lgsm_logs_place.replace("/*.log",""), archive_name)
###delete stage
count = 0
for log_file in logs_files:
try:
remove_file(log_file)
count += 1
except Exception as delete_error:
print(log_file, delete_error)
sys.exit(1)
self.o.info(f"Deleted {count} logs files")
return True
def upgrade_plugin(self, fresh_path, need_reloads_plugins = True):
new_hash = hashfile(fresh_path)
fresh_plugin_name = fresh_path.split("/")[-1:][0]
plugins_list = glob(f"{self.root}/addons/sourcemod/plugins/*.smx") + glob(f"{self.root}/addons/sourcemod/plugins/*/*.smx")
for plugin_path in plugins_list:
if plugin_path.split("/")[-1:][0] == fresh_plugin_name:
old_hash = hashfile(plugin_path)
if old_hash == new_hash:
self.o.info(f"Plugin {fresh_plugin_name} currenty updated!")
return False
else:
self.o.info(f"Upgrade plugin {fresh_plugin_name}...")
self.o.info(f"copy {fresh_path} to {plugin_path}")
shutil.copyfile(fresh_path, plugin_path)
if need_reloads_plugins:
self.o.info("Send sm plugins refresh command...", end = "\t")
self.rcon("sm plugins refresh", hide_server_name=True)
return True
self.o.info("Upgraded plugin not found in server...")
return False
def update_tf2idb(self):
from sqlite3 import OperationalError
self.o.info("Updating TF2IDB")
try:
tf2idb(self.root, "scripts/items/items_game.txt", "addons/sourcemod/data/sqlite/tf2idb.sq3")
except OperationalError:
self.o.error("Cannot find database file")
def wait_input(self, yes = "y", no = "n"):
if ALL_YES:
return True
while True:
print("Enter \"{}\" to accept or \"{}\" to negative response:".format(yes, no) ,end="\t")
response = input()
if response.lower() == yes:
return True
if response.lower() == no:
return False
def select(self, request):
if self.name == request:
return True
elif self.address == request:
return True
elif str(self.address[1]) == request:
return True
elif request in self.root:
return True
else:
return False
def status(self):
player_count, max_count, ping = self.count_players()
if player_count < 0:
print("{} | Not responsed(((".format(self))
else:
print("{} | {}/{} players | {} ms".format(self, player_count, max_count, ping))
@property
def map_config(self):
map = self.current_map()
if "/" in map:
sub_dir, map = map.split("/")
configs_directory = f"{self.root}/cfg/{sub_dir}"
else:
configs_directory = f"{self.root}/cfg"
if not check_dir(configs_directory):
os.mkdir(configs_directory)
return f"{configs_directory}/{map}.cfg"
def check2map_config(self, cfg_line):
config = self.map_config
try:
with open(config, "r") as config_file:
for line in config_file:
line = line.replace("\n", "")
if cfg_line == line:
return True
return False
except IOError:
return False
def remove2map_config(self, line):
config = self.map_config
if not self.check2map_config(line):
print(f"{self} | Line not currently in: {config}")
buffer = []
with open(config, "r") as config_file:
pass
def add2map_config(self, line):
config = self.map_config
if self.check2map_config(line):
print(f"{self} | Line currently in: {config}")
return False
with open(config, "a") as config_file:
config_file.write(f"{line}\n")
if self.check2map_config(line):
print(f"{self} | Line: {line}\nAppend to: {config}")
return True
print(f"{self} | Line: {line}\nNot append in: {config}")
return False
def executemap_config(self):
config = self.map()
if config:
self.rcon(f"sm_execcfg {config}.cfg", hide_server_name=True)
def map(self):
current_map = self.current_map()
if current_map:
print("{} | {}".format(self, current_map))
else:
print("{} | {}".format(self, "not running"))
def current_map(self):
try:
with RCON(self.address, self.password) as rcon:
rcon_response = rcon.execute("status")
return rcon_response.body.decode("utf8","ignore").split("\n")[5].split(": ")[1].split(" ")[0]
except:
traceback.print_exc()
return None
def count_players(self):
try:
ping = 0.0
try:
start_time = time()
server = a2s.info(tuple(self.address))
count, max_count = server.player_count, server.max_players
ping = server.ping
except NameError:
ping = time() - start_time
except ValueError:
return -1, -1, -1
return int(count), int(max_count), round(ping, 3) * 1000
except:
#traceback.print_exc()
return -1, -1, -1
def net_status_json(self):
rcon_response = ""
json_result = {}
try:
with RCON(self.address, self.password) as rcon:
rcon_response = rcon.execute("net_status")
response = rcon_response.body.decode("utf8","ignore")
response = response.split("\n")
#####################################################
gamemode, server_type, connections = response[1].split(": ")[1].split(",")
connections = int(connections.split()[0])
#####################################################
client_port, server_port, hltv_port, matchmaking_port, systemlink_port, lan_port = response[2].split(": ")[1].split(",")
client_port = int(client_port.split()[1])
server_port = int(server_port.split()[1])
hltv_port = int(hltv_port.split()[1])
#####################################################
try:
latency_avg_out, latency_avg_in = response[3].split(": ")[1].split(",")
latency_avg_out = float(latency_avg_out.split()[2][:-1])
latency_avg_in = float(latency_avg_in.split()[1][:-1])
except IndexError:
print(dumps({
"config":{
"gamemode":gamemode,
"server_type":server_type,
"connections":connections
},
"ports":{
"client":client_port,
"server":server_port,
"hltv":hltv_port
},
"latency":{
"avg_out":0.0,
"avg_in":0.0
},
"loss":{
"avg_out":0.0,
"avg_in":0.0
},
"packets":{
"total":{
"out":0,
"in":0
},
"client":{
"out":0,
"in":0
}
},
"data":{
"total":{
"out":0.0,
"in":0.0
},
"client":{
"out":0.0,
"in":0.0
}
}
}))
return
#####################################################
loss_avg_out, loss_avg_in = response[4].split(": ")[1].split(",")
loss_avg_out = float(loss_avg_out.split()[2])
loss_avg_in = float(loss_avg_in.split()[1])
#####################################################
packets_total_out, packets_total_in = response[5].split(": ")[1].split(",")
packets_total_out = float(packets_total_out.split()[3][:-2])
packets_total_in = float(packets_total_in.split()[1][:-2])
#####################################################
packets_per_client_out, packets_per_client_in = response[6].split(",")
packets_per_client_out = float(packets_per_client_out.split()[3][:-2])
packets_per_client_in = float(packets_per_client_in.split()[1][:-2])
#####################################################
data_total_out = response[7].split(": ")[1].split(",")[0].split()
if len(data_total_out) > 4:
if data_total_out[4] == "kB/s":
data_total_out = float(data_total_out[3]) * 1024
elif data_total_out[4] == "MB/s":
data_total_out = float(data_total_out[3]) * 1024 * 1024
else:
data_total_out = float(data_total_out[3])
#
data_total_in = response[7].split(": ")[1].split(",")[1].split()
if len(data_total_in) > 2:
if data_total_in[2] == "kB/s":
data_total_in = float(data_total_in[1]) * 1024
elif data_total_in[2] == "MB/s":
data_total_in = float(data_total_in[1]) * 1024 * 1024
else:
data_total_in = float(data_total_in[1])
#####################################################
data_per_client_out = response[8].split(",")[0].split()
if len(data_per_client_out) > 4:
if data_per_client_out[4] == "kB/s":
data_per_client_out = float(data_per_client_out[3]) * 1024
elif data_per_client_out[4] == "MB/s":
data_per_client_out = float(data_per_client_out[3]) * 1024 * 1024
else:
data_per_client_out = float(data_per_client_out[3])
#
data_per_client_in = response[8].split(",")[1].split()
if len(data_per_client_in) > 2:
if data_per_client_in[2] == "kB/s":
data_per_client_in = float(data_per_client_in[1]) * 1024
elif data_per_client_in[2] == "MB/s":
data_per_client_in = float(data_per_client_in[1]) * 1024 * 1024
else:
data_per_client_in = float(data_per_client_in[1])
######################################################
json_result = {
"config":{
"gamemode":gamemode,
"server_type":server_type,
"connections":connections
},
"ports":{
"client":client_port,
"server":server_port,
"hltv":hltv_port
},
"latency":{
"avg_out":latency_avg_out,
"avg_in":latency_avg_in
},
"loss":{
"avg_out":loss_avg_out,
"avg_in":loss_avg_in
},
"packets":{
"total":{
"out":packets_total_out,
"in":packets_total_in
},
"client":{
"out":packets_per_client_out,
"in":packets_per_client_in
}
},
"data":{
"total":{
"out":data_total_out,
"in":data_total_in
},
"client":{
"out":data_per_client_out,
"in":data_per_client_in
}
}
}
####################################################
print(dumps(json_result))
except Exception as rcon_error:
traceback.print_exc()
print(rcon_error)
def rcon(self, command, result = False, hide_server_name = False):
if not hide_server_name:
self.o.info("{obj.name:^25}: ".format(obj = self), end="\t")
rcon_response = ""
try:
with RCON(self.address, self.password) as rcon:
rcon_response = rcon.execute(command)
response = rcon_response.body.decode("utf8","ignore")
if not result:
print(response if response else "ok")
else:
return response
except Exception as rcon_error:
rcon_response = "Rcon execute error: {}".format(rcon_error)
print(rcon_response)
def show_directory(self, path):
full_path = "{}/{}".format(self.root, path)
print("Current directory: {}".format(full_path))
if check_dir(full_path):
SCall("ls -all {}".format(full_path).split())
else:
print("Directory does not exist")
def copy_file(self, source_file, path):
full_path = "{}/{}".format(self.root, path)
print("Copy in > {}".format(full_path))
new_file = source_file.split("/")[-1:][0]
if check_dir(full_path):
SCall("cp {0} {1}/{2}".format(source_file, full_path, new_file).split())
else:
print("Destonation directory doesn't exists!")
def symbolic_file(self, source_file, path):
full_path = "{}/{}".format(self.root, path)
print("Create link in > {}".format(full_path))
new_file = source_file.split("/")[-1:][0]
if not "/" in source_file:
source_file = os.getcwd() + "/" + source_file
destonation_file = "{}/{}".format(full_path, new_file)
if check_file(destonation_file):
print("Destonation file is current created, override him?")
if self.wait_input():
remove_file("{}/{}".format(full_path, new_file))
else:
print("Abort operation!")
return
if check_dir(full_path):
cmd = "ln -s {0} {1}/{2}".format(source_file, full_path, new_file)
print("Execute: ", cmd)
SCall(cmd.split())
else:
print("Destonation directory doesn't exists!")
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--serverslist", help="Path to servers list", default = "./servers.txt", type = str)
parser.add_argument("--rcon", "-r", help = "Command to execute", type = str, nargs="+", default = "")
parser.add_argument("--choice", "-c", help = "Choice server, aka: part name in directory", type = str, default = ["global.choice"], nargs = "+")
parser.add_argument("--status", "-s", help = "Show current number players on server", default = False, action = "store_true")
parser.add_argument("--yes", "-y", help = "Say YES to all response", default = False, action = "store_true")
parser.add_argument("--netstatus","-ns", help = "Show json net_status", default = False, action = "store_true")
parser.add_argument("--map","-m", help = "Show map on server", default = False, action = "store_true")
parser.add_argument("--add_map_config", "-amc", help = "Add line to config map", default = "", type = str, nargs="+")
parser.add_argument("--execute_map_config", default=False, action="store_true")
################################################################################################################################################
parser.add_argument("--CopyFile", "-cp", help = "Path of file to copy in root directory\nNeed second argument: --DestDir", type = str, default = "")
parser.add_argument("--SymLinkFile", "-ln", help = "Path of file to create symbolic link in root directory\nNeed second argument: --DestDir", type = str, default = "")
parser.add_argument("--DestDir", "-dd", help = "Destonation directory, aka: addons/sourcemod/plugins", type = str, default = "/")
parser.add_argument("--ShowDir", "-ls", help = "Show ls command on dest directory", default = False, action = "store_true")
################################################################################################################################################
parser.add_argument("--AddPlugin", "-ap", help = "Path to new plugin.", type = str, default = "")
parser.add_argument("--RemovePlugin", "-rp", help = "Name plugin to remove. Names must match.", type = str, default = "")
parser.add_argument("--UpgradePlugin", "-upg", help = "Path of file to uprade. Names must match.", type = str, default = "")
parser.add_argument("--NoReloadPlugins", "-nrp", help = "Upgrade plugins without send sm plugins refresh command", default = False, action = "store_true")
################################################################################################################################################
parser.add_argument("--DeleteDownloadCache", "-ddc", help = "Clear download cache", default = False, action = "store_true")
parser.add_argument("--DeleteUnusedMaps", "-dum", help = "Delete maps from maps folder", default = False, action = "store_true")
parser.add_argument("--DeleteLogsFiles", "--dlf", help = "Delete logs file if older 1 day", default = False, action = "store_true")
################################################################################################################################################
parser.add_argument("--UpdateTF2IDB", help = "Update tf2idb database", default = False, action = "store_true")
################################################################################################################################################
parser.add_argument("--UpgradeMetaMod", help = "Upgrade current version of metamod", type = str, default="")
################################################################################################################################################
args = parser.parse_args()
ALL_YES = 1 if args.yes else 0
##################################
manager = Manager(args.serverslist)
#if len(args.choice) > 0 and args.choice[0] == "global.choice":
if not args.choice == ["global.choice"]:
if len(args.choice) == 1:
manager.select_server(args.choice[0])
else:
manager.server_choices = args.choice
##################################
if args.rcon:
command = ""
for word in args.rcon:
command += word + " "
command = command[:-1]
manager.execute("rcon", command)
sys.exit(0)
##################################
if args.add_map_config:
append_line = ""
for word in args.add_map_config:
append_line += word + " "
append_line = append_line[:-1]
manager.execute("add2map_config", append_line)
sys.exit(0)
if args.execute_map_config:
manager.execute("executemap_config")
sys.exit(0)
##################################
if args.netstatus:
manager.execute("net_status_json")
sys.exit(0)
##################################
if args.status:
manager.execute("status")
sys.exit(0)
##################################
if args.map:
manager.execute("map")
sys.exit(0)
##################################
if args.ShowDir:
if args.DestDir:
manager.execute("show_directory", args.DestDir)
sys.exit(0)
else:
o.error("Need --DestDir argument!")
sys.exit(1)
##################################
if args.CopyFile and args.DestDir:
if check_file(args.CopyFile):
manager.execute("copy_file", args.CopyFile, args.DestDir)
sys.exit(0)
else:
o.error("Invalid path of source file!")
sys.exit(1)
##################################
if args.SymLinkFile and args.DestDir:
if check_file(args.SymLinkFile):
manager.execute("symbolic_file", args.SymLinkFile, args.DestDir)
sys.exit(0)
else:
o.error("Invalid path of source file!")
sys.exit(1)
###################################
if args.AddPlugin:
if check_file(args.AddPlugin):
manager.execute("add_plugin", args.AddPlugin, not args.NoReloadPlugins)
sys.exit(0)
else:
o.error("Invalid path of upgraded plugin!")
sys.exit(1)
###################################
if args.UpgradePlugin:
if check_file(args.UpgradePlugin):
manager.execute("upgrade_plugin", args.UpgradePlugin, not args.NoReloadPlugins)
#manager.execute("rcon", "sm plugins refresh")
sys.exit(0)
else:
o.error("Invalid path of upgraded plugin!")
sys.exit(1)
###################################
if args.RemovePlugin:
if True:
manager.execute("remove_plugin", args.RemovePlugin, not args.NoReloadPlugins)
#manager.execute("rcon", "sm plugins refresh")
sys.exit(0)
else:
o.error("Invalid name of remove plugin!")
sys.exit(1)
###################################
if args.DeleteDownloadCache:
manager.execute("clear_download_cache")
sys.exit(0)
###################################
if args.DeleteUnusedMaps:
manager.execute("clear_maps")
sys.exit(0)
###################################
if args.UpdateTF2IDB:
manager.execute("update_tf2idb")
sys.exit(0)
###################################
if args.DeleteLogsFiles:
manager.execute("clear_logs")
sys.exit(0)
###################################
if args.UpgradeMetaMod:
manager.execute("upgrade_metamod", args.UpgradeMetaMod)
sys.exit(0)