並列コンピューティング
マルチスレッドコンピューティングと並列コンピューティングの初心者にとっては、Julia が提供する並列処理のさまざまなレベルを最初に理解すると便利です。Julia の並列処理は3つの主要なカテゴリに分類することができます:
- Julia コルーチン(グリーンスレッド)
- マルチスレッド
- マルチコア処理または分散処理
まず、Julia タスク(別名 コルーチン)
とJuliaランタイムライブラリに依存する他のモジュールを検討し、直接OSのスケジューラとやりとりをすることなく、タスク間の通信を完全に制御して計算を中断および再開できるようにします。 また、Julia はwait
や fetch
などの操作を通じたタスク間通信をサポートしています。通信とデータの同期は、タスク間通信を提供する導管である Channel
s を介して管理されます。
Julia は 実験的なマルチスレッド もサポートしています。実験的なマルチスレッドでは処理の実行がフォークされ、無名関数がすべてのスレッドで実行されます。 フォーク結合アプローチと呼ばれるもので、並行スレッドは独立して実行されますが、最終的には逐次実行を続行できるようにJulia のメイン スレッドに結合する必要があります。 マルチスレッドは、 Base.Threads
モジュールでサポートされていますが、Julia がまだ完全にスレッドセーフではなく、実験段階のものです。特に、I/O 操作およびタスクの切り替え中にセグメンテーションフォールトが発生する可能性があります。最新情報を得るには Issue tracker
を参照してください。マルチスレッドを使うには、グローバル変数、ロック、およびアトミックを考慮しなくてはなりません。以下でこれらについて説明します。
最後に、分散コンピューティングと並列コンピューティングに対する Julia のアプローチを紹介します。科学的計算での利用を念頭に、Julia はネイティブにインターフェイスを実装しており、複数のコアまたはマシンにプロセスを分散します。 また、MPI.jl
やDistributedArrays.jl
などの分散プログラミングに役立つ外部パッケージについても言及します。
コルーチン
Julia の並列プログラミング プラットフォームでは、 タスク (別名 Coroutines) を使用して複数の計算を切り替えます。 軽量スレッド間で実行順序を指定するには、通信プリミティブが必要です。 Julia には Channel(func::Function, ctype=Any, csize=0, taskref=nothing)
という関数があります。func
から新しいタスクを作成し、型ctype
とサイズcsize
の新しいチャネルにタスクをバインド・スケジュールします。 Channels
はタスク間の通信方法として機能するもので、Channel{T}(sz:Int)
が型 T
とサイズ sz
のバッファリングチャネルを作成するようにします。コードが fetch
や wait
などの通信操作を実行するたびに、現在のタスクは中断され、スケジューラは実行する別のタスクを選択します。 タスクは、待機中のイベントが完了するとまた続きの処理を開始します。
多くの問題では、タスクのことを直接考える必要はありません。ただし、タスクは、同時に処理される複数のイベントを待機するのに使用できるため、動的スケジューリングを行うことができます。動的スケジューリングでは、プログラムは、他のジョブがいつ終了するかに基づいて、何を計算するか、またはどこで計算するかを決定します。これは、予測不能なワークロードや不均衡なワークロードに必要です。こうした状況では、プロセスが現在のタスクを完了した時だけ、暇になったプロセスにより多くのタスクを割り当てる、というようなことをやりたくなります。
チャンネル
制御フローのページの タスク セクションでは、協調的に動作する複数の関数の実行について議論しました。Channel
は、実行中のタスク、特に I/O 操作を含むタスク間でデータを渡すのに非常に役立ちます。
I/O に関連する操作の例としては、ファイルの読み取り/書き込み、Web サービスへのアクセス、外部プログラムの実行などがあります。いずれの場合も、ファイルの読み取り中、または外部サービス/プログラムの完了を待機している間に他のタスクを実行できる場合は、全体的な実行時間を改善できます。
チャネルはパイプとみなすことができます。つまり、チャンネルは "write end" と "read end"を持ちます:
異なるタスクの複数のWriter が、
put!
関数の呼び出し経由で同じチャネルに同時に書き込みできます。異なるタスクの複数のReader は、
take!
呼び出しを介して同時にデータを読み取ることができます。例:
# Given Channels c1 and c2, c1 = Channel(32) c2 = Channel(32) # and a function `foo` which reads items from c1, processes the item read # and writes a result to c2, function foo() while true data = take!(c1) [...] # process data put!(c2, result) # write out result end end # we can schedule `n` instances of `foo` to be active concurrently. for _ in 1:n @async foo() end
チャンネルは、
Channel{T}(sz)
コンストラクタで生成されます。チャンネルは、型Tのオブジェクトのみを保持します。 型が指定されなかった場合は、チャンネルは、どんな型も保持することができます。sz
は、チャンネルでいつでも保持できる最大要素数です。例えば、Channel(32)
で生成されるのは、任意の型のオブジェクトを最大32個保持できるチャンネルを生成します。Channel{MyType}(64)
は、MyType
のオブジェクトを 64個まで、いつでも保持できます。Channel
が空であれば、readerは(take!
関数をよんでも)データが利用可能になるまでブロックされます。Channel
がいっぱいの場合 writerは、(put!
を読んでも) データを書くスペースができるまでブロックされます。isready
は、チャンネル内の任意のオブジェクトの有無についてチェックする関数で、wait
関数はオブジェクトが利用可能になるまで待機するための関数です。Channel
は、最初はオープンの状態です。つまり、take!
関数や、put!
関数経由で、自由に読み書きできる状態で、close
で、チャンネルをクローズできます。 クローズされたチャンネルでは、put!
は失敗します。例えば:julia> c = Channel(2); julia> put!(c, 1) # `put!` on an open channel succeeds 1 julia> close(c); julia> put!(c, 2) # `put!` on a closed channel throws an exception. ERROR: InvalidStateException("Channel is closed.",:closed) Stacktrace: [...]
クローズされたチャンネルへの、
take!
とfetch
(値を取得はしても、削除しません)は、チャンネルが空になるまで、チャンネルに存在する値を正常に返します。上の例を続けると:julia> fetch(c) # Any number of `fetch` calls succeed. 1 julia> fetch(c) 1 julia> take!(c) # The first `take!` removes the value. 1 julia> take!(c) # No more data available on a closed channel. ERROR: InvalidStateException("Channel is closed.",:closed) Stacktrace: [...]
チャンネルは、for
ループないのイテラブルオブジェクトとして使うことができ、チャンネルがデータを持つか、オープンされている状態である限り、ループが回ります。ループ変数はチャンネルに追加された全ての値を引き受けます。for
ループが終了されるのは、チャンネルが閉じられ、データが空になったときです。
たとえば、次の場合、for
ループは(訳注: チャンネルが閉じられていないため) さらなるデータを待機します:
julia> c = Channel{Int}(10);
julia> foreach(i->put!(c, i), 1:3) # add a few entries
julia> data = [i for i in c]
対して、こちらは、チャンネル内の全てのデータを読んだらリターンします:
julia> c = Channel{Int}(10);
julia> foreach(i->put!(c, i), 1:3); # add a few entries
julia> close(c); # `for` loops can exit
julia> data = [i for i in c]
3-element Array{Int64,1}:
1
2
3
タスク間通信にチャネルを使用する簡単な例を考えてみましょう。私たちは、単一のjobs
チャネルからデータを処理する4つのタスクを開始します。ID (job_id
) によって識別されるジョブは、チャネルに書き込まれます。 このシミュレーションの各タスクはjob_id
を読み取り、ランダムな時間をウェイトしてから、job_id
のタプルとシミュレートされた時間の長さを 結果チャネルに書き戻します。最後に、すべてのresults
が印刷されます。
julia> const jobs = Channel{Int}(32);
julia> const results = Channel{Tuple}(32);
julia> function do_work()
for job_id in jobs
exec_time = rand()
sleep(exec_time) # simulates elapsed time doing actual work
# typically performed externally.
put!(results, (job_id, exec_time))
end
end;
julia> function make_jobs(n)
for i in 1:n
put!(jobs, i)
end
end;
julia> n = 12;
julia> @async make_jobs(n); # feed the jobs channel with "n" jobs
julia> for i in 1:4 # start 4 tasks to process requests in parallel
@async do_work()
end
julia> @elapsed while n > 0 # print out results
job_id, exec_time = take!(results)
println("$job_id finished in $(round(exec_time; digits=2)) seconds")
global n = n - 1
end
4 finished in 0.22 seconds
3 finished in 0.45 seconds
1 finished in 0.5 seconds
7 finished in 0.14 seconds
2 finished in 0.78 seconds
5 finished in 0.9 seconds
9 finished in 0.36 seconds
6 finished in 0.87 seconds
8 finished in 0.79 seconds
10 finished in 0.64 seconds
12 finished in 0.5 seconds
11 finished in 0.97 seconds
0.029772311
現在のバージョンの Julia は、すべてのタスクを単一の OS スレッドに多重化します。したがって、I/O 操作を含むタスクは並列実行の恩恵を受けますが、バインドされたタスクは単一の OS スレッドで効果的に順番に実行されます。Julia の将来のバージョンでは、複数のスレッドでのタスクのスケジューリングがサポートされるかもしれません。
マルチスレッド(実験的実装)
タスクに加えて、Julia はマルチスレッドをネイティブにサポートします。 このセクションは実験的なものであり、インターフェイスは将来変更される可能性があることに注意してください。
セットアップ
デフォルトでは、Julia は単一の実行スレッドで起動します。これは、コマンド Threads.nthreads()
を使用して確認できます:
julia> Threads.nthreads()
1
Julia が開始するスレッドの数は、JULIA_NUM_THREADS
という環境変数によって制御されます。 さて、4つのスレッドでジュリアを起動してみましょう:
export JULIA_NUM_THREADS=4
(上記のコマンドは、Linux および OSX 上のボーンシェルで動作します。これらのプラットフォームで C シェルを使用している場合は、export
の代わりに set
というキーワードを使用する必要があります。Windows を使用している場合は、julia.exe
の場所でコマンド ラインを起動し、export
の代わりに set
を使用します。)
4つのスレッドがあることを確認してみましょう。
julia> Threads.nthreads()
4
しかし、現在はマスタースレッドに入っています。確認するには、関数 Threads.threadid
を使用します
julia> Threads.threadid()
1
The @threads
マクロ
ネイティブスレッドを使用して簡単な例を見てみましょう。ゼロの配列を作成してみましょう:
julia> a = zeros(10)
10-element Array{Float64,1}:
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
4つのスレッドを使用して、この配列を同時に操作してみましょう。各スレッドで各要素にスレッド ID を書き込みます。
Julia は Threads.@threads
マクロを使用した並列ループをサポートしています。このマクロをfor
ループの前に追加すると、ループがマルチスレッドで実行すべき領域であることを Julia に指示できます:
julia> Threads.@threads for i = 1:10
a[i] = Threads.threadid()
end
イテレーションスペースはスレッド間で分割され、その後、各スレッドは割り当てられた場所にスレッド ID を書き込みます:
julia> a
10-element Array{Float64,1}:
1.0
1.0
1.0
2.0
2.0
2.0
3.0
3.0
4.0
4.0
Threads.@threads
には、@distributed
のようなオプションの縮小パラメーターがないことに注意してください。
アトミック操作
Julia は、アトミックな、つまり、競合状態を避けるためのスレッドセーフな方法で、値のアクセスと変更をサポートしています 。アトミックにアクセスする必要があることを示す値 (プリミティブ型である必要があります) を Threads.Atomic
としてラップできます。 例を見てみましょう:
julia> i = Threads.Atomic{Int}(0);
julia> ids = zeros(4);
julia> old_is = zeros(4);
julia> Threads.@threads for id in 1:4
old_is[id] = Threads.atomic_add!(i, id)
ids[id] = id
end
julia> old_is
4-element Array{Float64,1}:
0.0
1.0
7.0
3.0
julia> ids
4-element Array{Float64,1}:
1.0
2.0
3.0
4.0
アトミックタグなしで加算を試すと、競合によって間違った答えが得られたかもしれません。競合を避けなかった場合にどうなるかの例は下記の通りです:
julia> using Base.Threads
julia> nthreads()
4
julia> acc = Ref(0)
Base.RefValue{Int64}(0)
julia> @threads for i in 1:1000
acc[] += 1
end
julia> acc[]
926
julia> acc = Atomic{Int64}(0)
Atomic{Int64}(0)
julia> @threads for i in 1:1000
atomic_add!(acc, 1)
end
julia> acc[]
1000
!!!メモ すべてのプリミティブ型を Atomic
タグでラップできるわけではありません。 サポートされている型はInt8
、Int16
、Int32
、Int64
、Int128
、UInt8
、UInt16
、UInt32
、UInt64
、UInt128
、Float16
、Float32
、およびFloat64
です。 さらに Int128
および UInt128
は AAarch32 および ppc64le ではサポートされていません。
副作用と変更可能な関数引数
マルチスレッドを使用時、純粋 でない関数を使う場合には、間違った答えが得られる可能性があるので、注意が必要です。例えば、 名前が!
で終わる関数は、慣習的に引数に指定された値を書き換えるため、純粋な関数ではありません。しかし、副作用を持つにも関わらず、その名前が!
で終わらない関数があります。例えば、 findfirst(regex, str)
は、引数 regex
を変更しますし、rand()
は、Base.GLOBAL_RNG
を変更します :
julia> using Base.Threads
julia> nthreads()
4
julia> function f()
s = repeat(["123", "213", "231"], outer=1000)
x = similar(s, Int)
rx = r"1"
@threads for i in 1:3000
x[i] = findfirst(rx, s[i]).start
end
count(v -> v == 1, x)
end
f (generic function with 1 method)
julia> f() # the correct result is 1000
1017
julia> function g()
a = zeros(1000)
@threads for i in 1:1000
a[i] = rand()
end
length(unique(a))
end
g (generic function with 1 method)
julia> Random.seed!(1); g() # the result for a single thread is 1000
781
このような場合は、競合状態に陥るの可能性を避けるためにコードを再設計するか、同期プリミティブを使ってください。
たとえば、上記の findfirst
の例を解決するには、スレッドごとに変数rx
を別々にコピーして使う必要があります:
julia> function f_fix()
s = repeat(["123", "213", "231"], outer=1000)
x = similar(s, Int)
rx = [Regex("1") for i in 1:nthreads()]
@threads for i in 1:3000
x[i] = findfirst(rx[threadid()], s[i]).start
end
count(v -> v == 1, x)
end
f_fix (generic function with 1 method)
julia> f_fix()
1000
r"1"
の代わりにRegex("1")
を使って Julia が rx
ベクトルのそれぞれのエントリに対して、別々のRegex
オブジェクトを生成するようにしています。
rand
の例については、もう少し複雑です。各スレッドが重複しない擬似乱数シーケンスを使用するようにする必要があるためです。これは、単に Future.randjump
関数を使用して保証することができます:
julia> using Random; import Future
julia> function g_fix(r)
a = zeros(1000)
@threads for i in 1:1000
a[i] = rand(r[threadid()])
end
length(unique(a))
end
g_fix (generic function with 1 method)
julia> r = let m = MersenneTwister(1)
[m; accumulate(Future.randjump, fill(big(10)^20, nthreads()-1), init=m)]
end;
julia> g_fix(r)
1000
r
ベクトルをg_fix
に渡しているのは、複数の RGN を生成するのがコストのかかる処理であるためで、関数を実行するたびにRGNの生成を繰り返す必要はありません。
@threadcall(実験的な実装)
Julia では、すべての I/O タスク、タイマー、REPL コマンドなどは、イベント ループを介して 1 つの OS スレッドに多重化されています。これは、libuv ([http://docs.libuv.org/en/v1.x/])のパッチバージョンで提供している機能です。"yield" は、複数のタスクを同じ OS スレッドに同時にスケジューリングするための機能を提供します。I/O タスクとタイマーはイベントが発生するのを待機している間に暗黙的に生成されます。yield
を呼び出すと、(訳者註: 現在処理しているタスクが中止されるので) 他のタスクがスケジュールされるように明示的に許可を与えることができます。
したがって、ccall
を実行するタスクは、呼び出しから制御が戻るまで Julia スケジューラが他のタスクを実行するのを効果的に防ぎます。これは、外部ライブラリへのすべての呼び出しに当てはまります。例外は、逆にJulia を呼び出すカスタム C コードを呼び出す場合 (その後、あとから yield する可能性があります) または jl_yield()
(これはyield
のCバージョンです) を呼び出すCコードを、呼び出す場合です。
Julia コードは(既定では) 単一のスレッド で実行されますが、Julia が使用するライブラリは独自の内部スレッドを起動する場合があることに注意してください。たとえば、BLAS ライブラリは、マシン上のコアと同じ数のスレッドを開始できます。
@threadcall
マクロは、 ccall
が、Juliaのメインイベントループをブロックして欲しくない時に使うことができます。別のスレッドで実行する C 関数をスケジュールします。これには、デフォルト・サイズが 4 のスレッド・プールが使用されます。スレッドプールのサイズは、環境変数 UV_THREADPOOL_SIZE
で設定できます。スレッドが空くのを待機している間、およびスレッドが使用可能になって関数が実行されている間に、(メイン Julia イベント ループ上で)C関数のコールを要求するタスクは他のタスクにyield します。@threadcall
を使うと、その実行完了まで制御が戻らないことに注意してくださいしたがって、ユーザーの観点から見ると、他の Julia API と同様にブロッキング呼び出しが行われますように見えます。
非常に重要なことですが、呼び出された関数がセグメンテーションフォルトになっても、Julia にコールバックしません。
@threadcall
は、Julia 将来のバージョンで削除/変更される可能性があります。
マルチコア処理または分散処理
Julia 付属の標準ライブラリの一部である、モジュール 'Distributed' を用いて、分散メモリ並列処理を実装できます。
現代のコンピュータの多くは複数の CPU を搭載しており、複数のコンピュータをクラスタ内で組み合わせることができます。これらの複数の CPU のパワーを利用することで、多くの計算をより迅速に完了できます。パフォーマンスに影響を与える主な要因には、CPU 自体の速度とメモリへのアクセス速度の 2 つがあります。クラスタでは、ある CPU が最速でアクセスできるのは、CPUと同じコンピュータ(ノード)内のRAMであることは明らかです。より驚くべきことに、典型的なマルチコアラップトップでも、メインメモリとキャッシュの速度の違いによって同様の問題が起きています。したがって、優れたマルチプロセッシング環境では、特定の CPU によるメモリチャンクの「所有権」を制御できる必要があります。 Julia は、メッセージ・パッシングに基づくマルチプロセッシング環境を提供し、プログラムが別々のメモリ・ドメイン内の複数のプロセスで一度に実行できるようにします。
Julia のメッセージパッシングの実装は、MPI [1] などの他の環境とは異なります。 Julia の通信は一般に"一方向" です。プログラマが明示的に管理する必要があるのは、2 つのプロセスのうち、1 つのプロセスのみです。さらに通常、これらの操作は見かけ上、"メッセージ送信" や "メッセージ受信" のような形でなく、ユーザー関数の呼び出しのような高レベルの操作に似た形になります。
Julia の分散プログラミングは、リモート参照 と リモート呼び出しの 2 つのプリミティブに基づいて構築されています。 リモート参照は、任意のプロセスから特定のプロセスに格納されているオブジェクトを参照するために使用できるオブジェクトです。リモート呼び出しとは、あるプロセスが別の (おそらく同じ) プロセスの特定の引数に対して特定の関数を呼び出すリクエストのことです。
リモート参照には、Future
とRemoteChannel
の2つのフレーバーがあります。
リモート呼び出しを行うと、Future
が戻り値として得られ、さらに、プロセスの制御は、呼び出し側に直ちに返されます; すなわち、呼び出しを行ったプロセスは、リモート呼び出しが別の場所で行われる間、次の操作に進みます。リモート呼び出しから返された Future
に対する wait
関数を呼び出すことで、呼び出したプロセスが終了するのを待つことができます。また、fetch
を使って呼び出したプロセスの結果全ての値を取得することができます。
一方、RemoteChannel
は書き換え可能です。たとえば、複数のプロセスが同じリモートの 'Channel' を参照して処理を調整することができます。
各プロセスには、識別子が関連付けられます。インタラクティブなJulia プロンプトを提供するプロセスは、常に1に等しいid
を持っています。並列処理にデフォルトで使用されるプロセスは、"ワーカー" と呼ばれます。プロセスが 1 つしかない場合、プロセス 1 はワーカーと見なされます。それ以外の場合、ワーカーはプロセス 1 以外のすべてのプロセスと見なされます。
それではやってみましょう。julia -p n
と julia
を起動することで、ローカルマシン上のn
個のワーカープロセスを使うことができます。一般に、'n' がマシン上の CPU スレッド (論理コア) の数と等しくするのが理にかなっているでしょう。オプション引数 -p
は Distributed
モジュールを暗黙的に読み込むことに注意してください。
$ ./julia -p 2
julia> r = remotecall(rand, 2, 2, 2)
Future(2, 1, 4, nothing)
julia> s = @spawnat 2 1 .+ fetch(r)
Future(2, 1, 5, nothing)
julia> fetch(s)
2×2 Array{Float64,2}:
1.18526 1.50912
1.16296 1.60607
remotecall
の最初の引数は、リモートで呼び出す関数です。Julia のほとんどの並列プログラミングでは、特定のプロセスや使用可能なプロセスの数は参照されませんが、remotecall
は、細かい制御を提供する低レベルのインターフェイスで、2番目の引数で、作業を行うプロセスのid
を指定できます。残りの引数は呼び出される関数に渡されます。
ご覧の通り、最初の行ではプロセス 2 に 2✕2 のランダム行列を作成するよう依頼し、2 行目では 1 を追加するように依頼しました。両方の計算の結果は、r
と s
の 2 つのfuture
で使用できます。@spawnat
マクロは、第一引数で指定されたプロセスid で、第二引数で指定された式を評価します。
場合によっては、リモートで計算された値がすぐに必要になる場合があります。典型例は、次のローカル操作でリモートの処理結果が必要になり、リモート オブジェクトから値を読み取る必要がある、という場合で、こういう時には、remotecall_fetch
が使えます。これは fetch(remotecall(...))
とするのと同等ですが、より効率的です(訳者註: 効率的というのはタイプ量が減るという意味で? それとも余計なオブジェクトができない、というリソース的な意味で? )。
julia> remotecall_fetch(getindex, 2, r, 1, 1)
0.18526337335308085
getindex(r,1,1)
は、r[1,1]
と等しいことを覚えて置いてください。この呼び出しはfutureのr
の最初の要素をフィッチします。
To make things easier, the symbol :any
can be passed to [@spawnat
], which picks where to do the operation for you:
julia> r = @spawnat :any rand(2,2)
Future(2, 1, 4, nothing)
julia> s = @spawnat :any 1 .+ fetch(r)
Future(3, 1, 5, nothing)
julia> fetch(s)
2×2 Array{Float64,2}:
1.38854 1.9098
1.20939 1.57158
1 .+ r
ではなく、 1 .+ fetch(r)
を使ったところに注意してください。 これは、コードがどこで実行されるかわからないため、一般的に、加算を行うプロセスに r
を移動するために fetch
が必要になる場合があります。この場合、@spawnat
はr
を所有するプロセスで計算を実行するのに十分にスマートな方法になっていて、fetch
はno-op(作業は行われない)ことになります。
( @spawnat
は組み込み関数ではなく、Julia で マクロ として定義されていること注目する価値があります。このような仕組みを自分で独自に定義することは可能です。)
覚えておくべき重要なことは、一度結果がフェッチされると、Future
はその値をローカルにキャッシュするということです。さらにfetch
呼び出しは、ネットワークホップを伴いません。一度、Future
s のすべての参照が取得されると、リモートに保存された値は削除されます。
@spawnat
に似たものに、@async
がありますが、@async
でタスクが実行できるのはローカルプロセスのみです。@async
を使うと、タスク毎にフィーダータスクが生成されます。それぞれのタスクは、計算する必要があるインデックスを選択し、そのプロセスが終了するのを待つ、ということを、インデックスがなくなるまで繰り返します。フィーダー タスクは、メインタスクが @sync
ブロックの最後に達するまで実行を開始せず、その時点で制御が放棄され、関数から戻る前にすべてのローカル タスクが完了するのを待つことに注意してください。 Julia の v0.7 以降では、フィーダー タスクはすべて同じプロセスで実行されるため、nextidx
を介して各タスクの状態を共有できます。 タスクが協調的にスケジュールされている場合でも、非同期 I/Oのように、ロックが必要な場合があります。 つまり、コンテキストスイッチは、remotecall_fetch
が呼び出されたときに、明確に定義されたポイントでのみ発生します: これは現在の実装の状態であり、将来的には変更されるかもしれませんが、現在は、M
個のプロセスに対してN
個のタスクを実行することが可能なように実装されています。これを別名M:Nスレッドと言います。ロックの取得/リリースモデルをnextidx
に対して用いる必要があります。複数のプロセスが、同時にリソースを読み書きできるのは危険なためです。
コードの可用性とパッケージの読み込み
何かコードを実行するならば、実行プロセスからそのコードが利用可能でなければなりません。例えば、Julia のプロンプトに次のように入力してみましょう:
julia> function rand2(dims...)
return 2*rand(dims...)
end
julia> rand2(2,2)
2×2 Array{Float64,2}:
0.153756 0.368514
1.15119 0.918912
julia> fetch(@spawnat :any rand2(2,2))
ERROR: RemoteException(2, CapturedException(UndefVarError(Symbol("#rand2"))
Stacktrace:
[...]
プロセス1は rand2
関数を知っていましたが、プロセス2は知りませんでした。
大抵の場合は、コードをファイルやパッケージからロードすることで、あなたはかなり柔軟にプロセスがロードするコードを制御することができます。下記のコードを含む、DummyModule.jl
というファイルを考えてみましょう:
module DummyModule
export MyType, f
mutable struct MyType
a::Int
end
f(x) = x^2+1
println("loaded")
end
すべてのプロセスで MyType
を参照するには、すべてのプロセスに DummyModule.jl
をロードする必要があります。 include("DummyModule.jl")
を呼び出すと、単一のプロセスでのみ、コードが読み込まれます。すべてのプロセスにロードするには、@everywhere
マクロを使用します(Juliaを julia -p 2
で開始して):
julia> @everywhere include("DummyModule.jl")
loaded
From worker 3: loaded
From worker 2: loaded
いつも(シングルコアのケース)と同様に、上記は、DummyModule
を全てのプロセスでスコープ内に持ち込むものでありません。using
や import
をする必要があります。さらに、あるプロセスで DummyModule
がスコープに入ったからと言って、他のプロセスのスコープに DummyModule
はありません:
julia> using .DummyModule
julia> MyType(7)
MyType(7)
julia> fetch(@spawnat 2 MyType(7))
ERROR: On worker 2:
UndefVarError: MyType not defined
⋮
julia> fetch(@spawnat 2 DummyModule.MyType(7))
MyType(7)
ただし、DummyModule
がスコープになくても、ロードされていれば、そのプロセスに対して、例えば、MyType
を送信することは可能です:
julia> put!(RemoteChannel(2), MyType(7))
RemoteChannel{Channel{Any}}(2, 1, 13)
また、ファイルは、Julia 起動時に、-L
フラグオプションで複数プロセスにプリロードでき、実行対象の"driver.jl" スクリプトが 計算を駆動することができます:
julia -p <n> -L file1.jl -L file2.jl driver.jl
上記の例で、ドライバスクリプトを実行している Julia プロセスは、対話型プロンプトを提供するプロンプトと同様にプロセスid
1です。
最後に、もし、DummyModule.jl
が、スタンドアロンのファイルではなくパッケージだった場合 using DummyModule
とすると、DummyModule.jl
のコードは全てのプロセスで ロード されますが、DummyModule
がスコープにはいるのは、using
が実行されたプロセスだけになります。
ワーカー プロセスの開始と管理
Julia をインストールすると、以下2種類のクラスタをサポートする機能が含まれています:
- 上述の
-p
オプションで指定するローカルクラスタ (ノード内マルチコア)。 --machine-file
オプションをつかった複数計算機をまたぐクラスタ。これはパスワード無しのssh
ログインを使って、現在のホストと同じ経路で、指定された計算機のJulia のワーカープロセスを起動します。
addprocs
, rmprocs
, workers
などの関数で、クラスタ内のプロセスを追加・削除・問い合わせするプログラミング手段が提供されます。
julia> using Distributed
julia> addprocs(2)
2-element Array{Int64,1}:
2
3
Distributed
モジュールは、addprocs
を呼び出す前に、マスターとなるプロセスに明示的にロードする必要があります。 ワーカー プロセスで自動的に使用可能になります。
ワーカーは ~/.julia/config/startup.jl
の起動スクリプトを実行せず、グローバル状態 (グローバル変数、新しいメソッド定義、読み込まれたモジュールなど) も他の実行中プロセスと同期されないことに注意が必要です。
他のタイプのクラスターは、ClusterManager
セクションで説明するように、独自のClusterManager
を記述することによってサポートできます。
データの移動
メッセージの送信とデータの移動は、分散プログラムのオーバーヘッドの大部分を占めています。パフォーマンスとスケーラビリティを実現するには、メッセージの数と送信されるデータ量を減らすことが重要です。 そのためには、Julia のさまざまな分散プログラミングコンストラクトによって実行されるデータ移動を理解することが重要です。
fetch
は、オブジェクトをローカル マシンに移動することを直接要求するため、明示的なデータ移動操作と見なすことができます。@spawnat
(およびいくつかの関連する構成要素) もデータを移動しますが、これは明示的でないため、暗黙的なデータ移動操作と呼ぶことができます。ランダム行列を構築および二乗する方法を以下の2つ考えてみましょう:
方法 1:
julia> A = rand(1000,1000);
julia> Bref = @spawnat :any A^2;
[...]
julia> fetch(Bref);
方法 2:
julia> Bref = @spawnat :any rand(1000,1000)^2;
[...]
julia> fetch(Bref);
違いは些細に見えますが、実際には@spawnat
の動作を考えると非常に重要な違いです。 最初の方法では、ランダム行列がローカルで構築され、その後、別のプロセス送信されて、そこで2乗されます。2 番目の方法では、ランダム行列が別のプロセスで構築された後二乗されます。したがって、2 番目のメソッドのデータ送信量は、最初のメソッドでの送信量よりはるかに少ないです。
このToy example では、これら2つの方法を区別し、選択するのは簡単です。しかし、実際のプログラムでデータの移動を設計するには、より多くのことを考慮し、おそらくはなんらかの測定が必要になる場合があります。たとえば、最初のプロセスに行列 A
が必要な場合は、最初のメソッドの方が適している可能性があります。または、A
の計算が高価で、現在のプロセス(ローカルのプロセス)だけでそれが計算されているのならば、別のプロセスへのデータ転送は避けられないかもしれません。または、現在のプロセスで @spawnat
と fetch(Bref)
の間で行うことはほとんどない場合は、並列処理を完全に排除する方が良いかもしれません。または、rand(1000,1000)
がより高価な操作に置き換えられると想像してみてください。次に、この手順に@spawnat
ステートメントを追加するのが理にかなっている場合があります。
グローバル変数
@spawnat
を介してリモートで実行される式、または remotecall
を使用してリモート実行用に指定されたクロージャは、グローバル変数を参照できます。モジュール Main
のグローバル バインディングは、他のモジュールのグローバル バインディングとは少し異なる方法で扱われます。次のコード スニペットについて考えてみましょう:
A = rand(10,10)
remotecall_fetch(()->sum(A), 2)
この場合、sum
はリモート プロセスで定義する必要があります。 A
は、ローカル ワークスペースで定義されたグローバル変数であることに注意してください。ワーカープロセス 2 は、Main
の下で呼び出された変数A
を所有していません。クロージャー ()->sum(A)
をワーカープロセス2 に送られた結果として、Main.A
がワーカプロセス2に定義されます。 remotecall_fetch
から制御がローカルに戻ったあとも、main.A
はワーカープロセス 2 に存在し続けます。 埋め込みグローバル参照 (Main
モジュールのみ) を伴う、リモート呼び出しは、グローバル変数を下記のように管理します:
新しいグローバルバインディングがリモート呼び出しの一部として参照されている場合には、そのグローバルバインディングは、ワーカー上に作成されます。
グローバル定数は、リモート ノードでも定数として宣言されます。
グローバルは、ワーカープロセスに再送信されるのは、リモート呼び出しのコンテキストで、さらにその値が変更された場合にのみです。また、クラスターはノード間でグローバル バインディングを同期しません。 例えば:
A = rand(10,10) remotecall_fetch(()->sum(A), 2) # worker 2 A = rand(10,10) remotecall_fetch(()->sum(A), 3) # worker 3 A = nothing
上記のスニペットを実行すると、ノード 1 の
Main.A
の値は "nothing" に設定されているのですが、ワーカー 2のMain.A
は、ワーカー3のそれと異なる値を持ちます。
これでお分かりのように、グローバルに関連付けられたメモリはマスターで別の値が代入されたときにに収集される可能性がありますが、ワーカーではそのようなアクションは実行されません。バインディングが引き続き有効だからです。 clear!
を使用すると、リモートノード上の特定のグローバルを、不要になったときに手動で nothing
を再代入できます。これにより、通常のガベージ コレクション サイクルの一部として、それらに関連付けられているすべてのメモリが解放されます。
したがって、プログラムではリモート呼び出しでのグローバル参照をする時は、注意が必要です。実際には、可能であればリモートでのグローバル参照は完全に避けるのが好ましいです。グローバルを参照する必要がある場合は、let
ブロックを使用してグローバル変数をローカライズすることを検討してください。
例えば:
julia> A = rand(10,10);
julia> remotecall_fetch(()->A, 2);
julia> B = rand(10,10);
julia> let B = B
remotecall_fetch(()->B, 2)
end;
julia> @fetchfrom 2 InteractiveUtils.varinfo()
name size summary
––––––––– ––––––––– ––––––––––––––––––––––
A 800 bytes 10×10 Array{Float64,2}
Base Module
Core Module
Main Module
ここでわかるように、グローバル変数 A
は、ワーカー2で定義されていますが、B
はローカル変数として補足されるため、ワーカー2では B
のバインディングは存在しません。
並行マップと並行ループ
幸いにも、便利な並列計算の多くは、データの移動を必要としません。一般的な例は、複数のプロセスが独立したシミュレーション試行を同時に処理できるモンテカルロシミュレーションです。2つのプロセスでコインを反転させるために@spawnat
を使用することができます。まず、count_heads.jl
に次の関数を書きます:
function count_heads(n)
c::Int = 0
for i = 1:n
c += rand(Bool)
end
c
end
関数 count_heads
は単に n
個のランダムビットの和をとります。2 台のマシンで試行を実行し、それぞれの結果の和をとる方法を次に説明します:
julia> @everywhere include_string(Main, $(read("count_heads.jl", String)), "count_heads.jl")
julia> a = @spawnat :any count_heads(100000000)
Future(2, 1, 6, nothing)
julia> b = @spawnat :any count_heads(100000000)
Future(3, 1, 7, nothing)
julia> fetch(a)+fetch(b)
100001564
この例で示されているのは、、並列プログラミングにおける、強力かつ頻出のパターンです。多くのイテレーションは複数のプロセスで独立に実行され、その結果は何らかの関数を使用して統合されます。その統合のプロセスはreduceと呼ばれます。その処理は、一般的にテンソルランク低減で、数値ベクトルが単一の数値に減少するとか、行列が単一の行または列などに減らされるような処理だからです。コードでは、通常、x
がアキュムレータ、f
がreduce関数、v[i]
が削減される要素であるパターンで、 x= f(x,v[i])
のようなコードになります。f
は、操作が実行される順序に関係ない、つまり結合律を満たすことが望ましいです。
count_heads
を使用したこのパターンの使用は一般化できることに注意してください。2 つの明示的な @spawnat
ステートメントを使用して、並列処理を 2 つのプロセスに制限しました。任意の数のプロセスで実行するには、分散メモリで実行される parallel for loop を、次のように @distributed
を使って書くことができます:
nheads = @distributed (+) for i = 1:200000000
Int(rand(Bool))
end
このコンストラクトは、繰り返し処理を複数のプロセスに計算させて、その結果を、指定された reduce 処理(この場合は (+)
)を使って統合する 実装パターンを実現しています。各反復の結果としてループ内の最後の式の値が受け取られます。並列ループ式全体が、最終的な計算結果として評価されます。
並行処理の for ループは、逐次処理の for ループと似ているようにみえますが、その動作は大きく異なることに注意が必要です。特に、反復は指定された順序で行われないこと、反復が異なるプロセスで実行されるので、変数または配列への書き込みはグローバルからは見えないこと等に注意しましょう。並列ループ内で使用される変数すべては、各プロセスにコピーされ、ブロードキャストされます。
たとえば、次のコードは意図したとおりに機能しません:
a = zeros(100000)
@distributed for i = 1:100000
a[i] = i
end
このコードは、a
の全ての要素を初期化できません。各プロセスには個別のコピーがあるからです。このような並行 for ループは避けなければいけません。さいわい、 Shared Array を使用して、この制限を回避できます:
using SharedArrays
a = SharedArray{Float64}(10)
@distributed for i = 1:10
a[i] = i
end
並列ループにおける "外部" 変数の使用は、変数が読み取り専用の場合に全く問題有りません:
a = randn(1000)
@distributed (+) for i = 1:100000
f(a[rand(1:end)])
end
ここでは、各反復は、すべてのプロセスで共有されるベクトル a
からランダムに選択されたサンプルに f
を適用します。
ご覧の通り、reduce 演算子は必要ない場合は省略できます。その場合、ループは非同期的に実行され、つまり使用可能なすべてのワーカーに対して独立したタスクが生成され、完了を待たずにすぐに Future
の配列を返します。呼び出し元は、fetch
を呼び出して Future
の完了を後で待つこともできますし、@sync
をつけて、つまり、 @sync @distributed for
ループを使って、ループの最後で完了を待つことができます。
場合によっては、reduce 演算子は必要なく、ある範囲のすべての整数 (または一般的には、一部のコレクション内のすべての要素) に関数を適用するだけということもあるでしょう。これは、パラレル マップと呼ばれる便利な処理で、Julia ではpmap
関数として実装されています。たとえば、複数の大きなランダム行列の特異値を次のように並列に計算できます:
julia> M = Matrix{Float64}[rand(1000,1000) for i = 1:10];
julia> pmap(svdvals, M);
Julia のpmap
は、1回の関数呼び出しのなかで大量の処理を行う場合を想定して設計されています。対して、@distributed for
は、各反復が小さく、おそらく単に 2 つの数値を合計するだけのような状況を処理できます。並列計算にはpmap
と@distributed for
の両方でワーカー プロセスのみが使用されます。@distributed
の場合、最終的なreduce処理は呼び出しプロセスで行われます。
リモート参照と抽象チャネル
リモート参照は常にAbstractChannel
の実装を指します。
抽象チャンネル
(「チャネル」など)の具体的な実装は、put!
、take!
、fetch
、isready
、wait
をインストールするのに必要です。 Future
が参照するリモートオブジェクトは、 Channel{Any}(1)
すなわち サイズ1でAny
型を保持するチャンネルです。
RemoteChannel
は、再書き込み可能であり、チャネルの任意のタイプとサイズ、またはAbstractChannel
の他の実装を指すことができます。
コンストラクタ RemoteChannel(f::Function,pid)()
を使用すると、特定の型の複数の値を保持するチャネルへの参照を構築できます。f
は pid
で実行される関数であり、AbstractChannel
を返す必要があります。
たとえば、RemoteChannel(()->Channel{Int}(10), pid)
は、Int
型でサイズ 10のチャネルへの参照を返します。 チャネルはワーカー pid
に存在します。
RemoteChannel
上のメソッド put!
, take!
, fetch
, isready
and wait
はリモートプロセスのバッキングストアにプロキシされます。
よって、RemoteChannel
はユーザーが実装した AbstractChannel
オブジェクトを参照するのに使うことができます。これのシンプルな例はExamples repositoryのdictchannel.jl
に見ることができます。これは辞書をリモートストアとして辞書を使うような例です。
チャネルとリモートチャネル
Channel
は、プロセスに対してローカルなものです。ワーカー2は、ワーカー3のChannel
を直接参照することはできません。ただし、RemoteChannel
は、ワーカーをまたいで値のput/take
を行うことができます。RemoteChannel
は、Channel
に対する ハンドル と考えることができます。RemoteChannel
に関連付けられたプロセスIDpid
は、バッキングストア、すなわち "backingChannel
" が存在するプロセスを識別します。RemoteChannel
を参照するプロセスは、RemoteChannel
から、値のput/take
を行うことができます。 データはRemoteChannel
が関連付けられているプロセスに自動的に送信(または取得)されます。Channel
をシリアル化すると、チャネルに存在するデータもシリアル化されます。したがって、逆シリアル化すると、元のオブジェクトのコピーを効果的に作成できます。- 一方、
RemoteChannel
のシリアル化には、ハンドルによって参照されるChannel
の場所とインスタンスを識別する識別子のシリアル化のみが含まれます。任意のワーカー上の 逆シリアル化されたRemoteChannel
オブジェクトは、元のバッキング ストアと同じバッキング ストアを指します。
上記のチャネル例は、以下に示すように、プロセス間通信用に変更できます。
ここでは、ジョブを格納する 単一のリモートチャネルを処理するために4つのワーカーを立ち上げます。ID (job_id
) によって識別されるジョブは、チャネルに書き込まれます。このシミュレーションでリモートで実行している各タスクは job_id
を読み取り、ランダムな時間だけ待機し、job_id
のタプル、時間、および独自のpid
を "results" チャネルに書き戻します。最後に、すべての結果がマスタープロセスにプリント出力されます。
julia> addprocs(4); # add worker processes
julia> const jobs = RemoteChannel(()->Channel{Int}(32));
julia> const results = RemoteChannel(()->Channel{Tuple}(32));
julia> @everywhere function do_work(jobs, results) # define work function everywhere
while true
job_id = take!(jobs)
exec_time = rand()
sleep(exec_time) # simulates elapsed time doing actual work
put!(results, (job_id, exec_time, myid()))
end
end
julia> function make_jobs(n)
for i in 1:n
put!(jobs, i)
end
end;
julia> n = 12;
julia> @async make_jobs(n); # feed the jobs channel with "n" jobs
julia> for p in workers() # start tasks on the workers to process requests in parallel
remote_do(do_work, p, jobs, results)
end
julia> @elapsed while n > 0 # print out results
job_id, exec_time, where = take!(results)
println("$job_id finished in $(round(exec_time; digits=2)) seconds on worker $where")
global n = n - 1
end
1 finished in 0.18 seconds on worker 4
2 finished in 0.26 seconds on worker 5
6 finished in 0.12 seconds on worker 4
7 finished in 0.18 seconds on worker 4
5 finished in 0.35 seconds on worker 5
4 finished in 0.68 seconds on worker 2
3 finished in 0.73 seconds on worker 3
11 finished in 0.01 seconds on worker 3
12 finished in 0.02 seconds on worker 3
9 finished in 0.26 seconds on worker 5
8 finished in 0.57 seconds on worker 4
10 finished in 0.58 seconds on worker 2
0.055971741
リモート参照と分散ガベージ コレクション
リモート参照で参照されるオブジェクトは、クラスター内に保持されている全ての 参照が削除された場合にのみ解放できます。
値が格納されているノードは、どのワーカーがその値を参照しているかを追跡します。 RemoteChannel
または(フェッチされていない)Future
がワーカーにシリアル化されるたびに、参照によって指し示されたノードが通知されます。また、RemoteChannel
または(フェッチされていない)Future
がローカルに収集されるたびに、値を所有するノードが再度通知されます。これは、内部クラスター対応シリアライザーで実装されます。リモート参照は、実行中のクラスターのコンテキストでのみ有効です。通常の IO
オブジェクトからの参照、通常の IO
オブジェクトへの参照のシリアル化と逆シリアル化はサポートされていません。
通知は、「トラッキング」メッセージの送信を介して行われます。このメッセージとは、参照が別のプロセスにシリアル化されるときには、「参照の追加」メッセージであり、参照がローカルのガベージ コレクションで処理されたときには「参照を削除」メッセージに相当します。
Future
は、書き込みが一度で、ローカルにキャッシュされるので、 fetch
の振る舞いは、その値を所有しているノードの参照追跡情報の更新も行います。
値を所有するノードは、値へのすべての参照がクリアされると、値(のメモリを)解放します。
Future
を使用すると、既にフェッチされた Future
を別のノードにシリアル化すると、(参照だけでなく)値そのものも送信されます。元のリモート ストアが現時点までの間に値を収集した可能性があるためです。
いつオブジェクトがローカルに収集されるかは、オブジェクトのサイズとシステム内の現在のメモリ負荷によって異なる、ということは注意すべきです。
リモート参照の場合、リモートノードにある値が非常に大きくなる可能性があるのですが、ローカル参照オブジェクトのサイズは非常に小さくなります。ローカルオブジェクトはすぐに収集されないことがあるため、RemoteChannel
のローカルインスタンスや、フェッチされていないFuture
に対して、finalize
を明示的に呼ぶのはよいアイディアです。Future
に対してfetch
を呼ぶと、その参照もリモートストアから削除されてしまうので、フェッチされたFuture
については、finalizeする必要はありません。明示的にfinalize
を呼ぶとリモートノードに対してすぐにメッセージが送信されて、処理が先に進み、値への参照が削除されます。
いったんファイナライズされると、参照は無効になり、それ以降の呼び出しでは使用できません。
ローカル呼び出し
データは、処理が実行されるリモートノードに必ずコピーされます。これは、remotecallされた場合、データがRemoteChannel
に格納されたり別のノードで Future
が呼ばれた場合でも同様です。予想できるように、これはリモートノードでのシリアル化されたオブジェクトのコピーとなります。すなわち、呼び出しを行うプロセスのIDと、リモートのノードのIDは同じで、ローカル呼び出しとして実行されます。大抵の場合(常に、ではありません)は、別のタスクで実行され、しかしそこではデータのシリアライズ/逆シリアライズの処理は発生しません。呼び出しでは、受け渡されたオブジェクトインスタンスと全く同じものを参照して、コピーはつくられません。ここで述べた振る舞いは、以下の例でよく分かります:
julia> using Distributed;
julia> rc = RemoteChannel(()->Channel(3)); # RemoteChannel created on local node
julia> v = [0];
julia> for i in 1:3
v[1] = i # Reusing `v`
put!(rc, v)
end;
julia> result = [take!(rc) for _ in 1:3];
julia> println(result);
Array{Int64,1}[[3], [3], [3]]
julia> println("Num Unique objects : ", length(unique(map(objectid, result))));
Num Unique objects : 1
julia> addprocs(1);
julia> rc = RemoteChannel(()->Channel(3), workers()[1]); # RemoteChannel created on remote node
julia> v = [0];
julia> for i in 1:3
v[1] = i
put!(rc, v)
end;
julia> result = [take!(rc) for _ in 1:3];
julia> println(result);
Array{Int64,1}[[1], [2], [3]]
julia> println("Num Unique objects : ", length(unique(map(objectid, result))));
Num Unique objects : 3
ご覧の通り、ローカルで所有されたRemoteChannel
とv
に対してput!
を(複数回)呼ぶ場合、その複数回の呼び出しの間に v
を修正すると、単一のオブジェクトインスタンスが格納されます。対象的に、rc
を所有するノードが別のノードである時、(その時々の)v
のコピーが生成されるのと対象的です。
この振る舞いは、一般的には大した問題にはならないと思ってください。この振る舞いを考慮する必要があるとすれば、それは、オブジェクトがローカルに格納され、その後 (put!
などの)呼び出しの後で、そのオブジェクトが変更された状況だけです。このような場合は、オブジェクトの「ディープコピー」を保存するのが適切でしょう。
これは、次の例に示すように、ローカル ノードのremotecall
にも当てはまります:
julia> using Distributed; addprocs(1);
julia> v = [0];
julia> v2 = remotecall_fetch(x->(x[1] = 1; x), myid(), v); # Executed on local node
julia> println("v=$v, v2=$v2, ", v === v2);
v=[1], v2=[1], true
julia> v = [0];
julia> v2 = remotecall_fetch(x->(x[1] = 1; x), workers()[1], v); # Executed on remote node
julia> println("v=$v, v2=$v2, ", v === v2);
v=[0], v2=[1], false
ここでもまたご覧の通り、ローカル ノードへの remotecall
は、直接呼び出しと同様に機能します。 ローカルノードへの remotecall
は、引数として渡されたローカル オブジェクトを変更します。リモート呼び出しでは、引数のコピーに対する処理になります。
繰り返しますが、一般的にこのふるまいが問題になることはありません。もし、ローカル ノードがコンピューティング ノードとしても使用され、引数が ローカル呼び出しの後にも使用されるとすれば、ここで触れた振る舞いを考慮する必要があります。具体的には、必要に応じて引数のディープコピーを用意してローカルノードで呼び出された呼び出しに渡す必要があります。しかし、リモート ノードの呼び出しは、常に引数のコピーで動作します。
共有配列
共有アレイは、システムの共有メモリを使用して、複数のプロセスで同じ配列をマップします。 SharedArray
は、 DArray
とはいくつかの類似点があるものの、 その動作は全く異なります。 DArray
では、 各プロセスはデータチャンクに対するローカルアクセス権を持ち、 2つのプロセスが同じチャンクを共有しません。 対照的に、 SharedArray
では、配列の共有に「参加している」プロセスはアレイ全体にアクセスできます。 SharedArray
は、同じマシン上の複数のプロセスが共同で、大量のデータアクセスさせたい場合に適しています。
共有配列にサポートするには、参加するすべてのワーカーで SharedArray
モジュールを明示的にロードする必要があります。
SharedArray
の添字アクセス (値の代入と参照) は通常の配列と同様に機能します。また、実際に値が格納されているメモリにローカル プロセスからアクセスできるため効率的です。したがって、(マルチプロセス想定で実装された) ほとんどのアルゴリズムは、単一プロセスモードだとしても SharedArray
上で自然に動作します。アルゴリズムに Array
入力が必要な場合、実際に値を格納している配列はsdata
を呼び出すことによって SharedArray
から取得できます。他の AbstractArray
型の場合は、 sdata
はオブジェクトそのものを返します。なので、( SharedArray
かどうか気にせず) Array
型オブジェクトで sdata
を使用しても安全です。
共有配列のコンストラクタは、次の形式で書きます:
SharedArray{T,N}(dims::NTuple; init=false, pids=Int[])
このコンストラクタは、ビット型 T
を要素にもち、dims
で指定されるサイズの N
次元共有配列を pids
で指定されたプロセス全体で作成します。分散配列とは異なり、共有配列はpids
名前付き引数で指定されている 共有に参加しているワーカーからのみアクセスできます (同じホスト上にある場合はワーカーを作成したメインプロセスからもアクセス可能です)。
シグネチャ initfn(S:SharedArray)
の init
関数が指定されている場合は、その初期化関数は、参加しているすべてのワーカーで呼び出されます。各ワーカーが配列の別個の部分で init
関数を実行し、初期化を並列化することを指定できます。
簡単な例を次に示します:
julia> using Distributed
julia> addprocs(3)
3-element Array{Int64,1}:
2
3
4
julia> @everywhere using SharedArrays
julia> S = SharedArray{Int,2}((3,4), init = S -> S[localindices(S)] = myid())
3×4 SharedArray{Int64,2}:
2 2 3 4
2 3 3 4
2 3 4 4
julia> S[3,2] = 7
7
julia> S
3×4 SharedArray{Int64,2}:
2 2 3 4
2 3 3 4
2 7 4 4
SharedArrays.localindices
は、インデックスの 1 次元のなかのあるインデックス範囲をコア間で重なりなく提供します。プロセス間でタスクを分割するのに便利なことがあります。もちろん、作業を任意の方法で分割することもできます:
julia> S = SharedArray{Int,2}((3,4), init = S -> S[indexpids(S):length(procs(S)):length(S)] = myid())
3×4 SharedArray{Int64,2}:
2 2 2 2
3 3 3 3
4 4 4 4
すべてのプロセスは実データにアクセスできるため、競合が発生しないよう注意する必要があります。例えば:
@sync begin
for p in procs(S)
@async begin
remotecall_wait(fill!, p, S, p)
end
end
end
上記のコードの動作は未定義です。各プロセスは配列全要素に自分自身のpid
の数字を代入しようとするため、(それぞれのS
の要素に対して)代入計算を最後に実行したプロセスのpid
を保持することになります。
より発展的・複雑な例として、次の "カーネル" を並行して実行する場合を考えてみましょう:
q[i,j,t+1] = q[i,j,t] + u[i,j,t]
この場合、1 次元インデックスを使用して作業を分割しようとすると、次のような問題が発生する可能性があります: q[i,j,t]
があるワーカーに割り当てられたブロックの終わり付近にあり、q[i,j,t+1]
が別のワーカーに割り当てられたブロックの先頭付近にある場合、q[i,j,t]
が計算に必要な時点で、準備出来ていない可能性が非常に高いです。このような場合は、手動で配列をチャンクする方が良いでしょう。2 番目の次元に沿って分割してみましょう。 このワーカーに割り当てられた (irange,jrange)
インデックスを返す関数を定義します:
julia> @everywhere function myrange(q::SharedArray)
idx = indexpids(q)
if idx == 0 # This worker is not assigned a piece
return 1:0, 1:0
end
nchunks = length(procs(q))
splits = [round(Int, s) for s in range(0, stop=size(q,2), length=nchunks+1)]
1:size(q,1), splits[idx]+1:splits[idx+1]
end
次に、カーネルを定義します:
julia> @everywhere function advection_chunk!(q, u, irange, jrange, trange)
@show (irange, jrange, trange) # display so we can see what's happening
for t in trange, j in jrange, i in irange
q[i,j,t+1] = q[i,j,t] + u[i,j,t]
end
q
end
また、SharedArray
実装に利便なラッパーも定義します
julia> @everywhere advection_shared_chunk!(q, u) =
advection_chunk!(q, u, myrange(q)..., 1:size(q,3)-1)
さぁ、3 つの異なるバージョンを比較してみましょう。1つ目は1つのコアで実行されるもの:
julia> advection_serial!(q, u) = advection_chunk!(q, u, 1:size(q,1), 1:size(q,2), 1:size(q,3)-1);
もうひとつは、 @distributed
を使うもの:
julia> function advection_parallel!(q, u)
for t = 1:size(q,3)-1
@sync @distributed for j = 1:size(q,2)
for i = 1:size(q,1)
q[i,j,t+1]= q[i,j,t] + u[i,j,t]
end
end
end
q
end;
そして、チャンク単位で別々のコアに計算を任せるもの:
julia> function advection_shared!(q, u)
@sync begin
for p in procs(q)
@async remotecall_wait(advection_shared_chunk!, p, q, u)
end
end
q
end;
SharedArray
を作成し、これらの関数の実行時間を測定すると、次の結果が得られます ( julia -p 4
で実行したとします):
julia> q = SharedArray{Float64,3}((500,500,500));
julia> u = SharedArray{Float64,3}((500,500,500));
JITコンパイルのために一度関数を実行してから、二回目の実行で測定します:
julia> @time advection_serial!(q, u);
(irange,jrange,trange) = (1:500,1:500,1:499)
830.220 milliseconds (216 allocations: 13820 bytes)
julia> @time advection_parallel!(q, u);
2.495 seconds (3999 k allocations: 289 MB, 2.09% gc time)
julia> @time advection_shared!(q,u);
From worker 2: (irange,jrange,trange) = (1:500,1:125,1:499)
From worker 4: (irange,jrange,trange) = (1:500,251:375,1:499)
From worker 3: (irange,jrange,trange) = (1:500,126:250,1:499)
From worker 5: (irange,jrange,trange) = (1:500,376:500,1:499)
238.119 milliseconds (2264 allocations: 169 KB)
advection_shared!
の最大の利点は、ワーカー間のトラフィックを最小限に抑え、割り当てられたピースに対して長時間計算できるようにすることです。
共有配列と分散ガベージ コレクション
リモート参照と同様に、共有配列もまた、配列が生成されたノードのガベージ コレクションに依存します。ここでいうガベージコレクションは、その配列を共有しているすべてのワーカーからの参照を解放するものです。サイズが小さい共有配列をたくさん生成するようなコードは、これらのオブジェクトをできるだけ早く明示的にファイナライズすると、得るものが多いでしょう。結果、メモリハンドル・ファイルハンドルの両方で、より早く解放される共有セグメントをマッピングしてくれます。
クラスタマネージャ
Julia プロセスの起動、管理、および論理クラスタへのネットワーキングは、クラスタ マネージャを介して行われます。ClusterManager
が担当するのは
- クラスタ環境でワーカープロセスを起動する
- 各ワーカーの有効期間中のイベントの管理
- 必要に応じて、データ転送を提供する
Julia クラスターには、次の特徴があります:
master
とも呼ばれる最初のJuliaプロセスは特別であり、1のid
を持っています。- ワーカープロセスを追加または削除できるのは
master
プロセスだけです。 - すべてのプロセスは、互いに直接通信できます。
(組み込みの TCP/IP トランスポートを使用して) ワーカー間の接続は、次の方法で確立されます:
addprocs
は、ClusterManager
オブジェクトを持つマスター・プロセスで呼び出されます。addprocs
は、要求された数のワーカープロセスを適切なマシン上で起動する適切なlaunch
メソッドを呼び出します。- 各ワーカーは空きポートでリッスンを開始し、ホストとポートの情報を
stdout
に書き出します。 - クラスタ マネージャは、各ワーカーの
stdout
をキャプチャし、マスタープロセスで使用できるようにします。 - マスタ プロセスは、この情報を解析し、各ワーカーへの TCP/IP 接続を設定します。
- すべてのワーカーには、クラスター内の他のワーカーの情報が知らされます。
- 各ワーカーは、
id
がそのワーカー自身のid
より小さいすべてのワーカーに接続します。 - このようにしてメッシュ ネットワークが確立され、すべてのワーカーが他のすべてのワーカーと直接接続されます。
既定のトランスポート層では、プレーン TCPSocket
が使われますが、Julia クラスターが独自のトランスポートを提供する場合があります。
Julia は、次の 2 つの組み込みクラスタ マネージャを提供します:
LocalManager
:addprocs()
もしくはaddprocs(np::Integer)
が呼び出された時に使われますSSHManager
:addprocs(hostnames::Array)
が hostname のリストを伴って呼び出された時に使われます
LocalManager
は、同じホスト上で追加のワーカーを起動するために使用され、マルチコアおよびマルチプロセッサ ハードウェアを活用します。
したがって、最小限のクラスタ マネージャーは次のことが必要になります:
- 抽象型
ClusterManager
のサブタイプである - 新しいワーカーを立ち上げるメソッド
launch
を実装する - ワーカーの有効期間中にさまざまなイベントで呼び出される
manage
を実装する (例えば、割り込み信号の送信など)
addprocs(manager::FooManager)
が正しく処理されるためには FooManager
に次の関数が実装されている必要があります:
function launch(manager::FooManager, params::Dict, launched::Array, c::Condition)
[...]
end
function manage(manager::FooManager, id::Integer, config::WorkerConfig, op::Symbol)
[...]
end
例として、同じホストでワーカーを開始するマネージャLocalManager
がどのように実装されているかを見てみましょう:
struct LocalManager <: ClusterManager
np::Integer
end
function launch(manager::LocalManager, params::Dict, launched::Array, c::Condition)
[...]
end
function manage(manager::LocalManager, id::Integer, config::WorkerConfig, op::Symbol)
[...]
end
launch
メソッドは、次の引数を受け取ります:
manager::ClusterManager
: このクラスターマネージャを用いて、addprocs
が呼ばれますparams::Dict
:addprocs
に渡されるすべてのキーワード引数を格納しますlaunched::Array
: 1 つ以上のWorkerConfig
オブジェクトを追加する配列c::Condition
: ワーカーが起動されたときに通知される条件変数
launch
メソッドは、独立したタスクで非同期的に呼び出されます。このタスクが終了したということは、要求されたすべてのワーカーが起動されたということです。したがって、launch
関数は、要求されたすべてのワーカーが起動されるとすぐに終了する必要があります。
新しく起動されたワーカーは、互いにワーカー同士で、そしてマスタープロセスと 網羅的に接続されます。 コマンドライン引数 --worker[=<cookie>]
を指定すると、起動されたプロセスが自分自身をワーカーとして初期化し、接続がTCP/IP ソケットを介してセットアップされます。
クラスター内のすべてのワーカーは、マスターと同じ cookie を共有します。 クッキーが指定されていない、つまり--worker
オプションを使用すると、ワーカーは標準入力からクッキーを読み取ろうとします。 LocalManager
とSSHManager
の両方が、新しく立ち上げたワーカーに標準入力を介してクッキーを渡します。
既定では、ワーカーは getipaddr()
の呼び出しによって返されるアドレスで空きポートをリッスンします。 リッスンする特定のアドレスは、オプション引数 --bind-to bind_addr[:port]
で指定できます。 これは、マルチホームド(複数のネットワークアドレスを持つ)ホストでの利用に役立ちます。
TCP/IPトランスポート以外の例としては、MPI を利用した実装が考えられます。MPI利用においては、--worker
オプションによる指定を使っていはいけません。全ての並行処理の構成要素が用いられる前に、新しく起動したワーカーは、init_worker(cookie)
を呼び Cookieの指定をしなくてはなりません。
起動するすべてのワーカーに対して、launch
メソッドは、(適切なフィールドを初期化した) workerConfig
オブジェクト を launched
に追加する必要があります
mutable struct WorkerConfig
# Common fields relevant to all cluster managers
io::Union{IO, Nothing}
host::Union{AbstractString, Nothing}
port::Union{Integer, Nothing}
# Used when launching additional workers at a host
count::Union{Int, Symbol, Nothing}
exename::Union{AbstractString, Cmd, Nothing}
exeflags::Union{Cmd, Nothing}
# External cluster managers can use this to store information at a per-worker level
# Can be a dict if multiple fields need to be stored.
userdata::Any
# SSHManager / SSH tunnel connections to workers
tunnel::Union{Bool, Nothing}
bind_addr::Union{AbstractString, Nothing}
sshflags::Union{Cmd, Nothing}
max_parallel::Union{Integer, Nothing}
# Used by Local/SSH managers
connect_at::Any
[...]
end
WorkerConfig
のフィールドのほとんどは、組み込みのマネージャによって使用されます。独自実装のクラスタ マネージャーは、通常、io
または host
/ port
のみを指定します:
io
が指定されている場合は、ホスト/ポート情報の読み取りに使用されます。Julia のワーカーは、起動時にバインドされた IPアドレスとポートをプリント出力します。これにより、Julia ワーカーは、ワーカー のポートを手動で設定する代わりに、使用可能な任意の空きポートをリッスンできます。io
が指定されていない場合は、host
,port
を使用して接続します。count
,exename
,exeflags
は、ワーカーから別の追加のワーカーを起動する機能に関連するもので。例えば、クラスターマネージャーがノード毎に1つのワーカーを起動し、そのワーカーに追加のワーカーを起動させることができます。count
は 整数値n
を指定すると、そのマシンで合計n
個のワーカーを起動します。count
に:auto
値を指定すると、そのマシンのCPUスレッド(論理コア)数と同じだけのワーカーを起動します。exename
は、フルパスを含むjulia
プログラムの名前です。exeflags
には、新しいワーカに必要なコマンドライン引数を設定する必要があります。.
tunnel
,bind_addr
,sshflags
,max_parallel
は、ssh トンネルがマスタープロセスからワーカーに接続する必要があるときに用いられますuserdata
は、独自実装のクラスターマネージャーが、ワーカー固有の情報を取得・保持する目的で提供されます
manage(manager::FooManager, id::Integer, config::WorkerConfig, op::Symbol)
is called at different times during the worker's lifetime with appropriate op
values:
- with
:register
/:deregister
when a worker is added / removed from the Julia worker pool. - with
:interrupt
wheninterrupt(workers)
is called. TheClusterManager
should signal the appropriate worker with an interrupt signal. - with
:finalize
for cleanup purposes.
カスタム トランスポートを使用したクラスタ マネージャ
Replacing the default TCP/IP all-to-all socket connections with a custom transport layer is a little more involved. Each Julia process has as many communication tasks as the workers it is connected to. For example, consider a Julia cluster of 32 processes in an all-to-all mesh network:
- Each Julia process thus has 31 communication tasks.
- Each task handles all incoming messages from a single remote worker in a message-processing loop.
- The message-processing loop waits on an
IO
object (for example, aTCPSocket
in the default implementation), reads an entire message, processes it and waits for the next one. - Sending messages to a process is done directly from any Julia task–not just communication tasks–again, via the appropriate
IO
object.
Replacing the default transport requires the new implementation to set up connections to remote workers and to provide appropriate IO
objects that the message-processing loops can wait on. The manager-specific callbacks to be implemented are:
connect(manager::FooManager, pid::Integer, config::WorkerConfig)
kill(manager::FooManager, pid::Int, config::WorkerConfig)
The default implementation (which uses TCP/IP sockets) is implemented as connect(manager::ClusterManager, pid::Integer, config::WorkerConfig)
.
connect
should return a pair of IO
objects, one for reading data sent from worker pid
, and the other to write data that needs to be sent to worker pid
. Custom cluster managers can use an in-memory BufferStream
as the plumbing to proxy data between the custom, possibly non-IO
transport and Julia's in-built parallel infrastructure.
A BufferStream
is an in-memory IOBuffer
which behaves like an IO
–it is a stream which can be handled asynchronously.
The folder clustermanager/0mq
in the Examples repository contains an example of using ZeroMQ to connect Julia workers in a star topology with a 0MQ broker in the middle. Note: The Julia processes are still all logically connected to each other–any worker can message any other worker directly without any awareness of 0MQ being used as the transport layer.
When using custom transports:
- Julia workers must NOT be started with
--worker
. Starting with--worker
will result in the newly launched workers defaulting to the TCP/IP socket transport implementation. - For every incoming logical connection with a worker,
Base.process_messages(rd::IO, wr::IO)()
must be called. This launches a new task that handles reading and writing of messages from/to the worker represented by theIO
objects. init_worker(cookie, manager::FooManager)
must be called as part of worker process initialization.- Field
connect_at::Any
inWorkerConfig
can be set by the cluster manager whenlaunch
is called. The value of this field is passed in in allconnect
callbacks. Typically, it carries information on how to connect to a worker. For example, the TCP/IP socket transport uses this field to specify the(host, port)
tuple at which to connect to a worker.
kill(manager, pid, config)
is called to remove a worker from the cluster. On the master process, the corresponding IO
objects must be closed by the implementation to ensure proper cleanup. The default implementation simply executes an exit()
call on the specified remote worker.
The Examples folder clustermanager/simple
is an example that shows a simple implementation using UNIX domain sockets for cluster setup.
LocalManager と SSHManager のネットワーク要件
Julia のクラスターは、ローカルラップトップ、部門のクラスタ、クラウドなどのインフラ上で既にセキュリティで保護されている環境を想定して設計されています。 この節では、組み込みのLocalManager
と SSHManager
のネットワークのセキュリティ要件について説明します:
The master process does not listen on any port. It only connects out to the workers.
Each worker binds to only one of the local interfaces and listens on an ephemeral port number assigned by the OS.
LocalManager
, used byaddprocs(N)
, by default binds only to the loopback interface. This means that workers started later on remote hosts (or by anyone with malicious intentions) are unable to connect to the cluster. Anaddprocs(4)
followed by anaddprocs(["remote_host"])
will fail. Some users may need to create a cluster comprising their local system and a few remote systems. This can be done by explicitly requestingLocalManager
to bind to an external network interface via therestrict
keyword argument:addprocs(4; restrict=false)
.SSHManager
, used byaddprocs(list_of_remote_hosts)
, launches workers on remote hosts via SSH. By default SSH is only used to launch Julia workers. Subsequent master-worker and worker-worker connections use plain, unencrypted TCP/IP sockets. The remote hosts must have passwordless login enabled. Additional SSH flags or credentials may be specified via keyword argumentsshflags
.addprocs(list_of_remote_hosts; tunnel=true, sshflags=<ssh keys and other flags>)
is useful when we wish to use SSH connections for master-worker too. A typical scenario for this is a local laptop running the Julia REPL (i.e., the master) with the rest of the cluster on the cloud, say on Amazon EC2. In this case only port 22 needs to be opened at the remote cluster coupled with SSH client authenticated via public key infrastructure (PKI). Authentication credentials can be supplied viasshflags
, for examplesshflags=`-i <keyfile>`
.In an all-to-all topology (the default), all workers connect to each other via plain TCP sockets. The security policy on the cluster nodes must thus ensure free connectivity between workers for the ephemeral port range (varies by OS).
Securing and encrypting all worker-worker traffic (via SSH) or encrypting individual messages can be done via a custom
ClusterManager
.
クラスタークッキー
All processes in a cluster share the same cookie which, by default, is a randomly generated string on the master process:
cluster_cookie()
returns the cookie, whilecluster_cookie(cookie)()
sets it and returns the new cookie.- All connections are authenticated on both sides to ensure that only workers started by the master are allowed to connect to each other.
- The cookie may be passed to the workers at startup via argument
--worker=<cookie>
. If argument--worker
is specified without the cookie, the worker tries to read the cookie from its standard input (stdin
). Thestdin
is closed immediately after the cookie is retrieved. ClusterManager
s can retrieve the cookie on the master by callingcluster_cookie()
. Cluster managers not using the default TCP/IP transport (and hence not specifying--worker
) must callinit_worker(cookie, manager)
with the same cookie as on the master.
Note that environments requiring higher levels of security can implement this via a custom ClusterManager
. For example, cookies can be pre-shared and hence not specified as a startup argument.
ネットワーク トポロジの指定 (実験的実装)
The keyword argument topology
passed to addprocs
is used to specify how the workers must be connected to each other:
:all_to_all
, the default: all workers are connected to each other.:master_worker
: only the driver process, i.e.pid
1, has connections to the workers.:custom
: thelaunch
method of the cluster manager specifies the connection topology via the fieldsident
andconnect_idents
inWorkerConfig
. A worker with a cluster-manager-provided identityident
will connect to all workers specified inconnect_idents
.
Keyword argument lazy=true|false
only affects topology
option :all_to_all
. If true
, the cluster starts off with the master connected to all workers. Specific worker-worker connections are established at the first remote invocation between two workers. This helps in reducing initial resources allocated for intra-cluster communication. Connections are setup depending on the runtime requirements of a parallel program. Default value for lazy
is true
.
Currently, sending a message between unconnected workers results in an error. This behaviour, as with the functionality and interface, should be considered experimental in nature and may change in future releases.
注目に値する外部パッケージ
Outside of Julia parallelism there are plenty of external packages that should be mentioned. For example MPI.jl is a Julia wrapper for the MPI
protocol, or DistributedArrays.jl, as presented in Shared Arrays. A mention must be made of Julia's GPU programming ecosystem, which includes:
Low-level (C kernel) based operations OpenCL.jl and CUDAdrv.jl which are respectively an OpenCL interface and a CUDA wrapper.
Low-level (Julia Kernel) interfaces like CUDAnative.jl which is a Julia native CUDA implementation.
High-level vendor-specific abstractions like CuArrays.jl and CLArrays.jl
High-level libraries like ArrayFire.jl and GPUArrays.jl
In the following example we will use both DistributedArrays.jl
and CuArrays.jl
to distribute an array across multiple processes by first casting it through distribute()
and CuArray()
.
Remember when importing DistributedArrays.jl
to import it across all processes using @everywhere
$ ./julia -p 4
julia> addprocs()
julia> @everywhere using DistributedArrays
julia> using CuArrays
julia> B = ones(10_000) ./ 2;
julia> A = ones(10_000) .* π;
julia> C = 2 .* A ./ B;
julia> all(C .≈ 4*π)
true
julia> typeof(C)
Array{Float64,1}
julia> dB = distribute(B);
julia> dA = distribute(A);
julia> dC = 2 .* dA ./ dB;
julia> all(dC .≈ 4*π)
true
julia> typeof(dC)
DistributedArrays.DArray{Float64,1,Array{Float64,1}}
julia> cuB = CuArray(B);
julia> cuA = CuArray(A);
julia> cuC = 2 .* cuA ./ cuB;
julia> all(cuC .≈ 4*π);
true
julia> typeof(cuC)
CuArray{Float64,1}
Keep in mind that some Julia features are not currently supported by CUDAnative.jl [2] , especially some functions like sin
will need to be replaced with CUDAnative.sin
(cc: @maleadt).
In the following example we will use both DistributedArrays.jl
and CuArrays.jl
to distribute an array across multiple processes and call a generic function on it.
function power_method(M, v)
for i in 1:100
v = M*v
v /= norm(v)
end
return v, norm(M*v) / norm(v) # or (M*v) ./ v
end
power_method
repeatedly creates a new vector and normalizes it. We have not specified any type signature in function declaration, let's see if it works with the aforementioned datatypes:
julia> M = [2. 1; 1 1];
julia> v = rand(2)
2-element Array{Float64,1}:
0.40395
0.445877
julia> power_method(M,v)
([0.850651, 0.525731], 2.618033988749895)
julia> cuM = CuArray(M);
julia> cuv = CuArray(v);
julia> curesult = power_method(cuM, cuv);
julia> typeof(curesult)
CuArray{Float64,1}
julia> dM = distribute(M);
julia> dv = distribute(v);
julia> dC = power_method(dM, dv);
julia> typeof(dC)
Tuple{DistributedArrays.DArray{Float64,1,Array{Float64,1}},Float64}
To end this short exposure to external packages, we can consider MPI.jl
, a Julia wrapper of the MPI protocol. As it would take too long to consider every inner function, it would be better to simply appreciate the approach used to implement the protocol.
Consider this toy script which simply calls each subprocess, instantiate its rank and when the master process is reached, performs the ranks' sum
import MPI
MPI.Init()
comm = MPI.COMM_WORLD
MPI.Barrier(comm)
root = 0
r = MPI.Comm_rank(comm)
sr = MPI.Reduce(r, MPI.SUM, root, comm)
if(MPI.Comm_rank(comm) == root)
@printf("sum of ranks: %s\n", sr)
end
MPI.Finalize()
mpirun -np 4 ./julia example.jl
In this context, MPI refers to the MPI-1 standard. Beginning with MPI-2, the MPI standards committee introduced a new set of communication mechanisms, collectively referred to as Remote Memory Access (RMA). The motivation for adding rma to the MPI standard was to facilitate one-sided communication patterns. For additional information on the latest MPI standard, see https://mpi-forum.org/docs.