用例:
我有一个协调器,它将一个包含多个文件的目录传递给一个工作流。工作流有以下节点:
java node 1 : Reads the file, and does some json parsing gets some input values to below nodes. Done using <capture-output>.
pig node 1 : Does some action. Requires above input values from parsed json.
pig node 2 : Same as above
pig node 3 : ................
..................问题:
协调器将目录名传递给工作流。我想做以下几件事:
for every file in directory {
java node 1 : get config from file X
pig node 1 : ...............
..............
}请建议一种我可以这样做的方法。
下面的是协调器:
LAST\_ONLY <datasets>
<dataset name="input" frequency="${datasetFrequency}" initial-instance="${datasetInitialInstance}" timezone="UTC">
<uri-template>${nameNode}/user/${coord:user()}/alertcampaign/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}</uri-template>
<done-flag></done-flag>
</dataset>
</datasets>
<input-events>
<data-in name="inputLogs1" dataset="input">
<instance>${coord:current(0)}</instance>
</data-in>
</input-events>
<action>
<workflow>
<app-path>${nameNode}/user/${coord:user()}/test.xml</app-path>
<configuration>
<property>
<name>wfInput</name>
<value>${coord:dataIn('inputLogs1')}</value>
</property>
</configuration>
</workflow>
发布于 2016-01-29 05:30:06
https://stackoverflow.com/questions/28986737
复制相似问题