よしたく blog

ほぼ週刊で記事を書いています

Azure Data Factoryを使って、新規・変更済みファイルのみをデータレイク間でコピーする

Azure Data Factoryを使って、新規・変更済みファイルのみをデータレイク間でコピーする。新規・変更済みファイルのみを移動対象とすることで、無駄なデータ移動が発生せず、料金も抑えることができるようになる。

準備

パイプライン

パラメーターを2つ用意する。このパラメーターは、トリガーから値を受け取る際に必要になる。受け渡された値の期間にファイルの変更があった場合、変更対象となりファイルがコピーされる。今回は次の名称にした。

  • LastModified_From
  • LastModified_To

f:id:yoshitaku_jp:20220107172214p:plain

アクティビティ

コピーアクティビティを使用し、ソースの設定項目を変更する。シンク側は特に設定する必要はないので省略した。

ソース

ソースに値を設定する。

「ファイルパスの種類」と「再帰的に実行」は、データレイクのルートフォルダを指定し、そこから全てのファイルを調べたいため設定した。この部分は、プロジェクトによって設定を変更してほしい。 「最終変更日時でフィルター」でトリガーから与えられる期間をファイルの変更検知対象とし、この期間に変更があったファイルだけコピーされるようになる。

  • ファイルパスの種類
  • 最終変更日時でフィルター
    • 開始時刻
      • @{pipeline().parameters.LastModified_From}
    • 終了時刻
      • @{pipeline().parameters.LastModified_To}
  • 再帰的に実行
    • オン

f:id:yoshitaku_jp:20220107174358p:plain

トリガー

トリガーを作成し、作成したパイプラインに紐付ける設定をする。

トリガーの種類はTumblingWindowTriggerを選択する。タンブリング ウィンドウトリガーで使用できるシステム変数@trigger().outputs.windowStartTime@trigger().outputs.windowEndTimeをパイプラインのパラメーターに渡す設定をおこなう。これでトリガー実行されるたびに、パイプラインの開始時刻の@{pipeline().parameters.LastModified_From}とパイプラインの終了時刻の@{pipeline().parameters.LastModified_To}が動的に変更されるようになる。

f:id:yoshitaku_jp:20220107180710p:plain

繰り返しを15分に設定した。こうすると15分毎にトリガーが実行される。

  • 00:00~00:15
  • 00:00~00:30
  • 00:30~00:45
  • 00:45~01:00
  • 01:00~01:15
  • ...続く

今回のパイプラインとの動きと関連してみると、各15分間の間に変更があったファイルが変更対象となり、コピーが実施される。トリガーの間隔はデータの更新頻度や想定されるデータ量などプロジェクトにあった値を設定する。

実行

実際に動かしたところを見てみる。 ソース元となるsourceデータレイクの中身はルートディレクトリに1つのファイルが配置され、test_system_aディレクトリの中に2つファイルが配置されている。

f:id:yoshitaku_jp:20220109101211p:plain

f:id:yoshitaku_jp:20220109101152p:plain

実行すると、シンク先となるdestinationデータレイクの中身に、同じ構成かつ同じファイルが配置されている。

f:id:yoshitaku_jp:20220109101226p:plain

f:id:yoshitaku_jp:20220109101238p:plain

1つのファイルに変更があった場合はそのファイル単独で実行される。例えばtest_system_aディレクトリの中のtarget_a.txtだけ変更した場合は、そのファイルのみ最終更新日時が変わっていることがわかる。

f:id:yoshitaku_jp:20220109103608p:plain

f:id:yoshitaku_jp:20220109103624p:plain

注意点

記事のタイトルにもあるように、新規・変更済みファイルのみを対象とするので、ソース元からファイルが削除されても検知されない。これはファイルの更新日時を見てコピーするかしないかを判断しているためである。なので、データレイク間で完全同期を求める用途には適しておらず、そういった場合には料金とデータ量が増えてしまうが完全コピーするようなコピーアクティビティを作成するのがいいと思う。