Azure Data Factoryを使って、新規・変更済みファイルのみをデータレイク間でコピーする。新規・変更済みファイルのみを移動対象とすることで、無駄なデータ移動が発生せず、料金も抑えることができるようになる。
準備
パイプライン
パラメーターを2つ用意する。このパラメーターは、トリガーから値を受け取る際に必要になる。受け渡された値の期間にファイルの変更があった場合、変更対象となりファイルがコピーされる。今回は次の名称にした。
- LastModified_From
- LastModified_To
アクティビティ
コピーアクティビティを使用し、ソースの設定項目を変更する。シンク側は特に設定する必要はないので省略した。
ソース
ソースに値を設定する。
「ファイルパスの種類」と「再帰的に実行」は、データレイクのルートフォルダを指定し、そこから全てのファイルを調べたいため設定した。この部分は、プロジェクトによって設定を変更してほしい。 「最終変更日時でフィルター」でトリガーから与えられる期間をファイルの変更検知対象とし、この期間に変更があったファイルだけコピーされるようになる。
- ファイルパスの種類
- ワイルドカード ファイル パス
- 最終変更日時でフィルター
- 開始時刻
@{pipeline().parameters.LastModified_From}
- 終了時刻
@{pipeline().parameters.LastModified_To}
- 開始時刻
- 再帰的に実行
- オン
トリガー
トリガーを作成し、作成したパイプラインに紐付ける設定をする。
トリガーの種類はTumblingWindowTrigger
を選択する。タンブリング ウィンドウトリガーで使用できるシステム変数@trigger().outputs.windowStartTime
と@trigger().outputs.windowEndTime
をパイプラインのパラメーターに渡す設定をおこなう。これでトリガー実行されるたびに、パイプラインの開始時刻の@{pipeline().parameters.LastModified_From}
とパイプラインの終了時刻の@{pipeline().parameters.LastModified_To}
が動的に変更されるようになる。
繰り返しを15分に設定した。こうすると15分毎にトリガーが実行される。
- 00:00~00:15
- 00:00~00:30
- 00:30~00:45
- 00:45~01:00
- 01:00~01:15
- ...続く
今回のパイプラインとの動きと関連してみると、各15分間の間に変更があったファイルが変更対象となり、コピーが実施される。トリガーの間隔はデータの更新頻度や想定されるデータ量などプロジェクトにあった値を設定する。
実行
実際に動かしたところを見てみる。
ソース元となるsource
データレイクの中身はルートディレクトリに1つのファイルが配置され、test_system_a
ディレクトリの中に2つファイルが配置されている。
実行すると、シンク先となるdestination
データレイクの中身に、同じ構成かつ同じファイルが配置されている。
1つのファイルに変更があった場合はそのファイル単独で実行される。例えばtest_system_a
ディレクトリの中のtarget_a.txt
だけ変更した場合は、そのファイルのみ最終更新日時が変わっていることがわかる。
注意点
記事のタイトルにもあるように、新規・変更済みファイルのみを対象とするので、ソース元からファイルが削除されても検知されない。これはファイルの更新日時を見てコピーするかしないかを判断しているためである。なので、データレイク間で完全同期を求める用途には適しておらず、そういった場合には料金とデータ量が増えてしまうが完全コピーするようなコピーアクティビティを作成するのがいいと思う。