Browse Source

basic CM client implementation

pull/18/merge
Rossen Georgiev 9 years ago
parent
commit
92eb3ae7ea
  1. 277
      steam/client/cm.py
  2. 147
      steam/client/connection.py
  3. 265
      steam/client/msg.py
  4. 13
      steam/enums.py

277
steam/client/cm.py

@ -0,0 +1,277 @@
import struct
import binascii
import logging
import zipfile
try:
from cStringIO import StringIO
except ImportError:
from StringIO import StringIO
import gevent
from gevent import event
from gevent import queue
from Crypto.Random import random
from steam.steamid import SteamID
from steam.enums import EMsg, EResult, EUniverse
from steam.client import crypto
from steam.client.connection import TCPConnection
from steam.client.msg import is_proto, clear_proto_bit
from steam.client.msg import Msg, MsgProto
server_list = [
('162.254.196.41', '27020'), ('162.254.196.40', '27021'),
('162.254.196.43', '27019'), ('162.254.196.40', '27018'),
('162.254.196.43', '27020'), ('162.254.196.41', '27019'),
('162.254.196.41', '27018'), ('162.254.196.42', '27020'),
('162.254.196.41', '27017'), ('162.254.196.41', '27021'),
('146.66.152.10', '27017'), ('146.66.152.10', '27018'),
('146.66.152.11', '27019'), ('146.66.152.11', '27020'),
('146.66.152.10', '27019'), ('162.254.197.42', '27018'),
('162.254.197.41', '27019'), ('162.254.197.41', '27017'),
('208.78.164.14', '27017'), ('208.78.164.14', '27019'),
('208.78.164.9', '27019'), ('208.78.164.14', '27018'),
('208.78.164.9', '27018'), ('208.78.164.13', '27017'),
]
logger = logging.getLogger("CMClient")
class CMClient:
TCP = 0
UDP = 1
def __init__(self, protocol=0):
self.reconnect = False
self._init_attributes()
self.registered_callbacks = {}
if protocol == CMClient.TCP:
self.connection = TCPConnection()
# elif protocol == CMClient.UDP:
# self.connection = UDPConnection()
else:
raise ValueError("Only TCP is supported")
self.event_connected = event.Event()
self.event_ready = event.Event()
self.event_disconnected = event.Event()
self.register_callback(EMsg.ChannelEncryptRequest, self._handle_encrypt_request),
self.register_callback(EMsg.Multi, self._handle_multi),
self.register_callback(EMsg.ClientLogOnResponse, self._handle_logon),
def connect(self, reconnect=None):
if reconnect is not None:
self.reconnect = reconnect
logger.debug("Connect (reconnect=%s)" % self.reconnect)
while True:
with gevent.Timeout(15, False):
server_addr = random.choice(server_list)
self.connection.connect(server_addr)
if not self.connection.event_connected.is_set():
if self.reconnect:
logger.debug("Failed to connect. Retrying...")
continue
logger.debug("Failed to connect")
return False
break
logger.debug("Event: Connected")
self.event_connected.set()
self._recv_loop = gevent.spawn(self._recv_messages)
return True
def disconnect(self):
self.connection.disconnect()
self._recv_loop.kill(block=False)
self._heartbeat_loop.kill()
self._init_attributes()
self.event_connected.clear()
self.event_ready.clear()
self.event_disconnected.set()
logger.debug("Event: Disconnected")
def _init_attributes(self):
self.key = None
self.steam_id = None
self.session_id = None
self.cell_id = None
self.webapi_nonce = None
self._recv_loop = None
self._heartbeat_loop = None
def send_message(self, message):
if not isinstance(message, (Msg, MsgProto)):
raise ValueError("Expected Msg or MsgProto, got %s" % message)
data = message.serialize()
if self.key:
data = crypto.encrypt(data, self.key)
logger.debug("Outgoing: %s", repr(message.msg))
self.connection.put_message(data)
def _recv_messages(self):
while True:
try:
message = self.connection.get_message(timeout=1)
except queue.Empty:
if not self.connection.event_connected.is_set():
self.disconnect()
if self.reconnect:
gevent.spawn(self.connect)
return
continue
if self.key:
message = crypto.decrypt(message, self.key)
self._parse_message(message)
def _parse_message(self, message):
emsg_id, = struct.unpack_from("<I", message)
emsg = EMsg(clear_proto_bit(emsg_id))
logger.debug("Incoming: %s", repr(emsg))
if emsg in (EMsg.ChannelEncryptRequest,
EMsg.ChannelEncryptResponse,
EMsg.ChannelEncryptResult,
):
msg = Msg(emsg, message)
else:
try:
if is_proto(emsg_id):
msg = MsgProto(emsg, message)
print str(msg)
else:
msg = Msg(emsg, message, extended=True)
except:
logger.fatal("Failed to deserialize message: %s %s",
str(emsg),
is_proto(emsg_id)
)
raise
self.dispatch_message(emsg, msg)
def dispatch_message(self, emsg, msg):
if emsg in self.registered_callbacks:
for callback in list(self.registered_callbacks[emsg]):
if isinstance(callback, event.AsyncResult):
self.unregister_callback(emsg, callback)
callback.set((emsg, msg))
else:
gevent.spawn(callback, emsg, msg)
def register_callback(self, emsg, callback):
if emsg not in self.registered_callbacks:
self.registered_callbacks[emsg] = [callback]
else:
callbacks = self.registered_callbacks[emsg]
if callback not in callbacks:
callbacks.append(callback)
def unregister_callback(self, emsg, callback):
if (emsg not in self.registered_callbacks
or callback not in self.registered_callbacks[emsg]):
return ValueError("Callback is not registered")
callbacks = self.registered_callbacks[emsg]
if len(callbacks) == 1:
self.registered_callbacks.pop(emsg)
else:
callbacks.pop(callbacks.index(callback))
def wait_for_message(self, emsg, block=True, timeout=None):
result = event.AsyncResult()
self.register_callback(emsg, result)
return result.get(block, timeout)[1]
def _handle_encrypt_request(self, emsg, msg):
logger.debug("Securing channel")
if msg.body.protocolVersion != 1:
raise RuntimeError("Unsupported protocol version")
if msg.body.universe != EUniverse.Public:
raise RuntimeError("Unsupported universe")
resp = Msg(EMsg.ChannelEncryptResponse)
key, resp.body.key = crypto.generate_session_key()
resp.body.crc = binascii.crc32(resp.body.key) & 0xffffffff
self.send_message(resp)
msg = self.wait_for_message(EMsg.ChannelEncryptResult)
if msg.body.result != EResult.OK:
logger.debug("Failed to secure channel: %s" % msg.body.result)
self.disconnect()
return
logger.debug("Channel secured")
self.key = key
self.event_ready.set()
logger.debug("Event: Ready")
def _handle_multi(self, emsg, msg):
logger.debug("Unzipping CMsgMulti")
data = zipfile.ZipFile(StringIO(msg.body.message_body)).read('z')
if len(data) != msg.body.size_unzipped:
logger.fatal("Unzipped size mismatch")
self.disconnect()
return
while len(data) > 0:
size, = struct.unpack_from("<I", data)
self._parse_message(data[4:4+size])
data = data[4+size:]
def _heartbeat(self, interval):
message = MsgProto(EMsg.ClientHeartBeat)
while True:
gevent.sleep(interval)
self.send_message(message)
def _handle_logon(self, emsg, msg):
if msg.body.eresult == EResult.OK:
logger.debug("Logon completed")
self.steam_id = SteamID(msg.header.steamid)
self.session_id = msg.header.client_sessionid
self.cell_id = msg.body.cell_id
self.webapi_nonce = msg.body.webapi_authenticate_user_nonce
if self._heartbeat_loop:
self._heartbeat_loop.kill()
logger.debug("Heartbeat started.")
interval = msg.body.out_of_game_heartbeat_seconds
self._heartbeat_loop = gevent.spawn(self._heartbeat, interval)

