Bagaimana saya bisa mengirim pesan besar dengan Kafka (lebih dari 15MB)?

118

Saya mengirim pesan String ke Kafka V. 0.8 dengan Java Producer API. Jika ukuran pesan sekitar 15 MB saya mendapatkan file MessageSizeTooLargeException. Saya sudah mencoba menyetel message.max.byteske 40 MB, tetapi saya masih mendapatkan pengecualian. Pesan kecil bekerja tanpa masalah.

(Pengecualian muncul di produsen, saya tidak memiliki konsumen dalam aplikasi ini.)

Apa yang dapat saya lakukan untuk menghilangkan pengecualian ini?

Contoh config produser saya

private ProducerConfig kafkaConfig() {
    Properties props = new Properties();
    props.put("metadata.broker.list", BROKERS);
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    props.put("request.required.acks", "1");
    props.put("message.max.bytes", "" + 1024 * 1024 * 40);
    return new ProducerConfig(props);
}

Catatan eror:

4709 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with correlation id 214 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
4869 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with    correlation id 217 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5035 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with   correlation id 220 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5198 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with correlation id 223 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5305 [main] ERROR kafka.producer.async.DefaultEventHandler  - Failed to send requests for topics datasift with correlation ids in [213,224]

kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(Unknown Source)
at kafka.producer.Producer.send(Unknown Source)
at kafka.javaapi.producer.Producer.send(Unknown Source)
Sonson123
sumber
5
Naluri pertama saya adalah meminta Anda untuk membagi pesan besar ini menjadi beberapa pesan yang lebih kecil: - / Dugaan saya adalah bahwa ini tidak mungkin karena beberapa alasan tetapi Anda mungkin ingin mempertimbangkannya kembali: Pesan besar biasanya berarti ada cacat desain suatu tempat yang harus diperbaiki.
Aaron Digulla
1
Terima kasih, tapi itu akan membuat logika saya jauh lebih kompleks. Mengapa menggunakan Kafka untuk pesan sekitar 15MB merupakan ide yang buruk ? Apakah 1 MB adalah batas ukuran pesan maksimum yang dapat digunakan? Saya tidak menemukan banyak hal tentang batas ukuran pesan di dokumentasi Kafka.
Sonson123
2
Ini sama sekali tidak terkait dengan Kafka atau sistem pemrosesan pesan lainnya. Alasan saya: Jika ada yang tidak beres dengan file 15MB Anda, maka membersihkan kekacauan setelahnya sangat mahal. Itulah mengapa saya biasanya membagi file besar menjadi banyak pekerjaan yang lebih kecil (yang biasanya dapat dijalankan secara paralel juga).
Aaron Digulla
apakah kamu sudah menggunakan kompresi apapun? bisakah Anda membagikan beberapa detail lebih lanjut, agak sulit untuk menebak sesuatu hanya dari satu kata
pengguna2720864

Jawaban:

181

Anda perlu menyesuaikan tiga (atau empat) properti:

  • Sisi konsumen:fetch.message.max.bytes - ini akan menentukan ukuran terbesar dari sebuah pesan yang dapat diambil oleh konsumen.
  • Sisi pialang: replica.fetch.max.bytes- ini akan memungkinkan replika di pialang mengirim pesan di dalam klaster dan memastikan pesan direplikasi dengan benar. Jika ini terlalu kecil, maka pesan tidak akan pernah direplikasi, dan oleh karena itu, konsumen tidak akan pernah melihat pesan tersebut karena pesan tersebut tidak akan pernah dilakukan (direplikasi sepenuhnya).
  • Sisi broker: message.max.bytes - ini adalah ukuran pesan terbesar yang dapat diterima oleh pialang dari produsen.
  • Sisi broker (per topik): max.message.bytes- ini adalah ukuran pesan terbesar yang diizinkan broker untuk ditambahkan ke topik. Ukuran ini divalidasi sebelum kompresi. (Default untuk broker message.max.bytes.)

Saya menemukan cara yang sulit tentang nomor 2 - Anda tidak mendapatkan pengecualian, pesan, atau peringatan APAPUN dari Kafka, jadi pastikan untuk mempertimbangkan ini saat Anda mengirim pesan besar.

laughing_man
sumber
3
Oke, Anda dan pengguna2720864 benar. Saya hanya mengatur message.max.bytesdi kode sumber. Tetapi saya harus menetapkan nilai-nilai ini dalam konfigurasi server Kafka config/server.properties. Sekarang pesan yang lebih besar juga berfungsi :).
Sonson123
3
Adakah kerugian yang diketahui jika menetapkan nilai-nilai ini terlalu tinggi?
Ivan Balashov
7
Iya. Di sisi konsumen, Anda mengalokasikan fetch.message.max.bytesmemori untuk setiap partisi. Artinya jika Anda menggunakan jumlah yang besar untuk fetch.message.max.bytesdigabungkan dengan jumlah partisi yang banyak, maka akan memakan banyak memori. Faktanya, karena proses replikasi antar broker juga merupakan konsumen khusus, hal ini juga akan menghabiskan memori broker tersebut.
laughing_man
3
Perhatikan juga ada max.message.byteskonfigurasi per-topik yang bisa lebih rendah dari broker message.max.bytes.
Peter Davis
1
Menurut dokumen resmi, parameter di sisi konsumen dan yang terkait dengan replikasi antar pialang /.*fetch.*bytes/tampaknya tidak menjadi batasan yang ketat: "Ini bukan maksimum absolut, jika [...] lebih besar dari nilai ini, kumpulan catatan akan masih dikembalikan untuk memastikan bahwa kemajuan dapat dicapai. "
Bluu
56

Perubahan kecil diperlukan untuk Kafka 0.10 dan konsumen baru dibandingkan dengan jawaban laughing_man :

  • Broker: Tidak ada perubahan, Anda masih perlu menambah properti message.max.bytesdan replica.fetch.max.bytes. message.max.bytesharus sama atau lebih kecil (*) darireplica.fetch.max.bytes .
  • Produser: Tingkatkan max.request.size untuk mengirim pesan yang lebih besar.
  • Konsumen: Tingkatkan max.partition.fetch.bytesuntuk menerima pesan yang lebih besar.

(*) Baca komentar untuk mempelajari lebih lanjut tentang message.max.bytes<=replica.fetch.max.bytes

Sascha Vetter
sumber
2
Tahukah Anda mengapa message.max.bytesharus lebih kecil dari replica.fetch.max.bytes?
Kostas
2
" replica.fetch.max.bytes (default: 1MB) - Ukuran maksimum data yang dapat direplikasi oleh broker. Ini harus lebih besar dari message.max.bytes , atau broker akan menerima pesan dan gagal untuk mereplikasi mereka. potensi kehilangan data. " Sumber: handling-large-messages-kafka
Sascha Vetter
2
Terima kasih telah menghubungi saya kembali dengan tautan. Ini sepertinya menggemakan apa yang disarankan panduan Cloudera juga. Namun keduanya salah - perhatikan bahwa mereka tidak menawarkan alasan teknis mengapa replica.fetch.max.bytes harus benar-benar lebih besar message.max.bytes. Seorang karyawan Confluent hari ini mengkonfirmasi apa yang saya duga: bahwa kedua kuantitas sebenarnya bisa sama.
Kostas
2
Apakah ada pembaruan tentang message.max.bytes<replica.fetch.max.bytesatau message.max.bytes=replica.fetch.max.bytes@Kostas?
Sascha Vetter
2
Ya, mereka bisa setara: mail-archive.com/[email protected]/msg25494.html (Ismael bekerja untuk Confluent)
Kostas
13

Anda perlu mengganti properti berikut:

Konfigurasi Broker ($ KAFKA_HOME / config / server.properties)

  • replica.fetch.max.bytes
  • message.max.bytes

Konfigurasi Konsumen ($ KAFKA_HOME / config / consumer.properties)
Langkah ini tidak berhasil untuk saya. Saya menambahkannya ke aplikasi konsumen dan berfungsi dengan baik

  • fetch.message.max.bytes

Mulai ulang server.

lihat dokumentasi ini untuk info lebih lanjut: http://kafka.apache.org/08/configuration.html

pengguna2550587
sumber
1
untuk konsumen baris perintah, saya perlu menggunakan bendera --fetch-size = <bytes>. Tampaknya file consumer.properties tidak terbaca (kafka 0.8.1). Saya juga merekomendasikan untuk mengaktifkan kompresi dari sisi produsen menggunakan opsi compression.codec.
Ziggy Eunicien
Komentar Ziggy berhasil untuk saya kafka 0.8.1.1. Terima kasih!
Yakobus
mungkinkah fetch.message.max.bytes diganti dengan max.partition.fetch.bytes di ConsumerConfig?
s_bei
12

Idenya adalah agar pesan yang dikirimkan dari Produsen Kafka ke Broker Kafka sama besarnya dan kemudian diterima oleh Konsumen Kafka yaitu

Produsen Kafka -> Kafka Broker -> Konsumen Kafka

Misalkan jika persyaratannya adalah mengirim pesan 15MB, maka Produsen , Pialang, dan Konsumen , ketiganya, harus sinkron.

Produser Kafka mengirimkan 15 MB -> Kafka Broker Memungkinkan / Menyimpan 15 MB -> Konsumen Kafka menerima 15 MB

Karena itu, pengaturannya harus:

