RxJS: Bagaimana saya memperbarui Observable secara "manual"?

146

Saya pikir saya pasti salah paham tentang sesuatu yang fundamental, karena dalam pikiran saya ini harus menjadi kasus paling mendasar untuk diamati, tetapi untuk kehidupan saya, saya tidak tahu bagaimana melakukannya dari dokumen.

Pada dasarnya, saya ingin bisa melakukan ini:

// create a dummy observable, which I would update manually
var eventObservable = rx.Observable.create(function(observer){});
var observer = eventObservable.subscribe(
   function(x){
     console.log('next: ' + x);
   }
...
var my_function = function(){
  eventObservable.push('foo'); 
  //'push' adds an event to the datastream, the observer gets it and prints 
  // next: foo
}

Tapi saya belum bisa menemukan metode seperti itu push. Saya menggunakan ini untuk click handler, dan saya tahu mereka memilikinya Observable.fromEvent, tetapi saya mencoba menggunakannya dengan React dan saya lebih suka dapat memperbarui datastream dalam callback, daripada menggunakan yang sama sekali berbeda sistem penanganan acara. Jadi pada dasarnya saya menginginkan ini:

$( "#target" ).click(function(e) {
  eventObservable.push(e.target.text()); 
});

Yang paling dekat yang saya dapatkan adalah menggunakan observer.onNext('foo'), tetapi itu tampaknya tidak benar-benar berfungsi dan itu disebut pengamat, yang tampaknya tidak benar. Pengamat seharusnya yang bereaksi terhadap aliran data, bukan mengubahnya, bukan?

Apakah saya hanya tidak memahami hubungan pengamat / yang dapat diamati?

LiamD
sumber
Lihat ini untuk mengklarifikasi ide Anda (Pengantar Pemrograman Reaktif yang Anda lewatkan): gist.github.com/staltz/868e7e9bc2a7b8c1f754 . Di sini juga ada banyak sumber daya yang dapat Anda gunakan untuk meningkatkan pemahaman Anda: github.com/Reactive-Extensions/RxJS#resources
user3743222
Saya telah memeriksa yang pertama, sepertinya sumber daya yang solid. Yang kedua adalah daftar yang bagus, di atasnya saya menemukan aaronstacy.com/writings/reactive-programming-and-mvc yang membantu saya menemukan Rx.Subject, yang memecahkan masalah saya. Jadi terima kasih! Setelah saya menulis lebih banyak aplikasi, saya akan memposting solusi saya, hanya ingin sedikit menguji pertarungan.
LiamD
Hehe, terima kasih banyak telah menanyakan pertanyaan ini, saya akan menanyakan pertanyaan yang sama dengan contoh kode yang sama :-)
arturh

Jawaban:

157

Di RX, Observer dan Observable adalah entitas yang berbeda. Seorang pengamat berlangganan Observable. Sebuah Observable memancarkan item ke pengamatnya dengan memanggil metode pengamat. Jika Anda perlu memanggil metode pengamat di luar ruang lingkup Observable.create()Anda dapat menggunakan Subjek, yang merupakan proxy yang bertindak sebagai pengamat dan dapat diamati pada saat yang sama.

Anda bisa melakukan seperti ini:

var eventStream = new Rx.Subject();

var subscription = eventStream.subscribe(
   function (x) {
        console.log('Next: ' + x);
    },
    function (err) {
        console.log('Error: ' + err);
    },
    function () {
        console.log('Completed');
    });

var my_function = function() {
  eventStream.next('foo'); 
}

Anda dapat menemukan informasi lebih lanjut tentang subjek di sini:

luisgabriel.dll
sumber
1
Ini sebenarnya yang akhirnya saya lakukan! Saya terus mengerjakannya untuk melihat apakah saya dapat menemukan cara yang lebih baik untuk melakukan apa yang perlu saya lakukan, tetapi ini jelas merupakan solusi yang layak. Saya melihatnya pertama kali di artikel ini: aaronstacy.com/writings/reactive-programming-and-mvc .
LiamD
37

Saya percaya Observable.create()tidak mengambil pengamat sebagai param panggilan balik tetapi emitor. Jadi jika Anda ingin menambahkan nilai baru ke Observable Anda, coba ini sebagai gantinya:

var emitter;
var observable = Rx.Observable.create(e => emitter = e);
var observer = {
  next: function(next) {
    console.log(next);
  },
  error: function(error) {
    console.log(error);
  },
  complete: function() {
    console.log("done");
  }
}
observable.subscribe(observer);
emitter.next('foo');
emitter.next('bar');
emitter.next('baz');
emitter.complete();

//console output
//"foo"
//"bar"
//"baz"
//"done"

Ya, Subjek membuatnya lebih mudah, menyediakan Observable dan Observer dalam objek yang sama, tetapi tidak persis sama, karena Subjek memungkinkan Anda untuk mendaftarkan beberapa pengamat ke observable yang sama ketika sebuah observable hanya mengirim data ke pengamat berlangganan terakhir, jadi gunakan secara sadar . Ini JsBin jika Anda ingin mengotak- atiknya .

theFreedomPisang
sumber
apakah kemungkinan penimpaan properti emitter didokumentasikan di suatu tempat di manual RxJS?
Tomas
Dalam hal ini emitterhanya next()nilai baru untuk pengamat yang berlangganan terakhir. Pendekatan yang lebih baik adalah mengumpulkan semua emitterdalam sebuah array dan mengulanginya semua dan nextnilai masing-masing
Eric Gopak