Browse Source

Merge branch 'master' of https://github.com/ValvePython/steam

pull/35/head
Philipp Joos 9 years ago
parent
commit
2ce9be2ce6
  1. 1
      .travis.yml
  2. 7
      README.rst
  3. 7
      docs/intro.rst
  4. 1
      requirements.txt
  5. 69
      steam/client/__init__.py
  6. 49
      steam/client/builtins/misc.py
  7. 52
      steam/core/cm.py
  8. 5
      steam/enums/common.py
  9. 285
      steam/guard.py
  10. 10
      steam/util/__init__.py
  11. 12
      steam/webauth.py

1
.travis.yml

@ -12,6 +12,7 @@ install:
- pip install scrutinizer-ocular
script:
- make test
- make docs
after_script:
- coveralls
- ocular

7
README.rst

@ -6,13 +6,14 @@ Supports Python ``2.7+`` and ``3.3+``.
Documentation: http://steam.readthedocs.io/en/latest/
Main features
-------------
Key features
------------
* `SteamAuthenticator <http://valvepython.github.io/steam/api/steam.guard.html>`_ - enable/disable/manage 2FA on account and generate codes
* `SteamClient <http://valvepython.github.io/steam/api/steam.client.html>`_ - communication with the steam network based on ``gevent``.
* `SteamID <http://valvepython.github.io/steam/api/steam.client.html>`_ - convert between the various ID representations with ease
* `WebAPI <http://valvepython.github.io/steam/api/steam.webapi.html>`_ - simple API for Steam's Web API with automatic population of interfaces
* `WebAuth <http://valvepython.github.io/steam/api/steam.webauth.html>`_ - authentication for access to ``store.steampowered.com`` and ``steamcommunity.com``
* `SteamClient <http://valvepython.github.io/steam/api/steam.client.html>`_ - communication with the steam network based on ``gevent``.
Checkout the `User guide <http://valvepython.github.io/steam/user_guide.html>`_ for examples,
or the `API Reference <http://valvepython.github.io/steam/api/index.html>`_ for details.

7
docs/intro.rst

@ -11,13 +11,14 @@ A python module for interacting with various parts of Steam_.
Supports Python ``2.7+`` and ``3.3+``.
Main features
=============
Key features
============
* :doc:`SteamAuthenticator <api/steam.guard>` - enable/disable/manage 2FA on account and generate codes
* :doc:`SteamClient <api/steam.client>` - communication with the steam network based on ``gevent``
* :doc:`SteamID <api/steam.steamid>` - convert between the various ID representations with ease
* :doc:`WebAPI <api/steam.webapi>` - simple API for Steam's Web API with automatic population of interfaces
* :doc:`WebAuth <api/steam.webauth>` - authentication for access to ``store.steampowered.com`` and ``steamcommunity.com``
* :doc:`SteamClient <api/steam.client>` - communication with the steam network based on ``gevent``.
Checkout the :doc:`user_guide` for examples, or the :doc:`api/index` for details.

1
requirements.txt

@ -11,3 +11,4 @@ PyYAML==3.11
requests==2.9.1
vcrpy==1.7.4
vdf==2.0
sphinx==1.3.5

69
steam/client/__init__.py

