Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

2) At DataPipeline App level

 

Platform Level Substitution:

Plugins can use "@Macro" annotation to specify if a plugin field can be a macro and also provides a configure-value to use at configure time to instantiate the plugin.  

...

Code Block
public class TableSinkConfig extends PluginConfig {
  @Name(Properties.Table.NAME)
  @Description("Name of the table. If the table does not already exist, one will be created.")
  @Macro(enabled=true) // The name of the table can be specified by a runtime macro, by default macros are disabled for fields. this field uses the value "test" at config time. 
  @Macro(enabled=true, configValue="test") 
  private String name;

  @Name(Properties.Table.PROPERTY_SCHEMA)
  @Description("schema of the table as a JSON Object. If the table does not already exist, one will be " +
    "created with this schema, which will allow the table to be explored through Hive. If no schema is given, the " +
    "table created will not be explorable.")
  @Nullable
  private String schemaStr;

  @Name(Properties.Table.PROPERTY_SCHEMA_ROW_FIELD)
  @Description("The name of the record field that should be used as the row key when writing to the table.")
  @Macro(enabled=true) // The name of the row field can also be specified by a runtime macro
  private String rowField;
}

 

This will require a CDAP platform level change as its a new annotation. PluginInstantiator has to understand and set fields appropriately..

 

Code Block
titleMacro Annotation
private String rowField;
}

 

 

Code Block
titleMacro Annotation
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface Macro {

  /**
   * Default status if macro is enabled.
   */
  boolean DEFAULT_STATUS = false;

  /**
   * Returns if macro is enabled. Default is 'false'.
   */
  boolean enabled() default DEFAULT_STATUS;
}  
Code Block

  /**
   * Containsreturns information about the value to be used at config time
   */
  String configValue();
}
 
Code Block
/**
 * Contains information about a property used by a plugin.
 */
@Beta
public class PluginPropertyField {

  private final String name;
  private final String description;
  private final String type;
  private final boolean required;
  // returns true if this field can accept macro
  private final boolean macroEnabled;
  ...
}

 

Implementation Details

 DataPipeline app instantiates a plugin (using plugin context) and then performs macro substitution on the plugin and uses the updated Plugin with macro substituted configs.

Code Block
titleMacroContext
interface MacroContext {	
	/**
	 * Given the macro key, return the substituted value
     */ 
	String getValue(String macroKey);
}
Code Block
titleMacro Types
Based on the macro type, one of the below MacroContext's will be used to get the value for macro. 
 
DefaultMacroContext implements MacroContext {
	Map<String, String> runtimeArguments;
	String getValue(String macroKey) {
		return runtimeArguments.get(macroKey);
	}
}

SecureMacroContext implements MacroContext {
	SecureStore secureStore;
	String getValue(String macroKey) {
		return secureStore.get(macroKey);
	}
}

RuntimeFunctionMacro implements MacroContext {	
	TimeZone timeZone;
	long logicalStartTime;
	Function<String, String> timezoneFunction;
	String getValue(String arguments) {
		return timezoneFunction.apply(arguments);
	}
} 
 

----------------------

Setting Hydrator runtime arguments using CDAP runtime arguments/preferences

CDAP preferences and runtime arguments will be used directly as Hydrator arguments. 

1.) Runtime arguments can be passed to hydrator pipeline in 2 ways:

  1. Using Prepipeline-CustomActions:
    Prepipeline custom actions can set runtime arguments. For example, before running the pipeline, custom actions can copy local files to hdfs and set runtime arguments for input path for batchsource. In order to do that, we can expose setPreferences() and getPreferences() programmatic api for setting runtime arguments. These arguments can be passed to hydrator app using workflow token. 
  2. Using Hydrator UI:
    For each stage, runtime arguments can be passed from hydrator UI using cdap REST endpoints for preferences/runtime arguments framework. 

