I was asked to build a periodic job that runs a data-aggregation MapReduce (MR) job and uploads the result to HBase. The job must:
1. run at a fixed time interval
2. start only when the input folder exists
3. start only when the input folder contains a _SUCCESS file
So I defined four steps in my workflow.
Step 1: Check, per the job requirements, that the input folder exists and contains a _SUCCESS file (startFlag is the path to that file)
<decision name="upload-decision">
    <switch>
        <case to="create-csv">
            ${fs:exists(startFlag)}
        </case>
        <default to="end"/>
    </switch>
</decision>
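The decision above tests only the flag file. If the folder check should be spelled out as well, a variant could look like the sketch below. It assumes that rowDataInputDir (the property used later as the MR input) is the input folder and that both properties are given in the same form as startFlag; the `and` operator comes from the EL syntax that Oozie uses.

```xml
<!-- Sketch only: same node and transitions as above, with an explicit folder check added -->
<decision name="upload-decision">
    <switch>
        <case to="create-csv">
            ${fs:exists(rowDataInputDir) and fs:exists(startFlag)}
        </case>
        <default to="end"/>
    </switch>
</decision>
```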
Step 2: Run the MR job as an Oozie java action
The prepare section deletes the _SUCCESS file and the MR job output folder before the job starts
<action name="create-csv">
    <java>
        <job-tracker>${jobTracker}</job-tracker>
        <name-node>${nameNode}</name-node>
        <prepare>
            <delete path="${nameNode}${startFlag}"/>
            <delete path="${nameNode}${csvOutputDir}"/>
        </prepare>
        <main-class>LocationToCsvMRMain</main-class>
        <arg>${rowDataInputDir}</arg>
        <arg>${csvOutputDir}</arg>
        <arg>${numOfReducers}</arg>
        <capture-output />
    </java>
    <ok to="import-csv-to-hbase" />
    <error to="fail" />
</action>
Step 3: Import the CSV data into HBase with an Oozie shell action
<action name="import-csv-to-hbase">
    <shell xmlns="uri:oozie:shell-action:0.1">
        <job-tracker>${jobTracker}</job-tracker>
        <name-node>${nameNode}</name-node>
        <exec>import-csv-to-hbase.sh</exec>
        <argument>${csvOutputDir}</argument>
        <argument>${zkHostPort}</argument>
        <file>import-csv-to-hbase.sh#import-csv-to-hbase.sh</file>
        <capture-output />
    </shell>
    <ok to="cleanup" />
    <error to="fail" />
</action>
Step 4: Move the input folder to an archive folder so that the same data is not processed more than once
<action name="cleanup">
    <fs>
        <mkdir path='${nameNode}${archiveFolder}${wf:id()}'/>
        <move source='${nameNode}${rowDataInputDir}' target='${archiveFolder}${wf:id()}/processed-input'/>
    </fs>
    <ok to="end"/>
    <error to="fail"/>
</action>
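The four steps refer to "end" and "fail" nodes and need an enclosing workflow definition, none of which is shown here. A minimal sketch of how the pieces might fit together follows; the workflow name, the schema version and the kill message are illustrative assumptions, not taken from the original job.

```xml
<!-- Sketch only: the app name, schema version and kill message are assumptions -->
<workflow-app name="aggregate-and-upload-wf" xmlns="uri:oozie:workflow:0.2">
    <start to="upload-decision"/>

    <!-- Step 1: decision node "upload-decision" goes here -->
    <!-- Step 2: java action "create-csv" goes here -->
    <!-- Step 3: shell action "import-csv-to-hbase" goes here -->
    <!-- Step 4: fs action "cleanup" goes here -->

    <!-- Target of every <error to="fail"/> transition -->
    <kill name="fail">
        <message>Workflow failed at [${wf:lastErrorNode()}]: ${wf:errorMessage(wf:lastErrorNode())}</message>
    </kill>

    <!-- Target of the decision default and of the last <ok/> transition -->
    <end name="end"/>
</workflow-app>
```

With this layout, a run whose flag file is missing goes from the decision straight to "end", so the coordinator action finishes successfully without doing any work.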
To make the workflow run periodically, I wrap it in a simple coordinator
<coordinator-app name="simple-coordinator"
                 frequency="${coord:minutes(15)}"
                 start="2014-06-01T00:00Z" end="2014-06-03T00:15Z" timezone="America/Los_Angeles" xmlns="uri:oozie:coordinator:0.2">
    <action>
        <workflow>
            <app-path>${nameNode}${wfPath}</app-path>
            <configuration>
                <property>
                    <name>nameNode</name>
                    <value>${nameNode}</value>
                </property>
                <property>
                    <name>jobTracker</name>
                    <value>${jobTracker}</value>
                </property>
            </configuration>
        </workflow>
    </action>
</coordinator-app>