@ -11,6 +11,7 @@ Events
| ``error`` - after login failure
| ``auth_code_required`` - either email code or 2FA code is needed for login
| ``logged_on`` - after successful login, client can send messages
| ``new_login_key`` - after new login key has been received and acknowledged
| :class:`EMsg <steam.enums.emsg.EMsg>` - all messages are emitted with their ``EMsg``
@ -25,6 +26,9 @@ Events
"""
import os
import json
from time import time
from io import open
import logging
import gevent
import gevent.monkey
@ -39,14 +43,16 @@ from steam.core.msg import MsgProto
from steam.core.cm import CMClient
from steam import SteamID
from steam.client.builtins import BuiltinBase
from steam.util import ip_from_int
class SteamClient(CMClient, BuiltinBase):
_cm_servers_timestamp = None # used to decide when to update CM list on disk
_reconnect_backoff_c = 0
current_jobid = 0
credential_location = None #: location for sentry
username = None #: username when logged on
login_key = None #: can be used for subsequent logins (no 2FA code will be required)
credential_location = None #: location for sentry
username = None #: username when logged on
login_key = None #: can be used for subsequent logins (no 2FA code will be required)
def __init__(self):
CMClient.__init__(self)
@ -78,13 +84,56 @@ class SteamClient(CMClient, BuiltinBase):
"""
self.credential_location = path
def connect(self, *args, **kwargs):
"""Attempt to establish connection, see :method:`.CMClient.connect`"""
self._bootstrap_cm_list_from_file()
CMClient.connect(self, *args, **kwargs)
def disconnect(self, *args, **kwargs):
"""
Close connection
"""
"""Close connection, see :method:`.CMClient.disconnect`"""
self.logged_on = False
CMClient.disconnect(self, *args, **kwargs)
def _bootstrap_cm_list_from_file(self):
if not self.credential_location or self._cm_servers_timestamp is not None: return
filepath = os.path.join(self.credential_location, 'cm_servers.json')
if not os.path.isfile(filepath): return
self._LOG.debug("Reading CM servers from %s" % repr(filepath))
try:
with open(filepath, 'r') as f:
data = json.load(f)
except IOError as e:
self._LOG.error("load %s: %s" % (repr(filepath), str(e)))
return
self.cm_servers.clear()
self.cm_servers.merge_list(data['servers'])
self._cm_servers_timestamp = int(data['timestamp'])
def _handle_cm_list(self, msg):
if self._cm_servers_timestamp is None:
self.cm_servers.clear()
self._cm_servers_timestamp = int(time())
CMClient._handle_cm_list(self, msg) # just merges the list
if self.credential_location:
filepath = os.path.join(self.credential_location, 'cm_servers.json')
if not os.path.exists(filepath) or time() - (3600*24) > self._cm_servers_timestamp:
data = {
'timestamp': self._cm_servers_timestamp,
'servers': list(zip(map(ip_from_int, msg.body.cm_addresses), msg.body.cm_ports)),
}
try:
with open(filepath, 'wb') as f:
f.write(json.dumps(data, indent=True).encode('ascii'))
self._LOG.debug("Saved CM servers to %s" % repr(filepath))
except IOError as e:
self._LOG.error("saving %s: %s" % (filepath, str(e)))
def _handle_jobs(self, event, *args):
if isinstance(event, EMsg):
message = args[0]
@ -134,10 +183,14 @@ class SteamClient(CMClient, BuiltinBase):
self.emit("auth_code_required", is_2fa, code_mismatch)
def _handle_login_key(self, message):
self.login_key = message.body.login_key
resp = MsgProto(EMsg.ClientNewLoginKeyAccepted)
resp.body.unique_id = message.body.unique_id
self.send(resp)
if self.logged_on:
self.send(resp)
gevent.idle()
self.login_key = message.body.login_key
self.emit("new_login_key")
def _handle_update_machine_auth(self, message):
ok = self.store_sentry(self.username, message.body.bytes)

49
steam/client/builtins/misc.py

