よしたく blog

ほぼ週刊で記事を書いています

【Python】ETLツールのPrefectをさわってみた

ETLツールをいろいろさわってみたいと思って、Python製のETLツールPrefectをさわってみた。

インストール

pip install prefect

バージョン確認

$ python --version
Python 3.11.1

$ prefect --version
2.8.3

サンプルコード

Prefect公式で提供されている「Web APIにアクセスしてその値を返すフローとタスク」のサンプルコードを動かしてみた。 アクセス先だけは変更し、Brew Dogのビール情報を提供しているジョークのWeb APIにしてみた。

ETLのフローにはflowのデコレーターを、ETLで実際に動かしたいタスクにはtaskのデコレーターを付ける。

import requests
from prefect import flow, task

@task
def call_api(url):
    response = requests.get(url)
    print(response.status_code)
    return response.json()

@flow
def api_flow(url):
    fact_json = call_api(url)
    return fact_json

print(api_flow("https://api.punkapi.com/v2/beers/random"))

実行結果(各行の間に改行を入れた)

15:00:43.902 | INFO    | prefect.engine - Created flow run 'tunneling-clam' for flow 'api-flow'

15:00:44.314 | INFO    | Flow run 'tunneling-clam' - Created task run 'call_api-0' for task 'call_api'

15:00:44.316 | INFO    | Flow run 'tunneling-clam' - Executing 'call_api-0' immediately...

200

15:00:45.032 | INFO    | Task run 'call_api-0' - Finished in state Completed()

15:00:45.162 | INFO    | Flow run 'tunneling-clam' - Finished in state 
Completed()

[{'id': 284, 'name': 'Hello My Name Is Helga', 'tagline': 'Cherry Double IPA.', 'first_brewed': '2017', 'description': 'Brewed exclusively for the German market, this Hello My Name brew features a twist of flavour inspired by Germany.', 'image_url': None, 'abv': 8.2, 'ibu': 70, 'target_fg': 1009, 'target_og': 1070, 'ebc': 15, 'srm': 8, 'ph': 4.4, 'attenuation_level': 87, 'volume': {'value': 20, 'unit': 'litres'}, 'boil_volume': {'value': 25, 'unit': 'litres'}, 'method': {'mash_temp': [{'temp': {'value': 66, 'unit': 'celsius'}, 'duration': 65}], 'fermentation': {'temp': {'value': 19, 'unit': 'celsius'}}, 'twist': None}, 'ingredients': {'malt': [{'name': 'Pale Ale', 'amount': {'value': 5.52, 'unit': 'kilograms'}}, {'name': 'Caramalt', 'amount': {'value': 0.12, 'unit': 'kilograms'}}], 'hops': [{'name': 'Simcoe', 'amount': {'value': 24, 'unit': 'grams'}, 'add': '90', 'attribute': 'Bittering'}, {'name': 'Chinook', 'amount': {'value': 20, 'unit': 'grams'}, 'add': '30', 'attribute': 'Flavour'}, {'name': 'Simcoe', 'amount': {'value': 30, 'unit': 'grams'}, 'add': '0', 'attribute': 'Aroma'}, {'name': 'Citra', 'amount': {'value': 40, 'unit': 'grams'}, 'add': 'Dry Hop', 'attribute': 'Aroma'}, {'name': 'Chinook', 'amount': {'value': 40, 'unit': 'grams'}, 'add': 'Dry Hop', 'attribute': 'Aroma'}, {'name': 'Centennial', 'amount': {'value': 20, 'unit': 'grams'}, 'add': 'Dry Hop', 'attribute': 'Aroma'}, {'name': 'Simcoe', 'amount': {'value': 40, 'unit': 'grams'}, 'add': 'Dry Hop', 'attribute': 'Aroma'}], 'yeast': 'Wyeast 1272 - American Ale II™'}, 'food_pairing': ['Roast pork chops', 'Beef in port stew', 'Cherry frangipane tart'], 'brewers_tips': 'Morello Cherries are the go to cherry variety for this beer (and Krieks), the sour compliments the residual sweetness of the malt. Works well if you reduce your IBU to the 20 to 30 range too.', 'contributed_by': 'John Jenkman <johnjenkman>'}]

この数行だけで、フローとタスクが作成できたので、とてもお手軽に感じられた。 また、フローが終わったログが出た後に、return fact_jsonの中身が出力されている点は実装する上で気をつけなければいけなさそうかなと思った。

