1 of 77

サーバーサイドでの非同期処理で色々やったよ

Koji Lin, LINE Fukuoka

@kojilin

2 of 77

自己紹介

  • LINE Fukuoka Corp
    • Java でサーバーサイド開発
  • Taiwan Java User Group メンバー
    • https://www.meetup.com/taiwanjug/

3 of 77

4 of 77

全てがシンプルだった

List<Item> getRanking(String country) {

String rankingType = dao.getRankingType("JP");

return dao.getRanking(rankingType).stream()

.map(item -> ...)

.collect(toList());

}

5 of 77

6 of 77

複雑化していくシステム

  • パフォーマンスや機能の追加/複雑さを軽減するため、いろんなサービス/ミドルウェア/チームに分ける

List<Item> getRanking(String country) {

String rankingType a api.getRankingType("JP");

List<String> rankingIds =

searchClient.getRanking(rankingType);

return rankingIds.stream()

.map(dao::getItem)

.map(...)

.collect(toList());

}

7 of 77

Latency が上がっていく

  • いろんな API やミドルウェア等のアクセスが増え、各リモートのラウンドトリップの積み重ね
  • リクエストスレッドがブロックされて、軽いリクエストも影響を受ける

8 of 77

何故サーバーサイドで非同期?

  • 並行できるタスクも自然に順序に書いてしまう
  • 1 スレッド 1 リクエスト
    • 同時処理数 <= 最高スレッド数
    • 重い処理が軽い処理をブロック
    • CPU とメモリの無駄遣い
      • デフォルトの -Xss は 1024KB
      • コンテキストスイッチのコスト

9 of 77

同期なコードを非同期に書き直す

  • フレームワークの変更
  • 戻り値の型は Guava ListenableFuture を選択
    • CompletableFuture に対応するライブラリがまだ少ない
    • Futures#transform で非同期が組み合わせれる
    • Dagger で非同期 DI が利用できる
  • ストレージアクセス
  • リモート API アクセス

10 of 77

フレームワークを社内製品へ

  • RESTful と Thrift RPC のエンドポイントを提供
    • Spring Web/Spark と Facebook Nifty を使っていた
  • 社内製でオープンソースの Armeria に移行
    • https://github.com/line/armeria
    • Netty ベース HTTP/2 対応の非同期 RPC/REST library
  • 実際 Nifty + swift で非同期も可能です
    • https://github.com/facebook/swift
    • その swift ではない !

11 of 77

REST Controller も非同期へ

  • 全てのフレームワークを Spring Web
  • 一部同期の Controller から非同期
    • Spring Web の DeferredResult<T>

@RequestMapping("/hello")

public DefferredResult<String> hello() {

DeferredResult<String> deferredResult = new

DeferredResult<>();

... // callback で deferredResult.setResult("hello");

return deferredResult;

}

12 of 77

Thrift とは?

  • RPC フレームワーク
  • .thrift の IDL を定義
  • Thrift Compiler で対応の言語のコードを生成
  • ロジックを入れて、サポートしてるライブラリ上でデプロイすれば良い

service HelloService{

string hello(1: string name)

}

13 of 77

同期の Iface から非同期の AsyncIface へ

@Override

public String hello(String name) {

return "Hello, " + name + '!';

}

@Override

public void hello(String name,

AsyncMethodCallback<String> resultHandler) {

resultHandler.onComplete("Hello, " + name + '!');

}

14 of 77

ストレージアクセス

  • MySQL
    • MyBatis
    • Guava ListeningExecutorService と組み合わせる
      • Async JDBC 自体がまだ無い
  • MongoDB
    • MongoDB Asynchronous Java Driver に変更

15 of 77

MyBatis の API

<E> List<E> selectList(String statement,

Object parameter);

16 of 77

JDBC のアクセスを非同期に

ListeningExecutorService executor;

public <E> ListenableFuture<List<E>> selectList(

String statement, Object parameter) {

return executor.submit(() ->

delegate.selectList(statement, parameter));

}

17 of 77

MongoDB のアクセスを非同期に

private MongoCollection<Model> collection;

public void list(String id, int offset, int limit,

SingleResultCallback<Model> callback) {

collection.find(eq(ID, id))

.skip(offset)

.limit(limit).into(list, callback));

}

18 of 77

MongoDB を ListenableFuture に

