並列コンピューティング

並列コンピューティング

マルチスレッドコンピューティングと並列コンピューティングの初心者にとっては、Julia が提供する並列処理のさまざまなレベルを最初に理解すると便利です。Julia の並列処理は3つの主要なカテゴリに分類することができます:

  1. Julia コルーチン(グリーンスレッド)
  2. マルチスレッド
  3. マルチコア処理または分散処理

まず、Julia タスク(別名 コルーチン)

とJuliaランタイムライブラリに依存する他のモジュールを検討し、直接OSのスケジューラとやりとりをすることなく、タスク間の通信を完全に制御して計算を中断および再開できるようにします。 また、Julia はwaitfetch などの操作を通じたタスク間通信をサポートしています。通信とデータの同期は、タスク間通信を提供する導管である Channels を介して管理されます。

Julia は 実験的なマルチスレッド もサポートしています。実験的なマルチスレッドでは処理の実行がフォークされ、無名関数がすべてのスレッドで実行されます。 フォーク結合アプローチと呼ばれるもので、並行スレッドは独立して実行されますが、最終的には逐次実行を続行できるようにJulia のメイン スレッドに結合する必要があります。 マルチスレッドは、 Base.Threads モジュールでサポートされていますが、Julia がまだ完全にスレッドセーフではなく、実験段階のものです。特に、I/O 操作およびタスクの切り替え中にセグメンテーションフォールトが発生する可能性があります。最新情報を得るには Issue tracker

を参照してください。マルチスレッドを使うには、グローバル変数、ロック、およびアトミックを考慮しなくてはなりません。以下でこれらについて説明します。

最後に、分散コンピューティングと並列コンピューティングに対する Julia のアプローチを紹介します。科学的計算での利用を念頭に、Julia はネイティブにインターフェイスを実装しており、複数のコアまたはマシンにプロセスを分散します。 また、MPI.jlDistributedArrays.jlなどの分散プログラミングに役立つ外部パッケージについても言及します。

コルーチン

Julia の並列プログラミング プラットフォームでは、 タスク (別名 Coroutines) を使用して複数の計算を切り替えます。 軽量スレッド間で実行順序を指定するには、通信プリミティブが必要です。 Julia には Channel(func::Function, ctype=Any, csize=0, taskref=nothing) という関数があります。funcから新しいタスクを作成し、型ctypeとサイズcsizeの新しいチャネルにタスクをバインド・スケジュールします。 Channels はタスク間の通信方法として機能するもので、Channel{T}(sz:Int) が型 T とサイズ sz のバッファリングチャネルを作成するようにします。コードが fetchwait などの通信操作を実行するたびに、現在のタスクは中断され、スケジューラは実行する別のタスクを選択します。 タスクは、待機中のイベントが完了するとまた続きの処理を開始します。

多くの問題では、タスクのことを直接考える必要はありません。ただし、タスクは、同時に処理される複数のイベントを待機するのに使用できるため、動的スケジューリングを行うことができます。動的スケジューリングでは、プログラムは、他のジョブがいつ終了するかに基づいて、何を計算するか、またはどこで計算するかを決定します。これは、予測不能なワークロードや不均衡なワークロードに必要です。こうした状況では、プロセスが現在のタスクを完了した時だけ、暇になったプロセスにより多くのタスクを割り当てる、というようなことをやりたくなります。

チャンネル

制御フローのページの タスク セクションでは、協調的に動作する複数の関数の実行について議論しました。Channelは、実行中のタスク、特に I/O 操作を含むタスク間でデータを渡すのに非常に役立ちます。

I/O に関連する操作の例としては、ファイルの読み取り/書き込み、Web サービスへのアクセス、外部プログラムの実行などがあります。いずれの場合も、ファイルの読み取り中、または外部サービス/プログラムの完了を待機している間に他のタスクを実行できる場合は、全体的な実行時間を改善できます。

チャネルはパイプとみなすことができます。つまり、チャンネルは "write end" と "read end"を持ちます:

チャンネルは、forループないのイテラブルオブジェクトとして使うことができ、チャンネルがデータを持つか、オープンされている状態である限り、ループが回ります。ループ変数はチャンネルに追加された全ての値を引き受けます。forループが終了されるのは、チャンネルが閉じられ、データが空になったときです。

たとえば、次の場合、for ループは(訳注: チャンネルが閉じられていないため) さらなるデータを待機します:

julia> c = Channel{Int}(10);

julia> foreach(i->put!(c, i), 1:3) # add a few entries

julia> data = [i for i in c]

対して、こちらは、チャンネル内の全てのデータを読んだらリターンします:

julia> c = Channel{Int}(10);

julia> foreach(i->put!(c, i), 1:3); # add a few entries

julia> close(c);                    # `for` loops can exit

julia> data = [i for i in c]
3-element Array{Int64,1}:
 1
 2
 3

タスク間通信にチャネルを使用する簡単な例を考えてみましょう。私たちは、単一のjobsチャネルからデータを処理する4つのタスクを開始します。ID (job_id) によって識別されるジョブは、チャネルに書き込まれます。 このシミュレーションの各タスクはjob_idを読み取り、ランダムな時間をウェイトしてから、job_id のタプルとシミュレートされた時間の長さを 結果チャネルに書き戻します。最後に、すべてのresults が印刷されます。

julia> const jobs = Channel{Int}(32);

julia> const results = Channel{Tuple}(32);

julia> function do_work()
           for job_id in jobs
               exec_time = rand()
               sleep(exec_time)                # simulates elapsed time doing actual work
                                               # typically performed externally.
               put!(results, (job_id, exec_time))
           end
       end;

julia> function make_jobs(n)
           for i in 1:n
               put!(jobs, i)
           end
       end;

julia> n = 12;

julia> @async make_jobs(n); # feed the jobs channel with "n" jobs

julia> for i in 1:4 # start 4 tasks to process requests in parallel
           @async do_work()
       end

julia> @elapsed while n > 0 # print out results
           job_id, exec_time = take!(results)
           println("$job_id finished in $(round(exec_time; digits=2)) seconds")
           global n = n - 1
       end
4 finished in 0.22 seconds
3 finished in 0.45 seconds
1 finished in 0.5 seconds
7 finished in 0.14 seconds
2 finished in 0.78 seconds
5 finished in 0.9 seconds
9 finished in 0.36 seconds
6 finished in 0.87 seconds
8 finished in 0.79 seconds
10 finished in 0.64 seconds
12 finished in 0.5 seconds
11 finished in 0.97 seconds
0.029772311

現在のバージョンの Julia は、すべてのタスクを単一の OS スレッドに多重化します。したがって、I/O 操作を含むタスクは並列実行の恩恵を受けますが、バインドされたタスクは単一の OS スレッドで効果的に順番に実行されます。Julia の将来のバージョンでは、複数のスレッドでのタスクのスケジューリングがサポートされるかもしれません。

マルチスレッド(実験的実装)

タスクに加えて、Julia はマルチスレッドをネイティブにサポートします。 このセクションは実験的なものであり、インターフェイスは将来変更される可能性があることに注意してください。

セットアップ

デフォルトでは、Julia は単一の実行スレッドで起動します。これは、コマンド Threads.nthreads() を使用して確認できます:

julia> Threads.nthreads()
1

Julia が開始するスレッドの数は、JULIA_NUM_THREADS という環境変数によって制御されます。 さて、4つのスレッドでジュリアを起動してみましょう:

export JULIA_NUM_THREADS=4

(上記のコマンドは、Linux および OSX 上のボーンシェルで動作します。これらのプラットフォームで C シェルを使用している場合は、exportの代わりに set というキーワードを使用する必要があります。Windows を使用している場合は、julia.exe の場所でコマンド ラインを起動し、export の代わりに set を使用します。)

4つのスレッドがあることを確認してみましょう。

julia> Threads.nthreads()
4

しかし、現在はマスタースレッドに入っています。確認するには、関数 Threads.threadidを使用します

julia> Threads.threadid()
1

The @threads マクロ

ネイティブスレッドを使用して簡単な例を見てみましょう。ゼロの配列を作成してみましょう:

