Axonを介した通信を備えたマイクロサービス

この簡単なチュートリアルでは、Spring Bootでいくつかのマイクロサービスを作成し、Axonフレームワークを介してそれらの間の通信を整理します。










そのようなタスクがあるとしましょう。



株式市場には取引の源があります。このソースは、Restインターフェースを介してトランザクションを送信します。



これらのトランザクションを取得してデータベースに保存し、便利なメモリ内ストレージを作成する必要があります。



このリポジトリは、次の機能を実行する必要があります。



  • 取引のリストを返します。
  • フルポジションを返します。表「機器」-「現在の証券数」;
  • 特定の機器の戻り位置。


このタスクにどのようにアプローチしますか?



マイクロサービスファッションの原則に従って、タスクをマイクロサービスコンポーネントに分割する必要があります。



  • Restによるトランザクションの受信。
  • トランザクションをデータベースに保存します。
  • 位置データを表示するためのメモリ内ストレージ。


このチュートリアルのフレームワーク内で最初と3番目のサービスを作成し、2番目のサービスを2番目の部分に残しましょう(興味深い場合はコメントに書き込んでください)。



したがって、2つのマイクロサービスがあります。



1つ目は、外部からデータを受信します。



2つ目は、このデータを処理し、着信要求に応答します。



もちろん、マイクロサービスの水平スケーリング、ノンストップアップデート、その他のメリットを享受したいと考えています。



私たちの前で非常に難しい仕事は何ですか?



実際にはたくさんありますが、これらのマイクロサービス間でデータがどのように流れるかについて説明しましょう。あなたは彼らの間に休息を作ることもできます、あなたはある種の待ち行列を置くことができます、あなたは彼らの賛否両論で多くのことを思いつくことができます。



考えられるアプローチの1つであるAxonフレームワークを介した非同期通信を見てみましょう



このソリューションの利点は何ですか?



まず、非同期通信は柔軟性を高めます(はい、ここにはマイナスがありますが、これまでのところ長所についてのみ話します)。



次に、イベントソーシングCQRSをすぐに利用できます

第三に、Axonは既製のインフラストラクチャを提供し、ビジネスロジックの開発に集中するだけで済みます。



始めましょう。



プロジェクトは段階的に進められます。3つのモジュールがあります。



  • 一般。共通のデータ構造を持つモジュール(コピー&ペーストは好きではありません);
  • tradeCreator。Restでトランザクションを受け入れるためのマイクロサービスを備えたモジュール。
  • tradeQueries。位置を表示するためのマイクロサービスを備えたモジュール。


Spring Bootをベースにして、Axonスターターを接続しましょう。



AxonはSpringがなくても問題なく動作しますが、一緒に使用します。



ここで立ち止まって、Axonについて少しお話しする必要があります。



クライアントサーバーシステムです。サーバーがあります-これは別のアプリケーションです。dockerで実行します。



そして、マイクロサービスに自分自身を組み込むクライアントがあります。

これが写真です。最初に(docker内の)Axonサーバーが起動され、次にマイクロサービスが起動されます。



起動時に、マイクロサービスはサーバーを探し、サーバーとの対話を開始します。相互作用は、条件付きで技術とビジネスの2つのタイプに分けることができます。



技術的なものは、「私は生きています」というメッセージの交換です(このようなメッセージはデバッグログモードで表示されます)。



ビジネスは「新しい取引」のようなメッセージによって隠されています。



重要な機能は、マイクロサービスを開始した後、Axonサーバーに「何が起こったのか」を尋ねることができ、サーバーは蓄積されたイベントをマイクロサービスに送信します。したがって、マイクロサービスは、データを失うことなく比較的安全に再起動できます。

この交換スキームを使用する

、マイクロサービスの多くのインスタンスをさまざまなホストで非常に簡単に実行できます。



はい、Axon Serverの1つのインスタンスは信頼できませんが、これまでのところです。



私たちはイベントソーシングとCQRSパラダイムで働いています。これは、「チーム」、「イベント」、「サンプル」が必要であることを意味します。



1つのコマンド:「取引の作成」、1つのイベント「取引の作成」、および「すべての取引の表示」、「位置の表示」、「楽器の位置の表示」の3つの選択肢があります。



作業のスキームは次のとおりです。



  1. TradeCreatorマイクロサービスはRestトランザクションを受け入れます。
  2. tradeCreatorマイクロサービスは「createtrade」コマンドを作成し、それをAxonサーバーに送信します。
  3. Axonサーバーはコマンドを受信し、関心のある受信者にコマンドを送信します。この場合は、tradeCreatorマイクロサービスです。
  4. tradeCreatorマイクロサービスはコマンドを受信し、「ディール作成」イベントを生成して、それをAxonサーバーに送信します。
  5. Axonサーバーはイベントを受信し、関心のあるサブスクライバーに転送します。
  6. 現在、関心のある受信者は、tradeQueriesマイクロサービスの1つだけです。
  7. tradeQueriesマイクロサービスはイベントを受信し、内部データを更新します。


