mirror of
https://github.com/SickGear/SickGear.git
synced 2025-01-11 20:43:38 +00:00
695 lines
22 KiB
Python
695 lines
22 KiB
Python
|
# -*- coding: utf-8 -*-
|
||
|
#
|
||
|
# Copyright (C) 2019 Chris Caron <lead2gold@gmail.com>
|
||
|
# All rights reserved.
|
||
|
#
|
||
|
# This code is licensed under the MIT License.
|
||
|
#
|
||
|
# Permission is hereby granted, free of charge, to any person obtaining a copy
|
||
|
# of this software and associated documentation files(the "Software"), to deal
|
||
|
# in the Software without restriction, including without limitation the rights
|
||
|
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
|
||
|
# copies of the Software, and to permit persons to whom the Software is
|
||
|
# furnished to do so, subject to the following conditions :
|
||
|
#
|
||
|
# The above copyright notice and this permission notice shall be included in
|
||
|
# all copies or substantial portions of the Software.
|
||
|
#
|
||
|
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||
|
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||
|
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
|
||
|
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||
|
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||
|
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
||
|
# THE SOFTWARE.
|
||
|
|
||
|
import re
|
||
|
import six
|
||
|
import contextlib
|
||
|
import os
|
||
|
from os.path import expanduser
|
||
|
from functools import reduce
|
||
|
|
||
|
try:
|
||
|
# Python 2.7
|
||
|
from urllib import unquote
|
||
|
from urllib import quote
|
||
|
from urlparse import urlparse
|
||
|
|
||
|
except ImportError:
|
||
|
# Python 3.x
|
||
|
from urllib.parse import unquote
|
||
|
from urllib.parse import quote
|
||
|
from urllib.parse import urlparse
|
||
|
|
||
|
# Pattern used by parse_url() to break a URL into its coarse parts.
# It accepts and scans for:
#  - schema://
#  - schema://path
#  - schema://path?kwargs
#
# Every part is optional; surrounding whitespace is tolerated.
VALID_URL_RE = re.compile(
    # leading whitespace
    r'^[\s]*'
    # optional schema followed by one or more slashes/backslashes
    r'((?P<schema>[^:\s]+):[/\\]+)?'
    # optional path (everything up to the first '?') ...
    r'('
    r'(?P<path>[^?]+)'
    # ... optionally followed by ?kwargs
    r'(\?(?P<kwargs>.+))?'
    r')?'
    # trailing whitespace
    r'[\s]*$'
)

# Splits a full path into the directory part (up to and including the last
# slash or backslash) and the optional final element that follows it.
VALID_QUERY_RE = re.compile(
    r'^(?P<path>.*[/\\])'
    r'(?P<query>[^/\\]+)?$'
)
|
||
|
|
||
|
# Delimiters used to separate values when content is passed in as a string
# (square brackets, semicolons, commas and whitespace). This is what turns a
# delimited string into a list.
STRING_DELIMITERS = r'[\[\]\;,\s]+'

# Pre-escaped path separators; they are referenced repeatedly below.
ESCAPED_PATH_SEPARATOR = re.escape('\\/')
ESCAPED_WIN_PATH_SEPARATOR = re.escape('\\')
ESCAPED_NUX_PATH_SEPARATOR = re.escape('/')

# Collapses runs of backslashes: group 1 is the prefix that is kept, group 2
# is the surplus run of separators that follows it.
# NOTE(review): the third alternative ends with a literal ']' ("{2}]"), which
# looks like a typo for "{2}" - confirm before changing the pattern.
TIDY_WIN_PATH_RE = re.compile(
    r'(^[%(sep)s]{2}|[^%(sep)s\s][%(sep)s]|[\s][%(sep)s]{2}])([%(sep)s]+)'
    % {'sep': ESCAPED_WIN_PATH_SEPARATOR}
)

# Captures everything except trailing whitespace/backslashes (group 1).
TIDY_WIN_TRIM_RE = re.compile(
    r'^(.+[^:][^%(sep)s])[\s%(sep)s]*$'
    % {'sep': ESCAPED_WIN_PATH_SEPARATOR}
)

# Collapses runs of forward slashes: group 1 is the single slash to keep,
# group 2 is the surplus that follows it.
TIDY_NUX_PATH_RE = re.compile(
    r'([%(sep)s])([%(sep)s]+)'
    % {'sep': ESCAPED_NUX_PATH_SEPARATOR}
)

