よしたく blog

ほぼ週刊で記事を書いています

Azure Data Factory でデータレイクから最新日付のファイルを取得する

f:id:yoshitaku_jp:20210827174443p:plain Azureを使ってデータ分析基盤を構築すると、Azure Data Lake Storage Gen2にデータを溜めていく事が多いです。 この日々溜まっていくデータから最新日付のファイルのみを取り出したいときに使えるテクニックです。

前提条件

前提条件として、XXX_yyyyMMdd.csvZZZ_yyyyMMddHHmmss.csvといった、同じファイル名の末尾に日時の情報がくっついているデータを取得するケースとします。

準備

パイプライン

まずはパイプラインに変数を用意します。今回は最新日付のファイルだったので、latest_filename としました。

次にアクティビティを用意します。 今回使うのはGetMetadata変数の設定です。2つとも「全般」の中に存在しています。

変数とアクティビティの設定が終わると、画像のようになります。

f:id:yoshitaku_jp:20210827170605p:plain

データセット

ファイルが置いてあるディレクトリへのデータセットを作成します。

ファイルパスの部分には「ディレクトリ」を設定し、「ファイル」の設定はおこないません。

f:id:yoshitaku_jp:20210827171509p:plain

GetMetadataアクティビティに、データセットを設定し、フィールドリストに子項目を設定します。

f:id:yoshitaku_jp:20210827171654p:plain

変数の設定

変数の設定アクティビティの変数タブへ移動し、下記の設定をします。

  • 名前
    • latest_filename
    • @last(activity('get_files').output['childItems'])['name']

ここでは、latest_filenameという変数に @last(activity('get_files').output['childItems'])['name']の値を入れています。 分解していくと、activity('get_files')は一つ前のアクティビティです。activity('get_files').output['childItems']は一つ前のアクティビティの出力結果の子要素、今回はactivity('get_files')のデータセットに、フォルダを指定していたので複数のファイル群になります。 その複数のファイル群の中からlast関数の実行結果である最後のファイルの['name']latest_filenameという変数に格納していることになります。

実行

今回はフォルダ配下に、2ファイル配置しました。test_20210609110816.csvが取得できれば良さそうです。デバッグ実行してみます。

f:id:yoshitaku_jp:20210827173418p:plain

get_filesアクティビティの出力結果は2ファイル取れています。

f:id:yoshitaku_jp:20210827173534p:plain

set_filenameアクティビティの出力結果は1ファイル、そして最新のファイルの結果が取れています。

f:id:yoshitaku_jp:20210827173554p:plain

まとめ

Azure Data Factory でデータレイクから最新日付のファイルを取得してみました。 デフォルトの機能をうまく使いこなすことで、少し込み入ったことも出来るようになりそうです。

Reposでブランチを削除する方法

f:id:yoshitaku_jp:20210827153230p:plain

どこから削除ができるのか探してしまったのでメモ。

まずはReposの「Files」に、現在いるものとする。

f:id:yoshitaku_jp:20210827151938p:plain

左のReposメニューから「Branches」を選択する。

f:id:yoshitaku_jp:20210827152059p:plain

削除したいブランチの右に配置されている点から「Delete branch」を選択する。

f:id:yoshitaku_jp:20210827152410p:plain

確認画面が表示されるので削除したければ「Delete」をクリックする。

f:id:yoshitaku_jp:20210827152549p:plain

ブランチが削除される。

f:id:yoshitaku_jp:20210827152715p:plain

Azure Data FactoryのGit運用で困ったら取り入れてみること

Azure Data Factoryでは、パイプライン開発をバージョン管理するためにGit連携ができます。 GUIでの開発なのにバージョン管理できるのは、Data Factoryの実態がJSONファイルになっているからです。

Azure Data FactoryとAzure DevOpsなどを連携すると、デフォルトではmasterブランチとadf_publishブランチの2つが用意されます。 masterブランチは通常の開発として利用されるブランチになっていて、adf_publishブランチはAzure Data Factoryにおける本番リリース用となっています。

図解すると下記のようになります。 1人の開発者が1つのシステム連携を作成しているときは、シンプルで使いやすいものになります。

f:id:yoshitaku_jp:20210822180235p:plain

