mirror of
https://github.com/SickGear/SickGear.git
synced 2024-11-21 20:35:05 +00:00
1833 lines
63 KiB
Python
1833 lines
63 KiB
Python
# coding=utf-8
|
||
#
|
||
# This file is part of SickGear.
|
||
#
|
||
# SickGear is free software: you can redistribute it and/or modify
|
||
# it under the terms of the GNU General Public License as published by
|
||
# the Free Software Foundation, either version 3 of the License, or
|
||
# (at your option) any later version.
|
||
#
|
||
# SickGear is distributed in the hope that it will be useful,
|
||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||
# GNU General Public License for more details.
|
||
#
|
||
# You should have received a copy of the GNU General Public License
|
||
# along with SickGear. If not, see <http://www.gnu.org/licenses/>.
|
||
|
||
from itertools import cycle
|
||
import datetime
|
||
import hashlib
|
||
import os
|
||
import re
|
||
import shutil
|
||
import socket
|
||
import time
|
||
import uuid
|
||
import sys
|
||
|
||
import sickgear
|
||
from . import db, logger, notifiers
|
||
from .common import cpu_presets, mediaExtensions, Overview, Quality, statusStrings, subtitleExtensions, \
|
||
ARCHIVED, DOWNLOADED, FAILED, IGNORED, SKIPPED, SNATCHED_ANY, SUBTITLED, UNAIRED, UNKNOWN, WANTED
|
||
from .sgdatetime import SGDatetime
|
||
from lib.tvinfo_base.exceptions import *
|
||
from exceptions_helper import ex, MultipleShowObjectsException
|
||
|
||
import dateutil.parser
|
||
import requests
|
||
import requests.exceptions
|
||
import subliminal
|
||
from lxml_etree import etree, is_lxml
|
||
from base64 import decodebytes as b64decodebytes, encodebytes as b64encodebytes
|
||
|
||
from _23 import decode_bytes, decode_str, scandir
|
||
from six import iteritems, string_types, text_type
|
||
# noinspection PyUnresolvedReferences
|
||
from six.moves import zip
|
||
|
||
# the following are imported from elsewhere,
|
||
# therefore, they intentionally don't resolve and are unused in this particular file.
|
||
# noinspection PyUnresolvedReferences
|
||
from sg_helpers import chmod_as_parent, clean_data, copy_file, download_file, fix_set_group_id, get_system_temp_dir, \
|
||
get_url, indent_xml, make_path, maybe_plural, md5_for_text, move_file, proxy_setting, remove_file, \
|
||
remove_file_perm, replace_extension, sanitize_filename, scantree, touch_file, try_int, try_ord, write_file
|
||
|
||
# deprecated item, remove in 2020, kept here as rollback uses it
|
||
copyFile = copy_file
|
||
moveFile = move_file
|
||
tryInt = try_int # one legacy custom provider is keeping this signature here
|
||
|
||
# noinspection PyUnreachableCode
|
||
if False:
|
||
# noinspection PyUnresolvedReferences
|
||
from typing import Any, AnyStr, Dict, Generator, NoReturn, Iterable, Iterator, List, Optional, Set, Tuple, Union
|
||
from .tv import TVShow
|
||
# the following workaround hack resolves a pyc resolution bug
|
||
from .name_cache import retrieve_name_from_cache
|
||
from six import integer_types
|
||
|
||
RE_XML_ENCODING = re.compile(r'^(<\?xml[^>]+)\s+(encoding\s*=\s*[\"\'][^\"\']*[\"\'])(\s*\?>|)', re.U)
|
||
RE_IMDB_ID = r'tt\d{7,10}'
|
||
RC_IMDB_ID = re.compile(r'(?i)(%s)' % RE_IMDB_ID)
|
||
|
||
|
||
def remove_extension(name):
|
||
"""
|
||
Remove download or media extension from name (if any)
|
||
|
||
:param name: filename
|
||
:type name: AnyStr
|
||
:return: name without extension
|
||
:rtype: AnyStr
|
||
"""
|
||
|
||
if name and "." in name:
|
||
base_name, sep, extension = name.rpartition('.')
|
||
if base_name and extension.lower() in ['nzb', 'torrent'] + mediaExtensions:
|
||
name = base_name
|
||
|
||
return name
|
||
|
||
|
||
def is_parsable_date(text):
|
||
try:
|
||
result = isinstance(dateutil.parser.parse(text), datetime.datetime)
|
||
except (BaseException, Exception):
|
||
result = False
|
||
return result
|
||
|
||
|
||
def remove_non_release_groups(name, is_anime=False):
|
||
"""
|
||
Remove non release groups from name
|
||
|
||
:param name: release name
|
||
:type name: AnyStr
|
||
:param is_anime: is anmie
|
||
:type is_anime: bool
|
||
:return:
|
||
:rtype: AnyStr
|
||
"""
|
||
|
||
if name:
|
||
rc = [re.compile(r'(?i)' + v) for v in [
|
||
r'([\s\.\-_\[\{\(]*(no-rar|nzbgeek|ripsalot|siklopentan)[\s\.\-_\]\}\)]*)$',
|
||
r'([\s\.\-_\[\{\(]rp[\s\.\-_\]\}\)]*)$',
|
||
r'(?<=\w)([\s\.\-_]*[\[\{\(][\s\.\-_]*(www\.\w+.\w+)[\s\.\-_]*[\]\}\)][\s\.\-_]*)$',
|
||
r'(?<=\w)([\s\.\-_]*[\[\{\(]\s*(rar(bg|tv)|((e[tz]|v)tv))[\s\.\-_]*[\]\}\)][\s\.\-_]*)$'] +
|
||
([r'(?<=\w)([\s\.\-_]*[\[\{\(][\s\.\-_]*[\w\s\.\-\_]+[\s\.\-_]*[\]\}\)][\s\.\-_]*)$',
|
||
r'^([\s\.\-_]*[\[\{\(][\s\.\-_]*[\w\s\.\-\_]+[\s\.\-_]*[\]\}\)][\s\.\-_]*)(?=\w)'], [])[is_anime]]
|
||
rename = name = remove_extension(name)
|
||
while rename:
|
||
for regex in rc:
|
||
result = regex.findall(name)
|
||
if result:
|
||
for cur_match in isinstance(result[0], tuple) and result[0] or result:
|
||
if not is_parsable_date(cur_match.strip(' ()')):
|
||
name = regex.sub('', name)
|
||
rename = (name, False)[name == rename]
|
||
|
||
return name
|
||
|
||
|
||
def is_sync_file(filename):
|
||
"""
|
||
|
||
:param filename: filename
|
||
:type filename: AnyStr
|
||
:return:
|
||
:rtype: bool
|
||
"""
|
||
extension = filename.rpartition(".")[2].lower()
|
||
return '!sync' == extension or 'lftp-pget-status' == extension
|
||
|
||
|
||
def has_media_ext(filename):
|
||
"""
|
||
checks if file has media extension
|
||
|
||
:param filename: filename
|
||
:type filename: AnyStr
|
||
:return:
|
||
:rtype: bool
|
||
"""
|
||
# ignore samples
|
||
if re.search(r'(^|[\W_])(sample\d*)[\W_]', filename, re.I) \
|
||
or filename.startswith('._'): # and MAC OS's 'resource fork' files
|
||
return False
|
||
|
||
sep_file = filename.rpartition('.')
|
||
return (None is re.search('extras?$', sep_file[0], re.I)) and (sep_file[2].lower() in mediaExtensions)
|
||
|
||
|
||
def has_image_ext(filename):
|
||
"""
|
||
checks if file has image extension
|
||
|
||
:param filename: filename
|
||
:type filename: AnyStr
|
||
:return:
|
||
:rtype: bool
|
||
"""
|
||
try:
|
||
if os.path.splitext(filename)[1].lower() in ['.bmp', '.gif', '.jpeg', '.jpg', '.png', '.webp']:
|
||
return True
|
||
except (BaseException, Exception):
|
||
pass
|
||
return False
|
||
|
||
|
||
def is_first_rar_volume(filename):
|
||
"""
|
||
checks if file is part of rar set
|
||
|
||
:param filename: filename
|
||
:type filename: AnyStr
|
||
:return:
|
||
:rtype: bool
|
||
"""
|
||
return None is not re.search(r'(?P<file>^(?P<base>(?:(?!\.part\d+\.rar$).)*)\.(?:(?:part0*1\.)?rar)$)', filename)
|
||
|
||
|
||
def find_show_by_id(
|
||
show_id, # type: Union[AnyStr, Dict[int, int], int]
|
||
show_list=None, # type: Optional[List[TVShow]]
|
||
no_mapped_ids=True, # type: bool
|
||
check_multishow=False, # type: bool
|
||
no_exceptions=False # type: bool
|
||
):
|
||
# type: (...) -> Optional[TVShow]
|
||
"""
|
||
:param show_id: {indexer: id} or 'tvid_prodid'.
|
||
:param show_list: (optional) TVShow objects list
|
||
:param no_mapped_ids: don't check mapped ids
|
||
:param check_multishow: check for multiple show matches
|
||
:param no_exceptions: suppress the MultipleShowObjectsException and return None instead
|
||
"""
|
||
results = []
|
||
if None is show_list:
|
||
show_list = sickgear.showList
|
||
|
||
if show_id and show_list:
|
||
tvid_prodid_obj = None if not isinstance(show_id, string_types) else sickgear.tv.TVidProdid(show_id)
|
||
|
||
if tvid_prodid_obj and no_mapped_ids:
|
||
if None is tvid_prodid_obj.prodid:
|
||
return
|
||
return sickgear.showDict.get(int(tvid_prodid_obj))
|
||
else:
|
||
if tvid_prodid_obj:
|
||
if None is tvid_prodid_obj.prodid:
|
||
return None
|
||
show_id = tvid_prodid_obj.dict
|
||
|
||
if isinstance(show_id, dict):
|
||
if no_mapped_ids:
|
||
sid_int_list = [sickgear.tv.TVShow.create_sid(k, v) for k, v in iteritems(show_id) if k and v and
|
||
0 < v and sickgear.tv.tvid_bitmask >= k]
|
||
if not check_multishow:
|
||
return next((sickgear.showDict.get(_show_sid_id) for _show_sid_id in sid_int_list
|
||
if sickgear.showDict.get(_show_sid_id)), None)
|
||
results = [sickgear.showDict.get(_show_sid_id) for _show_sid_id in sid_int_list
|
||
if sickgear.showDict.get(_show_sid_id)]
|
||
else:
|
||
results = [_show_obj for k, v in iteritems(show_id) if k and v and 0 < v
|
||
for _show_obj in show_list if v == _show_obj.internal_ids.get(k, {'id': 0})['id']]
|
||
|
||
num_shows = len(set(results))
|
||
if 1 == num_shows:
|
||
return results[0]
|
||
if 1 < num_shows and not no_exceptions:
|
||
raise MultipleShowObjectsException()
|
||
|
||
|
||
def make_dir(path):
|
||
"""
|
||
create given path recursively
|
||
|
||
:param path: path
|
||
:type path: AnyStr
|
||
:return: success of creation
|
||
:rtype: bool
|
||
"""
|
||
if not os.path.isdir(path):
|
||
try:
|
||
os.makedirs(path)
|
||
# do a Synology library update
|
||
notifiers.NotifierFactory().get('SYNOINDEX').addFolder(path)
|
||
except OSError:
|
||
return False
|
||
return True
|
||
|
||
|
||
def search_infosrc_for_show_id(reg_show_name, tvid=None, prodid=None, ui=None):
|
||
"""
|
||
search info source for show
|
||
|
||
:param reg_show_name: show name to search
|
||
:type reg_show_name: AnyStr
|
||
:param tvid: tvid
|
||
:type tvid: int or None
|
||
:param prodid: prodid
|
||
:type prodid: int or long or None
|
||
:param ui: ui class
|
||
:type ui: object
|
||
:return: seriesname, tvid, prodid or None, None, None
|
||
:rtype: Tuple[None, None, None] or Tuple[AnyStr, int, int or long]
|
||
"""
|
||
show_names = [re.sub('[. -]', ' ', reg_show_name)]
|
||
|
||
# Query Indexers for each search term and build the list of results
|
||
for cur_tvid in (sickgear.TVInfoAPI().sources if not tvid else [int(tvid)]) or []:
|
||
# Query Indexers for each search term and build the list of results
|
||
tvinfo_config = sickgear.TVInfoAPI(cur_tvid).api_params.copy()
|
||
if ui is not None:
|
||
tvinfo_config['custom_ui'] = ui
|
||
t = sickgear.TVInfoAPI(cur_tvid).setup(**tvinfo_config)
|
||
|
||
for cur_name in show_names:
|
||
logger.debug('Trying to find %s on %s' % (cur_name, sickgear.TVInfoAPI(cur_tvid).name))
|
||
|
||
try:
|
||
if prodid:
|
||
show_info_list = t.get_show(prodid)
|
||
else:
|
||
show_info_list = t.search_show(cur_name)
|
||
show_info_list = show_info_list if isinstance(show_info_list, list) else [show_info_list]
|
||
except (BaseException, Exception):
|
||
continue
|
||
|
||
seriesname = _prodid = None
|
||
for cur_show_info in show_info_list: # type: dict
|
||
try:
|
||
seriesname = cur_show_info['seriesname']
|
||
_prodid = cur_show_info['id']
|
||
except (BaseException, Exception):
|
||
_prodid = seriesname = None
|
||
continue
|
||
if seriesname and _prodid:
|
||
break
|
||
|
||
if not (seriesname and _prodid):
|
||
continue
|
||
|
||
if None is prodid and str(cur_name).lower() == str(seriesname).lower():
|
||
return seriesname, cur_tvid, int(_prodid)
|
||
elif None is not prodid and int(prodid) == int(_prodid):
|
||
return seriesname, cur_tvid, int(prodid)
|
||
|
||
if tvid:
|
||
break
|
||
|
||
return None, None, None
|
||
|
||
|
||
def sizeof_fmt(number, digits=1, sep=' '):
|
||
# type: (int, int, AnyStr) -> AnyStr
|
||
"""
|
||
format given bytes to human-readable text
|
||
|
||
:param number: value to convert
|
||
:param digits: number of digits after decimal point
|
||
:param sep: seperater of value and dimension
|
||
:return: human-readable formatted text
|
||
"""
|
||
for cur_dimension in ['bytes', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB']:
|
||
if 1024.0 > number:
|
||
return f'{number:3.{digits}f}{sep}{cur_dimension}'
|
||
number /= 1024.0
|
||
|
||
|
||
def list_media_files(path):
|
||
# type: (AnyStr) -> List[AnyStr]
|
||
"""
|
||
list all media files in given path
|
||
|
||
:param path: path
|
||
:return: list of media files
|
||
"""
|
||
result = []
|
||
if path:
|
||
if [direntry for direntry in scantree(path, include=[r'\.sickgearignore'], filter_kind=False, recurse=False)]:
|
||
logger.debug('Skipping folder "%s" because it contains ".sickgearignore"' % path)
|
||
else:
|
||
result = [direntry.path for direntry in scantree(path, exclude_dirs=[
|
||
'^Extras$',
|
||
'^Behind The Scenes$', '^Deleted Scenes$', '^Featurettes$',
|
||
'^Interviews$', '^Scenes$', '^Shorts$', '^Trailers$', '^Other$'
|
||
], filter_kind=False, exclude_folders_with_files=['.sickgearignore'])
|
||
if has_media_ext(direntry.name)]
|
||
return result
|
||
|
||
|
||
def link(src_file, dest_file):
|
||
"""
|
||
|
||
:param src_file: source file
|
||
:type src_file: AnyStr
|
||
:param dest_file: destination file
|
||
:type dest_file: AnyStr
|
||
"""
|
||
if 'nt' == os.name:
|
||
import ctypes
|
||
|
||
if 0 == ctypes.windll.kernel32.CreateHardLinkW(text_type(dest_file), text_type(src_file), 0):
|
||
raise ctypes.WinError()
|
||
else:
|
||
os.link(src_file, dest_file)
|
||
|
||
|
||
def hardlink_file(src_file, dest_file):
|
||
"""
|
||
|
||
:param src_file: source file
|
||
:type src_file: AnyStr
|
||
:param dest_file: destination file
|
||
:type dest_file: AnyStr
|
||
"""
|
||
try:
|
||
link(src_file, dest_file)
|
||
fix_set_group_id(dest_file)
|
||
except (BaseException, Exception) as e:
|
||
logger.error(f'Failed to create hardlink of {src_file} at {dest_file}: {ex(e)}. Copying instead.')
|
||
copy_file(src_file, dest_file)
|
||
|
||
|
||
def symlink(src_file, dest_file):
|
||
"""
|
||
|
||
:param src_file: source file
|
||
:type src_file: AnyStr
|
||
:param dest_file: destination
|
||
:type dest_file: AnyStr
|
||
"""
|
||
if 'nt' == os.name:
|
||
import ctypes
|
||
|
||
if ctypes.windll.kernel32.CreateSymbolicLinkW(
|
||
text_type(dest_file), text_type(src_file), 1 if os.path.isdir(src_file) else 0) in [0, 1280]:
|
||
raise ctypes.WinError()
|
||
else:
|
||
os.symlink(src_file, dest_file)
|
||
|
||
|
||
def move_and_symlink_file(src_file, dest_file):
|
||
"""
|
||
|
||
:param src_file: source file
|
||
:type src_file: AnyStr
|
||
:param dest_file: destination file
|
||
:type dest_file: AnyStr
|
||
"""
|
||
try:
|
||
shutil.move(src_file, dest_file)
|
||
fix_set_group_id(dest_file)
|
||
symlink(dest_file, src_file)
|
||
except (BaseException, Exception):
|
||
logger.error(f'Failed to create symlink of {src_file} at {dest_file}. Copying instead')
|
||
copy_file(src_file, dest_file)
|
||
|
||
|
||
def rename_ep_file(cur_path, new_path, old_path_length=0, use_rename=False):
|
||
"""
|
||
Creates all folders needed to move a file to its new location, renames it, then cleans up any folders
|
||
left that are now empty.
|
||
|
||
:param cur_path: The absolute path to the file you want to move/rename
|
||
:type cur_path: AnyStr
|
||
:param new_path: The absolute path to the destination for the file WITHOUT THE EXTENSION
|
||
:type new_path: AnyStr
|
||
:param old_path_length: The length of media file path (old name) WITHOUT THE EXTENSION
|
||
:type old_path_length: int or long
|
||
:param use_rename: use rename instead of shutil.move
|
||
:return: success
|
||
:rtype: bool
|
||
"""
|
||
|
||
# new_dest_dir, new_dest_name = os.path.split(new_path)
|
||
|
||
if 0 == old_path_length or len(cur_path) < old_path_length:
|
||
# approach from the right
|
||
cur_file_name, cur_file_ext = os.path.splitext(cur_path)
|
||
else:
|
||
# approach from the left
|
||
cur_file_ext = cur_path[old_path_length:]
|
||
cur_file_name = cur_path[:old_path_length]
|
||
|
||
if cur_file_ext[1:] in subtitleExtensions:
|
||
# Extract subtitle language from filename
|
||
sublang = os.path.splitext(cur_file_name)[1][1:]
|
||
|
||
# Check if the language extracted from filename is a valid language
|
||
try:
|
||
_ = subliminal.language.Language(sublang, strict=True)
|
||
cur_file_ext = '.' + sublang + cur_file_ext
|
||
except ValueError:
|
||
pass
|
||
|
||
# put the extension on the incoming file
|
||
new_path += cur_file_ext
|
||
|
||
make_path(os.path.dirname(new_path), syno=True)
|
||
|
||
# move the file
|
||
try:
|
||
logger.log(f'Renaming file from {cur_path} to {new_path}')
|
||
if use_rename:
|
||
os.rename(cur_path, new_path)
|
||
else:
|
||
shutil.move(cur_path, new_path)
|
||
except (OSError, IOError, IsADirectoryError, NotADirectoryError, FileExistsError) as e:
|
||
logger.error(f'Failed renaming {cur_path} to {new_path}: {ex(e)}')
|
||
return False
|
||
|
||
# clean up any old folders that are empty
|
||
delete_empty_folders(os.path.dirname(cur_path))
|
||
|
||
return True
|
||
|
||
|
||
def delete_empty_folders(check_empty_dir, keep_dir=None):
|
||
"""
|
||
Walks backwards up the path and deletes any empty folders found.
|
||
|
||
:param check_empty_dir: The path to clean (absolute path to a folder)
|
||
:type check_empty_dir: AnyStr
|
||
:param keep_dir: Clean until this path is reached
|
||
:type keep_dir: bool
|
||
"""
|
||
|
||
# treat check_empty_dir as empty when it only contains these items
|
||
ignore_items = []
|
||
|
||
logger.log(f'Trying to clean any empty folders under {check_empty_dir}')
|
||
|
||
# as long as the folder exists and doesn't contain any files, delete it
|
||
while os.path.isdir(check_empty_dir) and check_empty_dir != keep_dir:
|
||
check_files = os.listdir(check_empty_dir)
|
||
|
||
if not check_files or (len(check_files) <= len(ignore_items) and all(
|
||
[check_file in ignore_items for check_file in check_files])):
|
||
# directory is empty or contains only ignore_items
|
||
try:
|
||
logger.log(f"Deleting empty folder: {check_empty_dir}")
|
||
# need shutil.rmtree when ignore_items is really implemented
|
||
os.rmdir(check_empty_dir)
|
||
# do a Synology library update
|
||
notifiers.NotifierFactory().get('SYNOINDEX').deleteFolder(check_empty_dir)
|
||
except OSError as e:
|
||
logger.warning(f'Unable to delete {check_empty_dir}: {repr(e)} / {ex(e)}')
|
||
break
|
||
check_empty_dir = os.path.dirname(check_empty_dir)
|
||
else:
|
||
break
|
||
|
||
|
||
def get_absolute_number_from_season_and_episode(show_obj, season, episode):
|
||
"""
|
||
|
||
:param show_obj: show object
|
||
:type show_obj: TVShow
|
||
:param season: season number
|
||
:type season: int
|
||
:param episode: episode number
|
||
:type episode: int
|
||
:return: absolute number
|
||
:type: int or long
|
||
"""
|
||
absolute_number = None
|
||
|
||
if season and episode:
|
||
my_db = db.DBConnection()
|
||
sql_result = my_db.select('SELECT *'
|
||
' FROM tv_episodes'
|
||
' WHERE indexer = ? AND showid = ? AND season = ? AND episode = ?',
|
||
[show_obj.tvid, show_obj.prodid, season, episode])
|
||
|
||
if 1 == len(sql_result):
|
||
absolute_number = int(sql_result[0]["absolute_number"])
|
||
logger.debug(f'Found absolute_number:{absolute_number} by {season}x{episode}')
|
||
else:
|
||
logger.debug('No entries for absolute number in show: %s found using %sx%s' %
|
||
(show_obj.unique_name, str(season), str(episode)))
|
||
|
||
return absolute_number
|
||
|
||
|
||
def get_all_episodes_from_absolute_number(show_obj, absolute_numbers):
|
||
# type: (TVShow, List[int]) -> Tuple[int, List[int]]
|
||
"""
|
||
|
||
:param show_obj: show object
|
||
:param absolute_numbers: absolute numbers
|
||
"""
|
||
episode_numbers = []
|
||
season_number = None
|
||
|
||
if show_obj and len(absolute_numbers):
|
||
for absolute_number in absolute_numbers:
|
||
ep_obj = show_obj.get_episode(None, None, absolute_number=absolute_number)
|
||
if ep_obj:
|
||
episode_numbers.append(ep_obj.episode)
|
||
season_number = ep_obj.season # this takes the last found season so eps that cross the season
|
||
# border are not handled well
|
||
|
||
return season_number, episode_numbers
|
||
|
||
|
||
def sanitize_scene_name(name):
|
||
"""
|
||
Takes a show name and returns the "scenified" version of it.
|
||
|
||
:param name: name
|
||
:type name: AnyStr
|
||
:return: A string containing the scene version of the show name given.
|
||
:rtype: AnyStr
|
||
"""
|
||
if name:
|
||
bad_chars = ',:()£\'!?\u2019'
|
||
|
||
# strip out any bad chars
|
||
name = re.sub(r'[%s]' % bad_chars, '', name, flags=re.U)
|
||
|
||
# tidy up stuff that doesn't belong in scene names
|
||
name = re.sub(r'(-?\s|/)', '.', name).replace('&', 'and')
|
||
name = re.sub(r"\.+", '.', name).rstrip('.')
|
||
|
||
return name
|
||
return ''
|
||
|
||
|
||
def create_https_certificates(ssl_cert, ssl_key):
|
||
"""
|
||
Create self-signed HTTPS certificares and store in paths 'ssl_cert' and 'ssl_key'
|
||
"""
|
||
|
||
try:
|
||
from lib.certgen import generate_key, generate_local_cert
|
||
except (BaseException, Exception):
|
||
return False
|
||
|
||
private_key = generate_key(key_size=4096, output_file=ssl_key)
|
||
cert = generate_local_cert(private_key, days_valid=3650, output_file=ssl_cert)
|
||
return bool(cert)
|
||
|
||
|
||
if '__main__' == __name__:
|
||
import doctest
|
||
|
||
doctest.testmod()
|
||
|
||
|
||
def parse_xml(data, del_xmlns=False):
|
||
# type: (AnyStr, bool) -> Optional[etree.ElementTree]
|
||
"""
|
||
Parse data into an xml elementtree.ElementTree
|
||
|
||
data: data string containing xml
|
||
del_xmlns: if True, removes xmlns namesspace from data before parsing
|
||
|
||
Returns: parsed data as elementtree or None
|
||
"""
|
||
|
||
if del_xmlns:
|
||
data = re.sub(' xmlns="[^"]+"', '', data)
|
||
|
||
if isinstance(data, text_type) and is_lxml:
|
||
data = RE_XML_ENCODING.sub(r'\1\3', data, count=1)
|
||
|
||
try:
|
||
parsed_xml = etree.fromstring(data)
|
||
except (BaseException, Exception) as e:
|
||
logger.debug(f"Error trying to parse xml data. Error: {ex(e)}")
|
||
parsed_xml = None
|
||
|
||
return parsed_xml
|
||
|
||
|
||
def backup_versioned_file(old_file, version):
|
||
"""
|
||
|
||
:param old_file: old filename
|
||
:type old_file: AnyStr
|
||
:param version: version number
|
||
:type version: int
|
||
:return: success
|
||
:rtype: bool
|
||
"""
|
||
num_tries = 0
|
||
|
||
new_file = '%s.v%s' % (old_file, version)
|
||
|
||
if os.path.isfile(new_file):
|
||
changed_old_db = False
|
||
for back_nr in range(1, 10000):
|
||
alt_name = '%s.r%s' % (new_file, back_nr)
|
||
if not os.path.isfile(alt_name):
|
||
try:
|
||
shutil.move(new_file, alt_name)
|
||
changed_old_db = True
|
||
break
|
||
except (BaseException, Exception):
|
||
if os.path.isfile(new_file):
|
||
continue
|
||
logger.warning('could not rename old backup db file')
|
||
if not changed_old_db:
|
||
raise Exception('can\'t create a backup of db')
|
||
|
||
while not os.path.isfile(new_file):
|
||
if not os.path.isfile(old_file) or 0 == get_size(old_file):
|
||
logger.debug('No need to create backup')
|
||
break
|
||
|
||
try:
|
||
logger.debug(f'Trying to back up {old_file} to {new_file}')
|
||
shutil.copy(old_file, new_file)
|
||
logger.debug('Backup done')
|
||
break
|
||
except (BaseException, Exception) as e:
|
||
logger.warning(f'Error while trying to back up {old_file} to {new_file} : {ex(e)}')
|
||
num_tries += 1
|
||
time.sleep(3)
|
||
logger.debug('Trying again.')
|
||
|
||
if 3 <= num_tries:
|
||
logger.error(f'Unable to back up {old_file} to {new_file} please do it manually.')
|
||
return False
|
||
|
||
return True
|
||
|
||
|
||
def restore_versioned_file(backup_file, version):
|
||
"""
|
||
|
||
:param backup_file: filename
|
||
:type backup_file: AnyStr
|
||
:param version: version number
|
||
:type version: int
|
||
:return: success
|
||
:rtype: bool
|
||
"""
|
||
num_tries = 0
|
||
|
||
new_file, backup_version = os.path.splitext(backup_file)
|
||
restore_file = new_file + '.' + 'v' + str(version)
|
||
|
||
if not os.path.isfile(new_file):
|
||
logger.debug(f'Not restoring, {new_file} doesn\'t exist')
|
||
return False
|
||
|
||
try:
|
||
logger.debug(f'Trying to backup {new_file} to {new_file}.r{version} before restoring backup')
|
||
shutil.move(new_file, new_file + '.' + 'r' + str(version))
|
||
except (BaseException, Exception) as e:
|
||
logger.warning(f'Error while trying to backup DB file {restore_file} before proceeding with restore: {ex(e)}')
|
||
return False
|
||
|
||
while not os.path.isfile(new_file):
|
||
if not os.path.isfile(restore_file):
|
||
logger.debug(f'Not restoring, {restore_file} doesn\'t exist')
|
||
break
|
||
|
||
try:
|
||
logger.debug(f'Trying to restore {restore_file} to {new_file}')
|
||
shutil.copy(restore_file, new_file)
|
||
logger.debug('Restore done')
|
||
break
|
||
except (BaseException, Exception) as e:
|
||
logger.warning(f'Error while trying to restore {restore_file}: {ex(e)}')
|
||
num_tries += 1
|
||
time.sleep(1)
|
||
logger.debug('Trying again.')
|
||
|
||
if 10 <= num_tries:
|
||
logger.error(f'Unable to restore {restore_file} to {new_file} please do it manually.')
|
||
return False
|
||
|
||
return True
|
||
|
||
|
||
# try to convert to float, return default on failure
|
||
def try_float(s, s_default=0.0):
|
||
try:
|
||
return float(s)
|
||
except (BaseException, Exception):
|
||
return float(s_default)
|
||
|
||
|
||
# generates a md5 hash of a file
|
||
def md5_for_file(filename, block_size=2 ** 16):
|
||
"""
|
||
|
||
:param filename: filename
|
||
:type filename: AnyStr
|
||
:param block_size: block size
|
||
:type block_size: int or long
|
||
:return:
|
||
:rtype: AnyStr or None
|
||
"""
|
||
try:
|
||
with open(filename, 'rb') as f:
|
||
md5 = hashlib.md5()
|
||
while True:
|
||
data = f.read(block_size)
|
||
if not data:
|
||
break
|
||
md5.update(data)
|
||
f.close()
|
||
return md5.hexdigest()
|
||
except (BaseException, Exception):
|
||
return None
|
||
|
||
|
||
def get_lan_ip():
|
||
"""
|
||
Simple function to get LAN localhost_ip
|
||
http://stackoverflow.com/questions/11735821/python-get-localhost-ip
|
||
"""
|
||
|
||
if 'nt' != os.name:
|
||
# noinspection PyUnresolvedReferences
|
||
import fcntl
|
||
import struct
|
||
|
||
def get_interface_ip(if_name):
|
||
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||
return socket.inet_ntoa(fcntl.ioctl(s.fileno(), 0x8915, struct.pack('256s', if_name[:15]))[20:24])
|
||
|
||
ip = socket.gethostbyname(socket.gethostname())
|
||
if ip.startswith("127.") and "nt" != os.name:
|
||
interfaces = [
|
||
"eth0",
|
||
"eth1",
|
||
"eth2",
|
||
"wlan0",
|
||
"wlan1",
|
||
"wifi0",
|
||
"ath0",
|
||
"ath1",
|
||
"ppp0",
|
||
]
|
||
for ifname in interfaces:
|
||
try:
|
||
# noinspection PyUnboundLocalVariable
|
||
ip = get_interface_ip(ifname)
|
||
print(ifname, ip)
|
||
break
|
||
except IOError:
|
||
pass
|
||
return ip
|
||
|
||
|
||
def check_url(url):
|
||
"""
|
||
Check if a URL exists without downloading the whole file.
|
||
:param url: url
|
||
:type url: AnyStr
|
||
:return:
|
||
:rtype: bool
|
||
"""
|
||
try:
|
||
return requests.head(url).ok
|
||
except (BaseException, Exception):
|
||
return False
|
||
|
||
|
||
def anon_url(*url):
|
||
"""
|
||
|
||
:param url: url
|
||
:type url:
|
||
:return: a URL string consisting of the Anonymous redirect URL and an arbitrary number of values appended
|
||
:rtype: AnyStr
|
||
"""
|
||
return '' if None in url else '%s%s' % (sickgear.ANON_REDIRECT, ''.join([str(s) for s in url]))
|
||
|
||
|
||
def starify(text, verify=False):
|
||
"""
|
||
If verify is true, return true if text is a star block created text else return false.
|
||
|
||
:param text: text
|
||
:type text: AnyStr
|
||
:param verify:
|
||
:type verify: bool
|
||
:return: Return text input string with either its latter half or its centre area (if 12 chars or more)
|
||
replaced with asterisks. Useful for securely presenting api keys to a ui.
|
||
"""
|
||
return '' if not text\
|
||
else ((('%s%s' % (text[:len(text) // 2], '*' * (len(text) // 2))),
|
||
('%s%s%s' % (text[:4], '*' * (len(text) - 8), text[-4:])))[12 <= len(text)],
|
||
set('*') == set((text[len(text) // 2:], text[4:-4])[12 <= len(text)]))[verify]
|
||
|
||
|
||
"""
|
||
Encryption
|
||
==========
|
||
By Pedro Jose Pereira Vieito <pvieito@gmail.com> (@pvieito)
|
||
|
||
* If encryption_version==0 then return data without encryption
|
||
* The keys should be unique for each device
|
||
|
||
To add a new encryption_version:
|
||
1) Code your new encryption_version
|
||
2) Update the last encryption_version available in webserve.py
|
||
3) Remember to maintain old encryption versions and key generators for retrocompatibility
|
||
"""
|
||
|
||
# Key Generators
|
||
unique_key1 = hex(uuid.getnode() ** 2) # Used in encryption v1
|
||
|
||
|
||
# Encryption Functions
|
||
def encrypt(data, encryption_version=0, do_decrypt=False):
|
||
# Version 1: Simple XOR encryption (this is not very secure, but works)
|
||
if 1 == encryption_version:
|
||
if do_decrypt:
|
||
return ''.join([chr(try_ord(x) ^ try_ord(y)) for (x, y) in
|
||
zip(b64decodebytes(decode_bytes(data)), cycle(unique_key1))])
|
||
|
||
return decode_str(b64encodebytes(decode_bytes(
|
||
''.join([chr(try_ord(x) ^ try_ord(y)) for (x, y) in zip(data, cycle(unique_key1))])))).strip()
|
||
|
||
# Version 0: Plain text
|
||
return data
|
||
|
||
|
||
def decrypt(data, encryption_version=0):
|
||
return encrypt(data, encryption_version, do_decrypt=True)
|
||
|
||
|
||
def full_sanitize_scene_name(name):
|
||
"""
|
||
sanitize scene name
|
||
|
||
:param name: name
|
||
:type name: AnyStr
|
||
:return: sanitized name
|
||
:rtype: AnyStr
|
||
"""
|
||
return re.sub('[. -]', ' ', sanitize_scene_name(name)).lower().lstrip()
|
||
|
||
|
||
def get_show(name, try_scene_exceptions=False):
|
||
# type: (AnyStr, bool) -> Optional[TVShow]
|
||
"""
|
||
get show object for show with given name
|
||
|
||
:param name: name of show
|
||
:type name: AnyStr
|
||
:param try_scene_exceptions: check scene exceptions
|
||
:type try_scene_exceptions: bool
|
||
:return: None or show object
|
||
:type: TVShow or None
|
||
"""
|
||
if not sickgear.showList or None is name:
|
||
return
|
||
|
||
show_obj = None
|
||
|
||
try:
|
||
tvid, prodid = sickgear.name_cache.retrieve_name_from_cache(name)
|
||
if tvid and prodid:
|
||
show_obj = find_show_by_id({tvid: prodid})
|
||
|
||
if not show_obj and try_scene_exceptions:
|
||
tvid, prodid, season = sickgear.scene_exceptions.get_scene_exception_by_name(name)
|
||
if tvid and prodid:
|
||
show_obj = find_show_by_id({tvid: prodid})
|
||
except (BaseException, Exception) as e:
|
||
logger.debug(f'Error when attempting to find show: {name} in SickGear: {ex(e)}')
|
||
|
||
return show_obj
|
||
|
||
|
||
def is_hidden_folder(folder):
|
||
"""
|
||
On Linux based systems hidden folders start with . (dot)
|
||
|
||
:param folder: Full path of folder to check
|
||
:type folder: AnyStr
|
||
:return: Returns True if folder is hidden
|
||
:rtype: bool
|
||
"""
|
||
if os.path.isdir(folder):
|
||
if os.path.basename(folder).startswith('.'):
|
||
return True
|
||
|
||
return False
|
||
|
||
|
||
def real_path(path):
|
||
"""
|
||
The resulting path will have no symbolic link, '/./' or '/../' components.
|
||
|
||
:param path: path
|
||
:type path: AnyStr
|
||
:return: the canonicalized absolute pathname
|
||
:rtype: AnyStr
|
||
"""
|
||
return os.path.normpath(os.path.normcase(os.path.realpath(os.path.expanduser(path))))
|
||
|
||
|
||
def validate_show(show_obj, season=None, episode=None):
|
||
"""
|
||
|
||
:param show_obj: show object
|
||
:type show_obj: TVShow
|
||
:param season: optional season
|
||
:type season: int or None
|
||
:param episode: opitonal episode
|
||
:type episode: int or None
|
||
:return: TVInfoAPI source
|
||
:rtype: object
|
||
"""
|
||
show_lang = show_obj.lang
|
||
|
||
try:
|
||
tvinfo_config = sickgear.TVInfoAPI(show_obj.tvid).api_params.copy()
|
||
tvinfo_config['dvdorder'] = 0 != show_obj.dvdorder
|
||
|
||
if show_lang and not 'en' == show_lang:
|
||
tvinfo_config['language'] = show_lang
|
||
|
||
t = sickgear.TVInfoAPI(show_obj.tvid).setup(**tvinfo_config)
|
||
if season is None and episode is None:
|
||
return t
|
||
|
||
return t.get_show(show_obj.prodid, language=show_obj.lang)[season][episode]
|
||
except (BaseTVinfoEpisodenotfound, BaseTVinfoSeasonnotfound, TypeError):
|
||
pass
|
||
|
||
|
||
def _maybe_request_url(e, def_url=''):
|
||
return hasattr(e, 'request') and hasattr(e.request, 'url') and ' ' + e.request.url or def_url
|
||
|
||
|
||
def clear_cache(force=False):
|
||
"""
|
||
clear sickgear cache folder
|
||
|
||
:param force: force clearing
|
||
:type force: bool
|
||
"""
|
||
# clean out cache directory, remove everything > 12 hours old
|
||
dirty = None
|
||
del_time = SGDatetime.timestamp_near(td=datetime.timedelta(hours=12))
|
||
direntry_args = dict(follow_symlinks=False)
|
||
for direntry in scantree(sickgear.CACHE_DIR, exclude_dirs=['images|rss|zoneinfo'], follow_symlinks=True):
|
||
if direntry.is_file(**direntry_args) and (force or del_time > direntry.stat(**direntry_args).st_mtime):
|
||
dirty = dirty or False if remove_file_perm(direntry.path) else True
|
||
elif direntry.is_dir(**direntry_args) and direntry.name not in ['cheetah', 'sessions', 'indexers']:
|
||
dirty = dirty or False
|
||
try:
|
||
os.rmdir(direntry.path)
|
||
except OSError:
|
||
dirty = True
|
||
|
||
logger.log(
|
||
f'{(("Found items not removed", "Found items removed")[not dirty], "No items found to remove")[None is dirty]}'
|
||
f' from cache folder {sickgear.CACHE_DIR}')
|
||
|
||
|
||
def human(size):
|
||
"""
|
||
format a size in bytes into a 'human' file size, e.g. bytes, KB, MB, GB, TB, PB
|
||
Note that bytes/KB will be reported in whole numbers but MB and above will have greater precision
|
||
e.g. 1 byte, 43 bytes, 443 KB, 4.3 MB, 4.43 GB, etc
|
||
|
||
:param size: numerical value to be converted
|
||
:type size: int or long or float
|
||
:return: human readable string
|
||
:rtype: AnyStr
|
||
"""
|
||
if 1 == size:
|
||
# because I really hate unnecessary plurals
|
||
return "1 byte"
|
||
|
||
suffixes_table = [('bytes', 0), ('KB', 0), ('MB', 1), ('GB', 2), ('TB', 2), ('PB', 2)]
|
||
|
||
num = float(size)
|
||
for suffix, precision in suffixes_table:
|
||
if 1024.0 > num:
|
||
break
|
||
num /= 1024.0
|
||
|
||
# noinspection PyUnboundLocalVariable
|
||
if 0 == precision:
|
||
formatted_size = '%d' % num
|
||
else:
|
||
formatted_size = str(round(num, ndigits=precision))
|
||
|
||
# noinspection PyUnboundLocalVariable
|
||
return '%s %s' % (formatted_size, suffix)
|
||
|
||
|
||
def get_size(start_path='.'):
|
||
"""
|
||
return combined size of data in given path
|
||
|
||
:param start_path: start path
|
||
:type start_path: AnyStr
|
||
:return: size in bytes
|
||
:rtype: int or long
|
||
"""
|
||
if os.path.isfile(start_path):
|
||
return os.path.getsize(start_path)
|
||
try:
|
||
return sum(map((lambda x: x.stat(follow_symlinks=False).st_size), scantree(start_path)))
|
||
except OSError:
|
||
return 0
|
||
|
||
|
||
def get_media_stats(start_path='.'):
|
||
# type: (AnyStr) -> Tuple[int, int, int, int]
|
||
"""
|
||
return recognised media stats for a folder as...
|
||
|
||
number of media files, smallest size in bytes, largest size in bytes, average size in bytes
|
||
|
||
:param start_path: path to scan
|
||
"""
|
||
if os.path.isdir(start_path):
|
||
sizes = sorted(map(lambda y: y.stat(follow_symlinks=False).st_size,
|
||
filter(lambda x: has_media_ext(x.name), scantree(start_path))))
|
||
if sizes:
|
||
return len(sizes), sizes[0], sizes[-1], int(sum(sizes) / len(sizes))
|
||
|
||
elif os.path.isfile(start_path):
|
||
size = os.path.getsize(start_path)
|
||
return 1, size, size, size
|
||
|
||
return 0, 0, 0, 0
|
||
|
||
|
||
def remove_article(text=''):
|
||
"""
|
||
remove articles from text
|
||
|
||
:param text: input text
|
||
:type text: AnyStr
|
||
:return: text without articles
|
||
:rtype: AnyStr
|
||
"""
|
||
return re.sub(r'(?i)^(?:A(?!\s+to)n?|The)\s(\w)', r'\1', text)
|
||
|
||
|
||
def re_valid_hostname(with_allowed=True):
|
||
this_host = socket.gethostname()
|
||
return re.compile(r'(?i)(%slocalhost|.*\.local%s%s)$' % (
|
||
(with_allowed
|
||
and '%s|' % (sickgear.ALLOWED_HOSTS
|
||
and '|'.join(re.escape(x.strip()) for x in sickgear.ALLOWED_HOSTS.split(','))
|
||
or '.*')
|
||
or ''),
|
||
bool(this_host) and ('|%s' % this_host) or '',
|
||
sickgear.ALLOW_ANYIP and ('|%s' % valid_ipaddr_expr()) or ''))
|
||
|
||
|
||
def valid_ipaddr_expr():
|
||
"""
|
||
Returns a regular expression that will validate an ip address
|
||
:return: Regular expression
|
||
:rtype: String
|
||
"""
|
||
return r'(%s)' % '|'.join([re.sub(r'\s+(#.[^\r\n]+)?', '', x) for x in [
|
||
# IPv4 address (accurate)
|
||
# Matches 0.0.0.0 through 255.255.255.255
|
||
r'''
|
||
(?:(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])\.){3}(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])
|
||
''',
|
||
# IPv6 address (standard and mixed)
|
||
# 8 hexadecimal words, or 6 hexadecimal words followed by 4 decimal bytes All with optional leading zeros
|
||
r'''
|
||
(?:(?<![:.\w])\[? # Anchor address
|
||
(?:[A-F0-9]{1,4}:){6} # 6 words
|
||
(?:[A-F0-9]{1,4}:[A-F0-9]{1,4} # 2 words
|
||
| (?:(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])\.){3} # or 4 bytes
|
||
(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])
|
||
)(?![:.\w])) # Anchor address
|
||
''',
|
||
# IPv6 address (compressed and compressed mixed)
|
||
# 8 hexadecimal words, or 6 hexadecimal words followed by 4 decimal bytes
|
||
# All with optional leading zeros. Consecutive zeros may be replaced with ::
|
||
r'''
|
||
(?:(?<![:.\w])\[?(?: # Anchor address
|
||
(?: # Mixed
|
||
(?:[A-F0-9]{1,4}:){6} # Non-compressed
|
||
|(?=(?:[A-F0-9]{0,4}:){2,6} # Compressed with 2 to 6 colons
|
||
(?:[0-9]{1,3}\.){3}[0-9]{1,3} # and 4 bytes
|
||
(?![:.\w])) # and anchored
|
||
(([0-9A-F]{1,4}:){1,5}|:)((:[0-9A-F]{1,4}){1,5}:|:) # and at most 1 double colon
|
||
|::(?:[A-F0-9]{1,4}:){5} # Compressed with 7 colons and 5 numbers
|
||
)
|
||
(?:(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])\.){3} # 255.255.255.
|
||
(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9]) # 255
|
||
| # Standard
|
||
(?:[A-F0-9]{1,4}:){7}[A-F0-9]{1,4} # Standard
|
||
| # Compressed
|
||
(?=(?:[A-F0-9]{0,4}:){0,7}[A-F0-9]{0,4} # Compressed with at most 7 colons
|
||
(?![:.\w])) # and anchored
|
||
(([0-9A-F]{1,4}:){1,7}|:)((:[0-9A-F]{1,4}){1,7}|:) # and at most 1 double colon
|
||
|(?:[A-F0-9]{1,4}:){7}:|:(:[A-F0-9]{1,4}){7} # Compressed with 8 colons
|
||
)(?![:.\w])) # Anchor address
|
||
'''
|
||
]])
|
||
|
||
|
||
def build_dict(seq, key):
|
||
"""
|
||
|
||
:param seq: Iterable sequence
|
||
:type seq: Iterable
|
||
:param key: key
|
||
:type key: AnyStr
|
||
:return: returns dict
|
||
:rtype: Dict
|
||
"""
|
||
return dict([(d[key], dict(d, index=index)) for (index, d) in enumerate(seq)])
|
||
|
||
|
||
def client_host(server_host):
|
||
"""Extracted from cherrypy libs
|
||
Return the host on which a client can connect to the given listener."""
|
||
if '0.0.0.0' == server_host:
|
||
# 0.0.0.0 is INADDR_ANY, which should answer on localhost.
|
||
return '127.0.0.1'
|
||
if server_host in ('::', '::0', '::0.0.0.0'):
|
||
# :: is IN6ADDR_ANY, which should answer on localhost.
|
||
# ::0 and ::0.0.0.0 are non-canonical but common ways to write
|
||
# IN6ADDR_ANY.
|
||
return '::1'
|
||
return server_host
|
||
|
||
|
||
def wait_for_free_port(host, port):
|
||
"""Extracted from cherrypy libs
|
||
Wait for the specified port to become free (drop requests)."""
|
||
if not host:
|
||
raise ValueError("Host values of '' or None are not allowed.")
|
||
for trial in range(50):
|
||
try:
|
||
# we are expecting a free port, so reduce the timeout
|
||
check_port(host, port, timeout=0.1)
|
||
except IOError:
|
||
# Give the old server thread time to free the port.
|
||
time.sleep(0.1)
|
||
else:
|
||
return
|
||
|
||
raise IOError("Port %r is not free on %r" % (port, host))
|
||
|
||
|
||
def check_port(host, port, timeout=1.0):
|
||
"""Extracted from cherrypy libs
|
||
Raise an error if the given port is not free on the given host."""
|
||
if not host:
|
||
raise ValueError("Host values of '' or None are not allowed.")
|
||
host = client_host(host)
|
||
port = int(port)
|
||
|
||
import socket
|
||
|
||
# AF_INET or AF_INET6 socket
|
||
# Get the correct address family for our host (allows IPv6 addresses)
|
||
try:
|
||
info = socket.getaddrinfo(host, port, socket.AF_UNSPEC,
|
||
socket.SOCK_STREAM)
|
||
except socket.gaierror:
|
||
if ':' in host:
|
||
info = [(socket.AF_INET6, socket.SOCK_STREAM, 0, "", (host, port, 0, 0))]
|
||
else:
|
||
info = [(socket.AF_INET, socket.SOCK_STREAM, 0, "", (host, port))]
|
||
|
||
for res in info:
|
||
af, socktype, proto, canonname, sa = res
|
||
s = None
|
||
try:
|
||
s = socket.socket(af, socktype, proto)
|
||
# See http://groups.google.com/group/cherrypy-users/browse_frm/thread/bbfe5eb39c904fe0
|
||
s.settimeout(timeout)
|
||
s.connect((host, port))
|
||
s.close()
|
||
raise IOError("Port %s is in use on %s; perhaps the previous "
|
||
"httpserver did not shut down properly." %
|
||
(repr(port), repr(host)))
|
||
except socket.error:
|
||
if s:
|
||
s.close()
|
||
|
||
|
||
def clear_unused_providers():
|
||
providers = [x.cache.providerID for x in sickgear.providers.sorted_sources() if x.is_active()]
|
||
|
||
if providers:
|
||
my_db = db.DBConnection('cache.db')
|
||
my_db.action('DELETE FROM provider_cache WHERE provider NOT IN (%s)' % ','.join(['?'] * len(providers)),
|
||
providers)
|
||
|
||
|
||
def make_search_segment_html_string(segment, max_eps=5):
|
||
seg_str = ''
|
||
if segment and not isinstance(segment, list):
|
||
segment = [segment]
|
||
if segment and len(segment) > max_eps:
|
||
seasons = [x for x in set([x.season for x in segment])]
|
||
seg_str = f'Season{maybe_plural(len(seasons))}: '
|
||
divider = ''
|
||
for x in seasons:
|
||
eps = [str(s.episode) for s in segment if x == s.season]
|
||
ep_c = len(eps)
|
||
seg_str += '%s%s <span title="Episode%s: %s">(%s Ep%s)</span>' \
|
||
% (divider, x, maybe_plural(ep_c), ', '.join(eps), ep_c, maybe_plural(ep_c))
|
||
divider = ', '
|
||
elif segment:
|
||
episode_numbers = ['S%sE%s' % (str(x.season).zfill(2), str(x.episode).zfill(2)) for x in segment]
|
||
seg_str = f'Episode{maybe_plural(len(episode_numbers))}: {", ".join(episode_numbers)}'
|
||
return seg_str
|
||
|
||
|
||
def has_anime():
|
||
"""
|
||
:return: if there are any anime shows in show list
|
||
:rtype: bool
|
||
"""
|
||
# noinspection PyTypeChecker
|
||
return False if not sickgear.showList else any(filter(lambda show: show.is_anime, sickgear.showList))
|
||
|
||
|
||
def cpu_sleep():
|
||
if cpu_presets[sickgear.CPU_PRESET]:
|
||
time.sleep(cpu_presets[sickgear.CPU_PRESET])
|
||
|
||
|
||
def cleanup_cache():
|
||
"""
|
||
Delete old cached files
|
||
"""
|
||
delete_not_changed_in(
|
||
[os.path.join(sickgear.CACHE_DIR, 'images', 'browse', 'thumb', x)
|
||
for x in ['anidb', 'imdb', 'trakt', 'tvdb']] +
|
||
[os.path.join(sickgear.CACHE_DIR, 'images', x)
|
||
for x in ['characters', 'person']] +
|
||
[os.path.join(sickgear.CACHE_DIR, 'tvinfo_cache')])
|
||
|
||
|
||
def delete_not_changed_in(paths, days=30, minutes=0):
|
||
"""
|
||
Delete files under paths not changed in n days and/or n minutes.
|
||
If a file was modified later than days/and or minutes, then don't delete it.
|
||
|
||
:param paths: Path(s) to scan for files to delete
|
||
:type paths: String or List of strings
|
||
:param days: Purge files not modified in this number of days (default: 30 days)
|
||
:param minutes: Purge files not modified in this number of minutes (default: 0 minutes)
|
||
:return: tuple; number of files that qualify for deletion, number of qualifying files that failed to be deleted
|
||
"""
|
||
del_time = SGDatetime.timestamp_near(td=datetime.timedelta(days=days, minutes=minutes))
|
||
errors = 0
|
||
qualified = 0
|
||
for cur_path in (paths, [paths])[not isinstance(paths, list)]:
|
||
try:
|
||
for direntry in scantree(cur_path, filter_kind=False):
|
||
if del_time > direntry.stat(follow_symlinks=False).st_mtime:
|
||
if not remove_file_perm(direntry.path):
|
||
errors += 1
|
||
qualified += 1
|
||
except (BaseException, Exception):
|
||
pass
|
||
return qualified, errors
|
||
|
||
|
||
def set_file_timestamp(filename, min_age=3, new_time=None):
|
||
"""
|
||
|
||
:param filename: filename
|
||
:type filename: AnyStr
|
||
:param min_age: minimum age in days
|
||
:type min_age: int
|
||
:param new_time:
|
||
:type new_time: None or int
|
||
"""
|
||
min_time = SGDatetime.timestamp_near(td=datetime.timedelta(days=min_age))
|
||
try:
|
||
if os.path.isfile(filename) and os.path.getmtime(filename) < min_time:
|
||
os.utime(filename, new_time)
|
||
except (BaseException, Exception):
|
||
pass
|
||
|
||
|
||
def should_delete_episode(status):
|
||
"""
|
||
check if episode should be deleted from db
|
||
|
||
:param status: episode status
|
||
:type status: int
|
||
:return: should be deleted
|
||
:rtype: bool
|
||
"""
|
||
s = Quality.split_composite_status(status)[0]
|
||
if s not in SNATCHED_ANY + [DOWNLOADED, ARCHIVED, IGNORED]:
|
||
return True
|
||
logger.debug('not safe to delete episode from db because of status: %s' % statusStrings[s])
|
||
return False
|
||
|
||
|
||
def is_link(filepath):
|
||
"""
|
||
Check if given file/pathname is symbolic link
|
||
|
||
:param filepath: file or path to check
|
||
:return: True or False
|
||
"""
|
||
if 'win32' == sys.platform:
|
||
if not os.path.exists(filepath):
|
||
return False
|
||
|
||
import ctypes
|
||
invalid_file_attributes = 0xFFFFFFFF
|
||
file_attribute_reparse_point = 0x0400
|
||
|
||
attr = ctypes.windll.kernel32.GetFileAttributesW(text_type(filepath))
|
||
return invalid_file_attributes != attr and 0 != attr & file_attribute_reparse_point
|
||
|
||
return os.path.islink(filepath)
|
||
|
||
|
||
def find_mount_point(path):
|
||
# type: (AnyStr) -> AnyStr
|
||
"""
|
||
returns the mount point for the given path
|
||
|
||
:param path: to find the mount path
|
||
:return: mount point for path or path if no mount
|
||
"""
|
||
result = path
|
||
if os.path.exists(path):
|
||
result = os.path.realpath(os.path.abspath(path))
|
||
try:
|
||
while not os.path.ismount(result):
|
||
new_path = os.path.dirname(result)
|
||
if new_path == result:
|
||
# return input path if mount point not found
|
||
return path
|
||
result = new_path
|
||
except (BaseException, Exception):
|
||
return path
|
||
return result
|
||
|
||
|
||
def df():
|
||
# type: (...) -> Tuple[List[Tuple[AnyStr, AnyStr]], bool]
|
||
"""
|
||
Return disk free space at known parent locations
|
||
|
||
:return: string path, string value that is formatted size
|
||
"""
|
||
result = []
|
||
min_output = True # flag ui to output minimal (e.g. vol: size, vol: size)
|
||
if sickgear.ROOT_DIRS and sickgear.DISPLAY_FREESPACE:
|
||
targets = []
|
||
for cur_target in filter(lambda _t: _t and _t not in targets,
|
||
map(find_mount_point, sickgear.ROOT_DIRS.split('|')[1:])):
|
||
targets += [cur_target]
|
||
free = freespace(cur_target)
|
||
if 'win32' == sys.platform and None is not free:
|
||
cur_target = os.path.splitdrive(cur_target)[0]
|
||
if any(['win32' == sys.platform and not re.match('(?i)[a-z]:(\\\\)?$', cur_target),
|
||
# Windows, if a simple drive letter isn't found, fallback to full path. On Linux, full path is used
|
||
# trigger ui to output long paths instead of minimal volume letters layout
|
||
sys.platform.startswith(('linux', 'darwin', 'sunos5')), 'bsd' in sys.platform]):
|
||
min_output = False
|
||
result += [(cur_target, 'unavailable' if None is free else sizeof_fmt(free, sep=''))]
|
||
|
||
return result, min_output
|
||
|
||
|
||
def freespace(path=None):
|
||
"""
|
||
Return free space available at path location
|
||
|
||
:param path: Example paths (Windows) = '\\\\192.168.0.1\\sharename\\existing_path', 'd:\\existing_path'
|
||
Untested with mount points under linux
|
||
:type path: AnyStr
|
||
:return: Size in bytes
|
||
:rtype: long or None
|
||
"""
|
||
result = None
|
||
|
||
if 'win32' == sys.platform:
|
||
try:
|
||
import ctypes
|
||
if None is not ctypes:
|
||
max_val = (2 ** 64) - 1
|
||
storage = ctypes.c_ulonglong(max_val)
|
||
ctypes.windll.kernel32.GetDiskFreeSpaceExW(ctypes.c_wchar_p(path), None, None, ctypes.pointer(storage))
|
||
result = (storage.value, None)[max_val == storage.value]
|
||
except (BaseException, Exception):
|
||
pass
|
||
elif sys.platform.startswith(('linux', 'darwin', 'sunos5')) or 'bsd' in sys.platform:
|
||
try:
|
||
storage = os.statvfs(path) # perms errors can result
|
||
result = storage.f_bavail * storage.f_frsize
|
||
except OSError:
|
||
pass
|
||
|
||
return result
|
||
|
||
|
||
def path_mapper(search, replace, subject):
|
||
"""
|
||
Substitute strings in a path
|
||
|
||
:param search: Search text
|
||
:type search: AnyStr
|
||
:param replace: Replacement text
|
||
:type replace: AnyStr
|
||
:param subject: Path text to search
|
||
:type subject: AnyStr
|
||
:return: Subject with or without substitution, True if a change was made otherwise False
|
||
:rtype: Tuple[AnyStr, bool]
|
||
"""
|
||
delim = '/!~!/'
|
||
search = re.sub(r'\\', delim, search)
|
||
replace = re.sub(r'\\', delim, replace)
|
||
path = re.sub(r'\\', delim, subject)
|
||
result = re.sub('(?i)^%s' % search, replace, path)
|
||
result = os.path.normpath(re.sub(delim, '/', result))
|
||
|
||
return result, result != subject
|
||
|
||
|
||
def get_overview(ep_status, show_quality, upgrade_once, split_snatch=False):
|
||
# type: (integer_types, integer_types, Union[integer_types, bool], bool) -> integer_types
|
||
"""
|
||
|
||
:param ep_status: episode status
|
||
:param show_quality: show quality
|
||
:param upgrade_once: upgrade once
|
||
:param split_snatch:
|
||
:type split_snatch: bool
|
||
:return: constant from classes Overview
|
||
"""
|
||
status, quality = Quality.split_composite_status(ep_status)
|
||
if ARCHIVED == status:
|
||
return Overview.GOOD
|
||
if WANTED == status:
|
||
return Overview.WANTED
|
||
if status in (SKIPPED, IGNORED):
|
||
return Overview.SKIPPED
|
||
if status in (UNAIRED, UNKNOWN):
|
||
return Overview.UNAIRED
|
||
if status in [SUBTITLED] + Quality.SNATCHED_ANY + Quality.DOWNLOADED + Quality.FAILED:
|
||
|
||
if FAILED == status:
|
||
return Overview.WANTED
|
||
if not split_snatch and status in SNATCHED_ANY:
|
||
return Overview.SNATCHED
|
||
|
||
void, best_qualities = Quality.split_quality(show_quality)
|
||
# if re-downloads aren't wanted then mark it "good" if there is anything
|
||
if not len(best_qualities):
|
||
return Overview.GOOD
|
||
|
||
min_best, max_best = min(best_qualities), max(best_qualities)
|
||
if quality >= max_best \
|
||
or (upgrade_once and
|
||
(quality in best_qualities or (None is not min_best and quality > min_best))):
|
||
return Overview.GOOD
|
||
return (Overview.QUAL, Overview.SNATCHED_QUAL)[status in SNATCHED_ANY]
|
||
|
||
|
||
def generate_show_dir_name(root_dir, show_name):
|
||
# type: (Optional[AnyStr], AnyStr) -> AnyStr
|
||
"""
|
||
generate show dir name
|
||
|
||
:param root_dir: root dir
|
||
:param show_name: show name
|
||
:return: show dir name
|
||
"""
|
||
san_show_name = sanitize_filename(show_name)
|
||
if sickgear.SHOW_DIRS_WITH_DOTS:
|
||
san_show_name = san_show_name.replace(' ', '.')
|
||
if None is root_dir:
|
||
return san_show_name
|
||
return os.path.join(root_dir, san_show_name)
|
||
|
||
|
||
def count_files_dirs(base_dir):
|
||
"""
|
||
|
||
:param base_dir: path
|
||
:type base_dir: AnyStr
|
||
:return: tuple of count of files, dirs
|
||
:rtype: Tuple[int, int]
|
||
"""
|
||
f = d = 0
|
||
try:
|
||
with scandir(base_dir) as s_d:
|
||
try:
|
||
files = s_d
|
||
except OSError as e:
|
||
logger.warning('Unable to count files %s / %s' % (repr(e), ex(e)))
|
||
else:
|
||
for e in files:
|
||
if e.is_file():
|
||
f += 1
|
||
elif e.is_dir():
|
||
d += 1
|
||
except OSError as e:
|
||
logger.warning('Unable to count files %s / %s' % (repr(e), ex(e)))
|
||
|
||
return f, d
|
||
|
||
|
||
def upgrade_new_naming():
|
||
my_db = db.DBConnection()
|
||
sql_result = my_db.select('SELECT indexer AS tv_id, indexer_id AS prod_id FROM tv_shows')
|
||
show_list = {}
|
||
for cur_result in sql_result:
|
||
show_list[int(cur_result['prod_id'])] = int(cur_result['tv_id'])
|
||
|
||
if sickgear.FANART_RATINGS:
|
||
from sickgear.tv import TVidProdid
|
||
ne = {}
|
||
for k, v in iteritems(sickgear.FANART_RATINGS):
|
||
nk = show_list.get(try_int(k))
|
||
if nk:
|
||
ne[TVidProdid({nk: int(k)})()] = sickgear.FANART_RATINGS[k]
|
||
sickgear.FANART_RATINGS = ne
|
||
sickgear.CFG.setdefault('GUI', {})['fanart_ratings'] = '%s' % ne
|
||
sickgear.CFG.write()
|
||
|
||
image_cache_dir = os.path.join(sickgear.CACHE_DIR, 'images')
|
||
bp_match = re.compile(r'(\d+)\.((?:banner|poster|(?:\d+(?:\.\w*)?\.\w{5,8}\.)?fanart)\.jpg)', flags=re.I)
|
||
|
||
def _set_progress(p_msg, c, s):
|
||
ps = None
|
||
if 0 == s:
|
||
ps = 0
|
||
elif 1 == s and 0 == c:
|
||
ps = 100
|
||
elif 1 > c % s:
|
||
ps = c / s
|
||
if None is not ps:
|
||
sickgear.classes.loading_msg.set_msg_progress(p_msg, '{:6.2f}%'.format(ps))
|
||
|
||
for d in ['', 'thumbnails']:
|
||
bd = os.path.join(image_cache_dir, d)
|
||
if os.path.isdir(bd):
|
||
fc, dc = count_files_dirs(bd)
|
||
step = fc / float(100)
|
||
cf = 0
|
||
p_text = 'Upgrading %s' % (d, 'banner/poster')[not d]
|
||
_set_progress(p_text, 0, 0)
|
||
with scandir(bd) as s_d:
|
||
for entry in scandir(bd):
|
||
if entry.is_file():
|
||
cf += 1
|
||
_set_progress(p_text, cf, step)
|
||
b_s = bp_match.search(entry.name)
|
||
if b_s:
|
||
old_id = int(b_s.group(1))
|
||
tvid = show_list.get(old_id)
|
||
if tvid:
|
||
nb_dir = os.path.join(sickgear.CACHE_DIR, 'images', 'shows', '%s-%s' % (tvid, old_id), d)
|
||
if not os.path.isdir(nb_dir):
|
||
try:
|
||
os.makedirs(nb_dir)
|
||
except (BaseException, Exception):
|
||
pass
|
||
new_name = os.path.join(nb_dir, bp_match.sub(r'\2', entry.name))
|
||
try:
|
||
move_file(entry.path, new_name)
|
||
except (BaseException, Exception) as e:
|
||
logger.warning('Unable to rename %s to %s: %s / %s'
|
||
% (entry.path, new_name, repr(e), ex(e)))
|
||
else:
|
||
# clean up files without reference in db
|
||
try:
|
||
os.remove(entry.path)
|
||
except (BaseException, Exception):
|
||
pass
|
||
elif entry.is_dir():
|
||
if entry.name in ['shows', 'browse']:
|
||
continue
|
||
elif 'fanart' == entry.name:
|
||
_set_progress(p_text, 0, 1)
|
||
fc_fan, dc_fan = count_files_dirs(entry.path)
|
||
step_fan = dc_fan / float(100)
|
||
cf_fan = 0
|
||
p_text = 'Upgrading fanart'
|
||
_set_progress(p_text, 0, 0)
|
||
try:
|
||
with scandir(entry.path) as s_p:
|
||
try:
|
||
entries = s_p
|
||
except OSError as e:
|
||
logger.warning('Unable to stat dirs %s / %s' % (repr(e), ex(e)))
|
||
continue
|
||
for d_entry in entries:
|
||
if d_entry.is_dir():
|
||
cf_fan += 1
|
||
_set_progress(p_text, cf_fan, step_fan)
|
||
old_id = try_int(d_entry.name)
|
||
if old_id:
|
||
new_id = show_list.get(old_id)
|
||
if new_id:
|
||
new_dir_name = os.path.join(sickgear.CACHE_DIR, 'images', 'shows',
|
||
'%s-%s' % (new_id, old_id), 'fanart')
|
||
try:
|
||
move_file(d_entry.path, new_dir_name)
|
||
except (BaseException, Exception) as e:
|
||
logger.warning(f'Unable to rename {d_entry.path}'
|
||
f' to {new_dir_name}: {repr(e)} / {ex(e)}')
|
||
if os.path.isdir(new_dir_name):
|
||
try:
|
||
with scandir(new_dir_name) as s_d_n:
|
||
try:
|
||
f_n = filter(lambda fn: fn.is_file(), s_d_n)
|
||
except OSError as e:
|
||
logger.warning(
|
||
f'Unable to rename {repr(e)} / {ex(d)}')
|
||
else:
|
||
rename_args = []
|
||
# noinspection PyTypeChecker
|
||
for f_entry in f_n:
|
||
rename_args += [
|
||
(f_entry.path,
|
||
bp_match.sub(r'\2', f_entry.path))]
|
||
|
||
for args in rename_args:
|
||
try:
|
||
move_file(*args)
|
||
except (BaseException, Exception) as e:
|
||
logger.warning(
|
||
f'Unable to rename {args[0]}'
|
||
f' to {args[1]}: {repr(e)} / {ex(e)}')
|
||
except OSError as e:
|
||
logger.warning(
|
||
'Unable to rename %s / %s' % (repr(e), ex(e)))
|
||
else:
|
||
try:
|
||
shutil.rmtree(d_entry.path)
|
||
except (BaseException, Exception):
|
||
pass
|
||
try:
|
||
shutil.rmtree(d_entry.path)
|
||
except (BaseException, Exception):
|
||
pass
|
||
except OSError as e:
|
||
logger.warning('Unable to stat dirs %s / %s' % (repr(e), ex(e)))
|
||
continue
|
||
try:
|
||
os.rmdir(entry.path)
|
||
except (BaseException, Exception):
|
||
pass
|
||
if 'thumbnails' == d:
|
||
try:
|
||
os.rmdir(bd)
|
||
except (BaseException, Exception):
|
||
pass
|
||
_set_progress(p_text, 0, 1)
|
||
|
||
|
||
def xhtml_escape(text, br=True):
|
||
"""
|
||
Escapes a string, so it is valid within HTML or XML using the function from Tornado.
|
||
|
||
:param text: Text to convert entities from for example '"' to '"'
|
||
:type text: AnyStr
|
||
:param br: True, replace newline with html `<br>`
|
||
:type br: bool
|
||
:return: Text with entities replaced
|
||
:rtype: AnyStr
|
||
"""
|
||
if not text:
|
||
return text
|
||
from tornado import escape
|
||
if br:
|
||
text = re.sub(r'\r?\n', '<br>', text)
|
||
|
||
return escape.xhtml_escape(normalise_chars(text))
|
||
|
||
|
||
def normalise_chars(text):
|
||
# noinspection GrazieInspection
|
||
"""
|
||
Normalise characters to maintain a consistent output from different sources, and to prevent issues when sorting
|
||
|
||
e.g.
|
||
curved apostrophe ’ with standard ' apostrophe
|
||
wide dash with standard hyphen
|
||
|
||
:param text: Text to convert entities from for example '"' to '"'
|
||
:type text: AnyStr
|
||
:return: Text with entities replaced
|
||
:rtype: AnyStr
|
||
"""
|
||
result = text.replace('\u2010', '-').replace('\u2011', '-').replace('\u2012', '-') \
|
||
.replace('\u2013', '-').replace('\u2014', '-').replace('\u2015', '-') \
|
||
.replace('\u2018', "'").replace('\u2019', "'") \
|
||
.replace('\u201c', '\"').replace('\u201d', '\"') \
|
||
.replace('\u0020', ' ').replace('\u00a0', ' ')
|
||
|
||
return result
|
||
|
||
|
||
def parse_imdb_id(string):
|
||
# type: (AnyStr) -> Optional[AnyStr]
|
||
""" Parse an IMDB ID from a string
|
||
|
||
:param string: string to parse
|
||
:return: parsed ID of the form char, char, number of digits or None if no match
|
||
"""
|
||
result = None
|
||
try:
|
||
result = RC_IMDB_ID.findall(string)[0]
|
||
except(BaseException, Exception):
|
||
pass
|
||
|
||
return result
|
||
|
||
|
||
def generate_word_str(words, regex=False, join_chr=','):
|
||
# type: (Set[AnyStr], bool, AnyStr) -> AnyStr
|
||
"""
|
||
combine a list or set to a string with optional prefix 'regex:'
|
||
|
||
:param words: list or set of words
|
||
:type words: set
|
||
:param regex: prefix regex: ?
|
||
:type regex: bool
|
||
:param join_chr: character(s) used for join words
|
||
:type join_chr: basestring
|
||
:return: combined string
|
||
:rtype: basestring
|
||
"""
|
||
return '%s%s' % (('', 'regex:')[True is regex], join_chr.join(words))
|
||
|
||
|
||
def split_word_str(word_list):
|
||
# type: (AnyStr) -> Tuple[Set[AnyStr], bool]
|
||
"""
|
||
split string into set and boolean regex
|
||
|
||
:param word_list: string with words
|
||
:type word_list: basestring
|
||
:return: set of words, is it regex
|
||
:rtype: (set, bool)
|
||
"""
|
||
try:
|
||
if word_list.startswith('regex:'):
|
||
rx = True
|
||
word_list = word_list.replace('regex:', '')
|
||
else:
|
||
rx = False
|
||
s = set(w.strip() for w in word_list.split(',') if w.strip())
|
||
except (BaseException, Exception):
|
||
rx = False
|
||
s = set()
|
||
return s, rx
|