Contracts, Schema Evolution and Data Pipelines

5th Elephant 2019,

Bangalore

Share learnings from mistakes and the best practices we have developed over the years in building data processing systems

Generally, the industry has a lot of tools and technologies available, each solving a particular set of problems as and when we encounter them

I’ll try to cover the approaches/methodologies that we’ve refined over the years when building data pipelines

This talk will be particularly helpful for organizations that are new to building data processing pipelines,

or are facing the same challenges that we have faced

2

Layout of the talk

  • About Zapr
  • How the problem statement influenced building a data processing pipeline
  • The challenges with building an event processing pipeline
  • Solving for Enrichment Issues
  • What a contract is and how it can address many of the challenges
  • How we can use the Thrift IDL for writing these contracts
  • Benefits of following this in your organization

3

Zapr and Data processing

4

Zapr is a media-tech company that profiles millions of users for the content they prefer on TV. It does this by implementing ACR (automatic content recognition) detection at scale.

The technology works by taking periodic, one-way frequency fingerprints on the device and sending them to Zapr's servers as queries, to identify the content the user might be watching.

On the server side, each query is matched against a library of fingerprints captured from TV around the same time, and an answer is declared if the query matches one of the 500 channels that Zapr is tracking on TV.

5


6

The engineering problem for us to solve was building massive-scale data processing pipelines in which different steps have different processing requirements. We also needed to do this in a very robust and reliable manner, so as to have no data losses or correctness problems. The data collection and matching infrastructure needed on the server side has to be really scalable, and be able to generate 200+ GB of fact data reliably every day.

Tight Coupling, No clear separation of Responsibility

What does it take to build such a scalable system ?

A services-based architecture?

Leads to tight coupling between services that are not really dependent on each other

Leads to an inversion of responsibility between roles when services talk to each other

7

Example 1

What does it take to build such a scalable system ?

  • The volume of data is very high
  • The results do not need to be sent back to the client immediately
  • A message-carrying bus like Kafka is needed

<insert diagrams>

8

The volume of data received on the servers is very high, and the requests received on the server do not need to be answered then and there.

Hence, we could offload the messages to be processed by writing them to a temporary buffer; for us, that buffer is Kafka,

where, within some time guarantees/constraints, we ensure that all the messages are processed.
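To make the "temporary buffer" idea concrete, here is a minimal sketch in Java (using the plain kafka-clients producer) of a request handler offloading an incoming query to a Kafka topic instead of answering it synchronously. The topic name, key, and byte-array payload are assumptions for illustration, not our actual setup.

```java
// Sketch only: offload an incoming fingerprint query to Kafka instead of
// answering it synchronously. Topic name and payload format are assumptions.
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class QueryOffloader {
    private final KafkaProducer<String, byte[]> producer;

    public QueryOffloader(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("acks", "all"); // wait for all in-sync replicas, favouring durability
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        this.producer = new KafkaProducer<>(props);
    }

    // Called by the request handler: write the raw query to a buffer topic and return immediately.
    public void offload(String deviceId, byte[] fingerprintQuery) {
        producer.send(new ProducerRecord<>("fingerprint-queries", deviceId, fingerprintQuery));
    }
}
```

The request path only pays the cost of one durable write; the actual matching and enrichment happen later, within the agreed time constraints.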

Kafka as the Main Message Bus

Repeated pattern of Reading and Writing to the Bus makes a pipeline of event processing

9

Message buses generally solve the problem of de-coupling producers and consumers at a massive scale,

where a variety of consumers can get access to feeds of data and do some form of processing over it.

Processing like enrichment, filtration, batch writing to databases for future querying, and reliably backing up the events in the pipeline to a long-term storage system like S3 can all be powered this way.

The message bus acts as a central place for systems to interact with one another and perform processing on the events flowing through it.

A common design pattern that has emerged is of systems reading from the message bus, applying transformations over the input event, and writing it back to the message bus for another downstream system to read and do its own processing.

Hence, it is easy to build multi-stage, linear pipelines of event processing,

where each system processing the event has the single responsibility of altering the input message.
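A minimal sketch of that read-transform-write pattern, using the plain kafka-clients consumer and producer in Java. The topic names and the trivial "enrichment" are placeholders; the point is the shape of a single-responsibility stage.

```java
// Sketch only: the generic read -> transform -> write stage described above.
// Topic names and the transform itself are placeholders, not our actual pipeline.
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class EnrichmentStage {
    public static void main(String[] args) {
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "localhost:9092");
        consumerProps.put("group.id", "enrichment-stage");
        consumerProps.put("enable.auto.commit", "false");
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "localhost:9092");
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
             KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
            consumer.subscribe(Collections.singletonList("raw-events"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    // Single responsibility: apply one transformation and forward the result.
                    String enriched = record.value() + "|enriched";
                    producer.send(new ProducerRecord<>("enriched-events", record.key(), enriched));
                }
                consumer.commitSync(); // commit only after the batch has been forwarded
            }
        }
    }
}
```

Chaining several such stages, each owning one transformation, is what gives you the multi-stage, linear pipeline.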

How complex can these Apps get ?

10

App Type | Use Cases | Some Existing Solutions
Real-Time Processing (~ms) | Event delivery; database-backed and service-backed enrichment of messages | Spark Streaming, Samza, Kafka Streams
Worker Model of Processing (~mins) | Micro-batching of messages and processing | AWS Batch, Beanstalk, Spark Streaming
Data Sinks (~mins) | Long-term storage of data | Apache Flume, Kafka Connect
*Batch Data Jobs (~hours) | Bulk processing of data (e.g. nightly processing at day level) | Apache Spark, MapReduce, etc.

Type 1 - Real-Time Processing

  • Simple enrichment and forwarding
  • Delivering/pushing events to other systems
  • Database-backed
  • Client-side caching
  • Scalable to the number of partitions
  • Stream joins
  • Kafka Streams, Samza...
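As one possible shape of such a "Type 1" app, here is a small Kafka Streams sketch of a geo-enrichment stage with a client-side cache in front of the lookup. The topics, cache, and lookup are illustrative assumptions, not our production code.

```java
// Sketch only: a "Type 1" real-time enrichment app expressed with Kafka Streams.
// Topics, the cache, and the geo lookup are placeholders for illustration.
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class GeoEnrichmentApp {
    // Client-side cache in front of the geo database, so one slow lookup
    // does not become a per-message cost.
    private static final Map<String, String> GEO_CACHE = new HashMap<>();

    private static String lookupGeo(String deviceId) {
        return GEO_CACHE.computeIfAbsent(deviceId, id -> "unknown"); // placeholder for a real DB call
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "geo-enrichment");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> events = builder.stream("raw-events");
        // Enrich each message and forward it; parallelism scales with the number of partitions.
        events.mapValues((deviceId, value) -> value + "|geo=" + lookupGeo(deviceId))
              .to("geo-enriched-events");

        new KafkaStreams(builder.build(), props).start();
    }
}
```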

Challenges

11


Defining a cost model for data processing

12

Let's work backwards when defining a cost model.

When making an important decision based on the data generated by your systems, you have to be doubly sure about:

  • Is the data complete?
  • Is it consistent with other data sources?
  • If it was reprocessed, is that reflected in the data, and are the SLAs all met?
  • If some downstream service was down for some time, did that impact the final data?

TL;DR - It comes down to meeting the SLAs for data availability

With respect to pipelines...

13

The concerns should be:

  • How stable are the underlying systems that generate the data?
  • How reliable is the final data, even when working with unstable components?

TL;DR - It comes down to meeting the SLAs for data availability

Handle downtime of service

14

Services could, should, and do go down!

What should a consumer of a service do in such a situation? What if it is a component of a data pipeline?

Handle slow consumer apps

15

The silent problem in data pipelines,

and the hardest to keep track of, until data loss happens.

Has it ever happened that a new release changed the message processing time from 5 ms to 10 ms?

It leads to cascading failures across all downstream systems.
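One simple way to catch this early is to time every polled batch against a per-message budget and alert before lag starts to build. A sketch, with an illustrative 5 ms budget:

```java
// Sketch only: flag a slow consumer by timing each polled batch against a
// per-message budget. The 5 ms budget is an illustrative number.
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import java.util.function.Consumer;

public class SlowConsumerGuard {
    private static final long BUDGET_NANOS_PER_MESSAGE = 5_000_000L; // 5 ms

    // Wrap the processing of every batch returned by consumer.poll() with this.
    public static void timeBatch(ConsumerRecords<String, String> records,
                                 Consumer<ConsumerRecord<String, String>> processor) {
        long start = System.nanoTime();
        int processed = 0;
        for (ConsumerRecord<String, String> record : records) {
            processor.accept(record);
            processed++;
        }
        if (processed == 0) return;
        long perMessageNanos = (System.nanoTime() - start) / processed;
        if (perMessageNanos > BUDGET_NANOS_PER_MESSAGE) {
            // Export this as a metric/alert before lag builds up and cascades downstream.
            System.err.printf("slow consumer: %.2f ms per message%n", perMessageNanos / 1_000_000.0);
        }
    }
}
```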

Ensuring 0 message loss

16

A general concern -

How do you build a system that continuously processes data with only a few hours of buffer for messages?

It's a milestone for a data organization to reach 0 data loss (at least it was for us!)

Global view of the schema (basically, all the engineers in the org)

17

Setting the blueprint of your data (or, in simple words, the data model):

  • each field's type, default value, and placement in the data model

Something that is mandated/agreed upon by all of the engineering teams.

And there are so many ways we get it wrong!!

Tracking lineage

18

  • Track data flows and their dependencies

  • Build better data products

The next set of challenges in building pipelines?

19

  • Handle downtime of services
  • Handle slow consumer apps
  • Ensure there’s no message loss anywhere in pipeline
  • Have a global view of the schema of the messages

  • Handle Evolution of Schema
  • Tracking Lineage

COMPLEXITY

Broad categorization of the issues

20

Enrichment of Data

  • Multiple stages of enrichment
  • Varying cost of enrichment, and where it is placed in the pipeline

Contract Control and Evolution

  • Handling deprecation of fields
  • Adding new fields

COMPLEXITY

Issues that will be faced

Solutions

21


Techniques for solving enrichment-related issues (or, stability-related issues)

22

  • Fail Fast


  • Implement a Dead Letter Queue

  • Sink Messages at every stage

  • Have a batch reprocessing system for every app
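A minimal sketch of the fail-fast and dead-letter-queue techniques above, for a single record handler. The topic names are placeholders for illustration.

```java
// Sketch only: fail fast on bad input and park failures on a dead-letter topic
// instead of blocking the pipeline or silently dropping the message.
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class DeadLetterHandler {
    private final KafkaProducer<String, String> producer;

    public DeadLetterHandler(KafkaProducer<String, String> producer) {
        this.producer = producer;
    }

    public void handle(ConsumerRecord<String, String> record) {
        try {
            String enriched = enrich(record.value()); // fail fast: let unexpected input throw
            producer.send(new ProducerRecord<>("enriched-events", record.key(), enriched));
        } catch (Exception e) {
            // The original message is preserved for later inspection and batch reprocessing.
            producer.send(new ProducerRecord<>("enrichment-dlq", record.key(), record.value()));
        }
    }

    private String enrich(String value) {
        return value + "|enriched"; // placeholder enrichment
    }
}
```

Combined with sinking messages at every stage, the dead-letter topic is what makes a batch reprocessing system for each app possible.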


Need for data flow philosophies in these pipelines

23

To solve enrichment-related issues, we also need to be more philosophical:

  • Track message losses - instead of dropping messages, mark them by filling flags
  • Do all the CPU-heavy processing as early as possible
  • Do enrichments as late as possible in the pipeline
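A tiny sketch of the "mark, don't drop" idea: when an enrichment cannot be completed, the message keeps flowing with a status flag instead of silently disappearing. The geo_status flag is a hypothetical field for illustration.

```java
// Sketch only: flag a failed enrichment rather than dropping the message,
// so downstream jobs can count or re-enrich flagged messages later.
public class EnrichmentFlags {

    public static String enrichGeoOrFlag(String message, String geo) {
        if (geo == null || geo.isEmpty()) {
            // Keep the message flowing; the loss is now visible instead of silent.
            return message + "|geo_status=MISSING";
        }
        return message + "|geo=" + geo + "|geo_status=OK";
    }
}
```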


Techniques for solving schema control and evolution

24

Have a global schema

A good contract should look like the model classes in a codebase,

with the interactions between the different model classes driven by responsibility.

Example: Geo is a property of User, and User is part of the final message.

There can be stages in the pipeline whose responsibility is the enrichment of all of the User struct, or just of Geo.

What is a contract?

25

  • A good contract should look like the model classes in a codebase.

  • Interactions between the different model classes are driven by responsibility.

User contract @zapr

26

This is a simple modelling of the user contracts we have internally. They are like the model classes in code,
or like the normalised structures of a table.

From a data-pipeline perspective, with such a message flowing through the pipeline, the responsibility of a step can be the enrichment of one or more of these entities in an incoming message.

For example, there can be a stage where we only do geo enrichment over every message in the pipeline.

If there is a service built to answer that, the response format is already decided for it, and hence there is little chance of error.

Similarly, a different kind of system, like a key-value store, can be used to get a user's device object.
Overall, different types of systems can be built to solve different problems, with responsibilities that are not shared.
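As a sketch of what "contracts look like model classes" means, here is the same idea written as plain Java model classes; the field names are illustrative, not our actual contract.

```java
// Sketch only: the user contract expressed as plain model classes, mirroring the
// structure described above (Geo is a property of User, User is part of the final message).
public class Contracts {

    public static class Geo {
        public String country;
        public String state;
        public String city;
    }

    public static class Device {
        public String deviceId;
        public String manufacturer;
        public String osVersion;
    }

    public static class User {
        public String userId;
        public Geo geo;        // owned/enriched by the geo-enrichment stage
        public Device device;  // owned/enriched by the device-lookup stage
    }

    public static class FingerprintEvent {
        public long timestamp;
        public String channelMatched;
        public User user;      // the user struct is reused across other pipelines
    }
}
```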

Contracts Relationship Diagram

27


Tools for writing global contracts - The Thrift IDL

Strong typing of fields

Renaming of fields and types

Bindings for all major languages

Very fast SerDe for messages

28

One of the methods available to model these structures is Thrift's IDL (interface definition language).

Thrift is an Apache project, donated by Facebook to the Apache foundation.

Its main goal is to build RPCs with extensible message formats for requests and responses.

We have skipped the RPC bit of Thrift and only used the Thrift IDL for modelling our contracts.

The IDL allows for strong typing of fields, very fast serialization and deserialization of messages, and bindings for all major languages like Java, Python, and C++.
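A small sketch of what using the IDL looks like from Java, assuming a User class generated by the Thrift compiler from a hypothetical user.thrift contract; only libthrift and the generated code are needed.

```java
// Sketch only: serializing/deserializing a Thrift-generated class with libthrift's
// binary protocol. `User` stands for a class generated from a hypothetical user.thrift.
import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TBinaryProtocol;

public class ThriftSerDeExample {
    public static void main(String[] args) throws TException {
        User user = new User();           // generated from the IDL contract
        user.setUserId("u-123");          // setter name depends on the field name in the IDL

        TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
        byte[] bytes = serializer.serialize(user);   // compact, strongly typed payload

        TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory());
        User decoded = new User();
        deserializer.deserialize(decoded, bytes);    // any consumer with the contract can read this
    }
}
```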

29

Tool | Description | In-depth
Thrift | Complete RPC and serialization framework | 20+ target languages, and that number is still growing; tag-based association of fields - tags and field types are stored in the binary encoding
Avro | - | You need to have the exact same version of the schema that the writer of the data used
Protobuf | - | Tag-based association of fields - tags and field types are stored in the binary encoding

Thrift, Protobuf and Avro all support schema evolution


Long Term Support


No Data Cleaning Needed


Fewer chances of error

Cost and time

30

Time to recover/reprocess data...with no double processing

Forward and Backward Compatibility

31

- Producers and consumers can work on different versions of the contract and still work well together

- Forward compatibility - code built against an older version of the contract can still read and process data written with newer versions

- Backward compatibility - code built against a newer version of the contract can still read and process data written with older versions


Full Compatibility via Thrift

32

  • Ensure that the required fields are never changed or removed
  • The index tags are never changed for a field
  • This will lead to full compatibility between version x and x+1
  • Transitively, if this is followed for all versions...

All the data ever written will be fully compatible
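A sketch of what these rules buy you in practice, assuming hypothetical classes UserV1 and UserV2 generated from two successive versions of the same user.thrift, where the newer version only added an optional field and no tags were changed or removed.

```java
// Sketch only: data written with a newer contract version is readable by an older
// consumer, because field tags and types are stored in the binary encoding and
// unknown tags are simply skipped. UserV1/UserV2 are hypothetical generated classes.
import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TBinaryProtocol;

public class CompatibilityExample {
    public static void main(String[] args) throws TException {
        // A new producer writes data with the newer contract...
        UserV2 written = new UserV2();
        written.setUserId("u-123");
        written.setPreferredLanguage("en");   // the new optional field (hypothetical)
        byte[] bytes = new TSerializer(new TBinaryProtocol.Factory()).serialize(written);

        // ...and an older consumer, still compiled against V1, reads it just fine:
        // the unknown field tag is skipped during deserialization.
        UserV1 readByOldConsumer = new UserV1();
        new TDeserializer(new TBinaryProtocol.Factory()).deserialize(readByOldConsumer, bytes);
    }
}
```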


Take Away

33

One contract for a pipeline - easy to build

Each stage only takes responsibility for one component of the contract - linear pipelines

The building blocks - the structs - are re-used across other pipelines

34

Hence, there is very little conflict over when or where to enrich a field.

The pipelines built are linear.

Development ease

Predictability of where to add new enhancements, structurally

Predictability of who owns which part of the final struct

35

Covers both the IDL part as well as the implementation part

Naturally Tracking Data Lineage

Tracking data lineage is a solved problem (if all the previous things are implemented), as -

  • Messages are already structured
  • Responsibility for the ownership of a field is naturally captured in the Thrift IDL file

36

Covers both the IDL part as well as the implementation part

References

37

Questions

38
