You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

286 lines
12 KiB

import asyncio
from fastapi import FastAPI, Response, BackgroundTasks, Header, Request
from fastapi.responses import StreamingResponse, FileResponse
from fastapi.security import HTTPBasic, HTTPBasicCredentials
from fastapi.exceptions import HTTPException
import uvicorn
import traceback
import aiofiles
from config_parser import Config as ConfigParser
from config_parser import TranscodeStatus, TranscodeTools, Go2Rtc, NeedNVR
from global_funcs import create_logger, hexed
from nvr_core import NVR
from nvr_types import File
import os
import time
class Server:
    """FastAPI application exposing the configured recorders over HTTP.

    All routes live under ``API_BASE_REF``. Constructing an instance registers
    the startup hook, the routes and (when credentials are configured) the
    HTTP Basic authentication middleware on the shared ``app``; ``run()``
    then serves that app with uvicorn.
    """

    # These objects are created once, when the class body is executed, and are
    # therefore shared by every Server instance.
    app: FastAPI = FastAPI()
    config: ConfigParser = ConfigParser()
    go2rtc: Go2Rtc = Go2Rtc()
    security = HTTPBasic()
    API_BASE_REF = "/api/dvrip"

    def __init__(self):
        """Create the logger and wire events, routes and middleware into ``app``."""
        self.logger = create_logger(Server.__name__)
        # Strong references to the long-running tasks started at startup; the
        # event loop only keeps weak references, so an unreferenced task may
        # be garbage-collected before it finishes.
        self._background_tasks = set()
        self.setup_events()
        self.setup_routers()
        self.setup_middleware()

    @staticmethod
    def _failure(response: Response, error: Exception) -> dict:
        """Print the active traceback, mark the response 400, build the error body.

        Must be called from inside an ``except`` block so that
        ``traceback.print_exc()`` has an exception to print. The message is
        sent as text because the exception object itself does not encode to a
        useful JSON value.
        """
        traceback.print_exc()
        response.status_code = 400
        return {"ok": False, "error": str(error)}

    def setup_events(self):
        """Register the application startup hook."""

        @self.app.on_event('startup')
        async def on_startup():
            """Count channels per recorder, start go2rtc and the transcode queues."""
            for i, recorder in enumerate(self.config.recorders):
                nvr: NVR = self.config.getRecorder(i).nvr
                await nvr.login()
                try:
                    channels = await nvr.channels()
                finally:
                    # Do not leave the session open if the query fails.
                    nvr.logout()
                recorder.channels = len(channels)
                self.logger.info(f"{recorder} channels count: {recorder.channels}")

            await self.go2rtc.start_go2rtc(self.config.recorders)

            tools = self.config.transcode_tools
            for job in (tools.queue_images_task(), tools.queue_encoding_task()):
                task = asyncio.create_task(job)
                self._background_tasks.add(task)
                task.add_done_callback(self._background_tasks.discard)

    def setup_middleware(self):
        """Install HTTP Basic authentication when credentials are configured.

        With an empty ``config.auth`` nothing is installed and a warning is
        logged. Otherwise every request passes through ``auth_handler`` and a
        ``/logout`` route is added.
        """
        if len(self.config.auth) == 0:
            self.logger.warning("auth is disabled")
            return

        self.logger.info("auth is enabled")

        @self.app.middleware("http")
        async def auth_handler(request: Request, call_next):
            """Reject the request unless it carries known Basic credentials."""
            try:
                s = await self.security(request)
                known = self.config.auth.mapping
                if s.username not in known or hexed(s.password) != known[s.username]:
                    raise HTTPException(status_code=401, headers={"WWW-Authenticate": "Basic"})
            except HTTPException as e:
                # Middleware sits outside the exception handlers, so the
                # exception is converted to a response by hand.
                return Response(status_code=e.status_code, headers=e.headers)
            return await call_next(request)

        @self.app.get(self.API_BASE_REF + "/logout")
        async def logout(request: Request):
            """Answer 401 (presumably so the client drops its cached credentials)."""
            request.scope.update(headers={"Authorization": "b"})
            return Response(status_code=401)

    def setup_routers(self):
        """Register every API route on ``app``.

        Unless noted otherwise, a handler that fails prints the traceback and
        answers with status 400 and ``{"ok": False, "error": <message>}``.
        """

        @self.app.get(self.API_BASE_REF + "/ping", status_code=200)
        async def getPing():
            """Liveness probe returning the current server time."""
            return {"pong": time.time()}

        @self.app.get(self.API_BASE_REF, status_code=200)
        async def getRecorders(response: Response):
            """Return the configured recorders."""
            try:
                return {"ok": True, "data": self.config.getRecorders()}
            except Exception as e:
                return self._failure(response, e)

        @self.app.get(self.API_BASE_REF + "/channels/{recorder_index}", status_code=200)
        async def getRecorder(response: Response, recorder_index: int):
            """Return the recorder's channels with configured name overrides applied."""
            nvr = None  # stays None if the recorder lookup itself fails
            try:
                recorder = self.config.getRecorder(recorder_index)
                nvr = recorder.nvr
                await nvr.login()
                channels = await nvr.channels()
                names = [recorder.name_overrides.get(channel, channel) for channel in channels]
                return {"ok": True, "data": names}
            except Exception as e:
                return self._failure(response, e)
            finally:
                if nvr is not None:
                    nvr.logout()

        @self.app.get(self.API_BASE_REF + "/history/{recorder_index}/{channel}/{stream}")
        async def getHistory(response: Response, recorder_index: int, channel: int, stream: int, start_date: str = None, end_date: str = None):
            """Return the files ``nvr.files`` yields for a channel and stream."""
            nvr = None
            try:
                nvr = self.config.getRecorder(recorder_index).nvr
                await nvr.login()
                found = [f async for f in nvr.files(channel, start_date, end_date, stype=stream, json=True)]
                return {"ok": True, "data": found}
            except Exception as e:
                return self._failure(response, e)
            finally:
                if nvr is not None:
                    nvr.logout()

        @self.app.get(self.API_BASE_REF + "/snapshot/{recorder_index}/{channel}")
        async def getSnapshot(response: Response, recorder_index: int, channel: int):
            """Return a snapshot of the channel; on failure answer 400 with an empty body."""
            nvr = None
            try:
                nvr = self.config.getRecorder(recorder_index).nvr
                await nvr.login()
                data = await nvr.client.snapshot(channel)
                return Response(content=bytes(data), media_type="image/jpg")
            except Exception:
                traceback.print_exc()
                response.status_code = 400
                return b""
            finally:
                if nvr is not None:
                    nvr.logout()

        @self.app.get(self.API_BASE_REF + "/file/{recorder_index}")
        async def getFile(response: Response, recorder_index: int, b64: str, background_tasks: BackgroundTasks):
            """Stream a recorded file, described by ``b64``, as an attachment.

            An empty ``b64`` answers 404. The recorder session is closed by a
            background task that runs after the response has been sent.
            """
            try:
                if len(b64) == 0:
                    response.status_code = 404
                    return ""
                nvr: NVR = self.config.getRecorder(recorder_index).nvr
                # Decode the descriptor before logging in, so a malformed
                # value cannot leave an open session behind.
                # NOTE(review): "==" presumably restores stripped base64 padding.
                file: File = File.from_b64(b64 + "==")
                await nvr.login()

                async def after():
                    """Best-effort release of the client's busy flag, then log out."""
                    try:
                        await nvr.client.busy.release()
                    except Exception:
                        # Deliberately ignored: the release is best effort.
                        pass
                    finally:
                        nvr.logout()

                background_tasks.add_task(after)
                headers = {
                    "Content-Length": str(file.size),
                    "Content-Disposition": f'attachment; filename="{file.filename_cleared}.{file.type}"',
                }
                return StreamingResponse(nvr.stream_file(file), media_type="application/octet-stream", headers=headers)
            except Exception as e:
                return self._failure(response, e)

        @self.app.get(self.API_BASE_REF + "/transcode/status/{recorder_index}")
        async def getTranscodeStatus(response: Response, recorder_index: int, b64: str, background_tasks: BackgroundTasks, recreate: bool = False):
            """Return the transcode status for ``b64``, starting a job when needed.

            An existing status is returned as is unless ``recreate`` is true;
            otherwise a fresh status is stored and processing is scheduled as
            a background task.
            """
            try:
                if len(b64) == 0:
                    response.status_code = 404
                    return ""
                statuses = self.config.transcode_tools.statuses
                if b64 in statuses and not recreate:
                    return {"ok": True, "data": statuses[b64]}
                nvr: NVR = self.config.getRecorder(recorder_index).nvr
                file: File = File.from_b64(b64 + "==")
                statuses[b64] = TranscodeStatus(b64)
                background_tasks.add_task(
                    self.config.transcode_tools.processing_safe,
                    status=statuses[b64],
                    file=file,
                    nvr=nvr,
                    reCreate=recreate,
                )
                return {"ok": True, "data": statuses[b64]}
            except Exception as e:
                return self._failure(response, e)

        @self.app.get(self.API_BASE_REF + "/transcode/download")
        async def getTranscodeDownload(response: Response, b64: str):
            """Stream a finished transcode as an attachment.

            Answers 404 for an unknown ``b64`` and 429 while the job is not done.
            """
            try:
                statuses = self.config.transcode_tools.statuses
                if len(b64) == 0 or b64 not in statuses:
                    response.status_code = 404
                    return ""
                job = statuses[b64]
                if not job.done:
                    response.status_code = 429
                    return ""
                headers = {
                    "Content-Length": str(job.outSize),
                    "Content-Disposition": f'attachment; filename="{job.outName}"',
                }
                return StreamingResponse(job.generate_bytes(), media_type="application/octet-stream", headers=headers)
            except Exception as e:
                return self._failure(response, e)

        @self.app.get(self.API_BASE_REF + "/transcode/stream")
        async def getTranscodeStream(response: Response, b64: str, range: str = Header(None), chunk_size: int = 256):
            """Serve one byte range of a finished transcode as ``video/mp4``.

            ``range`` is the request's ``Range`` header (the parameter name
            selects the header, so it must keep shadowing the builtin). An
            open-ended range is answered with ``chunk_size`` KiB. Answers 404
            for an unknown ``b64``, 429 while the job is not done, and 416
            when the requested range yields no bytes.
            """
            try:
                statuses = self.config.transcode_tools.statuses
                if len(b64) == 0 or b64 not in statuses:
                    response.status_code = 404
                    return ""
                job = statuses[b64]
                if not job.done:
                    response.status_code = 429
                    return ""

                start, end = range.replace("bytes=", "").split("-")
                start = int(start)
                # The last-byte position of a Range header is inclusive.
                length = int(end) - start + 1 if end else 1024 * chunk_size
                unsatisfiable = Response(status_code=416, headers={"Content-Range": f"bytes */{job.outSize}"})
                if length <= 0:
                    # A negative count would make read() return the whole file.
                    return unsatisfiable

                async with aiofiles.open(job.outFile, "rb") as video:
                    await video.seek(start)
                    data = await video.read(length)
                if not data:
                    return unsatisfiable

                # Advertise the bytes actually sent, which may be fewer than
                # requested when the range runs past the end of the file.
                last = start + len(data) - 1
                headers = {
                    'Content-Range': f'bytes {start}-{last}/{job.outSize}',
                    'Accept-Ranges': 'bytes',
                }
                return Response(data, status_code=206, headers=headers, media_type="video/mp4")
            except Exception as e:
                return self._failure(response, e)

        @self.app.get(self.API_BASE_REF + "/preview")
        async def getNullPreview():
            """Return the static placeholder image."""
            return FileResponse("./assets/loading.webp")

        @self.app.get(self.API_BASE_REF + "/preview/{recorder_index}")
        async def getTranscodePreview(response: Response, recorder_index: int, b64: str, background_tasks: BackgroundTasks, status: bool = False):
            """Return the webp preview of a file, or its availability when ``status`` is set.

            With ``status`` false the preview image is returned, or the
            placeholder image while the preview is still missing. With
            ``status`` true a ``{"status": n}`` object is returned instead:
            1 when available, 0 when generation was scheduled, -1 (with 404)
            for an empty ``b64``.
            """
            try:
                if len(b64) == 0:
                    response.status_code = 404
                    return "" if not status else {"status": -1}
                file: File = File.from_b64(b64 + "==")

                async def loadPreview(file):
                    """Build the preview using the recorder (run as a background task)."""
                    nvr: NVR = self.config.getRecorder(recorder_index).nvr
                    await self.config.transcode_tools.processing_preview_task(file, nvr, "webp")

                try:
                    preview = await self.config.transcode_tools.processing_preview(file, None, "webp")
                    try:
                        size = os.path.getsize(preview)
                    except Exception as exc:
                        raise NeedNVR from exc
                    # Compare the numeric size: the header value is a string
                    # and would never equal 0.
                    if size == 0:
                        raise NeedNVR
                    headers = {
                        "Content-Length": str(size),
                        "Content-Disposition": 'attachment; filename="preview.webp"',
                    }
                    return FileResponse(preview, media_type="application/octet-stream", headers=headers) if not status else {"status": 1}
                except NeedNVR:
                    background_tasks.add_task(loadPreview, file=file)
                    return FileResponse("./assets/loading.webp") if not status else {"status": 0}
            except Exception as e:
                return self._failure(response, e)

        @self.app.get(self.API_BASE_REF + "/stream/{recorder_index}/{channel_index}/{stream_index}")
        async def getGo2RtcStream(recorder_index, channel_index, stream_index):
            """Return whatever ``go2rtc.get_stream`` yields for the given indices."""
            return self.go2rtc.get_stream(recorder_index, channel_index, stream_index)

    def run(self):
        """Serve ``app`` with uvicorn on the configured address and port (blocking)."""
        uvicorn.run(
            self.app,
            host=self.config.listen_address,
            port=self.config.listen_port,
        )
if __name__ == "__main__":
    # Build the server and start serving only when run as a script.
    Server().run()