マルチプロセッシングと分散計算

分散メモリ並列計算は Distributed モジュールとして実装され、Julia の標準ライブラリに付属します。

多くの現代的なコンピューターは複数の CPU を使って処理を行い、複数のコンピューターが組み合わさってクラスターとなることもあります。複数の CPU のパワーを上手く活用すれば、多くの計算を素早く終わらわせることができます。計算性能に影響を及ぼす主要な因子は二つあります: CPU 自身の計算速度と、CPU からメモリへのアクセス速度です。コンピューターのクラスターでは同じコンピューター (ノード) の RAM に対するアクセスが最も高速なのは自然に理解できると思います。しかし意外にも、典型的なマルチコアのノートパソコンでも同じ問題が起こります: メインメモリとキャッシュではアクセス速度が異なるためです。そのため優れたマルチプロセッシング環境では、特定のメモリ領域をどの CPU が保持するか (メモリのオーナーシップ) を制御できる必要があります。Julia はメッセージパッシングを使ったマルチプロセッシング環境を提供するので、異なるメモリドメインを持つ複数のプロセスでプログラムを同時に実行することが可能です。

Julia のメッセージパッシングの実装は MPI1 といった他の環境とは異なります。Julia における通信は一般に片方向 (one-sided) です。これは二つのプロセスが絡む通信操作であってもプログラマが管理しなければならないプロセスは一つだけであることを意味します。さらに、通信に関する操作は通常「メッセージの送信」や「メッセージの受信」という形をしておらず、「ユーザー関数の呼び出し」のようなより高水準な操作の形をしています。

Julia における分散プログラミングの基礎となるのはリモートリファレンス (remote reference) とリモートコール (remote call) という二つのプリミティブです。リモートリファレンスは特定のプロセスが持つオブジェクトを指すオブジェクトであり、どのプロセスからでも利用できます。リモートコールは一つのプロセスが行う関数呼び出しのリクエストであり、関数と (同じプロセスにあるとは限らない) 引数を指定して行います。

リモートリファレンスには二つの種類があります: FutureRemoteChannel です。

リモートコールは Future を返り値として返します。リモートコールはすぐに値を返すので、呼び出したプロセスはリモートコールがどこか別の場所で行われている間に次の処理へ進むことができます。リモートコールの終了まで待機するには、呼び出しが返す Future に対して wait を行い、その後 fetch をすれば返り値を受け取れます。

これに対して RemoteChannel には何度でも書き込みが可能です。例えば複数のプロセスが一つの RemoteChannel を参照しながら協調して処理を行うといったことができます。

各プロセスには識別子 (ID) が関連付きます。Julia の対話プロンプトを提供するプロセスの ID は必ず 1 です。並列操作を行ったときにデフォルトで使われるプロセスを、まとめてワーカー (worker) と呼びます。プロセスが一つしかないときは ID が 1 のプロセスがワーカーとみなされ、そうでない場合は ID が 1 でない全てのプロセスがワーカーとみなされます。そのため、pmap といった並列処理メソッドの恩恵を得るためには少なくとも二つのプロセスが必要です。長い計算をワーカーで行っている間にメインプロセスで他のことを行いたいだけなら、プロセスを一つ追加するだけでも十分です。

実際に試してみましょう。Julia を julia -p n で起動すると、n 個のワーカープロセスがローカルのマシンに起動します。通常は n をマシンの CPU スレッド (論理コア) 数と一致させるのが理にかなっています。コマンドライン引数で -p を指定すると Distributed が自動的にインポートされることに注意してください。

$ ./julia -p 2

julia> r = remotecall(rand, 2, 2, 2)
Future(2, 1, 4, nothing)

julia> s = @spawnat 2 1 .+ fetch(r)
Future(2, 1, 5, nothing)

julia> fetch(s)
2×2 Array{Float64,2}:
 1.18526  1.50912
 1.16296  1.60607

remotecall の第一引数は呼び出す関数です。Julia における並列プログラミングの大部分は実行するプロセスの ID や個数を指定しませんが、remotecall はそういった数値を指定できる低水準なインターフェースとなっています。remotecall の第二引数は処理を行うプロセスの ID で、それ以降の引数は呼ばれる関数へ渡される引数です。

コードを見ればわかると思いますが、一行目で乱数が入った 2×2 行列の構築する命令をプロセス 2 に出し、二行目でその行列の各要素に 1 を加えています。この二つの計算結果として r, s という二つの Future が手に入ります。@spawnat マクロは第二引数の式を第一引数が指定するプロセスで評価します。

リモートノードで計算した値をすぐに利用したい場合もあるでしょう。典型的なのがローカルで次に行う計算に必要なデータを取得するためにリモートのオブジェクトを読むときです。このための関数が remotecall_fetch です。この関数が行う処理は fetch(remotecall(...)) と等価ですが、二つの関数を別々に呼ぶより効率的に行えます:

julia> remotecall_fetch(getindex, 2, r, 1, 1)
0.18526337335308085

以前に説明したように、getindex(r,1,1)r[1,1] と等価です。そのためこの呼び出しは r の最初の要素を返します。

呼び出しを簡単にするために、@spawnat にはシンボル :any を渡せるようになっています。:any を渡すと処理を行うプロセスが自動的に選ばれます:

julia> r = @spawnat :any rand(2,2)
Future(2, 1, 4, nothing)

julia> s = @spawnat :any 1 .+ fetch(r)
Future(3, 1, 5, nothing)

julia> fetch(s)
2×2 Array{Float64,2}:
 1.38854  1.9098
 1.20939  1.57158

1 .+ r ではなく 1 .+ fetch(r) としていることに注目してください。この例では二つのコードがそれぞれどこで実行されるか分からないので、加算を行うプロセスへ r を移動させなければならない可能性があります。ただこの場合には二つ目の @spawnat が気を利かせて r があるプロセスで計算を始めてくれるので、1 .+ fetch(r) に含まれる fetch は noop (何もしない) となります。

(@spawnat が組み込みの構文ではなく Julia で定義されるマクロであることは注目に値します。こういった構文を自分で定義することもできます。)

一度 fetch された Future の値がローカルにキャッシュされることは覚えておくべき重要な事実です。以降の fetch はネットワークホップを意味しません。全ての Future の参照がフェッチされると、リモートで保存される値は削除されます。

