@ -1,10 +1,12 @@
from base64 import b64decode
from base64 import b64decode
from io import BytesIO
from io import BytesIO
from zipfile import ZipFile , ZIP_DEFLATED
from zipfile import ZipFile , ZIP_DEFLATED , BadZipFile
from struct import pack
from struct import pack
from datetime import datetime
from datetime import datetime
from fnmatch import fnmatch
from steam . enums import EDepotFileFlag
from steam . core . crypto import symmetric_decrypt
from steam . core . crypto import symmetric_decrypt
from steam . util . binary import StructReader
from steam . util . binary import StructReader
from steam . protobufs . content_manifest_pb2 import ( ContentManifestMetadata ,
from steam . protobufs . content_manifest_pb2 import ( ContentManifestMetadata ,
@ -18,7 +20,12 @@ class DepotManifest(object):
PROTOBUF_SIGNATURE_MAGIC = 0x1B81B817
PROTOBUF_SIGNATURE_MAGIC = 0x1B81B817
PROTOBUF_ENDOFMANIFEST_MAGIC = 0x32C415AB
PROTOBUF_ENDOFMANIFEST_MAGIC = 0x32C415AB
def __init__ ( self , data ) :
def __init__ ( self , data = None ) :
""" Manage depot manifest
: param data : manifest data
: type data : bytes
"""
self . metadata = ContentManifestMetadata ( )
self . metadata = ContentManifestMetadata ( )
self . payload = ContentManifestPayload ( )
self . payload = ContentManifestPayload ( )
self . signature = ContentManifestSignature ( )
self . signature = ContentManifestSignature ( )
@ -28,8 +35,8 @@ class DepotManifest(object):
def __repr__ ( self ) :
def __repr__ ( self ) :
params = ' , ' . join ( [
params = ' , ' . join ( [
str ( self . metadata . depot_id ) ,
str ( self . depot_id ) ,
str ( self . metadata . gid_manifest ) ,
str ( self . gid ) ,
repr ( datetime . utcfromtimestamp ( self . metadata . creation_time ) . isoformat ( ) . replace ( ' T ' , ' ' ) ) ,
repr ( datetime . utcfromtimestamp ( self . metadata . creation_time ) . isoformat ( ) . replace ( ' T ' , ' ' ) ) ,
] )
] )
@ -41,9 +48,35 @@ class DepotManifest(object):
params ,
params ,
)
)
@property
def depot_id ( self ) :
return self . metadata . depot_id
@property
def gid ( self ) :
return self . metadata . gid_manifest
@property
def creation_time ( self ) :
return self . metadata . creation_time
@property
def size_original ( self ) :
return self . metadata . cb_disk_original
@property
def size_compressed ( self ) :
return self . metadata . cb_disk_compressed
def decrypt_filenames ( self , depot_key ) :
def decrypt_filenames ( self , depot_key ) :
""" Decrypt all filenames in the manifest
: param depot_key : depot key
: type depot_key : bytes
: raises : : class : ` RuntimeError `
"""
if not self . metadata . filenames_encrypted :
if not self . metadata . filenames_encrypted :
return True
return
for mapping in self . payload . mappings :
for mapping in self . payload . mappings :
filename = b64decode ( mapping . filename )
filename = b64decode ( mapping . filename )
@ -51,17 +84,25 @@ class DepotManifest(object):
try :
try :
filename = symmetric_decrypt ( filename , depot_key )
filename = symmetric_decrypt ( filename , depot_key )
except Exception :
except Exception :
print ( " Unable to decrypt filename for depot manifest " )
RuntimeError ( " Unable to decrypt filename for depot manifest " )
return False
mapping . filename = filename
mapping . filename = filename
self . metadata . filenames_encrypted = False
self . metadata . filenames_encrypted = False
return True
def deserialize ( self , data ) :
def deserialize ( self , data ) :
with ZipFile ( BytesIO ( data ) ) as zf :
""" Deserialize a manifest (compressed or uncompressed)
data = StructReader ( zf . read ( zf . filelist [ 0 ] ) )
: param data : manifest data
: type data : bytes
"""
try :
with ZipFile ( BytesIO ( data ) ) as zf :
data = zf . read ( zf . filelist [ 0 ] )
except BadZipFile :
pass
data = StructReader ( data )
magic , length = data . unpack ( ' <II ' )
magic , length = data . unpack ( ' <II ' )
@ -92,7 +133,12 @@ class DepotManifest(object):
if magic != DepotManifest . PROTOBUF_ENDOFMANIFEST_MAGIC :
if magic != DepotManifest . PROTOBUF_ENDOFMANIFEST_MAGIC :
raise Exception ( " Expecting end of manifest " )
raise Exception ( " Expecting end of manifest " )
def serialize ( self ) :
def serialize ( self , compress = True ) :
""" Serialize manifest
: param compress : wether the output should be Zip compressed
: type compress : bytes
"""
data = BytesIO ( )
data = BytesIO ( )
part = self . payload . SerializeToString ( )
part = self . payload . SerializeToString ( )
@ -109,8 +155,79 @@ class DepotManifest(object):
data . write ( pack ( ' <I ' , DepotManifest . PROTOBUF_ENDOFMANIFEST_MAGIC ) )
data . write ( pack ( ' <I ' , DepotManifest . PROTOBUF_ENDOFMANIFEST_MAGIC ) )
zbuff = BytesIO ( )
if compress :
with ZipFile ( zbuff , ' w ' , ZIP_DEFLATED ) as zf :
zbuff = BytesIO ( )
zf . writestr ( ' z ' , data . getvalue ( ) )
with ZipFile ( zbuff , ' w ' , ZIP_DEFLATED ) as zf :
zf . writestr ( ' z ' , data . getvalue ( ) )
return zbuff . getvalue ( )
else :
return data . getvalue ( )
def __iter__ ( self ) :
for mapping in self . payload . mappings :
yield DepotFile ( self , mapping )
def iter_files ( self , pattern = None ) :
"""
: param pattern : unix shell wildcard pattern , see : module : ` . fnmatch `
: type pattern : str
"""
for mapping in self . payload . mappings :
if ( pattern is not None
and not fnmatch ( mapping . filename . rstrip ( ' \x00 \n \t ' ) , pattern ) ) :
continue
yield DepotFile ( self , mapping )
def __len__ ( self ) :
return len ( self . payload . mappings )
class DepotFile ( object ) :
def __init__ ( self , manifest , file_mapping ) :
""" Depot file
: param manifest : depot manifest
: type manifest : : class : ` . DepotManifest `
: param file_mapping : depot file mapping instance
: type file_mapping : ContentManifestPayload . FileMapping
"""
if not isinstance ( manifest , DepotManifest ) :
raise ValueError ( " Expected ' manifest ' to be of type DepotManifest " )
if not isinstance ( file_mapping , ContentManifestPayload . FileMapping ) :
raise ValueError ( " Expected ' file_mapping ' to be of type ContentManifestPayload.FileMapping " )
self . manifest = manifest
self . file_mapping = file_mapping
def __repr__ ( self ) :
return " < %s ( %s , %s , %s , %s )> " % (
self . __class__ . __name__ ,
self . manifest . depot_id ,
self . manifest . gid ,
repr ( self . filename ) ,
' is_directory=True ' if self . is_directory else self . size ,
)
@property
def filename ( self ) :
return self . file_mapping . filename . rstrip ( ' \x00 \n \t ' )
@property
def size ( self ) :
return self . file_mapping . size
@property
def chunks ( self ) :
return self . file_mapping . chunks
@property
def flags ( self ) :
return self . file_mapping . flags
@property
def is_directory ( self ) :
return self . flags & EDepotFileFlag . Directory > 0
return zbuff . getvalue ( )
@property
def is_file ( self ) :
return not self . is_directory