You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

69 lines
2.0 KiB

import aiohttp, os, sys
from colors import *
import traceback
class BackendClient:
up = False
warn = False
def __init__(self):
if not os.getenv("BACKEND_URL", ""):
error("BACKEND_URL not setted in env")
sys.exit(10)
if not os.getenv("SECRET_KEY", ""):
error("SECRET_KEY not setted in env")
sys.exit(11)
self.secret_key = os.getenv("SECRET_KEY")
self.pulse_url = f"{os.getenv('BACKEND_URL')}/api/pulse"
self.vip_url = f"{os.getenv('BACKEND_URL')}/api/external/vip"
async def pulse(self, exit_if_error = False):
async with aiohttp.ClientSession(cookies={
"secretkey":self.secret_key}) as session:
try:
async with session.get(self.pulse_url, ssl=False) as response:
await response.text()
self.up = True
if not self.warn:
info("Backend connected!")
self.warn = True
except:
error("Backend not respond")
traceback.print_exc()
if exit_if_error:
sys.exit(200)
self.up = False
self.warn = False
return self.up
async def vip(self, steamid, amount: int, extra: str, unique: str = ""):
async with aiohttp.ClientSession(cookies={
"secretkey":self.secret_key}) as session:
async with session.post(self.vip_url + f"?steam={steamid}&amount={int(amount)}&service=qiwi&extra={extra}&unique={unique}", ssl=False) as response:
try:
result = int(await response.text())
if result == 0:
warning(f"[S64:{steamid}] VIP as not be added, maybe permition already exists")
return 99
elif result > 0:
info(f"[S64:{steamid}] VIP has be added!")
return 100
elif result < 0:
info(f"[S64:{steamid}] VIP has be extends!")
return 101
except:
error(f"[S64:{steamid}] Backend returned error")
traceback.print_exc()
return False
return False
async def prices(self, exit_if_error = False):
async with aiohttp.ClientSession() as session:
try:
async with session.get(self.vip_url) as response:
return await response.json()
except:
traceback.print_exc()
if exit_if_error:
error("Cannot fetch prices")
sys.exit(200)