mirror of
https://github.com/SickGear/SickGear.git
synced 2024-12-01 17:03:38 +00:00
386c066ed9
Fix FST provider exception raised when no title
341 lines
13 KiB
Python
341 lines
13 KiB
Python
#
|
|
# This file is part of SickGear.
|
|
#
|
|
# SickGear is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# SickGear is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with SickGear. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
from datetime import datetime, timedelta, timezone
|
|
import difflib
|
|
import re
|
|
import time
|
|
import traceback
|
|
|
|
import sickgear
|
|
from . import generic
|
|
from .. import classes, logger, show_name_helpers, tvcache
|
|
from ..classes import NZBDataSearchResult
|
|
from ..common import NeededQualities
|
|
from ..tv import TVEpisode
|
|
|
|
from bs4_parser import BS4Parser
|
|
|
|
from six import iteritems
|
|
|
|
# noinspection PyUnreachableCode
|
|
if False:
|
|
from typing import Any, AnyStr, Dict, List, Optional
|
|
|
|
|
|
class FSTProvider(generic.NZBProvider):
|
|
|
|
def __init__(self):
    """
    Set up the FileSharingTalk provider: site urls, the cookie digest and the cache.
    """
    generic.NZBProvider.__init__(self, 'FileSharingTalk')

    self.url_base = 'https://filesharingtalk.com/'  # type: AnyStr
    site = self.url_base
    # `cache` carries a `%s` placeholder that _search_provider fills with a page suffix
    self.urls = dict(
        config_provider_home_uri=site,
        cache=site + 'nzbs/tv/%s?sort=age&order=desc',
        search_init=site + 'search.php?search_type=1#ads=15',
        search=site + 'search.php?do=process')  # type: Dict[AnyStr, AnyStr]
    self.url = self.urls['config_provider_home_uri']

    # value that _init_cookies copies into `self.cookies`; unset until configured
    self.digest = None
    self.cache = FSTCache(self)
|
|
|
|
# category names wanted when SD quality is needed (merged by _get_cats)
cat_sd = [
    'dvdr',
    'xvid',
    'x264sd',
    'misc',
]
# category names wanted when HD quality is needed; 'misc' is shared with SD
cat_hd = [
    'x264720',
    'x2641080',
    'webdl720',
    'misc',
]
|
|
|
|
@staticmethod
def _get_cats(needed):
    """
    Collect the category names that match the qualities still needed.

    :param needed: needed class
    :type needed: NeededQualities
    :return: de-duplicated category names, in no particular order; empty if neither SD nor HD is needed
    :rtype: List
    """
    wanted_sd = FSTProvider.cat_sd if needed.need_sd else []
    wanted_hd = FSTProvider.cat_hd if needed.need_hd else []
    # both lists contain 'misc', the set removes the duplicate
    return list(set(wanted_sd + wanted_hd))
