diff --git a/dbService.py b/dbService.py new file mode 100644 index 0000000..6aa121d --- /dev/null +++ b/dbService.py @@ -0,0 +1,87 @@ +from typing import List +from pymongo.asynchronous.database import AsyncDatabase + +class DbService: + def __init__(self, dbStore): + self.dbStore:AsyncDatabase = dbStore + + async def listOfNodes(self): + pipeline = [ #ai slooop + # Сортировка для каждого num по убыванию ts + {"$sort": {"num": 1, "ts": -1}}, + # Группировка по num и взятие первого (последнего) документа + {"$group": { + "_id": "$num", + "latest_doc": {"$first": "$$ROOT"} + }}, + # Замена корневого документа + {"$replaceRoot": {"newRoot": "$latest_doc"}}, + # Сортировка результата (опционально) + {"$sort": {"num": 1}} + ] + + collection = self.dbStore['node_info'] + c = await collection.aggregate(pipeline) + return await c.to_list() + + async def listOfDirectNodes(self): + pipeline = [ #ai slooop + {"$match": {"hops_away": 0}}, # Фильтруем по списку + # Сортировка для каждого num по убыванию ts + {"$sort": {"num": 1, "ts": -1}}, + # Группировка по num и взятие первого (последнего) документа + {"$group": { + "_id": "$num", + "latest_doc": {"$first": "$$ROOT"} + }}, + # Замена корневого документа + {"$replaceRoot": {"newRoot": "$latest_doc"}}, + # Сортировка результата (опционально) + {"$sort": {"num": 1}} + ] + + collection = self.dbStore['node_info'] + c = await collection.aggregate(pipeline) + return await c.to_list() + + async def oneNode(self, num: int): + collection = self.dbStore['node_info'] + c = await collection.find_one( + {"num":num}, + sort=[("ts", -1)] + ) + return c + + async def listOfSelectedNodes(self, nums: List[int]): + pipeline = [ + {"$match": {"num": {"$in": nums}}}, # Фильтруем по списку + {"$sort": {"num": 1, "ts": -1}}, # Сортируем по num и ts (по убыванию) + { + "$group": { + "_id": "$num", # Группируем по num + "latest_doc": {"$first": "$$ROOT"} # Берем первую (последнюю по ts) + } + }, + {"$replaceRoot": {"newRoot": "$latest_doc"}}, # Восстанавливаем структуру + {"$sort": {"num": 1}} # Сортируем результат по num + ] + collection = self.dbStore['node_info'] + c = await collection.aggregate(pipeline) + return await c.to_list() + + async def listOfFindLikeName(self, long_name): + pipeline = [ + {"$match": {"long_name": {"$regex":long_name ,'$options' : 'i'}}}, # Фильтруем по списку + {"$sort": {"num": 1, "ts": -1}}, # Сортируем по num и ts (по убыванию) + { + "$group": { + "_id": "$num", # Группируем по num + "latest_doc": {"$first": "$$ROOT"} # Берем первую (последнюю по ts) + } + }, + {"$replaceRoot": {"newRoot": "$latest_doc"}}, # Восстанавливаем структуру + {"$sort": {"num": 1}} # Сортируем результат по num + ] + collection = self.dbStore['node_info'] + c = await collection.aggregate(pipeline) + return await c.to_list() \ No newline at end of file diff --git a/service.py b/service.py index a22fef9..c9e78b5 100644 --- a/service.py +++ b/service.py @@ -21,6 +21,7 @@ from contextlib import asynccontextmanager #mongo from pymongo import AsyncMongoClient +from dbService import DbService #other from botManager import BotManager @@ -46,7 +47,7 @@ class MeshListener(MeshArgsParse): elif args.transport == "tcp": from transport_tcp import TCPTransport ip, port = args.tcp_address.split(":") - self.transport = TCPTransport(ip, int(port)) + self.transport = TCPTransport(ip, int(port), alive_pool_connect=self.args.serial_alive_pool_seconds) else: logger.error("Unknown mesh transport") sys.exit(1) @@ -120,7 +121,7 @@ class MongoDriver(MeshArgsParse): logger.error("Unknown mongo client") sys.exit(1) self.dbStore = self.dbClient[self.args.mongo_db] - #self.dbCollection = self.dbStore.from_radio + self.dbService = DbService(self.dbStore) async def dbSaveRadio(self, new_from_radio): '''try: @@ -242,6 +243,7 @@ if __name__ == "__main__": #serial transport parser.add_argument("--serial-port", default="/dev/tty.usbmodemD0CF1309DC141") parser.add_argument("--serial-baudrate", default=115200) + parser.add_argument("--serial-alive-pool-seconds", default=60) #ble transport parser.add_argument("--ble-adapter", default=None) parser.add_argument("--ble-mesh-mac", default="22AC1D28-5345-465E-2E82-18CDE5857A45") diff --git a/transport_tcp.py b/transport_tcp.py index 8a10d0b..52b2312 100644 --- a/transport_tcp.py +++ b/transport_tcp.py @@ -18,7 +18,7 @@ def encode_frame(payload): class TCPTransport: - def __init__(self, host, port=4403): + def __init__(self, host, port=4403, alive_pool_connect = 60): self.host = host self.port = int(port) self.reader: StreamReader = None @@ -29,6 +29,7 @@ class TCPTransport: self._error = None self._closing = False self.socket_start = time() + self.alive_pool_connect = alive_pool_connect async def start(self): self._closing = False @@ -54,7 +55,7 @@ class TCPTransport: try: while True: await self.send(b"0") - await asyncio.sleep(1) + await asyncio.sleep(self.alive_pool_connect) except asyncio.CancelledError: pass except: diff --git a/webExtensions/extra/NodeDTO.py b/webExtensions/extra/NodeDTO.py index 8e490ce..c3f3a6a 100644 --- a/webExtensions/extra/NodeDTO.py +++ b/webExtensions/extra/NodeDTO.py @@ -3,4 +3,9 @@ __slots__ = ["num", "snr", "hops_away", "ts", "long_name", "short_name"] class NodeDTO: def __init__(self, data): for slot in __slots__: + setattr(self, slot, data.get(slot, None)) + +class NodeShortDTO: + def __init__(self, data): + for slot in ["num", "long_name", "short_name"]: setattr(self, slot, data.get(slot, None)) \ No newline at end of file diff --git a/webExtensions/nodeList.py b/webExtensions/nodeList.py index 37ffb99..575f6d6 100644 --- a/webExtensions/nodeList.py +++ b/webExtensions/nodeList.py @@ -4,7 +4,7 @@ from fastapi.requests import Request from fastapi import Query from pymongo.asynchronous.database import AsyncDatabase -from extra.NodeDTO import NodeDTO +from extra.NodeDTO import NodeDTO, NodeShortDTO from typing import List, Annotated class WebExtension: @@ -13,61 +13,29 @@ class WebExtension: def __init__(self, core): self.core = core self.app = core.app - self.dbStore = core.dbStore + self.dbService = core.dbService @self.app.get(f"{self.core.context}/nodes/list") @self.core.authManager.authRequest() async def listOfNodes(request: Request): - pipeline = [ #ai slooop - # Сортировка для каждого num по убыванию ts - {"$sort": {"num": 1, "ts": -1}}, - # Группировка по num и взятие первого (последнего) документа - {"$group": { - "_id": "$num", - "latest_doc": {"$first": "$$ROOT"} - }}, - # Замена корневого документа - {"$replaceRoot": {"newRoot": "$latest_doc"}}, - # Сортировка результата (опционально) - {"$sort": {"num": 1}} - ] - - collection = self.dbStore['node_info'] - c = await collection.aggregate(pipeline) - l = await c.to_list() + l = await self.dbService.listOfNodes() return [NodeDTO(node) for node in l] @self.app.get(f"{self.core.context}/nodes/direct") @self.core.authManager.authRequest() async def listOfDirectNodes(request: Request): - pipeline = [ #ai slooop - {"$match": {"hops_away": 0}}, # Фильтруем по списку - # Сортировка для каждого num по убыванию ts - {"$sort": {"num": 1, "ts": -1}}, - # Группировка по num и взятие первого (последнего) документа - {"$group": { - "_id": "$num", - "latest_doc": {"$first": "$$ROOT"} - }}, - # Замена корневого документа - {"$replaceRoot": {"newRoot": "$latest_doc"}}, - # Сортировка результата (опционально) - {"$sort": {"num": 1}} - ] - - collection = self.dbStore['node_info'] - c = await collection.aggregate(pipeline) - l = await c.to_list() + l = await self.dbService.listOfDirectNodes() return [NodeDTO(node) for node in l] + @self.app.get(self.core.context + "/nodes/search") + async def listOfFindLikeName(name=str): + l = await self.dbService.listOfFindLikeName(name) + return [NodeShortDTO(node) for node in l] + @self.app.get(self.core.context + "/nodes/{num}") @self.core.authManager.authRequest() async def oneNode(request: Request, num: int): - collection = self.dbStore['node_info'] - c = await collection.find_one( - {"num":num}, - sort=[("ts", -1)] - ) + c = await self.dbService.oneNode(num) if c: return NodeDTO(c) else: @@ -76,19 +44,5 @@ class WebExtension: @self.app.get(self.core.context + "/nodes") @self.core.authManager.authRequest() async def listOfSelectedNodes(request: Request, nums: List[int] = Query(None)): - pipeline = [ - {"$match": {"num": {"$in": nums}}}, # Фильтруем по списку - {"$sort": {"num": 1, "ts": -1}}, # Сортируем по num и ts (по убыванию) - { - "$group": { - "_id": "$num", # Группируем по num - "latest_doc": {"$first": "$$ROOT"} # Берем первую (последнюю по ts) - } - }, - {"$replaceRoot": {"newRoot": "$latest_doc"}}, # Восстанавливаем структуру - {"$sort": {"num": 1}} # Сортируем результат по num - ] - collection = self.dbStore['node_info'] - c = await collection.aggregate(pipeline) - l = await c.to_list() + l = await self.dbService.listOfSelectedNodes(nums) return [NodeDTO(node) for node in l] \ No newline at end of file diff --git a/webExtensions/publicEndpoints.py b/webExtensions/publicEndpoints.py index 34423b7..ac22e86 100644 --- a/webExtensions/publicEndpoints.py +++ b/webExtensions/publicEndpoints.py @@ -8,6 +8,7 @@ class WebExtension: def __init__(self, core): self.core = core self.app = core.app + self.dbService = core.dbService @self.app.get(f"{self.core.context}/ping") async def pong(request:Request): @@ -24,7 +25,7 @@ class WebExtension: async def sendCodeToNode(request:Request, num: int): #в сообщение одноразовый код и юзер агент кто слал, так-же чтобы не спапили по фингерпринту один код в минуту if num: - node = await self.getNodeInfo(num) + node = await self.dbService.oneNode(num) if node: code = self.core.authManager.request_auth(num) await self.core.device.sendMsgToDM(f"Auth code: {code}", num) @@ -47,12 +48,4 @@ class WebExtension: @self.app.get(f"{self.core.context}/auth/logout") async def clearSession(request:Request): - return self.core.authManager.setAuth(JSONResponse({"status":True}), 0, True) - - async def getNodeInfo(self, num:int): - collection = self.core.dbStore['node_info'] - c = await collection.find_one( - {"num":num}, - sort=[("ts", -1)] - ) - return c + return self.core.authManager.setAuth(JSONResponse({"status":True}), 0, True) \ No newline at end of file