1 of 39

Time-series code competitionで

生き残るには

2021/08/17 第3回分析コンペLT会

2 of 39

Nomi (能見)

Kaggle master / @nyanp / @nyanpn

Software engineer

Recent 5 competitions:

暫定7位

★: Time-series code competition

3 of 39

Time-series code competition?

  • Code competition = コードをKaggle側の環境で実行
    • 実行時間制限がある
  • Time-series code competition (以下Time-seriesコンペ) = 専用API経由で入出力する、Code competition特殊形
  • 2019年以降、テーブルデータで4回開催
    • NFL, Riiid, Jane, MLB

▲Riiidでの表記 (細かい表現はコンペによって異なるが、Dataタブで確認できる)

4 of 39

どんな仕組み?

  • APIから時間順にデータが渡され、逐次予測値を返す
  • 未来の情報が使えない設計
    • Solutionが実用的
  • 通常は推論がバッチ処理だが、ストリーム処理が必要
  • なんか大変そう…?

import riiideducation

env = riiideducation.make_env()

iter_test = env.iter_test()

for (test_df, sample_pred_df) in iter_test:

# 前処理&予測

sample_pred_df['target'] = 0.5

# ここで予測値を提出

# predictを呼ばずに次データを取ろうとすると例外

env.predict(sample_pred_df)

5 of 39

どんな仕組み?

  • APIから時間順にデータが渡され、逐次予測値を返す
  • 未来の情報が使えない設計
    • Solutionが実用的
  • 通常は推論がバッチ処理だが、ストリーム処理が必要
  • なんか大変そう…?

import riiideducation

env = riiideducation.make_env()

iter_test = env.iter_test()

for (test_df, sample_pred_df) in iter_test:

# 前処理&予測

sample_pred_df['target'] = 0.5

# ここで予測値を提出

# predictを呼ばずに次データを取ろうとすると例外

env.predict(sample_pred_df)

実際大変

6 of 39

A. ストリーム処理に対応しつつ、

高速でロバストなコードを書くのが大変

Q. Time-seriesコンペ、何が大変?

7 of 39

Q. Time-seriesコンペ、何が大変?

A. ストリーム処理に対応しつつ、

高速でロバストなコードを書くのが大変

とはいえ、ポイントを押さえれば全然怖くない!

8 of 39

本日のお品書き

Time-seriesコンペで生き残るために重要な4つのポイントについて、個人的に気を付けている点を共有します

1. Train/Testの両対応

2. 状態管理の設計

3. コード構成とデバッグ

4. エラーハンドリング

※スタイルの好みやコンペ毎の差もあるので、一意見として見て下さい

9 of 39

本日のお品書き

Time-seriesコンペで生き残るために重要な4つのポイントについて、個人的に気を付けている点を共有します

1. Train/Testの両対応

2. 状態管理の設計

ストリーム処理に

対応しつつ

高速でロバストな

コードを書く

3. コード構成とデバッグ

4. エラーハンドリング

※スタイルの好みやコンペ毎の差もあるので、一意見として見て下さい

10 of 39

Train/Testの両対応

1

11 of 39

Train/Testの両対応

  • Test dataは基本的に1行ずつ(※)処理
  • 当然、前処理も特徴量生成も全て1行ずつ処理
  • 普段通りTrain用のコードを書くとサブで行き詰まる
    • あれ…1st Submitまで遠い…?
  • Train(バッチ)とTest(ストリーム)にどう両対応するか
    • Feature Storeが解決しようとしている課題と同じ

submission.csv

※説明の簡略化のために1行としていますが、1ループで同時刻の複数データが来ることも(コンペによる)

12 of 39

よくある実装方針

①Train向けのバッチ処理関数を、Testにも使う?

②Train向けとTest向けで別々のコードを書く?

13 of 39

よくある実装方針

①Train向けのバッチ処理関数を、Testにも使う?

②Train向けとTest向けで別々のコードを書く?

× バッチ用処理を無理やりTestに使うと遅くなりがち

× Testの予期しない入力で死にがち

× 実装の違いでバグを生みがち

× アイデアの実装が億劫になりがち

14 of 39

よくある実装方針

①Train向けのバッチ処理関数を、Testにも使う?

