仕事でバッチ処理を作成する機会が多々あるものの
何らかのFrameworkを使用していないので調査!
仕様を把握して今後使用していきたい。
Luigiとは
Luigi
複雑なパイプライン構築を支援するPythonパッケージ。
Spotifyが作成したパッケージ。
依存関係の分離や、ワークフロー管理、ビジュアライゼーション、Failureハンドリング、CLIなど様々な便利機能がある。
ワークフロー構築
基本的な構成要素は以下の2つ
抽象クラスで、いくつかメソッドが実装されることが想定されている。
タスクの実行方法を決める概念
Targetクラス
Targetクラスは以下のような中間生成物を作成するチェックポイントに対応する
- ディスクのファイル
- HDFSのファイル
- データベースのエントリ
ターゲットが存在する場合に、Trueを返すexistsメソッドを実装しなければならない
便利なツールが用意されているので、実際にはTargetのサブクラスを実装することはほぼない。
- luigi.LocalTarget
- luigi.HdfsTarget
- luigi.contrib.s3.S3Target
- luigi.contrib.ssh.RemoteTarget
- luigi.contrib.mysqldb.MySqlTarget
- …etc
open()
メソッドを実装していて、読み込み(mode=r
)、書き込み(mode=w
)が可能なストリームオブジェクトを返す。
Gzipもサポートしている。
Taskクラス
Taskクラスは計算を行うクラス。
振る舞いを変えるrun()
, output()
, require()
メソッドを実装できる。
Taskでできること
- 他のTaskによって出力されたTargetを利用する
- Targetを出力する
require()
メソッドでTask間の依存関係を定義できる
output()
メソッドで出力を定義できる
- 各Task間の依存関係となるTargetクラスを返す
input()
メソッド
Parameterクラス
実行されるTaskクラスのジョブで、何らかのパラメータ化したいときに使う。日付など。
サンプル
Taskサンプル
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
import luigi
class MyTask(luigi.Task):
# パラメータ
param = luigi.Parameter(default=42)
def require(self):
"""タスク間の依存関係
"""
return SomeOtherTask(self.param)
def run(self):
"""ビジネスロジック
"""
f = self.output().open("w")
print("hello world", file=f)
f.close()
def output(self):
return luigi.LocalTarget("/tmp/foo/bar-%s.txt" % self.param)
if __name__ == "__main__":
luigi.run()
|
.pkl
などのバイナリファイルの場合は、format=Nop
を記述する必要がある。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
class GenerateWords(luigi.Task):
def output(self):
return luigi.LocalTarget('words.pckl', format=Nop)
def run(self):
import pickle
# write a dummy list of words to output file
words = [
'apple',
'banana',
'grapefruit'
]
with self.output().open('w') as f:
pickle.dump(words, f)
|
input()
の使い方
1
2
3
4
5
6
7
8
|
class TaskWithManyInputs(luigi.Task):
def requires(self):
return {'a': TaskA(), 'b': [TaskB(i) for i in xrange(100)]}
def run(self):
# .input()はrequiresのTaskオブジェクトのTargetオブジェクトがreturnが返される
f = self.input()['a'].open('r')
g = [y.open('r') for y in self.input()['b']]
|
タスク状況を知りたい場合に、GUIでプログレスバーを出すことも可能。
1
2
3
4
5
6
7
8
9
10
11
12
|
class MyTask(luigi.Task):
def run(self):
# set a tracking url
self.set_tracking_url("http://...")
# set status messages during the workload
for i in range(100):
# do some hard work here
if i % 10 == 0:
self.set_status_message("Progress: %d / 100" % i)
# displays a progress bar in the scheduler UI
self.set_progress_percentage(i)
|
pythonで実行する場合
サンプル
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
class MyTask1(luigi.Task):
x = luigi.IntParameter()
y = luigi.IntParameter(default=0)
def run(self):
print(self.x + self.y)
class MyTask2(luigi.Task):
x = luigi.IntParameter()
y = luigi.IntParameter(default=1)
z = luigi.IntParameter(default=2)
def run(self):
print(self.x * self.y * self.z)
if __name__ == '__main__':
luigi.build([MyTask1(x=10), MyTask2(x=15, z=3)])
|
windowsは何か注意事項があるみたい。
Luigi on Windows