Goals
Checklist
- User stories documented (Albert/Vinisha)
- User stories reviewed (Nitin)
- Design documented (Albert/Vinisha)
- Design reviewed (Terence/Andreas)
- Feature merged ()
- Examples and guides ()
- Integration tests ()
- Documentation for feature ()
- Blog post
Use Cases
- A pipeline developer wants to create a pipeline that has several configuration settings that are not known at pipeline creation time, but that are set at the start of each pipeline run. For example, the time partition(s) that should be read by the source and the name of the dataset sink need to be set on a per-run basis. The arguments can be set either through CDAP runtime arguments/preferences, or by the pipeline itself. For example, at the start of the run, the pipeline performs some action (e.g., queries a dataset or makes an HTTP call) to look up which time partitions should be read and where data should be written for that pipeline run. Alternatively, a user can manually specify the time partitions through CDAP runtime arguments/preferences and then start the run.
User Stories
- As a pipeline developer, I want to be able to configure a plugin property to some value that will get substituted for each run based on the runtime arguments.
- As a pipeline operator, I want to be able to set arguments for the entire pipeline that will be used for substitution.
- As a pipeline operator, I want to be able to set arguments for a specific stage in the pipeline that will be used for substitution.
- As a plugin developer, I want to be able to write code that is executed at the start of the pipeline and sets arguments for the rest of the run.
Design (WIP - don't review yet)
Macro Syntax
Expanded syntax: ${macro-type(macro)}
Shorthand notation: ${macro}

Example usage:

- ${runtime(hostname)} - get hostname from runtime arguments
- ${token(hostname)} - get hostname from workflow token
- ${secure(access_key)} - get access key from secure store
- ${function_time(time_format)} - apply the time function on the time_format provided and use the resulting value

The default (shorthand) usage reads from runtime arguments; the expanded notation gives users the option of using more macro types.

Examples:

- ipConfig: ${hostname}:${port}
- JDBC connection string: jdbc:${jdbc-plugin}://${hostname}:${sql-port}/${db-name}
"stages": [ { "name": "Database", "plugin": { "name": "Database", "type": "batchsource", "properties": { ... "user": "${username}", "password": "${secure(sql-password)}", "jdbcPluginName": "jdbc", "jdbcPluginType": "${jdbc-type}", "connectionString": "jdbc:${jdbc-type}//${hostname}:${port}/${db-name}", "importQuery": "select * from ${table-name};" } } }, { "name": "Table", "plugin": { "name": "Table", "type": "batchsink", "properties": { "schema": "{\"type\":\"record\",\"name\":\"etlSchemaBody\", \"fields\":[{\"name\":\"name\",\"type\":\"string\"}, {\"name\":\"age\",\"type\":\"int\"},{\"name\":\"emp_id\",\"type\":\"long\"}]}", "name": "${table-name}", "schema.row.field": "name" } } } ]
New APIs in Hydrator for Macro Support:
The DataPipeline app instantiates a plugin (using the plugin context), performs macro substitution on the plugin, and then uses the updated plugin with the macro-substituted configs.
```java
interface MacroContext {
  /**
   * Given the macro key, return the translated value.
   */
  String getValue(String macroKey);
}
```
Based on the macro type, one of the MacroContext implementations below will be used to look up the value for a macro.

```java
class DefaultMacroContext implements MacroContext {
  Map<String, String> runtimeArguments;

  public String getValue(String macroKey) {
    return runtimeArguments.get(macroKey);
  }
}

class SecureMacroContext implements MacroContext {
  SecureStore secureStore;

  public String getValue(String macroKey) {
    return secureStore.get(macroKey);
  }
}

class RuntimeFunctionMacro implements MacroContext {
  TimeZone timeZone;
  long logicalStartTime;
  Function<String, String> timezoneFunction;

  public String getValue(String arguments) {
    return timezoneFunction.apply(arguments);
  }
}
```
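For illustration, here is a minimal sketch of how the app might parse a property value and dispatch each macro to the right MacroContext. The regex, the context-lookup map, and the MacroSubstituter class name are assumptions for this sketch, not the final API:

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class MacroSubstituter {
  // Matches the expanded form ${type(key)} as well as the shorthand ${key}.
  private static final Pattern MACRO =
    Pattern.compile("\\$\\{(?:(\\w+)\\(([^)]*)\\)|([^}]+))\\}");

  private final Map<String, MacroContext> contexts; // "runtime", "token", "secure", ...
  private final MacroContext defaultContext;        // shorthand reads runtime arguments

  public MacroSubstituter(Map<String, MacroContext> contexts, MacroContext defaultContext) {
    this.contexts = contexts;
    this.defaultContext = defaultContext;
  }

  public String substitute(String value) {
    Matcher matcher = MACRO.matcher(value);
    StringBuffer result = new StringBuffer();
    while (matcher.find()) {
      String type = matcher.group(1);
      String key = (type == null) ? matcher.group(3) : matcher.group(2);
      MacroContext context = (type == null) ? defaultContext : contexts.get(type);
      if (context == null) {
        throw new IllegalArgumentException("Unknown macro type: " + type);
      }
      matcher.appendReplacement(result, Matcher.quoteReplacement(context.getValue(key)));
    }
    matcher.appendTail(result);
    return result.toString();
  }
}
```

For example, substitute("${hostname}:${port}") would resolve both keys against the default (runtime arguments) context, while "${secure(sql-password)}" would be dispatched to the secure store context.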
Hydrator Plugin Changes
Currently, when we deploy a pipeline, configurePipeline is called on each plugin. We perform a few validations at the configure stage, specifically for the schema, script syntax, etc. In some plugins we also create the dataset if it doesn't already exist.
Since the dataset to write to can be macro-substituted, we have to defer dataset creation to initialize rather than doing it at the configure stage.
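A minimal sketch of this deferral, assuming a hypothetical containsMacro() helper on the plugin config and dataset-creation methods on the runtime context (neither is a confirmed API):

```java
@Override
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
  // Only create the dataset at deploy time if the name is a concrete value.
  // containsMacro() is an assumed helper, not a confirmed API.
  if (!config.containsMacro("name")) {
    pipelineConfigurer.createDataset(config.name, Table.class.getName(), DatasetProperties.EMPTY);
  }
}

@Override
public void prepareRun(BatchSinkContext context) {
  // By run time the macro has been substituted with the real dataset name,
  // so the dataset can be created if it does not already exist (assuming
  // such methods are available on the runtime context).
  if (!context.datasetExists(config.name)) {
    context.createDataset(config.name, Table.class.getName(), DatasetProperties.EMPTY);
  }
}
```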
However, there are certain fields that are used to determine the schema in the plugin. Those cannot be macro-substituted, and we want to disallow macro usage for them.
A plugin can specify, using an annotation, whether a property field supports macros or not. This will require a CDAP platform change, as it is a new annotation.
```java
// If the config fields of a plugin do not support macros, this can be specified
// at the config class level. If not specified, by default all fields can be
// macro substituted.
@Macro(enabled = false)
public static class ValidatorConfig extends PluginConfig {
  @Description("Comma-separated list of validator plugins that are used in script")
  String validators;

  @Description(SCRIPT_DESCRIPTION)
  String validationScript;

  @Description("Lookup tables to use during transform. Currently supports KeyValueTable.")
  @Nullable
  String lookup;
}
```
```java
public class TableSinkConfig extends PluginConfig {
  @Name(Properties.Table.NAME)
  @Description("Name of the table. If the table does not already exist, one will be created.")
  private String name;

  @Name(Properties.Table.PROPERTY_SCHEMA)
  @Description("schema of the table as a JSON Object. If the table does not already exist, one will be " +
    "created with this schema, which will allow the table to be explored through Hive. If no schema is given, the " +
    "table created will not be explorable.")
  @Nullable
  @Macro(enabled = false) // the schema cannot be macro-substituted; the other fields can be
  private String schemaStr;

  @Name(Properties.Table.PROPERTY_SCHEMA_ROW_FIELD)
  @Description("The name of the record field that should be used as the row key when writing to the table.")
  private String rowField;
}
```
CDAP Platform Changes:
```java
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.FIELD})
public @interface Macro {

  /**
   * Default status if macro is enabled.
   */
  boolean DEFAULT_STATUS = true;

  /**
   * Returns if macro is enabled. Default is 'true'.
   */
  boolean enabled() default DEFAULT_STATUS;
}
```
```java
/**
 * Contains information about a property used by a plugin.
 */
@Beta
public class PluginPropertyField {
  private final String name;
  private final String description;
  private final String type;
  private final boolean required;
  // returns true if this field can accept a macro
  private final boolean macroEnabled;
  ...
}
```
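As an illustration, the platform could derive macroEnabled by inspecting @Macro annotations when it registers a plugin config class. The helper below is a hedged sketch under that assumption, not the actual platform code:

```java
import java.lang.reflect.Field;

final class MacroAnnotationSupport {

  /**
   * Sketch: a field-level @Macro overrides the class-level default; if neither
   * annotation is present, fall back to Macro.DEFAULT_STATUS (macros allowed).
   */
  static boolean isMacroEnabled(Class<?> configClass, Field field) {
    Macro fieldMacro = field.getAnnotation(Macro.class);
    if (fieldMacro != null) {
      return fieldMacro.enabled();
    }
    Macro classMacro = configClass.getAnnotation(Macro.class);
    return classMacro == null ? Macro.DEFAULT_STATUS : classMacro.enabled();
  }
}
```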
Changes to Existing Plugins (WIP)
Many plugins have fields (configurable properties) that are used in constructing or validating a schema at configure time. These fields need to have macros disabled so that the configure-time schema construction and validation still work. The following plugins and fields would be affected:
Plugin | Fields | Use | Conflict |
---|---|---|---|
BatchCassandraSource | schema | Parsed for correctness to create the schema. | Parsing a macro or schema with a nested macro would fail. |
CopybookSource | copybookContents | Copybook contents are converted to an InputStream and used to get external records, which are in turn used to add fields to the schema. | Schema would add macro literal as a field. |
DedupAggregator | uniqueFields, filterOperation | Both fields are used to validate the input schema. | Macro literals do not exist as fields in the schema and will throw an IllegalArgumentException. |
DistinctAggregator | fields | Specifies the fields used to construct the output schema. | Will add macro literals as schema fields.* |
GroupByAggregator | groupByFields, aggregates | Gets fields from the input schema and adds aggregates to the output fields list. | Macro literals do not exist in the input schema, nor are they valid fields for an output schema. |
RowDenormalizerAggregator | keyField, nameField, valueField | Gets schemas by field names from the input schema. | Macro literals do not exist as fields in the input schema. |
BatchWritableSink | name, type | Creates dataset from properties. | Macro literals will lead to a misnamed dataset and type. |
KVTableSink | keyField, valueField | Validates the presence and type of these fields in the input schema. | Macro literals will not exist in the input schema. |
ReferenceBatchSink | referenceName | Verifies reference name meets dataset ID constraints. | Macro literal may not meet constraints. |
SnapshotFileBatchAvroSink | schema | Parses the schema to add file properties. | Macro literals may prevent schema parsing or cause incorrect schema creation. |
SnapshotFileBatchParquetSink | schema | Parses the schema to add file properties. | Macro literals may prevent schema parsing or cause incorrect schema creation. |
SnapshotFileBatchSink | name | Creates a dataset from the name field. | The dataset name will change on macro substitution. |
* May need verification
Other plugins have fields that are validated or processed at configure time but do not affect the schema. In these cases, the validation or processing can be moved to initialization time. The following plugins and fields would be affected:
Plugin | Fields | Use |
---|---|---|
* May need verification
Setting Hydrator runtime arguments using CDAP runtime arguments/preferences
CDAP preferences and runtime arguments will be used directly as Hydrator arguments.
1.) Runtime arguments can be passed to a Hydrator pipeline in two ways:
- Using pre-pipeline custom actions:
  Pre-pipeline custom actions can set runtime arguments. For example, before running the pipeline, a custom action can copy local files to HDFS and set the runtime argument for the batch source's input path. To support this, we can expose setPreferences() and getPreferences() programmatic APIs for setting runtime arguments. These arguments can be passed to the Hydrator app using the workflow token (see the sketch after this list).
- Using the Hydrator UI:
  For each stage, runtime arguments can be passed from the Hydrator UI using the CDAP REST endpoints for the preferences/runtime arguments framework.
2.) The Hydrator app will substitute properties using macro substitution for each ETLStage. Plugins, like SFTP, that need secure substitution using key management can use the 'secure' prefix in the macro. Macro substitution should vary depending on the prefix of the argument: for a secure key, the macro can be '$secure.key'; for a value to be substituted directly, the macro can be '$inputpath', without any prefix.
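As a sketch of the custom-action flow from (1), the hypothetical pre-pipeline action below computes an input path and publishes it through the workflow token so a later substitution can pick it up; the class name, token key, and path construction are all assumptions:

```java
import co.cask.cdap.api.workflow.AbstractWorkflowAction;

public class SetInputPathAction extends AbstractWorkflowAction {

  @Override
  public void run() {
    // ... copy local files to HDFS here ...
    // The token key "inputpath" and the path layout are illustrative only.
    String inputPath = "hdfs://namenode/data/" + getContext().getRuntimeArguments().get("date");
    getContext().getToken().put("inputpath", inputPath);
  }
}
```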
Thoughts from Terence: