mirror of
https://github.com/SickGear/SickGear.git
synced 2024-11-15 09:25:04 +00:00
ae4173e8c0
Change use specific logger functions for debug, warning, error.
262 lines
8.6 KiB
Python
262 lines
8.6 KiB
Python
# Author: Dennis Lutter <lad1337@gmail.com>
|
|
#
|
|
# This file is part of Sick Beard.
|
|
#
|
|
# Sick Beard is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# Sick Beard is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with Sick Beard. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import os
|
|
|
|
import adba
|
|
from adba.aniDBresponses import LoginFirstResponse
|
|
from exceptions_helper import ex
|
|
|
|
import sickgear
|
|
from . import db, logger
|
|
from .classes import NZBDataSearchResult, NZBSearchResult, TorrentSearchResult
|
|
from .helpers import get_system_temp_dir, make_path
|
|
|
|
# noinspection PyUnreachableCode
|
|
if False:
|
|
from typing import Any, AnyStr, List, Optional, Tuple
|
|
|
|
|
|
class AllowBlockNoShowIDException(Exception):
|
|
"""
|
|
No prodid or tvid was given
|
|
"""
|
|
|
|
|
|
class AniGroupList(object):
|
|
allowlist = [] # type: List[AnyStr]
|
|
blocklist = [] # type: List[AnyStr]
|
|
|
|
def __init__(self, tvid, prodid, tvid_prodid=None):
|
|
# type: (int, int, AnyStr) -> None
|
|
|
|
if not tvid or not prodid:
|
|
raise AllowBlockNoShowIDException()
|
|
self.tvid = tvid # type: int
|
|
self.prodid = prodid # type: int
|
|
self.tvid_prodid = tvid_prodid # type: AnyStr
|
|
self.load()
|
|
|
|
def load(self):
|
|
logger.debug(f'Building allow amd block list for {self.tvid_prodid}')
|
|
self.allowlist = self._load_list('allowlist')
|
|
self.blocklist = self._load_list('blocklist')
|
|
|
|
def _load_list(self, table):
|
|
# type: (AnyStr) -> List[AnyStr]
|
|
"""
|
|
|
|
:param table: table name
|
|
:return: list of words
|
|
"""
|
|
my_db = db.DBConnection()
|
|
# noinspection SqlResolve
|
|
sql_result = my_db.select('SELECT keyword FROM [%s] WHERE indexer = ? AND show_id = ?' % table,
|
|
[self.tvid, self.prodid])
|
|
if not sql_result or not len(sql_result):
|
|
return []
|
|
|
|
groups = []
|
|
for cur_result in sql_result:
|
|
groups.append(cur_result['keyword'])
|
|
|
|
logger.debug('AniPermsList: %s loaded keywords from %s: %s' % (self.tvid_prodid, table, groups))
|
|
|
|
return groups
|
|
|
|
def set_allow_keywords(self, values):
|
|
# type: (List[AnyStr]) -> None
|
|
"""
|
|
|
|
:param values: list of words
|
|
"""
|
|
self._del_all_keywords('allowlist')
|
|
self._add_keywords('allowlist', values)
|
|
self.allowlist = values
|
|
logger.debug('Allowlist set to: %s' % self.allowlist)
|
|
|
|
def set_block_keywords(self, values):
|
|
# type: (List[AnyStr]) -> None
|
|
"""
|
|
|
|
:param values: list of words
|
|
"""
|
|
self._del_all_keywords('blocklist')
|
|
self._add_keywords('blocklist', values)
|
|
self.blocklist = values
|
|
logger.debug('Blocklist set to: %s' % self.blocklist)
|
|
|
|
def _del_all_keywords(self, table):
|
|
# type: (AnyStr) -> None
|
|
"""
|
|
|
|
:param table: table name
|
|
"""
|
|
my_db = db.DBConnection()
|
|
# noinspection SqlResolve
|
|
my_db.action('DELETE FROM [%s] WHERE indexer = ? AND show_id = ?' % table, [self.tvid, self.prodid])
|
|
|
|
def _add_keywords(self, table, values):
|
|
# type: (AnyStr, List[AnyStr]) -> None
|
|
"""
|
|
|
|
:param table: table name
|
|
:param values: list of words
|
|
"""
|
|
my_db = db.DBConnection()
|
|
for cur_value in values:
|
|
# noinspection SqlResolve
|
|
my_db.action('INSERT INTO [%s] (indexer, show_id, keyword) VALUES (?,?,?)' % table,
|
|
[self.tvid, self.prodid, cur_value])
|
|
|
|
def is_valid(self, result):
|
|
# type: (NZBSearchResult or NZBDataSearchResult or TorrentSearchResult) -> bool
|
|
"""
|
|
Test if release group parsed from result is in allow list and not in block list
|
|
|
|
:param result: Search result
|
|
:return: True or False
|
|
"""
|
|
if not result.release_group:
|
|
logger.debug('Failed to detect release group, invalid result')
|
|
return False
|
|
|
|
allowed = result.release_group.lower() in [x.lower() for x in self.allowlist] or not self.allowlist
|
|
blocked = result.release_group.lower() in [x.lower() for x in self.blocklist]
|
|
|
|
logger.debug(f'Result {("not ", "")[allowed]}allowed{(", but", " and not")[not blocked]} in block list.'
|
|
f' Parsed group name: "{result.release_group}" from result "{result.name}"')
|
|
|
|
return allowed and not blocked
|
|
|
|
|
|
def short_group_names(groups):
|
|
# type: (AnyStr) -> List[AnyStr]
|
|
"""
|
|
|
|
:param groups: groups
|
|
:return: list of groups
|
|
"""
|
|
group_list = groups.split(',')
|
|
short_group_list = []
|
|
if set_up_anidb_connection():
|
|
for group_name in group_list:
|
|
adba_result = None
|
|
try:
|
|
# no such group is returned for utf8 groups like interrobang
|
|
adba_result = sickgear.ADBA_CONNECTION.group(gname=group_name)
|
|
except(BaseException, Exception):
|
|
pass
|
|
if isinstance(adba_result, LoginFirstResponse):
|
|
break
|
|
if None is adba_result or not hasattr(adba_result, 'datalines'):
|
|
continue
|
|
for line in adba_result.datalines:
|
|
if line['shortname']:
|
|
short_group_list.append(line['shortname'])
|
|
else:
|
|
if group_name not in short_group_list:
|
|
short_group_list.append(group_name)
|
|
else:
|
|
short_group_list = group_list
|
|
return short_group_list
|
|
|
|
|
|
def anidb_cache_dir():
|
|
# type: (...) -> Optional[AnyStr]
|
|
cache_dir = os.path.join(sickgear.CACHE_DIR or get_system_temp_dir(), 'anidb')
|
|
if not make_path(cache_dir):
|
|
cache_dir = None
|
|
return cache_dir
|
|
|
|
|
|
def create_anidb_obj(**kwargs):
|
|
|
|
return adba.Anime(sickgear.ADBA_CONNECTION, cache_path=anidb_cache_dir(), **kwargs)
|
|
|
|
|
|
def set_up_anidb_connection():
|
|
if not sickgear.USE_ANIDB:
|
|
logger.debug('Usage of anidb disabled. Skipping')
|
|
return False
|
|
|
|
if not sickgear.ANIDB_USERNAME and not sickgear.ANIDB_PASSWORD:
|
|
logger.debug('anidb username and/or password are not set. Aborting anidb lookup.')
|
|
return False
|
|
|
|
if not sickgear.ADBA_CONNECTION:
|
|
# anidb_logger = (lambda x: logger.debug('ANIDB: ' + str(x)))
|
|
sickgear.ADBA_CONNECTION = adba.Connection(keepAlive=True) # , log=anidb_logger)
|
|
|
|
auth = False
|
|
try:
|
|
auth = sickgear.ADBA_CONNECTION.authed()
|
|
except (BaseException, Exception) as e:
|
|
logger.log(f'exception msg: {ex(e)}')
|
|
pass
|
|
|
|
if not auth:
|
|
try:
|
|
sickgear.ADBA_CONNECTION.auth(sickgear.ANIDB_USERNAME, sickgear.ANIDB_PASSWORD)
|
|
except (BaseException, Exception) as e:
|
|
logger.log(f'exception msg: {ex(e)}')
|
|
return False
|
|
else:
|
|
return True
|
|
|
|
return sickgear.ADBA_CONNECTION.authed()
|
|
|
|
|
|
def pull_anidb_groups(show_name):
|
|
# type: (AnyStr) -> Optional[bool, List]
|
|
if set_up_anidb_connection():
|
|
try:
|
|
anime = create_anidb_obj(name=show_name)
|
|
return anime.get_groups()
|
|
except (BaseException, Exception) as e:
|
|
logger.debug(f'Anidb exception: {ex(e)}')
|
|
return False
|
|
|
|
|
|
def push_anidb_mylist(filepath, anidb_episode):
|
|
# type: (AnyStr, Any) -> Tuple[Optional[bool], Optional[Tuple[AnyStr, int]]]
|
|
"""
|
|
:param filepath: file path
|
|
:type filepath: AnyStr
|
|
:param anidb_episode:
|
|
:type anidb_episode:
|
|
:return
|
|
"""
|
|
result, log = None, None
|
|
if set_up_anidb_connection():
|
|
if not anidb_episode: # seems like we could parse the name before, build the anidb object
|
|
# build an anidb episode
|
|
anidb_episode = adba.Episode(
|
|
sickgear.ADBA_CONNECTION,
|
|
filePath=filepath,
|
|
paramsF=['quality', 'anidb_file_name', 'crc32'],
|
|
paramsA=['epno', 'english_name', 'short_name_list', 'other_name', 'synonym_list'])
|
|
|
|
try:
|
|
anidb_episode.add_to_mylist(state=1) # status = 1 sets the status of the file to "internal HDD"
|
|
log = ('Adding the file to the anidb mylist', logger.DEBUG)
|
|
result = True
|
|
except (BaseException, Exception) as e:
|
|
log = (f'exception msg: {ex(e)}', logger.MESSAGE)
|
|
result = False
|
|
|
|
return result, log
|