public ListenableFuture<List<Model>> list(

String id, int offset, int limit) {

SettableFuture<List<Model>> future =

SettableFuture.create();

collection.find(eq(ID, id))

.skip(offset)

.limit(limit).into(list, (result, t) -> {

if (t != null) { future.setException(t); }

else { future.set(result); }

});

return future;

}

19 of 77

リモート API アクセス

  • Apache HttpComponents から Armeria の HttpClient へ
    • Apache HttpComponents にも Async Client がある
  • REST API が多すぎるので、Retrofit を利用して、ネットワーク層は Armeria の HttpClient

20 of 77

Retrofit と併用

  • Retrofit で API を Java コードへマッピングする�

public interface GitHubService {

@GET("users/{user}/repos")

Call<List<Repo>> listRepos(@Path("user") String user);

}

Retrofit retrofit = new Retrofit.Builder()

.baseUrl("https://api.github.com/")

.build();

GitHubService service = � retrofit.create(GitHubService.class);

21 of 77

Retrofit と GuavaCallAdapterFactory

  • Retrofit は戻り値の型を拡張できる�

public interface GitHubService {

@GET("users/{user}/repos")

ListenableFuture<List<Repo>> listRepos(

@Path("user") String user);

}

Retrofit retrofit = new Retrofit.Builder()

.baseUrl("https://api.github.com/")

.addCallAdapterFactory(

GuavaCallAdapterFactory.create())

.build();

22 of 77

IO 関連のアクセスを全て非同期化

23 of 77

Guava の transform で組み合わせ

ListenableFuture<String> result =

FuturesExtra.syncTransform(

dao.getUser("id"),

user -> user.getName());

ListenableFuture<Image> result =

Futures.transformAsync(

dao.getUser("id"),

user -> apiClient.getIcon(user));

24 of 77

Concurrent 化

  • Zipkin でアクセスチェック

25 of 77

移行で大変だったとこ

  • 同期から非同期の慣れ、特にコードがどっちも併存している時、Future#get に逃げやすい
    • 非同期で event-loop、thread-pool を使うので、小さなブロックもパフォーマンスに影響が出る
    • リクエストからレスポンスまでの完全な非同期コードを準備する
    • コードレビューを頑張る

26 of 77

移行で大変だったとこ

  • 次の非同期タスクの発火スレッドは?
    • リクエスト関連の情報をフレームワークのイベントループ ThreadLocal を多用している
    • デフォルトでは今のスレッドか前のタスクのスレッドを使う
    • transform メソッドで executor を設定する

Futures.transformAsync(

dao.getUser("id"),

user -> apiClient.getIcon(user),

executor);

27 of 77

その他

  • Spotify の Futures-extra を多用
    • https://github.com/spotify/futures-extra
    • Guava 19 で transform メソッドのオーバロディングでコンパイルウォーニングがめんどくさい
  • AsyncRetrier と ConcurrencyLimiter も便利

28 of 77

AsyncRetrier

int retryCount = 3;

int delayMillis = 100;

AsyncRetrier retrier = AsyncRetrier.create(

Executors.newSingleThreadScheduledExecutor());

ListenableFuture<List<String>> listFuture =

retrier.retry(() -> api.listByRanking("JP"),

retryCount,

delayMillis);

29 of 77

ConcurrencyLimiter

  • 非同期化で一気にリモートアクセスが一杯流せてリソースを喰いつくすのを防ぐ

int maxConcurrentCount = 100;

int maxQueueSize = 1000;

ConcurrencyLimiter<List<String>> concurrencyLimiter =

ConcurrencyLimiter.create(maxConcurrentCount,

maxQueueSize);

ListenableFuture<List<String>> listFuture =

concurrencyLimiter.add(() ->

dao.listByRanking("JP"));

30 of 77

これが複雑になっていくと!?

Futures.transformAsync(

Futures.transformAsync(getRankingType("JP"),

dao::listByRanking,

executor),

ids -> Futures.allAsList(ids.stream()

.map(client::getUserById)

.collect(toList())),

executor);

31 of 77

これが複雑になっていくと!?

Futures.transformAsync(

Futures.transformAsync(getRankingType("JP"),

dao::listByRanking,

executor),

ids -> Futures.allAsList(ids.stream()

.map(client::getUserById)

.collect(toList())),

executor);

