Riak と Erlang/OTP

Riak は分散型で障害体制を備えたオープンソースのデータベースであり、Erlang/OTP を使った大規模システム構築の好例です。大規模でスケーラブルな分散システムに対する Erlang のサポートのおかげで、Riak は容量とスループットの両方に対する線形スケーラビリティや高可用性といったデータベースではあまり見られない特徴を持っています。

Erlang/OTP ではノード間通信、メッセージキュー、障害検出機構、クライアント-サーバーの抽象化といったものが最初から利用可能なので、Riak のようなシステムを開発するのには理想的なプラットフォームとなっています。その上、Erlang において頻繁に使われるパターンは OTP ビヘイビアと呼ばれるライブラリモジュールで実装されています。OTP ビヘイビアは並列処理とエラー対応のための汎用コードフレームワークであり、並列プログラミングを単純化し、陥りがちな落とし穴から開発者を守ります。ビヘイビアはスーパーバイザ (supervisor) によって監視され、監視ツリー (supervision tree) にまとめられます (スーパーバイザ自身もビヘイビアです)。監視ツリーはパッケージ化されてアプリケーションとなり、Erlang プログラムの構成単位のとなります。

Riak のような完全な Erlang システムは、互いに対話する疎に結びついたアプリケーションの集合体です。アプリケーションには開発者によって書かれるものもあれば、他にも標準の Erlang/OTP ディストリビューションに含まれるもの、他のオープンソースプロジェクトに由来するものなどがあります。アプリケーションはブートスクリプトによって一つずつロード・開始され、このブートスクリプトはアプリケーションとバージョンを並べたリストから作成されます。

ここで開始するアプリケーションのうちどれがリリースに含まれるかはシステムごとに異なります。標準 Erlang ディストリビューションでは、KernelStdLib (Standard Library) というアプリケーションがブートファイルによって起動されます。インストールの種類によってはリリースとソフトウェアアップグレードツールやログ機能が含まれる SASL (Systems Architecture Support Library) アプリケーションが起動される場合もあります。Riak も同様であり、Riak 固有のアプリケーションとその依存アプリケーションの他にも、Kernel, StdLib, SASL といったランタイムの依存アプリケーションが起動されます。Riak の完全な実行可能ビルドにはこういった Erlang/OTP ディストリビューションの標準的な要素も含まれ、コマンドラインから riak start を実行したときに Riak 本体と共に起動されます。Riak には複雑なアプリケーションが多数含まれるので、この章は Riak の完全なガイドとして考えるべきではなく、Riak のソースコードを例として使った OTP の紹介と考えるべきです。説明のため、図や例を省略している部分があります。

Erlang の簡単な紹介

Erlang は並列関数型言語であり、バイトコードにコンパイルされて仮想マシンで実行されます。プログラムは互いに呼び出し合う関数から構成され、関数はたいていプロセス間メッセージのやり取りや I/O、データベース操作といった副作用を持ちます。Erlang において変数は単一代入であり、変数に一度値を代入したらその後は変更できません。Erlang は次の階乗関数の例のようなパターンマッチを多用します:

-module(factorial).
-export([fac/1]).
fac(0) -> 1;
fac(N) when N>0 ->
   Prev = fac(N-1),
   N*Prev.

この例では最初の節 (clause) がゼロの階乗を定義し、二つ目の節が正の整数の階乗を定義します。節の本体 (body) には式が並び、本体の最後の式がその節の値となります。この fac 関数を負の数に対して呼ぶとマッチする節がないのでランタイムエラーとなります。負数のケースを処理しないのは、Erlang において奨励される非防御的プログラミング (non-defensive programming) の例です。

モジュール内部では関数は普通の方法で呼ばれますが、モジュールの外側ではモジュールの名前が最初に付き、factorial.fac(3) という形で呼ばれます。引数の数 (アリティ) が異なる同じ名前の関数を定義することもできます。上述の factorial モジュールの export ディレクティブでは、アリティ 1 の関数 facfac/1 と表記されています。

Erlang はタプルとリストをサポートします (タプルは直積型 (product type) とも呼ばれます)。タプルは中括弧で囲って {ok, 37} のように表し、タプルの要素には位置でアクセスします。レコードは別の型であり、固定された数の要素を持ち、各要素は名前によってアクセスや操作を受けます。レコードは -record(state, {id, msg_list = []}) のように定義し、Var = #state{id=1} のようにインスタンスを作成し、Var#state.id のように要素を取得します。可変個数の要素を扱うときにはリストを使います。リストは大括弧を使って [23,34] のように定義します。[X|Xs] という表記は空でないリストとマッチし、ヘッドが X でテールが Xs となります。小文字で始まる識別子はアトムであり、それ自身が値となります。例えば {ok, 37}ok はアトムです。アトムは関数の戻り値を区別するために使われることが多く、例えば ok の他にも {error, "Error String"} のような形の返り値が考えられます。

Erlang のプロセスは独立したメモリ空間で並列に動作し、メッセージパッシングを使って相互に通信します。プロセスはアプリケーションの様々な部分で使用され、例えばデータベースへのゲートウェイ、プロトコルスタックのハンドラ、他のプロセスからのトレースメッセージの管理などを行います。こういったプロセスは様々なリクエストを処理しますが、その方法には共通点があります。

プロセスは仮想マシン (VM) 内部にのみ存在するため、単一の VM が数百万個のプロセスを同時に実行することも可能です。Riak はこの特性を大いに利用しており、例えばデータベースへのリクエスト (読み込み、書き込み、削除) は個別のプロセスとしてモデル化されます。OS レベルのスレッド実装の多くでは、このアプローチは不可能でしょう。

