mirror of
https://github.com/SickGear/SickGear.git
synced 2024-12-01 17:03:38 +00:00
bc1eff8d3b
Change improve performance, don't fetch cached data if no cache data is wanted.
1210 lines
62 KiB
Python
1210 lines
62 KiB
Python
# encoding:utf-8
|
|
# author:Prinz23
|
|
# project:tvdb_api_v4
|
|
|
|
__author__ = 'Prinz23'
|
|
__version__ = '1.0'
|
|
__api_version__ = '1.0.0'
|
|
|
|
import base64
|
|
import datetime
|
|
import logging
|
|
import re
|
|
|
|
from bs4_parser import BS4Parser
|
|
import requests
|
|
from requests.adapters import HTTPAdapter
|
|
from urllib3.util.retry import Retry
|
|
|
|
from .tvdb_exceptions import TvdbError, TvdbTokenFailure
|
|
from exceptions_helper import ex
|
|
from lib.dateutil.parser import parser
|
|
# noinspection PyProtectedMember
|
|
from lib.exceptions_helper import ConnectionSkipException
|
|
from lib.tvinfo_base import (
|
|
CastList, CrewList, PersonGenders, RoleTypes,
|
|
TVInfoBase, TVInfoCharacter, TVInfoEpisode, TVInfoIDs, TVInfoImage, TVInfoImageSize, TVInfoImageType, TVInfoNetwork,
|
|
TVInfoPerson, TVInfoSeason, TVInfoSeasonTypes, TVInfoShow, TVInfoSocialIDs,
|
|
TVINFO_FACEBOOK, TVINFO_FANSITE, TVINFO_IMDB, TVINFO_INSTAGRAM, TVINFO_LINKEDIN, TVINFO_OFFICIALSITE, TVINFO_REDDIT,
|
|
TVINFO_MID_SEASON_FINALE, TVINFO_SEASON_FINALE, TVINFO_SERIES_FINALE, TVINFO_TIKTOK, TVINFO_TMDB, TVINFO_TVDB,
|
|
TVINFO_TVDB_SLUG, TVINFO_TVMAZE, TVINFO_TWITTER, TVINFO_WIKIDATA, TVINFO_WIKIPEDIA, TVINFO_YOUTUBE)
|
|
from sg_helpers import clean_data, clean_str, enforce_type, get_url, try_date, try_int
|
|
from sickgear import ENV
|
|
|
|
from six import integer_types, iteritems, PY3, string_types
|
|
# noinspection PyUnreachableCode
|
|
if False:
|
|
from typing import Any, AnyStr, Dict, List, Optional, Tuple, Union
|
|
|
|
# module logger, the NullHandler is attached so that nothing is emitted unless logging is configured elsewhere
log = logging.getLogger('tvdb_v4.api')
log.addHandler(logging.NullHandler())

# api configuration, TvdbAuth.apikey() reads `apikey` and `apikey_v4` from its 'api_params' entry
# NOTE(review): empty here, presumably filled in from outside of this module - confirm
TVDB_API_CONFIG = {}

# the type of None, for use in isinstance style type checks
NoneType = type(None)
|
|
|
|
|
|
# always use https in cases of redirects
|
|
# noinspection PyUnusedLocal,HttpUrlsUsage
|
|
def _record_hook(r, *args, **kwargs):
    """
    response hook that marks the response as seen and upgrades plain http redirects to https

    :param r: response object handed in by requests
    :param args: unused
    :param kwargs: unused
    :return: the given response, its `Location` header is rewritten if it is a redirect to a http:// url
    """
    r.hook_called = True
    location = r.headers.get('Location')
    if r.status_code in (301, 302, 303, 307, 308) \
            and isinstance(location, string_types) and location.startswith('http://'):
        # only the leading scheme is swapped, a url embedded further in (e.g. in a query string) stays untouched
        r.headers['Location'] = location.replace('http://', 'https://', 1)
    return r
|
|
|
|
|
|
# noinspection PyUnresolvedReferences
|
|
class RequestsAuthBase(requests.auth.AuthBase):
    """
    Inherits the dynamically packaged Requests auth base in one place, so that the
    non-inspection directive needed for it stays isolated to this class.
    """
|
|
|
|
|
|
class TvdbAuth(RequestsAuthBase):
    """requests auth handler that adds a TVDB bearer token to requests and renews it once on a 401 response"""

    # cached bearer token, None until a login succeeded or after reset_token()
    _token = None

    def __init__(self):
        pass

    def reset_token(self):
        """drop the cached token, the next access of `token` triggers a new login"""
        self._token = None

    @staticmethod
    def apikey():
        """
        decode the api key that is stored in TVDB_API_CONFIG['api_params']

        `apikey_v4` is urlsafe base64 decoded, then each char is shifted back (modulo 256) by the char of
        `apikey` at the same, wrapped around, position

        :return: decoded api key
        """
        string = TVDB_API_CONFIG['api_params']['apikey_v4']
        key = TVDB_API_CONFIG['api_params']['apikey']
        # NOTE(review): the appended '===' presumably allows a value stored without padding - confirm
        string = base64.urlsafe_b64decode(string + b'===')
        string = string.decode('latin') if PY3 else string
        return ''.join(
            chr((ord(cur_c) - ord(key[idx % len(key)]) + 256) % 256) for idx, cur_c in enumerate(string))

    def get_token(self):
        """
        log in to the api and cache the returned token

        :return: True if a token was stored, otherwise None
        :raises TvdbTokenFailure: if the api reports a failure, or if the response is empty or not a dict
        """
        url = f'{TvdbAPIv4.base_url}{"login"}'
        params = {'apikey': self.apikey()}
        resp = get_url(url, post_json=params, parse_json=True, raise_skip_exception=True)
        if resp and isinstance(resp, dict):
            if 'status' in resp:
                if 'failure' == resp['status']:
                    raise TvdbTokenFailure(f'Failed to Authenticate. {resp.get("message", "")}')
                if 'success' == resp['status'] and 'data' in resp and isinstance(resp['data'], dict) \
                        and 'token' in resp['data']:
                    self._token = resp['data']['token']
                    return True
        else:
            raise TvdbTokenFailure('Failed to get Tvdb Token')

    @property
    def token(self):
        """the cached token, a login is attempted first if there is none"""
        if not self._token:
            self.get_token()
        return self._token

    def handle_401(self, r, **kwargs):
        """
        response hook, on the first 401 of a request a new token is fetched and the request is sent again

        :param r: response object
        :param kwargs: passed on to the connection adapter when the request is sent again
        :return: response of the repeated request, or the given response if nothing was repeated
        """
        if 401 == r.status_code and not any(401 == _or.status_code for _or in r.history):
            self.reset_token()
            self.get_token()
            if self._token:
                # consume and close the rejected response first, this releases its connection
                # so that the repeated request is able to reuse it
                # noinspection PyStatementEffect
                r.content
                r.close()
                prep = r.request.copy()
                prep.headers['Authorization'] = f'Bearer {self._token}'
                _r = r.connection.send(prep, **kwargs)
                _r.history.append(r)
                _r.request = prep
                return _r

        return r

    def __call__(self, r):
        """add the bearer token to the request and register the 401 handler on it"""
        r.headers["Authorization"] = f'Bearer {self.token}'
        r.register_hook('response', self.handle_401)
        return r
|
|
|
|
|
|
DEFAULT_TIMEOUT = 30  # seconds


class TimeoutHTTPAdapter(HTTPAdapter):
    """transport adapter that falls back to its own timeout for requests that are sent without one"""

    def __init__(self, *args, **kwargs):
        """
        :param args: passed on to HTTPAdapter
        :param kwargs: passed on to HTTPAdapter, except for `timeout` which sets the fallback timeout
        """
        # `timeout` is removed from kwargs before the base adapter is initialised
        self.timeout = kwargs.pop('timeout', DEFAULT_TIMEOUT)
        super().__init__(*args, **kwargs)

    def send(self, request, **kwargs):
        """send the request, the fallback timeout is used if `timeout` is missing or None"""
        if kwargs.get('timeout') is None:
            kwargs['timeout'] = self.timeout
        return super().send(request, **kwargs)
|
|
|
|
|
|
# session that is shared by all api requests of this module
s = requests.Session()
_retry_methods = ['HEAD', 'GET', 'PUT', 'DELETE', 'OPTIONS', 'TRACE', 'POST']
_retry_args = dict(total=3, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504])
try:
    retries = Retry(allowed_methods=_retry_methods, **_retry_args)
except TypeError:
    # urllib3 releases that predate `allowed_methods` only know the former name of the keyword
    retries = Retry(method_whitelist=_retry_methods, **_retry_args)
