Atomics.wait()、Atomics.notify()、およびAtomics.waitAsync()の使用

静的メソッドAtomics.wait()およびAtomics.notify()は、ミューテックスおよび他の同様のメカニズムを実装するために使用できる低レベルの同期プリミティブです。ただし、メソッドAtomics.wait()がブロックしているため、メインスレッドで呼び出すことはできません(これを実行しようとすると、エラーがスローされますTypeError)。



バージョン8.7以降のV8エンジンは、Atomics.waitAsync()Atomics.wait()と呼ばれる非ブロッキングオプションをサポートしていますこの新しいメソッドは、メインスレッドで使用できます。 今日は、これらの低レベルAPIを使用して、同期(ワーカースレッド上)と非同期(ワーカースレッドまたはメインスレッド上)の両方で実行できるミューテックスを作成する方法を示します。











Atomics.wait()およびAtomics.waitAsync()



メソッドAtomics.wait()Atomics.waitAsync()次のパラメータ取ります。



  • buffer:に基づくタイプInt32Arrayまたはの配列BigInt64ArraySharedArrayBuffer
  • index:配列内の要素の実際のインデックス。
  • expectedValuebufferおよびで記述された場所で、メモリ内で表されると予想される値index
  • timeout:ミリ秒単位のタイムアウト(オプション、デフォルトはInfinity)。


Atomics.wait()文字列を返します。指定されたメモリ位置に期待値が見つからない場合は、Atomics.wait()すぐに終了し、文字列を返しますnot-equalそれ以外の場合、スレッドはブロックされます。ロックを解除するには、次のいずれかのイベントが発生する必要があります。1つ目は、メソッドの別のスレッドからの呼び出しAtomics.notify()であり、メソッドが関心を持っているメモリ内の場所を示しAtomics.wait()ます。2つ目は、タイムアウトの有効期限です。前者の場合Atomics.wait()は文字列返し、後者の場合okは文字列値返しますtimed-out



このメソッドAtomics.notify()は次のパラメーターを取ります。



  • typedArray:に基づくタイプInt32Arrayまたはの配列BigInt64ArraySharedArrayBuffer
  • index:配列内の要素の実際のインデックス。
  • count:通知を待機しているエージェントの数(オプションのパラメーターInfinityデフォルトで設定されています)。


方法のAtomics.notify()通知は、アドレスの通知を待っているエージェントの指定された数は、説明typedArray及びindexFIFO順でそれらをバイパスします。複数の呼び出しが行われたAtomics.wait()場合、またはAtomics.waitAsync()メモリ内の同じ場所を監視している場合、それらはすべて同じキューに入れられます。



メソッドとは異なりAtomics.wait()、この方法は、Atomics.waitAsync()すぐにそれが呼び出された値を返します。次のいずれかの値になります。



  • { async: false, value: 'not-equal' } -指定されたメモリ位置に期待値が含まれていない場合。
  • { async: false, value: 'timed-out' } -タイムアウトが0に設定されている場合のみ。
  • { async: true, value: promise } -その他の場合。


しばらくすると、promiseは文字列値によって正常に解決できますok(メソッドが呼び出された場合、Atomics.notify()渡されたメモリ内の場所に関する情報が渡されますAtomics.waitAsync())。値で解決できますtimed-out。この約束は決して拒否されません。



次の例は、使用の基本を示していますAtomics.waitAsync()



const sab = new SharedArrayBuffer(16);
const i32a = new Int32Array(sab);
const result = Atomics.waitAsync(i32a, 0, 0, 1000);
//                                     |  |  ^ - ()
//                                     |  ^  
//                                     ^ 

if (result.value === 'not-equal') {
  //   SharedArrayBuffer   .
} else {
  result.value instanceof Promise; // true
  result.value.then(
    (value) => {
      if (value == 'ok') { /*   */ }
      else { /*  - */ }
    });
}

//      :
Atomics.notify(i32a, 0);


次に、同期モードと非同期モードの両方で使用できるミューテックスを作成する方法について説明します。ミューテックスの同期バージョンの実装については前に説明したことに注意してください。たとえば、この資料では。