@ -69,6 +69,26 @@ class SteamUnifiedMessages(EventEmitter):
Incoming messages are emitted as events once with their ``jobid``
and once with their method name (e.g. ``Player.GetGameBadgeLevels#1``)
Example code:
.. code:: python
response = client.unified_messages.send_and_wait('Player.GetGameBadgeLevels#1', {
'property': 1,
'something': 'value',
})
# or alternatively
jobid = client.unified_message.send('Player.GetGameBadgeLevels#1', {'something': 1})
response, = client.unified_message.wait_event(jobid)
# or
message = client.unified_message.get('Player.GetGameBadgeLevels#1')
message.something = 1
response = client.unified_message.send_and_wait(message)
"""
def __init__(self, steam, logger_name=None):
self._LOG = logging.getLogger(logger_name if logger_name else self.__class__.__name__)
@ -117,10 +137,15 @@ class SteamUnifiedMessages(EventEmitter):
self._data[message] = method_name
return message
def send(self, message):
def send(self, message, params=None):
"""Send service method request
:param message: proto message instance (use :meth:`SteamUnifiedMessages.get`)
:param message:
proto message instance (use :meth:`SteamUnifiedMessages.get`)
or method name (e.g. ``Player.GetGameBadgeLevels#1``)
:type message: :class:`str`, proto message instance
:param params: message parameters
:type params: :class:`dict`
:return: ``jobid`` event identifier
:rtype: :class:`str`
@ -129,19 +154,33 @@ class SteamUnifiedMessages(EventEmitter):
.. note::
If you listen for ``jobid`` on the client instance you will get the encapsulated message
"""
if isinstance(message, str):
message = self.get(message)
if message not in self._data:
raise ValueError("Supplied message seems to be invalid. Did you use 'get' method?")
if params:
for k, v in params.items():
if isinstance(v, list):
getattr(message, k).extend(v)
else:
setattr(message, k, v)
capsule = MsgProto(EMsg.ClientServiceMethod)
capsule.body.method_name = self._data[message]
capsule.body.serialized_method = message.SerializeToString()
return self._steam.send_job(capsule)
def send_and_wait(self, message, timeout=None, raises=False):
def send_and_wait(self, message, params=None, timeout=None, raises=False):
"""Send service method request and wait for response
:param message: proto message instance (use :meth:`get`)
:param message:
proto message instance (use :meth:`SteamUnifiedMessages.get`)
or method name (e.g. ``Player.GetGameBadgeLevels#1``)
:type message: :class:`str`, proto message instance
:param params: message parameters
:type params: :class:`dict`
:param timeout: (optional) seconds to wait
:type timeout: :class:`int`
:param raises: (optional) On timeout if :class:`False` return :class:`None`, else raise :class:`gevent.Timeout`
@ -150,7 +189,7 @@ class SteamUnifiedMessages(EventEmitter):
:rtype: proto message, :class:`None`
:raises: ``gevent.Timeout``
"""
job_id = self.send(message)
job_id = self.send(message, params)
resp = self.wait_event(job_id, timeout, raises=raises)
if resp is None and not raises:
return None

52
steam/core/cm.py

@ -34,7 +34,7 @@ class CMClient(EventEmitter):
UDP = 1 #: UDP protocol enum
verbose_debug = False #: print message connects in debug
servers = None #: a instance of :class:`steam.core.cm.CMServerList`
cm_servers = None #: a instance of :class:`steam.core.cm.CMServerList`
current_server_addr = None #: (ip, port) tuple
_seen_logon = False
_connecting = False
@ -54,7 +54,7 @@ class CMClient(EventEmitter):
def __init__(self, protocol=0):
self._LOG = logging.getLogger("CMClient")
self.servers = CMServerList()
self.cm_servers = CMServerList()
if protocol == CMClient.TCP:
self.connection = TCPConnection()
@ -64,7 +64,7 @@ class CMClient(EventEmitter):
self.on(EMsg.ChannelEncryptRequest, self.__handle_encrypt_request),
self.on(EMsg.Multi, self.__handle_multi),
self.on(EMsg.ClientLogOnResponse, self._handle_logon),
self.on(EMsg.ClientCMList, self.__handle_cm_list),
self.on(EMsg.ClientCMList, self._handle_cm_list),
def emit(self, event, *args):
if event is not None:
@ -96,7 +96,7 @@ class CMClient(EventEmitter):
self._LOG.debug("Connect initiated.")
for i, server_addr in enumerate(self.servers):
for i, server_addr in enumerate(self.cm_servers):
if retry and i > retry:
return False
@ -278,7 +278,7 @@ class CMClient(EventEmitter):
result = self.wait_event(EMsg.ChannelEncryptResult, timeout=5)
if result is None:
self.servers.mark_bad(self.current_server_addr)
self.cm_servers.mark_bad(self.current_server_addr)
gevent.spawn(self.disconnect)
return
@ -337,7 +337,7 @@ class CMClient(EventEmitter):
if result in (EResult.TryAnotherCM,
EResult.ServiceUnavailable
):
self.servers.mark_bad(self.current_server_addr)
self.cm_servers.mark_bad(self.current_server_addr)
self.disconnect()
elif result == EResult.OK:
self._seen_logon = True
@ -360,11 +360,11 @@ class CMClient(EventEmitter):
self.emit("error", EResult(result))
self.disconnect()
def __handle_cm_list(self, msg):
def _handle_cm_list(self, msg):
self._LOG.debug("Updating CM list")
new_servers = zip(map(ip_from_int, msg.body.cm_addresses), msg.body.cm_ports)
self.servers.merge_list(new_servers)
self.cm_servers.merge_list(new_servers)
class CMServerList(object):
@ -390,19 +390,26 @@ class CMServerList(object):
Bad = 2
def __init__(self, bad_timespan=300):
self._log = logging.getLogger("CMServerList")
self._LOG = logging.getLogger("CMServerList")
self.bad_timespan = bad_timespan
self.list = defaultdict(dict)
self.bootstrap_from_builtin_list()
def clear(self):
"""Clears the server list"""
if len(self.list):
self._LOG.debug("List cleared.")
self.list.clear()
def bootstrap_from_builtin_list(self):
"""
Resets the server list to the built in one.
This method is called during initialization.
"""
self.list.clear()
self._LOG.debug("Bootstraping from builtin list")
self.clear()
# build-in list
self.merge_list([('162.254.195.44', 27019), ('146.66.152.11', 27017),
@ -431,33 +438,33 @@ class CMServerList(object):
:return: booststrap success
:rtype: :class:`bool`
"""
from steam import _webapi
self._LOG.debug("Attempting bootstrap via WebAPI")
from steam import _webapi
try:
resp = _webapi.get('ISteamDirectory', 'GetCMList', 1, params={'cellid': cellid})
except Exception as exp:
self._log.error("WebAPI boostrap failed: %s" % str(exp))
self._LOG.error("WebAPI boostrap failed: %s" % str(exp))
return False
result = EResult(resp['response']['result'])
if result != EResult.OK:
self._log.error("GetCMList failed with %s" % repr(result))
self._LOG.error("GetCMList failed with %s" % repr(result))
return False
serverlist = resp['response']['serverlist']
self._log.debug("Recieved %d servers from WebAPI" % len(serverlist))
self._LOG.debug("Recieved %d servers from WebAPI" % len(serverlist))
def str_to_tuple(serveraddr):
ip, port = serveraddr.split(':')
return str(ip), int(port)
self.list.clear()
self.clear()
self.merge_list(map(str_to_tuple, serverlist))
return True
def __iter__(self):
def genfunc():
while True:
@ -477,7 +484,7 @@ class CMServerList(object):
def reset_all(self):
"""Reset status for all servers in the list"""
self._log.debug("Marking all CMs as Good.")
self._LOG.debug("Marking all CMs as Good.")
for key in self.list:
self.mark_good(key)
@ -496,19 +503,20 @@ class CMServerList(object):
:param server_addr: (ip, port) tuple
:type server_addr: :class:`tuple`
"""
self._log.debug("Marking %s as Bad." % repr(server_addr))
self._LOG.debug("Marking %s as Bad." % repr(server_addr))
self.list[server_addr].update({'quality': CMServerList.Bad, 'timestamp': time()})
def merge_list(self, new_list):
"""Add new CM servers to the list
:param new_list: a list of (ip, port) tuples
:param new_list: a list of ``(ip, port)`` tuples
:type new_list: :class:`list`
"""
total = len(self.list)
for ip, port in new_list:
self.mark_good((ip, port))
if (ip, port) not in self.list:
self.mark_good((ip, port))
if total:
self._log.debug("Added %d new CM addresses." % (len(self.list) - total))
if len(self.list) > total:
self._LOG.debug("Added %d new CM addresses." % (len(self.list) - total))

5
steam/enums/common.py

@ -369,6 +369,11 @@ class ELeaderboardUploadScoreMethod(SteamIntEnum):
KeepBest = 1
ForceUpdate = 2
class ETwoFactorTokenType(SteamIntEnum):
NONE = 0
ValveMobileApp = 1
ThirdParty = 2
# Do not remove
from sys import modules
from enum import EnumMeta

285
steam/guard.py

@ -1,8 +1,259 @@
"""
This submodule contains various functionality related to Steam Guard.
:class:`SteamAuthenticator` provides methods for genereating codes
and enabling 2FA on a Steam account. Operations managing the authenticator
on an account require an instance of either :class:`.MobileWebAuth` or
:class:`.SteamClient`. The instance needs to be logged in.
Adding an authenticator
.. code:: python
sa = SteamAuthenticator(medium=medium)
sa.add() # SMS code will be send to account's phone number
sa.secrets # dict with authenticator secrets, make sure you save them
sa.finalize('SMS CODE') # activate the authenticator
sa.get_code() # generate 2FA code for login
sa.remove() # removes the authenticator from the account
.. warning::
Before you finalize the authenticator, make sure to save your secrets.
Otherwise you will lose access to the account.
Once authenticator is enabled all you need is the secrets to generate codes.
.. code:: python
sa = SteamAuthenticator(secrets)
sa.get_code()
You can obtain the authenticator secrets from an Android device using
:func:`extract_secrets_from_android_rooted`. See the function docstring for
details on what is required for it to work.
"""
import json
import subprocess
import struct
import requests
from base64 import b64decode, b64encode
from binascii import hexlify
from time import time
from steam import webapi
from steam.enums import ETwoFactorTokenType
from steam.steamid import SteamID
from steam.core.crypto import hmac_sha1, sha1_hash
from steam.enums.common import EResult
from steam.webauth import MobileWebAuth
from steam.util import proto_to_dict
class SteamAuthenticator(object):
"""Add/Remove authenticator from an account. Generate 2FA and confirmation codes."""
_finalize_attempts = 5
medium = None #: instance of :class:`.MobileWebAuth` or :class:`.SteamClient`
steam_time_offset = None #: offset from steam server time
secrets = None #: :class:`dict` with authenticator secrets
def __init__(self, secrets=None, medium=None):
"""
:param secret: a dict of authenticator secrets
:type secret: dict
:param medium: logged on session for steam user
:type mediumm: :class:`.MobileWebAuth`, :class:`.SteamClient`
"""
self.secrets = secrets or {}
self.medium = medium
def __getattr__(self, key):
if key not in self.secrets:
raise AttributeError("No such attribute")
return self.secrets[key]
def get_time(self):
"""
:return: Steam aligned timestamp
:rtype: int
"""
if self.steam_time_offset is None:
self.steam_time_offset = get_time_offset()
return int(time() + self.steam_time_offset)
def get_code(self, timestamp=None):
"""
:param timestamp: time to use for code generation
:type timestamp: int
:return: two factor code
:rtype: str
"""
return generate_twofactor_code_for_time(b64decode(self.shared_secret),
self.get_time() if timestamp is None else timestamp)
def get_confirmation_key(self, tag='', timestamp=None):
"""
:param tag: see :func:`generate_confimation_key` for this value
:type tag: str
:param timestamp: time to use for code generation
:type timestamp: int
:return: trade confirmation key
:rtype: str
"""
return generate_confirmation_key(b64decode(self.identity_secret), tag,
self.get_time() if timestamp is None else timestamp)
def _send_request(self, action, params):
action_map = {
'add': 'AddAuthenticator',
'finalize': 'FinalizeAddAuthenticator',
'remove': 'RemoveAuthenticator',
'status': 'QueryStatus',
'createcodes': 'CreateEmergencyCodes',
'destroycodes': 'DestroyEmergencyCodes',
}
medium = self.medium
if isinstance(medium, MobileWebAuth):
if not medium.complete:
raise SteamAuthenticatorError("MobileWebAuth instance not logged in")
params['access_token'] = medium.oauth_token
params['http_timeout'] = 10
try:
resp = webapi.post('ITwoFactorService', action_map[action], 1, params=params)
except requests.exceptions.RequestException as exp:
raise SteamAuthenticatorError("Error adding via WebAPI: %s" % str(exp))
resp = resp['response']
else:
if not medium.logged_on:
raise SteamAuthenticatorError("SteamClient instance not logged in")
resp = medium.unified_messages.send_and_wait("TwoFactor.%s#1" % action_map[action],
params, timeout=10)
if resp is None:
raise SteamAuthenticatorError("Failed to add authenticator. Request timeout")
resp = proto_to_dict(resp)
if action == 'add':
for key in ['shared_secret', 'identity_secret', 'secret_1']:
resp[key] = b64encode(resp[key])
return resp
def add(self):
"""Add authenticator to an account.
The account's phone number will receive a SMS code required for :meth:`finalize`.
:raises: :class:`SteamAuthenticatorError`
"""
params = {
'steamid': self.medium.steam_id,
'authenticator_time': int(time()),
'authenticator_type': int(ETwoFactorTokenType.ValveMobileApp),
'device_identifier': generate_device_id(self.medium.steam_id),
'sms_phone_id': '1',
}
resp = self._send_request('add', params)
if resp['status'] != EResult.OK:
raise SteamAuthenticatorError("Failed to add authenticator. Error: %s" % repr(EResult(resp['status'])))
for key in ['shared_secret', 'identity_secret', 'serial_number', 'secret_1', 'revocation_code', 'token_gid']:
if key in resp:
self.secrets[key] = resp[key]
self.steam_time_offset = int(resp['server_time']) - time()
def finalize(self, activation_code):
"""Finalize authenticator with received SMS code
:param activation_code: SMS code
:type activation_code: str
:raises: :class:`SteamAuthenticatorError`
"""
params = {
'steamid': self.medium.steam_id,
'authenticator_time': int(time()),
'authenticator_code': self.get_code(),
'activation_code': activation_code,
}
resp = self._send_request('finalize', params)
if resp['status'] != EResult.TwoFactorActivationCodeMismatch and resp.get('want_more', False) and self._finalize_attempts:
self.steam_time_offset += 30
self._finalize_attempts -= 1
self.finalize(activation_code)
return
elif not resp['success']:
self._finalize_attempts = 5
raise SteamAuthenticatorError("Failed to finalize authenticator. Error: %s" % repr(EResult(resp['status'])))
self.steam_time_offset = int(resp['server_time']) - time()
def remove(self):
"""Remove authenticator
.. warning::
Doesn't work via :class:`.SteamClient`. Disabled by Valve
:raises: :class:`SteamAuthenticatorError`
"""
if not self.secrets:
raise SteamAuthenticatorError("No authenticator secrets available?")
params = {
'steamid': self.medium.steam_id,
'revocation_code': self.revocation_code,
'steamguard_scheme': 1,
}
resp = self._send_request('remove', params)
if not resp['success']:
raise SteamAuthenticatorError("Failed to remove authenticator. (attempts remaining: %s)" % (
resp['revocation_attempts_remaining'],
))
self.secrets.clear()
def status(self, medium=None):
"""Fetch authenticator status for the account
:raises: :class:`SteamAuthenticatorError`
:return: dict with status parameters
:rtype: dict
"""
params = {'steamid': self.medium.steam_id}
return self._send_request('status', params)
def create_emergency_codes(self):
"""Generate emergency codes
:raises: :class:`SteamAuthenticatorError`
:return: list of codes
:rtype: list
"""
return self._send_request('createcodes', {}).get('code', [])
def destroy_emergency_codes(self):
"""Destroy all emergency codes
:raises: :class:`SteamAuthenticatorError`
"""
params = {'steamid': self.medium.steam_id}
self._send_request('destroycodes', params)
class SteamAuthenticatorError(Exception):
pass
def generate_twofactor_code(shared_secret):
"""Generate Steam 2FA code for login with current time
@ -69,7 +320,7 @@ def get_time_offset():
:rtype: int
"""
try:
resp = webapi.post('ITwoFactorService', 'QueryTime', 1, params={'http_timeout': 5})
resp = webapi.post('ITwoFactorService', 'QueryTime', 1, params={'http_timeout': 10})
except:
return 0
@ -84,5 +335,35 @@ def generate_device_id(steamid):
:return: android device id
:rtype: str
"""
h = hexlify(sha1(str(steamid).encode('ascii'))).decode('ascii')
h = hexlify(sha1_hash(str(steamid).encode('ascii'))).decode('ascii')
return "android:%s-%s-%s-%s-%s" % (h[:8], h[8:12], h[12:16], h[16:20], h[20:32])
def extract_secrets_from_android_rooted(adb_path='adb'):
"""Extract Steam Authenticator secrets from a rooted Android device
Prerequisite for this to work:
- rooted android device
- `adb binary <https://developer.android.com/studio/command-line/adb.html>`_
- device in debug mode, connected and paired
.. note::
If you know how to make this work, without requiring a the device to be rooted,
please open a issue on github. Thanks
:param adb_path: path to adb binary
:type adb_path: str
:raises: When there is any problem
:return: all secrets from the device, steamid as key
:rtype: dict
"""
data = subprocess.check_output([
adb_path, 'shell', 'su', '-c',
"'cat /data/data/com.valvesoftware.android.steam.community/files/Steamguard*'"
])
if data[0] != "{":
raise RuntimeError("Got invalid data: %s" % repr(data))
return {int(x['steamid']): x
for x in map(json.loads, data.replace("}{", '}||||{').split('|||||'))}

10
steam/util/__init__.py

@ -59,6 +59,16 @@ def clear_proto_bit(emsg):
"""
return int(emsg) & ~protobuf_mask
def proto_to_dict(message):
"""Converts protobuf message instance to dict (shallow)
:param message: protobuf message
:return: parameters and their values
:rtype: dict
"""
return {field.name: getattr(message, field.name, field.default_value)
for field in message.DESCRIPTOR.fields}
def chunks(arr, size):
"""Splits a list into chunks

12
steam/webauth.py

@ -78,7 +78,7 @@ class WebAuth(object):
complete = False #: whether authentication has been completed successfully
session = None #: :class:`requests.Session` (with auth cookies after auth is complete)
captcha_gid = -1
steamid = None #: :class:`steam.steamid.SteamID` (after auth is complete)
steam_id = None #: :class:`steam.steamid.SteamID` (after auth is complete)
def __init__(self, username, password):
self.__dict__.update(locals())
@ -134,7 +134,7 @@ class WebAuth(object):
'username' : self.username,
"password": b64encode(self.key.encrypt(self.password.encode('ascii'), PKCS1v15())),
"emailauth": email_code,
"emailsteamid": str(self.steamid) if email_code else '',
"emailsteamid": str(self.steam_id) if email_code else '',
"twofactorcode": twofactor_code,
"captchagid": self.captcha_gid,
"captcha_text": captcha,
@ -150,7 +150,7 @@ class WebAuth(object):
raise HTTPError(str(e))
def _finalize_login(self, login_response):
self.steamid = SteamID(login_response['transfer_parameters']['steamid'])
self.steam_id = SteamID(login_response['transfer_parameters']['steamid'])
def login(self, captcha='', email_code='', twofactor_code='', language='english'):
"""Attempts web login and returns on a session with cookies set
@ -200,7 +200,7 @@ class WebAuth(object):
raise CaptchaRequired(resp['message'])
elif resp.get('emailauth_needed', False):
self.steamid = SteamID(resp['emailsteamid'])
self.steam_id = SteamID(resp['emailsteamid'])
raise EmailCodeRequired(resp['message'])
elif resp.get('requires_twofactor', False):
raise TwoFactorCodeRequired(resp['message'])
@ -219,7 +219,7 @@ class MobileWebAuth(WebAuth):
'username' : self.username,
"password": b64encode(self.key.encrypt(self.password.encode('ascii'), PKCS1v15())),
"emailauth": email_code,
"emailsteamid": str(self.steamid) if email_code else '',
"emailsteamid": str(self.steam_id) if email_code else '',
"twofactorcode": twofactor_code,
"captchagid": self.captcha_gid,
"captcha_text": captcha,
@ -244,7 +244,7 @@ class MobileWebAuth(WebAuth):
def _finalize_login(self, login_response):
data = json.loads(login_response['oauth'])
self.steamid = SteamID(data['steamid'])
self.steam_id = SteamID(data['steamid'])
self.oauth_token = data['oauth_token']

Loading…
Cancel
Save