mirror of
https://github.com/SickGear/SickGear.git
synced 2024-11-15 01:15:05 +00:00
e56303798c
Initial SickGear for Python 3.
348 lines
9.3 KiB
Python
348 lines
9.3 KiB
Python
# The contents of this file are subject to the BitTorrent Open Source License
|
|
# Version 1.1 (the License). You may not copy or use this file, in either
|
|
# source code or executable form, except in compliance with the License. You
|
|
# may obtain a copy of the License at http://www.bittorrent.com/license/.
|
|
#
|
|
# Software distributed under the License is distributed on an AS IS basis,
|
|
# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
|
|
# for the specific language governing rights and limitations under the
|
|
# License.
|
|
|
|
# Written by Petru Paler
|
|
|
|
"""bencode.py - bencode encoder + decoder."""
|
|
|
|
from .BTL import BTFailure
|
|
from .exceptions import BencodeDecodeError
|
|
|
|
from collections import deque
|
|
import sys
|
|
|
|
try:
|
|
from typing import Dict, List, Tuple, Deque, Union, TextIO, BinaryIO, Any
|
|
except ImportError:
|
|
Dict = List = Tuple = Deque = Union = TextIO = BinaryIO = Any = None
|
|
|
|
try:
|
|
from collections import OrderedDict
|
|
except ImportError:
|
|
OrderedDict = None
|
|
|
|
try:
|
|
import pathlib
|
|
except ImportError:
|
|
pathlib = None
|
|
|
|
__all__ = (
|
|
'BTFailure',
|
|
'BencodeDecodeError',
|
|
'bencode',
|
|
'bdecode',
|
|
'bread',
|
|
'bwrite',
|
|
'encode',
|
|
'decode'
|
|
)
|
|
|
|
|
|
def decode_int(x, f):
|
|
# type: (bytes, int) -> Tuple[int, int]
|
|
f += 1
|
|
newf = x.index(b'e', f)
|
|
n = int(x[f:newf])
|
|
|
|
if x[f:f + 1] == b'-':
|
|
if x[f + 1:f + 2] == b'0':
|
|
raise ValueError
|
|
elif x[f:f + 1] == b'0' and newf != f + 1:
|
|
raise ValueError
|
|
|
|
return n, newf + 1
|
|
|
|
|
|
def decode_string(x, f, try_decode_utf8=True, force_decode_utf8=False):
|
|
# type: (bytes, int, bool, bool) -> Tuple[bytes, int]
|
|
"""Decode torrent bencoded 'string' in x starting at f.
|
|
|
|
An attempt is made to convert the string to a python string from utf-8.
|
|
However, both string and non-string binary data is intermixed in the
|
|
torrent bencoding standard. So we have to guess whether the byte
|
|
sequence is a string or just binary data. We make this guess by trying
|
|
to decode (from utf-8), and if that fails, assuming it is binary data.
|
|
There are some instances where the data SHOULD be a string though.
|
|
You can check enforce this by setting force_decode_utf8 to True. If the
|
|
decoding from utf-8 fails, an UnidcodeDecodeError is raised. Similarly,
|
|
if you know it should not be a string, you can skip the decoding
|
|
attempt by setting try_decode_utf8=False.
|
|
"""
|
|
colon = x.index(b':', f)
|
|
n = int(x[f:colon])
|
|
|
|
if x[f:f + 1] == b'0' and colon != f + 1:
|
|
raise ValueError
|
|
|
|
colon += 1
|
|
s = x[colon:colon + n]
|
|
|
|
if try_decode_utf8:
|
|
try:
|
|
return s.decode('utf-8'), colon + n
|
|
except UnicodeDecodeError:
|
|
if force_decode_utf8:
|
|
raise
|
|
|
|
return bytes(s), colon + n
|
|
|
|
|
|
def decode_list(x, f):
|
|
# type: (bytes, int) -> Tuple[List, int]
|
|
r, f = [], f + 1
|
|
|
|
while x[f:f + 1] != b'e':
|
|
v, f = decode_func[x[f:f + 1]](x, f)
|
|
r.append(v)
|
|
|
|
return r, f + 1
|
|
|
|
|
|
def decode_dict_py26(x, f):
|
|
# type: (bytes, int) -> Tuple[Dict[str, Any], int]
|
|
r, f = {}, f + 1
|
|
|
|
while x[f] != 'e':
|
|
k, f = decode_string(x, f)
|
|
r[k], f = decode_func[x[f]](x, f)
|
|
|
|
return r, f + 1
|
|
|
|
|
|
def decode_dict(x, f, force_sort=True):
|
|
# type: (bytes, int, bool) -> Tuple[OrderedDict[str, Any], int]
|
|
"""Decode bencoded data to an OrderedDict.
|
|
|
|
The BitTorrent standard states that:
|
|
Keys must be strings and appear in sorted order (sorted as raw
|
|
strings, not alphanumerics)
|
|
- http://www.bittorrent.org/beps/bep_0003.html
|
|
|
|
Therefore, this function will force the keys to be strings (decoded
|
|
from utf-8), and by default the keys are (re)sorted after reading.
|
|
Set force_sort to False to keep the order of the dictionary as
|
|
represented in x, as many other encoders and decoders do not force this
|
|
property.
|
|
"""
|
|
|
|
r, f = OrderedDict(), f + 1
|
|
|
|
while x[f:f + 1] != b'e':
|
|
k, f = decode_string(x, f, force_decode_utf8=True)
|
|
r[k], f = decode_func[x[f:f + 1]](x, f)
|
|
|
|
if force_sort:
|
|
r = OrderedDict(sorted(r.items()))
|
|
|
|
return r, f + 1
|
|
|
|
|
|
# noinspection PyDictCreation
|
|
decode_func = {}
|
|
decode_func[b'l'] = decode_list
|
|
decode_func[b'i'] = decode_int
|
|
decode_func[b'0'] = decode_string
|
|
decode_func[b'1'] = decode_string
|
|
decode_func[b'2'] = decode_string
|
|
decode_func[b'3'] = decode_string
|
|
decode_func[b'4'] = decode_string
|
|
decode_func[b'5'] = decode_string
|
|
decode_func[b'6'] = decode_string
|
|
decode_func[b'7'] = decode_string
|
|
decode_func[b'8'] = decode_string
|
|
decode_func[b'9'] = decode_string
|
|
|
|
if sys.version_info[0] == 2 and sys.version_info[1] == 6:
|
|
decode_func[b'd'] = decode_dict_py26
|
|
else:
|
|
decode_func[b'd'] = decode_dict
|
|
|
|
|
|
def bdecode(value):
|
|
# type: (bytes) -> Union[Tuple, List, OrderedDict, bool, int, str, bytes]
|
|
"""
|
|
Decode bencode formatted byte string ``value``.
|
|
|
|
:param value: Bencode formatted string
|
|
:type value: bytes
|
|
|
|
:return: Decoded value
|
|
:rtype: object
|
|
"""
|
|
try:
|
|
r, l = decode_func[value[0:1]](value, 0)
|
|
except (IndexError, KeyError, TypeError, ValueError):
|
|
raise BencodeDecodeError("not a valid bencoded string")
|
|
|
|
if l != len(value):
|
|
raise BencodeDecodeError("invalid bencoded value (data after valid prefix)")
|
|
|
|
return r
|
|
|
|
|
|
class Bencached(object):
|
|
__slots__ = ['bencoded']
|
|
|
|
def __init__(self, s):
|
|
self.bencoded = s
|
|
|
|
|
|
def encode_bencached(x, r):
|
|
# type: (Bencached, Deque[bytes]) -> None
|
|
r.append(x.bencoded)
|
|
|
|
|
|
def encode_int(x, r):
|
|
# type: (int, Deque[bytes]) -> None
|
|
r.extend((b'i', str(x).encode('utf-8'), b'e'))
|
|
|
|
|
|
def encode_bool(x, r):
|
|
# type: (bool, Deque[bytes]) -> None
|
|
if x:
|
|
encode_int(1, r)
|
|
else:
|
|
encode_int(0, r)
|
|
|
|
|
|
def encode_bytes(x, r):
|
|
# type: (bytes, Deque[bytes]) -> None
|
|
r.extend((str(len(x)).encode('utf-8'), b':', x))
|
|
|
|
|
|
def encode_string(x, r):
|
|
# type: (str, Deque[bytes]) -> None
|
|
try:
|
|
s = x.encode('utf-8')
|
|
except UnicodeDecodeError:
|
|
encode_bytes(x, r)
|
|
return
|
|
|
|
r.extend((str(len(s)).encode('utf-8'), b':', s))
|
|
|
|
|
|
def encode_list(x, r):
|
|
# type: (List, Deque[bytes]) -> None
|
|
r.append(b'l')
|
|
|
|
for i in x:
|
|
encode_func[type(i)](i, r)
|
|
|
|
r.append(b'e')
|
|
|
|
|
|
def encode_dict(x, r):
|
|
# type: (Dict, Deque[bytes]) -> None
|
|
r.append(b'd')
|
|
ilist = list(x.items())
|
|
ilist.sort()
|
|
|
|
for k, v in ilist:
|
|
k = k.encode('utf-8')
|
|
r.extend((str(len(k)).encode('utf-8'), b':', k))
|
|
encode_func[type(v)](v, r)
|
|
|
|
r.append(b'e')
|
|
|
|
|
|
# noinspection PyDictCreation
|
|
encode_func = {}
|
|
encode_func[Bencached] = encode_bencached
|
|
|
|
if sys.version_info[0] == 2:
|
|
from types import DictType, IntType, ListType, LongType, StringType, TupleType, UnicodeType
|
|
|
|
encode_func[DictType] = encode_dict
|
|
encode_func[IntType] = encode_int
|
|
encode_func[ListType] = encode_list
|
|
encode_func[LongType] = encode_int
|
|
encode_func[StringType] = encode_string
|
|
encode_func[TupleType] = encode_list
|
|
encode_func[UnicodeType] = encode_string
|
|
|
|
if OrderedDict is not None:
|
|
encode_func[OrderedDict] = encode_dict
|
|
|
|
try:
|
|
from types import BooleanType
|
|
|
|
encode_func[BooleanType] = encode_bool
|
|
except ImportError:
|
|
pass
|
|
else:
|
|
encode_func[OrderedDict] = encode_dict
|
|
encode_func[bool] = encode_bool
|
|
encode_func[dict] = encode_dict
|
|
encode_func[int] = encode_int
|
|
encode_func[list] = encode_list
|
|
encode_func[str] = encode_string
|
|
encode_func[tuple] = encode_list
|
|
encode_func[bytes] = encode_bytes
|
|
|
|
|
|
def bencode(value):
|
|
# type: (Union[Tuple, List, OrderedDict, Dict, bool, int, str, bytes]) -> bytes
|
|
"""
|
|
Encode ``value`` into the bencode format.
|
|
|
|
:param value: Value
|
|
:type value: object
|
|
|
|
:return: Bencode formatted string
|
|
:rtype: str
|
|
"""
|
|
r = deque() # makes more sense for something with lots of appends
|
|
|
|
# Encode provided value
|
|
encode_func[type(value)](value, r)
|
|
|
|
# Join parts
|
|
return b''.join(r)
|
|
|
|
|
|
# Method proxies (for compatibility with other libraries)
|
|
decode = bdecode
|
|
encode = bencode
|
|
|
|
|
|
def bread(fd):
|
|
# type: (Union[bytes, str, pathlib.Path, pathlib.PurePath, TextIO, BinaryIO]) -> bytes
|
|
"""Return bdecoded data from filename, file, or file-like object.
|
|
|
|
if fd is a bytes/string or pathlib.Path-like object, it is opened and
|
|
read, otherwise .read() is used. if read() not available, exception
|
|
raised.
|
|
"""
|
|
if isinstance(fd, (bytes, str)):
|
|
with open(fd, 'rb') as fd:
|
|
return bdecode(fd.read())
|
|
elif pathlib is not None and isinstance(fd, (pathlib.Path, pathlib.PurePath)):
|
|
with open(str(fd), 'rb') as fd:
|
|
return bdecode(fd.read())
|
|
else:
|
|
return bdecode(fd.read())
|
|
|
|
|
|
def bwrite(data, fd):
|
|
# type: (Union[Tuple, List, OrderedDict, Dict, bool, int, str, bytes], Union[bytes, str, pathlib.Path, pathlib.PurePath, TextIO, BinaryIO]) -> None
|
|
"""Write data in bencoded form to filename, file, or file-like object.
|
|
|
|
if fd is bytes/string or pathlib.Path-like object, it is opened and
|
|
written to, otherwise .write() is used. if write() is not available,
|
|
exception raised.
|
|
"""
|
|
if isinstance(fd, (bytes, str)):
|
|
with open(fd, 'wb') as fd:
|
|
fd.write(bencode(data))
|
|
elif pathlib is not None and isinstance(fd, (pathlib.Path, pathlib.PurePath)):
|
|
with open(str(fd), 'wb') as fd:
|
|
fd.write(bencode(data))
|
|
else:
|
|
fd.write(bencode(data))
|