Bagaimana cara kerja pola pengganggu LMAX?

205

Saya mencoba memahami pola pengganggu . Saya telah menonton video InfoQ dan mencoba membaca makalah mereka. Saya mengerti ada buffer cincin yang terlibat, yang diinisialisasi sebagai array yang sangat besar untuk mengambil keuntungan dari cache lokalitas, menghilangkan alokasi memori baru.

Kedengarannya seperti ada satu atau lebih bilangan bulat atom yang melacak posisi. Setiap 'peristiwa' tampaknya mendapatkan id unik dan posisinya di atas ring ditemukan dengan menemukan modulusnya sehubungan dengan ukuran cincin, dll., Dll.

Sayangnya, saya tidak memiliki perasaan intuitif tentang cara kerjanya. Saya telah melakukan banyak aplikasi perdagangan dan mempelajari model aktor , melihat SEDA, dll.

Dalam presentasi mereka, mereka menyebutkan bahwa pola ini pada dasarnya adalah bagaimana router bekerja; Namun saya belum menemukan deskripsi yang baik tentang bagaimana router bekerja.

Apakah ada petunjuk yang bagus untuk penjelasan yang lebih baik?

Shahbaz
sumber

Jawaban:

210

Proyek Google Code mereferensikan sebuah makalah teknis tentang implementasi buffer cincin, namun agak kering, bersifat akademis, dan sulit bagi seseorang yang ingin mempelajari cara kerjanya. Namun ada beberapa posting blog yang sudah mulai menjelaskan internal dengan cara yang lebih mudah dibaca. Ada penjelasan tentang buffer cincin yang merupakan inti dari pola pengganggu, deskripsi hambatan konsumen (bagian yang terkait dengan membaca dari pengganggu) dan beberapa informasi tentang penanganan beberapa produsen yang tersedia.

Deskripsi paling sederhana dari Disruptor adalah: Ini adalah cara mengirim pesan di antara utas dengan cara yang seefisien mungkin. Ini dapat digunakan sebagai alternatif dari antrian, tetapi juga berbagi sejumlah fitur dengan SEDA dan Aktor.

Dibandingkan dengan Antrian:

Disruptor menyediakan kemampuan untuk mengirimkan pesan ke utas lainnya, membangunkannya jika diperlukan (mirip dengan BlockingQueue). Namun, ada 3 perbedaan yang berbeda.

  1. Pengguna Disruptor mendefinisikan bagaimana pesan disimpan dengan memperluas kelas Entry dan menyediakan pabrik untuk melakukan preallokasi. Ini memungkinkan penggunaan kembali memori (penyalinan) atau Entri dapat berisi referensi ke objek lain.
  2. Menempatkan pesan ke dalam Disruptor adalah proses 2 fase, pertama slot diklaim dalam buffer cincin, yang menyediakan pengguna dengan Entri yang dapat diisi dengan data yang sesuai. Maka entri harus dilakukan, pendekatan 2 fase ini diperlukan untuk memungkinkan penggunaan memori yang fleksibel yang disebutkan di atas. Komitlah yang membuat pesan dapat dilihat oleh utas konsumen.
  3. Merupakan tanggung jawab konsumen untuk melacak pesan yang telah dikonsumsi dari buffer cincin. Memindahkan tanggung jawab ini dari buffer cincin itu sendiri membantu mengurangi jumlah pertikaian penulisan karena setiap utas mempertahankan penghitungnya sendiri.

Dibandingkan dengan Aktor

Model Actor lebih dekat dengan Disruptor daripada kebanyakan model pemrograman lainnya, terutama jika Anda menggunakan kelas BatchConsumer / BatchHandler yang disediakan. Kelas-kelas ini menyembunyikan semua kerumitan mempertahankan nomor urut yang dikonsumsi dan memberikan satu set panggilan balik sederhana ketika peristiwa-peristiwa penting terjadi. Namun, ada beberapa perbedaan yang tidak kentara.

  1. Disruptor menggunakan model konsumen 1 utas - 1, di mana Aktor menggunakan model N: M yaitu Anda dapat memiliki sebanyak mungkin aktor yang Anda inginkan dan mereka akan didistribusikan di sejumlah utas (biasanya 1 per inti).
  2. Antarmuka BatchHandler menyediakan panggilan balik tambahan (dan sangat penting) onEndOfBatch(). Hal ini memungkinkan untuk konsumen yang lambat, misalnya mereka yang melakukan I / O untuk mengumpulkan acara bersama untuk meningkatkan throughput. Dimungkinkan untuk melakukan batching dalam kerangka Actor lain, namun karena hampir semua kerangka kerja lainnya tidak memberikan panggilan balik pada akhir batch Anda perlu menggunakan batas waktu untuk menentukan akhir batch, sehingga latensi yang buruk.

Dibandingkan dengan SEDA

LMAX membangun pola Disruptor untuk menggantikan pendekatan berbasis SEDA.

  1. Peningkatan utama yang diberikannya pada SEDA adalah kemampuan untuk melakukan pekerjaan secara paralel. Untuk melakukan ini, Disruptor mendukung multi-casting pesan yang sama (dalam urutan yang sama) ke banyak konsumen. Ini menghindari perlunya fork stage dalam pipeline.
  2. Kami juga memungkinkan konsumen untuk menunggu hasil dari konsumen lain tanpa harus menempatkan tahap antrian di antara mereka. Seorang konsumen dapat dengan mudah melihat nomor urut konsumen yang menjadi sandarannya. Ini menghindari kebutuhan untuk bergabung dalam tahapan dalam pipeline.

Dibandingkan dengan Memory Barriers

Cara lain untuk memikirkannya adalah sebagai penghalang memori terstruktur dan teratur. Di mana penghalang produsen membentuk penghalang tulis dan penghalang konsumen adalah penghalang baca.

Michael Barker
sumber
1
Terima kasih Michael. Tulisan dan tautan yang Anda berikan telah membantu saya memahami cara kerjanya. Sisanya, saya pikir saya hanya perlu membiarkannya meresap.
Shahbaz
Saya masih memiliki pertanyaan: (1) bagaimana cara kerja 'komit'? (2) Ketika buffer cincin penuh, bagaimana produsen mendeteksi bahwa semua konsumen telah melihat data sehingga produsen dapat menggunakan kembali entri?
Qwertie
@ Kwertie, mungkin layak memposting pertanyaan baru.
Michael Barker
1
Bukankah seharusnya kalimat pertama dari poin terakhir (angka 2) di bawah Dibandingkan dengan SEDA alih-alih membaca "Kami juga memungkinkan konsumen untuk menunggu hasil dari konsumen lain dengan harus meletakkan tahap antrian lain di antara mereka" membaca "Kami juga mengizinkan konsumen menunggu hasil dari konsumen lain tanpa harus meletakkan tahap antrian lain di antara mereka "(mis." dengan "harus diganti dengan" tanpa ")?
runeks
@ runeks, ya seharusnya begitu.
Michael Barker
135

Pertama-tama kami ingin memahami model pemrograman yang ditawarkannya.

Ada satu atau lebih penulis. Ada satu atau lebih pembaca. Ada garis entri, benar-benar dipesan dari yang lama ke yang baru (digambarkan sebagai kiri ke kanan). Penulis dapat menambahkan entri baru di ujung kanan. Setiap pembaca membaca entri secara berurutan dari kiri ke kanan. Pembaca tidak dapat membaca penulis masa lalu, jelas.

Tidak ada konsep penghapusan entri. Saya menggunakan "pembaca" alih-alih "konsumen" untuk menghindari gambar entri yang dikonsumsi. Namun kami memahami bahwa entri di sebelah kiri pembaca terakhir menjadi tidak berguna.

Umumnya pembaca dapat membaca secara bersamaan dan mandiri. Namun kami dapat mendeklarasikan ketergantungan di antara para pembaca. Ketergantungan pembaca dapat berupa grafik asiklik yang berubah-ubah. Jika pembaca B tergantung pada pembaca A, pembaca B tidak dapat membaca pembaca sebelumnya A.

Ketergantungan pembaca muncul karena pembaca A dapat membuat anotasi entri, dan pembaca B tergantung pada anotasi itu. Misalnya, A melakukan perhitungan pada entri, dan menyimpan hasilnya di bidang adalam entri. A kemudian pindah, dan sekarang B dapat membaca entri, dan nilai aA disimpan. Jika pembaca C tidak bergantung pada A, C seharusnya tidak mencoba membaca a.

Ini memang model pemrograman yang menarik. Terlepas dari kinerjanya, model itu sendiri dapat menguntungkan banyak aplikasi.

Tentu saja, tujuan utama LMAX adalah kinerja. Ini menggunakan ring entri yang dialokasikan sebelumnya. Cincin itu cukup besar, tetapi dibatasi sehingga sistem tidak akan dimuat melebihi kapasitas desain. Jika cincin penuh, penulis akan menunggu hingga pembaca yang paling lambat maju dan memberi ruang.

Objek yang masuk telah dialokasikan sebelumnya dan hidup selamanya, untuk mengurangi biaya pengumpulan sampah. Kami tidak memasukkan objek entri baru atau menghapus objek entri lama, sebagai gantinya, seorang penulis meminta entri yang sudah ada sebelumnya, mengisi bidangnya, dan memberi tahu pembaca. Tindakan 2 fase yang tampak ini sebenarnya hanyalah tindakan atom

setNewEntry(EntryPopulator);

interface EntryPopulator{ void populate(Entry existingEntry); }

