よしたく blog

ほぼ週刊で記事を書いています

Azure SQL Databaseの監査ログを取得する

Azure SQL Databaseでは監査ログをかんたんに取得することができる。文字通り監査のタイミングで必要になるので、基本的には取得しておいたほうがいいものになる。Azure SQL Databaseではデータベースのイベントを追跡し、Blobストレージに監査ログを書き込む。

SQL Databaseにアクセスし、「セキュリティ」の「監査」へアクセスする。 「Azure SQL 監査を有効にする」をオンにする。

f:id:yoshitaku_jp:20210730220615p:plain

その後、出力先の「ストレージ」にチェックを入れ、対象のBlobストレージがある「サブスクリプション」と「Blobストレージ」を選択する。

f:id:yoshitaku_jp:20210730220420p:plain

設定が完了するとBlobストレージに専用のコンテナが作成されている。

f:id:yoshitaku_jp:20210730221344p:plain

Apache AirflowのDAGファイルの最小設定

Apache Airflowで自作ファイルを作成しようとしたが、設定できる項目多く迷うことが多かった。そこでチュートリアルで用意されているものから最低限必要なものを抜き出してみた。 それが以下になる。

from datetime import timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago

with DAG(
    "hello_airflow",
    schedule_interval=timedelta(days=1),
    start_date=days_ago(2),
) as dag:

    t1 = BashOperator(
        task_id="hello_world",
        bash_command="date",
    )

t1

DAGの中の設定値

with DAGの中には次のものがないと、正しく設定ファイルを読み取ってくれなかったり、エラーなる。エラーはAirflowの管理画面に表示されるので認識しやすい。しかし、値を正しく読み取ってくれないパターンはわかりやすく表示してくれるわけではないので注意が必要ということがわかった。

  • DAGの名称
  • 実行する間隔
  • 最初の実行日

これらはDAG Detailsの中に表示される。

f:id:yoshitaku_jp:20210725225025p:plain

DAGの後

as dagのあとにコロンが付いていて、そのまま実際におこなう処理を記述する。直列で処理をつなげることもできるし、並列で動かすこともできる。今回の例ではわかりやすくするために1つとしている。また、処理の中に記述するものも最低限のものに絞った。

まとめ

まずは感嘆なDAGファイルを作成した。このあと、後続に処理をつなげたり、並列実行してみたり、また様々なデータソースへもつないでいってみようと思う。

Apache Airflowのチュートリアルを実行してみた

Airbnb 社が開発し、今は Apache ソフトウェア財団のトッププロジェクトとなっている Apache Airflow。 今回は業務の中でワークフロー製品を扱っていることもあり、OSS の Airflow の感触を確かめるべく触ってみた。

準備

Airflow の公式ページでインストール方法が 2 つ記されている。

  • ローカル PC での起動
  • Docker での起動

airflow.apache.org

Docker が用意されているので、Docker で実行する。公式が用意してくれているのはとても助かる!

mkdir workspace/sandbox-airflowとして今回のお試しディレクトリを作成し、そこにdocker-compose.yamlをダウンロードした。

2021/07/18 現在用意されていたものを貼っておく。最新版は公式のページから確認を! curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.1.2/docker-compose.yaml'

起動

docker-compose upでコンテナは 7 つ起動する。

  • airflow-init
    • Airflow を初期化するためのコンテナ
  • airflow-scheduler
    • すべてのタスクと DAG(Direct acyclic graph)を監視し、依存関係が完了するとタスクインスタンスをトリガーするコンテナ
      • DAG についての詳しい説明はこちらに載っていたのでリンクさせておく
  • airflow-webserver
  • airflow-worker
    • スケジューラーによって指定されたタスクを実行するワーカーのコンテナ
  • flower
    • 環境を監視するための GUI アプリケーション flower のコンテナ
    • http://localhost:5555 でアクセスができる
  • postgres
  • redis

自分は Mac で実行したが、Airflow を実行している間はファンが大きく回ったのでマシンスペックは必要かもしれない。

サンプルアプリ

http://localhost:8080 にアクセスすると、ログイン画面が表示される。

このチュートリアル用 Docker ではユーザ名パスワードともに airflow なので、こちらを入力する。

  • ユーザ名
    • airflow
  • パスワード
    • airflow

f:id:yoshitaku_jp:20210718094043p:plain

ログインが完了すると、ワークフローが並んでいる画面になる。

f:id:yoshitaku_jp:20210718094059p:plain

一番上にある「example_bash_operator」をクリックすると、詳細の画面に移って確認することができる。 これはすでに実行済みの画面なので、初回アクセスとは情報に差分があるかもしれない。

f:id:yoshitaku_jp:20210718094114p:plain

グラフの形で実行順序を確認もできる。

f:id:yoshitaku_jp:20210718094134p:plain

カレンダーの形で実行日を記録もされている。 GitHubの草の用な感じで表示されるので馴染み深い部分がある。

f:id:yoshitaku_jp:20210718094149p:plain

この画面は設定の詳細の画面になる。 Filepathの部分に/home/airflow/.local/lib/python3.6/site-packages/airflow/example_dags/example_bash_operator.pyがある。 ここに、このサンプルワークフローの設定が入っている。 これは自分がなかなか見つけられなかったものなので、参考になれば嬉しい。

f:id:yoshitaku_jp:20210718094202p:plain

