Bagaimana saya bisa `menunggu` di Rx Observable?

107

Saya ingin bisa menunggu yang bisa diamati, misalnya

const source = Rx.Observable.create(/* ... */)
//...
await source;

Upaya naif menghasilkan menunggu penyelesaian segera dan tidak memblokir eksekusi

Edit: Pseudocode untuk kasus penggunaan lengkap saya adalah:

if (condition) {
  await observable;
}
// a bunch of other code

Saya mengerti bahwa saya dapat memindahkan kode lain ke fungsi lain yang terpisah dan meneruskannya ke panggilan balik berlangganan, tetapi saya berharap dapat menghindarinya.

MurahSteaks
sumber
Bisakah Anda tidak memindahkan kode yang tersisa (yang Anda ingin menunggu sumbernya) ke dalam .subscribe()pemanggilan metode?
StriplingWarrior

Jawaban:

133

Anda harus berjanji await. Ubah acara berikutnya yang dapat diamati menjadi janji dan tunggu itu.

if (condition) {
  await observable.first().toPromise();
}

Edit catatan: Jawaban ini awalnya menggunakan .take (1) tetapi diubah menjadi menggunakan .first () yang menghindari masalah Promise yang tidak pernah terselesaikan jika streaming berakhir sebelum nilai muncul.

Macil
sumber
3
Daripada mengambil (1), bisakah Anda menggunakan await observable.first().toPromise();?
apricity
14
@apricity Jika tidak ada nilai saat penyelesaian, first()akan mengakibatkan penolakan, dan take(1)akan mengakibatkan janji tertunda.
Estus Flask
6
@apricity @AgentME Sebenarnya Anda TIDAK boleh menggunakan baik take(1)atau first()dalam kasus seperti ini. Karena Anda mengharapkan tepat SATU peristiwa terjadi, Anda harus menggunakan single()yang akan memunculkan pengecualian jika ada lebih dari 1, sementara tidak melempar pengecualian jika tidak ada. Jika ada lebih dari satu kemungkinan ada sesuatu yang salah dalam kode / model data Anda dll. Jika Anda tidak menggunakan tunggal Anda akan berakhir dengan sewenang-wenang memilih item pertama yang kembali tanpa peringatan bahwa ada lebih banyak. Anda harus berhati-hati dalam predikat pada sumber data upstream untuk selalu menjaga urutan yang sama.
ntziolis
3
Jangan lupa impor:import 'rxjs/add/operator/first';
Stephanie
7
Sekarang toPromise () tidak digunakan lagi, bagaimana kita harus melakukan ini?
Jus10
26

Sepertinya harus begitu

await observable.first().toPromise();

Seperti yang dicatat dalam komentar sebelumnya, ada perbedaan substansial antara operator take(1)dan first()ketika ada observasi selesai kosong.

Observable.empty().first().toPromise()akan mengakibatkan penolakan dengan EmptyErroryang dapat ditangani sesuai, karena memang tidak ada nilainya.

Dan Observable.empty().take(1).toPromise()akan menghasilkan resolusi dengan undefinednilai.

Estus Flask
sumber
Sebenarnya tidaktake(1) akan menghasilkan janji yang menunggu. Itu akan menghasilkan janji yang diselesaikan dengan . undefined
Johan t Hart
Terima kasih telah memperhatikan, itu benar. Saya tidak yakin mengapa posnya berbeda, mungkin perilakunya berubah di beberapa titik.
Estus Flask
8

Anda perlu awaitjanji, jadi Anda ingin menggunakan toPromise(). Lihat ini untuk detail lebih lanjut tentang toPromise().

Josh Durham
sumber
4

Jika toPromisetidak berlaku lagi untuk Anda, Anda dapat menggunakan .pipe(take(1)).toPromisetetapi seperti yang Anda lihat di sini , ini tidak usang.

Jadi tolong gunakan saja toPromise(RxJs 6) seperti yang dikatakan:

//return basic observable
const sample = val => Rx.Observable.of(val).delay(5000);
//convert basic observable to promise
const example = sample('First Example')
  .toPromise()
  //output: 'First Example'
  .then(result => {
    console.log('From Promise:', result);
  });

async / await contoh:

//return basic observable
const sample = val => Rx.Observable.of(val).delay(5000);
//convert basic observable to promise
const example = await sample('First Example').toPromise()
// output: 'First Example'
console.log('From Promise:', result);

Baca lebih lanjut di sini .

Dan tolong hapus klaim yang salah ini mengatakan toPromisetidak berlaku lagi.

Emerica
sumber