Bagaimana cara mendengarkan perubahan pada koleksi MongoDB?

200

Saya membuat semacam sistem antrian pekerjaan latar belakang dengan MongoDB sebagai penyimpan data. Bagaimana saya bisa "mendengarkan" untuk memasukkan koleksi MongoDB sebelum menelurkan pekerja untuk memproses pekerjaan? Apakah saya perlu melakukan polling setiap beberapa detik untuk melihat apakah ada perubahan dari terakhir kali, atau apakah ada cara agar skrip saya dapat menunggu sampai terjadi insert? Ini adalah proyek PHP yang sedang saya kerjakan, tetapi jangan ragu untuk menjawab dalam bahasa Ruby atau bahasa agnostik.

Andrew
sumber
1
Ubah Aliran ditambahkan dalam MongoDB 3.6 untuk mengatasi skenario Anda. docs.mongodb.com/manual/changeStreams Juga jika Anda menggunakan MongoDB Atlas, Anda dapat memanfaatkan Stitch Triggers yang memungkinkan Anda untuk menjalankan fungsi sebagai respons untuk menyisipkan / memperbarui / menghapus / dll. docs.mongodb.com/stitch/triggers/overview Tidak perlu lagi menguraikan oplog.
Robert Walters

Jawaban:

111

Apa yang Anda pikirkan tentang pemicu suara sangat mirip. MongoDB tidak memiliki dukungan untuk pemicu, namun beberapa orang telah "menggulung sendiri" menggunakan beberapa trik. Kuncinya di sini adalah oplog.

Saat Anda menjalankan MongoDB dalam Kumpulan Replika, semua tindakan MongoDB dicatat ke log operasi (dikenal sebagai oplog). Oplog pada dasarnya hanyalah daftar modifikasi yang dibuat untuk data. Set Replika berfungsi dengan mendengarkan perubahan pada oplog ini dan kemudian menerapkan perubahan secara lokal.

Apakah ini terdengar familier?

Saya tidak dapat merinci seluruh proses di sini, ini beberapa halaman dokumentasi, tetapi alat yang Anda butuhkan tersedia.

Pertama beberapa artikel di oplog - Deskripsi singkat - Layout localkoleksi (yang berisi oplog)

Anda juga ingin memanfaatkan kursor yang tersedia . Ini akan memberi Anda cara untuk mendengarkan perubahan alih-alih memilihnya. Perhatikan bahwa replikasi menggunakan kursor yang tersedia, jadi ini adalah fitur yang didukung.