2.) Hydrator app will substitute properties using Macro substitution for each ETLStage. Now, plugins, like SFTP, which need secure substitution using key management can use 'secure' prefix in the macro. Macro substitution should vary depending on prefix of the arguments. In case of secure key, macro can be '${secure(key)}', in case of value directly to be substituted, macro can be '${inputpath}' without any prefix. 

 

 

Thoughts from Terence:

Below are the thoughts I have so far.
1. Preferences/runtime arguments substitution for configuration values
  - Can start with simple $var substitution
  - The DataPipeline app performs the substitution
  - The perferences can be scoped
    - Properties prefixed with the plugin name (stage name?) will be striped
    - Property in more specific scope will override the less specific one
     - e.g. If having both "password" => "a" and "plugin1.password" => "b" in perferences, then for Plugin "plugin1", it will see "password" => "b"
  - For managing passphase so that plugin config will only contains key name, but not the actual key
  - Plugins that need sensitive information need to be adjusted to use the key management
  - Potentially can have the DataPipeline app do the substitution as well
    - But we cannot use "$", since it's used above. Maybe can be "#".
      - E.g. for plugin config {"password" => "#dbpassword"}, then at runtime the actual password with name "dbpassword" will be fetched from the secure store.
----------------------------

App Level Substitution (wip)

One possibility is for substitution to be implemented at the app level. This would be ideal if we want to keep the concept of macros Hydrator-specific. If substitution were to occur at the app level, then the user would dictate which fields will be macro-substitutable through the plugin configuration UI. In order to allow non-string properties to be substitutable, the user must provide a default value along with the macro through the UI. For example, it a user enters the "port" property as: ${port}, the UI will provide a way for the user to enter a default port value. Creating a DB batch source would yield the following configuration JSON:

Code Block
titlePipeline Config
"plugin": {
	"name": "Database",
	"type": "batchsource",
	"properties": {
		"user": "${username}",
		"password": "${secure(sql-password)}",
		"jdbcPluginName": "jdbc",
		...
		"importQuery": "select * from ${table-name};"
		...
		"macroDefaults": {
			"user": "admin",
			"password": "pw1234",
			"importQuery": "DO 0"
		}
	}
}

 

Platform Level Substitution (wip)

Another possibility is for substitution to be implemented at the platform level. This would be ideal if we want to keep the concept of macros available across all CDAP applications.

 

Documentation Changes (wip)

 

Regardless of where the substitution occurs, the guidelines for creating Hydrator plugins would have to change. For existing plugins, any validation for properties that are macro-substitutable that exists in configurePipeline must be moved to prepareRun.

 

Reference

 

Many plugins have properties that are used in constructing or validating a schema at configure time. These fields need to have macros disabled to allow this. The following plugins and fields would be affected:

 

...

Copybook contents are converted to an InputStream and used to get external records, which are in turn used to add fields to the schema.

...

Notes

This will require a CDAP platform level change as its a new annotation.

PluginInstantiator has to understand if macro is enabled and set fields appropriately.. 

During configure time, if a field is macro enabled, PluginInstantiator would try to use configValue as the value for the field in order to instantiate the plugin.

Pipeline developer should take care that configurePipeline doesn't use the config fields which are macro enabled.  

During runtime, PipelineInsantiator would get config fields and values to substitute and can use that information to substitute macro appropriately and return an instantiated plugin.

 

App Level Substitution:

One possibility is for substitution to be implemented at the app level. This would be ideal if we want to keep the concept of macros Hydrator-specific. If substitution were to occur at the app level, then the user would dictate which fields will be macro-substitutable through the plugin configuration UI. In order to allow non-string properties to be substitutable, the user must provide a default value along with the macro through the UI. For example, it a user enters the "port" property as: ${port}, the UI will provide a way for the user to enter a default port value. Creating a DB batch source would yield the following configuration JSON:

Code Block
titlePipeline Config
"plugin": {
	"name": "Database",
	"type": "batchsource",
	"properties": {
		"user": "${username}",
		"password": "${secure(sql-password)}",
		"jdbcPluginName": "jdbc",
		...
		"importQuery": "select * from ${table-name};"
		...
		"macroDefaults": "{
			\"user\": \"admin\",
			\"password\": \"pw1234\",
			\"importQuery\": \"select * from test;\"
		}"
	}
}


