Browse Source

add initial CDNClient + DepotManifest

pull/191/head
Rossen Georgiev 6 years ago
parent
commit
340b7ffada
  1. 7
      docs/api/steam.client.cdn.rst
  2. 7
      docs/api/steam.core.manifest.rst
  3. 3
      steam/client/__init__.py
  4. 73
      steam/client/builtins/apps.py
  5. 137
      steam/client/cdn.py
  6. 116
      steam/core/manifest.py

7
docs/api/steam.client.cdn.rst

@ -0,0 +1,7 @@
cdn
===
.. automodule:: steam.client.cdn
:members:
:show-inheritance:

7
docs/api/steam.core.manifest.rst

@ -0,0 +1,7 @@
manifest
========
.. automodule:: steam.core.manifest
:members:
:undoc-members:
:show-inheritance:

3
steam/client/__init__.py

@ -53,6 +53,7 @@ class SteamClient(CMClient, BuiltinBase):
credential_location = None #: location for sentry credential_location = None #: location for sentry
username = None #: username when logged on username = None #: username when logged on
login_key = None #: can be used for subsequent logins (no 2FA code will be required) login_key = None #: can be used for subsequent logins (no 2FA code will be required)
cell_id = 0 #: cell id provided by CM
def __init__(self): def __init__(self):
CMClient.__init__(self) CMClient.__init__(self)
@ -169,6 +170,7 @@ class SteamClient(CMClient, BuiltinBase):
def _handle_disconnect(self, *args): def _handle_disconnect(self, *args):
self.logged_on = False self.logged_on = False
self.current_jobid = 0 self.current_jobid = 0
self.cell_id = 0
def _handle_logon(self, msg): def _handle_logon(self, msg):
CMClient._handle_logon(self, msg) CMClient._handle_logon(self, msg)
@ -178,6 +180,7 @@ class SteamClient(CMClient, BuiltinBase):
if result == EResult.OK: if result == EResult.OK:
self._reconnect_backoff_c = 0 self._reconnect_backoff_c = 0
self.logged_on = True self.logged_on = True
self.cell_id = msg.body.cell_id
self.emit(self.EVENT_LOGGED_ON) self.emit(self.EVENT_LOGGED_ON)
return return

73
steam/client/builtins/apps.py

