mirror of
https://github.com/SickGear/SickGear.git
synced 2024-11-15 09:25:04 +00:00
ae4173e8c0
Change use specific logger functions for debug, warning, error.
471 lines
18 KiB
Python
471 lines
18 KiB
Python
#
|
|
# This file is part of SickGear.
|
|
#
|
|
# SickGear is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# SickGear is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with SickGear. If not, see <http://www.gnu.org/licenses/>.
|
|
import datetime
|
|
import fnmatch
|
|
import os
|
|
import copy
|
|
import re
|
|
|
|
from exceptions_helper import ex
|
|
|
|
import sickgear
|
|
from . import common, db, logger
|
|
from .helpers import sanitize_scene_name
|
|
from .name_parser.parser import InvalidNameException, InvalidShowException, NameParser
|
|
from .scene_exceptions import get_scene_exceptions
|
|
from sg_helpers import scantree
|
|
|
|
from _23 import quote_plus
|
|
from six import iterkeys, itervalues
|
|
|
|
# noinspection PyUnreachableCode
|
|
if False:
|
|
from typing import AnyStr, List, Optional, Set, Union
|
|
from .tv import TVShow
|
|
# noinspection PyUnresolvedReferences
|
|
from re import Pattern
|
|
|
|
|
|
def pass_wordlist_checks(name,  # type: AnyStr
                         parse=True,  # type: bool
                         indexer_lookup=True,  # type: bool
                         show_obj=None  # type: TVShow
                         ):  # type: (...) -> bool
    """
    Decide whether a release name survives the ignore word and require word filters.

    The name is optionally run through NameParser first, then tested against the built-in
    ignore expressions merged with the global and per-show ignore words, and finally
    against the global and per-show require words.

    :param name: the release name to check
    :type name: basestring
    :param parse: when True, reject names that NameParser cannot parse
    :type parse: bool
    :param indexer_lookup: passed to NameParser as its indexer_lookup argument
    :type indexer_lookup: bool
    :param show_obj: optional TVShow object supplying per-show word lists and exclusions
    :type show_obj: TVShow

    :return: True if the release name is OK, False if it's bad.
    :rtype: bool
    """
    if parse:
        try:
            NameParser(indexer_lookup=indexer_lookup).parse(name)
        except (InvalidNameException, InvalidShowException) as e:
            # the name exception is tested first to keep its precedence over the show exception
            unparsed = 'episode' if isinstance(e, InvalidNameException) else 'show'
            logger.debug(f'Unable to parse the filename {name} into a valid ' + unparsed)
            return False

    ignore_words = {'sub(bed|ed|pack|s)', '(dk|fin|heb|kor|nor|nordic|pl|swe)sub(bed|ed|s)?',
                    '(dir|sample|sub|nfo)fix', 'sample', '(dvd)?extras',
                    'dub(bed)?'}

    # global ignore words extend the built-in set
    if sickgear.IGNORE_WORDS:
        ignore_words.update(sickgear.IGNORE_WORDS)

    require_words = copy.copy(sickgear.REQUIRE_WORDS)

    # --- ignore words: any hit rejects the name
    rejected = None
    if show_obj:
        show_ignore = show_obj.rls_ignore_words
        if show_ignore and isinstance(show_ignore, set):
            if sickgear.IGNORE_WORDS_REGEX == show_obj.rls_ignore_words_regex:
                # same matching mode as the global list, so both can be tested in one pass
                ignore_words.update(show_ignore)
            else:
                rejected = contains_any(name, show_ignore, rx=show_obj.rls_ignore_words_regex)

        excluded_ignore = show_obj.rls_global_exclude_ignore
        if excluded_ignore and isinstance(excluded_ignore, set):
            ignore_words = ignore_words - excluded_ignore

    rejected = rejected or contains_any(name, ignore_words, rx=sickgear.IGNORE_WORDS_REGEX)
    if rejected:
        logger.debug(f'Ignored: {name} for containing ignore word')
        return False

    # --- require words: a failed requirement rejects the name
    rejected = None
    if show_obj:
        show_require = show_obj.rls_require_words
        if show_require and isinstance(show_require, set):
            rejected = not contains_any(name, show_require, rx=show_obj.rls_require_words_regex)

        excluded_require = show_obj.rls_global_exclude_require
        if excluded_require and isinstance(excluded_require, set):
            require_words = require_words - excluded_require

    rejected = rejected or not_contains_any(name, require_words, rx=sickgear.REQUIRE_WORDS_REGEX)
    if rejected:
        logger.debug(f'Ignored: {name} for not containing required word match')
        return False

    return True
