...
Ideally, cleanup should be done by the node in the Workflow that created the data, since that node knows what information needs to be cleaned up. MapReduce and Spark programs already have an onFinish method, which can be used to clean up any state when they fail. Custom actions should similarly have an onFinish method to perform cleanup when the custom action fails. Custom actions already have a destroy method; however, it does not know whether the run succeeded or failed. We should deprecate it and introduce the new onFinish method, to be consistent with the other actions.
```java
public interface WorkflowAction extends Runnable {

  /**
   * This method is called after the {@link #run} method completes and it can be used for resource cleanup.
   * Any exception thrown only gets logged but does not affect execution of the {@link Workflow}.
   *
   * @deprecated use {@link #onFinish(boolean)}, which also reports whether the action succeeded
   */
  @Deprecated
  void destroy();

  /**
   * This method is called after the execution of the action is done.
   *
   * @param succeeded defines the result of action execution: true if the action succeeded, false otherwise
   * @throws Exception if there is any error during execution. This exception will only be logged as an error
   *                   without affecting the execution of the {@link Workflow}
   */
  void onFinish(boolean succeeded) throws Exception;
}
```
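To make the proposed contract concrete, here is a minimal sketch of how a runtime might drive a custom action under the new interface. It assumes a simplified local copy of WorkflowAction (destroy() is left out), plus a hypothetical StagingAction and ActionRunner that are not part of any existing API. The action removes the data it staged only when the run failed, and an exception from onFinish is logged without changing the result.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified local copy of the proposed interface; destroy() is omitted for brevity.
interface WorkflowAction extends Runnable {
  void onFinish(boolean succeeded) throws Exception;
}

// Hypothetical action that stages data and removes it only if the run failed.
final class StagingAction implements WorkflowAction {
  // Stand-in for real staged data, e.g. files written to a dataset
  final List<String> staged = new ArrayList<>();
  private final boolean failRun;

  StagingAction(boolean failRun) {
    this.failRun = failRun;
  }

  @Override
  public void run() {
    staged.add("partial-output");
    if (failRun) {
      throw new IllegalStateException("simulated failure");
    }
  }

  @Override
  public void onFinish(boolean succeeded) {
    // The action created the staged data, so it knows what to clean up.
    if (!succeeded) {
      staged.clear();
    }
  }
}

// Hypothetical driver: run the action, then always call onFinish with the outcome.
final class ActionRunner {
  static boolean execute(WorkflowAction action) {
    boolean succeeded = false;
    try {
      action.run();
      succeeded = true;
    } catch (RuntimeException e) {
      System.err.println("Action failed: " + e.getMessage());
    }
    try {
      action.onFinish(succeeded);
    } catch (Exception e) {
      // Errors from onFinish are only logged; they do not change the action's result.
      System.err.println("onFinish failed: " + e);
    }
    return succeeded;
  }
}
```

The value of passing succeeded shows up in StagingAction: with only destroy(), the action could not tell whether to keep or drop its staged data.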
We will also add an onFinish method to the Workflow interface; it will be called when the Workflow finishes, either successfully or with a failure.
```java
public interface Workflow {

  /**
   * Called when the Workflow run finishes either successfully or on failure.
   *
   * @param context the context associated with the Workflow
   * @param state the state of the Workflow
   * @throws Exception if there is an error during this method. This will not affect the status of the Workflow.
   */
  void onFinish(WorkflowContext context, WorkflowState state) throws Exception;
}
```
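The following sketch illustrates the stated guarantee that an exception from Workflow.onFinish does not affect the status of the run. WorkflowContext and WorkflowState are reduced to hypothetical single-field stand-ins, and WorkflowDriver is an assumed driver rather than the actual runtime; nodes are modeled as plain Runnables executed in sequence.

```java
import java.util.List;

// Minimal stand-ins for the proposed types; the real ones carry much more information.
final class WorkflowContext {
  final String runId;
  WorkflowContext(String runId) { this.runId = runId; }
}

final class WorkflowState {
  final boolean succeeded;
  WorkflowState(boolean succeeded) { this.succeeded = succeeded; }
}

interface Workflow {
  void onFinish(WorkflowContext context, WorkflowState state) throws Exception;
}

// Hypothetical driver: nodes run first, the final state is computed, and only
// then is onFinish invoked. Exceptions from onFinish are logged and ignored.
final class WorkflowDriver {
  static boolean run(Workflow workflow, List<Runnable> nodes, String runId) {
    boolean succeeded = true;
    for (Runnable node : nodes) {
      try {
        node.run();
      } catch (RuntimeException e) {
        succeeded = false;
        break; // stop at the first failed node
      }
    }
    WorkflowState state = new WorkflowState(succeeded);
    try {
      workflow.onFinish(new WorkflowContext(runId), state);
    } catch (Exception e) {
      System.err.println("onFinish failed for run " + runId + ": " + e);
    }
    return state.succeeded; // unaffected by any onFinish error
  }
}
```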
The WorkflowState class contains the state of all nodes in the Workflow.
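```java
import java.util.Map;

public final class WorkflowState {
  private final Map<String, WorkflowNodeState> nodeState;
  private boolean isSucceeded;

  public WorkflowState(Map<String, WorkflowNodeState> nodeState, boolean isSucceeded) {
    this.nodeState = nodeState;
    this.isSucceeded = isSucceeded;
  }
}

public final class WorkflowNodeState {
  private final String nodeId;
  private final NodeStatus nodeStatus;
  private final RunId runId;
  // Cause if the node execution failed, null otherwise
  private final Throwable failureCause;

  public WorkflowNodeState(String nodeId, NodeStatus nodeStatus, RunId runId, Throwable failureCause) {
    this.nodeId = nodeId;
    this.nodeStatus = nodeStatus;
    this.runId = runId;
    this.failureCause = failureCause;
  }
}

public enum NodeStatus {
  KILLED,
  FAILED,
  COMPLETED
}
```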
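A minimal sketch of how WorkflowState could be populated and then queried from onFinish. Beyond the design above, it assumes that a run counts as succeeded only when every node is COMPLETED, and it uses a String in place of RunId; the record, unfinishedNodes and failureCause helpers are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

enum NodeStatus { KILLED, FAILED, COMPLETED }

// Simplified stand-in: runId is a String here instead of RunId.
final class WorkflowNodeState {
  final String nodeId;
  final NodeStatus nodeStatus;
  final String runId;
  final Throwable failureCause; // null unless the node failed

  WorkflowNodeState(String nodeId, NodeStatus nodeStatus, String runId, Throwable failureCause) {
    this.nodeId = nodeId;
    this.nodeStatus = nodeStatus;
    this.runId = runId;
    this.failureCause = failureCause;
  }
}

final class WorkflowState {
  // Insertion order is kept so nodes appear in execution order.
  private final Map<String, WorkflowNodeState> nodeState = new LinkedHashMap<>();
  private boolean isSucceeded = true;

  // Hypothetical helper the runtime would call as each node finishes.
  void record(WorkflowNodeState state) {
    nodeState.put(state.nodeId, state);
    // Assumption: the run succeeds only if every node completed.
    if (state.nodeStatus != NodeStatus.COMPLETED) {
      isSucceeded = false;
    }
  }

  boolean isSucceeded() {
    return isSucceeded;
  }

  // Ids of nodes that were killed or failed, in execution order.
  List<String> unfinishedNodes() {
    List<String> ids = new ArrayList<>();
    for (WorkflowNodeState s : nodeState.values()) {
      if (s.nodeStatus != NodeStatus.COMPLETED) {
        ids.add(s.nodeId);
      }
    }
    return ids;
  }

  Throwable failureCause(String nodeId) {
    WorkflowNodeState s = nodeState.get(nodeId);
    return s == null ? null : s.failureCause;
  }
}
```

Inside onFinish, a Workflow could walk unfinishedNodes() and use failureCause to decide which nodes' outputs to remove.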
- The onFinish method of the Workflow can also update preferences, for example to change the preferences for local datasets.
- This can be done through the WorkflowToken, since the user can obtain the WorkflowToken from the WorkflowContext. However, since the WorkflowToken stores information at the node level, we would have to create an internal node for the onFinish method.
- Another approach is to have a Map<String, String> of properties in the WorkflowState instance, which the user can update in the onFinish method.
- Similar to MapReduce and Spark, the onFinish method of the Workflow will run in a short transaction. Ideally, the user should be able to control the kind of transaction that is started.
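The properties-map approach, where onFinish updates a Map<String, String> held by WorkflowState, could look roughly like the sketch below. Everything here is illustrative: PreferenceUpdater is a hypothetical runtime step, the context parameter is dropped from onFinish, the preference key used in the example is made up, and leaving preferences unchanged when onFinish throws is an assumed policy. Transactions are not modeled.

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in for WorkflowState under the properties-map approach.
final class WorkflowState {
  private final boolean succeeded;
  // Properties the user may update from onFinish; the runtime reads them afterwards.
  private final Map<String, String> properties = new HashMap<>();

  WorkflowState(boolean succeeded) {
    this.succeeded = succeeded;
  }

  boolean isSucceeded() {
    return succeeded;
  }

  Map<String, String> getProperties() {
    return properties;
  }
}

// Simplified: the WorkflowContext parameter is omitted.
interface Workflow {
  void onFinish(WorkflowState state) throws Exception;
}

final class PreferenceUpdater {
  // Hypothetical runtime step: after onFinish returns normally, copy the
  // properties into the stored preferences so that later runs see them.
  static void finish(Workflow workflow, WorkflowState state, Map<String, String> preferences) {
    try {
      workflow.onFinish(state);
      preferences.putAll(state.getProperties());
    } catch (Exception e) {
      // Assumed policy: a failing onFinish leaves preferences untouched.
      System.err.println("onFinish failed; preferences left unchanged: " + e);
    }
  }
}
```

Compared with the WorkflowToken option, this avoids creating an internal node, because the properties live on the state object passed to onFinish rather than being keyed by node.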
The Workflow can also have a beforeStart method, which can be used for any initialization activity, so that users do not have to add an extra custom action solely for initialization.
```java
public interface Workflow {

  /**
   * Called before the start of every run of the Workflow.
   *
   * @param context the Workflow context
   * @throws Exception if there is an error executing this method. This will cause the Workflow run to fail.
   */
  void beforeStart(WorkflowContext context) throws Exception;
}
```
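A sketch of the resulting lifecycle: beforeStart runs first, then the nodes, then onFinish. As the Javadoc states, a failure in beforeStart fails the run, and in this sketch no node executes after it. Whether onFinish is still called in that case is not specified above; the sketch assumes it is, since the run does finish (with a failure). LifecycleDriver, SampleWorkflow and the event-recording WorkflowContext are hypothetical stand-ins.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in context that simply records lifecycle events.
final class WorkflowContext {
  final List<String> events = new ArrayList<>();
}

interface Workflow {
  void beforeStart(WorkflowContext context) throws Exception;

  void onFinish(WorkflowContext context, boolean succeeded) throws Exception;
}

// Hypothetical Workflow whose initialization can be made to fail.
final class SampleWorkflow implements Workflow {
  private final boolean failInit;

  SampleWorkflow(boolean failInit) {
    this.failInit = failInit;
  }

  @Override
  public void beforeStart(WorkflowContext context) throws Exception {
    context.events.add("beforeStart");
    if (failInit) {
      throw new Exception("initialization failed");
    }
  }

  @Override
  public void onFinish(WorkflowContext context, boolean succeeded) {
    context.events.add("onFinish:" + succeeded);
  }
}

final class LifecycleDriver {
  // Order: beforeStart -> nodes -> onFinish. An exception from beforeStart or
  // from any node fails the run; onFinish is called in every case (assumption).
  static boolean run(Workflow workflow, List<Runnable> nodes, WorkflowContext context) {
    boolean succeeded;
    try {
      workflow.beforeStart(context);
      for (Runnable node : nodes) {
        node.run();
      }
      succeeded = true;
    } catch (Exception e) {
      succeeded = false;
    }
    try {
      workflow.onFinish(context, succeeded);
    } catch (Exception e) {
      System.err.println("onFinish failed: " + e);
    }
    return succeeded;
  }
}
```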
In 3.5, we could add the ability for programs to specify a hook that the Workflow calls when it finishes, so that programs can use the hook to perform cleanup more effectively. This would be useful in dynamic Workflows, where the user may not know what cleanup needs to be performed for each node.
```java
public interface MapReduce {

  /**
   * Called when the Workflow that ran this MapReduce program finishes.
   *
   * @param succeeded defines the result of the Workflow execution: true if the Workflow succeeded, false otherwise
   * @param context job execution context
   * @throws Exception if there is an error during this method invocation
   */
  void onWorkflowFinish(boolean succeeded, MapReduceContext context) throws Exception;
}
```
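The hook is most useful when a program succeeds but the Workflow as a whole later fails. The sketch below models that case with hypothetical types: WorkflowFinishHook stands in for the proposed onWorkflowFinish method (the MapReduceContext parameter is omitted), PartitionWriter represents a program whose output should be rolled back, and WorkflowHookRegistry is an assumed runtime component that calls every registered hook once, logging rather than propagating hook errors.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for the proposed hook (no MapReduceContext).
interface WorkflowFinishHook {
  void onWorkflowFinish(boolean succeeded) throws Exception;
}

// Hypothetical program that writes partitions to an output dataset.
final class PartitionWriter implements WorkflowFinishHook {
  // Stand-in for partitions written to an output dataset
  final List<String> partitions = new ArrayList<>();

  void write(String partition) {
    partitions.add(partition);
  }

  @Override
  public void onWorkflowFinish(boolean succeeded) {
    // This program may itself have succeeded, but if a later node failed the
    // whole Workflow failed, so its partitions are rolled back.
    if (!succeeded) {
      partitions.clear();
    }
  }
}

// Hypothetical runtime component tracking hooks of programs that actually ran.
final class WorkflowHookRegistry {
  private final List<WorkflowFinishHook> hooks = new ArrayList<>();

  void register(WorkflowFinishHook hook) {
    hooks.add(hook);
  }

  // Called once when the Workflow finishes. A failing hook is logged and the
  // remaining hooks still run; the Workflow status is not affected.
  void workflowFinished(boolean succeeded) {
    for (WorkflowFinishHook hook : hooks) {
      try {
        hook.onWorkflowFinish(succeeded);
      } catch (Exception e) {
        System.err.println("onWorkflowFinish failed: " + e);
      }
    }
  }
}
```

Because the registry only needs the hooks of programs that actually ran, a dynamic Workflow does not have to know in advance what each node will clean up.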