ワークフローエンジンを運用していると、あるワークフローの完了後に別のワークフローを実行したい、という要求が出てくる。

Airflow では、DAG(ワークフロー)間の依存関係を扱う方法が以下の通り複数あるが、どの方法を選ぶのが良いのか一見分かりづらいように思う。1年くらい Airflow を運用して色々試した結果、自分のユースケースだとこれかなという方法が定まったので書いておく。

「DAG_1 の完了後に DAG_2 を実行する」方法👇

  • DAG_2 が ExternalTaskSensor で DAG_1 の完了を待ち受ける
  • DAG_1 が TriggerDagRunOperator で DAG_2 を起動する
  • DAG_2 が SubDagOperator で DAG_1 をタスク(SubDag)として実行する

結論

DAG_2 が ExternalTaskSensor で DAG_1 の完了を待ち受ける のが良い。

メリットは主に、DAG_2 がスケジュール(interval)を持つため DAG_2 にタイムアウトを設定できることと、DAG_2 と DAG_1 両方のコード/タスク構成で他の DAG に依存している/されていることが明らかになること。

特に、Airflow では DAG が interval を持っていないとタイムアウトを設定しても効かないが、タイムアウトが設定できないと運用的にかなり厳しい。「タスクが詰まって DAG が n 日間 running のままでした」とかはやりたくない。また、DAG が interval を持っていると、定期実行の有無によって処理を分けるみたいな時も素直に実装できる。

# DAG 1

from datetime import datetime
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator

with DAG(
    dag_id='dag_1',
    default_args={'start_date': datetime(2020, 1, 1, 0, 0, 0), ...},
    ...) as dag:

        tasks = ... # 実行したいタスク群
        external_task_sensor_target = DummyOperator(task_id='external_task_sensor_target')
        tasks >> external_task_sensor_target
# DAG 2

from datetime import datetime, timedelta
from airflow.models import DAG
from airflow.operators.sensors import ExternalTaskSensor

with DAG(
    dag_id='dag_2',
    default_args={'start_date': datetime(2020, 1, 1, 1, 0, 0), ...},
    ...) as dag:

        wait_task_for_dependent_dag = ExternalTaskSensor(
            task_id='wait_task_for_dependent_dag',
            external_dag_id='dag_1',
            external_task_id='external_task_sensor_target',
            execution_delta=timedelta(hours=1))  #  DAG_1 との execution_date の差
        tasks = ... # 実行したいタスク群
        wait_task_for_dependent_dag >> tasks

他の方法を採らない理由

DAG_1 が TriggerDagRunOperator で DAG_2 を起動する 方法は、DAG_2 側のコード/タスク構成から他の DAG に依存していることが分からない、本来は DAG_1 の内容を前提としてタスクを実行することが DAG_2 の責務なのに DAG_2 の起動(DAG_2 が DAG_1 の内容を前提とすること)までが DAG_1 の責務になる、DAG_2 が interval を持たないため DAG にタイムアウトを設定できない上に実際の状態(定期実行している)と DAG のインスタンスの状態(interval を持たない)が乖離する、といった問題がある。

DAG_2 が SubDagOperator で DAG_1 をタスク(SubDag)として実行する 方法は、DAG_1 に依存する DAG が1つに制約される。また、以下に引用する Airflow の SaaS 各社と Airflow のドキュメントにもある通り、単一のタスクとして DAG を実行することによる問題が生じたり、タスクの直列実行が推奨されていて SubDag の実行に時間が掛かったりする。SaaS 各社では SubDagOperator は非推奨になっている。

SubDagOperator の使用はおすすめしません。 SubDagOperator によってカプセル化は実現できますが、SubDag タスクでタスクスロットが必要になります。SubDag タスクを実行しているワーカーが終了すると、SubDag 内のすべてのタスクが失敗し、ワークフローの信頼性が低下します。

ref. DAG(ワークフロー)の作成  |  Cloud Composer  |  Google Cloud

Slots on the worker pool The SubDagOperator kicks off an entire DAG when it is put on a worker slot. Each task in the child DAG takes up a slot until the entire SubDag has completed. The parent operator will take up a worker slot until each child task has completed. This could cause delays in other task processing

ref. Using SubDAGs in Airflow | Apache Airflow Guides

it is possible to specify an executor for the SubDAG. It is common to use the SequentialExecutor if you want to run the SubDAG > in-process and effectively limit its parallelism to one. Using LocalExecutor can be problematic as it may over-subscribe your worker, running multiple tasks in a single slot

ref. Concepts — Airflow Documentation