7th Apache Beam meetup London
#ApacheBeamLondon
Hi everyone!
About me:
Apache Beam Committer
Google Developer Expert
Apache Beam meetup London - #ApacheBeamLondon
Update from the community
Thanks to:
Meet the people
Thomas Weise (@thweise) - Software Engineer [Lyft]
Talk 1: Dynamic pricing using Beam
Joe Cullen - Data Engineer [Datatonic]
Talk 2: Building a data lake using Beam
Reuven Lax (@reuvenlax) - Senior Staff Software Engineer [Google]
Talk 3: Schema support
Dynamic Pricing using Apache Beam
Thomas Weise (@thweise)
Agenda
Dynamic Pricing
Supply/Demand curve
ETA
Pricing
Notifications
Detect Delays
Coupons
User Delight
Fraud
Behaviour Fingerprinting
Monetary Impact
Imperative to act fast
Top Destinations
Core Experience
Introduction to Dynamic Pricing
The Marketplace affects Prices
What is PrimeTime?
Why is PrimeTime Hard?
Legacy Pricing Infrastructure
Legacy architecture: A series of cron jobs
SFO: calendar_min_1: {gh6: 1.0, gh6: 2.0, ...}
NYC: calendar_min_1: {gh6: 2.0, gh6: 1.0, ...}
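The slide shows only the shape of the cron jobs' output. The sketch below is a plain-Python illustration under two assumptions the slide does not state: that `gh6` stands for a geohash cell at precision 6, and that each value is the PrimeTime multiplier for that cell in that calendar minute. `geohash`, `lookup_multiplier`, and the 1.0 default are hypothetical names and choices, not Lyft's code.

```python
from typing import Dict

_BASE32 = "0123456789bcdefghjkmnpqrstuvwxyz"  # standard geohash alphabet


def geohash(lat: float, lon: float, precision: int = 6) -> str:
    """Encode a point as a geohash by repeatedly bisecting lon/lat ranges (lon first)."""
    lat_range = [-90.0, 90.0]
    lon_range = [-180.0, 180.0]
    chars, bits, n_bits, use_lon = [], 0, 0, True
    while len(chars) < precision:
        rng, value = (lon_range, lon) if use_lon else (lat_range, lat)
        mid = (rng[0] + rng[1]) / 2
        if value >= mid:          # upper half -> bit 1
            bits = (bits << 1) | 1
            rng[0] = mid
        else:                     # lower half -> bit 0
            bits <<= 1
            rng[1] = mid
        use_lon = not use_lon     # bits alternate between longitude and latitude
        n_bits += 1
        if n_bits == 5:           # every 5 bits become one base32 character
            chars.append(_BASE32[bits])
            bits, n_bits = 0, 0
    return "".join(chars)


# Hypothetical layout of one cron run's output:
# region -> calendar minute -> {geohash6 cell: PrimeTime multiplier}
Multipliers = Dict[str, Dict[str, Dict[str, float]]]


def lookup_multiplier(table: Multipliers, region: str, minute: str,
                      lat: float, lon: float) -> float:
    """Multiplier for the rider's cell; 1.0 (no PrimeTime) when nothing is stored."""
    cell = geohash(lat, lon, precision=6)
    return table.get(region, {}).get(minute, {}).get(cell, 1.0)
```

Fixing the precision means a point's cell is a pure function of its coordinates, so the lookup is a plain dictionary access with no spatial index.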
Problems
Can we use Flink?
Streaming Stack
Diagram: a Streaming Application (SQL, Java) reads from a Source and writes to a Sink, supported by Stream / Schema Registry, Deployment Tooling, Metrics & Dashboards, Alerts and Logging, with infrastructure from Amazon EC2, Amazon S3, Wavefront, Salt (Config / Orca) and Docker.
Streaming and Python
Python via Beam
Diagram: a Streaming Application (Python/Beam) reads from a Source and writes to a Sink.
Streaming based Pricing Infrastructure
Pipeline (conceptual outline)
Lyft apps (phones) → kinesis events (source): ride_requested, app_open, ...
→ filter events: valid sessions, dedupe, ...
→ aggregate and window: unique_users_per_min, unique_requests_per_5_min, ...
→ run models to generate features (culminating in PT): conversion learner, eta learner, ...
→ internal services, redis
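The filter and aggregate-and-window stages of the outline can be sketched in plain Python. This is a conceptual model only, not Beam code and not Lyft's implementation: the `Event` fields, deduplication by event ID, and 60-second tumbling windows keyed by event type are assumptions chosen to mirror names like `unique_users_per_min`.

```python
from collections import defaultdict
from typing import Dict, Iterable, Iterator, NamedTuple, Set, Tuple


class Event(NamedTuple):
    # Hypothetical event shape; the real Kinesis payload is not shown on the slide.
    event_id: str
    user_id: str
    event_type: str   # e.g. "ride_requested", "app_open"
    timestamp: float  # event time, in seconds


def dedupe(events: Iterable[Event]) -> Iterator[Event]:
    """Filter step: drop any event whose event_id has already been seen."""
    seen: Set[str] = set()
    for e in events:
        if e.event_id not in seen:
            seen.add(e.event_id)
            yield e


def unique_users_per_window(events: Iterable[Event],
                            size_s: int = 60) -> Dict[Tuple[str, int], int]:
    """Aggregate step: distinct users per (event_type, tumbling-window start)."""
    users: Dict[Tuple[str, int], Set[str]] = defaultdict(set)
    for e in events:
        window_start = int(e.timestamp // size_s) * size_s
        users[(e.event_type, window_start)].add(e.user_id)
    return {key: len(ids) for key, ids in users.items()}
```

Passing `size_s=300` gives a 5-minute window, as in `unique_requests_per_5_min` (which would count distinct requests rather than users). In the real pipeline these steps run as windowed transforms over an unbounded stream rather than over a finite list.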
Details of implementation
Gains
Beam and multiple languages
The Beam Vision
Diagram: pipelines written with Beam Java, Beam Python and other languages are built against the Beam Model (Pipeline Construction) and executed through the Beam Model (Fn Runners) on Apache Flink, Apache Spark or Cloud Dataflow.
Multi-Language Support
Python Example
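import apache_beam as beam
from apache_beam import CombinePerKey, Map, WindowInto
from apache_beam.io import ReadFromText, WriteToText
from apache_beam.transforms.trigger import (
    AccumulationMode, AfterCount, AfterProcessingTime, AfterWatermark)
from apache_beam.transforms.window import FixedWindows

# runner and pipeline_options are defined elsewhere
p = beam.Pipeline(runner=runner, options=pipeline_options)
(p
 | ReadFromText("/path/to/text*")
 | Map(lambda line: ...)  # parse each line into a (key, value) pair (elided on the slide)
 | WindowInto(FixedWindows(120),
              trigger=AfterWatermark(early=AfterProcessingTime(60),
                                     late=AfterCount(1)),
              accumulation_mode=AccumulationMode.ACCUMULATING)
 | CombinePerKey(sum)
 | WriteToText("/path/to/outputs"))
result = p.run()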
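(What, Where, When, How): what is computed (CombinePerKey(sum)), where in event time (FixedWindows(120)), when in processing time results are emitted (the trigger), and how successive results relate (accumulation_mode)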
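To make the Where and How parts concrete, here is a plain-Python model (not Beam's implementation) of two pieces of the example: assigning an event timestamp to its 120-second fixed window, and what ACCUMULATING versus DISCARDING mode emits each time the trigger fires for one key and window. The function names are hypothetical, and timestamps are assumed to be in seconds.

```python
from typing import List, Sequence


def fixed_window_start(timestamp_s: float, size_s: int = 120) -> int:
    """Where: start of the fixed window of `size_s` seconds containing the timestamp."""
    return int(timestamp_s // size_s) * size_s


def pane_outputs(panes: Sequence[Sequence[int]], accumulating: bool) -> List[int]:
    """How: the sum emitted at each trigger firing for one key and window.

    `panes` holds the values that arrived between successive firings
    (for example early, on-time, late). Accumulating mode re-emits the
    running total; discarding mode emits only what arrived since the
    previous firing.
    """
    outputs, total = [], 0
    for pane in panes:
        pane_sum = sum(pane)
        total += pane_sum
        outputs.append(total if accumulating else pane_sum)
    return outputs
```

With firings carrying 1+2, then 3, then 4, accumulating mode emits 3, 6, 10 while discarding mode would emit 3, 3, 4, so a consumer of accumulating panes should overwrite earlier results rather than add to them.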
Portability (current)
Sum Per Key, as written in each SDK:
Java: input.apply(Sum.integersPerKey())
Python: input | Sum.PerKey()
Go: stats.Sum(s, input)
SQL (via Java): SELECT key, SUM(value) FROM input GROUP BY key
The pipeline reaches the runner either as Java objects or as portable protos. Runners: Apache Spark, Apache Flink, Apache Apex, Gearpump, Cloud Dataflow, Apache Samza, Apache Nemo (incubating), IBM Streams.
Beam Flink Runner
Portable Flink Runner
Diagram: the SDK (Python) submits the Pipeline (protobuf) and optional Dependencies to the Job Service over gRPC; Artifact Staging writes the dependencies to a Staging Location (DFS, S3, …). The Runner turns the pipeline into a Flink Job on the cluster's Job Manager. On each Task Manager, a Beam Flink Task (Executor / Fn API) exposes the Fn Services (Provision, Control, Data, Artifact Retrieval, State, Logging) to an SDK Worker (Python) that runs the UDFs.
Submitting a pipeline:
python -m apache_beam.examples.wordcount \
  --input=/etc/profile \
  --output=/tmp/py-wordcount-direct \
  --runner=PortableRunner \
  --job_endpoint=localhost:8099 \
  --streaming
Lyft Flink Runner Customizations
Diagram: the Runner worker launches the Fn services (Control Service, Data Service, State Service, Logging Service) used by the workers' Fn Runner.
How slow is this?
Fn API
decode, …, window
count
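import apache_beam as beam
from apache_beam.transforms import window
from apache_beam.transforms.trigger import (
    AccumulationMode, AfterProcessingTime, Repeatedly)

def count(kv):  # hypothetical helper: (key, [1, 1, ...]) -> (key, n)
  key, ones = kv
  return key, sum(ones)

# `messages` is the PCollection read from the source (not shown on the slide).
(messages
 | 'reshuffle' >> beam.Reshuffle()
 # __import__ keeps the lambda free of module-level imports (presumably for the SDK workers)
 | 'decode' >> beam.Map(lambda x: (__import__('random').randint(0, 511), 1))
 | 'noop1' >> beam.Map(lambda x: x)
 | 'noop2' >> beam.Map(lambda x: x)
 | 'noop3' >> beam.Map(lambda x: x)
 | 'window' >> beam.WindowInto(window.GlobalWindows(),
     trigger=Repeatedly(AfterProcessingTime(5 * 1000)),  # Python delay is in seconds
     accumulation_mode=AccumulationMode.DISCARDING)
 | 'group' >> beam.GroupByKey()
 | 'count' >> beam.Map(count)
)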
Fast enough for real Python work!
Beam Portability Recap
Feature Support Matrix (Beam 2.12.0)
Lessons Learned
We’re Hiring! Apply at www.lyft.com/careers
or email data-recruiting@lyft.com
Data Engineering: Engineering Manager (San Francisco); Software Engineer (San Francisco, Seattle, & New York City)
Data Infrastructure: Engineering Manager (San Francisco); Software Engineer (San Francisco & Seattle)
Experimentation: Software Engineer (San Francisco)
Streaming: Software Engineer (San Francisco)
Observability: Software Engineer (San Francisco)
Please ask questions!
Building a data lake using Apache Beam
Joe Cullen
Schema support
Reuven Lax (@reuvenlax)
Schemas in Beam
Or, how I learned to stop worrying and write simple, easy-to-read Beam pipelines.
In Beam, data points are processed as elements.
Converting elements into objects:
byte[*&&J%$#]
DoFn<byte[], ...>: you can access the element as raw bytes...
OR
Custom Serialization
DoFn<Transaction, ...> plus a custom Coder (for example set with PCollection.setCoder(...)): you write the encoding / decoding yourself, which is not a lot of fun...
OR
Proto object
DoFn<Transaction, ...> with a Proto coder: easy peasy.
Beam can also infer the encoding for Proto, Avro, BigQuery, ... types.
With schemas, Beam generates low-level Java bytecode so that schema encoding is nearly as efficient as hand-written Coders.
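For a sense of what a hand-written coder involves, here is a plain-Python analogue (not Beam's `Coder` API) for the `Transaction` type used in this talk (bank, transactionId, purchaseAmount). The byte layout, a 4-byte length prefix for the bank name followed by two signed 64-bit integers, is an arbitrary choice for illustration.

```python
import struct
from dataclasses import dataclass


@dataclass
class Transaction:
    bank: str
    transactionId: int
    purchaseAmount: int


class TransactionCoder:
    """Hand-written binary coder: length-prefixed UTF-8 bank name, then two int64s."""

    _HEADER = struct.Struct(">I")  # 4-byte big-endian length of the bank name
    _LONGS = struct.Struct(">qq")  # transactionId, purchaseAmount as signed 64-bit

    def encode(self, t: Transaction) -> bytes:
        bank = t.bank.encode("utf-8")
        return (self._HEADER.pack(len(bank)) + bank
                + self._LONGS.pack(t.transactionId, t.purchaseAmount))

    def decode(self, data: bytes) -> Transaction:
        (n,) = self._HEADER.unpack_from(data, 0)
        offset = self._HEADER.size
        bank = data[offset:offset + n].decode("utf-8")
        tx_id, amount = self._LONGS.unpack_from(data, offset + n)
        return Transaction(bank, tx_id, amount)
```

Every field added to `Transaction` means editing both methods in lockstep and deciding how previously encoded bytes should be read, which is exactly the bookkeeping schemas take over.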
So what is a schema anyway?
Purchases
userId, itemId, transactionId: String
location: Location
Transactions
bank: String
transactionId : Long
purchaseAmount : Long
Location
latitude: Double
longitude: Double
A schema describes a type in terms of fields and values.
Each type has a number of fields (or columns in DB terminology)
Each field is named, and has a type.
Schemas can be nested arbitrarily, and can contain repeated or map fields as well!
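A schema is, in effect, a list of named, typed fields that may nest. As a plain-Python sketch (not Beam's `Schema` class), the `Purchase` and `Location` types from the slide can be written as dataclasses and flattened into dotted field paths, the same style of names used later in this talk (e.g. "location.latitude"). `flatten_schema` is a hypothetical helper, and repeated and map fields are not handled.

```python
from dataclasses import dataclass, fields, is_dataclass
from typing import List, Tuple


@dataclass
class Location:
    latitude: float
    longitude: float


@dataclass
class Purchase:
    userId: str
    itemId: str
    transactionId: str
    location: Location  # nested type: its fields become "location.<name>"


def flatten_schema(cls, prefix: str = "") -> List[Tuple[str, str]]:
    """Return (dotted field path, type name) pairs, recursing into nested dataclasses."""
    out: List[Tuple[str, str]] = []
    for f in fields(cls):
        path = prefix + f.name
        if is_dataclass(f.type):
            out.extend(flatten_schema(f.type, path + "."))
        else:
            out.append((path, f.type.__name__))
    return out
```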
An example use case
Using the purchase and transaction feeds, we would like to find the total spend by users in the Lower Manhattan area. We also want some usage statistics on the different banks used in our transactions. One approach would be to:
1. Read the streams
Two streams, encoded with Avro
PCollection<Purchase> purchases = p.apply(
    PubsubIO.readAvros(Purchase.class).fromTopic(purchaseTopic));
PCollection<Transaction> transactions = p.apply(
    PubsubIO.readAvros(Transaction.class).fromTopic(transactionTopic));
2. Filter Purchases
Let's keep only the purchases that happened in Lower Manhattan.
Using the classic Beam Java API:
PCollection<Purchase> filtered = purchases.apply(Filter.by((Purchase purchase) ->
    purchase.location.latitude < 40.720 && purchase.location.latitude > 40.699
        && purchase.location.longitude < -73.969 && purchase.location.longitude > -74.747));
3. Join on the transactionId field
final TupleTag<Purchase> lhsTag = new TupleTag<>();
final TupleTag<Transaction> rhsTag = new TupleTag<>();
// Typed lambda parameters let Java infer the types; the name p is already taken by the Pipeline.
PCollection<KV<Long, Purchase>> keyedPurchases = purchases.apply(
    WithKeys.of((Purchase purchase) -> purchase.transactionId)
        .withKeyType(TypeDescriptors.longs()));
PCollection<KV<Long, Transaction>> keyedTransactions = transactions.apply(
    WithKeys.of((Transaction t) -> t.transactionId)
        .withKeyType(TypeDescriptors.longs()));
PCollection<KV<String, Long>> userSpend =
    KeyedPCollectionTuple.of(lhsTag, keyedPurchases)
        .and(rhsTag, keyedTransactions)
        .apply(CoGroupByKey.<Long>create())
        .apply(Values.<CoGbkResult>create())
        .apply(ParDo.of(new DoFn<CoGbkResult, KV<String, Long>>() {
          @ProcessElement
          public void process(@Element CoGbkResult result, OutputReceiver<KV<String, Long>> o) {
            Purchase purchase = result.getOnly(lhsTag);
            for (Transaction transaction : result.getAll(rhsTag)) {
              o.output(KV.of(purchase.userId, transaction.purchaseAmount));
            }
          }
        }))
        .apply(Sum.longsPerKey());
Oh dear, all that for a join!
4. Calculate Data About Bank Usage
PCollection<KV<String, Long>> perBankPurchases = transactions.apply(
    MapElements
        .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.longs()))
        .via((Transaction t) -> KV.of(t.bank, t.purchaseAmount)));
PCollection<KV<String, Long>> totalPurchases = perBankPurchases.apply(Sum.longsPerKey());
PCollection<KV<String, List<Long>>> topPurchases = perBankPurchases.apply(Top.largestPerKey(10));
PCollection<KV<String, List<Long>>> purchaseDistribution =
    perBankPurchases.apply(ApproximateQuantiles.<String, Long>perKey(21));

// Or compute all three statistics in one composed combine.
TupleTag<Long> totalPurchaseTag = new TupleTag<>();
TupleTag<List<Long>> topPurchaseTag = new TupleTag<>();
TupleTag<List<Long>> purchaseDistributionTag = new TupleTag<>();
// CombineFns.compose() takes a SimpleFunction (not a lambda) to extract each input.
SimpleFunction<Transaction, Long> amountFn = new SimpleFunction<Transaction, Long>() {
  @Override
  public Long apply(Transaction t) {
    return t.purchaseAmount;
  }
};
PCollection<KV<String, Transaction>> perBankTransactions = transactions.apply(
    WithKeys.of((Transaction t) -> t.bank).withKeyType(TypeDescriptors.strings()));
PCollection<KV<String, BankPurchaseStats>> bankStats = perBankTransactions
    .apply(Combine.perKey(
        CombineFns.compose()
            .with(amountFn, Sum.ofLongs(), totalPurchaseTag)
            .with(amountFn, Top.largestLongsFn(10), topPurchaseTag)
            .with(amountFn, ApproximateQuantilesCombineFn.<Long>create(21),
                purchaseDistributionTag)))
    .apply(MapElements
        .into(TypeDescriptors.kvs(
            TypeDescriptors.strings(), TypeDescriptor.of(BankPurchaseStats.class)))
        .via((KV<String, CoCombineResult> kv) -> KV.of(kv.getKey(), new BankPurchaseStats(
            kv.getValue().get(totalPurchaseTag),
            kv.getValue().get(topPurchaseTag),
            kv.getValue().get(purchaseDistributionTag)))));
Mmmmmmm......
Schemas to the rescue!
Infer Schemas
Beam natively understands Avro. Data is transparently transformed into Beam Schemas.
PCollection<Purchase> purchases = p.apply(
    PubsubIO.readAvros(Purchase.class).fromTopic(purchaseTopic));
PCollection<Transaction> transactions = p.apply(
    PubsubIO.readAvros(Transaction.class).fromTopic(transactionTopic));
Filters
Filter: a PTransform for filtering a collection of schema types.
Separate predicates can be registered for different schema fields, and an element passes only if all predicates return true. The output type is the same as the input type.
Filter.create().whereFieldName(fieldName, predicate)
Filter.create().whereFieldNames(listOfFieldNames, predicate)
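A plain-Python sketch of those filter semantics (not Beam's implementation): each predicate is registered against a dotted field path, a row passes only if every predicate returns true, and passing rows come out unchanged, so the output type matches the input. Rows are modelled as nested dicts, and `FieldFilter`, `where_field_name` and `get_field` are hypothetical names.

```python
from typing import Any, Callable, Dict, Iterable, List, Mapping, Tuple

Predicate = Callable[[Any], bool]


def get_field(row: Mapping[str, Any], path: str) -> Any:
    """Follow a dotted field path such as "location.latitude" into nested rows."""
    value: Any = row
    for name in path.split("."):
        value = value[name]
    return value


class FieldFilter:
    """Keeps a row only if every registered per-field predicate returns True."""

    def __init__(self) -> None:
        self._predicates: List[Tuple[str, Predicate]] = []

    def where_field_name(self, path: str, predicate: Predicate) -> "FieldFilter":
        self._predicates.append((path, predicate))
        return self  # allow chaining, in the style of the slide

    def apply(self, rows: Iterable[Dict[str, Any]]) -> List[Dict[str, Any]]:
        return [row for row in rows
                if all(pred(get_field(row, path)) for path, pred in self._predicates)]
```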
Joins
With schemas, joins can be defined by field name.
Join.innerJoin(PCollection)
    .using(<Field Name>)
Join.innerJoin(transactions)
    .using("transactionId")
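Conceptually, a join "using" a field name pairs up rows whose values for that field are equal. The plain-Python sketch below (a hash join over lists of dicts, not Beam's `Join` transform) nests each match under "lhs" and "rhs", mirroring the "lhs.userId" style of field names used later in this talk; `inner_join_using` is a hypothetical name.

```python
from collections import defaultdict
from typing import Any, Dict, List

Row = Dict[str, Any]


def inner_join_using(lhs: List[Row], rhs: List[Row], field: str) -> List[Row]:
    """Inner join two lists of rows on a field name present in both.

    Each output row nests the matching inputs under "lhs" and "rhs",
    so no per-side key extraction or tuple tags are needed.
    """
    index: Dict[Any, List[Row]] = defaultdict(list)
    for right in rhs:                      # build a hash index on the right side
        index[right[field]].append(right)
    return [{"lhs": left, "rhs": right}
            for left in lhs
            for right in index.get(left[field], [])]
```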
Filters: old vs new
Instead of
purchases.apply(Filter.by((Purchase purchase) ->
    purchase.location.latitude < 40.720 && purchase.location.latitude > 40.699
        && purchase.location.longitude < -73.969 && purchase.location.longitude > -74.747));
use
purchases.apply(Filter.<Purchase>create()
    .whereFieldName("location.latitude", (Double lat) -> lat < 40.720 && lat > 40.699)
    .whereFieldName("location.longitude", (Double lon) -> lon < -73.969 && lon > -74.747));
Aggregations
Aggregations are now much simpler, with easy access to fields by name:
aggregateField(<input field name>, combineFn, <output field name>)
transactions.apply(Group.<Transaction>byFieldNames("bank")
    .aggregateField("purchaseAmount", Sum.ofLongs(), "totalPurchase")
    .aggregateField("purchaseAmount", Top.largestLongsFn(10), "topPurchases"));
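The grouping-plus-aggregations pattern can be sketched in plain Python as follows. This models the semantics only, not Beam's `Group` transform: rows are dicts, each `(input field, function, output field)` triple adds one output column, and `group_by_field` and `top` are hypothetical helpers standing in for `Sum.ofLongs()` and `Top.largestLongsFn(10)`.

```python
from collections import defaultdict
from typing import Any, Callable, Dict, List, Sequence, Tuple

Row = Dict[str, Any]
AggregateFn = Callable[[List[Any]], Any]


def group_by_field(rows: Sequence[Row], key_field: str,
                   aggregations: Sequence[Tuple[str, AggregateFn, str]]) -> List[Row]:
    """Group rows by `key_field`; each (input field, fn, output field) adds one column."""
    groups: Dict[Any, List[Row]] = defaultdict(list)
    for row in rows:
        groups[row[key_field]].append(row)
    result = []
    for key, members in groups.items():
        out: Row = {key_field: key}
        for in_field, fn, out_field in aggregations:
            out[out_field] = fn([m[in_field] for m in members])
        result.append(out)
    return result


def top(n: int) -> AggregateFn:
    """The largest n values, in descending order."""
    return lambda values: sorted(values, reverse=True)[:n]
```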
Joins: Classic Beam Java
final TupleTag<Purchase> lhsTag = new TupleTag<>();
final TupleTag<Transaction> rhsTag = new TupleTag<>();
// Typed lambda parameters let Java infer the types; the name p is already taken by the Pipeline.
PCollection<KV<Long, Purchase>> keyedPurchases = purchases.apply(
    WithKeys.of((Purchase purchase) -> purchase.transactionId)
        .withKeyType(TypeDescriptors.longs()));
PCollection<KV<Long, Transaction>> keyedTransactions = transactions.apply(
    WithKeys.of((Transaction t) -> t.transactionId)
        .withKeyType(TypeDescriptors.longs()));
PCollection<KV<String, Long>> userSpend =
    KeyedPCollectionTuple.of(lhsTag, keyedPurchases)
        .and(rhsTag, keyedTransactions)
        .apply(CoGroupByKey.<Long>create())
        .apply(Values.<CoGbkResult>create())
        .apply(ParDo.of(new DoFn<CoGbkResult, KV<String, Long>>() {
          @ProcessElement
          public void process(@Element CoGbkResult result, OutputReceiver<KV<String, Long>> o) {
            Purchase purchase = result.getOnly(lhsTag);
            for (Transaction transaction : result.getAll(rhsTag)) {
              o.output(KV.of(purchase.userId, transaction.purchaseAmount));
            }
          }
        }))
        .apply(Sum.longsPerKey());
PCollection<Row> userSums = purchases
    .apply(Join.<Purchase, Transaction>innerJoin(transactions).using("transactionId"))
    .apply(Select.fieldNames("lhs.userId", "rhs.purchaseAmount"))
    .apply(Group.<Row>byFieldNames("userId")
        .aggregateField("purchaseAmount", Sum.ofLongs(), "totalPurchase"));
Old
New
Aggregations: Classic Beam
PCollection<KV<String, Long>> perBankPurchases = transactions.apply(
    MapElements
        .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.longs()))
        .via((Transaction t) -> KV.of(t.bank, t.purchaseAmount)));
PCollection<KV<String, Long>> totalPurchases = perBankPurchases.apply(Sum.longsPerKey());
PCollection<KV<String, List<Long>>> topPurchases = perBankPurchases.apply(Top.largestPerKey(10));
PCollection<KV<String, List<Long>>> purchaseDistribution =
    perBankPurchases.apply(ApproximateQuantiles.<String, Long>perKey(21));

TupleTag<Long> totalPurchaseTag = new TupleTag<>();
TupleTag<List<Long>> topPurchaseTag = new TupleTag<>();
TupleTag<List<Long>> purchaseDistributionTag = new TupleTag<>();
// CombineFns.compose() takes a SimpleFunction (not a lambda) to extract each input.
SimpleFunction<Transaction, Long> amountFn = new SimpleFunction<Transaction, Long>() {
  @Override
  public Long apply(Transaction t) {
    return t.purchaseAmount;
  }
};
PCollection<KV<String, Transaction>> perBankTransactions = transactions.apply(
    WithKeys.of((Transaction t) -> t.bank).withKeyType(TypeDescriptors.strings()));
PCollection<KV<String, BankPurchaseStats>> bankStats = perBankTransactions
    .apply(Combine.perKey(
        CombineFns.compose()
            .with(amountFn, Sum.ofLongs(), totalPurchaseTag)
            .with(amountFn, Top.largestLongsFn(10), topPurchaseTag)
            .with(amountFn, ApproximateQuantilesCombineFn.<Long>create(21),
                purchaseDistributionTag)))
    .apply(MapElements
        .into(TypeDescriptors.kvs(
            TypeDescriptors.strings(), TypeDescriptor.of(BankPurchaseStats.class)))
        .via((KV<String, CoCombineResult> kv) -> KV.of(kv.getKey(), new BankPurchaseStats(
            kv.getValue().get(totalPurchaseTag),
            kv.getValue().get(topPurchaseTag),
            kv.getValue().get(purchaseDistributionTag)))));
PCollection<Row> bankStats =
    transactions.apply(Group.<Transaction>byFieldNames("bank")
        .aggregateField("purchaseAmount", Sum.ofLongs(), "totalPurchase")
        .aggregateField("purchaseAmount", Top.largestLongsFn(10), "topPurchases")
        .aggregateField("purchaseAmount", ApproximateQuantilesCombineFn.<Long>create(21),
            "purchaseDistribution"));
Old
New
Even more fun with schemas!
Schemas: ParDo
Schemas are also understood by Beam primitives such as ParDo!
If the input has a schema, individual fields can be selected in the process method.
purchases.apply(ParDo.of(new DoFn<Purchase, EnrichedPurchase>() {
  @ProcessElement
  public void process(@Field("userId") String userId,
                      @Field("location.latitude") double lat,
                      @Field("location.longitude") double lon,
                      OutputReceiver<EnrichedPurchase> o) {
    ...
  }
}));
Schemas - making sources and sinks easier...
Before schemas, the BigQuery sink required you to:
1. Specify the table schema by hand
p.apply(PubsubIO.readAvros(Purchase.class).fromTopic(purchaseTopic))
 .apply(BigQueryIO.<Purchase>write()
     .to(purchasesTable)
     .withSchema(new TableSchema().setFields(
         ImmutableList.of(
             new TableFieldSchema().setName("userId").setType("STRING"),
             new TableFieldSchema().setName("itemId").setType("STRING"),
             new TableFieldSchema().setName("transactionId").setType("STRING"),
             new TableFieldSchema().setName("location").setType("RECORD")
                 .setFields(ImmutableList.of(
                     new TableFieldSchema().setName("latitude").setType("FLOAT"),
                     new TableFieldSchema().setName("longitude").setType("FLOAT"))))))
2. Convert your data elements into TableRow objects
     .withFormatFunction(purchase -> {
       return new TableRow()
           .set("userId", purchase.userId)
           .set("itemId", purchase.itemId)
           .set("transactionId", purchase.transactionId)
           .set("location", new TableRow()
               .set("latitude", purchase.location.latitude)
               .set("longitude", purchase.location.longitude));
     }));
All that code for a simple transport pipeline! The good news with schemas is that BigQueryIO can infer all of this information from the PCollection.
p.apply(PubsubIO.readAvros(Purchase.class)
     .fromTopic(purchaseTopic))
 .apply(BigQueryIO.<Purchase>write().to(purchasesTable).useBeamSchema());
Schemas - inferring
You can create PCollections out of your own classes too!
Beam knows how to infer schemas from a variety of Java classes.
Having a schema allows use of the schema transforms, BeamSQL, and also means you don’t need a Coder!
Schemas - inferring POJOs
@DefaultSchema(JavaFieldSchema.class)
public class PurchasePOJO {
public final String userId;
public final String itemId;
public final String transactionId;
public final Location location;
@SchemaCreate
public PurchasePOJO(String userId, String itemId, String transactionId, Location location) {
...
}
...
}
Schemas - inferring JavaBeans
@DefaultSchema(JavaBeanSchema.class)
public class PurchaseBean {
private final String userId;
private final String itemId;
private final String transactionId;
private final Location location;
@SchemaCreate
public PurchaseBean(String userId, String itemId, String transactionId, Location location) {
...
}
public String getUserId() { … }
public String getItemId() { … }
public String getTransactionId() { … }
public Location getLocation() { … }
...
}
Schemas - inferring AutoValue
@DefaultSchema(AutoValueSchema.class)
@AutoValue
public abstract class PurchaseValue {
public abstract String getUserId();
public abstract String getItemId();
public abstract String getTransactionId();
public abstract Location getLocation();
}
Schemas - Converting
A PCollection<T> can be converted to a PCollection<S> whenever T and S have compatible schemas.
PCollection<Row> rows = readPurchaseRows();
PCollection<PurchasePOJO> pojos = rows.apply(Convert.to(PurchasePOJO.class));
PCollection<PurchaseValue> autovalues = pojos.apply(Convert.to(PurchaseValue.class));
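A minimal plain-Python sketch of schema conversion, assuming "compatible" means the same field names with the same types, in any order (Beam's actual compatibility rules may differ). `convert` is a hypothetical helper, and the two dataclasses are simplified stand-ins for the slide's `PurchasePOJO` and `PurchaseValue`.

```python
from dataclasses import dataclass, fields
from typing import Any, Type, TypeVar

T = TypeVar("T")


def convert(value: Any, target: Type[T]) -> T:
    """Rebuild dataclass instance `value` as `target`, matching fields by name."""
    source = {f.name: f.type for f in fields(value)}
    wanted = {f.name: f.type for f in fields(target)}
    if source != wanted:  # same names and types required; field order does not matter
        raise TypeError(f"incompatible schemas: {source} vs {wanted}")
    return target(**{name: getattr(value, name) for name in wanted})


@dataclass
class PurchasePOJO:  # simplified stand-in
    userId: str
    itemId: str


@dataclass
class PurchaseValue:  # simplified stand-in, fields declared in a different order
    itemId: str
    userId: str
```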
Schemas - Modifying
// Add new fields.
rows.apply(AddFields.<Row>create()
    .field("user", FieldType.STRING)
    .field("userInformation.zipCode", FieldType.INT32));
// Remove fields.
rows.apply(DropFields.fields("user", "userInformation.zipCode"));
// Rename fields.
rows.apply(RenameFields.<Row>create()
    .rename("userName", "userId")
    .rename("location.country", "location.countryCode"));
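The same three operations on a nested row, sketched in plain Python over dicts (a model of the idea, not Beam's `AddFields`, `DropFields` or `RenameFields`). Dotted paths reach into nested rows, and each function returns a modified copy, leaving its input untouched. The helper names, and the choice that an added field starts as `None`, are assumptions for illustration.

```python
import copy
from typing import Any, Dict, Tuple

Row = Dict[str, Any]


def _parent(row: Row, path: str) -> Tuple[Row, str]:
    """Return (dict holding the leaf, leaf name) for a dotted path, creating parents."""
    *parents, leaf = path.split(".")
    node = row
    for name in parents:
        node = node.setdefault(name, {})
    return node, leaf


def add_field(row: Row, path: str, default: Any = None) -> Row:
    """Add a (possibly nested) field; an existing value is left as it is."""
    out = copy.deepcopy(row)
    node, leaf = _parent(out, path)
    node.setdefault(leaf, default)
    return out


def drop_field(row: Row, path: str) -> Row:
    """Remove a (possibly nested) field if it is present."""
    out = copy.deepcopy(row)
    *parents, leaf = path.split(".")
    node: Any = out
    for name in parents:
        node = node.get(name)
        if not isinstance(node, dict):
            return out  # path not present: nothing to drop
    node.pop(leaf, None)
    return out


def rename_field(row: Row, path: str, new_name: str) -> Row:
    """Rename the field at `path`; `new_name` is a leaf name at the same nesting level."""
    out = copy.deepcopy(row)
    node, leaf = _parent(out, path)
    node[new_name] = node.pop(leaf)
    return out
```

Note that the slide's rename passes the full new path ("location.countryCode"), whereas this sketch takes only the new leaf name.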
Schemas - Future
Also considering a fluent Dataframe-style API on top of schemas.