よしたく blog

ほぼ週刊で記事を書いています

DataformCLIの2系を使ってBQに接続するまでの手順

2024 現在の Dataform は、Dataform 3 系の開発とttps://docs.dataform.co/(存在しない URL)にあった Docs の移行がおこなわれている。 最近 2 系の Docs にすぐ当たれない体験が増えてきたので、備忘録として残しておくー。

今回は Dataform CLI を使って、BQ に接続するまでの手順になる。

バージョン

$ node --version
v20.11.1
$ dataform --version
2.9.0

DataformCLI をインストールする

npm i -g @dataform/cli@^2.9.0

Dataform プロジェクトを初期化する

docs には、dataform initを実行すると dataform プロジェクトを初期化できると書いてあったが、BigQuery を使った Dataform プロジェクトを初期化するためには、下記のコマンドとオプションが必要になる。今回は sandbox-dataform ディレクトリの下で実行していくことにする。

$ pwd
/Users/yoshitaku/workspace/sandbox-dataform

$ dataform init bigquery <YOUR_DIRECTORY> --default-database <YOUR_BIGWUERY_PROJECT> --default-location <YOUR_BQ_LOCATION>

コマンドが成功すると、下記のディレクトリとファイルが作成される。

Writing project files...

Directories successfully created:
  /Users/yoshitaku/workspace/sandbox-dataform/dataform
  /Users/yoshitaku/workspace/sandbox-dataform/dataform/definitions
  /Users/yoshitaku/workspace/sandbox-dataform/dataform/includes
Files successfully written:
  /Users/yoshitaku/workspace/sandbox-dataform/dataform/dataform.json
  /Users/yoshitaku/workspace/sandbox-dataform/dataform/package.json
  /Users/yoshitaku/workspace/sandbox-dataform/dataform/.gitignore
NPM packages successfully installed.

また、@dataform/core もインストールされている。

$ cat dataform/package.json
{
    "dependencies": {
        "@dataform/core": "2.9.0"
    }
}

クレデンシャルファイルの生成

dataform init-credsコマンドを実行すると、実行環境へ接続するためのクレデンシャルファイルを生成することができる。 しかしここでも、このコマンドのままでは実行できず、末尾に bigquery を付ける必要があるdataform init-creds bigquery

実行すると対話式で必要情報を入力していく。聞かれることは下記の 2 つになる。

  • 接続する BigQuery が存在している location(おそらく asia-northeast1 を選択することが多い)
  • Google Cloud のプロジェクト ID

無事に完了すると Dataform CLI から、BigQuery に接続する準備ができたことになる。

サンプルファイルの作成と実行

次のコマンドを実行し、サンプルの SQL を作成する。なお、このコマンドは Dataform の公式 Docs に載っていたコマンドになる。

echo "config { type: 'view' } SELECT 1 AS test" > definitions/example.sqlx

dataform には dry-run のコマンドがあるので実行する。 無事に設定が完了しているとコンパイルが実行され、プレビュー実行される。

$ dataform run --dry-run
Compiling...

Compiled successfully.

Dry run (--dry-run) mode is turned on; not running the following actions against your warehouse:

1 dataset(s):
  dataform.example [table]

ちなみに、生成したクレデンシャルファイルを適当なディレクトリへ移動させると dry-run は失敗する。

$ dataform run --dry-run
Compiling...

Compiled successfully.

Dataform encountered an error: Missing credentials JSON file; not found at path '/Users/yoshitaku/workspace/sandbox-dataform/dataform/.df-credentials.json'

supabaseで全件削除するための小技

supabase の Python ライブラリであるsupabase-pyでは、2024-03-06 時点で全件削除できるような実装はないらしい。 そこで擬似的に全件削除のようなことをしたいときにはひと工夫する必要がある。

supabase.com

削除するときのフィルター条件でneqを使って、idが0では無い行を全て削除するようにすれば実現できる。

from supabase import create_client, Client

url = "YOUR_URL"
key = "YOUR_KEY"
supabase = create_client(url, key)

