You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 

314 lines
10 KiB

from datetime import datetime
import psutil
from time import sleep
import traceback
from threading import Thread
def bytes2human(size, header = "", buf = 1000, txt = ""):
    """Render a byte count as a short fixed-width string with a unit suffix.

    The value is divided by ``buf`` until it drops below ``buf``; the integer
    part is right-aligned in three columns and followed by the matching unit
    letter (``b``, ``K``, ``M``, ``G`` or ``T``).

    Args:
        size: Amount to format.
        header: Text placed before the number.
        buf: Size of one unit step (1000 by default).
        txt: Text placed after the unit letter.

    Returns:
        The formatted string, or the literal ``"BIG"`` when the value does
        not fit even in the ``T`` unit.
    """
    for unit in "bKMGT":
        if size < buf:
            return "{}{:>3}{}{}".format(header, int(size), unit, txt)
        # Still too large for this unit: step up to the next one.
        size = size / buf
    return "BIG"
class Datetime:
    """Produce compact clock and date strings from the local system time."""

    def __init__(self, blinkig_seconds = True):
        # When true, the hour/minute separator alternates between a space and
        # ``char_seconds`` on successive ``time()`` calls.
        self.blinkig_seconds = blinkig_seconds
        self.char_seconds = ":"
        # Toggled on every ``time()`` call to drive the blinking separator.
        self.iterrate_seconds = False

    def time(self):
        """Return the current time as ``HH<sep>MM``, at least 5 characters wide."""
        self.iterrate_seconds = not self.iterrate_seconds
        if self.blinkig_seconds and self.iterrate_seconds:
            separator = " "
        else:
            separator = self.char_seconds
        stamp = datetime.now().strftime("%H" + separator + "%M")
        return f"{stamp:5}"

    def date(self):
        """Return the current date as ``DDMM``, at least 4 characters wide."""
        today = datetime.now().strftime("%d%m")
        return f"{today:4}"
class PsUtil:
    """Static helpers that format psutil readings as short display strings."""

    @staticmethod
    def cpu(header = "", tail = "%"):
        """Return the CPU load as a two-column integer percentage.

        The value is capped at 99 so it never needs a third column.

        Args:
            header: Text placed before the number.
            tail: Text placed after the number.
        """
        load = min(int(psutil.cpu_percent()), 99)
        return "{}{:>2}{}".format(header, load, tail)

    @staticmethod
    def mem(header = "", tail = "", round_how_much = 1):
        """Return used memory in units of 10**9 bytes, right-aligned in 4 columns.

        Args:
            header: Text placed before the number.
            tail: Text placed after the number.
            round_how_much: Number of decimal places passed to ``round``.
        """
        # ``.used`` is the same field the original read positionally as [3].
        used = psutil.virtual_memory().used / 1000000000
        return "{}{:>4}{}".format(header, round(used, round_how_much), tail)

    @staticmethod
    def network(sleep_time = 1.0, eth = "Ethernet"):
        """Return ``"<upload> <download>"`` rates for one network interface.

        Takes two per-interface counter snapshots ``sleep_time`` seconds apart
        (this call blocks for that long) and formats the per-second difference
        with ``bytes2human``.

        Args:
            sleep_time: Seconds to wait between the two snapshots.
            eth: Name of the interface as reported by psutil.

        Returns:
            The formatted rates, or an empty string when the interface is not
            present in a snapshot or ``sleep_time`` is zero.
        """
        first = psutil.net_io_counters(pernic=True)
        sleep(sleep_time)
        second = psutil.net_io_counters(pernic=True)
        try:
            uploaded = (second[eth].bytes_sent - first[eth].bytes_sent) / sleep_time
            downloaded = (second[eth].bytes_recv - first[eth].bytes_recv) / sleep_time
        except (KeyError, ZeroDivisionError):
            # Unknown interface name or zero interval: report and show nothing
            # instead of swallowing every possible exception.
            traceback.print_exc()
            return ""
        return f"{bytes2human(uploaded, '')} {bytes2human(downloaded, '')}"
