Java, Cara mendapatkan jumlah pesan dalam sebuah topik di apache kafka

93

Saya menggunakan apache kafka untuk perpesanan. Saya telah menerapkan produsen dan konsumen di Jawa. Bagaimana kita bisa mendapatkan jumlah pesan dalam suatu topik?

Chetan
sumber

Jawaban:

34

Satu-satunya cara yang terlintas dalam pikiran untuk hal ini dari sudut pandang konsumen adalah dengan benar-benar mengonsumsi pesan dan menghitungnya kemudian.

Pialang Kafka mengekspos counter JMX untuk jumlah pesan yang diterima sejak start-up tetapi Anda tidak dapat mengetahui berapa banyak dari mereka yang telah dibersihkan.

Dalam skenario yang paling umum, pesan di Kafka paling baik dilihat sebagai aliran tak terbatas dan mendapatkan nilai diskrit dari berapa banyak yang saat ini disimpan di disk tidak relevan. Lebih jauh lagi, hal-hal menjadi lebih rumit ketika berhadapan dengan sekelompok broker yang semuanya memiliki subset pesan dalam suatu topik.

Lundahl
sumber
Lihat jawaban saya stackoverflow.com/a/47313863/2017567 . Klien Java Kafka memungkinkan untuk mendapatkan informasi itu.
Christophe Quintard
97

Ini bukan java, tapi semoga bermanfaat

./bin/kafka-run-class.sh kafka.tools.GetOffsetShell 
  --broker-list <broker>:  <port> 
  --topic <topic-name> --time -1 --offsets 1 
  | awk -F  ":" '{sum += $3} END {print sum}'
ssemichev.dll
sumber
12
Bukankah seharusnya ini merupakan perbedaan antara offset paling awal dan terbaru per jumlah partisi? bash-4.3# $KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 10.35.25.95:32774 --topic test-topic --time -1 | awk -F ":" '{sum += $3} END {print sum}' 13818663 bash-4.3# $KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 10.35.25.95:32774 --topic test-topic --time -2 | awk -F ":" '{sum += $3} END {print sum}' 12434609 Dan kemudian perbedaannya mengembalikan pesan tertunda yang sebenarnya dalam topik? Apakah saya benar?
Kisna
1
Ya itu benar. Anda harus menghitung perbedaan jika offset paling awal tidak sama dengan nol.
ssemichev
Itulah yang saya pikir :).
Kisna
1
Apakah ada cara untuk menggunakannya sebagai API dan di dalam kode (JAVA, Scala atau Python)?
salvob
Ini adalah campuran kode saya dan kode dari Kafka. Semoga bermanfaat. Saya menggunakannya untuk streaming Spark - Integrasi Kafka KafkaClient gist.github.com/ssemichev/c2d94dce7ad65339c9637e1b461f86cf KafkaCluster gist.github.com/ssemichev/fa3605c7b10cb6c7b9c8 Aprab54ffbc5865
ssemich
17

Saya sebenarnya menggunakan ini untuk membandingkan POC saya. Item yang ingin Anda gunakan ConsumerOffsetChecker. Anda dapat menjalankannya menggunakan skrip bash seperti di bawah ini.

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker  --topic test --zookeeper localhost:2181 --group testgroup

Dan di bawah ini adalah hasilnya: masukkan deskripsi gambar di sini Seperti yang Anda lihat di kotak merah, 999 adalah jumlah pesan yang saat ini ada dalam topik.

Pembaruan: ConsumerOffsetChecker tidak digunakan lagi sejak 0.10.0, Anda mungkin ingin mulai menggunakan ConsumerGroupCommand.

Rudy
sumber
1
Harap dicatat bahwa ConsumerOffsetChecker tidak digunakan lagi dan akan dihapus dalam rilis setelah 0.9.0. Gunakan ConsumerGroupCommand sebagai gantinya. (kafka.tools.ConsumerOffsetChecker $)
Szymon Sadło
1
Ya, itulah yang saya katakan.
Rudy
Kalimat terakhir Anda tidak akurat. Perintah di atas masih berfungsi di 0.10.0.1 dan peringatannya sama dengan komentar saya sebelumnya.
Szymon Sadło
14

Terkadang yang menjadi perhatian adalah mengetahui jumlah pesan di setiap partisi, misalnya, saat menguji pemartisi kustom. Langkah-langkah selanjutnya telah diuji untuk bekerja dengan Kafka 0.10.2.1-2 dari Confluent 3.2. Diberikan topik Kafka, ktdan baris perintah berikut:

$ kafka-run-class kafka.tools.GetOffsetShell \
  --broker-list host01:9092,host02:9092,host02:9092 --topic kt

Itu mencetak keluaran sampel yang menunjukkan jumlah pesan di tiga partisi:

kt:2:6138
kt:1:6123
kt:0:6137

Jumlah baris bisa lebih banyak atau lebih sedikit tergantung pada jumlah partisi untuk topik tersebut.

