From 46f181492adccc118d31b6c5c039ead3dc77e955 Mon Sep 17 00:00:00 2001 From: gsd Date: Wed, 7 Aug 2024 01:53:35 +0300 Subject: [PATCH] async features --- config_parser.py | 36 +++++++----------------------------- nvr_core.py | 48 ++++++++++++++++++++++++------------------------ nvr_types.py | 47 ++++++++++++++++++++++++++--------------------- server.py | 38 +++++++++++++++++++++----------------- 4 files changed, 78 insertions(+), 91 deletions(-) diff --git a/config_parser.py b/config_parser.py index 34d07fe..f764247 100644 --- a/config_parser.py +++ b/config_parser.py @@ -1,6 +1,7 @@ import os, sys from json import loads -from dvrip import DVRIPCam +from asyncio_dvrip import DVRIPCam +import asyncio from nvr_core import NVR from nvr_types import File @@ -18,6 +19,7 @@ def load_config(config_name): sys.exit(1) class Recorder: + loop = asyncio.get_event_loop() def __init__(self, address, port, username, password, name = ""): self.address = address self.port = int(port) @@ -26,12 +28,9 @@ class Recorder: self.name = name @property - def client(self) -> DVRIPCam: - return DVRIPCam(self.address, port = self.port, user = self.username, password = self.password) - - @property - def nvr(self) -> NVR: - return NVR(self.client) + def nvr(self): + client = DVRIPCam(self.address, port = self.port, user = self.username, password = self.password) + return NVR(client, self.loop) def __str__(self) -> str: if not self.name: @@ -57,25 +56,4 @@ class Config: return self.recorders[index] def getRecorders(self): - return [str(r) for r in self.recorders] - -if __name__ == "__main__": - print(app_dir()) - config = Config() - recorder: Recorder = config.getRecorder() - nvr: NVR = recorder.nvr - nvr.login() - f: File = File.from_b64("eyJiZWdpbiI6ICIyMDI0LTA4LTA2IDAyOjI3OjQxIiwgImVuZCI6ICIyMDI0LTA4LTA2IDAyOjI5OjQxIiwgIkRpc2tObyI6IDAsICJTZXJpYWxObyI6IDAsICJzaXplIjogMTI2NTMzNjMyLCAiZmlsZW5hbWUiOiAiL2lkZWEwLzIwMjQtMDgtMDYvMDAxLzAyLjI3LjQxLTAyLjI5LjQxW01dW0A3NjRlXVswXS5oMjY0IiwgImZpbGVuYW1lX2NsZWFyZWQiOiAiMDIuMjcuNDEtMDIuMjkuNDFfTV9fXzc2NGVfXzBfLmgyNjQiLCAiY2hhbm5lbCI6IDAsICJzdHJlYW0iOiAwfQ==") - nvr.save_file(f) - nvr.logout() - - #client: DVRIPCam = recorder.client - #client.debug() - #if not client.login(): - # print("can't login") - # sys.exit(2) - #print(client.get_system_info()) - #print(client.get_channel_titles()) - #print(client.get_channel_statuses()) - #print(client.list_local_files(START, END, "h264", channel=3)) - #client.close() \ No newline at end of file + return [str(r) for r in self.recorders] \ No newline at end of file diff --git a/nvr_core.py b/nvr_core.py index 08505a7..8609588 100644 --- a/nvr_core.py +++ b/nvr_core.py @@ -1,5 +1,6 @@ +import asyncio from datetime import datetime -from dvrip import DVRIPCam +from asyncio_dvrip import DVRIPCam from nvr_types import File as NvrFile from nvr_types import list_local_files @@ -17,47 +18,46 @@ def date_today(begin = True): return datetime.now().strftime("%Y-%m-%d 23:59:59") class NVR: - def __init__(self, client) -> None: + def __init__(self, client, loop) -> None: self.client:DVRIPCam = client + self.loop = loop - def login(self): - self.client.login() + async def login(self): + await self.client.login(self.loop) def logout(self): self.client.close() - @property - def channels(self): - return self.client.get_channel_titles() + async def channels(self): + return await self.client.get_command("ChannelTitle", 1048) - def files(self, channel, start = None, end = None, ftype = H264, stype = SECONDARY_STREAM, json = False): + async def files(self, channel, start = None, end = None, ftype = H264, stype = SECONDARY_STREAM, json = False): if not start: start = date_today() if not end: end = date_today(False) print("Search files", start, end) - for raw_file in list_local_files(self.client, startTime=start, endTime=end, filetype=ftype, channel=channel, streamType=stype): + for raw_file in await list_local_files(self.client, startTime=start, endTime=end, filetype=ftype, channel=channel, streamType=stype): if json: yield NvrFile(raw_file, channel, stype).json else: yield NvrFile(raw_file, channel, stype) - def stream_file(self, file: NvrFile): - return file.generate_bytes(self.client) - - def save_file(self, file:NvrFile, savePath = "out.unknown"): + async def stream_file(self, file: NvrFile) -> bytes: + len_data = await file.generate_first_bytes(self.client) + print("len data =",len_data) + if (len_data is None): + yield b"" + else: + async for chunk in file.get_file_stream(self.client, len_data): + if (chunk == None): + break + yield chunk + + async def save_file(self, file:NvrFile, savePath = "out.unknown"): downloaded_bytes = 0 with open(savePath, "wb") as f: - for byte in file.generate_bytes(self.client): + async for byte in file.generate_bytes(self.client): f.write(byte) downloaded_bytes += len(byte) - print("\r", downloaded_bytes, "/", file.size) - - def download_test(self, filename = "testfile.unknown"): - download_file = list(self.files(0))[0] - downloaded_bytes = 0 - #with open(filename, "wb") as f: - # for byte in download_file.download_stream(self.client): - # downloaded_bytes += len(byte) - # f.write(byte) - # print("\r", downloaded_bytes, "/", download_file.size) \ No newline at end of file + print("\r", downloaded_bytes, "/", file.size) \ No newline at end of file diff --git a/nvr_types.py b/nvr_types.py index 22c6543..ffe4e61 100644 --- a/nvr_types.py +++ b/nvr_types.py @@ -1,5 +1,5 @@ from datetime import datetime -from dvrip import DVRIPCam +from asyncio_dvrip import DVRIPCam import json import struct import base64 @@ -49,9 +49,10 @@ class File: print(data) return File(data, data.get("channel"), data.get("stream")) - def generate_bytes(self, client:DVRIPCam, version = 0): + async def generate_first_bytes(self, client:DVRIPCam, version = 0): + client.logger.debug("init request") #init request - client.send( + await client.send( 1424, { "Name": "OPPlayBack", @@ -69,7 +70,7 @@ class File: }, }, ) - + client.logger.debug("download request") #download request msg = 1420 data = { @@ -88,11 +89,12 @@ class File: }, } - if client.socket is None: - #todo raise error - return [] + #if client.socket_writer is None: + # client.logger.debug("socket writer is null") + # await client.connect() - client.busy.acquire() + client.logger.debug("Blocking busy") + await client.busy.acquire() if hasattr(data, "__iter__"): if version == 1: data["SessionID"] = f"{client.session:#0{12}x}" @@ -116,10 +118,13 @@ class File: + data + tail ) + client.logger.debug("Send first package") client.socket_send(pkt) - data = client.socket_recv(20) + client.logger.debug("Grab first response") + data = await client.socket_recv(20) + client.logger.debug(data) if data is None or len(data) < 20: - return [] + return None ( head, version, @@ -128,20 +133,20 @@ class File: msgid, len_data, ) = struct.unpack("BB2xII2xHI", data) - return self.get_file_stream(client, len_data) + return len_data - def get_file_stream(self, client: DVRIPCam, first_chunk_size): - yield client.receive_with_timeout(first_chunk_size) + async def get_file_stream(self, client: DVRIPCam, first_chunk_size) -> bytes: + yield bytes(await client.receive_with_timeout(first_chunk_size)) while True: - header = client.receive_with_timeout(20) + header = await client.receive_with_timeout(20) len_data = struct.unpack("I", header[16:])[0] if len_data == 0: break - yield client.receive_with_timeout(len_data) - client.busy.release() - client.send( + yield bytes(await client.receive_with_timeout(len_data)) + await client.busy.release() + await client.send( 1420, { "Name": "OPPlayBack", @@ -160,12 +165,12 @@ class File: }, }, ) - yield b"" + yield bytes(b"") -def list_local_files(client: DVRIPCam, startTime, endTime, filetype, channel = 0, streamType = 0): +async def list_local_files(client: DVRIPCam, startTime, endTime, filetype, channel = 0, streamType = 0): # 1440 OPFileQuery result = [] - data = client.send( + data = await client.send( 1440, { "Name": "OPFileQuery", @@ -203,7 +208,7 @@ def list_local_files(client: DVRIPCam, startTime, endTime, filetype, channel = 0 max_event["status"] = "run" while len(data["OPFileQuery"]) == 64 or max_event["status"] == "limit": newStartTime = data["OPFileQuery"][-1]["BeginTime"] - data = client.send( + data = await client.send( 1440, { "Name": "OPFileQuery", diff --git a/server.py b/server.py index fe25682..495baa7 100644 --- a/server.py +++ b/server.py @@ -1,5 +1,5 @@ -from fastapi import FastAPI, Response -from fastapi.responses import StreamingResponse +from fastapi import FastAPI, Response, BackgroundTasks +from fastapi.responses import StreamingResponse, FileResponse import uvicorn import traceback @@ -17,7 +17,7 @@ class Server: def setup_events(self): @self.app.on_event('startup') - def on_startup(): + async def on_startup(): print("i am alive") def setup_routers(self): @@ -30,16 +30,13 @@ class Server: response.status_code = 400 return {"ok":False, "error":e} - #@self.app.get("/{recorder_index}") - #async def getRecorder(recorder_index:int): - # return self.config.getRecorder(recorder_index).nvr - @self.app.get("/api/recorders/{recorder_index}/channels", status_code=200) async def getRecorder(response: Response, recorder_index:int): try: nvr:NVR = self.config.getRecorder(recorder_index).nvr - nvr.login() - return {"ok":True, "data":nvr.channels} + await nvr.login() + channels = await nvr.channels() + return {"ok":True, "data":channels} except Exception as e: traceback.print_exc() response.status_code = 400 @@ -51,8 +48,8 @@ class Server: async def getHistory(response: Response, recorder_index:int, channel: int, stream: int, start_date:str = None, end_date:str = None): try: nvr:NVR = self.config.getRecorder(recorder_index).nvr - nvr.login() - return {"ok":True, "data":list(nvr.files(channel, start_date, end_date, stype=stream, json=False))} + await nvr.login() + return {"ok":True, "data":[f async for f in nvr.files(channel, start_date, end_date, stype=stream, json=False)]} except Exception as e: traceback.print_exc() response.status_code = 400 @@ -61,23 +58,30 @@ class Server: nvr.logout() @self.app.get("/api/recorders/{recorder_index}/file") - async def getFile(response: Response, recorder_index:int, b64:str): + async def getFile(response: Response, recorder_index:int, b64:str, background_tasks: BackgroundTasks): try: if len(b64) == 0: response.status_code = 404 return "" nvr:NVR = self.config.getRecorder(recorder_index).nvr - nvr.login() - + await nvr.login() + nvr.client.debug() file: File = File.from_b64(b64 + "==") print("open") - return StreamingResponse(file.generate_bytes(nvr.client)) + + async def after(): + try: + await nvr.client.busy.release() + except: + print("Already released") + nvr.logout() + + background_tasks.add_task(after) + return StreamingResponse(nvr.stream_file(file), media_type="application/octet-stream") except Exception as e: traceback.print_exc() response.status_code = 400 return {"ok":False, "error":e} - finally: - nvr.logout() def run(self): uvicorn.run(