# Matches trailing whitespace/forward slashes that follow a non-slash
# character (group 1 is that preceding character).
TIDY_NUX_TRIM_RE = re.compile(
    r'([^%(sep)s])[\s%(sep)s]+$'
    % {'sep': ESCAPED_NUX_PATH_SEPARATOR}
)
|
||
|
|
||
|
# Custom arguments passed in the URL (which would otherwise simply appear in
# the qsd area of the parse_url() result) are treated differently when their
# key starts with a + or - character.
# The "add" pattern also accepts a leading space (presumably because a '+'
# in a query string may already have been decoded to a space - confirm).
NOTIFY_CUSTOM_ADD_TOKENS = re.compile(r'^( |\+)(?P<key>.*)\s*')
NOTIFY_CUSTOM_DEL_TOKENS = re.compile(r'^-(?P<key>.*)\s*')

# Used for attempting to acquire the schema if the URL can't be parsed.
GET_SCHEMA_RE = re.compile(
    r'\s*(?P<schema>[a-z0-9]{2,9})://.*$',
    re.IGNORECASE,
)

# Regular expression based on, and expanded from:
#   http://www.regular-expressions.info/email.html
# NOTE(review): the whole `domain` group is optional (the pattern ends with
# ")?"), so a value such as "user@" is matched - confirm this is intended.
GET_EMAIL_RE = re.compile(
    # everything in front of the @
    r"(?P<fulluser>"
    # optional "label+" prefix
    r"((?P<label>[^+]+)\+)?"
    # the user id itself (dot separated chunks)
    r"(?P<userid>[a-z0-9$%=_~-]+(?:\.[a-z0-9$%+=_~-]+)*)"
    r")@"
    # the domain: one or more dot-terminated labels and a final label
    r"(?P<domain>"
    r"(?:[a-z0-9](?:[a-z0-9-]*[a-z0-9])?\.)+"
    r"[a-z0-9](?:[a-z0-9-]*[a-z0-9])"
    r")?",
    re.IGNORECASE,
)

# Regular expression used to extract a phone number
GET_PHONE_NO_RE = re.compile(
    r'^\+?'
    r'(?P<phone>[0-9\s)(+-]+)'
    r'\s*$'
)

# Regular expression used to distinguish between multiple URLs
URL_DETECTION_RE = re.compile(
    r'([a-z0-9]+?:\/\/.*?)'
    r'[\s,]*'
    r'(?=$|[a-z0-9]+?:\/\/)',
    re.IGNORECASE,
)

# validate_regex() utilizes this mapping to track and re-use pre-compiled
# regular expressions
REGEX_VALIDATE_LOOKUP = dict()
|
||
|
|
||
|
|
||
|
def is_hostname(hostname):
    """Validate a hostname.

    A hostname is accepted when it is between 1 and 255 characters long and
    every dot separated label is 1 to 63 characters drawn from letters,
    digits, underscores and hyphens, without a leading or trailing hyphen.
    A single trailing dot is tolerated.

    Args:
        hostname (str): The hostname to check.

    Returns:
        bool: True if the hostname is valid, False otherwise.
    """
    if len(hostname) > 255 or len(hostname) == 0:
        return False

    # A single trailing dot is permitted; drop it before checking the labels
    if hostname[-1] == ".":
        hostname = hostname[:-1]

    # \Z (and not $) anchors the end of each label: $ also matches just
    # before a trailing newline, which would let 'example.com\n' validate.
    allowed = re.compile(r'(?!-)[A-Z\d_-]{1,63}(?<!-)\Z', re.IGNORECASE)
    return all(allowed.match(label) for label in hostname.split("."))
|
||
|
|
||
|
|
||
|
def is_email(address):
    """Report whether the specified entry looks like an email address.

    The check is a match of GET_EMAIL_RE against the start of the value.

    Args:
        address (str): The string you want to check.

    Returns:
        bool: True if the address matched the email pattern, False if it
            did not (or if a TypeError was raised while matching, such as
            for a value that is not a string).
    """
    try:
        found = GET_EMAIL_RE.match(address)

    except TypeError:
        # not something a regular expression can be applied to
        return False

    return found is not None
