14章 分散プログラミング
2020 年 8 月 16 日
maton
本章で学ぶこと
分散プログラミングとは何か
= 複数のマシンで�プログラムを走らせるための�プログラミング技法
なぜ複数のマシンで動かすのか
より大きな並列性
1台の性能に制約がある�↓�複数台の並列計算で�パフォーマンス向上
高速なサービス提供
クライアントから�サーバが遠い�↓�クライアント近くにサーバを�配置して通信速度向上
マシンの特性を活かす
異なるリソースを持つ�マシンを有効に扱える
スケール
アウト
クライアント
との通信
応答
性能
計算の
移譲
計算
性能
ストレージ性能
データの
保存
単一マシン(共有メモリ環境)との違い
マシンは壊れやすい
通信コストは大きい
一貫性の保証は難しい
※参考:Latency numbers every programmer should know
https://gist.github.com/hellerbarde/2843375
分散プログラミング in Haskell
Cloud Haskell https://haskell-distributed.github.io
ノード
ノード
プロセス
プロセス
プロセス
メッセージ
?
ノードより下層が単一マシンなのか
複数マシンなのか等は意識しなくてよい
ステップ1
単一ノードで動く ping
Node
Ping
Pong
Ping
Pong
実装に用いるAPI
Control.Distributed.Process in distributed-process
data Process -- Monad と MonadIO のインスタンス
data NodeId -- Eq, Ord, Show, Typeable, Binary のインスタンス
data ProcessId -- Eq, Ord, Show, Typeable, Binary のインスタンス
getSelfNode :: Process NodeId
getSelfPid :: Process ProcessId
spawn :: NodeId -> Closure (Process ()) -> Process ProcessId
send :: Serializable a => ProcessId -> a -> Process ()
expect :: Serializable a => Process a
terminate :: Process a
say :: String -> Process ()
※ Closure とか Serializable については後述
Process モナド
| Thread | Process |
生成 | ローカルのみ | ローカル/リモート |
データの共有 | 共有メモリ(MVar/TVar/IORef) | メッセージパッシングのみ |
IO の実行 | STM内不可/それ以外は可 | liftIO すれば可 |
ping の実装 | メッセージ型の定義
import Data.Binary
import Data.Typeable
import GHC.Generics (Generic)
data Message = Ping ProcessId
| Pong ProcessId
deriving (Typeable, Generic) -- <1>
instance Binary Message -- <2>
class (Binary a, Typeable a) => Serializable a
ping の実装 | ping サーバ
pingServer :: Process ()
pingServer = do
Ping from <- expect -- <1>
say $ printf "ping received from %s" (show from) -- <2>
mypid <- getSelfPid -- <3>
send from (Pong mypid) -- <4>
remotable ['pingServer]
expect :: Serializable a
=> Process a
send :: Serializable a
=> ProcessId
-> a
-> Process ()
ping の実装 | ping サーバ | remotable?
⇒ Template Haskell でクロージャのシリアライズを簡易化� 詳しくは @tanakh さんの資料や論文を見てください…
pingServer :: Process ()
pingServer = do
Ping from <- expect -- <1>
say $ printf "ping received from %s" (show from) -- <2>
mypid <- getSelfPid -- <3>
send from (Pong mypid) -- <4>
remotable ['pingServer]
?
子プロセスで実行する関数を送ってる
ping の実装 | 親プロセス
master :: Process ()
master = do
node <- getSelfNode -- <1>
say $ printf "spawning on %s" (show node)
pid <- spawn node $(mkStaticClosure 'pingServer) -- <2>
mypid <- getSelfPid -- <3>
say $ printf "sending ping to %s" (show pid)
send pid (Ping mypid) -- <4>
Pong _ <- expect -- <5>
say "pong."
terminate -- <6>
ping の実装 | 親プロセス 1/6
自身のノードIDを得る (プロセスを作る前段階)
※今回は単一ノードなので親/子プロセスは同一ノード上に作る
master :: Process ()
master = do
node <- getSelfNode -- <1>
say $ printf "spawning on %s" (show node)
pid <- spawn node $(mkStaticClosure 'pingServer) -- <2>
mypid <- getSelfPid -- <3>
say $ printf "sending ping to %s" (show pid)
send pid (Ping mypid) -- <4>
Pong _ <- expect -- <5>
say "pong."
terminate -- <6>
図には無いですが…
getSelfNode :: Process NodeId
ping の実装 | 親プロセス 2/6
pingServer アクションを TH でクロージャ化し、�自身のノードにプロセスを作成する
master :: Process ()
master = do
node <- getSelfNode -- <1>
say $ printf "spawning on %s" (show node)
pid <- spawn node $(mkStaticClosure 'pingServer) -- <2>
mypid <- getSelfPid -- <3>
say $ printf "sending ping to %s" (show pid)
send pid (Ping mypid) -- <4>
Pong _ <- expect -- <5>
say "pong."
terminate -- <6>
spawn :: NodeId
-> Closure (Process ()) -- クロージャ化されたアクション
-> Process ProcessId -- 子pid
say :: String -> Process ()
ping の実装 | 親プロセス 3/6
親 pid を取得する
master :: Process ()
master = do
node <- getSelfNode -- <1>
say $ printf "spawning on %s" (show node)
pid <- spawn node $(mkStaticClosure 'pingServer) -- <2>
mypid <- getSelfPid -- <3>
say $ printf "sending ping to %s" (show pid)
send pid (Ping mypid) -- <4>
Pong _ <- expect -- <5>
say "pong."
terminate -- <6>
getSelfPid :: Process ProcessId
ping の実装 | 親プロセス 4/6
親 pid を Ping に乗せて、子プロセスに送る
master :: Process ()
master = do
node <- getSelfNode -- <1>
say $ printf "spawning on %s" (show node)
pid <- spawn node $(mkStaticClosure 'pingServer) -- <2>
mypid <- getSelfPid -- <3>
say $ printf "sending ping to %s" (show pid)
send pid (Ping mypid) -- <4>
Pong _ <- expect -- <5>
say "pong."
terminate -- <6>
send :: Serializable a => ProcessId -> a -> Process ()
ping の実装 | 親プロセス 5/6
子プロセスが Pong を返してくるのを待つ
master :: Process ()
master = do
node <- getSelfNode -- <1>
say $ printf "spawning on %s" (show node)
pid <- spawn node $(mkStaticClosure 'pingServer) -- <2>
mypid <- getSelfPid -- <3>
say $ printf "sending ping to %s" (show pid)
send pid (Ping mypid) -- <4>
Pong _ <- expect -- <5>
say "pong."
terminate -- <6>
expect :: Serializable a => Process a
ping の実装 | 親プロセス 6/6
Pong が届いたら終了
※今回の場合は明示的に terminate しなくても終了してくれる
master :: Process ()
master = do
node <- getSelfNode -- <1>
say $ printf "spawning on %s" (show node)
pid <- spawn node $(mkStaticClosure 'pingServer) -- <2>
mypid <- getSelfPid -- <3>
say $ printf "sending ping to %s" (show pid)
send pid (Ping mypid) -- <4>
Pong _ <- expect -- <5>
say "pong."
terminate -- <6>
terminate :: Process a
ping の実装 | main 関数
ネットワーク層のバックエンドを初期化�(ヘルパー関数を使うので詳細は割愛)
main :: IO ()
main = distribMain (\_ -> master) Main.__remoteTable
サンプル用のヘルパー関数
ネットワーク層のバックエンドの�初期化関数を呼んでいる
親プロセス
[NodeId] -> Process ()
今回の例では [NodeId] は不要
リモート実行のためのメタデータ
remotable されたアクションのルックアップテーブル�(THによって生成)
動作例
ping
ステップ2
複数ノードで動く ping
Node
Node
Node
Node
pingServer の分散化
Node
Node
Node
Node
pingServer の分散化 | 親プロセスの変更
master :: [NodeId] -> Process ()
master peers = do
ps <- forM peers $ \nid -> do
say $ printf "spawning on %s" (show nid)
spawn nid $(mkStaticClosure 'pingServer)
mypid <- getSelfPid
forM_ ps $ \pid -> do
say $ printf "pinging %s" (show pid)
send pid (Ping mypid)
waitForPongs ps
say "All pongs successfully received"
terminate
waitForPongs :: [ProcessId] -> Process ()
waitForPongs [] = return ()
waitForPongs ps = do
m <- expect
case m of
Pong p -> waitForPongs (filter (/= p) ps)
_ -> say "MASTER received ping" >> terminate
ピア探索で得たノードのリストを
引数に取る
各ノードでサーバプロセスを起動
各プロセスにPing
各プロセスのPongを待つ
Pongが到着し次第リストから引き抜く(空になったら終了)
うっかりPong以外�(つまりPing)が来たら
直ちに終了
main :: IO ()
main = distribMain master Main.__remoteTable
main では [NodeId] を捨てないよう変更(それだけ)
動作例
ping-multi
ステップ3
型付きチャネル
受信ポート
送信ポート
なぜ型付きチャネルが必要か
send/expect を使った通信の問題点
expect :: forall a. Serializable a => Process a
解決策: 型付きチャネル
受信者が、型と送信者を特定できるような�チャネルを予め送りつけてあげればよい
data SendPort a -- Serializable のインスタンス
data ReceivePort a -- Serializable のインスタンス... ではない!(後述)
newChan :: Serializable a => Process (SendPort a, ReceivePort a)
sendChan :: Serializable a => SendPort a -> a -> Process ()
receiveChan :: Serializable a => ReceivePort a -> Process a
String
受信ポート
String
送信ポート
server A
server B
server C
client
x
x
y
y
z
z
client
Stringを受け取りたい
newChan
sendChan
receiveChan
型付きチャネルを使う
data Message = Ping (SendPort ProcessId)
deriving (Typeable, Generic)
instance Binary Message
pingServer :: Process ()
pingServer = do
Ping chan <- expect
say $ printf "ping received from %s" (show chan)
mypid <- getSelfPid
sendChan chan mypid
master :: [NodeId] -> Process ()
master peers = do
(中略)
ports <- forM ps $ \pid -> do
say $ printf "pinging %s" (show pid)
(sendport,recvport) <- newChan
send pid (Ping sendport)
return recvport
forM_ ports $ \port -> do
_ <- receiveChan port
return ()
say "All pongs successfully received"
terminate
型付きチャネルを使う 1/4
data Message = Ping (SendPort ProcessId)
deriving (Typeable, Generic)
instance Binary Message
pingServer :: Process ()
pingServer = do
Ping chan <- expect
say $ printf "ping received from %s" (show chan)
mypid <- getSelfPid
sendChan chan mypid
master :: [NodeId] -> Process ()
master peers = do
(中略)
ports <- forM ps $ \pid -> do
say $ printf "pinging %s" (show pid)
(sendport,recvport) <- newChan
send pid (Ping sendport)
return recvport
forM_ ports $ \port -> do
_ <- receiveChan port
return ()
say "All pongs successfully received"
terminate
チャネルを作る
型付きチャネルを使う 2/4
data Message = Ping (SendPort ProcessId)
deriving (Typeable, Generic)
instance Binary Message
pingServer :: Process ()
pingServer = do
Ping chan <- expect
say $ printf "ping received from %s" (show chan)
mypid <- getSelfPid
sendChan chan mypid
master :: [NodeId] -> Process ()
master peers = do
(中略)
ports <- forM ps $ \pid -> do
say $ printf "pinging %s" (show pid)
(sendport,recvport) <- newChan
send pid (Ping sendport)
return recvport
forM_ ports $ \port -> do
_ <- receiveChan port
return ()
say "All pongs successfully received"
terminate
サーバに送信ポートを送る
型付きチャネルを使う 3/4
data Message = Ping (SendPort ProcessId)
deriving (Typeable, Generic)
instance Binary Message
pingServer :: Process ()
pingServer = do
Ping chan <- expect
say $ printf "ping received from %s" (show chan)
mypid <- getSelfPid
sendChan chan mypid
master :: [NodeId] -> Process ()
master peers = do
(中略)
ports <- forM ps $ \pid -> do
say $ printf "pinging %s" (show pid)
(sendport,recvport) <- newChan
send pid (Ping sendport)
return recvport
forM_ ports $ \port -> do
_ <- receiveChan port
return ()
say "All pongs successfully received"
terminate
受信ポートで待つ
型付きチャネルを使う 4/4
data Message = Ping (SendPort ProcessId)
deriving (Typeable, Generic)
instance Binary Message
pingServer :: Process ()
pingServer = do
Ping chan <- expect
say $ printf "ping received from %s" (show chan)
mypid <- getSelfPid
sendChan chan mypid
master :: [NodeId] -> Process ()
master peers = do
(中略)
ports <- forM ps $ \pid -> do
say $ printf "pinging %s" (show pid)
(sendport,recvport) <- newChan
send pid (Ping sendport)
return recvport
forM_ ports $ \port -> do
_ <- receiveChan port
return ()
say "All pongs successfully received"
terminate
送られてきたチャネルで通信する
型付きチャネル | 補足
Q. send/expect を全く使わず書けないの?� A. 書けます → � ただし、pingを送るためのチャネル� を送るためのチャネルが必要になり、ちょっと煩雑
Q. 最初に受信チャネルを送れば良くない?� A. 送れません� 受信チャネルを送れてしまうと、� 送信チャネルは複数の宛先を持つことになる。� チャネルを生成した時点では宛先がわからないという問題もある。
do
(s,r) <- newChan
spawn nid ($(mkClosure 'pingServer) s)
ping <- receiveChan r -- サーバがping送信用チャネルを投げる
(sendpong,recvpong) <- newChan
sendChan ping (Ping sendpong)
receiveChan recvpong
data SendPort a -- Serializable のインスタンス
data ReceivePort a -- Serializable のインスタンス... ではない!
※送信チャネルは複数回送ることができる�(送信者を一意にしたければIDを積む必要が出てくる)
do
(s,r) <- newChan
spawn nid ($(mkClosure 'pingServer) r)
(中略)
receiveChan r -- pong を受け取る
型付きチャネル | チャネルのマージ
折角受信チャネルを分離したのに、マージ?
2
1
3
2
1
3
2
1
2
1
3
3
mergePortsBiased
mergePortsRR
到着を待つわけではない
ステップ4
失敗処理
・・・
失敗処理
withMonitor_ :: ProcessId -> Process a -> Process a
receiveWait :: [Match b] -> Process b
receiveTimeout :: Int -> [Match b] -> Process (Maybe b)
match :: forall a b. Serializable a => (a -> Process b) -> Match b
matchIf :: forall a b. Serializable a => (a -> Bool) -> (a -> Process b) -> Match b
プロセスを監視対象にする
(Withパターン)
パターンの型 a は遮蔽される
パターンマッチのリストをまとめて待つ
※withパターンで書きにくい
場合(再帰しているなど)は
monitor/unmonitor を使うこともできる
失敗処理 | 使い方
withMonitor 中ではプロセスの失敗をメッセージとして受け取れる
Pongの返答またはプロセスの失敗を待つ
withMonitor_ pid $ do
send pid (Pong mypid)
receiveWait
[ match $ \(Pong _) -> do
say "pong."
terminate
, match $ \(ProcessMonitorNotification _ref deadpid reason) -> do
say (printf "process %s died: %s" (show deadpid) (show reason))
terminate
]
monitorの参照
失敗したプロセスのID
失敗した理由
data DiedReason
= DiedNormal -- 正常終了
| DiedException !String -- プロセスは例外を投げられて終了
| DiedDisconnect -- ノードへの接続を切られた
| DiedNodeDown -- ノードが死んでいる
| DiedUnknownId -- プロセス、ノード、チャネルの不正な識別子
サーバに(例えば)誤ってPong を送ると、�パターンマッチ失敗の実行時エラーがDiedExceptionとして帰る
失敗処理 | 考え方
14章 分散プログラミング
(続き)
2020 年 8 月 23 日
maton
前回のおさらい
少し大きな例
分散チャットサーバ
分散チャットサーバ
12.3 チャットサーバの拡張
サーバが持つべき情報
このサーバ
視点
分散チャットサーバ | メッセージ
クライアント/サーバ間で用いるメッセージ
サーバ間で用いるメッセージ
data Message = Notice String -- サーバからクライアントへのメッセージ
| Tell ClientName String -- クライアントからのプライベートメッセージ
| Broadcast ClientName String -- クライアントからのパブリックメッセージ
| Command String -- ユーザが入力したコマンド
deriving (Typeable, Generic)
instance Binary Message
data PMessage
= MsgServers [ProcessId] -- サーバノード起動時に送られるサーバ一覧
| MsgSend ClientName Message -- クライアントのプライベートメッセージ
| MsgBroadcast Message -- クライアントのパブリックメッセージ
| MsgKick ClientName ClientName -- クライアントのキック情報
| MsgNewClient ClientName ProcessId -- 新たに接続したクライアントの情報
| MsgClientDisconnected ClientName ProcessId -- 接続が切れたクライアントの情報
deriving (Typeable, Generic)
instance Binary PMessage
Message
PMessage
分散チャットサーバ | データ
クライアントデータ
サーバデータ
基本方針
data Server = Server
{ clients :: TVar (Map ClientName Client)
, proxychan :: TChan (Process ())
, servers :: TVar [ProcessId]
, spid :: ProcessId
}
type ClientName = String
data Client
= ClientLocal LocalClient
| ClientRemote RemoteClient
data RemoteClient = RemoteClient
{ remoteName :: ClientName
, clientHome :: ProcessId
}
data LocalClient = LocalClient
{ localName :: ClientName
, clientHandle :: Handle
, clientKicked :: TVar (Maybe String)
, clientSendChan :: TChan Message
}
?
分散チャットサーバ | proxychan?
既存のサーバ実装を使いまわすので、基本的な処理はSTMモナド
一方、 遠隔プロセスへの通信はProcessモナドで実行しないといけない
STM 中では直接Processアクションできないので、�Processアクションを代理実行するプロセスに�TChan経由でアクションを渡す
Process
IO
STM
Process
liftIO
atomically
data Server = Server
{ clients :: TVar (Map ClientName Client)
, proxychan :: TChan (Process ())
, servers :: TVar [ProcessId]
, spid :: ProcessId
}
できないので…
分散チャットサーバ | メッセージ送信
準備: ローカルクライアントへの送信
準備: 遠隔サーバへの送信
任意のクライアントへの送信
特定の名前付きクライアントへの送信
その他のアクション(ブロードキャスト、キック、�クライアントの追加・削除等)も同様に書く (略)
sendLocal :: LocalClient -> Message -> STM ()
sendLocal LocalClient{..} msg = writeTChan clientSendChan msg
sendRemote :: Server -> ProcessId -> PMessage -> STM ()
sendRemote Server{..} pid pmsg = writeTChan proxychan (send pid pmsg)
sendMessage :: Server -> Client -> Message -> STM ()
sendMessage server (ClientLocal client) msg =
sendLocal client msg
sendMessage server (ClientRemote client) msg =
sendRemote server (clientHome client) (MsgSend (remoteName client) msg)
data LocalClient = LocalClient
{ localName :: ClientName
, clientHandle :: Handle
, clientKicked :: TVar (Maybe String)
, clientSendChan :: TChan Message
}
data Server = Server
{ clients :: TVar (Map ClientName Client)
, proxychan :: TChan (Process ())
, servers :: TVar [ProcessId]
, spid :: ProcessId
}
sendToName :: Server -> ClientName -> Message -> STM Bool
sendToName server@Server{..} name msg = do
clientmap <- readTVar clients
case Map.lookup name clientmap of
Nothing -> return False
Just client -> sendMessage server client msg >> return True
分散チャットサーバ | チャットサーバ
準備: proxychan に届いたアクションをひたすら実行する Process アクション
サーバプロセス
proxy :: Server -> Process ()
proxy Server{..} = forever $ join $ liftIO $ atomically $ readTChan proxychan
chatServer :: Int -> Process ()
chatServer port = do
server <- newServer []
liftIO $ forkIO (socketListener server port)
spawnLocal (proxy server)
forever $ do m <- expect; handleRemoteMessage server m
handleRemoteMessage :: Server -> PMessage -> Process ()
handleRemoteMessage server@Server{..} m = liftIO $ atomically
$ case m of
MsgSend name msg -> void $ sendToName server name msg
(以下略)
動作例
distrib-chat
練習問題: 分散KVS
次のAPIを持つ分散耐故障性KVSを作れ
Part 1. 単一ノードのための実装
Part 2. 分散化
Part 3. ワーカプロセスの観測
Part 4. 対故障性
Part X. 新しいノード上にワーカを再始動�…��
type Database = ProcessId
type Key = String
type Value = String
createDB :: [NodeId] -> Process Database
set :: Database -> Key -> Value -> Process ()
get :: Database -> Key -> Process (Maybe Value)
Thanks