## Copyright (C) 2008, 2009, 2010 APC LPNHE CNRS Universite Paris Diderot <lejeune@apc.univ-paris7.fr> <betoule@apc.univ-paris7.fr>
##
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 3 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this program; if not, see http://www.gnu.org/licenses/gpl.html
""" Utilitary functions to launch Pipes
- launch_interactive : launch a local worker in the interactive session
(debugger compliant)
- launch_thread : launch a bunch of local workers in separate threads
(SMP machine compliant)
- launch_process : launch a bunch of local workers in separate
processes (usefull when the GIL becomes limitant ie most time spend
in pure python processings)
- launch_ssh : launch a bunch of distant workers through ssh
- launch_pbs : submit a bunch of distant workers to any PBS compliant
system (keep in mind that the pipe directory must be accessible to
workers)
- launch_ccali : submit a bunch of distant workers to the BQS system
at CCIN2P3
- launch_ccali_ge : submit a bunch of distant workers to the Grid Engine system
at CCIN2P3
- launch_pbspro : submit a bunch of distant workers to the PBSpro
manager (magique3, ...)
Run as a main, it can be used to launch a worker on a distant machine
"""
import pipelet.scheduler as scheduler
import pipelet.worker as worker
import pipelet.pipeline as pipeline
from contextlib import closing
import logging
import logging.handlers
import sys
from pipelet.utils import get_log_file, init_logger
import socket # to get the hostname
from multiprocessing.managers import BaseManager
import subprocess
def launch_interactive(pipe, log_level=logging.INFO):
""" Launch a local worker in the interactive session.
    This is debugger friendly: exceptions raised during segment
    execution can be tracked.
    log_level takes the standard logging values (logging.DEBUG=10,
    logging.WARNING=30, logging.CRITICAL=50, etc.). Set it to 0 to
    disable stream logging.
>>> T = pipeline.Pipeline("", code_dir='./',
... prefix='./', permissive=True)
>>> w,t = launch_interactive(T, log_level=logging.ERROR)
>>> w.run()
"""
init_logger('scheduler', get_log_file(pipe, 'scheduler'), level=log_level)
s = scheduler.Scheduler(pipe)
wl = init_logger('worker', get_log_file(pipe, 'worker'), level=log_level)
w = worker.InteractiveWorker(s, wl)
import threading
t = threading.Thread(target=s.run)
t.start()
return (w, t)
def launch_thread(pipe, n, log_level=logging.WARNING):
""" Launch a bunch of local workers in separate threads.
    This is suitable for SMP machines. Exceptions arising during the
    execution of any segment are caught and the corresponding task is
    marked as failed.
>>> T = pipeline.Pipeline("", permissive=True)
>>> launch_thread(T, 2)
"""
init_logger('scheduler', get_log_file(pipe, 'scheduler'), level=log_level)
s = scheduler.Scheduler(pipe)
for i in range(n):
wl = init_logger('worker%d' % i, get_log_file(pipe, 'worker%d' % i),
level=log_level)
w = worker.ThreadWorker(s, wl)
w.start()
s.run()
class SchedulerManager(BaseManager):
""" Extension of the BaseManager class.
"""
pass
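# SchedulerManager serves the Scheduler instance over TCP: workers
# (possibly on other machines) obtain a proxy via get_scheduler() and
# drive the scheduler remotely. A minimal client-side sketch, assuming a
# manager is already serving on port 50000 (host, port and key below are
# illustrative):
#
#     from multiprocessing.managers import BaseManager
#
#     class SchedulerClient(BaseManager):
#         pass
#
#     SchedulerClient.register('get_scheduler')  # no callable on the client side
#     client = SchedulerClient(address=('127.0.0.1', 50000), authkey='secret')
#     client.connect()
#     sched_proxy = client.get_scheduler()  # proxy to the remote Scheduler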
def launch_process(pipe, n, address=('', 50000), authkey='secret',
                   log_level=logging.WARNING, nice=0):
""" Launch a bunch of local workers in separate processes .
This is usefull (compared to launch_thread) when the GIL becomes
limitant, which is bound to be the case when most time is spend in
pure python processings.
>>> T = pipeline.Pipeline("", permissive=True)
>>> launch_process(T, 2)
"""
init_logger('scheduler', get_log_file(pipe, 'scheduler'), level=log_level)
s = scheduler.Scheduler(pipe)
SchedulerManager.register('get_scheduler', callable=lambda: s)
mgr = SchedulerManager(address=address, authkey=authkey)
mgr.start()
processlist = []
for i in range(n):
wl = init_logger('worker%d' % i, get_log_file(pipe, 'worker%d' % i),
level=log_level)
w = worker.ProcessWorker(address=address, authkey=authkey,
logger=wl, nice=nice)
w.start()
processlist.append(w)
sched_proxy = mgr.get_scheduler()
sched_proxy.run()
    # join the worker processes to avoid leaving zombies
for w in processlist:
w.join()
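# Extra workers can join the same scheduler while it runs, since the
# manager keeps listening on 'address'. For instance, from another shell
# (illustrative host, port and log file, matching the defaults above):
#
#     python -m pipelet.launchers -H localhost -p 50000 -s secret -l ./worker_extra.log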
def _scp(path, dest):
    """ Wrapper around the scp command."""
    # 'path' rather than 'file' to avoid shadowing the builtin
    subprocess.Popen(['scp', path, dest]).communicate()
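# Note that the scp exit status is discarded above. If failures should
# raise instead, a stricter variant (assuming Python >= 2.5) could be:
#
#     subprocess.check_call(['scp', path, dest])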
def launch_ssh(pipe, host_list, address=None, authkey='secret',
               log_level=logging.ERROR):
""" Launch a bunch of distant workers through ssh.
This is used mainly for testing purposes. It can be usefull to
distribute computations on a pool of machine that share an access
to a NAS.
>>> T = pipeline.Pipeline("", permissive=True)
>>> launch_ssh(T, ['127.0.0.1'], address=('127.0.0.1',50000))
"""
init_logger('scheduler', get_log_file(pipe, 'scheduler'), level=log_level)
s = scheduler.Scheduler(pipe)
SchedulerManager.register('get_scheduler', callable=lambda: s)
mgr = SchedulerManager(address=address, authkey=authkey)
mgr.start()
processlist = []
for i, h in enumerate(host_list):
w = subprocess.Popen(
['ssh', h,
"python -m pipelet.launchers -H %s -p %s -s %s"
" -l %s" % (address[0], address[1], authkey,
get_log_file(pipe, 'worker%d' % i))])
processlist.append(w)
    # launch the scheduler
sched_proxy = mgr.get_scheduler()
sched_proxy.run()
    # wait for the remote workers to return
for w in processlist:
w.wait()
def launch_pbs(pipe, n, address=None, authkey='secret', job_name="job_",
               log_level=logging.WARNING, cpu_time="2:00:00", server=False,
               job_header="""
#!/bin/bash
echo $PYTHONPATH
"""):
""" Launch a bunch of distant workers through a PBS batch system.
"""
    init_logger('scheduler', get_log_file(pipe, 'scheduler'), level=log_level)
    s = scheduler.Scheduler(pipe)
    SchedulerManager.register('get_scheduler', callable=lambda: s)
if server:
mgr = SchedulerManager(address=address, authkey=authkey)
mgr.start()
    for i in range(n):
        jobfile = get_log_file(pipe, 'job%d' % i)
        errfile = jobfile.replace('job', 'err')
        logfile = jobfile.replace('job', 'log')
        with closing(file(jobfile, 'w')) as f:
            f.write(job_header + "\n")
            f.write("#PBS -o %s\n" % logfile)
            f.write("#PBS -e %s\n" % errfile)
            f.write("#PBS -N %s%d\n" % (job_name, i))
            f.write("#PBS -l select=1:ncpus=1,walltime=%s\n" % cpu_time)
            f.write("python -m pipelet.launchers -H %s -p %s -s %s -l %s"
                    % (address[0], address[1], authkey,
                       jobfile.replace('job', 'worker')))
        subprocess.Popen(['qsub', '-o', logfile, '-e', errfile,
                          jobfile]).communicate()
if server:
print 'launching the scheduler'
sched_proxy = mgr.get_scheduler()
sched_proxy.run()
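# A minimal usage sketch for launch_pbs, assuming the pipe prefix and
# code directories live on a filesystem shared with the PBS nodes; the
# host name, header and segment chain below are illustrative:
#
#     P = pipeline.Pipeline("first->second", code_dir='./', prefix='./')
#     launch_pbs(P, 4, address=('frontend.example.org', 50000), server=True,
#                job_header="#!/bin/bash\nsource ~/pipelet_env.sh")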
def launch_nersc(pipe, n, ppn=1, port=50000, authkey='secret',
                 job_name="job_", log_level=logging.WARNING,
                 cpu_time="00:30:00", job_header="""
#PBS -q regular
#PBS -l nodes=%d:ppn=%d
"""):
""" Launch a bunch of distant workers through a PBS batch system using aprun.
"""
# server_file="""
##PBS -q regular
##PBS -l mppwidth=%d
##PBS -l walltime=%s
#export CRAY_ROOTFS=DSL
#echo "export PIPELETD_HOST=$HOSTNAME" > ~/host_info.sh
#pipeletd -n -l %d -p %d -a $HOSTNAME << 'EOF' &
#%s
#EOF
#sleep 5
#aprun -np %d python $PIPELETPATH/launchers.py -p %d -s %s -l %s -H $HOST
#"""
server_file=job_header+"""
#PBS -l walltime=%s
export CRAY_ROOTFS=DSL
echo "export PIPELETD_HOST=$HOSTNAME" > ~/host_info.sh
pipeletd -n -l %d -p %d -a $HOSTNAME << 'EOF' &
%s
EOF
sleep 5
mpirun -np %d python $PIPELETPATH/launchers.py -p %d -s %s -l %s -H $HOSTNAME
"""
    nnode = n // ppn
    if n % ppn > 0:
        nnode += 1
    import cPickle as pickle
    pipedesc = pickle.dumps(pipe)
    jobfile = get_log_file(pipe, '.job')
    errfile = jobfile.replace('job', 'err')
    logfile = jobfile.replace('job', 'log')
    with closing(file(jobfile, 'w')) as f:
        f.write(server_file % (nnode, ppn, cpu_time, log_level, port,
                               pipedesc, n, port, authkey, logfile))
    subprocess.Popen(['qsub', '-o', logfile, '-e', errfile,
                      jobfile]).communicate()
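# A usage sketch for launch_nersc, assuming pipeletd and $PIPELETPATH are
# available on the compute nodes; the two %d placeholders of the default
# job header are filled with the computed node count and ppn (values below
# are illustrative):
#
#     P = pipeline.Pipeline("first->second", code_dir='./', prefix='./')
#     launch_nersc(P, 8, ppn=4, port=50000, cpu_time="00:30:00")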
_job_file = """
#!/bin/zsh
export PATH=/afs/in2p3.fr/home/b/betoule/software/photcalib/tools:$PATH
echo $SHELL
echo $PYTHONPATH
python -m pipelet.launchers -H %s -p %s -s %s -l %s
"""
_server_file = """
#/bin/zsh
echo $SHELL
echo $PYTHONPATH
echo $USER
echo pouet > /sps/snls13/users/betoule/calib5_ref/log/11-06-06_15-13_scheduler_on_ccwl0674
cat /sps/snls13/users/betoule/calib5_ref/log/11-06-06_15-13_scheduler_on_ccwl0674
echo "export PIPELETD_HOST=$HOST\nexport PIPEWEB_HOST=$HOST\nexport PIPELETD_PORT=%d" > ~/host_info.sh
source ~/host_info.sh
pipeletd -n -l %s -p $PIPELETD_PORT -a $PIPELETD_HOST << 'EOF' &
%s
EOF
echo "starting web server at $(date)"
pipeweb start -n &
echo "web server started at $(date)"
sleep 2
echo "launching worker at $(date)"
python -m pipelet.launchers -H $PIPELETD_HOST -p $PIPELETD_PORT -s %s -l %s
echo "worker returned at $(date)
"""
_worker_file = """
#!/bin/zsh
echo $SHELL
echo $PYTHONPATH
while [[ ! -e ~/host_info.sh ]]; do sleep 2; done;
source ~/host_info.sh
sleep 5
python -m pipelet.launchers -H $PIPELETD_HOST -p $PIPELETD_PORT -s %s -l %s
"""
def launch_ccali(pipe, n, address=('127.0.0.1', 5000), authkey='secret',
                 job_dir="/sps/snls13/users/betoule/bqs/scripts",
                 job_name="job_", log_dir="/sps/snls13/users/betoule/bqs/logs",
                 cpu_time=1000, scratch="512MB", mem="512MB",
                 plateform='LINUX', server=False, log_level=logging.INFO,
                 buff=None):
"""submit a bunch of distant workers to the BQS system
at CCIN2P3.
"""
if server:
# Submit a job that will run the scheduler, a temporary web server and a single worker
import cPickle as pickle
if buff is not None:
b = ['-b', buff]
else:
b = []
name = '%s_sched'%(job_name,)
pipedesc = pickle.dumps(pipe)
jobfile = get_log_file (pipe, name+'.job')
errfile = jobfile.replace('job', 'err')
logfile = jobfile.replace('job', 'log')
with closing(file(jobfile,'w')) as f:
f.write(_server_file%(address[1], log_level, pipedesc, authkey, logfile))
com = ['qsub',
'-l', 'platform=%s'%plateform,
'-l', 'T=%d'%cpu_time,
'-l', 'M=%s'%mem,
'-l', 'scratch=%s'%scratch,
'-o', logfile,
'-e', errfile,
'-N', name,
jobfile]
print " ".join(com)
subprocess.Popen(com).communicate()[0]
import re
l = subprocess.Popen(['qjob', '-wide','-nh'], stdout=subprocess.PIPE).communicate()[0]
existing_process = re.findall( '%s([0-9]*)'%job_name,l)
existing_process = filter(lambda x: x!='', existing_process)
    try:
        starting_num = max([int(p) for p in existing_process]) + 1
    except ValueError:  # no pre-existing worker job
        starting_num = 0
for i in range(starting_num,starting_num+n):
name = '%s%d'%(job_name,i)
jobfile = get_log_file (pipe, name+'.job%d'%i)
errfile = jobfile.replace('job', 'err')
logfile = jobfile.replace('job', 'log')
with closing(file(jobfile,'w')) as f:
f.write(_worker_file%(authkey,logfile))
com=['qsub',
'-l', 'platform=%s'%plateform,
'-l', 'T=%d'%cpu_time,
'-l', 'M=%s'%mem,
'-l', 'scratch=%s'%scratch,
'-o', logfile,
'-e', errfile,
'-N', name,
jobfile]
print " ".join(com)
subprocess.Popen(com).communicate()[0]
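# A usage sketch for the BQS launcher, assuming an account at CCIN2P3 with
# access to the sps storage; with server=True an extra job hosting the
# scheduler and web server is submitted first (values below are
# illustrative):
#
#     P = pipeline.Pipeline("first->second", code_dir='./', prefix='./')
#     launch_ccali(P, 10, address=('cca.in2p3.fr', 50000), server=True,
#                  cpu_time=1000, mem="1GB", scratch="1GB")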
def launch_ccali_ge(pipe, n, address=('127.0.0.1', 5000), authkey='secret',
                    job_dir="/sps/snls13/users/betoule/bqs/scripts",
                    job_name="job_", log_dir="/sps/snls13/users/betoule/bqs/logs",
                    cpu_time=1000, scratch="512MB", mem="512MB",
                    project='snovae', server=False, log_level=logging.INFO,
                    buff=None, ressource=['sps']):
"""submit a bunch of distant workers to the BQS system
at CCIN2P3.
"""
if server:
# Submit a job that will run the scheduler, a temporary web server and a single worker
import cPickle as pickle
if buff is not None:
b = ['-b', buff]
else:
b = []
name = '%s_sched'%(job_name,)
pipedesc = pickle.dumps(pipe)
jobfile = get_log_file (pipe, name+'.job')
errfile = jobfile.replace('job', 'err')
logfile = jobfile.replace('job', 'log')
with closing(file(jobfile,'w')) as f:
f.write(_server_file%(address[1], log_level, pipedesc, authkey, logfile))
com = ['qsub',
'-P', 'P_%s'%project,
'-l', 'ct=%d'%cpu_time,
'-l', 'vmem=%s'%mem,
'-l', 'fsize=%s'%scratch,
'-o', logfile,
'-e', errfile,
'-N', name]+reduce(lambda x,y : x+y, [['-l', '%s=1'%res] for res in ressource], [])+[jobfile]
print " ".join(com)
subprocess.Popen(com).communicate()[0]
import re
l = subprocess.Popen(['qstat', '-r'], stdout=subprocess.PIPE).communicate()[0]
existing_process = re.findall( '%s([0-9]*)'%job_name,l)
existing_process = filter(lambda x: x!='', existing_process)
    try:
        starting_num = max([int(p) for p in existing_process]) + 1
    except ValueError:  # no pre-existing worker job
        starting_num = 0
for i in range(starting_num,starting_num+n):
name = '%s%d'%(job_name,i)
jobfile = get_log_file (pipe, name+'.job%d'%i)
errfile = jobfile.replace('job', 'err')
logfile = jobfile.replace('job', 'log')
with closing(file(jobfile,'w')) as f:
f.write(_worker_file%(authkey,logfile))
com=['qsub',
'-P', 'P_%s'%project,
'-l', 'ct=%d'%cpu_time,
'-l', 'vmem=%s'%mem,
'-l', 'fsize=%s'%scratch,
'-o', logfile,
'-e', errfile,
'-N', name]+reduce(lambda x,y : x+y, [['-l', '%s=1'%res] for res in ressource], [])+[jobfile]
print " ".join(com)
subprocess.Popen(com).communicate()[0]
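# A usage sketch for the Grid Engine launcher, mirroring launch_ccali but
# with SGE-style resource flags (project and resources are illustrative):
#
#     P = pipeline.Pipeline("first->second", code_dir='./', prefix='./')
#     launch_ccali_ge(P, 10, server=True, project='snovae',
#                     ressource=['sps'], mem="2G", scratch="1G")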
def launch_ccage_server(pipe, address=('127.0.0.1', 5000), authkey='secret',
                        log_level=logging.WARNING, project="planck",
                        cpu_time='24:00:00', scratch="512M", mem="512M",
                        ressource=['sps']):
""" Launch a scheduler + web server through a SGE batch system @ccin2p3.
"""
server_file="""#! /bin/zsh -l
echo $HOST
echo "export PIPELETD_HOST=$HOST\nexport PIPEWEB_HOST=$HOST\nexport PIPELETD_PORT=%d" > $HOME/host_info.sh
source $HOME/host_info.sh
echo "starting web server at $(date)"
sleep 5
pipeweb start -n &
echo "web server returned at $(date)"
pipeletd -n -l %s -p $PIPELETD_PORT -a $HOST << 'EOF'
%s
EOF
"""
port = address[1]
import cPickle as pickle
pipedesc = pickle.dumps(pipe)
name = 'sched'
jobfile = get_log_file (pipe, name+'.job')
errfile = jobfile.replace('job', 'err')
logfile = jobfile.replace('job', 'log')
with closing(file(jobfile,'w')) as f:
f.write(server_file%(port, log_level, pipedesc))
    # resource flags (e.g. sps=1) are appended from the ressource argument
    com = ['qsub',
           '-P', 'P_%s' % project,
           '-l', 'fsize=%s' % scratch,
           '-l', 'vmem=%s' % mem,
           '-l', 'ct=%s' % cpu_time,
           '-o', logfile,
           '-e', errfile,
           '-N', name] + reduce(lambda x, y: x + y,
                                [['-l', '%s=1' % res] for res in ressource],
                                []) + [jobfile]
subprocess.Popen(com).communicate()[0]
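# A usage sketch: submit the scheduler/web-server job first, then attach
# workers with launch_ccage_worker below; workers read the scheduler
# coordinates from ~/host_info.sh, written by this server job (values are
# illustrative):
#
#     P = pipeline.Pipeline("first->second", code_dir='./', prefix='./')
#     launch_ccage_server(P, project="planck", cpu_time='24:00:00')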
def launch_ccage_worker(pipe, n, address=('127.0.0.1', 5000), authkey='secret',
                        log_level=logging.WARNING, project="planck",
                        cpu_time='01:00:00', scratch="512M", mem="512M",
                        ressource=['sps'], multicores=False):
""" Launch n worker using job array
"""
worker_file = """#! /bin/zsh -l
echo $HOST
sleep 1
source $HOME/host_info.sh
echo "launching worker at $(date)"
python -m pipelet.launchers -H $PIPELETD_HOST -p $PIPELETD_PORT -s %s -l %s$SGE_TASK_ID
echo "worker returned at $(date)"
"""
name = 'worker'
jobfile = get_log_file (pipe, name+'.job')
errfile = jobfile.replace('job', 'err')
logfile = jobfile.replace('job', 'log')
with closing(file(jobfile,'w')) as f:
f.write(worker_file%(authkey,logfile))
if multicores:
mc = ['-pe', 'multicores' , '%d'%multicores]
else:
mc = []
    com = ['qsub',
           '-P', 'P_%s' % project,
           '-l', 'fsize=%s' % scratch,
           '-l', 'vmem=%s' % mem,
           '-l', 'ct=%s' % cpu_time,
           '-t', '1-%d' % n,
           '-o', logfile,
           '-e', errfile,
           '-N', name] + mc + reduce(lambda x, y: x + y,
                                     [['-l', '%s=1' % res] for res in ressource],
                                     []) + [jobfile]
subprocess.Popen(com).communicate()[0]
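# A usage sketch: once the server job above is running, submit an array of
# 16 workers; each array task gets its own log file suffixed by
# $SGE_TASK_ID (values are illustrative):
#
#     P = pipeline.Pipeline("first->second", code_dir='./', prefix='./')
#     launch_ccage_worker(P, 16, project="planck", cpu_time='01:00:00')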
if __name__ == "__main__":
import optparse
parser = optparse.OptionParser()
    parser.add_option('-H', '--host', metavar='hostname',
                      help='hostname or address of the scheduler server',
                      default=socket.gethostname())
    parser.add_option('-p', '--port', metavar='port',
                      help='port the scheduler is listening on',
                      default=50000, type='int')
parser.add_option('-s', '--secret', metavar='key',
help='authentication key', default='secret')
parser.add_option('-l', '--logfile', metavar='logfile',
help='worker log filename')
(options, args) = parser.parse_args()
name = options.logfile.split("_")[-1]
wl = init_logger(name, options.logfile)
w = worker.ProcessWorker(address=(options.host, options.port),
authkey=options.secret, logger=wl)
    sys.argv = [sys.argv[0]]  # do not propagate command line arguments to the script
w.run()