Bagaimana Anda menguji unit Celery?

Jawaban:

61

Dimungkinkan untuk menguji tugas secara sinkron menggunakan sembarang lib unittest di luar sana. Saya biasanya melakukan 2 sesi tes berbeda saat mengerjakan tugas seledri. Yang pertama (seperti yang saya sarankan di bawah) benar-benar sinkron dan harus menjadi salah satu yang memastikan algoritme melakukan apa yang seharusnya dilakukan. Sesi kedua menggunakan keseluruhan sistem (termasuk broker) dan memastikan saya tidak mengalami masalah serialisasi atau distribusi lainnya, masalah komunikasi.

Begitu:

from celery import Celery

celery = Celery()

@celery.task
def add(x, y):
    return x + y

Dan tes Anda:

from nose.tools import eq_

def test_add_task():
    rst = add.apply(args=(4, 4)).get()
    eq_(rst, 8)

Semoga membantu!

FlaPer87
sumber
1
Itu berfungsi kecuali pada tugas-tugas yang menggunakan HttpDispatchTask - docs.celeryproject.org/en/latest/userguide/remote-tasks.html di mana saya harus mengatur celery.conf.CELERY_ALWAYS_EAGER = True tetapi bahkan dengan juga mengatur celery.conf.CELERY_IMPORTS = ('celery.task.http') tes gagal dengan NotRegistered: celery.task.http.HttpDispatchTask
davidmytton
Aneh, Anda yakin tidak mengalami masalah impor? Tes ini berhasil (perhatikan bahwa saya memalsukan respons sehingga mengembalikan apa yang diharapkan seledri). Juga, modul yang ditentukan dalam CELERY_IMPORTS akan diimpor selama inisialisasi pekerja , untuk menghindari hal ini saya sarankan Anda untuk menelepon celery.loader.import_default_modules().
FlaPer87
Saya juga menyarankan Anda untuk melihat di sini . Itu mengolok-olok permintaan http. Entah apakah itu membantu, saya rasa Anda ingin menguji layanan yang aktif dan berjalan, bukan?
FlaPer87
52

Saya menggunakan ini:

with mock.patch('celeryconfig.CELERY_ALWAYS_EAGER', True, create=True):
    ...

Docs: http://docs.celeryproject.org/en/3.1/configuration.html#celery-always-eager

CELERY_ALWAYS_EAGER memungkinkan Anda menjalankan tugas secara sinkron, dan Anda tidak memerlukan server seledri.

guettli
sumber
1
Saya pikir ini sudah ketinggalan zaman - saya mengerti ImportError: No module named celeryconfig.
Tanggal
7
Saya percaya di atas mengasumsikan modul celeryconfig.pyada dalam paket seseorang. Lihat docs.celeryproject.org/en/latest/getting-started/… .
Kamil Sindi
1
Saya tahu ini sudah tua tetapi dapatkah Anda memberikan contoh lengkap tentang cara meluncurkan tugas adddari pertanyaan OP di dalam TestCasekelas?
Kruupös
@ MaxChrétien maaf, saya tidak bisa memberikan contoh lengkap, karena saya tidak menggunakan seledri lagi. Anda dapat mengedit pertanyaan saya, jika Anda memiliki poin reputasi yang cukup. Jika Anda tidak memiliki cukup, tolong beri tahu saya apa yang harus saya salin + tempel ke jawaban ini.
guettli
1
@ miken terima kasih. Karena jawaban terbaru entah bagaimana mengatasi masalah yang ingin saya bantu, saya baru saja meninggalkan komentar bahwa dokumen resmi untuk 4.0 tidak menyarankan penggunaan CELERY_TASK_ALWAYS_EAGERuntuk pengujian unit.
krassowski
33

Tergantung pada apa sebenarnya yang ingin Anda uji.

  • Uji kode tugas secara langsung. Jangan panggil "task.delay (...)" panggil saja "task (...)" dari unit test Anda.
  • Gunakan CELERY_ALWAYS_EAGER . Ini akan menyebabkan tugas Anda dipanggil segera pada saat Anda mengatakan "task.delay (...)", sehingga Anda dapat menguji seluruh jalur (tetapi bukan perilaku asinkron apa pun).
slacy
sumber
24

unittest

import unittest

from myproject.myapp import celeryapp

class TestMyCeleryWorker(unittest.TestCase):

  def setUp(self):
      celeryapp.conf.update(CELERY_ALWAYS_EAGER=True)

perlengkapan py.test

# conftest.py
from myproject.myapp import celeryapp

@pytest.fixture(scope='module')
def celery_app(request):
    celeryapp.conf.update(CELERY_ALWAYS_EAGER=True)
    return celeryapp

# test_tasks.py
def test_some_task(celery_app):
    ...

Tambahan: buat send_task respek menjadi bersemangat

from celery import current_app

def send_task(name, args=(), kwargs={}, **opts):
    # https://github.com/celery/celery/issues/581
    task = current_app.tasks[name]
    return task.apply(args, kwargs, **opts)

current_app.send_task = send_task
Kamil Sindi
sumber
22

Bagi mereka yang menggunakan Celery 4 itu:

@override_settings(CELERY_TASK_ALWAYS_EAGER=True)

Karena nama pengaturan telah diubah dan perlu diperbarui jika Anda memilih untuk meningkatkan, lihat

https://docs.celeryproject.org/en/latest/history/whatsnew-4.0.html?highlight=what%20is%20new#lowercase-setting-names