次に2人の開発者が2つのシステム連携を作成しているときは、どうでしょうか。 別々のシステム連携を作成しているとき、開発中は特に問題にはなりません。

f:id:yoshitaku_jp:20210822180650p:plain

しかし、adf_publishブランチへ本番リリースをしたいタイミングでは事情が変わってきます。 本番リリースをする際にはmasterブランチの作成物すべてが正常に動くか検証し、adf_publishブランチへマージします。 つまり、下記画像のように1人の開発者が作成物を本番リリースしたくても、もうひとりが開発中で中途半端な状態だとadf_publishブランチへマージすることができません。 最後まで完成させてもらうか、エラーが出ない切りのいい部分まで進めて貰う必要があります。 エラーが出ない切りのいい部分まで進めてもらったとしても、中途半端な成果物が本番リリースへと運ばれてしまいます。

f:id:yoshitaku_jp:20210822182415p:plain

この状態を回避するために、masterブランチから各システムごとのブランチを作成します。 この状態で開発をすすめることで、Bシステムでの開発が終わっていなくても、Aブランチの作業だけmasterに反映し、開発済みのキレイな作成物だけをadf_publishブランチへ持っていくことができます。

f:id:yoshitaku_jp:20210822183229p:plain

まとめ

Azure Data Factoryを使い始めた人は、Gitの部分よりもAzure Data Factoryになれるために、ひとまずGitを横へ置いてどんどん開発していくと思います。 自分も開発に慣れてきて、さらには新しい人も入ってきて開発のスピードを上げていこうとしたタイミングで起こりうる問題として、解決策を取り上げてみました。 まずは開発のスピードを落とさず、問題が起きないようにするための第一歩として、参考になれば幸いです。 ここからさらにGit-flowといったものへ目を移していけばいいと思っています。

リレーショナルデータベースにおけるカーディナリティについて

カーディナリティは、データベースのカラムに入っているデータの種類がどれぐらい存在しているかを表す。 もともとの英単語の訳としては濃度という訳が当てはまる。

例えば、genderカラムが存在していたとする。単純な話としたいので、このカラムに入ってくる値は男か女とする。

id gender
1
2

10人分のデータがあったとしても入っている種類は男か女で種類が2つしかない。

id gender
1
2
3
4
5
6
7
8
9
10

こういった場合はデータの種類が少ない、つまりカーディナリティが低いと言います。

もう一つ、カーディナリティが低い例として血液型がある。 bloodカラムがあったとして、ここに入ってくるデータは4種類。 つまり - A - B - O - AB

となる。

id blood
1 O
2 A
3 AB
4 AB
5 B
6 A
7 A
8 O
9 AB
10 B

逆にカーディナリティが高い状態はgenderカラムとbloodカラムの説明でつけたidカラムを言う。 idカラムは1から10の数値を示している。 このように1から順番に上がっていくサロゲートキーのような数値は、1から10まで種類があるのでカーディナリティが高い状態と言える。

ストレングスファインダーをやってみた

「さあ、才能(じぶん)に目覚めよう 新版 ストレングス・ファインダー2.0」を買って、実践してみた。

この本は自分の才能に気がつき、今後の人生をより良い方向へ進める指針となるものが分かる本になっている。 目次は次のようになっている。

  • 第I部 まず、あなたの強みを見つけよう
  • 第II部 あなたの強みを活用しよう――34の資質と行動アイデア

1部では、なぜ才能に気づかなければいけないかが端的に書いてある。この部分は30ページほどだが、すぐにストレングスファインダーを受けたくなるように駆り立てる感じがあった。 2部では、ストレングスファインダーを実施し自分の資質を把握したあとに読むものとなっている。全て読んでも良いものなのだろうが、自分の資質とは関係がないのでひとまずは読まなくても良さそう。 「この資質を持っている人との付き合い方」といったような項目もあるので、友人と結果をシェアした際に読むと盛り上がりそうだ。

実際のストレングスファインダーは巻末にあるアクセスコードを使ってWebサイトから実施する。実施の時間は40分目安と書いてあり、テンポよく実施したつもりだったが自分は35分ほどかかった。深く考えないように1問20秒の制限もあるが、深く考えすぎる進めていくほうが良さそう。

結果

自分の5つの項目は以下となった。

  1. 原点思考
  2. 回復思考
  3. 社交性
  4. 慎重さ
  5. 学習欲

この結果にはあまり驚いておらず、普段自分自身に対して思っていることと同じで安堵する気持ちがあった。

原点思考は過去をふりかえる資質になる。過去を振り返ることで計画段階の原型だったり、その中から意図を知ることができる。 今の段階の現状を知らされたときに、まず最初に聞くことは「なぜ」「どうして」といった言葉が多いことに改めて気がついた。 何事にも軌跡があり、過去と現在があるからこそ、未来を描けていけるようなイメージを自分は持っている。 ソフトウェア開発においても、「昔はこうだったが、今はこうなっている、理由はこういうことだからである」と言われたほうがすんなり入っていくし、頭にも残っていることが多い。 こういった特性を改めて認識できると、例えば本を読んで学習する際にも歴史的な流れが記されているものを手にとったほうが良さそうといったことが自分の中でわかってくる。

回復思考は、要は問題解決することが好きという資質になる。 現在もエンジニアとして仕事をしているが、IT技術で課題を解決していくということでとてもあっていると感じた。 逆に言うと、問題解決が伴わない単純に作業などは苦手であることを再認識できた。できればそういうことからは離れていこうと思った。 今後、進んでいくべき方向としては、知識を高めてスキルをどうやって高めていくか、それをどうやって問題解決能力に転換していくのかが、人生単位では必要になっていくんじゃないかと思った。

まとめ

ストレングスファインダーの紹介と、想像がつかない「原点思考」と「回復思考」についてどんな事が書かれており、それを受けて自分がどう感じたかを書いてみた。 自己分析だったり、今後の人生の指針に悩んでいる人がいたら参考になると思うのでぜひやってほしいと思った。

Azure SQL Databaseの監査ログを取得する

Azure SQL Databaseでは監査ログをかんたんに取得することができる。文字通り監査のタイミングで必要になるので、基本的には取得しておいたほうがいいものになる。Azure SQL Databaseではデータベースのイベントを追跡し、Blobストレージに監査ログを書き込む。

SQL Databaseにアクセスし、「セキュリティ」の「監査」へアクセスする。 「Azure SQL 監査を有効にする」をオンにする。

f:id:yoshitaku_jp:20210730220615p:plain

その後、出力先の「ストレージ」にチェックを入れ、対象のBlobストレージがある「サブスクリプション」と「Blobストレージ」を選択する。

f:id:yoshitaku_jp:20210730220420p:plain

設定が完了するとBlobストレージに専用のコンテナが作成されている。

f:id:yoshitaku_jp:20210730221344p:plain

Apache AirflowのDAGファイルの最小設定

Apache Airflowで自作ファイルを作成しようとしたが、設定できる項目多く迷うことが多かった。そこでチュートリアルで用意されているものから最低限必要なものを抜き出してみた。 それが以下になる。

from datetime import timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago

with DAG(
    "hello_airflow",
    schedule_interval=timedelta(days=1),
    start_date=days_ago(2),
) as dag:

    t1 = BashOperator(
        task_id="hello_world",
        bash_command="date",
    )

t1

DAGの中の設定値

with DAGの中には次のものがないと、正しく設定ファイルを読み取ってくれなかったり、エラーなる。エラーはAirflowの管理画面に表示されるので認識しやすい。しかし、値を正しく読み取ってくれないパターンはわかりやすく表示してくれるわけではないので注意が必要ということがわかった。

  • DAGの名称
  • 実行する間隔
  • 最初の実行日

これらはDAG Detailsの中に表示される。

f:id:yoshitaku_jp:20210725225025p:plain

DAGの後

as dagのあとにコロンが付いていて、そのまま実際におこなう処理を記述する。直列で処理をつなげることもできるし、並列で動かすこともできる。今回の例ではわかりやすくするために1つとしている。また、処理の中に記述するものも最低限のものに絞った。

まとめ

まずは感嘆なDAGファイルを作成した。このあと、後続に処理をつなげたり、並列実行してみたり、また様々なデータソースへもつないでいってみようと思う。

