Goals
Checklist
- User stories documented (Albert/Vinisha)
- User stories reviewed (Nitin)
- Design documented (Shankar/Kashif)
- Design reviewed (Terence/Andreas)
- Feature merged ()
- Examples and guides ()
- Integration tests ()
- Documentation for feature ()
- Blog post
...
```
Function syntax:                   ${macroFunction(macro)}
Shorthand property lookup syntax:  ${macro}

Example usage:
  ${secure(accessKey)}            - get the access key from the secure store
  ${logicalStartTime(timeFormat)} - apply the time function to the provided timeFormat and use the value

Shorthand substitution precedence:
  Custom Action Workflow-Token > Runtime Arguments > Stored Preferences

Examples:
  ipConfig: ${hostname}:${port}
  JDBC connection string: jdbc:${jdbc-plugin}://${hostname}:${sql-port}/${db-name}
```

Using the expanded syntax allows additional logic to be applied to the macro arguments through a macro function. Escaping is supported using the \ (backslash) character; for example, \${hostname} will not be substituted.

Nested macros: a macro may contain another macro, for example ${secure(${user-name})}. In this example, user-name is looked up in the properties first, and the secure store is then used to get the key/password for that user-name; this final key/password is used for the field.
The shorthand notation supports retrieval precedence to limit the exposure of underlying workflow tokens and runtime arguments to pipeline operators. The logicalStartTime macro function uses the logical start time of a run to perform the substitution. It is an example of a macro function that is not just a key-value lookup, but instead performs extra logic before a value is returned. For now, the implementation will only support substitution from runtime arguments. Once the secure store API is available, it will also support the secure store. In the future, we can consider allowing developers to create custom macro functions (similar to logicalStartTime(...)).
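As an illustration of the shorthand lookup precedence described above, here is a minimal sketch of a substituter. The class name, constructor shape, and error handling are hypothetical, not the actual CDAP implementation; nested macros and macro functions are deliberately out of scope.

```java
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch: substitutes ${key} shorthand macros using the
// precedence workflow-token > runtime arguments > stored preferences.
public class MacroSubstituter {
  // Matches ${key} but not the escaped form \${key}; nesting is not handled here.
  private static final Pattern MACRO = Pattern.compile("(?<!\\\\)\\$\\{([^${}]+)}");

  private final List<Map<String, String>> sourcesInPrecedenceOrder;

  public MacroSubstituter(Map<String, String> workflowTokens,
                          Map<String, String> runtimeArgs,
                          Map<String, String> preferences) {
    this.sourcesInPrecedenceOrder = List.of(workflowTokens, runtimeArgs, preferences);
  }

  public String substitute(String value) {
    Matcher m = MACRO.matcher(value);
    StringBuilder out = new StringBuilder();
    while (m.find()) {
      m.appendReplacement(out, Matcher.quoteReplacement(lookup(m.group(1))));
    }
    m.appendTail(out);
    // Unescape \${...} so the literal macro text is kept.
    return out.toString().replace("\\${", "${");
  }

  private String lookup(String key) {
    // First source in precedence order that contains the key wins.
    for (Map<String, String> source : sourcesInPrecedenceOrder) {
      if (source.containsKey(key)) {
        return source.get(key);
      }
    }
    throw new IllegalArgumentException("No value found for macro: " + key);
  }
}
```

For example, if a workflow token and a runtime argument both define hostname, the workflow token's value is used, matching the precedence order above.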
Notes:
- For now, the implementation will not support recursive expansion of macros. That is, if a macro such as ${address} expands to ${hostname}:${port}, then ${hostname} and ${port} will not be evaluated further; we will document this limitation at first. We can expand the functionality later to recursively expand macros. However, recursion can lead to an infinite loop from circular macros (for example, ${key} expanding to ${key}), so we would add a maximum depth for expansion.
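If recursive expansion is added later, the depth limit mentioned in the note above could contain circular macros roughly along these lines. This is a hypothetical sketch (class name, MAX_DEPTH value, and exceptions are assumptions), not the current implementation:

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch of depth-limited recursive macro expansion.
// A value containing macros is re-expanded pass by pass until no macros
// remain, or until MAX_DEPTH passes have been made, at which point a
// circular definition such as ${key} -> ${key} is reported instead of looping.
public class RecursiveMacroExpander {
  private static final Pattern MACRO = Pattern.compile("\\$\\{([^${}]+)}");
  private static final int MAX_DEPTH = 10; // assumed limit for illustration

  public static String expand(String value, Map<String, String> properties) {
    for (int depth = 0; depth < MAX_DEPTH; depth++) {
      Matcher m = MACRO.matcher(value);
      if (!m.find()) {
        return value; // fully expanded
      }
      StringBuilder out = new StringBuilder();
      do {
        String replacement = properties.get(m.group(1));
        if (replacement == null) {
          throw new IllegalArgumentException("No value for macro: " + m.group(1));
        }
        m.appendReplacement(out, Matcher.quoteReplacement(replacement));
      } while (m.find());
      m.appendTail(out);
      value = out.toString();
    }
    throw new IllegalStateException("Exceeded max expansion depth; circular macro?");
  }
}
```

With this sketch, ${address} expanding to ${hostname}:${port} resolves in two passes, while ${key} -> ${key} fails fast at the depth limit rather than looping forever.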
```json
"stages": [
  {
    "name": "Database",
    "plugin": {
      "name": "Database",
      "type": "batchsource",
      "properties": {
        ...
        "user": "${username}",
        "password": "${secure(sql-password)}",
        "jdbcPluginName": "jdbc",
        "jdbcPluginType": "${jdbc-type}",
        "connectionString": "jdbc:${jdbc-type}://${hostname}:${port}/${db-name}",
        "importQuery": "select * from ${table-name};"
      }
    }
  },
  {
    "name": "Table",
    "plugin": {
      "name": "Table",
      "type": "batchsink",
      "properties": {
        "schema": "{\"type\":\"record\",\"name\":\"etlSchemaBody\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"int\"},{\"name\":\"emp_id\",\"type\":\"long\"}]}",
        "name": "${table-name}",
        "schema.row.field": "name"
      }
    }
  }
]
```
...
```java
public class TableSinkConfig extends PluginConfig {
  @Name(Properties.Table.NAME)
  @Description("Name of the table. If the table does not already exist, one will be created.")
  // The name of the table can be specified by a runtime macro; by default macros are disabled for fields.
  @Macro
  private String name;

  @Name(Properties.Table.PROPERTY_SCHEMA)
  @Description("schema of the table as a JSON Object. If the table does not already exist, one will be " +
    "created with this schema, which will allow the table to be explored through Hive. If no schema is given, the " +
    "table created will not be explorable.")
  @Nullable
  private String schemaStr;

  @Name(Properties.Table.PROPERTY_SCHEMA_ROW_FIELD)
  @Description("The name of the record field that should be used as the row key when writing to the table.")
  private String rowField;
}
```
...
PluginConfig Changes:
```java
@Beta
public abstract class PluginConfig extends Config implements Serializable {

  /**
   * Returns the {@link PluginProperties}.
   */
  public final PluginProperties getProperties() {
    return properties;
  }

  /**
   * Returns false if the field is not annotated as a macro. If the field is
   * annotated as a macro, the configured value is checked for macro syntax,
   * and true or false is returned based on that check.
   */
  public final boolean isMacro(String fieldName) {
    ...
  }
}
```
The method will return whether the property with the provided fieldName contains a macro at configure time. We don't want to force deferring macros to runtime in the case that a field is macro-enabled but no macro is actually provided in its configuration. This allows optional checking of properties at configure time for simple pipelines.
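One plausible shape for the check behind isMacro, sketched here with hypothetical names purely for illustration (the real logic lives inside PluginConfig): the field must carry the @Macro annotation, and the raw value supplied in the pipeline config must actually contain macro syntax.

```java
import java.util.regex.Pattern;

// Hypothetical sketch of the two-part decision behind isMacro(fieldName):
// macros must be enabled for the field (via the @Macro annotation), AND the
// raw configured value must contain macro syntax such as ${...}.
public class MacroCheck {
  private static final Pattern MACRO_SYNTAX = Pattern.compile("\\$\\{[^}]+}");

  public static boolean isMacro(boolean fieldAnnotatedWithMacro, String rawValue) {
    if (!fieldAnnotatedWithMacro || rawValue == null) {
      return false; // macros disabled for this field, or no value configured
    }
    return MACRO_SYNTAX.matcher(rawValue).find();
  }
}
```

This is why isMacro can return false for a macro-enabled field: a plain value like "users" contains no macro syntax, so the property can still be validated at configure time.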
...
```java
private final TableSinkConfig sinkConfig;

@Override
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
  if (!sinkConfig.isMacro("name")) {
    // create the dataset at configure time only if the "name" field is not a macro
    pipelineConfigurer.createDataset(name, datasetType,
                                     DatasetProperties.builder().addAll(properties).build());
  }
  ...
}
```
...