非同期プログラミング

プログラムが外界とやり取りをするとき、例えばインターネット越しに他のマシンと通信をするとき、プログラムの処理を前もって予測できない順序で実行しなければならないことがあります。プログラムの途中でファイルをダウンロードするとしましょう。このときはダウンロード操作を開始し、完了するのを待ちながら他の処理を行い、ファイルが利用可能になったらそれを使う処理を行いたいはずです。この種のシナリオをカバーするのが非同期 (asynchronous) プログラミングです。非同期プログラミングは並列 (concurrent) プログラミングとも呼ばれます (概念上は複数の処理が一度に起こるからです)。

こういったシナリオに対応するために、Julia はタスクを提供します (タスクは対称的コルーチン (symmetric coroutines)・軽量スレッド (light thread)・協調的マルチタスク (cooperative multitasking)・ワンショット継続 (one-shot continuations) といった名前で呼ばれることもあります)。計算処理の一部分 (実際のコードでは関数) を Task として切り出すと、その部分の実行を途中で中断して他のタスクに切り替えられるようになります。中断した Task は後から再開でき、それをさらに中断することもできます。

一見するとタスクは通常の関数呼び出しに似ていますが、重要な違いが二つあります。まず、タスクの切り替えは空間を一切使用しないので、タスクをいくら切り替えてもコールスタックは消費されません。次に、呼び出し側に制御を戻すには実行を終えるしかない通常の関数呼び出しとは対称的に、タスクの切り替えは任意のタイミングで行えます。

基本的なタスク演算

Task はこれから実行される計算処理の一単位を指すハンドルと考えることができ、作成-開始-実行-終了というライフサイクルを持ちます。タスクは Task コンストラクタに実行するゼロ引数の関数を渡すか、@task マクロを使うことで作成します:

julia> t = @task begin; sleep(5); println("done"); end
Task (runnable) @0x00007f13a40c0eb0

@task xTask(()->x) と等価です。

このタスクは五秒間何もしないで待ってから done を出力します。ただし、上のように書いただけでは実行は始まりません。schedule を呼び出すとタスクを実行できます:

julia> schedule(t);

これを REPL で試すと、schedule がすぐに返るのが分かるでしょう。schedule(t) は実行するタスクを収めた内部キューに t を追加するだけであるためです。その後 REPL は次のプロンプトを出してキーボード入力を待つので、t を実行している間に別のタスクを開始することもできます。t はタイマーをセットして実行を停止する sleep を呼び出すので五秒後にタイマーが発火して t が再開され、そのとき REPL に done が出力されます。t の実行はこの直後に終了します。

wait 関数を呼び出すと、呼び出し側のタスクを他のタスクが終了するまでブロックできます。そのため schedule(t) とする代わりに

julia> schedule(t); wait(t)

とすると、五秒経過してから次の入力プロンプトが表示されます。これは REPL が t の終了を待ってから次に進んでいるためです。

タスクの作成とスケジュールよく同時に行われるので、@async マクロが提供されます。@async xschedule(@task x) と等価です。

チャンネルを使った通信

一部の問題では行うべきジョブの間で「呼び出し元」と「呼び出し先」が明らかでないために、関数の呼び出しでは必要になる処理を自然に定式化できません。こういった問題の例として生産者-消費者問題があります: ある一つの複雑な手続きが値を生産し、別の複雑な手続きがそれを消費するという状況では、生産される値の個数と消費される値の個数が一致しない可能性があるので、単純に消費者が生産者を呼び出すだけでは上手く処理が進みません。このような場合にタスクを使えば、生産者と消費者の双方が好きなように自身の手続きを実行し、必要に応じて値をやり取りすることが可能です。

この問題を解決するために、Julia は Channel という機構を提供します。Channel はブロック可能な先入れ先出しのキューであり、複数のタスクからの書き込みおよび読み出しに対応します。

生産者を表すタスクを定義してみましょう。次の生産者タスクは put! を使って値を生産します (実際のコードでは消費者タスクが別に必要です)。一引数関数を引数に受け取る特別な Channel コンストラクタを使うと、チャンネルの作成とそのチャンネルを利用するタスクの作成・実行を同時に行えます。その後 take! を使えば、チャンネルオブジェクトから何度も値を取り出せます:

julia> function producer(c::Channel)
           put!(c, "start")
           for n=1:4
               put!(c, 2n)
           end
           put!(c, "stop")
       end;

julia> chnl = Channel(producer);

julia> take!(chnl)
"start"

julia> take!(chnl)
2

julia> take!(chnl)
4

julia> take!(chnl)
6

julia> take!(chnl)
8

julia> take!(chnl)
"stop"

この振る舞いを理解する一つの方法として、producer が複数の値を返しているのだと考えてみてください。生産者が put! を実行して “値を返す” と生産者の実行が停止され、消費者が take! を実行して “値を受け取る” と生産者の実行が再開されます。

コンストラクタが返す Channel は反復可能オブジェクトとして for ループで利用できます。生産された値がループ変数となり、チャンネルが閉じられるとループは終了します:

julia> for x in Channel(producer)
           println(x)
       end
start
2
4
6
8
stop

生産者の側で明示的にチャンネルを閉じていないことに注目してください。これは ChannelTask に (コンストラクタで) バインドすると、そのチャンネルの寿命がタスクの寿命と同期されるためです。チャンネルオブジェクトはバインドされたタスクが終了すると自動的に閉じられます。複数のチャンネルを一つのタスクにバインドでき、逆も可能です。

Task コンストラクタはゼロ引数の関数を受け取るのに対して、タスクにバインドされたチャンネルを作成する Channel メソッドは「Channel 型の引数を受け取る一引数関数」を受け取ります。生産者をパラメータにするのがよくあるパターンで、この場合は引数がゼロ個または一つの無名関数を作成するために関数の部分適用が必要です。