②Train向けとTest向けで別々のコードを書く?

× バッチ用処理を無理やりTestに使うと遅くなりがち

× Testの予期しない入力で死にがち

× 実装の違いでバグを生みがち

× アイデアの実装が億劫になりがち

(最終形としては)両方おすすめできない

15 of 39

おすすめ:全部ストリーム処理

  • Test向けに書いた関数を、Trainにもfor-loopで適用
    • TrainもTestのようにストリーム処理する
  • Train側の処理が遅い?
    • 並列化
    • 特徴量をfeather等のファイルにキャッシュ

16 of 39

実例: MLBコンペ

@feature(['divisionId', 'divisionRank', 'leagueRank', 'wildCardRank'])

def f015_latest_standings(ctx: Context) -> Dict:

if ctx.team is None or len(tx.team.standings) == 0:

return empty_feature(ctx.current_feature_name)

standings = ctx.team.standings

return {

'divisionId': standings['divisionId'][-1],

'divisionRank': standings['divisionRank'][-1],

'leagueRank': standings['leagueRank'][-1],

'wildCardRank': standings['wildCardRank'][-1]

}

デコレータで作る特徴量を宣言

Contextには状態管理クラス(後述)のインスタンスと、行を特定する情報(MLBでは日付✖️playerId)が入っている

17 of 39

デコレータの中身

def feature(columns: List[str]):

def _feature(func):

@functools.wraps(func)

def wrapper(*args):

ctx = args[0]

assert isinstance(ctx, Context)

assert len(args) == 1

ctx.current_feature_name = func.__name__

try:

return func(ctx)

except Exception:

msg = f"WARNING: exception in {func.__name__}: {traceback.format_exc()}"

warnings.warn(msg)

# guard

if ctx.fallback_to_none:

return {c: None for c in columns}

else:

raise

_FEATURE_COLUMNS[func.__name__] = columns

_FEATURES[func.__name__] = wrapper

return wrapper

return _feature

特徴量関数の中で例外が出たら、

欠損値にフォールバック

(本番の推論時のみ)

デコレータ側でグローバル変数に特徴量の名前やスキーマ情報を自動登録

18 of 39

特徴量生成関数

  • 推論時はこの関数をそのまま呼ぶ
  • 学習時にはtrain.csvをチャンクに分割してこれを並列に呼び、featherに保存

def make_feature(base_df: pd.DataFrame, store: Store, feature_list: List[str] = None):

feature_functions = {normalize_feature_name(k): get_feature(k) for k in feature_list}

features = []

for i, row in tqdm(base_df.iterrows()):

date = row["dailyDataDate"]

player = store.players[row["playerId"]].slice_until(date)

ctx = Context(store, player, date)

feature = {}

for fname, func in feature_functions.items():

feature.update(func(ctx))

features.append(feature)

return pd.DataFrame(features)

デコレータで登録しておいた特徴量生成関数の実体を引いてくる

1行毎に特徴量生成関数を順番に呼ぶ

19 of 39

何が嬉しいのか

  • ストリーム処理に統一することで…
    • Train/Testでの計算結果が確実に一致
    • 推論時のパフォーマンスを考慮しやすい
    • Trainの行数回だけローカルで呼び出すので、自然とエラーが起きにくいコードになる(重要)
  • 特徴量を細かく関数に分けることで…
    • テストと計測がしやすい
      • 重い関数だけcython/numbaを使っても良い
    • エラー時の被害の範囲が抑えられる

20 of 39

状態管理の設計

2

21 of 39

状態管理って?

  • 予測に過去の情報を使うなら、Train+Time-Series APIから得たデータを蓄積する必要がある
    • つまり、どこかに状態を持たせることになる
  • Notebook上に状態の更新が散らばると大変
    • OOMになると手詰まりに…
    • チームマージのハードルも爆上げ

22 of 39

銀の弾丸は無いが…

  • 状態管理する場所を1箇所にまとめる
  • どうデータを貯めるのか、ちゃんと考えて書く
    • データの形式と粒度
      • 生データを貯めるか、集計済みデータを貯めるか
    • データの置き場(メモリに乗る?ストレージ?)

23 of 39