julia> a = zeros(10)
10-element Array{Float64,1}:
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0

4つのスレッドを使用して、この配列を同時に操作してみましょう。各スレッドで各要素にスレッド ID を書き込みます。

Julia は Threads.@threads マクロを使用した並列ループをサポートしています。このマクロをfor ループの前に追加すると、ループがマルチスレッドで実行すべき領域であることを Julia に指示できます:

julia> Threads.@threads for i = 1:10
           a[i] = Threads.threadid()
       end

イテレーションスペースはスレッド間で分割され、その後、各スレッドは割り当てられた場所にスレッド ID を書き込みます:

julia> a
10-element Array{Float64,1}:
 1.0
 1.0
 1.0
 2.0
 2.0
 2.0
 3.0
 3.0
 4.0
 4.0

Threads.@threadsには、@distributedのようなオプションの縮小パラメーターがないことに注意してください。

アトミック操作

Julia は、アトミックな、つまり、競合状態を避けるためのスレッドセーフな方法で、値のアクセスと変更をサポートしています 。アトミックにアクセスする必要があることを示す値 (プリミティブ型である必要があります) を Threads.Atomic としてラップできます。 例を見てみましょう:

julia> i = Threads.Atomic{Int}(0);

julia> ids = zeros(4);

julia> old_is = zeros(4);

julia> Threads.@threads for id in 1:4
           old_is[id] = Threads.atomic_add!(i, id)
           ids[id] = id
       end

julia> old_is
4-element Array{Float64,1}:
 0.0
 1.0
 7.0
 3.0

julia> ids
4-element Array{Float64,1}:
 1.0
 2.0
 3.0
 4.0

アトミックタグなしで加算を試すと、競合によって間違った答えが得られたかもしれません。競合を避けなかった場合にどうなるかの例は下記の通りです:

julia> using Base.Threads

julia> nthreads()
4

julia> acc = Ref(0)
Base.RefValue{Int64}(0)

julia> @threads for i in 1:1000
          acc[] += 1
       end

julia> acc[]
926

julia> acc = Atomic{Int64}(0)
Atomic{Int64}(0)

julia> @threads for i in 1:1000
          atomic_add!(acc, 1)
       end

julia> acc[]
1000

!!!メモ すべてのプリミティブ型を Atomic タグでラップできるわけではありません。 サポートされている型はInt8Int16Int32Int64Int128UInt8UInt16UInt32UInt64UInt128Float16Float32、およびFloat64です。 さらに Int128 および UInt128 は AAarch32 および ppc64le ではサポートされていません。

副作用と変更可能な関数引数

マルチスレッドを使用時、純粋 でない関数を使う場合には、間違った答えが得られる可能性があるので、注意が必要です。例えば、 名前が!で終わる関数は、慣習的に引数に指定された値を書き換えるため、純粋な関数ではありません。しかし、副作用を持つにも関わらず、その名前が!で終わらない関数があります。例えば、 findfirst(regex, str) は、引数 regexを変更しますし、rand() は、Base.GLOBAL_RNGを変更します :

julia> using Base.Threads

julia> nthreads()
4

julia> function f()
           s = repeat(["123", "213", "231"], outer=1000)
           x = similar(s, Int)
           rx = r"1"
           @threads for i in 1:3000
               x[i] = findfirst(rx, s[i]).start
           end
           count(v -> v == 1, x)
       end
f (generic function with 1 method)

julia> f() # the correct result is 1000
1017

julia> function g()
           a = zeros(1000)
           @threads for i in 1:1000
               a[i] = rand()
           end
           length(unique(a))
       end
g (generic function with 1 method)

julia> Random.seed!(1); g() # the result for a single thread is 1000
781

このような場合は、競合状態に陥るの可能性を避けるためにコードを再設計するか、同期プリミティブを使ってください。

たとえば、上記の findfirst の例を解決するには、スレッドごとに変数rxを別々にコピーして使う必要があります:

julia> function f_fix()
             s = repeat(["123", "213", "231"], outer=1000)
             x = similar(s, Int)
             rx = [Regex("1") for i in 1:nthreads()]
             @threads for i in 1:3000
                 x[i] = findfirst(rx[threadid()], s[i]).start
             end
             count(v -> v == 1, x)
         end
f_fix (generic function with 1 method)

julia> f_fix()
1000

r"1"の代わりにRegex("1")を使って Julia が rxベクトルのそれぞれのエントリに対して、別々のRegexオブジェクトを生成するようにしています。

randの例については、もう少し複雑です。各スレッドが重複しない擬似乱数シーケンスを使用するようにする必要があるためです。これは、単に Future.randjump 関数を使用して保証することができます:

julia> using Random; import Future

julia> function g_fix(r)
           a = zeros(1000)
           @threads for i in 1:1000
               a[i] = rand(r[threadid()])
           end
           length(unique(a))
       end
g_fix (generic function with 1 method)

julia>  r = let m = MersenneTwister(1)
                [m; accumulate(Future.randjump, fill(big(10)^20, nthreads()-1), init=m)]
            end;

julia> g_fix(r)
1000

rベクトルをg_fixに渡しているのは、複数の RGN を生成するのがコストのかかる処理であるためで、関数を実行するたびにRGNの生成を繰り返す必要はありません。

@threadcall(実験的な実装)

