"""Collect system statistics and stream them to a serial-attached display.

Values are gathered in background threads (clock, psutil metrics and
LibreHardwareMonitor sensors) and substituted into user supplied line
templates, which are then written to a serial port.
"""
from datetime import datetime
from threading import Thread
from time import sleep
import traceback

import psutil


def bytes2human(size, header="", buf=1000, txt=""):
    """Format *size* as a right-aligned 3-character number plus a unit letter.

    Args:
        size: Amount to format (bytes).
        header: Text placed before the number.
        buf: Divisor between two consecutive units (1000 by default).
        txt: Text placed after the unit letter.

    Returns:
        A string such as ``" 12K"``, or ``"BIG"`` when the value does not fit
        into the terabyte range.
    """
    for unit in "bKMGT":
        if size < buf:
            return "{}{:>3}{}{}".format(header, int(size), unit, txt)
        size = size / buf
    return "BIG"


class Datetime:
    """Clock/date provider with an optionally blinking hour/minute separator."""

    def __init__(self, blinkig_seconds=True):
        """Store the blink setting.

        Args:
            blinkig_seconds: When true, every second call of :meth:`time`
                replaces the separator with a space.
        """
        self.blinkig_seconds = blinkig_seconds
        self.char_seconds = ":"
        self.iterrate_seconds = False

    def time(self):
        """Return the current time as ``HH<sep>MM`` and toggle the blink state."""
        self.iterrate_seconds = not self.iterrate_seconds
        blank = self.blinkig_seconds and self.iterrate_seconds
        separator = " " if blank else self.char_seconds
        return "{:5}".format(datetime.now().strftime("%H" + separator + "%M"))

    def date(self):
        """Return the current date as ``DDMM``."""
        return "{:4}".format(datetime.now().strftime("%d%m"))


class PsUtil:
    """Formatting helpers around :mod:`psutil` metrics."""

    @staticmethod
    def cpu(header="", tail="%"):
        """Return the CPU load, capped at 99 so it always fits two characters."""
        load = min(int(psutil.cpu_percent()), 99)
        return "{}{:>2}{}".format(header, load, tail)

    @staticmethod
    def mem(header="", tail="", round_how_much=1):
        """Return the used memory in units of 10**9 bytes.

        Args:
            header: Text placed before the number.
            tail: Text placed after the number.
            round_how_much: Number of decimals kept by ``round``.
        """
        used = psutil.virtual_memory().used / 1000000000
        return "{}{:>4}{}".format(header, round(used, round_how_much), tail)

    @staticmethod
    def network(sleep_time=1.0, eth="Ethernet"):
        """Measure the throughput of one network interface.

        The interface counters are sampled twice, *sleep_time* seconds apart,
        so this call blocks for that long.

        Args:
            sleep_time: Seconds between the two samples.
            eth: Interface name as reported by ``psutil.net_io_counters``.

        Returns:
            ``"<upload> <download>"`` formatted with :func:`bytes2human`, or
            an empty string when the rates cannot be computed (for example an
            unknown interface name).
        """
        first = psutil.net_io_counters(pernic=True)
        sleep(sleep_time)
        second = psutil.net_io_counters(pernic=True)
        try:
            uploaded = (second[eth].bytes_sent - first[eth].bytes_sent) / sleep_time
            downloaded = (second[eth].bytes_recv - first[eth].bytes_recv) / sleep_time
        except Exception:
            # Best effort: report the problem and show nothing for this field.
            traceback.print_exc()
            return ""
        return f"{bytes2human(uploaded, '')} {bytes2human(downloaded, '')}"


class LibreHardWareMonitor:
    """Sensor reader backed by ``LibreHardwareMonitorLib.dll`` (via pythonnet)."""

    def __init__(self) -> None:
        """Load the DLL located next to this file and open the computer."""
        import clr
        import os

        clr.AddReference(
            os.path.join(
                os.path.dirname(os.path.abspath(__file__)),
                "LibreHardwareMonitorLib.dll",
            )
        )
        self.buildPc()
        self.data = {}
        self.updateDone = False

    def buildPc(self):
        """Create the ``Computer`` object with the hardware groups enabled."""
        # Imported here because the namespace only exists after AddReference.
        from LibreHardwareMonitor.Hardware import Computer

        self.c = Computer()
        self.c.IsBatteryEnabled = True
        self.c.IsControllerEnabled = True
        self.c.IsCpuEnabled = True
        self.c.IsGpuEnabled = True
        self.c.IsMemoryEnabled = True
        self.c.IsPsuEnabled = True
        self.c.Open()

    def discoverPc(self):
        """Print every hardware, sub-hardware and sensor that was found."""
        for hw in self.c.Hardware:
            print("hw:", hw.Identifier, hw.Name)
            for shw in hw.SubHardware:
                print("shw:", shw.Identifier, shw.Name)
                for sen in shw.Sensors:
                    print("sensor:", sen.Identifier, sen.Name, sen.Value)
            for sen in hw.Sensors:
                print("sensor:", sen.Identifier, sen.Name, sen.Value)

    def update(self):
        """Refresh all sensors and return them keyed by sensor identifier.

        Returns:
            The ``self.data`` dict mapping ``str(sensor.Identifier)`` to the
            sensor value.
        """
        for hw in self.c.Hardware:
            # Refresh before reading, otherwise the values lag one cycle
            # behind (and the very first pass would publish stale readings).
            hw.Update()
            for shw in hw.SubHardware:
                shw.Update()
                for sen in shw.Sensors:
                    self.data[str(sen.Identifier)] = sen.Value
            for sen in hw.Sensors:
                self.data[str(sen.Identifier)] = sen.Value
        self.updateDone = True
        return self.data


