Source code for pipelet.scheduler

## Copyright (C) 2008, 2009, 2010 APC LPNHE CNRS Universite Paris Diderot <lejeune@apc.univ-paris7.fr>  <betoule@apc.univ-paris7.fr>
## 
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 3 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this program; if not, see http://www.gnu.org/licenses/gpl.html

import os
import os.path as path
import task
from task import TaskList
from task import Task
import Queue
import tracker
import shutil
import threading
import logging
from contextlib import closing
from directive import Multiplex
import pickle
from pipelet.utils import close_logger, spl
import multiprocessing
import multiprocessing.managers
import re

class NullHandler(logging.Handler):
    """ Extension of the logging handler class.
    """
    def emit(self, record):
        """ Avoid warnings.
        """
        pass

logger = logging.getLogger("scheduler")
h = NullHandler()
logger.addHandler(h)

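# The NullHandler above only silences the "no handlers could be found" warning
# when the library is used without logging configured. A hedged example of how a
# driver script could attach a real handler (standard logging API, nothing
# pipelet-specific):
#
#   import logging
#   log = logging.getLogger("scheduler")
#   log.addHandler(logging.StreamHandler())
#   log.setLevel(logging.DEBUG)
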
class Scheduler():
    """ Feed tasks to an execution queue.

    A scheduler is made of an execution queue which is fed with tasks.
    The scheduler coordinates the execution of the pipeline, building
    tasks for each segment and product.
    """

    def __init__(self, pipe):
        """ Initialize a scheduler.

        A Tracker instance is attached to the scheduler when it is run.

        Parameters
        ----------
        pipe : a Pipeline instance.
        """
        ## pipeline object
        self.pipe = pipe
        ## task queue
        self.task_queue = Queue.Queue(0)
        ## list of tasks
        self.products_list = TaskList()
        ## number of successful tasks
        self.nb_success = 0
        ## lock on success
        self.success_lock = threading.Lock()
        ## lock on worker
        self.nb_worker_lock = threading.RLock()
        ## number of workers
        self.nb_worker = 0
        ## stopping event
        self.stop_event = threading.Event()
        ## stopping occurrence value
        self.stopping = False
        ## aborting occurrence value
        self.abort = False
        ## tracker object
        self.tracker = None

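    # A minimal usage sketch (illustrative, not part of the original module):
    # how a driver script might hand a pipeline to a scheduler. The Pipeline
    # import path and constructor arguments are assumptions; only the Scheduler
    # calls are taken from this class.
    #
    #   from pipelet.pipeline import Pipeline        # assumed location
    #   from pipelet.scheduler import Scheduler
    #
    #   pipe = Pipeline("first -> second;", code_dir='./', prefix='./data')  # hypothetical arguments
    #   s = Scheduler(pipe)
    #   s.run()   # blocks until all segments are processed or the run is aborted
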
    def get_pipe(self):
        """ Return the pipe object bound to the scheduler.
        """
        logger.info("a worker is trying to recover pipe")
        return self.pipe

    def check_in(self):
        """ Register a worker.

        A worker calls this method to check in.
        """
        with self.nb_worker_lock:
            if not self.stopping:
                self.nb_worker = self.nb_worker + 1
                res = True
                logger.info('%d workers registered' % self.nb_worker)
            else:
                logger.warning('Rejecting check-in as the scheduler is stopping')
                res = False
        return res

    def check_out(self):
        """ Unregister a worker.

        A worker calls this method to check out.
        """
        with self.nb_worker_lock:
            self.nb_worker = self.nb_worker - 1
            logger.info('%d workers registered' % self.nb_worker)
            if self.nb_worker == 0:
                self.stop_event.set()
                self.abort_scheduling(None)

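    # The two methods above define the worker registration protocol: a worker
    # checks in before consuming tasks and checks out when it exits; when the
    # last worker checks out, the stop event is set and scheduling is aborted.
    # A hedged sketch of the worker side (the worker class lives elsewhere and
    # is not part of this module):
    #
    #   if scheduler.check_in():            # refused while the scheduler is stopping
    #       t = scheduler.get_task()
    #       while t is not None:            # None is the stop sentinel pushed by run()
    #           ...                         # execute the task
    #           scheduler.task_done(t)      # or scheduler.task_failed(t) on error
    #           t = scheduler.get_task()
    #       scheduler.check_out()
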
    def put_task(self, t):
        """ Add a task to the queue.

        There is one task for each input; the inputs are the products of
        the parent segments.
        """
        t = self.tracker.add_queued(t)
        if not self.abort:
            self.task_queue.put(t)

    def get_task(self):
        """ Return the next task of the queue.
        """
        t = self.task_queue.get()
        if t is not None:
            self.tracker.update_status(t, 'running')
        return t

    def task_done(self, t):
        """ Tell the tracker when a task has been successfully completed.

        The product list is updated with the task outputs. If the completed
        task is the last of the segment task list, the next segment is
        pushed to the queue.
        """
        with self.success_lock:
            self.nb_success = self.nb_success + 1
            if self.nb_success == 1:
                self.tracker.update_param(t)  ## first successful task registers the segment tag
                self.store_meta_seg(t)
        self.tracker.update_status(t, 'done')
        self.products_list.push(t)
        self.task_queue.task_done()

    def store_meta_seg(self, t):
        """ Store meta information for a segment.

        Store meta info at the segment level on disk (to rebuild the db).
        """
        ## parents
        parents = self.pipe.get_parents(t.seg, nophantom=True)
        lst_dir = []
        for e in parents:
            lst_dir.append(self.pipe.get_curr_dir(e))
        fn = self.pipe.get_meta_file(t.seg)
        try:
            with closing(file(fn, 'r')) as f:
                d = pickle.load(f)
        except:
            d = {}
        d['parents'] = lst_dir
        try:
            d['param'] = t.tag
        except:
            logger.warning("no tag saved for seg %s" % t.seg)
        with closing(file(fn, 'w')) as f:
            r = pickle.dump(d, f)
        ## other stuff
        try:
            d = t.args
            fn = self.pipe.get_param_file(t.seg)
            if not path.exists(fn):
                with closing(file(fn, 'w')) as f:
                    r = pickle.dump(d, f)
        except:
            logger.warning("no param saved for seg %s" % t.seg)
        if t.version:
            d = t.version
            fn = path.join(self.pipe.get_curr_dir(t.seg),
                           path.basename(self.pipe.repository.get_code_string(t.seg)[1]))
            s = "### module version informations: \n"
            for k, v in d.items():
                s += "### '" + str(k) + "' : " + str(v) + '\n'
            with closing(file(fn, 'r+')) as f:
                c = f.read()
                c = s + c
                f.seek(0)
                f.write(c)
        else:
            logger.warning("no version info saved for seg %s" % t.seg)

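    # For reference, the meta file written above holds a pickled dictionary of
    # the form sketched below (keys as used in this method; values illustrative):
    #
    #   {'parents': ['/path/to/parent_segment_curr_dir', ...],  # current dirs of non-phantom parents
    #    'param': t.tag}                                         # tag of the first successful task
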
    def push_next_seg(self, seg):
        """ Push the segment task list to the queue.

        The segment repository is created at this stage, where a copy of
        the code, visu and argument files is saved. The parent segment
        outputs are combined to form the task list.
        """
        ## some cleaning in the data base first
        self.tracker.clean(seg)
        logger.info('Pushing segment %s' % seg)
        os.umask(002)  # 022 TODO this is a problem for group-shared pipelines
        ######################################
        # This belongs to a dedicated function
        #######################################
        d = self.pipe.get_curr_dir(seg)
        if not path.exists(d):
            os.mkdir(d)
        r = self.pipe.repository
        f = r.get_code_string(seg)
        if f:
            dest = os.path.join(d, os.path.basename(f[1]))
            fid = open(dest, "w")
            fid.write(f[0])
            fid.close()
        lst_hook = r.get_hook_list(seg)
        for h in lst_hook:
            f = r.get_hook_string(seg, h)
            if f[0]:
                dest = os.path.join(d, os.path.basename(f[1]))
                fid = open(dest, "w")
                fid.write(f[0])
                fid.close()
        for dk, dv in r.get_deps_string(seg).items():
            if dk:
                dest = d + '/' + os.path.basename(dk)
                fid = open(dest, "w")
                fid.write(dv)
                fid.close()
        d = self.pipe.get_data_dir(seg)
        if not path.exists(d):
            os.mkdir(d)
        ############################################
        parents = self.pipe.get_parents(seg)  ## parent segments
        d = self.tracker.get_done(seg)  ## done tasks
        failed = self.tracker.get_failed(seg)  # failed tasks
        #reg = re.compile('/(\d*)(_[^/]*)$')
        #dprod = [reg.sub(r'/0\2', t.str_input) for t in d]  ## done products
        dprod = dict([(spl(t.str_input), t) for t in d])
        #failed_prod = [reg.sub(r'/0\2', t.str_input) for t in failed]  # failed products
        failed_prod = dict([(spl(t.str_input), t) for t in failed])
        dstrp = dict([(str(sorted(d[i].parents)), i) for i in range(len(d))])  ## parents of done tasks
        fstrp = dict([(str(sorted(failed[i].parents)), i) for i in range(len(failed))])  ## parents of failed tasks
        logger.debug('Found %d done tasks for segment %s' % (len(d), seg))
        logger.debug('Found %d failed tasks for segment %s' % (len(failed), seg))
        ## task list to queue
        l = self.products_list.multiplex(seg, parents,
                                         self.pipe.repository.get_directive(Multiplex, seg))
        # Convention: tasks with no input and not already done are pushed once with None as input
        if not l and not d:
            l = [Task(seg)]
        # Updating the product list for tasks already done
        for t in dprod:
            self.products_list.push(dprod[t])
            self.nb_success = self.nb_success + 1
        # Looking for new tasks to get done
        logger.info('Found %d tasks in seg %s to get done' % (len(l), seg))
        with self.tracker.conn_lock:
            with self.tracker.conn:
                for t in l:  # for each task of the task list
                    t.str_input = self.pipe.get_data_dir(t.seg, prod=(t.task_input, 0))
                    if (t.str_input in failed_prod):  # done but failed
                        continue
                    if (not (t.str_input in dprod)):  # not done wrt task_input
                        self.put_task(t)
                    else:
                        # found a task with a matching task_input; checking whether parents also match
                        if len(t.parents) == 0:  # orphan task -> product gives the status
                            pass  # already done, nothing to do
                        else:  # task with parents
                            try:
                                strp = str(sorted(t.parents))
                                ind = dstrp[strp]  ## find the parents set in the list of done parents sets
                                tjk = d[ind]  ## check that the parents set exists
                            except (KeyError, ValueError):
                                ## parents do not match parents of the done list; this is a new task
                                self.put_task(t)
        logger.debug("nb_success starts at %d for segment %s" % (self.nb_success, seg))

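    # Illustration of the matching logic above (made-up values): a candidate task
    # is queued unless some done task shares both its normalized input string and
    # its sorted parents list.
    #
    #   dprod maps spl(str_input)        -> done Task    e.g. {'seg_first_data/0_x': <Task ...>}
    #   dstrp maps str(sorted(parents))  -> index in d   e.g. {'[3, 7]': 0}
    #
    # A task whose input matches an entry of dprod but whose sorted parents list
    # is missing from dstrp is therefore treated as new and queued again.
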
    def task_failed(self, t):
        """ Tell the tracker when a task has failed.

        If the failed task is the last of the segment task list, the next
        segment is pushed to the queue.
        """
        self.tracker.update_status(t, 'failed')
        self.task_queue.task_done()

    def requeue(self, t):
        """ Requeue a running task.
        """
        self.tracker.update_status(t, 'queued')
        if not self.abort:
            self.task_queue.put(t)
        self.task_queue.task_done()

    def abort_scheduling(self, t):
        """ Abort the processing.
        """
        with self.nb_worker_lock:
            if t is not None:
                logger.warning("Aborting scheduling of task %d..." % t.id)
            else:
                logger.warning("Aborting scheduling ...")
            self.abort = True
            if t is not None:
                logger.warning("tasks from %s have failed" % t.seg)
                self.task_queue.task_done()
                self.tracker.update_status(t, 'failed')
            # empty the task queue
            try:
                logger.info("Trying to empty the task queue")
                while (self.task_queue.get_nowait()):  # (block=False)
                    self.task_queue.task_done()
            except Queue.Empty:
                logger.debug("Empty Queue")
            logger.warning("Scheduler aborted")

    #def run(self):
    #    import cProfile
    #    cProfile.runctx('self._run()', globals(), locals(), 'schedprof')

    def run(self):
        """ Start the scheduler.

        Start a tracker and fill the task queue with the first segment task.
        """
        self.stop_event.clear()
        self.tracker = tracker.SqliteTracker(self.pipe)
        self.tracker.start()
        logger.info("Tracker launched")
        for k, v in self.pipe._input.iteritems():
            t = Task(self.pipe.get_parents(k)[0], task_output=v)
            self.products_list.push(t)
            logger.debug("Pushing phantom task %s" % str(t))
        try:
            for s in self.pipe.flatten():
                self.push_next_seg(s)
                self.task_queue.join()
                if self.abort:
                    logger.warning("Interrupting the main loop")
                    break
                if self.nb_success:
                    #self.store_meta_seg(s)
                    self.nb_success = 0
                else:
                    logger.warning("No successful job in the previous step %s, stopping here." % s)
                    break
        except Exception, e:
            logger.critical(str(e))
            self.abort_scheduling(None)
            raise
        finally:
            # inform the workers that there are no jobs remaining
            logger.info("Asking workers to stop...")
            with self.nb_worker_lock:
                self.stopping = True
                for i in range(self.nb_worker):
                    self.task_queue.put(None)
                # wait for workers to complete
                logger.info("Waiting for workers to complete...")
                if self.nb_worker == 0:
                    self.stop_event.set()
            self.stop_event.wait()
            logger.info("No more workers. Stopping the sqltracker ...")
            self.tracker.stop()
            logger.info("Stopping the scheduler ...")
            self.tracker.join()
            close_logger(logger)

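    # Shutdown note (descriptive comment, not in the original source): run() asks
    # the workers to stop by pushing one None sentinel per registered worker, then
    # waits on stop_event, which check_out() sets when the last worker leaves.
    # Only after that is the sqlite tracker stopped and joined, so workers have
    # finished reporting task statuses before the tracker shuts down.
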
if __name__ == "__main__":
    import doctest
    doctest.testmod()