# the adapter is mounted directly, when wrapped in a plain HTTPAdapter it is only taken as the first
# positional argument (pool_connections) of that adapter and its retries and timeout are never applied
# noinspection HttpUrlsUsage
s.mount('http://', TimeoutHTTPAdapter(max_retries=retries))
s.mount('https://', TimeoutHTTPAdapter(max_retries=retries))
# keyword arguments that tvdb_endpoint_fetch adds to every get_url call
base_request_para = dict(session=s, hooks={'response': _record_hook}, raise_skip_exception=True, auth=TvdbAuth())
|
|
|
|
|
|
# Query TVdb endpoints
|
|
def tvdb_endpoint_fetch(*args, **kwargs):
    """
    call `get_url` with the shared request settings of this module

    the entries of `base_request_para` replace keywords of the same name that are given by the caller,
    everything else is handed on unchanged

    :return: the result of `get_url`
    """
    request_kwargs = dict(kwargs, **base_request_para)
    return get_url(*args, **request_kwargs)
|
|
|
|
|
|
# artwork `type` value of the api -> TVInfoImageType, types that are not listed are handled as `other` on lookup
img_type_map = {
    1: TVInfoImageType.banner,  # series
    2: TVInfoImageType.poster,  # series
    3: TVInfoImageType.fanart,  # series
    6: TVInfoImageType.season_banner,  # season
    7: TVInfoImageType.season_poster,  # season
    8: TVInfoImageType.season_fanart,  # season
    13: TVInfoImageType.person_poster,  # person
}

# character `type` value of the api -> RoleTypes, the comments hold the names of the types
people_types = {
    1: RoleTypes.CrewDirector,  # 'Director',
    2: RoleTypes.CrewWriter,  # 'Writer'
    3: RoleTypes.ActorMain,  # 'Actor'
    4: RoleTypes.ActorGuest,  # 'Guest Star',
    5: RoleTypes.CrewOther,  # 'Crew',
    6: RoleTypes.CrewCreator,  # 'Creator',
    7: RoleTypes.CrewProducer,  # 'Producer',
    8: RoleTypes.CrewShowrunner,  # 'Showrunner',
    9: RoleTypes.MusicalGuest,  # 'Musical Guest',
    10: RoleTypes.Host,  # 'Host',
    11: RoleTypes.CrewExecutiveProducer,  # 'Executive Producer',
}

# remote id `type` value of the api -> TVINFO source constant
# several types map to the same source, the comments name the kind of record the id belongs to
# types that are commented out have no matching TVINFO constant in use here
source_types = {
    2: TVINFO_IMDB,  # title
    # 3: TVINFO_ZAP2IT,
    4: TVINFO_OFFICIALSITE,
    5: TVINFO_FACEBOOK,
    6: TVINFO_TWITTER,
    7: TVINFO_REDDIT,
    8: TVINFO_FANSITE,
    9: TVINFO_INSTAGRAM,
    10: TVINFO_TMDB,  # movie
    11: TVINFO_YOUTUBE,
    12: TVINFO_TMDB,  # tv
    # 13: TVINFO_EIDR,  # content
    # 14: TVINFO_EIDR,  # party
    15: TVINFO_TMDB,  # person
    16: TVINFO_IMDB,  # person
    17: TVINFO_IMDB,  # company
    18: TVINFO_WIKIDATA,
    19: TVINFO_TVMAZE,  # title
    20: TVINFO_LINKEDIN,
    21: TVINFO_TVMAZE,  # person
    22: TVINFO_TVMAZE,  # season
    23: TVINFO_TVMAZE,  # episode
    24: TVINFO_WIKIPEDIA,
    25: TVINFO_TIKTOK,
    26: TVINFO_LINKEDIN,  # company
    27: TVINFO_TVMAZE,  # company
    28: TVINFO_TMDB,  # collection
    29: TVINFO_TMDB,  # collection
}

# inverse of people_types: RoleTypes -> character `type` value of the api
people_types_reverse = {_v: _k for _k, _v in iteritems(people_types)}

# episode object created with default values
# NOTE(review): presumably a shared placeholder for missing episodes - confirm against its use
empty_ep = TVInfoEpisode()
# date parser instance, used to parse birth and death dates of persons
tz_p = parser()
# show status, keyed by id
status_ids = {
    1: {'name': 'Continuing', 'recordType': 'series', 'keepUpdated': False},
    2: {'name': 'Ended', 'recordType': 'series', 'keepUpdated': False},
    3: {'name': 'Upcoming', 'recordType': 'series', 'keepUpdated': False}
}
# the same show status data as status_ids, keyed by name
status_names = {
    'Continuing': {'id': 1, 'recordType': 'series', 'keepUpdated': False},
    'Ended': {'id': 2, 'recordType': 'series', 'keepUpdated': False},
    'Upcoming': {'id': 3, 'recordType': 'series', 'keepUpdated': False}
}
# finale type name -> TVINFO finale constant
tvdb_final_types = {
    'series': TVINFO_SERIES_FINALE,
    'season': TVINFO_SEASON_FINALE,
    'midseason': TVINFO_MID_SEASON_FINALE
}
|
|
|
|
|
|
class TvdbAPIv4(TVInfoBase):
|
|
supported_id_searches = [TVINFO_TVDB, TVINFO_TVDB_SLUG, TVINFO_IMDB, TVINFO_TMDB, TVINFO_TVMAZE]
|
|
supported_person_id_searches = [TVINFO_TVDB, TVINFO_IMDB, TVINFO_TMDB, TVINFO_TVMAZE]
|
|
base_url = 'https://api4.thetvdb.com/v4/'
|
|
art_url = 'https://artworks.thetvdb.com/'
|
|
season_types = {1: 'official', 2: 'dvd', 3: 'absolute', 4: 'alternate', 5: 'regional', 6: 'altdvd'}
|
|
season_type_map = {season_types[1]: TVInfoSeasonTypes.official, season_types[2]: TVInfoSeasonTypes.dvd}
|
|
|
|
def __init__(self, banners=False, posters=False, seasons=False, seasonwides=False, fanart=False, actors=False,
             dvdorder=False, *args, **kwargs):
    """initialise the api, every argument is handed on to the base class in the given order"""
    super().__init__(
        banners, posters, seasons, seasonwides, fanart, actors, dvdorder, *args, **kwargs)
|
|
|
|
def _fetch_data(self, endpoint, **kwargs):
    # type: (string_types, Any) -> Any
    """
    fetch the parsed json data of an api endpoint

    :param endpoint: endpoint, appended to `base_url`
    :param kwargs: sent as request parameters
    :return: parsed response, None if the api answered with a 404 (`not_found` is set then, and
             `show_not_found` as well for `/series/` endpoints)
    :raises TvdbTokenFailure: if the api answered with a 401
    :raises TvdbError: on any other failure, ConnectionSkipException is passed through as is
    """
    is_series_info = endpoint.startswith('/series/')
    if is_series_info:
        self.show_not_found = False
    try:
        return tvdb_endpoint_fetch(url=f'{self.base_url}{endpoint}', params=kwargs, parse_json=True,
                                   raise_status_code=True, raise_exceptions=True)
    except ConnectionSkipException:
        raise
    except requests.exceptions.HTTPError as e:
        status_code = e.response.status_code
        if 401 == status_code:
            raise TvdbTokenFailure('Failed to get new Token')
        if 404 != status_code:
            raise TvdbError(ex(e))
        # a 404 is not an error, it is only recorded
        if is_series_info:
            self.show_not_found = True
        self.not_found = True
    except (BaseException, Exception) as e:
        raise TvdbError(ex(e))
