mirror of
https://github.com/SickGear/SickGear.git
synced 2024-11-22 12:55:05 +00:00
e56303798c
Initial SickGear for Python 3.
645 lines
20 KiB
Python
645 lines
20 KiB
Python
#!/usr/bin/env python
|
|
# coding: utf-8
|
|
|
|
"""
|
|
A .torrent file parser for both Python 2 and 3
|
|
|
|
Usage:
|
|
|
|
data = parse_torrent_file(filename)
|
|
|
|
# or
|
|
|
|
with open(filename, 'rb') as f: # the binary mode 'b' is necessary
|
|
data = TorrentFileParser(f).parse()
|
|
|
|
# then you can edit the data
|
|
|
|
data['announce-list'].append(['http://127.0.0.1:8080'])
|
|
|
|
# and create a new torrent file from data
|
|
|
|
create_torrent_file('new.torrent', data)
|
|
|
|
# or
|
|
|
|
with open('new.torrent', 'wb') as f:
|
|
f.write(TorrentFileCreator(data).encode())
|
|
|
|
# or skip files entirely and work with in-memory objects
|
|
|
|
data = decode(b'i12345e') # data = 12345
|
|
content = encode(data) # content = b'i12345e'
|
|
|
|
"""
|
|
|
|
from __future__ import print_function, unicode_literals
|
|
|
|
import argparse
|
|
import binascii
|
|
import collections
|
|
import io
|
|
import json
|
|
import sys
|
|
import warnings
|
|
|
|
# --- Python 2 / 3 compatibility shims ---------------------------------------

# Python 2 has no FileNotFoundError; alias it to IOError there so that
# ``except FileNotFoundError`` works on both major versions.
try:
    FileNotFoundError
except NameError:
    # noinspection PyShadowingBuiltins
    FileNotFoundError = IOError

# chardet is optional: without it, fall back to a stub detector that warns
# and always reports utf-8.
try:
    # noinspection PyPackageRequirements
    from chardet import detect as _detect
except ImportError:
    def _detect(_):
        warnings.warn("No chardet module installed, encoding will be utf-8")
        return dict(encoding='utf-8', confidence=1)

# The text type: ``unicode`` on Python 2, ``str`` on Python 3.
str_type = type(u'')
|
|
|
|
# Names exported by a star-import of this module.
__all__ = [
    'InvalidTorrentDataException',
    'BEncoder', 'BDecoder',
    'encode', 'decode',
    'TorrentFileParser',
    'create_torrent_file', 'parse_torrent_file',
]

# Printed by the command line ``--version`` flag.
__version__ = '0.3.0'
|
|
|
|
|
|
def detect(content):
    """
    Guess the text encoding of ``content``.

    Delegates to ``_detect`` (chardet's ``detect`` when it could be imported,
    otherwise the utf-8 stub) and returns only the encoding name.

    :param bytes content: raw bytes to inspect
    :return: encoding name; 'utf-8' when the detector reports no encoding
    :rtype: str
    """
    # chardet reports ``'encoding': None`` for empty or undetectable input;
    # handing None to ``bytes.decode`` would raise TypeError, so fall back to
    # the same default the rest of the module uses.
    return _detect(content)['encoding'] or 'utf-8'
|
|
|
|
|
|
class InvalidTorrentDataException(Exception):
    """
    Raised when torrent (bencode) data cannot be parsed or encoded.

    :param pos: value substituted for the ``{pos}`` placeholder in ``msg``
    :param str msg: message template, may contain ``{pos}``; a default
        template is used when it is None or empty
    """

    def __init__(self, pos, msg=None):
        msg = msg or "Invalid torrent format when read at pos {pos}"
        try:
            msg = msg.format(pos=pos)
        except (KeyError, IndexError, ValueError, AttributeError):
            # Messages may embed arbitrary text (field names, raw bytes, other
            # exceptions' text) containing stray braces. Substitute the
            # placeholder literally instead of letting a formatting error
            # mask the real problem.
            msg = msg.replace('{pos}', str(pos))
        super(InvalidTorrentDataException, self).__init__(msg)