プロセスはプロセス識別子 (process identifier, PID) によって識別されますが、別名を付けて登録することもできます (ただしこの機能は寿命の長い“静的な”プロセスにのみ用いるべきです)。プロセスに別名を付けて登録すると、他のプロセスが PID を知ることなくそのプロセスにメッセージを送れるようになります。プロセスは spawn(Module, Function, Arguments) という組み込み関数 (built-in function, BIF) で生成されます。BIF とは VM に組み込まれている関数のことであり、純粋な Erlang では実行が不可能だったり遅すぎたりする処理を実行するのに使われます。BIF spawn/3 のパラメータは Module, Function, Arguments リストの三つです。この呼び出しは新しく生成されたプロセスの PID を返し、副作用としてそのモジュールにあるその関数を指定された引数で実行する新しいプロセスを生成します。

Pid ! Msg とすると Msg というメッセージを PID が Pid のプロセスに送ることができます。プロセスは自分自身の PID を BIF self を使って知ることができ、この情報を使えば他のプロセスと元のプロセスの間で通信が可能になります。あるプロセスが {ok, N} または {error, Reason} という形のメッセージの受信を期待しているとすると、このような場合には receive 式が利用できます:

receive
   {ok, N} ->
      N+1;
   {error, _} ->
      0
end

この式の値はパターンにマッチした節によって決まります。変数の値がパターンマッチで必要とされない場合には、上記のようにアンダースコアをワイルドカードとして利用できます。

プロセス間のメッセージパッシングは非同期であり、プロセスが受け取ったメッセージはプロセスの受信箱 (mailbox) に受け取った順番で溜まっていきます。上述の receive 式が実際に実行される所を考えてみると、受信箱の最初の要素が {ok, N} または {error, Reason} であれば、対応する値が返ります。もし受信箱の最初の要素がどちらの形でもない場合、その要素は受信箱に留め置かれ、二つ目の要素が同様に処理されます。マッチするメッセージが無い場合には、recieve はマッチするメッセージが届くまで待ちます。

プロセスが終了する理由は二つあります。一つは実行するコードが無くなった場合で、このときプロセスは 通常 (normal) の理由で終了したと言います。そうではなくプロセスがランタイムエラーに遭遇した場合は、そのプロセスは 異常 (non-normal) な理由で終了したと言います。プロセスの終了は基本的に他のプロセスに影響を及ぼしませんが、リンクされているプロセスは別です。プロセスは BIF link(Pid) または spawn_link(Module, Function, Arguments) を使って互いをリンクでき、あるプロセスが終了するとリンクされているプロセス全てに終了シグナルが送られます。終了理由が異常である場合、このシグナルを受け取ったプロセスは終了シグナルをさらに伝搬させてから自身を終了させます。BIF process_flag(trap_exit, true) を呼びだせば、終了シグナルを Erlang メッセージとして受信してプロセスを終了しないようにすることも可能です。

Riak はヘルパープロセスの状態の監視に終了シグナルを使います。このヘルパープロセスはリクエストを生成する有限状態機械によって生成され、クリティカルでない作業を実行します。ヘルパープロセスが異常終了すると、そのときに発生する終了シグナルを受け取った親プロセスはエラーを無視するかプロセスを再起動します。

プロセスの骨格

プロセスには作られた目的に関わらず共通のパターンがあると前に説明しました。これについて説明しましょう。プロセスはまず最初に生成され、そのとき場合によっては別名が登録されます。新しく生成されたプロセスが最初に行うのはプロセスのループデータ (loop data) の初期化です。ループデータは通常組み込み関数 spawn に渡される引数からプロセスの初期化時に作られます。ループデータはプロセスの状態を表す変数 (たいていはレコード) に格納され、その後 receive-evaluation 関数に渡されます。この関数はメッセージを受け取り、処理し、状態を更新し、新しい状態を引数として末尾再帰呼び出しを行うというループを実行します。この関数が「停止」メッセージを受け取とると、プロセスは自身のクリーンアップを行ってから終了します。

プロセスに課された仕事が何であれ、以上のパターンは繰り返し現れます。次はこのパターンに合致するプロセスの間で異なる部分を見てみましょう:

つまり、一般的な処理の骨格は確かに存在するものの、それを補うプロセスの仕事に直接結び付いた専用の処理もきちんと存在するということです。この骨格をテンプレートとして使えば、サーバー、有限状態機械、イベントハンドラ、スーパーバイザといった Erlang プロセスを作ることができます。ただしこういったパターンは毎回最初から実装するものではなく、ビヘイビア (behavior) と呼ばれるライブラリモジュールにまとめられています。ビヘイビアは OTP というミドルウェアの一部です。

OTP ビヘイビア

Riak に取り組んでいる開発者の中心チームは世界中の十数もの地点に散らばっています。このような状況では、緊密に連携を取りながら同じテンプレートを使って取り組まない限り、特殊な境界条件や並列性に関するエラーに対処できないクライアント-サーバーの実装がいくつも作られることになります。クライアントとサーバーのクラッシュへの対処は一貫しないでしょうし、リクエストに対するレスポンスが「内部のメッセージプロトコルに準拠している何らかのメッセージ」ではなくて正しいレスポンスであることの保証は無い可能性が高いです。

OTP は Erlang ライブラリと設計原則の集合体であり、頑健なシステムを開発するための出来合いのツールを提供します。OTP のパターンやライブラリの多くは「ビヘイビア」の形で提供されます。

OTP ビヘイビアはライブラリモジュールを提供することで上述の問題を解決します。並列デザインパターンの中でも特によく見られるものが実装されており、さらにエラーや特殊ケースをプログラマの気付かないところで一貫した方法で処理します。そのため OTP ビヘイビアを基本要素として使えば、産業グレードのシステムの設計・構築が可能です。

はじめに

