mirror of https://github.com/OpenIPC/python-dvr
You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
217 lines
7.6 KiB
217 lines
7.6 KiB
from time import sleep
|
|
from dvrip import DVRIPCam, SomethingIsWrongWithCamera
|
|
from pathlib import Path
|
|
import subprocess
|
|
import json
|
|
from datetime import datetime
|
|
|
|
|
|
class SolarCam:
    """Thin convenience wrapper around a ``DVRIPCam`` connection.

    Bundles login with retries, clock access, listing/downloading/converting
    recorded files, PTZ movement, volume control and battery/storage queries.
    All progress messages go to the ``logger`` passed to the constructor; only
    its ``debug`` method is used.
    """

    # Underlying DVRIPCam connection; set per instance in __init__.
    cam = None
    # Logger-like object; set per instance in __init__.
    logger = None

    # Maps the user-facing direction names to the PTZ command names that are
    # handed to ``DVRIPCam.ptz_step``.
    _PTZ_DIRECTIONS = {
        "up": "DirectionUp",
        "down": "DirectionDown",
        "left": "DirectionLeft",
        "right": "DirectionRight",
    }

    def __init__(self, host_ip, user, password, logger):
        """Create the camera connection object (no network login yet).

        Args:
            host_ip: Address passed as first argument to ``DVRIPCam``.
            user: User name passed to ``DVRIPCam``.
            password: Password passed to ``DVRIPCam``.
            logger: Object providing a ``debug(msg)`` method.
        """
        self.logger = logger
        self.cam = DVRIPCam(host_ip, user=user, password=password)

    def login(self, num_retries=10):
        """Log in to the camera, retrying while it cannot be reached.

        A failed attempt (``SomethingIsWrongWithCamera``) closes the
        connection and, unless it was the last attempt, waits two seconds
        before trying again. After a successful login the method sleeps ten
        seconds before returning.

        Args:
            num_retries: Maximum number of login attempts.

        Raises:
            ConnectionRefusedError: If no attempt succeeded (including the
                case where ``num_retries`` allows no attempt at all).
        """
        for attempt in range(1, num_retries + 1):
            try:
                self.logger.debug("Try login...")
                self.cam.login()
            except SomethingIsWrongWithCamera:
                self.logger.debug("Could not connect...Camera could be offline")
                self.cam.close()
            else:
                self.logger.debug(
                    "Success! Connected to Camera. Waiting few seconds to let Camera fully boot..."
                )
                # waiting until camera is ready
                sleep(10)
                return

            # Pause between attempts only; do not delay the final error.
            if attempt < num_retries:
                sleep(2)

        raise ConnectionRefusedError(
            f"Could not connect {num_retries} times...aborting"
        )

    def logout(self):
        """Close the camera connection."""
        self.cam.close()

    def get_time(self):
        """Return whatever ``DVRIPCam.get_time`` reports for the camera."""
        return self.cam.get_time()

    def set_time(self, time=None):
        """Set the camera clock.

        Args:
            time: Value forwarded to ``DVRIPCam.set_time``; when ``None`` the
                current local ``datetime.now()`` is used.

        Returns:
            The result of ``DVRIPCam.set_time``.
        """
        if time is None:
            time = datetime.now()
        return self.cam.set_time(time=time)

    def get_local_files(self, start, end, filetype):
        """Return the camera's file listing for the given range and type.

        The arguments are forwarded unchanged to
        ``DVRIPCam.list_local_files``.
        """
        return self.cam.list_local_files(start, end, filetype)

    def _target_paths(self, file, download_dir, target_filetype=None):
        """Return ``(download_path, converted_path)`` for one listing entry.

        ``converted_path`` is ``None`` when ``target_filetype`` is falsy.
        """
        download_path = self.generateTargetFilePath(file["FileName"], download_dir)
        converted_path = None
        if target_filetype:
            converted_path = self.generateTargetFilePath(
                file["FileName"], download_dir, extention=str(target_filetype)
            )
        return download_path, converted_path

    def dump_local_files(
        self, files, blacklist_path, download_dir, target_filetype=None
    ):
        """Append the target paths of ``files`` to ``<blacklist_path>.dmp``.

        For every entry the download path is written on its own line; when
        ``target_filetype`` is given, the path of the converted file is
        written on the following line.

        Args:
            files: Iterable of mappings that contain a ``"FileName"`` key.
            blacklist_path: Path prefix; ``".dmp"`` is appended to it.
            download_dir: Directory used to build the target paths.
            target_filetype: Optional extension of the converted file.
        """
        with open(f"{blacklist_path}.dmp", "a") as outfile:
            for file in files:
                download_path, converted_path = self._target_paths(
                    file, download_dir, target_filetype
                )
                outfile.write(f"{download_path}\n")
                if converted_path is not None:
                    outfile.write(f"{converted_path}\n")

    def generateTargetFilePath(self, filename, downloadDir, extention=""):
        """Build the local path for a file name reported by the camera.

        The name is split on ``"/"``; component 3 and the first eight
        characters of component 5 are joined with ``"_"`` and every ``"."``
        in the result is replaced by ``"-"``.
        NOTE(review): presumably component 3 is the recording date and
        component 5 starts with the recording time - confirm against the
        camera's file layout.

        Args:
            filename: Slash-separated file name with at least six components.
            downloadDir: Directory prefix of the returned path.
            extention: Extension to use instead of the one of ``filename``.

        Returns:
            ``"<downloadDir>/<name><extension>"`` as a string.

        Raises:
            IndexError: If ``filename`` has fewer than six components.
        """
        parts = filename.split("/")
        disk_name = f"{parts[3]}_{parts[5][:8]}".replace(".", "-")
        suffix = extention if extention != "" else Path(filename).suffix
        return f"{downloadDir}/{disk_name}{suffix}"

    def convertFile(self, sourceFile, targetFile):
        """Convert ``sourceFile`` to ``targetFile`` with ffmpeg (VP9 / Opus).

        The source file is deleted only after ffmpeg exited successfully. If
        ffmpeg fails or cannot be started, the error is logged and the source
        file is left in place.

        Args:
            sourceFile: Path of the downloaded recording.
            targetFile: Path of the converted file to create.

        Returns:
            ``True`` if the conversion succeeded, ``False`` otherwise.
        """
        # Argument list instead of a shell string: file names containing
        # spaces or shell metacharacters are passed to ffmpeg verbatim.
        command = [
            "ffmpeg",
            "-framerate",
            "15",
            "-i",
            str(sourceFile),
            "-b:v",
            "1M",
            "-c:v",
            "libvpx-vp9",
            "-c:a",
            "libopus",
            str(targetFile),
        ]
        try:
            completed = subprocess.run(
                command,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
            )
            succeeded = completed.returncode == 0
        except OSError:
            # ffmpeg is missing or not executable; treat like a failed run.
            succeeded = False

        if not succeeded:
            self.logger.debug(f"Error converting video. Check {sourceFile}")
            return False

        self.logger.debug(f"File successfully converted: {targetFile}")
        Path(sourceFile).unlink()
        self.logger.debug(f"Orginal file successfully deleted: {sourceFile}")
        return True

    def save_files(self, download_dir, files, blacklist=None, target_filetype=None):
        """Download ``files`` from the camera and optionally convert them.

        An entry is skipped when its download path or its converted path
        already exists on disk, or when either path is contained in
        ``blacklist``.

        Args:
            download_dir: Directory used to build the target paths.
            files: Iterable of mappings with ``"FileName"``, ``"BeginTime"``
                and ``"EndTime"`` keys.
            blacklist: Optional container tested with ``in`` against the
                target paths.
            target_filetype: Optional extension; when given, every downloaded
                file is converted via ``convertFile``.
        """
        self.logger.debug("Start downloading files")

        for file in files:
            download_path, converted_path = self._target_paths(
                file, download_dir, target_filetype
            )

            if Path(download_path).is_file():
                self.logger.debug(f"File already exists: {download_path}")
                continue

            if converted_path and Path(converted_path).is_file():
                self.logger.debug(
                    f"Converted file already exists: {converted_path}"
                )
                continue

            if blacklist:
                if download_path in blacklist:
                    self.logger.debug(f"File is on the blacklist: {download_path}")
                    continue
                if converted_path and converted_path in blacklist:
                    self.logger.debug(
                        f"File is on the blacklist: {converted_path}"
                    )
                    continue

            self.logger.debug(f"Downloading {download_path}...")
            self.cam.download_file(
                file["BeginTime"], file["EndTime"], file["FileName"], download_path
            )
            self.logger.debug(f"Finished downloading {download_path}...")

            if converted_path:
                self.logger.debug(f"Converting {converted_path}...")
                # convertFile logs its own error message on failure.
                if self.convertFile(download_path, converted_path):
                    self.logger.debug(f"Finished converting {converted_path}.")

        self.logger.debug("Finish downloading files")

    def move_cam(self, direction, step=5):
        """Move the camera one PTZ step.

        Args:
            direction: One of ``"up"``, ``"down"``, ``"left"``, ``"right"``.
                Any other value is only logged.
            step: Step value forwarded to ``DVRIPCam.ptz_step``.
        """
        command = None
        if isinstance(direction, str):
            command = self._PTZ_DIRECTIONS.get(direction)

        if command is None:
            self.logger.debug("No direction found")
            return

        self.cam.ptz_step(command, step=step)

    def mute_cam(self):
        """Set left and right volume to 0 and print the camera's reply."""
        self.set_volume(0)

    def set_volume(self, volume):
        """Set left and right volume to ``volume`` and print the reply.

        Sends command 1040 with an ``fVideo.Volume`` payload.
        """
        payload = {
            "fVideo.Volume": [
                {
                    "AudioMode": "Single",
                    "LeftVolume": volume,
                    "RightVolume": volume,
                }
            ],
            "Name": "fVideo.Volume",
        }
        print(self.cam.send(1040, payload))

    def get_battery(self):
        """Query the battery state of the camera.

        Sends custom command 1610 (``OPTUpData`` with ``UpLoadDataType`` 5),
        reads 260 bytes and parses bytes ``[87:-2]`` of the reply as UTF-8
        JSON.
        NOTE(review): the fixed slice presumably strips a binary header and
        trailer of the reply - confirm against the protocol.

        Returns:
            Dict with ``"BatteryPercent"`` (the reply's ``percent`` field)
            and ``"Charging"`` (the reply's ``electable`` field), both taken
            from ``"Dev.ElectCapacity"``.
        """
        raw = self.cam.send_custom(
            1610,
            {"Name": "OPTUpData", "OPTUpData": {"UpLoadDataType": 5}},
            size=260,
        )
        capacity = json.loads(raw[87:-2].decode("utf-8"))["Dev.ElectCapacity"]
        return {
            "BatteryPercent": capacity["percent"],
            "Charging": capacity["electable"],
        }

    def get_storage(self):
        """Return remaining and total space for every storage partition.

        Sends command 1020 (``StorageInfo``). ``RemainSpace`` and
        ``TotalSpace`` are parsed with ``int(value, 0)`` (base taken from the
        string's prefix) and divided by 1024.
        NOTE(review): the original comment says the result is in GB, which
        presumes the camera reports MB - confirm.

        Returns:
            List of dicts with the keys ``"Storage"``, ``"Partition"``,
            ``"RemainingSpace"`` and ``"TotalSpace"``.
        """
        data = self.cam.send(1020, {"Name": "StorageInfo"})
        return [
            {
                "Storage": storage_index,
                "Partition": partition_index,
                "RemainingSpace": int(partition["RemainSpace"], 0) / 1024,
                "TotalSpace": int(partition["TotalSpace"], 0) / 1024,
            }
            for storage_index, storage in enumerate(data["StorageInfo"])
            for partition_index, partition in enumerate(storage["Partition"])
        ]
|
|
|