mirror of
https://github.com/SickGear/SickGear.git
synced 2025-01-10 12:03:38 +00:00
1538 lines
53 KiB
Python
1538 lines
53 KiB
Python
import re
|
|
from datetime import datetime
|
|
import requests
|
|
from urllib.parse import quote
|
|
from urllib3.util.retry import Retry
|
|
from requests.adapters import HTTPAdapter
|
|
from . import endpoints
|
|
from .exceptions import *
|
|
|
|
# noinspection PyUnreachableCode
|
|
if False:
|
|
from typing import Any, AnyStr, Dict, List, Optional, Union
|
|
|
|
|
|
class Show(object):
    """A TV show built from a TVmaze result dict.

    Plain fields are copied from `data` with `dict.get`, so absent keys become None.
    Episodes, seasons, cast, crew, images and akas are loaded lazily by the module
    level helper functions the first time the matching property is read; episodes
    and cast are taken from `data['_embedded']` instead when present (see `populate`).
    """

    def __init__(self, data):
        self.status = data.get('status')  # type: Optional[AnyStr]
        self.rating = data.get('rating')  # type: Optional[int]
        self.genres = data.get('genres')  # type: Optional[List[AnyStr]]
        self.weight = data.get('weight')  # type: Optional[int]
        self.updated = data.get('updated')
        self.name = data.get('name')  # type: Optional[AnyStr]
        self.language = data.get('language')  # type: Optional[AnyStr]
        self.schedule = data.get('schedule')
        self.url = data.get('url')  # type: Optional[AnyStr]
        self.official_site = data.get('officialSite')  # type: Optional[AnyStr]
        self.image = data.get('image')
        self.externals = data.get('externals')
        self.premiered = data.get('premiered')  # type: Optional[AnyStr]
        self.summary = _remove_tags(data.get('summary', ''))  # type: Optional[AnyStr]
        self.links = data.get('_links')
        if data.get('webChannel'):
            self.web_channel = WebChannel(data.get('webChannel'))  # type: Optional[WebChannel]
        else:
            self.web_channel = None
        self.runtime = data.get('runtime')  # type: Optional[int]
        self.average_runtime = data.get('averageRuntime')
        self.type = data.get('type')  # type: Optional[AnyStr]
        self.id = data.get('id')  # type: int
        self.maze_id = self.id  # type: int
        if data.get('network'):
            self.network = Network(data.get('network'))  # type: Optional[Network]
        else:
            self.network = None
        # lazily filled caches, None means "not loaded yet"
        self.__episodes = None  # type: Optional[List[Episode]]
        self._specials_loaded = False  # type: bool
        self._seasons = None  # type: Optional[List[Season]]
        self._cast = None  # type: Optional[List[Cast]]
        self._crew = None  # type: Optional[List[Crew]]
        self._images = None  # type: Optional[List[Image]]
        self._akas = None  # type: Optional[List[AnyStr]]
        self.__nextepisode = None  # type: Optional[Episode]
        self.__previousepisode = None  # type: Optional[Episode]
        self.populate(data)

    def __repr__(self):
        if self.premiered:
            year = str(self.premiered[:4])
        else:
            year = None
        # a web channel takes precedence over a network in the repr
        if self.web_channel:
            platform = ',show_web_channel='
            network = self.web_channel.name
        elif self.network:
            platform = ',network='
            network = self.network.name
        else:
            platform = ''
            network = ''

        return f'<Show(maze_id={self.maze_id}, name={self.name}, year={year}{platform}{network})>'

    def __str__(self):
        n = ''
        if self.network:
            n = ' (%s)' % self.network.name
        return f'{self.name}{n} [{self.premiered}]'

    def __iter__(self):
        return iter(self.seasons.values())

    # Python 3 bool evaluation
    def __bool__(self):
        return bool(self.id)

    # Python 2 bool evaluation
    def __nonzero__(self):
        return bool(self.id)

    def __len__(self):
        return len(self.seasons)

    def __getitem__(self, item):
        """Return the season stored under key `item`; raise SeasonNotFound when it is missing."""
        try:
            return self.seasons[item]
        except KeyError:
            raise SeasonNotFound(f'Season {item} does not exist for show {self.name}.')

    @property
    def akas(self):
        if None is self._akas:
            self._akas = show_akas(self.maze_id, raise_error=False)
        return self._akas

    @property
    def crew(self):
        if None is self._crew:
            self._crew = get_show_crew(self.maze_id, raise_error=False)
        return self._crew

    @property
    def images(self):
        if None is self._images:
            self._images = get_show_images(self.maze_id, raise_error=False)
        return self._images

    @property
    def cast(self):
        if None is self._cast:
            self._cast = show_cast(self.maze_id, raise_error=False)
        return self._cast

    @property
    def next_episode(self):
        """Episode referenced by `_links['nextepisode']`, or None when there is no such link."""
        # links is None when the show was built from data without '_links', e.g. Show({})
        links = self.links or {}
        if self.__nextepisode is None and 'nextepisode' in links and 'href' in links['nextepisode']:
            # the episode id is the last path segment of the href
            episode_id = links['nextepisode']['href'].rsplit('/', 1)[1]
            if episode_id.isdigit():
                self.__nextepisode = episode_by_id(episode_id, show=self, raise_error=False)
        return self.__nextepisode

    @property
    def previous_episode(self):
        """Episode referenced by `_links['previousepisode']`, or None when there is no such link."""
        # links is None when the show was built from data without '_links', e.g. Show({})
        links = self.links or {}
        if self.__previousepisode is None and 'previousepisode' in links \
                and 'href' in links['previousepisode']:
            episode_id = links['previousepisode']['href'].rsplit('/', 1)[1]
            if episode_id.isdigit():
                self.__previousepisode = episode_by_id(episode_id, show=self, raise_error=False)
        return self.__previousepisode

    def _set_season_numbers(self):
        """Rebuild `_seasons` as {season number: Season} for every season that has a loaded episode."""
        seasons = show_seasons(self.maze_id, show=self, raise_error=False)
        self._seasons = dict()
        for episode in self.__episodes:
            season_num = int(episode.season_number)
            if season_num not in self._seasons:
                self._seasons[season_num] = seasons[season_num]
                self._seasons[season_num].show = self

    @property
    def seasons(self):
        # type: (...) -> Dict
        if None is self._seasons:
            # reading episodes loads them and, as a side effect, fills _seasons
            # noinspection PyStatementEffect
            self.episodes
        return self._seasons

    @property
    def episodes(self):
        if None is self.__episodes or not self._specials_loaded:
            self.__episodes = episode_list(self.maze_id, specials=True, raise_error=False, show=self)
            self._specials_loaded = True
            self._set_season_numbers()
        return self.__episodes

    def populate(self, data):
        """Take episodes and cast from `data['_embedded']` when the response carried them."""
        embedded = data.get('_embedded')
        if embedded:
            episodes = embedded.get('episodeswithspecials') or embedded.get('episodes')
            if episodes:
                self.__episodes = []
                for episode in episodes:
                    self.__episodes.append(Episode(episode, self))
                self._set_season_numbers()
            if 'episodeswithspecials' in embedded:
                self._specials_loaded = True
            if embedded.get('cast'):
                self._cast = Cast(embedded.get('cast'))
|
|
|
|
|
|
class Season(object):
    """One season of a show, built from a TVmaze season dict.

    Episodes are kept as a list. They are taken from `show.episodes` at construction
    when both `show` and `season_number` are given, otherwise fetched on first access
    of the `episodes` property.
    """

    def __init__(self, data, show=None, season_number=None):
        self.show = show
        if None is not season_number and None is not show and show.episodes:
            self.__episodes = [ep for ep in show.episodes if season_number == ep.season_number]
        else:
            self.__episodes = None
        self.id = data.get('id')
        self.url = data.get('url')
        self.season_number = data.get('number')
        self.name = data.get('name')
        self.episode_order = data.get('episodeOrder')
        self.premiere_date = data.get('premiereDate')
        self.end_date = data.get('endDate')
        if data.get('network'):
            self.network = Network(data.get('network'))
        else:
            self.network = None
        if data.get('webChannel'):
            self.web_channel = WebChannel(data.get('webChannel'))
        else:
            self.web_channel = None
        self.image = data.get('image')
        self.summary = data.get('summary')
        self.links = data.get('_links')

    def __repr__(self):
        return f'<Season(id={self.id}, season_number={self.season_number})>'

    def _get_showname(self):
        sn = ''
        if self.show:
            sn = '%s - ' % self.show.name
        return f'{sn}Season {self.season_number:0>2} ({len(self.episodes)}/{self.episode_order})'

    def __str__(self):
        return self._get_showname()

    def __iter__(self):
        return iter(self.episodes)

    def __len__(self):
        return len(self.episodes)

    def __getitem__(self, item):
        """Return the episode at list position `item`; raise EpisodeNotFound when out of range."""
        try:
            return self.episodes[item]
        # episodes is a list, so a bad position raises IndexError rather than KeyError
        except (IndexError, KeyError):
            raise EpisodeNotFound(f'Episode {item} does not exist for season {self.season_number} of show {self.show}.')

    # Python 3 bool evaluation
    def __bool__(self):
        return bool(self.id)

    # Python 2 bool evaluation
    def __nonzero__(self):
        return bool(self.id)

    @property
    def episodes(self):
        # type: (...) -> List[Episode]
        if None is self.__episodes or not self.show._specials_loaded:
            self.__episodes = [ep for ep in episode_list(self.show.maze_id, specials=True, raise_error=False,
                                                         show=self.show) if ep.season_number == self.season_number]
            self.show._specials_loaded = True
        return self.__episodes
