""" parser.sql package (imdb package). This package provides the IMDbSqlAccessSystem class used to access IMDb's data through a SQL database. Every database supported by the SQLObject _AND_ SQLAlchemy Object Relational Managers is available. the imdb.IMDb function will return an instance of this class when called with the 'accessSystem' argument set to "sql", "database" or "db". Copyright 2005-2012 Davide Alberani This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA """ # FIXME: this whole module was written in a veeery short amount of time. # The code should be commented, rewritten and cleaned. :-) import re import logging from difflib import SequenceMatcher from codecs import lookup from imdb import IMDbBase from imdb.utils import normalizeName, normalizeTitle, build_title, \ build_name, analyze_name, analyze_title, \ canonicalTitle, canonicalName, re_titleRef, \ build_company_name, re_episodes, _unicodeArticles, \ analyze_company_name, re_year_index, re_nameRef from imdb.Person import Person from imdb.Movie import Movie from imdb.Company import Company from imdb._exceptions import IMDbDataAccessError, IMDbError # Logger for miscellaneous functions. _aux_logger = logging.getLogger('imdbpy.parser.sql.aux') # ============================= # Things that once upon a time were in imdb.parser.common.locsql. def titleVariations(title, fromPtdf=0): """Build title variations useful for searches; if fromPtdf is true, the input is assumed to be in the plain text data files format.""" if fromPtdf: title1 = u'' else: title1 = title title2 = title3 = u'' if fromPtdf or re_year_index.search(title): # If it appears to have a (year[/imdbIndex]) indication, # assume that a long imdb canonical name was provided. titldict = analyze_title(title, canonical=1) # title1: the canonical name. title1 = titldict['title'] if titldict['kind'] != 'episode': # title3: the long imdb canonical name. if fromPtdf: title3 = title else: title3 = build_title(titldict, canonical=1, ptdf=1) else: title1 = normalizeTitle(title1) title3 = build_title(titldict, canonical=1, ptdf=1) else: # Just a title. # title1: the canonical title. title1 = canonicalTitle(title) title3 = u'' # title2 is title1 without the article, or title1 unchanged. if title1: title2 = title1 t2s = title2.split(u', ') if t2s[-1].lower() in _unicodeArticles: title2 = u', '.join(t2s[:-1]) _aux_logger.debug('title variations: 1:[%s] 2:[%s] 3:[%s]', title1, title2, title3) return title1, title2, title3 re_nameIndex = re.compile(r'\(([IVXLCDM]+)\)') def nameVariations(name, fromPtdf=0): """Build name variations useful for searches; if fromPtdf is true, the input is assumed to be in the plain text data files format.""" name1 = name2 = name3 = u'' if fromPtdf or re_nameIndex.search(name): # We've a name with an (imdbIndex) namedict = analyze_name(name, canonical=1) # name1 is the name in the canonical format. name1 = namedict['name'] # name3 is the canonical name with the imdbIndex. if fromPtdf: if namedict.has_key('imdbIndex'): name3 = name else: name3 = build_name(namedict, canonical=1) else: # name1 is the name in the canonical format. name1 = canonicalName(name) name3 = u'' # name2 is the name in the normal format, if it differs from name1. name2 = normalizeName(name1) if name1 == name2: name2 = u'' _aux_logger.debug('name variations: 1:[%s] 2:[%s] 3:[%s]', name1, name2, name3) return name1, name2, name3 try: from cutils import ratcliff as _ratcliff def ratcliff(s1, s2, sm): """Return the Ratcliff-Obershelp value between the two strings, using the C implementation.""" return _ratcliff(s1.encode('latin_1', 'replace'), s2.encode('latin_1', 'replace')) except ImportError: _aux_logger.warn('Unable to import the cutils.ratcliff function.' ' Searching names and titles using the "sql"' ' data access system will be slower.') def ratcliff(s1, s2, sm): """Ratcliff-Obershelp similarity.""" STRING_MAXLENDIFFER = 0.7 s1len = len(s1) s2len = len(s2) if s1len < s2len: threshold = float(s1len) / s2len else: threshold = float(s2len) / s1len if threshold < STRING_MAXLENDIFFER: return 0.0 sm.set_seq2(s2.lower()) return sm.ratio() def merge_roles(mop): """Merge multiple roles.""" new_list = [] for m in mop: if m in new_list: keep_this = new_list[new_list.index(m)] if not isinstance(keep_this.currentRole, list): keep_this.currentRole = [keep_this.currentRole] keep_this.currentRole.append(m.currentRole) else: new_list.append(m) return new_list def scan_names(name_list, name1, name2, name3, results=0, ro_thresold=None, _scan_character=False): """Scan a list of names, searching for best matches against the given variations.""" if ro_thresold is not None: RO_THRESHOLD = ro_thresold else: RO_THRESHOLD = 0.6 sm1 = SequenceMatcher() sm2 = SequenceMatcher() sm3 = SequenceMatcher() sm1.set_seq1(name1.lower()) if name2: sm2.set_seq1(name2.lower()) if name3: sm3.set_seq1(name3.lower()) resd = {} for i, n_data in name_list: nil = n_data['name'] # XXX: on Symbian, here we get a str; not sure this is the # right place to fix it. if isinstance(nil, str): nil = unicode(nil, 'latin1', 'ignore') # Distance with the canonical name. ratios = [ratcliff(name1, nil, sm1) + 0.05] namesurname = u'' if not _scan_character: nils = nil.split(', ', 1) surname = nils[0] if len(nils) == 2: namesurname = '%s %s' % (nils[1], surname) else: nils = nil.split(' ', 1) surname = nils[-1] namesurname = nil if surname != nil: # Distance with the "Surname" in the database. ratios.append(ratcliff(name1, surname, sm1)) if not _scan_character: ratios.append(ratcliff(name1, namesurname, sm1)) if name2: ratios.append(ratcliff(name2, surname, sm2)) # Distance with the "Name Surname" in the database. if namesurname: ratios.append(ratcliff(name2, namesurname, sm2)) if name3: # Distance with the long imdb canonical name. ratios.append(ratcliff(name3, build_name(n_data, canonical=1), sm3) + 0.1) ratio = max(ratios) if ratio >= RO_THRESHOLD: if resd.has_key(i): if ratio > resd[i][0]: resd[i] = (ratio, (i, n_data)) else: resd[i] = (ratio, (i, n_data)) res = resd.values() res.sort() res.reverse() if results > 0: res[:] = res[:results] return res def scan_titles(titles_list, title1, title2, title3, results=0, searchingEpisode=0, onlyEpisodes=0, ro_thresold=None): """Scan a list of titles, searching for best matches against the given variations.""" if ro_thresold is not None: RO_THRESHOLD = ro_thresold else: RO_THRESHOLD = 0.6 sm1 = SequenceMatcher() sm2 = SequenceMatcher() sm3 = SequenceMatcher() sm1.set_seq1(title1.lower()) sm2.set_seq2(title2.lower()) if title3: sm3.set_seq1(title3.lower()) if title3[-1] == '}': searchingEpisode = 1 hasArt = 0 if title2 != title1: hasArt = 1 resd = {} for i, t_data in titles_list: if onlyEpisodes: if t_data.get('kind') != 'episode': continue til = t_data['title'] if til[-1] == ')': dateIdx = til.rfind('(') if dateIdx != -1: til = til[:dateIdx].rstrip() if not til: continue ratio = ratcliff(title1, til, sm1) if ratio >= RO_THRESHOLD: resd[i] = (ratio, (i, t_data)) continue if searchingEpisode: if t_data.get('kind') != 'episode': continue elif t_data.get('kind') == 'episode': continue til = t_data['title'] # XXX: on Symbian, here we get a str; not sure this is the # right place to fix it. if isinstance(til, str): til = unicode(til, 'latin1', 'ignore') # Distance with the canonical title (with or without article). # titleS -> titleR # titleS, the -> titleR, the if not searchingEpisode: til = canonicalTitle(til) ratios = [ratcliff(title1, til, sm1) + 0.05] # til2 is til without the article, if present. til2 = til tils = til2.split(', ') matchHasArt = 0 if tils[-1].lower() in _unicodeArticles: til2 = ', '.join(tils[:-1]) matchHasArt = 1 if hasArt and not matchHasArt: # titleS[, the] -> titleR ratios.append(ratcliff(title2, til, sm2)) elif matchHasArt and not hasArt: # titleS -> titleR[, the] ratios.append(ratcliff(title1, til2, sm1)) else: ratios = [0.0] if title3: # Distance with the long imdb canonical title. ratios.append(ratcliff(title3, build_title(t_data, canonical=1, ptdf=1), sm3) + 0.1) ratio = max(ratios) if ratio >= RO_THRESHOLD: if resd.has_key(i): if ratio > resd[i][0]: resd[i] = (ratio, (i, t_data)) else: resd[i] = (ratio, (i, t_data)) res = resd.values() res.sort() res.reverse() if results > 0: res[:] = res[:results] return res def scan_company_names(name_list, name1, results=0, ro_thresold=None): """Scan a list of company names, searching for best matches against the given name. Notice that this function takes a list of strings, and not a list of dictionaries.""" if ro_thresold is not None: RO_THRESHOLD = ro_thresold else: RO_THRESHOLD = 0.6 sm1 = SequenceMatcher() sm1.set_seq1(name1.lower()) resd = {} withoutCountry = not name1.endswith(']') for i, n in name_list: # XXX: on Symbian, here we get a str; not sure this is the # right place to fix it. if isinstance(n, str): n = unicode(n, 'latin1', 'ignore') o_name = n var = 0.0 if withoutCountry and n.endswith(']'): cidx = n.rfind('[') if cidx != -1: n = n[:cidx].rstrip() var = -0.05 # Distance with the company name. ratio = ratcliff(name1, n, sm1) + var if ratio >= RO_THRESHOLD: if resd.has_key(i): if ratio > resd[i][0]: resd[i] = (ratio, (i, analyze_company_name(o_name))) else: resd[i] = (ratio, (i, analyze_company_name(o_name))) res = resd.values() res.sort() res.reverse() if results > 0: res[:] = res[:results] return res try: from cutils import soundex except ImportError: _aux_logger.warn('Unable to import the cutils.soundex function.' ' Searches of movie titles and person names will be' ' a bit slower.') _translate = dict(B='1', C='2', D='3', F='1', G='2', J='2', K='2', L='4', M='5', N='5', P='1', Q='2', R='6', S='2', T='3', V='1', X='2', Z='2') _translateget = _translate.get _re_non_ascii = re.compile(r'^[^a-z]*', re.I) SOUNDEX_LEN = 5 def soundex(s): """Return the soundex code for the given string.""" # Maximum length of the soundex code. s = _re_non_ascii.sub('', s) if not s: return None s = s.upper() soundCode = s[0] for c in s[1:]: cw = _translateget(c, '0') if cw != '0' and soundCode[-1] != cw: soundCode += cw return soundCode[:SOUNDEX_LEN] or None def _sortKeywords(keyword, kwds): """Sort a list of keywords, based on the searched one.""" sm = SequenceMatcher() sm.set_seq1(keyword.lower()) ratios = [(ratcliff(keyword, k, sm), k) for k in kwds] checkContained = False if len(keyword) > 4: checkContained = True for idx, data in enumerate(ratios): ratio, key = data if key.startswith(keyword): ratios[idx] = (ratio+0.5, key) elif checkContained and keyword in key: ratios[idx] = (ratio+0.3, key) ratios.sort() ratios.reverse() return [r[1] for r in ratios] def filterSimilarKeywords(keyword, kwdsIterator): """Return a sorted list of keywords similar to the one given.""" seenDict = {} kwdSndx = soundex(keyword.encode('ascii', 'ignore')) matches = [] matchesappend = matches.append checkContained = False if len(keyword) > 4: checkContained = True for movieID, key in kwdsIterator: if key in seenDict: continue seenDict[key] = None if checkContained and keyword in key: matchesappend(key) continue if kwdSndx == soundex(key.encode('ascii', 'ignore')): matchesappend(key) return _sortKeywords(keyword, matches) # ============================= _litlist = ['screenplay/teleplay', 'novel', 'adaption', 'book', 'production process protocol', 'interviews', 'printed media reviews', 'essays', 'other literature'] _litd = dict([(x, ('literature', x)) for x in _litlist]) _buslist = ['budget', 'weekend gross', 'gross', 'opening weekend', 'rentals', 'admissions', 'filming dates', 'production dates', 'studios', 'copyright holder'] _busd = dict([(x, ('business', x)) for x in _buslist]) def _reGroupDict(d, newgr): """Regroup keys in the d dictionary in subdictionaries, based on the scheme in the newgr dictionary. E.g.: in the newgr, an entry 'LD label': ('laserdisc', 'label') tells the _reGroupDict() function to take the entry with label 'LD label' (as received from the sql database) and put it in the subsection (another dictionary) named 'laserdisc', using the key 'label'.""" r = {} newgrks = newgr.keys() for k, v in d.items(): if k in newgrks: r.setdefault(newgr[k][0], {})[newgr[k][1]] = v # A not-so-clearer version: ##r.setdefault(newgr[k][0], {}) ##r[newgr[k][0]][newgr[k][1]] = v else: r[k] = v return r def _groupListBy(l, index): """Regroup items in a list in a list of lists, grouped by the value at the given index.""" tmpd = {} for item in l: tmpd.setdefault(item[index], []).append(item) res = tmpd.values() return res def sub_dict(d, keys): """Return the subdictionary of 'd', with just the keys listed in 'keys'.""" return dict([(k, d[k]) for k in keys if k in d]) def get_movie_data(movieID, kindDict, fromAka=0, _table=None): """Return a dictionary containing data about the given movieID; if fromAka is true, the AkaTitle table is searched; _table is reserved for the imdbpy2sql.py script.""" if _table is not None: Table = _table else: if not fromAka: Table = Title else: Table = AkaTitle try: m = Table.get(movieID) except Exception, e: _aux_logger.warn('Unable to fetch information for movieID %s: %s', movieID, e) mdict = {} return mdict mdict = {'title': m.title, 'kind': kindDict[m.kindID], 'year': m.productionYear, 'imdbIndex': m.imdbIndex, 'season': m.seasonNr, 'episode': m.episodeNr} if not fromAka: if m.seriesYears is not None: mdict['series years'] = unicode(m.seriesYears) if mdict['imdbIndex'] is None: del mdict['imdbIndex'] if mdict['year'] is None: del mdict['year'] else: try: mdict['year'] = int(mdict['year']) except (TypeError, ValueError): del mdict['year'] if mdict['season'] is None: del mdict['season'] else: try: mdict['season'] = int(mdict['season']) except: pass if mdict['episode'] is None: del mdict['episode'] else: try: mdict['episode'] = int(mdict['episode']) except: pass episodeOfID = m.episodeOfID if episodeOfID is not None: ser_dict = get_movie_data(episodeOfID, kindDict, fromAka) mdict['episode of'] = Movie(data=ser_dict, movieID=episodeOfID, accessSystem='sql') if fromAka: ser_note = AkaTitle.get(episodeOfID).note if ser_note: mdict['episode of'].notes = ser_note return mdict def _iterKeywords(results): """Iterate over (key.id, key.keyword) columns of a selection of the Keyword table.""" for key in results: yield key.id, key.keyword def getSingleInfo(table, movieID, infoType, notAList=False): """Return a dictionary in the form {infoType: infoListOrString}, retrieving a single set of information about a given movie, from the specified table.""" infoTypeID = InfoType.select(InfoType.q.info == infoType) if infoTypeID.count() == 0: return {} res = table.select(AND(table.q.movieID == movieID, table.q.infoTypeID == infoTypeID[0].id)) retList = [] for r in res: info = r.info note = r.note if note: info += u'::%s' % note retList.append(info) if not retList: return {} if not notAList: return {infoType: retList} else: return {infoType: retList[0]} def _cmpTop(a, b, what='top 250 rank'): """Compare function used to sort top 250/bottom 10 rank.""" av = int(a[1].get(what)) bv = int(b[1].get(what)) if av == bv: return 0 return (-1, 1)[av > bv] def _cmpBottom(a, b): """Compare function used to sort top 250/bottom 10 rank.""" return _cmpTop(a, b, what='bottom 10 rank') class IMDbSqlAccessSystem(IMDbBase): """The class used to access IMDb's data through a SQL database.""" accessSystem = 'sql' _sql_logger = logging.getLogger('imdbpy.parser.sql') def __init__(self, uri, adultSearch=1, useORM=None, *arguments, **keywords): """Initialize the access system.""" IMDbBase.__init__(self, *arguments, **keywords) if useORM is None: useORM = ('sqlobject', 'sqlalchemy') if not isinstance(useORM, (tuple, list)): if ',' in useORM: useORM = useORM.split(',') else: useORM = [useORM] self.useORM = useORM nrMods = len(useORM) _gotError = False DB_TABLES = [] for idx, mod in enumerate(useORM): mod = mod.strip().lower() try: if mod == 'sqlalchemy': from alchemyadapter import getDBTables, NotFoundError, \ setConnection, AND, OR, IN, \ ISNULL, CONTAINSSTRING, toUTF8 elif mod == 'sqlobject': from objectadapter import getDBTables, NotFoundError, \ setConnection, AND, OR, IN, \ ISNULL, CONTAINSSTRING, toUTF8 else: self._sql_logger.warn('unknown module "%s"' % mod) continue self._sql_logger.info('using %s ORM', mod) # XXX: look ma'... black magic! It's used to make # TableClasses and some functions accessible # through the whole module. for k, v in [('NotFoundError', NotFoundError), ('AND', AND), ('OR', OR), ('IN', IN), ('ISNULL', ISNULL), ('CONTAINSSTRING', CONTAINSSTRING)]: globals()[k] = v self.toUTF8 = toUTF8 DB_TABLES = getDBTables(uri) for t in DB_TABLES: globals()[t._imdbpyName] = t if _gotError: self._sql_logger.warn('falling back to "%s"' % mod) break except ImportError, e: if idx+1 >= nrMods: raise IMDbError('unable to use any ORM in %s: %s' % ( str(useORM), str(e))) else: self._sql_logger.warn('unable to use "%s": %s' % (mod, str(e))) _gotError = True continue else: raise IMDbError('unable to use any ORM in %s' % str(useORM)) # Set the connection to the database. self._sql_logger.debug('connecting to %s', uri) try: self._connection = setConnection(uri, DB_TABLES) except AssertionError, e: raise IMDbDataAccessError( \ 'unable to connect to the database server; ' + \ 'complete message: "%s"' % str(e)) self.Error = self._connection.module.Error # Maps some IDs to the corresponding strings. self._kind = {} self._kindRev = {} self._sql_logger.debug('reading constants from the database') try: for kt in KindType.select(): self._kind[kt.id] = kt.kind self._kindRev[str(kt.kind)] = kt.id except self.Error: # NOTE: you can also get the error, but - at least with # MySQL - it also contains the password, and I don't # like the idea to print it out. raise IMDbDataAccessError( \ 'unable to connect to the database server') self._role = {} for rl in RoleType.select(): self._role[rl.id] = str(rl.role) self._info = {} self._infoRev = {} for inf in InfoType.select(): self._info[inf.id] = str(inf.info) self._infoRev[str(inf.info)] = inf.id self._compType = {} for cType in CompanyType.select(): self._compType[cType.id] = cType.kind info = [(it.id, it.info) for it in InfoType.select()] self._compcast = {} for cc in CompCastType.select(): self._compcast[cc.id] = str(cc.kind) self._link = {} for lt in LinkType.select(): self._link[lt.id] = str(lt.link) self._moviesubs = {} # Build self._moviesubs, a dictionary used to rearrange # the data structure for a movie object. for vid, vinfo in info: if not vinfo.startswith('LD '): continue self._moviesubs[vinfo] = ('laserdisc', vinfo[3:]) self._moviesubs.update(_litd) self._moviesubs.update(_busd) self.do_adult_search(adultSearch) def _findRefs(self, o, trefs, nrefs): """Find titles or names references in strings.""" if isinstance(o, (unicode, str)): for title in re_titleRef.findall(o): a_title = analyze_title(title, canonical=0) rtitle = build_title(a_title, ptdf=1) if trefs.has_key(rtitle): continue movieID = self._getTitleID(rtitle) if movieID is None: movieID = self._getTitleID(title) if movieID is None: continue m = Movie(title=rtitle, movieID=movieID, accessSystem=self.accessSystem) trefs[rtitle] = m rtitle2 = canonicalTitle(a_title.get('title', u'')) if rtitle2 and rtitle2 != rtitle and rtitle2 != title: trefs[rtitle2] = m if title != rtitle: trefs[title] = m for name in re_nameRef.findall(o): a_name = analyze_name(name, canonical=1) rname = build_name(a_name, canonical=1) if nrefs.has_key(rname): continue personID = self._getNameID(rname) if personID is None: personID = self._getNameID(name) if personID is None: continue p = Person(name=rname, personID=personID, accessSystem=self.accessSystem) nrefs[rname] = p rname2 = normalizeName(a_name.get('name', u'')) if rname2 and rname2 != rname: nrefs[rname2] = p if name != rname and name != rname2: nrefs[name] = p elif isinstance(o, (list, tuple)): for item in o: self._findRefs(item, trefs, nrefs) elif isinstance(o, dict): for value in o.values(): self._findRefs(value, trefs, nrefs) return trefs, nrefs def _extractRefs(self, o): """Scan for titles or names references in strings.""" trefs = {} nrefs = {} try: return self._findRefs(o, trefs, nrefs) except RuntimeError, e: # Symbian/python 2.2 has a poor regexp implementation. import warnings warnings.warn('RuntimeError in ' "imdb.parser.sql.IMDbSqlAccessSystem; " "if it's not a recursion limit exceeded and we're not " "running in a Symbian environment, it's a bug:\n%s" % e) return trefs, nrefs def _changeAKAencoding(self, akanotes, akatitle): """Return akatitle in the correct charset, as specified in the akanotes field; if akatitle doesn't need to be modified, return None.""" oti = akanotes.find('(original ') if oti == -1: return None ote = akanotes[oti+10:].find(' title)') if ote != -1: cs_info = akanotes[oti+10:oti+10+ote].lower().split() for e in cs_info: # excludes some strings that clearly are not encoding. if e in ('script', '', 'cyrillic', 'greek'): continue if e.startswith('iso-') and e.find('latin') != -1: e = e[4:].replace('-', '') try: lookup(e) lat1 = akatitle.encode('latin_1', 'replace') return unicode(lat1, e, 'replace') except (LookupError, ValueError, TypeError): continue return None def _buildNULLCondition(self, col, val): """Build a comparison for columns where values can be NULL.""" if val is None: return ISNULL(col) else: if isinstance(val, (int, long)): return col == val else: return col == self.toUTF8(val) def _getTitleID(self, title): """Given a long imdb canonical title, returns a movieID or None if not found.""" td = analyze_title(title) condition = None if td['kind'] == 'episode': epof = td['episode of'] seriesID = [s.id for s in Title.select( AND(Title.q.title == self.toUTF8(epof['title']), self._buildNULLCondition(Title.q.imdbIndex, epof.get('imdbIndex')), Title.q.kindID == self._kindRev[epof['kind']], self._buildNULLCondition(Title.q.productionYear, epof.get('year'))))] if seriesID: condition = AND(IN(Title.q.episodeOfID, seriesID), Title.q.title == self.toUTF8(td['title']), self._buildNULLCondition(Title.q.imdbIndex, td.get('imdbIndex')), Title.q.kindID == self._kindRev[td['kind']], self._buildNULLCondition(Title.q.productionYear, td.get('year'))) if condition is None: condition = AND(Title.q.title == self.toUTF8(td['title']), self._buildNULLCondition(Title.q.imdbIndex, td.get('imdbIndex')), Title.q.kindID == self._kindRev[td['kind']], self._buildNULLCondition(Title.q.productionYear, td.get('year'))) res = Title.select(condition) try: if res.count() != 1: return None except (UnicodeDecodeError, TypeError): return None return res[0].id def _getNameID(self, name): """Given a long imdb canonical name, returns a personID or None if not found.""" nd = analyze_name(name) res = Name.select(AND(Name.q.name == self.toUTF8(nd['name']), self._buildNULLCondition(Name.q.imdbIndex, nd.get('imdbIndex')))) try: c = res.count() if res.count() != 1: return None except (UnicodeDecodeError, TypeError): return None return res[0].id def _normalize_movieID(self, movieID): """Normalize the given movieID.""" try: return int(movieID) except (ValueError, OverflowError): raise IMDbError('movieID "%s" can\'t be converted to integer' % \ movieID) def _normalize_personID(self, personID): """Normalize the given personID.""" try: return int(personID) except (ValueError, OverflowError): raise IMDbError('personID "%s" can\'t be converted to integer' % \ personID) def _normalize_characterID(self, characterID): """Normalize the given characterID.""" try: return int(characterID) except (ValueError, OverflowError): raise IMDbError('characterID "%s" can\'t be converted to integer' \ % characterID) def _normalize_companyID(self, companyID): """Normalize the given companyID.""" try: return int(companyID) except (ValueError, OverflowError): raise IMDbError('companyID "%s" can\'t be converted to integer' \ % companyID) def get_imdbMovieID(self, movieID): """Translate a movieID in an imdbID. If not in the database, try an Exact Primary Title search on IMDb; return None if it's unable to get the imdbID. """ try: movie = Title.get(movieID) except NotFoundError: return None imdbID = movie.imdbID if imdbID is not None: return '%07d' % imdbID m_dict = get_movie_data(movie.id, self._kind) titline = build_title(m_dict, ptdf=0) imdbID = self.title2imdbID(titline, m_dict['kind']) # If the imdbID was retrieved from the web and was not in the # database, update the database (ignoring errors, because it's # possibile that the current user has not update privileges). # There're times when I think I'm a genius; this one of # those times... if imdbID is not None and not isinstance(imdbID, list): try: movie.imdbID = int(imdbID) except: pass return imdbID def get_imdbPersonID(self, personID): """Translate a personID in an imdbID. If not in the database, try an Exact Primary Name search on IMDb; return None if it's unable to get the imdbID. """ try: person = Name.get(personID) except NotFoundError: return None imdbID = person.imdbID if imdbID is not None: return '%07d' % imdbID n_dict = {'name': person.name, 'imdbIndex': person.imdbIndex} namline = build_name(n_dict, canonical=False) imdbID = self.name2imdbID(namline) if imdbID is not None and not isinstance(imdbID, list): try: person.imdbID = int(imdbID) except: pass return imdbID def get_imdbCharacterID(self, characterID): """Translate a characterID in an imdbID. If not in the database, try an Exact Primary Name search on IMDb; return None if it's unable to get the imdbID. """ try: character = CharName.get(characterID) except NotFoundError: return None imdbID = character.imdbID if imdbID is not None: return '%07d' % imdbID n_dict = {'name': character.name, 'imdbIndex': character.imdbIndex} namline = build_name(n_dict, canonical=False) imdbID = self.character2imdbID(namline) if imdbID is not None and not isinstance(imdbID, list): try: character.imdbID = int(imdbID) except: pass return imdbID def get_imdbCompanyID(self, companyID): """Translate a companyID in an imdbID. If not in the database, try an Exact Primary Name search on IMDb; return None if it's unable to get the imdbID. """ try: company = CompanyName.get(companyID) except NotFoundError: return None imdbID = company.imdbID if imdbID is not None: return '%07d' % imdbID n_dict = {'name': company.name, 'country': company.countryCode} namline = build_company_name(n_dict) imdbID = self.company2imdbID(namline) if imdbID is not None and not isinstance(imdbID, list): try: company.imdbID = int(imdbID) except: pass return imdbID def do_adult_search(self, doAdult): """If set to 0 or False, movies in the Adult category are not episodeOf = title_dict.get('episode of') shown in the results of a search.""" self.doAdult = doAdult def _search_movie(self, title, results, _episodes=False): title = title.strip() if not title: return [] title_dict = analyze_title(title, canonical=1) s_title = title_dict['title'] if not s_title: return [] episodeOf = title_dict.get('episode of') if episodeOf: _episodes = False s_title_split = s_title.split(', ') if len(s_title_split) > 1 and \ s_title_split[-1].lower() in _unicodeArticles: s_title_rebuilt = ', '.join(s_title_split[:-1]) if s_title_rebuilt: s_title = s_title_rebuilt #if not episodeOf: # if not _episodes: # s_title_split = s_title.split(', ') # if len(s_title_split) > 1 and \ # s_title_split[-1].lower() in _articles: # s_title_rebuilt = ', '.join(s_title_split[:-1]) # if s_title_rebuilt: # s_title = s_title_rebuilt #else: # _episodes = False if isinstance(s_title, unicode): s_title = s_title.encode('ascii', 'ignore') soundexCode = soundex(s_title) # XXX: improve the search restricting the kindID if the # "kind" of the input differs from "movie"? condition = conditionAka = None if _episodes: condition = AND(Title.q.phoneticCode == soundexCode, Title.q.kindID == self._kindRev['episode']) conditionAka = AND(AkaTitle.q.phoneticCode == soundexCode, AkaTitle.q.kindID == self._kindRev['episode']) elif title_dict['kind'] == 'episode' and episodeOf is not None: # set canonical=0 ? Should not make much difference. series_title = build_title(episodeOf, canonical=1) # XXX: is it safe to get "results" results? # Too many? Too few? serRes = results if serRes < 3 or serRes > 10: serRes = 10 searchSeries = self._search_movie(series_title, serRes) seriesIDs = [result[0] for result in searchSeries] if seriesIDs: condition = AND(Title.q.phoneticCode == soundexCode, IN(Title.q.episodeOfID, seriesIDs), Title.q.kindID == self._kindRev['episode']) conditionAka = AND(AkaTitle.q.phoneticCode == soundexCode, IN(AkaTitle.q.episodeOfID, seriesIDs), AkaTitle.q.kindID == self._kindRev['episode']) else: # XXX: bad situation: we have found no matching series; # try searching everything (both episodes and # non-episodes) for the title. condition = AND(Title.q.phoneticCode == soundexCode, IN(Title.q.episodeOfID, seriesIDs)) conditionAka = AND(AkaTitle.q.phoneticCode == soundexCode, IN(AkaTitle.q.episodeOfID, seriesIDs)) if condition is None: # XXX: excludes episodes? condition = AND(Title.q.kindID != self._kindRev['episode'], Title.q.phoneticCode == soundexCode) conditionAka = AND(AkaTitle.q.kindID != self._kindRev['episode'], AkaTitle.q.phoneticCode == soundexCode) # Up to 3 variations of the title are searched, plus the # long imdb canonical title, if provided. if not _episodes: title1, title2, title3 = titleVariations(title) else: title1 = title title2 = '' title3 = '' try: qr = [(q.id, get_movie_data(q.id, self._kind)) for q in Title.select(condition)] q2 = [(q.movieID, get_movie_data(q.id, self._kind, fromAka=1)) for q in AkaTitle.select(conditionAka)] qr += q2 except NotFoundError, e: raise IMDbDataAccessError( \ 'unable to search the database: "%s"' % str(e)) resultsST = results * 3 res = scan_titles(qr, title1, title2, title3, resultsST, searchingEpisode=episodeOf is not None, onlyEpisodes=_episodes, ro_thresold=0.0) res[:] = [x[1] for x in res] if res and not self.doAdult: mids = [x[0] for x in res] genreID = self._infoRev['genres'] adultlist = [al.movieID for al in MovieInfo.select( AND(MovieInfo.q.infoTypeID == genreID, MovieInfo.q.info == 'Adult', IN(MovieInfo.q.movieID, mids)))] res[:] = [x for x in res if x[0] not in adultlist] new_res = [] # XXX: can there be duplicates? for r in res: if r not in q2: new_res.append(r) continue mdict = r[1] aka_title = build_title(mdict, ptdf=1) orig_dict = get_movie_data(r[0], self._kind) orig_title = build_title(orig_dict, ptdf=1) if aka_title == orig_title: new_res.append(r) continue orig_dict['akas'] = [aka_title] new_res.append((r[0], orig_dict)) if results > 0: new_res[:] = new_res[:results] return new_res def _search_episode(self, title, results): return self._search_movie(title, results, _episodes=True) def get_movie_main(self, movieID): # Every movie information is retrieved from here. infosets = self.get_movie_infoset() try: res = get_movie_data(movieID, self._kind) except NotFoundError, e: raise IMDbDataAccessError( \ 'unable to get movieID "%s": "%s"' % (movieID, str(e))) if not res: raise IMDbDataAccessError('unable to get movieID "%s"' % movieID) # Collect cast information. castdata = [[cd.personID, cd.personRoleID, cd.note, cd.nrOrder, self._role[cd.roleID]] for cd in CastInfo.select(CastInfo.q.movieID == movieID)] for p in castdata: person = Name.get(p[0]) p += [person.name, person.imdbIndex] if p[4] in ('actor', 'actress'): p[4] = 'cast' # Regroup by role/duty (cast, writer, director, ...) castdata[:] = _groupListBy(castdata, 4) for group in castdata: duty = group[0][4] for pdata in group: curRole = pdata[1] curRoleID = None if curRole is not None: robj = CharName.get(curRole) curRole = robj.name curRoleID = robj.id p = Person(personID=pdata[0], name=pdata[5], currentRole=curRole or u'', roleID=curRoleID, notes=pdata[2] or u'', accessSystem='sql') if pdata[6]: p['imdbIndex'] = pdata[6] p.billingPos = pdata[3] res.setdefault(duty, []).append(p) if duty == 'cast': res[duty] = merge_roles(res[duty]) res[duty].sort() # Info about the movie. minfo = [(self._info[m.infoTypeID], m.info, m.note) for m in MovieInfo.select(MovieInfo.q.movieID == movieID)] minfo += [(self._info[m.infoTypeID], m.info, m.note) for m in MovieInfoIdx.select(MovieInfoIdx.q.movieID == movieID)] minfo += [('keywords', Keyword.get(m.keywordID).keyword, None) for m in MovieKeyword.select(MovieKeyword.q.movieID == movieID)] minfo = _groupListBy(minfo, 0) for group in minfo: sect = group[0][0] for mdata in group: data = mdata[1] if mdata[2]: data += '::%s' % mdata[2] res.setdefault(sect, []).append(data) # Companies info about a movie. cinfo = [(self._compType[m.companyTypeID], m.companyID, m.note) for m in MovieCompanies.select(MovieCompanies.q.movieID == movieID)] cinfo = _groupListBy(cinfo, 0) for group in cinfo: sect = group[0][0] for mdata in group: cDb = CompanyName.get(mdata[1]) cDbTxt = cDb.name if cDb.countryCode: cDbTxt += ' %s' % cDb.countryCode company = Company(name=cDbTxt, companyID=mdata[1], notes=mdata[2] or u'', accessSystem=self.accessSystem) res.setdefault(sect, []).append(company) # AKA titles. akat = [(get_movie_data(at.id, self._kind, fromAka=1), at.note) for at in AkaTitle.select(AkaTitle.q.movieID == movieID)] if akat: res['akas'] = [] for td, note in akat: nt = build_title(td, ptdf=1) if note: net = self._changeAKAencoding(note, nt) if net is not None: nt = net nt += '::%s' % note if nt not in res['akas']: res['akas'].append(nt) # Complete cast/crew. compcast = [(self._compcast[cc.subjectID], self._compcast[cc.statusID]) for cc in CompleteCast.select(CompleteCast.q.movieID == movieID)] if compcast: for entry in compcast: val = unicode(entry[1]) res[u'complete %s' % entry[0]] = val # Movie connections. mlinks = [[ml.linkedMovieID, self._link[ml.linkTypeID]] for ml in MovieLink.select(MovieLink.q.movieID == movieID)] if mlinks: for ml in mlinks: lmovieData = get_movie_data(ml[0], self._kind) if lmovieData: m = Movie(movieID=ml[0], data=lmovieData, accessSystem='sql') ml[0] = m res['connections'] = {} mlinks[:] = _groupListBy(mlinks, 1) for group in mlinks: lt = group[0][1] res['connections'][lt] = [i[0] for i in group] # Episodes. episodes = {} eps_list = list(Title.select(Title.q.episodeOfID == movieID)) eps_list.sort() if eps_list: ps_data = {'title': res['title'], 'kind': res['kind'], 'year': res.get('year'), 'imdbIndex': res.get('imdbIndex')} parentSeries = Movie(movieID=movieID, data=ps_data, accessSystem='sql') for episode in eps_list: episodeID = episode.id episode_data = get_movie_data(episodeID, self._kind) m = Movie(movieID=episodeID, data=episode_data, accessSystem='sql') m['episode of'] = parentSeries season = episode_data.get('season', 'UNKNOWN') if season not in episodes: episodes[season] = {} ep_number = episode_data.get('episode') if ep_number is None: ep_number = max((episodes[season].keys() or [0])) + 1 episodes[season][ep_number] = m res['episodes'] = episodes res['number of episodes'] = sum([len(x) for x in episodes.values()]) res['number of seasons'] = len(episodes.keys()) # Regroup laserdisc information. res = _reGroupDict(res, self._moviesubs) # Do some transformation to preserve consistency with other # data access systems. if 'quotes' in res: for idx, quote in enumerate(res['quotes']): res['quotes'][idx] = quote.split('::') if 'runtimes' in res and len(res['runtimes']) > 0: rt = res['runtimes'][0] episodes = re_episodes.findall(rt) if episodes: res['runtimes'][0] = re_episodes.sub('', rt) if res['runtimes'][0][-2:] == '::': res['runtimes'][0] = res['runtimes'][0][:-2] if 'votes' in res: res['votes'] = int(res['votes'][0]) if 'rating' in res: res['rating'] = float(res['rating'][0]) if 'votes distribution' in res: res['votes distribution'] = res['votes distribution'][0] if 'mpaa' in res: res['mpaa'] = res['mpaa'][0] if 'top 250 rank' in res: try: res['top 250 rank'] = int(res['top 250 rank']) except: pass if 'bottom 10 rank' in res: try: res['bottom 100 rank'] = int(res['bottom 10 rank']) except: pass del res['bottom 10 rank'] for old, new in [('guest', 'guests'), ('trademarks', 'trade-mark'), ('articles', 'article'), ('pictorials', 'pictorial'), ('magazine-covers', 'magazine-cover-photo')]: if old in res: res[new] = res[old] del res[old] trefs,nrefs = {}, {} trefs,nrefs = self._extractRefs(sub_dict(res,Movie.keys_tomodify_list)) return {'data': res, 'titlesRefs': trefs, 'namesRefs': nrefs, 'info sets': infosets} # Just to know what kind of information are available. get_movie_alternate_versions = get_movie_main get_movie_business = get_movie_main get_movie_connections = get_movie_main get_movie_crazy_credits = get_movie_main get_movie_goofs = get_movie_main get_movie_keywords = get_movie_main get_movie_literature = get_movie_main get_movie_locations = get_movie_main get_movie_plot = get_movie_main get_movie_quotes = get_movie_main get_movie_release_dates = get_movie_main get_movie_soundtrack = get_movie_main get_movie_taglines = get_movie_main get_movie_technical = get_movie_main get_movie_trivia = get_movie_main get_movie_vote_details = get_movie_main get_movie_episodes = get_movie_main def _search_person(self, name, results): name = name.strip() if not name: return [] s_name = analyze_name(name)['name'] if not s_name: return [] if isinstance(s_name, unicode): s_name = s_name.encode('ascii', 'ignore') soundexCode = soundex(s_name) name1, name2, name3 = nameVariations(name) # If the soundex is None, compare only with the first # phoneticCode column. if soundexCode is not None: condition = IN(soundexCode, [Name.q.namePcodeCf, Name.q.namePcodeNf, Name.q.surnamePcode]) conditionAka = IN(soundexCode, [AkaName.q.namePcodeCf, AkaName.q.namePcodeNf, AkaName.q.surnamePcode]) else: condition = ISNULL(Name.q.namePcodeCf) conditionAka = ISNULL(AkaName.q.namePcodeCf) try: qr = [(q.id, {'name': q.name, 'imdbIndex': q.imdbIndex}) for q in Name.select(condition)] q2 = [(q.personID, {'name': q.name, 'imdbIndex': q.imdbIndex}) for q in AkaName.select(conditionAka)] qr += q2 except NotFoundError, e: raise IMDbDataAccessError( \ 'unable to search the database: "%s"' % str(e)) res = scan_names(qr, name1, name2, name3, results) res[:] = [x[1] for x in res] # Purge empty imdbIndex. returnl = [] for x in res: tmpd = x[1] if tmpd['imdbIndex'] is None: del tmpd['imdbIndex'] returnl.append((x[0], tmpd)) new_res = [] # XXX: can there be duplicates? for r in returnl: if r not in q2: new_res.append(r) continue pdict = r[1] aka_name = build_name(pdict, canonical=1) p = Name.get(r[0]) orig_dict = {'name': p.name, 'imdbIndex': p.imdbIndex} if orig_dict['imdbIndex'] is None: del orig_dict['imdbIndex'] orig_name = build_name(orig_dict, canonical=1) if aka_name == orig_name: new_res.append(r) continue orig_dict['akas'] = [aka_name] new_res.append((r[0], orig_dict)) if results > 0: new_res[:] = new_res[:results] return new_res def get_person_main(self, personID): # Every person information is retrieved from here. infosets = self.get_person_infoset() try: p = Name.get(personID) except NotFoundError, e: raise IMDbDataAccessError( \ 'unable to get personID "%s": "%s"' % (personID, str(e))) res = {'name': p.name, 'imdbIndex': p.imdbIndex} if res['imdbIndex'] is None: del res['imdbIndex'] if not res: raise IMDbDataAccessError('unable to get personID "%s"' % personID) # Collect cast information. castdata = [(cd.movieID, cd.personRoleID, cd.note, self._role[cd.roleID], get_movie_data(cd.movieID, self._kind)) for cd in CastInfo.select(CastInfo.q.personID == personID)] # Regroup by role/duty (cast, writer, director, ...) castdata[:] = _groupListBy(castdata, 3) episodes = {} seenDuties = [] for group in castdata: for mdata in group: duty = orig_duty = group[0][3] if duty not in seenDuties: seenDuties.append(orig_duty) note = mdata[2] or u'' if 'episode of' in mdata[4]: duty = 'episodes' if orig_duty not in ('actor', 'actress'): if note: note = ' %s' % note note = '[%s]%s' % (orig_duty, note) curRole = mdata[1] curRoleID = None if curRole is not None: robj = CharName.get(curRole) curRole = robj.name curRoleID = robj.id m = Movie(movieID=mdata[0], data=mdata[4], currentRole=curRole or u'', roleID=curRoleID, notes=note, accessSystem='sql') if duty != 'episodes': res.setdefault(duty, []).append(m) else: episodes.setdefault(m['episode of'], []).append(m) if episodes: for k in episodes: episodes[k].sort() episodes[k].reverse() res['episodes'] = episodes for duty in seenDuties: if duty in res: if duty in ('actor', 'actress', 'himself', 'herself', 'themselves'): res[duty] = merge_roles(res[duty]) res[duty].sort() # Info about the person. pinfo = [(self._info[pi.infoTypeID], pi.info, pi.note) for pi in PersonInfo.select(PersonInfo.q.personID == personID)] # Regroup by duty. pinfo = _groupListBy(pinfo, 0) for group in pinfo: sect = group[0][0] for pdata in group: data = pdata[1] if pdata[2]: data += '::%s' % pdata[2] res.setdefault(sect, []).append(data) # AKA names. akan = [(an.name, an.imdbIndex) for an in AkaName.select(AkaName.q.personID == personID)] if akan: res['akas'] = [] for n in akan: nd = {'name': n[0]} if n[1]: nd['imdbIndex'] = n[1] nt = build_name(nd, canonical=1) res['akas'].append(nt) # Do some transformation to preserve consistency with other # data access systems. for key in ('birth date', 'birth notes', 'death date', 'death notes', 'birth name', 'height'): if key in res: res[key] = res[key][0] if 'guest' in res: res['notable tv guest appearances'] = res['guest'] del res['guest'] miscnames = res.get('nick names', []) if 'birth name' in res: miscnames.append(res['birth name']) if 'akas' in res: for mname in miscnames: if mname in res['akas']: res['akas'].remove(mname) if not res['akas']: del res['akas'] trefs,nrefs = self._extractRefs(sub_dict(res,Person.keys_tomodify_list)) return {'data': res, 'titlesRefs': trefs, 'namesRefs': nrefs, 'info sets': infosets} # Just to know what kind of information are available. get_person_filmography = get_person_main get_person_biography = get_person_main get_person_other_works = get_person_main get_person_episodes = get_person_main def _search_character(self, name, results): name = name.strip() if not name: return [] s_name = analyze_name(name)['name'] if not s_name: return [] if isinstance(s_name, unicode): s_name = s_name.encode('ascii', 'ignore') s_name = normalizeName(s_name) soundexCode = soundex(s_name) surname = s_name.split(' ')[-1] surnameSoundex = soundex(surname) name2 = '' soundexName2 = None nsplit = s_name.split() if len(nsplit) > 1: name2 = '%s %s' % (nsplit[-1], ' '.join(nsplit[:-1])) if s_name == name2: name2 = '' else: soundexName2 = soundex(name2) # If the soundex is None, compare only with the first # phoneticCode column. if soundexCode is not None: if soundexName2 is not None: condition = OR(surnameSoundex == CharName.q.surnamePcode, IN(CharName.q.namePcodeNf, [soundexCode, soundexName2]), IN(CharName.q.surnamePcode, [soundexCode, soundexName2])) else: condition = OR(surnameSoundex == CharName.q.surnamePcode, IN(soundexCode, [CharName.q.namePcodeNf, CharName.q.surnamePcode])) else: condition = ISNULL(Name.q.namePcodeNf) try: qr = [(q.id, {'name': q.name, 'imdbIndex': q.imdbIndex}) for q in CharName.select(condition)] except NotFoundError, e: raise IMDbDataAccessError( \ 'unable to search the database: "%s"' % str(e)) res = scan_names(qr, s_name, name2, '', results, _scan_character=True) res[:] = [x[1] for x in res] # Purge empty imdbIndex. returnl = [] for x in res: tmpd = x[1] if tmpd['imdbIndex'] is None: del tmpd['imdbIndex'] returnl.append((x[0], tmpd)) return returnl def get_character_main(self, characterID, results=1000): # Every character information is retrieved from here. infosets = self.get_character_infoset() try: c = CharName.get(characterID) except NotFoundError, e: raise IMDbDataAccessError( \ 'unable to get characterID "%s": "%s"' % (characterID, e)) res = {'name': c.name, 'imdbIndex': c.imdbIndex} if res['imdbIndex'] is None: del res['imdbIndex'] if not res: raise IMDbDataAccessError('unable to get characterID "%s"' % \ characterID) # Collect filmography information. items = CastInfo.select(CastInfo.q.personRoleID == characterID) if results > 0: items = items[:results] filmodata = [(cd.movieID, cd.personID, cd.note, get_movie_data(cd.movieID, self._kind)) for cd in items if self._role[cd.roleID] in ('actor', 'actress')] fdata = [] for f in filmodata: curRole = None curRoleID = f[1] note = f[2] or u'' if curRoleID is not None: robj = Name.get(curRoleID) curRole = robj.name m = Movie(movieID=f[0], data=f[3], currentRole=curRole or u'', roleID=curRoleID, roleIsPerson=True, notes=note, accessSystem='sql') fdata.append(m) fdata = merge_roles(fdata) fdata.sort() if fdata: res['filmography'] = fdata return {'data': res, 'info sets': infosets} get_character_filmography = get_character_main get_character_biography = get_character_main def _search_company(self, name, results): name = name.strip() if not name: return [] if isinstance(name, unicode): name = name.encode('ascii', 'ignore') soundexCode = soundex(name) # If the soundex is None, compare only with the first # phoneticCode column. if soundexCode is None: condition = ISNULL(CompanyName.q.namePcodeNf) else: if name.endswith(']'): condition = CompanyName.q.namePcodeSf == soundexCode else: condition = CompanyName.q.namePcodeNf == soundexCode try: qr = [(q.id, {'name': q.name, 'country': q.countryCode}) for q in CompanyName.select(condition)] except NotFoundError, e: raise IMDbDataAccessError( \ 'unable to search the database: "%s"' % str(e)) qr[:] = [(x[0], build_company_name(x[1])) for x in qr] res = scan_company_names(qr, name, results) res[:] = [x[1] for x in res] # Purge empty country keys. returnl = [] for x in res: tmpd = x[1] country = tmpd.get('country') if country is None and 'country' in tmpd: del tmpd['country'] returnl.append((x[0], tmpd)) return returnl def get_company_main(self, companyID, results=0): # Every company information is retrieved from here. infosets = self.get_company_infoset() try: c = CompanyName.get(companyID) except NotFoundError, e: raise IMDbDataAccessError( \ 'unable to get companyID "%s": "%s"' % (companyID, e)) res = {'name': c.name, 'country': c.countryCode} if res['country'] is None: del res['country'] if not res: raise IMDbDataAccessError('unable to get companyID "%s"' % \ companyID) # Collect filmography information. items = MovieCompanies.select(MovieCompanies.q.companyID == companyID) if results > 0: items = items[:results] filmodata = [(cd.movieID, cd.companyID, self._compType[cd.companyTypeID], cd.note, get_movie_data(cd.movieID, self._kind)) for cd in items] filmodata = _groupListBy(filmodata, 2) for group in filmodata: ctype = group[0][2] for movieID, companyID, ctype, note, movieData in group: movie = Movie(data=movieData, movieID=movieID, notes=note or u'', accessSystem=self.accessSystem) res.setdefault(ctype, []).append(movie) res.get(ctype, []).sort() return {'data': res, 'info sets': infosets} def _search_keyword(self, keyword, results): constr = OR(Keyword.q.phoneticCode == soundex(keyword.encode('ascii', 'ignore')), CONTAINSSTRING(Keyword.q.keyword, self.toUTF8(keyword))) return filterSimilarKeywords(keyword, _iterKeywords(Keyword.select(constr)))[:results] def _get_keyword(self, keyword, results): keyID = Keyword.select(Keyword.q.keyword == keyword) if keyID.count() == 0: return [] keyID = keyID[0].id movies = MovieKeyword.select(MovieKeyword.q.keywordID == keyID)[:results] return [(m.movieID, get_movie_data(m.movieID, self._kind)) for m in movies] def _get_top_bottom_movies(self, kind): if kind == 'top': kind = 'top 250 rank' elif kind == 'bottom': # Not a refuse: the plain text data files contains only # the bottom 10 movies. kind = 'bottom 10 rank' else: return [] infoID = InfoType.select(InfoType.q.info == kind) if infoID.count() == 0: return [] infoID = infoID[0].id movies = MovieInfoIdx.select(MovieInfoIdx.q.infoTypeID == infoID) ml = [] for m in movies: minfo = get_movie_data(m.movieID, self._kind) for k in kind, 'votes', 'rating', 'votes distribution': valueDict = getSingleInfo(MovieInfoIdx, m.movieID, k, notAList=True) if k in (kind, 'votes') and k in valueDict: valueDict[k] = int(valueDict[k]) elif k == 'rating' and k in valueDict: valueDict[k] = float(valueDict[k]) minfo.update(valueDict) ml.append((m.movieID, minfo)) sorter = (_cmpBottom, _cmpTop)[kind == 'top 250 rank'] ml.sort(sorter) return ml def __del__(self): """Ensure that the connection is closed.""" if not hasattr(self, '_connection'): return self._sql_logger.debug('closing connection to the database') self._connection.close()