DigDagでAPI使って取得したリストでfor_eachを回す

www.digdag.io

DigDag のfor_eachオペレータで配列使う際に四苦八苦したのでメモ。

今日取得して指定の場所に配置してあるファイルを1ファイルずつ加工したかったので、DigDag使ってやってみることにしたものの。。

  • 今日取得したファイルのリストってどう作るのか?
  • 1ファイルずつ処理するにはどうするのか?

さっぱりわからなかったので色々調べてみた。

DigDagには、 Python API が用意されている(Perl API欲しいなぁ。。)ので、今日更新したファイルをリスト化して配列に格納し、 for_eachオペレータ 使って作成した配列で回すことにした。

んで、 digdag init mydag で作ったサンプルワークフローをベースにして作った実際のサンプルコードはこちら。 但し、for_eachのループでechoしてるだけなので異様にループが遅いのはなんでだろ。。。まだ課題がありそうです。

ファイルツリー

mydag.dig
tasks/__init__.py

mydag.dig

timezone: UTC                                                                                                                                               

+setup:
  echo>: start ${session_time}

# 今日更新した *.logファイルのリスト作成
+findfiles:
  py>: tasks.MyWorkflow.findfiles

# 作成したリストを1つずつ取得してファイルパスをecho
+repeat:
  for_each>:
    path: ${target_files}
  _do:
    echo>: target ${path}

+teardown:
  echo>: finish ${session_time}

tasks/__init__.py

import digdag                                                                                                                                               
import subprocess

class MyWorkflow(object):
  def findfiles(self):
    cmd = "find /hoge/fuga/ -mtime 0 -name *.log"
    try:
        out = {}
        out = subprocess.check_output( cmd.split(" ") ).splitlines()
        print out
        digdag.env.store({'target_files': out})
    except subprocess.CalledProcessError as e:
        print e.returncode
        print e.cmd
        print e.output,

需要ありそうなのにサンプルがなかったので何かの足しになれば\( ‘ω’)/