|
||
|
|
||
|
|
||
|
def tidy_path(path):
    """Tidy up a filename and/or directory.

    Repeated separators are collapsed, trailing separators and surrounding
    whitespace are removed and the result is passed through expanduser().

    For example:   ////absolute//path// becomes:
        /absolute/path

    Args:
        path (str): The path to tidy.

    Returns:
        str: The tidied path.
    """
    # The order matters: collapse duplicate separators first (Windows, then
    # Linux) and only then trim what is left at the end (Linux, then
    # Windows). The path is stripped before each pass.
    passes = (
        TIDY_WIN_PATH_RE,
        TIDY_NUX_PATH_RE,
        TIDY_NUX_TRIM_RE,
        TIDY_WIN_TRIM_RE,
    )
    for pattern in passes:
        path = pattern.sub('\\1', path.strip())

    return expanduser(path)
|
||
|
|
||
|
|
||
|
def parse_qsd(qs):
    """Query String Dictionary Builder.

    A custom implementation of the parse_qsl() function already provided
    by Python. This function is slightly more lightweight and gives us
    more control over parsing out arguments such as the plus/+ symbol
    at the head of a key/value pair.

    qs should be the query string part of a URL, such as
       a=1&c=2&d=

        a=1 gets interpreted as { 'a': '1' }
        a=  gets interpreted as { 'a': '' }
        a   gets interpreted as { 'a': '' }

    Pairs may be separated by either '&' or ';'.

    Returns:
        dict: A dictionary with three entries:
            'qsd'  - every key (lowercased and stripped) mapped to its value
            'qsd+' - keys that started with '+' (or a space), prefix removed
            'qsd-' - keys that started with '-', prefix removed
    """
    parsed = {'qsd': {}, 'qsd+': {}, 'qsd-': {}}

    for chunk in qs.split('&'):
        for pair in chunk.split(';'):
            # A control-name without an equal sign gets an empty value
            raw_key, _, raw_val = pair.partition('=')

            # Apprise keys can start with a + symbol, so the very first
            # character is kept untouched; any other + becomes a space.
            key = '{}{}'.format(
                raw_key[:1],
                raw_key[1:].replace('+', ' '),
            )
            key = unquote(key) or ''

            val = unquote(raw_val.replace('+', ' '))
            val = val.strip() if val else ''

            # Every entry is tracked in the Query String Dictionary (qsd);
            # the key is always made lowercase for easy indexing
            parsed['qsd'][key.lower().strip()] = val

            # Tokens that start with an addition/plus symbol (+); the
            # content is stored 'as-is'
            added = NOTIFY_CUSTOM_ADD_TOKENS.match(key)
            if added is not None:
                parsed['qsd+'][added.group('key')] = val

            # Tokens that start with a subtraction/hyphen symbol (-); the
            # content is stored 'as-is'
            removed = NOTIFY_CUSTOM_DEL_TOKENS.match(key)
            if removed is not None:
                parsed['qsd-'][removed.group('key')] = val

    return parsed
