Requirements
Generic Token support (not only MR counters)
- Ability for the user to access the token set by any node in the Workflow
- Persist the token for each Workflow run in the MDS
- View the token set by a specific node in the Workflow
Assumptions
Generic information in the token is stored as key-value pairs, where both the key and the value are Strings.
Potential Changes
WorkflowToken interface changes
/**
 * Put the specified key-value entry in the {@link WorkflowToken}.
 * The token may store additional information about the context in which
 * this key is being set, for example, the unique name of the workflow node.
 * @param key the key representing the entry
 * @param value the value for the key
 */
void setValue(String key, String value);

/**
 * The same key can be added to the {@link WorkflowToken} by multiple nodes.
 * This method returns a list of map entries, where the key of each entry
 * is the name of a node and the value of each entry is the value that
 * node added for the specified input key.
 *
 * @param key the key to be searched
 * @return the list of map entries, each mapping
 * from a node name to the value that node added for the input key
 */
List<Map.Entry<String, String>> getValues(String key);

/**
 * Return true if the {@link WorkflowToken} contains the specified key.
 * @param key the key to be tested for presence in the {@link WorkflowToken}
 * @return the result of the test
 */
boolean containsKey(String key);
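To make the semantics of these three methods concrete, here is a minimal in-memory sketch. It assumes, hypothetically, that the workflow driver tells the token which node is running through a `setCurrentNode` method, and that a node setting the same key twice replaces its own earlier value. Neither detail is part of the proposed interface, and `InMemoryWorkflowToken` is an illustrative name, not an existing class.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory token: for each key, keeps (node name, value) pairs
// in the order in which nodes first set that key.
class InMemoryWorkflowToken {
  private final Map<String, List<Map.Entry<String, String>>> entries = new HashMap<>();
  // Name of the node currently executing; assumed to be set by the workflow driver.
  private String currentNode;

  void setCurrentNode(String nodeName) {
    this.currentNode = nodeName;
  }

  void setValue(String key, String value) {
    if (currentNode == null) {
      throw new IllegalStateException("No node is currently executing");
    }
    List<Map.Entry<String, String>> values = entries.computeIfAbsent(key, k -> new ArrayList<>());
    Map.Entry<String, String> entry = new AbstractMap.SimpleImmutableEntry<>(currentNode, value);
    // Assumption: a node setting the same key again replaces its own earlier value.
    for (int i = 0; i < values.size(); i++) {
      if (values.get(i).getKey().equals(currentNode)) {
        values.set(i, entry);
        return;
      }
    }
    values.add(entry);
  }

  // One entry per node that set the key: node name -> value.
  List<Map.Entry<String, String>> getValues(String key) {
    return new ArrayList<>(entries.getOrDefault(key, Collections.emptyList()));
  }

  boolean containsKey(String key) {
    return entries.containsKey(key);
  }
}
```

Because values are kept per node rather than per key alone, two nodes can set the same key without overwriting each other. This is what lets `getValues` return one entry per node.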
WorkflowConfigurer interface changes
A unique numeric node ID is generated for each node when the application is deployed. However, while writing the Workflow, the user is not aware of the node ID associated with each node. Since the WorkflowToken stores the MapReduce counters and other information at the node level, the user should be able to get the value of a particular key from the token as set by a particular program in the Workflow.
If a program is used only once in the Workflow, the user can query the token information by the program name. However, the same program is allowed to occur multiple times in the Workflow, in which case the program name alone is not sufficient to access the token.
The WorkflowConfigurer API can be updated to allow the user to set a unique name for a program that occurs multiple times in the Workflow, and to use that unique name to retrieve the token.
/**
 * Add MapReduce program to the {@link Workflow}.
 * @param uniqueName the unique name for the MapReduce program which will be used
 * to identify a particular occurrence of the program in the Workflow
 * @param mapReduceName the name of the MapReduce program
 */
void addMapReduce(String uniqueName, String mapReduceName);
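The naming rule from the paragraphs above can be sketched as follows. `NodeNameRegistry` is a hypothetical stand-in for the configurer. The one-argument `addMapReduce` overload, which falls back to the program name when a program is used only once, is an assumption made for illustration.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, stripped-down configurer used only to illustrate the naming rule;
// it is not the actual WorkflowConfigurer implementation.
class NodeNameRegistry {
  // unique node name -> MapReduce program name, in the order nodes were added
  private final Map<String, String> nodes = new LinkedHashMap<>();

  // Program used once: its own name serves as the unique node name (assumed overload).
  void addMapReduce(String mapReduceName) {
    addMapReduce(mapReduceName, mapReduceName);
  }

  // Program used several times: each occurrence gets an explicit unique name.
  void addMapReduce(String uniqueName, String mapReduceName) {
    if (nodes.containsKey(uniqueName)) {
      throw new IllegalArgumentException("Duplicate node name: " + uniqueName);
    }
    nodes.put(uniqueName, mapReduceName);
  }

  Map<String, String> getNodes() {
    return Collections.unmodifiableMap(nodes);
  }
}
```

Rejecting duplicate unique names at configuration time ensures that every token lookup by node name resolves to exactly one occurrence of the program.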
The WorkflowToken can also be updated from the predicate of a condition node. When the Workflow contains multiple condition nodes, we will need the ability to specify unique names for the conditions as well, so that token values set by a specific condition node can be fetched.
/**
 * Adds a condition with the unique name to the {@link Workflow}.
 * @param conditionName the unique name to be assigned to the condition
 * @param condition the {@link Predicate} to be evaluated for the condition
 * @return the configurer for the condition
 */
WorkflowConditionConfigurer<? extends WorkflowConfigurer> condition(String conditionName, Predicate<WorkflowContext> condition);
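A sketch of why a condition needs its own unique name: if the workflow driver marks the condition as the current node before evaluating its predicate, anything the predicate writes is attributed to that condition and can later be fetched by name. `NodeScopedToken`, `ConditionNode`, and the token keys are hypothetical. `java.util.function.Predicate` stands in for the `Predicate<WorkflowContext>` of the proposed API.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical token stand-in: node name -> (key -> value). Writes are attributed
// to whichever node the driver has marked as current.
class NodeScopedToken {
  private final Map<String, Map<String, String>> valuesByNode = new LinkedHashMap<>();
  private String currentNode;

  void setCurrentNode(String nodeName) {
    this.currentNode = nodeName;
  }

  void setValue(String key, String value) {
    if (currentNode == null) {
      throw new IllegalStateException("No node is currently executing");
    }
    valuesByNode.computeIfAbsent(currentNode, n -> new HashMap<>()).put(key, value);
  }

  // Value that the named node set for the key, or null if it did not set one.
  String getValue(String key, String nodeName) {
    Map<String, String> values = valuesByNode.get(nodeName);
    return values == null ? null : values.get(key);
  }
}

// Hypothetical condition node: a unique name plus the predicate it evaluates.
class ConditionNode {
  private final String name;
  private final Predicate<NodeScopedToken> predicate;

  ConditionNode(String name, Predicate<NodeScopedToken> predicate) {
    this.name = name;
    this.predicate = predicate;
  }

  boolean evaluate(NodeScopedToken token) {
    // Anything the predicate writes is recorded under this condition's unique name.
    token.setCurrentNode(name);
    return predicate.test(token);
  }
}
```

With two conditions that write the same key, only their distinct names keep the two values apart in the token.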
Provide ability to set and get information in the WorkflowToken
1. MapReduce program - The user should be able to access and modify the WorkflowToken from the "beforeSubmit" and "onFinish" methods of the MapReduce program. Since these methods receive a MapReduceContext, we will need to update the MapReduceContext interface to return the WorkflowToken.
/**
 * If the {@link MapReduce} program is executed as a part of the {@link Workflow},
 * get the {@link WorkflowToken} associated with the current run; otherwise return null.
 * @return the {@link WorkflowToken} if available
 */
@Nullable
WorkflowToken getWorkflowToken();
2. Spark program - The user should be able to access and modify the WorkflowToken from the "beforeSubmit" and "onFinish" methods of the Spark program. Since these methods receive a SparkContext, we will need to update the SparkContext interface to return the WorkflowToken.
/**
 * If the {@link Spark} program is executed as a part of the {@link Workflow},
 * get the {@link WorkflowToken} associated with the current run; otherwise return null.
 * @return the {@link WorkflowToken} if available
 */
@Nullable
WorkflowToken getWorkflowToken();
3. Custom Workflow action - Since a custom workflow action already receives the WorkflowContext, no changes to its interface are anticipated.
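The following sketch illustrates how program code might use the nullable `getWorkflowToken()` from `beforeSubmit` and `onFinish`, handling the case where the program runs outside a Workflow. `ProgramContext` and `SimpleWorkflowToken` are hypothetical stand-ins for `MapReduceContext`/`SparkContext` and `WorkflowToken`. The method signatures are simplified, and the token keys are made up.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the proposed WorkflowToken methods used here.
interface SimpleWorkflowToken {
  void setValue(String key, String value);
  boolean containsKey(String key);
}

// Hypothetical stand-in for MapReduceContext / SparkContext after the change:
// getWorkflowToken() returns null when the program is not run by a Workflow.
interface ProgramContext {
  SimpleWorkflowToken getWorkflowToken();
}

// Map-backed token, only for exercising the example.
class MapBackedToken implements SimpleWorkflowToken {
  final Map<String, String> values = new HashMap<>();

  public void setValue(String key, String value) { values.put(key, value); }

  public boolean containsKey(String key) { return values.containsKey(key); }
}

class PurchaseAnalysis {
  private boolean skipHeavyPhase;

  void beforeSubmit(ProgramContext context) {
    SimpleWorkflowToken token = context.getWorkflowToken();
    // Read side: react to a flag that an earlier node may have set (hypothetical key).
    skipHeavyPhase = token != null && token.containsKey("input.empty");
  }

  void onFinish(boolean succeeded, ProgramContext context) {
    SimpleWorkflowToken token = context.getWorkflowToken();
    if (token == null) {
      return; // standalone run: no token to update
    }
    // Write side: publish the outcome for downstream nodes.
    token.setValue("purchase.analysis.status", succeeded ? "SUCCEEDED" : "FAILED");
  }

  boolean skipsHeavyPhase() { return skipHeavyPhase; }
}
```

The null check is the main obligation that the `@Nullable` contract places on program authors. The same program class can then run both standalone and inside a Workflow.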
- WorkflowToken in the presence of Fork and Join
When a fork is encountered in the Workflow, we make a copy of the WorkflowToken and pass it along each branch. At the join, we create a new WorkflowToken, which is the merge of the WorkflowTokens associated with all of the branches of the fork. Since the information in the token is stored at the node level, and each node writes only under its own unique name, there will not be any conflicts during the merge. However, we will not be able to provide any ordering guarantees at the join: for a key set by nodes in different branches, the order of the values returned by getValues is not defined.
- Persisting the WorkflowToken
The RunRecord for the Workflow will contain the WorkflowToken as a property. This token will be updated before the execution of each action in the Workflow. We will also add a version field to the RunRecord itself, which will help in the upgrade process.
- REST endpoints to access the values of the WorkflowToken that were received by individual nodes in the Workflow
We will expose a REST endpoint to get the token values that were set by a particular node, as identified by its unique name:
/apps/{app-id}/workflows/{workflow-name}/runs/{run-id}/nodes/{unique-node-name}/token
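The fork/join, persistence, and REST subsections all rely on the token being keyed by node. The sketch below shows the copy made at a fork, the merge at a join, the snapshot stored in the run record before each action, and the per-node lookup that the endpoint would perform. It assumes a hypothetical `NodeToken` layout of node name -> (key -> value) and a simplified `WorkflowRunRecord`. The class names and the version number are placeholders, not the actual RunRecord schema.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical token layout: node name -> (key -> value).
class NodeToken {
  final Map<String, Map<String, String>> valuesByNode = new LinkedHashMap<>();

  void setValue(String nodeName, String key, String value) {
    valuesByNode.computeIfAbsent(nodeName, n -> new LinkedHashMap<>()).put(key, value);
  }

  // Fork: each branch receives a deep copy of the token.
  NodeToken copy() {
    NodeToken copy = new NodeToken();
    valuesByNode.forEach((node, values) -> copy.valuesByNode.put(node, new LinkedHashMap<>(values)));
    return copy;
  }

  // Join: a new token holding the union of all branch tokens. Every node writes only
  // under its own unique name, so no value is overwritten by a different one. The
  // resulting order follows the order of the list, not the order in which branches ran.
  static NodeToken merge(List<NodeToken> branches) {
    NodeToken merged = new NodeToken();
    for (NodeToken branch : branches) {
      branch.valuesByNode.forEach((node, values) ->
          merged.valuesByNode.computeIfAbsent(node, n -> new LinkedHashMap<>()).putAll(values));
    }
    return merged;
  }
}

// Hypothetical run record carrying the token as a property plus a version field.
class WorkflowRunRecord {
  static final int CURRENT_VERSION = 1; // assumed starting version
  final String runId;
  final int version = CURRENT_VERSION;
  private NodeToken token = new NodeToken();

  WorkflowRunRecord(String runId) {
    this.runId = runId;
  }

  // Called before each action executes: store a snapshot of the live token.
  void updateToken(NodeToken liveToken) {
    this.token = liveToken.copy();
  }

  // Lookup behind .../runs/{run-id}/nodes/{unique-node-name}/token:
  // the key-value pairs set by that node, or an empty map if it set none.
  Map<String, String> valuesSetByNode(String uniqueNodeName) {
    Map<String, String> values = token.valuesByNode.get(uniqueNodeName);
    return values == null ? Collections.emptyMap() : Collections.unmodifiableMap(values);
  }
}
```

The join merge only unions per-node maps, so it never has to choose between conflicting values. What it cannot do is order values for the same key across branches, which matches the lack of ordering guarantees at the join.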