Distributed

Distributed.addprocs ── 関数
addprocs(manager::ClusterManager; kwargs...) -> プロセス ID のリスト

指定したクラスターマネージャを通してワーカープロセスを起動します。

例えば Beowulf クラスターは ClusterManagers.jl が実装する独自のクラスターマネージャを通してサポートされます。

新しく起動したワーカーがマスターからの接続確立を待機する秒数は、ワーカープロセスの環境変数 JULIA_WORKER_TIMEOUT で指定できます。この値は TCP/IP トランスポートを利用するときにだけ意味を持ちます。

REPL をブロックせずにワーカーを起動するには、addprocs を個別のタスクとして実行してください。ワーカーをプログラム的に起動する関数でも同様です。

# 忙しいクラスターでは、addprocs を非同期的に呼び出す。
t = @async addprocs(...)
# オンラインになったワーカーを利用する。
if nprocs() > 1   # 少なくとも一つの新しいワーカーが利用可能なことを確認する。
   ....   # 分散計算を行う
end
# 新しく起動したワーカーの ID またはエラーメッセージを取得する。
if istaskdone(t)   # fetch がブロックしないように addprocs が完了したことを確認する。
    if nworkers() == N
        new_pids = fetch(t)
    else
        fetch(t)
    end
  end
addprocs(machines;
         tunnel=false,
         sshflags=``,
         max_parallel=10,
         kwargs...) -> プロセス ID のリスト

SSH を通してリモートマシン上にプロセスを追加します。各ノードに julia が必要であり、同じ場所にインストールされているか、共有ファイルシステムで利用可能でなければなりません。

machines はマシン仕様 (machine specification) のベクトルです。ワーカーはマシン仕様ごとに起動されます。

マシン仕様は文字列 machine_spec またはタプル (machine_spec, count) で表されます。

machine_spec[[email protected]]host[:port] [bind_addr[:port]] という形の文字列です。デフォルトでは user は現在のユーザー、port は標準の SSH ポートとなります。[bind_addr[:port]] が指定されると、このワーカーに対する他のワーカーからの接続は指定された bind_addrport を使うようになります。

count は指定されたホストで起動されるワーカーの個数です。:auto を指定すると、そのホストにおける CPU スレッドと同じ個数のワーカーが起動します。

キーワード引数

  • tunnel: true だと、マスタープロセスからワーカーへの接続で SSH トンネリングが使われます。デフォルトは false です。

  • multiplex: true だと、SSH トンネリングで SSH 多重化が使われます。デフォルトは false です。

  • sshflags: SSH の追加オプションを指定します。例えば sshflags=`-i /home/foo/bar.pem` などとします。

  • max_parallel: ホストへ同時に接続できるワーカーの個数の上限です。デフォルトは 10 です。

  • dir: ワーカーのワーキングディレクトリを指定します。デフォルトはホストのカレントディレクトリ (pwd() の返り値) です。

  • enable_threaded_blas: true だと、追加されたプロセスで BLAS がマルチスレッドで実行されるようになります。デフォルトは false です。

  • exename: julia 実行形式の名前です。デフォルトは場合に応じて "$(Sys.BINDIR)/julia" または "$(Sys.BINDIR)/julia-debug" です。

  • exeflags: ワーカープロセスに渡される追加のフラグです。

  • topology: ワーカー同士がどのように接続するかを指定します。接続されていないワーカー間でメッセージを送信するとエラーとなります。

    • topology=:all_to_all: 全てのプロセスが互いに接続します。デフォルトです。

    • topology=:master_worker: pid が 1 のドライバプロセスだけがワーカーと接続し、ワーカー同士は接続しません。

    • topology=:custom: クラスターマネージャの launch メソッドが WorkerConfigidentconnect_idents のフィールドを通じて接続トポロジーを指定します。ident というクラスターマネージャ ID を持つワーカーは connect_idents に含まれる全てのワーカーと接続します。

  • lazy: topology=:all_to_all のときにだけ意味を持ちます。true だと、ワーカー同士の接続確立が遅延されます。つまりワーカー間で初めてリモートコールが起こったときに接続がセットアップされます。デフォルトは true です。

環境変数

