Browse Source

Merge branch 'master' of https://github.com/philippj/steam

pull/35/head
Philipp Joos 9 years ago
parent
commit
d91608ccae
  1. 1
      requirements.txt
  2. 314
      steam/account.py
  3. 4
      steam/guard.py
  4. 363
      steam/webpresence.py

1
requirements.txt

@ -1,5 +1,4 @@
coverage==4.0.3
setuptools>=11.3
gevent-eventemitter==1.4
enum34==1.1.2; python_version < '3.4'
gevent==1.1.0

314
steam/account.py

@ -10,14 +10,7 @@ Example usage:
account = steam.account.SteamAccount('username', 'password')
account.set_account_property('identity_secret', 'XYZ')
account.set_account_property('shared_secret', 'XYZ')
code = account.login_code
key = account.get_confirmation_key('conf')
TODO:
- Where to save the credentials (Windows/Linux)?
- Implement mobile authenticator features?
api_key = account.get_api_key()
"""
import os
import sys
@ -29,37 +22,40 @@ from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.backends import default_backend
from steam.guard import *
import steam.guard
import steam.webauth
import steam.webapi
if sys.platform.startswith('win'):
BASE_LOCATION = '.' #Windows
else:
BASE_LOCATION = '.'
DEFAULT_MOBILE_HEADERS = {
'X-Requested-With': 'com.valvesoftware.android.steam.community',
'User-agent': 'Mozilla/5.0 (Linux; U; Android 4.1.1; en-us; Google Nexus 4 - 4.1.1 - API 16 - 768x1280 Build/JRO03S) \
AppleWebKit/534.30 (KHTML, like Gecko) Version/4.0 Mobile Safari/534.30'
}
ACCOUNT_ATTRIBUTES = ['username', 'password', 'steamid', 'shared_secret', 'identity_secret', 'revocation_code',\
'secret_1', 'serial_number', 'deviceid', 'oauth_token']
'secret_1', 'serial_number', 'deviceid', 'oauth_token', 'apikey']
class SteamAccount(object):
username = None
password = None
mobile_session = None
web_session = None
web_api = None
authenticator = None
_path = None
_file = None
_fernet_key = None
_fernet_suite = None
_web_auth = None
_session = None
_mobile_auth = None
def __init__(self, username, password):
self.username = username
self.password = password
self._setup()
if not self._setup():
raise SteamAccountException('Could not access account.')
def __del__(self):
try:
@ -75,189 +71,113 @@ class SteamAccount(object):
setattr(self, property, value)
self._update_credential_file()
def del_account_property(self, property):
delattr(self, property)
self._update_credential_file()
def check_account_property(self, property):
return hasattr(self, property)
@property
def login_code(self):
try:
return generate_twofactor_code(self.shared_secret)
except AttributeError:
if self.authenticator:
return self.authenticator.get_code()
elif hasattr(self, 'shared_secret'):
return steam.guard.generate_twofactor_code(self.shared_secret)
else:
raise SharedSecretNotSet('Add shared_secret to this instance to generate login codes')
def get_confirmation_key(self, tag, timestamp=None):
if not timestamp:
timestamp = get_time_offset()
try:
return generate_confirmation_key(self.identity_secret, timestamp, tag)
except AttributeError:
raise IdentitySecretNotSet('Add identity_secret to this instance to generate confirmation keys')
def fetch_mobile_confirmations(self, retries=1):
self._verify_mobile_session()
if not self._verify_mobile_authenticator():
raise MobileAuthenticatorException('The steam mobile authenticator is required to access the mobile confirmations.')
timestamp = get_time_offset()
confirmation_key = self.get_confirmation_key('conf', timestamp)
confirmation_uri = 'https://steamcommunity.com/mobileconf/conf?p=%s&a=%s&k=%s&t=%s&m=android&tag=conf' %\
( self.deviceid, self.steamid, confirmation_key, timestamp)
response = self.session.get(confirmation_uri, headers=DEFAULT_MOBILE_HEADERS)
raw_confirmations = [ ]
if response.status_code == 200:
if 'Invalid authenticator' in response.text:
retries += 1
return self.fetch_mobile_confirmations(retries)
confirmation_ids = re.findall(r'data-confid="(\d+)"', response.text)
confirmation_keys = re.findall(r'data-key="(\d+)"', response.text)
confirmation_descriptions = re.findall(r'<div>((Confirm|Trade with|Sell -) .+)<\/div>', response.text)
if confirmation_ids and confirmation_keys:
for index, confirmation_id in enumerate(confirmation_ids):
raw_confirmations.append({
'id': confirmation_id,
'key': confirmation_keys[index],
'description': confirmation_descriptions[index]
})
return raw_confirmations
return [ ]
def add_mobile_authenticator(self):
if self._verify_mobile_authenticator():
raise MobileAuthenticatorException('The steam mobile authenticator is already enabled.')
self._verify_mobile_session()
deviceid = getattr(self, 'deviceid') or generate_device_id(self.steamid)
data = {
'steamid': self.steamid,
'sms_phone_id': 1,
'access_token': self.oauth_token,
'authenticator_time': get_time_offset(),
'authenticator_type': 1,
'device_identifier': deviceid
}
response = self.session.post('https://api.steampowered.com/ITwoFactorService/AddAuthenticator/v1/',
data, headers=DEFAULT_MOBILE_HEADERS)
if response.status_code == 200:
response_json = json.loads(response.text)
if response_json.get('response').get('status') == 1:
self.set_account_property('shared_secret', response_json.get('response').get('shared_secret'))
self.set_account_property('identity_secret', response_json.get('response').get('identity_secret'))
self.set_account_property('revocation_code', response_json.get('response').get('revocation_code'))
self.set_account_property('secret_1', response_json.get('response').get('secret_1'))
self.set_account_property('serial_number', response_json.get('response').get('serial_number'))
self.set_account_property('deviceid', deviceid)
return True
return False
def finalize_mobile_authenticator(self, sms_code, retries=1):
if self._verify_mobile_authenticator():
raise MobileAuthenticatorException('The steam mobile authenticator is already enabled.')
self._verify_mobile_session()
if not sms_code:
raise SMSCodeNotProvided('The sms code is required for finalizing the process of adding the mobile\
authenticator')
timestamp = get_time_offset()
def get_api_key(self, retrieve_if_missing=True):
if self.check_account_property('apikey'):
return self.apikey
data = {
'steamid': self.steamid,
'access_token': self.oauth_token,
'authenticator_time': timestamp,
'authenticator_code': generate_twofactor_code_for_time(self.shared_secret, timestamp),
'activation_code': sms_code
}
elif retrieve_if_missing:
if not self._has_web_session():
self._spawn_web_session()
response = self.session.post('https://api.steampowered.com/ITwoFactorService/FinalizeAddAuthenticator/v1/',
data, headers=DEFAULT_MOBILE_HEADERS)
if response.status_code == 200:
response_json = json.loads(response.text)
if response_json.get('response').get('success'):
self.set_account_property('has_mobile_authenticator', True)
return True
else:
if response_json.get('response').get('success') and retries < 30:
retries += 1
return self._finalize_mobile_authenticator(sms_code, retries)
return False
api_key = self.retrieve_api_key()
self.set_account_property('apikey', api_key)
self._spawn_web_api()
return api_key
else:
raise APIKeyException('Could not return the apikey. The apikey is not set as account property and retrieve_if_missing is not allowed.')
def remove_mobile_authenticator(self):
if not self._verify_mobile_authenticator():
raise MobileAuthenticatorException('The steam mobile authenticator is not enabled.')
def retrieve_api_key(self):
if not self._has_web_session():
raise APIKeyException('A web session is required to retrieve the api key.')
self._verify_mobile_session()
response = self.web_session.get('https://steamcommunity.com/dev/apikey')
data = {
'steamid': self.steamid,
'steamguard_scheme': 2,
'revocation_code': self.revocation_code,
'access_token': self.oauth_token
}
if 'Access Denied' in response.text:
raise APIKeyException('You need at least 1 game on this account to access the steam api key page.')
response = self.session.post('https://api.steampowered.com/ITwoFactorService/RemoveAuthenticator/v1/',
data, headers=DEFAULT_MOBILE_HEADERS)
else:
if 'Register for a new Steam Web API Key' in response.text:
regex_result = re.search(r'<input type="hidden" name="sessionid" value="(.*)">', response.text)
session_id = regex_result.group(1)
if response.status_code == 200:
response_json = json.loads(response.text)
if response_json.get('response').get('success'):
self.set_account_property('has_mobile_authenticator', False)
return True
return False
data = {
'domain': 'localhost.com',
'agreeToTerms': 'agreed',
'submit': 'Register',
'sessionid': session_id
}
def _verify_mobile_session(self):
if not isinstance(self._web_auth, steam.webauth.MobileWebAuth):
raise MobileAuthenticatorException('A mobile session is required.')
self.web_session.post('https://steamcommunity.com/dev/registerkey', data=data)
return self.retrieve_api_key()
if self._web_auth.complete:
raise MobileAuthenticatorException('The mobile session has to be logged in to steam.')
elif 'Your Steam Web API Key' in response.text:
regex_result = re.search(r'<p>Key: (.*)</p>', response.text)
api_key = regex_result.group(1)
return api_key
def _verify_mobile_authenticator(self):
if getattr(self, 'has_mobile_authenticator') and self.has_mobile_authenticator:
return True
return False
else:
raise APIKeyException('An unhandled api key page appeared, please try again.')
def _setup(self):
self._generate_fernet_key()
self._spawn_fernet_suite()
self._path = '%s/%s' % (BASE_LOCATION, self.username)
self._file = open(self._path, 'r+')
if not os.path.isfile(self._path):
self._create_credential_file()
else:
self._file = open(self._path, 'r+', 0)
credentials = self._parse_credential_file()
for key, value in credentials.iteritems():
setattr(self, key, value)
if self.check_account_property('shared_secret'):
self._spawn_authenticator()
else:
self.authenticator = steam.guard.SteamAuthenticator()
if self.check_account_property('apikey'):
self._spawn_web_api()
return True
def _create_credential_file(self):
open(self._path, 'a').close()
self._file = open(self._path, 'r+', 0)
data = json.dumps({
'username': self.username,
'password': self.password
})
text = self._fernet_suite.encrypt(data)
self._file.write(text)
token = self._fernet_suite.encrypt(data)
self._file.write(token)
def _parse_credential_file(self):
text = self._file.read()
data = json.loads(self._fernet_suite.decrypt(text))
self._file.seek(0)
token = self._file.read()
self._file.seek(0)
data = json.loads(self._fernet_suite.decrypt(token))
return data
def _update_credential_file(self):
credentials = self._gather_credentials()
data = json.dumps(credentials)
text = self._fernet_suite.encrypt(data)
token = self._fernet_suite.encrypt(data)
self._file.truncate()
self._file.write(text)
self._file.write(token)
def _gather_credentials(self):
data = { }
@ -275,25 +195,61 @@ class SteamAccount(object):
def _spawn_fernet_suite(self):
self._fernet_suite = Fernet(self._fernet_key)
def _spawn_web_session(self):
def _spawn_web_api(self):
self.web_api = steam.webapi.WebAPI(self.apikey)
def _spawn_authenticator(self):
secrets = {
'identity_secret': getattr(self, 'identity_secret'),
'shared_secret': getattr(self, 'shared_secret'),
'secret_1': getattr(self, 'secret_1'),
'revocation_code': getattr(self, 'revocation_code'),
}
self.authenticator = steam.guard.SteamAuthenticator(secrets)
self._spawn_mobile_session()
self.authenticator.medium = self._mobile_auth
def _has_session(self):
if self._has_web_session() or self._has_mobile_session():
return True
return False
def _has_web_session(self):
if isinstance(self._web_auth, steam.webauth.WebAuth):
return True
return False
def _has_mobile_session(self):
if isinstance(self._mobile_auth, steam.webauth.MobileWebAuth):
return True
return False
def _spawn_web_session(self, captcha='', email_code='', twofactor_code=''):
self._web_auth = steam.webauth.WebAuth(self.username, self.password)
self._login_web_session(self._web_auth, captcha, email_code, twofactor_code)
self.web_session = self._web_auth.session
def _spawn_mobile_session(self, captcha='', email_code='', twofactor_code=''):
self._mobile_auth = steam.webauth.MobileWebAuth(self.username, self.password)
self._login_web_session(self._mobile_auth, captcha, email_code, twofactor_code)
self.mobile_session = self._mobile_auth.session
def _spawn_mobile_session(self):
self._web_auth = steam.webauth.MobileWebAuth(self.username, self.password)
def _login_web_session(self, web_auth, captcha='', email_code='', twofactor_code=''):
if not isinstance(web_auth, steam.webauth.WebAuth) and not isinstance(web_auth, steam.webauth.MobileWebAuth):
raise WebAuthNotComplete('Please supply a valid WebAuth or MobileWebAuth session')
def _login_web_session(self, captcha='', email_code='', twofactor_code=''):
try:
self._web_auth.login()
web_auth.login()
except steam.webauth.CaptchaRequired:
if not captcha:
raise CaptchaNotProvided('The steam login captcha is required for logging in, but was not provided.')
self._web_auth.login(captcha=captcha)
web_auth.login(captcha=captcha)
except steam.webauth.EmailCodeRequired:
if not email_code:
raise EMailCodeNotProvided('The email code is required for logging in, but was not provided.')
self._web_auth.login(email_code=email_code)
web_auth.login(email_code=email_code)
except steam.webauth.TwoFactorCodeRequired:
if not twofactor_code:
@ -301,16 +257,14 @@ class SteamAccount(object):
twofactor_code = self.login_code
except SharedSecretNotSet:
raise TwoFACodeNotProvided('The twofactor code is required for logging in, but was not provided.')
self._web_auth.login(twofactor_code=twofactor_code)
web_auth.login(twofactor_code=twofactor_code)
if self._web_auth.complete:
if not getattr(self, 'steamid'):
self.set_account_property('steamid', self._web_auth.steamid)
if web_auth.complete:
if not hasattr(self, 'steamid'):
self.set_account_property('steamid', web_auth.steam_id)
if isinstance(self._web_auth, steam.webauth.MobileWebAuth) and not getattr(self, 'oauth_token'):
self.set_account_property('oauth_token', self._web_auth.oauth_token)
self._session = self._web_auth.session
if isinstance(web_auth, steam.webauth.MobileWebAuth) and not hasattr(self, 'oauth_token'):
self.set_account_property('oauth_token', web_auth.oauth_token)
else:
raise WebAuthNotComplete('The web authentication could not be completed.')
@ -326,6 +280,12 @@ class IdentitySecretNotSet(SteamAccountException):
class WebAuthNotComplete(SteamAccountException):
pass
class WebException(SteamAccountException):
pass
class APIKeyException(SteamAccountException):
pass
class MobileAuthenticatorException(SteamAccountException):
pass

4
steam/guard.py

@ -101,8 +101,8 @@ class SteamAuthenticator(object):
:return: trade confirmation key
:rtype: str
"""
return generate_confirmation_key(b64decode(self.identity_secret), tag,
self.get_time() if timestamp is None else timestamp)
return generate_confirmation_key(b64decode(self.identity_secret),
self.get_time() if timestamp is None else timestamp, tag)
def _send_request(self, action, params):
medium = self.medium

