## Copyright (C) 2008, 2009, 2010 APC LPNHE CNRS Universite Paris Diderot <lejeune@apc.univ-paris7.fr> <betoule@apc.univ-paris7.fr>
##
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 3 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this program; if not, see http://www.gnu.org/licenses/gpl.html
import os
import os.path as path
import re
from collections import deque
from contextlib import closing

from environment import Environment
from repository import LocalRepository
from utils import get_hashkey, crc32
from utils import str_file, flatten
class PipeException(Exception):
    """ Error raised for pipeline-specific failures.

    Within this module it is raised by Pipeline.push() when input is
    pushed to a segment that has parents, and by Pipeline.flatten()
    when the dependency graph contains a cycle.
    """
    pass
def _str_join(strs):
""" Print a node.
>>> print(_str_join(('sega -> segb','segc','segd')))
sega -> segb
segc
segd
"""
l = [len(max(a.splitlines(), key=len)) for a in strs]
if l:
lmax = max(l)
else:
lmax = 0
s = ""
for a, l1 in zip(strs, l):
s += '\n'.join([' ' * (lmax - l1) + i for i in a.splitlines()])
s += ('\n')
return s[:-1]
def _str_red(a, b):
""" Print a sequence.
>>> print(_str_red('segb->segc', 'sega'))
sega->segb->segc
"""
if b:
return b + '->' + a
else:
return a
class Pipeline:
    """ A segment oriented graph.

    A pipeline is a set of code segments together with their
    repository and their dependency graph, stored as two adjacency
    dictionaries (_parents and _children) keyed by segment name.

    The matplotlib flags are only stored by this class; they are
    meant to tell workers to switch the matplotlib backend to Agg so
    that segments can run in a non-interactive environment.
    """

    def __init__(self, seg_list, code_dir='./',
                 prefix='./', sqlfile=None,
                 matplotlib=False, matplotlib_interactive=False,
                 env=Environment, permissive=False):
        """ Initialize a pipeline.

        seg_list : either a list of string (the list of segment for a
          linear pipeline), or a string description of the pipeline
          using a dot-like syntax (see from_dot).
        code_dir : string, the location of the segment source code.
        prefix : string, indicates where to save the pipeline products.
          The directory and its "log" subdirectory are created when
          missing (one level only, parents must exist).
        sqlfile : string, sql data base. Defaults to ".sqlstatus"
          inside prefix; "~" and environment variables are expanded.
        matplotlib : boolean, if true, turn the matplotlib backend to Agg
          in order to allow the execution in non-interactive environment.
        matplotlib_interactive : same thing for interactive workers.
        env : extension of the EnvironmentBase class.
        permissive : forwarded to LocalRepository; if set no error is
          raised for a segment not found.

        Side effect: the process umask is set to 0o002.

        >>> p = Pipeline('')
        >>> (p._curr_dirs, p._parents, p._children)
        ({}, {}, {})
        >>> p = Pipeline('a;', code_dir=r'/test/', permissive=True)
        >>> print(p._curr_dirs) # doctest: +ELLIPSIS
        {'a': '.../test/a_AAAAAAA'}
        >>> p._parents == {'a': []} and p._children == {'a': []}
        True
        """
        if isinstance(seg_list, str):
            self.from_dot(seg_list)
        else:
            self._make_tree(seg_list)

        ## repository giving access to the segment source code
        # NOTE(review): when code_dir is false the repository stays None
        # and compute_hash() below fails on any non-empty pipeline.
        self.repository = None
        if code_dir:
            self.repository = LocalRepository(self._seg_list,
                                              code_dir,
                                              permissive=permissive)

        ## string, indicates where to save the pipeline products.
        # 0o002 is the octal spelling accepted by Python 2.6+ and 3
        # (the former literal 002 is a syntax error on Python 3).
        os.umask(0o002)
        self._prefix = path.realpath(prefix)
        if not os.path.exists(prefix):
            os.mkdir(prefix)
        logdir = os.path.join(prefix, "log")
        if not os.path.exists(logdir):
            os.mkdir(logdir)

        ## dict, directory corresponding to each segment
        self._curr_dirs = {}
        ## dict, hash key corresponding to each segment
        self._hashes = {}
        self.compute_hash()

        ## string, sql data base
        if sqlfile is None:
            sqlfile = path.join(prefix, ".sqlstatus")
        self.sqlfile = path.expanduser(path.expandvars(sqlfile))

        ## boolean, if true, turn the matplotlib backend to Agg
        self.matplotlib = matplotlib
        ## boolean, same thing for interactive workers
        self.matplotlib_interactive = matplotlib_interactive
        ## string, pipe name.
        self.name = "pipelet"
        ## Environment Base extension
        self.env = env
        ## input dictionary (for orphan segments)
        self._input = {}

    def __str__(self):
        """ Print the segment tree.

        The result is the concatenation of strseg() for every segment
        without children.
        NOTE(review): no separator is inserted between two such
        segments, so a graph with several leaves prints them back to
        back.

        >>> print(Pipeline('first->second->fourth\\nthird->fourth',
        ...                code_dir = '../test/', permissive=True)
        ...      ) #doctest: +NORMALIZE_WHITESPACE
        first -> second ->
                  third -> fourth ->
        """
        return "".join([self.strseg(seg)
                        for seg, kids in self._children.items()
                        if not kids])

    def to_dot(self, filename=None):
        """ Return a string representation of the graph.

        Written using a subset of the graphviz dot syntax: one
        "parent -> child;" line per edge. Segments without any edge
        do not appear in the output.

        filename : string, optional. When given (and not empty) the
        representation is also written to that file.

        >>> p = Pipeline("")
        >>> p.from_dot('''a->b->c;
        ... d->c;''')
        >>> dot = p.to_dot().splitlines()
        >>> print(dot[0])
        digraph pipelet {
        >>> sorted(dot[1:-1])
        ['a -> b;', 'b -> c;', 'd -> c;']
        >>> print(dot[-1])
        }
        """
        chunks = ["digraph %s {\n" % self.name]
        for child, parents in self._parents.items():
            for parent in parents:
                chunks.append('%s -> %s;\n' % (parent, child))
        chunks.append('}')
        s = "".join(chunks)
        if filename:
            with open(filename, 'w') as f:
                f.write(s)
        return s

    def from_dot(self, s):
        """ Parse a subset of the graphviz dot language.

        s : string. Either a set of edges written "a->b" (chains such
        as "a->b->c" are accepted), or, when the string contains no
        edge at all, a list of isolated segments written "a;".

        Replaces _seg_list (a set), _parents and _children.

        NOTE(review): as soon as one edge is present, isolated
        "name;" entries are ignored.

        >>> p = Pipeline('')
        >>> p.from_dot('a;')
        >>> p._parents == {'a': []} and p._children == {'a': []}
        True
        >>> p.from_dot('''a->b;
        ... c->b;
        ... c->d;''')
        >>> p._children == {'a': ['b'], 'c': ['b', 'd'], 'b': [], 'd': []}
        True
        >>> p._parents == {'a': [], 'c': [], 'b': ['a', 'c'], 'd': ['c']}
        True
        """
        pattern = re.compile(r'(\w+)\s*->\s*(\w+)')
        sep = '->'
        m = pattern.search(s)
        if not m:
            # No edge at all: fall back to isolated "name;" entries.
            pattern = re.compile(r'(\w+)\s*;')
            sep = ';'
            m = pattern.search(s)
        found = []
        while m:
            found.append(m.groups())
            # Resume at the separator of the current match, not at its
            # end, so that the right-hand name of "a->b" can also be the
            # left-hand name of the next edge in a chain "a->b->c".
            pos = s.find(sep, m.start())
            m = pattern.search(s, pos)
        segs = set(flatten(found))
        self._seg_list = segs
        self._parents = dict([(seg, []) for seg in segs])
        self._children = dict([(seg, []) for seg in segs])
        for item in found:
            if len(item) > 1:
                self._parents[item[1]].append(item[0])
                self._children[item[0]].append(item[1])

    def strseg(self, seg):
        """ Convert a segment to a string.

        The segment is preceded by the (right-aligned) string of each
        of its parents, recursively, and followed by " -> ".

        >>> p = Pipeline('a->b', permissive=True)
        >>> print(p.strseg('a')) # doctest: +NORMALIZE_WHITESPACE
        a ->
        >>> print(p.strseg('b')) # doctest: +NORMALIZE_WHITESPACE
        a -> b ->
        """
        upstream = [self.strseg(k) for k in self._parents[seg]]
        return _str_join(upstream) + seg + " -> "

    def connect(self, seg1, p, seg2):
        """ Connect two pipes.

        First segment becomes parent of second segment.

        seg1 : string, segment name (in this pipeline).
        p : pipeline object, or a list of segment names describing a
          linear pipeline.
        seg2 : string, segment name (in p).

        The graph of p is merged into this pipeline; for a segment
        name present in both, the edges coming from p replace the
        existing ones. The adjacency lists of p are copied, so p itself
        is left untouched by the new seg1 -> seg2 edge.

        NOTE(review): only _parents and _children are updated here;
        _seg_list, the repository and the hash keys are not refreshed.
        """
        if isinstance(p, Pipeline):
            # Copy the lists: sharing them would make the append below
            # (and any later edit) modify p as well.
            parents = dict([(k, list(v)) for k, v in p._parents.items()])
            children = dict([(k, list(v)) for k, v in p._children.items()])
        else:
            children = dict(zip(p, [[s] for s in p[1:]] + [[]]))
            parents = dict(zip(p, [[]] + [[s] for s in p[:-1]]))
        self._parents.update(parents)
        self._children.update(children)
        self._parents[seg2].append(seg1)
        self._children[seg1].append(seg2)

    def _make_tree(self, seg_list):
        """ Convert a segment list into a segment tree.

        seg_list : list of string. Each segment becomes the parent of
        the next one in the list.

        >>> a = Pipeline('')
        >>> a._make_tree(['first','second'])
        >>> a._parents == {'second': ['first'], 'first': []}
        True
        >>> a._children == {'second': [], 'first': ['second']}
        True
        """
        self._seg_list = seg_list
        following = [[s] for s in seg_list[1:]] + [[]]
        preceding = [[]] + [[s] for s in seg_list[:-1]]
        self._children = dict(zip(seg_list, following))
        self._parents = dict(zip(seg_list, preceding))

    def get_parents(self, seg, nophantom=False):
        """ Return a list of parent segments.

        seg : string, segment name.
        nophantom : boolean. Unless set, a segment without parent gets
          the single placeholder parent '<seg>phantom'.

        When the segment has parents, the internal list is returned
        (not a copy).

        >>> a = Pipeline('''first->second->fourth
        ... third->fourth''',
        ... permissive=True)
        >>> print(a.get_parents('fourth'))
        ['second', 'third']
        """
        if not nophantom and not self._parents[seg]:
            return [seg + "phantom"]
        return self._parents[seg]

    def get_childrens(self, seg):
        """ Return a list of child segments.

        The internal list is returned (not a copy).
        """
        return self._children[seg]

    def push(self, **args):
        """ Add input to orphan segment.

        args : one keyword per segment name, the value being the list
        of inputs for that segment.

        Raises PipeException if the segment has parents, TypeError if
        the value is not a list, KeyError if the segment is unknown.
        """
        for k, v in args.items():
            if self._parents[k]:
                raise PipeException("Segment %s not an orphan segment" % k)
            if not isinstance(v, list):
                raise TypeError("Segment input should be a list object")
            self._input[k] = v

    def flatten(self):
        """ Return a generator delivering segments in execution order.

        Perform a topological sort of the dependency graph. For this
        to succeed the graph MUST be acyclic: PipeException is raised
        once all reachable segments have been delivered if some
        segment is left with unsatisfied parents.

        Iterative algorithm from Kahn (1962)
        http://en.wikipedia.org/wiki/Topological_sorting

        >>> T = Pipeline('''first->second->fourth;
        ... third->fourth;''', permissive=True)
        >>> order = list(T.flatten())
        >>> sorted(order)
        ['first', 'fourth', 'second', 'third']
        >>> order.index('first') < order.index('second') < order.index('fourth')
        True
        >>> order.index('third') < order.index('fourth')
        True
        """
        # Segments without parent are ready from the start.
        ready = deque([s for s, p in self._parents.items() if not p])
        # Work on copies: the sort consumes the parent lists.
        pending = dict([(k, list(v)) for k, v in self._parents.items()])
        children = dict([(k, list(v)) for k, v in self._children.items()])
        while ready:
            seg = ready.popleft()
            yield seg
            for child in children[seg]:
                pending[child].remove(seg)
                if not pending[child]:
                    ready.append(child)
        for seg, unmet in pending.items():
            if unmet:
                raise PipeException("Dependency graph has "
                                    "at least one cyle including "
                                    "segment %s" % seg)

    def _compute_hash(self, seg):
        """ Compute the hashkey for a given segment.

        The hash function is first fed with the hash keys of the
        parents (computed recursively), then with the string returned
        by the repository for the segment.

        Stores the key in _hashes and the segment directory in
        _curr_dirs. The directory is '<seg>_<key>' placed inside the
        directory of the last parent, or inside the pipeline prefix for
        a segment without parent.

        Return the tuple (hash key, segment directory).
        """
        hasher = crc32()
        currdir = self._prefix
        for k in self._parents[seg]:
            h, currdir = self._compute_hash(k)
            hasher.update(h)
        s = self.repository.get_all_string(seg)
        h = get_hashkey(s, hasher).base32_digest()
        self._hashes[seg] = h
        currdir = path.join(currdir, '%s_%s' % (seg, h))
        self._curr_dirs[seg] = currdir
        return h, currdir

    def compute_hash(self):
        """ Compute the hashkey for all segments of the tree once and for all.

        The computation starts from each segment without children and
        recurses through its parents.
        """
        for k, v in self._children.items():
            if not v:
                self._compute_hash(k)

    def get_full_seg_name(self, seg):
        """ Return segment full name (segment name + hashkey).
        """
        return '%s_%s' % (seg, self._hashes[seg])

    def _match(self, seg, re):
        """ Find the first segment which name matches regexp.

        seg : string, segment where the search starts.
        re : compiled regular expression (the parameter hides the re
          module inside this method).

        The segment itself is tried first, then its parents, depth
        first. Names are tested with match(), i.e. from their start.
        Return the segment name, or None.
        """
        if re.match(seg):
            return seg
        for k in self._parents[seg]:
            s = self._match(k, re)
            if s is not None:
                return s
        return None

    def find_seg(self, seg, expr):
        """ Find segment which name matches regexp.

        seg : string, segment where the search starts.
        expr : one of
          - the exact name of a segment of the pipeline, returned as is;
          - -1, meaning the first parent of seg;
          - a string, used as a regular expression (see _match).

        Return the segment name, or None when nothing matches or expr
        is of another kind.
        """
        if expr in self._parents:
            return expr
        if expr == -1:
            return self._parents[seg][0]
        if isinstance(expr, str):
            return self._match(seg, re.compile(expr))
        return None

    # Manage directories
    def get_curr_dir(self, seg):
        """ Return the segment directory.
        """
        return self._curr_dirs[seg]

    def get_param_file(self, seg):
        """ Return the segment parameter file ('<seg>.args' in the
        segment directory).
        """
        return path.join(self.get_curr_dir(seg), '%s.args' % seg)

    def get_tag_file(self, seg):
        """ Return the segment tag file ('stdout' in the segment
        directory).
        """
        return path.join(self.get_curr_dir(seg), 'stdout')

    def get_data_dir(self, seg, prod=None):
        """ Return the data directory for the segment or a product
        full name.

        seg : string, segment name.
        prod : optional pair; prod[0] is converted to a file name with
          str_file and prod[1] is an integer task identifier.

        Without prod, return '<segment dir>/data'. With prod, return
        '<segment dir>/data/<task id>_<file name>'; when str_file gives
        an empty result the task identifier is left out.
        """
        if prod is not None:
            task_id = prod[1]
            prod = str_file(prod[0])
            if prod:
                prod = "%d_%s" % (task_id, prod)
            return '%s/data/%s' % (self._curr_dirs[seg], prod)
        else:
            return '%s/data' % self._curr_dirs[seg]

    def get_log_dir(self):
        """ Return the pipe log directory.
        """
        return path.join(self._prefix, "log")

    def get_log_file(self, seg):
        """ Return the segment log filename ('<seg>.log' in the
        segment directory).
        """
        return path.join(self.get_curr_dir(seg), '%s.log' % seg)
if __name__ == "__main__":
    # Executed only when the module is run as a script: run the doctests
    # embedded in the docstrings of this module.
    import doctest
    doctest.testmod()