Bagaimana cara memeriksa status tugas di Celery?

95

Bagaimana cara memeriksa apakah tugas sedang berjalan dalam seledri (khususnya, saya menggunakan celery-django)?

Saya telah membaca dokumentasinya, dan saya telah mencari di Google, tetapi saya tidak dapat melihat panggilan seperti:

my_example_task.state() == RUNNING

Kasus penggunaan saya adalah saya memiliki layanan eksternal (java) untuk transcoding. Ketika saya mengirim dokumen untuk ditranskode, saya ingin memeriksa apakah tugas yang menjalankan layanan itu sedang berjalan, dan jika tidak, untuk memulainya (kembali).

Saya menggunakan versi stabil saat ini - 2.4, saya yakin.

Marcin
sumber

Jawaban:

98

Kembalikan task_id (yang diberikan dari .delay ()) dan tanyakan contoh seledri sesudahnya tentang status:

x = method.delay(1,2)
print x.task_id

Saat bertanya, dapatkan AsyncResult baru menggunakan task_id ini:

from celery.result import AsyncResult
res = AsyncResult("your-task-id")
res.ready()
Gregor
sumber
11
Terima kasih, tetapi bagaimana jika saya tidak memiliki akses ke x?
Marcin
4
Di mana Anda mengantri pekerjaan Anda ke seledri? Di sana Anda harus mengembalikan task_id untuk melacak pekerjaan di masa mendatang.
Gregor
Tidak seperti @ Marcin, jawaban ini tidak menggunakan metode statis Task.AsyncResult () sebagai pabrik AsyncResult, yang secara membantu menggunakan kembali konfigurasi backend, jika tidak, kesalahan akan muncul saat mencoba mendapatkan hasilnya.
ArnauOrriols
2
@Chris Kontroversi dengan kode @gregor ada dalam contoh async_result. Dalam kasus penggunaan Anda, Anda sudah memiliki instance, Anda siap menggunakannya. Tetapi apa yang terjadi jika Anda hanya memiliki id tugas, dan perlu membuat async_resultinstance agar dapat memanggil async_result.get()? Ini adalah turunan dari AsyncResultkelas, tetapi Anda tidak dapat menggunakan kelas mentah celery.result.AsyncResult, Anda harus mendapatkan kelas dari fungsi yang dibungkus oleh app.task(). Dalam kasus Anda, Anda akan melakukannyaasync_result = run_instance.AsyncResult('task-id')
ArnauOrriols
1
but you cannot use the raw class celery.result.AsyncResult, you need to get the class from the function wrapped by app.task(). - Saya pikir ini adalah cara yang seharusnya digunakan. Baca kodenya: github.com/celery/celery/blob/…
nevelis
74

Membuat AsyncResultobjek dari id tugas adalah cara yang direkomendasikan di FAQ untuk mendapatkan status tugas ketika satu-satunya yang Anda miliki adalah id tugas.

Namun, pada Seledri 3.x, ada peringatan signifikan yang dapat menggigit orang jika mereka tidak memperhatikannya. Itu benar-benar tergantung pada skenario kasus penggunaan tertentu.

Secara default, Celery tidak merekam status "berjalan".

Agar Celery dapat merekam bahwa tugas sedang berjalan, Anda harus mengatur task_track_startedke True. Berikut adalah tugas sederhana yang menguji ini:

@app.task(bind=True)
def test(self):
    print self.AsyncResult(self.request.id).state

When task_track_startedis False, yang merupakan default, status menunjukkan PENDINGmeskipun tugas telah dimulai. Jika Anda disetel task_track_startedke True, statusnya akan menjadi STARTED.

Negara PENDINGberarti "Saya tidak tahu."

Sebuah AsyncResultdengan negara PENDINGtidak berarti lebih dari itu Celery tidak tahu status tugasnya. Ini bisa jadi karena sejumlah alasan.

