タスク

Core.Task ── 型

Task(func)

与えられた関数 func を実行するタスク (コルーチン) を作成します。func は引数を取らない関数である必要があります。タスクは func が返ったときに終了します。

julia> a() = sum(i for i in 1:1000);

julia> b = Task(a);

この例の b は実行可能な Task であり、まだ開始されてはいません。

[email protected] ── マクロ

@task

式を Task に包み、その Task を返します。このマクロはタスクを作成するだけで、実行はしません。

julia> a1() = sum(i for i in 1:1000);

julia> b = @task a1();

julia> istaskstarted(b)
false

julia> schedule(b);

julia> yield();

julia> istaskdone(b)
true

[email protected] ── マクロ

@async

式を Task に包み、その Task をローカルマシンのスケジューラキューに追加します。

@async に渡す式では $ を使って値を補間でき、補間された値は内部のクロージャに直接コピーされます。この機能を使って変数 value の値を非同期なコードに埋め込めば、現在のタスクで変数の値を変更しても非同期にコードを実行する新しいタスクに影響しないようにできます。

Julia 1.4

$ を使った値の補間は Julia 1.4 以降でサポートされます。

Base.asyncmap ── 関数

asyncmap(f, c...; ntasks=0, batch_size=nothing)

並列な複数のタスクを使って f をコレクションの各要素に適用します。c に複数のコレクションが渡されると、それぞれから要素を一つずつ取ったものが f の引数となります。

ntasks には並行に実行されるタスクの数を指定します。ntasks を指定しなければ、コレクションの長さに応じて最大で 100 個のタスクが使われます。

ntasks にはゼロ引数の関数を指定することもできます。そうすると各要素の処理を開始する前に ntasks を通して並行に実行するタスクの個数の上限が確認され、ntasks が返した値が現在の並列タスクの個数より大きいときに限って新しいタスクが開始されるようになります1

batch_size が指定されると、コレクションはバッチモードで処理されます。このとき f はコレクションの要素を並べたタプルからなる Vector を引数に受け取り、その処理結果からなる Vector を返す必要があります。入力のベクトルは batch_size と同じかそれ以下の長さを持ちます。

次に示すいくつかの例では、タスクの objectid を返す関数を使うことで asyncmap に渡された関数がどのタスクに割り当てられるかを確認しています。

まず ntasks が指定されないとき、全ての要素は異なるタスクで処理されます:

julia> tskoid() = objectid(current_task());

julia> asyncmap(x->tskoid(), 1:5)
5-element Array{UInt64,1}:
 0x6e15e66c75c75853
 0x440f8819a1baa682
 0x9fb3eeadd0c83985
 0xebd3e35fe90d4050
 0x29efc93edce2b961

julia> length(unique(asyncmap(x->tskoid(), 1:5)))
5

ntasks=2 とすると、全ての要素が二つのタスクで処理されます:

julia> asyncmap(x->tskoid(), 1:5; ntasks=2)
5-element Array{UInt64,1}:
 0x027ab1680df7ae94
 0xa23d2f80cd7cf157
 0x027ab1680df7ae94
 0xa23d2f80cd7cf157
 0x027ab1680df7ae94

julia> length(unique(asyncmap(x->tskoid(), 1:5; ntasks=2)))
2

batch_size を指定するとき、asyncmap に渡す関数はコレクションの要素を並べたタプルからなる配列を受け取り、結果の配列を返す必要があります。次の例では map を使って配列に対応しています:

julia> batch_func(input) = map(x->string("args_tuple: ", x,
                                         ", element_val: ", x[1],
                                         ", task: ", tskoid()),
                                input)
batch_func (generic function with 1 method)

julia> asyncmap(batch_func, 1:5; ntasks=2, batch_size=2)
5-element Array{String,1}:
 "args_tuple: (1,), element_val: 1, task: 9118321258196414413"
 "args_tuple: (2,), element_val: 2, task: 4904288162898683522"
 "args_tuple: (3,), element_val: 3, task: 9118321258196414413"
 "args_tuple: (4,), element_val: 4, task: 4904288162898683522"
 "args_tuple: (5,), element_val: 5, task: 9118321258196414413"
