...
2.) Hydrator app will substitute properties using Macro substitution for each ETLStage. Now, plugins, like SFTP, which need secure substitution using key management can use 'secure' prefix in the macro. Macro substitution should vary depending on prefix of the arguments. In case of secure key, macro can be '$secure.key', in case of value directly to be substituted, macro can be '$inputpath' without any prefix.
Scoping:
Hydrator arguments can also be scoped (similar to CDAP workflows). If an argument is prefixed with the stage name, the Hydrator app will strip the stage name and add that as an additional argument local to that stage. For example the pipeline has 2 arguments:
Argument name | Argument value |
---|---|
customers.inputpath | /data/customers/2016-01-01 |
items.inputpath | /data/items/2016-01-01 |
The 'customers' stage will see 3 arguments:
Argument name | Argument value |
---|---|
customers.inputpath | /data/customers/2016-01-01 |
items.inputpath | /data/items/2016-01-01 |
inputpath | /data/customers/2016-01-01 |
Similarly, the 'items' stage will see 3 arguments:
Argument name | Argument value |
---|---|
customers.inputpath | /data/customers/2016-01-01 |
items.inputpath | /data/items/2016-01-01 |
inputpath | /data/items/2016-01-01 |
If there is a name conflict between a scoped argument and a global argument, the scoped argument will overwrite the global for that scope. For example if the pipeline has 2 arguments:
Argument name | Argument value |
---|---|
customers.inputpath | /data/customers/2016-01-01 |
inputpath | /data/items/2016-01-01 |
The customers stage will see 2 arguments:
Argument name | Argument value |
---|---|
customers.inputpath | /data/customers/2016-01-01 |
inputpath | /data/customers/2016-01-01 |
The items stage will see 2 arguments:
Argument name | Argument value |
---|---|
customers.inputpath | /data/customers/2016-01-01 |
inputpath | /data/items/2016-01-01 |
For secure keys, the scoped macro would be: '$customers.secure.key'
Precedence:
Workflow token > scoped arguments > pipeline level (global) arguments
3.) Set substituted properties for a stage. At runtime for a stage, we will need to add setPluginProperties() in PluginContext and Plugin.java as shown below.
Code Block |
---|
@Beta
public interface PluginContext {
/**
* Gets the {@link PluginProperties} associated with the given plugin id.
*
* @param pluginId the unique identifier provide when declaring plugin usage in the program.
* @return the {@link PluginProperties}.
* @throws IllegalArgumentException if pluginId is not found
* @throws UnsupportedOperationException if the program does not support plugin
*/
/**
* Creates a new instance of a plugin. The instance returned will have the {@link PluginConfig} setup with
* {@link PluginProperties} provided at the time when the
* {@link PluginConfigurer#usePlugin(String, String, String, PluginProperties)} was called during the
* program configuration time.
*
* @param pluginId the unique identifier provide when declaring plugin usage in the program.
* @param <T> the class type of the plugin
* @return A new instance of the plugin being specified by the arguments
*
* @throws InstantiationException if failed create a new instance
* @throws IllegalArgumentException if pluginId is not found
* @throws UnsupportedOperationException if the program does not support plugin
*/
<T> T newPluginInstance(String pluginId) throws InstantiationException;
/**
* Creates a new instance of a plugin using properties provided. The instance returned will have the {@link PluginConfig} setup with
* {@link PluginProperties} provided at the time when the
* {@link PluginConfigurer#usePlugin(String, String, String, PluginProperties)} was called during the
* program configuration time.
*
* @param pluginId the unique identifier provide when declaring plugin usage in the program.
* @param properties the properties needs to be used to create plugin
* @param <T> the class type of the plugin
* @return A new instance of the plugin being specified by the arguments
*
* @throws InstantiationException if failed create a new instance
* @throws IllegalArgumentException if pluginId is not found
* @throws UnsupportedOperationException if the program does not support plugin
*/
<T> T newPluginInstance(String pluginId, Map<String,String> properties) throws InstantiationException;
} |
Code Block |
---|
package co.cask.cdap.api.plugin;
public final class Plugin {
private final ArtifactId artifactId;
private final PluginClass pluginClass;
private PluginProperties properties;
public Plugin(ArtifactId artifactId, PluginClass pluginClass, PluginProperties properties) {
this.artifactId = artifactId;
this.pluginClass = pluginClass;
this.properties = properties;
}
/**
* @return artifact id
*/
public ArtifactId getArtifactId() {
return artifactId;
}
/**
* @return {@link PluginClass}
*/
public PluginClass getPluginClass() {
return pluginClass;
}
/**
* @param properties properties of the plugin
*/
public void setPluginProperties(PluginProperties properties) {
this.properties = properties;
}
/**
* Returns the set of properties available when the plugin was created.
*/
public PluginProperties getProperties() {
return properties;
}
.....
}
|
In initialize() of ETLMapReduce, PipelinePluginInstantiator instantiates a stage at runtime. We can substitute the properties with actual runtime arguments in ETLMapReduce.initialize() for MapReduceContext or ETLSpark.beforeSubmit() for SparkClientContext. DefaultPluginContext can use substituted properties for a stage and instantiate a stage.
No Format |
---|
PipelinePluginInstantiator pluginInstantiator = new PipelinePluginInstantiator(context, phaseSpec);
// substitute properties of a plugin and set it in the context
BatchConfigurable<BatchSourceContext> batchSource = pluginInstantiator.newPluginInstance(sourceName, substitutedProperties); |
Constraints:
...
Assumptions:
...
Thoughts from Terence:
...