Bagaimana cara membuat blok metode submit () ThreadPoolExecutor jika sudah jenuh?

102

Saya ingin membuat ThreadPoolExecutorsedemikian rupa sehingga ketika telah mencapai ukuran maksimum dan antrian penuh, submit()metode memblokir ketika mencoba menambahkan tugas baru. Apakah saya perlu menerapkan kustom RejectedExecutionHandleruntuk itu atau adakah cara yang sudah ada untuk melakukan ini menggunakan pustaka Java standar?

Fixpoint
sumber
2
Apakah yang Anda inginkan seperti metode offer () antrian pemblokiran Array?
ekstraneon
2
@bar Saya tidak setuju. Tanya Jawab ini terlihat lebih berharga (selain lebih tua).
JasonMArcher

Jawaban:

47

Salah satu solusi yang mungkin saya temukan:

public class BoundedExecutor {
    private final Executor exec;
    private final Semaphore semaphore;

    public BoundedExecutor(Executor exec, int bound) {
        this.exec = exec;
        this.semaphore = new Semaphore(bound);
    }

    public void submitTask(final Runnable command)
            throws InterruptedException, RejectedExecutionException {
        semaphore.acquire();
        try {
            exec.execute(new Runnable() {
                public void run() {
                    try {
                        command.run();
                    } finally {
                        semaphore.release();
                    }
                }
            });
        } catch (RejectedExecutionException e) {
            semaphore.release();
            throw e;
        }
    }
}

Apakah ada solusi lain? Saya lebih suka sesuatu yang berdasarkan RejectedExecutionHandlerkarena sepertinya cara standar untuk menangani situasi seperti itu.

Fixpoint
sumber
2
Apakah ada kondisi balapan di sini antara titik ketika semafor dilepaskan di klausa akhirnya dan semafor diakuisisi?
volni
2
Seperti disebutkan di atas, implementasi ini cacat karena semaphore dirilis sebelum tugas selesai. Akan lebih baik menggunakan metode java.util.concurrent.ThreadPoolExecutor # afterExecute (Runnable, Throwable)
FelixM
2
@FelixM: menggunakan java.util.concurrent.ThreadPoolExecutor # afterExecute (Runnable, Throwable) tidak akan menyelesaikan masalah karena afterExecute dipanggil segera setelah task.run () di java.util.concurrent.ThreadPoolExecutor # runWorker (Worker w), sebelumnya mengambil elemen berikutnya dari antrian (lihat kode sumber openjdk 1.7.0.6).
Jaan
1
Jawaban ini dari buku Java Concurrency in Practice oleh Brian Goetz
orangepips
11
Jawaban ini tidak sepenuhnya benar, demikian juga komentarnya. Potongan kode ini memang berasal dari Java Concurrency in Practice, dan benar jika Anda mempertimbangkan konteksnya . Buku tersebut dengan jelas menyatakan, secara harfiah: "Dalam pendekatan seperti itu, gunakan antrian tak terbatas (...) dan setel batas pada semaphore agar sama dengan ukuran kumpulan ditambah jumlah tugas antrian yang ingin Anda izinkan". Dengan antrian tak terbatas, tugas tidak akan pernah ditolak, jadi memunculkan kembali pengecualian sama sekali tidak berguna! Yang, saya percaya, juga alasan mengapa throw e;adalah TIDAK dalam buku. JCIP benar!
Timmos
30

Anda dapat menggunakan ThreadPoolExecutor dan blockingQueue:

public class ImageManager {
    BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<Runnable>(blockQueueSize);
    RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
    private ExecutorService executorService =  new ThreadPoolExecutor(numOfThread, numOfThread, 
        0L, TimeUnit.MILLISECONDS, blockingQueue, rejectedExecutionHandler);