例えば Task オブジェクトでこのパターンを使うときは無名関数を直接作るか、利便性のためのマクロ @task を使います:

function mytask(myarg)
    ...
end

taskHdl = Task(() -> mytask(7))
# 次のマクロ呼び出しと等価
taskHdl = @task mytask(7)

より高度な分散パターンを実装するには、TaskChannel のコンストラクタだけではなく bindschedule を使ってチャンネルと生産者/消費者のタスクを明示的に関連付けることになります。

さらにチャンネルについて

チャンネルは書き込み口と読み込み口を持つパイプと考えることができます。チャンネルの特徴を示します:

タスク間の通信にチャンネルを使う例を示す簡単なシミュレーションを考えます。四つのタスクが一つのチャンネル jobs からのデータを処理しており、ジョブを識別する job_id がチャンネルに書き込まれます。このシミュレーションの各タスクは一つの job_id を読み、ランダムな時間だけ待ち、job_id とタスクにかかった時間を含んだタプルを結果収集用チャンネル results に書き出します。最後に書き込まれたデータを results に出力しています:

julia> const jobs = Channel{Int}(32);

julia> const results = Channel{Tuple}(32);

julia> function do_work()
           for job_id in jobs
               exec_time = rand()
               # 現実的な処理を行う時間をシミュレートする。
               # 通常は外部の関数が使われる。
               sleep(exec_time)
               put!(results, (job_id, exec_time))
           end
       end;

julia> function make_jobs(n)
           for i in 1:n
               put!(jobs, i)
           end
       end;

julia> n = 12;

julia> @async make_jobs(n); # n 個のジョブを jobs チャンネルに供給する。

julia> for i in 1:4 # リクエストを並行に処理する 4 つのタスクを開始する
           @async do_work()
       end

julia> @elapsed while n > 0 # 結果を出力する
           job_id, exec_time = take!(results)
           println("$job_id finished in $(round(exec_time; digits=2)) seconds")
           global n = n - 1
       end
4 finished in 0.22 seconds
3 finished in 0.45 seconds
1 finished in 0.5 seconds
7 finished in 0.14 seconds
2 finished in 0.78 seconds
5 finished in 0.9 seconds
9 finished in 0.36 seconds
6 finished in 0.87 seconds
8 finished in 0.79 seconds
10 finished in 0.64 seconds
12 finished in 0.5 seconds
11 finished in 0.97 seconds
0.029772311

さらにタスクの操作について

タスクの操作は低水準プリミティブ yieldto を使って実装されています。yieldto(task, value) を呼び出すと現在のタスクが停止し、実行が指定された task へ切り替わり、切り替え先のタスクが切り替わる直前に呼び出した yieldto が指定された value を返した状態で実行が再開されます。タスクを使った制御構造に必要な唯一の操作が yieldto であることに注目してください。タスクに呼び出しや値の返却という概念は存在せず、タスクは必ず別のタスクへ切り替わります。これはタスクの機能が「対称的コルーチン (symmetric coroutines)」と呼ばれることがある理由でもあります: どんなタスクも同じ機構を使って切り替えが行われるのです。

yieldto は強力ですが、ほとんどのタスクはこの関数を直接呼び出しません。この理由を考えてみてください。もし現在のタスクを別のタスクに切り替えるなら、いずれ実行は現在のタスクに戻るはずです。しかし実行をいつ戻すのか、そして戻す責任を負うタスクがどれかを決めるには、相当の調整が必要です。例えば put!take! はどちらもブロックする可能性のある操作であり、チャンネルで使うときは消費者が誰であるかを覚えておかなくてはなりません。put! を使えば消費者のタスクを覚えておく必要がないというのが、低水準の yieldto よりも put! の方が簡単に使える理由です。

タスクを効率的に使うには、yieldto 以外にもいくつか基礎的な関数が必要です:

タスクとイベント

タスクの切り替えの多くは IO リクエストのようなイベントに対する待機の結果として起こります。そういったタスクの切り替えは Julia Base に含まれるスケジューラによって実行されます。スケジューラは実行可能なタスクのキューを管理しており、メッセージの到着といった外部イベントが発生すると、タスクを再起動するイベントループを実行します。

イベントに対する待機を行う基礎的な関数は wait です。wait を実装するオブジェクトはいくつかあります。例えば Process オブジェクトに対する wait は終了を待ちます。wait が間接的に呼ばれることもよくあります。例えばデータが利用可能になるまで待機する read を呼び出せば、その中で wait が呼ばれる可能性があります。

こういったケースの全てで wait が最終的に操作しているのが Condition オブジェクトであり、このオブジェクトがタスクのキューと再起動を制御します。タスクが Condition に対して wait を呼び出すと、そのタスクには実行できないことを示す印が付けられ、その Condition が持つキューにそのタスクが入れられ、実行がスケジューラに切り替わります。その後スケジューラは他のタスクを選んで実行するか、外部イベントが起こるまでブロックして待機します。もし全てが上手く行けば、いずれ何らかのイベントハンドラが Condition に対して notify を呼び出し、待機中のタスクを実行可能にします。

Task を呼び出して明示的に作成されたタスクは最初スケジューラによって感知されません。このため、もし望むなら yieldto を使って自分の手でタスクを管理することもできます。ただしそういったタスクがイベントを wait したときは、そのタスクは (通常通り) 自動的に再開されます。


日本語 Julia 書籍 (Amazon アソシエイト)
1 から始める Julia プログラミング
Julia プログラミングクックブック―言語仕様からデータ分析、機械学習、数値計算まで
スタンフォード ベクトル・行列からはじめる最適化数学