In this case, the app understands from configuration the fields that are macros and the default values to use for those fields during configure time. 


This would require a new method in PluginContext to accept key and value pairs to substitute for plugin properties.

 

Code Block
@Beta
public interface PluginContext {
 
// existing methods
PluginProperties getPluginProperties(String pluginId);
<T> Class<T> loadPluginClass(String pluginId);
<T> T newPluginInstance(String pluginId) throws InstantiationException;
 
/**
 * Creates a new instance of a plugin. The instance returned will have the {@link PluginConfig} setup with
 * {@link PluginProperties} provided at the time when the
 * {@link PluginConfigurer#usePlugin(String, String, String, PluginProperties)} was called during the
 * program configuration time. In addition the parameter pluginProperties can be used to override the existing
 * plugin properties in config, with which the plugin instance will have substituted plugin properties.
 *
 * @param pluginId the unique identifier provide when declaring plugin usage in the program.
 * @param <T> the class type of the plugin
 * @param pluginProperties the properties to override existing plugin properties before instance creation.
 * @return A new instance of the plugin being specified by the arguments
 *
 * @throws InstantiationException if failed create a new instance
 * @throws IllegalArgumentException if pluginId is not found
 * @throws UnsupportedOperationException if the program does not support plugin
 */
<T> T newPluginInstance(String pluginId, Map<String, String> pluginProperties) throws InstantiationException;
 

 

Configure time:

The app can all this new method with macroDefault values, so plugin instance creation will use macro default values for those config fields.

Run time:

The app performs substitution for the properties with macros using the value from runtime arguments ( or workflow token) and calls the method with the field name's and substitution value's. 

Custom Action Setting Config values:

Custom action can use workflow token to set values for field names.  


Documentation Changes (wip)

 

Regardless of where the substitution occurs, the guidelines for creating Hydrator plugins would have to change. For existing plugins, any validation for properties that are macro-substitutable that exists in configurePipeline must be moved to prepareRun.



 

Reference

 

Many plugins have properties that are used in constructing or validating a schema at configure time. These fields need to have macros disabled to allow this. The following plugins and fields would be affected:

 

PluginFieldsUseConflict
BatchCassandraSourceschemaParsed for correctness to create the schema.Parsing a macro or schema with a nested macro would fail.
CopybookSourcecopybookContents

Copybook contents are converted to an InputStream and used to get external records, which are in turn used to add fields to the schema.

Schema would add macro literal as a field.
DedupAggregatoruniqueFields, filterOperationBoth fields are used to validate the input schema created.Macro literals do not exist as fields in schema and will throw IllegalArgumentException.
DistinctAggregatorfieldsSpecifies the fields used to construct the output schema.Will add macro literals as schema fields.*
GroupByAggregatorgroupByFields, aggregates,Gets fields from input schema and adds aggregates to to output fields list.Macro literals do not exist in input schema or are valid fields for an output schema.
RowDenormalizerAggregatorkeyField, nameField, valueFieldGets schemas by field names from the input schema.Macro literals do not exist as fields in the input schema.
KVTableSinkkeyField, valueFieldValidates that presence and type of these fields in the input schema.Macro literals will not exist in the input schema.
SnapshotFileBatchAvroSinkschemaParses schema to add file properties.Macro literals may disallow schema parsing or incorrect schema creation.
SnapshotFileBatchParquetSinkschemaParses schema to add file properties.Macro literals may disallow schema parsing or incorrect schema creation.
TableSinkschema, rowFieldValidates output and input schemas if properties specified.Macro literals will lead to failed validation of schema and row field.
TimePartitionedFileSetDatasetAvroSinkschemaParses schema to add file properties.Parsing macro literals in schema would fail.
TimePartitionedFileSetDatasetParquetSinkschemaParses schema to add file properties.Parsing macro literals in schema would fail.
SnapshotFileBatchAvroSourceschemaParses schema property to set output schema.Macro literals can lead to invalid schema parsing or creation.
SnapshotFileBatchParquetSourceschemaParses schema property to set output schema.Macro literals can lead to invalid schema parsing or creation.
StreamBatchSourceschema, name, formatStream is added and created through name and schema is parsed to set output schema.Macro literals will lead to bad parsing of properties.
TableSourceschemaSchema parsed to set output schema.Macro literals will lead to failed or incorrect schema creation.
TimePartitionedFileSetDatasetAvroSourceschemaSchema parsed to set output schema.Macro literals will lead to failed or incorrect schema creation.
TimePartitionedFileSetDatasetParquetSourceschemaSchema parsed to set output schema.Macro literals will lead to failed or incorrect schema creation.
JavaScriptTransformschema, script, lookupSchema format is used to set the output schema. JavaScript and lookup properties are also parsed for correctness.Macro literals can cause parsing to fail for schema creation, JavaScript compilation, or lookup parsing.
LogParserTransforminputNameGets field from input schema through inputName property.With a macro literal, the field will not exist in the input schema.
ProjectionTransformfieldsToKeep, fieldsToDrop, fieldsToConvert, fieldsToRenameProperties are used to create output schema.Macro literals will lied to a failed or wrong output schema being created.
PythonEvaluatorschemaSchema parsed for correctness and set as output schema.Macro literal can lead to failed or bad schema creation.
ValidatorTransformvalidators, validationScript,Validator property used to set validator plugins. Script property is also parsed for correctness.Macro literals can lead to failed parsing or plugins being set. Scripts can not be validated without validators.
ElasticsearchSourceschemaSchema parsed for correctness and set as output schema.Macro literal literals can lead to failed or bad incorrect schema parsing/creation.
ValidatorTransformHBaseSinkvalidators, validationScript,Validator property used to set validator plugins. Script property is also parsed for correctness.Macro literals can lead to failed parsing or plugins being set. Scripts can not be validated without validators.
ElasticsearchSourceschemaSchema parsed for correctness and set as output schema.Macro literals can lead to failed or incorrect schema parsing/creation.
HBaseSinkrowField, schemarowField, schemaParsed to valid the output and input schemas and set the ouput schema.Macro literals can lead to failed or incorrect schema parsing/creation.
HBaseSourceschemaParsed for correctness to set output schema.Macro literals can lead to failed or incorrect schema parsing/creation.
HiveBatchSourceschemaParsed for correctness to set ouput schema.Macro literals can lead to failed or incorrect schema parsing/creation.
MongoDBBatchSourceschemaParsed for correctness and validated to set output schema.Macro literals can lead to failed or incorrect schema parsing/creation.
NaiveBayesClassifierpredictionFieldConfigures and sets fields of output schema and checked for existence in input schema.Output schema would be created wrongly with macro literal as prediction field and input schema check behavior is undefined.
Compressorcompressor, schemaParsed for correctness and used to set output schema.Macro literals can lead to failed or incorrect schema parsing/creation.
CSVFormatterschemaParsed for correctness and used to set output schema.Macro literals can lead to failed or incorrect schema parsing/creation.
CSVParserfieldValidated against input schema to check existence of field.Macro literals may not exist as fields in the input schema.
Decoderdecode, schemaDecode property is parsed and validated then used to validate the input schema. Schema parsed to set output schema.Macro literals can lead to failed or incorrect schema parsing/creation or incorrect validation of input schema.
Decompressordecompressor, schemaDecompressor property is parsed and validated then used to validate the input schema. Schema parsed to set output schema.Macro literals can lead to failed or incorrect schema parsing/creation or incorrect validation of input schema.
Encoderencode, schemaEncode property is parsed and validated then used to validate the input schema. Schema parsed to set output schema.Macro literals can lead to failed or incorrect schema parsing/creation or incorrect validation of input schema.
JSONFormatterschemaParsed for correctness and used to set output schema.Macro literals can lead to failed or incorrect schema parsing/creation.
JSONParserfield, schemaValidates if field property is present in input schema. Parses schema property to set output schema.Macro literal may not exist in input schema and may lead to failed parsing or creation of output schema.
StreamFormatterschemaParsed for correctness and used to set output schema.Macro literals can lead to failed or incorrect schema parsing/creation.

 

May need verification

 


 

Other plugins have fields that are validated/processed at configure time that do not affect the schema. In these cases, these can be moved to the prepare run method. The following plugins and fields would be affected:

 

 
PluginFieldsUseJustification
StreamBatchSourceduration, delayParsed and validated for proper formatting.The parsing/validation is not related to the schema's creation.
TimePartitionedFileSetSourceduration, delayParsed and validated for proper formatting.The parsing/validation is not related to the schema's or dataset's creation.
ReferenceBatchSinkreferenceNameVerifies reference name meets dataset ID constraints.As dataset names can be macros, this supports the primary use case.
ReferenceBatchSourcereferenceNameVerifies that reference name meets dataset ID constraints.As dataset names can be macros, this supports the primary use case.
FileBatchSourcetimeTableCreates dataset from time table property.This is a primary use case for macros.
TimePartitionedFileSetSourcename, basePathName and basePath are used to create the dataset.This is a primary use case for macros.
BatchWritableSinkname, typeCreates dataset from properties.This is a primary use case for macros.
SnapshotFileBatchSinknameCreates dataset from name field.This is a primary use case for macros.
BatchReadableSourcename, typeDataset is created from name and type properties.This is a primary use case for macros.
SnapshotFileBatchSourceall properties*Creates dataset from properties.This is a primary use case for macros.
TimePartitionedFileSetSinkall properties*Creates dataset from properties.This is a primary use case for macros.
DBSourceimportQuery, boundingQuery, splitBy, numSplitsValidate connection settings and parsed for formatting.The parsing/validation does not lead to the creation of any schema or dataset.
HDFSSinktimeSuffixParsed to validate proper formatting of time suffix.The parsing/validation does not lead to the creation of any schema or dataset.
KafkaProducerasyncParsed to check proper formatting of boolean.The parsing/validation does not lead to the creation of any schema or dataset.
NaiveBayesClassifierfieldToClassifyChecked if input schema field is of type String.The validation does not lead to the creation or alteration of any schema.
NaiveBayesTrainerfieldToClassify, predictionFieldChecked if input schema fields are of type String and Double respectively.The validation does not lead to the creation or alteration of any schema.
CloneRecordcopiesValidated against being 0 or over the max number of copies.The validation does not lead to the creation of any schema or dataset.
CSVFormatterformatValidated for proper formatting.The validation does not lead to the creation of any schema or dataset.
CSVParserformatValidated for proper formatting.The validation does not lead to the creation of any schema or dataset.
HasherhashChecked against valid hash formats.The check does not lead to the validation or alteration of any schema.
JSONParsermappingMappings extracted and placed into a map with their expressions.The extraction does not affect any schema creation or validation.
StreamFormatterformatChecked against valid stream formats.The check does not lead to the validation or alteration of any schema.
ValueMappermapping, defaultsParsed after configuration is initialized and validated.The check does not lead to the validation or alteration of any schema.

 

May need verification

TimePartitionedFileSetSinkall properties*Creates dataset from properties.This is a primary use case for macros.
DBSourceimportQuery, boundingQuery, splitBy, numSplitsValidate connection settings and parsed for formatting.The parsing/validation does not lead to the creation of any schema or dataset.
HDFSSinktimeSuffixParsed to validate proper formatting of time suffix.The parsing/validation does not lead to the creation of any schema or dataset.
KafkaProducerasyncParsed to check proper formatting of boolean.The parsing/validation does not lead to the creation of any schema or dataset.
NaiveBayesClassifierfieldToClassifyChecked if input schema field is of type String.The validation does not lead to the creation or alteration of any schema.
NaiveBayesTrainerfieldToClassify, predictionFieldChecked if input schema fields are of type String and Double respectively.The validation does not lead to the creation or alteration of any schema.
CloneRecordcopiesValidated against being 0 or over the max number of copies.The validation does not lead to the creation of any schema or dataset.
CSVFormatterformatValidated for proper formatting.The validation does not lead to the creation of any schema or dataset.
CSVParserformatValidated for proper formatting.The validation does not lead to the creation of any schema or dataset.
HasherhashChecked against valid hash formats.The check does not lead to the validation or alteration of any schema.
JSONParsermappingMappings extracted and placed into a map with their expressions.The extraction does not affect any schema creation or validation.
StreamFormatterformatChecked against valid stream formats.The check does not lead to the validation or alteration of any schema.
ValueMappermapping, defaultsParsed after configuration is initialized and validated.The check does not lead to the validation or alteration of any schema.

 

May need verification

 

 

Implementation Details

 

 DataPipeline app instantiates a plugin (using plugin context) and then performs macro substitution on the plugin and uses the updated Plugin with macro substituted configs.

 

Code Block
titleMacroContext
interface MacroContext {	
	/**
	 * Given the macro key, return the substituted value
     */ 
	String getValue(String macroKey);
}

 


 

Code Block
titleMacro Types
Based on the macro type, one of the below MacroContext's will be used to get the value for macro. 
 
DefaultMacroContext implements MacroContext {
	Map<String, String> runtimeArguments;
	String getValue(String macroKey) {
		return runtimeArguments.get(macroKey);
	}
}

SecureMacroContext implements MacroContext {
	SecureStore secureStore;
	String getValue(String macroKey) {
		return secureStore.get(macroKey);
	}
}

RuntimeFunctionMacro implements MacroContext {	
	TimeZone timeZone;
	long logicalStartTime;
	Function<String, String> timezoneFunction;
	String getValue(String arguments) {
		return timezoneFunction.apply(arguments);
	}
} 
 

 

----------------------

 

Setting Hydrator runtime arguments using CDAP runtime arguments/preferences

 

CDAP preferences and runtime arguments will be used directly as Hydrator arguments. 

 

1.) Runtime arguments can be passed to hydrator pipeline in 2 ways:

 

  1. Using Prepipeline-CustomActions:
    Prepipeline custom actions can set runtime arguments. For example, before running the pipeline, custom actions can copy local files to hdfs and set runtime arguments for input path for batchsource. In order to do that, we can expose setPreferences() and getPreferences() programmatic api for setting runtime arguments. These arguments can be passed to hydrator app using workflow token. 
  2. Using Hydrator UI:
    For each stage, runtime arguments can be passed from hydrator UI using cdap REST endpoints for preferences/runtime arguments framework. 

 

