Perbedaan antara CompletableFuture, Future dan RxJava's Observable

194

Saya ingin tahu perbedaan antara CompletableFuture, Futuredan Observable RxJava.

Yang saya tahu semuanya asinkron tapi

Future.get() memblokir utas

CompletableFuture memberikan metode panggilan balik

RxJava Observable--- mirip CompletableFuturedengan manfaat lainnya (tidak yakin)

Misalnya: jika klien perlu membuat beberapa panggilan layanan dan kapan kami menggunakan Futures(Java) Future.get()akan dieksekusi secara berurutan ... ingin tahu bagaimana yang lebih baik di RxJava ..

Dan dokumentasi http://reactivex.io/intro.html mengatakan

Sulit untuk menggunakan Futures untuk secara optimal menyusun alur eksekusi asinkron bersyarat (atau tidak mungkin, karena latensi setiap permintaan bervariasi pada saat runtime). Ini bisa dilakukan, tentu saja, tetapi dengan cepat menjadi rumit (dan karenanya rawan kesalahan) atau terlalu dini memblokir Future.get (), yang menghilangkan manfaat dari eksekusi asinkron.

Sangat tertarik untuk mengetahui bagaimana RxJavamenyelesaikan masalah ini. Saya merasa sulit untuk memahami dari dokumentasi.

shiv455
sumber
Sudahkah Anda membaca dokumentasi untuk masing-masing? Saya benar-benar tidak terbiasa dengan RxJava, tetapi sekilas dokumentasi tampak sangat menyeluruh. Tampaknya tidak sebanding dengan dua masa depan.
FThompson
Saya telah melalui tetapi tidak bisa mendapatkan betapa berbedanya dengan Java futures ... benar saya jika saya salah
shiv455
Bagaimana bisa diamati mirip dengan berjangka?
FThompson
2
ingin tahu di mana perbedaannya seperti apakah itu berbeda dalam manajemen utas ?? EX: Future.get () memblokir utas .... bagaimana penanganannya di Observable ???
shiv455
2
setidaknya sedikit membingungkan bagi saya ... perbedaan tingkat tinggi akan sangat membantu !!
shiv455

Jawaban:

281

Berjangka

Futures diperkenalkan di Java 5 (2004). Mereka pada dasarnya adalah penampung untuk hasil operasi yang belum selesai. Setelah operasi selesai, Futureakan berisi hasil itu. Sebagai contoh, suatu operasi bisa menjadi contoh Runnable atau Callable yang dikirimkan ke ExecutorService . Pengirim operasi dapat menggunakan Futureobjek untuk memeriksa apakah operasi itu dilakukan () , atau menunggu sampai selesai menggunakan metode blocking get () .

Contoh:

/**
* A task that sleeps for a second, then returns 1
**/
public static class MyCallable implements Callable<Integer> {

    @Override
    public Integer call() throws Exception {
        Thread.sleep(1000);
        return 1;
    }

}

public static void main(String[] args) throws Exception{
    ExecutorService exec = Executors.newSingleThreadExecutor();
    Future<Integer> f = exec.submit(new MyCallable());

    System.out.println(f.isDone()); //False

    System.out.println(f.get()); //Waits until the task is done, then prints 1
}

CompletableFutures

CompletableFutures diperkenalkan di Java 8 (2014). Mereka sebenarnya merupakan evolusi Futures biasa, terinspirasi oleh Google's Listenable Futures , bagian dari perpustakaan Guava . Mereka adalah Futures yang juga memungkinkan Anda untuk merangkai tugas bersama dalam sebuah rantai. Anda dapat menggunakannya untuk memberi tahu beberapa utas pekerja untuk "lakukan tugas X, dan setelah selesai, lakukan hal lain ini menggunakan hasil X". Menggunakan CompletableFutures, Anda dapat melakukan sesuatu dengan hasil operasi tanpa benar-benar memblokir utas untuk menunggu hasilnya. Berikut ini contoh sederhana:

/**
* A supplier that sleeps for a second, and then returns one
**/
public static class MySupplier implements Supplier<Integer> {

    @Override
    public Integer get() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            //Do nothing
        }
        return 1;
    }
}

/**
* A (pure) function that adds one to a given Integer
**/
public static class PlusOne implements Function<Integer, Integer> {

    @Override
    public Integer apply(Integer x) {
        return x + 1;
    }
}

public static void main(String[] args) throws Exception {
    ExecutorService exec = Executors.newSingleThreadExecutor();
    CompletableFuture<Integer> f = CompletableFuture.supplyAsync(new MySupplier(), exec);
    System.out.println(f.isDone()); // False
    CompletableFuture<Integer> f2 = f.thenApply(new PlusOne());
    System.out.println(f2.get()); // Waits until the "calculation" is done, then prints 2
}

RxJava

RxJava adalah seluruh perpustakaan untuk pemrograman reaktif yang dibuat di Netflix. Sepintas, sepertinya akan mirip dengan aliran Java 8 . Ya, kecuali itu jauh lebih kuat.

Demikian pula untuk Futures, RxJava dapat digunakan untuk merangkai sekelompok tindakan sinkron atau asinkron untuk membuat pipa pemrosesan. Tidak seperti Futures, yang merupakan sekali pakai, RxJava bekerja pada aliran nol atau lebih item. Termasuk aliran yang tidak pernah berakhir dengan jumlah item yang tak terbatas. Ini juga jauh lebih fleksibel dan kuat berkat serangkaian operator yang luar biasa kaya .

