Bagaimana memicu tugas Aliran Udara hanya ketika partisi / data baru dalam avialable di tabel AWS athena menggunakan DAG di python?

9

Saya memiliki scenerio seperti di bawah ini:

  1. Trigger a Task 1dan Task 2hanya ketika data baru avialable untuk mereka dalam tabel sumber (Athena). Pemicu untuk Task1 dan Task2 harus terjadi ketika partisi data baru dalam sehari.
  2. Pemicu Task 3hanya pada penyelesaian Task 1danTask 2
  3. Hanya memicu Task 4penyelesaianTask 3

masukkan deskripsi gambar di sini

Kode saya

from airflow import DAG

from airflow.contrib.sensors.aws_glue_catalog_partition_sensor import AwsGlueCatalogPartitionSensor
from datetime import datetime, timedelta

from airflow.operators.postgres_operator import PostgresOperator
from utils import FAILURE_EMAILS

yesterday = datetime.combine(datetime.today() - timedelta(1), datetime.min.time())

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': yesterday,
    'email': FAILURE_EMAILS,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG('Trigger_Job', default_args=default_args, schedule_interval='@daily')

Athena_Trigger_for_Task1 = AwsGlueCatalogPartitionSensor(
    task_id='athena_wait_for_Task1_partition_exists',
    database_name='DB',
    table_name='Table1',
    expression='load_date={{ ds_nodash }}',
    timeout=60,
    dag=dag)

Athena_Trigger_for_Task2 = AwsGlueCatalogPartitionSensor(
    task_id='athena_wait_for_Task2_partition_exists',
    database_name='DB',
    table_name='Table2',
    expression='load_date={{ ds_nodash }}',
    timeout=60,
    dag=dag)

execute_Task1 = PostgresOperator(
    task_id='Task1',
    postgres_conn_id='REDSHIFT_CONN',
    sql="/sql/flow/Task1.sql",
    params={'limit': '50'},
    trigger_rule='all_success',
    dag=dag
)

execute_Task2 = PostgresOperator(
    task_id='Task2',
    postgres_conn_id='REDSHIFT_CONN',
    sql="/sql/flow/Task2.sql",
    params={'limit': '50'},
    trigger_rule='all_success',
    dag=dag
)



execute_Task3 = PostgresOperator(
    task_id='Task3',
    postgres_conn_id='REDSHIFT_CONN',
    sql="/sql/flow/Task3.sql",
    params={'limit': '50'},
    trigger_rule='all_success',
    dag=dag
)

execute_Task4 = PostgresOperator(
    task_id='Task4',
    postgres_conn_id='REDSHIFT_CONN',
    sql="/sql/flow/Task4",
    params={'limit': '50'},
    dag=dag
)



execute_Task1.set_upstream(Athena_Trigger_for_Task1)
execute_Task2.set_upstream(Athena_Trigger_for_Task2)

execute_Task3.set_upstream(execute_Task1)
execute_Task3.set_upstream(execute_Task2)

execute_Task4.set_upstream(execute_Task3)

Apa cara optimal terbaik untuk mencapainya?

pankaj
sumber
apakah Anda mengalami masalah dengan solusi ini?
Bernardo stearns Reisen
@ Bernardostearnsreisen, Terkadang Task1dan Task2berjalan dalam lingkaran. Bagi saya data dimuat dalam tabel sumber Athena CET 10:00.
pankaj
terjadi pada loop yang Anda maksud, aliran udara mencoba kembali Task1 dan Task2 berkali-kali sampai berhasil?
Bernardo stearns Reisen
@Bernardostearnsreisen, ya persis
pankaj
1
@Bernardostearnsreisen, saya tidak tahu bagaimana cara menghadiahkan hadiah :)
pankaj

Jawaban:

1

Saya yakin pertanyaan Anda membahas dua masalah utama:

  1. lupa mengonfigurasi schedule_intervalsecara eksplisit sehingga @daily menyiapkan sesuatu yang tidak Anda harapkan.
  2. Cara memicu dan mencoba kembali eksekusi dag dengan benar ketika Anda bergantung pada peristiwa eksternal untuk menyelesaikan eksekusi

jawaban singkat: atur secara eksplisit schedule_interval Anda dengan format tugas cron dan gunakan operator sensor untuk memeriksa dari waktu ke waktu

