Browse Source

db service

main
gsd 4 months ago
parent
commit
b168ac62d4
  1. 87
      dbService.py
  2. 6
      service.py
  3. 5
      transport_tcp.py
  4. 5
      webExtensions/extra/NodeDTO.py
  5. 68
      webExtensions/nodeList.py
  6. 13
      webExtensions/publicEndpoints.py

87
dbService.py

@ -0,0 +1,87 @@
from typing import List
from pymongo.asynchronous.database import AsyncDatabase
class DbService:
def __init__(self, dbStore):
self.dbStore:AsyncDatabase = dbStore
async def listOfNodes(self):
pipeline = [ #ai slooop
# Сортировка для каждого num по убыванию ts
{"$sort": {"num": 1, "ts": -1}},
# Группировка по num и взятие первого (последнего) документа
{"$group": {
"_id": "$num",
"latest_doc": {"$first": "$$ROOT"}
}},
# Замена корневого документа
{"$replaceRoot": {"newRoot": "$latest_doc"}},
# Сортировка результата (опционально)
{"$sort": {"num": 1}}
]
collection = self.dbStore['node_info']
c = await collection.aggregate(pipeline)
return await c.to_list()
async def listOfDirectNodes(self):
pipeline = [ #ai slooop
{"$match": {"hops_away": 0}}, # Фильтруем по списку
# Сортировка для каждого num по убыванию ts
{"$sort": {"num": 1, "ts": -1}},
# Группировка по num и взятие первого (последнего) документа
{"$group": {
"_id": "$num",
"latest_doc": {"$first": "$$ROOT"}
}},
# Замена корневого документа
{"$replaceRoot": {"newRoot": "$latest_doc"}},
# Сортировка результата (опционально)
{"$sort": {"num": 1}}
]
collection = self.dbStore['node_info']
c = await collection.aggregate(pipeline)
return await c.to_list()
async def oneNode(self, num: int):
collection = self.dbStore['node_info']
c = await collection.find_one(
{"num":num},
sort=[("ts", -1)]
)
return c
async def listOfSelectedNodes(self, nums: List[int]):
pipeline = [
{"$match": {"num": {"$in": nums}}}, # Фильтруем по списку
{"$sort": {"num": 1, "ts": -1}}, # Сортируем по num и ts (по убыванию)
{
"$group": {
"_id": "$num", # Группируем по num
"latest_doc": {"$first": "$$ROOT"} # Берем первую (последнюю по ts)
}
},
{"$replaceRoot": {"newRoot": "$latest_doc"}}, # Восстанавливаем структуру
{"$sort": {"num": 1}} # Сортируем результат по num
]
collection = self.dbStore['node_info']
c = await collection.aggregate(pipeline)
return await c.to_list()
async def listOfFindLikeName(self, long_name):
pipeline = [
{"$match": {"long_name": {"$regex":long_name ,'$options' : 'i'}}}, # Фильтруем по списку
{"$sort": {"num": 1, "ts": -1}}, # Сортируем по num и ts (по убыванию)
{
"$group": {
"_id": "$num", # Группируем по num
"latest_doc": {"$first": "$$ROOT"} # Берем первую (последнюю по ts)
}
},
{"$replaceRoot": {"newRoot": "$latest_doc"}}, # Восстанавливаем структуру
{"$sort": {"num": 1}} # Сортируем результат по num
]
collection = self.dbStore['node_info']
c = await collection.aggregate(pipeline)
return await c.to_list()

6
service.py

