mirror of
https://github.com/SickGear/SickGear.git
synced 2025-01-22 09:33:37 +00:00
356 lines
13 KiB
Python
356 lines
13 KiB
Python
"""
|
|
Advanced Streaming Format (ASF) parser, format used by Windows Media Video
|
|
(WMF) and Windows Media Audio (WMA).
|
|
|
|
Informations:
|
|
- http://www.microsoft.com/windows/windowsmedia/forpros/format/asfspec.aspx
|
|
- http://swpat.ffii.org/pikta/xrani/asf/index.fr.html
|
|
|
|
Author: Victor Stinner
|
|
Creation: 5 august 2006
|
|
"""
|
|
|
|
from hachoir_parser import Parser
|
|
from hachoir_core.field import (FieldSet, ParserError,
|
|
UInt16, UInt32, UInt64,
|
|
TimestampWin64, TimedeltaWin64,
|
|
String, PascalString16, Enum,
|
|
Bit, Bits, PaddingBits,
|
|
PaddingBytes, NullBytes, RawBytes)
|
|
from hachoir_core.endian import LITTLE_ENDIAN
|
|
from hachoir_core.text_handler import (
|
|
displayHandler, filesizeHandler)
|
|
from hachoir_core.tools import humanBitRate
|
|
from itertools import izip
|
|
from hachoir_parser.video.fourcc import audio_codec_name, video_fourcc_name
|
|
from hachoir_parser.common.win32 import BitmapInfoHeader, GUID
|
|
|
|
MAX_HEADER_SIZE = 100 * 1024 # bytes
|
|
|
|
class AudioHeader(FieldSet):
|
|
guid = "F8699E40-5B4D-11CF-A8FD-00805F5C442B"
|
|
def createFields(self):
|
|
yield Enum(UInt16(self, "twocc"), audio_codec_name)
|
|
yield UInt16(self, "channels")
|
|
yield UInt32(self, "sample_rate")
|
|
yield UInt32(self, "bit_rate")
|
|
yield UInt16(self, "block_align")
|
|
yield UInt16(self, "bits_per_sample")
|
|
yield UInt16(self, "codec_specific_size")
|
|
size = self["codec_specific_size"].value
|
|
if size:
|
|
yield RawBytes(self, "codec_specific", size)
|
|
|
|
class BitrateMutualExclusion(FieldSet):
|
|
guid = "D6E229DC-35DA-11D1-9034-00A0C90349BE"
|
|
mutex_name = {
|
|
"D6E22A00-35DA-11D1-9034-00A0C90349BE": "Language",
|
|
"D6E22A01-35DA-11D1-9034-00A0C90349BE": "Bitrate",
|
|
"D6E22A02-35DA-11D1-9034-00A0C90349BE": "Unknown",
|
|
}
|
|
|
|
def createFields(self):
|
|
yield Enum(GUID(self, "exclusion_type"), self.mutex_name)
|
|
yield UInt16(self, "nb_stream")
|
|
for index in xrange(self["nb_stream"].value):
|
|
yield UInt16(self, "stream[]")
|
|
|
|
class VideoHeader(FieldSet):
|
|
guid = "BC19EFC0-5B4D-11CF-A8FD-00805F5C442B"
|
|
def createFields(self):
|
|
if False:
|
|
yield UInt32(self, "width0")
|
|
yield UInt32(self, "height0")
|
|
yield PaddingBytes(self, "reserved[]", 7)
|
|
yield UInt32(self, "width")
|
|
yield UInt32(self, "height")
|
|
yield PaddingBytes(self, "reserved[]", 2)
|
|
yield UInt16(self, "depth")
|
|
yield Enum(String(self, "codec", 4, charset="ASCII"), video_fourcc_name)
|
|
yield NullBytes(self, "padding", 20)
|
|
else:
|
|
yield UInt32(self, "width")
|
|
yield UInt32(self, "height")
|
|
yield PaddingBytes(self, "reserved[]", 1)
|
|
yield UInt16(self, "format_data_size")
|
|
if self["format_data_size"].value < 40:
|
|
raise ParserError("Unknown format data size")
|
|
yield BitmapInfoHeader(self, "bmp_info", use_fourcc=True)
|
|
|
|
class FileProperty(FieldSet):
|
|
guid = "8CABDCA1-A947-11CF-8EE4-00C00C205365"
|
|
def createFields(self):
|
|
yield GUID(self, "guid")
|
|
yield filesizeHandler(UInt64(self, "file_size"))
|
|
yield TimestampWin64(self, "creation_date")
|
|
yield UInt64(self, "pckt_count")
|
|
yield TimedeltaWin64(self, "play_duration")
|
|
yield TimedeltaWin64(self, "send_duration")
|
|
yield UInt64(self, "preroll")
|
|
yield Bit(self, "broadcast", "Is broadcast?")
|
|
yield Bit(self, "seekable", "Seekable stream?")
|
|
yield PaddingBits(self, "reserved[]", 30)
|
|
yield filesizeHandler(UInt32(self, "min_pckt_size"))
|
|
yield filesizeHandler(UInt32(self, "max_pckt_size"))
|
|
yield displayHandler(UInt32(self, "max_bitrate"), humanBitRate)
|
|
|
|
class HeaderExtension(FieldSet):
|
|
guid = "5FBF03B5-A92E-11CF-8EE3-00C00C205365"
|
|
def createFields(self):
|
|
yield GUID(self, "reserved[]")
|
|
yield UInt16(self, "reserved[]")
|
|
yield UInt32(self, "size")
|
|
if self["size"].value:
|
|
yield RawBytes(self, "data", self["size"].value)
|
|
|
|
class Header(FieldSet):
|
|
guid = "75B22630-668E-11CF-A6D9-00AA0062CE6C"
|
|
def createFields(self):
|
|
yield UInt32(self, "obj_count")
|
|
yield PaddingBytes(self, "reserved[]", 2)
|
|
for index in xrange(self["obj_count"].value):
|
|
yield Object(self, "object[]")
|
|
|
|
class Metadata(FieldSet):
|
|
guid = "75B22633-668E-11CF-A6D9-00AA0062CE6C"
|
|
names = ("title", "author", "copyright", "xxx", "yyy")
|
|
def createFields(self):
|
|
for index in xrange(5):
|
|
yield UInt16(self, "size[]")
|
|
for name, size in izip(self.names, self.array("size")):
|
|
if size.value:
|
|
yield String(self, name, size.value, charset="UTF-16-LE", strip=" \0")
|
|
|
|
class Descriptor(FieldSet):
|
|
"""
|
|
See ExtendedContentDescription class.
|
|
"""
|
|
TYPE_BYTE_ARRAY = 1
|
|
TYPE_NAME = {
|
|
0: "Unicode",
|
|
1: "Byte array",
|
|
2: "BOOL (32 bits)",
|
|
3: "DWORD (32 bits)",
|
|
4: "QWORD (64 bits)",
|
|
5: "WORD (16 bits)"
|
|
}
|
|
def createFields(self):
|
|
yield PascalString16(self, "name", "Name", charset="UTF-16-LE", strip="\0")
|
|
yield Enum(UInt16(self, "type"), self.TYPE_NAME)
|
|
yield UInt16(self, "value_length")
|
|
type = self["type"].value
|
|
size = self["value_length"].value
|
|
name = "value"
|
|
if type == 0 and (size % 2) == 0:
|
|
yield String(self, name, size, charset="UTF-16-LE", strip="\0")
|
|
elif type in (2, 3):
|
|
yield UInt32(self, name)
|
|
elif type == 4:
|
|
yield UInt64(self, name)
|
|
else:
|
|
yield RawBytes(self, name, size)
|
|
|
|
class ExtendedContentDescription(FieldSet):
|
|
guid = "D2D0A440-E307-11D2-97F0-00A0C95EA850"
|
|
def createFields(self):
|
|
yield UInt16(self, "count")
|
|
for index in xrange(self["count"].value):
|
|
yield Descriptor(self, "descriptor[]")
|
|
|
|
class Codec(FieldSet):
|
|
"""
|
|
See CodecList class.
|
|
"""
|
|
type_name = {
|
|
1: "video",
|
|
2: "audio"
|
|
}
|
|
def createFields(self):
|
|
yield Enum(UInt16(self, "type"), self.type_name)
|
|
yield UInt16(self, "name_len", "Name length in character (byte=len*2)")
|
|
if self["name_len"].value:
|
|
yield String(self, "name", self["name_len"].value*2, "Name", charset="UTF-16-LE", strip=" \0")
|
|
yield UInt16(self, "desc_len", "Description length in character (byte=len*2)")
|
|
if self["desc_len"].value:
|
|
yield String(self, "desc", self["desc_len"].value*2, "Description", charset="UTF-16-LE", strip=" \0")
|
|
yield UInt16(self, "info_len")
|
|
if self["info_len"].value:
|
|
yield RawBytes(self, "info", self["info_len"].value)
|
|
|
|
class CodecList(FieldSet):
|
|
guid = "86D15240-311D-11D0-A3A4-00A0C90348F6"
|
|
|
|
def createFields(self):
|
|
yield GUID(self, "reserved[]")
|
|
yield UInt32(self, "count")
|
|
for index in xrange(self["count"].value):
|
|
yield Codec(self, "codec[]")
|
|
|
|
class SimpleIndexEntry(FieldSet):
|
|
"""
|
|
See SimpleIndex class.
|
|
"""
|
|
def createFields(self):
|
|
yield UInt32(self, "pckt_number")
|
|
yield UInt16(self, "pckt_count")
|
|
|
|
class SimpleIndex(FieldSet):
|
|
guid = "33000890-E5B1-11CF-89F4-00A0C90349CB"
|
|
|
|
def createFields(self):
|
|
yield GUID(self, "file_id")
|
|
yield TimedeltaWin64(self, "entry_interval")
|
|
yield UInt32(self, "max_pckt_count")
|
|
yield UInt32(self, "entry_count")
|
|
for index in xrange(self["entry_count"].value):
|
|
yield SimpleIndexEntry(self, "entry[]")
|
|
|
|
class BitRate(FieldSet):
|
|
"""
|
|
See BitRateList class.
|
|
"""
|
|
def createFields(self):
|
|
yield Bits(self, "stream_index", 7)
|
|
yield PaddingBits(self, "reserved", 9)
|
|
yield displayHandler(UInt32(self, "avg_bitrate"), humanBitRate)
|
|
|
|
class BitRateList(FieldSet):
|
|
guid = "7BF875CE-468D-11D1-8D82-006097C9A2B2"
|
|
|
|
def createFields(self):
|
|
yield UInt16(self, "count")
|
|
for index in xrange(self["count"].value):
|
|
yield BitRate(self, "bit_rate[]")
|
|
|
|
class Data(FieldSet):
|
|
guid = "75B22636-668E-11CF-A6D9-00AA0062CE6C"
|
|
|
|
def createFields(self):
|
|
yield GUID(self, "file_id")
|
|
yield UInt64(self, "packet_count")
|
|
yield PaddingBytes(self, "reserved", 2)
|
|
size = (self.size - self.current_size) / 8
|
|
yield RawBytes(self, "data", size)
|
|
|
|
class StreamProperty(FieldSet):
|
|
guid = "B7DC0791-A9B7-11CF-8EE6-00C00C205365"
|
|
def createFields(self):
|
|
yield GUID(self, "type")
|
|
yield GUID(self, "error_correction")
|
|
yield UInt64(self, "time_offset")
|
|
yield UInt32(self, "data_len")
|
|
yield UInt32(self, "error_correct_len")
|
|
yield Bits(self, "stream_index", 7)
|
|
yield Bits(self, "reserved[]", 8)
|
|
yield Bit(self, "encrypted", "Content is encrypted?")
|
|
yield UInt32(self, "reserved[]")
|
|
size = self["data_len"].value
|
|
if size:
|
|
tag = self["type"].value
|
|
if tag in Object.TAG_INFO:
|
|
name, parser = Object.TAG_INFO[tag][0:2]
|
|
yield parser(self, name, size=size*8)
|
|
else:
|
|
yield RawBytes(self, "data", size)
|
|
size = self["error_correct_len"].value
|
|
if size:
|
|
yield RawBytes(self, "error_correct", size)
|
|
|
|
class Object(FieldSet):
|
|
# This list is converted to a dictionnary later where the key is the GUID
|
|
TAG_INFO = (
|
|
("header", Header, "Header object"),
|
|
("file_prop", FileProperty, "File property"),
|
|
("header_ext", HeaderExtension, "Header extension"),
|
|
("codec_list", CodecList, "Codec list"),
|
|
("simple_index", SimpleIndex, "Simple index"),
|
|
("data", Data, "Data object"),
|
|
("stream_prop[]", StreamProperty, "Stream properties"),
|
|
("bit_rates", BitRateList, "Bit rate list"),
|
|
("ext_desc", ExtendedContentDescription, "Extended content description"),
|
|
("metadata", Metadata, "Metadata"),
|
|
("video_header", VideoHeader, "Video"),
|
|
("audio_header", AudioHeader, "Audio"),
|
|
("bitrate_mutex", BitrateMutualExclusion, "Bitrate mutual exclusion"),
|
|
)
|
|
|
|
def __init__(self, *args, **kw):
|
|
FieldSet.__init__(self, *args, **kw)
|
|
|
|
tag = self["guid"].value
|
|
if tag not in self.TAG_INFO:
|
|
self.handler = None
|
|
return
|
|
info = self.TAG_INFO[tag]
|
|
self._name = info[0]
|
|
self.handler = info[1]
|
|
|
|
def createFields(self):
|
|
yield GUID(self, "guid")
|
|
yield filesizeHandler(UInt64(self, "size"))
|
|
|
|
size = self["size"].value - self.current_size/8
|
|
if 0 < size:
|
|
if self.handler:
|
|
yield self.handler(self, "content", size=size*8)
|
|
else:
|
|
yield RawBytes(self, "content", size)
|
|
|
|
tag_info_list = Object.TAG_INFO
|
|
Object.TAG_INFO = dict( (parser[1].guid, parser) for parser in tag_info_list )
|
|
|
|
class AsfFile(Parser):
|
|
MAGIC = "\x30\x26\xB2\x75\x8E\x66\xCF\x11\xA6\xD9\x00\xAA\x00\x62\xCE\x6C"
|
|
PARSER_TAGS = {
|
|
"id": "asf",
|
|
"category": "video",
|
|
"file_ext": ("wmv", "wma", "asf"),
|
|
"mime": (u"video/x-ms-asf", u"video/x-ms-wmv", u"audio/x-ms-wma"),
|
|
"min_size": 24*8,
|
|
"description": "Advanced Streaming Format (ASF), used for WMV (video) and WMA (audio)",
|
|
"magic": ((MAGIC, 0),),
|
|
}
|
|
FILE_TYPE = {
|
|
"video/x-ms-wmv": (".wmv", u"Window Media Video (wmv)"),
|
|
"video/x-ms-asf": (".asf", u"ASF container"),
|
|
"audio/x-ms-wma": (".wma", u"Window Media Audio (wma)"),
|
|
}
|
|
endian = LITTLE_ENDIAN
|
|
|
|
def validate(self):
|
|
magic = self.MAGIC
|
|
if self.stream.readBytes(0, len(magic)) != magic:
|
|
return "Invalid magic"
|
|
header = self[0]
|
|
if not(30 <= header["size"].value <= MAX_HEADER_SIZE):
|
|
return "Invalid header size (%u)" % header["size"].value
|
|
return True
|
|
|
|
def createMimeType(self):
|
|
audio = False
|
|
for prop in self.array("header/content/stream_prop"):
|
|
guid = prop["content/type"].value
|
|
if guid == VideoHeader.guid:
|
|
return u"video/x-ms-wmv"
|
|
if guid == AudioHeader.guid:
|
|
audio = True
|
|
if audio:
|
|
return u"audio/x-ms-wma"
|
|
else:
|
|
return u"video/x-ms-asf"
|
|
|
|
def createFields(self):
|
|
while not self.eof:
|
|
yield Object(self, "object[]")
|
|
|
|
def createDescription(self):
|
|
return self.FILE_TYPE[self.mime_type][1]
|
|
|
|
def createFilenameSuffix(self):
|
|
return self.FILE_TYPE[self.mime_type][0]
|
|
|
|
def createContentSize(self):
|
|
if self[0].name != "header":
|
|
return None
|
|
return self["header/content/file_prop/content/file_size"].value * 8
|
|
|