SickGear/lib/api_tvdb/tvdb_api_v4.py
Prinz23 9f875601e4 Fix for caching.
Simplification.
2024-10-07 01:24:45 +01:00

1199 lines
62 KiB
Python

# encoding:utf-8
# author:Prinz23
# project:tvdb_api_v4
# module metadata: author, module version, and a separate api version string
# NOTE(review): presumably __api_version__ tracks the wrapper interface, not the remote TVDB api - confirm
__author__ = 'Prinz23'
__version__ = '1.0'
__api_version__ = '1.0.0'
import base64
import datetime
import logging
import re
from bs4_parser import BS4Parser
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from .tvdb_exceptions import TvdbError, TvdbTokenFailure
from exceptions_helper import ex
from lib.dateutil.parser import parser
# noinspection PyProtectedMember
from lib.exceptions_helper import ConnectionSkipException
from lib.tvinfo_base import (
CastList, CrewList, PersonGenders, RoleTypes,
TVInfoBase, TVInfoCharacter, TVInfoEpisode, TVInfoIDs, TVInfoImage, TVInfoImageSize, TVInfoImageType, TVInfoNetwork,
TVInfoPerson, TVInfoSeason, TVInfoSeasonTypes, TVInfoShow, TVInfoSocialIDs,
TVINFO_FACEBOOK, TVINFO_FANSITE, TVINFO_IMDB, TVINFO_INSTAGRAM, TVINFO_LINKEDIN, TVINFO_OFFICIALSITE, TVINFO_REDDIT,
TVINFO_MID_SEASON_FINALE, TVINFO_SEASON_FINALE, TVINFO_SERIES_FINALE, TVINFO_TIKTOK, TVINFO_TMDB, TVINFO_TVDB,
TVINFO_TVDB_SLUG, TVINFO_TVMAZE, TVINFO_TWITTER, TVINFO_WIKIDATA, TVINFO_WIKIPEDIA, TVINFO_YOUTUBE)
from sg_helpers import clean_data, clean_str, enforce_type, get_url, try_date, try_int
from sickgear import ENV
from six import integer_types, iteritems, PY3, string_types
# noinspection PyUnreachableCode
if False:
from typing import Any, AnyStr, Dict, List, Optional, Tuple, Union
# module logger, the NullHandler is attached so that the logger always has a handler
log = logging.getLogger('tvdb_v4.api')
log.addHandler(logging.NullHandler())
# runtime configuration, empty at import; TvdbAuth.apikey() reads TVDB_API_CONFIG['api_params'] from it,
# so it is expected to be filled in before the first api request
TVDB_API_CONFIG = {}
# the type of None, for use in type checks
NoneType = type(None)
# always use https in cases of redirects
# noinspection PyUnusedLocal,HttpUrlsUsage
def _record_hook(r, *args, **kwargs):
    """
    response hook that marks the response as hooked and upgrades a plain http redirect target to https

    :param r: response object handed to the hook
    :param args: unused
    :param kwargs: unused
    :return: the same response object
    """
    r.hook_called = True
    if r.status_code in (301, 302, 303, 307, 308):
        location = r.headers.get('Location')
        if isinstance(location, string_types) and location.startswith('http://'):
            # swap the leading scheme only, a url embedded later in the target (e.g. as query value) stays untouched
            r.headers['Location'] = location.replace('http://', 'https://', 1)
    return r
# noinspection PyUnresolvedReferences
class RequestsAuthBase(requests.auth.AuthBase):
    """
    Plain subclass of the Requests auth base class.

    The Requests dynamic packaging is inherited here in order to isolate a pyc non-inspection directive
    """
class TvdbAuth(RequestsAuthBase):
    """
    Requests auth handler that adds a bearer token to each request and renews the token once on a 401 response
    """
    # cached bearer token, None until a login succeeded
    _token = None

    def __init__(self):
        pass

    def reset_token(self):
        """
        drop the cached token, the next access to `token` logs in again
        """
        self._token = None

    @staticmethod
    def apikey():
        """
        decode the api key from TVDB_API_CONFIG['api_params']

        'apikey_v4' is urlsafe base64 decoded, then every char is shifted back (modulo 256) by the char of 'apikey'
        at the same position, with the key repeating when it is shorter than the data

        :return: decoded api key
        """
        string = TVDB_API_CONFIG['api_params']['apikey_v4']
        key = TVDB_API_CONFIG['api_params']['apikey']
        # padding is appended so that a value stored without padding still decodes
        string = base64.urlsafe_b64decode(string + b'===')
        string = string.decode('latin') if PY3 else string
        key_len = len(key)
        return ''.join(chr((ord(_c) - ord(key[_i % key_len]) + 256) % 256) for _i, _c in enumerate(string))

    def get_token(self):
        """
        log in to the api and cache the returned bearer token

        :return: True when a token was stored, otherwise None for a dict response without a usable token
        :raises TvdbTokenFailure: when the api reports a failure or no dict response was returned
        """
        url = f'{TvdbAPIv4.base_url}{"login"}'
        params = {'apikey': self.apikey()}
        resp = get_url(url, post_json=params, parse_json=True, raise_skip_exception=True)
        if resp and isinstance(resp, dict):
            if 'status' in resp:
                if 'failure' == resp['status']:
                    raise TvdbTokenFailure(f'Failed to Authenticate. {resp.get("message", "")}')
                if 'success' == resp['status'] and 'data' in resp and isinstance(resp['data'], dict) \
                        and 'token' in resp['data']:
                    self._token = resp['data']['token']
                    return True
        else:
            raise TvdbTokenFailure('Failed to get Tvdb Token')

    @property
    def token(self):
        """
        the cached token, fetched on first use
        """
        if not self._token:
            self.get_token()
        return self._token

    def handle_401(self, r, **kwargs):
        """
        response hook: on the first 401 of a request chain, renew the token and send the request again

        :param r: response object
        :param kwargs: passed on to the connection when the request is sent again
        :return: response of the repeated request, or the original response
        """
        if 401 == r.status_code and not any(401 == _or.status_code for _or in r.history):
            self.reset_token()
            self.get_token()
            if self._token:
                # consume and close the rejected response so its connection is released before the retry
                # noinspection PyStatementEffect
                r.content
                r.close()
                prep = r.request.copy()
                prep.headers['Authorization'] = f'Bearer {self._token}'
                _r = r.connection.send(prep, **kwargs)
                _r.history.append(r)
                _r.request = prep
                return _r
        return r

    def __call__(self, r):
        """
        add the authorization header and register the 401 handler on the prepared request
        """
        r.headers["Authorization"] = f'Bearer {self.token}'
        r.register_hook('response', self.handle_401)
        return r
DEFAULT_TIMEOUT = 30  # seconds


class TimeoutHTTPAdapter(HTTPAdapter):
    """
    HTTPAdapter that fills in a timeout for requests that are sent without one
    """

    def __init__(self, *args, **kwargs):
        # `timeout` is taken out of kwargs before the base initialiser is called with the remaining arguments
        self.timeout = kwargs.pop('timeout', DEFAULT_TIMEOUT)
        super(TimeoutHTTPAdapter, self).__init__(*args, **kwargs)

    def send(self, request, **kwargs):
        # an explicit timeout of the caller wins, otherwise the timeout of this adapter is used
        if None is kwargs.get('timeout'):
            kwargs['timeout'] = self.timeout
        return super(TimeoutHTTPAdapter, self).send(request, **kwargs)
# session shared by all api requests of this module
s = requests.Session()
_retry_methods = ['HEAD', 'GET', 'PUT', 'DELETE', 'OPTIONS', 'TRACE', 'POST']
_retry_args = dict(total=3, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504])
try:
    # urllib3 1.26+ names this parameter `allowed_methods`, urllib3 2.0 removed `method_whitelist`
    retries = Retry(allowed_methods=_retry_methods, **_retry_args)
except TypeError:
    # older urllib3 only knows the former parameter name
    retries = Retry(method_whitelist=_retry_methods, **_retry_args)
# mount the timeout aware adapter itself: wrapped in another HTTPAdapter it would be passed on as
# `pool_connections`, and neither the retry policy nor the default timeout would be in effect
# noinspection HttpUrlsUsage
s.mount('http://', TimeoutHTTPAdapter(max_retries=retries))
s.mount('https://', TimeoutHTTPAdapter(max_retries=retries))
# keyword arguments that tvdb_endpoint_fetch adds to every get_url call
base_request_para = dict(session=s, hooks={'response': _record_hook}, raise_skip_exception=True, auth=TvdbAuth())
# Query TVdb endpoints
def tvdb_endpoint_fetch(*args, **kwargs):
    """
    call get_url with the given arguments plus the entries of base_request_para

    entries of base_request_para replace same named keyword arguments of the caller

    :return: result of get_url
    """
    return get_url(*args, **dict(kwargs, **base_request_para))
