【Python】ETLツールのLuigiをさわってみた
前回に引き続き、ETLツールをいろいろさわってみたいと思ったところから、今回はPython製のETLツールLuigiをさわってみた。
インストール
pip install luigi
バージョン確認
$ python --version Python 3.9.1 luigi 3.2.0
サンプルコード
import luigi class Hello(luigi.Task): def run(self): out = self.output() with out.open("w") as f: f.write("Hello") def output(self): return luigi.LocalTarget("hello.txt") class World(luigi.Task): def requires(self): return Hello() def run(self): input_ = self.input() output = self.output() out = self.output() with out.open("w") as f: f.write("World") def output(self): return luigi.LocalTarget("World.txt") def main(): luigi.run(main_task_cls=World, local_scheduler=True) if __name__ == "__main__": main()
実行結果
無事にHelloとWorldが出力されている。
$ python hello_world.py DEBUG: Checking if World() is complete DEBUG: Checking if Hello() is complete INFO: Informed scheduler that task World__99914b932b has status PENDING INFO: Informed scheduler that task Hello__99914b932b has status PENDING INFO: Done scheduling tasks INFO: Running Worker with 1 processes DEBUG: Asking scheduler for work... DEBUG: Pending tasks: 2 INFO: [pid 14403] Worker Worker(salt=2023197516, workers=1, host=yoshitakuMBA.local, username=yoshitaku, pid=14403) running Hello() INFO: [pid 14403] Worker Worker(salt=2023197516, workers=1, host=yoshitakuMBA.local, username=yoshitaku, pid=14403) done Hello() DEBUG: 1 running tasks, waiting for next task to finish INFO: Informed scheduler that task Hello__99914b932b has status DONE DEBUG: Asking scheduler for work... DEBUG: Pending tasks: 1 INFO: [pid 14403] Worker Worker(salt=2023197516, workers=1, host=yoshitakuMBA.local, username=yoshitaku, pid=14403) running World() INFO: [pid 14403] Worker Worker(salt=2023197516, workers=1, host=yoshitakuMBA.local, username=yoshitaku, pid=14403) done World() DEBUG: 1 running tasks, waiting for next task to finish INFO: Informed scheduler that task World__99914b932b has status DONE DEBUG: Asking scheduler for work... DEBUG: Done DEBUG: There are no more tasks to run at this time INFO: Worker Worker(salt=2023197516, workers=1, host=yoshitakuMBA.local, username=yoshitaku, pid=14403) was stopped. Shutting down Keep-Alive thread INFO: ===== Luigi Execution Summary ===== Scheduled 2 tasks of which: * 2 ran successfully: - 1 Hello() - 1 World() This progress looks :) because there were no failed tasks or missing dependencies ===== Luigi Execution Summary =====
luigiの感想
Prefectはタスクとフローという概念があった。
luigiは、Worldクラスの中でHelloをrequires
の形で呼んでいるところから、タスクを連続で擬似的なフローを実行していく思想で構成されているように感じた。個人的には、一つの処理の流れがまとまって見えたほうが見通しが良いと感じるので、luigiのWorldタスクの中でHelloタスクを参照している形は処理の流れが追いくくなるので苦手意識を感じた。
一方で、タスクの実行履歴コントロールをファイル出力で管理している点は個人的には手軽で良いと感じた。サンプルコードではoutput関数を呼んでいる部分になる。実行後のディレクトリ配下は次のようになっている。
$ ls Pipfile Pipfile.lock World.txt hello.txt hello_world.py
実行履歴のコントロールは実際の運用面を考えてくると出る問題であるが、ファイル出力ができるということはクラウドのストレージを選べるということであり、安く運用することができる可能性が高まる。また、使いやすさの面でもブラウザ上からストレージ操作することも可能になるので、RDBで管理するよりも運用のハードルも下がると感じる。