Saya mengirim pesan String ke Kafka V. 0.8 dengan Java Producer API. Jika ukuran pesan sekitar 15 MB saya mendapatkan file MessageSizeTooLargeException
. Saya sudah mencoba menyetel message.max.bytes
ke 40 MB, tetapi saya masih mendapatkan pengecualian. Pesan kecil bekerja tanpa masalah.
(Pengecualian muncul di produsen, saya tidak memiliki konsumen dalam aplikasi ini.)
Apa yang dapat saya lakukan untuk menghilangkan pengecualian ini?
Contoh config produser saya
private ProducerConfig kafkaConfig() {
Properties props = new Properties();
props.put("metadata.broker.list", BROKERS);
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("request.required.acks", "1");
props.put("message.max.bytes", "" + 1024 * 1024 * 40);
return new ProducerConfig(props);
}
Catatan eror:
4709 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 214 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
4869 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 217 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5035 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 220 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5198 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 223 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5305 [main] ERROR kafka.producer.async.DefaultEventHandler - Failed to send requests for topics datasift with correlation ids in [213,224]
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(Unknown Source)
at kafka.producer.Producer.send(Unknown Source)
at kafka.javaapi.producer.Producer.send(Unknown Source)
java
apache-kafka
Sonson123
sumber
sumber
Jawaban:
Anda perlu menyesuaikan tiga (atau empat) properti:
fetch.message.max.bytes
- ini akan menentukan ukuran terbesar dari sebuah pesan yang dapat diambil oleh konsumen.replica.fetch.max.bytes
- ini akan memungkinkan replika di pialang mengirim pesan di dalam klaster dan memastikan pesan direplikasi dengan benar. Jika ini terlalu kecil, maka pesan tidak akan pernah direplikasi, dan oleh karena itu, konsumen tidak akan pernah melihat pesan tersebut karena pesan tersebut tidak akan pernah dilakukan (direplikasi sepenuhnya).message.max.bytes
- ini adalah ukuran pesan terbesar yang dapat diterima oleh pialang dari produsen.max.message.bytes
- ini adalah ukuran pesan terbesar yang diizinkan broker untuk ditambahkan ke topik. Ukuran ini divalidasi sebelum kompresi. (Default untuk brokermessage.max.bytes
.)Saya menemukan cara yang sulit tentang nomor 2 - Anda tidak mendapatkan pengecualian, pesan, atau peringatan APAPUN dari Kafka, jadi pastikan untuk mempertimbangkan ini saat Anda mengirim pesan besar.
sumber
message.max.bytes
di kode sumber. Tetapi saya harus menetapkan nilai-nilai ini dalam konfigurasi server Kafkaconfig/server.properties
. Sekarang pesan yang lebih besar juga berfungsi :).fetch.message.max.bytes
memori untuk setiap partisi. Artinya jika Anda menggunakan jumlah yang besar untukfetch.message.max.bytes
digabungkan dengan jumlah partisi yang banyak, maka akan memakan banyak memori. Faktanya, karena proses replikasi antar broker juga merupakan konsumen khusus, hal ini juga akan menghabiskan memori broker tersebut.max.message.bytes
konfigurasi per-topik yang bisa lebih rendah dari brokermessage.max.bytes
./.*fetch.*bytes/
tampaknya tidak menjadi batasan yang ketat: "Ini bukan maksimum absolut, jika [...] lebih besar dari nilai ini, kumpulan catatan akan masih dikembalikan untuk memastikan bahwa kemajuan dapat dicapai. "Perubahan kecil diperlukan untuk Kafka 0.10 dan konsumen baru dibandingkan dengan jawaban laughing_man :
message.max.bytes
danreplica.fetch.max.bytes
.message.max.bytes
harus sama atau lebih kecil (*) darireplica.fetch.max.bytes
.max.request.size
untuk mengirim pesan yang lebih besar.max.partition.fetch.bytes
untuk menerima pesan yang lebih besar.(*) Baca komentar untuk mempelajari lebih lanjut tentang
message.max.bytes
<=replica.fetch.max.bytes
sumber
message.max.bytes
harus lebih kecil darireplica.fetch.max.bytes
?replica.fetch.max.bytes
harus benar-benar lebih besarmessage.max.bytes
. Seorang karyawan Confluent hari ini mengkonfirmasi apa yang saya duga: bahwa kedua kuantitas sebenarnya bisa sama.message.max.bytes<replica.fetch.max.bytes
ataumessage.max.bytes=replica.fetch.max.bytes
@Kostas?Anda perlu mengganti properti berikut:
Konfigurasi Broker ($ KAFKA_HOME / config / server.properties)
Konfigurasi Konsumen ($ KAFKA_HOME / config / consumer.properties)
Langkah ini tidak berhasil untuk saya. Saya menambahkannya ke aplikasi konsumen dan berfungsi dengan baik
Mulai ulang server.
lihat dokumentasi ini untuk info lebih lanjut: http://kafka.apache.org/08/configuration.html
sumber
Idenya adalah agar pesan yang dikirimkan dari Produsen Kafka ke Broker Kafka sama besarnya dan kemudian diterima oleh Konsumen Kafka yaitu
Produsen Kafka -> Kafka Broker -> Konsumen Kafka
Misalkan jika persyaratannya adalah mengirim pesan 15MB, maka Produsen , Pialang, dan Konsumen , ketiganya, harus sinkron.
Produser Kafka mengirimkan 15 MB -> Kafka Broker Memungkinkan / Menyimpan 15 MB -> Konsumen Kafka menerima 15 MB
Karena itu, pengaturannya harus:
a) di Broker:
b) pada Konsumen:
sumber
Satu hal penting yang perlu diingat bahwa
message.max.bytes
atribut harus sinkron denganfetch.message.max.bytes
properti konsumen . ukuran pengambilan harus setidaknya sebesar ukuran pesan maksimum jika tidak, mungkin terdapat situasi di mana produsen dapat mengirim pesan yang lebih besar daripada yang dapat dikonsumsi / diambil oleh konsumen. Mungkin ada baiknya untuk melihatnya.Versi Kafka mana yang Anda gunakan? Juga berikan beberapa detail lebih lanjut jejak yang Anda peroleh. apakah ada hal seperti ...
payload size of xxxx larger than 1000000
muncul di log?sumber
Jawaban dari @laughing_man cukup akurat. Tapi tetap, saya ingin memberikan rekomendasi yang saya pelajari dari ahli Kafka Stephane Maarek dari Quora.
Kafka tidak dimaksudkan untuk menangani pesan besar.
API Anda harus menggunakan penyimpanan cloud (Ex AWS S3), dan cukup dorong ke Kafka atau pialang pesan mana pun dengan referensi S3. Anda harus menemukan tempat untuk menyimpan data Anda, mungkin itu drive jaringan, mungkin itu apa pun, tetapi seharusnya tidak menjadi perantara pesan.
Sekarang, jika Anda tidak ingin menggunakan solusi di atas
Ukuran maksimal pesan adalah 1MB (pengaturan di broker Anda disebut
message.max.bytes
) Apache Kafka . Jika Anda benar-benar membutuhkannya, Anda dapat meningkatkan ukuran tersebut dan memastikan untuk meningkatkan buffer jaringan untuk produsen dan konsumen Anda.Dan jika Anda benar-benar peduli tentang pemisahan pesan Anda, pastikan setiap pesan yang dipisahkan memiliki kunci yang sama persis sehingga didorong ke partisi yang sama, dan konten pesan Anda harus melaporkan "id bagian" sehingga konsumen Anda dapat sepenuhnya merekonstruksi pesan tersebut. .
Anda juga dapat mempelajari kompresi, jika pesan Anda berbasis teks (gzip, snappy, kompresi lz4) yang dapat mengurangi ukuran data, tetapi tidak secara ajaib.
Sekali lagi, Anda harus menggunakan sistem eksternal untuk menyimpan data tersebut dan hanya mendorong referensi eksternal ke Kafka. Itu adalah arsitektur yang sangat umum, dan yang harus Anda ikuti dan diterima secara luas.
Ingatlah bahwa Kafka bekerja paling baik hanya jika pesan dalam jumlah besar tetapi tidak dalam ukuran.
Sumber: https://www.quora.com/How-do-I-send-Large-messages-80-MB-in-Kafka
sumber
Untuk orang yang menggunakan landoop kafka: Anda dapat meneruskan nilai config di variabel lingkungan seperti:
Dan jika Anda menggunakan rdkafka, teruskan message.max.bytes di konfigurasi produser seperti:
Begitu pula bagi konsumen,
sumber