Apakah ada cara untuk membersihkan topik dalam kafka?
Saya mendorong pesan yang terlalu besar ke topik pesan kafka di komputer lokal saya, sekarang saya mendapat pesan kesalahan:
kafka.common.InvalidMessageSizeException: invalid message size
Menambah fetch.size
itu tidak ideal di sini, karena saya sebenarnya tidak ingin menerima pesan sebesar itu.
apache-kafka
purge
Peter Klipfel
sumber
sumber
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic MyTopic --deleteConfig retention.ms
--delete-config retention.ms
e.g. kafka-configs.sh --zookeeper <zkhost>:2181 --alter --entity-type topics --entity-name <topic name> --add-config retention.ms=1000
Ini juga memungkinkan Anda untuk memeriksa periode penyimpanan saat ini, misalnya kafka-configs --zookeeper <zkhost>: 2181 - deskripsikan --entri-jenis topik --entitas-nama <nama topik>Untuk membersihkan antrian, Anda dapat menghapus topik:
lalu buat kembali:
sumber
delete.topic.enable=true
dalam fileconfig/server.properties
, sebagaimana peringatan yang dicetak oleh perintah yang disebutkan mengatakanNote: This will have no impact if delete.topic.enable is not set to true.
Berikut langkah-langkah yang saya ikuti untuk menghapus topik bernama
MyTopic
:rm -rf /tmp/kafka-logs/MyTopic-0
. Ulangi untuk partisi lain, dan semua replikazkCli.sh
lalurmr /brokers/MyTopic
Jika Anda merindukan Anda langkah 3, maka Apache Kafka akan terus melaporkan topik tersebut sebagai hadiah (misalnya saat Anda menjalankan
kafka-list-topic.sh
).Diuji dengan Apache Kafka 0.8.0.
sumber
./zookeeper-shell.sh localhost:2181
dan./kafka-topics.sh --list --zookeeper localhost:2181
zookeeper-client
sebagai gantinyazkCli.sh
(dicoba di Cloudera CDH5)Meskipun jawaban yang diterima benar, metode itu sudah usang. Konfigurasi topik sekarang harus dilakukan melalui
kafka-configs
.Konfigurasi yang diatur melalui metode ini dapat ditampilkan dengan perintah
sumber
kafka-configs --zookeeper localhost:2181 --entity-type topics --alter --delete-config retention.ms --entity-name MyTopic
Diuji dalam Kafka 0.8.2, untuk contoh mulai cepat: Pertama, Tambahkan satu baris ke file server.properties di bawah folder config:
lalu, Anda dapat menjalankan perintah ini:
sumber
Dari kafka 1.1
Bersihkan topik
tunggu 1 menit, agar aman bahwa kafka bersihkan topik hapus konfigurasi, dan kemudian pergi ke nilai default
sumber
bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name my-topic --add-config rentention.ms=100
kafka tidak memiliki metode langsung untuk membersihkan / membersihkan topik (Antrian), tetapi dapat melakukan ini dengan menghapus topik itu dan membuatnya kembali.
pertama-tama pastikan file sever.properties telah dan jika tidak ditambahkan
delete.topic.enable=true
lalu, Hapus topik
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic myTopic
lalu buat lagi.
sumber
Kadang-kadang, jika Anda memiliki gugus jenuh (terlalu banyak partisi, atau menggunakan data topik terenkripsi, atau menggunakan SSL, atau pengontrol ada di node yang buruk, atau koneksi tidak stabil, akan butuh waktu lama untuk membersihkan topik tersebut .
Saya mengikuti langkah-langkah ini, terutama jika Anda menggunakan Avro.
1: Jalankan dengan alat kafka:
2: Jalankan pada simpul registri Skema:
kafka-avro-console-consumer --consumer-property security.protocol=SSL --consumer-property ssl.truststore.location=/etc/schema-registry/secrets/trust.jks --consumer-property ssl.truststore.password=password --consumer-property ssl.keystore.location=/etc/schema-registry/secrets/identity.jks --consumer-property ssl.keystore.password=password --consumer-property ssl.key.password=password --bootstrap-server broker01.kafka.com:9092 --topic <topic-name> --new-consumer --from-beginning
3: Atur retensi topik kembali ke pengaturan awal, setelah topik kosong.
Semoga ini bisa membantu seseorang, karena tidak mudah diiklankan.
sumber
kafka-avro-console-consumer
tidak perluUPDATE: Jawaban ini relevan untuk Kafka 0.6. Untuk Kafka 0.8 dan yang lebih baru lihat jawaban oleh @Patrick.
Ya, hentikan kafka dan hapus semua file secara manual dari subdirektori yang sesuai (mudah ditemukan di direktori data kafka). Setelah kafka restart topiknya akan kosong.
sumber
Pendekatan paling sederhana adalah mengatur tanggal file log individual menjadi lebih lama dari periode penyimpanan. Kemudian broker harus membersihkannya dan menghapusnya untuk Anda dalam beberapa detik. Ini menawarkan beberapa keuntungan:
Dalam pengalaman saya dengan Kafka 0.7.x, menghapus file log dan memulai kembali broker dapat menyebabkan pengecualian offset tidak valid untuk konsumen tertentu. Ini akan terjadi karena broker me-restart offset di nol (tanpa adanya file log yang ada), dan seorang konsumen yang sebelumnya mengkonsumsi dari topik akan menyambung kembali untuk meminta offset spesifik [sekali valid]. Jika offset ini jatuh di luar batas log topik baru, maka tidak ada salahnya dan konsumen kembali pada awal atau akhir. Tetapi, jika offset berada dalam batas log topik baru, broker mencoba untuk mengambil set pesan tetapi gagal karena offset tidak selaras dengan pesan yang sebenarnya.
Ini bisa dikurangi dengan juga membersihkan offset konsumen di zookeeper untuk topik itu. Tetapi jika Anda tidak memerlukan topik perawan dan hanya ingin menghapus konten yang ada, maka cukup 'menyentuh'-log beberapa topik jauh lebih mudah dan lebih dapat diandalkan, daripada menghentikan broker, menghapus log topik, dan membersihkan node zookeeper tertentu .
sumber
Saran Thomas sangat bagus tetapi sayangnya
zkCli
di Zookeeper versi lama (misalnya 3.3.6) tampaknya tidak mendukungrmr
. Misalnya membandingkan implementasi command line di Zookeeper modern dengan versi 3.3 .Jika Anda dihadapkan dengan Zookeeper versi lama, salah satu solusinya adalah menggunakan pustaka klien seperti zc.zk untuk Python. Untuk orang yang tidak terbiasa dengan Python, Anda perlu menginstalnya menggunakan pip atau easy_install . Kemudian mulai shell Python (
python
) dan Anda dapat melakukannya:atau bahkan
jika Anda ingin menghapus semua topik dari Kafka.
sumber
Untuk membersihkan semua pesan dari topik tertentu menggunakan grup aplikasi Anda (GroupName harus sama dengan nama grup aplikasi kafka).
./kafka-path/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topicName --from-beginning --group application-group
sumber
Mengikuti jawaban appleyard @steven, saya menjalankan perintah berikut pada Kafka 2.2.0 dan mereka bekerja untuk saya.
sumber
Banyak jawaban bagus di sini tetapi di antara mereka, saya tidak menemukan satu tentang buruh pelabuhan. Saya menghabiskan beberapa waktu untuk mencari tahu bahwa menggunakan wadah broker adalah salah untuk kasus ini (jelas !!!)
dan saya seharusnya menggunakan
zookeeper:2181
bukan--zookeeper localhost:2181
sebagai file menulis sayaperintah yang benar adalah
Semoga ini akan menghemat waktu seseorang.
Perlu diketahui juga bahwa pesan tidak akan segera dihapus dan itu akan terjadi ketika segmen log akan ditutup.
sumber
localhost:2181
... Misalnya Anda salah memahami fitur jaringan Docker. Selain itu, tidak semua wadah Zookeeper memilikikafka-topics
, jadi yang terbaik adalah tidak menggunakannya seperti itu. Instalasi Kafka terbaru memungkinkan untuk--bootstrap-servers
mengubah topik alih-alih--zookeeper
you can use
--zookeeper zookeeper: 2181` dari wadah Kafka adalah poin saya. Atau bahkan mengeluarkan garis Zookeeper dari file server.propertiesTidak dapat menambahkan sebagai komentar karena ukuran: Tidak yakin apakah ini benar, selain memperbarui retention.ms dan retention.bytes, tetapi saya perhatikan kebijakan pembersihan topik harus "delete" (default), jika "compact", itu akan tahan pesan lebih lama, yaitu, jika "kompak", Anda harus menentukan delete.retention.ms juga.
./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name test-topic-3-100 --entity-type topics
Configs for topics:test-topic-3-100 are retention.ms=1000,delete.retention.ms=10000,cleanup.policy=delete,retention.bytes=1
Juga harus memantau paling awal / offset terbaru harus sama untuk mengkonfirmasi ini berhasil terjadi, juga dapat memeriksa du-h / tmp / kafka-log / test-topik-3-100- *
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -1 | awk -F ":" '{sum += $3} END {print sum}' 26599762
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -2 | awk -F ":" '{sum += $3} END {print sum}' 26599762
Masalah lainnya adalah, Anda harus mendapatkan konfigurasi saat ini terlebih dahulu sehingga Anda ingat untuk mengembalikan setelah penghapusan berhasil:
./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name test-topic-3-100 --entity-type topics
sumber
Pendekatan lain, agak manual, untuk membersihkan suatu topik adalah:
di broker:
sudo service kafka stop
sudo rm -R /kafka-storage/kafka-logs/<some_topic_name>-*
di penjaga kebun binatang:
sudo /usr/lib/zookeeper/bin/zkCli.sh
rmr /brokers/topic/<some_topic_name>
di broker lagi:
sudo service kafka start
sumber
Ini harus memberi
retention.ms
dikonfigurasi. Kemudian Anda dapat menggunakan perintah alter di atas untuk mengubah ke 1detik (dan kemudian kembali ke default).sumber
Dari Jawa, gunakan yang baru dan
AdminZkClient
bukan yang sudah usangAdminUtils
:sumber
AdminClient
atauKafkaAdminClient