Menghapus semua tugas yang tertunda dalam seledri / rabbitmq

199

Bagaimana saya bisa menghapus semua tugas yang tertunda tanpa mengetahui task_iduntuk setiap tugas?

nabizan
sumber

Jawaban:

296

Dari dokumen :

$ celery -A proj purge

atau

from proj.celery import app
app.control.purge()

(EDIT: Diperbarui dengan metode saat ini.)

Philip Southam
sumber
56
Atau, dari Django, untuk seledri 3.0+: manage.py celery purge( celeryctlsekarang sudah usang dan akan hilang dalam 3.1).
Henrik Heimbuerger
3
Saya menemukan jawaban ini mencari bagaimana melakukan ini dengan backis redis. Metode terbaik yang saya temukan adalah redis-cli KEYS "celery*" | xargs redis-cli DELyang bekerja untuk saya. Ini akan menghapus semua tugas yang disimpan di redis backend yang Anda gunakan.
Melignus
1
Bagaimana saya bisa melakukan ini di seledri 3.0?
luistm
2
Bagi saya, itu sederhana celery purge(di dalam virtual env yang relevan). Ooops - ada jawaban dengan yang sama di bawah ini ..... stackoverflow.com/a/20404976/1213425
Erve1879
Untuk Celery 4.0+ yang dikombinasikan dengan Django, ini adalah perintah ini lagi, di mana argumennya -Aadalah aplikasi Django di mana celery.pyberada.
gitaarik
120

Untuk seledri 3.0+:

$ celery purge

Untuk membersihkan antrian tertentu:

$ celery -Q queue_name purge
ToonAlfrink
sumber
9
Jika Anda mendapatkan kesalahan koneksi, pastikan Anda menentukan aplikasi, mis celery -A proj purge.
Kamil Sindi
25

Untuk Seledri 2.x dan 3.x:

Saat menggunakan pekerja dengan parameter -Q untuk menentukan antrian, misalnya

celery worker -Q queue1,queue2,queue3

maka celery purgetidak akan berfungsi, karena Anda tidak dapat meneruskan parade antrian ke sana. Itu hanya akan menghapus antrian default. Solusinya adalah memulai pekerja Anda dengan --purgeparameter seperti ini:

celery worker -Q queue1,queue2,queue3 --purge

Namun ini akan menjalankan pekerja.

Pilihan lainnya adalah menggunakan sub-perintah amqp seledri

celery amqp queue.delete queue1
celery amqp queue.delete queue2
celery amqp queue.delete queue3
smido
sumber
Ya, ini untuk versi seledri yang lebih tua (2.x dan mungkin 3.x). Saya tidak dapat mengedit jawabannya
smido
9

Saya menemukan itu celery purgetidak bekerja untuk konfigurasi seledri saya yang lebih kompleks. Saya menggunakan beberapa antrian bernama untuk tujuan yang berbeda:

$ sudo rabbitmqctl list_queues -p celery name messages consumers
Listing queues ...  # Output sorted, whitespaced for readability
celery                                          0   2
[email protected]                      0   1
[email protected]                      0   1
apns                                            0   1
[email protected]                        0   1
analytics                                       1   1
[email protected]                   0   1
bcast.361093f1-de68-46c5-adff-d49ea8f164c0      0   1
bcast.a53632b0-c8b8-46d9-bd59-364afe9998c1      0   1
celeryev.c27b070d-b07e-4e37-9dca-dbb45d03fd54   0   1
celeryev.c66a9bed-84bd-40b0-8fe7-4e4d0c002866   0   1
celeryev.b490f71a-be1a-4cd8-ae17-06a713cc2a99   0   1
celeryev.9d023165-ab4a-42cb-86f8-90294b80bd1e   0   1

Kolom pertama adalah nama antrian, yang kedua adalah jumlah pesan yang menunggu dalam antrian, dan yang ketiga adalah jumlah pendengar untuk antrian itu. Antriannya adalah:

  • seledri - Antrian untuk tugas seledri standar dan idempoten
  • apns - Antrian untuk tugas Layanan Pemberitahuan Push Apple, tidak cukup sebagai idempoten
  • analytics - Antrian untuk analitik malam yang panjang
  • * .pidbox - Antrian untuk perintah pekerja, seperti shutdown dan reset, satu per pekerja (2 pekerja seledri, satu pekerja apns, satu pekerja analitik)
  • bcast. * - Antrean siaran, untuk mengirim pesan ke semua pekerja yang mendengarkan antrian (bukan hanya yang pertama mengambilnya)
  • celeryev. * - Antrian acara seledri, untuk melaporkan analitik tugas

Tugas analitik adalah tugas brute force yang bekerja sangat baik pada set data kecil, tetapi sekarang membutuhkan waktu lebih dari 24 jam untuk diproses. Kadang-kadang, ada sesuatu yang salah dan itu akan macet menunggu di database. Perlu ditulis ulang, tetapi sampai saat itu, ketika macet saya membunuh tugas, mengosongkan antrian, dan coba lagi. Saya mendeteksi "kebuntuan" dengan melihat jumlah pesan untuk antrian analitik, yang seharusnya 0 (analisis selesai) atau 1 (menunggu analitik semalam selesai). 2 atau lebih tinggi buruk, dan saya mendapat email.

