1 of 52

Beam Up Your GenAI Usage

Usability, Efficiency, Reliability with Apache Beam

May 2024

2 of 52

What is Apache Beam?

What?

  • Open-source data processing tool available since 2015.
  • Top Apache project with growing usage.

Why?

  • Offers a balance of correctness, latency, and cost.
  • Proven in Cloud, on-premise, and hybrid environments.
  • Heavily used in the ML community: why and how is the subject of this talk!

3 of 52

History of Apache Beam

2002–2015: GFS, MapReduce, BigTable, Flume, Dremel, Spanner, Millwheel, and in 2015 Cloud Dataflow + Apache Beam.

  • MapReduce: simple distributed data processing
  • Flume: logical pipelines & optimization
  • Millwheel: low-latency streaming
  • Cloud Dataflow + Apache Beam: batch + streaming, serverless cloud

4 of 52

Beam’s ML Journey

  • TensorFlow Extended (TFX) started in 2017 as an end-to-end ML platform powered by Apache Beam.
    • "Industrial-scale" ML for Alphabet: 1000s of TFX pipelines, processing exabytes of data with 10,000+ models and 100M+ inferences per second.
    • Adopted by a number of large-scale technology companies: AirBnB, PayPal, X.
  • Outside Alphabet there were headwinds to adoption:
    • ML frameworks other than TensorFlow had significant mind share.
    • A growing and diverse group of ML practitioners preferred easier-to-use options.
    • Demand for accelerators grew exponentially; high costs and limited availability of GPUs were becoming increasingly challenging.
    • The number of ML applications grew exponentially, each requiring ongoing maintenance that took time away from core tasks.

5 of 52

What is Beam ML?

  • Based on learnings at Google scale and from 1000s of small-to-large users.
  • A standard set of tools that lets ML practitioners focus on delivering real-time value with ease.
  • Emphasis on usability, efficiency, and reliability.
  • Significant usage growth since 2022 with marquee use cases like:
    • Spotify - Generating podcast previews 30x faster with improved reliability and lower management overhead.
    • Credit Karma - processing 20,000 features with 200+ models for 130 million users.

6 of 52

With Dataflow ML, the new streaming pipeline reduced the median preview latency from 111.7 minutes to 3.7 minutes. That means the new pipeline generates podcast previews 30 times faster than the batch pipeline. Quite an improvement!

Spotify on generating podcast previews using NLP and ML

Spotify: ML at Enterprise Scale with Beam ML

7 of 52

Ahmet Altay

  • Member of Apache Software Foundation
  • Google Cloud Dataflow, Engineering Director

Interests

  • Open Spaces 🌲🌲🌲
  • Open Source 👐
  • Data & Python 🔥
  • Streaming my hair 💇

Reach out! ✋

8 of 52

Beam ML - Features & Foundation: Usability, Efficiency, Reliability

ML predictions

  • Local models
  • Remote models
  • GPU choice
  • GPU efficiency
  • Model ensembles

ML Frameworks & Platforms

  • Hugging Face
  • TensorFlow
  • PyTorch
  • scikit-learn, XGBoost
  • Vertex AI
  • others...

LLM

  • MLTransform embedding generation
  • Integrate with Vector DBs
  • Integrate with OSS LLMs

Continuous data preparation

  • TFX
  • Kubeflow Pipelines
  • Vertex AI Pipelines

Insights activation (real-time AI)

  • Downstream integrations with 30+ Sources / Sinks: Kafka, BigQuery, Bigtable, Pub/Sub

9 of 52

Foundations of Beam ML

Easy To Use

10 of 52

Dataflow: Streaming ML pipeline

Diagram: Read (Pub/Sub) → Pre-Process → Post-Process → Activate, delivering to transactional systems, operational systems, and real-time BI.

11 of 52

Beam primitives: DoFn, GBK, Combiner, State, Timers, Windows

The same "sum per key" in each SDK:

Java:   input.apply(Sum.integersPerKey())
Python: input | Sum.PerKey()
Go:     stats.Sum(s, input)
SQL:    SELECT key, SUM(value) FROM input GROUP BY key

Beam primitives are great ... but ...

12 of 52

Accelerating developer productivity

Goal: move from building blocks to turnkey transformations.

13 of 52

Writing that code!

Writing the same code!

Yup, same again!

Déjà vu?

14 of 52

with beam.Pipeline(options=pipeline_options) as p:
    (p
     | beam.io.fileio.MatchFiles('gs://my_bucket/*')
     | beam.io.fileio.ReadMatches()
     | beam.Map(preprocess_image)
     | beam.xxx_pattern_xxx(Configuration)
     ...

Accelerating developer productivity

Turn Key transformations

15 of 52

with beam.Pipeline(options=pipeline_options) as p:
    (p
     | beam.io.ReadFromPubSub(topic=my_topic)
     | beam.Map(preprocess)
     | RunInference(model_handler)
     | beam.Map(post_process)
     ...

Accelerating developer productivity

RunInference

16 of 52

model_handler = PytorchModelHandlerTensor(
    state_dict_path='gs://my_model_dir/my_model',
    model_class=my_model_class,
    model_params={'input_dim': 1, ...}
)

Accelerating developer productivity

RunInference

17 of 52

RunInference Features - Framework support

Open Framework support
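Open framework support means the pipeline stays the same and only the model handler changes. A minimal sketch, assuming the Beam Python SDK's built-in handlers; the model paths are hypothetical examples:

import apache_beam as beam
import numpy
from apache_beam.ml.inference.base import RunInference
from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerNumpy
from apache_beam.ml.inference.tensorflow_inference import TFModelHandlerNumpy

# Two handlers, two frameworks; the inference step below is identical for both.
sklearn_handler = SklearnModelHandlerNumpy(model_uri='gs://my_bucket/model.pkl')
tf_handler = TFModelHandlerNumpy(model_uri='gs://my_bucket/saved_model/')

with beam.Pipeline() as p:
    _ = (p
         | beam.Create([numpy.array([1.0, 2.0, 3.0])])
         | RunInference(sklearn_handler))  # swap in tf_handler with no other changes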

18 of 52

RunInference - Model Repositories

  • TensorFlow Hub
  • Hugging Face

model_handler = HuggingFacePipelineModelHandler(
    task=PipelineTask.QuestionAnswering,
    model=known_args.model_name,
    load_model_args={
        'framework': 'pt', 'revision': known_args.revision
    })

19 of 52

RunInference Features - Pre / Post processing
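A minimal sketch of attaching pre- and post-processing to a model handler, assuming the Beam Python SDK's with_preprocess_fn / with_postprocess_fn hooks; encode_example and decode_prediction are hypothetical user functions:

# Pre/post processing travels with the handler, so RunInference owns the whole
# raw-input -> model-input -> prediction -> output path.
hooked_handler = (
    model_handler
    .with_preprocess_fn(encode_example)        # raw element -> model input
    .with_postprocess_fn(decode_prediction))   # PredictionResult -> business output

# predictions = examples | RunInference(hooked_handler)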

20 of 52

RunInference Features - Exception handling

  • other.failed_preprocessing
  • other.failed_inferences
  • other.failed_postprocessing
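The failed_* outputs above come from RunInference's exception-handling mode. A minimal sketch of consuming them, assuming with_exception_handling() on the transform; the downstream handling here is just a print placeholder:

# Bad elements are routed to side outputs instead of failing the whole pipeline.
main, other = examples | RunInference(model_handler).with_exception_handling()

_ = other.failed_inferences | beam.Map(print)  # or write to a dead-letter queue
# With pre/post processing attached, other.failed_preprocessing and
# other.failed_postprocessing carry those stages' failures as well.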

21 of 52

Dataflow: Streaming ML pipeline

Diagram: Read (Cloud Pub/Sub) → Pre-Process → Post-Process → Activate, delivering to transactional systems, operational systems, and real-time BI.

22 of 52

Real pipelines

Diagram: real pipelines chain multiple models and data types: Voice → Speech to Text, Language Understanding, Sentiment Analysis; Text and Logs feeding Product and Support Recommenders; Text to Speech producing the Response.

23 of 52

Real pipelines - A/B

data = p | beam.io.ReadFromText(files)

data | RunInference(model_a_handler)
data | RunInference(model_b_handler)

Diagram: Source → models A and B in parallel → Sinks.

24 of 52

Real pipelines - Sequential

data = p | beam.io.ReadFromText(files)

model_a_output = data | RunInference(model_a_handler)

(model_a_output
 | beam.Map(postprocess)
 | RunInference(model_b_handler))

Diagram: Source → model A → model B → Sinks.

25 of 52

RunInference - Per-Key Inference

Routing of data to specific models, within a single transform.

per_key_mhs = [
    KeyModelMapping(['a', 'b'], model_x),
    KeyModelMapping(['c', 'd'], model_y)
]

mh = KeyedModelHandler(
    per_key_mhs, max_models_per_worker_hint=2)

Diagram: Read (Pub/Sub) → Pre-process → RunInference (model_a, model_b) → Post-process.

26 of 52

Dataflow: Streaming LLM pipeline

Diagram: Read (Pub/Sub) → Pre-Process → Generate Embeddings (local / remote models) → Store in a Vector DB; a separate Dataflow pipeline serves Inference.

27 of 52

...
(p
 | beam.Create(data)
 | MLTransform(write_artifact_location=artifact_location)
     .with_transform(ComputeAndApplyVocabulary(columns=['x'])))

Beam ML - MLTransform

28 of 52

...
embedding_transform = SentenceTransformerEmbeddings(
    model_name=text_embedding_model_name, columns=['x'],
    inference_args={'convert_to_numpy': False}
)

with beam.Pipeline() as pipeline:
    data_pcoll = (
        pipeline
        | "CreateData" >> beam.Create(content))
    transformed_pcoll = (
        data_pcoll
        | MLTransform(write_artifact_location=artifact_location_t5_with_inference_args)
            .with_transform(embedding_transform))

Beam ML for Embeddings

29 of 52

...
with beam.Pipeline() as p:
    output = (
        p
        | "Read from PubSub" >> beam.io.ReadFromPubSub(subscription=SUBSCRIPTION)
        | "Convert bytes to Row" >> beam.ParDo(DecodeBytes())
        | "Enrichment" >> Enrichment(bigtable_handler)
        | "Run Inference" >> RunInference(model_handler)
    )

Turn key - Enrichment transform
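The bigtable_handler above is not defined on the slide. A minimal sketch of constructing one, assuming the Beam Python SDK's Bigtable enrichment handler; the project, instance, table, and row-key values are hypothetical:

from apache_beam.transforms.enrichment_handlers.bigtable import BigTableEnrichmentHandler

# Look up features in Bigtable keyed by the 'user_id' field of the incoming Row.
bigtable_handler = BigTableEnrichmentHandler(
    project_id='my-project',
    instance_id='my-bigtable-instance',
    table_id='feature-store',
    row_key='user_id')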

30 of 52

Beam ML Foundations: Easy to Use

  • Declarative turnkey transforms encapsulate complex real-world use cases, such as live model updates and multi-model support, in a few lines of code.
  • They serve different parts of the ML life cycle, including pre/post processing and inference.
  • Integrated with all major frameworks, feature stores, tools, and libraries.
  • Production ready: error handling, comprehensive metrics, dead-letter queues.

31 of 52

Foundations of Beam ML

Efficient

32 of 52

Hardware is … Hard.

  • A lot of swapping models in and out of GPU memory and compromising on pipeline visibility – Spotify
  • The cost of running a large number of GPU machines year-round is elevated – Spotify
  • Administrative cost of running pipelines on 1000s of TPUs is high – Alphabet
  • Models were scored sequentially one at a time which resulted in excessive compute costs – Credit Karma

33 of 52

Beam ML - GPUs

Diagram: RunInference pulls the model onto each worker; a PCollection (bounded or unbounded) flows through inference to the Sink.

34 of 52

Ideal small model configuration

Diagram: a single VM runs many worker processes, each doing I/O and holding its own copy of the model.

35 of 52

Ideal Multi Large Model Configuration

Diagram: each large model is loaded once per VM and shared by many I/O processes (Model 1 on one VM; Model 2 and Model 3 on another).
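One way to get this behavior in code, as a minimal sketch: Beam model handlers accept a large_model flag that loads the model once per worker and shares it across processes. The path and model class below are hypothetical:

# Share one in-memory copy of a big model across worker processes.
model_handler = PytorchModelHandlerTensor(
    state_dict_path='gs://my_model_dir/my_large_model',
    model_class=my_model_class,
    model_params={'input_dim': 1},
    large_model=True)  # load once per worker, share across processes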

36 of 52

Rightfitting for GPUs

Diagram: a single pipeline from Source to Sink (one PCollection, bounded or unbounded) whose stages have different resource needs: small VMs for some steps, a 512 GB memory minimum for others, and a GPU only where required.
37 of 52

Rightfitting for GPUs

Diagram: the workaround of splitting the work into multiple pipelines (Pipeline 1, 2, 3), each on differently sized workers (small VMs, 512 GB minimum memory, GPUs), connected through intermediate storage.

38 of 52

Rightfitting for GPUs

Diagram: right fitting keeps a single pipeline from Source to Sink (one PCollection, bounded or unbounded) but runs its stages on separate worker pools: Worker Pool 1 (small VMs), Worker Pool 2 (GPU boxes), Worker Pool 3 (512 GB minimum memory).
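On Dataflow, right fitting is driven by resource hints attached to individual transforms. A minimal sketch, with illustrative memory and GPU values, so only the inference step lands on the expensive workers:

# Only the inference transform asks for extra memory and a GPU; the rest of the
# pipeline can run on small default workers.
predictions = (
    preprocessed
    | 'Inference' >> RunInference(model_handler).with_resource_hints(
        min_ram='64GB',
        accelerator='type:nvidia-tesla-t4;count:1;install-nvidia-driver'))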

39 of 52

Beam ML Foundations: Efficient

  • Automatically addresses complex operational problems such as memory management:
    • Efficiently load, switch, share, and use multiple large or small models.
  • Automatically maximizes the utilization of scarce and expensive accelerators (GPUs):
    • Use accelerators only for the relevant parts of your ML pipeline.
  • Automatically applies best practices for cost and latency optimization:
    • Dynamically batch elements based on runtime characteristics (see the sketch below).
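RunInference performs this batching internally. For custom steps, a minimal sketch of the same idea using Beam's BatchElements utility; the batch-size bounds are illustrative:

# BatchElements adapts the batch size between the given bounds based on
# observed throughput at runtime.
batched = (elements
           | 'DynamicBatch' >> beam.BatchElements(min_batch_size=8,
                                                  max_batch_size=512))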

40 of 52

Foundations of Beam ML

Reliable

41 of 52

Credit Karma: Self Service ML Platform

Apache Beam enabled self-service ML for our data scientists. They can plug in pieces of code, and those transformations will be automatically attached to models without any engineering involvement. Within seconds, our data science team can move from experimentation to production.

The scale of data processing has grown 2x since Apache Beam adoption, and their data engineering team did not have to undertake any significant changes to the infrastructure. Onboarding new partners requires minimal changes to the pipelines, compared to several weeks needed before.

The Apache Beam ingestion pipeline accelerated data loading to the warehouse from days to under an hour, processing around 5-10 TB of data daily.

42 of 52

Transforms with DIY Platforms

Diagram: with a DIY platform, the user authors the pipeline (Read, DoFns, GBK, Write) and, once they hit Run!, the runtime (control plane, shuffle, metrics, autotuning, worker infrastructure, UI) is largely the user's problem rather than a service provider's.

43 of 52

Transforms - Managed Services

Diagram: with managed services, the user still owns the pipeline code (Read, DoFns, GBK, Write), while the runtime (control plane, shuffle, metrics, autotuning, worker infrastructure, UI) becomes the service provider's problem.

44 of 52

Managed Transforms

Diagram: with managed transforms, the Read and Write transforms also move to the service provider's side; the user's problem shrinks to their own DoFns.

45 of 52

Dependency management - Auto Drivers & Configs

  • Driver installation: the worker_accelerator option with the install-nvidia-driver parameter.
  • GPU sharing across processes: the use_nvidia_mps option.
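A minimal sketch of how these surface as Dataflow pipeline options; the accelerator type and count are illustrative, and appending use_nvidia_mps is assumed to follow the documented worker_accelerator format:

from apache_beam.options.pipeline_options import PipelineOptions

# The service installs the NVIDIA driver on GPU workers and enables MPS so that
# multiple processes share the GPU.
pipeline_options = PipelineOptions([
    '--runner=DataflowRunner',
    '--dataflow_service_options='
    'worker_accelerator=type:nvidia-tesla-t4;count:1;'
    'install-nvidia-driver;use_nvidia_mps',
])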

46 of 52

RunInference - Streaming Update

Two modes:

1. Watch Mode

Upload an updated model to a file store and RunInference will automatically pull in the new model for you.

2. Event Mode

Push an update message to RunInference via a streaming source such as Kafka.

Diagram: Read (Pub/Sub) → Pre-process → RunInference → Post-process, with the served model updated in place from Model v1 to Model v2.
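Watch mode, as a minimal sketch using the Beam Python SDK's WatchFilePattern side input; the file pattern and polling interval are illustrative:

from apache_beam.ml.inference.base import RunInference
from apache_beam.ml.inference.utils import WatchFilePattern

# A side input watches the model directory; RunInference swaps to the newest
# matching model without restarting the pipeline.
model_updates = (p
                 | 'WatchModelDir' >> WatchFilePattern(
                     file_pattern='gs://my_model_dir/*.pth',
                     interval=300))  # seconds

predictions = (preprocessed
               | RunInference(model_handler,
                              model_metadata_pcoll=model_updates))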

47 of 52

Beam ML Foundations: Reliable

  • Addresses all facets of running reliable streaming ML pipelines.
  • Easy and declarative IOs move a major source of production and performance issues to the service provider.
  • Easy dependency management to choose the latest and most efficient drivers and configurations for each pipeline.
  • Easily manage and update live models in running pipelines.

48 of 52

So What?

49 of 52

Beam ML has grown significantly in the past 2 years

  • Credit Karma: “The scale of data processing has grown 2x since Apache Beam adoption.”
  • Alphabet - “process millions of examples across thousands of TPUs without any administrative intervention” powering Gemini, Ads, Waymo, Nest and YouTube.
  • Beam Community - 5x more Beam ML related talks compared to the year before.

50 of 52

Learnings from Beam’s ML Journey

  • Build a platform that can evolve as fast as your users and the ecosystem
    • Declarative style, capturing intent gives you the most flexibility to do that.
  • Build for your end users
    • Easier said than done.
    • You may not be your ideal user either. For example, Google scale does not represent the typical GCP user.
  • Enable your users to focus on what matters most to them.
    • Move other concerns from users to your side.
  • Or just use Beam ML!

51 of 52

What’s next for Beam ML?

  • Address the next level of ML problems with the current building blocks.

Turn Key ML Pipelines & Solutions

  • Real-time recommendations
    • Requires joining a user's live clickstream with historic clickstream data.
  • Anomaly detection
    • Join IoT telemetry data with historic information.
  • Financial time-series index building
    • Join current instrument ticks with historic metrics to build near-real-time indexes.

52 of 52

Resources