363
steam/webpresence.py

@ -1,363 +0,0 @@
# -*- coding: utf-8 -*-
"""
Wrapping methods for `ISteamWebUserPresenceOAuth` and `Polling` functionality
Example usage:
.. code:: python
import steam.webauth
import steam.webpresence
webAuth = steam.webauth.MobileWebAuth('username', 'password')
webAuth.login()
def my_callback(messages):
for message in messages:
print '[%s] %s from %s' % (message.timestamp, message.type, message.steamid_from.as_64)
webPresence = steam.webpresence.WebUserPresence(webAuth)
webPresence.logon()
webPresence.start_polling(my_callback)
print 'Started polling'
"""
import threading
import requests
from steam.webauth import MobileWebAuth
from steam import SteamID
DEFAULT_MOBILE_HEADERS = {
'X-Requested-With': 'com.valvesoftware.android.steam.community',
'User-agent': 'Mozilla/5.0 (Linux; U; Android 4.1.1; en-us; Google Nexus 4 - 4.1.1 - API 16 - 768x1280 Build/JRO03S) \
AppleWebKit/534.30 (KHTML, like Gecko) Version/4.0 Mobile Safari/534.30'
}
API_BASE = 'https://api.steampowered.com'
DEFAULT_TIMEOUT = 30
PERSONA_STATES = ['Offline', 'Online', 'Busy', 'Away', 'Snooze', 'Looking to trade', 'Looking to play']
class WebUserPresence:
"""
Wrapping methods for `ISteamWebUserPresenceOAuth` and `Polling` functionality
"""
_loggedon = False
_timeout = None
_oauth_token = None
_session = None
_steamid = None
_umqid = None
_message_base = None
def __init__(self, mobile_web_auth, timeout=None):
if not isinstance(mobile_web_auth, MobileWebAuth):
raise InvalidInstanceSupplied('The instance supplied as parameter is no valid instance of `MobileWebAuth`.')
if not mobile_web_auth.complete:
raise InstanceNotReady('Please make sure your `MobileWebAuth` instance is logged in.')
self._timeout = timeout
self._oauth_token = mobile_web_auth.oauth_token
self._session = mobile_web_auth.session
def _request(self, uri, data, timeout=None):
"""
HTTP Request
:param uri: URI to be requested
:param data: Data to be delivered
:param timeout: HTTP timeout
:return: `requests.Response`
"""
timeout = timeout or self._timeout or DEFAULT_TIMEOUT
return self._session.post(uri, data=data, headers=DEFAULT_MOBILE_HEADERS, timeout=timeout)
def _call(self, method, data, timeout=None):
"""
Calling an `ISteamWebUserPresenceOAuth` api method
:param method: Method to be called
:param data: Data to be delivered
:param timeout: HTTP timeout
:return: If successful, response as tuple, otherwise exceptions are thrown
"""
uri = '%s/ISteamWebUserPresenceOAuth/%s/v1/' % (API_BASE, method)
try:
response = self._request(uri, data, timeout=timeout)
except requests.exceptions.ReadTimeout:
raise HTTPError('Timeout')
if response.status_code == 401:
raise NotAuthorized('Not authorized. Please check your OAuth token and verify your `MobileWebAuth` login.')
elif response.status_code != 200:
raise HTTPError('HTTP request failed. Status code: %s' % response.status_code)
if self._loggedon:
self._message_base += 1
try:
json_response = response.json()
except:
raise ValueError('Could not build json_response')
else:
return json_response
def logon(self):
"""
Sends logon to the api
:return: True if successful. Raises an `LogonFailed` exception otherwise
"""
login_data = {
'access_token': self._oauth_token
}
response = self._call('Logon', login_data)
if response.get('error') == 'OK':
self._steamid = SteamID(response.get('steamid'))
self._umqid = response.get('umqid')
self._message_base = response.get('message')
self._loggedon = True
return True
else:
raise LogonFailed('Logon failed. Please check your OAuth token and verify your `MobileWebAuth` login.')
def logoff(self):
"""
Sends logoff to the api
:return: True if logoff was successful, False otherwise.
"""
response = self._call('Logoff', self._build_call_data({}), True)
self._loggedon = False
return response.get('error') == 'OK' or False
def poll(self):
"""
Starts an poll request
:return: Full response as Tuple
"""
poll_data = {
'pollid': 0,
'sectimeout': 5,
'secidletime': 0,
'use_accountids': 1
}
response = self._call('Poll', self._build_call_data(poll_data, True), timeout=60)
if response.get('error') == 'OK':
return response
else:
raise PollCreationFailed(response.get('error'))
def start_polling(self, callback):
"""
Creates an instance of `WebPolling` and starts the thread
:param callback: Function reference for handling `WebPollMessages`
:return: True
"""
web_polling = self._spawn_web_polling(callback)
web_polling.start()
return True
def stop_polling(self):
"""
Stops the active `WebPolling`
:return: True, False.
"""
return self._kill_web_polling()
def message(self, steamid, text):
"""
Delivers a steam message
:param steamid: SteamID64 of the target user
:param text: Message to deliver
:return: True if deliver was successful, False otherwise
"""
message_data = self._build_call_data({
'text': text,
'type': 'saytext',
'steamid_dst': steamid
})
response = self._call('Message', message_data)
return response.get('error') == 'OK' or False
def _spawn_web_polling(self, callback):
"""
Spawns a `WebPolling` instance
:param callback: A method reference for handling incoming `WebPollMessages`
:return: Instance reference
"""
self._web_polling = WebPolling(self, callback)
return self._web_polling
def _kill_web_polling(self):
"""
Stops the run() method of WebPolling after the current poll request
:return: True if `_web_polling` is spawned and could be stopped, False otherwise
"""
if getattr(self, '_web_polling'):
self._web_polling._active = False
return True
return False
def _prep_messages(self, messages):
"""
Prepares incoming raw `WebPolling` messages by creating `WebPollMessage` instances
:param messages: raw `WebPolling` messages
:return: List of `WebPollMessages` instances
"""
prepped_messages = [ ]
for message in messages:
prepped_messages.append(WebPollMessage(message, self))
return prepped_messages
def _build_call_data(self, data, set_message_base=False):
"""
Builds the data for `_call`'s
:param data: Data to deliver as Tuple
:param set_message_base: True, False. Only required for calls where `message` has to be set
:return: Prepared call data
"""
base_data = {
'umqid': self._umqid,
'access_token': self._oauth_token
}
if set_message_base:
base_data.__setitem__('message', self._message_base)
for key in data:
base_data.__setitem__(key, data.get(key))
return base_data
class WebPolling(threading.Thread):
"""
Threaded class for `Polling`
"""
_active = True
_web_user_presence = None
_callback = None
def __init__(self, web_user_presence, callback):
threading.Thread.__init__(self)
if not isinstance(web_user_presence, WebUserPresence):
raise InvalidInstanceSupplied('The instance supplied as parameter is no valid instance of `WebUserPresence`.')
if not web_user_presence._loggedon:
raise InstanceNotReady('The `WebUserPresence` has to be logged on.')
self._web_user_presence = web_user_presence
self._callback = callback
self.setDaemon(False)
def run(self):
"""
Sends HTTP requests while class is `_active` and calls a specified callback
:return:
"""
while self._active:
try:
response = self._web_user_presence.poll()
except PollCreationFailed:
"""
Ignore timeout exception
"""
pass
except HTTPError:
"""
Ignore http exceptions
"""
pass
else:
prepped_messages = self._web_user_presence._prep_messages(response.get('messages'))
if self._callback:
self._callback(prepped_messages)
class WebPollMessage(object):
"""
Class for proper handling of polling messages
"""
complete = False
_web_user_presence = None
timestamp = 0
type = None
steamid_from = SteamID()
text = None
persona_state = 0
persona_name = None
status_flags = None
_answerable = False
_full_data = None
def __init__(self, message, web_user_presence):
if not isinstance(web_user_presence, WebUserPresence):
raise InvalidInstanceSupplied('The instance supplied as parameter is no valid instance of `WebUserPresence`.')
if not web_user_presence._loggedon:
raise InstanceNotReady('The `WebUserPresence` has to be logged on.')
self._full_data = message
self._web_user_presence = web_user_presence
self.timestamp = message.get('utc_timestamp')
self.type = message.get('type')
self.steamid_from = SteamID(message.get('accountid_from'))
if self.type == 'typing':
self._answerable = True
elif self.type == 'saytext':
self._answerable = True
self.text = message.get('text')
elif self.type == 'personastate':
self.persona_name = message.get('persona_name')
self.persona_state = message.get('persona_state')
self.status_flags = message.get('status_flags')
def answer(self, message):
"""
If the current message is `_answerable` ( current message has something to do with the steam chat ) an
answer can be sent
:param message: Message to be delivered
:return: True if message could be delivered, False otherwise
"""
if not self._answerable:
return False
self._web_user_presence.message(self.steamid_from.as_64, message)
return True
def persona_state_to_str(self):
"""
Returns the `persona_state` as string
:return: `persona_state` as String or False if `persona_state` is not available
"""
if self.persona_state:
return PERSONA_STATES.__getitem__(self.persona_state)
return False
class WebUserPresenceException(Exception):
pass
class InvalidInstanceSupplied(WebUserPresenceException):
pass
class InstanceNotReady(WebUserPresenceException):
pass
class NotAuthorized(WebUserPresenceException):
pass
class LogonFailed(WebUserPresenceException):
pass
class PollCreationFailed(WebUserPresenceException):
pass
class HTTPError(WebUserPresenceException):
pass
Loading…
Cancel
Save