(イベントが形成された時点では、tradeQueries Microserviceが利用できない場合がありますが、開始するとすぐにイベントを受信することが重要です)。



はい、axonサーバーはコミュニケーションセンターにあり、すべてのメッセージはそれを通過します。



コーディングに移りましょう。



投稿がコードで乱雑にならないように、以下ではフラグメントのみを示します。例全体へのリンクは以下のとおりです。



共通モジュールから始めましょう。



その中で、一般的な部分はイベント(クラスCreatedTradeEvent)です。名前に注意してください。実際、これはこのイベントを生成したチームの名前ですが、過去の緊張状態です。過去には、最初にコマンドが表示され、イベントが作成されます。



他の一般的な構造には、ポジション(クラスポジション)、トレード(クラストレード)、トレードのサイド(エナムサイド)を記述するためのクラスが含まれます。売買。



tradeCreatorモジュールに移りましょう。



このモジュールには、取引を受け入れるためのRestインターフェイス(クラスTradeController)があります。

「ディールの作成」コマンドは、受信したディールから形成され、axon-serverに送信されます。



    @PostMapping("/trade")
    public ResponseEntity<String> create(@RequestBody Trade trade) {
        var createTradeCommand = CreateTradeCommand.builder()
                .tradeId(trade.getTradeId())
	...
                .build();
        var result = commandGateway.sendAndWait(createTradeCommand, 3, TimeUnit.SECONDS);
        return ResponseEntity.ok(result.get().toString());
    }


コマンドを処理するために、クラスTradeAggregateが使用されます。

Axonがそれを見つけるために、@ Aggregateアノテーションを追加します。

コマンドを処理する方法は次のようになります(省略形):



    @CommandHandler
    public TradeAggregate(CreateTradeCommand command) {
        log.info("command: {}", command);
        var event = CreatedTradeEvent.builder()
                .tradeId(command.tradeId())
		....
                .build();
        AggregateLifecycle.apply(event);
    }


コマンドからイベントが生成され、サーバーに送信されます。

コマンドはCreateTradeCommandクラスにあります。



最後のtradeQueriesモジュールを見てみましょう。



選択内容はクエリパッケージに記載されています。

このモジュールには、

パブリッククラスTradeControllerRestインターフェイスもあります



たとえば、「すべてのトランザクションを表示する」というリクエストの処理を見てみましょう。



    @GetMapping("/trade/all")
    public List<Trade> findAllTrades() {
        return queryGateway.query(new FindAllTradesQuery(),
                ResponseTypes.multipleInstancesOf(Trade.class)).join();
    }


フェッチ要求が作成され、サーバーに送信されます。



TradesEventHandlerクラスは、フェッチ要求を処理するために使用されます。

注釈付きのメソッドがあります



   @QueryHandler
    public List<Position> handleFindCurrentPositionQuery(FindCurrentPositionQuery query)


インメモリストレージからデータをフェッチするのは彼です。



このストアで情報がどのように更新されるかについて疑問が生じます。



そもそも、これは特定の選択に合わせて調整されたConcurrentHashMapsのコレクションにすぎません。

それらを更新するには、次の方法が適用されます。



    @EventHandler
    public void on(CreatedTradeEvent event) {
        log.info("event:{}", event);

        var trade = Trade.builder()
	...
                .build();
        trades.put(event.tradeId(), trade);
        position.merge(event.shortName(), event.size(),
                (oldValue, value) -> event.side() == Side.BUY ? oldValue + value : oldValue - value);
    }


「ディール作成」イベントを受信し、マップを更新します。



これらは、マイクロサービス開発のハイライトです。



Axonの欠点はどうですか?



まず、これはインフラストラクチャの複雑さであり、障害のポイントが現れました-Axonサーバー、すべての通信はそれを通過します。



第二に、そのような分散システムの欠点は非常に明確に現れます-一時的なデータの不整合。私たちの場合、新しい取引を受け取ってからサンプルのデータを更新するまでに、容認できないほど長い時間がかかる可能性があります。



舞台裏には何が残っていますか?



イベントソーシングとCQRSについては、それが何であり、何のためにあるのかについては何も言われていません。

これらの概念を開示しないと、いくつかの点が明確にならない可能性があります。



おそらく、一部のコードフラグメントも説明が必要です。



これについては、オープンウェビナーで話しまし



完全な例



All Articles