KafkaメッセージブローカーでのSpringCloudストリームバインディングの使用

みなさん、こんにちは!私の名前はVitalyです。Web3Techの開発者です。この投稿では、Kafkaメッセージブローカーをサポートおよび操作するためのSpring Cloud Streamフレームワークの基本的な概念と構成を、コンテキストユニットテストの完全なループとともに紹介します。Waves Enterpriseブロックチェーンプラットフォームでの全ロシアの電子投票のプロジェクトでは、このようなスキームを使用しています





SpringCloudプロジェクトチームの一部として、Spring CloudStreamはSpringBootに基づいており、SpringIntegrationを使用してメッセージブローカーとの通信を提供します。ただし、さまざまなメッセージブローカーと簡単に統合でき、イベント駆動型またはメッセージ駆動型のマイクロサービスを作成するために最小限の構成が必要です。





構成と依存関係

まず、spring-cloud-starter-stream-kafka依存関係をbuild.gradleに追加する必要があります





dependencies {
   implementation(kotlin("stdlib"))
   implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:$kotlinCoroutinesVersion")
   implementation("com.fasterxml.jackson.module:jackson-module-kotlin")

   implementation("org.springframework.boot:spring-boot-starter-web")
   implementation("org.springframework.cloud:spring-cloud-starter-stream-kafka")

   testImplementation("org.springframework.boot:spring-boot-starter-test")
   testImplementation("org.springframework.cloud:spring-cloud-stream-test-support")
   testImplementation("org.springframework.kafka:spring-kafka-test:springKafkaTestVersion")
}
      
      



Spring Cloud Streamプロジェクトの構成では、KafkaブローカーのURL、キュー名(トピック)、およびその他のバインディングパラメーターを含める必要があります。以下は、application.yamlサービスのYAML構成の例です





spring:
 application:
   name: cloud-stream-binding-kafka-app
 cloud:
   stream:
     kafka:
       binder:
         brokers: 0.0.0.0:8080
         configuration:
           auto-offset-reset: latest
     bindings:
       customChannel:                   #Channel name
         destination: 0.0.0.0:8080      #Destination to which the message is sent (topic)
         group: input-group-N
         contentType: application/json
         consumer:
           max-attempts: 1
           autoCommitOffset: true
           autoCommitOnError: false
      
      



コンセプトとクラス

, , Spring Cloud Stream, , (SpringCloudStreamBindingKafkaApp.kt):





@EnableBinding(ProducerBinding::class)

@SpringBootApplication
 
 class SpringCloudStreamBindingKafkaApp

 fun main(args: Array<String>) {

    SpringApplication.run(SpringCloudStreamBindingKafkaApp::class.java, *args)

 }
      
      



@EnableBinding , .





.





Binding — , .

Binder — middleware .

Channel — middleware .

StreamListeners — (beans), , MessageConverter middleware “DTO”.

Message Schema — , . .





send/receive, producer consumer. , Spring Cloud Stream.





Producer Kafka, (ProducerBinding.kt):





interface ProducerBinding {

   @Output(BINDING_TARGET_NAME)
   fun messageChannel(): MessageChannel
}
      
      



onsumer Kafka .





ConsumerBinding.kt:





interface ConsumerBinding {

   companion object {
       const val BINDING_TARGET_NAME = "customChannel"
   }

   @Input(BINDING_TARGET_NAME)
   fun messageChannel(): MessageChannel
}
      
      



Consumer.kt:





@EnableBinding(ConsumerBinding::class)
class Consumer(val messageService: MessageService) {

   @StreamListener(target = ConsumerBinding.BINDING_TARGET_NAME)
   fun process(
       @Payload message: Map<String, Any?>,
       @Header(value = KafkaHeaders.OFFSET, required = false) offset: Int?
   ) {
       messageService.consume(message)
   }
}
      
      



Kafka . Kafka, spring-kafka-test.





MessageCollector

, . ProducerBinding payload ProducerTest.kt:





@SpringBootTest
class ProducerTest {

   @Autowired
   lateinit var producerBinding: ProducerBinding

   @Autowired
   lateinit var messageCollector: MessageCollector

   @Test
   fun `should produce somePayload to channel`() {
       // ARRANGE
       val request = mapOf(1 to "foo", 2 to "bar", "three" to 10101)

       // ACT
producerBinding.messageChannel().send(MessageBuilder.withPayload(request).build())
       val payload = messageCollector.forChannel(producerBinding.messageChannel())
           .poll()
           .payload

       // ASSERT
       val payloadAsMap = jacksonObjectMapper().readValue(payload.toString(), Map::class.java)
       assertTrue(request.entries.stream().allMatch { re ->
           re.value == payloadAsMap[re.key.toString()]
       })

       messageCollector.forChannel(producerBinding.messageChannel()).clear()
   }
}
      
      



Embedded Kafka

@ClassRule . Kafka Zookeeper , . Kafka Zookeper (ConsumerTest.kt):





@SpringBootTest
@ActiveProfiles("test")
@EnableAutoConfiguration(exclude = [TestSupportBinderAutoConfiguration::class])
@EnableBinding(ProducerBinding::class)
class ConsumerTest {

   @Autowired
   lateinit var producerBinding: ProducerBinding

   @Autowired
   lateinit var objectMapper: ObjectMapper

   @MockBean
   lateinit var messageService: MessageService

   companion object {
       @ClassRule @JvmField
       var embeddedKafka = EmbeddedKafkaRule(1, true, "any-name-of-topic")
   }

   @Test
   fun `should consume via txConsumer process`() {
       // ACT
       val request = mapOf(1 to "foo", 2 to "bar")
       producerBinding.messageChannel().send(MessageBuilder.withPayload(request)
           .setHeader("someHeaderName", "someHeaderValue")
           .build())

       // ASSERT
       val requestAsMap = objectMapper.readValue<Map<String, Any?>>(objectMapper.writeValueAsString(request))
       runBlocking {
           delay(20)
           verify(messageService).consume(requestAsMap)
       }
   }
}
      
      



この投稿では、Spring CloudStreamの機能とKafkaでの使用方法を示しました。Spring Cloud Streamは、ブローカー構成の微妙な違いを簡素化したユーザーフレンドリーなインターフェースを提供し、迅速に実装され、安定して動作し、Kafkaなどの最新の人気のあるブローカーをサポートします。その結果、MessageCollectorを使用したEmbeddedKafkaRuleに基づく単体テストの例をいくつか示しました。





すべてのソースはGithubにあります。読んでくれてありがとう!








All Articles