|
|
|
|
|
|
def not_contains_any(subject,  # type: AnyStr
                     lookup_words,  # type: Union[AnyStr, Set[AnyStr]]
                     rx=None,
                     **kwargs
                     ):  # type: (...) -> bool
    """
    Inverted form of contains_any; calls it with invert=True.

    :param subject: text to test
    :param lookup_words: words to search for, forwarded unchanged
    :param rx: forwarded unchanged as the rx argument
    :param kwargs: any further keyword arguments, forwarded unchanged
    :return: whatever contains_any returns when called with invert=True
    """
    return contains_any(subject, lookup_words, rx=rx, invert=True, **kwargs)
|
|
|
|
|
|
def contains_any(subject,  # type: AnyStr
                 lookup_words,  # type: Union[AnyStr, Set[AnyStr]]
                 invert=False,  # type: bool
                 rx=None,
                 **kwargs
                 ):  # type: (...) -> Optional[bool]
    """
    Check if subject does or does not contain a match from the lookup words

    :param subject: text to search in
    :type subject: basestring
    :param lookup_words: words to search for, compiled by compile_word_list
    :type lookup_words: Union(list, set, basestring)
    :param invert: invert function logic "contains any" into "does not contain any"
    :type invert: bool
    :param rx: passed to compile_word_list as its rx argument
    :type rx: Union(NoneType, bool)
    :param kwargs: extra keyword arguments passed to compile_word_list (e.g. re_prefix, re_suffix)

    :return: None if no checking was done because subject is empty or no pattern compiled.
             True at the first pattern that matches, or if invert is True, at the first
             pattern that does not match. False otherwise.
    :rtype: Union(NoneType, bool)
    """
    compiled_words = compile_word_list(lookup_words, rx=rx, **kwargs)
    if not (subject and compiled_words):
        return None

    for rc_filter in compiled_words:
        match = rc_filter.search(subject)
        # a hit is a match in normal mode, or a miss in inverted mode
        if bool(match) != bool(invert):
            msg = 'No match found' if invert else 'Found match'
            logger.debug(f'{msg} from pattern: {rc_filter.pattern} in text: {subject} ')
            return True

    return False
|
|
|
|
|
|
def compile_word_list(lookup_words,  # type: Union[AnyStr, Set[AnyStr]]
                      re_prefix=r'(^|[\W_])',  # type: AnyStr
                      re_suffix=r'($|[\W_])',  # type: AnyStr
                      rx=None
                      ):  # type: (...) -> List[Pattern[AnyStr]]
    """
    Compile lookup words into case-insensitive regular expression objects.

    Each word is wrapped as re_prefix + word + re_suffix before compilation.

    :param lookup_words: a comma separated string, or a collection (list, set, tuple) of words.
                         A string is split on commas and empty items are dropped.
    :type lookup_words: Union(list, set, tuple, basestring)
    :param re_prefix: expression placed in front of every word
    :type re_prefix: basestring
    :param re_suffix: expression placed after every word
    :type re_suffix: basestring
    :param rx: True treats words as regex, False escapes words as plain text. With None,
               a string starting with 'regex:' is regex (the marker is removed), and
               anything else is plain text.
    :type rx: Union(NoneType, bool)

    :return: list of compiled patterns, words that fail to compile are logged and left out
    :rtype: list
    """
    result = []
    if not lookup_words:
        return result

    # only a string can carry the 'regex:' marker or need splitting, any other value is
    # handled as a ready-made collection of words (list, set, tuple, ...)
    is_text = isinstance(lookup_words, str)

    if None is rx:
        search_raw = not (is_text and lookup_words.startswith('regex:'))
        if is_text and not search_raw:
            lookup_words = lookup_words[len('regex:'):]
    else:
        search_raw = not rx

    if is_text:
        # split here for every rx value so a string is never iterated character by character
        lookup_words = [x.strip() for x in lookup_words.split(',') if x.strip()]

    for word in lookup_words:
        try:
            # plain text is fully escaped. A regex only gets its quotes and spaces escaped,
            # e.g. what's the "time" becomes what\'s\ the\ \"time\"
            subject = re.escape(word) if search_raw else re.sub(r'([\" \'])', r'\\\1', word)
            result.append(re.compile('(?i)%s%s%s' % (re_prefix, subject, re_suffix)))
        except re.error as e:
            logger.debug(f'Failure to compile filter expression: {word} ... Reason: {ex(e)}')

    diff = len(lookup_words) - len(result)
    if diff:
        logger.debug(f'From {len(lookup_words)} expressions, {diff} was discarded during compilation')

    return result
