Luigi使ってみた
以下のデータパイプラインを作成
- irisデータ取得
- irisデータ加工(前処理)
(すごい適当。。。ひとまず動作確認が出来ればOK)
詳細
ディレクトリ構成
1
2
3
4
5
6
|
├─data
│ ├─preprocessing ← 前処理したファイル
│ └─source ← 取得したファイル
│
└─src
run.py ← Luigiのソース
|
run.py
の中身
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
|
import pickle
import luigi
import pandas as pd
from luigi.util import requires
from sklearn import datasets
from sklearn.preprocessing import StandardScaler
class LoadDataset(luigi.Task):
def output(self):
"""出力を定義
"""
return luigi.LocalTarget("../data/source/iris.pkl", format=luigi.format.Nop) # pickleのためNop指定
def run(self):
"""ビジネスロジック
"""
df: pd.DataFrame = datasets.fetch_openml("iris", version=1, as_frame=True, return_X_y=False).frame
with self.output().open(mode="w") as fp:
# 型が失われるのでpickle(バイナリ)で保存
fp.write(pickle.dumps(df, protocol=pickle.HIGHEST_PROTOCOL))
@requires(LoadDataset)
class MainTask(luigi.Task):
def output(self):
return luigi.LocalTarget("../data/preprocessing/iris_normed.pkl", format=luigi.format.Nop)
def run(self):
scaler = StandardScaler()
xcols = ['sepallength', 'sepalwidth', 'petallength', 'petalwidth']
ycol = ["class"]
with self.input().open("r") as fp:
df = pickle.load(fp)
df_scaled = pd.DataFrame(
data=scaler.fit_transform(df[xcols]),
columns=xcols
)
with self.output().open("w") as fp:
fp.write(pickle.dumps(df_scaled, protocol=pickle.HIGHEST_PROTOCOL))
if __name__ == "__main__":
luigi.build([MainTask()], local_scheduler=True)
|
pandasのデータフレームの型情報を残すには、pickle
(バイナリ)で保存するのが良い見たい
Luigi逆引きリファレンス
実行
1
2
|
$ cd src
$ python run.py
|
1
2
3
4
5
6
7
8
9
10
|
├─data
│ ├─preprocessing
│ │ iris_normed.pkl
│ │
│ ├─result
│ └─source
│ iris.pkl
│
└─src
run.py
|
できてるっぽい