Saya menggunakan apache kafka untuk perpesanan. Saya telah menerapkan produsen dan konsumen di Jawa. Bagaimana kita bisa mendapatkan jumlah pesan dalam suatu topik?
sumber
Saya menggunakan apache kafka untuk perpesanan. Saya telah menerapkan produsen dan konsumen di Jawa. Bagaimana kita bisa mendapatkan jumlah pesan dalam suatu topik?
Satu-satunya cara yang terlintas dalam pikiran untuk hal ini dari sudut pandang konsumen adalah dengan benar-benar mengonsumsi pesan dan menghitungnya kemudian.
Pialang Kafka mengekspos counter JMX untuk jumlah pesan yang diterima sejak start-up tetapi Anda tidak dapat mengetahui berapa banyak dari mereka yang telah dibersihkan.
Dalam skenario yang paling umum, pesan di Kafka paling baik dilihat sebagai aliran tak terbatas dan mendapatkan nilai diskrit dari berapa banyak yang saat ini disimpan di disk tidak relevan. Lebih jauh lagi, hal-hal menjadi lebih rumit ketika berhadapan dengan sekelompok broker yang semuanya memiliki subset pesan dalam suatu topik.
Ini bukan java, tapi semoga bermanfaat
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list <broker>: <port> --topic <topic-name> --time -1 --offsets 1 | awk -F ":" '{sum += $3} END {print sum}'
sumber
bash-4.3# $KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 10.35.25.95:32774 --topic test-topic --time -1 | awk -F ":" '{sum += $3} END {print sum}' 13818663 bash-4.3# $KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 10.35.25.95:32774 --topic test-topic --time -2 | awk -F ":" '{sum += $3} END {print sum}' 12434609
Dan kemudian perbedaannya mengembalikan pesan tertunda yang sebenarnya dalam topik? Apakah saya benar?Saya sebenarnya menggunakan ini untuk membandingkan POC saya. Item yang ingin Anda gunakan ConsumerOffsetChecker. Anda dapat menjalankannya menggunakan skrip bash seperti di bawah ini.
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --topic test --zookeeper localhost:2181 --group testgroup
Dan di bawah ini adalah hasilnya: Seperti yang Anda lihat di kotak merah, 999 adalah jumlah pesan yang saat ini ada dalam topik.
Pembaruan: ConsumerOffsetChecker tidak digunakan lagi sejak 0.10.0, Anda mungkin ingin mulai menggunakan ConsumerGroupCommand.
sumber
Terkadang yang menjadi perhatian adalah mengetahui jumlah pesan di setiap partisi, misalnya, saat menguji pemartisi kustom. Langkah-langkah selanjutnya telah diuji untuk bekerja dengan Kafka 0.10.2.1-2 dari Confluent 3.2. Diberikan topik Kafka,
kt
dan baris perintah berikut:$ kafka-run-class kafka.tools.GetOffsetShell \ --broker-list host01:9092,host02:9092,host02:9092 --topic kt
Itu mencetak keluaran sampel yang menunjukkan jumlah pesan di tiga partisi:
kt:2:6138 kt:1:6123 kt:0:6137
Jumlah baris bisa lebih banyak atau lebih sedikit tergantung pada jumlah partisi untuk topik tersebut.
sumber
Karena
ConsumerOffsetChecker
tidak lagi didukung, Anda dapat menggunakan perintah ini untuk memeriksa semua pesan dalam topik:bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand \ --group my-group \ --bootstrap-server localhost:9092 \ --describe
Di mana
LAG
jumlah pesan di partisi topik:Anda juga dapat mencoba menggunakan kafkacat . Ini adalah proyek open source yang dapat membantu Anda membaca pesan dari topik dan partisi dan mencetaknya ke stdout. Berikut adalah contoh yang membaca 10 pesan terakhir dari
sample-kafka-topic
topik, lalu keluar:kafkacat -b localhost:9092 -t sample-kafka-topic -p 0 -o -10 -e
sumber
Gunakan https://prestodb.io/docs/current/connector/kafka-tutorial.html
Mesin super SQL, disediakan oleh Facebook, yang terhubung ke beberapa sumber data (Cassandra, Kafka, JMX, Redis ...).
PrestoDB dijalankan sebagai server dengan pekerja opsional (ada mode mandiri tanpa pekerja tambahan), lalu Anda menggunakan JAR kecil yang dapat dieksekusi (disebut presto CLI) untuk membuat kueri.
Setelah Anda mengkonfigurasi server Presto dengan baik, Anda dapat menggunakan SQL tradisional:
SELECT count(*) FROM TOPIC_NAME;
sumber
Perintah Apache Kafka untuk mendapatkan pesan yang tidak tertangani di semua partisi topik:
kafka-run-class kafka.tools.ConsumerOffsetChecker --topic test --zookeeper localhost:2181 --group test_group
Cetakan:
Group Topic Pid Offset logSize Lag Owner test_group test 0 11051 11053 2 none test_group test 1 10810 10812 2 none test_group test 2 11027 11028 1 none
Kolom 6 adalah pesan yang tidak ditangani. Tambahkan mereka seperti ini:
kafka-run-class kafka.tools.ConsumerOffsetChecker --topic test --zookeeper localhost:2181 --group test_group 2>/dev/null | awk 'NR>1 {sum += $6} END {print sum}'
awk membaca baris, melewati baris header dan menjumlahkan kolom ke-6 dan pada akhirnya mencetak jumlahnya.
Cetakan
5
sumber
Untuk mendapatkan semua pesan yang disimpan untuk topik, Anda dapat mencari konsumen ke awal dan akhir aliran untuk setiap partisi dan menjumlahkan hasilnya
List<TopicPartition> partitions = consumer.partitionsFor(topic).stream() .map(p -> new TopicPartition(topic, p.partition())) .collect(Collectors.toList()); consumer.assign(partitions); consumer.seekToEnd(Collections.emptySet()); Map<TopicPartition, Long> endPartitions = partitions.stream() .collect(Collectors.toMap(Function.identity(), consumer::position)); consumer.seekToBeginning(Collections.emptySet()); System.out.println(partitions.stream().mapToLong(p -> endPartitions.get(p) - consumer.position(p)).sum());
sumber
Jalankan perintah berikut (dengan asumsi
kafka-console-consumer.sh
ada di jalur):kafka-console-consumer.sh --from-beginning \ --bootstrap-server yourbroker:9092 --property print.key=true \ --property print.value=false --property print.partition \ --topic yourtopic --timeout-ms 5000 | tail -n 10|grep "Processed a total of"
sumber
--new-consumer
karena opsi itu tidak lagi tersedia (atau tampaknya diperlukan)Dengan menggunakan klien Java Kafka 2.11-1.0.0, Anda dapat melakukan hal berikut:
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singletonList("test")); while(true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); // after each message, query the number of messages of the topic Set<TopicPartition> partitions = consumer.assignment(); Map<TopicPartition, Long> offsets = consumer.endOffsets(partitions); for(TopicPartition partition : offsets.keySet()) { System.out.printf("partition %s is at %d\n", partition.topic(), offsets.get(partition)); } } }
Outputnya seperti ini:
offset = 10, key = null, value = un partition test is at 13 offset = 11, key = null, value = deux partition test is at 13 offset = 12, key = null, value = trois partition test is at 13
sumber
seekToEnd(..)
danseekToBeginning(..)
metode yang mengubah statusconsumer
.Pada Kafka Manager versi terbaru, terdapat kolom berjudul Summed Recent Offsets .
sumber
Saya memiliki pertanyaan yang sama dan ini adalah bagaimana saya melakukannya, dari KafkaConsumer, di Kotlin:
Kode yang sangat kasar, karena saya baru saja membuat ini berfungsi, tetapi pada dasarnya Anda ingin mengurangi offset awal topik dari offset akhir dan ini akan menjadi jumlah pesan saat ini untuk topik tersebut.
Anda tidak bisa hanya mengandalkan offset akhir karena konfigurasi lain (kebijakan pembersihan, retensi-ms, dll.) Yang mungkin menyebabkan penghapusan pesan lama dari topik Anda. Offset hanya "bergerak" ke depan, jadi offset awal yang akan bergerak maju mendekati akhir offset (atau pada akhirnya ke nilai yang sama, jika topik tidak berisi pesan sekarang).
Pada dasarnya offset akhir mewakili jumlah keseluruhan pesan yang melewati topik itu, dan perbedaan di antara keduanya mewakili jumlah pesan yang dimuat dalam topik tersebut saat ini.
sumber
Kutipan dari dokumen Kafka
Penghentian di 0.9.0.0
Kafka-consumer-offset-checker.sh (kafka.tools.ConsumerOffsetChecker) tidak lagi digunakan. Untuk selanjutnya, gunakan kafka-consumer-groups.sh (kafka.admin.ConsumerGroupCommand) untuk fungsi ini.
Saya menjalankan broker Kafka dengan SSL diaktifkan untuk server dan klien. Di bawah perintah yang saya gunakan
kafka-consumer-groups.sh --bootstrap-server Broker_IP:Port --list --command-config /tmp/ssl_config kafka-consumer-groups.sh --bootstrap-server Broker_IP:Port --command-config /tmp/ssl_config --describe --group group_name_x
di mana / tmp / ssl_config adalah seperti di bawah ini
sumber
Jika Anda memiliki akses ke antarmuka JMX server, offset awal & akhir ada di:
(Anda perlu mengganti
TOPICNAME
&PARTITIONNUMBER
). Ingatlah bahwa Anda perlu memeriksa setiap replika dari partisi yang diberikan, atau Anda perlu mencari tahu broker mana yang menjadi pemimpin untuk partisi tertentu (dan ini dapat berubah seiring waktu).Alternatifnya, Anda dapat menggunakan metode Konsumen Kafka
beginningOffsets
danendOffsets
.sumber
Saya belum mencoba ini sendiri, tetapi tampaknya masuk akal.
Anda juga dapat menggunakan
kafka.tools.ConsumerOffsetChecker
( sumber ).sumber
Cara termudah yang saya temukan adalah dengan menggunakan Kafdrop REST API
/topic/topicName
dan menentukan kunci:"Accept"
/ value:"application/json"
header untuk mendapatkan kembali respons JSON.Ini didokumentasikan di sini .
sumber
Anda dapat menggunakan kafkatool . Silakan periksa tautan ini -> http://www.kafkatool.com/download.html
sumber