|
|
|
|
@staticmethod
|
|
def _check_resp(type_chk=list, data=None):
|
|
return isinstance(data, dict) and all(_k in data for _k in ('data', 'status')) \
|
|
and 'success' == data['status'] and isinstance(data['data'], type_chk)
|
|
|
|
@staticmethod
|
|
def _next_page(resp, page):
|
|
page += 1
|
|
if f'?page={page}' in ((resp.get('links') or {}).get('next') or ''):
|
|
return page
|
|
|
|
def _convert_person(self, p, ids=None):
    # type: (Dict, Dict) -> List[TVInfoPerson]
    """
    convert the person data of the api into a TVInfoPerson

    :param p: person data
    :param ids: ids of the person that are already known, keyed by TVINFO source, the dict is not modified
    :return: list that holds the one converted person
    """
    # a copy is used, ids that are found in the person data must not leak back into the dict of the caller
    ch, ids = [], dict(ids or {})
    # only series level acting roles are used, featured roles are sorted first
    for cur_c in sorted(filter(
            lambda a: (3 == a['type'] or 'Actor' == a['peopleType'])
            and a['name'] and a['seriesId'] and not a.get('episodeId'),
            p.get('characters') or []),
            key=lambda a: (not a['isFeatured'], a['sort'])):
        ti_show = TVInfoShow()
        ti_show.id = clean_data(cur_c['seriesId'])
        ti_show.ids = TVInfoIDs(ids={TVINFO_TVDB: ti_show.id})
        ti_show.seriesname = clean_data(('series' in cur_c and cur_c['series'] and cur_c['series']['name']))
        ti_show.poster = self._sanitise_image_uri(('series' in cur_c and cur_c['series']
                                                   and cur_c['series']['image']))
        ti_show.firstaired = self._get_first_aired(('series' in cur_c and cur_c['series']))
        # `p_id` is the keyword that the other places of this module use for the id of a character
        ch.append(TVInfoCharacter(
            p_id=cur_c['id'], ids=TVInfoIDs(ids={TVINFO_TVDB: cur_c['id']}),
            name=clean_data(cur_c['name'] or ''),
            image=self._sanitise_image_uri(cur_c.get('image')),
            regular=cur_c['isFeatured'],
            ti_show=ti_show
        ))
    # dates that are missing, zeroed or fail to parse result in None
    try:
        b_date = clean_data(p.get('birth'))
        birthdate = (b_date and '0000-00-00' != b_date and tz_p.parse(b_date).date()) or None
    except (BaseException, Exception):
        birthdate = None
    try:
        d_date = clean_data(p.get('death'))
        deathdate = (d_date and '0000-00-00' != d_date and tz_p.parse(d_date).date()) or None
    except (BaseException, Exception):
        deathdate = None

    p_tvdb_id = self._get_tvdb_id(p)
    ids.update({TVINFO_TVDB: p_tvdb_id})
    social_ids, official_site = {}, None

    if 'remoteIds' in p and isinstance(p['remoteIds'], list):
        for cur_rid in p['remoteIds']:
            if not (src_value := clean_data(cur_rid['id'])):
                continue
            src_name = cur_rid['sourceName'].lower()
            src_type = source_types.get(cur_rid['type'])
            # a source is identified by its type id or, failing that, by its name
            if TVINFO_IMDB == src_type or 'imdb' in src_name:
                try:
                    imdb_id = try_int(f'{src_value}'.replace('nm', ''), None)
                    ids[TVINFO_IMDB] = imdb_id
                except (BaseException, Exception):
                    pass
            elif TVINFO_TMDB == src_type or 'themoviedb' in src_name:
                ids[TVINFO_TMDB] = try_int(src_value, None)
            elif TVINFO_TVMAZE == src_type or 'tv maze' in src_name:
                ids[TVINFO_TVMAZE] = try_int(src_value, None)
            elif TVINFO_OFFICIALSITE == src_type or 'official website' in src_name:
                official_site = src_value
            elif TVINFO_FACEBOOK == src_type or 'facebook' in src_name:
                social_ids[TVINFO_FACEBOOK] = src_value
            elif TVINFO_TWITTER == src_type or 'twitter' in src_name:
                social_ids[TVINFO_TWITTER] = src_value
            elif TVINFO_INSTAGRAM == src_type or 'instagram' in src_name:
                social_ids[TVINFO_INSTAGRAM] = src_value
            elif TVINFO_REDDIT == src_type or 'reddit' in src_name:
                social_ids[TVINFO_REDDIT] = src_value
            elif TVINFO_YOUTUBE == src_type or 'youtube' in src_name:
                social_ids[TVINFO_YOUTUBE] = src_value
            elif TVINFO_WIKIPEDIA == src_type or 'wikipedia' in src_name:
                social_ids[TVINFO_WIKIPEDIA] = src_value
            elif TVINFO_WIKIDATA == src_type or 'wikidata' in src_name:
                social_ids[TVINFO_WIKIDATA] = src_value
            elif TVINFO_TIKTOK == src_type or 'tiktok' in src_name:
                social_ids[TVINFO_TIKTOK] = src_value
            elif TVINFO_LINKEDIN == src_type:
                social_ids[TVINFO_LINKEDIN] = src_value
            elif TVINFO_FANSITE == src_type:
                social_ids[TVINFO_FANSITE] = src_value

    # the english biography is used, if there is one
    bio = clean_data(
        next((_cp.get('biography') for _cp in p.get('biographies') or [] if 'eng' == _cp.get('language')), None)) \
        or None

    return [TVInfoPerson(
        p_id=p_tvdb_id, name=clean_data(p['name'] or ''),
        image=self._sanitise_image_uri(p.get('image') or p.get('image_url')),
        gender=PersonGenders.tvdb_map.get(p.get('gender'), PersonGenders.unknown), birthdate=birthdate,
        deathdate=deathdate, birthplace=clean_data(p.get('birthPlace')),
        akas=set(clean_data((isinstance(_a, dict) and _a['name']) or _a) for _a in p.get('aliases') or []),
        bio=bio, ids=TVInfoIDs(ids=ids), social_ids=TVInfoSocialIDs(ids=social_ids), homepage=official_site,
        characters=ch
    )]
|
|
|
|
# noinspection PyUnboundLocalVariable
|
|
def get_cached_or_new(self, cache_key, url, no_cached=True, expire=None, **kwargs):
    # type: (str, str, bool, int, ...) -> Union[List, Dict, NoneType]
    """
    return data from the cache, or fetch and cache it if there is no usable cache entry

    :param cache_key: key of the cache entry
    :param url: endpoint to fetch the data from
    :param no_cached: True to skip the cache lookup and always fetch
    :param expire: expire time that is used when the fetched data is cached
    :param kwargs: extra parameters for fetching the data
    :return: cached or fetched data, None if fetching or caching the data failed
    """
    cached_none, data = False, None
    if not no_cached:
        cached_none, data = self._get_cache_entry(cache_key)
    # a None that was read from the cache counts as a valid entry
    must_fetch = no_cached or (None is data and not cached_none)
    if not must_fetch:
        return data
    try:
        data = self._fetch_data(url, **kwargs)
        self._set_cache_entry(cache_key, data, expire=expire)
    except (BaseException, Exception):
        data = None
    return data
|
|
|
|
def get_person(self, p_id, get_show_credits=False, get_images=False, **kwargs):
    # type: (integer_types, bool, bool, Any) -> Optional[TVInfoPerson]
    """
    get the data of a person by id

    :param p_id: id of the person
    :param get_show_credits: get show credits (not used by this implementation)
    :param get_images: get person images (not used by this implementation)
    :return: person object, or None if there is no id or no valid data for it
    """
    if p_id:
        resp = self.get_cached_or_new(f'p-v4-{p_id}', f'/people/{p_id}/extended')
        if self._check_resp(dict, resp):
            return self._convert_person(resp['data'])[0]
    return None
|
|
|
|
def _search_person(self, name=None, ids=None):
    # type: (AnyStr, Dict[integer_types, integer_types]) -> List[TVInfoPerson]
    """
    search for persons by ids and/or by name

    :param name: text to search for
    :param ids: dict of ids to search, keyed by TVINFO source
    :return: list of found persons, each person id is only contained once
    """
    urls, result, ids = [], [], ids or {}
    for cur_tvinfo in self.supported_person_id_searches:
        if cur_tvinfo in ids:
            # a TVDB id is looked up directly
            if TVINFO_TVDB == cur_tvinfo and (resp := self.get_person(ids[cur_tvinfo])):
                result.append(resp)
            # ids of other sources are resolved with the remote id search
            if cur_tvinfo in (TVINFO_IMDB, TVINFO_TMDB, TVINFO_TVMAZE):
                if TVINFO_IMDB == cur_tvinfo:
                    # the numeric IMDb id is sent as `nm` + id, zero padded to at least 7 digits
                    url = f'search/remoteid/nm{ids.get(TVINFO_IMDB):07d}'
                elif TVINFO_TMDB == cur_tvinfo:
                    url = f'search/remoteid/{ids.get(TVINFO_TMDB)}'
                elif TVINFO_TVMAZE == cur_tvinfo:
                    url = f'search/remoteid/{ids.get(TVINFO_TVMAZE)}'
                else:
                    continue
                resp = self.get_cached_or_new(
                    f'p-v4-id-{cur_tvinfo}-{ids[cur_tvinfo]}', url, expire=self.search_cache_expire,
                    no_cached=not self.config.get('cache_search'))

                if self._check_resp(list, resp):
                    for cur_resp in resp['data']:
                        if isinstance(cur_resp, dict) and 'people' in cur_resp:
                            # if `people` is the only key of the entry, the full data of the person is fetched,
                            # the entry itself is converted if that is not the case or if it returns nothing
                            if p_d := None if 1 != len(cur_resp) else self.get_person(cur_resp['people']['id']):
                                result.append(p_d)
                            else:
                                result.extend(self._convert_person(cur_resp['people'], ids))
                            # NOTE(review): only the first entry that holds a `people` record is used - confirm
                            break
    if name:
        resp = self.get_cached_or_new(
            f'p-v4-src-text-{name}', '/search', query=name, type='people',
            expire=self.search_cache_expire, no_cached=not self.config.get('cache_search'))

        if self._check_resp(list, resp):
            for cur_resp in resp['data']:
                result.extend(self._convert_person(cur_resp))

    # remove duplicates by id, the first occurrence is kept and the order is preserved
    seen = set()
    result = [seen.add(_r.id) or _r for _r in result if _r.id not in seen]
    return result