情報

現在、Julia における全てのタスクは単一の OS スレッドで協調して実行されます。asyncmap を使う意味があるのは、渡される関数が IO (ディスク・ネットワーク・リモートワーカーの起動など) を行うときだけです。

Base.asyncmap! ── 関数

asyncmap!(f, results, c...; ntasks=0, batch_size=nothing)

基本的に asyncmap と同様ですが、結果のコレクションを返すのではなく results に格納します。

Base.current_task ── 関数

current_task()

現在実行中の Task を返します。

Base.istaskdone ── 関数

istaskdone(t::Task) -> Bool

タスクが終了したかどうかを判定します。

julia> a2() = sum(i for i in 1:1000);

julia> b = Task(a2);

julia> istaskdone(b)
false

julia> schedule(b);

julia> yield();

julia> istaskdone(b)
true

Base.istaskstarted ── 関数

istaskstarted(t::Task) -> Bool

タスクが実行を開始したかどうかを判定します。

julia> a3() = sum(i for i in 1:1000);

julia> b = Task(a3);

julia> istaskstarted(b)
false

Base.istaskfailed ── 関数

istaskfailed(t::Task) -> Bool

タスクが例外の送出により終了したかどうかを判定します。

julia> a4() = error("task failed");

julia> b = Task(a4);

julia> istaskfailed(b)
false

julia> schedule(b);

julia> yield();

julia> istaskfailed(b)
true

Base.task_local_storage ── メソッド

task_local_storage(key)

現在のタスクのタスクローカルストレージの key に対応する値を取得します。

Base.task_local_storage ── メソッド

task_local_storage(key, value)

現在のタスクのタスクローカルストレージの key に対応する値を設定します。

Base.task_local_storage ── メソッド

task_local_storage(body, key, value)

タスクローカルストレージの keyvalue を対応させた状態で body を呼び出し、その後 key に対応する値 (もしくは key に値が割り当てられていない状態) を復元します。動的スコープのエミュレートに有用です。

スケジューリング

Base.yield ── 関数

yield()

スケジューラに実行を切り替えて、スケジュールされた他のタスクを実行できるようにします。この関数を呼び出したタスクは依然 runnable であり、他に runnable なタスクが存在しなければすぐに再開されます。

yield(t::Task, arg = nothing)

schedule(t, arg); yield() と等価です。このメソッドの方が二つの関数を別々に呼び出すより高速ですが、スケジューラを呼び出すことなく t に直接 yield するので、使うとスケジュールが不平等になります。

Base.yieldto ── 関数

yieldto(t::Task, arg = nothing)

指定されたタスクに実行を切り替えます。あるタスクに対して実行が最初に切り替わったとき、タスクの関数は引数無しに呼ばれます。以降の切り替わりでは、タスクが最後に呼んだ yieldto の返り値が arg となります。これはタスクの切り替えのみを行う低水準の関数であり、タスクの状態やスケジュールを一切関知しません。この関数の使用は推奨されません。

Base.sleep ── 関数

sleep(seconds)

現在のタスクの実行を指定された秒数だけブロックします。指定できる最小の時間は 1 ミリ秒 (0.001) です。

Base.schedule ── 関数

schedule(t::Task, [val]; error=false)

スケジューラのキューにタスク t を追加します。引数のタスク twait のようなブロックする処理を実行せず、さらにシステムが他に実行するタスクを持たないなら、タスク t の実行が最後まで継続されます。

第二引数 val が与えられると、それはタスク t が実行を再開するときに (yieldto の返り値として) 渡されます。errortrue なら、再開されたタスクで val を値とする例外が送出されます。

julia> a5() = sum(i for i in 1:1000);

