1 of 74

7th Apache Beam Meetup London

#ApacheBeamLondon

2 of 74

Hi everyone!

About me:

  • Active Beam user and community organiser (London, Stockholm, Paris, and Berlin soon). Get involved!
  • Software Engineer at Arabesque S-Ray (hiring!)

Apache Beam Committer

Google Developer Expert

Apache Beam meetup London - #ApacheBeamLondon

3 of 74

Update from the community

  • Tweet with #ApacheBeamLondon

  • Stay in the loop:
    • Sign up to user@beam.apache.org and dev@beam.apache.org for the latest updates
    • Join the conversation on Slack: the-asf.slack.com

  • YouTube channel: meetup talks, conference talks, …


4 of 74

Update from the community

  • Stockholm meetup restarts on 06/05 with talks from Spotify, EQT and Google
  • Beam Summit Europe 2019: https://beamsummit.org / @BeamSummit


5 of 74


6 of 74

Thanks to:


7 of 74

Meet the people

Talk 1: Dynamic pricing using Beam
Thomas Weise (@thweise) - Software Engineer [Lyft]

Talk 2: Building a data lake using Beam
Joe Cullen - Data Engineer [Datatonic]

Talk 3: Schema support
Reuven Lax (@reuvenlax) - Senior Staff Software Engineer [Google]


8 of 74

Dynamic Pricing using Apache Beam

Thomas Weise (@thweise)

9 of 74

Agenda

  • Introduction to dynamic pricing
  • Legacy pricing infrastructure
  • Streaming based infrastructure
  • Beam & multiple languages
  • Beam Flink runner
  • Lessons learned


10 of 74

Dynamic Pricing

Supply/Demand curve

ETA

Pricing

Notifications

Detect Delays

Coupons

User Delight

Fraud

Behaviour Fingerprinting

Monetary Impact

Imperative to act fast

Top Destinations

Core Experience


11 of 74

Introduction to Dynamic Pricing


12 of 74

The Marketplace affects Prices

  • Dynamic Pricing: the price is re-evaluated every minute for each location bucket
  • An Imbalanced Market is Inefficient
    • Too many available drivers: bad
    • Too few available drivers: bad
    • Solution: Price lever controls passenger request rate, which maintains healthy supply levels
  • Result: increase price if demand >> supply
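A minimal sketch of the price lever described above. The function name, the linear response, and the cap of 3.0 are assumptions for illustration only; the slides do not describe Lyft's actual formula.

```python
def price_multiplier(demand: float, supply: float, cap: float = 3.0) -> float:
    """Hypothetical lever: raise the multiplier as demand outgrows supply."""
    if supply <= 0:
        return cap  # no available drivers: apply the maximum multiplier
    ratio = demand / supply
    # At or below balance the base price applies; above it, follow the ratio up to the cap.
    return min(cap, max(1.0, ratio))


balanced = price_multiplier(demand=10, supply=10)
undersupplied = price_multiplier(demand=30, supply=10)
```

Raising the multiplier lowers the passenger request rate, which is what lets supply recover in an undersupplied bucket.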


13 of 74

What is PrimeTime?

  • Belief: There exists some set of optimal price multipliers per location/time bucket
  • PrimeTime: Lyft product that sets a price multiplier for each gh6 (6-character geohash) each minute
  • Example: In ‘9q8yyv’, at 5:01pm PST, PrimeTime = 2.0
  • Scale: Roughly 3 million geohashes priced every minute


14 of 74

Why is PrimeTime Hard?

  • 1. Need low-latency information about supply and demand
  • 2. Pricing is an unsupervised problem: the correct answer is never observed
  • Solution: break the problem into multiple models that form a DAG, where intermediate models are solving supervised problems
    • Example: f (available_supply) -> pickup_times
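A minimal sketch of evaluating a DAG of models in dependency order. The three models, their formulas, and all names are hypothetical stand-ins; only the idea of chaining supervised intermediate models (such as f(available_supply) -> pickup_times) comes from the slide.

```python
from graphlib import TopologicalSorter


def pickup_time_model(inputs):
    # f(available_supply) -> pickup_times: fewer drivers means longer pickups.
    return 12.0 / max(inputs["available_supply"], 1)


def conversion_model(inputs):
    # Riders convert less often when the pickup takes longer.
    return max(0.0, 1.0 - 0.05 * inputs["pickup_time"])


def multiplier_model(inputs):
    # Final node: turn the intermediate predictions into a price multiplier.
    return 1.0 + inputs["conversion"]


# name -> (model function, names of the inputs it depends on)
MODELS = {
    "pickup_time": (pickup_time_model, ["available_supply"]),
    "conversion": (conversion_model, ["pickup_time"]),
    "multiplier": (multiplier_model, ["conversion"]),
}


def run_dag(features):
    values = dict(features)
    graph = {name: set(deps) for name, (_, deps) in MODELS.items()}
    for name in TopologicalSorter(graph).static_order():
        if name in MODELS:  # raw features are already present in `values`
            values[name] = MODELS[name][0](values)
    return values


result = run_dag({"available_supply": 4})
```

Each intermediate node predicts something that is later observed (for example, actual pickup times), so it can be trained and evaluated as a supervised model even though the final price cannot.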


15 of 74

Legacy Pricing Infrastructure


16 of 74

Legacy architecture: A series of cron jobs

  • Ingest high volume of client app events (Kinesis, KCL)
  • Compute features (e.g. demand, conversion rate, supply) from events
  • Run ML models on features to compute primetime for all regions (per min, per gh6)

SFO, calendar_min_1: {gh6: 1.0, gh6: 2.0, ...}

NYC, calendar_min_1: {gh6: 2.0, gh6: 1.0, ...}

17 of 74

Problems

  1. Latency
  2. Code complexity (LOC)
  3. Hard to add new features involving windowing/join (i.e. arbitrary demand windows, subregional computation)
  4. No dynamic / smart triggers

18 of 74

Can we use Flink?


19 of 74

Streaming Stack

  • Data path: Source -> Streaming Application (SQL, Java) -> Sink
  • Platform services: Stream / Schema Registry, Deployment Tooling, Metrics & Dashboards, Alerts, Logging
  • Infrastructure: Amazon EC2, Amazon S3, Wavefront, Salt (Config / Orca), Docker

20 of 74

Streaming and Python

  • Flink and many other big data ecosystem projects are Java / JVM based
    • Team wants to adopt streaming, but doesn’t have the Java skills
    • Jython != Python
  • Use cases for different language environments
    • Python primary option for Machine Learning
  • Cost of many API styles and runtime environments


21 of 74

Python via Beam

  • Data path: Source -> Streaming Application (Python/Beam) -> Sink

22 of 74

Streaming based Pricing Infrastructure


23 of 74

Pipeline (conceptual outline)

  1. Lyft apps (phones) emit events: ride_requested, app_open, ...
  2. kinesis events (source)
  3. filter events (using internal services): valid sessions, dedupe, ...
  4. aggregate and window: unique_users_per_min, unique_requests_per_5_min, ...
  5. run models to generate features (culminating in PT): conversion learner, eta learner, ...
  6. redis

24 of 74

Details of implementation

  1. Filtering (with internal service calls)
  2. Aggregation with Beam windowing: 1min, 5min (by event time)
  3. Triggers: watermark, data-driven (stateful processing)
  4. Join multiple streams: CoGroup or stateful processing
  5. Machine learning models invoked within Beam transforms
  6. Final gh6:pt output from pipeline stored to Redis
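A minimal sketch of step 2, fixed windows assigned by event time, written in plain Python rather than Beam. The event tuple layout and the function name are assumptions for illustration; watermarks, triggers and late data are deliberately left out.

```python
from collections import defaultdict


def unique_users_per_window(events, window_secs=60):
    """Count distinct users per (gh6, fixed event-time window).

    `events` is an iterable of (event_time_secs, gh6, user_id) tuples.
    """
    users = defaultdict(set)
    for event_time, gh6, user_id in events:
        # The window is chosen from the event's own timestamp, not arrival time.
        window_start = event_time - (event_time % window_secs)
        users[(gh6, window_start)].add(user_id)
    return {key: len(ids) for key, ids in users.items()}


events = [
    (0, "9q8yyv", "u1"),
    (30, "9q8yyv", "u2"),
    (45, "9q8yyv", "u1"),  # same user again in the same window
    (61, "9q8yyv", "u1"),  # falls into the next 1-minute window
]
per_minute = unique_users_per_window(events, window_secs=60)
per_five_minutes = unique_users_per_window(events, window_secs=300)
```

In the real pipeline the runner also has to decide when a window is complete, which is what the watermark and data-driven triggers in step 3 are for.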


25 of 74

Gains

  • Latency: 3 minutes -> 30s
    • Latency now dominated by model execution
  • Reuse of model code
  • 10K => 4K LOC
  • 300 => 120 AWS instances


26 of 74

Beam and multiple languages


27 of 74

The Beam Vision

  1. End users: who want to write pipelines in a language that’s familiar.
  2. SDK writers: who want to make Beam concepts available in new languages. Includes IOs: connectors to data stores.
  3. Runner writers: who have a distributed processing environment and want to support Beam pipelines

[Diagram: layers of the Beam stack]

  • Pipeline construction: Beam Java, Beam Python, Other Languages -> Beam Model: Pipeline Construction
  • Execution: Beam Model: Fn Runners -> Apache Flink, Apache Spark, Cloud Dataflow

28 of 74

Multi-Language Support

  • Initially Java SDK and Java Runners
  • 2016: Start of cross-language support effort
  • 2017: Python SDK on Dataflow
  • 2018: Go SDK (for portable runners)
  • 2018: Python on Flink MVP
  • Next: Cross-language pipelines, more portable runners

29 of 74

Python Example

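import apache_beam as beam
from apache_beam import CombinePerKey, Map, WindowInto
from apache_beam.io import ReadFromText, WriteToText
from apache_beam.transforms.trigger import AccumulationMode, AfterCount, AfterProcessingTime, AfterWatermark
from apache_beam.transforms.window import FixedWindows

# runner and pipeline_options are assumed to be defined by the caller.
p = beam.Pipeline(runner=runner, options=pipeline_options)

(p
 | ReadFromText("/path/to/text*")
 | Map(lambda line: ...)
 | WindowInto(FixedWindows(120),                                # Where: 2-minute fixed windows
              trigger=AfterWatermark(                           # When: at the watermark, plus
                  early=AfterProcessingTime(60),                #   early firings and
                  late=AfterCount(1)),                          #   one firing per late element
              accumulation_mode=AccumulationMode.ACCUMULATING)  # How: panes accumulate
 | CombinePerKey(sum)                                           # What: sum per key
 | WriteToText("/path/to/outputs")
)

result = p.run()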

( What, Where, When, How )


30 of 74

Portability (current)

Sum Per Key in each SDK:

  • Python: input | Sum.PerKey()
  • Go: stats.Sum(s, input)
  • SQL (via Java): SELECT key, SUM(value) FROM input GROUP BY key
  • Java: input.apply(Sum.integersPerKey())

Pipeline representation handed to runners: Sum Per Key as Java objects, or Sum Per Key as portable protos

Runners: Apache Spark, Apache Flink, Apache Apex, Gearpump, Cloud Dataflow, Apache Samza, Apache Nemo (incubating), IBM Streams

31 of 74

Beam Flink Runner


32 of 74

Portable Flink Runner

SDK

(Python)

Job Service

Artifact Staging

Job Manager

Fn Services

(Beam Flink Task)

Task Manager

Executor / Fn API

Provision

Control

Data

Artifact Retrieval

State

Logging

gRPC

Pipeline (protobuf)

Cluster

Runner

Dependencies

(optional)

python -m apache_beam.examples.wordcount \
  --input=/etc/profile \
  --output=/tmp/py-wordcount-direct \
  --runner=PortableRunner \
  --job_endpoint=localhost:8099 \
  --streaming

Staging Location

(DFS, S3, …)

SDK Worker (UDFs)

SDK Worker (UDFs)

SDK Worker (Python)

Flink Job


33 of 74

Lyft Flink Runner Customizations

  • Translator extension for streaming sources
    • Kinesis, Kafka consumers that we also use in Java Flink jobs
    • Message decoding, watermarks
  • Python execution environment for SDK workers
    • Tailored to internal deployment tooling
    • Docker-free, frozen virtual envs
  • https://github.com/lyft/beam/tree/release-2.11.0-lyft


34 of 74

Workers have Fn

Runner

Runner worker launches services.

Control Service

Data Service

State Service

Logging Service

35 of 74

How slow is this?

  • Fn API overhead: ~15%?
  • Fused stages
  • Bundle size
  • Parallel SDK workers
  • TODO: Cython
  • protobuf C++ bindings

Fn API

decode, …, window

count

(messages
 | 'reshuffle' >> beam.Reshuffle()
 | 'decode' >> beam.Map(lambda x: (__import__('random').randint(0, 511), 1))
 | 'noop1' >> beam.Map(lambda x: x)
 | 'noop2' >> beam.Map(lambda x: x)
 | 'noop3' >> beam.Map(lambda x: x)
 | 'window' >> beam.WindowInto(window.GlobalWindows(),
                               trigger=Repeatedly(AfterProcessingTime(5 * 1000)),
                               accumulation_mode=AccumulationMode.DISCARDING)
 | 'group' >> beam.GroupByKey()
 | 'count' >> beam.Map(count)
)

36 of 74

Fast enough for real Python work!

  • c5.4xlarge machines (16 vCPU, 32 GB)
  • 16 SDK workers / machine
  • 1000 ms or 1000 records / bundle
  • 280,000 transforms / second / machine (~ 17,500 per worker)
  • Python user code will be gating factor
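The per-worker figure follows directly from the numbers on the slide. A short worked calculation (the per-transform time budget is derived here and is not stated on the slide):

```python
transforms_per_machine = 280_000  # transforms / second / machine (from the slide)
workers_per_machine = 16          # SDK workers / machine (from the slide)

# Throughput each SDK worker has to sustain.
per_worker = transforms_per_machine / workers_per_machine

# Average time budget per transform on one worker, in microseconds.
budget_us = 1_000_000 / per_worker
```

At roughly 57 microseconds per transform, any non-trivial Python user function will cost more than the framework overhead, which is why user code becomes the gating factor.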


37 of 74

Beam Portability Recap

  • Pipelines written in non-JVM languages on JVM runners
    • Python, Go on Flink (and others)
  • Full isolation of user code
    • Native CPython execution w/o library restrictions
  • Flexible SDK worker execution
    • Docker, Process, Embedded, ...
  • Multiple languages in a single pipeline (WIP)
    • Use Java Beam IO with Python
    • Use TFX with Java
    • <your use case here>


38 of 74

Feature Support Matrix (Beam 2.12.0)


39 of 74

Lessons Learned


40 of 74

Lessons Learned

  • Python Beam SDK and portable Flink runner evolving
  • Keep pipeline simple - Flink tasks / shuffles are not free
  • Stateful processing is essential for complex logic
  • Model execution latency matters
  • Instrument everything for monitoring
  • Think about pipeline restart and upgrade
  • Mind your dependencies - rate limit API calls
  • Long running Python processes may expose memory leaks


41 of 74

We’re Hiring! Apply at www.lyft.com/careers

or email data-recruiting@lyft.com

Data Engineering

Engineering Manager

San Francisco

Software Engineer

San Francisco, Seattle, & New York City

Data Infrastructure

Engineering Manager

San Francisco

Software Engineer

San Francisco & Seattle

Experimentation

Software Engineer

San Francisco

Streaming

Software Engineer

San Francisco

Observability

Software Engineer

San Francisco

42 of 74

Please ask questions!

43 of 74

Building a data lake using Apache Beam

Joe Cullen

44 of 74

Schema support

Reuven Lax (@reuvenlax)

45 of 74

Schemas in Beam

Or, how I learned to stop worrying and write simple, easy-to-read, Beam pipelines.

46 of 74

In Beam, data points are processed as elements

47 of 74

Converting elements into objects...

  • Raw bytes: byte[*&&J%$#]
    DoFn<byte[],...>
    You can access the element as bytes...

  • OR custom serialization:
    DoFn<Transaction,...>.withCoder( Coder thing....)
    You can use a custom coder to encode / decode... not a lot of fun.

  • OR Proto object:
    DoFn<Transaction,...>.withCoder(Easy peasy Proto)
    Beam can also infer the object using Proto, Avro, BigQuery...

Low-level Java bytecode is generated so that Schemas are nearly as efficient as hand-written Coders.

48 of 74

So what is a schema anyway?

Purchases

userId, itemId, transactionId: String

location: Location

Transactions

bank: String

transactionId : Long

purchaseAmount : Long

Location

latitude: Double

longitude: Double

A schema describes a type in terms of fields and values.

Each type has a number of fields (or columns in DB terminology)

Each field is named, and has a type.

Schemas can be nested arbitrarily, and can contain repeated or map fields as well!
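A minimal sketch of the same idea outside Beam: a schema as a set of named, typed fields that can nest. It uses Python dataclasses as a stand-in; the describe helper is hypothetical and is not part of any Beam API.

```python
from dataclasses import dataclass, fields, is_dataclass


@dataclass(frozen=True)
class Location:
    latitude: float
    longitude: float


@dataclass(frozen=True)
class Purchase:
    userId: str
    itemId: str
    transactionId: str
    location: Location  # nested schema


def describe(cls, prefix=""):
    """Flatten a (possibly nested) dataclass into (field path, type name) pairs."""
    out = []
    for f in fields(cls):
        if is_dataclass(f.type):
            out.extend(describe(f.type, prefix + f.name + "."))
        else:
            out.append((prefix + f.name, f.type.__name__))
    return out


purchase_schema = describe(Purchase)
```

The dotted paths produced here (for example location.latitude) are the same style of name the schema transforms later in the deck use to address nested fields.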

49 of 74

An example use case

Using the purchases and transaction feeds we would like to find the total spend by users in the lower Manhattan area. We also want some usage data on the different banks used in our transactions. One approach would be to:

  1. Read both streams
  2. Filter purchases to those in the lower Manhattan area using the location property.
  3. Calculate total spend per user (requires joining by transactionId)
  4. Complete some aggregations
    1. Top ten purchases
    2. The distribution of purchase amounts
    3. The total amount purchased using each bank

50 of 74

1. Read the streams

Two streams, encoded with Avro

PCollection<Purchase> purchases = p.apply(
    PubsubIO.readAvros(Purchase.class).fromTopic(purchaseTopic));

PCollection<Transaction> transactions = p.apply(
    PubsubIO.readAvros(Transaction.class).fromTopic(transactionTopic));

51 of 74

2. Filter Purchases

Let's keep only the purchases that happened in lower Manhattan.

Using the classic Beam Java API

PCollection<Purchase> filtered = purchases.apply(Filter.by(purchase -> {
    return purchase.location.latitude < 40.720 && purchase.location.latitude > 40.699
        && purchase.location.longitude < -73.969 && purchase.location.longitude > -74.747;
}));

52 of 74

3. Join on the transactionId field

final TupleTag<Purchase> lhsTag = new TupleTag<>();
final TupleTag<Transaction> rhsTag = new TupleTag<>();

// Key both collections by transactionId so they can be co-grouped.
PCollection<KV<Long, Purchase>> keyedPurchases = purchases.apply(
    WithKeys.of(p -> p.transactionId)
        .withKeyType(TypeDescriptors.longs()));

PCollection<KV<Long, Transaction>> keyedTransactions = transactions.apply(
    WithKeys.of(t -> t.transactionId)
        .withKeyType(TypeDescriptors.longs()));

PCollection<KV<String, Long>> userSpend =
    KeyedPCollectionTuple.of(lhsTag, keyedPurchases)
        .and(rhsTag, keyedTransactions)
        .apply(CoGroupByKey.<Long>create())
        .apply(Values.create())
        .apply(ParDo.of(new DoFn<CoGbkResult, KV<String, Long>>() {
          @ProcessElement
          public void process(@Element CoGbkResult result, OutputReceiver<KV<String, Long>> o) {
            Purchase purchase = result.getOnly(lhsTag);
            for (Transaction transaction : result.getAll(rhsTag)) {
              o.output(KV.of(purchase.userId, transaction.purchaseAmount));
            }
          }}))
        .apply(Sum.longsPerKey());

Oh dear, all that for a join!

53 of 74

4. Calculate Data About Bank Usage

PCollection<KV<String, Long>> perBankPurchases = transactions.apply(
    MapElements
        .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.longs()))
        .via(t -> KV.of(t.bank, t.purchaseAmount)));

PCollection<KV<String, Long>> totalPurchases = perBankPurchases.apply(Sum.longsPerKey());

PCollection<KV<String, List<Long>>> topPurchases = perBankPurchases.apply(Top.largestPerKey(10));

PCollection<KV<String, List<Long>>> purchaseDistribution =
    perBankPurchases.apply(ApproximateQuantiles.<String, Long>perKey(21));

// Or compute all three aggregations in one composed combine.
TupleTag<Long> totalPurchaseTag = new TupleTag<>();
TupleTag<List<Long>> topPurchaseTag = new TupleTag<>();
TupleTag<List<Long>> purchaseDistributionTag = new TupleTag<>();

PCollection<KV<String, Transaction>> perBankTransactions =
    transactions.apply(WithKeys.of(t -> t.bank));

PCollection<KV<String, BankPurchaseStats>> bankStats = perBankTransactions.apply(
    Combine.perKey(
        CombineFns.compose()
            .with(t -> t.purchaseAmount, Sum.ofLongs(), totalPurchaseTag)
            .with(t -> t.purchaseAmount, Top.largestLongsFn(10), topPurchaseTag)
            .with(t -> t.purchaseAmount, ApproximateQuantilesCombineFn.create(21),
                purchaseDistributionTag)))
    .apply(MapElements.into(BankPurchaseStats.class)
        .via((CoCombineResult result) -> {
          return new BankPurchaseStats(
              result.get(totalPurchaseTag),
              result.get(topPurchaseTag),
              result.get(purchaseDistributionTag));}));

Mmmmmmm......

Mmmmmmm......

54 of 74

Schemas to the rescue!

55 of 74

Infer Schemas

Beam natively understands Avro. Data is transparently transformed into Beam Schemas.

PCollection<Purchase> purchases = p.apply(
    PubsubIO.readAvros(Purchase.class).fromTopic(purchaseTopic));

PCollection<Transaction> transactions = p.apply(
    PubsubIO.readAvros(Transaction.class).fromTopic(transactionTopic));

56 of 74

Filters

Filter: a PTransform for filtering a collection of schema types.

Separate Predicates can be registered for different schema fields, and the result is allowed to pass if all predicates return true. The output type is the same as the input type.

Filter.whereFieldName(NameOfField, predicate)

Filter.whereFieldNames(List<NameOfField>, predicate)
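A minimal sketch of the semantics described above, in plain Python over nested dicts: one predicate per field name, and a row passes only if every predicate returns true. The helper names get_field and where_fields are hypothetical and are not Beam APIs.

```python
def get_field(row, path):
    """Resolve a dotted field path such as 'location.latitude' in nested dicts."""
    value = row
    for part in path.split("."):
        value = value[part]
    return value


def where_fields(rows, predicates):
    """Keep rows for which every per-field predicate returns True."""
    return [
        row for row in rows
        if all(pred(get_field(row, path)) for path, pred in predicates.items())
    ]


purchases = [
    {"userId": "u1", "location": {"latitude": 40.71, "longitude": -74.00}},
    {"userId": "u2", "location": {"latitude": 40.80, "longitude": -73.95}},
]
in_area = where_fields(purchases, {
    "location.latitude": lambda lat: 40.699 < lat < 40.720,
    "location.longitude": lambda lon: -74.747 < lon < -73.969,
})
```

The output rows are the input rows unchanged, matching the note that the output type is the same as the input type.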

57 of 74

Joins

With schemas, joins can now be defined by field name.

Join.innerJoin(PCollection)
    .using(<Field Name>)

Join.innerJoin(transactions)
    .using("transactionId")
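A minimal sketch of what an inner join on a shared field name computes, in plain Python. The function name and the lhs / rhs output layout are assumptions for illustration, loosely mirroring the lhs and rhs prefixes used in the later slides.

```python
from collections import defaultdict


def inner_join(lhs_rows, rhs_rows, field):
    """Pair every lhs row with every rhs row that has the same value for `field`."""
    index = defaultdict(list)
    for rhs in rhs_rows:
        index[rhs[field]].append(rhs)
    return [
        {"lhs": lhs, "rhs": rhs}
        for lhs in lhs_rows
        for rhs in index.get(lhs[field], [])
    ]


purchases = [
    {"userId": "u1", "transactionId": 1},
    {"userId": "u2", "transactionId": 2},
]
transactions = [
    {"transactionId": 1, "bank": "A", "purchaseAmount": 10},
    {"transactionId": 1, "bank": "A", "purchaseAmount": 5},
    {"transactionId": 3, "bank": "B", "purchaseAmount": 7},
]
joined = inner_join(purchases, transactions, "transactionId")
```

Rows without a partner on the other side (transactionId 2 and 3 here) are dropped, which is what makes the join an inner join.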

58 of 74

Filters ... old vs new

Filters: Instead of

purchases.apply(Filter.by(purchase -> {
    return purchase.location.latitude < 40.720 && purchase.location.latitude > 40.699
        && purchase.location.longitude < -73.969 && purchase.location.longitude > -74.747;
}));

use

purchases.apply(Filter.whereField("location.latitude", lat -> lat < 40.720 && lat > 40.699)
    .whereField("location.longitude", lon -> lon < -73.969 && lon > -74.747));

59 of 74

Aggregations

Aggregations are now much simpler with easy access to the fields by name

aggregateField(<field name>, <combine function>, <output field name>)

transactions.apply(Group
    .byField("bank")
    .aggregateField("purchaseAmount", Sum.ofLongs(), "totalPurchase")
    .aggregateField("purchaseAmount", Top.largestLongsFn(10), "top_purchases"));
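A minimal sketch of group-by-field with named aggregations, in plain Python. The function name and the (input field, combine function, output field) tuple layout are assumptions chosen to mirror the aggregateField arguments on the slide.

```python
import heapq
from collections import defaultdict


def group_and_aggregate(rows, key_field, aggregations):
    """aggregations: list of (input_field, combine_fn, output_field) tuples."""
    groups = defaultdict(list)
    for row in rows:
        groups[row[key_field]].append(row)
    result = {}
    for key, members in groups.items():
        # Each aggregation reads one input field and writes one output field.
        result[key] = {
            out: fn([m[src] for m in members]) for src, fn, out in aggregations
        }
    return result


transactions = [
    {"bank": "A", "purchaseAmount": 10},
    {"bank": "A", "purchaseAmount": 30},
    {"bank": "B", "purchaseAmount": 5},
]
stats = group_and_aggregate(transactions, "bank", [
    ("purchaseAmount", sum, "totalPurchase"),
    ("purchaseAmount", lambda xs: heapq.nlargest(2, xs), "top_purchases"),
])
```

Several aggregations over the same input field produce several output fields on one result row per key, with no tuple tags needed.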

60 of 74

Joins: Classic BeamJava

Old

final TupleTag<Purchase> lhsTag = new TupleTag<>();
final TupleTag<Transaction> rhsTag = new TupleTag<>();

PCollection<KV<Long, Purchase>> keyedPurchases = purchases.apply(
    WithKeys.of(p -> p.transactionId)
        .withKeyType(TypeDescriptors.longs()));

PCollection<KV<Long, Transaction>> keyedTransactions = transactions.apply(
    WithKeys.of(t -> t.transactionId)
        .withKeyType(TypeDescriptors.longs()));

PCollection<KV<String, Long>> userSpend =
    KeyedPCollectionTuple.of(lhsTag, keyedPurchases)
        .and(rhsTag, keyedTransactions)
        .apply(CoGroupByKey.<Long>create())
        .apply(Values.create())
        .apply(ParDo.of(new DoFn<CoGbkResult, KV<String, Long>>() {
          @ProcessElement
          public void process(@Element CoGbkResult result, OutputReceiver<KV<String, Long>> o) {
            Purchase purchase = result.getOnly(lhsTag);
            for (Transaction transaction : result.getAll(rhsTag)) {
              o.output(KV.of(purchase.userId, transaction.purchaseAmount));
            }
          }}))
        .apply(Sum.longsPerKey());

New

PCollection<UserPurchases> userSums = purchases
    .apply(Join.innerJoin(transactions).using("transactionId"))
    .apply(Select.fieldNames("lhs.userId", "rhs.purchaseAmount"))
    .apply(Group.byField("userId").aggregateField("purchaseAmount", Sum.ofLongs(), "totalPurchase"));

61 of 74

Aggregations: Classic Beam

Old

PCollection<KV<String, Long>> perBankPurchases = transactions.apply(
    MapElements
        .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.longs()))
        .via(t -> KV.of(t.bank, t.purchaseAmount)));

PCollection<KV<String, Long>> totalPurchases = perBankPurchases.apply(Sum.longsPerKey());

PCollection<KV<String, List<Long>>> topPurchases = perBankPurchases.apply(Top.largestPerKey(10));

PCollection<KV<String, List<Long>>> purchaseDistribution =
    perBankPurchases.apply(ApproximateQuantiles.<String, Long>perKey(21));

TupleTag<Long> totalPurchaseTag = new TupleTag<>();
TupleTag<List<Long>> topPurchaseTag = new TupleTag<>();
TupleTag<List<Long>> purchaseDistributionTag = new TupleTag<>();

PCollection<KV<String, Transaction>> perBankTransactions =
    transactions.apply(WithKeys.of(t -> t.bank));

PCollection<KV<String, BankPurchaseStats>> bankStats = perBankTransactions.apply(
    Combine.perKey(
        CombineFns.compose()
            .with(t -> t.purchaseAmount, Sum.ofLongs(), totalPurchaseTag)
            .with(t -> t.purchaseAmount, Top.largestLongsFn(10), topPurchaseTag)
            .with(t -> t.purchaseAmount, ApproximateQuantilesCombineFn.create(21),
                purchaseDistributionTag)))
    .apply(MapElements.into(BankPurchaseStats.class)
        .via((CoCombineResult result) -> {
          return new BankPurchaseStats(
              result.get(totalPurchaseTag),
              result.get(topPurchaseTag),
              result.get(purchaseDistributionTag));}));

New

PCollection<KV<String, BankPurchaseStats>> bankStats =
    transactions.apply(Group.byField("bank")
        .aggregateField("purchaseAmount", Sum.ofLongs(), "totalPurchase")
        .aggregateField("purchaseAmount", Top.largestLongsFn(10), "top_purchases")
        .aggregateField("purchaseAmount", ApproximateQuantilesCombineFn.create(21), "purchaseDistribution"));

62 of 74

Even more fun with schemas!

63 of 74

Schemas: ParDo

Schemas are also understood by Beam primitives such as ParDo!

If the input has a schema, individual fields can be selected in the process method.

purchases.apply(ParDo.of(new DoFn<Purchase, EnrichedPurchase>() {
  @ProcessElement
  public void process(@Field("userId") String userId,
                      @Field("location.latitude") double lat,
                      @Field("location.longitude") double lon,
                      OutputReceiver<EnrichedPurchase> o) {
    ...
  }
}));

64 of 74

Schemas - making source and sinks easier....

Before schemas, the BigQuery sink required you to:

  1. Define the schema for the table you are creating.

p.apply(PubsubIO.readAvros(Purchase.class).fromTopic(purchaseTopic))
    .apply(BigQueryIO.<Purchase>write()
        .to(purchasesTable)
        .withSchema(new TableSchema().setFields(
            ImmutableList.of(
                new TableFieldSchema().setName("userId").setType("STRING"),
                new TableFieldSchema().setName("itemId").setType("STRING"),
                new TableFieldSchema().setName("transactionId").setType("STRING"),
                new TableFieldSchema().setName("location").setType("RECORD")
                    .setFields(ImmutableList.of(
                        new TableFieldSchema().setName("latitude").setType("FLOAT64"),
                        new TableFieldSchema().setName("longitude").setType("FLOAT64"))))))

65 of 74

Schemas - making source and sinks easier....

2. Convert your data elements into TableRow objects

        .withFormatFunction(purchase -> {
          return new TableRow()
              .set("userId", purchase.userId)
              .set("itemId", purchase.itemId)
              .set("transactionId", purchase.transactionId)
              .set("location", new TableRow()
                  .set("latitude", purchase.location.latitude)
                  .set("longitude", purchase.location.longitude));
        }));

66 of 74

Schemas - making source and sinks easier....

All that code for a simple transport pipeline! The good news with schemas is that BigQueryIO can infer all of this information from the PCollection.

p.apply(PubsubIO.readAvros(Purchase.class)
        .fromTopic(purchaseTopic))
    .apply(BigQueryIO.<Purchase>write().to(purchasesTable).useBeamSchema());

67 of 74

Schemas - inferring

You can create PCollections out of your own classes too!

Beam knows how to infer schemas from a variety of Java classes.

Having a schema allows use of the schema transforms, BeamSQL, and also means you don’t need a Coder!

68 of 74

Schemas - inferring POJOs

@DefaultSchema(JavaFieldSchema.class)
public class PurchasePOJO {
  public final String userId;
  public final String itemId;
  public final String transactionId;
  public final Location location;

  // The constructor name must match the class name.
  @SchemaCreate
  public PurchasePOJO(String userId, String itemId, String transactionId, Location location) {
    ...
  }
  ...
}

69 of 74

Schemas - inferring JavaBeans

@DefaultSchema(JavaBeanSchema.class)
public class PurchaseBean {
  private final String userId;
  private final String itemId;
  private final String transactionId;
  private final Location location;

  // The constructor name must match the class name.
  @SchemaCreate
  public PurchaseBean(String userId, String itemId, String transactionId, Location location) {
    ...
  }

  public String getUserId() { return userId; }
  public String getItemId() { return itemId; }
  public String getTransactionId() { return transactionId; }
  public Location getLocation() { return location; }
  ...
}

70 of 74

Schemas - inferring AutoValue

@DefaultSchema(AutoValueSchema.class)

@AutoValue

public abstract class PurchaseValue {

public abstract String getUserId();

public abstract String getItemId();

public abstract String getTransactionId();

public abstract Location getLocation();

}

71 of 74

Schemas - Converting

PCollection<T> can always be converted to PCollection<S> as long as they both have compatible schemas.

PCollection<Row> rows = readPurchaseRows();

PCollection<PurchasePOJO> pojos = rows.apply(Convert.to(PurchasePOJO.class));

PCollection<PurchaseValue> autovalues = pojos.apply(Convert.to(PurchaseValue.class));
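A minimal sketch of schema-compatible conversion in plain Python: a generic row (a dict) can be turned into a typed object only when the field names line up. The class PurchaseRecord and the helper convert_to are hypothetical; the check compares field names only, not field types.

```python
from dataclasses import dataclass, fields


@dataclass
class PurchaseRecord:
    userId: str
    itemId: str


def convert_to(cls, row):
    """Build `cls` from a dict row, refusing rows whose field names differ."""
    names = {f.name for f in fields(cls)}
    if set(row) != names:
        raise ValueError(f"incompatible schemas: {sorted(row)} vs {sorted(names)}")
    return cls(**row)


record = convert_to(PurchaseRecord, {"userId": "u1", "itemId": "i9"})
```

Because the conversion is driven purely by the schema, the same row could be converted to any other class that declares the same fields.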

72 of 74

Schemas - Modifying

// Add new fields.
rows.apply(AddFields.<T>create()
    .field("user", FieldType.STRING)
    .field("userInformation.zipCode", FieldType.INT32));

// Remove fields.
rows.apply(DropFields.fields("user", "userInformation.zipCode"));

// Rename fields.
rows.apply(RenameFields.<T>create()
    .rename("userName", "userId")
    .rename("location.country", "location.countryCode"));
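A minimal sketch of the three modifications on nested dict rows in plain Python. The helper names are hypothetical; unlike the slide, rename_field here takes only the new leaf name rather than a full dotted path, and added fields default to None.

```python
import copy


def _parent(row, path, create=False):
    """Return (dict holding the leaf, leaf name) for a dotted field path."""
    *parents, leaf = path.split(".")
    node = row
    for part in parents:
        node = node.setdefault(part, {}) if create else node[part]
    return node, leaf


def add_field(row, path, default=None):
    out = copy.deepcopy(row)  # never mutate the input row
    node, leaf = _parent(out, path, create=True)
    node[leaf] = default
    return out


def drop_field(row, path):
    out = copy.deepcopy(row)
    node, leaf = _parent(out, path)
    del node[leaf]
    return out


def rename_field(row, path, new_name):
    out = copy.deepcopy(row)
    node, leaf = _parent(out, path)
    node[new_name] = node.pop(leaf)
    return out


row = {"userName": "u1", "location": {"country": "GB"}}
added = add_field(row, "userInformation.zipCode")
dropped = drop_field(added, "userInformation.zipCode")
renamed = rename_field(rename_field(row, "userName", "userId"),
                       "location.country", "countryCode")
```

Each helper returns a new row and leaves its input untouched, in the same spirit as transforms that produce a new collection rather than editing one in place.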

73 of 74

Schemas - Future

  • Integrated with portability and supported in Python and Go.
  • Conversions from more types (e.g. Protocol Buffers)
  • Integration with more sources and sinks (e.g. JdbcIO)
  • Schema-based optimizations (e.g. discover and filter unused fields)

Also considering a fluent Dataframe-style API on top of schemas.

74 of 74
