try:
import sqlite3
except ImportError:
import pysqlite2 as sqlite3
from glob import glob
import os
from contextlib import closing
import pickle
import shutil
def old_to_new(old_file, new_file):
    """ Convert an old database to the new database format.

    Creates the segments, tasks and task_relations tables in new_file, then
    copies the content of the single tasks table of old_file: one segments
    row per distinct (seg, curr_dir) pair, and one tasks row per old task,
    linked to its segment through seg_id. The old prod/res columns are
    copied into the new input/output columns.

    old_file: string, file name of the database in the old format
    new_file: string, file name of the database to fill; the created tables
        must not already exist there
    """
    # NOTE(review): the schema created here has no str_input, queued_on or
    # ended_on column and no segment_relations table, although the other
    # helpers of this module query them -- confirm the targeted schema version.
    with closing(sqlite3.connect(old_file)) as conn1, \
            closing(sqlite3.connect(new_file)) as conn2:
        # Creates tables in the new database
        with conn2:
            conn2.execute('create table segments (seg_id INTEGER PRIMARY KEY, seg TEXT, curr_dir TEXT, tag TEXT, comment TEXT)')
            conn2.execute('create table tasks (task_id INTEGER PRIMARY KEY, seg_id INTEGER, status TEXT, input BLOB, output BLOB, str_prod TEXT)')
            conn2.execute('create table task_relations (father_id INTEGER, child_id INTEGER)')
        old_segs = conn1.execute('select distinct seg, curr_dir from tasks').fetchall()
        with conn2:
            conn2.executemany('insert into segments (seg, curr_dir) values (?, ?)', old_segs)
            new_segs = conn2.execute('select seg_id, curr_dir from segments').fetchall()
        with conn2:
            for seg_id, curr_dir in new_segs:
                rows = conn1.execute('select status, prod, res, str_prod from tasks where curr_dir==?', (curr_dir,))
                # prepend the new segment id to each old task row
                conn2.executemany('insert into tasks (seg_id, status, input, output, str_prod) values (?, ?, ?, ?, ?)',
                                  ((seg_id,) + tuple(t) for t in rows))
def change_root(sql_file, old_prefix, new_prefix):
    """ Update the data root path in the database.

    Every segment directory (segments.curr_dir) and task directory
    (tasks.str_input) that starts with old_prefix gets that prefix replaced
    by new_prefix. Occurrences of old_prefix elsewhere in a path are left
    untouched. The match is a plain string prefix match.

    sql_file: string, sqlite database file name
    old_prefix: string, current data root
    new_prefix: string, new data root
    """
    if not old_prefix:
        # An empty prefix would match every path; SQLite replace() with an
        # empty pattern leaves values unchanged, so keep this a no-op.
        return
    params = (new_prefix, old_prefix, old_prefix, old_prefix)
    with closing(sqlite3.connect(sql_file)) as conn:
        with conn:
            # substr() is 1-based: keep everything after the old prefix.
            conn.execute('update segments set curr_dir = ? || substr(curr_dir, length(?) + 1) '
                         'where substr(curr_dir, 1, length(?)) = ?', params)
            conn.execute('update tasks set str_input = ? || substr(str_input, length(?) + 1) '
                         'where substr(str_input, 1, length(?)) = ?', params)
def deltask(db_file, lst_task, report_only=False):
    """ Delete task instances, together with all their downstream tasks.

    For each task id of lst_task, and recursively for every child task found
    in task_relations, remove the task row and the task_relations rows whose
    child is that task from the database, then remove the task directory
    (tasks.str_input) from disk. Progress is printed on stdout.

    db_file: string, sqlite database file name
    lst_task: list of task ids (int or int-like strings); the list itself is
        left unmodified
    report_only: bool, if True nothing is deleted, the function only prints
        what would be removed
    """
    # Work on normalised int ids: mixing '3' and 3 would visit a task twice
    # (crashing on the second, already deleted, visit), and sorting mixed
    # str/int ids fails on Python 3.
    pending = sorted(set(int(t) for t in lst_task))
    # Ids already handled. In report_only mode relations are kept, so the same
    # child can be reached through several fathers (and a cycle never ends).
    visited = set()
    with closing(sqlite3.connect(db_file, check_same_thread=True)) as conn:
        while pending:
            taskid = pending.pop(0)
            if taskid in visited:
                continue
            visited.add(taskid)
            with conn:
                print("removing task %s from db" % taskid)
                row = conn.execute('select str_input from tasks where task_id = ?',
                                   (taskid,)).fetchone()
                if row is None:
                    print("Task %s not found in db" % taskid)
                    continue
                str_input = row[0]
                if not report_only:
                    # delete from task_relations
                    conn.execute('delete from task_relations where child_id = ? ', (taskid,))
                # mark child tasks for deletion
                children = conn.execute('select child_id from task_relations where father_id = ? ',
                                        (taskid,)).fetchall()
                if not report_only:
                    conn.execute('delete from tasks where task_id = ?', (taskid,))
                    print('Task %s removed from db' % taskid)
                else:
                    print('Task %s would be removed from db' % taskid)
            # a task without directory has nothing to remove on disk
            if str_input is not None:
                print("Removing directory %s" % str_input)
                if report_only:
                    print("%s would be removed" % str_input)
                else:
                    try:
                        shutil.rmtree(str_input)
                    except OSError:
                        print("Failed to remove %s" % str_input)
                    else:
                        print("%s removed" % str_input)
            if children:
                print("Adding children of %s" % taskid)
                new_ids = set(int(c[0]) for c in children) - visited
                pending = sorted(set(pending) | new_ids)
