You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
479 lines
19 KiB
479 lines
19 KiB
import traceback
|
|
from genericpath import exists
|
|
import os, sys
|
|
from asyncio_dvrip import DVRIPCam
|
|
import asyncio
|
|
from nvr_core import NVR
|
|
from nvr_types import File
|
|
import platform
|
|
import aiofiles
|
|
|
|
from subprocess import DEVNULL
|
|
|
|
from global_funcs import *
|
|
|
|
class NeedNVR(Exception):
    """Raised when a preview has to be downloaded but no NVR object was supplied."""
|
|
|
|
class ImageNoCreated(Exception):
    """Raised when ffmpeg finished but the requested image file does not exist."""

    def __init__(self, msg):
        """Store *msg* as the exception message.

        The base initializer is called explicitly so the message is kept in
        ``args`` (and therefore in ``str(exc)``) even when it is passed by
        keyword.
        """
        super().__init__(msg)
|
|
|
|
class Recorder:
    """Connection settings of one recorder and a factory for NVR clients."""

    # Shared by every instance and resolved once, when the class is defined.
    # Interpreters that no longer create a loop implicitly raise RuntimeError
    # here; in that case create one and register it as the current loop.
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

    def __init__(self, address, port, username, password, name="", index=0, name_overrides=None):
        """Remember the connection parameters.

        ``port`` is converted with ``int()``. ``name_overrides`` defaults to a
        fresh dict per instance (a shared ``{}`` default would leak entries
        from one recorder into another).
        """
        self.address = address
        self.port = int(port)
        self.username = username
        self.password = password
        self.name = name
        self.index = index
        self.channels = 0
        self.nvr_index = 0
        self.name_overrides = {} if name_overrides is None else name_overrides

    @property
    def nvr(self):
        """Create a new DVRIPCam client wrapped in a new NVR on every access.

        Each access increments ``nvr_index`` and passes the new value to NVR.
        """
        client = DVRIPCam(self.address, port=self.port, user=self.username, password=self.password)
        self.nvr_index += 1
        return NVR(client, self.loop, self.nvr_index)

    def __str__(self) -> str:
        """Return the configured name, or ``address:port`` when no name is set."""
        if not self.name:
            return f"{self.address}:{self.port}"
        return self.name
|
|
|
|
class TranscodeStatus:
    """Mutable progress record of one transcode job."""

    def __init__(self, b64) -> None:
        """Start an empty record for the job identified by *b64*."""
        self.b64 = b64
        self.uuid = str(uuid_from_string(b64))
        # progress of the raw download (percentage and byte counters)
        self.h264 = 0
        self.downloaded_h264_bytes = 0
        self.total_h264_bytes = 0
        # progress of the two conversion stages
        self.avi = 0
        self.mp4 = 0
        # result of the job
        self.outFile = None
        self.done = False
        self.outSize = 0
        self.error = ""

    @property
    def outName(self):
        """File name part of ``outFile``; empty string while no output is set."""
        return os.path.basename(self.outFile) if self.outFile else ""

    async def generate_bytes(self):
        """Yield ``outFile`` in 32 KiB chunks, followed by one empty ``bytes``."""
        chunk_size = 32 * 1024
        async with aiofiles.open(self.outFile, "rb") as stream:
            while (piece := await stream.read(chunk_size)):
                yield piece
        yield b""
|
|
|
|
class Go2RtcChannel:
    """Builds the go2rtc ``streams`` entries for every channel of one recorder."""

    # Shape of a stream source URL:
    # dvrip://login:password@host:34567?channel=3&subtype=0
    def __init__(self, recorder: Recorder, count_of_streams = 2) -> None:
        """Copy credentials, host, index and channel count from *recorder*."""
        self.proto = "dvrip"
        self.login = recorder.username
        self.password = recorder.password
        self.host = f"{recorder.address}:{recorder.port}"
        self.recorder_index = recorder.index
        self.count_of_channels = recorder.channels
        self.count_of_streams = count_of_streams

    def generate_lines(self):
        """Return one ``<recorder>_<channel>_<stream>: <url>`` line per stream.

        Lines are ordered by channel, then by stream; the result is an empty
        string when there are no channels or no streams.
        """
        source = f"{self.proto}://{self.login}:{self.password}@{self.host}"
        entries = [
            f" {self.recorder_index}_{channel}_{stream}: {source}?channel={channel}&subtype={stream}\n"
            for channel in range(self.count_of_channels)
            for stream in range(self.count_of_streams)
        ]
        return "".join(entries)
