commit
6282692f4f
6 changed files with 353 additions and 0 deletions
@ -0,0 +1,15 @@ |
|||
*.bat |
|||
dist/ |
|||
__pycache__/ |
|||
output/ |
|||
build/ |
|||
vdf_client.py |
|||
vdf_client_v2.py |
|||
*.spec |
|||
|
|||
#fps_inspector |
|||
fps_inspector.py |
|||
PresentMon.dll |
|||
|
|||
#OpenHW Monitor |
|||
OpenHardwareMonitorLib.dll |
Binary file not shown.
Binary file not shown.
@ -0,0 +1,21 @@ |
|||
<h1>LcdHardwareMonitor / External LCD</h1> |
|||
<p>Send funny info into com port. Support lcd with any sizes (tested on 20x2)</p> |
|||
|
|||
<h2>Features</h2> |
|||
<li>All check in threads, not need wait before update lcd, set any refresh rate</li> |
|||
<li>Use LibreHardwareMonitor to get sensors data (need administrator run to show all)</li> |
|||
<li>Format lines wia not code, use: --lines</li> |
|||
<li>Formating sensors values see: --format-overrides</li> |
|||
<li>Customize lcd size, use: --max-w --max-h</li> |
|||
<li>Contains default sensors wia psutil and datetime: /time /date /cpu_percent /mem_usage /network</li> |
|||
|
|||
<h2>Use</h2> |
|||
<pre>1) Install deps from requirements.txt |
|||
2) To show all windows pc sensors execute with python vdf_client_threaded.py --lhwm-discover |
|||
3) Example run: python vdf_client_threaded.py --com COM6 --com-speed 9600 --lines "/intelcpu/0/load/0% /intelcpu/0/temperature/0C /mem_usage" --lines "/time /date /network" --max-h 2 --max-w 20 --format-overrides "/gpu-nvidia/0/load/0,{:>2}" --format-overrides "/gpu-nvidia/0/temperature/2,{:>2}" --refresh 1</pre> |
|||
|
|||
<h5>p.s</h5> |
|||
<pre>Maybe need install .NET Framework see <a>https://github.com/LibreHardwareMonitor/LibreHardwareMonitor</a> |
|||
Tested on Python 3.11 x64 Win10 |
|||
|
|||
Download LibreHardwareMonitorLib.dll, HidSharp.dll manualy if u not trust</pre> |
@ -0,0 +1,3 @@ |
|||
psutil |
|||
pyserial |
|||
pythonnet |
@ -0,0 +1,314 @@ |
|||
from datetime import datetime |
|||
import psutil |
|||
from time import sleep |
|||
import traceback |
|||
from threading import Thread |
|||
|
|||
def bytes2human(size, header = "", buf = 1000, txt = ""): |
|||
if size < buf: |
|||
return "{}{:>3}b{}".format(header, int(size), txt) |
|||
|
|||
size = size / buf |
|||
if size < buf: |
|||
return "{}{:>3}K{}".format(header, int(size), txt) |
|||
|
|||
size = size / buf |
|||
if size < buf: |
|||
return "{}{:>3}M{}".format(header, int(size), txt) |
|||
|
|||
size = size / buf |
|||
if size < buf: |
|||
return "{}{:>3}G{}".format(header, int(size), txt) |
|||
|
|||
size = size / buf |
|||
if size < buf: |
|||
return "{}{:>3}T{}".format(header, int(size), txt) |
|||
|
|||
return "BIG" |
|||
|
|||
class Datetime: |
|||
def __init__(self, blinkig_seconds = True): |
|||
self.blinkig_seconds = blinkig_seconds |
|||
self.char_seconds = ":" |
|||
self.iterrate_seconds = False |
|||
|
|||
def time(self): |
|||
self.iterrate_seconds = not self.iterrate_seconds |
|||
return "{:5}".format(datetime.now().strftime("%H"+(self.char_seconds if not self.blinkig_seconds else " " if self.iterrate_seconds else self.char_seconds) +"%M")) |
|||
|
|||
def date(self): |
|||
return "{:4}".format(datetime.now().strftime("%d%m")) |
|||
|
|||
class PsUtil: |
|||
@staticmethod |
|||
def cpu(header = "", tail = "%"): |
|||
c = int(psutil.cpu_percent()) |
|||
return "{}{:>2}{}".format(header, c if c < 100 else 99, tail) |
|||
|
|||
@staticmethod |
|||
def mem(header = "", tail = "", round_how_much = 1): |
|||
return "{}{:>4}{}".format(header, round(psutil.virtual_memory()[3]/1000000000, round_how_much), tail) |
|||
|
|||
@staticmethod |
|||
def network(sleep_time = 1.0, eth = "Ethernet"): |
|||
first = psutil.net_io_counters(pernic=True) |
|||
sleep(sleep_time) |
|||
second = psutil.net_io_counters(pernic=True) |
|||
|
|||
downloaded = 0 |
|||
uploaded = 0 |
|||
|
|||
try: |
|||
uploaded = (second[eth].bytes_sent - first[eth].bytes_sent) * 1 / sleep_time |
|||
downloaded = (second[eth].bytes_recv - first[eth].bytes_recv) * 1 / sleep_time |
|||
except: |
|||
traceback.print_exc() |
|||
return "" |
|||
|
|||
return f"{bytes2human(uploaded, '')} {bytes2human(downloaded, '')}" |
|||
|
|||
class LibreHardWareMonitor: |
|||
def __init__(self) -> None: |
|||
import clr |
|||
import os |
|||
clr.AddReference(os.path.join(os.path.dirname(os.path.abspath(__file__)), "LibreHardwareMonitorLib.dll")) |
|||
self.buildPc() |
|||
self.data = {} |
|||
self.updateDone = False |
|||
|
|||
def buildPc(self): |
|||
from LibreHardwareMonitor.Hardware import Computer |
|||
self.c = Computer() |
|||
self.c.IsBatteryEnabled = True |
|||
self.c.IsControllerEnabled = True |
|||
self.c.IsCpuEnabled = True |
|||
self.c.IsGpuEnabled = True |
|||
self.c.IsMemoryEnabled = True |
|||
self.c.IsPsuEnabled = True |
|||
self.c.Open() |
|||
|
|||
def discoverPc(self): |
|||
for hw in self.c.Hardware: |
|||
print("hw:", hw.Identifier, hw.Name) |
|||
for shw in hw.SubHardware: |
|||
print("shw:", shw.Identifier, shw.Name) |
|||
for sen in shw.Sensors: |
|||
print("sensor:", sen.Identifier, sen.Name, sen.Value) |
|||
for sen in hw.Sensors: |
|||
print("sensor:", sen.Identifier, sen.Name, sen.Value) |
|||
|
|||
def update(self): |
|||
for hw in self.c.Hardware: |
|||
for shw in hw.SubHardware: |
|||
for sen in shw.Sensors: |
|||
self.data[str(sen.Identifier)] = sen.Value |
|||
|
|||
for sen in hw.Sensors: |
|||
self.data[str(sen.Identifier)] = sen.Value |
|||
hw.Update() |
|||
self.updateDone = True |
|||
return self.data |
|||
|
|||
def avg_from_dict(key, dic): |
|||
sum = 0.0 |
|||
count = 0 |
|||
for k, v in dic.items(): |
|||
if key in k: |
|||
sum += v |
|||
count+=1 |
|||
|
|||
""" |
|||
from fps_inspector import start_fliprate_recording, get_fliprate_count, get_last_fliprates |
|||
class FPSCounter: |
|||
def __init__(self, format = "{:>3}") -> None: |
|||
self.listen = False |
|||
self.prev_value = 0 |
|||
self.format = format |
|||
|
|||
def update(self): |
|||
if not self.listen: |
|||
self.listen = True |
|||
start_fliprate_recording() |
|||
return -1 |
|||
else: |
|||
count = get_fliprate_count() |
|||
r_count = count - self.prev_value |
|||
self.prev_value = count |
|||
return self.format.format(r_count) |
|||
""" |
|||
|
|||
class ThreadedCollector: |
|||
checked_values = {} |
|||
checkers = [] |
|||
|
|||
def add_checker(self, func, name, sleep_time = 1, *args, **kwargs): |
|||
def updater(*args, **kwargs): |
|||
while 1: |
|||
try: |
|||
#print(args, kwargs) |
|||
val = func(*args, **kwargs) |
|||
|
|||
if type(val) == dict: |
|||
self.checked_values.update(val) |
|||
else: |
|||
self.checked_values[name] = val |
|||
except Exception as te: |
|||
traceback.print_exc() |
|||
print(f"Cannot update: {name}, err: {te}") |
|||
finally: |
|||
sleep(sleep_time) |
|||
|
|||
self.checkers.append(Thread(target=updater, name=name, daemon=True)) |
|||
|
|||
def start(self): |
|||
for t in self.checkers: |
|||
t.start() |
|||
|
|||
def printValues(self, grab = []): |
|||
for k,v in self.checked_values.items(): |
|||
print(k,v) |
|||
|
|||
if __name__ == "__main__": |
|||
import argparse |
|||
parser = argparse.ArgumentParser() |
|||
|
|||
parser.add_argument("--com", type=str, help="windows: COM1, lin: ttyusb0") |
|||
parser.add_argument("--com-speed", type=int, help="9600, ... etc") |
|||
|
|||
parser.add_argument("--max-w", type=int, default=20, help = "max charest in line") |
|||
parser.add_argument("--max-h", type=int, default=2, help = "max lines") |
|||
parser.add_argument("--lines", type=str, nargs="+", help='format lines, example: --lines "/intelcpu/0/load/0% /intelcpu/0/temperature/0C /mem_usage" --lines "/time /date /network"') |
|||
|
|||
parser.add_argument("--refresh", type=int, default=1, help = "Update display every N-seconds") |
|||
parser.add_argument("--dev", action="store_true", default=False, help="Enable dev mode") |
|||
parser.add_argument("--lhwm-discover", action="store_true", default=False, help="Enable dev mode", help="show found sensors with names") |
|||
parser.add_argument("--format-overrides", nargs="+", help='formated text with python str.format, example: --format-overrides "/gpu-nvidia/0/load/0,{:>2}" --format-overrides "/gpu-nvidia/0/temperature/2,{:>2}"') |
|||
|
|||
args = parser.parse_args() |
|||
if args.dev: |
|||
args.com = "COM6" if not args.com else args.com |
|||
args.com_speed = 9600 if not args.com_speed else args.com_speed |
|||
args.format_overrides = ["/gpu-nvidia/0/load/0,{:>2}", "/intelcpu/0/temperature/0,{:>2}", "/gpu-nvidia/0/temperature/2,{:>2}", "/intelcpu/0/load/0,{:>2}"] if not args.format_overrides else args.format_overrides |
|||
|
|||
|
|||
first_line = "/intelcpu/0/load/0% /intelcpu/0/temperature/0C /mem_usage /gpu-nvidia/0/load/0% /gpu-nvidia/0/temperature/2C" |
|||
second_line = "/time /date /network" |
|||
args.lines = [first_line, second_line] |
|||
|
|||
for line in args.lines: |
|||
print(line) |
|||
|
|||
overrides = {} |
|||
for override in args.format_overrides: |
|||
try: |
|||
k, v = override.split(",") |
|||
overrides[k] = v |
|||
print(f"Found override {k} to {v}") |
|||
except: |
|||
print(f"Cannot parse override {override}") |
|||
exit(1) |
|||
|
|||
|
|||
lhwm = LibreHardWareMonitor() |
|||
lhwm.discoverPc() |
|||
if args.lhwm_discover: |
|||
exit(0) |
|||
|
|||
#fps = FPSCounter() |
|||
|
|||
datetime_cls = Datetime() |
|||
###################################################### |
|||
tc = ThreadedCollector() |
|||
|
|||
check_contains = "\n".join(args.lines) |
|||
if "/time" in check_contains: |
|||
tc.add_checker(datetime_cls.time, "/time") |
|||
|
|||
if "/date" in check_contains: |
|||
tc.add_checker(datetime_cls.date, "/date") |
|||
###################################################### |
|||
if "/cpu_percent" in check_contains: |
|||
tc.add_checker(PsUtil.cpu, "/cpu_percent", header = "") |
|||
|
|||
if "/mem_usage" in check_contains: |
|||
tc.add_checker(PsUtil.mem, "/mem_usage") |
|||
|
|||
if "/network" in check_contains: |
|||
tc.add_checker(PsUtil.network, "/network") |
|||
###################################################### |
|||
tc.add_checker(lhwm.update, "lhwm") |
|||
###################################################### |
|||
#tc.add_checker(fps.update, "/fps", sleep_time=0.1) |
|||
tc.start() |
|||
|
|||
import serial, traceback |
|||
|
|||
def formatLine(line, limit = 20): |
|||
new_line = line |
|||
for k, v in tc.checked_values.items(): |
|||
if k in new_line: |
|||
if v is None: |
|||
new_value = "0" |
|||
elif type(v) == str: |
|||
new_value = v |
|||
elif type(v) == float: |
|||
new_value = str(int(v)) |
|||
else: |
|||
new_value = str(v) |
|||
|
|||
if k in overrides.keys(): |
|||
new_line = new_line.replace(k, overrides[k].format(new_value)) |
|||
else: |
|||
new_line = new_line.replace(k, new_value) |
|||
return new_line[:limit] |
|||
|
|||
def send2com(com_port, string, clear_screen = True): |
|||
if clear_screen: |
|||
com_port.write(bytes("\r", "utf8")) |
|||
|
|||
com_port.write(bytes(string, "utf8")) |
|||
|
|||
INIT = 0 |
|||
WORKING = 1 |
|||
TODIE = 2 |
|||
CURRENT_STATUS = INIT |
|||
|
|||
#import signal |
|||
#def signal_handler(sig, frame): |
|||
# CURRENT_STATUS = TODIE |
|||
#signal.signal(signal.SIGINT, signal_handler) |
|||
|
|||
str_to_format = "".join(["{:^"+str(args.max_w)+"}" for i in range(0, args.max_h)]) |
|||
print(str_to_format) |
|||
print(args.lines) |
|||
|
|||
while 1: |
|||
try: |
|||
with serial.Serial(args.com, args.com_speed) as com_port: |
|||
init_timer = 0.0 |
|||
step = 0.1 |
|||
while not lhwm.updateDone: |
|||
send2com(com_port, str_to_format.format("Wait LHWM sensors", f"{init_timer} sec...")) |
|||
init_timer += step |
|||
sleep(step) |
|||
else: |
|||
CURRENT_STATUS = WORKING |
|||
|
|||
while CURRENT_STATUS != TODIE: |
|||
try: |
|||
send2com(com_port, str_to_format.format(*[formatLine(arg, args.max_w) for arg in args.lines])) |
|||
sleep(args.refresh) |
|||
except KeyboardInterrupt: |
|||
CURRENT_STATUS = TODIE |
|||
print("Ctrl+C to exit") |
|||
|
|||
if CURRENT_STATUS == TODIE: |
|||
send2com(com_port, str_to_format.format("Bye...", f":)")) |
|||
exit(0) |
|||
except SystemExit: |
|||
exit(0) |
|||
except: |
|||
print("Critical error") |
|||
traceback.print_exc() |
|||
finally: |
|||
sleep(1) |
Loading…
Reference in new issue