Browse Source

Cleanup

pull/35/head
Philipp Joos 9 years ago
parent
commit
253dcdd4cb
  1. 1
      requirements.txt
  2. 174
      steam/authenticator.py
  3. 4
      steam/guard.py
  4. 220
      steam/mobile.py
  5. 116
      steam/mobileauth.py
  6. 23
      steam/webauth.py

1
requirements.txt

@ -1,4 +1,5 @@
coverage==4.0.3 coverage==4.0.3
setuptools>=11.3
gevent-eventemitter==1.4 gevent-eventemitter==1.4
enum34==1.1.2; python_version < '3.4' enum34==1.1.2; python_version < '3.4'
gevent==1.1.0 gevent==1.1.0

174
steam/authenticator.py

@ -1,86 +1,98 @@
"""
This module provides methods for managing the mobile authenticator
.. warning::
Save your credentials!
Example usage:
.. code:: python
import steam.authenticator
ma = steam.authenticator.MobileAuthenticator('username', 'password')
credentials = ma.add_authenticator()
sms_code = raw_input('SMS Code: ')
ma.finalize_authenticator(sms_code)
print credentials
"""
import json import json
from guard import * from guard import *
from mobileauth import MobileAuth from mobile import SteamMobile
class MobileAuthenticator:
def __init__(self, username, password, authenticatorCredentials=False): class MobileAuthenticator(object):
self.username = username mobile = None
self.password = password
self.ready = None def __init__(self, username, password, login_mode='normal', email_code='', twofactor_code=''):
self.mobile = MobileAuth(username, password) self.mobile = SteamMobile(username, password, login_mode, email_code, twofactor_code)
self.credentials = authenticatorCredentials or { }
def add_authenticator(self):
def login(self): """Start process of linking a mobile authenticator to the logged in steam account
if self.ready != None:
return False :return: account credentials or False
:rtype: :class:`tuple`, :class:`bool`
if 'secret' in self.credentials.keys(): """
code = generate_twofactor_code(self.credentials.get('secret'))
self.mobile.login(twofactor_code=code) data = {
return True 'steamid': self.mobile.steamid,
else: 'sms_phone_id': 1,
try: 'access_token': self.mobile.oauth['oauth_token'],
self.mobile.login() 'authenticator_time': get_time_offset(),
except EmailCodeRequired: 'authenticator_type': 1,
raise AuthenticatorAlreadyActive('Two factor authentication already active') 'device_identifier': generate_device_id(self.mobile.steamid)
except TwoFactorCodeRequired }
raise AuthenticatorAlreadyActive('Two factor authentication already active')
else: [status, body] = self.mobile.api_request('ITwoFactorService', 'AddAuthenticator', data,
self.ready = False return_including_status_code=True)
return True if status == 200:
responseData = json.loads(body)
def addAuthenticator(self): self.credentials = responseData['response']
if self.ready != False: self.credentials['secret'] = self.credentials['uri'].split('?secret=')[1].split('&issuer')[0]
return None return responseData
data = {
'steamid': self.mobile.steamid,
'sms_phone_id': 1,
'access_token': self.mobile.oauth['oauth_token'],
'authenticator_time': get_time_offset(),
'authenticator_type': 1,
'device_identifier': generate_device_id(self.mobile.steamid)
}
response = self.mobile.request('https://api.steampowered.com/ITwoFactorService/AddAuthenticator/v1/', data)
if response.status_code == 200:
responseData = json.loads(response.text)
self.credentials = responseData['response']
self.credentials['secret'] = self.credentials['uri'].split('?secret=')[1].split('&issuer')[0]
return True
else:
return [False, responseData]
def finalizeAuthenticator(self, smsCode=None, tries=1):
if not smsCode or self.ready != False:
return None
timestamp = get_time_offset()
data = {
'steamid': self.mobile.steamid,
'access_token': self.mobile.oauth['oauth_token'],
'authenticator_time': timestamp,
'authenticator_code': generate_twofactor_code_for_time(self.credentials['secret'], timestamp),
'activation_code': smsCode
}
response = self.mobile.request('https://api.steampowered.com/ITwoFactorService/FinalizeAddAuthenticator/v1/', data)
if response.status_code == 200:
responseData = json.loads(response.text)
if responseData['success']:
return True
else:
if responseData['want_more'] and tries < 30:
return self.finalizeAuthenticator(smsCode, tries)
else: else:
return False return False
else:
return False def finalize_authenticator(self, sms_code=None, tries=1):
"""Start process of linking a mobile authenticator to the logged in steam account
class MobileAuthenticatorException(Exception):
pass :param sms_code: text reponse recieved by sms
:type sms_code: :class:`str`
:return: :class:`None` it no sms code is supplied, `True` or `False`
:rtype: :class:`None`, :class:`bool`
"""
class AuthenticatorAlreadyActive(MobileAuthenticatorException) if not sms_code:
pass return None
timestamp = get_time_offset()
data = {
'steamid': self.mobile.steamid,
'access_token': self.mobile.oauth['oauth_token'],
'authenticator_time': timestamp,
'authenticator_code': generate_twofactor_code_for_time(self.credentials['shared_secret'], timestamp),
'activation_code': sms_code
}
[status, body] = self.mobile.api_request('ITwoFactorService', 'FinalizeAddAuthenticator', data,
return_including_status_code=True)
if status == 200:
responseData = json.loads(body)['response']
if responseData['success']:
return True
else:
if responseData['want_more'] and tries < 30:
return self.finalizeAuthenticator(sms_code, tries)
else:
return False
else:
return False
class MobileAuthenticatorException(Exception):
pass

