Tutorial
Imports
To use Plumbium, start by importing the call function, the record decorator,
the pipeline instance, and any classes you need from plumbium.artefacts.
Artefacts are classes representing the data files used by your pipeline,
e.g. text files and images.
from plumbium import call, record, pipeline
from plumbium.artefacts import TextFile
Processing stages
Next, define the stages of your analysis to be recorded. For this example
we’ll concatenate two files in the first stage and then count the words of the
resulting file in the second stage. The record decorator indicates that the
function should be recorded. The arguments to record name the values returned
by the function, so the number of arguments to record should match the number
of values the function returns. Calls to external programs should be made
using call so that printed output can be captured.
@record('concatenated_file')
def concatenate(input_1, input_2):
    # Redirection needs a shell, so the whole command is passed as one string
    cmd = 'cat {0.filename} {1.filename} > joined.txt'.format(
        input_1, input_2
    )
    call([cmd], shell=True)
    return TextFile('joined.txt')


@record('count')
def count_words(target):
    # wc -w prints "<count> <filename>", so keep only the first field
    wc_output = call(['wc', '-w', target.filename])
    return int(wc_output.split()[0])
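To illustrate why external commands are routed through a wrapper, the sketch below shows one way a function could run a command and keep both the command line and its printed output for later recording. It uses only the standard library's subprocess module. The names run_and_capture and log are hypothetical and are not part of Plumbium's API, and this is not a description of how call is actually implemented.

```python
import subprocess
import sys


def run_and_capture(cmd, log):
    """Run cmd (a list of strings), note it in log, and return its output."""
    completed = subprocess.run(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,  # fold error messages into the captured text
        universal_newlines=True,   # decode the captured bytes to str
        check=True,                # raise CalledProcessError on a non-zero exit
    )
    # Keep the command line and what it printed side by side
    log.append({'command': ' '.join(cmd), 'printed_output': completed.stdout})
    return completed.stdout


log = []
# The running Python interpreter is a portable stand-in for an external tool
output = run_and_capture([sys.executable, '-c', 'print("4 joined.txt")'], log)
print(int(output.split()[0]))  # prints 4
```

Because every command passes through one function, the log ends up with a complete list of what was run and what each command printed.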
The complete pipeline
Now we use our stages to define the whole pipeline. Functions decorated with
record return an instance of ProcessOutput. The outputs from the function can
be accessed by name using dict-like indexing, where the names are the
arguments given to record.
def cat_and_count(input_1, input_2):
    concatenate_output = concatenate(input_1, input_2)
    count_output = count_words(concatenate_output['concatenated_file'])
    return count_output['count']
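The idea of naming return values and reading them back by key can be sketched with a small standalone decorator. This is only an illustration under stated assumptions: named_outputs is a hypothetical stand-in that returns a plain dict, whereas Plumbium's record returns a ProcessOutput and also records the call, neither of which is reproduced here.

```python
import functools


def named_outputs(*names):
    """Hypothetical stand-in for a record-style decorator (not Plumbium's code)."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            result = func(*args, **kwargs)
            # Treat a non-tuple return value as a single output
            values = result if isinstance(result, tuple) else (result,)
            if len(values) != len(names):
                raise ValueError(
                    '{0} returned {1} value(s) but {2} name(s) were given'.format(
                        func.__name__, len(values), len(names)
                    )
                )
            # Pair each name with the value in the same position
            return dict(zip(names, values))
        return wrapper
    return decorator


@named_outputs('total', 'largest')
def summarise(numbers):
    return sum(numbers), max(numbers)


output = summarise([3, 1, 4])
print(output['total'], output['largest'])  # prints: 8 4
```

The length check in the sketch mirrors the rule that the number of names should match the number of returned values.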
Running the pipeline
Finally, we use pipeline.run to execute the pipeline.
import sys

if __name__ == '__main__':
    input_1 = TextFile(sys.argv[1])
    input_2 = TextFile(sys.argv[2])
    pipeline.run('cat_and_count', cat_and_count, '.', input_1, input_2)
To try this out, save the complete example as tutorial.py, create a pair of
text files in the same directory, and then run
python tutorial.py [text file 1] [text file 2]. If everything works, no errors
should be printed and a file called cat_and_count-[date]_[time].tar.gz should
be created.
Results
Extract the result file using tar -zxf [result file] and have a look in the
new directory. You’ll find the two files that you used as input to the script,
the result of concatenating them (joined.txt), and a .json file. If you open
the .json file you’ll see a full record of the commands run (any errors that
occur will also be recorded in this file).
{
    "processes": [
        {
            "function": "concatenate",
            "returned": [
                "TextFile('joined.txt')"
            ],
            "input_kwargs": {},
            "finish_time": "20160426 12:13",
            "start_time": "20160426 12:13",
            "printed_output": "",
            "input_args": [
                "TextFile('text_file.txt')",
                "TextFile('text_file2.txt')"
            ],
            "called_commands": [
                "cat text_file.txt text_file2.txt"
            ]
        },
        {
            "function": "count_words",
            "returned": [],
            "input_kwargs": {},
            "finish_time": "20160426 12:13",
            "start_time": "20160426 12:13",
            "printed_output": "4 joined.txt\n",
            "input_args": [
                "TextFile('joined.txt')"
            ],
            "called_commands": [
                "wc joined.txt"
            ]
        }
    ],
    "name": "cat_and_count",
    "finish_date": "20160426 12:13",
    "start_date": "20160426 12:13",
    "results": {
        "0": 1234
    },
    "dir": ".",
    "inputs": [
        "TextFile('text_file.txt')",
        "TextFile('text_file2.txt')"
    ],
    "environment": {
        "python_packages": [
            ...
        ],
        "hostname": "machine.example.com",
        "environ": {
            ...
        },
        "uname": [
            "Linux",
            "machine.example.com",
            "3.10.0-327.18.2.el7.x86_64",
            "#1 SMP Thu May 12 11:03:55 UTC 2016",
            "x86_64"
        ]
    }
}
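The record can also be inspected from Python without extracting the archive by hand. The sketch below is a minimal example using only the standard library's tarfile and json modules. The keys processes, function and called_commands are taken from the record shown above. The helper names load_record and summarise_record and the archive layout built for the demonstration (including the file name record.json) are assumptions, since the name of the .json file inside a real result archive is not stated here.

```python
import json
import os
import tarfile
import tempfile


def load_record(archive_path):
    """Return the parsed contents of the first .json member in a .tar.gz file."""
    with tarfile.open(archive_path, 'r:gz') as archive:
        for member in archive.getmembers():
            if member.isfile() and member.name.endswith('.json'):
                with archive.extractfile(member) as handle:
                    return json.loads(handle.read().decode('utf-8'))
    raise ValueError('no .json file found in {0}'.format(archive_path))


def summarise_record(record):
    """List (function name, called commands) for each recorded process."""
    return [
        (process['function'], process['called_commands'])
        for process in record['processes']
    ]


# Build a tiny stand-in archive so the sketch runs without Plumbium installed
workdir = tempfile.mkdtemp()
json_path = os.path.join(workdir, 'record.json')  # hypothetical file name
with open(json_path, 'w') as handle:
    json.dump({'processes': [
        {'function': 'count_words', 'called_commands': ['wc joined.txt']}
    ]}, handle)
archive_path = os.path.join(workdir, 'example.tar.gz')
with tarfile.open(archive_path, 'w:gz') as archive:
    archive.add(json_path, arcname='example/record.json')

for function, commands in summarise_record(load_record(archive_path)):
    print(function, commands)
```

To try it on a real result, pass the path of the cat_and_count-[date]_[time].tar.gz file to load_record in place of the stand-in archive.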