RxJava, Ratpack, Couchbase
Laurent Doguin
Couchbase Developer Advocate
@ldoguin
$whoami
©2015 Couchbase Inc.
Ratpack?
Not an actor/singer club
Netty-based, full-stack, non-blocking web framework
In a Nutshell…
Sync, blocking vs. async, non-blocking
Handlers
Plugins and Modules
Why Ratpack?
RxJava
Why RxJava
But I can do this already
RxJava?
Why Reactive?
RxJava 101
A Gentle Introduction
RxJava: Introduction
      | single    | multiple      |
sync  | T         | Iterable<T>   |
async | Future<T> | Observable<T> |
event           | Iterable<T> (pull) | Observable<T> (push) |
data retrieval  | T next()           | onNext(T)            |
error discovery | throws Exception   | onError(Throwable)   |
completion      | returns            | onCompleted()        |
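The pull/push duality in the table can be sketched with plain JDK types. This is only an illustration: `MiniObserver` and `PullVsPush` are made-up names, not RxJava classes (RxJava's real types are `rx.Observer` and `rx.Observable`), and the push side here runs synchronously on the calling thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical mini-API for illustration only; not part of RxJava.
interface MiniObserver<T> {
    void onNext(T value);       // data retrieval
    void onError(Throwable e);  // error discovery
    void onCompleted();         // completion
}

class PullVsPush {
    // Pull: the consumer drives the loop and asks for each element.
    static List<String> pull(Iterable<String> source) {
        List<String> log = new ArrayList<>();
        Iterator<String> it = source.iterator();
        while (it.hasNext()) {
            log.add("next:" + it.next());
        }
        log.add("returned");
        return log;
    }

    // Push: the producer drives the loop and calls back into the observer.
    static <T> void push(Iterable<T> source, MiniObserver<T> observer) {
        try {
            for (T value : source) {
                observer.onNext(value);
            }
        } catch (RuntimeException e) {
            observer.onError(e);
            return; // onError is terminal: no onCompleted afterwards
        }
        observer.onCompleted();
    }

    // Pushes the source into an observer that records every callback it receives.
    static List<String> pushAndRecord(Iterable<String> source) {
        List<String> log = new ArrayList<>();
        push(source, new MiniObserver<String>() {
            @Override public void onNext(String value) { log.add("onNext:" + value); }
            @Override public void onError(Throwable e) { log.add("onError:" + e.getMessage()); }
            @Override public void onCompleted() { log.add("onCompleted"); }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(pull(Arrays.asList("a", "b")));
        System.out.println(pushAndRecord(Arrays.asList("a", "b")));
    }
}
```

In both cases the same three events occur (a value, an error, the end of the sequence); what changes is which side decides when they happen.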
Consuming Observables
RxJava: Creating Observables
just
RxJava: Transforming Observables
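The original slides showed the transforming operators as diagrams that are not reproduced here. As a rough stand-in, the sketch below uses `java.util.stream`, whose `map` and `flatMap` have the same shape as the RxJava operators of the same name when the source is synchronous. It is a JDK analogue, not RxJava code, and `TransformSketch` and its methods are made-up names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class TransformSketch {
    // map: exactly one output item per input item
    static List<Integer> lengths(List<String> words) {
        return words.stream()
                .map(String::length)
                .collect(Collectors.toList());
    }

    // flatMap: each input expands into zero or more items, flattened into one sequence
    static List<String> letters(List<String> words) {
        return words.stream()
                .flatMap(w -> w.chars().mapToObj(c -> String.valueOf((char) c)))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("rx", "java");
        System.out.println(lengths(words));
        System.out.println(letters(words));
    }
}
```

One difference worth keeping in mind: a stream's `flatMap` always preserves source order, whereas RxJava's `flatMap` may interleave items from asynchronous inner Observables (`concatMap` is the order-preserving variant).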
RxJava: Filtering Observables
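The filtering slides were also diagram-only. The sketch below is a JDK analogue built on `java.util.stream`, not RxJava code: `filter`, `distinct` and `skip` exist under the same names in RxJava, and the stream's `limit(n)` plays the role of RxJava's `take(n)`. `FilterSketch` and `pick` are made-up names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class FilterSketch {
    // Keeps even numbers, drops repeats, skips the first match, then takes two items.
    static List<Integer> pick(List<Integer> numbers) {
        return numbers.stream()
                .filter(n -> n % 2 == 0) // keep items matching the predicate
                .distinct()              // drop items already seen
                .skip(1)                 // ignore the first remaining item
                .limit(2)                // stop after two items, like take(2)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(pick(Arrays.asList(1, 2, 2, 4, 6, 8, 3)));
    }
}
```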
Brain Break
Example
Store, index and search files
The Application
About Couchbase
Couchbase Server – Single Node Architecture
Application to Database Integration
Smart Connectivity with Built-in Sharding and Replication
[Diagram: five nodes (Couchbase Server 1 to 5), each holding ACTIVE and REPLICA shards; labelled READ/WRITE/UPDATE]
Accessing Data From Couchbase
Key access using Document ID
Queries using N1QL
Views using static queries
Building Applications with Java SDK – Synchronous API
//connecting to the cluster via known node(s)
CouchbaseCluster cluster = CouchbaseCluster.create("192.168.1.101");
//opening a bucket, establishing resources
Bucket bucket = cluster.openBucket("customBucket", "password");
//creating JSON and a Document
JsonObject json = JsonObject.create().put("name", "John");
JsonDocument doc = JsonDocument.create("key1", json);
//storing the Document
JsonDocument inDb = bucket.insert(doc);
Building Applications with Java SDK
public List<Map<String, Object>> function(Bucket bucket) {
    // the bucket name is wrapped in backticks so N1QL treats it as an identifier
    String query = "SELECT * FROM `" + bucket.name() + "`";
    N1qlQueryResult result = bucket.query(N1qlQuery.simple(query));
    if (!result.finalSuccess()) {
        throw new DataRetrievalFailureException("Query error: " + result.errors());
    }
    List<Map<String, Object>> content = new ArrayList<Map<String, Object>>();
    for (N1qlQueryRow row : result) {
        content.add(row.value().toMap());
    }
    return content;
}
Complex N1QL Query
public static List<Map<String, Object>> getAll(final Bucket bucket, String from, String to) {
    // $1 and $2 are positional placeholders, bound from the JsonArray below
    String queryStr = "SELECT faa AS fromAirport, geo " +
        "FROM `" + bucket.name() + "` r " +
        "WHERE airportname = $1 " +
        "UNION SELECT faa AS toAirport, geo " +
        "FROM `" + bucket.name() + "` r " +
        "WHERE airportname = $2";
    ParameterizedN1qlQuery query = ParameterizedN1qlQuery.parameterized(queryStr,
        JsonArray.create().add(from).add(to));
    N1qlQueryResult queryResult = bucket.query(query);
    return extractResultOrThrow(queryResult);
}
Building Applications with Java SDK - Asynchronous API
//retrieving a document and extracting data for output
bucket.async()
    .get("key1")
    .map(doc -> doc.content().getString("name"))
    .subscribe(name -> System.out.println("Hello " + name));
Error Handling
with RxJava
Timeouts
Circuit Breakers
Backpressure
Preparing to Fail
Timeouts
Timeouts: Simple
Timeouts: Synchronous API
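The code from this slide is not part of the extracted text. As a rough idea of what a timeout on a blocking call looks like, here is a sketch using only the JDK's `CompletableFuture`; it is not the Couchbase SDK API, and `TimeoutSketch`, `getOrFallback` and the fallback value are assumptions made for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimeoutSketch {
    // Blocks for at most timeoutMillis; returns the fallback if the deadline passes first.
    static String getOrFallback(CompletableFuture<String> pending,
                                long timeoutMillis, String fallback) {
        try {
            return pending.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // complete the future as cancelled so dependent stages do not run later
            pending.cancel(true);
            return fallback;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return fallback;
        } catch (ExecutionException e) {
            throw new IllegalStateException("operation failed", e.getCause());
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> never = new CompletableFuture<>(); // never completed
        System.out.println(getOrFallback(never, 50, "timed out"));
        System.out.println(getOrFallback(CompletableFuture.completedFuture("John"), 50, "timed out"));
    }
}
```

The point of the sketch is that the caller, not the remote side, decides how long it is willing to wait, and that a timeout is handled as an ordinary outcome rather than left to hang.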
Timeouts: Complex Example
Coordinated Retry
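The slide's own code is missing from the extracted text, so the following is only a generic illustration of retrying with a growing delay, written against the plain JDK. It is not the RxJava `retry`/`retryWhen` operators and not the Couchbase SDK's retry helper; `RetrySketch`, `retry` and `demo` are hypothetical names, and doubling the delay is an assumed policy.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class RetrySketch {
    // Runs the task up to maxAttempts times, doubling the pause after each failure.
    static <T> T retry(Supplier<T> task, int maxAttempts, long initialDelayMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        long delay = initialDelayMillis;
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return task.get();
            } catch (RuntimeException e) {
                last = e;
                if (attempt < maxAttempts) {
                    pause(delay);
                    delay *= 2; // exponential backoff
                }
            }
        }
        throw last; // every attempt failed: surface the most recent error
    }

    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while backing off", e);
        }
    }

    // Demo task: fails on the first two calls, succeeds on the third.
    static String demo() {
        AtomicInteger calls = new AtomicInteger();
        return retry(() -> {
            if (calls.incrementAndGet() < 3) {
                throw new IllegalStateException("temporary failure");
            }
            return "ok after " + calls.get() + " calls";
        }, 5, 1);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Bounding the number of attempts and spacing them out keeps a struggling server from being hit with an immediate burst of repeated requests.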
Coordinated Fallback
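The slide's code is not in the extracted text either. A fallback replaces a failed result with an alternative instead of propagating the error; RxJava offers `onErrorReturn` and `onErrorResumeNext` for this. The sketch below shows the same idea with the JDK's `CompletableFuture.exceptionally`; `FallbackSketch`, `withFallback` and the "replica" wording are assumptions made for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

class FallbackSketch {
    // Runs primary asynchronously; if it fails, the fallback supplies the value instead.
    static <T> CompletableFuture<T> withFallback(Supplier<T> primary, Supplier<T> fallback) {
        return CompletableFuture.supplyAsync(primary)
                .exceptionally(error -> fallback.get());
    }

    // Demo: the primary source always fails, so the fallback value is returned.
    static String demo() {
        return FallbackSketch.<String>withFallback(
                () -> { throw new IllegalStateException("primary unavailable"); },
                () -> "value from replica"
        ).join();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

When the primary succeeds, the fallback supplier is never invoked, so it can safely be something more expensive or less fresh.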
Coordinated Retry: Builder
Code
Where do you find us?