You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
314 lines
10 KiB
314 lines
10 KiB
from datetime import datetime
|
|
import psutil
|
|
from time import sleep
|
|
import traceback
|
|
from threading import Thread
|
|
|
|
def bytes2human(size, header="", buf=1000, txt=""):
    """Render a byte count as a short fixed-width string such as ``' 12K'``.

    ``size`` is repeatedly divided by ``buf`` until it is smaller than
    ``buf``; the integer part is then printed right-aligned in three
    columns, followed by the unit letter (b, K, M, G or T) and wrapped
    between ``header`` and ``txt``.

    Values that are still >= ``buf`` after reaching the T unit produce the
    bare string ``"BIG"`` (``header`` and ``txt`` are not applied).
    """
    # One template per unit, smallest unit first.
    templates = (
        "{}{:>3}b{}",
        "{}{:>3}K{}",
        "{}{:>3}M{}",
        "{}{:>3}G{}",
        "{}{:>3}T{}",
    )

    scaled = size
    for template in templates:
        if scaled < buf:
            return template.format(header, int(scaled), txt)
        scaled = scaled / buf

    return "BIG"
|
|
|
|
class Datetime:
    """Formats the current local time and date as short fixed-width strings.

    When ``blinkig_seconds`` is true, the separator between hours and
    minutes alternates between a space and ``char_seconds`` on successive
    ``time()`` calls, which makes it appear to blink.
    """

    def __init__(self, blinkig_seconds=True):
        self.blinkig_seconds = blinkig_seconds
        # Separator shown between hours and minutes.
        self.char_seconds = ":"
        # Flipped on every time() call; True means "show a blank separator".
        self.iterrate_seconds = False

    def time(self):
        """Return the current time as ``HH<sep>MM``, padded to 5 characters."""
        self.iterrate_seconds = not self.iterrate_seconds

        if self.blinkig_seconds and self.iterrate_seconds:
            separator = " "
        else:
            separator = self.char_seconds

        stamp = datetime.now().strftime("%H" + separator + "%M")
        return "{:5}".format(stamp)

    def date(self):
        """Return the current date as ``DDMM``, padded to 4 characters."""
        today = datetime.now().strftime("%d%m")
        return "{:4}".format(today)
|
|
|
|
class PsUtil:
    """Static helpers that format ``psutil`` readings as short display strings."""

    @staticmethod
    def cpu(header="", tail="%"):
        """Return the CPU load as a 2-wide integer wrapped in ``header``/``tail``.

        The value is capped at 99 so it never needs a third column.
        """
        load = int(psutil.cpu_percent())
        return "{}{:>2}{}".format(header, min(load, 99), tail)

    @staticmethod
    def mem(header="", tail="", round_how_much=1):
        """Return used memory in units of 10**9 bytes, right-aligned in 4 columns.

        ``round_how_much`` is the number of decimals passed to ``round``.
        """
        # ``used`` is the field the original code read via index 3.
        used = psutil.virtual_memory().used / 1000000000
        return "{}{:>4}{}".format(header, round(used, round_how_much), tail)

    @staticmethod
    def network(sleep_time=1.0, eth="Ethernet"):
        """Return ``"<upload> <download>"`` rates for the interface ``eth``.

        Two per-NIC counter snapshots are taken ``sleep_time`` seconds apart
        and the byte deltas are scaled to a per-second rate. This call blocks
        for ``sleep_time`` seconds.

        Returns an empty string (after printing the traceback) when the rate
        cannot be computed, e.g. ``eth`` is not a known interface name or
        ``sleep_time`` is zero.
        """
        first = psutil.net_io_counters(pernic=True)
        sleep(sleep_time)
        second = psutil.net_io_counters(pernic=True)

        try:
            uploaded = (second[eth].bytes_sent - first[eth].bytes_sent) / sleep_time
            downloaded = (second[eth].bytes_recv - first[eth].bytes_recv) / sleep_time
        except Exception:
            # Best effort: report the problem and show nothing for this field.
            # (Narrowed from a bare ``except`` so SystemExit/KeyboardInterrupt
            # are no longer swallowed.)
            traceback.print_exc()
            return ""

        return f"{bytes2human(uploaded, '')} {bytes2human(downloaded, '')}"
|
|
|
|
class LibreHardWareMonitor:
    """Reads sensor values from LibreHardwareMonitorLib.dll through ``clr``.

    The DLL is loaded from the directory containing this file. Sensor values
    are collected into ``self.data`` keyed by ``str(sensor.Identifier)``.
    """

    def __init__(self) -> None:
        # Imported lazily so the module can be imported without pythonnet.
        import clr
        import os

        clr.AddReference(os.path.join(os.path.dirname(os.path.abspath(__file__)), "LibreHardwareMonitorLib.dll"))

        # Latest known value per sensor identifier.
        self.data = {}
        # Becomes True once update() has completed at least one full pass.
        self.updateDone = False
        self.buildPc()

    def buildPc(self):
        """Create and open the ``Computer`` object with the wanted hardware groups enabled."""
        from LibreHardwareMonitor.Hardware import Computer

        self.c = Computer()
        self.c.IsBatteryEnabled = True
        self.c.IsControllerEnabled = True
        self.c.IsCpuEnabled = True
        self.c.IsGpuEnabled = True
        self.c.IsMemoryEnabled = True
        self.c.IsPsuEnabled = True
        self.c.Open()

    def discoverPc(self):
        """Print every hardware, sub-hardware and sensor with its identifier, name and value."""
        for hw in self.c.Hardware:
            print("hw:", hw.Identifier, hw.Name)
            for shw in hw.SubHardware:
                print("shw:", shw.Identifier, shw.Name)
                for sen in shw.Sensors:
                    print("sensor:", sen.Identifier, sen.Name, sen.Value)
            for sen in hw.Sensors:
                print("sensor:", sen.Identifier, sen.Name, sen.Value)

    def update(self):
        """Refresh all hardware and return the sensor dictionary.

        Each hardware and sub-hardware node is updated *before* its sensors
        are read, so the returned values belong to the current pass rather
        than the previous one.

        Returns:
            ``self.data`` (the same dict object on every call), mapping
            ``str(sensor.Identifier)`` to the sensor's ``Value``.
        """
        for hw in self.c.Hardware:
            hw.Update()

            for shw in hw.SubHardware:
                # Sub-hardware is refreshed explicitly; updating the parent
                # alone is not relied upon to refresh it.
                shw.Update()
                for sen in shw.Sensors:
                    self.data[str(sen.Identifier)] = sen.Value

            for sen in hw.Sensors:
                self.data[str(sen.Identifier)] = sen.Value

        self.updateDone = True
        return self.data
|
|
|
|
def avg_from_dict(key, dic):
    """Return the average of all values in ``dic`` whose key contains ``key``.

    Args:
        key: Substring to look for in each dictionary key.
        dic: Mapping of string keys to numeric values; ``None`` values
            (sensors without a reading) are ignored.

    Returns:
        The arithmetic mean as a float, or ``0.0`` when nothing matches.
    """
    matching = [v for k, v in dic.items() if key in k and v is not None]
    if not matching:
        return 0.0
    return float(sum(matching)) / len(matching)