data, count = supabase.table('YOUR_TABLE').delete().neq('id', 0).execute()

Google Cloud Next Tokyo ’23 Day2に行った📝

DMM における AWS から BigQuery へのデータ基盤移行 / 合同会社DMM.com

背景

1500テーブル ほぼ日時処理 biから投げられる30000クエリ/ week 1500mau 新規テーブル・カラムの別作業が5回/week

課題

  • オペの省力化ができていないため、活用拡大にリソースを割きにくい
  • ガバナンス
  • データ活用の拡大
    • 今まではデータは社内利用に留まっていたが、広告システムなどの社外利用に拡大したかった

狙い

  • サーバレスにすること
  • Google製品に連携が用意できること
    • サーチコンソール
    • ADs

基盤移行の流れとポイント

  • データ同期
    • S3toGCSはSTSを使って同期している
    • GCStoBQはGCSの変更検知を使ってイベントドリブンに対象範囲を絞って更新している
  • データの差分検知
    • AWSとBQで全レコードのmd5を見て差分チェックしていた

information schemaでみたら全体で3.5兆レコードあったらしい

質問

  • 過去分のデータはどうしたか。一時的に二つの環境にデータがあることになるはず?
    • 現時点で、もうS3はサポートしない旨を言っていていずれは消す。
    • サービスからはデータをGCSに直接送ってもらっている
  • GCSに置いてあるデータのコスト削減とかは何をしているか
    • 外部テーブル群はデータセット単位で見せないようにしてる
    • また、日にちで区切ってライフサイクル化してる
      • bqに取り込んだ後はストレージクラスを下げたり、消してしまったり

BigQuery のデータ品質やデータ活用を高める Dataplex 等の活用 / GO株式会社

基盤利用者は100人 / week

  • 課題
    • 利用者がすぐにデータ活用できない
    • 障害や仕様変更の調査が大変
    • いつのまにかデータが壊れている
  • DataplexとDatahubで課題解決
  • Dataplex
    • データ周りをまとめた色々製品
      • この中にデータ品質管理がある
      • テスト定義をSQLに変換してテストできたりする
      • レポート配信などで使われる少数の手ブルに導入した
  • Datahub
    • Data CatalogがBQとLookerに対応していないから採用したが、バックエンド技術などがオーバースペックで微妙らしい

質問: - データのテストはどうやってるか - 辞書テストや範囲テスト、鮮度テストのやり方? - また、この時のSQLファイルはどうしているのか - Dataplexでできちゃうよ - データマートのメタデータの管理はどうしているか - データソース側はinformationからとってきていることはわかったけど、手動でつけてる? - DataHubでできちゃうよ - 発表にはなかったが、権限管理はされてる?どうしてる? - 今は全部見せる運用。これからやるならDataplexのデータメッシュで切ってやりたい

--

Dataform で BigQuery データパイプラインをより効率的に / Google Cloud

Goさんの発表で出てきたDataformが気になったので、追加で見てきた。 機能紹介と簡単な事例紹介だった。

Google Cloud Next Tokyo ’23 Day1に行った📝

基調講演

  • ハルシレーションを抑えるグラウンディングをし、事実に基づく生成AIにする
  • Vertexがより協力になっていく話
  • data residency (今日サービス発表)(drz)
  • vertex ai searchの日本語提供
  • Googleワークスペースのduet aiが強力になっていく話
    • Meetでの自動翻訳
    • 会議の途中経過を要約して教えてくれる
      • 質問して、詳細部分を回答をしてくれたりもする
  • duet ai in google cloud
    • cloud station
    • log explorer自然言語でエラー内容を説明してくれる
    • どう治したらいいかも教えてくれる
    • Looker studio pro
      • 自然言語でグラフを生成してくれる
        • そこからスライド作成もできる
    • Database migration
      • oracleからAlloy dbへ
      • duet aiで自動で変換してくれたり、ダメなところは確認が必要だと提供してくれたりする
    • Memorystore for redis clusterがnew

