What We Tried with Asynchronous Processing on the Server Side
Koji Lin, LINE Fukuoka
@kojilin
About me
Everything was simple
List<Item> getRanking(String country) {
String rankingType = dao.getRankingType("JP");
return dao.getRanking(rankingType).stream()
.map(item -> ...)
.collect(toList());
}
The system grows more complex
List<Item> getRanking(String country) {
String rankingType = api.getRankingType("JP");
List<String> rankingIds =
searchClient.getRanking(rankingType);
return rankingIds.stream()
.map(dao::getItem)
.map(...)
.collect(toList());
}
Latency keeps rising
Why go asynchronous on the server side?
Rewriting synchronous code as asynchronous code
Switching the framework to an in-house product
Making REST controllers asynchronous too
@RequestMapping("/hello")
public DeferredResult<String> hello() {
    DeferredResult<String> deferredResult = new DeferredResult<>();
    ... // in a callback: deferredResult.setResult("hello");
    return deferredResult;
}
What is Thrift?
service HelloService{
string hello(1: string name)
}
From the synchronous Iface to the asynchronous AsyncIface
@Override
public String hello(String name) {
return "Hello, " + name + '!';
}
@Override
public void hello(String name,
AsyncMethodCallback<String> resultHandler) {
resultHandler.onComplete("Hello, " + name + '!');
}
Storage access
The MyBatis API
<E> List<E> selectList(String statement,
Object parameter);
Making JDBC access asynchronous
ListeningExecutorService executor;
public <E> ListenableFuture<List<E>> selectList(
String statement, Object parameter) {
return executor.submit(() ->
delegate.selectList(statement, parameter));
}
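The wrapper above moves a blocking call onto a dedicated executor. A minimal sketch of the same idea using only the JDK, with `CompletableFuture` swapped in for Guava's `ListenableFuture`; `BlockingDao` and `AsyncDaoSketch` are hypothetical names used for illustration and are not part of MyBatis:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for a blocking DAO such as a MyBatis session.
interface BlockingDao {
    List<String> selectList(String statement, Object parameter);
}

final class AsyncDaoSketch {
    private final BlockingDao delegate;
    private final ExecutorService executor;

    AsyncDaoSketch(BlockingDao delegate, ExecutorService executor) {
        this.delegate = delegate;
        this.executor = executor;
    }

    // The blocking call still occupies a thread, but one from a dedicated
    // pool instead of the caller's thread.
    CompletableFuture<List<String>> selectList(String statement, Object parameter) {
        return CompletableFuture.supplyAsync(
                () -> delegate.selectList(statement, parameter), executor);
    }

    static List<String> demo() {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            // Fake DAO that echoes its arguments instead of hitting a database.
            BlockingDao dao = (statement, parameter) ->
                    Arrays.asList(statement + ":" + parameter);
            return new AsyncDaoSketch(dao, pool).selectList("getRanking", "JP").join();
        } finally {
            pool.shutdown();
        }
    }
}
```

The pool size bounds how many JDBC calls run at once, so it probably needs to be tuned together with the connection pool.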
Making MongoDB access asynchronous
private MongoCollection<Model> collection;
public void list(String id, int offset, int limit,
        SingleResultCallback<List<Model>> callback) {
    collection.find(eq(ID, id))
        .skip(offset)
        .limit(limit)
        .into(new ArrayList<Model>(), callback);
}
Wrapping MongoDB in a ListenableFuture
public ListenableFuture<List<Model>> list(
String id, int offset, int limit) {
SettableFuture<List<Model>> future =
SettableFuture.create();
collection.find(eq(ID, id))
.skip(offset)
.limit(limit).into(new ArrayList<Model>(), (result, t) -> {
if (t != null) { future.setException(t); }
else { future.set(result); }
});
return future;
}
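The pattern above bridges a callback-style API into a future: complete the future from inside the callback. A minimal JDK-only sketch of that bridge, with `CompletableFuture` standing in for `SettableFuture`; `listWithCallback` is a hypothetical callback API invented for this example, not the MongoDB driver:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

final class CallbackBridgeSketch {
    // Hypothetical callback-style API: calls back with (result, error),
    // where exactly one of the two is non-null.
    static void listWithCallback(String id, BiConsumer<List<String>, Throwable> callback) {
        if (id.isEmpty()) {
            callback.accept(null, new IllegalArgumentException("empty id"));
        } else {
            callback.accept(Arrays.asList(id + "-1", id + "-2"), null);
        }
    }

    // Bridge: the future is completed from inside the callback.
    static CompletableFuture<List<String>> list(String id) {
        CompletableFuture<List<String>> future = new CompletableFuture<>();
        listWithCallback(id, (result, t) -> {
            if (t != null) {
                future.completeExceptionally(t);
            } else {
                future.complete(result);
            }
        });
        return future;
    }
}
```

Callers then only see a future, so the callback style does not leak into the rest of the code.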
Remote API access
Using it together with Retrofit
public interface GitHubService {
@GET("users/{user}/repos")
Call<List<Repo>> listRepos(@Path("user") String user);
}
Retrofit retrofit = new Retrofit.Builder()
.baseUrl("https://api.github.com/")
.build();
GitHubService service = retrofit.create(GitHubService.class);
Retrofit with GuavaCallAdapterFactory
public interface GitHubService {
@GET("users/{user}/repos")
ListenableFuture<List<Repo>> listRepos(
@Path("user") String user);
}
Retrofit retrofit = new Retrofit.Builder()
.baseUrl("https://api.github.com/")
.addCallAdapterFactory(
GuavaCallAdapterFactory.create())
.build();
Making all I/O access asynchronous
Composing with Guava's transform
ListenableFuture<String> result =
FuturesExtra.syncTransform(
dao.getUser("id"),
user -> user.getName());
ListenableFuture<Image> result =
Futures.transformAsync(
dao.getUser("id"),
user -> apiClient.getIcon(user));
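The two snippets above differ in whether the mapping step itself is synchronous or returns another future. A rough JDK-only analogy, assuming `CompletableFuture` in place of `ListenableFuture`: `thenApply` plays the role of the synchronous transform and `thenCompose` the role of `transformAsync`. `getUser` and `getIcon` are hypothetical stubs:

```java
import java.util.concurrent.CompletableFuture;

final class TransformSketch {
    // Hypothetical asynchronous lookups.
    static CompletableFuture<String> getUser(String id) {
        return CompletableFuture.supplyAsync(() -> "user-" + id);
    }

    static CompletableFuture<String> getIcon(String user) {
        return CompletableFuture.supplyAsync(() -> user + ".png");
    }

    // Synchronous mapping step: the function returns a plain value.
    static CompletableFuture<Integer> nameLength(String id) {
        return getUser(id).thenApply(String::length);
    }

    // Asynchronous mapping step: the function returns another future,
    // which is flattened into the result.
    static CompletableFuture<String> icon(String id) {
        return getUser(id).thenCompose(TransformSketch::getIcon);
    }
}
```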
Going concurrent
What was hard about the migration
Futures.transformAsync(
dao.getUser("id"),
user -> apiClient.getIcon(user),
executor);
Other things
AsyncRetrier
int retryCount = 3;
int delayMillis = 100;
AsyncRetrier retrier = AsyncRetrier.create(
Executors.newSingleThreadScheduledExecutor());
ListenableFuture<List<String>> listFuture =
retrier.retry(() -> api.listByRanking("JP"),
retryCount,
delayMillis);
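The slide does not show how `AsyncRetrier` works internally. As a rough illustration of retrying an asynchronous call with a fixed delay, here is a JDK-only sketch; `RetrySketch` and its methods are hypothetical and are not the `AsyncRetrier` implementation:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class RetrySketch {
    static <T> CompletableFuture<T> retry(Supplier<CompletableFuture<T>> call,
                                          int retryCount, long delayMillis,
                                          ScheduledExecutorService scheduler) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(call, retryCount, delayMillis, scheduler, result);
        return result;
    }

    private static <T> void attempt(Supplier<CompletableFuture<T>> call,
                                    int retriesLeft, long delayMillis,
                                    ScheduledExecutorService scheduler,
                                    CompletableFuture<T> result) {
        call.get().whenComplete((value, error) -> {
            if (error == null) {
                result.complete(value);
            } else if (retriesLeft > 0) {
                // Schedule the next attempt instead of sleeping on a thread.
                scheduler.schedule(
                        () -> attempt(call, retriesLeft - 1, delayMillis, scheduler, result),
                        delayMillis, TimeUnit.MILLISECONDS);
            } else {
                result.completeExceptionally(error);
            }
        });
    }

    static String demo() {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger calls = new AtomicInteger();
        try {
            // Fails twice, then succeeds on the third call.
            return retry(() -> {
                CompletableFuture<String> f = new CompletableFuture<>();
                if (calls.incrementAndGet() < 3) {
                    f.completeExceptionally(new RuntimeException("temporary failure"));
                } else {
                    f.complete("ok after " + calls.get() + " calls");
                }
                return f;
            }, 3, 10, scheduler).join();
        } finally {
            scheduler.shutdown();
        }
    }
}
```

The sketch does not handle a supplier that throws synchronously; a real retrier would likely need to.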
ConcurrencyLimiter
int maxConcurrentCount = 100;
int maxQueueSize = 1000;
ConcurrencyLimiter<List<String>> concurrencyLimiter =
ConcurrencyLimiter.create(maxConcurrentCount,
maxQueueSize);
ListenableFuture<List<String>> listFuture =
concurrencyLimiter.add(() ->
dao.listByRanking("JP"));
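One way to picture what a limiter like this does: start a job immediately while fewer than `maxConcurrentCount` are running, queue it while the queue has room, and reject it otherwise. The following JDK-only sketch is an assumption about the behavior, not the actual `ConcurrencyLimiter` code; `LimiterSketch` is a hypothetical name:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class LimiterSketch<T> {
    private final int maxConcurrent;
    private final int maxQueueSize;
    private final Deque<Runnable> queue = new ArrayDeque<>();
    private int running;

    LimiterSketch(int maxConcurrent, int maxQueueSize) {
        this.maxConcurrent = maxConcurrent;
        this.maxQueueSize = maxQueueSize;
    }

    CompletableFuture<T> add(Supplier<CompletableFuture<T>> job) {
        CompletableFuture<T> result = new CompletableFuture<>();
        Runnable start = () -> job.get().whenComplete((value, error) -> {
            if (error != null) {
                result.completeExceptionally(error);
            } else {
                result.complete(value);
            }
            onFinished();
        });
        synchronized (this) {
            if (running >= maxConcurrent) {
                if (queue.size() >= maxQueueSize) {
                    result.completeExceptionally(new RejectedExecutionException("queue full"));
                } else {
                    queue.add(start);
                }
                return result;
            }
            running++;
        }
        start.run(); // run outside the lock
        return result;
    }

    // A finished job hands its slot to the next queued job, if any.
    private void onFinished() {
        Runnable next;
        synchronized (this) {
            next = queue.poll();
            if (next == null) {
                running--;
            }
        }
        if (next != null) {
            next.run();
        }
    }

    static String demo() {
        LimiterSketch<String> limiter = new LimiterSketch<>(1, 1);
        CompletableFuture<String> gate = new CompletableFuture<>();
        AtomicInteger started = new AtomicInteger();
        CompletableFuture<String> a = limiter.add(() -> { started.incrementAndGet(); return gate; });
        CompletableFuture<String> b = limiter.add(() -> {
            started.incrementAndGet();
            return CompletableFuture.completedFuture("b");
        });
        CompletableFuture<String> c = limiter.add(() -> CompletableFuture.completedFuture("c"));
        int startedWhileBlocked = started.get();          // only job a has started
        boolean rejected = c.isCompletedExceptionally();  // queue was already full
        gate.complete("a");                               // frees the slot, b starts
        return startedWhileBlocked + "," + rejected + "," + a.join() + "," + b.join();
    }
}
```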
What happens when this gets more complex!?
Futures.transformAsync(
Futures.transformAsync(getRankingType("JP"),
dao::listByRanking,
executor),
ids -> Futures.allAsList(ids.stream()
.map(client::getUserById)
.collect(toList())),
executor);
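For comparison, the same three-step flow (ranking type, then IDs, then a fan-out over the IDs) can be written with the JDK's `CompletableFuture`. This is only a sketch: the three lookup methods are hypothetical stubs, and `allAsList` here is a hand-written helper that mimics what Guava's `Futures.allAsList` provides:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

final class RankingChainSketch {
    // Hypothetical asynchronous lookups.
    static CompletableFuture<String> getRankingType(String country) {
        return CompletableFuture.completedFuture("daily-" + country);
    }

    static CompletableFuture<List<String>> listByRanking(String type) {
        return CompletableFuture.completedFuture(Arrays.asList(type + "-1", type + "-2"));
    }

    static CompletableFuture<String> getUserById(String id) {
        return CompletableFuture.supplyAsync(() -> "user:" + id);
    }

    // Completes when every input future has completed, keeping input order.
    static <T> CompletableFuture<List<T>> allAsList(List<CompletableFuture<T>> futures) {
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
                .thenApply(ignored -> futures.stream()
                        .map(CompletableFuture::join) // already complete, does not block
                        .collect(Collectors.toList()));
    }

    static CompletableFuture<List<String>> ranking(String country) {
        return getRankingType(country)
                .thenCompose(RankingChainSketch::listByRanking)
                .thenCompose(ids -> {
                    List<CompletableFuture<String>> users = ids.stream()
                            .map(RankingChainSketch::getUserById)
                            .collect(Collectors.toList());
                    return allAsList(users);
                });
    }
}
```

The chained form reads top to bottom, whereas the nested `transformAsync` calls have to be read from the inside out.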
Reducing complexity with Dagger Producers
@ProducerModule
public static class RankingGraph {
@ProductionComponent(modules = { RankingGraph.class, ExecutorModule.class })
interface Component {
ListenableFuture<List<Item>> getRanking();
}
public RankingGraph(Service service, String country) {
...
}
@Produces
public ListenableFuture<String> getRankingType() {
return service.getRankingType(country);
}
@Produces
public ListenableFuture<List<String>> listByRanking(String type) {
return service.listByRanking(type);
}
@Produces
public ListenableFuture<List<Item>> listUsers(List<String> ids) {
return Futures.allAsList(ids.stream().map(service::getItemById).collect(toList()));
}
}
The code Dagger generates
Calling code
ListenableFuture<List<Item>> result =
DaggerRankingGraph_Component
.builder()
.rankingGraph(new RankingGraph(service, "JP"))
.build()
.getRanking();
Benefits of using Dagger
With this, the first pass was complete
Fewer threads
Latency improved
CPU utilization improved
CompletableFuture was becoming more common
Why did we move to RxJava2?
RxJava2
RxJava2 on the server side
An API designed with RxJava2
class UserDao {
public Single<User> get(String id){...}
public Maybe<User> find(String id){...}
public Completable delete(String id){...}
public Flowable<User> listAll(){...}
public Single<List<User>> listAllSingle(){...}
}
Moving JDBC access to RxJava2
ListeningExecutorService asyncExecutor;
public <E> Single<List<E>> selectListRx(
String statement, Object parameter) {
return toSingle(asyncExecutor.submit(() ->
delegate.selectList(statement, parameter)));
}
Moving MongoDB to RxJava2
public Single<List<Model>> listRx(
String id, int offset, int limit) {
SettableFuture<List<Model>> future =
SettableFuture.create();
collection.find(eq(ID, id))
.skip(offset)
.limit(limit).into(new ArrayList<Model>(), (result, t) -> {
if (t != null) { future.setException(t); }
else { future.set(result); }
});
return toSingle(future);
}
Moving MongoDB to RxJava2
public Single<List<Model>> listRx(
String id, int offset, int limit) {
return Flowable.fromPublisher(
collection.find(eq(ID, id))
.skip(offset)
.limit(limit))
.toList();
}
Retrofit with RxJava2CallAdapterFactory
public interface GitHubService {
@GET("users/{user}/repos")
Single<List<Repo>> listRepos(
@Path("user") String user);
}
Retrofit retrofit = new Retrofit.Builder()
.baseUrl("https://api.github.com/")
.addCallAdapterFactory(
RxJava2CallAdapterFactory.create())
.build();
Composing asynchronous calls
Single<List<User>> ranking =
    api.getRankingType("JP")
        // Single<List<String>>
        .flatMap(this::listByRanking)
        // Flowable<String>
        .flattenAsFlowable(list -> list)
        .concatMapEager(this::getUserById)
        .toList();
Retry
api.listByRanking("JP")
.retry(10)
.flatMap({
...
})... //
Retry
api.listByRanking("JP")
    .retryWhen(throwableFlowable ->
        throwableFlowable.flatMap(thrown -> {
            // Retry after a delay on I/O errors only.
            if (thrown instanceof IOException) {
                return Flowable.timer(1000, MILLISECONDS);
            }
            return Flowable.error(thrown);
        }))
    .flatMap({
        ...
    })... //
What was hard about the migration
Single.just("koji")
.map(id -> null) // NPE !!!
...//
What was hard about the migration
http://www.nurkiewicz.com/2017/08/flatmap-vs-concatmap-vs-concatmapeager.html
Flowable.just("koji", "kishida", "tempo")
.flatMapSingle(id -> api.fetchUser(id))
...//
Flowable.just("koji", "kishida", "tempo")
.concatMapEager(id ->
api.fetchUser(id).toFlowable())
...//
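The difference between the two snippets above is ordering: `flatMapSingle` emits results as they arrive, while `concatMapEager` also starts the inner calls eagerly but emits them in source order. A JDK-only sketch of that distinction, using manually completed `CompletableFuture`s instead of RxJava; the reversed completion order is simulated for illustration:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

final class OrderingSketch {
    static List<List<String>> demo() {
        List<String> ids = Arrays.asList("koji", "kishida", "tempo");
        List<CompletableFuture<String>> inFlight = new ArrayList<>();
        List<String> arrivalOrder = new ArrayList<>();

        // All "requests" are in flight at the same time.
        for (String id : ids) {
            CompletableFuture<String> response = new CompletableFuture<>();
            response.thenAccept(arrivalOrder::add); // record results as they arrive
            inFlight.add(response);
        }

        // Simulate responses coming back in reverse order.
        for (int i = ids.size() - 1; i >= 0; i--) {
            inFlight.get(i).complete("user:" + ids.get(i));
        }

        // Reading the futures in source order restores the original order.
        List<String> sourceOrder = inFlight.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
        return Arrays.asList(arrivalOrder, sourceOrder);
    }
}
```

Here the first list corresponds to arrival order (what a `flatMap`-style merge would give) and the second to source order (what `concatMapEager` preserves).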
What was hard about the migration
Single<User> user = client.getUser("1234");
Single<Profile> first = user.map(...)...;
Single<Company> second = user.map(...)...;
Single.zip(first, second, (profile, company) -> {
...
})...;
What was hard about the migration
Single<User> user = client.getUser("1234").cache();
Single<Profile> first = user.map(...)...;
Single<Company> second = user.map(...)...;
Single.zip(first, second, (profile, company) -> {
...
})...;
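The point of `cache()` here is that a lazy source is typically executed once per subscriber, so two branches derived from the same `Single` can trigger the underlying call twice. A JDK-only sketch of the same effect, where a `Supplier` models the lazy source; `getUser` and the request counter are hypothetical:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class SharedResultSketch {
    static String demo() {
        AtomicInteger requests = new AtomicInteger();
        // Hypothetical lazy source: every get() issues a new "request".
        Supplier<CompletableFuture<String>> getUser = () -> {
            requests.incrementAndGet();
            return CompletableFuture.completedFuture("user-1234");
        };

        // Each branch triggers the source on its own: two requests.
        getUser.get().thenApply(u -> u + "/profile")
                .thenCombine(getUser.get().thenApply(u -> u + "/company"),
                        (profile, company) -> profile + "," + company)
                .join();
        int withoutSharing = requests.getAndSet(0);

        // Trigger the source once and share the result: one request.
        CompletableFuture<String> user = getUser.get();
        user.thenApply(u -> u + "/profile")
                .thenCombine(user.thenApply(u -> u + "/company"),
                        (profile, company) -> profile + "," + company)
                .join();
        int withSharing = requests.get();

        return withoutSharing + "," + withSharing;
    }
}
```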
What was hard about the migration
api.getRankingType("JP")
.flatMap(type -> getRanking(type)) // which thread?
.flatMap(ids -> xxx) // which thread?
.map(...)
.... // and so on
What was hard about the migration
api.getRankingType("1234")
    .observeOn(Schedulers.from(executor))
    .flatMap(type -> getRanking(type))
    .observeOn(Schedulers.from(executor))
    .flatMap(ids -> xxx)
    .map(...) // no observeOn needed for a map that is lightweight
    .observeOn(Schedulers.from(executor))
    .flatMap(...)
    .... // and so on
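The underlying question is which thread runs each step; `observeOn` answers it by moving downstream work onto a chosen executor. The JDK's `CompletableFuture` has a comparable knob in its `*Async(fn, executor)` variants. A small sketch, assuming a single worker thread named `worker` purely for the demonstration:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ThreadHopSketch {
    static String demo() {
        // Name the thread so the hop is visible in the result.
        ExecutorService executor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "worker"));
        try {
            return CompletableFuture.completedFuture("JP")
                    // Runs on the given executor, not on the calling thread.
                    .thenApplyAsync(
                            type -> type + "@" + Thread.currentThread().getName(),
                            executor)
                    .join();
        } finally {
            executor.shutdown();
        }
    }
}
```

Without an explicit executor, a step runs on whichever thread completed the previous stage, which is the same ambiguity the slide points out.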
Other things
For example, filterAsync
Flowable.just(...)
.compose(FlowableTransformers.filterAsync(
user -> {
return dao.isAvailable(user);
}))
.map(...)
...
What's next...
References and materials
Q&A