Apa itu algoritma pembatasan tingkat yang baik?

155

Saya bisa menggunakan beberapa pseudo-code, atau lebih baik, Python. Saya mencoba menerapkan antrian pembatas laju untuk bot IRC Python, dan sebagian berfungsi, tetapi jika seseorang memicu lebih sedikit pesan daripada batas (misalnya, batas laju adalah 5 pesan per 8 detik, dan orang itu hanya memicu 4), dan pemicu berikutnya adalah selama 8 detik (misalnya, 16 detik kemudian), bot mengirim pesan, tetapi antrian menjadi penuh dan bot menunggu 8 detik, meskipun itu tidak diperlukan karena periode 8 detik telah berlalu.

miniman
sumber

Jawaban:

231

Di sini algoritma paling sederhana , jika Anda ingin hanya mengirim pesan ketika mereka datang terlalu cepat (daripada mengantri, itu masuk akal karena antriannya bisa menjadi besar secara sewenang-wenang):

rate = 5.0; // unit: messages
per  = 8.0; // unit: seconds
allowance = rate; // unit: messages
last_check = now(); // floating-point, e.g. usec accuracy. Unit: seconds

when (message_received):
  current = now();
  time_passed = current - last_check;
  last_check = current;
  allowance += time_passed * (rate / per);
  if (allowance > rate):
    allowance = rate; // throttle
  if (allowance < 1.0):
    discard_message();
  else:
    forward_message();
    allowance -= 1.0;

Tidak ada struktur data, pengatur waktu, dll. Dalam solusi ini dan ini berfungsi dengan baik :) Untuk melihat ini, 'kelonggaran' tumbuh pada kecepatan 5/8 unit per detik paling banyak, yaitu paling banyak lima unit per delapan detik. Setiap pesan yang diteruskan mengurangi satu unit, sehingga Anda tidak dapat mengirim lebih dari lima pesan per setiap delapan detik.

Perhatikan bahwa rateharus berupa bilangan bulat, yaitu tanpa bagian desimal non-nol, atau algoritme tidak akan berfungsi dengan benar (laju aktual tidak akan rate/per). Misalnya rate=0.5; per=1.0;tidak berfungsi karena allowancetidak akan pernah tumbuh menjadi 1,0. Tapi rate=1.0; per=2.0;berfungsi dengan baik.

Antti Huima
sumber
4
Perlu juga ditunjukkan bahwa dimensi dan skala 'time_passed' harus sama dengan 'per', misalnya detik.
skaffman
2
Hi skaffman, terima kasih atas pujian --- aku melemparkannya keluar dari lengan saya tapi dengan 99,9% probabilitas seseorang telah sebelumnya datang dengan solusi yang sama :)
Antti Huima
52
Itu adalah algoritma standar — ini adalah token bucket, tanpa antrian. Embernya adalah allowance. Ukuran ember adalah rate. The allowance += …garis adalah optimalisasi menambahkan token setiap tingkat ÷ per detik.
derobert
5
@zwirbeltier Apa yang Anda tulis di atas tidak benar. 'Allowance' selalu dibatasi oleh 'rate' (lihat baris "// throttle") sehingga hanya akan memungkinkan ledakan pesan 'rate' yang tepat pada waktu tertentu, yaitu 5.
Antti Huima
8
Ini bagus, tetapi bisa melebihi tarif. Katakanlah pada saat 0 Anda meneruskan 5 pesan, kemudian pada waktu N * (8/5) untuk N = 1, 2, ... Anda dapat mengirim pesan lain, menghasilkan lebih dari 5 pesan dalam periode 8 detik
mindvirus
48

Gunakan dekorator ini @RateLimited (ratepersec) sebelum fungsi Anda yang enqueues.

Pada dasarnya, ini memeriksa apakah 1 / tingkat detik telah berlalu sejak terakhir kali dan jika tidak, menunggu sisa waktu, jika tidak maka tidak akan menunggu. Ini secara efektif membatasi Anda untuk menilai / detik. Dekorator dapat diterapkan ke fungsi apa pun yang Anda inginkan dibatasi tarif.

Dalam kasus Anda, jika Anda ingin maksimum 5 pesan per 8 detik, gunakan @RateLimited (0,625) sebelum fungsi sendToQueue Anda.

import time

