Cara yang tepat untuk membuat alur kerja dinamis di Airflow

99

Masalah

Apakah ada cara di Airflow untuk membuat alur kerja sehingga jumlah tugas B. * tidak diketahui hingga tugas A selesai? Saya telah melihat subdag tetapi sepertinya itu hanya dapat bekerja dengan serangkaian tugas statis yang harus ditentukan saat pembuatan Dag.

Akankah pemicu belati bekerja? Dan jika demikian, bisakah Anda memberikan contoh.

Saya memiliki masalah di mana tidak mungkin mengetahui jumlah tugas B yang akan diperlukan untuk menghitung Tugas C hingga Tugas A selesai. Setiap Tugas B. * membutuhkan waktu beberapa jam untuk dihitung dan tidak dapat digabungkan.

              |---> Task B.1 --|
              |---> Task B.2 --|
 Task A ------|---> Task B.3 --|-----> Task C
              |       ....     |
              |---> Task B.N --|

Ide # 1

Saya tidak suka solusi ini karena saya harus membuat ExternalTaskSensor yang memblokir dan semua Tugas B. * akan memakan waktu antara 2-24 jam untuk menyelesaikannya. Jadi saya tidak menganggap ini sebagai solusi yang layak. Tentunya ada cara yang lebih mudah? Atau apakah Airflow tidak dirancang untuk ini?

Dag 1
Task A -> TriggerDagRunOperator(Dag 2) -> ExternalTaskSensor(Dag 2, Task Dummy B) -> Task C

Dag 2 (Dynamically created DAG though python_callable in TriggerDagrunOperator)
               |-- Task B.1 --|
               |-- Task B.2 --|
Task Dummy A --|-- Task B.3 --|-----> Task Dummy B
               |     ....     |
               |-- Task B.N --|

Edit 1:

Sampai sekarang pertanyaan ini masih belum memiliki jawaban yang bagus . Saya telah dihubungi oleh beberapa orang yang mencari solusi.

costrouc
sumber
Apakah semua tugas B * serupa, sehingga dapat dibuat dalam satu putaran?
Daniel Lee
Ya, semua B. * tugas dapat dibuat dengan cepat dalam satu lingkaran setelah Tugas A selesai. Tugas A membutuhkan waktu sekitar 2 jam untuk menyelesaikannya.
costrouc
Apakah Anda menemukan solusi untuk masalah tersebut? maukah Anda mempostingnya mungkin?
Daniel Dubovski
3
Sumber daya yang berguna untuk Ide # 1: linkedin.com/pulse/…
Juan Riaza
1
Inilah artikel yang saya tulis menjelaskan bagaimana melakukan ini linkedin.com/pulse/dynamic-workflows-airflow-kyle-bridenstine
Kyle Bridenstine

Jawaban:

33

Inilah cara saya melakukannya dengan permintaan serupa tanpa subdag apa pun:

Pertama buat metode yang mengembalikan nilai apa pun yang Anda inginkan

def values_function():
     return values

Selanjutnya buat metode yang akan menghasilkan pekerjaan secara dinamis:

def group(number, **kwargs):
        #load the values if needed in the command you plan to execute
        dyn_value = "{{ task_instance.xcom_pull(task_ids='push_func') }}"
        return BashOperator(
                task_id='JOB_NAME_{}'.format(number),
                bash_command='script.sh {} {}'.format(dyn_value, number),
                dag=dag)

Dan kemudian gabungkan mereka:

push_func = PythonOperator(
        task_id='push_func',
        provide_context=True,
        python_callable=values_function,
        dag=dag)

complete = DummyOperator(
        task_id='All_jobs_completed',
        dag=dag)

for i in values_function():
        push_func >> group(i) >> complete
Oleg Yamin
sumber
Dimana nilai-nilai didefinisikan?
biarawan
11
Alih-alih for i in values_function()saya akan mengharapkan sesuatu seperti for i in push_func_output. Masalahnya adalah saya tidak dapat menemukan cara untuk mendapatkan keluaran itu secara dinamis. Output dari PythonOperator akan berada di Xcom setelah eksekusi tetapi saya tidak tahu apakah saya dapat mereferensikannya dari definisi DAG.
Ena
@Ena Apakah Anda menemukan cara untuk mencapai itu?
eldos
1
@eldos lihat jawaban saya di bawah
Ena
1
Bagaimana jika kita harus melakukan serangkaian langkah yang bergantung pada langkah dalam loop? Apakah akan ada rantai ketergantungan kedua dalam groupfungsi tersebut?
CodingInCircles
12

