マルチスレッディング

[email protected] ── マクロ
[email protected] [schedule] for ... end

for ループを並列化して複数のスレッドで実行するためのマクロです。反復空間を分割して複数のタスクに割り当て、それらのタスクをスケジュールポリシーに従って複数のスレッドで実行します。ループの終端にバリアが敷かれ、全てのタスクが実行を終えるのを待機します。

引数 schedule は特定のスケジュールポリシーを要請するために利用します。現在サポートされる値は :static だけであり、こうするとスレッドごとに一つのタスクが作成され、各タスクに反復が平等に割り当てられます。他の @threads の内部あるいは ID が 1 でないスレッドで :static を指定するとエラーになります。

デフォルトのスケジュール (schedule 引数を与えないときに使われるスケジュール) は将来変更される可能性があります。

Julia 1.5

引数 schedule は Julia 1.5 以降でサポートされます。

[email protected] ── マクロ

利用できる適当なスレッドで Task を作成・実行します。タスクの終了を待つには、このマクロの返り値に対して wait を呼び出してください。fetch を呼び出せばタスクの終了を待った上で返り値を取得できます。

@spawn に渡す式では $ を使って値を補間でき、補間された値は内部のクロージャに直接コピーされます。この機能を使って変数 value の値を非同期なコードに埋め込めば、現在のタスクで変数の値を変更しても非同期にコードを実行する新しいタスクに影響しないようにできます。

情報

重要な注意点についてはマニュアルのスレッディングの章を参照してください。

Julia 1.3

このマクロは Julia 1.3 以降でサポートされます。

Julia 1.4

$ を使った値の補間は Julia 1.4 以降でサポートされます。

Base.Threads.threadid ── 関数
Threads.threadid()

現在実行中のスレッドの ID 番号を取得します。マスタースレッドの ID は 1 です。

Base.Threads.nthreads ── 関数
Threads.nthreads()

Julia プロセスで利用可能なスレッドの個数を取得します。threadid()nthreads() 以下の値を返します。

同期

Threads.Condition([lock])

スレッドセーフなバージョンの Base.Condition です。

Threads.Condition に対して wait および notify を呼び出すときは、まず条件を lock する必要があります。このロックは wait が呼び出されブロックしている間は不可分に解放され、wait が返るとき再取得されます。そのため Threads.Condition 型の値 c を使うコードは通常次のような形をしているはずです:

lock(c)
try
    while !thing_we_are_waiting_for
        wait(c)
    end
finally
    unlock(c)
end
Julia 1.2

この機能は Julia 1.2 以降でサポートされます。

Base.Event ──
Event()

レベルトリガのイベントソースを作成します。Event に対して wait したタスクは、他のタスクが同じ Event に対して notify するまで実行を停止します。一度 notifyEvent に対して呼ばれると、そのイベントは以降シグナルされたままとなり、wait() されてもタスクをブロックしません。

Julia 1.1

この記法は Julia 1.1 以降でサポートされます。

タスクの同期 も参照してください。

不可分操作

注意

不可分操作の API はまだ完全に決まっておらず、将来おそらく変更されます。

Base.Threads.Atomic ──
Threads.Atomic{T}

この型の値は T 型のオブジェクトへの参照を保持し、そのオブジェクトに対するアクセスを不可分な (つまり、スレッドセーフな) アクセスだけに制限します。

不可分に扱えるのは一部の “簡単な” 真偽値・整数・浮動小数点数だけです。具体的には次の型の値です:

  • Bool
  • Int8, Int16, Int32, Int64, Int128 Int256
  • UInt8, UInt16, UInt32, UInt64, UInt128 UInt256
  • Float16, Float32, Float64

新しい Atomic 型のオブジェクトは不可分でない値から作成できます。値を指定しなければ、新しい不可分オブジェクトはゼロで初期化されます。

不可分オブジェクトへのアクセスは [] 記法で行います:

julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)

julia> x[] = 1
1

julia> x[]
1

atomic_add!atomic_xchg! のように、不可分操作には名前の最初に atomic_ が付いています。

Base.Threads.atomic_cas! ── 関数
Threads.atomic_cas!(x::Atomic{T}, cmp::T, newval::T) where T

x に対して不可分に compare-and-set を行います。

x が保持する値と cmp を比較し、もし等しいなら newvalx に書き込む」という処理が不可分に行われます。それ以外のとき x は変更されず、x の古い値がそのまま返ります。返り値と cmp=== で比較することで、x が更新されたかどうかを判定できます。更新されていれば xnewval を保持しています。

詳細は LLVM の cmpxchg 命令のドキュメントを参照してください。

この関数はトランザクションセマンティクスの実装に使われます。トランザクションを行う前に x の値を記録しておき、トランザクションを行っている間に x の値が変わっていないときに限って新しい値を保存するという使い方です。

julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)

julia> Threads.atomic_cas!(x, 4, 2);

julia> x
Base.Threads.Atomic{Int64}(3)

julia> Threads.atomic_cas!(x, 3, 2);

julia> x
Base.Threads.Atomic{Int64}(2)
Threads.atomic_xchg!(x::Atomic{T}, newval::T) where T

x が保持する値を不可分に交換します。

x が保持する値を newval に置き換えて、古い値を返す」という処理が不可分に行われます。

詳細は LLVM の atomicrmw xchg 命令のドキュメントを見てください。

julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)

julia> Threads.atomic_xchg!(x, 2)
3