4
steam/guard.py

@ -1,9 +1,11 @@
import struct import struct
import hashlib
from binascii import hexlify from binascii import hexlify
from time import time from time import time
from steam import webapi from steam import webapi
from steam.core.crypto import hmac_sha1, sha1_hash from steam.core.crypto import hmac_sha1, sha1_hash
def generate_twofactor_code(shared_secret): def generate_twofactor_code(shared_secret):
"""Generate Steam 2FA code for login with current time """Generate Steam 2FA code for login with current time
@ -84,5 +86,5 @@ def generate_device_id(steamid):
:return: android device id :return: android device id
:rtype: str :rtype: str
""" """
h = hexlify(sha1(str(steamid).encode('ascii'))).decode('ascii') h = hexlify(str(hashlib.sha1(str(steamid).encode('ascii')))).decode('ascii')
return "android:%s-%s-%s-%s-%s" % (h[:8], h[8:12], h[12:16], h[16:20], h[20:32]) return "android:%s-%s-%s-%s-%s" % (h[:8], h[8:12], h[12:16], h[16:20], h[20:32])

220
steam/mobile.py

@ -0,0 +1,220 @@
# -*- coding: utf-8 -*-
"""
This module provides utility functions to access steam mobile pages
"""
import json
import requests
import mobileauth
API_ENDPOINTS = {
"IMobileAuthService": {
"methods": {
"GetWGToken": {
"version": 1
}
}
},
"ISteamWebUserPresenceOAuth": {
"methods": {
"Logon": {
"version": 1
},
"Logoff": {
"version": 1
},
"Message": {
"version": 1
},
"DeviceInfo": {
"version": 1
},
"Poll": {
"version": 1
}
}
},
"ISteamUserOAuth": {
"methods": {
"GetUserSummaries": {
"version": 1
},
"GetGroupSummaries": {
"version": 1
},
"GetGroupList": {
"version": 1
},
"GetFriendList": {
"version": 1
},
"Search": {
"version": 1
}
}
},
"ISteamGameOAuth": {
"methods": {
"GetAppInfo": {
"version": 1
}
}
},
"IMobileNotificationService": {
"methods": {
"SwitchSessionToPush": {
"version": 1
}
}
},
"IFriendMessagesService": {
"methods": {
"GetActiveMessageSessions": {
"version": 1
},
"GetRecentMessages": {
"version": 1
},
"MarkOfflineMessagesRead": {
"version": 1
}
}
},
"ITwoFactorService": {
"methods": {
"AddAuthenticator": {
"version": 1
},
"RecoverAuthenticatorCommit": {
"version": 1
},
"RecoverAuthenticatorContinue": {
"version": 1
},
"RemoveAuthenticator": {
"version": 1
},
"RemoveAuthenticatorViaChallengeStart": {
"version": 1
},
"RemoveAuthenticatorViaChallengeContinue": {
"version": 1
},
"FinalizeAddAuthenticator": {
"version": 1
},
"QueryStatus": {
"version": 1
},
"QueryTime": {
"version": 1
},
"QuerySecrets": {
"version": 1
},
"SendEmail": {
"version": 1
},
"ValidateToken": {
"version": 1
},
"CreateEmergencyCodes": {
"version": 1
},
"DestroyEmergencyCodes": {
"version": 1
}
}
}
}
API_BASE_URL = 'https://api.steampowered.com'
class SteamMobile(object):
session = None
oauth = None
steamid = None
def __init__(self, username, password, login_mode='normal', email_code='', twofactor_code=''):
mobile_auth = mobileauth.MobileAuth(username, password)
try:
if login_mode == 'normal':
mobile_auth.login()
elif login_mode == 'email':
mobile_auth.login(email_code=email_code)
elif login_mode == 'twofa':
mobile_auth.login(twofactor_code=twofactor_code)
except mobileauth.CaptchaRequired:
raise CaptchaNotSupported("Captcha's are currently not supported. Please wait a few minutes before you try to login again.")
except mobileauth.EmailCodeRequired:
mobile_auth.login(email_code=email_code)
except mobileauth.TwoFactorCodeRequired:
mobile_auth.login(twofactor_code=twofactor_code)
self.session = mobile_auth.session
self.steamid = mobile_auth.steamid
self.oauth = mobile_auth.oauth
def refresh_session(self, oauth_token=None):
oauth_token = oauth_token or self.oauth['oauth_token']
response = self.api_request('IMobileAuthService', 'GetWGToken', {'access_token': oauth_token})
try:
data = json.loads(response)
except Exception, e:
raise SessionRefreshFailed(str(e))
else:
self.oauth['wgtoken'] = data['response']['token']
self.oauth['wgtoken_secure'] = data['response']['token_secure']
for domain in ['store.steampowered.com', 'help.steampowered.com', 'steamcommunity.com']:
self.session.cookies.set('steamLogin', '%s||%s' % (self.steamid, self.oauth['wgtoken']), domain=domain,
secure=False)
self.session.cookies.set('steamLoginSecure', '%s||%s' % (self.steamid, self.oauth['wgtoken_secure']),
domain=domain, secure=True)
def _request(self, uri, data={}, return_including_status_code=False):
headers = {
'X-Requested-With': 'com.valvesoftware.android.steam.community',
'User-agent': 'Mozilla/5.0 (Linux; U; Android 4.1.1; en-us; Google Nexus 4 - 4.1.1 - API 16 - 768x1280 Build/JRO03S)\
AppleWebKit/534.30 (KHTML, like Gecko) Version/4.0 Mobile Safari/534.30'
}
try:
response = self.session.post(uri, data=data, headers=headers)
except requests.exceptions.RequestException as e:
raise mobileauth.HTTPError(str(e))
else:
if return_including_status_code:
return [response.status_code, response.text]
else:
return response.text
def api_request(self, interface, method, data={}, return_including_status_code=False):
if interface in API_ENDPOINTS.keys() and method in API_ENDPOINTS.get(interface).get('methods').keys():
uri = '%s/%s/%s/v%s/' % (
API_BASE_URL, interface, method, API_ENDPOINTS.get(interface).get('methods').get(method).get('version'))
response = self._request(uri, data, return_including_status_code)
return response
else:
raise APIEndpointNotFound('Endpoint %s.%s not found' % (interface, method))
class SteamMobileException(Exception):
pass
class CaptchaNotSupported(SteamMobileException):
pass
class SessionRefreshFailed(SteamMobileException):
pass
class APIEndpointNotFound(SteamMobileException):
pass