|
|
|
|
def search_tvs(self, terms, language=None):
    # type: (Union[int, AnyStr], Optional[AnyStr]) -> Optional[dict]
    """
    run a text search for series and persons against the `TVDB` index of the search service

    :param terms: text to search for
    :param language: id of a language of `get_languages()`, its `sg_lang` value is sent as Accept-Language
    :return: parsed json response of the search, or None if the request raised an exception
    """
    from random import choice

    sg_lang = next(filter(lambda x: language == x['id'], self.get_languages()), {}).get('sg_lang')
    headers = {'Accept-Encoding': 'gzip,deflate'}
    if None is not sg_lang:  # and sg_lang in self.config['valid_languages']:
        headers.update({'Accept-Language': sg_lang})

    try:
        # one of the hosts is picked at random, the request uses a new session instead of the api session
        # NOTE(review): `terms` is put into the params string as is, without url encoding - confirm that
        # the callers never pass text that contains `&`
        src = get_url(
            'https://tvshow''time-%s.algo''lia.net/1/'
            'indexes/*/queries' % choice([1, 2, 3, 'dsn']),
            params={'x-algo''lia-agent': 'Alg''olia for vani''lla JavaScript (lite) 3.3''2.0;'
                                         'instant''search.js (3.5''.3);JS Helper (2.2''8.0)',
                    'x-algo''lia''-app''lication-id': 'tvshow''time',
                    'x-algo''lia''-ap''i-key': '3d''978dd96c457390f21cec6131ce5d''9c'[::-1]},
            post_json={'requests': [
                {'indexName': 'TVDB',
                 'params': '&'.join(
                     [f'query={terms}', 'maxValuesPerFacet=10', 'page=0',
                      'facetFilters=[["type:series", "type:person"]]',
                      'tagFilters=', 'analytics=false', 'advancedSyntax=true',
                      'highlightPreTag=__ais-highlight__', 'highlightPostTag=__/ais-highlight__'
                      ])
                 }]},
            session=requests.session(), headers=headers, parse_json=True, failure_monitor=False)
        return src
    except (KeyError, IndexError, Exception):
        # best effort, a failed search results in None
        pass
|
|
|
|
@staticmethod
|
|
def _get_overview(show_data, language='eng'):
    # type: (Dict, AnyStr) -> AnyStr
    """
    internal helper to get the overview of a show in the wanted language

    :param show_data: show data
    :param language: wanted language, for translated data the english overview is the fallback
    :return: cleaned overview text
    """
    overview = ''
    translations = show_data.get('translations')
    if isinstance(translations, dict) and 'overviewTranslations' in translations:
        try:
            candidates = translations['overviewTranslations']
            english = next((_t for _t in candidates if 'eng' == _t['language']), None)
            chosen = next((_t for _t in candidates if language == _t['language']), english)
            if chosen:
                overview = chosen['overview']
        except (BaseException, Exception):
            pass
    elif isinstance(show_data, dict) and 'overviews' in show_data:
        overview = show_data['overviews'].get(language, show_data.get('overview'))

    return clean_str(overview)
|
|
|
|
def _get_series_name(self, show_data, language=None):
    # type: (Dict, AnyStr) -> Tuple[Optional[AnyStr], List]
    """
    get the name of a show in the wanted language, and its aliases

    :param show_data: show data
    :param language: wanted language, the original name of the show is used if there is no translation
    :return: tuple of name and list of aliases, if there is no name then the first alias is removed from
             the list and used as name
    """
    # `translations` (and `nameTranslations`) may be set to None, that is handled like a missing value
    translations = show_data.get('translations') or {}
    if 'nameTranslations' in translations:
        name_data = next(
            (_t for _t in translations.get('nameTranslations') or []
             if language and language == _t['language']),
            {'name': show_data['name']})
        series_name = clean_data(name_data['name'])
    else:
        # without name translations, `translations` is used as a mapping of language to name
        series_name = clean_data(translations.get(language, show_data['name']))
    series_aliases = self._get_aliases(show_data)
    if not series_name and isinstance(series_aliases, list) and 0 < len(series_aliases):
        series_name = series_aliases.pop(0)
    return series_name, series_aliases
