# -*- coding: utf-8 -*- # BSD 2-Clause License # # Apprise - Push Notification Library. # Copyright (c) 2024, Chris Caron # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are met: # # 1. Redistributions of source code must retain the above copyright notice, # this list of conditions and the following disclaimer. # # 2. Redistributions in binary form must reproduce the above copyright notice, # this list of conditions and the following disclaimer in the documentation # and/or other materials provided with the distribution. # # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # POSSIBILITY OF SUCH DAMAGE. # Great sources # - https://github.com/matrix-org/matrix-python-sdk # - https://github.com/matrix-org/synapse/blob/master/docs/reverse_proxy.rst # import re import requests from markdown import markdown from json import dumps from json import loads from time import time from .base import NotifyBase from ..url import PrivacyMode from ..common import NotifyType from ..common import NotifyImageSize from ..common import NotifyFormat from ..utils import parse_bool from ..utils import parse_list from ..utils import is_hostname from ..utils import validate_regex from ..locale import gettext_lazy as _ # Define default path MATRIX_V1_WEBHOOK_PATH = '/api/v1/matrix/hook' MATRIX_V2_API_PATH = '/_matrix/client/r0' MATRIX_V3_API_PATH = '/_matrix/client/v3' MATRIX_V3_MEDIA_PATH = '/_matrix/media/v3' MATRIX_V2_MEDIA_PATH = '/_matrix/media/r0' # Extend HTTP Error Messages MATRIX_HTTP_ERROR_MAP = { 403: 'Unauthorized - Invalid Token.', 429: 'Rate limit imposed; wait 2s and try again', } # Matrix Room Syntax IS_ROOM_ALIAS = re.compile( r'^\s*(#|%23)?(?P[a-z0-9-]+)((:|%3A)' r'(?P[a-z0-9.-]+))?\s*$', re.I) # Room ID MUST start with an exclamation to avoid ambiguity IS_ROOM_ID = re.compile( r'^\s*(!|!|%21)(?P[a-z0-9-]+)((:|%3A)' r'(?P[a-z0-9.-]+))?\s*$', re.I) class MatrixMessageType: """ The Matrix Message types """ TEXT = "text" NOTICE = "notice" # matrix message types are placed into this list for validation purposes MATRIX_MESSAGE_TYPES = ( MatrixMessageType.TEXT, MatrixMessageType.NOTICE, ) class MatrixVersion: # Version 2 V2 = "2" # Version 3 V3 = "3" # webhook modes are placed into this list for validation purposes MATRIX_VERSIONS = ( MatrixVersion.V2, MatrixVersion.V3, ) class MatrixWebhookMode: # Webhook Mode is disabled DISABLED = "off" # The default webhook mode is to just be set to Matrix MATRIX = "matrix" # Support the slack webhook plugin SLACK = "slack" # Support the t2bot webhook plugin T2BOT = "t2bot" # webhook modes are placed into this list for validation purposes MATRIX_WEBHOOK_MODES = ( MatrixWebhookMode.DISABLED, MatrixWebhookMode.MATRIX, MatrixWebhookMode.SLACK, MatrixWebhookMode.T2BOT, ) class NotifyMatrix(NotifyBase): """ A wrapper for Matrix Notifications """ # The default descriptive name associated with the Notification service_name = 'Matrix' # The services URL service_url = 'https://matrix.org/' # The default protocol protocol = 'matrix' # The default secure protocol secure_protocol = 'matrixs' # Support Attachments attachment_support = True # A URL that takes you to the setup/help of the specific protocol setup_url = 'https://github.com/caronc/apprise/wiki/Notify_matrix' # Allows the user to specify the NotifyImageSize object image_size = NotifyImageSize.XY_32 # The maximum allowable characters allowed in the body per message # https://spec.matrix.org/v1.6/client-server-api/#size-limits # The complete event MUST NOT be larger than 65536 bytes, when formatted # with the federation event format, including any signatures, and encoded # as Canonical JSON. # # To gracefully allow for some overhead' we'll define a max body length # of just slighty lower then the limit of the full message itself. body_maxlen = 65000 # Throttle a wee-bit to avoid thrashing request_rate_per_sec = 0.5 # Our Matrix API Version matrix_api_version = '3' # How many retry attempts we'll make in the event the server asks us to # throttle back. default_retries = 2 # The number of micro seconds to wait if we get a 429 error code and # the server doesn't remind us how long we shoul wait for default_wait_ms = 1000 # Define object templates templates = ( # Targets are ignored when using t2bot mode; only a token is required '{schema}://{token}', '{schema}://{user}@{token}', # Disabled webhook '{schema}://{user}:{password}@{host}/{targets}', '{schema}://{user}:{password}@{host}:{port}/{targets}', # Webhook mode '{schema}://{user}:{token}@{host}/{targets}', '{schema}://{user}:{token}@{host}:{port}/{targets}', ) # Define our template tokens template_tokens = dict(NotifyBase.template_tokens, **{ 'host': { 'name': _('Hostname'), 'type': 'string', }, 'port': { 'name': _('Port'), 'type': 'int', 'min': 1, 'max': 65535, }, 'user': { 'name': _('Username'), 'type': 'string', }, 'password': { 'name': _('Password'), 'type': 'string', 'private': True, }, 'token': { 'name': _('Access Token'), 'private': True, 'map_to': 'password', }, 'target_user': { 'name': _('Target User'), 'type': 'string', 'prefix': '@', 'map_to': 'targets', }, 'target_room_id': { 'name': _('Target Room ID'), 'type': 'string', 'prefix': '!', 'map_to': 'targets', }, 'target_room_alias': { 'name': _('Target Room Alias'), 'type': 'string', 'prefix': '!', 'map_to': 'targets', }, 'targets': { 'name': _('Targets'), 'type': 'list:string', }, }) # Define our template arguments template_args = dict(NotifyBase.template_args, **{ 'image': { 'name': _('Include Image'), 'type': 'bool', 'default': False, 'map_to': 'include_image', }, 'mode': { 'name': _('Webhook Mode'), 'type': 'choice:string', 'values': MATRIX_WEBHOOK_MODES, 'default': MatrixWebhookMode.DISABLED, }, 'version': { 'name': _('Matrix API Verion'), 'type': 'choice:string', 'values': MATRIX_VERSIONS, 'default': MatrixVersion.V3, }, 'msgtype': { 'name': _('Message Type'), 'type': 'choice:string', 'values': MATRIX_MESSAGE_TYPES, 'default': MatrixMessageType.TEXT, }, 'to': { 'alias_of': 'targets', }, 'token': { 'alias_of': 'token', }, }) def __init__(self, targets=None, mode=None, msgtype=None, version=None, include_image=False, **kwargs): """ Initialize Matrix Object """ super().__init__(**kwargs) # Prepare a list of rooms to connect and notify self.rooms = parse_list(targets) # our home server gets populated after a login/registration self.home_server = None # our user_id gets populated after a login/registration self.user_id = None # This gets initialized after a login/registration self.access_token = None # This gets incremented for each request made against the v3 API self.transaction_id = 0 # Place an image inline with the message body self.include_image = include_image # maintain a lookup of room alias's we already paired with their id # to speed up future requests self._room_cache = {} # Setup our mode self.mode = self.template_args['mode']['default'] \ if not isinstance(mode, str) else mode.lower() if self.mode and self.mode not in MATRIX_WEBHOOK_MODES: msg = 'The mode specified ({}) is invalid.'.format(mode) self.logger.warning(msg) raise TypeError(msg) # Setup our version self.version = self.template_args['version']['default'] \ if not isinstance(version, str) else version if self.version not in MATRIX_VERSIONS: msg = 'The version specified ({}) is invalid.'.format(version) self.logger.warning(msg) raise TypeError(msg) # Setup our message type self.msgtype = self.template_args['msgtype']['default'] \ if not isinstance(msgtype, str) else msgtype.lower() if self.msgtype and self.msgtype not in MATRIX_MESSAGE_TYPES: msg = 'The msgtype specified ({}) is invalid.'.format(msgtype) self.logger.warning(msg) raise TypeError(msg) if self.mode == MatrixWebhookMode.T2BOT: # t2bot configuration requires that a webhook id is specified self.access_token = validate_regex( self.password, r'^[a-z0-9]{64}$', 'i') if not self.access_token: msg = 'An invalid T2Bot/Matrix Webhook ID ' \ '({}) was specified.'.format(self.password) self.logger.warning(msg) raise TypeError(msg) elif not is_hostname(self.host): msg = 'An invalid Matrix Hostname ({}) was specified'\ .format(self.host) self.logger.warning(msg) raise TypeError(msg) else: # Verify port if specified if self.port is not None and not ( isinstance(self.port, int) and self.port >= self.template_tokens['port']['min'] and self.port <= self.template_tokens['port']['max']): msg = 'An invalid Matrix Port ({}) was specified'\ .format(self.port) self.logger.warning(msg) raise TypeError(msg) def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs): """ Perform Matrix Notification """ # Call the _send_ function applicable to whatever mode we're in # - calls _send_webhook_notification if the mode variable is set # - calls _send_server_notification if the mode variable is not set return getattr(self, '_send_{}_notification'.format( 'webhook' if self.mode != MatrixWebhookMode.DISABLED else 'server'))( body=body, title=title, notify_type=notify_type, **kwargs) def _send_webhook_notification(self, body, title='', notify_type=NotifyType.INFO, **kwargs): """ Perform Matrix Notification as a webhook """ headers = { 'User-Agent': self.app_id, 'Content-Type': 'application/json', } if self.mode != MatrixWebhookMode.T2BOT: # Acquire our access token from our URL access_token = self.password if self.password else self.user default_port = 443 if self.secure else 80 # Prepare our URL url = '{schema}://{hostname}:{port}{webhook_path}/{token}'.format( schema='https' if self.secure else 'http', hostname=self.host, port='' if self.port is None or self.port == default_port else self.port, webhook_path=MATRIX_V1_WEBHOOK_PATH, token=access_token, ) else: # # t2bot Setup # # Prepare our URL url = 'https://webhooks.t2bot.io/api/v1/matrix/hook/' \ '{token}'.format(token=self.access_token) # Retrieve our payload payload = getattr(self, '_{}_webhook_payload'.format(self.mode))( body=body, title=title, notify_type=notify_type, **kwargs) self.logger.debug('Matrix POST URL: %s (cert_verify=%r)' % ( url, self.verify_certificate, )) self.logger.debug('Matrix Payload: %s' % str(payload)) # Always call throttle before any remote server i/o is made self.throttle() try: r = requests.post( url, data=dumps(payload), headers=headers, verify=self.verify_certificate, timeout=self.request_timeout, ) if r.status_code != requests.codes.ok: # We had a problem status_str = \ NotifyMatrix.http_response_code_lookup( r.status_code, MATRIX_HTTP_ERROR_MAP) self.logger.warning( 'Failed to send Matrix notification: ' '{}{}error={}.'.format( status_str, ', ' if status_str else '', r.status_code)) self.logger.debug('Response Details:\r\n{}'.format(r.content)) # Return; we're done return False else: self.logger.info('Sent Matrix notification.') except requests.RequestException as e: self.logger.warning( 'A Connection error occurred sending Matrix notification.' ) self.logger.debug('Socket Exception: %s' % str(e)) # Return; we're done return False return True def _slack_webhook_payload(self, body, title='', notify_type=NotifyType.INFO, **kwargs): """ Format the payload for a Slack based message """ if not hasattr(self, '_re_slack_formatting_rules'): # Prepare some one-time slack formatting variables self._re_slack_formatting_map = { # New lines must become the string version r'\r\*\n': '\\n', # Escape other special characters r'&': '&', r'<': '<', r'>': '>', } # Iterate over above list and store content accordingly self._re_slack_formatting_rules = re.compile( r'(' + '|'.join(self._re_slack_formatting_map.keys()) + r')', re.IGNORECASE, ) # Perform Formatting title = self._re_slack_formatting_rules.sub( # pragma: no branch lambda x: self._re_slack_formatting_map[x.group()], title, ) body = self._re_slack_formatting_rules.sub( # pragma: no branch lambda x: self._re_slack_formatting_map[x.group()], body, ) # prepare JSON Object payload = { 'username': self.user if self.user else self.app_id, # Use Markdown language 'mrkdwn': (self.notify_format == NotifyFormat.MARKDOWN), 'attachments': [{ 'title': title, 'text': body, 'color': self.color(notify_type), 'ts': time(), 'footer': self.app_id, }], } return payload def _matrix_webhook_payload(self, body, title='', notify_type=NotifyType.INFO, **kwargs): """ Format the payload for a Matrix based message """ payload = { 'displayName': self.user if self.user else self.app_id, 'format': 'plain' if self.notify_format == NotifyFormat.TEXT else 'html', 'text': '', } if self.notify_format == NotifyFormat.HTML: payload['text'] = '{title}{body}'.format( title='' if not title else '