Untuk satu hal, AsyncResultdapat dibangun dengan id tugas yang tidak valid. "Tugas" seperti itu akan dianggap tertunda oleh Celery:

>>> task.AsyncResult("invalid").status
'PENDING'

Oke, jadi tidak ada yang akan memberikan id yang jelas tidak valid AsyncResult. Cukup adil, tetapi juga memiliki efek yang AsyncResultjuga akan mempertimbangkan tugas yang telah berhasil dijalankan tetapi Celery telah dilupakan PENDING. Sekali lagi, dalam beberapa skenario kasus penggunaan, ini bisa menjadi masalah. Sebagian dari masalah bergantung pada bagaimana Celery dikonfigurasi untuk menyimpan hasil tugas, karena bergantung pada ketersediaan "batu nisan" di backend hasil. ( "Batu nisan" adalah penggunaan istilah dalam dokumentasi Seledri untuk potongan data yang merekam bagaimana tugas berakhir.) Menggunakan AsyncResulttidak akan bekerja sama sekali jika task_ignore_resultini True. Masalah yang lebih menjengkelkan adalah Celery secara default membuat batu nisan mati. Ituresult_expirespengaturan secara default diatur ke 24 jam. Jadi jika Anda meluncurkan tugas, dan merekam id dalam penyimpanan jangka panjang, dan lebih dari 24 jam kemudian, Anda membuat AsyncResultdengan itu, statusnya akan menjadi PENDING.

Semua "tugas nyata" dimulai di PENDINGnegara bagian. Jadi mendapatkan PENDINGtugas bisa berarti bahwa tugas tersebut diminta tetapi tidak pernah berkembang lebih jauh dari ini (untuk alasan apa pun). Atau bisa juga berarti tugasnya berjalan tetapi Celery lupa keadaannya.

Aduh! AsyncResulttidak akan berhasil untuk saya. Apa lagi yang bisa saya lakukan?

Saya lebih suka melacak tujuan daripada melacak tugas itu sendiri . Saya menyimpan beberapa informasi tugas tetapi itu benar-benar sekunder untuk melacak tujuan. Sasaran disimpan dalam penyimpanan terpisah dari Seledri. Ketika sebuah permintaan perlu melakukan penghitungan tergantung pada beberapa tujuan yang telah dicapai, ia memeriksa apakah tujuan telah tercapai, jika ya, maka ia menggunakan tujuan yang di-cache ini, jika tidak ia memulai tugas yang akan mempengaruhi tujuan, dan mengirim ke klien yang membuat permintaan HTTP respon yang menunjukkan itu harus menunggu hasil.


Nama variabel dan hyperlink di atas adalah untuk Celery 4.x. Dalam 3.x yang sesuai variabel dan hyperlink adalah: CELERY_TRACK_STARTED, CELERY_IGNORE_RESULT, CELERY_TASK_RESULT_EXPIRES.

Louis
sumber
Jadi jika saya ingin memeriksa hasilnya nanti (bahkan mungkin dalam proses lain), saya lebih baik dengan implementasi saya sendiri? Menyimpan hasilnya ke database secara manual?
Franklin Yu
Ya, saya akan memisahkan melacak "tujuan" dari melacak "tugas". Saya menulis "melakukan perhitungan yang bergantung pada beberapa tujuan". Biasanya, "tujuan" juga merupakan perhitungan. Misalnya jika saya ingin menampilkan artikel X kepada pengguna, saya harus mengubahnya dari XML ke HTML, tetapi sebelum itu, saya harus menyelesaikan semua referensi bibliografi. (X seperti artikel jurnal.) Saya memeriksa apakah tujuan "artikel X dengan semua referensi bibliografi terselesaikan" ada dan menggunakannya daripada mencoba memeriksa status tugas tugas Celery yang akan menghitung tujuan yang saya inginkan.
Louis
Dan informasi "artikel X dengan semua referensi bibliografi diselesaikan" disimpan dalam cache memori dan disimpan dalam database eXist-db.
Louis
63

Setiap Taskobjek memiliki .requestproperti, yang berisi AsyncRequestobjek itu . Karena itu, baris berikut memberikan status Tugas task:

task.AsyncResult(task.request.id).state
Marcin
sumber
2
Apakah ada cara untuk menyimpan persentase kemajuan tugas?
patrick
5
Ketika saya melakukan ini, saya mendapatkan AsyncResult yang TERTUNDA secara permanen, bahkan jika saya menunggu cukup lama untuk menyelesaikan tugas. Adakah cara untuk membuat ini melihat perubahan keadaan? Saya yakin backend saya dikonfigurasi, dan saya mencoba menyetel CELERY_TRACK_STARTED = True, tetapi tidak berhasil.
dstromberg
1
@dstromberg Sayangnya sudah 4 tahun sejak ini menjadi masalah bagi saya, jadi saya tidak dapat membantu. Anda hampir pasti perlu mengkonfigurasi seledri untuk melacak status.
Marcin
Menambah pengamatan @ dstromberg, hanya untuk konfirmasi, saya mengambil tugas seledri yang saya tahu pasti berhasil dan memeriksa statepropertinya, masih dikembalikan PENDING. Ini tampaknya bukan cara yang dapat diandalkan untuk melacak status tugas seledri dari terminal. Selain itu, saya menjalankan Bunga Seledri (Alat Pemantau Seledri), untuk beberapa alasan tidak muncul tugas yang saya cari dalam daftar tugas yang telah dijalankan. Saya mungkin harus melihat ke pengaturan Bunga untuk melihat apakah ada yang mengatakan hanya muncul hingga jam tertentu di masa lalu.
Jauh
16

Anda juga dapat membuat status kustom dan memperbarui eksekusi tugas duting nilainya. Contoh ini dari dokumen:

@app.task(bind=True)
def upload_files(self, filenames):
    for i, file in enumerate(filenames):
        if not self.request.called_directly:
            self.update_state(state='PROGRESS',
                meta={'current': i, 'total': len(filenames)})

http://celery.readthedocs.org/en/latest/userguide/tasks.html#custom-states

msangel
sumber
11

Pertanyaan lama tapi saya baru saja mengalami masalah ini.

Jika Anda mencoba mendapatkan task_id Anda dapat melakukannya seperti ini:

import celery
from celery_app import add
from celery import uuid

task_id = uuid()
result = add.apply_async((2, 2), task_id=task_id)

Sekarang Anda tahu persis apa itu task_id dan sekarang dapat menggunakannya untuk mendapatkan AsyncResult:

# grab the AsyncResult 
result = celery.result.AsyncResult(task_id)

# print the task id
print result.task_id
09dad9cf-c9fa-4aee-933f-ff54dae39bdf

# print the AsyncResult's status
print result.status
SUCCESS

# print the result returned 
print result.result
4
Cesar Rios
sumber
4
Anda sama sekali tidak perlu membuat ID tugas Anda sendiri dan meneruskannya ke apply_async. Objek yang dikembalikan apply_async adalah AsyncResultobjek, yang memiliki id dari tugas yang dihasilkan Celery.
Louis
1
Koreksi saya jika saya salah, tetapi bukankah terkadang berguna untuk membuat UUID berdasarkan beberapa masukan, sehingga semua panggilan yang mendapatkan masukan yang sama mendapatkan UUID yang sama? IOW, mungkin terkadang berguna untuk menentukan task_id Anda.
dstromberg
1
@dstromberg Pertanyaan yang diajukan oleh OP adalah "bagaimana cara memeriksa status tugas" dan jawabannya di sini mengatakan "Jika Anda mencoba untuk mendapatkan task_id ...". Baik memeriksa status tugas, tidak task_idmeminta Anda membuat id tugas sendiri. Dalam komentar Anda, Anda telah membayangkan alasan di atas dan di luar "bagaimana cara memeriksa status tugas" dan "Jika Anda mencoba untuk mendapatkan task_id ...` Bagus jika Anda memiliki kebutuhan itu tetapi bukan itu masalahnya di sini. (Selain itu, menggunakan uuid()untuk menghasilkan id tugas sama sekali tidak melakukan apa pun selain apa yang dilakukan Celery secara default.)
Louis
Saya setuju bahwa OP tidak secara khusus menanyakan cara mendapatkan ID tugas yang dapat diprediksi, tetapi jawaban untuk pertanyaan OP saat ini adalah "lacak ID tugas dan lakukan x". Menurut saya, melacak ID tugas tidak praktis dalam berbagai situasi sehingga jawabannya mungkin tidak benar-benar memuaskan. Jawaban ini membantu saya menyelesaikan kasus penggunaan saya (jika saya dapat mengatasi batasan lain yang disebutkan) untuk alasan yang sama yang ditunjukkan @dstromberg - apakah itu termotivasi untuk alasan itu atau tidak.
claytond
7