julia> b = Task(a5);

julia> istaskstarted(b)
false

julia> schedule(b);

julia> yield();

julia> istaskstarted(b)
true

julia> istaskdone(b)
true

同期

[email protected] ── マクロ

@sync

字句的に囲まれた範囲で使われる @async, @spawn, spawnat, @distributed が全て完了するのを待ちます。内部の非同期処理で送出された全ての例外はまとめられ、一つの CompositeException として送出されます。

Base.wait ── 関数

Threads.Condition に対する wait に関する特別な注意点:

呼び出し側は、wait を呼び出す前に対象の Condition に対する lock を取得する必要があります。wait を呼び出したタスクは他のタスクによって (通常は同じ Condition に対する notify で) 起動されるまでブロックします。ブロックしている間 (再帰的にロックされていたとしても) ロックは不可分に解放され、返るときに再取得されます。

wait([x])

何らかのイベントが起こるまで現在のタスクをブロックします。待つイベントは引数の型で決まります:

  • Channel: チャンネルに値が追加されるまで待つ。
  • Condition: 条件に対する notify を待つ。
  • Process: プロセスあるいはプロセスチェーンが終了するのを待つ。プロセスの成功/失敗はプロセスの exitcode フィールドで判定できる。
  • Task: タスクが終了するのを待つ。タスクが例外を送出して失敗した場合には、失敗したタスクをラップする TaskFailedException が送出される。
  • RawFD: ファイル記述子の変更を待つ (FileWatching パッケージを参照)。

引数が渡されないと、ブロック時間は未定義となります。こうなったタスクは schedule または yieldto を明示的に呼び出すことで再開できます。

waitwhile ループの中で処理を進めるのに必要な条件が成り立っていることを確認するためよく使われます。

wait(r::Future)

指定された Future の値が利用可能になるまで待ちます。

wait(r::RemoteChannel, args...)

指定された RemoteChannel で値が利用可能になるまで待ちます。

Base.fetch ── メソッド

fetch(t::Task)

タスクが終わるのを待ち、その結果を返します。タスクが例外を送出して失敗すると、そのタスクをラップした TaskFailedException が送出されます。

Base.timedwait ── 関数

timedwait(testcb::Function, timeout::Real; pollint::Real=0.1)

testcbtrue を返すか、timeout 秒が経過するまで待ちます。どちらかが起こればこの関数は返ります。testcbpollint 秒ごとに状態を確認されます。timeoutpollint に指定できるのは 1 ミリ秒 (0.001) 以上の値です。

:ok または :timed_out を返します。

Base.Condition ── 型

Condition()

タスクが wait できるエッジトリガのイベントソースを作成します。Condition に対して wait を行ったタスクは停止され、キューに入れられます。その後キューに入れられたタスクは同じ Condition に対して notify が呼ばれたときに起動されます。「エッジトリガ」とは、notify が呼ばれたときに待っているタスクだけが起動することを意味します。レベルトリガの通知を行うには、通知が起こったかを記録する状態を別に用意する必要があります。Channel 型と Threads.Event 型はこれを行うので、レベルトリガのイベントが必要なときはこれらを利用できます。

このオブジェクトはスレッドセーフではありません。スレッドセーフなバージョンについては Threads.Condition を参照してください。

Base.notify ── 関数

notify(condition, val=nothing; all=true, error=false)

条件に対して待っているタスクを起動し、値 val を渡します。alltrue (デフォルト値) なら待っているタスクが全て起動され、allfalse なら一つのタスクだけが起動されます。errortrue なら、起動されるタスクで渡された値 val が例外として送出されます。

起動したタスクの個数を返します。condition に待っているタスクが無ければ 0 が返ります。

Base.Semaphore ── 型

Semaphore(sem_size)

同時に最大でも sem_size 個のタスクから取得できる計数セマフォ (counting semaphore) を作成します。

Base.acquire ── 関数

acquire(s::Semaphore)