|
||
|
|
||
|
|
||
|
def parse_url(url, default_schema='http', verify_host=True):
    """Parse a URL specified by the end user into its components.

    Valid syntaxes are:
        <schema>://<user>@<host>:<port>/<path>
        <schema>://<user>:<passwd>@<host>:<port>/<path>
        <schema>://<host>:<port>/<path>
        <schema>://<host>/<path>
        <schema>://<host>
        <host>

    Argument parsing is also supported:
        <schema>://<user>@<host>:<port>/<path>?key1=val&key2=val2
        <schema>://<user>:<passwd>@<host>:<port>/<path>?key1=val&key2=val2
        <schema>://<host>:<port>/<path>?key1=val&key2=val2
        <schema>://<host>/<path>?key1=val&key2=val2
        <schema>://<host>?key1=val&key2=val2

    Args:
        url (str): The URL to parse.
        default_schema (str): The schema to report when the URL does not
            specify one.
        verify_host (bool): When True, the parsed host must pass
            is_hostname() or None is returned.

    Returns:
        dict: The parsed content with the keys 'user', 'password', 'port',
            'host', 'fullpath', 'path', 'query', 'schema', 'url', 'qsd',
            'qsd+' and 'qsd-'. None is returned if the url is not a string,
            the port is not a number, or (with verify_host set) the host
            is not valid.
    """

    if not isinstance(url, six.string_types):
        # Simple error checking
        return None

    # Default Results
    result = {
        # The username (if specified)
        'user': None,
        # The password (if specified)
        'password': None,
        # The port (if specified)
        'port': None,
        # The hostname
        'host': '',
        # The full path (query + path)
        'fullpath': None,
        # The path
        'path': None,
        # The query
        'query': None,
        # The schema
        'schema': None,
        # The re-assembled URL (schema, credentials, host, port and full
        # path; the query string arguments are not part of it)
        'url': None,
        # The arguments passed in (the parsed query). This is in a dictionary
        # of {'key': 'val', etc }. Keys are all made lowercase before storing
        # to simplify access to them.
        # qsd = Query String Dictionary
        'qsd': {},

        # Detected Entries that start with + or - are additionally stored in
        # these values (un-touched). The +/- however are stripped from their
        # name before they are stored here.
        'qsd+': {},
        'qsd-': {},
    }

    match = VALID_URL_RE.search(url)
    if not match:
        # Could not extract basic content from the URL
        return None

    # Extract basic results
    result['schema'] = match.group('schema').lower().strip() \
        if match.group('schema') else default_schema
    host = match.group('path').strip() \
        if match.group('path') else ''
    qsdata = match.group('kwargs').strip() \
        if match.group('kwargs') else None

    # Parse Query Arguments ?val=key&key=val
    # while ensuring that all keys are lowercase
    if qsdata:
        result.update(parse_qsd(qsdata))

    # Now do a proper extraction of data; http:// is just substituted in
    # place to allow urlparse() to function as expected, we'll swap this
    # back to the expected schema after.
    parsed = urlparse('http://%s' % host)

    # Parse results
    result['host'] = parsed[1].strip()
    result['fullpath'] = quote(unquote(tidy_path(parsed[2].strip())))

    # Handle trailing slashes removed by tidy_path. The check is made
    # against the path portion of the URL (and not the raw URL) so that
    # neither trailing whitespace nor the last character of the query
    # string decides whether a slash is restored.
    fullpath = result['fullpath']
    if fullpath and fullpath[-1] not in ('/', '\\') \
            and host and host[-1] in ('/', '\\'):
        result['fullpath'] += host[-1]

    if not result['fullpath']:
        # Default
        result['fullpath'] = None

    else:
        # Using full path, extract query from path
        match = VALID_QUERY_RE.search(result['fullpath'])
        result['path'] = match.group('path')
        result['query'] = match.group('query')
        if not result['query']:
            result['query'] = None

    try:
        (result['user'], result['host']) = \
            re.split(r'[@]+', result['host'])[:2]

    except ValueError:
        # no problem then, host only exists
        # and it's already assigned
        pass

    if result['user'] is not None:
        try:
            (result['user'], result['password']) = \
                re.split(r'[:]+', result['user'])[:2]

        except ValueError:
            # no problem then, user only exists
            # and it's already assigned
            pass

    try:
        (result['host'], result['port']) = \
            re.split(r'[:]+', result['host'])[:2]

    except ValueError:
        # no problem then, host only exists
        # and it's already assigned
        pass

    if result['port']:
        try:
            result['port'] = int(result['port'])

        except (ValueError, TypeError):
            # Invalid Port Specified
            return None

        if result['port'] == 0:
            result['port'] = None

    else:
        # A dangling colon (host:) leaves an empty string behind; report
        # it the same way as a port that was never specified
        result['port'] = None

    if verify_host and not is_hostname(result['host']):
        # Nothing more we can do without a hostname
        return None

    # Re-assemble cleaned up version of the url
    result['url'] = '%s://' % result['schema']
    if isinstance(result['user'], six.string_types):
        result['url'] += result['user']

        if isinstance(result['password'], six.string_types):
            result['url'] += ':%s@' % result['password']

        else:
            result['url'] += '@'
    result['url'] += result['host']

    if result['port']:
        result['url'] += ':%d' % result['port']

    if result['fullpath']:
        result['url'] += result['fullpath']

    return result
|
||
|
|
||
|
|
||
|
def parse_bool(arg, default=False):
    """Interpret a value as a boolean.

    NZBGet uses 'yes' and 'no' as well as other strings such as 'on' or
    'off' etc to handle boolean operations from its control interface.
    This method simplifies checks against those variables.

    Only the first two characters of a string (lowercased) are looked at.

    Args:
        arg: The value to interpret.
        default (bool): Returned when arg is a string that could not be
            interpreted.

    Returns:
        bool: The interpreted value; values that are not strings are
            simply passed through bool().
    """

    if not isinstance(arg, six.string_types):
        # Handle other types
        return bool(arg)

    prefix = arg.lower()[0:2]

    # no = no - False
    # of = short for off - False
    # 0  = int for False
    # fa = short for False - False
    # f  = short for False - False
    # n  = short for No or Never - False
    # ne = short for Never - False
    # di = short for Disable(d) - False
    # de = short for Deny - False
    if prefix in ('de', 'di', 'ne', 'f', 'n', 'no', 'of', '0', 'fa'):
        return False

    # ye = yes - True
    # on = short for on - True
    # 1  = int for True
    # tr = short for True - True
    # t  = short for True - True
    # al = short for Always (and Allow) - True
    # en = short for Enable(d) - True
    if prefix in ('en', 'al', 't', 'y', 'ye', 'on', '1', 'tr'):
        return True

    # otherwise
    return default
