Asyncio.gather vs asyncio.wait

168

asyncio.gatherdan asyncio.waittampaknya memiliki kegunaan yang serupa: Saya memiliki banyak hal asinkron yang ingin saya jalankan / tunggu (tidak harus menunggu satu selesai sebelum yang berikutnya dimulai). Mereka menggunakan sintaks yang berbeda, dan berbeda dalam beberapa detail, tetapi tampaknya sangat tidak pythonic bagi saya untuk memiliki 2 fungsi yang memiliki fungsi yang sangat tumpang tindih. Apa yang saya lewatkan?

Claude
sumber

Jawaban:

196

Meskipun serupa dalam kasus umum ("jalankan dan dapatkan hasil untuk banyak tugas"), setiap fungsi memiliki beberapa fungsi khusus untuk kasus lain:

asyncio.gather()

Mengembalikan instance Future, memungkinkan pengelompokan tugas tingkat tinggi:

import asyncio
from pprint import pprint

import random


async def coro(tag):
    print(">", tag)
    await asyncio.sleep(random.uniform(1, 3))
    print("<", tag)
    return tag


loop = asyncio.get_event_loop()

group1 = asyncio.gather(*[coro("group 1.{}".format(i)) for i in range(1, 6)])
group2 = asyncio.gather(*[coro("group 2.{}".format(i)) for i in range(1, 4)])
group3 = asyncio.gather(*[coro("group 3.{}".format(i)) for i in range(1, 10)])

all_groups = asyncio.gather(group1, group2, group3)

results = loop.run_until_complete(all_groups)

loop.close()

pprint(results)

Semua tugas dalam grup dapat dibatalkan dengan menelepon group2.cancel()atau bahkan all_groups.cancel(). Lihat juga .gather(..., return_exceptions=True),

asyncio.wait()

Mendukung menunggu untuk dihentikan setelah tugas pertama selesai, atau setelah batas waktu yang ditentukan, memungkinkan tingkat presisi operasi yang lebih rendah:

import asyncio
import random


async def coro(tag):
    print(">", tag)
    await asyncio.sleep(random.uniform(0.5, 5))
    print("<", tag)
    return tag


loop = asyncio.get_event_loop()

tasks = [coro(i) for i in range(1, 11)]

print("Get first result:")
finished, unfinished = loop.run_until_complete(
    asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED))

for task in finished:
    print(task.result())
print("unfinished:", len(unfinished))

print("Get more results in 2 seconds:")
finished2, unfinished2 = loop.run_until_complete(
    asyncio.wait(unfinished, timeout=2))

for task in finished2:
    print(task.result())
print("unfinished2:", len(unfinished2))

print("Get all other results:")
finished3, unfinished3 = loop.run_until_complete(asyncio.wait(unfinished2))

for task in finished3:
    print(task.result())

loop.close()
Udi
sumber
6
"Bentuk tanda bintang tunggal (* args) digunakan untuk melewatkan daftar argumen panjang variabel tanpa kata kunci, dan formulir tanda bintang ganda digunakan untuk meneruskan daftar argumen panjang variabel yang diberi kata kunci"
laycat
47

asyncio.waitlebih rendah dari asyncio.gather.

Seperti namanya, asyncio.gatherfokus utamanya adalah mengumpulkan hasil. itu menunggu banyak masa depan dan mengembalikan hasilnya dalam urutan tertentu.

asyncio.waittunggu saja di masa depan. dan alih-alih memberi Anda hasil secara langsung, ini memberikan tugas yang selesai dan tertunda. Anda harus mengumpulkan nilai-nilai secara manual.

Selain itu, Anda dapat menentukan untuk menunggu semua masa depan selesai atau hanya dengan yang pertama wait.

ospider
sumber
1
Anda mengatakan: it waits on a bunch of futures and return their results in a given order. bagaimana jika saya memiliki 10000000000000 tugas dan semuanya mengembalikan data yang besar? apakah hasilnya akan membuat memory boom?
Kingname
1
@Kingname ..wat
Matt Joiner
18

Saya juga memperhatikan bahwa Anda dapat menyediakan sekelompok coroutine di wait () hanya dengan menentukan daftar:

result=loop.run_until_complete(asyncio.wait([
        say('first hello', 2),
        say('second hello', 1),
        say('third hello', 4)
    ]))

Sedangkan pengelompokan dalam gathering () dilakukan dengan hanya menetapkan beberapa coroutine:

result=loop.run_until_complete(asyncio.gather(
        say('first hello', 2),
        say('second hello', 1),
        say('third hello', 4)
    ))
Johny Ebanat
sumber
20
Daftar juga dapat digunakan dengan gather(), misalnya:asyncio.gather(*task_list)
tehfink
1
Begitu juga dengan generator
Jab
Bagaimana Anda dapat menggunakan pengumpulan ini tanpa memblokir sisa skrip?
thebeancounter
4

Perbedaan yang sangat penting, yang mudah terlewatkan, adalah perilaku default dari kedua fungsi ini, dalam hal pengecualian.


Saya akan menggunakan contoh ini untuk mensimulasikan coroutine yang akan memunculkan pengecualian, terkadang -