Saya telah menemukan cara untuk membuat alur kerja berdasarkan hasil tugas sebelumnya.
Pada dasarnya yang ingin Anda lakukan adalah memiliki dua subdag dengan berikut ini:

  1. Xcom mendorong daftar (atau apa pun yang Anda perlukan untuk membuat alur kerja dinamis nanti) di subdag yang dijalankan pertama kali (lihat test1.py def return_list())
  2. Teruskan objek dag utama sebagai parameter ke subdag kedua Anda
  3. Sekarang jika Anda memiliki objek dag utama, Anda dapat menggunakannya untuk mendapatkan daftar contoh tugasnya. Dari daftar contoh tugas tersebut, Anda dapat memfilter tugas yang dijalankan saat ini dengan menggunakan parent_dag.get_task_instances(settings.Session, start_date=parent_dag.get_active_runs()[-1])[-1]), seseorang mungkin dapat menambahkan lebih banyak filter di sini.
  4. Dengan contoh tugas itu, Anda dapat menggunakan xcom pull untuk mendapatkan nilai yang Anda butuhkan dengan menentukan dag_id ke salah satu subdag pertama: dag_id='%s.%s' % (parent_dag_name, 'test1')
  5. Gunakan daftar / nilai untuk membuat tugas Anda secara dinamis

Sekarang saya telah menguji ini di instalasi aliran udara lokal saya dan berfungsi dengan baik. Saya tidak tahu apakah bagian penarik xcom akan mengalami masalah jika ada lebih dari satu contoh dag yang berjalan pada saat yang sama, tetapi Anda mungkin akan menggunakan kunci unik atau sesuatu seperti itu untuk mengidentifikasi xcom secara unik nilai yang Anda inginkan. Satu mungkin bisa mengoptimalkan 3. langkah menjadi 100% yakin untuk mendapatkan tugas tertentu dari dag utama saat ini, tetapi untuk penggunaan saya ini bekerja cukup baik, saya pikir seseorang hanya perlu satu objek task_instance untuk menggunakan xcom_pull.

Saya juga membersihkan xcoms untuk subdag pertama sebelum setiap eksekusi, hanya untuk memastikan bahwa saya tidak mendapatkan nilai yang salah secara tidak sengaja.

Saya sangat buruk dalam menjelaskan, jadi saya harap kode berikut akan membuat semuanya jelas:

test1.py

from airflow.models import DAG
import logging
from airflow.operators.python_operator import PythonOperator
from airflow.operators.postgres_operator import PostgresOperator

log = logging.getLogger(__name__)


def test1(parent_dag_name, start_date, schedule_interval):
    dag = DAG(
        '%s.test1' % parent_dag_name,
        schedule_interval=schedule_interval,
        start_date=start_date,
    )

    def return_list():
        return ['test1', 'test2']

    list_extract_folder = PythonOperator(
        task_id='list',
        dag=dag,
        python_callable=return_list
    )

    clean_xcoms = PostgresOperator(
        task_id='clean_xcoms',
        postgres_conn_id='airflow_db',
        sql="delete from xcom where dag_id='{{ dag.dag_id }}'",
        dag=dag)

    clean_xcoms >> list_extract_folder

    return dag

test2.py

from airflow.models import DAG, settings
import logging
from airflow.operators.dummy_operator import DummyOperator

log = logging.getLogger(__name__)


def test2(parent_dag_name, start_date, schedule_interval, parent_dag=None):
    dag = DAG(
        '%s.test2' % parent_dag_name,
        schedule_interval=schedule_interval,
        start_date=start_date
    )

    if len(parent_dag.get_active_runs()) > 0:
        test_list = parent_dag.get_task_instances(settings.Session, start_date=parent_dag.get_active_runs()[-1])[-1].xcom_pull(
            dag_id='%s.%s' % (parent_dag_name, 'test1'),
            task_ids='list')
        if test_list:
            for i in test_list:
                test = DummyOperator(
                    task_id=i,
                    dag=dag
                )

    return dag

dan alur kerja utama:

test.py

from datetime import datetime
from airflow import DAG
from airflow.operators.subdag_operator import SubDagOperator
from subdags.test1 import test1
from subdags.test2 import test2

DAG_NAME = 'test-dag'

dag = DAG(DAG_NAME,
          description='Test workflow',
          catchup=False,
          schedule_interval='0 0 * * *',
          start_date=datetime(2018, 8, 24))

test1 = SubDagOperator(
    subdag=test1(DAG_NAME,
                 dag.start_date,
                 dag.schedule_interval),
    task_id='test1',
    dag=dag
)

test2 = SubDagOperator(
    subdag=test2(DAG_NAME,
                 dag.start_date,
                 dag.schedule_interval,
                 parent_dag=dag),
    task_id='test2',
    dag=dag
)