|
|
|
|
|
|
class Episode(object):
    """A single episode built from a TVmaze episode dict.

    When no `show` is passed, the owning show is taken from `data['show']`,
    then `data['_embedded']['show']`, and finally reduced to a bare Show carrying
    only the id parsed from `data['_links']['show']['href']`.
    """

    def __init__(self, data, show=None):
        self.title = data.get('name')
        self.airdate = data.get('airdate')
        self.url = data.get('url')
        self.season_number = data.get('season')
        self.episode_number = data.get('number')
        self.image = data.get('image')
        self.airstamp = data.get('airstamp')
        self.airtime = data.get('airtime')
        self.runtime = data.get('runtime')
        self.summary = _remove_tags(data.get('summary'))
        self.maze_id = data.get('id')
        self.type = data.get('type')  # type: AnyStr  # ("regular", "significant_special", "insignificant_special")
        self.special = self.is_special()
        # Reference to show for when using get_schedule()
        self.show = show
        if not show:
            if data.get('show'):
                self.show = Show(data.get('show'))
            # Reference to show for when using get_full_schedule()
            if not self.show and data.get('_embedded'):
                if data['_embedded'].get('show'):
                    self.show = Show(data['_embedded']['show'])
            if not self.show and data.get('_links') and 'show' in data['_links']:
                # only a link is available: keep a placeholder show that knows its id
                self.show = Show({})
                self.show.id = int(re.search(r'/(\d+)$', data['_links']['show']['href']).group(1))

    def __repr__(self):
        if self.special:
            epnum = 'Special'
        else:
            epnum = self.episode_number
        return f'<Episode(season={self.season_number:0>2}, episode_number={epnum:0>2})>'

    def _gen_ep_name(self):
        """Build '<show name> - SxxEyy <title>' (or 'Sxx Special' for specials)."""
        season = f'S{self.season_number:0>2}'
        if self.special:
            episode = ' Special'
        else:
            episode = f'E{self.episode_number:0>2}'
        sn = ''
        if self.show:
            # the f-string already interpolates the name; a trailing `% name` raised TypeError
            sn = f'{self.show.name} - '
        return f'{sn}{season}{episode} {self.title}'

    def __str__(self):
        return self._gen_ep_name()

    def is_special(self):
        """Return True for special episodes, using `type` when set, else the missing episode number."""
        if self.type in ('significant_special', 'insignificant_special'):
            return True
        elif 'regular' == self.type:
            return False
        # fallback for previous api without type field
        if self.episode_number:
            return False
        return True
|
|
|
|
|
|
class Person(object):
    """A person built from a TVmaze person dict.

    `data` may be the person dict itself or a wrapper holding it under 'person'.
    Credits are fetched by the module level helper functions on first access.
    """

    def __init__(self, data):
        # unwrap results that nest the person under a 'person' key
        if data.get('person'):
            data = data['person']
        self.links = data.get('_links')
        self.id = data.get('id')
        self.image = data.get('image')
        self.name = data.get('name')
        self.score = data.get('score')
        self.url = data.get('url')
        self.birthday = data.get('birthday')
        self.death_day = data.get('deathday')
        self.gender = data.get('gender')
        self.country = data.get('country')
        self.character = None
        self._castcredits = None
        self._guestcastcredits = None
        self._crewcredits = None
        # self.populate(data)

    # def populate(self, data):
    #     if data.get('_embedded'):
    #         if data['_embedded'].get('castcredits'):
    #             self._castcredits = [CastCredit(credit)
    #                                 for credit in data['_embedded']['castcredits']]
    #         elif data['_embedded'].get('crewcredits'):
    #             self.crewcredits = [CrewCredit(credit)
    #                                 for credit in data['_embedded']['crewcredits']]

    @property
    def castcredits(self):
        if None is self._castcredits:
            self._castcredits = person_cast_credits(self.id, embed='show,character', raise_error=False)
        return self._castcredits

    @property
    def guestcastcredits(self):
        if None is self._guestcastcredits:
            self._guestcastcredits = person_guestcast_credits(self.id, embed='episode,character', raise_error=False)
        return self._guestcastcredits

    @property
    def crewcredits(self):
        if None is self._crewcredits:
            self._crewcredits = person_crew_credits(self.id, embed='show', raise_error=False)
        return self._crewcredits

    def __repr__(self):
        return f'<Person(name={self.name}, maze_id={self.id})>'

    def _gen_lifetime(self):
        """Return ' (birthday - deathday)' using whichever dates are known, or '' when none are."""
        life = ''
        if self.birthday:
            life = f'{self.birthday}'
        if self.death_day:
            if 0 < len(life):
                life += ' - '
            # append, so the birthday and separator collected above are kept
            life += f'{self.death_day}'
        if 0 < len(life):
            life = f' ({life})'
        return life

    def __str__(self):
        return f'{self.name}{self._gen_lifetime()}'
|
|
|
|
|
|
class Character(object):
    """A character built from a TVmaze character dict.

    `base_data` is the optional enclosing record; its 'self' and 'voice' values
    are stored as `plays_self` and `voice`.
    """

    def __init__(self, data, base_data=None):
        if None is base_data:
            base_data = {}
        self.id = data.get('id')
        self.url = data.get('url')
        self.name = data.get('name')
        self.image = data.get('image')
        self.links = data.get('_links')
        self.plays_self = base_data.get('self')
        self.voice = base_data.get('voice')
        # assigned by whoever pairs this character with a Person
        self.person = None  # type: Optional[Person]

    def __repr__(self):
        return f'<Character(name={self.name}, maze_id={self.id})>'

    def _get_person(self):
        """Return ' (<person name>)' when a person is attached, else ''."""
        return f' ({self.person.name})' if self.person else ''

    def __str__(self):
        return f'{self.name}{self._get_person()}'
|
|
|
|
|
|
class Cast(object):
    """Parallel lists of people and the characters they play, built from a cast list."""

    def __init__(self, data):
        self.people = []
        self.characters = []  # type: List[Character]
        self.populate(data)

    def populate(self, data):
        """Add a Person and Character for each of the first 30 entries; non-list data is ignored."""
        if not isinstance(data, list):
            return
        for member in data[:30]:
            person = Person(member['person'])
            self.people.append(person)
            character = Character(member['character'], member)
            self.characters.append(character)
            # cross reference the pair
            person.character = character
            character.person = person

    def __repr__(self):
        return self.__str__()

    def __str__(self):
        return f'{self.characters} ({self.people})'
|
|
|
|
|
|
class CastCredit(object):
    """A cast credit; character, show and episode come from `data['_embedded']` when present."""

    def __init__(self, data):
        self.links = data.get('_links')
        self.character = None  # type: Optional[Character]
        self.show = None  # type: Optional[Show]
        self.episode = None  # type: Optional[Episode]
        self.populate(data)

    def populate(self, data):
        """Build the embedded character, show and episode objects that the data carries."""
        embedded = data.get('_embedded')
        if not embedded:
            return
        if embedded.get('character'):
            self.character = Character(embedded['character'])
        if embedded.get('show'):
            self.show = Show(embedded['show'])
        if embedded.get('episode'):
            self.episode = Episode(embedded['episode'])

    def __repr__(self):
        return self.__str__()

    def __str__(self):
        return f'{self.character} ({self.show})'
|
|
|
|
|
|
class CrewCredit(object):
    """A crew credit: the job type plus the embedded show, when the data carries one."""

    def __init__(self, data):
        self.links = data.get('_links')
        self.type = data.get('type')
        self.show = None
        self.populate(data)

    def populate(self, data):
        """Build the Show from `data['_embedded']['show']` if it is there."""
        embedded = data.get('_embedded')
        if embedded and embedded.get('show'):
            self.show = Show(embedded['show'])

    def __repr__(self):
        return self.__str__()

    def __str__(self):
        return f'{self.type} ({self.show})'
|
|
|
|
|
|
class Crew(object):
    """A crew member: a Person together with the type of crew job."""

    def __init__(self, data):
        person_data = data.get('person')
        self.person = Person(person_data)
        self.type = data.get('type')

    def __str__(self):
        return f'{self.type}: {self.person}'

    def __repr__(self):
        person = self.person
        return f'<Crew(name={person.name}, maze_id={person.id}, type={self.type})>'
|
|
|
|
|
|
class Image(object):
    """An image record: id, type, main flag and the resolutions mapping from the data."""

    def __init__(self, data):
        self.main = data.get('main')  # type: bool
        self.type = data.get('type')  # type: AnyStr
        self.id = data.get('id')  # type: int
        self.resolutions = data.get('resolutions')  # type: Dict[AnyStr, AnyStr]

    def _get_type_name(self):
        """Return the type when it is a string, otherwise 'unknown'."""
        return self.type if isinstance(self.type, str) else 'unknown'

    def _get_main_str(self):
        """Return 'Main' only when `main` is exactly True, otherwise 'Other'."""
        return 'Main' if True is self.main else 'Other'

    def __str__(self):
        return f'{self._get_type_name()}: {self._get_main_str()}'

    def __repr__(self):
        return f'<Image(main={self.main}, maze_id={self.id}, type={self._get_type_name()})>'
