Browse Source

async features

master
gsd 8 months ago
parent
commit
46f181492a
  1. 36
      config_parser.py
  2. 48
      nvr_core.py
  3. 47
      nvr_types.py
  4. 38
      server.py

36
config_parser.py

@ -1,6 +1,7 @@
import os, sys import os, sys
from json import loads from json import loads
from dvrip import DVRIPCam from asyncio_dvrip import DVRIPCam
import asyncio
from nvr_core import NVR from nvr_core import NVR
from nvr_types import File from nvr_types import File
@ -18,6 +19,7 @@ def load_config(config_name):
sys.exit(1) sys.exit(1)
class Recorder: class Recorder:
loop = asyncio.get_event_loop()
def __init__(self, address, port, username, password, name = ""): def __init__(self, address, port, username, password, name = ""):
self.address = address self.address = address
self.port = int(port) self.port = int(port)
@ -26,12 +28,9 @@ class Recorder:
self.name = name self.name = name
@property @property
def client(self) -> DVRIPCam: def nvr(self):
return DVRIPCam(self.address, port = self.port, user = self.username, password = self.password) client = DVRIPCam(self.address, port = self.port, user = self.username, password = self.password)
return NVR(client, self.loop)
@property
def nvr(self) -> NVR:
return NVR(self.client)
def __str__(self) -> str: def __str__(self) -> str:
if not self.name: if not self.name:
@ -57,25 +56,4 @@ class Config:
return self.recorders[index] return self.recorders[index]
def getRecorders(self): def getRecorders(self):
return [str(r) for r in self.recorders] return [str(r) for r in self.recorders]
if __name__ == "__main__":
print(app_dir())
config = Config()
recorder: Recorder = config.getRecorder()
nvr: NVR = recorder.nvr
nvr.login()
f: File = File.from_b64("eyJiZWdpbiI6ICIyMDI0LTA4LTA2IDAyOjI3OjQxIiwgImVuZCI6ICIyMDI0LTA4LTA2IDAyOjI5OjQxIiwgIkRpc2tObyI6IDAsICJTZXJpYWxObyI6IDAsICJzaXplIjogMTI2NTMzNjMyLCAiZmlsZW5hbWUiOiAiL2lkZWEwLzIwMjQtMDgtMDYvMDAxLzAyLjI3LjQxLTAyLjI5LjQxW01dW0A3NjRlXVswXS5oMjY0IiwgImZpbGVuYW1lX2NsZWFyZWQiOiAiMDIuMjcuNDEtMDIuMjkuNDFfTV9fXzc2NGVfXzBfLmgyNjQiLCAiY2hhbm5lbCI6IDAsICJzdHJlYW0iOiAwfQ==")
nvr.save_file(f)
nvr.logout()
#client: DVRIPCam = recorder.client
#client.debug()
#if not client.login():
# print("can't login")
# sys.exit(2)
#print(client.get_system_info())
#print(client.get_channel_titles())
#print(client.get_channel_statuses())
#print(client.list_local_files(START, END, "h264", channel=3))
#client.close()

48
nvr_core.py

@ -1,5 +1,6 @@
import asyncio
from datetime import datetime from datetime import datetime
from dvrip import DVRIPCam from asyncio_dvrip import DVRIPCam
from nvr_types import File as NvrFile from nvr_types import File as NvrFile
from nvr_types import list_local_files from nvr_types import list_local_files
@ -17,47 +18,46 @@ def date_today(begin = True):
return datetime.now().strftime("%Y-%m-%d 23:59:59") return datetime.now().strftime("%Y-%m-%d 23:59:59")
class NVR: class NVR:
def __init__(self, client) -> None: def __init__(self, client, loop) -> None:
self.client:DVRIPCam = client self.client:DVRIPCam = client
self.loop = loop
def login(self): async def login(self):
self.client.login() await self.client.login(self.loop)
def logout(self): def logout(self):
self.client.close() self.client.close()
@property async def channels(self):
def channels(self): return await self.client.get_command("ChannelTitle", 1048)
return self.client.get_channel_titles()
def files(self, channel, start = None, end = None, ftype = H264, stype = SECONDARY_STREAM, json = False): async def files(self, channel, start = None, end = None, ftype = H264, stype = SECONDARY_STREAM, json = False):
if not start: if not start:
start = date_today() start = date_today()
if not end: if not end:
end = date_today(False) end = date_today(False)
print("Search files", start, end) print("Search files", start, end)
for raw_file in list_local_files(self.client, startTime=start, endTime=end, filetype=ftype, channel=channel, streamType=stype): for raw_file in await list_local_files(self.client, startTime=start, endTime=end, filetype=ftype, channel=channel, streamType=stype):
if json: if json:
yield NvrFile(raw_file, channel, stype).json yield NvrFile(raw_file, channel, stype).json
else: else:
yield NvrFile(raw_file, channel, stype) yield NvrFile(raw_file, channel, stype)
def stream_file(self, file: NvrFile): async def stream_file(self, file: NvrFile) -> bytes:
return file.generate_bytes(self.client) len_data = await file.generate_first_bytes(self.client)
print("len data =",len_data)
def save_file(self, file:NvrFile, savePath = "out.unknown"): if (len_data is None):
yield b""
else:
async for chunk in file.get_file_stream(self.client, len_data):
if (chunk == None):
break
yield chunk
async def save_file(self, file:NvrFile, savePath = "out.unknown"):
downloaded_bytes = 0 downloaded_bytes = 0
with open(savePath, "wb") as f: with open(savePath, "wb") as f:
for byte in file.generate_bytes(self.client): async for byte in file.generate_bytes(self.client):
f.write(byte) f.write(byte)
downloaded_bytes += len(byte) downloaded_bytes += len(byte)
print("\r", downloaded_bytes, "/", file.size) print("\r", downloaded_bytes, "/", file.size)
def download_test(self, filename = "testfile.unknown"):
download_file = list(self.files(0))[0]
downloaded_bytes = 0
#with open(filename, "wb") as f:
# for byte in download_file.download_stream(self.client):
# downloaded_bytes += len(byte)
# f.write(byte)
# print("\r", downloaded_bytes, "/", download_file.size)