    private int downloadThumbnail(String fileListPath){
        executorService.submit(new yourRunnable());
    }
}
DiTime4Tea
sumber
Saya hanya ingin mengatakan bahwa ini adalah solusi yang sangat cepat dan mudah untuk diterapkan yang bekerja dengan sangat baik!
Ivan
58
Ini menjalankan tugas yang ditolak pada utas pengiriman. Yang secara fungsional tidak memenuhi persyaratan OP.
Persepsi
4
Ini menjalankan tugas "di utas pemanggil" alih-alih memblokir untuk meletakkannya di antrean, yang dapat memiliki beberapa efek buruk, seperti jika beberapa utas memanggilnya dengan cara ini, lebih dari "ukuran antrean" tugas akan dijalankan, dan jika Jika tugas berlangsung lebih lama dari yang diharapkan, utas "penghasil" Anda mungkin tidak membuat pelaksana sibuk. Tapi bekerja dengan baik di sini!
rogerdpack
4
Suara negatif: ini tidak memblokir saat TPE sudah jenuh. Ini hanya alternatif, bukan solusi.
Timmos
1
Suara positif: ini cukup cocok dengan 'desain TPE' dan 'secara alami memblokir' utas klien dengan memberi mereka rasa yang melimpah untuk dilakukan. Ini harus mencakup sebagian besar kasus penggunaan, tetapi tidak semuanya tentu saja dan Anda harus memahami apa yang terjadi di balik terpal.
Mike
12

Anda harus menggunakan CallerRunsPolicy, yang menjalankan tugas yang ditolak di thread pemanggil. Dengan cara ini, itu tidak dapat mengirimkan tugas baru apa pun ke pelaksana sampai tugas itu selesai, pada titik mana akan ada beberapa utas kumpulan gratis atau proses akan berulang.

http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/ThreadPoolExecutor.CallerRunsPolicy.html

Dari dokumen:

Tugas yang ditolak

Tugas baru yang dikirimkan dalam metode eksekusi (java.lang.Runnable) akan ditolak saat Pelaksana telah ditutup, dan juga saat Pelaksana menggunakan batas terbatas untuk utas maksimum dan kapasitas antrian kerja, dan jenuh. Dalam kedua kasus, metode eksekusi memanggil metode RejectedExecutionHandler.rejectedExecution (java.lang.Runnable, java.util.concurrent.ThreadPoolExecutor) dari RejectedExecutionHandler-nya. Empat kebijakan penangan yang telah ditentukan disediakan:

  1. Di ThreadPoolExecutor.AbortPolicy default, penangan melontarkan runtime RejectedExecutionException saat ditolak.
  2. Di ThreadPoolExecutor.CallerRunsPolicy, thread yang memanggil eksekusi itu sendiri menjalankan tugas. Ini memberikan mekanisme kontrol umpan balik sederhana yang akan memperlambat kecepatan pengiriman tugas baru.
  3. Di ThreadPoolExecutor.DiscardPolicy, tugas yang tidak bisa dijalankan akan dibuang begitu saja.
  4. Di ThreadPoolExecutor.DiscardOldestPolicy, jika pelaksana tidak dimatikan, tugas di kepala antrean pekerjaan akan dihapus, lalu eksekusi dicoba lagi (yang bisa gagal lagi, menyebabkan ini diulangi.)

Selain itu, pastikan untuk menggunakan antrean terbatas, seperti ArrayBlockingQueue, saat memanggil ThreadPoolExecutorkonstruktor. Jika tidak, tidak ada yang akan ditolak.

Edit: sebagai tanggapan atas komentar Anda, setel ukuran ArrayBlockingQueue agar sama dengan ukuran maksimal kumpulan utas dan gunakan AbortPolicy.

Edit 2: Oke, saya mengerti apa yang Anda maksud. Bagaimana dengan ini: timpa beforeExecute()metode untuk memeriksa yang getActiveCount()tidak melebihi getMaximumPoolSize(), dan jika ya, tidur dan coba lagi?

danben
sumber
3
Saya ingin memiliki jumlah tugas yang dijalankan secara bersamaan untuk dibatasi secara ketat (dengan jumlah utas di Pelaksana), inilah mengapa saya tidak dapat mengizinkan utas pemanggil untuk menjalankan tugas ini sendiri.
Fixpoint
1
AbortPolicy akan menyebabkan utas pemanggil menerima RejectedExecutionException, sementara saya membutuhkannya untuk memblokir.
Fixpoint
2
Saya meminta untuk memblokir, tidak tidur & polling;)
Fixpoint
@danben: Tidakkah maksud Anda CallerRunsPolicy ?
pengguna359996
7
Masalah dengan CallerRunPolicy adalah bahwa jika Anda memiliki produsen utas tunggal, Anda akan sering memiliki utas yang tidak digunakan jika tugas yang berjalan lama kebetulan ditolak (karena tugas lain di kumpulan utas akan selesai sementara tugas yang berjalan lama masih berjalan).
Adam Gent
6

