Panggilan metode throttling ke permintaan M dalam N detik

140

Saya membutuhkan komponen / kelas yang membatasi eksekusi beberapa metode ke panggilan M maksimum dalam N detik (atau ms atau nanos, tidak masalah).

Dengan kata lain saya perlu memastikan bahwa metode saya dijalankan tidak lebih dari M kali dalam jendela geser N detik.

Jika Anda tidak mengetahui kelas yang ada, jangan ragu untuk memposting solusi / ide Anda bagaimana Anda akan menerapkannya.

vtrubnikov.dll
sumber
4
Ada beberapa jawaban bagus untuk masalah ini di stackoverflow.com/questions/667508/…
skaffman
> Saya perlu memastikan bahwa metode saya> dieksekusi tidak lebih dari M kali dalam> jendela geser N detik. Saya baru-baru ini menulis posting blog tentang cara melakukan ini di .NET. Anda mungkin bisa membuat sesuatu yang serupa di Java. Pembatasan Tingkat Lebih Baik di .NET
Jack Leitch
Pertanyaan asli terdengar sangat mirip dengan masalah yang diselesaikan dalam posting blog ini: [Java Multi-Channel Asynchronous Throttler] ( cordinc.com/blog/2010/04/java-multichannel-asynchronous.html ). Untuk tingkat M panggilan dalam N detik, throttler yang dibahas di blog ini menjamin bahwa setiap interval dengan panjang N pada timeline tidak akan berisi lebih dari M panggilan.
Hbf

Jawaban:

85

Saya akan menggunakan ring buffer cap waktu dengan ukuran tetap M. Setiap kali metode dipanggil, Anda memeriksa entri terlama, dan jika kurang dari N detik di masa lalu, Anda mengeksekusi dan menambahkan entri lain, jika tidak, Anda tidur untuk perbedaan waktu.

Michael Borgwardt
sumber
4
Menyenangkan. Hanya yang saya butuhkan. Upaya cepat menunjukkan ~ 10 baris untuk mengimplementasikan ini dan footprint memori minimal. Hanya perlu memikirkan keamanan benang dan antrian permintaan yang masuk.
vtrubnikov
5
Itulah mengapa Anda menggunakan DelayQueue dari java.util.concurrent. Ini mencegah masalah beberapa utas yang bekerja pada entri yang sama.
erickson
5
Untuk kasus multi-utas, pendekatan ember token mungkin merupakan pilihan yang lebih baik, menurut saya.
Michael Borgwardt
1
Tahukah Anda bagaimana algoritma ini dipanggil jika memiliki nama sama sekali?
Vlado Pandžić
84

Apa yang berhasil di luar kotak bagi saya adalah Google Guava RateLimiter .

// Allow one request per second
private RateLimiter throttle = RateLimiter.create(1.0);

private void someMethod() {
    throttle.acquire();
    // Do something
}
schnatterer
sumber
19
Saya tidak akan merekomendasikan solusi ini karena Guava RateLimiter akan memblokir utas dan itu akan menguras kumpulan utas dengan mudah.
kaviddiss
20
@kaviddiss jika Anda tidak ingin memblokir maka gunakantryAquire()
slf
7
Masalah dengan penerapan RateLimiter saat ini (setidaknya untuk saya) adalah bahwa itu tidak memungkinkan untuk jangka waktu lebih dari 1 detik dan karenanya kecepatan misalnya 1 per menit.
John B
4
@John B Sejauh yang saya mengerti, Anda dapat mencapai 1 permintaan per menit dengan RateLimiter dengan menggunakan RateLimiter.create (60.0) + rateLimiter.acquire (60)
divideByZero
3
@radiantRazor Ratelimiter.create (1.0 / 60) dan memperoleh () mencapai 1 panggilan per menit.
bizentass
30

Secara konkret, Anda harus dapat menerapkan ini dengan a DelayQueue. Inisialisasi antrean dengan M Delayedinstance dengan penundaan yang awalnya disetel ke nol. Saat permintaan ke metode masuk, taketoken, yang menyebabkan metode diblokir hingga persyaratan pembatasan terpenuhi. Saat token telah diambil, addtoken baru ke antrian dengan penundaan N.

erickson
sumber
1
Ya, ini akan berhasil. Tapi saya tidak terlalu suka DelayQueue karena menggunakan (melalui PriortyQueue) hash biner yang seimbang (yang berarti banyak perbandingan offerdan kemungkinan pertumbuhan array), dan semuanya agak berat bagi saya. Saya rasa untuk orang lain ini mungkin baik-baik saja.
vtrubnikov
5
Sebenarnya, dalam aplikasi ini, karena elemen baru yang ditambahkan ke heap akan hampir selalu menjadi elemen maksimum dalam heap (yaitu, memiliki penundaan paling lama), biasanya diperlukan satu perbandingan per penambahan. Selain itu, array tidak akan pernah bertambah jika algoritme diterapkan dengan benar, karena satu elemen ditambahkan hanya setelah mengambil satu elemen.
erickson
3
Saya menemukan ini membantu juga dalam kasus di mana Anda tidak ingin permintaan terjadi dalam ledakan besar dengan menjaga ukuran M dan menunda N relatif kecil dalam urutan beberapa milidetik. misalnya. M = 5, N = 20ms akan memberikan through put 250 / detik kepping burst terjadi dalam ukuran 5.
FUD
Akankah skala ini untuk satu juta rpm dan ketika permintaan bersamaan diperbolehkan? Saya perlu menambahkan satu juta elemen tertunda. Juga kasus sudut akan tinggi pada latensi - kasus di mana beberapa utas memanggil poll () dan akan terkunci setiap saat.
Aditya Joshee
@AdityaJoshee Saya belum membandingkannya, tetapi jika saya punya waktu, saya akan mencoba memahami overhead. Satu hal yang perlu diperhatikan adalah Anda tidak memerlukan 1 juta token yang kedaluwarsa dalam 1 detik. Anda dapat memiliki 100 token yang kedaluwarsa dalam 10 milidetik, 10 token yang kedaluwarsa dalam milidetik, dll. Ini sebenarnya memaksa tingkat sesaat menjadi lebih dekat ke tingkat rata-rata, merapikan lonjakan, yang dapat menyebabkan cadangan di klien, tetapi itu adalah konsekuensi alami dari pembatasan tarif. 1 juta RPM hampir tidak terdengar seperti throttling. Jika Anda dapat menjelaskan kasus penggunaan Anda, saya mungkin memiliki ide yang lebih baik.
erickson
21

Baca tentang algoritme keranjang Token . Pada dasarnya, Anda memiliki ember dengan token di dalamnya. Setiap kali Anda menjalankan metode ini, Anda mengambil token. Jika tidak ada lagi token, Anda memblokir sampai Anda mendapatkannya. Sementara itu, ada beberapa aktor eksternal yang mengisi ulang token pada interval tetap.

Saya tidak mengetahui perpustakaan untuk melakukan ini (atau yang serupa). Anda dapat menulis logika ini ke dalam kode Anda atau menggunakan AspectJ untuk menambahkan perilakunya.