セマフォ s のパーミットが取得可能になったら取得します。s が持つ sem_size 個のパーミットのいずれかが取得できるようになるまでブロックします。

Base.release ── 関数

release(s::Semaphore)

パーミットをプールに返却します。これにより他のタスクがパーミットを取得して実行を進められるようになります。

Base.AbstractLock ── 型

AbstractLock

同期プリミティブを実装する型を表す抽象上位型です。同期プリミティブとは lock, trylock, unlock, islocked のことです。

Base.lock ── 関数

lock(lock)

lock が取得可能になったら取得します。lock が他のタスク/スレッドによってロックされている場合には、利用可能になるまで待ちます。

lock には必ず unlock が対応する必要があります。

lock(f::Function, lock)

lock を取得し、lock を保持した状態で f を実行し、f が返ったら lock を解放します。もし lock が他のタスク/スレッドによってロックされている場合には、利用可能になるまで待ちます。

この関数が返るとき lock は解放されているので、呼び出し側から unlock する必要はありません。

Base.unlock ── 関数

unlock(lock)

lock の所有権を手放します。

lock が過去に取得された再帰的なロックなら、内部のカウンタを 1 減らしてすぐに返ります。

Base.trylock ── 関数

trylock(lock) -> Success (Boolean)

ロックが取得可能なら取得して true を返し、他のタスク/スレッドによってロックされていれば false を返します。

成功した trylock には必ず unlock が対応する必要があります。

Base.islocked ── 関数

islocked(lock) -> Status (Boolean)

lock がタスク/スレッドによって取得されているかどうかを判定します。この関数を同期のために使うべきではありません (trylock を使ってください)。

Base.ReentrantLock ── 型

ReentrantLock()

Task 間の同期に利用する再入可能 (re-entrant) なロックを作成します。一つのタスクは ReentrantLock を必要なだけ何度でも取得できます。ReentrantLocklock には必ず unlock が対応する必要があります。

lock を呼び出すと、対応する unlock を呼び出すまでそのスレッドでファイナライザの実行が行われなくなります。次に示す標準のロックパターンは自然にサポートされますが、trylock の順番を間違えたり、try ブロックの実行が飛ばされる (例えばロックを保持したまま関数から返る) ことのないよう注意してください:

lock(l)
try
    <アトミックな処理>
finally
    unlock(l)
end

チャンネル

Base.Channel ── 型

Channel{T=Any}(size::Int=0)

T 型のオブジェクトを最大で size 個だけ保持できる内部バッファを持った Channel を構築します。満杯のチャンネルに対して put! を呼び出すと、他のタスクが take! を呼び出しチャンネルからオブジェクトを削除するまでブロックします。

Channel(0) はバッファを持たないチャンネルを構築します。このチャンネルでは任意の put!take! されるまでブロックし、逆も同様となります。

他のコンストラクタは次の通りです:

  • Channel(): デフォルトコンストラクタであり、Channel{Any}(0) と等価です。
  • Channel(Inf): Channel{Any}(typemax(Int)) と等価です。
  • Channel(sz): Channel{Any}(sz) と等価です。
Julia 1.3

デフォルトコンストラクタ Channel() とデフォルト値 size=0 は Julia 1.3 で追加されました。

Base.Channel ── メソッド

Channel{T=Any}(func::Function, size=0; taskref=nothing, spawn=false)

func から新しいタスクを作成し、T 型でサイズ size の新しいチャンネルにタスクをバインドし、タスクをスケジュールするという処理を一度の呼び出しで行います。

func は唯一の引数としてバインドされるチャンネルを受け取らなければなりません。

作成されるタスクを参照する必要があるなら、キーワード引数 taskrefRef{Task} オブジェクトを渡してください。

spawn = true だと、func を実行する新しいタスクを他のスレッドで並列にスケジュールすることを許可できます。つまり [email protected] でタスクを作成するのと等価になります。

Channel 型の値を返します。

