mirror of
https://github.com/SickGear/SickGear.git
synced 2025-01-05 09:33:38 +00:00
7a6936823e
Add `spoken_languages` to tmdb API and TVInfoShow object. Add `trailers`, `homepage` to trakt API and TVInfoShow object. Add trakt episode data if returned from api. Add trakt API methods. - get_most_played - get_most_watched - get_most_collected - get_anticipated - get_recommended - get_trending - get_popular - get_recommended_for_account - get_new_shows - get_new_seasons - get_watchlisted_for_account - get_similar - hide_recommended_for_account (to hide/remove recommended shows for account) - unhide_recommended_for_account - list_hidden_recommended_for_account Fix caching tmdb language list over different runtime instances. Add episode_count and fix ti_show in tmdb_api person object. Change set additional properties in get_person trakt_api. Add tmdb API methods and tvinfo_base. - get_recommended_for_show - get_similar --- fix supported language caching improve print output (source name) of tvinfo_api_tests fix tvinfo_api_tests data creation --- Add code so that it runs with all_test use mock today() and now() dates add option to only get new urls mock data try also to make object creation only when needed fix person parser in tmdb_api add search_person test in tvinfo_api_tests restore mocked methods at the end of the tvinfo_api_tests to prevent other tests to fail when called via all_tests switch gzip with better lzma compression for mock files (default lib in py3) move mock files in test unit sub folder --- Fix trakt method `get_recommended`. Fix browse trakt tests in tvinfo_api_tests. Change set episode id in trakt api. --- Add test_browse_endpoints to tvinfo_api_tests. --- Add enforce_type to sg_helpers. Change use enforce str for overviews. Change remove `if PY2` code sections Add support for datetime.time in _make_airtime in tv.py Refactor tvmaze_api show data setter. Change test to not allow None for seriesname. Add additional missing showdata with caller load_data(). Add load_data() to TVInfoShow. Add guestcast, guestcrew to episodes in pytvmaze lib. --- Change make seriesid of TVInfoShow a alias property of id. Add tvinfo tests. Add search tests. Add show, person tests. Change add trakt tests. Change add tmdb search tests. tvmaze_api exclude rating from mapping. Allow None for seriesname. Fix origin_countries in trakt_api search. Fix show_type in tvmaze_api. Fix airtime for episodes in tvmaze_api. --- Change switch to property instead of legacy dict-like use for trakt search results. Change optimize speed of get() function. Fix make BaseTVinfoSeasonnotfound and BaseTVinfoAttributenotfound also a subclass of AttributeError and KeyError. Change mock get() to work with and without default args just like dict get(). Change add language to tmdb_api search results. Change improve person search by remote id, by getting the complete persons data when there is only 1 result. Change trakt API search results to tvinfoshow. Change search results to TVInfoShow objs in tvmaze_api. Change simplify poster URL generation for search results. Change search results to TVInfoShow objs. Change add tvdb genre links to displayShow. Change workaround for missing data in person data (series set to None). Fix add show to characters of person if there is no name on IMDb (set to 'unknown name'). Change add config and icons for linkedin, reddit, wikidata, youtube. Add TVInfoIDs, TVInfoSocialIDs to Trakt. Add TVInfoIDs to tmdb_api. Add TVInfoIDs to tvmaze_api. add TVInfoIDs to imdb_api. Change make character name '' if None. Fix for 'unknown name' persons and characters. Add contentrating. Change fill in new fields to get_person results. ---- Change set new in/active dates to network. Change add active_date, inactive_date to TVInfoNetwork class. Change add default kwargs to tmdb discover method if no kwargs are set. Change default: English language shows with first air date greater then today. Change add slug field to returned data from discover. Change add 'score' mapped to rating to discover returned results. Fix valid_data for discover method. Change add result_count to discover. Change add _sanitise_image_uri to discover method. Fix convert_person. Change add missing _sanitise_image_uri for images in some places. Fix crew. Change return type of tvinfo base: discover to list tvinfoshow. Fix people remote id search. Change add tmdb person id search. Change fix people endpoint fieldname changes. Change add biography to person object. Change move 401 expired token handling into TvdbAuth class. Change get new token if old token is expired. Change add raise error if episodes fallback fails to load data. Change add break if no valid_data to absolute and alternative numberings. Change add filter only networks. Change add new required parameter meta=translations to get translated (includes the original language) show overviews. Change add check if show is set for person compare. Fix person update properties with no show set. Change add person image. Change add alternative episode orders. Change add alt_ep_numbering to TVINFO_Show. Change add old interface for dvd order. Change add trakt slug tvinfo search test cases. Change add mock for old tvdb get new token. Change old lib to newer tvinfo data. Fix person id (not available on old api). Change more places to new TVInfoAPI interface.
1810 lines
61 KiB
Python
1810 lines
61 KiB
Python
# coding=utf-8
|
||
#
|
||
# This file is part of SickGear.
|
||
#
|
||
# SickGear is free software: you can redistribute it and/or modify
|
||
# it under the terms of the GNU General Public License as published by
|
||
# the Free Software Foundation, either version 3 of the License, or
|
||
# (at your option) any later version.
|
||
#
|
||
# SickGear is distributed in the hope that it will be useful,
|
||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||
# GNU General Public License for more details.
|
||
#
|
||
# You should have received a copy of the GNU General Public License
|
||
# along with SickGear. If not, see <http://www.gnu.org/licenses/>.
|
||
|
||
from itertools import cycle
|
||
import datetime
|
||
import hashlib
|
||
import os
|
||
import re
|
||
import shutil
|
||
import socket
|
||
import time
|
||
import uuid
|
||
import sys
|
||
|
||
import sickgear
|
||
from . import db, logger, notifiers
|
||
from .common import cpu_presets, mediaExtensions, Overview, Quality, statusStrings, subtitleExtensions, \
|
||
ARCHIVED, DOWNLOADED, FAILED, IGNORED, SKIPPED, SNATCHED_ANY, SUBTITLED, UNAIRED, UNKNOWN, WANTED
|
||
from .sgdatetime import SGDatetime
|
||
from lib.tvinfo_base.exceptions import *
|
||
from exceptions_helper import ex, MultipleShowObjectsException
|
||
|
||
import dateutil.parser
|
||
import requests
|
||
import requests.exceptions
|
||
import subliminal
|
||
from lxml_etree import etree, is_lxml
|
||
from base64 import decodebytes as b64decodebytes, encodebytes as b64encodebytes
|
||
|
||
from _23 import decode_bytes, decode_str, scandir
|
||
from six import iteritems, string_types, text_type
|
||
# noinspection PyUnresolvedReferences
|
||
from six.moves import zip
|
||
|
||
# the following are imported from elsewhere,
|
||
# therefore, they intentionally don't resolve and are unused in this particular file.
|
||
# noinspection PyUnresolvedReferences
|
||
from sg_helpers import chmod_as_parent, clean_data, copy_file, download_file, fix_set_group_id, get_system_temp_dir, \
|
||
get_url, indent_xml, make_path, maybe_plural, md5_for_text, move_file, proxy_setting, remove_file, \
|
||
remove_file_perm, replace_extension, sanitize_filename, scantree, touch_file, try_int, try_ord, write_file
|
||
|
||
# deprecated item, remove in 2020, kept here as rollback uses it
|
||
copyFile = copy_file
|
||
moveFile = move_file
|
||
tryInt = try_int # one legacy custom provider is keeping this signature here
|
||
|
||
# noinspection PyUnreachableCode
|
||
if False:
|
||
# noinspection PyUnresolvedReferences
|
||
from typing import Any, AnyStr, Dict, Generator, NoReturn, Iterable, Iterator, List, Optional, Set, Tuple, Union
|
||
from .tv import TVShow
|
||
# the following workaround hack resolves a pyc resolution bug
|
||
from .name_cache import retrieve_name_from_cache
|
||
from six import integer_types
|
||
|
||
RE_XML_ENCODING = re.compile(r'^(<\?xml[^>]+)\s+(encoding\s*=\s*[\"\'][^\"\']*[\"\'])(\s*\?>|)', re.U)
|
||
RE_IMDB_ID = r'tt\d{7,10}'
|
||
RC_IMDB_ID = re.compile(r'(?i)(%s)' % RE_IMDB_ID)
|
||
|
||
|
||
def remove_extension(name):
|
||
"""
|
||
Remove download or media extension from name (if any)
|
||
|
||
:param name: filename
|
||
:type name: AnyStr
|
||
:return: name without extension
|
||
:rtype: AnyStr
|
||
"""
|
||
|
||
if name and "." in name:
|
||
base_name, sep, extension = name.rpartition('.')
|
||
if base_name and extension.lower() in ['nzb', 'torrent'] + mediaExtensions:
|
||
name = base_name
|
||
|
||
return name
|
||
|
||
|
||
def is_parsable_date(text):
|
||
try:
|
||
result = isinstance(dateutil.parser.parse(text), datetime.datetime)
|
||
except (BaseException, Exception):
|
||
result = False
|
||
return result
|
||
|
||
|
||
def remove_non_release_groups(name, is_anime=False):
|
||
"""
|
||
Remove non release groups from name
|
||
|
||
:param name: release name
|
||
:type name: AnyStr
|
||
:param is_anime: is anmie
|
||
:type is_anime: bool
|
||
:return:
|
||
:rtype: AnyStr
|
||
"""
|
||
|
||
if name:
|
||
rc = [re.compile(r'(?i)' + v) for v in [
|
||
r'([\s\.\-_\[\{\(]*(no-rar|nzbgeek|ripsalot|siklopentan)[\s\.\-_\]\}\)]*)$',
|
||
r'([\s\.\-_\[\{\(]rp[\s\.\-_\]\}\)]*)$',
|
||
r'(?<=\w)([\s\.\-_]*[\[\{\(][\s\.\-_]*(www\.\w+.\w+)[\s\.\-_]*[\]\}\)][\s\.\-_]*)$',
|
||
r'(?<=\w)([\s\.\-_]*[\[\{\(]\s*(rar(bg|tv)|((e[tz]|v)tv))[\s\.\-_]*[\]\}\)][\s\.\-_]*)$'] +
|
||
([r'(?<=\w)([\s\.\-_]*[\[\{\(][\s\.\-_]*[\w\s\.\-\_]+[\s\.\-_]*[\]\}\)][\s\.\-_]*)$',
|
||
r'^([\s\.\-_]*[\[\{\(][\s\.\-_]*[\w\s\.\-\_]+[\s\.\-_]*[\]\}\)][\s\.\-_]*)(?=\w)'], [])[is_anime]]
|
||
rename = name = remove_extension(name)
|
||
while rename:
|
||
for regex in rc:
|
||
result = regex.findall(name)
|
||
if result:
|
||
for cur_match in isinstance(result[0], tuple) and result[0] or result:
|
||
if not is_parsable_date(cur_match.strip(' ()')):
|
||
name = regex.sub('', name)
|
||
rename = (name, False)[name == rename]
|
||
|
||
return name
|
||
|
||
|
||
def is_sync_file(filename):
|
||
"""
|
||
|
||
:param filename: filename
|
||
:type filename: AnyStr
|
||
:return:
|
||
:rtype: bool
|
||
"""
|
||
extension = filename.rpartition(".")[2].lower()
|
||
return '!sync' == extension or 'lftp-pget-status' == extension
|
||
|
||
|
||
def has_media_ext(filename):
|
||
"""
|
||
checks if file has media extension
|
||
|
||
:param filename: filename
|
||
:type filename: AnyStr
|
||
:return:
|
||
:rtype: bool
|
||
"""
|
||
# ignore samples
|
||
if re.search(r'(^|[\W_])(sample\d*)[\W_]', filename, re.I) \
|
||
or filename.startswith('._'): # and MAC OS's 'resource fork' files
|
||
return False
|
||
|
||
sep_file = filename.rpartition('.')
|
||
return (None is re.search('extras?$', sep_file[0], re.I)) and (sep_file[2].lower() in mediaExtensions)
|
||
|
||
|
||
def has_image_ext(filename):
|
||
"""
|
||
checks if file has image extension
|
||
|
||
:param filename: filename
|
||
:type filename: AnyStr
|
||
:return:
|
||
:rtype: bool
|
||
"""
|
||
try:
|
||
if os.path.splitext(filename)[1].lower() in ['.bmp', '.gif', '.jpeg', '.jpg', '.png', '.webp']:
|
||
return True
|
||
except (BaseException, Exception):
|
||
pass
|
||
return False
|
||
|
||
|
||
def is_first_rar_volume(filename):
|
||
"""
|
||
checks if file is part of rar set
|
||
|
||
:param filename: filename
|
||
:type filename: AnyStr
|
||
:return:
|
||
:rtype: bool
|
||
"""
|
||
return None is not re.search(r'(?P<file>^(?P<base>(?:(?!\.part\d+\.rar$).)*)\.(?:(?:part0*1\.)?rar)$)', filename)
|
||
|
||
|
||
def find_show_by_id(
|
||
show_id, # type: Union[AnyStr, Dict[int, int], int]
|
||
show_list=None, # type: Optional[List[TVShow]]
|
||
no_mapped_ids=True, # type: bool
|
||
check_multishow=False, # type: bool
|
||
no_exceptions=False # type: bool
|
||
):
|
||
# type: (...) -> Optional[TVShow]
|
||
"""
|
||
:param show_id: {indexer: id} or 'tvid_prodid'.
|
||
:param show_list: (optional) TVShow objects list
|
||
:param no_mapped_ids: don't check mapped ids
|
||
:param check_multishow: check for multiple show matches
|
||
:param no_exceptions: suppress the MultipleShowObjectsException and return None instead
|
||
"""
|
||
results = []
|
||
if None is show_list:
|
||
show_list = sickgear.showList
|
||
|
||
if show_id and show_list:
|
||
tvid_prodid_obj = None if not isinstance(show_id, string_types) else sickgear.tv.TVidProdid(show_id)
|
||
|
||
if tvid_prodid_obj and no_mapped_ids:
|
||
if None is tvid_prodid_obj.prodid:
|
||
return
|
||
return sickgear.showDict.get(int(tvid_prodid_obj))
|
||
else:
|
||
if tvid_prodid_obj:
|
||
if None is tvid_prodid_obj.prodid:
|
||
return None
|
||
show_id = tvid_prodid_obj.dict
|
||
|
||
if isinstance(show_id, dict):
|
||
if no_mapped_ids:
|
||
sid_int_list = [sickgear.tv.TVShow.create_sid(k, v) for k, v in iteritems(show_id) if k and v and
|
||
0 < v and sickgear.tv.tvid_bitmask >= k]
|
||
if not check_multishow:
|
||
return next((sickgear.showDict.get(_show_sid_id) for _show_sid_id in sid_int_list
|
||
if sickgear.showDict.get(_show_sid_id)), None)
|
||
results = [sickgear.showDict.get(_show_sid_id) for _show_sid_id in sid_int_list
|
||
if sickgear.showDict.get(_show_sid_id)]
|
||
else:
|
||
results = [_show_obj for k, v in iteritems(show_id) if k and v and 0 < v
|
||
for _show_obj in show_list if v == _show_obj.internal_ids.get(k, {'id': 0})['id']]
|
||
|
||
num_shows = len(set(results))
|
||
if 1 == num_shows:
|
||
return results[0]
|
||
if 1 < num_shows and not no_exceptions:
|
||
raise MultipleShowObjectsException()
|
||
|
||
|
||
def make_dir(path):
|
||
"""
|
||
create given path recursively
|
||
|
||
:param path: path
|
||
:type path: AnyStr
|
||
:return: success of creation
|
||
:rtype: bool
|
||
"""
|
||
if not os.path.isdir(path):
|
||
try:
|
||
os.makedirs(path)
|
||
# do a Synology library update
|
||
notifiers.NotifierFactory().get('SYNOINDEX').addFolder(path)
|
||
except OSError:
|
||
return False
|
||
return True
|
||
|
||
|
||
def search_infosrc_for_show_id(reg_show_name, tvid=None, prodid=None, ui=None):
|
||
"""
|
||
search info source for show
|
||
|
||
:param reg_show_name: show name to search
|
||
:type reg_show_name: AnyStr
|
||
:param tvid: tvid
|
||
:type tvid: int or None
|
||
:param prodid: prodid
|
||
:type prodid: int or long or None
|
||
:param ui: ui class
|
||
:type ui: object
|
||
:return: seriesname, tvid, prodid or None, None, None
|
||
:rtype: Tuple[None, None, None] or Tuple[AnyStr, int, int or long]
|
||
"""
|
||
show_names = [re.sub('[. -]', ' ', reg_show_name)]
|
||
|
||
# Query Indexers for each search term and build the list of results
|
||
for cur_tvid in (sickgear.TVInfoAPI().sources if not tvid else [int(tvid)]) or []:
|
||
# Query Indexers for each search term and build the list of results
|
||
tvinfo_config = sickgear.TVInfoAPI(cur_tvid).api_params.copy()
|
||
if ui is not None:
|
||
tvinfo_config['custom_ui'] = ui
|
||
t = sickgear.TVInfoAPI(cur_tvid).setup(**tvinfo_config)
|
||
|
||
for cur_name in show_names:
|
||
logger.debug('Trying to find %s on %s' % (cur_name, sickgear.TVInfoAPI(cur_tvid).name))
|
||
|
||
try:
|
||
if prodid:
|
||
show_info_list = t.get_show(prodid)
|
||
else:
|
||
show_info_list = t.search_show(cur_name)
|
||
show_info_list = show_info_list if isinstance(show_info_list, list) else [show_info_list]
|
||
except (BaseException, Exception):
|
||
continue
|
||
|
||
seriesname = _prodid = None
|
||
for cur_show_info in show_info_list: # type: dict
|
||
try:
|
||
seriesname = cur_show_info['seriesname']
|
||
_prodid = cur_show_info['id']
|
||
except (BaseException, Exception):
|
||
_prodid = seriesname = None
|
||
continue
|
||
if seriesname and _prodid:
|
||
break
|
||
|
||
if not (seriesname and _prodid):
|
||
continue
|
||
|
||
if None is prodid and str(cur_name).lower() == str(seriesname).lower():
|
||
return seriesname, cur_tvid, int(_prodid)
|
||
elif None is not prodid and int(prodid) == int(_prodid):
|
||
return seriesname, cur_tvid, int(prodid)
|
||
|
||
if tvid:
|
||
break
|
||
|
||
return None, None, None
|
||
|
||
|
||
def sizeof_fmt(number, digits=1, sep=' '):
|
||
# type: (int, int, AnyStr) -> AnyStr
|
||
"""
|
||
format given bytes to human-readable text
|
||
|
||
:param number: value to convert
|
||
:param digits: number of digits after decimal point
|
||
:param sep: seperater of value and dimension
|
||
:return: human-readable formatted text
|
||
"""
|
||
for cur_dimension in ['bytes', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB']:
|
||
if 1024.0 > number:
|
||
return f'{number:3.{digits}f}{sep}{cur_dimension}'
|
||
number /= 1024.0
|
||
|
||
|
||
def list_media_files(path):
|
||
# type: (AnyStr) -> List[AnyStr]
|
||
"""
|
||
list all media files in given path
|
||
|
||
:param path: path
|
||
:return: list of media files
|
||
"""
|
||
result = []
|
||
if path:
|
||
if [direntry for direntry in scantree(path, include=[r'\.sickgearignore'], filter_kind=False, recurse=False)]:
|
||
logger.debug('Skipping folder "%s" because it contains ".sickgearignore"' % path)
|
||
else:
|
||
result = [direntry.path for direntry in scantree(path, exclude=['Extras'], filter_kind=False)
|
||
if has_media_ext(direntry.name)]
|
||
return result
|
||
|
||
|
||
def link(src_file, dest_file):
|
||
"""
|
||
|
||
:param src_file: source file
|
||
:type src_file: AnyStr
|
||
:param dest_file: destination file
|
||
:type dest_file: AnyStr
|
||
"""
|
||
if 'nt' == os.name:
|
||
import ctypes
|
||
|
||
if 0 == ctypes.windll.kernel32.CreateHardLinkW(text_type(dest_file), text_type(src_file), 0):
|
||
raise ctypes.WinError()
|
||
else:
|
||
os.link(src_file, dest_file)
|
||
|
||
|
||
def hardlink_file(src_file, dest_file):
|
||
"""
|
||
|
||
:param src_file: source file
|
||
:type src_file: AnyStr
|
||
:param dest_file: destination file
|
||
:type dest_file: AnyStr
|
||
"""
|
||
try:
|
||
link(src_file, dest_file)
|
||
fix_set_group_id(dest_file)
|
||
except (BaseException, Exception) as e:
|
||
logger.error(f'Failed to create hardlink of {src_file} at {dest_file}: {ex(e)}. Copying instead.')
|
||
copy_file(src_file, dest_file)
|
||
|
||
|
||
def symlink(src_file, dest_file):
|
||
"""
|
||
|
||
:param src_file: source file
|
||
:type src_file: AnyStr
|
||
:param dest_file: destination
|
||
:type dest_file: AnyStr
|
||
"""
|
||
if 'nt' == os.name:
|
||
import ctypes
|
||
|
||
if ctypes.windll.kernel32.CreateSymbolicLinkW(
|
||
text_type(dest_file), text_type(src_file), 1 if os.path.isdir(src_file) else 0) in [0, 1280]:
|
||
raise ctypes.WinError()
|
||
else:
|
||
os.symlink(src_file, dest_file)
|
||
|
||
|
||
def move_and_symlink_file(src_file, dest_file):
|
||
"""
|
||
|
||
:param src_file: source file
|
||
:type src_file: AnyStr
|
||
:param dest_file: destination file
|
||
:type dest_file: AnyStr
|
||
"""
|
||
try:
|
||
shutil.move(src_file, dest_file)
|
||
fix_set_group_id(dest_file)
|
||
symlink(dest_file, src_file)
|
||
except (BaseException, Exception):
|
||
logger.error(f'Failed to create symlink of {src_file} at {dest_file}. Copying instead')
|
||
copy_file(src_file, dest_file)
|
||
|
||
|
||
def rename_ep_file(cur_path, new_path, old_path_length=0, use_rename=False):
|
||
"""
|
||
Creates all folders needed to move a file to its new location, renames it, then cleans up any folders
|
||
left that are now empty.
|
||
|
||
:param cur_path: The absolute path to the file you want to move/rename
|
||
:type cur_path: AnyStr
|
||
:param new_path: The absolute path to the destination for the file WITHOUT THE EXTENSION
|
||
:type new_path: AnyStr
|
||
:param old_path_length: The length of media file path (old name) WITHOUT THE EXTENSION
|
||
:type old_path_length: int or long
|
||
:param use_rename: use rename instead of shutil.move
|
||
:return: success
|
||
:rtype: bool
|
||
"""
|
||
|
||
# new_dest_dir, new_dest_name = os.path.split(new_path)
|
||
|
||
if 0 == old_path_length or len(cur_path) < old_path_length:
|
||
# approach from the right
|
||
cur_file_name, cur_file_ext = os.path.splitext(cur_path)
|
||
else:
|
||
# approach from the left
|
||
cur_file_ext = cur_path[old_path_length:]
|
||
cur_file_name = cur_path[:old_path_length]
|
||
|
||
if cur_file_ext[1:] in subtitleExtensions:
|
||
# Extract subtitle language from filename
|
||
sublang = os.path.splitext(cur_file_name)[1][1:]
|
||
|
||
# Check if the language extracted from filename is a valid language
|
||
try:
|
||
_ = subliminal.language.Language(sublang, strict=True)
|
||
cur_file_ext = '.' + sublang + cur_file_ext
|
||
except ValueError:
|
||
pass
|
||
|
||
# put the extension on the incoming file
|
||
new_path += cur_file_ext
|
||
|
||
make_path(os.path.dirname(new_path), syno=True)
|
||
|
||
# move the file
|
||
try:
|
||
logger.log(f'Renaming file from {cur_path} to {new_path}')
|
||
if use_rename:
|
||
os.rename(cur_path, new_path)
|
||
else:
|
||
shutil.move(cur_path, new_path)
|
||
except (OSError, IOError, IsADirectoryError, NotADirectoryError, FileExistsError) as e:
|
||
logger.error(f'Failed renaming {cur_path} to {new_path}: {ex(e)}')
|
||
return False
|
||
|
||
# clean up any old folders that are empty
|
||
delete_empty_folders(os.path.dirname(cur_path))
|
||
|
||
return True
|
||
|
||
|
||
def delete_empty_folders(check_empty_dir, keep_dir=None):
|
||
"""
|
||
Walks backwards up the path and deletes any empty folders found.
|
||
|
||
:param check_empty_dir: The path to clean (absolute path to a folder)
|
||
:type check_empty_dir: AnyStr
|
||
:param keep_dir: Clean until this path is reached
|
||
:type keep_dir: bool
|
||
"""
|
||
|
||
# treat check_empty_dir as empty when it only contains these items
|
||
ignore_items = []
|
||
|
||
logger.log(f'Trying to clean any empty folders under {check_empty_dir}')
|
||
|
||
# as long as the folder exists and doesn't contain any files, delete it
|
||
while os.path.isdir(check_empty_dir) and check_empty_dir != keep_dir:
|
||
check_files = os.listdir(check_empty_dir)
|
||
|
||
if not check_files or (len(check_files) <= len(ignore_items) and all(
|
||
[check_file in ignore_items for check_file in check_files])):
|
||
# directory is empty or contains only ignore_items
|
||
try:
|
||
logger.log(f"Deleting empty folder: {check_empty_dir}")
|
||
# need shutil.rmtree when ignore_items is really implemented
|
||
os.rmdir(check_empty_dir)
|
||
# do a Synology library update
|
||
notifiers.NotifierFactory().get('SYNOINDEX').deleteFolder(check_empty_dir)
|
||
except OSError as e:
|
||
logger.warning(f'Unable to delete {check_empty_dir}: {repr(e)} / {ex(e)}')
|
||
break
|
||
check_empty_dir = os.path.dirname(check_empty_dir)
|
||
else:
|
||
break
|
||
|
||
|
||
def get_absolute_number_from_season_and_episode(show_obj, season, episode):
|
||
"""
|
||
|
||
:param show_obj: show object
|
||
:type show_obj: TVShow
|
||
:param season: season number
|
||
:type season: int
|
||
:param episode: episode number
|
||
:type episode: int
|
||
:return: absolute number
|
||
:type: int or long
|
||
"""
|
||
absolute_number = None
|
||
|
||
if season and episode:
|
||
my_db = db.DBConnection()
|
||
sql_result = my_db.select('SELECT *'
|
||
' FROM tv_episodes'
|
||
' WHERE indexer = ? AND showid = ? AND season = ? AND episode = ?',
|
||
[show_obj.tvid, show_obj.prodid, season, episode])
|
||
|
||
if 1 == len(sql_result):
|
||
absolute_number = int(sql_result[0]["absolute_number"])
|
||
logger.debug(f'Found absolute_number:{absolute_number} by {season}x{episode}')
|
||
else:
|
||
logger.debug('No entries for absolute number in show: %s found using %sx%s' %
|
||
(show_obj.unique_name, str(season), str(episode)))
|
||
|
||
return absolute_number
|
||
|
||
|
||
def get_all_episodes_from_absolute_number(show_obj, absolute_numbers):
|
||
# type: (TVShow, List[int]) -> Tuple[int, List[int]]
|
||
"""
|
||
|
||
:param show_obj: show object
|
||
:param absolute_numbers: absolute numbers
|
||
"""
|
||
episode_numbers = []
|
||
season_number = None
|
||
|
||
if show_obj and len(absolute_numbers):
|
||
for absolute_number in absolute_numbers:
|
||
ep_obj = show_obj.get_episode(None, None, absolute_number=absolute_number)
|
||
if ep_obj:
|
||
episode_numbers.append(ep_obj.episode)
|
||
season_number = ep_obj.season # this takes the last found season so eps that cross the season
|
||
# border are not handled well
|
||
|
||
return season_number, episode_numbers
|
||
|
||
|
||
def sanitize_scene_name(name):
|
||
"""
|
||
Takes a show name and returns the "scenified" version of it.
|
||
|
||
:param name: name
|
||
:type name: AnyStr
|
||
:return: A string containing the scene version of the show name given.
|
||
:rtype: AnyStr
|
||
"""
|
||
if name:
|
||
bad_chars = ',:()£\'!?\u2019'
|
||
|
||
# strip out any bad chars
|
||
name = re.sub(r'[%s]' % bad_chars, '', name, flags=re.U)
|
||
|
||
# tidy up stuff that doesn't belong in scene names
|
||
name = re.sub(r'(-?\s|/)', '.', name).replace('&', 'and')
|
||
name = re.sub(r"\.+", '.', name).rstrip('.')
|
||
|
||
return name
|
||
return ''
|
||
|
||
|
||
def create_https_certificates(ssl_cert, ssl_key):
|
||
"""
|
||
Create self-signed HTTPS certificares and store in paths 'ssl_cert' and 'ssl_key'
|
||
"""
|
||
|
||
try:
|
||
from lib.certgen import generate_key, generate_local_cert
|
||
except (BaseException, Exception):
|
||
return False
|
||
|
||
private_key = generate_key(key_size=4096, output_file=ssl_key)
|
||
cert = generate_local_cert(private_key, days_valid=3650, output_file=ssl_cert)
|
||
return bool(cert)
|
||
|
||
|
||
if '__main__' == __name__:
|
||
import doctest
|
||
|
||
doctest.testmod()
|
||
|
||
|
||
def parse_xml(data, del_xmlns=False):
|
||
# type: (AnyStr, bool) -> Optional[etree.ElementTree]
|
||
"""
|
||
Parse data into an xml elementtree.ElementTree
|
||
|
||
data: data string containing xml
|
||
del_xmlns: if True, removes xmlns namesspace from data before parsing
|
||
|
||
Returns: parsed data as elementtree or None
|
||
"""
|
||
|
||
if del_xmlns:
|
||
data = re.sub(' xmlns="[^"]+"', '', data)
|
||
|
||
if isinstance(data, text_type) and is_lxml:
|
||
data = RE_XML_ENCODING.sub(r'\1\3', data, count=1)
|
||
|
||
try:
|
||
parsed_xml = etree.fromstring(data)
|
||
except (BaseException, Exception) as e:
|
||
logger.debug(f"Error trying to parse xml data. Error: {ex(e)}")
|
||
parsed_xml = None
|
||
|
||
return parsed_xml
|
||
|
||
|
||
def backup_versioned_file(old_file, version):
|
||
"""
|
||
|
||
:param old_file: old filename
|
||
:type old_file: AnyStr
|
||
:param version: version number
|
||
:type version: int
|
||
:return: success
|
||
:rtype: bool
|
||
"""
|
||
num_tries = 0
|
||
|
||
new_file = '%s.v%s' % (old_file, version)
|
||
|
||
if os.path.isfile(new_file):
|
||
changed_old_db = False
|
||
for back_nr in range(1, 10000):
|
||
alt_name = '%s.r%s' % (new_file, back_nr)
|
||
if not os.path.isfile(alt_name):
|
||
try:
|
||
shutil.move(new_file, alt_name)
|
||
changed_old_db = True
|
||
break
|
||
except (BaseException, Exception):
|
||
if os.path.isfile(new_file):
|
||
continue
|
||
logger.warning('could not rename old backup db file')
|
||
if not changed_old_db:
|
||
raise Exception('can\'t create a backup of db')
|
||
|
||
while not os.path.isfile(new_file):
|
||
if not os.path.isfile(old_file) or 0 == get_size(old_file):
|
||
logger.debug('No need to create backup')
|
||
break
|
||
|
||
try:
|
||
logger.debug(f'Trying to back up {old_file} to {new_file}')
|
||
shutil.copy(old_file, new_file)
|
||
logger.debug('Backup done')
|
||
break
|
||
except (BaseException, Exception) as e:
|
||
logger.warning(f'Error while trying to back up {old_file} to {new_file} : {ex(e)}')
|
||
num_tries += 1
|
||
time.sleep(3)
|
||
logger.debug('Trying again.')
|
||
|
||
if 3 <= num_tries:
|
||
logger.error(f'Unable to back up {old_file} to {new_file} please do it manually.')
|
||
return False
|
||
|
||
return True
|
||
|
||
|
||
def restore_versioned_file(backup_file, version):
|
||
"""
|
||
|
||
:param backup_file: filename
|
||
:type backup_file: AnyStr
|
||
:param version: version number
|
||
:type version: int
|
||
:return: success
|
||
:rtype: bool
|
||
"""
|
||
num_tries = 0
|
||
|
||
new_file, backup_version = os.path.splitext(backup_file)
|
||
restore_file = new_file + '.' + 'v' + str(version)
|
||
|
||
if not os.path.isfile(new_file):
|
||
logger.debug(f'Not restoring, {new_file} doesn\'t exist')
|
||
return False
|
||
|
||
try:
|
||
logger.debug(f'Trying to backup {new_file} to {new_file}.r{version} before restoring backup')
|
||
shutil.move(new_file, new_file + '.' + 'r' + str(version))
|
||
except (BaseException, Exception) as e:
|
||
logger.warning(f'Error while trying to backup DB file {restore_file} before proceeding with restore: {ex(e)}')
|
||
return False
|
||
|
||
while not os.path.isfile(new_file):
|
||
if not os.path.isfile(restore_file):
|
||
logger.debug(f'Not restoring, {restore_file} doesn\'t exist')
|
||
break
|
||
|
||
try:
|
||
logger.debug(f'Trying to restore {restore_file} to {new_file}')
|
||
shutil.copy(restore_file, new_file)
|
||
logger.debug('Restore done')
|
||
break
|
||
except (BaseException, Exception) as e:
|
||
logger.warning(f'Error while trying to restore {restore_file}: {ex(e)}')
|
||
num_tries += 1
|
||
time.sleep(1)
|
||
logger.debug('Trying again.')
|
||
|
||
if 10 <= num_tries:
|
||
logger.error(f'Unable to restore {restore_file} to {new_file} please do it manually.')
|
||
return False
|
||
|
||
return True
|
||
|
||
|
||
# try to convert to float, return default on failure
|
||
def try_float(s, s_default=0.0):
|
||
try:
|
||
return float(s)
|
||
except (BaseException, Exception):
|
||
return float(s_default)
|
||
|
||
|
||
# generates a md5 hash of a file
|
||
def md5_for_file(filename, block_size=2 ** 16):
|
||
"""
|
||
|
||
:param filename: filename
|
||
:type filename: AnyStr
|
||
:param block_size: block size
|
||
:type block_size: int or long
|
||
:return:
|
||
:rtype: AnyStr or None
|
||
"""
|
||
try:
|
||
with open(filename, 'rb') as f:
|
||
md5 = hashlib.md5()
|
||
while True:
|
||
data = f.read(block_size)
|
||
if not data:
|
||
break
|
||
md5.update(data)
|
||
f.close()
|
||
return md5.hexdigest()
|
||
except (BaseException, Exception):
|
||
return None
|
||
|
||
|
||
def get_lan_ip():
|
||
"""
|
||
Simple function to get LAN localhost_ip
|
||
http://stackoverflow.com/questions/11735821/python-get-localhost-ip
|
||
"""
|
||
|
||
if 'nt' != os.name:
|
||
# noinspection PyUnresolvedReferences
|
||
import fcntl
|
||
import struct
|
||
|
||
def get_interface_ip(if_name):
|
||
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||
return socket.inet_ntoa(fcntl.ioctl(s.fileno(), 0x8915, struct.pack('256s', if_name[:15]))[20:24])
|
||
|
||
ip = socket.gethostbyname(socket.gethostname())
|
||
if ip.startswith("127.") and "nt" != os.name:
|
||
interfaces = [
|
||
"eth0",
|
||
"eth1",
|
||
"eth2",
|
||
"wlan0",
|
||
"wlan1",
|
||
"wifi0",
|
||
"ath0",
|
||
"ath1",
|
||
"ppp0",
|
||
]
|
||
for ifname in interfaces:
|
||
try:
|
||
# noinspection PyUnboundLocalVariable
|
||
ip = get_interface_ip(ifname)
|
||
print(ifname, ip)
|
||
break
|
||
except IOError:
|
||
pass
|
||
return ip
|
||
|
||
|
||
def check_url(url):
|
||
"""
|
||
Check if a URL exists without downloading the whole file.
|
||
:param url: url
|
||
:type url: AnyStr
|
||
:return:
|
||
:rtype: bool
|
||
"""
|
||
try:
|
||
return requests.head(url).ok
|
||
except (BaseException, Exception):
|
||
return False
|
||
|
||
|
||
def anon_url(*url):
|
||
"""
|
||
|
||
:param url: url
|
||
:type url:
|
||
:return: a URL string consisting of the Anonymous redirect URL and an arbitrary number of values appended
|
||
:rtype: AnyStr
|
||
"""
|
||
return '' if None in url else '%s%s' % (sickgear.ANON_REDIRECT, ''.join([str(s) for s in url]))
|
||
|
||
|
||
def starify(text, verify=False):
|
||
"""
|
||
If verify is true, return true if text is a star block created text else return false.
|
||
|
||
:param text: text
|
||
:type text: AnyStr
|
||
:param verify:
|
||
:type verify: bool
|
||
:return: Return text input string with either its latter half or its centre area (if 12 chars or more)
|
||
replaced with asterisks. Useful for securely presenting api keys to a ui.
|
||
"""
|
||
return '' if not text\
|
||
else ((('%s%s' % (text[:len(text) // 2], '*' * (len(text) // 2))),
|
||
('%s%s%s' % (text[:4], '*' * (len(text) - 8), text[-4:])))[12 <= len(text)],
|
||
set('*') == set((text[len(text) // 2:], text[4:-4])[12 <= len(text)]))[verify]
|
||
|
||
|
||
"""
|
||
Encryption
|
||
==========
|
||
By Pedro Jose Pereira Vieito <pvieito@gmail.com> (@pvieito)
|
||
|
||
* If encryption_version==0 then return data without encryption
|
||
* The keys should be unique for each device
|
||
|
||
To add a new encryption_version:
|
||
1) Code your new encryption_version
|
||
2) Update the last encryption_version available in webserve.py
|
||
3) Remember to maintain old encryption versions and key generators for retrocompatibility
|
||
"""
|
||
|
||
# Key Generators
|
||
unique_key1 = hex(uuid.getnode() ** 2) # Used in encryption v1
|
||
|
||
|
||
# Encryption Functions
|
||
def encrypt(data, encryption_version=0, do_decrypt=False):
|
||
# Version 1: Simple XOR encryption (this is not very secure, but works)
|
||
if 1 == encryption_version:
|
||
if do_decrypt:
|
||
return ''.join([chr(try_ord(x) ^ try_ord(y)) for (x, y) in
|
||
zip(b64decodebytes(decode_bytes(data)), cycle(unique_key1))])
|
||
|
||
return decode_str(b64encodebytes(decode_bytes(
|
||
''.join([chr(try_ord(x) ^ try_ord(y)) for (x, y) in zip(data, cycle(unique_key1))])))).strip()
|
||
|
||
# Version 0: Plain text
|
||
return data
|
||
|
||
|
||
def decrypt(data, encryption_version=0):
|
||
return encrypt(data, encryption_version, do_decrypt=True)
|
||
|
||
|
||
def full_sanitize_scene_name(name):
|
||
"""
|
||
sanitize scene name
|
||
|
||
:param name: name
|
||
:type name: AnyStr
|
||
:return: sanitized name
|
||
:rtype: AnyStr
|
||
"""
|
||
return re.sub('[. -]', ' ', sanitize_scene_name(name)).lower().lstrip()
|
||
|
||
|
||
def get_show(name, try_scene_exceptions=False):
|
||
# type: (AnyStr, bool) -> Optional[TVShow]
|
||
"""
|
||
get show object for show with given name
|
||
|
||
:param name: name of show
|
||
:type name: AnyStr
|
||
:param try_scene_exceptions: check scene exceptions
|
||
:type try_scene_exceptions: bool
|
||
:return: None or show object
|
||
:type: TVShow or None
|
||
"""
|
||
if not sickgear.showList or None is name:
|
||
return
|
||
|
||
show_obj = None
|
||
|
||
try:
|
||
tvid, prodid = sickgear.name_cache.retrieve_name_from_cache(name)
|
||
if tvid and prodid:
|
||
show_obj = find_show_by_id({tvid: prodid})
|
||
|
||
if not show_obj and try_scene_exceptions:
|
||
tvid, prodid, season = sickgear.scene_exceptions.get_scene_exception_by_name(name)
|
||
if tvid and prodid:
|
||
show_obj = find_show_by_id({tvid: prodid})
|
||
except (BaseException, Exception) as e:
|
||
logger.debug(f'Error when attempting to find show: {name} in SickGear: {ex(e)}')
|
||
|
||
return show_obj
|
||
|
||
|
||
def is_hidden_folder(folder):
|
||
"""
|
||
On Linux based systems hidden folders start with . (dot)
|
||
|
||
:param folder: Full path of folder to check
|
||
:type folder: AnyStr
|
||
:return: Returns True if folder is hidden
|
||
:rtype: bool
|
||
"""
|
||
if os.path.isdir(folder):
|
||
if os.path.basename(folder).startswith('.'):
|
||
return True
|
||
|
||
return False
|
||
|
||
|
||
def real_path(path):
|
||
"""
|
||
The resulting path will have no symbolic link, '/./' or '/../' components.
|
||
|
||
:param path: path
|
||
:type path: AnyStr
|
||
:return: the canonicalized absolute pathname
|
||
:rtype: AnyStr
|
||
"""
|
||
return os.path.normpath(os.path.normcase(os.path.realpath(os.path.expanduser(path))))
|
||
|
||
|
||
def validate_show(show_obj, season=None, episode=None):
|
||
"""
|
||
|
||
:param show_obj: show object
|
||
:type show_obj: TVShow
|
||
:param season: optional season
|
||
:type season: int or None
|
||
:param episode: opitonal episode
|
||
:type episode: int or None
|
||
:return: TVInfoAPI source
|
||
:rtype: object
|
||
"""
|
||
show_lang = show_obj.lang
|
||
|
||
try:
|
||
tvinfo_config = sickgear.TVInfoAPI(show_obj.tvid).api_params.copy()
|
||
tvinfo_config['dvdorder'] = 0 != show_obj.dvdorder
|
||
|
||
if show_lang and not 'en' == show_lang:
|
||
tvinfo_config['language'] = show_lang
|
||
|
||
t = sickgear.TVInfoAPI(show_obj.tvid).setup(**tvinfo_config)
|
||
if season is None and episode is None:
|
||
return t
|
||
|
||
return t.get_show(show_obj.prodid, language=show_obj.lang)[season][episode]
|
||
except (BaseTVinfoEpisodenotfound, BaseTVinfoSeasonnotfound, TypeError):
|
||
pass
|
||
|
||
|
||
def _maybe_request_url(e, def_url=''):
|
||
return hasattr(e, 'request') and hasattr(e.request, 'url') and ' ' + e.request.url or def_url
|
||
|
||
|
||
def clear_cache(force=False):
|
||
"""
|
||
clear sickgear cache folder
|
||
|
||
:param force: force clearing
|
||
:type force: bool
|
||
"""
|
||
# clean out cache directory, remove everything > 12 hours old
|
||
dirty = None
|
||
del_time = SGDatetime.timestamp_near(td=datetime.timedelta(hours=12))
|
||
direntry_args = dict(follow_symlinks=False)
|
||
for direntry in scantree(sickgear.CACHE_DIR, ['images|rss|zoneinfo'], follow_symlinks=True):
|
||
if direntry.is_file(**direntry_args) and (force or del_time > direntry.stat(**direntry_args).st_mtime):
|
||
dirty = dirty or False if remove_file_perm(direntry.path) else True
|
||
elif direntry.is_dir(**direntry_args) and direntry.name not in ['cheetah', 'sessions', 'indexers']:
|
||
dirty = dirty or False
|
||
try:
|
||
os.rmdir(direntry.path)
|
||
except OSError:
|
||
dirty = True
|
||
|
||
logger.log(
|
||
f'{(("Found items not removed", "Found items removed")[not dirty], "No items found to remove")[None is dirty]}'
|
||
f' from cache folder {sickgear.CACHE_DIR}')
|
||
|
||
|
||
def human(size):
|
||
"""
|
||
format a size in bytes into a 'human' file size, e.g. bytes, KB, MB, GB, TB, PB
|
||
Note that bytes/KB will be reported in whole numbers but MB and above will have greater precision
|
||
e.g. 1 byte, 43 bytes, 443 KB, 4.3 MB, 4.43 GB, etc
|
||
|
||
:param size: numerical value to be converted
|
||
:type size: int or long or float
|
||
:return: human readable string
|
||
:rtype: AnyStr
|
||
"""
|
||
if 1 == size:
|
||
# because I really hate unnecessary plurals
|
||
return "1 byte"
|
||
|
||
suffixes_table = [('bytes', 0), ('KB', 0), ('MB', 1), ('GB', 2), ('TB', 2), ('PB', 2)]
|
||
|
||
num = float(size)
|
||
for suffix, precision in suffixes_table:
|
||
if 1024.0 > num:
|
||
break
|
||
num /= 1024.0
|
||
|
||
# noinspection PyUnboundLocalVariable
|
||
if 0 == precision:
|
||
formatted_size = '%d' % num
|
||
else:
|
||
formatted_size = str(round(num, ndigits=precision))
|
||
|
||
# noinspection PyUnboundLocalVariable
|
||
return '%s %s' % (formatted_size, suffix)
|
||
|
||
|
||
def get_size(start_path='.'):
|
||
"""
|
||
return combined size of data in given path
|
||
|
||
:param start_path: start path
|
||
:type start_path: AnyStr
|
||
:return: size in bytes
|
||
:rtype: int or long
|
||
"""
|
||
if os.path.isfile(start_path):
|
||
return os.path.getsize(start_path)
|
||
try:
|
||
return sum(map((lambda x: x.stat(follow_symlinks=False).st_size), scantree(start_path)))
|
||
except OSError:
|
||
return 0
|
||
|
||
|
||
def get_media_stats(start_path='.'):
|
||
# type: (AnyStr) -> Tuple[int, int, int, int]
|
||
"""
|
||
return recognised media stats for a folder as...
|
||
|
||
number of media files, smallest size in bytes, largest size in bytes, average size in bytes
|
||
|
||
:param start_path: path to scan
|
||
"""
|
||
if os.path.isdir(start_path):
|
||
sizes = sorted(map(lambda y: y.stat(follow_symlinks=False).st_size,
|
||
filter(lambda x: has_media_ext(x.name), scantree(start_path))))
|
||
if sizes:
|
||
return len(sizes), sizes[0], sizes[-1], int(sum(sizes) / len(sizes))
|
||
|
||
elif os.path.isfile(start_path):
|
||
size = os.path.getsize(start_path)
|
||
return 1, size, size, size
|
||
|
||
return 0, 0, 0, 0
|
||
|
||
|
||
def remove_article(text=''):
|
||
"""
|
||
remove articles from text
|
||
|
||
:param text: input text
|
||
:type text: AnyStr
|
||
:return: text without articles
|
||
:rtype: AnyStr
|
||
"""
|
||
return re.sub(r'(?i)^(?:A(?!\s+to)n?|The)\s(\w)', r'\1', text)
|
||
|
||
|
||
def re_valid_hostname(with_allowed=True):
|
||
this_host = socket.gethostname()
|
||
return re.compile(r'(?i)(%slocalhost|.*\.local%s%s)$' % (
|
||
(with_allowed
|
||
and '%s|' % (sickgear.ALLOWED_HOSTS
|
||
and '|'.join(re.escape(x.strip()) for x in sickgear.ALLOWED_HOSTS.split(','))
|
||
or '.*')
|
||
or ''),
|
||
bool(this_host) and ('|%s' % this_host) or '',
|
||
sickgear.ALLOW_ANYIP and ('|%s' % valid_ipaddr_expr()) or ''))
|
||
|
||
|
||
def valid_ipaddr_expr():
|
||
"""
|
||
Returns a regular expression that will validate an ip address
|
||
:return: Regular expression
|
||
:rtype: String
|
||
"""
|
||
return r'(%s)' % '|'.join([re.sub(r'\s+(#.[^\r\n]+)?', '', x) for x in [
|
||
# IPv4 address (accurate)
|
||
# Matches 0.0.0.0 through 255.255.255.255
|
||
r'''
|
||
(?:(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])\.){3}(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])
|
||
''',
|
||
# IPv6 address (standard and mixed)
|
||
# 8 hexadecimal words, or 6 hexadecimal words followed by 4 decimal bytes All with optional leading zeros
|
||
r'''
|
||
(?:(?<![:.\w])\[? # Anchor address
|
||
(?:[A-F0-9]{1,4}:){6} # 6 words
|
||
(?:[A-F0-9]{1,4}:[A-F0-9]{1,4} # 2 words
|
||
| (?:(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])\.){3} # or 4 bytes
|
||
(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])
|
||
)(?![:.\w])) # Anchor address
|
||
''',
|
||
# IPv6 address (compressed and compressed mixed)
|
||
# 8 hexadecimal words, or 6 hexadecimal words followed by 4 decimal bytes
|
||
# All with optional leading zeros. Consecutive zeros may be replaced with ::
|
||
r'''
|
||
(?:(?<![:.\w])\[?(?: # Anchor address
|
||
(?: # Mixed
|
||
(?:[A-F0-9]{1,4}:){6} # Non-compressed
|
||
|(?=(?:[A-F0-9]{0,4}:){2,6} # Compressed with 2 to 6 colons
|
||
(?:[0-9]{1,3}\.){3}[0-9]{1,3} # and 4 bytes
|
||
(?![:.\w])) # and anchored
|
||
(([0-9A-F]{1,4}:){1,5}|:)((:[0-9A-F]{1,4}){1,5}:|:) # and at most 1 double colon
|
||
|::(?:[A-F0-9]{1,4}:){5} # Compressed with 7 colons and 5 numbers
|
||
)
|
||
(?:(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])\.){3} # 255.255.255.
|
||
(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9]) # 255
|
||
| # Standard
|
||
(?:[A-F0-9]{1,4}:){7}[A-F0-9]{1,4} # Standard
|
||
| # Compressed
|
||
(?=(?:[A-F0-9]{0,4}:){0,7}[A-F0-9]{0,4} # Compressed with at most 7 colons
|
||
(?![:.\w])) # and anchored
|
||
(([0-9A-F]{1,4}:){1,7}|:)((:[0-9A-F]{1,4}){1,7}|:) # and at most 1 double colon
|
||
|(?:[A-F0-9]{1,4}:){7}:|:(:[A-F0-9]{1,4}){7} # Compressed with 8 colons
|
||
)(?![:.\w])) # Anchor address
|
||
'''
|
||
]])
|
||
|
||
|
||
def build_dict(seq, key):
|
||
"""
|
||
|
||
:param seq: Iterable sequence
|
||
:type seq: Iterable
|
||
:param key: key
|
||
:type key: AnyStr
|
||
:return: returns dict
|
||
:rtype: Dict
|
||
"""
|
||
return dict([(d[key], dict(d, index=index)) for (index, d) in enumerate(seq)])
|
||
|
||
|
||
def client_host(server_host):
|
||
"""Extracted from cherrypy libs
|
||
Return the host on which a client can connect to the given listener."""
|
||
if '0.0.0.0' == server_host:
|
||
# 0.0.0.0 is INADDR_ANY, which should answer on localhost.
|
||
return '127.0.0.1'
|
||
if server_host in ('::', '::0', '::0.0.0.0'):
|
||
# :: is IN6ADDR_ANY, which should answer on localhost.
|
||
# ::0 and ::0.0.0.0 are non-canonical but common ways to write
|
||
# IN6ADDR_ANY.
|
||
return '::1'
|
||
return server_host
|
||
|
||
|
||
def wait_for_free_port(host, port):
|
||
"""Extracted from cherrypy libs
|
||
Wait for the specified port to become free (drop requests)."""
|
||
if not host:
|
||
raise ValueError("Host values of '' or None are not allowed.")
|
||
for trial in range(50):
|
||
try:
|
||
# we are expecting a free port, so reduce the timeout
|
||
check_port(host, port, timeout=0.1)
|
||
except IOError:
|
||
# Give the old server thread time to free the port.
|
||
time.sleep(0.1)
|
||
else:
|
||
return
|
||
|
||
raise IOError("Port %r is not free on %r" % (port, host))
|
||
|
||
|
||
def check_port(host, port, timeout=1.0):
|
||
"""Extracted from cherrypy libs
|
||
Raise an error if the given port is not free on the given host."""
|
||
if not host:
|
||
raise ValueError("Host values of '' or None are not allowed.")
|
||
host = client_host(host)
|
||
port = int(port)
|
||
|
||
import socket
|
||
|
||
# AF_INET or AF_INET6 socket
|
||
# Get the correct address family for our host (allows IPv6 addresses)
|
||
try:
|
||
info = socket.getaddrinfo(host, port, socket.AF_UNSPEC,
|
||
socket.SOCK_STREAM)
|
||
except socket.gaierror:
|
||
if ':' in host:
|
||
info = [(socket.AF_INET6, socket.SOCK_STREAM, 0, "", (host, port, 0, 0))]
|
||
else:
|
||
info = [(socket.AF_INET, socket.SOCK_STREAM, 0, "", (host, port))]
|
||
|
||
for res in info:
|
||
af, socktype, proto, canonname, sa = res
|
||
s = None
|
||
try:
|
||
s = socket.socket(af, socktype, proto)
|
||
# See http://groups.google.com/group/cherrypy-users/browse_frm/thread/bbfe5eb39c904fe0
|
||
s.settimeout(timeout)
|
||
s.connect((host, port))
|
||
s.close()
|
||
raise IOError("Port %s is in use on %s; perhaps the previous "
|
||
"httpserver did not shut down properly." %
|
||
(repr(port), repr(host)))
|
||
except socket.error:
|
||
if s:
|
||
s.close()
|
||
|
||
|
||
def clear_unused_providers():
|
||
providers = [x.cache.providerID for x in sickgear.providers.sorted_sources() if x.is_active()]
|
||
|
||
if providers:
|
||
my_db = db.DBConnection('cache.db')
|
||
my_db.action('DELETE FROM provider_cache WHERE provider NOT IN (%s)' % ','.join(['?'] * len(providers)),
|
||
providers)
|
||
|
||
|
||
def make_search_segment_html_string(segment, max_eps=5):
|
||
seg_str = ''
|
||
if segment and not isinstance(segment, list):
|
||
segment = [segment]
|
||
if segment and len(segment) > max_eps:
|
||
seasons = [x for x in set([x.season for x in segment])]
|
||
seg_str = f'Season{maybe_plural(len(seasons))}: '
|
||
divider = ''
|
||
for x in seasons:
|
||
eps = [str(s.episode) for s in segment if x == s.season]
|
||
ep_c = len(eps)
|
||
seg_str += '%s%s <span title="Episode%s: %s">(%s Ep%s)</span>' \
|
||
% (divider, x, maybe_plural(ep_c), ', '.join(eps), ep_c, maybe_plural(ep_c))
|
||
divider = ', '
|
||
elif segment:
|
||
episode_numbers = ['S%sE%s' % (str(x.season).zfill(2), str(x.episode).zfill(2)) for x in segment]
|
||
seg_str = f'Episode{maybe_plural(len(episode_numbers))}: {", ".join(episode_numbers)}'
|
||
return seg_str
|
||
|
||
|
||
def has_anime():
|
||
"""
|
||
:return: if there are any anime shows in show list
|
||
:rtype: bool
|
||
"""
|
||
# noinspection PyTypeChecker
|
||
return False if not sickgear.showList else any(filter(lambda show: show.is_anime, sickgear.showList))
|
||
|
||
|
||
def cpu_sleep():
|
||
if cpu_presets[sickgear.CPU_PRESET]:
|
||
time.sleep(cpu_presets[sickgear.CPU_PRESET])
|
||
|
||
|
||
def cleanup_cache():
|
||
"""
|
||
Delete old cached files
|
||
"""
|
||
delete_not_changed_in(
|
||
[os.path.join(sickgear.CACHE_DIR, 'images', 'browse', 'thumb', x)
|
||
for x in ['anidb', 'imdb', 'trakt', 'tvdb']] +
|
||
[os.path.join(sickgear.CACHE_DIR, 'images', x)
|
||
for x in ['characters', 'person']] +
|
||
[os.path.join(sickgear.CACHE_DIR, 'tvinfo_cache')])
|
||
|
||
|
||
def delete_not_changed_in(paths, days=30, minutes=0):
|
||
"""
|
||
Delete files under paths not changed in n days and/or n minutes.
|
||
If a file was modified later than days/and or minutes, then don't delete it.
|
||
|
||
:param paths: Path(s) to scan for files to delete
|
||
:type paths: String or List of strings
|
||
:param days: Purge files not modified in this number of days (default: 30 days)
|
||
:param minutes: Purge files not modified in this number of minutes (default: 0 minutes)
|
||
:return: tuple; number of files that qualify for deletion, number of qualifying files that failed to be deleted
|
||
"""
|
||
del_time = SGDatetime.timestamp_near(td=datetime.timedelta(days=days, minutes=minutes))
|
||
errors = 0
|
||
qualified = 0
|
||
for cur_path in (paths, [paths])[not isinstance(paths, list)]:
|
||
try:
|
||
for direntry in scantree(cur_path, filter_kind=False):
|
||
if del_time > direntry.stat(follow_symlinks=False).st_mtime:
|
||
if not remove_file_perm(direntry.path):
|
||
errors += 1
|
||
qualified += 1
|
||
except (BaseException, Exception):
|
||
pass
|
||
return qualified, errors
|
||
|
||
|
||
def set_file_timestamp(filename, min_age=3, new_time=None):
|
||
"""
|
||
|
||
:param filename: filename
|
||
:type filename: AnyStr
|
||
:param min_age: minimum age in days
|
||
:type min_age: int
|
||
:param new_time:
|
||
:type new_time: None or int
|
||
"""
|
||
min_time = SGDatetime.timestamp_near(td=datetime.timedelta(days=min_age))
|
||
try:
|
||
if os.path.isfile(filename) and os.path.getmtime(filename) < min_time:
|
||
os.utime(filename, new_time)
|
||
except (BaseException, Exception):
|
||
pass
|
||
|
||
|
||
def should_delete_episode(status):
|
||
"""
|
||
check if episode should be deleted from db
|
||
|
||
:param status: episode status
|
||
:type status: int
|
||
:return: should be deleted
|
||
:rtype: bool
|
||
"""
|
||
s = Quality.split_composite_status(status)[0]
|
||
if s not in SNATCHED_ANY + [DOWNLOADED, ARCHIVED, IGNORED]:
|
||
return True
|
||
logger.debug('not safe to delete episode from db because of status: %s' % statusStrings[s])
|
||
return False
|
||
|
||
|
||
def is_link(filepath):
|
||
"""
|
||
Check if given file/pathname is symbolic link
|
||
|
||
:param filepath: file or path to check
|
||
:return: True or False
|
||
"""
|
||
if 'win32' == sys.platform:
|
||
if not os.path.exists(filepath):
|
||
return False
|
||
|
||
import ctypes
|
||
invalid_file_attributes = 0xFFFFFFFF
|
||
file_attribute_reparse_point = 0x0400
|
||
|
||
attr = ctypes.windll.kernel32.GetFileAttributesW(text_type(filepath))
|
||
return invalid_file_attributes != attr and 0 != attr & file_attribute_reparse_point
|
||
|
||
return os.path.islink(filepath)
|
||
|
||
|
||
def find_mount_point(path):
|
||
# type: (AnyStr) -> AnyStr
|
||
"""
|
||
returns the mount point for the given path
|
||
|
||
:param path: to find the mount path
|
||
:return: mount point for path or path if no mount
|
||
"""
|
||
result = path
|
||
if os.path.exists(path):
|
||
result = os.path.realpath(os.path.abspath(path))
|
||
try:
|
||
while not os.path.ismount(result):
|
||
new_path = os.path.dirname(result)
|
||
if new_path == result:
|
||
# return input path if mount point not found
|
||
return path
|
||
result = new_path
|
||
except (BaseException, Exception):
|
||
return path
|
||
return result
|
||
|
||
|
||
def df():
|
||
# type: (...) -> Tuple[List[Tuple[AnyStr, AnyStr]], bool]
|
||
"""
|
||
Return disk free space at known parent locations
|
||
|
||
:return: string path, string value that is formatted size
|
||
"""
|
||
result = []
|
||
min_output = True # flag ui to output minimal (e.g. vol: size, vol: size)
|
||
if sickgear.ROOT_DIRS and sickgear.DISPLAY_FREESPACE:
|
||
targets = []
|
||
for cur_target in filter(lambda _t: _t and _t not in targets,
|
||
map(find_mount_point, sickgear.ROOT_DIRS.split('|')[1:])):
|
||
targets += [cur_target]
|
||
free = freespace(cur_target)
|
||
if 'win32' == sys.platform and None is not free:
|
||
cur_target = os.path.splitdrive(cur_target)[0]
|
||
if any(['win32' == sys.platform and not re.match('(?i)[a-z]:(\\\\)?$', cur_target),
|
||
# Windows, if a simple drive letter isn't found, fallback to full path. On Linux, full path is used
|
||
# trigger ui to output long paths instead of minimal volume letters layout
|
||
sys.platform.startswith(('linux', 'darwin', 'sunos5')), 'bsd' in sys.platform]):
|
||
min_output = False
|
||
result += [(cur_target, 'unavailable' if None is free else sizeof_fmt(free, sep=''))]
|
||
|
||
return result, min_output
|
||
|
||
|
||
def freespace(path=None):
|
||
"""
|
||
Return free space available at path location
|
||
|
||
:param path: Example paths (Windows) = '\\\\192.168.0.1\\sharename\\existing_path', 'd:\\existing_path'
|
||
Untested with mount points under linux
|
||
:type path: AnyStr
|
||
:return: Size in bytes
|
||
:rtype: long or None
|
||
"""
|
||
result = None
|
||
|
||
if 'win32' == sys.platform:
|
||
try:
|
||
import ctypes
|
||
if None is not ctypes:
|
||
max_val = (2 ** 64) - 1
|
||
storage = ctypes.c_ulonglong(max_val)
|
||
ctypes.windll.kernel32.GetDiskFreeSpaceExW(ctypes.c_wchar_p(path), None, None, ctypes.pointer(storage))
|
||
result = (storage.value, None)[max_val == storage.value]
|
||
except (BaseException, Exception):
|
||
pass
|
||
elif sys.platform.startswith(('linux', 'darwin', 'sunos5')) or 'bsd' in sys.platform:
|
||
try:
|
||
storage = os.statvfs(path) # perms errors can result
|
||
result = storage.f_bavail * storage.f_frsize
|
||
except OSError:
|
||
pass
|
||
|
||
return result
|
||
|
||
|
||
def path_mapper(search, replace, subject):
|
||
"""
|
||
Substitute strings in a path
|
||
|
||
:param search: Search text
|
||
:type search: AnyStr
|
||
:param replace: Replacement text
|
||
:type replace: AnyStr
|
||
:param subject: Path text to search
|
||
:type subject: AnyStr
|
||
:return: Subject with or without substitution, True if a change was made otherwise False
|
||
:rtype: Tuple[AnyStr, bool]
|
||
"""
|
||
delim = '/!~!/'
|
||
search = re.sub(r'\\', delim, search)
|
||
replace = re.sub(r'\\', delim, replace)
|
||
path = re.sub(r'\\', delim, subject)
|
||
result = re.sub('(?i)^%s' % search, replace, path)
|
||
result = os.path.normpath(re.sub(delim, '/', result))
|
||
|
||
return result, result != subject
|
||
|
||
|
||
def get_overview(ep_status, show_quality, upgrade_once, split_snatch=False):
|
||
# type: (integer_types, integer_types, Union[integer_types, bool], bool) -> integer_types
|
||
"""
|
||
|
||
:param ep_status: episode status
|
||
:param show_quality: show quality
|
||
:param upgrade_once: upgrade once
|
||
:param split_snatch:
|
||
:type split_snatch: bool
|
||
:return: constant from classes Overview
|
||
"""
|
||
status, quality = Quality.split_composite_status(ep_status)
|
||
if ARCHIVED == status:
|
||
return Overview.GOOD
|
||
if WANTED == status:
|
||
return Overview.WANTED
|
||
if status in (SKIPPED, IGNORED):
|
||
return Overview.SKIPPED
|
||
if status in (UNAIRED, UNKNOWN):
|
||
return Overview.UNAIRED
|
||
if status in [SUBTITLED] + Quality.SNATCHED_ANY + Quality.DOWNLOADED + Quality.FAILED:
|
||
|
||
if FAILED == status:
|
||
return Overview.WANTED
|
||
if not split_snatch and status in SNATCHED_ANY:
|
||
return Overview.SNATCHED
|
||
|
||
void, best_qualities = Quality.split_quality(show_quality)
|
||
# if re-downloads aren't wanted then mark it "good" if there is anything
|
||
if not len(best_qualities):
|
||
return Overview.GOOD
|
||
|
||
min_best, max_best = min(best_qualities), max(best_qualities)
|
||
if quality >= max_best \
|
||
or (upgrade_once and
|
||
(quality in best_qualities or (None is not min_best and quality > min_best))):
|
||
return Overview.GOOD
|
||
return (Overview.QUAL, Overview.SNATCHED_QUAL)[status in SNATCHED_ANY]
|
||
|
||
|
||
def generate_show_dir_name(root_dir, show_name):
|
||
# type: (Optional[AnyStr], AnyStr) -> AnyStr
|
||
"""
|
||
generate show dir name
|
||
|
||
:param root_dir: root dir
|
||
:param show_name: show name
|
||
:return: show dir name
|
||
"""
|
||
san_show_name = sanitize_filename(show_name)
|
||
if sickgear.SHOW_DIRS_WITH_DOTS:
|
||
san_show_name = san_show_name.replace(' ', '.')
|
||
if None is root_dir:
|
||
return san_show_name
|
||
return os.path.join(root_dir, san_show_name)
|
||
|
||
|
||
def count_files_dirs(base_dir):
|
||
"""
|
||
|
||
:param base_dir: path
|
||
:type base_dir: AnyStr
|
||
:return: tuple of count of files, dirs
|
||
:rtype: Tuple[int, int]
|
||
"""
|
||
f = d = 0
|
||
try:
|
||
files = scandir(base_dir)
|
||
except OSError as e:
|
||
logger.warning('Unable to count files %s / %s' % (repr(e), ex(e)))
|
||
else:
|
||
for e in files:
|
||
if e.is_file():
|
||
f += 1
|
||
elif e.is_dir():
|
||
d += 1
|
||
|
||
return f, d
|
||
|
||
|
||
def upgrade_new_naming():
|
||
my_db = db.DBConnection()
|
||
sql_result = my_db.select('SELECT indexer AS tv_id, indexer_id AS prod_id FROM tv_shows')
|
||
show_list = {}
|
||
for cur_result in sql_result:
|
||
show_list[int(cur_result['prod_id'])] = int(cur_result['tv_id'])
|
||
|
||
if sickgear.FANART_RATINGS:
|
||
from sickgear.tv import TVidProdid
|
||
ne = {}
|
||
for k, v in iteritems(sickgear.FANART_RATINGS):
|
||
nk = show_list.get(try_int(k))
|
||
if nk:
|
||
ne[TVidProdid({nk: int(k)})()] = sickgear.FANART_RATINGS[k]
|
||
sickgear.FANART_RATINGS = ne
|
||
sickgear.CFG.setdefault('GUI', {})['fanart_ratings'] = '%s' % ne
|
||
sickgear.CFG.write()
|
||
|
||
image_cache_dir = os.path.join(sickgear.CACHE_DIR, 'images')
|
||
bp_match = re.compile(r'(\d+)\.((?:banner|poster|(?:\d+(?:\.\w*)?\.\w{5,8}\.)?fanart)\.jpg)', flags=re.I)
|
||
|
||
def _set_progress(p_msg, c, s):
|
||
ps = None
|
||
if 0 == s:
|
||
ps = 0
|
||
elif 1 == s and 0 == c:
|
||
ps = 100
|
||
elif 1 > c % s:
|
||
ps = c / s
|
||
if None is not ps:
|
||
sickgear.classes.loading_msg.set_msg_progress(p_msg, '{:6.2f}%'.format(ps))
|
||
|
||
for d in ['', 'thumbnails']:
|
||
bd = os.path.join(image_cache_dir, d)
|
||
if os.path.isdir(bd):
|
||
fc, dc = count_files_dirs(bd)
|
||
step = fc / float(100)
|
||
cf = 0
|
||
p_text = 'Upgrading %s' % (d, 'banner/poster')[not d]
|
||
_set_progress(p_text, 0, 0)
|
||
for entry in scandir(bd):
|
||
if entry.is_file():
|
||
cf += 1
|
||
_set_progress(p_text, cf, step)
|
||
b_s = bp_match.search(entry.name)
|
||
if b_s:
|
||
old_id = int(b_s.group(1))
|
||
tvid = show_list.get(old_id)
|
||
if tvid:
|
||
nb_dir = os.path.join(sickgear.CACHE_DIR, 'images', 'shows', '%s-%s' % (tvid, old_id), d)
|
||
if not os.path.isdir(nb_dir):
|
||
try:
|
||
os.makedirs(nb_dir)
|
||
except (BaseException, Exception):
|
||
pass
|
||
new_name = os.path.join(nb_dir, bp_match.sub(r'\2', entry.name))
|
||
try:
|
||
move_file(entry.path, new_name)
|
||
except (BaseException, Exception) as e:
|
||
logger.warning('Unable to rename %s to %s: %s / %s'
|
||
% (entry.path, new_name, repr(e), ex(e)))
|
||
else:
|
||
# clean up files without reference in db
|
||
try:
|
||
os.remove(entry.path)
|
||
except (BaseException, Exception):
|
||
pass
|
||
elif entry.is_dir():
|
||
if entry.name in ['shows', 'browse']:
|
||
continue
|
||
elif 'fanart' == entry.name:
|
||
_set_progress(p_text, 0, 1)
|
||
fc_fan, dc_fan = count_files_dirs(entry.path)
|
||
step_fan = dc_fan / float(100)
|
||
cf_fan = 0
|
||
p_text = 'Upgrading fanart'
|
||
_set_progress(p_text, 0, 0)
|
||
try:
|
||
entries = scandir(entry.path)
|
||
except OSError as e:
|
||
logger.warning('Unable to stat dirs %s / %s' % (repr(e), ex(e)))
|
||
continue
|
||
for d_entry in entries:
|
||
if d_entry.is_dir():
|
||
cf_fan += 1
|
||
_set_progress(p_text, cf_fan, step_fan)
|
||
old_id = try_int(d_entry.name)
|
||
if old_id:
|
||
new_id = show_list.get(old_id)
|
||
if new_id:
|
||
new_dir_name = os.path.join(sickgear.CACHE_DIR, 'images', 'shows',
|
||
'%s-%s' % (new_id, old_id), 'fanart')
|
||
try:
|
||
move_file(d_entry.path, new_dir_name)
|
||
except (BaseException, Exception) as e:
|
||
logger.warning(f'Unable to rename {d_entry.path} to {new_dir_name}:'
|
||
f' {repr(e)} / {ex(e)}')
|
||
if os.path.isdir(new_dir_name):
|
||
try:
|
||
f_n = filter(lambda fn: fn.is_file(), scandir(new_dir_name))
|
||
except OSError as e:
|
||
logger.warning('Unable to rename %s / %s' % (repr(e), ex(e)))
|
||
else:
|
||
rename_args = []
|
||
# noinspection PyTypeChecker
|
||
for f_entry in f_n:
|
||
rename_args += [(f_entry.path, bp_match.sub(r'\2', f_entry.path))]
|
||
|
||
for args in rename_args:
|
||
try:
|
||
move_file(*args)
|
||
except (BaseException, Exception) as e:
|
||
logger.warning(f'Unable to rename {args[0]} to {args[1]}:'
|
||
f' {repr(e)} / {ex(e)}')
|
||
else:
|
||
try:
|
||
shutil.rmtree(d_entry.path)
|
||
except (BaseException, Exception):
|
||
pass
|
||
try:
|
||
shutil.rmtree(d_entry.path)
|
||
except (BaseException, Exception):
|
||
pass
|
||
try:
|
||
os.rmdir(entry.path)
|
||
except (BaseException, Exception):
|
||
pass
|
||
if 'thumbnails' == d:
|
||
try:
|
||
os.rmdir(bd)
|
||
except (BaseException, Exception):
|
||
pass
|
||
_set_progress(p_text, 0, 1)
|
||
|
||
|
||
def xhtml_escape(text, br=True):
|
||
"""
|
||
Escapes a string, so it is valid within HTML or XML using the function from Tornado.
|
||
|
||
:param text: Text to convert entities from for example '"' to '"'
|
||
:type text: AnyStr
|
||
:param br: True, replace newline with html `<br>`
|
||
:type br: bool
|
||
:return: Text with entities replaced
|
||
:rtype: AnyStr
|
||
"""
|
||
if not text:
|
||
return text
|
||
from tornado import escape
|
||
if br:
|
||
text = re.sub(r'\r?\n', '<br>', text)
|
||
|
||
return escape.xhtml_escape(normalise_chars(text))
|
||
|
||
|
||
def normalise_chars(text):
|
||
# noinspection GrazieInspection
|
||
"""
|
||
Normalise characters to maintain a consistent output from different sources, and to prevent issues when sorting
|
||
|
||
e.g.
|
||
curved apostrophe ’ with standard ' apostrophe
|
||
wide dash with standard hyphen
|
||
|
||
:param text: Text to convert entities from for example '"' to '"'
|
||
:type text: AnyStr
|
||
:return: Text with entities replaced
|
||
:rtype: AnyStr
|
||
"""
|
||
result = text.replace('\u2010', '-').replace('\u2011', '-').replace('\u2012', '-') \
|
||
.replace('\u2013', '-').replace('\u2014', '-').replace('\u2015', '-') \
|
||
.replace('\u2018', "'").replace('\u2019', "'") \
|
||
.replace('\u201c', '\"').replace('\u201d', '\"') \
|
||
.replace('\u0020', ' ').replace('\u00a0', ' ')
|
||
|
||
return result
|
||
|
||
|
||
def parse_imdb_id(string):
|
||
# type: (AnyStr) -> Optional[AnyStr]
|
||
""" Parse an IMDB ID from a string
|
||
|
||
:param string: string to parse
|
||
:return: parsed ID of the form char, char, number of digits or None if no match
|
||
"""
|
||
result = None
|
||
try:
|
||
result = RC_IMDB_ID.findall(string)[0]
|
||
except(BaseException, Exception):
|
||
pass
|
||
|
||
return result
|
||
|
||
|
||
def generate_word_str(words, regex=False, join_chr=','):
|
||
# type: (Set[AnyStr], bool, AnyStr) -> AnyStr
|
||
"""
|
||
combine a list or set to a string with optional prefix 'regex:'
|
||
|
||
:param words: list or set of words
|
||
:type words: set
|
||
:param regex: prefix regex: ?
|
||
:type regex: bool
|
||
:param join_chr: character(s) used for join words
|
||
:type join_chr: basestring
|
||
:return: combined string
|
||
:rtype: basestring
|
||
"""
|
||
return '%s%s' % (('', 'regex:')[True is regex], join_chr.join(words))
|
||
|
||
|
||
def split_word_str(word_list):
|
||
# type: (AnyStr) -> Tuple[Set[AnyStr], bool]
|
||
"""
|
||
split string into set and boolean regex
|
||
|
||
:param word_list: string with words
|
||
:type word_list: basestring
|
||
:return: set of words, is it regex
|
||
:rtype: (set, bool)
|
||
"""
|
||
try:
|
||
if word_list.startswith('regex:'):
|
||
rx = True
|
||
word_list = word_list.replace('regex:', '')
|
||
else:
|
||
rx = False
|
||
s = set(w.strip() for w in word_list.split(',') if w.strip())
|
||
except (BaseException, Exception):
|
||
rx = False
|
||
s = set()
|
||
return s, rx
|