ジェットストリームの背圧の解剖学

リアクティブストリームのトピックに関する多数の記事を読むことにより、読者は次のような結論に達する可能性があります。



  • 背圧は涼しい
  • バックプレッシャは、リアクティブストリーム仕様を実装するライブラリでのみ使用できます
  • この仕様は非常に複雑なので、自分で実装しようとしないでください。


この記事では、次のことを示します。



  • 背圧は非常に簡単です
  • 非同期バックプレッシャーを実装するには、セマフォの非同期バージョンを作成するだけで十分です。
  • 非同期セマフォ実装がある場合、org.reactivestreams.Publisherインターフェイス数十行のコードで実装されます


バックプレッシャは、データプロデューサーの速度をコンシューマーの速度に一致するように調整するフィードバックです。このような接続がない場合、より高速なプロデューサーがコンシューマーのバッファーをオーバーフローするか、バッファーが無次元の場合はすべてのRAMを使い果たす可能性があります。



マルチスレッドプログラミングでは、この問題は、新しい同期メカニズムであるセマフォを提案したDijkstroyによって解決されました。セマフォは許可カウンターと考えることができます。プロデューサーは、リソースを大量に消費するアクションを実行する前に、セマフォに許可を要求すると想定されます。セマフォが空の場合、プロデューサスレッドはブロックされます。



非同期プログラムはスレッドをブロックできないため、許可を得るために空のセマフォにアクセスすることはできません(ただし、他のすべてのセマフォ操作を実行することはできます)。別の方法で実行をブロックする必要があります。この別の方法は、実行していたワーカースレッドをそのままにしておくことですが、その前に、セマフォがいっぱいになるとすぐに仕事に戻るように手配します。



非同期プログラムを一時停止および再開する最も洗練された方法はポートを備えデータフローアクターとして構造化することです 。データフローモデル-ポートを備えたアクター、ポート間の有向接続、および初期トークン。取得元:データフローアクターとそのアプリケーションの構造化された説明











入力ポートと出力ポートがあります。入力ポートは、他のアクターの出力ポートからトークン(メッセージとシグナル)を受け取ります。入力ポートにトークンが含まれていて、出力ポートにトークンを配置する場所がある場合、アクティブであると見なされます。アクターのすべてのポートがアクティブな場合、実行のために送信されます。したがって、作業を再開するときに、アクタープログラムは入力ポートからトークンを安全に読み取り、週末に書き込むことができます。非同期プログラミングのすべての知恵は、この単純なメカニズムにあります。ポートを個別のアクターサブオブジェクトとして割り当てると、非同期プログラムのコーディングが大幅に簡素化され、異なるタイプのポートを組み合わせることで、その多様性を高めることができます。



従来のヒューイットアクターには2つのポートがあります。1つは受信メッセージ用のバッファーを備えた表示、もう1つは非表示のバイナリで、アクターが実行のために送信されるときにブロックするため、最初の起動が終了するまでアクターが再起動しません。必要な非同期セマフォは、これら2つのポート間のクロスです。メッセージバッファのように、多くのトークンを格納できます。隠しポートのように、これらのトークンは黒です。つまり、ペトリネットのように区別できず、トークンカウンタで十分に格納できます。



階層の最初のレベルには、AbstractActorベースクラスPortとデリバティブクラスの3つのネストされたクラスAsyncSemaPortInPort、ブロックされたポートがない場合に実行するアクターを起動するメカニズムを備えたクラスがあります。要するに、それはこのように見えます:



public abstract class AbstractActor {
    /**    */
    private int blocked = 0;

    protected synchronized void restart() {
            controlPort.unBlock();
    }

    private synchronized void incBlockCount() {
        blocked++;
    }

    private synchronized void decBlockCount() {
        blocked--;
        if (blocked == 0) {
            controlPort.block();
            excecutor.execute(this::run);
        }
    }

    protected abstract void turn() throws Throwable;

    /**   */
    private void run() {
        try {
            turn();
            restart();
        } catch (Throwable throwable) {
            whenError(throwable);
        }
    }
}


これには、最小限のポートクラスのセットが含まれています。-



Portすべてのポートの基本クラス



    protected  class Port {
        private boolean isBlocked = true;

        public Port() {
            incBlockCount();
        }

        protected synchronized void block() {
            if (isBlocked) {
                return;
            }
            isBlocked = true;
            incBlockCount();
        }

        protected synchronized void unBlock() {
            if (!isBlocked) {
                return;
            }
            isBlocked = false;
            decBlockCount();
        }
    }


非同期セマフォ:



    public class AsyncSemaPort extends Port {
        private long permissions = 0;

        public synchronized void release(long n) {
            permissions += n;
            if (permissions > 0) {
                unBlock();
            }
        }

        public synchronized void aquire(long delta) {
            permissions -= delta;
            if (permissions <= 0) { 
                //    
                //        ,
                //       
                block();
            }
        }
    }


InPort -1つの着信メッセージの最小バッファー:



    public class InPort<T> extends Port implements OutMessagePort<T> {
        private T item;

        @Override
        public void onNext(T item) {
            this.item = item;
            unBlock();
        }

        public synchronized T poll() {
            T res = item;
            item = null;
            return res;
        }
    }


クラスの完全版はここAbstractActorで見ることができます。



