Ubah Java Future menjadi CompletableFuture

97

Java 8 memperkenalkan CompletableFuture, implementasi baru Future yang dapat disusun (termasuk sekumpulan metode thenXxx). Saya ingin menggunakan ini secara eksklusif, tetapi banyak pustaka yang ingin saya gunakan hanya mengembalikan Futurecontoh yang tidak dapat disusun .

Apakah ada cara untuk merangkum Futurecontoh yang dikembalikan di dalam a CompleteableFuturesehingga saya bisa membuatnya?

Dan Midwood
sumber

Jawaban:

57

Ada cara, tetapi Anda tidak akan menyukainya. Metode berikut mengubah a Future<T>menjadi CompletableFuture<T>:

public static <T> CompletableFuture<T> makeCompletableFuture(Future<T> future) {
  if (future.isDone())
    return transformDoneFuture(future);
  return CompletableFuture.supplyAsync(() -> {
    try {
      if (!future.isDone())
        awaitFutureIsDoneInForkJoinPool(future);
      return future.get();
    } catch (ExecutionException e) {
      throw new RuntimeException(e);
    } catch (InterruptedException e) {
      // Normally, this should never happen inside ForkJoinPool
      Thread.currentThread().interrupt();
      // Add the following statement if the future doesn't have side effects
      // future.cancel(true);
      throw new RuntimeException(e);
    }
  });
}

private static <T> CompletableFuture<T> transformDoneFuture(Future<T> future) {
  CompletableFuture<T> cf = new CompletableFuture<>();
  T result;
  try {
    result = future.get();
  } catch (Throwable ex) {
    cf.completeExceptionally(ex);
    return cf;
  }
  cf.complete(result);
  return cf;
}

private static void awaitFutureIsDoneInForkJoinPool(Future<?> future)
    throws InterruptedException {
  ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker() {
    @Override public boolean block() throws InterruptedException {
      try {
        future.get();
      } catch (ExecutionException e) {
        throw new RuntimeException(e);
      }
      return true;
    }
    @Override public boolean isReleasable() {
      return future.isDone();
    }
  });
}

Jelas, masalah dengan pendekatan ini adalah, bahwa untuk setiap Masa Depan , sebuah utas akan diblokir untuk menunggu hasil dari Masa Depan - yang bertentangan dengan gagasan tentang masa depan. Dalam beberapa kasus, mungkin dilakukan lebih baik. Namun, secara umum, tidak ada solusi tanpa menunggu secara aktif hasil Masa Depan .

nosid
sumber
1
Ha, itulah yang saya tulis sebelum berpikir bahwa pasti ada cara yang lebih baik. Tapi, saya rasa tidak
Dan Midwood
12
Hmmm ... bukankah solusi ini memakan salah satu utas "kumpulan umum", hanya untuk menunggu? Benang "common pool" itu tidak boleh menghalangi ... hmmmm ...
Peti
1
@Peti: Anda benar. Namun, intinya adalah, jika Anda kemungkinan besar melakukan sesuatu yang salah, terlepas dari apakah Anda menggunakan kumpulan umum atau kumpulan utas tak terbatas .
nosid
4
Ini mungkin tidak sempurna, tetapi menggunakan CompletableFuture.supplyAsync(supplier, new SinglethreadExecutor())setidaknya tidak akan memblokir utas kumpulan umum.
MikeFHay
6
Tolong, jangan pernah lakukan itu
Laymain
56

Jika pustaka yang ingin Anda gunakan juga menawarkan metode gaya panggilan balik selain gaya Future, Anda bisa memberikannya penangan yang menyelesaikan CompletableFuture tanpa pemblokiran thread tambahan. Seperti:

    AsynchronousFileChannel open = AsynchronousFileChannel.open(Paths.get("/some/file"));
    // ... 
    CompletableFuture<ByteBuffer> completableFuture = new CompletableFuture<ByteBuffer>();
    open.read(buffer, position, null, new CompletionHandler<Integer, Void>() {
        @Override
        public void completed(Integer result, Void attachment) {
            completableFuture.complete(buffer);
        }

        @Override
        public void failed(Throwable exc, Void attachment) {
            completableFuture.completeExceptionally(exc);
        }
    });
    completableFuture.thenApply(...)

Tanpa callback, satu-satunya cara lain yang saya lihat untuk memecahkan masalah ini adalah dengan menggunakan loop polling yang menempatkan semua Future.isDone()pemeriksaan Anda pada satu utas dan kemudian meminta selesai setiap kali Future tersedia.

Kafkaesque
sumber
1
Saya menggunakan pustaka async Apache Http yang menerima FutureCallback. Itu membuat hidup saya mudah :)
Abhishek Gayakwad
13

Jika Anda Futureadalah hasil dari panggilan ke suatu ExecutorServicemetode (misalnya submit()), cara termudah adalah menggunakan CompletableFuture.runAsync(Runnable, Executor)metode tersebut.

Dari

Runnbale myTask = ... ;
Future<?> future = myExecutor.submit(myTask);

untuk

Runnbale myTask = ... ;
CompletableFuture<?> future = CompletableFuture.runAsync(myTask, myExecutor);

The CompletableFuturekemudian dibuat "native".

EDIT: Mengejar komentar oleh @SamMefford yang dikoreksi oleh @MartinAndersson, jika Anda ingin meneruskan Callable, Anda perlu memanggil supplyAsync(), mengubahnya Callable<T>menjadi Supplier<T>, misalnya dengan:

CompletableFuture.supplyAsync(() -> {
    try { return myCallable.call(); }
    catch (Exception ex) { throw new RuntimeException(ex); } // Or return default value
}, myExecutor);

Karena T Callable.call() throws Exception;mengeluarkan pengecualian dan T Supplier.get();tidak, Anda harus menangkap pengecualian tersebut agar prototipe kompatibel.

Matthieu
sumber
1
Atau, jika Anda menggunakan Callable <T> daripada Runnable, coba supplyAsync:CompletableFuture<T> future = CompletableFuture.supplyAsync(myCallable, myExecutor);
Sam Mefford
@SamMefford terima kasih, saya mengedit untuk menyertakan informasi itu.
Matthieu
supplyAsyncmenerima a Supplier. Kode tidak akan dikompilasi jika Anda mencoba memasukkan file Callable.
Martin Andersson
@MartinAndersson benar, terima kasih. Saya mengedit lebih lanjut untuk mengubah a Callable<T>menjadi Supplier<T>.
Matthieu
10

Saya menerbitkan sebuah proyek masa depan kecil yang mencoba membuat lebih baik daripada cara langsung dalam menjawabnya.

Ide utamanya adalah menggunakan satu-satunya utas (dan tentu saja dengan bukan hanya putaran putaran) untuk memeriksa semua status Futures di dalamnya, yang membantu menghindari pemblokiran utas dari kumpulan untuk setiap transformasi Future -> CompletableFuture.

Contoh penggunaan:

Future oldFuture = ...;
CompletableFuture profit = Futurity.shift(oldFuture);
Dmitry Spikhalskiy
sumber
Ini terlihat menarik. Apakah itu menggunakan utas pengatur waktu? Kenapa ini bukan jawaban yang diterima?
Kira
@Kira Ya, pada dasarnya ini menggunakan satu utas pengatur waktu untuk menunggu semua kontrak berjangka yang dikirimkan.
Dmitry Spikhalskiy
8

Saran:

http://www.thedevpiece.com/converting-old-java-future-to-completablefuture/

Tapi, pada dasarnya:

public class CompletablePromiseContext {
    private static final ScheduledExecutorService SERVICE = Executors.newSingleThreadScheduledExecutor();

    public static void schedule(Runnable r) {
        SERVICE.schedule(r, 1, TimeUnit.MILLISECONDS);
    }
}

Dan, CompletablePromise:

public class CompletablePromise<V> extends CompletableFuture<V> {
    private Future<V> future;

    public CompletablePromise(Future<V> future) {
        this.future = future;
        CompletablePromiseContext.schedule(this::tryToComplete);
    }

    private void tryToComplete() {
        if (future.isDone()) {
            try {
                complete(future.get());
            } catch (InterruptedException e) {
                completeExceptionally(e);
            } catch (ExecutionException e) {
                completeExceptionally(e.getCause());
            }
            return;
        }

        if (future.isCancelled()) {
            cancel(true);
            return;
        }

        CompletablePromiseContext.schedule(this::tryToComplete);
    }
}

Contoh:

public class Main {
    public static void main(String[] args) {
        final ExecutorService service = Executors.newSingleThreadExecutor();
        final Future<String> stringFuture = service.submit(() -> "success");
        final CompletableFuture<String> completableFuture = new CompletablePromise<>(stringFuture);

        completableFuture.whenComplete((result, failure) -> {
            System.out.println(result);
        });
    }
}
Gabriel Francisco
sumber
Ini cukup sederhana untuk dipertimbangkan & elegan & cocok untuk sebagian besar kasus penggunaan. Saya akan membuat CompletablePromiseContext tidak-statis dan mengambil param untuk interval pemeriksaan (yang diatur ke 1 ms di sini) kemudian membebani CompletablePromise<V>konstruktor untuk dapat menyediakan sendiri CompletablePromiseContextdengan interval pemeriksaan yang mungkin berbeda (lebih lama) untuk berjalan lama di Future<V>mana Anda tidak tidak harus benar-benar dapat menjalankan panggilan balik (atau menulis) segera setelah selesai, dan Anda juga dapat memiliki contoh CompletablePromiseContextuntuk menonton satu set Future(jika Anda memiliki banyak)
Dexter Legaspi
5

Izinkan saya menyarankan opsi lain (semoga, lebih baik): https://github.com/vsilaev/java-async-await/tree/master/com.farata.lang.async.examples/src/main/java/com/farata / bersamaan

Singkatnya, idenya adalah sebagai berikut:

  1. Perkenalkan CompletableTask<V>antarmuka - penyatuan CompletionStage<V>+RunnableFuture<V>
  2. Warp ExecutorServiceuntuk kembali CompletableTaskdari submit(...)metode (bukan Future<V>)
  3. Selesai, kita punya Futures yang bisa dijalankan DAN disusun.

Implementasi menggunakan implementasi CompletionStage alternatif (perhatikan, CompletionStage daripada CompletableFuture):

Pemakaian:

J8ExecutorService exec = J8Executors.newCachedThreadPool();
CompletionStage<String> = exec
   .submit( someCallableA )
   .thenCombineAsync( exec.submit(someCallableB), (a, b) -> a + " " + b)
   .thenCombine( exec.submit(someCallableC), (ab, b) -> ab + " " + c); 
Valery Silaev
sumber
2
Pembaruan kecil: kode dipindahkan ke proyek terpisah, github.com/vsilaev/tascalate-concurrent , dan sekarang dimungkinkan untuk menggunakan Executor-s out-of-the-way dari java.util.concurrent.
Valery Silaev