この例では、andをtimeout呼び出すときにパラメーターを使用しませんこのパラメーターは、タイムアウト関連の条件を実装するために使用できます。ミューテックスを表す クラスはバッファで動作し、次のメソッドを実装します。Atomics.wait()Atomics.waitAsync()



AsyncLockSharedArrayBuffer



  • lock():ミューテックスをキャプチャする機会が得られるまでスレッドをブロックします(ワーカースレッドにのみ適用可能)。
  • unlock():ミューテックスを解放します(これは反対ですlock())。
  • executeLocked(callback):スレッドをブロックせずにロックを取得しようとします。このメソッドは、メインスレッドで使用できます。ロックを取得できるときにコールバックを実行する予定です。


これらのメソッドを実装する方法を見てみましょう。クラス宣言には、定数と、バッファーを受け取るコンストラクターが含まれていますSharedArrayBuffer



class AsyncLock {
  static INDEX = 0;
  static UNLOCKED = 0;
  static LOCKED = 1;

  constructor(sab) {
    this.sab = sab;
    this.i32a = new Int32Array(sab);
  }

  lock() {
    /* … */
  }

  unlock() {
    /* … */
  }

  executeLocked(f) {
    /* … */
  }
}


ここで、要素i32a[0]には値LOCKEDまたはが含まれていますUNLOCKED彼は、ほかに、利害そのメモリ内の場所を表すAtomics.wait()としますAtomics.waitAsync()このクラスAsyncLockは、次の基本機能を提供します。



  1. i32a[0] == LOCKEDスレッドは(Atomics.wait()またはが呼び出された後)待機状態にありAtomics.waitAsync()、監視i32a[0]していると、最終的に通知されます。
  2. スレッドは通知を受けた後、ロックを取得しようとします。成功した場合は、ロックを解除すると、を呼び出しますAtomics.notify()


同期ロックのキャプチャと解放



lock()ワーカースレッドからのみ呼び出すことができる メソッドのコードについて考えてみます。



lock() {
  while (true) {
    const oldValue = Atomics.compareExchange(this.i32a, AsyncLock.INDEX,
                        /*   >>> */  AsyncLock.UNLOCKED,
                        /*   >>> */  AsyncLock.LOCKED);
    if (oldValue == AsyncLock.UNLOCKED) {
      return;
    }
    Atomics.wait(this.i32a, AsyncLock.INDEX,
                 AsyncLock.LOCKED); // <<< ,    
  }
}


メソッドがスレッドから呼び出されると、lock()最初にロックを取得しようとし、それを使用してロックAtomics.compareExchange()の状態をからに変更UNLOCKEDLOCKEDます。このメソッドAtomics.compareExchange()は、ロック状態を変更するアトミック操作を実行しようとし、指定されたメモリ領域にある元の値を返します。元の値がであった場合、UNLOCKED状態の変更が成功し、スレッドがロックを取得したことがわかります。他に何もする必要はありません。ロックの状態を変更できなかった



場合Atomics.compareExchange()は、別のスレッドがロックを保持していることを意味します。その結果、メソッドが呼び出されたスレッドはメソッドlock()を使用しようとしますAtomics.wait()別のスレッドによってロックが解放されるまで待機するため。期待される値がまだ関心のあるメモリ領域(この場合は- AsyncLock.LOCKED)に保存されている場合、呼び出しはAtomics.wait()スレッドブロックします。からの戻りはAtomics.wait()、別のスレッドがを呼び出したときにのみ発生しますAtomics.notify()



このメソッドunlock()は、ロックを状態に設定してロックを解放し、ロックが解放されるのを待機しているエージェントに通知するためにロックをUNLOCKED呼び出しますAtomics.notify()。ロック状態変更操作は常に成功すると想定されます。これは、この操作を実行しているスレッドがロックを保持しているためです。したがって、現時点では他に何もメソッドを呼び出すべきではありませんunlock()



unlock() {
  const oldValue = Atomics.compareExchange(this.i32a, AsyncLock.INDEX,
                      /*   >>> */  AsyncLock.LOCKED,
                      /*   >>> */  AsyncLock.UNLOCKED);
  if (oldValue != AsyncLock.LOCKED) {
    throw new Error('Tried to unlock while not holding the mutex');
  }
  Atomics.notify(this.i32a, AsyncLock.INDEX, 1);
}


