Source code for pipelet.task

## Copyright (C) 2008, 2009, 2010 APC LPNHE CNRS Universite Paris Diderot <lejeune@apc.univ-paris7.fr>  <betoule@apc.univ-paris7.fr>
## 
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 3 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this program; if not, see http://www.gnu.org/licenses/gpl.html

import threading
from utils import str_date, make_dict
from contextlib import closing
import pickle
import multiplex

class Task:
    """ A segment code associated with its input/output product(s).

    A task is the association of a segment with its input product, its
    execution status and its output product(s).
    """

    ## Private marker meaning "argument not given". An omitted list argument
    ## gets a fresh [] per instance instead of one list object shared by
    ## every Task (a mutable default is evaluated only once, at def time).
    ## An explicit None passed by the caller is still stored unchanged.
    _UNSET = object()

    def __init__(self, seg, task_input=_UNSET, status=None,
                 task_output=_UNSET, id=None, queued_on=None,
                 parents=_UNSET, str_input=None):
        """ Initialize a task object.

        Parameters
        ----------
        seg : string, a segment name.
        task_input : python object, the input product (default: a new
            empty list).
        status : string, task status (done, queued, failed, running).
        task_output : list, task output product(s) (default: a new empty
            list).
        id : integer, a unique id for the task attributed by the tracker.
        queued_on : date_time object.
        parents : list of the ids of the tasks whose output becomes the
            input of this task (default: a new empty list).
        str_input : the disk path of the task.
        """
        if task_input is self._UNSET:
            task_input = []
        if task_output is self._UNSET:
            task_output = []
        if parents is self._UNSET:
            parents = []
        ## string, a segment name
        self.seg = seg
        ## python object, an input product
        self.task_input = task_input
        ## string, task status (done, queued, failed, running)
        self.status = status
        ## list, task output product(s)
        self.task_output = task_output
        ## integer, a unique id for the task attributed by the tracker
        self.id = id
        ## String, store the result of the computation
        self.tag = None
        ## Dict of module:version to save on disk at the segment level
        self.version = {}
        ## date_time object
        self.queued_on = queued_on
        ## date_time object
        self.begun_on = None
        ## date_time object
        self.ended_on = None
        ## List of the task id whose output become the input of this task
        self.parents = parents
        ## Dict of str_input of the parent tasks
        self.str_parents = {}
        ## Dict of parameters to save on disk at the segment level
        self.args = {}
        ## store the disk path
        self.str_input = str_input

    def __str__(self):
        """ Convert task object to string. """
        return 'seg: %s, status: %s, id: %s, input: %s, output: %s' \
            % (self.seg, self.status, self.id, str(self.task_input),
               str(self.task_output))

    def update_status(self, status):
        """ Update task status and timestamp the transition.

        'done' sets ended_on and 'queued' sets queued_on. Any other status
        clears task_output, and 'running' additionally sets begun_on.
        """
        str_d = str_date()
        self.status = status
        if status == 'done':
            self.ended_on = str_d
        elif status == 'queued':
            self.queued_on = str_d
        else:
            ## neither done nor queued (e.g. running, failed): drop output
            self.task_output = None
            if status == 'running':
                self.begun_on = str_d

    def store_meta(self, fn):
        """ Store meta information for task.

        This is used to rebuild db. The file fn receives a pickled dict
        with keys id, parents, queued_on, ended_on, begun_on, input,
        output and status. input and output are themselves pickled strings.
        """
        ## Build (and pickle the products) before touching the file, so an
        ## unpicklable input does not leave a truncated meta file behind.
        meta = {'id': self.id,
                'parents': self.str_parents,
                'queued_on': self.queued_on,
                'ended_on': self.ended_on,
                'begun_on': self.begun_on,
                'input': pickle.dumps(self.task_input),
                'output': pickle.dumps(self.task_output),
                'status': self.status}
        ## pickle data must be written in binary mode (mandatory on
        ## Python 3 and on Windows); open() also closes the file on exit.
        with open(fn, 'wb') as f:
            pickle.dump(meta, f)
class TaskList:
    """ List of accomplished tasks per segment. """

    def __init__(self):
        """ Initialize a task list. """
        ## dict, segment name -> list of Task objects pushed for it
        self._list = {}
        ## lock on task list
        self._void_task_lock = threading.Lock()

    def pop(self, seg):
        """ Return the tasks stored for a segment.

        Returns the list of Task objects pushed for seg, or None when
        nothing was pushed for it. Despite its name this does NOT remove
        the entry: the list stays in place. NOTE(review): confirm whether
        callers rely on the entry being kept before making it remove.
        """
        with self._void_task_lock:
            return self._list.get(seg)

    def push(self, t):
        """ Add a task to the task list.

        Parameters
        ----------
        t : task object.

        Raises
        ------
        TypeError if t is not a Task.
        """
        if not isinstance(t, Task):
            raise TypeError("expected a Task, got %r" % type(t))
        ## same lock as pop(); setdefault creates the per-segment list and
        ## appends in one step instead of a separate has_key/insert check.
        with self._void_task_lock:
            self._list.setdefault(t.seg, []).append(t)

    def multiplex(self, seg, parent_list, directive):
        """ Compute the result of a multiplex directive.

        For each parent segment in parent_list, collects one
        (product, task id, parent segment) triple per output product of
        each task stored for that parent. Parents with no stored tasks
        contribute an empty list. The per-parent lists are combined by the
        multiplex function named directive.method. The result is optionally
        filtered (directive.where) and grouped (directive.group_by). Returns
        the list of new Task objects for seg, all with status 'queued'.

        Multiplex method is one of 'cross_prod', 'union', 'zip'.
        """
        ## getattr looks the function up by name without evaluating
        ## directive.method as arbitrary Python code (as eval did).
        method = getattr(multiplex, directive.method)
        ## one list of (product, task id, parent segment) per parent
        per_parent = []
        for p in parent_list:
            ## no input for orphan segment
            lst_p = self._list.get(p, [])
            per_parent.append(
                [(o, t.id, p) for t in lst_p for o in t.task_output])
        output_set = method(*per_parent)
        if directive.where:
            f = multiplex._where(directive.where)  ## get selection function
            ## apply selection; a list comprehension (unlike filter on
            ## Python 3) always yields a list
            output_set = [e for e in output_set if f(e)]
        if directive.group_by:
            output_set = multiplex._group(output_set, directive.group_by)
            ## grouped element: e[0] is the task input, e[1] the parent ids
            lst_task = [Task(seg, e[0], status='queued',
                             parents=list(set(e[1])))
                        for e in output_set]
        else:
            ## each element is a sequence of (product, task id, segname)
            ## triples; input is a dictionary built from (segname, product)
            lst_task = [Task(seg, make_dict([(r[2], r[0]) for r in e]),
                             status='queued',
                             parents=list(set(r[1] for r in e
                                              if r[1] is not None)))
                        for e in output_set]
        return lst_task

Table Of Contents