## Copyright (C) 2008, 2009, 2010 APC LPNHE CNRS Universite Paris Diderot <lejeune@apc.univ-paris7.fr> <betoule@apc.univ-paris7.fr>
##
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 3 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this program; if not, see http://www.gnu.org/licenses/gpl.html
from glob import glob
from os import path
import logging
from directive import Depend
import subprocess
logger = logging.getLogger('scheduler')
def get_git_revision(path):
    """ Return the Git revision of a directory, if any.

    Runs ``git rev-parse HEAD`` with ``path`` as working directory and
    returns whatever the command wrote on its standard output (the raw,
    unstripped output).  This is best effort: when the command cannot
    be started (git not installed, ``path`` missing, ...) an empty
    string is returned instead of raising.

    Parameters
    ----------
    path : string, directory in which the git command is run.  Note
      that inside this function the name shadows the module-level
      ``os.path`` import.

    Returns
    -------
    The standard output of the command, or "" on failure to launch it.
    """
    rev = ""
    try:
        proc = subprocess.Popen(["git", "rev-parse", "HEAD"],
                                stdout=subprocess.PIPE, cwd=path)
        rev = proc.communicate()[0]
    except (OSError, ValueError):
        # Only launch failures are ignored; KeyboardInterrupt,
        # SystemExit and programming errors are no longer swallowed.
        pass
    return rev
class RepositoryError(Exception):
    """ Error raised when a repository cannot provide a source file.

    A dedicated type lets callers catch repository failures without
    catching Exception.  The original value is available as the
    ``parameter`` attribute.
    """
    def __init__(self, value):
        # Forward the value to the base class so that ``args`` is
        # populated (needed for pickling and generic handlers).
        Exception.__init__(self, value)
        self.parameter = value

    def __str__(self):
        """ Return the repr of the stored value. """
        return repr(self.parameter)
class Repository:
    """ Location of the segment's source code.

    A repository is the location where to find the segment's source
    code files. It returns the full path of all files that will be
    used to execute the segment (code, hook, ...). By default,
    local repositories are used, but it can also be a version control
    system (git, CVS, ...).

    Subclasses provide get_fns() (candidate file names) and
    get_code_source(filename) (content of one candidate); the latter
    is called by _fill_dict but is not defined on this base class.
    """

    def __init__(self, lstseg, permissive=False):
        """ Initialize a repository.

        If permissive is False raise an error when segment script is
        not found. Otherwise default to an empty script

        >>> r = Repository([], permissive=True)
        """
        ## dict, (source, filename) corresponding to each segment
        self._code = {}
        ## dict, {hook: (source, filename)} corresponding to each segment
        self._hook = {}
        ## dict, {filename: content} of external dependencies per segment
        self._deps = {}
        ## dict, concatenation of all the above sources per segment
        self._all_string = {}
        ## bool, read by _fill_dict; it has to be set before filling
        self.permissive = permissive
        self._fill_dict(lstseg)

    @staticmethod
    def _match_fn(fns, seg, hook=None):
        """ Find filename matching pipelet rules for a given segment.

        Candidate base names are built from the prefixes of the
        segment name (and of the hook name when given), longest prefix
        first.  The first file of fns whose base name equals a
        candidate is returned.

        Raises RepositoryError when no file matches.

        >>> print(Repository._match_fn( ['first.py'], 'first111'))
        first.py
        """
        seg_prefixes = [seg[:end] for end in range(len(seg), 0, -1)]
        if hook is None:
            candidates = [prefix + '.py' for prefix in seg_prefixes]
        else:
            candidates = [prefix + '_' + hook[:end] + '.py'
                          for prefix in seg_prefixes
                          for end in range(len(hook), 0, -1)]
        for name in candidates:
            # Explicit loop rather than len(filter(...)): filter() is
            # lazy on recent Python versions and has no length.
            for fn in fns:
                if path.basename(fn) == name:
                    return fn
        raise RepositoryError('No source file corresponding to segment '
                              '%s and hook %s' % (seg, hook))

    def _fill_dict(self, lstseg):
        """ Initialize all dictionaries.

        For each segment of lstseg, load the segment code, the code of
        the hooks it calls and the content of its external
        dependencies.

        Raises RepositoryError when a segment has no source file and
        the repository is not permissive.  Missing hook files are only
        logged.
        """
        fns = self.get_fns()
        for s in lstseg:
            ## code
            try:
                fn = self._match_fn(fns, s)
            except RepositoryError as e:
                logger.warning(str(e))
                if not self.permissive:
                    # bare raise keeps the original traceback
                    raise
                fn = ""
            if fn:
                source = self.get_code_source(fn)
            else:
                source = ""
                logger.warning("Assuming empty script for segment %s", s)
            self._code[s] = (source, fn)
            self._all_string[s] = source
            ## hooks
            self._hook[s] = {}
            for h in self._get_hook_list(source):
                try:
                    hook_fn = self._match_fn(fns, s, h)
                    hook_source = self.get_code_source(hook_fn)
                except RepositoryError as e:  # No source file
                    logger.warning(str(e))
                    hook_source = ""
                    hook_fn = ""
                self._hook[s][h] = (hook_source, hook_fn)
                self._all_string[s] += hook_source
            ## deps
            # The Depend directive is parsed from the segment and hook
            # code gathered so far.
            depend = self.get_directive(Depend, s)
            self._deps[s] = {}
            for dep_fn in depend.deps:
                content = self.get_file_content(dep_fn)
                self._deps[s][dep_fn] = content
                self._all_string[s] += content

    def get_all_string(self, seg):
        """ Return all code as a string.
        """
        return self._all_string[seg]

    def get_directive(self, Direct, seg):
        """ Initialize a directive object from segment seg.

        A Direct instance is created and fed, through its parse
        method, with every line of the segment code starting with
        '#'.  StopIteration raised by parse is ignored.

        >>> r = LocalRepository(['fourth'], "./test/first_test")
        >>> print(r.get_directive (Multiplex, 'fourth').__class__)
        pipelet.directive.Multiplex
        """
        d = Direct()
        for line in self._all_string[seg].splitlines():
            if not line.startswith("#"):
                continue
            try:
                d.parse(line)
            except StopIteration:
                pass
        return d

    @staticmethod
    def _get_hook_list(code):
        r""" Return the list of hooks by parsing the code source.

        Every occurrence of "hook(" is taken as a hook call; the hook
        name is the text up to the first comma, without its first and
        last characters (the quotes).

        >>> print(Repository._get_hook_list ("a=1\nhook('thehook',globals())\n"))
        ['thehook']
        """
        return [chunk.split(",")[0][1:-1]
                for chunk in code.split("hook(")[1:]]

    def get_hook_list(self, seg):
        """ Return the list of hooks for a segment
        """
        return list(self._hook[seg])

    def get_code_string(self, seg):
        """ Return the segment code as a (source, filename) tuple.

        This is done to prevent file changes after hashkey computation.
        """
        return self._code[seg]

    def get_deps_string(self, seg):
        """ Return external dependencies as a {filename: content} dict.

        This is done to prevent file changes after hashkey computation.
        """
        return self._deps[seg]

    def get_hook_string(self, seg, hook):
        """ Return the hook code as a (source, filename) tuple.

        This is done to prevent file changes after hashkey computation.
        """
        return self._hook[seg][hook]

    def get_docline(self, seg):
        """ Return the segment synopsis doc line.

        The synopsis is extracted from the source string stored for
        the segment (self._code holds (source, filename) tuples, not
        code objects), so the file is not read again.
        """
        import pydoc
        try:
            from StringIO import StringIO  # Python 2: accepts str
        except ImportError:
            from io import StringIO
        source = self._code[seg][0]
        return pydoc.source_synopsis(StringIO(source))

    @staticmethod
    def get_file_content(filename):
        """ Read content of a given file

        This is the default to read dependencies of a segment.
        User home and environment variables are expanded in filename.

        >>> print(Repository.get_file_content ('./test/first_test/third.py')[0:17])
        adj = 'the_brave'
        """
        # 'with' closes the file even when read() fails
        with open(path.expandvars(path.expanduser(filename)), "r") as fid:
            return fid.read()

    def get_fns(self):
        """ Return filename candidates.

        The base class has none; subclasses override this method.
        """
        return []
