Menunggu daftar Masa Depan

145

Saya memiliki metode yang mengembalikan masa Listdepan

List<Future<O>> futures = getFutures();

Sekarang saya ingin menunggu sampai semua futures selesai diproses dengan sukses atau salah satu tugas yang outputnya dikembalikan oleh pelemparan masa depan. Bahkan jika satu tugas melempar pengecualian, tidak ada gunanya menunggu untuk masa depan lainnya.

Pendekatan yang sederhana adalah dengan

wait() {

   For(Future f : futures) {
     try {
       f.get();
     } catch(Exception e) {
       //TODO catch specific exception
       // this future threw exception , means somone could not do its task
       return;
     }
   }
}

Tapi masalahnya di sini adalah jika, misalnya, 4 masa depan melempar pengecualian, maka saya akan menunggu tidak perlu untuk 3 berjangka pertama yang akan tersedia.

Bagaimana cara mengatasinya? Apakah menghitung mundur kait akan membantu? Saya tidak dapat menggunakan Masa Depan isDonekarena kata java doc

boolean isDone()
Returns true if this task completed. Completion may be due to normal termination, an exception, or cancellation -- in all of these cases, this method will return true.
pengguna93796
sumber
1
siapa yang menghasilkan masa depan itu? Mereka jenis apa? Antarmuka java.util.concurrent.Future tidak menyediakan fungsionalitas yang Anda inginkan, satu-satunya cara adalah menggunakan Futures Anda sendiri dengan callback.
Alexei Kaigorodov
Anda dapat membuat contoh ExecutionServiceuntuk setiap "batch" tugas, mengirimkannya ke sana, lalu segera mematikan layanan dan menggunakannya awaitTermination(), saya kira.
milimoose
Anda bisa menggunakan a CountDownLatchjika Anda membungkus tubuh semua masa depan Anda dalam try..finallyuntuk memastikan kaitnya juga berkurang.
milimoose
docs.oracle.com/javase/7/docs/api/java/util/concurrent/… melakukan apa yang Anda butuhkan.
assylias
@AlexeiKaigorodov YA, masa depan saya adalah tipe java.util.concurrent.I menggugat masa depan dengan callable. Saya mendapatkan Futture ketika saya mengirimkan tugas ke layanan execureervice
user93796

Jawaban:

124

Anda dapat menggunakan CompletionService untuk menerima futures segera setelah mereka siap dan jika salah satu dari mereka melempar pengecualian membatalkan pemrosesan. Sesuatu seperti ini:

Executor executor = Executors.newFixedThreadPool(4);
CompletionService<SomeResult> completionService = 
       new ExecutorCompletionService<SomeResult>(executor);

//4 tasks
for(int i = 0; i < 4; i++) {
   completionService.submit(new Callable<SomeResult>() {
       public SomeResult call() {
           ...
           return result;
       }
   });
}

int received = 0;
boolean errors = false;

while(received < 4 && !errors) {
      Future<SomeResult> resultFuture = completionService.take(); //blocks if none available
      try {
         SomeResult result = resultFuture.get();
         received ++;
         ... // do something with the result
      }
      catch(Exception e) {
             //log
         errors = true;
      }
}

Saya pikir Anda dapat lebih meningkatkan untuk membatalkan tugas yang masih berjalan jika salah satu dari mereka melakukan kesalahan.

dcernahoschi
sumber
1
: Kode Anda memiliki masalah yang sama dengan yang saya sebutkan di posting saya. Jika di masa depan ada pengecualian maka kode tersebut masih menunggu 1,2,3 di masa depan untuk diselesaikan. atau akan completSerice.take) akan mengembalikan masa depan yang selesai terlebih dahulu?
user93796
1
Bagaimana dengan timeout? Dapatkah saya memberi tahu layanan penyelesaian untuk menunggu X detik maksimal?
user93796
1
Seharusnya tidak. Itu tidak mengulangi masa depan, tetapi begitu seseorang siap itu diproses / diverifikasi jika tidak ada pengecualian.
dcernahoschi
2
Untuk batas waktu menunggu masa depan muncul di antrian ada metode polling (detik) pada CompletionService.
dcernahoschi
Berikut adalah contoh kerja di github: github.com/princegoyal1987/FutureDemo
user18853
107

Jika Anda menggunakan Java 8 maka Anda dapat melakukan ini dengan lebih mudah dengan CompletableFuture dan CompletableFuture.allOf , yang menerapkan panggilan balik hanya setelah semua CompletableFutures yang disediakan selesai.

// Waits for *all* futures to complete and returns a list of results.
// If *any* future completes exceptionally then the resulting future will also complete exceptionally.

public static <T> CompletableFuture<List<T>> all(List<CompletableFuture<T>> futures) {
    CompletableFuture[] cfs = futures.toArray(new CompletableFuture[futures.size()]);

    return CompletableFuture.allOf(cfs)
            .thenApply(ignored -> futures.stream()
                                    .map(CompletableFuture::join)
                                    .collect(Collectors.toList())
            );
}
Andrejs
sumber
3
Hai @Andrejs, bisa tolong jelaskan apa yang dilakukan cuplikan kode ini. Saya melihat ini disarankan di banyak tempat tetapi saya bingung apa yang sebenarnya terjadi. Bagaimana pengecualian ditangani jika salah satu utas gagal?
VSEWHGHP
2
@VSEWHGHP Dari javadoc: Jika salah satu dari CompletableFutures yang diberikan lengkap secara luar biasa, maka CompletableFuture yang dikembalikan juga melakukannya, dengan CompletionException memegang pengecualian ini sebagai penyebabnya.
Andrejs
1
Benar jadi saya menindaklanjutinya, apakah ada cara untuk menggunakan potongan ini tetapi mendapatkan nilai untuk semua utas lainnya yang berhasil diselesaikan? Haruskah saya hanya mengulang daftar CompletableFutures dan mengabaikan panggilan CompletableFuture <Daftar <T>> karena fungsi urutan memastikan semua utas lengkap baik dengan hasil atau pengecualian?
VSEWHGHP
6
Ini memecahkan masalah yang berbeda. Jika Anda memiliki Futureinstance, Anda tidak dapat menerapkan metode ini. Tidak mudah dikonversi Futuremenjadi CompletableFuture.
Jarekczek
itu tidak akan berfungsi jika kita memiliki pengecualian dalam beberapa tugas.
slisnychyi
21

Gunakan CompletableFuturedi Java 8

    // Kick of multiple, asynchronous lookups
    CompletableFuture<User> page1 = gitHubLookupService.findUser("Test1");
    CompletableFuture<User> page2 = gitHubLookupService.findUser("Test2");
    CompletableFuture<User> page3 = gitHubLookupService.findUser("Test3");

    // Wait until they are all done
    CompletableFuture.allOf(page1,page2,page3).join();

    logger.info("--> " + page1.get());
sendon1982
sumber
1
Ini harus menjadi jawaban yang diterima. Juga merupakan bagian dari dokumentasi resmi Spring: spring.io/guides/gs/async-method
maaw
Bekerja seperti yang diharapkan.
Dimon
15

Anda dapat menggunakan ExecutorCompletionService . Dokumentasi tersebut bahkan memiliki contoh kasus penggunaan yang tepat:

Anggaplah sebaliknya Anda ingin menggunakan hasil non-nol pertama dari serangkaian tugas, mengabaikan semua yang menemukan pengecualian, dan membatalkan semua tugas lain saat yang pertama siap:

void solve(Executor e, Collection<Callable<Result>> solvers) throws InterruptedException {
    CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e);
    int n = solvers.size();
    List<Future<Result>> futures = new ArrayList<Future<Result>>(n);
    Result result = null;
    try {
        for (Callable<Result> s : solvers)
            futures.add(ecs.submit(s));
        for (int i = 0; i < n; ++i) {
            try {
                Result r = ecs.take().get();
                if (r != null) {
                    result = r;
                    break;
                }
            } catch (ExecutionException ignore) {
            }
        }
    } finally {
        for (Future<Result> f : futures)
            f.cancel(true);
    }

    if (result != null)
        use(result);
}

Yang penting untuk diperhatikan di sini adalah bahwa ecs.take () akan mendapatkan tugas yang selesai pertama , bukan hanya yang pertama kali disampaikan. Dengan demikian Anda harus mendapatkannya dalam urutan menyelesaikan eksekusi (atau melempar pengecualian).

