Source code for pipelet.launchers

## Copyright (C) 2008, 2009, 2010 APC LPNHE CNRS Universite Paris Diderot <lejeune@apc.univ-paris7.fr>  <betoule@apc.univ-paris7.fr>
## 
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 3 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this program; if not, see http://www.gnu.org/licenses/gpl.html

""" Utilitary functions to launch Pipes

- launch_interactive : launch a local worker in the interactive session
  (debugger compliant)

- launch_thread : launch a bunch of local workers in separate threads
  (SMP machine compliant)

- launch_process : launch a bunch of local workers in separate
  processes (useful when the GIL becomes limiting, i.e. most of the
  time is spent in pure Python processing)

- launch_ssh : launch a bunch of distant workers through ssh

- launch_pbs : submit a bunch of distant workers to any PBS compliant
  system (keep in mind that the pipe directory must be accessible to
  workers)

- launch_ccali : submit a bunch of distant workers to the BQS system
  at CCIN2P3

- launch_ccali_ge : submit a bunch of distant workers to the Grid Engine system
  at CCIN2P3

- launch_pbspro : submit a bunch of distant workers to the PBSpro
  manager (magique3, ...)

Run as a main program, this module can be used to launch a worker on a
distant machine
"""

import pipelet.scheduler as scheduler
import pipelet.worker as worker
import pipelet.pipeline as pipeline
from contextlib import closing
import logging
import logging.handlers
import sys
from pipelet.utils import get_log_file, init_logger
import socket  # to get the hostname
from multiprocessing.managers import BaseManager
import subprocess


def launch_interactive(pipe, log_level=logging.INFO):
    """ Launch a local worker in the interactive session.

    This is debugger compliant, so that exceptions raised during segment
    execution can be tracked. log_level takes the values defined by the
    logging module (logging.DEBUG=10, logging.WARNING=30,
    logging.CRITICAL=50, etc.). Set it to 0 to disable stream logging.

    >>> T = pipeline.Pipeline("", code_dir='./',
    ...                       prefix='./', permissive=True)
    >>> w, t = launch_interactive(T, log_level=logging.ERROR)
    >>> w.run()
    """
    init_logger('scheduler', get_log_file(pipe, 'scheduler'),
                level=log_level)
    s = scheduler.Scheduler(pipe)
    wl = init_logger('worker', get_log_file(pipe, 'worker'),
                     level=log_level)
    w = worker.InteractiveWorker(s, wl)
    import threading
    t = threading.Thread(target=s.run)
    t.start()
    return (w, t)
def launch_thread(pipe, n, log_level=logging.WARNING):
    """ Launch a bunch of local workers in separate threads.

    This is SMP machine compliant. Exceptions arising in the execution of
    any segment are caught and the corresponding task is marked as failed.

    >>> T = pipeline.Pipeline("", permissive=True)
    >>> launch_thread(T, 2)
    """
    init_logger('scheduler', get_log_file(pipe, 'scheduler'),
                level=log_level)
    s = scheduler.Scheduler(pipe)
    for i in range(n):
        wl = init_logger('worker%d' % i,
                         get_log_file(pipe, 'worker%d' % i),
                         level=log_level)
        w = worker.ThreadWorker(s, wl)
        w.start()
    s.run()
class SchedulerManager(BaseManager):
    """ Extension of the BaseManager class. """
    pass
def launch_process(pipe, n, address=('', 50000), authkey='secret',
                   log_level=logging.WARNING, nice=0):
    """ Launch a bunch of local workers in separate processes.

    This is useful (compared to launch_thread) when the GIL becomes
    limiting, which is bound to be the case when most of the time is
    spent in pure Python processing.

    >>> T = pipeline.Pipeline("", permissive=True)
    >>> launch_process(T, 2)
    """
    init_logger('scheduler', get_log_file(pipe, 'scheduler'),
                level=log_level)
    s = scheduler.Scheduler(pipe)
    SchedulerManager.register('get_scheduler', callable=lambda: s)
    mgr = SchedulerManager(address=address, authkey=authkey)
    mgr.start()
    processlist = []
    for i in range(n):
        wl = init_logger('worker%d' % i,
                         get_log_file(pipe, 'worker%d' % i),
                         level=log_level)
        w = worker.ProcessWorker(address=address, authkey=authkey,
                                 logger=wl, nice=nice)
        w.start()
        processlist.append(w)
    sched_proxy = mgr.get_scheduler()
    sched_proxy.run()
    # join zombie processes
    for w in processlist:
        w.join()
def _scp(file, dest):
    """ Wrapper around the scp command."""
    subprocess.Popen(['scp', file, dest]).communicate()[0]
def launch_ssh(pipe, host_list, address=None, authkey='secret',
               log_level=logging.ERROR):
    """ Launch a bunch of distant workers through ssh.

    This is used mainly for testing purposes. It can be useful to
    distribute computations on a pool of machines that share access to
    a NAS.

    >>> T = pipeline.Pipeline("", permissive=True)
    >>> launch_ssh(T, ['127.0.0.1'], address=('127.0.0.1', 50000))
    """
    init_logger('scheduler', get_log_file(pipe, 'scheduler'),
                level=log_level)
    s = scheduler.Scheduler(pipe)
    SchedulerManager.register('get_scheduler', callable=lambda: s)
    mgr = SchedulerManager(address=address, authkey=authkey)
    mgr.start()
    processlist = []
    for i, h in enumerate(host_list):
        w = subprocess.Popen(
            ['ssh', h, "python -m pipelet.launchers -H %s -p %s -s %s"
             " -l %s" % (address[0], address[1], authkey,
                         get_log_file(pipe, 'worker%d' % i))])
        processlist.append(w)
    # launch the scheduler
    sched_proxy = mgr.get_scheduler()
    sched_proxy.run()
    # join zombie processes
    for w in processlist:
        w.wait()
def launch_pbs(pipe, n, address=None, authkey='secret', job_name="job_",
               log_level=logging.WARNING, cpu_time="2:00:00", server=False,
               job_header="""
#/bin/bash
echo $PYTHONPATH
"""):
    """ Launch a bunch of distant workers through a PBS batch system.
    """
    init_logger('scheduler', get_log_file(pipe, 'scheduler'),
                level=log_level)
    s = scheduler.Scheduler(pipe)
    SchedulerManager.register('get_scheduler', callable=lambda: s)
    if server:
        mgr = SchedulerManager(address=address, authkey=authkey)
        mgr.start()
    processlist = []
    for i in range(n):
        jobfile = get_log_file(pipe, 'job%d' % i)
        errfile = jobfile.replace('job', 'err')
        logfile = jobfile.replace('job', 'log')
        f = file(jobfile, 'w')
        f.write(job_header + "\n")
        f.write("#PBS -o %s\n" % logfile)
        f.write("#PBS -e %s\n" % errfile)
        f.write("#PBS -N %s%d\n" % (job_name, i))
        f.write("#PBS -l select=1:ncpus=1,walltime=%s\n" % cpu_time)
        f.write("python -m pipelet.launchers -H %s -p %s -s %s -l %s"
                % (address[0], address[1], authkey,
                   jobfile.replace('job', 'worker')))
        f.close()
        subprocess.Popen(['qsub', '-o', logfile, '-e', errfile,
                          jobfile]).communicate()[0]
    if server:
        print 'launching the scheduler'
        sched_proxy = mgr.get_scheduler()
        sched_proxy.run()

# Alternative PBS launcher using mpirun, kept commented out in the source:
# def launch_pbs(pipe, n, address=None, authkey='secret', job_name="job_",
#                log_level=logging.WARNING, cpu_time="2:00:00",
#                job_header="""
# #/bin/bash
# echo $PYTHONPATH
# """):
#     """ Launch a bunch of distant workers through a PBS batch system
#     using mpirun.
#     """
#     server_file = job_header + """
# #PBS -l walltime=%s
# echo "export PIPELETD_HOST=$HOSTNAME" > ~/host_info.sh
# pipeletd -n -l %d -p %d -a $HOSTNAME << 'EOF' &
# %s
# EOF
# sleep 5
# mpirun -np %d python $PIPELETPATH/launchers.py -p %d -s %s -l %s -H $HOSTNAME
# """
#     import cPickle as pickle
#     pipedesc = pickle.dumps(pipe)
#     jobfile = get_log_file(pipe, '.job')
#     errfile = jobfile.replace('job', 'err')
#     logfile = jobfile.replace('job', 'log')
#     with closing(file(jobfile, 'w')) as f:
#         f.write(server_file % (n, cpu_time, log_level, address[1],
#                                pipedesc, n, address[1], authkey, logfile))
#     subprocess.Popen(['qsub', '-o', logfile, '-e', errfile,
#                       jobfile]).communicate()[0]
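# Hedged usage sketch for launch_pbs above (the active version, not the
# commented-out mpirun variant). The host name and job_header contents are
# illustrative placeholders, not part of the pipelet API; the scheduler is
# assumed to be reachable from the PBS nodes at the given address.
#
# >>> T = pipeline.Pipeline("", permissive=True)
# >>> launch_pbs(T, 4, address=('front.example.org', 50000),
# ...            authkey='secret', cpu_time="4:00:00", server=True,
# ...            job_header="#/bin/bash\nsource ~/setup_pipelet.sh")
#
# Each worker gets its own PBS script written next to the pipe log files and
# submitted with qsub; with server=True the scheduler runs in the calling
# process.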
def launch_nersc(pipe, n, ppn=1, port=50000, authkey='secret',
                 job_name="job_", log_level=logging.WARNING,
                 cpu_time="00:30:00", job_header="""
#PBS -q regular
#PBS -l nodes=%d:ppn=%d
"""):
    """ Launch a bunch of distant workers through a PBS batch system
    using aprun.
    """
    # server_file="""
    ##PBS -q regular
    ##PBS -l mppwidth=%d
    ##PBS -l walltime=%s
    #export CRAY_ROOTFS=DSL
    #echo "export PIPELETD_HOST=$HOSTNAME" > ~/host_info.sh
    #pipeletd -n -l %d -p %d -a $HOSTNAME << 'EOF' &
    #%s
    #EOF
    #sleep 5
    #aprun -np %d python $PIPELETPATH/launchers.py -p %d -s %s -l %s -H $HOST
    #"""
    server_file = job_header + """
#PBS -l walltime=%s
export CRAY_ROOTFS=DSL
echo "export PIPELETD_HOST=$HOSTNAME" > ~/host_info.sh
pipeletd -n -l %d -p %d -a $HOSTNAME << 'EOF' &
%s
EOF
sleep 5
mpirun -np %d python $PIPELETPATH/launchers.py -p %d -s %s -l %s -H $HOSTNAME
"""
    nnode = n / ppn
    if (n % ppn > 0):
        nnode = nnode + 1
    import cPickle as pickle
    pipedesc = pickle.dumps(pipe)
    jobfile = get_log_file(pipe, '.job')
    errfile = jobfile.replace('job', 'err')
    logfile = jobfile.replace('job', 'log')
    with closing(file(jobfile, 'w')) as f:
        f.write(server_file % (nnode, ppn, cpu_time, log_level, port,
                               pipedesc, n, port, authkey, logfile))
    subprocess.Popen(['qsub', '-o', logfile, '-e', errfile,
                      jobfile]).communicate()[0]
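# Hedged example of a launch_nersc call: 16 workers packed 8 per node for 30
# minutes of walltime. The port and authkey values are placeholders; a single
# job script is generated from job_header and submitted with qsub, and the
# embedded pipeletd/mpirun commands do the rest on the batch node.
#
# >>> T = pipeline.Pipeline("", permissive=True)
# >>> launch_nersc(T, 16, ppn=8, port=50000, authkey='secret',
# ...              cpu_time="00:30:00")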
_job_file = """ #/bin/zsh export PATH=/afs/in2p3.fr/home/b/betoule/software/photcalib/tools:$PATH echo $SHELL echo $PYTHONPATH python -m pipelet.launchers -H %s -p %s -s %s -l %s """ _server_file = """ #/bin/zsh echo $SHELL echo $PYTHONPATH echo $USER echo pouet > /sps/snls13/users/betoule/calib5_ref/log/11-06-06_15-13_scheduler_on_ccwl0674 cat /sps/snls13/users/betoule/calib5_ref/log/11-06-06_15-13_scheduler_on_ccwl0674 echo "export PIPELETD_HOST=$HOST\nexport PIPEWEB_HOST=$HOST\nexport PIPELETD_PORT=%d" > ~/host_info.sh source ~/host_info.sh pipeletd -n -l %s -p $PIPELETD_PORT -a $PIPELETD_HOST << 'EOF' & %s EOF echo "starting web server at $(date)" pipeweb start -n & echo "web server started at $(date)" sleep 2 echo "launching worker at $(date)" python -m pipelet.launchers -H $PIPELETD_HOST -p $PIPELETD_PORT -s %s -l %s echo "worker returned at $(date) """ _worker_file = """ #/bin/zsh echo $SHELL echo $PYTHONPATH while [[ ! -e ~/host_info.sh ]]; do sleep 2; done; source ~/host_info.sh sleep 5 python -m pipelet.launchers -H $PIPELETD_HOST -p $PIPELETD_PORT -s %s -l %s """
def launch_ccali(pipe, n, address=('127.0.0.1', 5000), authkey='secret',
                 job_dir="/sps/snls13/users/betoule/bqs/scripts",
                 job_name="job_",
                 log_dir="/sps/snls13/users/betoule/bqs/logs",
                 cpu_time=1000, scratch="512MB", mem="512MB",
                 plateform='LINUX', server=False,
                 log_level=logging.INFO, buff=None):
    """ Submit a bunch of distant workers to the BQS system at CCIN2P3.
    """
    if server:
        # Submit a job that will run the scheduler, a temporary web
        # server and a single worker
        import cPickle as pickle
        if buff is not None:
            b = ['-b', buff]
        else:
            b = []
        name = '%s_sched' % (job_name,)
        pipedesc = pickle.dumps(pipe)
        jobfile = get_log_file(pipe, name + '.job')
        errfile = jobfile.replace('job', 'err')
        logfile = jobfile.replace('job', 'log')
        with closing(file(jobfile, 'w')) as f:
            f.write(_server_file % (address[1], log_level, pipedesc,
                                    authkey, logfile))
        com = ['qsub',
               '-l', 'platform=%s' % plateform,
               '-l', 'T=%d' % cpu_time,
               '-l', 'M=%s' % mem,
               '-l', 'scratch=%s' % scratch,
               '-o', logfile, '-e', errfile,
               '-N', name, jobfile]
        print " ".join(com)
        subprocess.Popen(com).communicate()[0]
    import re
    l = subprocess.Popen(['qjob', '-wide', '-nh'],
                         stdout=subprocess.PIPE).communicate()[0]
    existing_process = re.findall('%s([0-9]*)' % job_name, l)
    existing_process = filter(lambda x: x != '', existing_process)
    try:
        starting_num = max([int(p) for p in existing_process]) + 1
    except:
        starting_num = 0
    for i in range(starting_num, starting_num + n):
        name = '%s%d' % (job_name, i)
        jobfile = get_log_file(pipe, name + '.job%d' % i)
        errfile = jobfile.replace('job', 'err')
        logfile = jobfile.replace('job', 'log')
        with closing(file(jobfile, 'w')) as f:
            f.write(_worker_file % (authkey, logfile))
        com = ['qsub',
               '-l', 'platform=%s' % plateform,
               '-l', 'T=%d' % cpu_time,
               '-l', 'M=%s' % mem,
               '-l', 'scratch=%s' % scratch,
               '-o', logfile, '-e', errfile,
               '-N', name, jobfile]
        print " ".join(com)
        subprocess.Popen(com).communicate()[0]
def launch_ccali_ge(pipe, n, address=('127.0.0.1', 5000), authkey='secret',
                    job_dir="/sps/snls13/users/betoule/bqs/scripts",
                    job_name="job_",
                    log_dir="/sps/snls13/users/betoule/bqs/logs",
                    cpu_time=1000, scratch="512MB", mem="512MB",
                    project='snovae', server=False,
                    log_level=logging.INFO, buff=None, ressource=['sps']):
    """ Submit a bunch of distant workers to the Grid Engine system at
    CCIN2P3.
    """
    if server:
        # Submit a job that will run the scheduler, a temporary web
        # server and a single worker
        import cPickle as pickle
        if buff is not None:
            b = ['-b', buff]
        else:
            b = []
        name = '%s_sched' % (job_name,)
        pipedesc = pickle.dumps(pipe)
        jobfile = get_log_file(pipe, name + '.job')
        errfile = jobfile.replace('job', 'err')
        logfile = jobfile.replace('job', 'log')
        with closing(file(jobfile, 'w')) as f:
            f.write(_server_file % (address[1], log_level, pipedesc,
                                    authkey, logfile))
        com = ['qsub',
               '-P', 'P_%s' % project,
               '-l', 'ct=%d' % cpu_time,
               '-l', 'vmem=%s' % mem,
               '-l', 'fsize=%s' % scratch,
               '-o', logfile, '-e', errfile,
               '-N', name] + reduce(lambda x, y: x + y,
                                    [['-l', '%s=1' % res]
                                     for res in ressource], []) + [jobfile]
        print " ".join(com)
        subprocess.Popen(com).communicate()[0]
    import re
    l = subprocess.Popen(['qstat', '-r'],
                         stdout=subprocess.PIPE).communicate()[0]
    existing_process = re.findall('%s([0-9]*)' % job_name, l)
    existing_process = filter(lambda x: x != '', existing_process)
    try:
        starting_num = max([int(p) for p in existing_process]) + 1
    except:
        starting_num = 0
    for i in range(starting_num, starting_num + n):
        name = '%s%d' % (job_name, i)
        jobfile = get_log_file(pipe, name + '.job%d' % i)
        errfile = jobfile.replace('job', 'err')
        logfile = jobfile.replace('job', 'log')
        with closing(file(jobfile, 'w')) as f:
            f.write(_worker_file % (authkey, logfile))
        com = ['qsub',
               '-P', 'P_%s' % project,
               '-l', 'ct=%d' % cpu_time,
               '-l', 'vmem=%s' % mem,
               '-l', 'fsize=%s' % scratch,
               '-o', logfile, '-e', errfile,
               '-N', name] + reduce(lambda x, y: x + y,
                                    [['-l', '%s=1' % res]
                                     for res in ressource], []) + [jobfile]
        print " ".join(com)
        subprocess.Popen(com).communicate()[0]
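# Hedged usage sketch for launch_ccali_ge above: one scheduler job plus 10
# worker jobs. The host name is a placeholder and the project, memory and
# scratch values are illustrative; they map directly onto the qsub options
# built in the function.
#
# >>> T = pipeline.Pipeline("", permissive=True)
# >>> launch_ccali_ge(T, 10, address=('cca.example.org', 50000),
# ...                 authkey='secret', project='snovae', server=True,
# ...                 cpu_time=10000, mem="2G", scratch="1G",
# ...                 ressource=['sps'])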
def launch_ccage_server(pipe, address=('127.0.0.1', 5000), authkey='secret',
                        log_level=logging.WARNING, project="planck",
                        cpu_time='24:00:00', scratch="512M", mem="512M",
                        ressource=['sps']):
    """ Launch a scheduler + web server through a SGE batch system @ccin2p3.
    """
    server_file = """#! /bin/zsh -l
echo $HOST
echo "export PIPELETD_HOST=$HOST\nexport PIPEWEB_HOST=$HOST\nexport PIPELETD_PORT=%d" > $HOME/host_info.sh
source $HOME/host_info.sh
echo "starting web server at $(date)"
sleep 5
pipeweb start -n &
echo "web server returned at $(date)"
pipeletd -n -l %s -p $PIPELETD_PORT -a $HOST << 'EOF'
%s
EOF
"""
    port = address[1]
    import cPickle as pickle
    pipedesc = pickle.dumps(pipe)
    name = 'sched'
    jobfile = get_log_file(pipe, name + '.job')
    errfile = jobfile.replace('job', 'err')
    logfile = jobfile.replace('job', 'log')
    with closing(file(jobfile, 'w')) as f:
        f.write(server_file % (port, log_level, pipedesc))
    com = ['qsub',
           '-P', 'P_%s' % project,
           # '-q', 'demon',
           # '-l', 'demon=1',
           '-l', 'sps=1',
           '-l', 'fsize=%s' % scratch,
           '-l', 'vmem=%s' % mem,
           '-l', 'ct=%s' % cpu_time,
           '-o', logfile, '-e', errfile,
           '-N', name] + reduce(lambda x, y: x + y,
                                [['-l', '%s=1' % res]
                                 for res in ressource], []) + [jobfile]
    subprocess.Popen(com).communicate()[0]
def launch_ccage_worker(pipe, n, address=('127.0.0.1', 5000),
                        authkey='secret', log_level=logging.WARNING,
                        project="planck", cpu_time='01:00:00',
                        scratch="512M", mem="512M", ressource=['sps'],
                        multicores=False):
    """ Launch n workers using a job array.
    """
    worker_file = """#! /bin/zsh -l
echo $HOST
sleep 1
source $HOME/host_info.sh
echo "launching worker at $(date)"
python -m pipelet.launchers -H $PIPELETD_HOST -p $PIPELETD_PORT -s %s -l %s$SGE_TASK_ID
echo "worker returned at $(date)"
"""
    name = 'worker'
    jobfile = get_log_file(pipe, name + '.job')
    errfile = jobfile.replace('job', 'err')
    logfile = jobfile.replace('job', 'log')
    with closing(file(jobfile, 'w')) as f:
        f.write(worker_file % (authkey, logfile))
    if multicores:
        mc = ['-pe', 'multicores', '%d' % multicores]
    else:
        mc = []
    com = ['qsub',
           '-P', 'P_%s' % project,
           '-l', 'fsize=%s' % scratch,
           '-l', 'vmem=%s' % mem,
           '-l', 'ct=%s' % cpu_time,
           '-t', '1-%d' % n,
           '-o', logfile, '-e', errfile,
           '-N', name] + mc + reduce(lambda x, y: x + y,
                                     [['-l', '%s=1' % res]
                                      for res in ressource], []) + [jobfile]
    subprocess.Popen(com).communicate()[0]
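# Hedged sketch of the usual SGE pairing: submit the scheduler + web server
# job first, then a worker job array for the same pipe. The workers find the
# scheduler through the ~/host_info.sh file written by the server job; the
# project name and resource sizes below are placeholders.
#
# >>> T = pipeline.Pipeline("", permissive=True)
# >>> launch_ccage_server(T, address=('127.0.0.1', 50000), project='planck',
# ...                     cpu_time='24:00:00')
# >>> launch_ccage_worker(T, 20, project='planck', cpu_time='02:00:00',
# ...                     mem="2G", scratch="1G")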
if __name__ == "__main__": import optparse parser = optparse.OptionParser() parser.add_option('-H', '--host', metavar='hostname', help='hostame or adress of the scheduler server', default=socket.gethostname()) parser.add_option('-p', '--port', metavar='port', help='port the scheduler is listenning to', default=50000, type='int') parser.add_option('-s', '--secret', metavar='key', help='authentication key', default='secret') parser.add_option('-l', '--logfile', metavar='logfile', help='worker log filename') # parser.add_option('-L', '--log-level', metavar='level', # type= # help='worker log filename') (options, args) = parser.parse_args() # print (options.host, options.port) # print options.secret name = options.logfile.split("_")[-1] wl = init_logger(name, options.logfile) w = worker.ProcessWorker(address=(options.host, options.port), authkey=options.secret, logger=wl) sys.argv = [sys.argv[0]] # do not propage command line argument to script w.run()
