Fixed issues with queues.

Tweaked code to get more performance from it.
This commit is contained in:
echel0n 2014-05-19 18:04:23 -07:00
parent a4c790eedc
commit ab8d9e6405
7 changed files with 70 additions and 50 deletions

View file

@ -29,6 +29,7 @@ import sickbeard
from sickbeard import encodingKludge as ek from sickbeard import encodingKludge as ek
from sickbeard import logger from sickbeard import logger
from sickbeard.exceptions import ex from sickbeard.exceptions import ex
from sickbeard.common import cpu_presets
db_lock = threading.Lock() db_lock = threading.Lock()
@ -99,7 +100,7 @@ class DBConnection:
if "unable to open database file" in e.args[0] or "database is locked" in e.args[0]: if "unable to open database file" in e.args[0] or "database is locked" in e.args[0]:
logger.log(u"DB error: " + ex(e), logger.WARNING) logger.log(u"DB error: " + ex(e), logger.WARNING)
attempt += 1 attempt += 1
time.sleep(1) time.sleep(cpu_presets[sickbeard.CPU_PRESET])
else: else:
logger.log(u"DB error: " + ex(e), logger.ERROR) logger.log(u"DB error: " + ex(e), logger.ERROR)
raise raise
@ -140,7 +141,7 @@ class DBConnection:
if "unable to open database file" in e.args[0] or "database is locked" in e.args[0]: if "unable to open database file" in e.args[0] or "database is locked" in e.args[0]:
logger.log(u"DB error: " + ex(e), logger.WARNING) logger.log(u"DB error: " + ex(e), logger.WARNING)
attempt += 1 attempt += 1
time.sleep(1) time.sleep(cpu_presets[sickbeard.CPU_PRESET])
else: else:
logger.log(u"DB error: " + ex(e), logger.ERROR) logger.log(u"DB error: " + ex(e), logger.ERROR)
raise raise
@ -178,7 +179,7 @@ class DBConnection:
if "unable to open database file" in e.args[0] or "database is locked" in e.args[0]: if "unable to open database file" in e.args[0] or "database is locked" in e.args[0]:
logger.log(u"DB error: " + ex(e), logger.WARNING) logger.log(u"DB error: " + ex(e), logger.WARNING)
attempt += 1 attempt += 1
time.sleep(1) time.sleep(cpu_presets[sickbeard.CPU_PRESET])
else: else:
logger.log(u"DB error: " + ex(e), logger.ERROR) logger.log(u"DB error: " + ex(e), logger.ERROR)
raise raise

View file

@ -18,7 +18,6 @@
import datetime import datetime
import threading import threading
import Queue
from sickbeard import logger from sickbeard import logger
@ -27,13 +26,20 @@ class QueuePriorities:
NORMAL = 20 NORMAL = 20
HIGH = 30 HIGH = 30
class GenericQueue: class GenericQueue(object):
def __init__(self): def __init__(self):
self.currentItem = None self.currentItem = None
self.queue = []
self.thread = None self.thread = None
self.queue_name = "QUEUE" self.queue_name = "QUEUE"
self.min_priority = 0 self.min_priority = 0
self.queue = Queue.Queue()
self.currentItem = None
def pause(self): def pause(self):
logger.log(u"Pausing queue") logger.log(u"Pausing queue")
@ -45,7 +51,8 @@ class GenericQueue:
def add_item(self, item): def add_item(self, item):
item.added = datetime.datetime.now() item.added = datetime.datetime.now()
self.queue.put(item) self.queue.append(item)
return item return item
def run(self, force=False): def run(self, force=False):
@ -58,31 +65,61 @@ class GenericQueue:
self.currentItem.finish() self.currentItem.finish()
self.currentItem = None self.currentItem = None
if not self.queue.empty(): # if there's something in the queue then run it in a thread and take it out of the queue
queueItem = self.queue.get() if len(self.queue) > 0:
# sort by priority
def sorter(x,y):
"""
Sorts by priority descending then time ascending
"""
if x.priority == y.priority:
if y.added == x.added:
return 0
elif y.added < x.added:
return 1
elif y.added > x.added:
return -1
else:
return y.priority-x.priority
self.queue.sort(cmp=sorter)
queueItem = self.queue[0]
if queueItem.priority < self.min_priority: if queueItem.priority < self.min_priority:
return return
# launch the queue item in a thread
# TODO: improve thread name
threadName = self.queue_name + '-' + queueItem.get_thread_name() threadName = self.queue_name + '-' + queueItem.get_thread_name()
self.thread = threading.Thread(None, queueItem.execute, threadName) self.thread = threading.Thread(None, queueItem.execute, threadName)
self.thread.start() self.thread.start()
self.currentItem = queueItem self.currentItem = queueItem
# take it out of the queue
del self.queue[0]
class QueueItem: class QueueItem:
def __init__(self, name, action_id=0): def __init__(self, name, action_id = 0):
self.name = name self.name = name
self.inProgress = False self.inProgress = False
self.priority = QueuePriorities.NORMAL self.priority = QueuePriorities.NORMAL
self.thread_name = None self.thread_name = None
self.action_id = action_id self.action_id = action_id
self.added = None self.added = None
def get_thread_name(self): def get_thread_name(self):
if self.thread_name: if self.thread_name:
return self.thread_name return self.thread_name
else: else:
return self.name.replace(" ", "-").upper() return self.name.replace(" ","-").upper()
def execute(self): def execute(self):
"""Implementing classes should call this""" """Implementing classes should call this"""