116
steam/mobileauth.py

@ -1,4 +1,54 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
"""
This module simplifies the process of obtaining an authenticated session for mobile steam websites.
After authentication is complete, a :class:`requests.Session` is created containing the auth and mobile specific cookies.
The session can be used to access ``steamcommunity.com``, ``store.steampowered.com``, and ``help.steampowered.com``.
The main purpose of a mobile session, is to access the mobile trade confirmations page and to easily access the
ITwoFactorService api, but can also used to access all 'normal' steam pages.
.. warning::
A web session may expire randomly, or when you login from different IP address.
Some pages will return status code `401` when that happens.
Keep in mind if you are trying to write robust code.
Example usage:
.. code:: python
import steam.mobileauth as ma
user = ma.WebAuth('username', 'password')
try:
user.login()
except wa.CaptchaRequired:
print user.captcha_url
# ask a human to solve captcha
user.login(captcha='ABC123')
except wa.EmailCodeRequired:
user.login(email_code='ZXC123')
except wa.TwoFactorCodeRequired:
user.login(twofactor_code='ZXC123')
user.session.get('https://store.steampowered.com/account/history/')
# OR
session = user.login()
session.get('https://store.steampowered.com/account/history')
Alternatively, if Steam Guard is not enabled on the account:
.. code:: python
try:
session = wa.WebAuth('username', 'password').login()
except wa.HTTPError:
pass
The :class:`MobileAuth` instance should be discarded once a session is obtained
as it is not reusable.
"""
import json
from time import time from time import time
import sys import sys
from base64 import b64encode from base64 import b64encode
@ -20,10 +70,11 @@ else:
class MobileAuth(object): class MobileAuth(object):
key = None key = None
complete = False #: whether authentication has been completed successfully complete = False #: whether authentication has been completed successfully
session = None #: :class:`requests.Session` (with auth cookies after auth is complete) session = None #: :class:`requests.Session` (with auth cookies after auth is complete)
captcha_gid = -1 captcha_gid = -1
steamid = None #: :class:`steam.steamid.SteamID` (after auth is complete) steamid = None #: :class:`steam.steamid.SteamID` (after auth is complete)
oauth = {} oauth = {}
def __init__(self, username, password): def __init__(self, username, password):
self.__dict__.update(locals()) self.__dict__.update(locals())
self.session = make_requests_session() self.session = make_requests_session()
@ -38,12 +89,12 @@ class MobileAuth(object):
def get_rsa_key(self, username): def get_rsa_key(self, username):
try: try:
resp = self.session.post('https://steamcommunity.com/mobilelogin/getrsakey/', resp = self.session.post('https://steamcommunity.com/mobilelogin/getrsakey/',
timeout=15, timeout=15,
data={ data={
'username': username, 'username': username,
'donotchache': int(time() * 1000), 'donotchache': int(time() * 1000),
}, },
).json() ).json()
except requests.exceptions.RequestException as e: except requests.exceptions.RequestException as e:
raise HTTPError(str(e)) raise HTTPError(str(e))
@ -59,40 +110,11 @@ class MobileAuth(object):
self.key = backend.load_rsa_public_numbers(nums) self.key = backend.load_rsa_public_numbers(nums)
self.timestamp = resp['timestamp'] self.timestamp = resp['timestamp']
def request(self, uri, data):
if not self.complete:
return None
headers = {
'X-Requested-With': 'com.valvesoftware.android.steam.community',
'User-agent': 'Mozilla/5.0 (Linux; U; Android 4.1.1; en-us; Google Nexus 4 - 4.1.1 - API 16 - 768x1280 Build/JRO03S) AppleWebKit/534.30 (KHTML, like Gecko) Version/4.0 Mobile Safari/534.30'
}
try:
response = self.session.post(uri, data=data, headers=headers)
except requests.exceptions.RequestException as e:
raise HTTPError(str(e))
else:
return response
def refreshSession(self, oauth_token=None):
oauth_token = oauth_token or self.oauth['oauth_token']
response = self.request('https://api.steampowered.com/IMobileAuthService/GetWGToken/v0001', {'access_token': oauth_token})
try:
data = json.loads(response)
except Exception, e:
raise RefreshSessionFailed(str(e))
else:
self.oauth['wgtoken'] = data['response']['token']
self.oauth['wgtoken_secure'] = data['response']['token_secure']
self.session.cookies.set('steamLogin', '%s||%s' % (self.steamid, sself.oauth['wgtoken']), domain=domain, secure=False)
self.session.cookies.set('steamLoginSecure', '%s||%s' % (self.steamid, self.oauth['wgtoken_secure']), domain=domain, secure=True)
def login(self, captcha='', email_code='', twofactor_code='', language='english'): def login(self, captcha='', email_code='', twofactor_code='', language='english'):
if self.complete: if self.complete:
return self.session return self.session
for domain in ['store.steampowered.com', 'help.steampowered.com', 'steamcommunity.com']: for domain in ['store.steampowered.com', 'help.steampowered.com', 'steamcommunity.com']:
self.session.cookies.set('forceMobile', '1', domain=domain, secure=False) self.session.cookies.set('forceMobile', '1', domain=domain, secure=False)
self.session.cookies.set('mobileClientVersion', '0 (2.1.3)', domain=domain, secure=False) self.session.cookies.set('mobileClientVersion', '0 (2.1.3)', domain=domain, secure=False)
@ -103,7 +125,7 @@ class MobileAuth(object):
self._load_key() self._load_key()
data = { data = {
'username' : self.username, 'username': self.username,
"password": b64encode(self.key.encrypt(self.password.encode('ascii'), PKCS1v15())), "password": b64encode(self.key.encrypt(self.password.encode('ascii'), PKCS1v15())),
"emailauth": email_code, "emailauth": email_code,
"emailsteamid": str(self.steamid) if email_code else '', "emailsteamid": str(self.steamid) if email_code else '',
@ -129,11 +151,14 @@ class MobileAuth(object):
if resp['success'] and resp['login_complete']: if resp['success'] and resp['login_complete']:
self.complete = True self.complete = True
self.password = None self.password = None
resp['oauth'] = json.loads(resp['oauth'])
self.steamid = SteamID(resp['oauth']['steamid']) self.steamid = SteamID(resp['oauth']['steamid'])
self.oauth = resp['oauth'] self.oauth = resp['oauth']
for domain in ['store.steampowered.com', 'help.steampowered.com', 'steamcommunity.com']: for domain in ['store.steampowered.com', 'help.steampowered.com', 'steamcommunity.com']:
self.session.cookies.set('steamLogin', '%s||%s' % (self.steamid, resp['oauth']['wgtoken']), domain=domain, secure=False) self.session.cookies.set('steamLogin', '%s||%s' % (self.steamid, resp['oauth']['wgtoken']),
self.session.cookies.set('steamLoginSecure', '%s||%s' % (self.steamid, data['oauth']['wgtoken_secure']), domain=domain, secure=True) domain=domain, secure=False)
self.session.cookies.set('steamLoginSecure', '%s||%s' % (self.steamid, resp['oauth']['wgtoken_secure']),
domain=domain, secure=True)
return resp return resp
else: else:
@ -151,23 +176,26 @@ class MobileAuth(object):
return None return None
class MobileWebAuthException(Exception): class MobileWebAuthException(Exception):
pass pass
class HTTPError(MobileWebAuthException): class HTTPError(MobileWebAuthException):
pass pass
class LoginIncorrect(MobileWebAuthException): class LoginIncorrect(MobileWebAuthException):
pass pass
class CaptchaRequired(MobileWebAuthException): class CaptchaRequired(MobileWebAuthException):
pass pass
class EmailCodeRequired(MobileWebAuthException): class EmailCodeRequired(MobileWebAuthException):
pass pass
class TwoFactorCodeRequired(MobileWebAuthException):
pass
class RefreshSessionFailed(MobileWebAuthException): class TwoFactorCodeRequired(MobileWebAuthException):
pass pass

23
steam/webauth.py

@ -70,9 +70,9 @@ else:
class WebAuth(object): class WebAuth(object):
key = None key = None
complete = False #: whether authentication has been completed successfully complete = False #: whether authentication has been completed successfully
session = None #: :class:`requests.Session` (with auth cookies after auth is complete) session = None #: :class:`requests.Session` (with auth cookies after auth is complete)
captcha_gid = -1 captcha_gid = -1
steamid = None #: :class:`steam.steamid.SteamID` (after auth is complete) steamid = None #: :class:`steam.steamid.SteamID` (after auth is complete)
def __init__(self, username, password): def __init__(self, username, password):
self.__dict__.update(locals()) self.__dict__.update(locals())
@ -97,12 +97,12 @@ class WebAuth(object):
""" """
try: try:
resp = self.session.post('https://store.steampowered.com/login/getrsakey/', resp = self.session.post('https://store.steampowered.com/login/getrsakey/',
timeout=15, timeout=15,
data={ data={
'username': username, 'username': username,
'donotchache': int(time() * 1000), 'donotchache': int(time() * 1000),
}, },
).json() ).json()
except requests.exceptions.RequestException as e: except requests.exceptions.RequestException as e:
raise HTTPError(str(e)) raise HTTPError(str(e))
@ -144,7 +144,7 @@ class WebAuth(object):
self._load_key() self._load_key()
data = { data = {
'username' : self.username, 'username': self.username,
"password": b64encode(self.key.encrypt(self.password.encode('ascii'), PKCS1v15())), "password": b64encode(self.key.encrypt(self.password.encode('ascii'), PKCS1v15())),
"emailauth": email_code, "emailauth": email_code,
"emailsteamid": str(self.steamid) if email_code else '', "emailsteamid": str(self.steamid) if email_code else '',
@ -199,17 +199,22 @@ class WebAuth(object):
class WebAuthException(Exception): class WebAuthException(Exception):
pass pass
class HTTPError(WebAuthException): class HTTPError(WebAuthException):
pass pass
class LoginIncorrect(WebAuthException): class LoginIncorrect(WebAuthException):
pass pass
class CaptchaRequired(WebAuthException): class CaptchaRequired(WebAuthException):
pass pass
class EmailCodeRequired(WebAuthException): class EmailCodeRequired(WebAuthException):
pass pass
class TwoFactorCodeRequired(WebAuthException): class TwoFactorCodeRequired(WebAuthException):
pass pass

Loading…
Cancel
Save