@ -2,20 +2,34 @@ import vdf
from steam.enums import EResult, EServerType from steam.enums import EResult, EServerType
from steam.enums.emsg import EMsg from steam.enums.emsg import EMsg
from steam.core.msg import MsgProto from steam.core.msg import MsgProto
from steam.util import ip_from_int from steam.util import ip_from_int, proto_fill_from_dict
class Apps(object): class Apps(object):
servers = None #: :class:`dict: Servers by type
licenses = None #: :class:`dict` Account licenses
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super(Apps, self).__init__(*args, **kwargs) super(Apps, self).__init__(*args, **kwargs)
self.servers = {}
self.licenses = {}
self.on(self.EVENT_DISCONNECTED, self.__handle_disconnect)
self.on(EMsg.ClientServerList, self._handle_server_list) self.on(EMsg.ClientServerList, self._handle_server_list)
self.on(EMsg.ClientLicenseList, self._handle_licenses)
def __handle_disconnect(self):
self.servers = {} self.servers = {}
self.licenses = {}
def _handle_server_list(self, message): def _handle_server_list(self, message):
for entry in message.body.servers: for entry in message.body.servers:
self.servers.setdefault(EServerType(entry.server_type), [])\ self.servers.setdefault(EServerType(entry.server_type), [])\
.append((ip_from_int(entry.server_ip), entry.server_port)) .append((ip_from_int(entry.server_ip), entry.server_port))
def _handle_licenses(self, message):
for entry in message.body.licenses:
self.licenses[entry.package_id] = entry
def get_player_count(self, app_id, timeout=5): def get_player_count(self, app_id, timeout=5):
"""Get numbers of players for app id """Get numbers of players for app id
@ -27,7 +41,7 @@ class Apps(object):
resp = self.send_job_and_wait(MsgProto(EMsg.ClientGetNumberOfCurrentPlayersDP), resp = self.send_job_and_wait(MsgProto(EMsg.ClientGetNumberOfCurrentPlayersDP),
{'appid': app_id}, {'appid': app_id},
timeout=timeout timeout=timeout
) )
if resp is None: if resp is None:
return EResult.Timeout return EResult.Timeout
elif resp.eresult == EResult.OK: elif resp.eresult == EResult.OK:
@ -38,9 +52,9 @@ class Apps(object):
def get_product_info(self, apps=[], packages=[], timeout=15): def get_product_info(self, apps=[], packages=[], timeout=15):
"""Get product info for apps and packages """Get product info for apps and packages
:param apps: items in the list should be either just ``app_id``, or ``(app_id, access_token)`` :param apps: items in the list should be either just ``app_id``, or :class:`dict`
:type apps: :class:`list` :type apps: :class:`list`
:param packages: items in the list should be either just ``package_id``, or ``(package_id, access_token)`` :param packages: items in the list should be either just ``package_id``, or :class:`dict`
:type packages: :class:`list` :type packages: :class:`list`
:return: dict with ``apps`` and ``packages`` containing their info, see example below :return: dict with ``apps`` and ``packages`` containing their info, see example below
:rtype: :class:`dict`, :class:`None` :rtype: :class:`dict`, :class:`None`
@ -51,22 +65,23 @@ class Apps(object):
'packages': {123: {...}, ...} 'packages': {123: {...}, ...}
} }
""" """
if not apps and not packages: return if not apps and not packages:
return
message = MsgProto(EMsg.ClientPICSProductInfoRequest) message = MsgProto(EMsg.ClientPICSProductInfoRequest)
for app in apps: for app in apps:
app_info = message.body.apps.add() app_info = message.body.apps.add()
app_info.only_public = False app_info.only_public = False
if isinstance(app, tuple): if isinstance(app, dict):
app_info.appid, app_info.access_token = app proto_fill_from_dict(app_info, app)
else: else:
app_info.appid = app app_info.appid = app
for package in packages: for package in packages:
package_info = message.body.packages.add() package_info = message.body.packages.add()
if isinstance(package, tuple): if isinstance(package, dict):
package_info.appid, package_info.access_token = package proto_fill_from_dict(package_info, package)
else: else:
package_info.packageid = package package_info.packageid = package
@ -79,7 +94,8 @@ class Apps(object):
while True: while True:
chunk = self.wait_event(job_id, timeout=timeout) chunk = self.wait_event(job_id, timeout=timeout)
if chunk is None: return if chunk is None:
return
chunk = chunk[0].body chunk = chunk[0].body
for app in chunk.apps: for app in chunk.apps:
@ -110,8 +126,8 @@ class Apps(object):
'send_app_info_changes': app_changes, 'send_app_info_changes': app_changes,
'send_package_info_changes': package_changes, 'send_package_info_changes': package_changes,
}, },
timeout=15 timeout=10
) )
def get_app_ticket(self, app_id): def get_app_ticket(self, app_id):
"""Get app ownership ticket """Get app ownership ticket
@ -123,8 +139,8 @@ class Apps(object):
""" """
return self.send_job_and_wait(MsgProto(EMsg.ClientGetAppOwnershipTicket), return self.send_job_and_wait(MsgProto(EMsg.ClientGetAppOwnershipTicket),
{'app_id': app_id}, {'app_id': app_id},
timeout=15 timeout=10
) )
def get_depot_key(self, depot_id, app_id=0): def get_depot_key(self, depot_id, app_id=0):
"""Get depot decryption key """Get depot decryption key
@ -141,28 +157,28 @@ class Apps(object):
'depot_id': depot_id, 'depot_id': depot_id,
'app_id': app_id, 'app_id': app_id,
}, },
timeout=15 timeout=10
) )
def get_cdn_auth_token(self, app_id, hostname): def get_cdn_auth_token(self, depot_id, hostname):
"""Get CDN authentication token """Get CDN authentication token
:param app_id: app id :param depot_id: depot id
:type app_id: :class:`int` :type depot_id: :class:`int`
:param hostname: cdn hostname :param hostname: cdn hostname
:type hostname: :class:`str` :type hostname: :class:`str`
:return: `CMsgClientGetCDNAuthTokenResponse <https://github.com/ValvePython/steam/blob/39627fe883feeed2206016bacd92cf0e4580ead6/protobufs/steammessages_clientserver_2.proto#L585-L589>`_ :return: `CMsgClientGetCDNAuthTokenResponse <https://github.com/ValvePython/steam/blob/39627fe883feeed2206016bacd92cf0e4580ead6/protobufs/steammessages_clientserver_2.proto#L585-L589>`_
:rtype: proto message :rtype: proto message
""" """
return self.send_job_and_wait(MsgProto(EMsg.ClientGetCDNAuthToken), return self.send_job_and_wait(MsgProto(EMsg.ClientGetCDNAuthToken),
{ {
'app_id': app_id, 'depot_id': depot_id,
'host_name': hostname, 'host_name': hostname,
}, },
timeout=15 timeout=10
) )
def get_product_access_tokens(self, app_ids=[], package_ids=[]): def get_access_tokens(self, app_ids=[], package_ids=[]):
"""Get access tokens """Get access tokens
:param app_ids: list of app ids :param app_ids: list of app ids
@ -178,20 +194,21 @@ class Apps(object):
'packages': {456: 'token', ...} 'packages': {456: 'token', ...}
} }
""" """
if not app_ids and not package_ids: return if not app_ids and not package_ids:
return
resp = self.send_job_and_wait(MsgProto(EMsg.ClientPICSAccessTokenRequest), resp = self.send_job_and_wait(MsgProto(EMsg.ClientPICSAccessTokenRequest),
{ {
'appids': map(int, app_ids), 'appids': map(int, app_ids),
'packageids': map(int, package_ids), 'packageids': map(int, package_ids),
}, },
timeout=15 timeout=10
) )
if resp: if resp:
return {'apps': dict(map(lambda app: (app.appid, app.access_token), resp.app_access_tokens)), return {'apps': dict(map(lambda app: (app.appid, app.access_token), resp.app_access_tokens)),
'packages': dict(map(lambda pkg: (pkg.appid, pkg.access_token), resp.package_access_tokens)), 'packages': dict(map(lambda pkg: (pkg.appid, pkg.access_token), resp.package_access_tokens)),
} }
def register_product_key(self, key): def register_product_key(self, key):
"""Register/Redeem a CD-Key """Register/Redeem a CD-Key