パッケージの読み込みとコードの可視性

コードを他のプロセスで実行するときは、実行を行う全てのプロセスでそのコードが利用可能である必要があります。例えば、次のコードを Julia プロンプトに入力してみてください:

julia> function rand2(dims...)
           return 2*rand(dims...)
       end

julia> rand2(2,2)
2×2 Array{Float64,2}:
 0.153756  0.368514
 1.15119   0.918912

julia> fetch(@spawnat :any rand2(2,2))
ERROR: RemoteException(2, CapturedException(UndefVarError(Symbol("#rand2"))
Stacktrace:
[...]

このコードでプロセス 1 は rand2 関数を知っていますが、プロセス 2 は知っていません。

コードはファイルやパッケージから読み込むことがほとんどですが、コードを読み込むプロセスの選択では非常に柔軟な制御が可能です。DummyModule.jl という名前のファイルに次のコードが含まれているとします:

module DummyModule

export MyType, f

mutable struct MyType
    a::Int
end

f(x) = x^2+1

println("loaded")

end

全てのプロセスから MyType を参照できるようにするには、DummyModule.jl を全てのプロセスで読み込む必要があります。include("DummyModule.jl") を呼び出しても一つのプロセスで読み込まれるだけなので、@everywhere マクロが必要です。次の例では julia -p 2 として起動したプロンプトを使っています:

julia> @everywhere include("DummyModule.jl")
loaded
      From worker 3:    loaded
      From worker 2:    loaded

通常通り、これだけでは DummyModule はプロセスのスコープに入らず、usingimport が必要です。また一つのプロセスのスコープに DummyModule が持ち込まれたとしても、他のプロセスのスコープに持ち込まれることはありません:

julia> using .DummyModule

julia> MyType(7)
MyType(7)

julia> fetch(@spawnat 2 MyType(7))
ERROR: On worker 2:
UndefVarError: MyType not defined


julia> fetch(@spawnat 2 DummyModule.MyType(7))
MyType(7)

ただし、次に示すように、DummyModule を読み込んだプロセスに対してスコープに入っていない MyType 型の値を送るといったことは可能です:

julia> put!(RemoteChannel(2), MyType(7))
RemoteChannel{Channel{Any}}(2, 1, 13)

Julia を開始するときに複数のプロセスでファイルを事前ロードするには、コマンドライン引数 -L を使います。さらに全体の計算の制御をドライバスクリプト (driver.jl) にまとめれば、次のコマンドとなります:

$ julia -p <n> -L file1.jl -L file2.jl driver.jl

この例でドライバスクリプトを実行するプロセスの ID は 1 であり、対話プロンプトを提供するプロセスと同じです。

最後に、もし DummyModule.jl が単一のファイルではなくパッケージなら、using DummyModuleDummyModule.jl全てのプロセスで読み込みます。ただしスコープに入るのは using を呼んだプロセスでだけです。

ワーカープロセスの開始と管理

通常の方法でインストールされた Julia は二種類のクラスターに対する組み込みのサポートを持ちます:

addprocs, rmprocs, workers といった関数を使うとプログラムからプロセスの追加・削除・問い合わせが可能です:

julia> using Distributed

julia> addprocs(2)
2-element Array{Int64,1}:
 2
 3

addprocs を呼ぶためには Distributed モジュールを明示的にマスタープロセスを読み込む必要があります。ワーカープロセスでは Distributed は最初から利用可能です。

ワーカーはスタートアップスクリプト ~/.julia/config/startup.jl を実行しないこと、そしてワーカーはグローバルな状態 (グローバル変数・新しいメソッドの定義・読み込まれたモジュール) を実行中の他のプロセスと同期しないことに注意してください。ワーカーを特定の環境で初期化するには addprocs(exeflags="--project") を使った上で @everywhere using <modulename> または @everywhere include("file.jl") とします。

独自の ClusterManager を書けば他の種類のクラスターをサポートできます。ClusterManager の節でこの話題を詳しく説明します。

データの移動

メッセージの送信とデータの移動は分散プログラムにおける主要なオーバーヘッドです。そのため送信されるメッセージの数とデータの量を削減することが高い性能とスケーラビリティを達成する上で非常に重要となります。このためには、Julia の様々な分散プログラミング構文が行うデータの移動に関して理解しておかなければなりません。

fetch 関数は明示的なデータ移動を行う操作とみなせます。この関数はオブジェクトをローカルマシンに移動させることを直接指示するからです。@spawnat (およびこれに似た構文) もデータを移動しますが、何が移動するかは fetch の場合ほど明快ではありません。@spawnat が暗黙のデータ移動操作を意味する場合があるからです。例えば、乱数行列を作成して二乗する二つのアプローチを考えます:

二つのコードに大きな違いはないと思うかもしれませんが、@spawnat の振る舞いにより大きな違いが存在します。一つ目の方法では乱数行列がローカルに構築され、それから二乗を実行するプロセスに送信されます。これに対して二つ目の方法では乱数行列の構築と二乗が同じプロセスで行われます。そのため二つ目の方法はデータの通信量が一つ目の方法よりずっと少なく済みます。

この簡単な例では二つの方法の違いを見つけて正しい方を選ぶのは難しくありません。しかし現実の問題では、正しいデータ移動を設計するのに様々なことを考えなければならず、いくらかの計測も必要になることでしょう。例えば一つ目のプロセスが行列 A を必要とするなら一つ目の方法が優れているかもしれませんし、A の計算に時間がかかるなら別のプロセスへの移動は避けられません。あるいは並列性はそもそも必要とならず、@spawnatfetch(Bref) を使わない方が性能が高くなる可能性さえあります。また rand(1000,1000) の部分がもっと時間のかかる処理であれば、その一つのステップに対して @spawnat を行うのが理にかなっているでしょう。

グローバル変数

@spawnat を通じてリモートで実行される式、および remotecall を通じてリモートで実行されるクロージャではグローバル変数を参照できます。そのとき Main モジュールに含まれるグローバル束縛は他のモジュールに含まれるグローバル束縛と少し異なった扱いを受けます。例えばコードスニペット

A = rand(10,10)
remotecall_fetch(()->sum(A), 2)

において、 sum はリモートプロセスで定義されている必要があります。A はローカルのワークスペースで定義されるグローバル変数であることに注目してください。ワーカー 2 は Main 内に A という名前の変数を持ちません。()->sum(A) というクロージャをワーカー 2 に渡すと Main.A がそこで定義され、Main.Aremotecall_fetch が終わった後もワーカー 2 に存在し続けます。Main モジュールのグローバルな参照を含むリモートコールは次のように扱われます:

気が付いたかもしれませんが、マスターではグローバル変数に関連付くメモリの回収・再利用が可能であるのに対して、ワーカーではグローバル束縛が有効であり続けるために不必要になったメモリの回収・再利用が不可能です。clear! を使うと、使わなくなったリモートノード上のグローバル変数に nothing を代入できます。こうすれば通常のガベージコレクションサイクルでそのグローバル変数が持っていたメモリが解放されます。

そのため、リモートコールがグローバル変数を参照するときは注意が必要です。可能ならグローバル変数を一切使わない方がよいでしょう。グローバル変数を参照しなければならないときは、let ブロックを使ってグローバル変数をローカル変数として使用できないか考えてみてください。

let を使ってグローバル変数を “ローカルに” 使う例を示します:

julia> A = rand(10,10);

julia> remotecall_fetch(()->A, 2);

julia> B = rand(10,10);

julia> let B = B
           remotecall_fetch(()->B, 2)
       end;

julia> @fetchfrom 2 InteractiveUtils.varinfo()
name           size summary
––––––––– ––––––––– ––––––––––––––––––––––
A         800 bytes 10×10 Array{Float64,2}
Base                Module
Core                Module
Main                Module

最後の出力から分かるように、グローバル変数 A はワーカー 2 で定義されていますが、ローカル変数としてキャプチャされて使われた B はワーカー 2 に存在しません。

並列 for ループと並列マップ

幸い、データの移動を必要としない並列計算にも実用的なものが存在します。例えばよく使われるモンテカルロシミュレーションでは、複数のプロセスが独立したシミュレーションの試行を同時に行えます。@spawnat を使って二つのプロセスでコインを投げるシミュレーションを書いてみましょう。まず count_heads.jl で次の関数を定義します:

function count_heads(n)
    c::Int = 0
    for i = 1:n
        c += rand(Bool)
    end
    c
end

count_heads 関数は n 個のランダムなビットを足しているだけです。この関数を使って二つのマシンで試行を行い、その結果を足すには次のようにします:

julia> @everywhere include_string(Main,
                                  $(read("count_heads.jl", String)),
                                  "count_heads.jl")

julia> a = @spawnat :any count_heads(100000000)
Future(2, 1, 6, nothing)

julia> b = @spawnat :any count_heads(100000000)
Future(3, 1, 7, nothing)

julia> fetch(a)+fetch(b)
100001564

この例は頻繁に使われる強力なプログラミングパターンを示しています: 多くの反復をいくつかのプロセスを使って独立に実行し、最後に結果を何らかの関数で組み合わせるというパターンです。各プロセスからの結果を組み合わせる最後の処理を縮約 (reduction) と呼びます。こう呼ばれているのは、この操作が一般にテンソルのランクを落とす (reduce する) ためです。例えば数値のベクトルは単一の数値となり、行列は単一の行や列となります。リダクションはループの中で x = f(x,v[i]) という形をしていることが多く、ここで x が最終的な縮約結果の値、f が縮約を行う関数、そして v[i] が縮約される要素を表します。縮約の順序が結果に影響しないように、f が結合律を満たすことが望ましいとされます。

count_heads が持つこのパターンは一般化できます。上記の例では @spawnat 文を使っているので並列性が文の数に制限されますが、並列 for ループを使えば分散メモリ上で任意の個数のプロセスを使って処理を実行できます。Julia では並列 for ループを @distributed マクロで次のように書きます:

nheads = @distributed (+) for i = 1:200000000
    Int(rand(Bool))
end

この構文が実装するのは、反復を分割して複数のプロセスに割り当て、反復の結果を指定された縮約演算 (この例では + 関数) で組み合わせるというパターンです。ループの最後の式の値が各反復の結果となり、縮約の最終的な結果が並列 for ループ全体の式の評価結果となります。

並列 for ループは通常の逐次 for ループのような見た目をしていますが、その振る舞いは大きく異なることに注意してください。特に気を付けなければならないのが、反復が指定した順序で起こるとは限らないこと、そして変数や配列への書き込みが (別のプロセスで起こるために) グローバルな影響を持たないことです。並列 for ループの内部で使われる変数は全てコピーされ、実行を行う各プロセスにブロードキャストされます。

例えば、次のコードは意図通りに動作しません:

a = zeros(100000)
@distributed for i = 1:100000
    a[i] = i
end

全てのプロセスが個別に a のコピーを持つことになるので、このコードでは a を初期化できません。こういった並列 for ループは避けるべきです。ですが幸いにも、SharedArrays モジュールが提供する共有配列を使えばこの制限を回避できます:

using SharedArrays

a = SharedArray{Float64}(10)
@distributed for i = 1:10
    a[i] = i
end

並列 for ループで外部変数を使ったとしても、その変数が読み込み専用であれば何の問題もありません:

a = randn(1000)
@distributed (+) for i = 1:100000
    f(a[rand(1:end)])
end

このコードを実行するとベクトル a が全てのプロセスに共有され、各プロセスがランダムに選んだ a の要素に対して f を計算します。

なお縮約演算は必要なければ省略でき、そのとき並列 for ループは非同期に実行されます。言い換えると、縮約演算を省略した並列 for ループは利用可能なワーカーで独立したタスクを起動し、その終了を待つことなく Future の配列をすぐに返します。呼び出し側は Future に対して fetch を呼べば処理の終了を待機できます。あるいは @sync をループの最初に付けて @sync @distributed for とするやり方もあります。

並列計算において縮約演算が必要とならず、何らかの区間に属する全ての整数 (より一般には何らかのコレクションに属する全ての要素) に対して関数を適用したいだけである場合があります。これは縮約とは異なる並列計算パターンであり、並列マップ (parallel map) と呼ばれます。Julia は並列マップを pmap 関数で実装します。例えば 10 個の大規模な乱数行列の特異値を並列に計算するには次のようにします:

julia> M = Matrix{Float64}[rand(1000,1000) for i = 1:10];

julia> pmap(svdvals, M);

Julia の pmap はそれぞれの関数呼び出しが大規模な処理をするものとして設計されています。これに対して @distributed for は各反復が小さな処理 (例えば二つの数字の加算) を行う状況を扱います。pmap@distributed for はどちらもワーカープロセスだけを使って並列計算を行い、@distributed for では最後の縮約で呼び出したプロセスが使われます。

リモートリファレンスと AbstractChannel

リモートリファレンスは必ず AbstractChannel の実装を指します。

AbstractChannel を実装する具象型 (例えば Channel) には put!, take!, fetch, isready, wait の実装が必要です。Future が指すリモートオブジェクトは Channel{Any}(1) (Any 型のオブジェクトを保持するサイズ 1 の Channel) に保持されます。

RemoteChannel は再書き込み可能であり、任意の型とサイズを持つチャンネルあるいは他の AbstractChannel の実装を指すことができます。

コンストラクタ RemoteChannel(f::Function, pid) を使うと、特定の型を持つ複数の値を保持するチャンネルへのリモートリファレンスを構築できます。f はプロセス pid 上で実行される関数であり、AbstractChannel のインスタンスを返す必要があります。

例えば RemoteChannel(()->Channel{Int}(10), pid) はサイズ 10 で Int 型のチャンネルへのリモートリファレンスを返します。このチャンネルは ID が pid のワーカーに存在します。

RemoteChannel に対するメソッド put!, take!, fetch, isready, wait はリモートプロセス上のバッキングストアへ転送されます。

このため RemoteChannel はユーザーが実装した AbstractChannel オブジェクトを参照するために利用できます。この機能の簡単な例として、Julia のサンプルコードレポジトリdictchannel.jl に辞書をリモートの保存場所に使う AbstractChannel があります。

ChannelRemoteChannel の違い

前にスレッドを使って作成したシミュレーションをプロセス間通信を使って書き換えると次のようになります。

このコードは四つのワーカーが一つのリモートチャンネル jobs を処理します。最初 job_id という ID で識別されるジョブを jobs チャンネルへ書き込み、その後リモートで実行されるタスクが同時に jobs チャンネルから読み、ランダムな時間だけ待機し、そして job_id・経過時間・pid からなるタプルを results チャンネルに書き込みます。最後にマスタープロセスが全ての results を出力します:

julia> addprocs(4); # ワーカープロセスを追加する

julia> const jobs = RemoteChannel(()->Channel{Int}(32));

julia> const results = RemoteChannel(()->Channel{Tuple}(32));

julia> # 仕事を行う関数を全てのプロセスで定義する。
       @everywhere function do_work(jobs, results)
           while true
               job_id = take!(jobs)
               exec_time = rand()
               sleep(exec_time) # 実際の仕事をする処理時間をシミュレートする。
               put!(results, (job_id, exec_time, myid()))
           end
       end

julia> function make_jobs(n)
           for i in 1:n
               put!(jobs, i)
           end
       end;

julia> n = 12;

julia> @async make_jobs(n); # n 個のジョブを jobs チャンネルに入れる。

julia> for p in workers() # ワーカー上でタスクを開始して、リクエストを並列に処理する。
           remote_do(do_work, p, jobs, results)
       end

julia> @elapsed while n > 0 # 結果を出力する
           job_id, exec_time, where = take!(results)
           println("$job_id finished in $(round(exec_time; digits=2)) seconds on worker $where")
           global n = n - 1
       end
1 finished in 0.18 seconds on worker 4
2 finished in 0.26 seconds on worker 5
6 finished in 0.12 seconds on worker 4
7 finished in 0.18 seconds on worker 4
5 finished in 0.35 seconds on worker 5
4 finished in 0.68 seconds on worker 2
3 finished in 0.73 seconds on worker 3
11 finished in 0.01 seconds on worker 3
12 finished in 0.02 seconds on worker 3
9 finished in 0.26 seconds on worker 5
8 finished in 0.57 seconds on worker 4
10 finished in 0.58 seconds on worker 2
0.055971741

リモートリファレンスと分散ガベージコレクション

リモートリファレンスによって参照されるオブジェクト (AbstractChannel 型のインスタンス) は、そのオブジェクトを指すクラスター内の参照が全て削除されてはじめて削除できます。

オブジェクトを参照するノードの追跡はそのオブジェクトを保存するノードによって行われます。RemoteChannel や未フェッチの Future がワーカーへシリアライズ (転送) されるたびに、その参照が指す (AbstractChannel 型の) オブジェクトを持つノードは通知を受けます。そして RemoteChannel や未フェッチの Future がローカルでガベージコレクトされるたびに、その参照が指すオブジェクトを持つノードは再度通知を受けます。この仕組みはクラスターと適切なやり取りを行う Julia 内部のシリアライザに実装されています。リモートリファレンスはクラスターで実行されるときにだけ意味を持ちます。また通常の IO オブジェクトへの参照のシリアライズとデシリアライズはサポートされていません。

コレクト対象のオブジェクトが受け取る通知は “追跡” メッセージとして送信されます。例えば参照が他のプロセスに向けてシリアライズされるときは「参照の追加」のメッセージが、ローカルでガベージコレクトされたときは「参照の削除」のメッセージが送信されます。

Future は一度だけ書き込み可能であり計算された値はローカルにキャッシュされるので、Futurefetch すると参照の追跡情報も更新されます。

オブジェクトを指す全ての参照が削除されると、そのオブジェクトを持つノードが削除を行います。

フェッチ済みの Future を異なるノードへシリアライズすると、Future の値も同じノードから送信されます。フェッチされてからシリアライズされるまでの間に、値を計算したノードにあるオリジナルの値がコレクトされる可能性があるためです。

重要な事実として、オブジェクトがローカルのガベージコレクタによってコレクトされるタイミングはオブジェクトのサイズやシステムのメモリ使用量に依存します。

リモートリファレンスの場合には、ローカルに保存される参照オブジェクトは非常に小さくなる一方でリモートノードに保存される値は非常に大きくなる可能性があります。リモートのローカルオブジェクトがすぐにコレクトされない可能性があるので、RemoteChannel や未フェッチの Future のローカルなインスタンスに対しては必要なくなった段階で finalize を明示的に呼び出すのが良い習慣とされます。Future をフェッチするときはリモートストアからの参照が削除されるので、フェッチした Future に対しては finalize は必要ありません。finalize を明示的に呼び出すとすぐにリモートノードへメッセージが送信され、その値への参照を削除すべきであることが伝わります。

ファイナライザを起動した参照は無効となり、それ以降は利用できなくなります。

ローカルノードに対するリモートコール

リモートコールの実行に必要なデータはリモートノードへコピーされます。これは異なるノードにある RemoteChannelFuture にデータが保存されるときでも同様です。期待される通り、このときオブジェクトはシリアライズされてリモートノードにコピーされます。ただし目的ノードがローカルノードのとき、つまり呼び出し側のプロセス ID がリモートノードの ID と同じときは、リモートコールがローカルな呼び出しとして実行されます。このとき通常 (必ずではありません) 呼び出しは異なるタスクとして実行されますが、データのシリアライズとデシリアライズが行われません。つまりコピーは行われず、呼び出しは渡されたオブジェクトと同じオブジェクトを操作することになります。この振る舞いを示す例を示します:

julia> using Distributed;

julia> # ローカルノードに RemoteChannel が作成される。
       rc = RemoteChannel(()->Channel(3));

julia> v = [0];

julia> for i in 1:3
           v[1] = i # v を再利用する。
           put!(rc, v)
       end;

julia> result = [take!(rc) for _ in 1:3];

julia> println(result);
Array{Int64,1}[[3], [3], [3]]

julia> println("Num Unique objects : ", length(unique(map(objectid, result))));
Num Unique objects : 1

julia> addprocs(1);

julia> # リモートノードに RemoteChannel を作成する。
       rc = RemoteChannel(()->Channel(3), workers()[1]);

julia> v = [0];

julia> for i in 1:3
           v[1] = i
           put!(rc, v)
       end;

julia> result = [take!(rc) for _ in 1:3];

julia> println(result);
Array{Int64,1}[[1], [2], [3]]

julia> println("Num Unique objects : ", length(unique(map(objectid, result))));
Num Unique objects : 3

この例から分かるように、ローカルな RemoteChannel に対して同じオブジェクト vput! すると、その値が異なっていたとしても同じ参照が保存されます。これに対して異なるノードにある rcput! を行うと v のコピーが作成されます。

これは通常問題にならないことを指摘しておきます。これはオブジェクトがローカルに保存され、かつ呼び出しの後に変更されるときに限って考慮すべき事項です。そういった場合にはオブジェクトの deepcopy を行うのが適しているでしょう。

ローカルノードに対するリモートコールでも同じことが起こります:

julia> using Distributed; addprocs(1);

julia> v = [0];

julia> v2 = remotecall_fetch(x->(x[1] = 1; x), myid(), v);       # ローカルノードで実行

julia> println("v=$v, v2=$v2, ", v === v2);
v=[1], v2=[1], true

julia> v = [0];

julia> v2 = remotecall_fetch(x->(x[1] = 1; x), workers()[1], v); # リモートノードで実行

julia> println("v=$v, v2=$v2, ", v === v2);
v=[0], v2=[1], false

ここでもローカルノードに対するリモートコールは通常の関数呼び出しのように振る舞い、引数に渡されるローカルのオブジェクトを改変します。これに対してリモートの呼び出しは引数をコピーします。

繰り返しになりますが、この振る舞いは通常問題になりません。これを考慮すべきなのはローカルノードが計算ノードとして使われていて引数が関数の後に使われるときだけであり、必要ならローカルノードの呼び出しで深いコピーを渡すことで対処できます。リモートノードに対する呼び出しは必ず引数のコピーに対する処理となります。

共有配列

共有配列はシステムの共有メモリを使って配列を複数のプロセスにマップします。SharedArrayDArray は一部似ている部分もありますが、その振る舞いは大きく異なります。DArray では各プロセスがデータの一部分だけを持ち、同じデータが二つ以上のプロセスで共有されることはありません。一方 SharedArray では配列への処理に “参加” する全てのプロセスが配列全体へのアクセス権を持ちます。同じマシンの二つ以上のプロセスが大きなデータに連帯してアクセスする必要があるなら、SharedArray が良い選択肢です。

共有配列のサポートは SharedArrays モジュールを通して提供されるので、参加する全てのワーカーで明示的に読み込んでおく必要があります。

SharedArray の添え字アクセス (および代入) は通常の配列と同じように動作し、さらに内部のメモリがローカルのプロセスから利用可能なので効率も落ちません。そのため単一プロセスで動作する配列を前提として書かれたアルゴリズムの多くは SharedArray に対して自然に動作します。アルゴリズムが Array の入力を要求するときは、sdata を使えば SharedArray が持つ内部の配列を取得できます。他の AbstractArray 型に対して sdata は引数をそのまま返すので、sdata は任意の Array 型のオブジェクトに対して呼び出して構いません。

共有配列のコンストラクタは次の形をしています:

SharedArray{T,N}(dims::NTuple; init=false, pids=Int[])

これは要素型が T でサイズが dimsN 次元共有配列を作成し、それを pids で指定されるプロセスで共有します。分散配列とは異なり、共有配列は名前付き引数 pids で指定される “参加する” ワーカーからのみアクセスできます (共有配列を作成するプロセスが同じホスト上にあるなら、そこからもアクセスできます)。また SharedArray が要素型としてサポートするのは isbitstypetrue を返す型だけです。

initfn(S::SharedArray) というシグネチャを持つ関数を init に指定すると、それが参加するワーカーの全てで呼ばれます。各ワーカーの init 関数で配列の異なる部分を初期化させれば、初期化の並列化が可能です。

簡単な例を示します:

julia> using Distributed

julia> addprocs(3)
3-element Array{Int64,1}:
 2
 3
 4

julia> @everywhere using SharedArrays

julia> S = SharedArray{Int,2}((3,4), init = S -> S[localindices(S)] = repeat([myid()], length(localindices(S))))
3×4 SharedArray{Int64,2}:
 2  2  3  4
 2  3  3  4
 2  3  4  4

julia> S[3,2] = 7
7

julia> S
3×4 SharedArray{Int64,2}:
 2  2  3  4
 2  3  3  4
 2  7  4  4

SharedArrays.localindices はプロセスごとに異なる重ならない添え字の区間を提供するので、プロセス間でタスクを分けるときに便利です。もちろん、初期化処理の切り分け方は自由です:

julia> S = SharedArray{Int,2}((3,4), init = S -> S[indexpids(S):length(procs(S)):length(S)] = repeat([myid()], length( indexpids(S):length(procs(S)):length(S))))
3×4 SharedArray{Int64,2}:
 2  2  2  2
 3  3  3  3
 4  4  4  4

全てのプロセスが内部データへのアクセスを持つので、衝突を起こさないように注意が必要です。例えば

@sync begin
    for p in procs(S)
        @async begin
            remotecall_wait(fill!, p, S, p)
        end
    end
end

は未定義動作となります。各プロセスが配列全体に pid を書き込むためです。S の各要素について、その要素への代入を最後に実行したプロセスの pid が代入されることになります。

さらに込み入った複雑な例として、次の “カーネル” を並列に実行する処理を考えます:

q[i,j,t+1] = q[i,j,t] + u[i,j,t]

このカーネルは一次元の添え字を使って処理を分けているので、高い確率で問題が発生します: あるワーカーに割り当てられたブロックの後ろの部分に q[i,j,t] があり、別のワーカーに割り当てられたブロックの前の部分に q[i,j,t+1] があると、q[i,j,t+1] の計算で q[i,j,t] が必要になった段階では q[i,j,t] の準備ができていない可能性が非常に高くなります。こういった場合には、配列を手動で切り分けた方がよいでしょう。ここでは二つ目の次元に関する分割を試してみます。まずワーカーに割り当てる区間 (irange, jrange) を返す関数を作ります:

julia> @everywhere function myrange(q::SharedArray)
         idx = indexpids(q)
         if idx == 0 # このワーカーには何も割り当てない
             return 1:0, 1:0
         end
         nchunks = length(procs(q))
         splits = [round(Int, s) for s in range(0, stop=size(q,2), length=nchunks+1)]
         1:size(q,1), splits[idx]+1:splits[idx+1]
       end

そしてカーネルを定義します:

julia> @everywhere function advection_chunk!(q, u, irange, jrange, trange)
           @show (irange, jrange, trange)  # 何が起きているかが分かるよう出力する。
           for t in trange, j in jrange, i in irange
               q[i,j,t+1] = q[i,j,t] + u[i,j,t]
           end
           q
       end

SharedArray から簡単に使えるようにラッパーも定義します:

julia> @everywhere advection_shared_chunk!(q, u) =
           advection_chunk!(q, u, myrange(q)..., 1:size(q,3)-1)

では三つのバージョンを比べてみましょう。一つ目はシングルプロセスで実行する方法です:

julia> advection_serial!(q, u) =
           advection_chunk!(q, u, 1:size(q,1), 1:size(q,2), 1:size(q,3)-1);

二つ目は @distributed を使う方法です:

julia> function advection_parallel!(q, u)
           for t = 1:size(q,3)-1
               @sync @distributed for j = 1:size(q,2)
                   for i = 1:size(q,1)
                       q[i,j,t+1]= q[i,j,t] + u[i,j,t]
                   end
               end
           end
           q
       end;

三つ目は範囲を切り分けてワーカーに任せる方法です:

julia> function advection_shared!(q, u)
           @sync begin
               for p in procs(q)
                   @async remotecall_wait(advection_shared_chunk!, p, q, u)
               end
           end
           q
       end;

SharedArray を作成して関数の性能を計測します。ここでは julia -p 4 として Julia を起動し、ワーカーを三つとしました:

julia> q = SharedArray{Float64,3}((500,500,500));

julia> u = SharedArray{Float64,3}((500,500,500));

JIT コンパイルのために全ての関数を一度実行し、二度目の実行で @time マクロを使います。次の結果となりました:

julia> @time advection_serial!(q, u);
(irange,jrange,trange) = (1:500,1:500,1:499)
 830.220 milliseconds (216 allocations: 13820 bytes)

julia> @time advection_parallel!(q, u);
   2.495 seconds      (3999 k allocations: 289 MB, 2.09% gc time)

julia> @time advection_shared!(q,u);
        From worker 2:       (irange,jrange,trange) = (1:500,1:125,1:499)
        From worker 4:       (irange,jrange,trange) = (1:500,251:375,1:499)
        From worker 3:       (irange,jrange,trange) = (1:500,126:250,1:499)
        From worker 5:       (irange,jrange,trange) = (1:500,376:500,1:499)
 238.119 milliseconds (2264 allocations: 169 KB)

advection_shared! の一番の利点はワーカーの間の転送量が最小化され、割り当てられた部分を計算する時間を多く取れることです。

共有配列と分散ガベージコレクション

リモートリファレンスと同様、共有配列に参加している全てのワーカーからの参照を解放するタイミングは作成したノードにおけるガベージコレクションに左右されます。短命の共有配列オブジェクトをいくつも作るコードでは、ファイナライザを可能になった段階で明示的に呼ぶことで性能が向上するでしょう。共有されるセグメントをマップするファイルハンドルとそのメモリ領域が早く解放されるためです。

ClusterManager

論理クラスターにおける Julia プロセスの起動・管理・通信はクラスターマネージャを通して行われます。ClusterManager は次の処理を担当します:

Julia のクラスターは次の特徴を持ちます:

ワーカー間の (組み込みの TCP/IP トランスポートを使った) 通信は次の手順で行われます:

デフォルトのトランスポート層はプレーンな TCPSocket を使いますが、Julia クラスターが独自のトランスポート層を提供することも可能です。

Julia は組み込みで二種類のクラスターマネージャを提供します:

LocalManager は同じホスト上に追加のワーカーを起動し、マルチコア/マルチプロセッサのハードウェアを最大限活用するために使います。

まとめると、最低限のクラスターマネージャに必要な要件は次の通りです:

addprocs(manager::FooManager) を行うためには、FooManager に対する次のメソッドの実装が必要です:

function launch(manager::FooManager, params::Dict, launched::Array, c::Condition)
    [...]
end

function manage(manager::FooManager, id::Integer, config::WorkerConfig, op::Symbol)
    [...]
end

例として、同じホストでワーカーを起動する LocalManager がどう実装されているかを見てみましょう:

struct LocalManager <: ClusterManager
    np::Integer
end

function launch(manager::LocalManager, params::Dict, launched::Array, c::Condition)
    [...]
end

function manage(manager::LocalManager, id::Integer, config::WorkerConfig, op::Symbol)
    [...]
end

launch の引数の意味は次の通りです:

launch メソッドは個別のタスクとして非同期的に呼ばれます。このタスクが終了すると要求されたワーカーが全て起動したというシグナルが出るので、launch 関数は要求されたワーカーが起動したらすぐに終了しなければなりません。

新しく起動されたワーカーは他のワーカーとマスタープロセスに all-to-all で接続します。Julia を起動するときにコマンドライン引数 --worker[=<cookie>] を指定するとプロセスが最初からワーカーとして初期化され、TCP/IP ソケットを使った接続が設定されます。

クラスター内の全てのワーカーは同じクッキーを共有します。--worker というオプションでクッキーが指定されずに Julia が起動されると、ワーカーは標準入力からクッキーを読み込みます。LocalManagerSSHManager はどちらも新しく起動したワーカーに標準入力を通してクッキーを渡します。

デフォルトでワーカーは getipaddr() が返すアドレスの使われていないポートにリッスンします。コマンドライン引数 --bind-to bind_addr[:port] を指定すれば特定のアドレスにリッスンさせることが可能です。この機能はマルチホームのホストで有用です。

TCP/IP でないトランスポートの一例として、クラスターマネージャの実装は MPI を使って通信を行うこともできます。その場合 --worker を指定してはならず、新しく起動されたワーカーは並列計算の機能を使う前に init_worker(cookie) を呼び出すべきです。

launch メソッドはワーカーを起動するごとに WorkerConfig オブジェクトを (フィールドを適切に初期化した上で) launched 配列に追加する必要があります。WorkerConfig の定義を示します:

mutable struct WorkerConfig
    # 全てのクラスターマネージャに関連する共通フィールド
    io::Union{IO, Nothing}
    host::Union{AbstractString, Nothing}
    port::Union{Integer, Nothing}

    # ホストで追加のワーカーを起動するときに使われるフィールド
    count::Union{Int, Symbol, Nothing}
    exename::Union{AbstractString, Cmd, Nothing}
    exeflags::Union{Cmd, Nothing}

    # 外部のクラスターマネージャはこのフィールドにワーカー単位の情報を保存する。
    # 複数のフィールドが必要なら辞書にもできる。
    userdata::Any

    # SSHManager でワーカーに接続する SSH トンネル接続が利用する。
    tunnel::Union{Bool, Nothing}
    bind_addr::Union{AbstractString, Nothing}
    sshflags::Union{Cmd, Nothing}
    max_parallel::Union{Integer, Nothing}

    # LocalManager/SSHManager の両方が利用する。
    connect_at::Any

    [...]
end

WorkerConfig の大半のフィールドは組み込みのマネージャによって使われます。独自のクラスターマネージャが指定するのは io または host/port だけであるはずです。

manage(manager::FooManager, id::Integer, config::WorkerConfig, op::Symbol) はワーカーが生きている間にイベントが起こると呼ばれます。そのとき op は次に示す値のどれかに設定されます:

独自トランスポートを使ったクラスターマネージャ

デフォルトで使われる all-to-all の TCP/IP ソケット接続を独自のトランスポート層に置き換えるには、もう少し込み入った設定が必要です。各 Julia プロセスは接続しているワーカーと同じ数の通信タスクを持ちます。例えば 32 プロセスの Julia クラスターが all-to-all のメッシュネットワークで接続しているとします。このとき:

デフォルトのトランスポート層を置き換えるのに必要なのは、リモートワーカーへの接続を確立する処理と、メッセージ処理ループが待機できる適切な IO オブジェクトです。独自のマネージャが実装すべき特別なコールバックは次の二つです:

connect(manager::FooManager, pid::Integer, config::WorkerConfig)
kill(manager::FooManager, pid::Int, config::WorkerConfig)

connect(manager::ClusterManager, pid::Integer, config::WorkerConfig) がデフォルト実装です (TCP/IP ソケットが使われます)。

connectIO オブジェクトの組を返すべきです。一つはワーカー pid から送信されるデータを読み込むためのオブジェクトで、もう一つはワーカー pid へ送信するデータを書き込むためのオブジェクトです。独自のトランスポート層と Julia 組み込みの並列インフラストラクチャの間でデータを転送するときは、インメモリの BufferStream をパイプとして利用できます (トランスポートは IO でなくても構いません)。

BufferStream はインメモリの IOBuffer であり、IO のように振る舞います ──非同期的に扱えるストリームです。

サンプルコードレポジトリclustermanager/0mq フォルダには ZeroMQ を使って Julia ワーカーをスター型トポロジーに接続する例があります (中間に ZeroMQ ブローカーがあります)。注意: この場合でも論理的には全ての Julia プロセスが互いに接続しています ──任意のワーカーは任意のワーカーに直接メッセージを送信でき、そのときトランスポート層に ZeroMQ が使われていることを意識する必要はありません。

独自のトランスポート層を使うときは:

kill(manager, pid, config) はクラスターからワーカーを削除するときに呼ばれます。これを受けてマスタープロセスでは、クラスターマネージャの実装が対応する IO オブジェクトを閉じて適切なクリーナップを行う必要があります。デフォルトの実装は単純に指定されたリモートワーカーで exit() を実行させるだけです。

サンプルコードレポジトリclustermanager/simple フォルダにはクラスターの設定に UNIX ドメインソケットを使った簡単な実装があります。

LocalManagerSSHManager に対するネットワーク要件

Julia クラスターはローカル PC や機関が保有するクラスター、あるいはクラウドといったインフラストラクチャ上に構築された既にセキュアであることが保証されている環境で実行されるものとして設計されています。この節では組み込みの LocalManagerSSHManager が持つネットワークセキュリティ要件を説明します。

クラスターに含まれる全てのプロセスは同じクッキーを共有します。デフォルトではマスタープロセスで生成されたランダム文字列が使われます。

より高度なセキュリティは独自の ClusterManager を実装することで実現できます。例えばクッキーを事前に共有してスタートアップ時の引数としては指定しない方法などが考えられます。

ネットワークトポロジーの指定 (実験的)

addprocs にキーワード引数 topology を渡すと、ワーカーが互いにどのように接続するかを指定できます:

キーワード引数 lazy=true|falsetopology オプションが :all_to_all のときにだけ効果を持ちます。lazytrue だとクラスターはマスターが全てのワーカーと接続された状態で開始され、ワーカー間の接続はその間でリモートコールがあったときに始めて確立されます。こうするとクラスター内通信のために確保される初期リソースが節約できます。接続は並列プログラムの実行時における要件によって設定されるということです。lazy のデフォルト値は true です。

現在のバージョンでは、接続されていないワーカー間でメッセージを送るとエラーが生じます。この振る舞い (機能とインターフェース) は実験的とみなされるべきであり、将来のリリースで変更される可能性があります。

注目に値する外部パッケージ

Julia に組み込みの並列計算機構の他にも、言及しておくべき外部パッケージがたくさんあります。例えば MPI.jl は MPI プロトコルに対する Julia ラッパーであり、共有配列の説明では DistributedArrays.jl に触れました。

さらに Julia の GPU プログラミングエコシステムにも言及しておかなければなりません:

  1. 低水準な (C カーネルを使った) 操作には OpenCL.jlCUDAdrv.jl が利用できます。それぞれ OpenCL インターフェースと CUDA ラッパーです。

  2. CUDAnative.jl のような低水準インターフェース (Julia カーネル) もあります。これは Julia ネイティブな CUDA 実装です。

  3. CuArrays.jlCLArrays.jl は特定ベンダーに対する高水準な抽象化を提供します。

  4. ArrayFire.jlGPUArrays.jl は GPU プログラミング用の高水準ライブラリです。

[訳注: 現在 CUDAdrv.jl, CUDAnative.jl, CuArrays.jl は CUDA.jl に統合され、個別のパッケージは非推奨となっています。以下の説明は非推奨の個別パッケージに対するものです。]

DistributedArrays.jl と CuArrays.jl を使って複数のプロセスに配列を分散させる例を次に示します。最初に distribute() を使い、その後に CuArray() を使っています。

DistributedArrays.jl をインポートするときは、@everywhere マクロを使って全てのプロセスにインポートすることを忘れないでください:

$ ./julia -p 4

julia> addprocs()

julia> @everywhere using DistributedArrays

julia> using CuArrays

julia> B = ones(10_000) ./ 2;

julia> A = ones(10_000) .* π;

julia> C = 2 .* A ./ B;

julia> all(C .≈ 4*π)
true

julia> typeof(C)
Array{Float64,1}

julia> dB = distribute(B);

julia> dA = distribute(A);

julia> dC = 2 .* dA ./ dB;

julia> all(dC .≈ 4*π)
true

julia> typeof(dC)
DistributedArrays.DArray{Float64,1,Array{Float64,1}}

julia> cuB = CuArray(B);

julia> cuA = CuArray(A);

julia> cuC = 2 .* cuA ./ cuB;

julia> all(cuC .≈ 4*π);
true

julia> typeof(cuC)
CuArray{Float64,1}

Julia の一部の機能は CUDAnative.jl によってサポートされないことに注意してください。特に sin などの関数は CUDAnative.sin とする必要があります。

DistributedArrays.jl と CuArrays.jl の両方を使って配列を複数プロセスに分散し、総称関数をそれに対して呼び出す例をこれから示します。

function power_method(M, v)
    for i in 1:100
        v = M*v
        v /= norm(v)
    end

    return v, norm(M*v) / norm(v)  # または (M*v) ./ v
end

power_method は何度も新しいベクトルを作成し、最後にベクトルを正規化します。関数の宣言には型シグネチャを指定していませんが、これを上述のパッケージの型に適用するとどうなるかを見ましょう:

julia> M = [2. 1; 1 1];

julia> v = rand(2)
2-element Array{Float64,1}:
0.40395
0.445877

julia> power_method(M,v)
([0.850651, 0.525731], 2.618033988749895)

julia> cuM = CuArray(M);

julia> cuv = CuArray(v);

julia> curesult = power_method(cuM, cuv);

julia> typeof(curesult)
CuArray{Float64,1}

julia> dM = distribute(M);

julia> dv = distribute(v);

julia> dC = power_method(dM, dv);

julia> typeof(dC)
Tuple{DistributedArrays.DArray{Float64,1,Array{Float64,1}},Float64}

外部パッケージの簡単な利用例な紹介の締めくくりとして、MPI プロトコルの Julia ラッパー MPI.jl を考えます。全ての内部関数を見ていくのは時間がかかり過ぎるので、プロトコルの実装で使われているアプローチを簡単に紹介します。

次に簡単なスクリプトを示します。このスクリプトは各サブプロセスを呼び出し、ランクをインスタンス化し、マスタープロセスでランクの和を計算します:

import MPI

MPI.Init()

comm = MPI.COMM_WORLD
MPI.Barrier(comm)

root = 0
r = MPI.Comm_rank(comm)

sr = MPI.Reduce(r, MPI.SUM, root, comm)

if(MPI.Comm_rank(comm) == root)
   @printf("sum of ranks: %s\n", sr)
end

MPI.Finalize()

このスクリプトは次のコマンドで実行します:

$ mpirun -np 4 ./julia example.jl

  1. ここで言う MPI とは MPI-1 規格のことです。MPI 標準化委員会は MPI-2 で Remote Memory Access (RMA) と呼ばれる新しい種類の通信機構を導入しました。RMA が MPI 規格に追加されたのは、片方向通信を行うパターンを簡単にするためです。最新の MPI 規格についてさらに詳しくは https://mpi-forum.org/docs を参照してください。[return]

日本語 Julia 書籍 (Amazon アソシエイト)
1 から始める Julia プログラミング
Julia プログラミングクックブック―言語仕様からデータ分析、機械学習、数値計算まで
スタンフォード ベクトル・行列からはじめる最適化数学