From 8c6c2eb5461966771cb2acc27eaabf609532eae0 Mon Sep 17 00:00:00 2001 From: Rossen Georgiev Date: Sun, 28 Apr 2019 22:00:05 +0100 Subject: [PATCH] improvements + CDNDepotManifest + CDNDepotFile * fixed VZ decompressoin and add some checks * add get_manifest() which returns a list of CDNDepotManifest for app_id and branch * add iter_files() which will get all manifests, and iter files for app_id and branch * add get_manifest_for_workshop_item() can be used to download steampiped hosted workshop items * add basic caching inside CDNClient * CDNDepotManifest, which enhances DepotManifest and inits CDNDepotFiles * CDNDepotFile expands DepotFile to a file like object that can be used to directly seek and read files from steampipe --- steam/client/builtins/apps.py | 4 +- steam/client/cdn.py | 415 ++++++++++++++++++++++++++++------ steam/core/manifest.py | 52 ++++- 3 files changed, 396 insertions(+), 75 deletions(-) diff --git a/steam/client/builtins/apps.py b/steam/client/builtins/apps.py index 40ad9eb..bcd39b5 100644 --- a/steam/client/builtins/apps.py +++ b/steam/client/builtins/apps.py @@ -92,10 +92,8 @@ class Apps(object): data = dict(apps={}, packages={}) while True: - chunk = self.wait_event(job_id, timeout=timeout) + chunk = self.wait_event(job_id, timeout=timeout, raises=True) - if chunk is None: - return chunk = chunk[0].body for app in chunk.apps: diff --git a/steam/client/cdn.py b/steam/client/cdn.py index 22aa1ce..16e3a44 100644 --- a/steam/client/cdn.py +++ b/steam/client/cdn.py @@ -2,14 +2,19 @@ from zipfile import ZipFile from io import BytesIO from collections import OrderedDict, deque -from six import itervalues -import vdf +from six import itervalues, iteritems +from binascii import crc32 +from datetime import datetime +import logging +import struct +import vdf from steam import webapi from steam.enums import EResult, EServerType from steam.util.web import make_requests_session from steam.core.crypto import symmetric_decrypt -from steam.core.manifest import DepotManifest +from steam.core.manifest import DepotManifest, DepotFile +from steam.protobufs.content_manifest_pb2 import ContentManifestPayload try: import lzma @@ -17,7 +22,7 @@ except ImportError: from backports import lzma -def get_content_servers_from_cs(host, port, cell_id, num_servers=20, session=None): +def get_content_servers_from_cs(cell_id, host='cs.steamcontent.com', port=80, num_servers=20, session=None): proto = 'https' if port == 443 else 'http' url = '%s://%s:%s/serverlist/%s/%s/' % (proto, host, port, cell_id, num_servers) @@ -70,33 +75,70 @@ def get_content_servers_from_webapi(cell_id, num_servers=20): return servers +class ContentServer(object): + https = False + host = None + vhost = None + port = None + type = None + cell_id = 0 + load = None + weighted_load = None + + def __repr__(self): + return "<%s('%s://%s:%s', type=%s, cell_id=%s)>" % ( + self.__class__.__name__, + 'https' if self.https else 'http', + self.host, + self.port, + repr(self.type), + repr(self.cell_id), + ) + + class CDNClient(object): - def __init__(self, client, app_id): + _LOG = logging.getLogger("CDNClient") + servers = deque() + + def __init__(self, client): self.steam = client - self.app_id = app_id self.web = make_requests_session() - self.servers = deque() self.cdn_auth_tokens = {} self.depot_keys = {} + self.workshop_depots = {} + self.manifests = {} + self.app_depots = {} + + if not self.servers: + self.fetch_content_servers() @property def cell_id(self): return self.steam.cell_id - def init_servers(self, num_servers=20): + def fetch_content_servers(self, num_servers=20): self.servers.clear() - for ip, port in self.steam.servers[EServerType.CS]: - servers = get_content_servers_from_cs(ip, port, self.cell_id, num_servers, self.web) + if self.steam: + for ip, port in self.steam.servers.get(EServerType.CS, []): + servers = get_content_servers_from_cs(self.cell_id, ip, port, num_servers, self.web) - if servers: - self.servers.extend(servers) - break + if servers: + self.servers.extend(servers) + break + else: + self._LOG.debug("No content servers available on SteamClient instance") if not self.servers: - raise RuntimeError("No content servers on SteamClient instance. Is it logged in?") + self._LOG.debug("Trying to fetch content servers from Steam API") + + servers = get_content_servers_from_webapi(self.cell_id) + self.servers.extend(servers) - def get_content_server(self, rotate=True): + if not self.servers: + raise ValueError("Failed to fetch content servers") + + def get_content_server(self, rotate=False): if rotate: self.servers.rotate(-1) return self.servers[0] @@ -105,26 +147,25 @@ class CDNClient(object): if depot_id not in self.cdn_auth_tokens: msg = self.steam.get_cdn_auth_token(depot_id, 'steampipe.steamcontent.com') - if msg.eresult == EResult.OK: + if msg and msg.eresult == EResult.OK: self.cdn_auth_tokens[depot_id] = msg.token - elif msg is None: - raise Exception("Failed getting depot key: %s" % repr(EResult.Timeout)) else: - raise Exception("Failed getting depot key: %s" % repr(EResult(msg.eresult))) + raise ValueError("Failed getting depot key: %s" % repr( + EResult.Timeout if msg is None else EResult(msg.eresult))) return self.cdn_auth_tokens[depot_id] - def get_depot_key(self, depot_id): - if depot_id not in self.depot_keys: - msg = self.steam.get_depot_key(self.app_id, depot_id) - if msg.eresult == EResult.OK: - self.depot_keys[depot_id] = msg.depot_encryption_key - elif msg is None: - raise Exception("Failed getting depot key: %s" % repr(EResult.Timeout)) + def get_depot_key(self, app_id, depot_id): + if (app_id, depot_id) not in self.depot_keys: + msg = self.steam.get_depot_key(app_id, depot_id) + + if msg and msg.eresult == EResult.OK: + self.depot_keys[(app_id, depot_id)] = msg.depot_encryption_key else: - raise Exception("Failed getting depot key: %s" % repr(EResult(msg.eresult))) + raise ValueError("Failed getting depot key: %s" % repr( + EResult.Timeout if msg is None else EResult(msg.eresult))) - return self.depot_keys[depot_id] + return self.depot_keys[(app_id, depot_id)] def get(self, command, args, auth_token=''): server = self.get_content_server() @@ -138,60 +179,308 @@ class CDNClient(object): args, auth_token, ) - resp = self.web.get(url) - if resp.ok: - return resp - elif resp.status_code in (401, 403, 404): - resp.raise_for_status() + try: + resp = self.web.get(url) + except: + pass + else: + if resp.ok: + return resp + elif resp.status_code in (401, 403, 404): + resp.raise_for_status() server = self.get_content_server(rotate=True) - def get_manifest(self, depot_id, manifest_id, cdn_auth_token=None, decrypt=True): + def get_chunk(self, app_id, depot_id, chunk_id, cdn_auth_token=None): if cdn_auth_token is None: cdn_auth_token = self.get_cdn_auth_token(depot_id) - resp = self.get('depot', '%s/manifest/%s/5' % (depot_id, manifest_id), cdn_auth_token) + resp = self.get('depot', '%s/chunk/%s' % (depot_id, chunk_id), cdn_auth_token) if resp.ok: - manifest = DepotManifest(resp.content) - if decrypt: - manifest.decrypt_filenames(self.get_depot_key(depot_id)) - return manifest + data = symmetric_decrypt(resp.content, self.get_depot_key(app_id, depot_id)) - def get_chunk(self, depot_id, chunk_id, cdn_auth_token=None): - if cdn_auth_token is None: - cdn_auth_token = self.get_cdn_auth_token(depot_id) + if data[:2] == b'VZ': + if data[2:3] != b'a': + raise ValueError("Invalid VZ version: %s" % repr(data[2:3])) - resp = self.get('depot', '%s/chunk/%s' % (depot_id, chunk_id), cdn_auth_token) + vzfilter = lzma._decode_filter_properties(lzma.FILTER_LZMA1, data[7:12]) + vzdec = lzma.LZMADecompressor(lzma.FORMAT_RAW, filters=[vzfilter]) + checksum, decompressed_size, vz_footer = struct.unpack(' 0: + raise NotImplementedError("Password protected branches are not supported yet") + + manifests = [] + + for depot_id, depot_config in iteritems(depots): + if not depot_id.isdigit(): + continue + + depot_id = int(depot_id) + + if branch in depot_config.get('manifests', {}): + try: + manifest = self.get_manifest(app_id, depot_id, depot_config['manifests'][branch]) + except ValueError as exp: + self._LOG.error("Depot %s (%s): %s", + repr(depot_config['name']), + depot_id, + str(exp), + ) + continue + + manifest.name = depot_config['name'] + manifests.append(manifest) + + return manifests + + def iter_files(self, app_id, filename_filter=None, branch='public'): + for manifest in self.get_manifests(app_id, branch): + for fp in manifest.iter_files(filename_filter): + yield fp + + def get_manifest_for_workshop_item(self, item_id): + resp, error = self.steam.unified_messages.send_and_wait('PublishedFile.GetDetails#1', { + 'publishedfileids': [item_id], + 'includetags': False, + 'includeadditionalpreviews': False, + 'includechildren': False, + 'includekvtags': False, + 'includevotes': False, + 'short_description': True, + 'includeforsaledata': False, + 'includemetadata': False, + 'language': 0 + }, timeout=7) + + if error: + raise error + + wf = None if resp is None else resp.publishedfiledetails[0] + + if wf is None or wf.result != EResult.OK: + raise ValueError("Failed getting workshop file info: %s" % repr( + EResult.Timeout if resp is None else EResult(wf.result))) + elif not wf.hcontent_file: + raise ValueError("Workshop file is not on steampipe") + + app_id = wf.consumer_appid + + ws_app_id = self.workshop_depots.get(app_id) + + if ws_app_id is None: + ws_app_id = int(self.steam.get_product_info([app_id])['apps'][app_id]['depots'].get( + 'workshopdepot', app_id)) + self.workshop_depots[app_id] = ws_app_id + + manifest = self.get_manifest(app_id, ws_app_id, wf.hcontent_file) + manifest.name = wf.title + return manifest + + +class CDNDepotManifest(DepotManifest): + name = None #: set only by :meth:`CDNClient.get_manifests` + + def __init__(self, cdn_client, app_id, data): + self.cdn_client = cdn_client + self.app_id = app_id + DepotManifest.__init__(self, data) def __repr__(self): - return "<%s('%s://%s:%s', type=%s, cell_id=%s)>" % ( + params = ', '.join([ + "app_id=" + str(self.app_id), + "depot_id=" + str(self.depot_id), + "gid=" + str(self.gid), + "creation_time=" + repr( + datetime.utcfromtimestamp(self.metadata.creation_time).isoformat().replace('T', ' ') + ), + ]) + + if self.name: + params = repr(self.name) + ', ' + params + + if self.metadata.filenames_encrypted: + params += ', filenames_encrypted=True' + + return "<%s(%s)>" % ( self.__class__.__name__, - 'https' if self.https else 'http', - self.host, - self.port, - repr(self.type), - repr(self.cell_id), + params, ) + + def deserialize(self, data): + DepotManifest.deserialize(self, data) + + # order chunks in ascending order by their offset + # required for CDNDepotFile + for mapping in self.payload.mappings: + mapping.chunks.sort(key=lambda x: x.offset) + + def _make_depot_file(self, file_mapping): + return CDNDepotFile(self, file_mapping) + + +class CDNDepotFile(DepotFile): + def __init__(self, manifest, file_mapping): + if not isinstance(manifest, CDNDepotManifest): + raise TypeError("Expected 'manifest' to be of type CDNDepotFile") + if not isinstance(file_mapping, ContentManifestPayload.FileMapping): + raise TypeError("Expected 'file_mapping' to be of type ContentManifestPayload.FileMapping") + + DepotFile.__init__(self, manifest, file_mapping) + + self.offset = 0 + self._lc = None + self._lcbuff = b'' + + def __repr__(self): + return "<%s(%s, %s, %s, %s, %s)>" % ( + self.__class__.__name__, + self.manifest.app_id, + self.manifest.depot_id, + self.manifest.gid, + repr(self.filename), + 'is_directory=True' if self.is_directory else self.size, + ) + + @property + def seekable(self): + return self.is_file + + def tell(self): + if not self.seekable: + raise ValueError("This file is not seekable, probably because its directory or symlink") + return self.offset + + def seek(self, offset, whence=0): + if not self.seekable: + raise ValueError("This file is not seekable, probably because its directory or symlink") + + if whence == 0: + if offset < 0: + raise IOError("Invalid argument") + elif whence == 1: + offset = self.offset + offset + elif whence == 2: + offset = self.size + offset + else: + raise ValueError("Invalid value for whence") + + self.offset = max(0, min(self.size, offset)) + + def _get_chunk(self, chunk): + if not self._lc or self._lc.sha != chunk.sha: + self._lcbuff = self.manifest.cdn_client.get_chunk( + self.manifest.app_id, + self.manifest.depot_id, + chunk.sha.hex(), + ) + self._lc = chunk + return self._lcbuff + + def __iter__(self): + return self + + def __next__(self): + return self.next() + + def __enter__(self): + return self + + def __exit__(self, type, value, traceback): + pass + + def next(self): + line = self.readline() + if line == b'': + raise StopIteration + return line + + def read(self, length=-1): + if length == -1: + length = self.size - self.offset + if length == 0 or self.offset >= self.size or self.size == 0: + return b'' + + end_offset = self.offset + length + + # we cache last chunk to allow small length reads and local seek + if (self._lc + and self.offset >= self._lc.offset + and end_offset <= self._lc.offset + self._lc.cb_original): + data = self._lcbuff[self.offset - self._lc.offset:self.offset - self._lc.offset + length] + # if we need to read outside the bounds of the cached chunk + # we go to loop over chunks to determine which to download + else: + data = BytesIO() + start_offset = None + + # Manifest orders the chunks in ascending order by offset + for chunk in self.chunks: + if chunk.offset >= end_offset: + break + elif (chunk.offset <= self.offset < chunk.offset + chunk.cb_original + or chunk.offset < end_offset <= chunk.offset + chunk.cb_original): + if start_offset is None: + start_offset = chunk.offset + data.write(self._get_chunk(chunk)) + + data.seek(self.offset - start_offset) + data = data.read(length) + + self.offset = min(self.size, end_offset) + return data + + def readline(self): + buf = b'' + + for chunk in iter(lambda: self.read(256), b''): + pos = chunk.find(b'\n') + if pos > -1: + pos += 1 # include \n + buf += chunk[:pos] + self.seek(self.offset - (len(chunk) - pos)) + break + + buf += chunk + + return buf + + def readlines(self): + return [line for line in self] diff --git a/steam/core/manifest.py b/steam/core/manifest.py index db84712..a42b2dc 100644 --- a/steam/core/manifest.py +++ b/steam/core/manifest.py @@ -5,6 +5,7 @@ from zipfile import ZipFile, ZIP_DEFLATED, BadZipFile from struct import pack from datetime import datetime from fnmatch import fnmatch +import os.path from steam.enums import EDepotFileFlag from steam.core.crypto import symmetric_decrypt @@ -21,7 +22,7 @@ class DepotManifest(object): PROTOBUF_ENDOFMANIFEST_MAGIC = 0x32C415AB def __init__(self, data=None): - """Manage depot manifest + """Represents depot manifest :param data: manifest data :type data: bytes @@ -35,9 +36,11 @@ class DepotManifest(object): def __repr__(self): params = ', '.join([ - str(self.depot_id), - str(self.gid), - repr(datetime.utcfromtimestamp(self.metadata.creation_time).isoformat().replace('T', ' ')), + "depot_id=" + str(self.depot_id), + "gid=" + str(self.gid), + "creation_time=" + repr( + datetime.utcfromtimestamp(self.metadata.creation_time).isoformat().replace('T', ' ') + ), ]) if self.metadata.filenames_encrypted: @@ -164,9 +167,12 @@ class DepotManifest(object): else: return data.getvalue() + def _make_depot_file(self, file_mapping): + return DepotFile(self, file_mapping) + def __iter__(self): for mapping in self.payload.mappings: - yield DepotFile(self, mapping) + yield self._make_depot_file(mapping) def iter_files(self, pattern=None): """ @@ -177,7 +183,7 @@ class DepotManifest(object): if (pattern is not None and not fnmatch(mapping.filename.rstrip('\x00 \n\t'), pattern)): continue - yield DepotFile(self, mapping) + yield self._make_depot_file(mapping) def __len__(self): return len(self.payload.mappings) @@ -192,9 +198,9 @@ class DepotFile(object): :type file_mapping: ContentManifestPayload.FileMapping """ if not isinstance(manifest, DepotManifest): - raise ValueError("Expected 'manifest' to be of type DepotManifest") + raise TypeError("Expected 'manifest' to be of type DepotManifest") if not isinstance(file_mapping, ContentManifestPayload.FileMapping): - raise ValueError("Expected 'file_mapping' to be of type ContentManifestPayload.FileMapping") + raise TypeError("Expected 'file_mapping' to be of type ContentManifestPayload.FileMapping") self.manifest = manifest self.file_mapping = file_mapping @@ -210,24 +216,52 @@ class DepotFile(object): @property def filename(self): + """ + :returns: Filename with null terminator and whitespaces removed + :rtype: str + """ return self.file_mapping.filename.rstrip('\x00 \n\t') + @property + def filename_norm(self): + """ + :return: Return current OS compatible path + :rtype: str + """ + return os.path.join(*self.filename.split('\\')) + @property def size(self): + """ + :return: file size in bytes + :rtype: int + """ return self.file_mapping.size @property def chunks(self): + """ + :return: file size in bytes + :rtype: int + """ return self.file_mapping.chunks @property def flags(self): + """ + :returns: file flags + :rtype: :class:`.EDepotFileFlag` + """ return self.file_mapping.flags @property def is_directory(self): return self.flags & EDepotFileFlag.Directory > 0 + @property + def is_symlink(self): + return not not self.file_mapping.linktarget + @property def is_file(self): - return not self.is_directory + return not self.is_directory and not self.is_symlink