Apakah ada cara untuk menghapus semua data dari topik atau menghapus topik sebelum setiap proses?
Dapatkah saya mengubah file KafkaConfig.scala untuk mengubah logRetentionHours
properti? Adakah cara agar pesan dihapus segera setelah konsumen membacanya?
Saya menggunakan produsen untuk mengambil data dari suatu tempat dan mengirim data ke topik tertentu di mana konsumen mengkonsumsi, dapatkah saya menghapus semua data dari topik itu di setiap proses? Saya hanya ingin data baru setiap kali dalam topik. Apakah ada cara untuk memulai kembali topik?
apache-kafka
apache-zookeeper
TommyT
sumber
sumber
Jawaban:
Jangan kira itu didukung.Lihat masalah JIRA ini "Tambahkan dukungan penghapusan topik".Untuk menghapus secara manual:
log.dir
atribut di file konfigurasi kafka ) serta data penjaga kebun binatangUntuk topik tertentu apa yang dapat Anda lakukan adalah
/tmp/kafka-logs/MyTopic-0
tempat/tmp/kafka-logs
yang ditentukan olehlog.dir
atributIni adalah
NOT
pendekatan yang bagus dan direkomendasikan tetapi seharusnya berhasil. Di file konfigurasi broker Kafka,log.retention.hours.per.topic
atribut digunakan untuk menentukanThe number of hours to keep a log file before deleting it for some specific topic
Dari Dokumentasi Kafka :
Untuk menemukan awal offset untuk membaca di Kafka 0,8 contoh Konsumen Sederhana yang mereka katakan
Anda juga dapat menemukan kode contoh di sana untuk mengelola offset di ujung konsumen Anda.
sumber
brokers/topics/<topic_to_delete>
serta log untuk membuangnya.kafka-run-class.sh kafka.admin.DeleteTopicCommand
.kafka-run-class.sh kafka.admin.TopicCommand --delete --topic [topic_to_delete] --zookeeper localhost:2181
Seperti yang saya sebutkan di sini Bersihkan Antrian Kafka :
Diuji di Kafka 0.8.2, untuk contoh quick-start: Pertama, Tambahkan satu baris ke file server.properties di bawah folder config:
lalu, Anda dapat menjalankan perintah ini:
sumber
Diuji dengan kafka 0.10
Catatan: jika Anda menghapus folder topik di dalam kafka-logs tetapi tidak dari folder zookeeper-data, maka Anda akan melihat topik masih ada.
sumber
Sebagai solusi kotor, Anda dapat menyesuaikan pengaturan retensi waktu proses per topik, misalnya
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my_topic --config retention.bytes=1
( retention.bytes = 0 mungkin juga berfungsi)Setelah beberapa saat, kafka harus mengosongkan ruang. Tidak yakin apakah ini memiliki implikasi dibandingkan dengan membuat ulang topik.
ps. Lebih baik kembalikan pengaturan retensi, setelah kafka selesai dengan pembersihan.
Anda juga dapat menggunakan
retention.ms
untuk menyimpan data historissumber
Di bawah ini adalah script untuk mengosongkan dan menghapus topik Kafka dengan asumsi localhost sebagai server zookeeper dan Kafka_Home diatur ke direktori install:
Skrip di bawah ini akan mengosongkan topik dengan menyetel waktu retensinya ke 1 detik dan kemudian menghapus konfigurasi:
Untuk sepenuhnya menghapus topik, Anda harus menghentikan broker kafka yang berlaku dan menghapus direktorinya dari direktori log kafka (default: / tmp / kafka-logs) dan kemudian menjalankan skrip ini untuk menghapus topik dari penjaga kebun binatang. Untuk memverifikasi bahwa itu telah dihapus dari zookeeper, output ls / broker / topik tidak lagi menyertakan topik:
sumber
grep "log.retention.check.interval" $Kafka_Home/config/server.properties
--add config
begitu--add-config
Kami mencoba cukup banyak apa yang dijelaskan oleh jawaban lain dengan tingkat keberhasilan yang moderat. Apa yang benar-benar berhasil bagi kami (Apache Kafka 0.8.1) adalah perintah kelas
sh kafka-run-class.sh kafka.admin.DeleteTopicCommand --topic yourtopic --zookeeper localhost: 2181
sumber
Error: Could not find or load main class kafka.admin.DeleteTopicCommand
Untuk pengguna minuman
Jika Anda menggunakan
brew
seperti saya dan membuang banyak waktu untuk mencarikafka-logs
folder yang terkenal itu , jangan takut lagi. (dan tolong beri tahu saya jika itu berhasil untuk Anda dan beberapa versi Homebrew, Kafka dll yang berbeda :))Anda mungkin akan menemukannya di bawah:
Lokasi:
/usr/local/var/lib/kafka-logs
Bagaimana cara menemukan jalan itu
(ini juga berguna untuk pada dasarnya setiap aplikasi yang Anda instal melalui minuman)
1)
brew services list
2) Buka dan baca yang
plist
Anda temukan di atas3) Temukan baris yang menentukan
server.properties
lokasi, buka, dalam kasus saya:/usr/local/etc/kafka/server.properties
4) Cari
log.dirs
baris:5) Pergi ke lokasi itu dan hapus log untuk topik yang Anda inginkan
6) Mulai ulang Kafka dengan
brew services restart kafka
sumber
Semua data tentang topik dan partisinya disimpan di
tmp/kafka-logs/
. Apalagi mereka disimpan dalam sebuah formattopic-partionNumber
, jadi jika Anda ingin menghapus suatu topiknewTopic
, Anda dapat:rm -rf /tmp/kafka-logs/newTopic-*
sumber
log.retention.hours
dan menambahkanlog.retention.ms=1000
. Itu akan membuat rekor di Kafka Topic hanya untuk satu detik.log.retention.hours
ke angka yang Anda inginkan.sumber
Pada versi kafka 2.3.0, ada cara alternatif untuk menghapus lunak Kafka (pendekatan lama tidak digunakan lagi).
Perbarui retention.ms menjadi 1 detik (1000ms) lalu setel lagi setelah satu menit, ke pengaturan default yaitu 7 hari (168 jam, 604.800.000 dalam ms)
Penghapusan lunak: - (rentention.ms = 1000) (menggunakan kafka-configs.sh)
Menyetel ke default: - 7 hari (168 jam, retention.ms = 604800000)
sumber
Dalam menghapus topik secara manual dari cluster kafka, Anda mungkin saja memeriksanya https://github.com/darrenfu/bigdata/issues/6 Langkah penting yang sering terlewatkan dalam sebagian besar solusi adalah menghapus
/config/topics/<topic_name>
in ZK.sumber
Saya menggunakan skrip ini:
sumber
Saya menggunakan utilitas di bawah ini untuk membersihkan setelah uji integrasi saya dijalankan.
Ini menggunakan
AdminZkClient
api terbaru . Api lama sudah tidak digunakan lagi.Ada opsi hapus topik. Tapi, itu menandai topik untuk dihapus. Zookeeper kemudian menghapus topik tersebut. Karena ini bisa sangat lama, saya lebih suka pendekatan retention.ms
sumber