Pra-alokasi entri juga berarti entri yang berdekatan (sangat mungkin) terletak di sel memori yang berdekatan, dan karena pembaca membaca entri secara berurutan, ini penting untuk memanfaatkan cache CPU.

Dan banyak upaya untuk menghindari penguncian, CAS, bahkan penghalang memori (misalnya menggunakan variabel urutan non-volatil jika hanya ada satu penulis)

Untuk pengembang pembaca: Pembaca anotasi yang berbeda harus menulis ke bidang yang berbeda, untuk menghindari pertikaian penulisan. (Sebenarnya mereka harus menulis ke baris cache yang berbeda.) Pembaca yang memberi penjelasan tidak boleh menyentuh apa pun yang mungkin dibaca oleh pembaca yang tidak bergantung. Inilah sebabnya saya mengatakan bahwa para pembaca ini memberi anotasi pada entri, alih-alih memodifikasi entri.

tak dapat disangkal
sumber
2
Terlihat oke untuk saya. Saya suka menggunakan istilah anotasi.
Michael Barker
21
+1 ini adalah satu-satunya jawaban yang mencoba menggambarkan bagaimana pola pengganggu sebenarnya bekerja, seperti yang ditanyakan OP.
G-Wiz
1
Jika cincin penuh, penulis akan menunggu hingga pembaca yang paling lambat maju dan memberi ruang. - satu masalah dengan antrian FIFO yang dalam adalah membuat mereka terlalu mudah penuh di bawah beban, karena mereka tidak benar-benar mencoba tekanan balik sampai mereka diisi dan latensi sudah tinggi.
bestsss
1
@irreputable Bisakah Anda juga menulis penjelasan serupa untuk sisi penulis?
Buchi
Saya menyukainya tetapi saya menemukan ini "seorang penulis meminta entri yang sudah ada sebelumnya, mengisi bidangnya, dan memberi tahu pembaca. Tindakan 2 fase ini sebenarnya hanya sebuah tindakan atom" yang membingungkan dan mungkin salah? Tidak ada "beri tahu" kan? Juga bukan atom, itu hanya satu tulisan yang efektif / terlihat, benar? Jawaban bagus hanya bahasa yang ambigu?
Selamat
41

Martin Fowler telah menulis artikel tentang LMAX dan pola pengganggu, Arsitektur LMAX , yang dapat memperjelasnya lebih lanjut.

Membuang
sumber
17

Saya benar-benar meluangkan waktu untuk mempelajari sumber yang sebenarnya, karena penasaran belaka, dan ide di baliknya cukup sederhana. Versi terbaru pada saat menulis posting ini adalah 3.2.1.

Ada buffer yang menyimpan acara yang dialokasikan sebelumnya yang akan menyimpan data untuk dibaca konsumen.

Buffer didukung oleh array flags (integer array) dengan panjangnya yang menggambarkan ketersediaan slot buffer (lihat lebih lanjut untuk detailnya). Array diakses seperti java # AtomicIntegerArray, jadi untuk tujuan penjelasan ini Anda juga dapat menganggapnya sebagai satu.