Cukup gunakan API ini dari FAQ seledri

result = app.AsyncResult(task_id)

Ini bekerja dengan baik.

David Ding
sumber
1

Jawaban tahun 2020:

#### tasks.py
@celery.task()
def mytask(arg1):
    print(arg1)

#### blueprint.py
@bp.route("/args/arg1=<arg1>")
def sleeper(arg1):
    process = mytask.apply_async(args=(arg1,)) #mytask.delay(arg1)
    state = process.state
    return f"Thanks for your patience, your job {process.task_id} \
             is being processed. Status {state}"
Adrian Garcia Moreno
sumber
0

Mencoba:

task.AsyncResult(task.request.id).state

ini akan memberikan status Celery Task. Jika Celery Task sudah dalam status FAILURE , maka akan muncul Exception:

raised unexpected: KeyError('exc_type',)

gogasca.dll
sumber
0

Saya menemukan informasi yang berguna di

Panduan Pekerja Proyek Celery memeriksa pekerja

Untuk kasus saya, saya memeriksa untuk melihat apakah Celery sedang bekerja.

inspect_workers = task.app.control.inspect()
if inspect_workers.registered() is None:
    state = 'FAILURE'
else:
    state = str(task.state) 

Anda bisa bermain dengan inspect untuk mendapatkan kebutuhan Anda.

zerocog
sumber
0
  • Pertama, di APP seledri Anda :

vi my_celery_apps / app1.py

app = Celery(worker_name)
  • dan selanjutnya, ubah ke file tugas, impor aplikasi dari modul aplikasi seledri Anda.

vi tugas / task1.py

from my_celery_apps.app1 import app

app.AsyncResult(taskid)

try:
   if task.state.lower() != "success":
        return
except:
    """ do something """

Anda ZhengChuan
sumber
-1

Selain dari pendekatan Programatik di atas status Menggunakan Tugas Bunga dapat dengan mudah dilihat.

Pemantauan waktu nyata menggunakan Celery Events. Flower adalah alat berbasis web untuk memantau dan mengatur kelompok Seledri.

  1. Kemajuan tugas dan sejarah
  2. Kemampuan untuk menampilkan detail tugas (argumen, waktu mulai, waktu proses, dan lainnya)
  3. Grafik dan statistik

Dokumen Resmi: Bunga - Alat pemantauan seledri

Instalasi:

$ pip install flower

Pemakaian:

http://localhost:5555
Roshan Bagdiya
sumber
-1
res = method.delay()
    
print(f"id={res.id}, state={res.state}, status={res.status} ")

print(res.get())
Saurabh I
sumber
2
Harap jangan hanya memposting kode sebagai jawaban, tetapi juga berikan penjelasan tentang fungsi kode Anda dan bagaimana kode tersebut menyelesaikan masalah pertanyaan. Jawaban dengan penjelasan biasanya lebih bermanfaat dan berkualitas lebih baik, dan lebih cenderung menarik suara positif.
Mark Rotteveel