Julia では、すべての I/O タスク、タイマー、REPL コマンドなどは、イベント ループを介して 1 つの OS スレッドに多重化されています。これは、libuv ([http://docs.libuv.org/en/v1.x/])のパッチバージョンで提供している機能です。"yield" は、複数のタスクを同じ OS スレッドに同時にスケジューリングするための機能を提供します。I/O タスクとタイマーはイベントが発生するのを待機している間に暗黙的に生成されます。yieldを呼び出すと、(訳者註: 現在処理しているタスクが中止されるので) 他のタスクがスケジュールされるように明示的に許可を与えることができます。

したがって、ccallを実行するタスクは、呼び出しから制御が戻るまで Julia スケジューラが他のタスクを実行するのを効果的に防ぎます。これは、外部ライブラリへのすべての呼び出しに当てはまります。例外は、逆にJulia を呼び出すカスタム C コードを呼び出す場合 (その後、あとから yield する可能性があります) または jl_yield()(これはyield のCバージョンです) を呼び出すCコードを、呼び出す場合です。

Julia コードは(既定では) 単一のスレッド で実行されますが、Julia が使用するライブラリは独自の内部スレッドを起動する場合があることに注意してください。たとえば、BLAS ライブラリは、マシン上のコアと同じ数のスレッドを開始できます。

@threadcall マクロは、 ccallが、Juliaのメインイベントループをブロックして欲しくない時に使うことができます。別のスレッドで実行する C 関数をスケジュールします。これには、デフォルト・サイズが 4 のスレッド・プールが使用されます。スレッドプールのサイズは、環境変数 UV_THREADPOOL_SIZE で設定できます。スレッドが空くのを待機している間、およびスレッドが使用可能になって関数が実行されている間に、(メイン Julia イベント ループ上で)C関数のコールを要求するタスクは他のタスクにyield します。@threadcallを使うと、その実行完了まで制御が戻らないことに注意してくださいしたがって、ユーザーの観点から見ると、他の Julia API と同様にブロッキング呼び出しが行われますように見えます。

非常に重要なことですが、呼び出された関数がセグメンテーションフォルトになっても、Julia にコールバックしません。

@threadcallは、Julia 将来のバージョンで削除/変更される可能性があります。

マルチコア処理または分散処理

Julia 付属の標準ライブラリの一部である、モジュール 'Distributed' を用いて、分散メモリ並列処理を実装できます。

現代のコンピュータの多くは複数の CPU を搭載しており、複数のコンピュータをクラスタ内で組み合わせることができます。これらの複数の CPU のパワーを利用することで、多くの計算をより迅速に完了できます。パフォーマンスに影響を与える主な要因には、CPU 自体の速度とメモリへのアクセス速度の 2 つがあります。クラスタでは、ある CPU が最速でアクセスできるのは、CPUと同じコンピュータ(ノード)内のRAMであることは明らかです。より驚くべきことに、典型的なマルチコアラップトップでも、メインメモリとキャッシュの速度の違いによって同様の問題が起きています。したがって、優れたマルチプロセッシング環境では、特定の CPU によるメモリチャンクの「所有権」を制御できる必要があります。 Julia は、メッセージ・パッシングに基づくマルチプロセッシング環境を提供し、プログラムが別々のメモリ・ドメイン内の複数のプロセスで一度に実行できるようにします。

Julia のメッセージパッシングの実装は、MPI [1] などの他の環境とは異なります。 Julia の通信は一般に"一方向" です。プログラマが明示的に管理する必要があるのは、2 つのプロセスのうち、1 つのプロセスのみです。さらに通常、これらの操作は見かけ上、"メッセージ送信" や "メッセージ受信" のような形でなく、ユーザー関数の呼び出しのような高レベルの操作に似た形になります。

Julia の分散プログラミングは、リモート参照リモート呼び出しの 2 つのプリミティブに基づいて構築されています。 リモート参照は、任意のプロセスから特定のプロセスに格納されているオブジェクトを参照するために使用できるオブジェクトです。リモート呼び出しとは、あるプロセスが別の (おそらく同じ) プロセスの特定の引数に対して特定の関数を呼び出すリクエストのことです。

リモート参照には、FutureRemoteChannelの2つのフレーバーがあります。

リモート呼び出しを行うと、Futureが戻り値として得られ、さらに、プロセスの制御は、呼び出し側に直ちに返されます; すなわち、呼び出しを行ったプロセスは、リモート呼び出しが別の場所で行われる間、次の操作に進みます。リモート呼び出しから返された Future に対する wait 関数を呼び出すことで、呼び出したプロセスが終了するのを待つことができます。また、fetchを使って呼び出したプロセスの結果全ての値を取得することができます。

一方、RemoteChannelは書き換え可能です。たとえば、複数のプロセスが同じリモートの 'Channel' を参照して処理を調整することができます。

各プロセスには、識別子が関連付けられます。インタラクティブなJulia プロンプトを提供するプロセスは、常に1に等しいidを持っています。並列処理にデフォルトで使用されるプロセスは、"ワーカー" と呼ばれます。プロセスが 1 つしかない場合、プロセス 1 はワーカーと見なされます。それ以外の場合、ワーカーはプロセス 1 以外のすべてのプロセスと見なされます。

それではやってみましょう。julia -p njuliaを起動することで、ローカルマシン上のn個のワーカープロセスを使うことができます。一般に、'n' がマシン上の CPU スレッド (論理コア) の数と等しくするのが理にかなっているでしょう。オプション引数 -pDistributed モジュールを暗黙的に読み込むことに注意してください。

$ ./julia -p 2

julia> r = remotecall(rand, 2, 2, 2)
Future(2, 1, 4, nothing)

julia> s = @spawnat 2 1 .+ fetch(r)
Future(2, 1, 5, nothing)

julia> fetch(s)
2×2 Array{Float64,2}:
 1.18526  1.50912
 1.16296  1.60607

remotecallの最初の引数は、リモートで呼び出す関数です。Julia のほとんどの並列プログラミングでは、特定のプロセスや使用可能なプロセスの数は参照されませんが、remotecallは、細かい制御を提供する低レベルのインターフェイスで、2番目の引数で、作業を行うプロセスのidを指定できます。残りの引数は呼び出される関数に渡されます。

ご覧の通り、最初の行ではプロセス 2 に 2✕2 のランダム行列を作成するよう依頼し、2 行目では 1 を追加するように依頼しました。両方の計算の結果は、rs の 2 つのfutureで使用できます。@spawnat マクロは、第一引数で指定されたプロセスid で、第二引数で指定された式を評価します。

場合によっては、リモートで計算された値がすぐに必要になる場合があります。典型例は、次のローカル操作でリモートの処理結果が必要になり、リモート オブジェクトから値を読み取る必要がある、という場合で、こういう時には、remotecall_fetch が使えます。これは fetch(remotecall(...))とするのと同等ですが、より効率的です(訳者註: 効率的というのはタイプ量が減るという意味で? それとも余計なオブジェクトができない、というリソース的な意味で? )。

julia> remotecall_fetch(getindex, 2, r, 1, 1)
0.18526337335308085

getindex(r,1,1) は、r[1,1]と等しいことを覚えて置いてください。この呼び出しはfutureのrの最初の要素をフィッチします。

To make things easier, the symbol :any can be passed to [@spawnat], which picks where to do the operation for you:

julia> r = @spawnat :any rand(2,2)
Future(2, 1, 4, nothing)

julia> s = @spawnat :any 1 .+ fetch(r)
Future(3, 1, 5, nothing)

julia> fetch(s)
2×2 Array{Float64,2}:
 1.38854  1.9098
 1.20939  1.57158

1 .+ rではなく、 1 .+ fetch(r)を使ったところに注意してください。 これは、コードがどこで実行されるかわからないため、一般的に、加算を行うプロセスに r を移動するために fetch が必要になる場合があります。この場合、@spawnatrを所有するプロセスで計算を実行するのに十分にスマートな方法になっていて、fetchはno-op(作業は行われない)ことになります。

( @spawnat は組み込み関数ではなく、Julia で マクロ として定義されていること注目する価値があります。このような仕組みを自分で独自に定義することは可能です。)

覚えておくべき重要なことは、一度結果がフェッチされると、Future はその値をローカルにキャッシュするということです。さらにfetch 呼び出しは、ネットワークホップを伴いません。一度、Futures のすべての参照が取得されると、リモートに保存された値は削除されます。

@spawnatに似たものに、@asyncがありますが、@async でタスクが実行できるのはローカルプロセスのみです。@async を使うと、タスク毎にフィーダータスクが生成されます。それぞれのタスクは、計算する必要があるインデックスを選択し、そのプロセスが終了するのを待つ、ということを、インデックスがなくなるまで繰り返します。フィーダー タスクは、メインタスクが @sync ブロックの最後に達するまで実行を開始せず、その時点で制御が放棄され、関数から戻る前にすべてのローカル タスクが完了するのを待つことに注意してください。 Julia の v0.7 以降では、フィーダー タスクはすべて同じプロセスで実行されるため、nextidxを介して各タスクの状態を共有できます。 タスクが協調的にスケジュールされている場合でも、非同期 I/Oのように、ロックが必要な場合があります。 つまり、コンテキストスイッチは、remotecall_fetchが呼び出されたときに、明確に定義されたポイントでのみ発生します: これは現在の実装の状態であり、将来的には変更されるかもしれませんが、現在は、M個のプロセスに対してN個のタスクを実行することが可能なように実装されています。これを別名M:Nスレッドと言います。ロックの取得/リリースモデルをnextidx に対して用いる必要があります。複数のプロセスが、同時にリソースを読み書きできるのは危険なためです。

コードの可用性とパッケージの読み込み

何かコードを実行するならば、実行プロセスからそのコードが利用可能でなければなりません。例えば、Julia のプロンプトに次のように入力してみましょう:

julia> function rand2(dims...)
           return 2*rand(dims...)
       end

julia> rand2(2,2)
2×2 Array{Float64,2}:
 0.153756  0.368514
 1.15119   0.918912

julia> fetch(@spawnat :any rand2(2,2))
ERROR: RemoteException(2, CapturedException(UndefVarError(Symbol("#rand2"))
Stacktrace:
[...]

プロセス1は rand2関数を知っていましたが、プロセス2は知りませんでした。

大抵の場合は、コードをファイルやパッケージからロードすることで、あなたはかなり柔軟にプロセスがロードするコードを制御することができます。下記のコードを含む、DummyModule.jl というファイルを考えてみましょう:

module DummyModule

export MyType, f

mutable struct MyType
    a::Int
end

f(x) = x^2+1

println("loaded")

end

すべてのプロセスで MyType を参照するには、すべてのプロセスに DummyModule.jl をロードする必要があります。 include("DummyModule.jl")を呼び出すと、単一のプロセスでのみ、コードが読み込まれます。すべてのプロセスにロードするには、@everywhereマクロを使用します(Juliaを julia -p 2 で開始して):

julia> @everywhere include("DummyModule.jl")
loaded
      From worker 3:    loaded
      From worker 2:    loaded

いつも(シングルコアのケース)と同様に、上記は、DummyModule を全てのプロセスでスコープ内に持ち込むものでありません。usingimport をする必要があります。さらに、あるプロセスで DummyModuleがスコープに入ったからと言って、他のプロセスのスコープに DummyModuleはありません:

julia> using .DummyModule

julia> MyType(7)
MyType(7)

julia> fetch(@spawnat 2 MyType(7))
ERROR: On worker 2:
UndefVarError: MyType not defined
⋮

julia> fetch(@spawnat 2 DummyModule.MyType(7))
MyType(7)

ただし、DummyModule がスコープになくても、ロードされていれば、そのプロセスに対して、例えば、MyType を送信することは可能です:

julia> put!(RemoteChannel(2), MyType(7))
RemoteChannel{Channel{Any}}(2, 1, 13)

また、ファイルは、Julia 起動時に、-Lフラグオプションで複数プロセスにプリロードでき、実行対象の"driver.jl" スクリプトが 計算を駆動することができます:

julia -p <n> -L file1.jl -L file2.jl driver.jl

上記の例で、ドライバスクリプトを実行している Julia プロセスは、対話型プロンプトを提供するプロンプトと同様にプロセスid 1です。

最後に、もし、DummyModule.jlが、スタンドアロンのファイルではなくパッケージだった場合 using DummyModuleとすると、DummyModule.jlのコードは全てのプロセスで ロード されますが、DummyModuleがスコープにはいるのは、usingが実行されたプロセスだけになります。

ワーカー プロセスの開始と管理

Julia をインストールすると、以下2種類のクラスタをサポートする機能が含まれています:

addprocs, rmprocs, workersなどの関数で、クラスタ内のプロセスを追加・削除・問い合わせするプログラミング手段が提供されます。

julia> using Distributed

julia> addprocs(2)
2-element Array{Int64,1}:
 2
 3

Distributed モジュールは、addprocsを呼び出す前に、マスターとなるプロセスに明示的にロードする必要があります。 ワーカー プロセスで自動的に使用可能になります。

ワーカーは ~/.julia/config/startup.jlの起動スクリプトを実行せず、グローバル状態 (グローバル変数、新しいメソッド定義、読み込まれたモジュールなど) も他の実行中プロセスと同期されないことに注意が必要です。

他のタイプのクラスターは、ClusterManager セクションで説明するように、独自のClusterManager を記述することによってサポートできます。

データの移動

メッセージの送信とデータの移動は、分散プログラムのオーバーヘッドの大部分を占めています。パフォーマンスとスケーラビリティを実現するには、メッセージの数と送信されるデータ量を減らすことが重要です。 そのためには、Julia のさまざまな分散プログラミングコンストラクトによって実行されるデータ移動を理解することが重要です。

fetchは、オブジェクトをローカル マシンに移動することを直接要求するため、明示的なデータ移動操作と見なすことができます。@spawnat(およびいくつかの関連する構成要素) もデータを移動しますが、これは明示的でないため、暗黙的なデータ移動操作と呼ぶことができます。ランダム行列を構築および二乗する方法を以下の2つ考えてみましょう:

方法 1:

julia> A = rand(1000,1000);

julia> Bref = @spawnat :any A^2;

[...]

julia> fetch(Bref);

方法 2:

julia> Bref = @spawnat :any rand(1000,1000)^2;

[...]

julia> fetch(Bref);

違いは些細に見えますが、実際には@spawnat の動作を考えると非常に重要な違いです。 最初の方法では、ランダム行列がローカルで構築され、その後、別のプロセス送信されて、そこで2乗されます。2 番目の方法では、ランダム行列が別のプロセスで構築された後二乗されます。したがって、2 番目のメソッドのデータ送信量は、最初のメソッドでの送信量よりはるかに少ないです。

このToy example では、これら2つの方法を区別し、選択するのは簡単です。しかし、実際のプログラムでデータの移動を設計するには、より多くのことを考慮し、おそらくはなんらかの測定が必要になる場合があります。たとえば、最初のプロセスに行列 A が必要な場合は、最初のメソッドの方が適している可能性があります。または、A の計算が高価で、現在のプロセス(ローカルのプロセス)だけでそれが計算されているのならば、別のプロセスへのデータ転送は避けられないかもしれません。または、現在のプロセスで @spawnatfetch(Bref) の間で行うことはほとんどない場合は、並列処理を完全に排除する方が良いかもしれません。または、rand(1000,1000)がより高価な操作に置き換えられると想像してみてください。次に、この手順に@spawnat ステートメントを追加するのが理にかなっている場合があります。

グローバル変数

@spawnatを介してリモートで実行される式、または remotecall を使用してリモート実行用に指定されたクロージャは、グローバル変数を参照できます。モジュール Main のグローバル バインディングは、他のモジュールのグローバル バインディングとは少し異なる方法で扱われます。次のコード スニペットについて考えてみましょう:

A = rand(10,10)
remotecall_fetch(()->sum(A), 2)

この場合、sum はリモート プロセスで定義する必要があります。 A は、ローカル ワークスペースで定義されたグローバル変数であることに注意してください。ワーカープロセス 2 は、Mainの下で呼び出された変数Aを所有していません。クロージャー ()->sum(A) をワーカープロセス2 に送られた結果として、Main.A がワーカプロセス2に定義されます。 remotecall_fetch から制御がローカルに戻ったあとも、main.A はワーカープロセス 2 に存在し続けます。 埋め込みグローバル参照 (Mainモジュールのみ) を伴う、リモート呼び出しは、グローバル変数を下記のように管理します:

これでお分かりのように、グローバルに関連付けられたメモリはマスターで別の値が代入されたときにに収集される可能性がありますが、ワーカーではそのようなアクションは実行されません。バインディングが引き続き有効だからです。 clear! を使用すると、リモートノード上の特定のグローバルを、不要になったときに手動で nothingを再代入できます。これにより、通常のガベージ コレクション サイクルの一部として、それらに関連付けられているすべてのメモリが解放されます。

したがって、プログラムではリモート呼び出しでのグローバル参照をする時は、注意が必要です。実際には、可能であればリモートでのグローバル参照は完全に避けるのが好ましいです。グローバルを参照する必要がある場合は、let ブロックを使用してグローバル変数をローカライズすることを検討してください。

例えば:

julia> A = rand(10,10);

julia> remotecall_fetch(()->A, 2);

julia> B = rand(10,10);

julia> let B = B
           remotecall_fetch(()->B, 2)
       end;

julia> @fetchfrom 2 InteractiveUtils.varinfo()
name           size summary
––––––––– ––––––––– ––––––––––––––––––––––
A         800 bytes 10×10 Array{Float64,2}
Base                Module
Core                Module
Main                Module

ここでわかるように、グローバル変数 A は、ワーカー2で定義されていますが、Bはローカル変数として補足されるため、ワーカー2では Bのバインディングは存在しません。

並行マップと並行ループ

幸いにも、便利な並列計算の多くは、データの移動を必要としません。一般的な例は、複数のプロセスが独立したシミュレーション試行を同時に処理できるモンテカルロシミュレーションです。2つのプロセスでコインを反転させるために@spawnatを使用することができます。まず、count_heads.jl に次の関数を書きます:

function count_heads(n)
    c::Int = 0
    for i = 1:n
        c += rand(Bool)
    end
    c
end

関数 count_heads は単に n 個のランダムビットの和をとります。2 台のマシンで試行を実行し、それぞれの結果の和をとる方法を次に説明します:

julia> @everywhere include_string(Main, $(read("count_heads.jl", String)), "count_heads.jl")

julia> a = @spawnat :any count_heads(100000000)
Future(2, 1, 6, nothing)

julia> b = @spawnat :any count_heads(100000000)
Future(3, 1, 7, nothing)

julia> fetch(a)+fetch(b)
100001564

この例で示されているのは、、並列プログラミングにおける、強力かつ頻出のパターンです。多くのイテレーションは複数のプロセスで独立に実行され、その結果は何らかの関数を使用して統合されます。その統合のプロセスはreduceと呼ばれます。その処理は、一般的にテンソルランク低減で、数値ベクトルが単一の数値に減少するとか、行列が単一の行または列などに減らされるような処理だからです。コードでは、通常、xがアキュムレータ、fがreduce関数、v[i] が削減される要素であるパターンで、 x= f(x,v[i])のようなコードになります。f は、操作が実行される順序に関係ない、つまり結合律を満たすことが望ましいです。

count_heads を使用したこのパターンの使用は一般化できることに注意してください。2 つの明示的な @spawnat ステートメントを使用して、並列処理を 2 つのプロセスに制限しました。任意の数のプロセスで実行するには、分散メモリで実行される parallel for loop を、次のように @distributed を使って書くことができます:

nheads = @distributed (+) for i = 1:200000000
    Int(rand(Bool))
end

このコンストラクトは、繰り返し処理を複数のプロセスに計算させて、その結果を、指定された reduce 処理(この場合は (+))を使って統合する 実装パターンを実現しています。各反復の結果としてループ内の最後の式の値が受け取られます。並列ループ式全体が、最終的な計算結果として評価されます。

並行処理の for ループは、逐次処理の for ループと似ているようにみえますが、その動作は大きく異なることに注意が必要です。特に、反復は指定された順序で行われないこと、反復が異なるプロセスで実行されるので、変数または配列への書き込みはグローバルからは見えないこと等に注意しましょう。並列ループ内で使用される変数すべては、各プロセスにコピーされ、ブロードキャストされます。

たとえば、次のコードは意図したとおりに機能しません:

a = zeros(100000)
@distributed for i = 1:100000
    a[i] = i
end

このコードは、a の全ての要素を初期化できません。各プロセスには個別のコピーがあるからです。このような並行 for ループは避けなければいけません。さいわい、 Shared Array を使用して、この制限を回避できます:

using SharedArrays

a = SharedArray{Float64}(10)
@distributed for i = 1:10
    a[i] = i
end

並列ループにおける "外部" 変数の使用は、変数が読み取り専用の場合に全く問題有りません:

a = randn(1000)
@distributed (+) for i = 1:100000
    f(a[rand(1:end)])
end

ここでは、各反復は、すべてのプロセスで共有されるベクトル a からランダムに選択されたサンプルに f を適用します。

ご覧の通り、reduce 演算子は必要ない場合は省略できます。その場合、ループは非同期的に実行され、つまり使用可能なすべてのワーカーに対して独立したタスクが生成され、完了を待たずにすぐに Future の配列を返します。呼び出し元は、fetch を呼び出して Futureの完了を後で待つこともできますし、@sync をつけて、つまり、 @sync @distributed for ループを使って、ループの最後で完了を待つことができます。

場合によっては、reduce 演算子は必要なく、ある範囲のすべての整数 (または一般的には、一部のコレクション内のすべての要素) に関数を適用するだけということもあるでしょう。これは、パラレル マップと呼ばれる便利な処理で、Julia ではpmap 関数として実装されています。たとえば、複数の大きなランダム行列の特異値を次のように並列に計算できます:

julia> M = Matrix{Float64}[rand(1000,1000) for i = 1:10];

julia> pmap(svdvals, M);

Julia のpmapは、1回の関数呼び出しのなかで大量の処理を行う場合を想定して設計されています。対して、@distributed for は、各反復が小さく、おそらく単に 2 つの数値を合計するだけのような状況を処理できます。並列計算にはpmap@distributed for の両方でワーカー プロセスのみが使用されます。@distributedの場合、最終的なreduce処理は呼び出しプロセスで行われます。

リモート参照と抽象チャネル

リモート参照は常にAbstractChannelの実装を指します。

抽象チャンネル(「チャネル」など)の具体的な実装は、put!take!fetchisreadywait をインストールするのに必要です。 Future が参照するリモートオブジェクトは、 Channel{Any}(1) すなわち サイズ1でAny型を保持するチャンネルです。

RemoteChannelは、再書き込み可能であり、チャネルの任意のタイプとサイズ、またはAbstractChannelの他の実装を指すことができます。

コンストラクタ RemoteChannel(f::Function,pid)() を使用すると、特定の型の複数の値を保持するチャネルへの参照を構築できます。fpid で実行される関数であり、AbstractChannel を返す必要があります。

たとえば、RemoteChannel(()->Channel{Int}(10), pid) は、Int型でサイズ 10のチャネルへの参照を返します。 チャネルはワーカー pid に存在します。

RemoteChannel上のメソッド put!, take!, fetch, isready and wait はリモートプロセスのバッキングストアにプロキシされます。

よって、RemoteChannel はユーザーが実装した AbstractChannel オブジェクトを参照するのに使うことができます。これのシンプルな例はExamples repositorydictchannel.jlに見ることができます。これは辞書をリモートストアとして辞書を使うような例です。

チャネルとリモートチャネル

上記のチャネル例は、以下に示すように、プロセス間通信用に変更できます。

ここでは、ジョブを格納する 単一のリモートチャネルを処理するために4つのワーカーを立ち上げます。ID (job_id) によって識別されるジョブは、チャネルに書き込まれます。このシミュレーションでリモートで実行している各タスクは job_idを読み取り、ランダムな時間だけ待機し、job_idのタプル、時間、および独自のpidを "results" チャネルに書き戻します。最後に、すべての結果がマスタープロセスにプリント出力されます。

julia> addprocs(4); # add worker processes

julia> const jobs = RemoteChannel(()->Channel{Int}(32));

julia> const results = RemoteChannel(()->Channel{Tuple}(32));

julia> @everywhere function do_work(jobs, results) # define work function everywhere
           while true
               job_id = take!(jobs)
               exec_time = rand()
               sleep(exec_time) # simulates elapsed time doing actual work
               put!(results, (job_id, exec_time, myid()))
           end
       end

julia> function make_jobs(n)
           for i in 1:n
               put!(jobs, i)
           end
       end;

julia> n = 12;

julia> @async make_jobs(n); # feed the jobs channel with "n" jobs

julia> for p in workers() # start tasks on the workers to process requests in parallel
           remote_do(do_work, p, jobs, results)
       end

julia> @elapsed while n > 0 # print out results
           job_id, exec_time, where = take!(results)
           println("$job_id finished in $(round(exec_time; digits=2)) seconds on worker $where")
           global n = n - 1
       end
1 finished in 0.18 seconds on worker 4
2 finished in 0.26 seconds on worker 5
6 finished in 0.12 seconds on worker 4
7 finished in 0.18 seconds on worker 4
5 finished in 0.35 seconds on worker 5
4 finished in 0.68 seconds on worker 2
3 finished in 0.73 seconds on worker 3
11 finished in 0.01 seconds on worker 3
12 finished in 0.02 seconds on worker 3
9 finished in 0.26 seconds on worker 5
8 finished in 0.57 seconds on worker 4
10 finished in 0.58 seconds on worker 2
0.055971741

リモート参照と分散ガベージ コレクション

リモート参照で参照されるオブジェクトは、クラスター内に保持されている全ての 参照が削除された場合にのみ解放できます。

値が格納されているノードは、どのワーカーがその値を参照しているかを追跡します。 RemoteChannelまたは(フェッチされていない)Futureがワーカーにシリアル化されるたびに、参照によって指し示されたノードが通知されます。また、RemoteChannelまたは(フェッチされていない)Futureがローカルに収集されるたびに、値を所有するノードが再度通知されます。これは、内部クラスター対応シリアライザーで実装されます。リモート参照は、実行中のクラスターのコンテキストでのみ有効です。通常の IOオブジェクトからの参照、通常の IOオブジェクトへの参照のシリアル化と逆シリアル化はサポートされていません。

通知は、「トラッキング」メッセージの送信を介して行われます。このメッセージとは、参照が別のプロセスにシリアル化されるときには、「参照の追加」メッセージであり、参照がローカルのガベージ コレクションで処理されたときには「参照を削除」メッセージに相当します。

Futureは、書き込みが一度で、ローカルにキャッシュされるので、 fetchの振る舞いは、その値を所有しているノードの参照追跡情報の更新も行います。

値を所有するノードは、値へのすべての参照がクリアされると、値(のメモリを)解放します。

Futureを使用すると、既にフェッチされた Future を別のノードにシリアル化すると、(参照だけでなく)値そのものも送信されます。元のリモート ストアが現時点までの間に値を収集した可能性があるためです。

いつオブジェクトがローカルに収集されるかは、オブジェクトのサイズとシステム内の現在のメモリ負荷によって異なる、ということは注意すべきです。

リモート参照の場合、リモートノードにある値が非常に大きくなる可能性があるのですが、ローカル参照オブジェクトのサイズは非常に小さくなります。ローカルオブジェクトはすぐに収集されないことがあるため、RemoteChannelのローカルインスタンスや、フェッチされていないFutureに対して、finalize を明示的に呼ぶのはよいアイディアです。Futureに対してfetchを呼ぶと、その参照もリモートストアから削除されてしまうので、フェッチされたFutureについては、finalizeする必要はありません。明示的にfinalizeを呼ぶとリモートノードに対してすぐにメッセージが送信されて、処理が先に進み、値への参照が削除されます。

いったんファイナライズされると、参照は無効になり、それ以降の呼び出しでは使用できません。

ローカル呼び出し

データは、処理が実行されるリモートノードに必ずコピーされます。これは、remotecallされた場合、データがRemoteChannel に格納されたり別のノードで Futureが呼ばれた場合でも同様です。予想できるように、これはリモートノードでのシリアル化されたオブジェクトのコピーとなります。すなわち、呼び出しを行うプロセスのIDと、リモートのノードのIDは同じで、ローカル呼び出しとして実行されます。大抵の場合(常に、ではありません)は、別のタスクで実行され、しかしそこではデータのシリアライズ/逆シリアライズの処理は発生しません。呼び出しでは、受け渡されたオブジェクトインスタンスと全く同じものを参照して、コピーはつくられません。ここで述べた振る舞いは、以下の例でよく分かります:

julia> using Distributed;

julia> rc = RemoteChannel(()->Channel(3));   # RemoteChannel created on local node

julia> v = [0];

julia> for i in 1:3
           v[1] = i                          # Reusing `v`
           put!(rc, v)
       end;

julia> result = [take!(rc) for _ in 1:3];

julia> println(result);
Array{Int64,1}[[3], [3], [3]]

julia> println("Num Unique objects : ", length(unique(map(objectid, result))));
Num Unique objects : 1

julia> addprocs(1);

julia> rc = RemoteChannel(()->Channel(3), workers()[1]);   # RemoteChannel created on remote node

julia> v = [0];

julia> for i in 1:3
           v[1] = i
           put!(rc, v)
       end;

julia> result = [take!(rc) for _ in 1:3];

julia> println(result);
Array{Int64,1}[[1], [2], [3]]

julia> println("Num Unique objects : ", length(unique(map(objectid, result))));
Num Unique objects : 3

ご覧の通り、ローカルで所有されたRemoteChannelv に対してput! を(複数回)呼ぶ場合、その複数回の呼び出しの間に v を修正すると、単一のオブジェクトインスタンスが格納されます。対象的に、rc を所有するノードが別のノードである時、(その時々の)vのコピーが生成されるのと対象的です。

この振る舞いは、一般的には大した問題にはならないと思ってください。この振る舞いを考慮する必要があるとすれば、それは、オブジェクトがローカルに格納され、その後 (put!などの)呼び出しの後で、そのオブジェクトが変更された状況だけです。このような場合は、オブジェクトの「ディープコピー」を保存するのが適切でしょう。

これは、次の例に示すように、ローカル ノードのremotecallにも当てはまります:

julia> using Distributed; addprocs(1);

julia> v = [0];

julia> v2 = remotecall_fetch(x->(x[1] = 1; x), myid(), v);     # Executed on local node

julia> println("v=$v, v2=$v2, ", v === v2);
v=[1], v2=[1], true

julia> v = [0];

julia> v2 = remotecall_fetch(x->(x[1] = 1; x), workers()[1], v); # Executed on remote node

julia> println("v=$v, v2=$v2, ", v === v2);
v=[0], v2=[1], false

ここでもまたご覧の通り、ローカル ノードへの remotecall は、直接呼び出しと同様に機能します。 ローカルノードへの remotecall は、引数として渡されたローカル オブジェクトを変更します。リモート呼び出しでは、引数のコピーに対する処理になります。

繰り返しますが、一般的にこのふるまいが問題になることはありません。もし、ローカル ノードがコンピューティング ノードとしても使用され、引数が ローカル呼び出しの後にも使用されるとすれば、ここで触れた振る舞いを考慮する必要があります。具体的には、必要に応じて引数のディープコピーを用意してローカルノードで呼び出された呼び出しに渡す必要があります。しかし、リモート ノードの呼び出しは、常に引数のコピーで動作します。

共有配列

共有アレイは、システムの共有メモリを使用して、複数のプロセスで同じ配列をマップします。 SharedArray は、 DArray とはいくつかの類似点があるものの、 その動作は全く異なります。 DArray では、 各プロセスはデータチャンクに対するローカルアクセス権を持ち、 2つのプロセスが同じチャンクを共有しません。 対照的に、 SharedArray では、配列の共有に「参加している」プロセスはアレイ全体にアクセスできます。 SharedArray は、同じマシン上の複数のプロセスが共同で、大量のデータアクセスさせたい場合に適しています。

共有配列にサポートするには、参加するすべてのワーカーで SharedArray モジュールを明示的にロードする必要があります。

SharedArray の添字アクセス (値の代入と参照) は通常の配列と同様に機能します。また、実際に値が格納されているメモリにローカル プロセスからアクセスできるため効率的です。したがって、(マルチプロセス想定で実装された) ほとんどのアルゴリズムは、単一プロセスモードだとしても SharedArray 上で自然に動作します。アルゴリズムに Array 入力が必要な場合、実際に値を格納している配列はsdata を呼び出すことによって SharedArray から取得できます。他の AbstractArray 型の場合は、 sdata はオブジェクトそのものを返します。なので、( SharedArray かどうか気にせず) Array 型オブジェクトで sdata を使用しても安全です。

共有配列のコンストラクタは、次の形式で書きます:

SharedArray{T,N}(dims::NTuple; init=false, pids=Int[])

このコンストラクタは、ビット型 T を要素にもち、dims で指定されるサイズの N次元共有配列を pids で指定されたプロセス全体で作成します。分散配列とは異なり、共有配列はpids 名前付き引数で指定されている 共有に参加しているワーカーからのみアクセスできます (同じホスト上にある場合はワーカーを作成したメインプロセスからもアクセス可能です)。

シグネチャ initfn(S:SharedArray)init 関数が指定されている場合は、その初期化関数は、参加しているすべてのワーカーで呼び出されます。各ワーカーが配列の別個の部分で init 関数を実行し、初期化を並列化することを指定できます。

簡単な例を次に示します:

julia> using Distributed

julia> addprocs(3)
3-element Array{Int64,1}:
 2
 3
 4

julia> @everywhere using SharedArrays

julia> S = SharedArray{Int,2}((3,4), init = S -> S[localindices(S)] = myid())
3×4 SharedArray{Int64,2}:
 2  2  3  4
 2  3  3  4
 2  3  4  4

julia> S[3,2] = 7
7

julia> S
3×4 SharedArray{Int64,2}:
 2  2  3  4
 2  3  3  4
 2  7  4  4

SharedArrays.localindices は、インデックスの 1 次元のなかのあるインデックス範囲をコア間で重なりなく提供します。プロセス間でタスクを分割するのに便利なことがあります。もちろん、作業を任意の方法で分割することもできます:

julia> S = SharedArray{Int,2}((3,4), init = S -> S[indexpids(S):length(procs(S)):length(S)] = myid())
3×4 SharedArray{Int64,2}:
 2  2  2  2
 3  3  3  3
 4  4  4  4

すべてのプロセスは実データにアクセスできるため、競合が発生しないよう注意する必要があります。例えば:

@sync begin
    for p in procs(S)
        @async begin
            remotecall_wait(fill!, p, S, p)
        end
    end
end

上記のコードの動作は未定義です。各プロセスは配列全要素に自分自身のpidの数字を代入しようとするため、(それぞれのSの要素に対して)代入計算を最後に実行したプロセスのpidを保持することになります。

より発展的・複雑な例として、次の "カーネル" を並行して実行する場合を考えてみましょう:

q[i,j,t+1] = q[i,j,t] + u[i,j,t]

この場合、1 次元インデックスを使用して作業を分割しようとすると、次のような問題が発生する可能性があります: q[i,j,t] があるワーカーに割り当てられたブロックの終わり付近にあり、q[i,j,t+1] が別のワーカーに割り当てられたブロックの先頭付近にある場合、q[i,j,t] が計算に必要な時点で、準備出来ていない可能性が非常に高いです。このような場合は、手動で配列をチャンクする方が良いでしょう。2 番目の次元に沿って分割してみましょう。 このワーカーに割り当てられた (irange,jrange) インデックスを返す関数を定義します:

julia> @everywhere function myrange(q::SharedArray)
           idx = indexpids(q)
           if idx == 0 # This worker is not assigned a piece
               return 1:0, 1:0
           end
           nchunks = length(procs(q))
           splits = [round(Int, s) for s in range(0, stop=size(q,2), length=nchunks+1)]
           1:size(q,1), splits[idx]+1:splits[idx+1]
       end

次に、カーネルを定義します:

julia> @everywhere function advection_chunk!(q, u, irange, jrange, trange)
           @show (irange, jrange, trange)  # display so we can see what's happening
           for t in trange, j in jrange, i in irange
               q[i,j,t+1] = q[i,j,t] + u[i,j,t]
           end
           q
       end

また、SharedArray 実装に利便なラッパーも定義します

julia> @everywhere advection_shared_chunk!(q, u) =
           advection_chunk!(q, u, myrange(q)..., 1:size(q,3)-1)

さぁ、3 つの異なるバージョンを比較してみましょう。1つ目は1つのコアで実行されるもの:

julia> advection_serial!(q, u) = advection_chunk!(q, u, 1:size(q,1), 1:size(q,2), 1:size(q,3)-1);

もうひとつは、 @distributed を使うもの:

julia> function advection_parallel!(q, u)
           for t = 1:size(q,3)-1
               @sync @distributed for j = 1:size(q,2)
                   for i = 1:size(q,1)
                       q[i,j,t+1]= q[i,j,t] + u[i,j,t]
                   end
               end
           end
           q
       end;

そして、チャンク単位で別々のコアに計算を任せるもの:

julia> function advection_shared!(q, u)
           @sync begin
               for p in procs(q)
                   @async remotecall_wait(advection_shared_chunk!, p, q, u)
               end
           end
           q
       end;

SharedArray を作成し、これらの関数の実行時間を測定すると、次の結果が得られます ( julia -p 4 で実行したとします):

julia> q = SharedArray{Float64,3}((500,500,500));

julia> u = SharedArray{Float64,3}((500,500,500));

JITコンパイルのために一度関数を実行してから、二回目の実行で測定します:

julia> @time advection_serial!(q, u);
(irange,jrange,trange) = (1:500,1:500,1:499)
 830.220 milliseconds (216 allocations: 13820 bytes)

julia> @time advection_parallel!(q, u);
   2.495 seconds      (3999 k allocations: 289 MB, 2.09% gc time)

julia> @time advection_shared!(q,u);
        From worker 2:       (irange,jrange,trange) = (1:500,1:125,1:499)
        From worker 4:       (irange,jrange,trange) = (1:500,251:375,1:499)
        From worker 3:       (irange,jrange,trange) = (1:500,126:250,1:499)
        From worker 5:       (irange,jrange,trange) = (1:500,376:500,1:499)
 238.119 milliseconds (2264 allocations: 169 KB)

advection_shared! の最大の利点は、ワーカー間のトラフィックを最小限に抑え、割り当てられたピースに対して長時間計算できるようにすることです。

共有配列と分散ガベージ コレクション

リモート参照と同様に、共有配列もまた、配列が生成されたノードのガベージ コレクションに依存します。ここでいうガベージコレクションは、その配列を共有しているすべてのワーカーからの参照を解放するものです。サイズが小さい共有配列をたくさん生成するようなコードは、これらのオブジェクトをできるだけ早く明示的にファイナライズすると、得るものが多いでしょう。結果、メモリハンドル・ファイルハンドルの両方で、より早く解放される共有セグメントをマッピングしてくれます。

クラスタマネージャ

Julia プロセスの起動、管理、および論理クラスタへのネットワーキングは、クラスタ マネージャを介して行われます。ClusterManager が担当するのは

Julia クラスターには、次の特徴があります:

(組み込みの TCP/IP トランスポートを使用して) ワーカー間の接続は、次の方法で確立されます:

既定のトランスポート層では、プレーン TCPSocketが使われますが、Julia クラスターが独自のトランスポートを提供する場合があります。

Julia は、次の 2 つの組み込みクラスタ マネージャを提供します:

LocalManager は、同じホスト上で追加のワーカーを起動するために使用され、マルチコアおよびマルチプロセッサ ハードウェアを活用します。

したがって、最小限のクラスタ マネージャーは次のことが必要になります:

addprocs(manager::FooManager) が正しく処理されるためには FooManager に次の関数が実装されている必要があります:

function launch(manager::FooManager, params::Dict, launched::Array, c::Condition)
    [...]
end

function manage(manager::FooManager, id::Integer, config::WorkerConfig, op::Symbol)
    [...]
end

例として、同じホストでワーカーを開始するマネージャLocalManagerがどのように実装されているかを見てみましょう:

struct LocalManager <: ClusterManager
    np::Integer
end

function launch(manager::LocalManager, params::Dict, launched::Array, c::Condition)
    [...]
end

function manage(manager::LocalManager, id::Integer, config::WorkerConfig, op::Symbol)
    [...]
end

launch メソッドは、次の引数を受け取ります:

launch メソッドは、独立したタスクで非同期的に呼び出されます。このタスクが終了したということは、要求されたすべてのワーカーが起動されたということです。したがって、launch 関数は、要求されたすべてのワーカーが起動されるとすぐに終了する必要があります。

新しく起動されたワーカーは、互いにワーカー同士で、そしてマスタープロセスと 網羅的に接続されます。 コマンドライン引数 --worker[=<cookie>]を指定すると、起動されたプロセスが自分自身をワーカーとして初期化し、接続がTCP/IP ソケットを介してセットアップされます。

クラスター内のすべてのワーカーは、マスターと同じ cookie を共有します。 クッキーが指定されていない、つまり--workerオプションを使用すると、ワーカーは標準入力からクッキーを読み取ろうとします。 LocalManagerSSHManagerの両方が、新しく立ち上げたワーカーに標準入力を介してクッキーを渡します。

既定では、ワーカーは getipaddr()の呼び出しによって返されるアドレスで空きポートをリッスンします。 リッスンする特定のアドレスは、オプション引数 --bind-to bind_addr[:port]で指定できます。 これは、マルチホームド(複数のネットワークアドレスを持つ)ホストでの利用に役立ちます。

TCP/IPトランスポート以外の例としては、MPI を利用した実装が考えられます。MPI利用においては、--worker オプションによる指定を使っていはいけません。全ての並行処理の構成要素が用いられる前に、新しく起動したワーカーは、init_worker(cookie) を呼び Cookieの指定をしなくてはなりません。

起動するすべてのワーカーに対して、launch メソッドは、(適切なフィールドを初期化した) workerConfig オブジェクト を launched に追加する必要があります

mutable struct WorkerConfig
    # Common fields relevant to all cluster managers
    io::Union{IO, Nothing}
    host::Union{AbstractString, Nothing}
    port::Union{Integer, Nothing}

    # Used when launching additional workers at a host
    count::Union{Int, Symbol, Nothing}
    exename::Union{AbstractString, Cmd, Nothing}
    exeflags::Union{Cmd, Nothing}

    # External cluster managers can use this to store information at a per-worker level
    # Can be a dict if multiple fields need to be stored.
    userdata::Any

    # SSHManager / SSH tunnel connections to workers
    tunnel::Union{Bool, Nothing}
    bind_addr::Union{AbstractString, Nothing}
    sshflags::Union{Cmd, Nothing}
    max_parallel::Union{Integer, Nothing}

    # Used by Local/SSH managers
    connect_at::Any

    [...]
end

WorkerConfig のフィールドのほとんどは、組み込みのマネージャによって使用されます。独自実装のクラスタ マネージャーは、通常、io または host / port のみを指定します:

manage(manager::FooManager, id::Integer, config::WorkerConfig, op::Symbol) is called at different times during the worker's lifetime with appropriate op values:

カスタム トランスポートを使用したクラスタ マネージャ

Replacing the default TCP/IP all-to-all socket connections with a custom transport layer is a little more involved. Each Julia process has as many communication tasks as the workers it is connected to. For example, consider a Julia cluster of 32 processes in an all-to-all mesh network:

Replacing the default transport requires the new implementation to set up connections to remote workers and to provide appropriate IO objects that the message-processing loops can wait on. The manager-specific callbacks to be implemented are:

connect(manager::FooManager, pid::Integer, config::WorkerConfig)
kill(manager::FooManager, pid::Int, config::WorkerConfig)

The default implementation (which uses TCP/IP sockets) is implemented as connect(manager::ClusterManager, pid::Integer, config::WorkerConfig).

connect should return a pair of IO objects, one for reading data sent from worker pid, and the other to write data that needs to be sent to worker pid. Custom cluster managers can use an in-memory BufferStream as the plumbing to proxy data between the custom, possibly non-IO transport and Julia's in-built parallel infrastructure.

A BufferStream is an in-memory IOBuffer which behaves like an IO–it is a stream which can be handled asynchronously.

The folder clustermanager/0mq in the Examples repository contains an example of using ZeroMQ to connect Julia workers in a star topology with a 0MQ broker in the middle. Note: The Julia processes are still all logically connected to each other–any worker can message any other worker directly without any awareness of 0MQ being used as the transport layer.

When using custom transports:

kill(manager, pid, config) is called to remove a worker from the cluster. On the master process, the corresponding IO objects must be closed by the implementation to ensure proper cleanup. The default implementation simply executes an exit() call on the specified remote worker.

The Examples folder clustermanager/simple is an example that shows a simple implementation using UNIX domain sockets for cluster setup.

LocalManager と SSHManager のネットワーク要件

Julia のクラスターは、ローカルラップトップ、部門のクラスタ、クラウドなどのインフラ上で既にセキュリティで保護されている環境を想定して設計されています。 この節では、組み込みのLocalManagerSSHManagerのネットワークのセキュリティ要件について説明します:

クラスタークッキー

All processes in a cluster share the same cookie which, by default, is a randomly generated string on the master process:

Note that environments requiring higher levels of security can implement this via a custom ClusterManager. For example, cookies can be pre-shared and hence not specified as a startup argument.

ネットワーク トポロジの指定 (実験的実装)

The keyword argument topology passed to addprocs is used to specify how the workers must be connected to each other:

Keyword argument lazy=true|false only affects topology option :all_to_all. If true, the cluster starts off with the master connected to all workers. Specific worker-worker connections are established at the first remote invocation between two workers. This helps in reducing initial resources allocated for intra-cluster communication. Connections are setup depending on the runtime requirements of a parallel program. Default value for lazy is true.

Currently, sending a message between unconnected workers results in an error. This behaviour, as with the functionality and interface, should be considered experimental in nature and may change in future releases.

注目に値する外部パッケージ

Outside of Julia parallelism there are plenty of external packages that should be mentioned. For example MPI.jl is a Julia wrapper for the MPI protocol, or DistributedArrays.jl, as presented in Shared Arrays. A mention must be made of Julia's GPU programming ecosystem, which includes:

  1. Low-level (C kernel) based operations OpenCL.jl and CUDAdrv.jl which are respectively an OpenCL interface and a CUDA wrapper.

  2. Low-level (Julia Kernel) interfaces like CUDAnative.jl which is a Julia native CUDA implementation.

  3. High-level vendor-specific abstractions like CuArrays.jl and CLArrays.jl

  4. High-level libraries like ArrayFire.jl and GPUArrays.jl

In the following example we will use both DistributedArrays.jl and CuArrays.jl to distribute an array across multiple processes by first casting it through distribute() and CuArray().

Remember when importing DistributedArrays.jl to import it across all processes using @everywhere

$ ./julia -p 4

julia> addprocs()

julia> @everywhere using DistributedArrays

julia> using CuArrays

julia> B = ones(10_000) ./ 2;

julia> A = ones(10_000) .* π;

julia> C = 2 .* A ./ B;

julia> all(C .≈ 4*π)
true

julia> typeof(C)
Array{Float64,1}

julia> dB = distribute(B);

julia> dA = distribute(A);

julia> dC = 2 .* dA ./ dB;

julia> all(dC .≈ 4*π)
true

julia> typeof(dC)
DistributedArrays.DArray{Float64,1,Array{Float64,1}}

julia> cuB = CuArray(B);

julia> cuA = CuArray(A);

julia> cuC = 2 .* cuA ./ cuB;

julia> all(cuC .≈ 4*π);
true

julia> typeof(cuC)
CuArray{Float64,1}

Keep in mind that some Julia features are not currently supported by CUDAnative.jl [2] , especially some functions like sin will need to be replaced with CUDAnative.sin(cc: @maleadt).

In the following example we will use both DistributedArrays.jl and CuArrays.jl to distribute an array across multiple processes and call a generic function on it.

function power_method(M, v)
    for i in 1:100
        v = M*v
        v /= norm(v)
    end

    return v, norm(M*v) / norm(v)  # or  (M*v) ./ v
end

power_method repeatedly creates a new vector and normalizes it. We have not specified any type signature in function declaration, let's see if it works with the aforementioned datatypes:

julia> M = [2. 1; 1 1];

julia> v = rand(2)
2-element Array{Float64,1}:
0.40395
0.445877

julia> power_method(M,v)
([0.850651, 0.525731], 2.618033988749895)

julia> cuM = CuArray(M);

julia> cuv = CuArray(v);

julia> curesult = power_method(cuM, cuv);

julia> typeof(curesult)
CuArray{Float64,1}

julia> dM = distribute(M);

julia> dv = distribute(v);

julia> dC = power_method(dM, dv);

julia> typeof(dC)
Tuple{DistributedArrays.DArray{Float64,1,Array{Float64,1}},Float64}

To end this short exposure to external packages, we can consider MPI.jl, a Julia wrapper of the MPI protocol. As it would take too long to consider every inner function, it would be better to simply appreciate the approach used to implement the protocol.

Consider this toy script which simply calls each subprocess, instantiate its rank and when the master process is reached, performs the ranks' sum

import MPI

MPI.Init()

comm = MPI.COMM_WORLD
MPI.Barrier(comm)

root = 0
r = MPI.Comm_rank(comm)

sr = MPI.Reduce(r, MPI.SUM, root, comm)

if(MPI.Comm_rank(comm) == root)
   @printf("sum of ranks: %s\n", sr)
end

MPI.Finalize()
mpirun -np 4 ./julia example.jl
[1]

In this context, MPI refers to the MPI-1 standard. Beginning with MPI-2, the MPI standards committee introduced a new set of communication mechanisms, collectively referred to as Remote Memory Access (RMA). The motivation for adding rma to the MPI standard was to facilitate one-sided communication patterns. For additional information on the latest MPI standard, see https://mpi-forum.org/docs.

[2]

Julia GPU man pages