Saya menulis aplikasi di Flask, yang bekerja dengan sangat baik kecuali yang WSGI
sinkron dan memblokir. Saya memiliki satu tugas khususnya yang memanggil API pihak ketiga dan tugas tersebut dapat memakan waktu beberapa menit untuk diselesaikan. Saya ingin melakukan panggilan itu (sebenarnya ini adalah serangkaian panggilan) dan membiarkannya berjalan. sementara kontrol dikembalikan ke Flask.
Pandangan saya terlihat seperti:
@app.route('/render/<id>', methods=['POST'])
def render_script(id=None):
...
data = json.loads(request.data)
text_list = data.get('text_list')
final_file = audio_class.render_audio(data=text_list)
# do stuff
return Response(
mimetype='application/json',
status=200
)
Sekarang, yang ingin saya lakukan adalah memiliki garis
final_file = audio_class.render_audio()
jalankan dan sediakan callback untuk dieksekusi saat metode kembali, sementara Flask bisa terus memproses permintaan. Ini adalah satu-satunya tugas yang saya perlukan Flask untuk dijalankan secara asinkron, dan saya ingin beberapa saran tentang cara terbaik untuk mengimplementasikannya.
Saya telah melihat Twisted dan Klein, tetapi saya tidak yakin mereka berlebihan, karena mungkin Threading sudah cukup. Atau mungkin seledri adalah pilihan yang bagus untuk ini?
sumber
Jawaban:
Saya akan menggunakan Celery untuk menangani tugas asinkron untuk Anda. Anda harus memasang pialang untuk melayani sebagai antrian tugas Anda (RabbitMQ dan Redis disarankan).
app.py
:from flask import Flask from celery import Celery broker_url = 'amqp://guest@localhost' # Broker URL for RabbitMQ task queue app = Flask(__name__) celery = Celery(app.name, broker=broker_url) celery.config_from_object('celeryconfig') # Your celery configurations in a celeryconfig.py @celery.task(bind=True) def some_long_task(self, x, y): # Do some long task ... @app.route('/render/<id>', methods=['POST']) def render_script(id=None): ... data = json.loads(request.data) text_list = data.get('text_list') final_file = audio_class.render_audio(data=text_list) some_long_task.delay(x, y) # Call your async task and pass whatever necessary variables return Response( mimetype='application/json', status=200 )
Jalankan aplikasi Flask Anda, dan mulai proses lain untuk menjalankan pekerja seledri Anda.
Saya juga akan merujuk kepada Miguel Gringberg ini menulis up untuk lebih dalam panduan mendalam untuk menggunakan Seledri dengan Flask.
sumber
Threading adalah solusi lain yang mungkin. Meskipun solusi berbasis Celery lebih baik untuk aplikasi dalam skala besar, jika Anda tidak mengharapkan terlalu banyak lalu lintas pada titik akhir yang dipermasalahkan, threading adalah alternatif yang layak.
Solusi ini didasarkan pada presentasi PyCon 2016 Flask at Scale milik Miguel Grinberg , khususnya slide 41 di dek slide miliknya. Kodenya juga tersedia di github bagi mereka yang tertarik dengan sumber aslinya.
Dari perspektif pengguna, kode berfungsi sebagai berikut:
Untuk mengonversi panggilan api menjadi tugas latar belakang, cukup tambahkan dekorator @async_api.
Berikut adalah contoh lengkapnya:
from flask import Flask, g, abort, current_app, request, url_for from werkzeug.exceptions import HTTPException, InternalServerError from flask_restful import Resource, Api from datetime import datetime from functools import wraps import threading import time import uuid tasks = {} app = Flask(__name__) api = Api(app) @app.before_first_request def before_first_request(): """Start a background thread that cleans up old tasks.""" def clean_old_tasks(): """ This function cleans up old tasks from our in-memory data structure. """ global tasks while True: # Only keep tasks that are running or that finished less than 5 # minutes ago. five_min_ago = datetime.timestamp(datetime.utcnow()) - 5 * 60 tasks = {task_id: task for task_id, task in tasks.items() if 'completion_timestamp' not in task or task['completion_timestamp'] > five_min_ago} time.sleep(60) if not current_app.config['TESTING']: thread = threading.Thread(target=clean_old_tasks) thread.start() def async_api(wrapped_function): @wraps(wrapped_function) def new_function(*args, **kwargs): def task_call(flask_app, environ): # Create a request context similar to that of the original request # so that the task can have access to flask.g, flask.request, etc. with flask_app.request_context(environ): try: tasks[task_id]['return_value'] = wrapped_function(*args, **kwargs) except HTTPException as e: tasks[task_id]['return_value'] = current_app.handle_http_exception(e) except Exception as e: # The function raised an exception, so we set a 500 error tasks[task_id]['return_value'] = InternalServerError() if current_app.debug: # We want to find out if something happened so reraise raise finally: # We record the time of the response, to help in garbage # collecting old tasks tasks[task_id]['completion_timestamp'] = datetime.timestamp(datetime.utcnow()) # close the database session (if any) # Assign an id to the asynchronous task task_id = uuid.uuid4().hex # Record the task, and then launch it tasks[task_id] = {'task_thread': threading.Thread( target=task_call, args=(current_app._get_current_object(), request.environ))} tasks[task_id]['task_thread'].start() # Return a 202 response, with a link that the client can use to # obtain task status print(url_for('gettaskstatus', task_id=task_id)) return 'accepted', 202, {'Location': url_for('gettaskstatus', task_id=task_id)} return new_function class GetTaskStatus(Resource): def get(self, task_id): """ Return status about an asynchronous task. If this request returns a 202 status code, it means that task hasn't finished yet. Else, the response from the task is returned. """ task = tasks.get(task_id) if task is None: abort(404) if 'return_value' not in task: return '', 202, {'Location': url_for('gettaskstatus', task_id=task_id)} return task['return_value'] class CatchAll(Resource): @async_api def get(self, path=''): # perform some intensive processing print("starting processing task, path: '%s'" % path) time.sleep(10) print("completed processing task, path: '%s'" % path) return f'The answer is: {path}' api.add_resource(CatchAll, '/<path:path>', '/') api.add_resource(GetTaskStatus, '/status/<task_id>') if __name__ == '__main__': app.run(debug=True)
sumber
Anda juga dapat mencoba menggunakan
multiprocessing.Process
withdaemon=True
; yangprocess.start()
metode tidak memblokir dan Anda dapat kembali respon / statusnya segera ke pemanggil saat mengeksekusi fungsi mahal Anda di latar belakang.Saya mengalami masalah serupa saat bekerja dengan kerangka falcon dan menggunakan
daemon
proses membantu.Anda perlu melakukan hal berikut:
from multiprocessing import Process @app.route('/render/<id>', methods=['POST']) def render_script(id=None): ... heavy_process = Process( # Create a daemonic process with heavy "my_func" target=my_func, daemon=True ) heavy_process.start() return Response( mimetype='application/json', status=200 ) # Define some heavy function def my_func(): time.sleep(10) print("Process finished")
Anda harus segera mendapatkan tanggapan dan, setelah 10-an Anda akan melihat pesan tercetak di konsol.
CATATAN: Perlu diingat bahwa
daemonic
proses tidak diizinkan untuk menelurkan proses anak apa pun.sumber
/render/<id>
titik akhir mengharapkan sesuatu sebagai hasilnyamy_func()
?my_func
mengirim respons / detak jantung ke beberapa titik akhir lainnya misalnya. Atau Anda dapat membuat dan berbagi beberapa antrian pesan yang dapat Anda gunakan untuk berkomunikasimy_func