mirror of
https://github.com/SickGear/SickGear.git
synced 2024-11-15 01:15:05 +00:00
53cc79ee8c
Change hide ratings on browse shows cards in person view for TVmaze as not provided. Change replace test code for consistency at "Other shows" for sub title across all sources. Make overview parser centralised for consistent reusability. Fix add dynamic fetch to fill show data on person view when a role is non main and the API does not expose. Add logic for when there are no genres. Fix mitigate MRU issue where browse default and param person view conflicted. Change make TMDB person genre lowercase for consistency with other sources. Filter out TMDB roles that have no actual name. Add expose `show_type` to TVmaze api for cases where genres are None (e.g. Taylor Swift vs. Scooter Braun). Fix images on Shows -> MC cards view. Change ensure genres on NE browse shows view are lowercase consistent with other sources. Change browse shows view, replace \r\n in show overviews to a single whitespace on cards. Change TMDB browse shows expose ratings. Fix TMDB browse shows use alternative term_vote in drop down.
858 lines
44 KiB
Python
858 lines
44 KiB
Python
# encoding:utf-8
|
|
# author:Prinz23
|
|
# project:tvmaze_api
|
|
|
|
__author__ = 'Prinz23'
|
|
__version__ = '1.0'
|
|
__api_version__ = '1.0.0'
|
|
|
|
import datetime
|
|
import logging
|
|
import re
|
|
|
|
import requests
|
|
from requests.adapters import HTTPAdapter
|
|
# noinspection PyProtectedMember
|
|
from tornado._locale_data import LOCALE_NAMES
|
|
from urllib3.util.retry import Retry
|
|
|
|
from sg_helpers import clean_data, enforce_type, get_url, try_int
|
|
from lib.dateutil.parser import parser
|
|
# noinspection PyProtectedMember
|
|
from lib.dateutil.tz.tz import _datetime_to_timestamp
|
|
from lib.exceptions_helper import ConnectionSkipException, ex
|
|
from lib.pytvmaze import tvmaze
|
|
# from .tvmaze_exceptions import *
|
|
from lib.tvinfo_base import TVInfoBase, TVInfoImage, TVInfoImageSize, TVInfoImageType, TVInfoCharacter, Crew, \
|
|
crew_type_names, TVInfoPerson, RoleTypes, TVInfoShow, TVInfoEpisode, TVInfoIDs, TVInfoNetwork, TVInfoSeason, \
|
|
PersonGenders, TVINFO_TVMAZE, TVINFO_TVDB, TVINFO_IMDB
|
|
|
|
from six import integer_types, iteritems, string_types
|
|
|
|
# noinspection PyUnreachableCode
|
|
if False:
|
|
from typing import Any, AnyStr, Dict, List, Optional
|
|
from lib.pytvmaze.tvmaze import Episode as TVMazeEpisode, Show as TVMazeShow
|
|
|
|
log = logging.getLogger('tvmaze.api')
|
|
log.addHandler(logging.NullHandler())
|
|
|
|
|
|
# Query TVmaze free endpoints
|
|
def tvmaze_endpoint_standard_get(url):
|
|
s = requests.Session()
|
|
retries = Retry(total=5,
|
|
backoff_factor=0.1,
|
|
status_forcelist=[429])
|
|
# noinspection HttpUrlsUsage
|
|
s.mount('http://', HTTPAdapter(max_retries=retries))
|
|
s.mount('https://', HTTPAdapter(max_retries=retries))
|
|
# noinspection PyProtectedMember
|
|
return get_url(url, json=True, session=s, hooks={'response': tvmaze._record_hook}, raise_skip_exception=True)
|
|
|
|
|
|
tvmaze.TVmaze.endpoint_standard_get = staticmethod(tvmaze_endpoint_standard_get)
|
|
tvm_obj = tvmaze.TVmaze()
|
|
empty_ep = TVInfoEpisode()
|
|
empty_se = TVInfoSeason()
|
|
tz_p = parser()
|
|
|
|
character_clean_regex = re.compile(r'^tb(a|d)$', flags=re.I)
|
|
|
|
img_type_map = {
|
|
'poster': TVInfoImageType.poster,
|
|
'banner': TVInfoImageType.banner,
|
|
'background': TVInfoImageType.fanart,
|
|
'typography': TVInfoImageType.typography,
|
|
}
|
|
|
|
img_size_map = {
|
|
'original': TVInfoImageSize.original,
|
|
'medium': TVInfoImageSize.medium,
|
|
}
|
|
|
|
show_map = {
|
|
'id': 'maze_id',
|
|
'ids': 'externals',
|
|
# 'slug': '',
|
|
'seriesid': 'maze_id',
|
|
'seriesname': 'name',
|
|
'aliases': 'akas',
|
|
# 'season': '',
|
|
'classification': 'type',
|
|
# 'genre': '',
|
|
'genre_list': 'genres',
|
|
# 'actors': '',
|
|
# 'cast': '',
|
|
# 'show_type': '',
|
|
# 'network': 'network',
|
|
# 'network_id': '',
|
|
# 'network_timezone': '',
|
|
# 'network_country': '',
|
|
# 'network_country_code': '',
|
|
# 'network_is_stream': '',
|
|
# 'runtime': 'runtime',
|
|
'language': 'language',
|
|
'official_site': 'official_site',
|
|
# 'imdb_id': '',
|
|
# 'zap2itid': '',
|
|
# 'airs_dayofweek': '',
|
|
# 'airs_time': '',
|
|
# 'time': '',
|
|
'firstaired': 'premiered',
|
|
# 'added': '',
|
|
# 'addedby': '',
|
|
# 'siteratingcount': '',
|
|
# 'lastupdated': '',
|
|
# 'contentrating': '',
|
|
# 'rating': 'rating',
|
|
'status': 'status',
|
|
'overview': 'summary',
|
|
# 'poster': 'image',
|
|
# 'poster_thumb': '',
|
|
# 'banner': '',
|
|
# 'banner_thumb': '',
|
|
# 'fanart': '',
|
|
# 'banners': '',
|
|
'updated_timestamp': 'updated',
|
|
}
|
|
season_map = {
|
|
'id': 'id',
|
|
'number': 'season_number',
|
|
'name': 'name',
|
|
# 'actors': '',
|
|
# 'cast': '',
|
|
# 'network': '',
|
|
# 'network_id': '',
|
|
# 'network_timezone': '',
|
|
# 'network_country': '',
|
|
# 'network_country_code': '',
|
|
# 'network_is_stream': '',
|
|
'ordered': '',
|
|
'start_date': 'premiere_date',
|
|
'end_date': 'end_date',
|
|
# 'poster': '',
|
|
'summery': 'summary',
|
|
'episode_order': 'episode_order',
|
|
}
|
|
|
|
|
|
class TvMaze(TVInfoBase):
|
|
supported_id_searches = [TVINFO_TVMAZE, TVINFO_TVDB, TVINFO_IMDB]
|
|
supported_person_id_searches = [TVINFO_TVMAZE]
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
super(TvMaze, self).__init__(*args, **kwargs)
|
|
|
|
def _search_show(self, name=None, ids=None, **kwargs):
|
|
def _make_result_dict(s):
|
|
# type: (tvmaze.Show) -> Dict
|
|
language = s.language and clean_data(s.language.lower())
|
|
language_country_code = None
|
|
if language:
|
|
for cur_locale in iteritems(LOCALE_NAMES):
|
|
if language in cur_locale[1]['name_en'].lower():
|
|
language_country_code = cur_locale[0].split('_')[1].lower()
|
|
break
|
|
ti_show = TVInfoShow()
|
|
show_type = clean_data(s.type)
|
|
if show_type:
|
|
show_type = [show_type]
|
|
else:
|
|
show_type = []
|
|
ti_show.seriesname, ti_show.id, ti_show.firstaired, ti_show.network, ti_show.genre_list, ti_show.overview, \
|
|
ti_show.language, ti_show.runtime, ti_show.show_type, ti_show.airs_dayofweek, ti_show. status, \
|
|
ti_show.official_site, ti_show.aliases, ti_show.poster, ti_show.ids = clean_data(s.name), s.id, \
|
|
clean_data(s.premiered), \
|
|
clean_data((s.network and s.network.name) or (s.web_channel and s.web_channel.name)), \
|
|
isinstance(s.genres, list) and [clean_data(g.lower()) for g in s.genres], \
|
|
enforce_type(clean_data(s.summary), str, ''), clean_data(s.language), \
|
|
s.average_runtime or s.runtime, show_type, ', '.join(s.schedule['days'] or []), clean_data(s.status), \
|
|
clean_data(s.official_site), [clean_data(a.name) for a in s.akas], \
|
|
s.image and s.image.get('original'), \
|
|
TVInfoIDs(tvdb=s.externals.get('thetvdb'), rage=s.externals.get('tvrage'), tvmaze=s.id,
|
|
imdb=clean_data(s.externals.get('imdb') and
|
|
try_int(s.externals.get('imdb').replace('tt', ''), None)))
|
|
ti_show.genre = '|'.join(ti_show.genre_list or [])
|
|
return ti_show
|
|
|
|
results = []
|
|
if ids:
|
|
for t, p in iteritems(ids):
|
|
if t in self.supported_id_searches:
|
|
cache_id_key = 's-id-%s-%s' % (t, ids[t])
|
|
is_none, shows = self._get_cache_entry(cache_id_key)
|
|
if t == TVINFO_TVDB:
|
|
if not self.config.get('cache_search') or (None is shows and not is_none):
|
|
try:
|
|
show = tvmaze.lookup_tvdb(p)
|
|
self._set_cache_entry(cache_id_key, show, expire=self.search_cache_expire)
|
|
except (BaseException, Exception):
|
|
continue
|
|
else:
|
|
show = shows
|
|
elif t == TVINFO_IMDB:
|
|
if not self.config.get('cache_search') or (None is shows and not is_none):
|
|
try:
|
|
show = tvmaze.lookup_imdb((p, 'tt%07d' % p)[not str(p).startswith('tt')])
|
|
self._set_cache_entry(cache_id_key, show, expire=self.search_cache_expire)
|
|
except (BaseException, Exception):
|
|
continue
|
|
else:
|
|
show = shows
|
|
elif t == TVINFO_TVMAZE:
|
|
if not self.config.get('cache_search') or (None is shows and not is_none):
|
|
try:
|
|
show = tvm_obj.get_show(maze_id=p)
|
|
self._set_cache_entry(cache_id_key, show, expire=self.search_cache_expire)
|
|
except (BaseException, Exception):
|
|
continue
|
|
else:
|
|
show = shows
|
|
else:
|
|
continue
|
|
if show:
|
|
try:
|
|
if show.id not in [i['id'] for i in results]:
|
|
results.append(_make_result_dict(show))
|
|
except (BaseException, Exception) as e:
|
|
log.debug('Error creating result dict: %s' % ex(e))
|
|
if name:
|
|
for n in ([name], name)[isinstance(name, list)]:
|
|
cache_name_key = 's-name-%s' % n
|
|
is_none, shows = self._get_cache_entry(cache_name_key)
|
|
if not self.config.get('cache_search') or (None is shows and not is_none):
|
|
try:
|
|
shows = tvmaze.show_search(n)
|
|
except (BaseException, Exception) as e:
|
|
log.debug('Error searching for show: %s' % ex(e))
|
|
continue
|
|
results.extend([_make_result_dict(s) for s in shows or []])
|
|
|
|
seen = set()
|
|
results = [seen.add(r['id']) or r for r in results if r['id'] not in seen]
|
|
return results
|
|
|
|
def _set_episode(self, sid, ep_obj):
|
|
for _k, _s in (
|
|
('seasonnumber', 'season_number'), ('episodenumber', 'episode_number'),
|
|
('episodename', 'title'), ('overview', 'summary'), ('firstaired', 'airdate'),
|
|
('airtime', 'airtime'), ('runtime', 'runtime'),
|
|
('seriesid', 'maze_id'), ('id', 'maze_id'), ('is_special', 'special'), ('filename', 'image')):
|
|
if 'airtime' == _k:
|
|
try:
|
|
airtime = datetime.time.fromisoformat(clean_data(getattr(ep_obj, _s, getattr(empty_ep, _k))))
|
|
except (BaseException, Exception):
|
|
airtime = None
|
|
self._set_item(sid, ep_obj.season_number, ep_obj.episode_number or 0, _k, airtime)
|
|
elif 'filename' == _k:
|
|
image = getattr(ep_obj, _s, {}) or {}
|
|
image = image.get('original') or image.get('medium')
|
|
self._set_item(sid, ep_obj.season_number, ep_obj.episode_number or 0, _k, image)
|
|
else:
|
|
self._set_item(sid, ep_obj.season_number, ep_obj.episode_number or 0, _k,
|
|
clean_data(getattr(ep_obj, _s, getattr(empty_ep, _k))))
|
|
|
|
if ep_obj.airstamp:
|
|
try:
|
|
at = _datetime_to_timestamp(tz_p.parse(ep_obj.airstamp))
|
|
self._set_item(sid, ep_obj.season_number, ep_obj.episode_number or 0, 'timestamp', at)
|
|
except (BaseException, Exception):
|
|
pass
|
|
|
|
@staticmethod
|
|
def _set_network(ti_obj, network, is_stream):
|
|
ti_obj.network = clean_data(network.name)
|
|
ti_obj.network_timezone = clean_data(network.timezone)
|
|
ti_obj.network_country = clean_data(network.country)
|
|
ti_obj.network_country_code = clean_data(network.code)
|
|
ti_obj.network_id = clean_data(network.maze_id)
|
|
ti_obj.network_is_stream = is_stream
|
|
|
|
def _set_images(self, ti_show, show_data, p_set):
|
|
# type: (TVInfoShow, TVMazeShow, bool) -> None
|
|
"""
|
|
Populate TVInfoShow with images show data
|
|
|
|
:param ti_show:
|
|
:param show_data:
|
|
:param p_set:
|
|
"""
|
|
|
|
b_set, f_set = False, False
|
|
for cur_img in show_data.images:
|
|
img_type = img_type_map.get(cur_img.type, TVInfoImageType.other)
|
|
img_width, img_height, img_url = ([cur_img.resolutions['original'].get(this)
|
|
for this in ('width', 'height', 'url')])
|
|
img_ar = img_width and img_height and float(img_width) / float(img_height)
|
|
img_ar_type = self._which_type(img_width, img_ar)
|
|
if TVInfoImageType.poster == img_type and img_ar and img_ar_type != img_type and \
|
|
ti_show.poster == img_url:
|
|
p_set = False
|
|
ti_show.poster = None
|
|
ti_show.poster_thumb = None
|
|
img_type = (TVInfoImageType.other, img_type)[
|
|
not img_ar or img_ar_type == img_type or
|
|
img_type not in (TVInfoImageType.banner, TVInfoImageType.poster, TVInfoImageType.fanart)]
|
|
img_src = {}
|
|
for cur_res, cur_img_url in iteritems(cur_img.resolutions):
|
|
img_size = img_size_map.get(cur_res)
|
|
if img_size:
|
|
img_src[img_size] = cur_img_url.get('url')
|
|
ti_show.images.setdefault(img_type, []).append(
|
|
TVInfoImage(
|
|
image_type=img_type, sizes=img_src, img_id=cur_img.id, main_image=cur_img.main,
|
|
type_str=cur_img.type, width=img_width, height=img_height, aspect_ratio=img_ar))
|
|
if not p_set and TVInfoImageType.poster == img_type:
|
|
p_set = True
|
|
ti_show.poster = img_url
|
|
ti_show.poster_thumb = img_url
|
|
elif not b_set and 'banner' == cur_img.type and TVInfoImageType.banner == img_type:
|
|
b_set = True
|
|
ti_show.banner = img_url
|
|
ti_show.banner_thumb = cur_img.resolutions.get('medium')['url']
|
|
elif not f_set and 'background' == cur_img.type and TVInfoImageType.fanart == img_type:
|
|
f_set = True
|
|
ti_show.fanart = img_url
|
|
|
|
def _get_tvm_show(self, show_id, get_ep_info):
|
|
try:
|
|
self.show_not_found = False
|
|
return tvm_obj.get_show(maze_id=show_id, embed='cast%s' % ('', ',episodeswithspecials')[get_ep_info])
|
|
except tvmaze.ShowNotFound:
|
|
self.show_not_found = True
|
|
except (BaseException, Exception):
|
|
log.debug('Error getting data for TVmaze show id: %s' % show_id)
|
|
|
|
def _get_show_data(self, sid, language, get_ep_info=False, banners=False, posters=False, seasons=False,
|
|
seasonwides=False, fanart=False, actors=False, **kwargs):
|
|
log.debug('Getting all series data for %s' % sid)
|
|
|
|
show_data = self._get_tvm_show(sid, get_ep_info)
|
|
if not show_data:
|
|
return False
|
|
|
|
ti_show = self.ti_shows[sid] # type: TVInfoShow
|
|
self._show_info_loader(
|
|
sid, show_data, ti_show,
|
|
load_images=banners or posters or fanart or
|
|
any(self.config.get('%s_enabled' % t, False) for t in ('banners', 'posters', 'fanart')),
|
|
load_actors=(actors or self.config['actors_enabled'])
|
|
)
|
|
|
|
if get_ep_info and not getattr(self.ti_shows.get(sid), 'ep_loaded', False):
|
|
log.debug('Getting all episodes of %s' % sid)
|
|
if None is show_data:
|
|
show_data = self._get_tvm_show(sid, get_ep_info)
|
|
if not show_data:
|
|
return False
|
|
|
|
if show_data.episodes:
|
|
specials = []
|
|
for cur_ep in show_data.episodes:
|
|
if cur_ep.is_special():
|
|
specials.append(cur_ep)
|
|
else:
|
|
self._set_episode(sid, cur_ep)
|
|
|
|
if specials:
|
|
specials.sort(key=lambda ep: ep.airstamp or 'Last')
|
|
for cur_ep_num, cur_sp in enumerate(specials, start=1):
|
|
cur_sp.season_number, cur_sp.episode_number = 0, cur_ep_num
|
|
self._set_episode(sid, cur_sp)
|
|
|
|
if show_data.seasons:
|
|
networks = {}
|
|
for _, cur_season in iteritems(show_data.seasons):
|
|
ti_season = None
|
|
if cur_season.season_number not in ti_show:
|
|
if all(_e.is_special() for _e in cur_season.episodes or []):
|
|
ti_season = ti_show[0]
|
|
else:
|
|
log.error('error episodes have no numbers')
|
|
ti_season = ti_season or ti_show[cur_season.season_number] # type: TVInfoSeason
|
|
for k, v in iteritems(season_map):
|
|
setattr(ti_season, k, clean_data(getattr(cur_season, v, None)) or empty_se.get(v))
|
|
if cur_season.network:
|
|
self._set_network(ti_season, cur_season.network, False)
|
|
elif cur_season.web_channel:
|
|
self._set_network(ti_season, cur_season.web_channel, True)
|
|
if ti_season.network:
|
|
networks[cur_season.season_number] = TVInfoNetwork(
|
|
name=ti_season.network, country=ti_season.network_country,
|
|
country_code=ti_season.network_country_code, n_id=ti_season.network_id,
|
|
stream=ti_season.network_is_stream, timezone=ti_season.network_timezone
|
|
)
|
|
if cur_season.image:
|
|
ti_season.poster = cur_season.image.get('original')
|
|
ti_show.season_images_loaded = True
|
|
seen = set()
|
|
ti_show.networks = [seen.add(r[1].name) or r[1]
|
|
for r in sorted(iteritems(networks), key=lambda a: a[0], reverse=True)
|
|
if r[1].name not in seen]
|
|
|
|
ti_show.ep_loaded = True
|
|
|
|
return True
|
|
|
|
def get_updated_shows(self):
|
|
# type: (...) -> Dict[integer_types, integer_types]
|
|
return {sid: v.seconds_since_epoch for sid, v in iteritems(tvmaze.show_updates().updates)}
|
|
|
|
@staticmethod
|
|
def _clean_character_name(name):
|
|
# type: (Optional[str]) -> str
|
|
name = clean_data(name)
|
|
if isinstance(name, str):
|
|
return enforce_type(character_clean_regex.sub('', name), str, '')
|
|
return enforce_type(name, str, '')
|
|
|
|
def _convert_person(self, tvmaze_person_obj, load_credits=True):
|
|
# type: (tvmaze.Person, bool) -> TVInfoPerson
|
|
ch = []
|
|
_dupes = []
|
|
if load_credits:
|
|
for c in tvmaze_person_obj.castcredits or []:
|
|
ti_show = TVInfoShow()
|
|
ti_show.seriesname = clean_data(c.show.name)
|
|
ti_show.id = c.show.id
|
|
ti_show.firstaired = clean_data(c.show.premiered)
|
|
ti_show.ids = TVInfoIDs(ids={TVINFO_TVMAZE: ti_show.id})
|
|
ti_show.overview = clean_data(c.show.summary)
|
|
ti_show.status = clean_data(c.show.status)
|
|
ti_show.vote_average = clean_data((c.show.rating and c.show.rating.get('average'))) or None
|
|
ti_show.rating = ti_show.vote_average
|
|
net = c.show.network or c.show.web_channel
|
|
ti_show.genre_list = clean_data(c.show.genres or [])
|
|
ti_show.genre = '|'.join(ti_show.genre_list or [])
|
|
ti_show.show_type = clean_data((
|
|
isinstance(c.show.type, string_types) and [c.show.type.lower()] or
|
|
isinstance(c.show.type, list) and [x.lower() for x in c.show.type] or []
|
|
))
|
|
if net:
|
|
ti_show.network = clean_data(net.name)
|
|
ti_show.network_id = net.maze_id
|
|
ti_show.network_country = clean_data(net.country)
|
|
ti_show.network_country_code = clean_data(net.code)
|
|
ti_show.network_timezone = clean_data(net.timezone)
|
|
ti_show.network_is_stream = None is not c.show.web_channel
|
|
_images = None
|
|
if c.character.image and all(i_s in c.character.image and c.character.image[i_s]
|
|
for i_s in ('original', 'medium')):
|
|
_images = [TVInfoImage(TVInfoImageType.poster,
|
|
sizes={TVInfoImageSize.original: c.character.image['original'],
|
|
TVInfoImageSize.medium: c.character.image['medium']})]
|
|
ch.append(TVInfoCharacter(name=self._clean_character_name(c.character.name),
|
|
ti_show=ti_show, episode_count=1, plays_self=c.character.plays_self,
|
|
voice=c.character.voice,
|
|
image= c.character.image and c.character.image.get('original'),
|
|
thumb_url= c.character.image and c.character.image.get('medium'),
|
|
p_id=c.character.id, images=_images))
|
|
try:
|
|
birthdate = tvmaze_person_obj.birthday and tz_p.parse(tvmaze_person_obj.birthday).date()
|
|
except (BaseException, Exception):
|
|
birthdate = None
|
|
try:
|
|
deathdate = tvmaze_person_obj.death_day and tz_p.parse(tvmaze_person_obj.death_day).date()
|
|
except (BaseException, Exception):
|
|
deathdate = None
|
|
|
|
_ti_person_obj = TVInfoPerson(
|
|
p_id=tvmaze_person_obj.id, name=clean_data(tvmaze_person_obj.name),
|
|
image=tvmaze_person_obj.image and tvmaze_person_obj.image.get('original'),
|
|
gender=PersonGenders.named.get(tvmaze_person_obj.gender and tvmaze_person_obj.gender.lower(),
|
|
PersonGenders.unknown),
|
|
birthdate=birthdate, deathdate=deathdate,
|
|
country=tvmaze_person_obj.country and clean_data(tvmaze_person_obj.country.get('name')),
|
|
country_code=tvmaze_person_obj.country and clean_data(tvmaze_person_obj.country.get('code')),
|
|
country_timezone=tvmaze_person_obj.country and clean_data(tvmaze_person_obj.country.get('timezone')),
|
|
thumb_url=tvmaze_person_obj.image and tvmaze_person_obj.image.get('medium'),
|
|
url=tvmaze_person_obj.url, ids=TVInfoIDs(ids={TVINFO_TVMAZE: tvmaze_person_obj.id})
|
|
)
|
|
|
|
if load_credits:
|
|
for (c_t, regular) in [(tvmaze_person_obj.castcredits or [], True),
|
|
(tvmaze_person_obj.guestcastcredits or [], False)]:
|
|
for c in c_t: # type: tvmaze.CastCredit
|
|
_show = c.show or c.episode.show
|
|
_clean_char_name = self._clean_character_name(c.character.name)
|
|
ti_show = TVInfoShow()
|
|
if None is not _show:
|
|
_clean_show_name = clean_data(_show.name)
|
|
_clean_show_id = clean_data(_show.id)
|
|
_cur_dup = (_clean_char_name, _clean_show_id)
|
|
if _cur_dup in _dupes:
|
|
_co = next((_c for _c in ch if _clean_show_id == _c.ti_show.id
|
|
and _c.name == _clean_char_name), None)
|
|
if None is not _co:
|
|
ti_show = _co.ti_show
|
|
_co.episode_count += 1
|
|
if not regular:
|
|
ep_no = c.episode.episode_number or 0
|
|
_co.guest_episodes_numbers.setdefault(c.episode.season_number, []).append(ep_no)
|
|
if c.episode.season_number not in ti_show:
|
|
season = TVInfoSeason(show=ti_show, number=c.episode.season_number)
|
|
ti_show[c.episode.season_number] = season
|
|
else:
|
|
season = ti_show[c.episode.season_number]
|
|
episode = self._make_episode(c.episode, show_obj=ti_show)
|
|
episode.season = season
|
|
ti_show[c.episode.season_number][ep_no] = episode
|
|
continue
|
|
else:
|
|
_dupes.append(_cur_dup)
|
|
ti_show.seriesname = clean_data(_show.name)
|
|
ti_show.id = _show.id
|
|
ti_show.firstaired = clean_data(_show.premiered)
|
|
ti_show.ids = TVInfoIDs(ids={TVINFO_TVMAZE: ti_show.id})
|
|
ti_show.overview = enforce_type(clean_data(_show.summary), str, '')
|
|
ti_show.status = clean_data(_show.status)
|
|
ti_show.vote_average = clean_data(_show.rating and _show.rating.get('average')) or None
|
|
ti_show.rating = ti_show.vote_average
|
|
net = _show.network or _show.web_channel
|
|
if net:
|
|
ti_show.network = clean_data(net.name)
|
|
ti_show.network_id = net.maze_id
|
|
ti_show.network_country = clean_data(net.country)
|
|
ti_show.network_timezone = clean_data(net.timezone)
|
|
ti_show.network_country_code = clean_data(net.code)
|
|
ti_show.network_is_stream = None is not _show.web_channel
|
|
if c.episode:
|
|
|
|
ti_show.show_loaded = False
|
|
ti_show.load_method = self._show_info_loader
|
|
season = TVInfoSeason(show=ti_show, number=c.episode.season_number)
|
|
ti_show[c.episode.season_number] = season
|
|
episode = self._make_episode(c.episode, show_obj=ti_show)
|
|
episode.season = season
|
|
ti_show[c.episode.season_number][c.episode.episode_number or 0] = episode
|
|
if not regular:
|
|
_g_kw = {'guest_episodes_numbers': {c.episode.season_number: [c.episode.episode_number or 0]}}
|
|
else:
|
|
_g_kw = {}
|
|
_images = None
|
|
if c.character.image and all(i_s in c.character.image and c.character.image[i_s]
|
|
for i_s in ('original', 'medium')):
|
|
_images = [TVInfoImage(TVInfoImageType.poster,
|
|
sizes={TVInfoImageSize.original: c.character.image['original'],
|
|
TVInfoImageSize.medium: c.character.image['medium']})]
|
|
ch.append(TVInfoCharacter(name=_clean_char_name, ti_show=ti_show, regular=regular, episode_count=1,
|
|
person=[_ti_person_obj], plays_self=c.character.plays_self,
|
|
voice=c.character.voice,
|
|
image=c.character.image and c.character.image.get('original'),
|
|
thumb_url=c.character.image and c.character.image.get('medium'),
|
|
p_id=c.character.id, images=_images, **_g_kw))
|
|
_ti_person_obj.characters = ch
|
|
return _ti_person_obj
|
|
|
|
def _show_info_loader(self, show_id, show_data=None, show_obj=None, load_images=True, load_actors=True):
|
|
# type: (int, TVMazeShow, TVInfoShow, bool, bool) -> TVInfoShow
|
|
try:
|
|
_s_d = show_data or tvmaze.show_main_info(show_id, embed='cast')
|
|
if _s_d:
|
|
if None is not show_obj:
|
|
_s_o = show_obj
|
|
else:
|
|
_s_o = TVInfoShow()
|
|
show_dict = _s_o.__dict__
|
|
for k, v in iteritems(show_dict):
|
|
if k not in ('cast', 'crew', 'images', 'aliases', 'rating'):
|
|
show_dict[k] = getattr(_s_d, show_map.get(k, k), clean_data(show_dict[k]))
|
|
_s_o.aliases = [clean_data(a.name) for a in _s_d.akas]
|
|
_s_o.runtime = _s_d.average_runtime or _s_d.runtime
|
|
p_set = False
|
|
if _s_d.image:
|
|
p_set = True
|
|
_s_o.poster = _s_d.image.get('original')
|
|
_s_o.poster_thumb = _s_d.image.get('medium')
|
|
|
|
if load_images and \
|
|
not all(getattr(_s_o, '%s_loaded' % t, False) for t in ('poster', 'banner', 'fanart')):
|
|
if _s_d.images:
|
|
_s_o.poster_loaded = True
|
|
_s_o.banner_loaded = True
|
|
_s_o.fanart_loaded = True
|
|
self._set_images(_s_o, _s_d, p_set)
|
|
|
|
if _s_d.schedule:
|
|
if 'time' in _s_d.schedule:
|
|
_s_o.airs_time = _s_d.schedule['time']
|
|
try:
|
|
h, m = _s_d.schedule['time'].split(':')
|
|
h, m = try_int(h, None), try_int(m, None)
|
|
if None is not h and None is not m:
|
|
_s_o.time = datetime.time(hour=h, minute=m)
|
|
except (BaseException, Exception):
|
|
pass
|
|
if 'days' in _s_d.schedule:
|
|
_s_o.airs_dayofweek = ', '.join(_s_d.schedule['days'])
|
|
|
|
if load_actors and not _s_o.actors_loaded:
|
|
if _s_d.cast:
|
|
character_person_ids = {}
|
|
for cur_ch in _s_o.cast[RoleTypes.ActorMain]:
|
|
character_person_ids.setdefault(cur_ch.id, []).extend([p.id for p in cur_ch.person])
|
|
for cur_ch in _s_d.cast.characters:
|
|
existing_character = next(
|
|
(c for c in _s_o.cast[RoleTypes.ActorMain] if c.id == cur_ch.id),
|
|
None) # type: Optional[TVInfoCharacter]
|
|
person = self._convert_person(cur_ch.person, load_credits=False)
|
|
if existing_character:
|
|
existing_person = next((p for p in existing_character.person
|
|
if person.id == p.ids.get(TVINFO_TVMAZE)),
|
|
None) # type: TVInfoPerson
|
|
if existing_person:
|
|
try:
|
|
character_person_ids[cur_ch.id].remove(existing_person.id)
|
|
except (BaseException, Exception):
|
|
print('error')
|
|
pass
|
|
(existing_person.p_id, existing_person.name, existing_person.image,
|
|
existing_person.gender,
|
|
existing_person.birthdate, existing_person.deathdate, existing_person.country,
|
|
existing_person.country_code, existing_person.country_timezone,
|
|
existing_person.thumb_url,
|
|
existing_person.url, existing_person.ids) = \
|
|
(cur_ch.person.id, clean_data(cur_ch.person.name),
|
|
cur_ch.person.image and cur_ch.person.image.get('original'),
|
|
PersonGenders.named.get(
|
|
cur_ch.person.gender and cur_ch.person.gender.lower(),
|
|
PersonGenders.unknown),
|
|
person.birthdate, person.deathdate,
|
|
cur_ch.person.country and clean_data(cur_ch.person.country.get('name')),
|
|
cur_ch.person.country and clean_data(cur_ch.person.country.get('code')),
|
|
cur_ch.person.country and clean_data(cur_ch.person.country.get('timezone')),
|
|
cur_ch.person.image and cur_ch.person.image.get('medium'),
|
|
cur_ch.person.url, {TVINFO_TVMAZE: cur_ch.person.id})
|
|
else:
|
|
existing_character.person.append(person)
|
|
else:
|
|
_s_o.cast[RoleTypes.ActorMain].append(
|
|
TVInfoCharacter(image=cur_ch.image and cur_ch.image.get('original'),
|
|
name=self._clean_character_name(cur_ch.name),
|
|
ids=TVInfoIDs({TVINFO_TVMAZE: cur_ch.id}),
|
|
p_id=cur_ch.id, person=[person], plays_self=cur_ch.plays_self,
|
|
thumb_url=cur_ch.image and cur_ch.image.get('medium'),
|
|
ti_show=_s_o
|
|
))
|
|
|
|
if character_person_ids:
|
|
for cur_ch, cur_p_ids in iteritems(character_person_ids):
|
|
if cur_p_ids:
|
|
char = next((mc for mc in _s_o.cast[RoleTypes.ActorMain] if mc.id == cur_ch),
|
|
None) # type: Optional[TVInfoCharacter]
|
|
if char:
|
|
char.person = [p for p in char.person if p.id not in cur_p_ids]
|
|
|
|
if _s_d.cast:
|
|
_s_o.actors = [
|
|
{'character': {'id': ch.id,
|
|
'name': clean_data(ch.name),
|
|
'url': 'https://www.tvmaze.com/character/view?id=%s' % ch.id,
|
|
'image': ch.image and ch.image.get('original'),
|
|
},
|
|
'person': {'id': ch.person and ch.person.id,
|
|
'name': ch.person and clean_data(ch.person.name),
|
|
'url': ch.person and 'https://www.tvmaze.com/person/view?id=%s' % ch.person.id,
|
|
'image': ch.person and ch.person.image and ch.person.image.get('original'),
|
|
'birthday': None, # not sure about format
|
|
'deathday': None, # not sure about format
|
|
'gender': ch.person and ch.person.gender and ch.person.gender,
|
|
'country': ch.person and ch.person.country and
|
|
clean_data(ch.person.country.get('name')),
|
|
},
|
|
} for ch in _s_d.cast.characters]
|
|
|
|
if _s_d.crew:
|
|
for cur_cw in _s_d.crew:
|
|
rt = crew_type_names.get(cur_cw.type.lower(), RoleTypes.CrewOther)
|
|
_s_o.crew[rt].append(
|
|
Crew(p_id=cur_cw.person.id, name=clean_data(cur_cw.person.name),
|
|
image=cur_cw.person.image and cur_cw.person.image.get('original'),
|
|
gender=cur_cw.person.gender,
|
|
birthdate=cur_cw.person.birthday, deathdate=cur_cw.person.death_day,
|
|
country=cur_cw.person.country and cur_cw.person.country.get('name'),
|
|
country_code=cur_cw.person.country and clean_data(
|
|
cur_cw.person.country.get('code')),
|
|
country_timezone=cur_cw.person.country
|
|
and clean_data(cur_cw.person.country.get('timezone')),
|
|
crew_type_name=cur_cw.type,
|
|
)
|
|
)
|
|
|
|
if _s_d.externals:
|
|
_s_o.ids = TVInfoIDs(tvdb=_s_d.externals.get('thetvdb'),
|
|
rage=_s_d.externals.get('tvrage'),
|
|
imdb=clean_data(_s_d.externals.get('imdb') and
|
|
try_int(_s_d.externals.get('imdb').replace('tt', ''),
|
|
None)))
|
|
|
|
if _s_d.network:
|
|
self._set_network(_s_o, _s_d.network, False)
|
|
elif _s_d.web_channel:
|
|
self._set_network(_s_o, _s_d.web_channel, True)
|
|
|
|
return _s_o
|
|
except (BaseException, Exception):
|
|
pass
|
|
|
|
def _search_person(self, name=None, ids=None):
|
|
# type: (AnyStr, Dict[integer_types, integer_types]) -> List[TVInfoPerson]
|
|
urls, result, ids = [], [], ids or {}
|
|
for tv_src in self.supported_person_id_searches:
|
|
if tv_src in ids:
|
|
if TVINFO_TVMAZE == tv_src:
|
|
try:
|
|
r = self.get_person(ids[tv_src])
|
|
except ConnectionSkipException as e:
|
|
raise e
|
|
except (BaseException, Exception):
|
|
r = None
|
|
if r:
|
|
result.append(r)
|
|
if name:
|
|
try:
|
|
r = tvmaze.people_search(name)
|
|
except ConnectionSkipException as e:
|
|
raise e
|
|
except (BaseException, Exception):
|
|
r = None
|
|
if r:
|
|
for p in r:
|
|
if not any(1 for ep in result if p.id == ep.id):
|
|
result.append(self._convert_person(p))
|
|
return result
|
|
|
|
def get_person(self, p_id, get_show_credits=False, get_images=False, **kwargs):
|
|
# type: (integer_types, bool, bool, Any) -> Optional[TVInfoPerson]
|
|
if not p_id:
|
|
return
|
|
kw = {}
|
|
to_embed = []
|
|
if get_show_credits:
|
|
to_embed.append('castcredits')
|
|
if to_embed:
|
|
kw['embed'] = ','.join(to_embed)
|
|
try:
|
|
p = tvmaze.person_main_info(p_id, **kw)
|
|
except ConnectionSkipException as e:
|
|
raise e
|
|
except (BaseException, Exception):
|
|
p = None
|
|
if p:
|
|
return self._convert_person(p)
|
|
|
|
def get_premieres(self, **kwargs):
|
|
# type: (...) -> List[TVInfoShow]
|
|
return [_e.show for _e in self._filtered_schedule(**kwargs).get('premieres')]
|
|
|
|
def get_returning(self, **kwargs):
|
|
# type: (...) -> List[TVInfoShow]
|
|
return [_e.show for _e in self._filtered_schedule(**kwargs).get('returning')]
|
|
|
|
def _make_episode(self, episode_data, show_data=None, get_images=False, get_akas=False, show_obj=None):
|
|
# type: (TVMazeEpisode, TVMazeShow, bool, bool, TVInfoShow) -> TVInfoEpisode
|
|
"""
|
|
make out of TVMazeEpisode object and optionally TVMazeShow a TVInfoEpisode
|
|
"""
|
|
if None is not show_obj:
|
|
ti_show = show_obj
|
|
else:
|
|
ti_show = TVInfoShow()
|
|
ti_show.seriesname = clean_data(show_data.name)
|
|
ti_show.id = show_data.maze_id
|
|
ti_show.seriesid = ti_show.id
|
|
ti_show.language = clean_data(show_data.language)
|
|
ti_show.overview = enforce_type(clean_data(show_data.summary), str, '')
|
|
ti_show.firstaired = clean_data(show_data.premiered)
|
|
ti_show.runtime = show_data.average_runtime or show_data.runtime
|
|
ti_show.vote_average = show_data.rating and show_data.rating.get('average')
|
|
ti_show.rating = ti_show.vote_average
|
|
ti_show.popularity = show_data.weight
|
|
ti_show.genre_list = clean_data(show_data.genres or [])
|
|
ti_show.genre = '|'.join(ti_show.genre_list).lower()
|
|
ti_show.official_site = clean_data(show_data.official_site)
|
|
ti_show.status = clean_data(show_data.status)
|
|
ti_show.show_type = clean_data((isinstance(show_data.type, string_types) and [show_data.type.lower()] or
|
|
isinstance(show_data.type, list) and [x.lower() for x in show_data.type] or []))
|
|
ti_show.lastupdated = show_data.updated
|
|
ti_show.poster = show_data.image and show_data.image.get('original')
|
|
if get_akas:
|
|
ti_show.aliases = [clean_data(a.name) for a in show_data.akas]
|
|
if show_data.schedule and 'days' in show_data.schedule:
|
|
ti_show.airs_dayofweek = ', '.join(clean_data(show_data.schedule['days']))
|
|
network = show_data.network or show_data.web_channel
|
|
if network:
|
|
ti_show.network_is_stream = None is not show_data.web_channel
|
|
ti_show.network = clean_data(network.name)
|
|
ti_show.network_id = network.maze_id
|
|
ti_show.network_country = clean_data(network.country)
|
|
ti_show.network_country_code = clean_data(network.code)
|
|
ti_show.network_timezone = clean_data(network.timezone)
|
|
if get_images and show_data.images:
|
|
self._set_images(ti_show, show_data, False)
|
|
ti_show.ids = TVInfoIDs(
|
|
tvdb=show_data.externals.get('thetvdb'), rage=show_data.externals.get('tvrage'), tvmaze=show_data.id,
|
|
imdb=clean_data(show_data.externals.get('imdb') and
|
|
try_int(show_data.externals.get('imdb').replace('tt', ''), None)))
|
|
ti_show.imdb_id = clean_data(show_data.externals.get('imdb'))
|
|
if isinstance(ti_show.imdb_id, integer_types):
|
|
ti_show.imdb_id = 'tt%07d' % ti_show.imdb_id
|
|
|
|
ti_episode = TVInfoEpisode(show=ti_show)
|
|
ti_episode.id = episode_data.maze_id
|
|
ti_episode.seasonnumber = episode_data.season_number
|
|
ti_episode.episodenumber = episode_data.episode_number or 0
|
|
ti_episode.episodename = clean_data(episode_data.title)
|
|
try:
|
|
ti_episode.airtime = datetime.time.fromisoformat(clean_data(episode_data.airtime))
|
|
except (BaseException, Exception):
|
|
ti_episode.airtime = None
|
|
ti_episode.firstaired = clean_data(episode_data.airdate)
|
|
if episode_data.airstamp:
|
|
try:
|
|
at = _datetime_to_timestamp(tz_p.parse(episode_data.airstamp))
|
|
ti_episode.timestamp = at
|
|
except (BaseException, Exception):
|
|
pass
|
|
ti_episode.filename = episode_data.image and (episode_data.image.get('original') or
|
|
episode_data.image.get('medium'))
|
|
ti_episode.is_special = episode_data.is_special()
|
|
ti_episode.overview = enforce_type(clean_data(episode_data.summary), str, '')
|
|
ti_episode.runtime = episode_data.runtime
|
|
if ti_episode.seasonnumber not in ti_show:
|
|
season = TVInfoSeason(show=ti_show, number=ti_episode.seasonnumber)
|
|
ti_show[ti_episode.seasonnumber] = season
|
|
ti_episode.season = season
|
|
ti_show[ti_episode.seasonnumber][ti_episode.episodenumber] = ti_episode
|
|
return ti_episode
|
|
|
|
def _filtered_schedule(self, **kwargs):
|
|
cache_name_key = 'tvmaze_schedule'
|
|
is_none, schedule = self._get_cache_entry(cache_name_key)
|
|
if None is schedule and not is_none:
|
|
schedule = []
|
|
try:
|
|
schedule = tvmaze.get_full_schedule()
|
|
except(BaseException, Exception):
|
|
pass
|
|
|
|
premieres = []
|
|
returning = []
|
|
rc_lang = re.compile('(?i)eng|jap')
|
|
for cur_show in filter(lambda s: 1 == s.episode_number and (
|
|
None is s.show.language or rc_lang.search(s.show.language)), schedule):
|
|
if 1 == cur_show.season_number:
|
|
premieres += [cur_show]
|
|
else:
|
|
returning += [cur_show]
|
|
|
|
premieres = [self._make_episode(r, r.show, **kwargs)
|
|
for r in sorted(premieres, key=lambda e: e.show.premiered)]
|
|
returning = [self._make_episode(r, r.show, **kwargs)
|
|
for r in sorted(returning, key=lambda e: e.airstamp)]
|
|
|
|
schedule = dict(premieres=premieres, returning=returning)
|
|
self._set_cache_entry(cache_name_key, schedule, expire=self.schedule_cache_expire)
|
|
|
|
return schedule
|