【Pandas】GroupBy.first関数の動きを確認する

Pandas のGroupBy.first を使うと、グループの中で一番最初の値を取得できる。

pandas.pydata.org

以前SQLの分析関数であるFIRST_VALUEとの動きを確認した。Pandasでどのように実現していくのか見ていくものになる。

yoshitaku-jp.hatenablog.com

実行環境

今回は環境構築がいらないGoogle Colaboratoryを使った。

colab.research.google.com

準備と確認

まずはPandasのインポートと、はじめに使用するデータを用意する。 データはFIRST_VALUE関数で確認したものと同じものにした。

import pandas as pd

dict1=dict(position=['FW','FW','MF','MF','MF','MF','DF','DF','DF','DF','GK'], score=[10,5,4,3,3,2,3,2,1,0,0])
df1 = pd.DataFrame(data=dict1)
   position  score
0        FW     10
1        FW      5
2        MF      4
3        MF      3
4        MF      3
5        MF      2
6        DF      3
7        DF      2
8        DF      1
9        DF      0
10       GK      0

GroupBy.firstを実行する

そのままGroupBy.first関数を実行し結果を確認する。

df1.groupby(['position'])['score'].first()
position
DF     3
FW    10
GK     0
MF     4
Name: score, dtype: int64

グループ化された中で一番最初の値だけが戻り値となっている。 SQLはデータの集合に対して関数を適用し、列を返していたがPandasでは該当の値だけが返ってくる。

ドキュメントにも

Returns Series or DataFrame Computed first of values within each group.

戻り値 系列またはデータフレーム 各グループ内の値の最初の値を計算します。

と書かれていて、動きとしては間違っていないことがわかる。

GroupBy.first関数の値を元のデータフレームに戻してみる

取り出した最初の値を、データフレームにし、カラム名も付与する。

first_score_df = df1.groupby(["position"]).first().reset_index()
first_score_df.columns = ["position", "first_score"]
first_score_df

その後、Pandasのmergeでデータフレーム同士をinner joinで結合すれば完成する。

pd.merge(df1, first_score_df, on="position")
   position  score  first_score
0        FW     10           10
1        FW      5           10
2        MF      4            4
3        MF      3            4
4        MF      3            4
5        MF      2            4
6        DF      3            3
7        DF      2            3
8        DF      1            3
9        DF      0            3
10       GK      0            0

まとめ

GroupBy.first関数を使いながら動きを確認できた。 LAST_VALUE 関数のように使いたい場合はGroupBy.lastを使うと逆の動きになってくれる。

pandas.pydata.org

👇SQLのFIRST_VALUE関数とLAST_VALUE関数はこちら👇

yoshitaku-jp.hatenablog.com

【Pandas】shift関数の動きを確認する

Pandas のshift を使うと、現在の行の値と前後の行の値を比較できる。

pandas.pydata.org

以前SQLの分析関数であるLAGとLEADの動きを確認したが、Pandasではどのように実現していくのか見ていくものになる。LAG関数とLEAD関数も比較して見てもらえると!

yoshitaku-jp.hatenablog.com

実行環境

今回は環境構築がいらないGoogle Colaboratoryを使った。

colab.research.google.com

準備と確認

まずはPandasのインポートと、はじめに使用するデータを用意する。 データはLAG関数で確認したものと同じものにした。

import pandas as pd

dict1=dict(id=[1,2,3], Money=[1000,2000,3000], day=['2021-01-01','2021-02-01','2021-03-01'])
df1 = pd.DataFrame(data=dict1)
df1
   id  Money         day
0   1   1000  2021-01-01
1   2   2000  2021-02-01
2   3   3000  2021-03-01

shiftを実行する

そのままshift関数を実行し結果を確認する。

df1.shift()
    id   Money         day
0  NaN     NaN         NaN
1  1.0  1000.0  2021-01-01
2  2.0  2000.0  2021-02-01

ここでわかることは、1行まるごとshiftされていることである。 id列、Money列、day列全てが1行分ズレていている。 単純に実行しただけではSQLのLAG関数のように「特定列だけズレる」ようには動いてくれないことがわかった。

特定の列だけズラす

特定の列だけズラすために、データを変更する。 変更としては、

  • idとMoneyを6個分増やす
  • day列を削除し、代わりの日付データをインデックスとして再定義した。

shift関数へもパラメータを設定する変更を加えている。 今回はperiodsとfreqを指定する。

公式ドキュメントからDeepL翻訳したものを掲載する。 periodsはLAG関数のoffsetと同じもので、ズラす数を書く。1と書けば1つズレるし、3と書けば3つ分ズレることになる。

periods int シフトする期間の数。正または負を指定できます。

freqはLAG関数には無いものになる。 時系列データに対して、freqでずらす期間を指定できる。 コードの中ではDを指定していて、日単位でズラすようにしている。

freq DateOffset, tseries.offsets, timedelta, or str, optional tseriesモジュールまたはタイムルールから使用するオフセット(例:'EOM')。freqが指定された場合、インデックス値はシフトされますが、データは再調整されません。つまり、シフト時にインデックスを拡張し、>元のデータを維持したい場合は、freqを使用します。freqが "infer "として指定された場合は、インデックスのfreqまたはinferred_freq属性から推測されます。これらの属性が存在しない場合、ValueErrorが投げられます。

実行しているコードにコメントを差し込んでいるので、実際に何をコードを確認してほしい。

