Airflow で上流の DAG ID を指定するだけで直近の DAG Run を待ち受けるタスク群を作る

前に 記事 を書いた通り、Airflow で DAG 間の依存関係を扱う際は、下流の DAG が ExternalTaskSensor で上流の DAG の完了を待ち受けるのが良いと思っている。

この ExternalTaskSensor では、上流の DAG Run を一意に特定するために、上流の DAG ID と、上流の execution_date を得るための「下流と上流の DAG の execution_date の差(execution_delta)」または「上流の DAG の execution_date を返す関数(execution_date_fn)」を与える必要がある。

ただ、実用上は上流の DAG ID を指定するだけで直近の DAG Run を待ち受けるくらい簡単になってほしいので、次の通り PythonOperator と組み合わせてクラス化なりしておくと、 2 つの DAG の interval の大小関係に関わらず(ここが大事)、DAG からは上流の DAG ID を指定するだけで ExternalTaskSensor を使って依存関係を定義できるようになる。

  1. PythonOperator で、2 つの DAG の execution_delta を計算する
    • PythonOperator で実行する関数には任意の引数を与えられるため、上流の DAG ID を DAG から指定できる
    • PythonOperator で実行する関数には context を引数に与えられる = 下流の execution_date を与えられるため、下流の interval が上流の interval より小さい場合も execution_delta を計算できる(後述)
  2. ExternalTaskSensor の execution_date_fn 引数に与える関数で、 delta と下流の execution_date から上流の execution_date を計算する
    • PythonOperator で計算した execution_delta は xcom 経由で受け取る
      • timedelta, relativedelta は xcom で扱えないので json にして扱う

書いてみるとシンプルだけど、個人的に ExternalTaskSensor の引数は execution_delta に慣れていて、execution_date_fn の使い方に戸惑った(仕事で話題に上るまで存在も忘れてた)ので、誰かの役に立つかもということで書いておく。

補足: ExternalTaskSensor の引数と、依存関係にある DAG の interval の大小関係の話

ExternalTaskSensor では、上流の DAG Run を一意に特定するために、次のいずれかの引数を与える必要がある。

  • execution_delta に下流と上流の DAG の execution_date の差を与える
  • execution_date_fn に「上流の DAG の execution_date を返す関数」を与える

前者の execution_delta 引数を使うのは比較的わかりやすくて、直近で実行した上流 DAG を待ち受ける前提だと、「下流と上流の DAG の interval が同じ」か「下流の interval が上流の interval より大きい」場合は 2 つの DAG の start_date と interval から execution_delta をタスク外で計算できるので、運用している interval の組み合わせがこれらだけであれば execution_delta 引数だけ使えばよい。

一方、「下流の interval が上流の interval より小さい」場合は実行ごとに execution_delta が変わるため、タスク内で下流の DAG の execution_date を用いて execution_delta を計算する必要がある。execution_date_fn に与える関数は引数に「自身(下流)の execution_date と context」を取れるのでこれを使いたくなるが、「上流の DAG のスケジュール」は引数に取れないので、PythonOperator と組み合わせる必要があるのでした。