Hibernate memiliki BlockPolicyyang sederhana dan dapat melakukan apa yang Anda inginkan:

Lihat: Executors.java

/**
 * A handler for rejected tasks that will have the caller block until
 * space is available.
 */
public static class BlockPolicy implements RejectedExecutionHandler {

    /**
     * Creates a <tt>BlockPolicy</tt>.
     */
    public BlockPolicy() { }

    /**
     * Puts the Runnable to the blocking queue, effectively blocking
     * the delegating thread until space is available.
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        try {
            e.getQueue().put( r );
        }
        catch (InterruptedException e1) {
            log.error( "Work discarded, thread was interrupted while waiting for space to schedule: {}", r );
        }
    }
}
Nate Murray
sumber
4
Setelah dipikir-pikir, ini adalah ide yang sangat buruk. Saya tidak menyarankan Anda menggunakannya. Untuk alasan yang bagus lihat di sini: stackoverflow.com/questions/3446011/…
Nate Murray
Juga, ini tidak menggunakan "pustaka Java standar", sesuai permintaan OP. Menghapus?
pengguna359996
1
Woah, jelek sekali. Pada dasarnya solusi ini mengganggu internal TPE. Javadoc for ThreadPoolExecutoreven secara harfiah mengatakan: "Metode getQueue () memungkinkan akses ke antrian pekerjaan untuk tujuan pemantauan dan debugging. Penggunaan metode ini untuk tujuan lain sangat tidak disarankan.". Bahwa ini tersedia di perpustakaan yang sangat terkenal, sungguh menyedihkan untuk dilihat.
Timmos
1
com.amazonaws.services.simpleworkflow.flow.worker.BlockCallerPolicy serupa.
Adrian Baker
6

The BoundedExecutorjawaban yang dikutip di atas dari Jawa Concurrency dalam Praktek hanya bekerja dengan benar jika Anda menggunakan antrian tak terbatas untuk Pelaksana, atau semaphore terikat tidak lebih besar dari ukuran antrian. Semaphore adalah state yang dibagi antara thread pengirim dan thread dalam kumpulan, sehingga memungkinkan untuk memenuhi pelaksana bahkan jika ukuran antrian <terikat <= (ukuran antrian + ukuran kelompok).

Penggunaan CallerRunsPolicyhanya valid jika tugas Anda tidak berjalan selamanya, dalam hal ini thread pengiriman Anda akan tetap ada rejectedExecutionselamanya, dan ide yang buruk jika tugas Anda memakan waktu lama untuk dijalankan, karena thread pengiriman tidak dapat mengirimkan tugas baru atau melakukan hal lain jika menjalankan tugas sendiri.

Jika itu tidak dapat diterima, maka saya sarankan untuk memeriksa ukuran antrian terikat pelaksana sebelum mengirimkan tugas. Jika antrian sudah penuh, tunggu beberapa saat sebelum mencoba mengirim lagi. Throughput akan menderita, tetapi saya menyarankan ini adalah solusi yang lebih sederhana daripada banyak solusi lain yang diusulkan dan Anda dijamin tidak ada tugas yang akan ditolak.

stephan f
sumber
Saya tidak yakin bagaimana memeriksa panjang antrian sebelum mengirimkan menjamin tidak ada tugas yang ditolak dalam lingkungan multi-utas dengan banyak pembuat tugas. Kedengarannya tidak aman untuk thread.
Tim
5

Saya tahu, ini adalah peretasan, tetapi menurut saya peretasan paling bersih di antara yang ditawarkan di sini ;-)

Karena ThreadPoolExecutor menggunakan antrean pemblokiran "offer" daripada "put", mari kita timpa perilaku "offer" dari antrean pemblokiran:

class BlockingQueueHack<T> extends ArrayBlockingQueue<T> {

    BlockingQueueHack(int size) {
        super(size);
    }

    public boolean offer(T task) {
        try {
            this.put(task);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return true;
    }
}

ThreadPoolExecutor tp = new ThreadPoolExecutor(1, 2, 1, TimeUnit.MINUTES, new BlockingQueueHack(5));

Saya mengujinya dan tampaknya berhasil. Menerapkan beberapa kebijakan batas waktu ditinggalkan sebagai latihan pembaca.

Jakub
sumber
Lihat stackoverflow.com/a/4522411/2601671 untuk versi bersih ini. Saya setuju, itu cara terbersih untuk melakukannya.
Trenton
3

Kelas berikut membungkus ThreadPoolExecutor dan menggunakan Semaphore untuk memblokir maka antrian pekerjaan penuh:

public final class BlockingExecutor { 

    private final Executor executor;
    private final Semaphore semaphore;

    public BlockingExecutor(int queueSize, int corePoolSize, int maxPoolSize, int keepAliveTime, TimeUnit unit, ThreadFactory factory) {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
        this.executor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, unit, queue, factory);
        this.semaphore = new Semaphore(queueSize + maxPoolSize);
    }

    private void execImpl (final Runnable command) throws InterruptedException {
        semaphore.acquire();
        try {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        command.run();
                    } finally {
                        semaphore.release();
                    }
                }
            });
        } catch (RejectedExecutionException e) {
            // will never be thrown with an unbounded buffer (LinkedBlockingQueue)
            semaphore.release();
            throw e;
        }
    }

    public void execute (Runnable command) throws InterruptedException {
        execImpl(command);
    }
}

Kelas pembungkus ini didasarkan pada solusi yang diberikan dalam buku Java Concurrency in Practice oleh Brian Goetz. Solusi dalam buku ini hanya membutuhkan dua parameter konstruktor: an Executordan batas yang digunakan untuk semaphore. Ini ditunjukkan dalam jawaban yang diberikan oleh Fixpoint. Ada masalah dengan pendekatan itu: ia bisa berada dalam keadaan di mana utas kumpulan sibuk, antrean penuh, tetapi semafor baru saja merilis izin. (semaphore.release() di blok terakhir). Dalam keadaan ini, tugas baru bisa mengambil izin yang baru saja dirilis, tetapi ditolak karena antrean tugas penuh. Tentu saja ini bukanlah sesuatu yang Anda inginkan; yang ingin Anda blokir dalam kasus ini.

Untuk mengatasi ini, kita harus menggunakan antrian tak terbatas , seperti yang disebutkan JCiP dengan jelas. Semaphore bertindak sebagai penjaga, memberikan efek ukuran antrian virtual. Ini memiliki efek samping yang memungkinkan bahwa unit dapat berisi maxPoolSize + virtualQueueSize + maxPoolSizetugas. Mengapa demikian? Karena semaphore.release()di blok akhirnya. Jika semua utas pangkalan memanggil pernyataan ini pada saat yang sama, makamaxPoolSize izin dilepaskan, memungkinkan jumlah tugas yang sama untuk memasuki unit. Jika kami menggunakan antrian terbatas, itu akan tetap penuh, sehingga tugas ditolak. Sekarang, karena kita tahu bahwa ini hanya terjadi ketika utas kumpulan hampir selesai, ini bukan masalah. Kami tahu bahwa utas kumpulan tidak akan diblokir, jadi tugas akan segera diambil dari antrean.

Anda dapat menggunakan antrian terbatas. Pastikan ukurannya sama virtualQueueSize + maxPoolSize. Ukuran yang lebih besar tidak berguna, semaphore akan mencegah masuknya lebih banyak item. Ukuran yang lebih kecil akan mengakibatkan tugas ditolak. Kemungkinan tugas ditolak meningkat seiring dengan berkurangnya ukuran. Misalnya, Anda menginginkan eksekutor terikat dengan maxPoolSize = 2 dan virtualQueueSize = 5. Kemudian ambil semaphore dengan 5 + 2 = 7 izin dan ukuran antrian aktual 5 + 2 = 7. Jumlah tugas sebenarnya yang bisa di unit tersebut kemudian 2 + 5 + 2 = 9. Ketika pelaksana penuh (5 tugas dalam antrian, 2 di kumpulan utas, jadi 0 izin tersedia) dan SEMUA utas kumpulan melepaskan izin mereka, maka tepat 2 izin dapat diambil oleh tugas yang masuk.

Sekarang solusi dari JCiP agak rumit untuk digunakan karena tidak memberlakukan semua batasan ini (antrian tidak terbatas, atau dibatasi dengan batasan matematika tersebut, dll.). Saya pikir ini hanya berfungsi sebagai contoh yang baik untuk mendemonstrasikan bagaimana Anda dapat membangun kelas aman thread baru berdasarkan bagian-bagian yang sudah tersedia, tetapi bukan sebagai kelas dewasa yang dapat digunakan kembali. Saya tidak berpikir bahwa yang terakhir adalah niat penulisnya.

Timmos
sumber
2

Anda dapat menggunakan RejectedExecutionHandler kustom seperti ini

ThreadPoolExecutor tp= new ThreadPoolExecutor(core_size, // core size
                max_handlers, // max size 
                timeout_in_seconds, // idle timeout 
                TimeUnit.SECONDS, queue, new RejectedExecutionHandler() {
                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                        // This will block if the queue is full
                        try {
                            executor.getQueue().put(r);
                        } catch (InterruptedException e) {
                            System.err.println(e.getMessage());
                        }

                    }
                });
Amir David Nissan Cohen
sumber
1
Dokumen untuk getQueue () secara eksplisit menyebutkan bahwa Akses ke antrean tugas ditujukan terutama untuk debugging dan pemantauan.
Chadi
0

Buat antrean pemblokiran Anda sendiri untuk digunakan oleh Pelaksana, dengan perilaku pemblokiran yang Anda cari, sambil selalu mengembalikan kapasitas yang tersisa (memastikan pelaksana tidak akan mencoba membuat lebih banyak utas daripada kumpulan intinya, atau memicu penangan penolakan).

Saya yakin ini akan membuat Anda mendapatkan perilaku memblokir yang Anda cari. Penangan penolakan tidak akan pernah sesuai dengan tagihan, karena itu menunjukkan pelaksana tidak dapat melakukan tugas. Apa yang bisa saya bayangkan di sana adalah bahwa Anda mendapatkan semacam 'sibuk menunggu' di pawang. Bukan itu yang Anda inginkan, Anda menginginkan antrian untuk pelaksana yang memblokir pemanggil ...

Hoeben Goreng
sumber
2
ThreadPoolExecutormenggunakan offermetode untuk menambahkan tugas ke antrian. Jika saya membuat kebiasaan BlockingQueueyang menghalangi offer, itu akan memutuskan BlockingQueuekontrak.
Fixpoint
@Shooshpanchick, itu akan memutuskan kontrak BlockingQueues. terus? jika Anda sangat tertarik, Anda dapat secara eksplisit mengaktifkan perilaku hanya selama submit () (ini akan membutuhkan ThreadLocal)
bestsss
Lihat juga jawaban atas pertanyaan lain yang menjelaskan alternatif ini.
Robert Tupelo-Schneck
apakah ada alasan mengapa ThreadPoolExecutordiimplementasikan untuk digunakan offerdan bukan put(versi pemblokiran)? Selain itu, jika ada cara bagi kode klien untuk menentukan mana yang akan digunakan kapan, banyak orang yang mencoba menggunakan solusi kustom akan merasa lega
asgs
0

Untuk menghindari masalah dengan solusi @FixPoint. Seseorang dapat menggunakan ListeningExecutorService dan melepaskan semaphore onSuccess dan onFailure di dalam FutureCallback.

vamsu
sumber
Itu memiliki masalah inheren yang sama seperti hanya membungkus Runnablekarena metode tersebut masih dipanggil sebelum pekerja membersihkan secara normal ThreadPoolExecutor. Artinya, Anda masih perlu menangani pengecualian penolakan.
Adam Gent
0

Baru-baru ini saya menemukan pertanyaan ini mengalami masalah yang sama. OP tidak mengatakannya secara eksplisit, tapi kami tidak ingin menggunakan RejectedExecutionHandleryang mengeksekusi tugas di thread pengirim, karena ini akan mengurangi penggunaan thread pekerja jika tugas ini berjalan lama.

Membaca semua jawaban dan komentar, khususnya solusi yang cacat dengan semaphore atau menggunakan, afterExecutesaya telah melihat lebih dekat kode ThreadPoolExecutor untuk melihat apakah ada jalan keluar. Saya kagum melihat ada lebih dari 2000 baris kode (berkomentar), beberapa di antaranya membuat saya pusing . Mengingat persyaratan yang agak sederhana yang sebenarnya saya miliki - satu produsen, beberapa konsumen, biarkan produsen memblokir ketika tidak ada konsumen yang dapat bekerja - saya memutuskan untuk membuat solusi sendiri. Ini bukan ExecutorServicehanya sebuah Executor. Dan itu tidak menyesuaikan jumlah utas dengan beban kerja, tetapi hanya menampung sejumlah utas, yang juga sesuai dengan kebutuhan saya. Ini kodenya. Jangan ragu untuk mengomel tentang itu :-)

package x;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;

/**
 * distributes {@code Runnable}s to a fixed number of threads. To keep the
 * code lean, this is not an {@code ExecutorService}. In particular there is
 * only very simple support to shut this executor down.
 */
public class ParallelExecutor implements Executor {
  // other bounded queues work as well and are useful to buffer peak loads
  private final BlockingQueue<Runnable> workQueue =
      new SynchronousQueue<Runnable>();
  private final Thread[] threads;