マスタープロセスが新しく起動したワーカーとの接続を 60.0 秒が経過するまでの間に確立できなかった場合、ワーカーはそれを致命的な状況と扱い、終了します。このタイムアウトは環境変数 JULIA_WORKER_TIMEOUT で制御でき、マスタープロセスにおける JULIA_WORKER_TIMEOUT の値が新しいワーカーが接続確立を待機する秒数を指定します。

addprocs(; kwargs...) -> プロセス ID のリスト

addprocs(Sys.CPU_THREADS; kwargs...) と等価です。

ワーカーはスタートアップスクリプト .julia/config/startup.jl を実行しないことに注意してください。またグローバルな状態 (グローバル変数・新しいメソッドの定義・読み込まれたモジュールなど) を実行中の他プロセスと同期することもありません。

addprocs(np::Integer; restrict=true, kwargs...) -> プロセス ID のリスト

組み込みの LocalManager を使ってワーカーを起動します。このマネージャはローカルホストでのみワーカーを起動するので、マルチコアを活用できます。例えば addprocs(4) はローカルマシンに四つのプロセスを追加します。restricttrue だとバインド先が 127.0.0.1 に制限されます。キーワード引数 dir, exename, exeflags, topology, lazy, enable_threaded_blas の効果は addprocs のドキュメントにある通りです。

Distributed.nprocs ── 関数
nprocs()

利用可能なプロセスの個数を返します。

julia> nprocs()
3

julia> workers()
5-element Array{Int64,1}:
 2
 3
Distributed.nworkers ── 関数
nworkers()

利用可能なワーカープロセスの個数を返します。これは nprocs() より 1 小さい値です。ただし nprocs() == 1 のときは nprocs()1 を返します。

$ julia -p 5

julia> nprocs()
6

julia> nworkers()
5
Distributed.procs ── メソッド
procs()

全てのプロセス ID のリストを返します。workers() の返り値は 1 を含みませんが、procs の返り値は含みます。

$ julia -p 5

julia> procs()
3-element Array{Int64,1}:
 1
 2
 3
Distributed.procs ── メソッド
procs(pid::Integer)

同じ物理ノード上にある全てのプロセス ID のリストを返します。正確に言うと、pid と同じ IP アドレスにバインドされた全てのワーカーが返ります。

Distributed.workers ── 関数
workers()

全てのワーカープロセス ID のリストを返します。

$ julia -p 5

julia> workers()
2-element Array{Int64,1}:
 2
 3
Distributed.rmprocs ── 関数
rmprocs(pids...; waitfor=typemax(Int))

指定されたワーカーを削除します。ワーカーを追加/削除できるのはプロセス 1 だけであることに注意してください。

キーワード引数 waitfor はワーカーがシャットダウンするまで待つ秒数を指定します。

  • waitfor を指定しないと、rmprocs は要求された pids が全て削除されるまで待ちます。
  • 要求した waitfor 秒までの間に全てのワーカーが終了できないと、ErrorException が送出されます。
  • waitfor0 だと rmprocs はすぐに返り、違うタスクでワーカーの削除がスケジュールされます。スケジュールされた Task オブジェクトが返ります。ユーザーは他の並列な関数を呼び出す前に rmprocs が返すタスクに対して wait を行うべきです。

$ julia -p 5

julia> t = rmprocs(2, 3, waitfor=0)
Task (runnable) @0x0000000107c718d0

julia> wait(t)

julia> workers()
3-element Array{Int64,1}:
 4
 5
 6
Distributed.interrupt ── 関数
interrupt(pids::Integer...)

指定されたワーカーで現在実行中のタスクを中断します。これはローカルマシンで Ctrl-C を入力するのと等価です。引数が与えられなければ、全てのワーカーが中断されます。

interrupt(pids::AbstractVector=workers())

指定されたワーカーで現在実行中のタスクを中断します。これはローカルマシンで Ctrl-C を入力するのと等価です。引数が与えられなければ、全てのワーカーが中断されます。

Distributed.myid ── 関数
myid()

現在のプロセスの ID を取得します。

julia> myid()
1

julia> remotecall_fetch(() -> myid(), 4)
4
Distributed.pmap ── 関数
pmap(f,
     [::AbstractWorkerPool],
     c...;
     distributed=true,
     batch_size=1,
     on_error=nothing,
     retry_delays=[],
     retry_check=nothing) -> collection