|
|
|
|
|
|
class Updates(object):
    """Mapping of int maze id to Update, built from an {id: time} dict."""

    def __init__(self, data):
        self.updates = dict()
        self.populate(data)

    def populate(self, data):
        """Store one Update per item of `data`, keyed by the id converted to int."""
        for key, stamp in data.items():
            self.updates[int(key)] = Update(key, stamp)

    def __getitem__(self, item):
        """Return the Update for `item`; log and raise UpdateNotFound when it is missing."""
        try:
            found = self.updates[item]
        except KeyError:
            logger.error(f'No update found for Maze id {item}.')
            raise UpdateNotFound(f'No update found for Maze id {item}.')
        return found

    def __iter__(self):
        return iter(self.updates.values())
|
|
|
|
|
|
class Update(object):
    """When an entry was last updated: the raw epoch seconds and a datetime built from them."""

    def __init__(self, maze_id, time):
        self.maze_id = int(maze_id)
        self.seconds_since_epoch = time
        # datetime.fromtimestamp without tz yields a naive local datetime
        self.timestamp = datetime.fromtimestamp(self.seconds_since_epoch)

    def __repr__(self):
        return f'<Update(maze_id={self.maze_id}, time={self.seconds_since_epoch})>'
|
|
|
|
|
|
class AKA(object):
    """An alternative show name together with the country dict it applies to, if any."""

    def __init__(self, data):
        self.name = data.get('name')
        self.country = data.get('country')

    def __repr__(self):
        return f'<AKA(name={self.name}, country={self.country})>'

    def _get_country(self):
        """Return ' (<country name>)' when a country is set, else ''."""
        c = ''
        if self.country:
            c = f' ({self.country.get("name")})'
        return c

    def __str__(self):
        # name followed by the optional country suffix (the name used to be printed twice)
        return f'{self.name}{self._get_country()}'
|
|
|
|
|
|
class NetworkBase(object):
    """Shared fields of networks and web channels: name, id and flattened country details."""

    def __init__(self, data, *args, **kwargs):
        self.name = data.get('name')
        self.maze_id = data.get('id')
        # a missing or empty country leaves all three country fields as None
        country = data.get('country') or {}
        self.country = country.get('name')
        self.timezone = country.get('timezone')
        self.code = country.get('code')

    def _get_country(self):
        """Return ' (<country>)' when a country name is known, else ''."""
        return f' ({self.country})' if self.country else ''

    def __str__(self):
        return f'{self.name}{self._get_country()}'
|
|
|
|
|
|
class Network(NetworkBase):
    """A TV network; all fields come from NetworkBase."""

    def __init__(self, data):
        super().__init__(data)

    def __repr__(self):
        return f'<Network(name={self.name}, country={self.country})>'
|
|
|
|
|
|
class WebChannel(NetworkBase):
    """A web channel; all fields come from NetworkBase."""

    def __init__(self, data):
        super().__init__(data)

    def __repr__(self):
        return f'<WebChannel(name={self.name}, country={self.country})>'
|
|
|
|
|
|
class FollowedShow(object):
    """A followed show entry: the show id plus the embedded Show when the data has `_embedded`."""

    def __init__(self, data):
        self.maze_id = data.get('show_id')
        embedded = data.get('_embedded')
        self.show = Show(embedded.get('show')) if embedded else None

    def __repr__(self):
        return f'<FollowedShow(maze_id={self.maze_id})>'
|
|
|
|
|
|
class FollowedPerson(object):
    """A followed person entry: the person id plus the embedded Person when the data has `_embedded`."""

    def __init__(self, data):
        self.person_id = data.get('person_id')
        embedded = data.get('_embedded')
        self.person = Person(embedded.get('person')) if embedded else None

    def __repr__(self):
        return f'<FollowedPerson(person_id={self.person_id})>'
|
|
|
|
|
|
class FollowedNetwork(object):
    """A followed network entry: the network id plus the embedded Network when the data has `_embedded`."""

    def __init__(self, data):
        self.network_id = data.get('network_id')
        embedded = data.get('_embedded')
        self.network = Network(embedded.get('network')) if embedded else None

    def __repr__(self):
        return f'<FollowedNetwork(network_id={self.network_id})>'
|
|
|
|
|
|
class FollowedWebChannel(object):
    """A followed web channel entry: the channel id plus the embedded WebChannel when the data has `_embedded`."""

    def __init__(self, data):
        self.web_channel_id = data.get('webchannel_id')
        embedded = data.get('_embedded')
        self.web_channel = WebChannel(embedded.get('webchannel')) if embedded else None

    def __repr__(self):
        return f'<FollowedWebChannel(web_channel_id={self.web_channel_id})>'
|
|
|
|
|
|
class MarkedEpisode(object):
    """An episode mark; the numeric type 0/1/2 is stored as 'watched'/'acquired'/'skipped'."""

    def __init__(self, data):
        self.episode_id = data.get('episode_id')
        self.marked_at = data.get('marked_at')
        type_names = {0: 'watched', 1: 'acquired', 2: 'skipped'}
        # any other type value raises KeyError
        self.type = type_names[data.get('type')]

    def __repr__(self):
        return f'<MarkedEpisode(episode_id={self.episode_id}, marked_at={self.marked_at}, type={self.type})>'
|
|
|
|
|
|
class VotedShow(object):
    """A show vote: show id, vote value and time, plus the embedded Show when the data has `_embedded`."""

    def __init__(self, data):
        self.maze_id = data.get('show_id')
        self.voted_at = data.get('voted_at')
        self.vote = data.get('vote')
        # always define the attribute so reading .show never raises AttributeError
        self.show = None
        if data.get('_embedded'):
            self.show = Show(data['_embedded'].get('show'))

    def __repr__(self):
        return f'<VotedShow(maze_id={self.maze_id}, voted_at={self.voted_at}, vote={self.vote})>'
|
|
|
|
|
|
class VotedEpisode(object):
    """An episode vote: episode id, vote value and the time it was cast."""

    def __init__(self, data):
        self.episode_id, self.voted_at, self.vote = (
            data.get('episode_id'), data.get('voted_at'), data.get('vote'))

    def __repr__(self):
        return f'<VotedEpisode(episode_id={self.episode_id}, voted_at={self.voted_at}, vote={self.vote})>'
|
|
|
|
|
|
def _url_quote(show):
|
|
return quote(show.encode('UTF-8'))
|
|
|
|
|
|
def _remove_tags(text):
|
|
if not text:
|
|
return None
|
|
return re.sub(r'<.*?>', '', text)
|
|
|
|
|
|
# noinspection PyUnusedLocal
|
|
def _record_hook(r, *args, **kwargs):
|
|
r.hook_called = True
|
|
if 301 == r.status_code and isinstance(r.headers.get('Location'), str) \
|
|
and r.headers.get('Location').startswith('http://api.tvmaze'):
|
|
r.headers['Location'] = r.headers['Location'].replace('http://', 'https://')
|
|
return r
|
|
|
|
|
|
def _embed_url(base_url, embed, possible_embeds, glue):
|
|
if isinstance(embed, str):
|
|
embed_words = [em.strip() for em in embed.split(',')]
|
|
elif None is embed:
|
|
embed_words = []
|
|
else:
|
|
embed_words = None
|
|
if None is embed_words or any(ew not in possible_embeds for ew in embed_words):
|
|
raise InvalidEmbedValue('Value for embed must be %s' % possible_embeds)
|
|
if embed_words:
|
|
if 1 == len(embed_words):
|
|
return '%s%sembed=%s' % (base_url, glue, embed_words[0])
|
|
else:
|
|
eu = ''
|
|
for word in embed_words:
|
|
eu += '%sembed[]=%s' % (glue, word)
|
|
glue = '&'
|
|
return '%s%s' % (base_url, eu)
|
|
return base_url
|
|
|
|
|
|
class TVmaze(object):
|
|
"""This is the main class of the module enabling interaction with both free and Premium
|
|
TVmaze features.
|
|
|
|
Attributes:
|
|
username (str): Username for https://www.tvmaze.com
|
|
api_key (str): TVmaze api key. Find your key at https://www.tvmaze.com/dashboard
|
|
|
|
"""
|
|
|
|
def __init__(self, username=None, api_key=None):
|
|
self.username = username
|
|
self.api_key = api_key
|
|
|
|
# Query TVmaze free endpoints
|
|
@staticmethod
def endpoint_standard_get(url):
    # type: (str) -> Any
    """GET `url` without credentials and return the decoded JSON body.

    Requests are retried up to 5 times on HTTP 429. Returns None for a 404 or 422
    response. Raises BadRequest for a 400 response and this module's ConnectionError
    when requests reports a connection error.
    """
    retries = Retry(total=5,
                    backoff_factor=0.1,
                    status_forcelist=[429])
    # the context manager closes the session even when the request raises
    with requests.Session() as s:
        s.mount('http://', HTTPAdapter(max_retries=retries))
        s.mount('https://', HTTPAdapter(max_retries=retries))
        try:
            r = s.get(url, hooks={'response': _record_hook})
        except requests.exceptions.ConnectionError as e:
            logger.error(repr(e))
            raise ConnectionError(repr(e))

    if r.status_code in [404, 422]:
        return None

    if r.status_code == 400:
        raise BadRequest(f'Bad Request for url {url}')

    results = r.json()
    return results
