よしたく blog

ほぼ週刊で記事を書いています

【Python】ETLツールのLuigiをさわってみた

前回に引き続き、ETLツールをいろいろさわってみたいと思ったところから、今回はPython製のETLツールLuigiをさわってみた。

インストール

pip install luigi

バージョン確認

$ python --version
Python 3.9.1

luigi 3.2.0

サンプルコード

import luigi


class Hello(luigi.Task):
    def run(self):
        out = self.output()
        with out.open("w") as f:
            f.write("Hello")

    def output(self):
        return luigi.LocalTarget("hello.txt")


class World(luigi.Task):
    def requires(self):
        return Hello()

    def run(self):
        input_ = self.input()
        output = self.output()

        out = self.output()
        with out.open("w") as f:
            f.write("World")

    def output(self):
        return luigi.LocalTarget("World.txt")


def main():
    luigi.run(main_task_cls=World, local_scheduler=True)


if __name__ == "__main__":
    main()

実行結果

無事にHelloとWorldが出力されている。

$ python hello_world.py
DEBUG: Checking if World() is complete
DEBUG: Checking if Hello() is complete
INFO: Informed scheduler that task   World__99914b932b   has status   PENDING
INFO: Informed scheduler that task   Hello__99914b932b   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 14403] Worker Worker(salt=2023197516, workers=1, host=yoshitakuMBA.local, username=yoshitaku, pid=14403) running   Hello()
INFO: [pid 14403] Worker Worker(salt=2023197516, workers=1, host=yoshitakuMBA.local, username=yoshitaku, pid=14403) done      Hello()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   Hello__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 14403] Worker Worker(salt=2023197516, workers=1, host=yoshitakuMBA.local, username=yoshitaku, pid=14403) running   World()
INFO: [pid 14403] Worker Worker(salt=2023197516, workers=1, host=yoshitakuMBA.local, username=yoshitaku, pid=14403) done      World()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   World__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=2023197516, workers=1, host=yoshitakuMBA.local, username=yoshitaku, pid=14403) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 2 tasks of which:
* 2 ran successfully:
    - 1 Hello()
    - 1 World()

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

luigiの感想

Prefectはタスクとフローという概念があった。

yoshitaku-jp.hatenablog.com

luigiは、Worldクラスの中でHelloをrequires の形で呼んでいるところから、タスクを連続で擬似的なフローを実行していく思想で構成されているように感じた。個人的には、一つの処理の流れがまとまって見えたほうが見通しが良いと感じるので、luigiのWorldタスクの中でHelloタスクを参照している形は処理の流れが追いくくなるので苦手意識を感じた。

一方で、タスクの実行履歴コントロールをファイル出力で管理している点は個人的には手軽で良いと感じた。サンプルコードではoutput関数を呼んでいる部分になる。実行後のディレクトリ配下は次のようになっている。

$ ls
Pipfile  Pipfile.lock  World.txt  hello.txt  hello_world.py

実行履歴のコントロールは実際の運用面を考えてくると出る問題であるが、ファイル出力ができるということはクラウドのストレージを選べるということであり、安く運用することができる可能性が高まる。また、使いやすさの面でもブラウザ上からストレージ操作することも可能になるので、RDBで管理するよりも運用のハードルも下がると感じる。