Browse Source

improvements + CDNDepotManifest + CDNDepotFile

* fixed VZ decompressoin and add some checks
* add get_manifest() which returns a list of CDNDepotManifest for app_id and branch
* add iter_files() which will get all manifests, and iter files for app_id and branch
* add get_manifest_for_workshop_item() can be used to download steampiped hosted workshop items
* add basic caching inside CDNClient
* CDNDepotManifest, which enhances DepotManifest and inits CDNDepotFiles
* CDNDepotFile expands DepotFile to a file like object that can be used to directly seek and
  read files from steampipe
pull/191/head
Rossen Georgiev 6 years ago
parent
commit
8c6c2eb546
  1. 4
      steam/client/builtins/apps.py
  2. 415
      steam/client/cdn.py
  3. 52
      steam/core/manifest.py

4
steam/client/builtins/apps.py

@ -92,10 +92,8 @@ class Apps(object):
data = dict(apps={}, packages={})
while True:
chunk = self.wait_event(job_id, timeout=timeout)
chunk = self.wait_event(job_id, timeout=timeout, raises=True)
if chunk is None:
return
chunk = chunk[0].body
for app in chunk.apps:

415
steam/client/cdn.py

@ -2,14 +2,19 @@
from zipfile import ZipFile
from io import BytesIO
from collections import OrderedDict, deque
from six import itervalues
import vdf
from six import itervalues, iteritems
from binascii import crc32
from datetime import datetime
import logging
import struct
import vdf
from steam import webapi
from steam.enums import EResult, EServerType
from steam.util.web import make_requests_session
from steam.core.crypto import symmetric_decrypt
from steam.core.manifest import DepotManifest
from steam.core.manifest import DepotManifest, DepotFile
from steam.protobufs.content_manifest_pb2 import ContentManifestPayload
try:
import lzma
@ -17,7 +22,7 @@ except ImportError:
from backports import lzma
def get_content_servers_from_cs(host, port, cell_id, num_servers=20, session=None):
def get_content_servers_from_cs(cell_id, host='cs.steamcontent.com', port=80, num_servers=20, session=None):
proto = 'https' if port == 443 else 'http'
url = '%s://%s:%s/serverlist/%s/%s/' % (proto, host, port, cell_id, num_servers)
@ -70,33 +75,70 @@ def get_content_servers_from_webapi(cell_id, num_servers=20):
return servers
class ContentServer(object):
https = False
host = None
vhost = None
port = None
type = None
cell_id = 0
load = None
weighted_load = None
def __repr__(self):
return "<%s('%s://%s:%s', type=%s, cell_id=%s)>" % (
self.__class__.__name__,
'https' if self.https else 'http',
self.host,
self.port,
repr(self.type),
repr(self.cell_id),
)
class CDNClient(object):
def __init__(self, client, app_id):
_LOG = logging.getLogger("CDNClient")
servers = deque()
def __init__(self, client):
self.steam = client
self.app_id = app_id
self.web = make_requests_session()
self.servers = deque()
self.cdn_auth_tokens = {}
self.depot_keys = {}
self.workshop_depots = {}
self.manifests = {}
self.app_depots = {}
if not self.servers:
self.fetch_content_servers()
@property
def cell_id(self):
return self.steam.cell_id
def init_servers(self, num_servers=20):
def fetch_content_servers(self, num_servers=20):
self.servers.clear()
for ip, port in self.steam.servers[EServerType.CS]:
servers = get_content_servers_from_cs(ip, port, self.cell_id, num_servers, self.web)
if self.steam:
for ip, port in self.steam.servers.get(EServerType.CS, []):
servers = get_content_servers_from_cs(self.cell_id, ip, port, num_servers, self.web)
if servers:
self.servers.extend(servers)
break
if servers:
self.servers.extend(servers)
break
else:
self._LOG.debug("No content servers available on SteamClient instance")
if not self.servers:
raise RuntimeError("No content servers on SteamClient instance. Is it logged in?")
self._LOG.debug("Trying to fetch content servers from Steam API")
servers = get_content_servers_from_webapi(self.cell_id)
self.servers.extend(servers)
def get_content_server(self, rotate=True):
if not self.servers:
raise ValueError("Failed to fetch content servers")
def get_content_server(self, rotate=False):
if rotate:
self.servers.rotate(-1)
return self.servers[0]
@ -105,26 +147,25 @@ class CDNClient(object):
if depot_id not in self.cdn_auth_tokens:
msg = self.steam.get_cdn_auth_token(depot_id, 'steampipe.steamcontent.com')
if msg.eresult == EResult.OK:
if msg and msg.eresult == EResult.OK:
self.cdn_auth_tokens[depot_id] = msg.token
elif msg is None:
raise Exception("Failed getting depot key: %s" % repr(EResult.Timeout))
else:
raise Exception("Failed getting depot key: %s" % repr(EResult(msg.eresult)))
raise ValueError("Failed getting depot key: %s" % repr(
EResult.Timeout if msg is None else EResult(msg.eresult)))
return self.cdn_auth_tokens[depot_id]
def get_depot_key(self, depot_id):
if depot_id not in self.depot_keys:
msg = self.steam.get_depot_key(self.app_id, depot_id)
if msg.eresult == EResult.OK:
self.depot_keys[depot_id] = msg.depot_encryption_key
elif msg is None:
raise Exception("Failed getting depot key: %s" % repr(EResult.Timeout))
def get_depot_key(self, app_id, depot_id):
if (app_id, depot_id) not in self.depot_keys:
msg = self.steam.get_depot_key(app_id, depot_id)
if msg and msg.eresult == EResult.OK:
self.depot_keys[(app_id, depot_id)] = msg.depot_encryption_key
else:
raise Exception("Failed getting depot key: %s" % repr(EResult(msg.eresult)))
raise ValueError("Failed getting depot key: %s" % repr(
EResult.Timeout if msg is None else EResult(msg.eresult)))
return self.depot_keys[depot_id]
return self.depot_keys[(app_id, depot_id)]
def get(self, command, args, auth_token=''):
server = self.get_content_server()
@ -138,60 +179,308 @@ class CDNClient(object):
args,
auth_token,
)
resp = self.web.get(url)
if resp.ok:
return resp
elif resp.status_code in (401, 403, 404):
resp.raise_for_status()
try:
resp = self.web.get(url)
except:
pass
else:
if resp.ok:
return resp
elif resp.status_code in (401, 403, 404):
resp.raise_for_status()
server = self.get_content_server(rotate=True)
def get_manifest(self, depot_id, manifest_id, cdn_auth_token=None, decrypt=True):
def get_chunk(self, app_id, depot_id, chunk_id, cdn_auth_token=None):
if cdn_auth_token is None:
cdn_auth_token = self.get_cdn_auth_token(depot_id)
resp = self.get('depot', '%s/manifest/%s/5' % (depot_id, manifest_id), cdn_auth_token)
resp = self.get('depot', '%s/chunk/%s' % (depot_id, chunk_id), cdn_auth_token)
if resp.ok:
manifest = DepotManifest(resp.content)
if decrypt:
manifest.decrypt_filenames(self.get_depot_key(depot_id))
return manifest
data = symmetric_decrypt(resp.content, self.get_depot_key(app_id, depot_id))
def get_chunk(self, depot_id, chunk_id, cdn_auth_token=None):
if cdn_auth_token is None:
cdn_auth_token = self.get_cdn_auth_token(depot_id)
if data[:2] == b'VZ':
if data[2:3] != b'a':
raise ValueError("Invalid VZ version: %s" % repr(data[2:3]))
resp = self.get('depot', '%s/chunk/%s' % (depot_id, chunk_id), cdn_auth_token)
vzfilter = lzma._decode_filter_properties(lzma.FILTER_LZMA1, data[7:12])
vzdec = lzma.LZMADecompressor(lzma.FORMAT_RAW, filters=[vzfilter])
checksum, decompressed_size, vz_footer = struct.unpack('<II2s', data[-10:])
# i have no idea why, but some chunks will decompress with 1 extra byte at the end
udata = vzdec.decompress(data[12:-10])[:decompressed_size]
if resp.ok:
data = symmetric_decrypt(resp.content, self.get_depot_key(depot_id))
if vz_footer != b'zv':
raise ValueError("Invalid VZ footer: %s" % repr(vz_footer))
if crc32(udata) != checksum:
raise ValueError("CRC checksum doesn't match for decompressed data")
return udata[:decompressed_size]
if data[:3] == b'VZa':
f = lzma._decode_filter_properties(lzma.FILTER_LZMA1, data[7:12])
return lzma.LZMADecompressor(lzma.FORMAT_RAW, filters=[f]).decompress(data[12:-10])
else:
with ZipFile(BytesIO(data)) as zf:
return zf.read(zf.filelist[0])
def get_manifest(self, app_id, depot_id, manifest_id, cdn_auth_token=None, decrypt=True):
if (app_id, depot_id, manifest_id) not in self.manifests:
if cdn_auth_token is None:
cdn_auth_token = self.get_cdn_auth_token(depot_id)
class ContentServer(object):
https = False
host = None
vhost = None
port = None
type = None
cell_id = 0
load = None
weighted_load = None
resp = self.get('depot', '%s/manifest/%s/5' % (depot_id, manifest_id), cdn_auth_token)
if resp.ok:
manifest = CDNDepotManifest(self, app_id, resp.content)
if decrypt:
manifest.decrypt_filenames(self.get_depot_key(app_id, depot_id))
self.manifests[(app_id, depot_id, manifest_id)] = manifest
return self.manifests[(app_id, depot_id, manifest_id)]
def get_manifests(self, app_id, branch='public'):
if app_id not in self.app_depots:
self.app_depots[app_id] = self.steam.get_product_info([app_id])['apps'][app_id]['depots']
depots = self.app_depots[app_id]
if branch not in depots['branches']:
raise ValueError("No branch named %s for app_id %s" % (repr(branch), app_id))
elif int(depots['branches'][branch].get('pwdrequired', 0)) > 0:
raise NotImplementedError("Password protected branches are not supported yet")
manifests = []
for depot_id, depot_config in iteritems(depots):
if not depot_id.isdigit():
continue
depot_id = int(depot_id)
if branch in depot_config.get('manifests', {}):
try:
manifest = self.get_manifest(app_id, depot_id, depot_config['manifests'][branch])
except ValueError as exp:
self._LOG.error("Depot %s (%s): %s",
repr(depot_config['name']),
depot_id,
str(exp),
)
continue
manifest.name = depot_config['name']
manifests.append(manifest)
return manifests
def iter_files(self, app_id, filename_filter=None, branch='public'):
for manifest in self.get_manifests(app_id, branch):
for fp in manifest.iter_files(filename_filter):
yield fp
def get_manifest_for_workshop_item(self, item_id):
resp, error = self.steam.unified_messages.send_and_wait('PublishedFile.GetDetails#1', {
'publishedfileids': [item_id],
'includetags': False,
'includeadditionalpreviews': False,
'includechildren': False,
'includekvtags': False,
'includevotes': False,
'short_description': True,
'includeforsaledata': False,
'includemetadata': False,
'language': 0
}, timeout=7)
if error:
raise error
wf = None if resp is None else resp.publishedfiledetails[0]
if wf is None or wf.result != EResult.OK:
raise ValueError("Failed getting workshop file info: %s" % repr(
EResult.Timeout if resp is None else EResult(wf.result)))
elif not wf.hcontent_file:
raise ValueError("Workshop file is not on steampipe")
app_id = wf.consumer_appid
ws_app_id = self.workshop_depots.get(app_id)
if ws_app_id is None:
ws_app_id = int(self.steam.get_product_info([app_id])['apps'][app_id]['depots'].get(
'workshopdepot', app_id))
self.workshop_depots[app_id] = ws_app_id
manifest = self.get_manifest(app_id, ws_app_id, wf.hcontent_file)
manifest.name = wf.title
return manifest
class CDNDepotManifest(DepotManifest):
name = None #: set only by :meth:`CDNClient.get_manifests`
def __init__(self, cdn_client, app_id, data):
self.cdn_client = cdn_client
self.app_id = app_id
DepotManifest.__init__(self, data)
def __repr__(self):
return "<%s('%s://%s:%s', type=%s, cell_id=%s)>" % (
params = ', '.join([
"app_id=" + str(self.app_id),
"depot_id=" + str(self.depot_id),
"gid=" + str(self.gid),
"creation_time=" + repr(
datetime.utcfromtimestamp(self.metadata.creation_time).isoformat().replace('T', ' ')
),
])
if self.name:
params = repr(self.name) + ', ' + params
if self.metadata.filenames_encrypted:
params += ', filenames_encrypted=True'
return "<%s(%s)>" % (
self.__class__.__name__,
'https' if self.https else 'http',
self.host,
self.port,
repr(self.type),
repr(self.cell_id),
params,
)
def deserialize(self, data):
DepotManifest.deserialize(self, data)
# order chunks in ascending order by their offset
# required for CDNDepotFile
for mapping in self.payload.mappings:
mapping.chunks.sort(key=lambda x: x.offset)
def _make_depot_file(self, file_mapping):
return CDNDepotFile(self, file_mapping)
class CDNDepotFile(DepotFile):
def __init__(self, manifest, file_mapping):
if not isinstance(manifest, CDNDepotManifest):
raise TypeError("Expected 'manifest' to be of type CDNDepotFile")
if not isinstance(file_mapping, ContentManifestPayload.FileMapping):
raise TypeError("Expected 'file_mapping' to be of type ContentManifestPayload.FileMapping")
DepotFile.__init__(self, manifest, file_mapping)
self.offset = 0
self._lc = None
self._lcbuff = b''
def __repr__(self):
return "<%s(%s, %s, %s, %s, %s)>" % (
self.__class__.__name__,
self.manifest.app_id,
self.manifest.depot_id,
self.manifest.gid,
repr(self.filename),
'is_directory=True' if self.is_directory else self.size,
)
@property
def seekable(self):
return self.is_file
def tell(self):
if not self.seekable:
raise ValueError("This file is not seekable, probably because its directory or symlink")
return self.offset
def seek(self, offset, whence=0):
if not self.seekable:
raise ValueError("This file is not seekable, probably because its directory or symlink")
if whence == 0:
if offset < 0:
raise IOError("Invalid argument")
elif whence == 1:
offset = self.offset + offset
elif whence == 2:
offset = self.size + offset
else:
raise ValueError("Invalid value for whence")
self.offset = max(0, min(self.size, offset))
def _get_chunk(self, chunk):
if not self._lc or self._lc.sha != chunk.sha:
self._lcbuff = self.manifest.cdn_client.get_chunk(
self.manifest.app_id,
self.manifest.depot_id,
chunk.sha.hex(),
)
self._lc = chunk
return self._lcbuff
def __iter__(self):
return self
def __next__(self):
return self.next()
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
pass
def next(self):
line = self.readline()
if line == b'':
raise StopIteration
return line
def read(self, length=-1):
if length == -1:
length = self.size - self.offset
if length == 0 or self.offset >= self.size or self.size == 0:
return b''
end_offset = self.offset + length
# we cache last chunk to allow small length reads and local seek
if (self._lc
and self.offset >= self._lc.offset
and end_offset <= self._lc.offset + self._lc.cb_original):
data = self._lcbuff[self.offset - self._lc.offset:self.offset - self._lc.offset + length]
# if we need to read outside the bounds of the cached chunk
# we go to loop over chunks to determine which to download
else:
data = BytesIO()
start_offset = None
# Manifest orders the chunks in ascending order by offset
for chunk in self.chunks:
if chunk.offset >= end_offset:
break
elif (chunk.offset <= self.offset < chunk.offset + chunk.cb_original
or chunk.offset < end_offset <= chunk.offset + chunk.cb_original):
if start_offset is None:
start_offset = chunk.offset
data.write(self._get_chunk(chunk))
data.seek(self.offset - start_offset)
data = data.read(length)
self.offset = min(self.size, end_offset)
return data
def readline(self):
buf = b''
for chunk in iter(lambda: self.read(256), b''):
pos = chunk.find(b'\n')
if pos > -1:
pos += 1 # include \n
buf += chunk[:pos]
self.seek(self.offset - (len(chunk) - pos))
break
buf += chunk
return buf
def readlines(self):
return [line for line in self]

52
steam/core/manifest.py

@ -5,6 +5,7 @@ from zipfile import ZipFile, ZIP_DEFLATED, BadZipFile
from struct import pack
from datetime import datetime
from fnmatch import fnmatch
import os.path
from steam.enums import EDepotFileFlag
from steam.core.crypto import symmetric_decrypt
@ -21,7 +22,7 @@ class DepotManifest(object):
PROTOBUF_ENDOFMANIFEST_MAGIC = 0x32C415AB
def __init__(self, data=None):
"""Manage depot manifest
"""Represents depot manifest
:param data: manifest data
:type data: bytes
@ -35,9 +36,11 @@ class DepotManifest(object):
def __repr__(self):
params = ', '.join([
str(self.depot_id),
str(self.gid),
repr(datetime.utcfromtimestamp(self.metadata.creation_time).isoformat().replace('T', ' ')),
"depot_id=" + str(self.depot_id),
"gid=" + str(self.gid),
"creation_time=" + repr(
datetime.utcfromtimestamp(self.metadata.creation_time).isoformat().replace('T', ' ')
),
])
if self.metadata.filenames_encrypted:
@ -164,9 +167,12 @@ class DepotManifest(object):
else:
return data.getvalue()
def _make_depot_file(self, file_mapping):
return DepotFile(self, file_mapping)
def __iter__(self):
for mapping in self.payload.mappings:
yield DepotFile(self, mapping)
yield self._make_depot_file(mapping)
def iter_files(self, pattern=None):
"""
@ -177,7 +183,7 @@ class DepotManifest(object):
if (pattern is not None
and not fnmatch(mapping.filename.rstrip('\x00 \n\t'), pattern)):
continue
yield DepotFile(self, mapping)
yield self._make_depot_file(mapping)
def __len__(self):
return len(self.payload.mappings)
@ -192,9 +198,9 @@ class DepotFile(object):
:type file_mapping: ContentManifestPayload.FileMapping
"""
if not isinstance(manifest, DepotManifest):
raise ValueError("Expected 'manifest' to be of type DepotManifest")
raise TypeError("Expected 'manifest' to be of type DepotManifest")
if not isinstance(file_mapping, ContentManifestPayload.FileMapping):
raise ValueError("Expected 'file_mapping' to be of type ContentManifestPayload.FileMapping")
raise TypeError("Expected 'file_mapping' to be of type ContentManifestPayload.FileMapping")
self.manifest = manifest
self.file_mapping = file_mapping
@ -210,24 +216,52 @@ class DepotFile(object):
@property
def filename(self):
"""
:returns: Filename with null terminator and whitespaces removed
:rtype: str
"""
return self.file_mapping.filename.rstrip('\x00 \n\t')
@property
def filename_norm(self):
"""
:return: Return current OS compatible path
:rtype: str
"""
return os.path.join(*self.filename.split('\\'))
@property
def size(self):
"""
:return: file size in bytes
:rtype: int
"""
return self.file_mapping.size
@property
def chunks(self):
"""
:return: file size in bytes
:rtype: int
"""
return self.file_mapping.chunks
@property
def flags(self):
"""
:returns: file flags
:rtype: :class:`.EDepotFileFlag`
"""
return self.file_mapping.flags
@property
def is_directory(self):
return self.flags & EDepotFileFlag.Directory > 0
@property
def is_symlink(self):
return not not self.file_mapping.linktarget
@property
def is_file(self):
return not self.is_directory
return not self.is_directory and not self.is_symlink

Loading…
Cancel
Save