階層の次のレベルには、特定のポートを持つ3つの抽象アクターがありますが、処理ルーチンは定義されていません。



  • クラスAbstractProducerは、非同期セマフォタイプの1つのポート(およびデフォルトですべてのアクターに存在する内部制御ポート)を持つアクターです。
  • クラスAbstractTransformerは通常のヒューイットアクターであり、チェーン内の次のアクターの入力ポートへの参照があり、変換されたトークンを送信します。
  • クラスAbstractConsumerも通常のアクターですが、プロデューサーセマフォへのリンクがある間は、変換されたトークンをどこにも送信せず、入力トークンを吸収した後にこのセマフォを開きます。これにより、処理中のトークンの数が一定に保たれ、バッファオーバーフローは発生しません。


最後のレベルでは、すでにテストディレクトリにあり、テストで使用される特定のアクター定義されています。



  • クラスProducerActorは整数の有限ストリームを生成します。
  • クラスTransformerActorはストリームから次の番号を取得し、チェーンに沿ってさらに送信します。
  • クラスConsumerActor-結果の数値を受け入れて出力します


これで、次のように非同期の並列作業ハンドラーのチェーンを構築できます。プロデューサー-任意の数のトランスフォーマー-コンシューマー







したがって、バックプレッシャーを実装しました。リアクティブストリーム仕様よりも一般的な形式でも、フィードバックは任意の数の処理カスケードにまたがることができます。仕様のように、隣接するもののみ。



仕様を実装するには、request()メソッドを使用して渡されるアクセス許可の数に依存する出力ポートを定義する必要があります-これはになりPublisher、既存のポートInPortをこのメソッドの呼び出しで補足します-これはになりますSubscriber。つまり、インターフェイスPublisherSubscriberアクターではなく、ポートの動作を説明します。しかし、インターフェースのリストにはProcessor、ポートインターフェースにはなり得ないものもあるという事実から判断すると、仕様の作成者は、インターフェースをアクターインターフェースと見なしています。インターフェイス関数の実行を対応するポートに委任することで、これらすべてのインターフェイスを実装するアクターを作成できます。



簡単にするために、Publisher独自のバッファを持たず、バッファに直接書き込みSubscriberます。これを行うには、Subscriberサブスクライブして満たす誰かが必要ですrequest()。つまり、2つの条件があり、したがって、2つのポートが必要です-InPort<Subscriber>AsyncSemaPortそれらのどれも実装のベースとして適していませんPublisher'a、不要なメソッドが含まれているため、これらのポートを内部変数にします。



public class ReactiveOutPort<T> implements Publisher<T>, Subscription, OutMessagePort<T> {
    protected AbstractActor.InPort<Subscriber<? super T>> subscriber;
    protected AbstractActor.AsyncSemaPort sema;

    public ReactiveOutPort(AbstractActor actor) {
        subscriber = actor.new InPort<>();
        sema = actor.new AsyncSemaPort();
    }
}


今回は、クラスReactiveOutPortをネストされたものとして定義しなかったため、ネストされたクラスとして定義されたポートをインスタンス化するには、コンストラクターパラメーター(囲んでいるアクターへの参照)が必要でした。



この方法subscribe(Subscriber subscriber)は、サブスクライバーを保存して呼び出すことになりsubscriber.onSubscribe()ます。



    public synchronized void subscribe(Subscriber<? super T> subscriber) {
        if (subscriber == null) {
            throw new NullPointerException();
        }
        if (this.subscriber.isFull()) {
            subscriber.onError(new IllegalStateException());
            return;
        }
        this.subscriber.onNext(subscriber);
        subscriber.onSubscribe(this);
    }


これは通常、呼び出しPublisher.request()でセマフォを上げることに要約される呼び出しになりますAsyncSemaPort.release()



    public synchronized void request(long n) {
        if (subscriber.isEmpty()) {
            return; // this spec requirement
        }
        if (n <= 0) {
            subscriber.current().onError(new IllegalArgumentException());
            return;
        }
        sema.release(n);
    }


そして今AsyncSemaPort.aquire()、リソースの使用時に呼び出し使用してセマフォを下げることを忘れないでください



    public synchronized void onNext(T item) {
        Subscriber<? super T> subscriber = this.subscriber.current();
        if (subscriber == null) {
            throw  new IllegalStateException();
        }
        sema.aquire();
        subscriber.onNext(item);
    }


AsyncSemaphore プロジェクトは、この記事のために特別に設計されました。読者を疲れさせないように、意図的に可能な限りコンパクトにしています。その結果、重大な制限があります。



  • Publisher' Subscriber'
  • Subscriber' 1


さらに、AsyncSemaPortこれは同期セマフォの完全な類似物ではありません。1つのクライアントのみが操作aquire()y AsyncSemaPort(囲んでいるアクターを意味します)を実行できます。しかし、これは不利な点ではありません-AsyncSemaPortそれはその役割を十分に果たします。原則として、あなたは違っそれを行うことができます-取るjava.util.concurrent.Semaphoreと、非同期サブスクリプション・インターフェース(参照とそれを補完AsyncSemaphore.javaをからDF4Jのプロジェクト)。このようなセマフォは、実行のアクターとスレッドを任意の順序でバインドできます。



一般に、各タイプの同期(ブロッキング)相互作用には、独自の非同期(非ブロッキング)対応物があります。したがって、同じDF4Jプロジェクトに実装がありますBlockingQueue、非同期インターフェイスによって補完されます。これにより、マルチスレッドプログラムを非同期プログラムに段階的に変換し、スレッドをアクターに部分的に置き換える可能性が広がります。



All Articles