test1 >> test2
Christopher Beck
sumber
di Airflow 1.9 ini tidak dimuat saat ditambahkan ke folder DAG, apakah saya melewatkan sesuatu?
Anthony Keane
@AnthonyKeane apakah Anda meletakkan test1.py dan test2.py ke folder bernama subdag di folder dag Anda?
Christopher Beck
Saya melakukannya ya. Menyalin kedua file ke subdag dan menempatkan test.py di folder dag, masih mendapatkan kesalahan ini. Rusak DAG: [/home/airflow/gcs/dags/test.py] Tidak ada modul bernama subdags.test1 Catatan Saya menggunakan Google Cloud Composer (Aliran Udara yang dikelola Google 1.9.0)
Anthony Keane
@AnthonyKeane, apakah ini satu-satunya kesalahan yang Anda lihat di log? Rusak DAG dapat disebabkan oleh subdag yang mengalami kesalahan kompilasi.
Christopher Beck
3
Hai @Christopher Beck, saya menemukan kesalahan SAYA. Saya perlu menambahkan _ _init_ _.pyke folder subdag. kesalahan rookie
Anthony Keane
11

Ya ini mungkin saya telah membuat contoh DAG yang menunjukkan ini.

import airflow
from airflow.operators.python_operator import PythonOperator
import os
from airflow.models import Variable
import logging
from airflow import configuration as conf
from airflow.models import DagBag, TaskInstance
from airflow import DAG, settings
from airflow.operators.bash_operator import BashOperator

main_dag_id = 'DynamicWorkflow2'

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2),
    'provide_context': True
}

dag = DAG(
    main_dag_id,
    schedule_interval="@once",
    default_args=args)


def start(*args, **kwargs):

    value = Variable.get("DynamicWorkflow_Group1")
    logging.info("Current DynamicWorkflow_Group1 value is " + str(value))


def resetTasksStatus(task_id, execution_date):
    logging.info("Resetting: " + task_id + " " + execution_date)

    dag_folder = conf.get('core', 'DAGS_FOLDER')
    dagbag = DagBag(dag_folder)
    check_dag = dagbag.dags[main_dag_id]
    session = settings.Session()

    my_task = check_dag.get_task(task_id)
    ti = TaskInstance(my_task, execution_date)
    state = ti.current_state()
    logging.info("Current state of " + task_id + " is " + str(state))
    ti.set_state(None, session)
    state = ti.current_state()
    logging.info("Updated state of " + task_id + " is " + str(state))


def bridge1(*args, **kwargs):

    # You can set this value dynamically e.g., from a database or a calculation
    dynamicValue = 2

    variableValue = Variable.get("DynamicWorkflow_Group2")
    logging.info("Current DynamicWorkflow_Group2 value is " + str(variableValue))

    logging.info("Setting the Airflow Variable DynamicWorkflow_Group2 to " + str(dynamicValue))
    os.system('airflow variables --set DynamicWorkflow_Group2 ' + str(dynamicValue))

    variableValue = Variable.get("DynamicWorkflow_Group2")
    logging.info("Current DynamicWorkflow_Group2 value is " + str(variableValue))

    # Below code prevents this bug: https://issues.apache.org/jira/browse/AIRFLOW-1460
    for i in range(dynamicValue):
        resetTasksStatus('secondGroup_' + str(i), str(kwargs['execution_date']))


def bridge2(*args, **kwargs):

    # You can set this value dynamically e.g., from a database or a calculation
    dynamicValue = 3

    variableValue = Variable.get("DynamicWorkflow_Group3")
    logging.info("Current DynamicWorkflow_Group3 value is " + str(variableValue))

    logging.info("Setting the Airflow Variable DynamicWorkflow_Group3 to " + str(dynamicValue))
    os.system('airflow variables --set DynamicWorkflow_Group3 ' + str(dynamicValue))

    variableValue = Variable.get("DynamicWorkflow_Group3")
    logging.info("Current DynamicWorkflow_Group3 value is " + str(variableValue))

    # Below code prevents this bug: https://issues.apache.org/jira/browse/AIRFLOW-1460
    for i in range(dynamicValue):
        resetTasksStatus('thirdGroup_' + str(i), str(kwargs['execution_date']))


def end(*args, **kwargs):
    logging.info("Ending")


def doSomeWork(name, index, *args, **kwargs):
    # Do whatever work you need to do
    # Here I will just create a new file
    os.system('touch /home/ec2-user/airflow/' + str(name) + str(index) + '.txt')


starting_task = PythonOperator(
    task_id='start',
    dag=dag,
    provide_context=True,
    python_callable=start,
    op_args=[])

# Used to connect the stream in the event that the range is zero
bridge1_task = PythonOperator(
    task_id='bridge1',
    dag=dag,
    provide_context=True,
    python_callable=bridge1,
    op_args=[])

DynamicWorkflow_Group1 = Variable.get("DynamicWorkflow_Group1")
logging.info("The current DynamicWorkflow_Group1 value is " + str(DynamicWorkflow_Group1))