|
|
|
|
# Query TVmaze Premium endpoints
|
|
def _endpoint_premium_get(self, url):
    # type: (str) -> Any
    """GET `url` with (username, api_key) auth and return the decoded JSON body.

    Requests are retried up to 5 times on HTTP 429. Returns None for a 404 or 422
    response. Raises BadRequest for a 400 response and this module's ConnectionError
    when requests reports a connection error.
    """
    retries = Retry(total=5,
                    backoff_factor=0.1,
                    status_forcelist=[429])
    # the context manager closes the session even when the request raises
    with requests.Session() as s:
        s.mount('http://', HTTPAdapter(max_retries=retries))
        s.mount('https://', HTTPAdapter(max_retries=retries))
        try:
            r = s.get(url, auth=(self.username, self.api_key), hooks={'response': _record_hook})
        except requests.exceptions.ConnectionError as e:
            logger.error(repr(e))
            raise ConnectionError(repr(e))

    if r.status_code in [404, 422]:
        return None

    if r.status_code == 400:
        raise BadRequest(f'Bad Request for url {url}')

    results = r.json()
    return results
|
|
|
|
def _endpoint_premium_delete(self, url):
    # type: (str) -> Any
    """Send DELETE to `url` with (username, api_key) auth.

    Returns True for a 200 response and None for a 404 (or any other unhandled
    status). Logs and raises BadRequest for a 400 response, and raises this module's
    ConnectionError when requests reports a connection error.
    """
    retries = Retry(total=5,
                    backoff_factor=0.1,
                    status_forcelist=[429])
    # the context manager closes the session even when the request raises
    with requests.Session() as s:
        s.mount('http://', HTTPAdapter(max_retries=retries))
        s.mount('https://', HTTPAdapter(max_retries=retries))
        try:
            r = s.delete(url, auth=(self.username, self.api_key), hooks={'response': _record_hook})
        except requests.exceptions.ConnectionError as e:
            logger.error(repr(e))
            raise ConnectionError(repr(e))

    if r.status_code == 400:
        logger.error(f'Bad Request for url {url}')
        raise BadRequest(f'Bad Request for url {url}')

    if r.status_code == 200:
        return True

    if r.status_code == 404:
        return None
|
|
|
|
def _endpoint_premium_put(self, url, payload=None):
    # type: (str, Any) -> Any
    """Send PUT to `url` with (username, api_key) auth and `payload` as the request data.

    Returns True for a 200 response and None for a 404 or 422 (or any other
    unhandled status). Raises BadRequest for a 400 response and this module's
    ConnectionError when requests reports a connection error.
    """
    retries = Retry(total=5,
                    backoff_factor=0.1,
                    status_forcelist=[429])
    # the context manager closes the session even when the request raises
    with requests.Session() as s:
        s.mount('http://', HTTPAdapter(max_retries=retries))
        s.mount('https://', HTTPAdapter(max_retries=retries))
        try:
            r = s.put(url, data=payload, auth=(self.username, self.api_key), hooks={'response': _record_hook})
        except requests.exceptions.ConnectionError as e:
            logger.error(repr(e))
            raise ConnectionError(repr(e))

    if r.status_code == 400:
        raise BadRequest(f'Bad Request for url {url}')

    if r.status_code == 200:
        return True

    if r.status_code in [404, 422]:
        return None
|
|
|
|
# Get Show object
|
|
def get_show(self, maze_id=None, tvdb_id=None, tvrage_id=None, imdb_id=None, show_name=None,
             show_year=None, show_network=None, show_language=None, show_country=None,
             show_web_channel=None, embed=None):
    # type: (...) -> Show
    """
    Get Show object directly via id or indirectly via name + optional qualifiers

    If only a show_name is given, the show with the highest score using the
    tvmaze algorithm will be returned.
    If you provide extra qualifiers such as network or language they will be
    used for a more specific match, if one exists.

    The identifiers are tried in the order maze_id, tvdb_id, tvrage_id, imdb_id,
    show_name; the first successful lookup is returned.

    Args:
        maze_id: Show maze_id
        tvdb_id: Show tvdb_id
        tvrage_id: Show tvrage_id
        imdb_id: Show imdb_id
        show_name: Show name to be searched
        show_year: Show premiere year
        show_network: Show TV Network (like ABC, NBC, etc.)
        show_web_channel: Show Web Channel (like Netflix, Amazon, etc.)
        show_language: Show language
        show_country: Show country
        embed: embed parameter to include additional data. Values: 'episodes', 'cast', 'episodeswithspecials'

    Raises:
        MissingParameters: none of the id or name arguments was given
        ShowNotFound: every attempted lookup failed, message joins the collected errors
    """
    errors = []
    if not (maze_id or tvdb_id or tvrage_id or imdb_id or show_name):
        raise MissingParameters(
            'Either maze_id, tvdb_id, tvrage_id, imdb_id or show_name are required to get show, none provided,')
    if maze_id:
        try:
            return show_main_info(maze_id, embed=embed)
        except IDNotFound as e:
            errors.append(e.value)
    if tvdb_id:
        try:
            return show_main_info(lookup_tvdb(tvdb_id).id, embed=embed)
        except IDNotFound as e:
            errors.append(e.value)
    if tvrage_id:
        try:
            return show_main_info(lookup_tvrage(tvrage_id).id, embed=embed)
        except IDNotFound as e:
            errors.append(e.value)
    if imdb_id:
        try:
            return show_main_info(lookup_imdb(imdb_id).id, embed=embed)
        except IDNotFound as e:
            errors.append(e.value)
    if show_name:
        try:
            show = self._get_show_by_search(show_name, show_year, show_network, show_language,
                                            show_country, show_web_channel, embed=embed)
            return show
        except ShowNotFound as e:
            errors.append(e.value)
    # separate the collected messages with ', ' (was ' ,')
    raise ShowNotFound(', '.join(errors))
|
|
|
|
@staticmethod
def _get_show_with_qualifiers(show_name, qualifiers):
    # type: (str, List) -> Show
    """Return the search result for `show_name` that matches the most `qualifiers`.

    Each result is scored by how many of its lowercased attributes (premiere year,
    country code, network name, language, web channel name) appear in `qualifiers`.
    The first result with the highest score wins; None is returned when the search
    yields nothing.
    """
    top_score = -1  # lower than any real score so the first result is always taken
    top_show = None

    for candidate in get_show_list(show_name):
        # premiered minus its last 6 characters, i.e. the year of a 'YYYY-MM-DD' date
        premiered = candidate.premiered[:-6].lower() if candidate.premiered else None
        network = (candidate.network.name.lower()
                   if candidate.network and candidate.network.name else None)
        web_channel = (candidate.web_channel.name.lower()
                       if candidate.web_channel and candidate.web_channel.name else None)
        # country code of the network, falling back to the web channel
        if candidate.network and candidate.network.code:
            country = candidate.network.code.lower()
        elif candidate.web_channel and candidate.web_channel.code:
            country = candidate.web_channel.code.lower()
        else:
            country = None
        language = candidate.language.lower() if candidate.language else None

        score = len(set(qualifiers) & set([premiered, country, network, language, web_channel]))
        if score > top_score:
            top_score, top_show = score, candidate
    return top_show
|
|
|
|
# Search with user-defined qualifiers, used by get_show() method
|
|
def _get_show_by_search(self, show_name, show_year, show_network, show_language, show_country,
                        show_web_channel, embed):
    # type: (str, int, str, str, str, str, str) -> Show
    """Search by name: a single search when no qualifier is given, else the best qualifier match.

    With qualifiers and `embed` set, the matched show is loaded again by id so that
    the embedded data is included.
    """
    if show_year:
        show_year = str(show_year)
    given = (show_year, show_network, show_language, show_country, show_web_channel)
    qualifiers = [value.lower() for value in given if value]
    if not qualifiers:
        return show_single_search(show=show_name, embed=embed)

    show = self._get_show_with_qualifiers(show_name, qualifiers)
    if embed:
        return show_main_info(maze_id=show.id, embed=embed)
    return show
|
|
|
|
# TVmaze Premium Endpoints
|
|
# NOT DONE OR TESTED
|
|
def get_followed_shows(self, embed=None):
    # type: (str) -> List[FollowedShow]
    """Return the followed shows; `embed` may be 'show' or None.

    Raises InvalidEmbedValue for any other `embed` and NoFollowedShows when the
    endpoint returns nothing.
    """
    if embed not in (None, 'show'):
        raise InvalidEmbedValue('Value for embed must be "show" or None')
    suffix = '?embed=show' if 'show' == embed else '/'
    response = self._endpoint_premium_get(endpoints.followed_shows.format(suffix))
    if not response:
        raise NoFollowedShows('You have not followed any shows yet')
    return [FollowedShow(entry) for entry in response]
|
|
|
|
def get_followed_show(self, maze_id):
    # type: (int) -> FollowedShow
    """Return the followed show entry for `maze_id`; raise ShowNotFollowed when the endpoint returns nothing."""
    response = self._endpoint_premium_get(endpoints.followed_shows.format(f'/{maze_id}'))
    if not response:
        raise ShowNotFollowed(f'Show with ID {maze_id} is not followed')
    return FollowedShow(response)
|
|
|
|
def follow_show(self, maze_id):
    # type: (int) -> None
    """PUT the followed-shows endpoint for `maze_id`; raise ShowNotFound unless the request reports success."""
    target = endpoints.followed_shows.format(f'/{maze_id}')
    if not self._endpoint_premium_put(target):
        raise ShowNotFound(f'Show with ID {maze_id} does not exist')
|
|
|
|
def unfollow_show(self, maze_id):
    # type: (int) -> None
    """DELETE the followed-shows endpoint for `maze_id`; raise ShowNotFollowed unless the request reports success."""
    target = endpoints.followed_shows.format(f'/{maze_id}')
    if not self._endpoint_premium_delete(target):
        raise ShowNotFollowed(f'Show with ID {maze_id} was not followed')
