mirror of
https://github.com/SickGear/SickGear.git
synced 2025-01-06 01:53:37 +00:00
301 lines
13 KiB
Python
301 lines
13 KiB
Python
"""Detailed ZLIB parser
|
|
|
|
Author: Robert Xiao
|
|
Creation date: July 9 2007
|
|
|
|
"""
|
|
|
|
from hachoir_parser import Parser
|
|
from hachoir_core.field import (Bit, Bits, Field, Int16, UInt32,
|
|
Enum, FieldSet, GenericFieldSet,
|
|
PaddingBits, ParserError, RawBytes)
|
|
from hachoir_core.endian import LITTLE_ENDIAN
|
|
from hachoir_core.text_handler import textHandler, hexadecimal
|
|
from hachoir_core.tools import paddingSize, alignValue
|
|
|
|
def extend_data(data, length, offset):
|
|
"""Extend data using a length and an offset."""
|
|
if length >= offset:
|
|
new_data = data[-offset:] * (alignValue(length, offset) // offset)
|
|
return data + new_data[:length]
|
|
else:
|
|
return data + data[-offset:-offset+length]
|
|
|
|
def build_tree(lengths):
|
|
"""Build a Huffman tree from a list of lengths.
|
|
The ith entry of the input list is the length of the Huffman code corresponding to
|
|
integer i, or 0 if the integer i is unused."""
|
|
max_length = max(lengths) + 1
|
|
bit_counts = [0]*max_length
|
|
next_code = [0]*max_length
|
|
tree = {}
|
|
for i in lengths:
|
|
if i:
|
|
bit_counts[i] += 1
|
|
code = 0
|
|
for i in xrange(1, len(bit_counts)):
|
|
next_code[i] = code = (code + bit_counts[i-1]) << 1
|
|
for i, ln in enumerate(lengths):
|
|
if ln:
|
|
tree[(ln, next_code[ln])] = i
|
|
next_code[ln] += 1
|
|
return tree
|
|
|
|
class HuffmanCode(Field):
|
|
"""Huffman code. Uses tree parameter as the Huffman tree."""
|
|
def __init__(self, parent, name, tree, description=None):
|
|
Field.__init__(self, parent, name, 0, description)
|
|
|
|
endian = self.parent.endian
|
|
stream = self.parent.stream
|
|
addr = self.absolute_address
|
|
|
|
value = 0
|
|
while (self.size, value) not in tree:
|
|
if self.size > 256:
|
|
raise ParserError("Huffman code too long!")
|
|
bit = stream.readBits(addr, 1, endian)
|
|
value <<= 1
|
|
value += bit
|
|
self._size += 1
|
|
addr += 1
|
|
self.huffvalue = value
|
|
self.realvalue = tree[(self.size, value)]
|
|
def createValue(self):
|
|
return self.huffvalue
|
|
|
|
class DeflateBlock(FieldSet):
|
|
# code: (min, max, extrabits)
|
|
LENGTH_SYMBOLS = {257:(3,3,0),
|
|
258:(4,4,0),
|
|
259:(5,5,0),
|
|
260:(6,6,0),
|
|
261:(7,7,0),
|
|
262:(8,8,0),
|
|
263:(9,9,0),
|
|
264:(10,10,0),
|
|
265:(11,12,1),
|
|
266:(13,14,1),
|
|
267:(15,16,1),
|
|
268:(17,18,1),
|
|
269:(19,22,2),
|
|
270:(23,26,2),
|
|
271:(27,30,2),
|
|
272:(31,34,2),
|
|
273:(35,42,3),
|
|
274:(43,50,3),
|
|
275:(51,58,3),
|
|
276:(59,66,3),
|
|
277:(67,82,4),
|
|
278:(83,98,4),
|
|
279:(99,114,4),
|
|
280:(115,130,4),
|
|
281:(131,162,5),
|
|
282:(163,194,5),
|
|
283:(195,226,5),
|
|
284:(227,257,5),
|
|
285:(258,258,0)
|
|
}
|
|
DISTANCE_SYMBOLS = {0:(1,1,0),
|
|
1:(2,2,0),
|
|
2:(3,3,0),
|
|
3:(4,4,0),
|
|
4:(5,6,1),
|
|
5:(7,8,1),
|
|
6:(9,12,2),
|
|
7:(13,16,2),
|
|
8:(17,24,3),
|
|
9:(25,32,3),
|
|
10:(33,48,4),
|
|
11:(49,64,4),
|
|
12:(65,96,5),
|
|
13:(97,128,5),
|
|
14:(129,192,6),
|
|
15:(193,256,6),
|
|
16:(257,384,7),
|
|
17:(385,512,7),
|
|
18:(513,768,8),
|
|
19:(769,1024,8),
|
|
20:(1025,1536,9),
|
|
21:(1537,2048,9),
|
|
22:(2049,3072,10),
|
|
23:(3073,4096,10),
|
|
24:(4097,6144,11),
|
|
25:(6145,8192,11),
|
|
26:(8193,12288,12),
|
|
27:(12289,16384,12),
|
|
28:(16385,24576,13),
|
|
29:(24577,32768,13),
|
|
}
|
|
CODE_LENGTH_ORDER = [16, 17, 18, 0, 8, 7, 9, 6, 10, 5, 11, 4, 12, 3, 13, 2, 14, 1, 15]
|
|
def __init__(self, parent, name, uncomp_data="", *args, **kwargs):
|
|
FieldSet.__init__(self, parent, name, *args, **kwargs)
|
|
self.uncomp_data = uncomp_data
|
|
|
|
def createFields(self):
|
|
yield Bit(self, "final", "Is this the final block?") # BFINAL
|
|
yield Enum(Bits(self, "compression_type", 2), # BTYPE
|
|
{0:"None", 1:"Fixed Huffman", 2:"Dynamic Huffman", 3:"Reserved"})
|
|
if self["compression_type"].value == 0: # no compression
|
|
padding = paddingSize(self.current_size + self.absolute_address, 8) # align on byte boundary
|
|
if padding:
|
|
yield PaddingBits(self, "padding[]", padding)
|
|
yield Int16(self, "len")
|
|
yield Int16(self, "nlen", "One's complement of len")
|
|
if self["len"].value != ~self["nlen"].value:
|
|
raise ParserError("len must be equal to the one's complement of nlen!")
|
|
if self["len"].value: # null stored blocks produced by some encoders (e.g. PIL)
|
|
yield RawBytes(self, "data", self["len"].value, "Uncompressed data")
|
|
return
|
|
elif self["compression_type"].value == 1: # Fixed Huffman
|
|
length_tree = {} # (size, huffman code): value
|
|
distance_tree = {}
|
|
for i in xrange(144):
|
|
length_tree[(8, i+48)] = i
|
|
for i in xrange(144, 256):
|
|
length_tree[(9, i+256)] = i
|
|
for i in xrange(256, 280):
|
|
length_tree[(7, i-256)] = i
|
|
for i in xrange(280, 288):
|
|
length_tree[(8, i-88)] = i
|
|
for i in xrange(32):
|
|
distance_tree[(5, i)] = i
|
|
elif self["compression_type"].value == 2: # Dynamic Huffman
|
|
yield Bits(self, "huff_num_length_codes", 5, "Number of Literal/Length Codes, minus 257")
|
|
yield Bits(self, "huff_num_distance_codes", 5, "Number of Distance Codes, minus 1")
|
|
yield Bits(self, "huff_num_code_length_codes", 4, "Number of Code Length Codes, minus 4")
|
|
code_length_code_lengths = [0]*19 # confusing variable name...
|
|
for i in self.CODE_LENGTH_ORDER[:self["huff_num_code_length_codes"].value+4]:
|
|
field = Bits(self, "huff_code_length_code[%i]" % i, 3, "Code lengths for the code length alphabet")
|
|
yield field
|
|
code_length_code_lengths[i] = field.value
|
|
code_length_tree = build_tree(code_length_code_lengths)
|
|
length_code_lengths = []
|
|
distance_code_lengths = []
|
|
for numcodes, name, lengths in (
|
|
(self["huff_num_length_codes"].value + 257, "length", length_code_lengths),
|
|
(self["huff_num_distance_codes"].value + 1, "distance", distance_code_lengths)):
|
|
while len(lengths) < numcodes:
|
|
field = HuffmanCode(self, "huff_%s_code[]" % name, code_length_tree)
|
|
value = field.realvalue
|
|
if value < 16:
|
|
prev_value = value
|
|
field._description = "Literal Code Length %i (Huffman Code %i)" % (value, field.value)
|
|
yield field
|
|
lengths.append(value)
|
|
else:
|
|
info = {16: (3,6,2),
|
|
17: (3,10,3),
|
|
18: (11,138,7)}[value]
|
|
if value == 16:
|
|
repvalue = prev_value
|
|
else:
|
|
repvalue = 0
|
|
field._description = "Repeat Code %i, Repeating value (%i) %i to %i times (Huffman Code %i)" % (value, repvalue, info[0], info[1], field.value)
|
|
yield field
|
|
extrafield = Bits(self, "huff_%s_code_extra[%s" % (name, field.name.split('[')[1]), info[2])
|
|
num_repeats = extrafield.value+info[0]
|
|
extrafield._description = "Repeat Extra Bits (%i), total repeats %i"%(extrafield.value, num_repeats)
|
|
yield extrafield
|
|
lengths += [repvalue]*num_repeats
|
|
length_tree = build_tree(length_code_lengths)
|
|
distance_tree = build_tree(distance_code_lengths)
|
|
else:
|
|
raise ParserError("Unsupported compression type 3!")
|
|
while True:
|
|
field = HuffmanCode(self, "length_code[]", length_tree)
|
|
value = field.realvalue
|
|
if value < 256:
|
|
field._description = "Literal Code %r (Huffman Code %i)" % (chr(value), field.value)
|
|
yield field
|
|
self.uncomp_data += chr(value)
|
|
if value == 256:
|
|
field._description = "Block Terminator Code (256) (Huffman Code %i)" % field.value
|
|
yield field
|
|
break
|
|
elif value > 256:
|
|
info = self.LENGTH_SYMBOLS[value]
|
|
if info[2] == 0:
|
|
field._description = "Length Code %i, Value %i (Huffman Code %i)" % (value, info[0], field.value)
|
|
length = info[0]
|
|
yield field
|
|
else:
|
|
field._description = "Length Code %i, Values %i to %i (Huffman Code %i)" % (value, info[0], info[1], field.value)
|
|
yield field
|
|
extrafield = Bits(self, "length_extra[%s" % field.name.split('[')[1], info[2])
|
|
length = extrafield.value + info[0]
|
|
extrafield._description = "Length Extra Bits (%i), total length %i"%(extrafield.value, length)
|
|
yield extrafield
|
|
field = HuffmanCode(self, "distance_code[]", distance_tree)
|
|
value = field.realvalue
|
|
info = self.DISTANCE_SYMBOLS[value]
|
|
if info[2] == 0:
|
|
field._description = "Distance Code %i, Value %i (Huffman Code %i)" % (value, info[0], field.value)
|
|
distance = info[0]
|
|
yield field
|
|
else:
|
|
field._description = "Distance Code %i, Values %i to %i (Huffman Code %i)" % (value, info[0], info[1], field.value)
|
|
yield field
|
|
extrafield = Bits(self, "distance_extra[%s" % field.name.split('[')[1], info[2])
|
|
distance = extrafield.value + info[0]
|
|
extrafield._description = "Distance Extra Bits (%i), total length %i"%(extrafield.value, distance)
|
|
yield extrafield
|
|
self.uncomp_data = extend_data(self.uncomp_data, length, distance)
|
|
|
|
class DeflateData(GenericFieldSet):
|
|
endian = LITTLE_ENDIAN
|
|
def createFields(self):
|
|
uncomp_data = ""
|
|
blk=DeflateBlock(self, "compressed_block[]", uncomp_data)
|
|
yield blk
|
|
uncomp_data = blk.uncomp_data
|
|
while not blk["final"].value:
|
|
blk=DeflateBlock(self, "compressed_block[]", uncomp_data)
|
|
yield blk
|
|
uncomp_data = blk.uncomp_data
|
|
padding = paddingSize(self.current_size + self.absolute_address, 8) # align on byte boundary
|
|
if padding:
|
|
yield PaddingBits(self, "padding[]", padding)
|
|
self.uncompressed_data = uncomp_data
|
|
|
|
class ZlibData(Parser):
|
|
PARSER_TAGS = {
|
|
"id": "zlib",
|
|
"category": "archive",
|
|
"file_ext": ("zlib",),
|
|
"min_size": 8*8,
|
|
"description": "ZLIB Data",
|
|
}
|
|
endian = LITTLE_ENDIAN
|
|
|
|
def validate(self):
|
|
if self["compression_method"].value != 8:
|
|
return "Incorrect compression method"
|
|
if ((self["compression_info"].value << 12) +
|
|
(self["compression_method"].value << 8) +
|
|
(self["flag_compression_level"].value << 6) +
|
|
(self["flag_dictionary_present"].value << 5) +
|
|
(self["flag_check_bits"].value)) % 31 != 0:
|
|
return "Invalid flag check value"
|
|
return True
|
|
|
|
def createFields(self):
|
|
yield Enum(Bits(self, "compression_method", 4), {8:"deflate", 15:"reserved"}) # CM
|
|
yield Bits(self, "compression_info", 4, "base-2 log of the window size") # CINFO
|
|
yield Bits(self, "flag_check_bits", 5) # FCHECK
|
|
yield Bit(self, "flag_dictionary_present") # FDICT
|
|
yield Enum(Bits(self, "flag_compression_level", 2), # FLEVEL
|
|
{0:"Fastest", 1:"Fast", 2:"Default", 3:"Maximum, Slowest"})
|
|
if self["flag_dictionary_present"].value:
|
|
yield textHandler(UInt32(self, "dict_checksum", "ADLER32 checksum of dictionary information"), hexadecimal)
|
|
yield DeflateData(self, "data", self.stream, description = "Compressed Data")
|
|
yield textHandler(UInt32(self, "data_checksum", "ADLER32 checksum of compressed data"), hexadecimal)
|
|
|
|
def zlib_inflate(stream, wbits=None, prevdata=""):
|
|
if wbits is None or wbits >= 0:
|
|
return ZlibData(stream)["data"].uncompressed_data
|
|
else:
|
|
data = DeflateData(None, "root", stream, "", stream.askSize(None))
|
|
for unused in data:
|
|
pass
|
|
return data.uncompressed_data
|