1 of 76

RxJava, Ratpack, Couchbase

Laurent Doguin

Couchbase Developer Advocate

@ldoguin

2 of 76

$whoami

2

Laurent Doguin

Couchbase Developer Advocate

@ldoguin | laurent.doguin@couchbase.com

©2015 Couchbase Inc.

2

3 of 76

Ratpack ?

4 of 76

Not an actor/singer club

©2015 Couchbase Inc.

4

5 of 76

Netty based, fullstack, non-blocking, web framework

©2015 Couchbase Inc.

5

6 of 76

In a Nutshell…

  • Asynchronous
  • Non-Blocking
  • Build on top of Netty
  • Unopinonated
  • Think about NodeJS or Vert.X
  • Promises
  • Java 8
  • Top Notch Gradle integration
  • Strong Groovy Support
  • Great for I/O Bound applications

©2015 Couchbase Inc.

6

7 of 76

Sync, blocking V.S. Async, non-blocking

©2015 Couchbase Inc.

7

8 of 76

Handlers

  • Core of Ratpack
  • This is where the magic happens
  • RatpackServer.start(
    • server -> server.handlers(
        • chain -> chain.all(
          • ctx -> ctx.render("Hello World")
        • )
    • )
  • );

©2015 Couchbase Inc.

8

9 of 76

Plugins and Modules

  • GUICE based Modules system
  • Can be switched to others
    • Spring Boot support
  • Module can be as low level as DI Framework
  • Think of it like decorations

©2015 Couchbase Inc.

9

10 of 76

Why Ratpack?

  • If you spend to much time on I/O
  • Code in an async non-blocking with a derminist/synchronous execution style

©2015 Couchbase Inc.

10

11 of 76

RxJava

12 of 76

Why RxJava

  • Blocking is Evil
    • Async is good, Better when
    • Reactive
    • Parallelizable
    • Composable
    • Readable

©2015 Couchbase Inc.

12

13 of 76

But I can do this already

  • Callbacks?
    • No composition
    • Callback Hell
  • Futures<T>?
    • ‘lol’
    • Too easy to block
    • Hard to compose

©2015 Couchbase Inc.

13

14 of 76

©2015 Couchbase Inc.

14

15 of 76

RxJava ?

  • Netflix OpenSource
  • From Iterator – Iterable (Pull)
  • To Observable – Observer(Push)

  • Allow to Compose
  • Asynchronous code
  • Based on events
  • Using Observable Sequences

©2015 Couchbase Inc.

15

16 of 76

Why Reactive?

  • New challenges
    • React to user load
    • React to failure
    • Be responsive under load and failure

  • Need new solutions
    • Decoupled, event-driven architectures
    • Optimal resource utilization

16

©2015 Couchbase Inc.

16

17 of 76

Why Reactive?

17

©2015 Couchbase Inc.

17

18 of 76

RxJava 101

A Gentle Introduction

19 of 76

RxJava: Introduction

  • Java implementation for Reactive Extensions �https://github.com/ReactiveX

  • A library to compose asynchronous and event-driven programs through observable sequences.

19

single

multiple

sync

T

Iterable<T>

async

Future<T>

Observable<T>

©2015 Couchbase Inc.

19

20 of 76

RxJava: Introduction

  • Observables are the duals of Iterables
  • They describe both Latency and Error side effects.

20

event

Iterable<T> (pull)

Observable<T> (push)

data retrieval

T next()

onNext(T)

error discovery

throws Exception

onError(Exception)

completion

returns

onCompleted()

©2015 Couchbase Inc.

20

21 of 76

Consuming Observables

  • The Observer subscribes �and receives events.

  • A cold Observable�starts when subscribed.

  • onNext can be called�0..N times

21

©2015 Couchbase Inc.

21

22 of 76

RxJava: Creating Observables

22

just

©2015 Couchbase Inc.

22

23 of 76

RxJava: Creating Observables

23

©2015 Couchbase Inc.

23

24 of 76

RxJava: Creating Observables

24

©2015 Couchbase Inc.

24

25 of 76

RxJava: Creating Observables

25

©2015 Couchbase Inc.

25

26 of 76

RxJava: Creating Observables

26

©2015 Couchbase Inc.

26

27 of 76

RxJava: Creating Observables

27