# artwork type id of the api -> TVInfoImageType
# (_get_show_data falls back to TVInfoImageType.other for ids that are not listed)
img_type_map = {
    1: TVInfoImageType.banner,  # series
    2: TVInfoImageType.poster,  # series
    3: TVInfoImageType.fanart,  # series
    6: TVInfoImageType.season_banner,  # season
    7: TVInfoImageType.season_poster,  # season
    8: TVInfoImageType.season_fanart,  # season
    13: TVInfoImageType.person_poster,  # person
}
# character `type` id of the api -> RoleTypes, the trailing comments give the name of the type
people_types = {
    1: RoleTypes.CrewDirector,  # 'Director',
    2: RoleTypes.CrewWriter,  # 'Writer'
    3: RoleTypes.ActorMain,  # 'Actor'
    4: RoleTypes.ActorGuest,  # 'Guest Star',
    5: RoleTypes.CrewOther,  # 'Crew',
    6: RoleTypes.CrewCreator,  # 'Creator',
    7: RoleTypes.CrewProducer,  # 'Producer',
    8: RoleTypes.CrewShowrunner,  # 'Showrunner',
    9: RoleTypes.MusicalGuest,  # 'Musical Guest',
    10: RoleTypes.Host,  # 'Host',
    11: RoleTypes.CrewExecutiveProducer,  # 'Executive Producer',
}
# remote id `type` of the api -> TVINFO source constant, several api types share one source
# (commented out entries are api types without a mapped source)
source_types = {
    2: TVINFO_IMDB,  # title
    # 3: TVINFO_ZAP2IT,
    4: TVINFO_OFFICIALSITE,
    5: TVINFO_FACEBOOK,
    6: TVINFO_TWITTER,
    7: TVINFO_REDDIT,
    8: TVINFO_FANSITE,
    9: TVINFO_INSTAGRAM,
    10: TVINFO_TMDB,  # movie
    11: TVINFO_YOUTUBE,
    12: TVINFO_TMDB,  # tv
    # 13: TVINFO_EIDR, # content
    # 14: TVINFO_EIDR, # party
    15: TVINFO_TMDB,  # person
    16: TVINFO_IMDB,  # person
    17: TVINFO_IMDB,  # company
    18: TVINFO_WIKIDATA,
    19: TVINFO_TVMAZE,  # title
    20: TVINFO_LINKEDIN,
    21: TVINFO_TVMAZE,  # person
    22: TVINFO_TVMAZE,  # season
    23: TVINFO_TVMAZE,  # episode
    24: TVINFO_WIKIPEDIA,
    25: TVINFO_TIKTOK,
    26: TVINFO_LINKEDIN,  # company
    27: TVINFO_TVMAZE,  # company
    28: TVINFO_TMDB,  # collection
    29: TVINFO_TMDB,  # collection
}
# RoleTypes -> character type id, the inverse of people_types
people_types_reverse = {_v: _k for _k, _v in iteritems(people_types)}
# module level episode instance created without arguments
empty_ep = TVInfoEpisode()
# dateutil parser instance, _convert_person uses it to parse birth and death dates
tz_p = parser()
# series status records keyed by status id
status_ids = {
    1: {'name': 'Continuing', 'recordType': 'series', 'keepUpdated': False},
    2: {'name': 'Ended', 'recordType': 'series', 'keepUpdated': False},
    3: {'name': 'Upcoming', 'recordType': 'series', 'keepUpdated': False}
}
# the same status records keyed by status name
status_names = {
    'Continuing': {'id': 1, 'recordType': 'series', 'keepUpdated': False},
    'Ended': {'id': 2, 'recordType': 'series', 'keepUpdated': False},
    'Upcoming': {'id': 3, 'recordType': 'series', 'keepUpdated': False}
}
# finale type name -> TVINFO finale constant
tvdb_final_types = {
    'series': TVINFO_SERIES_FINALE,
    'season': TVINFO_SEASON_FINALE,
    'midseason': TVINFO_MID_SEASON_FINALE
}
class TvdbAPIv4(TVInfoBase):
    """
    TVInfoBase implementation that reads show, episode and person data from the TVDB v4 api
    """
    # id sources declared as searchable for shows
    supported_id_searches = [TVINFO_TVDB, TVINFO_TVDB_SLUG, TVINFO_IMDB, TVINFO_TMDB, TVINFO_TVMAZE]
    # id sources that _search_person looks up, in this order
    supported_person_id_searches = [TVINFO_TVDB, TVINFO_IMDB, TVINFO_TMDB, TVINFO_TVMAZE]
    # endpoint paths are appended to this url (_fetch_data, TvdbAuth.get_token)
    base_url = 'https://api4.thetvdb.com/v4/'
    # prefix that _sanitise_image_uri puts in front of relative image paths
    art_url = 'https://artworks.thetvdb.com/'
    # season type id -> season type name
    season_types = {1: 'official', 2: 'dvd', 3: 'absolute', 4: 'alternate', 5: 'regional', 6: 'altdvd'}
    # season type name -> TVInfoSeasonTypes, only 'official' and 'dvd' are mapped
    season_type_map = {season_types[1]: TVInfoSeasonTypes.official, season_types[2]: TVInfoSeasonTypes.dvd}
def __init__(self, banners=False, posters=False, seasons=False, seasonwides=False, fanart=False, actors=False,
             dvdorder=False, *args, **kwargs):
    """
    hand all options unchanged, and in the same order, to the initialiser of the base class
    """
    super(TvdbAPIv4, self).__init__(
        banners, posters, seasons, seasonwides, fanart, actors, dvdorder, *args, **kwargs)
def _fetch_data(self, endpoint, **kwargs):
    # type: (string_types, Any) -> Any
    """
    fetch the parsed json of an api endpoint

    :param endpoint: path that is appended to base_url
    :param kwargs: sent as request params
    :return: parsed response, or None after a 404 (not_found, and for '/series/' endpoints show_not_found, are set)
    :raises TvdbTokenFailure: on a 401 response
    :raises TvdbError: on any other failure, ConnectionSkipException is passed through unchanged
    """
    is_series_info = endpoint.startswith('/series/')
    if is_series_info:
        self.show_not_found = False
    try:
        return tvdb_endpoint_fetch(url=f'{self.base_url}{endpoint}', params=kwargs, parse_json=True,
                                   raise_status_code=True, raise_exceptions=True)
    except ConnectionSkipException:
        raise
    except requests.exceptions.HTTPError as e:
        status_code = e.response.status_code
        if 401 == status_code:
            raise TvdbTokenFailure('Failed to get new Token')
        if 404 != status_code:
            raise TvdbError(ex(e))
        # 404: flag the miss instead of raising
        if is_series_info:
            self.show_not_found = True
        self.not_found = True
    except (BaseException, Exception) as e:
        raise TvdbError(ex(e))
