Old Design details
moving old design documentation here
Scoping:
Hydrator arguments can also be scoped (similar to CDAP workflows). If an argument is prefixed with the stage name, the Hydrator app will strip the stage name and add that as an additional argument local to that stage. For example the pipeline has 2 arguments:
Argument name | Argument value |
---|---|
customers.inputpath | /data/customers/2016-01-01 |
items.inputpath | /data/items/2016-01-01 |
The 'customers' stage will see 3 arguments:
Argument name | Argument value |
---|---|
customers.inputpath | /data/customers/2016-01-01 |
items.inputpath | /data/items/2016-01-01 |
inputpath | /data/customers/2016-01-01 |
Similarly, the 'items' stage will see 3 arguments:
Argument name | Argument value |
---|---|
customers.inputpath | /data/customers/2016-01-01 |
items.inputpath | /data/items/2016-01-01 |
inputpath | /data/items/2016-01-01 |
If there is a name conflict between a scoped argument and a global argument, the scoped argument will overwrite the global for that scope. For example if the pipeline has 2 arguments:
Argument name | Argument value |
---|---|
customers.inputpath | /data/customers/2016-01-01 |
inputpath | /data/items/2016-01-01 |
The customers stage will see 2 arguments:
Argument name | Argument value |
---|---|
customers.inputpath | /data/customers/2016-01-01 |
inputpath | /data/customers/2016-01-01 |
Â
The items stage will see 2 arguments:
Â
Argument name | Argument value |
---|---|
customers.inputpath | /data/customers/2016-01-01 |
inputpath | /data/items/2016-01-01 |
For secure keys, the scoped macro would be: '$customers.secure.key'
Precedence:
Workflow token > scoped arguments > pipeline level (global) argumentsÂ
3.) Set substituted properties for a stage. At runtime for a stage, we will need to add setPluginProperties() in PluginContext and Plugin.java as shown below.
@Beta public interface PluginContext { /** * Gets the {@link PluginProperties} associated with the given plugin id. * * @param pluginId the unique identifier provide when declaring plugin usage in the program. * @return the {@link PluginProperties}. * @throws IllegalArgumentException if pluginId is not found * @throws UnsupportedOperationException if the program does not support plugin */ /** * Creates a new instance of a plugin. The instance returned will have the {@link PluginConfig} setup with * {@link PluginProperties} provided at the time when the * {@link PluginConfigurer#usePlugin(String, String, String, PluginProperties)} was called during the * program configuration time. * * @param pluginId the unique identifier provide when declaring plugin usage in the program. * @param <T> the class type of the plugin * @return A new instance of the plugin being specified by the arguments * * @throws InstantiationException if failed create a new instance * @throws IllegalArgumentException if pluginId is not found * @throws UnsupportedOperationException if the program does not support plugin */ <T> T newPluginInstance(String pluginId) throws InstantiationException; Â /** * Creates a new instance of a plugin using properties provided. The instance returned will have the {@link PluginConfig} setup with * {@link PluginProperties} provided at the time when the * {@link PluginConfigurer#usePlugin(String, String, String, PluginProperties)} was called during the * program configuration time. * * @param pluginId the unique identifier provide when declaring plugin usage in the program. * @param properties the properties needs to be used to create plugin * @param <T> the class type of the plugin * @return A new instance of the plugin being specified by the arguments * * @throws InstantiationException if failed create a new instance * @throws IllegalArgumentException if pluginId is not found * @throws UnsupportedOperationException if the program does not support plugin */ <T> T newPluginInstance(String pluginId, Map<String,String> properties) throws InstantiationException; }
package co.cask.cdap.api.plugin; Â public final class Plugin { private final ArtifactId artifactId; private final PluginClass pluginClass; private PluginProperties properties; public Plugin(ArtifactId artifactId, PluginClass pluginClass, PluginProperties properties) { this.artifactId = artifactId; this.pluginClass = pluginClass; this.properties = properties; } /** * @return artifact id */ public ArtifactId getArtifactId() { return artifactId; } /** * @return {@link PluginClass} */ public PluginClass getPluginClass() { return pluginClass; } Â /** * @param properties properties of the plugin */ public void setPluginProperties(PluginProperties properties) { this.properties = properties; } /** * Returns the set of properties available when the plugin was created. */ public PluginProperties getProperties() { return properties; } ..... }
Â
In initialize() of ETLMapReduce, PipelinePluginInstantiator instantiates a stage at runtime. We can substitute the properties with actual runtime arguments in ETLMapReduce.initialize() for MapReduceContext or ETLSpark.beforeSubmit() for SparkClientContext. DefaultPluginContext can use substituted properties for a stage and instantiate a stage.
PipelinePluginInstantiator pluginInstantiator = new PipelinePluginInstantiator(context, phaseSpec); // substitute properties of a plugin and set it in the context BatchConfigurable<BatchSourceContext> batchSource = pluginInstantiator.newPluginInstance(sourceName, substitutedProperties);
Â
Constraints:
- The runtime argument name should not contain '$'
- This design is only applicable for batch and not for realtime because current implementation for realtime does not have a way for passing tokens which is required for preaction/custom actions in a pipeline to pass runtime arguments.Â
Assumptions:
- The hydrator app (SparkClientContext, MapReduceContext) will have access to secure store manager to substitute values from key store.
Â
Previous Details/Design Notes:
App Level Substitution:
One possibility is for substitution to be implemented at the app level. This would be ideal if we want to keep the concept of macros Hydrator-specific. If substitution were to occur at the app level, then the user would dictate which fields will be macro-substitutable through the plugin configuration UI. In order to allow non-string properties to be substitutable, the user must provide a default value along with the macro through the UI. For example, it a user enters the "port" property as: ${port}, the UI will provide a way for the user to enter a default port value. Creating a DB batch source would yield the following configuration JSON:
"plugin": { "name": "Database", "type": "batchsource", "properties": { "user": "${username}", "password": "${secure(sql-password)}", "jdbcPluginName": "jdbc", ... "importQuery": "select * from ${table-name};" ... "macroDefaults": "{ \"user\": \"admin\", \"password\": \"pw1234\", \"importQuery\": \"select * from test;\" }" } }
In this case, the app understands from configuration the fields that are macros and the default values to use for those fields during configure time.Â
This would require a new method in PluginContext to accept key and value pairs to substitute for plugin properties.
Â
@Beta public interface PluginContext { Â // existing methods PluginProperties getPluginProperties(String pluginId); <T> Class<T> loadPluginClass(String pluginId); <T> T newPluginInstance(String pluginId) throws InstantiationException; Â /** * Creates a new instance of a plugin. The instance returned will have the {@link PluginConfig} setup with * {@link PluginProperties} provided at the time when the * {@link PluginConfigurer#usePlugin(String, String, String, PluginProperties)} was called during the * program configuration time. In addition the parameter pluginProperties can be used to override the existing * plugin properties in config, with which the plugin instance will have substituted plugin properties. * * @param pluginId the unique identifier provide when declaring plugin usage in the program. * @param <T> the class type of the plugin * @param pluginProperties the properties to override existing plugin properties before instance creation. * @return A new instance of the plugin being specified by the arguments * * @throws InstantiationException if failed create a new instance * @throws IllegalArgumentException if pluginId is not found * @throws UnsupportedOperationException if the program does not support plugin */ <T> T newPluginInstance(String pluginId, Map<String, String> pluginProperties) throws InstantiationException; Â
Â
Configure time:
The app can call this new method with macroDefault values, so plugin instance creation will use macro default values for those config fields.
Run time:
The app performs substitution for the properties with macros using the value from runtime arguments (or workflow token) and calls the method with the field names and substitution values.Â
Â
Scoping:
Â
If the macro-substitution is performed at the DataPipeline app level, it will be possible to scope at stage name level if the user desires that.Â
Â
In our example config of JDBC source to Table sink, there is a common macro "${table-name}", if the user wants to provide a different name for the table-name in Table Sink, he can use scoping.
Â
Example for Scoping: Provided runtime arguments:  Key : table-name, value : employees Key : TableSink:table-name, value : employee_sql  table-name is the macro name that is used in both DBSource stage and TableSink stage. if user wants to provide a special value for macro "table-name" to be used in TableSink, he will prefix stage-name before the macro name separated by the delimiter (colon).
Thoughts from Terence:
Â
Â
----------------------Â
Setting Hydrator runtime arguments using CDAP runtime arguments/preferences
Â
CDAP preferences and runtime arguments will be used directly as Hydrator arguments.Â
Â
1.) Runtime arguments can be passed to hydrator pipeline in 2 ways:
Â
- Using Prepipeline-CustomActions:
Prepipeline custom actions can set runtime arguments. For example, before running the pipeline, custom actions can copy local files to hdfs and set runtime arguments for input path for batchsource. In order to do that, we can expose setPreferences() and getPreferences() programmatic api for setting runtime arguments. These arguments can be passed to hydrator app using workflow token. - Using Hydrator UI:
For each stage, runtime arguments can be passed from hydrator UI using cdap REST endpoints for preferences/runtime arguments framework.Â
Â
2.) Hydrator app will substitute properties using Macro substitution for each ETLStage. Now, plugins, like SFTP, which need secure substitution using key management can use 'secure' prefix in the macro. Macro substitution should vary depending on prefix of the arguments. In case of secure key, macro can be '${secure(key)}', in case of value directly to be substituted, macro can be '${inputpath}' without any prefix.Â
Â
Reference:
Changes to Existing Plugins
Many plugins have fields (configurable properties) that are used in constructing or validating a schema at configure time. These fields need to have macros disabled to allow this. The following plugins and fields would be affected:
Plugin | Fields | Use | Conflict |
---|---|---|---|
BatchCassandraSource | schema | Parsed for correctness to create the schema. | Parsing a macro or schema with a nested macro would fail. |
CopybookSource | copybookContents | Copybook contents are converted to an InputStream and used to get external records, which are in turn used to add fields to the schema. | Schema would add macro literal as a field. |
DedupAggregator | uniqueFields, filterOperation | Both fields are used to validate the input schema created. | Macro literals do not exist as fields in schema and will throw IllegalArgumentException. |
DistinctAggregator | fields | Specifies the fields used to construct the output schema. | Will add macro literals as schema fields.* |
GroupByAggregator | groupByFields, aggregates, | Gets fields from input schema and adds aggregates to to output fields list. | Macro literals do not exist in input schema or are valid fields for an output schema. |
RowDenormalizerAggregator | keyField, nameField, valueField | Gets schemas by field names from the input schema. | Macro literals do not exist as fields in the input schema. |
KVTableSink | keyField, valueField | Validates that presence and type of these fields in the input schema. | Macro literals will not exist in the input schema. |
SnapshotFileBatchAvroSink | schema | Parses schema to add file properties. | Macro literals may disallow schema parsing or incorrect schema creation. |
SnapshotFileBatchParquetSink | schema | Parses schema to add file properties. | Macro literals may disallow schema parsing or incorrect schema creation. |
TableSink | schema, rowField | Validates output and input schemas if properties specified. | Macro literals will lead to failed validation of schema and row field. |
TimePartitionedFileSetDatasetAvroSink | schema | Parses schema to add file properties. | Parsing macro literals in schema would fail. |
TimePartitionedFileSetDatasetParquetSink | schema | Parses schema to add file properties. | Parsing macro literals in schema would fail. |
SnapshotFileBatchAvroSource | schema | Parses schema property to set output schema. | Macro literals can lead to invalid schema parsing or creation. |
SnapshotFileBatchParquetSource | schema | Parses schema property to set output schema. | Macro literals can lead to invalid schema parsing or creation. |
StreamBatchSource | schema, name, format | Stream is added and created through name and schema is parsed to set output schema. | Macro literals will lead to bad parsing of properties. |
TableSource | schema | Schema parsed to set output schema. | Macro literals will lead to failed or incorrect schema creation. |
TimePartitionedFileSetDatasetAvroSource | schema | Schema parsed to set output schema. | Macro literals will lead to failed or incorrect schema creation. |
TimePartitionedFileSetDatasetParquetSource | schema | Schema parsed to set output schema. | Macro literals will lead to failed or incorrect schema creation. |
JavaScriptTransform | schema, script, lookup | Schema format is used to set the output schema. JavaScript and lookup properties are also parsed for correctness. | Macro literals can cause parsing to fail for schema creation, JavaScript compilation, or lookup parsing. |
LogParserTransform | inputName | Gets field from input schema through inputName property. | With a macro literal, the field will not exist in the input schema. |
ProjectionTransform | fieldsToKeep, fieldsToDrop, fieldsToConvert, fieldsToRename | Properties are used to create output schema. | Macro literals will lied to a failed or wrong output schema being created. |
PythonEvaluator | schema | Schema parsed for correctness and set as output schema. | Macro literal can lead to failed or bad schema creation. |
ValidatorTransform | validators, validationScript, | Validator property used to set validator plugins. Script property is also parsed for correctness. | Macro literals can lead to failed parsing or plugins being set. Scripts can not be validated without validators. |
ElasticsearchSource | schema | Schema parsed for correctness and set as output schema. | Macro literals can lead to failed or incorrect schema parsing/creation. |
HBaseSink | rowField, schema | Parsed to valid the output and input schemas and set the ouput schema. | Macro literals can lead to failed or incorrect schema parsing/creation. |
HBaseSource | schema | Parsed for correctness to set output schema. | Macro literals can lead to failed or incorrect schema parsing/creation. |
HiveBatchSource | schema | Parsed for correctness to set ouput schema. | Macro literals can lead to failed or incorrect schema parsing/creation. |
MongoDBBatchSource | schema | Parsed for correctness and validated to set output schema. | Macro literals can lead to failed or incorrect schema parsing/creation. |
NaiveBayesClassifier | predictionField | Configures and sets fields of output schema and checked for existence in input schema. | Output schema would be created wrongly with macro literal as prediction field and input schema check behavior is undefined. |
Compressor | compressor, schema | Parsed for correctness and used to set output schema. | Macro literals can lead to failed or incorrect schema parsing/creation. |
CSVFormatter | schema | Parsed for correctness and used to set output schema. | Macro literals can lead to failed or incorrect schema parsing/creation. |
CSVParser | field | Validated against input schema to check existence of field. | Macro literals may not exist as fields in the input schema. |
Decoder | decode, schema | Decode property is parsed and validated then used to validate the input schema. Schema parsed to set output schema. | Macro literals can lead to failed or incorrect schema parsing/creation or incorrect validation of input schema. |
Decompressor | decompressor, schema | Decompressor property is parsed and validated then used to validate the input schema. Schema parsed to set output schema. | Macro literals can lead to failed or incorrect schema parsing/creation or incorrect validation of input schema. |
Encoder | encode, schema | Encode property is parsed and validated then used to validate the input schema. Schema parsed to set output schema. | Macro literals can lead to failed or incorrect schema parsing/creation or incorrect validation of input schema. |
JSONFormatter | schema | Parsed for correctness and used to set output schema. | Macro literals can lead to failed or incorrect schema parsing/creation. |
JSONParser | field, schema | Validates if field property is present in input schema. Parses schema property to set output schema. | Macro literal may not exist in input schema and may lead to failed parsing or creation of output schema. |
StreamFormatter | schema | Parsed for correctness and used to set output schema. | Macro literals can lead to failed or incorrect schema parsing/creation. |
*Â May need verification
Other plugins have fields that are validated/processed at configure time that do not affect the schema. In these cases, these can be moved to the prepare run method. The following plugins and fields would be affected:
Plugin | Fields | Use | Justification |
---|---|---|---|
StreamBatchSource | duration, delay | Parsed and validated for proper formatting. | The parsing/validation is not related to the schema's creation. |
TimePartitionedFileSetSource | duration, delay | Parsed and validated for proper formatting. | The parsing/validation is not related to the schema's or dataset's creation. |
ReferenceBatchSink | referenceName | Verifies reference name meets dataset ID constraints. | As dataset names can be macros, this supports the primary use case. |
ReferenceBatchSource | referenceName | Verifies that reference name meets dataset ID constraints. | As dataset names can be macros, this supports the primary use case. |
FileBatchSource | timeTable | Creates dataset from time table property. | This is a primary use case for macros. |
TimePartitionedFileSetSource | name, basePath | Name and basePath are used to create the dataset. | This is a primary use case for macros. |
BatchWritableSink | name, type | Creates dataset from properties. | This is a primary use case for macros. |
SnapshotFileBatchSink | name | Creates dataset from name field. | This is a primary use case for macros. |
BatchReadableSource | name, type | Dataset is created from name and type properties. | This is a primary use case for macros. |
SnapshotFileBatchSource | all properties* | Creates dataset from properties. | This is a primary use case for macros. |
TimePartitionedFileSetSink | all properties* | Creates dataset from properties. | This is a primary use case for macros. |
DBSource | importQuery, boundingQuery, splitBy, numSplits | Validate connection settings and parsed for formatting. | The parsing/validation does not lead to the creation of any schema or dataset. |
HDFSSink | timeSuffix | Parsed to validate proper formatting of time suffix. | The parsing/validation does not lead to the creation of any schema or dataset. |
KafkaProducer | async | Parsed to check proper formatting of boolean. | The parsing/validation does not lead to the creation of any schema or dataset. |
NaiveBayesClassifier | fieldToClassify | Checked if input schema field is of type String. | The validation does not lead to the creation or alteration of any schema. |
NaiveBayesTrainer | fieldToClassify, predictionField | Checked if input schema fields are of type String and Double respectively. | The validation does not lead to the creation or alteration of any schema. |
CloneRecord | copies | Validated against being 0 or over the max number of copies. | The validation does not lead to the creation of any schema or dataset. |
CSVFormatter | format | Validated for proper formatting. | The validation does not lead to the creation of any schema or dataset. |
CSVParser | format | Validated for proper formatting. | The validation does not lead to the creation of any schema or dataset. |
Hasher | hash | Checked against valid hash formats. | The check does not lead to the validation or alteration of any schema. |
JSONParser | mapping | Mappings extracted and placed into a map with their expressions. | The extraction does not affect any schema creation or validation. |
StreamFormatter | format | Checked against valid stream formats. | The check does not lead to the validation or alteration of any schema. |
ValueMapper | mapping, defaults | Parsed after configuration is initialized and validated. | The check does not lead to the validation or alteration of any schema. |
*Â May need verification
Â
Â
Â