1 of 26

Making Schemas Portable

2 of 26

Brian Hulette

Software Engineer at Google

Apache Arrow Committer

Apache Beam Contributor

@BrianHulette

github.com/TheNeuralBit

theneuralbit.com

3 of 26

Agenda

  • Intro to Beam (Java) Schemas
  • Portable Schemas
  • Demo - SqlTransform in Python

4 of 26

Intro to Beam (Java) Schemas

(Slides borrowed from Reuven Lax’s "Schemas in Beam" presentation)

5 of 26

In Beam, data points are processed as elements.

6 of 26

Converting elements into objects...

  • Raw bytes: DoFn<byte[], ...>. You can always access an element as bytes.
  • Custom serialization: DoFn<Transaction, ...> with .withCoder(someCoder). You can use a custom coder to encode / decode your object, but it's not a lot of fun.
  • Proto object: DoFn<Transaction, ...> with a Proto coder. Easy peasy! Beam can also infer the object using Proto, Avro, BigQuery, ...

Low-level Java bytecode is generated so that Schemas are nearly as efficient as hand-written Coders.
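For a sense of what the custom-serialization option involves, here is a sketch of a hand-written coder (not from the original slides), assuming a simple Transaction class with public fields bank (String), transactionId (long), purchaseAmount (long) and a no-arg constructor. This is exactly the boilerplate that schemas eliminate.

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;

public class TransactionCoder extends CustomCoder<Transaction> {
  @Override
  public void encode(Transaction value, OutputStream out) throws IOException {
    // Write each field by hand, in a fixed order.
    StringUtf8Coder.of().encode(value.bank, out);
    DataOutputStream data = new DataOutputStream(out);
    data.writeLong(value.transactionId);
    data.writeLong(value.purchaseAmount);
  }

  @Override
  public Transaction decode(InputStream in) throws IOException {
    // Read the fields back in exactly the same order.
    Transaction t = new Transaction();
    t.bank = StringUtf8Coder.of().decode(in);
    DataInputStream data = new DataInputStream(in);
    t.transactionId = data.readLong();
    t.purchaseAmount = data.readLong();
    return t;
  }
}

You would then attach it with something like transactions.setCoder(new TransactionCoder()), and keep it in sync with the class by hand forever after.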

7 of 26

So what is a schema anyway?

Purchases

userId, itemId, transactionId: String

location: Location

Transactions

bank: String

transactionId: Long

purchaseAmount: Long

Location

latitude: Double

longitude: Double

A schema describes a type in terms of fields and values.

Each type has a number of fields (or columns in DB terminology)

Each field is named, and has a type.

Schemas can be nested arbitrarily, and can contain repeated or map fields as well!
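For reference, this is roughly how the Purchase and Location schemas above could be declared explicitly with the Java SDK's Schema builder. A sketch for illustration only (the PurchaseSchemas holder class is invented for this example); in practice the schema is usually inferred, as shown later.

import org.apache.beam.sdk.schemas.Schema;

public class PurchaseSchemas {
  // The nested Location type is itself a schema; repeated and map fields
  // would be declared with FieldType.array(...) and FieldType.map(...).
  static final Schema LOCATION =
      Schema.builder()
          .addDoubleField("latitude")
          .addDoubleField("longitude")
          .build();

  // Purchase: three string fields plus a nested row field.
  static final Schema PURCHASE =
      Schema.builder()
          .addStringField("userId")
          .addStringField("itemId")
          .addStringField("transactionId")
          .addRowField("location", LOCATION)
          .build();
}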

8 of 26

An example use case

Using the purchase and transaction feeds, we would like to find the total spend by users in the lower Manhattan area. We also want some usage data on the different banks used in our transactions. One approach would be to:

  1. Read both streams
  2. Filter purchases to those in the lower Manhattan area using the location property.
  3. Calculate total spend per user (requires joining by transactionId)
  4. Complete some aggregations
    1. Top ten purchases
    2. The distribution of purchase amounts
    3. The total amount purchased per bank

9 of 26

1. Read the streams

Two streams, encoded with Avro

PCollection<Purchase> purchases = p.apply(
    PubsubIO.readAvros(Purchase.class).fromTopic(purchaseTopic));

PCollection<Transaction> transactions = p.apply(
    PubsubIO.readAvros(Transaction.class).fromTopic(transactionTopic));

10 of 26

2. Filter Purchases

Let's filter down to only the purchases that happened in lower Manhattan.

Using the classic Beam Java API

PCollection<Purchase> filtered = purchases.apply(Filter.by(purchase -> {
    return purchase.location.lat < 40.720 && purchase.location.lat > 40.699
        && purchase.location.lon < -73.969 && purchase.location.lon > -74.747;
}));

11 of 26

3. Join on the transactionId field

final TupleTag<Purchase> lhsTag = new TupleTag<>();
final TupleTag<Transaction> rhsTag = new TupleTag<>();

PCollection<KV<Long, Purchase>> keyedPurchases =
    purchases.apply(WithKeys.of(p -> p.transactionId)
        .withKeyType(TypeDescriptors.longs()));

PCollection<KV<Long, Transaction>> keyedTransactions =
    transactions.apply(WithKeys.of(t -> t.transactionId)
        .withKeyType(TypeDescriptors.longs()));

PCollection<KV<String, Long>> userSpend =
    KeyedPCollectionTuple.of(lhsTag, keyedPurchases)
        .and(rhsTag, keyedTransactions)
        .apply(CoGroupByKey.<Long>create())
        .apply(Values.create())
        .apply(ParDo.of(new DoFn<CoGbkResult, KV<String, Long>>() {
          @ProcessElement
          public void process(@Element CoGbkResult result, OutputReceiver<KV<String, Long>> o) {
            Purchase purchase = result.getOnly(lhsTag);
            for (Transaction transaction : result.getAll(rhsTag)) {
              o.output(KV.of(purchase.userId, transaction.purchaseAmount));
            }
          }
        }))
        .apply(Sum.longsPerKey());

Oh dear, all that for a join!

12 of 26

4. Calculate Data About Bank Usage

PCollection<KV<String, Long>> perBankPurchases = transactions.apply(
    MapElements
        .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.longs()))
        .via(t -> KV.of(t.bank, t.purchaseAmount)));

PCollection<KV<String, Long>> totalPurchases = perBankPurchases.apply(Sum.longsPerKey());
PCollection<KV<String, List<Long>>> topPurchases = perBankPurchases.apply(Top.largestPerKey(10));
PCollection<KV<String, List<Long>>> purchaseDistribution =
    perBankPurchases.apply(ApproximateQuantiles.<String, Long>perKey(21));

TupleTag<Long> totalPurchaseTag = new TupleTag<>();
TupleTag<List<Long>> topPurchaseTag = new TupleTag<>();
TupleTag<List<Long>> purchaseDistributionTag = new TupleTag<>();

PCollection<KV<String, Transaction>> perBankTransactions =
    transactions.apply(WithKeys.of(t -> t.bank));

PCollection<KV<String, BankPurchaseStats>> bankStats = perBankTransactions.apply(
    Combine.perKey(
        CombineFns.compose()
            .with(t -> t.purchaseAmount, Sum.ofLongs(), totalPurchaseTag)
            .with(t -> t.purchaseAmount, Top.largestLongsFn(10), topPurchaseTag)
            .with(t -> t.purchaseAmount, ApproximateQuantilesCombineFn.create(21),
                purchaseDistributionTag)))
    .apply(MapElements.into(BankPurchaseStats.class)
        .via((CoCombineResult result) -> {
          return new BankPurchaseStats(
              result.get(totalPurchaseTag),
              result.get(topPurchaseTag),
              result.get(purchaseDistributionTag));
        }));

Mmmmmmm......

13 of 26

Infer Schemas

Beam natively understands Avro. Data is transparently transformed into Beam Schemas.

PCollection<Purchase> purchases = p.apply(
    PubsubIO.readAvros(Purchase.class).fromTopic(purchaseTopic));

PCollection<Transaction> transactions = p.apply(
    PubsubIO.readAvros(Transaction.class).fromTopic(transactionTopic));
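Avro is not the only source of inferred schemas: a plain Java class can opt in to the same machinery with an annotation. A minimal sketch (not from the original slides), assuming a POJO whose fields mirror the Purchase schema from earlier; the PurchasePojo name is just for this example.

import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;

// Beam infers a schema from the public fields and generates the
// corresponding coder and conversion logic.
@DefaultSchema(JavaFieldSchema.class)
public class PurchasePojo {
  public String userId;
  public String itemId;
  public String transactionId;
  public Location location;

  // Nested types become nested row fields in the inferred schema.
  @DefaultSchema(JavaFieldSchema.class)
  public static class Location {
    public double latitude;
    public double longitude;
  }
}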

14 of 26

Filters ... old vs new

Filters: Instead of

purchases.apply(Filter.by(purchase -> {
    return purchase.location.lat < 40.720 && purchase.location.lat > 40.699
        && purchase.location.lon < -73.969 && purchase.location.lon > -74.747;
}));

use

purchases.apply(Filter.whereField("location.lat", lat -> lat < 40.720 && lat > 40.699)
    .whereField("location.lon", lon -> lon < -73.969 && lon > -74.747));

15 of 26

Joins: Classic Beam Java

Old:

final TupleTag<Purchase> lhsTag = new TupleTag<>();
final TupleTag<Transaction> rhsTag = new TupleTag<>();

PCollection<KV<Long, Purchase>> keyedPurchases =
    purchases.apply(WithKeys.of(p -> p.transactionId)
        .withKeyType(TypeDescriptors.longs()));

PCollection<KV<Long, Transaction>> keyedTransactions =
    transactions.apply(WithKeys.of(t -> t.transactionId)
        .withKeyType(TypeDescriptors.longs()));

PCollection<KV<String, Long>> userSpend =
    KeyedPCollectionTuple.of(lhsTag, keyedPurchases)
        .and(rhsTag, keyedTransactions)
        .apply(CoGroupByKey.<Long>create())
        .apply(Values.create())
        .apply(ParDo.of(new DoFn<CoGbkResult, KV<String, Long>>() {
          @ProcessElement
          public void process(@Element CoGbkResult result, OutputReceiver<KV<String, Long>> o) {
            Purchase purchase = result.getOnly(lhsTag);
            for (Transaction transaction : result.getAll(rhsTag)) {
              o.output(KV.of(purchase.userId, transaction.purchaseAmount));
            }
          }
        }))
        .apply(Sum.longsPerKey());

New:

PCollection<UserPurchases> userSums =
    purchases.apply(Join.innerJoin(transactions).using("transactionId"))
        .apply(Select.fieldNames("lhs.userId", "rhs.totalPurchase"))
        .apply(Group.byField("userId").aggregateField(Sum.ofLongs(), "totalPurchase"));

16 of 26

Aggregations: Classic Beam

Old:

PCollection<KV<String, Long>> perBankPurchases = transactions.apply(
    MapElements
        .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.longs()))
        .via(t -> KV.of(t.bank, t.purchaseAmount)));

PCollection<KV<String, Long>> totalPurchases = perBankPurchases.apply(Sum.longsPerKey());
PCollection<KV<String, List<Long>>> topPurchases = perBankPurchases.apply(Top.largestPerKey(10));
PCollection<KV<String, List<Long>>> purchaseDistribution =
    perBankPurchases.apply(ApproximateQuantiles.<String, Long>perKey(21));

TupleTag<Long> totalPurchaseTag = new TupleTag<>();
TupleTag<List<Long>> topPurchaseTag = new TupleTag<>();
TupleTag<List<Long>> purchaseDistributionTag = new TupleTag<>();

PCollection<KV<String, Transaction>> perBankTransactions =
    transactions.apply(WithKeys.of(t -> t.bank));

PCollection<KV<String, BankPurchaseStats>> bankStats = perBankTransactions.apply(
    Combine.perKey(
        CombineFns.compose()
            .with(t -> t.purchaseAmount, Sum.ofLongs(), totalPurchaseTag)
            .with(t -> t.purchaseAmount, Top.largestLongsFn(10), topPurchaseTag)
            .with(t -> t.purchaseAmount, ApproximateQuantilesCombineFn.create(21),
                purchaseDistributionTag)))
    .apply(MapElements.into(BankPurchaseStats.class)
        .via((CoCombineResult result) -> {
          return new BankPurchaseStats(
              result.get(totalPurchaseTag),
              result.get(topPurchaseTag),
              result.get(purchaseDistributionTag));
        }));

New:

PCollection<KV<String, BankPurchaseStats>> bankStats =
    transactions.apply(Group.byField("bank")
        .aggregateField("purchaseAmount", Sum.ofLongs(), "totalPurchase")
        .aggregateField("purchaseAmount", Top.largestLongsFn(10), "top_purchases")
        .aggregateField("purchaseAmount", ApproximateQuantilesCombineFn.create(21), "purchaseDistribution"));

17 of 26

Portable Schemas

18 of 26

What do you mean “Portable Schemas”?

[Diagram: the Beam portability model. SDKs (Beam Java, Beam Python, other languages) construct pipelines against the Beam Model, and runners (Apache Flink, Apache Spark, Cloud Dataflow) execute them through the Beam Model Fn API.]

Currently schemas exist in the Java SDK (schema inference, schema-aware transforms) and in its generated coders. We want them in the Beam Model itself: a common representation for every SDK and runner.

19 of 26

Motivating use-case: SqlTransform in Python

  • SqlTransform is a Java transform → Exercises cross-language transforms
  • Inputs and outputs are Rows (schema-aware) → Requires portable schemas
  • Bonus: It’s generally useful!

p | beam.io.ReadFromText(input_path)
  | beam.ParDo(WordExtractingDoFn())
  | beam.SqlTransform('SELECT word, COUNT(*) as `count` FROM PCOLLECTION GROUP BY word')
  | beam.Map(lambda row: '{}: {}'.format(row.word, row.count))
  | beam.io.WriteToText(output_path)

20 of 26

What do we need for this?

  1. A way to use a Java transform from the Python SDK
    • Done! It’s possible to use cross-language transforms from Python on the Flink Runner
  2. A portable representation for the schema
    • So Python and Java can communicate the type of the SqlTransform input and output
  3. A standard way to serialize data that has a schema (i.e. Rows)
    • For data transfer between Python and Beam contexts
  4. A representation for Rows in Python
    • So Python user code can understand the input and output

21 of 26

  1. A portable representation for the schema

message Schema {
  repeated Field fields = 1;
  string id = 2;
}

message Field {
  string name = 1;
  string description = 2;
  FieldType type = 3;
  int32 id = 4;
  int32 encoding_position = 5;
}

message FieldType {
  bool nullable = 1;
  oneof type_info {
    AtomicType atomic_type = 2;
    ArrayType array_type = 3;
    MapType map_type = 4;
    RowType row_type = 5;
    LogicalType logical_type = 6;
  }
}

22 of 26

2. A standard way to serialize Rows

  • RowCoder already exists in Java
  • Need to make it a standard coder, and implement it in Python
  • Currently implemented in pending PR#9188
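For context, here is a rough sketch (not from the original slides) of attaching the existing Java RowCoder to a PCollection of Rows; once RowCoder is a standard coder, the same encoded bytes could be produced and consumed by the Python SDK. The RowCoderSketch class name and the literal values are just for this example.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

public class RowCoderSketch {
  public static void main(String[] args) {
    // Schema for the Location type from the earlier slides.
    Schema location = Schema.builder()
        .addDoubleField("latitude")
        .addDoubleField("longitude")
        .build();

    // Rows are schema-aware generic records.
    Row row = Row.withSchema(location).addValues(40.7128, -74.0060).build();

    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // RowCoder serializes Rows according to their schema.
    PCollection<Row> rows =
        p.apply(Create.of(row).withCoder(RowCoder.of(location)));

    p.run().waitUntilFinish();
  }
}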

23 of 26

3. A representation for Rows in Python

  • Currently implemented in pending PR#9188
  • Relies on typing.NamedTuple, PEP484 type hints, and numpy (for numeric type hints)

class Location(NamedTuple):
    latitude: float
    longitude: float

class Purchases(NamedTuple):
    userId: str
    itemId: str
    transactionId: int
    location: Location

class Transaction(NamedTuple):
    bank: str
    transactionId: int
    purchaseAmount: int

24 of 26

Demo - SqlTransform in Python

25 of 26

What else can we do with Portable Schemas?

  • Schema support in Go
  • Fluent Dataframe-style API
  • Portable runner optimizations
    • Spark Structured Streaming - Catalyst Optimizer
    • predicate/projection pushdown
  • Cross-language transforms with structured data
    • Schema-aware IOs, SQL, and relational transforms for all!
  • Columnar data and vectorization

26 of 26

Thank You!