©2015 Couchbase Inc.

27

28 of 76

RxJava: Transforming Observables

28

©2015 Couchbase Inc.

28

29 of 76

RxJava: Transforming Observables

29

©2015 Couchbase Inc.

29

30 of 76

RxJava: Transforming Observables

30

©2015 Couchbase Inc.

30

31 of 76

RxJava: Transforming Observables

31

©2015 Couchbase Inc.

31

32 of 76

RxJava: Transforming Observables

32

©2015 Couchbase Inc.

32

33 of 76

RxJava: Transforming Observables

33

©2015 Couchbase Inc.

33

34 of 76

RxJava: Transforming Observables

34

©2015 Couchbase Inc.

34

35 of 76

RxJava: Transforming Observables

35

©2015 Couchbase Inc.

35

36 of 76

RxJava: Transforming Observables

36

©2015 Couchbase Inc.

36

37 of 76

RxJava: Filtering Observables

37

©2015 Couchbase Inc.

37

38 of 76

RxJava: Filtering Observables

38

©2015 Couchbase Inc.

38

39 of 76

RxJava: Filtering Observables

39

©2015 Couchbase Inc.

39

40 of 76

RxJava: Filtering Observables

40

©2015 Couchbase Inc.

40

41 of 76

Brain Break

©2015 Couchbase Inc.

41

42 of 76

Example

Store, index and search files

43 of 76

The Application

  • Using Ratpack as web framework
  • Using Couchbase as database
  • User can Upload a file
  • User can list all the files
  • User can search files
    • Using N1QL
    • Using fulltext search

©2015 Couchbase Inc.

43

44 of 76

About Couchbase

©2015 Couchbase Inc.

44

45 of 76

Couchbase Server – Single Node Architecture

  • Data Service – builds and maintains local view indexes

  • Indexing Engine – builds and maintains Global Secondary Indexes

  • Query Engine – plans, coordinates, and executes queries against either Global or Local view indexes

  • Cluster Manager – configuration, heartbeat, statistics, RESTful Management interface

©2015 Couchbase Inc.

45

46 of 76

Application to Database Integration

©2015 Couchbase Inc.

46

47 of 76

Smart Connectivity with Built in Sharding and Replication

ACTIVE

ACTIVE

ACTIVE

REPLICA

REPLICA

REPLICA

Couchbase Server 1

Couchbase Server 2

Couchbase Server 3

ACTIVE

ACTIVE

REPLICA

REPLICA

Couchbase Server 4

Couchbase Server 5

SHARD

5

SHARD

2

SHARD

SHARD

SHARD

4

SHARD

SHARD

SHARD

1

SHARD

3

SHARD

SHARD

SHARD

4

SHARD

1

SHARD

8

SHARD

SHARD

SHARD

SHARD

6

SHARD

3

SHARD

2

SHARD

SHARD

SHARD

SHARD

7

SHARD

9

SHARD

5

SHARD

SHARD

SHARD

SHARD

7

SHARD

SHARD

6

SHARD

SHARD

8

SHARD

9

SHARD

READ/WRITE/UPDATE

©2015 Couchbase Inc.

47

48 of 76

Accessing Data From Couchbase

Key access using Document ID

  • Operations are extremely fast with consistent low latency
  • Reads and writes are evenly distributed across Data Service nodes
  • Data is cached in built-in Managed Caching layer and stored in persistent storage layer

Queries using N1QL

  • SQL-like : SELECT * FROM WHERE, LIKE, GROUP, etc.,
  • JOINs
  • Powerful Extensions (nest, unnest) for JSON to support nested and hierarchical data structures.
  • Multiple access paths – Views and global secondary indexes
  • ODBC/JDBC drivers available

Views using static queries

  • Pre-computed complex Map-Reduce queries
  • Incrementally updated to power analytics, reporting and dashboards
  • Strong for complex custom aggregations

©2015 Couchbase Inc.

48

49 of 76

Building Applications with Java SDK – Synchronous API

//connecting to the cluster via known node(s)

CouchbaseCluster cluster = CouchbaseCluster.create("192.168.1.101");

//opening a bucket, establishing resources

Bucket bucket = cluster.openBucket("customBucket", "password");

//creating JSON and a Document

JsonObject json = JsonObject.create().put("name", "John");

