Source code for pipelet.pipeline

## Copyright (C) 2008, 2009, 2010 APC LPNHE CNRS Universite Paris Diderot <lejeune@apc.univ-paris7.fr> <betoule@apc.univ-paris7.fr>
##
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 3 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this program; if not, see http://www.gnu.org/licenses/gpl.html

import os
import os.path as path
import re
from collections import deque
from contextlib import closing

from environment import Environment
from repository import LocalRepository
from utils import get_hashkey, crc32
from utils import str_file, flatten


class PipeException(Exception):
    """ Error raised by Pipeline operations.

    Used, for instance, when input is pushed to a segment that has parents
    or when the dependency graph contains a cycle.
    """
def _str_join(strs): """ Print a node. >>> print(_str_join(('sega -> segb','segc','segd'))) sega -> segb segc segd """ l = [len(max(a.splitlines(), key=len)) for a in strs] if l: lmax = max(l) else: lmax = 0 s = "" for a, l1 in zip(strs, l): s += '\n'.join([' ' * (lmax - l1) + i for i in a.splitlines()]) s += ('\n') return s[:-1] def _str_red(a, b): """ Print a sequence. >>> print(_str_red('segb->segc', 'sega')) sega->segb->segc """ if b: return b + '->' + a else: return a
class Pipeline:
    """ A segment oriented graph.

    A pipeline is a list of code segments with their corresponding
    repository and dependency tree.

    Setting matplotlib to True will turn the matplotlib backend to Agg in
    order to allow the execution in non-interactive environment (this
    class only stores the flag).
    """

    def __init__(self, seg_list, code_dir='./', prefix='./', sqlfile=None,
                 matplotlib=False, matplotlib_interactive=False,
                 env=Environment, permissive=False):
        """ Initialize a pipeline.

        seg_list : either a list of string (the list of segment for a
                   linear pipeline), or a string description of the
                   pipeline using a dot-like syntax.
        code_dir : string, the location of the segment source code. The
                   repository is only created when code_dir is non-empty;
                   segment hashes are computed from it, so a falsy
                   code_dir only works for an empty pipeline.
        prefix : string, indicates where to save the pipeline products
                 (created, with missing parents, if needed).
        sqlfile : string, sql data base (defaults to <prefix>/.sqlstatus).
        matplotlib : boolean, if true, turn the matplotlib backend to Agg
                     in order to allow the execution in non-interactive
                     environment.
        matplotlib_interactive : same thing for interactive workers.
        env : extension of the EnvironmentBase class.
        permissive : if set, no error is raised for segments not found.

        Note: the process umask is set to 002 as a side effect.

        >>> p = Pipeline('')
        >>> print p._curr_dirs, p._parents, p._children
        {} {} {}
        >>> p = Pipeline('a;', code_dir=r'/test/', permissive=True)
        >>> print p._curr_dirs, p._parents, p._children # doctest: +ELLIPSIS
        {'a': '.../test/a_AAAAAAA'} {'a': []} {'a': []}
        """
        if isinstance(seg_list, str):
            self.from_dot(seg_list)
        else:
            self._make_tree(seg_list)

        ## repository object giving access to the segment source code
        self.repository = None
        if code_dir:
            self.repository = LocalRepository(self._seg_list, code_dir,
                                              permissive=permissive)

        ## string, indicates where to save the pipeline products.
        # Clear only the "other" write bit so products stay group-writable.
        os.umask(0o002)
        self._prefix = path.realpath(prefix)
        # makedirs (not mkdir) so that a nested prefix whose parents do not
        # exist yet can still be used.
        if not os.path.exists(prefix):
            os.makedirs(prefix)
        logdir = os.path.join(prefix, "log")
        if not os.path.exists(logdir):
            os.mkdir(logdir)

        ## dict, directory corresponding to each segment
        self._curr_dirs = {}
        ## dict, hash key corresponding to each segment
        self._hashes = {}
        self.compute_hash()

        ## string, sql data base
        if sqlfile is None:
            sqlfile = path.join(prefix, ".sqlstatus")
        self.sqlfile = path.expanduser(path.expandvars(sqlfile))

        ## boolean, if true, turn the matplotlib backend to Agg
        self.matplotlib = matplotlib
        ## boolean, same as matplotlib but for interactive workers
        self.matplotlib_interactive = matplotlib_interactive
        ## string, pipe name.
        self.name = "pipelet"
        ## Environment Base extension
        self.env = env
        ## input dictionary (for orphan segments)
        self._input = {}

    def __str__(self):
        """ Print the segment tree.

        One entry is produced per leaf segment (segment without children).

        >>> print(Pipeline('first->second->fourth\\nthird->fourth',
        ...                code_dir = '../test/', permissive=True)
        ...       ) #doctest: +NORMALIZE_WHITESPACE
        first -> second ->
                  third -> fourth ->
        """
        return ''.join(self.strseg(k)
                       for k, v in self._children.iteritems() if not v)

    def to_dot(self, filename=None):
        """ Return a string representation of the graph.

        Written using a subsample of the graphviz dot syntax. If filename
        is given, the string is also written to that file.

        >>> p = Pipeline("")
        >>> p.from_dot('''a->b->c;
        ... d->c;''')
        >>> print p.to_dot()
        digraph pipelet {
        b -> c;
        d -> c;
        a -> b;
        }
        """
        lines = ["digraph %s {" % self.name]
        for k, v in self._parents.iteritems():
            for p in v:
                lines.append('%s -> %s;' % (p, k))
        lines.append('}')
        s = '\n'.join(lines)
        if filename:
            with open(filename, 'w') as f:
                f.write(s)
        return s

    def from_dot(self, s):
        """ Parse a subset of the graphviz dot language.

        Edges are written 'a->b' and may be chained ('a->b->c'). Any word
        directly followed by ';' declares a node, which allows isolated
        segments, alone or mixed with edges ('a->b; c;').

        >>> p = Pipeline('')
        >>> p.from_dot('a;')
        >>> print p._parents == {'a':[]}, p._children == {'a':[]}
        True True
        >>> p.from_dot('''a->b;
        ... c->b;
        ... c->d;''')
        >>> print p._children
        {'a': ['b'], 'c': ['b', 'd'], 'b': [], 'd': []}
        >>> print p._parents
        {'a': [], 'c': [], 'b': ['a', 'c'], 'd': ['c']}
        >>> p.from_dot('a->b; c;')
        >>> print sorted(p._parents)
        ['a', 'b', 'c']
        """
        edge_re = re.compile(r'(\w+)\s*->\s*(\w+)')
        node_re = re.compile(r'(\w+)\s*;')
        edges = []
        a = edge_re.search(s)
        while a:
            edges.append(a.groups())
            # Restart the search at this edge's arrow so that its target can
            # also be the source of the next edge in a chain (a->b->c).
            a = edge_re.search(s, s.find('->', a.start()))
        segs = set(flatten(edges))
        # Node declarations; previously these were ignored as soon as the
        # string contained one edge, which dropped isolated segments.
        segs.update(node_re.findall(s))
        self._seg_list = segs
        self._parents = dict((seg, []) for seg in segs)
        self._children = dict((seg, []) for seg in segs)
        for src, dst in edges:
            self._parents[dst].append(src)
            self._children[src].append(dst)

    def strseg(self, seg):
        """ Convert a segment (and its ancestors) to a string.

        >>> p = Pipeline('a->b', permissive=True)
        >>> print p.strseg('a')
        a ->
        >>> print p.strseg('b')
        a -> b ->
        """
        s = _str_join([self.strseg(k) for k in self._parents[seg]])
        return s + seg + " -> "

    def connect(self, seg1, p, seg2):
        """ Connect two pipes.

        The graph of p is merged into this pipeline and seg1 becomes parent
        of seg2.

        seg1: string, segment name
        p: Pipeline object, or list of segment names describing a linear
           pipeline
        seg2: string, segment name

        Note: only the parent/child graph is updated; the segment list,
        repository and hashes are left untouched.
        """
        if isinstance(p, Pipeline):
            # Copy the lists: sharing them with p would make the append
            # below (and any later change) silently modify p's graph too.
            parents = dict((k, list(v)) for k, v in p._parents.iteritems())
            children = dict((k, list(v)) for k, v in p._children.iteritems())
        else:
            children = dict(zip(p, [[s] for s in p[1:]] + [[]]))
            parents = dict(zip(p, [[]] + [[s] for s in p[:-1]]))
        self._parents.update(parents)
        self._children.update(children)
        self._parents[seg2].append(seg1)
        self._children[seg1].append(seg2)

    def _make_tree(self, seg_list):
        """ Convert a segment list into a linear segment tree.

        Each segment is the single parent of the next one.

        >>> a = Pipeline('')
        >>> a._make_tree(['first','second'])
        >>> print(a._parents)
        {'second': ['first'], 'first': []}
        >>> print(a._children)
        {'second': [], 'first': ['second']}
        """
        self._seg_list = seg_list
        self._children = dict(zip(seg_list,
                                  [[s] for s in seg_list[1:]] + [[]]))
        self._parents = dict(zip(seg_list,
                                 [[]] + [[s] for s in seg_list[:-1]]))

    def get_parents(self, seg, nophantom=False):
        """ Return a list of parent segments.

        When seg has no parent and nophantom is False, the single-element
        list [seg + "phantom"] is returned instead of an empty list.
        Otherwise the internal list itself is returned (not a copy).

        >>> a = Pipeline('''first->second->fourth
        ... third->fourth''',
        ...        permissive=True)
        >>> print(a.get_parents('fourth'))
        ['second', 'third']
        """
        if not nophantom and not self._parents[seg]:
            return [seg + "phantom"]
        return self._parents[seg]

    def get_childrens(self, seg):
        """ Return a list of child segments. """
        return self._children[seg]

    def push(self, **args):
        """ Add input to orphan segments.

        Each keyword maps a segment name to its input list.

        Raises PipeException if the segment has parents, TypeError if the
        input is not a list.
        """
        for k, v in args.iteritems():
            if self._parents[k]:
                raise PipeException("Segment %s not an orphan segment" % k)
            if not isinstance(v, list):
                raise TypeError("Segment input should be a list object")
            self._input[k] = v

    def flatten(self):
        """ Return a generator delivering segments in execution order.

        Perform a topological sort of the dependency graph. For this to
        succeed the graph MUST be acyclic; otherwise PipeException is raised
        once every reachable segment has been yielded.

        Iterative algorithm from Kahn (1962)
        http://en.wikipedia.org/wiki/Topological_sorting

        >>> T = Pipeline('''first->second->fourth;
        ... third->fourth;''', permissive=True)
        >>> for e in T.flatten():
        ...     print e
        third
        first
        second
        fourth
        """
        # Snapshot the graph: edges are consumed during the sort and the
        # pipeline's own graph must stay intact.
        parents = dict((k, list(v)) for k, v in self._parents.iteritems())
        children = dict((k, list(v)) for k, v in self._children.iteritems())
        ready = deque(s for s, p in self._parents.iteritems() if not p)
        while ready:
            s = ready.popleft()
            yield s
            for c in children[s]:
                parents[c].remove(s)
                if not parents[c]:
                    ready.append(c)
        for s, p in parents.iteritems():
            if p:
                raise PipeException("Dependency graph has "
                                    "at least one cyle including "
                                    "segment %s" % s)

    def _compute_hash(self, seg, memo=None):
        """ Compute the hashkey for a given segment.

        The hash function is fed with the parent hashes (in parent order)
        and with the string returned by the repository for the segment.
        The segment directory is nested under the directory of its last
        parent (or under the prefix for root segments).

        memo: optional dict shared during one compute_hash pass, so that an
              ancestor shared by several segments is hashed only once.

        Return a (hashkey, directory) pair; both are also stored in
        self._hashes and self._curr_dirs.
        """
        if memo is not None and seg in memo:
            return memo[seg]
        hash_obj = crc32()
        currdir = self._prefix
        for k in self._parents[seg]:
            h, currdir = self._compute_hash(k, memo)
            hash_obj.update(h)
        s = self.repository.get_all_string(seg)
        h = get_hashkey(s, hash_obj).base32_digest()
        self._hashes[seg] = h
        currdir = path.join(currdir, '%s_%s' % (seg, h))
        self._curr_dirs[seg] = currdir
        if memo is not None:
            memo[seg] = (h, currdir)
        return h, currdir

    def compute_hash(self):
        """ Compute the hashkey for all segments of the tree once and for all.

        Hashing starts from every leaf segment and recurses to the roots.
        """
        memo = {}
        for k, v in self._children.iteritems():
            if not v:
                self._compute_hash(k, memo)

    def get_full_seg_name(self, seg):
        """ Return segment full name (segment name + hashkey). """
        return '%s_%s' % (seg, self._hashes[seg])

    def _match(self, seg, regexp):
        """ Find the first segment which name matches regexp.

        seg is tested first, then its ancestors depth-first in parent order.
        regexp is a compiled pattern; its match() anchors at the start of
        the name. Return None if nothing matches.
        """
        if regexp.match(seg):
            return seg
        for k in self._parents[seg]:
            s = self._match(k, regexp)
            if s is not None:
                return s
        return None

    def find_seg(self, seg, expr):
        """ Find segment which name matches expr.

        - expr is a segment name of the graph: expr is returned;
        - expr == -1: the first parent of seg is returned;
        - expr is another string: it is compiled as a regular expression and
          matched against seg and its ancestors (see _match).
        Return None otherwise.
        """
        if expr in self._parents:
            return expr
        if expr == -1:
            return self._parents[seg][0]
        if isinstance(expr, str):
            return self._match(seg, re.compile(expr))
        return None

    # Manage directories

    def get_curr_dir(self, seg):
        """ Return the segment directory. """
        return self._curr_dirs[seg]

    def get_param_file(self, seg):
        """ Return the segment parameter filename (<curr_dir>/<seg>.args). """
        return path.join(self.get_curr_dir(seg), '%s.args' % seg)

    def get_tag_file(self, seg):
        """ Return the segment tag filename (<curr_dir>/stdout). """
        return path.join(self.get_curr_dir(seg), 'stdout')

    def get_data_dir(self, seg, prod=None):
        """ Return the data directory for the segment or a product full name.

        prod, when given, is indexable: prod[0] is passed to str_file and
        prod[1] is an integer task id used as prefix of the product name.
        """
        if prod is None:
            return '%s/data' % self._curr_dirs[seg]
        task_id = prod[1]
        prod = str_file(prod[0])
        if prod:
            prod = "%d_%s" % (task_id, prod)
        return '%s/data/%s' % (self._curr_dirs[seg], prod)

    def get_log_dir(self):
        """ Return the pipe log directory. """
        return path.join(self._prefix, "log")

    def get_log_file(self, seg):
        """ Return the segment log filename. """
        return path.join(self.get_curr_dir(seg), '%s.log' % seg)

    def get_meta_file(self, seg, prod=-1):
        """ Return the meta data filename.

        This routine is called for segment meta data storage and task meta
        data storage. In the first case (prod == -1), meta data are stored
        in the segment curr_dir. In the second case, meta data are stored in
        the task directory (prod may be None, which gives the segment data
        directory).
        """
        if prod == -1:
            dirname = self.get_curr_dir(seg)
        else:
            dirname = self.get_data_dir(seg, prod)
        return path.join(dirname, '%s.meta' % seg)
if __name__ == "__main__":
    # Executing the module directly runs the doctests embedded above.
    from doctest import testmod
    testmod()

Table Of Contents