Browse Source

migrate from PyCrypto to cryptograhy #17

pull/34/head
Rossen Georgiev 9 years ago
parent
commit
c16ed4c773
  1. 2
      requirements.txt
  2. 2
      setup.py
  3. 6
      steam/client/__init__.py
  4. 7
      steam/client/builtins/web.py
  5. 65
      steam/core/crypto.py
  6. 25
      steam/webauth.py
  7. 23
      tests/test_core_crypto.py

2
requirements.txt

@ -5,7 +5,7 @@ gevent==1.1.0
mock==1.3.0 mock==1.3.0
nose==1.3.7 nose==1.3.7
protobuf==2.6.1 protobuf==2.6.1
pycrypto==2.6.1 cryptography==1.3
PyYAML==3.11 PyYAML==3.11
requests==2.9.1 requests==2.9.1
vcrpy==1.7.4 vcrpy==1.7.4

2
setup.py

@ -12,7 +12,7 @@ with open(path.join(here, 'steam/__init__.py'), encoding='utf-8') as f:
__version__ = f.readline().split('"')[1] __version__ = f.readline().split('"')[1]
install_requires = [ install_requires = [
'pycrypto>=2.6.1', 'cryptography>=1.3',
'requests>=2.9.1', 'requests>=2.9.1',
'vdf>=2.0', 'vdf>=2.0',
] ]

6
steam/client/__init__.py

@ -5,7 +5,7 @@ import gevent.monkey
gevent.monkey.patch_socket() gevent.monkey.patch_socket()
gevent.monkey.patch_ssl() gevent.monkey.patch_ssl()
from Crypto.Hash import SHA from steam.core.crypto import sha1_hash
from eventemitter import EventEmitter from eventemitter import EventEmitter
from steam.enums.emsg import EMsg from steam.enums.emsg import EMsg
from steam.enums import EResult, EOSType, EPersonaState from steam.enums import EResult, EOSType, EPersonaState
@ -120,7 +120,7 @@ class SteamClient(CMClient, BuiltinBase):
resp.body.filename = message.body.filename resp.body.filename = message.body.filename
resp.body.eresult = EResult.OK resp.body.eresult = EResult.OK
resp.body.sha_file = SHA.new(message.body.bytes).digest() resp.body.sha_file = sha1_hash(message.body.bytes)
resp.body.getlasterror = 0 resp.body.getlasterror = 0
resp.body.offset = message.body.offset resp.body.offset = message.body.offset
resp.body.cubwrote = message.body.cubtowrite resp.body.cubwrote = message.body.cubtowrite
@ -336,7 +336,7 @@ class SteamClient(CMClient, BuiltinBase):
message.body.eresult_sentryfile = EResult.FileNotFound message.body.eresult_sentryfile = EResult.FileNotFound
else: else:
message.body.eresult_sentryfile = EResult.OK message.body.eresult_sentryfile = EResult.OK
message.body.sha_sentryfile = SHA.new(sentry).digest() message.body.sha_sentryfile = sha1_hash(sentry)
if auth_code: if auth_code:
message.body.auth_code = auth_code message.body.auth_code = auth_code

7
steam/client/builtins/web.py

@ -1,10 +1,9 @@
""" """
Web related features Web related features
""" """
from Crypto.Hash import SHA from binascii import hexlify
from Crypto.Random import new as randombytes
from steam import WebAPI from steam import WebAPI
from steam.core.crypto import generate_session_key, symmetric_encrypt from steam.core.crypto import generate_session_key, symmetric_encrypt, sha1_hash, random_bytes
from steam.util.web import make_requests_session from steam.util.web import make_requests_session
@ -40,7 +39,7 @@ class Web(object):
return None return None
return { return {
'sessionid': SHA.new(randombytes().read(32)).hexdigest(), 'sessionid': hexlify(sha1_hash(random_bytes(32))),
'steamLogin': resp['authenticateuser']['token'], 'steamLogin': resp['authenticateuser']['token'],
'steamLoginSecure': resp['authenticateuser']['tokensecure'], 'steamLoginSecure': resp['authenticateuser']['tokensecure'],
} }

65
steam/core/crypto.py

@ -1,19 +1,26 @@
import sys import sys
from os import urandom as random_bytes
from struct import pack from struct import pack
from base64 import b64decode from base64 import b64decode
from Crypto import Random from cryptography.hazmat.primitives.hmac import HMAC
from Crypto.Cipher import PKCS1_OAEP, AES from cryptography.hazmat.primitives.hashes import Hash, SHA1
from Crypto.PublicKey import RSA from cryptography.hazmat.primitives.asymmetric.padding import PSS, OAEP, MGF1
from Crypto.Hash import HMAC, SHA from cryptography.hazmat.primitives.ciphers import Cipher
from cryptography.hazmat.primitives.ciphers.algorithms import AES
public_key = """ from cryptography.hazmat.primitives.ciphers.modes import CBC, ECB
import cryptography.hazmat.backends
backend = cryptography.hazmat.backends.default_backend()
class UniverseKey(object):
Public = backend.load_der_public_key(b64decode("""
MIGdMA0GCSqGSIb3DQEBAQUAA4GLADCBhwKBgQDf7BrWLBBmLBc1OhSwfFkRf53T MIGdMA0GCSqGSIb3DQEBAQUAA4GLADCBhwKBgQDf7BrWLBBmLBc1OhSwfFkRf53T
2Ct64+AVzRkeRuh7h3SiGEYxqQMUeYKO6UWiSRKpI2hzic9pobFhRr3Bvr/WARvY 2Ct64+AVzRkeRuh7h3SiGEYxqQMUeYKO6UWiSRKpI2hzic9pobFhRr3Bvr/WARvY
gdTckPv+T1JzZsuVcNfFjrocejN1oWI0Rrtgt4Bo+hOneoo3S57G9F1fOpn5nsQ6 gdTckPv+T1JzZsuVcNfFjrocejN1oWI0Rrtgt4Bo+hOneoo3S57G9F1fOpn5nsQ6
6WOiu4gZKODnFMBCiQIBEQ== 6WOiu4gZKODnFMBCiQIBEQ==
""" """))
BS = AES.block_size BS = 16
pad = lambda s: s + (BS - len(s) % BS) * pack('B', BS - len(s) % BS) pad = lambda s: s + (BS - len(s) % BS) * pack('B', BS - len(s) % BS)
if sys.version_info < (3,): if sys.version_info < (3,):
@ -29,29 +36,35 @@ def generate_session_key(hmac_secret=b''):
:return: (session_key, encrypted_session_key) tuple :return: (session_key, encrypted_session_key) tuple
:rtype: :class:`tuple` :rtype: :class:`tuple`
""" """
session_key = Random.new().read(32) session_key = random_bytes(32)
cipher = PKCS1_OAEP.new(RSA.importKey(b64decode(public_key))) encrypted_session_key = UniverseKey.Public.encrypt(session_key + hmac_secret,
encrypted_session_key = cipher.encrypt(session_key + hmac_secret) OAEP(MGF1(SHA1()), SHA1(), None)
)
return (session_key, encrypted_session_key) return (session_key, encrypted_session_key)
def symmetric_encrypt(message, key): def symmetric_encrypt(message, key):
iv = Random.new().read(BS) iv = random_bytes(BS)
return symmetric_encrypt_with_iv(message, key, iv) return symmetric_encrypt_with_iv(message, key, iv)
def symmetric_encrypt_HMAC(message, key, hmac_secret): def symmetric_encrypt_HMAC(message, key, hmac_secret):
random_bytes = Random.new().read(3) prefix = random_bytes(3)
hmac = HMAC.new(hmac_secret, digestmod=SHA) hmac = HMAC(hmac_secret, SHA1(), backend)
hmac.update(random_bytes) hmac.update(prefix)
hmac.update(message) hmac.update(message)
iv = hmac.digest()[:13] + random_bytes iv = hmac.finalize()[:13] + prefix
return symmetric_encrypt_with_iv(message, key, iv) return symmetric_encrypt_with_iv(message, key, iv)
def symmetric_encrypt_iv(iv, key):
encryptor = Cipher(AES(key), ECB(), backend).encryptor()
return encryptor.update(iv) + encryptor.finalize()
def symmetric_encrypt_with_iv(message, key, iv): def symmetric_encrypt_with_iv(message, key, iv):
encrypted_iv = AES.new(key, AES.MODE_ECB).encrypt(iv) encrypted_iv = symmetric_encrypt_iv(iv, key)
cyphertext = AES.new(key, AES.MODE_CBC, iv).encrypt(pad(message)) encryptor = Cipher(AES(key), CBC(iv), backend).encryptor()
cyphertext = encryptor.update(pad(message)) + encryptor.finalize()
return encrypted_iv + cyphertext return encrypted_iv + cyphertext
def symmetric_decrypt(cyphertext, key): def symmetric_decrypt(cyphertext, key):
@ -63,18 +76,24 @@ def symmetric_decrypt_HMAC(cyphertext, key, hmac_secret):
iv = symmetric_decrypt_iv(cyphertext, key) iv = symmetric_decrypt_iv(cyphertext, key)
message = symmetric_decrypt_with_iv(cyphertext, key, iv) message = symmetric_decrypt_with_iv(cyphertext, key, iv)
hmac = HMAC.new(hmac_secret, digestmod=SHA) hmac = HMAC(hmac_secret, SHA1(), backend)
hmac.update(iv[-3:]) hmac.update(iv[-3:])
hmac.update(message) hmac.update(message)
if iv[:13] != hmac.digest()[:13]: if iv[:13] != hmac.finalize()[:13]:
raise RuntimeError("Unable to decrypt message. HMAC does not match.") raise RuntimeError("Unable to decrypt message. HMAC does not match.")
return message return message
def symmetric_decrypt_iv(cyphertext, key): def symmetric_decrypt_iv(cyphertext, key):
return AES.new(key, AES.MODE_ECB).decrypt(cyphertext[:BS]) decryptor = Cipher(AES(key), ECB(), backend).decryptor()
return decryptor.update(cyphertext[:BS]) + decryptor.finalize()
def symmetric_decrypt_with_iv(cyphertext, key, iv): def symmetric_decrypt_with_iv(cyphertext, key, iv):
message = AES.new(key, AES.MODE_CBC, iv).decrypt(cyphertext[BS:]) decryptor = Cipher(AES(key), CBC(iv), backend).decryptor()
return unpad(message) return unpad(decryptor.update(cyphertext[BS:]) + decryptor.finalize())
def sha1_hash(data):
sha = Hash(SHA1(), backend)
sha.update(data)
return sha.finalize()