JsonDocument doc = JsonDocument.create("key1", json);

//storing the Document

Document inDb = bucket.insert(doc);

©2015 Couchbase Inc.

49

50 of 76

Building Applications with Java SDK

public List<Map<String, Object>> function(Bucket bucket) {

String query = "SELECT * FROM `" + bucket.name() + “`”;

N1qlQueryResult result = bucket.query(N1qlQuery.simple(query));

if (!result.finalSuccess()) {

throw new DataRetrievalFailureException("Query error: " + result.errors());

}

List<Map<String, Object>> content = new ArrayList<Map<String, Object>>();

for(N1qlQueryRow row : result) {

content.add(row.value().toMap());

}

return content;

}

©2015 Couchbase Inc.

50

51 of 76

Building Applications with Java SDK

public List<Map<String, Object>> function(Bucket bucket) {

String query = "SELECT * FROM `" + bucket.name() + “`”;

N1qlQueryResult result = bucket.query(N1qlQuery.simple(query));

if (!result.finalSuccess()) {

throw new DataRetrievalFailureException("Query error: " + result.errors());

}

List<Map<String, Object>> content = new ArrayList<Map<String, Object>>();

for(N1qlQueryRow row : result) {

content.add(row.value().toMap());

}

return content;

}

©2015 Couchbase Inc.

51

52 of 76

Building Applications with Java SDK

public List<Map<String, Object>> function(Bucket bucket) {

String query = "SELECT * FROM `" + bucket.name() + “`”;

N1qlQueryResult result = bucket.query(N1qlQuery.simple(query));

if (!result.finalSuccess()) {

throw new DataRetrievalFailureException("Query error: " + result.errors());

}

List<Map<String, Object>> content = new ArrayList<Map<String, Object>>();

for(N1qlQueryRow row : result) {

content.add(row.value().toMap());

}

return content;

}

©2015 Couchbase Inc.

52

53 of 76

Building Applications with Java SDK

public List<Map<String, Object>> function(Bucket bucket) {

String query = "SELECT * FROM `" + bucket.name() + “`”;

N1qlQueryResult result = bucket.query(N1qlQuery.simple(query));

if (!result.finalSuccess()) {

throw new DataRetrievalFailureException("Query error: " + result.errors());

}

List<Map<String, Object>> content = new ArrayList<Map<String, Object>>();

for(N1qlQueryRow row : result) {

content.add(row.value().toMap());

}

return content;

}

©2015 Couchbase Inc.

53

54 of 76

Building Applications with Java SDK

public List<Map<String, Object>> function(Bucket bucket) {

String query = "SELECT * FROM `" + bucket.name() + “`”;

N1qlQueryResult result = bucket.query(N1qlQuery.simple(query));

if (!result.finalSuccess()) {

throw new DataRetrievalFailureException("Query error: " + result.errors());

}

List<Map<String, Object>> content = new ArrayList<Map<String, Object>>();

for(N1qlQueryRow row : result) {

content.add(row.value().toMap());

}

return content;

}

©2015 Couchbase Inc.

54

55 of 76

Building Applications with Java SDK

public List<Map<String, Object>> function(Bucket bucket) {

String query = "SELECT * FROM `" + bucket.name() + “`”;

N1qlQueryResult result = bucket.query(N1qlQuery.simple(query));

if (!result.finalSuccess()) {

throw new DataRetrievalFailureException("Query error: " + result.errors());

}

List<Map<String, Object>> content = new ArrayList<Map<String, Object>>();

for(N1qlQueryRow row : result) {

content.add(row.value().toMap());

}

return content;

}

©2015 Couchbase Inc.

55

56 of 76

Building Applications with Java SDK