|
|
|
|
|
|
def url_encode(show_names, spacer='.'):
    # type: (List[AnyStr], AnyStr) -> List[AnyStr]
    """
    Return the show names with dots swapped for spacer, then UTF-8 encoded and url quoted.

    :param show_names: list of show names
    :param spacer: replacement for every '.' in a name
    :return: list of quoted names, in the order given
    """
    encoded_names = []
    for cur_name in show_names:
        raw_name = cur_name.replace('.', spacer).encode('utf-8', errors='replace')
        encoded_names.append(quote_plus(raw_name))
    return encoded_names
|
|
|
|
|
|
def get_show_names(ep_obj, spacer='.'):
    # type: (sickgear.tv.TVEpisode, AnyStr) -> List[AnyStr]
    """
    Return the names of the show of an episode, for the season of that episode.

    Calls get_show_names_all_possible with force_anime=True.

    :param ep_obj: episode object
    :param spacer: spacer
    :return: list of show names
    """
    show_obj, season = ep_obj.show_obj, ep_obj.season
    return get_show_names_all_possible(show_obj, season=season, spacer=spacer, force_anime=True)
|
|
|
|
|
|
def get_show_names_all_possible(show_obj, season=-1, scenify=True, spacer='.', force_anime=False):
    # type: (sickgear.tv.TVShow, int, bool, AnyStr, bool) -> List[AnyStr]
    """
    Return the de-duplicated, url encoded names of a show.

    :param show_obj: show object
    :param season: season, passed to all_possible_show_names
    :param scenify: when True, every name is passed through sanitize_scene_name before encoding
    :param spacer: spacer, passed to url_encode
    :param force_anime: passed to all_possible_show_names
    :return: list of url encoded show names
    """
    names = all_possible_show_names(show_obj, season=season, force_anime=force_anime)
    unique_names = list(set(names))  # type: List[AnyStr]
    if scenify:
        unique_names = [sanitize_scene_name(cur_name) for cur_name in unique_names]
    return url_encode(unique_names, spacer)
