ExecutorService yang menyela tugas setelah waktu tunggu

95

Saya mencari implementasi ExecutorService yang dapat diberikan dengan waktu tunggu. Tugas-tugas yang dikirimkan ke ExecutorService akan terganggu jika mereka membutuhkan waktu lebih lama dari waktu tunggu untuk dijalankan. Menerapkan binatang seperti itu bukanlah tugas yang sulit, tetapi saya bertanya-tanya apakah ada yang tahu tentang implementasi yang ada.

Inilah yang saya dapatkan berdasarkan beberapa diskusi di bawah ini. Ada komentar?

import java.util.List;
import java.util.concurrent.*;

public class TimeoutThreadPoolExecutor extends ThreadPoolExecutor {
    private final long timeout;
    private final TimeUnit timeoutUnit;

    private final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor();
    private final ConcurrentMap<Runnable, ScheduledFuture> runningTasks = new ConcurrentHashMap<Runnable, ScheduledFuture>();

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    @Override
    public void shutdown() {
        timeoutExecutor.shutdown();
        super.shutdown();
    }

    @Override
    public List<Runnable> shutdownNow() {
        timeoutExecutor.shutdownNow();
        return super.shutdownNow();
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        if(timeout > 0) {
            final ScheduledFuture<?> scheduled = timeoutExecutor.schedule(new TimeoutTask(t), timeout, timeoutUnit);
            runningTasks.put(r, scheduled);
        }
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        ScheduledFuture timeoutTask = runningTasks.remove(r);
        if(timeoutTask != null) {
            timeoutTask.cancel(false);
        }
    }

    class TimeoutTask implements Runnable {
        private final Thread thread;

        public TimeoutTask(Thread thread) {
            this.thread = thread;
        }

        @Override
        public void run() {
            thread.interrupt();
        }
    }
}
Edward Dale
sumber
Apakah itu 'waktu mulai' dari waktu tunggu waktu pengiriman? Atau saat tugas mulai dijalankan?
Tim Bender
Pertanyaan bagus. Saat itu mulai dijalankan. Agaknya menggunakan protected void beforeExecute(Thread t, Runnable r)kail.
Edward Dale
@ scompt.com apakah Anda masih menggunakan solusi ini atau telah digantikan
Paul Taylor
@PaulTaylor Pekerjaan tempat saya menerapkan solusi ini telah digantikan. :-)
Edward Dale
Saya membutuhkan ini, kecuali a) Saya perlu layanan penjadwal utama saya menjadi kumpulan utas dengan utas layanan tunggal karena perlu tugas saya untuk dijalankan secara bersamaan dan b) Saya harus dapat menentukan durasi waktu tunggu untuk setiap tugas di saat tugas diserahkan. Saya telah mencoba menggunakan ini sebagai titik awal tetapi memperpanjang ScheduledThreadPoolExecutor, tetapi saya tidak dapat melihat cara untuk mendapatkan durasi batas waktu yang ditentukan yang akan ditentukan pada waktu pengiriman tugas hingga metode beforeExecute. Setiap saran sangat dihargai!
Michael Ellis

Jawaban:

91

Anda dapat menggunakan ScheduledExecutorService untuk ini. Pertama Anda akan mengirimkannya hanya sekali untuk segera memulai dan mempertahankan masa depan yang telah dibuat. Setelah itu Anda dapat mengirimkan tugas baru yang akan membatalkan masa depan yang dipertahankan setelah beberapa periode waktu.

 ScheduledExecutorService executor = Executors.newScheduledThreadPool(2); 
 final Future handler = executor.submit(new Callable(){ ... });
 executor.schedule(new Runnable(){
     public void run(){
         handler.cancel();
     }      
 }, 10000, TimeUnit.MILLISECONDS);

Ini akan menjalankan penangan Anda (fungsi utama akan diinterupsi) selama 10 detik, kemudian akan membatalkan (yaitu menginterupsi) tugas spesifik tersebut.