class LibreHardWareMonitor:
    """Thin wrapper around the LibreHardwareMonitor .NET library.

    Loads ``LibreHardwareMonitorLib.dll`` from the directory of this file,
    opens a ``Computer`` with several hardware groups enabled and exposes the
    sensor values as a flat ``{identifier: value}`` dictionary.
    """

    def __init__(self) -> None:
        # Imported lazily so the module can be imported without pythonnet.
        import clr
        import os
        clr.AddReference(os.path.join(os.path.dirname(os.path.abspath(__file__)), "LibreHardwareMonitorLib.dll"))
        # Initialise state before opening the hardware so the object is
        # consistent even if ``buildPc`` raises part-way.
        self.data = {}
        self.updateDone = False
        self.buildPc()

    def buildPc(self):
        """Create and open the ``Computer`` object with the monitored groups enabled."""
        from LibreHardwareMonitor.Hardware import Computer
        self.c = Computer()
        self.c.IsBatteryEnabled = True
        self.c.IsControllerEnabled = True
        self.c.IsCpuEnabled = True
        self.c.IsGpuEnabled = True
        self.c.IsMemoryEnabled = True
        self.c.IsPsuEnabled = True
        self.c.Open()

    def discoverPc(self):
        """Print every hardware item, sub-hardware item and sensor that was found."""
        for hw in self.c.Hardware:
            print("hw:", hw.Identifier, hw.Name)
            for shw in hw.SubHardware:
                print("shw:", shw.Identifier, shw.Name)
                for sen in shw.Sensors:
                    print("sensor:", sen.Identifier, sen.Name, sen.Value)
            for sen in hw.Sensors:
                print("sensor:", sen.Identifier, sen.Name, sen.Value)

    def update(self):
        """Refresh all hardware and return the current sensor readings.

        Each hardware item and each of its sub-hardware items is refreshed
        *before* its sensors are read, so the returned values belong to this
        call rather than to the previous one.

        Returns:
            The ``self.data`` dictionary mapping ``str(sensor.Identifier)`` to
            the sensor value. The same dictionary object is reused and
            updated on every call.
        """
        for hw in self.c.Hardware:
            hw.Update()
            for shw in hw.SubHardware:
                # Sub-hardware is refreshed explicitly here.
                shw.Update()
                for sen in shw.Sensors:
                    self.data[str(sen.Identifier)] = sen.Value
            for sen in hw.Sensors:
                self.data[str(sen.Identifier)] = sen.Value
        # Signals to the display loop that at least one full pass finished.
        self.updateDone = True
        return self.data
def avg_from_dict(key, dic):
    """Average the values of every entry whose key contains ``key``.

    Args:
        key: Substring to look for in the dictionary keys.
        dic: Mapping of string keys to numeric values; ``None`` values are
            skipped so they do not break the sum.

    Returns:
        The arithmetic mean of the matching values as a float, or ``0.0``
        when nothing matches.
    """
    matching = [v for k, v in dic.items() if key in k and v is not None]
    if not matching:
        return 0.0
    return sum(matching) / len(matching)
"""
from fps_inspector import start_fliprate_recording, get_fliprate_count, get_last_fliprates
class FPSCounter:
def __init__(self, format = "{:>3}") -> None:
self.listen = False
self.prev_value = 0
self.format = format
def update(self):
if not self.listen:
self.listen = True
start_fliprate_recording()
return -1
else:
count = get_fliprate_count()
r_count = count - self.prev_value
self.prev_value = count
return self.format.format(r_count)
"""
class ThreadedCollector:
    """Run polling functions in daemon threads and collect their latest results.

    Each registered checker is called in its own endless loop. Results are
    stored in ``checked_values``, which other code reads directly.
    """

    def __init__(self):
        # Per-instance state; class-level containers would be shared by every
        # collector that is ever created.
        self.checked_values = {}
        self.checkers = []

    def add_checker(self, func, name, sleep_time = 1, *args, **kwargs):
        """Register ``func`` to be polled in a background thread.

        Args:
            func: Callable to poll. If it returns a dict, the dict is merged
                into ``checked_values``; any other result is stored under
                ``name``.
            name: Result key and thread name.
            sleep_time: Seconds to pause after every call, successful or not.
            *args: Positional arguments forwarded to ``func`` on every call.
            **kwargs: Keyword arguments forwarded to ``func`` on every call.
        """
        def updater():
            # ``args``/``kwargs`` are captured from add_checker so the extra
            # arguments actually reach ``func``.
            while True:
                try:
                    val = func(*args, **kwargs)
                    if isinstance(val, dict):
                        self.checked_values.update(val)
                    else:
                        self.checked_values[name] = val
                except Exception as te:
                    # Best effort: report the failure and try again next round.
                    traceback.print_exc()
                    print(f"Cannot update: {name}, err: {te}")
                finally:
                    sleep(sleep_time)
        self.checkers.append(Thread(target=updater, name=name, daemon=True))

    def start(self):
        """Start every registered checker thread."""
        for t in self.checkers:
            t.start()

    def printValues(self, grab = None):
        """Print every collected key/value pair (``grab`` is currently unused)."""
        for k, v in self.checked_values.items():
            print(k, v)
