You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

479 lines
19 KiB

import traceback
from genericpath import exists
import os, sys
from asyncio_dvrip import DVRIPCam
import asyncio
from nvr_core import NVR
from nvr_types import File
import platform
import aiofiles
from subprocess import DEVNULL
from global_funcs import *
class NeedNVR(Exception):
    """Raised when an operation needs an NVR session but none was supplied."""
class ImageNoCreated(Exception):
    """Raised when the expected image file does not exist after conversion."""

    def __init__(self, msg):
        # Hand the message to the base class explicitly.
        super().__init__(msg)
class Recorder:
    """Connection settings for one recorder plus a factory for NVR sessions.

    Attributes:
        address, port, username, password: credentials passed to DVRIPCam.
        name: optional display name, used by ``str()`` when non-empty.
        index: position of the recorder in the configuration.
        channels: channel count; starts at 0 and is not modified in this class.
        nvr_index: counter incremented on every access of ``nvr``.
        name_overrides: mapping supplied by the configuration (per instance).
    """

    # Event loop handed to every NVR created by this class. Newer Python
    # versions raise RuntimeError from get_event_loop() when no loop is set,
    # so fall back to creating and installing one.
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

    def __init__(self, address, port, username, password, name = "", index = 0, name_overrides = None):
        """Store the connection settings.

        ``port`` is converted with ``int()``. ``name_overrides`` defaults to a
        fresh empty dict per instance (a shared mutable default would leak
        edits from one recorder into all others).
        """
        self.address = address
        self.port = int(port)
        self.username = username
        self.password = password
        self.name = name
        self.index = index
        self.channels = 0
        self.nvr_index = 0
        self.name_overrides = {} if name_overrides is None else name_overrides

    @property
    def nvr(self):
        """Build a new NVR around a new DVRIPCam client on every access."""
        client = DVRIPCam(self.address, port = self.port, user = self.username, password = self.password)
        self.nvr_index += 1
        return NVR(client, self.loop, self.nvr_index)

    def __str__(self) -> str:
        """Return the display name, or ``address:port`` when no name is set."""
        if self.name:
            return self.name
        return f"{self.address}:{self.port}"
class TranscodeStatus:
    """Progress record of one transcode request, identified by ``b64``."""

    def __init__(self, b64) -> None:
        self.b64 = b64
        self.uuid = str(uuid_from_string(b64))
        # Stage progress values (processing code writes 0..100 into them).
        self.h264 = 0
        self.avi = 0
        self.mp4 = 0
        # Byte counters of the raw download.
        self.downloaded_h264_bytes = 0
        self.total_h264_bytes = 0
        # Result description.
        self.outFile = None
        self.outSize = 0
        self.done = False
        self.error = ""

    @property
    def outName(self):
        """File name part of ``outFile``; empty string while no file is set."""
        return os.path.basename(self.outFile) if self.outFile else ""

    async def generate_bytes(self):
        """Yield ``outFile`` in 32 KiB chunks, then one final empty chunk."""
        chunk_size = 32 * 1024
        async with aiofiles.open(self.outFile, "rb") as out:
            while chunk := await out.read(chunk_size):
                yield chunk
        yield b""
class Go2RtcChannel:
    """Builds the go2rtc stream entries for every channel/stream of a recorder."""

    # Entry shape: <name>: dvrip://<login>:<password>@<host>:<port>?channel=3&subtype=0
    def __init__(self, recorder: Recorder, count_of_streams = 2) -> None:
        self.proto = "dvrip"
        self.host = f"{recorder.address}:{recorder.port}"
        self.login = recorder.username
        self.password = recorder.password
        self.recorder_index = recorder.index
        self.count_of_channels = recorder.channels
        self.count_of_streams = count_of_streams

    def generate_lines(self):
        """Return one ``<recorder>_<channel>_<stream>: <url>`` line per stream."""
        source_prefix = f"{self.proto}://{self.login}:{self.password}@{self.host}"
        entries = [
            f" {self.recorder_index}_{channel}_{stream}: {source_prefix}?channel={channel}&subtype={stream}\n"
            for channel in range(self.count_of_channels)
            for stream in range(self.count_of_streams)
        ]
        return "".join(entries)
