mirror of
https://github.com/SickGear/SickGear.git
synced 2024-12-18 16:53:38 +00:00
316 lines
10 KiB
Python
316 lines
10 KiB
Python
|
# coding=utf-8
|
||
|
#
|
||
|
# This file is part of SickGear.
|
||
|
#
|
||
|
# SickGear is free software: you can redistribute it and/or modify
|
||
|
# it under the terms of the GNU General Public License as published by
|
||
|
# the Free Software Foundation, either version 3 of the License, or
|
||
|
# (at your option) any later version.
|
||
|
#
|
||
|
# SickGear is distributed in the hope that it will be useful,
|
||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||
|
# GNU General Public License for more details.
|
||
|
#
|
||
|
# You should have received a copy of the GNU General Public License
|
||
|
# along with SickGear. If not, see <http://www.gnu.org/licenses/>.
|
||
|
|
||
|
import datetime
|
||
|
from collections import deque
|
||
|
from itertools import islice
|
||
|
from sys import version_info
|
||
|
|
||
|
from six import binary_type, moves
|
||
|
# noinspection PyUnresolvedReferences
|
||
|
from six.moves.urllib.parse import quote, quote_plus, unquote as six_unquote, unquote_plus as six_unquote_plus, \
|
||
|
urlencode, urlsplit, urlunparse, urlunsplit
|
||
|
|
||
|
# noinspection PyUnreachableCode
|
||
|
if False:
|
||
|
# ----------------------
|
||
|
# resolve typing imports
|
||
|
# ----------------------
|
||
|
# noinspection PyUnresolvedReferences
|
||
|
from typing import Any, AnyStr, Callable, Dict, Iterable, Iterator, List, Optional, Tuple, Union
|
||
|
# -------------------
|
||
|
# resolve pyc imports
|
||
|
# -------------------
|
||
|
# noinspection PyTypeChecker
|
||
|
quote = quote_plus = None # type: Callable
|
||
|
# noinspection PyTypeChecker
|
||
|
urlencode = urlsplit = urlunparse = urlunsplit = None # type: Callable
|
||
|
|
||
|
PY38 = version_info[0:2] >= (3, 8)
|
||
|
|
||
|
""" one off consumables (Iterators) """
|
||
|
filter_iter = moves.filter # type: Callable[[Callable, Iterable], Iterator]
|
||
|
map_iter = moves.map # type: Callable[[Callable, ...], Iterator]
|
||
|
|
||
|
|
||
|
def map_consume(*args):
|
||
|
# type: (...) -> None
|
||
|
"""Run a lambda over elements without returning anything"""
|
||
|
deque(moves.map(*args), maxlen=0)
|
||
|
|
||
|
|
||
|
def consume(iterator, n=None):
|
||
|
# type: (Iterator, Optional[int]) -> None
|
||
|
"""Advance the iterator n-steps ahead. If n is None, consume entirely. Returns nothing.
|
||
|
|
||
|
Useful if a method returns a Iterator but it's not used, but still all should be called,
|
||
|
for example if each iter element calls a function that should be called for all or
|
||
|
given amount of elements in Iterator
|
||
|
|
||
|
examples:
|
||
|
consume(filter_iter(...)) # consumes all elements of given function that returns a Iterator
|
||
|
consume(filter_iter(...), 3) # consumes next 3 elements of given function that returns a Iterator
|
||
|
"""
|
||
|
# Use functions that consume iterators at C speed.
|
||
|
if n is None:
|
||
|
# feed the entire iterator into a zero-length deque
|
||
|
deque(iterator, maxlen=0)
|
||
|
else:
|
||
|
# advance to the empty slice starting at position n
|
||
|
next(islice(iterator, n, n), None)
|
||
|
|
||
|
|
||
|
def decode_str(s, encoding='utf-8', errors=None):
|
||
|
# type: (...) -> AnyStr
|
||
|
if isinstance(s, binary_type):
|
||
|
if None is errors:
|
||
|
return s.decode(encoding)
|
||
|
return s.decode(encoding, errors)
|
||
|
return s
|
||
|
|
||
|
|
||
|
def html_unescape(s):
|
||
|
# type: (AnyStr) -> AnyStr
|
||
|
"""helper to remove special character quoting"""
|
||
|
if (3, 4) > version_info:
|
||
|
# noinspection PyUnresolvedReferences
|
||
|
from six.moves.html_parser import HTMLParser
|
||
|
# noinspection PyDeprecation
|
||
|
return HTMLParser().unescape(s)
|
||
|
|
||
|
# noinspection PyCompatibility,PyUnresolvedReferences
|
||
|
from html import unescape
|
||
|
return unescape(s)
|
||
|
|
||
|
|
||
|
def list_range(*args, **kwargs):
|
||
|
# type: (...) -> List
|
||
|
return list(moves.range(*args, **kwargs))
|
||
|
|
||
|
|
||
|
def urlparse(url, scheme='', allow_fragments=True):
|
||
|
"""return ParseResult where netloc is populated from path if required, no need to test .netloc anymore"""
|
||
|
# noinspection PyUnresolvedReferences
|
||
|
from six.moves.urllib.parse import urlparse as _urlparse, ParseResult
|
||
|
parsed_url = _urlparse(url, scheme, allow_fragments)
|
||
|
if '' != parsed_url.netloc:
|
||
|
return parsed_url
|
||
|
# fix occasional cases where '' == netloc and its data is in parsed_result.path
|
||
|
# noinspection PyArgumentList
|
||
|
fix = ParseResult(scheme=parsed_url.scheme, netloc=parsed_url.path, path=url,
|
||
|
params=parsed_url.params, query=parsed_url.query, fragment=parsed_url.fragment)
|
||
|
return fix
|
||
|
|
||
|
|
||
|
def make_btih(s):
|
||
|
from base64 import b16encode, b32decode
|
||
|
return decode_str(b16encode(b32decode(s)))
|
||
|
|
||
|
|
||
|
def b64decodestring(s):
|
||
|
# type: (Union[bytes, AnyStr]) -> AnyStr
|
||
|
from base64 import b64decode
|
||
|
return decode_str(b64decode(s))
|
||
|
|
||
|
|
||
|
def b64encodestring(s, keep_eol=False):
|
||
|
# type: (AnyStr, Union[bool]) -> AnyStr
|
||
|
data = decode_str(b64encodebytes(decode_bytes(s)))
|
||
|
if keep_eol:
|
||
|
return data
|
||
|
return data.rstrip()
|
||
|
|
||
|
|
||
|
if 2 != version_info[0]:
|
||
|
# ---------
|
||
|
# Python 3+
|
||
|
# ---------
|
||
|
# noinspection PyUnresolvedReferences,PyProtectedMember
|
||
|
from base64 import decodebytes, encodebytes
|
||
|
b64decodebytes = decodebytes
|
||
|
b64encodebytes = encodebytes
|
||
|
# noinspection PyUnresolvedReferences,PyCompatibility
|
||
|
from configparser import ConfigParser
|
||
|
# noinspection PyUnresolvedReferences
|
||
|
from enum import Enum
|
||
|
# noinspection PyUnresolvedReferences
|
||
|
from os import scandir, DirEntry
|
||
|
# noinspection PyUnresolvedReferences
|
||
|
from itertools import zip_longest
|
||
|
# noinspection PyUnresolvedReferences
|
||
|
from inspect import getfullargspec as getargspec
|
||
|
|
||
|
# noinspection PyUnresolvedReferences
|
||
|
from subprocess import Popen
|
||
|
|
||
|
# noinspection PyUnresolvedReferences, PyPep8Naming
|
||
|
import xml.etree.ElementTree as etree
|
||
|
|
||
|
ordered_dict = dict
|
||
|
|
||
|
native_timestamp = datetime.datetime.timestamp # type: Callable[[datetime.datetime], float]
|
||
|
|
||
|
def unquote(string, encoding='utf-8', errors='replace'):
|
||
|
return decode_str(six_unquote(decode_str(string, encoding, errors), encoding=encoding, errors=errors),
|
||
|
encoding, errors)
|
||
|
|
||
|
def unquote_plus(string, encoding='utf-8', errors='replace'):
|
||
|
return decode_str(six_unquote_plus(decode_str(string, encoding, errors), encoding=encoding, errors=errors),
|
||
|
encoding, errors)
|
||
|
|
||
|
def decode_bytes(d, encoding='utf-8', errors='replace'):
|
||
|
if not isinstance(d, binary_type):
|
||
|
# noinspection PyArgumentList
|
||
|
return bytes(d, encoding=encoding, errors=errors)
|
||
|
return d
|
||
|
|
||
|
def filter_list(*args):
|
||
|
# type: (...) -> List
|
||
|
return list(filter(*args))
|
||
|
|
||
|
def list_items(d):
|
||
|
# type: (Dict) -> List[Tuple[Any, Any]]
|
||
|
"""
|
||
|
equivalent to python 2 .items()
|
||
|
"""
|
||
|
return list(d.items())
|
||
|
|
||
|
def list_keys(d):
|
||
|
# type: (Dict) -> List
|
||
|
"""
|
||
|
equivalent to python 2 .keys()
|
||
|
"""
|
||
|
return list(d)
|
||
|
|
||
|
def list_values(d):
|
||
|
# type: (Dict) -> List
|
||
|
"""
|
||
|
equivalent to python 2 .values()
|
||
|
"""
|
||
|
return list(d.values())
|
||
|
|
||
|
def map_list(*args):
|
||
|
# type: (...) -> List
|
||
|
return list(map(*args))
|
||
|
|
||
|
def map_none(*args):
|
||
|
# type: (...) -> List
|
||
|
return list(zip_longest(*args))
|
||
|
|
||
|
def unidecode(data):
|
||
|
# type: (AnyStr) -> AnyStr
|
||
|
return data
|
||
|
|
||
|
else:
|
||
|
# ---------
|
||
|
# Python 2
|
||
|
# ---------
|
||
|
import time
|
||
|
from lib.unidecode import unidecode as unicode_decode
|
||
|
# noinspection PyProtectedMember,PyDeprecation
|
||
|
from base64 import decodestring, encodestring
|
||
|
# noinspection PyDeprecation
|
||
|
b64decodebytes = decodestring
|
||
|
# noinspection PyDeprecation
|
||
|
b64encodebytes = encodestring
|
||
|
# noinspection PyUnresolvedReferences
|
||
|
from lib.backports.configparser import ConfigParser
|
||
|
# noinspection PyUnresolvedReferences
|
||
|
from lib.enum34 import Enum
|
||
|
# noinspection PyProtectedMember,PyUnresolvedReferences
|
||
|
from lib.scandir.scandir import scandir, GenericDirEntry as DirEntry
|
||
|
# noinspection PyUnresolvedReferences,PyDeprecation
|
||
|
from inspect import getargspec
|
||
|
|
||
|
try:
|
||
|
# noinspection PyPep8Naming
|
||
|
import xml.etree.cElementTree as etree
|
||
|
except ImportError:
|
||
|
# noinspection PyPep8Naming
|
||
|
import xml.etree.ElementTree as etree
|
||
|
|
||
|
from collections import OrderedDict
|
||
|
ordered_dict = OrderedDict
|
||
|
|
||
|
def _totimestamp(dt=None):
|
||
|
# type: (datetime.datetime) -> float
|
||
|
""" This function should only be used in this module due to its 1970s+ limitation as that's all we need here and
|
||
|
sgdatatime can't be used at this module level
|
||
|
"""
|
||
|
return time.mktime(dt.timetuple())
|
||
|
|
||
|
native_timestamp = _totimestamp # type: Callable[[datetime.datetime], float]
|
||
|
|
||
|
from subprocess import Popen as _Popen
|
||
|
|
||
|
class Popen(_Popen):
|
||
|
|
||
|
def __enter__(self):
|
||
|
return self
|
||
|
|
||
|
def __exit__(self, *args, **kwargs):
|
||
|
for x in filter_iter(lambda y: y, [self.stdout, self.stderr, self.stdin]):
|
||
|
x.close()
|
||
|
self.wait()
|
||
|
|
||
|
def unquote(string, encoding='utf-8', errors='replace'):
|
||
|
return decode_str(six_unquote(decode_str(string, encoding, errors)), encoding, errors)
|
||
|
|
||
|
def unquote_plus(string, encoding='utf-8', errors='replace'):
|
||
|
return decode_str(six_unquote_plus(decode_str(string, encoding, errors)), encoding, errors)
|
||
|
|
||
|
# noinspection PyUnusedLocal
|
||
|
def decode_bytes(d, encoding='utf-8', errors='replace'):
|
||
|
if not isinstance(d, binary_type):
|
||
|
return bytes(d)
|
||
|
return d
|
||
|
|
||
|
def filter_list(*args):
|
||
|
# type: (...) -> List
|
||
|
# noinspection PyTypeChecker
|
||
|
return filter(*args)
|
||
|
|
||
|
def list_items(d):
|
||
|
# type: (Dict) -> List[Tuple[Any, Any]]
|
||
|
# noinspection PyTypeChecker
|
||
|
return d.items()
|
||
|
|
||
|
def list_keys(d):
|
||
|
# type: (Dict) -> List
|
||
|
# noinspection PyTypeChecker
|
||
|
return d.keys()
|
||
|
|
||
|
def list_values(d):
|
||
|
# type: (Dict) -> List
|
||
|
# noinspection PyTypeChecker
|
||
|
return d.values()
|
||
|
|
||
|
def map_list(*args):
|
||
|
# type: (...) -> List
|
||
|
# noinspection PyTypeChecker
|
||
|
return map(*args)
|
||
|
|
||
|
def map_none(*args):
|
||
|
# type: (...) -> List
|
||
|
# noinspection PyTypeChecker
|
||
|
return map(None, *args)
|
||
|
|
||
|
def unidecode(data):
|
||
|
# type: (AnyStr) -> AnyStr
|
||
|
# noinspection PyUnresolvedReferences
|
||
|
return isinstance(data, unicode) and unicode_decode(data) or data
|