@staticmethod
def _check_resp(type_chk=list, data=None):
return isinstance(data, dict) and all(_k in data for _k in ('data', 'status')) \
and 'success' == data['status'] and isinstance(data['data'], type_chk)
@staticmethod
def _next_page(resp, page):
page += 1
if f'?page={page}' in ((resp.get('links') or {}).get('next') or ''):
return page
def _convert_person(self, p, ids=None):
    # type: (Dict, Dict) -> List[TVInfoPerson]
    """
    convert person data of the api into a person object

    :param p: person data
    :param ids: known ids of the person, used as base for the ids of the returned person (left unmodified)
    :return: list with one person object
    """
    # work on a copy: ids found in `p` are merged in below and must not leak into the dict of the caller
    ch, ids = [], dict(ids or {})
    # roles: keep named series level actor characters (no episode bound ones), featured first, then by sort value
    for cur_c in sorted(filter(
            lambda a: (3 == a['type'] or 'Actor' == a['peopleType'])
            and a['name'] and a['seriesId'] and not a.get('episodeId'),
            p.get('characters') or []),
            key=lambda a: (not a['isFeatured'], a['sort'])):
        ti_show = TVInfoShow()
        ti_show.id = clean_data(cur_c['seriesId'])
        ti_show.ids = TVInfoIDs(ids={TVINFO_TVDB: ti_show.id})
        ti_show.seriesname = clean_data(('series' in cur_c and cur_c['series'] and cur_c['series']['name']))
        ti_show.poster = self._sanitise_image_uri(('series' in cur_c and cur_c['series']
                                                   and cur_c['series']['image']))
        ti_show.firstaired = self._get_first_aired(('series' in cur_c and cur_c['series']))
        ch.append(TVInfoCharacter(
            id=cur_c['id'], ids=TVInfoIDs(ids={TVINFO_TVDB: cur_c['id']}),
            name=clean_data(cur_c['name'] or ''),
            image=self._sanitise_image_uri(cur_c.get('image')),
            regular=cur_c['isFeatured'],
            ti_show=ti_show
        ))

    # dates: a missing, '0000-00-00' or unparsable value results in None
    try:
        b_date = clean_data(p.get('birth'))
        birthdate = (b_date and '0000-00-00' != b_date and tz_p.parse(b_date).date()) or None
    except (BaseException, Exception):
        birthdate = None
    try:
        d_date = clean_data(p.get('death'))
        deathdate = (d_date and '0000-00-00' != d_date and tz_p.parse(d_date).date()) or None
    except (BaseException, Exception):
        deathdate = None

    p_tvdb_id = self._get_tvdb_id(p)
    ids.update({TVINFO_TVDB: p_tvdb_id})
    social_ids, official_site = {}, None

    # remote ids: matched by source type, with the source name as fallback
    if 'remoteIds' in p and isinstance(p['remoteIds'], list):
        for cur_rid in p['remoteIds']:
            if not (src_value := clean_data(cur_rid['id'])):
                continue
            src_name = cur_rid['sourceName'].lower()
            src_type = source_types.get(cur_rid['type'])
            if TVINFO_IMDB == src_type or 'imdb' in src_name:
                try:
                    imdb_id = try_int(f'{src_value}'.replace('nm', ''), None)
                    ids[TVINFO_IMDB] = imdb_id
                except (BaseException, Exception):
                    pass
            elif TVINFO_TMDB == src_type or 'themoviedb' in src_name:
                ids[TVINFO_TMDB] = try_int(src_value, None)
            elif TVINFO_TVMAZE == src_type or 'tv maze' in src_name:
                ids[TVINFO_TVMAZE] = try_int(src_value, None)
            elif TVINFO_OFFICIALSITE == src_type or 'official website' in src_name:
                official_site = src_value
            elif TVINFO_FACEBOOK == src_type or 'facebook' in src_name:
                social_ids[TVINFO_FACEBOOK] = src_value
            elif TVINFO_TWITTER == src_type or 'twitter' in src_name:
                social_ids[TVINFO_TWITTER] = src_value
            elif TVINFO_INSTAGRAM == src_type or 'instagram' in src_name:
                social_ids[TVINFO_INSTAGRAM] = src_value
            elif TVINFO_REDDIT == src_type or 'reddit' in src_name:
                social_ids[TVINFO_REDDIT] = src_value
            elif TVINFO_YOUTUBE == src_type or 'youtube' in src_name:
                social_ids[TVINFO_YOUTUBE] = src_value
            elif TVINFO_WIKIPEDIA == src_type or 'wikipedia' in src_name:
                social_ids[TVINFO_WIKIPEDIA] = src_value
            elif TVINFO_WIKIDATA == src_type or 'wikidata' in src_name:
                social_ids[TVINFO_WIKIDATA] = src_value
            elif TVINFO_TIKTOK == src_type or 'tiktok' in src_name:
                social_ids[TVINFO_TIKTOK] = src_value
            elif TVINFO_LINKEDIN == src_type:
                social_ids[TVINFO_LINKEDIN] = src_value
            elif TVINFO_FANSITE == src_type:
                social_ids[TVINFO_FANSITE] = src_value

    # biography: only the 'eng' entry is used
    bio = clean_data(
        next((_cp.get('biography') for _cp in p.get('biographies') or [] if 'eng' == _cp.get('language')), None)) \
        or None

    return [TVInfoPerson(
        p_id=p_tvdb_id, name=clean_data(p['name'] or ''),
        image=self._sanitise_image_uri(p.get('image') or p.get('image_url')),
        gender=PersonGenders.tvdb_map.get(p.get('gender'), PersonGenders.unknown), birthdate=birthdate,
        deathdate=deathdate, birthplace=clean_data(p.get('birthPlace')),
        akas=set(clean_data((isinstance(_a, dict) and _a['name']) or _a) for _a in p.get('aliases') or []),
        bio=bio, ids=TVInfoIDs(ids=ids), social_ids=TVInfoSocialIDs(ids=social_ids), homepage=official_site,
        characters=ch
    )]
def get_cached_or_url(self, url, cache_key, try_cache=False, expire=None, **kwargs):
    # type: (AnyStr, AnyStr, Optional[bool], Optional[int], ...) -> Union[Dict, List, NoneType]
    """
    return the cached data for a key, or fetch the data from the api and store it in the cache

    :param url: api endpoint to fetch
    :param cache_key: key of the cache entry
    :param try_cache: None to ignore the cache, True to use the cache, False to follow config 'cache_search'
    :param expire: expire time of the stored cache entry
    :param kwargs: extra parameter for fetching data
    :return: cached or fetched data, None if fetching or storing failed
    """
    if None is try_cache:
        use_cache = False
    else:
        use_cache = try_cache or bool(self.config.get('cache_search'))

    is_none, resp = False, None
    if use_cache:
        is_none, resp = self._get_cache_entry(cache_key)
        # a cache hit is either real data, or an entry that is flagged as a stored None
        if is_none or None is not resp:
            return resp

    try:
        resp = self._fetch_data(url, **kwargs)
        self._set_cache_entry(cache_key, resp, expire=expire)
    except (BaseException, Exception):
        resp = None
    return resp
def get_person(self, p_id, get_show_credits=False, get_images=False, try_cache=True, **kwargs):
    # type: (integer_types, bool, bool, bool, ...) -> Optional[TVInfoPerson]
    """
    get the data of a person by id

    :param p_id: persons id
    :param get_show_credits: get show credits (not used here)
    :param get_images: get person images (not used here)
    :param try_cache: use cached data if available
    :return: person object or None
    """
    if not p_id:
        return None
    resp = self.get_cached_or_url(f'/people/{p_id}/extended', f'p-v4-{p_id}', try_cache=try_cache)
    if not self._check_resp(dict, resp):
        return None
    return self._convert_person(resp['data'])[0]
def _search_person(self, name=None, ids=None):
    # type: (AnyStr, Dict[integer_types, integer_types]) -> List[TVInfoPerson]
    """
    search for persons by ids and/or by name

    :param name: text to search for
    :param ids: dict of ids to search, keyed by TVINFO source
    :return: list of found persons without duplicate person ids
    """
    urls, result, ids = [], [], ids or {}
    # id lookups, in the order of supported_person_id_searches
    for cur_tvinfo in self.supported_person_id_searches:
        if cur_tvinfo in ids:
            # a tvdb id is fetched directly
            if TVINFO_TVDB == cur_tvinfo and (resp := self.get_person(ids[cur_tvinfo])):
                result.append(resp)
            # ids of other sources go through the remote id search
            elif cur_tvinfo in (TVINFO_IMDB, TVINFO_TMDB, TVINFO_TVMAZE):
                if TVINFO_IMDB == cur_tvinfo:
                    # the imdb id is sent as 'nm' plus the number zero padded to 7 digits
                    url = f'search/remoteid/nm{ids.get(TVINFO_IMDB):07d}'
                elif cur_tvinfo in (TVINFO_TMDB, TVINFO_TVMAZE):
                    url = f'search/remoteid/{ids.get(cur_tvinfo)}'
                else:
                    continue

                if self._check_resp(list, resp := self.get_cached_or_url(
                        url, f'p-v4-id-{cur_tvinfo}-{ids[cur_tvinfo]}', expire=self.search_cache_expire)):
                    for cur_resp in resp['data']:
                        if isinstance(cur_resp, dict) and 'people' in cur_resp:
                            # an entry that only holds 'people' is loaded in full by its id,
                            # any other entry (or a failed load) is converted from the search data itself
                            if p_d := None if 1 != len(cur_resp) else self.get_person(cur_resp['people']['id']):
                                result.append(p_d)
                            else:
                                result.extend(self._convert_person(cur_resp['people'], ids))
                            # NOTE(review): assumes only the first people entry of a remote id search is wanted
                            break

    # text search
    if name and self._check_resp(list, resp := self.get_cached_or_url(
            '/search',
            f'p-v4-src-text-{name}', expire=self.search_cache_expire,
            query=name, type='people')):
        for cur_resp in resp['data']:
            result.extend(self._convert_person(cur_resp))

    # drop duplicates by person id, the first occurrence is kept
    seen = set()
    result = [seen.add(_r.id) or _r for _r in result if _r.id not in seen]
    return result