class Go2Rtc:
    """Locates a local go2rtc binary, writes its config and launches it.

    Attributes:
        port: port go2rtc is told to listen on.
        enabled: True once a usable binary was found by ``check_exists``.
        exec: path of the binary, or None while none was found.
    """

    WIN = "https://github.com/AlexxIT/go2rtc/releases/download/v1.9.4/go2rtc_win64.zip"
    LNX = "https://github.com/AlexxIT/go2rtc/releases/download/v1.9.4/go2rtc_linux_amd64"

    def __init__(self, port = 1984) -> None:
        self.logger = create_logger(Go2Rtc.__name__)
        self.port = port
        self.enabled = False
        # Always defined, so a failed detection leaves None instead of a
        # missing attribute.
        self.exec = None
        try:
            self.check_exists()
            self.logger.info("Go2rtc support enabled")
        except Exception:
            # Detection is best effort: a missing binary only disables the
            # feature. KeyboardInterrupt/SystemExit are no longer swallowed.
            self.logger.error("Go2rtc is disabled")

    def check_exists(self):
        """Find the go2rtc binary in ``<app_dir>/go2rtc`` and enable support.

        Sets ``exec`` and ``enabled`` on success.

        Raises:
            Exception: the directory or the platform binary is missing, or
                the platform is neither Windows nor Linux.
        """
        go2rtc_directory = os.path.join(app_dir(), "go2rtc")
        if not os.path.exists(go2rtc_directory):
            self.logger.warning("Go2Rtc not found, he is disabled")
            raise Exception("Go2Rtc not found, he is disabled")
        self.logger.debug("Go2Rtc directory exists")

        system = platform.system()
        # platform name -> (log tag, binary file name)
        binaries = {"Windows": ("WIN", "go2rtc.exe"), "Linux": ("LNX", "go2rtc")}
        if system in binaries:
            tag, binary_name = binaries[system]
            binary = os.path.join(go2rtc_directory, binary_name)
            if os.path.exists(binary):
                self.logger.debug(f"[{tag}] Go2Rtc is exists, continue create config")
                self.exec = binary
                self.enabled = True
                return
            # Known platform, binary absent: do not report it as unknown.
            self.logger.error(f"go2rtc binary not found: {binary}")
        else:
            self.logger.error(f"Unknown platform: {system}")
        raise Exception(f"go2rtc not downloaded, windows: {self.WIN} linux: {self.LNX}")

    #http://localhost:1984/go2rtc/stream.html?src=0_0_0
    def get_stream(self, recorder_index, channel_index, stream_index):
        """Return ``{"port", "route"}`` for a stream page.

        ``port`` is 0 when the path ``/docker`` exists, otherwise the
        configured port.
        """
        route = f"go2rtc/stream.html?src={recorder_index}_{channel_index}_{stream_index}"
        port = 0 if os.path.exists("/docker") else self.port
        return {"port": port, "route": route}

    async def start_go2rtc(self, recorders, base_path = "/go2rtc"):
        """Write ``go2rtc.yaml`` for ``recorders`` and start the binary.

        Callers are expected to check ``enabled`` first; ``exec`` is None
        when no binary was found.
        """
        parts = [
            "api:\n",
            f' listen: ":{self.port}"\n',
            f' base_path: "{base_path}"\n',
            ' origin: "*"\n',
            "streams:\n",
        ]
        # Two streams per channel are generated for every recorder.
        parts.extend(Go2RtcChannel(recorder, 2).generate_lines() for recorder in recorders)
        cfg_file = os.path.join(app_dir(), "go2rtc", "go2rtc.yaml")
        self.logger.debug(f"go2rtc config: {cfg_file}")
        async with aiofiles.open(cfg_file, "w", encoding="utf8") as cfg:
            await cfg.write("".join(parts))
        await asyncio.create_subprocess_exec(self.exec, "-c", cfg_file)
