You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

257 lines
10 KiB

from typing import List
from pymongo.asynchronous.database import AsyncDatabase
from typing import List
from entities.PacketGroup import PacketGroup
from entities.PacketSignal import PacketSignal
from pymongo import DESCENDING
class MultiDeviceDbSupport:
def __init__(self, dbStore, core):
self.dbStore:AsyncDatabase = dbStore
self.core = core
def device2Hash(self, devices_hashes = []):
if devices_hashes == None:
devices_hashes = [self.core.defaultDeviceUUIDHash]
elif (type(devices_hashes) == list):
pass
else:
devices_hashes = [devices_hashes]
if devices_hashes.__len__() == 0:
devices_hashes = [self.core.defaultDeviceUUIDHash]
return devices_hashes
#return {"$or": [{"device_uuid": value}]}
def deviceHash2Match(self, devices_hashes = []):
devices_hashes = self.device2Hash(devices_hashes)
#print(devices_hashes)
orPart = []
for hash in devices_hashes:
uuid = self.core.devicesUuidHashToUuid.get(hash, "")
if uuid:
if uuid == self.core.defaultDeviceUUID:
orPart.append({"device_uuid": None})
orPart.append({"device_uuid": uuid})
return orPart#-->{"$or": orPart}
def devicesHash2UuidList(self, devices_hashes = []):
devices_hashes = self.device2Hash(devices_hashes)
uuids = []
for hash in devices_hashes:
uuid = self.core.devicesUuidHashToUuid.get(hash, "")
if uuid:
uuids.append(uuid)
return uuids
def devicesHash2NumList(self, devices_hashes = []):
uuids = self.devicesHash2UuidList(devices_hashes)
nums = []
for device in self.core.devices:
if device.nid:
nums.append(device.nid)
return nums
class MessageDbService(MultiDeviceDbSupport):
MESSAGE_PORTNUM = 1
def __init__(self, dbStore, core):
super().__init__(dbStore, core)
async def listOfMessage(self, limit:int, offset:int, after: float, before: float, devices = []):
collection = self.dbStore['packet']
payload = {
"to": int(self.core.PUB_CH),
"portnum":self.MESSAGE_PORTNUM,
"ts":{'$gt': after, "$lt": before},
"$or": self.deviceHash2Match(devices)
}
c = collection.find(payload).sort("ts", DESCENDING).skip(offset).limit(limit)
return await c.to_list()
class NodeDbService(MultiDeviceDbSupport):
def __init__(self, dbStore, core):
super().__init__(dbStore, core)
async def listOfNodes(self, devices = []):
pipeline = [ #ai slooop
{"$match": {"$or": self.deviceHash2Match(devices)}},
# Сортировка для каждого num по убыванию ts
{"$sort": {"num": 1, "ts": -1}},
# Группировка по num и взятие первого (последнего) документа
{"$group": {
"_id": "$num",
"latest_doc": {"$first": "$$ROOT"}
}},
# Замена корневого документа
{"$replaceRoot": {"newRoot": "$latest_doc"}},
# Сортировка результата (опционально)
{"$sort": {"num": 1}}
]
collection = self.dbStore['node_info']
c = await collection.aggregate(pipeline)
return await c.to_list()
async def listOfDirectNodes(self, devices = []):
pipeline = [ #ai slooop
{"$match": {"hops_away": 0, "$or": self.deviceHash2Match(devices)}}, # Фильтруем по списку
# Сортировка для каждого num по убыванию ts
{"$sort": {"num": 1, "ts": -1}},
# Группировка по num и взятие первого (последнего) документа
{"$group": {
"_id": "$num",
"latest_doc": {"$first": "$$ROOT"}
}},
# Замена корневого документа
{"$replaceRoot": {"newRoot": "$latest_doc"}},
# Сортировка результата (опционально)
{"$sort": {"num": 1}}
]
collection = self.dbStore['node_info']
c = await collection.aggregate(pipeline)
return await c.to_list()
async def oneNode(self, num: int, devices = []):
collection = self.dbStore['node_info']
c = await collection.find_one(
{"num":int(num), "$or": self.deviceHash2Match(devices)},
sort=[("ts", -1)]
)
return c
async def listOfSelectedNodes(self, nums: List[int], devices = []):
pipeline = [
{"$match": {"num": {"$in": nums}, "$or": self.deviceHash2Match(devices)}}, # Фильтруем по списку
{"$sort": {"num": 1, "ts": -1}}, # Сортируем по num и ts (по убыванию)
{
"$group": {
"_id": "$num", # Группируем по num
"latest_doc": {"$first": "$$ROOT"} # Берем первую (последнюю по ts)
}
},
{"$replaceRoot": {"newRoot": "$latest_doc"}}, # Восстанавливаем структуру
{"$sort": {"num": 1}} # Сортируем результат по num
]
collection = self.dbStore['node_info']
c = await collection.aggregate(pipeline)
return await c.to_list()
async def listOfFindLikeName(self, long_name, devices = []):
pipeline = [
{"$match": {"long_name": {"$regex":long_name ,'$options' : 'i'}, "$or": self.deviceHash2Match(devices)}}, # Фильтруем по списку
{"$sort": {"num": 1, "ts": -1}}, # Сортируем по num и ts (по убыванию)
{
"$group": {
"_id": "$num", # Группируем по num
"latest_doc": {"$first": "$$ROOT"} # Берем первую (последнюю по ts)
}
},
{"$replaceRoot": {"newRoot": "$latest_doc"}}, # Восстанавливаем структуру
{"$sort": {"num": 1}} # Сортируем результат по num
]
collection = self.dbStore['node_info']
c = await collection.aggregate(pipeline)
return await c.to_list()
class PacketDbService(MultiDeviceDbSupport):
def __init__(self, dbStore, core):
super().__init__(dbStore, core)
async def findPacketsSignals(self,
after: float = -1,
before: float = -1,
nums: List[int] = [],
devices = [],
ignore_host = False):
pipeline = []
match = {"$or": self.deviceHash2Match(devices)}
if after != -1 or before != -1:
match["ts"] = {}
if after != -1:
match["ts"]["$gt"] = after
if before != -1:
match["ts"]["$lt"] = before
if nums:
if type(nums) != list:
nums = [nums]
match["from"] = {"$in":nums}
if ignore_host and "from" not in match:
match["from"] = {"$nin": self.devicesHash2NumList(devices)}
pipeline.append({"$match":match})
collection = self.dbStore['packet']
c = await collection.aggregate(pipeline)
l = await c.to_list()
return [PacketSignal(p) for p in l]
async def findPacketsAndGroupCount(self,
after: float = -1,
before: float = -1,
nums: List[int] = [],
portnums: List[int] = [],
packetsPerNode = False,
packetsSumNode = False,
devices = [],
ignore_host = False):
pipeline = []
match = {"$or": self.deviceHash2Match(devices)}
if after != -1 or before != -1:
match["ts"] = {}
if after != -1:
match["ts"]["$gt"] = after
if before != -1:
match["ts"]["$lt"] = before
if nums:
if type(nums) != list:
nums = [nums]
match["from"] = {"$in":nums}
if ignore_host and "from" not in match:
match["from"] = {"$nin": self.devicesHash2NumList(devices)}
if portnums:
if type(portnums) != list:
portnums = [portnums]
match["portnums"] = {"$in":portnums}
if match:
pipeline.append({"$match":match})
#групировка по количеству на выхлопе _id номер портнума и count число его юзов
groupPipe = {
"$group":
{"_id": "$portnum", "count": {"$sum": 1}}
}
if packetsPerNode:
groupPipe["$group"]["_id"] = {
"portnum": "$portnum",
"from": "$from"
}
if packetsSumNode:
groupPipe["$group"]["_id"] = "$from"
pipeline.append(groupPipe)
###print(pipeline)
collection = self.dbStore['packet']
c = await collection.aggregate(pipeline)
l = await c.to_list()
return [PacketGroup(p, packetsPerNode, packetsSumNode) for p in l]
class DbService(NodeDbService, PacketDbService, MessageDbService):
def __init__(self, dbStore, core):
NodeDbService.__init__(self, dbStore, core)
PacketDbService.__init__(self, dbStore, core)
MessageDbService.__init__(self, dbStore, core)