You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

1128 lines
35 KiB

import os
import struct
import json
from time import sleep
import hashlib
import threading
from socket import socket, AF_INET, SOCK_STREAM, SOCK_DGRAM, SOL_SOCKET
from datetime import *
from re import compile
import time
import logging
from pathlib import Path
class SomethingIsWrongWithCamera(Exception):
pass
class DVRIPCam(object):
DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
CODES = {
100: "OK",
101: "Unknown error",
102: "Unsupported version",
103: "Request not permitted",
104: "User already logged in",
105: "User is not logged in",
106: "Username or password is incorrect",
107: "User does not have necessary permissions",
203: "Password is incorrect",
511: "Start of upgrade",
512: "Upgrade was not started",
513: "Upgrade data errors",
514: "Upgrade error",
515: "Upgrade successful",
}
QCODES = {
"AuthorityList": 1470,
"Users": 1472,
"Groups": 1474,
"AddGroup": 1476,
"ModifyGroup": 1478,
"DelGroup": 1480,
"User": 1482,
"ModifyUser": 1484,
"DelUser": 1486,
"ModifyPassword": 1488,
"AlarmInfo": 1504,
"AlarmSet": 1500,
"ChannelTitle": 1046,
"EncodeCapability": 1360,
"General": 1042,
"KeepAlive": 1006,
"OPMachine": 1450,
"OPMailTest": 1636,
"OPMonitor": 1413,
"OPNetKeyboard": 1550,
"OPPTZControl": 1400,
"OPSNAP": 1560,
"OPSendFile": 0x5F2,
"OPSystemUpgrade": 0x5F5,
"OPTalk": 1434,
"OPTimeQuery": 1452,
"OPTimeSetting": 1450,
"NetWork.NetCommon": 1042,
"OPNetAlarm": 1506,
"SystemFunction": 1360,
"SystemInfo": 1020,
}
OPFEED_QCODES = {
"OPFeedBook": {
"SET": 2300,
"GET": 2302,
},
"OPFeedManual": {
"SET": 2304,
},
"OPFeedHistory": {
"GET": 2306,
"SET": 2308,
},
}
KEY_CODES = {
"M": "Menu",
"I": "Info",
"E": "Esc",
"F": "Func",
"S": "Shift",
"L": "Left",
"U": "Up",
"R": "Right",
"D": "Down",
}
OK_CODES = [100, 515]
PORTS = {
"tcp": 34567,
"udp": 34568,
}
def __init__(self, ip, **kwargs):
self.logger = logging.getLogger(__name__)
self.ip = ip
self.iface = kwargs.get("iface", None)
self.user = kwargs.get("user", "admin")
hash_pass = kwargs.get("hash_pass")
self.hash_pass = kwargs.get(
"hash_pass", self.sofia_hash(kwargs.get("password", ""))
)
self.proto = kwargs.get("proto", "tcp")
self.port = kwargs.get("port", self.PORTS.get(self.proto))
self.socket = None
self.packet_count = 0
self.session = 0
self.alive_time = 20
self.alive = None
self.alarm = None
self.alarm_func = None
self.busy = threading.Condition()
def debug(self, format=None):
self.logger.setLevel(logging.DEBUG)
ch = logging.StreamHandler()
if format:
formatter = logging.Formatter(format)
ch.setFormatter(formatter)
self.logger.addHandler(ch)
def connect(self, timeout=10):
try:
if self.proto == "tcp":
self.socket_send = self.tcp_socket_send
self.socket_recv = self.tcp_socket_recv
self.socket = socket(AF_INET, SOCK_STREAM)
if self.iface:
self.socket.setsockopt(
SOL_SOCKET, 25, str(self.iface + '\0').encode())
self.socket.connect((self.ip, self.port))
elif self.proto == "udp":
self.socket_send = self.udp_socket_send
self.socket_recv = self.udp_socket_recv
self.socket = socket(AF_INET, SOCK_DGRAM)
else:
raise f"Unsupported protocol {self.proto}"
# it's important to extend timeout for upgrade procedure
self.timeout = timeout
self.socket.settimeout(timeout)
except OSError:
raise SomethingIsWrongWithCamera("Cannot connect to camera")
def close(self):
try:
self.alive.cancel()
self.socket.close()
except:
pass
self.socket = None
def udp_socket_send(self, bytes):
return self.socket.sendto(bytes, (self.ip, self.port))
def udp_socket_recv(self, bytes):
data, _ = self.socket.recvfrom(bytes)
return data
def tcp_socket_send(self, bytes):
try:
return self.socket.sendall(bytes)
except:
return None
def tcp_socket_recv(self, bufsize):
try:
return self.socket.recv(bufsize)
except:
return None
def receive_with_timeout(self, length):
received = 0
buf = bytearray()
start_time = time.time()
while True:
data = self.socket_recv(length - received)
buf.extend(data)
received += len(data)
if length == received:
break
elapsed_time = time.time() - start_time
if elapsed_time > self.timeout:
return None
return buf
def receive_json(self, length):
data = self.receive_with_timeout(length)
if data is None:
return {}
self.packet_count += 1
self.logger.debug("<= %s", data)
try:
reply = json.loads(data[:-2])
return reply
except:
return data
def send_custom(
self, msg, data={}, wait_response=True, download=False, version=0
):
if self.socket is None:
return {"Ret": 101}
# self.busy.wait()
self.busy.acquire()
if hasattr(data, "__iter__"):
if version == 1:
data["SessionID"] = f"{self.session:#0{12}x}"
data = bytes(
json.dumps(data, ensure_ascii=False, separators=(",", ":")), "utf-8"
)
tail = b"\x00"
if version == 0:
tail = b"\x0a" + tail
pkt = (
struct.pack(
"BB2xII2xHI",
255,
version,
self.session,
self.packet_count,
msg,
len(data) + len(tail),
)
+ data
+ tail
)
self.logger.debug("=> %s", pkt)
self.socket_send(pkt)
if wait_response:
reply = {"Ret": 101}
data = self.socket_recv(20)
if data is None or len(data) < 20:
return None
(
head,
version,
self.session,
sequence_number,
msgid,
len_data,
) = struct.unpack("BB2xII2xHI", data)
reply = None
if download:
reply = self.get_file(len_data)
else:
reply = self.get_specific_size(len_data)
self.busy.release()
return reply
def send(self, msg, data={}, wait_response=True):
if self.socket is None:
return {"Ret": 101}
# self.busy.wait()
self.busy.acquire()
if hasattr(data, "__iter__"):
data = bytes(json.dumps(data, ensure_ascii=False), "utf-8")
pkt = (
struct.pack(
"BB2xII2xHI",
255,
0,
self.session,
self.packet_count,
msg,
len(data) + 2,
)
+ data
+ b"\x0a\x00"
)
self.logger.debug("=> %s", pkt)
self.socket_send(pkt)
if wait_response:
reply = {"Ret": 101}
data = self.socket_recv(20)
if data is None or len(data) < 20:
return None
(
head,
version,
self.session,
sequence_number,
msgid,
len_data,
) = struct.unpack("BB2xII2xHI", data)
reply = self.receive_json(len_data)
self.busy.release()
return reply
def sofia_hash(self, password=""):
md5 = hashlib.md5(bytes(password, "utf-8")).digest()
chars = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
return "".join([chars[sum(x) % 62] for x in zip(md5[::2], md5[1::2])])
def login(self):
if self.socket is None:
self.connect()
data = self.send(
1000,
{
"EncryptType": "MD5",
"LoginType": "DVRIP-Web",
"PassWord": self.hash_pass,
"UserName": self.user,
},
)
if data is None or data["Ret"] not in self.OK_CODES:
if data["Ret"] in self.CODES:
print(f'[{data["Ret"]}] {self.CODES[data["Ret"]]}')
return False
self.session = int(data["SessionID"], 16)
self.alive_time = data["AliveInterval"]
self.keep_alive()
if not hasattr(self, 'devtype'):
self.devtype = data["DeviceType "]
return data["Ret"] in self.OK_CODES
def getAuthorityList(self):
data = self.send(self.QCODES["AuthorityList"])
if data["Ret"] in self.OK_CODES:
return data["AuthorityList"]
else:
return []
def getGroups(self):
data = self.send(self.QCODES["Groups"])
if data["Ret"] in self.OK_CODES:
return data["Groups"]
else:
return []
def addGroup(self, name, comment="", auth=None):
data = self.set_command(
"AddGroup",
{
"Group": {
"AuthorityList": auth or self.getAuthorityList(),
"Memo": comment,
"Name": name,
},
},
)
return data["Ret"] in self.OK_CODES
def modifyGroup(self, name, newname=None, comment=None, auth=None):
g = [x for x in self.getGroups() if x["Name"] == name]
if g == []:
print(f'Group "{name}" not found!')
return False
g = g[0]
data = self.send(
self.QCODES["ModifyGroup"],
{
"Group": {
"AuthorityList": auth or g["AuthorityList"],
"Memo": comment or g["Memo"],
"Name": newname or g["Name"],
},
"GroupName": name,
},
)
return data["Ret"] in self.OK_CODES
def delGroup(self, name):
data = self.send(
self.QCODES["DelGroup"],
{
"Name": name,
"SessionID": "0x%08X" % self.session,
},
)
return data["Ret"] in self.OK_CODES
def getUsers(self):
data = self.send(self.QCODES["Users"])
if data["Ret"] in self.OK_CODES:
return data["Users"]
else:
return []
def addUser(
self, name, password, comment="", group="user", auth=None, sharable=True
):
g = [x for x in self.getGroups() if x["Name"] == group]
if g == []:
print(f'Group "{group}" not found!')
return False
g = g[0]
data = self.set_command(
"User",
{
"AuthorityList": auth or g["AuthorityList"],
"Group": g["Name"],
"Memo": comment,
"Name": name,
"Password": self.sofia_hash(password),
"Reserved": False,
"Sharable": sharable,
},
)
return data["Ret"] in self.OK_CODES
def modifyUser(
self, name, newname=None, comment=None, group=None, auth=None, sharable=None
):
u = [x for x in self.getUsers() if x["Name"] == name]
if u == []:
print(f'User "{name}" not found!')
return False
u = u[0]
if group:
g = [x for x in self.getGroups() if x["Name"] == group]
if g == []:
print(f'Group "{group}" not found!')
return False
u["AuthorityList"] = g[0]["AuthorityList"]
data = self.send(
self.QCODES["ModifyUser"],
{
"User": {
"AuthorityList": auth or u["AuthorityList"],
"Group": group or u["Group"],
"Memo": comment or u["Memo"],
"Name": newname or u["Name"],
"Password": "",
"Reserved": u["Reserved"],
"Sharable": sharable or u["Sharable"],
},
"UserName": name,
},
)
return data["Ret"] in self.OK_CODES
def delUser(self, name):
data = self.send(
self.QCODES["DelUser"],
{
"Name": name,
"SessionID": "0x%08X" % self.session,
},
)
return data["Ret"] in self.OK_CODES
def changePasswd(self, newpass="", oldpass=None, user=None):
data = self.send(
self.QCODES["ModifyPassword"],
{
"EncryptType": "MD5",
"NewPassWord": self.sofia_hash(newpass),
"PassWord": oldpass or self.hash_pass,
"SessionID": "0x%08X" % self.session,
"UserName": user or self.user,
},
)
return data["Ret"] in self.OK_CODES
def channel_title(self, titles):
if isinstance(titles, str):
titles = [titles]
self.send(
self.QCODES["ChannelTitle"],
{
"ChannelTitle": titles,
"Name": "ChannelTitle",
"SessionID": "0x%08X" % self.session,
},
)
def channel_bitmap(self, width, height, bitmap):
header = struct.pack("HH12x", width, height)
self.socket_send(
struct.pack(
"BB2xII2xHI",
255,
0,
self.session,
self.packet_count,
0x041A,
len(bitmap) + 16,
)
+ header
+ bitmap
)
reply, rcvd = self.recv_json()
if reply and reply["Ret"] != 100:
return False
return True
def reboot(self):
self.set_command("OPMachine", {"Action": "Reboot"})
self.close()
def setAlarm(self, func):
self.alarm_func = func
def clearAlarm(self):
self.alarm_func = None
def alarmStart(self):
self.alarm = threading.Thread(
name="DVRAlarm%08X" % self.session,
target=self.alarm_thread,
args=[self.busy],
)
res = self.get_command("", self.QCODES["AlarmSet"])
self.alarm.start()
return res
def alarm_thread(self, event):
while True:
event.acquire()
try:
(
head,
version,
session,
sequence_number,
msgid,
len_data,
) = struct.unpack("BB2xII2xHI", self.socket_recv(20))
sleep(0.1) # Just for receive whole packet
reply = self.socket_recv(len_data)
self.packet_count += 1
reply = json.loads(reply[:-2])
if msgid == self.QCODES["AlarmInfo"] and self.session == session:
if self.alarm_func is not None:
self.alarm_func(reply[reply["Name"]], sequence_number)
except:
pass
finally:
event.release()
if self.socket is None:
break
def set_remote_alarm(self, state):
self.set_command(
"OPNetAlarm",
{"Event": 0, "State": state},
)
def keep_alive(self):
ret = self.send(
self.QCODES["KeepAlive"],
{"Name": "KeepAlive", "SessionID": "0x%08X" % self.session},
)
if ret is None:
self.close()
return
self.alive = threading.Timer(self.alive_time, self.keep_alive)
self.alive.daemon = True
self.alive.start()
def keyDown(self, key):
self.set_command(
"OPNetKeyboard",
{"Status": "KeyDown", "Value": key},
)
def keyUp(self, key):
self.set_command(
"OPNetKeyboard",
{"Status": "KeyUp", "Value": key},
)
def keyPress(self, key):
self.keyDown(key)
sleep(0.3)
self.keyUp(key)
def keyScript(self, keys):
for k in keys:
if k != " " and k.upper() in self.KEY_CODES:
self.keyPress(self.KEY_CODES[k.upper()])
else:
sleep(1)
def ptz(self, cmd, step=5, preset=-1, ch=0):
CMDS = [
"DirectionUp",
"DirectionDown",
"DirectionLeft",
"DirectionRight",
"DirectionLeftUp",
"DirectionLeftDown",
"DirectionRightUp",
"DirectionRightDown",
"ZoomTile",
"ZoomWide",
"FocusNear",
"FocusFar",
"IrisSmall",
"IrisLarge",
"SetPreset",
"GotoPreset",
"ClearPreset",
"StartTour",
"StopTour",
]
# ptz_param = { "AUX" : { "Number" : 0, "Status" : "On" }, "Channel" : ch, "MenuOpts" : "Enter", "POINT" : { "bottom" : 0, "left" : 0, "right" : 0, "top" : 0 }, "Pattern" : "SetBegin", "Preset" : -1, "Step" : 5, "Tour" : 0 }
ptz_param = {
"AUX": {"Number": 0, "Status": "On"},
"Channel": ch,
"MenuOpts": "Enter",
"Pattern": "Start",
"Preset": preset,
"Step": step,
"Tour": 1 if "Tour" in cmd else 0,
}
return self.set_command(
"OPPTZControl",
{"Command": cmd, "Parameter": ptz_param},
)
def set_info(self, command, data):
return self.set_command(command, data, 1040)
def set_command(self, command, data, code=None):
if not code:
code = self.OPFEED_QCODES.get(command)
if code:
code = code.get("SET")
if not code:
code = self.QCODES[command]
return self.send(
code, {"Name": command, "SessionID": "0x%08X" % self.session, command: data}
)
def get_info(self, command):
return self.get_command(command, 1042)
def get_command(self, command, code=None):
if not code:
code = self.OPFEED_QCODES.get(command)
if code:
code = code.get("GET")
if not code:
code = self.QCODES[command]
data = self.send(code, {"Name": command, "SessionID": "0x%08X" % self.session})
if isinstance(data, (bytes, bytearray)):
data = bytes(b for b in data[:-2] if b >= 32 or b in (9, 10, 13))
data = json.loads(data)
if data["Ret"] in self.OK_CODES and command in data:
return data[command]
else:
return data
def get_time(self):
return datetime.strptime(self.get_command("OPTimeQuery"), self.DATE_FORMAT)
def set_time(self, time=None):
if time is None:
time = datetime.now()
return self.set_command("OPTimeSetting", time.strftime(self.DATE_FORMAT))
def get_netcommon(self):
return self.get_command("NetWork.NetCommon")
def get_system_info(self):
return self.get_command("SystemInfo")
def get_general_info(self):
return self.get_command("General")
def get_encode_capabilities(self):
return self.get_command("EncodeCapability")
def get_system_capabilities(self):
return self.get_command("SystemFunction")
def get_camera_info(self, default_config=False):
"""Request data for 'Camera' from the target DVRIP device."""
if default_config:
code = 1044
else:
code = 1042
return self.get_command("Camera", code)
def get_encode_info(self, default_config=False):
"""Request data for 'Simplify.Encode' from the target DVRIP device.
Arguments:
default_config -- returns the default values for the type if True
"""
if default_config:
code = 1044
else:
code = 1042
return self.get_command("Simplify.Encode", code)
def recv_json(self, buf=bytearray()):
p = compile(b".*({.*})")
packet = self.socket_recv(0xFFFF)
if not packet:
return None, buf
buf.extend(packet)
m = p.search(buf)
if m is None:
return None, buf
buf = buf[m.span(1)[1] :]
return json.loads(m.group(1)), buf
def get_upgrade_info(self):
return self.get_command("OPSystemUpgrade")
def upgrade(self, filename="", packetsize=0x8000, vprint=None):
if not vprint:
vprint = lambda *args, **kwargs: print(*args, **kwargs)
data = self.set_command(
"OPSystemUpgrade", {"Action": "Start", "Type": "System"}, 0x5F0
)
if data["Ret"] not in self.OK_CODES:
return data
self.logger.debug(f"Sending file: {filename}")
blocknum = 0
sentbytes = 0
fsize = os.stat(filename).st_size
rcvd = bytearray()
with open(filename, "rb") as f:
while True:
bytes = f.read(packetsize)
if not bytes:
break
header = struct.pack(
"BB2xII2xHI", 255, 0, self.session, blocknum, 0x5F2, len(bytes)
)
self.socket_send(header + bytes)
blocknum += 1
sentbytes += len(bytes)
reply, rcvd = self.recv_json(rcvd)
if reply and reply["Ret"] != 100:
vprint("\nUpgrade failed")
return reply
progress = sentbytes / fsize * 100
vprint(f"Uploading: {progress:.1f}%", end='\r')
vprint()
self.logger.debug("Upload complete")
pkt = struct.pack("BB2xIIxBHI", 255, 0, self.session, blocknum, 1, 0x05F2, 0)
self.socket_send(pkt)
self.logger.debug("Starting upgrade...")
while True:
data, rcvd = self.recv_json(rcvd)
self.logger.debug(reply)
if data is None:
vprint("\nDone")
return
if data["Ret"] in [512, 514, 513]:
vprint("\nUpgrade failed")
return data
if data["Ret"] == 515:
vprint("\nUpgrade successful")
self.socket.close()
return data
vprint(f"Upgrading: {data['Ret']:>3}%", end='\r')
vprint()
def get_file(self, first_chunk_size):
buf = bytearray()
data = self.receive_with_timeout(first_chunk_size)
buf.extend(data)
while True:
header = self.receive_with_timeout(20)
len_data = struct.unpack("I", header[16:])[0]
if len_data == 0:
return buf
data = self.receive_with_timeout(len_data)
buf.extend(data)
def get_specific_size(self, size):
return self.receive_with_timeout(size)
def reassemble_bin_payload(self, metadata={}):
def internal_to_type(data_type, value):
if data_type == 0x1FC or data_type == 0x1FD:
if value == 1:
return "mpeg4"
elif value == 2:
return "h264"
elif value == 3:
return "h265"
elif data_type == 0x1F9:
if value == 1 or value == 6:
return "info"
elif data_type == 0x1FA:
if value == 0xE:
return "g711a"
elif data_type == 0x1FE and value == 0:
return "jpeg"
return None
def internal_to_datetime(value):
second = value & 0x3F
minute = (value & 0xFC0) >> 6
hour = (value & 0x1F000) >> 12
day = (value & 0x3E0000) >> 17
month = (value & 0x3C00000) >> 22
year = ((value & 0xFC000000) >> 26) + 2000
return datetime(year, month, day, hour, minute, second)
length = 0
buf = bytearray()
start_time = time.time()
while True:
data = self.receive_with_timeout(20)
(
head,
version,
session,
sequence_number,
total,
cur,
msgid,
len_data,
) = struct.unpack("BB2xIIBBHI", data)
packet = self.receive_with_timeout(len_data)
frame_len = 0
if length == 0:
media = None
frame_len = 8
(data_type,) = struct.unpack(">I", packet[:4])
if data_type == 0x1FC or data_type == 0x1FE:
frame_len = 16
(
media,
metadata["fps"],
w,
h,
dt,
length,
) = struct.unpack("BBBBII", packet[4:frame_len])
metadata["width"] = w * 8
metadata["height"] = h * 8
metadata["datetime"] = internal_to_datetime(dt)
if data_type == 0x1FC:
metadata["frame"] = "I"
elif data_type == 0x1FD:
(length,) = struct.unpack("I", packet[4:frame_len])
metadata["frame"] = "P"
elif data_type == 0x1FA:
(media, samp_rate, length) = struct.unpack(
"BBH", packet[4:frame_len]
)
elif data_type == 0x1F9:
(media, n, length) = struct.unpack("BBH", packet[4:frame_len])
# special case of JPEG shapshots
elif data_type == 0xFFD8FFE0:
return packet
else:
raise ValueError(data_type)
if media is not None:
metadata["type"] = internal_to_type(data_type, media)
buf.extend(packet[frame_len:])
length -= len(packet) - frame_len
if length == 0:
return buf
elapsed_time = time.time() - start_time
if elapsed_time > self.timeout:
return None
def snapshot(self, channel=0):
command = "OPSNAP"
self.send(
self.QCODES[command],
{
"Name": command,
"SessionID": "0x%08X" % self.session,
command: {"Channel": channel},
},
wait_response=False,
)
packet = self.reassemble_bin_payload()
return packet
def start_monitor(self, frame_callback, user={}, stream="Main"):
params = {
"Channel": 0,
"CombinMode": "NONE",
"StreamType": stream,
"TransMode": "TCP",
}
data = self.set_command("OPMonitor", {"Action": "Claim", "Parameter": params})
if data["Ret"] not in self.OK_CODES:
return data
self.send(
1410,
{
"Name": "OPMonitor",
"SessionID": "0x%08X" % self.session,
"OPMonitor": {"Action": "Start", "Parameter": params},
},
wait_response=False,
)
self.monitoring = True
while self.monitoring:
meta = {}
frame = self.reassemble_bin_payload(meta)
frame_callback(frame, meta, user)
def stop_monitor(self):
self.monitoring = False
def list_local_files(self, startTime, endTime, filetype, channel = 0):
# 1440 OPFileQuery
result = []
data = self.send(
1440,
{
"Name": "OPFileQuery",
"OPFileQuery": {
"BeginTime": startTime,
"Channel": channel,
"DriverTypeMask": "0x0000FFFF",
"EndTime": endTime,
"Event": "*",
"StreamType": "0x00000000",
"Type": filetype,
},
},
)
if data == None:
self.logger.debug("Could not get files.")
raise ConnectionRefusedError("Could not get files")
# When no file can be found
if data["Ret"] != 100:
self.logger.debug(
f"No files found for channel {channel} for this time range. Start: {startTime}, End: {endTime}"
)
return []
# OPFileQuery only returns the first 64 items
# we therefore need to add the results to a list, modify the starttime with the begintime value of the last item we received and query again
# max number of results are 511
result = data["OPFileQuery"]
max_event = {"status": "init", "last_num_results": 0}
while max_event["status"] == "init" or max_event["status"] == "limit":
if max_event["status"] == "init":
max_event["status"] = "run"
while len(data["OPFileQuery"]) == 64 or max_event["status"] == "limit":
newStartTime = data["OPFileQuery"][-1]["BeginTime"]
data = self.send(
1440,
{
"Name": "OPFileQuery",
"OPFileQuery": {
"BeginTime": newStartTime,
"Channel": channel,
"DriverTypeMask": "0x0000FFFF",
"EndTime": endTime,
"Event": "*",
"StreamType": "0x00000000",
"Type": filetype,
},
},
)
result += data["OPFileQuery"]
max_event["status"] = "run"
if len(result) % 511 == 0 or max_event["status"] == "limit":
self.logger.debug("Max number of events reached...")
if len(result) == max_event["last_num_results"]:
self.logger.debug(
"No new events since last run. All events queried"
)
return result
max_event["status"] = "limit"
max_event["last_num_results"] = len(result)
self.logger.debug(f"Found {len(result)} files.")
return result
def ptz_step(self, cmd, step=5):
# To do a single step the first message will just send a tilt command which last forever
# the second command will stop the tilt movement
# that means if second message does not arrive for some reason the camera will be keep moving in that direction forever
parms_start = {
"AUX": {"Number": 0, "Status": "On"},
"Channel": 0,
"MenuOpts": "Enter",
"POINT": {"bottom": 0, "left": 0, "right": 0, "top": 0},
"Pattern": "SetBegin",
"Preset": 65535,
"Step": step,
"Tour": 0,
}
self.set_command("OPPTZControl", {"Command": cmd, "Parameter": parms_start})
parms_end = {
"AUX": {"Number": 0, "Status": "On"},
"Channel": 0,
"MenuOpts": "Enter",
"POINT": {"bottom": 0, "left": 0, "right": 0, "top": 0},
"Pattern": "SetBegin",
"Preset": -1,
"Step": step,
"Tour": 0,
}
self.set_command("OPPTZControl", {"Command": cmd, "Parameter": parms_end})
def download_file(
self, startTime, endTime, filename, targetFilePath, download=True
):
Path(targetFilePath).parent.mkdir(parents=True, exist_ok=True)
self.logger.debug(f"Downloading: {targetFilePath}")
self.send(
1424,
{
"Name": "OPPlayBack",
"OPPlayBack": {
"Action": "Claim",
"Parameter": {
"PlayMode": "ByName",
"FileName": filename,
"StreamType": 0,
"Value": 0,
"TransMode": "TCP",
# Maybe IntelligentPlayBack is needed in some edge case
# "IntelligentPlayBackEvent": "",
# "IntelligentPlayBackSpeed": 2031619,
},
"StartTime": startTime,
"EndTime": endTime,
},
},
)
actionStart = "Start"
if download:
actionStart = f"Download{actionStart}"
data = self.send_custom(
1420,
{
"Name": "OPPlayBack",
"OPPlayBack": {
"Action": actionStart,
"Parameter": {
"PlayMode": "ByName",
"FileName": filename,
"StreamType": 0,
"Value": 0,
"TransMode": "TCP",
# Maybe IntelligentPlayBack is needed in some edge case
# "IntelligentPlayBackEvent": "",
# "IntelligentPlayBackSpeed": 0,
},
"StartTime": startTime,
"EndTime": endTime,
},
},
download=True,
)
try:
with open(targetFilePath, "wb") as bin_data:
bin_data.write(data)
except TypeError:
Path(targetFilePath).unlink(missing_ok=True)
self.logger.debug(f"An error occured while downloading {targetFilePath}")
raise
self.logger.debug(f"File successfully downloaded: {targetFilePath}")
actionStop = "Stop"
if download:
actionStop = f"Download{actionStop}"
self.send(
1420,
{
"Name": "OPPlayBack",
"OPPlayBack": {
"Action": actionStop,
"Parameter": {
"FileName": filename,
"PlayMode": "ByName",
"StreamType": 0,
"TransMode": "TCP",
"Channel": 0,
"Value": 0,
# Maybe IntelligentPlayBack is needed in some edge case
# "IntelligentPlayBackEvent": "",
# "IntelligentPlayBackSpeed": 0,
},
"StartTime": startTime,
"EndTime": endTime,
},
},
)
return None
def get_channel_titles(self):
return self.get_command("ChannelTitle", 1048)
def get_channel_statuses(self):
return self.get_info("NetWork.ChnStatus")