Apache Airflowのチュートリアルを実行してみた

Airbnb 社が開発し、今は Apache ソフトウェア財団のトッププロジェクトとなっている Apache Airflow。 今回は業務の中でワークフロー製品を扱っていることもあり、OSS の Airflow の感触を確かめるべく触ってみた。

準備

Airflow の公式ページでインストール方法が 2 つ記されている。

  • ローカル PC での起動
  • Docker での起動

airflow.apache.org

Docker が用意されているので、Docker で実行する。公式が用意してくれているのはとても助かる!

mkdir workspace/sandbox-airflowとして今回のお試しディレクトリを作成し、そこにdocker-compose.yamlをダウンロードした。

2021/07/18 現在用意されていたものを貼っておく。最新版は公式のページから確認を! curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.1.2/docker-compose.yaml'

起動

docker-compose upでコンテナは 7 つ起動する。

  • airflow-init
    • Airflow を初期化するためのコンテナ
  • airflow-scheduler
    • すべてのタスクと DAG(Direct acyclic graph)を監視し、依存関係が完了するとタスクインスタンスをトリガーするコンテナ
      • DAG についての詳しい説明はこちらに載っていたのでリンクさせておく
  • airflow-webserver
  • airflow-worker
    • スケジューラーによって指定されたタスクを実行するワーカーのコンテナ
  • flower
    • 環境を監視するための GUI アプリケーション flower のコンテナ
    • http://localhost:5555 でアクセスができる
  • postgres
  • redis

自分は Mac で実行したが、Airflow を実行している間はファンが大きく回ったのでマシンスペックは必要かもしれない。

サンプルアプリ

http://localhost:8080 にアクセスすると、ログイン画面が表示される。

このチュートリアル用 Docker ではユーザ名パスワードともに airflow なので、こちらを入力する。

  • ユーザ名
    • airflow
  • パスワード
    • airflow

f:id:yoshitaku_jp:20210718094043p:plain

ログインが完了すると、ワークフローが並んでいる画面になる。

f:id:yoshitaku_jp:20210718094059p:plain

一番上にある「example_bash_operator」をクリックすると、詳細の画面に移って確認することができる。 これはすでに実行済みの画面なので、初回アクセスとは情報に差分があるかもしれない。

f:id:yoshitaku_jp:20210718094114p:plain

グラフの形で実行順序を確認もできる。

f:id:yoshitaku_jp:20210718094134p:plain

カレンダーの形で実行日を記録もされている。 GitHubの草の用な感じで表示されるので馴染み深い部分がある。

f:id:yoshitaku_jp:20210718094149p:plain

この画面は設定の詳細の画面になる。 Filepathの部分に/home/airflow/.local/lib/python3.6/site-packages/airflow/example_dags/example_bash_operator.pyがある。 ここに、このサンプルワークフローの設定が入っている。 これは自分がなかなか見つけられなかったものなので、参考になれば嬉しい。

f:id:yoshitaku_jp:20210718094202p:plain

【Pandas】GroupBy.first関数の動きを確認する

Pandas のGroupBy.first を使うと、グループの中で一番最初の値を取得できる。

pandas.pydata.org

以前SQLの分析関数であるFIRST_VALUEとの動きを確認した。Pandasでどのように実現していくのか見ていくものになる。

yoshitaku-jp.hatenablog.com

実行環境

今回は環境構築がいらないGoogle Colaboratoryを使った。

colab.research.google.com

準備と確認

まずはPandasのインポートと、はじめに使用するデータを用意する。 データはFIRST_VALUE関数で確認したものと同じものにした。

import pandas as pd

dict1=dict(position=['FW','FW','MF','MF','MF','MF','DF','DF','DF','DF','GK'], score=[10,5,4,3,3,2,3,2,1,0,0])
df1 = pd.DataFrame(data=dict1)
   position  score
0        FW     10
1        FW      5
2        MF      4
3        MF      3
4        MF      3
5        MF      2
6        DF      3
7        DF      2
8        DF      1
9        DF      0
10       GK      0

GroupBy.firstを実行する

そのままGroupBy.first関数を実行し結果を確認する。

