SickGear/lib/hachoir/parser/archive/sevenzip.py
JackDandy 980e05cc99 Change Hachoir can't support PY2, so backport their PY3 code to avoid needing system-dependent external binaries like mediainfo.
Backported 400 revisions from rev 1de4961-8897c5b (2018-2014).
Move core/benchmark, core/cmd_line, core/memory, core/profiler and core/timeout to core/optional/*
Remove metadata/qt*

PORT: Version 2.0a3 (in line with 3.0a3 @ f80c7d5).
Basic Support for XMP Packets.
tga: improvements to adhere more closely to the spec.
pdf: slightly improved parsing.
rar: fix TypeError on unknown block types.
Add MacRoman win32 codepage.
tiff/exif: support SubIFDs and tiled images.
Add method to export metadata as a dictionary.
mpeg_video: don't attempt to parse Stream past length.
mpeg_video: parse ESCR correctly, add SCR value.
Change centralise CustomFragments.
field: don't set parser class if class is None, to enable autodetect.
field: add value/display for CustomFragment.
parser: inline warning to enable tracebacks in debug mode.
Fix empty bytestrings in makePrintable.
Fix contentSize in jpeg.py to account for image_data blocks.
Fix the ELF parser.
Enhance the AR archive parser.
elf parser: fix wrong field order when parsing little endian section flags.
elf parser: add s390 as a machine type.
Flesh out mp4 parser.

PORT: Version 2.0a1 (in line with 3.0a1).
Major refactoring and PEP8.
Fix ResourceWarning warnings on files. Add a close() method and support for the context manager protocol ("with obj: ...") to parsers, input and output streams.
metadata: get comment from ZIP.
Support for InputIOStream.read(0).
Fix sizeGe when size is None.
Remove unused new_seekable_field_set file.
Remove Mapsforge .map parser.
Remove Parallel Realities Starfighter .pak parser.
sevenzip: fix for newer archives.
java: update access flags and modifiers for Java 1.7 and update description text for most recent Java.
Support ustar prefix field in tar archives.
Remove file_system* parsers.
Remove misc parsers 3d0, 3ds, gnome_keyring, msoffice*, mstask, ole*, word*.
Remove program parsers macho, nds, prc.
Support non-8bit Character subclasses.
Python parser supports Python 3.7.
Enhance mpeg_ts parser to support MTS/M2TS.
Support for creation date in tiff.
Change don't hardcode errno constant.

PORT: 1.9.1
Internal Only: The following are legacy references to upstream commit messages.
Relevant changes up to b0a115f8.
Use integer division.
Replace HACHOIR_ERRORS with Exception.
Fix metadata.Data: make it sortable.
Import fixes from e7de492.
PORT: Version 2.0a1 (in line with 3.0a1 @ e9f8fad).
Replace hachoir.core.field with hachoir.field
Replace hachoir.core.stream with hachoir.stream
Remove the compatibility module for PY1.5 to PY2.5.
metadata: support TIFF picture.
metadata: fix string normalization.
metadata: fix datetime regex (fixes hachoir bug #57).
FileFromInputStream: fix comparison between None and an int.
InputIOStream: open the file in binary mode.
2018-03-28 00:43:11 +01:00


"""
7zip file parser
Informations:
- File 7zformat.txt of 7-zip SDK:
http://www.7-zip.org/sdk.html
Author: Olivier SCHWAB
Creation date: 6 december 2006
Updated by: Robert Xiao
Date: February 26 2011
"""
from hachoir.parser import Parser
from hachoir.field import (Field, FieldSet, ParserError, CString,
Enum, Bit, Bits, UInt8, UInt32, UInt64,
Bytes, RawBytes, TimestampWin64)
from hachoir.stream import StringInputStream
from hachoir.core.endian import LITTLE_ENDIAN
from hachoir.core.text_handler import textHandler, hexadecimal
from hachoir.core.tools import alignValue, humanFilesize, makePrintable
from hachoir.parser.common.msdos import MSDOSFileAttr32
try:
from pylzma import decompress as lzmadecompress
has_lzma = True
except ImportError:
has_lzma = False
class SZUInt64(Field):
"""
Variable length UInt64, where the first byte gives both the number of bytes
needed and the upper byte value.
"""
def __init__(self, parent, name, max_size=None, description=None):
Field.__init__(self, parent, name, size=8, description=description)
value = 0
addr = self.absolute_address
mask = 0x80
firstByte = parent.stream.readBits(addr, 8, LITTLE_ENDIAN)
for i in xrange(8):
addr += 8
if not (firstByte & mask):
value += ((firstByte & (mask - 1)) << (8 * i))
break
value |= (parent.stream.readBits(addr, 8, LITTLE_ENDIAN) << (8 * i))
mask >>= 1
self._size += 8
self.createValue = lambda: value
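# Worked example for SZUInt64 (added illustration, not from the original source):
# the high bits of the first byte say how many extra bytes follow, and its
# remaining low bits supply the most significant part of the value. For the
# byte sequence 0x81 0x34:
#   mask 0x80 is set in 0x81   -> read one extra byte, value = 0x34
#   mask 0x40 is clear in 0x81 -> add (0x81 & 0x3f) << 8 = 0x100, giving 0x134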
kEnd = 0x00
kHeader = 0x01
kArchiveProperties = 0x02
kAdditionalStreamsInfo = 0x03
kMainStreamsInfo = 0x04
kFilesInfo = 0x05
kPackInfo = 0x06
kUnPackInfo = 0x07
kSubStreamsInfo = 0x08
kSize = 0x09
kCRC = 0x0A
kFolder = 0x0B
kCodersUnPackSize = 0x0C
kNumUnPackStream = 0x0D
kEmptyStream = 0x0E
kEmptyFile = 0x0F
kAnti = 0x10
kName = 0x11
kCreationTime = 0x12
kLastAccessTime = 0x13
kLastWriteTime = 0x14
kWinAttributes = 0x15
kComment = 0x16
kEncodedHeader = 0x17
kStartPos = 0x18
kDummy = 0x19
PROP_IDS = {
kEnd: 'kEnd',
kHeader: 'kHeader',
kArchiveProperties: 'kArchiveProperties',
kAdditionalStreamsInfo: 'kAdditionalStreamsInfo',
kMainStreamsInfo: 'kMainStreamsInfo',
kFilesInfo: 'kFilesInfo',
kPackInfo: 'kPackInfo',
kUnPackInfo: 'kUnPackInfo',
kSubStreamsInfo: 'kSubStreamsInfo',
kSize: 'kSize',
kCRC: 'kCRC',
kFolder: 'kFolder',
kCodersUnPackSize: 'kCodersUnPackSize',
kNumUnPackStream: 'kNumUnPackStream',
kEmptyStream: 'kEmptyStream',
kEmptyFile: 'kEmptyFile',
kAnti: 'kAnti',
kName: 'kName',
kCreationTime: 'kCreationTime',
kLastAccessTime: 'kLastAccessTime',
kLastWriteTime: 'kLastWriteTime',
kWinAttributes: 'kWinAttributes',
kComment: 'kComment',
kEncodedHeader: 'kEncodedHeader',
kStartPos: 'kStartPos',
kDummy: 'kDummy',
}
PROP_DESC = {
kEnd: 'End-of-header marker',
kHeader: 'Archive header',
kArchiveProperties: 'Archive properties',
kAdditionalStreamsInfo: 'AdditionalStreamsInfo',
kMainStreamsInfo: 'MainStreamsInfo',
kFilesInfo: 'FilesInfo',
kPackInfo: 'PackInfo',
kUnPackInfo: 'UnPackInfo',
kSubStreamsInfo: 'SubStreamsInfo',
kSize: 'Size',
kCRC: 'CRC',
kFolder: 'Folder',
kCodersUnPackSize: 'CodersUnPackSize',
kNumUnPackStream: 'NumUnPackStream',
kEmptyStream: 'EmptyStream',
kEmptyFile: 'EmptyFile',
kAnti: 'Anti',
kName: 'Name',
kCreationTime: 'CreationTime',
kLastAccessTime: 'LastAccessTime',
kLastWriteTime: 'LastWriteTime',
kWinAttributes: 'WinAttributes',
kComment: 'Comment',
kEncodedHeader: 'Encoded archive header',
kStartPos: 'Unknown',
kDummy: 'Dummy entry',
}
def ReadNextByte(self):
return self.stream.readBits(self.absolute_address + self.current_size, 8, self.endian)
def PropID(self, name):
return Enum(UInt8(self, name), PROP_IDS)
class SevenZipBitVector(FieldSet):
def __init__(self, parent, name, num, has_all_byte=False, **args):
FieldSet.__init__(self, parent, name, **args)
self.has_all_byte = has_all_byte
self.num = num
def createFields(self):
if self.has_all_byte:
yield Enum(UInt8(self, "all_defined"), {0: 'False', 1: 'True'})
if self['all_defined'].value:
return
nbytes = alignValue(self.num, 8) // 8
ctr = 0
for i in xrange(nbytes):
for j in reversed(xrange(8)):
yield Bit(self, "bit[%d]" % (ctr + j))
ctr += 8
def isAllDefined(self):
return self.has_all_byte and self['all_defined'].value
def isDefined(self, index):
if self.isAllDefined():
return True
return self['bit[%d]' % index].value
def createValue(self):
if self.isAllDefined():
return range(self.num)
return [i for i in xrange(self.num) if self['bit[%d]' % i].value]
def createDisplay(self):
if self.isAllDefined():
return 'all'
return ','.join(str(i) for i in self.value)
class ArchiveProperty(FieldSet):
def createFields(self):
yield PropID(self, "id")
size = SZUInt64(self, "size")
yield size
if size.value:
yield RawBytes(self, "data", size.value)
def createDescription(self):
return self['id'].display
class ArchiveProperties(FieldSet):
def createFields(self):
yield PropID(self, "id")
while not self.eof:
uid = ReadNextByte(self)
if uid == kEnd:
yield PropID(self, "end_marker")
break
yield ArchiveProperty(self, "prop[]")
class Digests(FieldSet):
def __init__(self, parent, name, num_digests, digest_desc=None, desc=None):
FieldSet.__init__(self, parent, name, desc)
self.num_digests = num_digests
if digest_desc is None:
self.digest_desc = ['stream %d' % i for i in xrange(num_digests)]
else:
self.digest_desc = digest_desc
def createFields(self):
yield PropID(self, "id")
definearr = SevenZipBitVector(self, "defined", self.num_digests, has_all_byte=True)
yield definearr
for index in definearr.value:
yield textHandler(UInt32(self, "digest[]",
"Digest for %s" % self.digest_desc[index]), hexadecimal)
class PackInfo(FieldSet):
def createFields(self):
yield PropID(self, "id")
yield SZUInt64(self, "pack_pos", "File offset to the packed data")
num = SZUInt64(self, "num_pack_streams", "Number of packed streams")
yield num
while not self.eof:
uid = ReadNextByte(self)
if uid == kEnd:
yield PropID(self, "end_marker")
break
elif uid == kSize:
yield PropID(self, "size_marker")
for index in xrange(num.value):
yield SZUInt64(self, "pack_size[]")
elif uid == kCRC:
yield Digests(self, "digests", num.value)
else:
raise ParserError("Unexpected ID (%i)" % uid)
METHODS = {
"\0": "Copy",
"\3": "Delta",
"\4": "x86_BCJ",
"\5": "PowerPC",
"\6": "IA64",
"\7": "ARM_LE",
"\x08": "ARMT_LE", # thumb
"\x09": "SPARC",
"\x21": "LZMA2",
"\2\3\2": "Common-Swap-2",
"\2\3\4": "Common-Swap-4",
"\3\1\1": "7z-LZMA",
"\3\3\1\3": "7z-Branch-x86-BCJ",
"\3\3\1\x1b": "7z-Branch-x86-BCJ2",
"\3\3\2\5": "7z-Branch-PowerPC-BE",
"\3\3\3\1": "7z-Branch-Alpha-LE",
"\3\3\4\1": "7z-Branch-IA64-LE",
"\3\3\5\1": "7z-Branch-ARM-LE",
"\3\3\6\5": "7z-Branch-M68-BE",
"\3\3\7\1": "7z-Branch-ARMT-LE",
"\3\3\x08\5": "7z-Branch-SPARC-BE",
"\3\4\1": "7z-PPMD",
"\3\x7f\1": "7z-Experimental",
"\4\0": "Reserved",
"\4\1\0": "Zip-Copy",
"\4\1\1": "Zip-Shrink",
"\4\1\6": "Zip-Implode",
"\4\1\x08": "Zip-Deflate",
"\4\1\x09": "Zip-Deflate64",
"\4\1\x10": "Zip-BZip2",
"\4\1\x14": "Zip-LZMA",
"\4\1\x60": "Zip-JPEG",
"\4\1\x61": "Zip-WavPack",
"\4\1\x62": "Zip-PPMD",
"\4\1\x63": "Zip-wzAES",
"\4\2\2": "BZip2",
"\4\3\1": "RAR-15",
"\4\3\2": "RAR-20",
"\4\3\3": "RAR-29",
"\4\4\1": "Arj3",
"\4\4\2": "Arj4",
"\4\5": "Z",
"\4\6": "LZH",
"\4\7": "7z-Reserved",
"\4\x08": "CAB",
"\4\x09\1": "NSIS-Deflate",
"\4\x09\2": "NSIS-BZip2",
"\6\0": "Crypto-Reserved",
"\6\1\x00": "Crypto-AES128-ECB",
"\6\1\x01": "Crypto-AES128-CBC",
"\6\1\x02": "Crypto-AES128-CFB",
"\6\1\x03": "Crypto-AES128-OFB",
"\6\1\x04": "Crypto-AES128-CTR",
"\6\1\x40": "Crypto-AES192-ECB",
"\6\1\x41": "Crypto-AES192-CBC",
"\6\1\x42": "Crypto-AES192-CFB",
"\6\1\x43": "Crypto-AES192-OFB",
"\6\1\x44": "Crypto-AES192-CTR",
"\6\1\x80": "Crypto-AES256-ECB",
"\6\1\x81": "Crypto-AES256-CBC",
"\6\1\x82": "Crypto-AES256-CFB",
"\6\1\x83": "Crypto-AES256-OFB",
"\6\1\x84": "Crypto-AES256-CTR",
"\6\1\xc0": "Crypto-AES-ECB",
"\6\1\xc1": "Crypto-AES-CBC",
"\6\1\xc2": "Crypto-AES-CFB",
"\6\1\xc3": "Crypto-AES-OFB",
"\6\1\xc4": "Crypto-AES-CTR",
"\6\7": "Crypto-Reserved",
"\6\x0f": "Crypto-Reserved",
"\6\xf0": "Crypto-Misc",
"\6\xf1\1\1": "Crypto-Zip",
"\6\xf1\3\2": "Crypto-RAR-Unknown",
"\6\xf1\3\3": "Crypto-RAR-29", # AES128
"\6\xf1\7\1": "Crypto-7z", # AES256
"\7\0": "Hash-None",
"\7\1": "Hash-CRC",
"\7\2": "Hash-SHA1",
"\7\3": "Hash-SHA256",
"\7\4": "Hash-SHA384",
"\7\5": "Hash-SHA512",
"\7\xf0": "Hash-Misc",
"\7\xf1\3\3": "Hash-RAR-29", # modified SHA1
"\7\xf1\7\1": "Hash-7z", # SHA256
}
class Coder(FieldSet):
def createFields(self):
yield Bits(self, "id_size", 4)
yield Bit(self, "is_not_simple", "If unset, stream setup is simple")
yield Bit(self, "has_attribs", "Are there compression properties attached?")
yield Bit(self, "unused[]")
yield Bit(self, "is_not_last_method", "Are there more methods after this one in the alternative method list?")
size = self['id_size'].value
if size > 0:
yield Enum(RawBytes(self, "id", size), METHODS)
if self['is_not_simple'].value:
yield SZUInt64(self, "num_stream_in")
yield SZUInt64(self, "num_stream_out")
self.info("Streams: IN=%u OUT=%u" % \
(self["num_stream_in"].value, self["num_stream_out"].value))
if self['has_attribs'].value:
size = SZUInt64(self, "properties_size")
yield size
yield RawBytes(self, "properties", size.value)
def _get_num_streams(self, direction):
if self['is_not_simple'].value:
return self['num_stream_%s' % direction].value
return 1
in_streams = property(lambda self: self._get_num_streams('in'))
out_streams = property(lambda self: self._get_num_streams('out'))
class CoderList(FieldSet):
def createFields(self):
while not self.eof:
field = Coder(self, "coder[]")
yield field
if not field['is_not_last_method'].value:
break
class BindPairInfo(FieldSet):
def createFields(self):
# 64-bit values, though in practice they are cast down to 32 bits
yield SZUInt64(self, "in_index")
yield SZUInt64(self, "out_index")
self.info("Indexes: IN=%u OUT=%u" % \
(self["in_index"].value, self["out_index"].value))
class Folder(FieldSet):
def createFields(self):
yield SZUInt64(self, "num_coders")
num = self["num_coders"].value
self.info("Folder: %u codecs" % num)
in_streams = out_streams = 0
# Coder info
for index in xrange(num):
ci = CoderList(self, "coders[]")
yield ci
in_streams += ci['coder[0]'].in_streams
out_streams += ci['coder[0]'].out_streams
self._in_streams = in_streams
self._out_streams = out_streams
# Bind pairs
self.info("out streams: %u" % out_streams)
for index in xrange(out_streams - 1):
yield BindPairInfo(self, "bind_pair[]")
# Packed streams
# @todo: Actually find mapping
packed_streams = in_streams - out_streams + 1
if packed_streams > 1:
for index in xrange(packed_streams):
yield SZUInt64(self, "pack_stream[]")
def _get_num_streams(self, direction):
list(self)
return getattr(self, '_' + direction + '_streams')
in_streams = property(lambda self: self._get_num_streams('in'))
out_streams = property(lambda self: self._get_num_streams('out'))
class UnpackInfo(FieldSet):
def createFields(self):
yield PropID(self, "id")
yield PropID(self, "folder_marker")
assert self['folder_marker'].value == kFolder
yield SZUInt64(self, "num_folders")
# Get generic info
num = self["num_folders"].value
self.info("%u folders" % num)
yield UInt8(self, "is_external")
if self['is_external'].value:
yield SZUInt64(self, "folder_data_offset", "Offset to folder data within data stream")
else:
# Read folder items
for folder_index in xrange(num):
yield Folder(self, "folder[]")
yield PropID(self, "unpacksize_marker")
assert self['unpacksize_marker'].value == kCodersUnPackSize
for folder_index in xrange(num):
folder = self["folder[%u]" % folder_index]
for index in xrange(folder.out_streams):
yield SZUInt64(self, "unpack_size[%d][%d]" % (folder_index, index))
# Extract digests
while not self.eof:
uid = ReadNextByte(self)
if uid == kEnd:
yield PropID(self, "end_marker")
break
elif uid == kCRC:
yield Digests(self, "digests", num)
else:
raise ParserError("Unexpected ID (%i)" % uid)
class SubStreamInfo(FieldSet):
def createFields(self):
yield PropID(self, "id")
num_folders = self['../unpack_info/num_folders'].value
num_unpackstreams = [1] * num_folders
while not self.eof:
uid = ReadNextByte(self)
if uid == kEnd:
yield PropID(self, "end_marker")
break
elif uid == kNumUnPackStream:
yield PropID(self, "num_unpackstream_marker")
for i in xrange(num_folders):
field = SZUInt64(self, "num_unpackstreams[]")
yield field
num_unpackstreams[i] = field.value
elif uid == kSize:
yield PropID(self, "size_marker")
for i in xrange(num_folders):
# The last substream's size is the stream size minus the other substreams.
for j in xrange(num_unpackstreams[i] - 1):
yield SZUInt64(self, "unpack_size[%d][%d]" % (i, j))
elif uid == kCRC:
digests = []
for i in xrange(num_folders):
if num_unpackstreams[i] == 1 and 'digests' in self['../unpack_info']:
continue
for j in xrange(num_unpackstreams[i]):
digests.append('folder %i, stream %i' % (i, j))
yield Digests(self, "digests", len(digests), digests)
else:
raise ParserError("Unexpected ID (%i)" % uid)
class StreamsInfo(FieldSet):
def createFields(self):
yield PropID(self, "id")
while not self.eof:
uid = ReadNextByte(self)
if uid == kEnd:
yield PropID(self, "end")
break
elif uid == kPackInfo:
yield PackInfo(self, "pack_info", PROP_DESC[uid])
elif uid == kUnPackInfo:
yield UnpackInfo(self, "unpack_info", PROP_DESC[uid])
elif uid == kSubStreamsInfo:
yield SubStreamInfo(self, "substreams_info", PROP_DESC[uid])
else:
raise ParserError("Unexpected ID (%i)" % uid)
class EncodedHeader(StreamsInfo):
pass
class EmptyStreamProperty(FieldSet):
def createFields(self):
yield PropID(self, "id")
yield SZUInt64(self, "size")
yield SevenZipBitVector(self, "vec", self['../num_files'].value)
def createValue(self):
return self['vec'].value
def createDisplay(self):
return self['vec'].display
class EmptyFileProperty(FieldSet):
def createFields(self):
yield PropID(self, "id")
yield SZUInt64(self, "size")
empty_streams = self['../empty_streams/vec'].value
yield SevenZipBitVector(self, "vec", len(empty_streams))
def createValue(self):
empty_streams = self['../empty_streams/vec'].value
return [empty_streams[i] for i in self['vec'].value]
def createDisplay(self):
return ','.join(str(i) for i in self.value)
class FileTimeProperty(FieldSet):
def createFields(self):
yield PropID(self, "id")
yield SZUInt64(self, "size")
definearr = SevenZipBitVector(self, "defined", self['../num_files'].value, has_all_byte=True)
yield definearr
yield UInt8(self, "is_external")
if self['is_external'].value:
yield SZUInt64(self, "folder_data_offset", "Offset to folder data within data stream")
else:
for index in definearr.value:
yield TimestampWin64(self, "timestamp[%d]" % index)
class FileNames(FieldSet):
def createFields(self):
yield PropID(self, "id")
yield SZUInt64(self, "size")
yield UInt8(self, "is_external")
if self['is_external'].value:
yield SZUInt64(self, "folder_data_offset", "Offset to folder data within data stream")
else:
for index in xrange(self['../num_files'].value):
yield CString(self, "name[%d]" % index, charset="UTF-16-LE")
class FileAttributes(FieldSet):
def createFields(self):
yield PropID(self, "id")
yield SZUInt64(self, "size")
definearr = SevenZipBitVector(self, "defined", self['../num_files'].value, has_all_byte=True)
yield definearr
yield UInt8(self, "is_external")
if self['is_external'].value:
yield SZUInt64(self, "folder_data_offset", "Offset to folder data within data stream")
else:
for index in definearr.value:
yield MSDOSFileAttr32(self, "attributes[%d]" % index)
class FilesInfo(FieldSet):
def createFields(self):
yield PropID(self, "id")
yield SZUInt64(self, "num_files")
while not self.eof:
uid = ReadNextByte(self)
if uid == kEnd:
yield PropID(self, "end_marker")
break
elif uid == kEmptyStream:
yield EmptyStreamProperty(self, "empty_streams")
elif uid == kEmptyFile:
yield EmptyFileProperty(self, "empty_files")
elif uid == kAnti:
yield EmptyFileProperty(self, "anti_files")
elif uid == kCreationTime:
yield FileTimeProperty(self, "creation_time")
elif uid == kLastAccessTime:
yield FileTimeProperty(self, "access_time")
elif uid == kLastWriteTime:
yield FileTimeProperty(self, "modified_time")
elif uid == kName:
yield FileNames(self, "filenames")
elif uid == kWinAttributes:
yield FileAttributes(self, "attributes")
elif uid == kDummy:
yield ArchiveProperty(self, "dummy[]")
else:
yield ArchiveProperty(self, "prop[]")
class Header(FieldSet):
def createFields(self):
yield PropID(self, "id")
while not self.eof:
uid = ReadNextByte(self)
if uid == kEnd:
yield PropID(self, "end")
break
elif uid == kArchiveProperties:
yield ArchiveProperties(self, "props", PROP_DESC[uid])
elif uid == kAdditionalStreamsInfo:
yield StreamsInfo(self, "additional_streams", PROP_DESC[uid])
elif uid == kMainStreamsInfo:
yield StreamsInfo(self, "main_streams", PROP_DESC[uid])
elif uid == kFilesInfo:
yield FilesInfo(self, "files_info", PROP_DESC[uid])
else:
raise ParserError("Unexpected ID %u" % uid)
class NextHeader(FieldSet):
def __init__(self, parent, name, desc="Next header"):
FieldSet.__init__(self, parent, name, desc)
self._size = 8 * self["/signature/start_hdr/next_hdr_size"].value
def createFields(self):
uid = ReadNextByte(self)
if uid == kHeader:
yield Header(self, "header", PROP_DESC[uid])
elif uid == kEncodedHeader:
yield EncodedHeader(self, "encoded_hdr", PROP_DESC[uid])
else:
raise ParserError("Unexpected ID %u" % uid)
class NextHeaderParser(Parser):
PARSER_TAGS = {
}
endian = LITTLE_ENDIAN
def createFields(self):
uid = ReadNextByte(self)
if uid == kHeader:
yield Header(self, "header", PROP_DESC[uid])
elif uid == kEncodedHeader:
yield EncodedHeader(self, "encoded_hdr", PROP_DESC[uid])
else:
raise ParserError("Unexpected ID %u" % uid)
def validate(self):
return True
class CompressedData(Bytes):
def __init__(self, parent, name, length, decompressor, description=None,
parser=None, filename=None, mime_type=None, parser_class=None):
if filename:
if not isinstance(filename, unicode):
filename = makePrintable(filename, "ISO-8859-1")
if not description:
description = 'File "%s" (%s)' % (
filename, humanFilesize(length))
Bytes.__init__(self, parent, name, length, description)
self.setupInputStream(decompressor, parser,
filename, mime_type, parser_class)
def setupInputStream(self, decompressor, parser, filename, mime_type, parser_class):
def createInputStream(cis, **args):
tags = args.setdefault("tags", [])
if parser_class:
tags.append(("class", parser_class))
if parser is not None:
tags.append(("id", parser.PARSER_TAGS["id"]))
if mime_type:
tags.append(("mime", mime_type))
if filename:
tags.append(("filename", filename))
# print args
return StringInputStream(decompressor(self.value), **args)
self.setSubIStream(createInputStream)
def get_header_decompressor(self):
unpack_info = self['/next_hdr/encoded_hdr/unpack_info']
assert unpack_info['num_folders'].value == 1
coder = unpack_info['folder[0]/coders[0]/coder[0]']
method = METHODS[coder['id'].value]
if method == 'Copy':
return lambda data: data
elif method == '7z-LZMA' and has_lzma:
props = coder['properties'].value
length = unpack_info['unpack_size[0][0]'].value
return lambda data: lzmadecompress(props + data, maxlength=length)
def get_header_field(self, name, size, description=None):
decompressor = get_header_decompressor(self)
if decompressor is None:
return RawBytes(self, name, size, description=description)
return CompressedData(self, name, size, decompressor, description=description, parser_class=NextHeaderParser)
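# Note (added for orientation): when /next_hdr holds a kEncodedHeader, the real
# archive header is itself stored compressed inside the body. The two helpers
# above read the encoded header's folder description, choose a decompressor
# (Copy, or 7z-LZMA via pylzma when available) and expose the decoded bytes to
# NextHeaderParser through CompressedData; with no usable decompressor the bytes
# are left as plain RawBytes.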
class Body(FieldSet):
def __init__(self, parent, name, desc="Body data"):
FieldSet.__init__(self, parent, name, desc)
self._size = 8 * self["/signature/start_hdr/next_hdr_offset"].value
def createFields(self):
if "encoded_hdr" in self["/next_hdr"]:
pack_size = sum([s.value for s in self.array("/next_hdr/encoded_hdr/pack_info/pack_size")])
body_size = self["/next_hdr/encoded_hdr/pack_info/pack_pos"].value
if body_size:
yield RawBytes(self, "compressed_data", body_size, "Compressed data")
# Here we could check if copy method was used to "compress" it,
# but this never happens, so just output "compressed file info"
yield get_header_field(self, "compressed_file_info", pack_size,
"Compressed file information")
size = (self._size // 8) - pack_size - body_size
if size > 0:
yield RawBytes(self, "unknown_data", size)
elif "header" in self["/next_hdr"]:
yield RawBytes(self, "compressed_data", self._size // 8, "Compressed data")
class StartHeader(FieldSet):
static_size = 160
def createFields(self):
yield textHandler(UInt64(self, "next_hdr_offset",
"Next header offset"), hexadecimal)
yield UInt64(self, "next_hdr_size", "Next header size")
yield textHandler(UInt32(self, "next_hdr_crc",
"Next header CRC"), hexadecimal)
class SignatureHeader(FieldSet):
static_size = 96 + StartHeader.static_size
def createFields(self):
yield Bytes(self, "signature", 6, "Signature Header")
yield UInt8(self, "major_ver", "Archive major version")
yield UInt8(self, "minor_ver", "Archive minor version")
yield textHandler(UInt32(self, "start_hdr_crc",
"Start header CRC"), hexadecimal)
yield StartHeader(self, "start_hdr", "Start header")
class SevenZipParser(Parser):
MAGIC = "7z\xbc\xaf\x27\x1c"
PARSER_TAGS = {
"id": "7zip",
"category": "archive",
"file_ext": ("7z",),
"mime": (u"application/x-7z-compressed",),
"min_size": 32 * 8,
"magic": ((MAGIC, 0),),
"description": "Compressed archive in 7z format"
}
endian = LITTLE_ENDIAN
def createFields(self):
yield SignatureHeader(self, "signature", "Signature Header")
yield Body(self, "body_data")
yield NextHeader(self, "next_hdr")
def validate(self):
if self.stream.readBytes(0, len(self.MAGIC)) != self.MAGIC:
return "Invalid signature"
return True
def createContentSize(self):
size = self["/signature/start_hdr/next_hdr_offset"].value * 8
size += self["/signature/start_hdr/next_hdr_size"].value * 8
size += SignatureHeader.static_size
return size
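# --- Usage sketch (illustration only, not part of the original module) ---
# A minimal way to exercise this parser by hand, assuming hachoir's standard
# createParser() entry point is available in this backport; "example.7z" is a
# hypothetical path:
#
#     from hachoir.parser import createParser
#
#     parser = createParser(u"example.7z")
#     if parser:
#         start_hdr = parser["/signature/start_hdr"]
#         print("next header at +%u, %u bytes" % (start_hdr["next_hdr_offset"].value,
#                                                 start_hdr["next_hdr_size"].value))
#         for field in parser["/next_hdr"]:
#             print("%s: %s" % (field.name, field.display))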