from mesht_models import PUB_CH from mesht_device import PORTNUMS import sys, os from logger import logger class MeshtasticMessage: def __init__(self, from_radio, myId): self.myId = myId self.packet = from_radio.get("packet", {}) self.decoded = self.packet.get("decoded", {}) if not self.decoded: return def getFrom(self): return self.packet["from"] def getTo(self): return self.packet["to"] def isDm(self): return self.packet["to"] == self.myId def isPublic(self): return self.packet["to"] == PUB_CH def getText(self): try: return self.decoded["payload"].decode() except: return "" def __str__(self): return f"[{self.getFrom()}] -> [{self.getTo()}] [{'PUB' if self.isPublic() else 'DM'}] > {self.getText()}" class BotManager: prefix = "/?" def __init__(self, core): #delaem workdir esli ne delali os.chdir(os.path.dirname(os.path.abspath(__file__))) self.coreService = core self.exts = {} self.extensionLoader(["botExtensions"]) def extensionLoader(self, search_paths = []): if type(search_paths) == str: search_paths = [search_paths] for path in search_paths: logger.info(f"Try found extensions in {path}") if not os.path.exists(path) or not os.path.isdir(path): logger.info(f"Directory is not exists or not directory, skip") continue sys.path.insert(0, path) for extension in os.listdir(path): extension, ext = os.path.splitext(extension) if ext != ".py": continue logger.info(f"Found ext: {extension}") self.exts[f"{path}/{extension}"] = __import__(extension).BotExtension(self) sys.path.pop(0) logger.info(f"Found {self.exts.keys().__len__()} extension") async def handleMessage(self, from_radio): if not self.isTextMessage(from_radio): return if not self.isToMe(from_radio): return msg = MeshtasticMessage(from_radio, self.coreService.devices[0].my_node_id_dec) logger.info(msg) if msg.isPublic() and msg.getText().startswith(self.prefix): await self.processMessage(msg, msg.getText()[len(self.prefix):])#remove prefix elif msg.isDm(): await self.processMessage(msg, msg.getText()) else: #nichego ne delaem pass async def processMessage(self, msg: MeshtasticMessage, msgText: str): if not msgText: return for path, ext in self.exts.items(): if msgText.split()[0].lower() in ext.trigger: response = ext(msg, msgText) return await self.reply(msg, response) async def reply(self, msg: MeshtasticMessage, text: str): if msg.isDm(): return await self.coreService.devices[0].sendMsgToDM(text, msg.getFrom()) elif msg.isPublic(): return await self.coreService.devices[0].sendMsgToChannel(text) else: return def isTextMessage(self, from_radio): return from_radio and from_radio.get("packet", {}).get("decoded", {}).get("portnum", 0) == 1 def isToMe(self, from_radio): return self.coreService.devices[0].my_node_id_dec == from_radio.get("packet", {}).get("to", 0) or from_radio.get("packet", {}).get("to", 0) == PUB_CH