mirror of
https://github.com/SickGear/SickGear.git
synced 2025-01-24 10:13:43 +00:00
257 lines
11 KiB
Python
257 lines
11 KiB
Python
# encoding:utf-8
|
|
# author:Prinz23
|
|
# project:imdb_api
|
|
|
|
__author__ = 'Prinz23'
|
|
__version__ = '1.0'
|
|
__api_version__ = '1.0.0'
|
|
|
|
import logging
|
|
import re
|
|
|
|
# from .imdb_exceptions import *
|
|
from bs4_parser import BS4Parser
|
|
from exceptions_helper import ex
|
|
from lib import imdbpie
|
|
from lib.dateutil.parser import parser
|
|
# from lib.tvinfo_base.exceptions import BaseTVinfoShownotfound
|
|
from lib.tvinfo_base import (
|
|
TVInfoCharacter, TVInfoPerson, PersonGenders, TVINFO_IMDB,
|
|
# TVINFO_FACEBOOK, TVINFO_INSTAGRAM, TVINFO_TMDB, TVINFO_TRAKT,
|
|
# TVINFO_TVDB, TVINFO_TVRAGE, TVINFO_X, TVINFO_WIKIPEDIA,
|
|
TVInfoBase, TVInfoIDs, TVInfoShow)
|
|
from sg_helpers import clean_data, enforce_type, get_url, try_int
|
|
from json_helper import json_loads
|
|
|
|
from six import iteritems
|
|
from six.moves import http_client as httplib
|
|
from six.moves.urllib.parse import urlencode, urljoin, quote, unquote
|
|
|
|
|
|
# noinspection PyUnreachableCode
|
|
if False:
|
|
from typing import Any, AnyStr, Dict, List, Optional
|
|
from six import integer_types
|
|
|
|
tz_p = parser()
|
|
log = logging.getLogger('imdb.api')
|
|
log.addHandler(logging.NullHandler())
|
|
|
|
|
|
def _get_imdb(self, url, query=None, params=None):
|
|
headers = {'Accept-Language': self.locale}
|
|
if params:
|
|
full_url = '{0}?{1}'.format(url, urlencode(params))
|
|
else:
|
|
full_url = url
|
|
headers.update(self.get_auth_headers(full_url))
|
|
resp = get_url(url, headers=headers, params=params, return_response=True)
|
|
|
|
if not resp.ok:
|
|
if resp.status_code == httplib.NOT_FOUND:
|
|
raise LookupError('Resource {0} not found'.format(url))
|
|
else:
|
|
msg = '{0} {1}'.format(resp.status_code, resp.text)
|
|
raise imdbpie.ImdbAPIError(msg)
|
|
resp_data = resp.content.decode('utf-8')
|
|
try:
|
|
resp_dict = json_loads(resp_data)
|
|
except ValueError:
|
|
resp_dict = self._parse_dirty_json(
|
|
data=resp_data, query=query
|
|
)
|
|
|
|
if resp_dict.get('error'):
|
|
return None
|
|
return resp_dict
|
|
|
|
|
|
imdbpie.Imdb._get = _get_imdb
|
|
|
|
|
|
class IMDbIndexer(TVInfoBase):
|
|
# supported_id_searches = [TVINFO_IMDB]
|
|
supported_person_id_searches = [TVINFO_IMDB]
|
|
supported_id_searches = [TVINFO_IMDB]
|
|
|
|
# noinspection PyUnusedLocal
|
|
# noinspection PyDefaultArgument
|
|
def __init__(self, *args, **kwargs):
|
|
super(IMDbIndexer, self).__init__(*args, **kwargs)
|
|
|
|
def search(self, series):
|
|
# type: (AnyStr) -> List
|
|
"""This searches for the series name
|
|
and returns the result list
|
|
"""
|
|
result = []
|
|
cache_name_key = 's-title-%s' % series
|
|
is_none, shows = self._get_cache_entry(cache_name_key)
|
|
if not self.config.get('cache_search') or (None is shows and not is_none):
|
|
try:
|
|
result = imdbpie.Imdb().search_for_title(series)
|
|
except (BaseException, Exception):
|
|
pass
|
|
self._set_cache_entry(cache_name_key, result, expire=self.search_cache_expire)
|
|
else:
|
|
result = shows
|
|
return result
|
|
|
|
def _search_show(self, name=None, ids=None, **kwargs):
|
|
# type: (AnyStr, Dict[integer_types, integer_types], Optional[Any]) -> List[TVInfoShow]
|
|
"""This searches IMDB for the series name,
|
|
"""
|
|
def _make_result_dict(s):
|
|
imdb_id = try_int(re.search(r'tt(\d+)', s.get('id') or s.get('imdb_id')).group(1), None)
|
|
ti_show = TVInfoShow()
|
|
ti_show.seriesname, ti_show.id, ti_show.firstaired, ti_show.genre_list, ti_show.overview, \
|
|
ti_show.poster, ti_show.ids = \
|
|
clean_data(s['title']), imdb_id, s.get('releaseDetails', {}).get('date') or s.get('year'), \
|
|
s.get('genres'), enforce_type(clean_data(s.get('plot', {}).get('outline', {}).get('text')), str, ''), \
|
|
s.get('image') and s['image'].get('url'), TVInfoIDs(imdb=imdb_id)
|
|
return ti_show
|
|
|
|
results = []
|
|
if ids:
|
|
for t, p in iteritems(ids):
|
|
if t in self.supported_id_searches:
|
|
if t == TVINFO_IMDB:
|
|
cache_id_key = 's-id-%s-%s' % (TVINFO_IMDB, p)
|
|
is_none, shows = self._get_cache_entry(cache_id_key)
|
|
if not self.config.get('cache_search') or (None is shows and not is_none):
|
|
try:
|
|
show = imdbpie.Imdb().get_title_auxiliary('tt%07d' % p)
|
|
except (BaseException, Exception):
|
|
continue
|
|
self._set_cache_entry(cache_id_key, show, expire=self.search_cache_expire)
|
|
else:
|
|
show = shows
|
|
if show:
|
|
results.extend([_make_result_dict(show)])
|
|
if name:
|
|
for n in ([name], name)[isinstance(name, list)]:
|
|
try:
|
|
shows = self.search(n)
|
|
results.extend([_make_result_dict(s) for s in shows])
|
|
except (BaseException, Exception) as e:
|
|
log.debug('Error searching for show: %s' % ex(e))
|
|
seen = set()
|
|
results = [seen.add(r.id) or r for r in results if r.id not in seen]
|
|
return results
|
|
|
|
@staticmethod
|
|
def _convert_person(person_obj, filmography=None, bio=None):
|
|
if isinstance(person_obj, dict) and 'imdb_id' in person_obj:
|
|
imdb_id = try_int(re.search(r'(\d+)', person_obj['imdb_id']).group(1))
|
|
return TVInfoPerson(p_id=imdb_id, name=person_obj['name'], ids=TVInfoIDs(ids={TVINFO_IMDB: imdb_id}))
|
|
characters = []
|
|
for known_for in (filmography and filmography['filmography']) or []:
|
|
if known_for['titleType'] not in ('tvSeries', 'tvMiniSeries'):
|
|
continue
|
|
for character in known_for.get('characters') or ['unknown name']:
|
|
ti_show = TVInfoShow()
|
|
ti_show.id = try_int(re.search(r'(\d+)', known_for.get('id')).group(1))
|
|
ti_show.ids.imdb = ti_show.id
|
|
ti_show.seriesname = known_for.get('title')
|
|
ti_show.firstaired = known_for.get('year')
|
|
characters.append(
|
|
TVInfoCharacter(name=character, ti_show=ti_show, start_year=known_for.get('startYear'),
|
|
end_year=known_for.get('endYear'))
|
|
)
|
|
try:
|
|
birthdate = person_obj['base']['birthDate'] and tz_p.parse(person_obj['base']['birthDate']).date()
|
|
except (BaseException, Exception):
|
|
birthdate = None
|
|
try:
|
|
deathdate = person_obj['base']['deathDate'] and tz_p.parse(person_obj['base']['deathDate']).date()
|
|
except (BaseException, Exception):
|
|
deathdate = None
|
|
imdb_id = try_int(re.search(r'(\d+)', person_obj['id']).group(1))
|
|
return TVInfoPerson(
|
|
p_id=imdb_id, ids=TVInfoIDs(ids={TVINFO_IMDB: imdb_id}), characters=characters,
|
|
name=person_obj['base'].get('name'), real_name=person_obj['base'].get('realName'),
|
|
nicknames=set((person_obj['base'].get('nicknames') and person_obj['base'].get('nicknames')) or []),
|
|
akas=set((person_obj['base'].get('akas') and person_obj['base'].get('akas')) or []),
|
|
bio=bio, gender=PersonGenders.imdb_map.get(person_obj['base'].get('gender'), PersonGenders.unknown),
|
|
image=person_obj['base'].get('image', {}).get('url'),
|
|
birthdate=birthdate, birthplace=person_obj['base'].get('birthPlace'),
|
|
deathdate=deathdate, deathplace=person_obj['base'].get('deathPlace'),
|
|
height=person_obj['base'].get('heightCentimeters')
|
|
)
|
|
|
|
def _search_person(self, name=None, ids=None):
|
|
# type: (AnyStr, Dict[integer_types, integer_types]) -> List[TVInfoPerson]
|
|
"""
|
|
search for person by name
|
|
:param name: name to search for
|
|
:param ids: dict of ids to search
|
|
:return: list of found person's
|
|
"""
|
|
results, ids = [], ids or {}
|
|
for tv_src in self.supported_person_id_searches:
|
|
if tv_src in ids:
|
|
if TVINFO_IMDB == tv_src:
|
|
try:
|
|
p = self.get_person(ids[tv_src])
|
|
except (BaseException, Exception):
|
|
p = None
|
|
if p:
|
|
results.append(p)
|
|
if name:
|
|
cache_name_key = 'p-name-%s' % name
|
|
is_none, ps = self._get_cache_entry(cache_name_key)
|
|
if None is ps and not is_none:
|
|
try:
|
|
ps = imdbpie.Imdb().search_for_name(name)
|
|
except (BaseException, Exception):
|
|
ps = None
|
|
self._set_cache_entry(cache_name_key, ps)
|
|
if ps:
|
|
for cp in ps:
|
|
if not any(1 for c in results if cp['imdb_id'] == 'nm%07d' % c.id):
|
|
results.append(self._convert_person(cp))
|
|
return results
|
|
|
|
@staticmethod
|
|
def _get_bio(p_id):
|
|
try:
|
|
bio = get_url('https://www.imdb.com/name/nm%07d/bio' % p_id, headers={'Accept-Language': 'en'})
|
|
if not bio:
|
|
return
|
|
with BS4Parser(bio) as bio_item:
|
|
bv = bio_item.find('div', attrs={'data-testid': re.compile('mini_bio$')}, recursive=True)
|
|
for a in bv.findAll('a'):
|
|
a.replaceWithChildren()
|
|
for b in bv.findAll('br'):
|
|
b.replaceWith('\n')
|
|
return bv.get_text().strip()
|
|
except (BaseException, Exception):
|
|
return
|
|
|
|
def get_person(self, p_id, get_show_credits=False, get_images=False, **kwargs):
|
|
# type: (integer_types, bool, bool, Any) -> Optional[TVInfoPerson]
|
|
if not p_id:
|
|
return
|
|
cache_main_key, cache_bio_key, cache_credits_key = 'p-main-%s' % p_id, 'p-bio-%s' % p_id, 'p-credits-%s' % p_id
|
|
is_none, p = self._get_cache_entry(cache_main_key)
|
|
if None is p and not is_none:
|
|
try:
|
|
p = imdbpie.Imdb().get_name(imdb_id='nm%07d' % p_id)
|
|
except (BaseException, Exception):
|
|
p = None
|
|
self._set_cache_entry(cache_main_key, p)
|
|
is_none, bio = self._get_cache_entry(cache_bio_key)
|
|
if None is bio and not is_none:
|
|
bio = self._get_bio(p_id)
|
|
self._set_cache_entry(cache_bio_key, bio)
|
|
fg = None
|
|
if get_show_credits:
|
|
is_none, fg = self._get_cache_entry(cache_credits_key)
|
|
if None is fg and not is_none:
|
|
try:
|
|
fg = imdbpie.Imdb().get_name_filmography(imdb_id='nm%07d' % p_id)
|
|
except (BaseException, Exception):
|
|
fg = None
|
|
self._set_cache_entry(cache_credits_key, fg)
|
|
if p:
|
|
return self._convert_person(p, filmography=fg, bio=bio)
|