mirror of https://github.com/OpenIPC/python-dvr
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
786 lines
25 KiB
786 lines
25 KiB
import os
|
|
import struct
|
|
import json
|
|
import hashlib
|
|
import asyncio
|
|
from datetime import *
|
|
from re import compile
|
|
import time
|
|
import logging
|
|
|
|
class SomethingIsWrongWithCamera(Exception):
|
|
pass
|
|
|
|
class DVRIPCam(object):
|
|
DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
|
|
CODES = {
|
|
100: "OK",
|
|
101: "Unknown error",
|
|
102: "Unsupported version",
|
|
103: "Request not permitted",
|
|
104: "User already logged in",
|
|
105: "User is not logged in",
|
|
106: "Username or password is incorrect",
|
|
107: "User does not have necessary permissions",
|
|
203: "Password is incorrect",
|
|
511: "Start of upgrade",
|
|
512: "Upgrade was not started",
|
|
513: "Upgrade data errors",
|
|
514: "Upgrade error",
|
|
515: "Upgrade successful",
|
|
}
|
|
QCODES = {
|
|
"AuthorityList": 1470,
|
|
"Users": 1472,
|
|
"Groups": 1474,
|
|
"AddGroup": 1476,
|
|
"ModifyGroup": 1478,
|
|
"DelGroup": 1480,
|
|
"AddUser": 1482,
|
|
"ModifyUser": 1484,
|
|
"DelUser": 1486,
|
|
"ModifyPassword": 1488,
|
|
"AlarmInfo": 1504,
|
|
"AlarmSet": 1500,
|
|
"ChannelTitle": 1046,
|
|
"EncodeCapability": 1360,
|
|
"General": 1042,
|
|
"KeepAlive": 1006,
|
|
"OPMachine": 1450,
|
|
"OPMailTest": 1636,
|
|
"OPMonitor": 1413,
|
|
"OPNetKeyboard": 1550,
|
|
"OPPTZControl": 1400,
|
|
"OPSNAP": 1560,
|
|
"OPSendFile": 0x5F2,
|
|
"OPSystemUpgrade": 0x5F5,
|
|
"OPTalk": 1434,
|
|
"OPTimeQuery": 1452,
|
|
"OPTimeSetting": 1450,
|
|
"NetWork.NetCommon": 1042,
|
|
"OPNetAlarm": 1506,
|
|
"SystemFunction": 1360,
|
|
"SystemInfo": 1020,
|
|
}
|
|
KEY_CODES = {
|
|
"M": "Menu",
|
|
"I": "Info",
|
|
"E": "Esc",
|
|
"F": "Func",
|
|
"S": "Shift",
|
|
"L": "Left",
|
|
"U": "Up",
|
|
"R": "Right",
|
|
"D": "Down",
|
|
}
|
|
OK_CODES = [100, 515]
|
|
PORTS = {
|
|
"tcp": 34567,
|
|
"udp": 34568,
|
|
}
|
|
|
|
def __init__(self, ip, **kwargs):
|
|
self.logger = logging.getLogger(__name__)
|
|
self.ip = ip
|
|
self.user = kwargs.get("user", "admin")
|
|
self.hash_pass = kwargs.get("hash_pass", self.sofia_hash(kwargs.get("password", "")))
|
|
self.proto = kwargs.get("proto", "tcp")
|
|
self.port = kwargs.get("port", self.PORTS.get(self.proto))
|
|
self.socket_reader = None
|
|
self.socket_writer = None
|
|
self.packet_count = 0
|
|
self.session = 0
|
|
self.alive_time = 20
|
|
self.alarm_func = None
|
|
self.timeout = 10
|
|
self.busy = asyncio.Lock()
|
|
|
|
def debug(self, format=None):
|
|
self.logger.setLevel(logging.DEBUG)
|
|
ch = logging.StreamHandler()
|
|
if format:
|
|
formatter = logging.Formatter(format)
|
|
ch.setFormatter(formatter)
|
|
self.logger.addHandler(ch)
|
|
|
|
async def connect(self, timeout=10):
|
|
try:
|
|
if self.proto == "tcp":
|
|
self.socket_reader, self.socket_writer = await asyncio.wait_for(asyncio.open_connection(self.ip, self.port), timeout=timeout)
|
|
self.socket_send = self.tcp_socket_send
|
|
self.socket_recv = self.tcp_socket_recv
|
|
elif self.proto == "udp":
|
|
raise f"Unsupported protocol {self.proto} (yet)"
|
|
else:
|
|
raise f"Unsupported protocol {self.proto}"
|
|
|
|
# it's important to extend timeout for upgrade procedure
|
|
self.timeout = timeout
|
|
except OSError:
|
|
raise SomethingIsWrongWithCamera('Cannot connect to camera')
|
|
|
|
def close(self):
|
|
try:
|
|
self.socket_writer.close()
|
|
except:
|
|
pass
|
|
self.socket_writer = None
|
|
|
|
def tcp_socket_send(self, bytes):
|
|
try:
|
|
return self.socket_writer.write(bytes)
|
|
except:
|
|
return None
|
|
|
|
async def tcp_socket_recv(self, bufsize):
|
|
try:
|
|
return await self.socket_reader.read(bufsize)
|
|
except:
|
|
return None
|
|
|
|
async def receive_with_timeout(self, length):
|
|
received = 0
|
|
buf = bytearray()
|
|
start_time = time.time()
|
|
|
|
while True:
|
|
try:
|
|
data = await asyncio.wait_for(self.socket_recv(length - received), timeout=self.timeout)
|
|
buf.extend(data)
|
|
received += len(data)
|
|
if length == received:
|
|
break
|
|
elapsed_time = time.time() - start_time
|
|
if elapsed_time > self.timeout:
|
|
return None
|
|
except asyncio.TimeoutError:
|
|
return None
|
|
return buf
|
|
|
|
async def receive_json(self, length):
|
|
data = await self.receive_with_timeout(length)
|
|
if data is None:
|
|
return {}
|
|
|
|
self.packet_count += 1
|
|
self.logger.debug("<= %s", data)
|
|
reply = json.loads(data[:-2])
|
|
return reply
|
|
|
|
async def send(self, msg, data={}, wait_response=True):
|
|
if self.socket_writer is None:
|
|
return {"Ret": 101}
|
|
await self.busy.acquire()
|
|
if hasattr(data, "__iter__"):
|
|
data = bytes(json.dumps(data, ensure_ascii=False), "utf-8")
|
|
pkt = (
|
|
struct.pack(
|
|
"BB2xII2xHI",
|
|
255,
|
|
0,
|
|
self.session,
|
|
self.packet_count,
|
|
msg,
|
|
len(data) + 2,
|
|
)
|
|
+ data
|
|
+ b"\x0a\x00"
|
|
)
|
|
self.logger.debug("=> %s", pkt)
|
|
self.socket_send(pkt)
|
|
if wait_response:
|
|
reply = {"Ret": 101}
|
|
data = await self.socket_recv(20)
|
|
if data is None or len(data) < 20:
|
|
return None
|
|
(
|
|
head,
|
|
version,
|
|
self.session,
|
|
sequence_number,
|
|
msgid,
|
|
len_data,
|
|
) = struct.unpack("BB2xII2xHI", data)
|
|
reply = await self.receive_json(len_data)
|
|
self.busy.release()
|
|
return reply
|
|
|
|
def sofia_hash(self, password=""):
|
|
md5 = hashlib.md5(bytes(password, "utf-8")).digest()
|
|
chars = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
|
|
return "".join([chars[sum(x) % 62] for x in zip(md5[::2], md5[1::2])])
|
|
|
|
async def login(self, loop):
|
|
if self.socket_writer is None:
|
|
await self.connect()
|
|
data = await self.send(
|
|
1000,
|
|
{
|
|
"EncryptType": "MD5",
|
|
"LoginType": "DVRIP-Web",
|
|
"PassWord": self.hash_pass,
|
|
"UserName": self.user,
|
|
},
|
|
)
|
|
if data is None or data["Ret"] not in self.OK_CODES:
|
|
return False
|
|
self.session = int(data["SessionID"], 16)
|
|
self.alive_time = data["AliveInterval"]
|
|
self.keep_alive(loop)
|
|
return data["Ret"] in self.OK_CODES
|
|
|
|
async def getAuthorityList(self):
|
|
data = await self.send(self.QCODES["AuthorityList"])
|
|
if data["Ret"] in self.OK_CODES:
|
|
return data["AuthorityList"]
|
|
else:
|
|
return []
|
|
|
|
async def getGroups(self):
|
|
data = await self.send(self.QCODES["Groups"])
|
|
if data["Ret"] in self.OK_CODES:
|
|
return data["Groups"]
|
|
else:
|
|
return []
|
|
|
|
async def addGroup(self, name, comment="", auth=None):
|
|
data = await self.set_command(
|
|
"AddGroup",
|
|
{
|
|
"Group": {
|
|
"AuthorityList": auth or await self.getAuthorityList(),
|
|
"Memo": comment,
|
|
"Name": name,
|
|
},
|
|
},
|
|
)
|
|
return data["Ret"] in self.OK_CODES
|
|
|
|
async def modifyGroup(self, name, newname=None, comment=None, auth=None):
|
|
g = [x for x in await self.getGroups() if x["Name"] == name]
|
|
if g == []:
|
|
print(f'Group "{name}" not found!')
|
|
return False
|
|
g = g[0]
|
|
data = await self.send(
|
|
self.QCODES["ModifyGroup"],
|
|
{
|
|
"Group": {
|
|
"AuthorityList": auth or g["AuthorityList"],
|
|
"Memo": comment or g["Memo"],
|
|
"Name": newname or g["Name"],
|
|
},
|
|
"GroupName": name,
|
|
},
|
|
)
|
|
return data["Ret"] in self.OK_CODES
|
|
|
|
async def delGroup(self, name):
|
|
data = await self.send(
|
|
self.QCODES["DelGroup"],
|
|
{"Name": name, "SessionID": "0x%08X" % self.session,},
|
|
)
|
|
return data["Ret"] in self.OK_CODES
|
|
|
|
async def getUsers(self):
|
|
data = await self.send(self.QCODES["Users"])
|
|
if data["Ret"] in self.OK_CODES:
|
|
return data["Users"]
|
|
else:
|
|
return []
|
|
|
|
async def addUser(
|
|
self, name, password, comment="", group="user", auth=None, sharable=True
|
|
):
|
|
g = [x for x in await self.getGroups() if x["Name"] == group]
|
|
if g == []:
|
|
print(f'Group "{group}" not found!')
|
|
return False
|
|
g = g[0]
|
|
data = await self.set_command(
|
|
"AddUser",
|
|
{
|
|
"User": {
|
|
"AuthorityList": auth or g["AuthorityList"],
|
|
"Group": g["Name"],
|
|
"Memo": comment,
|
|
"Name": name,
|
|
"Password": self.sofia_hash(password),
|
|
"Reserved": False,
|
|
"Sharable": sharable,
|
|
},
|
|
},
|
|
)
|
|
return data["Ret"] in self.OK_CODES
|
|
|
|
async def modifyUser(
|
|
self, name, newname=None, comment=None, group=None, auth=None, sharable=None
|
|
):
|
|
u = [x for x in self.getUsers() if x["Name"] == name]
|
|
if u == []:
|
|
print(f'User "{name}" not found!')
|
|
return False
|
|
u = u[0]
|
|
if group:
|
|
g = [x for x in await self.getGroups() if x["Name"] == group]
|
|
if g == []:
|
|
print(f'Group "{group}" not found!')
|
|
return False
|
|
u["AuthorityList"] = g[0]["AuthorityList"]
|
|
data = await self.send(
|
|
self.QCODES["ModifyUser"],
|
|
{
|
|
"User": {
|
|
"AuthorityList": auth or u["AuthorityList"],
|
|
"Group": group or u["Group"],
|
|
"Memo": comment or u["Memo"],
|
|
"Name": newname or u["Name"],
|
|
"Password": "",
|
|
"Reserved": u["Reserved"],
|
|
"Sharable": sharable or u["Sharable"],
|
|
},
|
|
"UserName": name,
|
|
},
|
|
)
|
|
return data["Ret"] in self.OK_CODES
|
|
|
|
async def delUser(self, name):
|
|
data = await self.send(
|
|
self.QCODES["DelUser"],
|
|
{"Name": name, "SessionID": "0x%08X" % self.session,},
|
|
)
|
|
return data["Ret"] in self.OK_CODES
|
|
|
|
async def changePasswd(self, newpass="", oldpass=None, user=None):
|
|
data = await self.send(
|
|
self.QCODES["ModifyPassword"],
|
|
{
|
|
"EncryptType": "MD5",
|
|
"NewPassWord": self.sofia_hash(newpass),
|
|
"PassWord": oldpass or self.password,
|
|
"SessionID": "0x%08X" % self.session,
|
|
"UserName": user or self.user,
|
|
},
|
|
)
|
|
return data["Ret"] in self.OK_CODES
|
|
|
|
async def channel_title(self, titles):
|
|
if isinstance(titles, str):
|
|
titles = [titles]
|
|
await self.send(
|
|
self.QCODES["ChannelTitle"],
|
|
{
|
|
"ChannelTitle": titles,
|
|
"Name": "ChannelTitle",
|
|
"SessionID": "0x%08X" % self.session,
|
|
},
|
|
)
|
|
|
|
async def channel_bitmap(self, width, height, bitmap):
|
|
header = struct.pack("HH12x", width, height)
|
|
self.socket_send(
|
|
struct.pack(
|
|
"BB2xII2xHI",
|
|
255,
|
|
0,
|
|
self.session,
|
|
self.packet_count,
|
|
0x041A,
|
|
len(bitmap) + 16,
|
|
)
|
|
+ header
|
|
+ bitmap
|
|
)
|
|
reply, rcvd = await self.recv_json()
|
|
if reply and reply["Ret"] != 100:
|
|
return False
|
|
return True
|
|
|
|
async def reboot(self):
|
|
await self.set_command("OPMachine", {"Action": "Reboot"})
|
|
self.close()
|
|
|
|
def setAlarm(self, func):
|
|
self.alarm_func = func
|
|
|
|
def clearAlarm(self):
|
|
self.alarm_func = None
|
|
|
|
async def alarmStart(self, loop):
|
|
loop.create_task(self.alarm_worker())
|
|
|
|
return await self.get_command("", self.QCODES["AlarmSet"])
|
|
|
|
async def alarm_worker(self):
|
|
while self.socket_writer:
|
|
await self.busy.acquire()
|
|
try:
|
|
(
|
|
head,
|
|
version,
|
|
session,
|
|
sequence_number,
|
|
msgid,
|
|
len_data,
|
|
) = struct.unpack("BB2xII2xHI", await self.socket_recv(20))
|
|
await asyncio.sleep(0.1) # Just for receive whole packet
|
|
reply = await self.socket_recv(len_data)
|
|
self.packet_count += 1
|
|
reply = json.loads(reply[:-2])
|
|
if msgid == self.QCODES["AlarmInfo"] and self.session == session:
|
|
if self.alarm_func is not None:
|
|
self.alarm_func(reply[reply["Name"]], sequence_number)
|
|
except:
|
|
pass
|
|
finally:
|
|
self.busy.release()
|
|
|
|
async def set_remote_alarm(self, state):
|
|
await self.set_command(
|
|
"OPNetAlarm", {"Event": 0, "State": state},
|
|
)
|
|
|
|
async def keep_alive_workner(self):
|
|
while self.socket_writer:
|
|
|
|
ret = await self.send(
|
|
self.QCODES["KeepAlive"],
|
|
{"Name": "KeepAlive", "SessionID": "0x%08X" % self.session},
|
|
)
|
|
if ret is None:
|
|
self.close()
|
|
break
|
|
|
|
await asyncio.sleep(self.alive_time)
|
|
|
|
def keep_alive(self, loop):
|
|
loop.create_task(self.keep_alive_workner())
|
|
|
|
async def keyDown(self, key):
|
|
await self.set_command(
|
|
"OPNetKeyboard", {"Status": "KeyDown", "Value": key},
|
|
)
|
|
|
|
async def keyUp(self, key):
|
|
await self.set_command(
|
|
"OPNetKeyboard", {"Status": "KeyUp", "Value": key},
|
|
)
|
|
|
|
async def keyPress(self, key):
|
|
await self.keyDown(key)
|
|
await asyncio.sleep(0.3)
|
|
await self.keyUp(key)
|
|
|
|
async def keyScript(self, keys):
|
|
for k in keys:
|
|
if k != " " and k.upper() in self.KEY_CODES:
|
|
await self.keyPress(self.KEY_CODES[k.upper()])
|
|
else:
|
|
await asyncio.sleep(1)
|
|
|
|
async def ptz(self, cmd, step=5, preset=-1, ch=0):
|
|
CMDS = [
|
|
"DirectionUp",
|
|
"DirectionDown",
|
|
"DirectionLeft",
|
|
"DirectionRight",
|
|
"DirectionLeftUp",
|
|
"DirectionLeftDown",
|
|
"DirectionRightUp",
|
|
"DirectionRightDown",
|
|
"ZoomTile",
|
|
"ZoomWide",
|
|
"FocusNear",
|
|
"FocusFar",
|
|
"IrisSmall",
|
|
"IrisLarge",
|
|
"SetPreset",
|
|
"GotoPreset",
|
|
"ClearPreset",
|
|
"StartTour",
|
|
"StopTour",
|
|
]
|
|
# ptz_param = { "AUX" : { "Number" : 0, "Status" : "On" }, "Channel" : ch, "MenuOpts" : "Enter", "POINT" : { "bottom" : 0, "left" : 0, "right" : 0, "top" : 0 }, "Pattern" : "SetBegin", "Preset" : -1, "Step" : 5, "Tour" : 0 }
|
|
ptz_param = {
|
|
"AUX": {"Number": 0, "Status": "On"},
|
|
"Channel": ch,
|
|
"MenuOpts": "Enter",
|
|
"Pattern": "Start",
|
|
"Preset": preset,
|
|
"Step": step,
|
|
"Tour": 1 if "Tour" in cmd else 0,
|
|
}
|
|
return await self.set_command(
|
|
"OPPTZControl", {"Command": cmd, "Parameter": ptz_param},
|
|
)
|
|
|
|
async def set_info(self, command, data):
|
|
return await self.set_command(command, data, 1040)
|
|
|
|
async def set_command(self, command, data, code=None):
|
|
if not code:
|
|
code = self.QCODES[command]
|
|
return await self.send(
|
|
code, {"Name": command, "SessionID": "0x%08X" % self.session, command: data}
|
|
)
|
|
|
|
async def get_info(self, command):
|
|
return await self.get_command(command, 1042)
|
|
|
|
async def get_command(self, command, code=None):
|
|
if not code:
|
|
code = self.QCODES[command]
|
|
|
|
data = await self.send(code, {"Name": command, "SessionID": "0x%08X" % self.session})
|
|
if data["Ret"] in self.OK_CODES and command in data:
|
|
return data[command]
|
|
else:
|
|
return data
|
|
|
|
async def get_time(self):
|
|
return datetime.strptime(await self.get_command("OPTimeQuery"), self.DATE_FORMAT)
|
|
|
|
async def set_time(self, time=None):
|
|
if time is None:
|
|
time = datetime.now()
|
|
return await self.set_command("OPTimeSetting", time.strftime(self.DATE_FORMAT))
|
|
|
|
async def get_netcommon(self):
|
|
return await self.get_command("NetWork.NetCommon")
|
|
|
|
async def get_system_info(self):
|
|
return await self.get_command("SystemInfo")
|
|
|
|
async def get_general_info(self):
|
|
return await self.get_command("General")
|
|
|
|
async def get_encode_capabilities(self):
|
|
return await self.get_command("EncodeCapability")
|
|
|
|
async def get_system_capabilities(self):
|
|
return await self.get_command("SystemFunction")
|
|
|
|
async def get_camera_info(self, default_config=False):
|
|
"""Request data for 'Camera' from the target DVRIP device."""
|
|
if default_config:
|
|
code = 1044
|
|
else:
|
|
code = 1042
|
|
return await self.get_command("Camera", code)
|
|
|
|
async def get_encode_info(self, default_config=False):
|
|
"""Request data for 'Simplify.Encode' from the target DVRIP device.
|
|
|
|
Arguments:
|
|
default_config -- returns the default values for the type if True
|
|
"""
|
|
if default_config:
|
|
code = 1044
|
|
else:
|
|
code = 1042
|
|
return await self.get_command("Simplify.Encode", code)
|
|
|
|
async def recv_json(self, buf=bytearray()):
|
|
p = compile(b".*({.*})")
|
|
|
|
packet = await self.socket_recv(0xFFFF)
|
|
if not packet:
|
|
return None, buf
|
|
buf.extend(packet)
|
|
m = p.search(buf)
|
|
if m is None:
|
|
return None, buf
|
|
buf = buf[m.span(1)[1] :]
|
|
return json.loads(m.group(1)), buf
|
|
|
|
async def get_upgrade_info(self):
|
|
return await self.get_command("OPSystemUpgrade")
|
|
|
|
async def upgrade(self, filename="", packetsize=0x8000, vprint=None):
|
|
if not vprint:
|
|
vprint = lambda x: print(x)
|
|
|
|
data = await self.set_command(
|
|
"OPSystemUpgrade", {"Action": "Start", "Type": "System"}, 0x5F0
|
|
)
|
|
if data["Ret"] not in self.OK_CODES:
|
|
return data
|
|
|
|
vprint("Ready to upgrade")
|
|
blocknum = 0
|
|
sentbytes = 0
|
|
fsize = os.stat(filename).st_size
|
|
rcvd = bytearray()
|
|
with open(filename, "rb") as f:
|
|
while True:
|
|
bytes = f.read(packetsize)
|
|
if not bytes:
|
|
break
|
|
header = struct.pack(
|
|
"BB2xII2xHI", 255, 0, self.session, blocknum, 0x5F2, len(bytes)
|
|
)
|
|
self.socket_send(header + bytes)
|
|
blocknum += 1
|
|
sentbytes += len(bytes)
|
|
|
|
reply, rcvd = await self.recv_json(rcvd)
|
|
if reply and reply["Ret"] != 100:
|
|
vprint("Upgrade failed")
|
|
return reply
|
|
|
|
progress = sentbytes / fsize * 100
|
|
vprint(f"Uploaded {progress:.2f}%")
|
|
vprint("End of file")
|
|
|
|
pkt = struct.pack("BB2xIIxBHI", 255, 0, self.session, blocknum, 1, 0x05F2, 0)
|
|
self.socket_send(pkt)
|
|
vprint("Waiting for upgrade...")
|
|
while True:
|
|
reply, rcvd = await self.recv_json(rcvd)
|
|
print(reply)
|
|
if not reply:
|
|
return
|
|
if reply["Name"] == "" and reply["Ret"] == 100:
|
|
break
|
|
|
|
while True:
|
|
data, rcvd = await self.recv_json(rcvd)
|
|
print(reply)
|
|
if data is None:
|
|
vprint("Done")
|
|
return
|
|
if data["Ret"] in [512, 514, 513]:
|
|
vprint("Upgrade failed")
|
|
return data
|
|
if data["Ret"] == 515:
|
|
vprint("Upgrade successful")
|
|
self.close()
|
|
return data
|
|
vprint(f"Upgraded {data['Ret']}%")
|
|
|
|
async def reassemble_bin_payload(self, metadata={}):
|
|
def internal_to_type(data_type, value):
|
|
if data_type == 0x1FC or data_type == 0x1FD:
|
|
if value == 1:
|
|
return "mpeg4"
|
|
elif value == 2:
|
|
return "h264"
|
|
elif value == 3:
|
|
return "h265"
|
|
elif data_type == 0x1F9:
|
|
if value == 1 or value == 6:
|
|
return "info"
|
|
elif data_type == 0x1FA:
|
|
if value == 0xE:
|
|
return "g711a"
|
|
elif data_type == 0x1FE and value == 0:
|
|
return "jpeg"
|
|
return None
|
|
|
|
def internal_to_datetime(value):
|
|
second = value & 0x3F
|
|
minute = (value & 0xFC0) >> 6
|
|
hour = (value & 0x1F000) >> 12
|
|
day = (value & 0x3E0000) >> 17
|
|
month = (value & 0x3C00000) >> 22
|
|
year = ((value & 0xFC000000) >> 26) + 2000
|
|
return datetime(year, month, day, hour, minute, second)
|
|
|
|
length = 0
|
|
buf = bytearray()
|
|
start_time = time.time()
|
|
|
|
while True:
|
|
data = await self.receive_with_timeout(20)
|
|
(
|
|
head,
|
|
version,
|
|
session,
|
|
sequence_number,
|
|
total,
|
|
cur,
|
|
msgid,
|
|
len_data,
|
|
) = struct.unpack("BB2xIIBBHI", data)
|
|
packet = await self.receive_with_timeout(len_data)
|
|
frame_len = 0
|
|
if length == 0:
|
|
media = None
|
|
frame_len = 8
|
|
(data_type,) = struct.unpack(">I", packet[:4])
|
|
if data_type == 0x1FC or data_type == 0x1FE:
|
|
frame_len = 16
|
|
(media, metadata["fps"], w, h, dt, length,) = struct.unpack(
|
|
"BBBBII", packet[4:frame_len]
|
|
)
|
|
metadata["width"] = w * 8
|
|
metadata["height"] = h * 8
|
|
metadata["datetime"] = internal_to_datetime(dt)
|
|
if data_type == 0x1FC:
|
|
metadata["frame"] = "I"
|
|
elif data_type == 0x1FD:
|
|
(length,) = struct.unpack("I", packet[4:frame_len])
|
|
metadata["frame"] = "P"
|
|
elif data_type == 0x1FA:
|
|
(media, samp_rate, length) = struct.unpack(
|
|
"BBH", packet[4:frame_len]
|
|
)
|
|
elif data_type == 0x1F9:
|
|
(media, n, length) = struct.unpack("BBH", packet[4:frame_len])
|
|
# special case of JPEG shapshots
|
|
elif data_type == 0xFFD8FFE0:
|
|
return packet
|
|
else:
|
|
raise ValueError(data_type)
|
|
if media is not None:
|
|
metadata["type"] = internal_to_type(data_type, media)
|
|
buf.extend(packet[frame_len:])
|
|
length -= len(packet) - frame_len
|
|
if length == 0:
|
|
return buf
|
|
elapsed_time = time.time() - start_time
|
|
if elapsed_time > self.timeout:
|
|
return None
|
|
|
|
async def snapshot(self, channel=0):
|
|
command = "OPSNAP"
|
|
await self.send(
|
|
self.QCODES[command],
|
|
{
|
|
"Name": command,
|
|
"SessionID": "0x%08X" % self.session,
|
|
command: {"Channel": channel},
|
|
},
|
|
wait_response=False,
|
|
)
|
|
packet = await self.reassemble_bin_payload()
|
|
return packet
|
|
|
|
async def start_monitor(self, frame_callback, user={}, stream="Main"):
|
|
params = {
|
|
"Channel": 0,
|
|
"CombinMode": "NONE",
|
|
"StreamType": stream,
|
|
"TransMode": "TCP",
|
|
}
|
|
data = await self.set_command("OPMonitor", {"Action": "Claim", "Parameter": params})
|
|
if data["Ret"] not in self.OK_CODES:
|
|
return data
|
|
|
|
await self.send(
|
|
1410,
|
|
{
|
|
"Name": "OPMonitor",
|
|
"SessionID": "0x%08X" % self.session,
|
|
"OPMonitor": {"Action": "Start", "Parameter": params},
|
|
},
|
|
wait_response=False,
|
|
)
|
|
self.monitoring = True
|
|
while self.monitoring:
|
|
meta = {}
|
|
frame = await self.reassemble_bin_payload(meta)
|
|
frame_callback(frame, meta, user)
|
|
|
|
def stop_monitor(self):
|
|
self.monitoring = False
|
|
|