diff --git a/requirements.txt b/requirements.txt index cb09240..d974099 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,4 @@ coverage==4.0.3 -setuptools>=11.3 gevent-eventemitter==1.4 enum34==1.1.2; python_version < '3.4' gevent==1.1.0 diff --git a/steam/account.py b/steam/account.py index a999c2d..9e97306 100644 --- a/steam/account.py +++ b/steam/account.py @@ -10,14 +10,7 @@ Example usage: account = steam.account.SteamAccount('username', 'password') account.set_account_property('identity_secret', 'XYZ') account.set_account_property('shared_secret', 'XYZ') - code = account.login_code - key = account.get_confirmation_key('conf') - - - -TODO: - - Where to save the credentials (Windows/Linux)? - - Implement mobile authenticator features? + api_key = account.get_api_key() """ import os import sys @@ -29,37 +22,40 @@ from cryptography.fernet import Fernet from cryptography.hazmat.primitives import hashes from cryptography.hazmat.backends import default_backend -from steam.guard import * +import steam.guard import steam.webauth +import steam.webapi if sys.platform.startswith('win'): BASE_LOCATION = '.' #Windows else: BASE_LOCATION = '.' -DEFAULT_MOBILE_HEADERS = { - 'X-Requested-With': 'com.valvesoftware.android.steam.community', - 'User-agent': 'Mozilla/5.0 (Linux; U; Android 4.1.1; en-us; Google Nexus 4 - 4.1.1 - API 16 - 768x1280 Build/JRO03S) \ - AppleWebKit/534.30 (KHTML, like Gecko) Version/4.0 Mobile Safari/534.30' -} - ACCOUNT_ATTRIBUTES = ['username', 'password', 'steamid', 'shared_secret', 'identity_secret', 'revocation_code',\ - 'secret_1', 'serial_number', 'deviceid', 'oauth_token'] + 'secret_1', 'serial_number', 'deviceid', 'oauth_token', 'apikey'] class SteamAccount(object): username = None password = None + + mobile_session = None + web_session = None + web_api = None + authenticator = None + _path = None _file = None _fernet_key = None _fernet_suite = None _web_auth = None - _session = None + _mobile_auth = None + def __init__(self, username, password): self.username = username self.password = password - self._setup() + if not self._setup(): + raise SteamAccountException('Could not access account.') def __del__(self): try: @@ -75,189 +71,113 @@ class SteamAccount(object): setattr(self, property, value) self._update_credential_file() + def del_account_property(self, property): + delattr(self, property) + self._update_credential_file() + + def check_account_property(self, property): + return hasattr(self, property) + @property def login_code(self): - try: - return generate_twofactor_code(self.shared_secret) - except AttributeError: + if self.authenticator: + return self.authenticator.get_code() + elif hasattr(self, 'shared_secret'): + return steam.guard.generate_twofactor_code(self.shared_secret) + else: raise SharedSecretNotSet('Add shared_secret to this instance to generate login codes') - def get_confirmation_key(self, tag, timestamp=None): - if not timestamp: - timestamp = get_time_offset() - try: - return generate_confirmation_key(self.identity_secret, timestamp, tag) - except AttributeError: - raise IdentitySecretNotSet('Add identity_secret to this instance to generate confirmation keys') - - def fetch_mobile_confirmations(self, retries=1): - self._verify_mobile_session() - - if not self._verify_mobile_authenticator(): - raise MobileAuthenticatorException('The steam mobile authenticator is required to access the mobile confirmations.') - - timestamp = get_time_offset() - confirmation_key = self.get_confirmation_key('conf', timestamp) - - confirmation_uri = 'https://steamcommunity.com/mobileconf/conf?p=%s&a=%s&k=%s&t=%s&m=android&tag=conf' %\ - ( self.deviceid, self.steamid, confirmation_key, timestamp) - - response = self.session.get(confirmation_uri, headers=DEFAULT_MOBILE_HEADERS) - - raw_confirmations = [ ] - - if response.status_code == 200: - if 'Invalid authenticator' in response.text: - retries += 1 - return self.fetch_mobile_confirmations(retries) - - confirmation_ids = re.findall(r'data-confid="(\d+)"', response.text) - confirmation_keys = re.findall(r'data-key="(\d+)"', response.text) - confirmation_descriptions = re.findall(r'
Key: (.*)
', response.text) + api_key = regex_result.group(1) + return api_key - def _verify_mobile_authenticator(self): - if getattr(self, 'has_mobile_authenticator') and self.has_mobile_authenticator: - return True - return False + else: + raise APIKeyException('An unhandled api key page appeared, please try again.') def _setup(self): self._generate_fernet_key() self._spawn_fernet_suite() self._path = '%s/%s' % (BASE_LOCATION, self.username) - self._file = open(self._path, 'r+') if not os.path.isfile(self._path): self._create_credential_file() else: + self._file = open(self._path, 'r+', 0) credentials = self._parse_credential_file() for key, value in credentials.iteritems(): setattr(self, key, value) + if self.check_account_property('shared_secret'): + self._spawn_authenticator() + else: + self.authenticator = steam.guard.SteamAuthenticator() + + if self.check_account_property('apikey'): + self._spawn_web_api() + return True + def _create_credential_file(self): + open(self._path, 'a').close() + self._file = open(self._path, 'r+', 0) data = json.dumps({ 'username': self.username, 'password': self.password }) - text = self._fernet_suite.encrypt(data) - self._file.write(text) + token = self._fernet_suite.encrypt(data) + self._file.write(token) def _parse_credential_file(self): - - text = self._file.read() - data = json.loads(self._fernet_suite.decrypt(text)) + self._file.seek(0) + token = self._file.read() + self._file.seek(0) + data = json.loads(self._fernet_suite.decrypt(token)) return data def _update_credential_file(self): credentials = self._gather_credentials() data = json.dumps(credentials) - - text = self._fernet_suite.encrypt(data) + token = self._fernet_suite.encrypt(data) self._file.truncate() - self._file.write(text) + self._file.write(token) def _gather_credentials(self): data = { } @@ -275,25 +195,61 @@ class SteamAccount(object): def _spawn_fernet_suite(self): self._fernet_suite = Fernet(self._fernet_key) - def _spawn_web_session(self): + def _spawn_web_api(self): + self.web_api = steam.webapi.WebAPI(self.apikey) + + def _spawn_authenticator(self): + secrets = { + 'identity_secret': getattr(self, 'identity_secret'), + 'shared_secret': getattr(self, 'shared_secret'), + 'secret_1': getattr(self, 'secret_1'), + 'revocation_code': getattr(self, 'revocation_code'), + } + self.authenticator = steam.guard.SteamAuthenticator(secrets) + self._spawn_mobile_session() + self.authenticator.medium = self._mobile_auth + + def _has_session(self): + if self._has_web_session() or self._has_mobile_session(): + return True + return False + + def _has_web_session(self): + if isinstance(self._web_auth, steam.webauth.WebAuth): + return True + return False + + def _has_mobile_session(self): + if isinstance(self._mobile_auth, steam.webauth.MobileWebAuth): + return True + return False + + def _spawn_web_session(self, captcha='', email_code='', twofactor_code=''): self._web_auth = steam.webauth.WebAuth(self.username, self.password) + self._login_web_session(self._web_auth, captcha, email_code, twofactor_code) + self.web_session = self._web_auth.session + + def _spawn_mobile_session(self, captcha='', email_code='', twofactor_code=''): + self._mobile_auth = steam.webauth.MobileWebAuth(self.username, self.password) + self._login_web_session(self._mobile_auth, captcha, email_code, twofactor_code) + self.mobile_session = self._mobile_auth.session - def _spawn_mobile_session(self): - self._web_auth = steam.webauth.MobileWebAuth(self.username, self.password) + def _login_web_session(self, web_auth, captcha='', email_code='', twofactor_code=''): + if not isinstance(web_auth, steam.webauth.WebAuth) and not isinstance(web_auth, steam.webauth.MobileWebAuth): + raise WebAuthNotComplete('Please supply a valid WebAuth or MobileWebAuth session') - def _login_web_session(self, captcha='', email_code='', twofactor_code=''): try: - self._web_auth.login() + web_auth.login() except steam.webauth.CaptchaRequired: if not captcha: raise CaptchaNotProvided('The steam login captcha is required for logging in, but was not provided.') - self._web_auth.login(captcha=captcha) + web_auth.login(captcha=captcha) except steam.webauth.EmailCodeRequired: if not email_code: raise EMailCodeNotProvided('The email code is required for logging in, but was not provided.') - self._web_auth.login(email_code=email_code) + web_auth.login(email_code=email_code) except steam.webauth.TwoFactorCodeRequired: if not twofactor_code: @@ -301,16 +257,14 @@ class SteamAccount(object): twofactor_code = self.login_code except SharedSecretNotSet: raise TwoFACodeNotProvided('The twofactor code is required for logging in, but was not provided.') - self._web_auth.login(twofactor_code=twofactor_code) + web_auth.login(twofactor_code=twofactor_code) - if self._web_auth.complete: - if not getattr(self, 'steamid'): - self.set_account_property('steamid', self._web_auth.steamid) + if web_auth.complete: + if not hasattr(self, 'steamid'): + self.set_account_property('steamid', web_auth.steam_id) - if isinstance(self._web_auth, steam.webauth.MobileWebAuth) and not getattr(self, 'oauth_token'): - self.set_account_property('oauth_token', self._web_auth.oauth_token) - - self._session = self._web_auth.session + if isinstance(web_auth, steam.webauth.MobileWebAuth) and not hasattr(self, 'oauth_token'): + self.set_account_property('oauth_token', web_auth.oauth_token) else: raise WebAuthNotComplete('The web authentication could not be completed.') @@ -326,6 +280,12 @@ class IdentitySecretNotSet(SteamAccountException): class WebAuthNotComplete(SteamAccountException): pass +class WebException(SteamAccountException): + pass + +class APIKeyException(SteamAccountException): + pass + class MobileAuthenticatorException(SteamAccountException): pass diff --git a/steam/webpresence.py b/steam/webpresence.py deleted file mode 100644 index 782f5c1..0000000 --- a/steam/webpresence.py +++ /dev/null @@ -1,363 +0,0 @@ -# -*- coding: utf-8 -*- -""" -Wrapping methods for `ISteamWebUserPresenceOAuth` and `Polling` functionality - -Example usage: - -.. code:: python - import steam.webauth - import steam.webpresence - webAuth = steam.webauth.MobileWebAuth('username', 'password') - webAuth.login() - - def my_callback(messages): - for message in messages: - print '[%s] %s from %s' % (message.timestamp, message.type, message.steamid_from.as_64) - - webPresence = steam.webpresence.WebUserPresence(webAuth) - webPresence.logon() - webPresence.start_polling(my_callback) - print 'Started polling' -""" -import threading -import requests - -from steam.webauth import MobileWebAuth -from steam import SteamID - -DEFAULT_MOBILE_HEADERS = { - 'X-Requested-With': 'com.valvesoftware.android.steam.community', - 'User-agent': 'Mozilla/5.0 (Linux; U; Android 4.1.1; en-us; Google Nexus 4 - 4.1.1 - API 16 - 768x1280 Build/JRO03S) \ - AppleWebKit/534.30 (KHTML, like Gecko) Version/4.0 Mobile Safari/534.30' -} - -API_BASE = 'https://api.steampowered.com' - -DEFAULT_TIMEOUT = 30 - -PERSONA_STATES = ['Offline', 'Online', 'Busy', 'Away', 'Snooze', 'Looking to trade', 'Looking to play'] - -class WebUserPresence: - """ - Wrapping methods for `ISteamWebUserPresenceOAuth` and `Polling` functionality - """ - _loggedon = False - _timeout = None - - _oauth_token = None - _session = None - - _steamid = None - _umqid = None - _message_base = None - - def __init__(self, mobile_web_auth, timeout=None): - if not isinstance(mobile_web_auth, MobileWebAuth): - raise InvalidInstanceSupplied('The instance supplied as parameter is no valid instance of `MobileWebAuth`.') - - if not mobile_web_auth.complete: - raise InstanceNotReady('Please make sure your `MobileWebAuth` instance is logged in.') - - self._timeout = timeout - self._oauth_token = mobile_web_auth.oauth_token - self._session = mobile_web_auth.session - - def _request(self, uri, data, timeout=None): - """ - HTTP Request - :param uri: URI to be requested - :param data: Data to be delivered - :param timeout: HTTP timeout - :return: `requests.Response` - """ - timeout = timeout or self._timeout or DEFAULT_TIMEOUT - return self._session.post(uri, data=data, headers=DEFAULT_MOBILE_HEADERS, timeout=timeout) - - def _call(self, method, data, timeout=None): - """ - Calling an `ISteamWebUserPresenceOAuth` api method - :param method: Method to be called - :param data: Data to be delivered - :param timeout: HTTP timeout - :return: If successful, response as tuple, otherwise exceptions are thrown - """ - uri = '%s/ISteamWebUserPresenceOAuth/%s/v1/' % (API_BASE, method) - - try: - response = self._request(uri, data, timeout=timeout) - except requests.exceptions.ReadTimeout: - raise HTTPError('Timeout') - - if response.status_code == 401: - raise NotAuthorized('Not authorized. Please check your OAuth token and verify your `MobileWebAuth` login.') - elif response.status_code != 200: - raise HTTPError('HTTP request failed. Status code: %s' % response.status_code) - - if self._loggedon: - self._message_base += 1 - - try: - json_response = response.json() - except: - raise ValueError('Could not build json_response') - else: - return json_response - - def logon(self): - """ - Sends logon to the api - :return: True if successful. Raises an `LogonFailed` exception otherwise - """ - login_data = { - 'access_token': self._oauth_token - } - response = self._call('Logon', login_data) - if response.get('error') == 'OK': - self._steamid = SteamID(response.get('steamid')) - self._umqid = response.get('umqid') - self._message_base = response.get('message') - - self._loggedon = True - return True - else: - raise LogonFailed('Logon failed. Please check your OAuth token and verify your `MobileWebAuth` login.') - - def logoff(self): - """ - Sends logoff to the api - :return: True if logoff was successful, False otherwise. - """ - response = self._call('Logoff', self._build_call_data({}), True) - self._loggedon = False - return response.get('error') == 'OK' or False - - def poll(self): - """ - Starts an poll request - :return: Full response as Tuple - """ - poll_data = { - 'pollid': 0, - 'sectimeout': 5, - 'secidletime': 0, - 'use_accountids': 1 - } - response = self._call('Poll', self._build_call_data(poll_data, True), timeout=60) - if response.get('error') == 'OK': - return response - else: - raise PollCreationFailed(response.get('error')) - - def start_polling(self, callback): - """ - Creates an instance of `WebPolling` and starts the thread - :param callback: Function reference for handling `WebPollMessages` - :return: True - """ - web_polling = self._spawn_web_polling(callback) - web_polling.start() - return True - - def stop_polling(self): - """ - Stops the active `WebPolling` - :return: True, False. - """ - return self._kill_web_polling() - - def message(self, steamid, text): - """ - Delivers a steam message - :param steamid: SteamID64 of the target user - :param text: Message to deliver - :return: True if deliver was successful, False otherwise - """ - message_data = self._build_call_data({ - 'text': text, - 'type': 'saytext', - 'steamid_dst': steamid - }) - response = self._call('Message', message_data) - return response.get('error') == 'OK' or False - - def _spawn_web_polling(self, callback): - """ - Spawns a `WebPolling` instance - :param callback: A method reference for handling incoming `WebPollMessages` - :return: Instance reference - """ - self._web_polling = WebPolling(self, callback) - return self._web_polling - - def _kill_web_polling(self): - """ - Stops the run() method of WebPolling after the current poll request - :return: True if `_web_polling` is spawned and could be stopped, False otherwise - """ - if getattr(self, '_web_polling'): - self._web_polling._active = False - return True - return False - - def _prep_messages(self, messages): - """ - Prepares incoming raw `WebPolling` messages by creating `WebPollMessage` instances - :param messages: raw `WebPolling` messages - :return: List of `WebPollMessages` instances - """ - prepped_messages = [ ] - for message in messages: - prepped_messages.append(WebPollMessage(message, self)) - return prepped_messages - - def _build_call_data(self, data, set_message_base=False): - """ - Builds the data for `_call`'s - :param data: Data to deliver as Tuple - :param set_message_base: True, False. Only required for calls where `message` has to be set - :return: Prepared call data - """ - base_data = { - 'umqid': self._umqid, - 'access_token': self._oauth_token - } - if set_message_base: - base_data.__setitem__('message', self._message_base) - - for key in data: - base_data.__setitem__(key, data.get(key)) - - return base_data - -class WebPolling(threading.Thread): - """ - Threaded class for `Polling` - """ - _active = True - _web_user_presence = None - _callback = None - - def __init__(self, web_user_presence, callback): - threading.Thread.__init__(self) - - if not isinstance(web_user_presence, WebUserPresence): - raise InvalidInstanceSupplied('The instance supplied as parameter is no valid instance of `WebUserPresence`.') - - if not web_user_presence._loggedon: - raise InstanceNotReady('The `WebUserPresence` has to be logged on.') - - self._web_user_presence = web_user_presence - self._callback = callback - - self.setDaemon(False) - - def run(self): - """ - Sends HTTP requests while class is `_active` and calls a specified callback - :return: - """ - while self._active: - try: - response = self._web_user_presence.poll() - except PollCreationFailed: - """ - Ignore timeout exception - """ - pass - except HTTPError: - """ - Ignore http exceptions - """ - pass - else: - prepped_messages = self._web_user_presence._prep_messages(response.get('messages')) - if self._callback: - self._callback(prepped_messages) - -class WebPollMessage(object): - """ - Class for proper handling of polling messages - """ - complete = False - - _web_user_presence = None - - timestamp = 0 - type = None - steamid_from = SteamID() - - text = None - - persona_state = 0 - persona_name = None - status_flags = None - - _answerable = False - _full_data = None - - def __init__(self, message, web_user_presence): - if not isinstance(web_user_presence, WebUserPresence): - raise InvalidInstanceSupplied('The instance supplied as parameter is no valid instance of `WebUserPresence`.') - - if not web_user_presence._loggedon: - raise InstanceNotReady('The `WebUserPresence` has to be logged on.') - - self._full_data = message - - self._web_user_presence = web_user_presence - - self.timestamp = message.get('utc_timestamp') - self.type = message.get('type') - self.steamid_from = SteamID(message.get('accountid_from')) - - if self.type == 'typing': - self._answerable = True - elif self.type == 'saytext': - self._answerable = True - self.text = message.get('text') - elif self.type == 'personastate': - self.persona_name = message.get('persona_name') - self.persona_state = message.get('persona_state') - self.status_flags = message.get('status_flags') - - def answer(self, message): - """ - If the current message is `_answerable` ( current message has something to do with the steam chat ) an - answer can be sent - :param message: Message to be delivered - :return: True if message could be delivered, False otherwise - """ - if not self._answerable: - return False - self._web_user_presence.message(self.steamid_from.as_64, message) - return True - - def persona_state_to_str(self): - """ - Returns the `persona_state` as string - :return: `persona_state` as String or False if `persona_state` is not available - """ - if self.persona_state: - return PERSONA_STATES.__getitem__(self.persona_state) - return False - -class WebUserPresenceException(Exception): - pass - -class InvalidInstanceSupplied(WebUserPresenceException): - pass - -class InstanceNotReady(WebUserPresenceException): - pass - -class NotAuthorized(WebUserPresenceException): - pass - -class LogonFailed(WebUserPresenceException): - pass - -class PollCreationFailed(WebUserPresenceException): - pass - -class HTTPError(WebUserPresenceException): - pass \ No newline at end of file