その1つがAzureService Busです。今日は、通常のSpringBootアプリケーションで使用する機能について説明します。
Azure ServiceBusとは
Azure Service Busについて少し説明すると、クラウドメッセージブローカー(RabbitMQ、ActiveMQのクラウド代替)です。キュー(メッセージが1人の受信者に配信される)とトピック(公開/サブスクライブメカニズム)を サポートします-詳細はこちら
サポートが宣言されています:
- 順序付けられたメッセージ-ドキュメントにはこれはFIFOであると記載されていますが、メッセージセッションの概念を使用して実装されています-キュー全体ではなく、メッセージのグループです。メッセージの順序を保証する必要がある場合は、メッセージを1つのグループに結合すると、グループ内のメッセージがFIFOとして配信されます。したがって、Azure Service Bus QueueはFIFOではなく、メッセージを適切なランダムに配信します
- デッドレターキュー-ここではすべてが単純で、N回の試行または一定期間後にメッセージを正常に配信できませんでした-DLQに移動しました
- スケジュールされた配信-配信前に遅延を設定できます
- メッセージの延期-キュー内のメッセージを非表示にします。メッセージは自動的に配信されませんが、IDで取得できます。このIDをどこかに保存する必要があります
Azure ServiceBusと統合する方法
Azure ServiceBusはAMQP1.0をサポートしているため、RabbitMQクライアントとの互換性はありません。bunnyはAMQP0.9.1を使用し
ます。サービスバスで動作できる唯一の「標準」クライアントはApacheQpidです。
SpringBootアプリケーションをServiceBusとペアリングする方法は3つあります。
- JMS + QPID — , — QPID — .
timeout producer — — factory.setCacheProducers(false); - Spring Cloud — Azure Service Bus — , . Service Bus
( 1.2.6) — , azure service bus java sdk.
Spring Integration — , «Scheduled delivery» «Message deferral» .
sdk, MessageAndSessionPump
- azure service bus java sdk — ,
Spring Cloud — Azure Service Bus
この方法について詳しく説明し、
サンプルアプリケーションを使用する機能について説明します。公式リポジトリにあるため、コードを複製しても意味がありません。例のあるリポジトリはここにあります。
なぜなら それはSpringIntegration Messagingであり、すべてChannel、MessageHandler、MessagingGateway、ServiceActivatorに帰着します。
そして、ServiceBusQueueTemplateがあります。
メッセージの送信
送信したいメッセージを書き込むチャネルが 必要です。もう一方の端には、サービスバスにメッセージを送信するMessageHandlerがあります。MessagHandlerがあるcom.microsoft.azure.spring.integration.core.DefaultMessageHandler -これは、外部サービスへのコネクタです。 それをチャネルにバインドする方法は? -注釈を追加します- @ ServiceActivator(inputChannel = OUTPUT_CHANNEL)これで、MessagHandlerはOUTPUT_CHANNELチャネルをリッスンします。 次に、何らかの方法でチャネルにメッセージを書き込む必要があります。ここでも春の魔法です。MessagingGatewayをアナウンスし、名前でチャネルにバインドします。
例の 抜粋:
@MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
public interface QueueOutboundGateway {
void send(String text);
}
以上です:ゲートウェイ->チャネル-> MessagHandler- > ServiceBusQueueTemplate- > ServiceBusMessageConverter。
コードでは、ゲートウェイを挿入してsendメソッドを呼び出す必要があります。コールチェーンでServiceBusMessageConverter
について言及したのには、理由があります。カスタムヘッダー(CORRELATION_IDなど)をメッセージに追加する場合、これは、org.springframework.messaging.MessageHeadersから紺碧のメッセージに移動する必要がある場所です。
特別なメソッドsetCustomHeaders。
この場合、ゲートウェイは次のようになります。
@MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
public interface QueueOutboundGateway {
void send(@Payload String text, @Header("CORRELATION_ID") String correlationId);
}
メッセージの受信
わかりました、メッセージを送信する方法、今すぐ取得する方法を知っていますか?
ここではすべてが同じである-のMessageProducerは- >チャネル- >ハンドラ
ザ・のMessageProducerがあるcom.microsoft.azure.spring.integration.servicebus.inbound.ServiceBusQueueInboundChannelAdapter -これは、外部サービスへの私たちのコネクタです。内部では、ServiceBusMessageConverterと同じServiceBusQueueTemplateを使用して、カスタムヘッダーを読み取り、Spring統合メッセージに配置できます。
チャネルはすでに手動でインストールされています。
@Bean
public ServiceBusQueueInboundChannelAdapter queueMessageChannelAdapter(
@Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
ServiceBusQueueOperation queueOperation) {
queueOperation.setCheckpointConfig(CheckpointConfig.builder().checkpointMode(CheckpointMode.MANUAL).build());
ServiceBusQueueInboundChannelAdapter adapter = new ServiceBusQueueInboundChannelAdapter(QUEUE_NAME,
queueOperation);
adapter.setOutputChannel(inputChannel);
return adapter;
}
ただし、ハンドラー自体は@ServiceActivatorを介してチャネルに接続されます。
@ServiceActivator(inputChannel = INPUT_CHANNEL)
public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
String message = new String(payload);
.......
あなたはすぐに次の行を得ることができます:
@ServiceActivator(inputChannel = INPUT_CHANNEL)
public void messageReceiver(String payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
.......
奇妙なCheckpointerチェックポインターパラメーターに気付いたかもしれません。これは、メッセージ処理を手動で確認するために使用されます。ServiceBusQueueInboundChannelAdapterの
作成時にCheckpointMode.MANUALを設定した場合は、メッセージの確認を自分で送信する必要があります。CheckpointMode.RECORD を使用すると、確認が自動的に送信されます。詳細はServiceBusQueueTemplateコードにあります。
使用の特徴
だから、私たちがすでに行った「レーキ」と「チップ」のリスト。
ReceiveMode.PEEKLOCK
Azure Service BusはPEEKLOCKモードをサポートしています。コンシューマーはメッセージを受け取り、サービスバスにロックし、一定時間(ロック期間)は誰もアクセスできなくなりますが、削除されません。割り当てられた時間内に、消費者が処理の確認を送信しなかった場合(成功/放棄、またはロックを延長しなかった場合)、メッセージは再び利用可能であると見なされ、新しい配信が試行されます。
興味深いことに、放棄するとロックがリセットされ、メッセージはすぐに再配信できるようになります。
ServiceBusQueueTemplateのデフォルトでは、QueueClientモードのReceiveMode.PEEKLOCKが作成されます。未処理の例外
がハンドラーで発生した場合-確認応答はサーバーに送信されず、メッセージはロックされたままになり、タイムアウトによって再配信されます。
この場合、配信カウンターが増加しますが、これは論理的です。
これがバグなのか機能なのかはわかりませんが、必要な場合に再試行の間に遅延を設けると非常に便利です。
再試行してもメッセージを処理できない場合は、例外をキャッチしてメッセージを処理済みとしてマークし、アプリケーションにロジックを追加する必要があります。そうしないと、再配信数の制限(サービスバスにキューを作成するときに設定)に達するまで、メッセージが何度も配信されます。 )
同時実行性とプリフェッチメッセージ数
ご想像のとおり、同時実行設定は並列メッセージハンドラーの数を担当し、プリフェッチメッセージ数はサーバーからバッファーに入るメッセージの数です。
デフォルトでは、ServiceBusQueueTemplateは両方のパラメーターの値が1で自動構成されます(AzureServiceBusQueueAutoConfiguration)。デフォルトでは、各キューに1つの処理スレッドがありますが、個々のメッセージごとに確認応答を伴うサービスバスの概念は、多くの同時プロセッサを意味します。リクエストの処理に時間がかかる場合、これはさらに重要です。
残念ながら、これらの設定はアプリケーション構成(application.yml / application.properties)を介して設定することはできず、コードでのみ設定できます。ただし、コードを使用しても、キューごとに異なる設定を設定することはできません。
したがって、異なる設定を行う必要がある場合は、ServiceBusQueueInboundChannelAdapterごとに複数のServiceBusQueueTemplateBeanを作成する必要があります。
紺碧のサービスバスjavasdk内のCompletableFuture
自身が周り実装されている紺碧サービスバスjava sdkのCompletableFutureとCachedThreadPoolエグゼキュータは- MessagingFactory.INTERNAL_THREAD_POOLはとてもスレッドローカル豆のすべての種類に注意してください
注文したメッセージ
サービスバスをジョブキューとして使用します。一部のジョブは相互に依存しているため、作成された順序で実行する必要があります。
上で述べたように、Tシャツはメッセージセッションの概念を使用します-メッセージがキーによってセッションにグループ化される場合(ヘッダーで送信される)、セッションキーを持つメッセージが少なくとも1つある限り、セッションは存在します-ドキュメントで詳細に説明されています
サービスバスは、追加する順序でそのようなグループ内のメッセージの配信を保証しますサーバー(つまり、サービスバスサーバーがリポジトリに書き込んだ順序)。
セッション対応キューを作成した場合も言及する価値があります。これは、すべてのメッセージにセッションキー付きのヘッダーが必要であることを意味します。
すぐに、サービスバスがFIFOキューにメッセージを並べる可能性に非常に満足しました-メッセージのグループであっても。
しかし、しばらくすると、問題に気づき始めました。
- いくつかのメッセージが無限に到着し始めました
- キュー処理が遅くなりました
- サービスバスの統計では、リクエストの半分が失敗としてマークされ、アイドル状態の場合、失敗したリクエストは空のキューにも表示されます
sdkコードを調べると、セッションでの作業の特殊性がわかりました。
- 消費者はセッションをキャプチャし、セッション内の利用可能なすべてのメッセージの読み取りを開始します
- 同時に、セッション数は同時実行パラメーターと同じように処理されます
- unhandled exception — 1 ( ) — re-delivery ? 0 exception — ttl .
- — success abandon. — delay re-delivery
.. abandon — , delivery counter .
delivery count
その結果、彼らはこのサービスバス機能を放棄して自転車を書き、サービスバスがトリガーとして機能します。
セッション対応キューがキャンセルされるとすぐに、統計のエラー、つまりサービスバスへの要求が消えました。
JMS + Qpidバンドルの場合-この機能は使用できません。
1Gを超えるキューサイズの潜在的な問題
まだ会っていませんが、キューサイズが1Gを超えると不安定になり始めるそうです。
これに遭遇した場合、またはその逆の場合は、すべてが機能します-コメントに書き込んでください。
リクエストのトレースに関する問題
標準の紺碧のアプリケーションインサイトエージェントは、依存関係としてのメッセージ送信と要求としての受信メッセージを追跡できません。
コードを追加する必要がありました。
結果
メッセージ処理時間が長いジョブキューが必要で、キューが不要な場合は、を使用できます。
メッセージ処理が高速な場合(Azure Event Hubを使用)、通常のKafkaであれば、標準のクライアントで問題なく動作します。