SickGear/lib/hachoir/stream/input.py

620 lines
20 KiB
Python
Raw Permalink Normal View History

from hachoir.core.endian import BIG_ENDIAN, LITTLE_ENDIAN, MIDDLE_ENDIAN
from hachoir.core.error import info
from hachoir.core.log import Logger
from hachoir.core.bits import str2long
from hachoir.core.tools import lowerBound
from hachoir.core.tools import alignValue
from errno import ESPIPE
from weakref import ref as weakref_ref
from hachoir.stream import StreamError
class InputStreamError(StreamError):
    """Base class of the errors raised by the input streams of this module."""
class ReadStreamError(InputStreamError):
    """Raised when a read could not return the requested number of bits.

    ``size`` is the number of bits requested and ``address`` the address
    of the read; ``got`` is the number of bits actually obtained, or None
    when not reported.
    """

    def __init__(self, size, address, got=None):
        self.size, self.address, self.got = size, address, got
        if got is None:
            msg = "Can't read %u bits at address %u" % (size, address)
        else:
            msg = "Can't read %u bits at address %u (got %u bits)" % (
                size, address, got)
        InputStreamError.__init__(self, msg)
class NullStreamError(InputStreamError):
    """Raised when an input stream has a size of zero."""

    def __init__(self, source):
        self.source = source
        InputStreamError.__init__(
            self, "Input size is nul (source='%s')!" % source)
