1 of 56

Evolving Programming Models

for Massively Scalable Scientific Applications

Douglas Thain and the CCL Team

University of Notre Dame

Stevens Institute of Technology

12 November 2025

2 of 56

Evolving Programming Models

  • Large Scale Scientific Workflow Applications
  • The TaskVine Workflow System
  • Evolution of Programming Models
    • From Tasks to Functions
    • Dataflow with Dask + TaskVine
    • Dataflow with Parsl + TaskVine
    • Dynamic Data Reduction
    • Static Graph Compilation
  • Lessons Learned and What's Next?

3 of 56

The Cooperative Computing Lab

We collaborate with people who have large scale computing problems in science, engineering, and other fields.

We operate computer systems at the scale of O(10,000) cores: clusters, clouds, grids.

We conduct computer science research in the context of real people and problems.

We develop open source software for large scale distributed computing.

http://ccl.cse.nd.edu

4 of 56

Computing is the Third Mode of Science

1. Theory

2. Experiment

3. Computation

5 of 56

Example: CMS Experiment at the LHC

  • The Compact Muon Solenoid (CMS) is one of four experiments at the Large Hadron Collider (LHC). The LHC collides proton beams to generate new subatomic particles, in order to probe new physics interactions and discover new particles (e.g. the Higgs boson).
  • CMS observes 30M collisions ("events") per second, resulting in several PB of useful data each year. Physicists must write analysis code to sort through these events, select "interesting" categories, compute derived properties, and summarize new physical interactions.
  • Rapid iterative development in Python using well known numeric toolkits (numpy, pandas, etc).
  • Potential to execute with extreme parallelism on clusters with thousands of nodes!
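The selection step described above can be sketched with plain numpy on synthetic data. The field names, distributions, and cut value below are illustrative only, not CMS's actual schema:

```python
import numpy as np

# Synthetic stand-in for event records: one muon per event, with
# transverse momentum (pt, in GeV) and charge. Field names are illustrative.
rng = np.random.default_rng(42)
events = {
    "muon_pt": rng.exponential(scale=30.0, size=100_000),
    "muon_charge": rng.choice([-1, 1], size=100_000),
}

def select_interesting(events, pt_cut=25.0):
    # Keep only events whose muon passes the pt threshold (a "cut").
    mask = events["muon_pt"] > pt_cut
    return {name: col[mask] for name, col in events.items()}

selected = select_interesting(events)
fraction = len(selected["muon_pt"]) / len(events["muon_pt"])
```

Each such cut is independent per event, which is what makes the workload embarrassingly parallel across chunks of data.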


6 of 56

Typical CMS Data Analysis Apps

Example Application: DV3 (Small)

  • Consumes 1.5TB Data
  • Produces 17K Tasks
  • Uses 2400 cores on 200 nodes.
  • Runs in 3545s (~1 hour)

This is not bad, but can we make it near-interactive?

(Kelci Mohrman, Kevin Lannon, Connor Moore)

7 of 56

More Scientific Workflows

  • Nanoreactors: ab-initio Chemistry
  • ForceBalance: FF Optimization
  • Adaptive Weighted Ensemble: Molecular Dynamics
  • Lobster: CMS Data Analysis
  • SHADHO: Hyperparameter Optimization
  • Lifemapper: Species Distribution Modeling

Lifemapper full workflow: 12,500 species x 15 climate scenarios x 6 experiments x 500 MB per projection = 1.1M jobs, 72TB of output. (Small example: 10 species x 10 experiments.)

8 of 56

Scientific computing usually starts here:

9 of 56

How do you go from laptop to 10,000 cores?

???

The scientific user doesn't want to start over from scratch writing everything in a new language!

10 of 56

The Scientific Workflow Approach

First, encapsulate what you already have in a disciplined way: the environment, input files, application invocation, output files, and resource needs (CPU, RAM, disk).

11 of 56

The Scientific Workflow Approach

Then, connect a large number of encapsulated tasks into a graph structure expressing the entire objective.

12 of 56

The Scientific Workflow Approach

  • Workflow Manager: Express overall workflow structure, components, constraints, and goals.
  • Task / Data Scheduler: Assign ready tasks and data objects to resources in the cluster, subject to runtime constraints.
  • Computing Facility: Execute tasks on computational resources; store and move data between nodes.

13 of 56

Workflows are really interesting!

Workflows are massively parallel programs that can be scaled up and down at runtime based on available resources.

  • How do we express workflows in productive ways?
  • How do we execute workflows efficiently?
  • How do we predict performance and resource needs?

What are the tradeoffs between expressiveness, performance, and predictability?

14 of 56

Evolving Programming Models

  • Large Scale Scientific Workflow Applications
  • The TaskVine Workflow System
  • Evolution of Programming Models
    • From Tasks to Functions
    • From Functions to Data Flow Languages
    • Dynamic Data Reduction
    • Static Graph Compilation
  • Lessons Learned and What's Next?

15 of 56

TaskVine is a system for executing data intensive scientific workflows on clusters, clouds, and grids from very small to massive scale.

TaskVine controls the computation and storage capability of a large number of workers, striving to carefully manage, transfer, and re-use data and software wherever possible.

16 of 56

TaskVine Architecture Overview

[Architecture diagram: the Workflow submits tasks to the TaskVine Manager and receives results. The manager directs TaskVine Workers running in an HTCondor pool; workers hold files and exchange data with each other, with remote services, and with a shared filesystem, alongside other applications on the cluster.]

The TaskVine manager directs workers to read data from remote sources, run tasks on that data, and share data with each other.

TaskVine leaves data on workers in the cluster wherever possible!

17 of 56

TaskVine Worker

[Diagram: a TaskVine Worker holds a cache of files (urls, temp files, buffers, and regular files, each under a unique cache name such as xyz123) and runs tasks in sandboxes: Task 1 reads data.tar.gz and writes output.txt; Task 2 reads config and input.txt and writes output.txt. The worker exchanges tasks and results with the Workflow Manager and allocates its CPUs, GPUs, RAM, and disk among sandboxes.]

  • A File may be a single file or a complex directory.
  • The manager directs all file movements and accesses.
  • Files are immutable and given a unique cache name.
  • Each task runs in a sandbox with a private namespace and an allocation of cores, memory, disk, and GPUs.

18 of 56

Low Level API: Declare Files

import ndcctools.taskvine as vine

m = vine.Manager(9123)

file = m.declareFile("mydata.txt")
buffer = m.declareBuffer("Some literal data")
url = m.declareURL("https://somewhere.edu/data.tar.gz")
temp = m.declareTemp()
data = m.declareUntar(url)
package = m.declareStarch(executable)

19 of 56

Low Level API: Connect Tasks to Files

t = vine.Task("mysim.exe -p 50 input.data -o output.data")

t.add_input(url, "input.data")
t.add_output(temp, "output.data")

t.set_cores(4)
t.set_memory(2048)
t.set_disk(100)
t.set_tag("simulator")

taskid = m.submit(t)

20 of 56

Low Level API: Function Shipping

t = vine.PythonTask(simulate_func, molecule, parameters)

t.add_input(url, "input.data")
t.add_output(temp, "output.data")

t.set_cores(4)
t.set_memory(2048)
t.set_disk(100)
t.set_tag("simulator")

taskid = m.submit(t)

21 of 56

Low Level API: Build up a DAG

x = m.define_file()

y = m.define_file()

z = m.define_file()

. . .

a = Task()

b = Task()

c = Task()

. . .

a.add_input(x,"data")

a.add_output(y,"temp")

. . .

22 of 56

Barry Sly-Delgado, Thanh Son Phung, Colin Thomas, David Simonetti, Andrew Hennessee, Ben Tovar, Douglas Thain,

TaskVine: Managing In-Cluster Storage for High-Throughput Data Intensive Workflows,

18th Workshop on Workflows in Support of Large-Scale Science, November, 2023. DOI: 10.1145/3624062.3624277

23 of 56

Evolving Programming Models

  • Large Scale Scientific Workflow Applications
  • The TaskVine Workflow System
  • Evolution of Programming Models
    • From Tasks to Functions
    • From Functions to Data Flow Languages
    • Dynamic Data Reduction
    • Static Graph Compilation
  • Lessons Learned and What's Next?

24 of 56

More Tasks Requires Smaller Tasks

and that Requires Lower Overhead!

[Plots of nodes running vs. elapsed time, ideal vs. actual. Long Running (N nodes for 60 minutes): actual closely tracks ideal. High Concurrency (N*10 nodes for 6 minutes): per-task overhead keeps actual well below ideal.]

Use function tasks to reduce overhead!

25 of 56

From Tasks to Libraries and Functions

A Task runs to completion a single time, reading input files and producing output files.

A LibraryTask contains a Function. It receives arguments and produces results, but then stays running, waiting for the next invocation.

26 of 56

Functions as a Service - Install Library

# Define ordinary Python functions
def my_sum(x, y):
    return x + y

def my_mul(x, y):
    return x * y

# Create a library object from functions
L = m.create_library_from_functions("my_library", my_sum, my_mul)

# Install the library on all workers.
m.install_library(L)

27 of 56

Functions as a Service - Invoke Function

# Define a function invocation and submit it
for i in range(1, 100):
    t = vine.FunctionCall("my_library", "my_sum", 10, i)
    m.submit(t)

Simply converting "import tensorflow" into the preamble of a LibraryTask saves 1.2GB of Python libraries, 30K metadata system calls, and 5-10s latency per FunctionCall.

David Simonetti, Poster: Mixed Modality Workflows in TaskVine,

ACM High Performance Distributed Computing, pages 331-332, August, 2023.

28 of 56

Evolving Programming Models

  • Large Scale Scientific Workflow Applications
  • The TaskVine Workflow System
  • Evolution of Programming Models
    • From Tasks to Functions
    • From Functions to Data Flow Languages
    • Dynamic Data Reduction
    • Static Graph Compilation
  • Lessons Learned and What's Next?

29 of 56

Data Flow Languages on TaskVine

[Diagram: Parsl, Coffea (C or Python), Dask, and custom Python applications sit atop the TaskVine Manager, which dispatches work to TaskVine Workers on an HPC cluster through calls such as declaring files, m.submit(task), and m.wait(5).]

We can use existing high level languages to generate tasks and files, connect them together, and pass the graph along to TaskVine for execution.

30 of 56

Generalized Dataflow Programming

a = dataset("/path/…").split(n)
b = a.map(f).reduce(g)
c = a.reduce(g)
d = h(a,b)
# nothing happens until:
print(d)

[Diagram: the dataset splits into partitions a0..a3; f maps them to t0..t3, which g reduces to b; g also reduces the partitions directly to c; h combines a and b into d. Each node in the graph becomes a task with explicit inputs and outputs:]

g = Task()
g.add_input(t0)
g.add_input(t1)
g.add_output(r)
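The deferred-evaluation behavior ("nothing happens until print(d)") can be sketched in a few lines of plain Python. `Node` and `constant` below are illustrative, not the Dask or TaskVine API:

```python
# A minimal lazy-evaluation sketch: operations build a graph of Node
# objects, and nothing runs until .compute() is called.
class Node:
    def __init__(self, fn, *deps):
        self.fn, self.deps = fn, deps

    def compute(self):
        # Recursively evaluate dependencies, then apply this node's function.
        return self.fn(*(d.compute() for d in self.deps))

def constant(v):
    return Node(lambda: v)

a = constant(10)
b = Node(lambda x: x + 1, a)        # map-like step
c = Node(lambda x, y: x * y, a, b)  # join of two branches
result = c.compute()                # evaluation happens only here
```

Because the whole graph exists before anything runs, a system like TaskVine can inspect it, schedule independent branches in parallel, and decide where each intermediate value should live.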

31 of 56

Dataflow Programming with Dask + TaskVine

32 of 56

Dataflow Programming with Dask + TaskVine

Barry Sly-Delgado, Ben Tovar, Jin Zhou, and Douglas Thain,

Reshaping High Energy Physics Applications for Near-Interactive Execution Using TaskVine,

ACM/IEEE Supercomputing, pages 1-11, November, 2024. DOI: 10.1109/SC41406.2024.00068

A program-generated DAG of 185,000 tasks running on 7000 cores completed in 41 minutes.

33 of 56

Example: DV3 HEP Application

Wide reduction trees are sensitive to failure (keep retrying!) and require maintaining a large amount of state on disk until the final task runs.

34 of 56

Restructuring Task Graphs Via Annotations

We could ask the user to rewrite the application, but this requires substantial low level knowledge.

Instead: ask them to label functions with algebraic properties: commutative, associative, etc.

Let the workflow system rearrange the graph within provided constraints.

def write(events):
    import awkward as ak
    d = ak.to_parquet(events, "out_parquet")
    return d

@associative
def merge(*events):
    import awkward as ak
    return ak.concatenate(events)

graph = {}
events = get_events()
d = dask.delayed(merge)(events)
d = dask.delayed(write)(d)
graph[f"{dataset_name}_write"] = d
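What the @associative annotation licenses can be seen in a pure-Python sketch: if merge is associative, a flat N-way reduction can be rearranged into a balanced tree of partial merges. The names below are illustrative (`merge` stands in for ak.concatenate):

```python
from functools import reduce

def merge(a, b):
    # Stands in for ak.concatenate; only associativity matters here.
    return a + b

def tree_reduce(items, fn, fanout=2):
    # Merge groups of `fanout` items repeatedly until one result remains.
    depth = 0
    while len(items) > 1:
        items = [reduce(fn, items[i:i + fanout])
                 for i in range(0, len(items), fanout)]
        depth += 1
    return items[0], depth

total, depth = tree_reduce(list(range(16)), merge)
```

The tree shape bounds the state any one task must hold and lets partial merges run as soon as their inputs are ready, instead of waiting for all N inputs at a single final task.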

35 of 56

Same Application: Hierarchical Reduction

[Plots: the unmodified program vs. the @associative version, which avoids the long tail of the flat reduction.] (Barry Sly-Delgado)
36 of 56

[Diagram: the Parsl Application feeds the Parsl DAG Manager, which expands the DAG, holds blocked tasks, and releases ready tasks to the TaskVine Executor; the executor dispatches them to TaskVine Workers. Both layers share access to a Parallel Filesystem.]

Parsl knows about the entire program and releases tasks that are "ready" to TaskVine for execution. TaskVine handles scheduling and data locality.

37 of 56

Problem: Data Locality

[Diagram: a task graph of Tasks connected by Intermediate Files, with Persistent Files at the boundaries.]

38 of 56

Solution: Task Groups

[Diagram: the same task graph with related tasks clustered into a Task Group, so their intermediate files stay within the group.]
39 of 56

Task Grouping with Parsl and TaskVine

  • By declaring an intermediate data item with 'taskvinetemp://', the application signals that the file is transient; the Parsl DAG manager can then send the tasks that produce and consume it to the scheduler together.

[Diagram: the Parsl DFK holds the Parsl DAG; the TaskVine ready-task queue contains independent tasks and group tasks joined by intermediate (taskvinetemp://) files, with persistent files at the edges.]

40 of 56

Effect of Task Grouping on Montage

Colin Thomas and Douglas Thain,

Liberating the Data Aware Scheduler to Achieve Locality in Layered Scientific Workflow Systems,

IEEE Conference on eScience, September, 2025.

41 of 56

Managing Function State for LLMs

Traditionally, model loading is coupled with the actual inference, so the LLM creation cost is paid repeatedly over many invocations of the inference function.

LLM creation is now separated into a load function, which is contextualized in a Library to serve multiple inference invocations, effectively amortizing its creation cost.
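One way to express the separation is a factory: a load function that runs once and returns an inference closure. A pure-Python sketch, where the dictionary of weights is an illustrative stand-in for an actual LLM:

```python
load_count = 0

def load():
    # Runs once per worker; stands in for expensive LLM construction.
    global load_count
    load_count += 1
    weights = {"bias": 1}  # illustrative model state
    def infer(prompt):
        # Every invocation reuses the captured weights.
        return f"{prompt}!{weights['bias']}"
    return infer

infer = load()  # creation cost paid once
outputs = [infer(p) for p in ["a", "b", "c"]]
```

The closure keeps the model alive between calls, which is exactly the state a LibraryTask retains between FunctionCalls.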

42 of 56

Exploiting Context with Parsl and TaskVine


Large Scale Neural Network Inference

100K tasks on 150 x 32c workers

T. Phung, C. Thomas, L. Ward, K. Chard, D. Thain, Accelerating Function-Centric Applications by Discovering, Distributing, and Retaining Reusable Context in Workflow Systems, ACM International Symposium on High-Performance Parallel and Distributed Computing (HPDC), June, 2024.

  • L1 - Traditional Access to HPC Filesystem.
  • L2 - Tasks with data cached on workers.
  • L3 - Functions retaining state at workers.

43 of 56

Evolving Programming Models

  • Large Scale Scientific Workflow Applications
  • The TaskVine Workflow System
  • Evolution of Programming Models
    • From Tasks to Functions
    • From Functions to Data Flow Languages
    • Dynamic Data Reduction
    • Static Graph Compilation
  • Lessons Learned and What's Next?

44 of 56

Challenge of Scaling Up Graphs

(this is the small version)

As we scale the DV5 application up to O(1M) nodes, the graph construction performed by Dask begins to dominate the execution time! (Note the common structure within each branch.)

45 of 56

[Diagram contrasting two architectures. Conventional: the Coffea-Dask application generates the complete graph for the application; the DAG scheduler releases fine-grained tasks to the task scheduler, which schedules them onto worker nodes. DDR: the application generates an abstract representation; the DDR scheduler generates variable-sized subgraph specs and dispatches them as tasks to worker nodes, each running Dask locally; datasets progress from pending to running to done.]

46 of 56

Dynamic Data Reduction API

def preprocess(dataset_info, **kwargs):
    ...  # break the dataset into chunks

def postprocess(val, **kwargs):
    ...  # compute the properties of each chunk

def processor(x):
    ...  # a data parallel Dask program

def reducer(a, b):
    ...  # commutative / associative function

ddr = DynamicDataReduction(
    mgr, data,
    preprocess, postprocess,
    processor, reducer)

  • Cortado data analysis application: 419 datasets, 19631 files, 14TB, 12 billion events.
  • Generating the complete Dask graph the simple way takes 20 hours even before execution starts!
  • The DDR method with Cortado completes the entire execution on 250 8-core nodes (1600 cores) in 5.5 hours.

(Kelci Mohrman, Ben Tovar)
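The overall shape of the pipeline can be sketched in plain Python. The function names follow the API above, but this sequential driver is illustrative only; in the real system the processor calls run in parallel on workers and results are folded as they arrive:

```python
from functools import reduce

# Illustrative stand-ins for the DDR callbacks described above.
def preprocess(dataset, chunk_size=4):
    # Break the dataset into chunks.
    return [dataset[i:i + chunk_size] for i in range(0, len(dataset), chunk_size)]

def processor(chunk):
    # Stands in for a data-parallel Dask subgraph applied to one chunk.
    return sum(chunk)

def reducer(a, b):
    # Commutative / associative fold of partial results.
    return a + b

def dynamic_data_reduction(datasets):
    # Chunks are generated incrementally, so the full graph never
    # needs to exist at once; here we simply iterate.
    partials = [processor(c) for d in datasets for c in preprocess(d)]
    return reduce(reducer, partials)

result = dynamic_data_reduction([list(range(10)), list(range(5))])
```

Because chunking happens lazily per dataset, the 20-hour up-front graph construction is replaced by many small subgraph generations overlapped with execution.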

47 of 56

Evolving Programming Models

  • Large Scale Scientific Workflow Applications
  • The TaskVine Workflow System
  • Evolution of Programming Models
    • From Tasks to Functions
    • From Functions to Data Flow Languages
    • Dynamic Data Reduction
    • Static Graph Compilation
  • Lessons Learned and What's Next?

48 of 56

Observation: Serialization Overheads

  • A task goes through the lifecycle: serialization, scheduling, execution, retrieval.
  • Amdahl's law: throughput is limited by the serialization phase. If each serialization takes 20ms, the maximum achievable throughput is only 50 tasks/second, which limits our scalability.
  • Observation: the same functions get used many times over. Can we ensure that each one gets serialized only once?

49 of 56

Static Graph Compilation

def func1(a, b):
    return (a + b) * 2

Task1 = func1(10, 20)
Task2 = func1(Task1, 30)

[Diagram: the submitted task graph is compiled by the TaskVine Manager into a Strategic Orchestration Graph (SOG), which tracks data dependencies and computation status, and a Runtime Execution Graph (REG), which stores all task definitions, executables, and arguments. The REG is installed on every worker; a proxy function forks to handle messages like "run task 1" and "run task 2", mapping results 1->a, 2->b, 3->c.]

In this approach, the entire graph is compiled upfront and distributed to all workers. Task execution then only involves sending the task key to the worker, eliminating the need for Serialization.
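Dispatch-by-key can be sketched in pure Python: a registry standing in for the REG is built once, and running a task requires only its key. The dictionary structure below is illustrative, not TaskVine's internal format:

```python
def func1(a, b):
    return (a + b) * 2

# Installed once on every worker: all task definitions and arguments.
# (Illustrative structure, standing in for the REG.)
REG = {
    "task1": {"fn": func1, "args": (10, 20), "deps": ()},
    "task2": {"fn": func1, "args": (30,), "deps": ("task1",)},
}

results = {}

def run(key):
    # The manager ships only `key`; code and arguments are already local.
    spec = REG[key]
    dep_vals = tuple(results[d] for d in spec["deps"])
    results[key] = spec["fn"](*dep_vals, *spec["args"])
    return results[key]

run("task1")
out = run("task2")
```

Since a key is a few bytes, per-task dispatch cost no longer scales with the size of the function or its arguments.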

50 of 56

Initial Results from Graph Compilation

Static compilation dramatically improves task latency and increases throughput by 100X on small tasks!

Jin Zhou

51 of 56

Evolving Programming Models

  • Large Scale Scientific Workflow Applications
  • The TaskVine Workflow System
  • Evolution of Programming Models
    • From Tasks to Functions
    • From Functions to Data Flow Languages
    • Dynamic Data Reduction
    • Static Graph Compilation
  • Lessons Learned and What's Next?

52 of 56

The Design of Abstractions

The fundamental problem of systems research lies in the design of abstractions for the end user (or the layer above).

  • A flexible abstraction solves a large class of problems, but is not optimized for solving any one problem.
  • A rigid abstraction solves one problem very well, but is not capable of adapting to other types of problems.

Find ways to hoist common definitions of code/data/structure so that they are processed once and then distributed.

It is usually better to compile than to interpret structures!

53 of 56

Dealing with "Real World" Applications

  • "Real" Applications with "Real" Users are "Real" Messy!
    • Crazy software dependencies that are slow to install.
    • Datasets that are too large to fit on your laptop.
    • Complex graph structures that take forever to set up.
  • Advice: Embrace the messy problems, because those are the system design issues that actually affect the conduct of science!
    • Create an efficient solution for distributed software installation.
    • Build a large scale data distribution system for data >> laptop.
    • Redesign the abstraction to avoid large graph structures.
  • Messy problems represent system design challenges that are important, interesting, and publishable research!

54 of 56

The Translational Research Cycle

Develop a

Research Concept

Write Code

Test & Evaluate

Publish Results

Release Software

Engage Users

55 of 56

Give TaskVine a Try!


  • TaskVine is a component of the Cooperative Computing Tools (cctools) from Notre Dame alongside Makeflow, Work Queue, Resource Monitor, etc.
  • Release 7.15.4 made in Nov 2025.
  • Research software with an engineering process: issues, tests, manual, examples.
  • We are eager to collaborate with new users on applications and challenges!

This work was supported by

NSF Award OAC-1931348

conda install -c conda-forge ndcctools

56 of 56

For more information…


CCL Staff and Students

conda install -c conda-forge ndcctools

This work was supported by

NSF Award OAC-1931348