# ディクショナリでidとMoneyをdict2変数に定義
dict2=dict(id=[1,2,3,4,5,6], Money=[1000,2000,3000,4000,5000,6000])

# date_range関数で2021-01-01から6日分をday変数に格納
day=pd.date_range("2021-01-01", periods=6)

# dict2とdayを元にDataFrameを作成
df2 = pd.DataFrame(data=dict2,index=day)

# periodsで1ずつ、さらにfreq="D"で日付ごとズラすことを指定し、Money列の値を取得、lag列に入れる
df2['lag'] = df2.shift(periods=1,freq="D")['Money']
df2
            id  Money     lag
2021-01-01   1   1000     NaN
2021-01-02   2   2000  1000.0
2021-01-03   3   3000  2000.0
2021-01-04   4   4000  3000.0
2021-01-05   5   5000  4000.0
2021-01-06   6   6000  5000.0

グループ内でshiftする

LAG関数はpartition byでグループ化したものをズラすことが出来た。 こちらもshift関数では少し工夫が必要になる。

データはDate列を戻し、グループ化させたいので下記データを2つずつ用意した。

  • 2021-01-01
  • 2021-02-01
  • 2021-03-01

再びコードの中で何をしているかはコメントを差し込んでいるので確認してほしい。

# ディクショナリでidとMoneyとDateをdict3変数に定義
dict3=dict(id=[1,2,3,4,5,6], Money=[1000,2000,3000,4000,5000,6000], Date=['2021-01-01','2021-01-01','2021-02-01','2021-02-01','2021-03-01','2021-03-01'])

# dict3を元にDataFrameを作成
df3 = pd.DataFrame(data=dict3)

# df3のDate列を対象にgroupbyを実行、さらにMoney列をshiftしたものをlag列に入れる
df3['lag'] = df3.groupby(['Date'])['Money'].shift()
df3

無事にpartition byと同じことが出来ている事がわかる。

   id  Money        Date     lag
0   1   1000  2021-01-01     NaN
1   2   2000  2021-01-01  1000.0
2   3   3000  2021-02-01     NaN
3   4   4000  2021-02-01  3000.0
4   5   5000  2021-03-01     NaN
5   6   6000  2021-03-01  5000.0

NaNの代わりに値を設定する

shift(fill_value=0)のようにパラメータのfill_valueを使うと、NaNにデフォルトの値としてセットすることができる。 今回は0を設定している。

dict3=dict(id=[1,2,3,4,5,6], Money=[1000,2000,3000,4000,5000,6000], Date=['2021-01-01','2021-01-01','2021-02-01','2021-02-01','2021-03-01','2021-03-01'])
df3 = pd.DataFrame(data=dict3)
df3['lag'] = df3.groupby(['Date'])['Money'].shift(fill_value=0)
df3
   id  Money        Date   lag
0   1   1000  2021-01-01     0
1   2   2000  2021-01-01  1000
2   3   3000  2021-02-01     0
3   4   4000  2021-02-01  3000
4   5   5000  2021-03-01     0
5   6   6000  2021-03-01  5000

まとめ

shift関数を使いながら動きを確認できた。 LEAD 関数のように使いたい場合はshift(-1)のようにマイナス値を設定すると上にずれてくれる。

dict4=dict(id=[1,2,3,4,5,6], Money=[1000,2000,3000,4000,5000,6000], Date=['2021-01-01','2021-01-01','2021-02-01','2021-02-01','2021-03-01','2021-03-01'])
df4 = pd.DataFrame(data=dict4)
df4['lag'] = df4.groupby(['Date'])['Money'].shift(-1,fill_value=0)
print(df4)
   id  Money        Date   lag
0   1   1000  2021-01-01  2000
1   2   2000  2021-01-01     0
2   3   3000  2021-02-01  4000
3   4   4000  2021-02-01     0
4   5   5000  2021-03-01  6000
5   6   6000  2021-03-01     0

そのまま手を動かせるものにしたので、実行して確認してもらえれば👌

👇SQLのLAG関数とLEAD関数はこちら👇

yoshitaku-jp.hatenablog.com

【SQL】RANK関数・DENSE_RANK関数の動きを確認する

SQL の分析関数である RANK と DENSE_RANK を使うと、順序が付けられた値のセットの中で何番目の値かを返すことができる。 今回は RANK と DENSE_RANK を使って、動きを確認していく。

実行環境

テストの場所は、SQL Fiddleで試した。

sqlfiddle.com

エンジンのバージョンはPostgreSQL 9.6を使っている。

https://www.postgresql.jp/document/9.3/html/functions-window.html

データ投入

今回使用するデータはclass,score の列からなっていて、10行分のデータが入っている。

CREATE TABLE RANK_TEST
(
 class  CHAR(3) ,
 score INTEGER
);

BEGIN TRANSACTION;
INSERT INTO RANK_TEST VALUES ('A', 90);
INSERT INTO RANK_TEST VALUES ('A', 90);
INSERT INTO RANK_TEST VALUES ('A', 40);
INSERT INTO RANK_TEST VALUES ('A', 30);
INSERT INTO RANK_TEST VALUES ('A', 20);
INSERT INTO RANK_TEST VALUES ('B', 100);
INSERT INTO RANK_TEST VALUES ('B', 90);
INSERT INTO RANK_TEST VALUES ('B', 40);
INSERT INTO RANK_TEST VALUES ('B', 40);
INSERT INTO RANK_TEST VALUES ('B', 10);
COMMIT;

