You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
337 lines
14 KiB
337 lines
14 KiB
from typing import List
|
|
from pymongo.asynchronous.database import AsyncDatabase
|
|
from typing import List
|
|
from entities.PacketGroup import PacketGroup
|
|
from entities.PacketSignal import PacketSignal
|
|
from pymongo import DESCENDING
|
|
|
|
class MultiDeviceDbSupport:
    """Shared helpers that turn device UUID hashes into MongoDB filter parts.

    The only attributes read from ``core`` are ``defaultDeviceUUIDHash``,
    ``defaultDeviceUUID``, ``devicesUuidHashToUuid`` (used through ``.get``)
    and ``devices`` (iterated; each item must expose ``nid``).
    """

    def __init__(self, dbStore, core):
        self.dbStore: AsyncDatabase = dbStore
        self.core = core

    def device2Hash(self, devices_hashes=None):
        """Normalize ``devices_hashes`` into a non-empty list of hashes.

        ``None`` and an empty list both fall back to the default device hash;
        any non-list value is wrapped into a one-element list; a non-empty
        list is returned as-is.
        """
        if devices_hashes is None:
            return [self.core.defaultDeviceUUIDHash]
        if not isinstance(devices_hashes, list):
            return [devices_hashes]
        if not devices_hashes:
            return [self.core.defaultDeviceUUIDHash]
        return devices_hashes

    def deviceHash2Match(self, devices_hashes=None):
        """Build the list of ``device_uuid`` clauses for a ``$or`` filter.

        Returns the bare clause list; callers wrap it themselves as
        ``{"$or": clauses}``. Hashes that do not resolve to a UUID are
        skipped. For the default device an extra ``{"device_uuid": None}``
        clause is emitted in front of the UUID clause.

        NOTE(review): the result is empty when no hash resolves to a UUID.
        MongoDB is documented to reject an empty ``$or`` array, so callers
        presumably rely on at least one known device -- confirm.
        """
        clauses = []
        for device_hash in self.device2Hash(devices_hashes):
            uuid = self.core.devicesUuidHashToUuid.get(device_hash, "")
            if not uuid:
                continue
            if uuid == self.core.defaultDeviceUUID:
                clauses.append({"device_uuid": None})
            clauses.append({"device_uuid": uuid})
        return clauses

    def devicesHash2UuidList(self, devices_hashes=None):
        """Resolve hashes to device UUIDs, dropping hashes that are unknown."""
        lookup = self.core.devicesUuidHashToUuid
        resolved = (
            lookup.get(device_hash, "")
            for device_hash in self.device2Hash(devices_hashes)
        )
        return [uuid for uuid in resolved if uuid]

    def devicesHash2NumList(self, devices_hashes=None):
        """Return the truthy ``nid`` of every device known to ``core``.

        NOTE(review): ``devices_hashes`` is accepted but not used to filter
        the result (the previous code resolved it to UUIDs and then discarded
        them). It looks like the list was meant to be restricted to the
        selected devices -- confirm the intended behavior before relying on
        this.
        """
        return [device.nid for device in self.core.devices if device.nid]
|
|
|
|
class MessageDbService(MultiDeviceDbSupport):
    """Message queries against the ``packet`` collection."""

    # Value of the ``portnum`` field that listOfMessage filters on.
    MESSAGE_PORTNUM = 1

    def __init__(self, dbStore, core):
        super().__init__(dbStore, core)

    async def listOfMessage(
        self,
        limit: int,
        offset: int,
        after: float,
        before: float,
        devices=None,
    ):
        """Return one page of message packets, newest first.

        Selects packets whose ``to`` equals ``int(core.PUB_CH)``, whose
        ``portnum`` equals ``MESSAGE_PORTNUM`` and whose ``ts`` lies strictly
        between ``after`` and ``before``, restricted to the given devices.

        :param limit: maximum number of documents to return.
        :param offset: number of documents to skip (applied after sorting).
        :param after: exclusive lower bound for ``ts``.
        :param before: exclusive upper bound for ``ts``.
        :param devices: device hash, list of hashes, or ``None``/empty for
            the default device (see ``deviceHash2Match``).
        :return: list of raw packet documents.
        """
        query = {
            "to": int(self.core.PUB_CH),
            "portnum": self.MESSAGE_PORTNUM,
            "ts": {"$gt": after, "$lt": before},
            "$or": self.deviceHash2Match(devices),
        }
        cursor = (
            self.dbStore["packet"]
            .find(query)
            .sort("ts", DESCENDING)
            .skip(offset)
            .limit(limit)
        )
        return await cursor.to_list()
