Goals
Checklist
- User stories documented (Albert/Vinisha)
- User stories reviewed (Nitin)
- Design documented (Shankar/Kashif)
- Design reviewed (Terence/Andreas)
- Feature merged ()
- Examples and guides ()
- Integration tests ()
- Documentation for feature ()
- Blog post
Use Cases
- A pipeline developer wants to create a pipeline that has several configuration settings that are not known at pipeline creation time, but that are set at the start of each pipeline run.
- A pipeline developer wants to create a pipeline that reads from a database source and writes to a Table sink. He wants to configure the name of the database table and the name of the table sink on a per-run basis, and he gives those values as input before starting the run.
- A pipeline developer wants to create a pipeline with a custom action at the start of the run. The custom action, based on some logic, provides the name of the database to use as the source and the name of the table to write to in the sink. The next stage in the pipeline uses this information to read from the appropriate database source and write to the table sink.
User Stories
- As a pipeline developer, I want to be able to configure a plugin property to some value that will be substituted for each run based on the runtime arguments.
- As a pipeline operator, I want to be able to set arguments for the entire pipeline that will be used for substitution.
- As a pipeline operator, I want to be able to set arguments for a specific stage in the pipeline that will be used for substitution.
- As a plugin developer, I want to be able to write code that is executed at the start of the pipeline and sets arguments for the rest of the run.
Design
...
Macros Syntax
Code Block |
---|
Function (expanded) syntax: ${macroFunction(macro)}
Property lookup (shorthand) syntax: ${macro}

Example usage:
${secure(accessKey)} - get the access key from the secure store
${logicalStartTime(timeFormat)} - apply the time function on the timeFormat provided and use the resulting value

The shorthand usage will substitute arguments using the following precedence: Custom Action Workflow-Token > Runtime Arguments > Stored Preferences

Examples:
ipConfig: ${hostname}:${port}
JDBC connection string: jdbc:${jdbc-plugin}://${hostname}:${sql-port}/${db-name} |
...
Using the expanded syntax allows additional logic to be applied to the macro arguments through a macro function. Escaping can be supported using the \ (backslash) character (e.g. \${hostname} will not be substituted).
Nested macros: if a macro contains another macro, for example ${secure(${user-name})}, we look up user-name in the properties first, then use the secure store to get the key/password for that user-name. This final key/password is used for that field.
The shorthand notation supports retrieval precedence to limit the exposure of underlying workflow-tokens and runtime-arguments to pipeline operators. The "functionTime" macro function uses the logical start time of a run to perform the substitution. This is an example of a macro function that is not just a key-value lookup but allows for extra logic to be performed before a value is returned. For now, the implementation will only support the following macro functions: runtime-arguments. Once the secure store API is available, it will also support secure store. In the future, we can see if we will allow developers to create custom macro functions (similar to functionTime(...)).
Notes:
- The current implementation for macro substitution supports recursive expansion. That is, if a macro such as ${address} expands to ${hostname}:${port}, then ${hostname} and ${port} will also be evaluated. However, this can lead to an infinite loop from circular macros, so we can add a maximum depth for expansion (see the sketch below).
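A minimal sketch of how such recursive expansion with a depth limit could look; the class name, regular expression, and depth limit below are illustrative and not part of the design.
Code Block |
---|
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Expands ${key} macros against a lookup map, re-scanning the result so that
 * nested macros (e.g. ${address} -> ${hostname}:${port}) are also expanded.
 * A maximum depth guards against circular references such as a=${b}, b=${a}.
 */
public class MacroExpander {
  private static final Pattern MACRO = Pattern.compile("\\$\\{([^${}]+)\\}");
  private static final int MAX_DEPTH = 10;   // assumed limit, not taken from the design

  private final Map<String, String> lookup;

  public MacroExpander(Map<String, String> lookup) {
    this.lookup = lookup;
  }

  public String expand(String value) {
    return expand(value, 0);
  }

  private String expand(String value, int depth) {
    if (depth > MAX_DEPTH) {
      throw new IllegalStateException("Circular or too deeply nested macro: " + value);
    }
    Matcher matcher = MACRO.matcher(value);
    StringBuffer result = new StringBuffer();
    boolean substituted = false;
    while (matcher.find()) {
      String key = matcher.group(1);
      String replacement = lookup.get(key);
      if (replacement == null) {
        throw new IllegalArgumentException("No value for macro: " + key);
      }
      matcher.appendReplacement(result, Matcher.quoteReplacement(replacement));
      substituted = true;
    }
    matcher.appendTail(result);
    // Re-scan the substituted string so macros produced by the substitution are expanded too.
    return substituted ? expand(result.toString(), depth + 1) : result.toString();
  }
} |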
Code Block | ||
---|---|---|
| ||
"stages": [ { "importQueryname": "select * from ${table-name};"Database", "plugin": { } } }"name": "Database", { "nametype": "Tablebatchsource", "pluginproperties": { ... "name": "Table", "type"user": "batchsink${username}", "password": "${secure(sql-password)}", "jdbcPluginName": "jdbc", "propertiesjdbcPluginType": "${jdbc-type}", "schemaconnectionString": "{\"type\":\"record\",\"name\":\"etlSchemaBody\jdbc:${jdbc-type}//${hostname}:${port}/${db-name}", \"fields\importQuery":[{\"name\":\"name\",\"type\":\"string\"}, "select * from ${table-name};" } {\"name\":\"age\",\"type\":\"int\"},{\"name\":\"emp_id\",\"type\":\"long\"}]}",} }, { "name": "${table-name}Table", "schema.row.field"plugin": "name"{ } "name": "Table", } } ] |
Scoping:
If the macro substitution is performed at the DataPipeline app level, it will be possible to scope at the stage-name level if the user desires that.
In our example config of a JDBC source to a Table sink, there is a common macro "${table-name}". If the user wants to provide a different name for the table-name in the Table sink, he can use scoping.
Code Block |
---|
Example for Scoping:
Provided runtime arguments:
Key : table-name, value : employees
Key : TableSink:table-name, value : employee_sql
table-name is the macro name that is used in both DBSource stage and TableSink stage.
If the user wants to provide a special value for the macro "table-name" to be used in TableSink, he prefixes the stage name to the macro name, separated by the delimiter (colon). |
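A small sketch of how the scoped lookup could resolve a macro for a stage, assuming the stage-prefixed key takes precedence over the plain key; the class and method names are illustrative.
Code Block |
---|
import java.util.HashMap;
import java.util.Map;

/**
 * Resolves a macro for a given stage: a stage-scoped key such as
 * "TableSink:table-name" takes precedence over the plain key "table-name".
 */
public class ScopedArgumentLookup {
  private final Map<String, String> arguments;

  public ScopedArgumentLookup(Map<String, String> arguments) {
    this.arguments = arguments;
  }

  public String resolve(String stageName, String macroName) {
    String scoped = arguments.get(stageName + ":" + macroName);  // scoped value wins
    if (scoped != null) {
      return scoped;
    }
    return arguments.get(macroName);                             // fall back to the shared value
  }

  public static void main(String[] args) {
    Map<String, String> runtimeArgs = new HashMap<>();
    runtimeArgs.put("table-name", "employees");
    runtimeArgs.put("TableSink:table-name", "employee_sql");

    ScopedArgumentLookup lookup = new ScopedArgumentLookup(runtimeArgs);
    System.out.println(lookup.resolve("DBSource", "table-name"));   // employees
    System.out.println(lookup.resolve("TableSink", "table-name"));  // employee_sql
  }
} |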
Hydrator Plugin Changes
Currently, when we deploy a pipeline, configurePipeline is called on each plugin. We perform a few validations in the configure stage, specifically for schema, syntax of scripts, etc. In some plugins we also create the dataset if it doesn't already exist.
The dataset to write to can be macro-substituted, so we have to defer dataset creation to prepareRun rather than doing it at the configure stage.
Deferring dataset creation to prepareRun will require adding a new method to BatchSinkContext.
Code Block |
---|
@Beta
public interface BatchSinkContext extends BatchContext {
// new method
void createDataset(String datasetName, String typeName, DatasetProperties properties);
//existing methods
@Deprecated
void addOutput(String datasetName);
...
} |
Currently, if a stream given in a stream source or a table given in a table source doesn't exist, we create a new stream/table. We want to disallow this, so this addition will only be in BatchSinkContext and not in BatchContext.
PluginConfigurer can be made not to extend DatasetConfigurer, as it can no longer create datasets.
However, there are certain fields which are used to determine the schema in the plugin; those cannot be macro-substituted, as schema validation is essential at configure time and we want to disallow macro usage for them.
There are two ways of handling this:
1) At Platform Level
2) At DataPipeline App level
Platform Level Substitution:
Plugins can use "@Macro" annotation to specify if a plugin field can be a macro and also provides a configure-value to use at configure time to instantiate the plugin.
when plugin instance is instantiated at configure time, macros cannot be substituted.
- If we want to keep the field with macro as is, then the field has to be always a string, this is limiting for plugin developers. as they have to do type casting themselves for using macro on fields with other types than String.
- By having a configure-value we can work-around that, so the plugin developer has to know that this value will be used at configure time. but this might seem unnecessary for the plugin developer as this configure-value isn't very useful except to avoid failure at configure time.
Code Block |
---|
public class TableSinkConfig extends PluginConfig {
  @Name(Properties.Table.NAME)
  @Description("Name of the table. If the table does not already exist, one will be created.")
  // The name of the table can be specified by a runtime macro; this field uses the
  // configure-value "test" at configure time to instantiate the plugin.
  @Macro(enabled=true, configValue="test")
  private String name;
  ...
} |
CDAP API Changes
Macro Annotation:
Plugins can use an "@Macro" annotation to specify if a plugin field can be a macro.
When a plugin instance is instantiated at configure time, macros cannot be substituted as the values to substitute have not been specified yet. By default, macros will be disabled for properties. This is to prevent new plugin developers from having to worry about undefined behavior if they did not consider or are not familiar with macros.
Code Block | ||
---|---|---|
| ||
/**
 * Annotation for a plugin field to specify if it can support macro
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
public @interface Macro {
} |
Code Block |
---|
/**
* Contains information about a property used by a plugin.
*/
@Beta
public class PluginPropertyField {
private final String name;
private final String description;
private final String type;
private final boolean required;
// returns true if this field can accept macro
private final boolean isMacro;
...
} |
Example Macro Usage in Plugin:
Code Block |
---|
public class TableSinkConfig extends PluginConfig {
  @Name(Properties.Table.NAME)
  @Description("Name of the table. If the table does not already exist, one will be created.")
  // The name of the table can be specified by a runtime macro; by default macros are disabled for fields.
  @Macro
  private String name;

  @Name(Properties.Table.PROPERTY_SCHEMA)
  @Description("schema of the table as a JSON Object. If the table does not already exist, one will be " +
    "created with this schema, which will allow the table to be explored through Hive. If no schema is given, the " +
    "table created will not be explorable.")
  @Nullable
  private String schemaStr;

  @Name(Properties.Table.PROPERTY_SCHEMA_ROW_FIELD)
  @Description("The name of the record field that should be used as the row key when writing to the table.")
  private String rowField;
} |
...
PluginConfig Changes:
Code Block | ||
---|---|---|
| ||
@Beta
public abstract class PluginConfig extends Config implements Serializable {
  /**
   * Returns the {@link PluginProperties}.
   */
  public final PluginProperties getProperties() {
    return properties;
  }

  /**
   * Returns false if the field is not annotated as a macro. If the field is annotated as a macro,
   * the field value is checked to see whether it is a macro, and true/false is returned based on that check.
   */
  public final boolean isMacro(String fieldName) {
    ...
  }
} |
Code Block |
---|
/**
* Contains information about a property used by a plugin.
*/
@Beta
public class PluginPropertyField {
private final String name;
private final String description;
private final String type;
private final boolean required;
// returns true if this field can accept macro
private final boolean macroEnabled;
...
} |
Notes
This will require a CDAP platform-level change, as it's a new annotation.
PluginInstantiator has to understand whether a macro is enabled and set fields appropriately.
During configure time, if a field is macro-enabled, PluginInstantiator will try to use configValue as the value for the field in order to instantiate the plugin.
Plugin developers should take care that configurePipeline doesn't use config fields that are macro-enabled.
During runtime, PluginInstantiator gets the config fields and the values to substitute, and can use that information to substitute the macros appropriately and return an instantiated plugin, as sketched below.
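For illustration, a rough sketch of how a PluginInstantiator-like component could set macro-enabled fields from already-substituted values via reflection; this is not the actual CDAP implementation, and all names below are hypothetical.
Code Block |
---|
import java.lang.reflect.Field;
import java.util.Map;

/**
 * Illustration only: sets the fields of a config object from a map of
 * already-substituted values, converting strings to the declared field type.
 */
public class FieldSubstituter {

  public static void substitute(Object config, Map<String, String> substitutedValues)
      throws IllegalAccessException {
    for (Field field : config.getClass().getDeclaredFields()) {
      String value = substitutedValues.get(field.getName());
      if (value == null) {
        continue;  // field had no macro or no substitution was provided
      }
      field.setAccessible(true);
      field.set(config, convert(value, field.getType()));
    }
  }

  // Convert the substituted string to the declared field type so that
  // non-String properties (ports, counts, flags) can also be macros.
  private static Object convert(String value, Class<?> type) {
    if (type == String.class) {
      return value;
    }
    if (type == int.class || type == Integer.class) {
      return Integer.parseInt(value);
    }
    if (type == long.class || type == Long.class) {
      return Long.parseLong(value);
    }
    if (type == boolean.class || type == Boolean.class) {
      return Boolean.parseBoolean(value);
    }
    throw new IllegalArgumentException("Unsupported field type: " + type);
  }
} |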
App Level Substitution:
One possibility is for substitution to be implemented at the app level. This would be ideal if we want to keep the concept of macros Hydrator-specific. If substitution were to occur at the app level, then the user would dictate which fields are macro-substitutable through the plugin configuration UI. In order to allow non-string properties to be substitutable, the user must provide a default value along with the macro through the UI. For example, if a user enters the "port" property as ${port}, the UI will provide a way for the user to enter a default port value. Creating a DB batch source would yield the following configuration JSON:
Code Block | ||
---|---|---|
| ||
"plugin": {
"name": "Database",
"type": "batchsource",
"properties": {
"user": "${username}",
"password": "${secure(sql-password)}",
"jdbcPluginName": "jdbc",
...
"importQuery": "select * from ${table-name};"
...
"macroDefaults": "{
\"user\": \"admin\",
\"password\": \"pw1234\",
\"importQuery\": \"select * from test;\"
}"
}
} |
In this case, the app understands from configuration the fields that are macros and the default values to use for those fields during configure time.
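For illustration, the app could parse the macroDefaults JSON into a map and use that map as the override values at configure time; this sketch assumes the Gson library is available and reuses the "macroDefaults" property name from the example above.
Code Block |
---|
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import java.lang.reflect.Type;
import java.util.Map;

/**
 * Illustration only: parses the "macroDefaults" JSON string from the plugin
 * properties into a map of field name to default value for configure time.
 */
public class MacroDefaultsParser {
  private static final Gson GSON = new Gson();
  private static final Type MAP_TYPE = new TypeToken<Map<String, String>>() { }.getType();

  public static Map<String, String> parse(String macroDefaultsJson) {
    return GSON.fromJson(macroDefaultsJson, MAP_TYPE);
  }
} |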
This would require a new method in PluginContext to accept key and value pairs to substitute for plugin properties.
Code Block |
---|
@Beta
public interface PluginContext {
// existing methods
PluginProperties getPluginProperties(String pluginId);
<T> Class<T> loadPluginClass(String pluginId);
<T> T newPluginInstance(String pluginId) throws InstantiationException;
/**
* Creates a new instance of a plugin. The instance returned will have the {@link PluginConfig} setup with
* {@link PluginProperties} provided at the time when the
* {@link PluginConfigurer#usePlugin(String, String, String, PluginProperties)} was called during the
* program configuration time. In addition the parameter pluginProperties can be used to override the existing
* plugin properties in config, with which the plugin instance will have substituted plugin properties.
*
* @param pluginId the unique identifier provided when declaring plugin usage in the program.
* @param <T> the class type of the plugin
* @param pluginProperties the properties to override existing plugin properties before instance creation.
* @return A new instance of the plugin being specified by the arguments
*
* @throws InstantiationException if failed to create a new instance
* @throws IllegalArgumentException if pluginId is not found
* @throws UnsupportedOperationException if the program does not support plugin
*/
<T> T newPluginInstance(String pluginId, Map<String, String> pluginProperties) throws InstantiationException;
} |
Configure time:
The app can call this new method with the macroDefaults values, so plugin instance creation will use the macro default values for those config fields.
Run time:
The app performs substitution for the properties with macros using the values from the runtime arguments (or workflow token) and calls the method with the field names and substitution values.
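A rough sketch of what the runtime substitution step could look like before calling newPluginInstance; the helper below is illustrative, not the actual DataPipeline code.
Code Block |
---|
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Illustration only: walks the raw plugin properties, replaces ${...} macros
 * with values from the runtime arguments (or workflow token), and returns the
 * substituted property map.
 */
public class RuntimeSubstitution {
  private static final Pattern MACRO = Pattern.compile("\\$\\{([^${}]+)\\}");

  public static Map<String, String> substitute(Map<String, String> rawProperties,
                                                Map<String, String> runtimeArguments) {
    Map<String, String> substituted = new HashMap<>();
    for (Map.Entry<String, String> entry : rawProperties.entrySet()) {
      Matcher matcher = MACRO.matcher(entry.getValue());
      StringBuffer value = new StringBuffer();
      while (matcher.find()) {
        String replacement = runtimeArguments.get(matcher.group(1));
        if (replacement == null) {
          throw new IllegalArgumentException("No runtime argument for macro: " + matcher.group(1));
        }
        matcher.appendReplacement(value, Matcher.quoteReplacement(replacement));
      }
      matcher.appendTail(value);
      substituted.put(entry.getKey(), value.toString());
    }
    return substituted;
  }
} |
The resulting map would then be passed to newPluginInstance(pluginId, substituted) from prepareRun.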
Custom Action Setting Config values:
Custom action can use workflow token to set values for field names.
Code Block |
---|
"plugin": {
"name": "Database",
"type": "batchsource",
"properties": {
"user": "${token(username)}",
"password": "${secure(sql-password)}",
"jdbcPluginName": "jdbc",
"importQuery": "select * from ${token(table-name)};"
}
} |
If the pipeline builder wants a workflow token set by a custom action to be used as the value for fields, then he uses the token macro type in those fields as above.
The context has access to the workflow token, and we should be able to use workflow tokens similarly to runtime arguments for substitution.
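For illustration, a custom action could record values in the workflow token roughly as follows; WorkflowToken here is a stand-in interface rather than the actual CDAP class, and the keys and values are hypothetical.
Code Block |
---|
/**
 * Illustration only: a pre-pipeline custom action computes values (here a table name)
 * and records them in the workflow token so that later stages can resolve
 * ${token(table-name)} against those entries.
 */
interface WorkflowToken {
  void put(String key, String value);
}

public class PrepareInputAction {
  public void run(WorkflowToken token) {
    // Custom logic decides which table the rest of the run should use.
    String tableName = System.getProperty("input.table", "employees");
    token.put("table-name", tableName);
    token.put("username", "etl_user");
  }
} |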
Documentation Changes (wip)
Regardless of where the substitution occurs, the guidelines for creating Hydrator plugins would have to change. For existing plugins, any validation in configurePipeline for properties that are macro-substitutable must be moved to prepareRun.
...
Code Block |
---|
public abstract class PluginConfig extends Config implements Serializable {
  ...
  public final boolean isMacro(String fieldName) {
    ...
  }
} |
The method will return whether or not the property with the provided fieldName contains a macro at configure time. We don't want to force deferring macros to runtime in the case that a field is macroable but actually has no macro provided in its configuration. This allows optional checking of properties at configure time for simple pipelines.
Code Block | ||
---|---|---|
| ||
private final TableSinkConfig sinkConfig;
@Override
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
if (!sinkConfig.isMacro("name")) {
// create dataset if the datasetName field is not a macro
pipelineConfigurer.createDataset(name, datasetType, DatasetProperties.builder().addAll(properties).build());
}
...
} |
Notes:
- To allow macro substitution of non-String properties, any properties configured with a macro will have a placeholder default value at configure time which is different from runtime.
- For primitive types, this would be Java's default value; for example, an integer field will have a value of 0. Objects would be null.
- This is fine because we are exposing a method (isMacro) to check whether a field is a macro or not, so plugin developers can check at configure time whether a field is a macro before performing the appropriate action using that field.
- During runtime, macro fields will be substituted and plugin instance will have fields with substituted values.
PluginContext Changes:
Code Block |
---|
@Beta
public interface PluginContext {
/**
* Creates a new instance of a plugin. The instance returned will have the {@link PluginConfig} setup with
* {@link PluginProperties} provided at the time when the
* {@link PluginConfigurer#usePlugin(String, String, String, PluginProperties)} was called during the
* program configuration time. If a plugin field has a macro,
* the parameter evaluator is used to evaluate the macro and the evaluated value is used for the plugin field.
*
* @param pluginId the unique identifier provided when declaring plugin usage in the program.
* @param evaluator the macro evaluator that's used to evaluate macro for plugin field if macro is supported on those fields.
* @param <T> the class type of the plugin
* @return A new instance of the plugin being specified by the arguments
*
* @throws InstantiationException if failed to create a new instance
* @throws IllegalArgumentException if pluginId is not found
* @throws UnsupportedOperationException if the program does not support plugin
*/
<T> T newPluginInstance(String pluginId, MacroEvaluator evaluator) throws InstantiationException;
} |
Code Block | ||
---|---|---|
| ||
@Beta
interface MacroEvaluator {
/**
* lookup the property and return the value corresponding to the property.
*/
String evaluate(String property);
/**
* Use the macro function and call the function with provided arguments,
* apply appropriate function or use properties in default case and return the evaluated value
*/
String evaluate(String macroFunction, String... arguments);
} |
- During configure time, in the configurePipeline method, if a field is macro-enabled, the property should not be validated, as the macro has not yet been provided a substitutable value.
- During runtime, PluginInstantiator has to understand whether macro is enabled for a field; it parses the macro and then uses the parameter `evaluator` to evaluate it.
- The MacroEvaluator is provided by the DataPipelineApp. Though the platform understands macros, MacroEvaluator enables the actual substitution logic to be provided by the application; a sketch of such an evaluator follows.
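For illustration, a MacroEvaluator that the DataPipeline app could supply might look roughly like this, backed by runtime arguments for plain lookups and by the logical start time for the logicalStartTime(...) function; the class name and the function handling are assumptions, and the secure(...) branch is omitted.
Code Block |
---|
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;

/**
 * Illustration only: evaluates plain macros against the runtime arguments and
 * the logicalStartTime(...) function against the run's logical start time.
 */
public class RuntimeArgumentMacroEvaluator implements MacroEvaluator {
  private final Map<String, String> runtimeArguments;
  private final long logicalStartTime;

  public RuntimeArgumentMacroEvaluator(Map<String, String> runtimeArguments, long logicalStartTime) {
    this.runtimeArguments = runtimeArguments;
    this.logicalStartTime = logicalStartTime;
  }

  @Override
  public String evaluate(String property) {
    String value = runtimeArguments.get(property);
    if (value == null) {
      throw new IllegalArgumentException("No runtime argument for macro: " + property);
    }
    return value;
  }

  @Override
  public String evaluate(String macroFunction, String... arguments) {
    if ("logicalStartTime".equals(macroFunction)) {
      // Format the run's logical start time using the pattern given in the macro.
      return new SimpleDateFormat(arguments[0]).format(new Date(logicalStartTime));
    }
    // Default: treat the single argument as a property lookup.
    return evaluate(arguments[0]);
  }
} |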
ETL API - BatchContext Changes
Given that a dataset could be created at configure time if no fields are provided macros, a check should be available for plugin developers to see whether the dataset already exists at runtime. We can do this by altering the runtime context object passed in to the prepareRun method. As the object extends BatchContext which extends DatasetContext, we can create a new method in BatchContext that checks for the existence of a dataset.
This method will return whether or not the dataset with the provided datasetName already exists and can be used in prepareRun:
If the dataset to write to is macro-substituted and a macro is used in the config, we have to defer dataset creation to prepareRun rather than doing it in the configure stage. Deferring dataset creation to prepareRun will require adding a new method to BatchContext.
Code Block |
---|
@Beta
public interface BatchContext extends DatasetContext, TransformContext {
/**
* create dataset identified by datasetName, typeName with properties.
*/
void createDataset(String datasetName, String typeName, DatasetProperties properties);
/**
* return true if dataset with datasetName exists
*/
boolean datasetExists(String datasetName);
...
} |
Code Block |
---|
@Override
public void prepareRun(BatchSinkContext context) {
if (!context.datasetExists(config.getName())) {
context.createDataset(config.getName(), ...);
}
// ...
} |
Notes
Currently, if a stream given in a stream source or a table given in a table source doesn't exist, we create a new stream/table. We want to allow table creation (since we want to create external datasets for sources) but disallow stream creation, so we are adding only createDataset to BatchContext.
However there are certain fields which are used to determine the schema in the plugin and those cannot be macro-substituted as schema validation is essential during configure time and we want to disallow macro usage for them.
Custom Action Setting Config Values:
One use case of the feature is to allow custom actions that run before a plugin to set macros. Custom actions can use workflow tokens to set values for field names. Since workflow tokens are merged with runtime arguments and exposed in the macro property lookup, macro substitution has access to tokens set from a custom action; workflow tokens have higher priority than runtime arguments and preferences (see the sketch below).
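A minimal sketch of how that precedence could be applied when building the lookup map used for substitution; the class and method names are illustrative.
Code Block |
---|
import java.util.HashMap;
import java.util.Map;

/**
 * Illustration only: builds the lookup map used for macro property substitution.
 * Later puts win, so workflow-token entries (set by a custom action) override
 * runtime arguments, which override stored preferences.
 */
public class SubstitutionArguments {
  public static Map<String, String> merge(Map<String, String> preferences,
                                           Map<String, String> runtimeArguments,
                                           Map<String, String> workflowTokenEntries) {
    Map<String, String> merged = new HashMap<>(preferences);
    merged.putAll(runtimeArguments);
    merged.putAll(workflowTokenEntries);
    return merged;
  }
} |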
Scoping (Low priority):
Scoping is currently low priority and can be done manually. In our example config from a JDBC source to a Table sink, there is a common macro "${table-name}". If the user wants to provide a different name for the table-name in the Table sink, he can manually do this:
Syntax | Macro | Evaluates To |
---|---|---|
${table-name} | table-name | employees |
${DBSource.table-name} | DBSource.table-name | employee_sql |
This is more of the user creating unique argument keys as opposed to scoping.
Documentation/Changes
Regardless of where the substitution occurs, the guidelines for creating Hydrator plugins would have to change. For existing plugins, any validation for properties that are macro-substitutable in configurePipeline must be moved to prepareRun (see reference section for specific plugins). We also must document the convention for nulling/defaulting macroable properties at configure time.
Implementation Details
The DataPipeline app instantiates a plugin (using the plugin context) and then performs macro substitution on the plugin and uses the updated plugin with macro-substituted configs.
Code Block | ||
---|---|---|
| ||
interface MacroContext {
/**
* Given the macro key, return the substituted value
*/
String getValue(String macroKey);
} |
Code Block | ||
---|---|---|
| ||
// Based on the macro type, one of the MacroContext implementations below is used to get the value for the macro.
class DefaultMacroContext implements MacroContext {
  Map<String, String> runtimeArguments;
  public String getValue(String macroKey) {
    return runtimeArguments.get(macroKey);
  }
}

class SecureMacroContext implements MacroContext {
  SecureStore secureStore;
  public String getValue(String macroKey) {
    return secureStore.get(macroKey);
  }
}

class RuntimeFunctionMacro implements MacroContext {
  TimeZone timeZone;
  long logicalStartTime;
  Function<String, String> timezoneFunction;
  public String getValue(String arguments) {
    return timezoneFunction.apply(arguments);
  }
} |
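For illustration, the dispatch on macro type could look roughly like this; the prefixes handled and the class name are assumptions based on the syntax described earlier.
Code Block |
---|
/**
 * Illustration only: picks a MacroContext based on the macro's prefix.
 * "${secure(key)}" goes to the secure-store context, "${logicalStartTime(fmt)}"
 * to the runtime-function context, and a plain "${key}" to the default
 * (runtime-argument) context.
 */
public class MacroContextDispatcher {
  private final MacroContext defaultContext;
  private final MacroContext secureContext;
  private final MacroContext runtimeFunctionContext;

  public MacroContextDispatcher(MacroContext defaultContext, MacroContext secureContext,
                                MacroContext runtimeFunctionContext) {
    this.defaultContext = defaultContext;
    this.secureContext = secureContext;
    this.runtimeFunctionContext = runtimeFunctionContext;
  }

  /** macroBody is the text inside ${...}, e.g. "secure(sql-password)" or "hostname". */
  public String resolve(String macroBody) {
    if (macroBody.startsWith("secure(") && macroBody.endsWith(")")) {
      return secureContext.getValue(macroBody.substring("secure(".length(), macroBody.length() - 1));
    }
    if (macroBody.startsWith("logicalStartTime(") && macroBody.endsWith(")")) {
      return runtimeFunctionContext.getValue(
        macroBody.substring("logicalStartTime(".length(), macroBody.length() - 1));
    }
    return defaultContext.getValue(macroBody);
  }
} |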
JIRA: CDAP-5642 (Cask Community Issue Tracker)
Reference
Many plugins have properties that are used in constructing or validating a schema at configure time. These fields need to have macros disabled to allow this. The following plugins and fields would be affected:
Plugin | Fields | Use | Conflict |
---|---|---|---|
BatchCassandraSource | schema | Parsed for correctness to create the schema. | Parsing a macro or schema with a nested macro would fail. |
CopybookSource | copybookContents | Copybook contents are converted to an InputStream and used to get external records, which are in turn used to add fields to the schema. | Schema would add macro literal as a field. |
DedupAggregator | uniqueFields, filterOperation | Both fields are used to validate the input schema created. | Macro literals do not exist as fields in schema and will throw IllegalArgumentException. |
DistinctAggregator | fields | Specifies the fields used to construct the output schema. | Will add macro literals as schema fields.* |
GroupByAggregator | groupByFields, aggregates | Gets fields from input schema and adds aggregates to output fields list. | Macro literals do not exist in the input schema, nor are they valid fields for an output schema. |
RowDenormalizerAggregator | keyField, nameField, valueField | Gets schemas by field names from the input schema. | Macro literals do not exist as fields in the input schema. |
KVTableSink | keyField, valueField | Validates the presence and type of these fields in the input schema. | Macro literals will not exist in the input schema. |
SnapshotFileBatchAvroSink | schema | Parses schema to add file properties. | Macro literals may disallow schema parsing or incorrect schema creation. |
SnapshotFileBatchParquetSink | schema | Parses schema to add file properties. | Macro literals may disallow schema parsing or incorrect schema creation. |
TableSink | schema, rowField | Validates output and input schemas if properties specified. | Macro literals will lead to failed validation of schema and row field. |
TimePartitionedFileSetDatasetAvroSink | schema | Parses schema to add file properties. | Parsing macro literals in schema would fail. |
TimePartitionedFileSetDatasetParquetSink | schema | Parses schema to add file properties. | Parsing macro literals in schema would fail. |
SnapshotFileBatchAvroSource | schema | Parses schema property to set output schema. | Macro literals can lead to invalid schema parsing or creation. |
SnapshotFileBatchParquetSource | schema | Parses schema property to set output schema. | Macro literals can lead to invalid schema parsing or creation. |
StreamBatchSource | schema, name, format | Stream is added and created through name, and schema is parsed to set the output schema. | Macro literals will lead to bad parsing of properties. |
TableSource | schema | Schema parsed to set output schema. | Macro literals will lead to failed or incorrect schema creation. |
TimePartitionedFileSetDatasetAvroSource | schema | Schema parsed to set output schema. | Macro literals will lead to failed or incorrect schema creation. |
TimePartitionedFileSetDatasetParquetSource | schema | Schema parsed to set output schema. | Macro literals will lead to failed or incorrect schema creation. |
JavaScriptTransform | schema, script, lookup | Schema format is used to set the output schema. JavaScript and lookup properties are also parsed for correctness. | Macro literals can cause parsing to fail for schema creation, JavaScript compilation, or lookup parsing. |
LogParserTransform | inputName | Gets field from input schema through inputName property. | With a macro literal, the field will not exist in the input schema. |
ProjectionTransform | fieldsToKeep, fieldsToDrop, fieldsToConvert, fieldsToRename | Properties are used to create output schema. | Macro literals will lead to a failed or wrong output schema being created. |
PythonEvaluator | schema | Schema parsed for correctness and set as output schema. | Macro literals can lead to failed or bad schema creation. |
ValidatorTransform | validators, validationScript | Validator property used to set validator plugins. Script property is also parsed for correctness. | Macro literals can lead to failed parsing or plugins being set. Scripts cannot be validated without validators. |
ElasticsearchSource | schema | Schema parsed for correctness and set as output schema. | Macro literals can lead to failed or incorrect schema parsing/creation. |
HBaseSink | rowField, schema | Parsed to validate the output and input schemas and set the output schema. | Macro literals can lead to failed or incorrect schema parsing/creation. |
HBaseSource | schema | Parsed for correctness to set output schema. | Macro literals can lead to failed or incorrect schema parsing/creation. |
HiveBatchSource | schema | Parsed for correctness to set output schema. | Macro literals can lead to failed or incorrect schema parsing/creation. |
MongoDBBatchSource | schema | Parsed for correctness and validated to set output schema. | Macro literals can lead to failed or incorrect schema parsing/creation. |
NaiveBayesClassifier | predictionField | Configures and sets fields of output schema and checked for existence in input schema. | Output schema would be created wrongly with macro literal as prediction field, and input schema check behavior is undefined. |
Compressor | compressor, schema | Parsed for correctness and used to set output schema. | Macro literals can lead to failed or incorrect schema parsing/creation. |
CSVFormatter | schema | Parsed for correctness and used to set output schema. | Macro literals can lead to failed or incorrect schema parsing/creation. |
CSVParser | field | Validated against input schema to check existence of field. | Macro literals may not exist as fields in the input schema. |
Decoder | decode, schema | Decode property is parsed and validated, then used to validate the input schema. Schema parsed to set output schema. | Macro literals can lead to failed or incorrect schema parsing/creation or incorrect validation of input schema. |
Decompressor | decompressor, schema | Decompressor property is parsed and validated, then used to validate the input schema. Schema parsed to set output schema. | Macro literals can lead to failed or incorrect schema parsing/creation or incorrect validation of input schema. |
Encoder | encode, schema | Encode property is parsed and validated, then used to validate the input schema. Schema parsed to set output schema. | Macro literals can lead to failed or incorrect schema parsing/creation or incorrect validation of input schema. |
JSONFormatter | schema | Parsed for correctness and used to set output schema. | Macro literals can lead to failed or incorrect schema parsing/creation. |
JSONParser | field, schema | Validates if field property is present in input schema. Parses schema property to set output schema. | Macro literal may not exist in input schema and may lead to failed parsing or creation of output schema. |
StreamFormatter | schema | Parsed for correctness and used to set output schema. | Macro literals can lead to failed or incorrect schema parsing/creation. |
* May need verification
Other plugins have fields that are validated/processed at configure time that do not affect the schema. In these cases, these can be moved to the prepare run method. The following plugins and fields would be affected:
Plugin | Fields | Use | Justification |
---|---|---|---|
StreamBatchSource | duration, delay | Parsed and validated for proper formatting. | The parsing/validation is not related to the schema's creation. |
TimePartitionedFileSetSource | duration, delay | Parsed and validated for proper formatting. | The parsing/validation is not related to the schema's or dataset's creation. |
ReferenceBatchSink | referenceName | Verifies reference name meets dataset ID constraints. | As dataset names can be macros, this supports the primary use case. |
ReferenceBatchSource | referenceName | Verifies that reference name meets dataset ID constraints. | As dataset names can be macros, this supports the primary use case. |
FileBatchSource | timeTable | Creates dataset from time table property. | This is a primary use case for macros. |
TimePartitionedFileSetSource | name, basePath | Name and basePath are used to create the dataset. | This is a primary use case for macros. |
BatchWritableSink | name, type | Creates dataset from properties. | This is a primary use case for macros. |
SnapshotFileBatchSink | name | Creates dataset from name field. | This is a primary use case for macros. |
BatchReadableSource | name, type | Dataset is created from name and type properties. | This is a primary use case for macros. |
SnapshotFileBatchSource | all properties* | Creates dataset from properties. | This is a primary use case for macros. |
TimePartitionedFileSetSink | all properties* | Creates dataset from properties. | This is a primary use case for macros. |
DBSource | importQuery, boundingQuery, splitBy, numSplits | Validate connection settings and parsed for formatting. | The parsing/validation does not lead to the creation of any schema or dataset. |
HDFSSink | timeSuffix | Parsed to validate proper formatting of time suffix. | The parsing/validation does not lead to the creation of any schema or dataset. |
KafkaProducer | async | Parsed to check proper formatting of boolean. | The parsing/validation does not lead to the creation of any schema or dataset. |
NaiveBayesClassifier | fieldToClassify | Checked if input schema field is of type String. | The validation does not lead to the creation or alteration of any schema. |
NaiveBayesTrainer | fieldToClassify, predictionField | Checked if input schema fields are of type String and Double respectively. | The validation does not lead to the creation or alteration of any schema. |
CloneRecord | copies | Validated against being 0 or over the max number of copies. | The validation does not lead to the creation of any schema or dataset. |
CSVFormatter | format | Validated for proper formatting. | The validation does not lead to the creation of any schema or dataset. |
CSVParser | format | Validated for proper formatting. | The validation does not lead to the creation of any schema or dataset. |
Hasher | hash | Checked against valid hash formats. | The check does not lead to the validation or alteration of any schema. |
JSONParser | mapping | Mappings extracted and placed into a map with their expressions. | The extraction does not affect any schema creation or validation. |
StreamFormatter | format | Checked against valid stream formats. | The check does not lead to the validation or alteration of any schema. |
ValueMapper | mapping, defaults | Parsed after configuration is initialized and validated. | The check does not lead to the validation or alteration of any schema. |
* May need verification
----------------------
Setting Hydrator runtime arguments using CDAP runtime arguments/preferences
CDAP preferences and runtime arguments will be used directly as Hydrator arguments.
1.) Runtime arguments can be passed to a Hydrator pipeline in two ways:
- Using pre-pipeline custom actions: Pre-pipeline custom actions can set runtime arguments. For example, before running the pipeline, a custom action can copy local files to HDFS and set the runtime argument for the batch source's input path. In order to do that, we can expose setPreferences() and getPreferences() programmatic APIs for setting runtime arguments. These arguments can be passed to the Hydrator app using the workflow token.
- Using the Hydrator UI: For each stage, runtime arguments can be passed from the Hydrator UI using the CDAP REST endpoints for the preferences/runtime arguments framework.
2.) The Hydrator app will substitute properties using macro substitution for each ETLStage. Plugins, like SFTP, which need secure substitution using key management can use the 'secure' prefix in the macro. Macro substitution should vary depending on the prefix of the arguments: in the case of a secure key, the macro can be '${secure(key)}'; in the case of a value to be substituted directly, the macro can be '${inputpath}' without any prefix.
Thoughts from Terence: