mirror of
https://github.com/SickGear/SickGear.git
synced 2024-12-05 02:43:37 +00:00
408 lines
13 KiB
Python
408 lines
13 KiB
Python
"""
|
|
MPEG audio file parser.
|
|
|
|
Creation: 12 December 2005
|
|
Author: Victor Stinner
|
|
"""
|
|
|
|
from hachoir_parser import Parser
|
|
from hachoir_core.field import (FieldSet,
|
|
MissingField, ParserError, createOrphanField,
|
|
Bit, Bits, Enum,
|
|
PaddingBits, PaddingBytes,
|
|
RawBytes)
|
|
from hachoir_parser.audio.id3 import ID3v1, ID3v2
|
|
from hachoir_core.endian import BIG_ENDIAN
|
|
from hachoir_core.tools import humanFrequency, humanBitSize
|
|
from hachoir_core.bits import long2raw
|
|
from hachoir_core.error import HACHOIR_ERRORS
|
|
from hachoir_core.stream import InputStreamError
|
|
|
|
# Max MP3 filesize: 200 MB, expressed in bits (hence the trailing "* 8").
# It is compared against bit addresses when computing the content size.
MAX_FILESIZE = 200*1024*1024*8
|
|
|
|
class Frame(FieldSet):
    """
    One MPEG audio frame: a 32-bit header followed by the raw frame data.

    When no explicit size is given, the frame size is computed from the
    header (bit rate, sampling rate, padding bit) and clipped to the space
    remaining in the parent field set.
    """

    # Value of the "version" header field => MPEG audio version name
    VERSION_NAME = {0: "2.5", 2: "2", 3: "1"}
    MPEG_I = 3
    MPEG_II = 2
    MPEG_II_5 = 0

    # Value of the "layer" header field => layer name
    LAYER_NAME = {1: "III", 2: "II", 3: "I"}
    LAYER_I = 3
    LAYER_II = 2
    LAYER_III = 1

    # Bit rates (bit_rate * 1000 = bits/sec)
    # key 15 is always invalid
    # Outer key: 1 for MPEG1, 2 for MPEG2 / MPEG2.5.
    # Inner tuples are indexed by (3 - "layer" field value), then by the
    # "bit_rate" field value.
    BIT_RATES = {
        1: (  # MPEG1
            (0, 32, 64, 96, 128, 160, 192, 224, 256, 288, 320, 352, 384, 416, 448),  # layer I
            (0, 32, 48, 56, 64, 80, 96, 112, 128, 160, 192, 224, 256, 320, 384),  # layer II
            (0, 32, 40, 48, 56, 64, 80, 96, 112, 128, 160, 192, 224, 256, 320),  # layer III
            # -  1   2   3   4   5   6   7    8    9    10   11   12   13   14  -
        ),
        2: (  # MPEG2 / MPEG2.5
            (0, 32, 48, 56, 64, 80, 96, 112, 128, 144, 160, 176, 192, 224, 256),  # layer I
            (0, 8, 16, 24, 32, 40, 48, 56, 64, 80, 96, 112, 128, 144, 160),  # layer II
            (0, 8, 16, 24, 32, 40, 48, 56, 64, 80, 96, 112, 128, 144, 160),  # layer III
            # -  1   2   3   4   5   6   7    8    9    10   11   12   13   14  -
        )
    }

    # "version" field value => {"sampling_rate" field value => rate in Hz}
    SAMPLING_RATES = {
        3: {0: 44100, 1: 48000, 2: 32000},  # MPEG1
        2: {0: 22050, 1: 24000, 2: 16000},  # MPEG2
        0: {0: 11025, 1: 12000, 2: 8000}    # MPEG2.5
    }
    EMPHASIS_NAME = {0: "none", 1: "50/15 ms", 3: "CCIT J.17"}
    CHANNEL_MODE_NAME = {
        0: "Stereo",
        1: "Joint stereo",
        2: "Dual channel",
        3: "Single channel"
    }
    # Channel mode => number of channels
    NB_CHANNEL = {
        0: 2,
        1: 2,
        2: 2,
        3: 1,
    }

    def __init__(self, *args, **kw):
        """
        Create the frame and, if no size was provided, compute it from the
        header.

        Raises ParserError when the frame size cannot be computed.
        """
        FieldSet.__init__(self, *args, **kw)
        if not self._size:
            frame_size = self.getFrameSize()
            if not frame_size:
                raise ParserError("MPEG audio: Invalid frame %s" % self.path)
            # Size is stored in bits; never extend past the end of the parent
            self._size = min(frame_size * 8, self.parent.size - self.address)

    def createFields(self):
        """Yield the 32 header bits, then the rest of the frame as raw bytes."""
        # Header
        yield PaddingBits(self, "sync", 11, "Synchronize bits (set to 1)", pattern=1)
        yield Enum(Bits(self, "version", 2, "MPEG audio version"), self.VERSION_NAME)
        yield Enum(Bits(self, "layer", 2, "MPEG audio layer"), self.LAYER_NAME)
        yield Bit(self, "crc16", "No CRC16 protection?")

        # Rates and padding
        yield Bits(self, "bit_rate", 4, "Bit rate")
        yield Bits(self, "sampling_rate", 2, "Sampling rate")
        yield Bit(self, "use_padding", "Stream field use padding?")
        yield Bit(self, "extension", "Extension")

        # Channel mode, mode extension, copyright, ...
        yield Enum(Bits(self, "channel_mode", 2, "Channel mode"), self.CHANNEL_MODE_NAME)
        yield Bits(self, "mode_ext", 2, "Mode extension")
        yield Bit(self, "copyright", "Is copyrighted?")
        yield Bit(self, "original", "Is original?")
        yield Enum(Bits(self, "emphasis", 2, "Emphasis"), self.EMPHASIS_NAME)

        # Remaining payload, in bytes. Floor division keeps the byte count
        # an integer even when "/" means true division.
        size = (self.size - self.current_size) // 8
        if size:
            yield RawBytes(self, "data", size)

    def isValid(self):
        """
        Check the header values.

        Returns True when the 11 sync bits are all set (2047) and none of
        the fields holds a rejected value: layer 0, version 1, sampling
        rate 3, bit rate 0 or 15, emphasis 2.
        """
        return (self["layer"].value != 0
            and self["sync"].value == 2047
            and self["version"].value != 1
            and self["sampling_rate"].value != 3
            and self["bit_rate"].value not in (0, 15)
            and self["emphasis"].value != 2)

    def getSampleRate(self):
        """
        Read sampling rate. Returns None on error.
        """
        version = self["version"].value
        rate = self["sampling_rate"].value
        try:
            return self.SAMPLING_RATES[version][rate]
        except (KeyError, IndexError):
            return None

    def getBitRate(self):
        """
        Read bit rate in bit/sec. Returns None on error.
        """
        # Header value (3=I, 2=II, 1=III) => index in the BIT_RATES tuples
        layer = 3 - self["layer"].value
        bit_rate = self["bit_rate"].value
        if bit_rate in (0, 15):
            return None
        if self["version"].value == 3:
            dataset = self.BIT_RATES[1]  # MPEG1
        else:
            dataset = self.BIT_RATES[2]  # MPEG2 / MPEG2.5
        try:
            return dataset[layer][bit_rate] * 1000
        except (KeyError, IndexError):
            return None

    def getFrameSize(self):
        """
        Read frame size in bytes. Returns None on error.

        The result is always an integer: every division below is a floor
        division.
        """
        bit_rate = self.getBitRate()
        if not bit_rate:
            return None
        sample_rate = self.getSampleRate()
        if not sample_rate:
            return None
        padding = int(self["use_padding"].value)

        layer = self["layer"].value
        if layer == self.LAYER_III:
            if self["version"].value == self.MPEG_I:
                return (bit_rate * 144) // sample_rate + padding
            else:
                return (bit_rate * 72) // sample_rate + padding
        elif layer == self.LAYER_II:
            return (bit_rate * 144) // sample_rate + padding
        else:  # self.LAYER_I:
            # The slot count (plus optional padding slot) is multiplied by 4
            slots = (bit_rate * 12) // sample_rate
            return (slots + padding) * 4

    def getNbChannel(self):
        """Return the number of channels for the frame's channel mode."""
        return self.NB_CHANNEL[self["channel_mode"].value]

    def createDescription(self):
        """Build a text such as "MPEG-<version> layer <name>, <rate>/sec, <freq>"."""
        info = ["layer %s" % self["layer"].display]
        bit_rate = self.getBitRate()
        if bit_rate:
            info.append("%s/sec" % humanBitSize(bit_rate))
        sampling_rate = self.getSampleRate()
        if sampling_rate:
            info.append(humanFrequency(sampling_rate))
        return "MPEG-%s %s" % (self["version"].display, ", ".join(info))
