Saya mencari implementasi ExecutorService yang dapat diberikan dengan waktu tunggu. Tugas-tugas yang dikirimkan ke ExecutorService akan terganggu jika mereka membutuhkan waktu lebih lama dari waktu tunggu untuk dijalankan. Menerapkan binatang seperti itu bukanlah tugas yang sulit, tetapi saya bertanya-tanya apakah ada yang tahu tentang implementasi yang ada.
Inilah yang saya dapatkan berdasarkan beberapa diskusi di bawah ini. Ada komentar?
import java.util.List;
import java.util.concurrent.*;
public class TimeoutThreadPoolExecutor extends ThreadPoolExecutor {
private final long timeout;
private final TimeUnit timeoutUnit;
private final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor();
private final ConcurrentMap<Runnable, ScheduledFuture> runningTasks = new ConcurrentHashMap<Runnable, ScheduledFuture>();
public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, long timeout, TimeUnit timeoutUnit) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
this.timeout = timeout;
this.timeoutUnit = timeoutUnit;
}
public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, long timeout, TimeUnit timeoutUnit) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
this.timeout = timeout;
this.timeoutUnit = timeoutUnit;
}
public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
this.timeout = timeout;
this.timeoutUnit = timeoutUnit;
}
public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
this.timeout = timeout;
this.timeoutUnit = timeoutUnit;
}
@Override
public void shutdown() {
timeoutExecutor.shutdown();
super.shutdown();
}
@Override
public List<Runnable> shutdownNow() {
timeoutExecutor.shutdownNow();
return super.shutdownNow();
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
if(timeout > 0) {
final ScheduledFuture<?> scheduled = timeoutExecutor.schedule(new TimeoutTask(t), timeout, timeoutUnit);
runningTasks.put(r, scheduled);
}
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
ScheduledFuture timeoutTask = runningTasks.remove(r);
if(timeoutTask != null) {
timeoutTask.cancel(false);
}
}
class TimeoutTask implements Runnable {
private final Thread thread;
public TimeoutTask(Thread thread) {
this.thread = thread;
}
@Override
public void run() {
thread.interrupt();
}
}
}
java
multithreading
concurrency
executorservice
Edward Dale
sumber
sumber
protected void beforeExecute(Thread t, Runnable r)
kail.Jawaban:
Anda dapat menggunakan ScheduledExecutorService untuk ini. Pertama Anda akan mengirimkannya hanya sekali untuk segera memulai dan mempertahankan masa depan yang telah dibuat. Setelah itu Anda dapat mengirimkan tugas baru yang akan membatalkan masa depan yang dipertahankan setelah beberapa periode waktu.
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2); final Future handler = executor.submit(new Callable(){ ... }); executor.schedule(new Runnable(){ public void run(){ handler.cancel(); } }, 10000, TimeUnit.MILLISECONDS);
Ini akan menjalankan penangan Anda (fungsi utama akan diinterupsi) selama 10 detik, kemudian akan membatalkan (yaitu menginterupsi) tugas spesifik tersebut.
sumber
beforeExecute
kail.Sayangnya solusinya cacat. Ada semacam bug dengan
ScheduledThreadPoolExecutor
, juga dilaporkan dalam pertanyaan ini : membatalkan tugas yang dikirimkan tidak sepenuhnya melepaskan sumber daya memori yang terkait dengan tugas; sumber daya dilepaskan hanya saat tugas berakhir.Oleh karena itu, jika Anda membuat
TimeoutThreadPoolExecutor
dengan waktu kedaluwarsa yang cukup lama (penggunaan biasa), dan mengirimkan tugas dengan cukup cepat, Anda akhirnya akan mengisi memori - meskipun tugas sebenarnya berhasil diselesaikan.Anda dapat melihat masalah dengan program uji (sangat kasar) berikut:
public static void main(String[] args) throws InterruptedException { ExecutorService service = new TimeoutThreadPoolExecutor(1, 1, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), 10, TimeUnit.MINUTES); //ExecutorService service = Executors.newFixedThreadPool(1); try { final AtomicInteger counter = new AtomicInteger(); for (long i = 0; i < 10000000; i++) { service.submit(new Runnable() { @Override public void run() { counter.incrementAndGet(); } }); if (i % 10000 == 0) { System.out.println(i + "/" + counter.get()); while (i > counter.get()) { Thread.sleep(10); } } } } finally { service.shutdown(); } }
Program ini menghabiskan memori yang tersedia, meskipun menunggu hingga pemijahan
Runnable
selesai.Saya memikirkan hal ini untuk sementara waktu, tetapi sayangnya saya tidak dapat menemukan solusi yang baik.
EDIT: Saya menemukan masalah ini dilaporkan sebagai bug JDK 6602600 , dan tampaknya telah diperbaiki baru-baru ini.
sumber
Gabungkan tugas di FutureTask dan Anda bisa menentukan waktu tunggu untuk FutureTask. Lihat contoh dalam jawaban saya untuk pertanyaan ini,
Java native Process timeout
sumber
java.util.concurrent
kelas, tetapi saya sedang mencariExecutorService
implementasi.Setelah banyak waktu untuk survei,
Akhirnya, saya menggunakan
invokeAll
metodeExecutorService
untuk memecahkan masalah ini.Itu akan mengganggu tugas saat tugas berjalan.
Berikut ini contohnya
ExecutorService executorService = Executors.newCachedThreadPool(); try { List<Callable<Object>> callables = new ArrayList<>(); // Add your long time task (callable) callables.add(new VaryLongTimeTask()); // Assign tasks for specific execution timeout (e.g. 2 sec) List<Future<Object>> futures = executorService.invokeAll(callables, 2000, TimeUnit.MILLISECONDS); for (Future<Object> future : futures) { // Getting result } } catch (InterruptedException e) { e.printStackTrace(); } executorService.shutdown();
Pro adalah Anda juga dapat mengirimkan
ListenableFuture
pada saat yang samaExecutorService
.Ubah sedikit baris kode pertama.
ListeningExecutorService
adalah fitur MendengarkanExecutorService
di proyek google guava ( com.google.guava ))sumber
invokeAll
. Itu bekerja dengan sangat baik. Hanya kata hati-hati bagi siapa pun yang berpikir untuk menggunakan ini: meskipuninvokeAll
mengembalikan daftarFuture
objek, sebenarnya tampaknya operasi pemblokiran.Bagaimana jika menggunakan
ExecutorService.shutDownNow()
metode seperti yang dijelaskan di http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html ? Tampaknya ini solusi paling sederhana.sumber
Tampaknya masalah bukan pada bug JDK 6602600 (itu diselesaikan pada 2010-05-22), tetapi dalam panggilan tidur yang salah (10) dalam lingkaran. Sebagai catatan tambahan, Thread utama harus memberikan KESEMPATAN langsung ke thread lain untuk merealisasikan tugas mereka dengan memanggil SLEEP (0) di SETIAP cabang lingkaran luar. Lebih baik, saya pikir, menggunakan Thread.yield () daripada Thread.sleep (0)
Hasil koreksi bagian dari kode masalah sebelumnya seperti ini:
....................... ........................ Thread.yield(); if (i % 1000== 0) { System.out.println(i + "/" + counter.get()+ "/"+service.toString()); } // // while (i > counter.get()) { // Thread.sleep(10); // }
Ini bekerja dengan benar dengan jumlah penghitung luar hingga 150.000.000 lingkaran yang diuji.
sumber
Menggunakan jawaban John W, saya membuat implementasi yang memulai waktu tunggu dengan benar saat tugas mulai dieksekusi. Saya bahkan menulis tes unit untuk itu :)
Namun, ini tidak sesuai dengan kebutuhan saya karena beberapa operasi IO tidak mengganggu saat
Future.cancel()
dipanggil (yaitu saatThread.interrupt()
dipanggil). Beberapa contoh operasi IO yang mungkin tidak terputus saatThread.interrupt()
dipanggil adalahSocket.connect
danSocket.read
(dan saya menduga sebagian besar operasi IO diterapkan dijava.io
). Semua operasi IO dijava.nio
harus dapat diinterupsi saatThread.interrupt()
dipanggil. Misalnya, kasus untukSocketChannel.open
danSocketChannel.read
.Pokoknya jika ada yang tertarik, saya membuat inti untuk eksekutor kumpulan utas yang memungkinkan tugas untuk waktu tunggu (jika mereka menggunakan operasi yang dapat disela ...): https://gist.github.com/amanteaux/64c54a913c1ae34ad7b86db109cbc0bf
sumber
Socket.connect
danSocket.read
myThread.interrupted()
bukan metode yang tepat untuk melakukan interupsi, karena metode ini MENGHAPUS flag interupsi. GunakanmyThread.interrupt()
sebagai gantinya, dan itu harus dengan soketThread.interrupted()
tidak dapat mengganggu utas. Namun,Thread.interrupt()
tidak mengganggujava.io
operasi, ini hanya bekerja padajava.nio
operasi.interrupt()
selama bertahun-tahun dan itu selalu mengganggu operasi java.io (serta metode pemblokiran lainnya, seperti thread sleep, koneksi jdbc, blockingqueue take, dll). Mungkin Anda menemukan kelas buggy atau JVM yang memiliki bugBagaimana dengan ide alternatif ini:
Sampel kecil ada di sini:
public class AlternativeExecutorService { private final CopyOnWriteArrayList<ListenableFutureTask> futureQueue = new CopyOnWriteArrayList(); private final ScheduledThreadPoolExecutor scheduledExecutor = new ScheduledThreadPoolExecutor(1); // used for internal cleaning job private final ListeningExecutorService threadExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(5)); // used for private ScheduledFuture scheduledFuture; private static final long INTERNAL_JOB_CLEANUP_FREQUENCY = 1000L; public AlternativeExecutorService() { scheduledFuture = scheduledExecutor.scheduleAtFixedRate(new TimeoutManagerJob(), 0, INTERNAL_JOB_CLEANUP_FREQUENCY, TimeUnit.MILLISECONDS); } public void pushTask(OwnTask task) { ListenableFuture<Void> future = threadExecutor.submit(task); // -> create your Callable futureQueue.add(new ListenableFutureTask(future, task, getCurrentMillisecondsTime())); // -> store the time when the task should end } public void shutdownInternalScheduledExecutor() { scheduledFuture.cancel(true); scheduledExecutor.shutdownNow(); } long getCurrentMillisecondsTime() { return Calendar.getInstance().get(Calendar.MILLISECOND); } class ListenableFutureTask { private final ListenableFuture<Void> future; private final OwnTask task; private final long milliSecEndTime; private ListenableFutureTask(ListenableFuture<Void> future, OwnTask task, long milliSecStartTime) { this.future = future; this.task = task; this.milliSecEndTime = milliSecStartTime + task.getTimeUnit().convert(task.getTimeoutDuration(), TimeUnit.MILLISECONDS); } ListenableFuture<Void> getFuture() { return future; } OwnTask getTask() { return task; } long getMilliSecEndTime() { return milliSecEndTime; } } class TimeoutManagerJob implements Runnable { CopyOnWriteArrayList<ListenableFutureTask> getCopyOnWriteArrayList() { return futureQueue; } @Override public void run() { long currentMileSecValue = getCurrentMillisecondsTime(); for (ListenableFutureTask futureTask : futureQueue) { consumeFuture(futureTask, currentMileSecValue); } } private void consumeFuture(ListenableFutureTask futureTask, long currentMileSecValue) { ListenableFuture<Void> future = futureTask.getFuture(); boolean isTimeout = futureTask.getMilliSecEndTime() >= currentMileSecValue; if (isTimeout) { if (!future.isDone()) { future.cancel(true); } futureQueue.remove(futureTask); } } } class OwnTask implements Callable<Void> { private long timeoutDuration; private TimeUnit timeUnit; OwnTask(long timeoutDuration, TimeUnit timeUnit) { this.timeoutDuration = timeoutDuration; this.timeUnit = timeUnit; } @Override public Void call() throws Exception { // do logic return null; } public long getTimeoutDuration() { return timeoutDuration; } public TimeUnit getTimeUnit() { return timeUnit; } } }
sumber
periksa apakah ini berhasil untuk Anda,
public <T,S,K,V> ResponseObject<Collection<ResponseObject<T>>> runOnScheduler(ThreadPoolExecutor threadPoolExecutor, int parallelismLevel, TimeUnit timeUnit, int timeToCompleteEachTask, Collection<S> collection, Map<K,V> context, Task<T,S,K,V> someTask){ if(threadPoolExecutor==null){ return ResponseObject.<Collection<ResponseObject<T>>>builder().errorCode("500").errorMessage("threadPoolExecutor can not be null").build(); } if(someTask==null){ return ResponseObject.<Collection<ResponseObject<T>>>builder().errorCode("500").errorMessage("Task can not be null").build(); } if(CollectionUtils.isEmpty(collection)){ return ResponseObject.<Collection<ResponseObject<T>>>builder().errorCode("500").errorMessage("input collection can not be empty").build(); } LinkedBlockingQueue<Callable<T>> callableLinkedBlockingQueue = new LinkedBlockingQueue<>(collection.size()); collection.forEach(value -> { callableLinkedBlockingQueue.offer(()->someTask.perform(value,context)); //pass some values in callable. which can be anything. }); LinkedBlockingQueue<Future<T>> futures = new LinkedBlockingQueue<>(); int count = 0; while(count<parallelismLevel && count < callableLinkedBlockingQueue.size()){ Future<T> f = threadPoolExecutor.submit(callableLinkedBlockingQueue.poll()); futures.offer(f); count++; } Collection<ResponseObject<T>> responseCollection = new ArrayList<>(); while(futures.size()>0){ Future<T> future = futures.poll(); ResponseObject<T> responseObject = null; try { T response = future.get(timeToCompleteEachTask, timeUnit); responseObject = ResponseObject.<T>builder().data(response).build(); } catch (InterruptedException e) { future.cancel(true); } catch (ExecutionException e) { future.cancel(true); } catch (TimeoutException e) { future.cancel(true); } finally { if (Objects.nonNull(responseObject)) { responseCollection.add(responseObject); } futures.remove(future);//remove this Callable<T> callable = getRemainingCallables(callableLinkedBlockingQueue); if(null!=callable){ Future<T> f = threadPoolExecutor.submit(callable); futures.add(f); } } } return ResponseObject.<Collection<ResponseObject<T>>>builder().data(responseCollection).build(); } private <T> Callable<T> getRemainingCallables(LinkedBlockingQueue<Callable<T>> callableLinkedBlockingQueue){ if(callableLinkedBlockingQueue.size()>0){ return callableLinkedBlockingQueue.poll(); } return null; }
Anda dapat membatasi tidak ada penggunaan utas dari penjadwal serta memberi waktu tunggu pada tugas.
sumber