|
|
|
|
class PacketDbService(MultiDeviceDbSupport):
    """Aggregation queries over the ``packet`` collection."""

    def __init__(self, dbStore, core):
        super().__init__(dbStore, core)

    def _buildPacketMatch(self, after, before, nums, devices, ignore_host):
        """Build the ``$match`` document shared by the packet queries.

        ``after`` / ``before`` equal to ``-1`` mean "no bound on that side".
        ``nums`` (a list or a single value) restricts the ``from`` field;
        when it is empty and ``ignore_host`` is set, packets whose ``from``
        is one of ``devicesHash2NumList(devices)`` are excluded instead.
        """
        match = {"$or": self.deviceHash2Match(devices)}

        ts_range = {}
        if after != -1:
            ts_range["$gt"] = after
        if before != -1:
            ts_range["$lt"] = before
        if ts_range:
            match["ts"] = ts_range

        if nums:
            if not isinstance(nums, list):
                nums = [nums]
            match["from"] = {"$in": nums}
        elif ignore_host:
            # An explicit node list wins over ignore_host, so this is only
            # applied when no "from" filter was set above.
            match["from"] = {"$nin": self.devicesHash2NumList(devices)}

        return match

    async def findLastPackets(
        self,
        nums: List[int] = None,
        portnums: List[int] = None,
        devices=None,
    ):
        """Return the newest packet per sender (``from``), ordered by sender.

        :param nums: restrict to these ``from`` values; empty/``None`` means
            no restriction on the sender.
        :param portnums: restrict to these ``portnum`` values; empty/``None``
            means any port.
        :param devices: device hash(es) used to build the ``$or`` filter.
        :return: list of raw packet documents, one per distinct ``from``.
        """
        match = {"$or": self.deviceHash2Match(devices)}
        if nums:
            match["from"] = {"$in": nums}
        if portnums:
            match["portnum"] = {"$in": portnums}

        pipeline = [
            {"$match": match},
            # Newest first inside each sender so $first picks the latest one.
            {"$sort": {"from": 1, "ts": -1}},
            {"$group": {"_id": "$from", "latest_doc": {"$first": "$$ROOT"}}},
            {"$replaceRoot": {"newRoot": "$latest_doc"}},
            {"$sort": {"from": 1}},
        ]
        cursor = await self.dbStore["packet"].aggregate(pipeline)
        return await cursor.to_list()

    async def findPacketsSignals(
        self,
        after: float = -1,
        before: float = -1,
        nums: List[int] = None,
        devices=None,
        ignore_host=False,
    ):
        """Return matching packets wrapped as ``PacketSignal`` objects.

        :param after: exclusive lower ``ts`` bound, ``-1`` to disable.
        :param before: exclusive upper ``ts`` bound, ``-1`` to disable.
        :param nums: sender number(s) to include; a single value is accepted.
        :param devices: device hash(es) used to build the ``$or`` filter.
        :param ignore_host: when no ``nums`` are given, exclude packets sent
            by the numbers returned from ``devicesHash2NumList``.
        """
        match = self._buildPacketMatch(after, before, nums, devices, ignore_host)
        cursor = await self.dbStore["packet"].aggregate([{"$match": match}])
        packets = await cursor.to_list()
        return [PacketSignal(packet) for packet in packets]

    async def findPacketsAndGroupCount(
        self,
        after: float = -1,
        before: float = -1,
        nums: List[int] = None,
        portnums: List[int] = None,
        packetsPerNode=False,
        packetsSumNode=False,
        devices=None,
        ignore_host=False,
    ):
        """Count matching packets per group and wrap rows as ``PacketGroup``.

        By default rows are grouped by ``portnum``. ``packetsPerNode`` groups
        by the ``(portnum, from)`` pair, and ``packetsSumNode`` groups by
        ``from`` only; when both flags are set ``packetsSumNode`` wins.

        :param after: exclusive lower ``ts`` bound, ``-1`` to disable.
        :param before: exclusive upper ``ts`` bound, ``-1`` to disable.
        :param nums: sender number(s) to include; a single value is accepted.
        :param portnums: port number(s) to include; a single value is
            accepted.
        :param devices: device hash(es) used to build the ``$or`` filter.
        :param ignore_host: when no ``nums`` are given, exclude packets sent
            by the numbers returned from ``devicesHash2NumList``.
        """
        match = self._buildPacketMatch(after, before, nums, devices, ignore_host)

        if portnums:
            if not isinstance(portnums, list):
                portnums = [portnums]
            # The field is the singular "portnum" -- the same field the
            # $group stage below and the other queries read. Filtering on
            # "portnums" matched a field no other query in this file uses.
            match["portnum"] = {"$in": portnums}

        # Output rows: _id is the grouping key, count is the number of
        # packets that fell into that group.
        group_id = "$portnum"
        if packetsPerNode:
            group_id = {"portnum": "$portnum", "from": "$from"}
        if packetsSumNode:
            group_id = "$from"

        pipeline = [
            {"$match": match},
            {"$group": {"_id": group_id, "count": {"$sum": 1}}},
        ]
        cursor = await self.dbStore["packet"].aggregate(pipeline)
        rows = await cursor.to_list()
        return [PacketGroup(row, packetsPerNode, packetsSumNode) for row in rows]