15:00:45.162 | INFO    | Flow run 'tunneling-clam' - Finished in state 

Completed()

-- Prefectのフローが終わったログが出ている。
-- この後に、Web APIのレスポンスであるjsonの中身が出ている。

[{'id': 284, 'name': 'Hello My Name Is Helga', 'tagline': 'Cherry Double IPA.', 'first_brewed': '2017', 'description': 'Brewed exclusively for the German market, this Hello My Name brew features a twist of flavour inspired by Germany.', 'image_url': None, 'abv': 8.2, 'ibu': 70, 'target_fg': 1009, 'target_og': 1070, 'ebc': 15, 'srm': 8, 'ph': 4.4, 'attenuation_level': 87, 'volume': {'value': 20, 'unit': 'litres'}, 'boil_volume': {'value': 25, 'unit': 'litres'}, 'method': {'mash_temp': [{'temp': {'value': 66, 'unit': 'celsius'}, 'duration': 65}], 'fermentation': {'temp': {'value': 19, 'unit': 'celsius'}}, 'twist': None}, 'ingredients': {'malt': [{'name': 'Pale Ale', 'amount': {'value': 5.52, 'unit': 'kilograms'}}, {'name': 'Caramalt', 'amount': {'value': 0.12, 'unit': 'kilograms'}}], 'hops': [{'name': 'Simcoe', 'amount': {'value': 24, 'unit': 'grams'}, 'add': '90', 'attribute': 'Bittering'}, {'name': 'Chinook', 'amount': {'value': 20, 'unit': 'grams'}, 'add': '30', 'attribute': 'Flavour'}, {'name': 'Simcoe', 'amount': {'value': 30, 'unit': 'grams'}, 'add': '0', 'attribute': 'Aroma'}, {'name': 'Citra', 'amount': {'value': 40, 'unit': 'grams'}, 'add': 'Dry Hop', 'attribute': 'Aroma'}, {'name': 'Chinook', 'amount': {'value': 40, 'unit': 'grams'}, 'add': 'Dry Hop', 'attribute': 'Aroma'}, {'name': 'Centennial', 'amount': {'value': 20, 'unit': 'grams'}, 'add': 'Dry Hop', 'attribute': 'Aroma'}, {'name': 'Simcoe', 'amount': {'value': 40, 'unit': 'grams'}, 'add': 'Dry Hop', 'attribute': 'Aroma'}], 'yeast': 'Wyeast 1272 - American Ale II™'}, 'food_pairing': ['Roast pork chops', 'Beef in port stew', 'Cherry frangipane tart'], 'brewers_tips': 'Morello Cherries are the go to cherry variety for this beer (and Krieks), the sour compliments the residual sweetness of the malt. Works well if you reduce your IBU to the 20 to 30 range too.', 'contributed_by': 'John Jenkman <johnjenkman>'}]

Gitで空のコミットを作る

チームメンバーへの共有の意味を兼ねてプルリクエストを作り、作業を始める前の作業想定やメモなどを書きたい時があった。しかし、そのプルリクエストを作るために、何かしら作業をしてコミットとプッシュをしなければいけなくなってしまうというジレンマもあった。 そのため、プルリクエストを作るだけの空のコミットを作れる方法がないか探していたら、まさにその通りの--allow-emptyオプションがあった。

git commit --allow-empty -m "作業開始"

メッセージなしのコミットは作れないので、作業開始とか入れておくとわかりやすそう。 git initした直後に空のコミットを作っておくと、のちのちgit rebaseをするときも便利

【Google Colab】ログインしたGCPからログアウトする

Google ColabからGCPへログインした後、ユーザを切り替えたいなどの理由でログアウトしたいときに実行する方法です。

  1. gcloudのrevokeコマンドを使ってログアウトする
! gcloud auth revoke
  1. Google Colabのランタイムを終了させる
from google.colab import runtime
runtime.unassign()

yoshitaku-jp.hatenablog.com

【Google Colab】GCPへログインする

Google ColabからGCPへアクセスしたい場合は、次のコードを実行するとGoogleアカウントへのログインフォームが出てきます。 ログインが完了すると、Google Clund SDKのリクエスト確認の画面へ移動し、「許可」をクリックします。 次に認証用コードが発行され、認証コードをGoogle Colabの「Enter Valification Code」の欄に入力をすることで、GCPとの連携が完了します。

