# encoding:utf-8 # author:Prinz23 # project:tmdb_api __author__ = 'Prinz23' __version__ = '1.0' __api_version__ = '1.0.0' import datetime import logging import re from lib import tmdbsimple from lib.dateutil.parser import parser from lib.exceptions_helper import ConnectionSkipException, ex from lib.tvinfo_base import CastList, PersonGenders, RoleTypes, \ TVInfoBase, TVInfoIDs, TVInfoImage, TVInfoImageSize, TVInfoImageType, TVInfoNetwork, TVInfoSocialIDs, \ TVInfoCharacter, TVInfoPerson, TVInfoShow, TVInfoEpisode, TVInfoSeason, \ TVINFO_IMDB, TVINFO_TMDB, TVINFO_TVDB, \ TVINFO_FACEBOOK, TVINFO_INSTAGRAM, TVINFO_X from json_helper import json_dumps from sg_helpers import clean_data, enforce_type, get_url, iterate_chunk, try_int from six import iteritems # noinspection PyUnreachableCode if False: from typing import Any, AnyStr, Dict, List, Optional, Union from six import integer_types log = logging.getLogger('tmdb.api') log.addHandler(logging.NullHandler()) tz_p = parser() tmdbsimple.API_KEY = 'edc5f123313769de83a71e157758030b' id_map = {TVINFO_IMDB: 'imdb_id', TVINFO_TVDB: 'tvdb_id', TVINFO_FACEBOOK: 'facebook_id', TVINFO_INSTAGRAM: 'instagram_id', TVINFO_X: 'twitter_id'} tv_show_map = dict( name='seriesname', id='id', first_air_date='firstaired', status='status', original_language='language') ep_group_types = { 1: 'Original air date', 2: 'Absolute', 3: 'DVD', 4: 'Digital', 5: 'Story arc', 6: 'Production', 7: 'TV' } empty_ep = TVInfoEpisode() def tmdb_get(self, path, params=None): url = self._get_complete_url(path) params = self._get_params(params) return get_url(url=url, params=params, json=True, raise_skip_exception=True) def tmdb_post(self, path, params=None, payload=None): url = self._get_complete_url(path) params = self._get_params(params) data = json_dumps(payload) if payload else payload return get_url(url=url, params=params, post_data=data, json=True, raise_skip_exception=True) tmdbsimple.base.TMDB._GET = tmdb_get tmdbsimple.base.TMDB._POST = tmdb_post _TMDB_CONSTANTS_CACHE = {'date': datetime.datetime(2000, 1, 1), 'data': {}} def get_tmdb_constants(): # type: (...) -> Dict """return tmdbsimple Configuration().info() or cached copy""" global _TMDB_CONSTANTS_CACHE # only retrieve info data if older than 3 days if 3 < (datetime.datetime.now() - _TMDB_CONSTANTS_CACHE['date']).days or not _TMDB_CONSTANTS_CACHE['data']: try: tv_genres = {g['id']: g['name'] for g in tmdbsimple.Genres().tv_list()['genres']} response = tmdbsimple.Configuration().info() sorted_poster_sizes = sorted((try_int(_p.replace('w', '')) for _p in response['images']['poster_sizes'] if 'original' != _p), reverse=True) sorted_backdrop_sizes = sorted((try_int(_p.replace('w', '')) for _p in response['images']['backdrop_sizes'] if 'original' != _p), reverse=True) sorted_profile_sizes = sorted((try_int(_p.replace('w', '')) for _p in response['images']['profile_sizes'] if 'original' != _p and not _p.startswith('h')), reverse=True) sorted_still_sizes = sorted((try_int(_p.replace('w', '')) for _p in response['images']['still_sizes'] if 'original' != _p and not _p.startswith('h')), reverse=True) _TMDB_CONSTANTS_CACHE = dict( date=datetime.datetime.now(), data=dict( genres=tv_genres, img_base_url=response['images']['secure_base_url'], img_profile_sizes=response['images']['profile_sizes'], poster_sizes=response['images']['poster_sizes'], backdrop_sizes=response['images']['backdrop_sizes'], logo_sizes=response['images']['logo_sizes'], still_sizes=response['images']['still_sizes'], change_keys=response['change_keys'] ) ) except (BaseException, Exception): poster_sizes = ['w92', 'w154', 'w185', 'w342', 'w500', 'w780', 'original'] sorted_poster_sizes = sorted((try_int(_p.replace('w', '')) for _p in poster_sizes if 'original' != _p), reverse=True) backdrop_sizes = ['w300', 'w780', 'w1280', 'original'] sorted_backdrop_sizes = sorted((try_int(_p.replace('w', '')) for _p in backdrop_sizes if 'original' != _p), reverse=True) profile_sizes = ['w45', 'w185', 'h632', 'original'] sorted_profile_sizes = sorted((try_int(_p.replace('w', '')) for _p in profile_sizes if 'original' != _p and not _p.startswith('h')), reverse=True) still_sizes = ['w92', 'w185', 'w300', 'original'] sorted_still_sizes = sorted((try_int(_p.replace('w', '')) for _p in still_sizes if 'original' != _p and not _p.startswith('h')), reverse=True) _TMDB_CONSTANTS_CACHE['data'] = dict( genres={16: 'Animation', 18: 'Drama', 35: 'Comedy', 37: 'Western', 80: 'Crime', 99: 'Documentary', 9648: 'Mystery', 10751: 'Family', 10759: 'Action & Adventure', 10762: 'Kids', 10763: 'News', 10764: 'Reality', 10765: 'Sci-Fi & Fantasy', 10766: 'Soap', 10767: 'Talk', 10768: 'War & Politics'}, img_base_url=r'https://image.tmdb.org/t/p/', img_profile_sizes=['w45', 'w185', 'h632', 'original'], poster_sizes=poster_sizes, backdrop_sizes=backdrop_sizes, logo_sizes=['w45', 'w92', 'w154', 'w185', 'w300', 'w500', 'original'], still_sizes=['w92', 'w185', 'w300', 'original'], change_keys=['adult', 'air_date', 'also_known_as', 'alternative_titles', 'biography', 'birthday', 'budget', 'cast', 'certifications', 'character_names', 'created_by', 'crew', 'deathday', 'episode', 'episode_number', 'episode_run_time', 'freebase_id', 'freebase_mid', 'general', 'genres', 'guest_stars', 'homepage', 'images', 'imdb_id', 'languages', 'name', 'network', 'origin_country', 'original_name', 'original_title', 'overview', 'parts', 'place_of_birth', 'plot_keywords', 'production_code', 'production_companies', 'production_countries', 'releases', 'revenue', 'runtime', 'season', 'season_number', 'season_regular', 'spoken_languages', 'status', 'tagline', 'title', 'translations', 'tvdb_id', 'tvrage_id', 'type', 'video', 'videos'] ) if isinstance(_TMDB_CONSTANTS_CACHE, dict) and 'data' in _TMDB_CONSTANTS_CACHE: _TMDB_CONSTANTS_CACHE['data'].update(dict( size_map={ TVInfoImageType.poster: { TVInfoImageSize.original: 'original', TVInfoImageSize.medium: 'w%s' % next((s for s in sorted_poster_sizes if s < 400), 342), TVInfoImageSize.small: 'w%s' % next((s for s in sorted_poster_sizes if s < 200), 185) }, TVInfoImageType.fanart: { TVInfoImageSize.original: 'original', TVInfoImageSize.medium: 'w%s' % next((s for s in sorted_backdrop_sizes if s < 1000), 780), TVInfoImageSize.small: 'w%s' % next((s for s in sorted_backdrop_sizes if s < 500), 300) }, TVInfoImageType.person_poster: { TVInfoImageSize.original: 'original', TVInfoImageSize.medium: 'w%s' % next((s for s in sorted_profile_sizes if s < 400), 185), TVInfoImageSize.small: 'w%s' % next((s for s in sorted_profile_sizes if s < 150), 45) }, TVInfoImageType.still: { TVInfoImageSize.original: 'original', TVInfoImageSize.medium: 'w%s' % next((s for s in sorted_still_sizes if s < 400), 185), TVInfoImageSize.small: 'w%s' % next((s for s in sorted_still_sizes if s < 150), 45) } } )) return _TMDB_CONSTANTS_CACHE['data'] class TmdbIndexer(TVInfoBase): API_KEY = tmdbsimple.API_KEY supported_person_id_searches = [TVINFO_FACEBOOK, TVINFO_IMDB, TVINFO_INSTAGRAM, TVINFO_TMDB, TVINFO_X] supported_id_searches = [TVINFO_IMDB, TVINFO_TMDB, TVINFO_TVDB] map_languages = {} reverse_map_languages = {v: k for k, v in iteritems(map_languages)} _tmdb_lang_list = None # noinspection PyUnusedLocal # noinspection PyDefaultArgument def __init__(self, *args, **kwargs): super(TmdbIndexer, self).__init__(*args, **kwargs) response = get_tmdb_constants() self.img_base_url = response.get('img_base_url') self.size_map = response.get('size_map') self.tv_genres = response.get('genres') def _search_show(self, name=None, ids=None, lang=None, **kwargs): # type: (Union[AnyStr, List[AnyStr]], Dict[integer_types, integer_types], Optional[string_types], Optional[Any]) -> List[Dict] """This searches TMDB for the series name, """ tmdb_lang = ('en-US', lang)[lang in self._tmdb_supported_lang_list] def _make_result_dict(s): ti_show = TVInfoShow() ti_show.seriesname, ti_show.id, ti_show.seriesid, ti_show.firstaired, ti_show.genre_list, \ ti_show.overview, ti_show.poster, ti_show.ids, ti_show.language, ti_show.popularity, ti_show.rating = \ clean_data(s['name']), s['id'], s['id'], clean_data(s.get('first_air_date')) or None, \ clean_data([self.tv_genres.get(g) for g in s.get('genre_ids') or []]), \ self._enforce_text(s.get('overview')), s.get('poster_path') and '%s%s%s' % ( self.img_base_url, self.size_map[TVInfoImageType.poster][TVInfoImageSize.original], s.get('poster_path')), \ TVInfoIDs(tvdb=s.get('external_ids') and s['external_ids'].get('tvdb_id'), tmdb=s['id'], rage=s.get('external_ids') and s['external_ids'].get('tvrage_id'), imdb=s.get('external_ids') and s['external_ids'].get('imdb_id') and try_int(s['external_ids'].get('imdb_id', '').replace('tt', ''), None)), \ clean_data(s.get('original_language')), s.get('popularity'), s.get('vote_average') ti_show.genre = '|'.join(ti_show.genre_list or []) return ti_show results = [] if ids: for t, p in iteritems(ids): if t in self.supported_id_searches: if t == TVINFO_TMDB: cache_id_key = 's-id-%s-%s' % (TVINFO_TMDB, p) is_none, shows = self._get_cache_entry(cache_id_key) if not self.config.get('cache_search') or (None is shows and not is_none): try: show = tmdbsimple.TV(id=p).info(append_to_response='external_ids', language=tmdb_lang) except (BaseException, Exception): continue self._set_cache_entry(cache_id_key, show, expire=self.search_cache_expire) else: show = shows if show: results.extend([_make_result_dict(show)]) elif t in (TVINFO_IMDB, TVINFO_TVDB): cache_id_key = 's-id-%s-%s' % (t, p) is_none, shows = self._get_cache_entry(cache_id_key) if not self.config.get('cache_search') or (None is shows and not is_none): try: show = tmdbsimple.Find(id=(p, 'tt%07d' % p)[t == TVINFO_IMDB]).info( external_source=id_map[t], language=tmdb_lang) if show.get('tv_results') and 1 == len(show['tv_results']): show = tmdbsimple.TV(id=show['tv_results'][0]['id']).info( append_to_response='external_ids', language=tmdb_lang) except (BaseException, Exception): continue self._set_cache_entry(cache_id_key, show, expire=self.search_cache_expire) else: show = shows if show: results.extend([_make_result_dict(s) for s in show.get('tv_results') or (show.get('id') and [show]) or []]) if name: for n in ([name], name)[isinstance(name, list)]: cache_name_key = 's-name-%s' % n is_none, shows = self._get_cache_entry(cache_name_key) if not self.config.get('cache_search') or (None is shows and not is_none): try: shows = tmdbsimple.Search().tv(query=n, language=tmdb_lang) self._set_cache_entry(cache_name_key, shows, expire=self.search_cache_expire) results.extend([_make_result_dict(s) for s in shows.get('results') or []]) except (BaseException, Exception) as e: log.debug('Error searching for show: %s' % ex(e)) else: results.extend([_make_result_dict(s) for s in (shows and shows.get('results')) or []]) seen = set() results = [seen.add(r.id) or r for r in results if r.id not in seen] return results def _convert_person_obj(self, tmdb_person_obj): gender = PersonGenders.tmdb_map.get(tmdb_person_obj.get('gender'), PersonGenders.unknown) try: birthdate = tmdb_person_obj.get('birthday') and tz_p.parse(tmdb_person_obj.get('birthday')).date() except (BaseException, Exception): birthdate = None try: deathdate = tmdb_person_obj.get('deathday') and tz_p.parse(tmdb_person_obj.get('deathday')).date() except (BaseException, Exception): deathdate = None person_imdb_id = tmdb_person_obj.get('imdb_id') and try_int(tmdb_person_obj['imdb_id'].replace('nm', ''), None) person_ids = {TVINFO_TMDB: tmdb_person_obj.get('id')} if person_imdb_id: person_ids.update({TVINFO_IMDB: person_imdb_id}) pi = tmdb_person_obj.get('images') image_url, main_image, thumb_url, main_thumb, image_list = None, None, None, None, [] if pi: for i in sorted(pi['profiles'], key=lambda a: a['vote_average'] or 0, reverse=True): if not any((main_image, main_thumb)): if 500 < i['height'] and not image_url: image_url = '%s%s%s' % \ (self.img_base_url, self.size_map[TVInfoImageType.person_poster][TVInfoImageSize.original], i['file_path']) thumb_url = '%s%s%s' % \ (self.img_base_url, self.size_map[TVInfoImageType.person_poster][TVInfoImageSize.medium], i['file_path']) elif not thumb_url: thumb_url = '%s%s%s' % \ (self.img_base_url, self.size_map[TVInfoImageType.person_poster][TVInfoImageSize.original], i['file_path']) if image_url and thumb_url: main_image_url, main_thumb = image_url, thumb_url image_list.append( TVInfoImage( image_type=TVInfoImageType.person_poster, sizes={_s: '%s%s%s' % (self.img_base_url, self.size_map[TVInfoImageType.person_poster][_s], i['file_path']) for _s in (TVInfoImageSize.original, TVInfoImageSize.medium, TVInfoImageSize.small)}, aspect_ratio=i['aspect_ratio'], height=i['height'], width=i['width'], lang=clean_data(i['iso_639_1']), rating=i['vote_average'], votes=i['vote_count'] )) elif tmdb_person_obj.get('profile_path'): main_image = '%s%s%s' % ( self.img_base_url, self.size_map[TVInfoImageType.person_poster][TVInfoImageSize.original], tmdb_person_obj['profile_path']) main_thumb = '%s%s%s' % ( self.img_base_url, self.size_map[TVInfoImageType.person_poster][TVInfoImageSize.medium], tmdb_person_obj['profile_path']) clean_person_name = clean_data(tmdb_person_obj.get('name')) _it_person_obj = TVInfoPerson( p_id=tmdb_person_obj.get('id'), ids=TVInfoIDs(ids=person_ids), name=clean_person_name, akas=clean_data(set(tmdb_person_obj.get('also_known_as') or [])), bio=clean_data(tmdb_person_obj.get('biography')), gender=gender, image=main_image, images=image_list, thumb_url=main_thumb, birthdate=birthdate, birthplace=clean_data(tmdb_person_obj.get('place_of_birth')), deathdate=deathdate, homepage=tmdb_person_obj.get('homepage') ) cast = tmdb_person_obj.get('cast') or tmdb_person_obj.get('tv_credits', {}).get('cast') or \ tmdb_person_obj.get('known_for') characters = [] for character in cast or []: ti_show = TVInfoShow() ti_show.id = character.get('id') ti_show.ids = TVInfoIDs(ids={TVINFO_TMDB: ti_show.id}) ti_show.seriesname = enforce_type(clean_data(character.get('original_name')), str, '') ti_show.overview = self._enforce_text(character.get('overview')) ti_show.firstaired = clean_data(character.get('first_air_date')) ti_show.language = clean_data(character.get('original_language')) ti_show.popularity = character.get('popularity') ti_show.vote_count = character.get('vote_count') ti_show.vote_average = character.get('vote_average') ti_show.rating = ti_show.vote_average ti_show.genre_list = [] for g in character.get('genre_ids') or []: if g in self.tv_genres: ti_show.genre_list.append(self.tv_genres.get(g)) ti_show.genre = '|'.join(ti_show.genre_list) if character.get('poster_path'): ti_show.poster = '%s%s%s' % \ (self.img_base_url, self.size_map[TVInfoImageType.person_poster][TVInfoImageSize.original], character['poster_path']) ti_show.poster_thumb = '%s%s%s' % \ (self.img_base_url, self.size_map[TVInfoImageType.person_poster][TVInfoImageSize.medium], character['poster_path']) if character.get('backdrop_path'): ti_show.fanart = '%s%s%s' % \ (self.img_base_url, self.size_map[TVInfoImageType.person_poster][TVInfoImageSize.original], character['backdrop_path']) clean_char_name = clean_data(character.get('character')) clean_lower_person_name = (clean_person_name or '').lower() or None characters.append( TVInfoCharacter(name=clean_char_name, ti_show=ti_show, person=[_it_person_obj], episode_count=character.get('episode_count'), plays_self=enforce_type(clean_char_name and (clean_char_name or '').lower() in ('self', clean_lower_person_name), bool, False)), ) _it_person_obj.characters = characters return _it_person_obj def _search_person(self, name=None, ids=None): # type: (AnyStr, Dict[integer_types, integer_types]) -> List[TVInfoPerson] """ search for person by name :param name: name to search for :param ids: dict of ids to search :return: list of found person's """ results, ids = [], ids or {} search_text_obj = tmdbsimple.Search() for tv_src in self.supported_person_id_searches: if tv_src in ids: if TVINFO_TMDB == tv_src: try: people_obj = self.get_person(ids[tv_src]) except ConnectionSkipException as e: raise e except (BaseException, Exception): people_obj = None if people_obj and not any(1 for r in results if r.id == people_obj.id): results.append(people_obj) elif tv_src in (TVINFO_IMDB, TVINFO_TMDB): try: cache_key_name = 'p-src-%s-%s' % (tv_src, ids.get(tv_src)) is_none, result_objs = self._get_cache_entry(cache_key_name) if None is result_objs and not is_none: result_objs = tmdbsimple.Find(id=(ids.get(tv_src), 'nm%07d' % ids.get(tv_src))[TVINFO_IMDB == tv_src]).info( external_source=id_map[tv_src]).get('person_results') self._set_cache_entry(cache_key_name, result_objs) except ConnectionSkipException as e: raise e except (BaseException, Exception): result_objs = None if result_objs: for person_obj in result_objs: if not any(1 for r in results if r.id == person_obj['id']): results.append(self._convert_person_obj(person_obj)) else: continue if name: cache_key_name = 'p-src-text-%s' % name is_none, people_objs = self._get_cache_entry(cache_key_name) if None is people_objs and not is_none: try: people_objs = search_text_obj.person(query=name, include_adult=True) self._set_cache_entry(cache_key_name, people_objs) except ConnectionSkipException as e: raise e except (BaseException, Exception): people_objs = None if people_objs and people_objs.get('results'): for person_obj in people_objs['results']: if not any(1 for r in results if r.id == person_obj['id']): results.append(self._convert_person_obj(person_obj)) return results def get_person(self, p_id, get_show_credits=False, get_images=False, **kwargs): # type: (integer_types, bool, bool, Any) -> Optional[TVInfoPerson] kw = {} to_append = [] if get_show_credits: to_append.append('tv_credits') if get_images: to_append.append('images') if to_append: kw['append_to_response'] = ','.join(to_append) cache_key_name = 'p-%s-%s' % (p_id, '-'.join(to_append)) is_none, people_obj = self._get_cache_entry(cache_key_name) if None is people_obj and not is_none: try: people_obj = tmdbsimple.People(id=p_id).info(**kw) except ConnectionSkipException as e: raise e except (BaseException, Exception): people_obj = None self._set_cache_entry(cache_key_name, people_obj) if people_obj: return self._convert_person_obj(people_obj) def _convert_show(self, show_dict, show_obj=None): # type: (Dict, TVInfoShow) -> TVInfoShow if None is show_obj: ti_show = TVInfoShow() else: ti_show = show_obj if show_dict: ti_show.seriesname = clean_data(show_dict.get('name') or show_dict.get('original_name') or show_dict.get('original_title')) org_title = clean_data(show_dict.get('original_name') or show_dict.get('original_title')) if org_title != ti_show.seriesname: ti_show.aliases = [org_title] ti_show.id = show_dict.get('id') ti_show.seriesid = ti_show.id ti_show.language = clean_data(show_dict.get('original_language')) ti_show.spoken_languages = [_l['iso_639_1'] for _l in show_dict.get('spoken_languages') or []] ti_show.overview = self._enforce_text(show_dict.get('overview')) ti_show.status = clean_data(show_dict.get('status', '')) ti_show.show_type = clean_data((show_dict.get('type') and [show_dict['type']]) or []) ti_show.firstaired = clean_data(show_dict.get('first_air_date')) ti_show.popularity = show_dict.get('popularity') ti_show.vote_count = show_dict.get('vote_count') ti_show.vote_average = show_dict.get('vote_average') ti_show.rating = ti_show.vote_average ti_show.origin_countries = show_dict.get('origin_country') or [] ti_show.genre_list = [] ti_show.origin_countries = clean_data(show_dict.get('origin_country') or []) for g in show_dict.get('genre_ids') or []: if g in self.tv_genres: ti_show.genre_list.append(self.tv_genres.get(g)) ti_show.genre = '|'.join(ti_show.genre_list) runtime = None for r in sorted(show_dict.get('episode_run_time') or [], reverse=True): if 40 < r < 50: runtime = r break if 20 < r < 40: runtime = r break if not runtime and show_dict.get('episode_run_time'): runtime = max(show_dict.get('episode_run_time') or [0]) or None ti_show.runtime = runtime ti_show.networks = [ TVInfoNetwork(name=clean_data(n.get('name')), n_id=n.get('id'), country_code=clean_data(n.get('origin_country'))) for n in reversed(show_dict.get('networks') or []) ] if show_dict.get('networks'): ti_show.network = clean_data(show_dict['networks'][-1]['name']) ti_show.network_id = show_dict['networks'][-1].get('id') ti_show.network_country_code = clean_data(show_dict['networks'][-1].get('origin_country')) image_url = show_dict.get('poster_path') and '%s%s%s' % \ (self.img_base_url, self.size_map[TVInfoImageType.poster][TVInfoImageSize.original], show_dict.get('poster_path')) thumb_image_url = show_dict.get('poster_path') and '%s%s%s' % \ (self.img_base_url, self.size_map[TVInfoImageType.poster][TVInfoImageSize.small], show_dict.get('poster_path')) backdrop_url = show_dict.get('backdrop_path') and '%s%s%s' % \ (self.img_base_url, self.size_map[TVInfoImageType.fanart][TVInfoImageSize.original], show_dict.get('backdrop_path')) ti_show.ids = TVInfoIDs(tvdb=show_dict.get('external_ids', {}).get('tvdb_id'), tmdb=show_dict['id'], rage=show_dict.get('external_ids', {}).get('tvrage_id'), imdb=show_dict.get('external_ids', {}).get('imdb_id') and try_int( show_dict.get('external_ids', {}).get('imdb_id', '').replace('tt', ''), None)) ti_show.social_ids = TVInfoSocialIDs(twitter=show_dict.get('external_ids', {}).get('twitter_id'), instagram=show_dict.get('external_ids', {}).get('instagram_id'), facebook=show_dict.get('external_ids', {}).get('facebook_id')) ti_show.poster = image_url ti_show.poster_thumb = thumb_image_url ti_show.fanart = backdrop_url return ti_show def _get_show_list(self, src_method, result_count, **kwargs): result = [] try: c_page = 1 while len(result) < result_count: results = src_method(page=c_page, **kwargs) t_pages = results.get('total_pages') if c_page != results.get('page') or c_page >= t_pages: break c_page += 1 if results and 'results' in results: result += [self._convert_show(t) for t in results['results']] else: break except (BaseException, Exception): pass return result[:result_count] def get_similar(self, tvid, result_count=100, **kwargs): # type: (integer_types, int, Any) -> List[TVInfoShow] """ list of similar shows to the provided tv id :param tvid: id to find similar shows for :param result_count: result count to returned """ return self._get_show_list(tmdbsimple.TV(id=tvid).similar, result_count) def get_recommended_for_show(self, tvid, result_count=100, **kwargs): # type: (integer_types, int, Any) -> List[TVInfoShow] """ list of recommended shows to the provided tv id :param tvid: id to find recommended shows for :param result_count: result count to returned """ return self._get_show_list(tmdbsimple.TV(id=tvid).recommendations, result_count) def get_trending(self, result_count=100, time_window='day', **kwargs): # type: (int, str, Any) -> List[TVInfoShow] """ list of trending tv shows for day or week :param result_count: :param time_window: valid values: 'day', 'week' """ t_windows = ('day', 'week')['week' == time_window] return self._get_show_list(tmdbsimple.Trending(media_type='tv', time_window=t_windows).info, result_count) def get_popular(self, result_count=100, **kwargs): # type: (int, Any) -> List[TVInfoShow] return self._get_show_list(tmdbsimple.TV().popular, result_count) def get_top_rated(self, result_count=100, **kwargs): # type: (int, Any) -> List[TVInfoShow] return self._get_show_list(tmdbsimple.TV().top_rated, result_count) def discover(self, result_count=100, **kwargs): # type: (int, Any) -> List[TVInfoShow] """ Discover TV shows by different types of data like average rating, number of votes, genres, the network they aired on and air dates. Discover also supports a nice list of sort options. See below for all the available options. Also note that a number of filters support being comma (,) or pipe (|) separated. Commas are treated like an AND query while pipe's are an OR. Some examples of what can be done with discover can be found at https://www.themoviedb.org/documentation/api/discover. kwargs: language: (optional) ISO 639-1 code. sort_by: (optional) Available options are 'vote_average.desc', 'vote_average.asc', 'first_air_date.desc', 'first_air_date.asc', 'popularity.desc', 'popularity.asc' sort_by: (optional) Allowed values: vote_average.desc, vote_average.asc, first_air_date.desc, first_air_date.asc, popularity.desc, popularity.asc Default: popularity.desc air_date.gte: (optional) Filter and only include TV shows that have an air date (by looking at all episodes) that is greater or equal to the specified value. air_date.lte: (optional) Filter and only include TV shows that have an air date (by looking at all episodes) that is less than or equal to the specified value. first_air_date.gte: (optional) Filter and only include TV shows that have an original air date that is greater or equal to the specified value. Can be used in conjunction with the "include_null_first_air_dates" filter if you want to include items with no air date. first_air_date.lte: (optional) Filter and only include TV shows that have an original air date that is less than or equal to the specified value. Can be used in conjunction with the "include_null_first_air_dates" filter if you want to include items with no air date. first_air_date_year: (optional) Filter and only include TV shows that have an original air date year that equal to the specified value. Can be used in conjunction with the "include_null_first_air_dates" filter if you want to include items with no air date. timezone: (optional) Used in conjunction with the air_date.gte/lte filter to calculate the proper UTC offset. Default America/New_York. vote_average.gte: (optional) Filter and only include movies that have a rating that is greater or equal to the specified value. Minimum 0. vote_count.gte: (optional) Filter and only include movies that have a rating that is less than or equal to the specified value. Minimum 0. with_genres: (optional) Comma separated value of genre ids that you want to include in the results. with_networks: (optional) Comma separated value of network ids that you want to include in the results. without_genres: (optional) Comma separated value of genre ids that you want to exclude from the results. with_runtime.gte: (optional) Filter and only include TV shows with an episode runtime that is greater than or equal to a value. with_runtime.lte: (optional) Filter and only include TV shows with an episode runtime that is less than or equal to a value. include_null_first_air_dates: (optional) Use this filter to include TV shows that don't have an air date while using any of the "first_air_date" filters. with_original_language: (optional) Specify an ISO 639-1 string to filter results by their original language value. without_keywords: (optional) Exclude items with certain keywords. You can comma and pipe seperate these values to create an 'AND' or 'OR' logic. screened_theatrically: (optional) Filter results to include items that have been screened theatrically. with_companies: (optional) A comma separated list of production company ID's. Only include movies that have one of the ID's added as a production company. with_keywords: (optional) A comma separated list of keyword ID's. Only includes TV shows that have one of the ID's added as a keyword. :param result_count: """ if not kwargs: # use default if now kwargs are set = return all future airdate shows with language set to 'en' kwargs.update({'sort_by': 'first_air_date.asc', 'first_air_date.gte': datetime.date.today().strftime('%Y-%m-%d'), 'with_original_language': 'en', }) return self._get_show_list(tmdbsimple.Discover().tv, result_count, **kwargs) def _get_show_data(self, sid, language, get_ep_info=False, banners=False, posters=False, seasons=False, seasonwides=False, fanart=False, actors=False, **kwargs): # type: (integer_types, AnyStr, bool, bool, bool, bool, bool, bool, bool, Optional[Any]) -> bool # note: this is only working for images fetching currently self.show_not_found = False to_append = ['external_ids', 'alternative_titles', 'content_ratings', 'translations'] tmdb_lang = ('en-US', language)[language in self._tmdb_supported_lang_list] if any((banners, posters, seasons, seasonwides, fanart)): to_append.append('images') if (actors or self.config['actors_enabled']) and not getattr(self.ti_shows.get(sid), 'actors_loaded', False): to_append.append('aggregate_credits') if get_ep_info and not getattr(self.ti_shows.get(sid), 'ep_loaded', False): to_append.append('episode_groups') try: tmdb = tmdbsimple.TV(sid) show_data = tmdb.info(append_to_response=','.join(to_append), language=tmdb_lang) if tmdb_lang not in (_l['iso_639_1'] for _l in show_data['translations'].get('translations', []) or []): tmdb_lang = 'en' show_data = tmdb.info(append_to_response=','.join(to_append), language=tmdb_lang) except (BaseException, Exception): self.show_not_found = True return False if not show_data: self.show_not_found = True return False show_obj = self.ti_shows[sid] self._convert_show(show_data, show_obj) if 'images' in show_data: show_obj.poster_loaded = True show_obj.banner_loaded = True show_obj.fanart_loaded = True for img_type, img_list in iteritems(show_data['images']): map_img_type = {'backdrops': TVInfoImageType.fanart, 'posters': TVInfoImageType.poster}.get(img_type) if None is not map_img_type: for img in img_list: if None is not img.get('iso_639_1') and img.get('iso_639_1') != tmdb_lang: continue show_obj.images.setdefault(map_img_type, []).append( TVInfoImage( image_type=map_img_type, sizes={ t_s: '%s%s%s' % ( self.img_base_url, self.size_map[map_img_type][t_s], img['file_path']) for t_s in [TVInfoImageSize.original, TVInfoImageSize.medium, TVInfoImageSize.small] }, rating=img['vote_average'], votes=img['vote_count'], lang=img['iso_639_1'], height=img['height'], width=img['width'], aspect_ratio=img['aspect_ratio'] ) ) season_cast_objs = {} if (actors or self.config['actors_enabled']) and not getattr(self.ti_shows.get(sid), 'actors_loaded', False): cast, show_obj.actors_loaded = CastList(), True if isinstance(show_data.get('aggregate_credits'), dict) and 'cast' in show_data['aggregate_credits'] and\ isinstance(show_data['aggregate_credits']['cast'], list): season_credits = [('season/%d/credits' % s['season_number'], s['season_number']) for s in show_data.get('seasons') or []] main_cast_ids, season_cast_ids, main_cast_credit_ids = {}, {}, set() for cur_seasons in iterate_chunk(season_credits, 20): try: season_data = tmdb.info(append_to_response=','.join(_c[0] for _c in cur_seasons), language=tmdb_lang) except (BaseException, Exception): season_data = None if season_data: main_cast_ids.update({season_cast_obj['id']: season_obj[1] for season_obj in cur_seasons for season_cast_obj in season_data[season_obj[0]].get('cast') or []}) main_cast_credit_ids.update({season_cast_obj['credit_id'] for season_obj in cur_seasons for season_cast_obj in season_data[season_obj[0]].get('cast') or []}) for season_obj in cur_seasons: season_cast_ids.setdefault(season_obj[1], []).extend([ season_cast_obj['id'] for season_cast_obj in season_data[season_obj[0]].get('cast') or []]) for person_obj in sorted(list(filter(lambda a: a['id'] in main_cast_ids, show_data['aggregate_credits']['cast'] or []))[:50], key=lambda c: (main_cast_ids.get(c['id'], 0) or 0, c['total_episode_count'], c['order'] * -1), reverse=True): for character in sorted(list(filter(lambda b: b['credit_id'] in main_cast_credit_ids, person_obj.get('roles', []) or [])), key=lambda c: c['episode_count'], reverse=True): clean_char_name = clean_data(character['character']) clean_person_name = clean_data(person_obj['name']) clean_lower_person_name = (clean_person_name or '').lower() or None character_obj = TVInfoCharacter( name=clean_char_name, plays_self=enforce_type(clean_char_name and (clean_char_name or '').lower() in ('self', clean_lower_person_name), bool, False), person=[ TVInfoPerson( p_id=person_obj['id'], name=clean_person_name, ids=TVInfoIDs(ids={TVINFO_TMDB: person_obj['id']}), image='%s%s%s' % ( self.img_base_url, self.size_map[TVInfoImageType.person_poster][ TVInfoImageSize.original], person_obj['profile_path']), thumb_url='%s%s%s' % ( self.img_base_url, self.size_map[TVInfoImageType.person_poster][ TVInfoImageSize.medium], person_obj['profile_path']), gender=PersonGenders.tmdb_map.get(person_obj.get('gender'), PersonGenders.unknown) )]) cast[RoleTypes.ActorMain].append(character_obj) for _s, _c in iteritems(season_cast_ids): if person_obj['id'] in _c: season_cast_objs.setdefault(_s, []).append(character_obj) show_obj.cast = cast show_obj.actors = [ {'character': {'id': ch.id, 'name': ch.name, 'image': ch.image, }, 'person': {'id': ch.person and ch.person[0].id, 'name': ch.person and ch.person[0].name, 'url': ch.person and 'https://www.themoviedb.org/person/%s' % ch.person[0].id, 'image': ch.person and ch.person[0].image, 'birthday': None, # not sure about format 'deathday': None, # not sure about format 'gender': None, 'country': None, }, } for ch in cast[RoleTypes.ActorMain]] if get_ep_info and not getattr(self.ti_shows.get(sid), 'ep_loaded', False): show_obj.ep_loaded = True seasons = ['season/%d' % s['season_number'] for s in show_data.get('seasons') or []] # call limited to 20 seasons per call for cur_seasons in iterate_chunk(seasons, 20): try: ep_data = tmdb.info(append_to_response=','.join(cur_seasons), language=tmdb_lang) except (BaseException, Exception): ep_data = None if ep_data: for season_obj in cur_seasons: for ep_obj in ep_data[season_obj]['episodes']: for _k, _s in ( ('seasonnumber', 'season_number'), ('episodenumber', 'episode_number'), ('episodename', 'name'), ('firstaired', 'air_date'), ('overview', 'overview'), ('id', 'id'), ('filename', 'still_path')): seas, ep, value = ep_obj['season_number'], ep_obj['episode_number'], \ clean_data(ep_obj.get(_s, getattr(empty_ep, _k))) if seas not in show_obj: show_obj[seas] = TVInfoSeason(show=show_obj) show_obj[seas].number = seas if seas in season_cast_objs: show_obj[seas].cast[RoleTypes.ActorMain] = season_cast_objs[seas] if ep not in show_obj[seas]: show_obj[seas][ep] = TVInfoEpisode(season=show_obj[seas], show=show_obj) if 'still_path' == _s: value = '%s%s%s' % (self.img_base_url, self.size_map[TVInfoImageType.still][TVInfoImageSize.original], value) show_obj[seas][ep].__dict__[_k] = value return True @property def _tmdb_supported_lang_list(self): if not TmdbIndexer._tmdb_lang_list: self._get_languages() return TmdbIndexer._tmdb_lang_list def _get_languages(self): # type: (...) -> None try: tmdb = tmdbsimple.Configuration() lang_data = tmdb.languages() except (BaseException, Exception): lang_data = None if lang_data: TmdbIndexer._supported_languages = [{ 'id': clean_data(a['iso_639_1']), 'name': clean_data(a['english_name']), 'nativeName': clean_data(a['name']), 'shortCode': None, 'sg_lang': clean_data(a['iso_639_1']) } for a in sorted(lang_data, key=lambda b: b['iso_639_1'])] TmdbIndexer._tmdb_lang_list = [a['id'] for a in self._supported_languages] else: TmdbIndexer._supported_languages = [] TmdbIndexer._tmdb_lang_list = [] @staticmethod def _enforce_text(text): """ Set nonsense text to an enforced type :param text: :type text: AnyStr :return: :rtype: AnyStr """ text = enforce_type(clean_data(text), str, '').strip() tmp = text.lower() if 'details here' == tmp \ or re.search(r'no(\s\w+){1,2}\savailable', tmp): return '' return text