1 of 53

14章 分散プログラミング

2020 年 8 月 16 日

maton

2 of 53

本章で学ぶこと

  • 分散プログラミングとは何か
  • 分散プログラミング in Haskell
    • Cloud Haskell
    • メッセージパッシングモデル
  • 例題を通じた分散プログラミング技法の習得
    • 小さな例: ping
    • 少し大きな例: 分散チャットサーバ
    • 練習問題: 分散KVS

3 of 53

分散プログラミングとは何か

= 複数のマシンで�プログラムを走らせるための�プログラミング技法

4 of 53

なぜ複数のマシンで動かすのか

より大きな並列性

1台の性能に制約がある�↓�複数台の並列計算で�パフォーマンス向上

高速なサービス提供

クライアントから�サーバが遠い�↓�クライアント近くにサーバを�配置して通信速度向上

マシンの特性を活かす

異なるリソースを持つ�マシンを有効に扱える

スケール

アウト

クライアント

との通信

応答

性能

計算の

移譲

計算

性能

ストレージ性能

データの

保存

5 of 53

単一マシン(共有メモリ環境)との違い

マシンは壊れやすい

  • マシンを増やすほど、どこかが壊れている確率が高くなる
  • 計算の失敗にどう対処するかはアプリケーション固有の問題
  • プログラマに失敗の観測方法を提供する必要がある

通信コストは大きい

  • 単一マシンの共有メモリの並行・並列計算とは異なり、ネットワーク通信が必要
  • 共有メモリアクセス100nsに対して、日米間の通信は100msかかる(100万倍!)※

一貫性の保証は難しい

  • STMは共有メモリ環境を想定しているので分散環境ではそのまま使えない
  • アプリケーション毎に一貫性の要求が異なり、合意アルゴリズムも異なる
  • アプリケーション層で実装すべき

※参考:Latency numbers every programmer should know

https://gist.github.com/hellerbarde/2843375

6 of 53

分散プログラミング in Haskell

Cloud Haskell https://haskell-distributed.github.io

  • Erlang 式のメッセージパッシング
    • メッセージはHaskellのデータ(型付き)
  • 並行・非決定性プログラミングモデルによる並列性
    • 13章 スレッドを用いた並列プログラミング と同じ背景
      • 通信(IO)は失敗する可能性がある
      • ノードは突然利用不能になる
    • プロセスの死を受信できるようになっている
  • プロセスの抽象化
    • ピア(遠隔ノード)の自動検索
    • 遠隔ノードへのプロセス生成

ノード

ノード

プロセス

プロセス

プロセス

メッセージ

ノードより下層が単一マシンなのか

複数マシンなのか等は意識しなくてよい

7 of 53

ステップ1

単一ノードで動く ping

Node

Ping

Pong

8 of 53

Ping

Pong

9 of 53

実装に用いるAPI

Control.Distributed.Process in distributed-process

data Process -- Monad と MonadIO のインスタンス

data NodeId -- Eq, Ord, Show, Typeable, Binary のインスタンス

data ProcessId -- Eq, Ord, Show, Typeable, Binary のインスタンス

getSelfNode :: Process NodeId

getSelfPid :: Process ProcessId

spawn :: NodeId -> Closure (Process ()) -> Process ProcessId

send :: Serializable a => ProcessId -> a -> Process ()

expect :: Serializable a => Process a

terminate :: Process a

say :: String -> Process ()

※ Closure とか Serializable については後述

10 of 53

Process モナド

  • プロセスのアクションを記述するモナド
    • OS プロセスとは異なり、あくまで Cloud Haskell における抽象化
    • Haskell のスレッドに近い
  • 相違点

Thread

Process

生成

ローカルのみ

ローカル/リモート

データの共有

共有メモリ(MVar/TVar/IORef)

メッセージパッシングのみ

IO の実行

STM内不可/それ以外は可

liftIO すれば可

11 of 53

ping の実装 | メッセージ型の定義

  • Ping/Pong には送信者の pid を乗せる
  • ネットワーク越しの通信のためシリアライズが必要

import Data.Binary