if __name__ == "__main__":
    # Script entry point: parse the command line, start the background
    # collectors and keep pushing formatted lines to a serial display.
    import argparse
    import sys

    parser = argparse.ArgumentParser()
    parser.add_argument("--com", type=str, help="windows: COM1, lin: ttyusb0")
    parser.add_argument("--com-speed", type=int, help="9600, ... etc")
    parser.add_argument("--max-w", type=int, default=20, help = "max charest in line")
    parser.add_argument("--max-h", type=int, default=2, help = "max lines")
    # action="append" keeps every occurrence of the flag, so the documented
    # form `--lines "a" --lines "b"` yields both lines instead of only the
    # last one. '%' is doubled because argparse %-formats help strings.
    parser.add_argument("--lines", type=str, nargs="+", action="append", help='format lines, example: --lines "/intelcpu/0/load/0%% /intelcpu/0/temperature/0C /mem_usage" --lines "/time /date /network"')
    parser.add_argument("--refresh", type=int, default=1, help = "Update display every N-seconds")
    parser.add_argument("--dev", action="store_true", default=False, help="Enable dev mode")
    # The original passed help= twice here, which is a SyntaxError.
    parser.add_argument("--lhwm-discover", action="store_true", default=False, help="show found sensors with names")
    parser.add_argument("--format-overrides", nargs="+", action="append", help='formated text with python str.format, example: --format-overrides "/gpu-nvidia/0/load/0,{:>2}" --format-overrides "/gpu-nvidia/0/temperature/2,{:>2}"')
    args = parser.parse_args()

    # Flatten the list-of-lists produced by action="append" + nargs="+";
    # an option that was never given becomes an empty list.
    args.lines = [line for group in (args.lines or []) for line in group]
    args.format_overrides = [item for group in (args.format_overrides or []) for item in group]

    if args.dev:
        # Dev mode fills in defaults for anything not given and always uses
        # the built-in two-line layout.
        args.com = "COM6" if not args.com else args.com
        args.com_speed = 9600 if not args.com_speed else args.com_speed
        args.format_overrides = ["/gpu-nvidia/0/load/0,{:>2}", "/intelcpu/0/temperature/0,{:>2}", "/gpu-nvidia/0/temperature/2,{:>2}", "/intelcpu/0/load/0,{:>2}"] if not args.format_overrides else args.format_overrides
        first_line = "/intelcpu/0/load/0% /intelcpu/0/temperature/0C /mem_usage /gpu-nvidia/0/load/0% /gpu-nvidia/0/temperature/2C"
        second_line = "/time /date /network"
        args.lines = [first_line, second_line]
    for line in args.lines:
        print(line)

    # Each override is "<sensor key>,<str.format spec>"; split only on the
    # first comma so the format spec itself may contain commas.
    overrides = {}
    for override in args.format_overrides:
        try:
            k, v = override.split(",", 1)
        except ValueError:
            print(f"Cannot parse override {override}")
            sys.exit(1)
        overrides[k] = v
        print(f"Found override {k} to {v}")

    lhwm = LibreHardWareMonitor()
    lhwm.discoverPc()
    if args.lhwm_discover:
        sys.exit(0)

    # Everything below needs a layout and a serial port.
    if not args.lines:
        parser.error("--lines is required (or use --dev)")
    if not args.com:
        parser.error("--com is required (or use --dev)")
    if not args.com_speed:
        args.com_speed = 9600  # same value dev mode falls back to

    #fps = FPSCounter()
    datetime_cls = Datetime()
    ######################################################
    # Only start the collectors whose placeholders appear in the layout.
    tc = ThreadedCollector()
    check_contains = "\n".join(args.lines)
    if "/time" in check_contains:
        tc.add_checker(datetime_cls.time, "/time")
    if "/date" in check_contains:
        tc.add_checker(datetime_cls.date, "/date")
    ######################################################
    if "/cpu_percent" in check_contains:
        tc.add_checker(PsUtil.cpu, "/cpu_percent", header = "")
    if "/mem_usage" in check_contains:
        tc.add_checker(PsUtil.mem, "/mem_usage")
    if "/network" in check_contains:
        tc.add_checker(PsUtil.network, "/network")
    ######################################################
    tc.add_checker(lhwm.update, "lhwm")
    ######################################################
    #tc.add_checker(fps.update, "/fps", sleep_time=0.1)
    tc.start()

    import serial

    def formatLine(line, limit = 20):
        """Replace every known placeholder in ``line`` and cut it to ``limit`` chars."""
        new_line = line
        # sorted() takes a snapshot, so the loop is not iterating the dict
        # that collector threads keep updating. Longest keys go first, so a
        # key that is a prefix of another (".../load/1" vs ".../load/10")
        # cannot corrupt the longer placeholder.
        snapshot = sorted(tc.checked_values.items(), key=lambda kv: len(kv[0]), reverse=True)
        for k, v in snapshot:
            if k not in new_line:
                continue
            if v is None:
                new_value = "0"
            elif isinstance(v, str):
                new_value = v
            elif isinstance(v, float):
                new_value = str(int(v))
            else:
                new_value = str(v)
            if k in overrides:
                new_value = overrides[k].format(new_value)
            new_line = new_line.replace(k, new_value)
        return new_line[:limit]

    def send2com(com_port, string, clear_screen = True):
        """Write ``string`` to the port, preceded by a carriage return if requested."""
        if clear_screen:
            com_port.write(bytes("\r", "utf8"))
        com_port.write(bytes(string, "utf8"))

    INIT = 0
    WORKING = 1
    TODIE = 2
    CURRENT_STATUS = INIT
    #import signal
    #def signal_handler(sig, frame):
    #    CURRENT_STATUS = TODIE
    #signal.signal(signal.SIGINT, signal_handler)

    # One centred, fixed-width field per display row.
    str_to_format = "".join(["{:^"+str(args.max_w)+"}" for i in range(0, args.max_h)])
    print(str_to_format)
    print(args.lines)

    def render_rows(*rows):
        """Fill the screen template, padding or dropping rows to fit ``--max-h``."""
        padded = (list(rows) + [""] * args.max_h)[:args.max_h]
        return str_to_format.format(*padded)

    while True:
        try:
            with serial.Serial(args.com, args.com_speed) as com_port:
                try:
                    # Show a wait screen until the first sensor pass is done.
                    init_timer = 0.0
                    step = 0.1
                    while not lhwm.updateDone:
                        # One decimal place keeps float accumulation noise
                        # (0.30000000000000004) off the display.
                        send2com(com_port, render_rows("Wait LHWM sensors", f"{init_timer:.1f} sec..."))
                        init_timer += step
                        sleep(step)
                    CURRENT_STATUS = WORKING
                    while CURRENT_STATUS != TODIE:
                        send2com(com_port, render_rows(*[formatLine(arg, args.max_w) for arg in args.lines]))
                        sleep(args.refresh)
                except KeyboardInterrupt:
                    CURRENT_STATUS = TODIE
                    print("Ctrl+C to exit")
                    try:
                        send2com(com_port, render_rows("Bye...", ":)"))
                    except Exception:
                        # The goodbye message is best effort; still exit.
                        traceback.print_exc()
                    sys.exit(0)
        except KeyboardInterrupt:
            # Ctrl+C while the port is closed or being (re)opened.
            sys.exit(0)
        except Exception:
            # Port or write failure: report it and retry after a short pause.
            print("Critical error")
            traceback.print_exc()
            sleep(1)