pdp
sumber
4
Jika pemadatan log diaktifkan, menjumlahkan offset partisi mungkin tidak memberikan jumlah pesan yang tepat dalam topik tersebut.
14

Karena ConsumerOffsetCheckertidak lagi didukung, Anda dapat menggunakan perintah ini untuk memeriksa semua pesan dalam topik:

bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand \
    --group my-group \
    --bootstrap-server localhost:9092 \
    --describe

Di mana LAGjumlah pesan di partisi topik:

masukkan deskripsi gambar di sini

Anda juga dapat mencoba menggunakan kafkacat . Ini adalah proyek open source yang dapat membantu Anda membaca pesan dari topik dan partisi dan mencetaknya ke stdout. Berikut adalah contoh yang membaca 10 pesan terakhir dari sample-kafka-topictopik, lalu keluar:

kafkacat -b localhost:9092 -t sample-kafka-topic -p 0 -o -10 -e
ruslangm
sumber
11

Gunakan https://prestodb.io/docs/current/connector/kafka-tutorial.html

Mesin super SQL, disediakan oleh Facebook, yang terhubung ke beberapa sumber data (Cassandra, Kafka, JMX, Redis ...).

PrestoDB dijalankan sebagai server dengan pekerja opsional (ada mode mandiri tanpa pekerja tambahan), lalu Anda menggunakan JAR kecil yang dapat dieksekusi (disebut presto CLI) untuk membuat kueri.

Setelah Anda mengkonfigurasi server Presto dengan baik, Anda dapat menggunakan SQL tradisional:

SELECT count(*) FROM TOPIC_NAME;
Thomas Decaux
sumber
alat ini bagus, tetapi jika tidak berfungsi jika topik Anda memiliki lebih dari 2 titik.
armandfp
7

Perintah Apache Kafka untuk mendapatkan pesan yang tidak tertangani di semua partisi topik:

kafka-run-class kafka.tools.ConsumerOffsetChecker 
    --topic test --zookeeper localhost:2181 
    --group test_group

Cetakan:

Group      Topic        Pid Offset          logSize         Lag             Owner
test_group test         0   11051           11053           2               none
test_group test         1   10810           10812           2               none
test_group test         2   11027           11028           1               none

Kolom 6 adalah pesan yang tidak ditangani. Tambahkan mereka seperti ini:

kafka-run-class kafka.tools.ConsumerOffsetChecker 
    --topic test --zookeeper localhost:2181 
    --group test_group 2>/dev/null | awk 'NR>1 {sum += $6} 
    END {print sum}'

awk membaca baris, melewati baris header dan menjumlahkan kolom ke-6 dan pada akhirnya mencetak jumlahnya.

Cetakan

5
Eric Leschinski
sumber
5

Untuk mendapatkan semua pesan yang disimpan untuk topik, Anda dapat mencari konsumen ke awal dan akhir aliran untuk setiap partisi dan menjumlahkan hasilnya

List<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
        .map(p -> new TopicPartition(topic, p.partition()))
        .collect(Collectors.toList());
    consumer.assign(partitions); 
    consumer.seekToEnd(Collections.emptySet());
Map<TopicPartition, Long> endPartitions = partitions.stream()
        .collect(Collectors.toMap(Function.identity(), consumer::position));
    consumer.seekToBeginning(Collections.emptySet());
System.out.println(partitions.stream().mapToLong(p -> endPartitions.get(p) - consumer.position(p)).sum());
AutomatedMike
sumber
1
btw, jika Anda mengaktifkan pemadatan, mungkin ada celah di streaming sehingga jumlah pesan sebenarnya mungkin lebih rendah dari total yang dihitung di sini. Untuk mendapatkan jumlah yang akurat, Anda harus memutar ulang pesan dan menghitungnya.
AutomatedMike
5

Jalankan perintah berikut (dengan asumsi kafka-console-consumer.shada di jalur):

kafka-console-consumer.sh  --from-beginning \
--bootstrap-server yourbroker:9092 --property print.key=true  \
--property print.value=false --property print.partition \
--topic yourtopic --timeout-ms 5000 | tail -n 10|grep "Processed a total of"
Borislav Markov
sumber
Catatan: Saya menghapus --new-consumerkarena opsi itu tidak lagi tersedia (atau tampaknya diperlukan)
StephenBoesch
3

Dengan menggunakan klien Java Kafka 2.11-1.0.0, Anda dapat melakukan hal berikut:

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Collections.singletonList("test"));
    while(true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());

            // after each message, query the number of messages of the topic
            Set<TopicPartition> partitions = consumer.assignment();
            Map<TopicPartition, Long> offsets = consumer.endOffsets(partitions);
            for(TopicPartition partition : offsets.keySet()) {
                System.out.printf("partition %s is at %d\n", partition.topic(), offsets.get(partition));
            }
        }
    }

