Skip to end of metadata
Go to start of metadata

You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 18 Next »

Goals

Checklist

  • User stories documented (Albert/Vinisha) 
  • User stories reviewed (Nitin)
  • Design documented (Albert/Vinisha)
  • Design reviewed (Terence/Andreas)
  • Feature merged ()
  • Examples and guides ()
  • Integration tests () 
  • Documentation for feature ()
  • Blog post

Use Cases

  1. A pipeline developer wants to create a pipeline that has several configuration settings that are not known at pipeline creation time, but that are set at the start of the each pipeline run. For example, the time partition(s) that should be read by the source, and the name of the dataset sink, need to be set at a per-run basis.  The arguments can be set either through CDAP runtime arguments/preferences, or by the pipeline itself. For example, at the start of the run, the pipeline performs some action (ex: queries a dataset or makes an http call) to lookup which time partitions should be read, and where data should be written to, for that pipeline run. Alternatively, a user can manually specify the time partitions through CDAP runtime arguments/preferences then start the run.

User Stories

  1. As a pipeline developer, I want to be able to configure a plugin property to some value that will get substituted for each run based on the runtime arguments
  2. As a pipeline operator, I want to be able to set arguments for the entire pipeline that will be used for substitution
  3. As a pipeline operator, I want to be able to set arguments for a specific stage in the pipeline that will be used for substitution
  4. As a plugin developer, I want to be able to write a code that is executed at the start of the pipeline and sets arguments for the rest of the run.

Design (WIP - dont review yet)

Specifying Macros

We can introduce macro syntax that can be used in plugin configs that the Hydrator app will substitute before any plugin code is run. For example:

{
  "stages": [
    {
      "name": "customers",
      "plugin": {
        "name": "File",
        "type": "batchsource",
        "properties": {
          "path": "hdfs://host:port/${customers_inputpath}" // ${customers_inputpath} will get replaced with the value of the 'customers_inputpath' runtime argument
        }
      }
    },
    {
      "name": "items",
      "plugin": {
        "name": "File",
        "type": "batchsource",
        "properties": {
          "path": "hdfs://host:port/${items_inputpath}" // ${items_inputpath} will get replaced with the value of the 'items_inputpath' runtime argument
        }
      }
    }
  ]
}

Setting Hydrator runtime arguments using CDAP runtime arguments/preferences

CDAP preferences and runtime arguments will be used directly as Hydrator arguments.

1.) For each stage, runtime arguments can be passed from hydrator UI using cdap REST endpoints for runtime arguments.

2.) Hydrator app will substitute properties using Macro substitution for each ETLStage. To substitute, we can use Macro api. We already have it in hydrator.

public interface Macro {

  /**
   * Get the value of the macro based on the context and arguments.
   *
   * @param arguments arguments to the macro
   * @param context the runtime context, which gives access to things runtime arguments.
   * @return the macro value
   */
  String getValue(@Nullable String arguments, RuntimeContext context) throws Exception;
}

Now, substitution can can be a value which can be directly substituted or it can be key to some keystore for example, in case of SFTP. Macro substitution should vary depending on prefix of the arguments. In case of secure key, macro can be '$secure.key', in case of value directly to be substituted, macro can be '$inputpath' without any prefix.

 

3.) Set substituted properties for a stage. To setProperties at runtime for a stage, we will need to add setPluginProperties() in PluginContext and Plugin.java as shown below.

@Beta
public interface PluginContext {

  /**
   * Gets the {@link PluginProperties} associated with the given plugin id.
   *
   * @param pluginId the unique identifier provide when declaring plugin usage in the program.
   * @return the {@link PluginProperties}.
   * @throws IllegalArgumentException if pluginId is not found
   * @throws UnsupportedOperationException if the program does not support plugin
   */
  PluginProperties getPluginProperties(String pluginId);

  /**
   *
   * @param pluginId the unique identifier provide when declaring plugin usage in the program.
   * @param pluginProperties the {@link PluginProperties} for
   * @throws IllegalArgumentException if pluginId is not found
   * @throws UnsupportedOperationException if the program does not support plugin
   */
  void setPluginProperties(String pluginId, PluginProperties pluginProperties);

  /**
   * Loads and returns a plugin class as specified by the given plugin id.
   *
   * @param pluginId the unique identifier provide when declaring plugin usage in the program.
   * @param <T> the class type of the plugin
   * @return the resulting plugin {@link Class}.
   * @throws IllegalArgumentException if pluginId is not found
   * @throws UnsupportedOperationException if the program does not support plugin
   */
  <T> Class<T> loadPluginClass(String pluginId);

  /**
   * Creates a new instance of a plugin. The instance returned will have the {@link PluginConfig} setup with
   * {@link PluginProperties} provided at the time when the
   * {@link PluginConfigurer#usePlugin(String, String, String, PluginProperties)} was called during the
   * program configuration time.
   *
   * @param pluginId the unique identifier provide when declaring plugin usage in the program.
   * @param <T> the class type of the plugin
   * @return A new instance of the plugin being specified by the arguments
   *
   * @throws InstantiationException if failed create a new instance
   * @throws IllegalArgumentException if pluginId is not found
   * @throws UnsupportedOperationException if the program does not support plugin
   */
  <T> T newPluginInstance(String pluginId) throws InstantiationException;
}
package co.cask.cdap.api.plugin;
 
