Browse Source

Merge c16b85c842 into dca87e07cc

pull/288/merge
Tom Prince 5 years ago
committed by GitHub
parent
commit
dac6368234
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      requirements.txt
  2. 2
      setup.py
  3. 79
      steam/core/crypto.py

2
requirements.txt

@ -1,5 +1,5 @@
six>=1.10.0
pycryptodomex>=3.7.0
cryptography>=3.1
requests>=2.9.1
vdf>=3.3
gevent>=1.3.0

2
setup.py

@ -16,7 +16,7 @@ with open(path.join(here, 'steam/__init__.py'), encoding='utf-8') as f:
install_requires = [
'six>=1.10',
'pycryptodomex>=3.7.0',
'cryptography>=3.1',
'requests>=2.9.1',
'vdf>=3.3',
'cachetools>=3.0.0',

79
steam/core/crypto.py

@ -1,21 +1,24 @@
"""
All function in this module take and return :class:`bytes`
"""
import hashlib
import sys
from base64 import b64decode
from os import urandom as random_bytes
from struct import pack
from base64 import b64decode
from Cryptodome.Hash import SHA1, HMAC
from Cryptodome.PublicKey.RSA import import_key as rsa_import_key, construct as rsa_construct
from Cryptodome.Cipher import PKCS1_OAEP, PKCS1_v1_5
from Cryptodome.Cipher import AES as AES
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.padding import PKCS7
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives.hmac import HMAC
from cryptography.hazmat.primitives.serialization import load_der_public_key
class UniverseKey(object):
"""Public keys for Universes"""
Public = rsa_import_key(b64decode("""
Public = load_der_public_key(b64decode("""
MIGdMA0GCSqGSIb3DQEBAQUAA4GLADCBhwKBgQDf7BrWLBBmLBc1OhSwfFkRf53T
2Ct64+AVzRkeRuh7h3SiGEYxqQMUeYKO6UWiSRKpI2hzic9pobFhRr3Bvr/WARvY
gdTckPv+T1JzZsuVcNfFjrocejN1oWI0Rrtgt4Bo+hOneoo3S57G9F1fOpn5nsQ6
@ -39,8 +42,14 @@ def generate_session_key(hmac_secret=b''):
:rtype: :class:`tuple`
"""
session_key = random_bytes(32)
encrypted_session_key = PKCS1_OAEP.new(UniverseKey.Public, SHA1)\
.encrypt(session_key + hmac_secret)
encrypted_session_key = UniverseKey.Public.encrypt(
session_key + hmac_secret,
padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA1()),
algorithm=hashes.SHA256(),
label=None
)
)
return (session_key, encrypted_session_key)
@ -49,7 +58,13 @@ def symmetric_encrypt(message, key):
return symmetric_encrypt_with_iv(message, key, iv)
def symmetric_encrypt_ecb(message, key):
return AES.new(key, AES.MODE_ECB).encrypt(pad(message))
padder = PKCS7(algorithms.AES.block_size).padder()
plaintext = padder.update(message)
plaintext += padder.finalize()
encryptor = Cipher(algorithms.AES(key), modes.ECB()).encryptor()
cyphertext = encryptor.update(plaintext)
cyphertext += encryptor.finalize()
return cyphertext
def symmetric_encrypt_HMAC(message, key, hmac_secret):
prefix = random_bytes(3)
@ -58,11 +73,19 @@ def symmetric_encrypt_HMAC(message, key, hmac_secret):
return symmetric_encrypt_with_iv(message, key, iv)
def symmetric_encrypt_iv(iv, key):
return AES.new(key, AES.MODE_ECB).encrypt(iv)
encryptor = Cipher(algorithms.AES(key), modes.ECB()).encryptor()
cyphertext = encryptor.update(iv)
cyphertext += encryptor.finalize()
return cyphertext
def symmetric_encrypt_with_iv(message, key, iv):
encrypted_iv = symmetric_encrypt_iv(iv, key)
cyphertext = AES.new(key, AES.MODE_CBC, iv).encrypt(pad(message))
padder = PKCS7(algorithms.AES.block_size).padder()
plaintext = padder.update(message)
plaintext += padder.finalize()
encryptor = Cipher(algorithms.AES(key), modes.CBC(iv)).encryptor()
cyphertext = encryptor.update(plaintext)
cyphertext += encryptor.finalize()
return encrypted_iv + cyphertext
def symmetric_decrypt(cyphertext, key):
@ -70,7 +93,13 @@ def symmetric_decrypt(cyphertext, key):
return symmetric_decrypt_with_iv(cyphertext, key, iv)
def symmetric_decrypt_ecb(cyphertext, key):
return unpad(AES.new(key, AES.MODE_ECB).decrypt(cyphertext))
decryptor = Cipher(algorithms.AES(key), modes.ECB()).decryptor()
plaintext = decryptor.update(cyphertext)
plaintext += decryptor.finalize()
unpadder = PKCS7(algorithms.AES.block_size).unpadder()
message = unpadder.update(plaintext)
message += unpadder.finalize()
return message
def symmetric_decrypt_HMAC(cyphertext, key, hmac_secret):
""":raises: :class:`RuntimeError` when HMAC verification fails"""
@ -85,19 +114,33 @@ def symmetric_decrypt_HMAC(cyphertext, key, hmac_secret):
return message
def symmetric_decrypt_iv(cyphertext, key):
return AES.new(key, AES.MODE_ECB).decrypt(cyphertext[:BS])
decryptor = Cipher(algorithms.AES(key), modes.ECB()).decryptor()
iv = decryptor.update(cyphertext[:BS])
iv += decryptor.finalize()
return iv
def symmetric_decrypt_with_iv(cyphertext, key, iv):
return unpad(AES.new(key, AES.MODE_CBC, iv).decrypt(cyphertext[BS:]))
decryptor = Cipher(algorithms.AES(key), modes.CBC(iv)).decryptor()
plaintext = decryptor.update(cyphertext[BS:])
plaintext += decryptor.finalize()
unpadder = PKCS7(algorithms.AES.block_size).unpadder()
message = unpadder.update(plaintext)
message += unpadder.finalize()
return message
def hmac_sha1(secret, data):
return HMAC.new(secret, data, SHA1).digest()
h = HMAC(secret, hashes.SHA1())
h.update(data)
return h.finalize()
def sha1_hash(data):
return SHA1.new(data).digest()
return hashlib.sha1(data).digest()
def rsa_publickey(mod, exp):
return rsa_construct((mod, exp))
return rsa.RSAPublicNumbers(e=exp, n=mod).public_key()
def pkcs1v15_encrypt(key, message):
return PKCS1_v1_5.new(key).encrypt(message)
key.encrypt(
message,
padding.PKCS1v15,
)

Loading…
Cancel
Save