通常、すべてが次のように発生します。ロックは解放され、スレッドT1がロックをキャプチャし、を使用して状態を変更しAtomics.compareExchange()ます。スレッドT2は、を呼び出してロックを取得しようとしますが、Atomics.compareExchange()その状態を変更することはできません。次に、T2が呼び出しますAtomics.wait()。この呼び出しは、スレッドをブロックします。しばらくすると、スレッドT1がロックを解除してを呼び出しますAtomics.notify()。これにより、Atomics.wait()T2への呼び出しが戻りok、スレッドT2がロックを終了します。次に、T2はロックの取得を再試行します。今回は成功します。



ここには2つの特別なケースがあります。彼らの分析の目的は、理由を証明するためAtomics.wait()、およびAtomics.waitAsync()配列要素の指定したインデックスにある特定の値をチェックします。これらは次の場合です。



  • T1 , T2 . T2 , Atomics.compareExchange(), . T1 , T2 Atomics.wait(). T2 Atomics.wait(), not-equal. T2 .
  • T1 , T2 Atomics.wait() . T1 , T2 ( Atomics.wait()) Atomics.compareExchange() . , T3, . . Atomics.compareExchange() T2 . T2 Atomics.wait() , T3 .


最後の特別なケースは、ミューテックスが正しく機能していないという事実を示しています。スレッドT2がロックが解放されるのを待っていたのに、T3はロックが解放された直後にロックを取得できた可能性があります。実際の使用により適したロックの実装では、存在するいくつかのロック状態を使用して、ロックが単に「取得」された状況と「取得中に競合が発生した」状況を区別できます。



非同期ロックキャプチャ



ノンブロッキングメソッドexecuteLocked()は、メソッドとは異なりlock()、メインスレッドから呼び出すことができます。唯一のパラメーターとしてコールバックを受信し、ロックが正常に取得された後にコールバックをスケジュールします。



executeLocked(f) {
  const self = this;

  async function tryGetLock() {
    while (true) {
      const oldValue = Atomics.compareExchange(self.i32a, AsyncLock.INDEX,
                          /*   >>> */  AsyncLock.UNLOCKED,
                          /*   >>> */  AsyncLock.LOCKED);
      if (oldValue == AsyncLock.UNLOCKED) {
        f();
        self.unlock();
        return;
      }
      const result = Atomics.waitAsync(self.i32a, AsyncLock.INDEX,
                                       AsyncLock.LOCKED);
                                   //  ^ ,    
      await result.value;
    }
  }

  tryGetLock();
}


内部関数は、tryGetLock()最初にAtomics.compareExchange()。を使用してロックを取得しようとします。このメソッドを呼び出してロック状態の変更が成功した場合、関数はコールバックを呼び出してから、ロックを解除して終了できます。



呼び出しでAtomics.compareExchange()ロックの取得が許可されなかった場合は、ロックがおそらく解放される時点で、再度取得を試みる必要があります。ただし、スレッドをブロックしてロックが解除されるのを待つことはできません。代わりに、メソッドAtomics.waitAsync()とそれが返すpromiseを使用しロックを取得する新しい試行をスケジュールしています。



メソッドの実行に成功した場合Atomics.waitAsync()、このメソッドによって返されるpromiseは、ロックを保持しているスレッドが呼び出したときに解決されます。Atomics.notify()..。その後、ロックを取得したいスレッドは、以前と同様に、再度ロックを取得しようとします。



ここでは、同期バージョンに典型的な特殊なケースが考えられます(ロックは呼び出しAtomics.compareExchange()呼び出しの間で解放されますAtomics.waitAsync()。ロックは別のスレッドによってキャプチャされ、promiseの解決と呼び出しの間にこれを行いますAtomics.compareExchange())。したがって、実際のプロジェクトに適用できる同様のコードでは、これを考慮に入れる必要があります。



結果



本稿では、低レベルの同期プリミティブについて話しましたAtomics.wait()Atomics.waitAsync()Atomics.notify()それらに基づいてミューテックスを作成する例を分析しました。これは、メインスレッドとワーカースレッドの両方で使用できます。



Atomics.wait()、Atomics.waitAsync()、およびAtomics.notify()はプロジェクトで役立ちますか?



All Articles