for index in range(int(DynamicWorkflow_Group1)):
    dynamicTask = PythonOperator(
        task_id='firstGroup_' + str(index),
        dag=dag,
        provide_context=True,
        python_callable=doSomeWork,
        op_args=['firstGroup', index])

    starting_task.set_downstream(dynamicTask)
    dynamicTask.set_downstream(bridge1_task)

# Used to connect the stream in the event that the range is zero
bridge2_task = PythonOperator(
    task_id='bridge2',
    dag=dag,
    provide_context=True,
    python_callable=bridge2,
    op_args=[])

DynamicWorkflow_Group2 = Variable.get("DynamicWorkflow_Group2")
logging.info("The current DynamicWorkflow value is " + str(DynamicWorkflow_Group2))

for index in range(int(DynamicWorkflow_Group2)):
    dynamicTask = PythonOperator(
        task_id='secondGroup_' + str(index),
        dag=dag,
        provide_context=True,
        python_callable=doSomeWork,
        op_args=['secondGroup', index])

    bridge1_task.set_downstream(dynamicTask)
    dynamicTask.set_downstream(bridge2_task)

ending_task = PythonOperator(
    task_id='end',
    dag=dag,
    provide_context=True,
    python_callable=end,
    op_args=[])

DynamicWorkflow_Group3 = Variable.get("DynamicWorkflow_Group3")
logging.info("The current DynamicWorkflow value is " + str(DynamicWorkflow_Group3))

for index in range(int(DynamicWorkflow_Group3)):

    # You can make this logic anything you'd like
    # I chose to use the PythonOperator for all tasks
    # except the last task will use the BashOperator
    if index < (int(DynamicWorkflow_Group3) - 1):
        dynamicTask = PythonOperator(
            task_id='thirdGroup_' + str(index),
            dag=dag,
            provide_context=True,
            python_callable=doSomeWork,
            op_args=['thirdGroup', index])
    else:
        dynamicTask = BashOperator(
            task_id='thirdGroup_' + str(index),
            bash_command='touch /home/ec2-user/airflow/thirdGroup_' + str(index) + '.txt',
            dag=dag)

    bridge2_task.set_downstream(dynamicTask)
    dynamicTask.set_downstream(ending_task)

# If you do not connect these then in the event that your range is ever zero you will have a disconnection between your stream
# and your tasks will run simultaneously instead of in your desired stream order.
starting_task.set_downstream(bridge1_task)
bridge1_task.set_downstream(bridge2_task)
bridge2_task.set_downstream(ending_task)

Sebelum Anda menjalankan DAG, buat tiga Variabel Aliran Udara ini

airflow variables --set DynamicWorkflow_Group1 1

airflow variables --set DynamicWorkflow_Group2 0

airflow variables --set DynamicWorkflow_Group3 0

Anda akan melihat bahwa DAG berubah dari ini

masukkan deskripsi gambar di sini

Untuk ini setelah itu berlari

masukkan deskripsi gambar di sini

Anda dapat melihat informasi selengkapnya tentang DAG ini di artikel saya tentang membuat Aliran Udara Aliran Kerja Dinamis .

Kyle Bridenstine
sumber
1
Tetapi apa yang terjadi jika Anda memiliki beberapa DagRun dari DAG ini. Apakah mereka semua berbagi Variabel yang sama?
Mar-k
1
Ya, mereka akan menggunakan variabel yang sama; Saya membahas ini di artikel saya di bagian paling akhir. Anda perlu membuat variabel secara dinamis dan menggunakan id dag run dalam nama variabel. Contoh saya sederhana hanya untuk mendemonstrasikan kemungkinan yang dinamis tetapi Anda harus membuatnya berkualitas produksi :)
Kyle Bridenstine
Apakah jembatan diperlukan saat membuat tugas dinamis? Akan membaca artikel Anda sepenuhnya sebentar, tetapi ingin bertanya. Saya berjuang dengan membuat tugas dinamis berdasarkan tugas upstream sekarang, dan saya mulai mencari tahu di mana kesalahan saya. Masalah saya saat ini adalah karena alasan tertentu saya tidak bisa mendapatkan DAG untuk disinkronkan ke DAG-Bag. DAG saya disinkronkan ketika saya menggunakan daftar statis dalam modul, tetapi berhenti ketika saya mengganti daftar statis itu untuk dibuat dari tugas hulu.
lucid_goose
Ini sangat pintar
jvans
1
@jvans terima kasih itu pintar tapi kemungkinan bukan kualitas produksi
Kyle Bridenstine
6

OA: "Apakah ada cara di Airflow untuk membuat alur kerja sedemikian rupa sehingga jumlah tugas B. * tidak diketahui hingga tugas A selesai?"

Jawaban singkatnya adalah tidak. Aliran udara akan membangun aliran DAG sebelum mulai menjalankannya.

Konon kami sampai pada kesimpulan sederhana, yaitu kami tidak memiliki kebutuhan seperti itu. Ketika Anda ingin memparalelkan beberapa pekerjaan, Anda harus mengevaluasi sumber daya yang Anda miliki dan bukan jumlah item untuk diproses.