from google.colab import auth 
auth.authenticate_user()

yoshitaku-jp.hatenablog.com

【Google Colab】%%writefile でセルの内容をファイルに書き出す

セルの内容をファイルに書き出す

マジックコマンドの%%writefileを使うことでセルの内容を書き出す事ができる。

%%bash
ls

sample_data
%%writefile sample.txt
test1
test2
%%bash
ls

sample_data sample.txt
%%bash
cat sample.txt

test1 test2

セルの内容をファイルに追記

-aオプションを使うと、追記することができる

%%writefile -a sample.txt
test3

Appending to sample.txt
%%bash
cat sample.txt

test1 test2 test3

応用

ファイルに書き出した内容を呼び出すこともできるので、Pythonプログラムを書き出して別のノートブックから呼び出すと言ったこともできる。

%%writefile hello.py
print('Hello world')
%run hello.py

Hello world
%%writefile pi.py
import math
print(math.pi)
%run pi.py

3.141592653589793

【VS Code】Edit CSVを使って、ExcelライクにCSVを編集する

VS Code拡張機能のEdit CSVがとても便利だったので紹介する。

概要

Edit CSVVS code拡張機能で、CSVファイルをExcel操作のように編集することができる。

紹介ページにも、次のように書いてある。

This extensions allows you to edit csv files with an excel like table ui

この拡張機能を使用すると、Excel のようなテーブル UI を使用して csv ファイルを編集できます。

CSV拡張機能としてはRainbow CSVが有名でとても見やすくなって重宝している。一方で、一部のデータが長かったり、項目が多すぎたりすると、対応している部分がわかりづらくなってしまう面もあり、そういったときにEdit CSVが便利だった。

使い方

Edit csvをインストールした後、CSVファイルをVS Codeで開くと右上に「Edit csv」が現れる。

クリックすると、「Edit csv」でCSVファイルを開ける。

別タブが展開され、Edit csv画面でCSVファイルが開ける。

編集

ここからは紹介ページにもあったように、ExcelのようなUIで操作ができる。12行目の値を左から順に2,い,11,bに置き換えてみた。一つ不満としては、今現在編集しているセルかどうか、どこにカーソルが合っているのかが分かりづらかったので、この部分は今後改善されると嬉しい。

この状態は、まだ編集状態なのでCSVファイル本体へ変更を適用してみる。

CSVファイルへ反映と保存

CSVファイルへの反映と保存は次の2つのボタンからできる。

  • Apply changes to file
  • Apply changes to file and save

Apply changes to fileがファイルへの反映のみで、Apply changes to file and saveがsaveとあるように反映と保存となっている。

ファイルへの反映 / Apply changes to file

まずはファイルへの反映をしてみる。 2,い,11,bとなっている状態でApply changes to fileをクリックしてみる。

12行目の赤枠の値が上書きされたことがわかる。 また、赤矢印が指すようにファイルが編集中で未保存となっていることもわかる。

ファイルへの保存 / Apply changes to file and save

ファイルへの反映と同じことをして保存してみる。 Apply changes to file and saveをクリックすると、CSVファイルに変更が反映され、またファイルも保存されている。その証拠として、未保存時のマークが消えていることがわかる。

その他の便利機能

ExcelのようにCSVを表示してデータを編集する以外に、ヘッダー有無での読み込み、行追加、列追加などができる。

Has header

Has headerにチェックを入れると、column1-4となっていた部分が、1行目の値になる。

ボタンをクリックするだけで切り替わり、ファイルの再読み込みが発生しないので気持ちがいい。

Add row / Add column

Add rowを押すと行が、Add columnを押すと列が追加される。 CSVデータへ追記したいときに便利!

Add row / Add columnのとなりにある矢印をクリックすると、現在カーソルが合っている部分の上下、左右に1行追加することができる。下の画像ではid列の1行目にカーソルが合っている状態で下と右を押して行列を追加した。

まとめ

VS Code拡張機能のEdit CSVがとても便利だったので紹介した。 VS Code上でCSVを編集するときに、とても便利だったので機会があれば試してほしい。

2022 振り返り

いまさらとなってしまったが、2022年の振り返りをする。

## 仕事

2022で自分の一番大きな変化といえば転職になる。7月までSIerでデータエンジニアの仕事をし、1ヶ月間有給消化後、9月から引き続きデータエンジニアとして事業会社で仕事をしている。

