You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

337 lines
14 KiB

from typing import List
from pymongo.asynchronous.database import AsyncDatabase
from typing import List
from entities.PacketGroup import PacketGroup
from entities.PacketSignal import PacketSignal
from pymongo import DESCENDING
class MultiDeviceDbSupport:
def __init__(self, dbStore, core):
self.dbStore:AsyncDatabase = dbStore
self.core = core
def device2Hash(self, devices_hashes = []):
if devices_hashes == None:
devices_hashes = [self.core.defaultDeviceUUIDHash]
elif (type(devices_hashes) == list):
pass
else:
devices_hashes = [devices_hashes]
if devices_hashes.__len__() == 0:
devices_hashes = [self.core.defaultDeviceUUIDHash]
return devices_hashes
#return {"$or": [{"device_uuid": value}]}
def deviceHash2Match(self, devices_hashes = []):
devices_hashes = self.device2Hash(devices_hashes)
#print(devices_hashes)
orPart = []
for hash in devices_hashes:
uuid = self.core.devicesUuidHashToUuid.get(hash, "")
if uuid:
if uuid == self.core.defaultDeviceUUID:
orPart.append({"device_uuid": None})
orPart.append({"device_uuid": uuid})
return orPart#-->{"$or": orPart}
def devicesHash2UuidList(self, devices_hashes = []):
devices_hashes = self.device2Hash(devices_hashes)
uuids = []
for hash in devices_hashes:
uuid = self.core.devicesUuidHashToUuid.get(hash, "")
if uuid:
uuids.append(uuid)
return uuids
def devicesHash2NumList(self, devices_hashes = []):
uuids = self.devicesHash2UuidList(devices_hashes)
nums = []
for device in self.core.devices:
if device.nid:
nums.append(device.nid)
return nums
class MessageDbService(MultiDeviceDbSupport):
MESSAGE_PORTNUM = 1
def __init__(self, dbStore, core):
super().__init__(dbStore, core)
async def listOfMessage(self, limit:int, offset:int, after: float, before: float, devices = []):
collection = self.dbStore['packet']
payload = {
"to": int(self.core.PUB_CH),
"portnum":self.MESSAGE_PORTNUM,
"ts":{'$gt': after, "$lt": before},
"$or": self.deviceHash2Match(devices)
}
c = collection.find(payload).sort("ts", DESCENDING).skip(offset).limit(limit)
return await c.to_list()
class PacketDbService(MultiDeviceDbSupport):
def __init__(self, dbStore, core):
super().__init__(dbStore, core)
async def findLastPackets(self, nums: List[int] = [], portnums: List[int] = [], devices = []):
pipeline = []
match = {"$or": self.deviceHash2Match(devices)}
if nums:
match["from"] = {"$in": nums}
if portnums:
match["portnum"] = {"$in": portnums}
if match:
pipeline += [{"$match":match}]
pipeline += [
{"$sort": {"from": 1, "ts": -1}},
{"$group": {
"_id": "$from",
"latest_doc": {"$first": "$$ROOT"}
}},
{"$replaceRoot": {"newRoot": "$latest_doc"}},
{"$sort": {"from": 1}}
]
collection = self.dbStore['packet']
c = await collection.aggregate(pipeline)
return await c.to_list()
async def findPacketsSignals(self,
after: float = -1,
before: float = -1,
nums: List[int] = [],
devices = [],
ignore_host = False):
pipeline = []
match = {"$or": self.deviceHash2Match(devices)}
if after != -1 or before != -1:
match["ts"] = {}
if after != -1:
match["ts"]["$gt"] = after
if before != -1:
match["ts"]["$lt"] = before
if nums:
if type(nums) != list:
nums = [nums]
match["from"] = {"$in":nums}
if ignore_host and "from" not in match:
match["from"] = {"$nin": self.devicesHash2NumList(devices)}
pipeline.append({"$match":match})
collection = self.dbStore['packet']
c = await collection.aggregate(pipeline)
l = await c.to_list()
return [PacketSignal(p) for p in l]
async def findPacketsAndGroupCount(self,
after: float = -1,
before: float = -1,
nums: List[int] = [],
portnums: List[int] = [],
packetsPerNode = False,
packetsSumNode = False,
devices = [],
ignore_host = False):
pipeline = []
match = {"$or": self.deviceHash2Match(devices)}
if after != -1 or before != -1:
match["ts"] = {}
if after != -1:
match["ts"]["$gt"] = after
if before != -1:
match["ts"]["$lt"] = before
if nums:
if type(nums) != list:
nums = [nums]
match["from"] = {"$in":nums}
if ignore_host and "from" not in match:
match["from"] = {"$nin": self.devicesHash2NumList(devices)}
if portnums:
if type(portnums) != list:
portnums = [portnums]
match["portnums"] = {"$in":portnums}
if match:
pipeline.append({"$match":match})
#групировка по количеству на выхлопе _id номер портнума и count число его юзов
groupPipe = {
"$group":
{"_id": "$portnum", "count": {"$sum": 1}}
}
if packetsPerNode:
groupPipe["$group"]["_id"] = {
"portnum": "$portnum",
"from": "$from"
}
if packetsSumNode:
groupPipe["$group"]["_id"] = "$from"
pipeline.append(groupPipe)
###print(pipeline)
collection = self.dbStore['packet']
c = await collection.aggregate(pipeline)
l = await c.to_list()
return [PacketGroup(p, packetsPerNode, packetsSumNode) for p in l]
class NodeDbService(PacketDbService, MultiDeviceDbSupport):
def __init__(self, dbStore, core):
PacketDbService.__init__(self, dbStore, core)
MultiDeviceDbSupport.__init__(self, dbStore, core)
async def fillNodeFromPacketCollection(self, l, devices = []):
nums = [node.get("num", 0) for node in l]
#ищем сначала ноды с позицией 3 портнум
packet_pos_nodes = await self.findLastPackets(nums, [3], devices)####
packet_pos_nodes_dict = {}###
for packet in packet_pos_nodes:
packet_pos_nodes_dict[packet.get("from")] = packet
nums_with_pos = [node.get("from", 0) for node in packet_pos_nodes]
#смотрим какие пакеты от нод имеют позицию, и делаем список нод где нет ее
nums_without_pos = [fr0m for fr0m in nums if fr0m not in nums_with_pos]
packet_without_pos = await self.findLastPackets(nums_without_pos, [], devices)####
packet_without_pos_dict = {}
for packet in packet_without_pos:
packet_without_pos_dict[packet.get("from")] = packet
#суем свежачок
new_nodes_list = []
def fillNodeBaseData(node, d1ct):
## base packet
if d1ct[node["num"]].get("rx_snr", None) != None:
node["snr"] = d1ct[node["num"]]["rx_snr"]
if d1ct[node["num"]].get("rx_rssi", None) != None:
node["rssi"] = d1ct[node["num"]]["rx_rssi"]
if d1ct[node["num"]].get("ts", None) != None:
node["ts"] = d1ct[node["num"]]["ts"]
if d1ct[node["num"]].get("hop_start", None) != None and d1ct[node["num"]].get("hop_limit", None) != None:
node["hops_away"] = d1ct[node["num"]].get("hop_start", 0) - d1ct[node["num"]].get("hop_limit", 0)
if d1ct[node["num"]].get("portnum", None) != None:
node["last_portnum"] = d1ct[node["num"]].get("portnum", None)
return node
for node in l:
node["view_in_packets"] = True
if node.get("num", 0) in packet_pos_nodes_dict.keys():
if packet_pos_nodes_dict[node["num"]].get("decoded_payload_object", {}):
node["position"] = packet_pos_nodes_dict[node["num"]]["decoded_payload_object"]
node = fillNodeBaseData(node, packet_pos_nodes_dict)
elif node.get("num", 0) in packet_without_pos_dict.keys():
node = fillNodeBaseData(node, packet_without_pos_dict)
else:
node["view_in_packets"] = False
new_nodes_list.append(node)
return new_nodes_list
async def listOfNodes(self, devices = []):
pipeline = [ #ai slooop
{"$match": {"$or": self.deviceHash2Match(devices)}},
# Сортировка для каждого num по убыванию ts
{"$sort": {"num": 1, "ts": -1}},
# Группировка по num и взятие первого (последнего) документа
{"$group": {
"_id": "$num",
"latest_doc": {"$first": "$$ROOT"}
}},
# Замена корневого документа
{"$replaceRoot": {"newRoot": "$latest_doc"}},
# Сортировка результата (опционально)
{"$sort": {"num": 1}}
]
collection = self.dbStore['node_info']
c = await collection.aggregate(pipeline)
l = await c.to_list()
return await self.fillNodeFromPacketCollection(l, devices)
async def listOfDirectNodes(self, devices = []):
pipeline = [ #ai slooop
{"$match": {"hops_away": 0, "$or": self.deviceHash2Match(devices)}}, # Фильтруем по списку
# Сортировка для каждого num по убыванию ts
{"$sort": {"num": 1, "ts": -1}},
# Группировка по num и взятие первого (последнего) документа
{"$group": {
"_id": "$num",
"latest_doc": {"$first": "$$ROOT"}
}},
# Замена корневого документа
{"$replaceRoot": {"newRoot": "$latest_doc"}},
# Сортировка результата (опционально)
{"$sort": {"num": 1}}
]
collection = self.dbStore['node_info']
c = await collection.aggregate(pipeline)
l = await c.to_list()
return await self.fillNodeFromPacketCollection(l, devices)
async def oneNode(self, num: int, devices = []):
collection = self.dbStore['node_info']
c = await collection.find_one(
{"num":int(num), "$or": self.deviceHash2Match(devices)},
sort=[("ts", -1)]
)
l = await self.fillNodeFromPacketCollection([c], devices)
return l[0] if l.__len__() > 0 else None
async def listOfSelectedNodes(self, nums: List[int], devices = []):
pipeline = [
{"$match": {"num": {"$in": nums}, "$or": self.deviceHash2Match(devices)}}, # Фильтруем по списку
{"$sort": {"num": 1, "ts": -1}}, # Сортируем по num и ts (по убыванию)
{
"$group": {
"_id": "$num", # Группируем по num
"latest_doc": {"$first": "$$ROOT"} # Берем первую (последнюю по ts)
}
},
{"$replaceRoot": {"newRoot": "$latest_doc"}}, # Восстанавливаем структуру
{"$sort": {"num": 1}} # Сортируем результат по num
]
collection = self.dbStore['node_info']
c = await collection.aggregate(pipeline)
l = await c.to_list()
return await self.fillNodeFromPacketCollection(l, devices)
async def listOfFindLikeName(self, long_name, devices = []):
pipeline = [
{"$match": {"long_name": {"$regex":long_name ,'$options' : 'i'}, "$or": self.deviceHash2Match(devices)}}, # Фильтруем по списку
{"$sort": {"num": 1, "ts": -1}}, # Сортируем по num и ts (по убыванию)
{
"$group": {
"_id": "$num", # Группируем по num
"latest_doc": {"$first": "$$ROOT"} # Берем первую (последнюю по ts)
}
},
{"$replaceRoot": {"newRoot": "$latest_doc"}}, # Восстанавливаем структуру
{"$sort": {"num": 1}} # Сортируем результат по num
]
collection = self.dbStore['node_info']
c = await collection.aggregate(pipeline)
return await c.to_list() #no need fill, just search
class DbService(NodeDbService, PacketDbService, MessageDbService):
def __init__(self, dbStore, core):
NodeDbService.__init__(self, dbStore, core)
PacketDbService.__init__(self, dbStore, core)
MessageDbService.__init__(self, dbStore, core)