|
|
|
|
|
|
def make_scene_season_search_string(show_obj,  # type: sickgear.tv.TVShow
                                    ep_obj,  # type: sickgear.tv.TVEpisode
                                    ignore_allowlist=False,  # type: bool
                                    extra_search_type=None
                                    ):  # type: (...) -> List[AnyStr]
    """
    Build the search strings used to find a whole season of a show.

    :param show_obj: show object
    :param ep_obj: episode object that identifies the season
    :param ignore_allowlist: when True, anime release group keywords are not prefixed
    :param extra_search_type: when truthy, no search strings are generated
    :return: list of search strings
    """
    if show_obj.air_by_date or show_obj.sports:
        numseasons = 0
        # air by date and sports shows are searched by the year part of the airdate
        season_strings = [str(ep_obj.airdate).split('-')[0]]

    elif show_obj.is_anime:
        numseasons = 0
        ep_obj_list = show_obj.get_all_episodes(ep_obj.season)

        # only the best qualities of the show are of interest here
        _, best_qualities = common.Quality.split_quality(show_obj.quality)

        # collect the absolute number of every episode that is still needed in this 'season'
        season_strings = []
        for cur_ep_obj in ep_obj_list:
            cur_status, cur_quality = common.Quality.split_composite_status(cur_ep_obj.status)
            highest_best_quality = max(best_qualities) if best_qualities else 0

            can_upgrade = cur_status in (common.DOWNLOADED, common.SNATCHED) \
                and cur_quality < highest_best_quality
            if not (can_upgrade or cur_status == common.WANTED):
                continue

            ab_number = cur_ep_obj.scene_absolute_number
            if 0 < ab_number:
                season_strings.append("%02d" % ab_number)

    else:
        my_db = db.DBConnection()
        sql_result = my_db.select(
            'SELECT COUNT(DISTINCT season) AS numseasons'
            ' FROM tv_episodes'
            ' WHERE indexer = ? AND showid = ?'
            ' AND season != 0',
            [show_obj.tvid, show_obj.prodid])

        numseasons = int(sql_result[0][0])
        season_strings = ["S%02d" % int(ep_obj.scene_season)]

    show_names = get_show_names_all_possible(show_obj, ep_obj.scene_season)

    to_return = []
    for cur_name in show_names:
        if extra_search_type:
            # nothing is generated for extra search types
            continue

        if 1 == numseasons:
            # a show with a single season is searched by its name alone
            to_return.append(cur_name)
            continue

        for cur_season in season_strings:
            use_allowlist = not ignore_allowlist and show_obj.is_anime \
                and None is not show_obj.release_groups and show_obj.release_groups.allowlist
            if use_allowlist:
                to_return.extend(keyword + '.' + cur_name + "." + cur_season
                                 for keyword in show_obj.release_groups.allowlist)
            else:
                to_return.append(cur_name + "." + cur_season)

    return to_return
|
|
|
|
|
|
def make_scene_search_string(show_obj,  # type: sickgear.tv.TVShow
                             ep_obj,  # type: sickgear.tv.TVEpisode
                             ignore_allowlist=False  # type: bool
                             ):  # type: (...) -> List[AnyStr]
    """
    Build the search strings used to find a single episode of a show.

    :param show_obj: show object
    :param ep_obj: episode object
    :param ignore_allowlist: when True, anime release group keywords are not prefixed
    :return: list of search strings
    """
    my_db = db.DBConnection()
    sql_result = my_db.select(
        'SELECT COUNT(DISTINCT season) AS numseasons'
        ' FROM tv_episodes'
        ' WHERE indexer = ? AND showid = ? AND season != 0',
        [show_obj.tvid, show_obj.prodid])
    num_seasons = int(sql_result[0][0])

    # dates replace episode numbers for air by date and sports shows with a real airdate
    search_by_date = (show_obj.air_by_date or show_obj.sports) \
        and ep_obj.airdate != datetime.date.fromordinal(1)
    if search_by_date:
        ep_strings = [str(ep_obj.airdate)]
    elif show_obj.is_anime:
        ep_strings = ['%02i' % int(ep_obj.scene_absolute_number
                                   if 0 < ep_obj.scene_absolute_number else ep_obj.scene_episode)]
    else:
        season_num, episode_num = int(ep_obj.scene_season), int(ep_obj.scene_episode)
        ep_strings = ['S%02iE%02i' % (season_num, episode_num),
                      '%ix%02i' % (season_num, episode_num)]

    # a non-anime show with a single season (specials excluded) is searched by name only
    if 1 == num_seasons and not ep_obj.show_obj.is_anime:
        ep_strings = ['']

    show_names = get_show_names_all_possible(show_obj, ep_obj.scene_season)

    to_return = []
    for cur_name in show_names:
        for cur_ep_string in ep_strings:
            use_allowlist = not ignore_allowlist and ep_obj.show_obj.is_anime and \
                None is not ep_obj.show_obj.release_groups and ep_obj.show_obj.release_groups.allowlist
            if use_allowlist:
                to_return.extend(keyword + '.' + cur_name + '.' + cur_ep_string
                                 for keyword in ep_obj.show_obj.release_groups.allowlist)
            else:
                to_return.append(cur_name + '.' + cur_ep_string)

    return to_return