シンプルな実行

SQL を組み立てて実行する。 RANK 関数とDENSE_RANK関数は次の形で指定する。

RANK ( ) OVER ( [ partition_by_clause ] order_by_clause )  
DENSE_RANK ( ) OVER ( [ <partition_by_clause> ] < order_by_clause > )  

実際の SQL にすると次になる。 score の列を対象に、ランク付けしている。 並び順は score に従っている。

SELECT 
      class
      ,score
      ,RANK() over (order by score desc)
      ,DENSE_RANK() over (order by score desc)
FROM RANK_TEST

class関係なく、スコア通りに順位がランク付けされている。 ここまでRANK関数・DENSE_RANK関数の違いの説明なくきたが、結果を見てもらうとすぐわかると思う。 RANK関数は重複している順位があれば、次の数字は重複している数の分だけスキップされその後順位付けが再開される。 DENSE_RANK関数は重複している順位があっても、次の数字をスキップすることなく、その後順位付けが再開される。 その結果、RANK関数は最後がランク10になっているのに対し、DENSE_RANK関数は最後のランクが6になっている。 DENSEの意味は「(人や物が)密集している」「(物の)密度が高い」である。この場合はランクの数が密集しているとなり、数がスキップされていないことがわかる。

| class | score | rank | dense_rank | |-------|-------|------|------------| | B | 100 | 1 | 1 | | A | 90 | 2 | 2 | | A | 90 | 2 | 2 | | B | 90 | 2 | 2 | | A | 40 | 5 | 3 | | B | 40 | 5 | 3 | | B | 40 | 5 | 3 | | A | 30 | 8 | 4 | | A | 20 | 9 | 5 | | B | 10 | 10 | 6 |

PARTITON BY

PARTITION BYscoreを指定して、クラスごとにスコアを比較する。

SELECT 
      class
      ,score
      ,RANK() over (partition by class order by score desc)
      ,DENSE_RANK() over (partition by class order by score desc)
FROM RANK_TEST

クラスごとにまとめられ、その中でスコアでソート、その後ランク付けがおこなわれているのがわかる。

| class | score | rank | dense_rank |
|-------|-------|------|------------|
|   A   |    90 |    1 |          1 |
|   A   |    90 |    1 |          1 |
|   A   |    40 |    3 |          2 |
|   A   |    30 |    4 |          3 |
|   A   |    20 |    5 |          4 |
|   B   |   100 |    1 |          1 |
|   B   |    90 |    2 |          2 |
|   B   |    40 |    3 |          3 |
|   B   |    40 |    3 |          3 |
|   B   |    10 |    5 |          4 |

まとめ

RANK関数・DENSE_RANK関数を使いながら動きを確認した。 そのまま手を動かせるものにしたので、実行して確認してもらえれば👌

【SQL】FIRST_VALUE関数・LAST_VALUE関数の動きを確認する

SQL の分析関数である FIRST_VALUE と LAST_VALUE を使うと、順序が付けられた値のセットの中で最初の値と最後の値を返すことができる。 今回は FIRST_VALUE を使って、動きを確認していく。 FIRST_VALUE は最初の値、LAST_VALUE は最後の値という違いだけで、構文は 同じになるので、LAST_VALUE を知りたい場合は適宜置換してもらえると!

実行環境

テストの場所は、SQL Fiddleで試した。

sqlfiddle.com

エンジンのバージョンはPostgreSQL 9.6を使っている。

https://www.postgresql.jp/document/9.3/html/functions-window.html

今回はサッカーのポジションとゴール数のデータを投入して動きを確認する。

データ投入

今回使用するデータはposition,score の列からなっていて、11 行分のデータが入っている。

CREATE TABLE FIRST_VALUE_TEST
(
 position  CHAR(6) ,
 score INTEGER
);

BEGIN TRANSACTION;
INSERT INTO FIRST_VALUE_TEST VALUES ('FW', 10);
INSERT INTO FIRST_VALUE_TEST VALUES ('FW', 5);
INSERT INTO FIRST_VALUE_TEST VALUES ('MF', 4);
INSERT INTO FIRST_VALUE_TEST VALUES ('MF', 3);
INSERT INTO FIRST_VALUE_TEST VALUES ('MF', 3);
INSERT INTO FIRST_VALUE_TEST VALUES ('MF', 2);
INSERT INTO FIRST_VALUE_TEST VALUES ('DF', 3);
INSERT INTO FIRST_VALUE_TEST VALUES ('DF', 2);
INSERT INTO FIRST_VALUE_TEST VALUES ('DF', 1);
INSERT INTO FIRST_VALUE_TEST VALUES ('DF', 0);
INSERT INTO FIRST_VALUE_TEST VALUES ('GK', 0);
COMMIT;

シンプルな実行

SQL を組み立てて実行する。 FIRST_VALUE 関数は次の形で指定する。

FIRST_VALUE ( [scalar_expression ] )  [ IGNORE NULLS | RESPECT NULLS ]
    OVER ( [ partition_by_clause ] order_by_clause [ rows_range_clause ] )

実際の SQL にすると次になる。 score の列を対象に、先頭の行を参照している。 並び順は score に従っている。

SELECT
  position,
  score,
  FIRST_VALUE(score) OVER (ORDER BY score DESC)