  /*+**********************************************************************/
  /**
   * creates the requested number of threads and starts them to wait for
   * incoming work
   */
  public ParallelExecutor(int numThreads) {
    this.threads = new Thread[numThreads];
    for(int i=0; i<numThreads; i++) {
      // could reuse the same Runner all over, but keep it simple
      Thread t = new Thread(new Runner());
      this.threads[i] = t;
      t.start();
    }
  }
  /*+**********************************************************************/
  /**
   * returns immediately without waiting for the task to be finished, but may
   * block if all worker threads are busy.
   * 
   * @throws RejectedExecutionException if we got interrupted while waiting
   *         for a free worker
   */
  @Override
  public void execute(Runnable task)  {
    try {
      workQueue.put(task);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RejectedExecutionException("interrupt while waiting for a free "
          + "worker.", e);
    }
  }
  /*+**********************************************************************/
  /**
   * Interrupts all workers and joins them. Tasks susceptible to an interrupt
   * will preempt their work. Blocks until the last thread surrendered.
   */
  public void interruptAndJoinAll() throws InterruptedException {
    for(Thread t : threads) {
      t.interrupt();
    }
    for(Thread t : threads) {
      t.join();
    }
  }
  /*+**********************************************************************/
  private final class Runner implements Runnable {
    @Override
    public void run() {
      while (!Thread.currentThread().isInterrupted()) {
        Runnable task;
        try {
          task = workQueue.take();
        } catch (InterruptedException e) {
          // canonical handling despite exiting right away
          Thread.currentThread().interrupt(); 
          return;
        }
        try {
          task.run();
        } catch (RuntimeException e) {
          // production code to use a logging framework
          e.printStackTrace();
        }
      }
    }
  }
}
Harald
sumber
0

Saya yakin ada cara yang cukup elegan untuk menyelesaikan masalah ini dengan menggunakan java.util.concurrent.Semaphoredan mendelegasikan perilaku Executor.newFixedThreadPool. Layanan eksekutor baru hanya akan menjalankan tugas baru jika ada utas untuk melakukannya. Pemblokiran dikelola oleh Semaphore dengan jumlah izin sama dengan jumlah utas. Ketika tugas selesai, itu mengembalikan izin.

public class FixedThreadBlockingExecutorService extends AbstractExecutorService {

private final ExecutorService executor;
private final Semaphore blockExecution;

public FixedThreadBlockingExecutorService(int nTreads) {
    this.executor = Executors.newFixedThreadPool(nTreads);
    blockExecution = new Semaphore(nTreads);
}

@Override
public void shutdown() {
    executor.shutdown();
}

@Override
public List<Runnable> shutdownNow() {
    return executor.shutdownNow();
}

@Override
public boolean isShutdown() {
    return executor.isShutdown();
}

@Override
public boolean isTerminated() {
    return executor.isTerminated();
}

@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
    return executor.awaitTermination(timeout, unit);
}

@Override
public void execute(Runnable command) {
    blockExecution.acquireUninterruptibly();
    executor.execute(() -> {
        try {
            command.run();
        } finally {
            blockExecution.release();
        }
    });
}
Radoslaw Kachel
sumber
Saya menerapkan BoundedExecutor yang dijelaskan dalam Java Concurrency in Practice dan menemukan bahwa Semaphore harus diinisialisasi dengan bendera keadilan yang disetel ke true untuk memastikan bahwa izin Semaphore ditawarkan dalam permintaan pesanan dibuat. Lihat docs.oracle.com/javase/7/docs/api/java/util/concurrent/… . untuk detailnya
Prahalad Deshpande
0

Saya memiliki kebutuhan yang sama di masa lalu: semacam antrean pemblokiran dengan ukuran tetap untuk setiap klien yang didukung oleh kumpulan utas bersama. Saya akhirnya menulis jenis ThreadPoolExecutor saya sendiri:

UserThreadPoolExecutor (antrian pemblokiran (per klien) + threadpool (dibagikan di antara semua klien))

Lihat: https://github.com/d4rxh4wx/UserThreadPoolExecutor

Setiap UserThreadPoolExecutor diberi jumlah maksimum utas dari ThreadPoolExecutor bersama

Setiap UserThreadPoolExecutor dapat:

  • mengirimkan tugas ke pelaksana kumpulan thread bersama jika kuotanya tidak tercapai. Jika kuotanya tercapai, pekerjaan akan masuk antrean (pemblokiran non-konsumtif menunggu CPU). Setelah salah satu tugas yang dikirim selesai, kuota dikurangi, memungkinkan tugas lain menunggu untuk dikirim ke ThreadPoolExecutor
  • tunggu hingga tugas lainnya selesai
d4rxh4wx
sumber
0

Saya menemukan kebijakan penolakan ini di klien pencarian elastis. Ini memblokir utas pemanggil saat memblokir antrian. Kode di bawah-

 static class ForceQueuePolicy implements XRejectedExecutionHandler 
 {
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) 
        {
            try 
            {
                executor.getQueue().put(r);
            } 
            catch (InterruptedException e) 
            {
                //should never happen since we never wait
                throw new EsRejectedExecutionException(e);
            }
        }

        @Override
        public long rejected() 
        {
            return 0;
        }
}
Vladislav
sumber
0

Saya baru-baru ini memiliki kebutuhan untuk mencapai sesuatu yang serupa, tetapi pada a ScheduledExecutorService .

Saya juga harus memastikan bahwa saya menangani penundaan yang diteruskan pada metode dan memastikan bahwa tugas dikirim untuk dieksekusi pada saat yang diharapkan pemanggil atau hanya gagal sehingga melempar RejectedExecutionException .

