Beam Up Your GenAI Usage
Usability, Efficiency, Reliability with Apache Beam
May 2024
What is Apache Beam?
What?
Why?
History of Apache Beam
2002: GFS
2004: MapReduce
2006: BigTable
2008: Flume
2010: Dremel
2012: Spanner
2013: Millwheel
2015: Cloud Dataflow + Apache Beam
Simple distributed data processing
Logical pipelines & optimization
Low-latency streaming
Batch + Streaming
Serverless Cloud
Beam’s ML Journey
What is Beam ML
With Dataflow ML, the new streaming pipeline reduced the median preview latency from 111.7 minutes to 3.7 minutes. That means the new pipeline generates podcast previews 30 times faster than the batch pipeline. Quite an improvement!
Spotify on generating podcast previews using NLP and ML
Spotify: ML at Enterprise Scale with Beam ML
Ahmet Altay
Interests
ML predictions
- Local models
- Remote models
- GPU Choice
- GPU Efficiency
- Model Ensembles
ML Frameworks & Platforms
LLM
Insights activation
Downstream integrations with 30+ Sources / Sinks:
Continuous data preparation
Real-time AI
Beam ML - Features & Foundation
Usability
Reliability
Efficiency
Foundations of Beam ML
Easy To Use
Dataflow Streaming ML pipeline
[Diagram: Read (Pub/sub) → Pre-Process → Post-Process → Activate, feeding transactional systems, operational systems, and real-time BI]
Primitives: DoFn, GBK, Combiner, State, Timers, Windows

Java:   input.apply(Sum.integersPerKey())
Python: input | Sum.PerKey()
Go:     stats.Sum(s, input)
SQL:    SELECT key, SUM(value) FROM input GROUP BY key
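Where the slide shows each SDK's idiomatic spelling, the Python SDK's actual transform for summing per key is beam.CombinePerKey(sum); a minimal runnable sketch:

import apache_beam as beam

with beam.Pipeline() as p:
    (p
     | beam.Create([('a', 1), ('a', 2), ('b', 5)])
     | beam.CombinePerKey(sum)   # per-key sum in the Python SDK
     | beam.Map(print))          # prints ('a', 3) and ('b', 5)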
Beam primitives are great... but...
Accelerating developer productivity
Building blocks
Goal
Turnkey transformations
Writing that code!
Writing the same code!
Yup, same again!
Déjà vu?
with beam.Pipeline(options=pipeline_options) as p:
    (p
     | beam.io.fileio.MatchFiles('gs://my_bucket/*')
     | beam.io.fileio.ReadMatches()
     | beam.Map(preprocess_image)
     | beam.xxx_pattern_xxx(Configuration)
     ...
Accelerating developer productivity
Turnkey transformations
with beam.Pipeline(options=pipeline_options) as p:
    (p
     | beam.io.ReadFromPubSub(topic=my_topic)
     | beam.Map(preprocess)
     | beam.ml.inference.RunInference(model_handler)
     | beam.Map(post_process)
     ...
Accelerating developer productivity
RunInference
from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerTensor

model_handler = PytorchModelHandlerTensor(
    state_dict_path='gs://my_model_dir/my_model',
    model_class=my_model_class,
    model_params={'input_dim': 1, ...})
Accelerating developer productivity
RunInference
RunInference Features - Framework support
Open Framework support
RunInference - Model Repositories
from apache_beam.ml.inference.huggingface_inference import (
    HuggingFacePipelineModelHandler, PipelineTask)

model_handler = HuggingFacePipelineModelHandler(
    task=PipelineTask.QuestionAnswering,
    model=known_args.model_name,
    load_model_args={
        'framework': 'pt', 'revision': known_args.revision
    })
RunInference Features - Pre / Post processing
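A minimal sketch of these hooks, assuming a recent Beam release: processing functions chain onto the model handler, and RunInference applies them around the model call (encode_fn and decode_fn are hypothetical):

wrapped_handler = (
    model_handler
    .with_preprocess_fn(encode_fn)    # runs on each element before inference
    .with_postprocess_fn(decode_fn))  # runs on each result after inference

predictions = examples | RunInference(wrapped_handler)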
RunInference Features - Exception handling
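A minimal sketch of the dead-letter pattern, assuming a recent Beam release: elements that raise during inference are routed to a side output instead of failing the pipeline (log_failure is hypothetical):

main, other = (
    examples
    | RunInference(model_handler).with_exception_handling())

# Successful PredictionResults flow through `main`; failing elements
# land on `other.failed_inferences` for logging or replay.
other.failed_inferences | beam.Map(log_failure)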
Dataflow Streaming ML pipeline
[Diagram: Read (Cloud Pub/Sub) → Pre-Process → Post-Process → Activate, feeding transactional systems, operational systems, and real-time BI]
Real pipelines
[Diagram: a multi-model pipeline: Voice → Speech to Text → Text → Language Understanding and Sentiment Analysis → Product Recommender and Support Recommender → Logs → Text to Speech → Response]
Real pipelines - A/B
data = p | beam.io.ReadFromText(files)

data | RunInference(model_a_handler)
data | RunInference(model_b_handler)

[Diagram: Source fans out to model A and model B in parallel, then to Sinks]
Real pipelines - Sequential
data = p | beam.io.ReadFromText(files)

model_a_output = data | RunInference(model_a_handler)

(model_a_output
 | beam.Map(postprocess)
 | RunInference(model_b_handler))

[Diagram: Source → model A → postprocess → model B → Sinks]
RunInference - Per-Key Inference
Routing data to specific models within a single transform.
per_key_mhs = [
    KeyModelMapping(['a', 'b'], model_x),
    KeyModelMapping(['c', 'd'], model_y)]

mh = KeyedModelHandler(
    per_key_mhs, max_models_per_worker_hint=2)
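A usage sketch, assuming keyed inputs: each element is a (key, example) pair, and the KeyedModelHandler routes it to the model mapped to that key (example_1 and example_2 are hypothetical):

keyed = p | beam.Create([('a', example_1), ('c', example_2)])
predictions = keyed | RunInference(mh)  # 'a'/'b' go to model_x, 'c'/'d' to model_y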
[Diagram: Read (Pub/Sub) → Pre-process → RunInference (model_a, model_b) → Post-process]
Dataflow Streaming LLM pipeline
[Diagram: Read (Pub/sub) → Pre-Process → Generate Embeddings (local / remote models) → Store in Vector DB → Dataflow inference]
...
(p
 | beam.Create(data)
 | MLTransform(write_artifact_location=artifact_location)
     .with_transform(ComputeAndApplyVocabulary(columns=['x'])))
Beam ML - MLTransform
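MLTransform saves the fitted artifacts (here, the computed vocabulary) under write_artifact_location, so the identical mapping can be replayed on new data at inference time. A minimal sketch, assuming the same artifact_location (inference_data is hypothetical):

from apache_beam.ml.transforms.base import MLTransform

# No with_transform calls: the recorded transforms are read back
# from the artifact location and reapplied.
(p
 | beam.Create(inference_data)
 | MLTransform(read_artifact_location=artifact_location))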
...
embedding_transform = SentenceTransformerEmbeddings(
    model_name=text_embedding_model_name, columns=['x'],
    inference_args={'convert_to_numpy': False})

with beam.Pipeline() as pipeline:
    data_pcoll = (
        pipeline
        | "CreateData" >> beam.Create(content))
    transformed_pcoll = (
        data_pcoll
        | MLTransform(write_artifact_location=artifact_location_t5_with_inference_args)
            .with_transform(embedding_transform))
Beam ML for Embeddings
...
with beam.Pipeline() as p:
    output = (
        p
        | "Read from PubSub" >> beam.io.ReadFromPubSub(subscription=SUBSCRIPTION)
        | "Convert bytes to Row" >> beam.ParDo(DecodeBytes())
        | "Enrichment" >> Enrichment(bigtable_handler)
        | "Run Inference" >> RunInference(model_handler))
Turnkey - Enrichment transform
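A sketch of how the bigtable_handler above might be built, assuming Beam's built-in Bigtable enrichment handler; the project, instance, and table names are hypothetical:

from apache_beam.transforms.enrichment import Enrichment
from apache_beam.transforms.enrichment_handlers.bigtable import (
    BigTableEnrichmentHandler)

bigtable_handler = BigTableEnrichmentHandler(
    project_id='my-project',     # hypothetical
    instance_id='my-instance',   # hypothetical
    table_id='features',         # hypothetical
    row_key='customer_id')       # input Row field used as the Bigtable row key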
Beam ML Foundations: Easy to Use
Foundations of Beam ML
Efficient
Hardware is … Hard.
Beam ML - GPUs
[Diagram: the model is pulled onto the worker; a PCollection (bounded or unbounded) flows through it to a Sink]
Ideal small model configuration
[Diagram: one VM running many processes, each bundling I/O with its own copy of the model]
Ideal Multi Large Model Configuration
[Diagram: VMs where many I/O processes share a single in-memory copy of each large model (Model 1, Model 2, Model 3)]
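To get the shared-copy layout above, recent Beam releases accept a large_model flag on model handlers; a minimal sketch reusing the earlier handler:

model_handler = PytorchModelHandlerTensor(
    state_dict_path='gs://my_model_dir/my_model',
    model_class=my_model_class,
    model_params={'input_dim': 1},
    large_model=True)  # load one copy per worker, shared across its processes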
Rightfitting for GPUs
[Diagram: one pipeline from Source to Sink whose steps have mixed needs: small VMs, min-mem 512 GB, GPU required]
Rightfitting for GPUs
[Diagram: the same work split into Pipeline 1 → Storage → Pipeline 2 → Storage → Pipeline 3, so each pipeline runs on the hardware it needs (small VMs, min-mem 512 GB, GPU required)]
Rightfitting for GPUs
[Diagram: a single pipeline from Source to Sink spanning heterogeneous worker pools: Worker Pool 1 (small VMs), Worker Pool 2 (GPU boxes), Worker Pool 3 (min-mem 512 GB)]
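Right-fitting is expressed with Beam resource hints: each step declares what it needs, and a runner such as Dataflow can place steps on separate worker pools. A minimal sketch (images and post_process are hypothetical; the accelerator string follows Dataflow's worker_accelerator format):

predictions = (
    images
    | 'Infer' >> RunInference(model_handler).with_resource_hints(
        accelerator='type:nvidia-tesla-t4;count:1;install-nvidia-driver')
    | 'PostProcess' >> beam.Map(post_process).with_resource_hints(
        min_ram='512GB'))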
Beam ML Foundations: Efficient
Foundations of Beam ML
Reliable
Credit Karma: Self Service ML Platform
“Apache Beam enabled self-service ML for our data scientists. They can plug in pieces of code, and those transformations will be automatically attached to models without any engineering involvement. Within seconds, our data science team can move from experimentation to production.”
The scale of data processing has grown 2x since Apache Beam adoption, and the Credit Karma data engineering team did not have to make any significant changes to the infrastructure. Onboarding new partners requires minimal changes to the pipelines, compared with the several weeks needed before. The Apache Beam ingestion pipeline accelerated data loading to the warehouse from days to under an hour, processing around 5-10 TB of data daily.
Transforms with DIY Platforms
[Diagram: the pipeline (Read → DoFn → DoFn → Write) plus everything needed to run it: Shuffle, GBK, Control Plane, Metrics, Autotuning, Worker Infra, UI; with a DIY platform, nearly all of it is the user's problem]
Transforms - Managed Services
[Diagram: with a managed service, the user still owns the pipeline code (Read, DoFns, GBK, Write), while the Shuffle, Control Plane, Metrics, Autotuning, Worker Infra, and UI become the service provider's problem]
Managed Transforms
[Diagram: with managed transforms, the provider also takes over the Read and Write transforms; only the user's own DoFns remain the user's problem]
Dependency management - Auto Drivers & Configs
Driver installation: the worker_accelerator service option with install-nvidia-driver
GPU sharing: the use_nvidia_mps service option (see the sketch below)
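A sketch of how these land on the pipeline options, assuming the Dataflow runner; the GPU type is hypothetical:

from apache_beam.options.pipeline_options import PipelineOptions

# install-nvidia-driver installs the driver automatically;
# use_nvidia_mps shares the GPU across the worker's processes.
options = PipelineOptions([
    '--runner=DataflowRunner',
    '--dataflow_service_options='
    'worker_accelerator=type:nvidia-tesla-t4;count:1;'
    'install-nvidia-driver;use_nvidia_mps',
])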
RunInference - Streaming Update
Two modes:
1. Watch Mode
Upload updated model to files stores like and RunInference will auto pull the new model for you
2. Event Mode
Push an update message to RunInference via a streaming source such as Kafka.
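A minimal sketch of watch mode, assuming Beam's WatchFilePattern utility: a side input polls a file pattern and RunInference swaps in the newest matching model (the bucket path is hypothetical):

from apache_beam.ml.inference.utils import WatchFilePattern

model_updates = p | WatchFilePattern(
    file_pattern='gs://my_bucket/models/*.pt',
    interval=360)  # poll every 360 seconds

predictions = (
    examples
    | RunInference(model_handler, model_metadata_pcoll=model_updates))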
[Diagram: Read (Pub/Sub) → Pre-process → RunInference → Post-process, with the served model hot-swapped from Model v1 to Model v2 while the pipeline runs]
Beam ML Foundations: Reliable
So What?
Beam ML has grown significantly in the past 2 years
Learnings from Beam’s ML Journey
What’s next for Beam ML?
Turnkey ML Pipelines & Solutions
Resources
Try Beam in your browser - https://play.beam.apache.org/
Case studies: https://beam.apache.org/case-studies/
Join the community:
Beam Summit September 4 & 5 - https://beamsummit.org/
Learn more, use, or contribute - https://beam.apache.org/community/
Docs & end-to-end samples: