import aiodocker import asyncio import aiohttp from time import time import os, sys, traceback class Client: def __init__(self): self.endpoint = os.getenv("ENDPOINT","") self.gateway = os.getenv("GATEWAY", "api/docker") self.secretkey = os.getenv("SECRET_KEY","") self.docker_url = os.getenv("DOCKER_URL", "") ################################################# self.url = self.endpoint + "/" + self.gateway self.docker = aiodocker.Docker(self.docker_url) self.servers = [] async def loop(self, timeout = 5): while 1: try: t = [] for server in self.servers: t.append(self.update_stats(server)) await asyncio.gather(*t) except Exception as e: print(e) traceback.print_exc() except KeyboardInterrupt: print("Exit") sys.exit(0) finally: await asyncio.sleep(timeout) async def request_srvs(self): try: async with aiohttp.ClientSession(cookies={"secretkey":self.secretkey}) as session: async with session.get(self.url, ssl = False) as response: self.servers = await response.json() return self.servers except: traceback.print_exc() return [] async def update_stats(self, sc): srv = sc['srv'] dc = sc['container'] try: stats = await self.request_stats(dc) except: stats = None print(f"Cannot get container stats on: {srv}") try: if stats != None: await self.send_stats(srv, stats) else: await self.down_stats(srv) except: print(f"Cannot update information on: {srv}") traceback.print_exc() async def request_stats(self, container_name): c = await self.docker.containers.get(container_name) s = await c.stats(stream=False) s = s[0] cpu_delta = s['cpu_stats']['cpu_usage']['total_usage'] - s['precpu_stats']['cpu_usage']['total_usage'] system_cpu_delta = s['cpu_stats']['system_cpu_usage'] - s['precpu_stats']['system_cpu_usage'] used_memory = s['memory_stats']['usage']# - s['memory_stats']['stats']['cache'] available_memory = s['memory_stats']['limit'] b = { "cpu":{ "percent": round((cpu_delta / system_cpu_delta) * s['cpu_stats']['online_cpus'] * 100.0, 2) }, "mem":{ "percent":round((used_memory / available_memory) * 100.0, 2), "usage":round(used_memory, 2), "limit":round(available_memory, 2) }, "net":{ "input":round(s["networks"]['eth0']['rx_bytes'], 2), "output":round(s["networks"]['eth0']['tx_bytes'], 2) }, "io":{ "input":round(s["blkio_stats"]['io_service_bytes_recursive'][0]['value'], 2), "output":round(s["blkio_stats"]['io_service_bytes_recursive'][1]['value'], 2) }, "utime":time() } return b async def send_stats(self, srv, stats): async with aiohttp.ClientSession(cookies={"secretkey":self.secretkey}) as session: async with session.post(self.url + "/" + srv, ssl=False, json=stats) as response: return await response.text() async def down_stats(self, srv): async with aiohttp.ClientSession(cookies={"secretkey":self.secretkey}) as session: async with session.delete(self.url + "/" + srv, ssl=False) as response: return await response.text() if __name__ == "__main__": c = Client() loop = asyncio.get_event_loop() print("Pre setup stage, get servers list...") try_count = 5 while try_count: try_count -= 1 if try_count == 0: print("Exit cannot get backend") sys.exit(1) loop.run_until_complete(c.request_srvs()) if c.servers.__len__() == 0: print("Backend not respond, or server list not setup! Try again after 5 seconds...") loop.run_until_complete(asyncio.sleep(5)) else: break print(f"Found {c.servers.__len__()} servers!") loop.run_until_complete(c.loop(int(os.getenv("TIMEOUT", "5"))))