From ab8d9e64050c0bfcf9d2fbaa5999d58ef157b797 Mon Sep 17 00:00:00 2001 From: echel0n Date: Mon, 19 May 2014 18:04:23 -0700 Subject: [PATCH] Fixed issues with queues. Tweaked code to get more performance from it. --- sickbeard/db.py | 7 ++-- sickbeard/generic_queue.py | 57 +++++++++++++++++++++++++++------ sickbeard/name_parser/parser.py | 2 -- sickbeard/search_queue.py | 22 ++++++------- sickbeard/show_queue.py | 20 ++---------- sickbeard/tv.py | 4 ++- sickbeard/webserve.py | 8 ++--- 7 files changed, 70 insertions(+), 50 deletions(-) diff --git a/sickbeard/db.py b/sickbeard/db.py index cbe67063..980b2381 100644 --- a/sickbeard/db.py +++ b/sickbeard/db.py @@ -29,6 +29,7 @@ import sickbeard from sickbeard import encodingKludge as ek from sickbeard import logger from sickbeard.exceptions import ex +from sickbeard.common import cpu_presets db_lock = threading.Lock() @@ -99,7 +100,7 @@ class DBConnection: if "unable to open database file" in e.args[0] or "database is locked" in e.args[0]: logger.log(u"DB error: " + ex(e), logger.WARNING) attempt += 1 - time.sleep(1) + time.sleep(cpu_presets[sickbeard.CPU_PRESET]) else: logger.log(u"DB error: " + ex(e), logger.ERROR) raise @@ -140,7 +141,7 @@ class DBConnection: if "unable to open database file" in e.args[0] or "database is locked" in e.args[0]: logger.log(u"DB error: " + ex(e), logger.WARNING) attempt += 1 - time.sleep(1) + time.sleep(cpu_presets[sickbeard.CPU_PRESET]) else: logger.log(u"DB error: " + ex(e), logger.ERROR) raise @@ -178,7 +179,7 @@ class DBConnection: if "unable to open database file" in e.args[0] or "database is locked" in e.args[0]: logger.log(u"DB error: " + ex(e), logger.WARNING) attempt += 1 - time.sleep(1) + time.sleep(cpu_presets[sickbeard.CPU_PRESET]) else: logger.log(u"DB error: " + ex(e), logger.ERROR) raise diff --git a/sickbeard/generic_queue.py b/sickbeard/generic_queue.py index 9bf96dd5..5e2d4bc7 100644 --- a/sickbeard/generic_queue.py +++ b/sickbeard/generic_queue.py @@ -18,7 +18,6 @@ import datetime import threading -import Queue from sickbeard import logger @@ -27,25 +26,33 @@ class QueuePriorities: NORMAL = 20 HIGH = 30 -class GenericQueue: +class GenericQueue(object): + def __init__(self): + self.currentItem = None + self.queue = [] + self.thread = None + self.queue_name = "QUEUE" + self.min_priority = 0 - self.queue = Queue.Queue() + + self.currentItem = None def pause(self): logger.log(u"Pausing queue") self.min_priority = 999999999999 - + def unpause(self): logger.log(u"Unpausing queue") self.min_priority = 0 def add_item(self, item): item.added = datetime.datetime.now() - self.queue.put(item) + self.queue.append(item) + return item def run(self, force=False): @@ -58,31 +65,61 @@ class GenericQueue: self.currentItem.finish() self.currentItem = None - if not self.queue.empty(): - queueItem = self.queue.get() + # if there's something in the queue then run it in a thread and take it out of the queue + if len(self.queue) > 0: + + # sort by priority + def sorter(x,y): + """ + Sorts by priority descending then time ascending + """ + if x.priority == y.priority: + if y.added == x.added: + return 0 + elif y.added < x.added: + return 1 + elif y.added > x.added: + return -1 + else: + return y.priority-x.priority + + self.queue.sort(cmp=sorter) + + queueItem = self.queue[0] + if queueItem.priority < self.min_priority: return + # launch the queue item in a thread + # TODO: improve thread name threadName = self.queue_name + '-' + queueItem.get_thread_name() self.thread = threading.Thread(None, queueItem.execute, threadName) self.thread.start() self.currentItem = queueItem + # take it out of the queue + del self.queue[0] + class QueueItem: - def __init__(self, name, action_id=0): + def __init__(self, name, action_id = 0): self.name = name + self.inProgress = False + self.priority = QueuePriorities.NORMAL + self.thread_name = None + self.action_id = action_id + self.added = None def get_thread_name(self): if self.thread_name: return self.thread_name else: - return self.name.replace(" ", "-").upper() + return self.name.replace(" ","-").upper() def execute(self): """Implementing classes should call this""" @@ -92,4 +129,4 @@ class QueueItem: def finish(self): """Implementing Classes should call this""" - self.inProgress = False + self.inProgress = False \ No newline at end of file diff --git a/sickbeard/name_parser/parser.py b/sickbeard/name_parser/parser.py index 8a8fd7b9..47e42eae 100644 --- a/sickbeard/name_parser/parser.py +++ b/sickbeard/name_parser/parser.py @@ -99,8 +99,6 @@ class NameParser(object): for (cur_regex_name, cur_regex) in self.compiled_regexes: - time.sleep(cpu_presets[sickbeard.CPU_PRESET]) - match = cur_regex.match(name) if not match: diff --git a/sickbeard/search_queue.py b/sickbeard/search_queue.py index 398e0ec0..76270e40 100644 --- a/sickbeard/search_queue.py +++ b/sickbeard/search_queue.py @@ -43,11 +43,9 @@ class SearchQueue(generic_queue.GenericQueue): self.queue_name = "SEARCHQUEUE" def is_in_queue(self, show, segment): - queue = [x for x in self.queue.queue] + [self.currentItem] - for cur_item in queue: - if cur_item: - if cur_item.show == show and cur_item.segment == segment: - return True + for cur_item in self.queue: + if isinstance(cur_item, BacklogQueueItem) and cur_item.show == show and cur_item.segment == segment: + return True return False def pause_backlog(self): @@ -61,15 +59,13 @@ class SearchQueue(generic_queue.GenericQueue): return self.min_priority >= generic_queue.QueuePriorities.NORMAL def is_backlog_in_progress(self): - queue = [x for x in self.queue.queue] + [self.currentItem] - for cur_item in queue: + for cur_item in self.queue + [self.currentItem]: if isinstance(cur_item, BacklogQueueItem): return True return False def is_dailysearch_in_progress(self): - queue = [x for x in self.queue.queue] + [self.currentItem] - for cur_item in queue: + for cur_item in self.queue + [self.currentItem]: if isinstance(cur_item, DailySearchQueueItem): return True return False @@ -113,7 +109,7 @@ class DailySearchQueueItem(generic_queue.QueueItem): search.snatchEpisode(result) # give the CPU a break - time.sleep(2) + time.sleep(common.cpu_presets[sickbeard.CPU_PRESET]) generic_queue.QueueItem.finish(self) @@ -142,7 +138,7 @@ class ManualSearchQueueItem(generic_queue.QueueItem): self.success = search.snatchEpisode(searchResult[0]) # give the CPU a break - time.sleep(2) + time.sleep(common.cpu_presets[sickbeard.CPU_PRESET]) else: ui.notifications.message('No downloads were found', @@ -190,7 +186,7 @@ class BacklogQueueItem(generic_queue.QueueItem): search.snatchEpisode(result) # give the CPU a break - time.sleep(2) + time.sleep(common.cpu_presets[sickbeard.CPU_PRESET]) else: logger.log(u"No needed episodes found during backlog search for [" + self.show.name + "]") @@ -237,7 +233,7 @@ class FailedQueueItem(generic_queue.QueueItem): search.snatchEpisode(result) # give the CPU a break - time.sleep(2) + time.sleep(common.cpu_presets[sickbeard.CPU_PRESET]) else: logger.log(u"No valid episode found to retry for [" + epObj.prettyName() + "]") diff --git a/sickbeard/show_queue.py b/sickbeard/show_queue.py index 6be14335..8837255a 100644 --- a/sickbeard/show_queue.py +++ b/sickbeard/show_queue.py @@ -20,7 +20,6 @@ from __future__ import with_statement import traceback import threading -import Queue import sickbeard @@ -32,20 +31,13 @@ from sickbeard import generic_queue from sickbeard import name_cache from sickbeard.exceptions import ex -show_queue_lock = threading.Lock() - class ShowQueue(generic_queue.GenericQueue): def __init__(self): - generic_queue.GenericQueue.__init__(self) self.queue_name = "SHOWQUEUE" def _isInQueue(self, show, actions): - shows = [x.show for x in self.queue.queue if x.action_id in actions] if not self.queue.empty() else [] - if self.currentItem != None and self.currentItem.action_id in actions: - shows.append(self.currentItem) - - return show in shows + return show in [x.show for x in self.queue if x.action_id in actions] def _isBeingSomethinged(self, show, actions): return self.currentItem != None and show == self.currentItem.show and \ @@ -79,11 +71,7 @@ class ShowQueue(generic_queue.GenericQueue): return self._isBeingSomethinged(show, (ShowQueueActions.SUBTITLE,)) def _getLoadingShowList(self): - shows = [x for x in self.queue.queue if x != None and x.isLoading] if not self.queue.empty() else [] - if self.currentItem != None and self.currentItem.isLoading: - shows.append(self.currentItem) - return shows - + return [x for x in self.queue + [self.currentItem] if x != None and x.isLoading] loadingShowList = property(_getLoadingShowList) @@ -187,9 +175,7 @@ class ShowQueueItem(generic_queue.QueueItem): self.show = show def isInQueue(self): - queue = [x for x in sickbeard.showQueueScheduler.action.queue.queue] - queue.append(sickbeard.showQueueScheduler.action.currentItem) - return self in queue + return self in sickbeard.showQueueScheduler.action.queue + [sickbeard.showQueueScheduler.action.currentItem] #@UndefinedVariable def _getName(self): return str(self.show.indexerid) diff --git a/sickbeard/tv.py b/sickbeard/tv.py index d74872f7..a874a0a4 100644 --- a/sickbeard/tv.py +++ b/sickbeard/tv.py @@ -862,6 +862,7 @@ class TVShow(object): logger.log(str(self.indexerid) + u": Obtained info from IMDb ->" + str(self.imdb_info), logger.DEBUG) def nextEpisode(self): + logger.log(str(self.indexerid) + ": Finding the episode which airs next", logger.DEBUG) myDB = db.DBConnection() @@ -872,7 +873,8 @@ class TVShow(object): sqlResults = myDB.select(query, params) if sqlResults == None or len(sqlResults) == 0: - logger.log(str(self.indexerid) + u": No episode found... need to implement show status", logger.DEBUG) + logger.log(str(self.indexerid) + u": No episode found... need to implement a show status", + logger.DEBUG) return [] else: logger.log(str(self.indexerid) + u": Found episode " + str(sqlResults[0]["season"]) + "x" + str( diff --git a/sickbeard/webserve.py b/sickbeard/webserve.py index 366bf6ef..01690007 100644 --- a/sickbeard/webserve.py +++ b/sickbeard/webserve.py @@ -54,7 +54,7 @@ from sickbeard import failedProcessor from sickbeard import network_timezones from sickbeard.providers import newznab, rsstorrent -from sickbeard.common import Quality, Overview, statusStrings, qualityPresetStrings +from sickbeard.common import Quality, Overview, statusStrings, qualityPresetStrings, cpu_presets from sickbeard.common import SNATCHED, SKIPPED, UNAIRED, IGNORED, ARCHIVED, WANTED, FAILED from sickbeard.common import SD, HD720p, HD1080p from sickbeard.exceptions import ex @@ -3114,7 +3114,7 @@ class Home: ui.notifications.error("Unable to refresh this show.", ex(e)) - time.sleep(3) + time.sleep(cpu_presets[sickbeard.CPU_PRESET]) redirect("/home/displayShow?show=" + str(showObj.indexerid)) @@ -3137,7 +3137,7 @@ class Home: ex(e)) # just give it some time - time.sleep(3) + time.sleep(cpu_presets[sickbeard.CPU_PRESET]) redirect("/home/displayShow?show=" + str(showObj.indexerid)) @@ -3155,7 +3155,7 @@ class Home: # search and download subtitles sickbeard.showQueueScheduler.action.downloadSubtitles(showObj, bool(force)) # @UndefinedVariable - time.sleep(3) + time.sleep(cpu_presets[sickbeard.CPU_PRESET]) redirect("/home/displayShow?show=" + str(showObj.indexerid))