John Vint
sumber
13
Ide yang menarik, tetapi bagaimana jika tugas selesai sebelum batas waktu (yang biasanya akan dilakukan)? Saya lebih suka tidak memiliki banyak tugas pembersihan yang menunggu untuk dijalankan hanya untuk mengetahui tugas yang ditugaskan telah selesai. Akan perlu ada utas lain yang memantau Masa Depan saat mereka selesai menghapus tugas pembersihan mereka.
Edward Dale
4
Pelaksana hanya akan menjadwalkan pembatalan ini satu kali. Jika tugas selesai maka pembatalan adalah tidak ada operasi dan pekerjaan terus tidak berubah. Hanya perlu ada satu thread tambahan yang melakukan scheudling untuk membatalkan tugas dan satu thread untuk menjalankannya. Anda dapat memiliki dua pelaksana, satu untuk menyerahkan tugas utama Anda dan satu lagi untuk membatalkannya.
John Vint
3
Itu benar, tetapi bagaimana jika batas waktu adalah 5 jam dan dalam waktu itu 10 ribu tugas dijalankan. Saya ingin menghindari semua no-op yang ada di sekitar mengambil memori dan menyebabkan pengalihan konteks.
Edward Dale
1
@Computer Belum tentu. Akan ada 10k pemanggilan future.cancel (), namun jika masa depan selesai maka pembatalan akan cepat keluar dan tidak melakukan pekerjaan yang tidak perlu. Jika Anda tidak ingin 10k pemanggilan pembatalan ekstra maka ini mungkin tidak berhasil, tetapi jumlah pekerjaan yang diselesaikan saat tugas diselesaikan sangat kecil.
John Vint
6
@ John W .: Saya baru menyadari masalah lain dengan penerapan Anda. Saya memerlukan waktu tunggu untuk memulai saat tugas mulai dijalankan, seperti yang saya komentari sebelumnya. Saya pikir satu-satunya cara untuk melakukannya adalah dengan menggunakan beforeExecutekail.
Edward Dale
6

Sayangnya solusinya cacat. Ada semacam bug dengan ScheduledThreadPoolExecutor, juga dilaporkan dalam pertanyaan ini : membatalkan tugas yang dikirimkan tidak sepenuhnya melepaskan sumber daya memori yang terkait dengan tugas; sumber daya dilepaskan hanya saat tugas berakhir.

Oleh karena itu, jika Anda membuat TimeoutThreadPoolExecutordengan waktu kedaluwarsa yang cukup lama (penggunaan biasa), dan mengirimkan tugas dengan cukup cepat, Anda akhirnya akan mengisi memori - meskipun tugas sebenarnya berhasil diselesaikan.

Anda dapat melihat masalah dengan program uji (sangat kasar) berikut:

public static void main(String[] args) throws InterruptedException {
    ExecutorService service = new TimeoutThreadPoolExecutor(1, 1, 10, TimeUnit.SECONDS, 
            new LinkedBlockingQueue<Runnable>(), 10, TimeUnit.MINUTES);
    //ExecutorService service = Executors.newFixedThreadPool(1);
    try {
        final AtomicInteger counter = new AtomicInteger();
        for (long i = 0; i < 10000000; i++) {
            service.submit(new Runnable() {
                @Override
                public void run() {
                    counter.incrementAndGet();
                }
            });
            if (i % 10000 == 0) {
                System.out.println(i + "/" + counter.get());
                while (i > counter.get()) {
                    Thread.sleep(10);
                }
            }
        }
    } finally {
        service.shutdown();
    }
}

Program ini menghabiskan memori yang tersedia, meskipun menunggu hingga pemijahan Runnableselesai.

Saya memikirkan hal ini untuk sementara waktu, tetapi sayangnya saya tidak dapat menemukan solusi yang baik.

EDIT: Saya menemukan masalah ini dilaporkan sebagai bug JDK 6602600 , dan tampaknya telah diperbaiki baru-baru ini.

Flavio
sumber
4

Gabungkan tugas di FutureTask dan Anda bisa menentukan waktu tunggu untuk FutureTask. Lihat contoh dalam jawaban saya untuk pertanyaan ini,

Java native Process timeout

