# coding=utf-8
#
# This file is part of SickGear.
#
# SickGear is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# SickGear is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with SickGear.  If not, see <http://www.gnu.org/licenses/>.

from itertools import cycle
import datetime
import hashlib
import os
import re
import shutil
import socket
import time
import uuid
import sys

import sickgear
from . import db, logger, notifiers
from .common import cpu_presets, mediaExtensions, Overview, Quality, statusStrings, subtitleExtensions, \
    ARCHIVED, DOWNLOADED, FAILED, IGNORED, SKIPPED, SNATCHED_ANY, SUBTITLED, UNAIRED, UNKNOWN, WANTED
from .sgdatetime import SGDatetime
from lib.tvinfo_base.exceptions import *
from exceptions_helper import ex, MultipleShowObjectsException

import dateutil.parser
import requests
import requests.exceptions
import subliminal
from lxml_etree import etree, is_lxml
from base64 import decodebytes as b64decodebytes, encodebytes as b64encodebytes

from _23 import decode_bytes, decode_str, scandir
from six import iteritems, string_types, text_type
# noinspection PyUnresolvedReferences
from six.moves import zip

# the following are imported from elsewhere,
# therefore, they intentionally don't resolve and are unused in this particular file.
# noinspection PyUnresolvedReferences
from sg_helpers import chmod_as_parent, clean_data, copy_file, download_file, fix_set_group_id, get_system_temp_dir, \
    get_url, indent_xml, make_path, maybe_plural, md5_for_text, move_file, proxy_setting, remove_file, \
    remove_file_perm, replace_extension, sanitize_filename, scantree, touch_file, try_int, try_ord, write_file

# deprecated item, remove in 2020, kept here as rollback uses it
copyFile = copy_file
moveFile = move_file
tryInt = try_int  # one legacy custom provider is keeping this signature here

# noinspection PyUnreachableCode
if False:
    # noinspection PyUnresolvedReferences
    from typing import Any, AnyStr, Dict, Generator, NoReturn, Iterable, Iterator, List, Optional, Set, Tuple, Union
    from .tv import TVShow
    # the following workaround hack resolves a pyc resolution bug
    from .name_cache import retrieve_name_from_cache
    from six import integer_types

RE_XML_ENCODING = re.compile(r'^(<\?xml[^>]+)\s+(encoding\s*=\s*[\"\'][^\"\']*[\"\'])(\s*\?>|)', re.U)
RE_IMDB_ID = r'tt\d{7,10}'
RC_IMDB_ID = re.compile(r'(?i)(%s)' % RE_IMDB_ID)


def remove_extension(name):
    """
    Remove download or media extension from name (if any)

    :param name: filename
    :type name: AnyStr
    :return: name without extension
    :rtype: AnyStr
    """

    if name and "." in name:
        base_name, sep, extension = name.rpartition('.')
        if base_name and extension.lower() in ['nzb', 'torrent'] + mediaExtensions:
            name = base_name

    return name


def is_parsable_date(text):
    try:
        result = isinstance(dateutil.parser.parse(text), datetime.datetime)
    except (BaseException, Exception):
        result = False
    return result


def remove_non_release_groups(name, is_anime=False):
    """
    Remove non release groups from name

    :param name: release name
    :type name: AnyStr
    :param is_anime: is anmie
    :type is_anime: bool
    :return:
    :rtype: AnyStr
    """

    if name:
        rc = [re.compile(r'(?i)' + v) for v in [
              r'([\s\.\-_\[\{\(]*(no-rar|nzbgeek|ripsalot|siklopentan)[\s\.\-_\]\}\)]*)$',
              r'([\s\.\-_\[\{\(]rp[\s\.\-_\]\}\)]*)$',
              r'(?<=\w)([\s\.\-_]*[\[\{\(][\s\.\-_]*(www\.\w+.\w+)[\s\.\-_]*[\]\}\)][\s\.\-_]*)$',
              r'(?<=\w)([\s\.\-_]*[\[\{\(]\s*(rar(bg|tv)|((e[tz]|v)tv))[\s\.\-_]*[\]\}\)][\s\.\-_]*)$'] +
              ([r'(?<=\w)([\s\.\-_]*[\[\{\(][\s\.\-_]*[\w\s\.\-\_]+[\s\.\-_]*[\]\}\)][\s\.\-_]*)$',
                r'^([\s\.\-_]*[\[\{\(][\s\.\-_]*[\w\s\.\-\_]+[\s\.\-_]*[\]\}\)][\s\.\-_]*)(?=\w)'], [])[is_anime]]
        rename = name = remove_extension(name)
        while rename:
            for regex in rc:
                result = regex.findall(name)
                if result:
                    for cur_match in isinstance(result[0], tuple) and result[0] or result:
                        if not is_parsable_date(cur_match.strip(' ()')):
                            name = regex.sub('', name)
            rename = (name, False)[name == rename]

    return name


def is_sync_file(filename):
    """

    :param filename: filename
    :type filename: AnyStr
    :return:
    :rtype: bool
    """
    extension = filename.rpartition(".")[2].lower()
    return '!sync' == extension or 'lftp-pget-status' == extension


def has_media_ext(filename):
    """
    checks if file has media extension

    :param filename: filename
    :type filename: AnyStr
    :return:
    :rtype: bool
    """
    # ignore samples
    if re.search(r'(^|[\W_])(sample\d*)[\W_]', filename, re.I) \
            or filename.startswith('._'):  # and MAC OS's 'resource fork' files
        return False

    sep_file = filename.rpartition('.')
    return (None is re.search('extras?$', sep_file[0], re.I)) and (sep_file[2].lower() in mediaExtensions)


def has_image_ext(filename):
    """
    checks if file has image extension

    :param filename: filename
    :type filename: AnyStr
    :return:
    :rtype: bool
    """
    try:
        if os.path.splitext(filename)[1].lower() in ['.bmp', '.gif', '.jpeg', '.jpg', '.png', '.webp']:
            return True
    except (BaseException, Exception):
        pass
    return False


def is_first_rar_volume(filename):
    """
    checks if file is part of rar set

    :param filename: filename
    :type filename: AnyStr
    :return:
    :rtype: bool
    """
    return None is not re.search(r'(?P<file>^(?P<base>(?:(?!\.part\d+\.rar$).)*)\.(?:(?:part0*1\.)?rar)$)', filename)


def find_show_by_id(
        show_id,  # type: Union[AnyStr, Dict[int, int], int]
        show_list=None,  # type: Optional[List[TVShow]]
        no_mapped_ids=True,  # type: bool
        check_multishow=False,  # type: bool
        no_exceptions=False  # type: bool
):
    # type: (...) -> Optional[TVShow]
    """
    :param show_id: {indexer: id} or 'tvid_prodid'.
    :param show_list: (optional) TVShow objects list
    :param no_mapped_ids: don't check mapped ids
    :param check_multishow: check for multiple show matches
    :param no_exceptions: suppress the MultipleShowObjectsException and return None instead
    """
    results = []
    if None is show_list:
        show_list = sickgear.showList

    if show_id and show_list:
        tvid_prodid_obj = None if not isinstance(show_id, string_types) else sickgear.tv.TVidProdid(show_id)

        if tvid_prodid_obj and no_mapped_ids:
            if None is tvid_prodid_obj.prodid:
                return
            return sickgear.showDict.get(int(tvid_prodid_obj))
        else:
            if tvid_prodid_obj:
                if None is tvid_prodid_obj.prodid:
                    return None
                show_id = tvid_prodid_obj.dict

            if isinstance(show_id, dict):
                if no_mapped_ids:
                    sid_int_list = [sickgear.tv.TVShow.create_sid(k, v) for k, v in iteritems(show_id) if k and v and
                                    0 < v and sickgear.tv.tvid_bitmask >= k]
                    if not check_multishow:
                        return next((sickgear.showDict.get(_show_sid_id) for _show_sid_id in sid_int_list
                                     if sickgear.showDict.get(_show_sid_id)), None)
                    results = [sickgear.showDict.get(_show_sid_id) for _show_sid_id in sid_int_list
                               if sickgear.showDict.get(_show_sid_id)]
                else:
                    results = [_show_obj for k, v in iteritems(show_id) if k and v and 0 < v
                               for _show_obj in show_list if v == _show_obj.internal_ids.get(k, {'id': 0})['id']]

    num_shows = len(set(results))
    if 1 == num_shows:
        return results[0]
    if 1 < num_shows and not no_exceptions:
        raise MultipleShowObjectsException()


def make_dir(path):
    """
    create given path recursively

    :param path: path
    :type path: AnyStr
    :return: success of creation
    :rtype: bool
    """
    if not os.path.isdir(path):
        try:
            os.makedirs(path)
            # do a Synology library update
            notifiers.NotifierFactory().get('SYNOINDEX').addFolder(path)
        except OSError:
            return False
    return True


def search_infosrc_for_show_id(reg_show_name, tvid=None, prodid=None, ui=None):
    """
    search info source for show

    :param reg_show_name: show name to search
    :type reg_show_name: AnyStr
    :param tvid: tvid
    :type tvid: int or None
    :param prodid: prodid
    :type prodid: int or long or None
    :param ui: ui class
    :type ui: object
    :return: seriesname, tvid, prodid or None, None, None
    :rtype: Tuple[None, None, None] or Tuple[AnyStr, int, int or long]
    """
    show_names = [re.sub('[. -]', ' ', reg_show_name)]

    # Query Indexers for each search term and build the list of results
    for cur_tvid in (sickgear.TVInfoAPI().sources if not tvid else [int(tvid)]) or []:
        # Query Indexers for each search term and build the list of results
        tvinfo_config = sickgear.TVInfoAPI(cur_tvid).api_params.copy()
        if ui is not None:
            tvinfo_config['custom_ui'] = ui
        t = sickgear.TVInfoAPI(cur_tvid).setup(**tvinfo_config)

        for cur_name in show_names:
            logger.debug('Trying to find %s on %s' % (cur_name, sickgear.TVInfoAPI(cur_tvid).name))

            try:
                if prodid:
                    show_info_list = t.get_show(prodid)
                else:
                    show_info_list = t.search_show(cur_name)
                show_info_list = show_info_list if isinstance(show_info_list, list) else [show_info_list]
            except (BaseException, Exception):
                continue

            seriesname = _prodid = None
            for cur_show_info in show_info_list:  # type: dict
                try:
                    seriesname = cur_show_info['seriesname']
                    _prodid = cur_show_info['id']
                except (BaseException, Exception):
                    _prodid = seriesname = None
                    continue
                if seriesname and _prodid:
                    break

            if not (seriesname and _prodid):
                continue

            if None is prodid and str(cur_name).lower() == str(seriesname).lower():
                return seriesname, cur_tvid, int(_prodid)
            elif None is not prodid and int(prodid) == int(_prodid):
                return seriesname, cur_tvid, int(prodid)

        if tvid:
            break

    return None, None, None


def sizeof_fmt(number, digits=1, sep=' '):
    # type: (int, int, AnyStr) -> AnyStr
    """
    format given bytes to human-readable text

    :param number: value to convert
    :param digits: number of digits after decimal point
    :param sep: seperater of value and dimension
    :return: human-readable formatted text
    """
    for cur_dimension in ['bytes', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB']:
        if 1024.0 > number:
            return f'{number:3.{digits}f}{sep}{cur_dimension}'
        number /= 1024.0


def list_media_files(path):
    # type: (AnyStr) -> List[AnyStr]
    """
    list all media files in given path

    :param path: path
    :return: list of media files
    """
    result = []
    if path:
        if [direntry for direntry in scantree(path, include=[r'\.sickgearignore'], filter_kind=False, recurse=False)]:
            logger.debug('Skipping folder "%s" because it contains ".sickgearignore"' % path)
        else:
            result = [direntry.path for direntry in scantree(path, exclude_dirs=[
                '^Extras$',
                '^Behind The Scenes$', '^Deleted Scenes$', '^Featurettes$',
                '^Interviews$', '^Scenes$', '^Shorts$', '^Trailers$', '^Other$'
            ], filter_kind=False, exclude_folders_with_files=['.sickgearignore'])
                      if has_media_ext(direntry.name)]
    return result


def link(src_file, dest_file):
    """

    :param src_file: source file
    :type src_file: AnyStr
    :param dest_file: destination file
    :type dest_file: AnyStr
    """
    if 'nt' == os.name:
        import ctypes

        if 0 == ctypes.windll.kernel32.CreateHardLinkW(text_type(dest_file), text_type(src_file), 0):
            raise ctypes.WinError()
    else:
        os.link(src_file, dest_file)


def hardlink_file(src_file, dest_file):
    """

    :param src_file: source file
    :type src_file: AnyStr
    :param dest_file: destination file
    :type dest_file: AnyStr
    """
    try:
        link(src_file, dest_file)
        fix_set_group_id(dest_file)
    except (BaseException, Exception) as e:
        logger.error(f'Failed to create hardlink of {src_file} at {dest_file}: {ex(e)}. Copying instead.')
        copy_file(src_file, dest_file)


def symlink(src_file, dest_file):
    """

    :param src_file: source file
    :type src_file: AnyStr
    :param dest_file: destination
    :type dest_file: AnyStr
    """
    if 'nt' == os.name:
        import ctypes

        if ctypes.windll.kernel32.CreateSymbolicLinkW(
                text_type(dest_file), text_type(src_file), 1 if os.path.isdir(src_file) else 0) in [0, 1280]:
            raise ctypes.WinError()
    else:
        os.symlink(src_file, dest_file)


def move_and_symlink_file(src_file, dest_file):
    """

    :param src_file: source file
    :type src_file: AnyStr
    :param dest_file: destination file
    :type dest_file: AnyStr
    """
    try:
        shutil.move(src_file, dest_file)
        fix_set_group_id(dest_file)
        symlink(dest_file, src_file)
    except (BaseException, Exception):
        logger.error(f'Failed to create symlink of {src_file} at {dest_file}. Copying instead')
        copy_file(src_file, dest_file)


def rename_ep_file(cur_path, new_path, old_path_length=0, use_rename=False):
    """
    Creates all folders needed to move a file to its new location, renames it, then cleans up any folders
    left that are now empty.

    :param cur_path: The absolute path to the file you want to move/rename
    :type cur_path: AnyStr
    :param new_path: The absolute path to the destination for the file WITHOUT THE EXTENSION
    :type new_path: AnyStr
    :param old_path_length: The length of media file path (old name) WITHOUT THE EXTENSION
    :type old_path_length: int or long
    :param use_rename: use rename instead of shutil.move
    :return: success
    :rtype: bool
    """

    # new_dest_dir, new_dest_name = os.path.split(new_path)

    if 0 == old_path_length or len(cur_path) < old_path_length:
        # approach from the right
        cur_file_name, cur_file_ext = os.path.splitext(cur_path)
    else:
        # approach from the left
        cur_file_ext = cur_path[old_path_length:]
        cur_file_name = cur_path[:old_path_length]

    if cur_file_ext[1:] in subtitleExtensions:
        # Extract subtitle language from filename
        sublang = os.path.splitext(cur_file_name)[1][1:]

        # Check if the language extracted from filename is a valid language
        try:
            _ = subliminal.language.Language(sublang, strict=True)
            cur_file_ext = '.' + sublang + cur_file_ext
        except ValueError:
            pass

    # put the extension on the incoming file
    new_path += cur_file_ext

    make_path(os.path.dirname(new_path), syno=True)

    # move the file
    try:
        logger.log(f'Renaming file from {cur_path} to {new_path}')
        if use_rename:
            os.rename(cur_path, new_path)
        else:
            shutil.move(cur_path, new_path)
    except (OSError, IOError, IsADirectoryError, NotADirectoryError, FileExistsError) as e:
        logger.error(f'Failed renaming {cur_path} to {new_path}: {ex(e)}')
        return False

    # clean up any old folders that are empty
    delete_empty_folders(os.path.dirname(cur_path))

    return True


def delete_empty_folders(check_empty_dir, keep_dir=None):
    """
    Walks backwards up the path and deletes any empty folders found.

    :param check_empty_dir: The path to clean (absolute path to a folder)
    :type check_empty_dir: AnyStr
    :param keep_dir: Clean until this path is reached
    :type keep_dir: bool
    """

    # treat check_empty_dir as empty when it only contains these items
    ignore_items = []

    logger.log(f'Trying to clean any empty folders under {check_empty_dir}')

    # as long as the folder exists and doesn't contain any files, delete it
    while os.path.isdir(check_empty_dir) and check_empty_dir != keep_dir:
        check_files = os.listdir(check_empty_dir)

        if not check_files or (len(check_files) <= len(ignore_items) and all(
                [check_file in ignore_items for check_file in check_files])):
            # directory is empty or contains only ignore_items
            try:
                logger.log(f"Deleting empty folder: {check_empty_dir}")
                # need shutil.rmtree when ignore_items is really implemented
                os.rmdir(check_empty_dir)
                # do a Synology library update
                notifiers.NotifierFactory().get('SYNOINDEX').deleteFolder(check_empty_dir)
            except OSError as e:
                logger.warning(f'Unable to delete {check_empty_dir}: {repr(e)} / {ex(e)}')
                break
            check_empty_dir = os.path.dirname(check_empty_dir)
        else:
            break


def get_absolute_number_from_season_and_episode(show_obj, season, episode):
    """

    :param show_obj: show object
    :type show_obj: TVShow
    :param season: season number
    :type season: int
    :param episode: episode number
    :type episode: int
    :return: absolute number
    :type: int or long
    """
    absolute_number = None

    if season and episode:
        my_db = db.DBConnection()
        sql_result = my_db.select('SELECT *'
                                  ' FROM tv_episodes'
                                  ' WHERE indexer = ? AND showid = ? AND season = ? AND episode = ?',
                                  [show_obj.tvid, show_obj.prodid, season, episode])

        if 1 == len(sql_result):
            absolute_number = int(sql_result[0]["absolute_number"])
            logger.debug(f'Found absolute_number:{absolute_number} by {season}x{episode}')
        else:
            logger.debug('No entries for absolute number in show: %s found using %sx%s' %
                         (show_obj.unique_name, str(season), str(episode)))

    return absolute_number


def get_all_episodes_from_absolute_number(show_obj, absolute_numbers):
    # type: (TVShow, List[int]) -> Tuple[int, List[int]]
    """

    :param show_obj: show object
    :param absolute_numbers: absolute numbers
    """
    episode_numbers = []
    season_number = None

    if show_obj and len(absolute_numbers):
        for absolute_number in absolute_numbers:
            ep_obj = show_obj.get_episode(None, None, absolute_number=absolute_number)
            if ep_obj:
                episode_numbers.append(ep_obj.episode)
                season_number = ep_obj.season  # this takes the last found season so eps that cross the season
                # border are not handled well

    return season_number, episode_numbers


def sanitize_scene_name(name):
    """
    Takes a show name and returns the "scenified" version of it.

    :param name: name
    :type name: AnyStr
    :return: A string containing the scene version of the show name given.
    :rtype: AnyStr
    """
    if name:
        bad_chars = ',:()£\'!?\u2019'

        # strip out any bad chars
        name = re.sub(r'[%s]' % bad_chars, '', name, flags=re.U)

        # tidy up stuff that doesn't belong in scene names
        name = re.sub(r'(-?\s|/)', '.', name).replace('&', 'and')
        name = re.sub(r"\.+", '.', name).rstrip('.')

        return name
    return ''


def create_https_certificates(ssl_cert, ssl_key):
    """
    Create self-signed HTTPS certificares and store in paths 'ssl_cert' and 'ssl_key'
    """

    try:
        from lib.certgen import generate_key, generate_local_cert
    except (BaseException, Exception):
        return False

    private_key = generate_key(key_size=4096, output_file=ssl_key)
    cert = generate_local_cert(private_key, days_valid=3650, output_file=ssl_cert)
    return bool(cert)


if '__main__' == __name__:
    import doctest

    doctest.testmod()


def parse_xml(data, del_xmlns=False):
    # type: (AnyStr, bool) -> Optional[etree.ElementTree]
    """
    Parse data into an xml elementtree.ElementTree

    data: data string containing xml
    del_xmlns: if True, removes xmlns namesspace from data before parsing

    Returns: parsed data as elementtree or None
    """

    if del_xmlns:
        data = re.sub(' xmlns="[^"]+"', '', data)

    if isinstance(data, text_type) and is_lxml:
        data = RE_XML_ENCODING.sub(r'\1\3', data, count=1)

    try:
        parsed_xml = etree.fromstring(data)
    except (BaseException, Exception) as e:
        logger.debug(f"Error trying to parse xml data. Error: {ex(e)}")
        parsed_xml = None

    return parsed_xml


def backup_versioned_file(old_file, version):
    """

    :param old_file: old filename
    :type old_file: AnyStr
    :param version: version number
    :type version: int
    :return: success
    :rtype: bool
    """
    num_tries = 0

    new_file = '%s.v%s' % (old_file, version)

    if os.path.isfile(new_file):
        changed_old_db = False
        for back_nr in range(1, 10000):
            alt_name = '%s.r%s' % (new_file, back_nr)
            if not os.path.isfile(alt_name):
                try:
                    shutil.move(new_file, alt_name)
                    changed_old_db = True
                    break
                except (BaseException, Exception):
                    if os.path.isfile(new_file):
                        continue
                    logger.warning('could not rename old backup db file')
        if not changed_old_db:
            raise Exception('can\'t create a backup of db')

    while not os.path.isfile(new_file):
        if not os.path.isfile(old_file) or 0 == get_size(old_file):
            logger.debug('No need to create backup')
            break

        try:
            logger.debug(f'Trying to back up {old_file} to {new_file}')
            shutil.copy(old_file, new_file)
            logger.debug('Backup done')
            break
        except (BaseException, Exception) as e:
            logger.warning(f'Error while trying to back up {old_file} to {new_file} : {ex(e)}')
            num_tries += 1
            time.sleep(3)
            logger.debug('Trying again.')

        if 3 <= num_tries:
            logger.error(f'Unable to back up {old_file} to {new_file} please do it manually.')
            return False

    return True


def restore_versioned_file(backup_file, version):
    """

    :param backup_file: filename
    :type backup_file: AnyStr
    :param version: version number
    :type version: int
    :return: success
    :rtype: bool
    """
    num_tries = 0

    new_file, backup_version = os.path.splitext(backup_file)
    restore_file = new_file + '.' + 'v' + str(version)

    if not os.path.isfile(new_file):
        logger.debug(f'Not restoring, {new_file} doesn\'t exist')
        return False

    try:
        logger.debug(f'Trying to backup {new_file} to {new_file}.r{version} before restoring backup')
        shutil.move(new_file, new_file + '.' + 'r' + str(version))
    except (BaseException, Exception) as e:
        logger.warning(f'Error while trying to backup DB file {restore_file} before proceeding with restore: {ex(e)}')
        return False

    while not os.path.isfile(new_file):
        if not os.path.isfile(restore_file):
            logger.debug(f'Not restoring, {restore_file} doesn\'t exist')
            break

        try:
            logger.debug(f'Trying to restore {restore_file} to {new_file}')
            shutil.copy(restore_file, new_file)
            logger.debug('Restore done')
            break
        except (BaseException, Exception) as e:
            logger.warning(f'Error while trying to restore {restore_file}: {ex(e)}')
            num_tries += 1
            time.sleep(1)
            logger.debug('Trying again.')

        if 10 <= num_tries:
            logger.error(f'Unable to restore {restore_file} to {new_file} please do it manually.')
            return False

    return True


# try to convert to float, return default on failure
def try_float(s, s_default=0.0):
    try:
        return float(s)
    except (BaseException, Exception):
        return float(s_default)


# generates a md5 hash of a file
def md5_for_file(filename, block_size=2 ** 16):
    """

    :param filename: filename
    :type filename: AnyStr
    :param block_size: block size
    :type block_size: int or long
    :return:
    :rtype: AnyStr or None
    """
    try:
        with open(filename, 'rb') as f:
            md5 = hashlib.md5()
            while True:
                data = f.read(block_size)
                if not data:
                    break
                md5.update(data)
            f.close()
            return md5.hexdigest()
    except (BaseException, Exception):
        return None


def get_lan_ip():
    """
    Simple function to get LAN localhost_ip
    http://stackoverflow.com/questions/11735821/python-get-localhost-ip
    """

    if 'nt' != os.name:
        # noinspection PyUnresolvedReferences
        import fcntl
        import struct

        def get_interface_ip(if_name):
            s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            return socket.inet_ntoa(fcntl.ioctl(s.fileno(), 0x8915, struct.pack('256s', if_name[:15]))[20:24])

    ip = socket.gethostbyname(socket.gethostname())
    if ip.startswith("127.") and "nt" != os.name:
        interfaces = [
            "eth0",
            "eth1",
            "eth2",
            "wlan0",
            "wlan1",
            "wifi0",
            "ath0",
            "ath1",
            "ppp0",
        ]
        for ifname in interfaces:
            try:
                # noinspection PyUnboundLocalVariable
                ip = get_interface_ip(ifname)
                print(ifname, ip)
                break
            except IOError:
                pass
    return ip


def check_url(url):
    """
    Check if a URL exists without downloading the whole file.
    :param url: url
    :type url: AnyStr
    :return:
    :rtype: bool
    """
    try:
        return requests.head(url).ok
    except (BaseException, Exception):
        return False


def anon_url(*url):
    """

    :param url: url
    :type url:
    :return: a URL string consisting of the Anonymous redirect URL and an arbitrary number of values appended
    :rtype: AnyStr
    """
    return '' if None in url else '%s%s' % (sickgear.ANON_REDIRECT, ''.join([str(s) for s in url]))


def starify(text, verify=False):
    """
    If verify is true, return true if text is a star block created text else return false.

    :param text: text
    :type text: AnyStr
    :param verify:
    :type verify: bool
    :return: Return text input string with either its latter half or its centre area (if 12 chars or more)
             replaced with asterisks. Useful for securely presenting api keys to a ui.
    """
    return '' if not text\
        else ((('%s%s' % (text[:len(text) // 2], '*' * (len(text) // 2))),
               ('%s%s%s' % (text[:4], '*' * (len(text) - 8), text[-4:])))[12 <= len(text)],
              set('*') == set((text[len(text) // 2:], text[4:-4])[12 <= len(text)]))[verify]


"""
Encryption
==========
By Pedro Jose Pereira Vieito <pvieito@gmail.com> (@pvieito)

* If encryption_version==0 then return data without encryption
* The keys should be unique for each device

To add a new encryption_version:
  1) Code your new encryption_version
  2) Update the last encryption_version available in webserve.py
  3) Remember to maintain old encryption versions and key generators for retrocompatibility
"""

# Key Generators
unique_key1 = hex(uuid.getnode() ** 2)  # Used in encryption v1


# Encryption Functions
def encrypt(data, encryption_version=0, do_decrypt=False):
    # Version 1: Simple XOR encryption (this is not very secure, but works)
    if 1 == encryption_version:
        if do_decrypt:
            return ''.join([chr(try_ord(x) ^ try_ord(y)) for (x, y) in
                            zip(b64decodebytes(decode_bytes(data)), cycle(unique_key1))])

        return decode_str(b64encodebytes(decode_bytes(
            ''.join([chr(try_ord(x) ^ try_ord(y)) for (x, y) in zip(data, cycle(unique_key1))])))).strip()

    # Version 0: Plain text
    return data


def decrypt(data, encryption_version=0):
    return encrypt(data, encryption_version, do_decrypt=True)


def full_sanitize_scene_name(name):
    """
    sanitize scene name

    :param name: name
    :type name: AnyStr
    :return: sanitized name
    :rtype: AnyStr
    """
    return re.sub('[. -]', ' ', sanitize_scene_name(name)).lower().lstrip()


def get_show(name, try_scene_exceptions=False):
    # type: (AnyStr, bool) -> Optional[TVShow]
    """
    get show object for show with given name

    :param name: name of show
    :type name: AnyStr
    :param try_scene_exceptions: check scene exceptions
    :type try_scene_exceptions: bool
    :return: None or show object
    :type: TVShow or None
    """
    if not sickgear.showList or None is name:
        return

    show_obj = None

    try:
        tvid, prodid = sickgear.name_cache.retrieve_name_from_cache(name)
        if tvid and prodid:
            show_obj = find_show_by_id({tvid: prodid})

        if not show_obj and try_scene_exceptions:
            tvid, prodid, season = sickgear.scene_exceptions.get_scene_exception_by_name(name)
            if tvid and prodid:
                show_obj = find_show_by_id({tvid: prodid})
    except (BaseException, Exception) as e:
        logger.debug(f'Error when attempting to find show: {name} in SickGear: {ex(e)}')

    return show_obj


def is_hidden_folder(folder):
    """
    On Linux based systems hidden folders start with . (dot)

    :param folder: Full path of folder to check
    :type folder: AnyStr
    :return: Returns True if folder is hidden
    :rtype: bool
    """
    if os.path.isdir(folder):
        if os.path.basename(folder).startswith('.'):
            return True

    return False


def real_path(path):
    """
    The resulting path will have no symbolic link, '/./' or '/../' components.

    :param path: path
    :type path: AnyStr
    :return: the canonicalized absolute pathname
    :rtype: AnyStr
    """
    return os.path.normpath(os.path.normcase(os.path.realpath(os.path.expanduser(path))))


def validate_show(show_obj, season=None, episode=None):
    """

    :param show_obj: show object
    :type show_obj: TVShow
    :param season: optional season
    :type season: int or None
    :param episode: opitonal episode
    :type episode: int or None
    :return: TVInfoAPI source
    :rtype: object
    """
    show_lang = show_obj.lang

    try:
        tvinfo_config = sickgear.TVInfoAPI(show_obj.tvid).api_params.copy()
        tvinfo_config['dvdorder'] = 0 != show_obj.dvdorder

        if show_lang and not 'en' == show_lang:
            tvinfo_config['language'] = show_lang

        t = sickgear.TVInfoAPI(show_obj.tvid).setup(**tvinfo_config)
        if season is None and episode is None:
            return t

        return t.get_show(show_obj.prodid, language=show_obj.lang)[season][episode]
    except (BaseTVinfoEpisodenotfound, BaseTVinfoSeasonnotfound, TypeError):
        pass


def _maybe_request_url(e, def_url=''):
    return hasattr(e, 'request') and hasattr(e.request, 'url') and ' ' + e.request.url or def_url


def clear_cache(force=False):
    """
    clear sickgear cache folder

    :param force: force clearing
    :type force: bool
    """
    # clean out cache directory, remove everything > 12 hours old
    dirty = None
    del_time = SGDatetime.timestamp_near(td=datetime.timedelta(hours=12))
    direntry_args = dict(follow_symlinks=False)
    for direntry in scantree(sickgear.CACHE_DIR, exclude_dirs=['images|rss|zoneinfo'], follow_symlinks=True):
        if direntry.is_file(**direntry_args) and (force or del_time > direntry.stat(**direntry_args).st_mtime):
            dirty = dirty or False if remove_file_perm(direntry.path) else True
        elif direntry.is_dir(**direntry_args) and direntry.name not in ['cheetah', 'sessions', 'indexers']:
            dirty = dirty or False
            try:
                os.rmdir(direntry.path)
            except OSError:
                dirty = True

    logger.log(
        f'{(("Found items not removed", "Found items removed")[not dirty], "No items found to remove")[None is dirty]}'
        f' from cache folder {sickgear.CACHE_DIR}')


def human(size):
    """
    format a size in bytes into a 'human' file size, e.g. bytes, KB, MB, GB, TB, PB
    Note that bytes/KB will be reported in whole numbers but MB and above will have greater precision
    e.g. 1 byte, 43 bytes, 443 KB, 4.3 MB, 4.43 GB, etc

    :param size: numerical value to be converted
    :type size: int or long or float
    :return: human readable string
    :rtype: AnyStr
    """
    if 1 == size:
        # because I really hate unnecessary plurals
        return "1 byte"

    suffixes_table = [('bytes', 0), ('KB', 0), ('MB', 1), ('GB', 2), ('TB', 2), ('PB', 2)]

    num = float(size)
    for suffix, precision in suffixes_table:
        if 1024.0 > num:
            break
        num /= 1024.0

    # noinspection PyUnboundLocalVariable
    if 0 == precision:
        formatted_size = '%d' % num
    else:
        formatted_size = str(round(num, ndigits=precision))

    # noinspection PyUnboundLocalVariable
    return '%s %s' % (formatted_size, suffix)


def get_size(start_path='.'):
    """
    return combined size of data in given path

    :param start_path: start path
    :type start_path: AnyStr
    :return: size in bytes
    :rtype: int or long
    """
    if os.path.isfile(start_path):
        return os.path.getsize(start_path)
    try:
        return sum(map((lambda x: x.stat(follow_symlinks=False).st_size), scantree(start_path)))
    except OSError:
        return 0


def get_media_stats(start_path='.'):
    # type: (AnyStr) -> Tuple[int, int, int, int]
    """
    return recognised media stats for a folder as...

    number of media files, smallest size in bytes, largest size in bytes, average size in bytes

    :param start_path: path to scan
    """
    if os.path.isdir(start_path):
        sizes = sorted(map(lambda y: y.stat(follow_symlinks=False).st_size,
                           filter(lambda x: has_media_ext(x.name), scantree(start_path))))
        if sizes:
            return len(sizes), sizes[0], sizes[-1], int(sum(sizes) / len(sizes))

    elif os.path.isfile(start_path):
        size = os.path.getsize(start_path)
        return 1, size, size, size

    return 0, 0, 0, 0


def remove_article(text=''):
    """
    remove articles from text

    :param text: input text
    :type text: AnyStr
    :return: text without articles
    :rtype: AnyStr
    """
    return re.sub(r'(?i)^(?:A(?!\s+to)n?|The)\s(\w)', r'\1', text)


def re_valid_hostname(with_allowed=True):
    this_host = socket.gethostname()
    return re.compile(r'(?i)(%slocalhost|.*\.local%s%s)$' % (
        (with_allowed
         and '%s|' % (sickgear.ALLOWED_HOSTS
                      and '|'.join(re.escape(x.strip()) for x in sickgear.ALLOWED_HOSTS.split(','))
                      or '.*')
         or ''),
        bool(this_host) and ('|%s' % this_host) or '',
        sickgear.ALLOW_ANYIP and ('|%s' % valid_ipaddr_expr()) or ''))


def valid_ipaddr_expr():
    """
    Returns a regular expression that will validate an ip address
    :return: Regular expression
    :rtype: String
    """
    return r'(%s)' % '|'.join([re.sub(r'\s+(#.[^\r\n]+)?', '', x) for x in [
        # IPv4 address (accurate)
        #  Matches 0.0.0.0 through 255.255.255.255
        r'''
        (?:(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])\.){3}(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])
        ''',
        # IPv6 address (standard and mixed)
        #  8 hexadecimal words, or 6 hexadecimal words followed by 4 decimal bytes All with optional leading zeros
        r'''
        (?:(?<![:.\w])\[?                                            # Anchor address
        (?:[A-F0-9]{1,4}:){6}                                        #    6 words
        (?:[A-F0-9]{1,4}:[A-F0-9]{1,4}                               #    2 words
        |  (?:(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])\.){3}  #    or 4 bytes
           (?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])
        )(?![:.\w]))                                                 # Anchor address
        ''',
        # IPv6 address (compressed and compressed mixed)
        #  8 hexadecimal words, or 6 hexadecimal words followed by 4 decimal bytes
        #  All with optional leading zeros.  Consecutive zeros may be replaced with ::
        r'''
        (?:(?<![:.\w])\[?(?:                                       # Anchor address
         (?:  # Mixed
          (?:[A-F0-9]{1,4}:){6}                                    # Non-compressed
         |(?=(?:[A-F0-9]{0,4}:){2,6}                               # Compressed with 2 to 6 colons
             (?:[0-9]{1,3}\.){3}[0-9]{1,3}                         #    and 4 bytes
             (?![:.\w]))                                           #    and anchored
          (([0-9A-F]{1,4}:){1,5}|:)((:[0-9A-F]{1,4}){1,5}:|:)      #    and at most 1 double colon
         |::(?:[A-F0-9]{1,4}:){5}                                  # Compressed with 7 colons and 5 numbers
         )
         (?:(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])\.){3}  # 255.255.255.
         (?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])           # 255
        |     # Standard
         (?:[A-F0-9]{1,4}:){7}[A-F0-9]{1,4}                        # Standard
        |     # Compressed
         (?=(?:[A-F0-9]{0,4}:){0,7}[A-F0-9]{0,4}                   # Compressed with at most 7 colons
            (?![:.\w]))                                            #    and anchored
         (([0-9A-F]{1,4}:){1,7}|:)((:[0-9A-F]{1,4}){1,7}|:)        #    and at most 1 double colon
        |(?:[A-F0-9]{1,4}:){7}:|:(:[A-F0-9]{1,4}){7}               # Compressed with 8 colons
        )(?![:.\w]))                                               # Anchor address
        '''
    ]])


def build_dict(seq, key):
    """

    :param seq: Iterable sequence
    :type seq: Iterable
    :param key: key
    :type key: AnyStr
    :return: returns dict
    :rtype: Dict
    """
    return dict([(d[key], dict(d, index=index)) for (index, d) in enumerate(seq)])


def client_host(server_host):
    """Extracted from cherrypy libs
    Return the host on which a client can connect to the given listener."""
    if '0.0.0.0' == server_host:
        # 0.0.0.0 is INADDR_ANY, which should answer on localhost.
        return '127.0.0.1'
    if server_host in ('::', '::0', '::0.0.0.0'):
        # :: is IN6ADDR_ANY, which should answer on localhost.
        # ::0 and ::0.0.0.0 are non-canonical but common ways to write
        # IN6ADDR_ANY.
        return '::1'
    return server_host


def wait_for_free_port(host, port):
    """Extracted from cherrypy libs
    Wait for the specified port to become free (drop requests)."""
    if not host:
        raise ValueError("Host values of '' or None are not allowed.")
    for trial in range(50):
        try:
            # we are expecting a free port, so reduce the timeout
            check_port(host, port, timeout=0.1)
        except IOError:
            # Give the old server thread time to free the port.
            time.sleep(0.1)
        else:
            return

    raise IOError("Port %r is not free on %r" % (port, host))


def check_port(host, port, timeout=1.0):
    """Extracted from cherrypy libs
    Raise an error if the given port is not free on the given host."""
    if not host:
        raise ValueError("Host values of '' or None are not allowed.")
    host = client_host(host)
    port = int(port)

    import socket

    # AF_INET or AF_INET6 socket
    # Get the correct address family for our host (allows IPv6 addresses)
    try:
        info = socket.getaddrinfo(host, port, socket.AF_UNSPEC,
                                  socket.SOCK_STREAM)
    except socket.gaierror:
        if ':' in host:
            info = [(socket.AF_INET6, socket.SOCK_STREAM, 0, "", (host, port, 0, 0))]
        else:
            info = [(socket.AF_INET, socket.SOCK_STREAM, 0, "", (host, port))]

    for res in info:
        af, socktype, proto, canonname, sa = res
        s = None
        try:
            s = socket.socket(af, socktype, proto)
            # See http://groups.google.com/group/cherrypy-users/browse_frm/thread/bbfe5eb39c904fe0
            s.settimeout(timeout)
            s.connect((host, port))
            s.close()
            raise IOError("Port %s is in use on %s; perhaps the previous "
                          "httpserver did not shut down properly." %
                          (repr(port), repr(host)))
        except socket.error:
            if s:
                s.close()


def clear_unused_providers():
    providers = [x.cache.providerID for x in sickgear.providers.sorted_sources() if x.is_active()]

    if providers:
        my_db = db.DBConnection('cache.db')
        my_db.action('DELETE FROM provider_cache WHERE provider NOT IN (%s)' % ','.join(['?'] * len(providers)),
                     providers)


def make_search_segment_html_string(segment, max_eps=5):
    seg_str = ''
    if segment and not isinstance(segment, list):
        segment = [segment]
    if segment and len(segment) > max_eps:
        seasons = [x for x in set([x.season for x in segment])]
        seg_str = f'Season{maybe_plural(len(seasons))}: '
        divider = ''
        for x in seasons:
            eps = [str(s.episode) for s in segment if x == s.season]
            ep_c = len(eps)
            seg_str += '%s%s <span title="Episode%s: %s">(%s Ep%s)</span>' \
                       % (divider, x, maybe_plural(ep_c), ', '.join(eps), ep_c, maybe_plural(ep_c))
            divider = ', '
    elif segment:
        episode_numbers = ['S%sE%s' % (str(x.season).zfill(2), str(x.episode).zfill(2)) for x in segment]
        seg_str = f'Episode{maybe_plural(len(episode_numbers))}: {", ".join(episode_numbers)}'
    return seg_str


def has_anime():
    """
    :return: if there are any anime shows in show list
    :rtype: bool
    """
    # noinspection PyTypeChecker
    return False if not sickgear.showList else any(filter(lambda show: show.is_anime, sickgear.showList))


def cpu_sleep():
    if cpu_presets[sickgear.CPU_PRESET]:
        time.sleep(cpu_presets[sickgear.CPU_PRESET])


def cleanup_cache():
    """
    Delete old cached files
    """
    delete_not_changed_in(
        [os.path.join(sickgear.CACHE_DIR, 'images', 'browse', 'thumb', x)
         for x in ['anidb', 'imdb', 'trakt', 'tvdb']] +
        [os.path.join(sickgear.CACHE_DIR, 'images', x)
         for x in ['characters', 'person']] +
        [os.path.join(sickgear.CACHE_DIR, 'tvinfo_cache')])


def delete_not_changed_in(paths, days=30, minutes=0):
    """
    Delete files under paths not changed in n days and/or n minutes.
    If a file was modified later than days/and or minutes, then don't delete it.

    :param paths: Path(s) to scan for files to delete
    :type paths: String or List of strings
    :param days: Purge files not modified in this number of days (default: 30 days)
    :param minutes: Purge files not modified in this number of minutes (default: 0 minutes)
    :return: tuple; number of files that qualify for deletion, number of qualifying files that failed to be deleted
    """
    del_time = SGDatetime.timestamp_near(td=datetime.timedelta(days=days, minutes=minutes))
    errors = 0
    qualified = 0
    for cur_path in (paths, [paths])[not isinstance(paths, list)]:
        try:
            for direntry in scantree(cur_path, filter_kind=False):
                if del_time > direntry.stat(follow_symlinks=False).st_mtime:
                    if not remove_file_perm(direntry.path):
                        errors += 1
                    qualified += 1
        except (BaseException, Exception):
            pass
    return qualified, errors


def set_file_timestamp(filename, min_age=3, new_time=None):
    """

    :param filename: filename
    :type filename: AnyStr
    :param min_age: minimum age in days
    :type min_age: int
    :param new_time:
    :type new_time: None or int
    """
    min_time = SGDatetime.timestamp_near(td=datetime.timedelta(days=min_age))
    try:
        if os.path.isfile(filename) and os.path.getmtime(filename) < min_time:
            os.utime(filename, new_time)
    except (BaseException, Exception):
        pass


def should_delete_episode(status):
    """
    check if episode should be deleted from db

    :param status: episode status
    :type status: int
    :return: should be deleted
    :rtype: bool
    """
    s = Quality.split_composite_status(status)[0]
    if s not in SNATCHED_ANY + [DOWNLOADED, ARCHIVED, IGNORED]:
        return True
    logger.debug('not safe to delete episode from db because of status: %s' % statusStrings[s])
    return False


def is_link(filepath):
    """
    Check if given file/pathname is symbolic link

    :param filepath: file or path to check
    :return: True or False
    """
    if 'win32' == sys.platform:
        if not os.path.exists(filepath):
            return False

        import ctypes
        invalid_file_attributes = 0xFFFFFFFF
        file_attribute_reparse_point = 0x0400

        attr = ctypes.windll.kernel32.GetFileAttributesW(text_type(filepath))
        return invalid_file_attributes != attr and 0 != attr & file_attribute_reparse_point

    return os.path.islink(filepath)


def find_mount_point(path):
    # type: (AnyStr) -> AnyStr
    """
    returns the mount point for the given path

    :param path: to find the mount path
    :return: mount point for path or path if no mount
    """
    result = path
    if os.path.exists(path):
        result = os.path.realpath(os.path.abspath(path))
        try:
            while not os.path.ismount(result):
                new_path = os.path.dirname(result)
                if new_path == result:
                    # return input path if mount point not found
                    return path
                result = new_path
        except (BaseException, Exception):
            return path
    return result


def df():
    # type: (...) -> Tuple[List[Tuple[AnyStr, AnyStr]], bool]
    """
    Return disk free space at known parent locations

    :return: string path, string value that is formatted size
    """
    result = []
    min_output = True  # flag ui to output minimal (e.g. vol: size, vol: size)
    if sickgear.ROOT_DIRS and sickgear.DISPLAY_FREESPACE:
        targets = []
        for cur_target in filter(lambda _t: _t and _t not in targets,
                                 map(find_mount_point, sickgear.ROOT_DIRS.split('|')[1:])):
            targets += [cur_target]
            free = freespace(cur_target)
            if 'win32' == sys.platform and None is not free:
                cur_target = os.path.splitdrive(cur_target)[0]
            if any(['win32' == sys.platform and not re.match('(?i)[a-z]:(\\\\)?$', cur_target),
                    # Windows, if a simple drive letter isn't found, fallback to full path. On Linux, full path is used
                    # trigger ui to output long paths instead of minimal volume letters layout
                    sys.platform.startswith(('linux', 'darwin', 'sunos5')), 'bsd' in sys.platform]):
                min_output = False
            result += [(cur_target, 'unavailable' if None is free else sizeof_fmt(free, sep=''))]

    return result, min_output


def freespace(path=None):
    """
    Return free space available at path location

    :param path: Example paths (Windows) = '\\\\192.168.0.1\\sharename\\existing_path', 'd:\\existing_path'
                 Untested with mount points under linux
    :type path: AnyStr
    :return: Size in bytes
    :rtype: long or None
    """
    result = None

    if 'win32' == sys.platform:
        try:
            import ctypes
            if None is not ctypes:
                max_val = (2 ** 64) - 1
                storage = ctypes.c_ulonglong(max_val)
                ctypes.windll.kernel32.GetDiskFreeSpaceExW(ctypes.c_wchar_p(path), None, None, ctypes.pointer(storage))
                result = (storage.value, None)[max_val == storage.value]
        except (BaseException, Exception):
            pass
    elif sys.platform.startswith(('linux', 'darwin', 'sunos5')) or 'bsd' in sys.platform:
        try:
            storage = os.statvfs(path)  # perms errors can result
            result = storage.f_bavail * storage.f_frsize
        except OSError:
            pass

    return result


def path_mapper(search, replace, subject):
    """
    Substitute strings in a path

    :param search: Search text
    :type search: AnyStr
    :param replace: Replacement text
    :type replace: AnyStr
    :param subject: Path text to search
    :type subject: AnyStr
    :return: Subject with or without substitution, True if a change was made otherwise False
    :rtype: Tuple[AnyStr, bool]
    """
    delim = '/!~!/'
    search = re.sub(r'\\', delim, search)
    replace = re.sub(r'\\', delim, replace)
    path = re.sub(r'\\', delim, subject)
    result = re.sub('(?i)^%s' % search, replace, path)
    result = os.path.normpath(re.sub(delim, '/', result))

    return result, result != subject


def get_overview(ep_status, show_quality, upgrade_once, split_snatch=False):
    # type: (integer_types, integer_types, Union[integer_types, bool], bool) -> integer_types
    """

    :param ep_status: episode status
    :param show_quality: show quality
    :param upgrade_once: upgrade once
    :param split_snatch:
    :type split_snatch: bool
    :return: constant from classes Overview
    """
    status, quality = Quality.split_composite_status(ep_status)
    if ARCHIVED == status:
        return Overview.GOOD
    if WANTED == status:
        return Overview.WANTED
    if status in (SKIPPED, IGNORED):
        return Overview.SKIPPED
    if status in (UNAIRED, UNKNOWN):
        return Overview.UNAIRED
    if status in [SUBTITLED] + Quality.SNATCHED_ANY + Quality.DOWNLOADED + Quality.FAILED:

        if FAILED == status:
            return Overview.WANTED
        if not split_snatch and status in SNATCHED_ANY:
            return Overview.SNATCHED

        void, best_qualities = Quality.split_quality(show_quality)
        # if re-downloads aren't wanted then mark it "good" if there is anything
        if not len(best_qualities):
            return Overview.GOOD

        min_best, max_best = min(best_qualities), max(best_qualities)
        if quality >= max_best \
                or (upgrade_once and
                    (quality in best_qualities or (None is not min_best and quality > min_best))):
            return Overview.GOOD
        return (Overview.QUAL, Overview.SNATCHED_QUAL)[status in SNATCHED_ANY]


def generate_show_dir_name(root_dir, show_name):
    # type: (Optional[AnyStr], AnyStr) -> AnyStr
    """
    generate show dir name

    :param root_dir: root dir
    :param show_name: show name
    :return: show dir name
    """
    san_show_name = sanitize_filename(show_name)
    if sickgear.SHOW_DIRS_WITH_DOTS:
        san_show_name = san_show_name.replace(' ', '.')
    if None is root_dir:
        return san_show_name
    return os.path.join(root_dir, san_show_name)


def count_files_dirs(base_dir):
    """

    :param base_dir: path
    :type base_dir: AnyStr
    :return: tuple of count of files, dirs
    :rtype: Tuple[int, int]
    """
    f = d = 0
    try:
        with scandir(base_dir) as s_d:
            try:
                files = s_d
            except OSError as e:
                logger.warning('Unable to count files %s / %s' % (repr(e), ex(e)))
            else:
                for e in files:
                    if e.is_file():
                        f += 1
                    elif e.is_dir():
                        d += 1
    except OSError as e:
        logger.warning('Unable to count files %s / %s' % (repr(e), ex(e)))

    return f, d


def upgrade_new_naming():
    my_db = db.DBConnection()
    sql_result = my_db.select('SELECT indexer AS tv_id, indexer_id AS prod_id FROM tv_shows')
    show_list = {}
    for cur_result in sql_result:
        show_list[int(cur_result['prod_id'])] = int(cur_result['tv_id'])

    if sickgear.FANART_RATINGS:
        from sickgear.tv import TVidProdid
        ne = {}
        for k, v in iteritems(sickgear.FANART_RATINGS):
            nk = show_list.get(try_int(k))
            if nk:
                ne[TVidProdid({nk: int(k)})()] = sickgear.FANART_RATINGS[k]
        sickgear.FANART_RATINGS = ne
        sickgear.CFG.setdefault('GUI', {})['fanart_ratings'] = '%s' % ne
        sickgear.CFG.write()

    image_cache_dir = os.path.join(sickgear.CACHE_DIR, 'images')
    bp_match = re.compile(r'(\d+)\.((?:banner|poster|(?:\d+(?:\.\w*)?\.\w{5,8}\.)?fanart)\.jpg)', flags=re.I)

    def _set_progress(p_msg, c, s):
        ps = None
        if 0 == s:
            ps = 0
        elif 1 == s and 0 == c:
            ps = 100
        elif 1 > c % s:
            ps = c / s
        if None is not ps:
            sickgear.classes.loading_msg.set_msg_progress(p_msg, '{:6.2f}%'.format(ps))

    for d in ['', 'thumbnails']:
        bd = os.path.join(image_cache_dir, d)
        if os.path.isdir(bd):
            fc, dc = count_files_dirs(bd)
            step = fc / float(100)
            cf = 0
            p_text = 'Upgrading %s' % (d, 'banner/poster')[not d]
            _set_progress(p_text, 0, 0)
            with scandir(bd) as s_d:
                for entry in scandir(bd):
                    if entry.is_file():
                        cf += 1
                        _set_progress(p_text, cf, step)
                        b_s = bp_match.search(entry.name)
                        if b_s:
                            old_id = int(b_s.group(1))
                            tvid = show_list.get(old_id)
                            if tvid:
                                nb_dir = os.path.join(sickgear.CACHE_DIR, 'images', 'shows', '%s-%s' % (tvid, old_id), d)
                                if not os.path.isdir(nb_dir):
                                    try:
                                        os.makedirs(nb_dir)
                                    except (BaseException, Exception):
                                        pass
                                new_name = os.path.join(nb_dir, bp_match.sub(r'\2', entry.name))
                                try:
                                    move_file(entry.path, new_name)
                                except (BaseException, Exception) as e:
                                    logger.warning('Unable to rename %s to %s: %s / %s'
                                                   % (entry.path, new_name, repr(e), ex(e)))
                            else:
                                # clean up files without reference in db
                                try:
                                    os.remove(entry.path)
                                except (BaseException, Exception):
                                    pass
                    elif entry.is_dir():
                        if entry.name in ['shows', 'browse']:
                            continue
                        elif 'fanart' == entry.name:
                            _set_progress(p_text, 0, 1)
                            fc_fan, dc_fan = count_files_dirs(entry.path)
                            step_fan = dc_fan / float(100)
                            cf_fan = 0
                            p_text = 'Upgrading fanart'
                            _set_progress(p_text, 0, 0)
                            try:
                                with scandir(entry.path) as s_p:
                                    try:
                                        entries = s_p
                                    except OSError as e:
                                        logger.warning('Unable to stat dirs %s / %s' % (repr(e), ex(e)))
                                        continue
                                    for d_entry in entries:
                                        if d_entry.is_dir():
                                            cf_fan += 1
                                            _set_progress(p_text, cf_fan, step_fan)
                                            old_id = try_int(d_entry.name)
                                            if old_id:
                                                new_id = show_list.get(old_id)
                                                if new_id:
                                                    new_dir_name = os.path.join(sickgear.CACHE_DIR, 'images', 'shows',
                                                                                '%s-%s' % (new_id, old_id), 'fanart')
                                                    try:
                                                        move_file(d_entry.path, new_dir_name)
                                                    except (BaseException, Exception) as e:
                                                        logger.warning(f'Unable to rename {d_entry.path}'
                                                                       f' to {new_dir_name}: {repr(e)} / {ex(e)}')
                                                    if os.path.isdir(new_dir_name):
                                                        try:
                                                            with scandir(new_dir_name) as s_d_n:
                                                                try:
                                                                    f_n = filter(lambda fn: fn.is_file(), s_d_n)
                                                                except OSError as e:
                                                                    logger.warning(
                                                                        f'Unable to rename {repr(e)} / {ex(d)}')
                                                                else:
                                                                    rename_args = []
                                                                    # noinspection PyTypeChecker
                                                                    for f_entry in f_n:
                                                                        rename_args += [
                                                                            (f_entry.path,
                                                                             bp_match.sub(r'\2', f_entry.path))]

                                                                    for args in rename_args:
                                                                        try:
                                                                            move_file(*args)
                                                                        except (BaseException, Exception) as e:
                                                                            logger.warning(
                                                                                f'Unable to rename {args[0]}'
                                                                                f' to {args[1]}: {repr(e)} / {ex(e)}')
                                                        except OSError as e:
                                                            logger.warning(
                                                                'Unable to rename %s / %s' % (repr(e), ex(e)))
                                                else:
                                                    try:
                                                        shutil.rmtree(d_entry.path)
                                                    except (BaseException, Exception):
                                                        pass
                                            try:
                                                shutil.rmtree(d_entry.path)
                                            except (BaseException, Exception):
                                                pass
                            except OSError as e:
                                logger.warning('Unable to stat dirs %s / %s' % (repr(e), ex(e)))
                                continue
                        try:
                            os.rmdir(entry.path)
                        except (BaseException, Exception):
                            pass
            if 'thumbnails' == d:
                try:
                    os.rmdir(bd)
                except (BaseException, Exception):
                    pass
            _set_progress(p_text, 0, 1)


def xhtml_escape(text, br=True):
    """
    Escapes a string, so it is valid within HTML or XML using the function from Tornado.

    :param text: Text to convert entities from for example '"' to '&quot;'
    :type text: AnyStr
    :param br: True, replace newline with html `<br>`
    :type br: bool
    :return: Text with entities replaced
    :rtype: AnyStr
    """
    if not text:
        return text
    from tornado import escape
    if br:
        text = re.sub(r'\r?\n', '<br>', text)

    return escape.xhtml_escape(normalise_chars(text))


def normalise_chars(text):
    # noinspection GrazieInspection
    """
    Normalise characters to maintain a consistent output from different sources, and to prevent issues when sorting

    e.g.
    curved apostrophe ’ with standard ' apostrophe
    wide dash with standard hyphen

    :param text: Text to convert entities from for example '"' to '&quot;'
    :type text: AnyStr
    :return: Text with entities replaced
    :rtype: AnyStr
    """
    result = text.replace('\u2010', '-').replace('\u2011', '-').replace('\u2012', '-') \
        .replace('\u2013', '-').replace('\u2014', '-').replace('\u2015', '-') \
        .replace('\u2018', "'").replace('\u2019', "'") \
        .replace('\u201c', '\"').replace('\u201d', '\"') \
        .replace('\u0020', ' ').replace('\u00a0', ' ')

    return result


def parse_imdb_id(string):
    # type: (AnyStr) -> Optional[AnyStr]
    """ Parse an IMDB ID from a string

    :param string: string to parse
    :return: parsed ID of the form char, char, number of digits or None if no match
    """
    result = None
    try:
        result = RC_IMDB_ID.findall(string)[0]
    except(BaseException, Exception):
        pass

    return result


def generate_word_str(words, regex=False, join_chr=','):
    # type: (Set[AnyStr], bool, AnyStr) -> AnyStr
    """
    combine a list or set to a string with optional prefix 'regex:'

    :param words: list or set of words
    :type words: set
    :param regex: prefix regex: ?
    :type regex: bool
    :param join_chr: character(s) used for join words
    :type join_chr: basestring
    :return: combined string
    :rtype: basestring
    """
    return '%s%s' % (('', 'regex:')[True is regex], join_chr.join(words))


def split_word_str(word_list):
    # type: (AnyStr) -> Tuple[Set[AnyStr], bool]
    """
    split string into set and boolean regex

    :param word_list: string with words
    :type word_list: basestring
    :return: set of words, is it regex
    :rtype: (set, bool)
    """
    try:
        if word_list.startswith('regex:'):
            rx = True
            word_list = word_list.replace('regex:', '')
        else:
            rx = False
        s = set(w.strip() for w in word_list.split(',') if w.strip())
    except (BaseException, Exception):
        rx = False
        s = set()
    return s, rx