Kami melakukannya seperti ini: kami secara dinamis membuat sejumlah tugas tetap, katakanlah 10, yang akan membagi pekerjaan. Misalnya jika kita perlu memproses 100 file setiap tugas akan memproses 10 di antaranya. Saya akan memposting kode hari ini.

Memperbarui

Ini kodenya, maaf atas keterlambatannya.

from datetime import datetime, timedelta

import airflow
from airflow.operators.dummy_operator import DummyOperator

args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 1, 8),
    'email': ['[email protected]'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(seconds=5)
}

dag = airflow.DAG(
    'parallel_tasks_v1',
    schedule_interval="@daily",
    catchup=False,
    default_args=args)

# You can read this from variables
parallel_tasks_total_number = 10

start_task = DummyOperator(
    task_id='start_task',
    dag=dag
)


# Creates the tasks dynamically.
# Each one will elaborate one chunk of data.
def create_dynamic_task(current_task_number):
    return DummyOperator(
        provide_context=True,
        task_id='parallel_task_' + str(current_task_number),
        python_callable=parallelTask,
        # your task will take as input the total number and the current number to elaborate a chunk of total elements
        op_args=[current_task_number, int(parallel_tasks_total_number)],
        dag=dag)


end = DummyOperator(
    task_id='end',
    dag=dag)

for page in range(int(parallel_tasks_total_number)):
    created_task = create_dynamic_task(page)
    start_task >> created_task
    created_task >> end

Penjelasan kode:

Di sini kita memiliki satu tugas awal dan satu tugas akhir (keduanya dummy).

Kemudian dari start task dengan for loop kita buat 10 task dengan callable python yang sama. Tugas dibuat dalam fungsi create_dynamic_task.

Untuk setiap python callable, kami meneruskan sebagai argumen jumlah total tugas paralel dan indeks tugas saat ini.

Misalkan Anda memiliki 1000 item untuk diuraikan: tugas pertama akan menerima masukan yang harus mengelaborasi potongan pertama dari 10 potongan. Ini akan membagi 1000 item menjadi 10 bagian dan menguraikan yang pertama.