生成AIの話が多く、Google製品とのコラボレーションが熱くなっていきそうだった。 知らない製品多めでVertex AIもあまり知らなかったので、色々できること増えていきそうだなと思ったけど、自社に組み込むのはまだまだ先な感じがあるかなぁ


任天堂のデータ分析基盤 / ニンテンドーシステムズ株式会社

データ規模 アカウント3.3億ほど 900テーブル

  • 他プロジェクトのBigQueryを見るときは、承認済みデータセットを経由して参照やコピーをしている。
    • 承認済みデータセットは2段階に分けて承認してる
  • 元々はdatafusionをつかっていたが、今はdata transformはbatchを
  • 変換はcomposerでdbtを使っている。
  • データセットごとにdbtを用意してる。プロジェクト間は別で定義
  • bqは20000スロット
  • bigquery data transfer service
  • 開発中のデータソースとbqを統計値を使ってデータ比較検証
  • data visialasionを使っている
  • フォルダ単位でプロジェクトを分けている
  • terraformとGHEをつかって権限管理している
    • GHAで定期的に実環境とのドリフトを検出
    • 構成情報を社内wikiに連携
    • 利用者の権限洗い出しをおこなっている
  • インフォメーションスキーマで利用状況をチェックしてる
    • スロークエリを見つけて対処している
    • ex, 集計済みテーブルがあるのに生テーブル見てたり、ジョインするテーブルがデカすぎる

質問したかった内容: terraformでの権限管理はどうしている? スロークエリを気付ける仕組みは? 今後の展望でのデータカタログの充実をどのようにやっていこうと思っているか 今後の展望でのデータ制度の向上をどのようにやろうと思っているか

いろいろ質問事項あったのに、Ask The Spreakerはなかった...

—-

製造業での Cloud Run を中心としたシステム開発コラボレーション事例 / 株式会社LIXIL

  • cloud shellのteachmeコマンド知らなかった
  • デモ動画でのハンズオンであった

—-

ゴーゴーカレーのデジタル進化: Google の技術を中心とした新たな航路 / 株式会社ゴーゴーカレーグループ

  • ゴーゴーカレーのビジネスについてのトーク
    • いろんな販路があってすごかった
  • SmartHR、MoneyForward、OKRや1on1も実施しているらしい これからアプリの開発もしていく
  • デジタル方針はGoogle中心主義になることを掲げている。
    • Google Cloudのみをデータセンターとして活用
    • AppSheet
  • Google以外はSaaSを使う
    • バックオフィスはMoneyForward

gcloud auth loginとapplication-default loginの違いを整理した

記事の概要

Google Cloud SDKの認証コマンド、`gcloud auth login` と `gcloud auth application-default login` の違いについてまとめた。

gcloud auth login

目的

`gcloud` コマンドラインツールを使用するユーザー自身を認証するためのコマンドになる。

用途

  • 個人の開発マシンでの作業
  • 手動での `gcloud` コマンドの実行

具体例

Google Cloud Platform上で仮想マシンを手動で作成したい場合、以下のコマンドを使用して自分自身を認証する。

gcloud auth login

その後、以下のようなコマンドで仮想マシンを作成することができる。

gcloud compute instances create INSTANCE_NAME

gcloud auth application-default login

概要

特定のアプリケーションがGoogle Cloudのリソースにアクセスできるかどうかを確認するための認証。アプリケーションは、バックエンドのサービスやスクリプトなど、人の介入なしに動作するものを指す。

具体例

  1. WebアプリケーションがCloud Storageから画像を取得: ユーザーにプロフィール画像を表示するWebアプリケーションが、Cloud Storageのバケットから画像を自動的に取得する場合。
  2. ログ処理スクリプト: 一日の終わりに、Compute EngineのインスタンスからログファイルをBigQueryに自動的にアップロードするスクリプト
  3. 定期的なデータバックアップ: Cloud Schedulerを使用して、Cloud SQLのデータベースを定期的にバックアップするスクリプト
  4. 自動的なリソースのスケーリング: Cloud Pub/Subのメッセージキューのサイズに基づいて、App Engineのインスタンス数を自動的にスケーリングするアプリケーション。

