Metode panggilan asinkron dengan Python?

178

Saya bertanya-tanya apakah ada perpustakaan untuk panggilan metode asinkron dengan Python . Akan lebih bagus jika Anda bisa melakukan sesuatu seperti

@async
def longComputation():
    <code>


token = longComputation()
token.registerCallback(callback_function)
# alternative, polling
while not token.finished():
    doSomethingElse()
    if token.finished():
        result = token.result()

Atau untuk memanggil rutin non-async secara asinkron

def longComputation()
    <code>

token = asynccall(longComputation())

Alangkah baiknya memiliki strategi yang lebih halus sebagai asli dalam inti bahasa. Apakah ini dipertimbangkan?

Stefano Borini
sumber
Pada Python 3.4: docs.python.org/3/library/asyncio.html (ada backport untuk 3.3 dan mengkilap baru asyncdan awaitsintaksis dari 3.5).
jonrsharpe
Tidak ada mekanisme panggilan balik, tetapi Anda dapat mengumpulkan hasil dalam kamus dan didasarkan pada modul multiprosesor Python. Saya yakin Anda dapat menambahkan satu parameter lagi ke fungsi yang didekorasi sebagai panggilan balik. github.com/alex-sherman/deco .
RajaRaviVarma
Untuk memulai. Dokumentasi Resmi - docs.python.org/3/library/concurrency.html
Adarsh ​​Madrecha

Jawaban:

141

Anda dapat menggunakan modul multiprosesing yang ditambahkan dengan Python 2.6. Anda dapat menggunakan kumpulan proses dan kemudian mendapatkan hasil secara tidak sinkron dengan:

apply_async(func[, args[, kwds[, callback]]])

Misalnya:

from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    pool = Pool(processes=1)              # Start a worker processes.
    result = pool.apply_async(f, [10], callback) # Evaluate "f(10)" asynchronously calling callback when finished.

Ini hanya satu alternatif. Modul ini menyediakan banyak fasilitas untuk mencapai apa yang Anda inginkan. Juga akan sangat mudah untuk membuat dekorator dari ini.

Lucas S.
sumber
5
Lucas S., contoh Anda tidak berfungsi, sayangnya. Fungsi panggilan balik tidak pernah dipanggil.
DataGreed
6
Mungkin perlu diingat bahwa ini memunculkan proses yang terpisah dan bukannya utas yang terpisah dalam suatu proses. Ini mungkin beberapa implikasi.
user47741
11
Ini berfungsi: result = pool.apply_async (f, [10], callback = finish)
MJ
6
Untuk benar-benar melakukan apa pun secara asinkron dalam python, perlu menggunakan modul multiprosesor untuk menghasilkan proses baru. Hanya membuat utas baru masih tergantung pada Global Interpreter Lock yang mencegah proses python melakukan banyak hal sekaligus.
Drahkar
2
Jika Anda tidak ingin menelurkan proses baru saat menggunakan solusi ini - ubah impor ke from multiprocessing.dummy import Pool. multiprocessing.dummy memiliki perilaku yang sama persis diimplementasikan di atas utas alih-alih proses
Almog Cohen
203

Sesuatu seperti:

import threading

thr = threading.Thread(target=foo, args=(), kwargs={})
thr.start() # Will run "foo"
....
thr.is_alive() # Will return whether foo is running currently
....
thr.join() # Will wait till "foo" is done

Lihat dokumentasi di https://docs.python.org/library/threading.html untuk lebih jelasnya.

Drakosha
sumber
1
ya, jika Anda hanya perlu melakukan hal-hal secara serempak, mengapa tidak hanya menggunakan utas? setelah semua thread lebih ringan dari proses
kk1957
22
Catatan penting: implementasi standar (CPython) dari utas tidak akan membantu dengan tugas yang terikat dengan komputasi, karena "Kunci Penerjemah Global". Lihat dokumen perpustakaan: tautan
ikan larut
3
Apakah menggunakan thread.join () benar-benar tidak sinkron? Bagaimana jika Anda tidak ingin memblokir utas (mis. Utas UI) dan tidak menggunakan banyak sumber daya melakukan loop sementara?
Mgamerz
1
@Mgamerz bergabung bersifat sinkron. Anda bisa membiarkan utas untuk memasukkan hasil eksekusi dalam beberapa antrian, atau / dan memanggil panggilan balik. Kalau tidak, Anda tidak tahu kapan itu dilakukan (jika sama sekali).
Drakosha
1
Apakah mungkin untuk memanggil fungsi callback di akhir eksekusi utas seperti yang dapat Anda lakukan dengan multiprocessing.Pool
Reda Drissi
49

Pada Python 3.5, Anda dapat menggunakan generator yang disempurnakan untuk fungsi async.

import asyncio
import datetime

Sintaks generator yang ditingkatkan:

@asyncio.coroutine
def display_date(loop):
    end_time = loop.time() + 5.0
    while True:
        print(datetime.datetime.now())
        if (loop.time() + 1.0) >= end_time:
            break
        yield from asyncio.sleep(1)


loop = asyncio.get_event_loop()
# Blocking call which returns when the display_date() coroutine is done
loop.run_until_complete(display_date(loop))
loop.close()

async/awaitSintaks baru :

async def display_date(loop):
    end_time = loop.time() + 5.0
    while True:
        print(datetime.datetime.now())
        if (loop.time() + 1.0) >= end_time:
            break
        await asyncio.sleep(1)


loop = asyncio.get_event_loop()
# Blocking call which returns when the display_date() coroutine is done
loop.run_until_complete(display_date(loop))
loop.close()
camabeh
sumber
8
@carnabeh, dapatkah Anda memperluas contoh itu untuk menyertakan fungsi "def longComputation ()" OP? Sebagian besar contoh menggunakan "await asyncio.sleep (1)", tetapi jika longComputation () mengembalikan, katakanlah, double, Anda tidak bisa menggunakan "await longComputation ()".
Hebat
Sepuluh tahun di masa depan dan ini harus menjadi jawaban yang diterima sekarang. Ketika Anda berbicara tentang async di python3.5 + apa yang terlintas dalam pikiran Anda harus kata kunci asyncio dan async.
zeh
31

Itu bukan dalam inti bahasa, tetapi perpustakaan yang sangat matang yang melakukan apa yang Anda inginkan adalah Twisted . Ini memperkenalkan objek Ditangguhkan, yang dapat Anda lampirkan panggilan balik atau penangan kesalahan ("errbacks"). A Deferred pada dasarnya adalah "janji" bahwa suatu fungsi pada akhirnya akan membuahkan hasil.

Meredith L. Patterson
sumber
1
Secara khusus, lihat twisted.internet.defer ( twistedmatrix.com/documents/8.2.0/api/… ).
Nicholas Riley
21

Anda dapat menerapkan dekorator untuk membuat fungsi Anda tidak sinkron, meskipun itu agak rumit. The multiprocessingModul penuh kebiasaan kecil dan pembatasan tampaknya sewenang-wenang - semua alasan untuk merangkum belakang antarmuka yang ramah, meskipun.

from inspect import getmodule
from multiprocessing import Pool


def async(decorated):
    r'''Wraps a top-level function around an asynchronous dispatcher.

        when the decorated function is called, a task is submitted to a
        process pool, and a future object is returned, providing access to an
        eventual return value.

        The future object has a blocking get() method to access the task
        result: it will return immediately if the job is already done, or block
        until it completes.

        This decorator won't work on methods, due to limitations in Python's
        pickling machinery (in principle methods could be made pickleable, but
        good luck on that).
    '''
    # Keeps the original function visible from the module global namespace,
    # under a name consistent to its __name__ attribute. This is necessary for
    # the multiprocessing pickling machinery to work properly.
    module = getmodule(decorated)
    decorated.__name__ += '_original'
    setattr(module, decorated.__name__, decorated)

    def send(*args, **opts):
        return async.pool.apply_async(decorated, args, opts)

    return send

Kode di bawah ini menggambarkan penggunaan dekorator:

@async
def printsum(uid, values):
    summed = 0
    for value in values:
        summed += value

    print("Worker %i: sum value is %i" % (uid, summed))

    return (uid, summed)


if __name__ == '__main__':
    from random import sample

    # The process pool must be created inside __main__.
    async.pool = Pool(4)

    p = range(0, 1000)
    results = []
    for i in range(4):
        result = printsum(i, sample(p, 100))
        results.append(result)

    for result in results:
        print("Worker %i: sum value is %i" % result.get())

Dalam kasus dunia nyata saya akan menjelaskan sedikit lebih banyak pada dekorator, menyediakan beberapa cara untuk mematikannya untuk debugging (sambil menjaga antarmuka masa depan di tempat), atau mungkin fasilitas untuk berurusan dengan pengecualian; tapi saya pikir ini menunjukkan prinsip dengan cukup baik.

xperroni
sumber
Ini harus menjadi jawaban terbaik. Saya suka bagaimana ini bisa mengembalikan nilai. Tidak seperti utas yang hanya berjalan secara tidak sinkron.
Aminah Nuraini
16

Hanya

import threading, time

def f():
    print "f started"
    time.sleep(3)
    print "f finished"

threading.Thread(target=f).start()
Antigluk
sumber
8

Anda bisa menggunakan eventlet. Ini memungkinkan Anda menulis apa yang tampak sebagai kode sinkron, tetapi membuatnya beroperasi secara tidak sinkron melalui jaringan.

Berikut ini contoh crawler super minimal:

