JavaからAzureServiceBusを使用する

こんにちは同僚!たまたま、アプリケーションはjavaスタックで記述されていますが、Azureでホストされています。そして、クラウドプロバイダーの管理サービスを最大限に活用しようとしています。



その1つがAzureService Busです。今日は、通常のSpringBootアプリケーションで使用する機能について説明します。



レーキ機能 について読みたい場合は、記事の最後までスクロールしてください



Azure ServiceBusとは



Azure Service Busについて少し説明すると、クラウドメッセージブローカー(RabbitMQ、ActiveMQのクラウド代替)です。キュー(メッセージが1人の受信者に配信される)とトピック(公開/サブスクライブメカニズム)を サポートします-詳細はこちら



サポートが宣言されています:



  1. 順序付けられたメッセージ-ドキュメントにはこれはFIFOであると記載されていますが、メッセージセッションの概念を使用して実装されています-キュー全体ではなく、メッセージのグループです。メッセージの順序を保証する必要がある場合は、メッセージを1つのグループに結合すると、グループ内のメッセージがFIFOとして配信されます。したがって、Azure Service Bus QueueはFIFOではなく、メッセージを適切なランダムに配信します
  2. デッドレターキュー-ここではすべてが単純で、N回の試行または一定期間後にメッセージを正常に配信できませんでした-DLQに移動しました
  3. スケジュールされた配信-配信前に遅延を設定できます
  4. メッセージの延期-キュー内のメッセージを非表示にします。メッセージは自動的に配信されませんが、IDで取得できます。このIDをどこかに保存する必要があります


Azure ServiceBusと統合する方法



Azure ServiceBusはAMQP1.0をサポートしているため、RabbitMQクライアントとの互換性はありません。bunnyはAMQP0.9.1を使用し



ますサービスバスで動作できる唯一の「標準」クライアントはApacheQpidです。



SpringBootアプリケーションをServiceBusとペアリングする方法は3つあります。



  1. JMS + QPID — , — QPID — .

    timeout producer — — factory.setCacheProducers(false);
  2. Spring Cloud — Azure Service Bus — , . Service Bus

    ( 1.2.6) — , azure service bus java sdk.



    Spring Integration — , «Scheduled delivery» «Message deferral» .



    sdk, MessageAndSessionPump

  3. azure service bus java sdk — ,


Spring Cloud — Azure Service Bus



この方法について詳しく説明し、

サンプルアプリケーションを使用する機能について説明します。公式リポジトリにあるため、コードを複製しても意味がありません例のあるリポジトリはここにあります



なぜなら それはSpringIntegration Messagingであり、すべてChannel、MessageHandler、MessagingGateway、ServiceActivatorに帰着します。



そして、ServiceBusQueueTemplateがあります。



メッセージの送信



送信したいメッセージを書き込むチャネルが 必要です。もう一方の端には、サービスバスにメッセージを送信するMessageHandlerがあります。MessagHandlerがあるcom.microsoft.azure.spring.integration.core.DefaultMessageHandler -これは、外部サービスへのコネクタです。 それをチャネルにバインドする方法は? -注釈を追加します- @ ServiceActivator(inputChannel = OUTPUT_CHANNEL)これで、MessagHandlerはOUTPUT_CHANNELチャネルをリッスンします 次に、何らかの方法でチャネルにメッセージ書き込む必要があります。ここでも春の魔法です。MessagingGatewayをアナウンスし、名前でチャネルにバインドします。















例の 抜粋



@MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
public interface QueueOutboundGateway {
    void send(String text);
}


以上です:ゲートウェイ->チャネル-> MessagHandler- > ServiceBusQueueTemplate- > ServiceBusMessageConverter



コードでは、ゲートウェイを挿入しsendメソッドを呼び出す必要がありますコールチェーンでServiceBusMessageConverter



について言及したのには、理由があります。カスタムヘッダー(CORRELATION_IDなど)をメッセージに追加する場合、これは、org.springframework.messaging.MessageHeadersから紺碧のメッセージに移動する必要がある場所です。

特別なメソッドsetCustomHeaders



この場合、ゲートウェイは次のようになります。



@MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
public interface QueueOutboundGateway {
    void send(@Payload String text, @Header("CORRELATION_ID") String correlationId);
}


メッセージの受信



わかりました、メッセージを送信する方法、今すぐ取得する方法を知っていますか?



ここではすべてが同じである-のMessageProducerは- >チャネル- >ハンドラ



ザ・のMessageProducerがあるcom.microsoft.azure.spring.integration.servicebus.inbound.ServiceBusQueueInboundChannelAdapter -これは、外部サービスへの私たちのコネクタです。内部では、ServiceBusMessageConverter同じServiceBusQueueTemplateを使用してカスタムヘッダーを読み取りSpring統合メッセージに配置できます。



チャネルはすでに手動でインストールされています。



@Bean
public ServiceBusQueueInboundChannelAdapter queueMessageChannelAdapter(
        @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel, 
        ServiceBusQueueOperation queueOperation) {
    queueOperation.setCheckpointConfig(CheckpointConfig.builder().checkpointMode(CheckpointMode.MANUAL).build());
    ServiceBusQueueInboundChannelAdapter adapter = new ServiceBusQueueInboundChannelAdapter(QUEUE_NAME,
            queueOperation);
    adapter.setOutputChannel(inputChannel);
    return adapter;
}


ただし、ハンドラー自体は@ServiceActivatorを介してチャネルに接続されます。



@ServiceActivator(inputChannel = INPUT_CHANNEL)
public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
    String message = new String(payload);
.......


あなたはすぐに次の行を得ることができます:



@ServiceActivator(inputChannel = INPUT_CHANNEL)
public void messageReceiver(String payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
.......


奇妙なCheckpointerチェックポインターパラメーターに気付いたかもしれません。これはメッセージ処理を手動で確認するために使用されます。ServiceBusQueueInboundChannelAdapterの

作成時にCheckpointMode.MANUALを設定した場合は、メッセージの確認を自分で送信する必要があります。CheckpointMode.RECORD を使用すると、確認が自動的に送信されます詳細はServiceBusQueueTemplateコードにあります







使用の特徴



だから、私たちがすでに行った「レーキ」と「チップ」のリスト。



ReceiveMode.PEEKLOCK



Azure Service BusはPEEKLOCKモードをサポートしてます。コンシューマーはメッセージを受け取り、サービスバスにロックし、一定時間(ロック期間)は誰もアクセスできなくなりますが、削除されません。割り当てられた時間内に、消費者が処理の確認を送信しなかった場合(成功/放棄、またはロックを延長しなかった場合)、メッセージは再び利用可能であると見なされ、新しい配信が試行されます。



興味深いことに、放棄するとロックがリセットされ、メッセージはすぐに再配信できるようになります。



ServiceBusQueueTemplateのデフォルトでは、QueueClientモードのReceiveMode.PEEKLOCKが作成さます未処理の例外



がハンドラーで発生した場合-確認応答はサーバーに送信されず、メッセージはロックされたままになり、タイムアウトによって再配信されます。

この場合、配信カウンターが増加しますが、これは論理的です。



これがバグなのか機能なのかはわかりませんが、必要な場合に再試行の間に遅延を設けると非常に便利です。



再試行してもメッセージを処理できない場合は、例外をキャッチしてメッセージを処理済みとしてマークし、アプリケーションにロジックを追加する必要があります。そうしないと、再配信数の制限(サービスバスにキューを作成するときに設定)に達するまで、メッセージが何度も配信されます。 )



同時実行性とプリフェッチメッセージ数



ご想像のとおり、同時実行設定は並列メッセージハンドラーの数を担当し、プリフェッチメッセージ数はサーバーからバッファーに入るメッセージの数です。



デフォルトでは、ServiceBusQueueTemplateは両方のパラメーターの値が1で自動構成されます(AzureServiceBusQueueAutoConfiguration)。デフォルトでは、各キューに1つの処理スレッドがありますが、個々のメッセージごとに確認応答を伴うサービスバスの概念は、多くの同時プロセッサを意味します。リクエストの処理に時間がかかる場合、これはさらに重要です。



残念ながら、これらの設定はアプリケーション構成(application.yml / application.properties)を介して設定することはできず、コードでのみ設定できます。ただし、コードを使用しても、キューごとに異なる設定を設定することはできません。



したがって、異なる設定を行う必要がある場合は、ServiceBusQueueInboundChannelAdapterごとに複数のServiceBusQueueTemplateBeanを作成する必要があります。



紺碧のサービスバスjavasdk内のCompletableFuture



自身が周り実装されている紺碧サービスバスjava sdkのCompletableFutureCachedThreadPoolエグゼキュータは- MessagingFactory.INTERNAL_THREAD_POOLはとてもスレッドローカル豆のすべての種類に注意してください



注文したメッセージ



サービスバスをジョブキューとして使用します。一部のジョブは相互に依存しているため、作成された順序で実行する必要があります。



上で述べたように、Tシャツはメッセージセッションの概念を使用します-メッセージがキーによってセッションにグループ化される場合(ヘッダーで送信される)、セッションキーを持つメッセージが少なくとも1つある限り、セッションは存在します-ドキュメントで詳細に説明されています

サービスバスは、追加する順序でそのようなグループ内のメッセージの配信を保証しますサーバー(つまり、サービスバスサーバーがリポジトリに書き込んだ順序)。



セッション対応キューを作成した場合も言及する価値があります。これは、すべてのメッセージにセッションキー付きのヘッダーが必要であることを意味します。



すぐに、サービスバスがFIFOキューにメッセージを並べる可能性に非常に満足しました-メッセージのグループであっても。



しかし、しばらくすると、問題に気づき始めました。



  • いくつかのメッセージが無限に到着し始めました
  • キュー処理が遅くなりました
  • サービスバスの統計では、リクエストの半分が失敗としてマークされ、アイドル状態の場合、失敗したリクエストは空のキューにも表示されます


sdkコードを調べると、セッションでの作業の特殊性がわかりました。



  1. 消費者はセッションをキャプチャし、セッション内の利用可能なすべてのメッセージの読み取りを開始します
  2. 同時に、セッション数は同時実行パラメーターと同じように処理されます
  3. unhandled exception — 1 ( ) — re-delivery ? 0 exception — ttl .
  4. — success abandon. — delay re-delivery

    .. abandon — , delivery counter .

    delivery count


その結果、彼らはこのサービスバス機能を放棄して自転車を書き、サービスバスがトリガーとして機能します。



セッション対応キューがキャンセルされるとすぐに、統計のエラー、つまりサービスバスへの要求が消えました。



JMS + Qpidバンドルの場合-この機能は使用できません。



1Gを超えるキューサイズの潜在的な問題



まだ会っていませんが、キューサイズが1Gを超えると不安定になり始めるそうです。



これに遭遇した場合、またはその逆の場合は、すべてが機能します-コメントに書き込んでください。



リクエストのトレースに関する問題



標準の紺碧のアプリケーションインサイトエージェントは、依存関係としてのメッセージ送信と要求としての受信メッセージを追跡できません。



コードを追加する必要がありました。



結果



メッセージ処理時間が長いジョブキューが必要で、キューが不要な場合は、を使用できます。



メッセージ処理が高速な場合(Azure Event Hubを使用)、通常のKafkaであれば、標準のクライアントで問題なく動作します。



All Articles