SickGear/lib/hachoir/parser/archive/ace.py

283 lines
9.8 KiB
Python
Raw Permalink Normal View History

"""
ACE parser
From wotsit.org and the SDK header (bitflags)
Partial study of a new block type (5) I've called "new_recovery", as its
syntax is very close to the former one (of type 2).
Status: can only fully read file and header blocks.
Author: Christophe Gisquet <christophe.gisquet@free.fr>
Creation date: 19 january 2006
"""
from hachoir.parser import Parser
from hachoir.field import (StaticFieldSet, FieldSet,
Bit, Bits, NullBits, RawBytes, Enum,
UInt8, UInt16, UInt32,
PascalString8, PascalString16, String,
TimeDateMSDOS32)
from hachoir.core.text_handler import textHandler, filesizeHandler, hexadecimal
from hachoir.core.endian import LITTLE_ENDIAN
from hachoir.parser.common.msdos import MSDOSFileAttr32
# Archive signature; AceFile.validate compares the stream against it.
MAGIC = b"**ACE**"

# host_os codes for which fileHeader decodes MS-DOS style file attributes.
OS_MSDOS = 0
OS_WIN32 = 2

# Display names for the "host_os" field of the archive header.
HOST_OS = {
    0: "MS-DOS",
    1: "OS/2",
    2: "Win32",
    3: "Unix",
    4: "MAC-OS",
    5: "Win NT",
    6: "Primos",
    7: "APPLE GS",
    8: "ATARI",
    9: "VAX VMS",
    10: "AMIGA",
    11: "NEXT",
}

# Display names for the "compression_type" field of a file entry.
COMPRESSION_TYPE = {
    0: "Store",
    1: "Lempel-Ziv 77",
    2: "ACE v2.0",
}

# Display names for the "compression_mode" field of a file entry.
COMPRESSION_MODE = {
    0: "fastest",
    1: "fast",
    2: "normal",
    3: "good",
    4: "best",
}
# TODO: Computing the CRC16 would also prove useful
# def markerValidate(self):
# return not self["extend"].value and self["signature"].value == MAGIC and \
# self["host_os"].value<12
class MarkerFlags(StaticFieldSet):
    """Flag bits of the archive marker block (yielded by markerFlags)."""

    format = (
        # Header-level flags
        (Bit, "extend", "Whether the header is extended"),
        (Bit, "has_comment", "Whether the archive has a comment"),
        (NullBits, "unused", 7, "Reserved bits"),
        # Archive-level properties
        (Bit, "sfx", "SFX"),
        (Bit, "limited_dict", "Junior SFX with 256K dictionary"),
        (Bit, "multi_volume", "Part of a set of ACE archives"),
        (Bit, "has_av_string", "This header holds an AV-string"),
        (Bit, "recovery_record", "Recovery record preset"),
        (Bit, "locked", "Archive is locked"),
        (Bit, "solid", "Archive uses solid compression"),
    )
def markerFlags(self):
    """Yield the "flags" field set of the archive marker block."""
    marker_flags = MarkerFlags(self, "flags", "Marker flags")
    yield marker_flags
def markerHeader(self):
    """Yield the archive marker header fields that follow the flags.

    The fixed part is the signature, version numbers, host OS, volume
    number, timestamp and a reserved area. The AV string and the compressed
    comment are emitted only when the matching bit of ``self["flags"]`` is
    set.
    """
    yield String(self, "signature", 7, "Signature")
    yield UInt8(self, "ver_extract", "Version needed to extract archive")
    yield UInt8(self, "ver_created", "Version used to create archive")
    host_os = UInt8(self, "host_os", "OS where the files were compressed")
    yield Enum(host_os, HOST_OS)
    yield UInt8(self, "vol_num", "Volume number")
    yield TimeDateMSDOS32(self, "time", "Date and time (MS DOS format)")
    yield Bits(self, "reserved", 64, "Reserved size for future extensions")

    marker_flags = self["flags"]
    if marker_flags["has_av_string"].value:
        yield PascalString8(self, "av_string", "AV String")

    if not marker_flags["has_comment"].value:
        return
    comment_size = filesizeHandler(UInt16(self, "comment_size", "Comment size"))
    yield comment_size
    # A zero-length comment has no payload field at all.
    if comment_size.value > 0:
        yield RawBytes(self, "compressed_comment", comment_size.value,
                       "Compressed comment")
class FileFlags(StaticFieldSet):
    """Flag bits of a file entry block (yielded by fileFlags)."""

    format = (
        # Header-level flags
        (Bit, "extend", "Whether the header is extended"),
        (Bit, "has_comment", "Presence of file comment"),
        (Bits, "unused", 10, "Unused bit flags"),
        # File-level properties
        (Bit, "encrypted", "File encrypted with password"),
        (Bit, "previous", "File continued from previous volume"),
        (Bit, "next", "File continues on the next volume"),
        (Bit, "solid", "File compressed using previously archived files"),
    )
def fileFlags(self):
    """Yield the "flags" field set of a file entry block."""
    file_flags = FileFlags(self, "flags", "File flags")
    yield file_flags
