SickGear/lib/hachoir_parser/archive/bzip2_parser.py
2015-08-20 16:48:28 +01:00

223 lines
8.9 KiB
Python

"""
BZIP2 archive file
Author: Victor Stinner, Robert Xiao
"""
from hachoir_parser import Parser
from hachoir_core.tools import paddingSize
from hachoir_core.field import (Field, FieldSet, GenericVector,
ParserError, String,
PaddingBits, Bit, Bits, Character,
UInt32, Enum, CompressedField)
from hachoir_core.endian import BIG_ENDIAN
from hachoir_core.text_handler import textHandler, hexadecimal
from hachoir_parser.archive.zlib import build_tree, HuffmanCode
try:
from bz2 import BZ2Decompressor
class Bunzip2:
def __init__(self, stream):
self.bzip2 = BZ2Decompressor()
def __call__(self, size, data=''):
try:
return self.bzip2.decompress(data)
except EOFError:
return ''
has_deflate = True
except ImportError:
has_deflate = False
class ZeroTerminatedNumber(Field):
"""Zero (bit) terminated number: e.g. 11110 is 4."""
def __init__(self, parent, name, description=None):
Field.__init__(self, parent, name, 0, description)
endian = self.parent.endian
stream = self.parent.stream
addr = self.absolute_address
value = 0
while True:
bit = stream.readBits(addr, 1, endian)
addr += 1
self._size += 1
if not bit:
break
value += 1
self._value = value
def createValue(self):
return self._value
def move_to_front(l, c):
l[:] = l[c:c+1] + l[0:c] + l[c+1:]
class Bzip2Bitmap(FieldSet):
def __init__(self, parent, name, nb_items, start_index, *args, **kwargs):
FieldSet.__init__(self, parent, name, *args, **kwargs)
self.nb_items = nb_items
self.start_index = start_index
def createFields(self):
for i in xrange(self.start_index, self.start_index+self.nb_items):
yield Bit(self, "symbol_used[%i]"%i, "Is the symbol %i (%r) used?"%(i, chr(i)))
class Bzip2Lengths(FieldSet):
def __init__(self, parent, name, symbols, *args, **kwargs):
FieldSet.__init__(self, parent, name, *args, **kwargs)
self.symbols = symbols
def createFields(self):
yield Bits(self, "start_length", 5)
length = self["start_length"].value
lengths = []
for i in xrange(self.symbols):
while True:
bit = Bit(self, "change_length[%i][]"%i, "Should the length be changed for symbol %i?"%i)
yield bit
if not bit.value:
break
else:
bit = Enum(Bit(self, "length_decrement[%i][]"%i, "Decrement the value?"), {True: "Decrement", False: "Increment"})
yield bit
if bit.value:
length -= 1
else:
length += 1
lengths.append(length)
self.final_length = length
self.tree = build_tree(lengths)
class Bzip2Selectors(FieldSet):
def __init__(self, parent, name, ngroups, *args, **kwargs):
FieldSet.__init__(self, parent, name, *args, **kwargs)
self.groups = range(ngroups)
def createFields(self):
for i in xrange(self["../selectors_used"].value):
field = ZeroTerminatedNumber(self, "selector_list[]")
move_to_front(self.groups, field.value)
field.realvalue = self.groups[0]
field._description = "MTF'ed selector index: raw value %i, real value %i"%(field.value, field.realvalue)
yield field
class Bzip2Block(FieldSet):
def createFields(self):
yield textHandler(Bits(self, "blockheader", 48, "Block header"), hexadecimal)
if self["blockheader"].value != 0x314159265359: # pi
raise ParserError("Invalid block header!")
yield textHandler(UInt32(self, "crc32", "CRC32 for this block"), hexadecimal)
yield Bit(self, "randomized", "Is this block randomized?")
yield Bits(self, "orig_bwt_pointer", 24, "Starting pointer into BWT after untransform")
yield GenericVector(self, "huffman_used_map", 16, Bit, 'block_used', "Bitmap showing which blocks (representing 16 literals each) are in use")
symbols_used = []
for index, block_used in enumerate(self["huffman_used_map"].array('block_used')):
if block_used.value:
start_index = index*16
field = Bzip2Bitmap(self, "huffman_used_bitmap[%i]"%index, 16, start_index, "Bitmap for block %i (literals %i to %i) showing which symbols are in use"%(index, start_index, start_index + 15))
yield field
for i, used in enumerate(field):
if used.value:
symbols_used.append(start_index + i)
yield Bits(self, "huffman_groups", 3, "Number of different Huffman tables in use")
yield Bits(self, "selectors_used", 15, "Number of times the Huffman tables are switched")
yield Bzip2Selectors(self, "selectors_list", self["huffman_groups"].value)
trees = []
for group in xrange(self["huffman_groups"].value):
field = Bzip2Lengths(self, "huffman_lengths[]", len(symbols_used)+2)
yield field
trees.append(field.tree)
counter = 0
rle_run = 0
selector_tree = None
while True:
if counter%50 == 0:
select_id = self["selectors_list"].array("selector_list")[counter//50].realvalue
selector_tree = trees[select_id]
field = HuffmanCode(self, "huffman_code[]", selector_tree)
if field.realvalue in [0, 1]:
# RLE codes
if rle_run == 0:
rle_power = 1
rle_run += (field.realvalue + 1) * rle_power
rle_power <<= 1
field._description = "RLE Run Code %i (for %r); Total accumulated run %i (Huffman Code %i)" % (field.realvalue, chr(symbols_used[0]), rle_run, field.value)
elif field.realvalue == len(symbols_used)+1:
field._description = "Block Terminator (%i) (Huffman Code %i)"%(field.realvalue, field.value)
yield field
break
else:
rle_run = 0
move_to_front(symbols_used, field.realvalue-1)
field._description = "Literal %r (value %i) (Huffman Code %i)"%(chr(symbols_used[0]), field.realvalue, field.value)
yield field
if field.realvalue == len(symbols_used)+1:
break
counter += 1
class Bzip2Stream(FieldSet):
START_BLOCK = 0x314159265359 # pi
END_STREAM = 0x177245385090 # sqrt(pi)
def createFields(self):
end = False
while not end:
marker = self.stream.readBits(self.absolute_address + self.current_size, 48, self.endian)
if marker == self.START_BLOCK:
yield Bzip2Block(self, "block[]")
elif marker == self.END_STREAM:
yield textHandler(Bits(self, "stream_end", 48, "End-of-stream marker"), hexadecimal)
yield textHandler(UInt32(self, "crc32", "CRC32 for entire stream"), hexadecimal)
padding = paddingSize(self.current_size, 8)
if padding:
yield PaddingBits(self, "padding[]", padding)
end = True
else:
raise ParserError("Invalid marker 0x%02X!"%marker)
class Bzip2Parser(Parser):
PARSER_TAGS = {
"id": "bzip2",
"category": "archive",
"file_ext": ("bz2",),
"mime": (u"application/x-bzip2",),
"min_size": 10*8,
"magic": (('BZh', 0),),
"description": "bzip2 archive"
}
endian = BIG_ENDIAN
def validate(self):
if self.stream.readBytes(0, 3) != 'BZh':
return "Wrong file signature"
if not("1" <= self["blocksize"].value <= "9"):
return "Wrong blocksize"
return True
def createFields(self):
yield String(self, "id", 3, "Identifier (BZh)", charset="ASCII")
yield Character(self, "blocksize", "Block size (KB of memory needed to uncompress)")
if self._size is None: # TODO: is it possible to handle piped input?
raise NotImplementedError
size = (self._size - self.current_size)/8
if size:
for tag, filename in self.stream.tags:
if tag == "filename" and filename.endswith(".bz2"):
filename = filename[:-4]
break
else:
filename = None
data = Bzip2Stream(self, "file", size=size*8)
if has_deflate:
CompressedField(self, Bunzip2)
def createInputStream(**args):
if filename:
args.setdefault("tags",[]).append(("filename", filename))
return self._createInputStream(**args)
data._createInputStream = createInputStream
yield data