mencoba membuat subdag dinamis dari dag induk berdasarkan array nama file

10

Saya mencoba memindahkan file s3 dari bucket "non-deleting" (artinya saya tidak bisa menghapus file) ke GCS menggunakan aliran udara. Saya tidak dapat dijamin bahwa file baru akan ada di sana setiap hari, tetapi saya harus memeriksa file baru setiap hari.

masalah saya adalah penciptaan dinamis dari subdags. Jika ada file, saya perlu subdags. Jika tidak ada file, saya tidak perlu subdags. Masalah saya adalah pengaturan hulu / hilir. Dalam kode saya, itu mendeteksi file, tetapi tidak memulai subdags sebagaimana mestinya. Saya melewatkan sesuatu.

ini kode saya:

from airflow import models
from  airflow.utils.helpers import chain
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.contrib.operators.s3_to_gcs_operator import S3ToGoogleCloudStorageOperator
from airflow.utils import dates
from airflow.models import Variable
import logging

args = {
    'owner': 'Airflow',
    'start_date': dates.days_ago(1),
    'email': ['[email protected]'],
    'email_on_failure': True,
    'email_on_success': True,
}

bucket = 'mybucket'
prefix = 'myprefix/'
LastBDEXDate = int(Variable.get("last_publish_date"))
maxdate = LastBDEXDate
files = []

parent_dag = models.DAG(
    dag_id='My_Ingestion',
    default_args=args,
    schedule_interval='@daily',
    catchup=False
)

def Check_For_Files(**kwargs):
    s3 = S3Hook(aws_conn_id='S3_BOX')
    s3.get_conn()
    bucket = bucket
    LastBDEXDate = int(Variable.get("last_publish_date"))
    maxdate = LastBDEXDate
    files = s3.list_keys(bucket_name=bucket, prefix='myprefix/file')
    for file in files:
        print(file)
        print(file.split("_")[-2])
        print(file.split("_")[-2][-8:])  ##proves I can see a date in the file name is ok.
        maxdate = maxdate if maxdate > int(file.split("_")[-2][-8:]) else int(file.split("_")[-2][-8:])
    if maxdate > LastBDEXDate:
        return 'Start_Process'
    return 'finished'

def create_subdag(dag_parent, dag_id_child_prefix, file_name):
    # dag params
    dag_id_child = '%s.%s' % (dag_parent.dag_id, dag_id_child_prefix)

    # dag
    subdag = models.DAG(dag_id=dag_id_child,
              default_args=args,
              schedule_interval=None)

    # operators
    s3_to_gcs_op = S3ToGoogleCloudStorageOperator(
        task_id=dag_id_child,
        bucket=bucket,
        prefix=file_name,
        dest_gcs_conn_id='GCP_Account',
        dest_gcs='gs://my_files/To_Process/',
        replace=False,
        gzip=True,
        dag=subdag)


    return subdag

def create_subdag_operator(dag_parent, filename, index):
    tid_subdag = 'file_{}'.format(index)
    subdag = create_subdag(dag_parent, tid_subdag, filename)
    sd_op = SubDagOperator(task_id=tid_subdag, dag=dag_parent, subdag=subdag)
    return sd_op

def create_subdag_operators(dag_parent, file_list):
    subdags = [create_subdag_operator(dag_parent, file, file_list.index(file)) for file in file_list]
    # chain subdag-operators together
    chain(*subdags)
    return subdags

check_for_files = BranchPythonOperator(
    task_id='Check_for_s3_Files',
    provide_context=True,
    python_callable=Check_For_Files,
    dag=parent_dag
)

finished = DummyOperator(
    task_id='finished',
    dag=parent_dag
)

decision_to_continue = DummyOperator(
    task_id='Start_Process',
    dag=parent_dag
)

if len(files) > 0:
    subdag_ops = create_subdag_operators(parent_dag, files)
    check_for_files >> decision_to_continue >> subdag_ops[0] >> subdag_ops[-1] >> finished