def search_tvs(self, terms, language=None):
    # type: (Union[int, AnyStr], Optional[AnyStr]) -> Optional[dict]
    """
    query the search index for series and persons

    :param terms: text to search for
    :param language: language id, the 'sg_lang' value of the matching get_languages() entry is sent as
        Accept-Language header
    :return: parsed json response, or None on failure
    """
    from random import choice

    sg_lang = None
    for cur_lang in self.get_languages():
        if language == cur_lang['id']:
            sg_lang = cur_lang.get('sg_lang')
            break

    headers = {'Accept-Encoding': 'gzip,deflate'}
    if None is not sg_lang:  # and sg_lang in self.config['valid_languages']:
        headers['Accept-Language'] = sg_lang

    try:
        url = ('https://tvshow''time-%s.algo''lia.net/1/'
               'indexes/*/queries' % choice([1, 2, 3, 'dsn']))
        url_params = {
            'x-algo''lia-agent': 'Alg''olia for vani''lla JavaScript (lite) 3.3''2.0;'
                                 'instant''search.js (3.5''.3);JS Helper (2.2''8.0)',
            'x-algo''lia''-app''lication-id': 'tvshow''time',
            'x-algo''lia''-ap''i-key': '3d''978dd96c457390f21cec6131ce5d''9c'[::-1]}
        query = '&'.join([
            f'query={terms}', 'maxValuesPerFacet=10', 'page=0',
            'facetFilters=[["type:series", "type:person"]]',
            'tagFilters=', 'analytics=false', 'advancedSyntax=true',
            'highlightPreTag=__ais-highlight__', 'highlightPostTag=__/ais-highlight__'
        ])
        return get_url(
            url, params=url_params,
            post_json={'requests': [{'indexName': 'TVDB', 'params': query}]},
            session=requests.session(), headers=headers, parse_json=True, failure_monitor=False)
    except (KeyError, IndexError, Exception):
        return None
@staticmethod
def _get_overview(show_data, language='eng'):
    # type: (Dict, AnyStr) -> AnyStr
    """
    internal helper to get the overview of a show

    translations -> overviewTranslations is searched for the requested language with 'eng' as fallback,
    without that list the `overviews` dict is used with the plain `overview` as fallback

    :param show_data: show data
    :param language: wanted language of the overview
    :return: cleaned overview text
    """
    result = ''
    translations = show_data.get('translations')
    if isinstance(translations, dict) and 'overviewTranslations' in translations:
        try:
            entries = translations['overviewTranslations']
            eng_entry = next((_t for _t in entries if 'eng' == _t['language']), None)
            chosen = next((_t for _t in entries if language == _t['language']), eng_entry)
            if chosen:
                result = chosen['overview']
        except (BaseException, Exception):
            # malformed translation data leaves the overview empty
            pass
    elif isinstance(show_data, dict) and 'overviews' in show_data:
        result = show_data['overviews'].get(language, show_data.get('overview'))
    return clean_str(result)
def _get_series_name(self, show_data, language=None):
    # type: (Dict, AnyStr) -> Tuple[Optional[AnyStr], List]
    """
    get the name of a show in the given language, and the aliases of the show

    The name of the show data is used when there is no translation for the language.
    Without any name, the first alias is taken out of the aliases and used as name.

    :param show_data: show data
    :param language: wanted language of the name
    :return: tuple of name, aliases
    """
    # guard against a null `translations` (or null `nameTranslations`) value in the data
    translations = show_data.get('translations') or {}
    if 'nameTranslations' in translations:
        series_name = clean_data(
            next(filter(lambda l: language and language == l['language'],
                        translations.get('nameTranslations') or []),
                 {'name': show_data['name']})['name'])
    else:
        series_name = clean_data(translations.get(language, show_data['name']))
    series_aliases = self._get_aliases(show_data)
    if not series_name and isinstance(series_aliases, list) and 0 < len(series_aliases):
        series_name = series_aliases.pop(0)
    return series_name, series_aliases
