Gunakan case untuk penjadwal RxJava

253

Di RxJava ada 5 penjadwal berbeda untuk dipilih:

  1. direct () : Membuat dan mengembalikan Penjadwal yang menjalankan pekerjaan segera di utas saat ini.

  2. trampoline () : Membuat dan mengembalikan Penjadwal yang antrean bekerja di utas saat ini untuk dieksekusi setelah pekerjaan saat ini selesai.

  3. newThread () : Membuat dan mengembalikan Penjadwal yang membuat Thread baru untuk setiap unit kerja.

  4. perhitungan () : Membuat dan mengembalikan Penjadwal yang dimaksudkan untuk pekerjaan komputasi. Ini dapat digunakan untuk acara-loop, memproses panggilan balik dan pekerjaan komputasi lainnya. Jangan melakukan pekerjaan yang terikat IO pada penjadwal ini. Gunakan Penjadwal. io () sebagai gantinya.

  5. io () : Membuat dan mengembalikan Penjadwal yang ditujukan untuk pekerjaan yang terikat IO. Implementasi didukung oleh thread-pelaksana yang akan tumbuh sesuai kebutuhan. Ini dapat digunakan untuk melakukan pemblokiran IO secara asinkron. Jangan melakukan pekerjaan komputasi pada penjadwal ini. Gunakan Penjadwal. perhitungan () sebagai gantinya.

Pertanyaan:

3 penjadwal pertama cukup jelas; Namun, saya agak bingung tentang perhitungan dan io .

  1. Apa sebenarnya "pekerjaan terikat IO"? Apakah ini digunakan untuk berurusan dengan stream ( java.io) dan file ( java.nio.files)? Apakah ini digunakan untuk permintaan basis data? Apakah ini digunakan untuk mengunduh file atau mengakses REST API?
  2. Apa perbedaan komputasi () dengan newThread () ? Apakah itu semua perhitungan () panggilan berada di utas tunggal (latar belakang) alih-alih utas baru (latar belakang) setiap kali?
  3. Mengapa memanggil komputasi itu buruk () ketika melakukan IO bekerja buruk?
  4. Mengapa memanggil io () saat melakukan pekerjaan komputasi itu buruk ?
bcorso
sumber

Jawaban:

332

Pertanyaan bagus, saya pikir dokumentasi bisa dilakukan dengan lebih detail.

  1. io()didukung oleh thread-pool yang tidak terbatas dan merupakan jenis hal yang akan Anda gunakan untuk tugas-tugas non-komputasi intensif, yaitu hal-hal yang tidak banyak memuat CPU. Jadi ya interaksi dengan sistem file, interaksi dengan database atau layanan pada host yang berbeda adalah contoh yang baik.
  2. computation()didukung oleh thread-pool yang dibatasi dengan ukuran yang sama dengan jumlah prosesor yang tersedia. Jika Anda mencoba menjadwalkan pekerjaan intensif CPU secara paralel di lebih dari prosesor yang tersedia (katakanlah menggunakannewThread() ) maka Anda siap untuk overhead penciptaan thread dan konteks beralih overhead sebagai thread bersaing untuk prosesor dan berpotensi hit kinerja besar.
  3. Yang terbaik adalah pergi computation() pekerjaan intensif CPU hanya jika Anda tidak akan mendapatkan pemanfaatan CPU yang baik.
  4. Sangat buruk untuk memanggil io()pekerjaan komputasi karena alasan yang dibahas dalam 2. io()tidak terikat dan jika Anda menjadwalkan seribu tugas komputasi io()secara paralel maka masing-masing dari seribu tugas tersebut masing-masing akan memiliki utas masing-masing dan bersaing untuk CPU yang menimbulkan biaya penggantian konteks.
Dave Moten
sumber
5
Melalui keakraban dengan sumber RxJava. Itu adalah sumber kebingungan bagi saya untuk waktu yang lama dan saya pikir dokumentasi perlu ditingkatkan dalam hal ini.
Dave Moten
2
@IgorGanapolsky Saya menduga itu adalah sesuatu yang jarang ingin Anda lakukan. Membuat utas baru untuk setiap unit pekerjaan jarang kondusif bagi efisiensi karena utas mahal untuk dibangun dan diruntuhkan. Anda biasanya ingin menggunakan kembali utas yang perhitungan () dan penjadwal lainnya lakukan. Satu-satunya waktu newThread () mungkin memiliki penggunaan yang sah (setidaknya saya bisa memikirkan) adalah memulai tugas-tugas yang terisolasi, jarang, berjalan lama. Bahkan kemudian saya mungkin menggunakan io () untuk skenario itu.
tmn
4
Bisakah Anda menunjukkan contoh di mana trampolin () akan bermanfaat? Saya mengerti konsepnya tetapi saya tidak dapat menemukan skenario yang akan saya gunakan dalam praktik. Ini satu-satunya penjadwal yang id masih menjadi misteri bagi saya
tmn
32
Untuk panggilan jaringan gunakan Schedulers.io () dan jika Anda perlu membatasi jumlah panggilan jaringan simultan gunakan Scheduler.from (Executors.newFixedThreadPool (n)).
Dave Moten
4
Anda mungkin berpikir bahwa menempatkan timeoutsecara default pada computation()Anda akan memblokir utas tetapi bukan itu masalahnya. Di bawah selimut computation()menggunakan tindakan ScheduledExecutorServicesehingga waktu tertunda tidak menghalangi. Mengingat fakta ini computation()adalah ide yang baik karena jika berada di utas lain maka kita akan dikenakan biaya penggantian ulir.
Dave Moten
3