default_args={
        'retries': (endtime - starttime)*60/poke_time
}
dag = DAG('Trigger_Job', default_args=default_args, schedule_interval='0 10 * * *')
Athena_Trigger_for_Task1 = AwsGlueCatalogPartitionSensor(
     ....
    poke_time= 60*5 #<---- set a poke_time in seconds
    dag=dag)

di mana jam startimeberapa tugas harian Anda akan dimulai, endtimeberapa kali terakhir dalam sehari Anda harus memeriksa apakah suatu peristiwa dilakukan sebelum ditandai sebagai gagal dan poke_timeadalah interval Anda sensor_operatorakan memeriksa apakah peristiwa itu terjadi.

Cara menangani pekerjaan cron secara eksplisit setiap kali Anda mengatur dag Anda agar@dailyseperti yang Anda lakukan:

dag = DAG('Trigger_Job', default_args=default_args, schedule_interval='@daily')

dari dokumen , Anda dapat melihat Anda sedang melakukan: @daily - Run once a day at midnight

Yang sekarang masuk akal mengapa Anda mendapatkan kesalahan batas waktu, dan gagal setelah 5 menit karena Anda mengatur 'retries': 1dan 'retry_delay': timedelta(minutes=5). Jadi ia mencoba menjalankan dag di tengah malam, gagal. coba lagi 5 menit setelah dan gagal lagi, sehingga ditandai sebagai gagal.

Jadi pada dasarnya @daily run menetapkan pengaturan tugas cron implisit:

@daily -> Run once a day at midnight -> 0 0 * * *

Format tugas cron adalah format di bawah ini dan Anda menetapkan nilainya *kapan pun Anda ingin mengatakan "semua".

Minute Hour Day_of_Month Month Day_of_Week

Jadi @daily pada dasarnya mengatakan jalankan ini setiap: menit 0 jam 0 dari semua hari_ hari_bulan dari semua bulan semua hari_ hari_ minggu

Jadi casing Anda menjalankan ini setiap: menit 0 jam 10 dari semua days_of_month dari all_months semua days_of_week. Ini menerjemahkan dalam format cron job ke:

0 10 * * *

Cara memicu dan mencoba kembali eksekusi dag dengan benar ketika Anda bergantung pada peristiwa eksternal untuk menyelesaikan eksekusi

  1. Anda bisa memicu aliran udara dari peristiwa eksternal dengan menggunakan perintah airflow trigger_dag. ini akan mungkin jika beberapa cara Anda dapat memicu fungsi / skrip python lambda untuk menargetkan aliran udara Anda.

  2. Jika Anda tidak dapat memicu dag secara eksternal, maka gunakan operator sensor seperti OP lakukan, atur poke_time untuk itu dan atur jumlah retries yang cukup tinggi.

Bernardo stearns Reisen
sumber
Terima kasih untuk ini. Juga jika saya ingin memicu tugas-tugas berdasarkan pada acara daripada waktu yaitu segera partisi data baru dapat ditransfer dalam sumber `AWS Athena Tables` tugas berikutnya harus dipicu. Lalu bagaimana cara saya menjadwalkan. Apakah kode saya saat ini cukup tepat.
pankaj
@ Pankaj, saya hanya melihat dua alternatif. Saya tidak tahu banyak tentang aws athena, tetapi Anda bisa memicu aliran udara dari peristiwa eksternal dengan menggunakan perintah airflow trigger_dag. ini akan mungkin jika beberapa cara Anda dapat memicu fungsi / skrip python lambda untuk menargetkan aliran udara Anda.
Bernardo stearns Reisen
alternatif lain kurang lebih adalah apa yang Anda lakukan, karena Anda tidak memiliki pemicu berbasis peristiwa, Anda perlu memeriksa secara berkala jika peristiwa ini terjadi. Jadi menggunakan solusi saat ini akan menetapkan tugas cron untuk rentang jam menjalankan dag dalam frekuensi tinggi ... banyak yang akan gagal tetapi akan dapat menangkap cukup cepat setelah peristiwa itu terjadi
Bernardo stearns reisen
@Bernado, saya telah menemukan paket di Airflow yang dipanggil AwsGlueCatalogPartitionSensorbersama dengan perintah aliran udara {{ds_nodash}}untuk keluar dari partisi. Pertanyaan saya lalu bagaimana cara menjadwalkan ini.
pankaj
@ Benado, Dapatkah Anda melihat kode saya di mana saya telah menerapkan cek yang disebutkan di atas dan memberikan masukan Anda
pankaj