|
|
|
|
def _init_cookies(self):
|
|
# type: (...) -> Optional[bool]
|
|
"""
|
|
:return: False if success with no issues, or None if failure to init
|
|
"""
|
|
if not self.should_skip():
|
|
self.cookies = self.digest
|
|
success, msg = self._check_cookie()
|
|
if success:
|
|
return False
|
|
logger.warning(f'{msg}: {self.cookies}')
|
|
|
|
self.cookies = None
|
|
return None
|
|
|
|
def _search_provider(self, search, search_mode='eponly', needed=NeededQualities(need_all=True), **kwargs):
    # type: (AnyStr, AnyStr, NeededQualities, Any) -> List
    """
    Scrape FileSharingTalk collection listings for releases.

    An empty `search` browses the newest tv listings (cache mode, two pages), anything else is posted
    to the site search form after a security token has been read from the search page.

    :param search: search string, or '' to fetch the latest listings
    :param search_mode: only used for logging, 'Propers' is logged in place of the fetch mode
    :param needed: needed class, selects which categories of rows are accepted
    :param kwargs: unused
    :return: list of dicts with keys `title`, `link` and `release_dt`
    """
    self._init_cookies()
    results = []
    if None is getattr(self, 'cookies', None):
        return results

    cats = self._get_cats(needed=needed)
    if not cats:
        return results

    rc = {k: re.compile('(?i)' + v) for (k, v) in iteritems(dict(
        cat='(?:%s)' % '|'.join(cats), results='(?:collections|searchbits)'))}
    mode = 'cache' if '' == search else 'search'
    post_data = None
    if 'cache' == mode:
        pages = ['', 'page2']
    else:
        html = self.get_url(self.urls['search_init'])
        try:
            token = re.findall(r'(?i)token["\s]+[^"]+"([0-9a-f-]+)"', html)[0]
        except (IndexError, TypeError):
            # no match in the page, or nothing usable was fetched
            token = None
        if not token:
            logger.warning('Failed to parse an initial search token')
            pages = []
        else:
            post_data = {'ngsubcategory[]': [16, 17, 53, 22, 23, 51, 49, 24]}
            post_data.update(dict(
                query='%s' % search, securitytoken='%s' % token, dosearch='Search+Now', saveprefs=0, searchdate=0,
                searchuser='', s='', sortby='dateline', order='descending', beforeafter='after', overridesearch=1,
                searchfromtype='fstNZB:Collection', contenttypeid='', do='process'))
            pages = ['']

    for cur_page in pages:
        cnt = len(results)
        search_url = self.urls[mode]
        if 'cache' == mode:
            search_url = search_url % cur_page

        html = self.get_url(search_url, post_data=post_data)
        if self.should_skip():
            return results

        try:
            if not html:
                raise generic.HaltParseException

            with BS4Parser(html, parse_only={'ol': {'id': rc['results']}}) as soup:  # 'collections'
                tbl_rows = [] if not soup else soup.find_all('li', class_='collectionbit')

                if 1 > len(tbl_rows):
                    raise generic.HaltParseException

                for tr in tbl_rows:
                    # reset per row so that nothing is carried over from the previous row
                    title = info_url = rls_dt = None
                    try:
                        # only rows with an image classed as one of the wanted categories are used
                        if tr.find('img', class_=rc['cat']):
                            title = tr['data-title'].strip()
                            age = tr.find(class_='binaryage').find('dd').get_text(strip=True).lower()
                            age_value, age_dim = age.split()
                            age_arg = 'hours' if 'hour' in age_dim else 'days' if 'day' in age_dim else None
                            if age_arg:
                                rls_dt = datetime.now(timezone.utc) - timedelta(**{age_arg: float(age_value)})
                            info_url = self._link(tr['data-url'].strip())
                    except (AttributeError, KeyError, TypeError, ValueError):
                        # a malformed row (including one without a data-title or data-url attribute)
                        # is skipped instead of aborting the parse of the whole page
                        continue

                    if title and info_url and rls_dt:
                        results.append({'title': title, 'link': info_url, 'release_dt': rls_dt})

        except generic.HaltParseException:
            # nothing to parse on this page, pause briefly before carrying on
            time.sleep(1.1)
        except (BaseException, Exception):
            logger.error(f'Failed to parse. Traceback: {traceback.format_exc()}')

        self._log_search((mode, search_mode)['Propers' == search_mode], len(results) - cnt, search_url)
    return results
|
|
|
|
def find_propers(self, **kwargs):
    """
    Search the provider for PROPER, REPACK and REAL releases.

    :param kwargs: unused
    :return: one Proper per search hit, empty if the provider should be skipped
    :rtype: List[classes.Proper]
    """
    propers = []
    if self.should_skip():
        return propers

    for keyword in ('.PROPER.', '.REPACK.', '.REAL.'):
        for hit in self._search_provider(keyword, search_mode='Propers'):
            name, link = self._title_and_url(hit)
            propers.append(classes.Proper(name, link, hit['release_dt'], self.show_obj))

    return propers
|
|
|
|
@staticmethod
def common_string(files):
    # type: (List) -> Optional[AnyStr]
    """ find a string common to many strings
    e.g 1) 123.rar 2) 123.par2 3) 123.nfo returns `123`

    The common part is only accepted when it ends with a `.`, that trailing dot is removed from the result.

    :param files: list of strings
    :return: string common to those in list or None (also None for fewer than two strings)
    """

    def _longest_common(_s1, _s2):
        # longest contiguous run of characters shared by both strings ('' if there is none)
        sequencer = difflib.SequenceMatcher(None, _s1, _s2)
        pos_a, _, size = max(sequencer.get_matching_blocks(), key=lambda _x: _x[2])
        return _s1[pos_a:pos_a + size]

    if not files:
        return None

    base_names = set()
    # 1st pass, get candidates of common part of name
    s1 = files[0]
    for s2 in files[1:]:
        s1 = _longest_common(s1, s2)
        base_names.add(s1)

    if not base_names:
        # a single name has nothing to be compared with
        return None

    # 2nd pass, finds base name
    candidates = sorted(base_names, key=len)
    s1 = candidates[0]
    # the shortest candidate is tested as well, otherwise a list where every
    # comparison gave the same candidate would never yield a result
    if s1.endswith('.'):
        return s1[:-1]
    for s2 in candidates[1:]:
        s1 = _longest_common(s1, s2)
        # endswith is safe on an empty string, unlike indexing its last character
        if s1.endswith('.'):
            return s1[:-1]

    return None