|
|
|
|
def _get_show_data(
|
|
self,
|
|
sid, # type: integer_types
|
|
language, # type: AnyStr
|
|
get_ep_info=False, # type: bool
|
|
banners=False, # type: bool
|
|
posters=False, # type: bool
|
|
seasons=False, # type: bool
|
|
seasonwides=False, # type: bool
|
|
fanart=False, # type: bool
|
|
actors=False, # type: bool
|
|
direct_data=False, # type: bool
|
|
**kwargs # type: Optional[Any]
|
|
):
|
|
# type: (...) -> Optional[bool, dict]
|
|
"""
|
|
internal function that should be overwritten in class to get data for given show id
|
|
:param sid: show id
|
|
:param language: language
|
|
:param get_ep_info: get episodes
|
|
:param banners: load banners
|
|
:param posters: load posters
|
|
:param seasons: load seasons
|
|
:param seasonwides: load seasonwides
|
|
:param fanart: load fanard
|
|
:param actors: load actors
|
|
:param direct_data: return pure data
|
|
"""
|
|
if not sid:
|
|
return False
|
|
|
|
resp = self._fetch_data(f'/series/{sid}/extended?meta=translations')
|
|
if direct_data:
|
|
return resp
|
|
|
|
if self._check_resp(dict, resp):
|
|
show_data = resp['data']
|
|
series_name, series_aliases = self._get_series_name(show_data, language)
|
|
if not series_name:
|
|
return False
|
|
|
|
ti_show = self.ti_shows[sid] # type: TVInfoShow
|
|
ti_show.banner_loaded = ti_show.poster_loaded = ti_show.fanart_loaded = True
|
|
ti_show.id = show_data['id']
|
|
ti_show.seriesname = series_name
|
|
ti_show.slug = clean_data(show_data.get('slug'))
|
|
ti_show.poster = clean_data(show_data.get('image'))
|
|
ti_show.firstaired = clean_data(show_data.get('firstAired'))
|
|
ti_show.rating = show_data.get('score')
|
|
ti_show.contentrating = ('contentRatings' in show_data and show_data['contentRatings']
|
|
and next((_r['name'] for _r in show_data['contentRatings'] or []
|
|
if 'usa' == _r['country']), None)) or None
|
|
ti_show.aliases = series_aliases
|
|
ti_show.status = clean_data(show_data['status']['name'])
|
|
ti_show.network_country = clean_data(show_data.get('originalCountry'))
|
|
ti_show.lastupdated = clean_data(show_data.get('lastUpdated'))
|
|
existing_networks = []
|
|
if 'latestNetwork' in show_data \
|
|
and isinstance(show_data['latestNetwork'].get('primaryCompanyType'), integer_types) \
|
|
and 1 == show_data['latestNetwork']['primaryCompanyType'] \
|
|
and show_data['latestNetwork']['country']:
|
|
ti_show.networks = [TVInfoNetwork(
|
|
name=clean_data(show_data['latestNetwork']['name']),
|
|
country=clean_data(show_data['latestNetwork']['country']),
|
|
active_date=clean_data(show_data['latestNetwork']['activeDate']),
|
|
inactive_date=clean_data(show_data['latestNetwork']['inactiveDate']))]
|
|
ti_show.network = clean_data(show_data['latestNetwork']['name'])
|
|
existing_networks.extend([ti_show.network])
|
|
ti_show.network_country = clean_data(show_data['latestNetwork']['country'])
|
|
if 'companies' in show_data and isinstance(show_data['companies'], list):
|
|
# filter networks
|
|
networks = sorted([_n for _n in show_data['companies'] if 1 == _n['companyType']['companyTypeId']
|
|
and _n['country']], key=lambda a: a['activeDate'] or '0000-00-00')
|
|
if networks:
|
|
ti_show.networks.extend([TVInfoNetwork(
|
|
name=clean_data(_n['name']), country=clean_data(_n['country']),
|
|
active_date=clean_data(_n['activeDate']), inactive_date=clean_data(_n['inactiveDate']))
|
|
for _n in networks if clean_data(_n['name']) not in existing_networks])
|
|
if not ti_show.network:
|
|
ti_show.network = clean_data(networks[-1]['name'])
|
|
ti_show.network_country = clean_data(networks[-1]['country'])
|
|
ti_show.language = clean_data(show_data.get('originalLanguage'))
|
|
ti_show.runtime = show_data.get('averageRuntime')
|
|
ti_show.airs_time = clean_data(show_data.get('airsTime'))
|
|
ti_show.airs_dayofweek = ', '.join([_k.capitalize()
|
|
for _k, _v in iteritems(show_data.get('airsDays')) if _v])
|
|
ti_show.genre_list = ('genres' in show_data and show_data['genres']
|
|
and [clean_data(_g['name']) for _g in show_data['genres']]) or []
|
|
ti_show.genre = '|'.join(ti_show.genre_list)
|
|
|
|
ids, social_ids = {}, {}
|
|
if 'remoteIds' in show_data and isinstance(show_data['remoteIds'], list):
|
|
for cur_rid in show_data['remoteIds']:
|
|
src_name = cur_rid['sourceName'].lower()
|
|
src_value = clean_data(cur_rid['id'])
|
|
src_type = source_types.get(cur_rid['type'])
|
|
if TVINFO_IMDB == src_type or 'imdb' in src_name:
|
|
try:
|
|
imdb_id = try_int(src_value.replace('tt', ''), None)
|
|
ids['imdb'] = imdb_id
|
|
except (BaseException, Exception):
|
|
pass
|
|
ti_show.imdb_id = src_value
|
|
elif TVINFO_TMDB == src_type or 'themoviedb' in src_name:
|
|
ids['tmdb'] = try_int(src_value, None)
|
|
elif TVINFO_TVMAZE == src_type or 'tv maze' in src_name:
|
|
ids['tvmaze'] = try_int(src_value, None)
|
|
elif TVINFO_OFFICIALSITE == src_type or 'official website' in src_name:
|
|
ti_show.official_site = src_value
|
|
elif TVINFO_FACEBOOK == src_type or 'facebook' in src_name:
|
|
social_ids['facebook'] = src_value
|
|
elif TVINFO_TWITTER == src_type or 'twitter' in src_name:
|
|
social_ids['twitter'] = src_value
|
|
elif TVINFO_INSTAGRAM == src_type or 'instagram' in src_name:
|
|
social_ids['instagram'] = src_value
|
|
elif TVINFO_REDDIT == src_type or 'reddit' in src_name:
|
|
social_ids['reddit'] = src_value
|
|
elif TVINFO_YOUTUBE == src_type or 'youtube' in src_name:
|
|
social_ids['youtube'] = src_value
|
|
elif TVINFO_WIKIPEDIA == src_type or 'wikipedia' in src_name:
|
|
social_ids['wikipedia'] = src_value
|
|
elif TVINFO_WIKIDATA == src_type or 'wikidata' in src_name:
|
|
social_ids['wikidata'] = src_value
|
|
elif TVINFO_TIKTOK == src_type or 'tiktok' in src_name:
|
|
social_ids['tiktok'] = src_value
|
|
elif TVINFO_LINKEDIN == src_type:
|
|
social_ids['linkedin'] = src_value
|
|
elif TVINFO_FANSITE == src_type:
|
|
social_ids['fansite'] = src_value
|
|
|
|
ti_show.ids = TVInfoIDs(tvdb=show_data['id'], **ids)
|
|
if social_ids:
|
|
ti_show.social_ids = TVInfoSocialIDs(**social_ids)
|
|
|
|
ti_show.overview = self._get_overview(show_data, language=language)
|
|
|
|
if 'artworks' in show_data and isinstance(show_data['artworks'], list):
|
|
poster = banner = fanart_url = False
|
|
for cur_art in sorted(show_data['artworks'], key=lambda a: a['score'], reverse=True):
|
|
img_type = img_type_map.get(cur_art['type'], TVInfoImageType.other)
|
|
if False is poster and img_type == TVInfoImageType.poster:
|
|
ti_show.poster, ti_show.poster_thumb, poster = cur_art['image'], cur_art['thumbnail'], True
|
|
elif False is banner and img_type == TVInfoImageType.banner:
|
|
ti_show.banner, ti_show.banner_thumb, banner = cur_art['image'], cur_art['thumbnail'], True
|
|
elif False is fanart_url and img_type == TVInfoImageType.fanart:
|
|
ti_show.fanart, fanart_url = cur_art['image'], True
|
|
ti_show['images'].setdefault(img_type, []).append(TVInfoImage(
|
|
image_type=img_type,
|
|
sizes={
|
|
TVInfoImageSize.original: cur_art['image'],
|
|
TVInfoImageSize.small: cur_art['thumbnail']
|
|
},
|
|
img_id=cur_art['id'],
|
|
has_text=enforce_type(cur_art.get('includesText'), (bool, NoneType), None),
|
|
lang=cur_art['language'],
|
|
rating=cur_art['score'],
|
|
updated_at=cur_art['updatedAt'] or None
|
|
))
|
|
|
|
if (actors or self.config['actors_enabled']) \
|
|
and not getattr(self.ti_shows.get(sid), 'actors_loaded', False):
|
|
cast, crew, ti_show.actors_loaded = CastList(), CrewList(), True
|
|
if isinstance(show_data.get('characters'), list):
|
|
for cur_char in sorted(show_data.get('characters') or [],
|
|
key=lambda c: (not c['isFeatured'], c['sort'])):
|
|
people_type = people_types.get(cur_char['type'])
|
|
if people_type not in (RoleTypes.ActorMain, RoleTypes.ActorGuest, RoleTypes.ActorSpecialGuest,
|
|
RoleTypes.ActorRecurring) \
|
|
and isinstance(cur_char['name'], string_types):
|
|
low_name = cur_char['name'].lower()
|
|
if 'presenter' in low_name:
|
|
people_type = RoleTypes.Presenter
|
|
elif 'interviewer' in low_name:
|
|
people_type = RoleTypes.Interviewer
|
|
elif 'host' in low_name:
|
|
people_type = RoleTypes.Host
|
|
if cur_char['episodeId']:
|
|
if RoleTypes.ActorMain == people_type:
|
|
people_type = RoleTypes.ActorGuest
|
|
elif RoleTypes.Presenter == people_type:
|
|
people_type = RoleTypes.PresenterGuest
|
|
elif RoleTypes.Interviewer == people_type:
|
|
people_type = RoleTypes.InterviewerGuest
|
|
elif RoleTypes.Host == people_type:
|
|
people_type = RoleTypes.HostGuest
|
|
if people_type in (RoleTypes.Presenter, RoleTypes.Interviewer, RoleTypes.Host) \
|
|
and not cur_char['name']:
|
|
cur_char['name'] = {RoleTypes.Presenter: 'Presenter', RoleTypes.Interviewer: 'Interviewer',
|
|
RoleTypes.Host: 'Host'}.get(people_type) or cur_char['name']
|
|
if None is people_type:
|
|
continue
|
|
if RoleTypes.crew_limit > people_type:
|
|
cast[people_type].append(TVInfoCharacter(
|
|
p_id=cur_char['id'], name=clean_data(cur_char['name'] or ''),
|
|
ids=TVInfoIDs(ids={TVINFO_TVDB: cur_char['id']}),
|
|
image=self._sanitise_image_uri(cur_char['image']),
|
|
regular=cur_char['isFeatured'],
|
|
person=[TVInfoPerson(
|
|
p_id=cur_char['peopleId'], name=clean_data(cur_char['personName'] or ''),
|
|
image=self._sanitise_image_uri(cur_char.get('personImgURL')),
|
|
ids=TVInfoIDs(ids={TVINFO_TVDB: cur_char['peopleId']})
|
|
)]
|
|
))
|
|
else:
|
|
crew[people_type].append(TVInfoPerson(
|
|
p_id=cur_char['peopleId'], name=clean_data(cur_char['personName'] or ''),
|
|
ids=TVInfoIDs(ids={TVINFO_TVDB: cur_char['peopleId']}),
|
|
image=self._sanitise_image_uri(cur_char.get('personImgURL'))
|
|
))
|
|
|
|
if not cast[RoleTypes.ActorMain]:
|
|
html = get_url(f'https://www.thetvdb.com/series/{ti_show.slug}/people')
|
|
if html:
|
|
try:
|
|
with BS4Parser(html) as soup:
|
|
rc_role = re.compile(r'/series/(?P<show_slug>[^/]+)/people/(?P<role_id>\d+)/?$')
|
|
rc_img = re.compile(r'/(?P<url>person/(?P<person_id>\d+)/(?P<img_id>[^/]+)\..*)')
|
|
rc_img_v3 = re.compile(r'/(?P<url>actors/(?P<img_id>[^/]+)\..*)')
|
|
max_people = 5
|
|
rc_clean = re.compile(r'[^a-z\d]')
|
|
for cur_role in soup.find_all('a', href=rc_role) or []:
|
|
try:
|
|
image, person_id = 2 * [None]
|
|
for cur_rc in (rc_img, rc_img_v3):
|
|
img_tag = cur_role.find('img', src=cur_rc)
|
|
if img_tag:
|
|
img_parsed = cur_rc.search(img_tag.get('src'))
|
|
image, person_id = [
|
|
_x in img_parsed.groupdict() and img_parsed.group(_x)
|
|
for _x in ('url', 'person_id')]
|
|
break
|
|
lines = [_x.strip() for _x in cur_role.get_text().split('\n')
|
|
if _x.strip()][0:2]
|
|
name = role = ''
|
|
if len(lines):
|
|
name = lines[0]
|
|
for cur_line in lines[1:]:
|
|
if cur_line.lower().startswith('as '):
|
|
role = cur_line[3:]
|
|
break
|
|
if not person_id and max_people:
|
|
max_people -= 1
|
|
results = self.search_tvs(name)
|
|
try:
|
|
for cur_result in (
|
|
isinstance(results, dict) and results.get('results') or []):
|
|
# sorts 'banners/images/missing/' to last before filter
|
|
people_filter = (
|
|
lambda r: 'person' == r['type'] and
|
|
rc_clean.sub(name, '') == rc_clean.sub(r['name'], ''),
|
|
cur_result.get('nbHits')
|
|
and sorted(cur_result.get('hits'),
|
|
key=lambda x: len(x['image']), reverse=True) or [])
|
|
if ENV.get('SG_DEV_MODE'):
|
|
for cur_person in filter(*people_filter):
|
|
new_keys = set(list(cur_person)).difference({
|
|
'_highlightResult', 'banner', 'id', 'image',
|
|
'is_tvdb_searchable', 'is_tvt_searchable', 'name',
|
|
'objectID', 'people_birthdate', 'people_died',
|
|
'poster', 'type', 'url'
|
|
})
|
|
if new_keys:
|
|
log.warning(
|
|
f'DEV_MODE: New _parse_actors tvdb attrs for'
|
|
f' {cur_person["id"]} {new_keys!r}')
|
|
|
|
person_ok = False
|
|
for cur_person in filter(*people_filter):
|
|
if image:
|
|
people_data = get_url(cur_person['url'])
|
|
person_ok = re.search(re.escape(image), people_data)
|
|
if not image or person_ok:
|
|
person_id = cur_person['id']
|
|
raise ValueError('value okay, id found')
|
|
except (BaseException, Exception):
|
|
pass
|
|
|
|
rid = int(rc_role.search(cur_role.get('href')).group('role_id'))
|
|
person_id = try_int(person_id, None)
|
|
image = image and f'https://artworks.thetvdb.com/banners/{image}' or None
|
|
# noinspection PyTypeChecker
|
|
cast[RoleTypes.ActorMain].append(TVInfoCharacter(
|
|
p_id=rid, name=clean_data(role),
|
|
ids={TVINFO_TVDB: rid},
|
|
image=image,
|
|
person=[TVInfoPerson(
|
|
p_id=person_id, name=clean_data(name), ids={TVINFO_TVDB: person_id}
|
|
)]
|
|
))
|
|
except(BaseException, Exception):
|
|
pass
|
|
except(BaseException, Exception):
|
|
pass
|
|
|
|
ti_show.cast = cast
|
|
ti_show.crew = crew
|
|
ti_show.actors = [dict(
|
|
character=dict(
|
|
id=_ch.id, name=_ch.name,
|
|
url=f'https://www.thetvdb.com/series/{show_data["slug"]}/people/{_ch.id}',
|
|
image=_ch.image
|
|
),
|
|
person=dict(
|
|
id=_ch.person and _ch.person[0].id, name=_ch.person and _ch.person[0].name,
|
|
url=_ch.person and f'https://www.thetvdb.com/people/{_ch.person[0].id}',
|
|
image=_ch.person and _ch.person[0].image,
|
|
birthday=try_date(_ch.birthdate), deathday=try_date(_ch.deathdate),
|
|
gender=None,
|
|
country=None
|
|
)
|
|
) for _ch in cast[RoleTypes.ActorMain]]
|
|
|
|
if get_ep_info and not getattr(self.ti_shows.get(sid), 'ep_loaded', False):
|
|
# fetch absolute numbers
|
|
abs_ep_nums = {}
|
|
if any(1 for _s in show_data.get('seasons', []) or [] if 'absolute' == _s.get('type', {}).get('type')):
|
|
page = 0
|
|
while 100 >= page:
|
|
resp = self._fetch_data(f'/series/{sid}/episodes/absolute?page={page:d}')
|
|
if isinstance(resp, dict) and isinstance((resp.get('data') or {}).get('episodes'), list):
|
|
abs_ep_nums.update({_e['id']: _e['number'] for _e in resp['data']['episodes']
|
|
if None is _e['seasons'] and _e['number']})
|
|
if None is not (page := self._next_page(resp, page)):
|
|
continue
|
|
break
|
|
|
|
# fetch alt numbers
|
|
alt_ep_nums, alt_ep_types, default_season_type = \
|
|
{}, {}, self.season_types.get(show_data.get('defaultSeasonType'))
|
|
for cur_alt_type in {_a.get('type', {}).get('type') for _a in show_data.get('seasons', []) or []
|
|
if _a.get('type', {}).get('type') not in ('absolute', default_season_type)}:
|
|
if any(1 for _s in show_data.get('seasons', []) or []
|
|
if cur_alt_type == _s.get('type', {}).get('type')):
|
|
page = 0
|
|
while 100 >= page:
|
|
resp = self._fetch_data(f'/series/{sid}/episodes/{cur_alt_type}?page={page:d}')
|
|
if isinstance(resp, dict) and isinstance((resp.get('data') or {}).get('episodes'), list):
|
|
for cur_ep in resp['data']['episodes']:
|
|
alt_ep_types.setdefault(
|
|
self.season_types.get(cur_alt_type, cur_alt_type),
|
|
{}).setdefault(cur_ep['id'], {}).update({
|
|
'season': cur_ep['seasonNumber'], 'episode': cur_ep['number'],
|
|
'name': cur_ep['name']})
|
|
alt_ep_nums.setdefault(cur_ep['id'], {}).update({
|
|
self.season_type_map.get(cur_alt_type, cur_alt_type):
|
|
{'season': cur_ep['seasonNumber'], 'episode': cur_ep['number']}})
|
|
if None is not (page := self._next_page(resp, page)):
|
|
continue
|
|
break
|
|
|
|
ep_lang = (language in (show_data.get('overviewTranslations') or []) and language) or 'eng'
|
|
page, ti_show.ep_loaded, eps_count = 0, True, 0
|
|
while 100 >= page:
|
|
resp = self._fetch_data(f'/series/{sid}/episodes/default/{ep_lang}?page={page:d}')
|
|
if isinstance(resp, dict) and isinstance((resp.get('data') or {}).get('episodes'), list):
|
|
links = 'next' in (resp.get('links') or '')
|
|
total_items = (links and resp.get('links', {}).get('total_items')) or None
|
|
page_size = (links and resp.get('links', {}).get('page_size')) or 500
|
|
eps_count += (len(resp['data']['episodes'])) or 0
|
|
full_page = page_size <= len(resp['data']['episodes'])
|
|
more = (links and None is not (page := self._next_page(resp, page))) or (
|
|
links and isinstance(total_items, integer_types) and eps_count < total_items)
|
|
alt_page = (full_page and not links)
|
|
if not alt_page:
|
|
self._set_episodes(ti_show, resp, abs_ep_nums, alt_ep_nums, alt_ep_types)
|
|
|
|
if not alt_page and more:
|
|
continue
|
|
|
|
if alt_page:
|
|
html = get_url(f'https://www.thetvdb.com/series/{ti_show.slug}/'
|
|
f'allseasons/{default_season_type}')
|
|
if not html:
|
|
raise TvdbError('Failed to get episodes for show')
|
|
|
|
api_sxe = [f's{_e["seasonNumber"]}e{_e["number"]}'
|
|
for _e in resp['data']['episodes']]
|
|
template_ep = resp['data']['episodes'][-1].copy()
|
|
try:
|
|
with BS4Parser(html) as soup:
|
|
for cur_ep in soup.find_all('li', class_='list-group-item'):
|
|
try:
|
|
heading = cur_ep.h4
|
|
sxe = [try_int(_n, None) for _n in
|
|
re.findall(r'(?i)s(?:pecial\s+)?(\d+)[ex](\d+)', clean_data(
|
|
heading.find('span', class_='episode-label').get_text()))[0]]
|
|
if None in sxe or 's{}e{}'.format(*sxe) in api_sxe:
|
|
continue
|
|
ep_season, ep_number = sxe
|
|
except(BaseException, Exception):
|
|
continue
|
|
|
|
try:
|
|
ep_name = clean_data(heading.a.get_text())
|
|
ep_link = heading.a['href'] # check link contains 'series'
|
|
ep_id = try_int(re.findall(r'episodes/(\d+$)', ep_link)[0], None)
|
|
except(BaseException, Exception):
|
|
ep_id = None
|
|
if None is ep_id:
|
|
continue
|
|
|
|
list_items = cur_ep.find('ul', class_='list-inline')
|
|
aired = try_date(list_items.find_all('li')[0].get_text())
|
|
|
|
row_items = cur_ep.find('div', class_='row')
|
|
try:
|
|
overview = clean_str(row_items.p.get_text())
|
|
except(BaseException, Exception):
|
|
overview = ''
|
|
|
|
try:
|
|
image = row_items.find('div', class_='col-xs-3').img['src']
|
|
except(BaseException, Exception):
|
|
image = None
|
|
|
|
new_ep = template_ep.copy()
|
|
new_ep.update(dict(
|
|
id=ep_id, name=ep_name, aired=aired, overview=overview,
|
|
image=image, imageType=11 if image and '/banner' in image else None,
|
|
number=ep_number, seasonNumber=ep_season
|
|
))
|
|
resp['data']['episodes'] += [new_ep]
|
|
except (BaseException, Exception):
|
|
pass
|
|
self._set_episodes(ti_show, resp, abs_ep_nums, alt_ep_nums, alt_ep_types)
|
|
break
|
|
|
|
return True
|
|
|
|
return False
|
|
|
|
def _sanitise_image_uri(self, image):
|
|
return isinstance(image, string_types) and 'http' != image[0:4] and \
|
|
f'{self.art_url}{image.lstrip("/")}' or image
|
|
|
|
def _set_episodes(self, ti_show, ep_data, abs_ep_nums, alt_ep_nums, alt_ep_types):
|
|
# type: (TVInfoShow, Dict, Dict, Dict, Dict) -> None
|
|
"""
|
|
populate a show with episode objects
|
|
"""
|
|
for cur_ep_data in ep_data['data']['episodes']:
|
|
for cur_key, cur_data in (
|
|
('seasonnumber', 'seasonNumber'), ('episodenumber', 'number'),
|
|
('episodename', 'name'), ('firstaired', 'aired'), ('runtime', 'runtime'),
|
|
('seriesid', 'seriesId'), ('id', 'id'), ('filename', 'image'), ('overview', 'overview'),
|
|
('absolute_number', 'abs'), ('finale_type', 'finaleType')):
|
|
season_num, ep_num = cur_ep_data['seasonNumber'], cur_ep_data['number']
|
|
|
|
if 'absolute_number' == cur_key:
|
|
value = abs_ep_nums.get(cur_ep_data['id'])
|
|
else:
|
|
value = clean_data(cur_ep_data.get(cur_data, getattr(empty_ep, cur_key)))
|
|
|
|
if 'finale_type' == cur_key:
|
|
value = tvdb_final_types.get(cur_ep_data.get(cur_data), None)
|
|
|
|
if 'image' == cur_data:
|
|
value = self._sanitise_image_uri(value)
|
|
|
|
if season_num not in ti_show:
|
|
ti_show[season_num] = TVInfoSeason(show=ti_show)
|
|
ti_show[season_num].number = season_num
|
|
if ep_num not in ti_show[season_num]:
|
|
ti_show[season_num][ep_num] = TVInfoEpisode(season=ti_show[season_num])
|
|
|
|
ti_show[season_num][ep_num][cur_key] = value
|
|
ti_show[season_num][ep_num].__dict__[cur_key] = value
|
|
ti_show[season_num][ep_num].alt_num = alt_ep_nums.get(cur_ep_data['id'], {})
|
|
for cur_et in alt_ep_types:
|
|
try:
|
|
ep = alt_ep_types[cur_et][cur_ep_data['id']]
|
|
if ep:
|
|
ti_show.alt_ep_numbering.setdefault(cur_et, {}).setdefault(ep['season'], {})[ep['episode']] = \
|
|
ti_show[season_num][ep_num]
|
|
except (BaseException, Exception):
|
|
continue
|
|
|
|
@staticmethod
|
|
def _get_network(show_data):
|
|
# type: (Dict) -> Optional[AnyStr]
|
|
if 'network' in show_data:
|
|
return clean_data(show_data['network'])
|
|
if show_data.get('companies'):
|
|
if isinstance(show_data['companies'][0], dict):
|
|
return clean_data(next(
|
|
filter(lambda a: 1 == a['companyType']['companyTypeId'], show_data['companies']),
|
|
{}).get('name'))
|
|
|
|
return clean_data(show_data['companies'][0])
|
|
|
|
@staticmethod
def _get_aliases(show_data):
    """
    get the aliases of a show from show data

    :param show_data: show data with an optional `aliases` key
    :return: list of cleaned `name` values if the aliases are dicts, otherwise the cleaned `aliases` value itself,
        where a missing key is treated as an empty list
    """
    aliases = show_data.get('aliases')
    if aliases and isinstance(aliases[0], dict):
        return [clean_data(_alias['name']) for _alias in aliases]
    return clean_data(show_data.get('aliases', []))