|
|
|
|
def findSynchronizeBits(parser, start, max_size):
    """
    Search for the start of a valid MPEG audio frame (11 sync bits set to 1).

    The stream is scanned from bit address `start` up to `start + max_size`.
    Each 0xFF byte found is tried as a frame start by building an orphan
    Frame there and calling its isValid() method.

    Returns the number of bytes skipped before the valid frame, or None if
    no valid frame is found in the window.
    """
    base_address = parser.absolute_address
    limit = start + max_size
    position = start
    skipped = 0
    while position < limit:
        # Fast search: jump to the next 0xFF (first byte of the sync field)
        offset = parser.stream.searchBytesLength("\xff", False, position, limit)
        if offset is None:
            return None
        position += offset * 8
        skipped += offset

        # Strong validation: build the frame and check its header. A frame
        # that cannot be created at all counts as invalid.
        try:
            candidate = createOrphanField(parser, position - base_address, Frame, "frame")
            is_frame = candidate.isValid()
        except HACHOIR_ERRORS:
            is_frame = False
        if is_frame:
            return skipped

        # Not a frame: resume the search one byte further
        skipped += 1
        position += 8
    return None
|
|
|
|
class Frames(FieldSet):
    """
    Sequence of MPEG audio frames, optionally preceded by padding bytes
    found before the first valid frame.
    """

    # Padding bytes allowed before a frame
    MAX_PADDING = 256

    def synchronize(self):
        """
        Look for the first valid frame from the current position.

        Returns a PaddingBytes field covering the bytes to skip, or None
        when a frame starts right at the current position.
        Raises ParserError when no valid frame is found.
        """
        addr = self.absolute_address
        start = addr + self.current_size
        end = min(start + self.MAX_PADDING*8, addr + self.size)
        # NOTE(review): findSynchronizeBits() adds its third argument to
        # `start`, but an absolute end address is passed here, so the window
        # actually searched is wider than MAX_PADDING whenever `start` is
        # not zero. Left unchanged because narrowing it could reject files
        # that parse today -- confirm the intended limit before changing.
        padding = findSynchronizeBits(self, start, end)
        if padding is None:
            raise ParserError("MPEG audio: Unable to find synchronization bits")
        if padding:
            return PaddingBytes(self, "padding[]", padding, "Padding before synchronization")
        else:
            return None

    def looksConstantBitRate(self, count=10):
        """
        Guess if frames are constant bit rate. If it returns False, you can
        be sure that frames are variable bit rate. Otherwise, it looks like
        constant bit rate (on the first frames only: the scan stops once
        the frame index reaches `count`).
        """
        check_keys = ("version", "layer", "bit_rate")
        last_field = None
        for index, field in enumerate(self.array("frame")):
            if last_field:
                # Any difference with the previous frame means VBR
                for key in check_keys:
                    if field[key].value != last_field[key].value:
                        return False
            last_field = field
            if index == count:
                break
        return True

    def createFields(self):
        """Yield the optional leading padding, the frames, then trailing bytes."""
        # Find synchronisation bytes
        padding = self.synchronize()
        if padding:
            yield padding

        while self.current_size < self.size:
            yield Frame(self, "frame[]")
            # Re-synchronizing between frames is disabled:
            # padding = self.synchronize()
            # if padding:
            #     yield padding

        # Read raw bytes at the end (if any). Floor division keeps the byte
        # count an integer even when "/" means true division.
        size = (self.size - self.current_size) // 8
        if size:
            yield RawBytes(self, "raw", size)

    def createDescription(self):
        """Describe the frames as CBR (guessed) or VBR."""
        if self.looksConstantBitRate():
            text = "(looks like) Constant bit rate (CBR)"
        else:
            text = "Variable bit rate (VBR)"
        return "Frames: %s" % text