転職するきっかけとなったことがいくつかあり、前職はSIerだったため、

- 新規構築が多く、運用まで見れることが少ない
- 客先の環境なので、クラウドの権限周りなどの一歩踏み込んだ部分に手が届かない
- 社内での今後の主なキャリアアップがマネジメント活動になってしまう

といった部分があった。これらが、自分の今後のキャリアを考えた時に合っていないと感じ離れることとなった。

転職してからは、毎日を楽しく過ごせている。
初転職だったので色々戸惑うことがあると思っていたがチームメンバに支えられてここまでやってこれた。
知識の面では、SIer時代にやってきたデータ分析基盤の説明などの結果、概念的な考え方が身についていたので、現場に入ってからも構成理解に関してはスムーズだったと思う。
一方で、プログラミングやクラウドに関する部分だと、知識・技能共に足りてはいないのを如実に感じる。このあたりは日々新しいことを学べていること自体に感謝しつつ、戦力になるためにも日々研鑽しているつもりである。

## 生活周り

### 転居

転職後に引っ越しをした。転職先で出社があることもあり、始発終点で座れる場所を選んだ。飲食店も買い物に行くエリアも多く、こちらも大満足となっている。
コロナ禍で地方に移るエンジニアも多く見受けられたのでやってみたい気持ちもあったけど、自分は選択しなかった。実際に移るなら現在の状況と同じぐらいの大阪や博多あたりに住みたい。

### 英会話

転職後の話が続くが、英会話を始めた。ずっとやってみたいなと思っていたり、続かない自己学習には挑戦していたが、お金を出してスクールに通う形で続けている。今のところ、馬が合う先生に巡り会えて、その先生とプライベートレッスンを続けている。英会話を始めた理由としては、今後のキャリアを考えてのことがあった。今後データエンジニアを続けていった場合のキャリアアップとしては、

 1. 現職でより大きいことをやる、それができる企業に移る
 2. ベンダーに移る
 3. 現職での役職を上げる

あたりが候補になると考えた。前職時代に外資のベンダーに移る人が多く、このあたりは自分にも素養がある可能性が高いと思っている。足りない部分でいうと英語になるので、今のうちに手を付けている。
現職でより大きいことをやる、それができる企業に移るに関しても、日本市場だけだと頭打ちになっていく未来が見え、結局は英語が出来て損がない未来にはなるかと思うので、今のうちにやって置きたい気持ちがあった。
もちろん結果につながればいいが、それ自体よりもいま英会話を習っていることが楽しいので、全く問題がないように思っている。

## まとめ

書く書く詐欺をしていた2022年のまとめを書いた。
正直、転職と引っ越しが大きなウェイトを締めすぎていて、他のことが霞んでしまった。それぐらい自分の中では大きいことで、この年は忘れられない年になるんだろうなと言う気がしているし、半年近く経ってもこのことを書いているので、実際にそうなんだろうなという感じである。


### と、ここまでをChatGPTに要約するよう頼んだら次のようになった

2022年に転職してデータエンジニアの仕事をしている。転職のきっかけは前職がSIerで、新規構築が多く、運用まで見れることが少ない、クラウドの権限周りなどの一歩踏み込んだ部分に手が届かない、マネジメント活動になってしまうキャリアアップしかないと感じたことが原因であった。転職後は、毎日を楽しく過ごしており、チームメンバの支えもありやってこれている。また、引っ越しをし、英会話を習得することで、今後のキャリアアップに備えている。


まとまっていてすごい。

GROUP BYで範囲を集計する

SQL の GROUP BY を使うとカラムのデータごとに集計できる。 例えば次のようなデータがあり、GROUP BY を使って SQL を実行する。

実行環境はsqlfiddleで、MySQL5.6を使った。

sqlfiddle.com

データはテストデータ生成サイトで作った。 https://tm-webtools.com/Tools/TestData

野尻 成美, 48
杉江 絢子, 40
古畑 詩音, 47
沢井 金之助, 44
大前 伊都子, 22
大隅 一行, 34
今津 陽菜子, 2
東 遥華, 29
河村 一平, 35
堀川 乃愛, 57

age で GROUP BY すると次のようになり、当然だがデータごとに GROUP BY されるのでデータとしてはバラバラになる。 今回は 10 代、20 代、30 代...と、一定の範囲で GROUP BY したいときにどうするかをメモする。

SELECT
  age,
  count(*) as 人数
