Source code for pipelet.db_utils

try:
    import sqlite3
except ImportError:
    # Fallback for interpreters built without the sqlite3 module: pysqlite2
    # exposes the DB-API (connect, ...) in its ``dbapi2`` submodule, not in
    # the package itself.
    from pysqlite2 import dbapi2 as sqlite3
from glob import glob
import os
from contextlib import closing
import pickle
import shutil
def old_to_new(old_file, new_file):
    """ Convert an old database to the new database format.

    The old format keeps everything in a single ``tasks`` table whose rows
    carry their segment name (``seg``) and directory (``curr_dir``). The new
    format has ``segments``, ``tasks`` and ``task_relations`` tables: one
    segment row is created per distinct (seg, curr_dir) pair, and the old
    ``prod``/``res`` columns are copied to the new ``input``/``output``
    columns.

    old_file: string, path of the existing old-format database
    new_file: string, path of the database to create (its tables must not
        exist yet, otherwise the ``create table`` statements fail)
    """
    conn1 = sqlite3.connect(old_file)
    conn2 = sqlite3.connect(new_file)
    # ``with conn`` only commits / rolls back; close both connections
    # explicitly, even when the conversion fails half-way.
    try:
        # Creates tables in the new database
        with conn2:
            conn2.execute('create table segments (seg_id INTEGER PRIMARY KEY, seg TEXT, curr_dir TEXT, tag TEXT, comment TEXT)')
            conn2.execute('create table tasks (task_id INTEGER PRIMARY KEY, seg_id INTEGER, status TEXT, input BLOB, output BLOB, str_prod TEXT)')
            conn2.execute('create table task_relations (father_id INTEGER, child_id INTEGER)')
        with conn1:
            pairs = conn1.execute('select distinct seg, curr_dir from tasks').fetchall()
        with conn2:
            conn2.executemany('insert into segments (seg, curr_dir) values (?, ?)', pairs)
        segments = conn2.execute('select seg_id, curr_dir from segments').fetchall()
        with conn2:
            for seg_id, curr_dir in segments:
                rows = conn1.execute('select status, prod, res, str_prod from tasks where curr_dir==?', (curr_dir,))
                conn2.executemany(
                    'insert into tasks (seg_id, status, input, output, str_prod) values (?, ?, ?, ?, ?)',
                    ((seg_id,) + tuple(t) for t in rows))
    finally:
        conn1.close()
        conn2.close()
def change_root(sql_file, old_prefix, new_prefix):
    """ Update the data root path in the database.

    Every ``segments.curr_dir`` and ``tasks.str_input`` value that starts
    with old_prefix gets that leading part replaced by new_prefix. Values
    that only contain old_prefix further along are left untouched (SQL
    ``replace()`` would have rewritten every occurrence). NULL values are
    left as they are, and an empty old_prefix is a no-op.

    sql_file: string, sqlite database file name
    old_prefix: string, current root path
    new_prefix: string, new root path
    """
    if not old_prefix:
        return
    params = {'old': old_prefix, 'new': new_prefix}
    conn1 = sqlite3.connect(sql_file)
    try:
        with conn1:
            # length()/substr() are evaluated by SQLite on the stored text, so
            # character counting matches the database's own encoding.
            conn1.execute('update segments set curr_dir = :new || substr(curr_dir, length(:old) + 1) '
                          'where substr(curr_dir, 1, length(:old)) = :old', params)
            conn1.execute('update tasks set str_input = :new || substr(str_input, length(:old) + 1) '
                          'where substr(str_input, 1, length(:old)) = :old', params)
    finally:
        conn1.close()
def deltask(db_file, lst_task, report_only=False):
    """ Delete task instances together with every task depending on them.

    For each task id, delete the ``task_relations`` rows where it is the
    child and its ``tasks`` row, and remove its product directory
    (``str_input``) from disk. Its children (``task_relations`` rows where it
    is the father) are queued for deletion as well. Progress is printed.

    db_file: string, sqlite database file name
    lst_task: list of task ids (ints or int-convertible strings). The
        caller's list is left unmodified.
    report_only: bool, if True nothing is deleted; only print what would
        be removed.
    """
    import shutil
    # Work on a private, int-normalised copy: ids typed by the user (str) and
    # ids read back from task_relations (int) must deduplicate and sort
    # together.
    pending = sorted(set(int(t) for t in lst_task))
    # Ids already handled. In report_only mode relations are not deleted, so
    # without this a task reachable through several parents (or a cycle)
    # would be reported again and again.
    done = set()
    conn = sqlite3.connect(db_file, check_same_thread=True)
    try:
        while pending:
            taskid = pending.pop(0)
            done.add(taskid)
            with conn:
                print("removing task %s from db" % taskid)
                row = conn.execute('select str_input from tasks where task_id = ?', (taskid,)).fetchone()
                if row is None:
                    print("Task %s not found in db" % taskid)
                    continue
                str_input = row[0]
                # delete from tasks_relations
                if not report_only:
                    conn.execute('delete from task_relations where child_id = ? ', (taskid,))
                # mark child tasks for deletion
                children = [c[0] for c in conn.execute(
                    'select child_id from task_relations where father_id = ? ', (taskid,)).fetchall()]
                # delete from tasks
                if not report_only:
                    conn.execute('delete from tasks where task_id = ?', (taskid,))
                    print('Task %s removed from db' % taskid)
                else:
                    print('Task %s would be removed from db' % taskid)
            if str_input is not None:
                print("Removing directory %s" % str_input)
                if report_only:
                    print("%s would be removed" % str_input)
                else:
                    try:
                        shutil.rmtree(str_input)
                    except OSError:
                        print("Failed to remove %s" % str_input)
                    else:
                        print("%s removed" % str_input)
            if children:
                print("Adding children of %s" % taskid)
                new_ids = set(int(c) for c in children if c is not None)
                pending = sorted((set(pending) | new_ids) - done)
    finally:
        conn.close()
