Saya mencoba memindahkan file s3 dari bucket "non-deleting" (artinya saya tidak bisa menghapus file) ke GCS menggunakan aliran udara. Saya tidak dapat dijamin bahwa file baru akan ada di sana setiap hari, tetapi saya harus memeriksa file baru setiap hari.
masalah saya adalah penciptaan dinamis dari subdags. Jika ada file, saya perlu subdags. Jika tidak ada file, saya tidak perlu subdags. Masalah saya adalah pengaturan hulu / hilir. Dalam kode saya, itu mendeteksi file, tetapi tidak memulai subdags sebagaimana mestinya. Saya melewatkan sesuatu.
ini kode saya:
from airflow import models
from airflow.utils.helpers import chain
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.contrib.operators.s3_to_gcs_operator import S3ToGoogleCloudStorageOperator
from airflow.utils import dates
from airflow.models import Variable
import logging
args = {
'owner': 'Airflow',
'start_date': dates.days_ago(1),
'email': ['[email protected]'],
'email_on_failure': True,
'email_on_success': True,
}
bucket = 'mybucket'
prefix = 'myprefix/'
LastBDEXDate = int(Variable.get("last_publish_date"))
maxdate = LastBDEXDate
files = []
parent_dag = models.DAG(
dag_id='My_Ingestion',
default_args=args,
schedule_interval='@daily',
catchup=False
)
def Check_For_Files(**kwargs):
s3 = S3Hook(aws_conn_id='S3_BOX')
s3.get_conn()
bucket = bucket
LastBDEXDate = int(Variable.get("last_publish_date"))
maxdate = LastBDEXDate
files = s3.list_keys(bucket_name=bucket, prefix='myprefix/file')
for file in files:
print(file)
print(file.split("_")[-2])
print(file.split("_")[-2][-8:]) ##proves I can see a date in the file name is ok.
maxdate = maxdate if maxdate > int(file.split("_")[-2][-8:]) else int(file.split("_")[-2][-8:])
if maxdate > LastBDEXDate:
return 'Start_Process'
return 'finished'
def create_subdag(dag_parent, dag_id_child_prefix, file_name):
# dag params
dag_id_child = '%s.%s' % (dag_parent.dag_id, dag_id_child_prefix)
# dag
subdag = models.DAG(dag_id=dag_id_child,
default_args=args,
schedule_interval=None)
# operators
s3_to_gcs_op = S3ToGoogleCloudStorageOperator(
task_id=dag_id_child,
bucket=bucket,
prefix=file_name,
dest_gcs_conn_id='GCP_Account',
dest_gcs='gs://my_files/To_Process/',
replace=False,
gzip=True,
dag=subdag)
return subdag
def create_subdag_operator(dag_parent, filename, index):
tid_subdag = 'file_{}'.format(index)
subdag = create_subdag(dag_parent, tid_subdag, filename)
sd_op = SubDagOperator(task_id=tid_subdag, dag=dag_parent, subdag=subdag)
return sd_op
def create_subdag_operators(dag_parent, file_list):
subdags = [create_subdag_operator(dag_parent, file, file_list.index(file)) for file in file_list]
# chain subdag-operators together
chain(*subdags)
return subdags
check_for_files = BranchPythonOperator(
task_id='Check_for_s3_Files',
provide_context=True,
python_callable=Check_For_Files,
dag=parent_dag
)
finished = DummyOperator(
task_id='finished',
dag=parent_dag
)
decision_to_continue = DummyOperator(
task_id='Start_Process',
dag=parent_dag
)
if len(files) > 0:
subdag_ops = create_subdag_operators(parent_dag, files)
check_for_files >> decision_to_continue >> subdag_ops[0] >> subdag_ops[-1] >> finished
check_for_files >> finished
python
airflow
directed-acyclic-graphs
arcee123
sumber
sumber
spark
pekerjaan ini atau beberapapython
skrip dan apa yang Anda gunakan untuk menjalankannya sepertilivy
atau metode lainfiles
daftar kosong?Jawaban:
Di bawah ini adalah cara yang disarankan untuk membuat DAG atau sub-DAG dinamis dalam aliran udara, meskipun ada cara lain juga, tapi saya kira ini akan sangat berlaku untuk masalah Anda.
Pertama, buat file
(yaml/csv)
yang berisi daftar semuas3
file dan lokasi, dalam kasus Anda, Anda telah menulis fungsi untuk menyimpannya dalam daftar, saya akan mengatakan menyimpannya dalamyaml
file terpisah dan memuatnya pada saat dijalankan dalam aliran udara env dan kemudian membuat DAGs.Di bawah ini adalah contoh
yaml
file:dynamicDagConfigFile.yaml
Anda dapat memodifikasi
Check_For_Files
fungsi Anda untuk menyimpannya dalamyaml
file.Sekarang kita bisa beralih ke pembuatan dag dinamis:
Pertama, tentukan dua tugas menggunakan operator dummy, yaitu mulai dan tugas akhir. Tugas-tugas semacam itu adalah tugas-tugas di mana kita akan membangun kita
DAG
dengan secara dinamis menciptakan tugas-tugas di antara mereka:Dynamic DAG: Kami akan gunakan
PythonOperators
dalam aliran udara. Fungsi harus menerima argumen sebagai id tugas; fungsi python yang akan dieksekusi, yaitu python_callable untuk operator Python; dan seperangkat argumen yang akan digunakan selama eksekusi.Sertakan argumen
task id
. Jadi, kita dapat bertukar data di antara tugas-tugas yang dihasilkan secara dinamis, misalnya viaXCOM
.Anda dapat menentukan fungsi operasi Anda dalam dag dinamis seperti ini
s3_to_gcs_op
.Akhirnya berdasarkan lokasi yang ada di file yaml Anda dapat membuat dag dinamis, pertama baca
yaml
file seperti di bawah ini dan buat dag dinamis:Definisi DAG akhir:
Idenya adalah itu
Kode aliran udara lengkap agar:
sumber
upload_s3_toGCS
tidak akan ada, dan kesalahan dalam aliran udara.yaml
file setelah semua file ini diunggah ke GCS, dengan cara ini hanya file baru yang akan hadir dalamyaml
file. Dan jika tidak ada file baru fileyaml
akan kosong dan tidak ada dag dinamis yang akan dibuat. Inilah sebabnya mengapayaml
file adalah pilihan yang jauh lebih baik dibandingkan dengan menyimpan file dalam daftar.yaml
File juga akan membantu dalam menjaga logging file s3 dengan cara, jika misalkan beberapa file s3 gagal di-upload ke GCS, maka Anda juga dapat mempertahankan bendera yang sesuai dengan file itu dan kemudian coba lagi ini pada DAG berikutnya run.if
kondisi sebelum DAG yang akan memeriksa file baru dalamyaml
file jika ada file baru jalankan jika tidak lewati saja.