mirror of
https://github.com/SickGear/SickGear.git
synced 2025-01-18 15:53:42 +00:00
442 lines
16 KiB
Python
442 lines
16 KiB
Python
from hachoir.parser import Parser
|
|
from hachoir.field import (FieldSet, StaticFieldSet,
|
|
RawBytes, PaddingBytes, createPaddingField, Link, Fragment,
|
|
Bit, Bits, UInt8, UInt16, UInt32,
|
|
String, Bytes, NullBytes)
|
|
from hachoir.field.integer import GenericInteger
|
|
from hachoir.core.endian import LITTLE_ENDIAN
|
|
from hachoir.core.text_handler import textHandler, hexadecimal
|
|
from hachoir.core.error import error
|
|
from hachoir.core.tools import humanFilesize, makePrintable
|
|
import datetime
|
|
import re
|
|
|
|
strip_index = re.compile(r'\[[^]]+]$')
|
|
|
|
|
|
class Boot(FieldSet):
|
|
static_size = 512 * 8
|
|
|
|
def createFields(self):
|
|
yield Bytes(self, "jmp", 3, "Jump instruction (to skip over header on boot)")
|
|
yield String(self, "oem_name", 8, "OEM Name (padded with spaces)", charset="ASCII")
|
|
yield UInt16(self, "sector_size", "Bytes per sector")
|
|
yield UInt8(self, "cluster_size", "Sectors per cluster")
|
|
yield UInt16(self, "reserved_sectors", "Reserved sector count (including boot sector)")
|
|
yield UInt8(self, "fat_nb", "Number of file allocation tables")
|
|
yield UInt16(self, "max_root", "Maximum number of root directory entries")
|
|
yield UInt16(self, "sectors1", "Total sectors (if zero, use 'sectors2')")
|
|
yield UInt8(self, "media_desc", "Media descriptor")
|
|
yield UInt16(self, "fat_size", "Sectors per FAT")
|
|
yield UInt16(self, "track_size", "Sectors per track")
|
|
yield UInt16(self, "head_nb", "Number of heads")
|
|
yield UInt32(self, "hidden", "Hidden sectors")
|
|
yield UInt32(self, "sectors2", "Total sectors (if greater than 65535)")
|
|
if self.parent.version == 32:
|
|
yield UInt32(self, "fat32_size", "Sectors per FAT")
|
|
yield UInt16(self, "fat_flags", "FAT Flags")
|
|
yield UInt16(self, "version", "Version")
|
|
yield UInt32(self, "root_start", "Cluster number of root directory start")
|
|
yield UInt16(self, "inf_sector", "Sector number of FS Information Sector")
|
|
yield UInt16(self, "boot_copy", "Sector number of a copy of this boot sector")
|
|
yield NullBytes(self, "reserved[]", 12, "Reserved")
|
|
yield UInt8(self, "phys_drv", "Physical drive number")
|
|
yield NullBytes(self, "reserved[]", 1, 'Reserved ("current head")')
|
|
yield UInt8(self, "sign", "Signature")
|
|
yield textHandler(UInt32(self, "serial", "ID (serial number)"), hexadecimal)
|
|
yield String(self, "label", 11, "Volume Label", strip=' ', charset="ASCII")
|
|
yield String(self, "fs_type", 8, "FAT file system type", strip=' ', charset="ASCII")
|
|
yield Bytes(self, "code", 510 - self.current_size // 8, "Operating system boot code")
|
|
yield Bytes(self, "trail_sig", 2, "Signature (0x55 0xAA)")
|
|
|
|
|
|
class FSInfo(StaticFieldSet):
|
|
format = (
|
|
(String, "lead_sig", 4, 'Signature ("RRaA")'),
|
|
(NullBytes, "reserved[]", 480),
|
|
(String, "struct_sig", 4, 'Signature ("rrAa")'),
|
|
(UInt32, "free_count", "Last known free cluster count on the volume"),
|
|
(UInt32, "nxt_free",),
|
|
(NullBytes, "reserved[]", 12),
|
|
(Bytes, "trail_sig", 4, "Signature (0x00 0x00 0x55 0xAA)")
|
|
)
|
|
|
|
|
|
class FAT(FieldSet):
|
|
|
|
def createFields(self):
|
|
version = self.parent.version
|
|
max_entry = 1 << min(28, version)
|
|
|
|
def FatEntry(chunk):
|
|
i = chunk.value
|
|
j = (1 - i) % max_entry
|
|
if j == 0:
|
|
return "reserved cluster"
|
|
elif j == 1:
|
|
return "free cluster"
|
|
elif j < 10:
|
|
return "end of a chain"
|
|
elif j == 10:
|
|
return "bad cluster"
|
|
elif j < 18:
|
|
return "reserved value"
|
|
else:
|
|
return str(i)
|
|
|
|
while self.current_size < self._size:
|
|
yield textHandler(GenericInteger(self, 'entry[]', False, version), FatEntry)
|
|
|
|
|
|
class Date(FieldSet):
|
|
|
|
def __init__(self, parent, name):
|
|
FieldSet.__init__(self, parent, name, size={
|
|
"create": 5,
|
|
"access": 2,
|
|
"modify": 4,
|
|
}[name] * 8)
|
|
|
|
def createFields(self):
|
|
size = self.size // 8
|
|
if size > 2:
|
|
if size > 4:
|
|
yield UInt8(self, "cs", "10ms units, values from 0 to 199")
|
|
yield Bits(self, "2sec", 5, "seconds/2")
|
|
yield Bits(self, "min", 6, "minutes")
|
|
yield Bits(self, "hour", 5, "hours")
|
|
yield Bits(self, "day", 5, "(1-31)")
|
|
yield Bits(self, "month", 4, "(1-12)")
|
|
yield Bits(self, "year", 7, "(0 = 1980, 127 = 2107)")
|
|
|
|
def createDescription(self):
|
|
date = [self["year"].value, self["month"].value, self["day"].value]
|
|
size = self.size // 8
|
|
if size > 2:
|
|
mkdate = datetime.datetime
|
|
cs = 200 * self["2sec"].value
|
|
if size > 4:
|
|
cs += self["cs"].value
|
|
date += [self["hour"].value, self["min"].value,
|
|
cs // 100, cs % 100 * 10000]
|
|
else:
|
|
mkdate = datetime.date
|
|
if date == [0 for i in date]:
|
|
date = None
|
|
else:
|
|
date[0] += 1980
|
|
try:
|
|
date = mkdate(*tuple(date))
|
|
except ValueError:
|
|
return "invalid"
|
|
return str(date)
|
|
|
|
|
|
class InodeLink(Link):
|
|
|
|
def __init__(self, parent, name, target=None):
|
|
Link.__init__(self, parent, name)
|
|
self.target = target
|
|
self.first = None
|
|
|
|
def _getTargetPath(self):
|
|
if not self.target:
|
|
parent = self.parent
|
|
self.target = strip_index.sub(
|
|
r"\\", parent.parent._name) + parent.getFilename().rstrip("/")
|
|
return self.target
|
|
|
|
def createValue(self):
|
|
field = InodeGen(self["/"], self.parent, self._getTargetPath())(self)
|
|
if field:
|
|
self._display = field.path
|
|
return Link.createValue(self)
|
|
|
|
def createDisplay(self):
|
|
return "/%s[0]" % self._getTargetPath()
|
|
|
|
|
|
class FileEntry(FieldSet):
|
|
static_size = 32 * 8
|
|
process = False
|
|
LFN = False
|
|
|
|
def __init__(self, *args):
|
|
FieldSet.__init__(self, *args)
|
|
self.status = self.stream.readBits(
|
|
self.absolute_address, 8, LITTLE_ENDIAN)
|
|
if self.status in (0, 0xE5):
|
|
return
|
|
|
|
magic = self.stream.readBits(
|
|
self.absolute_address + 11 * 8, 8, LITTLE_ENDIAN)
|
|
if magic & 0x3F == 0x0F:
|
|
self.LFN = True
|
|
elif self.getFilename() not in (".", ".."):
|
|
self.process = True
|
|
|
|
def getFilename(self):
|
|
name = self["name"].value
|
|
if isinstance(name, str):
|
|
name = makePrintable(name, "ASCII")
|
|
ext = self["ext"].value
|
|
if ext:
|
|
name += "." + ext
|
|
if name[0] == 5:
|
|
name = "\xE5" + name[1:]
|
|
if not self.LFN and self["directory"].value:
|
|
name += "/"
|
|
return name
|
|
|
|
def createDescription(self):
|
|
if self.status == 0:
|
|
return "Free entry"
|
|
elif self.status == 0xE5:
|
|
return "Deleted file"
|
|
elif self.LFN:
|
|
name = "".join(field.value for field in self.array("name"))
|
|
try:
|
|
name = name[:name.index('\0')]
|
|
except ValueError:
|
|
pass
|
|
seq_no = self["seq_no"].value
|
|
return "Long filename part: '%s' [%u]" % (name, seq_no)
|
|
else:
|
|
return "File: '%s'" % self.getFilename()
|
|
|
|
def getCluster(self):
|
|
cluster = self["cluster_lo"].value
|
|
if self.parent.parent.version > 16:
|
|
cluster += self["cluster_hi"].value << 16
|
|
return cluster
|
|
|
|
def createFields(self):
|
|
if not self.LFN:
|
|
yield String(self, "name", 8, "DOS file name (padded with spaces)",
|
|
strip=' ', charset="ASCII")
|
|
yield String(self, "ext", 3, "DOS file extension (padded with spaces)",
|
|
strip=' ', charset="ASCII")
|
|
yield Bit(self, "read_only")
|
|
yield Bit(self, "hidden")
|
|
yield Bit(self, "system")
|
|
yield Bit(self, "volume_label")
|
|
yield Bit(self, "directory")
|
|
yield Bit(self, "archive")
|
|
yield Bit(self, "device")
|
|
yield Bit(self, "unused")
|
|
yield RawBytes(self, "reserved", 1, "Something about the case")
|
|
yield Date(self, "create")
|
|
yield Date(self, "access")
|
|
if self.parent.parent.version > 16:
|
|
yield UInt16(self, "cluster_hi")
|
|
else:
|
|
yield UInt16(self, "ea_index")
|
|
yield Date(self, "modify")
|
|
yield UInt16(self, "cluster_lo")
|
|
size = UInt32(self, "size")
|
|
yield size
|
|
if self.process:
|
|
del self.process
|
|
target_size = size.value
|
|
if self["directory"].value:
|
|
if target_size:
|
|
size.error("(FAT) value must be zero")
|
|
target_size = 0
|
|
elif not target_size:
|
|
return
|
|
self.target_size = 8 * target_size
|
|
yield InodeLink(self, "data")
|
|
else:
|
|
yield UInt8(self, "seq_no", "Sequence Number")
|
|
yield String(self, "name[]", 10, "(5 UTF-16 characters)",
|
|
charset="UTF-16-LE")
|
|
yield UInt8(self, "magic", "Magic number (15)")
|
|
yield NullBytes(self, "reserved", 1, "(always 0)")
|
|
yield UInt8(self, "checksum", "Checksum of DOS file name")
|
|
yield String(self, "name[]", 12, "(6 UTF-16 characters)",
|
|
charset="UTF-16-LE")
|
|
yield UInt16(self, "first_cluster", "(always 0)")
|
|
yield String(self, "name[]", 4, "(2 UTF-16 characters)",
|
|
charset="UTF-16-LE")
|
|
|
|
|
|
class Directory(Fragment):
|
|
|
|
def createFields(self):
|
|
while self.current_size < self._size:
|
|
yield FileEntry(self, "entry[]")
|
|
|
|
|
|
class File(Fragment):
|
|
|
|
def _getData(self):
|
|
return self["data"]
|
|
|
|
def createFields(self):
|
|
yield Bytes(self, "data", self.datasize // 8)
|
|
padding = self._size - self.current_size
|
|
if padding:
|
|
yield createPaddingField(self, padding)
|
|
|
|
|
|
class InodeGen:
|
|
|
|
def __init__(self, root, entry, path):
|
|
self.root = root
|
|
self.cluster = root.clusters(entry.getCluster)
|
|
self.path = path
|
|
self.filesize = entry.target_size
|
|
self.done = 0
|
|
|
|
def createInputStream(cis, **args):
|
|
args["size"] = self.filesize
|
|
args.setdefault("tags", []).append(
|
|
("filename", entry.getFilename()))
|
|
return cis(**args)
|
|
self.createInputStream = createInputStream
|
|
|
|
def __call__(self, prev):
|
|
name = self.path + "[]"
|
|
address, size, last = next(self.cluster)
|
|
if self.filesize:
|
|
if self.done >= self.filesize:
|
|
error("(FAT) bad metadata for " + self.path)
|
|
return
|
|
field = File(self.root, name, size=size)
|
|
if prev.first is None:
|
|
field._description = 'File size: %s' % humanFilesize(
|
|
self.filesize // 8)
|
|
field.setSubIStream(self.createInputStream)
|
|
field.datasize = min(self.filesize - self.done, size)
|
|
self.done += field.datasize
|
|
else:
|
|
field = Directory(self.root, name, size=size)
|
|
padding = self.root.getFieldByAddress(address, feed=False)
|
|
if not isinstance(padding, (PaddingBytes, RawBytes)):
|
|
error("(FAT) address %u doesn't point to a padding field" % address)
|
|
return
|
|
if last:
|
|
link_next = None
|
|
else:
|
|
def link_next():
|
|
return self(field)
|
|
field.setLinks(prev.first, link_next)
|
|
self.root.writeFieldsIn(padding, address, (field,))
|
|
return field
|
|
|
|
|
|
class FAT_FS(Parser):
|
|
endian = LITTLE_ENDIAN
|
|
PARSER_TAGS = {
|
|
"category": "file_system",
|
|
"min_size": 512 * 8,
|
|
"file_ext": ("",),
|
|
}
|
|
|
|
def _validate(self, type_offset):
|
|
if self.stream.readBytes(type_offset * 8, 8) != ("FAT%-5u" % self.version).encode('ascii'):
|
|
return "Invalid FAT%u signature" % self.version
|
|
if self.stream.readBytes(510 * 8, 2) != b"\x55\xAA":
|
|
return "Invalid BIOS signature"
|
|
return True
|
|
|
|
def clusters(self, cluster_func):
|
|
max_entry = (1 << min(28, self.version)) - 16
|
|
cluster = cluster_func()
|
|
if 1 < cluster < max_entry:
|
|
clus_nb = 1
|
|
next = cluster
|
|
while True:
|
|
next = self.fat[next / 1000][next % 1000].value
|
|
if not 1 < next < max_entry:
|
|
break
|
|
if cluster + clus_nb == next:
|
|
clus_nb += 1
|
|
else:
|
|
yield self.data_start + cluster * self.cluster_size, clus_nb * self.cluster_size, False
|
|
cluster = next
|
|
clus_nb = 1
|
|
yield self.data_start + cluster * self.cluster_size, clus_nb * self.cluster_size, True
|
|
|
|
def createFields(self):
|
|
# Read boot seector
|
|
boot = Boot(self, "boot", "Boot sector")
|
|
yield boot
|
|
self.sector_size = boot["sector_size"].value
|
|
|
|
if self.version == 32:
|
|
for field in sorted((
|
|
(boot["inf_sector"].value, lambda: FSInfo(self, "fsinfo")),
|
|
(boot["boot_copy"].value, lambda: Boot(
|
|
self, "bkboot", "Copy of the boot sector")),
|
|
)):
|
|
if field[0]:
|
|
padding = self.seekByte(field[0] * self.sector_size)
|
|
if padding:
|
|
yield padding
|
|
yield field[1]()
|
|
padding = self.seekByte(
|
|
boot["reserved_sectors"].value * self.sector_size)
|
|
if padding:
|
|
yield padding
|
|
|
|
# Read the two FAT
|
|
fat_size = boot["fat_size"].value
|
|
if fat_size == 0:
|
|
fat_size = boot["fat32_size"].value
|
|
fat_size *= self.sector_size * 8
|
|
for i in range(boot["fat_nb"].value):
|
|
yield FAT(self, "fat[]", "File Allocation Table", size=fat_size)
|
|
|
|
# Read inode table (Directory)
|
|
self.cluster_size = boot["cluster_size"].value * self.sector_size * 8
|
|
self.fat = self["fat[0]"]
|
|
if "root_start" in boot and boot["root_start"].value != 2:
|
|
self.target_size = 0
|
|
self.getCluster = lambda: boot["root_start"].value
|
|
yield InodeLink(self, "root", "root")
|
|
else:
|
|
yield Directory(self, "root[]", size=max(boot["max_root"].value * 32 * 8, boot["cluster_size"].value * self.sector_size * 8))
|
|
self.data_start = self.current_size - 2 * self.cluster_size
|
|
sectors = boot["sectors1"].value
|
|
if not sectors:
|
|
sectors = boot["sectors2"].value
|
|
|
|
for i in range(sectors // boot['cluster_size'].value):
|
|
yield RawBytes(self, "sector[%d]" % (i + 2), self.cluster_size // 8)
|
|
|
|
|
|
class FAT12(FAT_FS):
|
|
PARSER_TAGS = {
|
|
"id": "fat12",
|
|
"description": "FAT12 filesystem",
|
|
"magic": ((b"FAT12 ", 54 * 8),),
|
|
}
|
|
version = 12
|
|
|
|
def validate(self):
|
|
return FAT_FS._validate(self, 54)
|
|
|
|
|
|
class FAT16(FAT_FS):
|
|
PARSER_TAGS = {
|
|
"id": "fat16",
|
|
"description": "FAT16 filesystem",
|
|
"magic": ((b"FAT16 ", 54 * 8),),
|
|
}
|
|
version = 16
|
|
|
|
def validate(self):
|
|
return FAT_FS._validate(self, 54)
|
|
|
|
|
|
class FAT32(FAT_FS):
|
|
PARSER_TAGS = {
|
|
"id": "fat32",
|
|
"description": "FAT32 filesystem",
|
|
"magic": ((b"FAT32 ", 82 * 8),),
|
|
}
|
|
version = 32
|
|
|
|
def validate(self):
|
|
return FAT_FS._validate(self, 82)
|