GCPでつくるデータ処理パイプライン

Nagai Yoichi (id:orfeon)

Google Cloud Certified Professional
Data Engineer / Cloud Architect

アジェンダ

  • 今日話すこと
    • バッチとストリーム処理の概要と問題
    • Apache Beam と Cloud Dataflow の概要と使い方
  • 今日の目標
    • Apache Beam (Dataflow) が役立つ領域を知ってもらう
    • データ処理でDataflow使ってみたいなと思ってもらう

2つのタイプのデータ処理

  • バッチ処理
    • 処理対象となるデータの範囲が有限
      • 全データが揃っている
      • データをまとめて処理できる
  • ストリーム処理
    • 処理対象となるデータの範囲が(時間軸上)無限
      • データが都度入力される
      • データをその時々の範囲でその都度処理する

同じ処理をバッチとストリーム両方で扱わないといけないケースが意外と多い

データ処理ユースケース① 〜イベント検知

  • イベントをウォッチして一定の条件を満たした場合にお知らせする
    • 例: ログ異常検知
      • ログのWARN、ERRORが一定期間続いたらアラートをあげる
    • 例: トレーディングボット
      • 銘柄の値動きをウォッチし、指定した条件を満たせば売買を実行
    • 処理のために過去の状態を持つストリーム処理
  • 適切な条件の設定のため、バッチで過去データを試行錯誤することも
    • ログの一定期間の幅や期間内の閾値を調整してシミュレーションしたり

データ処理ユースケース② 〜機械学習の特徴量生成

  • 時系列系の特徴量
    • 例: サイト訪問者へのリアルタイムなレコメンド
      • ユーザの直近の閲覧行動
    • 例: FX自動売買ボット
      • 直近1分間の通貨ペアごとの価格や取引数の移動平均
  • 学習時のデータはバッチで作れるが、予測時はリアルタイムに作る場合も
  • リアルタイムに作ろうとすると意外に面倒なことも

バッチとストリームを扱う場合の問題

  • 似た処理をしているのに別々に対応が必要
    • プログラミング
    • 実行環境
  • インフラ管理
    • バッチ処理
      • 処理のフェーズにより適切な並列数が変わる
    • ストリーム
      • 時間帯により入力データ量が変動する

GCP上で分散データ処理するためのサービス

  • プログラミングモデル
    • バッチ処理とストリーム処理をシームレスに記述
    • 複数の実行環境に対応
      • Dataflow, Spark, Flink, Apex, Gearpump,..
    • 対応言語: Java, Python, Go
  • 分散実行インフラ
    • フルマネージド
      • サーバ管理不要
    • オートスケール
      • 負荷に応じて自動でスケール
    • GCP各種ストレージと連携容易

GCPの中での Beam / Dataflow の位置付け

  • Cloud Dataflow + Beam
    • データの処理と分析を担う
  • Cloud Dataprep
    • 非プログラマー向け
    • UI上で処理内容をレシピとして記述して実行
    • 裏側ではDataflowが動く
  • Cloud Dataproc + Hadoop
    • 既存Hadoop/Sparkソフトウェアを動かしたい人向け
    • インフラ管理を最小限に

バッチとストリームを統合したプログラミングフレームワーク

Apache Beam のプログラミングモデル

  • データの処理の流れを宣言的に記述する
  • 抽象データ(PCollection)を処理(PTransform)でつないでグラフを作る

PCollection

PCollection

PTransform

PTransform

PCollection

Pipeline p = Pipeline.create(options);

PCollection<String> pc1 = p.apply(TextIO.Read.from(~))

PCollection<MyClass> pc2 = pc1.apply(new PTransformSub1())

PCollection<MyClass> pc3 = pc1.apply(new PTransformSub2())

PCollection<MyClass> pc4 = PCollectionList.of(pc2).and(pc3)

.apply(Flatten.pCollection())

Javaコード例

※データにはPTransformのメソッドを経由してしか触れない

PTransformのサブクラス

よく使う処理はSDKで用意してくれている

※ WindowIntoはParDoのサブクラス

PTransformer

引数

内容

Read, Write

BigQuery,GCS,DatastoreなどへのIO

各種Source,SinkへのIO

ParDo

DoFnサブクラス

1レコードごとの処理の記述

(Co)GroupByKey

-

GroupBy, Joinの適用宣言

Combine*

CombineFnサブクラス

集約処理の記述

Partition

PartitionFnサブクラス

PCollectionを分割

Flatten

PCollectionのリスト