jmiserez
sumber
3

Jika Anda menggunakan Java 8 dan tidak ingin memanipulasi CompletableFuture, saya telah menulis alat untuk mengambil hasil untuk List<Future<T>>streaming menggunakan. Kuncinya adalah Anda dilarang melakukan map(Future::get)lemparan.

public final class Futures
{

    private Futures()
    {}

    public static <E> Collector<Future<E>, Collection<E>, List<E>> present()
    {
        return new FutureCollector<>();
    }

    private static class FutureCollector<T> implements Collector<Future<T>, Collection<T>, List<T>>
    {
        private final List<Throwable> exceptions = new LinkedList<>();

        @Override
        public Supplier<Collection<T>> supplier()
        {
            return LinkedList::new;
        }

        @Override
        public BiConsumer<Collection<T>, Future<T>> accumulator()
        {
            return (r, f) -> {
                try
                {
                    r.add(f.get());
                }
                catch (InterruptedException e)
                {}
                catch (ExecutionException e)
                {
                    exceptions.add(e.getCause());
                }
            };
        }

        @Override
        public BinaryOperator<Collection<T>> combiner()
        {
            return (l1, l2) -> {
                l1.addAll(l2);
                return l1;
            };
        }

        @Override
        public Function<Collection<T>, List<T>> finisher()
        {
            return l -> {

                List<T> ret = new ArrayList<>(l);
                if (!exceptions.isEmpty())
                    throw new AggregateException(exceptions, ret);

                return ret;
            };

        }

        @Override
        public Set<java.util.stream.Collector.Characteristics> characteristics()
        {
            return java.util.Collections.emptySet();
        }
    }

Ini membutuhkan AggregateExceptionyang berfungsi seperti C # 's

public class AggregateException extends RuntimeException
{
    /**
     *
     */
    private static final long serialVersionUID = -4477649337710077094L;

    private final List<Throwable> causes;
    private List<?> successfulElements;

    public AggregateException(List<Throwable> causes, List<?> l)
    {
        this.causes = causes;
        successfulElements = l;
    }

    public AggregateException(List<Throwable> causes)
    {
        this.causes = causes;
    }

    @Override
    public synchronized Throwable getCause()
    {
        return this;
    }

    public List<Throwable> getCauses()
    {
        return causes;
    }

    public List<?> getSuccessfulElements()
    {
        return successfulElements;
    }

    public void setSuccessfulElements(List<?> successfulElements)
    {
        this.successfulElements = successfulElements;
    }

}

Komponen ini bertindak persis seperti Task.WaitAll C # . Saya sedang mengerjakan varian yang melakukan hal yang sama CompletableFuture.allOf(equivalento to Task.WhenAll)

Alasan mengapa saya melakukan ini adalah karena saya menggunakan Spring ListenableFuturedan tidak ingin port ke CompletableFuturemeskipun itu adalah cara yang lebih standar

usr-local-ΕΨΗΕΛΩΝ
sumber
1
Suara positif untuk melihat kebutuhan akan AggregateException yang setara.
granadaCoder
Contoh menggunakan fasilitas ini akan menyenangkan.
XDS
1

Jika Anda ingin menggabungkan Daftar CompletableFutures, Anda bisa melakukan ini:

List<CompletableFuture<Void>> futures = new ArrayList<>();
// ... Add futures to this ArrayList of CompletableFutures

// CompletableFuture.allOf() method demand a variadic arguments
// You can use this syntax to pass a List instead
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
            futures.toArray(new CompletableFuture[futures.size()]));

// Wait for all individual CompletableFuture to complete
// All individual CompletableFutures are executed in parallel
allFutures.get();

Untuk detail lebih lanjut tentang Future & CompletableFuture, tautan berguna:
1. Masa Depan: https://www.baeldung.com/java-future
2. CompletableFuture: https://www.baeldung.com/java-completablefuture
3. CompletableFuture: https : //www.callicoder.com/java-8-completablefuture-tutorial/

Bohao LI
sumber
0

mungkin ini akan membantu (tidak ada yang akan diganti dengan utas mentah, yeah!) Saya sarankan menjalankan setiap Futureorang dengan utas terpisah (mereka berjalan paralel), maka ketika salah satu dari kesalahan didapat, itu hanya memberi sinyal manajer ( Handlerkelas).

