diff --git a/telnet_opener.py b/telnet_opener.py index b95b4c6..7e9d6d6 100755 --- a/telnet_opener.py +++ b/telnet_opener.py @@ -14,6 +14,27 @@ import zipfile TELNET_PORT = 4321 ARCHIVE_URL = "https://github.com/widgetii/xmupdates/raw/main/archive" +# Ports to probe for a listening telnet after the camera reboots. +# Port number is firmware-determined when telnet is enabled via env var +# (`telnetctrl 1`); 50119 was observed on GK7205V300 + 000699H7. +DEFAULT_TELNET_PORTS = (23, 4321, 50119) + +# InstallDesc that bypasses Hardware/Vendor/Flash validation ("SkipCheck" +# magic) and flips `telnetctrl=1` via both env tool variants. Works on +# post-2020-05-07 XM firmware where a hardware-specific InstallDesc has +# its Shell commands silently dropped. The env var only takes effect on +# the next boot, so the caller must reboot the camera after upload. +ENABLETELNET_DESC = { + "UpgradeCommand": [ + {"Command": "Shell", "Script": "armbenv -s telnetctrl 1"}, + {"Command": "Shell", "Script": "XmEnv -s telnetctrl 1"}, + ], + "SupportFlashType": [{"FlashID": "SkipCheck"}], + "Hardware": "SkipCheck", + "CompatibleVersion": 2, + "Vendor": "SkipCheck", +} + """ Tested on XM boards: IPG-53H20PL-S 53H20L_S39 00002532 @@ -125,53 +146,63 @@ def _read_until(sock, token, timeout): return bytes(buf) -def enable_telnet_via_dvrip(host_ip, user="admin", password=""): +def enable_telnet_via_dvrip(host_ip, user="admin", password="", + ports=DEFAULT_TELNET_PORTS, wait_timeout=180): + """Upload SkipCheck InstallDesc that sets telnetctrl=1, then reboot + and probe `ports` until telnet opens. Returns the open port, or None. + """ cam = DVRIPCam(host_ip, user=user, password=password) if not cam.login(): print(f"DVRIP login failed for {host_ip}") - return False - upinfo = cam.get_upgrade_info() + return None sysinfo = cam.get_system_info() - if not isinstance(upinfo, dict) or "Hardware" not in upinfo: - print(f"Camera {host_ip} did not return upgrade info: {upinfo}") - cam.close() - return False - swver = extract_gen(sysinfo["SoftWareVersion"]) - - desc = { - "Hardware": upinfo["Hardware"], - "DevID": f"{swver}1001000000000000", - "CompatibleVersion": 2, - "Vendor": "General", - "CRC": "1ce6242100007636", - "UpgradeCommand": [cmd_armebenv(swver)], - } - add_flashes(desc, swver) + upinfo = cam.get_upgrade_info() + if isinstance(upinfo, dict) and "Hardware" in upinfo: + print(f"Camera {upinfo['Hardware']}, firmware " + f"{sysinfo.get('SoftWareVersion', '?')}") - zipfname = "upgrade.bin" - make_zip(zipfname, json.dumps(desc, indent=2)) + zipfname = "enabletelnet.bin" + make_zip(zipfname, json.dumps(ENABLETELNET_DESC, indent=2)) + print("Uploading SkipCheck enabletelnet InstallDesc...") cam.upgrade(zipfname) cam.close() os.remove(zipfname) - print(f"Camera {host_ip} rebooting, waiting for telnet...") - for _ in range(45): + # `armbenv -s telnetctrl 1` only writes the env var; the telnet + # daemon is launched by an init script on next boot. + print("Rebooting camera to apply telnetctrl=1...") + cam = DVRIPCam(host_ip, user=user, password=password) + if cam.login(): + cam.reboot() + cam.close() + else: + print("Could not log back in to reboot; camera may reboot itself.") + + print(f"Waiting for telnet, probing {list(ports)}...") + deadline = time.monotonic() + wait_timeout + while time.monotonic() < deadline: + for p in ports: + if check_port(host_ip, p): + print(f"Telnet open on {host_ip}:{p} (login: root / xmhdipc)") + return p time.sleep(4) - if check_port(host_ip, 23): - return True - return False + print(f"Timed out after {wait_timeout}s; probe ports manually.") + return None def do_backup_via_telnet(host_ip, nfs_share, mount_point="/utils", - user="admin", password=""): - if not check_port(host_ip, 23): - print(f"Telnet (port 23) closed on {host_ip}, enabling...") - if not enable_telnet_via_dvrip(host_ip, user, password): + user="admin", password="", + telnet_ports=DEFAULT_TELNET_PORTS): + telnet_port = next((p for p in telnet_ports if check_port(host_ip, p)), None) + if telnet_port is None: + print(f"Telnet closed on {host_ip}, enabling via SkipCheck InstallDesc...") + telnet_port = enable_telnet_via_dvrip(host_ip, user, password, telnet_ports) + if telnet_port is None: print(f"Could not enable telnet on {host_ip}") return False - print(f"Connecting to {host_ip}:23 as root/xmhdipc") - s = socket.create_connection((host_ip, 23), timeout=10) + print(f"Connecting to {host_ip}:{telnet_port} as root/xmhdipc") + s = socket.create_connection((host_ip, telnet_port), timeout=10) _read_until(s, b"login:", 5) s.sendall(b"root\n") _read_until(s, b"assword:", 5) @@ -243,6 +274,7 @@ def open_telnet(host_ip, port, **kwargs): nfs_share = kwargs.get("nfs") user = kwargs.get("username", "admin") password = kwargs.get("password", "") + ports = kwargs.get("ports") or DEFAULT_TELNET_PORTS if make_backup: if not nfs_share: @@ -250,7 +282,12 @@ def open_telnet(host_ip, port, **kwargs): "(NFS share with ipctool, where the backup will be written)") return False return do_backup_via_telnet(host_ip, nfs_share, - user=user, password=password) + user=user, password=password, + telnet_ports=ports) + + if not make_telnet: + return enable_telnet_via_dvrip(host_ip, user=user, password=password, + ports=ports) is not None cam = DVRIPCam(host_ip, user=user, password=password) if not cam.login(): @@ -273,13 +310,8 @@ def open_telnet(host_ip, port, **kwargs): "CompatibleVersion": 2, "Vendor": "General", "CRC": "1ce6242100007636", + "UpgradeCommand": [cmd_telnetd(port)], } - upcmd = [] - if make_telnet: - upcmd.append(cmd_telnetd(port)) - else: - upcmd.append(cmd_armebenv(swver)) - desc["UpgradeCommand"] = upcmd add_flashes(desc, swver) zipfname = "upgrade.bin" @@ -288,15 +320,10 @@ def open_telnet(host_ip, port, **kwargs): cam.close() os.remove(zipfname) - if not make_telnet: - port = 23 - print("Waiting for camera is rebooting...") - for i in range(10): time.sleep(4) if check_port(host_ip, port): - tport = f" {port}" if port != 23 else "" - print(f"Now use 'telnet {host_ip}{tport}' to login") + print(f"Now use 'telnet {host_ip} {port}' to login") return print("Something went wrong") @@ -331,8 +358,17 @@ def main(): help="NFS share for --backup, e.g. 10.0.0.1:/srv/ipctool. " "Must contain the ipctool binary; backup- is written here.", ) + parser.add_argument( + "--ports", + default=",".join(str(p) for p in DEFAULT_TELNET_PORTS), + help="Comma-separated telnet ports to probe after reboot " + "(post-reboot port is firmware-determined). " + f"Default {','.join(str(p) for p in DEFAULT_TELNET_PORTS)}.", + ) args = parser.parse_args() - return open_telnet(args.hostname, TELNET_PORT, **vars(args)) + kwargs = vars(args) + kwargs["ports"] = tuple(int(p) for p in args.ports.split(",")) + return open_telnet(args.hostname, TELNET_PORT, **kwargs) if __name__ == "__main__":