def RateLimited(maxPerSecond):
    minInterval = 1.0 / float(maxPerSecond)
    def decorate(func):
        lastTimeCalled = [0.0]
        def rateLimitedFunction(*args,**kargs):
            elapsed = time.clock() - lastTimeCalled[0]
            leftToWait = minInterval - elapsed
            if leftToWait>0:
                time.sleep(leftToWait)
            ret = func(*args,**kargs)
            lastTimeCalled[0] = time.clock()
            return ret
        return rateLimitedFunction
    return decorate

@RateLimited(2)  # 2 per second at most
def PrintNumber(num):
    print num

if __name__ == "__main__":
    print "This should print 1,2,3... at about 2 per second."
    for i in range(1,100):
        PrintNumber(i)
Carlos A. Ibarra
sumber
Saya suka ide menggunakan dekorator untuk tujuan ini. Mengapa lastTimeCalled daftar? Juga, saya ragu ini akan bekerja ketika beberapa benang memanggil fungsi RateLimited sama ...
Stephan202
8
Ini daftar karena tipe sederhana seperti float konstan ketika ditangkap oleh penutupan. Dengan membuatnya daftar, daftar itu konstan, tetapi isinya tidak. Ya, ini bukan thread-safe tetapi dapat dengan mudah diperbaiki dengan kunci.
Carlos A. Ibarra
time.clock()tidak memiliki resolusi yang cukup pada sistem saya, jadi saya mengadaptasi kode dan mengubahnya untuk digunakantime.time()
mtrbean
3
Untuk pembatasan tingkat, Anda tentu tidak ingin menggunakan time.clock(), yang mengukur waktu CPU yang telah berlalu. Waktu CPU dapat berjalan lebih cepat atau lebih lambat daripada waktu "sebenarnya". Anda ingin menggunakan time.time()sebagai gantinya, yang mengukur waktu dinding (waktu "aktual").
John Wiseman
1
BTW untuk sistem produksi nyata: menerapkan tarif yang membatasi dengan panggilan sleep () mungkin bukan ide yang baik karena akan memblokir utas dan karena itu mencegah klien lain menggunakannya.
Maresh
28

Token Bucket cukup mudah diterapkan.

Mulailah dengan ember dengan 5 token.

Setiap 5/8 detik: Jika ember memiliki kurang dari 5 token, tambahkan satu.

Setiap kali Anda ingin mengirim pesan: Jika ember memiliki ≥1 token, ambil satu token dan kirim pesan. Kalau tidak, tunggu / jatuhkan pesan / apa pun.

(jelas, dalam kode aktual, Anda akan menggunakan penghitung integer alih-alih token nyata dan Anda dapat mengoptimalkan setiap langkah 5/8 dengan menyimpan cap waktu)


Membaca pertanyaan lagi, jika batas nilai disetel penuh setiap 8 detik, maka berikut ini modifikasi:

Mulailah dengan stempel waktu,, last_sendpada waktu yang lama (misalnya, di zaman). Juga, mulailah dengan ember 5-token yang sama.

Pukul setiap aturan 5/8 detik.

Setiap kali Anda mengirim pesan: Pertama, periksa apakah last_send≥ 8 detik yang lalu. Jika demikian, isi ember (atur ke 5 token). Kedua, jika ada token dalam ember, kirim pesan (jika tidak, drop / tunggu / dll.). Ketiga, atur last_sendsekarang.

Itu harus bekerja untuk skenario itu.


Saya sebenarnya telah menulis bot IRC menggunakan strategi seperti ini (pendekatan pertama). Ini dalam Perl, bukan Python, tapi di sini ada beberapa kode untuk menggambarkan:

Bagian pertama di sini menangani penambahan token ke ember. Anda dapat melihat pengoptimalan penambahan token berdasarkan waktu (baris 2 hingga baris terakhir) dan kemudian baris terakhir menjepit konten bucket ke maksimum (MESSAGE_BURST)

    my $start_time = time;
    ...
    # Bucket handling
    my $bucket = $conn->{fujiko_limit_bucket};
    my $lasttx = $conn->{fujiko_limit_lasttx};
    $bucket += ($start_time-$lasttx)/MESSAGE_INTERVAL;
    ($bucket <= MESSAGE_BURST) or $bucket = MESSAGE_BURST;

$ conn adalah struktur data yang diedarkan. Ini ada di dalam metode yang berjalan secara rutin (menghitung kapan waktu berikutnya ada sesuatu yang harus dilakukan, dan tidur selama itu atau sampai mendapat lalu lintas jaringan). Bagian selanjutnya dari metode ini menangani pengiriman. Agak rumit, karena pesan memiliki prioritas yang terkait dengannya.

    # Queue handling. Start with the ultimate queue.
    my $queues = $conn->{fujiko_queues};
    foreach my $entry (@{$queues->[PRIORITY_ULTIMATE]}) {
            # Ultimate is special. We run ultimate no matter what. Even if
            # it sends the bucket negative.
            --$bucket;
            $entry->{code}(@{$entry->{args}});
    }
    $queues->[PRIORITY_ULTIMATE] = [];