32 of 77

これが複雑になっていくと!?

Futures.transformAsync(

Futures.transformAsync(getRankingType("JP"),

dao::listByRanking,

executor),

ids -> Futures.allAsList(ids.stream()

.map(client::getUserById)

.collect(toList())),

executor);

33 of 77

これが複雑になっていくと!?

Futures.transformAsync(

Futures.transformAsync(getRankingType("JP"),

dao::listByRanking,

executor),

ids -> Futures.allAsList(ids.stream()

.map(client::getUserById)

.collect(toList())),

executor);

34 of 77

Dagger Producers で複雑さを軽減

  • Dagger
    • コンパイル時依存性を解決する DI フレームワーク
  • Dagger Producers
    • 非同期な DI を実現
    • メソッドのリターンタイプを ListenableFuture<T> にして、受取メソッドのパラメータを T にすると Dagger がよしなに組み合わせてくれる

35 of 77

@ProducerModule

public static class RankingGraph {

@ProductionComponent(modules = { RankingGraph.class, ExecutorModule.class })

interface Component {

ListenableFuture<List<Item>> getRanking();

}

public RankingGraph(Service service, String country) {

...

}

@Produces

public ListenableFuture<String> getRankingType() {

return service.getRankingType(country);

}

@Produces

public ListenableFuture<List<String>> listByRanking(String type) {

return service.listByRanking(type);

}

@Produces

public ListenableFuture<List<Item>> listUsers(List<String> ids) {

return Futures.allAsList(ids.stream().map(service::getItemById).collect(toList()));

}

}

36 of 77

@ProducerModule

public static class RankingGraph {

@ProductionComponent(modules = { RankingGraph.class, ExecutorModule.class })

interface Component {

ListenableFuture<List<Item>> getRanking();

}

public RankingGraph(Service service, String country) {

...

}

@Produces

public ListenableFuture<String> getRankingType() {

return service.getRankingType(country);

}

@Produces

public ListenableFuture<List<String>> listByRanking(String type) {

return service.listByRanking(type);

}

@Produces

public ListenableFuture<List<Item>> listUsers(List<String> ids) {

return Futures.allAsList(ids.stream().map(service::getItemById).collect(toList()));

}

}

37 of 77

@ProducerModule

public static class RankingGraph {

@ProductionComponent(modules = { RankingGraph.class, ExecutorModule.class })

interface Component {

ListenableFuture<List<Item>> getRanking();

}

public RankingGraph(Service service, String country) {

...

}

@Produces

public ListenableFuture<String> getRankingType() {

return service.getRankingType(country);

}

@Produces

public ListenableFuture<List<String>> listByRanking(String type) {

return service.listByRanking(type);

}

@Produces

public ListenableFuture<List<Item>> listUsers(List<String> ids) {

return Futures.allAsList(ids.stream().map(service::getItemById).collect(toList()));

}

}

38 of 77

@ProducerModule

public static class RankingGraph {

@ProductionComponent(modules = { RankingGraph.class, ExecutorModule.class })

interface Component {

ListenableFuture<List<Item>> getRanking();

}

public RankingGraph(Service service, String country) {

...

}

@Produces

public ListenableFuture<String> getRankingType() {

return service.getRankingType(country);

}

@Produces

public ListenableFuture<List<String>> listByRanking(String type) {

return service.listByRanking(type);

}

@Produces

public ListenableFuture<List<Item>> listUsers(List<String> ids) {

return Futures.allAsList(ids.stream().map(service::getItemById).collect(toList()));

}

}

39 of 77

Dagger が生成したコード

40 of 77

41 of 77

呼び出しコード

ListenableFuture<List<Item>> result =

DaggerRankingGraph_Component

.builder()

.rankingGraph(new RankingGraph(service, "JP"))

.build()

.getRanking();

42 of 77

Dagger を使ったメリット

  • transformAsync 等でのネストが減った
  • 発火スレッドが全て ExecutorModule で指定した executor で始まる
  • Convention 化し易い
  • メソッド毎にバラバラで書いても、コンパイルタイムで揃ってるか検査してくれる
  • 実際 Guava ドキュメントも勧めてる(?)

43 of 77

これで一通り完成

44 of 77

Thread 数の減少

45 of 77

Latency の改善

46 of 77

Latency の改善