public final class Plugin {
  private final ArtifactId artifactId;
  private final PluginClass pluginClass;
  private PluginProperties properties;

  public Plugin(ArtifactId artifactId, PluginClass pluginClass, PluginProperties properties) {
    this.artifactId = artifactId;
    this.pluginClass = pluginClass;
    this.properties = properties;
  }

  /**
   * @return artifact id
   */
  public ArtifactId getArtifactId() {
    return artifactId;
  }

  /**
   * @return {@link PluginClass}
   */
  public PluginClass getPluginClass() {
    return pluginClass;
  }
  
  public void setPluginProperties(PluginProperties properties) {
    this.properties = properties;
  }

  /**
   * Returns the set of properties available when the plugin was created.
   */
  public PluginProperties getProperties() {
    return properties;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }

    Plugin that = (Plugin) o;
    return Objects.equals(artifactId, that.artifactId)
      && Objects.equals(pluginClass, that.pluginClass)
      && Objects.equals(properties, that.properties);
  }

  @Override
  public int hashCode() {
    return Objects.hash(artifactId, pluginClass, properties);
  }

  @Override
  public String toString() {
    return "Plugin{" +
      "artifactId=" + artifactId +
      ",pluginClass=" + pluginClass +
      ",properties=" + properties +
      '}';
  }
}

 

In initialize() of ETLMapReduce, PipelinePluginInstantiator instantiates a stage at runtime. To set substituted properties for a stage, we can call context.setPluginProperties() in ETLMapReduce.initialize() for MapReduceContext or ETLSpark.beforeSubmit() for SparkClientContext. DefaultPluginContext can set substituted properties for a stage and instantiate a stage.

 
PipelinePluginInstantiator pluginInstantiator = new PipelinePluginInstantiator(context, phaseSpec);
// substitute properties of a plugin and set it in the context
context.setPluginProperties(sourceName, properties);
BatchConfigurable<BatchSourceContext> batchSource = pluginInstantiator.newPluginInstance(sourceName);

Limitations:

- The runtime argument name should not contain '$'
- Rumtime argument name should be unique through out pipeline

 

Thoughts from Terence:

 

Below are the thoughts I have so far.
1. Preferences/runtime arguments substitution for configuration values
  - Can start with simple $var substitution
  - The DataPipeline app performs the substitution
  - The perferences can be scoped
    - Properties prefixed with the plugin name (stage name?) will be striped
    - Property in more specific scope will override the less specific one
     - e.g. If having both "password" => "a" and "plugin1.password" => "b" in perferences, then for Plugin "plugin1", it will see "password" => "b"
2. Secure store support
  - For managing passphase so that plugin config will only contains key name, but not the actual key
  - Plugins that need sensitive information need to be adjusted to use the key management
  - Potentially can have the DataPipeline app do the substitution as well
    - But we cannot use "$", since it's used above. Maybe can be "#".
      - E.g. for plugin config {"password" => "#dbpassword"}, then at runtime the actual password with name "dbpassword" will be fetched from the secure store.
      
3. Expression computation
  - Evaluate by the DataPipeline app at runtime when instantiating plugin
   - Evaluation result will be used as the plugin config value
  - JS expression
  - May need to expose some predefined variables (e.g. logicalStartTime)
  - Should limit to evaluation of config values
    - Per record expression evaulation would be too slow and easily misused. Shouldn't encourage.
  - For per record expression computation (e.g computing HBase row key), should encourage to use JS transform
    - For performance reason
    - Inserting an extra JS transform to augment record (e.g. add field, remove field, combine/recompute fields) should be easy.
      - May need better support of schema propagation (since this JS transform won't have a fix in/out schema)
      
4. Pre/Post custom action hook
  - Need to define the Hydrator API for that so that plugin can be written
  - Need to get CDAP-4648 resolved. Specifically I would like to replace WorkflowAction with Worker in Workflow.
  - UI needs to be adjust
  - To support compiled code, user can write custom plugin for custom action and uses it in Hydrator
  - When resolving plugin config values (point 1) for the plugins used in the execution engine (MR or Spark program), the resolution can combine both perferences and WorkflowToken, which the values in WorkflowToken has higher precedence.
  - We can have out of the box custom actions for common actions
    - email
    - make call to REST (with parameters resolution in point 1 and 3)
    - make call to CDAP service (this is slightly different than REST because it involves discovery)
      - May need to enable making cross namespace service call
    - JS custom action
      - Exposes the WorkflowToken for the JS to update
 
5. Fork-join and condition
  - Underlying Workflow only supports fork-join and condition at the node level
    - i.e. fork and runs two MR/Spark in parallel. Optionally execute a MR/Spark
  - How to support fork-join and condition at record level? We had some discussion between Andreas, Albert and me on how. Need to finalize the design.
  - Would involves modifying the JSON structure between UI and the app.

 

 

  • No labels