Gates VP
sumber
1
hmm ... tidak persis apa yang ada dalam pikiran saya. Saya hanya menjalankan satu instance pada saat ini (tidak ada budak). Jadi mungkin solusi yang lebih mendasar?
Andrew
17
Anda dapat memulai server dengan --replSetopsi dan itu akan membuat / mengisi oplog. Bahkan tanpa yang sekunder. Ini jelas merupakan satu-satunya cara untuk "mendengarkan" perubahan dalam DB.
Gates VP
2
Ini adalah deskripsi yang bagus bagaimana mengatur oplog untuk mencatat perubahan ke DB secara lokal: loosexaml.wordpress.com/2012/09/03/...
johndodo
Cooooool! Itu yang saya inginkan. Dan saya menemukan perpustakaan bernama 'mongo-oplog' di npm. So happy ~
pjincz
Saya setuju pada saat penulisan jawaban ini pemicu mungkin tidak tersedia tetapi untuk semua yang mendarat di sini, Ada pilihan yang tersedia sekarang, Lihat Stitch MongoDB ( docs.mongodb.com/stitch/#stitch ) & Stitch trigger ( docs. mongodb.com/stitch/triggers ) ..
whoami
102

MongoDB memiliki apa yang disebut capped collectionsdan tailable cursorsyang memungkinkan MongoDB untuk mendorong data ke pendengar.

A capped collectionpada dasarnya adalah kumpulan yang merupakan ukuran tetap dan hanya memungkinkan penyisipan. Seperti apa tampilannya membuatnya:

db.createCollection("messages", { capped: true, size: 100000000 })

Kursor MongoDB Tailable ( pos asli oleh Jonathan H. Wage )

Rubi

coll = db.collection('my_collection')
cursor = Mongo::Cursor.new(coll, :tailable => true)
loop do
  if doc = cursor.next_document
    puts doc
  else
    sleep 1
  end
end

PHP

$mongo = new Mongo();
$db = $mongo->selectDB('my_db')
$coll = $db->selectCollection('my_collection');
$cursor = $coll->find()->tailable(true);
while (true) {
    if ($cursor->hasNext()) {
        $doc = $cursor->getNext();
        print_r($doc);
    } else {
        sleep(1);
    }
}

Python (oleh Robert Stewart)

from pymongo import Connection
import time

db = Connection().my_db
coll = db.my_collection
cursor = coll.find(tailable=True)
while cursor.alive:
    try:
        doc = cursor.next()
        print doc
    except StopIteration:
        time.sleep(1)

Perl (oleh Max )

use 5.010;

use strict;
use warnings;
use MongoDB;

my $db = MongoDB::Connection->new;
my $coll = $db->my_db->my_collection;
my $cursor = $coll->find->tailable(1);
for (;;)
{
    if (defined(my $doc = $cursor->next))
    {
        say $doc;
    }
    else
    {
        sleep 1;
    }
}

Sumber daya tambahan:

Ruby / Node.js Tutorial yang menuntun Anda melalui membuat aplikasi yang mendengarkan sisipan dalam koleksi yang dibatasi MongoDB.

Artikel yang membahas tentang kursor yang tersedia lebih detail.

Contoh PHP, Ruby, Python, dan Perl menggunakan kursor yang tersedia.

Andrew
sumber
70
tidur 1? Betulkah? untuk kode produksi? bagaimana itu tidak polling?
rbp
2
@ rbp haha, saya tidak pernah mengatakan itu kode produksi, tapi Anda benar, tidur sebentar bukan praktik yang baik. Cukup yakin saya mendapat contoh itu dari tempat lain. Tidak yakin bagaimana cara mengubahnya.
Andrew
14
@ kroe karena detail yang tidak relevan itu akan dimasukkan ke dalam kode produksi oleh pemrogram baru yang mungkin tidak mengerti mengapa itu buruk.
Lele
3
Saya mengerti maksud Anda, tetapi mengharapkan beberapa programmer baru untuk menambahkan "sleep 1" ke produksi hampir menyinggung! Maksudku, aku tidak akan terkejut ... Tapi jika seseorang memproduksinya, setidaknya akan belajar dengan cara yang sulit dan selamanya .. hahaha
kroe
19
apa yang salah dengan melakukan time.sleep (1) dalam produksi?
Al Johri
44

Sejak MongoDB 3.6 akan ada API pemberitahuan baru yang disebut Ubah Aliran yang dapat Anda gunakan untuk ini. Lihat posting blog ini sebagai contoh . Contoh dari itu:

cursor = client.my_db.my_collection.changes([
    {'$match': {
        'operationType': {'$in': ['insert', 'replace']}
    }},
    {'$match': {
        'newDocument.n': {'$gte': 1}
    }}
])

# Loops forever.
for change in cursor:
    print(change['newDocument'])
Mitar
sumber
4
Mengapa? Bisakah Anda menguraikan? Ini cara standar sekarang?
Mitar
1
bagaimana? jangan gunakan jajak pendapat - Anda memerlukan pendekatan yang dilakukan alih-alih saat loop, dll.
Alexander Mills
3
Di mana Anda melihat pemungutan suara di sini?
Mitar
Saya pikir dia mengacu pada loop terakhir. Tapi saya pikir PyMongo hanya mendukung itu. Motor mungkin memiliki implementasi gaya pendengar asink / acara.
Shane Hsu
41

Lihat ini: Ubah Aliran

10 Januari 2018 - Rilis 3.6

* EDIT: Saya menulis artikel tentang bagaimana melakukan ini https://medium.com/riow/mongodb-data-collection-change-85b63d96ff76

https://docs.mongodb.com/v3.6/changeStreams/


Ini baru di mongodb 3.6 https://docs.mongodb.com/manual/release-notes/3.6/ 2018/01/10

$ mongod --version
db version v3.6.2

Untuk menggunakan changeStreams , basis data harus berupa Replication Set

Lebih lanjut tentang Set Replikasi: https://docs.mongodb.com/manual/replication/

Database Anda akan menjadi " Standalone " secara default.

Cara Mengubah Standalone menjadi Set Replika: https://docs.mongodb.com/manual/tutorial/convert-standalone-to-replica-set/


Berikut contoh adalah aplikasi praktis untuk bagaimana Anda dapat menggunakan ini.
* Khusus untuk Node.

/* file.js */
'use strict'


module.exports = function (
    app,
    io,
    User // Collection Name
) {
    // SET WATCH ON COLLECTION 
    const changeStream = User.watch();  

    // Socket Connection  
    io.on('connection', function (socket) {
        console.log('Connection!');

        // USERS - Change
        changeStream.on('change', function(change) {
            console.log('COLLECTION CHANGED');

            User.find({}, (err, data) => {
                if (err) throw err;

                if (data) {
                    // RESEND ALL USERS
                    socket.emit('users', data);
                }
            });
        });
    });
};
/* END - file.js */

Tautan yang berguna:
https://docs.mongodb.com/manual/tutorial/convert-standalone-to-replica-set
https://docs.mongodb.com/manual/tutorial/change-streams-example

https://docs.mongodb.com/v3.6/tutorial/change-streams-example
http://plusnconsulting.com/post/MongoDB-Change-Streams

Rio Weber
sumber
maaf tentang semua pengeditan, SO tidak menyukai "Tautan" saya (mengatakan bahwa kode itu diformat dengan tidak benar.)
Rio Weber
1
Anda tidak harus menanyakan database, saya pikir dengan menonton () atau serupa, data baru dapat dikirim ke server yang mendengarkan
Alexander Mills
22

MongoDB versi 3.6 sekarang termasuk aliran perubahan yang pada dasarnya adalah API di atas OpLog memungkinkan untuk kasus penggunaan yang memicu / pemberitahuan-seperti.

Berikut ini tautan ke contoh Java: http://mongodb.github.io/mongo-java-driver/3.6/driver/tutorials/change-streams/

Contoh NodeJS mungkin terlihat seperti:

 var MongoClient = require('mongodb').MongoClient;
    MongoClient.connect("mongodb://localhost:22000/MyStore?readConcern=majority")
     .then(function(client){
       let db = client.db('MyStore')

       let change_streams = db.collection('products').watch()
          change_streams.on('change', function(change){
            console.log(JSON.stringify(change));
          });
      });
Robert Walters
sumber
JSON.stringify sangat penting untuk menerima data ini di Android Studio (Aplikasi Android) ..
DragonFire
3

Atau, Anda bisa menggunakan metode Mongo FindAndUpdate standar, dan dalam panggilan balik, jalankan acara EventEmitter (dalam Node) saat panggilan balik dijalankan.

Setiap bagian lain dari aplikasi atau arsitektur yang mendengarkan acara ini akan diberitahu tentang pembaruan, dan semua data yang relevan dikirim juga. Ini adalah cara yang sangat sederhana untuk mendapatkan notifikasi dari Mongo.

Alex
sumber
ini sangat tidak efisien .. Anda mengunci db untuk setiap FindAndUpdate!
Yash Gupta
1
Dugaan saya adalah bahwa Alex menjawab pertanyaan yang sedikit berbeda (tidak secara khusus menangani penyisipan) tetapi terkait pertanyaan seperti cara melepaskan beberapa jenis pemberitahuan kepada klien ketika keadaan pekerjaan yang antri berubah yang kami anggap perlu terjadi ketika pekerjaan muncul. , berhasil atau gagal total. Dengan klien yang terhubung menggunakan soket web ke simpul, mereka semua dapat diberitahu tentang perubahan dengan acara siaran pada panggilan balik FIndAndUpdate yang dapat dipanggil saat menerima pesan perubahan status. Saya akan mengatakan bahwa ini tidak efisien karena pembaruan perlu dilakukan.
Peter Scott
3

Banyak dari jawaban ini hanya akan memberi Anda catatan baru dan bukan pembaruan dan / atau sangat tidak efisien

Satu-satunya cara yang andal dan performan untuk melakukan ini adalah dengan membuat kursor yang tersedia pada koleksi db: oplog.rs lokal untuk mendapatkan SEMUA perubahan pada MongoDB dan lakukan sesuai keinginan Anda. (MongoDB bahkan melakukan ini secara internal kurang lebih untuk mendukung replikasi!)

Penjelasan tentang isi oplog: https://www.compose.com/articles/the-mongodb-oplog-and-node-js/

Contoh perpustakaan Node.js yang menyediakan API tentang apa yang tersedia untuk dilakukan dengan oplog: https://github.com/cayasso/mongo-oplog

John Culviner
sumber
2

Ada satu set layanan luar biasa yang tersedia yang disebut Stitch MongoDB . Lihatlah ke fungsi / pemicu tusuk . Perhatikan ini adalah layanan berbayar berbasis cloud (AWS). Dalam kasus Anda, pada sebuah sisipan, Anda dapat memanggil fungsi khusus yang ditulis dalam javascript.

masukkan deskripsi gambar di sini

Manish Jain
sumber
stackoverflow.com/users/486867/manish-jain - apakah Anda memiliki contoh bagaimana cara menjahit dapat digunakan untuk memberi tahu aplikasi REACT bahwa data dimasukkan ke dalam tabel?
MLissCetrus
1

Ada contoh java yang berfungsi yang dapat ditemukan di sini .

 MongoClient mongoClient = new MongoClient();
    DBCollection coll = mongoClient.getDatabase("local").getCollection("oplog.rs");

    DBCursor cur = coll.find().sort(BasicDBObjectBuilder.start("$natural", 1).get())
            .addOption(Bytes.QUERYOPTION_TAILABLE | Bytes.QUERYOPTION_AWAITDATA);

    System.out.println("== open cursor ==");

    Runnable task = () -> {
        System.out.println("\tWaiting for events");
        while (cur.hasNext()) {
            DBObject obj = cur.next();
            System.out.println( obj );

        }
    };
    new Thread(task).start();

Kuncinya adalah PILIHAN QUERY yang diberikan di sini.

Anda juga dapat mengubah kueri temukan, jika Anda tidak perlu memuat semua data setiap saat.

BasicDBObject query= new BasicDBObject();
query.put("ts", new BasicDBObject("$gt", new BsonTimestamp(1471952088, 1))); //timestamp is within some range
query.put("op", "i"); //Only insert operation

DBCursor cur = coll.find(query).sort(BasicDBObjectBuilder.start("$natural", 1).get())
.addOption(Bytes.QUERYOPTION_TAILABLE | Bytes.QUERYOPTION_AWAITDATA);
Maleen Abewardana
sumber
1

Sebenarnya, alih-alih menonton output, mengapa Anda tidak mendapatkan pemberitahuan ketika sesuatu yang baru dimasukkan dengan menggunakan perangkat tengah yang disediakan oleh skema luwak

Anda dapat menangkap acara memasukkan dokumen baru dan melakukan sesuatu setelah penyisipan ini selesai

Duong Nguyen
sumber
Salahku. Maaf pak.
Duong Nguyen
0

Setelah 3,6 orang diperbolehkan menggunakan basis data, basis data berikut ini memicu jenis:

  • pemicu yang didorong oleh peristiwa - berguna untuk memperbarui dokumen terkait secara otomatis, memberi tahu layanan hilir, menyebarkan data untuk mendukung beragam beban kerja, integritas & audit data
  • pemicu terjadwal - berguna untuk pengambilan data terjadwal, propagasi, arsip, dan beban kerja analitik

Masuk ke akun Atlas Anda dan pilih Triggersantarmuka dan tambahkan pemicu baru:

masukkan deskripsi gambar di sini

Luaskan setiap bagian untuk lebih banyak pengaturan atau detail.

Gotqn
sumber