|
|
|
|
|
|
class __EndCls(object):
    """Type of the sentinel that marks the end of a bencoded list or dict."""


# Single shared instance; the parser compares against it with ``is``.
_END = __EndCls()
|
|
|
|
|
|
def _check_hash_field_params(name, value):
    """
    Validate one hash-field declaration.

    :param name: field name, must be text (``str_type``)
    :param value: must be a two-element tuple ``(block_length, need_list)``
        where ``block_length`` is a positive int and ``need_list`` is a bool
    :return: True when the declaration is valid, False otherwise
    :rtype: bool
    """
    if not isinstance(name, str_type):
        return False
    if not (isinstance(value, tuple) and len(value) == 2):
        return False
    block_length, need_list = value
    # bool is a subclass of int, so it has to be excluded explicitly. A zero
    # length would later divide by zero when the hash string is split into
    # blocks, and a negative one would silently produce no blocks at all.
    return (
        isinstance(block_length, int)
        and not isinstance(block_length, bool)
        and block_length > 0
        and isinstance(need_list, bool)
    )
|
|
|
|
|
|
class TorrentFileParser(object):
    """
    Parser for bencoded (.torrent) data read from a seekable binary
    file-like object.
    """

    TYPE_LIST = 'list'
    TYPE_DICT = 'dict'
    TYPE_INT = 'int'
    TYPE_STRING = 'string'
    TYPE_END = 'end'

    LIST_INDICATOR = b'l'
    DICT_INDICATOR = b'd'
    INT_INDICATOR = b'i'
    END_INDICATOR = b'e'
    # Strings have no leading marker; they start directly with their length.
    STRING_INDICATOR = b''
    STRING_DELIMITER = b':'

    HASH_FIELD_PARAMS = {
        # field length need_list
        'pieces': (20, True),
        'ed2k': (16, False),
        'filehash': (20, False),
    }

    # Tried in order by ``_next_type``. The empty string indicator matches
    # anything, so it has to stay last.
    TYPES = [
        (TYPE_LIST, LIST_INDICATOR),
        (TYPE_DICT, DICT_INDICATOR),
        (TYPE_INT, INT_INDICATOR),
        (TYPE_END, END_INDICATOR),
        (TYPE_STRING, STRING_INDICATOR),
    ]

    def __init__(
            self, fp, use_ordered_dict=False, encoding='utf-8', errors='strict',
            hash_fields=None, hash_raw=False,
    ):
        """
        :param fp: a **binary** file-like object to parse,
            which means 'b' mode is needed when using the built-in open
        :param bool use_ordered_dict: use collections.OrderedDict as the dict
            container; default False, which means the built-in dict is used
        :param str encoding: file content encoding, default utf-8, use 'auto'
            to enable charset auto detection (needs the 'chardet' package)
        :param str errors: how to deal with encoding errors when decoding
            strings from the content with ``encoding``
        :param Dict[str, Tuple[int, bool]] hash_fields: extra fields that
            should be treated as hash values. The dict key is the field name,
            the value is a two-element tuple (hash_block_length, as_a_list).
            See :any:`hash_field` for details
        :param bool hash_raw: when true, hash fields are returned as the raw
            bytes instead of being split into hex strings
        :raise ValueError: when ``fp`` has no ``read``/``seek`` attribute or
            an entry of ``hash_fields`` is malformed
        """
        # Supply a default to getattr: without it a missing attribute raises
        # AttributeError and the intended ValueError is never reached.
        if getattr(fp, 'read', None) is None \
                or getattr(fp, 'seek', None) is None:
            raise ValueError('Parameter fp needs a file like object')

        self._pos = 0
        self._encoding = encoding
        self._content = fp
        self._use_ordered_dict = use_ordered_dict
        self._error_handler = errors
        # Copy the class defaults so per-instance additions stay local.
        self._hash_fields = dict(TorrentFileParser.HASH_FIELD_PARAMS)
        if hash_fields is not None:
            for k, v in hash_fields.items():
                if _check_hash_field_params(k, v):
                    self._hash_fields[k] = v
                else:
                    raise ValueError(
                        "Invalid hash field parameter, it should be type of "
                        "Dict[str, Tuple[int, bool]]"
                    )
        self._hash_raw = bool(hash_raw)

    @staticmethod
    def _escape_braces(text):
        """Escape braces so ``text`` survives the exception's ``format``."""
        return text.replace('{', '{{').replace('}', '}}')

    def hash_field(self, name, block_length=20, need_list=False):
        """
        Let the field called ``name`` be treated as a hash value instead of
        being decoded as a string.

        :param str name: field name
        :param int block_length: hash block length used for splitting
        :param bool need_list: if True, a field with only one block (or even
            none) is parsed to a one-element (or empty) list; if False, the
            result is a plain string in the zero or one block case
        :return: self, so calls can be chained
        :raise ValueError: when the parameters are malformed
        """
        v = (block_length, need_list)
        if _check_hash_field_params(name, v):
            self._hash_fields[name] = v
        else:
            raise ValueError("Invalid hash field parameter")
        return self

    def parse(self):
        """
        Parse the whole input from its beginning.

        :rtype: dict|list|int|str|bytes
        :raise: :any:`InvalidTorrentDataException` when parsing fails or a
            string cannot be decoded with the configured encoding
        """
        self._restart()
        data = self._next_element()
        if data is _END:
            # A lone end marker is not a value; do not leak the sentinel.
            raise InvalidTorrentDataException(self._pos - 1)

        try:
            c = self._read_byte(1, True)
        except EOFError:  # expect EOF
            return data

        # The offending byte may itself be a brace, so escape the finished
        # text before the exception runs ``format`` over it.
        msg = 'Expect EOF, but get [{}] at pos {}'.format(c, self._pos)
        raise InvalidTorrentDataException(0, self._escape_braces(msg))

    def _read_byte(self, count=1, raise_eof=False):
        """
        Read exactly ``count`` bytes and advance the position counter.

        :param int count: number of bytes wanted, must not be negative
        :param bool raise_eof: raise EOFError instead of
            InvalidTorrentDataException when nothing at all could be read
        :rtype: bytes
        """
        assert count >= 0
        gotten = self._content.read(count)
        # A short read means the data ends inside an element; accepting it
        # would silently return a truncated value.
        if len(gotten) < count:
            if raise_eof and len(gotten) == 0:
                raise EOFError()
            raise InvalidTorrentDataException(
                self._pos,
                'Unexpected EOF when reading torrent file'
            )
        self._pos += count
        return gotten

    def _seek_back(self, count):
        """Move ``count`` bytes backwards, relative to the current offset."""
        self._content.seek(-count, 1)
        self._pos = self._pos - count

    def _restart(self):
        """Rewind the input to its beginning."""
        self._content.seek(0, 0)
        self._pos = 0

    def _dict_items_generator(self):
        """Yield the (key, value) pairs of a dict until its end marker."""
        while True:
            k = self._next_element()
            if k is _END:
                return
            if not isinstance(k, str_type):
                raise InvalidTorrentDataException(
                    self._pos, "Type of dict key can't be " + type(k).__name__
                )
            if k in self._hash_fields:
                v = self._next_hash(*self._hash_fields[k])
            else:
                v = self._next_element(k)
            if v is _END:
                # The dict was closed after a key without a value.
                raise InvalidTorrentDataException(self._pos - 1)
            # The torrent declares its own encoding; use it for the strings
            # that follow. Ignore non-text values, which cannot name a codec.
            if k == 'encoding' and isinstance(v, str_type):
                self._encoding = v
            yield k, v

    def _next_dict(self):
        """Parse a dict body into the configured dict container."""
        data = collections.OrderedDict() if self._use_ordered_dict else dict()
        for key, element in self._dict_items_generator():
            data[key] = element
        return data

    def _list_items_generator(self):
        """Yield the elements of a list until its end marker."""
        while True:
            element = self._next_element()
            if element is _END:
                return
            yield element

    def _next_list(self):
        """Parse a list body."""
        return list(self._list_items_generator())

    def _next_int(self, end=END_INDICATOR):
        """
        Read decimal digits up to (and including) the ``end`` byte.

        :param bytes end: terminating byte
        :rtype: int
        """
        value = 0
        neg = False
        first = True
        char = self._read_byte(1)
        while char != end:
            # A minus sign is only valid as the very first character.
            if first and char == b'-':
                neg = True
            elif not b'0' <= char <= b'9':
                raise InvalidTorrentDataException(self._pos - 1)
            else:
                value = value * 10 + int(char)
            first = False
            char = self._read_byte(1)
        return -value if neg else value

    def _next_string(self, need_decode=True, field=None):
        """
        Read a length-prefixed string.

        :param bool need_decode: decode to text when true, else return bytes
        :param field: name of the dict field being parsed, only used to make
            the decoding error message more helpful
        """
        length = self._next_int(self.STRING_DELIMITER)
        if length < 0:
            raise InvalidTorrentDataException(
                self._pos, "Invalid string length at pos {pos}"
            )
        raw = self._read_byte(length)
        if not need_decode:
            return raw

        encoding = self._encoding
        if encoding == 'auto':
            # NOTE(review): the result is stored on ``self.encoding``, not
            # ``self._encoding``, so detection runs again for every string -
            # confirm this is intended.
            self.encoding = encoding = detect(raw)
        try:
            return raw.decode(encoding, self._error_handler)
        except UnicodeDecodeError as e:
            msg = [
                "Fail to decode string at pos {pos} using encoding ",
                e.encoding
            ]
            if field:
                # The field name comes from the file and may contain braces.
                safe_field = self._escape_braces(field)
                msg.extend([
                    ' when parser field "', safe_field, '"'
                    ', maybe it is an hash field. ',
                    'You can use self.hash_field("', safe_field, '") ',
                    'to let it be treated as hash value, ',
                    'so this error may disappear'
                ])
            raise InvalidTorrentDataException(
                self._pos - length + e.start,
                ''.join(msg)
            )

    def _next_hash(self, p_len, need_list):
        """
        Read a hash field and split it into hex-encoded blocks.

        :param int p_len: length in bytes of one hash block
        :param bool need_list: always return a list, even for 0 or 1 blocks
        :return: the raw bytes when ``hash_raw`` is set, otherwise a list of
            hex strings, or a single string for 0/1 blocks without need_list
        """
        raw = self._next_string(need_decode=False)
        if len(raw) % p_len != 0:
            raise InvalidTorrentDataException(
                self._pos - len(raw), "Hash bit length not match at pos {pos}"
            )
        if self._hash_raw:
            return raw
        res = [
            binascii.hexlify(chunk).decode('ascii')
            for chunk in (raw[x:x+p_len] for x in range(0, len(raw), p_len))
        ]
        if len(res) == 0 and not need_list:
            return ''
        if len(res) == 1 and not need_list:
            return res[0]
        return res

    @staticmethod
    def _next_end():
        """Return the end-of-container sentinel."""
        return _END

    def _next_type(self):
        """Consume the type indicator and return the matching type name."""
        for (element_type, indicator) in self.TYPES:
            indicator_length = len(indicator)
            char = self._read_byte(indicator_length)
            if indicator == char:
                return element_type
            self._seek_back(indicator_length)
        raise InvalidTorrentDataException(self._pos)

    def _type_to_func(self, t):
        """Map a type name to the bound ``_next_<type>`` method."""
        return getattr(self, '_next_' + t)

    def _next_element(self, field=None):
        """
        Parse the next element of any type.

        :param field: name of the enclosing dict field, forwarded to the
            string parser for error reporting
        """
        element_type = self._next_type()
        # Compare by value: identity of equal strings is an implementation
        # detail.
        if element_type == TorrentFileParser.TYPE_STRING and field is not None:
            element = self._type_to_func(element_type)(field=field)
        else:
            element = self._type_to_func(element_type)()
        return element
