目次
- 前書き プロデューサー/コンシューマータスク
- 仕事の始まり。チャネル
- チャネルからの消費。ChannelReader
- チャンネルへの録音。チャンネルライター
- 割り当てなしの非同期
- IValueTaskSourceインターフェイス
- CompareExchangeについて少し
- スタックダイブ問題
- AsyncOperation-実装の詳細
前書き
プロデューサー/コンシューマーの問題は、プログラマーの途中でかなり頻繁に、12年以上にわたって発生します。 Edsger Dijkstra自身もこの問題の解決に一役かっていました。彼は、プロデューサー/コンシューマーベースで作業を整理するときに、セマフォを使用してスレッドを同期させるというアイデアを思いつきました。そして、その最も単純な形式のソリューションは既知であり、取るに足らないものですが、現実の世界では、このパターン(製造元/消費者)ははるかに複雑な形式で見つけることができます。また、最新のプログラミング標準はその痕跡を残し、コードはより単純化された方法で記述され、再利用のために分解されます。品質の高いコードを書くためのしきい値を下げ、このプロセスを簡素化するために、すべてが行われています。そして問題の名前空間-System.Threading.Channels-は、この目標に向けた次のステップです。
少し前にSystem.IO.Pipelinesを見ていました。より注意深い作業と問題の深い理解が必要であり、SpanとMemoryが使用されました。効率的な作業のためには、明らかなメソッドを呼び出さず(不必要なメモリ割り当てを回避するため)、常にバイト単位で考える必要がありました。このため、Pipelineプログラミングインターフェイスは重要で直感的ではありませんでした。
System.Threading.Channelsは、操作するはるかに単純なAPIをユーザーに提供します。 APIは単純ですが、このツールは高度に最適化されており、作業中にメモリを割り当てられない可能性が高いことに言及する価値があります。おそらくこれは、ValueTaskが内部で広く使用されているためであり、実際の非同期の場合でも、IValueTaskSourceが使用されます。、これはさらなる操作のために再利用されます。これは、正確にチャネルの実装全体の関心事です。
チャネルは一般化されており、一般化のタイプは、ご想像のとおり、インスタンスが生成および消費されるタイプです。興味深いことに、1行に収まるChannelクラスの実装(github source ):
namespace System.Threading.Channels
{
public abstract class Channel<T> : Channel<T, T> { }
}
したがって、チャネルのメインクラスは、プロデューサーチャネルとコンシューマーチャネルの2つのタイプでパラメーター化されます。しかし、実現されたチャネルでは、これは使用されません。
パイプラインに精通している人にとって、開始するための一般的なアプローチはなじみがあるように見えます。つまり。製造元(ChannelWriter)と消費者(ChannelReader)を個別に取得する1つの中央クラスを作成します。名前にかかわらず、これはまさにプロデューサー/コンシューマーであり、同じ名前の別の従来のマルチスレッドタスクのリーダー/ライターではないことを覚えておく価値があります。ChannelReaderは、利用できなくなった一般チャネルの状態を変更します(値を引き出します)。だから彼はむしろ読んでいないが、消費しています。ただし、実装については後で詳しく説明します。
仕事の始まり。チャネル
チャネルの使用を開始するには、抽象Channel <T>クラスと、最も適切な実装を作成する静的Channelクラスから始めます。さらに、この共通チャネルから、チャネルに書き込むためのChannelWriterと、チャネルから消費するためのChannelReaderを取得できます。チャネルはChannelWriterおよびChannelReaderの一般情報のリポジトリであるため、チャネルに格納されるのはすべてのデータです。そして、すでにそれらの記録または消費のロジックはChannelWriterとChannelReaderに分散されています。最初のものは実装が簡単です。メモリが許す限り、制限なく書き込むことができます。後者は、レコード数の特定の最大値に制限されています。
ここで、非同期の性質が少し異なります。無制限のチャネルでは、記録操作は常に同期的に終了します。チャネルからの記録を停止する可能性のあるものはありません。限られたチャンネルでは状況が異なります。標準動作(置き換え可能)では、チャネル内に新しいインスタンスの余地がある限り、書き込み操作は同期的に終了します。チャネルがいっぱいになるとすぐに、(コンシューマがコンシュームを消費した後)スペースが解放されるまで、書き込み操作は終了しません。したがって、ここでの操作は、フローの変更および関連する変更とは本当に非同期になります(または、後で説明する変更なし)。
ほとんどの場合、リーダーの動作は同じです。チャネルに何かがある場合、リーダーは単にそれを読み取り、同期して終了します。何もない場合、彼は誰かが何かを書き留めることを期待します。
Channel静的クラスには、上記のチャネルを作成するための4つのメソッドが含まれています。
Channel<T> CreateUnbounded<T>();
Channel<T> CreateUnbounded<T>(UnboundedChannelOptions options);
Channel<T> CreateBounded<T>(int capacity);
Channel<T> CreateBounded<T>(BoundedChannelOptions options);
必要に応じて、チャネルを作成するためのより正確なオプションを指定できます。これにより、指定されたニーズに合わせてチャネルを最適化できます。
UnboundedChannelOptionsには、デフォルト値がfalseである3つのプロパティが含まれています。
- AllowSynchronousContinuations — , , . -. , . , , , . , , , . , - - , ;
- SingleReader — , . , ;
- SingleWriter — , ;
BoundedChannelOptionsには、同じ3つのプロパティと、さらに2つのプロパティが含まれています。
- AllowSynchronousContinuations-同じ;
- SingleReader-同じ;
- SingleWriterも同じです。
- 容量-チャネルに配置されたレコードの数。このパラメーターはコンストラクターパラメーターでもあります。
- FullMode-4つのオプションを持つBoundedChannelFullMode列挙体は、満たされたチャネルに書き込もうとするときの動作を決定します。
- 待機-空き領域が非同期操作を完了するまで待機します
- DropNewest-書き込まれるアイテムは、最新の既存のものを上書きし、同期的に終了します
- DropOldest-記録可能なアイテムは、既存のエンドの最も古いものを同期的に上書きします
- DropWrite-書き込まれるアイテムは書き込まれず、同期的に終了します
渡されたパラメーターと呼び出されたメソッドに応じて、SingleConsumerUnboundedChannel、UnboundedChannel、BoundedChannelの3つの実装のいずれかが作成されます。ただし、基本クラスChannel <TWrite、TRead>を通じてチャネルを使用するため、これはそれほど重要ではありません。
2つのプロパティがあります。
- ChannelReader <TRead>リーダー{get; 保護されたセット。}
- ChannelWriter <TWrite> Writer {get; 保護されたセット。}
また、ChannelReader <TRead>およびChannelWriter <TWrite>への暗黙的な型キャストの2つの演算子。
チャネルの使用開始の例:
Channel<int> channel = Channel.CreateUnbounded<int>();
//
ChannelWriter<int> writer = channel.Writer;
ChannelReader<int> reader = channel.Reader;
//
ChannelWriter<int> writer = channel;
ChannelReader<int> reader = channel;
データはキューに格納されます。3つのタイプの場合、3つの異なるキューが使用されます-ConcurrentQueue <T>、Deque <T>およびSingleProducerSingleConsumerQueue <T>。この時点で、私は時代遅れになり、新しい最も単純なコレクションの束を見逃したように思えました。しかし、私はがっかりすることを急いだ-彼らは皆のためではない。内部としてマークされているため、使用しても機能しません。しかし、製品で突然必要になった場合は、ここ(SingleProducerConsumerQueue)とここ(Deque)で見つけることができます。後者の実装は非常に簡単です。知り合うことをお勧めします。これは非常にすばやく学習できます。
それでは、ChannelReaderとChannelWriter、および興味深い実装の詳細を直接調べてみましょう。これらはすべて非同期に要約され、IValueTaskSourceを使用したメモリ割り当てはありません。
ChannelReader-コンシューマー
コンシューマーオブジェクトが要求されると、ChannelReader <T>抽象クラスの実装の1つが返されます。繰り返しますが、APIパイプラインとは異なり、それは単純であり、メソッドはほとんどありません。実際にそれを使用する方法を理解するには、メソッドのリストを知っている必要があります。
方法:
- 仮想get-onlyプロパティTask Completion {get;
チャネルが閉じたときに完了するTaskタイプのオブジェクト。 - 仮想の取得専用プロパティint Count {get; }
ここでは、読み取りのために使用可能なオブジェクトの現在の数が返されることに留意すべきです。 - 仮想の取得専用プロパティbool CanCount {get; }
Countプロパティが利用可能であるかどうかを示します。 - bool TryRead(out T item)
. bool, , . out ( null, ); - ValueTask<bool> WaitToReadAsync(CancellationToken cancellationToken = default)
ValueTask true, , . ValueTask false, ( ); - ValueTask<T> ReadAsync(CancellationToken cancellationToken = default)
. , . .
, TryRead WaitToReadAsync. ( cancelation tokens), — TryRead. , while(true) WaitToReadAsync. true, , TryRead. TryRead , , . — , WaitToReadAsync, , , .
, , - .
ChannelWriter-メーカー
すべてがコンシューマーに似ているので、すぐにメソッドを見てみましょう。
- 仮想メソッドbool TryComplete(Exception?Error = null)
チャネルに完了のマークを付けようとします。これ以上データが書き込まれないことを示します。チャネルの終了を引き起こした例外は、オプションのパラメーターとして渡すことができます。正常に完了した場合はtrueを返し、それ以外の場合はfalse(チャネルがすでに完了しているか、終了をサポートしていない場合)を返します。 - 抽象メソッドbool TryWrite(T item)
チャネルへの値の書き込みを試みます。成功した場合はtrue、失敗した場合はfalseを返します - 抽象メソッドValueTask <bool> WaitToWriteAsync(CancellationToken cancelToken = default)
値がtrueのValueTaskを返します。これは、チャネルに記録する場所があるときに終了します。チャネルへの書き込みが許可されなくなった場合、値はfalseになります。 - 仮想メソッドValueTask WriteAsync(Tアイテム、CancellationToken cancelToken =デフォルト)
非同期でチャネルに書き込みます。たとえば、チャネルがいっぱいの場合、操作は実際には非同期であり、このレコードのスペースを解放した後でのみ完了します。 - メソッドvoid Complete(Exception?Error = null)
チャネルをTryCompleteで完了としてマークしようとし、失敗した場合は例外をスローします。
上記の小さな例(簡単に独自の実験を開始するため):
Channel<int> unboundedChannel = Channel.CreateUnbounded<int>();
// ,
ChannelWriter<int> writer = unboundedChannel;
ChannelReader<int> reader = unboundedChannel;
//
int objectToWriteInChannel = 555;
await writer.WriteAsync(objectToWriteInChannel);
// , , ,
writer.Complete();
//
int valueFromChannel = await reader.ReadAsync();
次に、最も興味深い部分に移りましょう。
割り当てなしの非同期
コードを書いて検討する過程で、これらすべての操作の実装にはほとんど何も面白くないことに気づきました。一般的に、このように記述することができます。競合するコレクションを使用した不要なロックや、メモリを節約する構造であるValueTaskの豊富な使用を回避します。ただし、PC上のすべてのファイルを調べて、すべてのタスクをValueTaskに置き換えることは、すぐに置き換える価値がないことを忘れないでください。ほとんどの場合、操作が同期的に完了する場合にのみ意味があります。結局のところ、私たちが覚えているように、非同期ではスレッドを変更することはかなり可能です。つまり、スタックは以前と同じではなくなります。とにかく、生産性の分野の真の専門家は知っています-問題が発生する前に最適化しないでください。
良い点の1つは、自分を専門家として登録しないことです。そのため、メモリ割り当てなしで非同期コードを作成する秘訣は何なのかを理解するときです。しかし、それも起こります。
IValueTaskSourceインターフェイス
.netコア2.0で追加され、2.1で修正され たValueTask構造から始めましょう。この構造の内部では、オブジェクト_objの狡猾なフィールドが非表示になっています。自明の名前に基づいて、このフィールドで3つのものの1つを非表示にできることは簡単に推測できます-null、タスク/タスク<T>またはIValueTaskSource。実際、これはValueTaskの作成方法に由来しています。
メーカーが保証するように、この構造は明らかにawaitキーワードでのみ使用する必要があります。つまり、同じValueTaskに何度もawaitを適用したり、コンビネーターを使用したり、いくつかの継続を追加したりするべきではありません。また、ValueTaskから結果を2回以上取得しないでください。そして、これは私たちが理解しようとしているという事実によるものです-メモリを割り当てずにこれらすべてのものを再利用します。
IValueTaskSource インターフェイスについてはすでに説明しました。メモリを節約するのを助けるのは彼です。これは、多くのタスクでIValueTaskSource自体を数回再利用することによって行われます。しかし、まさにこの再利用のために、ValueTaskにふける方法はありません。
したがって、IValueTaskSource。このインターフェイスには3つのメソッドがあり、実装することで、これらの貴重なバイトの割り当てでメモリと時間を節約できます。
- GetResult-実行時に非同期メソッド用に形成された状態マシンが結果を必要とするときに1回呼び出されます。ValueTaskには、同じ名前のインターフェースメソッドを呼び出すGetResultメソッドがあり、これを思い出すと、_objフィールドに格納できます。
- GetStatus-オペレーションのステータスを決定するためにステートマシンによって呼び出されます。また、ValueTaskを介して。
- OnCompleted-再び、その時点で未解決のタスクに継続を追加するために状態マシンによって呼び出されます。
しかし、シンプルなインターフェースにもかかわらず、実装にはある程度のスキルが必要になります。そして、ここで私たちが何から始めたかについて覚えています- チャンネル。この実装はAsyncOperationクラスを使用しますこれはIValueTaskSourceの実装です。このクラスは、内部アクセス修飾子の後ろに隠されています。しかし、これは基本的なメカニズムを理解するために止まるわけではありません。これは疑問を投げかけます、なぜ大衆にIValueTaskSourceの実装を与えないのですか?最初の理由(おもしろい)は、ハンマーが手にあり、どこにでも釘があり、IValueTaskSource実装が手にある場合、どこにでもメモリを使った読み書きができない作業があります。 2番目の理由(もっともらしい)は、インターフェースがシンプルで用途が広い一方で、実際の実装はアプリケーションの特定のニュアンスを使用する場合に最適であることです。そして、おそらくこの理由から、チャネルのフードの下でのAsyncOperation、新しいソケットAPI内のAsyncIOOperationなど、偉大で強力な.netのさまざまな部分に実装を見つけることができます。
ただし、公平に言えば、まだ1つの一般的な実装があります。ManualResetValueTaskSourceCore。しかし、これはすでに記事のトピックから遠すぎます。
CompareExchange
従来の同期プリミティブのオーバーヘッドを回避する人気のあるクラスのかなり人気のあるメソッド。ほとんどの人はそれに慣れていると思いますが、この構成はAsyncOperationで非常に頻繁に使用されるため、3ワードで説明する価値があります。
主流の文献では、この機能は比較およびスワップ(CAS)と呼ばれています。.netでは、Interlockedクラスで使用できます。
署名は次のとおりです。
public static T CompareExchange<T>(ref T location1, T value, T comparand) where T : class;
int、long、float、double、IntPtr、objectのオーバーロードもあります。
メソッド自体はアトミックです。つまり、中断されることなく実行されます。2つの値を比較し、それらが等しい場合、変数への新しい値の割り当てを実行します。変数の値をチェックし、それに応じて変数を変更する必要がある場合、問題を解決します。
変数の値が10未満の場合に変数をインクリメントするとします。
次に2つのスレッドがあります。
ストリーム1 | ストリーム2 |
---|---|
トリガーされた条件(つまり、10未満)の変数の値をチェックします | - |
値の確認と変更の間 | 条件を満たさない値を変数に割り当てます(例:15) |
条件が満たされなくなったため、変更する必要はありませんが、値を変更します | - |
このメソッドを使用するときは、変数の実際の値を取得しながら、希望する値を正確に変更するか、変更しないでください。
location1は、値を変更する変数です。比較対象と比較され、等しい場合、値はlocation1に書き込まれます。操作が成功した場合、メソッドはlocation1変数の過去の値を返します。そうでない場合は、location1の現在の値が返されます。
より深く、これを行うアセンブリ言語命令cmpxchgがあります。ボンネットの下で使用されているのは彼女です。
スタックダイブ
このすべてのコードを考慮して、「Stack Dive」への言及をいくつか見つけました。これはとてもクールで興味深いもので、実際には非常に望ましくありません。要するに、継続の同期実行ではスタックリソースが不足する可能性があるということです。
10,000のタスクがスタイルされているとしましょう
//code1
await ...
//code2
最初のタスクが実行を完了し、それによって2番目のタスクの継続を解放するとします。これは、このスレッドで同期的に実行を開始します。つまり、この継続のフレームでスタックの一部を取得します。次に、この継続により、3番目のタスクの継続がブロック解除されます。これも、すぐに実行を開始します。等。継続またはスタックをフラッシュする何かでこれ以上待機がない場合は、スタックスペースを完全に消費します。StackOverflowとアプリケーションがクラッシュする原因は何ですか。私のコードレビューでは、AsyncOperationがこれに対してどのように戦うかについて触れます。
IValueTaskSource実装としてのAsyncOperation
ソースコード。
AsyncOperation内には、タイプAction <object>の_continuationフィールドがあります。フィールドは継続のために使用され、信じられないかもしれません。しかし、コードが現代的すぎる場合によくあるように、フィールドには追加の責任があります(ガベージコレクターやメソッドテーブルリンクの最後のビットなど)。同じシリーズのフィールド_continuation。継続自体とnullを除いて、このフィールドに格納できる2つの特別な値があります。s_availableSentinelおよびs_completedSentinel。これらのフィールドは、それぞれ操作が使用可能で完了したことを示しています。完全に非同期の操作で再利用するためだけにアクセスできます。
また、AsyncOperationはIThreadPoolWorkItemを実装します単一のメソッド-void Execute()=> SetCompletionAndInvokeContinuation()。 SetCompletionAndInvokeContinuationメソッドは、継続を行うためのものです。このメソッドは、AsyncOperationコードで直接呼び出されるか、前述のExecuteを通じて呼び出されます。結局のところ、IThreadPoolWorkItemを実装する型は、このThreadPool.UnsafeQueueUserWorkItem(this、preferLocal:false)のようにスレッドプールにスローできます。
Executeメソッドはスレッドプールによって実行されます。
継続の実行自体は非常に簡単です。
継続がローカル変数にコピーされ、その場所にs_completedSentinelが書き込まれます-タスクが完了したことを示す、人工の人形オブジェクト(または歩哨、私はスピーチで私にどう言うかわかりません)。そして、実際の継続のローカルコピーが単に実行されます。 ExecutionContextでは、これらのアクションがコンテキストに投稿されます。ここには秘密はありません。このコードは、クラスから直接呼び出すことができます-これらのアクションをカプセル化するメソッドを呼び出すだけで、またはスレッドプールのIThreadPoolWorkItemインターフェイスを介して呼び出すことができます。これで、継続実行を伴う関数が同期してどのように機能するかを推測できます。
IValueTaskSourceインターフェイスの最初のメソッドはGetResult(github)です。
それは簡単です、彼:
- _currentId.
_currentId — , . . ; - _continuation - s_availableSentinel. , , AsyncOperation . , (pooled = true);
- _result.
_result TrySetResult .
TrySetResultメソッド(github)。
この方法は簡単です。 -受け入れたパラメータを_resultに格納し、完了を通知します。つまり、非常に興味深いSignalCompleteionメソッドを呼び出します。
SignalCompletionメソッド(github)。
この方法では、最初に説明したすべてのものを使用します。
最初は、_continuation == nullの場合、s_completedSentinelパペットを記述します。
さらに、この方法は4つのブロックに分割できます。回路をわかりやすくするためにすぐに説明します。ブロック4は継続の同期実行にすぎません。つまり、IThreadPoolWorkItemの段落で説明したように、メソッドによる継続の簡単な実行です。
- _schedulingContext == null, .. ( if).
_runContinuationsAsynchronously == true, , — ( if).
IThreadPoolWorkItem . AsyncOperation . .
, if ( , ), , 2 3 , — .. 4 ; - _schedulingContext is SynchronizationContext, ( if).
_runContinuationsAsynchronously = true. . , , . , . 2 , :
sc.Post(s => ((AsyncOperation<TResult>)s).SetCompletionAndInvokeContinuation(), this);
. , , ( , ), 4 — ; - , 2 . .
, _schedulingContext TaskScheduler, . , 2, .. _runContinuationsAsynchronously = true TaskScheduler . , Task.Factory.StartNew . . - — . , .
IValueTaskSourceインターフェイスの2番目のメソッドは、 サンクトペテルブルクのロバのようにGetStatus(github)
です。
_continuation!= _CompletedSentinel、その後、ValueTaskSourceStatus.Pending戻ると
もしエラー== nullのが、その後、ValueTaskSourceStatus.Succeeded返し
た場合_error.SourceExceptionはOperationCanceledExceptionあり、その後、ValueTaskSourceStatus.Canceled返す
くらいはここまで来ているので、まあが、リターンはValueTaskSourceStatus.Faulted
3番目と最後のですが、IValueTaskSourceインターフェイスの最も複雑なメソッドはOnCompleted(github)です。
このメソッドは、完了時に実行される継続を追加します。
必要に応じて、ExecutionContextおよびSynchronizationContextをキャプチャします。
次に、上記のInterlocked.CompareExchangeを使用して、継続をフィールドに格納し、nullと比較します。 CompareExchangeが変数の現在の値を返すことを思い出します。
継続の保存が成功した場合、更新前に変数にあった値、つまりnullが返されます。これは、継続が書き込まれたときに操作がまだ完了していないことを意味します。そして、それを自分で完成させる人はそれを理解します(上で見たように)。また、追加のアクションを実行する意味がありません。これでメソッドの作業は完了です。
値が保存されなかった場合、つまり、null以外の何かがCompareExchangeから返されました。この場合、誰かがなんとかして私たちよりも早く値を入れました。つまり、2つの状況のうちの1つが発生しました。タスクがここまでに到達したよりも早く完了したか、1つ以上の継続を記録しようとしましたが、これを行うべきではありません。
したがって、戻り値をチェックして、それがs_completedSentinelと等しいかどうかを確認します。これは、完了時に書き込まれるものとまったく同じです。
- これがs_completedSentinelでない場合は、計画どおりに使用されなかったため、複数の継続を追加しようとしました。つまり、すでに書き留められているものと、私たちが書いているものです。そして、これは例外的な状況です。
- s_completedSentinel, , , . , _runContinuationsAsynchronously = false.
, , OnCompleted, awaiter'. . , AsyncOperation — System.Threading.Channels. , . , . , , ( ) . , awaiter' , , . awaiter'.
この状況を回避するには、継続を非同期で実行する必要があります。これは、SignalCompleteionメソッドの最初の3つのブロックと同じスキームに従って実行されます-単にプール内、コンテキスト内、またはファクトリーとスケジューラーを介して
同期続編の例を次に示します。
class Program
{
static async Task Main(string[] args)
{
Channel<int> unboundedChannel = Channel.CreateUnbounded<int>(new UnboundedChannelOptions
{
AllowSynchronousContinuations = true
});
ChannelWriter<int> writer = unboundedChannel;
ChannelReader<int> reader = unboundedChannel;
Console.WriteLine($"Main, before await. Thread id: {Thread.CurrentThread.ManagedThreadId}");
var writerTask = Task.Run(async () =>
{
Thread.Sleep(500);
int objectToWriteInChannel = 555;
Console.WriteLine($"Created thread for writing with delay, before await write. Thread id: {Thread.CurrentThread.ManagedThreadId}");
await writer.WriteAsync(objectToWriteInChannel);
Console.WriteLine($"Created thread for writing with delay, after await write. Thread id: {Thread.CurrentThread.ManagedThreadId}");
});
//Blocked here because there are no items in channel
int valueFromChannel = await reader.ReadAsync();
Console.WriteLine($"Main, after await (will be processed by created thread for writing). Thread id: {Thread.CurrentThread.ManagedThreadId}");
await writerTask;
Console.Read();
}
}
出力:
メイン、待機する前。スレッドID:1
書き込み待ちの前に、遅延を伴う書き込み用に作成されたスレッド。スレッドID:4
メイン、待機後(作成されたスレッドによって書き込み用に処理されます)。スレッドID:4
書き込み待機後、遅延を伴う書き込み用に作成されたスレッド。スレッドID:4