def avg_from_dict(key, dic):
    """Return the average of all values whose key contains *key*.

    Args:
        key: Substring looked for in every key of *dic*.
        dic: Mapping of names to numeric values; ``None`` values are skipped.

    Returns:
        The arithmetic mean of the matching values, or ``0.0`` when nothing
        matches.
    """
    matching = [v for k, v in dic.items() if key in k and v is not None]
    if not matching:
        return 0.0
    return sum(matching) / len(matching)


# Disabled FPS counter, kept for reference (requires the fps_inspector package).
"""
from fps_inspector import start_fliprate_recording, get_fliprate_count, get_last_fliprates
class FPSCounter:
    def __init__(self, format = "{:>3}") -> None:
        self.listen = False
        self.prev_value = 0
        self.format = format
    def update(self):
        if not self.listen:
            self.listen = True
            start_fliprate_recording()
            return -1
        else:
            count = get_fliprate_count()
            r_count = count - self.prev_value
            self.prev_value = count
            return self.format.format(r_count)
"""


class ThreadedCollector:
    """Run value producers in daemon threads and keep their latest results."""

    def __init__(self):
        # Per-instance state; class-level containers would be shared between
        # every collector instance.
        self.checked_values = {}
        self.checkers = []

    def add_checker(self, func, name, sleep_time=1, *args, **kwargs):
        """Register *func* to be polled forever in its own daemon thread.

        Args:
            func: Callable producing a value. A returned ``dict`` is merged
                into ``checked_values``; anything else is stored under *name*.
            name: Key of the stored value, also used as the thread name.
            sleep_time: Seconds to wait between two calls.
            *args: Positional arguments forwarded to *func*.
            **kwargs: Keyword arguments forwarded to *func*.
        """

        def updater():
            while True:
                try:
                    val = func(*args, **kwargs)
                    if isinstance(val, dict):
                        self.checked_values.update(val)
                    else:
                        self.checked_values[name] = val
                except Exception as te:
                    traceback.print_exc()
                    print(f"Cannot update: {name}, err: {te}")
                finally:
                    sleep(sleep_time)

        self.checkers.append(Thread(target=updater, name=name, daemon=True))

    def start(self):
        """Start every registered checker thread."""
        for t in self.checkers:
            t.start()

    def printValues(self, grab=None):
        """Print all collected values (*grab* is accepted but unused)."""
        for k, v in list(self.checked_values.items()):
            print(k, v)