import asyncio
import random


async def a_flaky_tsk(i):
    await asyncio.sleep(i)  # bit of fuzz to simulate a real-world example

    if i % 2 == 0:
        print(i, "ok")
    else:
        print(i, "crashed!")
        raise ValueError

coros = [a_flaky_tsk(i) for i in range(10)]

await asyncio.gather(*coros) keluaran -

0 ok
1 crashed!
Traceback (most recent call last):
  File "/Users/dev/PycharmProjects/trading/xxx.py", line 20, in <module>
    asyncio.run(main())
  File "/Users/dev/.pyenv/versions/3.8.2/lib/python3.8/asyncio/runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "/Users/dev/.pyenv/versions/3.8.2/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
    return future.result()
  File "/Users/dev/PycharmProjects/trading/xxx.py", line 17, in main
    await asyncio.gather(*coros)
  File "/Users/dev/PycharmProjects/trading/xxx.py", line 12, in a_flaky_tsk
    raise ValueError
ValueError

Seperti yang Anda lihat, coros setelah indeks 1tidak pernah dapat dieksekusi.


Tetapi await asyncio.wait(coros)terus menjalankan tugas, bahkan jika beberapa di antaranya gagal -

0 ok
1 crashed!
2 ok
3 crashed!
4 ok
5 crashed!
6 ok
7 crashed!
8 ok
9 crashed!
Task exception was never retrieved
future: <Task finished name='Task-10' coro=<a_flaky_tsk() done, defined at /Users/dev/PycharmProjects/trading/xxx.py:6> exception=ValueError()>
Traceback (most recent call last):
  File "/Users/dev/PycharmProjects/trading/xxx.py", line 12, in a_flaky_tsk
    raise ValueError
ValueError
Task exception was never retrieved
future: <Task finished name='Task-8' coro=<a_flaky_tsk() done, defined at /Users/dev/PycharmProjects/trading/xxx.py:6> exception=ValueError()>
Traceback (most recent call last):
  File "/Users/dev/PycharmProjects/trading/xxx.py", line 12, in a_flaky_tsk
    raise ValueError
ValueError
Task exception was never retrieved
future: <Task finished name='Task-2' coro=<a_flaky_tsk() done, defined at /Users/dev/PycharmProjects/trading/xxx.py:6> exception=ValueError()>
Traceback (most recent call last):
  File "/Users/dev/PycharmProjects/trading/xxx.py", line 12, in a_flaky_tsk
    raise ValueError
ValueError
Task exception was never retrieved
future: <Task finished name='Task-9' coro=<a_flaky_tsk() done, defined at /Users/dev/PycharmProjects/trading/xxx.py:6> exception=ValueError()>
Traceback (most recent call last):
  File "/Users/dev/PycharmProjects/trading/xxx.py", line 12, in a_flaky_tsk
    raise ValueError
ValueError
Task exception was never retrieved
future: <Task finished name='Task-3' coro=<a_flaky_tsk() done, defined at /Users/dev/PycharmProjects/trading/xxx.py:6> exception=ValueError()>
Traceback (most recent call last):
  File "/Users/dev/PycharmProjects/trading/xxx.py", line 12, in a_flaky_tsk
    raise ValueError
ValueError

Tentu saja, perilaku ini dapat diubah untuk keduanya dengan menggunakan -

asyncio.gather(..., return_exceptions=True)

atau,

asyncio.wait([...], return_when=asyncio.FIRST_EXCEPTION)


Tapi itu tidak berakhir di sini!

Memperhatikan: Task exception was never retrieved di log di atas.

asyncio.wait()tidak akan memunculkan kembali pengecualian dari tugas anak sampai Anda awaitmelakukannya satu per satu. (Pelacakan tumpukan di log hanyalah pesan, mereka tidak dapat ditangkap!)

done, pending = await asyncio.wait(coros)
for tsk in done:
    try:
        await tsk
    except Exception as e:
        print("I caught:", repr(e))

Keluaran -

0 ok
1 crashed!
2 ok
3 crashed!
4 ok
5 crashed!
6 ok
7 crashed!
8 ok
9 crashed!
I caught: ValueError()
I caught: ValueError()
I caught: ValueError()
I caught: ValueError()
I caught: ValueError()

Di sisi lain, untuk menangkap pengecualian asyncio.gather(), Anda harus -

results = await asyncio.gather(*coros, return_exceptions=True)
for result_or_exc in results:
    if isinstance(result_or_exc, Exception):
        print("I caught:", repr(result_or_exc))

(Output yang sama seperti sebelumnya)

Dev Aggarwal
sumber
0

Selain semua jawaban sebelumnya, saya ingin memberi tahu tentang perilaku yang berbeda dari gather()dan wait()seandainya dibatalkan .

Kumpulkan pembatalan

Jika gather()dibatalkan, semua awaitables yang diajukan (yang belum selesai) juga dibatalkan .

Tunggu pembatalan

Jika wait()tugas dibatalkan, itu hanya melemparCancelledError dan tugas yang menunggu tetap utuh.

Contoh sederhana:

import asyncio


