SickGear/lib/hachoir/parser/audio/mpeg_audio.py
JackDandy 980e05cc99 Change Hachoir can't support PY2 so backport their PY3 to prevent a need for system dependant external binaries like mediainfo.
Backported 400 revisions from rev 1de4961-8897c5b (2018-2014).
Move core/benchmark, core/cmd_line, core/memory, core/profiler and core/timeout to core/optional/*
Remove metadata/qt*

PORT: Version 2.0a3 (inline with 3.0a3 @ f80c7d5).
Basic Support for XMP Packets.
tga: improvements to adhere more closely to the spec.
pdf: slightly improved parsing.
rar: fix TypeError on unknown block types.
Add MacRoman win32 codepage.
tiff/exif: support SubIFDs and tiled images.
Add method to export metadata in dictionary.
mpeg_video: don't attempt to parse Stream past length.
mpeg_video: parse ESCR correctly, add SCR value.
Change centralise CustomFragments.
field: don't set parser class if class is None, to enable autodetect.
field: add value/display for CustomFragment.
parser: inline warning to enable tracebacks in debug mode.
Fix empty bytestrings in makePrintable.
Fix contentSize in jpeg.py to account for image_data blocks.
Fix the ELF parser.
Enhance the AR archive parser.
elf parser: fix wrong wrong fields order in parsing little endian section flags.
elf parser: add s390 as a machine type.
Flesh out mp4 parser.

PORT: Version 2.0a1 (inline with 3.0a1).
Major refactoring and PEP8.
Fix ResourceWarning warnings on files. Add a close() method and support for the context manager protocol ("with obj: ...") to parsers, input and output streams.
metadata: get comment from ZIP.
Support for InputIOStream.read(0).
Fix sizeGe when size is None.
Remove unused new_seekable_field_set file.
Remove parser Mapsforge .map.
Remove parser Parallel Realities Starfighter .pak files.
sevenzip: fix for newer archives.
java: update access flags and modifiers for Java 1.7 and update description text for most recent Java.
Support ustar prefix field in tar archives.
Remove file_system* parsers.
Remove misc parsers 3d0, 3ds, gnome_keyring, msoffice*, mstask, ole*, word*.
Remove program parsers macho, nds, prc.
Support non-8bit Character subclasses.
Python parser supports Python 3.7.
Enhance mpeg_ts parser to support MTS/M2TS.
Support for creation date in tiff.
Change don't hardcode errno constant.

PORT: 1.9.1
Internal Only: The following are legacy reference to upstream commit messages.
Relevant changes up to b0a115f8.
Use integer division.
Replace HACHOIR_ERRORS with Exception.
Fix metadata.Data: make it sortable.
Import fixes from e7de492.
PORT: Version 2.0a1 (inline with 3.0a1 @ e9f8fad).
Replace hachoir.core.field with hachoir.field
Replace hachoir.core.stream with hachoir.stream
Remove the compatibility module for PY1.5 to PY2.5.
metadata: support TIFF picture.
metadata: fix string normalization.
metadata: fix datetime regex Fix hachoir bug #57.
FileFromInputStream: fix comparison between None and an int.
InputIOStream: open the file in binary mode.
2018-03-28 00:43:11 +01:00

410 lines
14 KiB
Python

"""
MPEG audio file parser.
Creation: 12 decembre 2005
Author: Victor Stinner
"""
from hachoir.parser import Parser
from hachoir.field import (FieldSet,
MissingField, ParserError, createOrphanField,
Bit, Bits, Enum,
PaddingBits, PaddingBytes,
RawBytes)
from hachoir.parser.audio.id3 import ID3v1, ID3v2
from hachoir.core.endian import BIG_ENDIAN
from hachoir.core.tools import humanFrequency, humanBitSize
from hachoir.core.bits import long2raw
from hachoir.stream import InputStreamError
# Max MP3 filesize: 200 MB
MAX_FILESIZE = 200 * 1024 * 1024 * 8
class Frame(FieldSet):
VERSION_NAME = {0: "2.5", 2: "2", 3: "1"}
MPEG_I = 3
MPEG_II = 2
MPEG_II_5 = 0
LAYER_NAME = {1: "III", 2: "II", 3: "I"}
LAYER_I = 3
LAYER_II = 2
LAYER_III = 1
# Bit rates (bit_rate * 1000 = bits/sec)
# key 15 is always invalid
BIT_RATES = {
1: ( # MPEG1
(0, 32, 64, 96, 128, 160, 192, 224, 256, 288, 320, 352, 384, 416, 448), # layer I
(0, 32, 48, 56, 64, 80, 96, 112, 128, 160, 192, 224, 256, 320, 384), # layer II
(0, 32, 40, 48, 56, 64, 80, 96, 112, 128, 160, 192, 224, 256, 320), # layer III
# - 1 2 3 4 5 6 7 8 9 10 11 12 13 14 -
),
2: ( # MPEG2 / MPEG2.5
(0, 32, 48, 56, 64, 80, 96, 112, 128, 144, 160, 176, 192, 224, 256), # layer I
(0, 8, 16, 24, 32, 40, 48, 56, 64, 80, 96, 112, 128, 144, 160), # layer II
(0, 8, 16, 24, 32, 40, 48, 56, 64, 80, 96, 112, 128, 144, 160), # layer III
# - 1 2 3 4 5 6 7 8 9 10 11 12 13 14 -
)
}
SAMPLING_RATES = {
3: {0: 44100, 1: 48000, 2: 32000}, # MPEG1
2: {0: 22050, 1: 24000, 2: 16000}, # MPEG2
0: {0: 11025, 1: 12000, 2: 8000} # MPEG2.5
}
EMPHASIS_NAME = {0: "none", 1: "50/15 ms", 3: "CCIT J.17"}
CHANNEL_MODE_NAME = {
0: "Stereo",
1: "Joint stereo",
2: "Dual channel",
3: "Single channel"
}
# Channel mode => number of channels
NB_CHANNEL = {
0: 2,
1: 2,
2: 2,
3: 1,
}
def __init__(self, *args, **kw):
FieldSet.__init__(self, *args, **kw)
if not self._size:
frame_size = self.getFrameSize()
if not frame_size:
raise ParserError("MPEG audio: Invalid frame %s" % self.path)
self._size = min(frame_size * 8, self.parent.size - self.address)
def createFields(self):
# Header
yield PaddingBits(self, "sync", 11, "Synchronize bits (set to 1)", pattern=1)
yield Enum(Bits(self, "version", 2, "MPEG audio version"), self.VERSION_NAME)
yield Enum(Bits(self, "layer", 2, "MPEG audio layer"), self.LAYER_NAME)
yield Bit(self, "crc16", "No CRC16 protection?")
# Rates and padding
yield Bits(self, "bit_rate", 4, "Bit rate")
yield Bits(self, "sampling_rate", 2, "Sampling rate")
yield Bit(self, "use_padding", "Stream field use padding?")
yield Bit(self, "extension", "Extension")
# Channel mode, mode extension, copyright, ...
yield Enum(Bits(self, "channel_mode", 2, "Channel mode"), self.CHANNEL_MODE_NAME)
yield Bits(self, "mode_ext", 2, "Mode extension")
yield Bit(self, "copyright", "Is copyrighted?")
yield Bit(self, "original", "Is original?")
yield Enum(Bits(self, "emphasis", 2, "Emphasis"), self.EMPHASIS_NAME)
size = (self.size - self.current_size) // 8
if size:
yield RawBytes(self, "data", size)
def isValid(self):
return (self["layer"].value != 0
and self["sync"].value == 2047
and self["version"].value != 1
and self["sampling_rate"].value != 3
and self["bit_rate"].value not in (0, 15)
and self["emphasis"].value != 2)
def getSampleRate(self):
"""
Read sampling rate. Returns None on error.
"""
version = self["version"].value
rate = self["sampling_rate"].value
try:
return self.SAMPLING_RATES[version][rate]
except (KeyError, IndexError):
return None
def getBitRate(self):
"""
Read bit rate in bit/sec. Returns None on error.
"""
layer = 3 - self["layer"].value
bit_rate = self["bit_rate"].value
if bit_rate in (0, 15):
return None
if self["version"].value == 3:
dataset = self.BIT_RATES[1] # MPEG1
else:
dataset = self.BIT_RATES[2] # MPEG2 / MPEG2.5
try:
return dataset[layer][bit_rate] * 1000
except (KeyError, IndexError):
return None
def getFrameSize(self):
"""
Read frame size in bytes. Returns None on error.
"""
frame_size = self.getBitRate()
if not frame_size:
return None
sample_rate = self.getSampleRate()
if not sample_rate:
return None
padding = int(self["use_padding"].value)
if self["layer"].value == self.LAYER_III:
if self["version"].value == self.MPEG_I:
return (frame_size * 144) // sample_rate + padding
else:
return (frame_size * 72) // sample_rate + padding
elif self["layer"].value == self.LAYER_II:
return (frame_size * 144) // sample_rate + padding
else: # self.LAYER_I:
frame_size = (frame_size * 12) // sample_rate
return (frame_size + padding) * 4
def getNbChannel(self):
return self.NB_CHANNEL[self["channel_mode"].value]
def createDescription(self):
info = ["layer %s" % self["layer"].display]
bit_rate = self.getBitRate()
if bit_rate:
info.append("%s/sec" % humanBitSize(bit_rate))
sampling_rate = self.getSampleRate()
if sampling_rate:
info.append(humanFrequency(sampling_rate))
return "MPEG-%s %s" % (self["version"].display, ", ".join(info))
def findSynchronizeBits(parser, start, max_size):
"""
Find synchronisation bits (11 bits set to 1)
Returns None on error, or number of bytes before the synchronization.
"""
address0 = parser.absolute_address
end = start + max_size
size = 0
while start < end:
# Fast search: search 0xFF (first byte of sync frame field)
length = parser.stream.searchBytesLength("\xff", False, start, end)
if length is None:
return None
size += length
start += length * 8
# Strong validation of frame: create the frame
# and call method isValid()
try:
frame = createOrphanField(parser, start - address0, Frame, "frame")
valid = frame.isValid()
except Exception:
valid = False
if valid:
return size
# Invalid frame: continue
start += 8
size += 1
return None
class Frames(FieldSet):
# Padding bytes allowed before a frame
MAX_PADDING = 256
def synchronize(self):
addr = self.absolute_address
start = addr + self.current_size
end = min(start + self.MAX_PADDING * 8, addr + self.size)
padding = findSynchronizeBits(self, start, end)
if padding is None:
raise ParserError("MPEG audio: Unable to find synchronization bits")
if padding:
return PaddingBytes(self, "padding[]", padding, "Padding before synchronization")
else:
return None
def looksConstantBitRate(self, count=10):
"""
Guess if frames are constant bit rate. If it returns False, you can
be sure that frames are variable bit rate. Otherwise, it looks like
constant bit rate (on first count fields).
"""
check_keys = ("version", "layer", "bit_rate")
last_field = None
for index, field in enumerate(self.array("frame")):
if last_field:
for key in check_keys:
if field[key].value != last_field[key].value:
return False
last_field = field
if index == count:
break
return True
def createFields(self):
# Find synchronisation bytes
padding = self.synchronize()
if padding:
yield padding
while self.current_size < self.size:
yield Frame(self, "frame[]")
# padding = self.synchronize()
# if padding:
# yield padding
# Read raw bytes at the end (if any)
size = (self.size - self.current_size) // 8
if size:
yield RawBytes(self, "raw", size)
def createDescription(self):
if self.looksConstantBitRate():
text = "(looks like) Constant bit rate (CBR)"
else:
text = "Variable bit rate (VBR)"
return "Frames: %s" % text
def createMpegAudioMagic():
# ID3v1 magic
magics = [("TAG", 0)]
# ID3v2 magics
for ver_major in ID3v2.VALID_MAJOR_VERSIONS:
magic = "ID3%c\x00" % ver_major
magics.append((magic, 0))
# MPEG frame magic
# TODO: Use longer magic: 32 bits instead of 16 bits
SYNC_BITS = 2047
for version in Frame.VERSION_NAME.iterkeys():
for layer in Frame.LAYER_NAME.iterkeys():
for crc16 in (0, 1):
magic = (SYNC_BITS << 5) | (version << 3) | (layer << 1) | crc16
magic = long2raw(magic, BIG_ENDIAN, 2)
magics.append((magic, 0))
return magics
class MpegAudioFile(Parser):
PARSER_TAGS = {
"id": "mpeg_audio",
"category": "audio",
"file_ext": ("mpa", "mp1", "mp2", "mp3"),
"mime": (u"audio/mpeg",),
"min_size": 4 * 8,
# "magic": createMpegAudioMagic(),
"description": "MPEG audio version 1, 2, 2.5",
"subfile": "skip",
}
endian = BIG_ENDIAN
def validate(self):
if self[0].name in ("id3v2", "id3v1"):
return True
if not self.stream.checked: # TODO: is it possible to handle piped input?
return False
# Validate first 5 frames
for index in xrange(5):
try:
frame = self["frames/frame[%u]" % index]
except MissingField:
# Require a least one valid frame
if (1 <= index) \
and self["frames"].done:
return True
return "Unable to get frame #%u" % index
except (InputStreamError, ParserError):
return "Unable to create frame #%u" % index
# Check first frame values
if not frame.isValid():
return "Frame #%u is invalid" % index
# Check that all frames are similar
if not index:
frame0 = frame
else:
if frame0["channel_mode"].value != frame["channel_mode"].value:
return "Frame #%u channel mode is different" % index
return True
def createFields(self):
# Read ID3v2 (if any)
if self.stream.readBytes(0, 3) == "ID3":
yield ID3v2(self, "id3v2")
if self._size is None: # TODO: is it possible to handle piped input?
raise NotImplementedError
# Check if file is ending with ID3v1 or not and compute frames size
frames_size = self.size - self.current_size
addr = self.size - 128 * 8
if 0 <= addr:
has_id3 = (self.stream.readBytes(addr, 3) == "TAG")
if has_id3:
frames_size -= 128 * 8
else:
has_id3 = False
# Read frames (if any)
if frames_size:
yield Frames(self, "frames", size=frames_size)
# Read ID3v1 (if any)
if has_id3:
yield ID3v1(self, "id3v1")
def createDescription(self):
if "frames" in self:
frame = self["frames/frame[0]"]
return "%s, %s" % (frame.description, frame["channel_mode"].display)
elif "id3v2" in self:
return self["id3v2"].description
elif "id3v1" in self:
return self["id3v1"].description
else:
return "MPEG audio"
def createContentSize(self):
# Get "frames" field
field = self[0]
if field.name != "frames":
try:
field = self[1]
except MissingField:
# File only contains ID3v1 or ID3v2
return field.size
# Error: second field are not the frames"?
if field.name != "frames":
return None
# Go to last frame
frames = field
frame = frames["frame[0]"]
address0 = field.absolute_address
size = address0 + frame.size
while True:
try:
# Parse one MPEG audio frame
frame = createOrphanField(frames, size - address0, Frame, "frame")
# Check frame 32 bits header
if not frame.isValid():
break
except Exception:
break
if MAX_FILESIZE < (size + frame.size):
break
size += frame.size
# ID3v1 at the end?
try:
if self.stream.readBytes(size, 3) == "TAG":
size += ID3v1.static_size
except InputStreamError:
pass
return size