...

Code Block
{
  "stages": [
    {
      "name": "customers",
      "plugin": {
        "name": "File",
        "type": "batchsource",
        "properties": {
          "path": "hdfs://host:port/${customers_inputpath}" // ${customers_inputpath} will be replaced with the value of the 'customers_inputpath' runtime argument
        }
      }
    },
    {
      "name": "items",
      "plugin": {
        "name": "File",
        "type": "batchsource",
        "properties": {
          "path": "hdfs://host:port/${items_inputpath}" // ${items_inputpath} will be replaced with the value of the 'items_inputpath' runtime argument
        }
      }
    }
  ]
}

...

CDAP preferences and runtime arguments will be used directly as Hydrator arguments. 

1.) Runtime arguments can be passed to a Hydrator pipeline in two ways:

  1. Using pre-pipeline custom actions:
    Pre-pipeline custom actions can set runtime arguments. For example, before the pipeline runs, a custom action can copy local files to HDFS and set the runtime argument for the batch source's input path. To support this, we can expose setPreferences() and getPreferences() programmatic APIs for setting runtime arguments. These arguments can then be passed to the Hydrator app using the workflow token.
  2. Using the Hydrator UI:
    For each stage, runtime arguments can be passed from the Hydrator UI using the CDAP REST endpoints of the preferences/runtime arguments framework.

2.) The Hydrator app will substitute properties for each ETLStage using macro substitution. For this we can use the Macro API, which already exists in Hydrator.

Code Block
public interface Macro {

  /**
   * Get the value of the macro based on the context and arguments.
   *
   * @param arguments arguments to the macro
   * @param context the runtime context, which gives access to things like runtime arguments.
   * @return the macro value
   */
  String getValue(@Nullable String arguments, RuntimeContext context) throws Exception;
}

Now, a substitution can be a value that is substituted directly, or it can be a key into a keystore (for example, in the case of SFTP). Plugins like SFTP, which need secure substitution using key management, can use a 'secure' prefix in the macro, so macro substitution should vary depending on the argument's prefix. For a secure key, the macro would be '${secure.key}'; for a value to be substituted directly, the macro would be '${inputpath}', without any prefix.
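As an illustrative sketch only (MacroResolver and its method are hypothetical names, not the actual Hydrator implementation), the prefix-based dispatch could look like this: ${...} tokens are resolved against the runtime arguments, except that a 'secure.' prefix redirects the lookup to the secure store.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch: resolve ${...} macros against runtime arguments,
// dispatching keys with a 'secure.' prefix to a (stubbed) secure store lookup.
public class MacroResolver {
  private static final Pattern MACRO = Pattern.compile("\\$\\{([^}]+)\\}");

  public static String resolve(String raw, Map<String, String> runtimeArgs,
                               Map<String, String> secureStore) {
    Matcher m = MACRO.matcher(raw);
    StringBuffer sb = new StringBuffer();
    while (m.find()) {
      String key = m.group(1);
      String value;
      if (key.startsWith("secure.")) {
        // 'secure.' prefix: look the key up in the secure store instead
        value = secureStore.get(key.substring("secure.".length()));
      } else {
        value = runtimeArgs.get(key);
      }
      if (value == null) {
        throw new IllegalArgumentException("No value for macro: " + key);
      }
      m.appendReplacement(sb, Matcher.quoteReplacement(value));
    }
    m.appendTail(sb);
    return sb.toString();
  }
}
```

The real implementation would obtain the secure values through the platform's secure store rather than a plain map.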

Scoping:

Hydrator arguments can also be scoped (similar to CDAP workflows). If an argument is prefixed with a stage name, the Hydrator app will strip the stage name and add the result as an additional argument local to that stage. For example, if the pipeline has 2 arguments:

Argument name        | Argument value
customers.inputpath  | /data/customers/2016-01-01
items.inputpath      | /data/items/2016-01-01


The 'customers' stage will see 3 arguments:

Argument name        | Argument value
customers.inputpath  | /data/customers/2016-01-01
items.inputpath      | /data/items/2016-01-01
inputpath            | /data/customers/2016-01-01


Similarly, the 'items' stage will see 3 arguments:

Argument name        | Argument value
customers.inputpath  | /data/customers/2016-01-01
items.inputpath      | /data/items/2016-01-01
inputpath            | /data/items/2016-01-01


If there is a name conflict between a scoped argument and a global argument, the scoped argument overwrites the global one for that scope. For example, if the pipeline has 2 arguments:

Argument name        | Argument value
customers.inputpath  | /data/customers/2016-01-01
inputpath            | /data/items/2016-01-01


The 'customers' stage will see 2 arguments:

Argument name        | Argument value
customers.inputpath  | /data/customers/2016-01-01
inputpath            | /data/customers/2016-01-01

 

The 'items' stage will see 2 arguments:

Argument name        | Argument value
customers.inputpath  | /data/customers/2016-01-01
inputpath            | /data/items/2016-01-01


For secure keys, the scoped macro would be: '${customers.secure.key}'

Precedence:

Workflow token > scoped arguments > pipeline level (global) arguments 
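The scoping rule above can be sketched as follows (ArgumentScoper is a hypothetical name for illustration; this is not the actual Hydrator code): copy the pipeline-level arguments, then overlay every argument prefixed with '<stageName>.' as a local, unprefixed argument so that a scoped value wins over a global one of the same name.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: compute the arguments visible to a single stage.
public class ArgumentScoper {
  public static Map<String, String> forStage(String stageName, Map<String, String> pipelineArgs) {
    // every stage sees all pipeline-level arguments
    Map<String, String> stageArgs = new HashMap<>(pipelineArgs);
    String prefix = stageName + ".";
    for (Map.Entry<String, String> e : pipelineArgs.entrySet()) {
      if (e.getKey().startsWith(prefix)) {
        // the scoped argument overwrites any global argument with the same name
        stageArgs.put(e.getKey().substring(prefix.length()), e.getValue());
      }
    }
    return stageArgs;
  }
}
```

Workflow-token precedence would be applied on top of this, before the scoping step.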


3.) Set substituted properties for a stage. To set properties for a stage at runtime, we will need to add setPluginProperties() in PluginContext and Plugin.java, as shown below.

...

Code Block
package co.cask.cdap.api.plugin;

import co.cask.cdap.api.artifact.ArtifactId;

import java.util.Objects;

public final class Plugin {
  private final ArtifactId artifactId;
  private final PluginClass pluginClass;
  private PluginProperties properties;

  public Plugin(ArtifactId artifactId, PluginClass pluginClass, PluginProperties properties) {
    this.artifactId = artifactId;
    this.pluginClass = pluginClass;
    this.properties = properties;
  }

  /**
   * @return artifact id
   */
  public ArtifactId getArtifactId() {
    return artifactId;
  }

  /**
   * @return {@link PluginClass}
   */
  public PluginClass getPluginClass() {
    return pluginClass;
  }
 
  /**
   * @param properties properties of the plugin
   */
  public void setPluginProperties(PluginProperties properties) {
    this.properties = properties;
  }

  /**
   * Returns the set of properties available when the plugin was created.
   */
  public PluginProperties getProperties() {
    return properties;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }

    Plugin that = (Plugin) o;
    return Objects.equals(artifactId, that.artifactId)
      && Objects.equals(pluginClass, that.pluginClass)
      && Objects.equals(properties, that.properties);
  }

 
  @Override
  public int hashCode() {
    return Objects.hash(artifactId, pluginClass, properties);
  }

  @Override
  public String toString() {
    return "Plugin{" +
      "artifactId=" + artifactId +
      ",pluginClass=" + pluginClass +
      ",properties=" + properties +
      '}';
  }
}

 

In initialize() of ETLMapReduce, PipelinePluginInstantiator instantiates a stage at runtime. We can substitute the properties with the actual runtime arguments in ETLMapReduce.initialize() for MapReduceContext, or in ETLSpark.beforeSubmit() for SparkClientContext. DefaultPluginContext can then use the substituted properties to instantiate the stage.

No Format
 
PipelinePluginInstantiator pluginInstantiator = new PipelinePluginInstantiator(context, phaseSpec);
// substitute properties of a plugin and set it in the context
BatchConfigurable<BatchSourceContext> batchSource = pluginInstantiator.newPluginInstance(sourceName, substitutedProperties);
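
A minimal sketch of the substitution step that would run before instantiation (StagePropertySubstituter is a hypothetical name; the real code would also resolve secure keys through the secure store): each ${...} macro in the stage's raw plugin properties is replaced with its runtime-argument value, producing the substituted property map handed to the instantiator.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: build the substituted property map for one stage.
public class StagePropertySubstituter {
  public static Map<String, String> substitute(Map<String, String> rawProperties,
                                               Map<String, String> runtimeArgs) {
    Map<String, String> substituted = new HashMap<>();
    for (Map.Entry<String, String> e : rawProperties.entrySet()) {
      String value = e.getValue();
      // replace every ${arg} occurrence with the corresponding runtime argument
      for (Map.Entry<String, String> arg : runtimeArgs.entrySet()) {
        value = value.replace("${" + arg.getKey() + "}", arg.getValue());
      }
      substituted.put(e.getKey(), value);
    }
    return substituted;
  }
}
```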

 

Limitations/Constraints:

  • The runtime argument name should not contain '$'

...

  • This design is only applicable for batch pipelines, not realtime ones, because the current realtime implementation has no way to pass workflow tokens, which preactions/custom actions in a pipeline require in order to pass runtime arguments.

Assumptions:

  • The Hydrator app (SparkClientContext, MapReduceContext) will have access to the secure store manager to substitute values from the keystore.

 

Thoughts from Terence: 

Below are the thoughts I have so far.

...