class LocalRepository(Repository):
    """A local repository is made of a main directory where to find segment's
    code file.
    """

    def __init__(self, lst_seg, src_path, permissive=False):
        """ Initialize a local repository.

        Parameters
        ----------
        lst_seg : list of segment names to load.
        src_path : string, directory holding the segment files; user
          home and environment variables are expanded.
        permissive : bool, when True a segment without source file
          defaults to an empty script instead of raising.

        >>> r = LocalRepository(['first','second'], "./test/first_test", permissive=True)
        >>> print(len(r._code))
        2
        """
        ## dict, code string corresponding to each segment
        self._code = {}
        ## dict, code string corresponding to each hook
        self._hook = {}
        ## dict, code string corresponding to each external dependency
        self._deps = {}
        ## dict, code string
        self._all_string = {}
        self.permissive = permissive
        ## string, where to find segment's source code file
        self.src_path = path.expanduser(path.expandvars(src_path))
        ## output of 'git rev-parse HEAD' in src_path, if any
        self._rev = get_git_revision(self.src_path)
        # src_path and permissive have to be set before filling the
        # dictionaries: get_fns() and _fill_dict() read them.
        self._fill_dict(lst_seg)

    @staticmethod
    def get_code_source(filename):
        """ Read source code from file

        >>> print(LocalRepository.get_code_source ('./test/first_test/third.py')[0:17])
        adj = 'the_brave'
        """
        # 'with' closes the file even when read() fails
        with open(filename, "r") as fid:
            code = fid.read()
        # Disabled: prefixing the code with the git revision.
        # if len(self._rev)>0:
        #    code = "### %s: %s\n%s"%(path.basename(filename), self._rev , code)
        return code

    def get_fns(self):
        """ Return filename candidates.

        These are all the '*.py' files found directly in src_path.

        >>> r = LocalRepository(['first','second'], "./test/first_test", permissive=True)
        >>> print(r.get_fns())
        ['./test/first_test/second.py', './test/first_test/fourth.py', './test/first_test/third.py', './test/first_test/main.py', './test/first_test/first.py']
        """
        return glob(path.join(self.src_path, '*.py'))
class GitRepository(LocalRepository):
    """ A Git repository.

    Placeholder: all the methods below are empty stubs.
    """

    def __init__(self):
        """ Do nothing; the parent initialisation is not called. """

    def get_code_source(self, filename):
        """ Stub: ignore filename and return None. """
        return None

    def get_fns(self):
        """ Stub: return None. """
        return None
class CVSRepository(Repository):
    """ A CVS repository.

    Placeholder: all the methods below are empty stubs.
    """

    def __init__(self):
        """ Do nothing; the parent initialisation is not called. """

    def get_code_source(self, filename):
        """ Stub: ignore filename and return None. """
        return None

    def get_fns(self):
        """ Stub: return None. """
        return None
if __name__ == "__main__":
    # Executed as a script: run the doctests embedded in the docstrings.
    from doctest import testmod
    testmod()