Browse Source

Merge 9004636d7c into 26166e047b

pull/454/merge
artur1214 1 year ago
committed by GitHub
parent
commit
eb0d6ec9e4
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      .gitignore
  2. 13
      steam/client/__init__.py
  3. 2
      steam/client/builtins/web.py
  4. 515
      steam/webauth.py

1
.gitignore

@ -33,3 +33,4 @@
.coverage
*.swp
*.bin
*.mafile

13
steam/client/__init__.py

@ -34,6 +34,8 @@ from steam.client.builtins import BuiltinBase
from steam.utils import ip4_from_int, ip4_to_int
from steam.utils.proto import proto_fill_from_dict
# TODO: remove py2 support.
if six.PY2:
_cli_input = raw_input
else:
@ -42,14 +44,13 @@ else:
class SteamClient(CMClient, BuiltinBase):
EVENT_LOGGED_ON = 'logged_on'
"""After successful login
"""
"""After successful login"""
EVENT_AUTH_CODE_REQUIRED = 'auth_code_required'
"""When either email or 2FA code is needed for login
"""
"""When either email or 2FA code is needed for login"""
EVENT_NEW_LOGIN_KEY = 'new_login_key'
"""After a new login key is accepted
"""
"""After a new login key is accepted"""
_LOG = logging.getLogger("SteamClient")
_reconnect_backoff_c = 0

2
steam/client/builtins/web.py