View file

@ -99,8 +99,6 @@ class NameParser(object):
for (cur_regex_name, cur_regex) in self.compiled_regexes: for (cur_regex_name, cur_regex) in self.compiled_regexes:
time.sleep(cpu_presets[sickbeard.CPU_PRESET])
match = cur_regex.match(name) match = cur_regex.match(name)
if not match: if not match:

View file

@ -43,10 +43,8 @@ class SearchQueue(generic_queue.GenericQueue):
self.queue_name = "SEARCHQUEUE" self.queue_name = "SEARCHQUEUE"
def is_in_queue(self, show, segment): def is_in_queue(self, show, segment):
queue = [x for x in self.queue.queue] + [self.currentItem] for cur_item in self.queue:
for cur_item in queue: if isinstance(cur_item, BacklogQueueItem) and cur_item.show == show and cur_item.segment == segment:
if cur_item:
if cur_item.show == show and cur_item.segment == segment:
return True return True
return False return False
@ -61,15 +59,13 @@ class SearchQueue(generic_queue.GenericQueue):
return self.min_priority >= generic_queue.QueuePriorities.NORMAL return self.min_priority >= generic_queue.QueuePriorities.NORMAL
def is_backlog_in_progress(self): def is_backlog_in_progress(self):
queue = [x for x in self.queue.queue] + [self.currentItem] for cur_item in self.queue + [self.currentItem]:
for cur_item in queue:
if isinstance(cur_item, BacklogQueueItem): if isinstance(cur_item, BacklogQueueItem):
return True return True
return False return False
def is_dailysearch_in_progress(self): def is_dailysearch_in_progress(self):
queue = [x for x in self.queue.queue] + [self.currentItem] for cur_item in self.queue + [self.currentItem]:
for cur_item in queue:
if isinstance(cur_item, DailySearchQueueItem): if isinstance(cur_item, DailySearchQueueItem):
return True return True
return False return False
@ -113,7 +109,7 @@ class DailySearchQueueItem(generic_queue.QueueItem):
search.snatchEpisode(result) search.snatchEpisode(result)
# give the CPU a break # give the CPU a break
time.sleep(2) time.sleep(common.cpu_presets[sickbeard.CPU_PRESET])
generic_queue.QueueItem.finish(self) generic_queue.QueueItem.finish(self)
@ -142,7 +138,7 @@ class ManualSearchQueueItem(generic_queue.QueueItem):
self.success = search.snatchEpisode(searchResult[0]) self.success = search.snatchEpisode(searchResult[0])
# give the CPU a break # give the CPU a break
time.sleep(2) time.sleep(common.cpu_presets[sickbeard.CPU_PRESET])
else: else:
ui.notifications.message('No downloads were found', ui.notifications.message('No downloads were found',
@ -190,7 +186,7 @@ class BacklogQueueItem(generic_queue.QueueItem):
search.snatchEpisode(result) search.snatchEpisode(result)
# give the CPU a break # give the CPU a break
time.sleep(2) time.sleep(common.cpu_presets[sickbeard.CPU_PRESET])
else: else:
logger.log(u"No needed episodes found during backlog search for [" + self.show.name + "]") logger.log(u"No needed episodes found during backlog search for [" + self.show.name + "]")
@ -237,7 +233,7 @@ class FailedQueueItem(generic_queue.QueueItem):
search.snatchEpisode(result) search.snatchEpisode(result)
# give the CPU a break # give the CPU a break
time.sleep(2) time.sleep(common.cpu_presets[sickbeard.CPU_PRESET])
else: else:
logger.log(u"No valid episode found to retry for [" + epObj.prettyName() + "]") logger.log(u"No valid episode found to retry for [" + epObj.prettyName() + "]")

View file

@ -20,7 +20,6 @@ from __future__ import with_statement
import traceback import traceback
import threading import threading
import Queue
import sickbeard import sickbeard
@ -32,20 +31,13 @@ from sickbeard import generic_queue
from sickbeard import name_cache from sickbeard import name_cache
from sickbeard.exceptions import ex from sickbeard.exceptions import ex
show_queue_lock = threading.Lock()
class ShowQueue(generic_queue.GenericQueue): class ShowQueue(generic_queue.GenericQueue):
def __init__(self): def __init__(self):
generic_queue.GenericQueue.__init__(self) generic_queue.GenericQueue.__init__(self)
self.queue_name = "SHOWQUEUE" self.queue_name = "SHOWQUEUE"
def _isInQueue(self, show, actions): def _isInQueue(self, show, actions):
shows = [x.show for x in self.queue.queue if x.action_id in actions] if not self.queue.empty() else [] return show in [x.show for x in self.queue if x.action_id in actions]
if self.currentItem != None and self.currentItem.action_id in actions:
shows.append(self.currentItem)
return show in shows
def _isBeingSomethinged(self, show, actions): def _isBeingSomethinged(self, show, actions):
return self.currentItem != None and show == self.currentItem.show and \ return self.currentItem != None and show == self.currentItem.show and \
@ -79,11 +71,7 @@ class ShowQueue(generic_queue.GenericQueue):
return self._isBeingSomethinged(show, (ShowQueueActions.SUBTITLE,)) return self._isBeingSomethinged(show, (ShowQueueActions.SUBTITLE,))
def _getLoadingShowList(self): def _getLoadingShowList(self):
shows = [x for x in self.queue.queue if x != None and x.isLoading] if not self.queue.empty() else [] return [x for x in self.queue + [self.currentItem] if x != None and x.isLoading]
if self.currentItem != None and self.currentItem.isLoading:
shows.append(self.currentItem)
return shows
loadingShowList = property(_getLoadingShowList) loadingShowList = property(_getLoadingShowList)
@ -187,9 +175,7 @@ class ShowQueueItem(generic_queue.QueueItem):
self.show = show self.show = show
def isInQueue(self): def isInQueue(self):
queue = [x for x in sickbeard.showQueueScheduler.action.queue.queue] return self in sickbeard.showQueueScheduler.action.queue + [sickbeard.showQueueScheduler.action.currentItem] #@UndefinedVariable
queue.append(sickbeard.showQueueScheduler.action.currentItem)
return self in queue
def _getName(self): def _getName(self):
return str(self.show.indexerid) return str(self.show.indexerid)

View file

@ -862,6 +862,7 @@ class TVShow(object):
logger.log(str(self.indexerid) + u": Obtained info from IMDb ->" + str(self.imdb_info), logger.DEBUG) logger.log(str(self.indexerid) + u": Obtained info from IMDb ->" + str(self.imdb_info), logger.DEBUG)
def nextEpisode(self): def nextEpisode(self):
logger.log(str(self.indexerid) + ": Finding the episode which airs next", logger.DEBUG) logger.log(str(self.indexerid) + ": Finding the episode which airs next", logger.DEBUG)
myDB = db.DBConnection() myDB = db.DBConnection()
@ -872,7 +873,8 @@ class TVShow(object):
sqlResults = myDB.select(query, params) sqlResults = myDB.select(query, params)
if sqlResults == None or len(sqlResults) == 0: if sqlResults == None or len(sqlResults) == 0:
logger.log(str(self.indexerid) + u": No episode found... need to implement show status", logger.DEBUG) logger.log(str(self.indexerid) + u": No episode found... need to implement a show status",
logger.DEBUG)
return [] return []
else: else:
logger.log(str(self.indexerid) + u": Found episode " + str(sqlResults[0]["season"]) + "x" + str( logger.log(str(self.indexerid) + u": Found episode " + str(sqlResults[0]["season"]) + "x" + str(

View file

@ -54,7 +54,7 @@ from sickbeard import failedProcessor
from sickbeard import network_timezones from sickbeard import network_timezones
from sickbeard.providers import newznab, rsstorrent from sickbeard.providers import newznab, rsstorrent
from sickbeard.common import Quality, Overview, statusStrings, qualityPresetStrings from sickbeard.common import Quality, Overview, statusStrings, qualityPresetStrings, cpu_presets
from sickbeard.common import SNATCHED, SKIPPED, UNAIRED, IGNORED, ARCHIVED, WANTED, FAILED from sickbeard.common import SNATCHED, SKIPPED, UNAIRED, IGNORED, ARCHIVED, WANTED, FAILED
from sickbeard.common import SD, HD720p, HD1080p from sickbeard.common import SD, HD720p, HD1080p
from sickbeard.exceptions import ex from sickbeard.exceptions import ex
@ -3114,7 +3114,7 @@ class Home:
ui.notifications.error("Unable to refresh this show.", ui.notifications.error("Unable to refresh this show.",
ex(e)) ex(e))
time.sleep(3) time.sleep(cpu_presets[sickbeard.CPU_PRESET])
redirect("/home/displayShow?show=" + str(showObj.indexerid)) redirect("/home/displayShow?show=" + str(showObj.indexerid))
@ -3137,7 +3137,7 @@ class Home:
ex(e)) ex(e))
# just give it some time # just give it some time
time.sleep(3) time.sleep(cpu_presets[sickbeard.CPU_PRESET])
redirect("/home/displayShow?show=" + str(showObj.indexerid)) redirect("/home/displayShow?show=" + str(showObj.indexerid))
@ -3155,7 +3155,7 @@ class Home:
# search and download subtitles # search and download subtitles
sickbeard.showQueueScheduler.action.downloadSubtitles(showObj, bool(force)) # @UndefinedVariable sickbeard.showQueueScheduler.action.downloadSubtitles(showObj, bool(force)) # @UndefinedVariable
time.sleep(3) time.sleep(cpu_presets[sickbeard.CPU_PRESET])
redirect("/home/displayShow?show=" + str(showObj.indexerid)) redirect("/home/displayShow?show=" + str(showObj.indexerid))