|
|
|
|
@staticmethod
|
|
def _get_tvdb_id(dct):
|
|
try:
|
|
return try_int(dct.get('tvdb_id'), None) or try_int(re.sub(r'^.+-(\d+)$', r'\1', f'{dct["id"]}'), None)
|
|
except (BaseException, Exception):
|
|
return
|
|
|
|
@staticmethod
|
|
def _get_first_aired(show_data):
|
|
if isinstance(show_data, dict):
|
|
first_aired = clean_data(show_data.get('first_air_time') or show_data.get('firstAired') or
|
|
('0000' != show_data.get('year') and show_data.get('year')) or None)
|
|
if isinstance(first_aired, string_types) and re.search(r'(19|20)\d\d', first_aired):
|
|
return first_aired
|
|
|
|
@staticmethod
def _get_remote_ids(show_data, tvdb_id):
    """
    collect the ids that other sources have for a show

    :param show_data: show data, `remote_ids` is used if it is a list of dicts with a `sourceName` and `id` key
    :param tvdb_id: tvdb id of the show
    :return: TVInfoIDs with the tvdb id and any imdb, tmdb and tvmaze id that was found
    """
    if 'remote_ids' not in show_data or not isinstance(show_data['remote_ids'], list):
        return TVInfoIDs(tvdb=tvdb_id)

    found = {}
    for remote in show_data['remote_ids']:
        source = remote['sourceName'].lower()
        value = clean_data(remote['id'])
        if 'imdb' in source:
            try:
                # imdb ids are kept as number without their `tt` prefix
                found['imdb'] = try_int(value.replace('tt', ''), None)
            except (BaseException, Exception):
                pass
        elif 'themoviedb' in source:
            found['tmdb'] = try_int(value, None)
        elif 'tv maze' in source:
            found['tvmaze'] = try_int(value, None)

    return TVInfoIDs(tvdb=tvdb_id, **found)
