## Copyright (C) 2008, 2009, 2010 APC LPNHE CNRS Universite Paris Diderot <lejeune@apc.univ-paris7.fr> <betoule@apc.univ-paris7.fr>
##
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 3 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this program; if not, see http://www.gnu.org/licenses/gpl.html
import threading
from utils import str_date, make_dict
from contextlib import closing
import pickle
import multiplex
class Task:
    """ A segment code associated with its input/output product(s).

    A task is the association of a segment with its input product, its
    execution status and its output product(s).
    """

    ## Private sentinel marking an omitted list argument. It replaces the
    ## former mutable ``[]`` defaults, which were evaluated once and then
    ## shared by every Task built without them. A sentinel (rather than
    ## None) keeps an explicitly passed None unchanged.
    _MISSING = object()

    def __init__(self, seg, task_input=_MISSING, status=None,
                 task_output=_MISSING, id=None, queued_on=None,
                 parents=_MISSING, str_input=None):
        """ Initialize a task object.

        Parameters
        ----------
        seg : string, the segment name.
        task_input : the input product; defaults to a new empty list.
        status : string, task status (done, queued, failed, running).
        task_output : list of output product(s); defaults to a new empty list.
        id : integer, a unique id for the task attributed by the tracker.
        queued_on : timestamp of queuing (update_status sets it from str_date()).
        parents : list of parent task ids; defaults to a new empty list.
        str_input : the disk path of the task.
        """
        if task_input is self._MISSING:
            task_input = []
        if task_output is self._MISSING:
            task_output = []
        if parents is self._MISSING:
            parents = []
        ## string, a segment name
        self.seg = seg
        ## python object, an input product
        self.task_input = task_input
        ## string, task status (done, queued, failed, running)
        self.status = status
        ## list, task output product(s)
        self.task_output = task_output
        ## integer, a unique id for the task attributed by the tracker
        self.id = id
        ## String, store the result of the computation
        self.tag = None
        ## Dict of module:version to save on disk at the segment level
        self.version = {}
        ## timestamp, set from str_date() by update_status('queued')
        self.queued_on = queued_on
        ## timestamp, set from str_date() by update_status('running')
        self.begun_on = None
        ## timestamp, set from str_date() by update_status('done')
        self.ended_on = None
        ## List of the task ids whose output becomes the input of this task
        self.parents = parents
        ## Dict of str_input of the parent tasks
        self.str_parents = {}
        ## Dict of parameters to save on disk at the segment level
        self.args = {}
        ## store the disk path
        self.str_input = str_input

    def __str__(self):
        """ Convert task object to string.
        """
        return 'seg: %s, status: %s, id: %s, input: %s, output: %s' \
            % (self.seg, self.status, self.id, str(self.task_input), str(self.task_output))

    def update_status(self, status):
        """ Update task status and the matching timestamp.

        'done' sets ended_on and 'queued' sets queued_on. Any other status
        clears task_output, and 'running' additionally sets begun_on.
        Timestamps come from utils.str_date().
        """
        str_d = str_date()
        self.status = status
        if status == 'done':
            self.ended_on = str_d
        elif status == 'queued':
            self.queued_on = str_d
        else:
            ## neither done nor queued (e.g. running, failed): drop any output
            self.task_output = None
            if status == 'running':
                self.begun_on = str_d
class TaskList:
    """ List of accomplished tasks per segment.
    """

    def __init__(self):
        """ Initialize a task list.
        """
        ## dict, segment name -> list of Task objects pushed for that segment
        self._list = {}
        ## lock on task list
        self._void_task_lock = threading.Lock()

    def pop(self, seg):
        """ Return the stored tasks of a segment.

        Despite its name, nothing is removed: the entries stay stored
        (multiplex below also reads them).

        Returns the list of Task objects pushed for ``seg``, or None when
        nothing was pushed for that segment.
        """
        with self._void_task_lock:
            return self._list.get(seg)

    def push(self, t):
        """ Add a task to the task list.

        Parameters
        ----------
        t : Task object, appended to the list of its segment ``t.seg``.

        Raises
        ------
        TypeError if t is not a Task.
        """
        if not isinstance(t, Task):
            raise TypeError("expected a Task, got %r" % type(t))
        ## same lock as pop: the create-list-then-append step must not race
        with self._void_task_lock:
            self._list.setdefault(t.seg, []).append(t)

    def multiplex(self, seg, parent_list, directive):
        """ Compute the result of a multiplex directive.

        Build the queued tasks of segment ``seg`` from the outputs of the
        tasks stored for the segments in ``parent_list``.

        Parameters
        ----------
        seg : string, name of the segment the new tasks belong to.
        parent_list : list of parent segment names.
        directive : object with ``method``, ``where`` and ``group_by``
            attributes; ``method`` names a function of the multiplex
            module, one of 'cross_prod', 'union', 'zip'.

        Returns
        -------
        list of Task objects with status 'queued'.
        """
        ## look the combination function up by name (was an eval)
        method = getattr(multiplex, directive.method)
        ## one list of (product, task id, parent segment) per parent;
        ## a parent with no stored task (orphan segment) gives no input
        a = [[(o, t.id, p) for t in self._list.get(p, []) for o in t.task_output]
             for p in parent_list]
        output_set = method(*a)
        if directive.where:
            f = multiplex._where(directive.where)  ## get selection function
            ## apply selection; a list on both Python 2 and 3
            output_set = [e for e in output_set if f(e)]
        if directive.group_by:
            output_set = multiplex._group(output_set, directive.group_by)
            ## grouped elements: e[0] is the input, e[1] the parent task ids
            lst_task = [Task(seg, e[0], status='queued', parents=list(set(e[1])))
                        for e in output_set]
        else:
            ## each element is a sequence of (product, task id, parent segment);
            ## inputs are dictionaries built by make_dict, None ids are skipped
            lst_task = [Task(seg, make_dict([(r[2], r[0]) for r in e]), status='queued',
                             parents=list(set([r[1] for r in e if r[1] is not None])))
                        for e in output_set]
        return lst_task