|
|
|
|
class Go2Rtc:
    """Finds a local go2rtc binary and starts it with a generated config file."""

    WIN = "https://github.com/AlexxIT/go2rtc/releases/download/v1.9.4/go2rtc_win64.zip"
    LNX = "https://github.com/AlexxIT/go2rtc/releases/download/v1.9.4/go2rtc_linux_amd64"

    def __init__(self, port = 1984) -> None:
        """Probe for the binary; ``enabled`` stays False when it is not found."""
        self.logger = create_logger(Go2Rtc.__name__)
        self.port = port
        self.enabled = False
        # Path of the executable; remains None until check_exists() finds it.
        self.exec = None
        try:
            self.check_exists()
            self.logger.info("Go2rtc support enabled")
        except Exception:
            # A missing binary only disables the feature, it must not abort start-up.
            self.logger.error("Go2rtc is disabled")

    def check_exists(self):
        """Locate the go2rtc executable below ``<app_dir>/go2rtc``.

        On success sets ``self.exec`` and ``self.enabled``.

        Raises:
            Exception: the directory is missing, the platform is neither
                Windows nor Linux, or the platform's binary is not present.
        """
        go2rtc_directory = os.path.join(app_dir(), "go2rtc")
        if not os.path.exists(go2rtc_directory):
            self.logger.warning("Go2Rtc not found, he is disabled")
            raise Exception("Go2Rtc not found, he is disabled")

        self.logger.debug("Go2Rtc directory exists")
        not_downloaded = f"go2rtc not downloaded, windows: {self.WIN} linux: {self.LNX}"
        known_platforms = {"Windows": ("go2rtc.exe", "WIN"), "Linux": ("go2rtc", "LNX")}
        system = platform.system()
        if system not in known_platforms:
            self.logger.error(f"Unknown platform: {system}")
            raise Exception(not_downloaded)

        binary_name, tag = known_platforms[system]
        binary = os.path.join(go2rtc_directory, binary_name)
        if not os.path.exists(binary):
            # Known platform, but the executable itself is absent.
            self.logger.error(f"go2rtc binary not found: {binary}")
            raise Exception(not_downloaded)

        self.logger.debug(f"[{tag}] Go2Rtc is exists, continue create config")
        self.exec = binary
        self.enabled = True

    # Example: http://localhost:1984/go2rtc/stream.html?src=0_0_0
    def get_stream(self, recorder_index, channel_index, stream_index):
        """Return ``{"port", "route"}`` for the player page of one stream.

        The port is reported as 0 when the path ``/docker`` exists, otherwise
        it is the configured go2rtc port.
        """
        route = f"go2rtc/stream.html?src={recorder_index}_{channel_index}_{stream_index}"
        port = 0 if os.path.exists("/docker") else self.port
        return {"port": port, "route": route}

    async def start_go2rtc(self, recorders, base_path = "/go2rtc"):
        """Write ``go2rtc.yaml`` for *recorders* and launch the go2rtc process.

        Does nothing (besides logging a warning) when go2rtc was not found,
        instead of writing a config and failing on the missing executable.
        """
        if not self.enabled or not self.exec:
            self.logger.warning("Go2rtc is disabled, nothing to start")
            return

        header = (
            "api:\n"
            f' listen: ":{self.port}"\n'
            f' base_path: "{base_path}"\n'
            ' origin: "*"\n'
            "streams:\n"
        )
        # Two streams (subtype 0 and 1) are generated for every channel.
        lines = header + "".join(Go2RtcChannel(recorder, 2).generate_lines() for recorder in recorders)

        cfg_file = os.path.join(app_dir(), "go2rtc", "go2rtc.yaml")
        self.logger.debug(f"go2rtc config: {cfg_file}")

        async with aiofiles.open(cfg_file, "w", encoding="utf8") as cfg:
            await cfg.write(lines)

        # The process is started and left running; it is not awaited here.
        await asyncio.create_subprocess_exec(self.exec, "-c", cfg_file)