FROM
  Table1
GROUP BY
  age
ORDER BY
  age;
age 人数
2 1
22 1
29 1
34 1
35 1
40 1
44 1
47 1
48 1
57 1

1, CASE で分ける

まずは CASE 文を使って分ける方法がある。 考え方含めとてもシンプルに実行できる。

SELECT
  CASE
    WHEN age < 10 THEN '10代以下'
    WHEN age BETWEEN 10
    AND 19 THEN '10代'
    WHEN age BETWEEN 20
    AND 29 THEN '20代'
    WHEN age BETWEEN 30
    AND 39 THEN '30代'
    WHEN age BETWEEN 40
    AND 49 THEN '40代'
    WHEN age >= 50 THEN '50代以上'
    ELSE NULL
  END AS 年代,
  count(*) as 人数
FROM
  Table1
GROUP BY
  CASE
    WHEN age < 10 THEN '10代以下'
    WHEN age BETWEEN 10
    AND 19 THEN '10代'
    WHEN age BETWEEN 20
    AND 29 THEN '20代'
    WHEN age BETWEEN 30
    AND 39 THEN '30代'
    WHEN age BETWEEN 40
    AND 49 THEN '40代'
    WHEN age >= 50 THEN '50代以上'
    ELSE NULL
  END
ORDER BY
  age;
年代 人数
10 代以下 1
20 代 2
30 代 2
40 代 4
50 代以上 1

2, 計算をしてグループを作り出す

仮に年代を CASE 文で書くと、最大でも 10 代から 100 代までとなるが、もっと範囲が多いものになると辛くなる。 そういった場合は計算を使ってグループを作り出したほうが便利になる。

今回の年代のケースだと 10 の位があれば分類できるので、年齢を 10 で割り FLOOR で小数点になった 1 の位を切り落とし、GROUP BY している。 SELECT 文では# 1, CASE で分けると表記を合わせるために工夫しているが、基本的には次のようになる。

SELECT
  CONCAT(
    CASE
      FLOOR(age / 10) * 10
      WHEN 0 THEN 10
      ELSE FLOOR(age / 10) * 10
    END,
    '代'
  ) AS 年代,
  COUNT(*) AS 人数
FROM
  Table1
GROUP BY
  FLOOR(age / 10)
ORDER BY
  age;
年代 人数
10 代 1
20 代 2
30 代 2
40 代 4
50 代 1

Pythonのtranslateを使った文字の置換

Pythontranslateメソッドを使うとmaketransメソッドから生成される変換表を元に文字を置換できる。

適当な文字列を作成した。日本語の文章にカンマ、ピリオドなどが入っており、これを句読点に置き換えてみる。

greeting = '''
***
おはようございます,本日は下記を対応します.

 - 書類提出
 - ドキュメント読み込み
 - サンプル作成

 今日も、よろしくお願いいたします.
***
***
おはようございます、本日は下記を対応します。

 ・ 書類提出
 ・ ドキュメント読み込み
 ・ サンプル作成

 今日も、よろしくお願いいたします。
***

maketrans の第 3 引数にも指定する

文頭と文末に*が入っていて不要に見えるので、この置換作業の中で削除してみる。 maketransメソッドの 3 つ目の引数に指定して、削除されることも確認する。

greeting = '''
***
おはようございます,本日は下記を対応します.

 - 書類提出
 - ドキュメント読み込み
 - サンプル作成

 今日も、よろしくお願いいたします.
***
'''

print(greeting.translate(str.maketrans('.,-','。、・','*')))
おはようございます、本日は下記を対応します。

 ・ 書類提出
 ・ ドキュメント読み込み
 ・ サンプル作成

 今日も、よろしくお願いいたします。

maketrans の指定文字を片方の引数だけ増やしてみる

第 2 引数に#を追加すると、第 1 引数との数が一致していないのでエラーになる。

greeting = '''
***
おはようございます,本日は下記を対応します.

 - 書類提出
 - ドキュメント読み込み
 - サンプル作成

 今日も、よろしくお願いいたします.
***
'''

print(greeting.translate(str.maketrans('.,-', '。、・#', '*')))
      1 greeting = '''
      2 ***
      3 おはようございます,本日は下記を対応します.
   (...)
     10 ***
     11 '''
---> 13 print(greeting.translate(str.maketrans('.,-', '。、・#', '*')))

ValueError: the first two maketrans arguments must have equal length