...

  1. A pipeline developer wants to create a pipeline that has several configuration settings that are not known at pipeline creation time, but that are set at the start of each pipeline run. For example, the time partition(s) that should be read by the source and the name of the dataset sink need to be set on a per-run basis. The arguments can be set either through CDAP runtime arguments/preferences, or by the pipeline itself. For example:
    1. A pipeline developer wants to create a pipeline that reads from a database source and writes to a Table sink. He wants to configure the name of the database table and the name of the Table sink on a per-run basis, and he gives those values as input before starting the run.
  2. A pipeline developer wants to create a pipeline with a custom action at the start of the run. The custom action performs some logic (for example, queries a dataset or makes an HTTP call) to look up which time partitions should be read and where data should be written for that run; based on that logic it provides the name of the database to use as the source and the name of the table to write to in the sink, and the next stages in the pipeline use this information to read from the appropriate database source and write to the Table sink. Alternatively, a user can manually specify the time partitions through CDAP runtime arguments/preferences and then start the run.

User Stories

  1. As a pipeline developer, I want to be able to configure a plugin property to some value that will get substituted for each run based on the runtime arguments.
  2. As a pipeline operator, I want to be able to set arguments for the entire pipeline that will be used for substitution.
  3. As a pipeline operator, I want to be able to set arguments for a specific stage in the pipeline that will be used for substitution.
  4. As a plugin developer, I want to be able to write code that is executed at the start of the pipeline run and sets arguments for the rest of the run (see the sketch below).
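
User story 4 could be served by a custom action that runs first and publishes per-run values for later stages. A minimal sketch, assuming the action can write to the workflow token; the class name and lookup logic are illustrative, not the actual Hydrator plugin API:

Code Block
import co.cask.cdap.api.workflow.AbstractWorkflowAction;
import co.cask.cdap.api.workflow.WorkflowToken;

// Illustrative only: an action that computes per-run values at the start of the
// run and publishes them for later stages to consume via ${wf-token(...)}.
public class ConfigLookupAction extends AbstractWorkflowAction {
  @Override
  public void run() {
    WorkflowToken token = getContext().getToken();
    // Hypothetical lookup: e.g. query a dataset or make an HTTP call.
    String tableName = lookupTableNameForRun();
    token.put("table-name", tableName); // later resolved by ${wf-token(table-name)}
  }

  private String lookupTableNameForRun() {
    return "employee_sql"; // placeholder for real lookup logic
  }
}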

...

Code Block
Expanded Syntax : 
${macro-type(macro)}
 
Shorthand notation:
${macro}
 
Example Usage: 
${runtime-argument(hostname)} - get hostname from runtime arguments
${wf-token(hostname)} - get hostname from workflow token
${secure(access_key)} - get access key from secure store
${function_time(time_format)} - apply the time function on the time_format provided and use the value
 
The default (shorthand) notation will read from the runtime arguments; the expanded notation gives the user the option of using more macro types.
Examples :
ipConfig : ${hostname}:${port}
JDBC connection string : jdbc:${jdbc-plugin}://${hostname}:${sql-port}/${db-name}
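
For illustration, the two notations could be recognized with a small parser along these lines (the regexes and class name are ours, not the actual implementation; the shorthand defaults to the runtime-argument type):

Code Block
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustrative sketch of recognizing the two notations above.
public class MacroNotation {
  // ${type(key)} - expanded notation
  private static final Pattern EXPANDED = Pattern.compile("\\$\\{([\\w-]+)\\(([^)]*)\\)\\}");
  // ${key} - shorthand notation
  private static final Pattern SHORTHAND = Pattern.compile("\\$\\{([\\w-]+)\\}");

  /** Returns {macro-type, macro-key} for a macro string. */
  static String[] parse(String macro) {
    Matcher m = EXPANDED.matcher(macro);
    if (m.matches()) {
      return new String[] { m.group(1), m.group(2) };
    }
    m = SHORTHAND.matcher(macro);
    if (m.matches()) {
      // shorthand defaults to runtime arguments
      return new String[] { "runtime-argument", m.group(1) };
    }
    throw new IllegalArgumentException("Not a macro: " + macro);
  }
}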
 

...

Code Block
titlePipeline Config
"stages": [
    {
        "name": "Database",
        "plugin": {
            "name": "Database",
            "type": "batchsource",
            "properties": {
				...
                "user": "${username}",
                "password": "${secure(sql-password)}",
                "jdbcPluginName": "jdbc",
                "jdbcPluginType": "${jdbc-type}",
                "connectionString": "jdbc:${jdbc-type}//${hostname}:${port}/${db-name}",
                "importQuery": "select * from ${table-name};"
            }
        }
    },
    {
        "name": "Table",
        "plugin": {
            "name": "Table",
            "type": "batchsink",                                        
            "properties": {
                "schema": "{\"type\":\"record\",\"name\":\"etlSchemaBody\",
                \"fields\":[{\"name\":\"name\",\"type\":\"string\"},
                {\"name\":\"age\",\"type\":\"int\"},{\"name\":\"emp_id\",\"type\":\"long\"}]}",
                "name": "${table-name}",
                "schema.row.field": "name"
            }
        }
    }
]
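
For the config above, a run could then be started with runtime arguments along these lines (values are illustrative; sql-password is resolved from the secure store rather than from runtime arguments):

Code Block
hostname = sql-db.example.com
port = 3306
jdbc-type = mysql
db-name = hr
username = admin
table-name = employees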



Scoping:

Since the macro-substitution is performed at the DataPipeline app level, it will be possible to scope at stage name level if the user desires that. 

In our example config of a JDBC source and a Table sink, the macro "${table-name}" is used in both stages. If the user wants to provide a different value for table-name in the Table sink, he can use scoping.

Code Block
Format : stage_name:key 
 
Example for Scoping:
 
Key : table-name, value : employees // non-scoped key and value
Key : Table:table-name, value : employee_sql // scoped key and value
 
 
// Priority is given to the scoped key (stage-name:macro) if it is present;
// otherwise the non-scoped key is used if present.
String substituteAndGet(String macro, String stageName) {
	String scopedKey = stageName + ":" + macro;
	if (macroContext.containsKey(scopedKey)) {
		return substituteMacro(macroContext.get(scopedKey));
	} else if (macroContext.containsKey(macro)) {
		return substituteMacro(macroContext.get(macro));
	} else {
		throw new MacroNotFoundException(String.format("Expected macro %s is not found", macro));
	}
}
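
With the runtime arguments from the example above (and stage names from the earlier pipeline config), the lookup would behave as follows:

Code Block
substituteAndGet("table-name", "Table");    // "employee_sql" - the scoped key wins
substituteAndGet("table-name", "Database"); // "employees" - falls back to the non-scoped key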

 

 


Hydrator Plugin Changes

Currently, when we deploy a pipeline, configurePipeline is called on each plugin. We perform a few validations in the configure stage, specifically for schema, script syntax, etc. In some plugins we also create the dataset if it doesn't already exist.

The dataset to write to can be macro-substituted, so we have to defer dataset creation to prepareRun rather than doing it at the configure stage.

Deferring dataset creation to prepareRun will require a new method on BatchSinkContext.

Code Block
@Beta
public interface BatchSinkContext extends BatchContext {
// new method
void createDataset(String datasetName, String typeName, DatasetProperties properties);
 
//existing methods
@Deprecated
void addOutput(String datasetName);
...
}

Currently, if a stream given in a stream source or a table given in a table source doesn't exist, we create a new stream/table. We want to disallow this, so this addition will only be in BatchSinkContext and not in BatchContext.

PluginConfigurer can be made not to extend DatasetConfigurer.
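
A sink could then defer dataset creation to prepareRun, after macro substitution has produced the real table name. A rough sketch against the proposed method (config and tableProperties are the plugin's own fields; Table is the CDAP Table dataset class):

Code Block
@Override
public void prepareRun(BatchSinkContext context) {
  // config.name may have been the literal "${table-name}" at deploy time;
  // by prepareRun it has been substituted with the real name for this run.
  context.createDataset(config.name, Table.class.getName(), tableProperties);
  // addOutput is the existing (deprecated) way of registering the output dataset.
  context.addOutput(config.name);
}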

 

However, certain fields are used to determine the schema in the plugin; those cannot be macro-substituted, as schema validation is essential at configure time, and we want to disallow macro usage for them.

One option for this is to use Annotations in plugins. 

The plugin can specify, using an annotation, whether a property field supports macros or not. This will require a CDAP platform change, as it is a new annotation.

 

Code Block
// Whether the config fields support macros can be specified at the config class level.
// If not specified, by default a field cannot be macro-substituted.
@Macro(enabled=true) 
public static class ValidatorConfig extends PluginConfig {
  @Description("Comma-separated list of validator plugins that are used in script")
  String validators;
  @Description(SCRIPT_DESCRIPTION)
  String validationScript;

  @Description("Lookup tables to use during transform. Currently supports KeyValueTable.")
  @Nullable
  String lookup;
}
Code Block
public class TableSinkConfig extends PluginConfig {
  @Name(Properties.Table.NAME)
  @Description("Name of the table. If the table does not already exist, one will be created.")
  @Macro(enabled=true) // The name of the table can be specified by a runtime macro
  private String name;

  @Name(Properties.Table.PROPERTY_SCHEMA)
  @Description("schema of the table as a JSON Object. If the table does not already exist, one will be " +
    "created with this schema, which will allow the table to be explored through Hive. If no schema is given, the " +
    "table created will not be explorable.")
  @Nullable
  // The schema should not be substituted by a runtime macro
  private String schemaStr;

  @Name(Properties.Table.PROPERTY_SCHEMA_ROW_FIELD)
  @Description("The name of the record field that should be used as the row key when writing to the table.")
  @Macro(enabled=true) // The name of the row field can also be specified by a runtime macro
  private String rowField;
}

CDAP Platform Changes:

 

Code Block
titleMacro Annotation
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.FIELD})
public @interface Macro {

  /**
   * Default status if macro is enabled.
   */
  boolean DEFAULT_STATUS = false;

  /**
   * Returns if macro is enabled. Default is 'false'.
   */
  boolean enabled() default DEFAULT_STATUS;
}
 
Code Block
/**
 * Contains information about a property used by a plugin.
 */
@Beta
public class PluginPropertyField {

  private final String name;
  private final String description;
  private final String type;
  private final boolean required;
  // returns true if this field can accept macro
  private final boolean macroEnabled;
  ...
}the table.")
  @Macro(enabled=true) // The name of the row field can also be specified by a runtime macro
  private String rowField;
}
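
At deploy time, the platform could populate macroEnabled by reflecting on the plugin's config fields. A minimal sketch (the helper name is ours; it assumes @Macro is retained at runtime and applicable to fields):

Code Block
import java.lang.reflect.Field;

// Sketch: derive PluginPropertyField.macroEnabled from the @Macro annotation
// on the corresponding PluginConfig field.
static boolean isMacroEnabled(Field configField) {
  Macro macro = configField.getAnnotation(Macro.class);
  return macro != null && macro.enabled();
}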


Implementation Details

The DataPipeline app instantiates a plugin (using the plugin context), then performs macro substitution on the plugin and uses the updated plugin with the macro-substituted configs.

Code Block
titleMacroContext
interface MacroContext {	
	/**
	 * Given the macro key, return the substituted value
     */ 
	String getValue(String macroKey);
}


Code Block
titleMacro Types
Based on the macro type, one of the MacroContext implementations below will be used to get the value for the macro.
 
DefaultMacroContext implements MacroContext {
	Map<String, String> runtimeArguments;
	String getValue(String macroKey) {
		return runtimeArguments.get(macroKey);
	}
}

SecureMacroContext implements MacroContext {
	SecureStore secureStore;
	String getValue(String macroKey) {
		return secureStore.get(macroKey);
	}
}

RuntimeFunctionMacro implements MacroContext {	
	TimeZone timeZone;
	long logicalStartTime;
	Function<String, String> timezoneFunction;
	String getValue(String arguments) {
		return timezoneFunction.apply(arguments);
	}
} 
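
Which implementation is used could be decided by the macro type parsed from the expanded notation, with the shorthand form falling back to runtime arguments. A possible dispatch sketch (the context field names are illustrative; a wf-token context would be handled analogously):

Code Block
String substitute(String macroType, String macroKey) {
	switch (macroType) {
		case "secure":
			return secureMacroContext.getValue(macroKey);      // secure store
		case "function_time":
			return runtimeFunctionMacro.getValue(macroKey);    // time function
		case "runtime-argument":
		default:
			return defaultMacroContext.getValue(macroKey);     // runtime arguments
	}
}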
 

Changes to Existing Plugins

Many plugins have fields (configurable properties) that are used in constructing or validating a schema at configure time. These fields need to have macros disabled so that configure-time validation still works. The following plugins and fields would be affected:

Plugin | Fields | Use | Conflict
BatchCassandraSource | schema | Parsed for correctness to create the schema. | Parsing a macro or a schema with a nested macro would fail.
CopybookSource | copybookContents | Copybook contents are converted to an InputStream and used to get external records, which are in turn used to add fields to the schema. | Schema would add the macro literal as a field.
DedupAggregator | uniqueFields, filterOperation | Both fields are used to validate the input schema created. | Macro literals do not exist as fields in the schema and will throw IllegalArgumentException.
DistinctAggregator | fields | Specifies the fields used to construct the output schema. | Will add macro literals as schema fields.*
GroupByAggregator | groupByFields, aggregates | Gets fields from the input schema and adds aggregates to the output fields list. | Macro literals do not exist in the input schema and are not valid fields for an output schema.
RowDenormalizerAggregator | keyField, nameField, valueField | Gets schemas by field names from the input schema. | Macro literals do not exist as fields in the input schema.
KVTableSink | keyField, valueField | Validates the presence and type of these fields in the input schema. | Macro literals will not exist in the input schema.
SnapshotFileBatchAvroSink | schema | Parses schema to add file properties. | Macro literals may prevent schema parsing or cause incorrect schema creation.
SnapshotFileBatchParquetSink | schema | Parses schema to add file properties. | Macro literals may prevent schema parsing or cause incorrect schema creation.
TableSink | schema, rowField | Validates output and input schemas if the properties are specified. | Macro literals will lead to failed validation of the schema and row field.
TimePartitionedFileSetDatasetAvroSink | schema | Parses schema to add file properties. | Parsing macro literals in the schema would fail.
TimePartitionedFileSetDatasetParquetSink | schema | Parses schema to add file properties. | Parsing macro literals in the schema would fail.
SnapshotFileBatchAvroSource | schema | Parses the schema property to set the output schema. | Macro literals can lead to invalid schema parsing or creation.
SnapshotFileBatchParquetSource | schema | Parses the schema property to set the output schema. | Macro literals can lead to invalid schema parsing or creation.
StreamBatchSource | schema, name, format | Stream is added and created through the name, and the schema is parsed to set the output schema. | Macro literals will lead to bad parsing of properties.
TableSource | schema | Schema parsed to set the output schema. | Macro literals will lead to failed or incorrect schema creation.
TimePartitionedFileSetDatasetAvroSource | schema | Schema parsed to set the output schema. | Macro literals will lead to failed or incorrect schema creation.
TimePartitionedFileSetDatasetParquetSource | schema | Schema parsed to set the output schema. | Macro literals will lead to failed or incorrect schema creation.
JavaScriptTransform | schema, script, lookup | Schema format is used to set the output schema. JavaScript and lookup properties are also parsed for correctness. | Macro literals can cause parsing to fail for schema creation, JavaScript compilation, or lookup parsing.
LogParserTransform | inputName | Gets a field from the input schema through the inputName property. | With a macro literal, the field will not exist in the input schema.
ProjectionTransform | fieldsToKeep, fieldsToDrop, fieldsToConvert, fieldsToRename | Properties are used to create the output schema. | Macro literals will lead to a failed or wrong output schema being created.
PythonEvaluator | schema | Schema parsed for correctness and set as the output schema. | Macro literals can lead to failed or bad schema creation.
ValidatorTransform | validators, validationScript | Validators property is used to set the validator plugins. The script property is also parsed for correctness. | Macro literals can lead to failed parsing or plugins not being set. Scripts cannot be validated without validators.
ElasticsearchSource | schema | Schema parsed for correctness and set as the output schema. | Macro literals can lead to failed or incorrect schema parsing/creation.
HBaseSink | rowField, schema | Parsed to validate the output and input schemas and set the output schema. | Macro literals can lead to failed or incorrect schema parsing/creation.
HBaseSource | schema | Parsed for correctness to set the output schema. | Macro literals can lead to failed or incorrect schema parsing/creation.
HiveBatchSource | schema | Parsed for correctness to set the output schema. | Macro literals can lead to failed or incorrect schema parsing/creation.
MongoDBBatchSource | schema | Parsed for correctness and validated to set the output schema. | Macro literals can lead to failed or incorrect schema parsing/creation.
NaiveBayesClassifier | predictionField | Configures and sets fields of the output schema, and is checked for existence in the input schema. | The output schema would be created wrongly with the macro literal as the prediction field, and the input schema check behavior is undefined.
Compressor | compressor, schema | Parsed for correctness and used to set the output schema. | Macro literals can lead to failed or incorrect schema parsing/creation.
CSVFormatter | schema | Parsed for correctness and used to set the output schema. | Macro literals can lead to failed or incorrect schema parsing/creation.
CSVParser | field | Validated against the input schema to check existence of the field. | Macro literals may not exist as fields in the input schema.
Decoder | decode, schema | Decode property is parsed and validated, then used to validate the input schema. Schema is parsed to set the output schema. | Macro literals can lead to failed or incorrect schema parsing/creation or incorrect validation of the input schema.
Decompressor | decompressor, schema | Decompressor property is parsed and validated, then used to validate the input schema. Schema is parsed to set the output schema. | Macro literals can lead to failed or incorrect schema parsing/creation or incorrect validation of the input schema.
Encoder | encode, schema | Encode property is parsed and validated, then used to validate the input schema. Schema is parsed to set the output schema. | Macro literals can lead to failed or incorrect schema parsing/creation or incorrect validation of the input schema.
JSONFormatter | schema | Parsed for correctness and used to set the output schema. | Macro literals can lead to failed or incorrect schema parsing/creation.
JSONParser | field, schema | Validates that the field property is present in the input schema. Parses the schema property to set the output schema. | Macro literal may not exist in the input schema and may lead to failed parsing or creation of the output schema.
StreamFormatter | schema | Parsed for correctness and used to set the output schema. | Macro literals can lead to failed or incorrect schema parsing/creation.

* May need verification


Other plugins have fields that are validated or processed at configure time but do not affect the schema. In these cases, the validation/processing can be moved to the prepareRun method (see the sketch after the table). The following plugins and fields would be affected:

Plugin | Fields | Use | Justification
StreamBatchSource | duration, delay | Parsed and validated for proper formatting. | The parsing/validation is not related to the schema's creation.
TimePartitionedFileSetSource | duration, delay | Parsed and validated for proper formatting. | The parsing/validation is not related to the schema's or dataset's creation.
ReferenceBatchSink | referenceName | Verifies that the reference name meets dataset ID constraints. | As dataset names can be macros, this supports the primary use case.
ReferenceBatchSource | referenceName | Verifies that the reference name meets dataset ID constraints. | As dataset names can be macros, this supports the primary use case.
FileBatchSource | timeTable | Creates a dataset from the time table property. | This is a primary use case for macros.
TimePartitionedFileSetSource | name, basePath | Name and basePath are used to create the dataset. | This is a primary use case for macros.
BatchWritableSink | name, type | Creates a dataset from the properties. | This is a primary use case for macros.
SnapshotFileBatchSink | name | Creates a dataset from the name field. | This is a primary use case for macros.
BatchReadableSource | name, type | Dataset is created from the name and type properties. | This is a primary use case for macros.
SnapshotFileBatchSource | all properties* | Creates a dataset from the properties. | This is a primary use case for macros.
TimePartitionedFileSetSink | all properties* | Creates a dataset from the properties. | This is a primary use case for macros.
DBSource | importQuery, boundingQuery, splitBy, numSplits | Connection settings are validated and the properties parsed for formatting. | The parsing/validation does not lead to the creation of any schema or dataset.
HDFSSink | timeSuffix | Parsed to validate proper formatting of the time suffix. | The parsing/validation does not lead to the creation of any schema or dataset.
KafkaProducer | async | Parsed to check proper formatting of a boolean. | The parsing/validation does not lead to the creation of any schema or dataset.
NaiveBayesClassifier | fieldToClassify | Checked that the input schema field is of type String. | The validation does not lead to the creation or alteration of any schema.
NaiveBayesTrainer | fieldToClassify, predictionField | Checked that the input schema fields are of type String and Double respectively. | The validation does not lead to the creation or alteration of any schema.
CloneRecord | copies | Validated against being 0 or over the max number of copies. | The validation does not lead to the creation of any schema or dataset.
CSVFormatter | format | Validated for proper formatting. | The validation does not lead to the creation of any schema or dataset.
CSVParser | format | Validated for proper formatting. | The validation does not lead to the creation of any schema or dataset.
Hasher | hash | Checked against valid hash formats. | The check does not lead to the validation or alteration of any schema.
JSONParser | mapping | Mappings are extracted and placed into a map with their expressions. | The extraction does not affect any schema creation or validation.
StreamFormatter | format | Checked against valid stream formats. | The check does not lead to the validation or alteration of any schema.
ValueMapper | mapping, defaults | Parsed after configuration is initialized and validated. | The check does not lead to the validation or alteration of any schema.

* May need verification
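
For these plugins, the configure-time logic can typically move into prepareRun unchanged, once macros have been substituted. An illustrative sketch against a batch source context (parseDuration is a hypothetical helper, not actual plugin code):

Code Block
@Override
public void prepareRun(BatchSourceContext context) throws Exception {
  // config.duration may have been the literal "${duration}" at configure time;
  // by prepareRun the macro has been substituted, so the check is meaningful.
  long durationMs = parseDuration(config.duration); // hypothetical helper
  if (durationMs <= 0) {
    throw new IllegalArgumentException("duration must be positive: " + config.duration);
  }
}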

----------------------

Setting Hydrator runtime arguments using CDAP runtime arguments/preferences

...