2.) Hydrator app will substitute properties using Macro substitution for each ETLStage. Now, plugins, like SFTP, which need secure substitution using key management can use 'secure' prefix in the macro. Macro substitution should vary depending on prefix of the arguments. In case of secure key, macro can be '${secure(key)}', in case of value directly to be substituted, macro can be '${inputpath}' without any prefix. 

 

 

 

 

 

Thoughts from Terence:

 

Below are the thoughts I have so far.

 

1. Preferences/runtime arguments substitution for configuration values
  - Can start with simple $var substitution
  - The DataPipeline app performs the substitution
  - The perferences can be scoped
    - Properties prefixed with the plugin name (stage name?) will be striped
    - Property in more specific scope will override the less specific one
     - e.g. If having both "password" => "a" and "plugin1.password" => "b" in perferences, then for Plugin "plugin1", it will see "password" => "b"
  - For managing passphase so that plugin config will only contains key name, but not the actual key
  - Plugins that need sensitive information need to be adjusted to use the key management
  - Potentially can have the DataPipeline app do the substitution as well
    - But we cannot use "$", since it's used above. Maybe can be "#".
      - E.g. for plugin config {"password" => "#dbpassword"}, then at runtime the actual password with name "dbpassword" will be fetched from the secure store.

 

----------------------------