def _get_show_data(
        self,
        sid,  # type: integer_types
        language,  # type: AnyStr
        get_ep_info=False,  # type: bool
        banners=False,  # type: bool
        posters=False,  # type: bool
        seasons=False,  # type: bool
        seasonwides=False,  # type: bool
        fanart=False,  # type: bool
        actors=False,  # type: bool
        direct_data=False,  # type: bool
        **kwargs  # type: Optional[Any]
):
    # type: (...) -> Union[bool, dict, None]
    """
    internal function that should be overwritten in class to get data for given show id

    Fills self.ti_shows[sid] with the show info, images, and on request with cast/crew and episodes.
    The image options (banners, posters, seasons, seasonwides, fanart) are not used by this implementation,
    all artwork of the show data is always added.

    :param sid: show id
    :param language: language
    :param get_ep_info: get episodes
    :param banners: load banners
    :param posters: load posters
    :param seasons: load seasons
    :param seasonwides: load seasonwides
    :param fanart: load fanart
    :param actors: load actors
    :param direct_data: return pure data
    :return: the unprocessed api response if direct_data is set, otherwise True if the show was loaded,
        False without show id, without valid response or without a show name
    :raises TvdbError: if the episode list must be completed from the website and the page cannot be fetched
    """
    if not sid:
        return False

    resp = self._fetch_data(f'/series/{sid}/extended?meta=translations')
    if direct_data:
        return resp

    if self._check_resp(dict, resp):
        show_data = resp['data']
        series_name, series_aliases = self._get_series_name(show_data, language)
        if not series_name:
            return False

        # --- basic show info
        ti_show = self.ti_shows[sid]  # type: TVInfoShow
        ti_show.banner_loaded = ti_show.poster_loaded = ti_show.fanart_loaded = True
        ti_show.id = show_data['id']
        ti_show.seriesname = series_name
        ti_show.slug = clean_data(show_data.get('slug'))
        ti_show.poster = clean_data(show_data.get('image'))
        ti_show.firstaired = clean_data(show_data.get('firstAired'))
        ti_show.rating = show_data.get('score')
        ti_show.contentrating = ('contentRatings' in show_data and show_data['contentRatings']
                                 and next((_r['name'] for _r in show_data['contentRatings'] or []
                                           if 'usa' == _r['country']), None)) or None
        ti_show.aliases = series_aliases
        ti_show.status = clean_data(show_data['status']['name'])
        ti_show.network_country = clean_data(show_data.get('originalCountry'))
        ti_show.lastupdated = clean_data(show_data.get('lastUpdated'))

        # --- networks: the latest network first, then the remaining network companies
        existing_networks = []
        # the dict check guards against a null `latestNetwork` value
        if 'latestNetwork' in show_data \
                and isinstance(show_data['latestNetwork'], dict) \
                and isinstance(show_data['latestNetwork'].get('primaryCompanyType'), integer_types) \
                and 1 == show_data['latestNetwork']['primaryCompanyType'] \
                and show_data['latestNetwork']['country']:
            ti_show.networks = [TVInfoNetwork(
                name=clean_data(show_data['latestNetwork']['name']),
                country=clean_data(show_data['latestNetwork']['country']),
                active_date=clean_data(show_data['latestNetwork']['activeDate']),
                inactive_date=clean_data(show_data['latestNetwork']['inactiveDate']))]
            ti_show.network = clean_data(show_data['latestNetwork']['name'])
            existing_networks.extend([ti_show.network])
            ti_show.network_country = clean_data(show_data['latestNetwork']['country'])
        if 'companies' in show_data and isinstance(show_data['companies'], list):
            # filter networks
            networks = sorted([_n for _n in show_data['companies'] if 1 == _n['companyType']['companyTypeId']
                               and _n['country']], key=lambda a: a['activeDate'] or '0000-00-00')
            if networks:
                ti_show.networks.extend([TVInfoNetwork(
                    name=clean_data(_n['name']), country=clean_data(_n['country']),
                    active_date=clean_data(_n['activeDate']), inactive_date=clean_data(_n['inactiveDate']))
                    for _n in networks if clean_data(_n['name']) not in existing_networks])
                if not ti_show.network:
                    # without a latest network, the network with the most recent active date is used
                    ti_show.network = clean_data(networks[-1]['name'])
                    ti_show.network_country = clean_data(networks[-1]['country'])

        ti_show.language = clean_data(show_data.get('originalLanguage'))
        ti_show.runtime = show_data.get('averageRuntime')
        ti_show.airs_time = clean_data(show_data.get('airsTime'))
        # `or {}` guards against a missing or null `airsDays` value
        ti_show.airs_dayofweek = ', '.join([_k.capitalize()
                                            for _k, _v in iteritems(show_data.get('airsDays') or {}) if _v])
        ti_show.genre_list = ('genres' in show_data and show_data['genres']
                              and [clean_data(_g['name']) for _g in show_data['genres']]) or []
        ti_show.genre = '|'.join(ti_show.genre_list)

        # --- remote ids: matched by source type, with the source name as fallback
        ids, social_ids = {}, {}
        if 'remoteIds' in show_data and isinstance(show_data['remoteIds'], list):
            for cur_rid in show_data['remoteIds']:
                src_name = cur_rid['sourceName'].lower()
                src_value = clean_data(cur_rid['id'])
                src_type = source_types.get(cur_rid['type'])
                if TVINFO_IMDB == src_type or 'imdb' in src_name:
                    try:
                        imdb_id = try_int(src_value.replace('tt', ''), None)
                        ids['imdb'] = imdb_id
                    except (BaseException, Exception):
                        pass
                    ti_show.imdb_id = src_value
                elif TVINFO_TMDB == src_type or 'themoviedb' in src_name:
                    ids['tmdb'] = try_int(src_value, None)
                elif TVINFO_TVMAZE == src_type or 'tv maze' in src_name:
                    ids['tvmaze'] = try_int(src_value, None)
                elif TVINFO_OFFICIALSITE == src_type or 'official website' in src_name:
                    ti_show.official_site = src_value
                elif TVINFO_FACEBOOK == src_type or 'facebook' in src_name:
                    social_ids['facebook'] = src_value
                elif TVINFO_TWITTER == src_type or 'twitter' in src_name:
                    social_ids['twitter'] = src_value
                elif TVINFO_INSTAGRAM == src_type or 'instagram' in src_name:
                    social_ids['instagram'] = src_value
                elif TVINFO_REDDIT == src_type or 'reddit' in src_name:
                    social_ids['reddit'] = src_value
                elif TVINFO_YOUTUBE == src_type or 'youtube' in src_name:
                    social_ids['youtube'] = src_value
                elif TVINFO_WIKIPEDIA == src_type or 'wikipedia' in src_name:
                    social_ids['wikipedia'] = src_value
                elif TVINFO_WIKIDATA == src_type or 'wikidata' in src_name:
                    social_ids['wikidata'] = src_value
                elif TVINFO_TIKTOK == src_type or 'tiktok' in src_name:
                    social_ids['tiktok'] = src_value
                elif TVINFO_LINKEDIN == src_type:
                    social_ids['linkedin'] = src_value
                elif TVINFO_FANSITE == src_type:
                    social_ids['fansite'] = src_value

        ti_show.ids = TVInfoIDs(tvdb=show_data['id'], **ids)
        if social_ids:
            ti_show.social_ids = TVInfoSocialIDs(**social_ids)

        ti_show.overview = self._get_overview(show_data, language=language)

        # --- artwork: the best scored poster, banner and fanart become the main images of the show
        if 'artworks' in show_data and isinstance(show_data['artworks'], list):
            poster = banner = fanart_url = False
            for cur_art in sorted(show_data['artworks'], key=lambda a: a['score'], reverse=True):
                img_type = img_type_map.get(cur_art['type'], TVInfoImageType.other)
                if False is poster and img_type == TVInfoImageType.poster:
                    ti_show.poster, ti_show.poster_thumb, poster = cur_art['image'], cur_art['thumbnail'], True
                elif False is banner and img_type == TVInfoImageType.banner:
                    ti_show.banner, ti_show.banner_thumb, banner = cur_art['image'], cur_art['thumbnail'], True
                elif False is fanart_url and img_type == TVInfoImageType.fanart:
                    ti_show.fanart, fanart_url = cur_art['image'], True
                ti_show['images'].setdefault(img_type, []).append(TVInfoImage(
                    image_type=img_type,
                    sizes={
                        TVInfoImageSize.original: cur_art['image'],
                        TVInfoImageSize.small: cur_art['thumbnail']
                    },
                    img_id=cur_art['id'],
                    has_text=enforce_type(cur_art.get('includesText'), (bool, NoneType), None),
                    lang=cur_art['language'],
                    rating=cur_art['score'],
                    updated_at=cur_art['updatedAt'] or None
                ))

        # --- cast and crew
        if (actors or self.config['actors_enabled']) \
                and not getattr(self.ti_shows.get(sid), 'actors_loaded', False):
            cast, crew, ti_show.actors_loaded = CastList(), CrewList(), True
            if isinstance(show_data.get('characters'), list):
                for cur_char in sorted(show_data.get('characters') or [],
                                       key=lambda c: (not c['isFeatured'], c['sort'])):
                    # non actor types are refined by keywords in the character name
                    if (people_type := people_types.get(cur_char['type'])) not in (
                            RoleTypes.ActorMain, RoleTypes.ActorGuest, RoleTypes.ActorSpecialGuest,
                            RoleTypes.ActorRecurring) \
                            and isinstance(cur_char['name'], string_types):
                        if 'presenter' in (low_name := cur_char['name'].lower()):
                            people_type = RoleTypes.Presenter
                        elif 'interviewer' in low_name:
                            people_type = RoleTypes.Interviewer
                        elif 'host' in low_name:
                            people_type = RoleTypes.Host
                    # a role that is bound to an episode is a guest role
                    if cur_char['episodeId']:
                        if RoleTypes.ActorMain == people_type:
                            people_type = RoleTypes.ActorGuest
                        elif RoleTypes.Presenter == people_type:
                            people_type = RoleTypes.PresenterGuest
                        elif RoleTypes.Interviewer == people_type:
                            people_type = RoleTypes.InterviewerGuest
                        elif RoleTypes.Host == people_type:
                            people_type = RoleTypes.HostGuest
                    if people_type in (RoleTypes.Presenter, RoleTypes.Interviewer, RoleTypes.Host) \
                            and not cur_char['name']:
                        cur_char['name'] = {RoleTypes.Presenter: 'Presenter', RoleTypes.Interviewer: 'Interviewer',
                                            RoleTypes.Host: 'Host'}.get(people_type) or cur_char['name']
                    if None is people_type:
                        continue
                    # role types below crew_limit are cast, all others are crew
                    if RoleTypes.crew_limit > people_type:
                        cast[people_type].append(TVInfoCharacter(
                            p_id=cur_char['id'], name=clean_data(cur_char['name'] or ''),
                            ids=TVInfoIDs(ids={TVINFO_TVDB: cur_char['id']}),
                            image=self._sanitise_image_uri(cur_char['image']),
                            regular=cur_char['isFeatured'],
                            person=[TVInfoPerson(
                                p_id=cur_char['peopleId'], name=clean_data(cur_char['personName'] or ''),
                                image=self._sanitise_image_uri(cur_char.get('personImgURL')),
                                ids=TVInfoIDs(ids={TVINFO_TVDB: cur_char['peopleId']})
                            )]
                        ))
                    else:
                        crew[people_type].append(TVInfoPerson(
                            p_id=cur_char['peopleId'], name=clean_data(cur_char['personName'] or ''),
                            ids=TVInfoIDs(ids={TVINFO_TVDB: cur_char['peopleId']}),
                            image=self._sanitise_image_uri(cur_char.get('personImgURL'))
                        ))

            # fallback: without main actors in the api data, parse the people page of the website
            if not cast[RoleTypes.ActorMain]:
                html = get_url(f'https://www.thetvdb.com/series/{ti_show.slug}/people')
                if html:
                    try:
                        with BS4Parser(html) as soup:
                            rc_role = re.compile(r'/series/(?P<show_slug>[^/]+)/people/(?P<role_id>\d+)/?$')
                            rc_img = re.compile(r'/(?P<url>person/(?P<person_id>\d+)/(?P<img_id>[^/]+)\..*)')
                            rc_img_v3 = re.compile(r'/(?P<url>actors/(?P<img_id>[^/]+)\..*)')
                            # limit of person id lookups via search
                            max_people = 5
                            rc_clean = re.compile(r'[^a-z\d]')
                            for cur_role in soup.find_all('a', href=rc_role) or []:
                                try:
                                    image, person_id = 2 * [None]
                                    for cur_rc in (rc_img, rc_img_v3):
                                        img_tag = cur_role.find('img', src=cur_rc)
                                        if img_tag:
                                            img_parsed = cur_rc.search(img_tag.get('src'))
                                            image, person_id = [
                                                _x in img_parsed.groupdict() and img_parsed.group(_x)
                                                for _x in ('url', 'person_id')]
                                            break
                                    # first text line is the person name, a following 'as ...' line the role
                                    lines = [_x.strip() for _x in cur_role.get_text().split('\n')
                                             if _x.strip()][0:2]
                                    name = role = ''
                                    if len(lines):
                                        name = lines[0]
                                        for cur_line in lines[1:]:
                                            if cur_line.lower().startswith('as '):
                                                role = cur_line[3:]
                                                break
                                    if not person_id and max_people:
                                        max_people -= 1
                                        results = self.search_tvs(name)
                                        try:
                                            for cur_result in (
                                                    isinstance(results, dict) and results.get('results') or []):
                                                # sorts 'banners/images/missing/' to last before filter
                                                # names are compared lowercased and reduced to a-z and digits,
                                                # sub() takes the replacement first and the text second
                                                people_filter = (
                                                    lambda r: 'person' == r['type'] and
                                                    rc_clean.sub('', name.lower())
                                                    == rc_clean.sub('', r['name'].lower()),
                                                    cur_result.get('nbHits')
                                                    and sorted(cur_result.get('hits'),
                                                               key=lambda x: len(x['image']), reverse=True) or [])
                                                if ENV.get('SG_DEV_MODE'):
                                                    for cur_person in filter(*people_filter):
                                                        new_keys = set(list(cur_person)).difference({
                                                            '_highlightResult', 'banner', 'id', 'image',
                                                            'is_tvdb_searchable', 'is_tvt_searchable', 'name',
                                                            'objectID', 'people_birthdate', 'people_died',
                                                            'poster', 'type', 'url'
                                                        })
                                                        if new_keys:
                                                            log.warning(
                                                                f'DEV_MODE: New _parse_actors tvdb attrs for'
                                                                f' {cur_person["id"]} {new_keys!r}')
                                                person_ok = False
                                                for cur_person in filter(*people_filter):
                                                    # with an image, the person page must reference that image
                                                    if image:
                                                        people_data = get_url(cur_person['url'])
                                                        person_ok = re.search(re.escape(image), people_data)
                                                    if not image or person_ok:
                                                        person_id = cur_person['id']
                                                        # leaves both loops, caught right below
                                                        raise ValueError('value okay, id found')
                                        except (BaseException, Exception):
                                            pass

                                    rid = int(rc_role.search(cur_role.get('href')).group('role_id'))
                                    person_id = try_int(person_id, None)
                                    image = image and f'https://artworks.thetvdb.com/banners/{image}' or None
                                    # noinspection PyTypeChecker
                                    cast[RoleTypes.ActorMain].append(TVInfoCharacter(
                                        p_id=rid, name=clean_data(role),
                                        ids={TVINFO_TVDB: rid},
                                        image=image,
                                        person=[TVInfoPerson(
                                            p_id=person_id, name=clean_data(name), ids={TVINFO_TVDB: person_id}
                                        )]
                                    ))
                                except(BaseException, Exception):
                                    pass
                    except(BaseException, Exception):
                        pass

            ti_show.cast = cast
            ti_show.crew = crew
            ti_show.actors = [dict(
                character=dict(
                    id=_ch.id, name=_ch.name,
                    url=f'https://www.thetvdb.com/series/{show_data["slug"]}/people/{_ch.id}',
                    image=_ch.image
                ),
                person=dict(
                    id=_ch.person and _ch.person[0].id, name=_ch.person and _ch.person[0].name,
                    url=_ch.person and f'https://www.thetvdb.com/people/{_ch.person[0].id}',
                    image=_ch.person and _ch.person[0].image,
                    birthday=try_date(_ch.birthdate), deathday=try_date(_ch.deathdate),
                    gender=None,
                    country=None
                )
            ) for _ch in cast[RoleTypes.ActorMain]]

        # --- episodes
        if get_ep_info and not getattr(self.ti_shows.get(sid), 'ep_loaded', False):
            # fetch absolute numbers
            abs_ep_nums = {}
            if any(1 for _s in show_data.get('seasons', []) or [] if 'absolute' == _s.get('type', {}).get('type')):
                page = 0
                while 100 >= page:
                    resp = self._fetch_data(f'/series/{sid}/episodes/absolute?page={page:d}')
                    if isinstance(resp, dict) and isinstance((resp.get('data') or {}).get('episodes'), list):
                        abs_ep_nums.update({_e['id']: _e['number'] for _e in resp['data']['episodes']
                                            if None is _e['seasons'] and _e['number']})
                        if None is not (page := self._next_page(resp, page)):
                            continue
                    break

            # fetch alt numbers
            alt_ep_nums, alt_ep_types, default_season_type = \
                {}, {}, self.season_types.get(show_data.get('defaultSeasonType'))
            for cur_alt_type in {_a.get('type', {}).get('type') for _a in show_data.get('seasons', []) or []
                                 if _a.get('type', {}).get('type') not in ('absolute', default_season_type)}:
                if any(1 for _s in show_data.get('seasons', []) or []
                       if cur_alt_type == _s.get('type', {}).get('type')):
                    page = 0
                    while 100 >= page:
                        resp = self._fetch_data(f'/series/{sid}/episodes/{cur_alt_type}?page={page:d}')
                        if isinstance(resp, dict) and isinstance((resp.get('data') or {}).get('episodes'), list):
                            for cur_ep in resp['data']['episodes']:
                                alt_ep_types.setdefault(
                                    self.season_types.get(cur_alt_type, cur_alt_type),
                                    {}).setdefault(cur_ep['id'], {}).update({
                                        'season': cur_ep['seasonNumber'], 'episode': cur_ep['number'],
                                        'name': cur_ep['name']})
                                alt_ep_nums.setdefault(cur_ep['id'], {}).update({
                                    self.season_type_map.get(cur_alt_type, cur_alt_type):
                                        {'season': cur_ep['seasonNumber'], 'episode': cur_ep['number']}})
                            if None is not (page := self._next_page(resp, page)):
                                continue
                        break

            # fetch the episodes in default order, at most 101 pages
            ep_lang = (language in (show_data.get('overviewTranslations') or []) and language) or 'eng'
            page, ti_show.ep_loaded, eps_count = 0, True, 0
            while 100 >= page:
                resp = self._fetch_data(f'/series/{sid}/episodes/default/{ep_lang}?page={page:d}')
                if isinstance(resp, dict) and isinstance((resp.get('data') or {}).get('episodes'), list):
                    links = 'next' in (resp.get('links') or '')
                    total_items = (links and resp.get('links', {}).get('total_items')) or None
                    page_size = (links and resp.get('links', {}).get('page_size')) or 500
                    eps_count += (len(resp['data']['episodes'])) or 0
                    full_page = page_size <= len(resp['data']['episodes'])
                    # `page` must stay an int: a missing next link must not turn it into None, or the loop
                    # condition raises a TypeError when the item count asks for another page
                    next_page = self._next_page(resp, page) if links else None
                    if None is next_page and links and isinstance(total_items, integer_types) \
                            and eps_count < total_items and resp['data']['episodes']:
                        # the item count reports more episodes than fetched so far: try the following page,
                        # but only while the current page delivered episodes
                        next_page = page + 1
                    more = None is not next_page
                    # a full page without paging links: the remaining episodes are read from the website
                    alt_page = (full_page and not links)
                    if not alt_page:
                        self._set_episodes(ti_show, resp, abs_ep_nums, alt_ep_nums, alt_ep_types)

                    if not alt_page and more:
                        page = next_page
                        continue

                    if alt_page:
                        html = get_url(f'https://www.thetvdb.com/series/{ti_show.slug}/'
                                       f'allseasons/{default_season_type}')
                        if not html:
                            raise TvdbError('Failed to get episodes for show')
                        # episodes already delivered by the api are skipped while parsing
                        api_sxe = [f's{_e["seasonNumber"]}e{_e["number"]}'
                                   for _e in resp['data']['episodes']]
                        # the last api episode serves as template for the parsed episodes
                        template_ep = resp['data']['episodes'][-1].copy()
                        try:
                            with BS4Parser(html) as soup:
                                for cur_ep in soup.find_all('li', class_='list-group-item'):
                                    try:
                                        heading = cur_ep.h4
                                        sxe = [try_int(_n, None) for _n in
                                               re.findall(r'(?i)s(?:pecial\s+)?(\d+)[ex](\d+)', clean_data(
                                                   heading.find('span', class_='episode-label').get_text()))[0]]
                                        if None in sxe or 's{}e{}'.format(*sxe) in api_sxe:
                                            continue
                                        ep_season, ep_number = sxe
                                    except(BaseException, Exception):
                                        continue
                                    try:
                                        ep_name = clean_data(heading.a.get_text())
                                        ep_link = heading.a['href']  # check link contains 'series'
                                        ep_id = try_int(re.findall(r'episodes/(\d+$)', ep_link)[0], None)
                                    except(BaseException, Exception):
                                        ep_id = None
                                    if None is ep_id:
                                        continue
                                    list_items = cur_ep.find('ul', class_='list-inline')
                                    aired = try_date(list_items.find_all('li')[0].get_text())
                                    row_items = cur_ep.find('div', class_='row')
                                    try:
                                        overview = clean_str(row_items.p.get_text())
                                    except(BaseException, Exception):
                                        overview = ''
                                    try:
                                        image = row_items.find('div', class_='col-xs-3').img['src']
                                    except(BaseException, Exception):
                                        image = None
                                    new_ep = template_ep.copy()
                                    new_ep.update(dict(
                                        id=ep_id, name=ep_name, aired=aired, overview=overview,
                                        image=image, imageType=11 if image and '/banner' in image else None,
                                        number=ep_number, seasonNumber=ep_season
                                    ))
                                    resp['data']['episodes'] += [new_ep]
                        except (BaseException, Exception):
                            pass
                        self._set_episodes(ti_show, resp, abs_ep_nums, alt_ep_nums, alt_ep_types)
                break

        return True

    return False
