SickGear/sickgear/classes.py

518 lines
14 KiB
Python
Raw Permalink Normal View History

#
# This file is part of SickGear.
#
# SickGear is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# SickGear is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with SickGear. If not, see <http://www.gnu.org/licenses/>.
from collections import OrderedDict
import copy
import datetime
import os
import re
import threading
import sickgear
from ._legacy_classes import LegacySearchResult, LegacyProper
from .common import Quality
from six import integer_types, iteritems, string_types
# noinspection PyUnreachableCode
if False:
from typing import Any, AnyStr, Callable, Dict, List, Optional
class SearchResult(LegacySearchResult):
"""
Represents a search result from an indexer.
"""
# type of result (overwritten in subclass)
resultType = 'generic'
def __init__(self, ep_obj_list):
# type: (Optional[List[sickgear.tv.TVEpisode]]) -> None
"""
:param ep_obj_list: list of episode objs
"""
# noinspection PyTypeChecker
self.provider = -1 # type: sickgear.providers.generic.GenericProvider
# release show object
self._show_obj = None
# URL to the NZB/torrent file
self.url = '' # type: AnyStr
# used by some providers to store extra info associated with the result
self.extraInfo = []
# assign function to get the data for the download
self.get_data_func = None # type: Callable or None
# assign function for after getting the download data
self.after_get_data_func = None # type: Callable or None
# list of TVEpisode objects that this result is associated with
self.ep_obj_list = ep_obj_list # type: Optional[List[sickgear.tv.TVEpisode]]
# quality of the release
self.quality = Quality.UNKNOWN # type: int
# release name
self.name = '' # type: AnyStr
# size of the release (-1 = n/a)
self.size = -1 # type: int
# release group
self.release_group = '' # type: AnyStr
# version
self.version = -1 # type: int
# proper level
self._properlevel = 0 # type: int
# is a repack
self.is_repack = False # type: bool
# provider unique id
self.puid = None # type: Any
# path to cache file
self.cache_filepath = '' # type: AnyStr
# priority of result
# -1 = low, 0 = normal, 1 = high
self.priority = 0 # type: int
@property
def show_obj(self):
# type: (...) -> Optional[sickgear.tv.TVShow]
return self._show_obj
@show_obj.setter
def show_obj(self, val):
# type: (sickgear.tv.TVShow) -> None
self._show_obj = val
@property
def properlevel(self):
"""
:rtype: int or long
"""
return self._properlevel
@properlevel.setter
def properlevel(self, v):
"""
:param v: proper level
:type v: int or long
"""
if isinstance(v, integer_types):
self._properlevel = v
def __str__(self):
if None is self.provider:
return 'Invalid provider, unable to print self'
return '\n'.join([
'%s @ %s' % (self.provider.name, self.url),
'Extra Info:',
'\n'.join([' %s' % x for x in self.extraInfo]),
'Episode: %s' % self.ep_obj_list,
'Quality: %s' % Quality.qualityStrings[self.quality],
'Name: %s' % self.name,
'Size: %s' % self.size,
'Release Group: %s' % self.release_group])
def get_data(self):
"""
:return: None or data
:rtype: Any
"""
if None is not self.get_data_func:
try:
return self.get_data_func(self.url)
except (BaseException, Exception):
pass
if self.extraInfo and 0 < len(self.extraInfo):
return self.extraInfo[0]
return None
class NZBSearchResult(SearchResult):
"""
Regular NZB result with a URL to the NZB
"""
resultType = 'nzb'
class NZBDataSearchResult(SearchResult):
"""
NZB result where the actual NZB XML data is stored in the extraInfo
"""
resultType = 'nzbdata'
class TorrentSearchResult(SearchResult):
"""
Torrent result with a URL to the torrent
"""
resultType = 'torrent'
# torrent hash
content = None
hash = None
provider = None # type: sickgear.providers.generic.TorrentProvider
class ShowInfoFilter(object):
def __init__(self, config, log=None):
self.config = config
self.log = log
self.bad_names = [re.compile('(?i)%s' % r) for r in (
r'[*]+\s*(?:403:|do not add|dupli[^s]+\s*(?:\d+|<a\s|[*])|inval)',
r'(?:inval|not? allow(ed)?)(?:[,\s]*period)?\s*[*]',
r'[*]+\s*dupli[^\s*]+\s*[*]+\s*(?:\d+|<a\s)',
r'\s(?:dupli[^s]+\s*(?:\d+|<a\s|[*]))'
)]
def _is_bad_name(self, show_info):
return isinstance(show_info, dict) \
and 'seriesname' in show_info \
and isinstance(show_info['seriesname'], string_types) \
and any(x.search(show_info['seriesname']) for x in self.bad_names)
@staticmethod
def _fix_firstaired(show_info):
if 'firstaired' not in show_info:
show_info['firstaired'] = '1900-01-01'
@staticmethod
def _dict_prevent_none(d, key, default):
v = None
if isinstance(d, dict):
v = d.get(key, default)
return (v, default)[None is v]
@staticmethod
def _fix_seriesname(show_info):
if isinstance(show_info, dict) \
and 'seriesname' in show_info \
and isinstance(show_info['seriesname'], string_types):
show_info['seriesname'] = ShowInfoFilter._dict_prevent_none(show_info, 'seriesname', '').strip()
class AllShowInfosNoFilterListUI(ShowInfoFilter):
"""
This class is for indexer api. Used for searching.
"""
def __init__(self, config, log=None):
super(AllShowInfosNoFilterListUI, self).__init__(config, log)
def select_series(self, all_series):
search_results = []
# get all available shows
if all_series:
for cur_show_info in all_series:
self._fix_seriesname(cur_show_info)
if cur_show_info in search_results or self._is_bad_name(cur_show_info):
continue
self._fix_firstaired(cur_show_info)
if cur_show_info not in search_results:
search_results += [cur_show_info]
return search_results
class Proper(LegacyProper):
def __init__(self, name, url, date, show_obj, parsed_show_obj=None, size=-1, puid=None, **kwargs):
"""
:param name: release name
:type name: AnyStr
:param url: url
:type url: AnyStr
:param date: date
:type date:
:param show_obj: show object or None
:type show_obj: sickgear.tv.TVShow or None
:param parsed_show_obj: parsed show object
:type parsed_show_obj: sickbread.tv.TVShow
:param size: size
:type size: int or long
:param puid: puid
:type puid: AnyStr
:param kwargs:
"""
self.name = name
self.url = url
self.date = date
self.size = size
self.puid = puid
self.provider = None
self.quality = Quality.UNKNOWN
self.release_group = None # type: Optional[AnyStr]
self.version = -1 # type: int
self.parsed_show_obj = parsed_show_obj
self.show_obj = show_obj
self.tvid = None # type: Optional[int]
self.prodid = -1 # type: int
self.season = -1 # type: int
self.episode = -1 # type: int
self.scene_season = -1 # type: int
self.scene_episode = -1 # type: int
super(Proper, self).__init__(**kwargs)
@property
def show_obj(self):
# type: (...) -> Optional[sickgear.tv.TVShow]
return self._show_obj
@show_obj.setter
def show_obj(self, val):
# type: (sickgear.tv.TVShow) -> None
self._show_obj = val
def __str__(self):
if self.show_obj:
prodid = self.show_obj.prodid
tvid = self.show_obj.tvid
elif self.parsed_show_obj:
prodid = self.parsed_show_obj.prodid
tvid = self.parsed_show_obj.tvid
else:
prodid = self.prodid
tvid = self.tvid
return '%s %s %sx%s of %s from %s' % (self.date, self.name, self.season, self.episode, prodid,
sickgear.TVInfoAPI(tvid).name)
class ErrorViewer(object):
"""
Keeps a static list of UIErrors to be displayed on the UI and allows
the list to be cleared.
"""
errors = []
def __init__(self):
ErrorViewer.errors = []
@staticmethod
def add(error):
ErrorViewer.errors.append(error)
@staticmethod
def clear():
ErrorViewer.errors = []
class UIError(object):
"""
Represents an error to be displayed in the web UI.
"""
def __init__(self, message):
self.message = message
self.time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
class OrderedDefaultdict(OrderedDict):
def __init__(self, *args, **kwargs):
if not args:
self.default_factory = None
else:
if not (None is args[0] or callable(args[0])):
raise TypeError('first argument must be callable or None')
self.default_factory = args[0]
args = args[1:]
super(OrderedDefaultdict, self).__init__(*args, **kwargs)
def __missing__(self, key):
if None is self.default_factory:
raise KeyError(key)
self[key] = default = self.default_factory()
return default
def __reduce__(self): # optional, for pickle support
args = (self.default_factory,) if self.default_factory else ()
return self.__class__, args, None, None, iteritems(self)
def first_key(self):
return next(iter(self))
def last_key(self):
return next(reversed(self))
class ImageUrlList(list):
def __init__(self, max_age=30):
"""
:param max_age: max age in days
:type max_age: int
"""
super(ImageUrlList, self).__init__()
self.max_age = max_age
def add_url(self, url):
"""
adds url to list
:param url: url
:type url: AnyStr
"""
self.remove_old()
cache_item = (url, datetime.datetime.now())
for n, x in enumerate(self):
if self._is_cache_item(x) and url == x[0]:
self[n] = cache_item
return
self.append(cache_item)
@staticmethod
def _is_cache_item(item):
return isinstance(item, (tuple, list)) and 2 == len(item)
def remove_old(self):
age_limit = datetime.datetime.now() - datetime.timedelta(minutes=self.max_age)
self[:] = [x for x in self if self._is_cache_item(x) and age_limit < x[1]]
def __repr__(self):
return str([x[0] for x in self if self._is_cache_item(x)])
def __contains__(self, url):
for x in self:
if self._is_cache_item(x) and url == x[0]:
return True
return False
def remove(self, url):
"""
removes url from list
:param url: url
:type url: AnyStr
"""
for x in self:
if self._is_cache_item(x) and url == x[0]:
super(ImageUrlList, self).remove(x)
break
class EnvVar(object):
def __init__(self):
pass
def __getitem__(self, key):
return os.environ[key]
@staticmethod
def get(key, default=None):
return os.environ.get(key, default)
sickgear.ENV = EnvVar()
# backport from python 3
class SimpleNamespace(object):
def __init__(self, **kwargs):
self.__dict__.update(kwargs)
def __repr__(self):
keys = sorted(self.__dict__)
items = ["{}={!r}".format(k, self.__dict__[k]) for k in keys]
return "{}({})".format(type(self).__name__, ", ".join(items))
def __eq__(self, other):
return self.__dict__ == other.__dict__
def __ne__(self, o):
return not self.__eq__(o)
def __hash__(self):
return hash(tuple(self.__dict__))
# list that supports weak reference
class WeakList(list):
__slots__ = ('__weakref__',)
class LoadingMessage(object):
def __init__(self):
self.lock = threading.Lock()
self._message = [{'msg': 'Loading', 'progress': -1}]
@property
def message(self):
"""
:return: list of messages
:rtype: List[Dict[AnyStr, int]]
"""
with self.lock:
return copy.deepcopy(self._message)
@message.setter
def message(self, msg):
"""
add message to list
:param msg: message
:type msg: AnyStr
"""
with self.lock:
if 0 != len(self._message) and msg != self._message[-1:][0]['msg']:
self._message.append({'msg': msg, 'progress': -1})
def set_msg_progress(self, msg, progress):
"""
add message with progress
:param msg: message
:type msg: AnyStr
:param progress: progress message
:type progress: Any
"""
with self.lock:
for m in self._message:
if msg == m.get('msg'):
m['progress'] = progress
return
self._message.append({'msg': msg, 'progress': progress})
def reset(self, msg=None):
"""
resets message list
:param msg: optional message dict to reset to
:type msg: Dict[AnyStr, int] or None
"""
msg = msg or {'msg': 'Loading', 'progress': -1}
with self.lock:
self._message = [msg]
loading_msg = LoadingMessage()