Saya memiliki scenerio seperti di bawah ini:
- Trigger a
Task 1
danTask 2
hanya ketika data baru avialable untuk mereka dalam tabel sumber (Athena). Pemicu untuk Task1 dan Task2 harus terjadi ketika partisi data baru dalam sehari. - Pemicu
Task 3
hanya pada penyelesaianTask 1
danTask 2
- Hanya memicu
Task 4
penyelesaianTask 3
Kode saya
from airflow import DAG
from airflow.contrib.sensors.aws_glue_catalog_partition_sensor import AwsGlueCatalogPartitionSensor
from datetime import datetime, timedelta
from airflow.operators.postgres_operator import PostgresOperator
from utils import FAILURE_EMAILS
yesterday = datetime.combine(datetime.today() - timedelta(1), datetime.min.time())
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': yesterday,
'email': FAILURE_EMAILS,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
dag = DAG('Trigger_Job', default_args=default_args, schedule_interval='@daily')
Athena_Trigger_for_Task1 = AwsGlueCatalogPartitionSensor(
task_id='athena_wait_for_Task1_partition_exists',
database_name='DB',
table_name='Table1',
expression='load_date={{ ds_nodash }}',
timeout=60,
dag=dag)
Athena_Trigger_for_Task2 = AwsGlueCatalogPartitionSensor(
task_id='athena_wait_for_Task2_partition_exists',
database_name='DB',
table_name='Table2',
expression='load_date={{ ds_nodash }}',
timeout=60,
dag=dag)
execute_Task1 = PostgresOperator(
task_id='Task1',
postgres_conn_id='REDSHIFT_CONN',
sql="/sql/flow/Task1.sql",
params={'limit': '50'},
trigger_rule='all_success',
dag=dag
)
execute_Task2 = PostgresOperator(
task_id='Task2',
postgres_conn_id='REDSHIFT_CONN',
sql="/sql/flow/Task2.sql",
params={'limit': '50'},
trigger_rule='all_success',
dag=dag
)
execute_Task3 = PostgresOperator(
task_id='Task3',
postgres_conn_id='REDSHIFT_CONN',
sql="/sql/flow/Task3.sql",
params={'limit': '50'},
trigger_rule='all_success',
dag=dag
)
execute_Task4 = PostgresOperator(
task_id='Task4',
postgres_conn_id='REDSHIFT_CONN',
sql="/sql/flow/Task4",
params={'limit': '50'},
dag=dag
)
execute_Task1.set_upstream(Athena_Trigger_for_Task1)
execute_Task2.set_upstream(Athena_Trigger_for_Task2)
execute_Task3.set_upstream(execute_Task1)
execute_Task3.set_upstream(execute_Task2)
execute_Task4.set_upstream(execute_Task3)
Apa cara optimal terbaik untuk mencapainya?
Task1
danTask2
berjalan dalam lingkaran. Bagi saya data dimuat dalam tabel sumber Athena CET 10:00.Jawaban:
Saya yakin pertanyaan Anda membahas dua masalah utama:
schedule_interval
secara eksplisit sehingga @daily menyiapkan sesuatu yang tidak Anda harapkan.jawaban singkat: atur secara eksplisit schedule_interval Anda dengan format tugas cron dan gunakan operator sensor untuk memeriksa dari waktu ke waktu
di mana jam
startime
berapa tugas harian Anda akan dimulai,endtime
berapa kali terakhir dalam sehari Anda harus memeriksa apakah suatu peristiwa dilakukan sebelum ditandai sebagai gagal danpoke_time
adalah interval Andasensor_operator
akan memeriksa apakah peristiwa itu terjadi.Cara menangani pekerjaan cron secara eksplisit setiap kali Anda mengatur dag Anda agar
@daily
seperti yang Anda lakukan:dari dokumen , Anda dapat melihat Anda sedang melakukan:
@daily - Run once a day at midnight
Yang sekarang masuk akal mengapa Anda mendapatkan kesalahan batas waktu, dan gagal setelah 5 menit karena Anda mengatur
'retries': 1
dan'retry_delay': timedelta(minutes=5)
. Jadi ia mencoba menjalankan dag di tengah malam, gagal. coba lagi 5 menit setelah dan gagal lagi, sehingga ditandai sebagai gagal.Jadi pada dasarnya @daily run menetapkan pengaturan tugas cron implisit:
Format tugas cron adalah format di bawah ini dan Anda menetapkan nilainya
*
kapan pun Anda ingin mengatakan "semua".Minute Hour Day_of_Month Month Day_of_Week
Jadi @daily pada dasarnya mengatakan jalankan ini setiap: menit 0 jam 0 dari semua hari_ hari_bulan dari semua bulan semua hari_ hari_ minggu
Jadi casing Anda menjalankan ini setiap: menit 0 jam 10 dari semua days_of_month dari all_months semua days_of_week. Ini menerjemahkan dalam format cron job ke:
Cara memicu dan mencoba kembali eksekusi dag dengan benar ketika Anda bergantung pada peristiwa eksternal untuk menyelesaikan eksekusi
Anda bisa memicu aliran udara dari peristiwa eksternal dengan menggunakan perintah
airflow trigger_dag
. ini akan mungkin jika beberapa cara Anda dapat memicu fungsi / skrip python lambda untuk menargetkan aliran udara Anda.Jika Anda tidak dapat memicu dag secara eksternal, maka gunakan operator sensor seperti OP lakukan, atur poke_time untuk itu dan atur jumlah retries yang cukup tinggi.
sumber
airflow trigger_dag
. ini akan mungkin jika beberapa cara Anda dapat memicu fungsi / skrip python lambda untuk menargetkan aliran udara Anda.AwsGlueCatalogPartitionSensor
bersama dengan perintah aliran udara{{ds_nodash}}
untuk keluar dari partisi. Pertanyaan saya lalu bagaimana cara menjadwalkan ini.