Apakah .NET's IObserver <T> dimaksudkan untuk berlangganan beberapa IObservables?

9

Ada IObservable dan IObserver interface di NET (juga di sini dan di sini ). Menariknya, implementasi konkret dari IObserver tidak memiliki referensi langsung ke IObservable. Tidak tahu dengan siapa berlangganannya. Itu hanya dapat memanggil unsubscriber. "Silakan tarik pin untuk berhenti berlangganan."

edit: The unsubscriber mengimplementasikan IDisposable. Saya pikir, skema ini digunakan untuk mencegah masalah pendengar yang murtad .

Namun, ada dua hal yang tidak sepenuhnya jelas bagi saya.

  1. Apakah kelas Unsubscriber dalam menyediakan perilaku berlangganan dan lupa? Siapa (dan kapan tepatnya) memanggil IDisposable.Dispose()Unsubscriber? Pengumpul sampah (GC) tidak deterministik.
    [Penafian: secara keseluruhan, saya menghabiskan lebih banyak waktu dengan C dan C ++ daripada dengan C #.]
  2. Apa yang harus terjadi jika saya ingin berlangganan pengamat K ke L1 yang dapat diamati dan pengamat sudah berlangganan beberapa L2 yang dapat diamati?

    K.Subscribe(L1);
    K.Subscribe(L2);
    K.Unsubscribe();
    L1.PublishObservation(1003);
    L2.PublishObservation(1004);
    

    Ketika saya menjalankan kode tes ini terhadap contoh MSDN, pengamat tetap berlangganan L1. Ini akan menjadi khas dalam pengembangan nyata. Secara potensial, ada 3 jalan untuk meningkatkan ini:

    • Jika pengamat sudah memiliki contoh unsubscriber (yaitu sudah berlangganan), maka diam-diam berhenti berlangganan dari penyedia asli sebelum berlangganan yang baru. Pendekatan ini menyembunyikan fakta bahwa itu tidak lagi berlangganan ke penyedia asli, yang mungkin akan mengejutkan nanti.
    • Jika pengamat sudah memiliki contoh unsubscriber, maka dilemparkan pengecualian. Kode panggilan yang baik harus berhenti berlangganan pengamat secara eksplisit.
    • Pengamat berlangganan beberapa penyedia. Ini adalah opsi yang paling menarik, tetapi bisakah ini diimplementasikan dengan IObservable dan IObserver? Ayo lihat. Adalah mungkin bagi pengamat untuk menyimpan daftar objek unsubscriber: satu untuk setiap sumber. Sayangnya, IObserver.OnComplete()tidak memberikan referensi kembali ke penyedia yang telah mengirimnya. Jadi, implementasi IObserver dengan beberapa penyedia tidak akan dapat menentukan dari yang berlangganan.
  3. Apakah .NET IObserver dimaksudkan untuk berlangganan beberapa IObservables?
    Apakah definisi buku teks dari pola pengamat mensyaratkan bahwa satu pengamat harus dapat berlangganan ke beberapa penyedia? Atau apakah itu opsional dan tergantung pada implementasi?

Nick Alexeev
sumber

Jawaban:

5

Kedua antarmuka sebenarnya adalah bagian dari Reactive Extensions (singkatnya Rx), Anda harus menggunakan perpustakaan itu cukup banyak setiap kali Anda ingin menggunakannya.

Antarmuka secara teknis dalam mscrolib, bukan di salah satu dari rx majelis. Saya pikir ini untuk memudahkan interoperabilitas: dengan cara ini, perpustakaan seperti TPL Dataflow dapat menyediakan anggota yang bekerja dengan antarmuka tersebut , tanpa benar-benar merujuk Rx.

Jika Anda menggunakan Rx's Subjectsebagai implementasi Anda IObservable, Subscribeakan mengembalikan IDisposableyang dapat digunakan untuk berhenti berlangganan:

var observable = new Subject<int>();

var unsubscriber =
    observable.Subscribe(Observer.Create<int>(i => Console.WriteLine("1: {0}", i)));
observable.Subscribe(Observer.Create<int>(i => Console.WriteLine("2: {0}", i)));

unsubscriber.Dispose();

observable.OnNext(1003);
observable.OnNext(1004);
svick
sumber
5

Hanya untuk menjernihkan beberapa hal yang didokumentasikan dengan baik dalam Pedoman Desain Rx resmi dan panjang lebar di situs web saya IntroToRx.com :

  • Anda tidak bergantung pada GC untuk membersihkan langganan Anda. Tercakup secara rinci di sini
  • Tidak ada Unsubscribemetode. Anda berlangganan urutan yang dapat diamati dan diberikan langganan . Anda kemudian dapat membuang langganan yang menunjukkan bahwa Anda tidak lagi ingin panggilan balik dipanggil.
  • Urutan yang dapat diamati tidak dapat diselesaikan lebih dari sekali (lihat bagian 4 Pedoman Desain Rx).
  • Ada banyak cara untuk mengkonsumsi beberapa urutan yang dapat diamati. Ada juga banyak informasi mengenai itu di Reactivex.io dan lagi di IntroToRx.

Untuk lebih spesifik dan menjawab pertanyaan asli secara langsung, penggunaan Anda kembali ke depan. Anda tidak mendorong banyak urutan yang dapat diamati ke dalam satu pengamat. Anda menyusun urutan yang dapat diamati menjadi satu urutan yang dapat diamati. Anda kemudian berlangganan urutan tunggal itu.

Dari pada

K.Subscribe(L1);
K.Subscribe(L2);
K.Unsubscribe();
L1.PublishObservation(1003);
L2.PublishObservation(1004);

Yang hanya kode semu dan tidak akan berfungsi dalam .NET implementasi Rx, Anda harus melakukan hal berikut:

var source1 = new Subject<int>(); //was L1
var source2 = new Subject<int>(); //was L2

var subscription = source1
    .Merge(source2)
    .Subscribe(value=>Console.WriteLine("OnNext({0})", value));


source1.OnNext(1003);
source2.OnNext(1004);

subscription.Dispose();

Sekarang ini tidak sesuai dengan pertanyaan awal, tetapi saya tidak tahu apa K.Unsubscribe()yang seharusnya dilakukan (berhenti berlangganan, berlangganan terakhir atau pertama ?!)

Lee Campbell
sumber
Bisakah saya cukup membungkus objek berlangganan di blok "menggunakan"?
Robert Oschler
1
Dalam kasus sinkron ini Anda bisa, namun Rx seharusnya asinkron. Dalam kasus asinkron, Anda biasanya tidak dapat menggunakan usingblok. Biaya untuk pernyataan berlangganan harus hampir nol, sehingga Anda akan neter blok menggunakan, berlangganan, meninggalkan blok menggunakan (sehingga berhenti berlangganan) membuat kode agak tidak berguna
Lee Campbell
3

Kamu benar. Contoh ini bekerja buruk untuk beberapa IObservables.

Saya kira OnComplete () tidak memberikan referensi kembali karena mereka tidak ingin IObservable harus menyimpannya. Jika saya menulis bahwa saya mungkin akan mendukung beberapa langganan dengan meminta Langganan mengambil pengidentifikasi sebagai parameter kedua, yang akan diteruskan kembali ke panggilan OnComplete (). Jadi bisa dibilang begitu

K.Subscribe(L1,"L1")
K.Subscribe(L2,"L2")
K.Unsubscribe("L1")

Seperti berdiri, tampaknya. NET IObserver tidak cocok untuk banyak pengamat. Tapi saya kira objek utama Anda (LocationReporter dalam contoh) bisa

public Dictionary<String,IObserver> Observers;

dan itu akan memungkinkan Anda untuk mendukung

K.Subscribe(L1,"L1")
K.Subscribe(L2,"L2")
K.Unsubscribe("L1")

demikian juga.

Saya kira Microsoft dapat berargumen bahwa oleh karena itu tidak perlu bagi mereka untuk secara langsung mendukung beberapa IObservables di antarmuka.

psr
sumber
Saya juga berpikir bahwa implementasi yang dapat diamati dapat memiliki daftar pengamat. Saya juga memperhatikan bahwa IObserver.OnComplete()tidak mengidentifikasi dari siapa panggilan itu berasal. Jika pengamat berlangganan lebih dari satu yang dapat diamati, maka tidak tahu dari siapa harus berhenti berlangganan. Antiklimaks. Saya bertanya-tanya, apakah .NET memiliki antarmuka yang lebih baik untuk pola pengamat?
Nick Alexeev
Jika Anda ingin memiliki referensi ke sesuatu, Anda sebenarnya harus menggunakan referensi, bukan string.
svick
Jawaban ini membantu saya dengan bug di kehidupan nyata. Saya menggunakan Observable.Create()untuk membangun yang diamati, dan merantai beberapa sumber yang dapat diamati menggunakan itu Subscribe(). Saya secara tidak sengaja melewati pengamatan yang dapat diamati dalam satu jalur kode. Ini melengkapi pengamatan saya yang baru dibuat, meskipun sumber-sumber lain tidak lengkap. Butuh waktu lama untuk bekerja di luar apa yang perlu saya lakukan - saklar Observable.Empty()untuk Observable.Never().
Olly
0

Saya tahu ini adalah cara terlambat ke pesta, tapi ...

Antarmuka saya Observable<T>dan IObserver<T>yang tidak bagian dari Rx ... mereka jenis inti sedang ... tapi Rx membuat ekstensif menggunakan mereka.

Anda bebas untuk memiliki sebanyak (atau sesedikit) pengamat sesuka Anda. Jika Anda mengantisipasi beberapa pengamat, adalah tanggung jawab yang dapat diamati untuk merutekan OnNext()panggilan ke pengamat yang tepat untuk setiap peristiwa yang diamati. Yang diamati mungkin memerlukan daftar atau kamus seperti yang Anda sarankan.

Ada beberapa kasus yang baik untuk memungkinkan hanya satu - dan kasus yang baik untuk memungkinkan banyak. Misalnya, dalam implementasi CQRS / ES, Anda mungkin menerapkan penangan perintah tunggal per jenis perintah pada bus perintah, sementara Anda mungkin memberi tahu beberapa transformasi sisi-baca untuk peristiwa tertentu jenis di toko peristiwa.

Sebagaimana dinyatakan dalam jawaban lain, tidak ada Unsubscribe. Membuang benda yang Anda berikan saat Subscribeumumnya melakukan pekerjaan kotor. Pengamat, atau agennya, bertanggung jawab untuk memegang token sampai tidak lagi ingin menerima pemberitahuan lebih lanjut . (pertanyaan 1)

Jadi, dalam contoh Anda:

K.Subscribe(L1);
K.Subscribe(L2);
K.Unsubscribe();
L1.PublishObservation(1003);
L2.PublishObservation(1004);

... akan lebih seperti:

using ( var l1Token = K.Subscribe( L1 ) )
{
  using ( var l2Token = K.Subscribe( L2 );
  {
    L1.PublishObservation( 1003 );
    L2.PublishObservation( 1004 );
  } //--> effectively unsubscribing to L2 here

  L2.PublishObservation( 1005 );
}

... di mana K akan mendengar 1003 dan 1004 tetapi tidak 1005.

Bagi saya, ini masih terlihat lucu karena secara nominal, berlangganan adalah hal yang berumur panjang ... sering selama durasi program. Mereka tidak berbeda dalam hal ini dengan acara .Net normal.

Dalam banyak contoh yang saya lihat, Disposetoken tidak bekerja untuk menghilangkan pengamat dari daftar pengamat yang diamati. Saya lebih suka bahwa token tidak membawa begitu banyak pengetahuan di sekitar ... dan jadi saya telah menggeneralisasi token berlangganan saya hanya memanggil lambda yang lewat (dengan mengidentifikasi informasi yang ditangkap pada waktu berlangganan:

public class SubscriptionToken<T>: IDisposable
{
  private readonly Action unsubscribe;

  private SubscriptionToken( ) { }
  public SubscriptionToken( Action unsubscribe )
  {
    this.unsubscribe = unsubscribe;
  }

  public void Dispose( )
  {
    unsubscribe( );
  }
}

... dan yang dapat diamati dapat menginstal perilaku berhenti berlangganan selama berlangganan:

IDisposable Subscribe<T>( IObserver<T> observer )
{
  var subscriberId = Guid.NewGuid( );
  subscribers.Add( subscriberId, observer );

  return new SubscriptionToken<T>
  (
    ( ) =>
    subscribers.Remove( subscriberId );
  );
}

Jika pengamat Anda menangkap peristiwa dari beberapa yang dapat diamati, Anda mungkin ingin memastikan bahwa ada semacam informasi korelasi dalam acara itu sendiri ... seperti .Net sender. Terserah Anda apakah itu penting atau tidak. Itu tidak dipanggang, seperti yang Anda alasankan dengan benar. (pertanyaan 3)

Tanah liat
sumber