FROM
  FIRST_VALUE_TEST

全ての行で1人目のFWの10が適応されている。

| position | score | first_value |
|----------|-------|-------------|
|     FW   |    10 |          10 |
|     FW   |     5 |          10 |
|     MF   |     4 |          10 |
|     DF   |     3 |          10 |
|     MF   |     3 |          10 |
|     MF   |     3 |          10 |
|     MF   |     2 |          10 |
|     DF   |     2 |          10 |
|     DF   |     1 |          10 |
|     DF   |     0 |          10 |
|     GK   |     0 |          10 |

PARTITON BY

PARTITON BYを使うとGROUP BYと似たようなことができる。

PARTITION BYpositionを指定して、ポジションごとにスコアを比較する。

SELECT
  position,
  score,
  FIRST_VALUE(score) OVER (partition by position ORDER BY score DESC)

FROM
  FIRST_VALUE_TEST

ポジションごとにまとめられ、その中でscoreでソート、その後比較がおこなわれているのがわかる。 DFはDFの中だけで最高得点者が、FWはFWの中だけで最高得点者が出されている。

position score first_value
DF 3 3
DF 2 3
DF 1 3
DF 0 3
FW 10 10
FW 5 10
GK 0 0
MF 4 4
MF 3 4
MF 3 4
MF 2 4

更に使い方の例として、最高得点者との差を算出するような場合は次のようになる。 これで今トップとどれぐらい差が離れているかがすぐに分かるかと思う。

SELECT
  position,
  score,
  FIRST_VALUE(score) OVER (partition by position ORDER BY score DESC),
  score - FIRST_VALUE(score) OVER (partition by position ORDER BY score DESC) as difference

FROM
  FIRST_VALUE_TEST
position score first_value difference
DF 3 3 0
DF 2 3 -1
DF 1 3 -2
DF 0 3 -3
FW 10 10 0
FW 5 10 -5
GK 0 0 0
MF 4 4 0
MF 3 4 -1
MF 3 4 -1
MF 2 4 -2

IGNORE NULLS と RESPECT NULLS

SQL標準ではIGNORE NULLS と RESPECT NULLSが変更できるが、PostgreSQLでは変更できないので確認ができなかった。 IGNORE NULLS と RESPECT NULLSはnullの扱いをどうするかの設定になる。 設定の変更はできないがPostgreSQLではRESPECT NULLSがデフォルトとなっているので、RESPECT NULLSの動きだけでも確認した。

例として、スコアを計算する必要のない、監督であるCOACHの行をnullで追加してみた。 データとしては次のようになる。

CREATE TABLE FIRST_VALUE_TEST
(
 position  CHAR(6) ,
 score INTEGER
);

BEGIN TRANSACTION;
INSERT INTO FIRST_VALUE_TEST VALUES ('FW', 10);
INSERT INTO FIRST_VALUE_TEST VALUES ('FW', 5);
INSERT INTO FIRST_VALUE_TEST VALUES ('MF', 4);
INSERT INTO FIRST_VALUE_TEST VALUES ('MF', 3);
INSERT INTO FIRST_VALUE_TEST VALUES ('MF', 3);
INSERT INTO FIRST_VALUE_TEST VALUES ('MF', 2);
INSERT INTO FIRST_VALUE_TEST VALUES ('DF', 3);
INSERT INTO FIRST_VALUE_TEST VALUES ('DF', 2);
INSERT INTO FIRST_VALUE_TEST VALUES ('DF', 1);
INSERT INTO FIRST_VALUE_TEST VALUES ('DF', 0);
INSERT INTO FIRST_VALUE_TEST VALUES ('GK', 0);
INSERT INTO FIRST_VALUE_TEST VALUES ('COACH', null);
COMMIT;

このときに全体を対象としたSQLを実行すると先頭行のnullが一番最初の値となり、すべての行に適応されている。 つまり、PostgreSQLではRESPECT NULLSがデフォルトとなっていることがわかった。

SELECT
  position,
  score,
  FIRST_VALUE(score) OVER (ORDER BY score DESC)

FROM
  FIRST_VALUE_TEST
position score first_value
COACH (null) (null)
FW 10 (null)
FW 5 (null)
MF 4 (null)
MF 3 (null)
DF 3 (null)
MF 3 (null)
DF 2 (null)
MF 2 (null)
DF 1 (null)
DF 0 (null)
GK 0 (null)

まとめ

FIRST_VALUE 関数を使いながら動きを確認した。 サッカーのポジションとゴール数を例にして、確認することが出来た。 基準から計算するようなことがあれば使ってみるといいかも。 そのまま手を動かせるものにしたので、実行して確認してもらえれば👌

【SQL】MAX・MIN関数に文字列を指定したときの動きを確認する

SQLのMAX・MIN関数には数値以外にも文字列を指定することが出来ます。 「最大値/ 最小値」と言われると数字しかイメージしてこなかったので、恥ずかしながらとても驚きました。

このあたりの動きをSQL Fiddleを使って確認していきます。

今回もSQL Fiddleを使います。 エンジンはPostgresSQL 9.6です。

sqlfiddle.com

データの準備

まずはデータを準備します。 英単語辞書に載っている最初の単語と最後の単語を検索しデータとしました。

CREATE TABLE test
    ("id" int, "name" varchar(8))
;
    
INSERT INTO test
    ("id", "name")