class TranscodeTools:
    """Drives the external converter script and ffmpeg.

    Raw recordings are downloaded through an NVR session, converted to AVI by
    ``h264_converter.py`` (run with the bundled win32 Python, through wine on
    Linux) and then to MP4 or a preview image by ffmpeg.
    """

    # Progress objects keyed by TranscodeStatus.b64 (class-level, shared).
    statuses:dict[str, TranscodeStatus] = {}
    # Directory inside tools_directory that holds the win32 Python.
    WIN32PYTHON = "python-win32"
    # Preview paths that are being generated or were already handed out.
    preview_storage = []

    def __init__(self, tools_directory, transcode_directory, hide_checks = True, delete_temporary_files = False) -> None:
        """Run the availability checks and set ``enabled`` accordingly.

        Args:
            tools_directory: directory with the converter script and DLLs.
            transcode_directory: directory passed to the file path helpers.
            hide_checks: when true, the check subprocesses run with their
                standard streams redirected to DEVNULL.
            delete_temporary_files: remove intermediate files after use.
        """
        self.delete_temporary_files = delete_temporary_files
        self.logger = create_logger(TranscodeTools.__name__)
        debug(self.logger)
        self.hide_checks = hide_checks
        self.tools_directory = tools_directory
        self.transcode_directory = transcode_directory
        if not os.path.exists(tools_directory):
            self.logger.error("download git repo https://git.pblr-nyk.pro/gsd/MiskaRisa264 and place in backend folder to enable transcode tools")
            self.enabled = False
        else:
            python_win32_exists = self.python_win32_exists
            if not python_win32_exists:
                self.logger.error("download https://www.python.org/ftp/python/3.12.3/python-3.12.3-embed-win32.zip and unzip in backend/MiskaRisa/python-win32 all contains files")
            check_exists_needed_files = self.check_exists_needed_files
            if not check_exists_needed_files:
                self.logger.error("MiskaRisa264 is not fully downloaded, watch in directory to find lost files")
            check_ffmpeg = self.check_ffmpeg()
            if not check_ffmpeg:
                self.logger.error("ffmpeg in not installed on system or on windows in not in PATH env")
            check_converter = self.check_converter()
            if not check_converter:
                self.logger.error("failed run h264_converter.py with python-win32")
            self.enabled = check_exists_needed_files and python_win32_exists and check_ffmpeg and check_converter
        if not self.enabled:
            self.logger.error("Cannot enabled transcode tools, have a errors on init, run config_parser with --no-hide-check parameters to more info")
        # Parenthesised: without the parentheses the conditional applied to
        # the whole concatenation and the disabled case logged only "disabled".
        self.logger.info("Transcode tools " + ("enabled" if self.enabled else "disabled"))
        self.queue_encoding = asyncio.Queue()
        self.queue_images = asyncio.Queue()

    async def queue_encoding_task(self):
        """Take one item per iteration from the encoding queue, forever."""
        self.logger.info("Start encoding queue")
        while True:
            await self.queue_encoding.get()
            await asyncio.sleep(1)

    async def queue_images_task(self):
        """Take one item per iteration from the images queue, forever."""
        self.logger.info("Start images queue")
        while True:
            await self.queue_images.get()
            await asyncio.sleep(1)

    @property
    def check_exists_needed_files(self):
        """True when every required converter file is in tools_directory."""
        required = ("H264Play.dll", "h264_converter.py", "StreamReader.dll")
        return all(os.path.exists(os.path.join(self.tools_directory, name)) for name in required)

    @property
    def python_win32_exists(self):
        """True when the win32 Python directory exists."""
        return os.path.exists(os.path.join(self.tools_directory, self.WIN32PYTHON))

    @property
    def python_win32(self):
        """Path of ``python.exe`` inside the win32 Python directory."""
        return os.path.join(self.tools_directory, self.WIN32PYTHON, "python.exe")

    @property
    def converter_script(self):
        """Path of ``h264_converter.py`` inside tools_directory."""
        return os.path.join(self.tools_directory, "h264_converter.py")

    def check_ffmpeg(self):
        """Return True when ``ffmpeg -version`` exits with status 0."""
        from subprocess import call, DEVNULL
        stream = DEVNULL if self.hide_checks else None
        try:
            return not call(["ffmpeg", "-version"], stdin=stream, stdout=stream, stderr=stream)
        except (OSError, ValueError):
            # Executable missing or not runnable.
            return False

    def check_converter(self):
        """Return True when the converter script answers ``--help`` with status 0."""
        from subprocess import call, DEVNULL
        # Built as a list: splitting a command string on whitespace broke
        # tool paths that contain spaces.
        argv = [self.python_win32, self.converter_script, "--help"]
        if platform.system() == "Linux":
            argv.insert(0, "wine")
        stream = DEVNULL if self.hide_checks else None
        try:
            return not call(argv, stdin=stream, stdout=stream, stderr=stream)
        except (OSError, ValueError):
            return False

    async def h264toavi(self, source_file, delete_source_file = False):
        """Run the converter script on ``source_file``.

        Returns:
            ``source_file + ".avi"`` when that file exists afterwards.

        Raises:
            Exception: unsupported platform, or the AVI file is missing.
        """
        exec_string = []
        system = platform.system()
        if system == "Windows":
            exec_cmd = f"{self.python_win32}"
        elif system == "Linux":
            # The win32 interpreter itself becomes an argument of wine.
            exec_cmd = "wine"
            exec_string.append(self.python_win32)
        else:
            raise Exception("Unknown platform to transcode")
        exec_string.append(self.converter_script)
        exec_string.append(source_file)
        self.logger.debug(f"execute {exec_cmd} {exec_string}")
        proc = await asyncio.create_subprocess_exec(exec_cmd, *exec_string)
        await proc.communicate()
        if delete_source_file:
            self.deleteFile(source_file)
        avi_file = source_file + ".avi"
        if os.path.exists(avi_file):
            return avi_file
        raise Exception("AVI not be created")

    async def avitomp4(self, source_file, delete_source_file = False):
        """Run ffmpeg to write ``source_file + ".mp4"`` and return that path.

        Raises:
            Exception: the MP4 file does not exist after ffmpeg finished.
        """
        mp4_file = f"{source_file}.mp4"
        exec_string = ["-y", "-i", source_file, "-movflags", "faststart", mp4_file]
        self.logger.debug(f"execute {exec_string}")
        proc = await asyncio.create_subprocess_exec("ffmpeg", *exec_string)
        await proc.communicate()
        if delete_source_file:
            self.deleteFile(source_file)
        if os.path.exists(mp4_file):
            return mp4_file
        raise Exception("MP4 not be created")

    async def anytoimage(self, source_file, out_file, delete_source_file = False):
        """Run ffmpeg to write a single frame of ``source_file`` to ``out_file``.

        Raises:
            ImageNoCreated: ``out_file`` does not exist after ffmpeg finished.
        """
        exec_string = ["-y", "-i", source_file, "-ss", "1", "-vframes", "1", out_file]
        self.logger.debug(f"execute {exec_string}")
        proc = await asyncio.create_subprocess_exec("ffmpeg", *exec_string)
        await proc.communicate()
        if delete_source_file:
            self.deleteFile(source_file)
        if os.path.exists(out_file):
            return out_file
        raise ImageNoCreated(f"{out_file} not be created")

    def deleteFile(self, source_file):
        """Remove ``source_file``; a failing removal is ignored on purpose."""
        try:
            os.remove(source_file)
        except OSError:
            # Best effort cleanup (file already gone, locked, ...).
            pass

    async def processing_preview_task(self, file:File, nvr: NVR, ext = "webp", preview_pre_bytes = 1024 * 512):
        """Start ``processing_preview`` as a task and put it on the images queue."""
        async def task(file, nvr, ext, preview_pre_bytes):
            await self.processing_preview(file, nvr, ext, preview_pre_bytes)
        self.queue_images.put_nowait(asyncio.create_task(task(file, nvr, ext, preview_pre_bytes)))

    async def processing_preview(self, file:File, nvr: NVR, ext = "webp", preview_pre_bytes = 1024 * 512):
        """Create (or reuse) the preview image of ``file`` and return its path.

        The raw file is streamed from ``nvr`` with ``preview_pre_bytes`` as
        second argument of ``stream_file``, converted to AVI and one frame is
        written as ``<previewPath>.<ext>``. When no image could be produced
        the stale intermediates are removed and the whole procedure is
        repeated with a doubled ``preview_pre_bytes``.

        Raises:
            NeedNVR: a download is required but ``nvr`` is None.
        """
        raw_file = file.previewPath(self.transcode_directory)
        preview_file = f"{raw_file}.{ext}"

        def clear_from_storage():
            try:
                self.preview_storage.remove(preview_file)
            except ValueError:
                # Entry not registered: reset the list, as before.
                self.preview_storage = []

        if preview_file in self.preview_storage:
            return preview_file
        self.preview_storage.append(preview_file)

        if os.path.exists(preview_file) and os.path.getsize(preview_file) != 0:
            self.logger.info(f"{preview_file} is exists")
            return preview_file

        if nvr is None:
            clear_from_storage()
            raise NeedNVR

        if not os.path.exists(raw_file) or os.path.getsize(raw_file) == 0:
            try:
                await nvr.login()
                self.logger.info(f"download new preview {preview_file}")
                async with aiofiles.open(raw_file, "wb") as raw:
                    async for chunk in nvr.stream_file(file, preview_pre_bytes):
                        await raw.write(chunk)
            except Exception as te:
                clear_from_storage()
                self.logger.info(f"Cancel download file: {te}")
                if os.path.exists(raw_file):
                    os.remove(raw_file)
            finally:
                nvr.logout()

        raw_file_avi = await self.h264toavi(raw_file)
        if self.delete_temporary_files:
            self.deleteFile(raw_file)
        try:
            return await self.anytoimage(raw_file_avi, preview_file, self.delete_temporary_files)
        except ImageNoCreated:
            clear_from_storage()
            self.logger.warning("Redownload with more size")
            # Drop the intermediates first: with the raw file still present
            # the retry skipped the download and repeated the same failing
            # conversion forever.
            self.deleteFile(raw_file)
            self.deleteFile(raw_file_avi)
            # NOTE(review): the retry itself is still unbounded - confirm
            # whether an upper limit for preview_pre_bytes is wanted.
            return await self.processing_preview(file, nvr, ext, preview_pre_bytes * 2)

    async def processing_safe(self, status: TranscodeStatus, file:File, nvr: NVR, reCreate:bool = False):
        """Start ``processing_mp4`` as a task and put it on the encoding queue.

        An exception raised by the task is printed and stored as text in the
        ``error`` field of the status entry instead of propagating.
        """
        async def task(status: TranscodeStatus, file:File, nvr: NVR, reCreate):
            try:
                await self.processing_mp4(status, file, nvr, reCreate)
            except Exception as e:
                traceback.print_exc()
                self.statuses[status.b64].error = str(e)
        self.logger.info(f"Start enconding {file.filename_cleared}")
        self.queue_encoding.put_nowait(asyncio.create_task(task(status = status, file = file, nvr = nvr, reCreate = reCreate)))

    async def processing_mp4(self, status: TranscodeStatus, file:File, nvr: NVR, reCreate:bool = False):
        """Download ``file``, convert it to MP4 and record progress.

        Progress and the result are written to ``self.statuses[status.b64]``.
        Existing files are reused unless ``reCreate`` is true.
        """
        raw_file = file.localPath(self.transcode_directory)
        self.logger.info(f"save path: {raw_file}")
        mp4_file = f"{raw_file}.avi.mp4"
        if os.path.exists(mp4_file) and os.path.getsize(mp4_file) != 0 and not reCreate:
            # Finished result already on disk: publish it and stop.
            nvr.logout()
            self.statuses[status.b64].outFile = mp4_file
            self.statuses[status.b64].done = True
            self.statuses[status.b64].outSize = os.path.getsize(mp4_file)
            return

        try:
            if not os.path.exists(raw_file) or os.path.getsize(raw_file) != file.size or reCreate:
                self.logger.debug(f"save raw file to {raw_file}")
                async with aiofiles.open(raw_file, "wb") as raw:
                    self.statuses[status.b64].total_h264_bytes = file.size
                    # The file is rewritten from scratch, so restart the counter.
                    self.statuses[status.b64].downloaded_h264_bytes = 0
                    await nvr.login()
                    async for chunk in nvr.stream_file(file):
                        self.statuses[status.b64].downloaded_h264_bytes += len(chunk)
                        total = self.statuses[status.b64].total_h264_bytes
                        if total:
                            # Skipped for a zero total to avoid dividing by zero.
                            self.statuses[status.b64].h264 = round(100 * self.statuses[status.b64].downloaded_h264_bytes / total)
                        await raw.write(chunk)
                self.logger.debug("raw file is downloaded")
            else:
                self.logger.debug("File already content on server")
        finally:
            # Runs on failure too, so an aborted download does not leave the
            # session logged in.
            self.logger.debug("logout from nvr, he is not more needed")
            nvr.logout()

        self.statuses[status.b64].avi = 0
        avi_file = raw_file + ".avi"
        if not os.path.exists(avi_file) or reCreate:
            avi_file = await self.h264toavi(raw_file, self.delete_temporary_files)
        else:
            self.logger.debug("file avi format already exitst")
        self.statuses[status.b64].avi = 100

        self.statuses[status.b64].mp4 = 0
        mp4_file = avi_file + ".mp4"
        if not os.path.exists(mp4_file) or reCreate:
            mp4_file = await self.avitomp4(avi_file, self.delete_temporary_files)
        else:
            self.logger.debug("file mp4 format already exists")
        self.statuses[status.b64].mp4 = 100

        self.statuses[status.b64].outFile = mp4_file
        self.statuses[status.b64].done = True
        self.statuses[status.b64].outSize = os.path.getsize(mp4_file)

    async def transcode_test(self, raw_file):
        """Convert ``raw_file`` to AVI and then to MP4 (manual test helper)."""
        avi_file = await self.h264toavi(raw_file)
        await self.avitomp4(avi_file)

    def h264toavi_test(self, raw_file):
        """Run ``transcode_test`` to completion on the event loop, then close it."""
        loop = asyncio.get_event_loop()
        tasks = [loop.create_task(self.transcode_test(raw_file))]
        loop.run_until_complete(asyncio.wait(tasks))
        loop.close()