|
|
|
|
|
|
class BEncoder(object):
    """Encoder turning Python objects into torrent file format (bencode)."""

    # Maps accepted Python types to the element type name used to pick the
    # ``_output_<type>`` method.
    TYPES = {
        (dict,): TorrentFileParser.TYPE_DICT,
        (list,): TorrentFileParser.TYPE_LIST,
        (int,): TorrentFileParser.TYPE_INT,
        (str_type, bytes): TorrentFileParser.TYPE_STRING,
    }

    def __init__(self, data, encoding='utf-8', hash_fields=None):
        """
        :param dict|list|int|str data: data to be encoded
        :param str encoding: string field output encoding
        :param List[str] hash_fields: extra names of dict fields whose values
            are hex-encoded hashes, see :any:`TorrentFileParser.__init__`
        """
        self._data = data
        self._encoding = encoding
        self._hash_fields = list(TorrentFileParser.HASH_FIELD_PARAMS.keys())
        if hash_fields is not None:
            if isinstance(hash_fields, str_type):
                # A single name: do not iterate over its characters.
                hash_fields = [hash_fields]
            # Convert each name on its own; converting the container would
            # add the characters of its repr instead of the field names.
            self._hash_fields.extend(str_type(name) for name in hash_fields)

    def hash_field(self, name):
        """
        see :any:`TorrentFileParser.hash_field`

        :param str name: field name to treat as a hash value
        :return: self, so calls can be chained
        """
        self._hash_fields.append(str_type(name))
        return self

    def encode(self):
        """
        Encode to bytes

        :rtype: bytes
        :raise: :any:`InvalidTorrentDataException` on unsupported data
        """
        return b''.join(self._output_element(self._data))

    def encode_to_filelike(self):
        """
        Encode to a file-like(BytesIO) object

        :rtype: BytesIO
        """
        return io.BytesIO(self.encode())

    def _output_string(self, data):
        """Yield the pieces of a length-prefixed string."""
        if isinstance(data, str_type):
            data = data.encode(self._encoding)
        yield str(len(data)).encode('ascii')
        yield TorrentFileParser.STRING_DELIMITER
        yield data

    @staticmethod
    def _output_int(data):
        """Yield the pieces of an integer element."""
        yield TorrentFileParser.INT_INDICATOR
        # ``int()`` makes bool (an int subclass) come out as 1/0 rather than
        # the invalid ``iTruee`` / ``iFalsee``.
        yield str(int(data)).encode('ascii')
        yield TorrentFileParser.END_INDICATOR

    def _output_decode_hash(self, data):
        """
        Yield a hash field as one string element.

        :param data: one hex string, a list of hex strings, or the raw bytes
            (the form the parser returns with ``hash_raw`` enabled)
        """
        if isinstance(data, bytes) and not isinstance(data, str_type):
            # Already raw; emit unchanged so raw-parsed data round-trips.
            for x in self._output_string(data):
                yield x
            return
        if isinstance(data, str_type):
            data = [data]
        result = []
        for hash_line in data:
            if not isinstance(hash_line, str_type):
                raise InvalidTorrentDataException(
                    None,
                    "Hash must be " + str_type.__name__ + " not " +
                    type(hash_line).__name__,
                )
            if len(hash_line) % 2 != 0:
                raise InvalidTorrentDataException(
                    None,
                    "Hash(" + hash_line + ") length(" + str(len(hash_line)) +
                    ") is a not even number",
                )
            try:
                raw = binascii.unhexlify(hash_line)
            except (binascii.Error, TypeError, ValueError) as e:
                # Non-hex input raises binascii.Error (TypeError on
                # Python 2); non-ASCII text raises a plain ValueError.
                raise InvalidTorrentDataException(
                    None, str(e),
                )
            result.append(raw)
        for x in self._output_string(b''.join(result)):
            yield x

    def _output_dict(self, data):
        """Yield the pieces of a dict; keys are emitted in iteration order."""
        yield TorrentFileParser.DICT_INDICATOR
        for k, v in data.items():
            if not isinstance(k, str_type):
                raise InvalidTorrentDataException(
                    None, "Dict key must be " + str_type.__name__,
                )
            for x in self._output_element(k):
                yield x
            if k in self._hash_fields:
                for x in self._output_decode_hash(v):
                    yield x
            else:
                for x in self._output_element(v):
                    yield x
        yield TorrentFileParser.END_INDICATOR

    def _output_list(self, data):
        """Yield the pieces of a list."""
        yield TorrentFileParser.LIST_INDICATOR
        for v in data:
            for x in self._output_element(v):
                yield x
        yield TorrentFileParser.END_INDICATOR

    def _type_to_func(self, t):
        """Map a type name to the bound ``_output_<type>`` method."""
        return getattr(self, '_output_' + t)

    def _output_element(self, data):
        """
        Return an iterator over the encoded pieces of ``data``.

        :raise: :any:`InvalidTorrentDataException` for unsupported types
        """
        for types, t in self.TYPES.items():
            if isinstance(data, types):
                # noinspection PyCallingNonCallable
                return self._type_to_func(t)(data)
        raise InvalidTorrentDataException(
            None,
            "Invalid type for torrent file: " + type(data).__name__,
        )