|
|
|
|
class TranscodeTools:
    """Drives the external converters that turn recorder files into AVI, MP4 and preview images.

    The H264 -> AVI step runs ``h264_converter.py`` with the bundled 32-bit
    Python (through ``wine`` on Linux); the remaining steps call ``ffmpeg``.
    """

    # NOTE: these containers live on the class and are shared by all instances.
    statuses:dict[str, TranscodeStatus] = {}
    WIN32PYTHON = "python-win32"
    # Preview paths that are being generated or were already requested.
    preview_storage = []

    def __init__(self, tools_directory, transcode_directory, hide_checks = True, delete_temporary_files = False) -> None:
        """Check that every required tool is available and set ``self.enabled``.

        Args:
            tools_directory: directory holding the converter files.
            transcode_directory: directory where downloaded/converted files go.
            hide_checks: when true, output of the probe commands is discarded.
            delete_temporary_files: remove intermediate files after each step.
        """
        self.delete_temporary_files = delete_temporary_files
        self.logger = create_logger(TranscodeTools.__name__)
        debug(self.logger)
        self.hide_checks = hide_checks
        self.tools_directory = tools_directory
        self.transcode_directory = transcode_directory
        if not os.path.exists(tools_directory):
            self.logger.error("download git repo https://git.pblr-nyk.pro/gsd/MiskaRisa264 and place in backend folder to enable transcode tools")
            self.enabled = False
        else:
            # Every check is executed so that all problems are reported at once.
            python_win32_exists = self.python_win32_exists
            if not python_win32_exists:
                self.logger.error("download https://www.python.org/ftp/python/3.12.3/python-3.12.3-embed-win32.zip and unzip in backend/MiskaRisa/python-win32 all contains files")

            check_exists_needed_files = self.check_exists_needed_files
            if not check_exists_needed_files:
                self.logger.error("MiskaRisa264 is not fully downloaded, watch in directory to find lost files")

            check_ffmpeg = self.check_ffmpeg()
            if not check_ffmpeg:
                self.logger.error("ffmpeg in not installed on system or on windows in not in PATH env")

            check_converter = self.check_converter()
            if not check_converter:
                self.logger.error("failed run h264_converter.py with python-win32")

            self.enabled = check_exists_needed_files and python_win32_exists and check_ffmpeg and check_converter
            if not self.enabled:
                self.logger.error("Cannot enabled transcode tools, have a errors on init, run config_parser with --no-hide-check parameters to more info")

        # Parentheses are required: without them the conditional expression
        # swallows the prefix and only "disabled" would be logged.
        self.logger.info("Transcode tools " + ("enabled" if self.enabled else "disabled"))
        self.queue_encoding = asyncio.Queue()
        self.queue_images = asyncio.Queue()

    async def queue_encoding_task(self):
        """Endlessly take one item per second from the encoding queue.

        The item (a task created by ``processing_safe``) is only removed from
        the queue here; it is not awaited.
        """
        self.logger.info("Start encoding queue")
        while True:
            await self.queue_encoding.get()
            await asyncio.sleep(1)

    async def queue_images_task(self):
        """Endlessly take one item per second from the images queue.

        The item (a task created by ``processing_preview_task``) is only
        removed from the queue here; it is not awaited.
        """
        self.logger.info("Start images queue")
        while True:
            await self.queue_images.get()
            await asyncio.sleep(1)

    @property
    def check_exists_needed_files(self):
        """True when all converter files are present in ``tools_directory``."""
        needed = ("H264Play.dll", "h264_converter.py", "StreamReader.dll")
        return all(os.path.exists(os.path.join(self.tools_directory, name)) for name in needed)

    @property
    def python_win32_exists(self):
        """True when the ``python-win32`` directory exists in ``tools_directory``."""
        return os.path.exists(os.path.join(self.tools_directory, self.WIN32PYTHON))

    @property
    def python_win32(self):
        """Path of the bundled ``python.exe``."""
        return os.path.join(self.tools_directory, self.WIN32PYTHON, "python.exe")

    @property
    def converter_script(self):
        """Path of ``h264_converter.py``."""
        return os.path.join(self.tools_directory, "h264_converter.py")

    def _run_check(self, argv):
        """Run *argv* and return True when it exits with status 0.

        Output is discarded when ``hide_checks`` is set. A command that cannot
        be started counts as a failed check.
        """
        from subprocess import call, DEVNULL
        sink = DEVNULL if self.hide_checks else None
        try:
            return call(argv, stdin=sink, stdout=sink, stderr=sink) == 0
        except Exception:
            return False

    def check_ffmpeg(self):
        """Return True when ``ffmpeg -version`` runs successfully."""
        return self._run_check(["ffmpeg", "-version"])

    def check_converter(self):
        """Return True when the converter script answers ``--help``.

        The command is built as an argument list so tool paths containing
        spaces are passed intact.
        """
        argv = [self.python_win32, self.converter_script, "--help"]
        if platform.system() == "Linux":
            argv.insert(0, "wine")
        return self._run_check(argv)

    async def h264toavi(self, source_file, delete_source_file = False):
        """Convert *source_file* with the converter script; return ``<source_file>.avi``.

        Raises:
            Exception: the platform is neither Windows nor Linux, or the
                ``.avi`` file does not exist after the converter finished.
        """
        system = platform.system()
        if system == "Windows":
            exec_cmd = f"{self.python_win32}"
            exec_string = []
        elif system == "Linux":
            # On Linux the Windows interpreter is started through wine.
            exec_cmd = "wine"
            exec_string = [self.python_win32]
        else:
            raise Exception("Unknown platform to transcode")

        exec_string.append(self.converter_script)
        exec_string.append(source_file)

        self.logger.debug(f"execute {exec_cmd} {exec_string}")
        proc = await asyncio.create_subprocess_exec(exec_cmd, *exec_string)
        await proc.communicate()

        if delete_source_file:
            self.deleteFile(source_file)
        avi_file = source_file + ".avi"
        if os.path.exists(avi_file):
            return avi_file
        raise Exception("AVI not be created")

    async def avitomp4(self, source_file, delete_source_file = False):
        """Convert *source_file* to ``<source_file>.mp4`` with ffmpeg and return that path.

        Raises:
            Exception: the ``.mp4`` file does not exist after ffmpeg finished.
        """
        mp4_file = f"{source_file}.mp4"
        exec_string = ["-y", "-i", source_file, "-movflags", "faststart", mp4_file]
        self.logger.debug(f"execute {exec_string}")
        proc = await asyncio.create_subprocess_exec("ffmpeg", *exec_string)
        await proc.communicate()

        if delete_source_file:
            self.deleteFile(source_file)
        if os.path.exists(mp4_file):
            return mp4_file
        raise Exception("MP4 not be created")

    async def anytoimage(self, source_file, out_file, delete_source_file = False):
        """Ask ffmpeg for a single frame of *source_file* (``-ss 1``) written to *out_file*.

        Raises:
            ImageNoCreated: *out_file* does not exist after ffmpeg finished.
        """
        exec_string = ["-y", "-i", source_file, "-ss", "1", "-vframes", "1", out_file]
        self.logger.debug(f"execute {exec_string}")
        proc = await asyncio.create_subprocess_exec("ffmpeg", *exec_string)
        await proc.communicate()

        if delete_source_file:
            self.deleteFile(source_file)
        if os.path.exists(out_file):
            return out_file
        raise ImageNoCreated(f"{out_file} not be created")

    def deleteFile(self, source_file):
        """Remove *source_file*; file-system errors (e.g. already gone) are ignored."""
        try:
            os.remove(source_file)
        except OSError:
            # Best-effort cleanup of temporary files.
            pass

    async def processing_preview_task(self, file:File, nvr: NVR, ext = "webp", preview_pre_bytes = 1024 * 512):
        """Start ``processing_preview`` as a task and put that task on the images queue."""
        async def task(file, nvr, ext, preview_pre_bytes):
            await self.processing_preview(file, nvr, ext, preview_pre_bytes)

        self.queue_images.put_nowait(asyncio.create_task(task(file, nvr, ext, preview_pre_bytes)))

    async def processing_preview(self, file:File, nvr: NVR, ext = "webp", preview_pre_bytes = 1024 * 512):
        """Create (or reuse) the preview image of *file* and return its path.

        The path is returned immediately when it is already registered in
        ``preview_storage`` or when a non-empty image exists on disk.
        Otherwise the raw file is downloaded via *nvr* (passing
        *preview_pre_bytes* to ``nvr.stream_file``), converted to AVI and a
        frame is extracted. When no image could be produced the whole
        procedure is repeated with twice the *preview_pre_bytes*.

        Raises:
            NeedNVR: a download is required but *nvr* is None.
            Exception: the AVI conversion produced no file.
        """
        raw_file = file.previewPath(self.transcode_directory)
        preview_file = f"{raw_file}.{ext}"

        def clear_from_storage():
            # Only this preview's marker is dropped; a marker that is already
            # gone must not wipe the markers of other previews.
            try:
                self.preview_storage.remove(preview_file)
            except ValueError:
                pass

        if preview_file in self.preview_storage:
            return preview_file
        self.preview_storage.append(preview_file)

        if os.path.exists(preview_file) and os.path.getsize(preview_file) != 0:
            self.logger.info(f"{preview_file} is exists")
            return preview_file

        if nvr is None:
            clear_from_storage()
            raise NeedNVR

        if not os.path.exists(raw_file) or os.path.getsize(raw_file) == 0:
            try:
                await nvr.login()
                self.logger.info(f"download new preview {preview_file}")
                async with aiofiles.open(raw_file, "wb") as raw:
                    async for chunk in nvr.stream_file(file, preview_pre_bytes):
                        await raw.write(chunk)
            except Exception as te:
                # NOTE(review): execution continues with the conversion below,
                # which then fails with "AVI not be created" unless an older
                # .avi exists — confirm whether re-raising is preferable.
                clear_from_storage()
                self.logger.info(f"Cancel download file: {te}")
                if os.path.exists(raw_file):
                    os.remove(raw_file)
            finally:
                nvr.logout()

        raw_file_avi = await self.h264toavi(raw_file)
        if self.delete_temporary_files:
            self.deleteFile(raw_file)

        try:
            return await self.anytoimage(raw_file_avi, preview_file, self.delete_temporary_files)
        except ImageNoCreated:
            clear_from_storage()
            # Drop the intermediates: with an existing raw file the retry
            # would skip the download and fail the same way again.
            self.deleteFile(raw_file)
            self.deleteFile(raw_file_avi)
            self.logger.warning("Redownload with more size")
            return await self.processing_preview(file, nvr, ext, preview_pre_bytes * 2)

    async def processing_safe(self, status: TranscodeStatus, file:File, nvr: NVR, reCreate:bool = False):
        """Start ``processing_mp4`` as a task and put that task on the encoding queue.

        An exception raised by the job is printed and stored as text in the
        ``error`` field of the job's status entry.
        """
        async def task(status: TranscodeStatus, file:File, nvr: NVR, reCreate):
            try:
                await self.processing_mp4(status, file, nvr, reCreate)
            except Exception as e:
                traceback.print_exc()
                self.statuses[status.b64].error = str(e)

        self.logger.info(f"Start enconding {file.filename_cleared}")
        self.queue_encoding.put_nowait(asyncio.create_task(task(status = status, file = file, nvr = nvr, reCreate = reCreate)))

    async def processing_mp4(self, status: TranscodeStatus, file:File, nvr: NVR, reCreate:bool = False):
        """Download *file*, convert it to AVI and then MP4, updating ``statuses[status.b64]``.

        Existing intermediate and final files are reused unless *reCreate* is
        true. On completion ``outFile``, ``outSize`` and ``done`` are set on
        the status entry.
        """
        raw_file = file.localPath(self.transcode_directory)
        self.logger.info(f"save path: {raw_file}")

        mp4_file = f"{raw_file}.avi.mp4"
        if os.path.exists(mp4_file) and os.path.getsize(mp4_file) != 0:
            if not reCreate:
                nvr.logout()
                self.statuses[status.b64].outFile = mp4_file
                self.statuses[status.b64].done = True
                self.statuses[status.b64].outSize = os.path.getsize(mp4_file)
                return

        try:
            if not os.path.exists(raw_file) or os.path.getsize(raw_file) != file.size or reCreate:
                self.logger.debug(f"save raw file to {raw_file}")
                async with aiofiles.open(raw_file, "wb") as raw:
                    self.statuses[status.b64].total_h264_bytes = file.size
                    await nvr.login()
                    async for chunk in nvr.stream_file(file):
                        entry = self.statuses[status.b64]
                        entry.downloaded_h264_bytes += len(chunk)
                        if entry.total_h264_bytes:
                            # Guarded: a zero total would divide by zero.
                            entry.h264 = round(100 * entry.downloaded_h264_bytes / entry.total_h264_bytes)
                        await raw.write(chunk)
                self.logger.debug("raw file is downloaded")
            else:
                self.logger.debug("File already content on server")
        finally:
            # Always release the recorder session, also when the download fails.
            self.logger.debug("logout from nvr, he is not more needed")
            nvr.logout()

        self.statuses[status.b64].avi = 0
        avi_file = raw_file + ".avi"
        if not os.path.exists(avi_file) or reCreate:
            avi_file = await self.h264toavi(raw_file, self.delete_temporary_files)
        else:
            self.logger.debug("file avi format already exitst")
        self.statuses[status.b64].avi = 100

        self.statuses[status.b64].mp4 = 0
        mp4_file = avi_file + ".mp4"
        if not os.path.exists(mp4_file) or reCreate:
            mp4_file = await self.avitomp4(avi_file, self.delete_temporary_files)
        else:
            self.logger.debug("file mp4 format already exists")
        self.statuses[status.b64].mp4 = 100

        self.statuses[status.b64].outFile = mp4_file
        self.statuses[status.b64].done = True
        self.statuses[status.b64].outSize = os.path.getsize(mp4_file)

    async def transcode_test(self, raw_file):
        """Run both conversion steps on *raw_file*; used by the command line test."""
        avi_file = await self.h264toavi(raw_file)
        await self.avitomp4(avi_file)

    def h264toavi_test(self, raw_file):
        """Run ``transcode_test`` to completion on the current event loop, then close the loop."""
        loop = asyncio.get_event_loop()
        tasks = [loop.create_task(self.transcode_test(raw_file))]
        loop.run_until_complete(asyncio.wait(tasks))
        loop.close()
