前に 記事 を書いた通り、Airflow で DAG 間の依存関係を扱う際は、下流の DAG が ExternalTaskSensor で上流の DAG の完了を待ち受けるのが良いと思っている。
この ExternalTaskSensor では、上流の DAG Run を一意に特定するために、上流の DAG ID と、上流の execution_date を得るための「下流と上流の DAG の execution_date の差(execution_delta)」または「上流の DAG の execution_date を返す関数(execution_date_fn)」を与える必要がある。
ただ、実用上は上流の DAG ID を指定するだけで直近の DAG Run を待ち受けるくらい簡単になってほしいので、次の通り PythonOperator と組み合わせてクラス化なりしておくと、 2 つの DAG の interval の大小関係に関わらず(ここが大事)、DAG からは上流の DAG ID を指定するだけで ExternalTaskSensor を使って依存関係を定義できるようになる。