@ -21,6 +21,7 @@ from contextlib import asynccontextmanager
#mongo #mongo
from pymongo import AsyncMongoClient from pymongo import AsyncMongoClient
from dbService import DbService
#other #other
from botManager import BotManager from botManager import BotManager
@ -46,7 +47,7 @@ class MeshListener(MeshArgsParse):
elif args.transport == "tcp": elif args.transport == "tcp":
from transport_tcp import TCPTransport from transport_tcp import TCPTransport
ip, port = args.tcp_address.split(":") ip, port = args.tcp_address.split(":")
self.transport = TCPTransport(ip, int(port)) self.transport = TCPTransport(ip, int(port), alive_pool_connect=self.args.serial_alive_pool_seconds)
else: else:
logger.error("Unknown mesh transport") logger.error("Unknown mesh transport")
sys.exit(1) sys.exit(1)
@ -120,7 +121,7 @@ class MongoDriver(MeshArgsParse):
logger.error("Unknown mongo client") logger.error("Unknown mongo client")
sys.exit(1) sys.exit(1)
self.dbStore = self.dbClient[self.args.mongo_db] self.dbStore = self.dbClient[self.args.mongo_db]
#self.dbCollection = self.dbStore.from_radio self.dbService = DbService(self.dbStore)
async def dbSaveRadio(self, new_from_radio): async def dbSaveRadio(self, new_from_radio):
'''try: '''try:
@ -242,6 +243,7 @@ if __name__ == "__main__":
#serial transport #serial transport
parser.add_argument("--serial-port", default="/dev/tty.usbmodemD0CF1309DC141") parser.add_argument("--serial-port", default="/dev/tty.usbmodemD0CF1309DC141")
parser.add_argument("--serial-baudrate", default=115200) parser.add_argument("--serial-baudrate", default=115200)
parser.add_argument("--serial-alive-pool-seconds", default=60)
#ble transport #ble transport
parser.add_argument("--ble-adapter", default=None) parser.add_argument("--ble-adapter", default=None)
parser.add_argument("--ble-mesh-mac", default="22AC1D28-5345-465E-2E82-18CDE5857A45") parser.add_argument("--ble-mesh-mac", default="22AC1D28-5345-465E-2E82-18CDE5857A45")

5
transport_tcp.py

@ -18,7 +18,7 @@ def encode_frame(payload):
class TCPTransport: class TCPTransport:
def __init__(self, host, port=4403): def __init__(self, host, port=4403, alive_pool_connect = 60):
self.host = host self.host = host
self.port = int(port) self.port = int(port)
self.reader: StreamReader = None self.reader: StreamReader = None
@ -29,6 +29,7 @@ class TCPTransport:
self._error = None self._error = None
self._closing = False self._closing = False
self.socket_start = time() self.socket_start = time()
self.alive_pool_connect = alive_pool_connect
async def start(self): async def start(self):
self._closing = False self._closing = False
@ -54,7 +55,7 @@ class TCPTransport:
try: try:
while True: while True:
await self.send(b"0") await self.send(b"0")
await asyncio.sleep(1) await asyncio.sleep(self.alive_pool_connect)
except asyncio.CancelledError: except asyncio.CancelledError:
pass pass
except: except:

5
webExtensions/extra/NodeDTO.py

@ -3,4 +3,9 @@ __slots__ = ["num", "snr", "hops_away", "ts", "long_name", "short_name"]
class NodeDTO: class NodeDTO:
def __init__(self, data): def __init__(self, data):
for slot in __slots__: for slot in __slots__:
setattr(self, slot, data.get(slot, None))
class NodeShortDTO:
def __init__(self, data):
for slot in ["num", "long_name", "short_name"]:
setattr(self, slot, data.get(slot, None)) setattr(self, slot, data.get(slot, None))

68
webExtensions/nodeList.py

