from typing import List from pymongo.asynchronous.database import AsyncDatabase from typing import List from entities.PacketGroup import PacketGroup from entities.PacketSignal import PacketSignal from pymongo import DESCENDING class MultiDeviceDbSupport: def __init__(self, dbStore, core): self.dbStore:AsyncDatabase = dbStore self.core = core #return {"$or": [{"device_uuid": value}]} def deviceHash2Match(self, devices_hashes = []): if devices_hashes == None: devices_hashes = [self.core.defaultDeviceUUID] elif (type(devices_hashes) == list): if devices_hashes.__len__() == 0: devices_hashes = [self.core.defaultDeviceUUID] else: devices_hashes = [devices_hashes] orPart = [] for hash in devices_hashes: uuid = self.core.devicesUuidHashToUuid.get(hash, "") if uuid: if uuid == self.core.defaultDeviceUUID: orPart.append({"device_uuid", None}) orPart.append({"device_uuid": uuid}) return orPart#-->{"$or": orPart} class MessageDbService(MultiDeviceDbSupport): def __init__(self, dbStore, core): super().__init__(dbStore, core) async def listOfMessage(self, limit:int, offset:int, after: float, before: float, devices = []): collection = self.dbStore['packet'] payload = { "to": int(self.core.PUB_CH), "portnum":self.MESSAGE_PORTNUM, "ts":{'$gt': after, "$lt": before}, "$or": self.deviceHash2Match(devices) } c = collection.find(payload).sort("ts", DESCENDING).skip(offset).limit(limit) return await c.to_list() class NodeDbService(MultiDeviceDbSupport): def __init__(self, dbStore, core): super().__init__(dbStore, core) async def listOfNodes(self, devices = []): pipeline = [ #ai slooop {"$match": {"$or": self.deviceHash2Match(devices)}}, # Сортировка для каждого num по убыванию ts {"$sort": {"num": 1, "ts": -1}}, # Группировка по num и взятие первого (последнего) документа {"$group": { "_id": "$num", "latest_doc": {"$first": "$$ROOT"} }}, # Замена корневого документа {"$replaceRoot": {"newRoot": "$latest_doc"}}, # Сортировка результата (опционально) {"$sort": {"num": 1}} ] collection = self.dbStore['node_info'] c = await collection.aggregate(pipeline) return await c.to_list() async def listOfDirectNodes(self, devices = []): pipeline = [ #ai slooop {"$match": {"hops_away": 0, "$or": self.deviceHash2Match(devices)}}, # Фильтруем по списку # Сортировка для каждого num по убыванию ts {"$sort": {"num": 1, "ts": -1}}, # Группировка по num и взятие первого (последнего) документа {"$group": { "_id": "$num", "latest_doc": {"$first": "$$ROOT"} }}, # Замена корневого документа {"$replaceRoot": {"newRoot": "$latest_doc"}}, # Сортировка результата (опционально) {"$sort": {"num": 1}} ] collection = self.dbStore['node_info'] c = await collection.aggregate(pipeline) return await c.to_list() async def oneNode(self, num: int, devices = []): collection = self.dbStore['node_info'] c = await collection.find_one( {"num":int(num), "$or": self.deviceHash2Match(devices)}, sort=[("ts", -1)] ) return c async def listOfSelectedNodes(self, nums: List[int], devices = []): pipeline = [ {"$match": {"num": {"$in": nums}, "$or": self.deviceHash2Match(devices)}}, # Фильтруем по списку {"$sort": {"num": 1, "ts": -1}}, # Сортируем по num и ts (по убыванию) { "$group": { "_id": "$num", # Группируем по num "latest_doc": {"$first": "$$ROOT"} # Берем первую (последнюю по ts) } }, {"$replaceRoot": {"newRoot": "$latest_doc"}}, # Восстанавливаем структуру {"$sort": {"num": 1}} # Сортируем результат по num ] collection = self.dbStore['node_info'] c = await collection.aggregate(pipeline) return await c.to_list() async def listOfFindLikeName(self, long_name, devices = []): pipeline = [ {"$match": {"long_name": {"$regex":long_name ,'$options' : 'i'}, "$or": self.deviceHash2Match(devices)}}, # Фильтруем по списку {"$sort": {"num": 1, "ts": -1}}, # Сортируем по num и ts (по убыванию) { "$group": { "_id": "$num", # Группируем по num "latest_doc": {"$first": "$$ROOT"} # Берем первую (последнюю по ts) } }, {"$replaceRoot": {"newRoot": "$latest_doc"}}, # Восстанавливаем структуру {"$sort": {"num": 1}} # Сортируем результат по num ] collection = self.dbStore['node_info'] c = await collection.aggregate(pipeline) return await c.to_list() class PacketDbService(MultiDeviceDbSupport): def __init__(self, dbStore, core): super().__init__(dbStore, core) async def findPacketsSignals(self, after: float = -1, before: float = -1, nums: List[int] = [], devices = []): pipeline = [] match = {"$or": self.deviceHash2Match(devices)} if after != -1 or before != -1: match["ts"] = {} if after != -1: match["ts"]["$gt"] = after if before != -1: match["ts"]["$lt"] = before if nums: if type(nums) != list: nums = [nums] match["from"] = {"$in":nums} pipeline.append({"$match":match}) collection = self.dbStore['packet'] c = await collection.aggregate(pipeline) l = await c.to_list() return [PacketSignal(p) for p in l] async def findPacketsAndGroupCount(self, after: float = -1, before: float = -1, nums: List[int] = [], portnums: List[int] = [], packetsPerNode = False, packetsSumNode = False, devices = []): pipeline = [] match = {"$or": self.deviceHash2Match(devices)} if after != -1 or before != -1: match["ts"] = {} if after != -1: match["ts"]["$gt"] = after if before != -1: match["ts"]["$lt"] = before if nums: if type(nums) != list: nums = [nums] match["from"] = {"$in":nums} if portnums: if type(portnums) != list: portnums = [portnums] match["portnums"] = {"$in":portnums} if match: pipeline.append({"$match":match}) #групировка по количеству на выхлопе _id номер портнума и count число его юзов groupPipe = { "$group": {"_id": "$portnum", "count": {"$sum": 1}} } if packetsPerNode: groupPipe["$group"]["_id"] = { "portnum": "$portnum", "from": "$from" } if packetsSumNode: groupPipe["$group"]["_id"] = "$from" pipeline.append(groupPipe) ###print(pipeline) collection = self.dbStore['packet'] c = await collection.aggregate(pipeline) l = await c.to_list() return [PacketGroup(p, packetsPerNode, packetsSumNode) for p in l] class DbService(NodeDbService, PacketDbService, MessageDbService): def __init__(self, dbStore, core): NodeDbService.__init__(self, dbStore, core) PacketDbService.__init__(self, dbStore, core) MessageDbService.__init__(self, dbStore, core)