|
|
|
|
def get_followed_people(self, embed=None):
    # type: (str) -> List[FollowedPerson]
    """Return the followed people; `embed` may be 'person' or None.

    Raises InvalidEmbedValue for any other `embed` and NoFollowedPeople when the
    endpoint returns nothing.
    """
    if embed not in (None, 'person'):
        raise InvalidEmbedValue('Value for embed must be "person" or None')
    suffix = '?embed=person' if 'person' == embed else '/'
    response = self._endpoint_premium_get(endpoints.followed_people.format(suffix))
    if not response:
        raise NoFollowedPeople('You have not followed any people yet')
    return [FollowedPerson(entry) for entry in response]
|
|
|
|
def get_followed_person(self, person_id):
    # type: (int) -> FollowedPerson
    """Return the followed person entry for `person_id`; raise PersonNotFound when the endpoint returns nothing."""
    response = self._endpoint_premium_get(endpoints.followed_people.format(f'/{person_id}'))
    if not response:
        raise PersonNotFound(f'Person with ID {person_id} is not followed')
    return FollowedPerson(response)
|
|
|
|
def follow_person(self, person_id):
    # type: (int) -> None
    """PUT the followed-people endpoint for `person_id`; raise PersonNotFound unless the request reports success."""
    target = endpoints.followed_people.format(f'/{person_id}')
    if not self._endpoint_premium_put(target):
        raise PersonNotFound(f'Person with ID {person_id} does not exist')
|
|
|
|
def unfollow_person(self, person_id):
    # type: (int) -> None
    """DELETE the followed-people endpoint for `person_id`; raise PersonNotFollowed unless the request reports success."""
    target = endpoints.followed_people.format(f'/{person_id}')
    if not self._endpoint_premium_delete(target):
        raise PersonNotFollowed(f'Person with ID {person_id} was not followed')
|
|
|
|
def get_followed_networks(self, embed=None):
    # type: (str) -> List[FollowedNetwork]
    """
    Fetch all followed networks through the premium GET helper.

    :param embed: None, or 'network' to request the endpoint with ?embed=network
    :return: one FollowedNetwork per item of the response
    :raises InvalidEmbedValue: if embed is neither None nor 'network'
    :raises NoFollowedNetworks: if the response is empty or otherwise falsy
    """
    if embed not in (None, 'network'):
        raise InvalidEmbedValue('Value for embed must be "network" or None')
    suffix = '?embed=network' if embed == 'network' else '/'
    response = self._endpoint_premium_get(endpoints.followed_networks.format(suffix))
    if not response:
        raise NoFollowedNetworks('You have not followed any networks yet')
    return [FollowedNetwork(entry) for entry in response]
|
|
|
|
def get_followed_network(self, network_id):
    # type: (int) -> FollowedNetwork
    """
    Fetch a single followed network through the premium GET helper.

    :param network_id: network id appended to the followed networks endpoint
    :return: FollowedNetwork built from the response
    :raises NetworkNotFound: if the response is falsy
    """
    endpoint = endpoints.followed_networks.format(f'/{network_id}')
    response = self._endpoint_premium_get(endpoint)
    if not response:
        raise NetworkNotFound(f'Network with ID {network_id} is not followed')
    return FollowedNetwork(response)
|
|
|
|
def follow_network(self, network_id):
    # type: (int) -> None
    """
    Follow a network by sending a premium PUT request for its id.

    :param network_id: network id appended to the followed networks endpoint
    :raises NetworkNotFound: if the PUT helper returns a falsy result
    """
    endpoint = endpoints.followed_networks.format(f'/{network_id}')
    if self._endpoint_premium_put(endpoint):
        return
    raise NetworkNotFound(f'Network with ID {network_id} does not exist')
|
|
|
|
def unfollow_network(self, network_id):
    # type: (int) -> None
    """
    Stop following a network by sending a premium DELETE request for its id.

    :param network_id: network id appended to the followed networks endpoint
    :raises NetworkNotFollowed: if the DELETE helper returns a falsy result
    """
    endpoint = endpoints.followed_networks.format(f'/{network_id}')
    if self._endpoint_premium_delete(endpoint):
        return
    raise NetworkNotFollowed(f'Network with ID {network_id} was not followed')
|
|
|
|
def get_followed_web_channels(self, embed=None):
    # type: (str) -> List[FollowedWebChannel]
    """
    Fetch all followed web channels through the premium GET helper.

    :param embed: None, or 'webchannel' to request the endpoint with ?embed=webchannel
    :return: one FollowedWebChannel per item of the response
    :raises InvalidEmbedValue: if embed is neither None nor 'webchannel'
    :raises NoFollowedWebChannels: if the response is empty or otherwise falsy
    """
    if embed not in (None, 'webchannel'):
        raise InvalidEmbedValue('Value for embed must be "webchannel" or None')
    suffix = '?embed=webchannel' if embed == 'webchannel' else '/'
    response = self._endpoint_premium_get(endpoints.followed_web_channels.format(suffix))
    if not response:
        raise NoFollowedWebChannels('You have not followed any Web Channels yet')
    return [FollowedWebChannel(entry) for entry in response]
|
|
|
|
def get_followed_web_channel(self, webchannel_id):
    # type: (int) -> FollowedWebChannel
    """
    Fetch a single followed web channel through the premium GET helper.

    :param webchannel_id: web channel id appended to the followed web channels endpoint
    :return: FollowedWebChannel built from the response
    :raises NetworkNotFound: if the response is falsy
    """
    endpoint = endpoints.followed_web_channels.format(f'/{webchannel_id}')
    response = self._endpoint_premium_get(endpoint)
    if not response:
        # NOTE(review): a network exception is raised for a web channel here, while
        # the sibling web channel methods raise WebChannel* errors - confirm whether
        # callers rely on NetworkNotFound before changing the type
        raise NetworkNotFound('Web Channel with ID {} is not followed'.format(webchannel_id))
    return FollowedWebChannel(response)
|
|
|
|
def follow_web_channel(self, webchannel_id):
    # type: (int) -> None
    """
    Follow a web channel by sending a premium PUT request for its id.

    :param webchannel_id: web channel id appended to the followed web channels endpoint
    :raises WebChannelNotFound: if the PUT helper returns a falsy result
    """
    endpoint = endpoints.followed_web_channels.format(f'/{webchannel_id}')
    if self._endpoint_premium_put(endpoint):
        return
    raise WebChannelNotFound(f'Web Channel with ID {webchannel_id} does not exist')
|
|
|
|
def unfollow_web_channel(self, webchannel_id):
    # type: (int) -> None
    """
    Stop following a web channel by sending a premium DELETE request for its id.

    :param webchannel_id: web channel id appended to the followed web channels endpoint
    :raises WebChannelNotFollowed: if the DELETE helper returns a falsy result
    """
    endpoint = endpoints.followed_web_channels.format(f'/{webchannel_id}')
    if self._endpoint_premium_delete(endpoint):
        return
    raise WebChannelNotFollowed(f'Web Channel with ID {webchannel_id} was not followed')
|
|
|
|
def get_marked_episodes(self, maze_id=None):
    # type: (int) -> List[MarkedEpisode]
    """
    Fetch marked episodes through the premium GET helper.

    :param maze_id: optional id; when truthy the request carries ?show_id=<maze_id>,
        otherwise the unfiltered endpoint is requested
    :return: one MarkedEpisode per item of the response
    :raises NoMarkedEpisodes: if the response is empty or otherwise falsy
    """
    query = '?show_id={}'.format(maze_id) if maze_id else '/'
    response = self._endpoint_premium_get(endpoints.marked_episodes.format(query))
    if not response:
        raise NoMarkedEpisodes('You have not marked any episodes yet')
    return [MarkedEpisode(entry) for entry in response]
|
|
|
|
def get_marked_episode(self, episode_id):
    # type: (int) -> MarkedEpisode
    """
    Fetch a single marked episode through the premium GET helper.

    :param episode_id: episode id appended to the marked episodes endpoint
    :return: MarkedEpisode built from the response
    :raises EpisodeNotMarked: if the response is falsy
    """
    endpoint = endpoints.marked_episodes.format(f'/{episode_id}')
    response = self._endpoint_premium_get(endpoint)
    if not response:
        raise EpisodeNotMarked(f'Episode with ID {episode_id} is not marked')
    return MarkedEpisode(response)
|
|
|
|
def mark_episode(self, episode_id, mark_type):
    # type: (int, str) -> None
    """
    Mark an episode as watched, acquired or skipped via a premium PUT request.

    :param episode_id: episode id appended to the marked episodes endpoint
    :param mark_type: one of "watched", "acquired" or "skipped"
    :raises InvalidMarkedEpisodeType: if mark_type is not one of the supported names
    :raises EpisodeNotFound: if the PUT helper returns a falsy result
    """
    types = {'watched': 0, 'acquired': 1, 'skipped': 2}
    try:
        status = types[mark_type]
    except (KeyError, TypeError):
        # a dict lookup reports an unknown key with KeyError (IndexError is never
        # raised here); TypeError covers unhashable values such as a list
        raise InvalidMarkedEpisodeType('Episode must be marked as "watched", "acquired", or "skipped"')
    # the numeric status is sent as a string
    payload = {'type': str(status)}
    url = endpoints.marked_episodes.format(f'/{episode_id}')
    q = self._endpoint_premium_put(url, payload=payload)
    if not q:
        raise EpisodeNotFound(f'Episode with ID {episode_id} does not exist')