137
steam/client/cdn.py

@ -0,0 +1,137 @@
from collections import OrderedDict, deque
from six import itervalues
import vdf
from steam import webapi
from steam.enums import EServerType
from steam.util.web import make_requests_session
from steam.core.manifest import DepotManifest
def get_content_servers_from_cs(host, port, cell_id, num_servers=20, session=None):
proto = 'https' if port == 443 else 'http'
url = '%s://%s:%s/serverlist/%s/%s/' % (proto, host, port, cell_id, num_servers)
session = make_requests_session() if session is None else session
resp = session.get(url)
if resp.status_code != 200:
return []
kv = vdf.loads(resp.text, mapper=OrderedDict)
if kv.get('deferred') == '1':
return []
servers = []
for entry in itervalues(kv['serverlist']):
server = ContentServer()
server.type = entry['type']
server.https = True if entry['https_support'] == 'mandatory' else False
server.host = entry['Host']
server.vhost = entry['vhost']
server.port = 443 if server.https else 80
server.cell_id = entry['cell']
server.load = entry['load']
server.weighted_load = entry['weightedload']
servers.append(server)
return servers
def get_content_servers_from_webapi(cell_id, num_servers=20):
params = {'cellid': cell_id, 'max_servers': num_servers}
resp = webapi.get('IContentServerDirectoryService', 'GetServersForSteamPipe', params=params)
servers = []
for entry in resp['response']['servers']:
server = ContentServer()
server.type = entry['type']
server.https = True if entry['https_support'] == 'mandatory' else False
server.host = entry['host']
server.vhost = entry['vhost']
server.port = 443 if server.https else 80
server.cell_id = entry.get('cell_id', 0)
server.load = entry['load']
server.weighted_load = entry['weighted_load']
servers.append(server)
return servers
class CDNClient(object):
def __init__(self, client, app_id):
self.steam = client
self.app_id = app_id
self.web = make_requests_session()
self.servers = deque()
@property
def cell_id(self):
return self.steam.cell_id
def init_servers(self, num_servers=10):
self.servers.clear()
for ip, port in self.steam.servers[EServerType.CS]:
servers = get_content_servers_from_cs(ip, port, self.cell_id, num_servers, self.web)
if servers:
self.servers.extend(servers)
break
if not self.servers:
raise RuntimeError("No content servers on SteamClient instance. Is it logged in?")
def get_content_server(self):
server = self.servers[0]
self.servers.rotate(-1)
return server
def get(self, command, args, auth_token=''):
server = self.get_content_server()
url = "%s://%s:%s/%s/%s%s" % (
'https' if server.https else 'http',
server.host,
server.port,
command,
args,
auth_token,
)
return self.web.get(url)
def get_manifest(self, depot_id, manifest_id, auth_token):
resp = self.get('depot', '%s/manifest/%s/5' % (depot_id, manifest_id), auth_token)
resp.raise_for_status()
if resp.ok:
return DepotManifest(resp.content)
class ContentServer(object):
https = False
host = None
vhost = None
port = None
type = None
cell_id = 0
load = None
weighted_load = None
def __repr__(self):
return "<%s('%s://%s:%s', type=%s, cell_id=%s)>" % (
self.__class__.__name__,
'https' if self.https else 'http',
self.host,
self.port,
repr(self.type),
repr(self.cell_id),
)