VALUES
    (1, 'aardvark'),
    (2, 'abandon'),
    (3, 'aback'),
    (4, 'ZZZ')
;

MAX関数

まずはMAX関数です。 アルファベットはA→Zへ大きくなっていくので、Zを含むものが返されます。

select max(name)
from test
| max |
|-----|
| ZZZ |

MIN関数

次にMIN関数です。 Aに向かって小さくなるので、aardvarkが返されます。

select min(name)
from test
|      min |
|----------|
| aardvark |

日本語でも確認してみる

データを日本語にして動きを確認してみます。

CREATE TABLE test
    ("id" int, "name" varchar(10))
;
    
INSERT INTO test
    ("id", "name")
VALUES
    (1, 'おはよう'),
    (2, 'こんにちは'),
    (3, 'こんばんは')
;

MAX関数はこんばんはが返ってきています。

select max(name)
from test;
|   max |
|-------|
| こんばんは |

MIN関数はおはようが返ってきています。

select min(name)
from test;
|  min |
|------|
| おはよう |

注意するべきところ

次のデータを投入したときにMIN関数が返すものは、「おはよう」の結果から「おはようございます」になると思いますが、実は違う結果になります。

CREATE TABLE test
    ("id" int, "name" varchar(10))
;
    
INSERT INTO test
    ("id", "name")
VALUES
    (1, 'おはようございます'),
    (2, 'こんにちは'),
    (3, 'こんばんは')
;

今回の場合は「こんにちは」が返ってきます。 これは想定外の動きをしているわけではなく、文字列型の場合は文字コードとしての判定がおこなわれているからになります。 実際に使う場合は、文字コードで判定されていることを忘れずに使ってほしいと思います。

|   min |
|-------|
| こんにちは |

【SQL】LAG関数・LEAD関数の動きを確認する

SQL の分析関数である LAG と LEAD を使うと、現在の行の値と前後の行の値を比較できる。 今回は LAG を使って、動きを確認していく。 LAG は前の行、LEAD は後ろの行という違いだけで、構文は 同じになるので、LEAD を知りたい場合は適宜置換してもらえると!

実行環境

テストの場所としては以前も使った、SQL Fiddleで試した。 ブラウザ上でサクサク試せるので便利!

sqlfiddle.com

yoshitaku-jp.hatenablog.com

エンジンのバージョンはPostgreSQL 9.6を使っている。

https://www.postgresql.jp/document/9.3/html/functions-window.html

まずは小さなデータを投入してシンプルな動きを確認する。 その後、少しずつパラメータを追加していき、さらに動きを確認する。

データ投入

id,money,date の列からなっていて、3 行分のデータが入っている。わかりやすくするため 1 ずつ数字を上がっていくものとした。

CREATE TABLE LAG_TEST
(id     CHAR(4) NOT NULL,
 money  INTEGER ,
 date      DATE ,
  PRIMARY KEY (id));

BEGIN TRANSACTION;
INSERT INTO LAG_TEST VALUES ('0001', 1000, '2021-01-01');
INSERT INTO LAG_TEST VALUES ('0002', 2000, '2021-02-01');
INSERT INTO LAG_TEST VALUES ('0003', 3000, '2021-03-01');
COMMIT;

シンプルな実行

SQL を組み立てて実行する。 LAG 関数は次の形で指定する。

LAG (scalar_expression [,offset] [,default])
    OVER ( [ partition_by_clause ] order_by_clause )

実際の SQL にすると次になる。 money の列を対象に、1 行ずつ前のものを参照している。 並び順は date に従っている。

SELECT
  id,
  money,
  date,
  LAG(money, 1) OVER (ORDER BY date)

FROM
  LAG_TEST

id: 0002id: 0001の money を参照していて、id: 0003id: 0002の money を参照している。 無事に 1 つ前の行の値を見ていることがわかる。

id money date lag
0001 1000 2021-01-01 (null)
0002 2000 2021-02-01 1000
0003 3000 2021-03-01 2000

offset を変更してみる

先程は offset1 にして、1 つ前の行を参照することとした。 offset の値を 2 に変えてみる。

SELECT
  id,
  money,
  date,
  LAG(money, 2) OVER (ORDER BY date)

FROM
  LAG_TEST

id0003の行が参照している値が変化した。

先程は、id: 0002id: 0001の money を参照していて、id: 0003id: 0002の money を参照していた。 今は、id: 0002nullとなりid: 0003id: 0001の money を参照している。 これは参照する行が 2 つ前の行の値を見ることとなったからである。

id money date lag
0001 1000 2021-01-01 (null)
0002 2000 2021-02-01 (null)
0003 3000 2021-03-01 1000

デフォルトを設定してみる

次にLag関数の引数を増やしてみる。 この部分はデフォルトを指定する部分になる。 offset2 のままにして、後ろに 0 を追加する。

LAG(money, 2, 0) OVER (ORDER BY date)

SELECT
  id,
  money,
  date,
  LAG(money, 2, 0) OVER (ORDER BY date)

FROM
  LAG_TEST

「offset を変更してみる」では、参照していない行nullとなっていたが、今回は 0 が入っている。 defaultを設定すればその値が、設定しなければnullを設定することがわかった。

id money date lag
0001 1000 2021-01-01 0
0002 2000 2021-02-01 0
0003 3000 2021-03-01 1000

OVER 句