|
|
|
|
"""
|
|
from fps_inspector import start_fliprate_recording, get_fliprate_count, get_last_fliprates
|
|
class FPSCounter:
|
|
def __init__(self, format = "{:>3}") -> None:
|
|
self.listen = False
|
|
self.prev_value = 0
|
|
self.format = format
|
|
|
|
def update(self):
|
|
if not self.listen:
|
|
self.listen = True
|
|
start_fliprate_recording()
|
|
return -1
|
|
else:
|
|
count = get_fliprate_count()
|
|
r_count = count - self.prev_value
|
|
self.prev_value = count
|
|
return self.format.format(r_count)
|
|
"""
|
|
|
|
class ThreadedCollector:
    """Runs value-producing callables in daemon threads and stores their latest results.

    Results are kept in ``checked_values``. A callable that returns a dict
    has its items merged in; any other return value is stored under the
    checker's ``name``. ``checked_values`` is written by the worker threads
    without a lock, so readers should iterate over a snapshot.
    """

    def __init__(self):
        # Per-instance state (previously class attributes shared by every instance).
        self.checked_values = {}
        self.checkers = []

    def add_checker(self, func, name, sleep_time=1, *args, **kwargs):
        """Register ``func`` to be polled forever in its own daemon thread.

        Args:
            func: Callable producing either a dict (merged into
                ``checked_values``) or a single value (stored under ``name``).
            name: Key for single-value results; also used as the thread name.
            sleep_time: Seconds to wait after every call, successful or not.
            *args, **kwargs: Forwarded to ``func`` on every call.

        The thread is only created here; call ``start()`` to run it.
        """
        def updater(*call_args, **call_kwargs):
            while True:
                try:
                    val = func(*call_args, **call_kwargs)

                    if isinstance(val, dict):
                        self.checked_values.update(val)
                    else:
                        self.checked_values[name] = val
                except Exception as te:
                    # A failing checker must not kill its thread; report and retry.
                    traceback.print_exc()
                    print(f"Cannot update: {name}, err: {te}")
                finally:
                    sleep(sleep_time)

        # args/kwargs must be handed to the thread, otherwise func is always
        # called without the arguments given to add_checker.
        self.checkers.append(Thread(target=updater, name=name, args=args, kwargs=kwargs, daemon=True))

    def start(self):
        """Start every registered checker thread."""
        for t in self.checkers:
            t.start()

    def printValues(self, grab=None):
        """Print every collected key/value pair (``grab`` is accepted but unused)."""
        # Iterate over a snapshot: worker threads may add keys concurrently.
        for k, v in list(self.checked_values.items()):
            print(k, v)