|
|
|
|
|
|
def all_possible_show_names(show_obj, season=-1, force_anime=False):
    # type: (sickgear.tv.TVShow, int, bool) -> List[AnyStr]
    """
    Figures out the possible variations of the name for a particular show. Includes scene exception
    names, the show name, and country variants on the end, e.g. "Show Name (AU)".

    :param show_obj: a TVShow object that we should get the names of
    :param season: season to fetch scene exceptions for, -1 for the generic exceptions
    :param force_anime: when True, country variants are not added
    :return: a list of all the possible show names
    """
    show_names = get_scene_exceptions(show_obj.tvid, show_obj.prodid, season=season)[:]
    if not show_names:  # if we don't have any season specific exceptions fallback to generic exceptions
        season = -1
        show_names = get_scene_exceptions(show_obj.tvid, show_obj.prodid, season=season)[:]

    if -1 == season:
        show_names.append(show_obj.name)

    if not show_obj.is_anime and not force_anime:
        # build the two-way lookup on a private copy so that the shared
        # common.countryList is not modified by a call to this function
        country_list = dict(common.countryList)
        country_list.update({cur_value: cur_key for cur_key, cur_value in common.countryList.items()})

        new_show_names = []
        for cur_name in set(show_names):
            if not cur_name:
                continue

            # if we have "Show Name Australia" or "Show Name (Australia)" this will add "Show Name (AU)" for
            # any countries defined in common.countryList
            # (and vice versa)
            for cur_country, cur_alternative in country_list.items():
                for cur_suffix in (' ' + cur_country, ' (' + cur_country + ')'):
                    if cur_name.endswith(cur_suffix):
                        # swap the trailing token only, an identical token elsewhere in the name is kept
                        new_show_names.append(cur_name[:-len(cur_suffix)] + ' (' + cur_alternative + ')')
                        break

        show_names += new_show_names

    return show_names
|
|
|
|
|
|
def determine_release_name(dir_name=None, nzb_name=None):
    # type: (AnyStr, AnyStr) -> Union[AnyStr, None]
    """Determine a release name from a nzb and/or folder name

    The nzb name wins when given. Otherwise a single *.nzb or *.nfo file in the folder is
    tried, and finally the folder name itself. File and folder candidates must pass
    pass_wordlist_checks.

    :param dir_name: dir name
    :param nzb_name: nzb name
    :return: None or release name
    """
    if None is not nzb_name:
        logger.log('Using nzb name for release name.')
        return nzb_name.rpartition('.')[0]

    if not dir_name or not os.path.isdir(dir_name):
        return None

    # try to get the release name from nzb/nfo
    for search in ('*.nzb', '*.nfo'):
        results = [direntry.name for direntry in scantree(dir_name, include=[fnmatch.translate(search)],
                                                          filter_kind=False, recurse=False)]

        # only an unambiguous single file is used
        if 1 == len(results):
            found_file = results[0].rpartition('.')[0]
            if pass_wordlist_checks(found_file):
                logger.log(f'Release name ({found_file}) found from file ({results[0]})')
                # the extension is already removed, stripping again would cut
                # the last dotted part off the release name that was just logged
                return found_file

    # If that fails, we try the folder
    folder = os.path.basename(dir_name)
    if pass_wordlist_checks(folder):
        # NOTE: Multiple failed downloads will change the folder name.
        # (e.g., appending #s)
        # Should we handle that?
        logger.log(f'Folder name ({folder}) appears to be a valid release name. Using it.')
        return folder

    return None
|
|
|
|
|
|
def abbr_showname(name):
    # type: (AnyStr) -> AnyStr
    """
    Abbreviate a known franchise prefix at the start of a show name.

    Only the first abbreviation that changes the name is applied.

    :param name: show name
    :return: abbreviated name, or name unchanged if no prefix matched
    """
    abbreviations = (
        (r'^Star Trek\s*:\s*', r'ST: '), (r'^The Walking Dead\s*:\s*', r'TWD: '),
    )
    for cur_from, cur_to in abbreviations:
        shortened = re.sub('(?i)%s' % cur_from, cur_to, name)
        if shortened != name:
            return shortened
    return name
|