def get_lst_tag(db_file):
    """ Return the list of existing tags.

    Tags are stored ';'-separated in ``segments.tag``. Each value is split,
    and empty entries and NULL values are skipped. A tag attached to several
    segments appears once per segment.

    db_file: string, sqlite database file name
    """
    conn = sqlite3.connect(db_file, check_same_thread=True)
    conn.text_factory = str
    try:
        with conn:
            rows = conn.execute('select tag from segments').fetchall()
    finally:
        # ``with conn`` does not close the connection.
        conn.close()
    return [e for (str_tag,) in rows if str_tag is not None
            for e in str_tag.split(";") if e]
def get_lst_seg(db_file):
    """ Return the list of segment names.

    Each distinct ``segments.seg`` value appears once.

    db_file: string, sqlite database file name
    """
    conn = sqlite3.connect(db_file, check_same_thread=True)
    conn.text_factory = str
    try:
        with conn:
            rows = conn.execute('select distinct seg from segments').fetchall()
    finally:
        # ``with conn`` does not close the connection.
        conn.close()
    return [s[0] for s in rows]
def get_lst_date(db_file):
    """ Return the list of existing dates.

    Date strings are picked from the ``queued_on`` field of the first
    (smallest queued_on) task of each segment. The list is sorted and holds
    each date once. Segments without any queued_on value are skipped, and an
    empty database gives an empty list.

    db_file: string, sqlite database file name
    """
    conn = sqlite3.connect(db_file, check_same_thread=True)
    conn.text_factory = str
    try:
        with conn:
            rows = conn.execute(
                'select distinct first_queued from '
                '(select min(queued_on) as first_queued from tasks group by seg_id) '
                'where first_queued is not null order by first_queued').fetchall()
    finally:
        conn.close()
    return [r[0] for r in rows]
def add_tag(db_file, segid, tag):
    """ Add a tag to one or several segments.

    The tag is appended to each segment's ';'-separated tag list only if it
    is not already there; existing tags keep their order and empty entries
    are dropped. The segment's ``*.meta`` file (first match in its
    ``curr_dir``), if any, is updated with the new tag string. Segment ids
    that do not exist in the database are skipped.

    db_file: string, sqlite database file name
    segid: string, one segment id or several ids joined with '-'
        (e.g. "3-5-7"); empty fields are ignored
    tag: string, tag to add
    """
    conn = sqlite3.connect(db_file, check_same_thread=True)
    conn.text_factory = str
    try:
        for one_id in segid.split('-'):
            if not one_id:
                continue
            with conn:
                row = conn.execute('select tag,curr_dir from segments where seg_id=?', (one_id,)).fetchone()
                if row is None:
                    continue
                old_tags, curr_dir = row
                lst_tag = [t for t in (old_tags or '').split(";") if t]
                if tag not in lst_tag:
                    lst_tag.append(tag)
                str_tag = ";".join(lst_tag)
                conn.execute('update segments set tag=? where seg_id=?', (str_tag, one_id))
                fn = glob(os.path.join(curr_dir, "*.meta")) if curr_dir else []
                # Nothing to rewrite when the segment has no meta file.
                if fn:
                    _update_meta(fn, str_tag)
    finally:
        conn.close()
def get_last(db_file):
    """ Return the last computed segment in a given database.

    Return the seg_id of the task with the largest ``ended_on`` value
    (SQLite fills bare columns of a max() aggregate from the row holding the
    maximum), or None when the tasks table is empty.

    db_file: string, sqlite database file name
    """
    conn = sqlite3.connect(db_file, check_same_thread=True)
    conn.text_factory = str
    try:
        row = conn.execute('select seg_id,max(ended_on) from tasks').fetchone()
    finally:
        conn.close()
    return row[0]
