Saya menggunakan Confluent.Kafka .NET klien versi 1.3.0. Saya mengikuti dokumen :
var consumerConfig = new ConsumerConfig
{
BootstrapServers = "server1, server2",
AutoOffsetReset = AutoOffsetReset.Earliest,
EnableAutoCommit = true,
EnableAutoOffsetStore = false,
GroupId = this.groupId,
SecurityProtocol = SecurityProtocol.SaslPlaintext,
SaslMechanism = SaslMechanism.Plain,
SaslUsername = this.kafkaUsername,
SaslPassword = this.kafkaPassword,
};
using (var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig).Build())
{
var cancellationToken = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) =>
{
e.Cancel = true;
cancellationToken.Cancel();
};
consumer.Subscribe("my-topic");
while (true)
{
try
{
var consumerResult = consumer.Consume();
// process message
consumer.StoreOffset(consumerResult);
}
catch (ConsumeException e)
{
// log
}
catch (KafkaException e)
{
// log
}
catch (OperationCanceledException e)
{
// log
}
}
}
Masalahnya adalah bahwa bahkan jika saya berkomentar keluar baris consumer.StoreOffset(consumerResult);
, saya terus mendapatkan pesan yang tidak dikonsumsi berikutnya saat saya Mengkonsumsinya berikutnya , yaitu offset terus meningkat yang tampaknya tidak seperti yang diklaim oleh dokumentasi, yaitu setidaknya satu pengiriman .
Bahkan jika saya mengatur EnableAutoCommit = false
dan menghapus 'EnableAutoOffsetStore = false' dari konfigurasi, dan ganti consumer.StoreOffset(consumerResult)
dengan consumer.Commit()
, saya masih melihat perilaku yang sama, yaitu bahkan jika saya berkomentar Commit
, saya masih terus mendapatkan pesan yang tidak digunakan berikutnya.
Saya merasa seperti kehilangan sesuatu yang mendasar di sini, tetapi tidak dapat menemukan apa. Bantuan apa pun dihargai!
EnableAutoCommit
yang diatur. Katakanlah kita punyaEnableAutoCommit = false
, dan ketika sayaConsume
, saya mendapatkan kembali pesan dengan offset 11. Saya berharap untuk terus mendapatkan pesan yang sama dengan offset 11 berulang-ulang jika memproses pesan terus melempar dan karenanya tidak ada panggilan untukCommit
dibuat.Consume
) dengan menggunakanCommit
setelah Anda sudahSubscribe
ke topik .. Kafka (seperti dalam lib klien) di belakang layar mempertahankan semua offset yang telah dikirim ke aplikasi diConsume
dan akan mengirimkannya secara linear. Jadi untuk memproses ulang pesan seperti dalam skenario kegagalan, Anda harus melacaknya dalam kode Anda dan berusaha mengimbangi dan mulai memproses pesan dan dan Anda juga harus tahu apa yang harus dilewati jika sudah diproses dalam permintaan sebelumnya. Saya tidak terbiasa dengan .net library tetapi seharusnya tidak terlalu penting karena ini adalah desain kafka.Jawaban:
Maaf saya belum bisa menambahkan komentar. Konsumen Kafka mengkonsumsi pesan dalam batch, jadi mungkin Anda masih melakukan iterate melalui batch yang diambil sebelumnya oleh utas latar .
Anda dapat memeriksa apakah konsumen Anda benar-benar melakukan offset atau tidak menggunakan util kafka
kafka-consumer-groups.sh
sumber
Anda mungkin ingin memiliki logika coba ulang untuk memproses masing-masing pesan Anda untuk beberapa kali seperti katakanlah 5. Jika tidak berhasil selama 5 percobaan ini, Anda mungkin ingin menambahkan pesan ini ke topik lain untuk menangani semua pesan gagal yang diutamakan dari topik aktual Anda. Atau Anda mungkin ingin menambahkan pesan gagal ke topik yang sama sehingga akan diambil kemudian setelah semua pesan lainnya dikonsumsi.
Jika pemrosesan pesan apa pun berhasil dalam 5 percobaan tersebut, Anda dapat melompat ke pesan berikutnya dalam antrian.
sumber