どんなデータ構造で状態を持つ?

  • pandasでも良いが、遅いコードが簡単に書けすぎる(個人の感想です)
  • 自分の場合…np.arrayをdfぽく扱えるwrapperを自作して使用
    • データのappendが素のnp.arrayより高速(※)
    • 挿入時に意図しない型の値が来たらnanに自動置換
    • 任意の過去の状態をゼロコピーで取得できる
      • 特徴量エンジニアリング時に未来の情報のリークを防げる
    • すべての状態をこのwrapperのインスタンスで管理
    • コード: https://github.com/nyanp/streamdf

※生Listと同じ、ならし計算量O(1)。データが溜まるほど重くなる処理は推論時間の見積もりが難しい為、定数時間でappendできると嬉しい

24 of 39

どんなデータ構造で状態を持つ?

  • pandasでも良いが、遅いコードが簡単に書けすぎる(個人の感想です)
  • 自分の場合…np.arrayをdfぽく扱えるwrapperを自作して使用
    • データのappendが素のnp.arrayより高速(※)
    • 挿入時に意図しない型の値が来たらnanに自動置換
    • 任意の過去の状態をゼロコピーで取得できる
      • 特徴量エンジニアリング時に未来の情報のリークを防げる
    • すべての状態をこのwrapperのインスタンスで管理
    • コード: https://github.com/nyanp/streamdf

※生Listと同じ、ならし計算量O(1)。データが溜まるほど重くなる処理は推論時間の見積もりが難しい為、定数時間でappendできると嬉しい

いや…ここまでの話…

めちゃくちゃ面倒くさそうでは?

25 of 39

ここまで頑張った実装必要?

  • 正直ケースバイケース。コンペと解法しだい
  • NNだけならNotebook1本で乗り切れるかも
    • Time-seriesコンペは「GBDTで」「履歴系の特徴量を量産して」「チームで」「金圏を目指す」のが一番ハード(RiiidとMLBでこれ頑張った人えらい)
  • 最初はサブを気にせずに書いて勘所をつかんでから、推論向けの設計に取り掛かることをおすすめ
    • 初手から「最強のパイプライン」を目指すのは一般にリスキー
    • ここに書いていることを最初から全部盛り込むのではなく、実際に行き詰まってから必要な箇所をチェリーピックしていくほうが良いかも

26 of 39

コード構成とデバッグ

26

3

27 of 39

コードをどこで書くか

  • Kaggle環境でコードを書き切るのはつらい
  • 原則、手元で書いてGitで管理するのをおすすめ
    • Time-seriesコンペは信頼性命→デバッグ/テスト/管理しやすく書きたい
    • 自分はロジックを.pyに全部書き、Notebookは薄いwrapperにする方向に落ち着いた (MLBでは複数.pyに分割)
  • 分割したコードをどうKaggle環境で実行するの?
    • 方法1:BASE64エンコードしてNotebookにコピペ
    • 方法2:Datasetとしてアップロード

参考:

28 of 39

手元でのデバッグ

  • Time-seriesコンペでは専用モジュールがKaggle環境側で提供されるので、手元では推論Notebookが動かない
  • API Emulatorを用意しておくと手元でデバッグできる
  • 手元環境はなるべく本番と一致させておく
    • 公式のDockerイメージを使うのが一番確実 (私は面倒臭くてまだやったことが無いですが…)
    • https://github.com/Kaggle/docker-python

29 of 39

ベンチマーク

  • 個々の高速化テクニックも大事だが、しっかり高速化するなら再現性のある時間測定基盤を整えるのが最重要だと思う(モデリングでCVが大事なのと一緒)
  • Notebookから実装本体を.pyに移動することで、本番Notebookを汚さずベンチマークスクリプトが書ける

30 of 39

エラーハンドリング

4

31 of 39

コードコンペとエラー

  • コードコンペでサブ時にエラーが出ても、6種類のメッセージのどれかが出るだけ(※)で詳細は分からない
    • 切り分けが辛いので、あまりエラーを出したくない
  • 2-stage制の場合、データ差し替え後の2nd stageでエラーが出ると即失格
    • Time-seriesコンペ過去3/4が2-Stage制
    • どんなデータが来ても死なないコードを書きたい

32 of 39

