""" The :class:`.CDNClient` class provides a simple API for downloading Steam content from SteamPipe Initializing :class:`.CDNClient` requires a logged in :class:`.SteamClient` instance .. warning:: This module uses :mod:`requests` library, which is not gevent cooperative by default. It is high recommended that you use :meth:`steam.monkey.patch_minimal()`. See example below .. code:: python import steam.monkey steam.monkey.patch_minimal() from steam.client import SteamClient, EMsg from steam.client.cdn import CDNClient mysteam = SteamClient() mysteam.cli_login() ... mycdn = CDNClient(mysteam) Getting depot manifests for an app .. code:: python >>> mycdn.get_manifests(570) [, , , , , , , , , , , , , , , , , , , , ] >>> mycdn.get_manifests(570, filter_func=lambda depot_id, info: 'Dota 2 Content' in info['name']) [, , , , , ] Listing files .. code:: python >>> file_list = mycdn.iter_files(570) >>> list(file_list)[:10] [, , , , , , , , , Reading a file directly from SteamPipe .. code:: python >>> file_list = mycdn.iter_files(570, r'game\dota\gameinfo.gi') >>> myfile = next(file_list) >>> print(myfile.read(80).decode('utf-8')) "GameInfo" { game "Dota 2" title "Dota 2" gamelogo 1 type multiplayer_only ... 
""" from zipfile import ZipFile from io import BytesIO from collections import OrderedDict, deque from six import itervalues, iteritems from binascii import crc32, unhexlify from datetime import datetime import logging import struct import vdf from gevent.pool import Pool as GPool from cachetools import LRUCache from steam import webapi from steam.exceptions import SteamError, ManifestError from steam.core.msg import MsgProto from steam.enums import EResult, EType from steam.enums.emsg import EMsg from steam.utils.web import make_requests_session from steam.core.crypto import symmetric_decrypt, symmetric_decrypt_ecb from steam.core.manifest import DepotManifest, DepotFile from steam.protobufs.content_manifest_pb2 import ContentManifestPayload try: import lzma except ImportError: from backports import lzma def decrypt_manifest_gid_2(encrypted_gid, password): """Decrypt manifest gid v2 bytes :param encrypted_gid: encrypted gid v2 bytes :type encrypted_gid: bytes :param password: encryption password :type password: byt :return: manifest gid :rtype: int """ return struct.unpack('" % ( self.__class__.__name__, 'https' if self.https else 'http', self.host, self.port, repr(self.type), repr(self.cell_id), ) class CDNDepotFile(DepotFile): def __init__(self, manifest, file_mapping): """File-like object proxy for content files located on SteamPipe :param manifest: parrent manifest instance :type manifest: :class:`.CDNDepotManifest` :param file_mapping: file mapping instance from manifest :type file_mapping: ContentManifestPayload.FileMapping """ if not isinstance(manifest, CDNDepotManifest): raise TypeError("Expected 'manifest' to be of type CDNDepotFile") if not isinstance(file_mapping, ContentManifestPayload.FileMapping): raise TypeError("Expected 'file_mapping' to be of type ContentManifestPayload.FileMapping") DepotFile.__init__(self, manifest, file_mapping) self.offset = 0 self._lc = None self._lcbuff = b'' def __repr__(self): return "<%s(%s, %s, %s, %s, %s)>" % ( 
self.__class__.__name__, self.manifest.app_id, self.manifest.depot_id, self.manifest.gid, repr(self.filename_raw), 'is_directory=True' if self.is_directory else self.size, ) @property def seekable(self): """:type: bool""" return self.is_file def tell(self): """:type: int""" if not self.seekable: raise ValueError("This file is not seekable, probably because its directory or symlink") return self.offset def seek(self, offset, whence=0): """Seen file :param offset: file offset :type offset: int :param whence: offset mode, see :meth:`io.IOBase.seek` :type whence: int """ if not self.seekable: raise ValueError("This file is not seekable, probably because its directory or symlink") if whence == 0: if offset < 0: raise IOError("Invalid argument") elif whence == 1: offset = self.offset + offset elif whence == 2: offset = self.size + offset else: raise ValueError("Invalid value for whence") self.offset = max(0, min(self.size, offset)) def _get_chunk(self, chunk): if not self._lc or self._lc.sha != chunk.sha: self._lcbuff = self.manifest.cdn_client.get_chunk( self.manifest.app_id, self.manifest.depot_id, chunk.sha.hex(), ) self._lc = chunk return self._lcbuff def __iter__(self): return self def __next__(self): return self.next() def __enter__(self): return self def __exit__(self, type, value, traceback): pass def next(self): line = self.readline() if line == b'': raise StopIteration return line def read(self, length=-1): """Read bytes from the file :param length: number of bytes to read. 
Read the whole file if not set :type length: int :returns: file data :rtype: bytes """ if length == -1: length = self.size - self.offset if length == 0 or self.offset >= self.size or self.size == 0: return b'' end_offset = self.offset + length # we cache last chunk to allow small length reads and local seek if (self._lc and self.offset >= self._lc.offset and end_offset <= self._lc.offset + self._lc.cb_original): data = self._lcbuff[self.offset - self._lc.offset:self.offset - self._lc.offset + length] # if we need to read outside the bounds of the cached chunk # we go to loop over chunks to determine which to download else: data = BytesIO() start_offset = None # Manifest orders the chunks by offset in ascending order for chunk in self.chunks: if chunk.offset >= end_offset: break chunk_start = chunk.offset chunk_end = chunk_start + chunk.cb_original if ( chunk_start <= self.offset < chunk_end or chunk_start < end_offset <= chunk_end): if start_offset is None: start_offset = chunk.offset data.write(self._get_chunk(chunk)) data.seek(self.offset - start_offset) data = data.read(length) self.offset = min(self.size, end_offset) return data def readline(self): """Read a single line :return: single file line :rtype: bytes """ buf = b'' for chunk in iter(lambda: self.read(256), b''): pos = chunk.find(b'\n') if pos > -1: pos += 1 # include \n buf += chunk[:pos] self.seek(self.offset - (len(chunk) - pos)) break buf += chunk return buf def readlines(self): """Get file contents as list of lines :return: list of lines :rtype: :class:`list` [:class:`bytes`] """ return [line for line in self] class CDNDepotManifest(DepotManifest): DepotFileClass = CDNDepotFile name = None #: set only by :meth:`CDNClient.get_manifests` def __init__(self, cdn_client, app_id, data): """Holds manifest metadata and file list. 
:param cdn_client: CDNClient instance
        :type  cdn_client: :class:`.CDNClient`
        :param app_id: App ID
        :type  app_id: int
        :param data: serialized manifest data
        :type  data: bytes
        """
        self.cdn_client = cdn_client
        self.app_id = app_id
        DepotManifest.__init__(self, data)

    def __repr__(self):
        params = ', '.join([
                    "app_id=" + str(self.app_id),
                    "depot_id=" + str(self.depot_id),
                    "gid=" + str(self.gid),
                    "creation_time=" + repr(
                        datetime.utcfromtimestamp(self.metadata.creation_time).isoformat().replace('T', ' ')
                        ),
                    ])

        # name is only present when set by the caller (see class attribute)
        if self.name:
            params = repr(self.name) + ', ' + params

        if self.filenames_encrypted:
            params += ', filenames_encrypted=True'

        return "<%s(%s)>" % (
            self.__class__.__name__,
            params,
            )

    def deserialize(self, data):
        DepotManifest.deserialize(self, data)

        # order chunks in ascending order by their offset
        # required for CDNDepotFile
        for mapping in self.payload.mappings:
            mapping.chunks.sort(key=lambda x: x.offset, reverse=False)


class CDNClient(object):
    DepotManifestClass = CDNDepotManifest
    _LOG = logging.getLogger("CDNClient")
    # NOTE: ``servers`` and ``_chunk_cache`` are class attributes, so they are
    # shared by every CDNClient instance
    servers = deque()  #: CS Server list
    _chunk_cache = LRUCache(20)
    cell_id = 0  #: Cell ID to use, initialized from SteamClient instance

    def __init__(self, client):
        """CDNClient allows loading and reading of manifests for Steam apps, which are used to list and download content

        :param client: logged in SteamClient instance
        :type  client: :class:`.SteamClient`
        """
        self.gpool = GPool(8)            #: task pool
        self.steam = client              #: SteamClient instance
        if self.steam:
            self.cell_id = self.steam.cell_id

        self.web = make_requests_session()
        self.depot_keys = {}             #: depot decryption keys
        self.manifests = {}              #: CDNDepotManifest instances
        self.app_depots = {}             #: app depot info
        self.beta_passwords = {}         #: beta branch decryption keys
        self.licensed_app_ids = set()    #: app_ids that the SteamClient instance has access to
        self.licensed_depot_ids = set()  #: depot_ids that the SteamClient instance has access to

        # the server list is shared, so it is only fetched when still empty
        if not self.servers:
            self.fetch_content_servers()

        # NOTE(review): nesting inferred from collapsed source; licenses are
        # assumed to be loaded for every instance, not only on first fetch
        self.load_licenses()

    def clear_cache(self):
        """Clear cached information. Next call on methods with caching will return fresh data"""
        self.manifests.clear()
        self.app_depots.clear()
        self.beta_passwords.clear()

    def load_licenses(self):
        """Read licenses from SteamClient instance, required for determining accessible content"""
        self.licensed_app_ids.clear()
        self.licensed_depot_ids.clear()

        if self.steam.steam_id.type == EType.AnonUser:
            # anonymous users get a single hard-coded package id
            packages = [17906]
        else:
            if not self.steam.licenses:
                self._LOG.debug("No steam licenses found on SteamClient instance")
                return

            packages = list(map(lambda l: {'packageid': l.package_id, 'access_token': l.access_token},
                                itervalues(self.steam.licenses)))

        for package_id, info in iteritems(self.steam.get_product_info(packages=packages)['packages']):
            self.licensed_app_ids.update(info['appids'].values())
            self.licensed_depot_ids.update(info['depotids'].values())

    def fetch_content_servers(self, num_servers=20):
        """Update CS server list

        :param num_servers: numbers of CS server to fetch
        :type  num_servers: int
        :raises SteamError: when no servers remain after filtering
        """
        # NOTE(review): ``num_servers`` is not forwarded to the lookup below
        self.servers.clear()

        self._LOG.debug("Trying to fetch content servers from Steam API")

        servers = get_content_servers_from_webapi(self.cell_id)
        servers = filter(lambda server: server.type != 'OpenCache', servers)  # see #264
        self.servers.extend(servers)

        if not self.servers:
            raise SteamError("Failed to fetch content servers")

    def get_content_server(self, rotate=False):
        """Get a CS server for content download

        :param rotate: forcefully rotate server list and get a new server
        :type  rotate: bool
        :returns: the server at the front of the list
        """
        if rotate:
            self.servers.rotate(-1)
        return self.servers[0]

    def get_depot_key(self, app_id, depot_id):
        """Get depot key, which is needed to decrypt files

        :param app_id: app id
        :type  app_id: int
        :param depot_id: depot id
        :type  depot_id: int
        :return: returns decryption key
        :rtype: bytes
        :raises SteamError: error message
        """
        # keys are cached per depot_id for the lifetime of this instance
        if depot_id not in self.depot_keys:
            msg = self.steam.get_depot_key(app_id, depot_id)

            if msg and msg.eresult == EResult.OK:
                self.depot_keys[depot_id] = msg.depot_encryption_key
            else:
                # a missing response is reported as a timeout
                raise SteamError("Failed getting depot key",
                                 EResult.Timeout if msg is None else EResult(msg.eresult))

        return self.depot_keys[depot_id]

    def cdn_cmd(self, command, args):
        """Run CDN command request

        Retries on the next content server until a request succeeds or a
        4xx response is received.

        :param command: command name
        :type  command: str
        :param args: args
        :type  args: str
        :returns: requests response
        :rtype: :class:`requests.Response`
        :raises SteamError: on error
        """
        server = self.get_content_server()

        while True:
            url = "%s://%s:%s/%s/%s" % (
                'https' if server.https else 'http',
                server.host,
                server.port,
                command,
                args,
                )

            try:
                resp = self.web.get(url, timeout=10)
            except Exception as exp:
                self._LOG.debug("Request error: %s", exp)
            else:
                if resp.ok:
                    return resp
                elif 400 <= resp.status_code < 500:
                    self._LOG.debug("Got HTTP %s", resp.status_code)
                    raise SteamError("HTTP Error %s" % resp.status_code)
                # NOTE(review): nesting inferred from collapsed source; the
                # back-off is assumed to apply to non-4xx HTTP failures only
                self.steam.sleep(0.5)

            # try the next server
            server = self.get_content_server(rotate=True)

    def get_chunk(self, app_id, depot_id, chunk_id):
        """Download a single content chunk

        :param app_id: App ID
        :type  app_id: int
        :param depot_id: Depot ID
        :type  depot_id: int
        :param chunk_id: Chunk ID (:class:`.CDNDepotFile` passes ``chunk.sha.hex()``)
        :type  chunk_id: str
        :returns: chunk data
        :rtype: bytes
        :raises SteamError: error message
        """
        if (depot_id, chunk_id) not in self._chunk_cache:
            resp = self.cdn_cmd('depot', '%s/chunk/%s' % (depot_id, chunk_id))

            data = symmetric_decrypt(resp.content, self.get_depot_key(app_id, depot_id))

            # payloads starting with b'VZ' carry an LZMA stream with a b'zv' footer
            if data[:2] == b'VZ':
                if data[-2:] != b'zv':
                    raise SteamError("VZ: Invalid footer: %s" % repr(data[-2:]))
                if data[2:3] != b'a':
                    raise SteamError("VZ: Invalid version: %s" % repr(data[2:3]))

                vzfilter = lzma._decode_filter_properties(lzma.FILTER_LZMA1, data[7:12])
                vzdec = lzma.LZMADecompressor(lzma.FORMAT_RAW, filters=[vzfilter])
                # NOTE(review): the source text is damaged from the
                # ``struct.unpack('`` call below up to the ``... 0:`` condition
                # that precedes ``is_enc_branch = True`` (everything between a
                # '<' and the next '>' was lost in extraction). The rest of
                # get_chunk, the methods that followed it and the head of
                # get_manifests must be restored from version control; the
                # damaged text is kept verbatim on the next line. The nesting of
                # the statements after it is inferred and must be re-checked.
                checksum, decompressed_size = struct.unpack(' 0:
            is_enc_branch = True

            if (app_id, branch) not in self.beta_passwords:
                if not password:
                    raise SteamError("Branch %r requires a password" % branch)

                result = self.check_beta_password(app_id, password)

                if result != EResult.OK:
                    raise SteamError("Branch password is not valid. %r" % result)

                if (app_id, branch) not in self.beta_passwords:
                    raise SteamError("Incorrect password for branch %r" % branch)

        def async_fetch_manifest(
            app_id, depot_id, manifest_gid, decrypt, depot_name, branch_name, branch_pass
        ):
            # errors are returned (not raised) so the collecting loop below can
            # re-raise them from the caller's context
            try:
                manifest_code = self.get_manifest_request_code(
                    app_id, depot_id, int(manifest_gid), branch_name, branch_pass
                )
            except SteamError as exc:
                return ManifestError("Failed to acquire manifest code", app_id, depot_id, manifest_gid, exc)

            try:
                manifest = self.get_manifest(
                    app_id, depot_id, manifest_gid, decrypt=decrypt, manifest_request_code=manifest_code
                )
            except Exception as exc:
                return ManifestError("Failed download", app_id, depot_id, manifest_gid, exc)

            manifest.name = depot_name
            return manifest

        tasks = []
        shared_depots = {}

        for depot_id, depot_info in iteritems(depots):
            # non-numeric keys are not depots
            if not depot_id.isdigit():
                continue

            depot_id = int(depot_id)

            # if filter_func set, use it to filter the list the depots
            if filter_func and not filter_func(depot_id, depot_info):
                continue

            # if we have no license for the depot, no point trying as we won't get depot_key
            if not self.has_license_for_depot(depot_id):
                self._LOG.debug("No license for depot %s (%s). Skipped",
                                repr(depot_info.get('name', depot_id)),
                                depot_id,
                                )
                continue

            # accumulate the shared depots
            if 'depotfromapp' in depot_info:
                shared_depots.setdefault(int(depot_info['depotfromapp']), set()).add(depot_id)
                continue

            # process depot, and get manifest for branch
            if is_enc_branch:
                egid = depot_info.get('encryptedmanifests', {}).get(branch, {}).get('encrypted_gid_2')

                if egid is not None:
                    manifest_gid = decrypt_manifest_gid_2(unhexlify(egid),
                                                          self.beta_passwords[(app_id, branch)])
                else:
                    # no encrypted gid for this depot: fall back to the public manifest
                    manifest_gid = depot_info.get('manifests', {}).get('public')
            else:
                manifest_gid = depot_info.get('manifests', {}).get(branch)

            if manifest_gid is not None:
                tasks.append(
                    self.gpool.spawn(
                        async_fetch_manifest,
                        app_id,
                        depot_id,
                        manifest_gid,
                        decrypt,
                        depot_info.get('name', depot_id),
                        branch_name=branch,
                        branch_pass=None, # TODO: figure out how to pass this correctly
                    )
                )

        # collect results
        manifests = []

        for task in tasks:
            result = task.get()
            if isinstance(result, ManifestError):
                raise result
            manifests.append(result)

        # load shared depot manifests
        for app_id, depot_ids in iteritems(shared_depots):
            # defaults bind the current depot_ids / filter_func to the closure
            def nested_ffunc(depot_id, depot_info, depot_ids=depot_ids, ffunc=filter_func):
                return (int(depot_id) in depot_ids
                        and (ffunc is None or ffunc(depot_id, depot_info)))

            manifests += self.get_manifests(app_id, filter_func=nested_ffunc)

        return manifests

    def iter_files(self, app_id, filename_filter=None, branch='public', password=None, filter_func=None):
        """Like :meth:`.get_manifests` but returns an iterator that goes through
        all the files in all the manifests.

        :param app_id: App ID
        :type  app_id: int
        :param filename_filter: wildcard filter for file paths
        :type  filename_filter: str
        :param branch: branch name
        :type  branch: str
        :param password: branch password for locked branches
        :type  password: str
        :param filter_func: Function to filter depots.
