Apakah ada ExecutorService yang menggunakan utas saat ini?

94

Apa yang saya kejar adalah cara yang kompatibel untuk mengkonfigurasi penggunaan kumpulan thread atau tidak. Idealnya, sisa kode tidak akan terpengaruh sama sekali. Saya bisa menggunakan kumpulan utas dengan 1 utas tetapi itu bukan yang saya inginkan. Ada ide?

ExecutorService es = threads == 0 ? new CurrentThreadExecutor() : Executors.newThreadPoolExecutor(threads);

// es.execute / es.submit / new ExecutorCompletionService(es) etc
Michael Rutherfurd
sumber

Jawaban:

69

Berikut adalah implementasi yang sangat sederhana Executor(tidak ExecutorService, ingatlah) yang hanya menggunakan utas saat ini. Mencuri ini dari "Java Concurrency in Practice" (bacaan penting).

public class CurrentThreadExecutor implements Executor {
    public void execute(Runnable r) {
        r.run();
    }
}

ExecutorService adalah antarmuka yang lebih rumit, tetapi dapat ditangani dengan pendekatan yang sama.

terlalu banyak berpikir
sumber
4
+1: Seperti yang Anda katakan, sebuah ExecutorService dapat ditangani dengan cara yang sama, mungkin dengan membuat subclass AbstractExecutorService.
Paul Cager
@Paul Yep, AbstractExecutorServicesepertinya cara terbaik.
terlalu memikirkan
15
Di Java8 Anda dapat mengurangi ini menjadi hanyaRunnable::run
Jon Freedman
@Juude itu akan selalu berjalan di utas yang memanggil eksekutor.
Gustav Karlsson
Bukankah inti dari eksekutor thread yang sama, untuk dapat menjadwalkan lebih banyak tugas dari dalam execute ()? Jawaban ini tidak akan berhasil. Saya tidak dapat menemukan jawaban yang memuaskan ini.
haelix
82

Anda dapat menggunakan Guava's MoreExecutors.newDirectExecutorService(), atau MoreExecutors.directExecutor()jika Anda tidak membutuhkan ExecutorService.

Jika memasukkan Jambu biji terlalu berat, Anda dapat menerapkan sesuatu yang hampir sama baiknya:

public final class SameThreadExecutorService extends ThreadPoolExecutor {
  private final CountDownLatch signal = new CountDownLatch(1);

  private SameThreadExecutorService() {
    super(1, 1, 0, TimeUnit.DAYS, new SynchronousQueue<Runnable>(),
        new ThreadPoolExecutor.CallerRunsPolicy());
  }

  @Override public void shutdown() {
    super.shutdown();
    signal.countDown();
  }

  public static ExecutorService getInstance() {
    return SingletonHolder.instance;
  }

  private static class SingletonHolder {
    static ExecutorService instance = createInstance();    
  }

  private static ExecutorService createInstance() {
    final SameThreadExecutorService instance
        = new SameThreadExecutorService();

    // The executor has one worker thread. Give it a Runnable that waits
    // until the executor service is shut down.
    // All other submitted tasks will use the RejectedExecutionHandler
    // which runs tasks using the  caller's thread.
    instance.submit(new Runnable() {
        @Override public void run() {
          boolean interrupted = false;
          try {
            while (true) {
              try {
                instance.signal.await();
                break;
              } catch (InterruptedException e) {
                interrupted = true;
              }
            }
          } finally {
            if (interrupted) {
              Thread.currentThread().interrupt();
            }
          }
        }});
    return Executors.unconfigurableScheduledExecutorService(instance);
  }
}
NamshubWriter
sumber
1
Untuk Android, ini return Executors.unconfigurableExecutorService (instance);
Maragues
jika yang kita gunakan hanyalah utas saat ini , mengapa sinkronisasi bersifat primitif? mengapa Latch?
haelix
@haelix pengunci diperlukan karena meskipun pekerjaan dilakukan di utas yang sama dengan utas yang menambahkan pekerjaan, utas apa pun dapat mematikan pelaksana.
NamshubWriter
64

Gaya Java 8:

Executor e = Runnable::run;

lpandzic.dll
sumber
7
Benar-benar kotor. Aku menyukainya.
Rogue
Apa yang kotor tentang itu? Elegan :)
lpandzic
Ini adalah jenis @Ipandzic kotor terbaik, tidak biasa dan ringkas.
Rogue
12

Saya menulis ExecutorServiceberdasarkan AbstractExecutorService.