def get_lst_tag(db_file):
    """ Return the list of existing tags.

    Tags are stored in the segments table as a ';'-separated string. Empty
    entries and NULL tag fields are skipped. A tag carried by several
    segments appears once per segment (the list is not de-duplicated).

    db_file: string, sqlite database file name
    """
    with closing(sqlite3.connect(db_file, check_same_thread=True)) as conn:
        conn.text_factory = str
        rows = conn.execute('select tag from segments').fetchall()
    lst = []
    for (str_tag,) in rows:
        if str_tag is not None:
            lst.extend(e for e in str_tag.split(";") if e)
    return lst
def get_lst_seg(db_file):
    """ Return the list of distinct segment names of the segments table.

    db_file: string, sqlite database file name
    """
    with closing(sqlite3.connect(db_file, check_same_thread=True)) as conn:
        conn.text_factory = str
        rows = conn.execute('select distinct seg from segments').fetchall()
    return [r[0] for r in rows]
def get_lst_date(db_file):
    """ Return the list of existing dates.

    Date strings are picked from the queued_on field of the first task
    (lowest task_id) of each segment. Each distinct date appears once and
    the list is sorted. The list is empty when the database holds no task.

    db_file: string, sqlite database file name
    """
    with closing(sqlite3.connect(db_file, check_same_thread=True)) as conn:
        conn.text_factory = str
        rows = conn.execute(
            'select distinct queued_on from tasks '
            'where task_id in (select min(task_id) from tasks group by seg_id) '
            'order by queued_on').fetchall()
    # one date string per row, not just the first row
    return [r[0] for r in rows]
def add_tag(db_file, segid, tag):
    """ Add a new tag to one or several segments.

    For each segment id, the tag is appended to the segment ';'-separated
    tag string unless the segment already carries it; empty entries are
    dropped and the existing tag order is kept. The new tag string is then
    written to the segment *.meta file, if any, through _update_meta.
    Unknown segment ids are skipped.

    db_file: string, sqlite database file name
    segid: string, segment id, or several ids separated by '-'
    tag: string, tag to add
    """
    with closing(sqlite3.connect(db_file, check_same_thread=True)) as conn:
        conn.text_factory = str
        for sid in segid.split('-'):
            if not sid:
                continue
            with conn:
                row = conn.execute('select tag,curr_dir from segments where seg_id=?',
                                   (sid,)).fetchone()
                if row is None:
                    # unknown segment id: nothing to tag
                    continue
                old_tag, curr_dir = row
                lst_tag = [e for e in (old_tag or "").split(";") if e]
                if tag and tag not in lst_tag:
                    lst_tag.append(tag)
                str_tag = ";".join(lst_tag)
                conn.execute('update segments set tag=? where seg_id=?', (str_tag, sid))
                # Still inside the transaction: if the meta update raises,
                # the database change is rolled back as well.
                fn = glob(os.path.join(curr_dir, "*.meta"))
                _update_meta(fn, str_tag)
def get_last(db_file):
    """ Return the last computed segment in a given database.

    Returns the seg_id of the task with the greatest ended_on value, or None
    when the tasks table is empty.

    db_file: string, sqlite database file name
    """
    with closing(sqlite3.connect(db_file, check_same_thread=True)) as conn:
        conn.text_factory = str
        # SQLite fills the bare seg_id column from the row holding max(ended_on)
        row = conn.execute('select seg_id,max(ended_on) from tasks').fetchone()
    return row[0]