コレクション c を変形します。利用可能なワーカーとタスクを使って c の各要素に f が適用されます。

c に複数のコレクションが渡されると、それぞれから要素を一つずつ取ったものが f の引数となります。

f は全てのワーカープロセスで利用可能である必要があることに注意が必要です。詳細はマニュアルのパッケージの読み込みとコードの可視性の章を参照してください。

ワーカープールが指定されないと、利用可能な全てのワーカー (デフォルトのワーカープール) が使われます。

デフォルトで pmap は指定された全てのワーカーに計算を分散します。ローカルプロセスだけを使ってタスクで計算を分散させるには distributed=false を指定してください。このときの振る舞いは asyncmap と等価であり、例えば pmap(f, c; distributed=false)asyncmap(f, c; ntasks=()->nworkers()) と同じです。

引数 batch_size を使うと、pmap でプロセスとタスクを混ぜて利用できます。batch_size を 1 より大きくすると、コレクションは複数のバッチで処理されます。各バッチは batch_size 以下の長さを持ち、利用可能なワーカーに単一のリクエストとして送られます。これに対してローカルな asyncmap では、各要素はバッチの一部として複数の並行なタスクを使って処理されます。

エラーが一つでも発生すると、pmap はコレクションの残りの部分の処理を止めます。この振る舞いを上書きするには、引数 on_error にエラー処理関数を指定してください。この関数はエラーを唯一の引数として受け取り、エラーを再度送出して処理を止めるか、何らかの値を返して処理を継続させるかを選択できます。後者では返した値が結果として呼び出し側に返ります。

on_error の使用例を二つ示します。一つ目では例外オブジェクトが返り値の配列に組み込まれ、二つ目では例外が発生した場所が 0 となっています:

julia> pmap(x->iseven(x) ? error("foo") : x, 1:4; on_error=identity)
4-element Array{Any,1}:
 1
  ErrorException("foo")
 3
  ErrorException("foo")

julia> pmap(x->iseven(x) ? error("foo") : x, 1:4; on_error=ex->0)
4-element Array{Int64,1}:
 1
 0
 3
 0

失敗した計算をやり直すことでエラーを処理することもできます。pmap のキーワード引数 retry_delaysretry_checkretry にキーワード引数 delays および check として渡されます。バッチを使っていてバッチ全体が失敗したときは、そのバッチに含まれる要素が全てやり直されます。

on_errorretry_delays が両方指定されると、やり直しの前に on_error が呼ばれます。on_error が例外を (再) 送出しなかった場合には、要素に対する処理のやり直しは行われません。

例: 次の呼び出しはエラーが起きた要素に対して f を最大三回やり直します。やり直しの間に遅延は挿入されません。

pmap(f, c; retry_delays = zeros(3))

例: 次の呼び出しは例外が InexactError でないときに限って f をやり直します。やり直しは指数的に長くなる遅延を使って最大三回行われます。InexactError が起こった場所には NaN が挿入されます。

pmap(f, c;
     on_error = e->(isa(e, InexactError) ? NaN : rethrow()),
     retry_delays = ExponentialBackOff(n = 3))
RemoteException(captured)

リモート計算で発生した例外をローカルでキャプチャして再度送出するときに使われる例外型です。RemoteException 型の値はワーカーの pid とキャプチャされた例外を包みます。CapturedException 型のフィールド captured は例外が送出された時点におけるリモートのコールスタックをシリアライズ可能な形にしたものと例外オブジェクトを保持します。

Distributed.Future ──
Future(w::Int, rrid::RRID, v::Union{Some, Nothing}=nothing)

Future は終了までの時間と終了状態が未知である単一の計算を表すプレースホルダーです。複数の計算に対しては RemoteChannel を使ってください。AbstractRemoteRef の特定には remoteref_id が利用できます。

RemoteChannel(pid::Integer=myid())

プロセス pid に存在する Channel{Any}(1) に対する参照を作成します。デフォルトの pid は現在のプロセスです。

RemoteChannel(f::Function, pid::Integer=myid())

指定されたサイズと型を持つリモートのチャンネルに対する参照を作成します。fpid を受け取って AbstractChannel の実装を返す関数でなければなりません。

例えば RemoteChannel(()->Channel{Int}(10), pid) は、pid に存在するサイズ 10 で Int 型のチャンネルに対する参照を返します。

Base.fetch ── メソッド
fetch(x::Future)

Future を待機し、その値を取得します。フェッチされた値はローカルでキャッシュされ、一度呼び出した後に同じ参照に対して fetch を呼び出すとキャッシュされた値が返ります。リモートが返した値が例外の場合は、リモートの例外とバックトレースを捕捉した RemoteException を送出します。

Base.fetch ── メソッド
fetch(c::RemoteChannel)

RemoteChannel を待機し、その値を取得します。リモートで発生した例外は Future と同じように処理されます。フェッチした要素を削除しません。

Distributed.remotecall ── メソッド
remotecall(f, id::Integer, args...; kwargs...) -> Future

関数 f を指定した引数を使って指定したプロセスで非同期に呼び出します。Future を返します。キーワード引数があれば f にそのまま渡されます。

Distributed.remotecall_wait ── メソッド
remotecall_wait(f, id::Integer, args...; kwargs...)

メッセージを一つだけ用いる高速な wait(remotecall(...)) を行います。関数はワーカー ID が idWorker で実行されます。キーワード引数があれば f にそのまま渡されます。

wait, remotecall も参照してください。

Distributed.remotecall_fetch ── メソッド
remotecall_fetch(f, id::Integer, args...; kwargs...)

メッセージを一つだけ使って fetch(remotecall(...)) を行います。キーワード引数があれば f にそのまま渡されます。リモートで発生した例外はキャプチャされて RemoteException として送出されます。

参照:fetch, remotecall

$ julia -p 2

julia> remotecall_fetch(sqrt, 2, 4)
2.0

julia> remotecall_fetch(sqrt, 2, -4)
ERROR: On worker 2:
DomainError with -4.0:
sqrt will only return a complex result if called with a complex argument. Try sqrt(Complex(x)).
...
Distributed.remote_do ── メソッド
remote_do(f, id::Integer, args...; kwargs...) -> nothing

ワーカー idf を非同期に実行します。remotecall と異なり、計算の結果は保存されず、終了を待つことはできません。

成功した remote_do の呼び出しは実行の要求がリモートノードで受け付けられたことを意味します。

同じワーカーに対する連続した remotecall は呼び出しの順番でシリアライズされますが、リモートワーカーにおける実行の順序は決まっていません。例えば remote_do(f1, 2); remotecall(f2, 2); remote_do(f3, 2) とすると f1, f2, f3 という順序で呼び出しがシリアライズされますが、ワーカー 2 で f1f3 より先に実行されることは保証されません。

f から送出された任意の例外はリモートワーカーの stderr に出力されます。

キーワード引数があれば f にそのまま渡されます。

Base.put! ── メソッド
put!(rr::RemoteChannel, args...)

RemoteChannel に渡された値を格納します。チャンネルが満杯なら、空間が空くまでブロックします。第一引数を返します。

Base.put! ── メソッド
put!(rr::Future, v)

Future 型の値 rr に値を格納します。Future は一度だけ書き込み可能なリモートリファレンスです。既に値を持つ Future に対して put! すると Exception が送出されます。全ての非同期なリモートコールは Future を返し、呼び出しの終了時にその Future に返り値が設定されます。

Base.take! ── メソッド
take!(rr::RemoteChannel, args...)

RemoteChannel 型の値 rr から値をフェッチします。フェッチした値は rr から削除されます。

Base.isready ── メソッド
isready(rr::RemoteChannel, args...)

RemoteChannel が値を保持しているかどうかを判定します。この関数に頼って処理を行うと競合状態が起こる可能性があることに注意してください。isready を呼び出したプロセスが返り値を受け取るときには条件が成り立たなくなっている場合があるためです。ただし Future は一度だけ値が設定されるので、Future に対しては安全に使用できます。

Base.isready ── メソッド
isready(rr::Future)

Future が値を保持しているかどうかを判定します。

引数の Future が他のノードによって所有されている場合、この関数の呼び出しは返り値が得られるまでブロックします。別のタスクで rr を待機するか、ローカルの Channel をプロキシとして使うことが推奨されます:

p = 1
f = Future(p)
@async put!(f, remotecall_fetch(long_computation, p))
isready(f)  # ブロックしない
AbstractWorkerPool

WorkerPoolCachingPool といったワーカープールを表す上位型です。AbstractWorkerPool が実装すべきメソッドは次の通りです:

  • push!: overall プール (利用可能なワーカーおよび実行中のワーカーが入るプール) に新しいワーカーを追加します。
  • put!: ワーカーを available プールに入れます。
  • take!: available プールから (リモートコールのために) ワーカーを取得します。
  • length: overall プールにある利用可能なワーカーの個数を返します。
  • isready: プールに対する take! がブロックするなら false を、ブロックしないなら true を返します。

これらのメソッドの (AbstractWorkerPool に対する) デフォルト実装は次のフィールドを必要とします:

  • channel::Channel{Int}
  • workers::Set{Int}

channel には利用可能なワーカーのプロセス ID が格納されます。workers はこのプールに関連付いた全てのワーカーのプロセス ID の集合です。

WorkerPool(workers::Vector{Int})

ワーカー ID のベクトルから WorkerPool を作成します。

$ julia -p 3

julia> WorkerPool([2, 3])
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:2), Set([2, 3]), RemoteChannel{Channel{Any}}(1, 1, 6))
CachingPool(workers::Vector{Int})

AbstractWorkerPool の実装です。remote, remotecall_fetch, pmap (および関数をリモートで実行する他のリモートコール) では、CachingPool 型のワーカープールを使ってワーカーノードにある関数 (特に大きなデータをキャプチャする可能性のあるクロージャ) をシリアライズ/デシリアライズした形で保持することで効率を向上できます。

リモートキャッシュは返される CachingPool オブジェクトと同じ寿命を持ちます。それよりも早くキャッシュを削除するには clear!(pool) を使ってください。

クロージャでグローバル変数をキャプチャするとき、キャプチャされるのは束縛だけであり、データはキャプチャされません。グローバル変数のデータもキャプチャするには let ブロックを使ってください。

const foo = rand(10^8);
wp = CachingPool(workers())
let foo = foo
    pmap(wp, i -> sum(foo) + i, 1:100);
end

このコードは foo を各ノードに一度だけ送信します。

default_worker_pool()

アイドル状態の worker を保持する WorkerPool です。remote(f)pmap がデフォルトで利用します。

$ julia -p 3

julia> default_worker_pool()
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:3), Set([4, 2, 3]), RemoteChannel{Channel{Any}}(1, 1, 4))
Distributed.clear! ── メソッド
clear!(pool::CachingPool) -> pool

pool に含まれるワーカーからキャッシュされた関数を全て削除します。

Distributed.remote ── 関数
remote([p::AbstractWorkerPool], f) -> Function

remotecall_fetch を使って f を利用可能なワーカーで実行する無名関数を返します。WorkerPool 型の値 p が与えられれば、f はそこから取ってきたワーカーで実行されます。

Distributed.remotecall ── メソッド
remotecall(f, pool::AbstractWorkerPool, args...; kwargs...) -> Future

WorkerPool を受け取るバージョンの remotecall(f, pid, ....) です。pool でワーカーが利用可能になるのを待ち、利用可能になったらワーカーを取得して remotecall を実行します。

$ julia -p 3

julia> wp = WorkerPool([2, 3]);

julia> A = rand(3000);

julia> f = remotecall(maximum, wp, A)
Future(2, 1, 6, nothing)

この例では、ID 1 のプロセスが呼び出したタスクが ID 2 のプロセスで実行されます。

Distributed.remotecall_wait ── メソッド
remotecall_wait(f, pool::AbstractWorkerPool, args...; kwargs...) -> Future

WorkerPool を受け取るバージョンの remotecall_wait(f, pid, ....) です。pool でワーカーが利用可能になるのを待ち、利用可能になったらワーカーを取得して remotecall_wait を実行します。

$ julia -p 3

julia> wp = WorkerPool([2, 3]);

julia> A = rand(3000);

julia> f = remotecall_wait(maximum, wp, A)
Future(3, 1, 9, nothing)

julia> fetch(f)
0.9995177101692958
Distributed.remotecall_fetch ── メソッド
remotecall_fetch(f, pool::AbstractWorkerPool, args...; kwargs...) -> result

WorkerPool を受け取るバージョンの remotecall_fetch(f, pid, ....) です。pool でワーカーが利用可能になるのを待ち、利用可能になったらワーカーを取得して remotecall_wait を実行します。

$ julia -p 3

julia> wp = WorkerPool([2, 3]);

julia> A = rand(3000);

julia> remotecall_fetch(maximum, wp, A)
0.9995177101692958
Distributed.remote_do ── メソッド
remote_do(f, pool::AbstractWorkerPool, args...; kwargs...) -> nothing

WorkerPool を受け取るバージョンの remote_do(f, pid, ....) です。pool でワーカーが利用可能になるのを待ち、利用可能になったらワーカーを取得して remotecall_do を実行します。

[email protected] ── マクロ
@spawnat p expr

式を包むクロージャを作成し、それをプロセス p で非同期に実行します。返り値に対する Future を返します。p がクオートされたリテラルシンボル :any なら、expr を実行するプロセスはシステムによって自動的に選択されます。

julia> addprocs(3);

julia> f = @spawnat 2 myid()
Future(2, 1, 3, nothing)

julia> fetch(f)
2

julia> f = @spawnat :any myid()
Future(3, 1, 7, nothing)

julia> fetch(f)
3
Julia 1.3

引数 :any は Julia 1.3 以降でサポートされます。

[email protected] ── マクロ
@fetch expr

fetch(@spawnat :any expr) と等価です。fetch@spawnat を参照してください。

julia> addprocs(3);

julia> @fetch myid()
2

julia> @fetch myid()
3

julia> @fetch myid()
4

julia> @fetch myid()
2
[email protected] ── マクロ
@fetchfrom

fetch(@spawnat p expr) と等価です。fetch@spawnat を参照してください。

julia> addprocs(3);

julia> @fetchfrom 2 myid()
2

julia> @fetchfrom 4 myid()
4
[email protected] ── マクロ
@distributed

分散メモリを使った並列 for ループです。次の形式で使います:

@distributed [reducer] for var = range
    body
end

指定された区間 range が分割されて各ワーカーに割り当てられ、本体 body の実行はワーカーでローカルに行われます。省略可能な縮約関数 reducer を指定すると、@distributed はローカルな縮約を各ワーカーで実行し、それから呼び出したプロセスで最後の縮約を実行します。

縮約関数を使用しないと、@distributed は非同期に実行されます。つまり利用可能な全てのワーカーで独立したタスクが起動し、終了を待つことなくマクロはすぐに返ります。タスクの終了を待つには、次のように @sync を最初に付けてください:

@sync @distributed for var = range
    body
end
[email protected] ── マクロ
@everywhere [procs()] expr

procs に含まれる全てのプロセスで式 exprMain の下で実行します。任意のプロセスで起きたエラーは収集され、単一の CompositeException として送出されます。例えば

@everywhere bar = 1

Main.bar を現在起動している全てのプロセスで定義します。後から (addprocs() などで) 追加されるプロセスでは Main.bar は定義されません。

@spawnat とは異なり、@everywhere はローカル変数を一切キャプチャしません。その代わり、ローカル変数は補間を使ってブロードキャストできます:

foo = 1
@everywhere bar = $foo

省略可能な引数 procs を使うと、式を実行するプロセスをプロセス全体の部分集合にできます。

remotecall_eval(Main, procs, expr) の呼び出しと等価です。

Distributed.clear! ── メソッド
clear!(syms, pids=workers(); mod=Main)

モジュールに含まれるグローバル束縛を削除します。削除は束縛を nothing に更新することで行われます。symsSymbol 型の値もしくは Symbol のコレクションである必要があります。pidmod は再初期化するグローバル変数が存在するプロセスとモジュールを表します。mod で定義されていることが確認できた名前だけが削除されます。

グローバル定数の削除が要求された場合には例外が送出されます。

Distributed.remoteref_id ── 関数
remoteref_id(r::AbstractRemoteRef) -> RRID

FutureRemoteChannel は次のフィールドで識別されます:

  • where: リモートリファレンスが指すオブジェクト/ストレージが実際に存在するノードです。

  • whence: リモートリファレンスが作成されたノードです。これはリモートリファレンスが指すオブジェクトが実際に存在するノードと異なる概念であることに注意してください。例えばマスタープロセスで RemoteChannel(2) とすると、where2 であるのに対して whence1 となります。

  • id: リモートリファレンスの識別子です。識別子 whence を持つワーカーで作成されるリモートリファレンスは全て異なる id を持ちます。

まとめると、whenceid があれば全てのワーカーが持つリモートリファレンスの中から一つをユニークに識別できます。

remoteref_id はリモートリファレンスの whenceid を包んだ RRID オブジェクトを返す低水準 API です。

channel_from_id(id) -> c

remoteref_id が返す id を受け取って、対応する AbstractChannel を返す低水準 API です。この関数は対応するチャンネルが存在するノードでのみ呼び出せます。

worker_id_from_socket(s) -> pid

IO 接続または Worker を受け取り、それが接続しているワーカーの pid を返す低水準 API です。これは何らかの型に対して独自の serialize メソッドを書くときに、データを受け取るプロセス ID に応じて書き込むデータを最適化する場合に有用です。

cluster_cookie() -> cookie

クラスタークッキーを返します。

cluster_cookie(cookie) -> cookie

与えられたクッキーをクラスタークッキーに設定します。cookie を返します。

ClusterManager インターフェース

このインターフェースは様々なクラスター環境で Julia ワーカーを起動・管理する仕組みを提供します。Base には二種類のクラスターマネージャがあります: 同じホストで追加のワーカーを起動する LocalManager と、SSH を使ってリモートホストでワーカーを起動する SSHManager です。プロセス間の接続とメッセージのトランスポートには TCP/IP ソケットが使われますが、クラスターマネージャは異なるトランスポートを提供することもできます。

ClusterManager

クラスターマネージャを表す上位型です。ワーカープロセスをクラスターとして管理します。クラスターマネージャはワーカーの追加・削除・通信の方式を実装します。SSHManagerLocalManagerClusterManager の部分型です。

WorkerConfig

クラスターに追加されるワーカーを制御するために ClusterManager が使う型です。いくつかのフィールドはホストにアクセスするために全てのクラスターマネージャによって利用されます:

  • io: ワーカーにアクセスするときに使う接続 (IO もしくは Nothing)
  • host: ホストのアドレス (AbstractString もしくは Nothing)
  • port: ワーカーに接続するときに使うホストのポート番号 (Int もしくは Nothing)

いくつかのフィールドはクラスターマネージャが初期化済みのホストにワーカーを追加するときに利用されます:

  • count: ホストで起動するワーカーの個数
  • exename: ホストにおける Julia 実行形式のパス (デフォルトは "$(Sys.BINDIR)/julia" もしくは "$(Sys.BINDIR)/julia-debug")
  • exeflags: Julia をリモートで起動するときに使うフラグ

userdate フィールドは外部マネージャが各ワーカーの情報を格納するときに利用されます。

いくつかのフィールドは SSHManager およびそれに似たマネージャによって利用されます:

  • tunnel: true (トンネリングを行う)・false (トンネリングを行わない)・nothing (マネージャのデフォルトを使う) のいずれか
  • multiplex: true (SSH トンネリングで多重化を使う) もしくは false (使わない)
  • forward: SSH の -L オプションで使われるフォワーディングオプション
  • bind_addr: バインドを行うリモートホストのアドレス
  • sshflags: SSH 接続を確立するときに使うフラグ
  • max_parallel: ホストに並列に接続できるワーカーの個数の最大値

いくつかのフィールドは LocalManagerSSHManager の両方によって利用されます:

  • connect_at: セットアップされる通信がワーカー-ワーカー間とドライバ-ワーカー間のどちらか
  • process: 接続されるプロセス (通常は addprocs 中にマネージャが値を代入する)
  • ospid: ホスト OS におけるプロセス ID (ワーカープロセスを停止するときに使われる)
  • environ: LocalManager/SSHManager が一時的な情報を格納するのに使うプライベートな辞書
  • ident: ClusterManager がワーカーの識別に使う ID
  • connect_idents: 独自のトポロジーを使っているとき、このワーカーが接続しなければならないワーカーの ID のリスト
  • enable_threaded_blas: ワーカーでスレッド化された BLAS を使うかどうか (true, false, nothing のいずれか)
