前に 記事 を書いた通り、Airflow で DAG 間の依存関係を扱う際は、下流の DAG が ExternalTaskSensor で上流の DAG の完了を待ち受けるのが良いと思っている。

この ExternalTaskSensor では、上流の DAG Run を一意に特定するために、上流の DAG ID と、上流の execution_date を得るための「下流と上流の DAG の execution_date の差(execution_delta)」または「上流の DAG の execution_date を返す関数(execution_date_fn)」を与える必要がある。

ただ、実用上は上流の DAG ID を指定するだけで直近の DAG Run を待ち受けるくらい簡単になってほしいので、次の通り PythonOperator と組み合わせてクラス化なりしておくと、 2 つの DAG の interval の大小関係に関わらず(ここが大事)、DAG からは上流の DAG ID を指定するだけで ExternalTaskSensor を使って依存関係を定義できるようになる。

続きを読む

ワークフローエンジンを運用していると、あるワークフローの完了後に別のワークフローを実行したい、という要求が出てくる。

Airflow では、DAG(ワークフロー)間の依存関係を扱う方法が以下の通り複数あるが、どの方法を選ぶのが良いのか一見分かりづらいように思う。1年くらい Airflow を運用して色々試した結果、自分のユースケースだとこれかなという方法が定まったので書いておく。

続きを読む

プロフィール画像

HIROKA Zaitsu

🎮 Fallout 76, Splatoon 3 / 🚙 SUBARU WRX S4

Data Scientist at GMO Pepabo, Inc.

Fukuoka, Japan