public List<Map<String, Object>> function(Bucket bucket) {

String query = "SELECT * FROM `" + bucket.name() + “`”;

N1qlQueryResult result = bucket.query(N1qlQuery.simple(query));

if (!result.finalSuccess()) {

throw new DataRetrievalFailureException("Query error: " + result.errors());

}

List<Map<String, Object>> content = new ArrayList<Map<String, Object>>();

for(N1qlQueryRow row : result) {

content.add(row.value().toMap());

}

return content;

}

©2015 Couchbase Inc.

56

57 of 76

Complex N1QL Query

public static List<Map<String, Object>> getAll(final Bucket bucket, String from, String to) {

String queryStr = "SELECT faa AS fromAirport, geo " +

"FROM `" + bucket.name() + "` r" +

"WHERE airportname = " + from +

"UNION SELECT faa AS toAirport, geo " +

"FROM `" + bucket.name() + "` r" +

"WHERE airportname = " + to;

ParameterizedN1qlQuery query = ParameterizedN1qlQuery.parameterized(queryStr,

JsonArray.create().add(from).add(to));

N1qlQueryResult queryResult = bucket.query(query);

return extractResultOrThrow(queryResult);

}

©2015 Couchbase Inc.

57

58 of 76

Building Applications with Java SDK - Asynchronous API

  • The Cluster and Bucket both have async versions, obtained by calling async() method.

  • Asynchronous API exposes RxJava Observables.

  • Very rich and expressive API in terms of combinations and transformations.

©2015 Couchbase Inc.

58

59 of 76

Building Applications with Java SDK - Asynchronous API

//retrieving a document and extracting data for output

bucket.async()

.get("key1")

.map(doc -> doc.content().getString("name"))

.subscribe(name -> System.out.println("Hello " + name))

©2015 Couchbase Inc.

59

60 of 76

Building Applications with Java SDK - Asynchronous API

  • Async API, exposing an Observable<JsonDocument>
  • Observable is a stream, can be connected to an Observer

//retrieving a document and extracting data for output

bucket.async()

.get("key1")

.map(doc -> doc.content().getString("name"))

.subscribe(name -> System.out.println("Hello " + name))

©2015 Couchbase Inc.

60

61 of 76

Building Applications with Java SDK - Asynchronous API

  • Simple transformation operator from RxJava, T->R
  • Gets a JsonDocument
  • Extract String name value
  • Gives Observable<String>

//retrieving a document and extracting data for output

bucket.async()

.get("key1")

.map(doc -> doc.content().getString("name"))

.subscribe(name -> System.out.println("Hello " + name))

©2015 Couchbase Inc.

61

62 of 76

Error Handling

with RxJava

63 of 76

Timeouts

©2015 Couchbase Inc.

63

64 of 76

Timeouts

©2015 Couchbase Inc.

64

65 of 76

Circuit Breakers

  • monitor traffic
  • open if errors happen
    • Latency
    • Throughput
    • Wrong results

  • close in a controlled�fashion
  • expose metrics

©2015 Couchbase Inc.

65

66 of 76

Backpressure

  • Allows for coordinated flow control under stress conditions

©2015 Couchbase Inc.

66

67 of 76

Preparing to Fail

  • Things will go wrong, so better plan for it
  • Do not trust integration points (including the SDK)
  • Synchronous retry & fallback is too damn hard

  • RxJava provides (almost) everything you need to
    • fallback
    • retry
    • fail fast

67

©2015 Couchbase Inc.

67

68 of 76

Timeouts

  • The network is unreliable
  • Servers fail
  • The SDK contains bugs

  • Always specify timeouts and deal with them!
  • The synchronous wrapper defines them for you all the time.

68

©2015 Couchbase Inc.

68

69 of 76

Timeouts: Simple

69

©2015 Couchbase Inc.

69

70 of 76

Timeouts: Synchronous API

70

©2015 Couchbase Inc.

70

71 of 76

Timeouts: Complex Example

71

©2015 Couchbase Inc.

71

72 of 76

Coordinated Retry

  • Fail fast
    • Don’t let your system get stuck
    • Shed load with backpressure

  • Retry
    • immediately won’t help
    • either linear or exponential back-off necessary
    • have a strategy if retry also doesn’t work (Fallbacks!)

72

©2015 Couchbase Inc.

72

73 of 76

Coordinated Fallback

73

©2015 Couchbase Inc.

73

74 of 76

Coordinated Retry: Builder

  • Declarative API instead of complicated retryWhen

74

©2015 Couchbase Inc.

74

75 of 76

Code

©2015 Couchbase Inc.

75

76 of 76

Where do you find us?

  • developer.couchbase.com
  • blog.couchbase.com
  • @couchbase or @couchbasedev
  • forums.couchbase.com
  • stackoverflow.com/questions/tagged/couchbase

©2015 Couchbase Inc.

76