a) di Broker:

message.max.bytes=15728640 
replica.fetch.max.bytes=15728640

b) pada Konsumen:

fetch.message.max.bytes=15728640
Ravi
sumber
2
mungkinkah fetch.message.max.bytes diganti dengan max.partition.fetch.bytes di ConsumerConfig?
s_bei
7

Satu hal penting yang perlu diingat bahwa message.max.bytesatribut harus sinkron dengan fetch.message.max.bytesproperti konsumen . ukuran pengambilan harus setidaknya sebesar ukuran pesan maksimum jika tidak, mungkin terdapat situasi di mana produsen dapat mengirim pesan yang lebih besar daripada yang dapat dikonsumsi / diambil oleh konsumen. Mungkin ada baiknya untuk melihatnya.
Versi Kafka mana yang Anda gunakan? Juga berikan beberapa detail lebih lanjut jejak yang Anda peroleh. apakah ada hal seperti ... payload size of xxxx larger than 1000000muncul di log?

pengguna2720864
sumber
1
Saya telah memperbarui pertanyaan saya dengan informasi lebih lanjut: Kafka Versi 2.8.0-0.8.0; sekarang saya hanya butuh produser.
Sonson123
6

Jawaban dari @laughing_man cukup akurat. Tapi tetap, saya ingin memberikan rekomendasi yang saya pelajari dari ahli Kafka Stephane Maarek dari Quora.

Kafka tidak dimaksudkan untuk menangani pesan besar.

API Anda harus menggunakan penyimpanan cloud (Ex AWS S3), dan cukup dorong ke Kafka atau pialang pesan mana pun dengan referensi S3. Anda harus menemukan tempat untuk menyimpan data Anda, mungkin itu drive jaringan, mungkin itu apa pun, tetapi seharusnya tidak menjadi perantara pesan.

Sekarang, jika Anda tidak ingin menggunakan solusi di atas

Ukuran maksimal pesan adalah 1MB (pengaturan di broker Anda disebut message.max.bytes) Apache Kafka . Jika Anda benar-benar membutuhkannya, Anda dapat meningkatkan ukuran tersebut dan memastikan untuk meningkatkan buffer jaringan untuk produsen dan konsumen Anda.

Dan jika Anda benar-benar peduli tentang pemisahan pesan Anda, pastikan setiap pesan yang dipisahkan memiliki kunci yang sama persis sehingga didorong ke partisi yang sama, dan konten pesan Anda harus melaporkan "id bagian" sehingga konsumen Anda dapat sepenuhnya merekonstruksi pesan tersebut. .

Anda juga dapat mempelajari kompresi, jika pesan Anda berbasis teks (gzip, snappy, kompresi lz4) yang dapat mengurangi ukuran data, tetapi tidak secara ajaib.

Sekali lagi, Anda harus menggunakan sistem eksternal untuk menyimpan data tersebut dan hanya mendorong referensi eksternal ke Kafka. Itu adalah arsitektur yang sangat umum, dan yang harus Anda ikuti dan diterima secara luas.

Ingatlah bahwa Kafka bekerja paling baik hanya jika pesan dalam jumlah besar tetapi tidak dalam ukuran.

Sumber: https://www.quora.com/How-do-I-send-Large-messages-80-MB-in-Kafka

Bhanu Hoysala
sumber
4
Anda mungkin ingin mencatat bahwa rekomendasi "Anda" adalah salinan hampir kata demi kata dari rekomendasi Quora Stéphane Maarek di quora.com/How-do-I-send-Large-messages-80-MB-in-Kafka
Mike
Kafka bekerja dengan pesan besar, sama sekali tidak ada masalah. Halaman intro di beranda Kafka bahkan merujuknya sebagai sistem penyimpanan.
calloc_org
3

Untuk orang yang menggunakan landoop kafka: Anda dapat meneruskan nilai config di variabel lingkungan seperti:

docker run -d --rm -p 2181:2181 -p 3030:3030 -p 8081-8083:8081-8083  -p 9581-9585:9581-9585 -p 9092:9092
 -e KAFKA_TOPIC_MAX_MESSAGE_BYTES=15728640 -e KAFKA_REPLICA_FETCH_MAX_BYTES=15728640  landoop/fast-data-dev:latest `

Dan jika Anda menggunakan rdkafka, teruskan message.max.bytes di konfigurasi produser seperti:

  const producer = new Kafka.Producer({
        'metadata.broker.list': 'localhost:9092',
        'message.max.bytes': '15728640',
        'dr_cb': true
    });

Begitu pula bagi konsumen,

  const kafkaConf = {
   "group.id": "librd-test",
   "fetch.message.max.bytes":"15728640",
   ... .. }                                                                                                                                                                                                                                                      
pelapor
sumber