次に OVER 句を見ていく。 OVER 句の中ではORDER BYPARTITON BYを指定できる。

ORDER BY

ORDER BYは通常の SQL と同様、並び替えができる。 ここではdescを末尾に設定し、並び順を逆転させている。

SELECT
  id,
  money,
  date,
  LAG(money, 2, 0) OVER (ORDER BY date desc)

FROM
  LAG_TEST

ここで注意するべきことは、LAG 関数で参照した後に並び替えるのではなく、並び替えてから LAG 関数が適用されていることである。

id money date lag
0003 3000 2021-03-01 0
0002 2000 2021-02-01 0
0001 1000 2021-01-01 3000
LAG関数を適用してから...

| id   | money | date       | lag  |
| ---- | ----- | ---------- | ---- |
| 0001 | 1000  | 2021-01-01 | 0    |
| 0002 | 2000  | 2021-02-01 | 0    |
| 0003 | 3000  | 2021-03-01 | 1000 |

dateで並び替えているわけではない。

| id   | money | date       | lag  |
| ---- | ----- | ---------- | ---- |
| 0003 | 3000  | 2021-03-01 | 1000 |
| 0002 | 2000  | 2021-02-01 | 0    |
| 0001 | 1000  | 2021-01-01 | 0    |

PARTITON BY

PARTITON BYを使うとGROUP BYと似たようなことができる。 GROUP BYはグループでまとめた行を表示するが、PARTITON BYはグループ単位で仕切って行を表示するイメージになる。

PARTITION BY GROUP BY
グループ単位で仕切る グループ単位でまとめる

ここで少しデータを増やす。 4 月分のデータと、比較用の昨年度データを投入した。

CREATE TABLE LAG_TEST
(id     CHAR(4) NOT NULL,
 year  INTEGER ,
 month  INTEGER ,
 day  INTEGER ,
 money  INTEGER ,
 date      DATE ,
 PRIMARY KEY (id));

BEGIN TRANSACTION;
INSERT INTO LAG_TEST VALUES ('0001','2020','01','01', 100, '2020-01-01');
INSERT INTO LAG_TEST VALUES ('0002','2020','02','01', 200, '2020-02-01');
INSERT INTO LAG_TEST VALUES ('0003','2020','03','01', 300, '2020-03-01');
INSERT INTO LAG_TEST VALUES ('0004','2020','04','01', 400, '2020-04-01');
INSERT INTO LAG_TEST VALUES ('0005','2021','01','01', 500, '2021-01-01');
INSERT INTO LAG_TEST VALUES ('0006','2021','02','01', 600, '2021-02-01');
INSERT INTO LAG_TEST VALUES ('0007','2021','03','01', 700, '2021-03-01');
INSERT INTO LAG_TEST VALUES ('0008','2021','04','01', 800, '2021-04-01');
COMMIT;

PARTITION BYmonthを指定して、昨年との値を月ごとに比較する。

SELECT
  year,
  month,
  money,
  LAG(money, 1, 0) OVER (PARTITION BY month ORDER BY date) AS 昨年の値,
FROM
  LAG_TEST

PARTITON BYはグループ単位で仕切って行を表示するイメージ

と書いたが、1 月、2 月と月ごとにまとめられ、その中で比較がおこなわれているのがわかる。 3 行の| 2020 | 2 | 200 | 0 |は前の行の500を参照していないことからも理解できると思う。

year month money 昨年の値
2020 1 100 0
2021 1 500 100
2020 2 200 0
2021 2 600 200
2020 3 300 0
2021 3 700 300
2020 4 400 0
2021 4 800 400

まとめ

LAG 関数を使いながら動きを確認できた。 LEAD 関数は LAG と置き換えるだけと書いたが実際そのとおりになる。 試しに最後の SQL を LEAD 関数に置き換えてみる。列名も「翌年の値」と変更しておく。

SELECT
  year,
  month,
  money,
  LEAD(money, 1, 0) OVER (PARTITION BY month ORDER BY date) AS 翌年の値
FROM
  LAG_TEST
year month money 翌年の値
2020 1 100 500
2021 1 500 0
2020 2 200 600
2021 2 600 0
2020 3 300 700
2021 3 700 0
2020 4 400 800
2021 4 800 0

分析というと、現在から過去に向かっていくイメージを持つ人が多くいると思ったので、LEAD 関数ではなく LAG 関数で手を動かしてみた。 そのまま手を動かせるものにしたので、実行して確認してもらえれば👌

【Databricks】マネージドテーブルとアンマネージドテーブルの違い

Databricksで saveAsTable()関数を使うと、Spark SQL テーブルを作成できる。今回はsaveAsTable()関数を使って作成される「マネージドテーブル」「アンマネージドテーブル」の違いを見ていく。

マネージドテーブルとアンマネージドテーブルの違い

すべての Spark SQL テーブルには、スキーマとデータ自体を格納するメタデータ情報が含まれています。マネージテーブル は spark SQL テーブルで、spark はデータとメタデータの両方を管理します。 マネージテーブルの場合、Databricks はメタデータとデータをアカウントの DBFS に格納します。 Spark SQL はテーブルを管理するため、を実行する DROP TABLE example_data と、メタデータとデータの両方が 削除されます。