|
||
|
|
||
|
|
||
|
def split_urls(urls):
    """Split a string of URLs into a list.

    Takes a string containing URLs separated by commas and/or whitespace
    and returns them as a list.

    Args:
        urls (str): The string holding one or more URLs.

    Returns:
        list: The URLs detected; an empty list if none were found or if
            matching the value raised a TypeError (e.g. it was None).
    """

    try:
        results = URL_DETECTION_RE.findall(urls)

    except TypeError:
        return []

    if results and results[-1][-1] != urls[-1]:
        # We always want to save the end of the last URL if we can; this
        # handles cases where there is actually a comma (,) at the end of a
        # single URL that would have otherwise got lost when our regex
        # passed over it.
        #
        # The trailing delimiters are searched for directly: there may be
        # none at all, or the input may span several lines, and neither
        # case may be allowed to raise.
        trailing = re.search(r'[\s,]+\Z', urls)
        if trailing:
            results[-1] += trailing.group().rstrip()

    return results
|
||
|
|
||
|
|
||
|
def parse_list(*args):
    """Break delimited strings into a sorted list of unique entries.

    Every argument may be a delimited string or a set/list/tuple of further
    arguments (these are processed recursively). Arguments of any other
    type are ignored. Duplicates are always combined in the final results
    and empty entries are dropped.

    You can pass in as many arguments as you like.

    Hence: parse_list('.mkv, .iso, .avi') becomes:
        ['.avi', '.iso', '.mkv']

    Hence: parse_list('.mkv, .iso, .avi', ['.avi', '.mp4']) becomes:
        ['.avi', '.iso', '.mkv', '.mp4']

    Strings are split using STRING_DELIMITERS (square brackets, semicolons,
    commas and whitespace).

    Returns:
        list: The unique, non-empty entries in sorted order.
    """

    collected = []
    for item in args:
        if isinstance(item, six.string_types):
            collected.extend(re.split(STRING_DELIMITERS, item))

        elif isinstance(item, (set, list, tuple)):
            collected.extend(parse_list(*item))

    # The set removes duplicates, the condition removes empty entries and
    # sorted() hands back a list on both Python v2 and v3.
    return sorted(entry for entry in set(collected) if entry)
|
||
|
|
||
|
|
||
|
def is_exclusive_match(logic, data, match_all='all'):
    """Check a set of tags (data) against the tag logic specified.

    The data variable identifies the tags themselves. It should be a set of
    strings; if it is not a set (or frozenset) already, it is converted into
    one using parse_list().

    Our logic should be a list as well:
      - top level entries are treated as an 'or'
      - second level (or more) entries are treated as 'and'

      examples:
        logic="tagA, tagB"                = tagA or tagB
        logic=['tagA', 'tagB']            = tagA or tagB
        logic=[('tagA', 'tagC'), 'tagB']  = (tagA and tagC) or tagB
        logic=[('tagB', 'tagC')]          = tagB and tagC

    Args:
        logic: A delimited string, or a list/tuple/set of entries (each a
            string or a list/tuple/set of strings).
        data: The tags to compare the logic against.
        match_all (str): A tag that is always treated as being part of data.

    Returns:
        bool: True if at least one logic entry is fully satisfied. When
            there is no logic to apply, True is only returned if there is
            no data either. False is returned for logic that is not a
            string, list, tuple or set.
    """

    if isinstance(logic, six.string_types):
        # Update our logic to support our delimiters
        logic = set(parse_list(logic))

    if not logic:
        # If there is no logic to apply then we're done early; we only match
        # if there is also no data to match against
        return not data

    if not isinstance(logic, (list, tuple, set)):
        # garbage input
        return False

    if not isinstance(data, (set, frozenset)):
        # Honour the documented contract: anything that is not a set yet
        # (a list, a tuple, a delimited string, None) is converted here
        # instead of failing on the union() below
        data = set(parse_list(data))

    # The tags an entry may be satisfied by; this does not change while
    # we iterate so it is only built once
    candidates = data.union({match_all})

    # Every entry here will be or'ed with the next
    for entry in logic:
        if not isinstance(entry, (six.string_types, list, tuple, set)):
            # Garbage entry in our logic found
            return False

        # treat these entries as though all elements found
        # must exist in the notification service
        entries = set(parse_list(entry))
        if not entries:
            # We got a bogus set of tags to parse
            # If there is no logic to apply then we're done early; we only
            # match if there is also no data to match against
            return not data

        if entries.issubset(candidates):
            # our set contains all of the entries found
            # in our notification data set
            return True

        # else: keep looking

    # Nothing matched against our logic
    return False