if __name__ == "__main__":
    import argparse
    import sys

    def _flatten(groups):
        """Merge the per-flag lists produced by ``action="append"``."""
        return [item for group in groups or [] for item in group]

    parser = argparse.ArgumentParser()
    parser.add_argument("--com", type=str, help="windows: COM1, lin: ttyusb0")
    parser.add_argument("--com-speed", type=int, default=9600, help="9600, ... etc")
    parser.add_argument("--max-w", type=int, default=20, help="max charest in line")
    parser.add_argument("--max-h", type=int, default=2, help="max lines")
    # "append" lets the flag be repeated as shown in the examples; a literal
    # percent sign has to be doubled because argparse %-formats help texts.
    parser.add_argument("--lines", type=str, nargs="+", action="append", help='format lines, example: --lines "/intelcpu/0/load/0%% /intelcpu/0/temperature/0C /mem_usage" --lines "/time /date /network"')
    parser.add_argument("--refresh", type=int, default=1, help="Update display every N-seconds")
    parser.add_argument("--dev", action="store_true", default=False, help="Enable dev mode")
    parser.add_argument("--lhwm-discover", action="store_true", default=False, help="show found sensors with names")
    parser.add_argument("--format-overrides", nargs="+", action="append", help='formated text with python str.format, example: --format-overrides "/gpu-nvidia/0/load/0,{:>2}" --format-overrides "/gpu-nvidia/0/temperature/2,{:>2}"')
    args = parser.parse_args()
    args.lines = _flatten(args.lines)
    args.format_overrides = _flatten(args.format_overrides)

    if args.dev:
        args.com = args.com or "COM6"
        args.format_overrides = args.format_overrides or [
            "/gpu-nvidia/0/load/0,{:>2}",
            "/intelcpu/0/temperature/0,{:>2}",
            "/gpu-nvidia/0/temperature/2,{:>2}",
            "/intelcpu/0/load/0,{:>2}",
        ]
        first_line = "/intelcpu/0/load/0% /intelcpu/0/temperature/0C /mem_usage /gpu-nvidia/0/load/0% /gpu-nvidia/0/temperature/2C"
        second_line = "/time /date /network"
        args.lines = [first_line, second_line]

    if not args.lhwm_discover:
        # Sensor discovery needs neither a port nor templates.
        if not args.com:
            parser.error("--com is required (or use --dev)")
        if not args.lines:
            parser.error("--lines is required (or use --dev)")

    for line in args.lines:
        print(line)

    overrides = {}
    for override in args.format_overrides:
        # Split on the first comma only so the format spec may contain commas.
        k, sep, v = override.partition(",")
        if not sep or not k:
            print(f"Cannot parse override {override}")
            sys.exit(1)
        overrides[k] = v
        print(f"Found override {k} to {v}")

    lhwm = LibreHardWareMonitor()
    lhwm.discoverPc()
    if args.lhwm_discover:
        sys.exit(0)
    #fps = FPSCounter()
    datetime_cls = Datetime()
    ######################################################
    tc = ThreadedCollector()
    check_contains = "\n".join(args.lines)
    if "/time" in check_contains:
        tc.add_checker(datetime_cls.time, "/time")
    if "/date" in check_contains:
        tc.add_checker(datetime_cls.date, "/date")
    ######################################################
    if "/cpu_percent" in check_contains:
        tc.add_checker(PsUtil.cpu, "/cpu_percent", header="")
    if "/mem_usage" in check_contains:
        tc.add_checker(PsUtil.mem, "/mem_usage")
    if "/network" in check_contains:
        tc.add_checker(PsUtil.network, "/network")
    ######################################################
    tc.add_checker(lhwm.update, "lhwm")
    ######################################################
    #tc.add_checker(fps.update, "/fps", sleep_time=0.1)
    tc.start()

    import serial

    def formatLine(line, limit=20):
        """Replace every known placeholder in *line* and cut it to *limit*."""
        new_line = line
        # Work on a snapshot (collector threads keep inserting keys) and
        # replace longer keys first so a key that is a prefix of another one
        # cannot corrupt the longer placeholder.
        snapshot = sorted(
            tc.checked_values.items(),
            key=lambda item: len(item[0]),
            reverse=True,
        )
        for k, v in snapshot:
            if k not in new_line:
                continue
            if v is None:
                new_value = "0"
            elif isinstance(v, float):
                new_value = str(int(v))
            else:
                new_value = str(v)
            if k in overrides:
                new_value = overrides[k].format(new_value)
            new_line = new_line.replace(k, new_value)
        return new_line[:limit]

    def send2com(com_port, string, clear_screen=True):
        """Write *string* to the port, preceded by ``\\r`` when requested."""
        if clear_screen:
            com_port.write(bytes("\r", "utf8"))
        com_port.write(bytes(string, "utf8"))

    str_to_format = "".join("{:^" + str(args.max_w) + "}" for _ in range(args.max_h))

    def render(*rows):
        """Center *rows* into the display layout, padding missing rows."""
        padded = list(rows[:args.max_h])
        padded += [""] * (args.max_h - len(padded))
        return str_to_format.format(*padded)

    print(str_to_format)
    print(args.lines)

    step = 0.1
    try:
        while True:
            try:
                with serial.Serial(args.com, args.com_speed) as com_port:
                    try:
                        init_timer = 0.0
                        while not lhwm.updateDone:
                            send2com(com_port, render("Wait LHWM sensors", f"{init_timer:.1f} sec..."))
                            init_timer += step
                            sleep(step)
                        while True:
                            send2com(com_port, render(*[formatLine(arg, args.max_w) for arg in args.lines]))
                            sleep(args.refresh)
                    except KeyboardInterrupt:
                        print("Ctrl+C to exit")
                        send2com(com_port, render("Bye...", ":)"))
                        sys.exit(0)
            except Exception:
                # Port missing or write failed: report, wait, then reopen.
                print("Critical error")
                traceback.print_exc()
                sleep(1)
    except KeyboardInterrupt:
        # Ctrl+C while the port is not open (e.g. during the retry delay).
        sys.exit(0)