class Handler{
//...
private Thread thisThread;
private boolean failed=false;
private Thread[] trds;
public void waitFor(){
  thisThread=Thread.currentThread();
  List<Future<Object>> futures = getFutures();
  trds=new Thread[futures.size()];
  for (int i = 0; i < trds.length; i++) {
    RunTask rt=new RunTask(futures.get(i), this);
    trds[i]=new Thread(rt);
  }
  synchronized (this) {
    for(Thread tx:trds){
      tx.start();
    }  
  }
  for(Thread tx:trds){
    try {tx.join();
    } catch (InterruptedException e) {
      System.out.println("Job failed!");break;
    }
  }if(!failed){System.out.println("Job Done");}
}

private List<Future<Object>> getFutures() {
  return null;
}

public synchronized void cancelOther(){if(failed){return;}
  failed=true;
  for(Thread tx:trds){
    tx.stop();//Deprecated but works here like a boss
  }thisThread.interrupt();
}
//...
}
class RunTask implements Runnable{
private Future f;private Handler h;
public RunTask(Future f,Handler h){this.f=f;this.h=h;}
public void run(){
try{
f.get();//beware about state of working, the stop() method throws ThreadDeath Error at any thread state (unless it blocked by some operation)
}catch(Exception e){System.out.println("Error, stopping other guys...");h.cancelOther();}
catch(Throwable t){System.out.println("Oops, some other guy has stopped working...");}
}
}

Saya harus mengatakan kode di atas akan error (tidak memeriksa), tapi saya harap saya bisa menjelaskan solusinya. tolong coba.


sumber
0
 /**
     * execute suppliers as future tasks then wait / join for getting results
     * @param functors a supplier(s) to execute
     * @return a list of results
     */
    private List getResultsInFuture(Supplier<?>... functors) {
        CompletableFuture[] futures = stream(functors)
                .map(CompletableFuture::supplyAsync)
                .collect(Collectors.toList())
                .toArray(new CompletableFuture[functors.length]);
        CompletableFuture.allOf(futures).join();
        return stream(futures).map(a-> {
            try {
                return a.get();
            } catch (InterruptedException | ExecutionException e) {
                //logger.error("an error occurred during runtime execution a function",e);
                return null;
            }
        }).collect(Collectors.toList());
    };
Mohamed.Abdo
sumber
0

CompletionService akan mengambil Callables Anda dengan metode .submit () dan Anda dapat mengambil futures yang dihitung dengan metode .take ().

Satu hal yang tidak boleh Anda lupakan adalah menghentikan ExecutorService dengan memanggil metode .shutdown (). Anda juga hanya dapat memanggil metode ini ketika Anda telah menyimpan referensi ke layanan pelaksana jadi pastikan untuk menyimpannya.

Kode contoh - Untuk sejumlah item pekerjaan yang harus dikerjakan secara paralel:

ExecutorService service = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

CompletionService<YourCallableImplementor> completionService = 
new ExecutorCompletionService<YourCallableImplementor>(service);

ArrayList<Future<YourCallableImplementor>> futures = new ArrayList<Future<YourCallableImplementor>>();

for (String computeMe : elementsToCompute) {
    futures.add(completionService.submit(new YourCallableImplementor(computeMe)));
}
//now retrieve the futures after computation (auto wait for it)
int received = 0;

while(received < elementsToCompute.size()) {
 Future<YourCallableImplementor> resultFuture = completionService.take(); 
 YourCallableImplementor result = resultFuture.get();
 received ++;
}
//important: shutdown your ExecutorService
service.shutdown();

Kode contoh - Untuk sejumlah item pekerjaan yang dinamis untuk dikerjakan secara paralel:

public void runIt(){
    ExecutorService service = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    CompletionService<CallableImplementor> completionService = new ExecutorCompletionService<CallableImplementor>(service);
    ArrayList<Future<CallableImplementor>> futures = new ArrayList<Future<CallableImplementor>>();

    //Initial workload is 8 threads
    for (int i = 0; i < 9; i++) {
        futures.add(completionService.submit(write.new CallableImplementor()));             
    }
    boolean finished = false;
    while (!finished) {
        try {
            Future<CallableImplementor> resultFuture;
            resultFuture = completionService.take();
            CallableImplementor result = resultFuture.get();
            finished = doSomethingWith(result.getResult());
            result.setResult(null);
            result = null;
            resultFuture = null;
            //After work package has been finished create new work package and add it to futures
            futures.add(completionService.submit(write.new CallableImplementor()));
        } catch (InterruptedException | ExecutionException e) {
            //handle interrupted and assert correct thread / work packet count              
        } 
    }

    //important: shutdown your ExecutorService
    service.shutdown();
}

public class CallableImplementor implements Callable{
    boolean result;

    @Override
    public CallableImplementor call() throws Exception {
        //business logic goes here
        return this;
    }

    public boolean getResult() {
        return result;
    }

    public void setResult(boolean result) {
        this.result = result;
    }
}
fl0w
sumber
0

Saya memiliki kelas utilitas yang berisi ini:

@FunctionalInterface
public interface CheckedSupplier<X> {
  X get() throws Throwable;
}

public static <X> Supplier<X> uncheckedSupplier(final CheckedSupplier<X> supplier) {
    return () -> {
        try {
            return supplier.get();
        } catch (final Throwable checkedException) {
            throw new IllegalStateException(checkedException);
        }
    };
}

Setelah Anda memilikinya, menggunakan impor statis, Anda dapat dengan mudah menunggu semua berjangka seperti ini:

futures.stream().forEach(future -> uncheckedSupplier(future::get).get());

Anda juga dapat mengumpulkan semua hasil mereka seperti ini:

List<MyResultType> results = futures.stream()
    .map(future -> uncheckedSupplier(future::get).get())
    .collect(Collectors.toList());

Hanya meninjau kembali pos lama saya dan memperhatikan bahwa Anda memiliki kesedihan lain:

Tapi masalahnya di sini adalah jika, misalnya, 4 masa depan melempar pengecualian, maka saya akan menunggu tidak perlu untuk 3 berjangka pertama yang akan tersedia.

Dalam hal ini, solusi sederhana adalah melakukan ini secara paralel:

futures.stream().parallel()
 .forEach(future -> uncheckedSupplier(future::get).get());

Dengan cara ini pengecualian pertama, meskipun tidak akan menghentikan masa depan, akan merusak pernyataan forEach, seperti pada contoh serial, tetapi karena semua menunggu secara paralel, Anda tidak perlu menunggu 3 yang pertama selesai.

Brixomatic
sumber
0
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class Stack2 {   
    public static void waitFor(List<Future<?>> futures) {
        List<Future<?>> futureCopies = new ArrayList<Future<?>>(futures);//contains features for which status has not been completed
        while (!futureCopies.isEmpty()) {//worst case :all task worked without exception, then this method should wait for all tasks
            Iterator<Future<?>> futureCopiesIterator = futureCopies.iterator();
            while (futureCopiesIterator.hasNext()) {
                Future<?> future = futureCopiesIterator.next();
                if (future.isDone()) {//already done
                    futureCopiesIterator.remove();
                    try {
                        future.get();// no longer waiting
                    } catch (InterruptedException e) {
                        //ignore
                        //only happen when current Thread interrupted
                    } catch (ExecutionException e) {
                        Throwable throwable = e.getCause();// real cause of exception
                        futureCopies.forEach(f -> f.cancel(true));//cancel other tasks that not completed
                        return;
                    }
                }
            }
        }
    }
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(3);

        Runnable runnable1 = new Runnable (){
            public void run(){
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                }
            }
        };
        Runnable runnable2 = new Runnable (){
            public void run(){
                try {
                    Thread.sleep(4000);
                } catch (InterruptedException e) {
                }
            }
        };


        Runnable fail = new Runnable (){
            public void run(){
                try {
                    Thread.sleep(1000);
                    throw new RuntimeException("bla bla bla");
                } catch (InterruptedException e) {
                }
            }
        };

        List<Future<?>> futures = Stream.of(runnable1,fail,runnable2)
                .map(executorService::submit)
                .collect(Collectors.toList());

        double start = System.nanoTime();
        waitFor(futures);
        double end = (System.nanoTime()-start)/1e9;
        System.out.println(end +" seconds");

    }
}
Farhad Baghirov
sumber