## Copyright (C) 2008, 2009, 2010 APC LPNHE CNRS Universite Paris Diderot <lejeune@apc.univ-paris7.fr> <betoule@apc.univ-paris7.fr>
##
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 3 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this program; if not, see http://www.gnu.org/licenses/gpl.html
import os
import os.path as path
import task
from task import TaskList
from task import Task
import Queue
import tracker
import shutil
import threading
import logging
from contextlib import closing
from directive import Multiplex
import pickle
from pipelet.utils import close_logger, spl
import multiprocessing
import multiprocessing.managers
import re
class NullHandler(logging.Handler):
    """ Extension of the logging handler class.
    """
    def emit(self, record):
        """ Avoid warnings.
        """
        pass
logger = logging.getLogger("scheduler")
h = NullHandler()
logger.addHandler(h)
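# Applications that embed the scheduler can attach their own handler to see
# these messages. A minimal sketch (the format string is only an example):
#
#   import logging
#   sh = logging.StreamHandler()
#   sh.setFormatter(logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s"))
#   logging.getLogger("scheduler").addHandler(sh)
#   logging.getLogger("scheduler").setLevel(logging.INFO)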
class Scheduler():
    """ Feed tasks to an execution queue.

    A scheduler is made of an execution queue which is fed with
    tasks. The scheduler coordinates the execution of the pipeline,
    building tasks for each segment and product.
    """
    def __init__(self, pipe):
        """ Initialize a scheduler.

        The Tracker instance is created later, when the scheduler is run.

        Parameters
        ----------
        pipe : a Pipeline instance.
        """
## pipeline object
self.pipe = pipe
## task queue
self.task_queue = Queue.Queue(0)
## list of tasks
self.products_list = TaskList()
        ## number of successful tasks
        self.nb_success = 0
        ## lock on the success count
        self.success_lock = threading.Lock()
        ## lock on the worker count
        self.nb_worker_lock = threading.RLock()
        ## number of workers
        self.nb_worker = 0
        ## stopping event
        self.stop_event = threading.Event()
        ## stopping flag
        self.stopping = False
        ## aborting flag
        self.abort = False
## tracker object
self.tracker = None
    def get_pipe(self):
""" Return the pipe object bound to the scheduler
"""
logger.info("a worker is trying to recover pipe")
return self.pipe
    def check_in(self):
        """ Register a worker.

        A worker calls this method to check in.
        """
with self.nb_worker_lock:
if not self.stopping:
self.nb_worker = self.nb_worker + 1
res = True
logger.info('%d workers registered'%self.nb_worker)
else:
                logger.warning('Rejecting check-in as the scheduler is stopping')
res = False
return res
    def check_out(self):
        """ Unregister a worker.

        A worker calls this method to check out.
        """
with self.nb_worker_lock:
self.nb_worker = self.nb_worker - 1
logger.info('%d workers registered'%self.nb_worker)
if self.nb_worker == 0:
self.stop_event.set()
self.abort_scheduling(None)
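    # The check_in/check_out pair frames a worker's lifetime. A minimal sketch
    # of the protocol a worker is expected to follow (the worker loop itself is
    # hypothetical; the scheduler methods are the ones defined in this class):
    #
    #   if scheduler.check_in():
    #       while True:
    #           t = scheduler.get_task()
    #           if t is None:              # sentinel pushed by run() at shutdown
    #               break
    #           try:
    #               # ... execute the segment code on t ...
    #               scheduler.task_done(t)
    #           except Exception:
    #               scheduler.task_failed(t)
    #       scheduler.check_out()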
    def put_task(self, t):
        """ Add a task to the queue.

        There is one task per input; the inputs are the products of
        the parent segments.
        """
t = self.tracker.add_queued(t)
if not self.abort:
self.task_queue.put(t)
    def get_task(self):
""" Return the next task of the queue.
"""
t = self.task_queue.get()
if t is not None:
self.tracker.update_status(t,'running')
return t
    def task_done(self, t):
        """ Tell the tracker that a task has been successfully
        completed.

        The product list is updated with the task outputs. If the
        completed task is the last of the segment task list, the next
        segment is pushed to the queue.
        """
with self.success_lock:
self.nb_success = self.nb_success + 1
if self.nb_success==1:
self.tracker.update_param(t) ## first successful task registers segment tag
self.store_meta_seg(t)
self.tracker.update_status(t,'done')
self.products_list.push(t)
self.task_queue.task_done()
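    # Note: run() blocks on task_queue.join() between segments, so every task
    # queued by put_task() must eventually be acknowledged by exactly one call
    # to task_done(), task_failed() or requeue(); otherwise the scheduler would
    # hang on the current segment.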
    def push_next_seg(self, seg):
        """ Push the segment task list to the queue.

        The segment directory is created at this stage, and a copy of
        the code, visu and argument files is saved there.
        The parent segment outputs are combined to form the task list.
        """
## some cleaning in the data base first
self.tracker.clean (seg)
logger.info('Pushing segment %s'%seg)
        os.umask(002) # 022 TODO this is a problem for group shared pipelines
######################################
# This belongs to a dedicated function
#######################################
d = self.pipe.get_curr_dir(seg)
if not path.exists(d):
os.mkdir(d)
r = self.pipe.repository
f = r.get_code_string(seg)
        if f:
            dest = os.path.join(d, os.path.basename(f[1]))
            with open(dest, "w") as fid:
                fid.write(f[0])
lst_hook = r.get_hook_list(seg)
        for h in lst_hook:
            f = r.get_hook_string(seg, h)
            if f[0]:
                dest = os.path.join(d, os.path.basename(f[1]))
                with open(dest, "w") as fid:
                    fid.write(f[0])
        for dk, dv in r.get_deps_string(seg).items():
            if dk:
                dest = os.path.join(d, os.path.basename(dk))
                with open(dest, "w") as fid:
                    fid.write(dv)
d = self.pipe.get_data_dir(seg)
if not path.exists(d):
os.mkdir(d)
############################################
parents = self.pipe.get_parents(seg) ## parents segments
d = self.tracker.get_done(seg) ## done tasks
failed = self.tracker.get_failed(seg) # failed tasks
#reg = re.compile('/(\d*)(_[^/]*)$')
#dprod = [reg.sub(r'/0\2', t.str_input) for t in d] ## done products
dprod = dict([(spl(t.str_input), t) for t in d])
#failed_prod = [reg.sub(r'/0\2', t.str_input) for t in failed] # failed products
failed_prod = dict([(spl(t.str_input), t) for t in failed])
dstrp = dict([(str(sorted(d[i].parents)),i) for i in range(len(d))]) ## parents of done
        fstrp = dict([(str(sorted(failed[i].parents)),i) for i in range(len(failed))]) ## parents of failed
        logger.debug('Found %d done tasks for segment %s'%(len(d),seg))
        logger.debug('Found %d failed tasks for segment %s'%(len(failed),seg))
## task list to queue
l = self.products_list.multiplex(seg, parents, self.pipe.repository.get_directive(Multiplex,seg))
        # Convention: tasks with no input and not already done are pushed once with None as input
if not l and not d:
l = [Task(seg)]
# Updating product list for task already done
for t in dprod:
self.products_list.push(dprod[t])
self.nb_success = self.nb_success + 1
# Looking for new task to get done
logger.info('Found %d tasks in seg %s to get done'%(len(l),seg))
with self.tracker.conn_lock:
with self.tracker.conn:
for t in l: # foreach task of the task list
t.str_input = self.pipe.get_data_dir(t.seg, prod=(t.task_input, 0))
if (t.str_input in failed_prod): # done but failed
continue
if (not (t.str_input in dprod)): #not done wrt task_input
self.put_task(t)
else: # found a task with a matching task_input. Checking whether parents also match
if len(t.parents)==0: # orphan task -> product gives the status
pass # already done nothing to do
else: # task with parents
try:
strp = str(sorted(t.parents))
ind = dstrp[strp] ## find parents set in list of done parents sets
                                tjk = d[ind]  ## check that the parents set exists
                            except (KeyError, ValueError): ## parents do not match parents of the done list. This is a new task.
self.put_task(t)
logger.debug("nb_success starts at %d for segment %s"%(self.nb_success,seg))
    def task_failed(self, t):
        """ Tell the tracker that a task has failed.

        If the failed task is the last of the segment task list, the
        next segment is pushed to the queue.
        """
self.tracker.update_status(t,'failed')
self.task_queue.task_done()
    def requeue(self, t):
        """ Requeue a running task.
        """
self.tracker.update_status(t,'queued')
if not self.abort:
self.task_queue.put(t)
self.task_queue.task_done()
    def abort_scheduling(self, t):
        """ Abort the processing.
        """
with self.nb_worker_lock:
if t is not None:
logger.warning("Aborting scheduling of task %d..."%t.id)
else:
logger.warning("Aborting scheduling ...")
self.abort = True
if t is not None:
logger.warning("tasks from %s has failed"%t.seg)
self.task_queue.task_done()
self.tracker.update_status(t,'failed')
# empty the task queue
try:
logger.info ("Trying to empty task queue")
                while self.task_queue.get_nowait():
self.task_queue.task_done()
except Queue.Empty:
logger.debug("Empty Queue")
logger.warning("Scheduler aborted")
#def run(self):
# import cProfile
# cProfile.runctx('self._run()', globals(), locals(),'schedprof')
    def run(self):
        """ Start the scheduler.

        Start a tracker and fill the task queue with the first segment task.
        """
self.stop_event.clear()
self.tracker = tracker.SqliteTracker(self.pipe)
self.tracker.start()
logger.info("Tracker launched")
for k,v in self.pipe._input.iteritems():
t = Task(self.pipe.get_parents(k)[0], task_output=v)
self.products_list.push(t)
logger.debug("Pushing phantom task %s"%str(t))
try:
for s in self.pipe.flatten():
self.push_next_seg(s)
self.task_queue.join()
if self.abort:
logger.warning("Interrupting the main loop")
break
if self.nb_success:
#self.store_meta_seg(s)
self.nb_success = 0
else:
logger.warning("No successfull job in the previous step %s, stopping here."%s)
break
except Exception, e:
logger.critical(str(e))
self.abort_scheduling(None)
raise
finally:
# inform the workers that there are no jobs remaining
logger.info("Asking workers to stop...")
with self.nb_worker_lock:
self.stopping = True
for i in range(self.nb_worker):
self.task_queue.put(None)
# wait for workers to complete
logger.info("Waiting for workers to complete...")
if self.nb_worker == 0:
self.stop_event.set()
self.stop_event.wait()
logger.info("No more workers. Stopping the sqltracker ...")
self.tracker.stop()
logger.info("Stopping the scheduler ...")
self.tracker.join()
close_logger (logger)
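# The scheduler is normally not driven by hand: the helpers in
# pipelet.launchers (launch_interactive, launch_process, ...) build a Scheduler
# for a Pipeline and start workers that consume its queue. A rough sketch of
# that wiring (the pipeline description string and directories are
# hypothetical):
#
#   from pipelet.pipeline import Pipeline
#   from pipelet import launchers
#
#   p = Pipeline("first -> second;", code_dir='./seg', prefix='./data')
#   launchers.launch_interactive(p)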
if __name__ == "__main__":
import doctest
doctest.testmod()