@ -19,6 +19,8 @@ class Web(object):
def __handle_disconnect(self):
self._web_session = None
# TODO: DEPRECATED. This function not work anymore.
#This function must be rewritten to use WebAuth
def get_web_session_cookies(self):
"""Get web authentication cookies via WebAPI's ``AuthenticateUser``

515
steam/webauth.py

@ -61,10 +61,15 @@ from getpass import getpass
import six
import requests
from steam.enums.proto import EAuthSessionGuardType
from steam.steamid import SteamID
from steam.utils.web import make_requests_session, generate_session_id
from steam.utils.web import generate_session_id
from steam.core.crypto import rsa_publickey, pkcs1v15_encrypt
# TODO: Remove python2 support.
# TODO: Encrease min python version to 3.5
if six.PY2:
intBase = long
_cli_input = raw_input
@ -72,228 +77,328 @@ else:
intBase = int
_cli_input = input
API_HEADERS = {
'origin': 'https://steamcommunity.com',
'referer': 'https://steamcommunity.com/',
'accept': 'application/json, text/plain, */*'
}
API_URL = 'https://api.steampowered.com/{}Service/{}/v{}'
class WebAuth(object):
key = None
logged_on = False #: whether authentication has been completed successfully
session = None #: :class:`requests.Session` (with auth cookies after auth is completed)
session_id = None #: :class:`str`, session id string
captcha_gid = -1
captcha_code = ''
steam_id = None #: :class:`.SteamID` (after auth is completed)
def __init__(self, username, password=''):
self.__dict__.update(locals())
self.session = make_requests_session()
self._session_setup()
def _session_setup(self):
pass
@property
def captcha_url(self):
"""If a captch is required this property will return url to the image, or ``None``"""
if self.captcha_gid == -1:
return None
else:
return "https://steamcommunity.com/login/rendercaptcha/?gid=%s" % self.captcha_gid
"""New WEB Auth class.
def get_rsa_key(self, username):
"""Get rsa key for a given username
This class works with Steam API:
https://steamapi.xpaw.me/#IAuthenticationService
:param username: username
:type username: :class:`str`
:return: json response
:rtype: :class:`dict`
:raises HTTPError: any problem with http request, timeouts, 5xx, 4xx etc
"""
try:
resp = self.session.post('https://steamcommunity.com/login/getrsakey/',
timeout=15,
data={
'username': username,
'donotcache': int(time() * 1000),
},
).json()
except requests.exceptions.RequestException as e:
raise HTTPError(str(e))
Currently, supports bsaic login/password auth with no 2FA, 2FA via
steam guard code and 2FA via EMAIL confirmation.
return resp
TODO: Add QR code support.
def _load_key(self):
if not self.key:
resp = self.get_rsa_key(self.username)
TODO: Fully rework api handling. PUT api into separate class,
in order to make this class responsible only for actual auth.
self.key = rsa_publickey(intBase(resp['publickey_mod'], 16),
intBase(resp['publickey_exp'], 16),
)
self.timestamp = resp['timestamp']
IMPORTANT:
Actually, at real login page
steam handles function little bit different.
e.g. https://api.steampowered.com/IAuthenticationService/BeginAuthSessionViaCredentials/v1
can handle multipart/form-data; with something like.
def _send_login(self, password='', captcha='', email_code='', twofactor_code=''):
data = {
'username': self.username,
"password": b64encode(pkcs1v15_encrypt(self.key, password.encode('ascii'))),
"emailauth": email_code,
"emailsteamid": str(self.steam_id) if email_code else '',
"twofactorcode": twofactor_code,
"captchagid": self.captcha_gid,
"captcha_text": captcha,
"loginfriendlyname": "python-steam webauth",
"rsatimestamp": self.timestamp,
"remember_login": 'true',
"donotcache": int(time() * 100000),
{
input_protobuf_encoded: EgxhbmFjb25kYXJ0dXIa2AJodHgzZzlJaWY4dGE4RGxqR2VWbncwZWpxdi9uNDByRkZxaFduVk12VFFhRm1ZU1F4MHlrYUlJNmFURlVJWEQ5Z2VXMTlObWJDc3pydmNQZ1RTVllyOWl0SW5EWjgzMVh0YWtOaHJaUk9JN1lvMzhpb2xHRmdHdVBZT3NsekErTHZNZlJoQ3YzL1JFaEpNQlhjaXhzNklRRTZjbnM4d1JWbGI0TVA1Nzd0MHpGajRpcWF4U21KbnVjRDh5YzVIVkYvMERlMnFKd3dGTG0vR3B4SEdreFFlQURzZi9OTXJMTUszcWxnR3NLZm4ycGxNOGhkMzF3YnErSUlCZkNJb3dFZWExaUpJcmVjYkdLT0EvRlJ5VFpSQlVoVitLQmt6TGk3THY1UjVNYVRJSzNPTCtCMUZnZ2xWSG94c0ErTm5BMHVqSVZWZ0ZRdGpDL2tMTjd0SmhYamc9PSCA+aCT0ggoATgBQgVTdG9yZUqBAQp9TW96aWxsYS81LjAgKFdpbmRvd3MgTlQgMTAuMDsgV2luNjQ7IHg2NCkgQXBwbGVXZWJLaXQvNTM3LjM2IChLSFRNTCwgbGlrZSBHZWNrbykgQ2hyb21lLzExNi4wLjAuMCBTYWZhcmkvNTM3LjM2IE9QUi8xMDIuMC4wLjAQAlgI
}
it's protobuf encoded value. You can decode it here:
https://protobuf-decoder.netlify.app
some fields I can understand:
2) string - steamlogin
3) string - encrypted password
4) timestamp to map to a key - STime
5) some INT (probably it's always 1) it's DEPRECATED
7) whether we are requesting a persistent or an ephemeral session
8) (EMachineAuthWebDomain) identifier of client requesting auth.
e.g. "Store"
9) Protobuf of device type (see CAuthentication_DeviceDetails):
9.1) string - User-Agent
9.2) Int - platform identifier e.g. 2 (means Web Browser)
(See EAuthTokenPlatformType protobuf).
9.3) os_type (MOSTLY NOT PRESENTED IN REAL REQUESTS)
9.4) gaming_device_type (MOSTLY NOT PRESENTED IN REAL REQUESTS)
11) UNDOCUMENTED AT ALL: Some number (like 8)
FIELD NUMBERS I SKIPPED MEANS THEY ARE NOT PRESENTED IN REAL REQUEST
We currently uses basic multipart/form-data and "key-value"
data presentation.
But I Think, it's important to know, that real steam works differently,
and maybe we can once upon a time simulate it's REAL behavior.
"""
# Pretend to be chrome on windows, made this act as most like a
# browser as possible to (hopefully) avoid breakage in the future from valve
def __init__(self, username='', password='',
userAgent='Mozilla/5.0 (Windows NT 10.0; Win64; x64)'
' AppleWebKit/537.36 (KHTML, like Gecko)'
' Chrome/118.0.0.0 Safari/537.36'):
# ALL FUNCTIONS RENAMED TO PEP8 NOTATION.
self.session = requests.session()
self.user_agent = userAgent
self.username = username
self.password = password
self.session.headers['User-Agent'] = self.user_agent
self.steam_id = None
self.client_id = None
self.request_id = None
self.refresh_token = None
self.access_token = None
self.session_id = None
self.email_auth_waits = False # Not used yet.
self.logged_on = False
@staticmethod
def send_api_request(data, steam_api_interface, steam_api_method,
steam_api_version):
"""Send request to Steam API via requests"""
steam_url = API_URL.format(steam_api_interface, steam_api_method,
steam_api_version)
if steam_api_method == "GetPasswordRSAPublicKey": # It's GET method
res = requests.get(steam_url, timeout=10, headers=API_HEADERS,
params=data)
else: # Every other API endpoints are POST.
res = requests.post(steam_url, timeout=10, headers=API_HEADERS,
data=data)
res.raise_for_status()
return res.json()
def _get_rsa_key(self):
"""Get rsa key to crypt password."""
return self.send_api_request({'account_name': self.username},
"IAuthentication", 'GetPasswordRSAPublicKey', 1)
def _encrypt_password(self):
"""Encrypt password via RSA key
Steam handles every password only in encoded way.
"""
r = self._get_rsa_key()
try:
return self.session.post('https://steamcommunity.com/login/dologin/', data=data, timeout=15).json()
except requests.exceptions.RequestException as e:
raise HTTPError(str(e))
mod = intBase(r['response']['publickey_mod'], 16)
exp = intBase(r['response']['publickey_exp'], 16)
def _finalize_login(self, login_response):
self.steam_id = SteamID(login_response['transfer_parameters']['steamid'])
def login(self, password='', captcha='', email_code='', twofactor_code='', language='english'):
"""Attempts web login and returns on a session with cookies set
:param password: password, if it wasn't provided on instance init
:type password: :class:`str`
:param captcha: text reponse for captcha challenge
:type captcha: :class:`str`
:param email_code: email code for steam guard
:type email_code: :class:`str`
:param twofactor_code: 2FA code for steam guard
:type twofactor_code: :class:`str`
:param language: select language for steam web pages (sets language cookie)
:type language: :class:`str`
:return: a session on success and :class:`None` otherwise
:rtype: :class:`requests.Session`, :class:`None`
:raises HTTPError: any problem with http request, timeouts, 5xx, 4xx etc
:raises LoginIncorrect: wrong username or password
:raises CaptchaRequired: when captcha is needed
:raises CaptchaRequiredLoginIncorrect: when captcha is needed and login is incorrect
:raises EmailCodeRequired: when email is needed
:raises TwoFactorCodeRequired: when 2FA is needed
"""
if self.logged_on:
return self.session
pub_key = rsa_publickey(mod, exp)
encrypted = pkcs1v15_encrypt(pub_key, self.password.encode('ascii'))
b64 = b64encode(encrypted)
return tuple((b64.decode('ascii'), r['response']['timestamp']))
def _startSessionWithCredentials(self, account_encrypted_password: str,
time_stamp: int):
"""Start login session via BeginAuthSessionViaCredentials
if password:
self.password = password
elif self.password:
password = self.password
else:
raise LoginIncorrect("password is not specified")
if not captcha and self.captcha_code:
captcha = self.captcha_code
"""
resp = self.send_api_request(
{'device_friendly_name': self.user_agent,
'account_name': self.username,
'encrypted_password': account_encrypted_password,
'encryption_timestamp': time_stamp,
'remember_login': '1',
'platform_type': '2',
'persistence': '1',
'website_id': 'Community',
},
'IAuthentication',
'BeginAuthSessionViaCredentials',
1
)
self.client_id = resp['response']['client_id']
self.request_id = resp['response']['request_id']
self.steam_id = SteamID(resp['response']['steamid'])
def _startLoginSession(self):
"""Starts login session via credentials."""
encrypted_password = self._encrypt_password()
self._startSessionWithCredentials(encrypted_password[0],
encrypted_password[1])
def _pollLoginStatus(self):
"""Get status of current Login Session
This function asks server about login session status.
If we logged in, this returns access_token that we needed.
TODO: add check of interval, returned from _startSessionWithCredentials
actually it has no need now, but
"""
resp = self.send_api_request({
'client_id': str(self.client_id),
'request_id': str(self.request_id)
}, 'IAuthentication', 'PollAuthSessionStatus', 1)
try:
self.refresh_token = resp['response']['refresh_token']
self.access_token = resp['response']['access_token']
except KeyError:
raise WebAuthException('Authentication requires 2fa token, which is not provided or invalid')
def _finalizeLogin(self):
self.sessionID = generate_session_id()
self.logged_on = True
for domain in ['store.steampowered.com', 'help.steampowered.com',
'steamcommunity.com']:
self.session.cookies.set('sessionid', self.sessionID, domain=domain)
self.session.cookies.set('steamLoginSecure',
str(self.steam_id.as_64) + "||" + str(
self.access_token), domain=domain)
def _update_login_token(
self,
code: str,
code_type: EAuthSessionGuardType = EAuthSessionGuardType.DeviceCode
):
"""Send 2FA token to steam
Please note, that very rare, login can be unsuccessful,
if you use code, that guard provided you BEFORE log in session
was started. To fix it, just rerun login.
self._load_key()
resp = self._send_login(password=password, captcha=captcha, email_code=email_code, twofactor_code=twofactor_code)
"""
data = {
'client_id': self.client_id,
'steamid': self.steam_id,
'code': code,
'code_type': code_type
}
res = self.send_api_request(data, 'IAuthentication',
'UpdateAuthSessionWithSteamGuardCode', 1)
return res
if resp['success'] and resp['login_complete']:
self.logged_on = True
self.password = self.captcha_code = ''
self.captcha_gid = -1
for cookie in list(self.session.cookies):
for domain in ['store.steampowered.com', 'help.steampowered.com', 'steamcommunity.com']:
self.session.cookies.set(cookie.name, cookie.value, domain=domain, secure=cookie.secure)
def login(self, username: str = '', password: str = '', code: str = None,
email_required=False):
"""Log in user by new Steam API
self.session_id = generate_session_id()
If user has no need 2FA, this function will just log in user.
If 2FA SteamGuard code needed, when user can provide it just
with guard.SteamAuthenticator.get_code like it always was.
for domain in ['store.steampowered.com', 'help.steampowered.com', 'steamcommunity.com']:
self.session.cookies.set('Steam_Language', language, domain=domain)
self.session.cookies.set('birthtime', '-3333', domain=domain)
self.session.cookies.set('sessionid', self.session_id, domain=domain)
If Email code is required, when user can provide email_required.
If email_required was provided, when this function only setup auth
and return new function.
self._finalize_login(resp)
this function will receive email code. Once email code will be provided
authentication process will be complete.
If wrong code provided in this new function, when error will be raised.
And new code will be waited.
"""
if self.logged_on:
return self.session
else:
if resp.get('captcha_needed', False):
self.captcha_gid = resp['captcha_gid']
self.captcha_code = ''
if resp.get('clear_password_field', False):
self.password = ''
raise CaptchaRequiredLoginIncorrect(resp['message'])
else:
raise CaptchaRequired(resp['message'])
elif resp.get('emailauth_needed', False):
self.steam_id = SteamID(resp['emailsteamid'])
raise EmailCodeRequired(resp['message'])
elif resp.get('requires_twofactor', False):
raise TwoFactorCodeRequired(resp['message'])
elif 'too many login failures' in resp.get('message', ''):
raise TooManyLoginFailures(resp['message'])
else:
self.password = ''
raise LoginIncorrect(resp['message'])
def cli_login(self, password='', captcha='', email_code='', twofactor_code='', language='english'):
"""Generates CLI prompts to perform the entire login process
if username == '' or password == '':
if self.username == '' and self.password == '':
raise ValueError("Username or password is provided empty!")
else:
self.username = username
self.password = password
:param password: password, if it wasn't provided on instance init
:type password: :class:`str`
:param captcha: text reponse for captcha challenge
:type captcha: :class:`str`
:param email_code: email code for steam guard
:type email_code: :class:`str`
:param twofactor_code: 2FA code for steam guard
:type twofactor_code: :class:`str`
:param language: select language for steam web pages (sets language cookie)
:type language: :class:`str`
:return: a session on success and :class:`None` otherwise
:rtype: :class:`requests.Session`, :class:`None`
self._startLoginSession()
if code:
self._update_login_token(code)
if email_required:
# We do another request, which force steam to send email code
# (otherwise code just not sent).
url = (f'https://login.steampowered.com/jwt/checkdevice/'
f'{self.steam_id}')
res = self.session.post(url, data={
'clientid': self.client_id,
'steamid': self.steam_id
}).json()
if res.get('result') == 8:
# This usually mean code sent now.
def end_login(email_code: str):
self._update_login_token(email_code,
EAuthSessionGuardType.EmailCode)
self._pollLoginStatus()
self._finalizeLogin()
return self.session
return end_login
if res.get('result') == 29:
# This code 100% means some data not valid
# Actually this must will never be called, because
# Errors can be only like wrong cookies. (Theoretically)
raise WebAuthException("Something invalid went. Try again later.")
self._pollLoginStatus()
self._finalizeLogin()
.. code:: python
return self.session
In [3]: user.cli_login()
Enter password for 'steamuser':
Solve CAPTCHA at https://steamcommunity.com/login/rendercaptcha/?gid=1111111111111111111
CAPTCHA code: 123456
Invalid password for 'steamuser'. Enter password:
Solve CAPTCHA at https://steamcommunity.com/login/rendercaptcha/?gid=2222222222222222222
CAPTCHA code: abcdef
Enter 2FA code: AB123
Out[3]: <requests.sessions.Session at 0x6fffe56bef0>
def logout_everywhere(self):
"""Log out on every device.
This function works just like button at
https://store.steampowered.com/twofactor/manage
and allows user to logout on every device.
Can be VERY useful e.g. for users, who practice account rent.
"""
session_id = self.session.cookies.get('sessionid',
domain='store.steampowered.com')
# loop until successful login
while True:
try:
return self.login(password, captcha, email_code, twofactor_code, language)
except (LoginIncorrect, CaptchaRequired) as exp:
email_code = twofactor_code = ''
if isinstance(exp, LoginIncorrect):
prompt = ("Enter password for %s: " if not password else
"Invalid password for %s. Enter password: ")
password = getpass(prompt % repr(self.username))
if isinstance(exp, CaptchaRequired):
prompt = "Solve CAPTCHA at %s\nCAPTCHA code: " % self.captcha_url
captcha = _cli_input(prompt)
else:
captcha = ''
except EmailCodeRequired:
prompt = ("Enter email code: " if not email_code else
"Incorrect code. Enter email code: ")
email_code, twofactor_code = _cli_input(prompt), ''
except TwoFactorCodeRequired:
prompt = ("Enter 2FA code: " if not twofactor_code else
"Incorrect code. Enter 2FA code: ")
email_code, twofactor_code = '', _cli_input(prompt)
# By the times I saw session can be both of keys, so select valid.
session_id = session_id or self.session.cookies.get(
'sessionId', domain='store.steampowered.com')
data = {
"action": "deauthorize",
"sessionid": session_id
}
resp = self.session.post(
'https://store.steampowered.com/twofactor/manage_action',
data=data
)
return resp.status_code == 200
def cli_login(self, username: str = '', password: str = '', code: str = '',
email_required: bool = False):
"""Generates CLI prompts to perform the entire login process
If you use email confirm, provide email_required = True,
else just provide code.
"""
res = self.login(
username,
password,
code,
email_required
)
if hasattr(res, '__call__'):
while True:
try:
twofactor_code = input('Enter your 2fa/email code: ')
resp = res(twofactor_code)
return resp
except WebAuthException:
pass
else:
return self.session
#TODO: DEPRECATED, must be rewritten, like WebAuth
class MobileWebAuth(WebAuth):
"""Identical to :class:`WebAuth`, except it authenticates as a mobile device."""
oauth_token = None #: holds oauth_token after successful login
def _send_login(self, password='', captcha='', email_code='', twofactor_code=''):
def _send_login(self, password='', captcha='', email_code='',
twofactor_code=''):
data = {
'username': self.username,
"password": b64encode(pkcs1v15_encrypt(self.key, password.encode('ascii'))),
"password": b64encode(
pkcs1v15_encrypt(self.key, password.encode('ascii'))),
"emailauth": email_code,
"emailsteamid": str(self.steam_id) if email_code else '',
"twofactorcode": twofactor_code,
@ -311,7 +416,9 @@ class MobileWebAuth(WebAuth):
self.session.cookies.set('mobileClient', 'android')
try:
return self.session.post('https://steamcommunity.com/login/dologin/', data=data, timeout=15).json()
return self.session.post(
'https://steamcommunity.com/login/dologin/', data=data,
timeout=15).json()
except requests.exceptions.RequestException as e:
raise HTTPError(str(e))
finally:
@ -357,7 +464,9 @@ class MobileWebAuth(WebAuth):
}
try:
resp = self.session.post('https://api.steampowered.com/IMobileAuthService/GetWGToken/v0001', data=data)
resp = self.session.post(
'https://api.steampowered.com/IMobileAuthService/GetWGToken/v0001',
data=data)
except requests.exceptions.RequestException as e:
raise HTTPError(str(e))
@ -371,13 +480,20 @@ class MobileWebAuth(WebAuth):
self.session_id = generate_session_id()
for domain in ['store.steampowered.com', 'help.steampowered.com', 'steamcommunity.com']:
for domain in ['store.steampowered.com', 'help.steampowered.com',
'steamcommunity.com']:
self.session.cookies.set('birthtime', '-3333', domain=domain)
self.session.cookies.set('sessionid', self.session_id, domain=domain)
self.session.cookies.set('mobileClientVersion', '0 (2.1.3)', domain=domain)
self.session.cookies.set('sessionid', self.session_id,
domain=domain)
self.session.cookies.set('mobileClientVersion', '0 (2.1.3)',
domain=domain)
self.session.cookies.set('mobileClient', 'android', domain=domain)
self.session.cookies.set('steamLogin', str(steam_id) + "%7C%7C" + resp_data['token'], domain=domain)
self.session.cookies.set('steamLoginSecure', str(steam_id) + "%7C%7C" + resp_data['token_secure'],
self.session.cookies.set('steamLogin',
str(steam_id) + "%7C%7C" + resp_data[
'token'], domain=domain)
self.session.cookies.set('steamLoginSecure',
str(steam_id) + "%7C%7C" + resp_data[
'token_secure'],
domain=domain, secure=True)
self.session.cookies.set('Steam_Language', language, domain=domain)
@ -389,23 +505,34 @@ class MobileWebAuth(WebAuth):
class WebAuthException(Exception):
pass
class TwoFactorAuthNotProvided(WebAuthException):
pass
class HTTPError(WebAuthException):
pass
class LoginIncorrect(WebAuthException):
pass
class CaptchaRequired(WebAuthException):
pass
class CaptchaRequiredLoginIncorrect(CaptchaRequired, LoginIncorrect):
pass
class EmailCodeRequired(WebAuthException):
pass
class TwoFactorCodeRequired(WebAuthException):
pass
class TooManyLoginFailures(WebAuthException):
pass

Loading…
Cancel
Save