launchers Module

Utility functions to launch pipes

  • launch_interactive : launch a local worker in the interactive session (debugger compliant)
  • launch_thread : launch a bunch of local workers in separate threads (suited to SMP machines)
  • launch_process : launch a bunch of local workers in separate processes (useful when the GIL becomes limiting, i.e. when most time is spent in pure Python processing)
  • launch_ssh : launch a bunch of remote workers through ssh
  • launch_pbs : submit a bunch of remote workers to any PBS-compliant system (keep in mind that the pipe directory must be accessible to the workers)
  • launch_ccali : submit a bunch of remote workers to the BQS system at CCIN2P3
  • launch_ccali_ge : submit a bunch of remote workers to the Grid Engine system at CCIN2P3
  • launch_pbspro : submit a bunch of remote workers to the PBSpro manager (magique3, ...)

Run as a script, this module can be used to launch a worker on a remote machine.

class pipelet.launchers.SchedulerManager(address=None, authkey=None, serializer='pickle')[source]

Bases: multiprocessing.managers.BaseManager

Extension of the BaseManager class.
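
For reference, a minimal sketch of the standard multiprocessing.managers pattern this class follows; the registered name get_scheduler is an assumption for illustration, not confirmed by the source:

>>> # Hypothetical client-side use; 'get_scheduler' is an assumed
>>> # registration name, not taken from the pipelet source.
>>> SchedulerManager.register('get_scheduler')
>>> m = SchedulerManager(address=('127.0.0.1', 5000), authkey='secret')
>>> m.connect()  # attach to an already-running scheduler server
>>> sched = m.get_scheduler()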

pipelet.launchers.launch_ccage_server(pipe, address=('127.0.0.1', 5000), authkey='secret', log_level=30, project='planck', cpu_time='24:00:00', scratch='512M', mem='512M', ressource=['sps'])[source]

Launch a scheduler + web server through the SGE batch system at CCIN2P3.
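
An illustrative call, assuming access to the CCIN2P3 SGE batch system; the keyword values are the signature defaults and the Pipeline construction mirrors the launch_interactive example below:

>>> T = pipeline.Pipeline("", permissive=True)
>>> launch_ccage_server(T, project='planck', cpu_time='24:00:00')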

pipelet.launchers.launch_ccage_worker(pipe, n, address=('127.0.0.1', 5000), authkey='secret', log_level=30, project='planck', cpu_time='01:00:00', scratch='512M', mem='512M', ressource=['sps'], multicores=False)[source]

Launch n workers using a job array.
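
An illustrative submission of 10 workers as a job array, again assuming CCIN2P3 access (keyword values are the signature defaults):

>>> T = pipeline.Pipeline("", permissive=True)
>>> launch_ccage_worker(T, 10, cpu_time='01:00:00', mem='512M')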

pipelet.launchers.launch_ccali(pipe, n, address=('127.0.0.1', 5000), authkey='secret', job_dir='/sps/snls13/users/betoule/bqs/scripts', job_name='job_', log_dir='/sps/snls13/users/betoule/bqs/logs', cpu_time=1000, scratch='512MB', mem='512MB', plateform='LINUX', server=False, log_level=20, buff=None)[source]

Submit a bunch of remote workers to the BQS system at CCIN2P3.
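
For illustration, a submission assuming CCIN2P3 access and writable job_dir and log_dir paths (keyword values are the signature defaults):

>>> T = pipeline.Pipeline("", permissive=True)
>>> launch_ccali(T, 10, cpu_time=1000, mem='512MB')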

pipelet.launchers.launch_ccali_ge(pipe, n, address=('127.0.0.1', 5000), authkey='secret', job_dir='/sps/snls13/users/betoule/bqs/scripts', job_name='job_', log_dir='/sps/snls13/users/betoule/bqs/logs', cpu_time=1000, scratch='512MB', mem='512MB', project='snovae', server=False, log_level=20, buff=None, ressource=['sps'])[source]

Submit a bunch of remote workers to the Grid Engine system at CCIN2P3.
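
A sketch of a typical submission, assuming CCIN2P3 access (keyword values are the signature defaults):

>>> T = pipeline.Pipeline("", permissive=True)
>>> launch_ccali_ge(T, 10, project='snovae', cpu_time=1000)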

pipelet.launchers.launch_interactive(pipe, log_level=20)[source]

Launch a local worker in the interactive session.

This is debugger compliant, so that exceptions raised during segment execution can be tracked.

log_level values are those of the logging module (logging.DEBUG=10, logging.WARNING=30, logging.CRITICAL=50, etc.). Set it to 0 to disable stream logging.

>>> T = pipeline.Pipeline("", code_dir='./',
...                       prefix='./', permissive=True)
>>> w,t = launch_interactive(T, log_level=logging.ERROR)
>>> w.run()
pipelet.launchers.launch_nersc(pipe, n, ppn=1, port=50000, authkey='secret', job_name='job_', log_level=30, cpu_time='00:30:00', job_header='\n#PBS -q regular\n#PBS -l nodes=%d:ppn=%d\n')[source]

Launch a bunch of remote workers through a PBS batch system using aprun.
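
An illustrative call; the worker count and the ppn value are placeholders to be matched to the target machine's node geometry:

>>> T = pipeline.Pipeline("", permissive=True)
>>> launch_nersc(T, 4, ppn=24, cpu_time='00:30:00')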

pipelet.launchers.launch_pbs(pipe, n, address=None, authkey='secret', job_name='job_', log_level=30, cpu_time='2:00:00', server=False, job_header='\n#/bin/bash\necho $PYTHONPATH\n')[source]

Launch a bunch of remote workers through a PBS batch system.
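
A sketch of a call with a customized job_header; the queue name myqueue is a placeholder, and the header otherwise extends the default shown in the signature:

>>> T = pipeline.Pipeline("", permissive=True)
>>> launch_pbs(T, 4, cpu_time='2:00:00',
...            job_header="\n#/bin/bash\n#PBS -q myqueue\necho $PYTHONPATH\n")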

pipelet.launchers.launch_process(pipe, n, address=('', 50000), authkey='secret', log_level=30, nice=0)[source]

Launch a bunch of local workers in separate processes.

This is useful (compared to launch_thread) when the GIL becomes limiting, which is bound to be the case when most time is spent in pure Python processing.

>>> T = pipeline.Pipeline("", permissive=True)
>>> launch_process(T, 2)
pipelet.launchers.launch_ssh(pipe, host_list, address=None, authkey='secret', log_level=40)[source]

Launch a bunch of remote workers through ssh.

This is used mainly for testing purposes. It can be useful to distribute computations over a pool of machines that share access to a NAS.

>>> T = pipeline.Pipeline("", permissive=True)
>>> launch_ssh(T, ['127.0.0.1'], address=('127.0.0.1',50000))
pipelet.launchers.launch_thread(pipe, n, log_level=30)[source]

Launch a bunch of local workers in separate threads.

This is suited to SMP machines. Exceptions arising during the execution of any segment are caught, and the corresponding task is marked as failed.

>>> T = pipeline.Pipeline("", permissive=True)
>>> launch_thread(T, 2)