|
|
|
|
|
|
class BDecoder(object):
    """Decoder for bencoded bytes held in memory."""

    def __init__(
            self, data, use_ordered_dict=False, encoding='utf-8', errors='strict',
            hash_fields=None, hash_raw=False,
    ):
        """
        Wrap ``data`` in a BytesIO and hand it to a TorrentFileParser.

        See :any:`TorrentFileParser.__init__` for parameter description.

        :param bytes data: raw data to be decoded
        :param bool use_ordered_dict:
        :param str encoding:
        :param str errors:
        :param Dict[str, Tuple[int, bool]] hash_fields:
        :param bool hash_raw:
        """
        stream = io.BytesIO(bytes(data))
        self._parser = TorrentFileParser(
            stream,
            use_ordered_dict=use_ordered_dict,
            encoding=encoding,
            errors=errors,
            hash_fields=hash_fields,
            hash_raw=hash_raw,
        )

    def hash_field(self, name, block_length=20, need_dict=False):
        """
        See :any:`TorrentFileParser.hash_field` for parameter description

        :param name: field name
        :param block_length: hash block length
        :param need_dict: forwarded as the parser's ``need_list`` argument
        :return: return self, so you can chained call
        """
        self._parser.hash_field(
            name, block_length=block_length, need_list=need_dict,
        )
        return self

    def decode(self):
        """
        Parse the wrapped data.

        :return: whatever :any:`TorrentFileParser.parse` returns
        """
        parsed = self._parser.parse()
        return parsed
