Evolving Programming Models
for Massively Scalable Scientific Applications
Douglas Thain and the CCL Team
University of Notre Dame
Stevens Institute of Technology
12 November 2025
Evolving Programming Models
The Cooperative Computing Lab
We collaborate with people who have large scale computing problems in science, engineering, and other fields.
We operate computer systems at the scale of O(10,000) cores: clusters, clouds, and grids.
We conduct computer science research in the context of real people and problems.
We develop open source software for large scale distributed computing.
http://ccl.cse.nd.edu
Computing is the Third Mode of Science
1. Theory
2. Experiment
3. Computation
Example: CMS Experiment at the LHC
1. Theory
2. Experiment
Typical CMS Data Analysis Apps
Example Application: DV3 (Small)
Kelci Mohrman, Kevin Lannon, Connor Moore
This is not bad, but can we make it near-interactive?
More Scientific Workflows
Nanoreactors: ab-initio chemistry
ForceBalance: force field optimization
Adaptive Weighted Ensemble: molecular dynamics
Lobster: CMS data analysis
SHADHO: hyperparameter optimization
Lifemapper: Species Distribution Modeling
Small example: 10 species x 10 experiments
Full workflow: 12,500 species x 15 climate scenarios x 6 experiments x 500 MB per projection = 1.1M jobs, 72 TB of output
Scientific computing usually starts here:
How do you go from laptop to 10,000 cores?
???
The scientific user doesn't want to start over from scratch writing everything in a new language!
The Scientific Workflow Approach
[Diagram: an application encapsulated with its environment, input files, and output file.]
First, encapsulate what you already have in a disciplined way: environment, inputs, invocation, outputs, resources.
[Resources: CPU, RAM, Disk]
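The disciplined encapsulation described above can be sketched as a plain data record. The field names and helper below are illustrative only, not any particular workflow system's schema:

```python
# A hypothetical, self-describing task record capturing the five things
# the slide names: environment, inputs, invocation, outputs, resources.
task_spec = {
    "environment": "env.tar.gz",  # packaged software environment
    "inputs": ["genome.fasta", "params.json"],
    "invocation": "mysim.exe -p params.json genome.fasta",
    "outputs": ["results.dat"],
    "resources": {"cores": 4, "memory_mb": 2048, "disk_mb": 100},
}

def is_complete(spec):
    # A task is well-encapsulated only if every field is present.
    required = {"environment", "inputs", "invocation", "outputs", "resources"}
    return required <= set(spec.keys())

print(is_complete(task_spec))  # → True
```

Once a task is described this completely, a system can run it anywhere: the record says what software it needs, what it reads and writes, and what resources to reserve.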
The Scientific Workflow Approach
Then, connect a large number of encapsulated tasks into a graph structure expressing the entire objective.
The Scientific Workflow Approach
Workflow Manager: express the overall workflow structure, components, constraints, and goals.
Task / Data Scheduler: assign ready tasks and data objects to resources in the cluster, subject to runtime constraints.
Computing Facility: execute tasks on computational resources; store and move data between nodes.
Workflows are really interesting!
Workflows are massively parallel programs that can be scaled up and down at runtime based on available resources.
What are the tradeoffs between expressiveness, performance, and predictability?
Evolving Programming Models
TaskVine is a system for executing data intensive scientific workflows on clusters, clouds, and grids from very small to massive scale.
TaskVine controls the computation and storage capability of a large number of workers, striving to carefully manage, transfer, and re-use data and software wherever possible.
TaskVine Architecture Overview
[Diagram: the Workflow drives the TaskVine Manager, which sends tasks to and receives results from TaskVine Workers running in an HTCondor Pool alongside other applications; workers cache files (data and software) and exchange them with each other, with remote services, and with a shared filesystem.]
The TaskVine manager directs workers to read data from remote sources, run tasks on that data, and share data with each other.
TaskVine leaves data on workers in the cluster wherever possible!
[Diagram: inside a TaskVine Worker. The Workflow Manager sends tasks and receives results. The worker caches files of several kinds (url, temp, file, buffer), each under a unique cache name, and runs each task (e.g., T1 with data.tar.gz, T2 with config and input.txt) in its own sandbox with allocated CPUs, GPUs, and RAM.]
A File may be a single file or a complex directory.
The manager directs all file movements and accesses.
Files are immutable, and each is given a unique cache name.
Each task runs in a sandbox with a private namespace and an allocation of cores, memory, disk, and GPUs.
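Because files are immutable, a content-derived cache name lets workers safely share and reuse them without coordination. A minimal sketch of the idea (the hashing scheme here is illustrative, not TaskVine's actual naming):

```python
import hashlib

def cache_name(kind, content):
    # Derive a stable cache name from the file's kind
    # ("file", "url", "temp", "buffer") and its content or source.
    digest = hashlib.sha1(content.encode()).hexdigest()[:6]
    return f"{kind}-{digest}"

# The same content always maps to the same name, so a worker that
# already holds the file never needs a second transfer.
a = cache_name("buffer", "Some literal data")
b = cache_name("buffer", "Some literal data")
assert a == b
print(a)
```

Immutability is what makes this safe: since a cached object can never change, a matching name is proof that the bytes are identical.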
Low Level API: Declare Files
import ndcctools.taskvine as vine

m = vine.Manager(9123)

file = m.declare_file("mydata.txt")
buffer = m.declare_buffer("Some literal data")
url = m.declare_url("https://somewhere.edu/data.tar.gz")
temp = m.declare_temp()
data = m.declare_untar(url)
package = m.declare_starch(executable)
Low Level API: Connect Tasks to Files
t = vine.Task("mysim.exe -p 50 input.data -o output.data")
t.add_input(url, "input.data")
t.add_output(temp, "output.data")
t.set_cores(4)
t.set_memory(2048)
t.set_disk(100)
t.set_tag("simulator")
taskid = m.submit(t)
Low Level API: Function Shipping
t = vine.PythonTask(simulate_func, molecule, parameters)
t.add_input(url, "input.data")
t.add_output(temp, "output.data")
t.set_cores(4)
t.set_memory(2048)
t.set_disk(100)
t.set_tag("simulator")
taskid = m.submit(t)
Low Level API: Build up a DAG
x = m.declare_temp()
y = m.declare_temp()
z = m.declare_temp()
. . .
a = vine.Task()
b = vine.Task()
c = vine.Task()
. . .
a.add_input(x, "data")
a.add_output(y, "temp")
. . .
Barry Sly-Delgado, Thanh Son Phung, Colin Thomas, David Simonetti, Andrew Hennessee, Ben Tovar, Douglas Thain,
TaskVine: Managing In-Cluster Storage for High-Throughput Data Intensive Workflows,
18th Workshop on Workflows in Support of Large-Scale Science, November, 2023. DOI: 10.1145/3624062.3624277
Evolving Programming Models
More Tasks Require Smaller Tasks,
and that Requires Lower Overhead!
[Charts: nodes running vs. elapsed time, ideal vs. actual. Long running: N nodes for 60 minutes. High concurrency: N*10 nodes for 6 minutes.]
Use Function Tasks to Reduce Overhead!
From Tasks to Libraries and Functions
[Diagram: a Task consumes Input and produces Output; a LibraryTask hosts a function F that maps Args to a Result.]
A Task runs to completion a single time, reading input files and producing output files.
A LibraryTask contains a Function: it receives arguments and produces results, but then stays running, waiting for the next invocation.
Functions as a Service - Install Library
# Define ordinary Python functions
def my_sum(x, y):
    return x + y

def my_mul(x, y):
    return x * y

# Create a library object from the functions
L = m.create_library_from_functions("my_library", my_sum, my_mul)

# Install the library on all workers.
m.install_library(L)
[Diagram: the manager installs library L into a Python process on each worker.]
Functions as a Service - Invoke Function
# Define a function invocation and submit it
for i in range(1, 100):
    t = vine.FunctionCall("my_library", "my_sum", 10, i)
    m.submit(t)
[Diagram: the manager dispatches lightweight function invocations (λ) to the library processes already running on each worker.]
Simply converting "import tensorflow" into the preamble of a LibraryTask saves 1.2GB of Python libraries, 30K metadata system calls, and 5-10s latency per FunctionCall.
David Simonetti, Poster: Mixed Modality Workflows in TaskVine,
ACM High Performance Distributed Computing, pages 331-332, August, 2023.
Evolving Programming Models
Data Flow Languages on TaskVine
[Diagram: Parsl, Coffea, Dask, and custom applications (written in Python or C) sit above the TaskVine Manager, which dispatches work to TaskVine Workers on an HPC cluster.]
import ndcctools.taskvine as vine
file = m.declare_url(www)
m.submit(task)
task = m.wait(5)
We can use existing high level languages to generate tasks and files, connect them together, and pass the graph along to TaskVine for execution.
Generalized Dataflow Programming
a = dataset("/path/…").split(n)
b = a.map(f).reduce(g)
c = a.reduce(g)
d = h(a,b)
# nothing happens until: print(d)
[Diagram: the dataset splits into partitions a0..a3; f maps each to t0..t3; g reduces t0..t3 into b and a0..a3 into c; h combines them into d. Each graph node becomes a task, e.g.:]
g = Task()
g.add_input(t0)
g.add_input(t1)
…
g.add_output(r)
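The "nothing happens until print(d)" behavior comes from building the graph lazily: each expression only records a node, and evaluation is deferred until a result is demanded. A toy sketch of deferred evaluation in plain Python (an illustration of the idea, not the Dask implementation):

```python
class Deferred:
    """A node in a lazily-built task graph: records the function and
    its (possibly deferred) arguments, computing nothing until asked."""
    def __init__(self, fn, *args):
        self.fn, self.args = fn, args

    def compute(self):
        # Recursively evaluate dependencies, then apply the function.
        vals = [a.compute() if isinstance(a, Deferred) else a for a in self.args]
        return self.fn(*vals)

# Build the graph: no work happens during these lines.
a = Deferred(lambda: list(range(4)))
b = Deferred(lambda xs: [x * 2 for x in xs], a)  # map-like step
c = Deferred(lambda xs: sum(xs), b)              # reduce-like step

# Work happens only here, when a result is demanded.
print(c.compute())  # → 12
```

Because the whole graph exists before any work starts, a system like TaskVine can inspect it, schedule independent branches in parallel, and place tasks near their data.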
Dataflow Programming with Dask + TaskVine
Barry Sly-Delgado, Ben Tovar, Jin Zhou, and Douglas Thain,
Reshaping High Energy Physics Applications for Near-Interactive Execution Using TaskVine,
ACM/IEEE Supercomputing, pages 1-11, November, 2024. DOI: 10.1109/SC41406.2024.00068
A program-generated DAG of 185,000 tasks, running on 7,000 cores, completed in 41 minutes.
Example: DV3 HEP Application
Wide reduction trees are sensitive to failure (keep retrying!) and require maintaining a large amount of state on disk until the final task runs.
Restructuring Task Graphs Via Annotations
We could ask the user to rewrite the application, but this requires substantial low-level knowledge.
Instead: ask them to label functions with algebraic properties: commutative, associative, etc.
Let the workflow system rearrange the graph within the provided constraints.
def write(events):
    import awkward as ak
    d = ak.to_parquet(events, "out_parquet")
    return d

@associative
def merge(*events):
    import awkward as ak
    return ak.concatenate(events)

graph = {}
events = get_events()
d = dask.delayed(merge)(events)
d = dask.delayed(write)(d)
graph[f"{dataset_name}_write"] = d
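Given an @associative label, the system is free to rebalance a flat reduction into a tree. A minimal sketch of how such an annotation might be recorded and exploited (the decorator and rebalancer here are illustrative, not TaskVine's internals):

```python
def associative(fn):
    # Record the algebraic property on the function object itself.
    fn.is_associative = True
    return fn

@associative
def merge(a, b):
    return a + b

def tree_reduce(fn, items):
    # Without the annotation we must reduce strictly left-to-right.
    if not getattr(fn, "is_associative", False):
        out = items[0]
        for x in items[1:]:
            out = fn(out, x)
        return out
    # Associativity permits pairwise regrouping: O(log n) depth
    # instead of an O(n) chain, and far less state held on disk
    # until the final task runs.
    while len(items) > 1:
        items = [fn(*items[i:i+2]) if i + 1 < len(items) else items[i]
                 for i in range(0, len(items), 2)]
    return items[0]

print(tree_reduce(merge, [1, 2, 3, 4, 5]))  # → 15
```

The user states a property they already know about their function; the scheduler, not the user, decides the actual shape of the reduction.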
Same Application: Hierarchical Reduction
[Charts (Barry Sly-Delgado): the unmodified program shows a long tail that the @associative version avoids.]
[Diagram: the Parsl application feeds the Parsl DAG Manager, which holds blocked tasks and releases ready tasks to the TaskVine Executor; the executor dispatches them to TaskVine Workers, which share access to a parallel filesystem.]
Parsl knows about the entire program and releases tasks that are "ready" to TaskVine for execution. TaskVine handles scheduling and data locality.
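This division of labor — the DAG manager tracks dependencies and hands only "ready" tasks to the executor — can be sketched as a small release loop (illustrative pseudologic, not Parsl's code):

```python
def release_order(deps):
    """deps maps each task to the tasks it depends on.
    Return batches of tasks in the order they become ready
    (i.e., all of their dependencies are done)."""
    done, batches = set(), []
    pending = {t: set(d) for t, d in deps.items()}
    while pending:
        ready = [t for t, d in pending.items() if d <= done]
        if not ready:
            raise RuntimeError("cycle or unsatisfiable dependency")
        batches.append(sorted(ready))
        done.update(ready)
        for t in ready:
            del pending[t]
    return batches

# a and b have no dependencies; c needs both; d needs c.
deps = {"a": [], "b": [], "c": ["a", "b"], "d": ["c"]}
print(release_order(deps))  # → [['a', 'b'], ['c'], ['d']]
```

The executor below this loop never sees blocked tasks at all, so it can focus purely on scheduling and data locality.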
Problem: Data Locality
[Diagram: tasks linked by intermediate files, ending in persistent files.]
Solution: Task Groups
[Diagram: tasks that share intermediate files are collected into a task group.]
Task Grouping with Parsl and TaskVine
[Diagram: the Parsl DFK releases ready tasks from the Parsl DAG to TaskVine; within a group task, intermediate files (taskvinetemp://) stay local, while independent tasks produce persistent files.]
Effect of Task Grouping on Montage
Colin Thomas and Douglas Thain,
Liberating the Data Aware Scheduler to Achieve Locality in Layered Scientific Workflow Systems,
IEEE Conference on eScience, pages 9, September, 2025.
Managing Function State for LLMs
Traditionally, model loading is coupled with the actual inference, resulting in repeated LLM creation costs over many invocations of the inference function.
The LLM creation is now separated into a load function, which is contextualized in a Library to serve multiple inference invocations, effectively amortizing its creation cost.
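The load/inference split can be sketched with a process-level cache: the expensive load runs once per library process, and every later inference call reuses it. The model loader below is a hypothetical stand-in for real LLM creation, purely to show the pattern:

```python
_MODEL = None      # persists for the lifetime of the library process
LOAD_COUNT = 0     # counts how many times the expensive load actually ran

def load_model():
    # Hypothetical stand-in for expensive LLM creation
    # (e.g., reading gigabytes of weights from disk).
    global _MODEL, LOAD_COUNT
    if _MODEL is None:
        LOAD_COUNT += 1
        _MODEL = lambda prompt: f"echo:{prompt}"
    return _MODEL

def infer(prompt):
    # Each invocation reuses the already-loaded model.
    return load_model()(prompt)

results = [infer(p) for p in ["a", "b", "c"]]
print(results, LOAD_COUNT)  # → ['echo:a', 'echo:b', 'echo:c'] 1
```

Three invocations, one load: the creation cost is amortized across every inference served by that library process.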
Exploiting Context with Parsl and TaskVine
Large Scale Neural Network Inference
100K tasks on 150 x 32c workers
T. Phung, C. Thomas, L. Ward, K. Chard, D. Thain, Accelerating Function-Centric Applications by Discovering, Distributing, and Retaining Reusable Context in Workflow Systems, ACM International Symposium on High-Performance Parallel and Distributed Computing (HPDC), June, 2024.
Evolving Programming Models
Challenge of Scaling Up Graphs
(this is the small version)
As we scale the DV5 application up to O(1M) nodes, the graph construction performed by Dask begins to dominate the execution time! (Note the common structure within each branch.)
[Diagram: in the standard approach, the Coffea-Dask application generates the complete graph; the DAG scheduler releases fine-grained tasks to the task scheduler, which schedules them onto worker nodes. In the DDR approach, the application generates an abstract representation; the DDR Scheduler generates variable-sized subgraph specs and dispatches them as tasks to worker nodes, each running a local Dask scheduler. Tasks progress from pending to running to done, across all datasets or one dataset at a time.]
Dynamic Data Reduction API
def preprocess(dataset_info, **kwargs):
    # break the dataset into chunks

def postprocess(val, **kwargs):
    # compute the properties of each chunk

def processor(x):
    # a data parallel Dask program

def reducer(a, b):
    # commutative / associative function

ddr = DynamicDataReduction(
    mgr, data,
    preprocess, postprocess,
    processor, reducer)
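The callbacks compose into a map-then-reduce over chunks. A pure-Python sketch of the flow (the chunking and the concrete functions are illustrative, not the DynamicDataReduction implementation):

```python
def preprocess(data, chunk_size=3):
    # Break the dataset into fixed-size chunks.
    return [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)]

def processor(chunk):
    # Compute a property of each chunk (here: its sum).
    return sum(chunk)

def reducer(a, b):
    # Commutative and associative, so partial results may be
    # combined in any order as they arrive from workers.
    return a + b

def dynamic_reduce(data):
    partials = [processor(c) for c in preprocess(data)]
    out = partials[0]
    for p in partials[1:]:
        out = reducer(out, p)
    return out

print(dynamic_reduce(list(range(10))))  # → 45
```

Because the reducer is commutative and associative, the scheduler is free to combine partial results as soon as any two are available, rather than in submission order.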
Kelci Mohrman, Ben Tovar
Evolving Programming Models
Observation: Serialization Overheads
Static Graph Compilation
Runtime Execution Graph (REG)
Strategic Orchestration Graph (SOG)
[Diagram: the application submits its task graph to the TaskVine Manager, which compiles a Runtime Execution Graph (REG) storing all task definitions, executables, and arguments, and a Strategic Orchestration Graph (SOG) tracking data dependencies and computation status. The REG is installed on every worker; thereafter the manager sends only short messages ("run task 1", "run task 2", ...), and a proxy function on the worker forks each task.]

def func1(a, b):
    return (a + b) * 2

Task1 = func1(10, 20)
Task2 = func1(Task1, 30)
Worker
In this approach, the entire graph is compiled upfront and distributed to all workers. Task execution then only involves sending the task key to the worker, eliminating the need for serialization.
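Key-only dispatch can be sketched as follows: each worker holds the compiled graph, so the manager's message is just a task key, with no function or arguments to serialize per task. This is a toy model of the idea, not the TaskVine protocol:

```python
# The compiled graph, installed once on every worker:
# task key -> (function, argument keys or literal values).
def func1(a, b):
    return (a + b) * 2

REG = {
    "task1": (func1, [10, 20]),
    "task2": (func1, ["task1", 30]),
}

results = {}  # per-worker cache of completed task outputs

def run(key):
    # The manager sends only the key; the worker resolves arguments
    # from the local REG and from previously computed results.
    fn, args = REG[key]
    vals = [results[a] if a in REG else a for a in args]
    results[key] = fn(*vals)
    return results[key]

print(run("task1"))  # → 60
print(run("task2"))  # → 180
```

All the serialization cost is paid once, when the REG is installed; each subsequent dispatch is a few bytes, which is what makes very small tasks affordable.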
Initial Results from Graph Compilation
Static compilation dramatically improves task latency and increases throughput by 100X on small tasks!
Jin Zhou
Evolving Programming Models
The Design of Abstractions
The fundamental problem of systems research lies in the design of abstractions for the end user (or the layer above).
Find ways to hoist common definitions of code/data/structure so that they are processed once and then distributed.
It is usually better to compile than to interpret structures!
Dealing with "Real World" Applications
The Translational Research Cycle
Develop a Research Concept → Write Code → Test & Evaluate → Publish Results → Release Software → Engage Users
Give TaskVine a Try!
This work was supported by
NSF Award OAC-1931348
conda install -c conda-forge ndcctools
For more information…
CCL Staff and Students