Kevin
sumber
3
Terima kasih atas sarannya, algo yang menarik. Tapi itu bukan yang saya butuhkan. Misalnya, saya perlu membatasi eksekusi hingga 5 panggilan per detik. Jika saya menggunakan Token bucket dan 10 permintaan masuk pada saat yang sama, 5 panggilan pertama akan mengambil semua token yang tersedia dan dieksekusi sesaat, sementara 5 panggilan tersisa akan dijalankan pada interval tetap 1/5 detik. Dalam situasi seperti itu saya membutuhkan 5 panggilan tersisa untuk dieksekusi dalam ledakan tunggal hanya setelah 1 detik berlalu.
vtrubnikov
5
Bagaimana jika Anda menambahkan 5 token ke keranjang setiap detik (atau 5 - (sisa 5) alih-alih 1 setiap 1/5 detik?
Kevin
@Kevin tidak ini masih tidak akan memberi saya efek 'jendela geser'
vtrubnikov
2
@ Valery ya itu akan. (Ingatlah untuk membatasi token di M)
nos
tidak perlu "aktor eksternal". Semuanya bisa dilakukan dengan satu utas jika Anda menyimpan metadata tentang waktu permintaan.
Marsellus Wallace
8

Jika Anda memerlukan pembatas kecepatan jendela geser berbasis Java yang akan beroperasi di seluruh sistem terdistribusi, Anda mungkin ingin melihat proyek https://github.com/mokies/ratelimitj .

Konfigurasi yang didukung Redis, untuk membatasi permintaan oleh IP hingga 50 per menit akan terlihat seperti ini:

import com.lambdaworks.redis.RedisClient;
import es.moki.ratelimitj.core.LimitRule;

RedisClient client = RedisClient.create("redis://localhost");
Set<LimitRule> rules = Collections.singleton(LimitRule.of(1, TimeUnit.MINUTES, 50)); // 50 request per minute, per key
RedisRateLimit requestRateLimiter = new RedisRateLimit(client, rules);

boolean overLimit = requestRateLimiter.overLimit("ip:127.0.0.2");

Lihat https://github.com/mokies/ratelimitj/tree/master/ratelimitj-redis untuk detail lebih lanjut tentang konfigurasi Redis.

pengguna2326162
sumber
5

Ini tergantung pada aplikasinya.

Bayangkan kasus di mana beberapa thread ingin token untuk melakukan beberapa tindakan global tingkat-terbatas dengan tidak meledak diperbolehkan (yaitu Anda ingin membatasi 10 tindakan per 10 detik, tetapi Anda tidak ingin 10 tindakan terjadi di kedua pertama dan kemudian tetap 9 detik berhenti).

DelayedQueue memiliki kelemahan: urutan thread yang meminta token mungkin bukan urutan pemenuhan permintaannya. Jika beberapa utas diblokir menunggu token, tidak jelas mana yang akan mengambil token yang tersedia berikutnya. Anda bahkan bisa memiliki utas menunggu selamanya, menurut saya.

Salah satu solusinya adalah memiliki interval waktu minimum antara dua tindakan berurutan , dan mengambil tindakan dalam urutan yang sama seperti yang diminta.

Berikut implementasinya:

public class LeakyBucket {
    protected float maxRate;
    protected long minTime;
    //holds time of last action (past or future!)
    protected long lastSchedAction = System.currentTimeMillis();

    public LeakyBucket(float maxRate) throws Exception {
        if(maxRate <= 0.0f) {
            throw new Exception("Invalid rate");
        }
        this.maxRate = maxRate;
        this.minTime = (long)(1000.0f / maxRate);
    }

    public void consume() throws InterruptedException {
        long curTime = System.currentTimeMillis();
        long timeLeft;

        //calculate when can we do the action
        synchronized(this) {
            timeLeft = lastSchedAction + minTime - curTime;
            if(timeLeft > 0) {
                lastSchedAction += minTime;
            }
            else {
                lastSchedAction = curTime;
            }
        }

        //If needed, wait for our time
        if(timeLeft <= 0) {
            return;
        }
        else {
            Thread.sleep(timeLeft);
        }
    }
}
Duarte Meneses
sumber
apa minTimeartinya disini? Apa fungsinya? bisakah kamu menjelaskan itu?
flash
minTimeadalah jumlah waktu minimum yang harus dilalui setelah token digunakan sebelum token berikutnya dapat digunakan.
Duarte Meneses
3

Meskipun bukan itu yang Anda tanyakan, ThreadPoolExecutoryang dirancang untuk membatasi ke M permintaan secara bersamaan, bukan M permintaan dalam N detik, juga bisa berguna.

Eugene Yokota
sumber
2

Saya telah menerapkan algoritma pelambatan sederhana. Coba tautan ini, http://krishnaprasadas.blogspot.in/2012/05/throttling-algorithm.html

Penjelasan singkat tentang Algoritma,

Algoritma ini memanfaatkan kemampuan Java Delayed Queue . Buat objek tertunda dengan penundaan yang diharapkan (di sini 1000 / M untuk TimeUnit milidetik ). Letakkan objek yang sama ke dalam antrian tertunda yang akan menyediakan jendela bergerak untuk kita. Kemudian sebelum setiap panggilan metode mengambil objek dari antrian, ambil adalah panggilan pemblokiran yang akan kembali hanya setelah penundaan yang ditentukan, dan setelah panggilan metode jangan lupa untuk meletakkan objek ke dalam antrian dengan waktu yang diperbarui (di sini milidetik saat ini) .

Di sini kita juga dapat memiliki beberapa objek tertunda dengan penundaan berbeda. Pendekatan ini juga akan memberikan throughput yang tinggi.

Krishas
sumber
6
Anda harus memposting ringkasan algoritme Anda. Jika tautan Anda hilang maka jawaban Anda menjadi tidak berguna.
jwr
Terima kasih, saya telah menambahkan ringkasannya.
Krishas
1

Implementasi saya di bawah ini dapat menangani presisi waktu permintaan sewenang-wenang, ia memiliki O (1) kompleksitas waktu untuk setiap permintaan, tidak memerlukan buffer tambahan, misalnya kompleksitas ruang O (1), selain itu tidak memerlukan thread latar belakang untuk melepaskan token, sebagai gantinya token dirilis sesuai dengan waktu berlalu sejak permintaan terakhir.

class RateLimiter {
    int limit;
    double available;
    long interval;

    long lastTimeStamp;

    RateLimiter(int limit, long interval) {
        this.limit = limit;
        this.interval = interval;

        available = 0;
        lastTimeStamp = System.currentTimeMillis();
    }

    synchronized boolean canAdd() {
        long now = System.currentTimeMillis();
        // more token are released since last request
        available += (now-lastTimeStamp)*1.0/interval*limit; 
        if (available>limit)
            available = limit;

        if (available<1)
            return false;
        else {
            available--;
            lastTimeStamp = now;
            return true;
        }
    }
}
tonywl.dll
sumber
0

Coba gunakan pendekatan sederhana ini:

public class SimpleThrottler {

private static final int T = 1; // min
private static final int N = 345;

private Lock lock = new ReentrantLock();
private Condition newFrame = lock.newCondition();
private volatile boolean currentFrame = true;

public SimpleThrottler() {
    handleForGate();
}

/**
 * Payload
 */
private void job() {
    try {
        Thread.sleep(Math.abs(ThreadLocalRandom.current().nextLong(12, 98)));
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.err.print(" J. ");
}

public void doJob() throws InterruptedException {
    lock.lock();
    try {

        while (true) {

            int count = 0;

            while (count < N && currentFrame) {
                job();
                count++;
            }

            newFrame.await();
            currentFrame = true;
        }

    } finally {
        lock.unlock();
    }
}

public void handleForGate() {
    Thread handler = new Thread(() -> {
        while (true) {
            try {
                Thread.sleep(1 * 900);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                currentFrame = false;

                lock.lock();
                try {
                    newFrame.signal();
                } finally {
                    lock.unlock();
                }
            }
        }
    });
    handler.start();
}

}

SergeZ
sumber
0

Apache Camel juga mendukung dilengkapi dengan mekanisme Throttler sebagai berikut:

from("seda:a").throttle(100).asyncDelayed().to("seda:b");
gtonik
sumber
0

Ini adalah pembaruan untuk kode LeakyBucket di atas. Ini berfungsi untuk lebih dari 1000 permintaan per detik.

import lombok.SneakyThrows;
import java.util.concurrent.TimeUnit;

class LeakyBucket {
  private long minTimeNano; // sec / billion
  private long sched = System.nanoTime();

  /**
   * Create a rate limiter using the leakybucket alg.
   * @param perSec the number of requests per second
   */
  public LeakyBucket(double perSec) {
    if (perSec <= 0.0) {
      throw new RuntimeException("Invalid rate " + perSec);
    }
    this.minTimeNano = (long) (1_000_000_000.0 / perSec);
  }

  @SneakyThrows public void consume() {
    long curr = System.nanoTime();
    long timeLeft;

    synchronized (this) {
      timeLeft = sched - curr + minTimeNano;
      sched += minTimeNano;
    }
    if (timeLeft <= minTimeNano) {
      return;
    }
    TimeUnit.NANOSECONDS.sleep(timeLeft);
  }
}

dan yang paling unittest untuk di atas:

import com.google.common.base.Stopwatch;
import org.junit.Ignore;
import org.junit.Test;

import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

public class LeakyBucketTest {
  @Test @Ignore public void t() {
    double numberPerSec = 10000;
    LeakyBucket b = new LeakyBucket(numberPerSec);
    Stopwatch w = Stopwatch.createStarted();
    IntStream.range(0, (int) (numberPerSec * 5)).parallel().forEach(
        x -> b.consume());
    System.out.printf("%,d ms%n", w.elapsed(TimeUnit.MILLISECONDS));
  }
}
peterreilly
sumber
apa minTimeNanoartinya disini? bisakah kamu menjelaskan?
flash
0

Berikut adalah versi lanjutan dari pembatas kecepatan sederhana

/**
 * Simple request limiter based on Thread.sleep method.
 * Create limiter instance via {@link #create(float)} and call {@link #consume()} before making any request.
 * If the limit is exceeded cosume method locks and waits for current call rate to fall down below the limit
 */
public class RequestRateLimiter {

    private long minTime;

    private long lastSchedAction;
    private double avgSpent = 0;

    ArrayList<RatePeriod> periods;


    @AllArgsConstructor
    public static class RatePeriod{

        @Getter
        private LocalTime start;

        @Getter
        private LocalTime end;

        @Getter
        private float maxRate;
    }


    /**
     * Create request limiter with maxRate - maximum number of requests per second
     * @param maxRate - maximum number of requests per second
     * @return
     */
    public static RequestRateLimiter create(float maxRate){
        return new RequestRateLimiter(Arrays.asList( new RatePeriod(LocalTime.of(0,0,0),
                LocalTime.of(23,59,59), maxRate)));
    }

    /**
     * Create request limiter with ratePeriods calendar - maximum number of requests per second in every period
     * @param ratePeriods - rate calendar
     * @return
     */
    public static RequestRateLimiter create(List<RatePeriod> ratePeriods){
        return new RequestRateLimiter(ratePeriods);
    }

    private void checkArgs(List<RatePeriod> ratePeriods){

        for (RatePeriod rp: ratePeriods ){
            if ( null == rp || rp.maxRate <= 0.0f || null == rp.start || null == rp.end )
                throw new IllegalArgumentException("list contains null or rate is less then zero or period is zero length");
        }
    }

    private float getCurrentRate(){

        LocalTime now = LocalTime.now();

        for (RatePeriod rp: periods){
            if ( now.isAfter( rp.start ) && now.isBefore( rp.end ) )
                return rp.maxRate;
        }

        return Float.MAX_VALUE;
    }



    private RequestRateLimiter(List<RatePeriod> ratePeriods){

        checkArgs(ratePeriods);
        periods = new ArrayList<>(ratePeriods.size());
        periods.addAll(ratePeriods);

        this.minTime = (long)(1000.0f / getCurrentRate());
        this.lastSchedAction = System.currentTimeMillis() - minTime;
    }

    /**
     * Call this method before making actual request.
     * Method call locks until current rate falls down below the limit
     * @throws InterruptedException
     */
    public void consume() throws InterruptedException {

        long timeLeft;

        synchronized(this) {
            long curTime = System.currentTimeMillis();

            minTime = (long)(1000.0f / getCurrentRate());
            timeLeft = lastSchedAction + minTime - curTime;

            long timeSpent = curTime - lastSchedAction + timeLeft;
            avgSpent = (avgSpent + timeSpent) / 2;

            if(timeLeft <= 0) {
                lastSchedAction = curTime;
                return;
            }

            lastSchedAction = curTime + timeLeft;
        }

        Thread.sleep(timeLeft);
    }

    public synchronized float getCuRate(){
        return (float) ( 1000d / avgSpent);
    }
}

Dan tes unit

import org.junit.Assert;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class RequestRateLimiterTest {


    @Test(expected = IllegalArgumentException.class)
    public void checkSingleThreadZeroRate(){

        // Zero rate
        RequestRateLimiter limiter = RequestRateLimiter.create(0);
        try {
            limiter.consume();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Test
    public void checkSingleThreadUnlimitedRate(){

        // Unlimited
        RequestRateLimiter limiter = RequestRateLimiter.create(Float.MAX_VALUE);

        long started = System.currentTimeMillis();
        for ( int i = 0; i < 1000; i++ ){

            try {
                limiter.consume();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        long ended = System.currentTimeMillis();
        System.out.println( "Current rate:" + limiter.getCurRate() );
        Assert.assertTrue( ((ended - started) < 1000));
    }

    @Test
    public void rcheckSingleThreadRate(){

        // 3 request per minute
        RequestRateLimiter limiter = RequestRateLimiter.create(3f/60f);

        long started = System.currentTimeMillis();
        for ( int i = 0; i < 3; i++ ){

            try {
                limiter.consume();
                Thread.sleep(20000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        long ended = System.currentTimeMillis();

        System.out.println( "Current rate:" + limiter.getCurRate() );
        Assert.assertTrue( ((ended - started) >= 60000 ) & ((ended - started) < 61000));
    }



    @Test
    public void checkSingleThreadRateLimit(){

        // 100 request per second
        RequestRateLimiter limiter = RequestRateLimiter.create(100);

        long started = System.currentTimeMillis();
        for ( int i = 0; i < 1000; i++ ){

            try {
                limiter.consume();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        long ended = System.currentTimeMillis();

        System.out.println( "Current rate:" + limiter.getCurRate() );
        Assert.assertTrue( (ended - started) >= ( 10000 - 100 ));
    }

    @Test
    public void checkMultiThreadedRateLimit(){

        // 100 request per second
        RequestRateLimiter limiter = RequestRateLimiter.create(100);
        long started = System.currentTimeMillis();

        List<Future<?>> tasks = new ArrayList<>(10);
        ExecutorService exec = Executors.newFixedThreadPool(10);

        for ( int i = 0; i < 10; i++ ) {

            tasks.add( exec.submit(() -> {
                for (int i1 = 0; i1 < 100; i1++) {

                    try {
                        limiter.consume();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }) );
        }

        tasks.stream().forEach( future -> {
            try {
                future.get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        });

        long ended = System.currentTimeMillis();
        System.out.println( "Current rate:" + limiter.getCurRate() );
        Assert.assertTrue( (ended - started) >= ( 10000 - 100 ) );
    }

    @Test
    public void checkMultiThreaded32RateLimit(){

        // 0,2 request per second
        RequestRateLimiter limiter = RequestRateLimiter.create(0.2f);
        long started = System.currentTimeMillis();

        List<Future<?>> tasks = new ArrayList<>(8);
        ExecutorService exec = Executors.newFixedThreadPool(8);

        for ( int i = 0; i < 8; i++ ) {

            tasks.add( exec.submit(() -> {
                for (int i1 = 0; i1 < 2; i1++) {

                    try {
                        limiter.consume();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }) );
        }

        tasks.stream().forEach( future -> {
            try {
                future.get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        });

        long ended = System.currentTimeMillis();
        System.out.println( "Current rate:" + limiter.getCurRate() );
        Assert.assertTrue( (ended - started) >= ( 10000 - 100 ) );
    }

    @Test
    public void checkMultiThreadedRateLimitDynamicRate(){

        // 100 request per second
        RequestRateLimiter limiter = RequestRateLimiter.create(100);
        long started = System.currentTimeMillis();

        List<Future<?>> tasks = new ArrayList<>(10);
        ExecutorService exec = Executors.newFixedThreadPool(10);

        for ( int i = 0; i < 10; i++ ) {

            tasks.add( exec.submit(() -> {

                Random r = new Random();
                for (int i1 = 0; i1 < 100; i1++) {

                    try {
                        limiter.consume();
                        Thread.sleep(r.nextInt(1000));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }) );
        }

        tasks.stream().forEach( future -> {
            try {
                future.get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        });

        long ended = System.currentTimeMillis();
        System.out.println( "Current rate:" + limiter.getCurRate() );
        Assert.assertTrue( (ended - started) >= ( 10000 - 100 ) );
    }

}
Leonid Astakhov
sumber
Kodenya sangat sederhana. Anda cukup membuat limiter dengan maxRate atau dengan periode dan rate. Dan kemudian panggil saja mengkonsumsi setiap permintaan. Kapanpun rate tidak terlampaui, limiter segera kembali atau menunggu beberapa saat sebelum kembali ke request rate yang lebih rendah. Ini juga memiliki metode tarif saat ini yang mengembalikan rata-rata geser dari tarif saat ini.
Leonid Astakhov
0

Solusi saya: Metode util sederhana, Anda dapat memodifikasinya untuk membuat kelas pembungkus.

public static Runnable throttle (Runnable realRunner, long delay) {
    Runnable throttleRunner = new Runnable() {
        // whether is waiting to run
        private boolean _isWaiting = false;
        // target time to run realRunner
        private long _timeToRun;
        // specified delay time to wait
        private long _delay = delay;
        // Runnable that has the real task to run
        private Runnable _realRunner = realRunner;
        @Override
        public void run() {
            // current time
            long now;
            synchronized (this) {
                // another thread is waiting, skip
                if (_isWaiting) return;
                now = System.currentTimeMillis();
                // update time to run
                // do not update it each time since
                // you do not want to postpone it unlimited
                _timeToRun = now+_delay;
                // set waiting status
                _isWaiting = true;
            }
            try {
                Thread.sleep(_timeToRun-now);

            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                // clear waiting status before run
                _isWaiting = false;
                // do the real task
                _realRunner.run();
            }
        }};
    return throttleRunner;
}

Ambil dari JAVA Thread Debounce dan Throttle

benbai123
sumber