julia> x[]
2
Base.Threads.atomic_add! ── 関数
Threads.atomic_add!(x::Atomic{T}, val::T) where T <: ArithmeticTypes

x が保持する値に val を不可分に足します。

x[] += val を不可分に実行し、x古い値を返します。Atomic{Bool} に対しては定義されません。

詳細は LLVM の atomicrmw add 命令のドキュメントを見てください。

julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)

julia> Threads.atomic_add!(x, 2)
3

julia> x[]
5
Base.Threads.atomic_sub! ── 関数
Threads.atomic_sub!(x::Atomic{T}, val::T) where T <: ArithmeticTypes

x が保持する値から val を不可分に引きます。

x[] -= val を不可分に実行し、x古い値を返します。Atomic{Bool} に対しては定義されません。

詳細は LLVM の atomicrmw sub 命令のドキュメントを見てください。

julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)

julia> Threads.atomic_sub!(x, 2)
3

julia> x[]
1
Base.Threads.atomic_and! ── 関数
Threads.atomic_and!(x::Atomic{T}, val::T) where T

x が保持する値を val とのビット単位の論理積で更新します。

x[] &= val を不可分に実行し、x古い値を返します。

詳細は LLVM の atomicrmw and 命令のドキュメントを見てください。

julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)

julia> Threads.atomic_and!(x, 2)
3

julia> x[]
2
Threads.atomic_nand!(x::Atomic{T}, val::T) where T

x が保持する値を val との否定論理積で更新します。

x[] = ~(x[] & val) を不可分に実行し、x古い値を返します。

詳細は LLVM の atomicrmw nand 命令のドキュメントを見てください。

julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)

julia> Threads.atomic_nand!(x, 2)
3

julia> x[]
-3
Base.Threads.atomic_or! ── 関数
Threads.atomic_or!(x::Atomic{T}, val::T) where T

x が保持する値を val との論理和で更新します。

x[] |= val を不可分に実行し、x古い値を返します。

詳細は LLVM の atomicrmw or 命令のドキュメントを見てください。

julia> x = Threads.Atomic{Int}(5)
Base.Threads.Atomic{Int64}(5)

julia> Threads.atomic_or!(x, 7)
5

julia> x[]
7
Base.Threads.atomic_xor! ── 関数
Threads.atomic_xor!(x::Atomic{T}, val::T) where T

x が保持する値を val との排他論理和で更新します。

x[] $= val を不可分に実行し、x古い値を返します。

詳細は LLVM の atomicrmw xor 命令のドキュメントを見てください。

julia> x = Threads.Atomic{Int}(5)
Base.Threads.Atomic{Int64}(5)

julia> Threads.atomic_xor!(x, 7)
5

julia> x[]
2
Base.Threads.atomic_max! ── 関数
Threads.atomic_max!(x::Atomic{T}, val::T) where T

x が保持する値と val の大きい方を x に保存します。

x[] = max(x[], val) を不可分に実行し、x古い値を返します。

詳細は LLVM の atomicrmw max 命令のドキュメントを見てください。

julia> x = Threads.Atomic{Int}(5)
Base.Threads.Atomic{Int64}(5)

julia> Threads.atomic_max!(x, 7)
5

julia> x[]
7
Base.Threads.atomic_min! ── 関数
Threads.atomic_min!(x::Atomic{T}, val::T) where T

x が保持する値と val の小さい方を x に保存します。

x[] = min(x[], val) を不可分に実行し、x古い値を返します。

詳細は LLVM の atomicrmw min 命令のドキュメントを見てください。

julia> x = Threads.Atomic{Int}(7)
Base.Threads.Atomic{Int64}(7)

julia> Threads.atomic_min!(x, 5)
7

julia> x[]
5
Threads.atomic_fence()

逐次一貫性に関するメモリフェンスを敷きます。

処理がプログラムにある順序で逐次的に実行されることを保証するメモリフェンスを敷きます。この機能が必要なアルゴリズム、つまり acquire/release フェンスでは不十分なアルゴリズムが存在します。

これは多くの場合で非常に低速な操作です。Julia の不可分操作は全て acquire/release の意味論を持つので、明示的なフェンスはまず必要にならないはずです。

詳細は LLVM の fence 命令のドキュメントを見てください。

スレッドプールを使った ccall (実験的)

[email protected] ── マクロ
@threadcall((cfunc, clib), rettype, (argtypes...), argvals...)

@threadcall マクロの呼び出し方は ccall と同じですが、処理を他のスレッドで行います。ブロックする C 関数を呼ぶときに @threadcall を使うとメインの julia スレッドをブロックせずに済むので有用です。並行性は libuv のスレッドプールのサイズで制限されます。このサイズのデフォルト値は 4 ですが、環境変数 UV_THREADPOOL_SIZE を変更して julia プロセスを再起動すれば変更できます。

@threadcall で呼び出された関数から Julia を呼び返してはいけないことに注意してください。

低水準同期プリミティブ

これらは通常の同期オブジェクトを作るのに使われる基礎単位です。

SpinLock()

再入可能でない、test-and-test-and-set を使ったスピンロックです。再帰的なロックを試みるとデッドロックします。この種のロックはブロックが起こらずほとんど時間のかからない処理 (IO の実行など) でのみ使われるべきです。基本的には ReentrantLock を使ってください。

lock には必ず unlock が対応する必要があります。

test-and-test-and-set で実装されるスピンロックは競合しているスレッドが 30 程度以下である場合に最速となります。もしそれ以上のスレッドがあるなら、他のアプローチによる同期を考えてみるべきです。