check_for_files >> finished
arcee123
sumber
Pekerjaan apa yang dijalankan di backend DAGS ini adalah sparkpekerjaan ini atau beberapa pythonskrip dan apa yang Anda gunakan untuk menjalankannya seperti livyatau metode lain
ashwin agrawal
Maaf, saya tidak mengerti pertanyaannya. bisakah Anda menyatakan kembali?
arcee123
Maksud saya Anda hanya menggunakan skrip python sederhana dan tidak menggunakan pekerjaan percikan kan?
ashwin agrawal
Iya. operator sederhana yang default dalam aliran udara. Saya ingin menambahkan operator yang ada pada tingkat dinamis berdasarkan file yang ditandai di S3 Saya ingin menelan ke dalam GCS.
arcee123
Mengapa filesdaftar kosong?
Oluwafemi Sule

Jawaban:

3

Di bawah ini adalah cara yang disarankan untuk membuat DAG atau sub-DAG dinamis dalam aliran udara, meskipun ada cara lain juga, tapi saya kira ini akan sangat berlaku untuk masalah Anda.

Pertama, buat file (yaml/csv)yang berisi daftar semua s3file dan lokasi, dalam kasus Anda, Anda telah menulis fungsi untuk menyimpannya dalam daftar, saya akan mengatakan menyimpannya dalam yamlfile terpisah dan memuatnya pada saat dijalankan dalam aliran udara env dan kemudian membuat DAGs.

Di bawah ini adalah contoh yamlfile: dynamicDagConfigFile.yaml

job: dynamic-dag
bucket_name: 'bucket-name'
prefix: 'bucket-prefix'
S3Files:
    - File1: 'S3Loc1'
    - File2: 'S3Loc2'
    - File3: 'S3Loc3'

Anda dapat memodifikasi Check_For_Filesfungsi Anda untuk menyimpannya dalam yamlfile.

Sekarang kita bisa beralih ke pembuatan dag dinamis:

Pertama, tentukan dua tugas menggunakan operator dummy, yaitu mulai dan tugas akhir. Tugas-tugas semacam itu adalah tugas-tugas di mana kita akan membangun kita DAGdengan secara dinamis menciptakan tugas-tugas di antara mereka:

start = DummyOperator(
    task_id='start',
    dag=dag
)

end = DummyOperator(
    task_id='end',
    dag=dag)

Dynamic DAG: Kami akan gunakan PythonOperatorsdalam aliran udara. Fungsi harus menerima argumen sebagai id tugas; fungsi python yang akan dieksekusi, yaitu python_callable untuk operator Python; dan seperangkat argumen yang akan digunakan selama eksekusi.

Sertakan argumen task id. Jadi, kita dapat bertukar data di antara tugas-tugas yang dihasilkan secara dinamis, misalnya via XCOM.

Anda dapat menentukan fungsi operasi Anda dalam dag dinamis seperti ini s3_to_gcs_op.

def createDynamicDAG(task_id, callableFunction, args):
    task = PythonOperator(
        task_id = task_id,
        provide_context=True,
        #Eval is used since the callableFunction var is of type string
        #while the python_callable argument for PythonOperators only receives objects of type callable not strings.
        python_callable = eval(callableFunction),
        op_kwargs = args,
        xcom_push = True,
        dag = dag,
    )
    return task

Akhirnya berdasarkan lokasi yang ada di file yaml Anda dapat membuat dag dinamis, pertama baca yamlfile seperti di bawah ini dan buat dag dinamis:

with open('/usr/local/airflow/dags/config_files/dynamicDagConfigFile.yaml') as f:
    # use safe_load instead to load the YAML file
    configFile = yaml.safe_load(f)

    #Extract file list
    S3Files = configFile['S3Files']

    #In this loop tasks are created for each table defined in the YAML file
    for S3File in S3Files:
        for S3File, fieldName in S3File.items():

            #Remember task id is provided in order to exchange data among tasks generated in dynamic way.
            get_s3_files = createDynamicDAG('{}-getS3Data'.format(S3File), 
                                            'getS3Data', 
                                            {}) #your configs here.

            #Second step is upload S3 to GCS
            upload_s3_toGCS = createDynamicDAG('{}-uploadDataS3ToGCS'.format(S3File), 'uploadDataS3ToGCS', {'previous_task_id':'{}-'})

