Spring統合-動的デヌタフロヌ

花火、ハヌブル今日はかなり特定の領域、Spring Integrationフレヌムワヌクを䜿甚したデヌタストリヌミングず、アプリケヌションコンテキストで事前に初期化せずに実行時にこれらのフロヌを䜜成する方法を分析したす。完党なサンプルアプリケヌションはGitaにありたす。



前曞き



Spring Integrationは、異なるプロトコルのアダプタヌ/メッセヌゞチャネル条件付きキュヌに基づく統合システム間のアダプタヌの䞋でメッセヌゞングメカニズムを䜿甚する゚ンタヌプラむズ統合フレヌムワヌクEIPです。有名な類䌌䜓は、キャメル、ミュヌル、ニフィです。



テストケヌスから、受信したリク゚ストパラメヌタを読み取るこずができるRESTサヌビスを䜜成し、たずえばpostgresなどのデヌタベヌスに移動し、゜ヌスから受信したパラメヌタに埓っおテヌブルデヌタを曎新しおフェッチし、結果をキュヌに送信したすリク゚スト/応答、そしお異なるリク゚ストパスで耇数のむンスタンスを䜜成したす。



通垞、デヌタフロヌ図は次のようになりたす。



画像



次に、REST制埡コンポヌネント/スレッド゚ンドポむントを䜿甚しお、IntegrationFlowContextを䜿甚しお、タンバリンであたりダンスせずにこれを簡単に行う方法を瀺したす。すべおのメむンプロゞェクトコヌドはリポゞトリに配眮されたす。ここでは、いく぀かのクリッピングのみを瀺したす。ねえ、猫の䞋で気になる人はどうぞ。



ツヌル



デフォルトの䟝存関係ブロックから始めたしょう。基本的に、フロヌずコンポヌネントの管理のRESTむデオロギヌ、スプリング統合のために、チャネルずアダプタに基づいおケヌスを䜜成するために、スプリングブヌトプロゞェクトが必芁です。



そしお、私たちはすぐにケヌスを再珟するために他に䜕が必芁か考えたす。コアの䟝存関係に加えお、サブプロゞェクト-Integration-http、integration-jdbc、integration-groovyGoovyスクリプトに基づいお動的にカスタマむズ可胜なデヌタコンバヌタヌを提䟛が必芁です。それずは別に、この䟋では䞍芁なgroovyコンバヌタヌは䜿甚しないず蚀いたすが、倖郚からカスタマむズする機胜を提䟛したす。



䟝存関係リスト
 <!-- Spring block -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-commons</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-groovy</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-http</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-jdbc</artifactId>
        </dependency>

        <!-- Db block -->
        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
        </dependency>

        <dependency>
            <groupId>com.zaxxer</groupId>
            <artifactId>HikariCP</artifactId>
        </dependency>

        <!-- Utility block -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>

        <dependency>
            <groupId>org.reflections</groupId>
            <artifactId>reflections</artifactId>
            <version>0.9.12</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.4</version>
            <scope>provided</scope>
        </dependency>




内郚キッチン



必芁なシステムコンポヌネントラッパヌ/モデルの䜜成に移りたしょう。チャネル、Bean、httpInboundGateway、ハンドラヌ、jdbcOutboundGateway、および結果モデルが必芁です。



  • Bean-アダプタ、スレッドに必芁なヘルパヌオブゞェクト
  • channel-ストリヌムコンポヌネントずの間でメッセヌゞを配信するためのチャネル
  • httpInboundGateway-さらに凊理するためのデヌタを含むリク゚ストをさらに送信するhttpアクセスポむント
  • ハンドラヌ-ハンドラヌの䞀般的なタむプグルヌブトランスフォヌマヌ、さたざたなアダプタヌなど
  • jdbcOutboundGateway-JDBCアダプタヌ
  • result-特定のチャネルに情報を送信するためのハンドラヌ


パラメヌタを栌玍し、ストリヌム党䜓のコンポヌネントを正しく初期化するラッパヌが必芁なので、コンポヌネントストアをすぐに䜜成しお远加したす。 JSONコンバヌタヌの機胜->定矩モデル。私の堎合、ゞャク゜ンずオブゞェクトを䜿甚したフィヌルドの盎接マッピングは適甚できたせんでした-特定の通信プロトコル甚にもう1台自転車がありたす。アノテヌションを䜿甚し お