Distributed.launch ── 関数
launch(manager::ClusterManager,
       params::Dict,
       launched::Array,
       launch_ntfy::Condition)

クラスターマネージャによって実装されます。クラスターマネージャはこの関数で起動した Julia ワーカーのそれぞれに対して WorkerConfiglaunched に追加し、ワーカーが全て起動したら launch_ntfynotify するべきです。この関数は必ず、マネージャが要求したワーカーの起動が全て完了してから返る必要があります。paramsaddprocs を呼び出すときに使われたキーワード引数を全て納めた辞書です。

Distributed.manage ── 関数
manage(manager::ClusterManager, id::Integer, config::WorkerConfig. op::Symbol)

クラスターマネージャによって実装されます。ワーカーでライフタイムに関するイベントが起こるたびにマスタープロセスで呼び出され、そのとき op が何が起こったかを示します:

  • :register/:deregister: ワーカーが Julia のワーカープールから追加/削除されたことを示します。
  • :interrupt: interrupt(workers) が呼ばれたことを示します。ClusterManager は適切なワーカーに停止シグナルを送るべきです。
  • :finalize: クリーンアップ用途で使われます。
Base.kill ── メソッド
kill(manager::ClusterManager, pid::Int, config::WorkerConfig)

クラスターマネージャによって実装されます。rmprocs を実行したときにマスタープロセスで呼び出されます。この関数は pid で指定されるリモートワーカーを終了させるべきです。kill(manager::ClusterManager.....)pid 上でリモートに exit() を実行します。

Sockets.connect ── メソッド
connect(manager::ClusterManager,
        pid::Int,
        config::WorkerConfig) -> (instrm::IO, outstrm::IO)

独自のトランスポートを使うクラスターマネージャで実装されます。この関数は config が指定する論理接続を ID が pid のワーカーに対して確立し、IO オブジェクトの組を返すべきです。pid から現在のプロセスへのメッセージは instrm から読めるようになり、pid へのメッセージは outstrm に書き込まれます。独自のトランスポート実装はメッセージの送受信が完全な形で順序を保って行われることを保証する必要があります。connect(manager::ClusterManager.....) は TCP/IP ソケット接続をワーカー間にセットアップします。

Distributed.init_worker ── 関数
init_worker(cookie::AbstractString, manager::ClusterManager=DefaultClusterManager())

独自のトランスポートを実装するクラスターマネージャによって呼ばれます。この関数は新しく起動されたプロセスをワーカーとして初期化します。コマンドライン引数 --worker[=<cookie>] を指定すると、プロセスはトランスポートに TCP/IP ソケットを使うワーカーとして初期化されます。cookiecluster_cookie を表します。

Distributed.start_worker ── 関数
start_worker([out::IO=stdout],
             cookie::AbstractString=readline(stdin);
             close_stdin::Bool=true,
             stderr_to_stdout::Bool=true)

start_worker は内部で使われる関数であり、TCP/IP で接続するワーカープロセスのデフォルトのエントリーポイントです。Julia クラスターワーカーとしてプロセスをセットアップします。

host:port の情報がストリーム out (デフォルトは stdout) に書き込まれます。

この関数は必要ならクッキーを stdin から読み、空いているポート (コマンドライン引数 --bind-to が指定されればそのポート) にリッスンし、内向きの TCP 接続とリクエストを処理するタスクをスケジュールします。さらに stdin を閉じ、stderrstdout にリダイレクトします (どちらも省略可能)。

この関数は返りません。

process_messages(r_stream::IO, w_stream::IO, incoming::Bool=true)

独自のトランスポートを使うクラスターマネージャによって呼ばれます。この関数は独自のトランスポート実装がリモートワーカーからの最初のメッセージを受信したときに呼ばれるべきです。独自のトランスポートはリモートワーカーへの論理接続を管理し、二つの IO オブジェクトを提供しなければなりません。IO オブジェクトの一つは内向きのメッセージのためで、もう一つはリモートワーカーに向かうメッセージのためです。incomingtrue だと、リモートピアが接続を開始したことを表します。接続を開始した方が認証ハンドシェイクを実行するためのクラスタークッキーと Julia のバージョン番号を送信します。

cluster_cookie も参照してください。