#write your configs again here like S3 bucket name prefix extra or read from yaml file, and other GCS config.

Definisi DAG akhir:

Idenya adalah itu

#once tasks are generated they should linked with the
#dummy operators generated in the start and end tasks. 
start >> get_s3_files
get_s3_files >> upload_s3_toGCS
upload_s3_toGCS >> end

Kode aliran udara lengkap agar:

import yaml
import airflow
from airflow import DAG
from datetime import datetime, timedelta, time
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator

start = DummyOperator(
    task_id='start',
    dag=dag
)


def createDynamicDAG(task_id, callableFunction, args):
    task = PythonOperator(
        task_id = task_id,
        provide_context=True,
        #Eval is used since the callableFunction var is of type string
        #while the python_callable argument for PythonOperators only receives objects of type callable not strings.
        python_callable = eval(callableFunction),
        op_kwargs = args,
        xcom_push = True,
        dag = dag,
    )
    return task


end = DummyOperator(
    task_id='end',
    dag=dag)



with open('/usr/local/airflow/dags/config_files/dynamicDagConfigFile.yaml') as f:
    configFile = yaml.safe_load(f)

    #Extract file list
    S3Files = configFile['S3Files']

    #In this loop tasks are created for each table defined in the YAML file
    for S3File in S3Files:
        for S3File, fieldName in S3File.items():

            #Remember task id is provided in order to exchange data among tasks generated in dynamic way.
            get_s3_files = createDynamicDAG('{}-getS3Data'.format(S3File), 
                                            'getS3Data', 
                                            {}) #your configs here.

            #Second step is upload S3 to GCS
            upload_s3_toGCS = createDynamicDAG('{}-uploadDataS3ToGCS'.format(S3File), 'uploadDataS3ToGCS', {'previous_task_id':'{}-'})

#write your configs again here like S3 bucket name prefix extra or read from yaml file, and other GCS config.


start >> get_s3_files
get_s3_files >> upload_s3_toGCS
upload_s3_toGCS >> end
ashwin agrawal
sumber
Terima kasih banyak. jadi salah satu masalah yang saya alami adalah apa yang terjadi jika tidak ada file baru? salah satu masalah yang saya hadapi, adalah bahwa akan selalu ada file di tempat ini, tetapi tidak dijamin file BARU untuk ditarik, yang berarti bagian upload_s3_toGCStidak akan ada, dan kesalahan dalam aliran udara.
arcee123
Anda dapat memecahkan masalah dengan menghapus file dari yamlfile setelah semua file ini diunggah ke GCS, dengan cara ini hanya file baru yang akan hadir dalam yamlfile. Dan jika tidak ada file baru file yamlakan kosong dan tidak ada dag dinamis yang akan dibuat. Inilah sebabnya mengapa yamlfile adalah pilihan yang jauh lebih baik dibandingkan dengan menyimpan file dalam daftar.
ashwin agrawal
The yamlFile juga akan membantu dalam menjaga logging file s3 dengan cara, jika misalkan beberapa file s3 gagal di-upload ke GCS, maka Anda juga dapat mempertahankan bendera yang sesuai dengan file itu dan kemudian coba lagi ini pada DAG berikutnya run.
ashwin agrawal
Dan jika tidak ada file baru Anda dapat meletakkan ifkondisi sebelum DAG yang akan memeriksa file baru dalam yamlfile jika ada file baru jalankan jika tidak lewati saja.
ashwin agrawal
masalahnya di sini adalah bahwa downstream diatur. jika downstream diatur tanpa pekerjaan yang sebenarnya (karena tidak ada file), itu akan salah.
arcee123