df1.groupby(['position'])['score'].first()
position
DF     3
FW    10
GK     0
MF     4
Name: score, dtype: int64

グループ化された中で一番最初の値だけが戻り値となっている。 SQLはデータの集合に対して関数を適用し、列を返していたがPandasでは該当の値だけが返ってくる。

ドキュメントにも

Returns Series or DataFrame Computed first of values within each group.

戻り値 系列またはデータフレーム 各グループ内の値の最初の値を計算します。

と書かれていて、動きとしては間違っていないことがわかる。

GroupBy.first関数の値を元のデータフレームに戻してみる

取り出した最初の値を、データフレームにし、カラム名も付与する。

first_score_df = df1.groupby(["position"]).first().reset_index()
first_score_df.columns = ["position", "first_score"]
first_score_df

その後、Pandasのmergeでデータフレーム同士をinner joinで結合すれば完成する。

pd.merge(df1, first_score_df, on="position")
   position  score  first_score
0        FW     10           10
1        FW      5           10
2        MF      4            4
3        MF      3            4
4        MF      3            4
5        MF      2            4
6        DF      3            3
7        DF      2            3
8        DF      1            3
9        DF      0            3
10       GK      0            0

まとめ

GroupBy.first関数を使いながら動きを確認できた。 LAST_VALUE 関数のように使いたい場合はGroupBy.lastを使うと逆の動きになってくれる。

pandas.pydata.org

👇SQLのFIRST_VALUE関数とLAST_VALUE関数はこちら👇

yoshitaku-jp.hatenablog.com

【Pandas】shift関数の動きを確認する

Pandas のshift を使うと、現在の行の値と前後の行の値を比較できる。

pandas.pydata.org

以前SQLの分析関数であるLAGとLEADの動きを確認したが、Pandasではどのように実現していくのか見ていくものになる。LAG関数とLEAD関数も比較して見てもらえると!

yoshitaku-jp.hatenablog.com

実行環境

今回は環境構築がいらないGoogle Colaboratoryを使った。

colab.research.google.com

準備と確認

まずはPandasのインポートと、はじめに使用するデータを用意する。 データはLAG関数で確認したものと同じものにした。

import pandas as pd

dict1=dict(id=[1,2,3], Money=[1000,2000,3000], day=['2021-01-01','2021-02-01','2021-03-01'])
df1 = pd.DataFrame(data=dict1)
df1
   id  Money         day
0   1   1000  2021-01-01
1   2   2000  2021-02-01
2   3   3000  2021-03-01

shiftを実行する

そのままshift関数を実行し結果を確認する。

df1.shift()
    id   Money         day
0  NaN     NaN         NaN
1  1.0  1000.0  2021-01-01
2  2.0  2000.0  2021-02-01

ここでわかることは、1行まるごとshiftされていることである。 id列、Money列、day列全てが1行分ズレていている。 単純に実行しただけではSQLのLAG関数のように「特定列だけズレる」ようには動いてくれないことがわかった。

特定の列だけズラす

特定の列だけズラすために、データを変更する。 変更としては、

  • idとMoneyを6個分増やす
  • day列を削除し、代わりの日付データをインデックスとして再定義した。

shift関数へもパラメータを設定する変更を加えている。 今回はperiodsとfreqを指定する。

公式ドキュメントからDeepL翻訳したものを掲載する。 periodsはLAG関数のoffsetと同じもので、ズラす数を書く。1と書けば1つズレるし、3と書けば3つ分ズレることになる。

periods int シフトする期間の数。正または負を指定できます。

freqはLAG関数には無いものになる。 時系列データに対して、freqでずらす期間を指定できる。 コードの中ではDを指定していて、日単位でズラすようにしている。

freq DateOffset, tseries.offsets, timedelta, or str, optional tseriesモジュールまたはタイムルールから使用するオフセット(例:'EOM')。freqが指定された場合、インデックス値はシフトされますが、データは再調整されません。つまり、シフト時にインデックスを拡張し、>元のデータを維持したい場合は、freqを使用します。freqが "infer "として指定された場合は、インデックスのfreqまたはinferred_freq属性から推測されます。これらの属性が存在しない場合、ValueErrorが投げられます。

