4 changed files with 539 additions and 527 deletions
@ -0,0 +1,182 @@ |
|||
"""Classes to (de)serialize message headers""" |
|||
import struct |
|||
from steam.enums.emsg import EMsg |
|||
from steam.protobufs import steammessages_base_pb2 |
|||
from steam.protobufs import gc_pb2 |
|||
from steam.util import set_proto_bit, clear_proto_bit |
|||
|
|||
|
|||
class MsgHdr: |
|||
_size = struct.calcsize("<Iqq") |
|||
msg = EMsg.Invalid |
|||
targetJobID = -1 |
|||
sourceJobID = -1 |
|||
|
|||
def __init__(self, data=None): |
|||
if data: self.load(data) |
|||
|
|||
def serialize(self): |
|||
return struct.pack("<Iqq", self.msg, self.targetJobID, self.sourceJobID) |
|||
|
|||
def load(self, data): |
|||
(msg, self.targetJobID, self.sourceJobID) = struct.unpack_from("<Iqq", data) |
|||
self.msg = EMsg(msg) |
|||
|
|||
def __str__(self): |
|||
return '\n'.join(["msg: %s" % repr(self.msg), |
|||
"targetJobID: %s" % self.targetJobID, |
|||
"sourceJobID: %s" % self.sourceJobID, |
|||
]) |
|||
|
|||
class ExtendedMsgHdr: |
|||
_size = struct.calcsize("<IBHqqBqi") |
|||
msg = EMsg.Invalid |
|||
headerSize = 36 |
|||
headerVersion = 2 |
|||
targetJobID = -1 |
|||
sourceJobID = -1 |
|||
headerCanary = 239 |
|||
steamID = -1 |
|||
sessionID = -1 |
|||
|
|||
def __init__(self, data=None): |
|||
if data: self.load(data) |
|||
|
|||
def serialize(self): |
|||
return struct.pack("<IBHqqBqi", |
|||
self.msg, |
|||
self.headerSize, |
|||
self.headerVersion, |
|||
self.targetJobID, |
|||
self.sourceJobID, |
|||
self.headerCanary, |
|||
self.steamID, |
|||
self.sessionID, |
|||
) |
|||
|
|||
def load(self, data): |
|||
(msg, |
|||
self.headerSize, |
|||
self.headerVersion, |
|||
self.targetJobID, |
|||
self.sourceJobID, |
|||
self.headerCanary, |
|||
self.steamID, |
|||
self.sessionID, |
|||
) = struct.unpack_from("<IBHqqBqi", data) |
|||
|
|||
self.msg = EMsg(msg) |
|||
|
|||
if self.headerSize != 36 or self.headerVersion != 2: |
|||
raise RuntimeError("Failed to parse header") |
|||
|
|||
def __str__(self): |
|||
return '\n'.join(["msg: %s" % self.msg, |
|||
"headerSize: %s" % self.headerSize, |
|||
"headerVersion: %s" % self.headerVersion, |
|||
"targetJobID: %s" % self.targetJobID, |
|||
"sourceJobID: %s" % self.sourceJobID, |
|||
"headerCanary: %s" % self.headerCanary, |
|||
"steamID: %s" % self.steamID, |
|||
"sessionID: %s" % self.sessionID, |
|||
]) |
|||
|
|||
|
|||
class MsgHdrProtoBuf: |
|||
_size = struct.calcsize("<II") |
|||
msg = EMsg.Invalid |
|||
|
|||
def __init__(self, data=None): |
|||
self.proto = steammessages_base_pb2.CMsgProtoBufHeader() |
|||
|
|||
if data: |
|||
self.load(data) |
|||
|
|||
def serialize(self): |
|||
proto_data = self.proto.SerializeToString() |
|||
return struct.pack("<II", set_proto_bit(self.msg), len(proto_data)) + proto_data |
|||
|
|||
def load(self, data): |
|||
msg, proto_length = struct.unpack_from("<II", data) |
|||
|
|||
self.msg = EMsg(clear_proto_bit(msg)) |
|||
size = MsgHdrProtoBuf._size |
|||
self._fullsize = size + proto_length |
|||
self.proto.ParseFromString(data[size:self._fullsize]) |
|||
|
|||
|
|||
class GCMsgHdr: |
|||
_size = struct.calcsize("<Hqq") |
|||
proto = None |
|||
headerVersion = 1 |
|||
targetJobID = -1 |
|||
sourceJobID = -1 |
|||
|
|||
def __init__(self, msg, data=None): |
|||
self.msg = clear_proto_bit(msg) |
|||
|
|||
if data: |
|||
self.load(data) |
|||
|
|||
def serialize(self): |
|||
return struct.pack("<Hqq", |
|||
self.headerVersion, |
|||
self.targetJobID, |
|||
self.sourceJobID, |
|||
) |
|||
|
|||
def load(self, data): |
|||
(self.headerVersion, |
|||
self.targetJobID, |
|||
self.sourceJobID, |
|||
) = struct.unpack_from("<Hqq", data) |
|||
|
|||
def __str__(self): |
|||
return '\n'.join(["headerVersion: %s" % self.headerVersion, |
|||
"targetJobID: %s" % self.targetJobID, |
|||
"sourceJobID: %s" % self.sourceJobID, |
|||
]) |
|||
|
|||
class GCMsgHdrProto: |
|||
_size = struct.calcsize("<Ii") |
|||
headerLength = 0 |
|||
|
|||
def __init__(self, msg, data=None): |
|||
self.proto = gc_pb2.CMsgProtoBufHeader() |
|||
self.msg = clear_proto_bit(msg) |
|||
|
|||
if data: |
|||
self.load(data) |
|||
|
|||
def serialize(self): |
|||
proto_data = self.proto.SerializeToString() |
|||
self.headerLength = len(proto_data) |
|||
|
|||
return struct.pack("<Ii", |
|||
set_proto_bit(self.msg), |
|||
self.headerLength, |
|||
) + proto_data |
|||
|
|||
def load(self, data): |
|||
(msg, |
|||
self.headerLength, |
|||
) = struct.unpack_from("<Ii", data) |
|||
|
|||
self.msg = clear_proto_bit(msg) |
|||
|
|||
if self.headerLength: |
|||
x = GCMsgHdrProto._size |
|||
self.proto.ParseFromString(data[x:x+self.headerLength]) |
|||
|
|||
def __str__(self): |
|||
resp = ["msg: %s" % self.msg, |
|||
"headerLength: %s" % self.headerLength, |
|||
] |
|||
|
|||
proto = str(self.proto) |
|||
|
|||
if proto: |
|||
resp.append('-- proto --') |
|||
resp.append(proto) |
|||
|
|||
return '\n'.join(resp) |
@ -0,0 +1,242 @@ |
|||
"""Classes to (de)serialize various struct messages""" |
|||
import struct |
|||
from steam.enums import EResult, EUniverse |
|||
from steam.enums.emsg import EMsg |
|||
|
|||
_emsg_map = {} |
|||
|
|||
def get_struct(emsg): |
|||
return _emsg_map.get(emsg, None) |
|||
|
|||
class MapEMsgMeta(type): |
|||
"""Automatically maps subclasses of :class:`StructMessage` to ``EMsg``""" |
|||
|
|||
def __new__(metacls, name, bases, classdict): |
|||
cls = type.__new__(metacls, name, bases, classdict) |
|||
|
|||
if name != 'StructMessage': |
|||
try: |
|||
_emsg_map[EMsg[name]] = cls |
|||
except KeyError: |
|||
pass |
|||
|
|||
return cls |
|||
|
|||
class StructMessage: |
|||
__metaclass__ = MapEMsgMeta |
|||
|
|||
def __init__(self, data=None): |
|||
if data: self.load(data) |
|||
|
|||
def serialize(self): |
|||
return b'' |
|||
|
|||
def load(self, data): |
|||
pass |
|||
|
|||
class ChannelEncryptRequest(StructMessage): |
|||
protocolVersion = 1 |
|||
universe = EUniverse.Invalid |
|||
challenge = b'' |
|||
|
|||
def serialize(self): |
|||
return struct.pack("<II", self.protocolVersion, self.universe) + self.challenge |
|||
|
|||
def load(self, data): |
|||
(self.protocolVersion, |
|||
universe, |
|||
) = struct.unpack_from("<II", data) |
|||
|
|||
self.universe = EUniverse(universe) |
|||
|
|||
if len(data) > 8: |
|||
self.challenge = data[8:] |
|||
|
|||
def __str__(self): |
|||
return '\n'.join(["protocolVersion: %s" % self.protocolVersion, |
|||
"universe: %s" % repr(self.universe), |
|||
"challenge: %s" % repr(self.challenge), |
|||
]) |
|||
|
|||
class ChannelEncryptResponse(StructMessage): |
|||
protocolVersion = 1 |
|||
keySize = 128 |
|||
key = '' |
|||
crc = 0 |
|||
|
|||
def __init__(self, data=None): |
|||
if data: |
|||
self.load(data) |
|||
|
|||
def serialize(self): |
|||
return struct.pack("<II128sII", |
|||
self.protocolVersion, |
|||
self.keySize, |
|||
self.key, |
|||
self.crc, |
|||
0 |
|||
) |
|||
|
|||
def load(self, data): |
|||
(self.protocolVersion, |
|||
self.keySize, |
|||
self.key, |
|||
self.crc, |
|||
_, |
|||
) = struct.unpack_from("<II128sII", data) |
|||
|
|||
def __str__(self): |
|||
return '\n'.join(["protocolVersion: %s" % self.protocolVersion, |
|||
"keySize: %s" % self.keySize, |
|||
"key: %s" % repr(self.key), |
|||
"crc: %s" % self.crc, |
|||
]) |
|||
|
|||
class ChannelEncryptResult(StructMessage): |
|||
eresult = EResult.Invalid |
|||
|
|||
def __init__(self, data=None): |
|||
if data: |
|||
self.load(data) |
|||
|
|||
def serialize(self): |
|||
return struct.pack("<I", self.eresult) |
|||
|
|||
def load(self, data): |
|||
(result,) = struct.unpack_from("<I", data) |
|||
self.eresult = EResult(result) |
|||
|
|||
def __str__(self): |
|||
return "result: %s" % repr(self.eresult) |
|||
|
|||
class ClientLogOnResponse(StructMessage): |
|||
eresult = EResult.Invalid |
|||
|
|||
def __init__(self, data=None): |
|||
if data: |
|||
self.load(data) |
|||
|
|||
def serialize(self): |
|||
return struct.pack("<I", self.eresult) |
|||
|
|||
def load(self, data): |
|||
(result,) = struct.unpack_from("<I", data) |
|||
self.eresult = EResult(result) |
|||
|
|||
def __str__(self): |
|||
return "eresult: %s" % repr(self.eresult) |
|||
|
|||
class ClientVACBanStatus(StructMessage): |
|||
numBans = 0 |
|||
|
|||
def __init__(self, data=None): |
|||
if data: self.load(data) |
|||
|
|||
def serialize(self): |
|||
return struct.pack("<L", self.steamIdChat) |
|||
|
|||
def load(self, data): |
|||
self.steamIdChat, = struct.unpack_from("<L", data) |
|||
|
|||
def __str__(self): |
|||
return '\n'.join(["numBans: %d" % self.numBans, |
|||
]) |
|||
|
|||
class ClientChatMsg(StructMessage): |
|||
steamIdChatter = 0 |
|||
steamIdChatRoom = 0 |
|||
ChatMsgType = 0 |
|||
text = "" |
|||
|
|||
def __init__(self, data=None): |
|||
if data: |
|||
self.load(data) |
|||
|
|||
def serialize(self): |
|||
rbytes = struct.pack("<QQI", |
|||
self.steamIdChatter, |
|||
self.steamIdChatRoom, |
|||
self.ChatMsgType, |
|||
) |
|||
# utf-8 encode only when unicode in py2 and str in py3 |
|||
rbytes += (self.text.encode('utf-8') |
|||
if (not isinstance(self.text, str) and bytes is str) |
|||
or isinstance(self.text, str) |
|||
else self.text |
|||
) + b'\x00' |
|||
|
|||
return rbytes |
|||
|
|||
def load(self, data): |
|||
(self.steamIdChatter, |
|||
self.steamIdChatRoom, |
|||
self.ChatMsgType, |
|||
) = struct.unpack_from("<QQI", data) |
|||
|
|||
self.text = data[struct.calcsize("<QQI"):-1].decode('utf-8') |
|||
|
|||
def __str__(self): |
|||
return '\n'.join(["steamIdChatter: %d" % self.steamIdChatter, |
|||
"steamIdChatRoom: %d" % self.steamIdChatRoom, |
|||
"ChatMsgType: %d" % self.ChatMsgType, |
|||
"text: %s" % repr(self.text), |
|||
]) |
|||
|
|||
class ClientJoinChat(StructMessage): |
|||
steamIdChat = 0 |
|||
isVoiceSpeaker = False |
|||
|
|||
def __init__(self, data=None): |
|||
if data: self.load(data) |
|||
|
|||
def serialize(self): |
|||
return struct.pack("<Q?", |
|||
self.steamIdChat, |
|||
self.isVoiceSpeaker |
|||
) |
|||
|
|||
def load(self, data): |
|||
(self.steamIdChat, |
|||
self.isVoiceSpeaker |
|||
) = struct.unpack_from("<Q?", data) |
|||
|
|||
def __str__(self): |
|||
return '\n'.join(["steamIdChat: %d" % self.steamIdChat, |
|||
"isVoiceSpeaker: %r" % self.isVoiceSpeaker, |
|||
]) |
|||
|
|||
class ClientChatMemberInfo(StructMessage): |
|||
steamIdChat = 0 |
|||
type = 0 |
|||
steamIdUserActedOn = 0 |
|||
chatAction = 0 |
|||
steamIdUserActedBy = 0 |
|||
|
|||
def __init__(self, data=None): |
|||
if data: |
|||
self.load(data) |
|||
|
|||
def serialize(self): |
|||
return struct.pack("<QIQIQ", |
|||
self.steamIdChat, |
|||
self.type, |
|||
self.steamIdUserActedOn, |
|||
self.chatAction, |
|||
self.steamIdUserActedBy |
|||
) |
|||
|
|||
def load(self, data): |
|||
(self.steamIdChat, |
|||
self.type, |
|||
self.steamIdUserActedOn, |
|||
self.chatAction, |
|||
self.steamIdUserActedBy |
|||
) = struct.unpack_from("<QIQIQ", data) |
|||
|
|||
def __str__(self): |
|||
return '\n'.join(["steamIdChat: %d" % self.steamIdChat, |
|||
"type: %r" % self.type, |
|||
"steamIdUserActedOn: %d" % self.steamIdUserActedOn, |
|||
"chatAction: %d" % self.chatAction, |
|||
"steamIdUserActedBy: %d" % self.steamIdUserActedBy |
|||
]) |
@ -0,0 +1,65 @@ |
|||
import re |
|||
from importlib import import_module |
|||
|
|||
service_lookup = { |
|||
'Broadcast': 'steam.protobufs.steammessages_broadcast_pb2', |
|||
'Cloud': 'steam.protobufs.steammessages_cloud_pb2', |
|||
'DNReport': 'steam.protobufs.steammessages_cloud_pb2', |
|||
'Credentials': 'steam.protobufs.steammessages_credentials_pb2', |
|||
'ContentBuilder': 'steam.protobufs.steammessages_depotbuilder_pb2', |
|||
'DeviceAuth': 'steam.protobufs.steammessages_deviceauth_pb2', |
|||
'Econ': 'steam.protobufs.steammessages_econ_pb2', |
|||
'GameNotifications': 'steam.protobufs.steammessages_gamenotifications_pb2', |
|||
'GameServers': 'steam.protobufs.steammessages_gameservers_pb2', |
|||
'Inventory': 'steam.protobufs.steammessages_inventory_pb2', |
|||
'Community': 'steam.protobufs.steammessages_linkfilter_pb2', |
|||
'Offline': 'steam.protobufs.steammessages_offline_pb2', |
|||
'Parental': 'steam.protobufs.steammessages_parental_pb2', |
|||
'PartnerApps': 'steam.protobufs.steammessages_partnerapps_pb2', |
|||
'PhysicalGoods': 'steam.protobufs.steammessages_physicalgoods_pb2', |
|||
'PlayerClient': 'steam.protobufs.steammessages_player_pb2', |
|||
'Player': 'steam.protobufs.steammessages_player_pb2', |
|||
'PublishedFile': 'steam.protobufs.steammessages_publishedfile_pb2', |
|||
'KeyEscrow': 'steam.protobufs.steammessages_secrets_pb2', |
|||
'TwoFactor': 'steam.protobufs.steammessages_twofactor_pb2', |
|||
'MsgTest': 'steam.protobufs.steammessages_unified_test_pb2', |
|||
'Video': 'steam.protobufs.steammessages_video_pb2', |
|||
} |
|||
|
|||
method_lookup = { |
|||
} |
|||
|
|||
def get_um(method_name, response=False): |
|||
"""Get protobuf for given method name |
|||
|
|||
:param method_name: full method name (e.g. ``Player.GetGameBadgeLevels#1``) |
|||
:type method_name: :class:`str` |
|||
:param response: whether to return proto for response or request |
|||
:type response: :class:`bool` |
|||
:return: protobuf message |
|||
""" |
|||
key = (method_name, response) |
|||
|
|||
if key not in method_lookup: |
|||
match = re.findall(r'^([a-z]+)\.([a-z]+)#(\d)?$', method_name, re.I) |
|||
if not match: |
|||
return None |
|||
|
|||
interface, method, version = match[0] |
|||
|
|||
if interface not in service_lookup: |
|||
raise None |
|||
|
|||
package = import_module(service_lookup[interface]) |
|||
|
|||
service = getattr(package, interface, None) |
|||
if service is None: |
|||
return None |
|||
|
|||
for method_desc in service.GetDescriptor().methods: |
|||
name = "%s.%s#%d" % (interface, method_desc.name, 1) |
|||
|
|||
method_lookup[(name, False)] = getattr(package, method_desc.input_type.full_name, None) |
|||
method_lookup[(name, True)] = getattr(package, method_desc.output_type.full_name, None) |
|||
|
|||
return method_lookup[key] |
Loading…
Reference in new issue