# coding=utf-8
#
# This file is part of SickGear.
#
# SickGear is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# SickGear is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with SickGear. If not, see .
from itertools import cycle
import datetime
import hashlib
import os
import re
import shutil
import socket
import time
import uuid
import sys
from pathlib import Path
import sickgear
from . import db, logger, notifiers
from .common import cpu_presets, mediaExtensions, Overview, Quality, statusStrings, subtitleExtensions, \
ARCHIVED, DOWNLOADED, FAILED, IGNORED, SKIPPED, SNATCHED_ANY, SUBTITLED, UNAIRED, UNKNOWN, WANTED
from .sgdatetime import SGDatetime
from lib.tvinfo_base.exceptions import *
from exceptions_helper import ex, MultipleShowObjectsException
import dateutil.parser
import requests
import requests.exceptions
import subliminal
from lxml_etree import etree, is_lxml
from base64 import decodebytes as b64decodebytes, encodebytes as b64encodebytes
from _23 import decode_bytes, decode_str, scandir
from six import iteritems, string_types, text_type
# noinspection PyUnresolvedReferences
from six.moves import zip
# the following are imported from elsewhere,
# therefore, they intentionally don't resolve and are unused in this particular file.
# noinspection PyUnresolvedReferences
from sg_helpers import chmod_as_parent, clean_data, copy_file, download_file, fix_set_group_id, get_system_temp_dir, \
get_url, indent_xml, make_path, maybe_plural, md5_for_text, move_file, proxy_setting, remove_file, \
remove_file_perm, replace_extension, sanitize_filename, scantree, touch_file, try_int, try_ord, write_file
# deprecated item, remove in 2020, kept here as rollback uses it
copyFile = copy_file
moveFile = move_file
tryInt = try_int # one legacy custom provider is keeping this signature here
# noinspection PyUnreachableCode
if False:
# noinspection PyUnresolvedReferences
from typing import Any, AnyStr, Dict, Generator, NoReturn, Iterable, Iterator, List, Optional, Set, Tuple, Union
from .tv import TVShow
# the following workaround hack resolves a pyc resolution bug
from .name_cache import retrieve_name_from_cache
from six import integer_types
RE_XML_ENCODING = re.compile(r'^(<\?xml[^>]+)\s+(encoding\s*=\s*[\"\'][^\"\']*[\"\'])(\s*\?>|)', re.U)
RE_IMDB_ID = r'tt\d{7,10}'
RC_IMDB_ID = re.compile(r'(?i)(%s)' % RE_IMDB_ID)
def remove_extension(name):
"""
Remove download or media extension from name (if any)
:param name: filename
:type name: AnyStr
:return: name without extension
:rtype: AnyStr
"""
if name and "." in name:
base_name, sep, extension = name.rpartition('.')
if base_name and extension.lower() in ['nzb', 'torrent'] + mediaExtensions:
name = base_name
return name
def is_parsable_date(text):
try:
result = isinstance(dateutil.parser.parse(text), datetime.datetime)
except (BaseException, Exception):
result = False
return result
def remove_non_release_groups(name, is_anime=False):
"""
Remove non release groups from name
:param name: release name
:type name: AnyStr
:param is_anime: is anmie
:type is_anime: bool
:return:
:rtype: AnyStr
"""
if name:
rc = [re.compile(r'(?i)' + v) for v in [
r'([\s\.\-_\[\{\(]*(no-rar|nzbgeek|ripsalot|siklopentan)[\s\.\-_\]\}\)]*)$',
r'([\s\.\-_\[\{\(]rp[\s\.\-_\]\}\)]*)$',
r'(?<=\w)([\s\.\-_]*[\[\{\(][\s\.\-_]*(www\.\w+.\w+)[\s\.\-_]*[\]\}\)][\s\.\-_]*)$',
r'(?<=\w)([\s\.\-_]*[\[\{\(]\s*(rar(bg|tv)|((e[tz]|v)tv))[\s\.\-_]*[\]\}\)][\s\.\-_]*)$'] +
([r'(?<=\w)([\s\.\-_]*[\[\{\(][\s\.\-_]*[\w\s\.\-\_]+[\s\.\-_]*[\]\}\)][\s\.\-_]*)$',
r'^([\s\.\-_]*[\[\{\(][\s\.\-_]*[\w\s\.\-\_]+[\s\.\-_]*[\]\}\)][\s\.\-_]*)(?=\w)'], [])[is_anime]]
rename = name = remove_extension(name)
while rename:
for regex in rc:
result = regex.findall(name)
if result:
for cur_match in isinstance(result[0], tuple) and result[0] or result:
if not is_parsable_date(cur_match.strip(' ()')):
name = regex.sub('', name)
rename = (name, False)[name == rename]
return name
def is_sync_file(filename):
"""
:param filename: filename
:type filename: AnyStr
:return:
:rtype: bool
"""
extension = filename.rpartition(".")[2].lower()
return '!sync' == extension or 'lftp-pget-status' == extension
def has_media_ext(filename):
"""
checks if file has media extension
:param filename: filename
:type filename: AnyStr
:return:
:rtype: bool
"""
# ignore samples
if re.search(r'(^|[\W_])(sample\d*)[\W_]', filename, re.I) \
or filename.startswith('._'): # and MAC OS's 'resource fork' files
return False
sep_file = filename.rpartition('.')
return (None is re.search('extras?$', sep_file[0], re.I)) and (sep_file[2].lower() in mediaExtensions)
def has_image_ext(filename):
"""
checks if file has image extension
:param filename: filename
:type filename: AnyStr
:return:
:rtype: bool
"""
try:
if os.path.splitext(filename)[1].lower() in ['.avif', '.bmp', '.gif', '.jpeg', '.jpg', '.png', '.webp']:
return True
except (BaseException, Exception):
pass
return False
def is_sickgear_dir(path):
# type: (str) -> bool
"""
validate that a path is a sickgear subpath
:param path: path to check
"""
path = Path(os.path.realpath(os.path.abspath(path)))
sickgear_path = Path(sickgear.PROG_DIR)
sickgear_data_path = Path(sickgear.DATA_DIR)
if sickgear_data_path in path.parents or sickgear_path in path.parents:
return True
return False
def is_first_rar_volume(filename):
"""
checks if file is part of rar set
:param filename: filename
:type filename: AnyStr
:return:
:rtype: bool
"""
return None is not re.search(r'(?P^(?P(?:(?!\.part\d+\.rar$).)*)\.(?:(?:part0*1\.)?rar)$)', filename)
def find_show_by_id(
show_id, # type: Union[AnyStr, Dict[int, int], int]
show_list=None, # type: Optional[List[TVShow]]
no_mapped_ids=True, # type: bool
check_multishow=False, # type: bool
no_exceptions=False # type: bool
):
# type: (...) -> Optional[TVShow]
"""
:param show_id: {indexer: id} or 'tvid_prodid'.
:param show_list: (optional) TVShow objects list
:param no_mapped_ids: don't check mapped ids
:param check_multishow: check for multiple show matches
:param no_exceptions: suppress the MultipleShowObjectsException and return None instead
"""
results = []
if None is show_list:
show_list = sickgear.showList
if show_id and show_list:
tvid_prodid_obj = None if not isinstance(show_id, string_types) else sickgear.tv.TVidProdid(show_id)
if tvid_prodid_obj and no_mapped_ids:
if None is tvid_prodid_obj.prodid:
return
return sickgear.showDict.get(int(tvid_prodid_obj))
else:
if tvid_prodid_obj:
if None is tvid_prodid_obj.prodid:
return None
show_id = tvid_prodid_obj.dict
if isinstance(show_id, dict):
if no_mapped_ids:
sid_int_list = [sickgear.tv.TVShow.create_sid(k, v) for k, v in iteritems(show_id) if k and v and
0 < v and sickgear.tv.tvid_bitmask >= k]
if not check_multishow:
return next((sickgear.showDict.get(_show_sid_id) for _show_sid_id in sid_int_list
if sickgear.showDict.get(_show_sid_id)), None)
results = [sickgear.showDict.get(_show_sid_id) for _show_sid_id in sid_int_list
if sickgear.showDict.get(_show_sid_id)]
else:
results = [_show_obj for k, v in iteritems(show_id) if k and v and 0 < v
for _show_obj in show_list if v == _show_obj.internal_ids.get(k, {'id': 0})['id']]
num_shows = len(set(results))
if 1 == num_shows:
return results[0]
if 1 < num_shows and not no_exceptions:
raise MultipleShowObjectsException()
def make_dir(path):
"""
create given path recursively
:param path: path
:type path: AnyStr
:return: success of creation
:rtype: bool
"""
if not os.path.isdir(path):
try:
os.makedirs(path)
# do a Synology library update
notifiers.NotifierFactory().get('SYNOINDEX').addFolder(path)
except OSError:
return False
return True
def search_infosrc_for_show_id(reg_show_name, tvid=None, prodid=None, ui=None):
"""
search info source for show
:param reg_show_name: show name to search
:type reg_show_name: AnyStr
:param tvid: tvid
:type tvid: int or None
:param prodid: prodid
:type prodid: int or long or None
:param ui: ui class
:type ui: object
:return: seriesname, tvid, prodid or None, None, None
:rtype: Tuple[None, None, None] or Tuple[AnyStr, int, int or long]
"""
show_names = [re.sub('[. -]', ' ', reg_show_name)]
# Query Indexers for each search term and build the list of results
for cur_tvid in (sickgear.TVInfoAPI().sources if not tvid else [int(tvid)]) or []:
# Query Indexers for each search term and build the list of results
tvinfo_config = sickgear.TVInfoAPI(cur_tvid).api_params.copy()
if ui is not None:
tvinfo_config['custom_ui'] = ui
t = sickgear.TVInfoAPI(cur_tvid).setup(**tvinfo_config)
for cur_name in show_names:
logger.debug('Trying to find %s on %s' % (cur_name, sickgear.TVInfoAPI(cur_tvid).name))
try:
if prodid:
show_info_list = t.get_show(prodid)
else:
show_info_list = t.search_show(cur_name)
show_info_list = show_info_list if isinstance(show_info_list, list) else [show_info_list]
except (BaseException, Exception):
continue
seriesname = _prodid = None
for cur_show_info in show_info_list: # type: dict
try:
seriesname = cur_show_info['seriesname']
_prodid = cur_show_info['id']
except (BaseException, Exception):
_prodid = seriesname = None
continue
if seriesname and _prodid:
break
if not (seriesname and _prodid):
continue
if None is prodid and str(cur_name).lower() == str(seriesname).lower():
return seriesname, cur_tvid, int(_prodid)
elif None is not prodid and int(prodid) == int(_prodid):
return seriesname, cur_tvid, int(prodid)
if tvid:
break
return None, None, None
def sizeof_fmt(number, digits=1, sep=' '):
# type: (int, int, AnyStr) -> AnyStr
"""
format given bytes to human-readable text
:param number: value to convert
:param digits: number of digits after decimal point
:param sep: seperater of value and dimension
:return: human-readable formatted text
"""
for cur_dimension in ['bytes', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB']:
if 1024.0 > number:
return f'{number:3.{digits}f}{sep}{cur_dimension}'
number /= 1024.0
def list_media_files(path):
# type: (AnyStr) -> List[AnyStr]
"""
list all media files in given path
:param path: path
:return: list of media files
"""
result = []
if path:
if [direntry for direntry in scantree(path, include=[r'\.sickgearignore'], filter_kind=False, recurse=False)]:
logger.debug('Skipping folder "%s" because it contains ".sickgearignore"' % path)
else:
result = [direntry.path for direntry in scantree(path, exclude_dirs=[
'^Extras$',
'^Behind The Scenes$', '^Deleted Scenes$', '^Featurettes$',
'^Interviews$', '^Scenes$', '^Shorts$', '^Trailers$', '^Other$'
], filter_kind=False, exclude_folders_with_files=['.sickgearignore'])
if has_media_ext(direntry.name)]
return result
def link(src_file, dest_file):
"""
:param src_file: source file
:type src_file: AnyStr
:param dest_file: destination file
:type dest_file: AnyStr
"""
if 'nt' == os.name:
import ctypes
if 0 == ctypes.windll.kernel32.CreateHardLinkW(text_type(dest_file), text_type(src_file), 0):
raise ctypes.WinError()
else:
os.link(src_file, dest_file)
def hardlink_file(src_file, dest_file):
"""
:param src_file: source file
:type src_file: AnyStr
:param dest_file: destination file
:type dest_file: AnyStr
"""
try:
link(src_file, dest_file)
fix_set_group_id(dest_file)
except (BaseException, Exception) as e:
logger.error(f'Failed to create hardlink of {src_file} at {dest_file}: {ex(e)}. Copying instead.')
copy_file(src_file, dest_file)
def symlink(src_file, dest_file):
"""
:param src_file: source file
:type src_file: AnyStr
:param dest_file: destination
:type dest_file: AnyStr
"""
if 'nt' == os.name:
import ctypes
if ctypes.windll.kernel32.CreateSymbolicLinkW(
text_type(dest_file), text_type(src_file), 1 if os.path.isdir(src_file) else 0) in [0, 1280]:
raise ctypes.WinError()
else:
os.symlink(src_file, dest_file)
def move_and_symlink_file(src_file, dest_file):
"""
:param src_file: source file
:type src_file: AnyStr
:param dest_file: destination file
:type dest_file: AnyStr
"""
try:
shutil.move(src_file, dest_file)
fix_set_group_id(dest_file)
symlink(dest_file, src_file)
except (BaseException, Exception):
logger.error(f'Failed to create symlink of {src_file} at {dest_file}. Copying instead')
copy_file(src_file, dest_file)
def rename_ep_file(cur_path, new_path, old_path_length=0, use_rename=False):
"""
Creates all folders needed to move a file to its new location, renames it, then cleans up any folders
left that are now empty.
:param cur_path: The absolute path to the file you want to move/rename
:type cur_path: AnyStr
:param new_path: The absolute path to the destination for the file WITHOUT THE EXTENSION
:type new_path: AnyStr
:param old_path_length: The length of media file path (old name) WITHOUT THE EXTENSION
:type old_path_length: int or long
:param use_rename: use rename instead of shutil.move
:return: success
:rtype: bool
"""
# new_dest_dir, new_dest_name = os.path.split(new_path)
if 0 == old_path_length or len(cur_path) < old_path_length:
# approach from the right
cur_file_name, cur_file_ext = os.path.splitext(cur_path)
else:
# approach from the left
cur_file_ext = cur_path[old_path_length:]
cur_file_name = cur_path[:old_path_length]
if cur_file_ext[1:] in subtitleExtensions:
# Extract subtitle language from filename
sublang = os.path.splitext(cur_file_name)[1][1:]
# Check if the language extracted from filename is a valid language
try:
_ = subliminal.language.Language(sublang, strict=True)
cur_file_ext = '.' + sublang + cur_file_ext
except ValueError:
pass
# put the extension on the incoming file
new_path += cur_file_ext
make_path(os.path.dirname(new_path), syno=True)
# move the file
try:
logger.log(f'Renaming file from {cur_path} to {new_path}')
if use_rename:
os.rename(cur_path, new_path)
else:
shutil.move(cur_path, new_path)
except (OSError, IOError, IsADirectoryError, NotADirectoryError, FileExistsError) as e:
logger.error(f'Failed renaming {cur_path} to {new_path}: {ex(e)}')
return False
# clean up any old folders that are empty
delete_empty_folders(os.path.dirname(cur_path))
return True
def delete_empty_folders(check_empty_dir, keep_dir=None):
"""
Walks backwards up the path and deletes any empty folders found.
:param check_empty_dir: The path to clean (absolute path to a folder)
:type check_empty_dir: AnyStr
:param keep_dir: Clean until this path is reached
:type keep_dir: bool
"""
# treat check_empty_dir as empty when it only contains these items
ignore_items = []
logger.log(f'Trying to clean any empty folders under {check_empty_dir}')
# as long as the folder exists and doesn't contain any files, delete it
while os.path.isdir(check_empty_dir) and check_empty_dir != keep_dir:
check_files = os.listdir(check_empty_dir)
if not check_files or (len(check_files) <= len(ignore_items) and all(
[check_file in ignore_items for check_file in check_files])):
# directory is empty or contains only ignore_items
try:
logger.log(f"Deleting empty folder: {check_empty_dir}")
# need shutil.rmtree when ignore_items is really implemented
os.rmdir(check_empty_dir)
# do a Synology library update
notifiers.NotifierFactory().get('SYNOINDEX').deleteFolder(check_empty_dir)
except OSError as e:
logger.warning(f'Unable to delete {check_empty_dir}: {repr(e)} / {ex(e)}')
break
check_empty_dir = os.path.dirname(check_empty_dir)
else:
break
def get_absolute_number_from_season_and_episode(show_obj, season, episode):
"""
:param show_obj: show object
:type show_obj: TVShow
:param season: season number
:type season: int
:param episode: episode number
:type episode: int
:return: absolute number
:type: int or long
"""
absolute_number = None
if season and episode:
my_db = db.DBConnection()
sql_result = my_db.select('SELECT *'
' FROM tv_episodes'
' WHERE indexer = ? AND showid = ? AND season = ? AND episode = ?',
[show_obj.tvid, show_obj.prodid, season, episode])
if 1 == len(sql_result):
absolute_number = int(sql_result[0]["absolute_number"])
logger.debug(f'Found absolute_number:{absolute_number} by {season}x{episode}')
else:
logger.debug('No entries for absolute number in show: %s found using %sx%s' %
(show_obj.unique_name, str(season), str(episode)))
return absolute_number
def get_all_episodes_from_absolute_number(show_obj, absolute_numbers):
# type: (TVShow, List[int]) -> Tuple[int, List[int]]
"""
:param show_obj: show object
:param absolute_numbers: absolute numbers
"""
episode_numbers = []
season_number = None
if show_obj and len(absolute_numbers):
for absolute_number in absolute_numbers:
ep_obj = show_obj.get_episode(None, None, absolute_number=absolute_number)
if ep_obj:
episode_numbers.append(ep_obj.episode)
season_number = ep_obj.season # this takes the last found season so eps that cross the season
# border are not handled well
return season_number, episode_numbers
def sanitize_scene_name(name):
"""
Takes a show name and returns the "scenified" version of it.
:param name: name
:type name: AnyStr
:return: A string containing the scene version of the show name given.
:rtype: AnyStr
"""
if name:
bad_chars = ',:()£\'!?\u2019'
# strip out any bad chars
name = re.sub(r'[%s]' % bad_chars, '', name, flags=re.U)
# tidy up stuff that doesn't belong in scene names
name = re.sub(r'(-?\s|/)', '.', name).replace('&', 'and')
name = re.sub(r"\.+", '.', name).rstrip('.')
return name
return ''
def create_https_certificates(ssl_cert, ssl_key):
"""
Create self-signed HTTPS certificares and store in paths 'ssl_cert' and 'ssl_key'
"""
try:
from lib.certgen import generate_key, generate_local_cert
except (BaseException, Exception):
return False
private_key = generate_key(key_size=4096, output_file=ssl_key)
cert = generate_local_cert(private_key, days_valid=3650, output_file=ssl_cert)
return bool(cert)
if '__main__' == __name__:
import doctest
doctest.testmod()
def parse_xml(data, del_xmlns=False):
# type: (AnyStr, bool) -> Optional[etree.ElementTree]
"""
Parse data into an xml elementtree.ElementTree
data: data string containing xml
del_xmlns: if True, removes xmlns namesspace from data before parsing
Returns: parsed data as elementtree or None
"""
if del_xmlns:
data = re.sub(' xmlns="[^"]+"', '', data)
if isinstance(data, text_type) and is_lxml:
data = RE_XML_ENCODING.sub(r'\1\3', data, count=1)
try:
parsed_xml = etree.fromstring(data)
except (BaseException, Exception) as e:
logger.debug(f"Error trying to parse xml data. Error: {ex(e)}")
parsed_xml = None
return parsed_xml
def backup_versioned_file(old_file, version):
"""
:param old_file: old filename
:type old_file: AnyStr
:param version: version number
:type version: int
:return: success
:rtype: bool
"""
num_tries = 0
new_file = '%s.v%s' % (old_file, version)
if os.path.isfile(new_file):
changed_old_db = False
for back_nr in range(1, 10000):
alt_name = '%s.r%s' % (new_file, back_nr)
if not os.path.isfile(alt_name):
try:
shutil.move(new_file, alt_name)
changed_old_db = True
break
except (BaseException, Exception):
if os.path.isfile(new_file):
continue
logger.warning('could not rename old backup db file')
if not changed_old_db:
raise Exception('can\'t create a backup of db')
while not os.path.isfile(new_file):
if not os.path.isfile(old_file) or 0 == get_size(old_file):
logger.debug('No need to create backup')
break
try:
logger.debug(f'Trying to back up {old_file} to {new_file}')
shutil.copy(old_file, new_file)
logger.debug('Backup done')
break
except (BaseException, Exception) as e:
logger.warning(f'Error while trying to back up {old_file} to {new_file} : {ex(e)}')
num_tries += 1
time.sleep(3)
logger.debug('Trying again.')
if 3 <= num_tries:
logger.error(f'Unable to back up {old_file} to {new_file} please do it manually.')
return False
return True
def restore_versioned_file(backup_file, version):
"""
:param backup_file: filename
:type backup_file: AnyStr
:param version: version number
:type version: int
:return: success
:rtype: bool
"""
num_tries = 0
new_file, backup_version = os.path.splitext(backup_file)
restore_file = new_file + '.' + 'v' + str(version)
if not os.path.isfile(new_file):
logger.debug(f'Not restoring, {new_file} doesn\'t exist')
return False
try:
logger.debug(f'Trying to backup {new_file} to {new_file}.r{version} before restoring backup')
shutil.move(new_file, new_file + '.' + 'r' + str(version))
except (BaseException, Exception) as e:
logger.warning(f'Error while trying to backup DB file {restore_file} before proceeding with restore: {ex(e)}')
return False
while not os.path.isfile(new_file):
if not os.path.isfile(restore_file):
logger.debug(f'Not restoring, {restore_file} doesn\'t exist')
break
try:
logger.debug(f'Trying to restore {restore_file} to {new_file}')
shutil.copy(restore_file, new_file)
logger.debug('Restore done')
break
except (BaseException, Exception) as e:
logger.warning(f'Error while trying to restore {restore_file}: {ex(e)}')
num_tries += 1
time.sleep(1)
logger.debug('Trying again.')
if 10 <= num_tries:
logger.error(f'Unable to restore {restore_file} to {new_file} please do it manually.')
return False
return True
# try to convert to float, return default on failure
def try_float(s, s_default=0.0):
try:
return float(s)
except (BaseException, Exception):
return float(s_default)
# generates a md5 hash of a file
def md5_for_file(filename, block_size=2 ** 16):
"""
:param filename: filename
:type filename: AnyStr
:param block_size: block size
:type block_size: int or long
:return:
:rtype: AnyStr or None
"""
try:
with open(filename, 'rb') as f:
md5 = hashlib.md5()
while True:
data = f.read(block_size)
if not data:
break
md5.update(data)
f.close()
return md5.hexdigest()
except (BaseException, Exception):
return None
def get_lan_ip():
"""
Simple function to get LAN localhost_ip
http://stackoverflow.com/questions/11735821/python-get-localhost-ip
"""
if 'nt' != os.name:
# noinspection PyUnresolvedReferences
import fcntl
import struct
def get_interface_ip(if_name):
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
return socket.inet_ntoa(fcntl.ioctl(s.fileno(), 0x8915, struct.pack('256s', if_name[:15]))[20:24])
ip = socket.gethostbyname(socket.gethostname())
if ip.startswith("127.") and "nt" != os.name:
interfaces = [
"eth0",
"eth1",
"eth2",
"wlan0",
"wlan1",
"wifi0",
"ath0",
"ath1",
"ppp0",
]
for ifname in interfaces:
try:
# noinspection PyUnboundLocalVariable
ip = get_interface_ip(ifname)
print(ifname, ip)
break
except IOError:
pass
return ip
def check_url(url):
"""
Check if a URL exists without downloading the whole file.
:param url: url
:type url: AnyStr
:return:
:rtype: bool
"""
try:
return requests.head(url).ok
except (BaseException, Exception):
return False
def anon_url(*url):
"""
:param url: url
:type url:
:return: a URL string consisting of the Anonymous redirect URL and an arbitrary number of values appended
:rtype: AnyStr
"""
return '' if None in url else '%s%s' % (sickgear.ANON_REDIRECT, ''.join([str(s) for s in url]))
def starify(text, verify=False):
"""
If verify is true, return true if text is a star block created text else return false.
:param text: text
:type text: AnyStr
:param verify:
:type verify: bool
:return: Return text input string with either its latter half or its centre area (if 12 chars or more)
replaced with asterisks. Useful for securely presenting api keys to a ui.
"""
return '' if not text\
else ((('%s%s' % (text[:len(text) // 2], '*' * (len(text) // 2))),
('%s%s%s' % (text[:4], '*' * (len(text) - 8), text[-4:])))[12 <= len(text)],
set('*') == set((text[len(text) // 2:], text[4:-4])[12 <= len(text)]))[verify]
"""
Encryption
==========
By Pedro Jose Pereira Vieito (@pvieito)
* If encryption_version==0 then return data without encryption
* The keys should be unique for each device
To add a new encryption_version:
1) Code your new encryption_version
2) Update the last encryption_version available in webserve.py
3) Remember to maintain old encryption versions and key generators for retrocompatibility
"""
# Key Generators
unique_key1 = hex(uuid.getnode() ** 2) # Used in encryption v1
# Encryption Functions
def encrypt(data, encryption_version=0, do_decrypt=False):
# Version 1: Simple XOR encryption (this is not very secure, but works)
if 1 == encryption_version:
if do_decrypt:
return ''.join([chr(try_ord(x) ^ try_ord(y)) for (x, y) in
zip(b64decodebytes(decode_bytes(data)), cycle(unique_key1))])
return decode_str(b64encodebytes(decode_bytes(
''.join([chr(try_ord(x) ^ try_ord(y)) for (x, y) in zip(data, cycle(unique_key1))])))).strip()
# Version 0: Plain text
return data
def decrypt(data, encryption_version=0):
return encrypt(data, encryption_version, do_decrypt=True)
def full_sanitize_scene_name(name):
"""
sanitize scene name
:param name: name
:type name: AnyStr
:return: sanitized name
:rtype: AnyStr
"""
return re.sub('[. -]', ' ', sanitize_scene_name(name)).lower().lstrip()
def get_show(name, try_scene_exceptions=False):
# type: (AnyStr, bool) -> Optional[TVShow]
"""
get show object for show with given name
:param name: name of show
:type name: AnyStr
:param try_scene_exceptions: check scene exceptions
:type try_scene_exceptions: bool
:return: None or show object
:type: TVShow or None
"""
if not sickgear.showList or None is name:
return
show_obj = None
try:
tvid, prodid = sickgear.name_cache.retrieve_name_from_cache(name)
if tvid and prodid:
show_obj = find_show_by_id({tvid: prodid})
if not show_obj and try_scene_exceptions:
tvid, prodid, season = sickgear.scene_exceptions.get_scene_exception_by_name(name)
if tvid and prodid:
show_obj = find_show_by_id({tvid: prodid})
except (BaseException, Exception) as e:
logger.debug(f'Error when attempting to find show: {name} in SickGear: {ex(e)}')
return show_obj
def is_hidden_folder(folder):
"""
On Linux based systems hidden folders start with . (dot)
:param folder: Full path of folder to check
:type folder: AnyStr
:return: Returns True if folder is hidden
:rtype: bool
"""
if os.path.isdir(folder):
if os.path.basename(folder).startswith('.'):
return True
return False
def real_path(path):
"""
The resulting path will have no symbolic link, '/./' or '/../' components.
:param path: path
:type path: AnyStr
:return: the canonicalized absolute pathname
:rtype: AnyStr
"""
return os.path.normpath(os.path.normcase(os.path.realpath(os.path.expanduser(path))))
def validate_show(show_obj, season=None, episode=None):
"""
:param show_obj: show object
:type show_obj: TVShow
:param season: optional season
:type season: int or None
:param episode: opitonal episode
:type episode: int or None
:return: TVInfoAPI source
:rtype: object
"""
show_lang = show_obj.lang
try:
tvinfo_config = sickgear.TVInfoAPI(show_obj.tvid).api_params.copy()
tvinfo_config['dvdorder'] = 0 != show_obj.dvdorder
if show_lang and not 'en' == show_lang:
tvinfo_config['language'] = show_lang
t = sickgear.TVInfoAPI(show_obj.tvid).setup(**tvinfo_config)
if season is None and episode is None:
return t
return t.get_show(show_obj.prodid, language=show_obj.lang)[season][episode]
except (BaseTVinfoEpisodenotfound, BaseTVinfoSeasonnotfound, TypeError):
pass
def _maybe_request_url(e, def_url=''):
return hasattr(e, 'request') and hasattr(e.request, 'url') and ' ' + e.request.url or def_url
def clear_cache(force=False):
"""
clear sickgear cache folder
:param force: force clearing
:type force: bool
"""
# clean out cache directory, remove everything > 12 hours old
dirty = None
del_time = SGDatetime.timestamp_near(td=datetime.timedelta(hours=12))
direntry_args = dict(follow_symlinks=False)
for direntry in scantree(sickgear.CACHE_DIR, exclude_dirs=['images|rss|zoneinfo'], follow_symlinks=True):
if direntry.is_file(**direntry_args) and (force or del_time > direntry.stat(**direntry_args).st_mtime):
dirty = dirty or False if remove_file_perm(direntry.path) else True
elif direntry.is_dir(**direntry_args) and direntry.name not in ['cheetah', 'sessions', 'indexers']:
dirty = dirty or False
try:
os.rmdir(direntry.path)
except OSError:
dirty = True
logger.log(
f'{(("Found items not removed", "Found items removed")[not dirty], "No items found to remove")[None is dirty]}'
f' from cache folder {sickgear.CACHE_DIR}')
def human(size):
"""
format a size in bytes into a 'human' file size, e.g. bytes, KB, MB, GB, TB, PB
Note that bytes/KB will be reported in whole numbers but MB and above will have greater precision
e.g. 1 byte, 43 bytes, 443 KB, 4.3 MB, 4.43 GB, etc
:param size: numerical value to be converted
:type size: int or long or float
:return: human readable string
:rtype: AnyStr
"""
if 1 == size:
# because I really hate unnecessary plurals
return "1 byte"
suffixes_table = [('bytes', 0), ('KB', 0), ('MB', 1), ('GB', 2), ('TB', 2), ('PB', 2)]
num = float(size)
for suffix, precision in suffixes_table:
if 1024.0 > num:
break
num /= 1024.0
# noinspection PyUnboundLocalVariable
if 0 == precision:
formatted_size = '%d' % num
else:
formatted_size = str(round(num, ndigits=precision))
# noinspection PyUnboundLocalVariable
return '%s %s' % (formatted_size, suffix)
def get_size(start_path='.'):
"""
return combined size of data in given path
:param start_path: start path
:type start_path: AnyStr
:return: size in bytes
:rtype: int or long
"""
if os.path.isfile(start_path):
return os.path.getsize(start_path)
try:
return sum(map((lambda x: x.stat(follow_symlinks=False).st_size), scantree(start_path)))
except OSError:
return 0
def get_media_stats(start_path='.'):
# type: (AnyStr) -> Tuple[int, int, int, int]
"""
return recognised media stats for a folder as...
number of media files, smallest size in bytes, largest size in bytes, average size in bytes
:param start_path: path to scan
"""
if os.path.isdir(start_path):
sizes = sorted(map(lambda y: y.stat(follow_symlinks=False).st_size,
filter(lambda x: has_media_ext(x.name), scantree(start_path))))
if sizes:
return len(sizes), sizes[0], sizes[-1], int(sum(sizes) / len(sizes))
elif os.path.isfile(start_path):
size = os.path.getsize(start_path)
return 1, size, size, size
return 0, 0, 0, 0
def remove_article(text=''):
"""
remove articles from text
:param text: input text
:type text: AnyStr
:return: text without articles
:rtype: AnyStr
"""
return re.sub(r'(?i)^(?:A(?!\s+to)n?|The)\s(\w)', r'\1', text)
def re_valid_hostname(with_allowed=True):
this_host = socket.gethostname()
return re.compile(r'(?i)(%slocalhost|.*\.local%s%s)$' % (
(with_allowed
and '%s|' % (sickgear.ALLOWED_HOSTS
and '|'.join(re.escape(x.strip()) for x in sickgear.ALLOWED_HOSTS.split(','))
or '.*')
or ''),
bool(this_host) and ('|%s' % this_host) or '',
sickgear.ALLOW_ANYIP and ('|%s' % valid_ipaddr_expr()) or ''))
def valid_ipaddr_expr():
"""
Returns a regular expression that will validate an ip address
:return: Regular expression
:rtype: String
"""
return r'(%s)' % '|'.join([re.sub(r'\s+(#.[^\r\n]+)?', '', x) for x in [
# IPv4 address (accurate)
# Matches 0.0.0.0 through 255.255.255.255
r'''
(?:(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])\.){3}(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])
''',
# IPv6 address (standard and mixed)
# 8 hexadecimal words, or 6 hexadecimal words followed by 4 decimal bytes All with optional leading zeros
r'''
(?:(? max_eps:
seasons = [x for x in set([x.season for x in segment])]
seg_str = f'Season{maybe_plural(len(seasons))}: '
divider = ''
for x in seasons:
eps = [str(s.episode) for s in segment if x == s.season]
ep_c = len(eps)
seg_str += '%s%s (%s Ep%s)' \
% (divider, x, maybe_plural(ep_c), ', '.join(eps), ep_c, maybe_plural(ep_c))
divider = ', '
elif segment:
episode_numbers = ['S%sE%s' % (str(x.season).zfill(2), str(x.episode).zfill(2)) for x in segment]
seg_str = f'Episode{maybe_plural(len(episode_numbers))}: {", ".join(episode_numbers)}'
return seg_str
def has_anime():
"""
:return: if there are any anime shows in show list
:rtype: bool
"""
# noinspection PyTypeChecker
return False if not sickgear.showList else any(filter(lambda show: show.is_anime, sickgear.showList))
def cpu_sleep():
if cpu_presets[sickgear.CPU_PRESET]:
time.sleep(cpu_presets[sickgear.CPU_PRESET])
def cleanup_cache():
"""
Delete old cached files
"""
delete_not_changed_in(
[os.path.join(sickgear.CACHE_DIR, 'images', 'browse', 'thumb', x)
for x in ['anidb', 'imdb', 'trakt', 'tvdb']] +
[os.path.join(sickgear.CACHE_DIR, 'images', x)
for x in ['characters', 'person']] +
[os.path.join(sickgear.CACHE_DIR, 'tvinfo_cache')])
def delete_not_changed_in(paths, days=30, minutes=0):
"""
Delete files under paths not changed in n days and/or n minutes.
If a file was modified later than days/and or minutes, then don't delete it.
:param paths: Path(s) to scan for files to delete
:type paths: String or List of strings
:param days: Purge files not modified in this number of days (default: 30 days)
:param minutes: Purge files not modified in this number of minutes (default: 0 minutes)
:return: tuple; number of files that qualify for deletion, number of qualifying files that failed to be deleted
"""
del_time = SGDatetime.timestamp_near(td=datetime.timedelta(days=days, minutes=minutes))
errors = 0
qualified = 0
for cur_path in (paths, [paths])[not isinstance(paths, list)]:
try:
for direntry in scantree(cur_path, filter_kind=False):
if del_time > direntry.stat(follow_symlinks=False).st_mtime:
if not remove_file_perm(direntry.path):
errors += 1
qualified += 1
except (BaseException, Exception):
pass
return qualified, errors
def set_file_timestamp(filename, min_age=3, new_time=None):
"""
:param filename: filename
:type filename: AnyStr
:param min_age: minimum age in days
:type min_age: int
:param new_time:
:type new_time: None or int
"""
min_time = SGDatetime.timestamp_near(td=datetime.timedelta(days=min_age))
try:
if os.path.isfile(filename) and os.path.getmtime(filename) < min_time:
os.utime(filename, new_time)
except (BaseException, Exception):
pass
def should_delete_episode(status):
"""
check if episode should be deleted from db
:param status: episode status
:type status: int
:return: should be deleted
:rtype: bool
"""
s = Quality.split_composite_status(status)[0]
if s not in SNATCHED_ANY + [DOWNLOADED, ARCHIVED, IGNORED]:
return True
logger.debug('not safe to delete episode from db because of status: %s' % statusStrings[s])
return False
def is_link(filepath):
"""
Check if given file/pathname is symbolic link
:param filepath: file or path to check
:return: True or False
"""
if 'win32' == sys.platform:
if not os.path.exists(filepath):
return False
import ctypes
invalid_file_attributes = 0xFFFFFFFF
file_attribute_reparse_point = 0x0400
attr = ctypes.windll.kernel32.GetFileAttributesW(text_type(filepath))
return invalid_file_attributes != attr and 0 != attr & file_attribute_reparse_point
return os.path.islink(filepath)
def find_mount_point(path):
# type: (AnyStr) -> AnyStr
"""
returns the mount point for the given path
:param path: to find the mount path
:return: mount point for path or path if no mount
"""
result = path
if os.path.exists(path):
result = os.path.realpath(os.path.abspath(path))
try:
while not os.path.ismount(result):
new_path = os.path.dirname(result)
if new_path == result:
# return input path if mount point not found
return path
result = new_path
except (BaseException, Exception):
return path
return result
def df():
# type: (...) -> Tuple[List[Tuple[AnyStr, AnyStr]], bool]
"""
Return disk free space at known parent locations
:return: string path, string value that is formatted size
"""
result = []
min_output = True # flag ui to output minimal (e.g. vol: size, vol: size)
if sickgear.ROOT_DIRS and sickgear.DISPLAY_FREESPACE:
targets = []
for cur_target in filter(lambda _t: _t and _t not in targets,
map(find_mount_point, sickgear.ROOT_DIRS.split('|')[1:])):
targets += [cur_target]
free = freespace(cur_target)
if 'win32' == sys.platform and None is not free:
cur_target = os.path.splitdrive(cur_target)[0]
if any(['win32' == sys.platform and not re.match('(?i)[a-z]:(\\\\)?$', cur_target),
# Windows, if a simple drive letter isn't found, fallback to full path. On Linux, full path is used
# trigger ui to output long paths instead of minimal volume letters layout
sys.platform.startswith(('linux', 'darwin', 'sunos5')), 'bsd' in sys.platform]):
min_output = False
result += [(cur_target, 'unavailable' if None is free else sizeof_fmt(free, sep=''))]
return result, min_output
def freespace(path=None):
"""
Return free space available at path location
:param path: Example paths (Windows) = '\\\\192.168.0.1\\sharename\\existing_path', 'd:\\existing_path'
Untested with mount points under linux
:type path: AnyStr
:return: Size in bytes
:rtype: long or None
"""
result = None
if 'win32' == sys.platform:
try:
import ctypes
if None is not ctypes:
max_val = (2 ** 64) - 1
storage = ctypes.c_ulonglong(max_val)
ctypes.windll.kernel32.GetDiskFreeSpaceExW(ctypes.c_wchar_p(path), None, None, ctypes.pointer(storage))
result = (storage.value, None)[max_val == storage.value]
except (BaseException, Exception):
pass
elif sys.platform.startswith(('linux', 'darwin', 'sunos5')) or 'bsd' in sys.platform:
try:
storage = os.statvfs(path) # perms errors can result
result = storage.f_bavail * storage.f_frsize
except OSError:
pass
return result
def path_mapper(search, replace, subject):
"""
Substitute strings in a path
:param search: Search text
:type search: AnyStr
:param replace: Replacement text
:type replace: AnyStr
:param subject: Path text to search
:type subject: AnyStr
:return: Subject with or without substitution, True if a change was made otherwise False
:rtype: Tuple[AnyStr, bool]
"""
delim = '/!~!/'
search = re.sub(r'\\', delim, search)
replace = re.sub(r'\\', delim, replace)
path = re.sub(r'\\', delim, subject)
result = re.sub('(?i)^%s' % search, replace, path)
result = os.path.normpath(re.sub(delim, '/', result))
return result, result != subject
def get_overview(ep_status, show_quality, upgrade_once, split_snatch=False):
# type: (integer_types, integer_types, Union[integer_types, bool], bool) -> integer_types
"""
:param ep_status: episode status
:param show_quality: show quality
:param upgrade_once: upgrade once
:param split_snatch:
:type split_snatch: bool
:return: constant from classes Overview
"""
status, quality = Quality.split_composite_status(ep_status)
if ARCHIVED == status:
return Overview.GOOD
if WANTED == status:
return Overview.WANTED
if status in (SKIPPED, IGNORED):
return Overview.SKIPPED
if status in (UNAIRED, UNKNOWN):
return Overview.UNAIRED
if status in [SUBTITLED] + Quality.SNATCHED_ANY + Quality.DOWNLOADED + Quality.FAILED:
if FAILED == status:
return Overview.WANTED
if not split_snatch and status in SNATCHED_ANY:
return Overview.SNATCHED
void, best_qualities = Quality.split_quality(show_quality)
# if re-downloads aren't wanted then mark it "good" if there is anything
if not len(best_qualities):
return Overview.GOOD
min_best, max_best = min(best_qualities), max(best_qualities)
if quality >= max_best \
or (upgrade_once and
(quality in best_qualities or (None is not min_best and quality > min_best))):
return Overview.GOOD
return (Overview.QUAL, Overview.SNATCHED_QUAL)[status in SNATCHED_ANY]
def generate_show_dir_name(root_dir, show_name):
# type: (Optional[AnyStr], AnyStr) -> AnyStr
"""
generate show dir name
:param root_dir: root dir
:param show_name: show name
:return: show dir name
"""
san_show_name = sanitize_filename(show_name)
if sickgear.SHOW_DIRS_WITH_DOTS:
san_show_name = san_show_name.replace(' ', '.')
if None is root_dir:
return san_show_name
return os.path.join(root_dir, san_show_name)
def count_files_dirs(base_dir):
"""
:param base_dir: path
:type base_dir: AnyStr
:return: tuple of count of files, dirs
:rtype: Tuple[int, int]
"""
f = d = 0
try:
with scandir(base_dir) as s_d:
try:
files = s_d
except OSError as e:
logger.warning('Unable to count files %s / %s' % (repr(e), ex(e)))
else:
for e in files:
if e.is_file():
f += 1
elif e.is_dir():
d += 1
except OSError as e:
logger.warning('Unable to count files %s / %s' % (repr(e), ex(e)))
return f, d
def upgrade_new_naming():
my_db = db.DBConnection()
sql_result = my_db.select('SELECT indexer AS tv_id, indexer_id AS prod_id FROM tv_shows')
show_list = {}
for cur_result in sql_result:
show_list[int(cur_result['prod_id'])] = int(cur_result['tv_id'])
if sickgear.FANART_RATINGS:
from sickgear.tv import TVidProdid
ne = {}
for k, v in iteritems(sickgear.FANART_RATINGS):
nk = show_list.get(try_int(k))
if nk:
ne[TVidProdid({nk: int(k)})()] = sickgear.FANART_RATINGS[k]
sickgear.FANART_RATINGS = ne
sickgear.CFG.setdefault('GUI', {})['fanart_ratings'] = '%s' % ne
sickgear.CFG.write()
image_cache_dir = os.path.join(sickgear.CACHE_DIR, 'images')
bp_match = re.compile(r'(\d+)\.((?:banner|poster|(?:\d+(?:\.\w*)?\.\w{5,8}\.)?fanart)\.jpg)', flags=re.I)
def _set_progress(p_msg, c, s):
ps = None
if 0 == s:
ps = 0
elif 1 == s and 0 == c:
ps = 100
elif 1 > c % s:
ps = c / s
if None is not ps:
sickgear.classes.loading_msg.set_msg_progress(p_msg, '{:6.2f}%'.format(ps))
for d in ['', 'thumbnails']:
bd = os.path.join(image_cache_dir, d)
if os.path.isdir(bd):
fc, dc = count_files_dirs(bd)
step = fc / float(100)
cf = 0
p_text = 'Upgrading %s' % (d, 'banner/poster')[not d]
_set_progress(p_text, 0, 0)
with scandir(bd) as s_d:
for entry in scandir(bd):
if entry.is_file():
cf += 1
_set_progress(p_text, cf, step)
b_s = bp_match.search(entry.name)
if b_s:
old_id = int(b_s.group(1))
tvid = show_list.get(old_id)
if tvid:
nb_dir = os.path.join(sickgear.CACHE_DIR, 'images', 'shows', '%s-%s' % (tvid, old_id), d)
if not os.path.isdir(nb_dir):
try:
os.makedirs(nb_dir)
except (BaseException, Exception):
pass
new_name = os.path.join(nb_dir, bp_match.sub(r'\2', entry.name))
try:
move_file(entry.path, new_name)
except (BaseException, Exception) as e:
logger.warning('Unable to rename %s to %s: %s / %s'
% (entry.path, new_name, repr(e), ex(e)))
else:
# clean up files without reference in db
try:
os.remove(entry.path)
except (BaseException, Exception):
pass
elif entry.is_dir():
if entry.name in ['shows', 'browse']:
continue
elif 'fanart' == entry.name:
_set_progress(p_text, 0, 1)
fc_fan, dc_fan = count_files_dirs(entry.path)
step_fan = dc_fan / float(100)
cf_fan = 0
p_text = 'Upgrading fanart'
_set_progress(p_text, 0, 0)
try:
with scandir(entry.path) as s_p:
try:
entries = s_p
except OSError as e:
logger.warning('Unable to stat dirs %s / %s' % (repr(e), ex(e)))
continue
for d_entry in entries:
if d_entry.is_dir():
cf_fan += 1
_set_progress(p_text, cf_fan, step_fan)
old_id = try_int(d_entry.name)
if old_id:
new_id = show_list.get(old_id)
if new_id:
new_dir_name = os.path.join(sickgear.CACHE_DIR, 'images', 'shows',
'%s-%s' % (new_id, old_id), 'fanart')
try:
move_file(d_entry.path, new_dir_name)
except (BaseException, Exception) as e:
logger.warning(f'Unable to rename {d_entry.path}'
f' to {new_dir_name}: {repr(e)} / {ex(e)}')
if os.path.isdir(new_dir_name):
try:
with scandir(new_dir_name) as s_d_n:
try:
f_n = filter(lambda fn: fn.is_file(), s_d_n)
except OSError as e:
logger.warning(
f'Unable to rename {repr(e)} / {ex(d)}')
else:
rename_args = []
# noinspection PyTypeChecker
for f_entry in f_n:
rename_args += [
(f_entry.path,
bp_match.sub(r'\2', f_entry.path))]
for args in rename_args:
try:
move_file(*args)
except (BaseException, Exception) as e:
logger.warning(
f'Unable to rename {args[0]}'
f' to {args[1]}: {repr(e)} / {ex(e)}')
except OSError as e:
logger.warning(
'Unable to rename %s / %s' % (repr(e), ex(e)))
else:
try:
shutil.rmtree(d_entry.path)
except (BaseException, Exception):
pass
try:
shutil.rmtree(d_entry.path)
except (BaseException, Exception):
pass
except OSError as e:
logger.warning('Unable to stat dirs %s / %s' % (repr(e), ex(e)))
continue
try:
os.rmdir(entry.path)
except (BaseException, Exception):
pass
if 'thumbnails' == d:
try:
os.rmdir(bd)
except (BaseException, Exception):
pass
_set_progress(p_text, 0, 1)
def xhtml_escape(text, br=True):
"""
Escapes a string, so it is valid within HTML or XML using the function from Tornado.
:param text: Text to convert entities from for example '"' to '"'
:type text: AnyStr
:param br: True, replace newline with html `
`
:type br: bool
:return: Text with entities replaced
:rtype: AnyStr
"""
if not text:
return text
from tornado import escape
if br:
text = re.sub(r'\r?\n', '
', text)
return escape.xhtml_escape(normalise_chars(text))
def normalise_chars(text):
# noinspection GrazieInspection
"""
Normalise characters to maintain a consistent output from different sources, and to prevent issues when sorting
e.g.
curved apostrophe ’ with standard ' apostrophe
wide dash with standard hyphen
:param text: Text to convert entities from for example '"' to '"'
:type text: AnyStr
:return: Text with entities replaced
:rtype: AnyStr
"""
result = text.replace('\u2010', '-').replace('\u2011', '-').replace('\u2012', '-') \
.replace('\u2013', '-').replace('\u2014', '-').replace('\u2015', '-') \
.replace('\u2018', "'").replace('\u2019', "'") \
.replace('\u201c', '\"').replace('\u201d', '\"') \
.replace('\u0020', ' ').replace('\u00a0', ' ')
return result
def parse_imdb_id(string):
# type: (AnyStr) -> Optional[AnyStr]
""" Parse an IMDB ID from a string
:param string: string to parse
:return: parsed ID of the form char, char, number of digits or None if no match
"""
result = None
try:
result = RC_IMDB_ID.findall(string)[0]
except(BaseException, Exception):
pass
return result
def generate_word_str(words, regex=False, join_chr=','):
# type: (Set[AnyStr], bool, AnyStr) -> AnyStr
"""
combine a list or set to a string with optional prefix 'regex:'
:param words: list or set of words
:type words: set
:param regex: prefix regex: ?
:type regex: bool
:param join_chr: character(s) used for join words
:type join_chr: basestring
:return: combined string
:rtype: basestring
"""
return '%s%s' % (('', 'regex:')[True is regex], join_chr.join(words))
def split_word_str(word_list):
# type: (AnyStr) -> Tuple[Set[AnyStr], bool]
"""
split string into set and boolean regex
:param word_list: string with words
:type word_list: basestring
:return: set of words, is it regex
:rtype: (set, bool)
"""
try:
if word_list.startswith('regex:'):
rx = True
word_list = word_list.replace('regex:', '')
else:
rx = False
s = set(w.strip() for w in word_list.split(',') if w.strip())
except (BaseException, Exception):
rx = False
s = set()
return s, rx