ZZ Coder
sumber
1
Saya menyadari ada beberapa cara untuk melakukan ini dengan menggunakan java.util.concurrentkelas, tetapi saya sedang mencari ExecutorServiceimplementasi.
Edward Dale
1
Jika Anda mengatakan bahwa Anda ingin ExecutorService menyembunyikan fakta bahwa waktu tunggu sedang ditambahkan dari kode klien, Anda dapat mengimplementasikan ExecutorService Anda sendiri yang membungkus setiap runnable yang diserahkan kepadanya dengan FutureTask sebelum mengeksekusinya.
erikprice
2

Setelah banyak waktu untuk survei,
Akhirnya, saya menggunakan invokeAllmetode ExecutorServiceuntuk memecahkan masalah ini.
Itu akan mengganggu tugas saat tugas berjalan.
Berikut ini contohnya

ExecutorService executorService = Executors.newCachedThreadPool();

try {
    List<Callable<Object>> callables = new ArrayList<>();
    // Add your long time task (callable)
    callables.add(new VaryLongTimeTask());
    // Assign tasks for specific execution timeout (e.g. 2 sec)
    List<Future<Object>> futures = executorService.invokeAll(callables, 2000, TimeUnit.MILLISECONDS);
    for (Future<Object> future : futures) {
        // Getting result
    }
} catch (InterruptedException e) {
    e.printStackTrace();
}

executorService.shutdown();

Pro adalah Anda juga dapat mengirimkan ListenableFuturepada saat yang sama ExecutorService.
Ubah sedikit baris kode pertama.

ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());

ListeningExecutorServiceadalah fitur Mendengarkan ExecutorServicedi proyek google guava ( com.google.guava ))

Johnny
sumber
2
Terima kasih telah menunjukkannya invokeAll. Itu bekerja dengan sangat baik. Hanya kata hati-hati bagi siapa pun yang berpikir untuk menggunakan ini: meskipun invokeAllmengembalikan daftar Futureobjek, sebenarnya tampaknya operasi pemblokiran.
mxro
1

Tampaknya masalah bukan pada bug JDK 6602600 (itu diselesaikan pada 2010-05-22), tetapi dalam panggilan tidur yang salah (10) dalam lingkaran. Sebagai catatan tambahan, Thread utama harus memberikan KESEMPATAN langsung ke thread lain untuk merealisasikan tugas mereka dengan memanggil SLEEP (0) di SETIAP cabang lingkaran luar. Lebih baik, saya pikir, menggunakan Thread.yield () daripada Thread.sleep (0)

Hasil koreksi bagian dari kode masalah sebelumnya seperti ini:

.......................
........................
Thread.yield();         

if (i % 1000== 0) {
System.out.println(i + "/" + counter.get()+ "/"+service.toString());
}

//                
//                while (i > counter.get()) {
//                    Thread.sleep(10);
//                } 

Ini bekerja dengan benar dengan jumlah penghitung luar hingga 150.000.000 lingkaran yang diuji.

Sergey
sumber
1

Menggunakan jawaban John W, saya membuat implementasi yang memulai waktu tunggu dengan benar saat tugas mulai dieksekusi. Saya bahkan menulis tes unit untuk itu :)

Namun, ini tidak sesuai dengan kebutuhan saya karena beberapa operasi IO tidak mengganggu saat Future.cancel()dipanggil (yaitu saat Thread.interrupt()dipanggil). Beberapa contoh operasi IO yang mungkin tidak terputus saat Thread.interrupt()dipanggil adalah Socket.connectdan Socket.read(dan saya menduga sebagian besar operasi IO diterapkan di java.io). Semua operasi IO di java.nioharus dapat diinterupsi saat Thread.interrupt()dipanggil. Misalnya, kasus untuk SocketChannel.opendan SocketChannel.read.

Pokoknya jika ada yang tertarik, saya membuat inti untuk eksekutor kumpulan utas yang memungkinkan tugas untuk waktu tunggu (jika mereka menggunakan operasi yang dapat disela ...): https://gist.github.com/amanteaux/64c54a913c1ae34ad7b86db109cbc0bf

