Browse Source

init

master
gsd 6 days ago
commit
a337002501
  1. 2
      .gitignore
  2. 33
      colors.py
  3. 172
      fetchers.py
  4. 236556
      oui.txt
  5. 91
      service.py

2
.gitignore

@ -0,0 +1,2 @@
haSensors.py
__pycache__

33
colors.py

@ -0,0 +1,33 @@
class Colors:
""" ANSI color codes """
BLACK = "\033[0;30m"
RED = "\033[0;31m"
GREEN = "\033[0;32m"
BROWN = "\033[0;33m"
BLUE = "\033[0;34m"
PURPLE = "\033[0;35m"
CYAN = "\033[0;36m"
LIGHT_GRAY = "\033[0;37m"
DARK_GRAY = "\033[1;30m"
LIGHT_RED = "\033[1;31m"
LIGHT_GREEN = "\033[1;32m"
YELLOW = "\033[1;33m"
LIGHT_BLUE = "\033[1;34m"
LIGHT_PURPLE = "\033[1;35m"
LIGHT_CYAN = "\033[1;36m"
LIGHT_WHITE = "\033[1;37m"
BOLD = "\033[1m"
FAINT = "\033[2m"
ITALIC = "\033[3m"
UNDERLINE = "\033[4m"
BLINK = "\033[5m"
NEGATIVE = "\033[7m"
CROSSED = "\033[9m"
END = "\033[0m"
def getColorsDict():
d = dict(vars(Colors))
for k in list(d.keys()):
if k in ['__module__', '__firstlineno__', '__doc__', '__annotations__', '__weakref__', '__dict__', '__static_attributes__']:
del d[k]
return d

172
fetchers.py

@ -0,0 +1,172 @@
import os
import time
from datetime import datetime
import datetime as datetimesrc
import psutil
import subprocess
from colors import Colors, getColorsDict
def pretty_mac(_mac):
try:
mac_split = _mac.split(":")
mac = ""
for ch in mac_split:
if len(ch) == 2:
mac += f"{ch}:"
if len(ch) == 1:
mac += f"0{ch}:"
mac = mac[:-1]
return mac
except:
return _mac
class MacLookup:
prefixes = {}
#https://github.com/bauerj/mac_vendor_lookup/blob/master/mac_vendor_lookup.py
def __init__(self):
self.load_vendors()
pass
@staticmethod
def sanitise(_mac):
mac = _mac.replace(":", "").replace("-", "").replace(".", "").upper()
try:
int(mac, 16)
except ValueError:
raise Exception("{} contains unexpected character".format(_mac))
if len(mac) > 12:
raise Exception("{} is not a valid MAC address (too long)".format(_mac))
return mac
def lookup(self, mac):
try:
mac = self.sanitise(mac)
except:
return f"Unknown MAC ({mac})"
if not self.prefixes:
self.load_vendors()
if type(mac) == str:
mac = mac.encode("utf8")
try:
return self.prefixes[mac[:6]].decode("utf8")
except KeyError:
return f"Unknown MAC ({mac})"
def load_vendors(self):
from pathlib import Path
script_dir = Path(__file__).parent.resolve()
with open(os.path.join(script_dir, "oui.txt"), "rb") as macs:
for line in macs.readlines():
if not line:
break
if b"(base 16)" in line:
prefix, vendor = (i.strip() for i in line.split(b"(base 16)", 1))
self.prefixes[prefix] = vendor
global maclookup
maclookup = MacLookup()
global firstMacLookup
firstMacLookup = {}
def getAvg():
data = os.getloadavg()
return '{0:.2f} {1:.2f} {2:.2f}'.format(*data)
def getDateTime():
return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
def getCpuPercent(colors = True):
if colors:
u = psutil.cpu_percent(interval=1)
color = ""
if u > 0:
color = Colors.GREEN
if u > 33:
color = Colors.YELLOW
if u > 66:
color = Colors.RED
if u > 99:
u = 99
return "{0}{1:3}%{2}".format(color, u, Colors.END)
else:
return "{0:3}%".format(psutil.cpu_percent(interval=1))
def getMemPercent(colors = True):
if colors:
u = psutil.virtual_memory().percent
color = ""
if u > 0:
color = Colors.GREEN
if u > 33:
color = Colors.YELLOW
if u > 66:
color = Colors.RED
if u > 99:
u = 99
return "{0}{1:3}%{2}".format(color, u, Colors.END)
else:
return "{0:3}%".format(psutil.virtual_memory().percent)
def getRowSeparator():
return Colors.NEGATIVE + "".join([" " for i in range(0, 60)]) + Colors.END
def getArp():
global maclookup
if maclookup is None:
maclookup = MacLookup()
global firstMacLookup
#linux | ? (192.168.1.136) at a8:7e:ea:64:85:de [ether] on eth0
#mac | mdns.mcast.net (224.0.0.251) at 1:0:5e:0:0:fb on en0 ifscope permanent [ethernet]
arp_res = subprocess.check_output("arp -a", shell=True, text=True)
result = []
found_macs = []
for line in arp_res.split("\n"):
if line:
host, ip, at, mac, *other = line.split(" ")
mac = pretty_mac(mac)
if mac not in firstMacLookup:
firstMacLookup[mac] = int(time.time())
found_macs.append(mac)
delta = int(time.time()) - firstMacLookup[mac]
result.append({
"host": host,
"ip": ip[1:-1],
"mac": mac,
"mac_name": maclookup.lookup(mac),
"delta": delta,
"mac_first_lookup_utime": str(datetimesrc.timedelta(seconds=delta))
})
unused_macs = [mac for mac in list(firstMacLookup.keys()) if mac not in found_macs]
for mac in unused_macs:
del firstMacLookup[mac]
txt = ""
exists = []
for host in result:
host.update(getColorsDict())
if host["ip"] not in exists:
exists.append(host["ip"])
host["time_color"] = Colors.RED
if host["delta"] > 60 * 60:
host["time_color"] = Colors.YELLOW
if host["delta"] > 60 * 60 * 12:
host["time_color"] = Colors.GREEN
mac_name_len = len(host["mac_name"])
host["mac_name"] = host["mac_name"][:31] + ("..." if mac_name_len > 31 else "")
txt += "{time_color}{mac_first_lookup_utime}{END} - {BLUE}{ip}{END} - {mac_name}\n".format(**host)
return txt[:-1]
if __name__ == "__main__":
print(getArp())