OTP ビヘイビアは Erlang/OTP ディストリビューションに付属する stdlib アプリケーションのライブラリモジュールとして提供されます。プログラマが書くアプリケーションに固有な部分のコードは別のモジュールに分離され、各ビヘイビアで事前に標準化・定義されるコールバック関数を通じて呼び出されます。このコールバックを定義するモジュールにはユーザーの望む機能を提供するのに必要なコードが全て含まれます。

OTP ビヘイビアにおけるプロセスにはワーカープロセスとスーパーバイザの二つがあります。ワーカープロセスは実際の処理を行い、スーパーバイザはワーカーと他のスーパーバイザの監視を行います。図中の楕円で表されるワーカービヘイビアの例としては、サーバー、イベントハンドラ、有限状態機械などがあります。図中の四角形で表されるスーパーバイザはワーカーおよび他のスーパーバイザからなる自身の子を監視し、全体が監視ツリーを構成します。

OTP Riak の監視ツリー
図 15.1. OTP Riak の監視ツリー

監視ツリーはパッケージ化されてアプリケーションというビヘイビアとなります。OTP におけるアプリケーションは Erlang システムの構成単位であるだけではなく、再利用可能なコンポーネントをパッケージ化する手段でもあります。Riak のような産業グレードのシステムは疎に結合した、場合によっては分散したアプリケーションから構成されます。こういったアプリケーションには標準 Erlang ディストリビューションに含まれるものもあれば、Riak に必要な機能を実現するためのものもあります。

OTP アプリケーションの例としては、Corba ORB や Simple Network Management Protocol (SNMP) エージェントがあります。OTP アプリケーションは再利用可能なコンポーネントであり、ライブラリモジュールをスーパーバイザとワーカープロセスと共にパッケージ化したものです。ここから先では、アプリケーションという言葉で OTP アプリケーションのことを指すことにします。

ビヘイビアモジュールにはそのビヘイビアタイプが使う汎用コードが全て含まれます。ユーザーが自分でビヘイビアモジュールを実装することもできますが、Erlang/OTP ディストリビューションに付属するモジュールが頻繁に使われるデザインパターンのほとんどを実装しているので、実際にそうすることはまれです。ビヘイビアモジュールで提供される汎用的な機能には次のようなものがあります:

ループデータとはビヘイビアが呼び出しの間で保持する必要がある変数のことです。呼び出しのたびに更新されたループデータが返り、この新しいループデータは次の呼び出しの引数に渡されます。ループデータはビヘイビアの状態 (behavior state) と呼ばれることもあります。

汎用サーバーアプリケーションがビヘイビアを利用するために定義するコールバックモジュールに含まれる機能には次のようなものがあります:

汎用サーバー

クライアント/サーバーの動作を実装する汎用サーバーは、標準ライブラリアプリケーションの gen_server ビヘイビアで定義されています。汎用サーバーの説明には riak_core アプリケーションの riak_core_node_watcher.erl モジュールを利用します。これは Riak クラスタ内で稼働中のサブサービスとノードを追跡・報告するサーバーであり、モジュールのヘッダーとディレクティブは次のようになっています:

-module(riak_core_node_watcher).
-behavior(gen_server).
%% API
-export([start_link/0,service_up/2,service_down/1,node_up/0,node_down/0,services/0,
         services/1,nodes/1,avsn/0]).
%% gen_server のコールバック
-export([init/1,handle_call/3,handle_cast/2,handle_info/2,terminate/2, code_change/3]).

-record(state, {status=up, services=[], peers=[], avsn=0, bcast_tref,
                bcast_mod={gen_server, abcast}}).

汎用サーバーが定義されていることは -behavior(gen_server) ディレクティブから容易に分かります。このディレクティブを読んだコンパイラは、全てのコールバックが正しくエクスポートされていることを確認します。定義されているレコードタイプはサーバーのループデータで使います。

サーバーの起動

gen_server ビヘイビアを使った場合には、プロセスの生成に BIF の spawnspawn_link ではなくて gen_server:startgen_server:start_link 関数を使うことになります。spawnstart の主な違いは start が同期的である点です。spawn の代わりに start を使うとプロセスが初期化されるまでワーカーの PID が返らないので、ワーカープロセスの生成が決定的になり、予期するのが難しい競合状態を避けることができます。start は次のいずれかの形で呼び出します:

gen_server:start_link(ServerName, CallbackModule, Arguments, Options)
gen_server:start_link(CallbackModule, Arguments, Options)

ServerName{local, Name} または {global, Name} という形をしたタプルであり、プロセスが登録される場合にはこの値に応じてローカルまたはグローバルな Name が設定されます。グローバルな名前を設定すると、作成されるサーバーは Erlang の分散ノードのクラスターをまたいで透過的にアクセス可能になります。プロセスを登録せずに PID を使って参照する場合には、引数を省略して start_link/3 または start/3 関数を呼び出します。CallbackModule はコールバック関数が属するモジュールを表します。Arguments は合法な Erlang 項であり、この値が init/1 に渡されます。Options にはメモリ管理に関するフラグ fullsweep_afterheapsize およびトレースやデバッグに関するフラグを設定します。

今考えている例では次のようにして start_link/4 を呼び、プロセスをコールバックモジュールと同じ名前で登録します。このときに使われているのは ?MODULE というマクロ呼び出しであり、このマクロはコンパイル時にプリプロセッサによってモジュールの名前に置き換えられます。ユーザーのビヘイビアを実装されているコールバックモジュールと同じ名前にするというのは良いプラクティスです。init/1 に渡す引数は無いので、空のリストを渡します。オプションリストも空です:

start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

start_link 関数と start 関数の違いは明らかだと思いますが、start_link を使った場合にはプロセスをその親 (たいていはスーパーバイザ) にリンクするのに対し、start はしない点です。OTP ビヘイビアをその親とリンクさせるのは OTP ビヘイビア自身の責任なので、この点には注意が必要です。start 関数はビヘイビアの動作をシェルからテストするのによく使われます。こうしておけば、型エラーが起きてシェルプロセスがクラッシュしてもビヘイビアは影響を受けません。start 関数と start_link 関数の仲間は全て {ok, Pid} という形の値を返します。

