mirror of https://github.com/OpenIPC/python-dvr
committed by
GitHub
4 changed files with 214 additions and 24 deletions
@ -0,0 +1,94 @@ |
|||||
|
from time import sleep, monotonic |
||||
|
from dvrip import DVRIPCam, SomethingIsWrongWithCamera |
||||
|
from pathlib import Path |
||||
|
import logging |
||||
|
|
||||
|
|
||||
|
class NVR: |
||||
|
nvr = None |
||||
|
logger = None |
||||
|
|
||||
|
def __init__(self, host_ip, user, password, logger): |
||||
|
self.logger = logger |
||||
|
self.nvr = DVRIPCam( |
||||
|
host_ip, |
||||
|
user=user, |
||||
|
password=password, |
||||
|
) |
||||
|
if logger.level <= logging.DEBUG: |
||||
|
self.nvr.debug() |
||||
|
|
||||
|
def login(self): |
||||
|
try: |
||||
|
self.logger.info(f"Connecting to NVR...") |
||||
|
self.nvr.login() |
||||
|
self.logger.info("Successfuly connected to NVR.") |
||||
|
return |
||||
|
except SomethingIsWrongWithCamera: |
||||
|
self.logger.error("Can't connect to NVR") |
||||
|
self.nvr.close() |
||||
|
|
||||
|
def logout(self): |
||||
|
self.nvr.close() |
||||
|
|
||||
|
def get_channel_statuses(self): |
||||
|
channel_statuses = self.nvr.get_channel_statuses() |
||||
|
if 'Ret' in channel_statuses: |
||||
|
return None |
||||
|
|
||||
|
channel_titles = self.nvr.get_channel_titles() |
||||
|
if 'Ret' in channel_titles: |
||||
|
return None |
||||
|
|
||||
|
for i in range(min(len(channel_statuses), len(channel_titles))): |
||||
|
channel_statuses[i]['Title'] = channel_titles[i] |
||||
|
channel_statuses[i]['Channel'] = i |
||||
|
|
||||
|
return [c for c in channel_statuses if c['Status'] != ''] |
||||
|
|
||||
|
def get_local_files(self, channel, start, end, filetype): |
||||
|
return self.nvr.list_local_files(start, end, filetype, channel) |
||||
|
|
||||
|
def generateTargetFileName(self, filename): |
||||
|
# My NVR's filename example: /idea0/2023-11-19/002/05.38.58-05.39.34[M][@69f17][0].h264 |
||||
|
# You should check file names in your NVR and review the transformation |
||||
|
filenameSplit = filename.replace("][", "/").replace("[", "/").replace("]", "/").split("/") |
||||
|
return f"{filenameSplit[3]}_{filenameSplit[2]}_{filenameSplit[4]}{filenameSplit[-1]}" |
||||
|
|
||||
|
def save_files(self, download_dir, files): |
||||
|
self.logger.info(f"Files downloading: start") |
||||
|
|
||||
|
size_to_download = sum(int(f['FileLength'], 0) for f in files) |
||||
|
|
||||
|
for file in files: |
||||
|
target_file_name = self.generateTargetFileName(file["FileName"]) |
||||
|
target_file_path = f"{download_dir}/{target_file_name}" |
||||
|
|
||||
|
size = int(file['FileLength'], 0) |
||||
|
size_to_download -= size |
||||
|
|
||||
|
if Path(f"{target_file_path}").is_file(): |
||||
|
self.logger.info(f" {target_file_name} file already exists, skipping download") |
||||
|
continue |
||||
|
|
||||
|
self.logger.info(f" {target_file_name} [{size/1024:.1f} MBytes] downloading...") |
||||
|
time_dl = monotonic() |
||||
|
self.nvr.download_file( |
||||
|
file["BeginTime"], file["EndTime"], file["FileName"], target_file_path |
||||
|
) |
||||
|
time_dl = monotonic() - time_dl |
||||
|
speed = size / time_dl |
||||
|
self.logger.info(f" Done [{speed:.1f} KByte/s] {size_to_download/1024:.1f} MBytes more to download") |
||||
|
|
||||
|
self.logger.info(f"Files downloading: done") |
||||
|
|
||||
|
def list_files(self, files): |
||||
|
self.logger.info(f"Files listing: start") |
||||
|
|
||||
|
for file in files: |
||||
|
target_file_name = self.generateTargetFileName(file["FileName"]) |
||||
|
|
||||
|
size = int(file['FileLength'], 0) |
||||
|
self.logger.info(f" {target_file_name} [{size/1024:.1f} MBytes]") |
||||
|
|
||||
|
self.logger.info(f"Files listing: end") |
@ -0,0 +1,11 @@ |
|||||
|
{ |
||||
|
"host_ip": "10.0.0.8", |
||||
|
"user": "admin", |
||||
|
"password": "mypassword", |
||||
|
"channel": 0, |
||||
|
"download_dir": "./download", |
||||
|
"start": "2023-11-19 6:22:34", |
||||
|
"end": "2023-11-19 6:23:09", |
||||
|
"just_list_files": false, |
||||
|
"log_level": "INFO" |
||||
|
} |
@ -0,0 +1,89 @@ |
|||||
|
from pathlib import Path |
||||
|
import os |
||||
|
import json |
||||
|
import logging |
||||
|
from collections import namedtuple |
||||
|
from NVR import NVR |
||||
|
|
||||
|
|
||||
|
def init_logger(log_level): |
||||
|
logger = logging.getLogger(__name__) |
||||
|
logger.setLevel(log_level) |
||||
|
ch = logging.StreamHandler() |
||||
|
formatter = logging.Formatter("%(asctime)s [%(levelname)s] %(message)s") |
||||
|
ch.setFormatter(formatter) |
||||
|
logger.addHandler(ch) |
||||
|
return logger |
||||
|
|
||||
|
|
||||
|
def load_config(): |
||||
|
def config_decoder(config_dict): |
||||
|
return namedtuple("X", config_dict.keys())(*config_dict.values()) |
||||
|
|
||||
|
config_path = os.environ.get("NVRVIDEODOWNLOADER_CFG") |
||||
|
|
||||
|
if config_path is None or not Path(config_path).exists(): |
||||
|
config_path = "NVRVideoDownloader.json" |
||||
|
|
||||
|
if Path(config_path).exists(): |
||||
|
with open(config_path, "r") as file: |
||||
|
return json.loads(file.read(), object_hook=config_decoder) |
||||
|
|
||||
|
return { |
||||
|
"host_ip": os.environ.get("IP_ADDRESS"), |
||||
|
"user": os.environ.get("USER"), |
||||
|
"password": os.environ.get("PASSWORD"), |
||||
|
"channel": os.environ.get("CHANNEL"), |
||||
|
"download_dir": os.environ.get("DOWNLOAD_DIR"), |
||||
|
"start": os.environ.get("START"), |
||||
|
"end": os.environ.get("END"), |
||||
|
"just_list_files": os.environ.get("DUMP_LOCAL_FILES").lower() in ["true", "1", "y", "yes"], |
||||
|
"log_level": "INFO" |
||||
|
} |
||||
|
|
||||
|
|
||||
|
def main(): |
||||
|
config = load_config() |
||||
|
logger = init_logger(config.log_level) |
||||
|
channel = config.channel; |
||||
|
start = config.start |
||||
|
end = config.end |
||||
|
just_list_files = config.just_list_files; |
||||
|
|
||||
|
nvr = NVR(config.host_ip, config.user, config.password, logger) |
||||
|
|
||||
|
try: |
||||
|
nvr.login() |
||||
|
|
||||
|
channel_statuses = nvr.get_channel_statuses() |
||||
|
if channel_statuses: |
||||
|
channel_statuses_short = [{f"{c['Channel']}:{c['Title']}({c['ChnName']})"} |
||||
|
for c in channel_statuses if c['Status'] != 'NoConfig'] |
||||
|
logger.info(f"Configured channels in NVR: {channel_statuses_short}") |
||||
|
|
||||
|
videos = nvr.get_local_files(channel, start, end, "h264") |
||||
|
if videos: |
||||
|
size = sum(int(f['FileLength'], 0) for f in videos) |
||||
|
logger.info(f"Video files found: {len(videos)}. Total size: {size/1024:.1f}M") |
||||
|
Path(config.download_dir).parent.mkdir( |
||||
|
parents=True, exist_ok=True |
||||
|
) |
||||
|
if just_list_files: |
||||
|
nvr.list_files(videos) |
||||
|
else: |
||||
|
nvr.save_files(config.download_dir, videos) |
||||
|
else: |
||||
|
logger.info(f"No video files found") |
||||
|
|
||||
|
nvr.logout() |
||||
|
except ConnectionRefusedError: |
||||
|
logger.error(f"Connection can't be established or got disconnected") |
||||
|
except TypeError as e: |
||||
|
print(e) |
||||
|
logger.error(f"Error while downloading a file") |
||||
|
except KeyError: |
||||
|
logger.error(f"Error while getting the file list") |
||||
|
|
||||
|
|
||||
|
if __name__ == "__main__": |
||||
|
main() |
Loading…
Reference in new issue