from typing import List from pymongo.asynchronous.database import AsyncDatabase from typing import List from entities.PacketGroup import PacketGroup class NodeDbService: def __init__(self, dbStore): self.dbStore:AsyncDatabase = dbStore async def listOfNodes(self): pipeline = [ #ai slooop # Сортировка для каждого num по убыванию ts {"$sort": {"num": 1, "ts": -1}}, # Группировка по num и взятие первого (последнего) документа {"$group": { "_id": "$num", "latest_doc": {"$first": "$$ROOT"} }}, # Замена корневого документа {"$replaceRoot": {"newRoot": "$latest_doc"}}, # Сортировка результата (опционально) {"$sort": {"num": 1}} ] collection = self.dbStore['node_info'] c = await collection.aggregate(pipeline) return await c.to_list() async def listOfDirectNodes(self): pipeline = [ #ai slooop {"$match": {"hops_away": 0}}, # Фильтруем по списку # Сортировка для каждого num по убыванию ts {"$sort": {"num": 1, "ts": -1}}, # Группировка по num и взятие первого (последнего) документа {"$group": { "_id": "$num", "latest_doc": {"$first": "$$ROOT"} }}, # Замена корневого документа {"$replaceRoot": {"newRoot": "$latest_doc"}}, # Сортировка результата (опционально) {"$sort": {"num": 1}} ] collection = self.dbStore['node_info'] c = await collection.aggregate(pipeline) return await c.to_list() async def oneNode(self, num: int): collection = self.dbStore['node_info'] c = await collection.find_one( {"num":int(num)}, sort=[("ts", -1)] ) return c async def listOfSelectedNodes(self, nums: List[int]): pipeline = [ {"$match": {"num": {"$in": nums}}}, # Фильтруем по списку {"$sort": {"num": 1, "ts": -1}}, # Сортируем по num и ts (по убыванию) { "$group": { "_id": "$num", # Группируем по num "latest_doc": {"$first": "$$ROOT"} # Берем первую (последнюю по ts) } }, {"$replaceRoot": {"newRoot": "$latest_doc"}}, # Восстанавливаем структуру {"$sort": {"num": 1}} # Сортируем результат по num ] collection = self.dbStore['node_info'] c = await collection.aggregate(pipeline) return await c.to_list() async def listOfFindLikeName(self, long_name): pipeline = [ {"$match": {"long_name": {"$regex":long_name ,'$options' : 'i'}}}, # Фильтруем по списку {"$sort": {"num": 1, "ts": -1}}, # Сортируем по num и ts (по убыванию) { "$group": { "_id": "$num", # Группируем по num "latest_doc": {"$first": "$$ROOT"} # Берем первую (последнюю по ts) } }, {"$replaceRoot": {"newRoot": "$latest_doc"}}, # Восстанавливаем структуру {"$sort": {"num": 1}} # Сортируем результат по num ] collection = self.dbStore['node_info'] c = await collection.aggregate(pipeline) return await c.to_list() class PacketDbService: def __init__(self, dbStore): self.dbStore:AsyncDatabase = dbStore async def findPacketsAndGroupCount(self, after: float = -1, before: float = -1, nums: List[int] = [], portnums: List[int] = [], packetsPerNode = False, packetsSumNode = False): pipeline = [] match = {} if after != -1 or before != -1: match["ts"] = {} if after != -1: match["ts"]["$gt"] = after if before != -1: match["ts"]["$lt"] = before if nums: if type(nums) != list: nums = [nums] match["from"] = {"$in":nums} if portnums: if type(portnums) != list: portnums = [portnums] match["portnums"] = {"$in":portnums} if match: pipeline.append({"$match":match}) #групировка по количеству на выхлопе _id номер портнума и count число его юзов groupPipe = { "$group": {"_id": "$portnum", "count": {"$sum": 1}} } if packetsPerNode: groupPipe["$group"]["_id"] = { "portnum": "$portnum", "from": "$from" } if packetsSumNode: groupPipe["$group"]["_id"] = "$from" print(packetsPerNode, packetsSumNode) pipeline.append(groupPipe) print(pipeline) ###print(pipeline) collection = self.dbStore['packet'] c = await collection.aggregate(pipeline) l = await c.to_list() print(l[0]) return [PacketGroup(p, packetsPerNode, packetsSumNode) for p in l] class DbService(NodeDbService, PacketDbService): def __init__(self, dbStore): NodeDbService.__init__(self, dbStore) PacketDbService.__init__(self, dbStore)