
Goals

Checklist

  • User stories documented (Albert/Vinisha) 
  • User stories reviewed (Nitin)
  • Design documented (Albert/Vinisha)
  • Design reviewed (Terence/Andreas)
  • Feature merged ()
  • Examples and guides ()
  • Integration tests () 
  • Documentation for feature ()
  • Blog post

Use Cases

  1. A pipeline developer wants to create a pipeline that has several configuration settings that are not known at pipeline creation time, but that are set at the start of each pipeline run.
    1. For example, a pipeline developer wants to create a pipeline that reads from a database source and writes to a Table sink. They want to configure the database table name and the Table sink name on a per-run basis, providing those values as input before starting each run.
  2. A pipeline developer wants to create a pipeline with a custom action at the start of the run. Based on some logic, the custom action provides the name of the database to use as the source and the name of the table to write to in the sink; the next stages in the pipeline use this information to read from the appropriate database source and write to the Table sink.

User Stories

  1. As a pipeline developer, I want to be able to configure a plugin property to a value that will get substituted for each run based on the runtime arguments.
  2. As a pipeline operator, I want to be able to set arguments for the entire pipeline that will be used for substitution.
  3. As a pipeline operator, I want to be able to set arguments for a specific stage in the pipeline that will be used for substitution.
  4. As a plugin developer, I want to be able to write code that is executed at the start of the pipeline and sets arguments for the rest of the run.

Design (WIP - don't review yet)

Macro Syntax


Expanded Syntax:
${macro-type(macro)}
 
Shorthand notation:
${macro}
 
Example usage:
${runtime-argument(hostname)} - get hostname from runtime arguments
${wf-token(hostname)} - get hostname from workflow token
${secure(access_key)} - get access key from secure store
${function_time(time_format)} - apply the time function to the time_format provided and use the resulting value
 
The default (shorthand) notation reads from runtime arguments; the expanded notation gives users the option of using other macro types.
Examples:
ipConfig: ${hostname}:${port}
JDBC connection string: jdbc:${jdbc-plugin}://${hostname}:${sql-port}/${db-name}
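The shorthand substitution described above can be sketched as a simple find-and-replace over a property value. This is an illustrative sketch, not the actual CDAP implementation; the class name and error handling are assumptions:

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Sketch: replace every shorthand ${name} in a property value with its
// value from the runtime arguments.
public class ShorthandSubstitutor {
  private static final Pattern MACRO = Pattern.compile("\\$\\{([^}]+)\\}");

  public static String substitute(String value, Map<String, String> runtimeArgs) {
    Matcher m = MACRO.matcher(value);
    StringBuffer sb = new StringBuffer();
    while (m.find()) {
      String replacement = runtimeArgs.get(m.group(1));
      if (replacement == null) {
        throw new IllegalArgumentException("No value for macro: " + m.group(1));
      }
      m.appendReplacement(sb, Matcher.quoteReplacement(replacement));
    }
    m.appendTail(sb);
    return sb.toString();
  }
}
```

With runtime arguments hostname=localhost and port=5432, the ipConfig example "${hostname}:${port}" would resolve to "localhost:5432". A real implementation would additionally dispatch on the macro type for the expanded notation.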
Pipeline Config
"stages": [
    {
        "name": "Database",
        "plugin": {
            "name": "Database",
            "type": "batchsource",
            "properties": {
				...
                "user": "${username}",
                "password": "${secure(sql-password)}",
                "jdbcPluginName": "jdbc",
                "jdbcPluginType": "${jdbc-type}",
                "connectionString": "jdbc:${jdbc-type}://${hostname}:${port}/${db-name}",
                "importQuery": "select * from ${table-name};"
            }
        }
    },
    {
        "name": "Table",
        "plugin": {
            "name": "Table",
            "type": "batchsink",                                        
            "properties": {
                "schema": "{\"type\":\"record\",\"name\":\"etlSchemaBody\",
                \"fields\":[{\"name\":\"name\",\"type\":\"string\"},
                {\"name\":\"age\",\"type\":\"int\"},{\"name\":\"emp_id\",\"type\":\"long\"}]}",
                "name": "${table-name}",
                "schema.row.field": "name"
            }
        }
    }
]

Scoping:

Since macro substitution is performed at the DataPipeline app level, it is possible to scope macros at the stage-name level if the user desires.

In our example config of a JDBC source and Table sink, the macro "${table-name}" is common to both stages. If the user wants to provide a different table-name value for the Table sink, they can use scoping.

Example for Scoping:

Provided runtime arguments:
 
Key : table-name, value : employees 
Key : TableSink:table-name, value : employee_sql 
 
table-name is the macro name used in both the DBSource and TableSink stages.
If the user wants to provide a specific value for the macro "table-name" in TableSink, they prefix the macro name with the stage name, separated by a colon delimiter.
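Assuming the colon-delimited stage prefix described above, the scoped lookup can be sketched as a two-step map access (the class and method names are illustrative):

```java
import java.util.Map;

// Sketch of stage-scoped argument lookup: a value keyed
// "<stage-name>:<macro-name>" wins over the unscoped "<macro-name>".
public class ScopedArguments {
  public static String lookup(Map<String, String> args, String stageName, String macroName) {
    // Prefer the stage-scoped value, if one was provided.
    String scoped = args.get(stageName + ":" + macroName);
    return scoped != null ? scoped : args.get(macroName);
  }
}
```

With the runtime arguments from the example, a lookup of "table-name" in the DBSource stage would return "employees", while the same lookup in the TableSink stage would return "employee_sql".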


Hydrator Plugin Changes

Currently, when we deploy a pipeline, configurePipeline is called on each plugin. We perform a few validations at configure time, specifically for schema, script syntax, etc. Some plugins also create a dataset if it doesn't already exist.

Since the dataset to write to can be macro-substituted, we have to defer dataset creation to prepareRun rather than doing it at configure time.

Deferring dataset creation to prepareRun requires adding a new method to BatchSinkContext.

@Beta
public interface BatchSinkContext extends BatchContext {
  // new method
  void createDataset(String datasetName, String typeName, DatasetProperties properties);

  // existing methods
  @Deprecated
  void addOutput(String datasetName);
  ...
}

Currently, if the stream given in a stream source or the table given in a table source doesn't exist, we create a new stream/table. We want to disallow this, so this addition will only be made to BatchSinkContext and not to BatchContext.

PluginConfigurer can be changed to no longer extend DatasetConfigurer.

 

However, there are certain fields that are used to determine the schema in the plugin; these cannot be macro-substituted, since schema validation is essential at configure time, so we want to disallow macro usage for them.

One option is to use annotations in plugins.

A plugin can use an annotation to specify whether a property field supports macros. This requires a CDAP platform change, as it is a new annotation.

 

// Whether config fields support macro substitution can be specified at the config class level.
// If not specified, by default a field cannot be macro-substituted.
@Macro(enabled=true) 
public static class ValidatorConfig extends PluginConfig {
  @Description("Comma-separated list of validator plugins that are used in script")
  String validators;
  @Description(SCRIPT_DESCRIPTION)
  String validationScript;

  @Description("Lookup tables to use during transform. Currently supports KeyValueTable.")
  @Nullable
  String lookup;
}
public class TableSinkConfig extends PluginConfig {
  @Name(Properties.Table.NAME)
  @Description("Name of the table. If the table does not already exist, one will be created.")
  @Macro(enabled=true) // The name of the table can be specified by a runtime macro
  private String name;

  @Name(Properties.Table.PROPERTY_SCHEMA)
  @Description("schema of the table as a JSON Object. If the table does not already exist, one will be " +
    "created with this schema, which will allow the table to be explored through Hive. If no schema is given, the " +
    "table created will not be explorable.")
  @Nullable
  // The schema should not be substituted by a runtime macro
  private String schemaStr;

  @Name(Properties.Table.PROPERTY_SCHEMA_ROW_FIELD)
  @Description("The name of the record field that should be used as the row key when writing to the table.")
  @Macro(enabled=true) // The name of the row field can also be specified by a runtime macro
  private String rowField;
}

CDAP Platform Changes:

 

Macro Annotation
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD, ElementType.TYPE})
public @interface Macro {

  /**
   * Default status if macro is enabled.
   */
  boolean DEFAULT_STATUS = false;

  /**
   * Returns if macro is enabled. Default is 'false'.
   */
  boolean enabled() default DEFAULT_STATUS;
}
 
/**
 * Contains information about a property used by a plugin.
 */
@Beta
public class PluginPropertyField {

  private final String name;
  private final String description;
  private final String type;
  private final boolean required;
  // returns true if this field can accept macro
  private final boolean macroEnabled;
  ...
}
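One way the platform could derive macroEnabled for each PluginPropertyField is by reflecting on the @Macro annotation of the config fields. The sketch below is self-contained (it redeclares a local Macro annotation and a sample config so the example runs on its own) and is not the actual platform code:

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;

public class MacroInspector {
  // Local stand-in for the proposed CDAP @Macro annotation.
  @Retention(RetentionPolicy.RUNTIME)
  @Target({ElementType.FIELD, ElementType.TYPE})
  public @interface Macro {
    boolean enabled() default false;
  }

  // Sample config mirroring TableSinkConfig from the proposal.
  static class SampleConfig {
    @Macro(enabled = true)
    String name;      // macro-substitutable
    String schemaStr; // not substitutable: no annotation
  }

  // Returns true if the named field can accept a macro, i.e. it carries
  // @Macro(enabled=true).
  public static boolean isMacroEnabled(Class<?> configClass, String fieldName) {
    try {
      Field field = configClass.getDeclaredField(fieldName);
      Macro macro = field.getAnnotation(Macro.class);
      return macro != null && macro.enabled();
    } catch (NoSuchFieldException e) {
      return false;
    }
  }
}
```

The platform would run a check like this once at plugin inspection time and store the result in PluginPropertyField's macroEnabled flag.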

 

Implementation Details

The DataPipeline app instantiates a plugin (using the plugin context), performs macro substitution on the plugin config, and then uses the updated plugin with the macro-substituted configs.

MacroContext
interface MacroContext {
  /**
   * Given the macro key, return the substituted value.
   */
  String getValue(String macroKey);
}


Macro Types
Based on the macro type, one of the MacroContext implementations below will be used to get the value for the macro.
 
class DefaultMacroContext implements MacroContext {
  Map<String, String> runtimeArguments;
  String getValue(String macroKey) {
    return runtimeArguments.get(macroKey);
  }
}

class SecureMacroContext implements MacroContext {
  SecureStore secureStore;
  String getValue(String macroKey) {
    return secureStore.get(macroKey);
  }
}

class RuntimeFunctionMacro implements MacroContext {
  TimeZone timeZone;
  long logicalStartTime;
  Function<String, String> timezoneFunction;
  String getValue(String arguments) {
    return timezoneFunction.apply(arguments);
  }
}
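The choice between these contexts could be sketched as a dispatch on the parsed macro type. The sketch below is self-contained and illustrative: it declares a toy MacroContext mirroring the interface above and registers only the runtime-argument context, with the other types left as comments:

```java
import java.util.HashMap;
import java.util.Map;

public class MacroDispatcher {
  // Toy stand-in for the MacroContext interface above.
  interface MacroContext {
    String getValue(String macroKey);
  }

  private final Map<String, MacroContext> contexts = new HashMap<>();

  public MacroDispatcher(Map<String, String> runtimeArgs) {
    // Shorthand macros and ${runtime-argument(...)} resolve against
    // the runtime arguments.
    contexts.put("runtime-argument", runtimeArgs::get);
    // "secure", "wf-token", and "function_time" contexts would be
    // registered here in the same way.
  }

  public String substitute(String macroType, String macroKey) {
    MacroContext ctx = contexts.get(macroType);
    if (ctx == null) {
      throw new IllegalArgumentException("Unknown macro type: " + macroType);
    }
    return ctx.getValue(macroKey);
  }
}
```

For example, substitute("runtime-argument", "hostname") would return whatever value "hostname" holds in the runtime arguments.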
 

----------------------

Setting Hydrator runtime arguments using CDAP runtime arguments/preferences

CDAP preferences and runtime arguments will be used directly as Hydrator arguments. 

1.) Runtime arguments can be passed to a Hydrator pipeline in two ways:

  1. Using pre-pipeline custom actions:
    Pre-pipeline custom actions can set runtime arguments. For example, before running the pipeline, custom actions can copy local files to HDFS and set the runtime argument for the batch source's input path. To support this, we can expose setPreferences() and getPreferences() programmatic APIs for setting runtime arguments. These arguments can be passed to the Hydrator app using the workflow token.
  2. Using the Hydrator UI:
    For each stage, runtime arguments can be passed from the Hydrator UI using the CDAP REST endpoints for the preferences/runtime arguments framework.

2.) The Hydrator app will substitute properties using macro substitution for each ETLStage. Plugins like SFTP, which need secure substitution using key management, can use the 'secure' prefix in the macro. Macro substitution should vary depending on the prefix of the argument: for a secure key, the macro is '${secure(key)}'; for a value to be substituted directly, the macro is '${inputpath}', without any prefix.

 

 

Thoughts from Terence:

Below are the thoughts I have so far.
1. Preferences/runtime arguments substitution for configuration values
  - Can start with simple $var substitution
  - The DataPipeline app performs the substitution
  - The preferences can be scoped
    - Properties prefixed with the plugin name (stage name?) will be stripped
    - A property in a more specific scope will override the less specific one
      - e.g. if preferences contain both "password" => "a" and "plugin1.password" => "b", then Plugin "plugin1" will see "password" => "b"
  - For managing passphrases, the plugin config should only contain the key name, not the actual key
  - Plugins that need sensitive information need to be adjusted to use the key management
  - Potentially the DataPipeline app can do this substitution as well
    - But we cannot use "$", since it's used above. Maybe it can be "#".
      - E.g. for plugin config {"password" => "#dbpassword"}, at runtime the actual password named "dbpassword" will be fetched from the secure store.
----------------------------

App Level Substitution

One possibility is for substitution to be implemented at the app level. This would be ideal if we want to keep the concept of macros Hydrator-specific. If substitution were to occur at the app level, the user would dictate which fields are macro-substitutable through the plugin configuration UI. To allow non-string properties to be substitutable, the user must provide a default value along with the macro through the UI. For example, if a user enters the "port" property as ${port}, the UI will provide a way for the user to enter a default port value. Creating a DB batch source would yield the following configuration JSON:

Pipeline Config
"plugin": {
	"name": "Database",
	"type": "batchsource",
	"properties": {
		"user": "${username}",
		"password": "${secure(sql-password)}",
		"jdbcPluginName": "jdbc",
		...
		"importQuery": "select * from ${table-name};"
		...
		"macroDefaults": {
			"user": "admin",
			"password": "pw1234",
			"importQuery": "DO 0"
		}
	}
}
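Resolution against "macroDefaults" could then be sketched as a simple fallback lookup: use the runtime argument if one was supplied, otherwise fall back to the default captured at configure time. The class and method names below are illustrative:

```java
import java.util.Map;

// Sketch: resolve a macro-substitutable property against the runtime
// arguments, falling back to the "macroDefaults" entry from the config.
public class MacroDefaults {
  public static String resolve(String macroName,
                               Map<String, String> runtimeArgs,
                               Map<String, String> macroDefaults) {
    String value = runtimeArgs.get(macroName);
    return value != null ? value : macroDefaults.get(macroName);
  }
}
```

With the JSON above, a run that supplies no "user" argument would fall back to the default "admin", while a supplied value would override it.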



 

Platform Level Substitution

Another possibility is for substitution to be implemented at the platform level. This would be ideal if we want to keep the concept of macros available across all CDAP applications.

 

 