実行しているコードにコメントを差し込んでいるので、実際に何をコードを確認してほしい。

# ディクショナリでidとMoneyをdict2変数に定義
dict2=dict(id=[1,2,3,4,5,6], Money=[1000,2000,3000,4000,5000,6000])

# date_range関数で2021-01-01から6日分をday変数に格納
day=pd.date_range("2021-01-01", periods=6)

# dict2とdayを元にDataFrameを作成
df2 = pd.DataFrame(data=dict2,index=day)

# periodsで1ずつ、さらにfreq="D"で日付ごとズラすことを指定し、Money列の値を取得、lag列に入れる
df2['lag'] = df2.shift(periods=1,freq="D")['Money']
df2
            id  Money     lag
2021-01-01   1   1000     NaN
2021-01-02   2   2000  1000.0
2021-01-03   3   3000  2000.0
2021-01-04   4   4000  3000.0
2021-01-05   5   5000  4000.0
2021-01-06   6   6000  5000.0

グループ内でshiftする

LAG関数はpartition byでグループ化したものをズラすことが出来た。 こちらもshift関数では少し工夫が必要になる。

データはDate列を戻し、グループ化させたいので下記データを2つずつ用意した。

  • 2021-01-01
  • 2021-02-01
  • 2021-03-01

再びコードの中で何をしているかはコメントを差し込んでいるので確認してほしい。

# ディクショナリでidとMoneyとDateをdict3変数に定義
dict3=dict(id=[1,2,3,4,5,6], Money=[1000,2000,3000,4000,5000,6000], Date=['2021-01-01','2021-01-01','2021-02-01','2021-02-01','2021-03-01','2021-03-01'])

# dict3を元にDataFrameを作成
df3 = pd.DataFrame(data=dict3)

# df3のDate列を対象にgroupbyを実行、さらにMoney列をshiftしたものをlag列に入れる
df3['lag'] = df3.groupby(['Date'])['Money'].shift()
df3

無事にpartition byと同じことが出来ている事がわかる。

   id  Money        Date     lag
0   1   1000  2021-01-01     NaN
1   2   2000  2021-01-01  1000.0
2   3   3000  2021-02-01     NaN
3   4   4000  2021-02-01  3000.0
4   5   5000  2021-03-01     NaN
5   6   6000  2021-03-01  5000.0

NaNの代わりに値を設定する

shift(fill_value=0)のようにパラメータのfill_valueを使うと、NaNにデフォルトの値としてセットすることができる。 今回は0を設定している。

dict3=dict(id=[1,2,3,4,5,6], Money=[1000,2000,3000,4000,5000,6000], Date=['2021-01-01','2021-01-01','2021-02-01','2021-02-01','2021-03-01','2021-03-01'])
df3 = pd.DataFrame(data=dict3)
df3['lag'] = df3.groupby(['Date'])['Money'].shift(fill_value=0)
df3
   id  Money        Date   lag
0   1   1000  2021-01-01     0
1   2   2000  2021-01-01  1000
2   3   3000  2021-02-01     0
3   4   4000  2021-02-01  3000
4   5   5000  2021-03-01     0
5   6   6000  2021-03-01  5000

まとめ

shift関数を使いながら動きを確認できた。 LEAD 関数のように使いたい場合はshift(-1)のようにマイナス値を設定すると上にずれてくれる。

dict4=dict(id=[1,2,3,4,5,6], Money=[1000,2000,3000,4000,5000,6000], Date=['2021-01-01','2021-01-01','2021-02-01','2021-02-01','2021-03-01','2021-03-01'])
df4 = pd.DataFrame(data=dict4)
df4['lag'] = df4.groupby(['Date'])['Money'].shift(-1,fill_value=0)
print(df4)
   id  Money        Date   lag
0   1   1000  2021-01-01  2000
1   2   2000  2021-01-01     0
2   3   3000  2021-02-01  4000
3   4   4000  2021-02-01     0
4   5   5000  2021-03-01  6000
5   6   6000  2021-03-01     0

そのまま手を動かせるものにしたので、実行して確認してもらえれば👌

👇SQLのLAG関数とLEAD関数はこちら👇

yoshitaku-jp.hatenablog.com