def fileHeader(self):
    """Yield the file entry header fields that follow the flags.

    Emits the sizes, timestamp, file attributes, CRC32, compression settings
    and filename. A comment size and comment data follow only when
    ``flags/has_comment`` is set. File attributes are decoded with
    MSDOSFileAttr32 when the archive header's ``host_os`` is MS-DOS or
    Win32, and shown as a hexadecimal UInt32 otherwise.
    """
    yield filesizeHandler(UInt32(self, "compressed_size", "Size of the compressed file"))
    yield filesizeHandler(UInt32(self, "uncompressed_size", "Uncompressed file size"))
    yield TimeDateMSDOS32(self, "ftime", "Date and time (MS DOS format)")

    # The attribute layout depends on the OS recorded in the archive header.
    host_os = self["/header/host_os"].value
    if host_os in (OS_MSDOS, OS_WIN32):
        attributes = MSDOSFileAttr32(self, "file_attr", "File attributes")
    else:
        attributes = textHandler(UInt32(self, "file_attr", "File attributes"), hexadecimal)
    yield attributes

    yield textHandler(UInt32(self, "file_crc32", "CRC32 checksum over the compressed file)"), hexadecimal)
    yield Enum(UInt8(self, "compression_type", "Type of compression"), COMPRESSION_TYPE)
    yield Enum(UInt8(self, "compression_mode", "Quality of compression"), COMPRESSION_MODE)
    yield textHandler(UInt16(self, "parameters", "Compression parameters"), hexadecimal)
    yield textHandler(UInt16(self, "reserved", "Reserved data"), hexadecimal)

    yield PascalString16(self, "filename", "Filename")

    # Optional comment: a size field, then the data if the size is non-zero.
    if not self["flags/has_comment"].value:
        return
    comment_size = filesizeHandler(UInt16(self, "comment_size", "Size of the compressed comment"))
    yield comment_size
    comment_length = comment_size.value
    if comment_length > 0:
        yield RawBytes(self, "comment_data", comment_length, "Comment data")
def fileBody(self):
    """Yield the compressed payload of a file entry, if it is non-empty."""
    compressed_length = self["compressed_size"].value
    if compressed_length <= 0:
        # Nothing follows the header for an empty entry.
        return
    yield RawBytes(self, "compressed_data", compressed_length, "Compressed data")
def fileDesc(self):
    """Return a one-line description of a file entry: name and packed size."""
    filename = self["filename"].value
    packed_size = self["compressed_size"].display
    return "File entry: %s (%s)" % (filename, packed_size)
def recoveryHeader(self):
    """Yield the fields of a recovery block, including its data chunks.

    Emits the recovery size, signature, relative start, block count, block
    size and CRC16, then ``num_blocks`` raw chunks of ``size_blocks`` bytes
    and one final chunk of the same size holding their XOR. Also assigns
    ``self.body_size`` as a side effect.
    """
    yield filesizeHandler(UInt32(self, "rec_blk_size", "Size of recovery data"))
    # NOTE(review): this stores the field's ``size`` attribute, not its
    # ``value`` -- confirm which one is intended before relying on body_size.
    self.body_size = self["rec_blk_size"].size
    yield String(self, "signature", 7, "Signature, normally '**ACE**'")
    relative_start = UInt32(
        self, "relative_start",
        "Relative start (to this block) of the data this block is mode of")
    yield textHandler(relative_start, hexadecimal)
    yield UInt32(self, "num_blocks", "Number of blocks the data is split into")
    yield UInt32(self, "size_blocks", "Size of these blocks")
    yield UInt16(self, "crc16_blocks", "CRC16 over recovery data")

    # num_blocks chunks of size_blocks bytes follow; the last chunk after
    # them is the XOR of all of those chunks.
    block_length = self["size_blocks"].value
    block_count = self["num_blocks"].value
    for number in range(block_count):
        yield RawBytes(self, "data[]", block_length, "Recovery block %i" % number)
    yield RawBytes(self, "xor_data", block_length, "The XOR value of the above data blocks")
def recoveryDesc(self):
    """Return a one-line description of a recovery block.

    The size shown is the display text of the ``rec_blk_size`` field, which
    is the size field recoveryHeader yields. The text is inserted with
    ``%s`` because a field's display is a string, not a number.
    """
    return "Recovery block, size=%s" % self["rec_blk_size"].display
def newRecoveryHeader(self):
    """
    Yield the header fields of a "new_recovery" block.

    This header is described nowhere
    """
    is_extended = self["flags/extend"].value
    if is_extended:
        # Extended headers carry the size of the body that follows.
        yield filesizeHandler(UInt32(self, "body_size", "Size of the unknown body following"))
        self.body_size = self["body_size"].value

    first_unknown = UInt32(self, "unknown[]", "Unknown field, probably 0")
    yield textHandler(first_unknown, hexadecimal)
    yield String(self, "signature", 7, "Signature, normally '**ACE**'")
    relative_start = UInt32(self, "relative_start",
                            "Offset (=crc16's) of this block in the file")
    yield textHandler(relative_start, hexadecimal)
    second_unknown = UInt32(self, "unknown[]", "Unknown field, probably 0")
    yield textHandler(second_unknown, hexadecimal)
