Browse Source

Upgrade SteamAuthenticatorError to have the eresult available where that applies, #357.

pull/361/head
luckydonald 4 years ago
parent
commit
fb6673a105
  1. 72
      steam/guard.py

72
steam/guard.py

@ -54,7 +54,7 @@ from binascii import hexlify
from time import time from time import time
from steam import webapi from steam import webapi
from steam.enums import ETwoFactorTokenType from steam.enums import ETwoFactorTokenType
from steam.steamid import SteamID from steam.exceptions import SteamError
from steam.core.crypto import hmac_sha1, sha1_hash from steam.core.crypto import hmac_sha1, sha1_hash
from steam.enums.common import EResult from steam.enums.common import EResult
from steam.webauth import MobileWebAuth from steam.webauth import MobileWebAuth
@ -127,7 +127,10 @@ class SteamAuthenticator(object):
if isinstance(backend, MobileWebAuth): if isinstance(backend, MobileWebAuth):
if not backend.logged_on: if not backend.logged_on:
raise SteamAuthenticatorError("MobileWebAuth instance not logged in") raise SteamAuthenticatorError(
message="MobileWebAuth instance not logged in",
eresult=None,
)
params['access_token'] = backend.oauth_token params['access_token'] = backend.oauth_token
params['http_timeout'] = 10 params['http_timeout'] = 10
@ -135,22 +138,32 @@ class SteamAuthenticator(object):
try: try:
resp = webapi.post('ITwoFactorService', action, 1, params=params) resp = webapi.post('ITwoFactorService', action, 1, params=params)
except requests.exceptions.RequestException as exp: except requests.exceptions.RequestException as exp:
raise SteamAuthenticatorError("Error adding via WebAPI: %s" % str(exp)) raise SteamAuthenticatorError(
message="Error adding via WebAPI: %s" % str(exp),
eresult=None,
)
resp = resp['response'] resp = resp['response']
else: else:
if not backend.logged_on: if not backend.logged_on:
raise SteamAuthenticatorError("SteamClient instance not logged in") raise SteamAuthenticatorError(
message="SteamClient instance not logged in",
eresult=None,
)
resp = backend.send_um_and_wait("TwoFactor.%s#1" % action, resp = backend.send_um_and_wait("TwoFactor.%s#1" % action,
params, timeout=10) params, timeout=10)
if resp is None: if resp is None:
raise SteamAuthenticatorError("Failed. Request timeout") raise SteamAuthenticatorError(
message="Failed. Request timeout",
eresult=None,
)
if resp.header.eresult != EResult.OK: if resp.header.eresult != EResult.OK:
raise SteamAuthenticatorError("Failed: %s (%s)" % (resp.header.error_message, raise SteamAuthenticatorError(
repr(resp.header.eresult))) message="Failed: %s (%s)" % (resp.header.error_message, repr(resp.header.eresult)),
eresult=resp.header.eresult,
)
resp = proto_to_dict(resp.body) resp = proto_to_dict(resp.body)
if action == 'AddAuthenticator': if action == 'AddAuthenticator':
@ -166,7 +179,10 @@ class SteamAuthenticator(object):
:raises: :class:`SteamAuthenticatorError` :raises: :class:`SteamAuthenticatorError`
""" """
if not self.has_phone_number(): if not self.has_phone_number():
raise SteamAuthenticatorError("Account doesn't have a verified phone number") raise SteamAuthenticatorError(
message="Account doesn't have a verified phone number",
eresult=None,
)
resp = self._send_request('AddAuthenticator', { resp = self._send_request('AddAuthenticator', {
'steamid': self.backend.steam_id, 'steamid': self.backend.steam_id,
@ -177,7 +193,10 @@ class SteamAuthenticator(object):
}) })
if resp['status'] != EResult.OK: if resp['status'] != EResult.OK:
raise SteamAuthenticatorError("Failed to add authenticator. Error: %s" % repr(EResult(resp['status']))) raise SteamAuthenticatorError(
message="Failed to add authenticator. Error: %s" % repr(EResult(resp['status'])),
eresult=resp['status'],
)
self.secrets = resp self.secrets = resp
self.steam_time_offset = int(resp['server_time']) - time() self.steam_time_offset = int(resp['server_time']) - time()
@ -203,7 +222,10 @@ class SteamAuthenticator(object):
return return
elif not resp['success']: elif not resp['success']:
self._finalize_attempts = 5 self._finalize_attempts = 5
raise SteamAuthenticatorError("Failed to finalize authenticator. Error: %s" % repr(EResult(resp['status']))) raise SteamAuthenticatorError(
message="Failed to finalize authenticator. Error: %s" % repr(EResult(resp['status'])),
eresult=resp['status'],
)
self.steam_time_offset = int(resp['server_time']) - time() self.steam_time_offset = int(resp['server_time']) - time()
@ -222,9 +244,15 @@ class SteamAuthenticator(object):
:raises: :class:`SteamAuthenticatorError` :raises: :class:`SteamAuthenticatorError`
""" """
if not self.secrets: if not self.secrets:
raise SteamAuthenticatorError("No authenticator secrets available?") raise SteamAuthenticatorError(
message="No authenticator secrets available?",
eresult=None,
)
if not isinstance(self.backend, MobileWebAuth): if not isinstance(self.backend, MobileWebAuth):
raise SteamAuthenticatorError("Only available via MobileWebAuth") raise SteamAuthenticatorError(
message="Only available via MobileWebAuth",
eresult=None,
)
resp = self._send_request('RemoveAuthenticator', { resp = self._send_request('RemoveAuthenticator', {
'steamid': self.backend.steam_id, 'steamid': self.backend.steam_id,
@ -233,9 +261,11 @@ class SteamAuthenticator(object):
}) })
if not resp['success']: if not resp['success']:
raise SteamAuthenticatorError("Failed to remove authenticator. (attempts remaining: %s)" % ( raise SteamAuthenticatorError(
resp['revocation_attempts_remaining'], message="Failed to remove authenticator. (attempts remaining: %s)" %
)) (resp['revocation_attempts_remaining'],),
eresult=None,
)
self.secrets.clear() self.secrets.clear()
@ -466,8 +496,14 @@ class SteamAuthenticator(object):
return resp return resp
class SteamAuthenticatorError(Exception): class SteamAuthenticatorError(SteamError):
pass def __init__(self, message, eresult=None):
Exception.__init__(self, message, eresult)
self.message = message
self.eresult = EResult(eresult) if eresult is not None else None #: :class:`.EResult`|:class:`None`
def __str__(self):
return "(%s) %s" % (self.eresult, self.message) if self.eresult is not None else self.message
def generate_twofactor_code(shared_secret): def generate_twofactor_code(shared_secret):

Loading…
Cancel
Save