This section describes more advanced (and useful) features and requires good familiarity with the concepts introduced in the previous section.
The default behavior can be altered by specifying a #multiplex directive in a comment of the segment code. If several multiplex directives are present in the segment code, the last one is retained.
The multiplex directive can be one of:
Moreover, the #multiplex cross_prod directive admits filtering and grouping by class, similar to SQL queries:
#multiplex cross_prod where "condition" group_by "class_function"
condition and class_function are Python expressions evaluated for each element of the product set.
The argument of where is a condition; an element becomes part of the input set only if the condition evaluates to True.
The group_by directive groups elements into classes according to the result of the evaluation of the given class function. The input set then contains all the resulting classes. For example, if the function is a constant, the input set will contain only one element: the class containing all elements.
During the evaluation, the values of the tuple elements are accessible as variables named after the corresponding parents.
Given the Cartesian product set:
[('Lancelot','the Brave'), ('Lancelot','the Pure'), ('Galahad','the Brave'), ('Galahad','the Pure')]
one can use
#multiplex cross_prod where "quality=='the Brave'"
to get 2 instances of the following segment (melt) running on ('Lancelot','the Brave'), ('Galahad','the Brave')
or:
#multiplex cross_prod group_by "knights"
to get 2 instances of the melt segment running on ('Lancelot'), ('Galahad')
or:
#multiplex cross_prod group_by "0"
to get 1 instance of the melt segment running on: (0)
Note that to make use of group_by, elements of the output set have to be hashable.
Another caution on the use of group_by: in those cases the segment input data is no longer a dictionary, as the original tuple is lost and replaced by the result of the class function.
See The segment environment section for more details.
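The effect of the where and group_by clauses on the Cartesian product above can be pictured with the following sketch. This is an illustration of the semantics only, not the actual Pipelet implementation; the variable names knights and quality mirror the hypothetical parent segments of the example.

    from itertools import product

    # Build the Cartesian product of the two parent output sets.
    knights = ['Lancelot', 'Galahad']
    quality = ['the Brave', 'the Pure']
    cross = list(product(knights, quality))

    # where "quality=='the Brave'": keep only the tuples for which
    # the condition evaluates to True.
    filtered = [(k, q) for (k, q) in cross if q == 'the Brave']

    # group_by "knights": one input per distinct value of the class
    # function (here, the knight name); class values must be hashable.
    classes = sorted(set(k for (k, q) in cross))

Running this, filtered keeps the two 'the Brave' tuples and classes contains one entry per knight, matching the two and two instances described above.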
As explained in the introduction section, Pipelet offers the possibility to save CPU time by storing intermediate products on disk. Intermediate products are the input/output data files of the different segments.
Each segment repository is identified by a unique key which depends on:
Every change made to a segment (new parameter or new parent) will thus give a different key and tell the Pipelet engine to compute a new segment instance.
It is possible to add some external dependencies to the key computation using the depend directive:
#depend file1 file2
At the very beginning of the pipeline execution, all dependencies are stored, to prevent any change (code editing) between the key computation and the actual processing.
Note that this mechanism fully applies only to segment and hook scripts. External dependencies are also read at the beginning of the pipeline execution, but are only used for the key computation.
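The key computation described above can be pictured as hashing the segment code together with its parameters and the keys of its parents. The following is a simplified, hypothetical sketch (the actual Pipelet algorithm may differ in hash function and in what exactly is fed to it):

    import hashlib

    def segment_key(script_text, params, parent_keys):
        # Hypothetical sketch: any change to the code, the parameters
        # or the parent keys yields a different key, which triggers the
        # computation of a new segment instance.
        h = hashlib.md5()
        h.update(script_text.encode())
        h.update(params.encode())
        for k in sorted(parent_keys):
            h.update(k.encode())
        return h.hexdigest()

With such a scheme, editing the script, changing a parameter, or rewiring a parent all propagate to a new key, which is the behavior described above.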
In case of unfortunate loss of the pipeline SQL database, it is possible to reconstruct it from the disk:
import pipelet
pipelet.utils.rebuild_db_from_disk(prefix, sqlfile)
All information will be retrieved, but with new identifiers.
As described in The segment environment section, Pipelet supports a hooking system which allows the use of generic processing code and code sectioning.
Let’s consider a set of instructions that have to be systematically applied at the end of a segment (post processing). One can put those instructions in a separate script file named, for example, segname_postproc.py, and call the hook function:
hook('postproc', globals())
A specific dictionary can be passed to the hook script to avoid confusion.
The hook scripts are included into the hash key computation.
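The principle behind the hook mechanism can be sketched as follows: the hook script is executed in the namespace it is given (globals() by default, or a specific dictionary), so it can read and modify the segment's variables. This is an illustration of the idea only, not the actual Pipelet implementation; the seg_output and summary names are made up for the example.

    # Contents that could live in a hypothetical segname_postproc.py,
    # here held in a string for the sake of the illustration.
    postproc_script = "summary = 'processed %d items' % len(seg_output)"

    # The dictionary passed to the script plays the role of the
    # segment's globals(); the script reads seg_output from it and
    # writes summary back into it.
    namespace = {'seg_output': [1, 2, 3]}
    exec(postproc_script, namespace)

Passing a dedicated dictionary instead of globals(), as mentioned above, keeps the hook's variables from colliding with the segment's own.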
By default, segment scripts are read from a local directory, specified at pipeline initialization with the parameter named code_dir:
from pipelet.pipeline import Pipeline
P = Pipeline(pipedot, code_dir="./", prefix="./")
The segment script contents are immediately stored, to prevent any modification between the pipeline start time and the actual execution of each segment.
It is generally a good idea to keep this directory under revision control (RCS), to ease the reproducibility of the pipeline (even though the Pipelet engine makes a copy of the segment script in the segment output directory).
If using Git, the revision number will be stored at the beginning of the copy of the segment script.
The Pipelet software provides a set of default utilities available from the segment environment. It is possible to extend this default environment or even rewrite a completely new one.
The different environment utilities are actually methods of the class Environment. It is possible to add new functionalities by using the Python inheritance mechanism:
File: myenvironment.py:

from pipelet.environment import *

class MyEnvironment(Environment):
    def my_function(self):
        """ My function does nothing """
        return
The Pipelet engine objects (segments, tasks, pipeline) are available from the worker attribute self._worker. See The pipelet actors section for more details about the Pipelet machinery.
In order to start with a completely new environment, extend the base environment:
File: myenvironment.py:

from pipelet.environment import *

class MyEnvironment(EnvironmentBase):
    def my_get_data_fn(self, x):
        """ New name for get_data_fn """
        return self._get_data_fn(x)

    def _close(self, glo):
        """ Post processing code """
        return glo['seg_output']
From the base environment, the basic functionalities for getting file names and executing hook scripts are still available through:
The segment input argument is also stored in self._seg_input. The segment output argument has to be returned by the _close(self, glo) method.
The Pipelet engine objects (segments, tasks, pipeline) are available from the worker attribute self._worker. See the doxygen documentation for more details about the Pipelet machinery.
To load another environment, set the pipeline environment attribute accordingly.
Pipeline(pipedot, code_dir=, prefix=, env=MyEnvironment)
Pipeweb uses the CherryPy web framework server and can be run behind an Apache web server, which brings essentially two advantages:
There are actually several ways of doing so, the CherryPy documentation giving hints about each. We describe here an example case using mod_rewrite and virtual hosting.
The first thing we need is a working installation of Apache with mod_rewrite activated. On a Debian-like distribution this is usually obtained by:
sudo a2enmod rewrite
sudo a2enmod proxy
sudo a2enmod proxy_http
We then configure Apache to rewrite requests to the CherryPy application, except for the static files of the application, which are served directly. Here is a sample configuration file for a dedicated virtual host named pipeweb, with pipelet installed under /usr/local/lib/python2.6/dist-packages/:
<VirtualHost pipeweb:80>
ServerAdmin pipeweb_admin@localhost
DocumentRoot /usr/local/lib/python2.6/dist-packages/pipelet
# ErrorLog /some/custom/error_file.log
# CustomLog /some/custom/access_file.log common
RewriteEngine on
RewriteRule ^/static/(.*) /usr/local/lib/python2.6/dist-packages/pipelet/static/$1 [L]
RewriteRule ^(.*) http://127.0.0.1:8080$1 [proxy]
</VirtualHost>
Restart Apache and start the pipeweb application to serve on the specified address and port: pipeweb start -H 127.0.0.1
It is also possible to start the application on demand, using a CGI script like:
#!/usr/local/bin/python
print "Content-type: text/html\r\n"
print """<html><head><META HTTP-EQUIV="Refresh" CONTENT="1; URL=/"></head><body>Restarting site ...<a href="/">click here</a></body></html>"""
import os
os.system('pipeweb start -H 127.0.0.1')
To have it executed when the proxy detects the absence of the application:
<VirtualHost pipeweb:80>
#...
ScriptAliasMatch ^/pipeweb_autostart\.cgi$ /usr/local/bin/pipeweb_autostart.cgi
RewriteEngine on
RewriteCond %{SCRIPT_FILENAME} !pipeweb_autostart\.cgi$
RewriteRule ^/static/(.*) /usr/local/lib/python2.6/dist-packages/pipelet/static/$1 [L]
RewriteCond %{SCRIPT_FILENAME} !pipeweb_autostart\.cgi$
RewriteRule ^(.*) http://127.0.0.1:8080$1 [proxy]
ErrorDocument 503 /pipeweb_autostart.cgi
#...
</VirtualHost>
You may want to adjust the ownership and setuid bit of the pipeweb_autostart.cgi script so that it executes with the correct rights.
Pipeweb handles access rights using per-pipeline ACLs registered in the database file. It supports Basic and Digest HTTP authentication. When deploying the pipeweb interface in a production environment, one may want to defer part of the authorization process to external and potentially more secure systems. The pipeweb behavior in terms of authorization is controlled by the -A option, which accepts the following arguments:
- Digest (default): authenticate users via HTTP Digest authentication according to the user:passwd list stored in the database.
Here is a complete configuration sample making use of HTTPS, Basic authentication, and per-pipeline ACLs to secure data browsing:
<VirtualHost _default_:443>
ServerAdmin pipeweb_admin@localhost
DocumentRoot /usr/local/lib/python2.6/dist-packages/pipelet
# ErrorLog /some/custom/error_file.log
# CustomLog /some/custom/access_file.log common
# Adjust the ssl configuration to fit your needs
SSLEngine on
SSLCertificateFile /etc/ssl/certs/ssl-cert-snakeoil.pem
SSLCertificateKeyFile /etc/ssl/private/ssl-cert-snakeoil.key
# This handles authentication and access to the index page
# Access right checking to the various registered pipelines
# is left to pipeweb
<Location />
# Replace with any suitable authentication system
AuthName "pipeweb"
AuthType Basic
AuthUserFile /etc/apache2/pipelet.pwd
require valid-user
</Location>
ScriptAliasMatch ^/pipeweb_autostart\.cgi$ /usr/local/bin/pipeweb_autostart.cgi
RewriteEngine on
RewriteCond %{SCRIPT_FILENAME} !pipeweb_autostart\.cgi$
RewriteRule ^/static/(.*) /usr/local/lib/python2.6/dist-packages/pipelet/static/$1 [L]
RewriteCond %{SCRIPT_FILENAME} !pipeweb_autostart\.cgi$
RewriteRule ^(.*) http://127.0.0.1:8080$1 [proxy]
ErrorDocument 503 /pipeweb_autostart.cgi
</VirtualHost>
And the corresponding cgi script:
#!/usr/local/bin/python
print "Content-type: text/html\r\n"
print """<html><head><META HTTP-EQUIV="Refresh" CONTENT="1; URL=/"></head><body>Restarting site ...<a href="/">click here</a></body></html>"""
import os
os.system('pipeweb start -H 127.0.0.1 -A ACL')