|
|
|
|
|
|
def encode(data, encoding='utf-8', hash_fields=None):
    """
    Shortcut: encode a Python object to torrent file format (bencode).

    See :any:`BEncoder.__init__` for parameter description

    :param dict|list|int|str|bytes data: data to be encoded
    :param str encoding:
    :param List[str] hash_fields:
    :rtype: bytes
    """
    encoder = BEncoder(data, encoding=encoding, hash_fields=hash_fields)
    return encoder.encode()
|
|
|
|
|
|
def decode(
        data, use_ordered_dict=False, encoding='utf-8', errors='strict',
        hash_fields=None, hash_raw=False,
):
    """
    Shortcut: decode bytes in torrent file format (bencode) to a Python
    object.

    See :any:`BDecoder.__init__` for parameter description

    :param bytes data: raw data to be decoded
    :param bool use_ordered_dict:
    :param str encoding:
    :param str errors:
    :param Dict[str, Tuple[int, bool]] hash_fields:
    :param bool hash_raw:
    :rtype: dict|list|int|str|bytes
    """
    decoder = BDecoder(
        data,
        use_ordered_dict=use_ordered_dict,
        encoding=encoding,
        errors=errors,
        hash_fields=hash_fields,
        hash_raw=hash_raw,
    )
    return decoder.decode()