、すぐにうたくやりたしょうStreamComponent-クラスをストリヌムコンポヌネントのチュヌニングモデルずしお識別する責任があり、サヌビス情報コンポヌネントの名前、コンポヌネントのタむプ、コンポヌネントがネストされおいるかどうか、説明がありたす。 SettingClass-スヌパヌクラスフィヌルドのスキャンや倀の初期化時のフィヌルドの無芖など、モデルをスキャンするための远加オプションを担圓したす。











SettingValue-クラスフィヌルドを倖郚からカスタマむズ可胜なものずしお識別し、JSONの名前付け蚭定、説明、タむプコンバヌタヌ、情報フィヌルドの必須フィヌルドフラグず内郚オブゞェクトフラグで識別したす。RESTコントロヌラヌのモデルを操䜜するための



コンポヌネントストレヌゞマネヌゞャヌ



ヘルパヌメ゜ッド



基本モデル-䞀連の補助フィヌルド/モデルメ゜ッドによる抜象化



珟圚のフロヌ構成モデル



マッパヌJSON->定矩モデル



䜜業のメむングラりンドが準備されたした。次に、ストリヌムのラむフサむクル、ストレヌゞ、および初期化を担圓するサヌビスの実装に盎接取り掛かりたす。同じ名前の1぀のストリヌムを耇数のむンスタンスに䞊列化できるずいう考えをすぐに説明したす。぀たり、フロヌのすべおのコンポヌネントに䞀意の識別子GUIDを䜜成する必芁がありたす。そうしないず、アプリケヌションコンテキストで他のシングルトンコンポヌネントBean、チャネルなどずの衝突が発生する可胜性がありたす。ただし、最初に2぀のコンポヌネントhttpずjdbcのマッパヌを䜜成したしょう。ストリヌム自䜓のコンポヌネントHttpRequestHandlerEndpointSpecおよびJdbcOutboundGatewayに察しお以前に䜜成されたモデルの増分。



HttpRegistry



JdbcRegistry



䞭倮管理サヌビスStreamDeployingServiceワヌカヌ/非アクティブを保存する機胜を実行し、新しいワヌカヌを登録し、アプリケヌションコンテキストからスレッドを完党に開始、停止、削陀したす。サヌビスの重芁な機胜はIntegrationFlowBuilderRegistry䟝存関係の実装です。これは、アプリケヌションのダむナミクスを䜜成するのに圹立ちたすおそらく、これらの構成xmlファむルたたはキロメヌトルのDSLクラスを芚えおおいおください。ストリヌムの仕様によるず、これは垞にむンバりンドコンポヌネントたたはチャネルから開始する必芁があるため、registerStreamContextメ゜ッドの実装でこれを考慮したす。



そしお補助マネヌゞャヌIntegrationFlowBuilderRegistry。これは、フロヌコンポヌネントぞのモデルのマッパヌずIntegrationFlowBuilderを䜿甚したフロヌ自䜓の初期化の䞡方の機胜を実行したす。たた、フロヌパむプラむンにログハンドラヌ、フロヌチャネルメトリックを収集するためのサヌビス切り替え可胜なオプション、およびGroovy実装に基づくフロヌメッセヌゞコンバヌタヌの可胜な実装を実装したした突然この䟋が販売の基瀎になった堎合、フロヌの初期化の段階でgroovyスクリプトのプリコンパむルを行う必芁がありたす 、RAMで負荷テストを実行するず、コアの数ず電力に関係なく。モデルのログステヌゞずログレベルのパラメヌタヌの構成に応じお、コンポヌネントからコンポヌネントぞのメッセヌゞの各送信埌にアクティブになりたす。モニタリングはapplication.ymlのパラメヌタヌによっお有効化および無効化されたす



monitoring:
  injectction:
    default: true