class Config:
    """Application configuration: backend settings, recorders, transcode tools."""

    def __init__(self, config_name = "config.json", args = None) -> None:
        """Load ``config_name`` and build the recorder list and tools.

        With ``args.err_check`` set the file is not read and an empty
        configuration is used.
        """
        self.logger = create_logger(Config.__name__)
        skip_file = args is not None and args.err_check
        raw = {} if skip_file else load_config(config_name)

        backend = raw.get("backend", {})
        self.listen_address = backend.get("address", "0.0.0.0")
        self.listen_port = int(backend.get("port", "8080"))
        self.auth = backend.get("auth", {}).items()

        self.recorders = [
            Recorder(
                entry.get("ip"),
                entry.get("port"),
                entry.get("user"),
                entry.get("password"),
                entry.get("name", ""),
                index=position,
                name_overrides=entry.get("name_overrides", {}),
            )
            for position, entry in enumerate(raw.get("recorders", []))
        ]
        if self.recorders:
            for recorder in self.recorders:
                self.logger.info(recorder)
        else:
            self.logger.warning("Recorders not find")

        # Check output is hidden unless --no-hide-check was given.
        hide_check = True if args is None else not args.no_hide_check
        self.transcode_tools:TranscodeTools = self.check_transcode_tools(hide_check, raw.get("delete_temporary_files", False))

    def getRecorder(self, index = 0) -> Recorder:
        """Return the recorder stored at ``index``."""
        return self.recorders[index]

    def getRecorders(self):
        """Return the string form of every configured recorder."""
        return list(map(str, self.recorders))

    def check_transcode_directory(self):
        """Return ``<app_dir>/transcode``, creating the directory if absent."""
        t_dir = os.path.join(app_dir(), "transcode")
        if os.path.exists(t_dir):
            return t_dir
        os.mkdir(t_dir)
        return t_dir

    def check_transcode_tools(self, hide_check, delete_temporary_files):
        """Create TranscodeTools rooted at ``<app_dir>/MiskaRisa264``."""
        return TranscodeTools(
            os.path.join(app_dir(), "MiskaRisa264"),
            self.check_transcode_directory(),
            hide_check,
            delete_temporary_files,
        )
if __name__ == "__main__":
    # Command line entry: self-check of the transcode tools or a manual
    # conversion test of a single raw file.
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("--no-hide-check", action="store_true")
    parser.add_argument("--err-check", action="store_true")
    parser.add_argument("--test-h264toavi", type=str)
    args = parser.parse_args()

    config = Config(args = args)
    exit_code = 0
    if args.err_check:
        # Exit status reports whether the transcode tools are usable.
        exit_code = 0 if config.transcode_tools.enabled else 1
    elif args.test_h264toavi:
        config.transcode_tools.h264toavi_test(args.test_h264toavi)
    sys.exit(exit_code)