mirror of
https://github.com/SickGear/SickGear.git
synced 2025-01-07 02:23:38 +00:00
273 lines
12 KiB
Python
273 lines
12 KiB
Python
|
#
|
||
|
# This file is part of SickGear.
|
||
|
#
|
||
|
# SickGear is free software: you can redistribute it and/or modify
|
||
|
# it under the terms of the GNU General Public License as published by
|
||
|
# the Free Software Foundation, either version 3 of the License, or
|
||
|
# (at your option) any later version.
|
||
|
#
|
||
|
# SickGear is distributed in the hope that it will be useful,
|
||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||
|
# GNU General Public License for more details.
|
||
|
#
|
||
|
# You should have received a copy of the GNU General Public License
|
||
|
# along with SickGear. If not, see <http://www.gnu.org/licenses/>.
|
||
|
|
||
|
import re
|
||
|
|
||
|
from .generic import Notifier
|
||
|
import sickgear
|
||
|
from encodingKludge import fixStupidEncodings
|
||
|
from exceptions_helper import ex
|
||
|
|
||
|
from _23 import b64encodestring, decode_str, etree, filter_iter, list_values, unquote_plus, urlencode
|
||
|
from six import iteritems, text_type, PY2
|
||
|
# noinspection PyUnresolvedReferences
|
||
|
from six.moves import urllib
|
||
|
|
||
|
|
||
|
class PLEXNotifier(Notifier):
|
||
|
|
||
|
def __init__(self):
|
||
|
super(PLEXNotifier, self).__init__()
|
||
|
|
||
|
def _send_to_plex(self, command, host, username=None, password=None):
|
||
|
"""Handles communication to Plex hosts via HTTP API
|
||
|
|
||
|
Args:
|
||
|
command: Dictionary of field/data pairs, encoded via urllib and passed to the legacy xbmcCmds HTTP API
|
||
|
host: Plex host:port
|
||
|
username: Plex API username
|
||
|
password: Plex API password
|
||
|
|
||
|
Returns:
|
||
|
Returns True for successful commands or False if there was an error
|
||
|
|
||
|
"""
|
||
|
if not host:
|
||
|
self._log_error(u'No host specified, check your settings')
|
||
|
return False
|
||
|
|
||
|
for key in command:
|
||
|
if not PY2 or type(command[key]) == text_type:
|
||
|
command[key] = command[key].encode('utf-8')
|
||
|
|
||
|
enc_command = urlencode(command)
|
||
|
self._log_debug(u'Encoded API command: ' + enc_command)
|
||
|
|
||
|
url = 'http://%s/xbmcCmds/xbmcHttp/?%s' % (host, enc_command)
|
||
|
try:
|
||
|
req = urllib.request.Request(url)
|
||
|
if password:
|
||
|
req.add_header('Authorization', 'Basic %s' % b64encodestring('%s:%s' % (username, password)))
|
||
|
self._log_debug(u'Contacting (with auth header) via url: ' + url)
|
||
|
else:
|
||
|
self._log_debug(u'Contacting via url: ' + url)
|
||
|
|
||
|
http_response_obj = urllib.request.urlopen(req) # PY2 http_response_obj has no `with` context manager
|
||
|
result = decode_str(http_response_obj.read(), sickgear.SYS_ENCODING)
|
||
|
http_response_obj.close()
|
||
|
|
||
|
self._log_debug(u'HTTP response: ' + result.replace('\n', ''))
|
||
|
return True
|
||
|
|
||
|
except (urllib.error.URLError, IOError) as e:
|
||
|
self._log_warning(u'Couldn\'t contact Plex at ' + fixStupidEncodings(url) + ' ' + ex(e))
|
||
|
return False
|
||
|
|
||
|
@staticmethod
|
||
|
def _get_host_list(host='', enable_secure=False):
|
||
|
"""
|
||
|
Return a list of hosts from a host CSV string
|
||
|
"""
|
||
|
host_list = []
|
||
|
|
||
|
user_list = [x.strip().lower() for x in host.split(',')]
|
||
|
for cur_host in user_list:
|
||
|
if cur_host.startswith('https://'):
|
||
|
host_list += ([], [cur_host])[enable_secure]
|
||
|
else:
|
||
|
host_list += ([], ['https://%s' % cur_host])[enable_secure] + ['http://%s' % cur_host]
|
||
|
|
||
|
return host_list
|
||
|
|
||
|
def _notify(self, title, body, host=None, username=None, password=None, **kwargs):
|
||
|
"""Internal wrapper for the notify_snatch and notify_download functions
|
||
|
|
||
|
Args:
|
||
|
title: Title of the notice to send
|
||
|
body: Message body of the notice to send
|
||
|
host: Plex Media Client(s) host:port
|
||
|
username: Plex username
|
||
|
password: Plex password
|
||
|
|
||
|
Returns:
|
||
|
Returns a test result string for ui output while testing, otherwise True if all tests are a success
|
||
|
"""
|
||
|
host = self._choose(host, sickgear.PLEX_HOST)
|
||
|
username = self._choose(username, sickgear.PLEX_USERNAME)
|
||
|
password = self._choose(password, sickgear.PLEX_PASSWORD)
|
||
|
|
||
|
command = {'command': 'ExecBuiltIn',
|
||
|
'parameter': 'Notification(%s,%s)' % (title.encode('utf-8'), body.encode('utf-8'))}
|
||
|
|
||
|
results = []
|
||
|
for cur_host in [x.strip() for x in host.split(',')]:
|
||
|
cur_host = unquote_plus(cur_host)
|
||
|
self._log(u'Sending notification to \'%s\'' % cur_host)
|
||
|
result = self._send_to_plex(command, cur_host, username, password)
|
||
|
results += [self._choose(('%s Plex client ... %s' % (('Successful test notice sent to',
|
||
|
'Failed test for')[not result], cur_host)), result)]
|
||
|
|
||
|
return self._choose('<br>\n'.join(results), all(results))
|
||
|
|
||
|
##############################################################################
|
||
|
# Public functions
|
||
|
##############################################################################
|
||
|
|
||
|
def notify_git_update(self, new_version='??', **kwargs):
|
||
|
# ensure PMS is setup, this is not for when clients are
|
||
|
if sickgear.PLEX_HOST:
|
||
|
super(PLEXNotifier, self).notify_git_update(new_version, **kwargs)
|
||
|
|
||
|
def test_update_library(self, host=None, username=None, password=None):
|
||
|
self._testing = True
|
||
|
result = self.update_library(host=unquote_plus(host), username=username, password=password)
|
||
|
if '<br>' == result:
|
||
|
result += 'Fail: No valid host set to connect with'
|
||
|
return (('Test result for', 'Successful test of')['Fail' not in result]
|
||
|
+ ' Plex server(s) ... %s<br>\n' % result)
|
||
|
|
||
|
def update_library(self, ep_obj=None, host=None, username=None, password=None, location=None, **kwargs):
|
||
|
"""Handles updating the Plex Media Server host via HTTP API
|
||
|
|
||
|
Plex Media Server currently only supports updating the whole video library and not a specific path.
|
||
|
|
||
|
Returns:
|
||
|
Returns None for no issue, else a string of host with connection issues
|
||
|
|
||
|
"""
|
||
|
host = self._choose(host, sickgear.PLEX_SERVER_HOST)
|
||
|
if not host:
|
||
|
msg = u'No Plex Media Server host specified, check your settings'
|
||
|
self._log_debug(msg)
|
||
|
return '%sFail: %s' % (('', '<br>')[self._testing], msg)
|
||
|
|
||
|
username = self._choose(username, sickgear.PLEX_USERNAME)
|
||
|
password = self._choose(password, sickgear.PLEX_PASSWORD)
|
||
|
|
||
|
# if username and password were provided, fetch the auth token from plex.tv
|
||
|
token_arg = None
|
||
|
if username and password:
|
||
|
|
||
|
self._log_debug(u'Fetching plex.tv credentials for user: ' + username)
|
||
|
req = urllib.request.Request('https://plex.tv/users/sign_in.xml', data=b'')
|
||
|
req.add_header('Authorization', 'Basic %s' % b64encodestring('%s:%s' % (username, password)))
|
||
|
req.add_header('X-Plex-Device-Name', 'SickGear')
|
||
|
req.add_header('X-Plex-Product', 'SickGear Notifier')
|
||
|
req.add_header('X-Plex-Client-Identifier', '5f48c063eaf379a565ff56c9bb2b401e')
|
||
|
req.add_header('X-Plex-Version', '1.0')
|
||
|
token_arg = False
|
||
|
|
||
|
try:
|
||
|
http_response_obj = urllib.request.urlopen(req) # PY2 http_response_obj has no `with` context manager
|
||
|
auth_tree = etree.parse(http_response_obj)
|
||
|
http_response_obj.close()
|
||
|
token = auth_tree.findall('.//authentication-token')[0].text
|
||
|
token_arg = '?X-Plex-Token=' + token
|
||
|
|
||
|
except urllib.error.URLError as e:
|
||
|
self._log(u'Error fetching credentials from plex.tv for user %s: %s' % (username, ex(e)))
|
||
|
|
||
|
except (ValueError, IndexError) as e:
|
||
|
self._log(u'Error parsing plex.tv response: ' + ex(e))
|
||
|
|
||
|
file_location = location if None is not location else '' if None is ep_obj else ep_obj.location
|
||
|
host_validate = self._get_host_list(host, all([token_arg]))
|
||
|
hosts_all = {}
|
||
|
hosts_match = {}
|
||
|
hosts_failed = []
|
||
|
for cur_host in host_validate:
|
||
|
response = sickgear.helpers.get_url(
|
||
|
'%s/library/sections%s' % (cur_host, token_arg or ''), timeout=10,
|
||
|
mute_connect_err=True, mute_read_timeout=True, mute_connect_timeout=True)
|
||
|
if response:
|
||
|
response = sickgear.helpers.parse_xml(response)
|
||
|
if None is response or not len(response):
|
||
|
hosts_failed.append(cur_host)
|
||
|
continue
|
||
|
|
||
|
sections = response.findall('.//Directory')
|
||
|
if not sections:
|
||
|
self._log(u'Plex Media Server not running on: ' + cur_host)
|
||
|
hosts_failed.append(cur_host)
|
||
|
continue
|
||
|
|
||
|
for section in filter_iter(lambda x: 'show' == x.attrib['type'], sections):
|
||
|
if str(section.attrib['key']) in hosts_all:
|
||
|
continue
|
||
|
keyed_host = [(str(section.attrib['key']), cur_host)]
|
||
|
hosts_all.update(keyed_host)
|
||
|
if not file_location:
|
||
|
continue
|
||
|
|
||
|
for section_location in section.findall('.//Location'):
|
||
|
section_path = re.sub(r'[/\\]+', '/', section_location.attrib['path'].lower())
|
||
|
section_path = re.sub(r'^(.{,2})[/\\]', '', section_path)
|
||
|
location_path = re.sub(r'[/\\]+', '/', file_location.lower())
|
||
|
location_path = re.sub(r'^(.{,2})[/\\]', '', location_path)
|
||
|
|
||
|
if section_path in location_path:
|
||
|
hosts_match.update(keyed_host)
|
||
|
break
|
||
|
|
||
|
if not self._testing:
|
||
|
hosts_try = (hosts_all.copy(), hosts_match.copy())[any(hosts_match)]
|
||
|
host_list = []
|
||
|
for section_key, cur_host in iteritems(hosts_try):
|
||
|
refresh_result = None
|
||
|
if not self._testing:
|
||
|
refresh_result = sickgear.helpers.get_url(
|
||
|
'%s/library/sections/%s/refresh%s' % (cur_host, section_key, token_arg or ''))
|
||
|
if (not self._testing and '' == refresh_result) or self._testing:
|
||
|
host_list.append(cur_host)
|
||
|
else:
|
||
|
hosts_failed.append(cur_host)
|
||
|
self._log_error(u'Error updating library section for Plex Media Server: %s' % cur_host)
|
||
|
|
||
|
if len(hosts_failed) == len(host_validate):
|
||
|
self._log(u'No successful Plex host updated')
|
||
|
return 'Fail no successful Plex host updated: %s' % ', '.join([host for host in hosts_failed])
|
||
|
else:
|
||
|
hosts = ', '.join(set(host_list))
|
||
|
if len(hosts_match):
|
||
|
self._log(u'Hosts updating where TV section paths match the downloaded show: %s' % hosts)
|
||
|
else:
|
||
|
self._log(u'Updating all hosts with TV sections: %s' % hosts)
|
||
|
return ''
|
||
|
|
||
|
hosts = [
|
||
|
host.replace('http://', '') for host in filter_iter(lambda x: x.startswith('http:'),
|
||
|
list_values(hosts_all))]
|
||
|
secured = [
|
||
|
host.replace('https://', '') for host in filter_iter(lambda x: x.startswith('https:'),
|
||
|
list_values(hosts_all))]
|
||
|
failed = ', '.join([
|
||
|
host.replace('http://', '') for host in filter_iter(lambda x: x.startswith('http:'),
|
||
|
hosts_failed)])
|
||
|
failed_secured = ', '.join(filter_iter(
|
||
|
lambda x: x not in hosts,
|
||
|
[host.replace('https://', '') for host in filter_iter(lambda x: x.startswith('https:'),
|
||
|
hosts_failed)]))
|
||
|
|
||
|
return '<br>' + '<br>'.join([result for result in [
|
||
|
('', 'Fail: username/password when fetching credentials from plex.tv')[False is token_arg],
|
||
|
('', 'OK (secure connect): %s' % ', '.join(secured))[any(secured)],
|
||
|
('', 'OK%s: %s' % ((' (legacy connect)', '')[None is token_arg], ', '.join(hosts)))[any(hosts)],
|
||
|
('', 'Fail (secure connect): %s' % failed_secured)[any(failed_secured)],
|
||
|
('', 'Fail%s: %s' % ((' (legacy connect)', '')[None is token_arg], failed))[bool(failed)]] if result])
|
||
|
|
||
|
|
||
|
notifier = PLEXNotifier
|