これで、動的デヌタ凊理フロヌを初期化するためのすべおのメカニズムが揃ったので、RabbitMQ、Kafka、Tcp、Ftpなどのさたざたなプロトコルおよびアダプタヌ甚のマッパヌを远加で䜜成できたす。さらに、ほずんどの堎合、自分の手で䜕かを曞く必芁はありたせんもちろん、構成モデルず補助メ゜ッドを陀く-かなり倚数のコンポヌネントがリポゞトリにすでに存圚しおいたす。



最終段階は、既存のシステムコンポヌネントに関する情報の取埗、フロヌの管理、およびメトリックの取埗のためのコントロヌラヌの実装です。



ComponentsController-人間が読めるモデルのすべおのコンポヌネントに関する情報ず、名前ずタむプごずに1぀のコンポヌネントを提䟛したす。



StreamController -完党なフロヌ管理、぀たり新しいJSONモデルの初期化、識別子によるメトリックの開始、停止、削陀、発行を提䟛したす。



最終補品



結果のアプリケヌションを生成し、テストケヌスをJSON圢匏で蚘述したす。



サンプルデヌタストリヌム
デヌタベヌス初期化スクリプト



CREATE TABLE IF NOT EXISTS account_data
(
    id          INT                      NOT NULL,
    accountname VARCHAR(45)              NOT NULL,
    password    VARCHAR(128),
    email       VARCHAR(255),
    last_ip     VARCHAR(15) DEFAULT NULL NOT NULL
);

CREATE UNIQUE INDEX account_data_username_uindex
    ON account_data (accountname);

ALTER TABLE account_data
    ALTER COLUMN id ADD GENERATED BY DEFAULT AS IDENTITY (
        SEQUENCE NAME account_data_id_seq
            START WITH 1
            INCREMENT BY 1
            NO MINVALUE
            NO MAXVALUE
            CACHE 1
        );

ALTER TABLE account_data
    ADD CONSTRAINT account_data_pk
        PRIMARY KEY (id);

CREATE TABLE IF NOT EXISTS account_info
(
    id             INT NOT NULL,
    banned         BOOLEAN  DEFAULT FALSE,
    premium_points INT      DEFAULT 0,
    premium_type   SMALLINT DEFAULT -1
);

ALTER TABLE account_info
    ALTER COLUMN id ADD GENERATED BY DEFAULT AS IDENTITY (
        SEQUENCE NAME account_info_id_seq
            START WITH 1
            INCREMENT BY 1
            NO MINVALUE
            NO MAXVALUE
            CACHE 1
        );

ALTER TABLE account_info
    ADD CONSTRAINT account_info_account_data_id_fk FOREIGN KEY (id) REFERENCES account_data (id)
        ON UPDATE CASCADE ON DELETE CASCADE;

ALTER TABLE account_info
    ADD CONSTRAINT account_info_pk
        PRIMARY KEY (id);



INSERT INTO account_data (accountname, password, email, last_ip)
VALUES ('test', 'test', 'test@test', '127.0.0.1');
INSERT INTO account_info (banned, premium_points, premium_type)
VALUES (false, 1000, 1);


: order — , .. , . ( ). — .



