Introduction to Data Processing Workflow Languages
Michael A Bouzinier
Harvard University Research Computing
This course provides an easy-to-understand introduction to different languages designed to describe (and define) data acquisition and data processing workflows.
Learn the differences between API-based and command-based pipelines, explore topologies, and get hands-on with popular descriptive languages like CWL and Nextflow. By the end, you’ll craft your own simple pipelines in CWL and Nextflow.
Prerequisites
Basic understanding of data processing concepts; no prior knowledge of specific workflow languages is required.
Performing the included exercises requires Docker and Python installed on your computer.
Agenda
Workflow automation
Data processing pipelines, a.k.a. workflows
Application Domains for Data Processing Workflows
Data acquisition as a workflow
For most computational studies, the data acquisition workflow can be represented as a data processing pipeline.
“The changing landscape […] has created a need for computational pipelines capable of efficiently orchestrating complex analysis stages while handling large volumes of data across heterogeneous computational environments. Workflow Management Systems (WfMSs) are the software components employed to fill this gap.”
Nature Scientific Reports, https://www.nature.com/articles/s41598-021-99288-8
A pipeline is a DAG (directed acyclic graph): the steps are the nodes, and the data dependencies between them are the edges.
Pipeline topologies
How to represent pipeline topologies
API-based
Command-based
Descriptive domain specific languages (DSL)
Workflow definition frameworks
For more information: https://www.ncbi.nlm.nih.gov/pmc/articles/PMC5429012/
API-based frameworks
Good
Bad
Examples: Apache Spark and Apache Hadoop
Command-based frameworks
Solve the problem of repeatability
Do not address the problem of reproducibility
Workflow definition languages
Example of Workflow definition languages
See table: https://www.nature.com/articles/s41598-021-99288-8/tables/1 for comparison of some popular workflow definition languages
This table is also useful: https://www.ncbi.nlm.nih.gov/pmc/articles/PMC5429012/table/bbw020-T1
Get your hands dirty
Exercises
Prerequisites
Java
Software you will need to install
CWL
Activate your virtual environment [optional]
Run the following commands:
pip install cwltool
pip install cwlref-runner
Details
https://github.com/common-workflow-language/cwltool#install
Nextflow
Run the following commands:
If Java is not installed:
curl -s "https://get.sdkman.io" | bash
source ~/.sdkman/bin/sdkman-init.sh
sdk install java 17.0.6-amzn
When Java is installed:
curl -s https://get.nextflow.io | bash
Copy the nextflow executable to /usr/local/bin.
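For example (assuming nextflow was downloaded to the current working directory):
sudo mv nextflow /usr/local/bin/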
Details: https://www.nextflow.io/docs/latest/getstarted.html
CWL Examples
Hello World with CWL
cwlVersion: v1.2
class: CommandLineTool
baseCommand: echo

inputs:
  message:
    type: string
    default: "Hello World"
    inputBinding:
      position: 1
outputs: []
cwl-runner helloworld.cwl
cwl-runner helloworld.cwl --message "Bonjour le monde"
cwl-runner helloworld.cwl helloworld-job.json
cwl-runner helloworld.cwl helloworld-job.yml
helloworld-job.json:
{
  "message": "こんにちは世界"
}
helloworld-job.yml:
message: "こんにちは世界"
Nextflow Examples
https://www.nextflow.io/docs/latest/example.html
Hello World with Nextflow (using GitHub)
The simplest command:
nextflow run https://github.com/nextflow-io/hello
Your own Hello World with Nextflow
helloworld.nf:
#!/usr/bin/env nextflow

params.message = 'Bonjour,Ciao,Hello,Hola'

process sayHello {
    input:
    val x
    output:
    stdout
    script:
    """
    echo '$x world!'
    """
}

workflow {
    Channel.fromList(params.message.split(',') as List) | sayHello | view
}
nextflow run helloworld.nf
nextflow run helloworld.nf --message Olá,Ciao
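With the default message list, the first command should print (order may vary, because process instances run in parallel):
Bonjour world!
Ciao world!
Hello world!
Hola world!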
Let us get more advanced
Problem statement
We have a tab-separated values file with several columns. We would like to select one numerical column (main variable) and explore how other numerical columns correlate with it.
Disclaimer: I know that all of you can easily write a Python, R, Java, or C program that will do it. The purpose of this course, though, is not to present a use case for a workflow definition language but to show how to use one.
A typical real-world use case would include pulling data from multiple sources and applying different processing steps to different datasets, sometimes in a particular order, combining and recombining datasets.
Fetching the solution to your local system
git clone https://github.com/mmcentre/pipelines-tutorial.git
(Or clone it in your favorite IDE)
Steps we will perform
Unpack the data archive, cleanse it, correlate each column with the main variable, combine and sort the results, and plot them.
We cannot be sure that all required software is installed on your local system; therefore, we will use a Docker container to perform the operations.
CWL Pipeline
correlate_all.cwl: https://github.com/mmcentre/pipelines-tutorial/blob/main/src/cwl/correlate_all.cwl
class: Workflow
requirements:
  …

hints:
  DockerRequirement:
    dockerPull: forome/slimpipe

inputs:
  …
steps:
  …
outputs:
  …
CWL uses YAML syntax; a CWL workflow is essentially a YAML file.
Dependencies between steps are defined by passing outputs of one step to another
If a step needs to consume the output of another step, it has to wait for that other step to complete.
In this exercise we have an almost linear pipeline, but that is not always the case.
CWL Steps: Cleanse
class: CommandLineTool
baseCommand: [gunzip, '-c']
inputs:
  archive:
    type: File
    inputBinding:
      position: 1
outputs:
  unpacked:
    type: stdout
# nameroot strips the last extension, so data.csv.gz unpacks to data.csv
stdout: $(inputs.archive.nameroot)
class: CommandLineTool
baseCommand: [grep, '-v', '(null)']
inputs:
  raw_data:
    type: File
    inputBinding:
      position: 1
outputs:
  clean_data:
    type: stdout
# the output keeps the input's name, prefixed with 'clean-'
stdout: $('clean-' + inputs.raw_data.basename)
unpack:
  run:
    …
  in:
    archive: data
  out:
    - unpacked

clean:
  run:
    …
  in:
    raw_data: unpack/unpacked
  out:
    - clean_data
CWL Steps: Correlate
We use a very simple Python program:
import sys
import pandas

df = pandas.read_csv(sys.argv[1], sep='\t')
s1 = sys.argv[2]
s2 = sys.argv[3]
print(f"{s1.replace('_','-')} \t{s2.replace('_','-')}\t {df[s1].corr(df[s2])}")
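For example, the program can be run standalone (the file name is illustrative; the column names come from this tutorial's dataset):
python correlate.py clean-data.csv poverty smoke_rate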
To run it with CWL, we need to create a tool:
correlate.cwl: https://github.com/mmcentre/pipelines-tutorial/blob/main/src/cwl/correlate.cwl
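A minimal sketch of what correlate.cwl can look like; the input and output names match the step definition on the next slide, but the exact structure may differ from the repository version:
class: CommandLineTool
baseCommand: [python]
inputs:
  program:
    type: File
    inputBinding:
      position: 1
  data:
    type: File
    inputBinding:
      position: 2
  column1:
    type: string
    inputBinding:
      position: 3
  column2:
    type: string
    inputBinding:
      position: 4
outputs:
  correlation_coefficient:
    type: stdout
# the output file name is illustrative
stdout: correlation.txt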
CWL: Parallelizing calculations
run: correlate.cwl
in:
  program: program
  data: clean/clean_data
  column1: variable
  column2: columns
scatter: column2
out:
  - correlation_coefficient
This step runs after the clean step, because it consumes clean/clean_data. Scattering over column2 runs one instance of the tool per column.
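Note that scattering only works if the workflow declares the corresponding requirement (part of the requirements section elided above):
requirements:
  ScatterFeatureRequirement: {}
When the pipeline is run with cwl-runner --parallel, the scattered instances can execute concurrently.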
CWL Steps: Combine and sort outputs
class: CommandLineTool
baseCommand: [cat]
inputs:
  files:
    type: File[]
    inputBinding:
      position: 1
outputs:
  combined:
    type: stdout
stdout: correlations.txt
class: CommandLineTool
baseCommand: [sort, '-gk', '3,3']
inputs:
  data:
    type: File
    inputBinding:
      position: 1
outputs:
  sorted:
    type: stdout
stdout: correlations.txt
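In the workflow, these tools are wired like the previous steps; a sketch (the step name combine is illustrative, while sort matches the names used in the outputs section below):
combine:
  run:
    …
  in:
    files: correlate/correlation_coefficient
  out:
    - combined

sort:
  run:
    …
  in:
    data: combine/combined
  out:
    - sorted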
CWL Steps: Plot
We use gnuplot, and we need to create a simple script for it.
Therefore, as we did for the Python script, we create a CWL tool that passes the script to the gnuplot utility.
plot.cwl: https://github.com/mmcentre/pipelines-tutorial/blob/main/src/cwl/plot.cwl
plot:
  run: plot.cwl
  in:
    script: plot
    data: sort/sorted
  out:
    - plot
This step runs after the sort step, because it consumes sort/sorted.
CWL Inputs
inputs:
  program:
    type: File
    doc: Path to the file with the Python code
  plot:
    type: File
    doc: Path to the file containing the gnuplot script
  data:
    type: File
    doc: Path to the tab-separated data file
  variable:
    type: string
    doc: Name of the first column
    default: 'poverty'
  columns:
    type: string[]
    doc: Names of the second columns
    default:
      - no_grad
      - density
      - median_age
      - median_household_income
      - population_density
      - smoke_rate
      - mean_bmi
      - tmmx
      - pm25
      - latitude
      - longitude
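The same inputs can also be supplied in a job file rather than on the command line; a sketch (local paths are illustrative, the data URL matches the run command shown later):
program:
  class: File
  path: ../pipelines-tutorial/src/python/pipelines-tutorial/correlate.py
plot:
  class: File
  path: ../pipelines-tutorial/src/gnuplot/plot.gpl
data:
  class: File
  location: https://htp-data.s3.us.cloud-object-storage.appdomain.cloud/data.csv.gz
variable: poverty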
CWL Outputs
outputs:
#  unpacked:
#    type: File
#    outputSource: unpack/unpacked
#  clean_data:
#    type: File
#    outputSource: clean/clean_data
#  correlations:
#    type: File[]
#    outputSource: correlate/correlation_coefficient
  table:
    type: File
    outputSource: sort/sorted
  plot:
    type: File
    outputSource: plot/plot
The outputs that are commented out are intermediate files, useful only for debugging purposes.
CWL: Run the pipeline
Create a working directory and run the pipeline from it:
cwl-runner --parallel ../pipelines-tutorial/src/cwl/correlate_all.cwl \
--program ../pipelines-tutorial/src/python/pipelines-tutorial/correlate.py \
--data https://htp-data.s3.us.cloud-object-storage.appdomain.cloud/data.csv.gz \
--plot ../pipelines-tutorial/src/gnuplot/plot.gpl
Nextflow Pipeline
main.nf: https://github.com/mmcentre/pipelines-tutorial/blob/main/src/nextflow/main.nf
Nextflow uses Groovy syntax: https://groovy-lang.org/
workflow {
    main:

    variable = Channel.from(params.variable)
    columns = Channel.fromList(params.columns)
    rawDataFile = Channel.fromPath(params.data, checkIfExists: true)
    cleanDataFile = clean(rawDataFile)

    cleanDataFile.combine(variable).combine(columns)
        | correlate
        | collectFile
        | sort
        | plot
        | view
}
Dependencies between steps are defined by special operators, including the "pipe" operator |, and by passing outputs of one step to another.
https://www.nextflow.io/docs/latest/workflow.html#special-operators
More about Nextflow syntax
Groovy is a Java-family language, hence it uses curly brackets {} for blocks and // for comments.
The main block is workflow {}; it refers to blocks called process. A process is called using ordinary procedure-call syntax, so the whole workflow reads like an ordinary program. See https://www.nextflow.io/docs/latest/process.html
If two processes do not depend on each other (i.e., neither consumes the output of the other and no operators connect them), they can be executed in any order or in parallel.
Conditional if-then-else statements are allowed (we will not use them; see the sketch after this list).
Scripts can be written inside the process definition as multiline strings. Normal shell operators like pipes (|) and redirection (>) can be used.
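A minimal sketch of a conditional, reusing the sayHello process from the Hello World example and assuming a hypothetical boolean parameter params.verbose:
workflow {
    messages = Channel.of('Hello', 'Bonjour')
    if (params.verbose) {
        messages | sayHello | view    // print each result as it arrives
    } else {
        messages | sayHello           // run silently
    }
}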
Nextflow: Multiline scripts
process correlate {
    input:
    tuple (path(data), val(var), val(col))

    output:
    stdout

    script:
    """
    #!/usr/local/bin/python
    import sys
    import pandas

    df = pandas.read_csv('$data', sep='\t')
    s1 = '$var'
    s2 = '$col'
    print(f"{s1.replace('_','-')} \t{s2.replace('_','-')}\t {df[s1].corr(df[s2])}")
    """
}
Nextflow: shell operators
process clean {

    input:
    path rawDataFile

    output:
    path 'clean_data.csv'

    script:
    """
    #!/bin/sh
    gunzip - < $rawDataFile | grep -v '(null)' > clean_data.csv
    """
}
Nextflow: inputs, outputs and channels
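Processes communicate through channels: a process declares what it consumes in its input: block and what it emits in its output: block, and the workflow block wires the two together. In our pipeline, clean emits a path channel; combining it with the variable and columns value channels yields (path, value, value) tuples that match the tuple input of correlate. A sketch of this wiring (extracted from the workflow block shown earlier):
cleanDataFile = clean(rawDataFile)               // channel of cleansed data files
cleanDataFile.combine(variable).combine(columns) // channel of (path, val, val) tuples
    | correlate                                  // matches correlate's tuple input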
Nextflow: Conciseness
Note how concise the Nextflow version is: the entire pipeline fits in a single main.nf file, whereas the CWL version is spread over several tool and workflow files.
Nextflow: Run the pipeline
Nextflow pipelines can be run directly from a VCS repository. We have already used this feature:
nextflow run https://github.com/nextflow-io/hello
To run our pipeline, create a working directory (Nextflow produces a lot of debugging output!).
If the main workflow file is called main.nf, a path to the directory containing it is sufficient. Alternatively, you can point the nextflow executable to any specific, arbitrarily named file.
Nextflow will also look for an optional file nextflow.config in the same directory as the main script.
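A minimal sketch of such a config, e.g. to run every process in the same Docker image the CWL version of the pipeline uses (illustrative; check the repository for the actual configuration):
process.container = 'forome/slimpipe'
docker.enabled = true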
nextflow run ../pipelines-tutorial/src/nextflow --data https://htp-data.s3.us.cloud-object-storage.appdomain.cloud/data.csv.gz
You have run two pipelines. What is next?
If you need to orchestrate your tools