|
|
|
|
def unmark_episode(self, episode_id):
    # type: (int) -> None
    """
    Remove the mark from an episode by sending a premium DELETE request.

    :param episode_id: episode id appended to the marked episodes endpoint
    :raises EpisodeNotMarked: if the DELETE helper returns a falsy result
    """
    endpoint = endpoints.marked_episodes.format(f'/{episode_id}')
    if self._endpoint_premium_delete(endpoint):
        return
    raise EpisodeNotMarked(f'Episode with ID {episode_id} was not marked')
|
|
|
|
def get_voted_shows(self, embed=None):
    # type: (str) -> List[VotedShow]
    """
    Fetch all shows voted for through the premium GET helper.

    :param embed: None, or 'show' to request the endpoint with ?embed=show
    :return: one VotedShow per item of the response
    :raises InvalidEmbedValue: if embed is neither None nor 'show'
    :raises NoVotedShows: if the response is empty or otherwise falsy
    """
    if embed not in (None, 'show'):
        raise InvalidEmbedValue('Value for embed must be "show" or None')
    suffix = '?embed=show' if embed == 'show' else '/'
    response = self._endpoint_premium_get(endpoints.voted_shows.format(suffix))
    if not response:
        raise NoVotedShows('You have not voted for any shows yet')
    return [VotedShow(entry) for entry in response]
|
|
|
|
def get_voted_show(self, maze_id):
    # type: (int) -> VotedShow
    """
    Fetch the vote for a single show through the premium GET helper.

    :param maze_id: show id appended to the voted shows endpoint
    :return: VotedShow built from the response
    :raises ShowNotVotedFor: if the response is falsy
    """
    endpoint = endpoints.voted_shows.format(f'/{maze_id}')
    response = self._endpoint_premium_get(endpoint)
    if not response:
        raise ShowNotVotedFor(f'Show with ID {maze_id} not voted for')
    return VotedShow(response)
|
|
|
|
def remove_show_vote(self, maze_id):
    # type: (int) -> None
    """
    Remove the vote for a show by sending a premium DELETE request.

    :param maze_id: show id appended to the voted shows endpoint
    :raises ShowNotVotedFor: if the DELETE helper returns a falsy result
    """
    endpoint = endpoints.voted_shows.format(f'/{maze_id}')
    if self._endpoint_premium_delete(endpoint):
        return
    raise ShowNotVotedFor(f'Show with ID {maze_id} was not voted for')
|
|
|
|
def vote_show(self, maze_id, vote):
    # type: (int, int) -> None
    """
    Vote for a show by sending a premium PUT request with the vote as payload.

    :param maze_id: show id appended to the voted shows endpoint
    :param vote: value in the range 1..10 (inclusive); sent as int(vote)
    :raises InvalidVoteValue: if vote is outside the range 1..10
    :raises ShowNotFound: if the PUT helper returns a falsy result
    """
    in_range = 1 <= vote <= 10
    if not in_range:
        raise InvalidVoteValue('Vote must be an integer between 1 and 10')
    body = {'vote': int(vote)}
    endpoint = endpoints.voted_shows.format(f'/{maze_id}')
    if self._endpoint_premium_put(endpoint, payload=body):
        return
    raise ShowNotFound(f'Show with ID {maze_id} does not exist')
|
|
|
|
def get_voted_episodes(self):
    # type: (...) -> List[VotedEpisode]
    """
    Fetch all episodes voted for through the premium GET helper.

    :return: one VotedEpisode per item of the response
    :raises NoVotedEpisodes: if the response is empty or otherwise falsy
    """
    response = self._endpoint_premium_get(endpoints.voted_episodes.format('/'))
    if not response:
        raise NoVotedEpisodes('You have not voted for any episodes yet')
    return [VotedEpisode(entry) for entry in response]
|
|
|
|
def get_voted_episode(self, episode_id):
    # type: (int) -> VotedEpisode
    """
    Fetch the vote for a single episode through the premium GET helper.

    :param episode_id: episode id appended to the voted episodes endpoint
    :return: VotedEpisode built from the response
    :raises EpisodeNotVotedFor: if the response is falsy
    """
    endpoint = endpoints.voted_episodes.format(f'/{episode_id}')
    response = self._endpoint_premium_get(endpoint)
    if not response:
        raise EpisodeNotVotedFor(f'Episode with ID {episode_id} not voted for')
    return VotedEpisode(response)
|
|
|
|
def remove_episode_vote(self, episode_id):
    # type: (int) -> None
    """
    Remove the vote for an episode by sending a premium DELETE request.

    :param episode_id: episode id appended to the voted episodes endpoint
    :raises EpisodeNotVotedFor: if the DELETE helper returns a falsy result
    """
    endpoint = endpoints.voted_episodes.format(f'/{episode_id}')
    if self._endpoint_premium_delete(endpoint):
        return
    raise EpisodeNotVotedFor(f'Episode with ID {episode_id} was not voted for')
|
|
|
|
def vote_episode(self, episode_id, vote):
    # type: (int, int) -> None
    """
    Vote for an episode by sending a premium PUT request with the vote as payload.

    :param episode_id: episode id appended to the voted episodes endpoint
    :param vote: value in the range 1..10 (inclusive); sent as int(vote)
    :raises InvalidVoteValue: if vote is outside the range 1..10
    :raises EpisodeNotFound: if the PUT helper returns a falsy result
    """
    in_range = 1 <= vote <= 10
    if not in_range:
        raise InvalidVoteValue('Vote must be an integer between 1 and 10')
    body = {'vote': int(vote)}
    endpoint = endpoints.voted_episodes.format(f'/{episode_id}')
    if self._endpoint_premium_put(endpoint, payload=body):
        return
    raise EpisodeNotFound(f'Episode with ID {episode_id} does not exist')
|
|
|
|
|
|
# Return list of Show objects
|
|
def get_show_list(show_name):
    # type: (str) -> List[Show]
    """
    Look up shows by name; thin wrapper that delegates to show_search().

    The list comes from the TVmaze "Show Search" endpoint, is ordered by
    tvmaze score and should mimic the results of a show search on the website.

    :param show_name: Name of show
    :return: List of Show(s)
    """
    return show_search(show_name)
|
|
|
|
|
|
# Get list of Person objects
|
|
def get_people(name):
    # type: (str) -> List[Person]
    """
    Look up people by name; delegates to people_search()
    (TVmaze "People Search" endpoint).

    :param name: Name of person
    :return: List of Person(s); None if people_search() returns a falsy value
    """
    # a falsy search result is turned into None
    return people_search(name) or None
|
|
|
|
|
|
def show_search(show):
    # type: (str) -> List[Show]
    """
    Search shows by name through TVmaze.endpoint_standard_get.

    :param show: show name; URL quoted before it is placed in the endpoint
    :return: list of Show objects, each with a `score` attribute copied from its result
    :raises ShowNotFound: if the response is empty or otherwise falsy
    """
    results = TVmaze.endpoint_standard_get(endpoints.show_search.format(_url_quote(show)))
    if not results:
        raise ShowNotFound(f'Show {show} not found')
    shows = []
    for result in results:
        found = Show(result['show'])
        found.score = result['score']
        shows.append(found)
    return shows
|
|
|
|
|
|
def show_single_search(show, embed=None):
    # type: (str, str) -> Show
    """
    Search for a single show by name through TVmaze.endpoint_standard_get.

    :param show: show name; URL quoted before it is placed in the endpoint
    :param embed: embed value handed to _embed_url together with the allowed values
        None, 'episodes', 'cast', 'previousepisode' and 'nextepisode'
    :return: Show built from the response
    :raises ShowNotFound: if the response is falsy
    """
    allowed_embeds = [None, 'episodes', 'cast', 'previousepisode', 'nextepisode']
    base_url = endpoints.show_single_search.format(_url_quote(show))
    response = TVmaze.endpoint_standard_get(_embed_url(base_url, embed, allowed_embeds, '&'))
    if not response:
        raise ShowNotFound(f'show name "{show}" not found')
    return Show(response)
|
|
|
|
|
|
def lookup_tvrage(tvrage_id):
    # type: (Union[int, str]) -> Show
    """
    Look up a show by its TVRage id through TVmaze.endpoint_standard_get.

    :param tvrage_id: id placed in the lookup_tvrage endpoint
    :return: Show built from the response
    :raises IDNotFound: if the response is falsy
    """
    response = TVmaze.endpoint_standard_get(endpoints.lookup_tvrage.format(tvrage_id))
    if not response:
        raise IDNotFound(f'TVRage id {tvrage_id} not found')
    return Show(response)
|
|
|
|
|
|
def lookup_tvdb(tvdb_id):
    # type: (Union[int, str]) -> Show
    """
    Look up a show by its TVDB id through TVmaze.endpoint_standard_get.

    :param tvdb_id: id placed in the lookup_tvdb endpoint
    :return: Show built from the response
    :raises IDNotFound: if the response is falsy
    """
    response = TVmaze.endpoint_standard_get(endpoints.lookup_tvdb.format(tvdb_id))
    if not response:
        raise IDNotFound(f'TVDB ID {tvdb_id} not found')
    return Show(response)
|
|
|
|
|
|
def lookup_imdb(imdb_id):
    # type: (str) -> Show
    """
    Look up a show by its IMDB id through TVmaze.endpoint_standard_get.

    :param imdb_id: id placed in the lookup_imdb endpoint
    :return: Show built from the response
    :raises IDNotFound: if the response is falsy
    """
    response = TVmaze.endpoint_standard_get(endpoints.lookup_imdb.format(imdb_id))
    if not response:
        raise IDNotFound(f'IMDB ID {imdb_id} not found')
    return Show(response)