def _sanitise_image_uri(self, image):
    # type: (Any) -> Any
    """
    make an image reference absolute

    :param image: image path or url as found in api data
    :return: `image` prefixed with `self.art_url` when it is a relative path,
     otherwise `image` is returned unchanged (non strings, absolute urls, and empty paths)
    """
    if not isinstance(image, string_types) or 'http' == image[0:4]:
        return image

    path = image.lstrip('/')
    if not path:
        # an empty (or slash only) path holds no image, returning the bare
        # artwork base url for it would produce a bogus image link
        return image

    return f'{self.art_url}{path}'
def _set_episodes(self, ti_show, ep_data, abs_ep_nums, alt_ep_nums, alt_ep_types):
    # type: (TVInfoShow, Dict, Dict, Dict, Dict) -> None
    """
    create or update season and episode objects of a show from api episode data

    :param ti_show: show object that is populated in place
    :param ep_data: api response, episode records are read from ep_data['data']['episodes']
    :param abs_ep_nums: absolute numbers keyed by episode id
    :param alt_ep_nums: alternative numbers keyed by episode id, stored as `alt_num` on the episode
    :param alt_ep_types: alternative numberings keyed by season type, then by episode id
    """
    # pairs of (episode object key, api episode key)
    field_map = (
        ('seasonnumber', 'seasonNumber'), ('episodenumber', 'number'),
        ('episodename', 'name'), ('firstaired', 'aired'), ('runtime', 'runtime'),
        ('seriesid', 'seriesId'), ('id', 'id'), ('filename', 'image'), ('overview', 'overview'),
        ('absolute_number', 'abs'), ('finale_type', 'finaleType'))

    for api_ep in ep_data['data']['episodes']:
        season_num, ep_num = api_ep['seasonNumber'], api_ep['number']

        for ti_key, api_key in field_map:
            if 'absolute_number' == ti_key:
                # the absolute number comes from the separately collected lookup
                value = abs_ep_nums.get(api_ep['id'])
            else:
                value = clean_data(api_ep.get(api_key, getattr(empty_ep, ti_key)))
                if 'finale_type' == ti_key:
                    value = tvdb_final_types.get(api_ep.get(api_key), None)
                elif 'image' == api_key:
                    value = self._sanitise_image_uri(value)

            # create the season and episode containers on first use
            if season_num not in ti_show:
                ti_show[season_num] = TVInfoSeason(show=ti_show)
                ti_show[season_num].number = season_num
            if ep_num not in ti_show[season_num]:
                ti_show[season_num][ep_num] = TVInfoEpisode(season=ti_show[season_num])

            # the value is stored both as an item and as an attribute of the episode
            ti_ep = ti_show[season_num][ep_num]
            ti_ep[ti_key] = value
            ti_ep.__dict__[ti_key] = value

        ti_show[season_num][ep_num].alt_num = alt_ep_nums.get(api_ep['id'], {})

        # register the episode under every alternative numbering that knows its id
        for alt_type, alt_eps in alt_ep_types.items():
            try:
                alt_num = alt_eps[api_ep['id']]
                if alt_num:
                    ti_show.alt_ep_numbering.setdefault(alt_type, {}).setdefault(
                        alt_num['season'], {})[alt_num['episode']] = ti_show[season_num][ep_num]
            except (BaseException, Exception):
                continue
