본문 바로가기
Airflow

Airflow DAG에서 다른 DAG 호출하기 (DAG 종속성)

by Sunyoung95 2024. 1. 21.

 조사 동기

프로젝트를 진행하며 하나의 dag에 모든 task를 넣다보니 아래와 같은 문제점이 발생했다.
1. DAG가 복잡해져 가독성이 떨어진다.
2. task를 추가하는 등의 추가 작업이 어렵다
따라서, dag를 분리하여 확장성과 가독성을 향상시킬 필요성을 느꼈다.

목표

1. 이후에 진행되어야 할 dag를 어떻게 호출 할 수 있는 지
2. 이전 dag의 success / fail flag를 어떻게 받아올 지

DAG 종속성

  • dag를 설계할 때 dag끼리 종속성을 갖지 않는 것이 가장 좋지만 어쩔 수 없이 종속성을 만들어야하는 경우가 있다.
  • 예시
    • 두 dag는 종속되지만 일정이 다름
    • 두 dag는 종속되지만 서로 다른 팀에서 소유
    • task는 다른 task에 종속 되지만 excution_date가 다름

타 DAG 호출 방법

SubDAG를 사용하여 DAG 종속성을 처리할 수도 있지만 SubDAG가 성능 문제를 일으킬 수도 있으므로 dag 종속성으로 처리하는 것을 권장.

< TriggerDagRunOperator >

  • 해당 오퍼레이터를 사용하여 동일한 airflow 환경에서 다른 dag를 실행할 수 있다.
  • task에서 종속되어있는 DAG를 호출 시킬 수 있다.
  • 보통 dag branch의 마지막 task로 존재하고 이후 실행될 다른 dag를 실행한다.
  • subDAG를 이상적으로 교체 가능
from datetime import datetime

from airflow import DAG
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

args = {
    'owner': 'syryu',
}

dag = DAG(
    dag_id='trigger_main'
    , default_args=args
    , start_date=datetime(2021, 11, 6, 0, 0, 0)
    , schedule_interval="@once"
    , tags=['trigger']
)

t1 = TriggerDagRunOperator(
    trigger_dag_id='trigger',  ## trigger 라는 dag_id를 가진 dag를 호출한다.
    task_id='trigger',
    execution_date='{{ execution_date }}',
    wait_for_completion=True,  ## 외부 dag가 완료될 때 까지 기다릴 지(기본값 False)
    poke_interval=30,
    allowed_states=['success'], ## 성공으로 판단할 state
    failed_states=['failure'],  ## 실패로 판단할 state
    reset_dag_run=True,
    dag=dag
)

 

< ExternalTaskSensor >

  • 해당 오퍼레이터를 사용하여 동일한 airflow 환경에서 다른 dag를 실행할 수 있다.
  • 상위 dag(부모 dag)가 완료될 때 까지 대기한다.
  • 여러 상위 dag에 종속되는 dag일 경우 유용하게 사용할 수 있다.
  • 종속성이 연결되어있는 dag(부모 dag)에 scheduler가 존재해야한다.
  • sensor이기 때문에 무한정 대기를 원하지 않는다면 mode / timeout을 설정해야한다. 
  • task 혹은 dag가 종료된 다음 실행할 수 있다.
  • 상위 dag(부모 dag)와 schedule_interval이 같아야한다.
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.sensors.external_task import ExternalTaskSensor

args = {
    'owner': 'brownbear',
}

dag = DAG(
    dag_id='external', default_args=args, start_date=datetime(2021, 11, 6, 0, 0, 0),
    schedule_interval='@once', tags=['external'],
)

downstream_task1 = ExternalTaskSensor(
    task_id="downstream_task1",  ## 확인할 외부 task id, task id가 입력되면 외부 task가 종료되어야 실행을 한다. 입력되지 않으면 외부 dag가 종료될 때까지 대기(기본값 None)
        timeout: 600,
        mode: 'reschedule',
    external_dag_id='external_main',  ## 확인할 외부 dag id
    check_existence=True,  ## 외부 dag id나 task id가 존재하는지 체크, 기본적으로 외부 dag id가 존재하는지 체크하고 external_task_id가 None이 아니면 외부 task_id를 체크한다.(기본값: False)
    allowed_status=['success'],  ## 허용하는 상태값 (기본값 : ['success'])
    failed_states=['failed'],  ## 허용하지 않을 상태값 (기본값 : None)
    excution_delta=datetime.timedelta(days=1) # 부모 dag와 schedule이 다를경우 이 설정값을 통해서 schedule을 맞춰서 부모 dag의 run을 감지
)

start = DummyOperator(
    task_id='start',
    dag=dag
)

b1 = BashOperator(
    task_id='bash',
    bash_command='echo 123',
    dag=dag
)

end = DummyOperator(
    task_id='end',
    dag=dag
)

start >> downstream_task1 >> b1 >> end

 

 

 

 


참고

https://brownbears.tistory.com/593

 

[Airflow] DAG에서 다른 DAG 호출하기 (DAG 종속성)

dag를 설계할 때 dag끼리 종속성을 갖지 않는 것이 가장 좋지만 어쩔 수 없이 종속성을 만들어야 하는 경우가 있습니다. 아래와 같은 상황일 때, dag의 종속성을 갖는 것이 유용하게 사용됩니다. 두

brownbears.tistory.com

 

댓글