julia> chnl = Channel() do ch
           foreach(i -> put!(ch, i), 1:4)
       end;

julia> typeof(chnl)
Channel{Any}

julia> for i in chnl
           @show i
       end;
i = 1
i = 2
i = 3
i = 4

作成されたタスクを参照する例を示します:

julia> taskref = Ref{Task}();

julia> chnl = Channel(taskref=taskref) do ch
           println(take!(ch))
       end;

julia> istaskdone(taskref[])
false

julia> put!(chnl, "Hello");
Hello

julia> istaskdone(taskref[])
true
Julia 1.3

キーワード引数 spawn は Julia 1.3 で追加されました。1.3 より前の Julia では、Channel のコンストラクタで sizeT を指定するのにキーワード引数が使われていました。現在このコンストラクタは非推奨です。

julia> chnl = Channel{Char}(1, spawn=true) do ch
           for c in "hello world"
               put!(ch, c)
           end
       end
Channel{Char}(sz_max:1,sz_curr:1)

julia> String(collect(chnl))
"hello world"

Base.put! ── メソッド

put!(c::Channel, v)

チャンネル c に要素 v を入れます。チャンネルが満杯ならブロックします。

バッファを持たないチャンネルでは、他のタスクが take! するまでブロックします。

Julia 1.1

Julia 1.1 以降では、put! される vconvert でチャンネルの型に変換されます。

Base.take! ── メソッド

take!(c::Channel)

Channel から値を一つ取り除き、取り除いた値を返します。データが利用可能になるまでブロックします。

バッファを持たないチャンネルでは、他のタスクが put! するまでブロックします。

Base.isready ── メソッド

isready(c::Channel)

Channel に値が保存されているかどうかを判定します。すぐに値を返し、ブロックしません。

バッファを持たないチャンネルでは、put! して待っているタスクがあるとき true を返します。

Base.fetch ── メソッド

fetch(c::Channel)

チャンネルから値が利用可能になるのを待ち、利用可能になったら取得して返します。バッファを持たない (サイズが 0 の) チャンネルで fetch はサポートされません。

Base.close ── メソッド

close(c::Channel[, excp::Exception])

チャンネルを閉じます。次の条件が成り立つと例外が発生しますが、そのときに発生する例外は excp で設定できます:

  • 閉じたチャンネルに put! したとき。
  • 閉じた空のチャンネルに take! または fetch したとき。

Base.bind ── メソッド

bind(chnl::Channel, task::Task)

チャンネル chnl の寿命をタスク task に関連付けます。chnl はタスクが終了するときに自動的に閉じられるようになります。また task で発生した未捕捉の例外は chnl を待つ全てのタスクに伝播します。

chnl オブジェクトはタスクの終了と関係ないタイミングで閉じることもできます。Channel オブジェクトが閉じているならタスクが終了しても何も起きません。

一つのチャンネルが複数のタスクにバインドされると、タスクが一つでも終了した段階でチャンネルが閉じられます。複数のチャンネルが一つのタスクにバインドされると、タスクが終了するときにバインドされたチャンネルが全て閉じられます。

julia> c = Channel(0);

julia> task = @async foreach(i->put!(c, i), 1:4);

julia> bind(c,task);

julia> for i in c
           @show i
       end;
i = 1
i = 2
i = 3
i = 4

julia> isopen(c)
false
julia> c = Channel(0);

julia> task = @async (put!(c, 1); error("foo"));

julia> bind(c, task);

julia> take!(c)
1

julia> put!(c, 1);
ERROR: TaskFailedException:
foo
Stacktrace:
[...]

  1. 訳注: 英語版には逆のことが書かれていたので、記述を修正した。[return]


日本語 Julia 書籍 (Amazon アソシエイト)
1 から始める Julia プログラミング
Julia プログラミングクックブック―言語仕様からデータ分析、機械学習、数値計算まで
スタンフォード ベクトル・行列からはじめる最適化数学