Bisa ada sejumlah produsen. Ketika produsen ingin menulis ke buffer, nomor yang panjang dihasilkan (seperti dalam memanggil AtomicLong # getAndIncrement, Disruptor sebenarnya menggunakan implementasinya sendiri, tetapi ia bekerja dengan cara yang sama). Sebut saja ini lama dihasilkan producerCallId. Dengan cara yang sama, consumerCallId dihasilkan ketika konsumen SELESAI membaca slot dari buffer. ConsumerCallId terbaru diakses.

(Jika ada banyak konsumen, panggilan dengan id terendah dipilih.)

Id ini kemudian dibandingkan, dan jika perbedaan antara keduanya lebih kecil dari sisi buffer, produser diperbolehkan untuk menulis.

(Jika producerCallId lebih besar dari consumerCallId + bufferSize baru-baru ini, itu berarti buffer sudah penuh, dan produser dipaksa untuk menunggu bus sampai suatu tempat tersedia.)

Produser kemudian diberi slot dalam buffer berdasarkan callId-nya (yang merupakan modulo bufferSize prducerCallId, tetapi karena bufferSize selalu memiliki kekuatan 2 (batas yang diberlakukan pada pembuatan buffer), operasi sebenarnya yang digunakan adalah producerCallId & (bufferSize - 1 )). Maka bebas untuk mengubah acara di slot itu.

(Algoritme yang sebenarnya sedikit lebih rumit, yang melibatkan caching consumerId baru-baru ini dalam referensi atom terpisah, untuk keperluan optimasi.)

Ketika acara itu diubah, perubahan itu "diterbitkan". Saat menerbitkan slot masing-masing dalam array bendera diisi dengan bendera yang diperbarui. Nilai flag adalah jumlah loop (producerCallId dibagi dengan bufferSize (sekali lagi karena bufferSize adalah kekuatan 2, operasi sebenarnya adalah pergeseran yang benar).

Dengan cara yang sama bisa ada sejumlah konsumen. Setiap kali konsumen ingin mengakses buffer, consumerCallId dihasilkan (tergantung pada bagaimana konsumen ditambahkan ke pengganggu atom yang digunakan dalam generasi id dapat dibagi atau terpisah untuk masing-masing). ConsumerCallId ini kemudian dibandingkan denganCallId producent terbaru, dan jika lebih rendah dari keduanya, pembaca diperbolehkan untuk maju.

(Demikian pula jika producerCallId bahkan ke consumerCallId, itu berarti bahwa buffer itu kosong dan konsumen dipaksa untuk menunggu. Cara menunggu didefinisikan oleh WaitStrategy selama pembuatan disruptor.)

Untuk konsumen individu (yang memiliki id generator sendiri), hal berikutnya yang diperiksa adalah kemampuan untuk mengkonsumsi batch. Slot dalam buffer diperiksa secara berurutan dari masing-masing ke consumerCallId (indeks ditentukan dengan cara yang sama seperti untuk produsen), ke masing-masing ke producerCallId baru-baru ini.

Mereka diperiksa dalam satu lingkaran dengan membandingkan nilai flag yang ditulis dalam array flag, dengan nilai flag yang dihasilkan untuk consumerCallId. Jika benderanya cocok, itu artinya produsen yang mengisi slot telah melakukan perubahan. Jika tidak, loop rusak, dan changeId berkomitmen tertinggi dikembalikan. Slot dari ConsumerCallId hingga diterima di changeId dapat dikonsumsi secara batch.

Jika sekelompok konsumen membaca bersama (yang memiliki id generator bersama), masing-masing hanya mengambil satu ID call, dan hanya slot untuk ID call tunggal yang diperiksa dan dikembalikan.

Martin A. Kwasowiec
sumber
7

Dari artikel ini :

Pola pengganggu adalah antrian batching didukung oleh array melingkar (yaitu penyangga cincin) diisi dengan objek transfer pra-dialokasikan yang menggunakan hambatan memori untuk menyinkronkan produsen dan konsumen melalui urutan.

Hambatan-memori agak sulit untuk dijelaskan dan blog Trisha telah melakukan upaya terbaik menurut pendapat saya dengan posting ini: http://mechanitis.blogspot.com/2011/08/dissecting-disruptor-why-its-so-fast. html

Tetapi jika Anda tidak ingin masuk ke detail tingkat rendah, Anda bisa tahu bahwa hambatan memori di Jawa diimplementasikan melalui volatilekata kunci atau melalui java.util.concurrent.AtomicLong. Urutan pola gangguan adalah AtomicLongdan dikomunikasikan bolak-balik antara produsen dan konsumen melalui hambatan memori, bukan kunci.

Saya merasa lebih mudah untuk memahami konsep melalui kode, jadi kode di bawah ini adalah helloworld sederhana dari CoralQueue , yang merupakan implementasi pola pengganggu yang dilakukan oleh CoralBlocks yang saya berafiliasi. Dalam kode di bawah ini Anda dapat melihat bagaimana pola pengganggu mengimplementasikan batching dan bagaimana buffer cincin (yaitu array melingkar) memungkinkan komunikasi bebas sampah antara dua utas:

package com.coralblocks.coralqueue.sample.queue;

import com.coralblocks.coralqueue.AtomicQueue;
import com.coralblocks.coralqueue.Queue;
import com.coralblocks.coralqueue.util.MutableLong;

public class Sample {

    public static void main(String[] args) throws InterruptedException {

        final Queue<MutableLong> queue = new AtomicQueue<MutableLong>(1024, MutableLong.class);

        Thread consumer = new Thread() {

            @Override
            public void run() {

                boolean running = true;

                while(running) {
                    long avail;
                    while((avail = queue.availableToPoll()) == 0); // busy spin
                    for(int i = 0; i < avail; i++) {
                        MutableLong ml = queue.poll();
                        if (ml.get() == -1) {
                            running = false;
                        } else {
                            System.out.println(ml.get());
                        }
                    }
                    queue.donePolling();
                }
            }

        };

        consumer.start();

        MutableLong ml;

        for(int i = 0; i < 10; i++) {
            while((ml = queue.nextToDispatch()) == null); // busy spin
            ml.set(System.nanoTime());
            queue.flush();
        }

        // send a message to stop consumer...
        while((ml = queue.nextToDispatch()) == null); // busy spin
        ml.set(-1);
        queue.flush();

        consumer.join(); // wait for the consumer thread to die...
    }
}
rdalmeida
sumber