具体的なコマンド

gcloud auth application-default login

違い

  • `gcloud auth login` はユーザーの認証のため、`gcloud auth application-default login` はアプリケーションの認証のために使用される。
  • `gcloud auth login` は主に手動の操作のため、`gcloud auth application-default login` はアプリケーションの開発の際に使用される。

転職して1年を振り返る

はじめに

2022の振り返りと称した、半年の振り返りはこれになる。

yoshitaku-jp.hatenablog.com

1年間での主な業務と成果

主に - ETL処理を組む - バッチ処理の面倒をみる - 集計依頼に対応する - データ基盤の速度改善 - データ基盤のコスト削減 を実施している。

定常業務的なものは慣れてきてこなせるようになってきた。 また自動化出来そうな部分があれば、自動化にも着手していて改善活動にも着手できている。

学びと成長

ドメイン知識以外で新しく吸収できたことはGCPとTerraform周りになる。 以前はAzureのクラウドコンソール画面からポチポチと選択してお仕事をしていたが、GCPとTerraformが導入されていることもあり、そちらを0からキャッチアップしていった。 自分で0から作る経験と、すでにあるものを改善する経験もし、慣れてきた実感がある。

Pythonで簡単なアプリケーションを作る経験もした。今まで個人的なスクリプトを書くことしかしてこなかったので、指摘される部分が多く、念入りにレビューしてもらった。チームメンバには感謝している。このあたりは次回以降生かしていく。

挑戦と困難

なれることに精一杯で挑戦という挑戦はしてこなかったことは認識している。このあたりは次の年の課題だと思っている。日々出会うことが新しいことだったので、こんなんだとは思っていないが日々困難では合ったかもしれない。プレッシャーに強いタイプではないので、期間が短かったり、先が見通せないタスクだと尻込みをしてしまう面は合ったと自覚している。このあたりも課題である。

1年後の自分への評価

データエンジニアとしてもっと成長していたいので、OKRでいい感じの目標を立ててやっていくつもり。今の段階ではそれぐらいのことしか思っていない。

今後の目標と展望

転職してなれることに精一杯だったので、最近ふとした時に目標や展望がないことに不安になっている(まだまだ出来ないことが多いので、それらのキャッチアップはやっていくとして)。 考えても始まらないのでとりあえず手を出すかと考えていて、プライベートの時間を使って新しく学ぶ習慣は崩れてしまっているので、まずはこれを取り戻したいと思っている。そのための振り返りブログだったりもする。

また、世界も落ち着いてきたので、エンジニア仲間とと直接の交流を増やして、エネルギーを吸収したいと思っている日々である。

おわりに

one secアプリがSNSの時間消費抑えに良かった

SNSの時間を抑えたいなと思っていたところone secにたどり着き、使ってみたところとても良かったので紹介します。

tl, dr

  • SNSの時間消費にone secがよかった
  • 対象のアプリが起動すると、one secの妨害が始まる
  • 妨害のカスタマイズもできる

one sec とは

one sec
長期的な、ソーシャルメディアの妨害から自由になる
ソーシャルメディアアプリの起動時にone secはあなたに深呼吸を吐かせます。これはシンプルかつ効果的で、摩擦が邪魔なアプリの魅力を削ぎます。

one-sec.app

紹介文にもあるように、SNSの使いすぎ防止を回避する目的で作られました。 アプリはApp StoreGoogle Playで提供されています。無課金でも1個のアプリに妨害設定できるので、まずはインストールしてみるのがいいんじゃないでしょうか。

妨害の設定 - iOS

自分がiPhoneを使っているのでiOSでの設定を紹介します。 iOSでは、one secとショートカットアプリを使って妨害の設定をすることになります。設定の解説は動画があるので、わかりやすく設定することができるかと思います。

www.youtube.com