Itu antrian pertama, yang dijalankan tidak peduli apa. Bahkan jika itu membuat koneksi kita terbunuh karena banjir. Digunakan untuk hal-hal yang sangat penting, seperti menanggapi PING server. Selanjutnya, sisa antrian:

    # Continue to the other queues, in order of priority.
    QRUN: for (my $pri = PRIORITY_HIGH; $pri >= PRIORITY_JUNK; --$pri) {
            my $queue = $queues->[$pri];
            while (scalar(@$queue)) {
                    if ($bucket < 1) {
                            # continue later.
                            $need_more_time = 1;
                            last QRUN;
                    } else {
                            --$bucket;
                            my $entry = shift @$queue;
                            $entry->{code}(@{$entry->{args}});
                    }
            }
    }

Akhirnya, status bucket disimpan kembali ke struktur data $ conn (sebenarnya sedikit kemudian dalam metode; pertama-tama menghitung seberapa cepat itu akan memiliki lebih banyak pekerjaan)

    # Save status.
    $conn->{fujiko_limit_bucket} = $bucket;
    $conn->{fujiko_limit_lasttx} = $start_time;

Seperti yang Anda lihat, kode penanganan bucket sebenarnya sangat kecil - sekitar empat baris. Sisa kode adalah penanganan antrian prioritas. Bot memiliki antrian prioritas sehingga misalnya, seseorang yang mengobrol dengannya tidak dapat mencegahnya melakukan tugas tendangan / larangan penting.

derobert
sumber
Apakah saya melewatkan sesuatu ... sepertinya ini akan membatasi Anda untuk 1 pesan setiap 8 detik setelah Anda melewati 5 pertama
menggigil42
@ menggigil42: Ya, saya salah baca pertanyaannya ... lihat bagian kedua jawabannya.
derobert
@chills: Jika last_send adalah <8 detik, Anda tidak menambahkan token apa pun ke dalam ember. Jika ember Anda berisi token, Anda dapat mengirim pesan; kalau tidak Anda tidak bisa (Anda sudah mengirim 5 pesan dalam 8 detik terakhir)
derobert
3
Saya akan menghargai jika orang-orang yang menolak ini akan menjelaskan mengapa ... Saya ingin memperbaiki masalah yang Anda lihat, tapi itu sulit dilakukan tanpa umpan balik!
derobert
10

untuk memblokir pemrosesan hingga pesan dapat dikirim, sehingga mengantri pesan lebih lanjut, solusi indah antti juga dapat dimodifikasi seperti ini:

rate = 5.0; // unit: messages
per  = 8.0; // unit: seconds
allowance = rate; // unit: messages
last_check = now(); // floating-point, e.g. usec accuracy. Unit: seconds

when (message_received):
  current = now();
  time_passed = current - last_check;
  last_check = current;
  allowance += time_passed * (rate / per);
  if (allowance > rate):
    allowance = rate; // throttle
  if (allowance < 1.0):
    time.sleep( (1-allowance) * (per/rate))
    forward_message();
    allowance = 0.0;
  else:
    forward_message();
    allowance -= 1.0;

hanya menunggu sampai ada cukup uang untuk mengirim pesan. untuk tidak memulai dengan dua kali kurs, tunjangan juga dapat diinisialisasi dengan 0.

san
sumber
5
Ketika Anda tidur (1-allowance) * (per/rate), Anda perlu menambahkan jumlah yang sama last_check.
Alp
2

Pertahankan waktu pengiriman lima baris terakhir. Tahan pesan yang antri sampai waktu pesan kelima terbaru (jika ada) setidaknya 8 detik di masa lalu (dengan last_five sebagai larik waktu):

now = time.time()
if len(last_five) == 0 or (now - last_five[-1]) >= 8.0:
    last_five.insert(0, now)
    send_message(msg)
if len(last_five) > 5:
    last_five.pop()
