Goals
JIRA: CDAP-4075: Error handling for Workflows.
Use Cases
- When a Workflow run finishes, the user may want to send an email about its success or failure.
- In the case of a Hydrator pipeline, once the run has finished, the user may wish to delete the data from the external source, such as Oracle, Teradata, etc.
- If the Workflow fails for some reason, the user may want to clean up the files/data written by the nodes in the Workflow.
- On failure of the Workflow, the user may wish to keep certain local datasets for further debugging.
- In a Workflow, the user can have a custom action at the start of the Workflow that writes to a dataset (which acts as a lock). The next node in the Workflow is a MapReduce program that fails for that run of the Workflow. The user would like to be able to clean up the state that the custom action wrote to the dataset.
User Stories
- As a developer of a Workflow action, I want the ability to clean up the data that was written by the Workflow action in case the Workflow action fails.
- As a developer of a Workflow action, I want the ability to clean up the data that was written by the Workflow action in case the Workflow fails.
- As a developer of a Workflow, I want the ability to send an email once the Workflow run finishes. In case of failure, I should be able to access the nodes that failed and the cause of the failure.
- As a developer of a Workflow, I want the ability to instruct the Workflow system not to delete certain local datasets, for triage purposes.
Possible approach
Ideally, the cleanup activity should be done by the node in the Workflow which created the data, since that node knows what information needs to be cleaned up. MapReduce and Spark programs already have an onFinish method, which can be used to clean up any state on their failure. Custom actions should similarly have an onFinish method to perform cleanup when the custom action fails. Custom actions already have a destroy method; however, it does not know whether the run succeeded or failed. We should deprecate it and introduce a new onFinish method, to be consistent with the other program types.
public interface WorkflowAction extends Runnable {
  /**
   * This method is called after the {@link #run} method completes and it can be used for resource cleanup.
   * Any exception thrown only gets logged but does not affect execution of the {@link Workflow}.
   */
  @Deprecated
  void destroy();

  /**
   * This method is called after the execution of the action is done.
   * @param succeeded defines the result of action execution: true if the action succeeded, false otherwise
   * @throws Exception if there is any error during execution. This exception will only be logged as an error
   *                   without affecting the execution of the {@link Workflow}
   */
  void onFinish(boolean succeeded) throws Exception;
}
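As a sketch of how a custom action could use onFinish, consider the lock use case above: the action writes a "lock" entry in its run method and releases it in onFinish if the run did not succeed. The interface stub, the LockAction class, and the in-memory map standing in for the lock dataset are all illustrative assumptions, not CDAP API.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Minimal stand-in for the proposed interface (illustration only).
interface WorkflowAction extends Runnable {
  void onFinish(boolean succeeded) throws Exception;
}

// Hypothetical custom action: writes a "lock" entry when it runs, and
// cleans it up in onFinish if the run did not succeed. An in-memory map
// stands in for the dataset that acts as the lock.
class LockAction implements WorkflowAction {
  static final ConcurrentMap<String, String> LOCKS = new ConcurrentHashMap<>();
  private final String lockKey;

  LockAction(String lockKey) {
    this.lockKey = lockKey;
  }

  @Override
  public void run() {
    LOCKS.put(lockKey, "held");
    // ... do the action's actual work ...
  }

  @Override
  public void onFinish(boolean succeeded) {
    if (!succeeded) {
      // Release the lock so a later run is not blocked by stale state.
      LOCKS.remove(lockKey);
    }
  }
}

public class LockActionDemo {
  public static void main(String[] args) {
    LockAction action = new LockAction("pipeline-1");
    action.run();
    action.onFinish(false); // simulate a failed run
    System.out.println(LockAction.LOCKS.containsKey("pipeline-1")); // prints false
  }
}
```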
We will add an onFinish method to the Workflow interface as well, which will be called when the Workflow run finishes, either successfully or on failure.
public interface Workflow {
  /**
   * Called when the Workflow run finishes either successfully or on failure.
   * @param context the context associated with the Workflow
   * @param state the state of the Workflow
   * @throws Exception if there is an error during this method. This will not affect the status of the Workflow.
   */
  void onFinish(WorkflowContext context, WorkflowState state) throws Exception;
}
The WorkflowState class contains the state of all nodes in the Workflow.
public final class WorkflowState {
  private final Map<String, WorkflowNodeState> nodeState;
  private boolean isSucceeded;
}

public final class WorkflowNodeState {
  private final String nodeId;
  private final NodeStatus nodeStatus;
  private final RunId runId;
  // Cause if the node execution failed, null otherwise
  private final Throwable failureCause;
}

public enum NodeStatus {
  KILLED,
  FAILED,
  COMPLETED
}
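The sketch below shows what a Workflow's onFinish might do with this state: build a summary of failed nodes, e.g. as the body of a notification email. The stand-in classes mirror the fields above, but the getters, constructors, and the summarize method are assumptions for illustration, not the actual CDAP API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Minimal stand-ins for the proposed classes; getters and constructors are
// assumed, since the design only shows the fields.
enum NodeStatus { KILLED, FAILED, COMPLETED }

final class WorkflowNodeState {
  private final String nodeId;
  private final NodeStatus nodeStatus;
  private final Throwable failureCause; // null unless the node failed

  WorkflowNodeState(String nodeId, NodeStatus nodeStatus, Throwable failureCause) {
    this.nodeId = nodeId;
    this.nodeStatus = nodeStatus;
    this.failureCause = failureCause;
  }

  String getNodeId() { return nodeId; }
  NodeStatus getNodeStatus() { return nodeStatus; }
  Throwable getFailureCause() { return failureCause; }
}

final class WorkflowState {
  private final Map<String, WorkflowNodeState> nodeState;
  private final boolean succeeded;

  WorkflowState(Map<String, WorkflowNodeState> nodeState, boolean succeeded) {
    this.nodeState = nodeState;
    this.succeeded = succeeded;
  }

  Map<String, WorkflowNodeState> getNodeState() { return nodeState; }
  boolean isSucceeded() { return succeeded; }
}

public class WorkflowStateDemo {
  // What an onFinish implementation might compute from the state:
  // an overall verdict plus one line per failed node with its cause.
  static String summarize(WorkflowState state) {
    StringBuilder sb = new StringBuilder(state.isSucceeded() ? "Workflow SUCCEEDED" : "Workflow FAILED");
    for (WorkflowNodeState ns : state.getNodeState().values()) {
      if (ns.getNodeStatus() == NodeStatus.FAILED) {
        sb.append("\nNode ").append(ns.getNodeId())
          .append(" failed: ").append(ns.getFailureCause().getMessage());
      }
    }
    return sb.toString();
  }

  public static void main(String[] args) {
    Map<String, WorkflowNodeState> nodes = new LinkedHashMap<>();
    nodes.put("customAction", new WorkflowNodeState("customAction", NodeStatus.COMPLETED, null));
    nodes.put("mapReduce", new WorkflowNodeState("mapReduce", NodeStatus.FAILED, new RuntimeException("out of disk")));
    System.out.println(summarize(new WorkflowState(nodes, false)));
  }
}
```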
- The onFinish method of the Workflow can also update preferences, such as changing preferences for the local datasets.
- This could be done through the WorkflowToken, since the user can get the WorkflowToken through the WorkflowContext. However, since we store the information in the WorkflowToken at the node level, we would have to create an internal node for the onFinish method.
- Another approach is to have a Map<String, String> of properties in the WorkflowState instance, which the user can update in the onFinish method.
- Similar to MapReduce and Spark, the onFinish method of the Workflow will run in a short transaction. Ideally, the user would like to have control over the kind of transaction that needs to be started.
In 3.5, we can add the ability for programs to specify a hook which will be called by the Workflow when it finishes, so that programs can use the hook method to perform cleanup more effectively. This would be useful in dynamic Workflows, where the user may not be aware of the cleanup activity that needs to be performed for each node.
public interface MapReduce {
  /**
   * @param succeeded defines the result of the Workflow execution: true if the Workflow succeeded, false otherwise
   * @param context job execution context
   * @throws Exception if there's an error during this method invocation.
   */
  void onWorkflowFinish(boolean succeeded, MapReduceContext context) throws Exception;
}
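One thing such a hook could do is remove the program's output when the Workflow run failed. The sketch below isolates that cleanup logic; the method signature, directory layout, and helper are assumptions for illustration (a real implementation would obtain the output location from the MapReduceContext).

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

public class CleanupSketch {
  // Hypothetical cleanup a MapReduce program's onWorkflowFinish might perform:
  // delete the program's output directory when the Workflow run failed.
  static void onWorkflowFinish(boolean succeeded, File outputDir) {
    if (!succeeded) {
      deleteRecursively(outputDir);
    }
  }

  // Removes a directory tree: children first, then the directory itself.
  static void deleteRecursively(File f) {
    File[] children = f.listFiles();
    if (children != null) {
      for (File c : children) {
        deleteRecursively(c);
      }
    }
    f.delete();
  }

  public static void main(String[] args) throws IOException {
    File dir = Files.createTempDirectory("wf-output").toFile();
    Files.createFile(new File(dir, "part-00000").toPath());
    onWorkflowFinish(false, dir); // simulate a failed run
    System.out.println(dir.exists()); // prints false
  }
}
```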