class BaseFlags(StaticFieldSet):
    """Generic block flags: only the "extend" bit is interpreted."""

    format = (
        (Bit, "extend", "Whether the header is extended"),
        (NullBits, "unused", 15, "Unused bit flags"),
    )
def parseFlags(self):
    """Yield the generic "flags" field set used when a block has no own parser."""
    generic_flags = BaseFlags(self, "flags", "Unknown flags")
    yield generic_flags
def parseHeader(self):
    """Yield the generic header: a body size field, only if ``flags/extend`` is set.

    When the field is present, its value is also stored in ``self.body_size``.
    """
    if not self["flags/extend"].value:
        return
    body_size = filesizeHandler(UInt32(self, "body_size", "Size of the unknown body following"))
    yield body_size
    self.body_size = body_size.value
def parseBody(self):
    """Yield the block body as raw bytes when ``self.body_size`` is positive."""
    body_length = self.body_size
    if body_length <= 0:
        return
    yield RawBytes(self, "body_data", body_length, "Body data, unhandled")
class Block(FieldSet):
    """One ACE block: CRC16, header size, block type, flags, header, body.

    The flags, header and body are parsed by functions selected from
    TAG_INFO according to the block type. Unknown block types, and any
    TAG_INFO slot left as None, use the generic module-level parseFlags,
    parseHeader and parseBody.
    """

    # block type -> (field name, description, flags parser, header parser,
    # body parser). The description is either a string or a callable taking
    # the block; a None parser selects the generic fallback.
    TAG_INFO = {
        0: ("header", "Archiver header", markerFlags, markerHeader, None),
        1: ("file[]", fileDesc, fileFlags, fileHeader, fileBody),
        # recoveryHeader belongs in the header slot: it yields no "flags"
        # field, so the generic flags parser has to run before it.
        2: ("recovery[]", recoveryDesc, None, recoveryHeader, None),
        5: ("new_recovery[]", None, None, newRecoveryHeader, None),
    }

    def __init__(self, parent, name, description=None):
        """Read the block type and select the matching parser functions."""
        FieldSet.__init__(self, parent, name, description)
        self.body_size = 0
        self.desc_func = None

        # Start from "no specific parser" so that unknown block types also
        # end up with the generic parsers instead of missing attributes.
        flags_parser = header_parser = body_parser = None
        block_type = self["block_type"].value
        if block_type in self.TAG_INFO:
            (self._name, desc, flags_parser,
             header_parser, body_parser) = self.TAG_INFO[block_type]
            if desc:
                if isinstance(desc, str):
                    self._description = desc
                else:
                    self.desc_func = desc
        else:
            self.warning("Processing as unknown block block of type %u" % block_type)

        self.parseFlags = flags_parser or parseFlags
        self.parseHeader = header_parser or parseHeader
        self.parseBody = body_parser or parseBody

    def createFields(self):
        """Yield the common prefix, then the flags, header, extra data and body."""
        yield textHandler(UInt16(self, "crc16", "Archive CRC16 (from byte 4 on)"), hexadecimal)
        yield filesizeHandler(UInt16(self, "head_size", "Block size (from byte 4 on)"))
        yield UInt8(self, "block_type", "Block type")

        # Flags
        yield from self.parseFlags(self)

        # Rest of the header
        yield from self.parseHeader(self)

        # head_size does not count the crc16 and head_size fields (2 + 2
        # bytes), while current_size (in bits) does; whatever remains of the
        # header is exposed as raw bytes.
        size = self["head_size"].value - (self.current_size // 8) + (2 + 2)
        if size > 0:
            yield RawBytes(self, "extra_data", size, "Extra header data, unhandled")

        # Body in itself
        yield from self.parseBody(self)

    def createDescription(self):
        """Return the type-specific description, or a generic one by block type."""
        if self.desc_func:
            return self.desc_func(self)
        return "Block: %s" % self["block_type"].display
class AceFile(Parser):
    """Parser for ACE archives: a run of Block fields up to end of stream."""

    endian = LITTLE_ENDIAN
    PARSER_TAGS = {
        "id": "ace",
        "category": "archive",
        "file_ext": ("ace",),
        "mime": ("application/x-ace-compressed",),
        "min_size": 50 * 8,
        "description": "ACE archive",
    }

    def validate(self):
        """Return True if the signature matches MAGIC, else an error string."""
        # The signature is read at offset 7 * 8 (presumably in bits, i.e.
        # 7 bytes into the stream -- confirm against the stream API).
        signature = self.stream.readBytes(7 * 8, len(MAGIC))
        if signature == MAGIC:
            return True
        return "Invalid magic"

    def createFields(self):
        """Yield blocks one after another until the end of the stream."""
        while not self.eof:
            yield Block(self, "block[]")