147
steam/client/connection.py

@ -0,0 +1,147 @@
import struct
import logging
import gevent
from gevent import socket
from gevent import queue
from gevent import event
from gevent.select import select as gselect
logger = logging.getLogger("Connection")
class Connection:
MAGIC = 'VT01'
FMT = '<I4s'
FMT_SIZE = struct.calcsize(FMT)
def __init__(self):
self.socket = None
self.connected = False
self.server_addr = None
self._reader = None
self._writer = None
self._readbuf = ''
self.send_queue = queue.Queue()
self.recv_queue = queue.Queue()
self.event_connected = event.Event()
def connect(self, server_addr):
self._new_socket()
logger.debug("Attempting connection to %s", str(server_addr))
self._connect(server_addr)
self.server_addr = server_addr
self._reader = gevent.spawn(self._reader_loop)
self._writer = gevent.spawn(self._writer_loop)
logger.debug("Connected.")
self.event_connected.set()
def disconnect(self):
if not self.event_connected.is_set():
return
self.server_addr = None
if self._reader:
self._reader.kill(block=False)
self._reader = None
if self._writer:
self._writer.kill(block=False)
self._writer = None
self._readbuf = ''
self.send_queue.queue.clear()
self.recv_queue.queue.clear()
self.socket.close()
logger.debug("Disconnected.")
self.event_connected.clear()
def get_message(self, block=True, timeout=None):
return self.recv_queue.get(block, timeout)
def put_message(self, message):
self.send_queue.put(message)
def _writer_loop(self):
while True:
message = self.send_queue.get()
packet = struct.pack(Connection.FMT, len(message), Connection.MAGIC) + message
try:
self._write_data(packet)
except:
logger.debug("Connection error (writer).")
self.disconnect()
return
def _reader_loop(self):
while True:
rlist, _, _ = gselect([self.socket], [], [])
if self.socket in rlist:
data = self._read_data()
if not data:
logger.debug("Connection error (reader).")
self.disconnect()
return
self._readbuf += data
self._read_packets()
def _read_packets(self):
header_size = Connection.FMT_SIZE
buf = self._readbuf
while len(buf) > header_size:
message_length, magic = struct.unpack_from(Connection.FMT, buf)
if magic != Connection.MAGIC:
raise RuntimeError("invalid magic, got %s" % repr(magic))
packet_length = header_size + message_length
if len(buf) < packet_length:
return
message = buf[header_size:packet_length]
buf = buf[packet_length:]
self.recv_queue.put(message)
self._readbuf = buf
class TCPConnection(Connection):
def _new_socket(self):
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
def _connect(self, server_addr):
self.socket.connect(server_addr)
def _read_data(self):
return self.socket.recv(2048)
def _write_data(self, data):
self.socket.sendall(data)
class UDPConnection(Connection):
def _new_socket(self):
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
def _connect(self, server_addr):
pass
def _read_data(self):
pass
def _write_data(self, data):
pass