|
|
|
|
def get_data(self, url):
    """
    Fetch nzb data for a collection that is listed on a FileSharingTalk detail page.

    The page at `url` is scraped for the file subjects of the collection, a name common to those
    files is derived and searched for at nzbindex, and only when every indexer hit corresponds to
    a file listed by FST is the nzb requested from the indexer.

    :param url: url of a collection page, its last path segment must start with `<collection id>-`
    :type url: AnyStr
    :return: the download response if it contains `</nzb>`, otherwise None
    :rtype: Optional[AnyStr]
    """
    if not (url and False is self._init_cookies()):
        return None

    html = self.get_url(url, timeout=90)
    if self.should_skip() or not html:
        return None

    try:
        collection = int(url.rpartition('/')[-1].split('-')[0])
    except (AttributeError, TypeError, ValueError):
        collection = None
    if not collection:
        return None

    with BS4Parser(html, parse_only={'div': {'id': 'binaryeditor'}}) as soup:
        nzb_rows = [] if not soup else soup.find_all('li', {'data-collectionid': '%s' % collection})
        try:
            # longest subjects first
            files = sorted([_x.find(class_='subject').find('dd').get_text(strip=True)
                            for _x in nzb_rows], key=len, reverse=True)
        except Exception:
            files = []
    if not files:
        return None

    try:
        base_name = self.common_string(files)
    except IndexError:
        # too little to derive a common name from is the same as finding none
        base_name = None
    if not base_name:
        return None

    base_url = 'https://nzbindex.nl/'
    # uncomment the following into use if required.
    # init_url = base_url + 'search/?q=%s' % base_name
    # html = self.get_url(init_url)
    # try:
    #     action = re.findall(r'action="([^"]+)"', html)[0].lstrip('/')
    # except(BaseException, Exception):
    #     action = None
    # if action:
    #     # get a session disclaimer cookie
    #     self.get_url(base_url + action, post_data={'_method': 'POST'})
    #
    # if 'disclaimer' not in self.session.cookies:
    #     return None
    idx_data = self.get_url(base_url + 'search/json?q=%s' % base_name, parse_json=True,
                            params=dict(max=100, minage=0, maxage=0, sort='agedesc',
                                        hidespam=1, hidepassword=0, minsize=0, maxsize=0,
                                        complete=0, hidecross=0, hasNFO=0, poster='', p=0))

    ids = []
    fn_reg = re.compile(r'[^"]+"([^"]+).*')
    known_files = set(files)
    try:
        for cur_result in idx_data['results']:
            # check indexer files match FST files, a single mismatch rules out the download
            if fn_reg.sub(r'\1', cur_result['name']) not in known_files:
                return None
            ids.append(cur_result['id'])
    except (KeyError, TypeError):
        # the indexer gave no response, or one that is not in the expected shape
        return None
    if not ids:
        # no hits, so there is nothing to request
        return None

    nzb = '%s.nzb' % base_name
    response = self.get_url(base_url + 'download/' + nzb, post_data={'n': nzb, 'r[]': ids})
    if not response or '</nzb>' not in response:
        logger.debug('Failed nzb data response: %s' % response)
        return None

    return response
|
|
|
|
def get_result(self, ep_obj_list, url):
    # type: (List[TVEpisode], AnyStr) -> Optional[NZBDataSearchResult]
    """
    Build a search result whose nzb data is fetched later through `get_data`.

    :param ep_obj_list: list of episode objects
    :param url: url
    :return: search result bound to this provider
    """
    search_result = NZBDataSearchResult(ep_obj_list)
    # the data itself is not fetched here, only the callable that can fetch it is attached
    search_result.get_data_func = self.get_data
    search_result.url = url
    search_result.provider = self
    return search_result
|
|
|
|
def _season_strings(self, ep_obj):
    """
    Make the season search strings for the current show.

    :param ep_obj: episode object
    :type ep_obj: sickgear.tv.TVEpisode
    :return: list of search strings
    :rtype: List[AnyStr]
    """
    return list(show_name_helpers.make_scene_season_search_string(self.show_obj, ep_obj))
|
|
|
|
def _episode_strings(self, ep_obj):
    """
    Make the episode search strings for the current show.

    :param ep_obj: episode object
    :type ep_obj: sickgear.tv.TVEpisode
    :return: list of search strings
    :rtype: List[AnyStr]
    """
    return list(show_name_helpers.make_scene_search_string(self.show_obj, ep_obj))
|
|
|
|
@staticmethod
def ui_string(key=None):
    """
    Return the hint text for a UI field of this provider.

    :param key: name of the UI field
    :return: hint for `filesharingtalk_digest`, or an empty string for any other key
    """
    if 'filesharingtalk_digest' != key:
        return ''
    return 'use... \'bb_userid=xx; bb_password=yy\''
|
|
|
|
|
|
class FSTCache(tvcache.TVCache):
    """Cache for the FileSharingTalk provider, filled from the provider's latest listings."""

    def __init__(self, this_provider):
        """
        :param this_provider: provider that this cache belongs to
        """
        tvcache.TVCache.__init__(self, this_provider)

    def _cache_data(self, **kwargs):
        """
        Fetch the data to cache by running the provider search with an empty search string.

        :param kwargs: passed on to the provider search
        :return: whatever the provider search returns
        """
        # noinspection PyProtectedMember
        fetch = self.provider._search_provider
        return fetch('', **kwargs)
|
|
|
|
|
|
# module level provider instance, created when this module is imported
provider = FSTProvider()
|