{}

'.format( NotifyMatrix.escape_html(title)), body=body) elif self.notify_format == NotifyFormat.MARKDOWN: payload['text'] = '{title}{body}'.format( title='' if not title else '

{}

'.format( NotifyMatrix.escape_html(title)), body=markdown(body)) else: # NotifyFormat.TEXT payload['text'] = \ body if not title else '{}\r\n{}'.format(title, body) return payload def _t2bot_webhook_payload(self, body, title='', notify_type=NotifyType.INFO, **kwargs): """ Format the payload for a T2Bot Matrix based messages """ # Retrieve our payload payload = self._matrix_webhook_payload( body=body, title=title, notify_type=notify_type, **kwargs) # Acquire our image url if we're configured to do so image_url = None if not self.include_image else \ self.image_url(notify_type) if image_url: # t2bot can take an avatarUrl Entry payload['avatarUrl'] = image_url return payload def _send_server_notification(self, body, title='', notify_type=NotifyType.INFO, attach=None, **kwargs): """ Perform Direct Matrix Server Notification (no webhook) """ if self.access_token is None: # We need to register if not self._login(): if not self._register(): return False if len(self.rooms) == 0: # Attempt to retrieve a list of already joined channels self.rooms = self._joined_rooms() if len(self.rooms) == 0: # Nothing to notify self.logger.warning( 'There were no Matrix rooms specified to notify.') return False # Create a copy of our rooms to join and message rooms = list(self.rooms) # Initiaize our error tracking has_error = False attachments = None if attach and self.attachment_support: attachments = self._send_attachments(attach) if attachments is False: # take an early exit return False while len(rooms) > 0: # Get our room room = rooms.pop(0) # Get our room_id from our response room_id = self._room_join(room) if not room_id: # Notify our user about our failure self.logger.warning( 'Could not join Matrix room {}.'.format((room))) # Mark our failure has_error = True continue # Acquire our image url if we're configured to do so image_url = None if not self.include_image else \ self.image_url(notify_type) # Build our path if self.version == MatrixVersion.V3: path = '/rooms/{}/send/m.room.message/{}'.format( NotifyMatrix.quote(room_id), self.transaction_id, ) else: path = '/rooms/{}/send/m.room.message'.format( NotifyMatrix.quote(room_id)) if self.version == MatrixVersion.V2: # # Attachments don't work beyond V2 at this time # if image_url: # Define our payload image_payload = { 'msgtype': 'm.image', 'url': image_url, 'body': '{}'.format( notify_type if not title else title), } # Post our content postokay, response = self._fetch( path, payload=image_payload) if not postokay: # Mark our failure has_error = True continue if attachments: for attachment in attachments: attachment['room_id'] = room_id attachment['type'] = 'm.room.message' postokay, response = self._fetch( path, payload=attachment) if not postokay: # Mark our failure has_error = True continue # Define our payload payload = { 'msgtype': 'm.{}'.format(self.msgtype), 'body': '{title}{body}'.format( title='' if not title else '# {}\r\n'.format(title), body=body), } # Update our payload advance formatting for the services that # support them. if self.notify_format == NotifyFormat.HTML: payload.update({ 'format': 'org.matrix.custom.html', 'formatted_body': '{title}{body}'.format( title='' if not title else '

