Making Schemas Portable
Brian Hulette
Software Engineer at Google
Apache Arrow Committer
Apache Beam Contributor
Agenda
Intro to Beam (Java) Schemas
(Slides borrowed from Reuven Lax’s "Schemas in Beam" presentation)
In Beam, data points are processed as elements.
Converting elements into objects...
byte[]: on the wire, an element is just opaque bytes.
DoFn<byte[], ...>: you can access elements as raw bytes...
Custom Serialization
DoFn<Transaction, ...> with a hand-written Coder<Transaction>
You can use a custom coder to encode/decode elements yourself... not a lot of fun.
Proto object
DoFn<Transaction, ...> with a ProtoCoder: easy peasy.
Beam can also infer the object representation using Proto, Avro, BigQuery...
Low-level Java bytecode is generated so that schemas are nearly as efficient as hand-written Coders.
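For plain Java objects, schema inference is just an annotation. A minimal sketch, assuming a simple POJO with public fields:

import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;

// Beam infers the schema from the public fields and generates the
// low-level coder bytecode for us; no hand-written Coder needed.
@DefaultSchema(JavaFieldSchema.class)
public class Transaction {
  public String bank;
  public long transactionId;
  public long purchaseAmount;
}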
So what is a schema anyway?
Purchases
userId, itemId: String
transactionId: Long
location: Location
Transactions
bank: String
transactionId: Long
purchaseAmount : Long
Location
latitude: Double
longitude: Double
A schema describes a type in terms of fields and values.
Each type has a number of fields (or columns, in DB terminology).
Each field is named and has a type.
Schemas can be nested arbitrarily, and can contain repeated (array) or map fields as well!
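As a sketch, here is how the Location and Purchases schemas above could be built programmatically with Beam Java's Schema.builder() (field names taken from the slides):

import org.apache.beam.sdk.schemas.Schema;

// Location is a nested row type inside Purchases.
Schema locationSchema = Schema.builder()
    .addDoubleField("latitude")
    .addDoubleField("longitude")
    .build();

Schema purchasesSchema = Schema.builder()
    .addStringField("userId")
    .addStringField("itemId")
    .addInt64Field("transactionId")
    .addRowField("location", locationSchema)
    .build();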
An example use case
Using the purchases and transactions feeds, we would like to find the total spend by users in the lower Manhattan area. We also want some usage data on the different banks used in our transactions. One approach would be to:
1. Read the streams
Two streams, encoded with Avro
PCollection<Purchase> purchases = p.apply(
PubSubIO.readAvros(Purchase.class).fromTopic(purchaseTopic));
PCollection<Transaction> transactions = p.apply(
PubSubIO.readAvros(Transaction.class).fromTopic(transactionTopic));
2. Filter Purchases
Let's filter to only the purchases that happened in lower Manhattan.
Using the classic Beam Java API
PCollection<Purchase> filtered = purchases.apply(Filter.by(purchase -> {
  return purchase.location.lat < 40.720 && purchase.location.lat > 40.699
      && purchase.location.lon < -73.969 && purchase.location.lon > -74.747;
}));
3. Join on the transactionId field
final TupleTag<Purchase> lhsTag = new TupleTag<>();
final TupleTag<Transaction> rhsTag = new TupleTag<>();
PCollection<KV<Long, Purchase>> keyedPurchases = purchases.apply(WithKeys.of(p -> p.transactionId)
.withKeyType(TypeDescriptors.longs()));
PCollection<KV<Long, Transaction>> keyedTransactions = transactions.apply(WithKeys.of(t -> t.transactionId)
    .withKeyType(TypeDescriptors.longs()));
PCollection<KV<String, Long>> userSpend =
KeyedPCollectionTuple.of(lhsTag, keyedPurchases)
.and(rhsTag, keyedTransactions)
.apply(CoGroupByKey.<Long>create())
.apply(Values.create())
.apply(ParDo.of(new DoFn<CoGbkResult, KV<String, Long>>() {
@ProcessElement
public void process(@Element CoGbkResult result, OutputReceiver<KV<String, Long>> o) {
Purchase purchase = result.getOnly(lhsTag);
for (Transaction transaction : result.getAll(rhsTag)) {
o.output(KV.of(purchase.userId, transaction.purchaseAmount));
}
}})
.apply(Sum.longsPerKey());
Oh dear, all that for a join!
4. Calculate Data About Bank Usage
PCollection<KV<String, Long>> perBankPurchases = transactions.apply(
MapElements
.into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.longs()))
.via(t -> KV.of(t.bank, t.purchaseAmount)));
PCollection<KV<String, Long>> totalPurchases = perBankPurchases.apply(Sum.longsPerKey());
PCollection<KV<String, List<Long>>> topPurchases = perBankPurchases.apply(Top.largestPerKey(10));
PCollection<KV<String, List<Long>>> purchaseDistribution =
    perBankPurchases.apply(ApproximateQuantiles.<String, Long>perKey(21));
TupleTag<Long> totalPurchaseTag = new TupleTag<>();
TupleTag<List<Long>> topPurchaseTag = new TupleTag<>();
TupleTag<List<Long>> purchaseDistributionTag = new TupleTag<>();
PCollection<KV<String, Transaction>> perBankTransactions =
transactions.apply(WithKeys.of(t -> t.bank));
PCollection<KV<String, BankPurchaseStats>> bankStats = perBankTransactions.apply(
Combine.perKey(
CombineFns.compose()
.with(t -> t.purchaseAmount, Sum.ofLongs(), totalPurchaseTag)
.with(t -> t.purchaseAmount, Top.largestLongsFn(10), topPurchaseTag)
.with(t -> t.purchaseAmount, ApproximateQuantilesCombineFn.create(21),
purchaseDistributionTag)))
.apply(MapElements.into(BankPurchaseStats.class)
.via((CoCombineResult result) -> {
return new BankPurchaseStats(
result.get(totalPurchaseTag),
result.get(topPurchaseTag),
result.get(purchaseDistributionTag));}));
Mmmmmmm......
Infer Schemas
Beam natively understands Avro. Data is transparently transformed into Beam Schemas.
PCollection<Purchase> purchases = p.apply(
PubSubIO.readAvros(Purchase.class).fromTopic(purchaseTopic));
PCollection<Transaction> transactions = p.apply(
PubSubIO.readAvros(Transaction.class).fromTopic(transactionTopic));
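Once the schema is known, downstream transforms do not even need the Purchase class. A minimal sketch using Beam's Convert transform to view the same data as generic Rows:

import org.apache.beam.sdk.schemas.transforms.Convert;
import org.apache.beam.sdk.values.Row;

// The inferred schema lets Beam move between the POJO view and a
// generic Row view of the same elements.
PCollection<Row> purchaseRows = purchases.apply(Convert.toRows());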
Filters ... old vs new
Filters: Instead of
purchases.apply(Filter.by(purchase -> {
  return purchase.location.lat < 40.720 && purchase.location.lat > 40.699
      && purchase.location.lon < -73.969 && purchase.location.lon > -74.747;
}));
use
purchases.apply(Filter.whereField("location.lat", lat -> lat < 40.720 && lat > 40.699)
    .whereField("location.lon", lon -> lon < -73.969 && lon > -74.747));
Joins: Instead of the classic Beam Java approach
final TupleTag<Purchase> lhsTag = new TupleTag<>();
final TupleTag<Transaction> rhsTag = new TupleTag<>();
PCollection<KV<Long, Purchase>> keyedPurchases = purchases.apply(WithKeys.of(p -> p.transactionId)
.withKeyType(TypeDescriptors.longs()));
PCollection<KV<Long, Transaction>> keyedTransactions = transactions.apply(WithKeys.of(t -> t.transactionId)
    .withKeyType(TypeDescriptors.longs()));
PCollection<KV<String, Long>> userSpend =
KeyedPCollectionTuple.of(lhsTag, keyedPurchases)
.and(rhsTag, keyedTransactions)
.apply(CoGroupByKey.<Long>create())
.apply(Values.create())
.apply(ParDo.of(new DoFn<CoGbkResult, KV<String, Long>>() {
@ProcessElement
public void process(@Element CoGbkResult result, OutputReceiver<KV<String, Long>> o) {
Purchase purchase = result.getOnly(lhsTag);
for (Transaction transaction : result.getAll(rhsTag)) {
o.output(KV.of(purchase.userId, transaction.purchaseAmount));
}
}})
.apply(Sum.longsPerKey());
...use
PCollection<UserPurchases> userSums =
    purchases.apply(Join.innerJoin(transactions).using("transactionId"))
        .apply(Select.fieldNames("lhs.userId", "rhs.purchaseAmount"))
        .apply(Group.byFieldNames("userId")
            .aggregateField("purchaseAmount", Sum.ofLongs(), "totalPurchase"));
Aggregations: Instead of the classic Beam approach
PCollection<KV<String, Long>> perBankPurchases = transactions.apply(
    MapElements
        .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.longs()))
        .via(t -> KV.of(t.bank, t.purchaseAmount)));
PCollection<KV<String, Long>> totalPurchases = perBankPurchases.apply(Sum.longsPerKey());
PCollection<KV<String, List<Long>>> topPurchases = perBankPurchases.apply(Top.largestPerKey(10));
PCollection<KV<String, List<Long>>> purchaseDistribution =
    perBankPurchases.apply(ApproximateQuantiles.<String, Long>perKey(21));
TupleTag<Long> totalPurchaseTag = new TupleTag<>();
TupleTag<List<Long>> topPurchaseTag = new TupleTag<>();
TupleTag<List<Long>> purchaseDistributionTag = new TupleTag<>();
PCollection<KV<String, Transaction>> perBankTransactions =
transactions.apply(WithKeys.of(t -> t.bank));
PCollection<KV<String, BankPurchaseStats>> bankStats = perBankTransactions.apply(
Combine.perKey(
CombineFns.compose()
.with(t -> t.purchaseAmount, Sum.ofLongs(), totalPurchaseTag)
.with(t -> t.purchaseAmount, Top.largestLongsFn(10), topPurchaseTag)
.with(t -> t.purchaseAmount, ApproximateQuantilesCombineFn.create(21),
purchaseDistributionTag)))
.apply(MapElements.into(BankPurchaseStats.class)
    .via((CoCombineResult result) -> {
      return new BankPurchaseStats(
          result.get(totalPurchaseTag),
          result.get(topPurchaseTag),
          result.get(purchaseDistributionTag));
    }));
...use
PCollection<KV<String, BankPurchaseStats>> bankStats =
    transactions.apply(Group.byFieldNames("bank")
        .aggregateField("purchaseAmount", Sum.ofLongs(), "totalPurchase")
        .aggregateField("purchaseAmount", Top.largestLongsFn(10), "topPurchases")
        .aggregateField("purchaseAmount", ApproximateQuantilesCombineFn.create(21), "purchaseDistribution"));
Portable Schemas
What do you mean “Portable Schemas”?
[Architecture diagram: language SDKs (Beam Java, Beam Python, other languages) construct pipelines against the Beam model; Fn runners execute them on Apache Flink, Apache Spark, Cloud Dataflow, and other engines.]
Currently schemas exist here: in the Java SDK (schema inference, schema-aware transforms)...
...and here: in the generated Coder.
We want them here: in the portable Beam model, as a common representation for every SDK and runner.
Motivating use-case: SqlTransform in Python
# from apache_beam.transforms.sql import SqlTransform
(p
 | beam.io.ReadFromText(input_path)
 | beam.ParDo(WordExtractingDoFn())
 | SqlTransform('SELECT word, COUNT(*) AS `count` FROM PCOLLECTION GROUP BY word')
 | beam.Map(lambda row: '{}: {}'.format(row.word, row.count))
 | beam.io.WriteToText(output_path))
What do we need for this?
1. A portable representation for Schemas
message Schema {
repeated Field fields = 1;
string id = 2;
}
message Field {
string name = 1;
string description = 2;
FieldType type = 3;
int32 id = 4;
int32 encoding_position = 5;
}
message FieldType {
bool nullable = 1;
oneof type_info {
AtomicType atomic_type = 2;
ArrayType array_type = 3;
MapType map_type = 4;
RowType row_type = 5;
LogicalType logical_type = 6;
}
}
2. A standard way to serialize Rows
3. A representation for Rows in Python
from typing import NamedTuple

class Location(NamedTuple):
    latitude: float
    longitude: float

class Purchases(NamedTuple):
    userId: str
    itemId: str
    transactionId: int
    location: Location

class Transaction(NamedTuple):
    bank: str
    transactionId: int
    purchaseAmount: int
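To let these rows cross the language boundary, Beam Python can associate the NamedTuple types with the portable RowCoder. A sketch, using the class names from the definitions above:

import apache_beam as beam

# Registering RowCoder means elements of these types are encoded with the
# standard portable row encoding, so the Java SqlTransform can read them.
beam.coders.registry.register_coder(Purchases, beam.coders.RowCoder)
beam.coders.registry.register_coder(Transaction, beam.coders.RowCoder)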
Demo - SqlTransform in Python
What else can we do with Portable Schemas?
Thank You!