47
nvr_types.py

@ -1,5 +1,5 @@
from datetime import datetime from datetime import datetime
from dvrip import DVRIPCam from asyncio_dvrip import DVRIPCam
import json import json
import struct import struct
import base64 import base64
@ -49,9 +49,10 @@ class File:
print(data) print(data)
return File(data, data.get("channel"), data.get("stream")) return File(data, data.get("channel"), data.get("stream"))
def generate_bytes(self, client:DVRIPCam, version = 0): async def generate_first_bytes(self, client:DVRIPCam, version = 0):
client.logger.debug("init request")
#init request #init request
client.send( await client.send(
1424, 1424,
{ {
"Name": "OPPlayBack", "Name": "OPPlayBack",
@ -69,7 +70,7 @@ class File:
}, },
}, },
) )
client.logger.debug("download request")
#download request #download request
msg = 1420 msg = 1420
data = { data = {
@ -88,11 +89,12 @@ class File:
}, },
} }
if client.socket is None: #if client.socket_writer is None:
#todo raise error # client.logger.debug("socket writer is null")
return [] # await client.connect()
client.busy.acquire() client.logger.debug("Blocking busy")
await client.busy.acquire()
if hasattr(data, "__iter__"): if hasattr(data, "__iter__"):
if version == 1: if version == 1:
data["SessionID"] = f"{client.session:#0{12}x}" data["SessionID"] = f"{client.session:#0{12}x}"
@ -116,10 +118,13 @@ class File:
+ data + data
+ tail + tail
) )
client.logger.debug("Send first package")
client.socket_send(pkt) client.socket_send(pkt)
data = client.socket_recv(20) client.logger.debug("Grab first response")
data = await client.socket_recv(20)
client.logger.debug(data)
if data is None or len(data) < 20: if data is None or len(data) < 20:
return [] return None
( (
head, head,
version, version,
@ -128,20 +133,20 @@ class File:
msgid, msgid,
len_data, len_data,
) = struct.unpack("BB2xII2xHI", data) ) = struct.unpack("BB2xII2xHI", data)
return self.get_file_stream(client, len_data) return len_data
def get_file_stream(self, client: DVRIPCam, first_chunk_size): async def get_file_stream(self, client: DVRIPCam, first_chunk_size) -> bytes:
yield client.receive_with_timeout(first_chunk_size) yield bytes(await client.receive_with_timeout(first_chunk_size))
while True: while True:
header = client.receive_with_timeout(20) header = await client.receive_with_timeout(20)
len_data = struct.unpack("I", header[16:])[0] len_data = struct.unpack("I", header[16:])[0]
if len_data == 0: if len_data == 0:
break break
yield client.receive_with_timeout(len_data) yield bytes(await client.receive_with_timeout(len_data))
client.busy.release() await client.busy.release()
client.send( await client.send(
1420, 1420,
{ {
"Name": "OPPlayBack", "Name": "OPPlayBack",
@ -160,12 +165,12 @@ class File:
}, },
}, },
) )
yield b"" yield bytes(b"")
def list_local_files(client: DVRIPCam, startTime, endTime, filetype, channel = 0, streamType = 0): async def list_local_files(client: DVRIPCam, startTime, endTime, filetype, channel = 0, streamType = 0):
# 1440 OPFileQuery # 1440 OPFileQuery
result = [] result = []
data = client.send( data = await client.send(
1440, 1440,
{ {
"Name": "OPFileQuery", "Name": "OPFileQuery",
@ -203,7 +208,7 @@ def list_local_files(client: DVRIPCam, startTime, endTime, filetype, channel = 0
max_event["status"] = "run" max_event["status"] = "run"
while len(data["OPFileQuery"]) == 64 or max_event["status"] == "limit": while len(data["OPFileQuery"]) == 64 or max_event["status"] == "limit":
newStartTime = data["OPFileQuery"][-1]["BeginTime"] newStartTime = data["OPFileQuery"][-1]["BeginTime"]
data = client.send( data = await client.send(
1440, 1440,
{ {
"Name": "OPFileQuery", "Name": "OPFileQuery",

38
server.py

@ -1,5 +1,5 @@
from fastapi import FastAPI, Response from fastapi import FastAPI, Response, BackgroundTasks
from fastapi.responses import StreamingResponse from fastapi.responses import StreamingResponse, FileResponse
import uvicorn import uvicorn
import traceback import traceback
@ -17,7 +17,7 @@ class Server:
def setup_events(self): def setup_events(self):
@self.app.on_event('startup') @self.app.on_event('startup')
def on_startup(): async def on_startup():
print("i am alive") print("i am alive")
def setup_routers(self): def setup_routers(self):
@ -30,16 +30,13 @@ class Server:
response.status_code = 400 response.status_code = 400
return {"ok":False, "error":e} return {"ok":False, "error":e}
#@self.app.get("/{recorder_index}")
#async def getRecorder(recorder_index:int):
# return self.config.getRecorder(recorder_index).nvr
@self.app.get("/api/recorders/{recorder_index}/channels", status_code=200) @self.app.get("/api/recorders/{recorder_index}/channels", status_code=200)
async def getRecorder(response: Response, recorder_index:int): async def getRecorder(response: Response, recorder_index:int):
try: try:
nvr:NVR = self.config.getRecorder(recorder_index).nvr nvr:NVR = self.config.getRecorder(recorder_index).nvr
nvr.login() await nvr.login()
return {"ok":True, "data":nvr.channels} channels = await nvr.channels()
return {"ok":True, "data":channels}
except Exception as e: except Exception as e:
traceback.print_exc() traceback.print_exc()
response.status_code = 400 response.status_code = 400
@ -51,8 +48,8 @@ class Server:
async def getHistory(response: Response, recorder_index:int, channel: int, stream: int, start_date:str = None, end_date:str = None): async def getHistory(response: Response, recorder_index:int, channel: int, stream: int, start_date:str = None, end_date:str = None):
try: try:
nvr:NVR = self.config.getRecorder(recorder_index).nvr nvr:NVR = self.config.getRecorder(recorder_index).nvr
nvr.login() await nvr.login()
return {"ok":True, "data":list(nvr.files(channel, start_date, end_date, stype=stream, json=False))} return {"ok":True, "data":[f async for f in nvr.files(channel, start_date, end_date, stype=stream, json=False)]}
except Exception as e: except Exception as e:
traceback.print_exc() traceback.print_exc()
response.status_code = 400 response.status_code = 400
@ -61,23 +58,30 @@ class Server:
nvr.logout() nvr.logout()
@self.app.get("/api/recorders/{recorder_index}/file") @self.app.get("/api/recorders/{recorder_index}/file")
async def getFile(response: Response, recorder_index:int, b64:str): async def getFile(response: Response, recorder_index:int, b64:str, background_tasks: BackgroundTasks):
try: try:
if len(b64) == 0: if len(b64) == 0:
response.status_code = 404 response.status_code = 404
return "" return ""
nvr:NVR = self.config.getRecorder(recorder_index).nvr nvr:NVR = self.config.getRecorder(recorder_index).nvr
nvr.login() await nvr.login()
nvr.client.debug()
file: File = File.from_b64(b64 + "==") file: File = File.from_b64(b64 + "==")
print("open") print("open")
return StreamingResponse(file.generate_bytes(nvr.client))
async def after():
try:
await nvr.client.busy.release()
except:
print("Already released")
nvr.logout()
background_tasks.add_task(after)
return StreamingResponse(nvr.stream_file(file), media_type="application/octet-stream")
except Exception as e: except Exception as e:
traceback.print_exc() traceback.print_exc()
response.status_code = 400 response.status_code = 400
return {"ok":False, "error":e} return {"ok":False, "error":e}
finally:
nvr.logout()
def run(self): def run(self):
uvicorn.run( uvicorn.run(

Loading…
Cancel
Save