Poin yang paling penting adalah bahwa kedua penjadwal.io dan Penjadwal.komputasi didukung oleh kumpulan utas yang tidak terikat sebagai lawan dari yang lain yang disebutkan dalam pertanyaan. Karakteristik ini hanya dibagikan oleh Schedulers.from (Executor) dalam hal Executor dibuat dengan newCachedThreadPool (tidak terikat dengan kumpulan utas reklamasi otomatis).

Sebagaimana banyak dijelaskan dalam tanggapan sebelumnya dan beberapa artikel di web, Penjadwal.io dan Penjadwal. Perhitungan harus digunakan dengan hati-hati karena mereka dioptimalkan untuk jenis pekerjaan atas nama mereka. Tapi, menurut saya, peran mereka yang paling penting adalah memberikan konkurensi nyata ke aliran reaktif .

Bertentangan dengan kepercayaan pendatang baru, aliran reaktif tidak secara bersamaan berbarengan tetapi secara inheren tidak sinkron dan berurutan. Karena alasan ini, Schedulers.io hanya akan digunakan ketika operasi I / O memblokir (misalnya: menggunakan perintah pemblokiran seperti Apache IOUtils FileUtils.readFileAsString (...) ) sehingga akan membekukan utas panggilan sampai operasi selesai

Menggunakan metode asinkron seperti Java AsynchronousFileChannel (...) tidak akan memblokir utas panggilan selama operasi sehingga tidak ada gunanya menggunakan utas terpisah. Bahkan, Schedulers.io thread sebenarnya tidak cocok untuk operasi asinkron karena mereka tidak menjalankan loop acara dan panggilan balik tidak akan pernah ... dipanggil.

Logika yang sama berlaku untuk akses basis data atau panggilan API jarak jauh. Jangan gunakan Schedulers.io jika Anda dapat menggunakan API asinkron atau reaktif untuk melakukan panggilan.

Kembali ke konkurensi. Anda mungkin tidak memiliki akses ke async atau API reaktif untuk melakukan operasi I / O secara serempak atau serempak, jadi satu-satunya alternatif Anda adalah mengirim beberapa panggilan pada utas terpisah. Alas, Reaktif sungai yang berurutan di ujungnya tetapi kabar baiknya adalah bahwa para flatMap () operator dapat memperkenalkan concurrency di inti mereka .

Konkurensi harus dibangun dalam konstruk stream, biasanya menggunakan operator flatMap () . Operator yang kuat ini dapat dikonfigurasikan untuk secara internal menyediakan konteks multi-utas ke flatMap Anda () Fungsi yang tertanam <T, R>. Konteks itu disediakan oleh Penjadwal multi-utas seperti Penjadwal.io atau Penjadwal.komputasi .

Temukan detail lebih lanjut dalam artikel tentang Penjadwal dan Konkurensi RxJava2 di mana Anda akan menemukan contoh kode dan penjelasan terperinci tentang cara menggunakan Penjadwal secara berurutan dan bersamaan.

Semoga ini membantu,

Softjake

softjake
sumber
2

Posting blog ini memberikan jawaban yang sangat baik

Dari posting blog:

Schedulers.io () didukung oleh kumpulan utas tak terbatas. Ini digunakan untuk pekerjaan tipe I / O non-intensif CPU termasuk interaksi dengan sistem file, melakukan panggilan jaringan, interaksi basis data, dll. Kumpulan utas ini dimaksudkan untuk digunakan untuk melakukan pemblokiran IO secara asinkron.

Penjadwal.komputasi () didukung oleh kumpulan utas terbatas dengan ukuran hingga jumlah prosesor yang tersedia. Ini digunakan untuk pekerjaan komputasi atau CPU-intensif seperti mengubah ukuran gambar, memproses set data besar, dll. Berhati-hatilah: ketika Anda mengalokasikan lebih banyak thread komputasi daripada core yang tersedia, kinerja akan menurun karena pengalihan konteks dan overhead pembuatan thread saat thread bersaing untuk waktu prosesor.

Schedulers.newThread () membuat utas baru untuk setiap unit pekerjaan yang dijadwalkan. Penjadwal ini mahal karena utas baru muncul setiap kali dan tidak ada penggunaan ulang yang terjadi.

Schedulers.from (Pelaksana pelaksana) membuat dan mengembalikan penjadwal khusus yang didukung oleh pelaksana yang ditentukan. Untuk membatasi jumlah utas simultan di kumpulan utas, gunakan Scheduler.from (Executors.newFixedThreadPool (n)). Ini menjamin bahwa jika tugas dijadwalkan ketika semua utas ditempati, itu akan diantrekan. Utas di kumpulan akan ada sampai ditutup secara eksplisit.

Utas utama atau AndroidSchedulers.mainThread () disediakan oleh pustaka ekstensi RxAndroid ke RxJava. Utas utama (juga dikenal sebagai utas UI) adalah tempat interaksi pengguna terjadi. Harus berhati-hati untuk tidak membebani utas ini untuk mencegah UI yang tidak responsif atau, lebih buruk lagi, dialog Aplikasi Tidak Menanggapi ”(ANR).

Schedulers.single () adalah baru di RxJava 2. Penjadwal ini didukung oleh satu thread melaksanakan tugas secara berurutan dalam urutan yang diminta.

Schedulers.trampoline () menjalankan tugas dengan cara FIFO (Masuk Pertama, Keluar Pertama) oleh salah satu utas pekerja yang berpartisipasi. Ini sering digunakan saat menerapkan rekursi untuk menghindari bertambahnya tumpukan panggilan.

joe
sumber