116
steam/core/manifest.py

@ -0,0 +1,116 @@
from base64 import b64decode
from io import BytesIO
from zipfile import ZipFile, ZIP_DEFLATED
from struct import pack
from datetime import datetime
from steam.core.crypto import symmetric_decrypt
from steam.util.binary import StructReader
from steam.protobufs.content_manifest_pb2 import (ContentManifestMetadata,
ContentManifestPayload,
ContentManifestSignature)
class DepotManifest(object):
PROTOBUF_PAYLOAD_MAGIC = 0x71F617D0
PROTOBUF_METADATA_MAGIC = 0x1F4812BE
PROTOBUF_SIGNATURE_MAGIC = 0x1B81B817
PROTOBUF_ENDOFMANIFEST_MAGIC = 0x32C415AB
def __init__(self, data):
self.metadata = ContentManifestMetadata()
self.payload = ContentManifestPayload()
self.signature = ContentManifestSignature()
if data:
self.deserialize(data)
def __repr__(self):
params = ', '.join([
str(self.metadata.depot_id),
str(self.metadata.gid_manifest),
repr(datetime.utcfromtimestamp(self.metadata.creation_time).isoformat().replace('T', ' ')),
])
if self.metadata.filenames_encrypted:
params += ', filenames_encrypted=True'
return "<%s(%s)>" % (
self.__class__.__name__,
params,
)
def decrypt_filenames(self, depot_key):
if not self.metadata.filenames_encrypted:
return True
for mapping in self.payload.mappings:
filename = b64decode(mapping.filename)
try:
filename = symmetric_decrypt(filename, depot_key)
except Exception:
print("Unable to decrypt filename for depot manifest")
return False
mapping.filename = filename
self.metadata.filenames_encrypted = False
return True
def deserialize(self, data):
with ZipFile(BytesIO(data)) as zf:
data = StructReader(zf.read(zf.filelist[0]))
magic, length = data.unpack('<II')
if magic != DepotManifest.PROTOBUF_PAYLOAD_MAGIC:
raise Exception("Expecting protobuf payload")
self.payload = ContentManifestPayload()
self.payload.ParseFromString(data.read(length))
magic, length = data.unpack('<II')
if magic != DepotManifest.PROTOBUF_METADATA_MAGIC:
raise Exception("Expecting protobuf metadata")
self.metadata = ContentManifestMetadata()
self.metadata.ParseFromString(data.read(length))
magic, length = data.unpack('<II')
if magic != DepotManifest.PROTOBUF_SIGNATURE_MAGIC:
raise Exception("Expecting protobuf signature")
self.signature = ContentManifestSignature()
self.signature.ParseFromString(data.read(length))
magic, = data.unpack('<I')
if magic != DepotManifest.PROTOBUF_ENDOFMANIFEST_MAGIC:
raise Exception("Expecting end of manifest")
def serialize(self):
data = BytesIO()
part = self.payload.SerializeToString()
data.write(pack('<II', DepotManifest.PROTOBUF_PAYLOAD_MAGIC, len(part)))
data.write(part)
part = self.metadata.SerializeToString()
data.write(pack('<II', DepotManifest.PROTOBUF_METADATA_MAGIC, len(part)))
data.write(part)
part = self.signature.SerializeToString()
data.write(pack('<II', DepotManifest.PROTOBUF_SIGNATURE_MAGIC, len(part)))
data.write(part)
data.write(pack('<I', DepotManifest.PROTOBUF_ENDOFMANIFEST_MAGIC))
zbuff = BytesIO()
with ZipFile(zbuff, 'w', ZIP_DEFLATED) as zf:
zf.writestr('z', data.getvalue())
return zbuff.getvalue()
Loading…
Cancel
Save