Goals
Checklist
- User stories documented (Albert/Vinisha)
- User stories reviewed (Nitin)
- Design documented (Albert/Vinisha)
- Design reviewed (Terence/Andreas)
- Feature merged ()
- Examples and guides ()
- Integration tests ()
- Documentation for feature ()
- Blog post
Use Cases
- A pipeline developer wants to create a pipeline that has several configuration settings that are not known at pipeline creation time, but that are set at the start of the each pipeline run. For example, the time partition(s) that should be read by the source, and the name of the dataset sink, need to be set at a per-run basis. The arguments can be set either through CDAP runtime arguments/preferences, or by the pipeline itself. For example, at the start of the run, the pipeline performs some action (ex: queries a dataset or makes an http call) to lookup which time partitions should be read, and where data should be written to, for that pipeline run. Alternatively, a user can manually specify the time partitions through CDAP runtime arguments/preferences then start the run.
User Stories
- As a pipeline developer, I want to be able to configure a plugin property to some value that will get substituted for each run based on the runtime arguments
- As a pipeline operator, I want to be able to set arguments for the entire pipeline that will be used for substitution
- As a pipeline operator, I want to be able to set arguments for a specific stage in the pipeline that will be used for substitution
- As a plugin developer, I want to be able to write a code that is executed at the start of the pipeline and sets arguments for the rest of the run.
Design (WIP - dont review yet)
Specifying Macros
We can introduce macro syntax that can be used in plugin configs that the Hydrator app will substitute before any plugin code is run. For example:
{ "stages": [ { "name": "customers", "plugin": { "name": "File", "type": "batchsource", "properties": { "path": "hdfs://host:port/${customers_inputpath}" // ${customers_inputpath} will get replaced with the value of the 'customers_inputpath' runtime argument } } }, { "name": "items", "plugin": { "name": "File", "type": "batchsource", "properties": { "path": "hdfs://host:port/${items_inputpath}" // ${items_inputpath} will get replaced with the value of the 'items_inputpath' runtime argument } } } ] }
Setting Hydrator runtime arguments using CDAP runtime arguments/preferences
CDAP preferences and runtime arguments will be used directly as Hydrator arguments.
1.) For each stage, runtime arguments can be passed from hydrator UI using cdap REST endpoints for runtime arguments.
2.) Hydrator app will substitute properties using Macro substitution for each ETLStage. To substitute, we can use Macro api. We already have it in hydrator.
public interface Macro { /** * Get the value of the macro based on the context and arguments. * * @param arguments arguments to the macro * @param context the runtime context, which gives access to things runtime arguments. * @return the macro value */ String getValue(@Nullable String arguments, RuntimeContext context) throws Exception; }
Now, substitution can can be a value which can be directly substituted or it can be key to some keystore for example, in case of SFTP. Macro substitution should vary depending on prefix of the arguments. In case of secure key, macro can be '$secure.key', in case of value directly to be substituted, macro can be '$inputpath' without any prefix.
3.) Set substituted properties for a stage. To setProperties at runtime for a stage, we will need to add setPluginProperties() in PluginContext and Plugin.java as shown below.
@Beta public interface PluginContext { /** * Gets the {@link PluginProperties} associated with the given plugin id. * * @param pluginId the unique identifier provide when declaring plugin usage in the program. * @return the {@link PluginProperties}. * @throws IllegalArgumentException if pluginId is not found * @throws UnsupportedOperationException if the program does not support plugin */ PluginProperties getPluginProperties(String pluginId); /** * * @param pluginId the unique identifier provide when declaring plugin usage in the program. * @param pluginProperties the {@link PluginProperties} for * @throws IllegalArgumentException if pluginId is not found * @throws UnsupportedOperationException if the program does not support plugin */ void setPluginProperties(String pluginId, PluginProperties pluginProperties); /** * Loads and returns a plugin class as specified by the given plugin id. * * @param pluginId the unique identifier provide when declaring plugin usage in the program. * @param <T> the class type of the plugin * @return the resulting plugin {@link Class}. * @throws IllegalArgumentException if pluginId is not found * @throws UnsupportedOperationException if the program does not support plugin */ <T> Class<T> loadPluginClass(String pluginId); /** * Creates a new instance of a plugin. The instance returned will have the {@link PluginConfig} setup with * {@link PluginProperties} provided at the time when the * {@link PluginConfigurer#usePlugin(String, String, String, PluginProperties)} was called during the * program configuration time. * * @param pluginId the unique identifier provide when declaring plugin usage in the program. * @param <T> the class type of the plugin * @return A new instance of the plugin being specified by the arguments * * @throws InstantiationException if failed create a new instance * @throws IllegalArgumentException if pluginId is not found * @throws UnsupportedOperationException if the program does not support plugin */ <T> T newPluginInstance(String pluginId) throws InstantiationException; }
package co.cask.cdap.api.plugin; public final class Plugin { private final ArtifactId artifactId; private final PluginClass pluginClass; private PluginProperties properties; public Plugin(ArtifactId artifactId, PluginClass pluginClass, PluginProperties properties) { this.artifactId = artifactId; this.pluginClass = pluginClass; this.properties = properties; } /** * @return artifact id */ public ArtifactId getArtifactId() { return artifactId; } /** * @return {@link PluginClass} */ public PluginClass getPluginClass() { return pluginClass; } public void setPluginProperties(PluginProperties properties) { this.properties = properties; } /** * Returns the set of properties available when the plugin was created. */ public PluginProperties getProperties() { return properties; } @Override public boolean equals(Object o) { if (this == o) { return true; } if (o == null || getClass() != o.getClass()) { return false; } Plugin that = (Plugin) o; return Objects.equals(artifactId, that.artifactId) && Objects.equals(pluginClass, that.pluginClass) && Objects.equals(properties, that.properties); } @Override public int hashCode() { return Objects.hash(artifactId, pluginClass, properties); } @Override public String toString() { return "Plugin{" + "artifactId=" + artifactId + ",pluginClass=" + pluginClass + ",properties=" + properties + '}'; } }
In initialize() of ETLMapReduce, PipelinePluginInstantiator instantiates a stage at runtime. To set substituted properties for a stage, we can call context.setPluginProperties() in ETLMapReduce.initialize() for MapReduceContext or ETLSpark.beforeSubmit() for SparkClientContext. DefaultPluginContext can set substituted properties for a stage and instantiate a stage.
PipelinePluginInstantiator pluginInstantiator = new PipelinePluginInstantiator(context, phaseSpec); // substitute properties of a plugin and set it in the context context.setPluginProperties(sourceName, properties); BatchConfigurable<BatchSourceContext> batchSource = pluginInstantiator.newPluginInstance(sourceName);
Limitations:
- The runtime argument name should not contain '$'
- Rumtime argument name should be unique through out pipeline
Thoughts from Terence: