タスク
Core.Task
── 型
Task(func)
与えられた関数 func
を実行するタスク (コルーチン) を作成します。func
は引数を取らない関数である必要があります。タスクは func
が返ったときに終了します。
例
julia> a() = sum(i for i in 1:1000);
julia> b = Task(a);
この例の b
は実行可能な Task
であり、まだ開始されてはいません。
Base.@task
── マクロ
Base.@async
── マクロ
Base.asyncmap
── 関数
asyncmap(f, c...; ntasks=0, batch_size=nothing)
並列な複数のタスクを使って f
をコレクションの各要素に適用します。c
に複数のコレクションが渡されると、それぞれから要素を一つずつ取ったものが f
の引数となります。
ntasks
には並行に実行されるタスクの数を指定します。ntasks
を指定しなければ、コレクションの長さに応じて最大で 100 個のタスクが使われます。
ntasks
にはゼロ引数の関数を指定することもできます。そうすると各要素の処理を開始する前に ntasks
を通して並行に実行するタスクの個数の上限が確認され、ntasks
が返した値が現在の並列タスクの個数より大きいときに限って新しいタスクが開始されるようになります1。
batch_size
が指定されると、コレクションはバッチモードで処理されます。このとき f
はコレクションの要素を並べたタプルからなる Vector
を引数に受け取り、その処理結果からなる Vector
を返す必要があります。入力のベクトルは batch_size
と同じかそれ以下の長さを持ちます。
次に示すいくつかの例では、タスクの objectid
を返す関数を使うことで asyncmap
に渡された関数がどのタスクに割り当てられるかを確認しています。
まず ntasks
が指定されないとき、全ての要素は異なるタスクで処理されます:
julia> tskoid() = objectid(current_task());
julia> asyncmap(x->tskoid(), 1:5)
5-element Array{UInt64,1}:
0x6e15e66c75c75853
0x440f8819a1baa682
0x9fb3eeadd0c83985
0xebd3e35fe90d4050
0x29efc93edce2b961
julia> length(unique(asyncmap(x->tskoid(), 1:5)))
5
ntasks=2
とすると、全ての要素が二つのタスクで処理されます:
julia> asyncmap(x->tskoid(), 1:5; ntasks=2)
5-element Array{UInt64,1}:
0x027ab1680df7ae94
0xa23d2f80cd7cf157
0x027ab1680df7ae94
0xa23d2f80cd7cf157
0x027ab1680df7ae94
julia> length(unique(asyncmap(x->tskoid(), 1:5; ntasks=2)))
2
batch_size
を指定するとき、asyncmap
に渡す関数はコレクションの要素を並べたタプルからなる配列を受け取り、結果の配列を返す必要があります。次の例では map
を使って配列に対応しています:
julia> batch_func(input) = map(x->string("args_tuple: ", x,
", element_val: ", x[1],
", task: ", tskoid()),
input)
batch_func (generic function with 1 method)
julia> asyncmap(batch_func, 1:5; ntasks=2, batch_size=2)
5-element Array{String,1}:
"args_tuple: (1,), element_val: 1, task: 9118321258196414413"
"args_tuple: (2,), element_val: 2, task: 4904288162898683522"
"args_tuple: (3,), element_val: 3, task: 9118321258196414413"
"args_tuple: (4,), element_val: 4, task: 4904288162898683522"
"args_tuple: (5,), element_val: 5, task: 9118321258196414413"
現在、Julia における全てのタスクは単一の OS スレッドで協調して実行されます。asyncmap
を使う意味があるのは、渡される関数が IO (ディスク・ネットワーク・リモートワーカーの起動など) を行うときだけです。
Base.asyncmap!
── 関数
Base.current_task
── 関数
Base.istaskdone
── 関数
istaskdone(t::Task) -> Bool
タスクが終了したかどうかを判定します。
例
julia> a2() = sum(i for i in 1:1000);
julia> b = Task(a2);
julia> istaskdone(b)
false
julia> schedule(b);
julia> yield();
julia> istaskdone(b)
true
Base.istaskstarted
── 関数
istaskstarted(t::Task) -> Bool
タスクが実行を開始したかどうかを判定します。
例
julia> a3() = sum(i for i in 1:1000);
julia> b = Task(a3);
julia> istaskstarted(b)
false
Base.istaskfailed
── 関数
istaskfailed(t::Task) -> Bool
タスクが例外の送出により終了したかどうかを判定します。
例
julia> a4() = error("task failed");
julia> b = Task(a4);
julia> istaskfailed(b)
false
julia> schedule(b);
julia> yield();
julia> istaskfailed(b)
true
Base.task_local_storage
── メソッド
Base.task_local_storage
── メソッド
Base.task_local_storage
── メソッド
task_local_storage(body, key, value)
タスクローカルストレージの key
に value
を対応させた状態で body
を呼び出し、その後 key
に対応する値 (もしくは key
に値が割り当てられていない状態) を復元します。動的スコープのエミュレートに有用です。
スケジューリング
Base.yield
── 関数
yield()
スケジューラに実行を切り替えて、スケジュールされた他のタスクを実行できるようにします。この関数を呼び出したタスクは依然 runnable であり、他に runnable なタスクが存在しなければすぐに再開されます。
yield(t::Task, arg = nothing)
schedule(t, arg); yield()
と等価です。このメソッドの方が二つの関数を別々に呼び出すより高速ですが、スケジューラを呼び出すことなく t
に直接 yield するので、使うとスケジュールが不平等になります。
Base.yieldto
── 関数
yieldto(t::Task, arg = nothing)
指定されたタスクに実行を切り替えます。あるタスクに対して実行が最初に切り替わったとき、タスクの関数は引数無しに呼ばれます。以降の切り替わりでは、タスクが最後に呼んだ yieldto
の返り値が arg
となります。これはタスクの切り替えのみを行う低水準の関数であり、タスクの状態やスケジュールを一切関知しません。この関数の使用は推奨されません。
Base.sleep
── 関数
Base.schedule
── 関数
schedule(t::Task, [val]; error=false)
スケジューラのキューにタスク t
を追加します。引数のタスク t
が wait
のようなブロックする処理を実行せず、さらにシステムが他に実行するタスクを持たないなら、タスク t
の実行が最後まで継続されます。
第二引数 val
が与えられると、それはタスク t
が実行を再開するときに (yieldto
の返り値として) 渡されます。error
が true
なら、再開されたタスクで val
を値とする例外が送出されます。
例
julia> a5() = sum(i for i in 1:1000);
julia> b = Task(a5);
julia> istaskstarted(b)
false
julia> schedule(b);
julia> yield();
julia> istaskstarted(b)
true
julia> istaskdone(b)
true
同期
Base.@sync
── マクロ
@sync
字句的に囲まれた範囲で使われる @async
, @spawn
, spawnat
, @distributed
が全て完了するのを待ちます。内部の非同期処理で送出された全ての例外はまとめられ、一つの CompositeException
として送出されます。
Base.wait
── 関数
Threads.Condition
に対する wait
に関する特別な注意点:
呼び出し側は、wait
を呼び出す前に対象の Condition
に対する lock
を取得する必要があります。wait
を呼び出したタスクは他のタスクによって (通常は同じ Condition
に対する notify
で) 起動されるまでブロックします。ブロックしている間 (再帰的にロックされていたとしても) ロックは不可分に解放され、返るときに再取得されます。
wait([x])
何らかのイベントが起こるまで現在のタスクをブロックします。待つイベントは引数の型で決まります:
-
Channel
: チャンネルに値が追加されるまで待つ。 -
Condition
: 条件に対するnotify
を待つ。 Process
: プロセスあるいはプロセスチェーンが終了するのを待つ。プロセスの成功/失敗はプロセスのexitcode
フィールドで判定できる。-
Task
: タスクが終了するのを待つ。タスクが例外を送出して失敗した場合には、失敗したタスクをラップするTaskFailedException
が送出される。 -
RawFD
: ファイル記述子の変更を待つ (FileWatching
パッケージを参照)。
引数が渡されないと、ブロック時間は未定義となります。こうなったタスクは schedule
または yieldto
を明示的に呼び出すことで再開できます。
wait
は while
ループの中で処理を進めるのに必要な条件が成り立っていることを確認するためよく使われます。
Base.fetch
── メソッド
fetch(t::Task)
タスクが終わるのを待ち、その結果を返します。タスクが例外を送出して失敗すると、そのタスクをラップした TaskFailedException
が送出されます。
Base.timedwait
── 関数
timedwait(testcb::Function, timeout::Real; pollint::Real=0.1)
testcb
が true
を返すか、timeout
秒が経過するまで待ちます。どちらかが起こればこの関数は返ります。testcb
は pollint
秒ごとに状態を確認されます。timeout
と pollint
に指定できるのは 1 ミリ秒 (0.001
) 以上の値です。
:ok
または :timed_out
を返します。
Base.Condition
── 型
Condition()
タスクが wait
できるエッジトリガのイベントソースを作成します。Condition
に対して wait
を行ったタスクは停止され、キューに入れられます。その後キューに入れられたタスクは同じ Condition
に対して notify
が呼ばれたときに起動されます。「エッジトリガ」とは、notify
が呼ばれたときに待っているタスクだけが起動することを意味します。レベルトリガの通知を行うには、通知が起こったかを記録する状態を別に用意する必要があります。Channel
型と Threads.Event
型はこれを行うので、レベルトリガのイベントが必要なときはこれらを利用できます。
このオブジェクトはスレッドセーフではありません。スレッドセーフなバージョンについては Threads.Condition
を参照してください。
Base.notify
── 関数
notify(condition, val=nothing; all=true, error=false)
条件に対して待っているタスクを起動し、値 val
を渡します。all
が true
(デフォルト値) なら待っているタスクが全て起動され、all
が false
なら一つのタスクだけが起動されます。error
が true
なら、起動されるタスクで渡された値 val
が例外として送出されます。
起動したタスクの個数を返します。condition
に待っているタスクが無ければ 0
が返ります。
Base.Semaphore
── 型
Base.acquire
── 関数
acquire(s::Semaphore)
セマフォ s
のパーミットが取得可能になったら取得します。s
が持つ sem_size
個のパーミットのいずれかが取得できるようになるまでブロックします。
Base.release
── 関数
Base.AbstractLock
── 型
Base.lock
── 関数
lock(lock)
lock
が取得可能になったら取得します。lock
が他のタスク/スレッドによってロックされている場合には、利用可能になるまで待ちます。
lock
には必ず unlock
が対応する必要があります。
lock(f::Function, lock)
lock
を取得し、lock
を保持した状態で f
を実行し、f
が返ったら lock
を解放します。もし lock
が他のタスク/スレッドによってロックされている場合には、利用可能になるまで待ちます。
この関数が返るとき lock
は解放されているので、呼び出し側から unlock
する必要はありません。
Base.unlock
── 関数
Base.trylock
── 関数
Base.islocked
── 関数
Base.ReentrantLock
── 型
ReentrantLock()
Task
間の同期に利用する再入可能 (re-entrant) なロックを作成します。一つのタスクは ReentrantLock
を必要なだけ何度でも取得できます。ReentrantLock
の lock
には必ず unlock
が対応する必要があります。
lock
を呼び出すと、対応する unlock
を呼び出すまでそのスレッドでファイナライザの実行が行われなくなります。次に示す標準のロックパターンは自然にサポートされますが、try
と lock
の順番を間違えたり、try
ブロックの実行が飛ばされる (例えばロックを保持したまま関数から返る) ことのないよう注意してください:
lock(l)
try
<アトミックな処理>
finally
unlock(l)
end
チャンネル
Base.Channel
── 型
Channel{T=Any}(size::Int=0)
T
型のオブジェクトを最大で size
個だけ保持できる内部バッファを持った Channel
を構築します。満杯のチャンネルに対して put!
を呼び出すと、他のタスクが take!
を呼び出しチャンネルからオブジェクトを削除するまでブロックします。
Channel(0)
はバッファを持たないチャンネルを構築します。このチャンネルでは任意の put!
が take!
されるまでブロックし、逆も同様となります。
他のコンストラクタは次の通りです:
Channel()
: デフォルトコンストラクタであり、Channel{Any}(0)
と等価です。Channel(Inf)
:Channel{Any}(typemax(Int))
と等価です。Channel(sz)
:Channel{Any}(sz)
と等価です。
デフォルトコンストラクタ Channel()
とデフォルト値 size=0
は Julia 1.3 で追加されました。
Base.Channel
── メソッド
Channel{T=Any}(func::Function, size=0; taskref=nothing, spawn=false)
func
から新しいタスクを作成し、T
型でサイズ size
の新しいチャンネルにタスクをバインドし、タスクをスケジュールするという処理を一度の呼び出しで行います。
func
は唯一の引数としてバインドされるチャンネルを受け取らなければなりません。
作成されるタスクを参照する必要があるなら、キーワード引数 taskref
に Ref{Task}
オブジェクトを渡してください。
spawn = true
だと、func
を実行する新しいタスクを他のスレッドで並列にスケジュールすることを許可できます。つまり Threads.@spawn
でタスクを作成するのと等価になります。
Channel
型の値を返します。
例
julia> chnl = Channel() do ch
foreach(i -> put!(ch, i), 1:4)
end;
julia> typeof(chnl)
Channel{Any}
julia> for i in chnl
@show i
end;
i = 1
i = 2
i = 3
i = 4
作成されたタスクを参照する例を示します:
julia> taskref = Ref{Task}();
julia> chnl = Channel(taskref=taskref) do ch
println(take!(ch))
end;
julia> istaskdone(taskref[])
false
julia> put!(chnl, "Hello");
Hello
julia> istaskdone(taskref[])
true
キーワード引数 spawn
は Julia 1.3 で追加されました。1.3 より前の Julia では、Channel
のコンストラクタで size
と T
を指定するのにキーワード引数が使われていました。現在このコンストラクタは非推奨です。
julia> chnl = Channel{Char}(1, spawn=true) do ch
for c in "hello world"
put!(ch, c)
end
end
Channel{Char}(sz_max:1,sz_curr:1)
julia> String(collect(chnl))
"hello world"
Base.put!
── メソッド
Base.take!
── メソッド
Base.isready
── メソッド
Base.fetch
── メソッド
fetch(c::Channel)
チャンネルから値が利用可能になるのを待ち、利用可能になったら取得して返します。バッファを持たない (サイズが 0 の) チャンネルで fetch
はサポートされません。
Base.close
── メソッド
Base.bind
── メソッド
bind(chnl::Channel, task::Task)
チャンネル chnl
の寿命をタスク task
に関連付けます。chnl
はタスクが終了するときに自動的に閉じられるようになります。また task
で発生した未捕捉の例外は chnl
を待つ全てのタスクに伝播します。
chnl
オブジェクトはタスクの終了と関係ないタイミングで閉じることもできます。Channel
オブジェクトが閉じているならタスクが終了しても何も起きません。
一つのチャンネルが複数のタスクにバインドされると、タスクが一つでも終了した段階でチャンネルが閉じられます。複数のチャンネルが一つのタスクにバインドされると、タスクが終了するときにバインドされたチャンネルが全て閉じられます。
例
julia> c = Channel(0);
julia> task = @async foreach(i->put!(c, i), 1:4);
julia> bind(c,task);
julia> for i in c
@show i
end;
i = 1
i = 2
i = 3
i = 4
julia> isopen(c)
false
julia> c = Channel(0);
julia> task = @async (put!(c, 1); error("foo"));
julia> bind(c, task);
julia> take!(c)
1
julia> put!(c, 1);
ERROR: TaskFailedException:
foo
Stacktrace:
[...]
-
訳注: 英語版には逆のことが書かれていたので、記述を修正した。[return]