Luigi入門

仕事でバッチ処理を作成する機会が多々あるものの 何らかのFrameworkを使用していないので調査!

仕様を把握して今後使用していきたい。

Luigiとは

Luigi

複雑なパイプライン構築を支援するPythonパッケージ。
Spotifyが作成したパッケージ。
依存関係の分離や、ワークフロー管理、ビジュアライゼーション、Failureハンドリング、CLIなど様々な便利機能がある。

ワークフロー構築

基本的な構成要素は以下の2つ

  • Taskクラス
  • Targetクラス

抽象クラスで、いくつかメソッドが実装されることが想定されている。

タスクの実行方法を決める概念

  • Prameterクラス

Targetクラス

Targetクラスは以下のような中間生成物を作成するチェックポイントに対応する

  • ディスクのファイル
  • HDFSのファイル
  • データベースのエントリ

ターゲットが存在する場合に、Trueを返すexistsメソッドを実装しなければならない

便利なツールが用意されているので、実際にはTargetのサブクラスを実装することはほぼない。

  • luigi.LocalTarget
  • luigi.HdfsTarget
  • luigi.contrib.s3.S3Target
  • luigi.contrib.ssh.RemoteTarget
  • luigi.contrib.mysqldb.MySqlTarget
  • …etc

open()メソッドを実装していて、読み込み(mode=r)、書き込み(mode=w)が可能なストリームオブジェクトを返す。
Gzipもサポートしている。

Taskクラス

Taskクラスは計算を行うクラス。
振る舞いを変えるrun(), output(), require()メソッドを実装できる。

Taskでできること

  • 他のTaskによって出力されたTargetを利用する
  • Targetを出力する
  • require()メソッドでTask間の依存関係を定義できる
  • output()メソッドで出力を定義できる
  • 各Task間の依存関係となるTargetクラスを返すinput()メソッド

Parameterクラス

実行されるTaskクラスのジョブで、何らかのパラメータ化したいときに使う。日付など。

サンプル

Taskサンプル

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import luigi

class MyTask(luigi.Task):
    # パラメータ
    param = luigi.Parameter(default=42)

    def require(self):
        """タスク間の依存関係
        """
        return SomeOtherTask(self.param)
    
    def run(self):
        """ビジネスロジック
        """
        f = self.output().open("w")
        print("hello world", file=f)
        f.close()
    
    def output(self):
        return luigi.LocalTarget("/tmp/foo/bar-%s.txt" % self.param)

if __name__ == "__main__":
    luigi.run()

.pklなどのバイナリファイルの場合は、format=Nopを記述する必要がある。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
class GenerateWords(luigi.Task):

    def output(self):
        return luigi.LocalTarget('words.pckl', format=Nop)

    def run(self):
        import pickle

        # write a dummy list of words to output file
        words = [
                'apple',
                'banana',
                'grapefruit'
                ]

        with self.output().open('w') as f:
            pickle.dump(words, f)

input()の使い方

1
2
3
4
5
6
7
8
class TaskWithManyInputs(luigi.Task):
    def requires(self):
        return {'a': TaskA(), 'b': [TaskB(i) for i in xrange(100)]}

    def run(self):
        # .input()はrequiresのTaskオブジェクトのTargetオブジェクトがreturnが返される
        f = self.input()['a'].open('r')
        g = [y.open('r') for y in self.input()['b']]

タスク状況を知りたい場合に、GUIでプログレスバーを出すことも可能。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
class MyTask(luigi.Task):
    def run(self):
        # set a tracking url
        self.set_tracking_url("http://...")

        # set status messages during the workload
        for i in range(100):
            # do some hard work here
            if i % 10 == 0:
                self.set_status_message("Progress: %d / 100" % i)
                # displays a progress bar in the scheduler UI
                self.set_progress_percentage(i)

pythonで実行する場合

サンプル

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
class MyTask1(luigi.Task):
    x = luigi.IntParameter()
    y = luigi.IntParameter(default=0)

    def run(self):
        print(self.x + self.y)


class MyTask2(luigi.Task):
    x = luigi.IntParameter()
    y = luigi.IntParameter(default=1)
    z = luigi.IntParameter(default=2)

    def run(self):
        print(self.x * self.y * self.z)


if __name__ == '__main__':
    luigi.build([MyTask1(x=10), MyTask2(x=15, z=3)])

windowsは何か注意事項があるみたい。
Luigi on Windows

最終更新 Feb 18, 2024 18:26 +0900
Built with Hugo
テーマ StackJimmy によって設計されています。