@staticmethod
def _get_network(show_data):
    # type: (Dict) -> Optional[AnyStr]
    """
    get the network name of a show

    :param show_data: show record, either holding a 'network' value or a 'companies' list
    :return: cleaned value of 'network' when that key exists, otherwise the name of the first
     company with company type id 1 (presumably the network type - verify against the api docs)
     when companies are dicts, otherwise the cleaned first company entry,
     None when there are no companies
    """
    if 'network' in show_data:
        return clean_data(show_data['network'])

    companies = show_data.get('companies')
    if not companies:
        return None

    if not isinstance(companies[0], dict):
        return clean_data(companies[0])

    def _is_network(company):
        # entries without a usable companyType are skipped instead of raising,
        # so that one malformed company cannot abort processing of the show
        company_type = isinstance(company, dict) and company.get('companyType')
        return isinstance(company_type, dict) and 1 == company_type.get('companyTypeId')

    return clean_data(next(filter(_is_network, companies), {}).get('name'))
@staticmethod
def _get_aliases(show_data):
    """
    get the aliases of a show

    :param show_data: show record
    :return: list of cleaned alias names when aliases are given as dicts with a 'name',
     otherwise the cleaned 'aliases' value (an empty list if the key is missing)
    """
    aliases = show_data.get('aliases', [])
    if not (aliases and isinstance(aliases[0], dict)):
        return clean_data(aliases)
    return [clean_data(cur_alias['name']) for cur_alias in aliases]
@staticmethod
def _get_tvdb_id(dct):
    """
    get the numeric tvdb id from a record

    :param dct: record with a 'tvdb_id' and/or an 'id' that ends in '-<digits>'
    :return: id as int, or None if no id can be derived
    """
    try:
        tvdb_id = try_int(dct.get('tvdb_id'), None)
        if not tvdb_id:
            # fall back to the trailing digits of a prefixed id
            tvdb_id = try_int(re.sub(r'^.+-(\d+)$', r'\1', f'{dct["id"]}'), None)
        return tvdb_id
    except (BaseException, Exception):
        return None
@staticmethod
def _get_first_aired(show_data):
    """
    get the first aired value of a show

    :param show_data: show record, checked keys in order: 'first_air_time', 'firstAired', 'year'
    :return: first found value if it is a string containing a year of 19xx or 20xx, otherwise None
    """
    if not isinstance(show_data, dict):
        return None

    year = show_data.get('year')
    first_aired = clean_data(
        show_data.get('first_air_time')
        or show_data.get('firstAired')
        or ('0000' != year and year)  # a year of '0000' is treated as not set
        or None)

    if isinstance(first_aired, string_types) and re.search(r'(19|20)\d\d', first_aired):
        return first_aired
    return None
@staticmethod
def _get_remote_ids(show_data, tvdb_id):
    """
    collect ids of other sources from the 'remote_ids' list of a show record

    :param show_data: show record
    :param tvdb_id: tvdb id of the show
    :return: TVInfoIDs with the tvdb id and any found imdb, tmdb, and tvmaze ids
    """
    # pairs of (text looked for in the lower cased source name, TVInfoIDs keyword)
    plain_sources = (('themoviedb', 'tmdb'), ('tv maze', 'tvmaze'))

    found = {}
    remote_ids = show_data['remote_ids'] if 'remote_ids' in show_data else None
    if isinstance(remote_ids, list):
        for cur_remote in remote_ids:
            source = cur_remote['sourceName'].lower()
            remote_id = clean_data(cur_remote['id'])

            if 'imdb' in source:
                try:
                    # imdb ids are stored as number without the 'tt' prefix
                    found['imdb'] = try_int(remote_id.replace('tt', ''), None)
                except (BaseException, Exception):
                    pass
                continue

            for marker, id_key in plain_sources:
                if marker in source:
                    found[id_key] = try_int(remote_id, None)
                    break

    return TVInfoIDs(tvdb=tvdb_id, **found)
