1 of 44

Introduction to Data Processing Workflow Languages

Michael A Bouzinier

Harvard University Research Computing

2 of 44

Introduction to Data Processing Workflow Languages

This course provides an easy-to-understand introduction to different languages designed to describe (and define) a data acquisition and data processing workflow.

Learn the differences between API-based and command-based pipelines, explore topologies, and get hands-on with popular descriptive languages like CWL and Nextflow. By the end, you’ll craft your own simple pipelines in CWL and Nextflow.

Pre-Requisites

Basic understanding of data processing concepts. No prior knowledge of specific workflow languages is required.

Performing the included exercises requires installing Docker and Python on your computer.

3 of 44

Agenda

  • What are data processing pipelines, and how do we automate workflows?
  • What languages are used to define a pipeline?
  • Ensure prerequisites are met and software is installed
  • Run simple pipelines
    • CWL: maximum portability and standardization
    • Nextflow: better readability and ease of use
  • Explore a more advanced pipeline
    • In CWL
    • In Nextflow

4 of 44

Workflow automation

Data processing pipelines, a.k.a. workflows

5 of 44

Application Domains for Data Processing Workflows

  • Data Science Pipelines are primarily used for machine and deep learning, but also for data cleansing and feature extraction.
  • Bioinformatics Pipelines are used for processing biological information, primarily *-omics data.
  • Text Analytics Pipelines extract structured data from various texts and include standalone NLP pipelines as well as sub-pipelines that are embedded as modules in a data science or a bioinformatics pipeline.
  • There are a few universal, general-purpose pipeline frameworks.
  • There are also a number of domain-specific frameworks.
  • We will focus on the Data Science domain
    • Leaving bioinformatics and text analytics aside

6 of 44

Data acquisition as a workflow

  • Your model (AI, ML) requires data
  • Data acquisition is a workflow
    • Can be performed manually
    • Prone to mistakes
    • Not reproducible
    • Can be improved with proper standard operating procedures (SOPs)
  • Automating data acquisition
    • Brings reproducibility
    • Saves time when the task needs to be repeated

For most computational studies, the data acquisition workflow can be represented as a data processing pipeline.

7 of 44

The changing landscape […] has created a need for computational pipelines capable of efficiently orchestrating complex analysis stages while handling large volumes of data across heterogeneous computational environments. Workflow Management Systems (WfMSs) are the software components employed to fill this gap. 

8 of 44

Pipeline is a DAG

  • DAG stands for directed acyclic graph
  • A data processing pipeline consists of steps
  • A step might be
    • a script
    • a binary executable
    • a specific data transformation
  • Some steps are dependent on the results of other steps.
  • Every workflow can be represented as a DAG (see the sketch after this list)
    • steps are the nodes
    • dependencies are the edges
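
As an illustration, a pipeline DAG can be captured in a few lines of plain Python: a dictionary maps each step to the steps it depends on, and a topological sort yields a valid execution order. This is only a sketch; the step names are hypothetical (they mirror the exercise later in this course).

from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Steps are the nodes; "B depends on A" is an edge A -> B
dependencies = {
    "unpack": set(),
    "clean": {"unpack"},
    "correlate": {"clean"},
    "plot": {"correlate"},
}

# static_order() lists the steps in an order that respects every dependency
print(list(TopologicalSorter(dependencies).static_order()))
# -> ['unpack', 'clean', 'correlate', 'plot']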

9 of 44

Pipeline topologies

  • Pipeline topologies are complex
    • They provide for massive parallelization and multiple dependencies.
  • When pipeline logic is expressed in a procedural programming language, the result is a very convoluted program that is uneconomical to maintain.
  • Fortunately, alternatives to procedural languages have been proposed and are widely used.

10 of 44

How to represent pipeline topologies

Workflow definition frameworks fall into three broad categories, covered in the next slides:

  • API-based frameworks
  • Command-based frameworks
  • Descriptive domain-specific languages (DSLs)

11 of 44

API-based frameworks

Good

  • Enforces a good and clean architecture
  • Usually efficient in utilizing computational resources
  • Possible to leverage libraries available in a supported programming language

Bad

  • Requires special programming for each step
  • Not possible to leverage standalone command-line tools
  • Difficult to refactor
    • Porting legacy pipelines would require a full rewrite from scratch
    • If a tool used in the existing pipeline is not programmatically callable from inside your chosen framework, the resulting problem might be extremely difficult to solve

Examples: Apache Spark and Apache Hadoop
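
For a flavor of the API-based style, here is a minimal, hypothetical PySpark sketch (the file and column names are made up): every step is code written against the framework's API rather than a standalone tool.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("correlations").getOrCreate()

# Every step is programmed against the framework API
df = spark.read.csv("data.tsv", sep="\t", header=True, inferSchema=True)
clean = df.filter(df["median_age"].isNotNull())   # cleansing step
print(clean.stat.corr("poverty", "median_age"))   # Pearson correlation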

12 of 44

Command-based frameworks

  • The most popular: Apache Airflow
  • Also: Luigi (https://github.com/spotify/luigi), Apache Taverna, now retired (https://taverna.apache.org/)
  • Focused on description of the pipeline topology
      • can leverage any tool that can be launched from a command line
  • Well suited to support incremental porting of legacy pipelines
  • The topology is defined explicitly in dedicated modules
    • in a general-purpose programming language, such as Python or Java
    • Allowing business logic and topology definition to become intermingled
      • This makes it difficult to keep the separation clean as the workflow evolves
  • Do not explicitly define inputs, resources, and outputs
    • Unable to detect that the workflow relies on a hardcoded path to a file or another local resource

Solve the problem of repeatability

Do not address the problem of reproducibility
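
As an illustration of the command-based style, a minimal Airflow DAG might look like the sketch below (written for a recent Airflow 2.x; the commands and paths are hypothetical). The topology lives in Python, each task wraps a command line, and nothing prevents a task from silently relying on a hardcoded local path.

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(dag_id="correlations", start_date=datetime(2024, 1, 1), schedule=None) as dag:
    # Each task wraps an arbitrary command-line invocation
    clean = BashOperator(
        task_id="clean",
        bash_command="grep -v '(null)' /data/raw.tsv > /data/clean.tsv",  # hardcoded path!
    )
    plot = BashOperator(
        task_id="plot",
        bash_command="gnuplot /data/plot.gpl",
    )
    clean >> plot  # explicit topology: plot runs after clean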

13 of 44

Workflow definition languages

  • Address both repeatability and reproducibility
  • Steps are black boxes
  • Explicit definition of the pipeline topology
  • Insulation of the actual processing algorithms from
    • topology
    • inputs
    • requirements
    • outputs
  • Can be easily stored in any version control system (VCS)
  • Can be easily included in DevOps pipelines

14 of 44

Examples of Workflow definition languages

  • The most widely adopted is Common Workflow Language (CWL)
  • The latest newcomer, the least popular, but possibly the most advanced language is Nextflow
  • Workflow Description Language (WDL) was developed by the Broad Institute of MIT and Harvard
  • Swift/T: Swift, the parallel scripting language, is powered by Turbine, the execution engine.
    • Written by physicists and engineers to emphasize scalable deployment of short, rapid-fire tasks at exascale.
    • A fairly low-level language (similar to C) and extremely powerful
    • Has a steep learning curve
  • Snakemake (https://lachlandeer.github.io/snakemake-econ-r-tutorial/index.html)

See the table at https://www.nature.com/articles/s41598-021-99288-8/tables/1 for a comparison of some popular workflow definition languages

This table is also useful: https://www.ncbi.nlm.nih.gov/pmc/articles/PMC5429012/table/bbw020-T1

15 of 44

Get your hands dirty

Exercises

16 of 44

Prerequisites

Java

17 of 44

Software you will need to install

CWL

Activate your virtual environment [optional]

Run the following commands:

pip install cwltool
pip install cwlref-runner

Details

https://github.com/common-workflow-language/cwltool#install

Nextflow

Run the following commands

If Java is not installed:

curl -s "https://get.sdkman.io" | bash

source ~/.sdkman/bin/sdkman-init.sh

sdk install java 17.0.6-amzn

When Java is installed:

curl -s https://get.nextflow.io | bash

Copy the nextflow executable to /usr/local/bin

Details: https://www.nextflow.io/docs/latest/getstarted.html

18 of 44

CWL Examples

  • Hello World

https://www.commonwl.org/user_guide/introduction/quick-start.html

  • User Guide

https://www.commonwl.org/user_guide/

19 of 44

Hello World with CWL

cwlVersion: v1.2
class: CommandLineTool
baseCommand: echo

inputs:
  message:
    type: string
    default: "Hello World"
    inputBinding:
      position: 1
outputs: []

cwl-runner helloworld.cwl

cwl-runner helloworld.cwl --message "Bonjour le monde"

cwl-runner helloworld.cwl helloworld-job.json

cwl-runner helloworld.cwl helloworld-job.yml

helloworld-job.json:

{
  "message": "こんにちは世界"
}

helloworld-job.yml:

message: "こんにちは世界"

20 of 44

Nextflow Examples

Command:

nextflow run https://github.com/nextflow-io/hello    

  • Basic Example

https://www.nextflow.io/docs/latest/example.html

  • More Examples

https://github.com/nextflow-io/patterns#nextflow-patterns

21 of 44

Hello World with Nextflow (using GitHub)

The simplest command:

nextflow run https://github.com/nextflow-io/hello  

22 of 44

Your own Hello World with Nextflow

helloworld.nf:

#!/usr/bin/env nextflow

params.message = 'Bonjour,Ciao,Hello,Hola'

process sayHello {
    input:
        val x
    output:
        stdout
    script:
        """
        echo '$x world!'
        """
}

workflow {
    Channel.fromList(params.message.split(',') as List) | sayHello | view
}

nextflow run helloworld.nf

nextflow run helloworld.nf --message Olá,Ciao


23 of 44

Let us get more advanced

24 of 44

Problem statement

We have a tab-separated values file with several columns. We would like to select one numerical column (the main variable) and explore how the other numerical columns correlate with it.

Disclaimer: I know that all of you can easily write a Python, R, Java, or C program that will do this. The purpose of this course, though, is not to present a use case for a workflow definition language but to show how to use one.

A typical real-world use case would include pulling data from multiple sources, applying different processing steps to different datasets, sometimes in a particular order, and combining and recombining datasets.

25 of 44

Fetching the solution to your local system

  • This task would take more time to implement than we have
    • Hence, we will explore a solution that I have prepared and will try to modify it
  • First clone the project (https://github.com/mmcentre/pipelines-tutorial):

git clone https://github.com/mmcentre/pipelines-tutorial.git

(Or clone it in your favorite IDE)

26 of 44

Steps we will perform

  1. Cleanse the data
    • The original file contains some unparseable values, e.g., “(null)” for numeric data
    • We will use the command-line utility grep to remove rows with unparseable values
  2. For every column that we would like to correlate with the main variable, we will calculate the Pearson correlation coefficient (see the formula after this list) using the Python pandas package
    • This can be done in parallel
  3. Combine the results of step 2 into a single file and sort it
  4. Plot a bar chart showing the correlations, using gnuplot
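
For reference, the Pearson correlation coefficient computed in step 2 (the default method of pandas' Series.corr) is

r = \frac{\sum_{i=1}^{n} (x_i - \bar{x})(y_i - \bar{y})}{\sqrt{\sum_{i=1}^{n} (x_i - \bar{x})^2}\,\sqrt{\sum_{i=1}^{n} (y_i - \bar{y})^2}}

where x is the main variable and y is the column being correlated with it.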

We are not sure that all the required software is installed on your local system; therefore, we will use a Docker container to perform the operations.

27 of 44

CWL Pipeline

class: Workflow
requirements:
  …

hints:
  DockerRequirement:
    dockerPull: forome/slimpipe

inputs:
  …

steps:
  …

outputs:
  …

CWL uses YAML syntax; a CWL workflow is basically a YAML file.

Dependencies between steps are defined by passing outputs of one step to another.

If a step needs to consume the output of another step, it has to wait for that other step to complete.

In this exercise, we have an almost linear pipeline, but that is not always the case.

28 of 44

CWL Steps: Cleanse

class: CommandLineTool
baseCommand: [gunzip, '-c']
inputs:
  archive:
    type: File
    inputBinding:
      position: 1
outputs:
  unpacked:
    type: stdout
stdout: $(inputs.archive.nameroot)

class: CommandLineTool
baseCommand: [grep, '-v', '(null)']
inputs:
  raw_data:
    type: File
    inputBinding:
      position: 1
outputs:
  clean_data:
    type: stdout
stdout: $('clean-' + inputs.raw_data.basename)

unpack:
  run:
    …
  in:
    archive: data
  out:
    - unpacked

clean:
  run:
    …
  in:
    raw_data: unpack/unpacked
  out:
    - clean_data

29 of 44

CWL Steps: Correlate

We use a very simple Python program:

import sys
import pandas

df = pandas.read_csv(sys.argv[1], sep='\t')
s1 = sys.argv[2]
s2 = sys.argv[3]
print(f"{s1.replace('_','-')} \t{s2.replace('_','-')}\t {df[s1].corr(df[s2])}")

To run it with CWL, we need to create a tool:

correlate.cwl: https://github.com/mmcentre/pipelines-tutorial/blob/main/src/cwl/correlate.cwl
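
The essence of such a tool is to invoke the Python interpreter, passing the script and its arguments as inputs. A sketch of what it can look like (the actual correlate.cwl in the repository may differ in details):

cwlVersion: v1.2
class: CommandLineTool
baseCommand: python3
inputs:
  program:        # the Python script itself is just another File input
    type: File
    inputBinding:
      position: 1
  data:           # the tab-separated data file
    type: File
    inputBinding:
      position: 2
  column1:        # the main variable
    type: string
    inputBinding:
      position: 3
  column2:        # the column to correlate with it
    type: string
    inputBinding:
      position: 4
outputs:
  correlation_coefficient:
    type: stdout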

30 of 44

CWL: Parallelizing calculations

  • Use the scatter feature of CWL to execute a step for every input:

run: correlate.cwl
in:
  program: program
  data: clean/clean_data
  column1: variable
  column2: columns
scatter: column2
out:
  - correlation_coefficient

Performed after the clean step
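
One caveat: CWL only allows scatter if the workflow declares the corresponding feature among its requirements (elided as "…" on the pipeline slide), for example:

requirements:
  ScatterFeatureRequirement: {}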

31 of 44

CWL Steps: Combine and sort outputs

class: CommandLineTool
baseCommand: [cat]
inputs:
  files:
    type: File[]
    inputBinding:
      position: 1
outputs:
  combined:
    type: stdout
stdout: correlations.txt

class: CommandLineTool
baseCommand: [sort, '-gk', '3,3']
inputs:
  data:
    type: File
    inputBinding:
      position: 1
outputs:
  sorted:
    type: stdout
stdout: correlations.txt
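
Here sort -gk 3,3 orders the combined table by its third field, the correlation coefficient, using general numeric comparison (-g), which also handles values written in scientific notation.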

32 of 44

CWL Steps: Plot

We use gnuplot, and we need to create a simple script for it.

Therefore, as we did for the Python script, we create a CWL tool that passes a script to the gnuplot utility.

plot.cwl: https://github.com/mmcentre/pipelines-tutorial/blob/main/src/cwl/plot.cwl

plot:
  run: plot.cwl
  in:
    script: plot
    data: sort/sorted
  out:
    - plot

Performed after the sort step

33 of 44

CWL Inputs

inputs:
  program:
    type: File
    doc: Path to the file with Python code
  plot:
    type: File
    doc: Path to the file containing the gnuplot script
  data:
    type: File
    doc: Path to the tab-separated data file
  variable:
    type: string
    doc: Name of the first column
    default: 'poverty'
  columns:
    type: string[]
    doc: Names of the second columns

default:
  - no_grad
  - density
  - median_age
  - median_household_income
  - population_density
  - smoke_rate
  - mean_bmi
  - tmmx
  - pm25
  - latitude
  - longitude

  • We are passing the Python and gnuplot scripts as arguments
  • This is a bit ugly, but it simplifies the deployment
  • An alternative would be to deploy the Python module as a package in the Docker container we are using for execution

34 of 44

CWL Outputs

outputs:
#  unpacked:
#    type: File
#    outputSource: unpack/unpacked
#  clean_data:
#    type: File
#    outputSource: clean/clean_data
#  correlations:
#    type: File[]
#    outputSource: correlate/correlation_coefficient
  table:
    type: File
    outputSource: sort/sorted
  plot:
    type: File
    outputSource: plot/plot

The outputs that are commented out are intermediate files, useful only for debugging purposes.

35 of 44

CWL: Run the pipeline

Create a working directory and run:

cwl-runner --parallel ../pipelines-tutorial/src/cwl/correlate_all.cwl \
    --program ../pipelines-tutorial/src/python/pipelines-tutorial/correlate.py \
    --data https://htp-data.s3.us.cloud-object-storage.appdomain.cloud/data.csv.gz \
    --plot ../pipelines-tutorial/src/gnuplot/plot.gpl

36 of 44

Nextflow Pipeline

workflow {
    main:

    variable = Channel.from(params.variable)
    columns = Channel.fromList(params.columns)
    rawDataFile = Channel.fromPath(params.data, checkIfExists: true)
    cleanDataFile = clean(rawDataFile)

    cleanDataFile.combine(variable).combine(columns)
        | correlate
        | collectFile
        | sort
        | plot
        | view
}

Dependencies between steps are defined by special operators, including the "pipe" operator |, and by passing outputs of one step to another.

https://www.nextflow.io/docs/latest/workflow.html#special-operators

37 of 44

More about Nextflow syntax

Groovy is a Java-family language, hence the curly brackets {} for blocks and // for comments.

The main block is workflow {}; it refers to blocks called process. A process can be invoked using procedure-call syntax, so the whole workflow looks more like an ordinary program. See https://www.nextflow.io/docs/latest/process.html

If two processes do not depend on each other (i.e., one does not use the output of the other and no operators connect them), they can be executed in any order or in parallel.

Conditional if-then-else operators are allowed (we will not use them).

Scripts can be written inside the process definition as multiline strings. Normal shell operators, such as pipes (|) and redirection (>), can be used.

38 of 44

Nextflow: Multiline scripts

process correlate {
    input:
        tuple (path(data), val(var), val(col))

    output:
        stdout

    script:
        """
        #!/usr/local/bin/python
        import sys
        import pandas

        df = pandas.read_csv('$data', sep='\t')
        s1 = '$var'
        s2 = '$col'
        print(f"{s1.replace('_','-')} \t{s2.replace('_','-')}\t {df[s1].corr(df[s2])}")
        """
}

39 of 44

Nextflow: shell operators

process clean {
    input:
        path rawDataFile

    output:
        path 'clean_data.csv'

    script:
        """
        #!/bin/sh
        gunzip - < $rawDataFile | grep -v '(null)' > clean_data.csv
        """
}

40 of 44

Nextflow: inputs, outputs and channels

  • Inputs are passed as channels
  • Processes pass channels from one to another
  • Channels can be combined, concatenated, and used in a Cartesian product
    • A rich library of operators is available for manipulating channels
    • https://www.nextflow.io/docs/latest/operator.html
  • The channel factory fromPath() is used to pass a file
  • Workflow outputs are defined in each process rather than in the workflow
    • This is different from CWL!
    • Use the publishDir directive to define which outputs will be published (see the sketch below)
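
A small sketch of both ideas, with hypothetical process and file names (combine produces the Cartesian product of two channels; publishDir copies a process's declared outputs to a directory):

process report {
    publishDir 'results', mode: 'copy'  // publish report.txt to ./results
    input:
        val pair
    output:
        path 'report.txt'
    script:
        """
        echo '$pair' > report.txt
        """
}

workflow {
    letters = Channel.of('a', 'b')
    numbers = Channel.of(1, 2)
    pairs = letters.combine(numbers)  // [a, 1], [a, 2], [b, 1], [b, 2]
    report(pairs) | view              // each task publishes its own report.txt
}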

41 of 44

Nextflow: Conciseness

  • Because we can use multiline scripts, we do not need to create separate tools for Python and Gnuplot programs
  • We do not need to pass script files as parameters
  • Because we can use shell redirection, we can combine several steps into one
  • Our whole Nextflow pipeline fits into a single file
    • But, of course, you can have separate modules containing processes and subworkflows
    • Which is normally the case for more complex pipelines

42 of 44

Nextflow: Run the pipeline

Nextflow pipelines can be run directly from a VCS repository. We have already used this feature:

nextflow run https://github.com/nextflow-io/hello  

To run our pipeline, create a working directory (Nextflow will produce a lot of debugging output!)

If the main workflow file is called main.nf, a path to the directory containing it is sufficient. Alternatively, you can point the nextflow executable to any specific, arbitrarily named file.

Nextflow will also look for an optional file, nextflow.config, in the same directory as the main script.

nextflow run ../pipelines-tutorial/src/nextflow --data https://htp-data.s3.us.cloud-object-storage.appdomain.cloud/data.csv.gz

43 of 44

You have run two pipelines. What is next?

44 of 44

If you need to orchestrate your tools

  • Orchestration is a related topic
  • If you are serious about creating complex workflows, besides defining a topology you will need orchestration tools
  • Check out some of them: