# !/usr/bin/env python2 # encoding:utf-8 # author:dbr/Ben # project:tvdb_api # repository:http://github.com/dbr/tvdb_api # license:un license (http://unlicense.org/) from functools import wraps __author__ = 'dbr/Ben' __version__ = '2.0' __api_version__ = '3.0.0' import copy import datetime import getpass import logging import os import random import re import requests import requests.exceptions import tempfile import time import warnings from bs4_parser import BS4Parser from collections import OrderedDict from sg_helpers import clean_data, get_url, try_int from lib.cachecontrol import CacheControl, caches from lib.dateutil.parser import parse from lib.exceptions_helper import ConnectionSkipException from lib.tvinfo_base import CastList, TVInfoCharacter, CrewList, TVInfoPerson, RoleTypes, \ TVINFO_TVDB, TVINFO_TVDB_SLUG, TVInfoBase, TVInfoIDs, TVInfoNetwork, TVInfoShow from .tvdb_exceptions import TvdbError, TvdbShownotfound, TvdbTokenexpired from .tvdb_ui import BaseUI, ConsoleUI from six import integer_types, iteritems, PY2, string_types # noinspection PyUnreachableCode if False: # noinspection PyUnresolvedReferences from typing import Any, AnyStr, Dict, List, Optional, Union ENV = os.environ THETVDB_V2_API_TOKEN = {'token': None, 'datetime': datetime.datetime.fromordinal(1)} log = logging.getLogger('tvdb.api') log.addHandler(logging.NullHandler()) # noinspection HttpUrlsUsage,PyUnusedLocal def _record_hook(r, *args, **kwargs): r.hook_called = True if 301 == r.status_code and isinstance(r.headers.get('Location'), string_types) \ and r.headers.get('Location').startswith('http://api.thetvdb.com/'): r.headers['Location'] = r.headers['Location'].replace('http://', 'https://') return r def retry(exception_to_check, tries=4, delay=3, backoff=2): """Retry calling the decorated function using an exponential backoff. www.saltycrane.com/blog/2009/11/trying-out-retry-decorator-python/ original from: wiki.python.org/moin/PythonDecoratorLibrary#Retry :param exception_to_check: the exception to check. may be a tuple of exceptions to check :type exception_to_check: Exception or tuple :param tries: number of times to try (not retry) before giving up :type tries: int :param delay: initial delay between retries in seconds :type delay: int :param backoff: backoff multiplier e.g. value of 2 will double the delay each retry :type backoff: int """ def deco_retry(f): @wraps(f) def f_retry(*args, **kwargs): mtries, mdelay = tries, delay auth_error = 0 while 1 < mtries: try: return f(*args, **kwargs) except exception_to_check as e: msg = '%s, Retrying in %d seconds...' % (str(e), mdelay) log.warning(msg) time.sleep(mdelay) if isinstance(e, TvdbTokenexpired) and not auth_error: auth_error += 1 else: mtries -= 1 mdelay *= backoff except ConnectionSkipException as e: raise e try: return f(*args, **kwargs) except TvdbTokenexpired: if not auth_error: return f(*args, **kwargs) raise TvdbTokenexpired except ConnectionSkipException as e: raise e return f_retry # true decorator return deco_retry class Actors(list): """Holds all Actor instances for a show """ pass class Actor(dict): """Represents a single actor. Should contain.. id, image, name, role, sortorder """ def __repr__(self): return '' % self.get('name') class Tvdb(TVInfoBase): """Create easy-to-use interface to name of season/episode name >> t = Tvdb() >> t['Scrubs'][1][24]['episodename'] 'My Last Day' """ map_languages = {} reverse_map_languages = {v: k for k, v in iteritems(map_languages)} supported_id_searches = [TVINFO_TVDB, TVINFO_TVDB_SLUG] # noinspection PyUnusedLocal def __init__(self, interactive=False, select_first=False, debug=False, cache=True, banners=False, fanart=False, posters=False, seasons=False, seasonwides=False, actors=False, custom_ui=None, language=None, search_all_languages=False, apikey=None, dvdorder=False, proxy=None, *args, **kwargs): """interactive (True/False): When True, uses built-in console UI is used to select the correct show. When False, the first search result is used. select_first (True/False): Automatically selects the first series search result (rather than showing the user a list of more than one series). Is overridden by interactive = False, or specifying a custom_ui debug (True/False) DEPRECATED: Replaced with proper use of logging module. To show debug messages: >> import logging >> logging.basicConfig(level = logging.DEBUG) cache (True/False/str/unicode/urllib2 opener): Retrieved XML are persisted to to disc. If true, stores in tvdb_api folder under your systems TEMP_DIR, if set to str/unicode instance it will use this as the cache location. If False, disables caching. Can also be passed an arbitrary Python object, which is used as a urllib2 opener, which should be created by urllib2.build_opener banners (True/False): Retrieves the banners for a show. These are accessed via the banners key of a Show(), for example: >> Tvdb(banners=True)['scrubs']['banners'].keys() ['fanart', 'poster', 'series', 'season'] actors (True/False): Retrieves a list of the actors for a show. These are accessed via the actors key of a Show(), for example: >> t = Tvdb(actors=True) >> t['scrubs']['actors'][0]['name'] 'Zach Braff' custom_ui (tvdb_ui.BaseUI subclass): A callable subclass of tvdb_ui.BaseUI (overrides interactive option) language (2 character language abbreviation): The language of the returned data. Is also the language search uses. Default is "en" (English). For full list, run.. >> Tvdb().config['valid_languages'] #doctest: +ELLIPSIS ['da', 'fi', 'nl', ...] search_all_languages (True/False): By default, Tvdb will only search in the language specified using the language option. When this is True, it will search for the show in and language apikey (str/unicode): Override the default thetvdb.com API key. By default it will use tvdb_api's own key (fine for small scripts), but you can use your own key if desired - this is recommended if you are embedding tvdb_api in a larger application) See thetvdb.com/?tab=apiregister to get your own key """ super(Tvdb, self).__init__(*args, **kwargs) self.config = {} if None is not apikey: self.config['apikey'] = apikey else: self.config['apikey'] = '0629B785CE550C8D' # tvdb_api's API key self.config['debug_enabled'] = debug # show debugging messages self.config['custom_ui'] = custom_ui self.config['interactive'] = interactive # prompt for correct series? self.config['select_first'] = select_first self.config['search_all_languages'] = search_all_languages self.config['dvdorder'] = dvdorder self.config['proxy'] = proxy if cache is True: self.config['cache_enabled'] = True self.config['cache_location'] = self._get_temp_dir() elif cache is False: self.config['cache_enabled'] = False elif isinstance(cache, string_types): self.config['cache_enabled'] = True self.config['cache_location'] = cache else: raise ValueError('Invalid value for Cache %r (type was %s)' % (cache, type(cache))) self.config['banners_enabled'] = banners self.config['posters_enabled'] = posters self.config['seasons_enabled'] = seasons self.config['seasonwides_enabled'] = seasonwides self.config['fanart_enabled'] = fanart self.config['actors_enabled'] = actors if self.config['debug_enabled']: warnings.warn('The debug argument to tvdb_api.__init__ will be removed in the next version. ' + 'To enable debug messages, use the following code before importing: ' + 'import logging; logging.basicConfig(level=logging.DEBUG)') logging.basicConfig(level=logging.DEBUG) # List of language from http://thetvdb.com/api/0629B785CE550C8D/languages.xml # Hard-coded here as it is relatively static, and saves another HTTP request, as # recommended on http://thetvdb.com/wiki/index.php/API:languages.xml self.config['valid_languages'] = [ 'cs', 'da', 'de', 'el', 'en', 'es', 'fi', 'fr', 'he', 'hr', 'hu', 'it', 'ja', 'ko', 'nl', 'no', 'pl', 'pt', 'ru', 'sl', 'sv', 'tr', 'zh' ] # not mapped: el, sl, tr. added as guess: fin, pol. unknown: _1 self.config['langabbv_23'] = { 'cs': 'ces', 'da': 'dan', 'de': 'deu', 'en': 'eng', 'es': 'spa', 'fi': 'fin', 'fr': 'fra', 'he': 'heb', 'hr': 'hrv', 'hu': 'hun', 'it': 'ita', 'ja': 'jpn', 'ko': 'kor', 'nb': 'nor', 'nl': 'nld', 'no': 'nor', 'pl': 'pol', 'pt': 'pot', 'ru': 'rus', 'sk': 'slv', 'sv': 'swe', 'zh': 'zho', '_1': 'srp', } self.config['valid_languages_3'] = list(self.config['langabbv_23'].values()) # TheTvdb.com should be based around numeric language codes, # but to link to a series like http://thetvdb.com/?tab=series&id=79349&lid=16 # requires the language ID, thus this mapping is required (mainly # for usage in tvdb_ui - internally tvdb_api will use the language abbreviations) self.config['langabbv_to_id'] = { 'cs': 28, 'da': 10, 'de': 14, 'el': 20, 'en': 7, 'es': 16, 'fi': 11, 'fr': 17, 'he': 24, 'hr': 31, 'hu': 19, 'it': 15, 'ja': 25, 'ko': 32, 'nl': 13, 'no': 9, 'pl': 18, 'pt': 26, 'ru': 22, 'sl': 30, 'sv': 8, 'tr': 21, 'zh': 27 } if not language: self.config['language'] = 'en' else: if language not in self.config['valid_languages']: raise ValueError('Invalid language %s, options are: %s' % (language, self.config['valid_languages'])) else: self.config['language'] = language # The following url_ configs are based of the # http://thetvdb.com/wiki/index.php/Programmers_API self.config['base_url'] = 'https://thetvdb.com/' self.config['api3_url'] = 'https://api.thetvdb.com/' self.config['url_search_series'] = '%(api3_url)ssearch/series' % self.config self.config['params_search_series'] = {'name': ''} self.config['url_series_episodes_info'] = '%(api3_url)sseries/%%s/episodes?page=%%s' % self.config self.config['url_series_info'] = '%(api3_url)sseries/%%s' % self.config self.config['url_episodes_info'] = '%(api3_url)sepisodes/%%s' % self.config self.config['url_actors_info'] = '%(api3_url)sseries/%%s/actors' % self.config self.config['url_series_images'] = '%(api3_url)sseries/%%s/images/query?keyType=%%s' % self.config self.config['url_artworks'] = 'https://artworks.thetvdb.com/banners/%s' self.config['url_artworks_search'] = 'https://artworks.thetvdb.com/%s' self.config['url_people'] = '%(base_url)speople/%%s' % self.config self.config['url_series_people'] = '%(base_url)sseries/%%s/people' % self.config self.config['url_series_all'] = '%(base_url)sseries/%%s/allseasons/official' % self.config self.config['url_series_dvd'] = '%(base_url)sseries/%%s/allseasons/dvd' % self.config self.config['url_series_abs'] = '%(base_url)sseries/%%s/seasons/absolute/1' % self.config def _search_show(self, name=None, ids=None, **kwargs): # type: (AnyStr, Dict[integer_types, integer_types], Optional[Any]) -> List[TVInfoShow] def make_tvinfoshow(data): _ti_show = TVInfoShow() _ti_show.id, _ti_show.banner, _ti_show.firstaired, _ti_show.poster, _ti_show.network, _ti_show.overview, \ _ti_show.seriesname, _ti_show.slug, _ti_show.status, _ti_show.aliases, _ti_show.ids = \ clean_data(data['id']), clean_data(data.get('banner')), clean_data(data.get('firstaired')), \ clean_data(data.get('poster')), clean_data(data.get('network')), clean_data(data.get('overview')), \ clean_data(data.get('seriesname')), clean_data(data.get('slug')), clean_data(data.get('status')), \ clean_data((data.get('aliases'))), TVInfoIDs(tvdb=try_int(clean_data(data['id']))) return _ti_show results = [] if ids: if ids.get(TVINFO_TVDB): cache_id_key = 's-id-%s-%s' % (TVINFO_TVDB, ids[TVINFO_TVDB]) is_none, shows = self._get_cache_entry(cache_id_key) if not self.config.get('cache_search') or (None is shows and not is_none): try: d_m = self._get_show_data(ids.get(TVINFO_TVDB), self.config['language'], direct_data=True) self._set_cache_entry(cache_id_key, d_m, expire=self.search_cache_expire) except (BaseException, Exception): d_m = None else: d_m = shows if d_m: results.append(make_tvinfoshow(d_m['data'])) if ids.get(TVINFO_TVDB_SLUG): cache_id_key = 's-id-%s-%s' % (TVINFO_TVDB, ids[TVINFO_TVDB_SLUG]) is_none, shows = self._get_cache_entry(cache_id_key) if not self.config.get('cache_search') or (None is shows and not is_none): try: d_m = self.get_series(ids.get(TVINFO_TVDB_SLUG).replace('-', ' ')) self._set_cache_entry(cache_id_key, d_m, expire=self.search_cache_expire) except (BaseException, Exception): d_m = None else: d_m = shows if d_m: for r in d_m: if ids.get(TVINFO_TVDB_SLUG) == r['slug']: results.append(make_tvinfoshow(r)) break if name: for n in ([name], name)[isinstance(name, list)]: cache_name_key = 's-name-%s' % n is_none, shows = self._get_cache_entry(cache_name_key) if not self.config.get('cache_search') or (None is shows and not is_none): try: r = self.get_series(n) self._set_cache_entry(cache_name_key, r, expire=self.search_cache_expire) except (BaseException, Exception): r = None else: r = shows if r: if not isinstance(r, list): r = [r] results.extend([make_tvinfoshow(_s) for _s in r]) seen = set() results = [seen.add(r['id']) or r for r in results if r['id'] not in seen] return results def get_new_token(self): global THETVDB_V2_API_TOKEN token = THETVDB_V2_API_TOKEN.get('token', None) dt = THETVDB_V2_API_TOKEN.get('datetime', datetime.datetime.fromordinal(1)) url = '%s%s' % (self.config['api3_url'], 'login') params = {'apikey': self.config['apikey']} resp = get_url(url.strip(), post_json=params, parse_json=True, raise_skip_exception=True) if resp: if 'token' in resp: token = resp['token'] dt = datetime.datetime.now() return {'token': token, 'datetime': dt} def get_token(self): global THETVDB_V2_API_TOKEN if None is THETVDB_V2_API_TOKEN.get( 'token') or datetime.datetime.now() - THETVDB_V2_API_TOKEN.get( 'datetime', datetime.datetime.fromordinal(1)) > datetime.timedelta(hours=23): THETVDB_V2_API_TOKEN = self.get_new_token() if not THETVDB_V2_API_TOKEN.get('token'): raise TvdbError('Could not get Authentification Token') return THETVDB_V2_API_TOKEN.get('token') @staticmethod def _get_temp_dir(): """Returns the [system temp dir]/tvdb_api-u501 (or tvdb_api-myuser) """ if hasattr(os, 'getuid'): uid = 'u%d' % (os.getuid()) else: # For Windows try: uid = getpass.getuser() except ImportError: return os.path.join(tempfile.gettempdir(), 'tvdb_api') return os.path.join(tempfile.gettempdir(), 'tvdb_api-%s' % uid) def _match_url_pattern(self, pattern, url): if pattern in self.config: try: if PY2: return None is not re.search('^%s$' % re.escape(self.config[pattern]).replace('\\%s', '[^/]+'), url) else: return None is not re.search('^%s$' % re.escape(self.config[pattern]).replace(r'%s', '[^/]+'), url) except (BaseException, Exception): pass return False def is_apikey(self, check_url=None): return bool(self.config['apikey']) and (None is check_url or '://api' in check_url) @retry((TvdbError, TvdbTokenexpired)) def _load_url(self, url, params=None, language=None, parse_json=False, **kwargs): log.debug('Retrieving URL %s' % url) parse_json = parse_json or self.is_apikey(url) session = requests.session() if self.config['cache_enabled']: session = CacheControl(session, cache=caches.FileCache(self.config['cache_location'])) if self.config['proxy']: log.debug('Using proxy for URL: %s' % url) session.proxies = {'http': self.config['proxy'], 'https': self.config['proxy']} headers = {'Accept-Encoding': 'gzip,deflate'} if self.is_apikey(url): headers.update({'Authorization': 'Bearer %s' % self.get_token(), 'Accept': 'application/vnd.thetvdb.v%s' % __api_version__}) if None is not language and language in self.config['valid_languages']: headers.update({'Accept-Language': language}) resp = None is_series_info = self._match_url_pattern('url_series_info', url) if is_series_info: self.show_not_found = False self.not_found = False try: resp = get_url(url.strip(), params=params, session=session, headers=headers, parse_json=parse_json, raise_status_code=True, raise_exceptions=True, raise_skip_exception=True, **kwargs) except ConnectionSkipException as e: raise e except requests.exceptions.HTTPError as e: if 401 == e.response.status_code: if self.is_apikey(url): # token expired, get new token, raise error to retry global THETVDB_V2_API_TOKEN THETVDB_V2_API_TOKEN = self.get_new_token() raise TvdbTokenexpired elif 404 == e.response.status_code: if is_series_info: self.show_not_found = True elif self._match_url_pattern('url_series_episodes_info', url): resp = {'data': []} self.not_found = True elif 404 != e.response.status_code: raise TvdbError except (BaseException, Exception): raise TvdbError if is_series_info and isinstance(resp, dict) and isinstance(resp.get('data'), dict) and \ isinstance(resp['data'].get('seriesName'), string_types) and \ re.search(r'^[*]\s*[*]\s*[*]', resp['data'].get('seriesName', ''), flags=re.I): self.show_not_found = True self.not_found = True map_show = {'airstime': 'airs_time', 'airsdayofweek': 'airs_dayofweek', 'imdbid': 'imdb_id', 'writers': 'writer', 'siterating': 'rating'} def map_show_keys(data): keep_data = {} del_keys = [] new_data = {} for k, v in iteritems(data): k_org = k k = k.lower() if None is not v: if k in ['banner', 'fanart', 'poster', 'image'] and v: v = (self.config['url_artworks'], self.config['url_artworks_search'])[ isinstance(v, string_types) and v.lstrip('/').startswith('banners/')] % v.lstrip('/') elif 'genre' == k: keep_data['genre_list'] = v v = '|'.join([clean_data(c) for c in v if isinstance(c, string_types)]) elif 'gueststars' == k: keep_data['gueststars_list'] = v v = '|%s|' % '|'.join([clean_data(c) for c in v if isinstance(c, string_types)]) elif 'writers' == k: keep_data[k] = v v = '|%s|' % '|'.join([clean_data(c) for c in v if isinstance(c, string_types)]) elif 'rating' == k: new_data['contentrating'] = v elif 'firstaired' == k: if v: try: v = parse(v, fuzzy=True).strftime('%Y-%m-%d') except (BaseException, Exception): v = None else: v = None elif 'imdbid' == k: if v: if re.search(r'^(tt)?\d{1,9}$', v, flags=re.I): v = clean_data(v) else: v = '' else: v = clean_data(v) if not v and 'seriesname' == k: if isinstance(data.get('aliases'), list) and 0 < len(data.get('aliases')): v = data['aliases'].pop(0) # this is a invalid show, it has no Name if not v: return None if k in map_show: k = map_show[k] if k_org is not k: del_keys.append(k_org) new_data[k] = v else: data[k] = v for d in del_keys: del (data[d]) if isinstance(data, dict): data.update(new_data) data.update(keep_data) return data if resp and isinstance(resp, dict): if isinstance(resp.get('data'), dict): resp['data'] = map_show_keys(resp['data']) elif isinstance(resp.get('data'), list): data_list = [] for idx, row in enumerate(resp['data']): if isinstance(row, dict): cr = map_show_keys(row) if None is not cr: data_list.append(cr) resp['data'] = data_list return resp return dict([('data', (None, resp)[isinstance(resp, string_types)])]) def _getetsrc(self, url, params=None, language=None, parse_json=False): """Loads a URL using caching """ try: src = self._load_url(url, params=params, language=language, parse_json=parse_json) if isinstance(src, dict): if None is not src['data']: data = src['data'] else: data = {} # data = src['data'] or {} if isinstance(data, list): if 0 < len(data): data = data[0] # data = data[0] or {} if None is data or (isinstance(data, dict) and 1 > len(data.keys())): raise ValueError return src except (KeyError, IndexError, Exception): pass @staticmethod def clean_overview(text): """replace newlines with period and space, remove multiple spaces""" return ' '.join(['%s.' % re.sub(r'[\s][\s]+', r' ', x).strip().rstrip('.') for x in text.split('\r\n')]) def get_show_info(self, sid, language=None): # type: (int, Optional[str]) -> Optional[dict] results = self.search_tvs(sid, language=language) for cur_result in (isinstance(results, dict) and results.get('results') or []): result = list(filter(lambda r: 'series' == r['type'] and sid == r['id'], cur_result.get('nbHits') and cur_result.get('hits') or [])) if 1 == len(result): result[0]['overview'] = self.clean_overview( result[0]['overviews'][self.config['langabbv_23'].get(language) or 'eng']) # remap for from_key, to_key in iteritems({ 'name': 'seriesname', 'first_air_date': 'firstaired' }): result[0][to_key] = result[0][from_key] del result[0][from_key] # delete also prevents false +ve with the following new key notifier # notify of new keys if ENV.get('SG_DEV_MODE'): new_keys = set(list(result[0])).difference({ '_highlightResult', 'aliases', 'banner', 'fanart', 'firstaired', 'follower_count', 'id', 'image', 'is_tvdb_searchable', 'is_tvt_searchable', 'seriesname', 'network', 'objectID', 'overviews', 'poster', 'release_year', 'slug', 'status', 'translations', 'type', 'url', 'uuid' }) if new_keys: log.warning('DEV_MODE: New get_show_info tvdb attrs for %s %r' % (sid, new_keys)) return result[0] # fallback : e.g. https://thetvdb.com/?tab=series&id=349309&lid=7 response = self._load_url(self.config['base_url'], params={ 'tab': 'series', 'id': sid, 'lid': self.config['langabbv_to_id'].get(language, 7)}) series = {} def get_value(tag, contains): try: rc_contains = re.compile(r'(?i)%s' % contains) parent = copy.copy(tag.find(string=rc_contains, recursive=True).find_parent(class_=re.compile('item'))) return ', '.join(re.sub(r'(?i)(\s)([\s]+)', r'\1', i.get_text(strip=True)) for i in parent.find_all('span')) except(BaseException, Exception): pass with BS4Parser(response.get('data', '')) as soup: basic_info = soup.find(id='series_basic_info') series_id = try_int(get_value(basic_info, r'series\sid'), None) if None is not series_id: series['id'] = series_id series['firstaired'] = None # fill from ep listings page series['genrelist'] = get_value(basic_info, 'genres').split(', ') # extra field series['genre'] = '|'.join(series['genrelist']) series['language'] = language series['seriesname'] = soup.find(id='series_title').get_text(strip=True) series['networklist'] = get_value(basic_info, 'network').split(', ') # extra field series['network'] = '|%s|' % '|'.join(series['networklist']) # e.g. '|network|network n|network 10|' series['status'] = get_value(basic_info, 'status') series['type'] = 'series' # extra field airs_at = get_value(basic_info, 'airs') airs = airs_at and airs_at.split(', ') or [] if 0 < len(airs): series['airs_time'] = 'at ' in airs[-1] \ and re.sub(r'(?i)\s+([ap]m)', r'\1', airs[-1]).split()[-1] or '' series['airs_dayofweek'] = ', '.join(airs[0:-1]) else: series['airs_time'] = airs_at series['airs_dayofweek'] = '' # alias list series['aliases'] = [] try: lang_tag = soup.find(id='translations').select('.change_translation_text[data-language="%s"]' % ( self.config['langabbv_23'].get(language) or 'eng'))[0] series['aliases'] = [t.get_text(strip=True) for t in lang_tag .find(string=re.compile('(?i)alias'), recursive=True).find_parent() .find_next_sibling('ul').find_all('li')] except(BaseException, Exception): pass # images series['image'] = series['poster'] = (soup.find(rel=re.compile('artwork_posters')) or {}).get('href') series['banner'] = (soup.find(rel=re.compile('artwork_banners')) or {}).get('href') series['fanart'] = (soup.find(rel=re.compile('artwork_backgrounds')) or {}).get('href') series['imdb_id'] = re.sub(r'.*(tt\d+)', r'\1', (soup.find(href=re.compile(r'imdb\.com')) or {}).get('href', '')) # {lang: overview} series.setdefault('overviews', {}) for cur_tag in soup.find_all(class_='change_translation_text'): try: lang = cur_tag.attrs.get('data-language') if None is not lang: text = cur_tag.p.get_text(strip=True) if text: text = self.clean_overview(text) series['overviews'].setdefault(lang, text) # extra field if lang == self.config['langabbv_23'].get(language): series['overview'] = text except(BaseException, Exception): pass runtime = get_value(basic_info, 'runtime') runtime_often = None if ', ' in runtime: try: # sort runtimes by most number of episodes (e.g. '25 minutes (700 episodes)') runtime_often = sorted([re.findall(r'([^(]+)\((\d+).*', i)[0] for i in runtime.split(', ')], key=lambda x: try_int(x[1]), reverse=True) runtime_often = next(iter(runtime_often))[0].strip() # first item is most frequent runtime except(BaseException, Exception): runtime_often = None series['runtime'] = runtime_often and re.sub('^([0-9]+).*', r'\1', runtime_often) or runtime series['season'] = None try: last_season = sorted([x.get('href') for x in soup.find_all(href=re.compile(r'/seasons/official/(\d+)'))])[-1] series['season'] = re.findall(r'(\d+)$', last_season)[0] except(BaseException, Exception): pass series['slug'] = series['url'] = '' try: rc_slug = re.compile('(?i)/series/(?P[^/]+)/(?:episode|season)') series['slug'] = rc_slug.search(soup.find(href=rc_slug).get('href')).group('slug') series['url'] = '%sseries/%s' % (self.config['base_url'], series['slug']) # extra field except(BaseException, Exception): pass # {lang: show title in lang} # extra field series['translations'] = {t.attrs.get('data-language'): t.attrs.get('data-title') for t in soup.find_all(class_='change_translation_text') if all(t.attrs.get(a) for a in ('data-title', 'data-language'))} return series def search_tvs(self, terms, language=None): # type: (Union[int, str], Optional[str]) -> Optional[dict] try: src = self._load_url( 'https://tvshow''time-%s.algo''lia.net/1/' 'indexes/*/queries' % random.choice([1, 2, 3, 'dsn']), params={'x-algo''lia-agent': 'Alg''olia for vani''lla JavaScript (lite) 3.3''2.0;' 'instant''search.js (3.5''.3);JS Helper (2.2''8.0)', 'x-algo''lia''-app''lication-id': 'tvshow''time', 'x-algo''lia''-ap''i-key': '3d''978dd96c457390f21cec6131ce5d''9c'[::-1]}, post_json={'requests': [ {'indexName': 'TVDB', 'params': '&'.join( ['query=%s' % terms, 'maxValuesPerFacet=10', 'page=0', 'facetFilters=[["type:series", "type:person"]]', 'tagFilters=', 'analytics=false', 'advancedSyntax=true', 'highlightPreTag=__ais-highlight__', 'highlightPostTag=__/ais-highlight__' ]) }]}, language=language, parse_json=True) return src except (KeyError, IndexError, Exception): pass def search(self, series): # type: (AnyStr) -> List """This searches TheTVDB.com for the series name and returns the result list """ if PY2: series = series.encode('utf-8') self.config['params_search_series']['name'] = series log.debug('Searching for show %s' % series) try: series_found = self._getetsrc(self.config['url_search_series'], params=self.config['params_search_series'], language=self.config['language']) if series_found: return list(series_found.values())[0] except (BaseException, Exception): pass return [] def get_series(self, series): """This searches TheTVDB.com for the series name, If a custom_ui UI is configured, it uses this to select the correct series. If not, and interactive == True, ConsoleUI is used, if not BaseUI is used to select the first result. """ all_series = self.search(series) if not isinstance(all_series, list): all_series = [all_series] if 0 == len(all_series): log.debug('Series result returned zero') raise TvdbShownotfound('Show-name search returned zero results (cannot find show on TVDB)') if None is not self.config['custom_ui']: log.debug('Using custom UI %s' % self.config['custom_ui'].__name__) custom_ui = self.config['custom_ui'] ui = custom_ui(config=self.config) else: if not self.config['interactive']: log.debug('Auto-selecting first search result using BaseUI') ui = BaseUI(config=self.config) else: log.debug('Interactively selecting show using ConsoleUI') ui = ConsoleUI(config=self.config) return ui.select_series(all_series) def _parse_banners(self, sid, img_list): banners = {} try: for cur_banner in img_list: bid = cur_banner['id'] btype = (cur_banner['keytype'], 'banner')['series' == cur_banner['keytype']] btype2 = (cur_banner['resolution'], try_int(cur_banner['subkey'], cur_banner['subkey']))[ btype in ('season', 'seasonwide')] if None is btype or None is btype2: continue for k, v in iteritems(cur_banner): if None is k or None is v: continue k, v = k.lower(), v.lower() if isinstance(v, string_types) else v if 'filename' == k: k = 'bannerpath' v = self._make_image(self.config['url_artworks'], v) elif 'thumbnail' == k: k = 'thumbnailpath' v = self._make_image(self.config['url_artworks'], v) elif 'keytype' == k: k = 'bannertype' banners.setdefault(btype, OrderedDict()).setdefault(btype2, OrderedDict()).setdefault(bid, {})[ k] = v except (BaseException, Exception): pass self._set_show_data(sid, '_banners', banners, add=True) @staticmethod def _make_image(base_url, url): # type: (str, str) -> str if not url or url.lower().startswith('http'): return url or '' return base_url % url def _parse_actors(self, sid, actor_list, actor_list_alt): a = [] cast = CastList() try: alts = {} if actor_list_alt: with BS4Parser(actor_list_alt) as soup: rc_role = re.compile(r'/series/(?P[^/]+)/people/(?P\d+)/?$') rc_img = re.compile(r'/(?Pperson/(?P[0-9]+)/(?P[^/]+)\..*)') rc_img_v3 = re.compile(r'/(?Pactors/(?P[^/]+)\..*)') max_people = 5 rc_clean = re.compile(r'[^a-z0-9]') for cur_enum, cur_role in enumerate(soup.find_all('a', href=rc_role) or []): try: image = person_id = None for cur_rc in (rc_img, rc_img_v3): img_tag = cur_role.find('img', src=cur_rc) if img_tag: img_parsed = cur_rc.search(img_tag.get('src')) image, person_id = [x in img_parsed.groupdict() and img_parsed.group(x) for x in ('url', 'person_id')] break lines = [x.strip() for x in cur_role.get_text().split('\n') if x.strip()][0:2] name = role = '' if len(lines): name = lines[0] for line in lines[1:]: if line.lower().startswith('as '): role = line[3:] break if not person_id and max_people: max_people -= 1 results = self.search_tvs(name) try: for cur_result in (isinstance(results, dict) and results.get('results') or []): # sorts 'banners/images/missing/' to last before filter people = list(filter( lambda r: 'person' == r['type'] and rc_clean.sub(name, '') == rc_clean.sub(r['name'], ''), cur_result.get('nbHits') and sorted(cur_result.get('hits'), key=lambda x: len(x['image']), reverse=True) or [])) if ENV.get('SG_DEV_MODE'): for person in people: new_keys = set(list(person)).difference({ '_highlightResult', 'banner', 'id', 'image', 'is_tvdb_searchable', 'is_tvt_searchable', 'name', 'objectID', 'people_birthdate', 'people_died', 'poster', 'type', 'url' }) if new_keys: log.warning('DEV_MODE: New _parse_actors tvdb attrs for %s %r' % (person['id'], new_keys)) person_ok = False for person in people: if image: people_data = self._load_url(person['url'])['data'] person_ok = re.search(re.escape(image), people_data) if not image or person_ok: person_id = person['id'] raise ValueError('value okay, id found') except (BaseException, Exception): pass rid = int(rc_role.search(cur_role.get('href')).group('role_id')) alts.setdefault(rid, {'id': rid, 'person_id': person_id or None, 'name': name, 'role': role, 'image': image, 'sortorder': cur_enum, 'lastupdated': 0}) except(BaseException, Exception): pass if not self.is_apikey(): # for the future when apikey == '' actor_list = sorted([d for _, d in iteritems(alts)], key=lambda x: x.get('sortorder')) unique_c_p, c_p_list, new_actor_list = set(), [], [] for actor in sorted(actor_list, key=lambda x: x.get('lastupdated'), reverse=True): c_p_list.append((actor['name'], actor['role'])) if (actor['name'], actor['role']) not in unique_c_p: unique_c_p.add((actor['name'], actor['role'])) new_actor_list.append(actor) for n in sorted(new_actor_list, key=lambda x: x['sortorder']): role_image = (alts.get(n['id'], {}).get('image'), n.get('image'))[ any([n.get('image')]) and 1 == c_p_list.count((n['name'], n['role']))] if role_image: role_image = self._make_image(self.config['url_artworks'], role_image) character_name = n.get('role', '').strip() or alts.get(n['id'], {}).get('role', '') person_name = n.get('name', '').strip() or alts.get(n['id'], {}).get('name', '') person_id = None person_id = person_id or alts.get(n['id'], {}).get('person_id') character_id = n.get('id', None) or alts.get(n['id'], {}).get('rid') a.append({'character': {'id': character_id, 'name': character_name, 'url': None, # not supported by tvdb 'image': role_image, }, 'person': {'id': person_id, 'name': person_name, 'url': person_id and (self.config['url_people'] % person_id) or None, 'image': None, # not supported by tvdb 'birthday': None, # not supported by tvdb 'deathday': None, # not supported by tvdb 'gender': None, # not supported by tvdb 'country': None, # not supported by tvdb }, }) cast[RoleTypes.ActorMain].append( TVInfoCharacter( p_id=character_id, name=character_name, person=[TVInfoPerson(p_id=person_id, name=person_name)], image=role_image, show=self.ti_shows[sid])) except (BaseException, Exception): pass self._set_show_data(sid, 'actors', a) self._set_show_data(sid, 'cast', cast) self.ti_shows[sid].actors_loaded = True def get_episode_data(self, epid): # Parse episode information data = None log.debug('Getting all episode data for %s' % epid) url = self.config['url_episodes_info'] % epid episode_data = self._getetsrc(url, language=self.config['language']) if episode_data and 'data' in episode_data: data = episode_data['data'] if isinstance(data, dict): for k, v in iteritems(data): k = k.lower() if None is not v: if 'filename' == k and v: v = self._make_image(self.config['url_artworks'], v) else: v = clean_data(v) data[k] = v return data def _parse_images(self, sid, language, show_data, image_type, enabled_type, type_bool): mapped_img_types = {'banner': 'series'} excluded_main_data = enabled_type in ['seasons_enabled', 'seasonwides_enabled'] loaded_name = '%s_loaded' % image_type if (type_bool or self.config[enabled_type]) and not getattr(self.ti_shows.get(sid), loaded_name, False): image_data = self._getetsrc(self.config['url_series_images'] % (sid, mapped_img_types.get(image_type, image_type)), language=language) if image_data and 0 < len(image_data.get('data', '') or ''): image_data['data'] = sorted(image_data['data'], reverse=True, key=lambda x: (x['ratingsinfo']['average'], x['ratingsinfo']['count'])) if not excluded_main_data: url_image = self._make_image(self.config['url_artworks'], image_data['data'][0]['filename']) url_thumb = self._make_image(self.config['url_artworks'], image_data['data'][0]['thumbnail']) self._set_show_data(sid, image_type, url_image) self._set_show_data(sid, f'{image_type}_thumb', url_thumb) excluded_main_data = True # artwork found so prevent fallback self._parse_banners(sid, image_data['data']) self.ti_shows[sid].__dict__[loaded_name] = True # fallback image thumbnail for none excluded_main_data if artwork is not found if not excluded_main_data and show_data.get(image_type): self._set_show_data(sid, f'{image_type}_thumb', re.sub(r'\.jpg$', '_t.jpg', show_data[image_type], flags=re.I)) def _get_show_data(self, sid, # type: integer_types language, # type: AnyStr get_ep_info=False, # type: bool banners=False, # type: bool posters=False, # type: bool seasons=False, # type: bool seasonwides=False, # type: bool fanart=False, # type: bool actors=False, # type: bool direct_data=False, # type: bool **kwargs # type: Optional[Any] ): # type: (...) -> Optional[bool, dict] """Takes a series ID, gets the epInfo URL and parses the TVDB XML file into the shows dict in layout: shows[series_id][season_number][episode_number] """ # Parse show information url = self.config['url_series_info'] % sid if direct_data or sid not in self.ti_shows or None is self.ti_shows[sid].id or \ language != self.ti_shows[sid].language: log.debug('Getting all series data for %s' % sid) show_data = self._getetsrc(url, language=language) if not show_data or not show_data.get('data'): show_data = {'data': self.get_show_info(sid, language=language)} if direct_data: return show_data # check and make sure we have data to process and that it contains a series name if not (show_data and 'seriesname' in show_data.get('data', {}) or {}): return False show_data = show_data['data'] ti_show = self.ti_shows[sid] # type: TVInfoShow ti_show.banner_loaded = ti_show.poster_loaded = ti_show.fanart_loaded = True ti_show.id = show_data['id'] ti_show.seriesname = clean_data(show_data.get('seriesname')) ti_show.slug = clean_data(show_data.get('slug')) ti_show.poster = clean_data(show_data.get('poster')) ti_show.banner = clean_data(show_data.get('banner')) ti_show.fanart = clean_data(show_data.get('fanart')) ti_show.firstaired = clean_data(show_data.get('firstAired')) ti_show.rating = show_data.get('rating') ti_show.contentrating = clean_data(show_data.get('contentRatings')) ti_show.aliases = show_data.get('aliases') or [] ti_show.status = clean_data(show_data['status']) if clean_data(show_data.get('network')): ti_show.network = clean_data(show_data['network']) ti_show.networks = [TVInfoNetwork(clean_data(show_data['network']), n_id=clean_data(show_data.get('networkid')))] ti_show.runtime = try_int(show_data.get('runtime'), 0) ti_show.language = clean_data(show_data.get('language')) ti_show.genre = clean_data(show_data.get('genre')) ti_show.genre_list = clean_data(show_data.get('genre_list')) or [] ti_show.overview = clean_data(show_data.get('overview')) ti_show.imdb_id = clean_data(show_data.get('imdb_id')) or None ti_show.airs_time = clean_data(show_data.get('airs_time')) ti_show.airs_dayofweek = clean_data(show_data.get('airs_dayofweek')) ti_show.ids = TVInfoIDs(tvdb=ti_show.id, imdb=try_int(str(ti_show.imdb_id).replace('tt', ''), None)) else: show_data = {'data': {}} for img_type, en_type, p_type in [('poster', 'posters_enabled', posters), ('banner', 'banners_enabled', banners), ('fanart', 'fanart_enabled', fanart), ('season', 'seasons_enabled', seasons), ('seasonwide', 'seasonwides_enabled', seasonwides)]: self._parse_images(sid, language, show_data, img_type, en_type, p_type) if (actors or self.config['actors_enabled']) and not getattr(self.ti_shows.get(sid), 'actors_loaded', False): actor_data = self._getetsrc(self.config['url_actors_info'] % sid, language=language) actor_data_alt = self._getetsrc(self.config['url_series_people'] % sid, language=language) if actor_data and 0 < len(actor_data.get('data', '') or '') or actor_data_alt and actor_data_alt['data']: self._parse_actors(sid, actor_data and actor_data.get('data', ''), actor_data_alt and actor_data_alt['data']) if get_ep_info and not getattr(self.ti_shows.get(sid), 'ep_loaded', False): # Parse episode data log.debug('Getting all episodes of %s' % sid) page = 0 # type: int episodes = [] # type: list episode_data_found = False # type: bool episode_data_broken = False # type: bool page_count = 0 # type: int pages_loaded = 0 # type: int start_page = 0 # type: int while page <= 400: episode_data = {} if self.is_apikey() and not episode_data_broken: episode_data = self._getetsrc( self.config['url_series_episodes_info'] % (sid, page), language=language) # fallback to correct old pagination if 0 == page and None is episode_data: page = 1 continue if episode_data: if 1 == page and not bool(episodes): start_page = 1 pages_loaded += 1 episode_data_found |= start_page == page and bool(episode_data) if episode_data_broken or \ (not episode_data_found and isinstance(show_data, dict) and 'slug' in show_data): response = {'data': None} items_found = False # fallback to page 'all' if dvd is enabled and response has no items for page_type in ('url_series_dvd', 'url_series_all'): if 'dvd' not in page_type or self.config['dvdorder']: response = self._load_url(self.config[page_type] % show_data.get('slug')) with BS4Parser(response.get('data') or '') as soup: items_found = bool(soup.find_all(class_='list-group-item')) if items_found: break if not items_found: break episode_data = {'data': []} with BS4Parser(response.get('data')) as soup: items = soup.find_all(class_='list-group-item') rc_sxe = re.compile(r'(?i)s(?:pecial\s*)?(\d+)\s*[xe]\s*(\d+)') # Special nxn or SnnEnn rc_episode = re.compile(r'(?i)/series/%s/episodes?/(?P\d+)' % show_data['slug']) rc_date = re.compile(r'\s\d{4}\s*$') season_type, episode_type = ['%s%s' % (('aired', 'dvd')['dvd' in page_type], x) for x in ('season', 'episodenumber')] for cur_item in items: try: heading_tag = cur_item.find(class_='list-group-item-heading') sxe = heading_tag.find(class_='episode-label').get_text(strip=True) ep_season, ep_episode = [try_int(x) for x in rc_sxe.findall(sxe)[0]] link_ep_tag = heading_tag.find(href=rc_episode) or {} link_match = rc_episode.search(link_ep_tag.get('href', '')) ep_id = link_match and try_int(link_match.group('ep_id'), None) ep_name = link_ep_tag.get_text(strip=True) # ep_network = None # extra field ep_aired = None for cur_tag in cur_item.find('ul').find_all('li'): text = cur_tag.get_text(strip=True) if rc_date.search(text): ep_aired = parse(text).strftime('%Y-%m-%d') # elif text in show_data['data']['network']: # unreliable data # ep_network = text ep_overview = None item_tag = cur_item.find(class_='list-group-item-text') if item_tag: ep_overview = self.clean_overview(item_tag.get_text() or '') ep_filename = None link_ep_tag = item_tag.find(href=rc_episode) or None if link_ep_tag: ep_filename = (link_ep_tag.find('img') or {}).get('src', '') episode_data['data'].append({ 'id': ep_id, season_type: ep_season, episode_type: ep_episode, 'episodename': ep_name, 'firstaired': ep_aired, 'overview': ep_overview, 'filename': ep_filename, # 'network': ep_network }) if not show_data['firstaired'] and ep_aired \ and (1, 1) == (ep_season, ep_episode): show_data['firstaired'] = ep_aired episode_data['fallback'] = True except (BaseException, Exception): continue if episode_data_found and not episode_data: if pages_loaded < page_count or 0 == page_count: episode_data_broken = True continue else: break if None is episode_data and not bool(episodes) and not episode_data_found: raise TvdbError('Exception retrieving episodes for show') if isinstance(episode_data, dict) and not episode_data.get('data', []): if start_page != page: self.not_found = False break if not getattr(self, 'not_found', False) and None is not episode_data.get('data'): episodes.extend(episode_data['data']) next_link = episode_data.get('links', {}).get('next', None) # check if page is a valid following page if not isinstance(next_link, integer_types) or next_link <= page: next_link = None if isinstance(episode_data, dict) and 'links' in episode_data \ and isinstance(episode_data['links'], dict) and 'last' in episode_data['links'] \ and isinstance(episode_data['links']['last'], int) \ and episode_data['links']['last'] > page_count: page_count = episode_data['links']['last'] if not next_link and isinstance(episode_data, dict) \ and isinstance(episode_data.get('data', []), list) and \ (100 > len(episode_data.get('data', [])) or episode_data.get('fallback')): break if isinstance(next_link, int) and page + 1 == next_link: page = next_link else: page += 1 ep_map_keys = {'absolutenumber': 'absolute_number', 'airedepisodenumber': 'episodenumber', 'airedseason': 'seasonnumber', 'airedseasonid': 'seasonid', 'dvdepisodenumber': 'dvd_episodenumber', 'dvdseason': 'dvd_season'} for cur_ep in episodes: if self.config['dvdorder']: log.debug('Using DVD ordering.') use_dvd = None is not cur_ep.get('dvdseason') and None is not cur_ep.get('dvdepisodenumber') else: use_dvd = False if use_dvd: elem_seasnum, elem_epno = cur_ep.get('dvdseason'), cur_ep.get('dvdepisodenumber') else: elem_seasnum, elem_epno = cur_ep.get('airedseason'), cur_ep.get('airedepisodenumber') if None is elem_seasnum or None is elem_epno: log.warning('An episode has incomplete season/episode number (season: %r, episode: %r)' % ( elem_seasnum, elem_epno)) continue # Skip to next episode # float() is because https://github.com/dbr/tvnamer/issues/95 - should probably be fixed in TVDB data seas_no = int(float(elem_seasnum)) ep_no = int(float(elem_epno)) if not cur_ep.get('network'): cur_ep['network'] = self.ti_shows[sid].network for k, v in iteritems(cur_ep): k = k.lower() if None is not v: if 'filename' == k and v: if '://' not in v: v = self._make_image(self.config['url_artworks'], v) else: v = clean_data(v) if k in ep_map_keys: k = ep_map_keys[k] self._set_item(sid, seas_no, ep_no, k, v) crew = CrewList() cast = CastList() try: for director in cur_ep.get('directors', []): crew[RoleTypes.CrewDirector].append(TVInfoPerson(name=director)) except (BaseException, Exception): pass try: for guest in cur_ep.get('gueststars_list', []): cast[RoleTypes.ActorGuest].append(TVInfoCharacter(person=[TVInfoPerson(name=guest)], show=self.ti_shows[sid])) except (BaseException, Exception): pass try: for writers in cur_ep.get('writers', []): crew[RoleTypes.CrewWriter].append(TVInfoPerson(name=writers)) except (BaseException, Exception): pass self._set_item(sid, seas_no, ep_no, 'crew', crew) self._set_item(sid, seas_no, ep_no, 'cast', cast) self.ti_shows[sid].ep_loaded = True return True def _name_to_sid(self, name): """Takes show name, returns the correct series ID (if the show has already been grabbed), or grabs all episodes and returns the correct SID. """ if name in self.corrections: log.debug('Correcting %s to %s' % (name, self.corrections[name])) return self.corrections[name] else: log.debug('Getting show %s' % name) selected_series = self.get_series(name) if isinstance(selected_series, dict): selected_series = [selected_series] sids = [int(x['id']) for x in selected_series if self._get_show_data(int(x['id']), self.config['language'])] self.corrections.update(dict([(x['seriesname'], int(x['id'])) for x in selected_series])) return sids def _get_languages(self): if not Tvdb._supported_languages: Tvdb._supported_languages = [{'id': _l, 'name': None, 'nativeName': None, 'sg_lang': _l} for _l in self.config['valid_languages']] def main(): """Simple example of using tvdb_api - it just grabs an episode name interactively. """ import logging logging.basicConfig(level=logging.DEBUG) tvdb_instance = Tvdb(interactive=True, cache=False) print(tvdb_instance['Lost']['seriesname']) print(tvdb_instance['Lost'][1][4]['episodename']) if '__main__' == __name__: main()