47 of 77

CPU 利用率の改善

48 of 77

CompletableFutureが増えてきた

  • CompletableFuture は Java 8 で追加されてるので、�ライブラリ開発者は優先に使い始める
  • けどコードベースはほとんど Guava ListenableFuture
  • Dagger もがんがん使っている
  • Spotify の Future-extra の CompletableFutureExtra で ListenableFuture に変換

49 of 77

なぜ RxJava2 に移行した?

  • ListenableFuture だけで組み合わせが書きやすくない
    • Guava 23 で FluentFuture がある
  • Dagger Producers
    • Module の再利用が大変だった
    • 微妙に読みやすくない
  • RxJava2 がそろそろ安定してそうだった

50 of 77

RxJava2

  • Java VM implementation of Reactive Extensions
  • A library for composing asynchronous and event-based programs by using observable sequences.

51 of 77

RxJava2

  • Single<T>
    • 1 個のデータ
    • CompletableFuture<T> で中身は絶対 null ではない
  • Maybe<T>
    • 空っぽか1個のデータ
    • CompletableFuture<T> で中身は null かも知れない
  • Completable
    • 空っぽ
    • CompletableFuture<Void>

52 of 77

RxJava2

  • Observable<T>
    • 0 から n 個のデータ
    • backpressure なし
    • 基本サーバサイドでは使わない
  • Flowable<T>
    • 0 から n 個のデータ
    • backpressure あり

53 of 77

RxJava2 でサーバサイド

  • サーバでリモートアクセスは基本 1 リクエスト/1レスポンスなので、Single<T>、Maybe<T> と Completable で API を設計できる

54 of 77

RxJava2 で設計した API

class UserDao {

public Single<User> get(String id){...}

public Maybe<User> find(String id){...}

public Completable delete(String id){...}

public Flowable<User> listAll(){...}

public Single<List<User>> listAllSingle(){...}

}

55 of 77

JDBC のアクセスを RxJava2 に

ListeningExecutorService asyncExecutor;

public <E> Single<List<E>> selectListRx(

String statement, Object parameter) {

return toSingle(asyncExecutor.submit(() ->

delegate.selectList(statement, parameter)));

}

56 of 77

MongoDB を RxJava2 に

Public Single<List<Model>> listRx(

String id, int offset, int limit) {

SettableFuture<List<Model>> future =

SettableFuture.create();

collection.find(eq(ID, id))

.skip(offset)

.limit(limit).into(list, (result, t) -> {

if (t != null) { future.setException(t); }

else { future.set(result); }

});

return toSingle(future);

}

57 of 77

MongoDB を RxJava2 に

  • 実際 MongoDB には reactive extension 対応の Driver があるので、それを RxJava2 化すればいい

Public Single<List<Model>> listRx(

String id, int offset, int limit) {

return Flowable.fromPublisher(

collection.find(eq(ID, id))

.skip(offset)

.limit(limit))

.toList();

}

58 of 77

Retrofit と RxJava2CallAdapterFactory

  • Retrofit の戻り値の型をRxJava2 に�

public interface GitHubService {

@GET("users/{user}/repos")

Single<List<Repo>> listRepos(

@Path("user") String user);

}

Retrofit retrofit = new Retrofit.Builder()

.baseUrl("https://api.github.com/")

.addCallAdapterFactory(

RxJava2CallAdapterFactory.create())

.build();

59 of 77

非同期の組み合わせ

Single<List<User>> ranking =

api.getRankingType("JP")

// Single<List<String>>

.flatMapSingle(this::listByRanking)

// Flowable<String>

.flatterAsFlowable(list -> list)

.concatMapEager(this::getUserById)

.toList();

60 of 77

@ProducerModule

public static class RankingGraph {

@ProductionComponent(modules = { RankingGraph.class, ExecutorModule.class })

interface Component {

ListenableFuture<List<Item>> getRanking();

}

public RankingGraph(Service service, String country) {

...

}

@Produces

public ListenableFuture<String> getRankingType() {

return service.getRankingType(country);

}

@Produces

public ListenableFuture<List<String>> listByRanking(String type) {

return service.listByRanking(type);

}

@Produces

public ListenableFuture<List<Item>> listUsers(List<String> ids) {

return Futures.allAsList(ids.stream().map(service::getItemById).collect(toList()));

}

}