okrutny
sumber
11
Menurut dokumen resmi , penggunaan "task_always_eager" (sebelumnya "CELERY_ALWAYS_EAGER") tidak cocok untuk pengujian unit. Alih-alih, mereka mengusulkan beberapa cara hebat lainnya untuk menguji aplikasi Celery Anda.
krassowski
4
Saya hanya akan menambahkan bahwa alasan mengapa Anda tidak menginginkan tugas yang bersemangat dalam pengujian unit Anda adalah karena Anda tidak menguji misalnya serialisasi parameter yang akan terjadi setelah Anda menggunakan kode dalam produksi.
Sialan
15

Pada Seledri 3.0 , salah satu cara untuk mengatur CELERY_ALWAYS_EAGERdi Django adalah:

from django.test import TestCase, override_settings

from .foo import foo_celery_task

class MyTest(TestCase):

    @override_settings(CELERY_ALWAYS_EAGER=True)
    def test_foo(self):
        self.assertTrue(foo_celery_task.delay())
Aaron Lelevier
sumber
7

Sejak Celery v4.0 , perlengkapan py.test disediakan untuk memulai pekerja seledri hanya untuk pengujian dan akan ditutup setelah selesai:

def test_myfunc_is_executed(celery_session_worker):
    # celery_session_worker: <Worker: [email protected] (running)>
    assert myfunc.delay().wait(3)

Di antara perlengkapan lain yang dijelaskan di http://docs.celeryproject.org/en/latest/userguide/testing.html#py-test , Anda dapat mengubah opsi default seledri dengan mendefinisikan ulang celery_configperlengkapan dengan cara ini:

@pytest.fixture(scope='session')
def celery_config():
    return {
        'accept_content': ['json', 'pickle'],
        'result_serializer': 'pickle',
    }

Secara default, pekerja pengujian menggunakan broker dalam memori dan hasil backend. Tidak perlu menggunakan Redis atau RabbitMQ lokal jika tidak menguji fitur tertentu.

alanjds
sumber
Dear downvoter, maukah Anda berbagi mengapa ini jawaban yang buruk? Hormat kami, terima kasih.
alanjds
2
Tidak berhasil untuk saya, rangkaian pengujian hanya hang. Bisakah Anda memberikan lebih banyak konteks? (Saya belum memilih;)).
dualitas_
Dalam kasus saya, saya harus secara eksplisit mengatur perlengkapan celey_config untuk menggunakan broker memori dan cache + backend memori
sanzoghenzo
5

referensi menggunakan pytest.

def test_add(celery_worker):
    mytask.delay()

jika Anda menggunakan flask, setel konfigurasi aplikasi

    CELERY_BROKER_URL = 'memory://'
    CELERY_RESULT_BACKEND = 'cache+memory://'

dan masuk conftest.py

@pytest.fixture
def app():
    yield app   # Your actual Flask application

@pytest.fixture
def celery_app(app):
    from celery.contrib.testing import tasks   # need it
    yield celery_app    # Your actual Flask-Celery application
Yoge
sumber
2

Dalam kasus saya (dan saya berasumsi banyak orang lain), yang saya inginkan hanyalah menguji logika batin tugas menggunakan pytest.

TL; DR; akhirnya mengejek semuanya ( OPSI 2 )


Contoh Kasus Penggunaan :

proj/tasks.py

@shared_task(bind=True)
def add_task(self, a, b):
    return a+b;

tests/test_tasks.py

from proj import add_task

def test_add():
    assert add_task(1, 2) == 3, '1 + 2 should equal 3'

tetapi, karena shared_taskdekorator melakukan banyak logika internal seledri, ini sebenarnya bukan pengujian unit.

Jadi, bagi saya, ada 2 opsi:

OPSI 1: Pisahkan logika internal

proj/tasks_logic.py

def internal_add(a, b):
    return a + b;

proj/tasks.py

from .tasks_logic import internal_add

@shared_task(bind=True)
def add_task(self, a, b):
    return internal_add(a, b);

Ini terlihat sangat aneh, dan selain membuatnya kurang dapat dibaca, ini juga memerlukan secara manual mengekstrak dan meneruskan atribut yang merupakan bagian dari permintaan, misalnya task_idjika Anda membutuhkannya, yang membuat logikanya kurang murni.

OPSI 2:
mengejek bagian dalam seledri

tests/__init__.py

# noinspection PyUnresolvedReferences
from celery import shared_task

from mock import patch


def mock_signature(**kwargs):
    return {}


def mocked_shared_task(*decorator_args, **decorator_kwargs):
    def mocked_shared_decorator(func):
        func.signature = func.si = func.s = mock_signature
        return func

    return mocked_shared_decorator

patch('celery.shared_task', mocked_shared_task).start()

yang kemudian memungkinkan saya untuk memalsukan objek permintaan (sekali lagi, jika Anda memerlukan sesuatu dari permintaan, seperti id, atau penghitung percobaan ulang.

tests/test_tasks.py

from proj import add_task

class MockedRequest:
    def __init__(self, id=None):
        self.id = id or 1


class MockedTask:
    def __init__(self, id=None):
        self.request = MockedRequest(id=id)


def test_add():
    mocked_task = MockedTask(id=3)
    assert add_task(mocked_task, 1, 2) == 3, '1 + 2 should equal 3'

Solusi ini jauh lebih manual, tetapi, ini memberi saya kontrol yang saya perlukan untuk benar-benar menguji unit , tanpa mengulang sendiri, dan tanpa kehilangan ruang lingkup seledri.

Daniel Dubovski
sumber