|
|
|
|
class NodeDbService(PacketDbService, MultiDeviceDbSupport):
    """Queries over ``node_info``, enriched with data from ``packet``."""

    def __init__(self, dbStore, core):
        # Both bases end up storing the same dbStore/core pair, so a single
        # cooperative call along the MRO is enough.
        super().__init__(dbStore, core)

    def _latestNodePipeline(self, devices, extra_match=None):
        """Build a pipeline yielding the newest document per ``num``.

        ``extra_match`` holds additional ``$match`` conditions that are
        combined with the device ``$or`` filter.
        """
        match = dict(extra_match or {})
        match["$or"] = self.deviceHash2Match(devices)
        return [
            {"$match": match},
            # Newest first inside each num so $first picks the latest one.
            {"$sort": {"num": 1, "ts": -1}},
            {"$group": {"_id": "$num", "latest_doc": {"$first": "$$ROOT"}}},
            # Restore the original document shape.
            {"$replaceRoot": {"newRoot": "$latest_doc"}},
            {"$sort": {"num": 1}},
        ]

    async def _aggregateNodeInfo(self, pipeline):
        """Run ``pipeline`` on ``node_info`` and return the documents."""
        cursor = await self.dbStore["node_info"].aggregate(pipeline)
        return await cursor.to_list()

    @staticmethod
    def _applyPacketToNode(node, packet):
        """Copy the base fields of ``packet`` onto ``node`` in place.

        Only fields that are present and not ``None`` in the packet are
        copied; ``hops_away`` is ``hop_start - hop_limit`` and is set only
        when both values are available.
        """
        if packet.get("rx_snr") is not None:
            node["snr"] = packet["rx_snr"]
        if packet.get("rx_rssi") is not None:
            node["rssi"] = packet["rx_rssi"]
        if packet.get("ts") is not None:
            node["ts"] = packet["ts"]

        hop_start = packet.get("hop_start")
        hop_limit = packet.get("hop_limit")
        if hop_start is not None and hop_limit is not None:
            node["hops_away"] = hop_start - hop_limit

        if packet.get("portnum") is not None:
            node["last_portnum"] = packet["portnum"]
        return node

    async def fillNodeFromPacketCollection(self, l, devices=None):
        """Enrich node documents with data from each node's latest packet.

        For every node the latest packet with ``portnum`` 3 is preferred: its
        ``decoded_payload_object`` (when non-empty) becomes ``position``.
        Nodes without such a packet fall back to their latest packet of any
        port. ``view_in_packets`` records whether any packet was found.

        :param l: list of node documents (dicts); they are modified in place.
        :param devices: device hash(es) used to build the ``$or`` filter.
        :return: a new list holding the same (updated) node dicts.
        """
        if not l:
            # An empty "from" filter would make findLastPackets scan the
            # packets of every node; there is nothing to enrich anyway.
            return []

        nums = [node.get("num", 0) for node in l]

        # First look for the nodes that have a position packet (portnum 3).
        position_packets = await self.findLastPackets(nums, [3], devices)
        position_by_num = {
            packet.get("from"): packet for packet in position_packets
        }

        # Nodes without a position packet fall back to their latest packet
        # of any port.
        nums_without_pos = [num for num in nums if num not in position_by_num]
        latest_by_num = {}
        if nums_without_pos:
            # Guarded: an empty list would disable the "from" filter.
            fallback_packets = await self.findLastPackets(
                nums_without_pos, [], devices
            )
            latest_by_num = {
                packet.get("from"): packet for packet in fallback_packets
            }

        # Merge the fresh packet data into the nodes.
        new_nodes_list = []
        for node in l:
            num = node.get("num", 0)
            node["view_in_packets"] = True
            if num in position_by_num:
                packet = position_by_num[num]
                position = packet.get("decoded_payload_object", {})
                if position:
                    node["position"] = position
                self._applyPacketToNode(node, packet)
            elif num in latest_by_num:
                self._applyPacketToNode(node, latest_by_num[num])
            else:
                node["view_in_packets"] = False
            new_nodes_list.append(node)

        return new_nodes_list

    async def listOfNodes(self, devices=None):
        """Return the newest ``node_info`` document of every node, enriched."""
        nodes = await self._aggregateNodeInfo(self._latestNodePipeline(devices))
        return await self.fillNodeFromPacketCollection(nodes, devices)

    async def listOfDirectNodes(self, devices=None):
        """Like ``listOfNodes`` but only for documents with ``hops_away`` 0."""
        pipeline = self._latestNodePipeline(devices, {"hops_away": 0})
        nodes = await self._aggregateNodeInfo(pipeline)
        return await self.fillNodeFromPacketCollection(nodes, devices)

    async def oneNode(self, num: int, devices=None):
        """Return the newest enriched document for node ``num``.

        :return: the node dict, or ``None`` when no document matches.
        """
        node = await self.dbStore["node_info"].find_one(
            {"num": int(num), "$or": self.deviceHash2Match(devices)},
            sort=[("ts", -1)],
        )
        if node is None:
            # Previously a missing node was passed on as [None] and crashed
            # on the first ``.get`` call.
            return None
        filled = await self.fillNodeFromPacketCollection([node], devices)
        return filled[0] if filled else None

    async def listOfSelectedNodes(self, nums: List[int], devices=None):
        """Return the newest enriched document for each node in ``nums``."""
        pipeline = self._latestNodePipeline(devices, {"num": {"$in": nums}})
        nodes = await self._aggregateNodeInfo(pipeline)
        return await self.fillNodeFromPacketCollection(nodes, devices)

    async def listOfFindLikeName(self, long_name, devices=None):
        """Return the newest document of nodes whose ``long_name`` matches.

        ``long_name`` is used as a case-insensitive ``$regex`` pattern. The
        result is returned as-is, without packet enrichment, since it is
        only used for searching.

        NOTE(review): the pattern is passed to MongoDB unescaped; if it comes
        straight from user input, regex metacharacters are interpreted --
        confirm that this is intended.
        """
        pipeline = self._latestNodePipeline(
            devices,
            {"long_name": {"$regex": long_name, "$options": "i"}},
        )
        return await self._aggregateNodeInfo(pipeline)
|
|
|
|
class DbService(NodeDbService, PacketDbService, MessageDbService):
    """Single entry point combining the node, packet and message services."""

    def __init__(self, dbStore, core):
        # Every base initializer only forwards to MultiDeviceDbSupport, which
        # stores dbStore and core. One call along the MRO reaches all of
        # them, so the three explicit base calls repeated the same work.
        super().__init__(dbStore, core)
|
|
|