Ada cara, tetapi Anda tidak akan menyukainya. Metode berikut mengubah a Future<T>
menjadi CompletableFuture<T>
:
public static <T> CompletableFuture<T> makeCompletableFuture(Future<T> future) {
if (future.isDone())
return transformDoneFuture(future);
return CompletableFuture.supplyAsync(() -> {
try {
if (!future.isDone())
awaitFutureIsDoneInForkJoinPool(future);
return future.get();
} catch (ExecutionException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
});
}
private static <T> CompletableFuture<T> transformDoneFuture(Future<T> future) {
CompletableFuture<T> cf = new CompletableFuture<>();
T result;
try {
result = future.get();
} catch (Throwable ex) {
cf.completeExceptionally(ex);
return cf;
}
cf.complete(result);
return cf;
}
private static void awaitFutureIsDoneInForkJoinPool(Future<?> future)
throws InterruptedException {
ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker() {
@Override public boolean block() throws InterruptedException {
try {
future.get();
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
return true;
}
@Override public boolean isReleasable() {
return future.isDone();
}
});
}
Jelas, masalah dengan pendekatan ini adalah, bahwa untuk setiap Masa Depan , sebuah utas akan diblokir untuk menunggu hasil dari Masa Depan - yang bertentangan dengan gagasan tentang masa depan. Dalam beberapa kasus, mungkin dilakukan lebih baik. Namun, secara umum, tidak ada solusi tanpa menunggu secara aktif hasil Masa Depan .
CompletableFuture.supplyAsync(supplier, new SinglethreadExecutor())
setidaknya tidak akan memblokir utas kumpulan umum.Jika pustaka yang ingin Anda gunakan juga menawarkan metode gaya panggilan balik selain gaya Future, Anda bisa memberikannya penangan yang menyelesaikan CompletableFuture tanpa pemblokiran thread tambahan. Seperti:
AsynchronousFileChannel open = AsynchronousFileChannel.open(Paths.get("/some/file")); // ... CompletableFuture<ByteBuffer> completableFuture = new CompletableFuture<ByteBuffer>(); open.read(buffer, position, null, new CompletionHandler<Integer, Void>() { @Override public void completed(Integer result, Void attachment) { completableFuture.complete(buffer); } @Override public void failed(Throwable exc, Void attachment) { completableFuture.completeExceptionally(exc); } }); completableFuture.thenApply(...)
Tanpa callback, satu-satunya cara lain yang saya lihat untuk memecahkan masalah ini adalah dengan menggunakan loop polling yang menempatkan semua
Future.isDone()
pemeriksaan Anda pada satu utas dan kemudian meminta selesai setiap kali Future tersedia.sumber
Jika Anda
Future
adalah hasil dari panggilan ke suatuExecutorService
metode (misalnyasubmit()
), cara termudah adalah menggunakanCompletableFuture.runAsync(Runnable, Executor)
metode tersebut.Dari
untuk
The
CompletableFuture
kemudian dibuat "native".EDIT: Mengejar komentar oleh @SamMefford yang dikoreksi oleh @MartinAndersson, jika Anda ingin meneruskan
Callable
, Anda perlu memanggilsupplyAsync()
, mengubahnyaCallable<T>
menjadiSupplier<T>
, misalnya dengan:CompletableFuture.supplyAsync(() -> { try { return myCallable.call(); } catch (Exception ex) { throw new RuntimeException(ex); } // Or return default value }, myExecutor);
Karena
T Callable.call() throws Exception;
mengeluarkan pengecualian danT Supplier.get();
tidak, Anda harus menangkap pengecualian tersebut agar prototipe kompatibel.sumber
CompletableFuture<T> future = CompletableFuture.supplyAsync(myCallable, myExecutor);
supplyAsync
menerima aSupplier
. Kode tidak akan dikompilasi jika Anda mencoba memasukkan fileCallable
.Callable<T>
menjadiSupplier<T>
.Saya menerbitkan sebuah proyek masa depan kecil yang mencoba membuat lebih baik daripada cara langsung dalam menjawabnya.
Ide utamanya adalah menggunakan satu-satunya utas (dan tentu saja dengan bukan hanya putaran putaran) untuk memeriksa semua status Futures di dalamnya, yang membantu menghindari pemblokiran utas dari kumpulan untuk setiap transformasi Future -> CompletableFuture.
Contoh penggunaan:
sumber
Saran:
http://www.thedevpiece.com/converting-old-java-future-to-completablefuture/
Tapi, pada dasarnya:
public class CompletablePromiseContext { private static final ScheduledExecutorService SERVICE = Executors.newSingleThreadScheduledExecutor(); public static void schedule(Runnable r) { SERVICE.schedule(r, 1, TimeUnit.MILLISECONDS); } }
Dan, CompletablePromise:
public class CompletablePromise<V> extends CompletableFuture<V> { private Future<V> future; public CompletablePromise(Future<V> future) { this.future = future; CompletablePromiseContext.schedule(this::tryToComplete); } private void tryToComplete() { if (future.isDone()) { try { complete(future.get()); } catch (InterruptedException e) { completeExceptionally(e); } catch (ExecutionException e) { completeExceptionally(e.getCause()); } return; } if (future.isCancelled()) { cancel(true); return; } CompletablePromiseContext.schedule(this::tryToComplete); } }
Contoh:
public class Main { public static void main(String[] args) { final ExecutorService service = Executors.newSingleThreadExecutor(); final Future<String> stringFuture = service.submit(() -> "success"); final CompletableFuture<String> completableFuture = new CompletablePromise<>(stringFuture); completableFuture.whenComplete((result, failure) -> { System.out.println(result); }); } }
sumber
CompletablePromiseContext
tidak-statis dan mengambil param untuk interval pemeriksaan (yang diatur ke 1 ms di sini) kemudian membebaniCompletablePromise<V>
konstruktor untuk dapat menyediakan sendiriCompletablePromiseContext
dengan interval pemeriksaan yang mungkin berbeda (lebih lama) untuk berjalan lama diFuture<V>
mana Anda tidak tidak harus benar-benar dapat menjalankan panggilan balik (atau menulis) segera setelah selesai, dan Anda juga dapat memiliki contohCompletablePromiseContext
untuk menonton satu setFuture
(jika Anda memiliki banyak)Izinkan saya menyarankan opsi lain (semoga, lebih baik): https://github.com/vsilaev/java-async-await/tree/master/com.farata.lang.async.examples/src/main/java/com/farata / bersamaan
Singkatnya, idenya adalah sebagai berikut:
CompletableTask<V>
antarmuka - penyatuanCompletionStage<V>
+RunnableFuture<V>
ExecutorService
untuk kembaliCompletableTask
darisubmit(...)
metode (bukanFuture<V>
)Implementasi menggunakan implementasi CompletionStage alternatif (perhatikan, CompletionStage daripada CompletableFuture):
Pemakaian:
J8ExecutorService exec = J8Executors.newCachedThreadPool(); CompletionStage<String> = exec .submit( someCallableA ) .thenCombineAsync( exec.submit(someCallableB), (a, b) -> a + " " + b) .thenCombine( exec.submit(someCallableC), (ab, b) -> ab + " " + c);
sumber