236556
oui.txt

File diff suppressed because it is too large

91
service.py

@ -0,0 +1,91 @@
import argparse
import os
import time
import traceback
from fetchers import *
from colors import getColorsDict
clear = lambda: os.system('clear')
results = {
"row_separator": getRowSeparator(),
"ha_sensors": ""
}
logging = False
results.update(getColorsDict())
def printColors():
res = ""
for k,v in getColorsDict().items():
res += f"{v}{k}"
print(res)
def createDaemon(func, key, sleep = 60):
from threading import Thread
results[key] = ""
def resTo():
while True:
try:
results[key] = func()
if logging:
print(f'[{time.time()}] Save result to {key}')
except:
traceback.print_exc()
finally:
time.sleep(sleep)
t = Thread(target=resTo, name=f"daemon-{key}")
t.daemon = True
t.start()
class Service:
def __init__(self, args):
self.args = args
self.startFetcher()
self.looper()
def startFetcher(self):
createDaemon(getAvg, "sys_avg", 1)
createDaemon(getDateTime, "date_time", 1)
createDaemon(getCpuPercent, "cpu_percent", 1)
createDaemon(getMemPercent, "mem_percent", 1)
createDaemon(getArp, "arp_result", 60)
try:
from haSensors import getHaSensors
createDaemon(getHaSensors, "ha_sensors", 60)
except:
pass
def printer():
self.printer()
createDaemon(printer, "printer", 1)
def looper(self):
try:
while True:
time.sleep(1)
except:
exit(0)
def printer(self):
out = '''
{YELLOW}{date_time:^19}{END} | {CYAN}{sys_avg:^14}{END} | CPU:{cpu_percent:^5} | MEM:{mem_percent:^5}
{row_separator}
{ha_sensors}
{row_separator}
{arp_result}
'''.format(**results)[1:-1]
format_cr = ""
for row in out.split("\n")[:self.args.rows]:
format_cr += f"{row}\n"#[:self.args.cols]
clear()
print(format_cr, end="")
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--cols", default=60)
parser.add_argument("--rows", default=40)
args = parser.parse_args()
Service(args)
#printColors()
Loading…
Cancel
Save