from typing import List from pymongo.asynchronous.database import AsyncDatabase from typing import List from entities.PacketGroup import PacketGroup from entities.PacketSignal import PacketSignal from pymongo import DESCENDING class MultiDeviceDbSupport: def __init__(self, dbStore, core): self.dbStore:AsyncDatabase = dbStore self.core = core def device2Hash(self, devices_hashes = []): if devices_hashes == None: devices_hashes = [self.core.defaultDeviceUUIDHash] elif (type(devices_hashes) == list): pass else: devices_hashes = [devices_hashes] if devices_hashes.__len__() == 0: devices_hashes = [self.core.defaultDeviceUUIDHash] return devices_hashes #return {"$or": [{"device_uuid": value}]} def deviceHash2Match(self, devices_hashes = []): devices_hashes = self.device2Hash(devices_hashes) #print(devices_hashes) orPart = [] for hash in devices_hashes: uuid = self.core.devicesUuidHashToUuid.get(hash, "") if uuid: if uuid == self.core.defaultDeviceUUID: orPart.append({"device_uuid": None}) orPart.append({"device_uuid": uuid}) return orPart#-->{"$or": orPart} def devicesHash2UuidList(self, devices_hashes = []): devices_hashes = self.device2Hash(devices_hashes) uuids = [] for hash in devices_hashes: uuid = self.core.devicesUuidHashToUuid.get(hash, "") if uuid: uuids.append(uuid) return uuids def devicesHash2NumList(self, devices_hashes = []): uuids = self.devicesHash2UuidList(devices_hashes) nums = [] for device in self.core.devices: if device.nid: nums.append(device.nid) return nums class MessageDbService(MultiDeviceDbSupport): MESSAGE_PORTNUM = 1 def __init__(self, dbStore, core): super().__init__(dbStore, core) async def listOfMessage(self, limit:int, offset:int, after: float, before: float, devices = []): collection = self.dbStore['packet'] payload = { "to": int(self.core.PUB_CH), "portnum":self.MESSAGE_PORTNUM, "ts":{'$gt': after, "$lt": before}, "$or": self.deviceHash2Match(devices) } c = collection.find(payload).sort("ts", DESCENDING).skip(offset).limit(limit) return await c.to_list() class PacketDbService(MultiDeviceDbSupport): def __init__(self, dbStore, core): super().__init__(dbStore, core) async def findLastPackets(self, nums: List[int] = [], portnums: List[int] = [], devices = []): pipeline = [] match = {"$or": self.deviceHash2Match(devices)} if nums: match["from"] = {"$in": nums} if portnums: match["portnum"] = {"$in": portnums} if match: pipeline += [{"$match":match}] pipeline += [ {"$sort": {"from": 1, "ts": -1}}, {"$group": { "_id": "$from", "latest_doc": {"$first": "$$ROOT"} }}, {"$replaceRoot": {"newRoot": "$latest_doc"}}, {"$sort": {"from": 1}} ] collection = self.dbStore['packet'] c = await collection.aggregate(pipeline) return await c.to_list() async def findPacketsSignals(self, after: float = -1, before: float = -1, nums: List[int] = [], devices = [], ignore_host = False): pipeline = [] match = {"$or": self.deviceHash2Match(devices)} if after != -1 or before != -1: match["ts"] = {} if after != -1: match["ts"]["$gt"] = after if before != -1: match["ts"]["$lt"] = before if nums: if type(nums) != list: nums = [nums] match["from"] = {"$in":nums} if ignore_host and "from" not in match: match["from"] = {"$nin": self.devicesHash2NumList(devices)} pipeline.append({"$match":match}) collection = self.dbStore['packet'] c = await collection.aggregate(pipeline) l = await c.to_list() return [PacketSignal(p) for p in l] async def findPacketsAndGroupCount(self, after: float = -1, before: float = -1, nums: List[int] = [], portnums: List[int] = [], packetsPerNode = False, packetsSumNode = False, devices = [], ignore_host = False): pipeline = [] match = {"$or": self.deviceHash2Match(devices)} if after != -1 or before != -1: match["ts"] = {} if after != -1: match["ts"]["$gt"] = after if before != -1: match["ts"]["$lt"] = before if nums: if type(nums) != list: nums = [nums] match["from"] = {"$in":nums} if ignore_host and "from" not in match: match["from"] = {"$nin": self.devicesHash2NumList(devices)} if portnums: if type(portnums) != list: portnums = [portnums] match["portnums"] = {"$in":portnums} if match: pipeline.append({"$match":match}) #групировка по количеству на выхлопе _id номер портнума и count число его юзов groupPipe = { "$group": {"_id": "$portnum", "count": {"$sum": 1}} } if packetsPerNode: groupPipe["$group"]["_id"] = { "portnum": "$portnum", "from": "$from" } if packetsSumNode: groupPipe["$group"]["_id"] = "$from" pipeline.append(groupPipe) ###print(pipeline) collection = self.dbStore['packet'] c = await collection.aggregate(pipeline) l = await c.to_list() return [PacketGroup(p, packetsPerNode, packetsSumNode) for p in l] class NodeDbService(PacketDbService, MultiDeviceDbSupport): def __init__(self, dbStore, core): PacketDbService.__init__(self, dbStore, core) MultiDeviceDbSupport.__init__(self, dbStore, core) async def fillNodeFromPacketCollection(self, l, devices = []): nums = [node.get("num", 0) for node in l] #ищем сначала ноды с позицией 3 портнум packet_pos_nodes = await self.findLastPackets(nums, [3], devices)#### packet_pos_nodes_dict = {}### for packet in packet_pos_nodes: packet_pos_nodes_dict[packet.get("from")] = packet nums_with_pos = [node.get("from", 0) for node in packet_pos_nodes] #смотрим какие пакеты от нод имеют позицию, и делаем список нод где нет ее nums_without_pos = [fr0m for fr0m in nums if fr0m not in nums_with_pos] packet_without_pos = await self.findLastPackets(nums_without_pos, [], devices)#### packet_without_pos_dict = {} for packet in packet_without_pos: packet_without_pos_dict[packet.get("from")] = packet #суем свежачок new_nodes_list = [] def fillNodeBaseData(node, d1ct): ## base packet if d1ct[node["num"]].get("rx_snr", None) != None: node["snr"] = d1ct[node["num"]]["rx_snr"] if d1ct[node["num"]].get("rx_rssi", None) != None: node["rssi"] = d1ct[node["num"]]["rx_rssi"] if d1ct[node["num"]].get("ts", None) != None: node["ts"] = d1ct[node["num"]]["ts"] if d1ct[node["num"]].get("hop_start", None) != None and d1ct[node["num"]].get("hop_limit", None) != None: node["hops_away"] = d1ct[node["num"]].get("hop_start", 0) - d1ct[node["num"]].get("hop_limit", 0) if d1ct[node["num"]].get("portnum", None) != None: node["last_portnum"] = d1ct[node["num"]].get("portnum", None) return node for node in l: node["view_in_packets"] = True if node.get("num", 0) in packet_pos_nodes_dict.keys(): if packet_pos_nodes_dict[node["num"]].get("decoded_payload_object", {}): node["position"] = packet_pos_nodes_dict[node["num"]]["decoded_payload_object"] node = fillNodeBaseData(node, packet_pos_nodes_dict) elif node.get("num", 0) in packet_without_pos_dict.keys(): node = fillNodeBaseData(node, packet_without_pos_dict) else: node["view_in_packets"] = False new_nodes_list.append(node) return new_nodes_list async def listOfNodes(self, devices = []): pipeline = [ #ai slooop {"$match": {"$or": self.deviceHash2Match(devices)}}, # Сортировка для каждого num по убыванию ts {"$sort": {"num": 1, "ts": -1}}, # Группировка по num и взятие первого (последнего) документа {"$group": { "_id": "$num", "latest_doc": {"$first": "$$ROOT"} }}, # Замена корневого документа {"$replaceRoot": {"newRoot": "$latest_doc"}}, # Сортировка результата (опционально) {"$sort": {"num": 1}} ] collection = self.dbStore['node_info'] c = await collection.aggregate(pipeline) l = await c.to_list() return await self.fillNodeFromPacketCollection(l, devices) async def listOfDirectNodes(self, devices = []): pipeline = [ #ai slooop {"$match": {"hops_away": 0, "$or": self.deviceHash2Match(devices)}}, # Фильтруем по списку # Сортировка для каждого num по убыванию ts {"$sort": {"num": 1, "ts": -1}}, # Группировка по num и взятие первого (последнего) документа {"$group": { "_id": "$num", "latest_doc": {"$first": "$$ROOT"} }}, # Замена корневого документа {"$replaceRoot": {"newRoot": "$latest_doc"}}, # Сортировка результата (опционально) {"$sort": {"num": 1}} ] collection = self.dbStore['node_info'] c = await collection.aggregate(pipeline) l = await c.to_list() return await self.fillNodeFromPacketCollection(l, devices) async def oneNode(self, num: int, devices = []): collection = self.dbStore['node_info'] c = await collection.find_one( {"num":int(num), "$or": self.deviceHash2Match(devices)}, sort=[("ts", -1)] ) l = await self.fillNodeFromPacketCollection([c], devices) return l[0] if l.__len__() > 0 else None async def listOfSelectedNodes(self, nums: List[int], devices = []): pipeline = [ {"$match": {"num": {"$in": nums}, "$or": self.deviceHash2Match(devices)}}, # Фильтруем по списку {"$sort": {"num": 1, "ts": -1}}, # Сортируем по num и ts (по убыванию) { "$group": { "_id": "$num", # Группируем по num "latest_doc": {"$first": "$$ROOT"} # Берем первую (последнюю по ts) } }, {"$replaceRoot": {"newRoot": "$latest_doc"}}, # Восстанавливаем структуру {"$sort": {"num": 1}} # Сортируем результат по num ] collection = self.dbStore['node_info'] c = await collection.aggregate(pipeline) l = await c.to_list() return await self.fillNodeFromPacketCollection(l, devices) async def listOfFindLikeName(self, long_name, devices = []): pipeline = [ {"$match": {"long_name": {"$regex":long_name ,'$options' : 'i'}, "$or": self.deviceHash2Match(devices)}}, # Фильтруем по списку {"$sort": {"num": 1, "ts": -1}}, # Сортируем по num и ts (по убыванию) { "$group": { "_id": "$num", # Группируем по num "latest_doc": {"$first": "$$ROOT"} # Берем первую (последнюю по ts) } }, {"$replaceRoot": {"newRoot": "$latest_doc"}}, # Восстанавливаем структуру {"$sort": {"num": 1}} # Сортируем результат по num ] collection = self.dbStore['node_info'] c = await collection.aggregate(pipeline) return await c.to_list() #no need fill, just search class DbService(NodeDbService, PacketDbService, MessageDbService): def __init__(self, dbStore, core): NodeDbService.__init__(self, dbStore, core) PacketDbService.__init__(self, dbStore, core) MessageDbService.__init__(self, dbStore, core)