Browse Source

Merge 3396d86aed into 2e4a3536f1

pull/34/merge
Philipp 8 years ago
committed by GitHub
parent
commit
40505a1224
  1. 86
      steam/authenticator.py
  2. 173
      steam/mobileauth.py

86
steam/authenticator.py

@ -0,0 +1,86 @@
import json
from guard import *
from mobileauth import MobileAuth
class MobileAuthenticator:
def __init__(self, username, password, authenticatorCredentials=False):
self.username = username
self.password = password
self.ready = None
self.mobile = MobileAuth(username, password)
self.credentials = authenticatorCredentials or { }
def login(self):
if self.ready != None:
return False
if 'secret' in self.credentials.keys():
code = generate_twofactor_code(self.credentials.get('secret'))
self.mobile.login(twofactor_code=code)
return True
else:
try:
self.mobile.login()
except EmailCodeRequired:
raise AuthenticatorAlreadyActive('Two factor authentication already active')
except TwoFactorCodeRequired
raise AuthenticatorAlreadyActive('Two factor authentication already active')
else:
self.ready = False
return True
def addAuthenticator(self):
if self.ready != False:
return None
data = {
'steamid': self.mobile.steamid,
'sms_phone_id': 1,
'access_token': self.mobile.oauth['oauth_token'],
'authenticator_time': get_time_offset(),
'authenticator_type': 1,
'device_identifier': generate_device_id(self.mobile.steamid)
}
response = self.mobile.request('https://api.steampowered.com/ITwoFactorService/AddAuthenticator/v1/', data)
if response.status_code == 200:
responseData = json.loads(response.text)
self.credentials = responseData['response']
self.credentials['secret'] = self.credentials['uri'].split('?secret=')[1].split('&issuer')[0]
return True
else:
return [False, responseData]
def finalizeAuthenticator(self, smsCode=None, tries=1):
if not smsCode or self.ready != False:
return None
timestamp = get_time_offset()
data = {
'steamid': self.mobile.steamid,
'access_token': self.mobile.oauth['oauth_token'],
'authenticator_time': timestamp,
'authenticator_code': generate_twofactor_code_for_time(self.credentials['secret'], timestamp),
'activation_code': smsCode
}
response = self.mobile.request('https://api.steampowered.com/ITwoFactorService/FinalizeAddAuthenticator/v1/', data)
if response.status_code == 200:
responseData = json.loads(response.text)
if responseData['success']:
return True
else:
if responseData['want_more'] and tries < 30:
return self.finalizeAuthenticator(smsCode, tries)
else:
return False
else:
return False
class MobileAuthenticatorException(Exception):
pass
class AuthenticatorAlreadyActive(MobileAuthenticatorException)
pass

173
steam/mobileauth.py