@ -4,7 +4,7 @@ from fastapi.requests import Request
from fastapi import Query from fastapi import Query
from pymongo.asynchronous.database import AsyncDatabase from pymongo.asynchronous.database import AsyncDatabase
from extra.NodeDTO import NodeDTO from extra.NodeDTO import NodeDTO, NodeShortDTO
from typing import List, Annotated from typing import List, Annotated
class WebExtension: class WebExtension:
@ -13,61 +13,29 @@ class WebExtension:
def __init__(self, core): def __init__(self, core):
self.core = core self.core = core
self.app = core.app self.app = core.app
self.dbStore = core.dbStore self.dbService = core.dbService
@self.app.get(f"{self.core.context}/nodes/list") @self.app.get(f"{self.core.context}/nodes/list")
@self.core.authManager.authRequest() @self.core.authManager.authRequest()
async def listOfNodes(request: Request): async def listOfNodes(request: Request):
pipeline = [ #ai slooop l = await self.dbService.listOfNodes()
# Сортировка для каждого num по убыванию ts
{"$sort": {"num": 1, "ts": -1}},
# Группировка по num и взятие первого (последнего) документа
{"$group": {
"_id": "$num",
"latest_doc": {"$first": "$$ROOT"}
}},
# Замена корневого документа
{"$replaceRoot": {"newRoot": "$latest_doc"}},
# Сортировка результата (опционально)
{"$sort": {"num": 1}}
]
collection = self.dbStore['node_info']
c = await collection.aggregate(pipeline)
l = await c.to_list()
return [NodeDTO(node) for node in l] return [NodeDTO(node) for node in l]
@self.app.get(f"{self.core.context}/nodes/direct") @self.app.get(f"{self.core.context}/nodes/direct")
@self.core.authManager.authRequest() @self.core.authManager.authRequest()
async def listOfDirectNodes(request: Request): async def listOfDirectNodes(request: Request):
pipeline = [ #ai slooop l = await self.dbService.listOfDirectNodes()
{"$match": {"hops_away": 0}}, # Фильтруем по списку
# Сортировка для каждого num по убыванию ts
{"$sort": {"num": 1, "ts": -1}},
# Группировка по num и взятие первого (последнего) документа
{"$group": {
"_id": "$num",
"latest_doc": {"$first": "$$ROOT"}
}},
# Замена корневого документа
{"$replaceRoot": {"newRoot": "$latest_doc"}},
# Сортировка результата (опционально)
{"$sort": {"num": 1}}
]
collection = self.dbStore['node_info']
c = await collection.aggregate(pipeline)
l = await c.to_list()
return [NodeDTO(node) for node in l] return [NodeDTO(node) for node in l]
@self.app.get(self.core.context + "/nodes/search")
async def listOfFindLikeName(name=str):
l = await self.dbService.listOfFindLikeName(name)
return [NodeShortDTO(node) for node in l]
@self.app.get(self.core.context + "/nodes/{num}") @self.app.get(self.core.context + "/nodes/{num}")
@self.core.authManager.authRequest() @self.core.authManager.authRequest()
async def oneNode(request: Request, num: int): async def oneNode(request: Request, num: int):
collection = self.dbStore['node_info'] c = await self.dbService.oneNode(num)
c = await collection.find_one(
{"num":num},
sort=[("ts", -1)]
)
if c: if c:
return NodeDTO(c) return NodeDTO(c)
else: else:
@ -76,19 +44,5 @@ class WebExtension:
@self.app.get(self.core.context + "/nodes") @self.app.get(self.core.context + "/nodes")
@self.core.authManager.authRequest() @self.core.authManager.authRequest()
async def listOfSelectedNodes(request: Request, nums: List[int] = Query(None)): async def listOfSelectedNodes(request: Request, nums: List[int] = Query(None)):
pipeline = [ l = await self.dbService.listOfSelectedNodes(nums)
{"$match": {"num": {"$in": nums}}}, # Фильтруем по списку
{"$sort": {"num": 1, "ts": -1}}, # Сортируем по num и ts (по убыванию)
{
"$group": {
"_id": "$num", # Группируем по num
"latest_doc": {"$first": "$$ROOT"} # Берем первую (последнюю по ts)
}
},
{"$replaceRoot": {"newRoot": "$latest_doc"}}, # Восстанавливаем структуру
{"$sort": {"num": 1}} # Сортируем результат по num
]
collection = self.dbStore['node_info']
c = await collection.aggregate(pipeline)
l = await c.to_list()
return [NodeDTO(node) for node in l] return [NodeDTO(node) for node in l]

13
webExtensions/publicEndpoints.py

@ -8,6 +8,7 @@ class WebExtension:
def __init__(self, core): def __init__(self, core):
self.core = core self.core = core
self.app = core.app self.app = core.app
self.dbService = core.dbService
@self.app.get(f"{self.core.context}/ping") @self.app.get(f"{self.core.context}/ping")
async def pong(request:Request): async def pong(request:Request):
@ -24,7 +25,7 @@ class WebExtension:
async def sendCodeToNode(request:Request, num: int): async def sendCodeToNode(request:Request, num: int):
#в сообщение одноразовый код и юзер агент кто слал, так-же чтобы не спапили по фингерпринту один код в минуту #в сообщение одноразовый код и юзер агент кто слал, так-же чтобы не спапили по фингерпринту один код в минуту
if num: if num:
node = await self.getNodeInfo(num) node = await self.dbService.oneNode(num)
if node: if node:
code = self.core.authManager.request_auth(num) code = self.core.authManager.request_auth(num)
await self.core.device.sendMsgToDM(f"Auth code: {code}", num) await self.core.device.sendMsgToDM(f"Auth code: {code}", num)
@ -47,12 +48,4 @@ class WebExtension:
@self.app.get(f"{self.core.context}/auth/logout") @self.app.get(f"{self.core.context}/auth/logout")
async def clearSession(request:Request): async def clearSession(request:Request):
return self.core.authManager.setAuth(JSONResponse({"status":True}), 0, True) return self.core.authManager.setAuth(JSONResponse({"status":True}), 0, True)
async def getNodeInfo(self, num:int):
collection = self.core.dbStore['node_info']
c = await collection.find_one(
{"num":num},
sort=[("ts", -1)]
)
return c
Loading…
Cancel
Save