VividMoneyデータウェアハウスチームでKafkaStreamsをどのように使用しますか?

ねえ!私の名前はAndreySerebryanskyです。私は、データ運用チームのデータエンジニアです。私たちのチームは、スノーフレークリポジトリを埋めるだけでなく、残りのチームがリアルタイムのデータを持っていることを確認する責任があります。たとえば、トランザクションのフィード(これらは、顧客の購入、転送、受け取ったキャッシュバック)は、当社のデータに基づいて入力されます。





これらすべてのタスクには、Kafka、そして最も重要なのはKafkaStreamsを使用します。今日は、Kafka Streamsを使用できるタスクについて説明し、簡単な例のコードを示します。これは、Kafkaを使用しているが、KafkaStreamsをまだ試したことがない人に役立ちます。Kafkaトピックの処理中に状態を保持したい場合、または一部のトピックを他のトピックからの情報で強化するための単純な構文を探している場合は、今日、これを簡単かつ実用的に箱から出して行う方法を示します。





記事の概要

  1. KafkaStreamsについて少し





  2. なぜKafkaStreamsが必要なのですか





  3. ケースNo.1。ブランド情報でお客様の購入を充実させる





  4. ケース番号2。オリジネーションチームからストレージに顧客データを取得します





  5. これをすべて開始するにはどうすればよいですか?





  6. KafkaStreamsのスケーラビリティについて少し





  7. 結論





KafkaStreamsについて少し

Kafka Streams - Java. Kafka Java/Scala.





exactly once processing kafka transactions.





Kafka Streams , stateful (, ).





Kafka Streams?

: , - , , , , .





, - . , , . , , , , , .





: , . , , , .





ソースで問題が発生した場合に待機しながら、さまざまなソースからデータを順番にフェッチします。
, , -

, .





友達が多すぎる

, , , . . Kafka Streams. , ,





Kafka Streamsは、必要なデータを事前に取得します
Kafka Streams

, .





№1.

, . (brand_id) ( ).





トップブランド

.





承認トピック

.









builder.streams("authorization-events")
    .join(
        builder.globalTable("brands"), 
        auth -> auth.get("brand_id"), // ,      
        (brand, auth) -> auth.set("brandName", brand.get("name")) //  
    );

      
      



builder? . :





import org.apache.kafka.streams.StreamsBuilder;
...

StreamsBuilder builder = new StreamsBuilder();

      
      



, Kafka Streams id ( , ).





id ?

Kafka Streams , , - . builder.globalTable(topicName)



.





. , , . , . , , .





https://kafka.apache.org/0110/documentation/streams/developer-guide#streams_duality
https://kafka.apache.org/0110/documentation/streams/developer-guide#streams_duality

, Kafka Streams .





№2. Origination

Vivid Money, , . Origination - Vivid.





姓名に関する情報は、オリジネーションチームのデータベースに送られます
Origination

Kafka Connect open-source dynamodb JSON.





dynamodbからkafkaにデータを取得します
dynamodb

, . , , . Apache AVRO. .





Avro
{
  "type": "record",
  "name": "OriginationClient",
  "namespace": "datahub",
  "fields": [
    {
      "name": "firstName",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "lastName",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    ...
  ]
}

      
      



, :





Schema schema = new Schema.Parser().parse(new File("path/to/schema.avsc"));
AvroConverter avroConverter = new AvroConverter(schema);

builder.stream("origination-json-topic")
    .mapValues(val -> avroConverter.convert(val))
    .to("origination-avro-topic");

      
      



AvroConverter - , . open source https://github.com/allegro/json-avro-converter . .





, . , , , . (diff) . , .





, . . . , Kafka Streams . , , .





:





import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde;
...

var changes = builder.stream(sourceTopic);
var stateStoreSupplier = Stores.keyValueStoreBuilder(
    Stores.persistentKeyValueStore**("state-store"**), //                
  Serdes.Bytes(), //       
    new GenericAvroSerde() //       
);
builder.addStateStore(stateStoreSupplier);
changes.transform(() -> new ChangeTransformer(), "state-store") //  ,  
    .to(outputTopic);

      
      



ChangeTransformer :





public class ChangeTransformer implements Transformer {
  private KeyValueStore<Bytes, GenericRecord> stateStore;

  @Override
  public void init(ProcessorContext processorContext) {
     this.stateStore = processorContext.getStateStore("state-store");
  }
  @Override
  public KeyValue<String, GenericRecord> transform(String recordKey, GenericRecord record) {
    GenericRecord prevState = stateStore.get(recordKey);
    return extractDiff(prevState, record);
  }
  ...
}

      
      



?

StreamsBuilder builder = new StreamsBuilder();builder.stream("my-input-topic")
        .filter(...)
        .map(...)
        .to("my-output-topic");
KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties);
kafkaStreams.start(); // 
...
kafkaStreams.stop();

      
      



Kafka Streams

Kafka Streams . . 16 , 16 . , .





, state-store ( ChangeTransformer-), , ! .





: https://docs.confluent.io/platform/current/streams/architecture.html#parallelism-model





Kafka Streams :





  • stateful (join, get previous state). , .





  • . map, filter, join DSL. , transform()



    . ChangeTransformer-, .





  • . . .





P.S. ) , !








All Articles