/**
 * Executes all submitted tasks directly in the same thread as the caller.
 */
public class SameThreadExecutorService extends AbstractExecutorService {

    //volatile because can be viewed by other threads
    private volatile boolean terminated;

    @Override
    public void shutdown() {
        terminated = true;
    }

    @Override
    public boolean isShutdown() {
        return terminated;
    }

    @Override
    public boolean isTerminated() {
        return terminated;
    }

    @Override
    public boolean awaitTermination(long theTimeout, TimeUnit theUnit) throws InterruptedException {
        shutdown(); // TODO ok to call shutdown? what if the client never called shutdown???
        return terminated;
    }

    @Override
    public List<Runnable> shutdownNow() {
        return Collections.emptyList();
    }

    @Override
    public void execute(Runnable theCommand) {
        theCommand.run();
    }
}
Eric Obermühlner
sumber
bidang yang diakhiri tidak dilindungi dengan sinkronisasi.
Daneel S. Yaitskov
1
Kolom @ DaneelS.Yaitskov terminatedtidak akan mendapatkan keuntungan dari akses tersinkronisasi berdasarkan kode yang sebenarnya ada di sini. Operasi pada bidang 32-bit bersifat atomik di Java.
Christopher Schultz
Saya kira metode isTerminated () di atas kurang tepat karena isTerminated () hanya seharusnya mengembalikan true jika tidak ada tugas yang sedang dijalankan. Guava melacak jumlah tugas di variabel lain, yang mungkin menjadi alasan mengapa keduanya melindungi kedua variabel dengan kunci.
Jeremy K
7

Anda dapat menggunakan RejectedExecutionHandler untuk menjalankan tugas di utas saat ini.

public static final ThreadPoolExecutor CURRENT_THREAD_EXECUTOR = new ThreadPoolExecutor(0, 0, 0, TimeUnit.DAYS, new SynchronousQueue<Runnable>(), new RejectedExecutionHandler() {
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        r.run();
    }
});

Anda hanya perlu satu dari ini.

Peter Lawrey
sumber
Pintar! Seberapa amankah ini (pertanyaan jujur)? Apakah ada cara agar tugas ditolak di tempat yang sebenarnya tidak Anda inginkan untuk menjalankannya di thread saat ini? Apakah tugas ditolak jika ExecutorService dimatikan atau dihentikan?
terlalu memikirkan
Karena ukuran maksimum adalah 0, setiap tugas ditolak. Namun perilaku yang ditolak dijalankan di utas saat ini. Hanya akan ada masalah jika tugas TIDAK ditolak.
Peter Lawrey
8
perhatikan, sudah ada implementasi kebijakan ini, tidak perlu mendefinisikan sendiri java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy.
jtahlborn
7
Tidak mungkin lagi membuat ThreadPoolExecutor dengan ukuran kumpulan maksimal 0. Saya rasa itu akan mungkin untuk mereproduksi perilaku menggunakan blockingQueue ukuran 0, tetapi tampaknya tidak ada implementasi default yang mengizinkannya.
Axelle Ziegler
yang tidak dapat dikompilasi karena {code} if (corePoolSize <0 || maximumPoolSize <= 0 || maximumPoolSize <corePoolSize || keepAliveTime <0) {code} di java.util.ThreadPoolExecutor (setidaknya openJdk 7)
Bogdan
6

Saya harus menggunakan "CurrentThreadExecutorService" yang sama untuk tujuan pengujian dan, meskipun semua solusi yang disarankan bagus (terutama yang menyebutkan cara Guava ), saya menemukan sesuatu yang mirip dengan apa yang disarankan Peter Lawrey di sini .

Seperti yang disebutkan oleh Axelle Ziegler di sini , sayangnya solusi Peter tidak benar-benar berfungsi karena cek yang dimasukkan ThreadPoolExecutorpada maximumPoolSizeparameter konstruktor (yaitu maximumPoolSizetidak dapat<=0 ).

Untuk menghindari itu, saya melakukan hal berikut:

private static ExecutorService currentThreadExecutorService() {
    CallerRunsPolicy callerRunsPolicy = new ThreadPoolExecutor.CallerRunsPolicy();
    return new ThreadPoolExecutor(0, 1, 0L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), callerRunsPolicy) {
        @Override
        public void execute(Runnable command) {
            callerRunsPolicy.rejectedExecution(command, this);
        }
    };
}
fabriziocucci
sumber