Source code for mchartanalyzer.databasehandler

import os
import sqlite3
import traceback
from datetime import datetime

from . import constants
from .objects.artistdata import ArtistData
from .objects.songdata import SongData
from .objects.chartdata import ChartData
from .objects.artistcalculations import ArtistCalculations
from .objects.chartcalculations import ChartCalculations

[docs]class DatabaseHandler: """ Class responsible for handling database operations. """ def __init__(self, testMode=None): self.dbConnection = None self.dbOpened = False def _connect(self): if not self.dbOpened: # This also creates a database if it doesn't exist! self.dbConnection = sqlite3.connect(constants.DATABASE_FILE_PATH) self.dbOpened = True return self.dbConnection.cursor() def _commit(self): self.dbConnection.commit() def _close(self): if self.dbOpened: self.dbConnection.close() self.dbOpened = False def _commitAndClose(self): if self.dbOpened: self.dbConnection.commit() self.dbConnection.close() self.dbOpened = False
[docs] def _executeOperation(self, statement, keepConnectionOpen=None): """ Executes a database operation, and doesn't return a value. Intended for insert/update/delete operations. :param query: """ try: c = self._connect() # print(" " + repr(statement)) c.execute(statement) except sqlite3.IntegrityError as exc: print(repr(exc)) print(traceback.format_exc()) except Exception as exc: print("UNEXPECTED ERROR: " + repr(exc)) print(traceback.format_exc()) finally: if keepConnectionOpen: self._commit() else: self._commitAndClose()
[docs] def _executeQuery(self, query, keepConnectionOpen=None): """ Executes a query, and returns any retrieved rows. For queries that return no rows, this function will return an empty list. Intended for select statements. :param query: :return: rows """ try: c = self._connect() # print(" " + repr(query)) if type(query) is tuple: c.execute(query[0], query[1]) else: c.execute(query) rows = c.fetchall() if rows is None: return [] else: return rows except sqlite3.IntegrityError as exc: print(repr(exc)) print(traceback.format_exc()) except Exception as exc: print("UNEXPECTED ERROR: " + repr(exc)) print(traceback.format_exc()) finally: if keepConnectionOpen: self._commit() else: self._commitAndClose()
[docs] def saveArtistData(self, artistData): """ Saves artist data to the database. """ insertStmt = "INSERT INTO ARTISTS('name','source_names','source_urls','update_time') VALUES (\'{artistName}\', \'{sourceNames}\', \'{sourceUrls}\', \'{updateTime}\')" updateStmt = "UPDATE ARTISTS SET 'source_names' = \'{sourceNames}\', 'source_urls' = \'{sourceUrls}\', 'update_time' = \'{updateTime}\' WHERE name=\'{artistName}\'" try: c = self._connect() existingArtist = self.getArtistByName(artistData.name, keepConnectionOpen=True) timestampStr = datetime.now().strftime(constants.DATETIME_FORMAT) if existingArtist: # Artist with this name already exists. Update it. finalQuery = updateStmt.format(artistName=artistData.name, sourceNames=artistData.getSourceNamesAsString(), sourceUrls=artistData.getSourceUrlsAsString(), updateTime=timestampStr) # print("Running query: " + finalQuery) c.execute(finalQuery) else: # Artist does not exist yet. Insert new record. finalQuery = insertStmt.format(artistName=artistData.name, sourceNames=artistData.getSourceNamesAsString(), sourceUrls=artistData.getSourceUrlsAsString(), updateTime=timestampStr) # print("Running query: " + finalQuery) c.execute(finalQuery) except sqlite3.IntegrityError as exc: print(repr(exc)) except Exception as exc: print("UNEXPECTED ERROR: " + repr(exc)) print(traceback.format_exc()) finally: self._commitAndClose()
[docs] def saveSongData(self, artistData, songData): """ Saves song data to the database. If a song with the same name exists in the database, nothing happens. """ insertStmt = "INSERT INTO SONGS(artist_id, title, definitive_chart_id, update_time) VALUES ({artistId}, \'{songTitle}\', {definitiveChartId}, \'{updateTime}\')" existingArtist = self.getArtistByName(artistData.name, keepConnectionOpen=True) existingSong = self.getSongByTitleAndArtistName(songData.title, artistData.name, keepConnectionOpen=True) timestampStr = datetime.now().strftime(constants.DATETIME_FORMAT) defChartId = songData.definitiveChartId if songData.definitiveChartId > 0 else 'NULL' if not existingSong: # Song does not exist yet. Insert new record. finalQuery = insertStmt.format(artistId=existingArtist.id, songTitle=songData.title, definitiveChartId=defChartId, updateTime=timestampStr) self._executeOperation(finalQuery)
[docs] def saveChartData(self, artistData, songData, chartData, isDefinitiveChart): """ Saves chart data to the database. If a chart with the same URL exists, the existing record is updated with the newer chords and sections. """ insertStmt = "INSERT INTO CHARTS('song_id','source_url','chords_specific','sections','is_new','update_time') VALUES ({songId}, \'{url}\', \'{chordList}\', \'{sectionList}\', 1, \'{updateTime}\')" updateStmt = "UPDATE CHARTS SET 'chords_specific' = \'{chordList}\', 'sections' = \'{sectionList}\', 'is_new' = 1, 'update_time' = \'{updateTime}\' WHERE source_url=\'{url}\'" updateSongDefinitive = "UPDATE SONGS SET definitive_chart_id = (SELECT id FROM CHARTS WHERE source_url = \'{sourceUrl}\') WHERE id = {songId}" try: c = self._connect() existingChart = self.getChartByUrl(chartData.source, keepConnectionOpen=True) existingSong = self.getSongByTitleAndArtistName(songData.title, artistData.name, keepConnectionOpen=True) timestampStr = datetime.now().strftime(constants.DATETIME_FORMAT) if existingChart: # Chart from this source already exists. Update it. c.execute(updateStmt.format(url=chartData.source, chordList=chartData.getChordListString(), sectionList=chartData.getSectionListString(), updateTime=timestampStr)) else: # Chart does not exist yet. Insert new record. c.execute(insertStmt.format(songId=existingSong.id, url=chartData.source, chordList=chartData.getChordListString(), sectionList=chartData.getSectionListString(), updateTime=timestampStr)) if isDefinitiveChart: c.execute(updateSongDefinitive.format(sourceUrl=chartData.source, songId=existingSong.id)) except sqlite3.IntegrityError as exc: print(repr(exc)) except Exception as exc: print("UNEXPECTED ERROR: " + repr(exc)) print(traceback.format_exc()) finally: self._commitAndClose()
[docs] def saveChartCalculationData(self, chartData, chartCalcs): """ Saves chart calculations to the database. """ insertCalcsStmt = "INSERT INTO CHART_CALCS('chart_id', 'key', 'key_certainty', 'chords_general', 'num_chords', 'num_sections', 'update_time') VALUES ({chartId}, \'{key}\', \'{keyCertainty}\', \'{chordsGeneral}\', {numChords}, {numSections}, \'{updateTime}\')" updateCalcsStmt = "UPDATE CHART_CALCS SET 'key' = \'{key}\', 'key_certainty' = \'{keyCertainty}\', 'chords_general' = \'{chordsGeneral}\', 'num_chords' = {numChords}, 'num_sections' = {numSections}, 'update_time' = \'{updateTime}\' WHERE chart_id={chartId}" updateChartStmt = "UPDATE CHARTS SET is_new = 0, update_time = \'{updateTime}\' WHERE id={chartId}" selectCalcsStmt = "SELECT * from CHART_CALCS WHERE chart_id = {chartId}" try: c = self._connect() c.execute(selectCalcsStmt.format(chartId=chartData.id)) existingChartCalc = c.fetchone() timestampStr = datetime.now().strftime(constants.DATETIME_FORMAT) if existingChartCalc: # Chart from this source already exists. Update it. c.execute(updateCalcsStmt.format(chartId=chartData.id, key=chartCalcs.key, keyCertainty=chartCalcs.keyAnalysisCertainty, chordsGeneral=chartCalcs.getChordListString(), numChords=chartCalcs.numChords, numSections=chartCalcs.numSections, updateTime=timestampStr)) else: # Chart does not exist yet. Insert new record. c.execute(insertCalcsStmt.format(chartId=chartData.id, key=chartCalcs.key, keyCertainty=chartCalcs.keyAnalysisCertainty, chordsGeneral=chartCalcs.getChordListString(), numChords=chartCalcs.numChords, numSections=chartCalcs.numSections, updateTime=timestampStr)) # Update the chart, toggle "is_new" to 0 c.execute(updateChartStmt.format(chartId=chartData.id, updateTime=timestampStr)) except sqlite3.IntegrityError as exc: print(repr(exc)) except Exception as exc: print("UNEXPECTED ERROR: " + repr(exc)) print(traceback.format_exc()) finally: self._commitAndClose()
[docs] def saveArtistCalculationData(self, chartData): """ Saves artist calculations to the database. """ try: c = self._connect() # ... except Exception as exc: print("UNEXPECTED ERROR: " + repr(exc)) print(traceback.format_exc()) finally: self._commitAndClose()
[docs] def getArtistByName(self, artistName, keepConnectionOpen=None): """ Retrieves an artist with the given name. Returns "None" if a record isn't found. """ query = ("SELECT * FROM ARTISTS WHERE name = ?", (artistName,)) artistRows = self._executeQuery(query, keepConnectionOpen) if len(artistRows) > 0: newArtistData = ArtistData(databaseRow=artistRows[0]) return newArtistData else: return None
[docs] def getSongByTitleAndArtistName(self, title, artistName, keepConnectionOpen=None): """ Retrieves a song with the given title. Returns "None" if a record isn't found. """ query = ("SELECT SONGS.* FROM SONGS INNER JOIN ARTISTS ON SONGS.artist_id = ARTISTS.id WHERE SONGS.title = ? AND ARTISTS.name = ?", (title, artistName)) songRows = self._executeQuery(query, keepConnectionOpen) if len(songRows) > 0: newSongData = SongData(databaseRow=songRows[0]) # print("getSongByTitleAndArtistName(" + title + ", " + artistName + ") = " + str(newSongData)) return newSongData else: return None
[docs] def getSongById(self, songId, keepConnectionOpen=None): """ Retrieves a song with the given ID. Returns "None" if a record isn't found. """ query = ("SELECT * FROM SONGS WHERE id = ?", (songId,)) songRows = self._executeQuery(query, keepConnectionOpen) if len(songRows) > 0: newSongData = SongData(databaseRow=songRows[0]) return newSongData else: return None
[docs] def getSongsByArtist(self, artistData): """ Retrieves all songs by the given artist. Returns an empty list if a record isn't found. """ songRecords = [] query = ("SELECT SONGS.*, CHART_CALCS.key FROM SONGS INNER JOIN CHARTS ON SONGS.id = CHARTS.song_id INNER JOIN CHART_CALCS ON CHARTS.id = CHART_CALCS.chart_id WHERE artist_id = ? GROUP BY SONGS.id", (artistData.id,)) songRows = self._executeQuery(query) for row in songRows: newSongData = SongData(databaseRow=row) songRecords.append(newSongData) return songRecords
[docs] def getChartsForSong(self, artistData, songData): """ Retrieves all charts for a given song. """ chartRecords = [] try: c = self._connect() existingSong = self.getSongByTitleAndArtistName(songData.title, artistData.name, keepConnectionOpen=True) c.execute("SELECT CHARTS.* FROM SONGS INNER JOIN CHARTS ON SONGS.id = CHARTS.song_id WHERE CHARTS.song_id = ?", (existingSong.id,)) for row in c: newChartData = ChartData(databaseRow=row) chartRecords.append(newChartData) except Exception as exc: print("UNEXPECTED ERROR: " + repr(exc)) print(traceback.format_exc()) finally: self._close() return chartRecords
[docs] def getChartByUrl(self, sourceUrl, keepConnectionOpen=None): """ Retrieves a chart with the given source URL. Returns "None" if a record isn't found. """ query = ("SELECT * FROM CHARTS WHERE source_url = ?", (sourceUrl,)) chartRows = self._executeQuery(query, keepConnectionOpen) if len(chartRows) > 0: newChartData = ChartData(databaseRow=chartRows[0]) return newChartData else: return None
[docs] def getChartById(self, chartId, keepConnectionOpen=None): """ Retrieves a chart with the given source URL. Returns "None" if a record isn't found. """ query = ("SELECT * FROM CHARTS WHERE id = ?", (chartId,)) chartRows = self._executeQuery(query, keepConnectionOpen) if len(chartRows) > 0: newChartData = ChartData(databaseRow=chartRows[0]) return newChartData else: return None
[docs] def getArtistsWithFreshCharts(self): """ Retrieves artists with charts that haven't been analyzed yet. Returns an empty list if there are no such artists """ artistRecords = [] rows = self._executeQuery("SELECT ARTISTS.* FROM ARTISTS INNER JOIN SONGS ON ARTISTS.id = SONGS.artist_id INNER JOIN CHARTS ON SONGS.id = CHARTS.song_id WHERE CHARTS.is_new != 0 GROUP BY ARTISTS.name") for row in rows: newArtistData = ArtistData(databaseRow=row) artistRecords.append(newArtistData) return artistRecords
[docs] def getAllArtists(self): """ Retrieves artists. Returns an empty list if there are no such artists """ artistRecords = [] rows = self._executeQuery("SELECT * FROM ARTISTS") for row in rows: newArtistData = ArtistData(databaseRow=row) artistRecords.append(newArtistData) return artistRecords
[docs] def getFreshChartsForArtist(self, artistName): """ For a given artist, retrieves charts that haven't been analyzed yet. Returns an empty list if there are no new charts. """ chartRecords = [] query = ("SELECT CHARTS.* FROM ARTISTS INNER JOIN SONGS ON ARTISTS.id = SONGS.artist_id INNER JOIN CHARTS ON SONGS.id = CHARTS.song_id WHERE CHARTS.is_new != 0 AND ARTISTS.name = ?", (artistName.upper(),)) rows = self._executeQuery(query) for row in rows: newChartData = ChartData(databaseRow=row) chartRecords.append(newChartData) return chartRecords
[docs] def getDefinitiveChartsForArtist(self, artistName): """ For a given artist, retrieves their "definitive" charts. If no charts are found, this returns an empty list. """ chartRecords = [] query = ("SELECT CHARTS.* FROM ARTISTS INNER JOIN SONGS ON ARTISTS.id = SONGS.artist_id INNER JOIN CHARTS ON SONGS.id = CHARTS.song_id WHERE ARTISTS.name = ? AND CHARTS.id = SONGS.definitive_chart_id", (artistName.upper(),)) rows = self._executeQuery(query) for row in rows: newChartData = ChartData(databaseRow=row) chartRecords.append(newChartData) return chartRecords
[docs] def getDefinitiveChartCalcsForArtist(self, artistName): """ For a given artist, retrieves their "definitive" chart calculations. If no charts are found, this returns an empty list. """ chartCalcs = [] query = ("SELECT CHART_CALCS.* FROM ARTISTS INNER JOIN SONGS ON ARTISTS.id = SONGS.artist_id INNER JOIN CHARTS ON SONGS.id = CHARTS.song_id INNER JOIN CHART_CALCS ON CHARTS.id = CHART_CALCS.chart_id WHERE ARTISTS.name = ? AND CHARTS.id = SONGS.definitive_chart_id", (artistName.upper(),)) rows = self._executeQuery(query, keepConnectionOpen=True) for row in rows: newChartCalc = ChartCalculations(databaseRow=row) newChartCalc.chartData = self.getChartById(newChartCalc.chartId) chartCalcs.append(newChartCalc) return chartCalcs
[docs] def getAllChartsForArtist(self, artistName): """ For a given artist, retrieves all their charts. If no charts are found, this returns an empty list. """ chartRecords = [] query = ("SELECT CHARTS.* FROM ARTISTS INNER JOIN SONGS ON ARTISTS.id = SONGS.artist_id INNER JOIN CHARTS ON SONGS.id = CHARTS.song_id WHERE ARTISTS.name = ?", (artistName.upper(),)) rows = self._executeQuery(query) for row in rows: newChartData = ChartData(databaseRow=row) chartRecords.append(newChartData) return chartRecords
[docs] def initializeDatabase(self): """ Initializes database. Creates database file if it doesn't exist. If a database already exists, this function will delete it and recreate it! """ try: # delete database if it exists. os.remove(constants.DATABASE_FILE_PATH) print("Database file deleted!") except OSError as exc: print(repr(exc)) try: c = self._connect() print("Initializing Database!") c.execute("CREATE TABLE ARTISTS ( `id` INTEGER PRIMARY KEY AUTOINCREMENT, `name` TEXT UNIQUE, `source_names` TEXT, `source_urls` TEXT, `update_time` TEXT )") c.execute("CREATE TABLE `SONGS` ( `id` INTEGER PRIMARY KEY AUTOINCREMENT, `artist_id` INTEGER, `title` TEXT, `definitive_chart_id` INTEGER, `update_time` TEXT, FOREIGN KEY(`artist_id`) REFERENCES ARTISTS(id), FOREIGN KEY(`definitive_chart_id`) REFERENCES CHARTS(id) )") c.execute("CREATE TABLE CHARTS ( `id` INTEGER PRIMARY KEY AUTOINCREMENT, `song_id` INTEGER, `source_url` TEXT UNIQUE, `chords_specific` TEXT, `sections` TEXT, `is_new` INTEGER, `update_time` TEXT, FOREIGN KEY(`song_id`) REFERENCES `SONGS`(`id`) )") c.execute("CREATE TABLE \"ARTIST_CALCS\" ( `id` INTEGER PRIMARY KEY AUTOINCREMENT, `artist_id` INTEGER UNIQUE, `num_chords` INTEGER, `num_sections` INTEGER, `num_songs` INTEGER, `num_charts` INTEGER, `num_major` INTEGER, `num_minor` INTEGER, `common_keys` TEXT, `common_chords_spec` TEXT, `common_chords_gen` TEXT, `common_progs` TEXT, `common_structs` TEXT, `update_time` TEXT, FOREIGN KEY(`artist_id`) REFERENCES `ARTISTS`(`id`) )") c.execute("CREATE TABLE \"CHART_CALCS\" ( `id` INTEGER PRIMARY KEY AUTOINCREMENT, `chart_id` INTEGER UNIQUE, `key` TEXT, `key_certainty` TEXT, `chords_general` TEXT, `num_chords` INTEGER, `num_sections` INTEGER, `update_time` TEXT, FOREIGN KEY(`chart_id`) REFERENCES `CHARTS`(`id`) )") c.execute("CREATE TABLE `ARTISTS_DUPL_ASC` ( `id` INTEGER, `primary_artist_id` INTEGER, `duplicate_artist_id` INTEGER, PRIMARY KEY(`id`) )") c.execute("CREATE TABLE `SONGS_DUPL_ASC` ( `id` INTEGER, `primary_song_id` INTEGER, `duplicate_song_id` INTEGER, PRIMARY KEY(`id`) )") except Exception as exc: print("UNEXPECTED ERROR: " + repr(exc)) print(traceback.format_exc()) finally: self._commitAndClose()
[docs] def purgeDatabase(self): """ Removes all data from the database. """ try: c = self._connect() print("PURGING ALL DATA FROM THE DATABASE!") c.execute("DELETE FROM ARTISTS") c.execute("DELETE FROM ARTIST_CALCS") c.execute("DELETE FROM CHARTS") c.execute("DELETE FROM CHART_CALCS") c.execute("DELETE FROM SONGS") except Exception as exc: print("UNEXPECTED ERROR: " + repr(exc)) print(traceback.format_exc()) finally: self._commitAndClose()