Meskipun serupa dalam kasus umum ("jalankan dan dapatkan hasil untuk banyak tugas"), setiap fungsi memiliki beberapa fungsi khusus untuk kasus lain:
Mengembalikan instance Future, memungkinkan pengelompokan tugas tingkat tinggi:
import asyncio
from pprint import pprint
import random
async def coro(tag):
print(">", tag)
await asyncio.sleep(random.uniform(1, 3))
print("<", tag)
return tag
loop = asyncio.get_event_loop()
group1 = asyncio.gather(*[coro("group 1.{}".format(i)) for i in range(1, 6)])
group2 = asyncio.gather(*[coro("group 2.{}".format(i)) for i in range(1, 4)])
group3 = asyncio.gather(*[coro("group 3.{}".format(i)) for i in range(1, 10)])
all_groups = asyncio.gather(group1, group2, group3)
results = loop.run_until_complete(all_groups)
loop.close()
pprint(results)
Semua tugas dalam grup dapat dibatalkan dengan menelepon group2.cancel()
atau bahkan all_groups.cancel()
. Lihat juga .gather(..., return_exceptions=True)
,
Mendukung menunggu untuk dihentikan setelah tugas pertama selesai, atau setelah batas waktu yang ditentukan, memungkinkan tingkat presisi operasi yang lebih rendah:
import asyncio
import random
async def coro(tag):
print(">", tag)
await asyncio.sleep(random.uniform(0.5, 5))
print("<", tag)
return tag
loop = asyncio.get_event_loop()
tasks = [coro(i) for i in range(1, 11)]
print("Get first result:")
finished, unfinished = loop.run_until_complete(
asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED))
for task in finished:
print(task.result())
print("unfinished:", len(unfinished))
print("Get more results in 2 seconds:")
finished2, unfinished2 = loop.run_until_complete(
asyncio.wait(unfinished, timeout=2))
for task in finished2:
print(task.result())
print("unfinished2:", len(unfinished2))
print("Get all other results:")
finished3, unfinished3 = loop.run_until_complete(asyncio.wait(unfinished2))
for task in finished3:
print(task.result())
loop.close()
asyncio.wait
lebih rendah dariasyncio.gather
.Seperti namanya,
asyncio.gather
fokus utamanya adalah mengumpulkan hasil. itu menunggu banyak masa depan dan mengembalikan hasilnya dalam urutan tertentu.asyncio.wait
tunggu saja di masa depan. dan alih-alih memberi Anda hasil secara langsung, ini memberikan tugas yang selesai dan tertunda. Anda harus mengumpulkan nilai-nilai secara manual.Selain itu, Anda dapat menentukan untuk menunggu semua masa depan selesai atau hanya dengan yang pertama
wait
.sumber
it waits on a bunch of futures and return their results in a given order
. bagaimana jika saya memiliki 10000000000000 tugas dan semuanya mengembalikan data yang besar? apakah hasilnya akan membuat memory boom?Saya juga memperhatikan bahwa Anda dapat menyediakan sekelompok coroutine di wait () hanya dengan menentukan daftar:
result=loop.run_until_complete(asyncio.wait([ say('first hello', 2), say('second hello', 1), say('third hello', 4) ]))
Sedangkan pengelompokan dalam gathering () dilakukan dengan hanya menetapkan beberapa coroutine:
result=loop.run_until_complete(asyncio.gather( say('first hello', 2), say('second hello', 1), say('third hello', 4) ))
sumber
gather()
, misalnya:asyncio.gather(*task_list)
Perbedaan yang sangat penting, yang mudah terlewatkan, adalah perilaku default dari kedua fungsi ini, dalam hal pengecualian.
Saya akan menggunakan contoh ini untuk mensimulasikan coroutine yang akan memunculkan pengecualian, terkadang -
import asyncio import random async def a_flaky_tsk(i): await asyncio.sleep(i) # bit of fuzz to simulate a real-world example if i % 2 == 0: print(i, "ok") else: print(i, "crashed!") raise ValueError coros = [a_flaky_tsk(i) for i in range(10)]
await asyncio.gather(*coros)
keluaran -0 ok 1 crashed! Traceback (most recent call last): File "/Users/dev/PycharmProjects/trading/xxx.py", line 20, in <module> asyncio.run(main()) File "/Users/dev/.pyenv/versions/3.8.2/lib/python3.8/asyncio/runners.py", line 43, in run return loop.run_until_complete(main) File "/Users/dev/.pyenv/versions/3.8.2/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete return future.result() File "/Users/dev/PycharmProjects/trading/xxx.py", line 17, in main await asyncio.gather(*coros) File "/Users/dev/PycharmProjects/trading/xxx.py", line 12, in a_flaky_tsk raise ValueError ValueError
Seperti yang Anda lihat, coros setelah indeks
1
tidak pernah dapat dieksekusi.Tetapi
await asyncio.wait(coros)
terus menjalankan tugas, bahkan jika beberapa di antaranya gagal -0 ok 1 crashed! 2 ok 3 crashed! 4 ok 5 crashed! 6 ok 7 crashed! 8 ok 9 crashed! Task exception was never retrieved future: <Task finished name='Task-10' coro=<a_flaky_tsk() done, defined at /Users/dev/PycharmProjects/trading/xxx.py:6> exception=ValueError()> Traceback (most recent call last): File "/Users/dev/PycharmProjects/trading/xxx.py", line 12, in a_flaky_tsk raise ValueError ValueError Task exception was never retrieved future: <Task finished name='Task-8' coro=<a_flaky_tsk() done, defined at /Users/dev/PycharmProjects/trading/xxx.py:6> exception=ValueError()> Traceback (most recent call last): File "/Users/dev/PycharmProjects/trading/xxx.py", line 12, in a_flaky_tsk raise ValueError ValueError Task exception was never retrieved future: <Task finished name='Task-2' coro=<a_flaky_tsk() done, defined at /Users/dev/PycharmProjects/trading/xxx.py:6> exception=ValueError()> Traceback (most recent call last): File "/Users/dev/PycharmProjects/trading/xxx.py", line 12, in a_flaky_tsk raise ValueError ValueError Task exception was never retrieved future: <Task finished name='Task-9' coro=<a_flaky_tsk() done, defined at /Users/dev/PycharmProjects/trading/xxx.py:6> exception=ValueError()> Traceback (most recent call last): File "/Users/dev/PycharmProjects/trading/xxx.py", line 12, in a_flaky_tsk raise ValueError ValueError Task exception was never retrieved future: <Task finished name='Task-3' coro=<a_flaky_tsk() done, defined at /Users/dev/PycharmProjects/trading/xxx.py:6> exception=ValueError()> Traceback (most recent call last): File "/Users/dev/PycharmProjects/trading/xxx.py", line 12, in a_flaky_tsk raise ValueError ValueError
Tentu saja, perilaku ini dapat diubah untuk keduanya dengan menggunakan -
asyncio.gather(..., return_exceptions=True)
atau,
asyncio.wait([...], return_when=asyncio.FIRST_EXCEPTION)
Tapi itu tidak berakhir di sini!
Memperhatikan:
Task exception was never retrieved
di log di atas.asyncio.wait()
tidak akan memunculkan kembali pengecualian dari tugas anak sampai Andaawait
melakukannya satu per satu. (Pelacakan tumpukan di log hanyalah pesan, mereka tidak dapat ditangkap!)done, pending = await asyncio.wait(coros) for tsk in done: try: await tsk except Exception as e: print("I caught:", repr(e))
Keluaran -
0 ok 1 crashed! 2 ok 3 crashed! 4 ok 5 crashed! 6 ok 7 crashed! 8 ok 9 crashed! I caught: ValueError() I caught: ValueError() I caught: ValueError() I caught: ValueError() I caught: ValueError()
Di sisi lain, untuk menangkap pengecualian
asyncio.gather()
, Anda harus -results = await asyncio.gather(*coros, return_exceptions=True) for result_or_exc in results: if isinstance(result_or_exc, Exception): print("I caught:", repr(result_or_exc))
(Output yang sama seperti sebelumnya)
sumber
Selain semua jawaban sebelumnya, saya ingin memberi tahu tentang perilaku yang berbeda dari
gather()
danwait()
seandainya dibatalkan .Kumpulkan pembatalan
Jika
gather()
dibatalkan, semua awaitables yang diajukan (yang belum selesai) juga dibatalkan .Tunggu pembatalan
Jika
wait()
tugas dibatalkan, itu hanya melemparCancelledError
dan tugas yang menunggu tetap utuh.Contoh sederhana:
import asyncio async def task(arg): await asyncio.sleep(5) return arg async def cancel_waiting_task(work_task, waiting_task): await asyncio.sleep(2) waiting_task.cancel() try: await waiting_task print("Waiting done") except asyncio.CancelledError: print("Waiting task cancelled") try: res = await work_task print(f"Work result: {res}") except asyncio.CancelledError: print("Work task cancelled") async def main(): work_task = asyncio.create_task(task("done")) waiting = asyncio.create_task(asyncio.wait({work_task})) await cancel_waiting_task(work_task, waiting) work_task = asyncio.create_task(task("done")) waiting = asyncio.gather(work_task) await cancel_waiting_task(work_task, waiting) asyncio.run(main())
Keluaran:
Terkadang perlu untuk menggabungkan
wait()
dangather()
fungsionalitas. Misalnya, kami ingin menunggu penyelesaian setidaknya satu tugas dan membatalkan tugas tertunda lainnya setelah itu, dan jikawaiting
itu sendiri dibatalkan , maka batalkan juga semua yang tertunda tugas yang .Sebagai contoh nyata, katakanlah kita memiliki acara pemutusan hubungan dan tugas kerja. Dan kami ingin menunggu hasil tugas pekerjaan, tetapi jika koneksi terputus, maka batalkan. Atau kami akan membuat beberapa permintaan paralel, tetapi setelah menyelesaikan setidaknya satu respons, batalkan yang lainnya.
Itu bisa dilakukan dengan cara ini:
import asyncio from typing import Optional, Tuple, Set async def wait_any( tasks: Set[asyncio.Future], *, timeout: Optional[int] = None, ) -> Tuple[Set[asyncio.Future], Set[asyncio.Future]]: tasks_to_cancel: Set[asyncio.Future] = set() try: done, tasks_to_cancel = await asyncio.wait( tasks, timeout=timeout, return_when=asyncio.FIRST_COMPLETED ) return done, tasks_to_cancel except asyncio.CancelledError: tasks_to_cancel = tasks raise finally: for task in tasks_to_cancel: task.cancel() async def task(): await asyncio.sleep(5) async def cancel_waiting_task(work_task, waiting_task): await asyncio.sleep(2) waiting_task.cancel() try: await waiting_task print("Waiting done") except asyncio.CancelledError: print("Waiting task cancelled") try: res = await work_task print(f"Work result: {res}") except asyncio.CancelledError: print("Work task cancelled") async def check_tasks(waiting_task, working_task, waiting_conn_lost_task): try: await waiting_task print("waiting is done") except asyncio.CancelledError: print("waiting is cancelled") try: await waiting_conn_lost_task print("connection is lost") except asyncio.CancelledError: print("waiting connection lost is cancelled") try: await working_task print("work is done") except asyncio.CancelledError: print("work is cancelled") async def work_done_case(): working_task = asyncio.create_task(task()) connection_lost_event = asyncio.Event() waiting_conn_lost_task = asyncio.create_task(connection_lost_event.wait()) waiting_task = asyncio.create_task(wait_any({working_task, waiting_conn_lost_task})) await check_tasks(waiting_task, working_task, waiting_conn_lost_task) async def conn_lost_case(): working_task = asyncio.create_task(task()) connection_lost_event = asyncio.Event() waiting_conn_lost_task = asyncio.create_task(connection_lost_event.wait()) waiting_task = asyncio.create_task(wait_any({working_task, waiting_conn_lost_task})) await asyncio.sleep(2) connection_lost_event.set() # <--- await check_tasks(waiting_task, working_task, waiting_conn_lost_task) async def cancel_waiting_case(): working_task = asyncio.create_task(task()) connection_lost_event = asyncio.Event() waiting_conn_lost_task = asyncio.create_task(connection_lost_event.wait()) waiting_task = asyncio.create_task(wait_any({working_task, waiting_conn_lost_task})) await asyncio.sleep(2) waiting_task.cancel() # <--- await check_tasks(waiting_task, working_task, waiting_conn_lost_task) async def main(): print("Work done") print("-------------------") await work_done_case() print("\nConnection lost") print("-------------------") await conn_lost_case() print("\nCancel waiting") print("-------------------") await cancel_waiting_case() asyncio.run(main())
Keluaran:
Work done ------------------- waiting is done waiting connection lost is cancelled work is done Connection lost ------------------- waiting is done connection is lost work is cancelled Cancel waiting ------------------- waiting is cancelled waiting connection lost is cancelled work is cancelled
sumber