Metode lain ScheduledThreadPoolExecutoruntuk mengeksekusi atau mengirimkan panggilan tugas secara internal #scheduleyang pada gilirannya akan memanggil metode yang diganti.

import java.util.concurrent.*;

public class BlockingScheduler extends ScheduledThreadPoolExecutor {
    private final Semaphore maxQueueSize;

    public BlockingScheduler(int corePoolSize,
                             ThreadFactory threadFactory,
                             int maxQueueSize) {
        super(corePoolSize, threadFactory, new AbortPolicy());
        this.maxQueueSize = new Semaphore(maxQueueSize);
    }

    @Override
    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay,
                                       TimeUnit unit) {
        final long newDelayInMs = beforeSchedule(command, unit.toMillis(delay));
        return super.schedule(command, newDelayInMs, TimeUnit.MILLISECONDS);
    }

    @Override
    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay,
                                           TimeUnit unit) {
        final long newDelayInMs = beforeSchedule(callable, unit.toMillis(delay));
        return super.schedule(callable, newDelayInMs, TimeUnit.MILLISECONDS);
    }

    @Override
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
        final long newDelayInMs = beforeSchedule(command, unit.toMillis(initialDelay));
        return super.scheduleAtFixedRate(command, newDelayInMs, unit.toMillis(period), TimeUnit.MILLISECONDS);
    }

    @Override
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long period,
                                                     TimeUnit unit) {
        final long newDelayInMs = beforeSchedule(command, unit.toMillis(initialDelay));
        return super.scheduleWithFixedDelay(command, newDelayInMs, unit.toMillis(period), TimeUnit.MILLISECONDS);
    }

    @Override
    protected void afterExecute(Runnable runnable, Throwable t) {
        super.afterExecute(runnable, t);
        try {
            if (t == null && runnable instanceof Future<?>) {
                try {
                    ((Future<?>) runnable).get();
                } catch (CancellationException | ExecutionException e) {
                    t = e;
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // ignore/reset
                }
            }
            if (t != null) {
                System.err.println(t);
            }
        } finally {
            releaseQueueUsage();
        }
    }

    private long beforeSchedule(Runnable runnable, long delay) {
        try {
            return getQueuePermitAndModifiedDelay(delay);
        } catch (InterruptedException e) {
            getRejectedExecutionHandler().rejectedExecution(runnable, this);
            return 0;
        }
    }

    private long beforeSchedule(Callable callable, long delay) {
        try {
            return getQueuePermitAndModifiedDelay(delay);
        } catch (InterruptedException e) {
            getRejectedExecutionHandler().rejectedExecution(new FutureTask(callable), this);
            return 0;
        }
    }

    private long getQueuePermitAndModifiedDelay(long delay) throws InterruptedException {
        final long beforeAcquireTimeStamp = System.currentTimeMillis();
        maxQueueSize.tryAcquire(delay, TimeUnit.MILLISECONDS);
        final long afterAcquireTimeStamp = System.currentTimeMillis();
        return afterAcquireTimeStamp - beforeAcquireTimeStamp;
    }

    private void releaseQueueUsage() {
        maxQueueSize.release();
    }
}