|
|
|
|
|
|
def get_schedule(country='US', date=None):
    # type: (str, Optional[str]) -> List[Episode]
    """
    Fetch the schedule for a country and date through TVmaze.endpoint_standard_get.

    :param country: country value placed in the schedule endpoint, default 'US'
    :param date: date string placed in the schedule endpoint; None (default) means
        today's date, resolved at call time
    :return: one Episode per item of the response
    :raises ScheduleNotFound: if the response is empty or otherwise falsy
    """
    if date is None:
        # resolve "today" on every call: a default expression in the signature is
        # evaluated only once at import time and would go stale in a long-running process
        date = str(datetime.today().date())
    url = endpoints.get_schedule.format(country, date)
    q = TVmaze.endpoint_standard_get(url)
    if not q:
        raise ScheduleNotFound(f'Schedule for country {country} at date {date} not found')
    return [Episode(episode) for episode in q]
|
|
|
|
|
|
# ALL known future episodes, several MB large, cached for 24 hours
|
|
def get_full_schedule():
    # type: (...) -> List[Episode]
    """
    Fetch the full schedule through TVmaze.endpoint_standard_get.

    :return: one Episode per item of the response
    :raises GeneralError: if the response is empty or otherwise falsy
    """
    response = TVmaze.endpoint_standard_get(endpoints.get_full_schedule)
    if not response:
        raise GeneralError('Something went wrong, www.tvmaze.com may be down')
    return [Episode(entry) for entry in response]
|
|
|
|
|
|
def show_main_info(maze_id, embed=None):
    # type: (int, str) -> Show
    """
    Fetch the main information of a show through TVmaze.endpoint_standard_get.

    :param maze_id: id placed in the show_main_info endpoint
    :param embed: embed value handed to _embed_url together with the allowed values
        None, 'episodes', 'cast', 'previousepisode', 'nextepisode' and 'episodeswithspecials'
    :return: Show built from the response
    :raises IDNotFound: if the response is falsy
    """
    allowed_embeds = [None, 'episodes', 'cast', 'previousepisode', 'nextepisode', 'episodeswithspecials']
    base_url = endpoints.show_main_info.format(maze_id)
    response = TVmaze.endpoint_standard_get(_embed_url(base_url, embed, allowed_embeds, '?'))
    if not response:
        raise IDNotFound(f'Maze id {maze_id} not found')
    return Show(response)
|
|
|
|
|
|
def episode_list(maze_id, specials=None, raise_error=True, show=None):
    # type: (int, bool, bool, Show) -> List[Episode]
    """
    Fetch the episodes of a show through TVmaze.endpoint_standard_get.

    :param maze_id: id placed in the episode_list endpoint
    :param specials: when truthy, ?specials=1 is appended to the request
    :param raise_error: raise IDNotFound instead of returning an empty list
        when the response is not a list
    :param show: passed as second argument to every Episode that is created
    :return: one Episode per item of the response; an empty list response yields []
    :raises IDNotFound: if the response is not a list and raise_error is truthy
    """
    url = endpoints.episode_list.format(maze_id)
    if specials:
        url = f'{url}?specials=1'
    q = TVmaze.endpoint_standard_get(url)
    # check the type rather than truthiness so that a valid empty list is not an error
    if isinstance(q, list):
        return [Episode(episode, show) for episode in q]
    if raise_error:
        # pass the formatted message itself, not the bound str.format method
        raise IDNotFound(f'Maze id {maze_id} not found')
    return []
|
|
|
|
|
|
def episode_by_number(maze_id, season_number, episode_number):
    # type: (int, int, int) -> Episode
    """
    Fetch one episode by show id, season number and episode number
    through TVmaze.endpoint_standard_get.

    :param maze_id: id placed in the episode_by_number endpoint
    :param season_number: season number placed in the endpoint
    :param episode_number: episode number placed in the endpoint
    :return: Episode built from the response
    :raises EpisodeNotFound: if the response is falsy
    """
    endpoint = endpoints.episode_by_number.format(maze_id, season_number, episode_number)
    response = TVmaze.endpoint_standard_get(endpoint)
    if not response:
        raise EpisodeNotFound(f'Couldn\'t find season {season_number} episode {episode_number} for TVmaze ID {maze_id}')
    return Episode(response)
|
|
|
|
|
|
def episodes_by_date(maze_id, airdate):
    # type: (int, str) -> List[Episode]
    """
    Fetch the episodes of a show for a given airdate through TVmaze.endpoint_standard_get.

    :param maze_id: id placed in the episodes_by_date endpoint
    :param airdate: date string that must parse with the format '%Y-%m-%d'
    :return: one Episode per item of the response
    :raises IllegalAirDate: if airdate does not parse as '%Y-%m-%d'
    :raises NoEpisodesForAirdate: if the response is empty or otherwise falsy
    """
    # parsing is used for validation only, the parsed value is discarded
    try:
        datetime.strptime(airdate, '%Y-%m-%d')
    except ValueError:
        raise IllegalAirDate('Airdate must be string formatted as \"YYYY-MM-DD\"')
    response = TVmaze.endpoint_standard_get(endpoints.episodes_by_date.format(maze_id, airdate))
    if not response:
        raise NoEpisodesForAirdate(f'Couldn\'t find an episode airing {airdate} for TVmaze ID {maze_id}')
    return [Episode(entry) for entry in response]
|
|
|
|
|
|
def show_cast(maze_id, raise_error=True):
    # type: (int, bool) -> Cast
    """
    Fetch the cast of a show through TVmaze.endpoint_standard_get.

    :param maze_id: id placed in the show_cast endpoint
    :param raise_error: raise CastNotFound instead of returning an empty Cast
        when the response is falsy
    :return: Cast built from the response, or Cast({}) if the response is falsy
        and raise_error is falsy
    :raises CastNotFound: if the response is falsy and raise_error is truthy
    """
    url = endpoints.show_cast.format(maze_id)
    q = TVmaze.endpoint_standard_get(url)
    if q:
        return Cast(q)
    if raise_error:
        # message typo fixed ("Couldn'nt" -> "Couldn't")
        raise CastNotFound(f'Couldn\'t find show cast for TVmaze ID {maze_id}')
    return Cast({})
|
|
|
|
|
|
def show_index(page=1):
    # type: (int) -> List[Show]
    """
    Fetch one page of the show index through TVmaze.endpoint_standard_get.

    :param page: page value placed in the show_index endpoint, default 1
    :return: one Show per item of the response
    :raises ShowIndexError: if the response is empty or otherwise falsy
    """
    response = TVmaze.endpoint_standard_get(endpoints.show_index.format(page))
    if not response:
        raise ShowIndexError('Error getting show index, www.tvmaze.com may be down')
    return [Show(entry) for entry in response]
|
|
|
|
|
|
def people_search(person):
    # type: (AnyStr) -> List[Person]
    """
    Search people by name through TVmaze.endpoint_standard_get.

    :param person: name of person; URL quoted before it is placed in the endpoint
    :return: one Person per item of the response
    :raises PersonNotFound: if the response is empty or otherwise falsy
    """
    # keep the quoted form in its own name so the error message
    # reports the name as given by the caller, as show_search() does
    _person = _url_quote(person)
    url = endpoints.people_search.format(_person)
    q = TVmaze.endpoint_standard_get(url)
    if not q:
        raise PersonNotFound(f'Couldn\'t find person {person}')
    return [Person(result) for result in q]
|
|
|
|
|
|
def person_main_info(person_id, embed=None):
    # type: (int, AnyStr) -> Person
    """
    Fetch the main information of a person through TVmaze.endpoint_standard_get.

    :param person_id: id placed in the person_main_info endpoint
    :param embed: embed value handed to _embed_url together with the allowed values
        None, 'castcredits' and 'crewcredits'
    :return: Person built from the response
    :raises PersonNotFound: if the response is falsy
    """
    allowed_embeds = [None, 'castcredits', 'crewcredits']
    base_url = endpoints.person_main_info.format(person_id)
    response = TVmaze.endpoint_standard_get(_embed_url(base_url, embed, allowed_embeds, '?'))
    if not response:
        raise PersonNotFound(f'Couldn\'t find person {person_id}')
    return Person(response)
|
|
|
|
|
|
def person_cast_credits(person_id, embed=None, raise_error=True):
    # type: (int, str, bool) -> List[CastCredit]
    """
    Fetch the cast credits of a person through TVmaze.endpoint_standard_get.

    :param person_id: id placed in the person_cast_credits endpoint
    :param embed: embed value handed to _embed_url together with the allowed values
        None, 'show' and 'character'
    :param raise_error: raise CreditsNotFound instead of returning an empty list
        when the response is falsy
    :return: one CastCredit per item of the response, or [] (see raise_error)
    :raises CreditsNotFound: if the response is falsy and raise_error is truthy
    """
    allowed_embeds = [None, 'show', 'character']
    base_url = endpoints.person_cast_credits.format(person_id)
    response = TVmaze.endpoint_standard_get(_embed_url(base_url, embed, allowed_embeds, '?'))
    if not response:
        if raise_error:
            raise CreditsNotFound(f'Couldn\'t find cast credits for person ID {person_id}')
        return []
    return [CastCredit(entry) for entry in response]