celery purge menawarkan untuk menghapus tugas dari salah satu antrian siaran, dan saya tidak melihat opsi untuk memilih antrian dengan nama yang berbeda.

Inilah proses saya:

$ sudo /etc/init.d/celeryd stop  # Wait for analytics task to be last one, Ctrl-C
$ ps -ef | grep analytics  # Get the PID of the worker, not the root PID reported by celery
$ sudo kill <PID>
$ sudo /etc/init.d/celeryd stop  # Confim dead
$ python manage.py celery amqp queue.purge analytics
$ sudo rabbitmqctl list_queues -p celery name messages consumers  # Confirm messages is 0
$ sudo /etc/init.d/celeryd start
jwhitlock
sumber
Tapi bukan jawaban, bukan? Namun sangat informatif!
amn
4
celeryctl purgetidak bekerja dengan antrian bernama. python manage.py celery amqp queue.purge <queue_name>melakukan. Saya pikir konteksnya bermanfaat bagi mereka yang memiliki pengaturan rumit, sehingga mereka dapat mengetahui apa yang harus mereka lakukan jika celeryctl purgegagal untuk mereka.
jwhitlock
Saya tidak dapat menemukan manage.pydi Seledri 3.1.17 saya, apakah file sudah dihapus atau baru dipukul? Saya menemukan apa yang tampak seperti antarmuka yang sesuai ( queue.purge) di */bin/amqp.py. Tetapi setelah mencoba menghubungkan isi file dengan dokumentasi, saya harus menyesal mengakui bahwa Celery sangat tidak berdokumen dan juga bagian dari pekerjaan yang sangat berbelit-belit, paling tidak menilainya dengan kode sumbernya.
amn
manage.pyadalah skrip manajemen Django, dan manage.py celerymenjalankan seledri setelah memuat konfigurasi dari pengaturan Django. Saya belum pernah menggunakan seledri di luar Django, tetapi celeryperintah yang disertakan mungkin apa yang Anda cari: celery.readthedocs.org/en/latest/userguide/monitoring.html
jwhitlock
5

Dalam Seledri 3+

http://docs.celeryproject.org/en/3.1/faq.html#how-do-i-purge-all-waiting-tasks

CLI

Bersihkan nama antrian:

 celery -A proj amqp queue.purge <queue name>

Bersihkan antrian yang dikonfigurasi

celery -A proj purge

Saya sudah membersihkan pesan, tetapi masih ada pesan yang tersisa di antrian? Jawaban: Tugas diakui (dihapus dari antrian) segera setelah mereka benar-benar dieksekusi. Setelah pekerja menerima tugas, itu akan memakan waktu sampai benar-benar dieksekusi, terutama jika ada banyak tugas yang sudah menunggu untuk dieksekusi. Pesan yang tidak diakui ditahan oleh pekerja hingga pesan itu menutup koneksi ke broker (server AMQP). Ketika koneksi ditutup (mis. Karena pekerja dihentikan) tugas akan dikirim kembali oleh broker ke pekerja berikutnya yang tersedia (atau pekerja yang sama ketika telah dinyalakan kembali), sehingga untuk membersihkan antrian tugas menunggu Anda dengan benar harus menghentikan semua pekerja, dan kemudian membersihkan tugas menggunakan selery.control.purge ().

Jadi untuk membersihkan seluruh pekerja antrian harus dihentikan.

oneklc
sumber
5

Jika Anda ingin menghapus semua tugas yang tertunda dan juga yang aktif dan dicadangkan untuk sepenuhnya menghentikan Seledri, inilah yang bekerja untuk saya:

from proj.celery import app
from celery.task.control import inspect, revoke

# remove pending tasks
app.control.purge()

# remove active tasks
i = inspect()
jobs = i.active()
for hostname in jobs:
    tasks = jobs[hostname]
    for task in tasks:
        revoke(task['id'], terminate=True)

# remove reserved tasks
jobs = i.reserved()
for hostname in jobs:
    tasks = jobs[hostname]
    for task in tasks:
        revoke(task['id'], terminate=True)
Kahlo
sumber
2

1. Untuk membersihkan antrian tugas menunggu dengan benar, Anda harus menghentikan semua pekerja ( http://celery.readthedocs.io/en/latest/faq.html#i-ve-purged-messages-but-there-are- still-messages-left-in-the-queue ):

$ sudo rabbitmqctl stop

atau (dalam kasus RabbitMQ / broker pesan dikelola oleh Supervisor):

$ sudo supervisorctl stop all

2. ... lalu bersihkan tugas dari antrian tertentu:

$ cd <source_dir>
$ celery amqp queue.purge <queue name>

3. Mulai RabbitMQ:

$ sudo rabbitmqctl start

atau (dalam kasus RabbitMQ dikelola oleh Supervisor):

$ sudo supervisorctl start all
Ukr
sumber
2

selery 4+ perintah selery purge untuk membersihkan semua antrian tugas yang dikonfigurasi

celery -A *APPNAME* purge

secara terprogram:

from proj.celery import app
app.control.purge()

semua tugas yang tertunda akan dihapus. Referensi: celerydoc

Roshan Bagdiya
sumber