Browse Source

Merge pull request #6 from OpenIPC/telnet-opener-skipcheck-installdesc

telnet_opener: enable telnet on post-2020 XM firmware via SkipCheck InstallDesc
pull/7/head
Dmitry Ilyin 3 weeks ago
committed by GitHub
parent
commit
e83683a785
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 126
      telnet_opener.py

126
telnet_opener.py

@ -14,6 +14,27 @@ import zipfile
TELNET_PORT = 4321 TELNET_PORT = 4321
ARCHIVE_URL = "https://github.com/widgetii/xmupdates/raw/main/archive" ARCHIVE_URL = "https://github.com/widgetii/xmupdates/raw/main/archive"
# Ports to probe for a listening telnet after the camera reboots.
# Port number is firmware-determined when telnet is enabled via env var
# (`telnetctrl 1`); 50119 was observed on GK7205V300 + 000699H7.
DEFAULT_TELNET_PORTS = (23, 4321, 50119)
# InstallDesc that bypasses Hardware/Vendor/Flash validation ("SkipCheck"
# magic) and flips `telnetctrl=1` via both env tool variants. Works on
# post-2020-05-07 XM firmware where a hardware-specific InstallDesc has
# its Shell commands silently dropped. The env var only takes effect on
# the next boot, so the caller must reboot the camera after upload.
ENABLETELNET_DESC = {
"UpgradeCommand": [
{"Command": "Shell", "Script": "armbenv -s telnetctrl 1"},
{"Command": "Shell", "Script": "XmEnv -s telnetctrl 1"},
],
"SupportFlashType": [{"FlashID": "SkipCheck"}],
"Hardware": "SkipCheck",
"CompatibleVersion": 2,
"Vendor": "SkipCheck",
}
""" """
Tested on XM boards: Tested on XM boards:
IPG-53H20PL-S 53H20L_S39 00002532 IPG-53H20PL-S 53H20L_S39 00002532
@ -125,53 +146,63 @@ def _read_until(sock, token, timeout):
return bytes(buf) return bytes(buf)
def enable_telnet_via_dvrip(host_ip, user="admin", password=""): def enable_telnet_via_dvrip(host_ip, user="admin", password="",
ports=DEFAULT_TELNET_PORTS, wait_timeout=180):
"""Upload SkipCheck InstallDesc that sets telnetctrl=1, then reboot
and probe `ports` until telnet opens. Returns the open port, or None.
"""
cam = DVRIPCam(host_ip, user=user, password=password) cam = DVRIPCam(host_ip, user=user, password=password)
if not cam.login(): if not cam.login():
print(f"DVRIP login failed for {host_ip}") print(f"DVRIP login failed for {host_ip}")
return False return None
upinfo = cam.get_upgrade_info()
sysinfo = cam.get_system_info() sysinfo = cam.get_system_info()
if not isinstance(upinfo, dict) or "Hardware" not in upinfo: upinfo = cam.get_upgrade_info()
print(f"Camera {host_ip} did not return upgrade info: {upinfo}") if isinstance(upinfo, dict) and "Hardware" in upinfo:
cam.close() print(f"Camera {upinfo['Hardware']}, firmware "
return False f"{sysinfo.get('SoftWareVersion', '?')}")
swver = extract_gen(sysinfo["SoftWareVersion"])
desc = {
"Hardware": upinfo["Hardware"],
"DevID": f"{swver}1001000000000000",
"CompatibleVersion": 2,
"Vendor": "General",
"CRC": "1ce6242100007636",
"UpgradeCommand": [cmd_armebenv(swver)],
}
add_flashes(desc, swver)
zipfname = "upgrade.bin" zipfname = "enabletelnet.bin"
make_zip(zipfname, json.dumps(desc, indent=2)) make_zip(zipfname, json.dumps(ENABLETELNET_DESC, indent=2))
print("Uploading SkipCheck enabletelnet InstallDesc...")
cam.upgrade(zipfname) cam.upgrade(zipfname)
cam.close() cam.close()
os.remove(zipfname) os.remove(zipfname)
print(f"Camera {host_ip} rebooting, waiting for telnet...") # `armbenv -s telnetctrl 1` only writes the env var; the telnet
for _ in range(45): # daemon is launched by an init script on next boot.
print("Rebooting camera to apply telnetctrl=1...")
cam = DVRIPCam(host_ip, user=user, password=password)
if cam.login():
cam.reboot()
cam.close()
else:
print("Could not log back in to reboot; camera may reboot itself.")
print(f"Waiting for telnet, probing {list(ports)}...")
deadline = time.monotonic() + wait_timeout
while time.monotonic() < deadline:
for p in ports:
if check_port(host_ip, p):
print(f"Telnet open on {host_ip}:{p} (login: root / xmhdipc)")
return p
time.sleep(4) time.sleep(4)
if check_port(host_ip, 23): print(f"Timed out after {wait_timeout}s; probe ports manually.")
return True return None
return False
def do_backup_via_telnet(host_ip, nfs_share, mount_point="/utils", def do_backup_via_telnet(host_ip, nfs_share, mount_point="/utils",
user="admin", password=""): user="admin", password="",
if not check_port(host_ip, 23): telnet_ports=DEFAULT_TELNET_PORTS):
print(f"Telnet (port 23) closed on {host_ip}, enabling...") telnet_port = next((p for p in telnet_ports if check_port(host_ip, p)), None)
if not enable_telnet_via_dvrip(host_ip, user, password): if telnet_port is None:
print(f"Telnet closed on {host_ip}, enabling via SkipCheck InstallDesc...")
telnet_port = enable_telnet_via_dvrip(host_ip, user, password, telnet_ports)
if telnet_port is None:
print(f"Could not enable telnet on {host_ip}") print(f"Could not enable telnet on {host_ip}")
return False return False
print(f"Connecting to {host_ip}:23 as root/xmhdipc") print(f"Connecting to {host_ip}:{telnet_port} as root/xmhdipc")
s = socket.create_connection((host_ip, 23), timeout=10) s = socket.create_connection((host_ip, telnet_port), timeout=10)
_read_until(s, b"login:", 5) _read_until(s, b"login:", 5)
s.sendall(b"root\n") s.sendall(b"root\n")
_read_until(s, b"assword:", 5) _read_until(s, b"assword:", 5)
@ -243,6 +274,7 @@ def open_telnet(host_ip, port, **kwargs):
nfs_share = kwargs.get("nfs") nfs_share = kwargs.get("nfs")
user = kwargs.get("username", "admin") user = kwargs.get("username", "admin")
password = kwargs.get("password", "") password = kwargs.get("password", "")
ports = kwargs.get("ports") or DEFAULT_TELNET_PORTS
if make_backup: if make_backup:
if not nfs_share: if not nfs_share:
@ -250,7 +282,12 @@ def open_telnet(host_ip, port, **kwargs):
"(NFS share with ipctool, where the backup will be written)") "(NFS share with ipctool, where the backup will be written)")
return False return False
return do_backup_via_telnet(host_ip, nfs_share, return do_backup_via_telnet(host_ip, nfs_share,
user=user, password=password) user=user, password=password,
telnet_ports=ports)
if not make_telnet:
return enable_telnet_via_dvrip(host_ip, user=user, password=password,
ports=ports) is not None
cam = DVRIPCam(host_ip, user=user, password=password) cam = DVRIPCam(host_ip, user=user, password=password)
if not cam.login(): if not cam.login():
@ -273,13 +310,8 @@ def open_telnet(host_ip, port, **kwargs):
"CompatibleVersion": 2, "CompatibleVersion": 2,
"Vendor": "General", "Vendor": "General",
"CRC": "1ce6242100007636", "CRC": "1ce6242100007636",
"UpgradeCommand": [cmd_telnetd(port)],
} }
upcmd = []
if make_telnet:
upcmd.append(cmd_telnetd(port))
else:
upcmd.append(cmd_armebenv(swver))
desc["UpgradeCommand"] = upcmd
add_flashes(desc, swver) add_flashes(desc, swver)
zipfname = "upgrade.bin" zipfname = "upgrade.bin"
@ -288,15 +320,10 @@ def open_telnet(host_ip, port, **kwargs):
cam.close() cam.close()
os.remove(zipfname) os.remove(zipfname)
if not make_telnet:
port = 23
print("Waiting for camera is rebooting...")
for i in range(10): for i in range(10):
time.sleep(4) time.sleep(4)
if check_port(host_ip, port): if check_port(host_ip, port):
tport = f" {port}" if port != 23 else "" print(f"Now use 'telnet {host_ip} {port}' to login")
print(f"Now use 'telnet {host_ip}{tport}' to login")
return return
print("Something went wrong") print("Something went wrong")
@ -331,8 +358,17 @@ def main():
help="NFS share for --backup, e.g. 10.0.0.1:/srv/ipctool. " help="NFS share for --backup, e.g. 10.0.0.1:/srv/ipctool. "
"Must contain the ipctool binary; backup-<MAC> is written here.", "Must contain the ipctool binary; backup-<MAC> is written here.",
) )
parser.add_argument(
"--ports",
default=",".join(str(p) for p in DEFAULT_TELNET_PORTS),
help="Comma-separated telnet ports to probe after reboot "
"(post-reboot port is firmware-determined). "
f"Default {','.join(str(p) for p in DEFAULT_TELNET_PORTS)}.",
)
args = parser.parse_args() args = parser.parse_args()
return open_telnet(args.hostname, TELNET_PORT, **vars(args)) kwargs = vars(args)
kwargs["ports"] = tuple(int(p) for p in args.ports.split(","))
return open_telnet(args.hostname, TELNET_PORT, **kwargs)
if __name__ == "__main__": if __name__ == "__main__":

Loading…
Cancel
Save