Browse Source

add support for password proected branches

pull/191/head
Rossen Georgiev 6 years ago
parent
commit
f3a21174a9
  1. 50
      steam/client/cdn.py
  2. 6
      steam/core/crypto.py
  3. 9
      tests/test_core_crypto.py

50
steam/client/cdn.py

@ -3,7 +3,7 @@ from zipfile import ZipFile
from io import BytesIO
from collections import OrderedDict, deque
from six import itervalues, iteritems
from binascii import crc32
from binascii import crc32, unhexlify
from datetime import datetime
import logging
import struct
@ -12,9 +12,11 @@ import vdf
from gevent.pool import Pool as GPool
from cachetools import LRUCache
from steam import webapi
from steam.core.msg import MsgProto
from steam.enums import EResult, EServerType, EType
from steam.enums.emsg import EMsg
from steam.util.web import make_requests_session
from steam.core.crypto import symmetric_decrypt
from steam.core.crypto import symmetric_decrypt, symmetric_decrypt_ecb
from steam.core.manifest import DepotManifest, DepotFile
from steam.protobufs.content_manifest_pb2 import ContentManifestPayload
@ -23,6 +25,8 @@ try:
except ImportError:
from backports import lzma
def decrypt_manifest_gid_2(encrypted_gid, password):
return struct.unpack('<Q', symmetric_decrypt_ecb(encrypted_gid, password))[0]
def get_content_servers_from_cs(cell_id, host='cs.steamcontent.com', port=80, num_servers=20, session=None):
proto = 'https' if port == 443 else 'http'
@ -113,6 +117,7 @@ class CDNClient(object):
self.depot_keys = {}
self.manifests = {}
self.app_depots = {}
self.beta_passwords = {}
self.licensed_app_ids = set()
self.licensed_depot_ids = set()
@ -230,15 +235,32 @@ class CDNClient(object):
return self.manifests[(app_id, depot_id, manifest_id)]
def get_manifests(self, app_id, branch='public', filter_func=None):
def check_beta_password(self, app_id, password):
resp = self.steam.send_job_and_wait(MsgProto(EMsg.ClientCheckAppBetaPassword),
{'app_id': app_id, 'betapassword': password})
if resp.eresult != EResult.OK:
raise ValueError("Failed password check. %r" % EResult(resp.eresult))
for entry in resp.betapasswords:
self.beta_passwords[(app_id, entry.betaname.lower())] = unhexlify(entry.betapassword)
def get_manifests(self, app_id, branch='public', password=None, filter_func=None):
if app_id not in self.app_depots:
self.app_depots[app_id] = self.steam.get_product_info([app_id])['apps'][app_id]['depots']
depots = self.app_depots[app_id]
is_enc_branch = False
if branch not in depots['branches']:
raise ValueError("No branch named %s for app_id %s" % (repr(branch), app_id))
elif int(depots['branches'][branch].get('pwdrequired', 0)) > 0:
raise NotImplementedError("Password protected branches are not supported yet")
is_enc_branch = True
if (app_id, branch) not in self.beta_passwords:
if not password:
raise ValueError("Branch %r requires a password" % branch)
self.check_beta_password(app_id, password)
def async_fetch_manifest(app_id, depot_id, manifest_id, name):
manifest = self.get_manifest(app_id, depot_id, manifest_id)
@ -274,12 +296,24 @@ class CDNClient(object):
))
continue
# process depot, and get manifest for branch
if branch in depot_info.get('manifests', {}):
if is_enc_branch:
egid = depot_info.get('encryptedmanifests', {}).get(branch, {}).get('encrypted_gid_2')
if egid is not None:
manifest_gid = decrypt_manifest_gid_2(unhexlify(egid),
self.beta_passwords[(app_id, branch)])
else:
manifest_gid = depot_info.get('manifests', {}).get('public')
else:
manifest_gid = depot_info.get('manifests', {}).get(branch)
if manifest_gid is not None:
tasks.append(gpool.spawn(async_fetch_manifest,
app_id,
depot_id,
depot_info['manifests'][branch],
manifest_gid,
depot_info['name'],
))
@ -302,8 +336,8 @@ class CDNClient(object):
return manifests
def iter_files(self, app_id, filename_filter=None, branch='public', filter_func=None):
for manifest in self.get_manifests(app_id, branch, filter_func):
def iter_files(self, app_id, filename_filter=None, branch='public', password=None, filter_func=None):
for manifest in self.get_manifests(app_id, branch, password, filter_func):
for fp in manifest.iter_files(filename_filter):
yield fp

6
steam/core/crypto.py

@ -48,6 +48,9 @@ def symmetric_encrypt(message, key):
iv = random_bytes(BS)
return symmetric_encrypt_with_iv(message, key, iv)
def symmetric_encrypt_ecb(message, key):
return AES.new(key, AES.MODE_ECB).encrypt(pad(message))
def symmetric_encrypt_HMAC(message, key, hmac_secret):
prefix = random_bytes(3)
hmac = hmac_sha1(hmac_secret, prefix + message)
@ -66,6 +69,9 @@ def symmetric_decrypt(cyphertext, key):
iv = symmetric_decrypt_iv(cyphertext, key)
return symmetric_decrypt_with_iv(cyphertext, key, iv)
def symmetric_decrypt_ecb(cyphertext, key):
return unpad(AES.new(key, AES.MODE_ECB).decrypt(cyphertext))
def symmetric_decrypt_HMAC(cyphertext, key, hmac_secret):
""":raises: :class:`RuntimeError` when HMAC verification fails"""
iv = symmetric_decrypt_iv(cyphertext, key)

9
tests/test_core_crypto.py

@ -54,6 +54,15 @@ class crypto_testcase(unittest.TestCase):
self.assertEqual(message, dmessage)
def test_encryption_ecb(self):
message = b'My secret message'
key = b'9' * 32
cyphertext = crypto.symmetric_encrypt_ecb(message, key)
dmessage = crypto.symmetric_decrypt_ecb(cyphertext, key)
self.assertEqual(message, dmessage)
def test_encryption_hmac(self):
message = b'My secret message'
key = b'9' * 32

Loading…
Cancel
Save