|
||
|
|
||
|
|
||
|
def validate_regex(value, regex=r'[^\s]+', flags=re.I, strip=True, fmt=None):
    """Validate a value against a (cached) regular expression.

    A lot of the tokens, secrets, api keys, etc all have some regular
    expression validation they support. The compiled expression is stored
    in REGEX_VALIDATE_LOOKUP so that it is only compiled once; this saves
    callers from having to pre-compile all of their regular expressions.

    Args:
        value: The element being tested.
        regex: The regular expression to be compiled and matched against
            the start of the value. By default the first chunk of
            non-whitespace characters is matched.
        flags: The regular expression flags to apply; either the integer
            value the re module expects or a string made up of the
            characters i, m, s, L, u and x (other characters are ignored).
        strip (bool): Strip surrounding whitespace off of the response.
        fmt: Alters the response format if the regular expression matches.
            You identify your format using {tags}; effectively nesting your
            group names between {}. Consider a regex of:
                '(?P<year>[0-9]{2})[0-9]+(?P<value>[A-Z])'
            to which you could set fmt to '{value}-{year}'. This would
            substitute the matched groups and format a response.

    Returns:
        The value (formatted using fmt if it was specified) if it matched,
        otherwise None. None is also returned if a TypeError or
        AttributeError is raised while matching/formatting.
    """

    if not flags:
        # Handles None/False/'' cases
        flags = 0

    elif isinstance(flags, six.string_types):
        # Regex String -> Flag Lookup Map
        _map = {
            # Ignore Case
            'i': re.I,
            # Multi Line
            'm': re.M,
            # Dot Matches All
            's': re.S,
            # Locale Dependent
            'L': re.L,
            # Unicode Matching
            'u': re.U,
            # Verbose
            'x': re.X,
        }

        # Convert a string of regular expression flags into their
        # respective integer (expected) Python values and perform
        # a bit-wise or on each match found:
        flags = reduce(
            lambda x, y: x | y,
            [_map[f] for f in flags if f in _map], 0)

    # A key is used to store our compiled regular expression. A tuple keeps
    # the expression and its flags apart; gluing them together into a single
    # string lets unrelated combinations share a key (the expression 'abc1'
    # with flags 0 and the expression 'abc' with flags 10 would both end up
    # as 'abc10').
    key = (regex, flags)

    compiled = REGEX_VALIDATE_LOOKUP.get(key)
    if compiled is None:
        compiled = REGEX_VALIDATE_LOOKUP[key] = re.compile(regex, flags)

    # Perform our lookup using our pre-compiled result
    try:
        result = compiled.match(value)
        if not result:
            return None

        if fmt:
            # Map our format back to our response
            value = fmt.format(**result.groupdict())

    except (TypeError, AttributeError):
        # e.g. the value was not something that could be matched
        return None

    # Return our response
    return value.strip() if strip else value
|
||
|
|
||
|
|
||
|
@contextlib.contextmanager
def environ(*remove, **update):
    """
    Temporarily updates the ``os.environ`` dictionary in-place.

    The ``os.environ`` dictionary is updated in-place so that the modification
    is sure to work in all situations. On exit (including when the body of
    the ``with`` block raises) the environment is put back the way it was
    found.

    :param remove: Environment variable(s) to remove.
    :param update: Dictionary of environment variables and values to
                   add/update.
    """

    # Create a backup of our environment for restoration purposes
    env_orig = os.environ.copy()

    try:
        os.environ.update(update)
        for name in remove:
            os.environ.pop(name, None)
        yield

    finally:
        # Restore our snapshot. This is done in-place as well: re-binding
        # os.environ to the (plain dictionary) copy would leave the
        # variables set above behind in the process environment and would
        # replace the special os.environ mapping for everyone else.
        os.environ.clear()
        os.environ.update(env_orig)
|