Ena
sumber
1
Ini adalah solusi yang baik, selama Anda tidak memerlukan tugas khusus per item (seperti kemajuan, hasil, berhasil / gagal,
coba
@Ena parallelTasktidak ditentukan: apakah saya melewatkan sesuatu?
Anthony Keane
2
@AnthonyKeane Ini adalah fungsi python yang harus Anda panggil untuk benar-benar melakukan sesuatu. Seperti yang dikomentari dalam kode itu akan mengambil sebagai masukan jumlah total dan nomor saat ini untuk menguraikan potongan dari total elemen.
Ena
4

Apa yang menurut saya Anda cari adalah membuat DAG secara dinamis. Saya mengalami situasi seperti ini beberapa hari yang lalu setelah beberapa pencarian Saya menemukan blog ini .

Pembuatan Tugas Dinamis

start = DummyOperator(
    task_id='start',
    dag=dag
)

end = DummyOperator(
    task_id='end',
    dag=dag)

def createDynamicETL(task_id, callableFunction, args):
    task = PythonOperator(
        task_id = task_id,
        provide_context=True,
        #Eval is used since the callableFunction var is of type string
        #while the python_callable argument for PythonOperators only receives objects of type callable not strings.
        python_callable = eval(callableFunction),
        op_kwargs = args,
        xcom_push = True,
        dag = dag,
    )
    return task

Mengatur alur kerja DAG

with open('/usr/local/airflow/dags/config_files/dynamicDagConfigFile.yaml') as f:
    # Use safe_load instead to load the YAML file
    configFile = yaml.safe_load(f)

    # Extract table names and fields to be processed
    tables = configFile['tables']

    # In this loop tasks are created for each table defined in the YAML file
    for table in tables:
        for table, fieldName in table.items():
            # In our example, first step in the workflow for each table is to get SQL data from db.
            # Remember task id is provided in order to exchange data among tasks generated in dynamic way.
            get_sql_data_task = createDynamicETL('{}-getSQLData'.format(table),
                                                 'getSQLData',
                                                 {'host': 'host', 'user': 'user', 'port': 'port', 'password': 'pass',
                                                  'dbname': configFile['dbname']})

            # Second step is upload data to s3
            upload_to_s3_task = createDynamicETL('{}-uploadDataToS3'.format(table),
                                                 'uploadDataToS3',
                                                 {'previous_task_id': '{}-getSQLData'.format(table),
                                                  'bucket_name': configFile['bucket_name'],
                                                  'prefix': configFile['prefix']})

            # This is where the magic lies. The idea is that
            # once tasks are generated they should linked with the
            # dummy operators generated in the start and end tasks. 
            # Then you are done!
            start >> get_sql_data_task
            get_sql_data_task >> upload_to_s3_task
            upload_to_s3_task >> end

Beginilah tampilan DAG kita setelah menyatukan kode masukkan deskripsi gambar di sini

import yaml
import airflow
from airflow import DAG
from datetime import datetime, timedelta, time
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator

start = DummyOperator(
    task_id='start',
    dag=dag
)


def createDynamicETL(task_id, callableFunction, args):
    task = PythonOperator(
        task_id=task_id,
        provide_context=True,
        # Eval is used since the callableFunction var is of type string
        # while the python_callable argument for PythonOperators only receives objects of type callable not strings.
        python_callable=eval(callableFunction),
        op_kwargs=args,
        xcom_push=True,
        dag=dag,
    )
    return task


end = DummyOperator(
    task_id='end',
    dag=dag)

with open('/usr/local/airflow/dags/config_files/dynamicDagConfigFile.yaml') as f:
    # use safe_load instead to load the YAML file
    configFile = yaml.safe_load(f)

    # Extract table names and fields to be processed
    tables = configFile['tables']

    # In this loop tasks are created for each table defined in the YAML file
    for table in tables:
        for table, fieldName in table.items():
            # In our example, first step in the workflow for each table is to get SQL data from db.
            # Remember task id is provided in order to exchange data among tasks generated in dynamic way.
            get_sql_data_task = createDynamicETL('{}-getSQLData'.format(table),
                                                 'getSQLData',
                                                 {'host': 'host', 'user': 'user', 'port': 'port', 'password': 'pass',
                                                  'dbname': configFile['dbname']})

            # Second step is upload data to s3
            upload_to_s3_task = createDynamicETL('{}-uploadDataToS3'.format(table),
                                                 'uploadDataToS3',
                                                 {'previous_task_id': '{}-getSQLData'.format(table),
                                                  'bucket_name': configFile['bucket_name'],
                                                  'prefix': configFile['prefix']})

            # This is where the magic lies. The idea is that
            # once tasks are generated they should linked with the
            # dummy operators generated in the start and end tasks. 
            # Then you are done!
            start >> get_sql_data_task
            get_sql_data_task >> upload_to_s3_task
            upload_to_s3_task >> end

Itu sangat membantu harapan penuh Ini juga akan membantu orang lain

Muhammad Bin Ali
sumber
Sudahkah Anda mencapainya sendiri? Saya lelah. Tapi saya gagal.
Newt
Ya, itu berhasil untuk saya. Masalah apa yang Anda hadapi?
Muhammad Bin Ali
1
Saya mendapatkannya. Masalah saya telah terpecahkan. Terima kasih. Saya hanya tidak mendapatkan cara yang benar untuk membaca variabel lingkungan dalam gambar buruh pelabuhan.
Newt
1
bagaimana jika item tabel dapat berubah, sehingga kita tidak dapat menempatkannya dalam file yaml statis?
FrankZhu
Itu sangat tergantung di mana Anda menggunakannya. Meskipun saya akan tertarik dengan apa yang Anda sarankan. @ FrankZhu bagaimana itu harus dilakukan dengan benar?
Muhammad Bin Ali
3

Saya rasa saya telah menemukan solusi yang lebih baik untuk ini di https://github.com/mastak/airflow_multi_dagrun , yang menggunakan antrean sederhana DagRuns dengan memicu beberapa dagruns, mirip dengan TriggerDagRuns . Sebagian besar kredit masuk ke https://github.com/mastak , meskipun saya harus menambal beberapa detail agar berfungsi dengan aliran udara terbaru.

Solusinya menggunakan operator khusus yang memicu beberapa DagRuns :

from airflow import settings
from airflow.models import DagBag
from airflow.operators.dagrun_operator import DagRunOrder, TriggerDagRunOperator
from airflow.utils.decorators import apply_defaults
from airflow.utils.state import State
from airflow.utils import timezone


class TriggerMultiDagRunOperator(TriggerDagRunOperator):
    CREATED_DAGRUN_KEY = 'created_dagrun_key'

    @apply_defaults
    def __init__(self, op_args=None, op_kwargs=None,
                 *args, **kwargs):
        super(TriggerMultiDagRunOperator, self).__init__(*args, **kwargs)
        self.op_args = op_args or []
        self.op_kwargs = op_kwargs or {}

    def execute(self, context):

        context.update(self.op_kwargs)
        session = settings.Session()
        created_dr_ids = []
        for dro in self.python_callable(*self.op_args, **context):
            if not dro:
                break
            if not isinstance(dro, DagRunOrder):
                dro = DagRunOrder(payload=dro)

            now = timezone.utcnow()
            if dro.run_id is None:
                dro.run_id = 'trig__' + now.isoformat()

            dbag = DagBag(settings.DAGS_FOLDER)
            trigger_dag = dbag.get_dag(self.trigger_dag_id)
            dr = trigger_dag.create_dagrun(
                run_id=dro.run_id,
                execution_date=now,
                state=State.RUNNING,
                conf=dro.payload,
                external_trigger=True,
            )
            created_dr_ids.append(dr.id)
            self.log.info("Created DagRun %s, %s", dr, now)

        if created_dr_ids:
            session.commit()
            context['ti'].xcom_push(self.CREATED_DAGRUN_KEY, created_dr_ids)
        else:
            self.log.info("No DagRun created")
        session.close()

Anda kemudian dapat mengirimkan beberapa dagrun dari fungsi yang dapat dipanggil di PythonOperator Anda, misalnya:

from airflow.operators.dagrun_operator import DagRunOrder
from airflow.models import DAG
from airflow.operators import TriggerMultiDagRunOperator
from airflow.utils.dates import days_ago


def generate_dag_run(**kwargs):
    for i in range(10):
        order = DagRunOrder(payload={'my_variable': i})
        yield order

args = {
    'start_date': days_ago(1),
    'owner': 'airflow',
}

dag = DAG(
    dag_id='simple_trigger',
    max_active_runs=1,
    schedule_interval='@hourly',
    default_args=args,
)

gen_target_dag_run = TriggerMultiDagRunOperator(
    task_id='gen_target_dag_run',
    dag=dag,
    trigger_dag_id='common_target',
    python_callable=generate_dag_run
)

Saya membuat garpu dengan kode di https://github.com/flinz/airflow_multi_dagrun

flinz
sumber
3

Grafik pekerjaan tidak dibuat pada saat berjalan. Sebaliknya, grafik dibuat saat diambil oleh Airflow dari folder dags Anda. Oleh karena itu, tidak mungkin memiliki grafik yang berbeda untuk pekerjaan setiap kali dijalankan. Anda dapat mengonfigurasi pekerjaan untuk membuat grafik berdasarkan kueri pada waktu muat . Grafik tersebut akan tetap sama untuk setiap proses setelahnya, yang mungkin tidak terlalu berguna.

Anda bisa mendesain grafik yang menjalankan tugas berbeda di setiap proses berdasarkan hasil kueri dengan menggunakan Operator Cabang.

Apa yang telah saya lakukan adalah melakukan pra-konfigurasi sekumpulan tugas dan kemudian mengambil hasil kueri dan mendistribusikannya ke seluruh tugas. Ini mungkin lebih baik karena jika kueri Anda mengembalikan banyak hasil, Anda mungkin tidak ingin membanjiri penjadwal dengan banyak tugas bersamaan. Agar lebih aman, saya juga menggunakan kumpulan untuk memastikan konkurensi saya tidak lepas kendali dengan kueri besar yang tidak terduga.

"""
 - This is an idea for how to invoke multiple tasks based on the query results
"""
import logging
from datetime import datetime

from airflow import DAG
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.mysql_operator import MySqlOperator
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
from include.run_celery_task import runCeleryTask

########################################################################

default_args = {
    'owner': 'airflow',
    'catchup': False,
    'depends_on_past': False,
    'start_date': datetime(2019, 7, 2, 19, 50, 00),
    'email': ['rotten@stackoverflow'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 0,
    'max_active_runs': 1
}

dag = DAG('dynamic_tasks_example', default_args=default_args, schedule_interval=None)

totalBuckets = 5

get_orders_query = """
select 
    o.id,
    o.customer
from 
    orders o
where
    o.created_at >= current_timestamp at time zone 'UTC' - '2 days'::interval
    and
    o.is_test = false
    and
    o.is_processed = false
"""

###########################################################################################################

# Generate a set of tasks so we can parallelize the results
def createOrderProcessingTask(bucket_number):
    return PythonOperator( 
                           task_id=f'order_processing_task_{bucket_number}',
                           python_callable=runOrderProcessing,
                           pool='order_processing_pool',
                           op_kwargs={'task_bucket': f'order_processing_task_{bucket_number}'},
                           provide_context=True,
                           dag=dag
                          )


# Fetch the order arguments from xcom and doStuff() to them
def runOrderProcessing(task_bucket, **context):
    orderList = context['ti'].xcom_pull(task_ids='get_open_orders', key=task_bucket)

    if orderList is not None:
        for order in orderList:
            logging.info(f"Processing Order with Order ID {order[order_id]}, customer ID {order[customer_id]}")
            doStuff(**op_kwargs)


# Discover the orders we need to run and group them into buckets for processing
def getOpenOrders(**context):
    myDatabaseHook = PostgresHook(postgres_conn_id='my_database_conn_id')

    # initialize the task list buckets
    tasks = {}
    for task_number in range(0, totalBuckets):
        tasks[f'order_processing_task_{task_number}'] = []

    # populate the task list buckets
    # distribute them evenly across the set of buckets
    resultCounter = 0
    for record in myDatabaseHook.get_records(get_orders_query):

        resultCounter += 1
        bucket = (resultCounter % totalBuckets)

        tasks[f'order_processing_task_{bucket}'].append({'order_id': str(record[0]), 'customer_id': str(record[1])})

    # push the order lists into xcom
    for task in tasks:
        if len(tasks[task]) > 0:
            logging.info(f'Task {task} has {len(tasks[task])} orders.')
            context['ti'].xcom_push(key=task, value=tasks[task])
        else:
            # if we didn't have enough tasks for every bucket
            # don't bother running that task - remove it from the list
            logging.info(f"Task {task} doesn't have any orders.")
            del(tasks[task])

    return list(tasks.keys())

###################################################################################################


# this just makes sure that there aren't any dangling xcom values in the database from a crashed dag
clean_xcoms = MySqlOperator(
    task_id='clean_xcoms',
    mysql_conn_id='airflow_db',
    sql="delete from xcom where dag_id='{{ dag.dag_id }}'",
    dag=dag)


# Ideally we'd use BranchPythonOperator() here instead of PythonOperator so that if our
# query returns fewer results than we have buckets, we don't try to run them all.
# Unfortunately I couldn't get BranchPythonOperator to take a list of results like the
# documentation says it should (Airflow 1.10.2). So we call all the bucket tasks for now.
get_orders_task = PythonOperator(
                                 task_id='get_orders',
                                 python_callable=getOpenOrders,
                                 provide_context=True,
                                 dag=dag
                                )
get_orders_task.set_upstream(clean_xcoms)

# set up the parallel tasks -- these are configured at compile time, not at run time:
for bucketNumber in range(0, totalBuckets):
    taskBucket = createOrderProcessingTask(bucketNumber)
    taskBucket.set_upstream(get_orders_task)


###################################################################################################
busuk
sumber
Perhatikan bahwa tampaknya mungkin untuk membuat subdag dengan cepat sebagai hasil dari suatu tugas, namun, sebagian besar dokumentasi tentang subdag yang saya temukan sangat menyarankan untuk menjauh dari fitur itu karena menyebabkan lebih banyak masalah daripada menyelesaikannya umumnya. Saya telah melihat saran bahwa subdag dapat dihapus sebagai fitur bawaan dalam waktu dekat.
busuk
Juga perhatikan bahwa dalam for tasks in tasksloop dalam contoh saya, saya menghapus objek yang saya iterasi. Itu ide yang buruk. Alih-alih dapatkan daftar kunci dan ulangi itu - atau lewati penghapusan. Demikian pula, jika xcom_pull mengembalikan Tidak Ada (bukan daftar atau daftar kosong), perulangan for juga gagal. Seseorang mungkin ingin menjalankan xcom_pull sebelum 'for', dan kemudian memeriksa apakah itu None - atau pastikan setidaknya ada daftar kosong di sana. YMMV. Semoga berhasil!
busuk
1
apa yang ada di dalam open_order_task?
alltej
Anda benar, itu salah ketik dalam contoh saya. Ini harus get_orders_task.set_upstream (). Saya akan memperbaikinya.
busuk
0

Tidak mengerti apa masalahnya?

Berikut adalah contoh standarnya. Sekarang jika di function subdag ganti for i in range(5):denganfor i in range(random.randint(0, 10)): maka semuanya akan bekerja. Sekarang bayangkan operator 'mulai' meletakkan data dalam file, dan alih-alih nilai acak, fungsi akan membaca data ini. Kemudian operator 'start' akan mempengaruhi jumlah tugas.

Masalahnya hanya akan ditampilkan di UI karena saat memasukkan subdag, jumlah tugas akan sama dengan yang terakhir dibaca dari file / database / XCom saat ini. Yang secara otomatis memberikan batasan pada beberapa peluncuran satu dag pada satu waktu.

Denis Shcheglov
sumber
-1

Saya menemukan posting Medium ini yang sangat mirip dengan pertanyaan ini. Namun ini penuh dengan kesalahan ketik, dan tidak berfungsi ketika saya mencoba menerapkannya.

Jawaban saya di atas adalah sebagai berikut:

Jika Anda membuat tugas secara dinamis, Anda harus melakukannya dengan mengulang sesuatu yang tidak dibuat oleh tugas upstream, atau dapat ditentukan secara independen dari tugas itu. Saya belajar bahwa Anda tidak dapat melewatkan tanggal eksekusi atau variabel aliran udara lainnya ke sesuatu di luar template (misalnya, tugas) seperti yang telah ditunjukkan banyak orang sebelumnya. Lihat juga posting ini .

MarMat
sumber
Jika Anda melihat komentar saya, Anda akan melihat bahwa sebenarnya mungkin untuk membuat tugas berdasarkan hasil tugas hulu.
Christopher Beck