複数のPCollectionを統合

さらに簡易な便利クラス

  • ParDo系
    • Map
      • 1レコード入力 -> 1レコード出力
    • FlatMap
      • 1レコード入力 -> 複数レコード出力
  • Combine系
    • Count, Mean, Sum など

Python版はParDo系の処理はだいたい

こちらで書ける

IO対応

  • 入出力に各種ファイル、データストア、メッセージサービスに対応
    • GCP各種サービスの他、AWSのS3などにも対応
  • 単一パイプラインで複数入出力をまとめて扱える
  • バッチ、ストリームを同時起動可能
    • バッチで集計値を算出してストリームデータに
      集計値を埋め込んでいく(例: TF-IDF)

Apache Beam のフルマネージドな分散実行環境

Deploy

Tear Down

ジョブをサブミットするとクラスタが自動で立ち上がる

ジョブをsubmitすると
クラスタが自動で立ち上がる

ジョブが完了すると
クラスタも自動で落ちる

Workers are created at job submission and torn down when the job completes (or not for streaming jobs!).

負荷に応じてオートスケール

タスクの負荷状況に応じて
ノード数が自動でスケール

Cloud Dataflow configures the number of workers throughout the job lifetime to deal with changes in data size within a batch pipeline and changes in throughput in a streaming pipeline.

100分

65分

vs.

負担の大きいタスクを動的にリバランス

Cloud Dataflow identifies “straggler shards” and automatically rebalances their workloads across available workers which can drastically reduce the overall execution time of batch jobs.

オートスケール例

  • Wikipedia(日本語)の全ページを形態素解析してSpannerに保存するジョブ
    • 約230万ページ、11GBのXML
  • 負荷に応じてオートスケール
    • 起動後に50台までスケール
    • 処理が終わるとシャットダウン



ジョブは 17分弱 で完了
掛かった金額は $0.54

GCPとの連携

  • 各種ファイル、データストア、メッセージサービスへのIO対応
    • GCP上での単純なデータの変形、移動でもよく利用
  • パイプラインのTemplate実行
    • パイプラインをGCSに登録してパラメータ渡して起動できる
      • AppEngineからcronによる定期実行など
    • よく使うジョブのTemplateはGoogleが公式で提供
      • 各種GCPサービス間のバックアップ系

パイプライン例 〜Twitter炎上検知

炎上検知
Cloud Dataflow

Tweetストリーム
Cloud Pub/Sub

炎上スコア保存
BigQuery

センチメント分析
Natural Language API

過去Tweet保存
BigQuery

Twitter

ストリーム送信

流れてくるTweetのセンチメントスコアを算出
一定時間ごとにスコアの平均をとって一定以上だったらアラートを送る

アラート
Cloud Pub/Sub

コード例

パイプライン実行

python -m tweet_sentiment.py \

--runner DataflowRunner \

--project YOUR_GCP_PROJECT \

--temp_location gs://YOUR_GCS_BUCKET/tmp/ \

--streaming

runner: 実行環境を選択、DirectRunnerだとローカルで実行

temp_location: 実行中の一時ファイルの出力先GCSパス

streaming: ストリーム実行する場合はここで指定するだけ!

まとめ

  • Apache Beam
    • バッチ処理とストリーム処理を簡易に記述するためのプログラミングモデル
    • 複数の実行基盤に対応(Spark, Flink, ...)
  • Cloud Dataflow
    • Apache Beam実行基盤としてのGCPのサービス
    • フルマネージド
    • オートスケール

バッチ処理とストリーム処理をGCP上で簡単に動かすことができる

宣伝: 9/19 に Dogrun やります!

Google Cloud Next Tokyo 2018
(9/19@ザ・プリンス パークタワー東京)

初日の夜にGCP関連コミュニティで
イベントやります!

Dataflow のPMも来られるそうなので、
質問ぶつけてみたい人はぜひ!
(Dataflow部屋まだ少ないのでチャンス)

おまけ: Cloud Dataflow Shuffle (2018/07/25 GA!)

  • シャッフルがたくさん発生するバッチジョブを効率的に実行
    • シャッフルが発生する -> GroupByKey, CoGroupByKey(Join), Combine を利用するジョブ
    • ジョブが早く終わったり、コストが安くなったりする
  • シャッフル作業を共通サービスに移譲
    • 実行時にオプションで `--experiments=shuffle_mode=service` を指定するだけ

価格がほぼ ⅛ に!

DevFest18 GCPでつくるデータ処理パイプライン - Google Slides