mirror of
https://github.com/SickGear/SickGear.git
synced 2024-12-18 16:53:38 +00:00
518 lines
14 KiB
Python
518 lines
14 KiB
Python
# Copyright: 2013 Paul Traylor
|
|
# These sources are released under the terms of the MIT license: see LICENSE
|
|
|
|
import hashlib
|
|
import re
|
|
import time
|
|
|
|
from . import shim
|
|
from . import errors as errors
|
|
|
|
__all__ = [
|
|
'GNTPRegister',
|
|
'GNTPNotice',
|
|
'GNTPSubscribe',
|
|
'GNTPOK',
|
|
'GNTPError',
|
|
'parse_gntp',
|
|
]
|
|
|
|
# GNTP/<version> <messagetype> <encryptionAlgorithmID>[:<ivValue>][ <keyHashAlgorithmID>:<keyHash>.<salt>]
|
|
GNTP_INFO_LINE = re.compile(
|
|
r'GNTP/(?P<version>\d+\.\d+) (?P<messagetype>REGISTER|NOTIFY|SUBSCRIBE|\-OK|\-ERROR)' +
|
|
' (?P<encryptionAlgorithmID>[A-Z0-9]+(:(?P<ivValue>[A-F0-9]+))?) ?' +
|
|
'((?P<keyHashAlgorithmID>[A-Z0-9]+):(?P<keyHash>[A-F0-9]+).(?P<salt>[A-F0-9]+))?\r\n',
|
|
re.IGNORECASE
|
|
)
|
|
|
|
GNTP_INFO_LINE_SHORT = re.compile(
|
|
r'GNTP/(?P<version>\d+\.\d+) (?P<messagetype>REGISTER|NOTIFY|SUBSCRIBE|\-OK|\-ERROR)',
|
|
re.IGNORECASE
|
|
)
|
|
|
|
GNTP_HEADER = re.compile(r'([\w-]+):(.+)')
|
|
|
|
GNTP_EOL = shim.b('\r\n')
|
|
GNTP_SEP = shim.b(': ')
|
|
|
|
|
|
class _GNTPBuffer(shim.StringIO):
|
|
"""GNTP Buffer class"""
|
|
def writeln(self, value=None):
|
|
if value:
|
|
self.write(shim.b(value))
|
|
self.write(GNTP_EOL)
|
|
|
|
def writeheader(self, key, value):
|
|
if not isinstance(value, str):
|
|
value = str(value)
|
|
self.write(shim.b(key))
|
|
self.write(GNTP_SEP)
|
|
self.write(shim.b(value))
|
|
self.write(GNTP_EOL)
|
|
|
|
|
|
class _GNTPBase(object):
|
|
"""Base initilization
|
|
|
|
:param string messagetype: GNTP Message type
|
|
:param string version: GNTP Protocol version
|
|
:param string encription: Encryption protocol
|
|
"""
|
|
def __init__(self, messagetype=None, version='1.0', encryption=None):
|
|
self.info = {
|
|
'version': version,
|
|
'messagetype': messagetype,
|
|
'encryptionAlgorithmID': encryption
|
|
}
|
|
self.hash_algo = {
|
|
'MD5': hashlib.md5,
|
|
'SHA1': hashlib.sha1,
|
|
'SHA256': hashlib.sha256,
|
|
'SHA512': hashlib.sha512,
|
|
}
|
|
self.headers = {}
|
|
self.resources = {}
|
|
|
|
# For Python2 we can just return the bytes as is without worry
|
|
# but on Python3 we want to make sure we return the packet as
|
|
# a unicode string so that things like logging won't get confused
|
|
if shim.PY2:
|
|
def __str__(self):
|
|
return self.encode()
|
|
else:
|
|
def __str__(self):
|
|
return shim.u(self.encode())
|
|
|
|
def _parse_info(self, data):
|
|
"""Parse the first line of a GNTP message to get security and other info values
|
|
|
|
:param string data: GNTP Message
|
|
:return dict: Parsed GNTP Info line
|
|
"""
|
|
|
|
match = GNTP_INFO_LINE.match(data)
|
|
|
|
if not match:
|
|
raise errors.ParseError('ERROR_PARSING_INFO_LINE')
|
|
|
|
info = match.groupdict()
|
|
if info['encryptionAlgorithmID'] == 'NONE':
|
|
info['encryptionAlgorithmID'] = None
|
|
|
|
return info
|
|
|
|
def set_password(self, password, encryptAlgo='MD5'):
|
|
"""Set a password for a GNTP Message
|
|
|
|
:param string password: Null to clear password
|
|
:param string encryptAlgo: Supports MD5, SHA1, SHA256, SHA512
|
|
"""
|
|
if not password:
|
|
self.info['encryptionAlgorithmID'] = None
|
|
self.info['keyHashAlgorithm'] = None
|
|
return
|
|
|
|
self.password = shim.b(password)
|
|
self.encryptAlgo = encryptAlgo.upper()
|
|
|
|
if not self.encryptAlgo in self.hash_algo:
|
|
raise errors.UnsupportedError('INVALID HASH "%s"' % self.encryptAlgo)
|
|
|
|
hashfunction = self.hash_algo.get(self.encryptAlgo)
|
|
|
|
password = password.encode('utf8')
|
|
seed = time.ctime().encode('utf8')
|
|
salt = hashfunction(seed).hexdigest()
|
|
saltHash = hashfunction(seed).digest()
|
|
keyBasis = password + saltHash
|
|
key = hashfunction(keyBasis).digest()
|
|
keyHash = hashfunction(key).hexdigest()
|
|
|
|
self.info['keyHashAlgorithmID'] = self.encryptAlgo
|
|
self.info['keyHash'] = keyHash.upper()
|
|
self.info['salt'] = salt.upper()
|
|
|
|
def _decode_hex(self, value):
|
|
"""Helper function to decode hex string to `proper` hex string
|
|
|
|
:param string value: Human readable hex string
|
|
:return string: Hex string
|
|
"""
|
|
result = ''
|
|
for i in range(0, len(value), 2):
|
|
tmp = int(value[i:i + 2], 16)
|
|
result += chr(tmp)
|
|
return result
|
|
|
|
def _decode_binary(self, rawIdentifier, identifier):
|
|
rawIdentifier += '\r\n\r\n'
|
|
dataLength = int(identifier['Length'])
|
|
pointerStart = self.raw.find(rawIdentifier) + len(rawIdentifier)
|
|
pointerEnd = pointerStart + dataLength
|
|
data = self.raw[pointerStart:pointerEnd]
|
|
if not len(data) == dataLength:
|
|
raise errors.ParseError('INVALID_DATA_LENGTH Expected: %s Recieved %s' % (dataLength, len(data)))
|
|
return data
|
|
|
|
def _validate_password(self, password):
|
|
"""Validate GNTP Message against stored password"""
|
|
self.password = password
|
|
if password is None:
|
|
raise errors.AuthError('Missing password')
|
|
keyHash = self.info.get('keyHash', None)
|
|
if keyHash is None and self.password is None:
|
|
return True
|
|
if keyHash is None:
|
|
raise errors.AuthError('Invalid keyHash')
|
|
if self.password is None:
|
|
raise errors.AuthError('Missing password')
|
|
|
|
keyHashAlgorithmID = self.info.get('keyHashAlgorithmID','MD5')
|
|
|
|
password = self.password.encode('utf8')
|
|
saltHash = self._decode_hex(self.info['salt'])
|
|
|
|
keyBasis = password + saltHash
|
|
self.key = self.hash_algo[keyHashAlgorithmID](keyBasis).digest()
|
|
keyHash = self.hash_algo[keyHashAlgorithmID](self.key).hexdigest()
|
|
|
|
if not keyHash.upper() == self.info['keyHash'].upper():
|
|
raise errors.AuthError('Invalid Hash')
|
|
return True
|
|
|
|
def validate(self):
|
|
"""Verify required headers"""
|
|
for header in self._requiredHeaders:
|
|
if not self.headers.get(header, False):
|
|
raise errors.ParseError('Missing Notification Header: ' + header)
|
|
|
|
def _format_info(self):
|
|
"""Generate info line for GNTP Message
|
|
|
|
:return string:
|
|
"""
|
|
info = 'GNTP/%s %s' % (
|
|
self.info.get('version'),
|
|
self.info.get('messagetype'),
|
|
)
|
|
if self.info.get('encryptionAlgorithmID', None):
|
|
info += ' %s:%s' % (
|
|
self.info.get('encryptionAlgorithmID'),
|
|
self.info.get('ivValue'),
|
|
)
|
|
else:
|
|
info += ' NONE'
|
|
|
|
if self.info.get('keyHashAlgorithmID', None):
|
|
info += ' %s:%s.%s' % (
|
|
self.info.get('keyHashAlgorithmID'),
|
|
self.info.get('keyHash'),
|
|
self.info.get('salt')
|
|
)
|
|
|
|
return info
|
|
|
|
def _parse_dict(self, data):
|
|
"""Helper function to parse blocks of GNTP headers into a dictionary
|
|
|
|
:param string data:
|
|
:return dict: Dictionary of parsed GNTP Headers
|
|
"""
|
|
d = {}
|
|
for line in data.split('\r\n'):
|
|
match = GNTP_HEADER.match(line)
|
|
if not match:
|
|
continue
|
|
|
|
key = match.group(1).strip()
|
|
val = match.group(2).strip()
|
|
d[key] = val
|
|
return d
|
|
|
|
def add_header(self, key, value):
|
|
self.headers[key] = value
|
|
|
|
def add_resource(self, data):
|
|
"""Add binary resource
|
|
|
|
:param string data: Binary Data
|
|
"""
|
|
data = shim.b(data)
|
|
identifier = hashlib.md5(data).hexdigest()
|
|
self.resources[identifier] = data
|
|
return 'x-growl-resource://%s' % identifier
|
|
|
|
def decode(self, data, password=None):
|
|
"""Decode GNTP Message
|
|
|
|
:param string data:
|
|
"""
|
|
self.password = password
|
|
self.raw = shim.u(data)
|
|
parts = self.raw.split('\r\n\r\n')
|
|
self.info = self._parse_info(self.raw)
|
|
self.headers = self._parse_dict(parts[0])
|
|
|
|
def encode(self):
|
|
"""Encode a generic GNTP Message
|
|
|
|
:return string: GNTP Message ready to be sent. Returned as a byte string
|
|
"""
|
|
|
|
buff = _GNTPBuffer()
|
|
|
|
buff.writeln(self._format_info())
|
|
|
|
#Headers
|
|
for k, v in self.headers.items():
|
|
buff.writeheader(k, v)
|
|
buff.writeln()
|
|
|
|
#Resources
|
|
for resource, data in self.resources.items():
|
|
buff.writeheader('Identifier', resource)
|
|
buff.writeheader('Length', len(data))
|
|
buff.writeln()
|
|
buff.write(data)
|
|
buff.writeln()
|
|
buff.writeln()
|
|
|
|
return buff.getvalue()
|
|
|
|
|
|
class GNTPRegister(_GNTPBase):
|
|
"""Represents a GNTP Registration Command
|
|
|
|
:param string data: (Optional) See decode()
|
|
:param string password: (Optional) Password to use while encoding/decoding messages
|
|
"""
|
|
_requiredHeaders = [
|
|
'Application-Name',
|
|
'Notifications-Count'
|
|
]
|
|
_requiredNotificationHeaders = ['Notification-Name']
|
|
|
|
def __init__(self, data=None, password=None):
|
|
_GNTPBase.__init__(self, 'REGISTER')
|
|
self.notifications = []
|
|
|
|
if data:
|
|
self.decode(data, password)
|
|
else:
|
|
self.set_password(password)
|
|
self.add_header('Application-Name', 'pygntp')
|
|
self.add_header('Notifications-Count', 0)
|
|
|
|
def validate(self):
|
|
'''Validate required headers and validate notification headers'''
|
|
for header in self._requiredHeaders:
|
|
if False is self.headers.get(header, False):
|
|
raise errors.ParseError('Missing Registration Header: ' + header)
|
|
for notice in self.notifications:
|
|
for header in self._requiredNotificationHeaders:
|
|
if not notice.get(header, False):
|
|
raise errors.ParseError('Missing Notification Header: ' + header)
|
|
|
|
def decode(self, data, password):
|
|
"""Decode existing GNTP Registration message
|
|
|
|
:param string data: Message to decode
|
|
"""
|
|
self.raw = shim.u(data)
|
|
parts = self.raw.split('\r\n\r\n')
|
|
self.info = self._parse_info(self.raw)
|
|
self._validate_password(password)
|
|
self.headers = self._parse_dict(parts[0])
|
|
|
|
for i, part in enumerate(parts):
|
|
if i == 0:
|
|
continue # Skip Header
|
|
if part.strip() == '':
|
|
continue
|
|
notice = self._parse_dict(part)
|
|
if notice.get('Notification-Name', False):
|
|
self.notifications.append(notice)
|
|
elif notice.get('Identifier', False):
|
|
notice['Data'] = self._decode_binary(part, notice)
|
|
#open('register.png','wblol').write(notice['Data'])
|
|
self.resources[notice.get('Identifier')] = notice
|
|
|
|
def add_notification(self, name, enabled=True):
|
|
"""Add new Notification to Registration message
|
|
|
|
:param string name: Notification Name
|
|
:param boolean enabled: Enable this notification by default
|
|
"""
|
|
notice = {}
|
|
notice['Notification-Name'] = name
|
|
notice['Notification-Enabled'] = enabled
|
|
|
|
self.notifications.append(notice)
|
|
self.add_header('Notifications-Count', len(self.notifications))
|
|
|
|
def encode(self):
|
|
"""Encode a GNTP Registration Message
|
|
|
|
:return string: Encoded GNTP Registration message. Returned as a byte string
|
|
"""
|
|
|
|
buff = _GNTPBuffer()
|
|
|
|
buff.writeln(self._format_info())
|
|
|
|
#Headers
|
|
for k, v in self.headers.items():
|
|
buff.writeheader(k, v)
|
|
buff.writeln()
|
|
|
|
#Notifications
|
|
if len(self.notifications) > 0:
|
|
for notice in self.notifications:
|
|
for k, v in notice.items():
|
|
buff.writeheader(k, v)
|
|
buff.writeln()
|
|
|
|
#Resources
|
|
for resource, data in self.resources.items():
|
|
buff.writeheader('Identifier', resource)
|
|
buff.writeheader('Length', len(data))
|
|
buff.writeln()
|
|
buff.write(data)
|
|
buff.writeln()
|
|
buff.writeln()
|
|
|
|
return buff.getvalue()
|
|
|
|
|
|
class GNTPNotice(_GNTPBase):
|
|
"""Represents a GNTP Notification Command
|
|
|
|
:param string data: (Optional) See decode()
|
|
:param string app: (Optional) Set Application-Name
|
|
:param string name: (Optional) Set Notification-Name
|
|
:param string title: (Optional) Set Notification Title
|
|
:param string password: (Optional) Password to use while encoding/decoding messages
|
|
"""
|
|
_requiredHeaders = [
|
|
'Application-Name',
|
|
'Notification-Name',
|
|
'Notification-Title'
|
|
]
|
|
|
|
def __init__(self, data=None, app=None, name=None, title=None, password=None):
|
|
_GNTPBase.__init__(self, 'NOTIFY')
|
|
|
|
if data:
|
|
self.decode(data, password)
|
|
else:
|
|
self.set_password(password)
|
|
if app:
|
|
self.add_header('Application-Name', app)
|
|
if name:
|
|
self.add_header('Notification-Name', name)
|
|
if title:
|
|
self.add_header('Notification-Title', title)
|
|
|
|
def decode(self, data, password):
|
|
"""Decode existing GNTP Notification message
|
|
|
|
:param string data: Message to decode.
|
|
"""
|
|
self.raw = shim.u(data)
|
|
parts = self.raw.split('\r\n\r\n')
|
|
self.info = self._parse_info(self.raw)
|
|
self._validate_password(password)
|
|
self.headers = self._parse_dict(parts[0])
|
|
|
|
for i, part in enumerate(parts):
|
|
if i == 0:
|
|
continue # Skip Header
|
|
if part.strip() == '':
|
|
continue
|
|
notice = self._parse_dict(part)
|
|
if notice.get('Identifier', False):
|
|
notice['Data'] = self._decode_binary(part, notice)
|
|
#open('notice.png','wblol').write(notice['Data'])
|
|
self.resources[notice.get('Identifier')] = notice
|
|
|
|
|
|
class GNTPSubscribe(_GNTPBase):
|
|
"""Represents a GNTP Subscribe Command
|
|
|
|
:param string data: (Optional) See decode()
|
|
:param string password: (Optional) Password to use while encoding/decoding messages
|
|
"""
|
|
_requiredHeaders = [
|
|
'Subscriber-ID',
|
|
'Subscriber-Name',
|
|
]
|
|
|
|
def __init__(self, data=None, password=None):
|
|
_GNTPBase.__init__(self, 'SUBSCRIBE')
|
|
if data:
|
|
self.decode(data, password)
|
|
else:
|
|
self.set_password(password)
|
|
|
|
|
|
class GNTPOK(_GNTPBase):
|
|
"""Represents a GNTP OK Response
|
|
|
|
:param string data: (Optional) See _GNTPResponse.decode()
|
|
:param string action: (Optional) Set type of action the OK Response is for
|
|
"""
|
|
_requiredHeaders = ['Response-Action']
|
|
|
|
def __init__(self, data=None, action=None):
|
|
_GNTPBase.__init__(self, '-OK')
|
|
if data:
|
|
self.decode(data)
|
|
if action:
|
|
self.add_header('Response-Action', action)
|
|
|
|
|
|
class GNTPError(_GNTPBase):
|
|
"""Represents a GNTP Error response
|
|
|
|
:param string data: (Optional) See _GNTPResponse.decode()
|
|
:param string errorcode: (Optional) Error code
|
|
:param string errordesc: (Optional) Error Description
|
|
"""
|
|
_requiredHeaders = ['Error-Code', 'Error-Description']
|
|
|
|
def __init__(self, data=None, errorcode=None, errordesc=None):
|
|
_GNTPBase.__init__(self, '-ERROR')
|
|
if data:
|
|
self.decode(data)
|
|
if errorcode:
|
|
self.add_header('Error-Code', errorcode)
|
|
self.add_header('Error-Description', errordesc)
|
|
|
|
def error(self):
|
|
return (self.headers.get('Error-Code', None),
|
|
self.headers.get('Error-Description', None))
|
|
|
|
|
|
def parse_gntp(data, password=None):
|
|
"""Attempt to parse a message as a GNTP message
|
|
|
|
:param string data: Message to be parsed
|
|
:param string password: Optional password to be used to verify the message
|
|
"""
|
|
data = shim.u(data)
|
|
match = GNTP_INFO_LINE_SHORT.match(data)
|
|
if not match:
|
|
raise errors.ParseError('INVALID_GNTP_INFO')
|
|
info = match.groupdict()
|
|
if info['messagetype'] == 'REGISTER':
|
|
return GNTPRegister(data, password=password)
|
|
elif info['messagetype'] == 'NOTIFY':
|
|
return GNTPNotice(data, password=password)
|
|
elif info['messagetype'] == 'SUBSCRIBE':
|
|
return GNTPSubscribe(data, password=password)
|
|
elif info['messagetype'] == '-OK':
|
|
return GNTPOK(data)
|
|
elif info['messagetype'] == '-ERROR':
|
|
return GNTPError(data)
|
|
raise errors.ParseError('INVALID_GNTP_MESSAGE')
|