mirror of https://github.com/OpenIPC/python-dvr
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
376 lines
12 KiB
376 lines
12 KiB
#!/usr/bin/env python3
|
|
|
|
from dvrip import DVRIPCam
|
|
import argparse
|
|
import datetime
|
|
import json
|
|
import os
|
|
import re
|
|
import socket
|
|
import time
|
|
import requests
|
|
import zipfile
|
|
|
|
TELNET_PORT = 4321
|
|
ARCHIVE_URL = "https://github.com/widgetii/xmupdates/raw/main/archive"
|
|
|
|
# Ports to probe for a listening telnet after the camera reboots.
|
|
# Port number is firmware-determined when telnet is enabled via env var
|
|
# (`telnetctrl 1`); 50119 was observed on GK7205V300 + 000699H7.
|
|
DEFAULT_TELNET_PORTS = (23, 4321, 50119)
|
|
|
|
# InstallDesc that bypasses Hardware/Vendor/Flash validation ("SkipCheck"
|
|
# magic) and flips `telnetctrl=1` via both env tool variants. Works on
|
|
# post-2020-05-07 XM firmware where a hardware-specific InstallDesc has
|
|
# its Shell commands silently dropped. The env var only takes effect on
|
|
# the next boot, so the caller must reboot the camera after upload.
|
|
ENABLETELNET_DESC = {
|
|
"UpgradeCommand": [
|
|
{"Command": "Shell", "Script": "armbenv -s telnetctrl 1"},
|
|
{"Command": "Shell", "Script": "XmEnv -s telnetctrl 1"},
|
|
],
|
|
"SupportFlashType": [{"FlashID": "SkipCheck"}],
|
|
"Hardware": "SkipCheck",
|
|
"CompatibleVersion": 2,
|
|
"Vendor": "SkipCheck",
|
|
}
|
|
|
|
"""
|
|
Tested on XM boards:
|
|
IPG-53H20PL-S 53H20L_S39 00002532
|
|
IPG-80H20PS-S 50H20L 00022520
|
|
IVG-85HF20PYA-S HI3516EV200_50H20AI_S38 000559A7
|
|
IVG-85HG50PYA-S HI3516EV300_85H50AI 000529B2
|
|
|
|
Issues with: "armbenv: can't load library 'libdvr.so'"
|
|
IPG-50HV20PES-S 50H20L_18EV200_S38 00018520
|
|
"""
|
|
|
|
# downgrade archive (mainly Yandex.Disk)
|
|
# https://www.cctvsp.ru/articles/obnovlenie-proshivok-dlya-ip-kamer-ot-xiong-mai
|
|
|
|
XMV4 = {
|
|
"envtool": "XmEnv",
|
|
"flashes": [
|
|
"0x00EF4017",
|
|
"0x00EF4018",
|
|
"0x00C22017",
|
|
"0x00C22018",
|
|
"0x00C22019",
|
|
"0x00C84017",
|
|
"0x00C84018",
|
|
"0x001C7017",
|
|
"0x001C7018",
|
|
"0x00207017",
|
|
"0x00207018",
|
|
"0x000B4017",
|
|
"0x000B4018",
|
|
],
|
|
}
|
|
|
|
|
|
def down(template, filename):
|
|
t = template.copy()
|
|
t['downgrade'] = filename
|
|
return t
|
|
|
|
|
|
# Borrowed from InstallDesc
|
|
conf = {
|
|
"000559A7": down(XMV4, "General_IPC_HI3516EV200_50H20AI_S38.Nat.dss.OnvifS.HIK_V5.00.R02.20200507_all.bin"),
|
|
"000529B2": down(XMV4, "General_IPC_HI3516EV300_85H50AI_Nat_dss_OnvifS_HIK_V5_00_R02_20200507.bin"),
|
|
"000529E9": down(XMV4, "hacked_from_HI3516EV300_85H50AI.bin"),
|
|
}
|
|
|
|
|
|
def add_flashes(desc, swver):
|
|
board = conf.get(swver, XMV4)
|
|
desc["SupportFlashType"] = [{"FlashID": fid} for fid in board["flashes"]]
|
|
|
|
|
|
def get_envtool(swver):
|
|
board = conf.get(swver)
|
|
if board is None:
|
|
return "armbenv"
|
|
|
|
return board["envtool"]
|
|
|
|
|
|
def make_zip(filename, data):
|
|
zipf = zipfile.ZipFile(filename, "w", zipfile.ZIP_DEFLATED)
|
|
zipf.writestr("InstallDesc", data)
|
|
zipf.close()
|
|
|
|
|
|
def check_port(host_ip, port):
|
|
a_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
result_of_check = a_socket.connect_ex((host_ip, port))
|
|
return result_of_check == 0
|
|
|
|
|
|
def extract_gen(swver):
|
|
return swver.split(".")[3]
|
|
|
|
|
|
def cmd_armebenv(swver):
|
|
envtool = get_envtool(swver)
|
|
return {
|
|
"Command": "Shell",
|
|
"Script": f"{envtool} -s xmuart 0; {envtool} -s telnetctrl 1",
|
|
}
|
|
|
|
|
|
def cmd_telnetd(port):
|
|
return {
|
|
"Command": "Shell",
|
|
"Script": f"busybox telnetd -F -p {port} -l /bin/sh",
|
|
}
|
|
|
|
|
|
def _read_until(sock, token, timeout):
|
|
deadline = time.monotonic() + timeout
|
|
buf = bytearray()
|
|
sock.settimeout(0.5)
|
|
while time.monotonic() < deadline:
|
|
try:
|
|
chunk = sock.recv(4096)
|
|
except socket.timeout:
|
|
if token in buf:
|
|
break
|
|
continue
|
|
if not chunk:
|
|
break
|
|
buf.extend(chunk)
|
|
if token in buf:
|
|
break
|
|
return bytes(buf)
|
|
|
|
|
|
def enable_telnet_via_dvrip(host_ip, user="admin", password="",
|
|
ports=DEFAULT_TELNET_PORTS, wait_timeout=180):
|
|
"""Upload SkipCheck InstallDesc that sets telnetctrl=1, then reboot
|
|
and probe `ports` until telnet opens. Returns the open port, or None.
|
|
"""
|
|
cam = DVRIPCam(host_ip, user=user, password=password)
|
|
if not cam.login():
|
|
print(f"DVRIP login failed for {host_ip}")
|
|
return None
|
|
sysinfo = cam.get_system_info()
|
|
upinfo = cam.get_upgrade_info()
|
|
if isinstance(upinfo, dict) and "Hardware" in upinfo:
|
|
print(f"Camera {upinfo['Hardware']}, firmware "
|
|
f"{sysinfo.get('SoftWareVersion', '?')}")
|
|
|
|
zipfname = "enabletelnet.bin"
|
|
make_zip(zipfname, json.dumps(ENABLETELNET_DESC, indent=2))
|
|
print("Uploading SkipCheck enabletelnet InstallDesc...")
|
|
cam.upgrade(zipfname)
|
|
cam.close()
|
|
os.remove(zipfname)
|
|
|
|
# `armbenv -s telnetctrl 1` only writes the env var; the telnet
|
|
# daemon is launched by an init script on next boot.
|
|
print("Rebooting camera to apply telnetctrl=1...")
|
|
cam = DVRIPCam(host_ip, user=user, password=password)
|
|
if cam.login():
|
|
cam.reboot()
|
|
cam.close()
|
|
else:
|
|
print("Could not log back in to reboot; camera may reboot itself.")
|
|
|
|
print(f"Waiting for telnet, probing {list(ports)}...")
|
|
deadline = time.monotonic() + wait_timeout
|
|
while time.monotonic() < deadline:
|
|
for p in ports:
|
|
if check_port(host_ip, p):
|
|
print(f"Telnet open on {host_ip}:{p} (login: root / xmhdipc)")
|
|
return p
|
|
time.sleep(4)
|
|
print(f"Timed out after {wait_timeout}s; probe ports manually.")
|
|
return None
|
|
|
|
|
|
def do_backup_via_telnet(host_ip, nfs_share, mount_point="/utils",
|
|
user="admin", password="",
|
|
telnet_ports=DEFAULT_TELNET_PORTS):
|
|
telnet_port = next((p for p in telnet_ports if check_port(host_ip, p)), None)
|
|
if telnet_port is None:
|
|
print(f"Telnet closed on {host_ip}, enabling via SkipCheck InstallDesc...")
|
|
telnet_port = enable_telnet_via_dvrip(host_ip, user, password, telnet_ports)
|
|
if telnet_port is None:
|
|
print(f"Could not enable telnet on {host_ip}")
|
|
return False
|
|
|
|
print(f"Connecting to {host_ip}:{telnet_port} as root/xmhdipc")
|
|
s = socket.create_connection((host_ip, telnet_port), timeout=10)
|
|
_read_until(s, b"login:", 5)
|
|
s.sendall(b"root\n")
|
|
_read_until(s, b"assword:", 5)
|
|
s.sendall(b"xmhdipc\n")
|
|
_read_until(s, b"# ", 5)
|
|
|
|
s.sendall(f"mkdir -p {mount_point}\n".encode())
|
|
_read_until(s, b"# ", 5)
|
|
s.sendall(f"mount -o nolock {nfs_share} {mount_point}\n".encode())
|
|
out = _read_until(s, b"# ", 10).decode(errors="replace")
|
|
print(out.strip())
|
|
|
|
s.sendall(b"cat /sys/class/net/eth0/address\n")
|
|
out = _read_until(s, b"# ", 5).decode(errors="replace")
|
|
m = re.search(r"([0-9a-f]{2}(?::[0-9a-f]{2}){5})", out.lower())
|
|
mac = m.group(1) if m else "unknown"
|
|
|
|
backup_path = f"{mount_point}/backup-{mac}"
|
|
print(f"Running ipctool backup -> {backup_path}")
|
|
s.sendall(f"{mount_point}/ipctool backup {backup_path}\n".encode())
|
|
out = _read_until(s, b"# ", 120).decode(errors="replace")
|
|
print(out.strip())
|
|
|
|
s.sendall(f"umount {mount_point}\n".encode())
|
|
_read_until(s, b"# ", 5)
|
|
s.sendall(b"exit\n")
|
|
s.close()
|
|
print(f"Done. Backup file is at {nfs_share.rstrip('/')}/backup-{mac} "
|
|
"on your NFS server.")
|
|
return True
|
|
|
|
|
|
def downgrade_old_version(cam, buildtime, swver):
|
|
milestone = datetime.date(2020, 5, 7)
|
|
dto = datetime.datetime.strptime(buildtime, "%Y-%m-%d %H:%M:%S")
|
|
if dto.date() > milestone:
|
|
print(
|
|
f"Current firmware date {dto.date()}, but it needs to be no more than"
|
|
f" {milestone}\nConsider downgrade and only then continue.\n\n"
|
|
)
|
|
a = input("Are you sure to overwrite current firmware without backup (y/n)? ")
|
|
if a == "y":
|
|
board = conf.get(swver)
|
|
if board is None:
|
|
print(f"{swver} firmware is not supported yet")
|
|
return False
|
|
|
|
print("DOWNGRADING\n")
|
|
url = f"{ARCHIVE_URL}/{swver}/{board['downgrade']}"
|
|
print(f"Downloading {url}")
|
|
r = requests.get(url, allow_redirects=True)
|
|
if r.status_code != requests.codes.ok:
|
|
print("Something went wrong")
|
|
return False
|
|
|
|
open('upgrade.bin', 'wb').write(r.content)
|
|
print(f"Upgrading...")
|
|
cam.upgrade('upgrade.bin')
|
|
print("Completed. Wait a minute and then rerun")
|
|
return False
|
|
|
|
return False
|
|
return True
|
|
|
|
|
|
def open_telnet(host_ip, port, **kwargs):
|
|
make_telnet = kwargs.get("telnet", False)
|
|
make_backup = kwargs.get("backup", False)
|
|
nfs_share = kwargs.get("nfs")
|
|
user = kwargs.get("username", "admin")
|
|
password = kwargs.get("password", "")
|
|
ports = kwargs.get("ports") or DEFAULT_TELNET_PORTS
|
|
|
|
if make_backup:
|
|
if not nfs_share:
|
|
print("--backup requires --nfs HOST:/exported/path "
|
|
"(NFS share with ipctool, where the backup will be written)")
|
|
return False
|
|
return do_backup_via_telnet(host_ip, nfs_share,
|
|
user=user, password=password,
|
|
telnet_ports=ports)
|
|
|
|
if not make_telnet:
|
|
return enable_telnet_via_dvrip(host_ip, user=user, password=password,
|
|
ports=ports) is not None
|
|
|
|
cam = DVRIPCam(host_ip, user=user, password=password)
|
|
if not cam.login():
|
|
print(f"Cannot connect {host_ip}")
|
|
return
|
|
upinfo = cam.get_upgrade_info()
|
|
hw = upinfo["Hardware"]
|
|
sysinfo = cam.get_system_info()
|
|
swver = extract_gen(sysinfo["SoftWareVersion"])
|
|
print(f"Modifying camera {hw}, firmware {swver}")
|
|
if not downgrade_old_version(cam, sysinfo["BuildTime"], swver):
|
|
cam.close()
|
|
return
|
|
|
|
print(f"Firmware generation {swver}")
|
|
|
|
desc = {
|
|
"Hardware": hw,
|
|
"DevID": f"{swver}1001000000000000",
|
|
"CompatibleVersion": 2,
|
|
"Vendor": "General",
|
|
"CRC": "1ce6242100007636",
|
|
"UpgradeCommand": [cmd_telnetd(port)],
|
|
}
|
|
add_flashes(desc, swver)
|
|
|
|
zipfname = "upgrade.bin"
|
|
make_zip(zipfname, json.dumps(desc, indent=2))
|
|
cam.upgrade(zipfname)
|
|
cam.close()
|
|
os.remove(zipfname)
|
|
|
|
for i in range(10):
|
|
time.sleep(4)
|
|
if check_port(host_ip, port):
|
|
print(f"Now use 'telnet {host_ip} {port}' to login")
|
|
return
|
|
|
|
print("Something went wrong")
|
|
return
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument("hostname", help="Camera IP address or hostname")
|
|
parser.add_argument(
|
|
"-u", "--username", default="admin", help="Username for camera login"
|
|
)
|
|
parser.add_argument(
|
|
"-p", "--password", default="", help="Password for camera login"
|
|
)
|
|
parser.add_argument(
|
|
"-b",
|
|
"--backup",
|
|
action="store_true",
|
|
help="Mount NFS, run ipctool backup via telnet. Enables telnet "
|
|
"first if it's not already on (camera will reboot once). "
|
|
"Requires --nfs.",
|
|
)
|
|
parser.add_argument(
|
|
"-t",
|
|
"--telnet",
|
|
action="store_true",
|
|
help="Open telnet port without rebooting camera",
|
|
)
|
|
parser.add_argument(
|
|
"--nfs",
|
|
help="NFS share for --backup, e.g. 10.0.0.1:/srv/ipctool. "
|
|
"Must contain the ipctool binary; backup-<MAC> is written here.",
|
|
)
|
|
parser.add_argument(
|
|
"--ports",
|
|
default=",".join(str(p) for p in DEFAULT_TELNET_PORTS),
|
|
help="Comma-separated telnet ports to probe after reboot "
|
|
"(post-reboot port is firmware-determined). "
|
|
f"Default {','.join(str(p) for p in DEFAULT_TELNET_PORTS)}.",
|
|
)
|
|
args = parser.parse_args()
|
|
kwargs = vars(args)
|
|
kwargs["ports"] = tuple(int(p) for p in args.ports.split(","))
|
|
return open_telnet(args.hostname, TELNET_PORT, **kwargs)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
import sys
|
|
sys.exit(0 if main() is not False else 1)
|
|
|