|
|
|
|
|
|
def parse_torrent_file(
        filename, use_ordered_dict=False, encoding='utf-8', errors='strict',
        hash_fields=None, hash_raw=False,
):
    """
    Shortcut: open ``filename`` in binary mode and parse it with
    TorrentFileParser.

    See :any:`TorrentFileParser.__init__` for parameter description

    :param str filename: torrent filename
    :param bool use_ordered_dict:
    :param str encoding:
    :param str errors:
    :param Dict[str, Tuple[int, bool]] hash_fields:
    :param bool hash_raw:
    :rtype: dict|list|int|str|bytes
    """
    with open(filename, 'rb') as torrent:
        parser = TorrentFileParser(
            torrent,
            use_ordered_dict=use_ordered_dict,
            encoding=encoding,
            errors=errors,
            hash_fields=hash_fields,
            hash_raw=hash_raw,
        )
        # Parse while the file is still open; the parser reads lazily.
        return parser.parse()
|
|
|
|
|
|
def create_torrent_file(filename, data, encoding='utf-8', hash_fields=None):
    """
    Shortcut function for creating a torrent file using BEncoder

    see :any:`BEncoder.__init__` for parameter description

    :param str filename: output torrent filename
    :param dict|list|int|str|bytes data: data to be encoded
    :param str encoding:
    :param List[str] hash_fields:
    :raise: :any:`InvalidTorrentDataException` when ``data`` cannot be
        encoded; in that case ``filename`` is not touched
    """
    # Encode before opening: mode 'wb' truncates immediately, so a failing
    # encode would otherwise destroy an existing file or leave an empty one.
    content = BEncoder(data, encoding, hash_fields).encode()
    with open(filename, 'wb') as f:
        f.write(content)