@ -0,0 +1,173 @@
# -*- coding: utf-8 -*-
from time import time
import sys
from base64 import b64encode
import requests
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPublicNumbers
from cryptography.hazmat.primitives.asymmetric.padding import PKCS1v15
from steam.core.crypto import backend
from steam.util.web import make_requests_session
from steam import SteamID
if sys.version_info < (3,):
intBase = long
else:
intBase = int
class MobileAuth(object):
key = None
complete = False #: whether authentication has been completed successfully
session = None #: :class:`requests.Session` (with auth cookies after auth is complete)
captcha_gid = -1
steamid = None #: :class:`steam.steamid.SteamID` (after auth is complete)
oauth = {}
def __init__(self, username, password):
self.__dict__.update(locals())
self.session = make_requests_session()
@property
def captcha_url(self):
if self.captcha_gid == -1:
return None
else:
return "https://store.steampowered.com/login/rendercaptcha/?gid=%s" % self.captcha_gid
def get_rsa_key(self, username):
try:
resp = self.session.post('https://steamcommunity.com/mobilelogin/getrsakey/',
timeout=15,
data={
'username': username,
'donotchache': int(time() * 1000),
},
).json()
except requests.exceptions.RequestException as e:
raise HTTPError(str(e))
return resp
def _load_key(self):
if not self.key:
resp = self.get_rsa_key(self.username)
nums = RSAPublicNumbers(intBase(resp['publickey_exp'], 16),
intBase(resp['publickey_mod'], 16),
)
self.key = backend.load_rsa_public_numbers(nums)
self.timestamp = resp['timestamp']
def request(self, uri, data):
if not self.complete:
return None
headers = {
'X-Requested-With': 'com.valvesoftware.android.steam.community',
'User-agent': 'Mozilla/5.0 (Linux; U; Android 4.1.1; en-us; Google Nexus 4 - 4.1.1 - API 16 - 768x1280 Build/JRO03S) AppleWebKit/534.30 (KHTML, like Gecko) Version/4.0 Mobile Safari/534.30'
}
try:
response = self.session.post(uri, data=data, headers=headers)
except requests.exceptions.RequestException as e:
raise HTTPError(str(e))
else:
return response
def refreshSession(self, oauth_token=None):
oauth_token = oauth_token or self.oauth['oauth_token']
response = self.request('https://api.steampowered.com/IMobileAuthService/GetWGToken/v0001', {'access_token': oauth_token})
try:
data = json.loads(response)
except Exception, e:
raise RefreshSessionFailed(str(e))
else:
self.oauth['wgtoken'] = data['response']['token']
self.oauth['wgtoken_secure'] = data['response']['token_secure']
self.session.cookies.set('steamLogin', '%s||%s' % (self.steamid, sself.oauth['wgtoken']), domain=domain, secure=False)
self.session.cookies.set('steamLoginSecure', '%s||%s' % (self.steamid, self.oauth['wgtoken_secure']), domain=domain, secure=True)
def login(self, captcha='', email_code='', twofactor_code='', language='english'):
if self.complete:
return self.session
for domain in ['store.steampowered.com', 'help.steampowered.com', 'steamcommunity.com']:
self.session.cookies.set('forceMobile', '1', domain=domain, secure=False)
self.session.cookies.set('mobileClientVersion', '0 (2.1.3)', domain=domain, secure=False)
self.session.cookies.set('mobileClient', 'android', domain=domain, secure=False)
self.session.cookies.set('Steam_Language', 'english', domain=domain, secure=False)
self.session.cookies.set('dob', '', domain=domain, secure=False)
self._load_key()
data = {
'username' : self.username,
"password": b64encode(self.key.encrypt(self.password.encode('ascii'), PKCS1v15())),
"emailauth": email_code,
"emailsteamid": str(self.steamid) if email_code else '',
"twofactorcode": twofactor_code,
"captchagid": self.captcha_gid,
"captcha_text": captcha,
"loginfriendlyname": "python-steam webauth",
"rsatimestamp": self.timestamp,
"remember_login": 'true',
"donotcache": int(time() * 100000),
}
data['oauth_client_id'] = 'DE45CD61'
data['oauth_scope'] = 'read_profile write_profile read_client write_client'
data['loginfriendlyname'] = '#login_emailauth_friendlyname_mobile'
try:
resp = self.session.post('https://steamcommunity.com/mobilelogin/dologin/', data=data, timeout=15).json()
except requests.exceptions.RequestException as e:
raise HTTPError(str(e))
self.captcha_gid = -1
if resp['success'] and resp['login_complete']:
self.complete = True
self.password = None
self.steamid = SteamID(resp['oauth']['steamid'])
self.oauth = resp['oauth']
for domain in ['store.steampowered.com', 'help.steampowered.com', 'steamcommunity.com']:
self.session.cookies.set('steamLogin', '%s||%s' % (self.steamid, resp['oauth']['wgtoken']), domain=domain, secure=False)
self.session.cookies.set('steamLoginSecure', '%s||%s' % (self.steamid, data['oauth']['wgtoken_secure']), domain=domain, secure=True)
return resp
else:
if resp.get('captcha_needed', False):
self.captcha_gid = resp['captcha_gid']
raise CaptchaRequired(resp['message'])
elif resp.get('emailauth_needed', False):
self.steamid = SteamID(resp['emailsteamid'])
raise EmailCodeRequired(resp['message'])
elif resp.get('requires_twofactor', False):
raise TwoFactorCodeRequired(resp['message'])
else:
raise LoginIncorrect(resp['message'])
return None
class MobileWebAuthException(Exception):
pass
class HTTPError(MobileWebAuthException):
pass
class LoginIncorrect(MobileWebAuthException):
pass
class CaptchaRequired(MobileWebAuthException):
pass
class EmailCodeRequired(MobileWebAuthException):
pass
class TwoFactorCodeRequired(MobileWebAuthException):
pass
class RefreshSessionFailed(MobileWebAuthException):
pass
Loading…
Cancel
Save