You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

177 lines
6.7 KiB

from typing import List
from pymongo.asynchronous.database import AsyncDatabase
from typing import List
from entities.PacketGroup import PacketGroup
from entities.PacketSignal import PacketSignal
class NodeDbService:
def __init__(self, dbStore):
self.dbStore:AsyncDatabase = dbStore
async def listOfNodes(self):
pipeline = [ #ai slooop
# Сортировка для каждого num по убыванию ts
{"$sort": {"num": 1, "ts": -1}},
# Группировка по num и взятие первого (последнего) документа
{"$group": {
"_id": "$num",
"latest_doc": {"$first": "$$ROOT"}
}},
# Замена корневого документа
{"$replaceRoot": {"newRoot": "$latest_doc"}},
# Сортировка результата (опционально)
{"$sort": {"num": 1}}
]
collection = self.dbStore['node_info']
c = await collection.aggregate(pipeline)
return await c.to_list()
async def listOfDirectNodes(self):
pipeline = [ #ai slooop
{"$match": {"hops_away": 0}}, # Фильтруем по списку
# Сортировка для каждого num по убыванию ts
{"$sort": {"num": 1, "ts": -1}},
# Группировка по num и взятие первого (последнего) документа
{"$group": {
"_id": "$num",
"latest_doc": {"$first": "$$ROOT"}
}},
# Замена корневого документа
{"$replaceRoot": {"newRoot": "$latest_doc"}},
# Сортировка результата (опционально)
{"$sort": {"num": 1}}
]
collection = self.dbStore['node_info']
c = await collection.aggregate(pipeline)
return await c.to_list()
async def oneNode(self, num: int):
collection = self.dbStore['node_info']
c = await collection.find_one(
{"num":int(num)},
sort=[("ts", -1)]
)
return c
async def listOfSelectedNodes(self, nums: List[int]):
pipeline = [
{"$match": {"num": {"$in": nums}}}, # Фильтруем по списку
{"$sort": {"num": 1, "ts": -1}}, # Сортируем по num и ts (по убыванию)
{
"$group": {
"_id": "$num", # Группируем по num
"latest_doc": {"$first": "$$ROOT"} # Берем первую (последнюю по ts)
}
},
{"$replaceRoot": {"newRoot": "$latest_doc"}}, # Восстанавливаем структуру
{"$sort": {"num": 1}} # Сортируем результат по num
]
collection = self.dbStore['node_info']
c = await collection.aggregate(pipeline)
return await c.to_list()
async def listOfFindLikeName(self, long_name):
pipeline = [
{"$match": {"long_name": {"$regex":long_name ,'$options' : 'i'}}}, # Фильтруем по списку
{"$sort": {"num": 1, "ts": -1}}, # Сортируем по num и ts (по убыванию)
{
"$group": {
"_id": "$num", # Группируем по num
"latest_doc": {"$first": "$$ROOT"} # Берем первую (последнюю по ts)
}
},
{"$replaceRoot": {"newRoot": "$latest_doc"}}, # Восстанавливаем структуру
{"$sort": {"num": 1}} # Сортируем результат по num
]
collection = self.dbStore['node_info']
c = await collection.aggregate(pipeline)
return await c.to_list()
class PacketDbService:
def __init__(self, dbStore):
self.dbStore:AsyncDatabase = dbStore
async def findPacketsSignals(self,
after: float = -1,
before: float = -1,
nums: List[int] = []):
pipeline = []
match = {}
if after != -1 or before != -1:
match["ts"] = {}
if after != -1:
match["ts"]["$gt"] = after
if before != -1:
match["ts"]["$lt"] = before
if nums:
if type(nums) != list:
nums = [nums]
match["from"] = {"$in":nums}
pipeline.append({"$match":match})
collection = self.dbStore['packet']
c = await collection.aggregate(pipeline)
l = await c.to_list()
return [PacketSignal(p) for p in l]
async def findPacketsAndGroupCount(self,
after: float = -1,
before: float = -1,
nums: List[int] = [],
portnums: List[int] = [],
packetsPerNode = False,
packetsSumNode = False):
pipeline = []
match = {}
if after != -1 or before != -1:
match["ts"] = {}
if after != -1:
match["ts"]["$gt"] = after
if before != -1:
match["ts"]["$lt"] = before
if nums:
if type(nums) != list:
nums = [nums]
match["from"] = {"$in":nums}
if portnums:
if type(portnums) != list:
portnums = [portnums]
match["portnums"] = {"$in":portnums}
if match:
pipeline.append({"$match":match})
#групировка по количеству на выхлопе _id номер портнума и count число его юзов
groupPipe = {
"$group":
{"_id": "$portnum", "count": {"$sum": 1}}
}
if packetsPerNode:
groupPipe["$group"]["_id"] = {
"portnum": "$portnum",
"from": "$from"
}
if packetsSumNode:
groupPipe["$group"]["_id"] = "$from"
pipeline.append(groupPipe)
###print(pipeline)
collection = self.dbStore['packet']
c = await collection.aggregate(pipeline)
l = await c.to_list()
return [PacketGroup(p, packetsPerNode, packetsSumNode) for p in l]
class DbService(NodeDbService, PacketDbService):
def __init__(self, dbStore):
NodeDbService.__init__(self, dbStore)
PacketDbService.__init__(self, dbStore)