SickGear/sickgear/anime.py

263 lines
8.6 KiB
Python
Raw Normal View History

# Author: Dennis Lutter <lad1337@gmail.com>
#
# This file is part of Sick Beard.
#
# Sick Beard is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Sick Beard is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Sick Beard. If not, see <http://www.gnu.org/licenses/>.
import os
import adba
from adba.aniDBresponses import LoginFirstResponse
from exceptions_helper import ex
import sickgear
from . import db, logger
from .classes import NZBDataSearchResult, NZBSearchResult, TorrentSearchResult
from .helpers import get_system_temp_dir, make_path
# noinspection PyUnreachableCode
if False:
from typing import Any, AnyStr, List, Optional, Tuple
class AllowBlockNoShowIDException(Exception):
"""
No prodid or tvid was given
"""
class AniGroupList(object):
allowlist = [] # type: List[AnyStr]
blocklist = [] # type: List[AnyStr]
def __init__(self, tvid, prodid, tvid_prodid=None):
# type: (int, int, AnyStr) -> None
if not tvid or not prodid:
raise AllowBlockNoShowIDException()
self.tvid = tvid # type: int
self.prodid = prodid # type: int
self.tvid_prodid = tvid_prodid # type: AnyStr
self.load()
def load(self):
logger.debug(f'Building allow amd block list for {self.tvid_prodid}')
self.allowlist = self._load_list('allowlist')
self.blocklist = self._load_list('blocklist')
def _load_list(self, table):
# type: (AnyStr) -> List[AnyStr]
"""
:param table: table name
:return: list of words
"""
my_db = db.DBConnection()
# noinspection SqlResolve
sql_result = my_db.select('SELECT keyword FROM [%s] WHERE indexer = ? AND show_id = ?' % table,
[self.tvid, self.prodid])
if not sql_result or not len(sql_result):
return []
groups = []
for cur_result in sql_result:
groups.append(cur_result['keyword'])
logger.debug('AniPermsList: %s loaded keywords from %s: %s' % (self.tvid_prodid, table, groups))
return groups
def set_allow_keywords(self, values):
# type: (List[AnyStr]) -> None
"""
:param values: list of words
"""
self._del_all_keywords('allowlist')
self._add_keywords('allowlist', values)
self.allowlist = values
logger.debug('Allowlist set to: %s' % self.allowlist)
def set_block_keywords(self, values):
# type: (List[AnyStr]) -> None
"""
:param values: list of words
"""
self._del_all_keywords('blocklist')
self._add_keywords('blocklist', values)
self.blocklist = values
logger.debug('Blocklist set to: %s' % self.blocklist)
def _del_all_keywords(self, table):
# type: (AnyStr) -> None
"""
:param table: table name
"""
my_db = db.DBConnection()
# noinspection SqlResolve
my_db.action('DELETE FROM [%s] WHERE indexer = ? AND show_id = ?' % table, [self.tvid, self.prodid])
def _add_keywords(self, table, values):
# type: (AnyStr, List[AnyStr]) -> None
"""
:param table: table name
:param values: list of words
"""
my_db = db.DBConnection()
for cur_value in values:
# noinspection SqlResolve
my_db.action('INSERT INTO [%s] (indexer, show_id, keyword) VALUES (?,?,?)' % table,
[self.tvid, self.prodid, cur_value])
def is_valid(self, result):
# type: (NZBSearchResult or NZBDataSearchResult or TorrentSearchResult) -> bool
"""
Test if release group parsed from result is in allow list and not in block list
:param result: Search result
:return: True or False
"""
if not result.release_group:
logger.debug('Failed to detect release group, invalid result')
return False
allowed = result.release_group.lower() in [x.lower() for x in self.allowlist] or not self.allowlist
blocked = result.release_group.lower() in [x.lower() for x in self.blocklist]
logger.debug(f'Result {("not ", "")[allowed]}allowed{(", but", " and not")[not blocked]} in block list.'
f' Parsed group name: "{result.release_group}" from result "{result.name}"')
return allowed and not blocked
def short_group_names(groups):
# type: (AnyStr) -> List[AnyStr]
"""
:param groups: groups
:return: list of groups
"""
group_list = groups.split(',')
short_group_list = []
if set_up_anidb_connection():
for group_name in group_list:
adba_result = None
try:
# no such group is returned for utf8 groups like interrobang
adba_result = sickgear.ADBA_CONNECTION.group(gname=group_name)
except(BaseException, Exception):
pass
if isinstance(adba_result, LoginFirstResponse):
break
if None is adba_result or not hasattr(adba_result, 'datalines'):
continue
for line in adba_result.datalines:
if line['shortname']:
short_group_list.append(line['shortname'])
else:
if group_name not in short_group_list:
short_group_list.append(group_name)
else:
short_group_list = group_list
return short_group_list
def anidb_cache_dir():
# type: (...) -> Optional[AnyStr]
cache_dir = os.path.join(sickgear.CACHE_DIR or get_system_temp_dir(), 'anidb')
if not make_path(cache_dir):
cache_dir = None
return cache_dir
def create_anidb_obj(**kwargs):
return adba.Anime(sickgear.ADBA_CONNECTION, cache_path=anidb_cache_dir(), **kwargs)
def set_up_anidb_connection():
if not sickgear.USE_ANIDB:
logger.debug('Usage of anidb disabled. Skipping')
return False
if not sickgear.ANIDB_USERNAME and not sickgear.ANIDB_PASSWORD:
logger.debug('anidb username and/or password are not set. Aborting anidb lookup.')
return False
if not sickgear.ADBA_CONNECTION:
# anidb_logger = (lambda x: logger.debug('ANIDB: ' + str(x)))
sickgear.ADBA_CONNECTION = adba.Connection(keepAlive=True) # , log=anidb_logger)
auth = False
try:
auth = sickgear.ADBA_CONNECTION.authed()
except (BaseException, Exception) as e:
logger.log(f'exception msg: {ex(e)}')
pass
if not auth:
try:
sickgear.ADBA_CONNECTION.auth(sickgear.ANIDB_USERNAME, sickgear.ANIDB_PASSWORD)
except (BaseException, Exception) as e:
logger.log(f'exception msg: {ex(e)}')
return False
else:
return True
return sickgear.ADBA_CONNECTION.authed()
def pull_anidb_groups(show_name):
# type: (AnyStr) -> Optional[bool, List]
if set_up_anidb_connection():
try:
anime = create_anidb_obj(name=show_name)
return anime.get_groups()
except (BaseException, Exception) as e:
logger.debug(f'Anidb exception: {ex(e)}')
return False
def push_anidb_mylist(filepath, anidb_episode):
# type: (AnyStr, Any) -> Tuple[Optional[bool], Optional[Tuple[AnyStr, int]]]
"""
:param filepath: file path
:type filepath: AnyStr
:param anidb_episode:
:type anidb_episode:
:return
"""
result, log = None, None
if set_up_anidb_connection():
if not anidb_episode: # seems like we could parse the name before, build the anidb object
# build an anidb episode
anidb_episode = adba.Episode(
sickgear.ADBA_CONNECTION,
filePath=filepath,
paramsF=['quality', 'anidb_file_name', 'crc32'],
paramsA=['epno', 'english_name', 'short_name_list', 'other_name', 'synonym_list'])
try:
anidb_episode.add_to_mylist(state=1) # status = 1 sets the status of the file to "internal HDD"
log = ('Adding the file to the anidb mylist', logger.DEBUG)
result = True
except (BaseException, Exception) as e:
log = (f'exception msg: {ex(e)}', logger.MESSAGE)
result = False
return result, log