|
|
|
|
|
|
def __main():
    """
    Command line entry point: parse a torrent file (or stdin when no file is
    given) and print the result as JSON.

    Exits with status 0 after printing the version for ``--version`` and
    with status 1 when the input file does not exist.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('file', nargs='?', default='',
                        help='input file, will read form stdin if empty')
    parser.add_argument('--dict', '-d', action='store_true', default=False,
                        help='use built-in dict, default will be OrderedDict')
    parser.add_argument('--sort', '-s', action='store_true', default=False,
                        help='sort output json item by key')
    parser.add_argument('--indent', '-i', type=int, default=None,
                        help='json output indent for every inner level')
    parser.add_argument('--ascii', '-a', action='store_true', default=False,
                        help='ensure output json use ascii char, '
                             'escape other char use \\u')
    parser.add_argument('--coding', '-c', default='utf-8',
                        help='string encoding, default "utf-8"')
    parser.add_argument('--errors', '-e', default='strict',
                        help='decoding error handler, default "strict", you can'
                             ' use "ignore" or "replace" to avoid exception')
    parser.add_argument('--version', '-v', action='store_true', default=False,
                        help='print version and exit')
    args = parser.parse_args()

    if args.version:
        print(__version__)
        # sys.exit rather than the bare ``exit``: the latter is injected by
        # the ``site`` module and is not guaranteed to exist.
        sys.exit(0)

    try:
        if args.file == '':
            # Python 3 exposes the binary stream as ``sys.stdin.buffer``;
            # Python 2's stdin has no such attribute and is used directly.
            target_file = io.BytesIO(
                getattr(sys.stdin, 'buffer', sys.stdin).read()
            )
        else:
            target_file = open(args.file, 'rb')
    except FileNotFoundError:
        sys.stderr.write('File "{}" not exist\n'.format(args.file))
        sys.exit(1)

    # noinspection PyUnboundLocalVariable
    try:
        data = TorrentFileParser(
            target_file, not args.dict, args.coding, args.errors
        ).parse()
    finally:
        # Release the file handle even when parsing fails.
        target_file.close()

    data = json.dumps(
        data, ensure_ascii=args.ascii,
        sort_keys=args.sort, indent=args.indent
    )

    print(data)


if __name__ == '__main__':
    __main()
|