You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 

786 lines
25 KiB

import os
import struct
import json
import hashlib
import asyncio
from datetime import *
from re import compile
import time
import logging
class SomethingIsWrongWithCamera(Exception):
    """Raised when the camera cannot be reached (see ``DVRIPCam.connect``)."""
class DVRIPCam(object):
    """Asyncio client for a camera/DVR that speaks the DVRIP protocol.

    Every control message is a 20-byte binary header followed by a JSON
    body terminated with ``b"\\x0a\\x00"``.  The header layout used for
    control traffic is ``"BB2xII2xHI"``: head byte (255), version byte,
    session id, packet counter, message id and payload length.

    A single connection is shared by all requests; ``self.busy`` serialises
    request/response exchanges on it.
    """

    DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
    # Meaning of the "Ret" codes found in replies.
    CODES = {
        100: "OK",
        101: "Unknown error",
        102: "Unsupported version",
        103: "Request not permitted",
        104: "User already logged in",
        105: "User is not logged in",
        106: "Username or password is incorrect",
        107: "User does not have necessary permissions",
        203: "Password is incorrect",
        511: "Start of upgrade",
        512: "Upgrade was not started",
        513: "Upgrade data errors",
        514: "Upgrade error",
        515: "Upgrade successful",
    }
    # Command name -> message id placed in the packet header.
    QCODES = {
        "AuthorityList": 1470,
        "Users": 1472,
        "Groups": 1474,
        "AddGroup": 1476,
        "ModifyGroup": 1478,
        "DelGroup": 1480,
        "AddUser": 1482,
        "ModifyUser": 1484,
        "DelUser": 1486,
        "ModifyPassword": 1488,
        "AlarmInfo": 1504,
        "AlarmSet": 1500,
        "ChannelTitle": 1046,
        "EncodeCapability": 1360,
        "General": 1042,
        "KeepAlive": 1006,
        "OPMachine": 1450,
        "OPMailTest": 1636,
        "OPMonitor": 1413,
        "OPNetKeyboard": 1550,
        "OPPTZControl": 1400,
        "OPSNAP": 1560,
        "OPSendFile": 0x5F2,
        "OPSystemUpgrade": 0x5F5,
        "OPTalk": 1434,
        "OPTimeQuery": 1452,
        "OPTimeSetting": 1450,
        "NetWork.NetCommon": 1042,
        "OPNetAlarm": 1506,
        "SystemFunction": 1360,
        "SystemInfo": 1020,
    }
    # Single-letter shortcuts accepted by keyScript() -> key names.
    KEY_CODES = {
        "M": "Menu",
        "I": "Info",
        "E": "Esc",
        "F": "Func",
        "S": "Shift",
        "L": "Left",
        "U": "Up",
        "R": "Right",
        "D": "Down",
    }
    # "Ret" values treated as success.
    OK_CODES = [100, 515]
    PORTS = {
        "tcp": 34567,
        "udp": 34568,
    }
    # Command names listed by the original implementation for ptz(); they
    # are informational only, ptz() does not validate against them.
    PTZ_COMMANDS = (
        "DirectionUp",
        "DirectionDown",
        "DirectionLeft",
        "DirectionRight",
        "DirectionLeftUp",
        "DirectionLeftDown",
        "DirectionRightUp",
        "DirectionRightDown",
        "ZoomTile",
        "ZoomWide",
        "FocusNear",
        "FocusFar",
        "IrisSmall",
        "IrisLarge",
        "SetPreset",
        "GotoPreset",
        "ClearPreset",
        "StartTour",
        "StopTour",
    )
    # Pattern used by recv_json() to pull a JSON object out of raw bytes;
    # compiled once instead of on every call.
    _JSON_RE = compile(b".*({.*})")

    def __init__(self, ip, **kwargs):
        """Store connection settings; no network I/O happens here.

        Keyword arguments:
        user      -- login name (default "admin")
        password  -- plain password, hashed with sofia_hash() (default "")
        hash_pass -- already hashed password; overrides ``password``
        proto     -- "tcp" (default) or "udp"
        port      -- port number; defaults to PORTS[proto]
        """
        self.logger = logging.getLogger(__name__)
        self.ip = ip
        self.user = kwargs.get("user", "admin")
        self.hash_pass = kwargs.get(
            "hash_pass", self.sofia_hash(kwargs.get("password", ""))
        )
        self.proto = kwargs.get("proto", "tcp")
        self.port = kwargs.get("port", self.PORTS.get(self.proto))
        self.socket_reader = None
        self.socket_writer = None
        self.packet_count = 0
        self.session = 0
        self.alive_time = 20
        self.alarm_func = None
        self.timeout = 10
        self.monitoring = False
        self.busy = asyncio.Lock()

    def debug(self, format=None):
        """Enable DEBUG logging to a stream handler, optionally with ``format``."""
        self.logger.setLevel(logging.DEBUG)
        ch = logging.StreamHandler()
        if format:
            formatter = logging.Formatter(format)
            ch.setFormatter(formatter)
        self.logger.addHandler(ch)

    def _is_ok(self, reply):
        """Return True when ``reply`` is a dict whose "Ret" is in OK_CODES.

        Tolerates the ``None`` / ``{}`` values send() yields on failures.
        """
        return isinstance(reply, dict) and reply.get("Ret") in self.OK_CODES

    def _connection_lost(self):
        """Return True when the connection is closed or the reader hit EOF."""
        reader = self.socket_reader
        return self.socket_writer is None or reader is None or reader.at_eof()

    async def connect(self, timeout=10):
        """Open the connection and remember ``timeout`` for later reads.

        Raises:
        NotImplementedError        -- proto is "udp" (not supported yet)
        ValueError                 -- proto is neither "tcp" nor "udp"
        SomethingIsWrongWithCamera -- the connection failed or timed out
        """
        # Raising a plain string is a TypeError in Python 3, so real
        # exception types are used for the unsupported-protocol cases.
        if self.proto == "udp":
            raise NotImplementedError(f"Unsupported protocol {self.proto} (yet)")
        if self.proto != "tcp":
            raise ValueError(f"Unsupported protocol {self.proto}")
        try:
            self.socket_reader, self.socket_writer = await asyncio.wait_for(
                asyncio.open_connection(self.ip, self.port), timeout=timeout
            )
        except (OSError, asyncio.TimeoutError) as err:
            # asyncio.TimeoutError is only an OSError subclass on 3.11+.
            raise SomethingIsWrongWithCamera("Cannot connect to camera") from err
        self.socket_send = self.tcp_socket_send
        self.socket_recv = self.tcp_socket_recv
        # it's important to extend timeout for upgrade procedure
        self.timeout = timeout

    def close(self):
        """Close the writer (best effort) and mark the client disconnected."""
        writer, self.socket_writer = self.socket_writer, None
        if writer is None:
            return
        try:
            writer.close()
        except Exception:
            self.logger.debug("error while closing connection", exc_info=True)

    def tcp_socket_send(self, bytes):
        """Queue ``bytes`` on the stream writer; errors are logged and ignored."""
        try:
            return self.socket_writer.write(bytes)
        except Exception:
            self.logger.debug("send failed", exc_info=True)
            return None

    async def tcp_socket_recv(self, bufsize):
        """Read up to ``bufsize`` bytes; return None when the read fails."""
        try:
            return await self.socket_reader.read(bufsize)
        except asyncio.CancelledError:
            # Must propagate so asyncio.wait_for() can report a timeout.
            raise
        except Exception:
            self.logger.debug("receive failed", exc_info=True)
            return None

    async def receive_with_timeout(self, length):
        """Read exactly ``length`` bytes.

        Returns a bytearray, or None when a read times out, fails, hits
        end-of-stream, or the whole operation exceeds ``self.timeout``.
        """
        buf = bytearray()
        deadline = time.monotonic() + self.timeout
        while len(buf) < length:
            try:
                data = await asyncio.wait_for(
                    self.socket_recv(length - len(buf)), timeout=self.timeout
                )
            except asyncio.TimeoutError:
                return None
            if not data:
                # None: read error; b"": peer closed. Neither can complete.
                return None
            buf.extend(data)
            if len(buf) < length and time.monotonic() > deadline:
                return None
        return buf

    async def receive_json(self, length):
        """Read a ``length``-byte JSON body and decode it.

        Returns ``{}`` when the body could not be read.  The last two bytes
        of the body are dropped before decoding.
        """
        data = await self.receive_with_timeout(length)
        if data is None:
            return {}
        self.packet_count += 1
        self.logger.debug("<= %s", data)
        reply = json.loads(data[:-2])
        return reply

    async def send(self, msg, data=None, wait_response=True):
        """Send message ``msg`` with payload ``data`` and optionally read the reply.

        Returns:
        {"Ret": 101} -- not connected
        None         -- ``wait_response`` is False, or the reply header was
                        missing / shorter than 20 bytes
        dict         -- decoded reply (``{}`` if its body could not be read)

        The lock is always released, including on early returns and errors.
        """
        if self.socket_writer is None:
            return {"Ret": 101}
        async with self.busy:
            return await self._send_packet(msg, data, wait_response)

    async def _send_packet(self, msg, data=None, wait_response=True):
        """Do one exchange without locking; the caller must hold ``self.busy``."""
        if data is None:
            data = {}
        if not isinstance(data, (bytes, bytearray)):
            data = json.dumps(data, ensure_ascii=False).encode("utf-8")
        pkt = (
            struct.pack(
                "BB2xII2xHI",
                255,
                0,
                self.session,
                self.packet_count,
                msg,
                len(data) + 2,
            )
            + data
            + b"\x0a\x00"
        )
        self.logger.debug("=> %s", pkt)
        self.socket_send(pkt)
        if not wait_response:
            return None
        header = await self.socket_recv(20)
        if header is None or len(header) < 20:
            return None
        # The session id reported by the device replaces the stored one.
        (
            _head,
            _version,
            self.session,
            _sequence_number,
            _msgid,
            len_data,
        ) = struct.unpack("BB2xII2xHI", header)
        return await self.receive_json(len_data)

    def sofia_hash(self, password=""):
        """Return the 8-character hash of ``password`` sent as "PassWord".

        The MD5 digest is folded pairwise: each pair of bytes is summed and
        mapped (mod 62) onto ``0-9A-Za-z``.
        """
        md5 = hashlib.md5(bytes(password, "utf-8")).digest()
        chars = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
        return "".join([chars[sum(x) % 62] for x in zip(md5[::2], md5[1::2])])

    async def login(self, loop):
        """Connect if needed, log in and start the keep-alive task on ``loop``.

        Returns True on success, False when the device rejects the login or
        does not answer.
        """
        if self.socket_writer is None:
            await self.connect()
        data = await self.send(
            1000,
            {
                "EncryptType": "MD5",
                "LoginType": "DVRIP-Web",
                "PassWord": self.hash_pass,
                "UserName": self.user,
            },
        )
        if not self._is_ok(data):
            return False
        self.session = int(data["SessionID"], 16)
        self.alive_time = data["AliveInterval"]
        self.keep_alive(loop)
        return True

    async def getAuthorityList(self):
        """Return the device's "AuthorityList", or [] on failure."""
        data = await self.send(self.QCODES["AuthorityList"])
        if self._is_ok(data):
            return data["AuthorityList"]
        return []

    async def getGroups(self):
        """Return the device's "Groups", or [] on failure."""
        data = await self.send(self.QCODES["Groups"])
        if self._is_ok(data):
            return data["Groups"]
        return []

    async def addGroup(self, name, comment="", auth=None):
        """Create group ``name``; ``auth`` defaults to the full authority list."""
        data = await self.set_command(
            "AddGroup",
            {
                "Group": {
                    "AuthorityList": auth or await self.getAuthorityList(),
                    "Memo": comment,
                    "Name": name,
                },
            },
        )
        return self._is_ok(data)

    async def modifyGroup(self, name, newname=None, comment=None, auth=None):
        """Update group ``name``; arguments left falsy keep the current value.

        Returns False (after printing a message) when the group is unknown.
        """
        matches = [x for x in await self.getGroups() if x["Name"] == name]
        if not matches:
            print(f'Group "{name}" not found!')
            return False
        g = matches[0]
        data = await self.send(
            self.QCODES["ModifyGroup"],
            {
                "Group": {
                    "AuthorityList": auth or g["AuthorityList"],
                    "Memo": comment or g["Memo"],
                    "Name": newname or g["Name"],
                },
                "GroupName": name,
            },
        )
        return self._is_ok(data)

    async def delGroup(self, name):
        """Delete group ``name``; return True on success."""
        data = await self.send(
            self.QCODES["DelGroup"],
            {"Name": name, "SessionID": "0x%08X" % self.session},
        )
        return self._is_ok(data)

    async def getUsers(self):
        """Return the device's "Users", or [] on failure."""
        data = await self.send(self.QCODES["Users"])
        if self._is_ok(data):
            return data["Users"]
        return []

    async def addUser(
        self, name, password, comment="", group="user", auth=None, sharable=True
    ):
        """Create user ``name`` in ``group``.

        ``auth`` defaults to the group's authority list.  Returns False
        (after printing a message) when the group is unknown.
        """
        matches = [x for x in await self.getGroups() if x["Name"] == group]
        if not matches:
            print(f'Group "{group}" not found!')
            return False
        g = matches[0]
        data = await self.set_command(
            "AddUser",
            {
                "User": {
                    "AuthorityList": auth or g["AuthorityList"],
                    "Group": g["Name"],
                    "Memo": comment,
                    "Name": name,
                    "Password": self.sofia_hash(password),
                    "Reserved": False,
                    "Sharable": sharable,
                },
            },
        )
        return self._is_ok(data)

    async def modifyUser(
        self, name, newname=None, comment=None, group=None, auth=None, sharable=None
    ):
        """Update user ``name``; arguments left at None keep the current value.

        Moving the user to ``group`` also adopts that group's authority list
        unless ``auth`` is given.  Returns False (after printing a message)
        when the user or the group is unknown.
        """
        # getUsers() is a coroutine and has to be awaited before iterating.
        matches = [x for x in await self.getUsers() if x["Name"] == name]
        if not matches:
            print(f'User "{name}" not found!')
            return False
        u = matches[0]
        if group:
            groups = [x for x in await self.getGroups() if x["Name"] == group]
            if not groups:
                print(f'Group "{group}" not found!')
                return False
            u["AuthorityList"] = groups[0]["AuthorityList"]
        data = await self.send(
            self.QCODES["ModifyUser"],
            {
                "User": {
                    "AuthorityList": auth or u["AuthorityList"],
                    "Group": group or u["Group"],
                    "Memo": comment or u["Memo"],
                    "Name": newname or u["Name"],
                    "Password": "",
                    "Reserved": u["Reserved"],
                    # "is None" so that sharable=False can actually be set.
                    "Sharable": u["Sharable"] if sharable is None else sharable,
                },
                "UserName": name,
            },
        )
        return self._is_ok(data)

    async def delUser(self, name):
        """Delete user ``name``; return True on success."""
        data = await self.send(
            self.QCODES["DelUser"],
            {"Name": name, "SessionID": "0x%08X" % self.session},
        )
        return self._is_ok(data)

    async def changePasswd(self, newpass="", oldpass=None, user=None):
        """Change the password of ``user`` (default: the logged-in user).

        ``newpass`` is hashed with sofia_hash().  ``oldpass`` is sent as
        given; when omitted the hash supplied at construction is used.
        """
        # NOTE(review): oldpass is forwarded unhashed, exactly as before;
        # confirm whether callers are expected to pass a hash here.
        data = await self.send(
            self.QCODES["ModifyPassword"],
            {
                "EncryptType": "MD5",
                "NewPassWord": self.sofia_hash(newpass),
                "PassWord": oldpass or self.hash_pass,
                "SessionID": "0x%08X" % self.session,
                "UserName": user or self.user,
            },
        )
        return self._is_ok(data)

    async def channel_title(self, titles):
        """Set channel titles; a single string is treated as a one-item list."""
        if isinstance(titles, str):
            titles = [titles]
        await self.send(
            self.QCODES["ChannelTitle"],
            {
                "ChannelTitle": titles,
                "Name": "ChannelTitle",
                "SessionID": "0x%08X" % self.session,
            },
        )

    async def channel_bitmap(self, width, height, bitmap):
        """Send a raw bitmap (message 0x041A) preceded by a width/height header.

        Returns False only when a reply arrives whose "Ret" is not 100.
        """
        header = struct.pack("HH12x", width, height)
        self.socket_send(
            struct.pack(
                "BB2xII2xHI",
                255,
                0,
                self.session,
                self.packet_count,
                0x041A,
                len(bitmap) + 16,
            )
            + header
            + bitmap
        )
        reply, _ = await self.recv_json()
        if reply and reply["Ret"] != 100:
            return False
        return True

    async def reboot(self):
        """Ask the device to reboot, then close the connection."""
        await self.set_command("OPMachine", {"Action": "Reboot"})
        self.close()

    def setAlarm(self, func):
        """Register ``func(payload, sequence_number)`` as the alarm callback."""
        self.alarm_func = func

    def clearAlarm(self):
        """Remove the alarm callback."""
        self.alarm_func = None

    async def alarmStart(self, loop):
        """Start alarm_worker() on ``loop`` and send the "AlarmSet" request."""
        loop.create_task(self.alarm_worker())
        return await self.get_command("", self.QCODES["AlarmSet"])

    async def alarm_worker(self):
        """Read incoming packets and forward "AlarmInfo" ones to the callback.

        Runs until the connection is closed.  The lock is held only while a
        read is in progress and each wait is bounded by ``self.timeout`` so
        that other requests (e.g. keep-alive) are not starved.
        """
        while self.socket_writer:
            async with self.busy:
                try:
                    header = await asyncio.wait_for(
                        self.socket_recv(20), timeout=self.timeout
                    )
                except asyncio.TimeoutError:
                    # Nothing arrived; release the lock and try again.
                    continue
                if header and len(header) == 20:
                    await self._dispatch_alarm(header)
                    continue
            # Short, empty or failed read (lock already released).
            if self._connection_lost():
                self.close()
                break
            await asyncio.sleep(0.1)

    async def _dispatch_alarm(self, header):
        """Read the packet announced by ``header`` and invoke the alarm callback."""
        try:
            (
                _head,
                _version,
                session,
                sequence_number,
                msgid,
                len_data,
            ) = struct.unpack("BB2xII2xHI", header)
            reply = await self.receive_json(len_data)
            if not reply:
                return
            if msgid == self.QCODES["AlarmInfo"] and self.session == session:
                if self.alarm_func is not None:
                    self.alarm_func(reply[reply["Name"]], sequence_number)
        except asyncio.CancelledError:
            raise
        except Exception:
            # Best effort: a bad packet or failing callback must not stop
            # the worker, but it should not vanish silently either.
            self.logger.warning("alarm packet could not be handled", exc_info=True)

    async def set_remote_alarm(self, state):
        """Send "OPNetAlarm" with event 0 and the given ``state``."""
        await self.set_command(
            "OPNetAlarm", {"Event": 0, "State": state},
        )

    async def keep_alive_workner(self):
        """Send "KeepAlive" every ``alive_time`` seconds while connected.

        Closes the connection and stops when a request gets no reply header.
        """
        while self.socket_writer:
            ret = await self.send(
                self.QCODES["KeepAlive"],
                {"Name": "KeepAlive", "SessionID": "0x%08X" % self.session},
            )
            if ret is None:
                self.close()
                break
            await asyncio.sleep(self.alive_time)

    def keep_alive(self, loop):
        """Schedule keep_alive_workner() on ``loop``."""
        loop.create_task(self.keep_alive_workner())

    async def keyDown(self, key):
        """Send a "KeyDown" event for ``key``."""
        await self.set_command(
            "OPNetKeyboard", {"Status": "KeyDown", "Value": key},
        )

    async def keyUp(self, key):
        """Send a "KeyUp" event for ``key``."""
        await self.set_command(
            "OPNetKeyboard", {"Status": "KeyUp", "Value": key},
        )

    async def keyPress(self, key):
        """Press ``key``: key down, 0.3 s pause, key up."""
        await self.keyDown(key)
        await asyncio.sleep(0.3)
        await self.keyUp(key)

    async def keyScript(self, keys):
        """Replay ``keys``: letters in KEY_CODES are pressed, anything else waits 1 s."""
        for k in keys:
            if k != " " and k.upper() in self.KEY_CODES:
                await self.keyPress(self.KEY_CODES[k.upper()])
            else:
                await asyncio.sleep(1)

    async def ptz(self, cmd, step=5, preset=-1, ch=0):
        """Send PTZ command ``cmd`` for channel ``ch``.

        PTZ_COMMANDS lists the known command names; ``cmd`` is sent as given.
        "Tour" is set to 1 when ``cmd`` contains "Tour".
        """
        ptz_param = {
            "AUX": {"Number": 0, "Status": "On"},
            "Channel": ch,
            "MenuOpts": "Enter",
            "Pattern": "Start",
            "Preset": preset,
            "Step": step,
            "Tour": 1 if "Tour" in cmd else 0,
        }
        return await self.set_command(
            "OPPTZControl", {"Command": cmd, "Parameter": ptz_param},
        )

    async def set_info(self, command, data):
        """Send ``data`` for ``command`` using message id 1040."""
        return await self.set_command(command, data, 1040)

    async def set_command(self, command, data, code=None):
        """Send ``{command: data}``; ``code`` defaults to QCODES[command]."""
        if not code:
            code = self.QCODES[command]
        return await self.send(
            code, {"Name": command, "SessionID": "0x%08X" % self.session, command: data}
        )

    async def get_info(self, command):
        """Query ``command`` using message id 1042."""
        return await self.get_command(command, 1042)

    async def get_command(self, command, code=None):
        """Query ``command``; ``code`` defaults to QCODES[command].

        Returns ``reply[command]`` on success, otherwise the raw reply
        (which may be None or ``{}`` when nothing usable was received).
        """
        if not code:
            code = self.QCODES[command]
        data = await self.send(
            code, {"Name": command, "SessionID": "0x%08X" % self.session}
        )
        if self._is_ok(data) and command in data:
            return data[command]
        return data

    async def get_time(self):
        """Return the "OPTimeQuery" result parsed with DATE_FORMAT."""
        return datetime.strptime(await self.get_command("OPTimeQuery"), self.DATE_FORMAT)

    async def set_time(self, time=None):
        """Send "OPTimeSetting" with ``time`` (default: ``datetime.now()``)."""
        if time is None:
            time = datetime.now()
        return await self.set_command("OPTimeSetting", time.strftime(self.DATE_FORMAT))

    async def get_netcommon(self):
        """Query "NetWork.NetCommon"."""
        return await self.get_command("NetWork.NetCommon")

    async def get_system_info(self):
        """Query "SystemInfo"."""
        return await self.get_command("SystemInfo")

    async def get_general_info(self):
        """Query "General"."""
        return await self.get_command("General")

    async def get_encode_capabilities(self):
        """Query "EncodeCapability"."""
        return await self.get_command("EncodeCapability")

    async def get_system_capabilities(self):
        """Query "SystemFunction"."""
        return await self.get_command("SystemFunction")

    async def get_camera_info(self, default_config=False):
        """Request data for 'Camera' from the target DVRIP device.

        Arguments:
        default_config -- returns the default values for the type if True
        """
        code = 1044 if default_config else 1042
        return await self.get_command("Camera", code)

    async def get_encode_info(self, default_config=False):
        """Request data for 'Simplify.Encode' from the target DVRIP device.

        Arguments:
        default_config -- returns the default values for the type if True
        """
        code = 1044 if default_config else 1042
        return await self.get_command("Simplify.Encode", code)

    async def recv_json(self, buf=None):
        """Read available bytes into ``buf`` and extract a JSON object.

        Returns ``(obj, rest)`` where ``rest`` is what follows the object,
        or ``(None, buf)`` when nothing was read or no object was found.
        A fresh buffer is created per call when ``buf`` is omitted.
        """
        if buf is None:
            buf = bytearray()
        packet = await self.socket_recv(0xFFFF)
        if not packet:
            return None, buf
        buf.extend(packet)
        m = self._JSON_RE.search(buf)
        if m is None:
            return None, buf
        rest = buf[m.span(1)[1]:]
        return json.loads(m.group(1)), rest

    async def get_upgrade_info(self):
        """Query "OPSystemUpgrade"."""
        return await self.get_command("OPSystemUpgrade")

    async def upgrade(self, filename="", packetsize=0x8000, vprint=None):
        """Upload firmware ``filename`` in ``packetsize`` chunks.

        ``vprint`` receives progress messages (default: ``print``).
        Returns the device reply on failure or success, or None when the
        device stops answering.  The connection is closed after a
        successful upgrade (Ret 515).
        """
        if not vprint:
            vprint = print
        data = await self.set_command(
            "OPSystemUpgrade", {"Action": "Start", "Type": "System"}, 0x5F0
        )
        if not self._is_ok(data):
            return data
        vprint("Ready to upgrade")
        blocknum = 0
        sentbytes = 0
        fsize = os.stat(filename).st_size
        rcvd = bytearray()
        with open(filename, "rb") as f:
            while True:
                chunk = f.read(packetsize)
                if not chunk:
                    break
                header = struct.pack(
                    "BB2xII2xHI", 255, 0, self.session, blocknum, 0x5F2, len(chunk)
                )
                self.socket_send(header + chunk)
                blocknum += 1
                sentbytes += len(chunk)
                reply, rcvd = await self.recv_json(rcvd)
                if reply and reply["Ret"] != 100:
                    vprint("Upgrade failed")
                    return reply
                progress = sentbytes / fsize * 100
                vprint(f"Uploaded {progress:.2f}%")
        vprint("End of file")
        # Zero-length block with the extra flag byte set marks end of upload.
        pkt = struct.pack("BB2xIIxBHI", 255, 0, self.session, blocknum, 1, 0x05F2, 0)
        self.socket_send(pkt)
        vprint("Waiting for upgrade...")
        while True:
            reply, rcvd = await self.recv_json(rcvd)
            self.logger.debug("upgrade reply: %s", reply)
            if not reply:
                return
            if reply.get("Name") == "" and reply.get("Ret") == 100:
                break
        while True:
            data, rcvd = await self.recv_json(rcvd)
            self.logger.debug("upgrade status: %s", data)
            if data is None:
                vprint("Done")
                return
            if data["Ret"] in [512, 514, 513]:
                vprint("Upgrade failed")
                return data
            if data["Ret"] == 515:
                vprint("Upgrade successful")
                self.close()
                return data
            vprint(f"Upgraded {data['Ret']}%")

    @staticmethod
    def _internal_to_type(data_type, value):
        """Map a frame type / media code pair to a media name, or None."""
        if data_type == 0x1FC or data_type == 0x1FD:
            if value == 1:
                return "mpeg4"
            elif value == 2:
                return "h264"
            elif value == 3:
                return "h265"
        elif data_type == 0x1F9:
            if value == 1 or value == 6:
                return "info"
        elif data_type == 0x1FA:
            if value == 0xE:
                return "g711a"
        elif data_type == 0x1FE and value == 0:
            return "jpeg"
        return None

    @staticmethod
    def _internal_to_datetime(value):
        """Decode a packed timestamp (6-bit year offset from 2000 down to seconds)."""
        second = value & 0x3F
        minute = (value & 0xFC0) >> 6
        hour = (value & 0x1F000) >> 12
        day = (value & 0x3E0000) >> 17
        month = (value & 0x3C00000) >> 22
        year = ((value & 0xFC000000) >> 26) + 2000
        return datetime(year, month, day, hour, minute, second)

    async def reassemble_bin_payload(self, metadata=None):
        """Read packets until one complete binary frame has been collected.

        ``metadata`` (a dict, optional) is filled in place with whatever the
        frame header provides: "fps", "width", "height", "datetime",
        "frame" ("I"/"P") and "type".

        Returns the frame bytes, the raw packet for a JPEG snapshot, or
        None when reading fails or takes longer than ``self.timeout``.

        Raises ValueError for an unknown frame type.
        """
        if metadata is None:
            metadata = {}
        length = 0
        buf = bytearray()
        deadline = time.monotonic() + self.timeout
        while True:
            data = await self.receive_with_timeout(20)
            if data is None:
                return None
            (
                _head,
                _version,
                _session,
                _sequence_number,
                _total,
                _cur,
                _msgid,
                len_data,
            ) = struct.unpack("BB2xIIBBHI", data)
            packet = await self.receive_with_timeout(len_data)
            if packet is None:
                return None
            frame_len = 0
            if length == 0:
                # First packet of a frame: it starts with a frame header.
                media = None
                frame_len = 8
                (data_type,) = struct.unpack(">I", packet[:4])
                if data_type == 0x1FC or data_type == 0x1FE:
                    frame_len = 16
                    (media, metadata["fps"], w, h, dt, length,) = struct.unpack(
                        "BBBBII", packet[4:frame_len]
                    )
                    metadata["width"] = w * 8
                    metadata["height"] = h * 8
                    metadata["datetime"] = self._internal_to_datetime(dt)
                    if data_type == 0x1FC:
                        metadata["frame"] = "I"
                elif data_type == 0x1FD:
                    (length,) = struct.unpack("I", packet[4:frame_len])
                    metadata["frame"] = "P"
                elif data_type == 0x1FA:
                    (media, _samp_rate, length) = struct.unpack(
                        "BBH", packet[4:frame_len]
                    )
                elif data_type == 0x1F9:
                    (media, _n, length) = struct.unpack("BBH", packet[4:frame_len])
                # special case of JPEG snapshots
                elif data_type == 0xFFD8FFE0:
                    return packet
                else:
                    raise ValueError(data_type)
                if media is not None:
                    metadata["type"] = self._internal_to_type(data_type, media)
            buf.extend(packet[frame_len:])
            length -= len(packet) - frame_len
            if length == 0:
                return buf
            if time.monotonic() > deadline:
                return None

    async def snapshot(self, channel=0):
        """Request a snapshot of ``channel`` and return its binary payload.

        Returns None when not connected or when the payload cannot be read.
        The lock is held for the request and the payload read so that no
        other exchange consumes part of the binary reply.
        """
        if self.socket_writer is None:
            return None
        command = "OPSNAP"
        async with self.busy:
            await self._send_packet(
                self.QCODES[command],
                {
                    "Name": command,
                    "SessionID": "0x%08X" % self.session,
                    command: {"Channel": channel},
                },
                wait_response=False,
            )
            return await self.reassemble_bin_payload()

    async def start_monitor(self, frame_callback, user=None, stream="Main"):
        """Claim and start stream ``stream``, delivering frames until stopped.

        ``frame_callback(frame, meta, user)`` is called for every result of
        reassemble_bin_payload(), including None.  Returns the device reply
        when claiming the stream fails; otherwise returns None once
        stop_monitor() was called or the connection is lost.

        NOTE(review): the lock is held while monitoring so that no other
        request reads from the stream; keep-alive requests therefore wait
        until monitoring stops -- confirm this suits the target devices.
        """
        if user is None:
            user = {}
        params = {
            "Channel": 0,
            "CombinMode": "NONE",
            "StreamType": stream,
            "TransMode": "TCP",
        }
        data = await self.set_command(
            "OPMonitor", {"Action": "Claim", "Parameter": params}
        )
        if not self._is_ok(data):
            return data
        async with self.busy:
            await self._send_packet(
                1410,
                {
                    "Name": "OPMonitor",
                    "SessionID": "0x%08X" % self.session,
                    "OPMonitor": {"Action": "Start", "Parameter": params},
                },
                wait_response=False,
            )
            self.monitoring = True
            while self.monitoring:
                meta = {}
                frame = await self.reassemble_bin_payload(meta)
                frame_callback(frame, meta, user)
                if frame is None and self._connection_lost():
                    # No more frames can arrive; avoid a busy loop.
                    self.monitoring = False

    def stop_monitor(self):
        """Ask start_monitor() to stop after the current frame."""
        self.monitoring = False