Distributed
Distributed.addprocs
── 関数
addprocs(manager::ClusterManager; kwargs...) -> プロセス ID のリスト
指定したクラスターマネージャを通してワーカープロセスを起動します。
例えば Beowulf クラスターは ClusterManagers.jl
が実装する独自のクラスターマネージャを通してサポートされます。
新しく起動したワーカーがマスターからの接続確立を待機する秒数は、ワーカープロセスの環境変数 JULIA_WORKER_TIMEOUT
で指定できます。この値は TCP/IP トランスポートを利用するときにだけ意味を持ちます。
REPL をブロックせずにワーカーを起動するには、addprocs
を個別のタスクとして実行してください。ワーカーをプログラム的に起動する関数でも同様です。
例
# 忙しいクラスターでは、addprocs を非同期的に呼び出す。
t = @async addprocs(...)
# オンラインになったワーカーを利用する。
if nprocs() > 1 # 少なくとも一つの新しいワーカーが利用可能なことを確認する。
.... # 分散計算を行う
end
# 新しく起動したワーカーの ID またはエラーメッセージを取得する。
if istaskdone(t) # fetch がブロックしないように addprocs が完了したことを確認する。
if nworkers() == N
new_pids = fetch(t)
else
fetch(t)
end
end
addprocs(machines;
tunnel=false,
sshflags=``,
max_parallel=10,
kwargs...) -> プロセス ID のリスト
SSH を通してリモートマシン上にプロセスを追加します。各ノードに julia
が必要であり、同じ場所にインストールされているか、共有ファイルシステムで利用可能でなければなりません。
machines
はマシン仕様 (machine specification) のベクトルです。ワーカーはマシン仕様ごとに起動されます。
マシン仕様は文字列 machine_spec
またはタプル (machine_spec, count)
で表されます。
machine_spec
は [user@]host[:port] [bind_addr[:port]]
という形の文字列です。デフォルトでは user
は現在のユーザー、port
は標準の SSH ポートとなります。[bind_addr[:port]]
が指定されると、このワーカーに対する他のワーカーからの接続は指定された bind_addr
と port
を使うようになります。
count
は指定されたホストで起動されるワーカーの個数です。:auto
を指定すると、そのホストにおける CPU スレッドと同じ個数のワーカーが起動します。
キーワード引数
-
tunnel
:true
だと、マスタープロセスからワーカーへの接続で SSH トンネリングが使われます。デフォルトはfalse
です。 -
multiplex
:true
だと、SSH トンネリングで SSH 多重化が使われます。デフォルトはfalse
です。 -
sshflags
: SSH の追加オプションを指定します。例えばsshflags=`-i /home/foo/bar.pem`
などとします。 -
max_parallel
: ホストへ同時に接続できるワーカーの個数の上限です。デフォルトは10
です。 -
dir
: ワーカーのワーキングディレクトリを指定します。デフォルトはホストのカレントディレクトリ (pwd()
の返り値) です。 -
enable_threaded_blas
:true
だと、追加されたプロセスで BLAS がマルチスレッドで実行されるようになります。デフォルトはfalse
です。 -
exename
:julia
実行形式の名前です。デフォルトは場合に応じて"$(Sys.BINDIR)/julia"
または"$(Sys.BINDIR)/julia-debug"
です。 -
exeflags
: ワーカープロセスに渡される追加のフラグです。 -
topology
: ワーカー同士がどのように接続するかを指定します。接続されていないワーカー間でメッセージを送信するとエラーとなります。-
topology=:all_to_all
: 全てのプロセスが互いに接続します。デフォルトです。 -
topology=:master_worker
:pid
が 1 のドライバプロセスだけがワーカーと接続し、ワーカー同士は接続しません。 -
topology=:custom
: クラスターマネージャのlaunch
メソッドがWorkerConfig
のident
とconnect_idents
のフィールドを通じて接続トポロジーを指定します。ident
というクラスターマネージャ ID を持つワーカーはconnect_idents
に含まれる全てのワーカーと接続します。
-
-
lazy
:topology=:all_to_all
のときにだけ意味を持ちます。true
だと、ワーカー同士の接続確立が遅延されます。つまりワーカー間で初めてリモートコールが起こったときに接続がセットアップされます。デフォルトはtrue
です。
環境変数
マスタープロセスが新しく起動したワーカーとの接続を 60.0 秒が経過するまでの間に確立できなかった場合、ワーカーはそれを致命的な状況と扱い、終了します。このタイムアウトは環境変数 JULIA_WORKER_TIMEOUT
で制御でき、マスタープロセスにおける JULIA_WORKER_TIMEOUT
の値が新しいワーカーが接続確立を待機する秒数を指定します。
addprocs(; kwargs...) -> プロセス ID のリスト
addprocs(Sys.CPU_THREADS; kwargs...)
と等価です。
ワーカーはスタートアップスクリプト .julia/config/startup.jl
を実行しないことに注意してください。またグローバルな状態 (グローバル変数・新しいメソッドの定義・読み込まれたモジュールなど) を実行中の他プロセスと同期することもありません。
addprocs(np::Integer; restrict=true, kwargs...) -> プロセス ID のリスト
組み込みの LocalManager
を使ってワーカーを起動します。このマネージャはローカルホストでのみワーカーを起動するので、マルチコアを活用できます。例えば addprocs(4)
はローカルマシンに四つのプロセスを追加します。restrict
が true
だとバインド先が 127.0.0.1
に制限されます。キーワード引数 dir
, exename
, exeflags
, topology
, lazy
, enable_threaded_blas
の効果は addprocs
のドキュメントにある通りです。
Distributed.nprocs
── 関数
nprocs()
利用可能なプロセスの個数を返します。
例
julia> nprocs()
3
julia> workers()
5-element Array{Int64,1}:
2
3
Distributed.nworkers
── 関数
Distributed.procs
── メソッド
Distributed.procs
── メソッド
procs(pid::Integer)
同じ物理ノード上にある全てのプロセス ID のリストを返します。正確に言うと、pid
と同じ IP アドレスにバインドされた全てのワーカーが返ります。
Distributed.workers
── 関数
workers()
全てのワーカープロセス ID のリストを返します。
例
$ julia -p 5
julia> workers()
2-element Array{Int64,1}:
2
3
Distributed.rmprocs
── 関数
rmprocs(pids...; waitfor=typemax(Int))
指定されたワーカーを削除します。ワーカーを追加/削除できるのはプロセス 1 だけであることに注意してください。
キーワード引数 waitfor
はワーカーがシャットダウンするまで待つ秒数を指定します。
waitfor
を指定しないと、rmprocs
は要求されたpids
が全て削除されるまで待ちます。- 要求した
waitfor
秒までの間に全てのワーカーが終了できないと、ErrorException
が送出されます。 -
waitfor
が0
だとrmprocs
はすぐに返り、違うタスクでワーカーの削除がスケジュールされます。スケジュールされたTask
オブジェクトが返ります。ユーザーは他の並列な関数を呼び出す前にrmprocs
が返すタスクに対してwait
を行うべきです。
例
$ julia -p 5
julia> t = rmprocs(2, 3, waitfor=0)
Task (runnable) @0x0000000107c718d0
julia> wait(t)
julia> workers()
3-element Array{Int64,1}:
4
5
6
Distributed.interrupt
── 関数
interrupt(pids::Integer...)
指定されたワーカーで現在実行中のタスクを中断します。これはローカルマシンで Ctrl-C を入力するのと等価です。引数が与えられなければ、全てのワーカーが中断されます。
interrupt(pids::AbstractVector=workers())
指定されたワーカーで現在実行中のタスクを中断します。これはローカルマシンで Ctrl-C を入力するのと等価です。引数が与えられなければ、全てのワーカーが中断されます。
Distributed.myid
── 関数
Distributed.pmap
── 関数
pmap(f,
[::AbstractWorkerPool],
c...;
distributed=true,
batch_size=1,
on_error=nothing,
retry_delays=[],
retry_check=nothing) -> collection
コレクション c
を変形します。利用可能なワーカーとタスクを使って c
の各要素に f
が適用されます。
c
に複数のコレクションが渡されると、それぞれから要素を一つずつ取ったものが f
の引数となります。
f
は全てのワーカープロセスで利用可能である必要があることに注意が必要です。詳細はマニュアルのパッケージの読み込みとコードの可視性の章を参照してください。
ワーカープールが指定されないと、利用可能な全てのワーカー (デフォルトのワーカープール) が使われます。
デフォルトで pmap
は指定された全てのワーカーに計算を分散します。ローカルプロセスだけを使ってタスクで計算を分散させるには distributed=false
を指定してください。このときの振る舞いは asyncmap
と等価であり、例えば pmap(f, c; distributed=false)
は asyncmap(f, c; ntasks=()->nworkers())
と同じです。
引数 batch_size
を使うと、pmap
でプロセスとタスクを混ぜて利用できます。batch_size
を 1 より大きくすると、コレクションは複数のバッチで処理されます。各バッチは batch_size
以下の長さを持ち、利用可能なワーカーに単一のリクエストとして送られます。これに対してローカルな asyncmap
では、各要素はバッチの一部として複数の並行なタスクを使って処理されます。
エラーが一つでも発生すると、pmap
はコレクションの残りの部分の処理を止めます。この振る舞いを上書きするには、引数 on_error
にエラー処理関数を指定してください。この関数はエラーを唯一の引数として受け取り、エラーを再度送出して処理を止めるか、何らかの値を返して処理を継続させるかを選択できます。後者では返した値が結果として呼び出し側に返ります。
on_error
の使用例を二つ示します。一つ目では例外オブジェクトが返り値の配列に組み込まれ、二つ目では例外が発生した場所が 0
となっています:
julia> pmap(x->iseven(x) ? error("foo") : x, 1:4; on_error=identity)
4-element Array{Any,1}:
1
ErrorException("foo")
3
ErrorException("foo")
julia> pmap(x->iseven(x) ? error("foo") : x, 1:4; on_error=ex->0)
4-element Array{Int64,1}:
1
0
3
0
失敗した計算をやり直すことでエラーを処理することもできます。pmap
のキーワード引数 retry_delays
と retry_check
が retry
にキーワード引数 delays
および check
として渡されます。バッチを使っていてバッチ全体が失敗したときは、そのバッチに含まれる要素が全てやり直されます。
on_error
と retry_delays
が両方指定されると、やり直しの前に on_error
が呼ばれます。on_error
が例外を (再) 送出しなかった場合には、要素に対する処理のやり直しは行われません。
例: 次の呼び出しはエラーが起きた要素に対して f
を最大三回やり直します。やり直しの間に遅延は挿入されません。
pmap(f, c; retry_delays = zeros(3))
例: 次の呼び出しは例外が InexactError
でないときに限って f
をやり直します。やり直しは指数的に長くなる遅延を使って最大三回行われます。InexactError
が起こった場所には NaN
が挿入されます。
pmap(f, c;
on_error = e->(isa(e, InexactError) ? NaN : rethrow()),
retry_delays = ExponentialBackOff(n = 3))
Distributed.RemoteException
── 型
RemoteException(captured)
リモート計算で発生した例外をローカルでキャプチャして再度送出するときに使われる例外型です。RemoteException
型の値はワーカーの pid
とキャプチャされた例外を包みます。CapturedException
型のフィールド captured
は例外が送出された時点におけるリモートのコールスタックをシリアライズ可能な形にしたものと例外オブジェクトを保持します。
Distributed.Future
── 型
Future(w::Int, rrid::RRID, v::Union{Some, Nothing}=nothing)
Future
は終了までの時間と終了状態が未知である単一の計算を表すプレースホルダーです。複数の計算に対しては RemoteChannel
を使ってください。AbstractRemoteRef
の特定には remoteref_id
が利用できます。
Distributed.RemoteChannel
── 型
RemoteChannel(pid::Integer=myid())
プロセス pid
に存在する Channel{Any}(1)
に対する参照を作成します。デフォルトの pid
は現在のプロセスです。
RemoteChannel(f::Function, pid::Integer=myid())
指定されたサイズと型を持つリモートのチャンネルに対する参照を作成します。f
は pid
を受け取って AbstractChannel
の実装を返す関数でなければなりません。
例えば RemoteChannel(()->Channel{Int}(10), pid)
は、pid
に存在するサイズ 10 で Int
型のチャンネルに対する参照を返します。
Base.fetch
── メソッド
fetch(x::Future)
Future
を待機し、その値を取得します。フェッチされた値はローカルでキャッシュされ、一度呼び出した後に同じ参照に対して fetch
を呼び出すとキャッシュされた値が返ります。リモートが返した値が例外の場合は、リモートの例外とバックトレースを捕捉した RemoteException
を送出します。
Base.fetch
── メソッド
fetch(c::RemoteChannel)
RemoteChannel
を待機し、その値を取得します。リモートで発生した例外は Future
と同じように処理されます。フェッチした要素を削除しません。
Distributed.remotecall
── メソッド
Distributed.remotecall_wait
── メソッド
remotecall_wait(f, id::Integer, args...; kwargs...)
メッセージを一つだけ用いる高速な wait(remotecall(...))
を行います。関数はワーカー ID が id
の Worker
で実行されます。キーワード引数があれば f
にそのまま渡されます。
wait
, remotecall
も参照してください。
Distributed.remotecall_fetch
── メソッド
remotecall_fetch(f, id::Integer, args...; kwargs...)
メッセージを一つだけ使って fetch(remotecall(...))
を行います。キーワード引数があれば f
にそのまま渡されます。リモートで発生した例外はキャプチャされて RemoteException
として送出されます。
参照:fetch
, remotecall
例
$ julia -p 2
julia> remotecall_fetch(sqrt, 2, 4)
2.0
julia> remotecall_fetch(sqrt, 2, -4)
ERROR: On worker 2:
DomainError with -4.0:
sqrt will only return a complex result if called with a complex argument. Try sqrt(Complex(x)).
...
Distributed.remote_do
── メソッド
remote_do(f, id::Integer, args...; kwargs...) -> nothing
ワーカー id
で f
を非同期に実行します。remotecall
と異なり、計算の結果は保存されず、終了を待つことはできません。
成功した remote_do
の呼び出しは実行の要求がリモートノードで受け付けられたことを意味します。
同じワーカーに対する連続した remotecall
は呼び出しの順番でシリアライズされますが、リモートワーカーにおける実行の順序は決まっていません。例えば remote_do(f1, 2); remotecall(f2, 2); remote_do(f3, 2)
とすると f1
, f2
, f3
という順序で呼び出しがシリアライズされますが、ワーカー 2 で f1
が f3
より先に実行されることは保証されません。
f
から送出された任意の例外はリモートワーカーの stderr
に出力されます。
キーワード引数があれば f
にそのまま渡されます。
Base.put!
── メソッド
put!(rr::RemoteChannel, args...)
RemoteChannel
に渡された値を格納します。チャンネルが満杯なら、空間が空くまでブロックします。第一引数を返します。
Base.put!
── メソッド
Base.take!
── メソッド
Base.isready
── メソッド
isready(rr::RemoteChannel, args...)
RemoteChannel
が値を保持しているかどうかを判定します。この関数に頼って処理を行うと競合状態が起こる可能性があることに注意してください。isready
を呼び出したプロセスが返り値を受け取るときには条件が成り立たなくなっている場合があるためです。ただし Future
は一度だけ値が設定されるので、Future
に対しては安全に使用できます。
Base.isready
── メソッド
Distributed.AbstractWorkerPool
── 型
AbstractWorkerPool
WorkerPool
や CachingPool
といったワーカープールを表す上位型です。AbstractWorkerPool
が実装すべきメソッドは次の通りです:
-
push!
: overall プール (利用可能なワーカーおよび実行中のワーカーが入るプール) に新しいワーカーを追加します。 -
put!
: ワーカーを available プールに入れます。 -
take!
: available プールから (リモートコールのために) ワーカーを取得します。 -
length
: overall プールにある利用可能なワーカーの個数を返します。 -
isready
: プールに対するtake!
がブロックするならfalse
を、ブロックしないならtrue
を返します。
これらのメソッドの (AbstractWorkerPool
に対する) デフォルト実装は次のフィールドを必要とします:
channel::Channel{Int}
workers::Set{Int}
channel
には利用可能なワーカーのプロセス ID が格納されます。workers
はこのプールに関連付いた全てのワーカーのプロセス ID の集合です。
Distributed.WorkerPool
── 型
WorkerPool(workers::Vector{Int})
ワーカー ID のベクトルから WorkerPool
を作成します。
例
$ julia -p 3
julia> WorkerPool([2, 3])
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:2), Set([2, 3]), RemoteChannel{Channel{Any}}(1, 1, 6))
Distributed.CachingPool
── 型
CachingPool(workers::Vector{Int})
AbstractWorkerPool
の実装です。remote
, remotecall_fetch
, pmap
(および関数をリモートで実行する他のリモートコール) では、CachingPool
型のワーカープールを使ってワーカーノードにある関数 (特に大きなデータをキャプチャする可能性のあるクロージャ) をシリアライズ/デシリアライズした形で保持することで効率を向上できます。
リモートキャッシュは返される CachingPool
オブジェクトと同じ寿命を持ちます。それよりも早くキャッシュを削除するには clear!(pool)
を使ってください。
クロージャでグローバル変数をキャプチャするとき、キャプチャされるのは束縛だけであり、データはキャプチャされません。グローバル変数のデータもキャプチャするには let
ブロックを使ってください。
例
const foo = rand(10^8);
wp = CachingPool(workers())
let foo = foo
pmap(wp, i -> sum(foo) + i, 1:100);
end
このコードは foo
を各ノードに一度だけ送信します。
Distributed.default_worker_pool
── 関数
default_worker_pool()
アイドル状態の worker
を保持する WorkerPool
です。remote(f)
と pmap
がデフォルトで利用します。
例
$ julia -p 3
julia> default_worker_pool()
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:3), Set([4, 2, 3]), RemoteChannel{Channel{Any}}(1, 1, 4))
Distributed.clear!
── メソッド
Distributed.remote
── 関数
remote([p::AbstractWorkerPool], f) -> Function
remotecall_fetch
を使って f
を利用可能なワーカーで実行する無名関数を返します。WorkerPool
型の値 p
が与えられれば、f
はそこから取ってきたワーカーで実行されます。
Distributed.remotecall
── メソッド
remotecall(f, pool::AbstractWorkerPool, args...; kwargs...) -> Future
WorkerPool
を受け取るバージョンの remotecall(f, pid, ....)
です。pool
でワーカーが利用可能になるのを待ち、利用可能になったらワーカーを取得して remotecall
を実行します。
例
$ julia -p 3
julia> wp = WorkerPool([2, 3]);
julia> A = rand(3000);
julia> f = remotecall(maximum, wp, A)
Future(2, 1, 6, nothing)
この例では、ID 1 のプロセスが呼び出したタスクが ID 2 のプロセスで実行されます。
Distributed.remotecall_wait
── メソッド
remotecall_wait(f, pool::AbstractWorkerPool, args...; kwargs...) -> Future
WorkerPool
を受け取るバージョンの remotecall_wait(f, pid, ....)
です。pool
でワーカーが利用可能になるのを待ち、利用可能になったらワーカーを取得して remotecall_wait
を実行します。
例
$ julia -p 3
julia> wp = WorkerPool([2, 3]);
julia> A = rand(3000);
julia> f = remotecall_wait(maximum, wp, A)
Future(3, 1, 9, nothing)
julia> fetch(f)
0.9995177101692958
Distributed.remotecall_fetch
── メソッド
remotecall_fetch(f, pool::AbstractWorkerPool, args...; kwargs...) -> result
WorkerPool
を受け取るバージョンの remotecall_fetch(f, pid, ....)
です。pool
でワーカーが利用可能になるのを待ち、利用可能になったらワーカーを取得して remotecall_wait
を実行します。
例
$ julia -p 3
julia> wp = WorkerPool([2, 3]);
julia> A = rand(3000);
julia> remotecall_fetch(maximum, wp, A)
0.9995177101692958
Distributed.remote_do
── メソッド
remote_do(f, pool::AbstractWorkerPool, args...; kwargs...) -> nothing
WorkerPool
を受け取るバージョンの remote_do(f, pid, ....)
です。pool
でワーカーが利用可能になるのを待ち、利用可能になったらワーカーを取得して remotecall_do
を実行します。
Distributed.@spawnat
── マクロ
@spawnat p expr
式を包むクロージャを作成し、それをプロセス p
で非同期に実行します。返り値に対する Future
を返します。p
がクオートされたリテラルシンボル :any
なら、expr
を実行するプロセスはシステムによって自動的に選択されます。
例
julia> addprocs(3);
julia> f = @spawnat 2 myid()
Future(2, 1, 3, nothing)
julia> fetch(f)
2
julia> f = @spawnat :any myid()
Future(3, 1, 7, nothing)
julia> fetch(f)
3
引数 :any
は Julia 1.3 以降でサポートされます。
Distributed.@fetch
── マクロ
Distributed.@fetchfrom
── マクロ
Distributed.@distributed
── マクロ
@distributed
分散メモリを使った並列 for ループです。次の形式で使います:
@distributed [reducer] for var = range
body
end
指定された区間 range
が分割されて各ワーカーに割り当てられ、本体 body
の実行はワーカーでローカルに行われます。省略可能な縮約関数 reducer
を指定すると、@distributed
はローカルな縮約を各ワーカーで実行し、それから呼び出したプロセスで最後の縮約を実行します。
縮約関数を使用しないと、@distributed
は非同期に実行されます。つまり利用可能な全てのワーカーで独立したタスクが起動し、終了を待つことなくマクロはすぐに返ります。タスクの終了を待つには、次のように @sync
を最初に付けてください:
@sync @distributed for var = range
body
end
Distributed.@everywhere
── マクロ
@everywhere [procs()] expr
procs
に含まれる全てのプロセスで式 expr
を Main
の下で実行します。任意のプロセスで起きたエラーは収集され、単一の CompositeException
として送出されます。例えば
@everywhere bar = 1
は Main.bar
を現在起動している全てのプロセスで定義します。後から (addprocs()
などで) 追加されるプロセスでは Main.bar
は定義されません。
@spawnat
とは異なり、@everywhere
はローカル変数を一切キャプチャしません。その代わり、ローカル変数は補間を使ってブロードキャストできます:
foo = 1
@everywhere bar = $foo
省略可能な引数 procs
を使うと、式を実行するプロセスをプロセス全体の部分集合にできます。
remotecall_eval(Main, procs, expr)
の呼び出しと等価です。
Distributed.clear!
── メソッド
Distributed.remoteref_id
── 関数
remoteref_id(r::AbstractRemoteRef) -> RRID
Future
と RemoteChannel
は次のフィールドで識別されます:
-
where
: リモートリファレンスが指すオブジェクト/ストレージが実際に存在するノードです。 -
whence
: リモートリファレンスが作成されたノードです。これはリモートリファレンスが指すオブジェクトが実際に存在するノードと異なる概念であることに注意してください。例えばマスタープロセスでRemoteChannel(2)
とすると、where
は2
であるのに対してwhence
は1
となります。 -
id
: リモートリファレンスの識別子です。識別子whence
を持つワーカーで作成されるリモートリファレンスは全て異なるid
を持ちます。
まとめると、whence
と id
があれば全てのワーカーが持つリモートリファレンスの中から一つをユニークに識別できます。
remoteref_id
はリモートリファレンスの whence
と id
を包んだ RRID
オブジェクトを返す低水準 API です。
Distributed.channel_from_id
── 関数
channel_from_id(id) -> c
remoteref_id
が返す id
を受け取って、対応する AbstractChannel
を返す低水準 API です。この関数は対応するチャンネルが存在するノードでのみ呼び出せます。
Distributed.worker_id_from_socket
── 関数
Distributed.cluster_cookie
── メソッド
Distributed.cluster_cookie
── メソッド
ClusterManager
インターフェース
このインターフェースは様々なクラスター環境で Julia ワーカーを起動・管理する仕組みを提供します。Base には二種類のクラスターマネージャがあります: 同じホストで追加のワーカーを起動する LocalManager
と、SSH を使ってリモートホストでワーカーを起動する SSHManager
です。プロセス間の接続とメッセージのトランスポートには TCP/IP ソケットが使われますが、クラスターマネージャは異なるトランスポートを提供することもできます。
Distributed.ClusterManager
── 型
ClusterManager
クラスターマネージャを表す上位型です。ワーカープロセスをクラスターとして管理します。クラスターマネージャはワーカーの追加・削除・通信の方式を実装します。SSHManager
と LocalManager
は ClusterManager
の部分型です。
Distributed.WorkerConfig
── 型
WorkerConfig
クラスターに追加されるワーカーを制御するために ClusterManager
が使う型です。いくつかのフィールドはホストにアクセスするために全てのクラスターマネージャによって利用されます:
io
: ワーカーにアクセスするときに使う接続 (IO
もしくはNothing
)host
: ホストのアドレス (AbstractString
もしくはNothing
)port
: ワーカーに接続するときに使うホストのポート番号 (Int
もしくはNothing
)
いくつかのフィールドはクラスターマネージャが初期化済みのホストにワーカーを追加するときに利用されます:
count
: ホストで起動するワーカーの個数exename
: ホストにおける Julia 実行形式のパス (デフォルトは"$(Sys.BINDIR)/julia"
もしくは"$(Sys.BINDIR)/julia-debug"
)exeflags
: Julia をリモートで起動するときに使うフラグ
userdate
フィールドは外部マネージャが各ワーカーの情報を格納するときに利用されます。
いくつかのフィールドは SSHManager
およびそれに似たマネージャによって利用されます:
-
tunnel
:true
(トンネリングを行う)・false
(トンネリングを行わない)・nothing
(マネージャのデフォルトを使う) のいずれか multiplex
:true
(SSH トンネリングで多重化を使う) もしくはfalse
(使わない)forward
: SSH の-L
オプションで使われるフォワーディングオプションbind_addr
: バインドを行うリモートホストのアドレスsshflags
: SSH 接続を確立するときに使うフラグmax_parallel
: ホストに並列に接続できるワーカーの個数の最大値
いくつかのフィールドは LocalManager
と SSHManager
の両方によって利用されます:
connect_at
: セットアップされる通信がワーカー-ワーカー間とドライバ-ワーカー間のどちらか-
process
: 接続されるプロセス (通常はaddprocs
中にマネージャが値を代入する) ospid
: ホスト OS におけるプロセス ID (ワーカープロセスを停止するときに使われる)environ
:LocalManager
/SSHManager
が一時的な情報を格納するのに使うプライベートな辞書-
ident
:ClusterManager
がワーカーの識別に使う ID connect_idents
: 独自のトポロジーを使っているとき、このワーカーが接続しなければならないワーカーの ID のリストenable_threaded_blas
: ワーカーでスレッド化された BLAS を使うかどうか (true
,false
,nothing
のいずれか)
Distributed.launch
── 関数
launch(manager::ClusterManager,
params::Dict,
launched::Array,
launch_ntfy::Condition)
クラスターマネージャによって実装されます。クラスターマネージャはこの関数で起動した Julia ワーカーのそれぞれに対して WorkerConfig
を launched
に追加し、ワーカーが全て起動したら launch_ntfy
を notify
するべきです。この関数は必ず、マネージャが要求したワーカーの起動が全て完了してから返る必要があります。params
は addprocs
を呼び出すときに使われたキーワード引数を全て納めた辞書です。
Distributed.manage
── 関数
manage(manager::ClusterManager, id::Integer, config::WorkerConfig. op::Symbol)
クラスターマネージャによって実装されます。ワーカーでライフタイムに関するイベントが起こるたびにマスタープロセスで呼び出され、そのとき op
が何が起こったかを示します:
:register
/:deregister
: ワーカーが Julia のワーカープールから追加/削除されたことを示します。:interrupt
:interrupt(workers)
が呼ばれたことを示します。ClusterManager
は適切なワーカーに停止シグナルを送るべきです。:finalize
: クリーンアップ用途で使われます。
Base.kill
── メソッド
Sockets.connect
── メソッド
connect(manager::ClusterManager,
pid::Int,
config::WorkerConfig) -> (instrm::IO, outstrm::IO)
独自のトランスポートを使うクラスターマネージャで実装されます。この関数は config
が指定する論理接続を ID が pid
のワーカーに対して確立し、IO
オブジェクトの組を返すべきです。pid
から現在のプロセスへのメッセージは instrm
から読めるようになり、pid
へのメッセージは outstrm
に書き込まれます。独自のトランスポート実装はメッセージの送受信が完全な形で順序を保って行われることを保証する必要があります。connect(manager::ClusterManager.....)
は TCP/IP ソケット接続をワーカー間にセットアップします。
Distributed.init_worker
── 関数
init_worker(cookie::AbstractString, manager::ClusterManager=DefaultClusterManager())
独自のトランスポートを実装するクラスターマネージャによって呼ばれます。この関数は新しく起動されたプロセスをワーカーとして初期化します。コマンドライン引数 --worker[=<cookie>]
を指定すると、プロセスはトランスポートに TCP/IP ソケットを使うワーカーとして初期化されます。cookie
は cluster_cookie
を表します。
Distributed.start_worker
── 関数
start_worker([out::IO=stdout],
cookie::AbstractString=readline(stdin);
close_stdin::Bool=true,
stderr_to_stdout::Bool=true)
start_worker
は内部で使われる関数であり、TCP/IP で接続するワーカープロセスのデフォルトのエントリーポイントです。Julia クラスターワーカーとしてプロセスをセットアップします。
host:port
の情報がストリーム out
(デフォルトは stdout
) に書き込まれます。
この関数は必要ならクッキーを stdin
から読み、空いているポート (コマンドライン引数 --bind-to
が指定されればそのポート) にリッスンし、内向きの TCP 接続とリクエストを処理するタスクをスケジュールします。さらに stdin
を閉じ、stderr
を stdout
にリダイレクトします (どちらも省略可能)。
この関数は返りません。
Distributed.process_messages
── 関数
process_messages(r_stream::IO, w_stream::IO, incoming::Bool=true)
独自のトランスポートを使うクラスターマネージャによって呼ばれます。この関数は独自のトランスポート実装がリモートワーカーからの最初のメッセージを受信したときに呼ばれるべきです。独自のトランスポートはリモートワーカーへの論理接続を管理し、二つの IO
オブジェクトを提供しなければなりません。IO
オブジェクトの一つは内向きのメッセージのためで、もう一つはリモートワーカーに向かうメッセージのためです。incoming
が true
だと、リモートピアが接続を開始したことを表します。接続を開始した方が認証ハンドシェイクを実行するためのクラスタークッキーと Julia のバージョン番号を送信します。
cluster_cookie
も参照してください。