Outputnya seperti ini:

offset = 10, key = null, value = un
partition test is at 13
offset = 11, key = null, value = deux
partition test is at 13
offset = 12, key = null, value = trois
partition test is at 13
Christophe Quintard
sumber
Saya lebih suka Anda menjawab dibandingkan dengan jawaban @AutomatedMike karena jawaban Anda tidak mengacaukan seekToEnd(..)dan seekToBeginning(..)metode yang mengubah status consumer.
adaslaw
2

Pada Kafka Manager versi terbaru, terdapat kolom berjudul Summed Recent Offsets .

masukkan deskripsi gambar di sini

f01
sumber
2

Saya memiliki pertanyaan yang sama dan ini adalah bagaimana saya melakukannya, dari KafkaConsumer, di Kotlin:

val messageCount = consumer.listTopics().entries.filter { it.key == topicName }
    .map {
        it.value.map { topicInfo -> TopicPartition(topicInfo.topic(), topicInfo.partition()) }
    }.map { consumer.endOffsets(it).values.sum() - consumer.beginningOffsets(it).values.sum()}
    .first()

Kode yang sangat kasar, karena saya baru saja membuat ini berfungsi, tetapi pada dasarnya Anda ingin mengurangi offset awal topik dari offset akhir dan ini akan menjadi jumlah pesan saat ini untuk topik tersebut.

Anda tidak bisa hanya mengandalkan offset akhir karena konfigurasi lain (kebijakan pembersihan, retensi-ms, dll.) Yang mungkin menyebabkan penghapusan pesan lama dari topik Anda. Offset hanya "bergerak" ke depan, jadi offset awal yang akan bergerak maju mendekati akhir offset (atau pada akhirnya ke nilai yang sama, jika topik tidak berisi pesan sekarang).

Pada dasarnya offset akhir mewakili jumlah keseluruhan pesan yang melewati topik itu, dan perbedaan di antara keduanya mewakili jumlah pesan yang dimuat dalam topik tersebut saat ini.

LeYAUable
sumber
0

Kutipan dari dokumen Kafka

Penghentian di 0.9.0.0

Kafka-consumer-offset-checker.sh (kafka.tools.ConsumerOffsetChecker) tidak lagi digunakan. Untuk selanjutnya, gunakan kafka-consumer-groups.sh (kafka.admin.ConsumerGroupCommand) untuk fungsi ini.

Saya menjalankan broker Kafka dengan SSL diaktifkan untuk server dan klien. Di bawah perintah yang saya gunakan

kafka-consumer-groups.sh --bootstrap-server Broker_IP:Port --list --command-config /tmp/ssl_config kafka-consumer-groups.sh --bootstrap-server Broker_IP:Port --command-config /tmp/ssl_config --describe --group group_name_x

di mana / tmp / ssl_config adalah seperti di bawah ini

security.protocol=SSL
ssl.truststore.location=truststore_file_path.jks
ssl.truststore.password=truststore_password
ssl.keystore.location=keystore_file_path.jks
ssl.keystore.password=keystore_password
ssl.key.password=key_password
SR Bandi
sumber
0

Jika Anda memiliki akses ke antarmuka JMX server, offset awal & akhir ada di:

kafka.log:type=Log,name=LogStartOffset,topic=TOPICNAME,partition=PARTITIONNUMBER
kafka.log:type=Log,name=LogEndOffset,topic=TOPICNAME,partition=PARTITIONNUMBER

(Anda perlu mengganti TOPICNAME& PARTITIONNUMBER). Ingatlah bahwa Anda perlu memeriksa setiap replika dari partisi yang diberikan, atau Anda perlu mencari tahu broker mana yang menjadi pemimpin untuk partisi tertentu (dan ini dapat berubah seiring waktu).

Alternatifnya, Anda dapat menggunakan metode Konsumen KafkabeginningOffsets dan endOffsets.

Adam Kotwasinski
sumber
-1

Saya belum mencoba ini sendiri, tetapi tampaknya masuk akal.

Anda juga dapat menggunakan kafka.tools.ConsumerOffsetChecker( sumber ).

hba
sumber
-1

Cara termudah yang saya temukan adalah dengan menggunakan Kafdrop REST API /topic/topicNamedan menentukan kunci: "Accept"/ value: "application/json"header untuk mendapatkan kembali respons JSON.

Ini didokumentasikan di sini .

Sebastian Sturza
sumber
-1

Anda dapat menggunakan kafkatool . Silakan periksa tautan ini -> http://www.kafkatool.com/download.html

Kafka Tool adalah aplikasi GUI untuk mengelola dan menggunakan cluster Apache Kafka. Ini menyediakan UI intuitif yang memungkinkan seseorang untuk dengan cepat melihat objek dalam cluster Kafka serta pesan yang disimpan dalam topik cluster.masukkan deskripsi gambar di sini

Vinto
sumber