amanteaux
sumber
Kode yang menarik, saya menariknya ke sistem saya dan ingin tahu apakah Anda memiliki beberapa contoh operasi IO jenis apa yang tidak akan mengganggu sehingga saya dapat melihat apakah itu akan memengaruhi sistem saya. Terima kasih!
Duncan Krebs
@DuncanKrebs Saya merinci jawaban saya dengan contoh IO yang tidak dapat terputus: Socket.connectdanSocket.read
amanteaux
myThread.interrupted()bukan metode yang tepat untuk melakukan interupsi, karena metode ini MENGHAPUS flag interupsi. Gunakan myThread.interrupt()sebagai gantinya, dan itu harus dengan soket
DanielCuadra
@DanielCuadra: Terima kasih, sepertinya saya salah ketik karena Thread.interrupted()tidak dapat mengganggu utas. Namun, Thread.interrupt()tidak mengganggu java.iooperasi, ini hanya bekerja pada java.niooperasi.
amanteaux
Saya telah menggunakan interrupt()selama bertahun-tahun dan itu selalu mengganggu operasi java.io (serta metode pemblokiran lainnya, seperti thread sleep, koneksi jdbc, blockingqueue take, dll). Mungkin Anda menemukan kelas buggy atau JVM yang memiliki bug
DanielCuadra
0

Bagaimana dengan ide alternatif ini:

  • dua memiliki dua pelaksana:
    • untuk satu :
      • mengirimkan tugas, tanpa mempedulikan waktu tugas habis
      • menambahkan masa depan yang dihasilkan dan waktu ketika itu harus berakhir pada struktur internal
    • satu untuk melaksanakan pekerjaan internal yang memeriksa struktur internal jika beberapa tugas waktu habis dan jika harus dibatalkan.

Sampel kecil ada di sini:

public class AlternativeExecutorService 
{

private final CopyOnWriteArrayList<ListenableFutureTask> futureQueue       = new CopyOnWriteArrayList();
private final ScheduledThreadPoolExecutor                scheduledExecutor = new ScheduledThreadPoolExecutor(1); // used for internal cleaning job
private final ListeningExecutorService                   threadExecutor    = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(5)); // used for
private ScheduledFuture scheduledFuture;
private static final long INTERNAL_JOB_CLEANUP_FREQUENCY = 1000L;

public AlternativeExecutorService()
{
    scheduledFuture = scheduledExecutor.scheduleAtFixedRate(new TimeoutManagerJob(), 0, INTERNAL_JOB_CLEANUP_FREQUENCY, TimeUnit.MILLISECONDS);
}

public void pushTask(OwnTask task)
{
    ListenableFuture<Void> future = threadExecutor.submit(task);  // -> create your Callable
    futureQueue.add(new ListenableFutureTask(future, task, getCurrentMillisecondsTime())); // -> store the time when the task should end
}

public void shutdownInternalScheduledExecutor()
{
    scheduledFuture.cancel(true);
    scheduledExecutor.shutdownNow();
}

long getCurrentMillisecondsTime()
{
    return Calendar.getInstance().get(Calendar.MILLISECOND);
}

class ListenableFutureTask
{
    private final ListenableFuture<Void> future;
    private final OwnTask                task;
    private final long                   milliSecEndTime;

    private ListenableFutureTask(ListenableFuture<Void> future, OwnTask task, long milliSecStartTime)
    {
        this.future = future;
        this.task = task;
        this.milliSecEndTime = milliSecStartTime + task.getTimeUnit().convert(task.getTimeoutDuration(), TimeUnit.MILLISECONDS);
    }

    ListenableFuture<Void> getFuture()
    {
        return future;
    }

    OwnTask getTask()
    {
        return task;
    }

    long getMilliSecEndTime()
    {
        return milliSecEndTime;
    }
}

class TimeoutManagerJob implements Runnable
{
    CopyOnWriteArrayList<ListenableFutureTask> getCopyOnWriteArrayList()
    {
        return futureQueue;
    }

    @Override
    public void run()
    {
        long currentMileSecValue = getCurrentMillisecondsTime();
        for (ListenableFutureTask futureTask : futureQueue)
        {
            consumeFuture(futureTask, currentMileSecValue);
        }
    }