{}

'.format(title), body=body, ) }) elif self.notify_format == NotifyFormat.MARKDOWN: payload.update({ 'format': 'org.matrix.custom.html', 'formatted_body': '{title}{body}'.format( title='' if not title else '

{}

'.format( NotifyMatrix.escape_html(title, whitespace=False)), body=markdown(body), ) }) # Post our content method = 'PUT' if self.version == MatrixVersion.V3 else 'POST' postokay, response = self._fetch( path, payload=payload, method=method) # Increment the transaction ID to avoid future messages being # recognized as retransmissions and ignored if self.version == MatrixVersion.V3: self.transaction_id += 1 if not postokay: # Notify our user self.logger.warning( 'Could not send notification Matrix room {}.'.format(room)) # Mark our failure has_error = True continue return not has_error def _send_attachments(self, attach): """ Posts all of the provided attachments """ payloads = [] if self.version != MatrixVersion.V2: self.logger.warning( 'Add ?v=2 to Apprise URL to support Attachments') return next((False for a in attach if not a), []) for attachment in attach: if not attachment: # invalid attachment (bad file) return False if not re.match(r'^image/', attachment.mimetype, re.I): # unsuppored at this time continue postokay, response = \ self._fetch('/upload', attachment=attachment) if not (postokay and isinstance(response, dict)): # Failed to perform upload return False # If we get here, we'll have a response that looks like: # { # "content_uri": "mxc://example.com/a-unique-key" # } # FUTURE if self.version == MatrixVersion.V3: # FUTURE # Prepare our payload # FUTURE payloads.append({ # FUTURE "body": attachment.name, # FUTURE "info": { # FUTURE "mimetype": attachment.mimetype, # FUTURE "size": len(attachment), # FUTURE }, # FUTURE "msgtype": "m.image", # FUTURE "url": response.get('content_uri'), # FUTURE }) # FUTURE else: # FUTURE # Prepare our payload # FUTURE payloads.append({ # FUTURE "info": { # FUTURE "mimetype": attachment.mimetype, # FUTURE }, # FUTURE "msgtype": "m.image", # FUTURE "body": "tta.webp", # FUTURE "url": response.get('content_uri'), # FUTURE }) # Prepare our payload payloads.append({ "info": { "mimetype": attachment.mimetype, }, "msgtype": "m.image", "body": "tta.webp", "url": response.get('content_uri'), }) return payloads def _register(self): """ Register with the service if possible. """ # Prepare our Registration Payload. This will only work if registration # is enabled for the public payload = { 'kind': 'user', 'auth': {'type': 'm.login.dummy'}, } # parameters params = { 'kind': 'user', } # If a user is not specified, one will be randomly generated for you. # If you do not specify a password, you will be unable to login to the # account if you forget the access_token. if self.user: payload['username'] = self.user if self.password: payload['password'] = self.password # Register postokay, response = \ self._fetch('/register', payload=payload, params=params) if not (postokay and isinstance(response, dict)): # Failed to register return False # Pull the response details self.access_token = response.get('access_token') self.home_server = response.get('home_server') self.user_id = response.get('user_id') if self.access_token is not None: self.logger.debug( 'Registered successfully with Matrix server.') return True return False def _login(self): """ Acquires the matrix token required for making future requests. If we fail we return False, otherwise we return True """ if self.access_token: # Login not required; silently skip-over return True if not (self.user and self.password): # It's not possible to register since we need these 2 values to # make the action possible. self.logger.warning( 'Failed to login to Matrix server: ' 'user/pass combo is missing.') return False # Prepare our Authentication Payload if self.version == MatrixVersion.V3: payload = { 'type': 'm.login.password', 'identifier': { 'type': 'm.id.user', 'user': self.user, }, 'password': self.password, } else: payload = { 'type': 'm.login.password', 'user': self.user, 'password': self.password, } # Build our URL postokay, response = self._fetch('/login', payload=payload) if not (postokay and isinstance(response, dict)): # Failed to login return False # Pull the response details self.access_token = response.get('access_token') self.home_server = response.get('home_server') self.user_id = response.get('user_id') if not self.access_token: return False self.logger.debug( 'Authenticated successfully with Matrix server.') return True def _logout(self): """ Relinquishes token from remote server """ if not self.access_token: # Login not required; silently skip-over return True # Prepare our Registration Payload payload = {} # Expire our token postokay, response = self._fetch('/logout', payload=payload) if not postokay: # If we get here, the token was declared as having already # been expired. The response looks like this: # { # u'errcode': u'M_UNKNOWN_TOKEN', # u'error': u'Access Token unknown or expired', # } # # In this case it's okay to safely return True because # we're logged out in this case. if response.get('errcode') != u'M_UNKNOWN_TOKEN': return False # else: The response object looks like this if we were successful: # {} # Pull the response details self.access_token = None self.home_server = None self.user_id = None # Clear our room cache self._room_cache = {} self.logger.debug( 'Unauthenticated successfully with Matrix server.') return True def _room_join(self, room): """ Joins a matrix room if we're not already in it. Otherwise it attempts to create it if it doesn't exist and always returns the room_id if it was successful, otherwise it returns None """ if not self.access_token: # We can't join a room if we're not logged in return None if not isinstance(room, str): # Not a supported string return None # Prepare our Join Payload payload = {} # Check if it's a room id... result = IS_ROOM_ID.match(room) if result: # We detected ourselves the home_server home_server = result.group('home_server') \ if result.group('home_server') else self.home_server # It was a room ID; simple mapping: room_id = "!{}:{}".format( result.group('room'), home_server, ) # Check our cache for speed: if room_id in self._room_cache: # We're done as we've already joined the channel return self._room_cache[room_id]['id'] # Build our URL path = '/join/{}'.format(NotifyMatrix.quote(room_id)) # Make our query postokay, _ = self._fetch(path, payload=payload) if postokay: # Cache our entry for fast access later self._room_cache[room_id] = { 'id': room_id, 'home_server': home_server, } return room_id if postokay else None # Try to see if it's an alias then... result = IS_ROOM_ALIAS.match(room) if not result: # There is nothing else it could be self.logger.warning( 'Ignoring illegally formed room {} ' 'from Matrix server list.'.format(room)) return None # If we reach here, we're dealing with a channel alias home_server = self.home_server \ if not result.group('home_server') \ else result.group('home_server') # tidy our room (alias) identifier room = '#{}:{}'.format(result.group('room'), home_server) # Check our cache for speed: if room in self._room_cache: # We're done as we've already joined the channel return self._room_cache[room]['id'] # If we reach here, we need to join the channel # Build our URL path = '/join/{}'.format(NotifyMatrix.quote(room)) # Attempt to join the channel postokay, response = self._fetch(path, payload=payload) if postokay: # Cache our entry for fast access later self._room_cache[room] = { 'id': response.get('room_id'), 'home_server': home_server, } return self._room_cache[room]['id'] # Try to create the channel return self._room_create(room) def _room_create(self, room): """ Creates a matrix room and return it's room_id if successful otherwise None is returned. """ if not self.access_token: # We can't create a room if we're not logged in return None if not isinstance(room, str): # Not a supported string return None # Build our room if we have to: result = IS_ROOM_ALIAS.match(room) if not result: # Illegally formed room return None # Our home_server home_server = result.group('home_server') \ if result.group('home_server') else self.home_server # update our room details room = '#{}:{}'.format(result.group('room'), home_server) # Prepare our Create Payload payload = { 'room_alias_name': result.group('room'), # Set our channel name 'name': '#{} - {}'.format(result.group('room'), self.app_desc), # hide the room by default; let the user open it up if they wish # to others. 'visibility': 'private', 'preset': 'trusted_private_chat', } postokay, response = self._fetch('/createRoom', payload=payload) if not postokay: # Failed to create channel # Typical responses: # - {u'errcode': u'M_ROOM_IN_USE', # u'error': u'Room alias already taken'} # - {u'errcode': u'M_UNKNOWN', # u'error': u'Internal server error'} if (response and response.get('errcode') == 'M_ROOM_IN_USE'): return self._room_id(room) return None # Cache our entry for fast access later self._room_cache[response.get('room_alias')] = { 'id': response.get('room_id'), 'home_server': home_server, } return response.get('room_id') def _joined_rooms(self): """ Returns a list of the current rooms the logged in user is a part of. """ if not self.access_token: # No list is possible return list() postokay, response = self._fetch( '/joined_rooms', payload=None, method='GET') if not postokay: # Failed to retrieve listings return list() # Return our list of rooms return response.get('joined_rooms', list()) def _room_id(self, room): """Get room id from its alias. Args: room (str): The room alias name. Returns: returns the room id if it can, otherwise it returns None """ if not self.access_token: # We can't get a room id if we're not logged in return None if not isinstance(room, str): # Not a supported string return None # Build our room if we have to: result = IS_ROOM_ALIAS.match(room) if not result: # Illegally formed room return None # Our home_server home_server = result.group('home_server') \ if result.group('home_server') else self.home_server # update our room details room = '#{}:{}'.format(result.group('room'), home_server) # Make our request postokay, response = self._fetch( "/directory/room/{}".format( NotifyMatrix.quote(room)), payload=None, method='GET') if postokay: return response.get("room_id") return None def _fetch(self, path, payload=None, params=None, attachment=None, method='POST'): """ Wrapper to request.post() to manage it's response better and make the send() function cleaner and easier to maintain. This function returns True if the _post was successful and False if it wasn't. """ # Define our headers headers = { 'User-Agent': self.app_id, 'Content-Type': 'application/json', 'Accept': 'application/json', } if self.access_token is not None: headers["Authorization"] = 'Bearer %s' % self.access_token default_port = 443 if self.secure else 80 url = \ '{schema}://{hostname}{port}'.format( schema='https' if self.secure else 'http', hostname=self.host, port='' if self.port is None or self.port == default_port else f':{self.port}') if path == '/upload': # FUTURE if self.version == MatrixVersion.V3: # FUTURE url += MATRIX_V3_MEDIA_PATH + path # FUTURE else: # FUTURE url += MATRIX_V2_MEDIA_PATH + path url += MATRIX_V2_MEDIA_PATH + path params = {'filename': attachment.name} with open(attachment.path, 'rb') as fp: payload = fp.read() # Update our content type headers['Content-Type'] = attachment.mimetype else: if self.version == MatrixVersion.V3: url += MATRIX_V3_API_PATH + path else: url += MATRIX_V2_API_PATH + path # Our response object response = {} # fetch function fn = requests.post if method == 'POST' else ( requests.put if method == 'PUT' else requests.get) # Define how many attempts we'll make if we get caught in a throttle # event retries = self.default_retries if self.default_retries > 0 else 1 while retries > 0: # Decrement our throttle retry count retries -= 1 self.logger.debug('Matrix POST URL: %s (cert_verify=%r)' % ( url, self.verify_certificate, )) self.logger.debug('Matrix Payload: %s' % str(payload)) # Initialize our response object r = None try: r = fn( url, data=dumps(payload) if not attachment else payload, params=params, headers=headers, verify=self.verify_certificate, timeout=self.request_timeout, ) self.logger.debug( 'Matrix Response: code=%d, %s' % ( r.status_code, str(r.content))) response = loads(r.content) if r.status_code == 429: wait = self.default_wait_ms / 1000 try: wait = response['retry_after_ms'] / 1000 except KeyError: try: errordata = response['error'] wait = errordata['retry_after_ms'] / 1000 except KeyError: pass self.logger.warning( 'Matrix server requested we throttle back {}ms; ' 'retries left {}.'.format(wait, retries)) self.logger.debug( 'Response Details:\r\n{}'.format(r.content)) # Throttle for specified wait self.throttle(wait=wait) # Try again continue elif r.status_code != requests.codes.ok: # We had a problem status_str = \ NotifyMatrix.http_response_code_lookup( r.status_code, MATRIX_HTTP_ERROR_MAP) self.logger.warning( 'Failed to handshake with Matrix server: ' '{}{}error={}.'.format( status_str, ', ' if status_str else '', r.status_code)) self.logger.debug( 'Response Details:\r\n{}'.format(r.content)) # Return; we're done return (False, response) except (AttributeError, TypeError, ValueError): # This gets thrown if we can't parse our JSON Response # - ValueError = r.content is Unparsable # - TypeError = r.content is None # - AttributeError = r is None self.logger.warning('Invalid response from Matrix server.') self.logger.debug( 'Response Details:\r\n{}'.format(r.content)) return (False, {}) except requests.RequestException as e: self.logger.warning( 'A Connection error occurred while registering with Matrix' ' server.') self.logger.debug('Socket Exception: %s' % str(e)) # Return; we're done return (False, response) except (OSError, IOError) as e: self.logger.warning( 'An I/O error occurred while reading {}.'.format( attachment.name if attachment else 'unknown file')) self.logger.debug('I/O Exception: %s' % str(e)) return (False, {}) return (True, response) # If we get here, we ran out of retries return (False, {}) def __del__(self): """ Ensure we relinquish our token """ if self.mode == MatrixWebhookMode.T2BOT: # nothing to do return try: self._logout() except LookupError: # pragma: no cover # Python v3.5 call to requests can sometimes throw the exception # "/usr/lib64/python3.7/socket.py", line 748, in getaddrinfo # LookupError: unknown encoding: idna # # This occurs every time when running unit-tests against Apprise: # LANG=C.UTF-8 PYTHONPATH=$(pwd) py.test-3.7 # # There has been an open issue on this since Jan 2017. # - https://bugs.python.org/issue29288 # # A ~similar~ issue can be identified here in the requests # ticket system as unresolved and has provided workarounds # - https://github.com/kennethreitz/requests/issues/3578 pass except ImportError: # pragma: no cover # The actual exception is `ModuleNotFoundError` however ImportError # grants us backwards compatibility with versions of Python older # than v3.6 # Python code that makes early calls to sys.exit() can cause # the __del__() code to run. However, in some newer versions of # Python, this causes the `sys` library to no longer be # available. The stack overflow also goes on to suggest that # it's not wise to use the __del__() as a destructor # which is the case here. # https://stackoverflow.com/questions/67218341/\ # modulenotfounderror-import-of-time-halted-none-in-sys-\ # modules-occured-when-obj?noredirect=1&lq=1 # # # Also see: https://stackoverflow.com/questions\ # /1481488/what-is-the-del-method-and-how-do-i-call-it # At this time it seems clean to try to log out (if we can) # but not throw any unnecessary exceptions (like this one) to # the end user if we don't have to. pass def url(self, privacy=False, *args, **kwargs): """ Returns the URL built dynamically based on specified arguments. """ # Define any URL parameters params = { 'image': 'yes' if self.include_image else 'no', 'mode': self.mode, 'version': self.version, 'msgtype': self.msgtype, } # Extend our parameters params.update(self.url_parameters(privacy=privacy, *args, **kwargs)) auth = '' if self.mode != MatrixWebhookMode.T2BOT: # Determine Authentication if self.user and self.password: auth = '{user}:{password}@'.format( user=NotifyMatrix.quote(self.user, safe=''), password=self.pprint( self.password, privacy, mode=PrivacyMode.Secret, safe=''), ) elif self.user: auth = '{user}@'.format( user=NotifyMatrix.quote(self.user, safe=''), ) default_port = 443 if self.secure else 80 return '{schema}://{auth}{hostname}{port}/{rooms}?{params}'.format( schema=self.secure_protocol if self.secure else self.protocol, auth=auth, hostname=NotifyMatrix.quote(self.host, safe='') if self.mode != MatrixWebhookMode.T2BOT else self.pprint(self.access_token, privacy, safe=''), port='' if self.port is None or self.port == default_port else ':{}'.format(self.port), rooms=NotifyMatrix.quote('/'.join(self.rooms)), params=NotifyMatrix.urlencode(params), ) def __len__(self): """ Returns the number of targets associated with this notification """ targets = len(self.rooms) return targets if targets > 0 else 1 @staticmethod def parse_url(url): """ Parses the URL and returns enough arguments that can allow us to re-instantiate this object. """ results = NotifyBase.parse_url(url, verify_host=False) if not results: # We're done early as we couldn't load the results return results if not results.get('host'): return None # Get our rooms results['targets'] = NotifyMatrix.split_path(results['fullpath']) # Support the 'to' variable so that we can support rooms this way too # The 'to' makes it easier to use yaml configuration if 'to' in results['qsd'] and len(results['qsd']['to']): results['targets'] += NotifyMatrix.parse_list(results['qsd']['to']) # Boolean to include an image or not results['include_image'] = parse_bool(results['qsd'].get( 'image', NotifyMatrix.template_args['image']['default'])) # Get our mode results['mode'] = results['qsd'].get('mode') # t2bot detection... look for just a hostname, and/or just a user/host # if we match this; we can go ahead and set the mode (but only if # it was otherwise not set) if results['mode'] is None \ and not results['password'] \ and not results['targets']: # Default mode to t2bot results['mode'] = MatrixWebhookMode.T2BOT if results['mode'] and \ results['mode'].lower() == MatrixWebhookMode.T2BOT: # unquote our hostname and pass it in as the password/token results['password'] = NotifyMatrix.unquote(results['host']) # Support the message type keyword if 'msgtype' in results['qsd'] and len(results['qsd']['msgtype']): results['msgtype'] = \ NotifyMatrix.unquote(results['qsd']['msgtype']) # Support the use of the token= keyword if 'token' in results['qsd'] and len(results['qsd']['token']): results['password'] = NotifyMatrix.unquote(results['qsd']['token']) # Support the use of the version= or v= keyword if 'version' in results['qsd'] and len(results['qsd']['version']): results['version'] = \ NotifyMatrix.unquote(results['qsd']['version']) elif 'v' in results['qsd'] and len(results['qsd']['v']): results['version'] = NotifyMatrix.unquote(results['qsd']['v']) return results @staticmethod def parse_native_url(url): """ Support https://webhooks.t2bot.io/api/v1/matrix/hook/WEBHOOK_TOKEN/ """ result = re.match( r'^https?://webhooks\.t2bot\.io/api/v[0-9]+/matrix/hook/' r'(?P[A-Z0-9_-]+)/?' r'(?P\?.+)?$', url, re.I) if result: mode = 'mode={}'.format(MatrixWebhookMode.T2BOT) return NotifyMatrix.parse_url( '{schema}://{webhook_token}/{params}'.format( schema=NotifyMatrix.secure_protocol, webhook_token=result.group('webhook_token'), params='?{}'.format(mode) if not result.group('params') else '{}&{}'.format(result.group('params'), mode))) return None