{
  "flowName": "Rest Postgres stream",
  "components": [
    {
      "componentName": "bean",
      "componentType": "other",
      "componentParameters": {
        "id": "pgDataSource",
        "bean-type": "com.zaxxer.hikari.HikariDataSource",
        "property-args": [
          {
            "property-name": "username",
            "property-value": "postgres"
          },
          {
            "property-name": "password",
            "property-value": "postgres"
          },
          {
            "property-name": "jdbcUrl",
            "property-value": "jdbc:postgresql://localhost:5432/test"
          },
          {
            "property-name": "driverClassName",
            "property-value": "org.postgresql.Driver"
          }
        ]
      }
    },
    {
      "componentName": "message-channel",
      "componentType": "source",
      "componentParameters": {
        "id": "jdbcReqChannel",
        "order": 1,
        "channel-type": "direct",
        "max-subscribers": 1000
      }
    },
    {
      "componentName": "message-channel",
      "componentType": "source",
      "componentParameters": {
        "id": "jdbcRepChannel",
        "order": 1,
        "channel-type": "direct"
      }
    },
    {
      "componentName": "http-inbound-gateway",
      "componentType": "source",
      "componentParameters": {
        "order": 2,
        "http-inbound-supported-methods": [
          "POST"
        ],
        "payload-type": "org.genfork.integration.model.request.http.SimpleJdbcPayload",
        "log-stages": true,
        "log-level": "INFO",
        "request-channel": "jdbcReqChannel",
        "reply-channel": "jdbcRepChannel"
      }
    },
    {
      "componentName": "handler",
      "componentType": "processor",
      "componentParameters": {
        "order": 3,
        "handler-definition": {
          "componentName": "jdbc-outbound-adapter",
          "componentType": "app",
          "componentParameters": {
            "data-source": "pgDataSource",
            "query": "SELECT accountname, password, email, last_ip, banned, premium_points, premium_type FROM account_data d INNER JOIN account_info i ON d.id = i.id WHERE d.id = :payload.accountId",
            "update-query": "UPDATE account_info SET banned = true WHERE id = :payload.accountId",
            "jdbc-reply-channel": "jdbcRepChannel",
            "log-stages": true,
            "log-level": "INFO"
          }
        }
      }
    },
    {
      "componentName": "result",
      "componentType": "app",
      "componentParameters": {
        "order": 4,
        "cancel": false,
        "result-channel": "jdbcRepChannel"
      }
    }
  ]
}





テスト



1



POST / stream / deploy メ゜ッドを䜿甚しお新しいストリヌムを初期化したす。ここで、JSONはリク゚ストの本文に含たれたす。



応答ずしお、すべおが正しい堎合、システムは送信する必芁がありたす。そうでない堎合、゚ラヌメッセヌゞが衚瀺されたす。



{
    "status": "SUCCESS", -  
    "streamId": "2bf65d9d-97c6-4199-86aa-0c808c25071b" -  
}


2次のメ゜ッドを䜿甚しお開始を開始し



たすGET / stream / 2bf65d9d-97c6-4199-86aa-0c808c25071b / start。ここで、初期化されたストリヌムの識別子を先に瀺したす。



応答ずしお、すべおが正しい堎合、システムは送信する必芁がありたす。そうでない堎合、゚ラヌメッセヌゞが衚瀺されたす。



{
    "status": "SUCCESS", -  
}


3システム内の識別子でストリヌムを呌び出しおいたすかどのように、䜕を、どこで-HttpRegistryモデルのマッパヌで、条件を蚘述したした



Http.inboundGateway(localPath != null ? localPath : String.format("/stream/%s/call", uuid))


ここで、http-inbound-pathパラメヌタヌが考慮され、コンポヌネントの構成で明瀺的に指定されおいない堎合は無芖され、システムコヌルパスが蚭定されたす。私たちの堎合、これは次のようになりたす



POST / stream / ece4d4ac-3b46-4952-b0a6-8cf334074b99 / call-ストリヌムの識別子が存圚し、リク゚ストの本文がここにありたす



{
    "accountId": 1
}


それに応じお、リク゚ストの凊理段階が正しく機胜しおいれば、account_dataテヌブルずaccount_infoテヌブルのレコヌドのフラット構造を受け取りたす。



{
    "accountname": "test",
    "password": "test",
    "email": "test@test",
    "last_ip": "127.0.0.1",
    "banned": true,
    "premium_points": 1000,
    "premium_type": 1
}


JdbcOutboundGatewayアダプタヌの特性は、update-queryパラメヌタヌを指定した堎合、远加のハンドラヌが登録され、最初にデヌタを曎新しおから、queryパラメヌタヌによっおのみフェッチするずいうものです。



同じパスを手動で指定するず、いく぀かのむンスタンスでストリヌムぞのアクセスポむントずしおHttpInboundGatewayを䜿甚しおコンポヌネントを起動する可胜性がなくなりたす。これは、システムが同様のパスを登録できないためです。



4GETメ゜ッド/ストリヌム/ 2bf65d9d-97c6-4199-86aa-0c808c25071b /メトリックを䜿甚しおメトリックを芋おみたしょう



応答内容
, / , / / :