265
steam/client/msg.py

@ -0,0 +1,265 @@
import struct
import fnmatch
from steam.enums import EMsg, EUniverse, EResult
from steam.protobufs import steammessages_base_pb2
from steam.protobufs import steammessages_clientserver_pb2
from steam.protobufs import steammessages_clientserver_2_pb2
class MsgHdr:
_size = struct.calcsize("<Iqq")
def __init__(self, data=None):
if data:
self.load(data)
else:
self.msg = EMsg.Invalid
self.targetJobID = -1
self.sourceJobID = -1
def serialize(self):
return struct.pack("<Iqq",
self.msg,
self.targetJobID,
self.sourceJobID,
)
def load(self, data):
(msg,
self.targetJobID,
self.sourceJobID,
) = struct.unpack_from("<Iqq", data)
self.msg = EMsg(msg)
class ExtendedMsgHdr:
_size = struct.calcsize("<IBHqqBqi")
def __init__(self, data=None):
if data:
self.load(data)
else:
self.msg = EMsg.Invalid
self.headerSize = 36
self.headerVersion = 2
self.targetJobID = -1
self.sourceJobID = -1
self.headerCanary = 239
self.steamID = -1
self.sessionID = -1
def serialize(self):
return struct.pack("<IBHqqBqi",
self.msg,
self.headerSize,
self.headerVersion,
self.targetJobID,
self.sourceJobID,
self.headerCanary,
self.steamID,
self.sessionID,
)
def load(self, data):
(msg,
self.headerSize,
self.headerVersion,
self.targetJobID,
self.sourceJobID,
self.headerCanary,
self.steamID,
self.sessionID,
) = struct.unpack_from("<IBHqqBqi", data)
self.msg = EMsg(msg)
if self.headerSize != 36 or self.headerVersion != 2:
raise RuntimeError("Failed to parse header")
protobuf_mask = 0x80000000
def is_proto(emsg):
return (int(emsg) & protobuf_mask) > 0
def set_proto_bit(emsg):
return int(emsg) | protobuf_mask
def clear_proto_bit(emsg):
return int(emsg) & ~protobuf_mask
class MsgHdrProtoBuf:
_size = struct.calcsize("<II")
def __init__(self, data=None):
self.proto = steammessages_base_pb2.CMsgProtoBufHeader()
if data:
self.load(data)
else:
self.msg = EMsg.Invalid
def serialize(self):
proto_data = self.proto.SerializeToString()
return struct.pack("<II", set_proto_bit(self.msg), len(proto_data)) + proto_data
def load(self, data):
msg, proto_length = struct.unpack_from("<II", data)
self.msg = EMsg(clear_proto_bit(msg))
size = MsgHdrProtoBuf._size
self._fullsize = size + proto_length
self.proto.ParseFromString(data[size:self._fullsize])
class Msg:
def __init__(self, msg, data=None, extended=False):
self.extended = extended
self.header = ExtendedMsgHdr(data) if extended else MsgHdr(data)
self.msg = msg
if data:
data = data[self.header._size:]
if msg == EMsg.ChannelEncryptRequest:
self.header.msg = EMsg.ChannelEncryptRequest
self.body = ChannelEncryptRequest(data)
elif msg == EMsg.ChannelEncryptResponse:
self.header.msg = EMsg.ChannelEncryptResponse
self.body = ChannelEncryptResponse(data)
elif msg == EMsg.ChannelEncryptResult:
self.header.msg = EMsg.ChannelEncryptResult
self.body = ChannelEncryptResult(data)
else:
self.body = None
def serialize(self):
return self.header.serialize() + self.body.serialize()
def __str__(self):
return ''
cmsg_lookup = None
cmsg_lookup2 = None
def get_cmsg(emsg):
global cmsg_lookup, cmsg_lookup2
if emsg == EMsg.Multi:
return steammessages_base_pb2.CMsgMulti
emsg = "cmsg" + str(emsg).lower()
if not cmsg_lookup:
cmsg_list = steammessages_clientserver_pb2.__dict__
cmsg_list = fnmatch.filter(cmsg_list, 'CMsg*')
cmsg_lookup = dict(zip(map(lambda x: x.lower(), cmsg_list), cmsg_list))
name = cmsg_lookup.get(emsg, None)
if name:
return getattr(steammessages_clientserver_pb2, name)
if not cmsg_lookup2:
cmsg_list = steammessages_clientserver_2_pb2.__dict__
cmsg_list = fnmatch.filter(cmsg_list, 'CMsg*')
cmsg_lookup2 = dict(zip(map(lambda x: x.lower(), cmsg_list), cmsg_list))
name = cmsg_lookup2.get(emsg, None)
if name:
return getattr(steammessages_clientserver_2_pb2, name)
return None
class MsgProto:
def __init__(self, msg, data=None):
self._header = MsgHdrProtoBuf(data)
self._header.msg = msg
self.msg = msg
self.header = self._header.proto
self.body = get_cmsg(msg)()
if data:
data = data[self._header._fullsize:]
self.body.ParseFromString(data)
def serialize(self):
return self._header.serialize() + self.body.SerializeToString()
def __str__(self):
return '\n'.join(['MsgProto',
'-' * 20,
str(self.header),
'-' * 20,
str(self.body),
])
class ChannelEncryptRequest:
def __init__(self, data=None):
if data:
self.load(data)
else:
self.protocolVersion = 1
self.universe = EUniverse.Invalid
def serialize(self):
return struct.pack("<II", self.protocolVersion, self.universe)
def load(self, data):
(self.protocolVersion,
universe,
) = struct.unpack_from("<II", data)
self.universe = EUniverse(universe)
class ChannelEncryptResponse:
def __init__(self, data=None):
if data:
self.load(data)
else:
self.protocolVersion = 1
self.keySize = 128
self.key = ''
self.crc = 0
def serialize(self):
return struct.pack("<II128sII",
self.protocolVersion,
self.keySize,
self.key,
self.crc,
0
)
def load(self, data):
(self.protocolVersion,
self.keySize,
self.key,
self.crc,
_,
) = struct.unpack_from("<II128sII", data)
class ChannelEncryptResult:
def __init__(self, data=None):
if data:
self.load(data)
else:
self.result
def serialize(self):
return struct.pack("<I", self.result)
def load(self, data):
(result,) = struct.unpack_from("<I", data)
self.result = EResult(result)

13
steam/enums.py

@ -1,4 +1,7 @@
from enum import Enum
import sys
if sys.version_info > (3,):
long = int
class EasyEnum(Enum):
@ -9,17 +12,20 @@ class EasyEnum(Enum):
return self.name
def __eq__(self, other):
if isinstance(other, int):
if isinstance(other, (int, long)):
return self.value == other
else:
return NotImplemented
def __ne__(self, other):
if isinstance(other, int):
if isinstance(other, (int, long)):
return self.value != other
else:
return NotImplemented
def __call__(self):
return int(self)
class EMsg(EasyEnum):
Invalid = 0
@ -1823,6 +1829,7 @@ class EMsg(EasyEnum):
class EResult(EasyEnum):
Invalid = 0
OK = 1 # success
Fail = 2 # generic failure
NoConnection = 3 # no/failed network connection
@ -1886,6 +1893,7 @@ class EUniverse(EasyEnum):
Beta = 2
Internal = 3
Dev = 4
Max = 5
class EType(EasyEnum):
@ -1900,6 +1908,7 @@ class EType(EasyEnum):
Chat = 8
ConsoleUser = 9
AnonUser = 10
Max = 11
class EServerType(EasyEnum):

Loading…
Cancel
Save