DigDagでAPI使って取得したリストでfor_eachを回す
DigDag のfor_eachオペレータで配列使う際に四苦八苦したのでメモ。
今日取得して指定の場所に配置してあるファイルを1ファイルずつ加工したかったので、DigDag使ってやってみることにしたものの。。
- 今日取得したファイルのリストってどう作るのか?
- 1ファイルずつ処理するにはどうするのか?
さっぱりわからなかったので色々調べてみた。
DigDagには、 Python API が用意されている(Perl API欲しいなぁ。。)ので、今日更新したファイルをリスト化して配列に格納し、 for_eachオペレータ 使って作成した配列で回すことにした。
んで、 digdag init mydag
で作ったサンプルワークフローをベースにして作った実際のサンプルコードはこちら。
但し、for_eachのループでechoしてるだけなので異様にループが遅いのはなんでだろ。。。まだ課題がありそうです。
ファイルツリー
mydag.dig tasks/__init__.py
mydag.dig
timezone: UTC +setup: echo>: start ${session_time} # 今日更新した *.logファイルのリスト作成 +findfiles: py>: tasks.MyWorkflow.findfiles # 作成したリストを1つずつ取得してファイルパスをecho +repeat: for_each>: path: ${target_files} _do: echo>: target ${path} +teardown: echo>: finish ${session_time}
tasks/__init__.py
import digdag import subprocess class MyWorkflow(object): def findfiles(self): cmd = "find /hoge/fuga/ -mtime 0 -name *.log" try: out = {} out = subprocess.check_output( cmd.split(" ") ).splitlines() print out digdag.env.store({'target_files': out}) except subprocess.CalledProcessError as e: print e.returncode print e.cmd print e.output,
需要ありそうなのにサンプルがなかったので何かの足しになれば\( ‘ω’)/