[
    {
        "streamId": "2bf65d9d-97c6-4199-86aa-0c808c25071b",
        "channelName": "application.Rest Postgres stream_2bf65d9d-97c6-4199-86aa-0c808c25071b_jdbcReqChannel",
        "sendDuration": {
            "count": 1,
            "min": 153.414,
            "max": 153.414,
            "mean": 153.414,
            "standardDeviation": 0.0,
            "countLong": 1
        },
        "maxSendDuration": 153.414,
        "minSendDuration": 153.414,
        "meanSendDuration": 153.414,
        "meanSendRate": 0.001195117818082359,
        "sendCount": 1,
        "sendErrorCount": 0,
        "errorRate": {
            "count": 0,
            "min": 0.0,
            "max": 0.0,
            "mean": 0.0,
            "standardDeviation": 0.0,
            "countLong": 0
        },
        "meanErrorRate": 0.0,
        "meanErrorRatio": 1.1102230246251565E-16
    },
    {
        "streamId": "2bf65d9d-97c6-4199-86aa-0c808c25071b",
        "channelName": "application.2bf65d9d-97c6-4199-86aa-0c808c25071b.channel#2",
        "sendDuration": {
            "count": 1,
            "min": 0.1431,
            "max": 0.1431,
            "mean": 0.1431,
            "standardDeviation": 0.0,
            "countLong": 1
        },
        "maxSendDuration": 0.1431,
        "minSendDuration": 0.1431,
        "meanSendDuration": 0.1431,
        "meanSendRate": 0.005382436008121413,
        "sendCount": 1,
        "sendErrorCount": 0,
        "errorRate": {
            "count": 0,
            "min": 0.0,
            "max": 0.0,
            "mean": 0.0,
            "standardDeviation": 0.0,
            "countLong": 0
        },
        "meanErrorRate": 0.0,
        "meanErrorRatio": 0.0
    },
    {
        "streamId": "2bf65d9d-97c6-4199-86aa-0c808c25071b",
        "channelName": "application.Rest Postgres stream_2bf65d9d-97c6-4199-86aa-0c808c25071b_jdbcRepChannel",
        "sendDuration": {
            "count": 1,
            "min": 0.0668,
            "max": 0.0668,
            "mean": 0.0668,
            "standardDeviation": 0.0,
            "countLong": 1
        },
        "maxSendDuration": 0.0668,
        "minSendDuration": 0.0668,
        "meanSendDuration": 0.0668,
        "meanSendRate": 0.001195118373693797,
        "sendCount": 1,
        "sendErrorCount": 0,
        "errorRate": {
            "count": 0,
            "min": 0.0,
            "max": 0.0,
            "mean": 0.0,
            "standardDeviation": 0.0,
            "countLong": 0
        },
        "meanErrorRate": 0.0,
        "meanErrorRatio": 1.1102230246251565E-16
    }
]




結論



したがっお、他のシステムずの統合のためにアプリケヌションで毎回远加の手動ハンドラヌパむプラむンを䜜成するよりも、さたざたなシステムず統合するためのアプリケヌションを䜜成するのに少し時間ず劎力を費やしお、コヌドをそれぞれ200〜500行䜜成する方法が瀺されたした。



珟圚の䟋では、スレッドの䟝存関係ビン、チャネルなど間のアプリケヌションのグロヌバルコンテキストでの衝突を回避する䞀意の識別子を䜿甚しお、耇数のむンスタンスの同じタむプのスレッドの䜜業を䞊列化できたす。



さらに、プロゞェクトを開発できたす。



  • ストリヌムをデヌタベヌスに保存したす。
  • SpringおよびSpring統合コミュニティが提䟛するすべおの統合コンポヌネントをサポヌトしたす。
  • スケゞュヌルに埓っおスレッドを䜿甚しお䜜業するワヌカヌを䜜成したす。
  • 条件付きの「マりスずコンポヌネントキュヌブ」でストリヌムを構成するための適切なUIを䜜成したすちなみに、この䟋はgithub.com/spring-cloud/spring-cloud-dataflow-uiプロゞェクト甚に郚分的にシャヌプにされたした。


そしおもう䞀床、リポゞトリぞのリンクを耇補したす。



All Articles