urls = ["http://www.google.com/intl/en_ALL/images/logo.gif",
     "https://wiki.secondlife.com/w/images/secondlife.jpg",
     "http://us.i1.yimg.com/us.yimg.com/i/ww/beta/y3.gif"]

import eventlet
from eventlet.green import urllib2

def fetch(url):

  return urllib2.urlopen(url).read()

pool = eventlet.GreenPool()

for body in pool.imap(fetch, urls):
  print "got body", len(body)
Raj
sumber
7

Solusi saya adalah:

import threading

class TimeoutError(RuntimeError):
    pass

class AsyncCall(object):
    def __init__(self, fnc, callback = None):
        self.Callable = fnc
        self.Callback = callback

    def __call__(self, *args, **kwargs):
        self.Thread = threading.Thread(target = self.run, name = self.Callable.__name__, args = args, kwargs = kwargs)
        self.Thread.start()
        return self

    def wait(self, timeout = None):
        self.Thread.join(timeout)
        if self.Thread.isAlive():
            raise TimeoutError()
        else:
            return self.Result

    def run(self, *args, **kwargs):
        self.Result = self.Callable(*args, **kwargs)
        if self.Callback:
            self.Callback(self.Result)

class AsyncMethod(object):
    def __init__(self, fnc, callback=None):
        self.Callable = fnc
        self.Callback = callback

    def __call__(self, *args, **kwargs):
        return AsyncCall(self.Callable, self.Callback)(*args, **kwargs)

def Async(fnc = None, callback = None):
    if fnc == None:
        def AddAsyncCallback(fnc):
            return AsyncMethod(fnc, callback)
        return AddAsyncCallback
    else:
        return AsyncMethod(fnc, callback)

Dan berfungsi persis seperti yang diminta:

@Async
def fnc():
    pass
Nevyn
sumber
5

Sesuatu seperti ini berfungsi untuk saya, Anda kemudian dapat memanggil fungsi, dan itu akan mengirim dirinya sendiri ke utas baru.

from thread import start_new_thread

def dowork(asynchronous=True):
    if asynchronous:
        args = (False)
        start_new_thread(dowork,args) #Call itself on a new thread.
    else:
        while True:
            #do something...
            time.sleep(60) #sleep for a minute
    return
Nicholas Hamilton
sumber
2

Apakah ada alasan untuk tidak menggunakan utas? Anda bisa menggunakan threadingkelas. Alih-alih finished()fungsi gunakan isAlive(). The result()Fungsi bisa join()benang dan mengambil hasilnya. Dan, jika Anda bisa, menimpa run()dan __init__fungsi untuk memanggil fungsi yang ditentukan dalam konstruktor dan menyimpan nilai di suatu tempat ke instance kelas.

ondra
sumber
2
Jika itu adalah fungsi threading yang mahal secara komputasional tidak akan memberi Anda apa-apa (itu mungkin akan membuat segalanya lebih lambat sebenarnya) karena proses Python terbatas pada satu inti CPU karena GIL.
Kurt
2
@ Kurt, sementara itu benar, OP tidak menyebutkan bahwa kinerja adalah urusannya. Ada alasan lain untuk ingin perilaku asynchronous ...
Peter Hansen
Utas dalam python tidak bagus ketika Anda ingin memiliki opsi untuk membunuh panggilan metode asinkron, karena hanya utas utama dalam python yang menerima sinyal.
CivFan
2

Anda dapat menggunakan concurrent.futures (ditambahkan dengan Python 3.2).

import time
from concurrent.futures import ThreadPoolExecutor


def long_computation(duration):
    for x in range(0, duration):
        print(x)
        time.sleep(1)
    return duration * 2


print('Use polling')
with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(long_computation, 5)
    while not future.done():
        print('waiting...')
        time.sleep(0.5)

    print(future.result())

print('Use callback')
executor = ThreadPoolExecutor(max_workers=1)
future = executor.submit(long_computation, 5)
future.add_done_callback(lambda f: print(f.result()))

print('waiting for callback')

executor.shutdown(False)  # non-blocking

print('shutdown invoked')
Labu Besar
sumber
Ini adalah jawaban yang sangat bagus, karena satu-satunya di sini yang memberikan kemungkinan threadpool dengan callback
Reda Drissi
Sayangnya, ini juga menderita dari "Global Interpreter Lock". Lihat doc perpustakaan: Link . Diuji dengan Python 3.7
Alex
0

Anda bisa menggunakan proses. Jika Anda ingin menjalankannya selamanya gunakan saat (seperti jaringan) dalam fungsi Anda:

from multiprocessing import Process
def foo():
    while 1:
        # Do something

p = Process(target = foo)
p.start()

jika Anda hanya ingin menjalankannya sekali saja, lakukan seperti itu:

from multiprocessing import Process
def foo():
    # Do something

p = Process(target = foo)
p.start()
p.join()
Keivan
sumber