Time-seriesコンペで

最低限やること

  • 推論ループで例外をキャッチして、最悪でsample_submissionのまま推論は遂行
  • ダメージを推論1回分に抑えられる(状態管理への副作用は別)
  • コンペ初期は問題の発見が遅れるのでむしろ無いほうが良い。終盤で忘れずに入れる

for test_df, sample_df in iter_test:

try:

# 処理

...

except Exception:

pass

# 何があっても推論は死守

env.predict(sample_df)

33 of 39

やっておいた方が良いこと

  • 例外を可能な限り細かく拾い、適切にフォールバック
    • 例:2nd-stageで学習中に例外→1st-stageで学習したモデルを使う
  • API Emulatorに壊れたデータを投入してテスト
    • 例外処理であれ、一度も動かしたことがないコードは本番で上手く動かない(例外処理コード中で例外が出たり…)
  • 処理時間の見積もり
    • Emulatorでの実測も併用すると良い

34 of 39

心構え: 暗黙の仮定を意識する

2nd stageのデータ仕様について、Kaggleでは十分な情報が与えられないことが多い。自分のコードに暗黙の仮定が入っていないか注意

  • intでカラムを処理→欠損値が来たら?
  • idで特徴量をleft join →idが重複したら?
  • Time-Series APIは常に1行ずつ返す
  • Private Testは連続かつ完全?

NFLで実際に発生してDiscussionが荒れた

MLBでは1日に複数試合する選手が稀にいた

MLB→APIの説明を見ると、1行ずつとは書いていない

35 of 39

ここまでをまとめたコード例

実例: 推論Notebookの個人的な基本形

def in_kaggle():

return 'kaggle_web_client' in sys.modules

if in_kaggle():

import mlb

else:

mlb = Emulator(...)

env = mlb.make_env()

iter_test = env.iter_test()

store = Store(...)

for test_df, sample_prediction_df in iter_test:

try:

# 状態の更新

store.update(test_df)

# 特徴量の作成

feature_df = make_feature(sample_prediction_df, store)

# 推論

sample_prediction_df['target'] = predict(feature_df, model)

except Exception:

warnings.warn(traceback.format_exc())

env.predict(sample_prediction_df)

Kaggle環境なら実物、ローカルならEmulatorを使う

中で例外が起きても、predictは確実に行う

推論用dfへの代入は1回にまとめる(例外安全性の担保)

具体的な処理は全て.pyに追い出しておく

状態はこのStoreで一元管理

36 of 39

37 of 39

まとめ

  • Time-seriesコンペ:ストリーム処理に対応しつつ高速でロバストに書くのが大変
  • 4つの観点でポイントを解説
    • Train/Testの両対応:同じ関数で1行ずつ処理しよう
    • 状態管理:(必要そうなら)しっかり設計しよう
    • デバッグ:手元で推論コードを動かせるようにしよう
    • エラーハンドリング:
      • 例外をしっかりキャッチしよう
      • 暗黙の仮定を意識しよう

38 of 39

Message

  • Time-seriesコンペは大変だが、そこを乗り切れば現状メダルは取りやすい
  • バッチとストリームの両対応大変とか、状態管理を誰がどう受け持つかとか、アーティファクトの依存関係をどう整理するかといったTime-seriesコンペで苦しむ部分は、現実の分析基盤でもリアルな課題(のはず)。自分で解いてみると、課題感が実感できて良いと思う
  • 今後もTime-seriesコンペは出てくるはずなので、ぜひ参加してみましょう

38

39 of 39

関連リンク

39

Kaggle Days Tokyo, “How to succeed in code (kernel) competitions”

https://www.youtube.com/watch?v=HCg_ewbNEss

My solution of MLB player digital engagement prediction

https://github.com/nyanp/mlb-player-digital-engagement

Code Competitions - Errors & Debugging Tips

https://www.kaggle.com/code-competition-debugging

Discussion from NFL Big Data Bowl, “What kind of effort did you make to make your script robust?

https://www.kaggle.com/c/nfl-big-data-bowl-2020/discussion/120375

Discussion from Riiid Answer Correctness Prediction, “A few notes...

https://www.kaggle.com/c/riiid-test-answer-prediction/discussion/196942