Saya membutuhkan komponen / kelas yang membatasi eksekusi beberapa metode ke panggilan M maksimum dalam N detik (atau ms atau nanos, tidak masalah).
Dengan kata lain saya perlu memastikan bahwa metode saya dijalankan tidak lebih dari M kali dalam jendela geser N detik.
Jika Anda tidak mengetahui kelas yang ada, jangan ragu untuk memposting solusi / ide Anda bagaimana Anda akan menerapkannya.
java
throttling
vtrubnikov.dll
sumber
sumber
Jawaban:
Saya akan menggunakan ring buffer cap waktu dengan ukuran tetap M. Setiap kali metode dipanggil, Anda memeriksa entri terlama, dan jika kurang dari N detik di masa lalu, Anda mengeksekusi dan menambahkan entri lain, jika tidak, Anda tidur untuk perbedaan waktu.
sumber
Apa yang berhasil di luar kotak bagi saya adalah Google Guava RateLimiter .
// Allow one request per second private RateLimiter throttle = RateLimiter.create(1.0); private void someMethod() { throttle.acquire(); // Do something }
sumber
tryAquire()
Secara konkret, Anda harus dapat menerapkan ini dengan a
DelayQueue
. Inisialisasi antrean denganM
Delayed
instance dengan penundaan yang awalnya disetel ke nol. Saat permintaan ke metode masuk,take
token, yang menyebabkan metode diblokir hingga persyaratan pembatasan terpenuhi. Saat token telah diambil,add
token baru ke antrian dengan penundaanN
.sumber
offer
dan kemungkinan pertumbuhan array), dan semuanya agak berat bagi saya. Saya rasa untuk orang lain ini mungkin baik-baik saja.Baca tentang algoritme keranjang Token . Pada dasarnya, Anda memiliki ember dengan token di dalamnya. Setiap kali Anda menjalankan metode ini, Anda mengambil token. Jika tidak ada lagi token, Anda memblokir sampai Anda mendapatkannya. Sementara itu, ada beberapa aktor eksternal yang mengisi ulang token pada interval tetap.
Saya tidak mengetahui perpustakaan untuk melakukan ini (atau yang serupa). Anda dapat menulis logika ini ke dalam kode Anda atau menggunakan AspectJ untuk menambahkan perilakunya.
sumber
Jika Anda memerlukan pembatas kecepatan jendela geser berbasis Java yang akan beroperasi di seluruh sistem terdistribusi, Anda mungkin ingin melihat proyek https://github.com/mokies/ratelimitj .
Konfigurasi yang didukung Redis, untuk membatasi permintaan oleh IP hingga 50 per menit akan terlihat seperti ini:
import com.lambdaworks.redis.RedisClient; import es.moki.ratelimitj.core.LimitRule; RedisClient client = RedisClient.create("redis://localhost"); Set<LimitRule> rules = Collections.singleton(LimitRule.of(1, TimeUnit.MINUTES, 50)); // 50 request per minute, per key RedisRateLimit requestRateLimiter = new RedisRateLimit(client, rules); boolean overLimit = requestRateLimiter.overLimit("ip:127.0.0.2");
Lihat https://github.com/mokies/ratelimitj/tree/master/ratelimitj-redis untuk detail lebih lanjut tentang konfigurasi Redis.
sumber
Ini tergantung pada aplikasinya.
Bayangkan kasus di mana beberapa thread ingin token untuk melakukan beberapa tindakan global tingkat-terbatas dengan tidak meledak diperbolehkan (yaitu Anda ingin membatasi 10 tindakan per 10 detik, tetapi Anda tidak ingin 10 tindakan terjadi di kedua pertama dan kemudian tetap 9 detik berhenti).
DelayedQueue memiliki kelemahan: urutan thread yang meminta token mungkin bukan urutan pemenuhan permintaannya. Jika beberapa utas diblokir menunggu token, tidak jelas mana yang akan mengambil token yang tersedia berikutnya. Anda bahkan bisa memiliki utas menunggu selamanya, menurut saya.
Salah satu solusinya adalah memiliki interval waktu minimum antara dua tindakan berurutan , dan mengambil tindakan dalam urutan yang sama seperti yang diminta.
Berikut implementasinya:
public class LeakyBucket { protected float maxRate; protected long minTime; //holds time of last action (past or future!) protected long lastSchedAction = System.currentTimeMillis(); public LeakyBucket(float maxRate) throws Exception { if(maxRate <= 0.0f) { throw new Exception("Invalid rate"); } this.maxRate = maxRate; this.minTime = (long)(1000.0f / maxRate); } public void consume() throws InterruptedException { long curTime = System.currentTimeMillis(); long timeLeft; //calculate when can we do the action synchronized(this) { timeLeft = lastSchedAction + minTime - curTime; if(timeLeft > 0) { lastSchedAction += minTime; } else { lastSchedAction = curTime; } } //If needed, wait for our time if(timeLeft <= 0) { return; } else { Thread.sleep(timeLeft); } } }
sumber
minTime
artinya disini? Apa fungsinya? bisakah kamu menjelaskan itu?minTime
adalah jumlah waktu minimum yang harus dilalui setelah token digunakan sebelum token berikutnya dapat digunakan.Meskipun bukan itu yang Anda tanyakan,
ThreadPoolExecutor
yang dirancang untuk membatasi ke M permintaan secara bersamaan, bukan M permintaan dalam N detik, juga bisa berguna.sumber
Saya telah menerapkan algoritma pelambatan sederhana. Coba tautan ini, http://krishnaprasadas.blogspot.in/2012/05/throttling-algorithm.html
Penjelasan singkat tentang Algoritma,
Algoritma ini memanfaatkan kemampuan Java Delayed Queue . Buat objek tertunda dengan penundaan yang diharapkan (di sini 1000 / M untuk TimeUnit milidetik ). Letakkan objek yang sama ke dalam antrian tertunda yang akan menyediakan jendela bergerak untuk kita. Kemudian sebelum setiap panggilan metode mengambil objek dari antrian, ambil adalah panggilan pemblokiran yang akan kembali hanya setelah penundaan yang ditentukan, dan setelah panggilan metode jangan lupa untuk meletakkan objek ke dalam antrian dengan waktu yang diperbarui (di sini milidetik saat ini) .
Di sini kita juga dapat memiliki beberapa objek tertunda dengan penundaan berbeda. Pendekatan ini juga akan memberikan throughput yang tinggi.
sumber
Implementasi saya di bawah ini dapat menangani presisi waktu permintaan sewenang-wenang, ia memiliki O (1) kompleksitas waktu untuk setiap permintaan, tidak memerlukan buffer tambahan, misalnya kompleksitas ruang O (1), selain itu tidak memerlukan thread latar belakang untuk melepaskan token, sebagai gantinya token dirilis sesuai dengan waktu berlalu sejak permintaan terakhir.
class RateLimiter { int limit; double available; long interval; long lastTimeStamp; RateLimiter(int limit, long interval) { this.limit = limit; this.interval = interval; available = 0; lastTimeStamp = System.currentTimeMillis(); } synchronized boolean canAdd() { long now = System.currentTimeMillis(); // more token are released since last request available += (now-lastTimeStamp)*1.0/interval*limit; if (available>limit) available = limit; if (available<1) return false; else { available--; lastTimeStamp = now; return true; } } }
sumber
Coba gunakan pendekatan sederhana ini:
public class SimpleThrottler { private static final int T = 1; // min private static final int N = 345; private Lock lock = new ReentrantLock(); private Condition newFrame = lock.newCondition(); private volatile boolean currentFrame = true; public SimpleThrottler() { handleForGate(); } /** * Payload */ private void job() { try { Thread.sleep(Math.abs(ThreadLocalRandom.current().nextLong(12, 98))); } catch (InterruptedException e) { e.printStackTrace(); } System.err.print(" J. "); } public void doJob() throws InterruptedException { lock.lock(); try { while (true) { int count = 0; while (count < N && currentFrame) { job(); count++; } newFrame.await(); currentFrame = true; } } finally { lock.unlock(); } } public void handleForGate() { Thread handler = new Thread(() -> { while (true) { try { Thread.sleep(1 * 900); } catch (InterruptedException e) { e.printStackTrace(); } finally { currentFrame = false; lock.lock(); try { newFrame.signal(); } finally { lock.unlock(); } } } }); handler.start(); }
}
sumber
Apache Camel juga mendukung dilengkapi dengan mekanisme Throttler sebagai berikut:
from("seda:a").throttle(100).asyncDelayed().to("seda:b");
sumber
Ini adalah pembaruan untuk kode LeakyBucket di atas. Ini berfungsi untuk lebih dari 1000 permintaan per detik.
import lombok.SneakyThrows; import java.util.concurrent.TimeUnit; class LeakyBucket { private long minTimeNano; // sec / billion private long sched = System.nanoTime(); /** * Create a rate limiter using the leakybucket alg. * @param perSec the number of requests per second */ public LeakyBucket(double perSec) { if (perSec <= 0.0) { throw new RuntimeException("Invalid rate " + perSec); } this.minTimeNano = (long) (1_000_000_000.0 / perSec); } @SneakyThrows public void consume() { long curr = System.nanoTime(); long timeLeft; synchronized (this) { timeLeft = sched - curr + minTimeNano; sched += minTimeNano; } if (timeLeft <= minTimeNano) { return; } TimeUnit.NANOSECONDS.sleep(timeLeft); } }
dan yang paling unittest untuk di atas:
import com.google.common.base.Stopwatch; import org.junit.Ignore; import org.junit.Test; import java.util.concurrent.TimeUnit; import java.util.stream.IntStream; public class LeakyBucketTest { @Test @Ignore public void t() { double numberPerSec = 10000; LeakyBucket b = new LeakyBucket(numberPerSec); Stopwatch w = Stopwatch.createStarted(); IntStream.range(0, (int) (numberPerSec * 5)).parallel().forEach( x -> b.consume()); System.out.printf("%,d ms%n", w.elapsed(TimeUnit.MILLISECONDS)); } }
sumber
minTimeNano
artinya disini? bisakah kamu menjelaskan?Berikut adalah versi lanjutan dari pembatas kecepatan sederhana
/** * Simple request limiter based on Thread.sleep method. * Create limiter instance via {@link #create(float)} and call {@link #consume()} before making any request. * If the limit is exceeded cosume method locks and waits for current call rate to fall down below the limit */ public class RequestRateLimiter { private long minTime; private long lastSchedAction; private double avgSpent = 0; ArrayList<RatePeriod> periods; @AllArgsConstructor public static class RatePeriod{ @Getter private LocalTime start; @Getter private LocalTime end; @Getter private float maxRate; } /** * Create request limiter with maxRate - maximum number of requests per second * @param maxRate - maximum number of requests per second * @return */ public static RequestRateLimiter create(float maxRate){ return new RequestRateLimiter(Arrays.asList( new RatePeriod(LocalTime.of(0,0,0), LocalTime.of(23,59,59), maxRate))); } /** * Create request limiter with ratePeriods calendar - maximum number of requests per second in every period * @param ratePeriods - rate calendar * @return */ public static RequestRateLimiter create(List<RatePeriod> ratePeriods){ return new RequestRateLimiter(ratePeriods); } private void checkArgs(List<RatePeriod> ratePeriods){ for (RatePeriod rp: ratePeriods ){ if ( null == rp || rp.maxRate <= 0.0f || null == rp.start || null == rp.end ) throw new IllegalArgumentException("list contains null or rate is less then zero or period is zero length"); } } private float getCurrentRate(){ LocalTime now = LocalTime.now(); for (RatePeriod rp: periods){ if ( now.isAfter( rp.start ) && now.isBefore( rp.end ) ) return rp.maxRate; } return Float.MAX_VALUE; } private RequestRateLimiter(List<RatePeriod> ratePeriods){ checkArgs(ratePeriods); periods = new ArrayList<>(ratePeriods.size()); periods.addAll(ratePeriods); this.minTime = (long)(1000.0f / getCurrentRate()); this.lastSchedAction = System.currentTimeMillis() - minTime; } /** * Call this method before making actual request. * Method call locks until current rate falls down below the limit * @throws InterruptedException */ public void consume() throws InterruptedException { long timeLeft; synchronized(this) { long curTime = System.currentTimeMillis(); minTime = (long)(1000.0f / getCurrentRate()); timeLeft = lastSchedAction + minTime - curTime; long timeSpent = curTime - lastSchedAction + timeLeft; avgSpent = (avgSpent + timeSpent) / 2; if(timeLeft <= 0) { lastSchedAction = curTime; return; } lastSchedAction = curTime + timeLeft; } Thread.sleep(timeLeft); } public synchronized float getCuRate(){ return (float) ( 1000d / avgSpent); } }
Dan tes unit
import org.junit.Assert; import org.junit.Test; import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class RequestRateLimiterTest { @Test(expected = IllegalArgumentException.class) public void checkSingleThreadZeroRate(){ // Zero rate RequestRateLimiter limiter = RequestRateLimiter.create(0); try { limiter.consume(); } catch (InterruptedException e) { e.printStackTrace(); } } @Test public void checkSingleThreadUnlimitedRate(){ // Unlimited RequestRateLimiter limiter = RequestRateLimiter.create(Float.MAX_VALUE); long started = System.currentTimeMillis(); for ( int i = 0; i < 1000; i++ ){ try { limiter.consume(); } catch (InterruptedException e) { e.printStackTrace(); } } long ended = System.currentTimeMillis(); System.out.println( "Current rate:" + limiter.getCurRate() ); Assert.assertTrue( ((ended - started) < 1000)); } @Test public void rcheckSingleThreadRate(){ // 3 request per minute RequestRateLimiter limiter = RequestRateLimiter.create(3f/60f); long started = System.currentTimeMillis(); for ( int i = 0; i < 3; i++ ){ try { limiter.consume(); Thread.sleep(20000); } catch (InterruptedException e) { e.printStackTrace(); } } long ended = System.currentTimeMillis(); System.out.println( "Current rate:" + limiter.getCurRate() ); Assert.assertTrue( ((ended - started) >= 60000 ) & ((ended - started) < 61000)); } @Test public void checkSingleThreadRateLimit(){ // 100 request per second RequestRateLimiter limiter = RequestRateLimiter.create(100); long started = System.currentTimeMillis(); for ( int i = 0; i < 1000; i++ ){ try { limiter.consume(); } catch (InterruptedException e) { e.printStackTrace(); } } long ended = System.currentTimeMillis(); System.out.println( "Current rate:" + limiter.getCurRate() ); Assert.assertTrue( (ended - started) >= ( 10000 - 100 )); } @Test public void checkMultiThreadedRateLimit(){ // 100 request per second RequestRateLimiter limiter = RequestRateLimiter.create(100); long started = System.currentTimeMillis(); List<Future<?>> tasks = new ArrayList<>(10); ExecutorService exec = Executors.newFixedThreadPool(10); for ( int i = 0; i < 10; i++ ) { tasks.add( exec.submit(() -> { for (int i1 = 0; i1 < 100; i1++) { try { limiter.consume(); } catch (InterruptedException e) { e.printStackTrace(); } } }) ); } tasks.stream().forEach( future -> { try { future.get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } }); long ended = System.currentTimeMillis(); System.out.println( "Current rate:" + limiter.getCurRate() ); Assert.assertTrue( (ended - started) >= ( 10000 - 100 ) ); } @Test public void checkMultiThreaded32RateLimit(){ // 0,2 request per second RequestRateLimiter limiter = RequestRateLimiter.create(0.2f); long started = System.currentTimeMillis(); List<Future<?>> tasks = new ArrayList<>(8); ExecutorService exec = Executors.newFixedThreadPool(8); for ( int i = 0; i < 8; i++ ) { tasks.add( exec.submit(() -> { for (int i1 = 0; i1 < 2; i1++) { try { limiter.consume(); } catch (InterruptedException e) { e.printStackTrace(); } } }) ); } tasks.stream().forEach( future -> { try { future.get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } }); long ended = System.currentTimeMillis(); System.out.println( "Current rate:" + limiter.getCurRate() ); Assert.assertTrue( (ended - started) >= ( 10000 - 100 ) ); } @Test public void checkMultiThreadedRateLimitDynamicRate(){ // 100 request per second RequestRateLimiter limiter = RequestRateLimiter.create(100); long started = System.currentTimeMillis(); List<Future<?>> tasks = new ArrayList<>(10); ExecutorService exec = Executors.newFixedThreadPool(10); for ( int i = 0; i < 10; i++ ) { tasks.add( exec.submit(() -> { Random r = new Random(); for (int i1 = 0; i1 < 100; i1++) { try { limiter.consume(); Thread.sleep(r.nextInt(1000)); } catch (InterruptedException e) { e.printStackTrace(); } } }) ); } tasks.stream().forEach( future -> { try { future.get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } }); long ended = System.currentTimeMillis(); System.out.println( "Current rate:" + limiter.getCurRate() ); Assert.assertTrue( (ended - started) >= ( 10000 - 100 ) ); } }
sumber
Solusi saya: Metode util sederhana, Anda dapat memodifikasinya untuk membuat kelas pembungkus.
public static Runnable throttle (Runnable realRunner, long delay) { Runnable throttleRunner = new Runnable() { // whether is waiting to run private boolean _isWaiting = false; // target time to run realRunner private long _timeToRun; // specified delay time to wait private long _delay = delay; // Runnable that has the real task to run private Runnable _realRunner = realRunner; @Override public void run() { // current time long now; synchronized (this) { // another thread is waiting, skip if (_isWaiting) return; now = System.currentTimeMillis(); // update time to run // do not update it each time since // you do not want to postpone it unlimited _timeToRun = now+_delay; // set waiting status _isWaiting = true; } try { Thread.sleep(_timeToRun-now); } catch (InterruptedException e) { e.printStackTrace(); } finally { // clear waiting status before run _isWaiting = false; // do the real task _realRunner.run(); } }}; return throttleRunner; }
Ambil dari JAVA Thread Debounce dan Throttle
sumber