one secの妨害の種類

妨害の種類は6つあって、

  • 呼吸エクササイズ
  • 呼吸(ミニマム)
  • 反転
    • AppleWatchの呼吸のような動きをして、開く前に一度間を置かせる
    • 深呼吸が終わったあとに、今まで何回開こうとしたかが画面に出て萎える
    • 反転は呼吸と同じ動きをしているので、このカテゴリに入れておく
      • よくわかっていない
  • ドットを目で追う
    • 目で追うとかいてあるけど、実際には輪っかをタップし続けて一定時間追いかけ続けると解除できる
  • ブラックスクリーン
    • 画面が一定時間真っ暗になって、その後開ける
    • 真っ黒になると自分の顔が打つづので冷静になる
  • スマホの画面を回転させる
    • デフォルトだと3回スマホ本体を回転させると開くことができる
    • 公共の場だとスマホを回転させる変な人に見られたくないので、結果的に開かなくなる

です。

また、これらはカスタマイズができ、呼吸している秒数を好みの秒数に伸ばせたり、開こうとしたn回分だけ数を増やすようなこともできます。

自分もn回分増やすような機能を発見したとき、より制御できるんじゃないかと思って設定を入れましたが、DMで何往復か連絡をしたいときとか急なときに不便だったので、一旦デフォルト設定に戻してしまいました。アプリによっては、例えばゲームとかは、n回分増やす設定を入れてもいいかもしれないです。

まだ使い切れていないけど、気づいている機能

まだ使い切れていないけど、気にはなっている機能がいくつかあります。誰か使いこなせていたら教えてほしいー。

まとめ

今現在、自分は課金もしています。 妨害設定にしているアプリは次の3つです。

  • Twitter
    • 言わずもがな
    • ダラダラ見てしまうので設定
  • Instagram
    • そこまで見てはいなかったけど、Twitterの後に巡回している気がしたので設定
  • Slack
    • お仕事に関する通知を見に行きまくっていたので設定
    • お仕事中はPCから見るので、プライベートの時間に見ないようになった

という感じです。

one secを入れてから、

  • 本を読む時間が増えた
  • 気になっていたマンガを読破した
  • 勉強時間が増えた
  • 調べ物する時間が増えた

などがあります。 家の中や移動中でもダラダラSNSを見る時間が減り、自分が気になっていたことを消化できていて、満足度は結構高いです。 ぜひ使ってみてくださいー

GitHub ActionsでSQLFluffを実行する

