前書き
ApacheKafkaを使用してテストを作成するにはさまざまな方法があります。たとえば、TestContainersとEmbeddedKafkaを使用できます。これについては、たとえば、ここで読むことができます:KafkaStreamsをテストする際の落とし穴。ただし、KafkaServerを使用してテストを作成するためのオプションもあります。
何がテストされますか?
電子メール、電報など、さまざまなチャネルを介してメッセージを送信するためのサービスを開発する必要があるとします。
サービス名をSenderServiceとします。
サービスは次のことを行う必要があります。特定のチャネルをリッスンし、チャネルから必要なメッセージを選択し、メッセージを解析して、メッセージの最終配信のために目的のチャネルを介して送信します。
サービスをテストするには、メール送信チャネルを使用して送信するメッセージを作成し、メッセージが最終チャネルに送信されたことを確認する必要があります。
もちろん、実際のアプリケーションでは、テストはより困難になります。しかし、選択したアプローチを説明するには、そのようなテストで十分です。
サービスとテストは、Java 1.8、Kafka 2.1.0、JUnit 5.5.2、Maven3.6.1を使用して実装されます。
サービス
サービスは、その作業を開始および停止できるようになります。
void start()
void stop()
最初に、少なくとも次のパラメータを設定する必要があります。
String bootstrapServers
String senderTopic
EmailService emailService
bootstrapServers – kafka.
senderTopic – , .
emailService – .
.
«», , . «» . «» : Introducing the Kafka Consumer: Getting Started with the New Apache Kafka 0.9 Consumer Client.
Collection<AutoCloseable> closeables = new ArrayList<>();
ExecutorService senderTasksExecutor = Executors.newFixedThreadPool(senderTasksN);
ExecutorService tasksExecutorService = Executors.newFixedThreadPool(tasksN);
for (int i = 0; i < senderTasksN; i++) {
SenderConsumerLoop senderConsumerLoop =
new SenderConsumerLoop(
bootstrapServers,
senderTopic,
"sender",
"sender",
tasksExecutorService,
emailService
);
closeables.add(senderConsumerLoop);
senderTasksExecutor.submit(senderConsumerLoop);
}
«», .
«» . .
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
for (AutoCloseable autoCloseable : closeables) {
try {
autoCloseable.close();
} catch (Exception e) {
e.printStackTrace();
}
}
senderTasksExecutor.shutdown();
tasksExecutorService.shutdown();
stop();
try {
senderTasksExecutor.awaitTermination(5000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}));
.
«»
«» :
void run()
void close()
: run.
@Override
public void run() {
kafkaConsumer = createKafkaConsumerStringString(bootstrapServers, clientId, groupId);
kafkaConsumer.subscribe(Collections.singleton(topic));
while (true) {
calculate(kafkaConsumer.poll(Duration.ofSeconds(1)));
}
}
«kafka-». «kafka-» . . .
json- , , .
:
{
"subject": {
"subject_type": "send"
},
"body": {
"method": "email",
"recipients": "mrbrown@ml.ml;mrblack@ml.ml;mrwhite@ml.ml",
"title": "42",
"message": "73"
}
}
subject_type — . «send».
method – . «email» — .
recipients – .
title – .
message – .
:
void calculate(ConsumerRecords<String, String> records) {
for (ConsumerRecord<String, String> record : records) {
calculate(record);
}
}
:
void calculate(ConsumerRecord<String, String> record) {
JSONParser jsonParser = new JSONParser();
Object parsedObject = null;
try {
parsedObject = jsonParser.parse(record.value());
} catch (ParseException e) {
e.printStackTrace();
}
if (parsedObject instanceof JSONObject) {
JSONObject jsonObject = (JSONObject) parsedObject;
JSONObject jsonSubject = (JSONObject) jsonObject.get(SUBJECT);
String subjectType = jsonSubject.get(SUBJECT_TYPE).toString();
if (SEND.equals(subjectType)) {
JSONObject jsonBody = (JSONObject) jsonObject.get(BODY);
calculate(jsonBody);
}
}
}
:
void calculate(JSONObject jsonBody) {
String method = jsonBody.get(METHOD).toString();
if (EMAIL_METHOD.equals(method)) {
String recipients = jsonBody.get(RECIPIENTS).toString();
String title = jsonBody.get(TITLE).toString();
String message = jsonBody.get(MESSAGE).toString();
sendEmail(recipients, title, message);
}
}
:
void sendEmail(String recipients, String title, String message) {
tasksExecutorService.submit(() -> emailService.send(recipients, title, message));
}
.
.
«kafka-»:
static KafkaConsumer<String, String> createKafkaConsumerStringString(
String bootstrapServers,
String clientId,
String groupId
) {
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
properties.setProperty(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty(
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return new KafkaConsumer<>(properties);
}
:
interface EmailService {
void send(String recipients, String title, String message);
}
.
«kafka-».
«kafka-».
.
«kafka-». .
public class SenderServiceTest {
@Test
void consumeEmail() throws InterruptedException {
String brokerHost = "127.0.0.1";
int brokerPort = 29092;
String bootstrapServers = brokerHost + ":" + brokerPort;
String senderTopic = "sender_data";
try (KafkaServerService kafkaServerService = new KafkaServerService(brokerHost, brokerPort)) {
kafkaServerService.start();
kafkaServerService.createTopic(senderTopic);
}
}
}
. «kafka-». «kafka-» . .
«mock» :
SenderService.EmailService emailService = mock(SenderService.EmailService.class);
:
SenderService senderService = new SenderService(bootstrapServers, senderTopic, emailService);
senderService.start();
:
String recipients = "mrbrown@ml.ml;mrblack@ml.ml;mrwhite@ml.ml";
String title = "42";
String message = "73";
:
kafkaServerService.send(senderTopic, key(), createMessage(EMAIL_METHOD, recipients, title, message));
:
Thread.sleep(6000);
, :
verify(emailService).send(recipients, title, message);
:
senderService.stop();
:
public class SenderServiceTest {
@Test
void consumeEmail() throws InterruptedException {
String brokerHost = "127.0.0.1";
int brokerPort = 29092;
String bootstrapServers = brokerHost + ":" + brokerPort;
String senderTopic = "sender_data";
try (KafkaServerService kafkaServerService = new KafkaServerService(brokerHost, brokerPort)) {
kafkaServerService.start();
kafkaServerService.createTopic(senderTopic);
SenderService.EmailService emailService = mock(SenderService.EmailService.class);
SenderService senderService = new SenderService(bootstrapServers, senderTopic, emailService);
senderService.start();
String recipients = "mrbrown@ml.ml;mrblack@ml.ml;mrwhite@ml.ml";
String title = "42";
String message = "73";
kafkaServerService.send(senderTopic, key(), createMessage(EMAIL_METHOD, recipients, title, message));
Thread.sleep(6000);
verify(emailService).send(recipients, title, message);
senderService.stop();
}
}
}
:
public class SenderFactory {
public static final String SUBJECT = "subject";
public static final String SUBJECT_TYPE = "subject_type";
public static final String BODY = "body";
public static final String METHOD = "method";
public static final String EMAIL_METHOD = "email";
public static final String RECIPIENTS = "recipients";
public static final String TITLE = "title";
public static final String MESSAGE = "message";
public static final String SEND = "send";
public static String key() {
return UUID.randomUUID().toString();
}
public static String createMessage(String method, String recipients, String title, String message) {
Map<String, Object> map = new HashMap<>();
Map<String, Object> subject = new HashMap<>();
Map<String, Object> body = new HashMap<>();
map.put(SUBJECT, subject);
subject.put(SUBJECT_TYPE, SEND);
map.put(BODY, body);
body.put(METHOD, method);
body.put(RECIPIENTS, recipients);
body.put(TITLE, title);
body.put(MESSAGE, message);
return JSONObject.toJSONString(map);
}
}
«kafka-»
:
void start()
void close()
void createTopic(String topic)
«start» .
「zookeeper」を作成し、そのアドレスを保存します。
zkServer = new EmbeddedZookeeper();
String zkConnect = zkHost + ":" + zkServer.port();
動物園管理人クライアントを作成します。
zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$);
zkUtils = ZkUtils.apply(zkClient, false);
サーバーのプロパティの設定:
Properties brokerProps = new Properties();
brokerProps.setProperty("zookeeper.connect", zkConnect);
brokerProps.setProperty("broker.id", "0");
try {
brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString());
} catch (IOException e) {
throw new RuntimeException(e);
}
brokerProps.setProperty("listeners", "PLAINTEXT://" + brokerHost + ":" + brokerPort);
brokerProps.setProperty("offsets.topic.replication.factor", "1");
KafkaConfig config = new KafkaConfig(brokerProps);
サーバーの作成:
kafkaServer = TestUtils.createServer(config, new MockTime());
一緒:
public void start() {
zkServer = new EmbeddedZookeeper();
String zkConnect = zkHost + ":" + zkServer.port();
zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$);
zkUtils = ZkUtils.apply(zkClient, false);
Properties brokerProps = new Properties();
brokerProps.setProperty("zookeeper.connect", zkConnect);
brokerProps.setProperty("broker.id", "0");
try {
brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString());
} catch (IOException e) {
throw new RuntimeException(e);
}
brokerProps.setProperty("listeners", "PLAINTEXT://" + brokerHost + ":" + brokerPort);
brokerProps.setProperty("offsets.topic.replication.factor", "1");
KafkaConfig config = new KafkaConfig(brokerProps);
kafkaServer = TestUtils.createServer(config, new MockTime());
}
サービスの停止:
@Override
public void close() {
kafkaServer.shutdown();
zkClient.close();
zkServer.shutdown();
}
トピックの作成:
public void createTopic(String topic) {
AdminUtils.createTopic(
zkUtils, topic, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);
}
結論
結論として、ここに示されているコードは、選択された方法のみを示していることに注意してください。
「kafka」を使用してサービスを作成およびテストするには、次のリソースを参照できます
。kafka-streams-examples
リンクとリソース
ソース
「kafkaサーバー」でテストするためのコード