Masalah
Apakah ada cara di Airflow untuk membuat alur kerja sehingga jumlah tugas B. * tidak diketahui hingga tugas A selesai? Saya telah melihat subdag tetapi sepertinya itu hanya dapat bekerja dengan serangkaian tugas statis yang harus ditentukan saat pembuatan Dag.
Akankah pemicu belati bekerja? Dan jika demikian, bisakah Anda memberikan contoh.
Saya memiliki masalah di mana tidak mungkin mengetahui jumlah tugas B yang akan diperlukan untuk menghitung Tugas C hingga Tugas A selesai. Setiap Tugas B. * membutuhkan waktu beberapa jam untuk dihitung dan tidak dapat digabungkan.
|---> Task B.1 --|
|---> Task B.2 --|
Task A ------|---> Task B.3 --|-----> Task C
| .... |
|---> Task B.N --|
Ide # 1
Saya tidak suka solusi ini karena saya harus membuat ExternalTaskSensor yang memblokir dan semua Tugas B. * akan memakan waktu antara 2-24 jam untuk menyelesaikannya. Jadi saya tidak menganggap ini sebagai solusi yang layak. Tentunya ada cara yang lebih mudah? Atau apakah Airflow tidak dirancang untuk ini?
Dag 1
Task A -> TriggerDagRunOperator(Dag 2) -> ExternalTaskSensor(Dag 2, Task Dummy B) -> Task C
Dag 2 (Dynamically created DAG though python_callable in TriggerDagrunOperator)
|-- Task B.1 --|
|-- Task B.2 --|
Task Dummy A --|-- Task B.3 --|-----> Task Dummy B
| .... |
|-- Task B.N --|
Edit 1:
Sampai sekarang pertanyaan ini masih belum memiliki jawaban yang bagus . Saya telah dihubungi oleh beberapa orang yang mencari solusi.
Jawaban:
Inilah cara saya melakukannya dengan permintaan serupa tanpa subdag apa pun:
Pertama buat metode yang mengembalikan nilai apa pun yang Anda inginkan
def values_function(): return values
Selanjutnya buat metode yang akan menghasilkan pekerjaan secara dinamis:
def group(number, **kwargs): #load the values if needed in the command you plan to execute dyn_value = "{{ task_instance.xcom_pull(task_ids='push_func') }}" return BashOperator( task_id='JOB_NAME_{}'.format(number), bash_command='script.sh {} {}'.format(dyn_value, number), dag=dag)
Dan kemudian gabungkan mereka:
push_func = PythonOperator( task_id='push_func', provide_context=True, python_callable=values_function, dag=dag) complete = DummyOperator( task_id='All_jobs_completed', dag=dag) for i in values_function(): push_func >> group(i) >> complete
sumber
for i in values_function()
saya akan mengharapkan sesuatu sepertifor i in push_func_output
. Masalahnya adalah saya tidak dapat menemukan cara untuk mendapatkan keluaran itu secara dinamis. Output dari PythonOperator akan berada di Xcom setelah eksekusi tetapi saya tidak tahu apakah saya dapat mereferensikannya dari definisi DAG.group
fungsi tersebut?Saya telah menemukan cara untuk membuat alur kerja berdasarkan hasil tugas sebelumnya.
Pada dasarnya yang ingin Anda lakukan adalah memiliki dua subdag dengan berikut ini:
def return_list()
)parent_dag.get_task_instances(settings.Session, start_date=parent_dag.get_active_runs()[-1])[-1]
), seseorang mungkin dapat menambahkan lebih banyak filter di sini.dag_id='%s.%s' % (parent_dag_name, 'test1')
Sekarang saya telah menguji ini di instalasi aliran udara lokal saya dan berfungsi dengan baik. Saya tidak tahu apakah bagian penarik xcom akan mengalami masalah jika ada lebih dari satu contoh dag yang berjalan pada saat yang sama, tetapi Anda mungkin akan menggunakan kunci unik atau sesuatu seperti itu untuk mengidentifikasi xcom secara unik nilai yang Anda inginkan. Satu mungkin bisa mengoptimalkan 3. langkah menjadi 100% yakin untuk mendapatkan tugas tertentu dari dag utama saat ini, tetapi untuk penggunaan saya ini bekerja cukup baik, saya pikir seseorang hanya perlu satu objek task_instance untuk menggunakan xcom_pull.
Saya juga membersihkan xcoms untuk subdag pertama sebelum setiap eksekusi, hanya untuk memastikan bahwa saya tidak mendapatkan nilai yang salah secara tidak sengaja.
Saya sangat buruk dalam menjelaskan, jadi saya harap kode berikut akan membuat semuanya jelas:
test1.py
from airflow.models import DAG import logging from airflow.operators.python_operator import PythonOperator from airflow.operators.postgres_operator import PostgresOperator log = logging.getLogger(__name__) def test1(parent_dag_name, start_date, schedule_interval): dag = DAG( '%s.test1' % parent_dag_name, schedule_interval=schedule_interval, start_date=start_date, ) def return_list(): return ['test1', 'test2'] list_extract_folder = PythonOperator( task_id='list', dag=dag, python_callable=return_list ) clean_xcoms = PostgresOperator( task_id='clean_xcoms', postgres_conn_id='airflow_db', sql="delete from xcom where dag_id='{{ dag.dag_id }}'", dag=dag) clean_xcoms >> list_extract_folder return dag
test2.py
from airflow.models import DAG, settings import logging from airflow.operators.dummy_operator import DummyOperator log = logging.getLogger(__name__) def test2(parent_dag_name, start_date, schedule_interval, parent_dag=None): dag = DAG( '%s.test2' % parent_dag_name, schedule_interval=schedule_interval, start_date=start_date ) if len(parent_dag.get_active_runs()) > 0: test_list = parent_dag.get_task_instances(settings.Session, start_date=parent_dag.get_active_runs()[-1])[-1].xcom_pull( dag_id='%s.%s' % (parent_dag_name, 'test1'), task_ids='list') if test_list: for i in test_list: test = DummyOperator( task_id=i, dag=dag ) return dag
dan alur kerja utama:
test.py
from datetime import datetime from airflow import DAG from airflow.operators.subdag_operator import SubDagOperator from subdags.test1 import test1 from subdags.test2 import test2 DAG_NAME = 'test-dag' dag = DAG(DAG_NAME, description='Test workflow', catchup=False, schedule_interval='0 0 * * *', start_date=datetime(2018, 8, 24)) test1 = SubDagOperator( subdag=test1(DAG_NAME, dag.start_date, dag.schedule_interval), task_id='test1', dag=dag ) test2 = SubDagOperator( subdag=test2(DAG_NAME, dag.start_date, dag.schedule_interval, parent_dag=dag), task_id='test2', dag=dag ) test1 >> test2
sumber
_ _init_ _.py
ke folder subdag. kesalahan rookieYa ini mungkin saya telah membuat contoh DAG yang menunjukkan ini.
import airflow from airflow.operators.python_operator import PythonOperator import os from airflow.models import Variable import logging from airflow import configuration as conf from airflow.models import DagBag, TaskInstance from airflow import DAG, settings from airflow.operators.bash_operator import BashOperator main_dag_id = 'DynamicWorkflow2' args = { 'owner': 'airflow', 'start_date': airflow.utils.dates.days_ago(2), 'provide_context': True } dag = DAG( main_dag_id, schedule_interval="@once", default_args=args) def start(*args, **kwargs): value = Variable.get("DynamicWorkflow_Group1") logging.info("Current DynamicWorkflow_Group1 value is " + str(value)) def resetTasksStatus(task_id, execution_date): logging.info("Resetting: " + task_id + " " + execution_date) dag_folder = conf.get('core', 'DAGS_FOLDER') dagbag = DagBag(dag_folder) check_dag = dagbag.dags[main_dag_id] session = settings.Session() my_task = check_dag.get_task(task_id) ti = TaskInstance(my_task, execution_date) state = ti.current_state() logging.info("Current state of " + task_id + " is " + str(state)) ti.set_state(None, session) state = ti.current_state() logging.info("Updated state of " + task_id + " is " + str(state)) def bridge1(*args, **kwargs): # You can set this value dynamically e.g., from a database or a calculation dynamicValue = 2 variableValue = Variable.get("DynamicWorkflow_Group2") logging.info("Current DynamicWorkflow_Group2 value is " + str(variableValue)) logging.info("Setting the Airflow Variable DynamicWorkflow_Group2 to " + str(dynamicValue)) os.system('airflow variables --set DynamicWorkflow_Group2 ' + str(dynamicValue)) variableValue = Variable.get("DynamicWorkflow_Group2") logging.info("Current DynamicWorkflow_Group2 value is " + str(variableValue)) # Below code prevents this bug: https://issues.apache.org/jira/browse/AIRFLOW-1460 for i in range(dynamicValue): resetTasksStatus('secondGroup_' + str(i), str(kwargs['execution_date'])) def bridge2(*args, **kwargs): # You can set this value dynamically e.g., from a database or a calculation dynamicValue = 3 variableValue = Variable.get("DynamicWorkflow_Group3") logging.info("Current DynamicWorkflow_Group3 value is " + str(variableValue)) logging.info("Setting the Airflow Variable DynamicWorkflow_Group3 to " + str(dynamicValue)) os.system('airflow variables --set DynamicWorkflow_Group3 ' + str(dynamicValue)) variableValue = Variable.get("DynamicWorkflow_Group3") logging.info("Current DynamicWorkflow_Group3 value is " + str(variableValue)) # Below code prevents this bug: https://issues.apache.org/jira/browse/AIRFLOW-1460 for i in range(dynamicValue): resetTasksStatus('thirdGroup_' + str(i), str(kwargs['execution_date'])) def end(*args, **kwargs): logging.info("Ending") def doSomeWork(name, index, *args, **kwargs): # Do whatever work you need to do # Here I will just create a new file os.system('touch /home/ec2-user/airflow/' + str(name) + str(index) + '.txt') starting_task = PythonOperator( task_id='start', dag=dag, provide_context=True, python_callable=start, op_args=[]) # Used to connect the stream in the event that the range is zero bridge1_task = PythonOperator( task_id='bridge1', dag=dag, provide_context=True, python_callable=bridge1, op_args=[]) DynamicWorkflow_Group1 = Variable.get("DynamicWorkflow_Group1") logging.info("The current DynamicWorkflow_Group1 value is " + str(DynamicWorkflow_Group1)) for index in range(int(DynamicWorkflow_Group1)): dynamicTask = PythonOperator( task_id='firstGroup_' + str(index), dag=dag, provide_context=True, python_callable=doSomeWork, op_args=['firstGroup', index]) starting_task.set_downstream(dynamicTask) dynamicTask.set_downstream(bridge1_task) # Used to connect the stream in the event that the range is zero bridge2_task = PythonOperator( task_id='bridge2', dag=dag, provide_context=True, python_callable=bridge2, op_args=[]) DynamicWorkflow_Group2 = Variable.get("DynamicWorkflow_Group2") logging.info("The current DynamicWorkflow value is " + str(DynamicWorkflow_Group2)) for index in range(int(DynamicWorkflow_Group2)): dynamicTask = PythonOperator( task_id='secondGroup_' + str(index), dag=dag, provide_context=True, python_callable=doSomeWork, op_args=['secondGroup', index]) bridge1_task.set_downstream(dynamicTask) dynamicTask.set_downstream(bridge2_task) ending_task = PythonOperator( task_id='end', dag=dag, provide_context=True, python_callable=end, op_args=[]) DynamicWorkflow_Group3 = Variable.get("DynamicWorkflow_Group3") logging.info("The current DynamicWorkflow value is " + str(DynamicWorkflow_Group3)) for index in range(int(DynamicWorkflow_Group3)): # You can make this logic anything you'd like # I chose to use the PythonOperator for all tasks # except the last task will use the BashOperator if index < (int(DynamicWorkflow_Group3) - 1): dynamicTask = PythonOperator( task_id='thirdGroup_' + str(index), dag=dag, provide_context=True, python_callable=doSomeWork, op_args=['thirdGroup', index]) else: dynamicTask = BashOperator( task_id='thirdGroup_' + str(index), bash_command='touch /home/ec2-user/airflow/thirdGroup_' + str(index) + '.txt', dag=dag) bridge2_task.set_downstream(dynamicTask) dynamicTask.set_downstream(ending_task) # If you do not connect these then in the event that your range is ever zero you will have a disconnection between your stream # and your tasks will run simultaneously instead of in your desired stream order. starting_task.set_downstream(bridge1_task) bridge1_task.set_downstream(bridge2_task) bridge2_task.set_downstream(ending_task)
Sebelum Anda menjalankan DAG, buat tiga Variabel Aliran Udara ini
airflow variables --set DynamicWorkflow_Group1 1 airflow variables --set DynamicWorkflow_Group2 0 airflow variables --set DynamicWorkflow_Group3 0
Anda akan melihat bahwa DAG berubah dari ini
Untuk ini setelah itu berlari
Anda dapat melihat informasi selengkapnya tentang DAG ini di artikel saya tentang membuat Aliran Udara Aliran Kerja Dinamis .
sumber
OA: "Apakah ada cara di Airflow untuk membuat alur kerja sedemikian rupa sehingga jumlah tugas B. * tidak diketahui hingga tugas A selesai?"
Jawaban singkatnya adalah tidak. Aliran udara akan membangun aliran DAG sebelum mulai menjalankannya.
Konon kami sampai pada kesimpulan sederhana, yaitu kami tidak memiliki kebutuhan seperti itu. Ketika Anda ingin memparalelkan beberapa pekerjaan, Anda harus mengevaluasi sumber daya yang Anda miliki dan bukan jumlah item untuk diproses.
Kami melakukannya seperti ini: kami secara dinamis membuat sejumlah tugas tetap, katakanlah 10, yang akan membagi pekerjaan. Misalnya jika kita perlu memproses 100 file setiap tugas akan memproses 10 di antaranya. Saya akan memposting kode hari ini.
Memperbarui
Ini kodenya, maaf atas keterlambatannya.
from datetime import datetime, timedelta import airflow from airflow.operators.dummy_operator import DummyOperator args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2018, 1, 8), 'email': ['[email protected]'], 'email_on_failure': True, 'email_on_retry': True, 'retries': 1, 'retry_delay': timedelta(seconds=5) } dag = airflow.DAG( 'parallel_tasks_v1', schedule_interval="@daily", catchup=False, default_args=args) # You can read this from variables parallel_tasks_total_number = 10 start_task = DummyOperator( task_id='start_task', dag=dag ) # Creates the tasks dynamically. # Each one will elaborate one chunk of data. def create_dynamic_task(current_task_number): return DummyOperator( provide_context=True, task_id='parallel_task_' + str(current_task_number), python_callable=parallelTask, # your task will take as input the total number and the current number to elaborate a chunk of total elements op_args=[current_task_number, int(parallel_tasks_total_number)], dag=dag) end = DummyOperator( task_id='end', dag=dag) for page in range(int(parallel_tasks_total_number)): created_task = create_dynamic_task(page) start_task >> created_task created_task >> end
Penjelasan kode:
Di sini kita memiliki satu tugas awal dan satu tugas akhir (keduanya dummy).
Kemudian dari start task dengan for loop kita buat 10 task dengan callable python yang sama. Tugas dibuat dalam fungsi create_dynamic_task.
Untuk setiap python callable, kami meneruskan sebagai argumen jumlah total tugas paralel dan indeks tugas saat ini.
Misalkan Anda memiliki 1000 item untuk diuraikan: tugas pertama akan menerima masukan yang harus mengelaborasi potongan pertama dari 10 potongan. Ini akan membagi 1000 item menjadi 10 bagian dan menguraikan yang pertama.
sumber
parallelTask
tidak ditentukan: apakah saya melewatkan sesuatu?Apa yang menurut saya Anda cari adalah membuat DAG secara dinamis. Saya mengalami situasi seperti ini beberapa hari yang lalu setelah beberapa pencarian Saya menemukan blog ini .
Pembuatan Tugas Dinamis
start = DummyOperator( task_id='start', dag=dag ) end = DummyOperator( task_id='end', dag=dag) def createDynamicETL(task_id, callableFunction, args): task = PythonOperator( task_id = task_id, provide_context=True, #Eval is used since the callableFunction var is of type string #while the python_callable argument for PythonOperators only receives objects of type callable not strings. python_callable = eval(callableFunction), op_kwargs = args, xcom_push = True, dag = dag, ) return task
Mengatur alur kerja DAG
with open('/usr/local/airflow/dags/config_files/dynamicDagConfigFile.yaml') as f: # Use safe_load instead to load the YAML file configFile = yaml.safe_load(f) # Extract table names and fields to be processed tables = configFile['tables'] # In this loop tasks are created for each table defined in the YAML file for table in tables: for table, fieldName in table.items(): # In our example, first step in the workflow for each table is to get SQL data from db. # Remember task id is provided in order to exchange data among tasks generated in dynamic way. get_sql_data_task = createDynamicETL('{}-getSQLData'.format(table), 'getSQLData', {'host': 'host', 'user': 'user', 'port': 'port', 'password': 'pass', 'dbname': configFile['dbname']}) # Second step is upload data to s3 upload_to_s3_task = createDynamicETL('{}-uploadDataToS3'.format(table), 'uploadDataToS3', {'previous_task_id': '{}-getSQLData'.format(table), 'bucket_name': configFile['bucket_name'], 'prefix': configFile['prefix']}) # This is where the magic lies. The idea is that # once tasks are generated they should linked with the # dummy operators generated in the start and end tasks. # Then you are done! start >> get_sql_data_task get_sql_data_task >> upload_to_s3_task upload_to_s3_task >> end
Beginilah tampilan DAG kita setelah menyatukan kode
import yaml import airflow from airflow import DAG from datetime import datetime, timedelta, time from airflow.operators.python_operator import PythonOperator from airflow.operators.dummy_operator import DummyOperator start = DummyOperator( task_id='start', dag=dag ) def createDynamicETL(task_id, callableFunction, args): task = PythonOperator( task_id=task_id, provide_context=True, # Eval is used since the callableFunction var is of type string # while the python_callable argument for PythonOperators only receives objects of type callable not strings. python_callable=eval(callableFunction), op_kwargs=args, xcom_push=True, dag=dag, ) return task end = DummyOperator( task_id='end', dag=dag) with open('/usr/local/airflow/dags/config_files/dynamicDagConfigFile.yaml') as f: # use safe_load instead to load the YAML file configFile = yaml.safe_load(f) # Extract table names and fields to be processed tables = configFile['tables'] # In this loop tasks are created for each table defined in the YAML file for table in tables: for table, fieldName in table.items(): # In our example, first step in the workflow for each table is to get SQL data from db. # Remember task id is provided in order to exchange data among tasks generated in dynamic way. get_sql_data_task = createDynamicETL('{}-getSQLData'.format(table), 'getSQLData', {'host': 'host', 'user': 'user', 'port': 'port', 'password': 'pass', 'dbname': configFile['dbname']}) # Second step is upload data to s3 upload_to_s3_task = createDynamicETL('{}-uploadDataToS3'.format(table), 'uploadDataToS3', {'previous_task_id': '{}-getSQLData'.format(table), 'bucket_name': configFile['bucket_name'], 'prefix': configFile['prefix']}) # This is where the magic lies. The idea is that # once tasks are generated they should linked with the # dummy operators generated in the start and end tasks. # Then you are done! start >> get_sql_data_task get_sql_data_task >> upload_to_s3_task upload_to_s3_task >> end
Itu sangat membantu harapan penuh Ini juga akan membantu orang lain
sumber
Saya rasa saya telah menemukan solusi yang lebih baik untuk ini di https://github.com/mastak/airflow_multi_dagrun , yang menggunakan antrean sederhana DagRuns dengan memicu beberapa dagruns, mirip dengan TriggerDagRuns . Sebagian besar kredit masuk ke https://github.com/mastak , meskipun saya harus menambal beberapa detail agar berfungsi dengan aliran udara terbaru.
Solusinya menggunakan operator khusus yang memicu beberapa DagRuns :
from airflow import settings from airflow.models import DagBag from airflow.operators.dagrun_operator import DagRunOrder, TriggerDagRunOperator from airflow.utils.decorators import apply_defaults from airflow.utils.state import State from airflow.utils import timezone class TriggerMultiDagRunOperator(TriggerDagRunOperator): CREATED_DAGRUN_KEY = 'created_dagrun_key' @apply_defaults def __init__(self, op_args=None, op_kwargs=None, *args, **kwargs): super(TriggerMultiDagRunOperator, self).__init__(*args, **kwargs) self.op_args = op_args or [] self.op_kwargs = op_kwargs or {} def execute(self, context): context.update(self.op_kwargs) session = settings.Session() created_dr_ids = [] for dro in self.python_callable(*self.op_args, **context): if not dro: break if not isinstance(dro, DagRunOrder): dro = DagRunOrder(payload=dro) now = timezone.utcnow() if dro.run_id is None: dro.run_id = 'trig__' + now.isoformat() dbag = DagBag(settings.DAGS_FOLDER) trigger_dag = dbag.get_dag(self.trigger_dag_id) dr = trigger_dag.create_dagrun( run_id=dro.run_id, execution_date=now, state=State.RUNNING, conf=dro.payload, external_trigger=True, ) created_dr_ids.append(dr.id) self.log.info("Created DagRun %s, %s", dr, now) if created_dr_ids: session.commit() context['ti'].xcom_push(self.CREATED_DAGRUN_KEY, created_dr_ids) else: self.log.info("No DagRun created") session.close()
Anda kemudian dapat mengirimkan beberapa dagrun dari fungsi yang dapat dipanggil di PythonOperator Anda, misalnya:
from airflow.operators.dagrun_operator import DagRunOrder from airflow.models import DAG from airflow.operators import TriggerMultiDagRunOperator from airflow.utils.dates import days_ago def generate_dag_run(**kwargs): for i in range(10): order = DagRunOrder(payload={'my_variable': i}) yield order args = { 'start_date': days_ago(1), 'owner': 'airflow', } dag = DAG( dag_id='simple_trigger', max_active_runs=1, schedule_interval='@hourly', default_args=args, ) gen_target_dag_run = TriggerMultiDagRunOperator( task_id='gen_target_dag_run', dag=dag, trigger_dag_id='common_target', python_callable=generate_dag_run )
Saya membuat garpu dengan kode di https://github.com/flinz/airflow_multi_dagrun
sumber
Grafik pekerjaan tidak dibuat pada saat berjalan. Sebaliknya, grafik dibuat saat diambil oleh Airflow dari folder dags Anda. Oleh karena itu, tidak mungkin memiliki grafik yang berbeda untuk pekerjaan setiap kali dijalankan. Anda dapat mengonfigurasi pekerjaan untuk membuat grafik berdasarkan kueri pada waktu muat . Grafik tersebut akan tetap sama untuk setiap proses setelahnya, yang mungkin tidak terlalu berguna.
Anda bisa mendesain grafik yang menjalankan tugas berbeda di setiap proses berdasarkan hasil kueri dengan menggunakan Operator Cabang.
Apa yang telah saya lakukan adalah melakukan pra-konfigurasi sekumpulan tugas dan kemudian mengambil hasil kueri dan mendistribusikannya ke seluruh tugas. Ini mungkin lebih baik karena jika kueri Anda mengembalikan banyak hasil, Anda mungkin tidak ingin membanjiri penjadwal dengan banyak tugas bersamaan. Agar lebih aman, saya juga menggunakan kumpulan untuk memastikan konkurensi saya tidak lepas kendali dengan kueri besar yang tidak terduga.
""" - This is an idea for how to invoke multiple tasks based on the query results """ import logging from datetime import datetime from airflow import DAG from airflow.hooks.postgres_hook import PostgresHook from airflow.operators.mysql_operator import MySqlOperator from airflow.operators.python_operator import PythonOperator, BranchPythonOperator from include.run_celery_task import runCeleryTask ######################################################################## default_args = { 'owner': 'airflow', 'catchup': False, 'depends_on_past': False, 'start_date': datetime(2019, 7, 2, 19, 50, 00), 'email': ['rotten@stackoverflow'], 'email_on_failure': True, 'email_on_retry': False, 'retries': 0, 'max_active_runs': 1 } dag = DAG('dynamic_tasks_example', default_args=default_args, schedule_interval=None) totalBuckets = 5 get_orders_query = """ select o.id, o.customer from orders o where o.created_at >= current_timestamp at time zone 'UTC' - '2 days'::interval and o.is_test = false and o.is_processed = false """ ########################################################################################################### # Generate a set of tasks so we can parallelize the results def createOrderProcessingTask(bucket_number): return PythonOperator( task_id=f'order_processing_task_{bucket_number}', python_callable=runOrderProcessing, pool='order_processing_pool', op_kwargs={'task_bucket': f'order_processing_task_{bucket_number}'}, provide_context=True, dag=dag ) # Fetch the order arguments from xcom and doStuff() to them def runOrderProcessing(task_bucket, **context): orderList = context['ti'].xcom_pull(task_ids='get_open_orders', key=task_bucket) if orderList is not None: for order in orderList: logging.info(f"Processing Order with Order ID {order[order_id]}, customer ID {order[customer_id]}") doStuff(**op_kwargs) # Discover the orders we need to run and group them into buckets for processing def getOpenOrders(**context): myDatabaseHook = PostgresHook(postgres_conn_id='my_database_conn_id') # initialize the task list buckets tasks = {} for task_number in range(0, totalBuckets): tasks[f'order_processing_task_{task_number}'] = [] # populate the task list buckets # distribute them evenly across the set of buckets resultCounter = 0 for record in myDatabaseHook.get_records(get_orders_query): resultCounter += 1 bucket = (resultCounter % totalBuckets) tasks[f'order_processing_task_{bucket}'].append({'order_id': str(record[0]), 'customer_id': str(record[1])}) # push the order lists into xcom for task in tasks: if len(tasks[task]) > 0: logging.info(f'Task {task} has {len(tasks[task])} orders.') context['ti'].xcom_push(key=task, value=tasks[task]) else: # if we didn't have enough tasks for every bucket # don't bother running that task - remove it from the list logging.info(f"Task {task} doesn't have any orders.") del(tasks[task]) return list(tasks.keys()) ################################################################################################### # this just makes sure that there aren't any dangling xcom values in the database from a crashed dag clean_xcoms = MySqlOperator( task_id='clean_xcoms', mysql_conn_id='airflow_db', sql="delete from xcom where dag_id='{{ dag.dag_id }}'", dag=dag) # Ideally we'd use BranchPythonOperator() here instead of PythonOperator so that if our # query returns fewer results than we have buckets, we don't try to run them all. # Unfortunately I couldn't get BranchPythonOperator to take a list of results like the # documentation says it should (Airflow 1.10.2). So we call all the bucket tasks for now. get_orders_task = PythonOperator( task_id='get_orders', python_callable=getOpenOrders, provide_context=True, dag=dag ) get_orders_task.set_upstream(clean_xcoms) # set up the parallel tasks -- these are configured at compile time, not at run time: for bucketNumber in range(0, totalBuckets): taskBucket = createOrderProcessingTask(bucketNumber) taskBucket.set_upstream(get_orders_task) ###################################################################################################
sumber
for tasks in tasks
loop dalam contoh saya, saya menghapus objek yang saya iterasi. Itu ide yang buruk. Alih-alih dapatkan daftar kunci dan ulangi itu - atau lewati penghapusan. Demikian pula, jika xcom_pull mengembalikan Tidak Ada (bukan daftar atau daftar kosong), perulangan for juga gagal. Seseorang mungkin ingin menjalankan xcom_pull sebelum 'for', dan kemudian memeriksa apakah itu None - atau pastikan setidaknya ada daftar kosong di sana. YMMV. Semoga berhasil!open_order_task
?Tidak mengerti apa masalahnya?
Berikut adalah contoh standarnya. Sekarang jika di function subdag ganti
for i in range(5):
denganfor i in range(random.randint(0, 10)):
maka semuanya akan bekerja. Sekarang bayangkan operator 'mulai' meletakkan data dalam file, dan alih-alih nilai acak, fungsi akan membaca data ini. Kemudian operator 'start' akan mempengaruhi jumlah tugas.Masalahnya hanya akan ditampilkan di UI karena saat memasukkan subdag, jumlah tugas akan sama dengan yang terakhir dibaca dari file / database / XCom saat ini. Yang secara otomatis memberikan batasan pada beberapa peluncuran satu dag pada satu waktu.
sumber
Saya menemukan posting Medium ini yang sangat mirip dengan pertanyaan ini. Namun ini penuh dengan kesalahan ketik, dan tidak berfungsi ketika saya mencoba menerapkannya.
Jawaban saya di atas adalah sebagai berikut:
Jika Anda membuat tugas secara dinamis, Anda harus melakukannya dengan mengulang sesuatu yang tidak dibuat oleh tugas upstream, atau dapat ditentukan secara independen dari tugas itu. Saya belajar bahwa Anda tidak dapat melewatkan tanggal eksekusi atau variabel aliran udara lainnya ke sesuatu di luar template (misalnya, tugas) seperti yang telah ditunjukkan banyak orang sebelumnya. Lihat juga posting ini .
sumber