|
|
|
|
def _search_show(
|
|
self,
|
|
name=None, # type: Union[AnyStr, List[AnyStr]]
|
|
ids=None, # type: Dict[integer_types, integer_types]
|
|
lang=None, # type: Optional[string_types]
|
|
**kwargs):
|
|
# type: (...) -> List[Dict]
|
|
"""
|
|
internal search function to find shows, should be overwritten in class
|
|
:param name: name to search for
|
|
:param ids: dict of ids {tvid: prodid} to search for
|
|
"""
|
|
def _make_result_dict(show_data):
|
|
tvdb_id = self._get_tvdb_id(show_data)
|
|
series_name, series_aliases = self._get_series_name(show_data, language=lang)
|
|
if not series_name:
|
|
return []
|
|
|
|
ti_show = TVInfoShow()
|
|
if country := clean_data(show_data.get('country')) or []:
|
|
country = [country]
|
|
ti_show.seriesname, ti_show.id, ti_show.ids, \
|
|
ti_show.firstaired, ti_show.network, \
|
|
ti_show.overview, \
|
|
ti_show.poster, \
|
|
ti_show.status, \
|
|
ti_show.language, ti_show.origin_countries, \
|
|
ti_show.aliases, ti_show.slug, \
|
|
ti_show.genre_list \
|
|
= series_name, tvdb_id, self._get_remote_ids(show_data, tvdb_id), \
|
|
self._get_first_aired(show_data), self._get_network(show_data), \
|
|
self._get_overview(show_data, language=lang) or clean_str(show_data.get('overview')), \
|
|
show_data.get('image_url') or show_data.get('image'), \
|
|
clean_data(isinstance(show_data.get('status'), dict) and show_data.get('status', {}).get('name')
|
|
or show_data['status']), \
|
|
clean_data(show_data.get('primary_language')), country, \
|
|
series_aliases, clean_data(show_data.get('slug')), \
|
|
[clean_data(_g.get('name')) for _g in (show_data.get('genres') or [])]
|
|
|
|
ti_show.genre = '|'.join(ti_show.genre_list or [])
|
|
return [ti_show]
|
|
|
|
results = []
|
|
if ids:
|
|
for cur_tvinfo, cur_arg, cur_name in ((TVINFO_TVDB, lang, None), (TVINFO_IMDB, 'tt%07d', 'imdb'),
|
|
(TVINFO_TMDB, '%s', 'themoviedb'), (TVINFO_TVMAZE, '%s', 'tv maze')):
|
|
if not ids.get(cur_tvinfo):
|
|
continue
|
|
|
|
type_chk, query = list, None
|
|
if TVINFO_TVDB == cur_tvinfo:
|
|
resp = self._get_show_data(ids[cur_tvinfo], cur_arg, direct_data=True)
|
|
type_chk = dict
|
|
else:
|
|
query = cur_arg % ids[cur_tvinfo]
|
|
resp = self.get_cached_or_new(
|
|
f's-v4-id-{cur_tvinfo}-{ids[cur_tvinfo]}', 'search?meta=translations', remote_id=query,
|
|
query=query, type='series', expire=self.search_cache_expire,
|
|
no_cached=not self.config.get('cache_search'))
|
|
|
|
if self._check_resp(type_chk, resp):
|
|
if TVINFO_TVDB == cur_tvinfo:
|
|
results.extend(_make_result_dict(resp['data']))
|
|
continue
|
|
|
|
for cur_item in resp['data']:
|
|
try:
|
|
if query == next(filter(lambda b: cur_name in b['sourceName'].lower(),
|
|
cur_item.get('remote_ids', []) or []), {}).get('id'):
|
|
results.extend(_make_result_dict(cur_item))
|
|
break
|
|
except (BaseException, Exception):
|
|
pass
|
|
|
|
if ids.get(TVINFO_TVDB_SLUG) and isinstance(ids.get(TVINFO_TVDB_SLUG), string_types):
|
|
resp = self.get_cached_or_new(
|
|
f's-id-{TVINFO_TVDB}-{ids[TVINFO_TVDB_SLUG]}',
|
|
f'/series/slug/{ids.get(TVINFO_TVDB_SLUG)}?meta=translations', expire=self.search_cache_expire,
|
|
no_cached=not self.config.get('cache_search'))
|
|
|
|
if resp and self._check_resp(dict, resp) \
|
|
and ids.get(TVINFO_TVDB_SLUG).lower() == resp['data']['slug'].lower():
|
|
results.extend(_make_result_dict(resp['data']))
|
|
|
|
if name:
|
|
for cur_name in ([name], name)[isinstance(name, list)]:
|
|
resp = self.get_cached_or_new(
|
|
f's-v4-name-{cur_name}', 'search?meta=translations', query=cur_name, type='series',
|
|
expire=self.search_cache_expire, no_cached=not self.config.get('cache_search'))
|
|
|
|
if resp and self._check_resp(list, resp):
|
|
for cur_item in resp['data']:
|
|
results.extend(_make_result_dict(cur_item))
|
|
|
|
seen = set()
|
|
results = [seen.add(_r['id']) or _r for _r in results if _r['id'] not in seen]
|
|
return results
|
|
|
|
def _get_languages(self):
    # type: (...) -> None
    """
    fetch the languages of the api and store them in `TvdbAPIv4._supported_languages`

    each language is stored as dict with the cleaned `id`, `name`, `nativeName` and `shortCode` of the api and
    a `sg_lang` that is the `reverse_map_languages` value for the api id, or the api id itself if it has none.
    an empty list is stored if the response is not usable
    """
    resp = self._fetch_data('/languages')
    if not self._check_resp(list, resp):
        TvdbAPIv4._supported_languages = []
        return

    api_keys = ('id', 'name', 'nativeName', 'shortCode')
    TvdbAPIv4._supported_languages = [
        {**{_key: clean_data(_lang[_key]) for _key in api_keys},
         'sg_lang': self.reverse_map_languages.get(_lang['id'], _lang['id'])}
        for _lang in resp['data']]
