From 99d74c82f2ee96f1f35aacaba9dba809048f5df7 Mon Sep 17 00:00:00 2001 From: Rossen Georgiev Date: Tue, 14 Jun 2016 23:38:10 +0100 Subject: [PATCH] implemented SteamAuthenticator; fix #32 --- README.rst | 7 +- docs/intro.rst | 7 +- steam/enums/common.py | 5 + steam/guard.py | 250 ++++++++++++++++++++++++++++++++++++++++- steam/util/__init__.py | 10 ++ 5 files changed, 271 insertions(+), 8 deletions(-) diff --git a/README.rst b/README.rst index 6d08794..4dd8663 100644 --- a/README.rst +++ b/README.rst @@ -6,13 +6,14 @@ Supports Python ``2.7+`` and ``3.3+``. Documentation: http://steam.readthedocs.io/en/latest/ -Main features -------------- +Key features +------------ +* `SteamAuthenticator `_ - enable/disable/manage 2FA on account and generate codes +* `SteamClient `_ - communication with the steam network based on ``gevent``. * `SteamID `_ - convert between the various ID representations with ease * `WebAPI `_ - simple API for Steam's Web API with automatic population of interfaces * `WebAuth `_ - authentication for access to ``store.steampowered.com`` and ``steamcommunity.com`` -* `SteamClient `_ - communication with the steam network based on ``gevent``. Checkout the `User guide `_ for examples, or the `API Reference `_ for details. diff --git a/docs/intro.rst b/docs/intro.rst index c377611..4a1b3ab 100644 --- a/docs/intro.rst +++ b/docs/intro.rst @@ -11,13 +11,14 @@ A python module for interacting with various parts of Steam_. Supports Python ``2.7+`` and ``3.3+``. -Main features -============= +Key features +============ +* :doc:`SteamAuthenticator ` - enable/disable/manage 2FA on account and generate codes +* :doc:`SteamClient ` - communication with the steam network based on ``gevent`` * :doc:`SteamID ` - convert between the various ID representations with ease * :doc:`WebAPI ` - simple API for Steam's Web API with automatic population of interfaces * :doc:`WebAuth ` - authentication for access to ``store.steampowered.com`` and ``steamcommunity.com`` -* :doc:`SteamClient ` - communication with the steam network based on ``gevent``. Checkout the :doc:`user_guide` for examples, or the :doc:`api/index` for details. diff --git a/steam/enums/common.py b/steam/enums/common.py index 8433649..b512c75 100644 --- a/steam/enums/common.py +++ b/steam/enums/common.py @@ -369,6 +369,11 @@ class ELeaderboardUploadScoreMethod(SteamIntEnum): KeepBest = 1 ForceUpdate = 2 +class ETwoFactorTokenType(SteamIntEnum): + NONE = 0 + ValveMobileApp = 1 + ThirdParty = 2 + # Do not remove from sys import modules from enum import EnumMeta diff --git a/steam/guard.py b/steam/guard.py index 7d184f2..9fd06af 100644 --- a/steam/guard.py +++ b/steam/guard.py @@ -1,8 +1,254 @@ +""" +This submodule contains various functionality related to Steam Guard. + +:class:`SteamAuthenticator` provides methods for genereating codes +and enabling 2FA on a Steam account. Operations managing the authenticator +on an account require an instance of either :class:`.MobileWebAuth` or +:class:`.SteamClient`. The instance needs to be logged in. + +Adding an authenticator + +.. code:: python + + sa = SteamAuthenticator(medium=medium) + sa.add() # SMS code will be send to account's phone number + sa.finalize('SMS CODE') + + sa.secrets # dict with authenticator secrets + + sa.get_code() # 2FA code for login + + sa.remove() # removes the authenticator from the account + +.. warning:: + Once the authenticator is enabled, make sure you save your secrets. + Otherwise you will lose access to the account. + +Once authenticator is enabled all you need is the secrets to generate codes. + +.. code:: python + + sa = SteamAuthenticator(secrets) + sa.get_code() + +""" import struct +import requests +from base64 import b64decode, b64encode from binascii import hexlify from time import time from steam import webapi +from steam.enums import ETwoFactorTokenType +from steam.steamid import SteamID from steam.core.crypto import hmac_sha1, sha1_hash +from steam.enums.common import EResult +from steam.webauth import MobileWebAuth +from steam.util import proto_to_dict + + +class SteamAuthenticator(object): + """Add/Remove authenticator from an account. Generate 2FA and confirmation codes.""" + _finalize_attempts = 5 + medium = None #: instance of :class:`.MobileWebAuth` or :class:`.SteamClient` + steam_time_offset = None #: offset from steam server time + secrets = None #: :class:`dict` with authenticator secrets + + def __init__(self, secrets=None, medium=None): + """ + :param secret: a dict of authenticator secrets + :type secret: dict + :param medium: logged on session for steam user + :type mediumm: :class:`.MobileWebAuth`, :class:`.SteamClient` + """ + self.secrets = secrets or {} + self.medium = medium + + def __getattr__(self, key): + if key not in self.secrets: + raise AttributeError("No such attribute") + return self.secrets[key] + + def get_time(self): + """ + :return: Steam aligned timestamp + :rtype: int + """ + if self.steam_time_offset is None: + self.steam_time_offset = get_time_offset() + return int(time() + self.steam_time_offset) + + def get_code(self, timestamp=None): + """ + :param timestamp: time to use for code generation + :type timestamp: int + :return: two factor code + :rtype: str + """ + return generate_twofactor_code_for_time(b64decode(self.shared_secret), + self.get_time() if timestamp is None else timestamp) + + def get_confirmation_key(self, tag='', timestamp=None): + """ + :param tag: see :func:`generate_confimation_key` for this value + :type tag: str + :param timestamp: time to use for code generation + :type timestamp: int + :return: trade confirmation key + :rtype: str + """ + return generate_confirmation_key(b64decode(self.identity_secret), tag, + self.get_time() if timestamp is None else timestamp) + + def _send_request(self, action, params): + action_map = { + 'add': 'AddAuthenticator', + 'finalize': 'FinalizeAddAuthenticator', + 'remove': 'RemoveAuthenticator', + 'status': 'QueryStatus', + 'createcodes': 'CreateEmergencyCodes', + 'destroycodes': 'DestroyEmergencyCodes', + } + medium = self.medium + + if isinstance(medium, MobileWebAuth): + if not medium.complete: + raise SteamAuthenticatorError("MobileWebAuth instance not logged in") + + params['access_token'] = medium.oauth_token + params['http_timeout'] = 10 + + try: + resp = webapi.post('ITwoFactorService', action_map[action], 1, params=params) + except requests.exceptions.RequestException as exp: + raise SteamAuthenticatorError("Error adding via WebAPI: %s" % str(exp)) + + resp = resp['response'] + else: + if not medium.logged_on: + raise SteamAuthenticatorError("SteamClient instance not logged in") + + resp = medium.unified_messages.send_and_wait("TwoFactor.%s#1" % action_map[action], + params, timeout=10) + if resp is None: + raise SteamAuthenticatorError("Failed to add authenticator. Request timeout") + + resp = proto_to_dict(resp) + + if action == 'add': + for key in ['shared_secret', 'identity_secret', 'secret_1']: + resp[key] = b64encode(resp[key]) + + return resp + + def add(self): + """Add authenticator to an account. + The account's phone number will receive a SMS code required for :meth:`finalize`. + + :raises: :class:`SteamAuthenticatorError` + """ + params = { + 'steamid': self.medium.steam_id, + 'authenticator_time': int(time()), + 'authenticator_type': int(ETwoFactorTokenType.ValveMobileApp), + 'device_identifier': generate_device_id(self.medium.steam_id), + 'sms_phone_id': '1', + } + + resp = self._send_request('add', params) + + if resp['status'] != EResult.OK: + raise SteamAuthenticatorError("Failed to add authenticator. Error: %s" % repr(EResult(resp['status']))) + + for key in ['shared_secret', 'identity_secret', 'serial_number', 'secret_1', 'revocation_code', 'token_gid']: + if key in resp: + self.secrets[key] = resp[key] + + self.steam_time_offset = int(resp['server_time']) - time() + + def finalize(self, activation_code): + """Finalize authenticator with received SMS code + + :param activation_code: SMS code + :type activation_code: str + :raises: :class:`SteamAuthenticatorError` + """ + params = { + 'steamid': self.medium.steam_id, + 'authenticator_time': int(time()), + 'authenticator_code': self.get_code(), + 'activation_code': activation_code, + } + + resp = self._send_request('finalize', params) + + if resp['status'] != EResult.TwoFactorActivationCodeMismatch and resp.get('want_more', False) and self._finalize_attempts: + self.steam_time_offset += 30 + self._finalize_attempts -= 1 + self.finalize(activation_code) + return + elif not resp['success']: + self._finalize_attempts = 5 + raise SteamAuthenticatorError("Failed to finalize authenticator. Error: %s" % repr(EResult(resp['status']))) + + self.steam_time_offset = int(resp['server_time']) - time() + + def remove(self): + """Remove authenticator + + .. warning:: + Doesn't work via :class:`.SteamClient`. Disabled by Valve + + :raises: :class:`SteamAuthenticatorError` + """ + if not self.secrets: + raise SteamAuthenticatorError("No authenticator secrets available?") + + params = { + 'steamid': self.medium.steam_id, + 'revocation_code': self.revocation_code, + 'steamguard_scheme': 1, + } + + resp = self._send_request('remove', params) + + if not resp['success']: + raise SteamAuthenticatorError("Failed to remove authenticator. (attempts remaining: %s)" % ( + resp['revocation_attempts_remaining'], + )) + + self.secrets.clear() + + def status(self, medium=None): + """Fetch authenticator status for the account + + :raises: :class:`SteamAuthenticatorError` + :return: dict with status parameters + :rtype: dict + """ + params = {'steamid': self.medium.steam_id} + return self._send_request('status', params) + + def create_emergency_codes(self): + """Generate emergency codes + + :raises: :class:`SteamAuthenticatorError` + :return: list of codes + :rtype: list + """ + return self._send_request('createcodes', {}).get('code', []) + + def destroy_emergency_codes(self): + """Destroy all emergency codes + + :raises: :class:`SteamAuthenticatorError` + """ + params = {'steamid': self.medium.steam_id} + self._send_request('destroycodes', params) + + +class SteamAuthenticatorError(Exception): + pass + def generate_twofactor_code(shared_secret): """Generate Steam 2FA code for login with current time @@ -69,7 +315,7 @@ def get_time_offset(): :rtype: int """ try: - resp = webapi.post('ITwoFactorService', 'QueryTime', 1, params={'http_timeout': 5}) + resp = webapi.post('ITwoFactorService', 'QueryTime', 1, params={'http_timeout': 10}) except: return 0 @@ -84,5 +330,5 @@ def generate_device_id(steamid): :return: android device id :rtype: str """ - h = hexlify(sha1(str(steamid).encode('ascii'))).decode('ascii') + h = hexlify(sha1_hash(str(steamid).encode('ascii'))).decode('ascii') return "android:%s-%s-%s-%s-%s" % (h[:8], h[8:12], h[12:16], h[16:20], h[20:32]) diff --git a/steam/util/__init__.py b/steam/util/__init__.py index 8444bf2..0195a45 100644 --- a/steam/util/__init__.py +++ b/steam/util/__init__.py @@ -59,6 +59,16 @@ def clear_proto_bit(emsg): """ return int(emsg) & ~protobuf_mask +def proto_to_dict(message): + """Converts protobuf message instance to dict (shallow) + + :param message: protobuf message + :return: parameters and their values + :rtype: dict + """ + return {field.name: getattr(message, field.name, field.default_value) + for field in message.DESCRIPTOR.fields} + def chunks(arr, size): """Splits a list into chunks