よしたく blog

IT技術を中心に様々なことを書いています

GitHub ActionsでSQLFluffを実行する

SQLのLinterであるSQLfluffをCI /CDで回したかったので、GitHub Actionsで試してみた。今回は

  • PR上でCIが回る
  • checkout時のデータ量削減で、fetch-depth: 1の設定
  • リポジトリ内の全てのSQLファイルにLinterを実行するため、find . -name '*.sql' -type f | xargs -I {} sqlfluff lint {} --dialect bigquery
    • sqlfluff lint **/*.sqlで全て実行できるかと思ったが、うまく実行できなかった

とした。

sql_lint.yaml

name: SQLFluff

on: [pull_request]

jobs:
  lint:
    runs-on: ubuntu-latest
    steps:
      - uses: "actions/checkout@v3"
        with:
          fetch-depth: 1
          ref: ${{ github.head_ref }}

      - uses: "actions/setup-python@v2"
        with:
          python-version: "3.8"

      - name: install sqlfluff
        run: pip install sqlfluff
        
      - name: run sqlfluff
        run: find . -name '*.sql' -type f | xargs -I {} sqlfluff lint {} --dialect bigquery

BigQueryのメタデータからDDL文を確認する

BigQueryのメタデータからDDL文を確認する方法を調べた。以前、過去に作ったテーブルのDDL文がわからず困った時に役立った。 INFORMATION_SCHEMA.TABLESddlカラムにDDL文が存在している。

SELECT
  table_name, ddl
FROM
  `<project_id>.<dataset_name>.INFORMATION_SCHEMA.TABLES`
 WHERE
  table_name = '<target_table>'
;

BigQueryのpublic-dataからbaseballデータセットのschedulesテーブルのDDL文を確認したいときは次のようになる。

SELECT
  table_name, ddl
FROM
  `bigquery-public-data.baseball.INFORMATION_SCHEMA.TABLES`
 WHERE
  table_name = 'schedules'

出力結果

table_name ddl
schedules CREATE TABLE bigquery-public-data.baseball.schedules
(
gameId STRING,
gameNumber INT64,
seasonId STRING,
year INT64,
type STRING,
dayNight STRING,
duration STRING,
duration_minutes INT64,
homeTeamId STRING,
homeTeamName STRING,
awayTeamId STRING,
awayTeamName STRING,
startTime TIMESTAMP,
attendance INT64,
status STRING,
created TIMESTAMP
);

【Python】ETLツールのLuigiをさわってみた

前回に引き続き、ETLツールをいろいろさわってみたいと思ったところから、今回はPython製のETLツールLuigiをさわってみた。

インストール

pip install luigi

バージョン確認

$ python --version
Python 3.9.1

luigi 3.2.0

サンプルコード

import luigi


class Hello(luigi.Task):
    def run(self):
        out = self.output()
        with out.open("w") as f:
            f.write("Hello")

    def output(self):
        return luigi.LocalTarget("hello.txt")


class World(luigi.Task):
    def requires(self):
        return Hello()

    def run(self):
        input_ = self.input()
        output = self.output()

        out = self.output()
        with out.open("w") as f:
            f.write("World")

    def output(self):
        return luigi.LocalTarget("World.txt")


def main():
    luigi.run(main_task_cls=World, local_scheduler=True)


if __name__ == "__main__":
    main()

実行結果

無事にHelloとWorldが出力されている。

$ python hello_world.py
DEBUG: Checking if World() is complete
DEBUG: Checking if Hello() is complete
INFO: Informed scheduler that task   World__99914b932b   has status   PENDING
INFO: Informed scheduler that task   Hello__99914b932b   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 14403] Worker Worker(salt=2023197516, workers=1, host=yoshitakuMBA.local, username=yoshitaku, pid=14403) running   Hello()
INFO: [pid 14403] Worker Worker(salt=2023197516, workers=1, host=yoshitakuMBA.local, username=yoshitaku, pid=14403) done      Hello()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   Hello__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 14403] Worker Worker(salt=2023197516, workers=1, host=yoshitakuMBA.local, username=yoshitaku, pid=14403) running   World()
INFO: [pid 14403] Worker Worker(salt=2023197516, workers=1, host=yoshitakuMBA.local, username=yoshitaku, pid=14403) done      World()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   World__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=2023197516, workers=1, host=yoshitakuMBA.local, username=yoshitaku, pid=14403) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 2 tasks of which:
* 2 ran successfully:
    - 1 Hello()
    - 1 World()

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

luigiの感想

Prefectはタスクとフローという概念があった。

yoshitaku-jp.hatenablog.com

luigiは、Worldクラスの中でHelloをrequires の形で呼んでいるところから、タスクを連続で擬似的なフローを実行していく思想で構成されているように感じた。個人的には、一つの処理の流れがまとまって見えたほうが見通しが良いと感じるので、luigiのWorldタスクの中でHelloタスクを参照している形は処理の流れが追いくくなるので苦手意識を感じた。

一方で、タスクの実行履歴コントロールをファイル出力で管理している点は個人的には手軽で良いと感じた。サンプルコードではoutput関数を呼んでいる部分になる。実行後のディレクトリ配下は次のようになっている。

$ ls
Pipfile  Pipfile.lock  World.txt  hello.txt  hello_world.py

実行履歴のコントロールは実際の運用面を考えてくると出る問題であるが、ファイル出力ができるということはクラウドのストレージを選べるということであり、安く運用することができる可能性が高まる。また、使いやすさの面でもブラウザ上からストレージ操作することも可能になるので、RDBで管理するよりも運用のハードルも下がると感じる。

【Python】ETLツールのPrefectをさわってみた

ETLツールをいろいろさわってみたいと思って、Python製のETLツールPrefectをさわってみた。

インストール

pip install prefect

バージョン確認

$ python --version
Python 3.11.1

$ prefect --version
2.8.3

サンプルコード

Prefect公式で提供されている「Web APIにアクセスしてその値を返すフローとタスク」のサンプルコードを動かしてみた。 アクセス先だけは変更し、Brew Dogのビール情報を提供しているジョークのWeb APIにしてみた。

ETLのフローにはflowのデコレーターを、ETLで実際に動かしたいタスクにはtaskのデコレーターを付ける。

import requests
from prefect import flow, task

@task
def call_api(url):
    response = requests.get(url)
    print(response.status_code)
    return response.json()

@flow
def api_flow(url):
    fact_json = call_api(url)
    return fact_json

print(api_flow("https://api.punkapi.com/v2/beers/random"))

実行結果(各行の間に改行を入れた)

15:00:43.902 | INFO    | prefect.engine - Created flow run 'tunneling-clam' for flow 'api-flow'

15:00:44.314 | INFO    | Flow run 'tunneling-clam' - Created task run 'call_api-0' for task 'call_api'

15:00:44.316 | INFO    | Flow run 'tunneling-clam' - Executing 'call_api-0' immediately...

200

15:00:45.032 | INFO    | Task run 'call_api-0' - Finished in state Completed()

15:00:45.162 | INFO    | Flow run 'tunneling-clam' - Finished in state 
Completed()

[{'id': 284, 'name': 'Hello My Name Is Helga', 'tagline': 'Cherry Double IPA.', 'first_brewed': '2017', 'description': 'Brewed exclusively for the German market, this Hello My Name brew features a twist of flavour inspired by Germany.', 'image_url': None, 'abv': 8.2, 'ibu': 70, 'target_fg': 1009, 'target_og': 1070, 'ebc': 15, 'srm': 8, 'ph': 4.4, 'attenuation_level': 87, 'volume': {'value': 20, 'unit': 'litres'}, 'boil_volume': {'value': 25, 'unit': 'litres'}, 'method': {'mash_temp': [{'temp': {'value': 66, 'unit': 'celsius'}, 'duration': 65}], 'fermentation': {'temp': {'value': 19, 'unit': 'celsius'}}, 'twist': None}, 'ingredients': {'malt': [{'name': 'Pale Ale', 'amount': {'value': 5.52, 'unit': 'kilograms'}}, {'name': 'Caramalt', 'amount': {'value': 0.12, 'unit': 'kilograms'}}], 'hops': [{'name': 'Simcoe', 'amount': {'value': 24, 'unit': 'grams'}, 'add': '90', 'attribute': 'Bittering'}, {'name': 'Chinook', 'amount': {'value': 20, 'unit': 'grams'}, 'add': '30', 'attribute': 'Flavour'}, {'name': 'Simcoe', 'amount': {'value': 30, 'unit': 'grams'}, 'add': '0', 'attribute': 'Aroma'}, {'name': 'Citra', 'amount': {'value': 40, 'unit': 'grams'}, 'add': 'Dry Hop', 'attribute': 'Aroma'}, {'name': 'Chinook', 'amount': {'value': 40, 'unit': 'grams'}, 'add': 'Dry Hop', 'attribute': 'Aroma'}, {'name': 'Centennial', 'amount': {'value': 20, 'unit': 'grams'}, 'add': 'Dry Hop', 'attribute': 'Aroma'}, {'name': 'Simcoe', 'amount': {'value': 40, 'unit': 'grams'}, 'add': 'Dry Hop', 'attribute': 'Aroma'}], 'yeast': 'Wyeast 1272 - American Ale II™'}, 'food_pairing': ['Roast pork chops', 'Beef in port stew', 'Cherry frangipane tart'], 'brewers_tips': 'Morello Cherries are the go to cherry variety for this beer (and Krieks), the sour compliments the residual sweetness of the malt. Works well if you reduce your IBU to the 20 to 30 range too.', 'contributed_by': 'John Jenkman <johnjenkman>'}]

この数行だけで、フローとタスクが作成できたので、とてもお手軽に感じられた。 また、フローが終わったログが出た後に、return fact_jsonの中身が出力されている点は実装する上で気をつけなければいけなさそうかなと思った。

15:00:45.162 | INFO    | Flow run 'tunneling-clam' - Finished in state 

Completed()

-- Prefectのフローが終わったログが出ている。
-- この後に、Web APIのレスポンスであるjsonの中身が出ている。

[{'id': 284, 'name': 'Hello My Name Is Helga', 'tagline': 'Cherry Double IPA.', 'first_brewed': '2017', 'description': 'Brewed exclusively for the German market, this Hello My Name brew features a twist of flavour inspired by Germany.', 'image_url': None, 'abv': 8.2, 'ibu': 70, 'target_fg': 1009, 'target_og': 1070, 'ebc': 15, 'srm': 8, 'ph': 4.4, 'attenuation_level': 87, 'volume': {'value': 20, 'unit': 'litres'}, 'boil_volume': {'value': 25, 'unit': 'litres'}, 'method': {'mash_temp': [{'temp': {'value': 66, 'unit': 'celsius'}, 'duration': 65}], 'fermentation': {'temp': {'value': 19, 'unit': 'celsius'}}, 'twist': None}, 'ingredients': {'malt': [{'name': 'Pale Ale', 'amount': {'value': 5.52, 'unit': 'kilograms'}}, {'name': 'Caramalt', 'amount': {'value': 0.12, 'unit': 'kilograms'}}], 'hops': [{'name': 'Simcoe', 'amount': {'value': 24, 'unit': 'grams'}, 'add': '90', 'attribute': 'Bittering'}, {'name': 'Chinook', 'amount': {'value': 20, 'unit': 'grams'}, 'add': '30', 'attribute': 'Flavour'}, {'name': 'Simcoe', 'amount': {'value': 30, 'unit': 'grams'}, 'add': '0', 'attribute': 'Aroma'}, {'name': 'Citra', 'amount': {'value': 40, 'unit': 'grams'}, 'add': 'Dry Hop', 'attribute': 'Aroma'}, {'name': 'Chinook', 'amount': {'value': 40, 'unit': 'grams'}, 'add': 'Dry Hop', 'attribute': 'Aroma'}, {'name': 'Centennial', 'amount': {'value': 20, 'unit': 'grams'}, 'add': 'Dry Hop', 'attribute': 'Aroma'}, {'name': 'Simcoe', 'amount': {'value': 40, 'unit': 'grams'}, 'add': 'Dry Hop', 'attribute': 'Aroma'}], 'yeast': 'Wyeast 1272 - American Ale II™'}, 'food_pairing': ['Roast pork chops', 'Beef in port stew', 'Cherry frangipane tart'], 'brewers_tips': 'Morello Cherries are the go to cherry variety for this beer (and Krieks), the sour compliments the residual sweetness of the malt. Works well if you reduce your IBU to the 20 to 30 range too.', 'contributed_by': 'John Jenkman <johnjenkman>'}]

Gitで空のコミットを作る

チームメンバーへの共有の意味を兼ねてプルリクエストを作り、作業を始める前の作業想定やメモなどを書きたい時があった。しかし、そのプルリクエストを作るために、何かしら作業をしてコミットとプッシュをしなければいけなくなってしまうというジレンマもあった。 そのため、プルリクエストを作るだけの空のコミットを作れる方法がないか探していたら、まさにその通りの--allow-emptyオプションがあった。

git commit --allow-empty -m "作業開始"

メッセージなしのコミットは作れないので、作業開始とか入れておくとわかりやすそう。 git initした直後に空のコミットを作っておくと、のちのちgit rebaseをするときも便利