|
|
|
|
def createMpegAudioMagic():
    """
    Build the list of (magic bytes, offset) pairs identifying an MPEG audio
    file: the ID3v1 tag, every supported ID3v2 major version, and the first
    16 bits of every frame header combining a known version, a known layer
    and either value of the CRC16 bit.

    Returns a list of (magic, 0) tuples.
    """
    # ID3v1 magic
    magics = [("TAG", 0)]

    # ID3v2 magics
    for ver_major in ID3v2.VALID_MAJOR_VERSIONS:
        magic = "ID3%c\x00" % ver_major
        magics.append((magic, 0))

    # MPEG frame magic
    # TODO: Use longer magic: 32 bits instead of 16 bits
    SYNC_BITS = 2047
    # Iterating a dict yields its keys; no need for iterkeys()
    for version in Frame.VERSION_NAME:
        for layer in Frame.LAYER_NAME:
            for crc16 in (0, 1):
                # 11 sync bits, 2 version bits, 2 layer bits, 1 CRC16 bit
                magic = (SYNC_BITS << 5) | (version << 3) | (layer << 1) | crc16
                magic = long2raw(magic, BIG_ENDIAN, 2)
                magics.append((magic, 0))
    return magics
|
|
|
|
class MpegAudioFile(Parser):
    """
    Parser for MPEG audio files: optional ID3v2 tag, audio frames, then an
    optional ID3v1 tag in the last 128 bytes.
    """

    PARSER_TAGS = {
        "id": "mpeg_audio",
        "category": "audio",
        "file_ext": ("mpa", "mp1", "mp2", "mp3"),
        "mime": (u"audio/mpeg",),
        "min_size": 4*8,
#        "magic": createMpegAudioMagic(),
        "description": "MPEG audio version 1, 2, 2.5",
        "subfile": "skip",
    }
    endian = BIG_ENDIAN

    def validate(self):
        """
        Check that the input looks like an MPEG audio file.

        Returns True when the first field is an ID3 tag or when the first
        frames (up to 5) are valid and share the same channel mode. Returns
        False for an unchecked stream, or an error message otherwise.
        """
        if self[0].name in ("id3v2", "id3v1"):
            return True

        if not self.stream.checked:  # TODO: is it possible to handle piped input?
            return False

        # Validate first 5 frames
        for index in range(5):
            try:
                frame = self["frames/frame[%u]" % index]
            except MissingField:
                # Require a least one valid frame
                if (1 <= index) \
                and self["frames"].done:
                    return True
                return "Unable to get frame #%u" % index
            except (InputStreamError, ParserError):
                return "Unable to create frame #%u" % index

            # Check first frame values
            if not frame.isValid():
                return "Frame #%u is invalid" % index

            # Check that all frames are similar
            if not index:
                frame0 = frame
            else:
                if frame0["channel_mode"].value != frame["channel_mode"].value:
                    return "Frame #%u channel mode is different" % index
        return True

    def createFields(self):
        """
        Yield the ID3v2 tag (if any), the frames (if any) and the ID3v1 tag
        (if any).

        Raises NotImplementedError when the input size is unknown.
        """
        # Read ID3v2 (if any)
        if self.stream.readBytes(0, 3) == "ID3":
            yield ID3v2(self, "id3v2")

        if self._size is None:  # TODO: is it possible to handle piped input?
            raise NotImplementedError

        # Check if file is ending with ID3v1 or not and compute frames size
        frames_size = self.size - self.current_size
        addr = self.size - 128*8
        # Only probe for "TAG" after what has already been parsed: the last
        # 128 bytes of a small file may belong to the ID3v2 tag, and treating
        # them as ID3v1 would make frames_size negative.
        if self.current_size <= addr:
            has_id3 = (self.stream.readBytes(addr, 3) == "TAG")
            if has_id3:
                frames_size -= 128*8
        else:
            has_id3 = False

        # Read frames (if any); a non-positive size means there is no room
        if 0 < frames_size:
            yield Frames(self, "frames", size=frames_size)

        # Read ID3v1 (if any)
        if has_id3:
            yield ID3v1(self, "id3v1")

    def createDescription(self):
        """Describe the file from its first frame, else from its ID3 tag."""
        if "frames" in self:
            frame = self["frames/frame[0]"]
            return "%s, %s" % (frame.description, frame["channel_mode"].display)
        elif "id3v2" in self:
            return self["id3v2"].description
        elif "id3v1" in self:
            return self["id3v1"].description
        else:
            return "MPEG audio"

    def createContentSize(self):
        """
        Compute the content size in bits.

        Frames are walked one by one as orphan fields, starting after the
        first frame, until an invalid frame, a parsing error or
        MAX_FILESIZE is reached; the size of a trailing ID3v1 tag is added
        when "TAG" follows the last frame.

        Returns None when the frames field cannot be located, or the size
        of the only field when the file holds nothing but an ID3 tag.
        """
        # Get "frames" field
        field = self[0]
        if field.name != "frames":
            try:
                field = self[1]
            except MissingField:
                # File only contains ID3v1 or ID3v2
                return field.size

            # Error: second field are not the frames"?
            if field.name != "frames":
                return None

        # Go to last frame
        frames = field
        frame = frames["frame[0]"]
        address0 = field.absolute_address
        size = address0 + frame.size
        while True:
            try:
                # Parse one MPEG audio frame
                frame = createOrphanField(frames, size - address0, Frame, "frame")

                # Check frame 32 bits header
                if not frame.isValid():
                    break
            except HACHOIR_ERRORS:
                break
            if MAX_FILESIZE < (size + frame.size):
                break
            size += frame.size

        # ID3v1 at the end?
        try:
            if self.stream.readBytes(size, 3) == "TAG":
                size += ID3v1.static_size
        except InputStreamError:
            # Nothing readable after the last frame: no ID3v1 tag to add
            pass
        return size
|
|
|