from celery.app.control importInspect# Inspect all nodes.
i =Inspect()# Show the items that have an ETA or are scheduled for later processing
i.scheduled()# Show tasks that are currently active.
i.active()# Show tasks that have been claimed by workers
i.reserved()
Saya mencoba itu, tapi itu benar-benar lambat (seperti 1 detik). Saya menggunakannya secara serempak dalam aplikasi tornado untuk memantau kemajuan, jadi itu harus cepat.
JulienFr
41
Ini tidak akan mengembalikan daftar tugas dalam antrian yang belum diproses.
Ed J
9
Gunakan i.reserved()untuk mendapatkan daftar tugas yang antri.
Pisang
4
Adakah yang pernah mengalami bahwa saya.reserved () tidak akan memiliki daftar tugas aktif yang akurat? Saya memiliki tugas yang berjalan yang tidak muncul dalam daftar. Saya sedang di django-
selery
6
Ketika menentukan pekerja saya harus menggunakan daftar sebagai argumen: inspect(['celery@Flatty']). Peningkatan kecepatan besar berakhir inspect().
Adversus
42
jika Anda menggunakan rabbitMQ, gunakan ini di terminal:
sudo rabbitmqctl list_queues
itu akan mencetak daftar antrian dengan jumlah tugas yang tertunda. sebagai contoh:
Saya mengetahui hal ini ketika saya memiliki hak sudo, tetapi saya ingin pengguna sistem yang tidak terjangkau dapat memeriksa - saran?
bijak
Selain itu, Anda dapat mengirimkan melalui ini grep -e "^celery\s" | cut -f2untuk mengekstrak bahwa 166jika Anda ingin memproses angka itu nanti, katakan untuk statistik.
jamesc
22
Jika Anda tidak menggunakan tugas yang diprioritaskan, ini sebenarnya cukup sederhana jika Anda menggunakan Redis. Untuk mendapatkan jumlah tugas:
redis-cli -h HOST -p PORT -n DATABASE_NUMBER llen QUEUE_NAME
Tapi, tugas yang diprioritaskan menggunakan kunci yang berbeda dalam redis , sehingga gambaran lengkapnya sedikit lebih rumit. Gambaran lengkapnya adalah Anda harus meminta redis untuk setiap prioritas tugas. Dalam python (dan dari proyek Bunga), ini terlihat seperti:
PRIORITY_SEP ='\x06\x16'
DEFAULT_PRIORITY_STEPS =[0,3,6,9]def make_queue_name_for_pri(queue, pri):"""Make a queue name for redis
Celery uses PRIORITY_SEP to separate different priorities of tasks into
different queues in Redis. Each queue-priority combination becomes a key in
redis with names like:
- batch1\x06\x163 <-- P3 queue named batch1
There's more information about this in Github, but it doesn't look like it
will change any time soon:
- https://github.com/celery/kombu/issues/422
In that ticket the code below, from the Flower project, is referenced:
- https://github.com/mher/flower/blob/master/flower/utils/broker.py#L135
:param queue: The name of the queue to make a name for.
:param pri: The priority to make a name with.
:return: A name for the queue-priority pair.
"""if pri notin DEFAULT_PRIORITY_STEPS:raiseValueError('Priority not in priority steps')return'{0}{1}{2}'.format(*((queue, PRIORITY_SEP, pri)if pri else(queue,'','')))def get_queue_length(queue_name='celery'):"""Get the number of tasks in a celery queue.
:param queue_name: The name of the queue you want to inspect.
:return: the number of items in the queue.
"""
priority_names =[make_queue_name_for_pri(queue_name, pri)for pri in
DEFAULT_PRIORITY_STEPS]
r = redis.StrictRedis(
host=settings.REDIS_HOST,
port=settings.REDIS_PORT,
db=settings.REDIS_DATABASES['CELERY'],)return sum([r.llen(x)for x in priority_names])
Jika Anda ingin mendapatkan tugas yang sebenarnya, Anda dapat menggunakan sesuatu seperti:
redis-cli -h HOST -p PORT -n DATABASE_NUMBER lrange QUEUE_NAME 0-1
Dari sana Anda harus membatalkan daftar yang dikembalikan. Dalam kasus saya, saya dapat mencapai ini dengan sesuatu seperti:
r = redis.StrictRedis(
host=settings.REDIS_HOST,
port=settings.REDIS_PORT,
db=settings.REDIS_DATABASES['CELERY'],)
l = r.lrange('celery',0,-1)
pickle.loads(base64.decodestring(json.loads(l[0])['body']))
Harap diingat bahwa deserialisasi dapat berlangsung sesaat, dan Anda harus menyesuaikan perintah di atas untuk bekerja dengan berbagai prioritas.
Saya telah memperbarui hal di atas untuk menangani tugas-tugas yang diprioritaskan. Kemajuan!
mlissner
1
Hanya untuk menguraikan hal-hal, yang DATABASE_NUMBERdigunakan secara default adalah 0, dan apa QUEUE_NAMEadanya celery, sehingga redis-cli -n 0 llen celeryakan mengembalikan jumlah pesan yang antri.
Vineet Bansal
Untuk seledri saya, nama antriannya '{{{0}}}{1}{2}'bukan '{0}{1}{2}'. Selain itu, ini bekerja dengan sempurna!
Jika Anda menggunakan Seledri + Django cara paling sederhana untuk memeriksa tugas menggunakan perintah langsung dari terminal Anda di lingkungan virtual Anda atau menggunakan path lengkap ke seledri:
Jika Anda memiliki proyek define, Anda dapat menggunakancelery -A my_proj inspect reserved
sashaboulouds
6
Solusi salin-tempel untuk Redis dengan serialisasi json:
def get_celery_queue_items(queue_name):import base64
import json
# Get a configured instance of a celery app:from yourproject.celery import app as celery_app
with celery_app.pool.acquire(block=True)as conn:
tasks = conn.default_channel.client.lrange(queue_name,0,-1)
decoded_tasks =[]for task in tasks:
j = json.loads(task)
body = json.loads(base64.b64decode(j['body']))
decoded_tasks.append(body)return decoded_tasks
Ini bekerja dengan Django. Hanya saja, jangan lupa untuk berubah yourproject.celery.
Jika Anda menggunakan serializer acar, maka Anda dapat mengubah body =jalurnya menjadi body = pickle.loads(base64.b64decode(j['body'])).
Jim Hunziker
4
Modul pemeriksaan seledri tampaknya hanya menyadari tugas-tugas dari perspektif pekerja. Jika Anda ingin melihat pesan-pesan yang ada dalam antrian (belum ditarik oleh para pekerja) Saya sarankan untuk menggunakan pyrabbit , yang dapat berinteraksi dengan rabbitmq http api untuk mengambil semua jenis informasi dari antrian.
Saya pikir satu-satunya cara untuk mendapatkan tugas-tugas yang sedang menunggu adalah menyimpan daftar tugas yang Anda mulai dan membiarkan tugas menghapus dirinya sendiri dari daftar ketika sudah dimulai.
@donconcode Saya tidak berpikir itu informasi yang cukup bagi saya untuk merespon dengan membantu. Anda bisa membuka pertanyaan Anda sendiri. Saya tidak berpikir itu akan menjadi duplikat yang satu ini jika Anda menentukan bahwa Anda ingin mengambil informasi dengan python. Saya akan kembali ke stackoverflow.com/a/19465670/9843399 , yang merupakan dasar dari jawaban saya, dan pastikan itu berfungsi lebih dulu.
Caleb Syring
@ CalebSyring Ini adalah pendekatan pertama yang benar-benar menunjukkan kepada saya tugas yang antri. Sangat bagus. Satu-satunya masalah bagi saya adalah bahwa daftar tambahan tampaknya tidak berfungsi. Adakah ide bagaimana saya dapat membuat fungsi callback menulis ke daftar?
Varlor
@ Varlor Maaf, ada orang yang mengedit jawaban saya dengan tidak tepat. Anda dapat melihat pada riwayat edit untuk jawaban asli, yang kemungkinan besar akan berhasil untuk Anda. Saya sedang berusaha memperbaiki ini. (EDIT: Saya baru saja masuk dan menolak hasil edit, yang memiliki kesalahan python yang jelas. Beri tahu saya apakah ini memperbaiki masalah Anda atau tidak.)
Caleb Syring
@ CalebSyring Saya sekarang menggunakan kode Anda di kelas, memiliki daftar sebagai atribut kelas berfungsi!
Varlor
2
Sejauh yang saya tahu, Selery tidak memberikan API untuk memeriksa tugas-tugas yang menunggu dalam antrian. Ini khusus broker. Jika Anda menggunakan Redis sebagai broker sebagai contoh, maka memeriksa tugas yang menunggu dalam celeryantrian (default) semudah:
terhubung ke database broker
daftar item dalam celerydaftar (perintah LRANGE misalnya)
Perlu diingat bahwa ini adalah tugas yang menunggu untuk dipilih oleh pekerja yang tersedia. Cluster Anda mungkin memiliki beberapa tugas berjalan - itu tidak akan ada dalam daftar ini karena mereka sudah dipilih.
Saya sampai pada kesimpulan cara terbaik untuk mendapatkan jumlah pekerjaan dalam antrian adalah dengan menggunakan rabbitmqctlseperti yang disarankan beberapa kali di sini. Untuk mengizinkan pengguna yang dipilih untuk menjalankan perintah dengan sudosaya mengikuti instruksi di sini (saya melewatkan mengedit bagian profil karena saya tidak keberatan mengetikkan sudo sebelum perintah.)
Saya juga mengambil jamesc grepdan cutsnippet dan membungkusnya dalam panggilan subproses.
from subprocess importPopen, PIPE
p1 =Popen(["sudo","rabbitmqctl","list_queues","-p","[name of your virtula host"], stdout=PIPE)
p2 =Popen(["grep","-e","^celery\s"], stdin=p1.stdout, stdout=PIPE)
p3 =Popen(["cut","-f2"], stdin=p2.stdout, stdout=PIPE)
p1.stdout.close()
p2.stdout.close()print("number of jobs on queue: %i"% int(p3.communicate()[0]))
from celery.task.control import inspect
def key_in_list(k, l):return bool([Truefor i in l if k in i.values()])def check_task(task_id):
task_value_dict = inspect().active().values()for task_list in task_value_dict:if self.key_in_list(task_id, task_list):returnTruereturnFalse
Jika Anda mengontrol kode tugas maka Anda dapat mengatasi masalah dengan membiarkan tugas memicu percobaan ulang yang sepele saat pertama kali dijalankan, lalu memeriksa inspect().reserved(). Coba lagi mendaftarkan tugas dengan backend hasil, dan seledri dapat melihatnya. Tugas harus menerima selfatau contextsebagai parameter pertama agar kami dapat mengakses jumlah coba lagi.
Jawaban:
Sunting: Lihat jawaban lain untuk mendapatkan daftar tugas dalam antrian.
Anda harus melihat di sini: Panduan Seledri - Memeriksa Pekerja
Pada dasarnya ini:
Tergantung apa yang kamu inginkan
sumber
i.reserved()
untuk mendapatkan daftar tugas yang antri.inspect(['celery@Flatty'])
. Peningkatan kecepatan besar berakhirinspect()
.jika Anda menggunakan rabbitMQ, gunakan ini di terminal:
itu akan mencetak daftar antrian dengan jumlah tugas yang tertunda. sebagai contoh:
angka di kolom kanan adalah jumlah tugas dalam antrian. di atas, antrian seledri memiliki 166 tugas yang tertunda.
sumber
grep -e "^celery\s" | cut -f2
untuk mengekstrak bahwa166
jika Anda ingin memproses angka itu nanti, katakan untuk statistik.Jika Anda tidak menggunakan tugas yang diprioritaskan, ini sebenarnya cukup sederhana jika Anda menggunakan Redis. Untuk mendapatkan jumlah tugas:
Tapi, tugas yang diprioritaskan menggunakan kunci yang berbeda dalam redis , sehingga gambaran lengkapnya sedikit lebih rumit. Gambaran lengkapnya adalah Anda harus meminta redis untuk setiap prioritas tugas. Dalam python (dan dari proyek Bunga), ini terlihat seperti:
Jika Anda ingin mendapatkan tugas yang sebenarnya, Anda dapat menggunakan sesuatu seperti:
Dari sana Anda harus membatalkan daftar yang dikembalikan. Dalam kasus saya, saya dapat mencapai ini dengan sesuatu seperti:
Harap diingat bahwa deserialisasi dapat berlangsung sesaat, dan Anda harus menyesuaikan perintah di atas untuk bekerja dengan berbagai prioritas.
sumber
DATABASE_NUMBER
digunakan secara default adalah0
, dan apaQUEUE_NAME
adanyacelery
, sehinggaredis-cli -n 0 llen celery
akan mengembalikan jumlah pesan yang antri.'{{{0}}}{1}{2}'
bukan'{0}{1}{2}'
. Selain itu, ini bekerja dengan sempurna!Untuk mengambil tugas dari backend, gunakan ini
sumber
Jika Anda menggunakan Seledri + Django cara paling sederhana untuk memeriksa tugas menggunakan perintah langsung dari terminal Anda di lingkungan virtual Anda atau menggunakan path lengkap ke seledri:
Doc : http://docs.celeryproject.org/en/latest/userguide/workers.html?highlight=revoke#inspecting-workers
Juga jika Anda menggunakan Celery + RabbitMQ Anda dapat memeriksa daftar antrian menggunakan perintah berikut:
Info lebih lanjut : https://linux.die.net/man/1/rabbitmqctl
sumber
celery -A my_proj inspect reserved
Solusi salin-tempel untuk Redis dengan serialisasi json:
Ini bekerja dengan Django. Hanya saja, jangan lupa untuk berubah
yourproject.celery
.sumber
body =
jalurnya menjadibody = pickle.loads(base64.b64decode(j['body']))
.Modul pemeriksaan seledri tampaknya hanya menyadari tugas-tugas dari perspektif pekerja. Jika Anda ingin melihat pesan-pesan yang ada dalam antrian (belum ditarik oleh para pekerja) Saya sarankan untuk menggunakan pyrabbit , yang dapat berinteraksi dengan rabbitmq http api untuk mengambil semua jenis informasi dari antrian.
Contoh dapat ditemukan di sini: Ambil panjang antrian dengan Seledri (RabbitMQ, Django)
sumber
Saya pikir satu-satunya cara untuk mendapatkan tugas-tugas yang sedang menunggu adalah menyimpan daftar tugas yang Anda mulai dan membiarkan tugas menghapus dirinya sendiri dari daftar ketika sudah dimulai.
Dengan rabbitmqctl dan list_queues Anda bisa mendapatkan gambaran umum tentang berapa banyak tugas yang menunggu, tetapi bukan tugas itu sendiri: http://www.rabbitmq.com/man/rabbitmqctl.1.man.html
Jika yang Anda inginkan termasuk tugas yang sedang diproses, tetapi belum selesai, Anda dapat menyimpan daftar tugas Anda dan memeriksa statusnya:
Atau Anda membiarkan Celery menyimpan hasilnya dengan CELERY_RESULT_BACKEND dan memeriksa tugas Anda yang tidak ada di sana.
sumber
Ini berfungsi untuk saya dalam aplikasi saya:
active_jobs
akan menjadi daftar string yang sesuai dengan tugas dalam antrian.Jangan lupa untuk menukar CELERY_APP_INSTANCE dengan milik Anda.
Terima kasih kepada @ashish karena mengarahkan saya ke arah yang benar dengan jawabannya di sini: https://stackoverflow.com/a/19465670/9843399
sumber
jobs
selalu nol ... ada ide?Sejauh yang saya tahu, Selery tidak memberikan API untuk memeriksa tugas-tugas yang menunggu dalam antrian. Ini khusus broker. Jika Anda menggunakan Redis sebagai broker sebagai contoh, maka memeriksa tugas yang menunggu dalam
celery
antrian (default) semudah:celery
daftar (perintah LRANGE misalnya)Perlu diingat bahwa ini adalah tugas yang menunggu untuk dipilih oleh pekerja yang tersedia. Cluster Anda mungkin memiliki beberapa tugas berjalan - itu tidak akan ada dalam daftar ini karena mereka sudah dipilih.
sumber
Saya sampai pada kesimpulan cara terbaik untuk mendapatkan jumlah pekerjaan dalam antrian adalah dengan menggunakan
rabbitmqctl
seperti yang disarankan beberapa kali di sini. Untuk mengizinkan pengguna yang dipilih untuk menjalankan perintah dengansudo
saya mengikuti instruksi di sini (saya melewatkan mengedit bagian profil karena saya tidak keberatan mengetikkan sudo sebelum perintah.)Saya juga mengambil jamesc
grep
dancut
snippet dan membungkusnya dalam panggilan subproses.sumber
sumber
Jika Anda mengontrol kode tugas maka Anda dapat mengatasi masalah dengan membiarkan tugas memicu percobaan ulang yang sepele saat pertama kali dijalankan, lalu memeriksa
inspect().reserved()
. Coba lagi mendaftarkan tugas dengan backend hasil, dan seledri dapat melihatnya. Tugas harus menerimaself
ataucontext
sebagai parameter pertama agar kami dapat mengakses jumlah coba lagi.Solusi ini adalah broker agnostik, yaitu. Anda tidak perlu khawatir tentang apakah Anda menggunakan RabbitMQ atau Redis untuk menyimpan tugas.
EDIT: setelah pengujian saya menemukan ini hanya solusi parsial. Ukuran yang dipesan terbatas pada pengaturan prefetch untuk pekerja.
sumber
Dengan
subprocess.run
:Hati-hati untuk mengubah
my_proj
denganyour_proj
sumber