ListenableFuture<List<Item>> result =

DaggerRankingGraph_Component

.builder()

.rankingGraph(new RankingGraph(service, "JP"))

.build()

.getRanking();

61 of 77

リトライ

api.listByRanking("JP")

.retry(10)

.flatMap({

...

})... //

62 of 77

リトライ

api.listByRanking("JP")

.retryWhen(throwableFlowable -> {

return throwableFlowable.flatMap(thrown -> {

if (thrown instanceof IOException) {

return Flowable.timer(1000,

MILLISECONDS);

}

return Flowable.error(thrown);

}).flatMap({

...

})... //

63 of 77

移行で大変だったとこ

  • ListenableFuture と Dagger で慣れ始めたのに、またかよ...
    • RxJava2 は Stream API な感じでつなげていけるので、できれば同じような感覚で開発してもらいたい
    • Project Reactor とかもあるし、似てるようなプログラミング手法がでてくる

64 of 77

移行で大変だったとこ

  • RxJava は null を容赦しない !
    • Flowable/Single/Maybe に null を入れたら、NPE !

Single.just("koji")

.map(id -> null) // NPE !!!

...//

65 of 77

移行で大変だったとこ

  • Eager vs Lazy
    • Future<User> getUser(String id)
      • 呼んだ瞬間に発火
    • Single<User> getUser(String id)
      • 戻り値に subscribe した時に発火
      • でも RxJava2 と Future 変換があるので、実はそうでもない

66 of 77

移行で大変だったとこ

  • subscribe は絶対 Spring web controller と thrift handler で呼ぶ
    • 他の層での subscribeOn は基本出現しない方向

67 of 77

移行で大変だったとこ

  • Flowable の flatMap vs concatEagerMap
    • 順番守りたいなら concatEagerMap

http://www.nurkiewicz.com/2017/08/flatmap-vs-concatmap-vs-concatmapeager.html

Flowable.just("koji", "kishida", "tempo")

.flatMapSingle(id -> api.fetchUser(id))

...//

Flowable.just("koji", "kishida", "tempo")

.concatMapEager(id ->

api.fetchUser(id).toFlowable())

...//

68 of 77

移行で大変だったとこ

  • 複数回 subscribe 問題

Single<User> user = client.getUser("1234");

Single<Profile> first = user.map(...)...;

Single<Company> second = user.map(...)...;

Single.zip(first, second, (profile, address) -> {

...

})...;

69 of 77

移行で大変だったとこ

  • 複数回 subscribe 問題

Single<User> user = client.getUser("1234").cache();

Single<Profile> first = user.map(...)...;

Single<Company> second = user.map(...)...;

Single.zip(first, second, (profile, address) -> {

...

})...;

70 of 77

移行で大変だったとこ

  • 同じく発火スレッド問題

api.getRankingType("JP")

.flatMap(type -> getRanking(type)) // どの thread?

.flatMap(ids -> xxx) // どの thread?

.map(...)

.... // 色々

71 of 77

移行で大変だったとこ

  • 次に非同期がある前に必ず observeOn

api.getRankingType("1234")

.ovserveOn(Schedulers.from(executor))

.flatMap(type -> getRanking(type))

.ovserveOn(Schedulers.from(executor))

.flatMap(ids -> xxx)

.map(...) // map 且つ重くないならいらない

.ovserveOn(Schedulers.from(executor))

  .flatMap(...)

.... // 色々

72 of 77

その他

  • RxJava2 と Java8 CompletableFuture の変換ライブラリ
    • akarnokd/RxJava2Jdk8Interop
  • Debugging と色々な便利 operators/transformers
    • akarnokd/RxJava2Extensions

73 of 77

例えば filterAsync

Flowable.just(...)

.compose(FlowableTransformers.filterAsync(

user -> {

return dao.isAvailable(user);

}))

.map(...)

...

74 of 77

これから...

  • Java には extension method みたいなものがないので、 filterAsync みたいな compose でするしかない
  • 複雑な組み合わせ以外は、async/await みたいなものがほしい
  • ……..kotlin かな?

75 of 77

資料とか

76 of 77

  • Reactive Programming with RxJava

  • Going Reactive with Spring 5 & Project Reactor
    • Devoxx youtube: https://youtu.be/yAXgkSlrmBA

77 of 77

Q&A