Saya memiliki kodenya di sini, akan menghargai masukan apa pun. https://github.com/AmitabhAwasthi/BlockingScheduler

Dev Amitabh
sumber
Jawaban ini sepenuhnya bergantung pada konten tautan eksternal. Jika mereka menjadi tidak valid, jawaban Anda tidak akan berguna. Jadi harap edit jawaban Anda dan tambahkan setidaknya ringkasan dari apa yang dapat ditemukan di sana. Terima kasih!
Fabio mengatakan Reinstate Monica
@fabio: terima kasih sudah menunjukkan. Saya telah menambahkan kode di sana sehingga sekarang lebih masuk akal bagi pembaca. Hargai komentar Anda :)
Dev Amitabh
0

Saya tidak selalu menyukai CallerRunsPolicy, terutama karena memungkinkan tugas yang ditolak untuk 'melewati antrian' dan dieksekusi sebelum tugas yang dikirimkan sebelumnya. Selain itu, menjalankan tugas pada thread pemanggil mungkin membutuhkan waktu lebih lama daripada menunggu slot pertama tersedia.

Saya memecahkan masalah ini menggunakan RejectedExecutionHandler kustom, yang hanya memblokir utas panggilan sebentar dan kemudian mencoba mengirimkan tugas lagi:

public class BlockWhenQueueFull implements RejectedExecutionHandler {

    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {

        // The pool is full. Wait, then try again.
        try {
            long waitMs = 250;
            Thread.sleep(waitMs);
        } catch (InterruptedException interruptedException) {}

        executor.execute(r);
    }
}