|
|
|
|
class Config:
    """Application settings read from the JSON config plus the derived helper objects."""

    def __init__(self, config_name = "config.json", args = None) -> None:
        """Load the config, build the recorder list and the transcode tools.

        When *args* is given and ``args.err_check`` is set the config file is
        not read and an empty configuration is used instead.
        """
        self.logger = create_logger(Config.__name__)
        skip_config_file = args is not None and args.err_check
        raw = {} if skip_config_file else load_config(config_name)

        backend = raw.get("backend", {})
        self.listen_address = backend.get("address", "0.0.0.0")
        self.listen_port = int(backend.get("port", "8080"))
        self.auth = backend.get("auth", {}).items()

        # The position in the config list becomes the recorder index.
        self.recorders = [
            Recorder(
                entry.get("ip"),
                entry.get("port"),
                entry.get("user"),
                entry.get("password"),
                entry.get("name", ""),
                index=position,
                name_overrides=entry.get("name_overrides", {}),
            )
            for position, entry in enumerate(raw.get("recorders", []))
        ]
        if not self.recorders:
            self.logger.warning("Recorders not find")
        for recorder in self.recorders:
            self.logger.info(recorder)

        hide_check = True if args is None else not args.no_hide_check
        self.transcode_tools:TranscodeTools = self.check_transcode_tools(hide_check, raw.get("delete_temporary_files", False))

    def getRecorder(self, index = 0) -> Recorder:
        """Return the recorder stored at *index*."""
        return self.recorders[index]

    def getRecorders(self):
        """Return the string form of every recorder."""
        return list(map(str, self.recorders))

    def check_transcode_directory(self):
        """Return ``<app_dir>/transcode``, creating the directory when it is missing."""
        target = os.path.join(app_dir(), "transcode")
        if os.path.exists(target):
            return target
        os.mkdir(target)
        return target

    def check_transcode_tools(self, hide_check, delete_temporary_files):
        """Build the TranscodeTools for ``<app_dir>/MiskaRisa264`` and the transcode directory."""
        converter_home = os.path.join(app_dir(), "MiskaRisa264")
        output_home = self.check_transcode_directory()
        return TranscodeTools(converter_home, output_home, hide_check, delete_temporary_files)
|
|
|
|
if __name__ == "__main__":
    # Command line entry: checks the transcode tools or runs a test conversion.
    import argparse

    parser = argparse.ArgumentParser()
    for switch in ("--no-hide-check", "--err-check"):
        parser.add_argument(switch, action="store_true")
    parser.add_argument("--test-h264toavi", type=str)
    args = parser.parse_args()

    config = Config(args = args)

    # --err-check: the exit status tells whether the transcode tools are usable.
    if args.err_check:
        exit_code = 0 if config.transcode_tools.enabled else 1
        sys.exit(exit_code)

    # --test-h264toavi FILE: run the full conversion chain on FILE.
    if args.test_h264toavi:
        config.transcode_tools.h264toavi_test(args.test_h264toavi)

    sys.exit(0)
|