def _search_show(
        self,
        name=None, # type: Union[AnyStr, List[AnyStr]]
        ids=None, # type: Dict[integer_types, integer_types]
        lang=None, # type: Optional[string_types]
        **kwargs):
    # type: (...) -> List[TVInfoShow]
    """
    internal search function to find shows by ids and/or by name

    :param name: name, or list of names, to search for
    :param ids: dict of ids {tvid: prodid} to search for
    :param lang: language used when fetching a show by tvdb id, and passed on
     when resolving series name and overview of a result
    :param kwargs: not used by this method
    :return: list of found shows, duplicates (by id) are removed keeping the first occurrence
    """
    def _make_result_dict(show_data):
        # build a one item list holding a TVInfoShow made from `show_data`,
        # or an empty list when no series name can be resolved
        tvdb_id = self._get_tvdb_id(show_data)
        series_name, series_aliases = self._get_series_name(show_data, language=lang)
        if not series_name:
            return []
        ti_show = TVInfoShow()
        # the walrus binds the whole `clean_data(...) or []` expression,
        # a truthy country value is then wrapped into a list
        if country := clean_data(show_data.get('country')) or []:
            country = [country]
        # NOTE(review): the status expression falls back to show_data['status'],
        # so a record without a 'status' key raises KeyError here
        ti_show.seriesname, ti_show.id, ti_show.ids, \
            ti_show.firstaired, ti_show.network, \
            ti_show.overview, \
            ti_show.poster, \
            ti_show.status, \
            ti_show.language, ti_show.origin_countries, \
            ti_show.aliases, ti_show.slug, \
            ti_show.genre_list \
            = series_name, tvdb_id, self._get_remote_ids(show_data, tvdb_id), \
            self._get_first_aired(show_data), self._get_network(show_data), \
            self._get_overview(show_data, language=lang) or clean_str(show_data.get('overview')), \
            show_data.get('image_url') or show_data.get('image'), \
            clean_data(isinstance(show_data.get('status'), dict) and show_data.get('status', {}).get('name')
                       or show_data['status']), \
            clean_data(show_data.get('primary_language')), country, \
            series_aliases, clean_data(show_data.get('slug')), \
            [clean_data(_g.get('name')) for _g in (show_data.get('genres') or [])]
        ti_show.genre = '|'.join(ti_show.genre_list or [])
        return [ti_show]
    results = []
    if ids:
        # each tuple is (id source, argument, text looked for in remote id source names),
        # the argument is the language for a tvdb id, and an id format string for other sources
        for cur_tvinfo, cur_arg, cur_name in ((TVINFO_TVDB, lang, None), (TVINFO_IMDB, 'tt%07d', 'imdb'),
                                              (TVINFO_TMDB, '%s', 'themoviedb'), (TVINFO_TVMAZE, '%s', 'tv maze')):
            if not ids.get(cur_tvinfo):
                continue
            type_chk, query = list, None
            if TVINFO_TVDB == cur_tvinfo:
                # a tvdb id is fetched as show data, its response data is checked as dict
                resp = self._get_show_data(ids[cur_tvinfo], cur_arg, direct_data=True)
                type_chk = dict
            else:
                # other ids use the search endpoint, `query` is the formatted id
                resp = self.get_cached_or_url(
                    'search?meta=translations',
                    f's-v4-id-{cur_tvinfo}-{ids[cur_tvinfo]}', expire=self.search_cache_expire,
                    remote_id=(query := cur_arg % ids[cur_tvinfo]), query=query, type='series')
            if self._check_resp(type_chk, resp):
                if TVINFO_TVDB == cur_tvinfo:
                    results.extend(_make_result_dict(resp['data']))
                    continue
                for cur_item in resp['data']:
                    try:
                        # only accept the first item whose remote id of this source equals the queried id
                        if query == next(filter(lambda b: cur_name in b['sourceName'].lower(),
                                                cur_item.get('remote_ids', []) or []), {}).get('id'):
                            results.extend(_make_result_dict(cur_item))
                            break
                    except (BaseException, Exception):
                        pass
        # a slug is looked up directly, the result is only used if the returned slug matches
        if ids.get(TVINFO_TVDB_SLUG) and isinstance(ids.get(TVINFO_TVDB_SLUG), string_types):
            if (resp := self.get_cached_or_url(
                    f'/series/slug/{ids.get(TVINFO_TVDB_SLUG)}?meta=translations',
                    f's-id-{TVINFO_TVDB}-{ids[TVINFO_TVDB_SLUG]}', expire=self.search_cache_expire)) \
                    and self._check_resp(dict, resp) \
                    and ids.get(TVINFO_TVDB_SLUG).lower() == resp['data']['slug'].lower():
                results.extend(_make_result_dict(resp['data']))
    if name:
        # a single name is handled as a list with one name
        for cur_name in ([name], name)[isinstance(name, list)]:
            if (resp := self.get_cached_or_url(
                    'search?meta=translations',
                    f's-v4-name-{cur_name}', expire=self.search_cache_expire,
                    query=cur_name, type='series')) \
                    and self._check_resp(list, resp):
                for cur_item in resp['data']:
                    results.extend(_make_result_dict(cur_item))
    # remove duplicates by id while keeping order, `seen.add` returns None so `or` yields the show
    seen = set()
    results = [seen.add(_r['id']) or _r for _r in results if _r['id'] not in seen]
    return results
def _get_languages(self):
    # type: (...) -> None
    """
    fetch the languages of the api and store them on the class as `_supported_languages`

    an empty list is stored if the response is not usable
    """
    resp = self._fetch_data('/languages')
    if not self._check_resp(list, resp):
        TvdbAPIv4._supported_languages = []
        return

    def _as_language(api_lang):
        # 'sg_lang' is the mapped language id, or the api id itself when no mapping exists
        return {
            'id': clean_data(api_lang['id']),
            'name': clean_data(api_lang['name']),
            'nativeName': clean_data(api_lang['nativeName']),
            'shortCode': clean_data(api_lang['shortCode']),
            'sg_lang': self.reverse_map_languages.get(api_lang['id'], api_lang['id'])}

    TvdbAPIv4._supported_languages = [_as_language(cur_lang) for cur_lang in resp['data']]
def _get_filtered_series(self, result_count=100, **kwargs):
    # type: (...) -> List[TVInfoShow]
    """
    fetch shows from the '/series/filter' endpoint, following pages until enough shows are collected

    :param result_count: maximum number of shows to return
    :param kwargs: filter arguments that are passed on to the data fetch
    :return: list of shows
    """
    shows = []
    page, item_count = 0, 0
    while 100 > page and item_count < result_count:
        resp = self._fetch_data('/series/filter', page=page, **kwargs)
        if not (self._check_resp(list, resp) and len(resp['data'])):
            # no usable data, stop paging
            break

        for cur_item in resp['data']:
            item_count += 1
            if item_count > result_count:
                break

            ti_show = TVInfoShow()
            ti_show.id = cur_item['id']
            ti_show.seriesname = clean_data(cur_item['name'])
            ti_show.firstaired = self._get_first_aired(cur_item)
            ti_show.overview = clean_str(cur_item['overview'])
            ti_show.ids = TVInfoIDs(tvdb=cur_item['id'])
            ti_show.poster = self._sanitise_image_uri(cur_item['image'])
            ti_show.language = clean_data(cur_item['originalLanguage'])
            ti_show.origin_countries = clean_data([cur_item['originalCountry']])
            ti_show.rating = cur_item['score']
            ti_show.slug = clean_data(cur_item['slug'])
            shows.append(ti_show)

        page = self._next_page(resp, page)
        if None is page:
            # there is no further page
            break

    return shows
def discover(self, result_count=100, get_extra_images=False, **kwargs):
    # type: (integer_types, bool, Optional[Any]) -> List[TVInfoShow]
    """
    get shows with the 'Upcoming' status, sorted ascending by first aired

    :param result_count: maximum number of shows to return
    :param get_extra_images: not used by this method
    :param kwargs: not used by this method
    :return: list of shows
    """
    filter_args = dict(
        status=status_names['Upcoming']['id'],
        sort='firstAired', sortType='asc', lang='eng')
    return self._get_filtered_series(result_count=result_count, **filter_args)
def get_top_rated(self, result_count=100, year=None, in_last_year=False, **kwargs):
    # type: (integer_types, integer_types, bool, Optional[Any]) -> List[TVInfoShow]
    """
    get shows sorted descending by score

    :param result_count: maximum number of shows to return
    :param year: limit shows to this year, only used if `in_last_year` is not set
    :param in_last_year: limit shows to those first aired within the last 365 days
    :param kwargs: not used by this method
    :return: list of shows
    """
    filter_args = {'sort': 'score', 'sortType': 'desc', 'lang': 'eng'}

    if not in_last_year:
        if isinstance(year, int):
            filter_args['year'] = year
        return self._get_filtered_series(result_count=result_count, **filter_args)

    today = datetime.date.today()
    cutoff = (today - datetime.timedelta(days=365)).strftime('%Y-%m-%d')

    this_year = self._get_filtered_series(result_count=result_count, year=today.year, **filter_args)
    # of the previous year, only keep shows with a full date that is later than the cutoff
    last_year = [
        cur_show for cur_show in
        self._get_filtered_series(result_count=result_count, year=today.year - 1, **filter_args)
        if 10 == len(cur_show.firstaired or '') and cur_show.firstaired > cutoff]

    merged = sorted(this_year + last_year, key=lambda a: a.rating, reverse=True)
    return merged[:result_count]