``func(depot_id, depot_info)`` :returns: generator of of CDN files :rtype: [:class:`.CDNDepotFile`] """ for manifest in self.get_manifests(app_id, branch, password, filter_func): for fp in manifest.iter_files(filename_filter): yield fp def get_manifest_for_workshop_item(self, item_id): """Get the manifest file for a worshop item that is hosted on SteamPipe :param item_id: Workshop ID :type item_id: int :returns: manifest instance :rtype: :class:`.CDNDepotManifest` :raises: ManifestError, SteamError """ resp = self.steam.send_um_and_wait('PublishedFile.GetDetails#1', { 'publishedfileids': [item_id], 'includetags': False, 'includeadditionalpreviews': False, 'includechildren': False, 'includekvtags': False, 'includevotes': False, 'short_description': True, 'includeforsaledata': False, 'includemetadata': False, 'language': 0 }, timeout=7) if resp.header.eresult != EResult.OK: raise SteamError(resp.header.error_message or 'No message', resp.header.eresult) wf = None if resp is None else resp.body.publishedfiledetails[0] if wf is None or wf.result != EResult.OK: raise SteamError("Failed getting workshop file info", EResult.Timeout if resp is None else EResult(wf.result)) elif not wf.hcontent_file: raise SteamError("Workshop file is not on SteamPipe", EResult.FileNotFound) app_id = ws_app_id = wf.consumer_appid try: manifest_code = self.get_manifest_request_code(app_id, ws_app_id, int(wf.hcontent_file)) manifest = self.get_manifest(app_id, ws_app_id, wf.hcontent_file, manifest_request_code=manifest_code) except SteamError as exc: return ManifestError("Failed to acquire manifest", app_id, depot_id, manifest_gid, exc) manifest.name = wf.title return manifest