    private void consumeFuture(ListenableFutureTask futureTask, long currentMileSecValue)
    {
        ListenableFuture<Void> future = futureTask.getFuture();
        boolean isTimeout = futureTask.getMilliSecEndTime() >= currentMileSecValue;
        if (isTimeout)
        {
            if (!future.isDone())
            {
                future.cancel(true);
            }
            futureQueue.remove(futureTask);
        }
    }
}

class OwnTask implements Callable<Void>
{
    private long     timeoutDuration;
    private TimeUnit timeUnit;

    OwnTask(long timeoutDuration, TimeUnit timeUnit)
    {
        this.timeoutDuration = timeoutDuration;
        this.timeUnit = timeUnit;
    }

    @Override
    public Void call() throws Exception
    {
        // do logic
        return null;
    }

    public long getTimeoutDuration()
    {
        return timeoutDuration;
    }

    public TimeUnit getTimeUnit()
    {
        return timeUnit;
    }
}
}
Ionut Mesaros
sumber
0

periksa apakah ini berhasil untuk Anda,

    public <T,S,K,V> ResponseObject<Collection<ResponseObject<T>>> runOnScheduler(ThreadPoolExecutor threadPoolExecutor,
      int parallelismLevel, TimeUnit timeUnit, int timeToCompleteEachTask, Collection<S> collection,
      Map<K,V> context, Task<T,S,K,V> someTask){
    if(threadPoolExecutor==null){
      return ResponseObject.<Collection<ResponseObject<T>>>builder().errorCode("500").errorMessage("threadPoolExecutor can not be null").build();
    }
    if(someTask==null){
      return ResponseObject.<Collection<ResponseObject<T>>>builder().errorCode("500").errorMessage("Task can not be null").build();
    }
    if(CollectionUtils.isEmpty(collection)){
      return ResponseObject.<Collection<ResponseObject<T>>>builder().errorCode("500").errorMessage("input collection can not be empty").build();
    }

    LinkedBlockingQueue<Callable<T>> callableLinkedBlockingQueue = new LinkedBlockingQueue<>(collection.size());
    collection.forEach(value -> {
      callableLinkedBlockingQueue.offer(()->someTask.perform(value,context)); //pass some values in callable. which can be anything.
    });
    LinkedBlockingQueue<Future<T>> futures = new LinkedBlockingQueue<>();

    int count = 0;

    while(count<parallelismLevel && count < callableLinkedBlockingQueue.size()){
      Future<T> f = threadPoolExecutor.submit(callableLinkedBlockingQueue.poll());
      futures.offer(f);
      count++;
    }

    Collection<ResponseObject<T>> responseCollection = new ArrayList<>();

    while(futures.size()>0){
      Future<T> future = futures.poll();
      ResponseObject<T> responseObject = null;
        try {
          T response = future.get(timeToCompleteEachTask, timeUnit);
          responseObject = ResponseObject.<T>builder().data(response).build();
        } catch (InterruptedException e) {
          future.cancel(true);
        } catch (ExecutionException e) {
          future.cancel(true);
        } catch (TimeoutException e) {
          future.cancel(true);
        } finally {
          if (Objects.nonNull(responseObject)) {
            responseCollection.add(responseObject);
          }
          futures.remove(future);//remove this
          Callable<T> callable = getRemainingCallables(callableLinkedBlockingQueue);
          if(null!=callable){
            Future<T> f = threadPoolExecutor.submit(callable);
            futures.add(f);
          }
        }

    }
    return ResponseObject.<Collection<ResponseObject<T>>>builder().data(responseCollection).build();
  }

  private <T> Callable<T> getRemainingCallables(LinkedBlockingQueue<Callable<T>> callableLinkedBlockingQueue){
    if(callableLinkedBlockingQueue.size()>0){
      return callableLinkedBlockingQueue.poll();
    }
    return null;
  }

Anda dapat membatasi tidak ada penggunaan utas dari penjadwal serta memberi waktu tunggu pada tugas.

Nitish Kumar
sumber