import Data.Typeable

import GHC.Generics (Generic)

data Message = Ping ProcessId

| Pong ProcessId

deriving (Typeable, Generic) -- <1>

instance Binary Message -- <2>

class (Binary a, Typeable a) => Serializable a

12 of 53

ping の実装 | ping サーバ

  1. Ping を受信する (from に親 pid が入る)
    • expect の型は推論される(今回は a = Message )
    • Message 以外のデータは飛ばし、空ならブロックされる
  2. Ping の受信をログに流す
  3. 自分の pid (子 pid )を取得する
  4. Pong に子 pid を乗せて親 pid に送信する

pingServer :: Process ()

pingServer = do

Ping from <- expect -- <1>

say $ printf "ping received from %s" (show from) -- <2>

mypid <- getSelfPid -- <3>

send from (Pong mypid) -- <4>

remotable ['pingServer]

expect :: Serializable a

=> Process a

send :: Serializable a

=> ProcessId

-> a

-> Process ()

13 of 53

ping の実装 | ping サーバ | remotable?

  • 定義したProcessアクションをリモートプロセスで�実行できるようにする必要がある
    • 関数(クロージャ)のシリアライズは型検査の拡張が必要
    • さらに、クロージャ定義の時点で環境のシリアライズが必要

⇒ Template Haskell でクロージャのシリアライズを簡易化� 詳しくは @tanakh さんの資料論文を見てください…

pingServer :: Process ()

pingServer = do

Ping from <- expect -- <1>

say $ printf "ping received from %s" (show from) -- <2>

mypid <- getSelfPid -- <3>

send from (Pong mypid) -- <4>

remotable ['pingServer]

?

子プロセスで実行する関数を送ってる

14 of 53

ping の実装 | 親プロセス

master :: Process ()

master = do

node <- getSelfNode -- <1>

say $ printf "spawning on %s" (show node)

pid <- spawn node $(mkStaticClosure 'pingServer) -- <2>

mypid <- getSelfPid -- <3>

say $ printf "sending ping to %s" (show pid)

send pid (Ping mypid) -- <4>

Pong _ <- expect -- <5>

say "pong."

terminate -- <6>

15 of 53

ping の実装 | 親プロセス 1/6

自身のノードIDを得る (プロセスを作る前段階)

※今回は単一ノードなので親/子プロセスは同一ノード上に作る

master :: Process ()

master = do

node <- getSelfNode -- <1>

say $ printf "spawning on %s" (show node)

pid <- spawn node $(mkStaticClosure 'pingServer) -- <2>

mypid <- getSelfPid -- <3>

say $ printf "sending ping to %s" (show pid)

send pid (Ping mypid) -- <4>

Pong _ <- expect -- <5>

say "pong."

terminate -- <6>

図には無いですが…

getSelfNode :: Process NodeId

16 of 53

ping の実装 | 親プロセス 2/6

pingServer アクションを TH でクロージャ化し、�自身のノードにプロセスを作成する

master :: Process ()

master = do

node <- getSelfNode -- <1>

say $ printf "spawning on %s" (show node)

pid <- spawn node $(mkStaticClosure 'pingServer) -- <2>

mypid <- getSelfPid -- <3>

say $ printf "sending ping to %s" (show pid)

send pid (Ping mypid) -- <4>

Pong _ <- expect -- <5>

say "pong."

terminate -- <6>

spawn :: NodeId

-> Closure (Process ()) -- クロージャ化されたアクション

-> Process ProcessId -- 子pid

say :: String -> Process ()

17 of 53

ping の実装 | 親プロセス 3/6

親 pid を取得する

master :: Process ()

master = do

node <- getSelfNode -- <1>

say $ printf "spawning on %s" (show node)

pid <- spawn node $(mkStaticClosure 'pingServer) -- <2>

mypid <- getSelfPid -- <3>

say $ printf "sending ping to %s" (show pid)

send pid (Ping mypid) -- <4>

Pong _ <- expect -- <5>

say "pong."

terminate -- <6>

getSelfPid :: Process ProcessId

18 of 53

ping の実装 | 親プロセス 4/6

親 pid を Ping に乗せて、子プロセスに送る

master :: Process ()

master = do

node <- getSelfNode -- <1>

say $ printf "spawning on %s" (show node)

pid <- spawn node $(mkStaticClosure 'pingServer) -- <2>

mypid <- getSelfPid -- <3>

say $ printf "sending ping to %s" (show pid)

send pid (Ping mypid) -- <4>

Pong _ <- expect -- <5>

say "pong."

terminate -- <6>

send :: Serializable a => ProcessId -> a -> Process ()

19 of 53

ping の実装 | 親プロセス 5/6

子プロセスが Pong を返してくるのを待つ

master :: Process ()

master = do

node <- getSelfNode -- <1>

say $ printf "spawning on %s" (show node)

pid <- spawn node $(mkStaticClosure 'pingServer) -- <2>

mypid <- getSelfPid -- <3>

say $ printf "sending ping to %s" (show pid)

send pid (Ping mypid) -- <4>

Pong _ <- expect -- <5>

say "pong."

terminate -- <6>

expect :: Serializable a => Process a

20 of 53

ping の実装 | 親プロセス 6/6

Pong が届いたら終了

※今回の場合は明示的に terminate しなくても終了してくれる

master :: Process ()

master = do

node <- getSelfNode -- <1>

say $ printf "spawning on %s" (show node)

pid <- spawn node $(mkStaticClosure 'pingServer) -- <2>

mypid <- getSelfPid -- <3>

say $ printf "sending ping to %s" (show pid)

send pid (Ping mypid) -- <4>

Pong _ <- expect -- <5>

say "pong."

terminate -- <6>

terminate :: Process a

21 of 53

ping の実装 | main 関数

ネットワーク層のバックエンドを初期化�(ヘルパー関数を使うので詳細は割愛)

main :: IO ()

main = distribMain (\_ -> master) Main.__remoteTable

サンプル用のヘルパー関数

ネットワーク層のバックエンドの�初期化関数を呼んでいる

親プロセス

[NodeId] -> Process ()

今回の例では [NodeId] は不要

リモート実行のためのメタデータ

remotable されたアクションのルックアップテーブル�(THによって生成)

22 of 53

動作例

ping

23 of 53

ステップ2

複数ノードで動く ping

Node

Node

Node

Node

24 of 53

pingServer の分散化

  • 複数のノードに pingServer を渡す
    • 複数存在するノードを検知する必要がある
    • バックエンドのピア探索で手に入る
  • 親プロセスの変更
    • 複数のプロセスを生成する
    • 複数のPingを飛ばす
    • 複数のPongを待ち受ける
  • pingServer そのものは変更不要

Node

Node

Node

Node

25 of 53

pingServer の分散化 | 親プロセスの変更

master :: [NodeId] -> Process ()

master peers = do

ps <- forM peers $ \nid -> do

say $ printf "spawning on %s" (show nid)

spawn nid $(mkStaticClosure 'pingServer)

mypid <- getSelfPid

forM_ ps $ \pid -> do

say $ printf "pinging %s" (show pid)

send pid (Ping mypid)

waitForPongs ps

say "All pongs successfully received"

terminate

waitForPongs :: [ProcessId] -> Process ()

waitForPongs [] = return ()

waitForPongs ps = do

m <- expect

case m of

Pong p -> waitForPongs (filter (/= p) ps)

_ -> say "MASTER received ping" >> terminate

ピア探索で得たノードのリストを

引数に取る

各ノードでサーバプロセスを起動

各プロセスにPing

各プロセスのPongを待つ

Pongが到着し次第リストから引き抜く(空になったら終了)

うっかりPong以外�(つまりPing)が来たら

直ちに終了

main :: IO ()

main = distribMain master Main.__remoteTable

main では [NodeId] を捨てないよう変更(それだけ)

26 of 53

動作例

ping-multi

27 of 53

ステップ3

型付きチャネル

受信ポート

送信ポート

28 of 53

なぜ型付きチャネルが必要か

send/expect を使った通信の問題点

  • expect した時に欲しい型のメッセージ用のキューを探す必要がある
    • expect は存在量化されており、Serializable なデータならなんでも受け取れてしまう

  • 同じ型のメッセージの出所を区別できない
    • 送信者がProcessIdなどの識別子を含める必要がある

expect :: forall a. Serializable a => Process a

29 of 53

解決策: 型付きチャネル

受信者が、送信者を特定できるような�チャネルを予め送りつけてあげればよい

  • クライアントがチャネルを作る
    • どんな型のデータを送るポートかをクライアントが決める
  • 送信者ごとに「専用」のチャネルを作る
    • 受信ポートを持っていれば、送信者は自明になる

data SendPort a -- Serializable のインスタンス

data ReceivePort a -- Serializable のインスタンス... ではない!(後述)

newChan :: Serializable a => Process (SendPort a, ReceivePort a)

sendChan :: Serializable a => SendPort a -> a -> Process ()

receiveChan :: Serializable a => ReceivePort a -> Process a

String

受信ポート

String

送信ポート

server A

server B

server C

client

x

x

y

y

z

z

client

Stringを受け取りたい

newChan

sendChan

receiveChan

30 of 53

型付きチャネルを使う

data Message = Ping (SendPort ProcessId)

deriving (Typeable, Generic)

instance Binary Message

pingServer :: Process ()

pingServer = do

Ping chan <- expect

say $ printf "ping received from %s" (show chan)

mypid <- getSelfPid

sendChan chan mypid

master :: [NodeId] -> Process ()

master peers = do

(中略)

ports <- forM ps $ \pid -> do

say $ printf "pinging %s" (show pid)

(sendport,recvport) <- newChan

send pid (Ping sendport)

return recvport

forM_ ports $ \port -> do

_ <- receiveChan port

return ()

say "All pongs successfully received"

terminate

31 of 53

型付きチャネルを使う 1/4

data Message = Ping (SendPort ProcessId)

deriving (Typeable, Generic)

instance Binary Message

pingServer :: Process ()

pingServer = do

Ping chan <- expect

say $ printf "ping received from %s" (show chan)

mypid <- getSelfPid

sendChan chan mypid

master :: [NodeId] -> Process ()

master peers = do

(中略)

ports <- forM ps $ \pid -> do

say $ printf "pinging %s" (show pid)

(sendport,recvport) <- newChan

send pid (Ping sendport)

return recvport

forM_ ports $ \port -> do

_ <- receiveChan port

return ()

say "All pongs successfully received"

terminate

チャネルを作る

32 of 53

型付きチャネルを使う 2/4

data Message = Ping (SendPort ProcessId)

deriving (Typeable, Generic)

instance Binary Message

pingServer :: Process ()

pingServer = do

Ping chan <- expect

say $ printf "ping received from %s" (show chan)

mypid <- getSelfPid

sendChan chan mypid

master :: [NodeId] -> Process ()

master peers = do

(中略)

ports <- forM ps $ \pid -> do

say $ printf "pinging %s" (show pid)

(sendport,recvport) <- newChan

send pid (Ping sendport)

return recvport

forM_ ports $ \port -> do

_ <- receiveChan port

return ()

say "All pongs successfully received"

terminate

サーバに送信ポートを送る

33 of 53

型付きチャネルを使う 3/4

data Message = Ping (SendPort ProcessId)

deriving (Typeable, Generic)

instance Binary Message

pingServer :: Process ()

pingServer = do

Ping chan <- expect

say $ printf "ping received from %s" (show chan)

mypid <- getSelfPid

sendChan chan mypid

master :: [NodeId] -> Process ()

master peers = do

(中略)

ports <- forM ps $ \pid -> do

say $ printf "pinging %s" (show pid)

(sendport,recvport) <- newChan

send pid (Ping sendport)

return recvport

forM_ ports $ \port -> do

_ <- receiveChan port

return ()

say "All pongs successfully received"

terminate

受信ポートで待つ

34 of 53

型付きチャネルを使う 4/4

data Message = Ping (SendPort ProcessId)

deriving (Typeable, Generic)

instance Binary Message

pingServer :: Process ()

pingServer = do

Ping chan <- expect

say $ printf "ping received from %s" (show chan)

mypid <- getSelfPid

sendChan chan mypid

master :: [NodeId] -> Process ()

master peers = do

(中略)

ports <- forM ps $ \pid -> do

say $ printf "pinging %s" (show pid)

(sendport,recvport) <- newChan

send pid (Ping sendport)

return recvport

forM_ ports $ \port -> do

_ <- receiveChan port

return ()

say "All pongs successfully received"

terminate

送られてきたチャネルで通信する

35 of 53

型付きチャネル | 補足

Q. send/expect を全く使わず書けないの?� A. 書けます → � ただし、pingを送るためのチャネル� を送るためのチャネルが必要になり、ちょっと煩雑

Q. 最初に受信チャネルを送れば良くない?� A. 送れません� 受信チャネルを送れてしまうと、� 送信チャネルは複数の宛先を持つことになる。� チャネルを生成した時点では宛先がわからないという問題もある。

do

(s,r) <- newChan

spawn nid ($(mkClosure 'pingServer) s)

ping <- receiveChan r -- サーバがping送信用チャネルを投げる

(sendpong,recvpong) <- newChan

sendChan ping (Ping sendpong)

receiveChan recvpong

data SendPort a -- Serializable のインスタンス

data ReceivePort a -- Serializable のインスタンス... ではない!

※送信チャネルは複数回送ることができる�(送信者を一意にしたければIDを積む必要が出てくる)

do

(s,r) <- newChan

spawn nid ($(mkClosure 'pingServer) r)

(中略)

receiveChan r -- pong を受け取る

36 of 53

型付きチャネル | チャネルのマージ

折角受信チャネルを分離したのに、マージ?

  • 受信してからできるだけ早く返事したい場合
    • 受信チャネルのリストからメッセージ到着済みのチャネルを選ぶ必要がある
    • 先頭から到着を待っていると、後方のメッセージに対する返信が遅れてしまう
  • どうやる?
    • 毎回リストの前方から順に到着済みかをチェック(後方が不利)
      • mergePortsBiased :: Serializable a � => [ReceivePort a] -> Process (ReceivePort a)
    • 受信毎にリストの先頭を末尾に移動(ラウンドロビン)
      • mergePortsRR :: Serializable a � => [ReceivePort a] -> Process (ReceivePort a)

2

1

3

2

1

3

2

1

2

1

3

3

mergePortsBiased

mergePortsRR

到着を待つわけではない

37 of 53

ステップ4

失敗処理

・・・

38 of 53

失敗処理

  • 分散処理中に計算は様々な理由で失敗する
    • ネットワークの停止
    • ハードウェアの故障
    • ソフトウェアの欠陥…
  • distributed-process はプロセスの失敗の監視と復帰の機能を提供する

withMonitor_ :: ProcessId -> Process a -> Process a

receiveWait :: [Match b] -> Process b

receiveTimeout :: Int -> [Match b] -> Process (Maybe b)

match :: forall a b. Serializable a => (a -> Process b) -> Match b

matchIf :: forall a b. Serializable a => (a -> Bool) -> (a -> Process b) -> Match b

プロセスを監視対象にする

(Withパターン)

パターンの型 a は遮蔽される

パターンマッチのリストをまとめて待つ

※withパターンで書きにくい

場合(再帰しているなど)は

monitor/unmonitor を使うこともできる

39 of 53

失敗処理 | 使い方

withMonitor 中ではプロセスの失敗をメッセージとして受け取れる

Pongの返答またはプロセスの失敗を待つ

withMonitor_ pid $ do

send pid (Pong mypid)

receiveWait

[ match $ \(Pong _) -> do

say "pong."

terminate

, match $ \(ProcessMonitorNotification _ref deadpid reason) -> do

say (printf "process %s died: %s" (show deadpid) (show reason))

terminate

]

monitorの参照

失敗したプロセスのID

失敗した理由

data DiedReason

= DiedNormal -- 正常終了

| DiedException !String -- プロセスは例外を投げられて終了

| DiedDisconnect -- ノードへの接続を切られた

| DiedNodeDown -- ノードが死んでいる

| DiedUnknownId -- プロセス、ノード、チャネルの不正な識別子

サーバに(例えば)誤ってPong を送ると、�パターンマッチ失敗の実行時エラーがDiedExceptionとして帰る

40 of 53

失敗処理 | 考え方

  • 分散システムのノードは制御できない状況で失敗しうる
    • 例外ハンドリングにも限界がある
    • 例外が上がってこない場合もある

  • Erlang の考え方 “Let it crash”
    • 防御的になりすぎない
      • 局所的な失敗に対して条件を列挙するくらいなら、ただ死ぬ方がよいこともある
      • 失敗への対処は他の正常なプロセスに任せる
    • 失敗を考えるべき粒度はプロセス
      • システム全体として、プロセスがいつ死んでも仕事を続けられるようにする
    • システムが想定するアプリケーション例外も全部無視してCrashしろ ではない

41 of 53

14章 分散プログラミング

(続き)

2020 年 8 月 23 日

maton

42 of 53

前回のおさらい

  • 分散プログラミング
    • 複数のマシンでプログラムを実行するための技法
    • 性能向上を期待できる一方、共有メモリを持たないため、一貫性を保つことが難しい
  • Cloud Haskell
    • メッセージパッシングによる分散プログラミング
    • ピア検出やプロセスの死の検出などのバックエンド側のサポート
  • 例題
    • 単一ノードPing: サーバプロセスとクライアントプロセスの通信
    • 複数ノードPing: 複数のサーバとの送受信
    • 型付きチャネル: 各サーバに専用のチャネルを送り、送信元やデータ型の特定を容易化
    • 失敗処理: プロセスの失敗を監視し、パターンのリストを使ったAPIで通常の通信結果とプロセス失敗を個別に処理

43 of 53

少し大きな例

分散チャットサーバ

44 of 53

分散チャットサーバ

12.3 チャットサーバの拡張

  • クライアントは複数あるサーバのどれかに接続する
  • 既存の実装を大きく壊さないように拡張する

サーバが持つべき情報

  • サーバ一覧
  • ローカルクライアント
    • 自身に接続しているクライアント
  • リモートクライアント
    • 他のサーバに接続しているクライアント

このサーバ

視点

45 of 53

分散チャットサーバ | メッセージ

クライアント/サーバ間で用いるメッセージ

サーバ間で用いるメッセージ

data Message = Notice String -- サーバからクライアントへのメッセージ

| Tell ClientName String -- クライアントからのプライベートメッセージ

| Broadcast ClientName String -- クライアントからのパブリックメッセージ

| Command String -- ユーザが入力したコマンド

deriving (Typeable, Generic)

instance Binary Message

data PMessage

= MsgServers [ProcessId] -- サーバノード起動時に送られるサーバ一覧

| MsgSend ClientName Message -- クライアントのプライベートメッセージ

| MsgBroadcast Message -- クライアントのパブリックメッセージ

| MsgKick ClientName ClientName -- クライアントのキック情報

| MsgNewClient ClientName ProcessId -- 新たに接続したクライアントの情報

| MsgClientDisconnected ClientName ProcessId -- 接続が切れたクライアントの情報

deriving (Typeable, Generic)

instance Binary PMessage

Message

PMessage

46 of 53

分散チャットサーバ | データ

クライアントデータ

サーバデータ

基本方針

  • 状態はTVarで持つ
  • C/S間はTChanで通信

data Server = Server

{ clients :: TVar (Map ClientName Client)

, proxychan :: TChan (Process ())

, servers :: TVar [ProcessId]

, spid :: ProcessId

}

type ClientName = String

data Client

= ClientLocal LocalClient

| ClientRemote RemoteClient

data RemoteClient = RemoteClient

{ remoteName :: ClientName

, clientHome :: ProcessId

}

data LocalClient = LocalClient

{ localName :: ClientName

, clientHandle :: Handle

, clientKicked :: TVar (Maybe String)

, clientSendChan :: TChan Message

}

47 of 53

分散チャットサーバ | proxychan?

既存のサーバ実装を使いまわすので、基本的な処理はSTMモナド

一方、 遠隔プロセスへの通信はProcessモナドで実行しないといけない

STM 中では直接Processアクションできないので、�Processアクションを代理実行するプロセスに�TChan経由でアクションを渡す

Process

IO

STM

Process

liftIO

atomically

data Server = Server

{ clients :: TVar (Map ClientName Client)

, proxychan :: TChan (Process ())

, servers :: TVar [ProcessId]

, spid :: ProcessId

}

できないので…

48 of 53

分散チャットサーバ | メッセージ送信

準備: ローカルクライアントへの送信

準備: 遠隔サーバへの送信

任意のクライアントへの送信

特定の名前付きクライアントへの送信

その他のアクション(ブロードキャスト、キック、�クライアントの追加・削除等)も同様に書く (略)

sendLocal :: LocalClient -> Message -> STM ()

sendLocal LocalClient{..} msg = writeTChan clientSendChan msg

sendRemote :: Server -> ProcessId -> PMessage -> STM ()

sendRemote Server{..} pid pmsg = writeTChan proxychan (send pid pmsg)

sendMessage :: Server -> Client -> Message -> STM ()

sendMessage server (ClientLocal client) msg =

sendLocal client msg

sendMessage server (ClientRemote client) msg =

sendRemote server (clientHome client) (MsgSend (remoteName client) msg)

data LocalClient = LocalClient

{ localName :: ClientName

, clientHandle :: Handle

, clientKicked :: TVar (Maybe String)

, clientSendChan :: TChan Message

}

data Server = Server

{ clients :: TVar (Map ClientName Client)

, proxychan :: TChan (Process ())

, servers :: TVar [ProcessId]

, spid :: ProcessId

}

sendToName :: Server -> ClientName -> Message -> STM Bool

sendToName server@Server{..} name msg = do

clientmap <- readTVar clients

case Map.lookup name clientmap of

Nothing -> return False

Just client -> sendMessage server client msg >> return True

49 of 53

分散チャットサーバ | チャットサーバ

準備: proxychan に届いたアクションをひたすら実行する Process アクション

サーバプロセス

proxy :: Server -> Process ()

proxy Server{..} = forever $ join $ liftIO $ atomically $ readTChan proxychan

chatServer :: Int -> Process ()

chatServer port = do

server <- newServer []

liftIO $ forkIO (socketListener server port)

spawnLocal (proxy server)

forever $ do m <- expect; handleRemoteMessage server m

handleRemoteMessage :: Server -> PMessage -> Process ()

handleRemoteMessage server@Server{..} m = liftIO $ atomically

$ case m of

MsgSend name msg -> void $ sendToName server name msg

(以下略)

50 of 53

動作例

distrib-chat

51 of 53

練習問題: 分散KVS

次のAPIを持つ分散耐故障性KVSを作れ

Part 1. 単一ノードのための実装

  • createDB でプロセスを立ち上げる
  • get および set でDBプロセスと通信する

Part 2. 分散化

  • キー空間を分割して、ワーカDBに振り分ける
  • ワーカプロセスにDBの処理を移譲する
  • メインプロセスはキーの振り分けのみ行う

Part 3. ワーカプロセスの観測

  • 失敗したら検出できるように

Part 4. 対故障性

  • ワーカを組にして、1つのキーを2つのワーカが持つようにする
  • ワーカが死んだら内部のワーカリストから取り除く

Part X. 新しいノード上にワーカを再始動�…��

type Database = ProcessId

type Key = String

type Value = String

createDB :: [NodeId] -> Process Database

set :: Database -> Key -> Value -> Process ()

get :: Database -> Key -> Process (Maybe Value)

52 of 53

53 of 53

Thanks

  • スライドテーマ
    • Google slides 「コンサルティング提案書」
  • ソースコードハイライタ
    • SlidesCodeHighlighter
  • イラスト
    • いらすとや
      • https://www.irasutoya.com
  • シーケンス図
    • diagrams.net