def _update_meta(fn, str_tag): """ Update meta file with current tag value. fn: string, meta file name tag: string, tag string """ if fn: with closing(file(fn[0], 'r')) as f: d = pickle.load(f) d['tag'] = str_tag with closing(file(fn[0], 'w')) as f: r = pickle.dump(d,f)
def del_tag(db_file, tag):
    """ Delete a tag from every segment that carries it.

    Tags are stored ';'-separated in ``segments.tag``, so segments are
    matched on the split list rather than on the whole string. Every
    occurrence of the tag is removed, and the segment's ``*.meta`` file
    (first match in its ``curr_dir``), if any, is updated. A segment whose
    only tag was removed ends up with an empty tag string.

    db_file: string, sqlite database file name
    tag: string, tag to remove
    """
    conn = sqlite3.connect(db_file, check_same_thread=True)
    conn.text_factory = str
    try:
        with conn:
            rows = conn.execute('select seg_id, tag, curr_dir from segments where tag is not null').fetchall()
            for seg_id, old_tags, curr_dir in rows:
                lst_tag = old_tags.split(";")
                if tag not in lst_tag:
                    continue
                str_tag = ";".join(t for t in lst_tag if t != tag)
                conn.execute('update segments set tag=? where seg_id=?', (str_tag, seg_id))
                fn = glob(os.path.join(curr_dir, "*.meta")) if curr_dir else []
                # Nothing to rewrite when the segment has no meta file.
                if fn:
                    _update_meta(fn, str_tag)
    finally:
        conn.close()
def _delseg(db_file, lst_seg): """ Delete a pipeline instance. Delete all segments and products directories of a pipeline instance. """ ## remove everyboby conn = sqlite3.connect(db_file,check_same_thread=True) segid = lst_seg[0] with conn: currdir = conn.execute('select curr_dir from segments where seg_id = ?',(int(segid),)).fetchone()[0] l = conn.execute('select seg_id from segments where curr_dir like ?',(currdir+'%',)) for e in l: lst_seg.remove(e[0]) ## delete from tasks_relations l = conn.execute('delete from task_relations where child_id in (select task_id from tasks where seg_id in (select seg_id from segments where segments.curr_dir like ?))',(currdir+'%',)) ## delete from tasks l = conn.execute('delete from tasks where seg_id in (select seg_id from segments where segments.curr_dir like ?)',(currdir+'%',)) ## delete from segments_relations l = conn.execute('delete from segment_relations where child_id in (select seg_id from segments where segments.curr_dir like ?)',(currdir+'%',)) ## delete from segments l = conn.execute('delete from segments where curr_dir like ?',(currdir+'%',)) conn.close() try: shutil.rmtree(currdir) except: pass if lst_seg: _delseg(db_file, lst_seg) def _deltask(db_file, lst_task): """ Delete a tasks instances. Delete all products directories of a tasks instance. """ ## remove everyboby conn = sqlite3.connect(db_file,check_same_thread=True) taskid = lst_task[0] with conn: str_input = conn.execute('select str_input from tasks where task_id = ?',(int(taskid),)).fetchone()[0] lst_task.remove(taskid) ## delete from tasks_relations l = conn.execute('delete from task_relations where child_id = ? ',(int(taskid),)) ## delete from tasks l = conn.execute('delete from tasks where task_id = ?',(int(taskid),)) conn.close() try: shutil.rmtree(str_input) except: pass if lst_task: _deltask(db_file, lst_task) def _get_fathers(db_file, segid): """ Append upstream pipeline instance paths to the current path. 
Return a list which contains all upstream segment instances id for a given segment instance. This is used to print a pipeline tree view with all dependencies. """ lstid = [int(segid)] conn = sqlite3.connect(db_file,check_same_thread=True) with conn: fids = conn.execute( 'select father_id from segment_relations where child_id = ?' ,(segid,)).fetchall() conn.close() if fids: for l in fids: lstid += _get_fathers(db_file,l[0]) return lstid def _get_children(db_file, segid): """ Return the list of pipeline instance paths which depend on seg Return a list which contains all downstream segment instances id for a given segment instance. This is used to delete all dependencies. """ lstid = [int(segid)] conn = sqlite3.connect(db_file,check_same_thread=True) with conn: fids = conn.execute( 'select child_id from segment_relations where father_id = ?' ,(segid,)).fetchall() conn.close() if fids: for l in fids: lstid += _get_children(db_file, l[0]) return lstid def _get_children_task(db_file, taskid): """ Return the list of tasks which depend on task. Return a list which contains all downstream task instances id for a given task instance. This is used to delete all dependencies. """ lstid = [int(taskid)] conn = sqlite3.connect(db_file,check_same_thread=True) with conn: fids = conn.execute( 'select child_id from task_relations where father_id = ?' ,(taskid,)).fetchall() conn.close() if fids: for l in fids: lstid += _get_children_task(db_file, l[0]) return lstid

Table Of Contents