You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
286 lines
12 KiB
286 lines
12 KiB
import asyncio
|
|
|
|
from fastapi import FastAPI, Response, BackgroundTasks, Header, Request
|
|
from fastapi.responses import StreamingResponse, FileResponse
|
|
from fastapi.security import HTTPBasic, HTTPBasicCredentials
|
|
from fastapi.exceptions import HTTPException
|
|
import uvicorn
|
|
import traceback
|
|
import aiofiles
|
|
|
|
from config_parser import Config as ConfigParser
|
|
from config_parser import TranscodeStatus, TranscodeTools, Go2Rtc, NeedNVR
|
|
from global_funcs import create_logger, hexed
|
|
from nvr_core import NVR
|
|
from nvr_types import File
|
|
import os
|
|
import time
|
|
|
|
class Server:
|
|
app: FastAPI = FastAPI()
|
|
config: ConfigParser = ConfigParser()
|
|
go2rtc: Go2Rtc = Go2Rtc()
|
|
security = HTTPBasic()
|
|
|
|
API_BASE_REF = "/api/dvrip"
|
|
|
|
def __init__(self):
|
|
self.logger = create_logger(Server.__name__)
|
|
self.setup_events()
|
|
self.setup_routers()
|
|
self.setup_middleware()
|
|
|
|
def setup_events(self):
|
|
@self.app.on_event('startup')
|
|
async def on_startup():
|
|
for i in range(0, len(self.config.recorders)):
|
|
nvr:NVR = self.config.getRecorder(i).nvr
|
|
await nvr.login()
|
|
channels = await nvr.channels()
|
|
nvr.logout()
|
|
self.config.recorders[i].channels = len(channels)
|
|
self.logger.info(f"{self.config.recorders[i]} channels count: {self.config.recorders[i].channels}")
|
|
await self.go2rtc.start_go2rtc(self.config.recorders)
|
|
|
|
asyncio.create_task(self.config.transcode_tools.queue_images_task())
|
|
asyncio.create_task(self.config.transcode_tools.queue_encoding_task())
|
|
|
|
def setup_middleware(self):
|
|
if len(self.config.auth) != 0:
|
|
self.logger.info("auth is enabled")
|
|
|
|
@self.app.middleware("http")
|
|
async def auth_handler(request: Request, call_next):
|
|
try:
|
|
s = await self.security(request)
|
|
if s.username not in self.config.auth.mapping or hexed(s.password) != self.config.auth.mapping[s.username]:
|
|
raise HTTPException(status_code=401, headers={"WWW-Authenticate": "Basic"})
|
|
except HTTPException as e:
|
|
return Response(status_code=e.status_code, headers=e.headers)
|
|
|
|
return await call_next(request)
|
|
|
|
@self.app.get(self.API_BASE_REF + "/logout")
|
|
async def logout(request: Request):
|
|
request.scope.update(headers={"Authorization":"b"})
|
|
return Response(status_code=401)
|
|
|
|
else:
|
|
self.logger.warn("auth is disabled")
|
|
|
|
def setup_routers(self):
|
|
@self.app.get(self.API_BASE_REF + "/ping", status_code=200)
|
|
async def getPing():
|
|
return {"pong":time.time()}
|
|
|
|
@self.app.get(self.API_BASE_REF, status_code=200)
|
|
async def getRecorders(response: Response):
|
|
try:
|
|
return {"ok":True, "data":self.config.getRecorders()}
|
|
except Exception as e:
|
|
traceback.print_exc()
|
|
response.status_code = 400
|
|
return {"ok":False, "error":e}
|
|
|
|
@self.app.get(self.API_BASE_REF + "/channels/{recorder_index}", status_code=200)
|
|
async def getRecorder(response: Response, recorder_index:int):
|
|
try:
|
|
nvr:NVR = self.config.getRecorder(recorder_index).nvr
|
|
await nvr.login()
|
|
channels = await nvr.channels()
|
|
overrides = []
|
|
for channel in channels:
|
|
overrides.append(self.config.getRecorder(recorder_index).name_overrides.get(channel, channel))
|
|
return {"ok":True, "data":overrides}
|
|
except Exception as e:
|
|
traceback.print_exc()
|
|
response.status_code = 400
|
|
return {"ok":False, "error":e}
|
|
finally:
|
|
nvr.logout()
|
|
|
|
@self.app.get(self.API_BASE_REF + "/history/{recorder_index}/{channel}/{stream}")
|
|
async def getHistory(response: Response, recorder_index:int, channel: int, stream: int, start_date:str = None, end_date:str = None):
|
|
try:
|
|
nvr:NVR = self.config.getRecorder(recorder_index).nvr
|
|
await nvr.login()
|
|
return {"ok":True, "data":[f async for f in nvr.files(channel, start_date, end_date, stype=stream, json=True)]}
|
|
except Exception as e:
|
|
traceback.print_exc()
|
|
response.status_code = 400
|
|
return {"ok":False, "error":e}
|
|
finally:
|
|
nvr.logout()
|
|
|
|
@self.app.get(self.API_BASE_REF + "/snapshot/{recorder_index}/{channel}")
|
|
async def getSnapshot(response: Response, recorder_index:int, channel: int):
|
|
try:
|
|
nvr:NVR = self.config.getRecorder(recorder_index).nvr
|
|
await nvr.login()
|
|
data = await nvr.client.snapshot(channel)
|
|
|
|
return Response(content=bytes(data), media_type="image/jpg")
|
|
except Exception as e:
|
|
traceback.print_exc()
|
|
response.status_code = 400
|
|
return b""
|
|
finally:
|
|
nvr.logout()
|
|
|
|
@self.app.get(self.API_BASE_REF + "/file/{recorder_index}")
|
|
async def getFile(response: Response, recorder_index:int, b64:str, background_tasks: BackgroundTasks):
|
|
try:
|
|
if len(b64) == 0:
|
|
response.status_code = 404
|
|
return ""
|
|
nvr:NVR = self.config.getRecorder(recorder_index).nvr
|
|
await nvr.login()
|
|
file: File = File.from_b64(b64 + "==")
|
|
|
|
async def after():
|
|
try:
|
|
await nvr.client.busy.release()
|
|
except:
|
|
pass
|
|
nvr.logout()
|
|
|
|
background_tasks.add_task(after)
|
|
headers = {}
|
|
headers.update({"Content-Length":str(file.size)})
|
|
headers.update({"Content-Disposition": f'attachment; filename="{file.filename_cleared}.{file.type}"'})
|
|
return StreamingResponse(nvr.stream_file(file), media_type="application/octet-stream", headers=headers)
|
|
except Exception as e:
|
|
traceback.print_exc()
|
|
response.status_code = 400
|
|
return {"ok":False, "error":e}
|
|
|
|
@self.app.get(self.API_BASE_REF + "/transcode/status/{recorder_index}")
|
|
async def getTranscodeStatus(response: Response, recorder_index:int, b64:str, background_tasks: BackgroundTasks, recreate:bool = False):
|
|
try:
|
|
if len(b64) == 0:
|
|
response.status_code = 404
|
|
return ""
|
|
|
|
if b64 in self.config.transcode_tools.statuses:
|
|
if not recreate:
|
|
return {"ok":True, "data":self.config.transcode_tools.statuses[b64]}
|
|
|
|
nvr:NVR = self.config.getRecorder(recorder_index).nvr
|
|
file: File = File.from_b64(b64 + "==")
|
|
|
|
self.config.transcode_tools.statuses[b64] = TranscodeStatus(b64)
|
|
background_tasks.add_task(self.config.transcode_tools.processing_safe, status = self.config.transcode_tools.statuses[b64], file = file, nvr = nvr, reCreate = recreate)
|
|
return {"ok":True, "data":self.config.transcode_tools.statuses[b64]}
|
|
except Exception as e:
|
|
traceback.print_exc()
|
|
response.status_code = 400
|
|
return {"ok":False, "error":e}
|
|
|
|
@self.app.get(self.API_BASE_REF + "/transcode/download")
|
|
async def getTranscodeDownload(response: Response, b64:str):
|
|
try:
|
|
if len(b64) == 0:
|
|
response.status_code = 404
|
|
return ""
|
|
|
|
if not b64 in self.config.transcode_tools.statuses:
|
|
response.status_code = 404
|
|
return ""
|
|
|
|
if self.config.transcode_tools.statuses[b64].done:
|
|
headers = {}
|
|
headers.update({"Content-Length":str(self.config.transcode_tools.statuses[b64].outSize)})
|
|
headers.update({"Content-Disposition": f'attachment; filename="{self.config.transcode_tools.statuses[b64].outName}"'})
|
|
return StreamingResponse(self.config.transcode_tools.statuses[b64].generate_bytes(), media_type="application/octet-stream", headers=headers)
|
|
else:
|
|
response.status_code = 429
|
|
return ""
|
|
|
|
except Exception as e:
|
|
traceback.print_exc()
|
|
response.status_code = 400
|
|
return {"ok":False, "error":e}
|
|
|
|
@self.app.get(self.API_BASE_REF + "/transcode/stream")
|
|
async def getTranscodeStream(response: Response, b64:str, range: str = Header(None), chunk_size:int = 256):
|
|
try:
|
|
if len(b64) == 0:
|
|
response.status_code = 404
|
|
return ""
|
|
|
|
if not b64 in self.config.transcode_tools.statuses:
|
|
response.status_code = 404
|
|
return ""
|
|
|
|
if self.config.transcode_tools.statuses[b64].done:
|
|
start, end = range.replace("bytes=", "").split("-")
|
|
start = int(start)
|
|
end = int(end) if end else start + 1024 * chunk_size
|
|
async with aiofiles.open(self.config.transcode_tools.statuses[b64].outFile, "rb") as video:
|
|
await video.seek(start)
|
|
data = await video.read(end - start)
|
|
headers = {
|
|
'Content-Range': f'bytes {str(start)}-{str(end)}/{self.config.transcode_tools.statuses[b64].outSize}',
|
|
'Accept-Ranges': 'bytes'
|
|
}
|
|
return Response(data, status_code=206, headers=headers, media_type="video/mp4")
|
|
else:
|
|
response.status_code = 429
|
|
return ""
|
|
|
|
except Exception as e:
|
|
traceback.print_exc()
|
|
response.status_code = 400
|
|
return {"ok":False, "error":e}
|
|
|
|
@self.app.get(self.API_BASE_REF + "/preview")
|
|
async def getNullPreview():
|
|
return FileResponse("./assets/loading.webp")
|
|
|
|
@self.app.get(self.API_BASE_REF + "/preview/{recorder_index}")
|
|
async def getTranscodePreview(response: Response, recorder_index:int, b64:str, background_tasks: BackgroundTasks, status:bool = False):
|
|
try:
|
|
if len(b64) == 0:
|
|
response.status_code = 404
|
|
return "" if not status else {"status":-1}
|
|
|
|
file: File = File.from_b64(b64 + "==")
|
|
|
|
async def loadPreview(file):
|
|
nvr:NVR = self.config.getRecorder(recorder_index).nvr
|
|
await self.config.transcode_tools.processing_preview_task(file, nvr, "webp")
|
|
|
|
try:
|
|
preview = await self.config.transcode_tools.processing_preview(file, None, "webp")
|
|
headers = {}
|
|
try:
|
|
headers.update({"Content-Length":str(os.path.getsize(preview))})
|
|
if headers["Content-Length"] == 0:
|
|
raise NeedNVR
|
|
except:
|
|
raise NeedNVR
|
|
|
|
headers.update({"Content-Disposition": f'attachment; filename="preview.webp"'})
|
|
return FileResponse(preview, media_type="application/octet-stream", headers=headers) if not status else {"status":1}
|
|
except NeedNVR:
|
|
background_tasks.add_task(loadPreview, file = file)
|
|
return FileResponse("./assets/loading.webp") if not status else {"status":0}
|
|
|
|
except Exception as e:
|
|
traceback.print_exc()
|
|
response.status_code = 400
|
|
return {"ok":False, "error":e}
|
|
|
|
@self.app.get(self.API_BASE_REF + "/stream/{recorder_index}/{channel_index}/{stream_index}")
|
|
async def getGo2RtcStream(recorder_index, channel_index, stream_index):
|
|
return self.go2rtc.get_stream(recorder_index, channel_index, stream_index)
|
|
|
|
def run(self):
|
|
uvicorn.run(
|
|
self.app,
|
|
host=self.config.listen_address,
|
|
port=self.config.listen_port,
|
|
)
|
|
|
|
if __name__ == "__main__":
|
|
Server().run()
|