Kelas ini hanya dapat digunakan dalam eksekutor kumpulan benang sebagai RejectedExecutinHandler seperti yang lain, misalnya:

executorPool = new ThreadPoolExecutor(1, 1, 10,
                                      TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
                                      new BlockWhenQueueFull());

Satu-satunya downside yang saya lihat adalah bahwa utas panggilan mungkin terkunci sedikit lebih lama dari yang benar-benar diperlukan (hingga 250ms). Selain itu, karena eksekutor ini secara efektif dipanggil secara rekursif, waktu tunggu yang sangat lama hingga utas tersedia (jam) dapat mengakibatkan tumpukan berlebih.

Meskipun demikian, saya pribadi menyukai metode ini. Ringkas, mudah dipahami, dan berfungsi dengan baik.

TinkerTank
sumber
1
Seperti yang Anda katakan tentang diri Anda sendiri: ini dapat membuat stackoverflow. Bukan sesuatu yang ingin saya miliki dalam kode produksi.
Harald
Setiap orang harus membuat keputusan sendiri. Untuk beban kerja saya, ini bukan masalah. Tugas berjalan dalam hitungan detik, bukan jam yang diperlukan untuk menyelesaikan tumpukan. Selain itu, hal yang sama dapat dikatakan untuk hampir semua algoritma rekursif. Apakah itu alasan untuk tidak pernah menggunakan algoritma rekursif dalam produksi?
TinkerTank