|
|
|
|
if __name__ == "__main__":
    # Command-line entry point: collects sensor values in background threads
    # and periodically writes the formatted lines to a serial display.
    import argparse
    import sys

    import serial

    parser = argparse.ArgumentParser()

    parser.add_argument("--com", type=str, help="windows: COM1, lin: ttyusb0")
    parser.add_argument("--com-speed", type=int, help="9600, ... etc")

    parser.add_argument("--max-w", type=int, default=20, help="max charest in line")
    parser.add_argument("--max-h", type=int, default=2, help="max lines")
    # action="append" together with nargs="+" accepts both `--lines A B` and
    # the repeated form shown in the help text (`--lines A --lines B`).
    # The literal '%' is escaped because argparse %-formats help strings.
    parser.add_argument("--lines", type=str, nargs="+", action="append", help='format lines, example: --lines "/intelcpu/0/load/0%% /intelcpu/0/temperature/0C /mem_usage" --lines "/time /date /network"')

    parser.add_argument("--refresh", type=int, default=1, help="Update display every N-seconds")
    parser.add_argument("--dev", action="store_true", default=False, help="Enable dev mode")
    # The original passed help= twice here, which is a SyntaxError.
    parser.add_argument("--lhwm-discover", action="store_true", default=False, help="show found sensors with names")
    parser.add_argument("--format-overrides", nargs="+", action="append", help='formated text with python str.format, example: --format-overrides "/gpu-nvidia/0/load/0,{:>2}" --format-overrides "/gpu-nvidia/0/temperature/2,{:>2}"')

    args = parser.parse_args()

    # Flatten the list-of-lists produced by action="append" + nargs="+".
    args.lines = [item for group in (args.lines or []) for item in group]
    args.format_overrides = [item for group in (args.format_overrides or []) for item in group]

    if args.dev:
        # Dev mode only fills in what was not given on the command line.
        args.com = args.com or "COM6"
        args.com_speed = args.com_speed or 9600
        if not args.format_overrides:
            args.format_overrides = ["/gpu-nvidia/0/load/0,{:>2}", "/intelcpu/0/temperature/0,{:>2}", "/gpu-nvidia/0/temperature/2,{:>2}", "/intelcpu/0/load/0,{:>2}"]

    if not args.lines:
        # Built-in layout, used whenever --lines is not supplied.
        first_line = "/intelcpu/0/load/0% /intelcpu/0/temperature/0C /mem_usage /gpu-nvidia/0/load/0% /gpu-nvidia/0/temperature/2C"
        second_line = "/time /date /network"
        args.lines = [first_line, second_line]

    for line in args.lines:
        print(line)

    # Map "sensor key" -> str.format template applied to that sensor's value.
    overrides = {}
    for override in args.format_overrides:
        try:
            # Split on the first comma only so the template may contain commas.
            k, v = override.split(",", 1)
        except ValueError:
            print(f"Cannot parse override {override}")
            sys.exit(1)
        overrides[k] = v
        print(f"Found override {k} to {v}")

    lhwm = LibreHardWareMonitor()
    lhwm.discoverPc()
    if args.lhwm_discover:
        sys.exit(0)

    # The serial port is only needed past this point, so --lhwm-discover
    # keeps working without --com/--com-speed.
    if not args.com or not args.com_speed:
        parser.error("--com and --com-speed are required (or use --dev)")

    #fps = FPSCounter()

    datetime_cls = Datetime()
    ######################################################
    tc = ThreadedCollector()

    # Only start collectors whose placeholder appears in some line.
    check_contains = "\n".join(args.lines)
    if "/time" in check_contains:
        tc.add_checker(datetime_cls.time, "/time")

    if "/date" in check_contains:
        tc.add_checker(datetime_cls.date, "/date")
    ######################################################
    if "/cpu_percent" in check_contains:
        tc.add_checker(PsUtil.cpu, "/cpu_percent", header="")

    if "/mem_usage" in check_contains:
        tc.add_checker(PsUtil.mem, "/mem_usage")

    if "/network" in check_contains:
        tc.add_checker(PsUtil.network, "/network")
    ######################################################
    tc.add_checker(lhwm.update, "lhwm")
    ######################################################
    #tc.add_checker(fps.update, "/fps", sleep_time=0.1)
    tc.start()

    def formatLine(line, limit=20):
        """Replace every known sensor key in ``line`` with its value and cut to ``limit`` chars."""
        new_line = line
        # Snapshot first: collector threads add keys while we iterate.
        # Longest keys first so ".../load/1" cannot clobber ".../load/10".
        snapshot = sorted(list(tc.checked_values.items()), key=lambda item: len(item[0]), reverse=True)
        for k, v in snapshot:
            if k not in new_line:
                continue

            if v is None:
                # Sensor without a reading yet.
                new_value = "0"
            elif isinstance(v, str):
                new_value = v
            elif isinstance(v, float):
                new_value = str(int(v))
            else:
                new_value = str(v)

            if k in overrides:
                new_value = overrides[k].format(new_value)
            new_line = new_line.replace(k, new_value)
        return new_line[:limit]

    def send2com(com_port, string, clear_screen=True):
        """Write ``string`` to the port, preceded by a carriage return when ``clear_screen`` is set."""
        if clear_screen:
            com_port.write(bytes("\r", "utf8"))

        com_port.write(bytes(string, "utf8"))

    # Display loop states.
    INIT = 0
    WORKING = 1
    TODIE = 2
    CURRENT_STATUS = INIT

    # One centred, max_w-wide field per display row.
    str_to_format = "".join("{:^" + str(args.max_w) + "}" for _ in range(args.max_h))
    print(str_to_format)
    print(args.lines)

    def render(*rows):
        """Fill the frame template, padding or dropping rows to exactly ``max_h``."""
        shown = list(rows[:args.max_h])
        shown += [""] * (args.max_h - len(shown))
        return str_to_format.format(*shown)

    while True:
        try:
            with serial.Serial(args.com, args.com_speed) as com_port:
                init_timer = 0.0
                step = 0.1
                # Show a progress message until the first sensor pass is done.
                while not lhwm.updateDone:
                    send2com(com_port, render("Wait LHWM sensors", f"{init_timer:.1f} sec..."))
                    init_timer += step
                    sleep(step)
                CURRENT_STATUS = WORKING

                while CURRENT_STATUS != TODIE:
                    try:
                        send2com(com_port, render(*[formatLine(arg, args.max_w) for arg in args.lines]))
                        sleep(args.refresh)
                    except KeyboardInterrupt:
                        CURRENT_STATUS = TODIE
                        print("Ctrl+C to exit")

                send2com(com_port, render("Bye...", ":)"))
                sys.exit(0)
        except KeyboardInterrupt:
            # Ctrl+C outside the refresh loop (e.g. while waiting for sensors).
            print("Ctrl+C to exit")
            sys.exit(0)
        except Exception:
            # Typically a serial failure: report it and reopen the port.
            print("Critical error")
            traceback.print_exc()
        finally:
            # Short pause before reconnecting (also runs once on exit).
            sleep(1)
|
|
|