You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

217 lines
7.6 KiB

from time import sleep
from dvrip import DVRIPCam, SomethingIsWrongWithCamera
from pathlib import Path
import subprocess
import json
from datetime import datetime
class SolarCam:
cam = None
logger = None
def __init__(self, host_ip, user, password, logger):
self.logger = logger
self.cam = DVRIPCam(
host_ip,
user=user,
password=password,
)
def login(self, num_retries=10):
for i in range(num_retries):
try:
self.logger.debug("Try login...")
self.cam.login()
self.logger.debug(
f"Success! Connected to Camera. Waiting few seconds to let Camera fully boot..."
)
# waiting until camera is ready
sleep(10)
return
except SomethingIsWrongWithCamera:
self.logger.debug("Could not connect...Camera could be offline")
self.cam.close()
if i == 9:
raise ConnectionRefusedError(
f"Could not connect {num_retries} times...aborting"
)
sleep(2)
def logout(self):
self.cam.close()
def get_time(self):
return self.cam.get_time()
def set_time(self, time=None):
if time is None:
time = datetime.now()
return self.cam.set_time(time=time)
def get_local_files(self, start, end, filetype):
return self.cam.list_local_files(start, end, filetype)
def dump_local_files(
self, files, blacklist_path, download_dir, target_filetype=None
):
with open(f"{blacklist_path}.dmp", "a") as outfile:
for file in files:
target_file_path = self.generateTargetFilePath(
file["FileName"], download_dir
)
outfile.write(f"{target_file_path}\n")
if target_filetype:
target_file_path_convert = self.generateTargetFilePath(
file["FileName"], download_dir, extention=f"{target_filetype}"
)
outfile.write(f"{target_file_path_convert}\n")
def generateTargetFilePath(self, filename, downloadDir, extention=""):
fileExtention = Path(filename).suffix
filenameSplit = filename.split("/")
filenameDisk = f"{filenameSplit[3]}_{filenameSplit[5][:8]}".replace(".", "-")
targetPathClean = f"{downloadDir}/{filenameDisk}"
if extention != "":
return f"{targetPathClean}{extention}"
return f"{targetPathClean}{fileExtention}"
def convertFile(self, sourceFile, targetFile):
if (
subprocess.run(
f"ffmpeg -framerate 15 -i {sourceFile} -b:v 1M -c:v libvpx-vp9 -c:a libopus {targetFile}",
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
shell=True,
).returncode
!= 0
):
self.logger.debug(f"Error converting video. Check {sourceFile}")
self.logger.debug(f"File successfully converted: {targetFile}")
Path(sourceFile).unlink()
self.logger.debug(f"Orginal file successfully deleted: {sourceFile}")
def save_files(self, download_dir, files, blacklist=None, target_filetype=None):
self.logger.debug(f"Start downloading files")
for file in files:
target_file_path = self.generateTargetFilePath(
file["FileName"], download_dir
)
target_file_path_convert = None
if target_filetype:
target_file_path_convert = self.generateTargetFilePath(
file["FileName"], download_dir, extention=f"{target_filetype}"
)
if Path(f"{target_file_path}").is_file():
self.logger.debug(f"File already exists: {target_file_path}")
continue
if (
target_file_path_convert
and Path(f"{target_file_path_convert}").is_file()
):
self.logger.debug(
f"Converted file already exists: {target_file_path_convert}"
)
continue
if blacklist:
if target_file_path in blacklist:
self.logger.debug(f"File is on the blacklist: {target_file_path}")
continue
if target_file_path_convert and target_file_path_convert in blacklist:
self.logger.debug(
f"File is on the blacklist: {target_file_path_convert}"
)
continue
self.logger.debug(f"Downloading {target_file_path}...")
self.cam.download_file(
file["BeginTime"], file["EndTime"], file["FileName"], target_file_path
)
self.logger.debug(f"Finished downloading {target_file_path}...")
if target_file_path_convert:
self.logger.debug(f"Converting {target_file_path_convert}...")
self.convertFile(target_file_path, target_file_path_convert)
self.logger.debug(f"Finished converting {target_file_path_convert}.")
self.logger.debug(f"Finish downloading files")
def move_cam(self, direction, step=5):
match direction:
case "up":
self.cam.ptz_step("DirectionUp", step=step)
case "down":
self.cam.ptz_step("DirectionDown", step=step)
case "left":
self.cam.ptz_step("DirectionLeft", step=step)
case "right":
self.cam.ptz_step("DirectionRight", step=step)
case _:
self.logger.debug(f"No direction found")
def mute_cam(self):
print(
self.cam.send(
1040,
{
"fVideo.Volume": [
{"AudioMode": "Single", "LeftVolume": 0, "RightVolume": 0}
],
"Name": "fVideo.Volume",
},
)
)
def set_volume(self, volume):
print(
self.cam.send(
1040,
{
"fVideo.Volume": [
{
"AudioMode": "Single",
"LeftVolume": volume,
"RightVolume": volume,
}
],
"Name": "fVideo.Volume",
},
)
)
def get_battery(self):
data = self.cam.send_custom(
1610,
{"Name": "OPTUpData", "OPTUpData": {"UpLoadDataType": 5}},
size=260,
)[87:-2].decode("utf-8")
json_data = json.loads(data)
return {
"BatteryPercent": json_data["Dev.ElectCapacity"]["percent"],
"Charging": json_data["Dev.ElectCapacity"]["electable"],
}
def get_storage(self):
# get available storage in gb
storage_result = []
data = self.cam.send(1020, {"Name": "StorageInfo"})
for storage_index, storage in enumerate(data["StorageInfo"]):
for partition_index, partition in enumerate(storage["Partition"]):
s = {
"Storage": storage_index,
"Partition": partition_index,
"RemainingSpace": int(partition["RemainSpace"], 0) / 1024,
"TotalSpace": int(partition["TotalSpace"], 0) / 1024,
}
storage_result.append(s)
return storage_result