Source code for pipelet.repository

## Copyright (C) 2008, 2009, 2010 APC LPNHE CNRS Universite Paris Diderot <lejeune@apc.univ-paris7.fr>  <betoule@apc.univ-paris7.fr>
##
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 3 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this program; if not, see http://www.gnu.org/licenses/gpl.html

from glob import glob
from os import path
import logging
from directive import Depend
import subprocess

logger = logging.getLogger('scheduler')


def get_git_revision(path):
    """ Return Git revision if any.
    """
    rev = ""
    try:
        P = subprocess.Popen(["git", "rev-parse", "HEAD"],
                             stdout=subprocess.PIPE, cwd=path)
        (rev, jk) = P.communicate()
    except:
        pass
    return rev


class RepositoryError(Exception):
    """ To avoid catching Exception
    """
    def __init__(self, value):
        self.parameter = value

    def __str__(self):
        return repr(self.parameter)


[docs]class Repository: """ Location of the segment's source code. A reposiroty is the location where to find the segment's source code files. It returns the full path of all files that will be used to execute the segment (code, hook, ...). By default, local repositories are used, but it can also be a version control system (git, CVS, ...). """ def __init__(self, lstseg, permissive=False): """ Initialize a repository. If permissive is False raise an error when segment script is not found. Otherwise default to an empty script >>> r = Repository([], permissive=True) """ ## dict, code string corresponding to each segment self._code = {} ## dict, code string corresponding to each hook self._hook = {} ## dict, code string corresponding to each external dependency self._deps = {} ## dict, code string self._all_string = {} self._fill_dict(lstseg) @staticmethod def _match_fn(fns, seg, hook=None): """ Find filename matching pipelet rules for a given segment. >>> print Repository._match_fn( ['first.py'], 'first111') first.py """ if hook is None: for s in range(len(seg)): def match(x): return path.basename(x) == seg[0:len(seg) - s] + '.py' fn = filter(match, fns) if len(fn) > 0: return fn[0] else: for s in range(len(seg)): for h in range(len(hook)): def match(x): return path.basename(x) == seg[0:len(seg) - s]\ + '_' + hook[0:len(hook) - h] + '.py' fn = filter(match, fns) if len(fn) > 0: return fn[0] raise RepositoryError('No source file corresponding to segment ' '%s and hook %s' % (seg, hook)) def _fill_dict(self, lstseg): """ Initialize all dictionnaries. """ fns = self.get_fns() for s in lstseg: ## code try: fn = self._match_fn(fns, s) except RepositoryError, e: logger.warning(str(e)) if not self.permissive: raise e else: fn = "" if not fn: source = "" logger.warning("Assuming empty script for segment %s" % s) else: source = self.get_code_source(fn) self._code[s] = (source, fn) # compile(source, fn, "exec") self._all_string[s] = source ## hooks lsthook = self._get_hook_list(source) self._hook[s] = {} for h in lsthook: try: fn = self._match_fn(fns, s, h) source = self.get_code_source(fn) except RepositoryError, e: # No source file logger.warning(str(e)) source = "" fn = "" self._hook[s][h] = (source, fn) # compile(source, fn, "exec") self._all_string[s] += source ## deps depend = self.get_directive(Depend, s) self._deps[s] = {} for fn in depend.deps: source = self.get_file_content(fn) self._deps[s][fn] = source self._all_string[s] += source
[docs] def get_all_string(self, seg): """ Return all code as a string. """ return self._all_string[seg]
[docs] def get_directive(self, Direct, seg): """ Initialize a directive object from segment seg. >>> r = LocalRepository(['fourth'], "./test/first_test") >>> print r.get_directive (Multiplex, 'fourth').__class__ pipelet.directive.Multiplex """ c = self._all_string[seg] d = Direct() for l in c.splitlines(): try: if l[0] != "#": continue except IndexError: continue try: d.parse(l) except StopIteration: pass return d
@staticmethod def _get_hook_list(code): """ Return the list of hooks by parsing the code source. >>> print Repository._get_hook_list ("a=1\nhook('thehook',globals())\n") ['thehook'] """ hook_list = [] for l in code.split("hook(")[1:]: hook_list.append(l.split(",")[0][1:-1]) return hook_list
[docs] def get_hook_list(self, seg): """ Return the list of hooks for a segment """ return self._hook[seg].keys()
[docs] def get_code_string(self, seg): """ Return segment code as a string. This is done to prevent file changes after hashkey computation. """ return self._code[seg]
[docs] def get_deps_string(self, seg): """ Return external dependency code as a string. This is done to prevent file changes after hashkey computation. """ return self._deps[seg]
[docs] def get_hook_string(self, seg, hook): """ Return hook code as a string. This is done to prevent file changes after hashkey computation. """ return self._hook[seg][hook]
[docs] def get_docline(self, seg): """ Return the segment synopsis doc line. """ import pydoc return pydoc.source_synopsis(self._code[seg].co_filename)
@staticmethod
[docs] def get_file_content(filename): """ Read content of a given file This is the default to read dependencies of a segment. >>> print Repository.get_file_content ('./test/first_test/third.py')[0:17] adj = 'the_brave' """ fid = open(path.expandvars(path.expanduser(filename)), "r") code = fid.read() fid.close() return code
[docs] def get_fns(self): """ Return filename candidates. """ return []
[docs]class LocalRepository(Repository): """A local repository is made of a main directory where to find segment's code file. """ def __init__(self, lst_seg, src_path, permissive=False): """ Initialize a local repository. >>> r = LocalRepository(['first','second'], "./test/first_test", permissive=True) >>> print len(r._code) 2 """ ## dict, code string corresponding to each segment self._code = {} self._hook = {} self._deps = {} self._all_string = {} self.permissive = permissive ## string, where to find segment's source code file self.src_path = path.expanduser(path.expandvars(src_path)) self._rev = get_git_revision(self.src_path) self._fill_dict(lst_seg) @staticmethod
[docs] def get_code_source(filename): """ Read source code from file >>> print LocalRepository.get_code_source ('./test/first_test/third.py')[0:17] adj = 'the_brave' """ fid = open(filename, "r") code = fid.read() fid.close() # if len(self._rev)>0: # code = "### %s: %s\n%s"%(path.basename(filename), self._rev , code) return code
[docs] def get_fns(self): """ Return filename candidates. >>> r = LocalRepository(['first','second'], "./test/first_test", permissive=True) >>> print r.get_fns() ['./test/first_test/second.py', './test/first_test/fourth.py', './test/first_test/third.py', './test/first_test/main.py', './test/first_test/first.py'] """ return glob(path.join(self.src_path, '*.py'))
class GitRepository(LocalRepository): """ A Git repository. """ def __init__(self): pass def get_code_source(self, filename): pass def get_fns(self): pass class CVSRepository(Repository): """ A CVS repository. """ def __init__(self): pass def get_code_source(self, filename): pass def get_fns(self): pass if __name__ == "__main__": import doctest doctest.testmod()

Table Of Contents