みなさん、こんにちは!私の名前はVitalyです。Web3Techの開発者です。この投稿では、Kafkaメッセージブローカーをサポートおよび操作するためのSpring Cloud Streamフレームワークの基本的な概念と構成を、コンテキストユニットテストの完全なループとともに紹介します。Waves Enterpriseブロックチェーンプラットフォームでの全ロシアの電子投票のプロジェクトでは、このようなスキームを使用しています。
SpringCloudプロジェクトチームの一部として、Spring CloudStreamはSpringBootに基づいており、SpringIntegrationを使用してメッセージブローカーとの通信を提供します。ただし、さまざまなメッセージブローカーと簡単に統合でき、イベント駆動型またはメッセージ駆動型のマイクロサービスを作成するために最小限の構成が必要です。
構成と依存関係
まず、spring-cloud-starter-stream-kafka依存関係をbuild.gradleに追加する必要があります。
dependencies {
implementation(kotlin("stdlib"))
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:$kotlinCoroutinesVersion")
implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
implementation("org.springframework.boot:spring-boot-starter-web")
implementation("org.springframework.cloud:spring-cloud-starter-stream-kafka")
testImplementation("org.springframework.boot:spring-boot-starter-test")
testImplementation("org.springframework.cloud:spring-cloud-stream-test-support")
testImplementation("org.springframework.kafka:spring-kafka-test:springKafkaTestVersion")
}
Spring Cloud Streamプロジェクトの構成では、KafkaブローカーのURL、キュー名(トピック)、およびその他のバインディングパラメーターを含める必要があります。以下は、application.yamlサービスのYAML構成の例です。
spring:
application:
name: cloud-stream-binding-kafka-app
cloud:
stream:
kafka:
binder:
brokers: 0.0.0.0:8080
configuration:
auto-offset-reset: latest
bindings:
customChannel: #Channel name
destination: 0.0.0.0:8080 #Destination to which the message is sent (topic)
group: input-group-N
contentType: application/json
consumer:
max-attempts: 1
autoCommitOffset: true
autoCommitOnError: false
コンセプトとクラス
, , Spring Cloud Stream, , (SpringCloudStreamBindingKafkaApp.kt):
@EnableBinding(ProducerBinding::class)
@SpringBootApplication
class SpringCloudStreamBindingKafkaApp
fun main(args: Array<String>) {
SpringApplication.run(SpringCloudStreamBindingKafkaApp::class.java, *args)
}
@EnableBinding , .
.
Binding — , .
Binder — middleware .
Channel — middleware .
StreamListeners — (beans), , MessageConverter middleware “DTO”.
Message Schema — , . .
send/receive, producer consumer. , Spring Cloud Stream.
Producer Kafka, (ProducerBinding.kt):
interface ProducerBinding {
@Output(BINDING_TARGET_NAME)
fun messageChannel(): MessageChannel
}
onsumer Kafka .
ConsumerBinding.kt:
interface ConsumerBinding {
companion object {
const val BINDING_TARGET_NAME = "customChannel"
}
@Input(BINDING_TARGET_NAME)
fun messageChannel(): MessageChannel
}
Consumer.kt:
@EnableBinding(ConsumerBinding::class)
class Consumer(val messageService: MessageService) {
@StreamListener(target = ConsumerBinding.BINDING_TARGET_NAME)
fun process(
@Payload message: Map<String, Any?>,
@Header(value = KafkaHeaders.OFFSET, required = false) offset: Int?
) {
messageService.consume(message)
}
}
Kafka . Kafka, spring-kafka-test.
MessageCollector
, . ProducerBinding payload ProducerTest.kt:
@SpringBootTest
class ProducerTest {
@Autowired
lateinit var producerBinding: ProducerBinding
@Autowired
lateinit var messageCollector: MessageCollector
@Test
fun `should produce somePayload to channel`() {
// ARRANGE
val request = mapOf(1 to "foo", 2 to "bar", "three" to 10101)
// ACT
producerBinding.messageChannel().send(MessageBuilder.withPayload(request).build())
val payload = messageCollector.forChannel(producerBinding.messageChannel())
.poll()
.payload
// ASSERT
val payloadAsMap = jacksonObjectMapper().readValue(payload.toString(), Map::class.java)
assertTrue(request.entries.stream().allMatch { re ->
re.value == payloadAsMap[re.key.toString()]
})
messageCollector.forChannel(producerBinding.messageChannel()).clear()
}
}
Embedded Kafka
@ClassRule . Kafka Zookeeper , . Kafka Zookeper (ConsumerTest.kt):
@SpringBootTest
@ActiveProfiles("test")
@EnableAutoConfiguration(exclude = [TestSupportBinderAutoConfiguration::class])
@EnableBinding(ProducerBinding::class)
class ConsumerTest {
@Autowired
lateinit var producerBinding: ProducerBinding
@Autowired
lateinit var objectMapper: ObjectMapper
@MockBean
lateinit var messageService: MessageService
companion object {
@ClassRule @JvmField
var embeddedKafka = EmbeddedKafkaRule(1, true, "any-name-of-topic")
}
@Test
fun `should consume via txConsumer process`() {
// ACT
val request = mapOf(1 to "foo", 2 to "bar")
producerBinding.messageChannel().send(MessageBuilder.withPayload(request)
.setHeader("someHeaderName", "someHeaderValue")
.build())
// ASSERT
val requestAsMap = objectMapper.readValue<Map<String, Any?>>(objectMapper.writeValueAsString(request))
runBlocking {
delay(20)
verify(messageService).consume(requestAsMap)
}
}
}
この投稿では、Spring CloudStreamの機能とKafkaでの使用方法を示しました。Spring Cloud Streamは、ブローカー構成の微妙な違いを簡素化したユーザーフレンドリーなインターフェースを提供し、迅速に実装され、安定して動作し、Kafkaなどの最新の人気のあるブローカーをサポートします。その結果、MessageCollectorを使用したEmbeddedKafkaRuleに基づく単体テストの例をいくつか示しました。
すべてのソースはGithubにあります。読んでくれてありがとう!