25
steam/webauth.py

@ -41,8 +41,11 @@ Alternatively, if Steam Guard is not enabled on the account:
import time import time
import sys import sys
from base64 import b64encode from base64 import b64encode
from Crypto.PublicKey import RSA
from Crypto.Cipher import PKCS1_v1_5 from cryptography.hazmat.primitives.asymmetric.rsa import RSAPublicNumbers
from cryptography.hazmat.primitives.asymmetric.padding import PKCS1v15
from steam.core.crypto import backend
from steam.util.web import make_requests_session from steam.util.web import make_requests_session
from steam import SteamID from steam import SteamID
@ -53,7 +56,7 @@ else:
class WebAuth(object): class WebAuth(object):
cipher = None key = None
complete = False #: whether authentication has been completed successfully complete = False #: whether authentication has been completed successfully
session = None #: :class:`requests.Session` (with auth cookies after auth is complete) session = None #: :class:`requests.Session` (with auth cookies after auth is complete)
captcha_gid = -1 captcha_gid = -1
@ -93,15 +96,15 @@ class WebAuth(object):
return resp return resp
def _make_cipher(self): def _load_key(self):
if not self.cipher: if not self.key:
resp = self.get_rsa_key(self.username) resp = self.get_rsa_key(self.username)
rsa = RSA.construct((intBase(resp['publickey_mod'], 16), nums = RSAPublicNumbers(intBase(resp['publickey_exp'], 16),
intBase(resp['publickey_exp'], 16), intBase(resp['publickey_mod'], 16),
)) )
self.cipher = PKCS1_v1_5.new(rsa) self.key = backend.load_rsa_public_numbers(nums)
self.timestamp = resp['timestamp'] self.timestamp = resp['timestamp']
def login(self, captcha='', email_code='', twofactor_code='', language='english'): def login(self, captcha='', email_code='', twofactor_code='', language='english'):
@ -126,11 +129,11 @@ class WebAuth(object):
if self.complete: if self.complete:
return self.session return self.session
self._make_cipher() self._load_key()
params = { params = {
'username' : self.username, 'username' : self.username,
"password": b64encode(self.cipher.encrypt(self.password)), "password": b64encode(self.key.encrypt(self.password, PKCS1v15())),
"emailauth": email_code, "emailauth": email_code,
"emailsteamid": str(self.steamid) if email_code else '', "emailsteamid": str(self.steamid) if email_code else '',
"twofactorcode": twofactor_code, "twofactorcode": twofactor_code,

23
tests/test_core_crypto.py

@ -7,18 +7,15 @@ from steam.core import crypto
class crypto_testcase(unittest.TestCase): class crypto_testcase(unittest.TestCase):
def setUp(self): def setUp(self):
class NotRandom: patcher = mock.patch('os.urandom')
def read(self, n): self.addCleanup(patcher.stop)
return b'1' * n self.urandom = patcher.start()
self.urandom.side_effect = lambda n: b'1' * n
def fakeNew(): patcher = mock.patch('steam.core.crypto.random_bytes')
return NotRandom() self.addCleanup(patcher.stop)
self.random_bytes = patcher.start()
self._oldnew = crypto.Random.new self.random_bytes.side_effect = lambda n: b'1' * n
crypto.Random.new = fakeNew
def tearDown(self):
crypto.Random.new = self._oldnew
def test_keygen(self): def test_keygen(self):
expected_key = b'1' * 32 expected_key = b'1' * 32
@ -70,4 +67,6 @@ class crypto_testcase(unittest.TestCase):
with self.assertRaises(RuntimeError): with self.assertRaises(RuntimeError):
crypto.symmetric_decrypt_HMAC(cyphertext, key, b'4'*16) crypto.symmetric_decrypt_HMAC(cyphertext, key, b'4'*16)
def test_sha1_hash(self):
self.assertEqual(crypto.sha1_hash(b'123'), b'@\xbd\x00\x15c\x08_\xc3Qe2\x9e\xa1\xff\\^\xcb\xdb\xbe\xef')
self.assertEqual(crypto.sha1_hash(b'999999'), b'\x1fU#\xa8\xf55(\x9b4\x01\xb2\x99X\xd0\x1b)f\xeda\xd2')

Loading…
Cancel
Save