"""Detailed ZLIB parser Author: Robert Xiao Creation date: July 9 2007 """ from hachoir.parser import Parser from hachoir.field import (Bit, Bits, Field, Int16, UInt32, Enum, FieldSet, GenericFieldSet, PaddingBits, ParserError, RawBytes) from hachoir.core.endian import LITTLE_ENDIAN from hachoir.core.text_handler import textHandler, hexadecimal from hachoir.core.tools import paddingSize, alignValue def extend_data(data: bytearray, length, offset): """Extend data using a length and an offset, LZ-style.""" if length >= offset: new_data = data[-offset:] * (alignValue(length, offset) // offset) data += new_data[:length] else: data += data[-offset:-offset + length] def build_tree(lengths): """Build a Huffman tree from a list of lengths. The ith entry of the input list is the length of the Huffman code corresponding to integer i, or 0 if the integer i is unused.""" max_length = max(lengths) + 1 bit_counts = [0] * max_length next_code = [0] * max_length tree = {} for i in lengths: if i: bit_counts[i] += 1 code = 0 for i in range(1, len(bit_counts)): next_code[i] = code = (code + bit_counts[i - 1]) << 1 for i, ln in enumerate(lengths): if ln: tree[(ln, next_code[ln])] = i next_code[ln] += 1 return tree class HuffmanCode(Field): """Huffman code. Uses tree parameter as the Huffman tree.""" def __init__(self, parent, name, tree, description=None): Field.__init__(self, parent, name, 0, description) endian = self.parent.endian stream = self.parent.stream addr = self.absolute_address value = 0 while (self.size, value) not in tree: if self.size > 256: raise ParserError("Huffman code too long!") bit = stream.readBits(addr, 1, endian) value <<= 1 value += bit self._size += 1 addr += 1 self.huffvalue = value self.realvalue = tree[(self.size, value)] def createValue(self): return self.huffvalue class DeflateBlock(FieldSet): # code: (min, max, extrabits) LENGTH_SYMBOLS = {257: (3, 3, 0), 258: (4, 4, 0), 259: (5, 5, 0), 260: (6, 6, 0), 261: (7, 7, 0), 262: (8, 8, 0), 263: (9, 9, 0), 264: (10, 10, 0), 265: (11, 12, 1), 266: (13, 14, 1), 267: (15, 16, 1), 268: (17, 18, 1), 269: (19, 22, 2), 270: (23, 26, 2), 271: (27, 30, 2), 272: (31, 34, 2), 273: (35, 42, 3), 274: (43, 50, 3), 275: (51, 58, 3), 276: (59, 66, 3), 277: (67, 82, 4), 278: (83, 98, 4), 279: (99, 114, 4), 280: (115, 130, 4), 281: (131, 162, 5), 282: (163, 194, 5), 283: (195, 226, 5), 284: (227, 257, 5), 285: (258, 258, 0) } DISTANCE_SYMBOLS = {0: (1, 1, 0), 1: (2, 2, 0), 2: (3, 3, 0), 3: (4, 4, 0), 4: (5, 6, 1), 5: (7, 8, 1), 6: (9, 12, 2), 7: (13, 16, 2), 8: (17, 24, 3), 9: (25, 32, 3), 10: (33, 48, 4), 11: (49, 64, 4), 12: (65, 96, 5), 13: (97, 128, 5), 14: (129, 192, 6), 15: (193, 256, 6), 16: (257, 384, 7), 17: (385, 512, 7), 18: (513, 768, 8), 19: (769, 1024, 8), 20: (1025, 1536, 9), 21: (1537, 2048, 9), 22: (2049, 3072, 10), 23: (3073, 4096, 10), 24: (4097, 6144, 11), 25: (6145, 8192, 11), 26: (8193, 12288, 12), 27: (12289, 16384, 12), 28: (16385, 24576, 13), 29: (24577, 32768, 13), } CODE_LENGTH_ORDER = [16, 17, 18, 0, 8, 7, 9, 6, 10, 5, 11, 4, 12, 3, 13, 2, 14, 1, 15] def __init__(self, parent, name, uncomp_data=b"", *args, **kwargs): FieldSet.__init__(self, parent, name, *args, **kwargs) self.uncomp_data = bytearray(uncomp_data) def createFields(self): yield Bit(self, "final", "Is this the final block?") # BFINAL yield Enum(Bits(self, "compression_type", 2), # BTYPE {0: "None", 1: "Fixed Huffman", 2: "Dynamic Huffman", 3: "Reserved"}) if self["compression_type"].value == 0: # no compression # align on byte boundary padding = paddingSize(self.current_size + self.absolute_address, 8) if padding: yield PaddingBits(self, "padding[]", padding) yield Int16(self, "len") yield Int16(self, "nlen", "One's complement of len") if self["len"].value != ~self["nlen"].value: raise ParserError( "len must be equal to the one's complement of nlen!") # null stored blocks produced by some encoders (e.g. PIL) if self["len"].value: yield RawBytes(self, "data", self["len"].value, "Uncompressed data") return elif self["compression_type"].value == 1: # Fixed Huffman length_tree = {} # (size, huffman code): value distance_tree = {} for i in range(144): length_tree[(8, i + 48)] = i for i in range(144, 256): length_tree[(9, i + 256)] = i for i in range(256, 280): length_tree[(7, i - 256)] = i for i in range(280, 288): length_tree[(8, i - 88)] = i for i in range(32): distance_tree[(5, i)] = i elif self["compression_type"].value == 2: # Dynamic Huffman yield Bits(self, "huff_num_length_codes", 5, "Number of Literal/Length Codes, minus 257") yield Bits(self, "huff_num_distance_codes", 5, "Number of Distance Codes, minus 1") yield Bits(self, "huff_num_code_length_codes", 4, "Number of Code Length Codes, minus 4") code_length_code_lengths = [0] * 19 # confusing variable name... for i in self.CODE_LENGTH_ORDER[:self["huff_num_code_length_codes"].value + 4]: field = Bits( self, "huff_code_length_code[%i]" % i, 3, "Code lengths for the code length alphabet") yield field code_length_code_lengths[i] = field.value code_length_tree = build_tree(code_length_code_lengths) length_code_lengths = [] distance_code_lengths = [] for numcodes, name, lengths in ( (self["huff_num_length_codes"].value + 257, "length", length_code_lengths), (self["huff_num_distance_codes"].value + 1, "distance", distance_code_lengths)): while len(lengths) < numcodes: field = HuffmanCode(self, "huff_%s_code[]" % name, code_length_tree) value = field.realvalue if value < 16: prev_value = value field._description = "Literal Code Length %i (Huffman Code %i)" % ( value, field.value) yield field lengths.append(value) else: info = {16: (3, 6, 2), 17: (3, 10, 3), 18: (11, 138, 7)}[value] if value == 16: repvalue = prev_value else: repvalue = 0 field._description = "Repeat Code %i, Repeating value (%i) %i to %i times (Huffman Code %i)" % ( value, repvalue, info[0], info[1], field.value) yield field extrafield = Bits(self, "huff_%s_code_extra[%s" % ( name, field.name.split('[')[1]), info[2]) num_repeats = extrafield.value + info[0] extrafield._description = "Repeat Extra Bits (%i), total repeats %i" % ( extrafield.value, num_repeats) yield extrafield lengths += [repvalue] * num_repeats length_tree = build_tree(length_code_lengths) distance_tree = build_tree(distance_code_lengths) else: raise ParserError("Unsupported compression type 3!") while True: field = HuffmanCode(self, "length_code[]", length_tree) value = field.realvalue if value < 256: field._description = "Literal Code %r (Huffman Code %i)" % ( chr(value), field.value) yield field self.uncomp_data.append(value) if value == 256: field._description = "Block Terminator Code (256) (Huffman Code %i)" % field.value yield field break elif value > 256: info = self.LENGTH_SYMBOLS[value] if info[2] == 0: field._description = "Length Code %i, Value %i (Huffman Code %i)" % ( value, info[0], field.value) length = info[0] yield field else: field._description = "Length Code %i, Values %i to %i (Huffman Code %i)" % ( value, info[0], info[1], field.value) yield field extrafield = Bits( self, "length_extra[%s" % field.name.split('[')[1], info[2]) length = extrafield.value + info[0] extrafield._description = "Length Extra Bits (%i), total length %i" % ( extrafield.value, length) yield extrafield field = HuffmanCode(self, "distance_code[]", distance_tree) value = field.realvalue info = self.DISTANCE_SYMBOLS[value] if info[2] == 0: field._description = "Distance Code %i, Value %i (Huffman Code %i)" % ( value, info[0], field.value) distance = info[0] yield field else: field._description = "Distance Code %i, Values %i to %i (Huffman Code %i)" % ( value, info[0], info[1], field.value) yield field extrafield = Bits( self, "distance_extra[%s" % field.name.split('[')[1], info[2]) distance = extrafield.value + info[0] extrafield._description = "Distance Extra Bits (%i), total length %i" % ( extrafield.value, distance) yield extrafield extend_data(self.uncomp_data, length, distance) class DeflateData(GenericFieldSet): endian = LITTLE_ENDIAN def createFields(self): uncomp_data = bytearray() blk = DeflateBlock(self, "compressed_block[]", uncomp_data) yield blk uncomp_data = blk.uncomp_data while not blk["final"].value: blk = DeflateBlock(self, "compressed_block[]", uncomp_data) yield blk uncomp_data = blk.uncomp_data # align on byte boundary padding = paddingSize(self.current_size + self.absolute_address, 8) if padding: yield PaddingBits(self, "padding[]", padding) self.uncompressed_data = uncomp_data class ZlibData(Parser): PARSER_TAGS = { "id": "zlib", "category": "archive", "file_ext": ("zlib",), "min_size": 8 * 8, "description": "ZLIB Data", } endian = LITTLE_ENDIAN def validate(self): if self["compression_method"].value != 8: return "Incorrect compression method" if ((self["compression_info"].value << 12) + (self["compression_method"].value << 8) + (self["flag_compression_level"].value << 6) + (self["flag_dictionary_present"].value << 5) + (self["flag_check_bits"].value)) % 31 != 0: return "Invalid flag check value" return True def createFields(self): # CM yield Enum(Bits(self, "compression_method", 4), {8: "deflate", 15: "reserved"}) # CINFO yield Bits(self, "compression_info", 4, "base-2 log of the window size") yield Bits(self, "flag_check_bits", 5) # FCHECK yield Bit(self, "flag_dictionary_present") # FDICT yield Enum(Bits(self, "flag_compression_level", 2), # FLEVEL {0: "Fastest", 1: "Fast", 2: "Default", 3: "Maximum, Slowest"}) if self["flag_dictionary_present"].value: yield textHandler(UInt32(self, "dict_checksum", "ADLER32 checksum of dictionary information"), hexadecimal) yield DeflateData(self, "data", self.stream, description="Compressed Data") yield textHandler(UInt32(self, "data_checksum", "ADLER32 checksum of compressed data"), hexadecimal) def zlib_inflate(stream, wbits=None): if wbits is None or wbits >= 0: return ZlibData(stream)["data"].uncompressed_data else: data = DeflateData(None, "root", stream, "", stream.askSize(None)) for _ in data: pass return data.uncompressed_data