|
|
|
|
|
|
def person_guestcast_credits(person_id, embed=None, raise_error=True):
    # type: (int, str, bool) -> List[CastCredit]
    """
    Fetch the guest cast credits of a person through TVmaze.endpoint_standard_get.

    :param person_id: id placed in the person_guestcast_credits endpoint
    :param embed: embed value handed to _embed_url together with the allowed values
        None, 'episode' and 'character'
    :param raise_error: raise CreditsNotFound instead of returning an empty list
        when the response is falsy
    :return: one CastCredit per item of the response, or [] (see raise_error)
    :raises CreditsNotFound: if the response is falsy and raise_error is truthy
    """
    allowed_embeds = [None, 'episode', 'character']
    base_url = endpoints.person_guestcast_credits.format(person_id)
    response = TVmaze.endpoint_standard_get(_embed_url(base_url, embed, allowed_embeds, '?'))
    if not response:
        if raise_error:
            raise CreditsNotFound(f'Couldn\'t find cast credits for person ID {person_id}')
        return []
    return [CastCredit(entry) for entry in response]
|
|
|
|
|
|
def person_crew_credits(person_id, embed=None, raise_error=True):
    # type: (int, str, bool) -> List[CrewCredit]
    """
    Fetch the crew credits of a person through TVmaze.endpoint_standard_get.

    :param person_id: id placed in the person_crew_credits endpoint
    :param embed: embed value handed to _embed_url together with the allowed values
        None and 'show'
    :param raise_error: raise CreditsNotFound instead of returning an empty list
        when the response is falsy
    :return: one CrewCredit per item of the response, or [] (see raise_error)
    :raises CreditsNotFound: if the response is falsy and raise_error is truthy
    """
    allowed_embeds = [None, 'show']
    base_url = endpoints.person_crew_credits.format(person_id)
    response = TVmaze.endpoint_standard_get(_embed_url(base_url, embed, allowed_embeds, '?'))
    if not response:
        if raise_error:
            raise CreditsNotFound(f'Couldn\'t find crew credits for person ID {person_id}')
        return []
    return [CrewCredit(entry) for entry in response]
|
|
|
|
|
|
def get_show_crew(maze_id, raise_error=True):
    # type: (int, bool) -> List[Crew]
    """
    Fetch crew members of a show through TVmaze.endpoint_standard_get.

    :param maze_id: id placed in the show_crew endpoint
    :param raise_error: raise CrewNotFound instead of returning an empty list
        when the response is falsy
    :return: Crew objects for at most the first five items of a list response;
        [] for a truthy non-list response or (see raise_error) a falsy response
    :raises CrewNotFound: if the response is falsy and raise_error is truthy
    """
    response = TVmaze.endpoint_standard_get(endpoints.show_crew.format(maze_id))
    if not response:
        if raise_error:
            raise CrewNotFound(f'Couldn\'t find crew for TVmaze ID {maze_id}')
        return []
    # only list responses are used, and only their first five entries
    members = response[:5] if isinstance(response, list) else []
    return [Crew(member) for member in members]
|
|
|
|
|
|
def get_show_images(maze_id, raise_error=True):
    # type: (int, bool) -> List[Image]
    """
    Fetch the images of a show through TVmaze.endpoint_standard_get.

    :param maze_id: id placed in the show_images endpoint
    :param raise_error: raise ShowImagesNotFound instead of returning an empty list
        when the response is falsy
    :return: one Image per item of the response, or [] (see raise_error)
    :raises ShowImagesNotFound: if the response is falsy and raise_error is truthy
    """
    response = TVmaze.endpoint_standard_get(endpoints.show_images.format(maze_id))
    if not response:
        if raise_error:
            raise ShowImagesNotFound(f'Couldn\'t find images for TVmaze ID {maze_id}')
        return []
    return [Image(entry) for entry in response]
|
|
|
|
|
|
def show_updates(since=None):
    # type: (AnyStr) -> Updates
    """
    Fetch the show updates, either all of them or those of a given timeframe.

    :param since: None, "day", "week", "month"
    :return: Updates built from the response
    :raises InvalidTimeFrame: if since is not one of the supported values
    :raises ShowIndexError: if the response is falsy
    """
    if since not in ('day', 'week', 'month', None):
        raise InvalidTimeFrame('Only supported are: None, "day", "week", "month"')
    # the query string is only added when a timeframe is given
    query = '' if since is None else '?since=%s' % since
    response = TVmaze.endpoint_standard_get('%s%s' % (endpoints.show_updates, query))
    if not response:
        raise ShowIndexError('Error getting show updates, www.tvmaze.com may be down')
    return Updates(response)
|
|
|
|
|
|
def show_akas(maze_id, raise_error=True):
    # type: (int, bool) -> List[AKA]
    """
    Fetch the AKAs of a show through TVmaze.endpoint_standard_get.

    :param maze_id: id placed in the show_akas endpoint
    :param raise_error: raise AKASNotFound instead of returning an empty list
        when the response is falsy
    :return: one AKA per item of the response, or [] (see raise_error)
    :raises AKASNotFound: if the response is falsy and raise_error is truthy
    """
    response = TVmaze.endpoint_standard_get(endpoints.show_akas.format(maze_id))
    if not response:
        if raise_error:
            raise AKASNotFound(f'Couldn\'t find AKA\'s for TVmaze ID {maze_id}')
        return []
    return [AKA(entry) for entry in response]
|
|
|
|
|
|
def show_seasons(maze_id, raise_error=True, show=None):
    # type: (int, bool, Show) -> Dict[int, Season]
    """
    Fetch the seasons of a show through TVmaze.endpoint_standard_get.

    :param maze_id: id placed in the show_seasons endpoint
    :param raise_error: raise SeasonNotFound instead of returning an empty dict
        when the response is falsy
    :param show: passed as `show` to every Season that is created
    :return: dict mapping each item's 'number' value to its Season, or {} (see raise_error)
    :raises SeasonNotFound: if the response is falsy and raise_error is truthy
    """
    response = TVmaze.endpoint_standard_get(endpoints.show_seasons.format(maze_id))
    if not response:
        if raise_error:
            raise SeasonNotFound(f'Couldn\'t find Season\'s for TVmaze ID {maze_id}')
        return {}
    # a later item with the same 'number' replaces an earlier one
    return {
        entry['number']: Season(entry, show=show, season_number=entry['number'])
        for entry in response
    }
|
|
|
|
|
|
def season_by_id(season_id, embed=None):
    # type: (int, str) -> Season
    """
    Fetch a season by its id through TVmaze.endpoint_standard_get.

    :param season_id: id placed in the season_by_id endpoint
    :param embed: embed value handed to _embed_url together with the allowed values
        None and 'episodes'
    :return: Season built from the response
    :raises SeasonNotFound: if the response is falsy
    """
    allowed_embeds = [None, 'episodes']
    base_url = endpoints.season_by_id.format(season_id)
    response = TVmaze.endpoint_standard_get(_embed_url(base_url, embed, allowed_embeds, '?'))
    if not response:
        raise SeasonNotFound(f'Couldn\'t find Season with ID {season_id}')
    return Season(response)
|
|
|
|
|
|
def episode_by_id(episode_id, show=None, raise_error=True, embed=None):
    # type: (int, Show, bool, str) -> Episode
    """
    Fetch an episode by its id through TVmaze.endpoint_standard_get.

    :param episode_id: id placed in the episode_by_id endpoint
    :param show: passed as `show` to the Episode that is created
    :param raise_error: raise EpisodeNotFound instead of returning None
        when the response is falsy
    :param embed: embed value handed to _embed_url together with the allowed values
        None, 'show', 'guestcast' and 'guestcrew'
    :return: Episode built from the response; None if the response is falsy
        and raise_error is falsy
    :raises EpisodeNotFound: if the response is falsy and raise_error is truthy
    """
    allowed_embeds = [None, 'show', 'guestcast', 'guestcrew']
    base_url = endpoints.episode_by_id.format(episode_id)
    response = TVmaze.endpoint_standard_get(_embed_url(base_url, embed, allowed_embeds, '?'))
    if not response:
        if raise_error:
            raise EpisodeNotFound(f'Couldn\'t find Episode with ID {episode_id}')
        return None
    return Episode(response, show=show)
|
|
|
|
|
|
def episode_guestcast_credits(episode_id, raise_error=True):
    # type: (int, bool) -> List[CastCredit]
    """
    Fetch the guest cast credits of an episode through TVmaze.endpoint_standard_get.

    :param episode_id: id placed in the episode_guestcast endpoint
    :param raise_error: raise CreditsNotFound instead of returning an empty list
        when the response is falsy
    :return: one CastCredit per item of the response, or [] (see raise_error)
    :raises CreditsNotFound: if the response is falsy and raise_error is truthy
    """
    response = TVmaze.endpoint_standard_get(endpoints.episode_guestcast.format(episode_id))
    if not response:
        if raise_error:
            raise CreditsNotFound(f'Couldn\'t find cast credits for episode ID {episode_id}')
        return []
    return [CastCredit(entry) for entry in response]
|
|
|
|
|
|
def episode_crew_credits(episode_id, raise_error=True):
    # type: (int, bool) -> List[CrewCredit]
    """
    Fetch the guest crew credits of an episode through TVmaze.endpoint_standard_get.

    :param episode_id: id placed in the episode_guestcrew endpoint
    :param raise_error: raise CreditsNotFound instead of returning an empty list
        when the response is falsy
    :return: one CrewCredit per item of the response, or [] (see raise_error)
    :raises CreditsNotFound: if the response is falsy and raise_error is truthy
    """
    response = TVmaze.endpoint_standard_get(endpoints.episode_guestcrew.format(episode_id))
    if not response:
        if raise_error:
            raise CreditsNotFound(f'Couldn\'t find crew credits for episode ID {episode_id}')
        return []
    return [CrewCredit(entry) for entry in response]
|