SQLのLinterであるSQLfluffをCI /CDで回したかったので、GitHub Actionsで試してみた。今回は

  • PR上でCIが回る
  • checkout時のデータ量削減で、fetch-depth: 1の設定
  • リポジトリ内の全てのSQLファイルにLinterを実行するため、find . -name '*.sql' -type f | xargs -I {} sqlfluff lint {} --dialect bigquery
    • sqlfluff lint **/*.sqlで全て実行できるかと思ったが、うまく実行できなかった

とした。

sql_lint.yaml

name: SQLFluff

on: [pull_request]

jobs:
  lint:
    runs-on: ubuntu-latest
    steps:
      - uses: "actions/checkout@v3"
        with:
          fetch-depth: 1
          ref: ${{ github.head_ref }}

      - uses: "actions/setup-python@v2"
        with:
          python-version: "3.8"

      - name: install sqlfluff
        run: pip install sqlfluff
        
      - name: run sqlfluff
        run: find . -name '*.sql' -type f | xargs -I {} sqlfluff lint {} --dialect bigquery

BigQueryのメタデータからDDL文を確認する

BigQueryのメタデータからDDL文を確認する方法を調べた。以前、過去に作ったテーブルのDDL文がわからず困った時に役立った。 INFORMATION_SCHEMA.TABLESddlカラムにDDL文が存在している。

SELECT
  table_name, ddl
FROM
  `<project_id>.<dataset_name>.INFORMATION_SCHEMA.TABLES`
 WHERE
  table_name = '<target_table>'
;

BigQueryのpublic-dataからbaseballデータセットのschedulesテーブルのDDL文を確認したいときは次のようになる。

SELECT
  table_name, ddl
FROM
  `bigquery-public-data.baseball.INFORMATION_SCHEMA.TABLES`
 WHERE
  table_name = 'schedules'

出力結果

table_name ddl
schedules CREATE TABLE bigquery-public-data.baseball.schedules
(
gameId STRING,
gameNumber INT64,
seasonId STRING,
year INT64,
type STRING,
dayNight STRING,
duration STRING,
duration_minutes INT64,
homeTeamId STRING,
homeTeamName STRING,
awayTeamId STRING,
awayTeamName STRING,
startTime TIMESTAMP,
attendance INT64,
status STRING,
created TIMESTAMP
);

【Python】ETLツールのLuigiをさわってみた

前回に引き続き、ETLツールをいろいろさわってみたいと思ったところから、今回はPython製のETLツールLuigiをさわってみた。

インストール

pip install luigi

バージョン確認

$ python --version
Python 3.9.1

luigi 3.2.0

サンプルコード

import luigi


class Hello(luigi.Task):
    def run(self):
        out = self.output()
        with out.open("w") as f:
            f.write("Hello")

    def output(self):
        return luigi.LocalTarget("hello.txt")


class World(luigi.Task):
    def requires(self):
        return Hello()

    def run(self):
        input_ = self.input()
        output = self.output()

        out = self.output()
        with out.open("w") as f:
            f.write("World")

    def output(self):
        return luigi.LocalTarget("World.txt")


def main():
    luigi.run(main_task_cls=World, local_scheduler=True)


if __name__ == "__main__":
    main()

実行結果

無事にHelloとWorldが出力されている。

$ python hello_world.py
DEBUG: Checking if World() is complete
DEBUG: Checking if Hello() is complete
INFO: Informed scheduler that task   World__99914b932b   has status   PENDING
INFO: Informed scheduler that task   Hello__99914b932b   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 14403] Worker Worker(salt=2023197516, workers=1, host=yoshitakuMBA.local, username=yoshitaku, pid=14403) running   Hello()
INFO: [pid 14403] Worker Worker(salt=2023197516, workers=1, host=yoshitakuMBA.local, username=yoshitaku, pid=14403) done      Hello()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   Hello__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 14403] Worker Worker(salt=2023197516, workers=1, host=yoshitakuMBA.local, username=yoshitaku, pid=14403) running   World()
INFO: [pid 14403] Worker Worker(salt=2023197516, workers=1, host=yoshitakuMBA.local, username=yoshitaku, pid=14403) done      World()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   World__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=2023197516, workers=1, host=yoshitakuMBA.local, username=yoshitaku, pid=14403) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 2 tasks of which:
* 2 ran successfully:
    - 1 Hello()
    - 1 World()

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

luigiの感想

Prefectはタスクとフローという概念があった。

yoshitaku-jp.hatenablog.com

luigiは、Worldクラスの中でHelloをrequires の形で呼んでいるところから、タスクを連続で擬似的なフローを実行していく思想で構成されているように感じた。個人的には、一つの処理の流れがまとまって見えたほうが見通しが良いと感じるので、luigiのWorldタスクの中でHelloタスクを参照している形は処理の流れが追いくくなるので苦手意識を感じた。

一方で、タスクの実行履歴コントロールをファイル出力で管理している点は個人的には手軽で良いと感じた。サンプルコードではoutput関数を呼んでいる部分になる。実行後のディレクトリ配下は次のようになっている。

$ ls
Pipfile  Pipfile.lock  World.txt  hello.txt  hello_world.py

実行履歴のコントロールは実際の運用面を考えてくると出る問題であるが、ファイル出力ができるということはクラウドのストレージを選べるということであり、安く運用することができる可能性が高まる。また、使いやすさの面でもブラウザ上からストレージ操作することも可能になるので、RDBで管理するよりも運用のハードルも下がると感じる。