Goals
Checklist
- User stories documented (Albert/Vinisha)
- User stories reviewed (Nitin)
- Design documented (Albert/Vinisha)
- Design reviewed (Terence/Andreas)
- Feature merged ()
- Examples and guides ()
- Integration tests ()
- Documentation for feature ()
- Blog post
Use Cases
- A pipeline developer wants to create a pipeline with several configuration settings that are not known at pipeline creation time, but that are set at the start of each pipeline run. For example, the time partition(s) that should be read by the source, and the name of the dataset sink, need to be set on a per-run basis. These arguments can be set either through CDAP runtime arguments/preferences, or by the pipeline itself. For example, at the start of the run, the pipeline performs some action (ex: queries a dataset or makes an HTTP call) to look up which time partitions should be read, and where data should be written, for that pipeline run. Alternatively, a user can manually specify the time partitions through CDAP runtime arguments/preferences and then start the run.
User Stories
- As a pipeline developer, I want to be able to configure a plugin property to some value that will get substituted for each run based on the runtime arguments.
- As a pipeline operator, I want to be able to set arguments for the entire pipeline that will be used for substitution
- As a pipeline operator, I want to be able to set arguments for a specific stage in the pipeline that will be used for substitution
- As a plugin developer, I want to be able to write code that is executed at the start of the pipeline run and sets arguments for the rest of the run.
Design (WIP - don't review yet)
Specifying Macros
We can introduce a macro syntax that can be used in plugin configs, which the Hydrator app will substitute before any plugin code runs. For example:
{ "stages": [ { "name": "customers", "plugin": { "name": "File", "type": "batchsource", "properties": { "path": "hdfs://host:port/${inputpath}" // ${inputpath} will get replaced with the value of the 'customers.inputpath' runtime argument } } }, { "name": "items", "plugin": { "name": "File", "type": "batchsource", "properties": { "path": "hdfs://host:port/${inputpath}" // ${inputpath} will get replaced with the value of the 'items.inputpath' runtime argument } } } ] }
Setting Hydrator runtime arguments using CDAP runtime arguments/preferences
CDAP preferences and runtime arguments will be used directly as Hydrator arguments.
1.) Runtime arguments can be passed to a Hydrator pipeline in 2 ways:
- Using pre-pipeline custom actions: custom actions that run before the pipeline can set runtime arguments. For example, before running the pipeline, a custom action can copy local files to HDFS and set the input path runtime argument for the batch source. To support this, we can expose setPreferences() and getPreferences() programmatic APIs for setting runtime arguments. These arguments can be passed to the Hydrator app using the workflow token.
- Using the Hydrator UI: for each stage, runtime arguments can be passed from the Hydrator UI using the CDAP REST endpoints for the preferences/runtime arguments framework.
2.) The Hydrator app will substitute properties using macro substitution for each ETLStage. Plugins like SFTP, which need secure substitution using key management, can use the 'secure' prefix in the macro. Macro substitution should vary depending on the prefix of the argument: for a secure key, the macro can be '$secure.key'; for a value to be substituted directly, the macro can be '$inputpath' without any prefix.
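The prefix-based dispatch could look like the sketch below, where a plain Map stands in for the secure key store (the class name, the stand-in store, and the exact 'secure.' prefix handling are assumptions for illustration, not actual Hydrator code):

```java
import java.util.Map;

// Illustrative sketch only: resolves a macro key either from the secure
// store (when prefixed with 'secure.') or directly from runtime arguments.
public class MacroResolver {
  private final Map<String, String> runtimeArgs;
  private final Map<String, String> secureStore; // stand-in for key management

  public MacroResolver(Map<String, String> runtimeArgs, Map<String, String> secureStore) {
    this.runtimeArgs = runtimeArgs;
    this.secureStore = secureStore;
  }

  public String resolve(String macroKey) {
    if (macroKey.startsWith("secure.")) {
      // 'secure.' prefix: look up the remainder of the key in the key store
      String key = macroKey.substring("secure.".length());
      String value = secureStore.get(key);
      if (value == null) {
        throw new IllegalArgumentException("No secure key: " + key);
      }
      return value;
    }
    // no prefix: substitute the runtime argument value directly
    String value = runtimeArgs.get(macroKey);
    if (value == null) {
      throw new IllegalArgumentException("No runtime argument: " + macroKey);
    }
    return value;
  }
}
```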
Scoping:
Hydrator arguments can also be scoped (similar to CDAP workflows). If an argument is prefixed with a stage name, the Hydrator app will strip the stage name and add the result as an additional argument local to that stage. For example, suppose the pipeline has 2 arguments:
Argument name | Argument value |
---|---|
customers.inputpath | /data/customers/2016-01-01 |
items.inputpath | /data/items/2016-01-01 |
The 'customers' stage will see 3 arguments:
Argument name | Argument value |
---|---|
customers.inputpath | /data/customers/2016-01-01 |
items.inputpath | /data/items/2016-01-01 |
inputpath | /data/customers/2016-01-01 |
Similarly, the 'items' stage will see 3 arguments:
Argument name | Argument value |
---|---|
customers.inputpath | /data/customers/2016-01-01 |
items.inputpath | /data/items/2016-01-01 |
inputpath | /data/items/2016-01-01 |
If there is a name conflict between a scoped argument and a global argument, the scoped argument overwrites the global one for that scope. For example, if the pipeline has 2 arguments:
Argument name | Argument value |
---|---|
customers.inputpath | /data/customers/2016-01-01 |
inputpath | /data/items/2016-01-01 |
The customers stage will see 2 arguments:
Argument name | Argument value |
---|---|
customers.inputpath | /data/customers/2016-01-01 |
inputpath | /data/customers/2016-01-01 |
The items stage will see 2 arguments:
Argument name | Argument value |
---|---|
customers.inputpath | /data/customers/2016-01-01 |
inputpath | /data/items/2016-01-01 |
For secure keys, the scoped macro would be: '$customers.secure.key'
Precedence:
Workflow token > scoped arguments > pipeline level (global) arguments
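The scoping and conflict rules above can be sketched as a small helper that derives the effective arguments for one stage (illustrative names, not actual Hydrator code; workflow-token precedence is omitted here):

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only: computes the arguments a given stage sees,
// with stage-scoped arguments overwriting globals of the same name.
public class StageArguments {
  public static Map<String, String> forStage(String stageName, Map<String, String> pipelineArgs) {
    Map<String, String> result = new HashMap<>(pipelineArgs);
    String prefix = stageName + ".";
    for (Map.Entry<String, String> e : pipelineArgs.entrySet()) {
      if (e.getKey().startsWith(prefix)) {
        // strip the stage name; the scoped value wins over any global value
        result.put(e.getKey().substring(prefix.length()), e.getValue());
      }
    }
    return result;
  }

  public static void main(String[] args) {
    Map<String, String> pipelineArgs = new HashMap<>();
    pipelineArgs.put("customers.inputpath", "/data/customers/2016-01-01");
    pipelineArgs.put("inputpath", "/data/items/2016-01-01");
    // the 'customers' stage sees inputpath=/data/customers/2016-01-01
    System.out.println(forStage("customers", pipelineArgs));
  }
}
```

Run against the conflict example above, the 'customers' stage sees inputpath=/data/customers/2016-01-01 while the 'items' stage still sees inputpath=/data/items/2016-01-01.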
3.) Set substituted properties for a stage. At runtime, we will need to add a newPluginInstance(String, Map) overload in PluginContext and a setPluginProperties() method in Plugin.java, as shown below.
```java
@Beta
public interface PluginContext {

  /**
   * Gets the {@link PluginProperties} associated with the given plugin id.
   *
   * @param pluginId the unique identifier provided when declaring plugin usage in the program
   * @return the {@link PluginProperties}
   * @throws IllegalArgumentException if pluginId is not found
   * @throws UnsupportedOperationException if the program does not support plugin
   */
  PluginProperties getPluginProperties(String pluginId);

  /**
   * Creates a new instance of a plugin. The instance returned will have the {@link PluginConfig} set up with
   * the {@link PluginProperties} provided at the time when
   * {@link PluginConfigurer#usePlugin(String, String, String, PluginProperties)} was called during
   * program configuration time.
   *
   * @param pluginId the unique identifier provided when declaring plugin usage in the program
   * @param <T> the class type of the plugin
   * @return a new instance of the plugin being specified by the arguments
   * @throws InstantiationException if failed to create a new instance
   * @throws IllegalArgumentException if pluginId is not found
   * @throws UnsupportedOperationException if the program does not support plugin
   */
  <T> T newPluginInstance(String pluginId) throws InstantiationException;

  /**
   * Creates a new instance of a plugin using the properties provided, instead of the
   * {@link PluginProperties} provided at the time when
   * {@link PluginConfigurer#usePlugin(String, String, String, PluginProperties)} was called during
   * program configuration time.
   *
   * @param pluginId the unique identifier provided when declaring plugin usage in the program
   * @param properties the properties to be used to create the plugin
   * @param <T> the class type of the plugin
   * @return a new instance of the plugin being specified by the arguments
   * @throws InstantiationException if failed to create a new instance
   * @throws IllegalArgumentException if pluginId is not found
   * @throws UnsupportedOperationException if the program does not support plugin
   */
  <T> T newPluginInstance(String pluginId, Map<String, String> properties) throws InstantiationException;
}
```
```java
package co.cask.cdap.api.plugin;

public final class Plugin {
  private final ArtifactId artifactId;
  private final PluginClass pluginClass;
  private PluginProperties properties;

  public Plugin(ArtifactId artifactId, PluginClass pluginClass, PluginProperties properties) {
    this.artifactId = artifactId;
    this.pluginClass = pluginClass;
    this.properties = properties;
  }

  /**
   * @return artifact id
   */
  public ArtifactId getArtifactId() {
    return artifactId;
  }

  /**
   * @return {@link PluginClass}
   */
  public PluginClass getPluginClass() {
    return pluginClass;
  }

  /**
   * @param properties properties of the plugin
   */
  public void setPluginProperties(PluginProperties properties) {
    this.properties = properties;
  }

  /**
   * Returns the set of properties available when the plugin was created.
   */
  public PluginProperties getProperties() {
    return properties;
  }

  // ...
}
```
In initialize() of ETLMapReduce, PipelinePluginInstantiator instantiates a stage at runtime. We can substitute the plugin properties with actual runtime arguments in ETLMapReduce.initialize() (for MapReduceContext) or ETLSpark.beforeSubmit() (for SparkClientContext). DefaultPluginContext can then use the substituted properties to instantiate the stage.
```java
PipelinePluginInstantiator pluginInstantiator = new PipelinePluginInstantiator(context, phaseSpec);
// substitute properties of a plugin and set it in the context
BatchConfigurable<BatchSourceContext> batchSource =
  pluginInstantiator.newPluginInstance(sourceName, substitutedProperties);
```
Constraints:
- The runtime argument name should not contain '$'.
- This design is only applicable to batch and not to realtime, because the current realtime implementation does not have a way to pass tokens, which is required for preactions/custom actions in a pipeline to pass runtime arguments.
Assumptions:
- The Hydrator app (SparkClientContext, MapReduceContext) will have access to the secure store manager to substitute values from the key store.
Thoughts from Terence: