ねえ!私の名前はAndreySerebryanskyです。私は、データ運用チームのデータエンジニアです。私たちのチームは、スノーフレークリポジトリを埋めるだけでなく、残りのチームがリアルタイムのデータを持っていることを確認する責任があります。たとえば、トランザクションのフィード(これらは、顧客の購入、転送、受け取ったキャッシュバック)は、当社のデータに基づいて入力されます。
これらすべてのタスクには、Kafka、そして最も重要なのはKafkaStreamsを使用します。今日は、Kafka Streamsを使用できるタスクについて説明し、簡単な例のコードを示します。これは、Kafkaを使用しているが、KafkaStreamsをまだ試したことがない人に役立ちます。Kafkaトピックの処理中に状態を保持したい場合、または一部のトピックを他のトピックからの情報で強化するための単純な構文を探している場合は、今日、これを簡単かつ実用的に箱から出して行う方法を示します。
記事の概要
KafkaStreamsについて少し
Kafka Streams - Java. Kafka Java/Scala.
exactly once processing kafka transactions.
Kafka Streams , stateful (, ).
Kafka Streams?
: , - , , , , .
, - . , , . , , , , , .
: , . , , , .
, .
, , , . . Kafka Streams. , ,
, .
№1.
, . (brand_id) ( ).
.
.
builder.streams("authorization-events")
.join(
builder.globalTable("brands"),
auth -> auth.get("brand_id"), // ,
(brand, auth) -> auth.set("brandName", brand.get("name")) //
);
builder? . :
import org.apache.kafka.streams.StreamsBuilder;
...
StreamsBuilder builder = new StreamsBuilder();
, Kafka Streams id ( , ).
id ?
Kafka Streams , , - . builder.globalTable(topicName)
.
. , , . , . , , .
, Kafka Streams .
№2. Origination
Vivid Money, , . Origination - Vivid.
Kafka Connect open-source dynamodb JSON.
, . , , . Apache AVRO. .
Avro
{
"type": "record",
"name": "OriginationClient",
"namespace": "datahub",
"fields": [
{
"name": "firstName",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "lastName",
"type": [
"null",
"string"
],
"default": null
},
...
]
}
, :
Schema schema = new Schema.Parser().parse(new File("path/to/schema.avsc"));
AvroConverter avroConverter = new AvroConverter(schema);
builder.stream("origination-json-topic")
.mapValues(val -> avroConverter.convert(val))
.to("origination-avro-topic");
AvroConverter - , . open source https://github.com/allegro/json-avro-converter . .
, . , , , . (diff) . , .
, . . . , Kafka Streams . , , .
:
import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde;
...
var changes = builder.stream(sourceTopic);
var stateStoreSupplier = Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore**("state-store"**), //
Serdes.Bytes(), //
new GenericAvroSerde() //
);
builder.addStateStore(stateStoreSupplier);
changes.transform(() -> new ChangeTransformer(), "state-store") // ,
.to(outputTopic);
ChangeTransformer :
public class ChangeTransformer implements Transformer {
private KeyValueStore<Bytes, GenericRecord> stateStore;
@Override
public void init(ProcessorContext processorContext) {
this.stateStore = processorContext.getStateStore("state-store");
}
@Override
public KeyValue<String, GenericRecord> transform(String recordKey, GenericRecord record) {
GenericRecord prevState = stateStore.get(recordKey);
return extractDiff(prevState, record);
}
...
}
?
StreamsBuilder builder = new StreamsBuilder();builder.stream("my-input-topic")
.filter(...)
.map(...)
.to("my-output-topic");
KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties);
kafkaStreams.start(); //
...
kafkaStreams.stop();
Kafka Streams
Kafka Streams . . 16 , 16 . , .
, state-store ( ChangeTransformer-), , ! .
: https://docs.confluent.io/platform/current/streams/architecture.html#parallelism-model
Kafka Streams :
stateful (join, get previous state). , .
. map, filter, join DSL. ,
transform()
. ChangeTransformer-, .
. . .
P.S. ) , !