別の方法として、データの場所を制御しながら、Spark SQLメタデータを管理することもできます。 これを アンマネージテーブル と呼びます。 Spark SQL は関連するメタデータを管理するため、を実行すると DROP TABLE <example-table> 、データ自体ではなく、メタデータのみが削除されます。 指定したパスには、データがまだ存在しています。

マネージドテーブルはDatabricks上でデータとメタデータを持っている。アンマネージドテーブルはDatabricks上でメタデータは持っているが、データは外部のストレージに持っていることになる。整理すると次の表になる。

マネージドテーブル アンマネージドテーブル
メタデータ Databricks ファイル システム Databricks ファイル システム
データ Databricks ファイル システム 外部ストレージ

ここからは実際に手を動かして、データの投入、データの更新、テーブルの削除をおこない動きを見ていく。

データの準備

簡単なデータを用意する。SparkSessionのrange()関数を使ってID列に1から10の値が存在するDataFrameを用意した。

df = spark.range(10)
display(df)
#id
#0
#1
#2
#3
#4
#5
#6
#7
#8
#9

マネージドテーブルの作成

saveAsTable() 関数を実行するとマネージドテーブルが作成される。Dataのタブからmanaged_tableと名付けられたテーブルが作成されている。1~10の値も確認ができる。

df.write.saveAsTable("managed_table")

f:id:yoshitaku_jp:20210526145701p:plain

f:id:yoshitaku_jp:20210526145737p:plain

アンマネージドテーブルの作成

saveAsTable() 関数の前にoption('path', "<YOUR_PATH>")を実行するとアンマネージドテーブルが作成される。 その前の準備段階として、DBFS上の<YOUR_PATH>に外部のストレージをマウントをおこなう必要がある。 今回保存先にはAzure Data Lake Storage Gen2を選択した。 ADLSの作成手順は割愛する。

保存先のマウント

まずはセル上で変数の定義をおこなう。

storageaccount=""#<ストレージアカウント名>
storageaccountkey= ""#<ストレージアカウントのキー>
containername="" #<container名>

次に、mount()関数を使い、ADLSのURL、マウントするDBFSのパス、ストレージアカウントのキーを設定し実行する。 これでDBFSの/mnt/unmanaged_tableにADLSがマウントされる。

dbutils.fs.mount(
  source = "wasbs://" + containername + "@" + storageaccount + ".blob.core.windows.net/",
  mount_point = "/mnt/unmanaged_table",
  extra_configs = {"fs.azure.account.key."+storageaccount+".blob.core.windows.net":storageaccountkey})

option('path', "<YOUR_PATH>")を付けたsaveAsTable() 関数を実行するとアンマネージドテーブルが作成される。 今回は/mnt/unmanaged_tableにマウントしたので、こちらを指定する。

df.write.option('path', "/mnt/unmanaged_table").saveAsTable("unmanaged_table")

f:id:yoshitaku_jp:20210526212501p:plain f:id:yoshitaku_jp:20210526212524p:plain

マウント先のAzure Data Lake Storage Gen2のコンテナにもデータが保存されていることがわかる。

f:id:yoshitaku_jp:20210526212432p:plain

テーブルの更新

テーブルの内容を更新することもできる。 マネージドテーブルもアンマネージドテーブルもmode("overwrite")を付けて実行するだけになる。

マネージドテーブル

df = spark.range(20)
df.write.mode("overwrite").saveAsTable("managed_table") 

f:id:yoshitaku_jp:20210526212807p:plain

アンマネージドテーブル

df.write.mode("overwrite").option("path","/mnt/unmanaged_table").saveAsTable("unmanaged_table") 

f:id:yoshitaku_jp:20210526212859p:plain

ADLS上のデータも増えている。

f:id:yoshitaku_jp:20210526212938p:plain

テーブルの削除

テーブルの削除はSQLでおこなう。 ここまでPythonで作業してきたのでマジックコマンドの%sqlを付けて、DROP TABLEを実行する。 dataタブからテーブルが両方とも削除されていることを確認する。

%sql
DROP TABLE  managed_table
#OK
%sql
DROP TABLE  unmanaged_table
#OK

Databricks上からはmanaged_tableとunmanaged_tableが削除されている。

f:id:yoshitaku_jp:20210526214444p:plain

アンマネージドテーブルで作成したデータはストレージから削除されていないことが確認できる。

f:id:yoshitaku_jp:20210527091505p:plain

運用上の注意点

AzureDatabricksBestPracticesでは、本番データをDBFSフォルダに置かないよう注意している。

  • デフォルトのDBFSのライフサイクルは、ワークスペースに関連付けられています。ワークスペースを削除すると、デフォルトのDBFSも削除され、その内容が完全に削除されます。
  • このデフォルトフォルダとその内容へのアクセスを制限することはできません。

ワークスペースを削除するとDBFSも自動的に削除されてしまうことと、DBFS上のデータに対してセキュリティ的に制限をかけることが出来ないのでやめておいたほうがいいといった内容になる。

一番最初に登場した表で再度整理すると次のようになる。

マネージドテーブル アンマネージドテーブル
メタデータ Databricks ファイル システム Databricks ファイル システム
データ Databricks ファイル システム 外部ストレージ
本番データ 非推奨 推奨

まとめ

Databricks上で作成されるマネージドテーブルとアンマネージドを作成し動きを確認した。運用上の注意ドキュメントを見つけて改めて違いを確認したが、頭の中を整理できてよかった👌動くコードを提示できたので誰かの役に立てれば🙏