class FileFromInputStream:
    """Minimal read-only file object (tell/seek/read) over an input stream.

    Positions are in bytes; the underlying stream is addressed in bits.
    The stream size may be unknown at creation time: the stream is asked
    to call ``_setSize`` back once it is known (see ``askSize``).
    """
    _offset = 0
    _from_end = False

    def __init__(self, stream):
        self.stream = stream
        self._setSize(stream.askSize(self))

    def _setSize(self, size):
        """Record the stream size, given in bits (None while unknown)."""
        if size is None:
            self._size = size
        elif size % 8:
            raise InputStreamError("Invalid size")
        else:
            self._size = size // 8

    def tell(self):
        if self._from_end:
            # Position is relative to the end: feed the stream until its
            # size is known, then convert to an absolute position.
            while self._size is None:
                self.stream._feed(max(self.stream._current_size << 1, 1 << 16))
            self._from_end = False
            self._offset += self._size
        return self._offset

    def seek(self, pos, whence=0):
        if whence == 0:
            self._from_end = False
            self._offset = pos
        elif whence == 1:
            self._offset += pos
        elif whence == 2:
            self._from_end = True
            self._offset = pos
        else:
            raise ValueError("seek() second argument must be 0, 1 or 2")

    def read(self, size=None):
        """Read up to *size* bytes (everything left when None)."""
        def read_at(address, size):
            # Byte address/size -> bit address/size for the stream.
            shift, data, missing = self.stream.read(8 * address, 8 * size)
            if shift:
                raise InputStreamError("TODO: handle non-byte-aligned data")
            return data

        if self._size or size is not None and not self._from_end:
            # We don't want self.tell() to read anything
            # and the size must be known if we read until the end.
            pos = self.tell()
            if size is None or (self._size is not None and self._size < pos + size):
                size = self._size - pos
            if size <= 0:
                return b''
            data = read_at(pos, size)
            self._offset += len(data)
            return data
        elif self._from_end:
            # Position relative to an end that is not known yet: read
            # forward to the end, keeping only the last two chunks.
            # TODO: not tested
            max_size = - self._offset
            if size is None or max_size < size:
                size = max_size
            if size <= 0:
                return b''
            data = b'', b''
            self._offset = max(
                0, self.stream._current_size // 8 + self._offset)
            self._from_end = False
            bs = max(max_size, 1 << 16)
            while True:
                d = read_at(self._offset, bs)
                data = data[1], d
                self._offset += len(d)
                if self._size:
                    bs = self._size - self._offset
                    if not bs:
                        window = data[0] + data[1]
                        # `window` ends at self._size. Clamp the start to the
                        # window (a seek before the beginning of the stream
                        # starts at 0) so a negative index cannot wrap around.
                        start = max(0, len(window) - max_size)
                        result = window[start:start + size]
                        # Leave the position just after the returned bytes.
                        self._offset = (self._size - len(window)
                                        + start + len(result))
                        return result
        else:
            # Size unknown: read whole chunks until the end is reached.
            # TODO: not tested
            data = []
            size = 1 << 16
            while True:
                d = read_at(self._offset, size)
                data.append(d)
                self._offset += len(d)
                if self._size:
                    size = self._size - self._offset
                    if not size:
                        return b''.join(data)
class InputStream(Logger):
    """Base class of the bit-addressed input streams of this module.

    Addresses and sizes are counted in bits. ``_size`` is the total size
    when known (None otherwise) and ``_current_size`` the number of bits
    known to be available so far. Subclasses provide ``read`` and ``close``.
    """
    _set_size = None
    _current_size = 0

    def __init__(self, source=None, size=None, packets=None, **args):
        self.source = source
        self._size = size   # in bits
        if size == 0:
            raise NullStreamError(source)
        self.tags = tuple(args.get("tags", tuple()))
        self.packets = packets

    def close(self):
        raise NotImplementedError

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        self.close()

    def askSize(self, client):
        """Return the size in bits (None while unknown).

        If the size is not final yet, a weak reference to *client* is kept
        and ``client._setSize(size)`` is called once the size is fixed.
        """
        if self._size != self._current_size:
            if self._set_size is None:
                self._set_size = []
            self._set_size.append(weakref_ref(client))
        return self._size

    def _setSize(self, size=None):
        """Fix the size to ``_current_size`` and notify askSize() clients.

        The *size* argument is not used.
        """
        assert self._size is None or self._current_size <= self._size
        if self._size != self._current_size:
            self._size = self._current_size
            if not self._size:
                raise NullStreamError(self.source)
            if self._set_size:
                for client in self._set_size:
                    client = client()
                    if client:
                        client._setSize(self._size)
                del self._set_size

    size = property(lambda self: self._size, doc="Size of the stream in bits")
    checked = property(lambda self: self._size == self._current_size)

    def sizeGe(self, size, const=False):
        """Tell whether the stream holds at least *size* bits.

        Unless *const* is true, more data may be fed (``_feed``) to decide.
        """
        if self._current_size >= size:
            return True
        elif self._size is not None and 0 < self._size < size:
            return False
        else:
            return not const and not self._feed(size)

    def _feed(self, size):
        """Try to make *size* bits available; return True if some are missing."""
        return self.read(size - 1, 1)[2]

    def read(self, address, size):
        """
        Read 'size' bits at position 'address' (in bits)
        from the beginning of the stream.

        Implementations in this module return a (shift, data, missing)
        tuple: ``data`` starts at byte ``address // 8``, the requested bits
        begin ``shift`` bits into it, and ``missing`` is true when fewer
        bits than requested were available.
        """
        raise NotImplementedError

    def readBits(self, address, nbits, endian):
        """Read *nbits* bits at *address* as an unsigned integer."""
        assert endian in (BIG_ENDIAN, LITTLE_ENDIAN, MIDDLE_ENDIAN)
        if endian is MIDDLE_ENDIAN:
            # read an aligned chunk of words
            wordaddr, remainder = divmod(address, 16)
            wordnbits = alignValue(remainder + nbits, 16)
            _, data, missing = self.read(wordaddr * 16, wordnbits)
            shift = remainder
        else:
            shift, data, missing = self.read(address, nbits)
        if missing:
            raise ReadStreamError(nbits, address)
        value = str2long(data, endian)
        if endian in (BIG_ENDIAN, MIDDLE_ENDIAN):
            value >>= len(data) * 8 - shift - nbits
        else:
            value >>= shift
        return value & (1 << nbits) - 1

    def readInteger(self, address, signed, nbits, endian):
        """ Read an integer number """
        value = self.readBits(address, nbits, endian)
        # Signed number. Example with nbits=8:
        # if 128 <= value: value -= 256
        if signed and (1 << (nbits - 1)) <= value:
            value -= (1 << nbits)
        return value

    def readBytes(self, address, nb_bytes):
        """Read *nb_bytes* bytes at bit *address* (which must be byte aligned)."""
        shift, data, missing = self.read(address, 8 * nb_bytes)
        if shift:
            raise InputStreamError("TODO: handle non-byte-aligned data")
        if missing:
            raise ReadStreamError(8 * nb_bytes, address)
        return data

    def searchBytesLength(self, needle, include_needle,
                          start_address=0, end_address=None):
        """
        Return the number of bytes from start_address to the needle.
        If include_needle is True, add its length to the result.
        Returns None if needle can't be found.
        """
        pos = self.searchBytes(needle, start_address, end_address)
        if pos is None:
            return None
        length = (pos - start_address) // 8
        if include_needle:
            length += len(needle)
        return length

    def searchBytes(self, needle, start_address=0, end_address=None):
        """
        Search some bytes in [start_address;end_address[. Addresses must
        be aligned to byte. Returns the address of the bytes if found,
        None else.
        """
        if start_address % 8:
            raise InputStreamError(
                "Unable to search bytes with address with bit granularity")
        length = len(needle)
        size = max(3 * length, 4096)
        buffer = b''
        if self._size and (end_address is None or self._size < end_address):
            end_address = self._size
        while True:
            if end_address is not None:
                todo = (end_address - start_address) >> 3
                if todo < size:
                    if todo <= 0:
                        return None
                    size = todo
            # Read directly rather than through readBytes(): when the size
            # is unknown, the read that reaches the end returns short data
            # with `missing` set, and that data must still be searched.
            shift, data, missing = self.read(start_address, 8 * size)
            if shift:
                raise InputStreamError("TODO: handle non-byte-aligned data")
            if end_address is None and self._size:
                # The size was discovered during this read: never search
                # past the end, but only ever shrink the chunk. Growing it
                # would advance start_address over bytes never read.
                end_address = self._size
                todo = (end_address - start_address) >> 3
                if todo <= 0:
                    return None
                if todo < size:
                    size = todo
                    data = data[:size]
                    missing = len(data) < size
            if missing:
                raise ReadStreamError(8 * size, start_address)
            start_address += 8 * size
            # Keep length-1 bytes of the previous chunk so matches that
            # straddle two chunks are found.
            buffer = buffer[len(buffer) - length + 1:] + data
            found = buffer.find(needle)
            if found >= 0:
                return start_address + (found - len(buffer)) * 8

    def file(self):
        """Return a byte-oriented file-like view of this stream."""
        return FileFromInputStream(self)
class InputPipe(object):
    """
    InputPipe makes input streams seekable by caching a certain
    amount of data. The memory usage may be unlimited in worst cases.
    A function (set_size) is called when the size of the stream is known.

    InputPipe sees the input stream as an array of blocks of
    size = (2 ** self.buffer_size) and self.buffers maps to this array.
    It also maintains a circular ordered list of non-discarded blocks,
    sorted by access time.

    Each element of self.buffers is an array of 3 elements:
     * self.buffers[i][0] is the data.
       len(self.buffers[i][0]) == 1 << self.buffer_size
       (except at the end: the length may be smaller)
     * self.buffers[i][1] is the index of a more recently used block
     * self.buffers[i][2] is the opposite of self.buffers[i][1],
       in order to have a double-linked list.
    For any discarded block, self.buffers[i] = None

    self.last is the index of the most recently accessed block.
    self.first is the first (= smallest index) non-discarded block.

    How InputPipe discards blocks:
     * Just before returning from the read method.
     * Only if there are more than self.buffer_nb_min blocks in memory.
     * While self.buffers[self.first] is the least recently used block.

    Property: There is no hole in self.buffers, except at the beginning.

    Once the end of the input has been reached, self.size holds the total
    number of bytes and the input is not read again.
    """
    buffer_nb_min = 256
    buffer_size = 16
    last = None
    size = None

    def __init__(self, input, set_size=None):
        self._input = input
        self.first = self.address = 0
        self.buffers = []
        self.set_size = set_size

    current_size = property(lambda self: len(self.buffers) << self.buffer_size)

    def _append(self, data):
        """Add a new block and make it the most recently used one."""
        if self.last is None:
            self.last = next = prev = 0
        else:
            prev = self.last
            last = self.buffers[prev]
            next = last[1]
            self.last = self.buffers[next][2] = last[1] = len(self.buffers)
        self.buffers.append([data, next, prev])

    def _get(self, index):
        """Return block *index* (b'' past the end) and mark it most recent."""
        if index >= len(self.buffers):
            return b''
        buf = self.buffers[index]
        if buf is None:
            raise InputStreamError(
                "Error: Buffers too small. Can't seek backward.")
        if self.last != index:
            # Unlink the block, then relink it right after self.last.
            next = buf[1]
            prev = buf[2]
            self.buffers[next][2] = prev
            self.buffers[prev][1] = next
            first = self.buffers[self.last][1]
            buf[1] = first
            buf[2] = self.last
            self.buffers[first][2] = index
            self.buffers[self.last][1] = index
            self.last = index
        return buf[0]

    def _flush(self):
        """Discard leading blocks while they are the least recently used."""
        lim = len(self.buffers) - self.buffer_nb_min
        while self.first < lim:
            buf = self.buffers[self.first]
            if buf[2] != self.last:
                break
            info("Discarding buffer %u." % self.first)
            self.buffers[self.last][1] = buf[1]
            self.buffers[buf[1]][2] = self.last
            self.buffers[self.first] = None
            self.first += 1

    def seek(self, address):
        assert 0 <= address
        self.address = address

    def read(self, size):
        """Read up to *size* bytes at the current address and advance it.

        Fewer bytes (possibly none) are returned past the end of the input.
        """
        end = self.address + size
        block_len = 1 << self.buffer_size
        if self.size is None:
            # Fetch whole blocks until `end` is covered or EOF is reached.
            # After EOF the input must not be read again: the size would be
            # recomputed from a block count that already includes the short
            # last block, giving a size that is too large.
            for _ in range(len(self.buffers), (end >> self.buffer_size) + 1):
                data = self._input.read(block_len)
                if len(data) < block_len:
                    self.size = (len(self.buffers) << self.buffer_size) + len(data)
                    if self.set_size:
                        self.set_size(self.size)
                    if data:
                        self._append(data)
                    break
                self._append(data)
        block, offset = divmod(self.address, block_len)
        data = b''.join(
            self._get(index)
            for index in range(block, ((end - 1) >> self.buffer_size) + 1)
        )[offset:offset + size]
        self._flush()
        self.address += len(data)
        return data
class InputIOStream(InputStream):
    """Input stream reading from a file-like object.

    Objects without ``seek``, or whose ``seek`` fails because they cannot
    seek, are wrapped in an InputPipe that caches the data read so far.
    """

    def __init__(self, input, size=None, **args):
        from io import UnsupportedOperation
        if not hasattr(input, "seek"):
            if size is None:
                input = InputPipe(input, self._setSize)
            else:
                input = InputPipe(input)
        elif size is None:
            try:
                input.seek(0, 2)
                size = input.tell() * 8
            except IOError as err:
                # Unseekable input: raw pipes report ESPIPE, while io objects
                # may raise io.UnsupportedOperation, whose errno is None.
                if err.errno == ESPIPE or isinstance(err, UnsupportedOperation):
                    input = InputPipe(input, self._setSize)
                else:
                    source = args.get("source", "<inputio:%r>" % input)
                    raise InputStreamError(
                        "Unable to get size of %s: %s" % (source, err))
        self._input = input
        InputStream.__init__(self, size=size, **args)

    def close(self):
        self._input.close()

    def __current_size(self):
        if self._size:
            return self._size
        if self._input.size:
            return 8 * self._input.size
        return 8 * self._input.current_size

    _current_size = property(__current_size)

    def read(self, address, size):
        if not size:
            return (0, b'', False)
        assert size > 0
        _size = self._size
        address, shift = divmod(address, 8)
        self._input.seek(address)
        size = (size + shift + 7) >> 3
        data = self._input.read(size)
        got = len(data)
        missing = size != got
        # Short reads are only acceptable when they revealed the size.
        if missing and _size == self._size:
            raise ReadStreamError(8 * size, 8 * address, 8 * got)
        return shift, data, missing

    def file(self):
        """Return an independent binary file on the same data."""
        try:
            fileno = self._input.fileno()
        except (AttributeError, OSError):
            # No OS-level descriptor: e.g. InputPipe has no fileno(), and
            # io.BytesIO.fileno() raises io.UnsupportedOperation (an OSError).
            return InputStream.file(self)
        from os import dup, fdopen
        new_fd = dup(fileno)
        new_file = fdopen(new_fd, "rb")
        new_file.seek(0)
        return new_file
class StringInputStream(InputStream):
    """Input stream over an in-memory bytes object."""

    def __init__(self, data, source="<string>", **args):
        self.data = data
        InputStream.__init__(self, source=source, size=8 * len(data), **args)
        # All of the content is available from the start.
        self._current_size = self._size

    def close(self):
        pass

    def read(self, address, size):
        byte_addr, shift = divmod(address, 8)
        nbytes = (size + shift + 7) // 8
        chunk = self.data[byte_addr:byte_addr + nbytes]
        if len(chunk) != nbytes:
            raise ReadStreamError(8 * nbytes, 8 * byte_addr, 8 * len(chunk))
        return shift, chunk, False
class InputSubStream(InputStream):
    """Window of a parent stream starting *offset* bits into it.

    Reads are forwarded to the parent with *offset* added to the address.
    *size* defaults to the rest of the parent when the parent's size is
    known, and may stay None otherwise.
    """

    def __init__(self, stream, offset, size=None, source=None, **args):
        if offset is None:
            offset = 0
        if size is None and stream.size is not None:
            size = stream.size - offset
        # size is still None when the parent's size is unknown.
        if size is not None and size <= 0:
            raise ValueError("InputSubStream: offset is outside input stream")
        self.stream = stream
        self._offset = offset
        if source is None:
            source = "<substream input=%s offset=%s size=%s>" % (
                stream.source, offset, size)
        InputStream.__init__(self, source=source, size=size, **args)
        self.stream.askSize(self)

    def _get_current_size(self):
        """Bits of the parent known to be available inside this window."""
        available = max(0, self.stream._current_size - self._offset)
        if self._size is None:
            return available
        return min(self._size, available)

    _current_size = property(_get_current_size)

    def close(self):
        self.stream = None

    def read(self, address, size):
        return self.stream.read(self._offset + address, size)
def InputFieldStream(field, **args):
    """Return an input stream covering the data of *field*.

    A field without parent already owns its stream, which is returned
    unchanged; otherwise an InputSubStream of the parent's stream is built
    at the field's absolute address with the field's size.
    """
    if field.parent:
        stream = field.parent.stream
        args["size"] = field.size
        args.setdefault("source", stream.source + field.path)
        return InputSubStream(stream, field.absolute_address, **args)
    return field.stream
class FragmentedStream(InputStream):
    """Input stream presenting a chain of fragment fields as one stream.

    ``self.fragments`` is a list of ``(a, fa, fs)`` tuples: ``a`` is where
    the fragment starts in this stream, ``fa`` its absolute address in the
    parent stream and ``fs`` its size. Further fragments are discovered
    lazily by following ``.next`` from the initial field.
    """

    def __init__(self, field, **args):
        self.stream = field.parent.stream
        data = field.getData()
        # The first fragment starts at address 0 of this stream.
        self.fragments = [(0, data.absolute_address, data.size)]
        self.next = field.next
        args.setdefault("source", "%s%s" % (self.stream.source, field.path))
        InputStream.__init__(self, **args)
        if not self.next:
            # Single fragment: the total size is known right away.
            self._current_size = data.size
            self._setSize()

    def close(self):
        self.stream = None

    def _feed(self, end):
        """Discover fragments until *end* is covered.

        Returns True when the stream turns out to end before *end*
        (data is missing), False otherwise.
        """
        if self._current_size < end:
            if self.checked:
                # The size is already final and smaller than *end*.
                raise ReadStreamError(end - self._size, self._size)
            a, fa, fs = self.fragments[-1]
            while self.stream.sizeGe(fa + min(fs, end - a)):
                # The parent holds the needed part of this fragment;
                # move to the start of the next one.
                a += fs
                f = self.next
                if a >= end:
                    self._current_size = end
                    if a == end and not f:
                        self._setSize()
                    return False
                if f:
                    self.next = f.next
                    f = f.getData()
                if not f:
                    # No more fragments: the stream ends at `a`.
                    self._current_size = a
                    self._setSize()
                    return True
                fa = f.absolute_address
                fs = f.size
                self.fragments += [(a, fa, fs)]
            # The parent stream ends inside the last fragment.
            self._current_size = a + max(0, self.stream.size - fa)
            self._setSize()
            return True
        return False

    def read(self, address, size):
        assert size > 0
        missing = self._feed(address + size)
        if missing:
            # Clip the request to the now-known end of the stream.
            size = self._size - address
            if size <= 0:
                return 0, b'', True
        d = []
        # NOTE(review): lowerBound presumably returns the index of the first
        # fragment starting after `address`, so fragments[i - 1] contains
        # it — confirm against hachoir.core.tools.lowerBound.
        i = lowerBound(self.fragments, lambda x: x[0] <= address)
        a, fa, fs = self.fragments[i - 1]
        # Rebase that fragment so that it starts exactly at `address`.
        a -= address
        fa -= a
        fs += a
        s = None  # shift returned by the first parent read
        while True:
            n = min(fs, size)
            u, v, w = self.stream.read(fa, n)
            assert not w
            if s is None:
                s = u
            else:
                # Only the first piece may start at a non-zero shift.
                assert not u
            d += [v]
            size -= n
            if not size:
                return s, b''.join(d), missing
            # Continue with the next fragment.
            a, fa, fs = self.fragments[i]
            i += 1
class ConcatStream(InputStream):
    """Input stream presenting two streams one after the other.

    Exactly two streams are supported and the size of the first one must
    be final (``checked``); the second may still be growing.
    """
    # TODO: concatene any number of any type of stream

    def __init__(self, streams, **args):
        # Fewer than two streams used to fail with IndexError.
        if len(streams) != 2 or not streams[0].checked:
            raise NotImplementedError
        self.__size0 = streams[0].size
        # askSize may register self for a later _setSize() callback.
        size1 = streams[1].askSize(self)
        if size1 is not None:
            args["size"] = self.__size0 + size1
        self.__streams = streams
        InputStream.__init__(self, **args)

    _current_size = property(
        lambda self: self.__size0 + self.__streams[1]._current_size)

    def close(self):
        self.__streams = None

    def read(self, address, size):
        _size = self._size
        s = self.__size0 - address
        shift, data, missing = None, b'', False
        if s > 0:
            # The request starts inside the first stream.
            s = min(size, s)
            shift, data, w = self.__streams[0].read(address, s)
            assert not w
            a, s = 0, size - s
        else:
            # The request starts inside the second stream.
            a, s = -s, size
        if s:
            u, v, missing = self.__streams[1].read(a, s)
            if missing and _size == self._size:
                # Report the address within the concatenated stream, not
                # within the second stream.
                raise ReadStreamError(s, self.__size0 + a)
            if shift is None:
                shift = u
            else:
                assert not u
            data += v
        return shift, data, missing