async def task(arg):
    await asyncio.sleep(5)
    return arg


async def cancel_waiting_task(work_task, waiting_task):
    await asyncio.sleep(2)
    waiting_task.cancel()
    try:
        await waiting_task
        print("Waiting done")
    except asyncio.CancelledError:
        print("Waiting task cancelled")

    try:
        res = await work_task
        print(f"Work result: {res}")
    except asyncio.CancelledError:
        print("Work task cancelled")


async def main():
    work_task = asyncio.create_task(task("done"))
    waiting = asyncio.create_task(asyncio.wait({work_task}))
    await cancel_waiting_task(work_task, waiting)

    work_task = asyncio.create_task(task("done"))
    waiting = asyncio.gather(work_task)
    await cancel_waiting_task(work_task, waiting)


asyncio.run(main())

Keluaran:

asyncio.wait()
Waiting task cancelled
Work result: done
----------------
asyncio.gather()
Waiting task cancelled
Work task cancelled

Terkadang perlu untuk menggabungkan wait()dan gather()fungsionalitas. Misalnya, kami ingin menunggu penyelesaian setidaknya satu tugas dan membatalkan tugas tertunda lainnya setelah itu, dan jika waitingitu sendiri dibatalkan , maka batalkan juga semua yang tertunda tugas yang .

Sebagai contoh nyata, katakanlah kita memiliki acara pemutusan hubungan dan tugas kerja. Dan kami ingin menunggu hasil tugas pekerjaan, tetapi jika koneksi terputus, maka batalkan. Atau kami akan membuat beberapa permintaan paralel, tetapi setelah menyelesaikan setidaknya satu respons, batalkan yang lainnya.

Itu bisa dilakukan dengan cara ini:

import asyncio
from typing import Optional, Tuple, Set


async def wait_any(
        tasks: Set[asyncio.Future], *, timeout: Optional[int] = None,
) -> Tuple[Set[asyncio.Future], Set[asyncio.Future]]:
    tasks_to_cancel: Set[asyncio.Future] = set()
    try:
        done, tasks_to_cancel = await asyncio.wait(
            tasks, timeout=timeout, return_when=asyncio.FIRST_COMPLETED
        )
        return done, tasks_to_cancel
    except asyncio.CancelledError:
        tasks_to_cancel = tasks
        raise
    finally:
        for task in tasks_to_cancel:
            task.cancel()


async def task():
    await asyncio.sleep(5)


async def cancel_waiting_task(work_task, waiting_task):
    await asyncio.sleep(2)
    waiting_task.cancel()
    try:
        await waiting_task
        print("Waiting done")
    except asyncio.CancelledError:
        print("Waiting task cancelled")

    try:
        res = await work_task
        print(f"Work result: {res}")
    except asyncio.CancelledError:
        print("Work task cancelled")


async def check_tasks(waiting_task, working_task, waiting_conn_lost_task):
    try:
        await waiting_task
        print("waiting is done")
    except asyncio.CancelledError:
        print("waiting is cancelled")

    try:
        await waiting_conn_lost_task
        print("connection is lost")
    except asyncio.CancelledError:
        print("waiting connection lost is cancelled")

    try:
        await working_task
        print("work is done")
    except asyncio.CancelledError:
        print("work is cancelled")


async def work_done_case():
    working_task = asyncio.create_task(task())
    connection_lost_event = asyncio.Event()
    waiting_conn_lost_task = asyncio.create_task(connection_lost_event.wait())
    waiting_task = asyncio.create_task(wait_any({working_task, waiting_conn_lost_task}))
    await check_tasks(waiting_task, working_task, waiting_conn_lost_task)


async def conn_lost_case():
    working_task = asyncio.create_task(task())
    connection_lost_event = asyncio.Event()
    waiting_conn_lost_task = asyncio.create_task(connection_lost_event.wait())
    waiting_task = asyncio.create_task(wait_any({working_task, waiting_conn_lost_task}))
    await asyncio.sleep(2)
    connection_lost_event.set()  # <---
    await check_tasks(waiting_task, working_task, waiting_conn_lost_task)


async def cancel_waiting_case():
    working_task = asyncio.create_task(task())
    connection_lost_event = asyncio.Event()
    waiting_conn_lost_task = asyncio.create_task(connection_lost_event.wait())
    waiting_task = asyncio.create_task(wait_any({working_task, waiting_conn_lost_task}))
    await asyncio.sleep(2)
    waiting_task.cancel()  # <---
    await check_tasks(waiting_task, working_task, waiting_conn_lost_task)


async def main():
    print("Work done")
    print("-------------------")
    await work_done_case()
    print("\nConnection lost")
    print("-------------------")
    await conn_lost_case()
    print("\nCancel waiting")
    print("-------------------")
    await cancel_waiting_case()


asyncio.run(main())

Keluaran:

Work done
-------------------
waiting is done
waiting connection lost is cancelled
work is done

Connection lost
-------------------
waiting is done
connection is lost
work is cancelled

Cancel waiting
-------------------
waiting is cancelled
waiting connection lost is cancelled
work is cancelled
Alex Noname
sumber