5 Things Every Apache Kafka Developer Should Know







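Apache Kafka is an event streaming platform used by more than 30% of the Fortune 500. Kafka has many features that make it the de facto standard for event streaming, and this post covers the five things that, in my view, every Kafka developer should know.

Some of the items are about performance, others about the key architectural concepts behind Kafka. By the end you should have a deeper understanding of how Kafka works, and a new trick or two.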







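1. Understand message delivery and durability guarantees

For data durability, the KafkaProducer has the acks configuration setting. The acks setting specifies how many acknowledgments the producer must receive before it considers a record delivered to the broker. The options are: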







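  • none (acks=0): the producer considers a record delivered as soon as it has sent it to the broker. This is "fire and forget."
  • one (acks=1): the producer waits for the lead broker to acknowledge that it has written the record to its log.
  • all (acks=all): the producer waits for acknowledgment from the lead broker and from the follower brokers that they have written the record to their logs.

There is a trade-off here, and it is there by design: you can opt for higher throughput with a chance of data loss, or for very high durability at the cost of lower throughput.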







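Consider the acks=all scenario. If you produce records with acks set to all to a cluster of three Kafka brokers, then under ideal conditions Kafka holds three replicas of your data: one on the lead broker and one on each of the two followers. When the logs of these replicas all have the same record offsets, they are considered in sync. In other words, in-sync replicas have the same content for a given topic partition.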







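There is a subtlety to acks=all, however: it does not specify how many replicas must be in sync. The lead broker is always in sync with itself. But the two followers may be unable to keep up, for example because of a network partition or heavy record load. In that case a successful send may have been acknowledged by only one broker. If both followers are out of sync, the producer still receives the required acknowledgments, but they come from the leader alone, which holds the only "in-sync" copy of the record.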







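By setting acks=all you put a premium on the durability of your data. So if the replicas are not keeping up, it stands to reason that new records should raise an exception until the replicas catch up.

Having a single in-sync replica satisfies the letter of the law but not its spirit. What acks=all needs is a guarantee that a successful send involves at least a majority of the available in-sync brokers.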







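There is such a configuration: min.insync.replicas. It sets the minimum number of replicas that must be in sync for a write to proceed. Note that min.insync.replicas is set at the broker or topic level; it is not a producer setting. The default value of min.insync.replicas is 1. To avoid the scenario above in a three-broker cluster, raise it to 2.

Back to the previous example: with min.insync.replicas=2, a send is rejected while only the leader is in sync, so the producer can no longer succeed on the leader's acknowledgment alone.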







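If the number of in-sync replicas is below the configured minimum, the lead broker does not attempt to append the record to its log. It throws either a NotEnoughReplicasException or a NotEnoughReplicasAfterAppendException, which forces the producer to retry the write. Out-of-sync replicas are a retriable error, so the producer keeps retrying until the configured delivery timeout expires.

Configuring min.insync.replicas and the producer's acks to work together in this way increases the durability of your data.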
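
As a minimal sketch of the producer side of this pairing, the settings can be assembled as plain properties. The class name, method name, and broker address are illustrative assumptions; min.insync.replicas is absent because it belongs to the broker or topic configuration, not the producer.

```java
import java.util.Properties;

public class DurableProducerConfigSketch {

    // Builds producer settings that favor durability over throughput.
    // Nothing here connects to a broker; the result would be passed to a KafkaProducer.
    static Properties durableProducerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Wait for the leader and all in-sync followers to acknowledge each record.
        props.setProperty("acks", "all");
        // Upper bound on the time a send may take, retries included.
        // 120000 ms is the documented default, spelled out here for illustration.
        props.setProperty("delivery.timeout.ms", "120000");
        return props;
    }

    public static void main(String[] args) {
        // "localhost:9092" is a placeholder address.
        Properties props = durableProducerProps("localhost:9092");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

With these settings, a NotEnoughReplicasException on the broker shows up to the application only if retries are still failing when delivery.timeout.ms runs out.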







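The next items on the list are improvements to the Kafka clients. The Kafka producer and consumer APIs have gained new features that every Kafka developer should know about.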







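2. Learn about the sticky partitioner in the producer API

Kafka uses partitions to increase throughput and to spread the message load across all brokers in a cluster. Kafka records are key-value pairs, and the key may be null. Kafka producers do not send records immediately. They place them into per-partition batches that are sent later, which makes better use of the network. The partitioner decides which partition a record goes to in one of three ways.

The partition can be set explicitly on the ProducerRecord through the overloaded ProducerRecord constructor. In that case the producer always uses that partition.

If no partition is set but the ProducerRecord has a key, the producer takes the hash of the key modulo the number of partitions. The result is the partition the producer uses.

If the ProducerRecord has neither a key nor a partition, Kafka used to assign records to partitions in a round-robin fashion. The first record went to partition 0, the second to partition 1, and so on to the last partition. The producer then started over at partition 0 and repeated the cycle for the remaining records.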
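
The three cases above can be sketched as one small decision function. This is an illustration only, not Kafka's partitioner: the class and method names are hypothetical, and Arrays.hashCode stands in for the murmur2 hash that the real default partitioner applies to the serialized key.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class PartitionChoiceSketch {

    // Counter used only for records that carry neither a partition nor a key.
    private int roundRobinCounter = 0;

    // explicitPartition and key may each be null.
    int choosePartition(Integer explicitPartition, String key, int numPartitions) {
        if (explicitPartition != null) {
            // Case 1: the record names its partition, so use it as is.
            return explicitPartition;
        }
        if (key != null) {
            // Case 2: hash of the key modulo the number of partitions.
            // Stand-in hash (assumption); Kafka itself uses murmur2 here.
            int hash = Arrays.hashCode(key.getBytes(StandardCharsets.UTF_8));
            return Math.floorMod(hash, numPartitions);
        }
        // Case 3: the older round-robin behavior for records with no key.
        return Math.floorMod(roundRobinCounter++, numPartitions);
    }

    public static void main(String[] args) {
        PartitionChoiceSketch partitioner = new PartitionChoiceSketch();
        System.out.println(partitioner.choosePartition(2, "ignored", 3));    // explicit partition wins
        System.out.println(partitioner.choosePartition(null, "order-42", 3)); // stable for a given key
        System.out.println(partitioner.choosePartition(null, null, 3));       // first round-robin slot
    }
}
```

The property that matters in case 2 is that the same key always maps to the same partition, which is what preserves per-key ordering.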







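The round-robin approach distributes records evenly across partitions, but it has one drawback. Because of this "fair" distribution, the producer can end up sending many sparsely filled batches. It is more efficient to send fewer batches with more records in each: fewer batches mean less queuing of produce requests and therefore less load on the brokers.

Consider a simplified example: a topic with three partitions, numbered 0 to 2, and a producer that sends nine records with no key, all arriving at the same time. With round robin, the nine records are split into three batches of three records, one batch per partition.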







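It would be better to send a single batch of nine records. As noted above, fewer batches mean less network traffic and less load on the brokers.

Apache Kafka 2.4.0 added the sticky partitioner, which makes this possible. Instead of applying round robin to each record, the sticky partitioner assigns records to the same partition until the batch is sent, and then moves to another partition for the next batch. In the example above, the sticky partitioner sends all nine records as one batch to partition 0. The next batch goes to partition 1, the one after that to partition 2, and so on.

Staying on one partition until a batch is full or otherwise complete means fewer produce requests, which reduces both the load on the request queue and the latency of the system. Note that the sticky partitioner still distributes records evenly. The even distribution emerges over time, as batches are sent to each partition in turn. You can think of it as round robin per batch rather than per record.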
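
A toy model makes the difference in batch counts concrete. It is a sketch under simplifying assumptions: a batch is treated as complete after a fixed number of records (real batches are bounded by size in bytes and by linger time), and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class StickyVsRoundRobinSketch {

    // Round robin: record i goes to partition i % numPartitions, and every
    // partition that received records has to send at least one batch.
    static int roundRobinBatches(int records, int numPartitions, int batchSize) {
        int[] perPartition = new int[numPartitions];
        for (int i = 0; i < records; i++) {
            perPartition[i % numPartitions]++;
        }
        int batches = 0;
        for (int count : perPartition) {
            batches += (count + batchSize - 1) / batchSize; // ceiling division
        }
        return batches;
    }

    // Sticky: stay on one partition until the batch is full, then move on.
    // Returns the target partition of each batch, in send order.
    static List<Integer> stickyBatchPartitions(int records, int numPartitions, int batchSize) {
        List<Integer> batchPartitions = new ArrayList<>();
        int partition = 0;
        int inCurrentBatch = 0;
        for (int i = 0; i < records; i++) {
            if (inCurrentBatch == 0) {
                batchPartitions.add(partition); // a new batch starts on this partition
            }
            inCurrentBatch++;
            if (inCurrentBatch == batchSize) {  // batch full: switch partition
                inCurrentBatch = 0;
                partition = (partition + 1) % numPartitions;
            }
        }
        return batchPartitions;
    }

    public static void main(String[] args) {
        // Nine keyless records, three partitions, room for nine records per batch.
        System.out.println(roundRobinBatches(9, 3, 9));            // prints 3
        System.out.println(stickyBatchPartitions(9, 3, 9).size()); // prints 1
        System.out.println(stickyBatchPartitions(27, 3, 9));       // prints [0, 1, 2]
    }
}
```

The last line shows the "even over time" property: once enough records have arrived, each partition has received one full batch.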







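To learn more, see the write-up on Apache Kafka producer improvements with the sticky partitioner and the KIP-480 design document.

Now on to the consumer changes.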







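3. Avoid stop-the-world consumer group rebalances by using cooperative rebalancing

Kafka is a distributed system, and one of the things a distributed system must do is deal with failures and disruptions: not just anticipate them, but embrace them. A good example of how Kafka handles expected disruption is the consumer group protocol, which manages multiple instances of a consumer for a single logical application. If a consumer instance stops, by design or otherwise, Kafka rebalances and makes sure another instance takes over its work.

Kafka 2.4 introduced a new rebalance protocol: cooperative rebalancing. Before looking at it, a short review of consumer group basics.

Suppose you have a distributed application with several consumers subscribed to a topic. Any set of consumers configured with the same group.id forms one logical consumer, called a consumer group. Each consumer in the group is responsible for consuming from one or more partitions of the subscribed topics. The leader of the consumer group assigns these partitions.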













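Under optimal conditions, with three consumers and six partitions, each consumer processes records from two partitions. But what happens if one of the applications fails or loses its network connection? Does processing of its topic partitions stop until the application is restored? Fortunately not, thanks to the consumer rebalance protocol.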













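Suppose consumer 2 fails for some reason and either misses a poll or triggers a session timeout. The group coordinator removes it from the group and triggers a rebalance. A rebalance is the mechanism that tries to distribute (balance) the workload evenly across all available members of a consumer group. Because consumer 2 has left the group, the rebalance assigns the partitions it owned to the remaining active members. Losing a consumer application for a given group ID therefore does not stop processing of those topic partitions.

The default rebalance approach has a drawback, however. Every consumer gives up its entire assignment of topic partitions, and no processing takes place until the partitions are reassigned. This is sometimes called a stop-the-world rebalance. To make matters worse, depending on the ConsumerPartitionAssignor in use, consumers may simply be given back the same topic partitions they owned before the rebalance, so there was no real need to pause work on them.

This implementation of the rebalance protocol is called eager rebalancing because it puts first the guarantee that no two consumers in the same group claim the same topic partition. Two consumers in one group owning the same topic partition would lead to undefined behavior.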







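Keeping two consumers from owning the same topic partition is critical, but there is a better approach that provides this safety without giving up processing time: incremental cooperative rebalancing. It was first introduced in Kafka Connect in Apache Kafka 2.3 and is now implemented for the consumer group protocol as well. With the cooperative approach, consumers do not automatically give up all their topic partitions at the start of a rebalance. Instead, every member encodes its current assignment and sends it to the group leader. The leader then determines which partitions need to change owner, rather than producing an entirely new assignment from scratch.

A second rebalance then takes place, but it involves only the topic partitions that need to change owner. These may be partitions that are revoked because they are no longer assigned, or newly added partitions. For topic partitions that appear in both the old and the new assignment, nothing changes, so processing continues on the partitions that are not moving.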







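Eliminating the stop-the-world phase and pausing only the topic partitions involved makes rebalances less costly and shortens the total time to complete them. Even long rebalances hurt less, because processing continues throughout. This improvement comes from the CooperativeStickyAssignor. The CooperativeStickyAssignor trades a second rebalance for a faster return to normal operation.

To enable the new rebalance protocol, set partition.assignment.strategy to the CooperativeStickyAssignor. The change is entirely on the client side. To take advantage of the new protocol, you only need to update your client version. For Kafka Streams users the news is even better: Kafka Streams enables the cooperative rebalance protocol by default, so there is nothing more to do.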
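
For a plain consumer, the setting described above might be assembled as follows. This is a sketch: the class name, method name, broker address, and group ID are placeholders, and the code only builds the properties that would be handed to a KafkaConsumer.

```java
import java.util.Properties;

public class CooperativeConsumerConfigSketch {

    // Builds consumer settings that opt in to cooperative rebalancing.
    static Properties cooperativeConsumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // All consumers sharing this group.id form one consumer group.
        props.setProperty("group.id", groupId);
        // Fully qualified class name of the cooperative assignor.
        props.setProperty("partition.assignment.strategy",
                "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
        return props;
    }

    public static void main(String[] args) {
        // Both arguments are placeholder values.
        Properties props = cooperativeConsumerProps("localhost:9092", "example-group");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

When an existing group is switched from an eager assignor, the Kafka upgrade notes describe a rolling procedure; check them before changing this on a live group.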







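4. Master the command-line tools

The Apache Kafka binary distribution ships with several tools in the bin directory. Of these, four are likely to have the most impact on day-to-day work: console-consumer, console-producer, dump-log, and delete-records.

Kafka console producer

The console-producer lets you produce records to a topic directly from the command line. It is a quick way to test a new consumer application when nothing is producing data to the topic yet. To start the console-producer: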







kafka-console-producer --topic <topic-name> \
                       --broker-list <broker-host:port>





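After the command starts, you get an empty prompt. Type some text and press Enter to produce a message.

Used this way, the console-producer sends only values, no keys. To send keys as well, add the following flags: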







kafka-console-producer --topic <topic-name> \
                       --broker-list <broker-host:port> \
                       --property parse.key=true \
                       --property key.separator=":"





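The value of key.separator is arbitrary. Any character will do. With these flags you can send full key-value pairs from the command line. If you use Confluent Schema Registry, there are also CLI producers for sending records in Avro, Protobuf, and JSON Schema formats.

Now the other side of the coin: consuming records from the command line.

Kafka console consumer

The console-consumer lets you consume records from a Kafka topic directly from the command line. Being able to start a consumer quickly is invaluable for prototyping and debugging. Suppose you are building a new microservice. To confirm quickly that your producer application is sending messages, run: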







kafka-console-consumer --topic <topic-name> \
                       --bootstrap-server <broker-host:port>





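Records start scrolling across the screen (as long as data is currently being produced to the topic). To see all records from the start of the topic, add the --from-beginning flag.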







kafka-console-consumer --topic <topic-name> \
                          --bootstrap-server <broker-host:port> \
                          --from-beginning
      
      





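If you use Schema Registry, there are CLI consumers for Avro, Protobuf, and JSON Schema records. The Schema Registry consumers are meant for records in Avro, Protobuf, or JSON format, while the plain console-consumer works with records of primitive Java types: String, Long, Double, Integer, and so on. By default it expects both keys and values to be of type String.

If the keys or values are not strings, pass deserializers with the --key-deserializer and --value-deserializer flags, giving the fully qualified class name (FQN) of each deserializer.

By default the console-consumer prints only the value of each message. To see the keys as well, add the following flags: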







kafka-console-consumer --topic <topic-name> \
                       --bootstrap-server <broker-host:port> \
                       --property print.key=true \
                       --property key.separator=":"





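As with the producer, the key separator is arbitrary, so use any character you like.

Dump log

When working with Kafka you sometimes need to inspect the underlying log of a topic by hand, whether out of curiosity about Kafka internals or to debug a problem and verify the stored content. The kafka-dump-log command does this. Here is a command that shows the log of a topic named example: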







kafka-dump-log \
  --print-data-log \
  --files ./var/lib/kafka/data/example-0/00000000000000000000.log





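  • --print-data-log prints the record data stored in the log.
  • --files is required. It takes a log file path, or a comma-separated list of files.

For the full list of options and what each one does, run kafka-dump-log with the --help flag.

Running the command above produces output like this: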







Dumping ./var/lib/kafka/data/example-0/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 0 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1599775774460 size: 81 magic: 2 compresscodec: NONE crc: 3162584294 isvalid: true
| offset: 0 CreateTime: 1599775774460 keysize: 3 valuesize: 10 sequence: -1 headerKeys: [] key: 887 payload: -2.1510235
baseOffset: 1 lastOffset: 9 count: 9 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 81 CreateTime: 1599775774468 size: 252 magic: 2 compresscodec: NONE crc: 2796351311 isvalid: true
| offset: 1 CreateTime: 1599775774463 keysize: 1 valuesize: 9 sequence: -1 headerKeys: [] key: 5 payload: 33.440664
| offset: 2 CreateTime: 1599775774463 keysize: 8 valuesize: 9 sequence: -1 headerKeys: [] key: 60024247 payload: 9.1408728
| offset: 3 CreateTime: 1599775774463 keysize: 1 valuesize: 9 sequence: -1 headerKeys: [] key: 1 payload: 45.348946
| offset: 4 CreateTime: 1599775774464 keysize: 6 valuesize: 10 sequence: -1 headerKeys: [] key: 241795 payload: -63.786373
| offset: 5 CreateTime: 1599775774465 keysize: 8 valuesize: 9 sequence: -1 headerKeys: [] key: 53596698 payload: 69.431393
| offset: 6 CreateTime: 1599775774465 keysize: 8 valuesize: 9 sequence: -1 headerKeys: [] key: 33219463 payload: 88.307875
| offset: 7 CreateTime: 1599775774466 keysize: 1 valuesize: 9 sequence: -1 headerKeys: [] key: 0 payload: 39.940350
| offset: 8 CreateTime: 1599775774467 keysize: 5 valuesize: 9 sequence: -1 headerKeys: [] key: 78496 payload: 74.180098
| offset: 9 CreateTime: 1599775774468 keysize: 8 valuesize: 9 sequence: -1 headerKeys: [] key: 89866187 payload: 79.459314
      
      





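The dump-log output contains a lot of information. For every record you can see the key, the payload (value), the offset, and the timestamp. Keep in mind that this is a demo topic with only 10 messages. A real topic will produce far more output. Also note that the keys and values in this example are strings. To run dump-log with other key or value types, use the --key-decoder-class and --value-decoder-class flags.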







Delete records







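Kafka stores the records of a topic on disk and keeps them even after consumers have read them. The records are not kept in one big file. They are split into segments per partition, and offsets run continuously across the segments of a topic partition. Because servers do not have unlimited storage, Kafka provides settings that control how much data is retained, by time and by size:

  • log.retention.hours controls retention by time. The default is 168 hours (one week).
  • log.retention.bytes controls how large the log can grow before segments become eligible for deletion.

The default value of log.retention.bytes is -1, which means the size is unlimited. If you configure only the retention time and not the size, you can run out of disk space. You should never go into the filesystem and delete the files by hand. What you want is a controlled, supported way to remove records from a topic and free up space, and Kafka ships with a tool for this.

kafka-delete-records has two required parameters: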







  • --bootstrap-server: the broker or brokers to connect to;
  • --offset-json-file: a JSON file with the deletion settings.

An example JSON file:







{
  "partitions": [
    {"topic": "example", "partition": 0, "offset": -1}
  ],
  "version": 1
}





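The JSON format is simple: an array of JSON objects. Each object has three properties:

  • topic: the topic to delete from;
  • partition: the partition to delete from;
  • offset: the offset to delete up to; records with lower offsets are removed.

This example reuses the topic from the dump-log section, so the JSON file is very simple. With more partitions or topics, you would add more entries to the JSON file.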
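
When a topic has many partitions, writing the offsets file by hand gets tedious. The following is a minimal sketch that generates it for one topic from a map of partition to offset. The class and method names are hypothetical, and it assumes a legal Kafka topic name, which needs no JSON escaping.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;
import java.util.StringJoiner;

public class DeleteRecordsJsonSketch {

    // Renders the content of an offsets file for kafka-delete-records.
    // An offset of -1 means "up to the high watermark", as in the example above.
    static String offsetsJson(String topic, Map<Integer, Long> offsetByPartition) {
        StringJoiner entries = new StringJoiner(
                ",\n",
                "{\n  \"partitions\": [\n",
                "\n  ],\n  \"version\": 1\n}");
        for (Map.Entry<Integer, Long> entry : offsetByPartition.entrySet()) {
            entries.add(String.format(Locale.ROOT,
                    "    {\"topic\": \"%s\", \"partition\": %d, \"offset\": %d}",
                    topic, entry.getKey(), entry.getValue()));
        }
        return entries.toString();
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the partitions in insertion order.
        Map<Integer, Long> offsets = new LinkedHashMap<>();
        offsets.put(0, -1L); // delete everything in partition 0
        offsets.put(1, 42L); // delete records below offset 42 in partition 1
        System.out.println(offsetsJson("example", offsets));
    }
}
```

The printed text can be saved to a file and passed to --offset-json-file.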







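How do you choose the offset in the JSON file? The example topic holds only 10 records, so the offset is easy to work out. In practice you will most likely not know offhand which offset to use. Keep in mind too that an offset is not a message number, so you cannot simply delete "from message 42." If you specify -1, the offset of the high watermark is used, which means all data currently in the topic is deleted. The high watermark is the highest offset available for consumption (the offset of the last successfully replicated message, plus one).

To run the command: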







kafka-delete-records --bootstrap-server <broker-host:port> \
                     --offset-json-file offsets.json
      
      





After running it, you should see something like this on the console:







Executing records delete operation
Records delete operation completed:
partition: example-0  low_watermark: 10
      
      





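The output shows that Kafka deleted all records from topic partition example-0. The low_watermark value of 10 is the lowest offset now available to consumers. Because the example topic held only 10 records, with offsets 0 to 9, no consumer can read those records again. For background on how deletion is implemented, see KIP-107 and KIP-204.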







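5. Use record headers

Apache Kafka 0.11 introduced record headers. Headers let you attach metadata to a Kafka record without adding anything to the record's key-value pair itself. Suppose you want to embed some information in a message, such as an identifier of the system the data came from. You might need it for lineage and audit purposes, or to route the data downstream.

Why not just append this information to the key, then extract the part you need and route accordingly? Because adding artificial data to the key creates two potential problems.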







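  1. First, if you use a compacted topic, extra information in the key makes records look unique when they are not, so compaction does not work as intended.
  2. Second, consider what happens if one system identifier dominates the records being sent. You then get significant key skew. Depending on how you consume from the partitions, the uneven key distribution can increase processing latency.

These are two situations in which headers are a better fit. The original KIP that proposed headers lists several more:

  • Automated routing of messages between clusters based on header information.
  • Enterprise APM tools (for example, Appdynamics or Dynatrace) need to stitch in "magic" transaction IDs to provide end-to-end transaction flow monitoring.
  • Audit metadata is recorded with the message, for example the client ID that produced the record.
  • The business payload must be encrypted end to end and signed against tampering, while ecosystem components still need access to metadata to do their work.

With the case for headers made, here is how to add them to Kafka records.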







Adding headers to Kafka records

The following Java code adds headers to a ProducerRecord:







ProducerRecord<String, String> producerRecord = new ProducerRecord<>("bizops", "value"); 

producerRecord.headers().add("client-id", "2334".getBytes(StandardCharsets.UTF_8)); 
producerRecord.headers().add("data-file", "incoming-data.txt".getBytes(StandardCharsets.UTF_8)); 

// Details left out for clarity
producer.send(producerRecord);
      
      





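  • Create an instance of ProducerRecord.
  • Call ProducerRecord.headers() and add the key and value of the header.
  • Add another header.

A few points about this code. A header has a String key and a byte array value. You can add as many headers with the same key as you need. A duplicate key does not overwrite earlier entries with that key.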
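
Because header values are raw bytes, producer and consumer have to agree on how each value is encoded. The helpers below are a sketch of one possible convention, UTF-8 for text and 8-byte big-endian for long values; the class and method names are hypothetical and not part of the Kafka API.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class HeaderValueCodecSketch {

    // Text values: always name the charset, never rely on the platform default.
    static byte[] encodeString(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    static String decodeString(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // Numeric values: a fixed 8-byte big-endian layout (ByteBuffer's default order).
    static byte[] encodeLong(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    static long decodeLong(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong();
    }

    public static void main(String[] args) {
        byte[] clientId = encodeString("2334");
        byte[] createdAt = encodeLong(1599775774460L);
        System.out.println(decodeString(clientId)); // prints 2334
        System.out.println(decodeLong(createdAt));  // prints 1599775774460
    }
}
```

The encoded arrays are what would be passed as the second argument of headers().add(...), as in the producer code above.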







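ProducerRecord also has overloaded constructors that accept an Iterable<Header>. You could write your own class that implements the Header interface and pass in a collection that implements Iterable. In practice, the simple approach shown here is usually enough.

Now that you know how to add headers, here is how to read them on the consumer side.

Retrieving headers

This is how to access headers when consuming records: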







// Details left out for clarity

ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));

for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
    for (Header header : consumerRecord.headers()) {
        // Header values are raw bytes; decode them with the charset used by the producer
        System.out.println("header key " + header.key()
                + ", header value " + new String(header.value(), StandardCharsets.UTF_8));
    }
}





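  • Iterate over the ConsumerRecords.
  • For each ConsumerRecord, iterate over its headers.
  • Process each header.

As the code shows, ConsumerRecord.headers() returns the headers of a record. The example just prints them to the console for demonstration. Once you have the headers, you can process them however you need. For reading headers from the command line, KIP-431 adds optional header printing to the ConsoleConsumer, available in Apache Kafka 2.7.0.

You can also view headers from the command line with kafkacat. An example command: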







kafkacat -b kafka-broker:9092 -t my_topic_name -C \
  -f '\nKey (%K bytes): %k
  Value (%S bytes): %s
  Timestamp: %T
  Partition: %p
  Offset: %o
  Headers: %h\n'
      
      