Tidak seperti stream Java 8, RxJava juga memiliki mekanisme tekanan balik , yang memungkinkannya untuk menangani kasus di mana berbagai bagian dari pipa pemrosesan Anda beroperasi di utas yang berbeda, dengan laju yang berbeda .

Kelemahan dari RxJava adalah bahwa meskipun dokumentasi yang solid, ini adalah perpustakaan yang menantang untuk dipelajari karena perubahan paradigma yang terlibat. Kode Rx juga bisa menjadi mimpi buruk untuk debug, terutama jika beberapa utas terlibat, dan bahkan lebih buruk - jika tekanan balik diperlukan.

Jika Anda ingin masuk ke dalamnya, ada seluruh halaman berbagai tutorial di situs web resmi, ditambah dokumentasi resmi dan Javadoc . Anda juga dapat melihat beberapa video seperti ini yang memberikan intro singkat ke Rx dan juga berbicara tentang perbedaan antara Rx dan Futures.

Bonus: Java 9 Streaming Reaktif

Java 9's Reactive Streams alias Flow API adalah seperangkat Antarmuka yang diimplementasikan oleh berbagai perpustakaan aliran reaktif seperti RxJava 2 , Akka Streams , dan Vertx . Mereka memungkinkan pustaka reaktif ini untuk saling berhubungan, sambil mempertahankan semua tekanan balik yang penting.

Malt
sumber
Akan menyenangkan untuk memberikan contoh kode tentang bagaimana Rx melakukan ini
Zinan Xing
Jadi menggunakan Reactive Streams, kita dapat mencampur RxJava, Akka, dan Vertx dalam satu aplikasi?
IgorGanapolsky
1
@IgorGanapolsky Ya.
Malt
Dalam CompletableFutures kami menggunakan metode panggilan balik, metode panggilan balik ini juga akan memblokir jika output dari satu metode adalah input dari panggilan balik lainnya. Sebagai blok di masa depan dengan panggilan Future.get (). Mengapa dikatakan Future.get () memblokir panggilan sementara CompletableFutures tidak memblokir. Tolong jelaskan
Deepak
1
@Federico Sure. Masing Future- masing adalah pengganti untuk hasil tunggal yang mungkin atau mungkin belum selesai. Jika Anda melakukan operasi yang sama lagi, Anda akan mendapatkan Futureinstance baru . RxJava berkaitan dengan aliran hasil yang dapat datang kapan saja. Oleh karena itu serangkaian operasi dapat mengembalikan RxJava tunggal yang dapat diamati yang akan memompa banyak hasil. Agak seperti perbedaan antara amplop pos tunggal dan tabung pneumatik yang terus memompa keluar surat.
Malt
21

Saya telah bekerja dengan Rx Java sejak 0,9, sekarang di 1,3.2 dan segera bermigrasi ke 2.x Saya menggunakan ini dalam proyek pribadi di mana saya sudah bekerja selama 8 tahun.

Saya tidak akan memprogram tanpa perpustakaan ini sama sekali lagi. Pada awalnya saya skeptis tetapi ini adalah kondisi pikiran lain yang harus Anda ciptakan. Tenang sulit pada awalnya. Saya kadang-kadang melihat kelereng selama berjam-jam .. lol

Ini hanya masalah latihan dan benar-benar mengenal aliran (alias kontrak yang dapat diamati dan pengamat), begitu Anda sampai di sana, Anda akan benci melakukannya jika tidak.

Bagi saya sebenarnya tidak ada kerugian pada perpustakaan itu.

Gunakan case: Saya memiliki tampilan monitor yang berisi 9 alat pengukur (cpu, mem, jaringan, dll ...). Saat memulai tampilan, tampilan berlangganan sendiri ke kelas monitor sistem yang mengembalikan yang dapat diamati (interval) yang berisi semua data selama 9 meter. Ini akan mendorong setiap detik hasil baru ke tampilan (jadi jangan polling !!!). Bahwa diamati menggunakan flatmap untuk secara bersamaan (async!) Mengambil data dari 9 sumber yang berbeda dan ritsleting hasilnya menjadi model baru pandangan Anda akan dapatkan di onNext ().

Bagaimana Anda bisa melakukan itu dengan masa depan, selesai dll ... Semoga beruntung! :)

Rx Java memecahkan banyak masalah dalam pemrograman untuk saya dan membuat cara yang jauh lebih mudah ...

Keuntungan:

  • Statelss !!! (Yang penting untuk disebutkan, paling penting mungkin)
  • Manajemen utas di luar kotak
  • Membangun urutan yang memiliki siklus hidup mereka sendiri
  • Semuanya bisa diamati sehingga merantai itu mudah
  • Lebih sedikit kode untuk ditulis
  • Stoples tunggal pada classpath (sangat ringan)
  • Sangat bersamaan
  • Tidak ada panggilan balik lagi
  • Berbasis pelanggan (kontrak ketat antara konsumen dan produsen)
  • Strategi backpressure (seperti pemutus sirkuit)
  • Menangani dan memulihkan kesalahan yang indah
  • Dokumentasi yang sangat bagus (kelereng <3)
  • Kontrol penuh
  • Masih banyak lagi ...

Kekurangan: - Sulit untuk diuji

Kristoffer
sumber
13
~ " Aku tidak akan memprogram tanpa perpustakaan ini sama sekali lagi. " Jadi RxJava adalah yang paling baik untuk semua proyek perangkat lunak?
IgorGanapolsky
Apakah ini berguna bahkan jika saya tidak memiliki Stream peristiwa Asynchronous?
Mukesh Verma