pesto
sumber
Tidak sejak kamu merevisinya aku tidak.
Pesto
Anda menyimpan lima prangko waktu dan berulang kali menggesernya melalui memori (atau melakukan operasi daftar tertaut). Saya menyimpan satu bilangan bulat bilangan bulat dan satu stempel waktu. Dan hanya melakukan aritmatika dan menetapkan.
derobert
2
Kecuali bahwa tambang akan berfungsi lebih baik jika mencoba mengirim 5 baris tetapi hanya 3 yang diizinkan dalam periode waktu tersebut. Milik Anda akan memungkinkan pengiriman tiga yang pertama, dan memaksa menunggu 8 detik sebelum mengirim 4 dan 5. Milik saya akan memungkinkan 4 dan 5 dikirim 8 detik setelah baris keempat dan kelima terbaru.
Pesto
1
Tetapi pada subjek, kinerja dapat ditingkatkan dengan menggunakan daftar bundar panjang 5 yang terhubung, menunjuk ke pengiriman kelima yang paling baru, menimpanya pada pengiriman baru, dan menggerakkan pointer ke depan.
Pesto
untuk bot irc dengan kecepatan limiter rate tidak menjadi masalah. saya lebih suka daftar solusi karena lebih mudah dibaca. jawaban ember yang telah diberikan membingungkan karena revisi, tetapi tidak ada yang salah dengan itu.
jheriko
2

Salah satu solusinya adalah melampirkan stempel waktu untuk setiap item antrian dan membuang item setelah 8 detik berlalu. Anda dapat melakukan pemeriksaan ini setiap kali antrian ditambahkan.

Ini hanya berfungsi jika Anda membatasi ukuran antrian hingga 5 dan membuang penambahan apa pun saat antrian penuh.

jheriko
sumber
1

Jika seseorang masih tertarik, saya menggunakan kelas callable sederhana ini dalam hubungannya dengan penyimpanan nilai kunci LRU waktunya untuk membatasi tingkat permintaan per IP. Menggunakan deque, tetapi dapat ditulis ulang untuk digunakan dengan daftar.

from collections import deque
import time


class RateLimiter:
    def __init__(self, maxRate=5, timeUnit=1):
        self.timeUnit = timeUnit
        self.deque = deque(maxlen=maxRate)

    def __call__(self):
        if self.deque.maxlen == len(self.deque):
            cTime = time.time()
            if cTime - self.deque[0] > self.timeUnit:
                self.deque.append(cTime)
                return False
            else:
                return True
        self.deque.append(time.time())
        return False

r = RateLimiter()
for i in range(0,100):
    time.sleep(0.1)
    print(i, "block" if r() else "pass")
bernyanyi
sumber
1

Hanya implementasi kode python dari jawaban yang diterima.

import time

class Object(object):
    pass

def get_throttler(rate, per):
    scope = Object()
    scope.allowance = rate
    scope.last_check = time.time()
    def throttler(fn):
        current = time.time()
        time_passed = current - scope.last_check;
        scope.last_check = current;
        scope.allowance = scope.allowance + time_passed * (rate / per)
        if (scope.allowance > rate):
          scope.allowance = rate
        if (scope.allowance < 1):
          pass
        else:
          fn()
          scope.allowance = scope.allowance - 1
    return throttler
Hodza
sumber
Telah disarankan kepada saya bahwa saya menyarankan Anda untuk menambahkan contoh penggunaan kode Anda .
Luc
0

Bagaimana dengan ini:

long check_time = System.currentTimeMillis();
int msgs_sent_count = 0;

private boolean isRateLimited(int msgs_per_sec) {
    if (System.currentTimeMillis() - check_time > 1000) {
        check_time = System.currentTimeMillis();
        msgs_sent_count = 0;
    }

    if (msgs_sent_count > (msgs_per_sec - 1)) {
        return true;
    } else {
        msgs_sent_count++;
    }

    return false;
}

sumber
0

Saya membutuhkan variasi dalam Scala. Ini dia:

case class Limiter[-A, +B](callsPerSecond: (Double, Double), f: A  B) extends (A  B) {

  import Thread.sleep
  private def now = System.currentTimeMillis / 1000.0
  private val (calls, sec) = callsPerSecond
  private var allowance  = 1.0
  private var last = now

  def apply(a: A): B = {
    synchronized {
      val t = now
      val delta_t = t - last
      last = t
      allowance += delta_t * (calls / sec)
      if (allowance > calls)
        allowance = calls
      if (allowance < 1d) {
        sleep(((1 - allowance) * (sec / calls) * 1000d).toLong)
      }
      allowance -= 1
    }
    f(a)
  }

}

Inilah cara penggunaannya:

val f = Limiter((5d, 8d), { 
  _: Unit  
    println(System.currentTimeMillis) 
})
while(true){f(())}
Landon Kuhn
sumber