start 関数と start_link 関数は新しいプロセスを生成し、CallbackModule のコールバック関数 init/1 に引数 Arguments を与えて呼び出します。init 関数はサーバーの LoopData を初期化し、{ok, LoopData} という形のタプルを返さなければなりません。この LoopData がループデータの最初のインスタンスであり、その後コールバック関数に渡されます。init 関数に渡した引数を保存する必要がある場合には、LoopData 変数を使って行います。Riak のノード監視サーバーにおける LoopData は、state 型のレコードのデフォルト値を schedule_broadcast/1 関数に渡した時の返り値です:

init([]) ->

    %% ノードの up/down イベントを監視する。
    net_kernel:monitor_nodes(true),

    %% ノードの状態を追跡する ETS テーブルをセットアップする。
    ets:new(?MODULE, [protected, named_table]),

    {ok, schedule_broadcast(#state{})}.

start_link/4 関数を呼び出すのはスーパーバイザプロセスですが、コールバック init/1 を呼び出すのはちょうど新しく生成されたばかりの別プロセスです。このサーバーの役割は Riak のサブサービスの可用性を監視、記録、ブロードキャストすることなので、初期化処理ではそのようなイベントを通知するよう Erlang ランタイムの設定を変更し、この情報を保存するためのテーブルをセットアップします。このテーブルが存在していないとサーバーへの全ての通信が失敗することになるので、このセットアップは初期化中に行う必要があります。また init 関数では必要なことだけを行い、処理は最小化しなければなりません。init は同期的に呼び出されるために、この関数が値を返すまで他のどんな逐次的なプロセスも開始できないからです。

メッセージのやり取り

サーバーに同期メッセージを送信するときは gen_server:call/2 関数を使い、非同期メッセージには gen_server:cast/2 を使います。まずは Riak のサービス API から次の二つの関数に注目します (別の関数にも後で触れます)。この二つの関数はクライアントプロセスによって呼ばれ、コールバックモジュールと同じ名前で登録されたサーバープロセスに対する同期メッセージを送信します。サーバーに送るデータの検証はクライアントサイドで行うべきであることに注目してください。クライアントが不正な情報を送った場合、サーバーはその時点で終了します。

service_up(Id, Pid) ->
    gen_server:call(?MODULE, {service_up, Id, Pid}).

service_down(Id) ->
    gen_server:call(?MODULE, {service_down, Id}).

この関数によって送信されたメッセージを受け取った gen_server プロセスは、コールバック関数 handle_call/3 を呼び出してメッセージを送られた順に処理します:

handle_call({service_up, Id, Pid}, _From, State) ->
    %% アクティブサービスの集合をローカルで更新する
    Services = ordsets:add_element(Id, State#state.services),
    S2 = State#state { services = Services },

    %% このサービスに対する mref を削除する。
    delete_service_mref(Id),

    %% このサービスの Pid に対するモニターをセットアップする。
    Mref = erlang:monitor(process, Pid),
    erlang:put(Mref, Id),
    erlang:put(Id, Mref),

    %% ローカルの ETS テーブルを更新し、ブロードキャストする。
    S3 = local_update(S2),
    {reply, ok, update_avsn(S3)};

handle_call({service_down, Id}, _From, State) ->
    %% アクティブサービスの集合をローカルで更新する
    Services = ordsets:del_element(Id, State#state.services),
    S2 = State#state { services = Services },

    %% このサービスに対する mref を削除する。
    delete_service_mref(Id),

    %% ローカルの ETS テーブルを更新し、ブロードキャストする。
    S3 = local_update(S2),
    {reply, ok, update_avsn(S3)};

二つのコールバック関数の返り値に注目してください。制御用のタプル replyget_server の汎用コードに、タプルの第二要素 (両方の場合において ok) がクライアントへの返信であることを伝えます。タプルの第三要素は新しい State であり、サーバーの次の反復で handle_call/3 関数に渡されます。両方の場合において、新しい状態は更新された稼働中のサービスの集合です。引数の _From はユニークなメッセージの参照とクライアントのプロセス ID を含んだタプルです。このタプルはこの章で触れない一部のライブラリ関数で利用されますが、ほとんどの場合で必要になることはありません。

ライブラリモジュール get_server には裏側で動く組み込みの保護機構が多くあります。例えばクライアントが同期メッセージをサーバーに送り、レスポンスが 5 秒以内に返ってこない場合には、call/2 関数を実行しているプロセスは終了します。この長さは gen_server:call(Name, Message, Timeout) を使ってオーバーライドできます。ここで Timeout は数値 (ミリ秒) またはアトム infinity です。

このタイムアウトメカニズムはもともとデッドロックを避けるために追加されました。サーバー同士が誤って互いを呼び出しあったとしてもデフォルトのタイムアウト時間が経てば終了するようにするためです。クラッシュレポートはログに書き込まれ、エラーはデバッグと修正を受けることになります。多くのアプリケーションは 5 秒のタイムアウトで上手く行きますが、極端に高い負荷があるときにはタイムアウトの値を大きくしたり、場合によっては infinity を使った方が良いこともあります。この値はアプリケーションごとに異なります。例えば Erlang/OTP のクリティカルなコードは全て infinity を使っていますが、Riak は場所によってタイムアウトの値が異なり、密に結合した内部コードでは infinity が多く、Riak と対話するクライアントがタイムアウトを許可する設定になっている場合にはユーザーから渡されたパラメータが使われます。

gen_server:call/2 関数の保護機構は存在しないサーバーに対してメッセージを送った場合やサーバーが返答を返さずにクラッシュした場合にも働き、両方の場合において呼び出したプロセスが終了します。生の Erlang においては receive 節でパターンマッチされないメッセージを送るのはバグであり、メモリリークが発生します。Riak にはこれを和らげるための方策が二つあり、両方とも「全てとマッチする」マッチング節を利用します。一つはユーザーが生成した可能性のあるメッセージを受け取るケースで、このときはメッセージを黙って捨てます。もう一つは Riak の内部からのメッセージを受け取るケースで、これはバグの存在を示しているので、エラー報告用の内部クラッシュレポートを生成してからメッセージを受け取ったワーカープロセスを再起動します。

非同期メッセージの送信も同じように動作します。汎用サーバーに送られた非同期メッセージはコールバック関数 handle_cast/2 で処理され、この関数は {reply, NewState} という形のタプルを返します。サーバーからのリクエストを使わない場合、およびサーバーが処理できる以上のメッセージが生成されても構わない場合には非同期メッセージが使われます。返答を利用をしない場合でも次のリクエストを送る前に現在のリクエストの終了を確認したいときには同期的な gen_server:call/2 関数を使い、返り値の ok を確認します。例えば Riak の処理が追い付かないほどの速度でデータベースのエントリを生成するケースを想像してください。このような状況では、非同期通信を使うとプロセスの受信箱が満杯になってノードがメモリを使い切ってしまう危険があります。Riak は gen_server の同期通信が持つメッセージの逐次化という機能を使って負荷を調整しており、前のリクエストを処理し終わってから次のリクエストに取り掛かるようになっています。こうすることで速度を調整するための複雑なコードも不要になります。つまり gen_server は並列に動作するだけではなく、逐次化点 (serialization point) を配置することも可能なのです。

サーバーの停止

どうやってサーバーを停止するのでしょうか? コールバック関数 handle_call/3handle_cast/2 において {reply, Reply, NewState} または {noreply, NewState} を返す代わりに、{stop, Reason, Reply, NewState} または {stop, Reason, NewState} を返すことで行います。この返り値はサーバーに対する停止メッセージなどによって生成されます。汎用サーバーは ReasonState を含んだ stop タプルを受け取ると、コールバック関数 terminate(Reason, State) を実行します。

この terminate 関数はサーバーの State やシステムの永続データのクリーンアップ用コードを挿入する自然な位置です。次の例では、サーバーノードがもはや生きておらず接続を監視していないことを最後のメッセージとして他のピアに伝えます。Statestatuspeers というフィールドを持つレコードがあると仮定しています:

terminate(_Reason, State) ->
    %% 私たちがシャットダウンすることを他のピアに伝える。
    broadcast(State#state.peers, State#state { status = down }).

ビヘイビアのコールバックをライブラリ関数として使ってユーザーのプログラムのどこかから呼び出すのは非常に悪いプラクティスです。例えばループデータの初期値を取得するために riak_core_node_watcher:init(Args) を他のモジュールから使っては絶対にいけません。そのような値の取得はサーバーへの同期通信を通じて行われるべきです。ビヘイビアのコールバック関数を呼んでいいのはシステム内のイベントに対して反応したライブラリモジュールだけであり、ユーザーは呼んではいけません。

その他のワーカービヘイビア

同じ考え方を使えばたくさんのワーカービヘイビアが実装可能であり、これまでにいくつも実装されています。

有限状態機械

有限状態機械 (finite state machine, FSM) はビヘイビアモジュール get_fsm で実装されており、データ通信システムのプロトコルスタックを実装する上で重要な役割を担います (もともと Erlang はこの分野の問題のために開発されました)。状態はコールバック関数として定義され、この関数は次の State および更新されたループデータを含んだタプルを返します。有限状態機械のコールバックモジュールは他にも init, terminate, handle_info といったコールバック関数をエクスポートします。

有限状態機械はデータ通信システムだけで使われるわけではもちろんありません。例えば Riak ではリクエスト処理に使われています。クライアントが get, put, delete のようなリクエストを発信すると、そのクライアントの通信するプロセスは対応する gen_fsm ビヘイビアを実行するプロセスを生成します。例えば get リクエストを処理するのは riak_kv_get_fsm であり、データの取得とクライアントプロセスへの送信を行います。FSM プロセスはどのノードからデータを取得すべきかを決定するまでに様々な状態を通過し、その間にデータを尋ね、メッセージを送り、データ、エラー、タイムアウトを受信します。

イベントハンドラ

イベントハンドラとイベントマネージャは gen_event ライブラリモジュールによって実装されるビヘイビアです。ここでのアイデアは、ある種のイベントを受信する中央中継所を作ると便利だろうというものです。イベントは同期的または非同期的に送信され、受信されたイベントに対して事前に定義された処理が実行されます。イベントに対する処理の例としてはファイルにログを記録する、SMS でアラームを送信する、統計情報を収集するなどがあります。こういった動作は別々のコールバックモジュールに独立したループデータと共に定義されます (ループデータは呼び出しの間で保存されます)。ハンドラはイベントマネージャごとに追加、削除、更新することが可能であり、実際に動作するときにはイベントマネージャごとに多数のコールバックモジュールが存在することになります。イベントハンドラプロセスが受け取るデータの例としてはアラーム、ライブトレースデータ、設備に関するイベント、簡単なログなどがあります。

Riak で gen_event を使っているものの一つが、Riak クラスターのメンバーシップや分割の割り当て (「リングイベント」) の管理です。Riak ノード内のプロセスは gen_event ビヘイビアを実装する riak_core_ring_events のインスタンスで関数を登録します。そのノードのリングを管理する中央プロセスがクラスター全体のメンバーシップレコードを変更するとイベントが発火し、全てのコールバックモジュールで登録された関数が呼び出されます。こうすると Riak の様々な部分が Riak の最も中心的なデータ構造の変更に対応できるようになり、そのときデータ構造を管理する中央プロセスが複雑になることもありません。

並列パターンと通信パターンの多くはこれまでに説明した gen_server, gen_fsm, gen_event という三つの重要なビヘイビアで扱えます。しかし大規模なシステムでは、開発が進むにつれアプリケーション固有のパターンが生じ、新しいビヘイビアを作成するのに十分な理由となることがあります。Riak にもそのようなビヘイビアが一つあり、仮想ノードの実装方法を定める riak_core_vnode がそれです。仮想ノードとは Riak において重要なストレージの抽象化であり、リクエストを生成する FSM とキーバリューストレージが対話するときの一貫したインターフェースを定義します。コールバックモジュールに対するインターフェースは behavior_info/1 関数として次のように指定されます:

behavior_info(callbacks) ->
    [{init,1},
     {handle_command,3},
     {handoff_starting,2},
     {handoff_cancelled,1},
     {handoff_finished,2},
     {handle_handoff_command,3},
     {handle_handoff_data,2},
     {encode_handoff_item,2},
     {is_empty,1},
     {terminate,2},
     {delete,1}];

これは riak_core_vnode に含まれる behavior_info/1 関数であり、{CallbackFunction, Arity} という形のリストはコールバック関数が従うべき制約を定義しています。仮想ノードの実装はこの形の関数をエクスポートしなければならず、忘れた場合にはコンパイラが警告を出します。OTP ビヘイビアを自分で実装するのは比較的単純です。proc_libsys モジュールを使ってコールバック関数を定義したら、システムメッセージを処理して終了した場合に備えて親を監視する特定の関数を呼んで開始します。

スーパーバイザ

スーパーバイザというビヘイビアの仕事は子を監視すること、そして子が終了したときには事前に設定された規則に基づいて処理を行うことです。子はスーパーバイザとワーカープロセスの両方からなります。このため Riak は成功するケースにだけ集中し、ソフトウェアのバグや破損データおよびシステムエラーをスーパーバイザに任せ、一貫した対処を行うことができます。Erlang の世界ではこの非防御的なプログラミング手法のことをよく「クラッシュさせる (let it crash)」戦略と呼びます。監視ツリーを構成する子にはスーパーバイザーとワーカープロセスの両方を含むことができます。ワーカープロセスとは gen_fsm, gen_server, gen_event といった OTP ビヘイビアのことです。Riak チームは細かいエラーケースを考えずに済むので小さいコードベースで作業でき、さらにビヘイビアのおかげで固有なコードだけを書けば済むことから、開発初期のコードベースも小さくなります。多くの Erlang アプリケーションと同じように Riak にもトップレベルのスーパーバイザがあり、関連する機能のプロセスのグループに対するサブスーパーバイザもいくつかあります。例えば Riak の仮想ノード、TCP ソケットリスナー、クエリレスポンスマネージャなどのグループです。

スーパーバイザのコールバック関数

スーパーバイザビヘイビアがどのように実装されているかの説明には riak_core_sup.erl モジュールを使います。Riak のコアスーパーバイザとは Riak のコアアプリケーションのトップレベルのスーパーバイザのことです。これはワーカーとスーパーバイザの静的な集合を管理するのに加えて、アプリケーション固有の設定ファイルで定義される RESTful API の HTTP と HTTPS バインディングを処理する可変個数のワーカーも管理します。gen_servers と同じようにスーパーバイザのコールバックモジュールは -behavior(supervisor) ディレクティブを含む必要があります。コールバックモジュールは start 関数または start_link 関数を使って開始され、そのときの引数には省略可能な ServerName, CallBackModule, コールバック関数 init/1 に渡される Argument があります。

次に示す riak_core_sup.erl からの抜粋には behavior ディレクティブと後述するマクロ、そして start_link/3 関数が含まれます。

-module(riak_core_sup).
-behavior(supervisor).
%% API
-export([start_link/0]).
%% スーパーバイザコールバック
-export([init/1]).
-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}).
start_link() ->
    supervisor:start_link({local, ?MODULE}, ?MODULE, []).

スーパーバイザを起動すると新しいプロセスが生成され、コールバックモジュール riak_core_sup.erl にある init/1 関数が呼ばれます。ServerName{local, Name} または {global, Name} という形をしたタプルであり、Name がスーパーバイザの登録名を表します。今の例では登録名とコールバックモジュールは両方とも ?MODULE マクロから展開された riak_core_sup です。そして init/1 関数にはヌル値として空のリストを渡します。init 関数はスーパーバイザが持つ唯一のコールバック関数であり、次の形をしたタプルを返します:

{ok,  {SupervisorSpecification, ChildSpecificationList}}

ここで SupervisorSpecification は 3-タプル {RestartStrategy, AllowedRestarts, MaxSeconds} であり、プロセスのクラッシュと再起動の扱いについての情報が含まれます。RestartStrategy は次に示す三つのパラメータのどれかであり、異常終了がビヘイビアの兄弟にどのように影響するかを定めます:

AllowedRestartsMaxSeconds は、MaxSeconds の間に AllowedRestarts 個以上の子が終了したらスーパーバイザ自身 (および全ての子) も終了するということを表します。終了するスーパーバイザからは終了シグナルが親に送られ、再起動戦略に応じて処理されます。一定の数よりも多い再起動が起こったスーパーバイザを終了させることで、再起動シグナルが循環するといったこのレベルでは解決できない問題がさらに悪化するのを防いでいます。そのような場合は他の部分木に属するプロセスが問題であることが多く、多数のプロセスが終了したのを確認したスーパーバイザは影響を受けた部分木を終了してから再起動することになります。

riak_core_sup.erl モジュールにあるコールバック関数 init/1 の最後の行を見ると、このスーパーバイザは one-for-one 戦略を使っている、つまり子プロセス同士が関わりを持たないこと、そして 10 個の子プロセスが再起動すると自身を再起動させるようになっていることが分かります。

ChildSpecificationList にはスーパーバイザが開始・監視する子を終了および再起動のための情報と共に指定します。次の形をしたタプルのリストです:

{Id, {Module, Function, Arguments}, Restart, Shutdown, Type, ModuleList}

Id は作成しているスーパーバイザの識別子です。Module, Function, Arguments はエクスポートされた関数であり、ここでは {ok, Pid} の形のタプルを返すビヘイビアの start_link 関数となっています。Restart はプロセスの終了時に何を行うかを定め、次の値のどれかを取ります:

Shutdown にはビヘイビアの再起動時およびシャットダウン時にプロセスを終了するときに実行する terminate 関数に許される実行時間 (ミリ秒) を指定します。アトム infinity を使うこともできますが、スーパーバイザ以外のビヘイビアでは使わない方が良いとされています。Type は汎用サーバーやイベントハンドラ、有限状態機械を表すアトム worker か、そうでなければアトム supervisor です。ビヘイビアを実装するモジュールのリスト ModuleListType は共に実行時ソフトウェアアップグレード手続きにおけるプロセスの制御と停止に使われます。ChildSpecificationList に含まれ監視ツリーの一部となれるのは、既存のビヘイビアまたはユーザーが実装したビヘイビアだけです。

以上の知識があれば、プロセス間の依存関係や障害耐性のしきい値、障害が拡大したときの処理などを決める再起動戦略を共通のアーキテクチャに基づいて定式化できます。また次に示す riak_core_sup.erl モジュールの init/1 も理解できるはずです。最初にあるのは CHILD マクロで、これはコールバックモジュールを受け取って子プロセス用の ChildSpecification を作成します。そのとき子プロセスは permanent であり、シャットダウン時間は 5 秒となるよう設定します。次のコードを見て何が起きているか分かるかどうか確認してみてください:

-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}).

init([]) ->
    RiakWebs = case lists:flatten(riak_core_web:bindings(http),
                                  riak_core_web:bindings(https)) of
                   [] ->
                       %% app.config が更新されていない場合は
                       %% 古い設定をチェックする
                       riak_core_web:old_binding();
                   Binding ->
                       Binding
               end,

    Children =
                 [?CHILD(riak_core_vnode_sup, supervisor),
                  ?CHILD(riak_core_handoff_manager, worker),
                  ?CHILD(riak_core_handoff_listener, worker),
                  ?CHILD(riak_core_ring_events, worker),
                  ?CHILD(riak_core_ring_manager, worker),
                  ?CHILD(riak_core_node_watcher_events, worker),
                  ?CHILD(riak_core_node_watcher, worker),
                  ?CHILD(riak_core_gossip, worker) |
                  RiakWebs
                 ],
    {ok, {{one_for_one, 10, 10}, Children}}.

このスーパーバイザによって作られる Children のほとんどは静的に定義されるワーカー (vnode_sup ではスーパーバイザ) です。例外は RiakWebs で、これは Riak の設定ファイルの HTTP に関する部分から動的に決定されます。

ライブラリアプリケーションを除く全ての OTP アプリケーションは独自の監視ツリーを持ちます。Riak の監視ツリーでは様々なアプリケーションが一番上の Erlang ノードの下で実行されます。例えば分散システムアルゴリズムのための riak_core、キーバリューストレージのための riak_kv、HTTP のための webmachine などです。ここまでは riak_core の下にあるツリーを例として多層構造の監視システムがどうようになっているかを見てきました。この構造の利点の一つが、サブシステムが (バグ、環境的な問題、意図的な動作の結果として) クラッシュ可能であり、そのときはまずその部分木だけが終了されることです。

スーパーバイザは必要ならばプロセスを再起動しますが、そのときシステム全体は影響を受けません。この仕組みは Riak において現実の環境で上手く働いています。もしかするとユーザーは仮想ノードをクラッシュさせる方法を見つけるかもしれません。しかし仮想ノードがクラッシュしたとしても、そのプロセスはスーパーバイザ riak_vnode_sup によって再起動されるだけです。もし riak_vnode_sup もクラッシュさせることができたとしても、終了シグナルを受け取った上のレベルのスーパーバイザ riak_coreriak_vnode_sup 再起動させます。障害を切り離して復帰させるこの仕組みによって、 Riak (および Erlang) 開発者は障害耐性の高いシステムを簡単に構築できるようになっています。

Erlang の監視モデルの有用性は、データベースシステムが攻撃を受けやすい環境においてどのようにダウンするかを大規模な産業ユーザーが検証したときに示されました。この環境では巨大なデータや多数の障害がランダムにシステムに与えられたのですが、Riak が最悪の条件下でも停止しないことに研究者は困惑したといいます。彼らは個々のプロセスやサブシステムを様々な方法でクラッシュさせましたが、そのたびにスーパーバイザがクリーンアップと再起動を行うので、システムは正常状態に復帰できたのです。

アプリケーション

前に紹介した application ビヘイビアは Erlang のモジュールとリソースを再利用可能なコンポーネントにパッケージ化するために使います。OTP には二種類のアプリケーションがあります。一番よく使われるのはノーマルアプリケーションで、一つのスーパーバイザと関連する静的なワーカーをいくつか開始します。もう一つは Erlang ディストリビューションに付属する標準ライブラリのようなライブラリアプリケーションで、ライブラリモジュールは含まれますが監視ツリーを開始しません。これはプロセスや監視ツリーが全く含まれないというわけではなく、他のアプリケーションの一部として開始されるということです。

Erlang システムは疎に結合したいくつかのアプリケーションの集合として構成されます。アプリケーションには開発者によって書かれるものもあれば、オープンソースで利用可能なもの、Erlang/OTP ディストリビューションに付属するものもあります。Erlang のランタイムシステムとツールは全てのアプリケーションを同じように扱い、Erlang ディストリビューションに含まれるどうかで処理を変えることはありません。

Riak における複製と通信

Riak は大規模なシステムにおいて非常に高い信頼性と可用性を達成できるように設計されており、Amazon のストレージシステム Dynamo [DHJ+07] の影響を受けています。Dynamo と Riak のアーキテクチャは分散ハッシュテーブル (Distributed Hash Table, DHS) と従来のデータベースを組み合わせており、鍵となるのはレプリカ配置のためのコンシステントハッシュ (consistent hashing) と、共通状態を共有するためのゴシッププロトコル (gossip protocol) です。

コンシステントハッシュではシステム内の全てのノードがお互いを感知し、どのノードがどの区間を保持するかを知っておく必要があります。このデータ割り当てを中央で管理される設定ファイルで保持することもできますが、大規模なシステム構成においてはこれは非常に難しくなります。中央設定サーバーを用意するという方法もありますが、こうするとシステムに単一障害点が生まれてしまいます。Riak ではその代わりゴシッププロトコルを使ってシステム全体のクラスターの構成要素と区間の保有者を管理します。

ゴシッププロトコルは伝染プロトコル (epidemic protocol) とも呼ばれ、その名の通りの動作をします。システム内のノードが共有データを変更する場合には、データのローカルコピーに変更をしてから更新されたデータをランダムなピアに耳打ち (gossip) します。更新を受け取ったノードはそれをローカルのデータにマージし、そのデータをさらにランダムなピアに耳打ちします。

Riak クラスターが開始されたときには、全てのノードが同じ区間カウントを持ちます。コンシステントハッシュリングは区間カウントで分割され、それぞれの区間が {HashRange, Owner} という組になってローカルに保存されます。最初のノードは全ての区間を受け持ち、クラスターに加わる新しいノードは既存のノードから {HashRange, Owner} の組を入手し、(区間カウント)/(ノード数) という個数の区間を受け持ちます。新しいノードはその後ローカルの状態を更新し、それを他のピアに耳打ちします。この更新された情報は上述のアルゴリズムによってクラスター中に広がります。

ゴシッププロトコルを使うことで、Riak は中央構成サーバーによる単一障害点を回避し、システム管理者をクリティカルな構成データの管理から解放します。ゴシップによって広まった区間割り当てデータを使えば、システム内の任意のノードがリクエストをルーティングできます。ゴシッププロトコルとコンシステントハッシュを組み合わせることで、Riak は完全に分散されたシステムとして機能します。これは大規模なシステムをデプロイ・運用する上で重要な意味を持ちます。

結論と教訓

小さくて単純なコードベースは管理しやすいだけではなくバグも少なくなる、というのは大勢のプログラマーによって信じられています。Erlang が提供する基礎的な通信プリミティブを利用することで、Riak は本質的に健全な非同期メッセージングレイヤーと共に開発を始めることができ、その実装を気にすることなく独自のプロトコルを作成できました。Riak が成熟したシステムに成長するにつれ、ネットワーク通信の一部は Erlang 組み込みの方式 (そして TCP ソケットの直接的な生成) ではなくなりましたが、組み込みのプリミティブで十分だった箇所もあります。最初に Erlang ネイティブのメッセージパッシングだけを使うことで、Riak チームはシステム全体を非常に素早く完成させることができました。プリミティブは簡単で明快であり、本番環境で使えないことが後に判明したとしても置き換えるのは簡単です。

加えて、Erlang のメッセージングの特性と Erlang VM の軽量なコアのおかげで、ユーザーは 1 つのマシンで 12 個のノードを実行することも 12 個のマシンで 12 個 のノードを実行することも容易です。このため開発とテストは他の複雑なメッセージングやクラスタリングメカニズムと比べてはるかに簡単になります。Riak は本質的に分散されるので、この特徴は重要です。歴史的に分散システムを開発者のラップトップを使って“開発モード”で運用するのはとても難しいことが多く、テストは完全なシステムの一部を使って、本番とは大きく異なる環境で行うしかありませんでした。Riak では多数のノードを持つクラスターであっても一つのラップトップ上でテスト可能であり、リソースの大きな消費も無く複雑な設定も必要ありません。そのため開発プロセスはデプロイ可能なコードを簡単に作成できます。

Riak は Erlang/OTP スーパーバイザの利用によって構成要素のクラッシュに対する高い耐性を手に入れているだけではなく、さらに先を行っています: ビヘイビアの利用によって、Riak クラスタは全てのノードがクラッシュしたりシステムから消失したとしても難なく動作を続けることが可能です。これによりときには驚くべき障害耐性が発揮されることがあります。一つ例をあげましょう。とある大企業が様々なデータベースのストレステストを行うために意図的にノードをクラッシュさせて何が起こるかを見ていたときのことです。研究者は Riak を試したときに困惑してしまいました。Riak のサブシステムをクラッシュさせる方法 (OS レベルの改ざん、不正な IPC など) を見つけても、パフォーマンスが一瞬落ちるだけでシステムはすぐに正常な状態に戻ってしまうからです。これは「クラッシュさせる」というアプローチの分かりやすい帰結です。Riak はクラッシュしたサブシステムを必要に応じて綺麗に再起動するので、システム全体は動作を続けることができます。この例は Erlang/OTP を使ったプログラム作成アプローチによって可能となる障害耐性をまさに示しています。

謝辞

この章は Francesco Cesarini と Simon Thompson が 2009 年に Budapest と Komárno で開かれた central European Functional Programming School で行った講義に基づいています。主な部分はイギリスの Canterbury にある University of Kent の Simon Thompson によって作成されました。この章の執筆中に様々なタイミングで価値あるフィードバックを提供してくれたレビュアーたちにも感謝します。



Amazon.co.jp アソシエイト (広告)
Audible の無料体験を始めよう
amazon music unlimited で音楽聞き放題