def _update_meta(fn, str_tag):
""" Update meta file with current tag value.
fn: string, meta file name
tag: string, tag string
"""
if fn:
with closing(file(fn[0], 'r')) as f:
d = pickle.load(f)
d['tag'] = str_tag
with closing(file(fn[0], 'w')) as f:
r = pickle.dump(d,f)
def del_tag(db_file, tag):
    """ Delete a tag from every segment carrying it.

    Segments store their tags as a ';'-separated string. Every occurrence of
    tag is removed from that list, the new tag string is stored back, and it
    is written to the segment *.meta file, if any, through _update_meta.

    db_file: string, sqlite database file name
    tag: string, tag to remove
    """
    with closing(sqlite3.connect(db_file, check_same_thread=True)) as conn:
        conn.text_factory = str
        with conn:
            # Filter in Python: an SQL equality test would only find segments
            # whose whole tag string is exactly `tag`.
            rows = conn.execute('select seg_id, tag, curr_dir from segments '
                                'where tag is not null').fetchall()
            for seg_id, old_tag, curr_dir in rows:
                lst_tag = old_tag.split(";")
                if tag not in lst_tag:
                    continue
                str_tag = ";".join(e for e in lst_tag if e != tag)
                conn.execute('update segments set tag=? where seg_id=?', (str_tag, seg_id))
                fn = glob(os.path.join(curr_dir, "*.meta"))
                _update_meta(fn, str_tag)
def _delseg(db_file, lst_seg):
""" Delete a pipeline instance.
Delete all segments and products directories of a pipeline instance.
"""
## remove everyboby
conn = sqlite3.connect(db_file,check_same_thread=True)
segid = lst_seg[0]
with conn:
currdir = conn.execute('select curr_dir from segments where seg_id = ?',(int(segid),)).fetchone()[0]
l = conn.execute('select seg_id from segments where curr_dir like ?',(currdir+'%',))
for e in l:
lst_seg.remove(e[0])
## delete from tasks_relations
l = conn.execute('delete from task_relations where child_id in (select task_id from tasks where seg_id in (select seg_id from segments where segments.curr_dir like ?))',(currdir+'%',))
## delete from tasks
l = conn.execute('delete from tasks where seg_id in (select seg_id from segments where segments.curr_dir like ?)',(currdir+'%',))
## delete from segments_relations
l = conn.execute('delete from segment_relations where child_id in (select seg_id from segments where segments.curr_dir like ?)',(currdir+'%',))
## delete from segments
l = conn.execute('delete from segments where curr_dir like ?',(currdir+'%',))
conn.close()
try:
shutil.rmtree(currdir)
except:
pass
if lst_seg:
_delseg(db_file, lst_seg)
def _deltask(db_file, lst_task):
""" Delete a tasks instances.
Delete all products directories of a tasks instance.
"""
## remove everyboby
conn = sqlite3.connect(db_file,check_same_thread=True)
taskid = lst_task[0]
with conn:
str_input = conn.execute('select str_input from tasks where task_id = ?',(int(taskid),)).fetchone()[0]
lst_task.remove(taskid)
## delete from tasks_relations
l = conn.execute('delete from task_relations where child_id = ? ',(int(taskid),))
## delete from tasks
l = conn.execute('delete from tasks where task_id = ?',(int(taskid),))
conn.close()
try:
shutil.rmtree(str_input)
except:
pass
if lst_task:
_deltask(db_file, lst_task)
def _get_fathers(db_file, segid):
""" Append upstream pipeline instance paths to the current path.
Return a list which contains all upstream segment instances
id for a given segment instance. This is used to print a pipeline
tree view with all dependencies.
"""
lstid = [int(segid)]
conn = sqlite3.connect(db_file,check_same_thread=True)
with conn:
fids = conn.execute(
'select father_id from segment_relations where child_id = ?'
,(segid,)).fetchall()
conn.close()
if fids:
for l in fids:
lstid += _get_fathers(db_file,l[0])
return lstid
def _get_children(db_file, segid):
""" Return the list of pipeline instance paths which depend on seg
Return a list which contains all downstream segment instances
id for a given segment instance. This is used to delete all dependencies.
"""
lstid = [int(segid)]
conn = sqlite3.connect(db_file,check_same_thread=True)
with conn:
fids = conn.execute(
'select child_id from segment_relations where father_id = ?'
,(segid,)).fetchall()
conn.close()
if fids:
for l in fids:
lstid += _get_children(db_file, l[0])
return lstid
def _get_children_task(db_file, taskid):
""" Return the list of tasks which depend on task.
Return a list which contains all downstream task instances
id for a given task instance. This is used to delete all dependencies.
"""
lstid = [int(taskid)]
conn = sqlite3.connect(db_file,check_same_thread=True)
with conn:
fids = conn.execute(
'select child_id from task_relations where father_id = ?'
,(taskid,)).fetchall()
conn.close()
if fids:
for l in fids:
lstid += _get_children_task(db_file, l[0])
return lstid