|
|
|
|
def _get_filtered_series(self, result_count=100, **kwargs):
|
|
# type: (...) -> List[TVInfoShow]
|
|
result = []
|
|
page, cc = 0, 0
|
|
while 100 > page and cc < result_count:
|
|
if self._check_resp(list, resp := self._fetch_data('/series/filter', page=page, **kwargs)) \
|
|
and len(resp['data']):
|
|
for cur_item in resp['data']:
|
|
cc += 1
|
|
if cc > result_count:
|
|
break
|
|
_ti = TVInfoShow()
|
|
_ti.id, _ti.seriesname, _ti.firstaired, \
|
|
_ti.overview, _ti.ids, \
|
|
_ti.poster, _ti.language, \
|
|
_ti.origin_countries, _ti.rating, _ti.slug \
|
|
= cur_item['id'], clean_data(cur_item['name']), self._get_first_aired(cur_item), \
|
|
clean_str(cur_item['overview']), TVInfoIDs(tvdb=cur_item['id']), \
|
|
self._sanitise_image_uri(cur_item['image']), clean_data(cur_item['originalLanguage']), \
|
|
clean_data([cur_item['originalCountry']]), cur_item['score'], clean_data(cur_item['slug'])
|
|
result.append(_ti)
|
|
if None is not (page := self._next_page(resp, page)):
|
|
continue
|
|
break
|
|
return result
|
|
|
|
def discover(self, result_count=100, get_extra_images=False, **kwargs):
    # type: (integer_types, bool, Optional[Any]) -> List[TVInfoShow]
    """
    get shows with the status `Upcoming`, sorted ascending by first aired

    :param result_count: maximum number of shows to return
    :param get_extra_images: not used by this function
    :param kwargs: not used by this function
    :return: list of shows
    """
    filter_args = dict(status=status_names['Upcoming']['id'], sort='firstAired', sortType='asc', lang='eng')
    return self._get_filtered_series(result_count=result_count, **filter_args)
|
|
|
|
def get_top_rated(self, result_count=100, year=None, in_last_year=False, **kwargs):
    # type: (integer_types, integer_types, bool, Optional[Any]) -> List[TVInfoShow]
    """
    get shows sorted descending by score

    :param result_count: maximum number of shows to return
    :param year: only return shows of this year, ignored if it is not an int or if `in_last_year` is set
    :param in_last_year: return the shows of the current year together with the shows of the previous year
        that first aired after the date 365 days ago, sorted descending by rating
    :param kwargs: not used by this function
    :return: list of shows
    """
    filter_args = dict(sort='score', sortType='desc', lang='eng')
    if not in_last_year:
        if isinstance(year, int):
            filter_args['year'] = year
        return self._get_filtered_series(result_count=result_count, **filter_args)

    today = datetime.date.today()
    # dates are compared as `YYYY-MM-DD` text, other first aired values are dropped by the length check
    cutoff = (today - datetime.timedelta(days=365)).strftime('%Y-%m-%d')
    current = self._get_filtered_series(result_count=result_count, year=today.year, **filter_args)
    previous = self._get_filtered_series(result_count=result_count, year=today.year - 1, **filter_args)
    recent = [_show for _show in previous if 10 == len(_show.firstaired or '') and _show.firstaired > cutoff]
    return sorted(current + recent, key=lambda _show: _show.rating, reverse=True)[:result_count]
|