Cloud Composer へのワークフローのデプロイを簡単に行うために、Go で trinity というツールを作りました。

Cloud Composer とは

Apache Airflow を使ったワークフロー実行環境を構築する GCP のサービスです。私が働いているペパボでは DWH のバックエンドを Treasure Data から GCP に移行中で、それに伴いワークフローサービスも Treasure Workflow (マネージドな Digdag) から Cloud Composer に移行しているところです。

gcloud コマンドを使ったデプロイの概要

Cloud Composer では dags(*1) ディレクトリ配下にワークフローごとのサブディレクトリを切ってワークフローに関連するファイルを入れていきます。gcloud コマンドではこのサブディレクトリ単位またはファイル単位でデプロイ操作を行います。

*1 Airflow ではタスク同士の依存関係を有向非巡回グラフ(Directed Acyclic Graph, DAG)で定義します。

ワークフローを追加または更新する場合は、 gcloud composer emvironments storage dags import で source にサブディレクトリを指定します。ディレクトリに含まれるファイルが Cloud Storage へアップロードされ、Airflow にワークフローが登録されます。

$ gcloud composer environments storage dags import
--environment ENVIRONMENT_NAME
--location LOCATION
--source LOCAL_FILE_TO_UPLOAD

ワークフローを削除する場合は、2段階の操作が必要です。gcloud composer environments storage dags delete でファイルを指定して Cloud Storage から関連ファイルを削除した後に、gcloud composer environments run で Airflow のコマンド delete_dag を実行して Airflow からワークフローを削除します。

$ gcloud composer environments storage dags import
--environment ENVIRONMENT_NAME
--location LOCATION
--source LOCAL_FILE_TO_UPLOAD $ gcloud composer environments run --location LOCATION
ENVIRONMENT_NAME delete_dag -- DAG_NAME

Cloud Composer へのデプロイ時の困りごと

上に述べた通り、gcloud コマンドの実行はワークフローやファイルに対して個別に行う必要があります。また、import は Cloud Storage のファイルを上書きするため、コードベースで削除したファイルは個別に Cloud Storage から削除する必要があります。

しかし、ペパボのワークフローは現時点でも数十個に上り、開発によって日々差分が生まれていきます。これらの差分に対して手作業でデプロイ作業を行うのは非効率かつ不確実であり、差分を機械的に検出して Cloud Composer に冪等・同期的にデプロイしたいというモチベーションが生まれました。

既存の方法による解決の試み

コードベースと Cloud Composer を同期する方法として、gsutil rsync コマンドで手元のコードベースをCloud Storage に同期することと、Git リポジトリを Cloud Storage に同期する ことを検討しましたが、今回のユースケースには合わなかったため見送りました。

gsutil rsync コマンド

ファイルの更新時刻に差異があると同期対象と判定されるため、内容が変更されていない場合も処理対象になってしまいます。また、Cloud Storage に依存することになりますが、Airflow は GCP 以外でも構築できるため、将来的なポータビリティを保てるよう GCP のコマンドに依存することは避けたいと考えました。

Git リポジトリを Cloud Storage に同期する

例えば Airflow がサポートしている kubernetes/git-sync を使う場合は、Airflow の設定ファイルに単一の git リポジトリを指定することになります。本番環境に master のコードを同期するには良さそうですが、開発環境や CI では feature branch のコードをデプロイしたいので見送りました。

trinity による解決の試み

trinity は、コードベースと Cloud Storage と Airflow の3つを同期するために、ディレクトリ構造とファイル内容からワークフロー定義を表すハッシュ値を計算します。コードベースから計算したハッシュ値と、Cloud Storage に保存されているハッシュ値を比較して、異なる場合は同期操作の対象とします

使い方は、Cloud Storage の dags ディレクトリに同期したいディレクトリ、バケット、Cloud Composer 環境を指定するだけです。

$ trinity --src=SOURCE --bucket=BUCKET_NAME
--composer-env=ENV_NAME --composer-location=LOCATION

Go 製なので、クロスコンパイルにより、ペパボ社内で利用されている Mac, Linux, Windows に対応可能になっています。また、ワークフロー単位で並列化が可能なので、将来的に goroutine で並列化することにより処理時間の短縮を図る予定です。

今後について

  • Airflow のデプロイの対象として dags 以外に plugins に対応する
  • dry-run など運用に必要になる機能を追加する
  • 並列化による高速化

などを予定しています。

発表資料

trinity について Fukuoka.go#14+Umeda.go で発表しました。 こちら が発表資料です。

また、発表中の説明に一部誤りがあったので訂正しております。