...

Code Block
Expanded syntax:
${macro-type(macro)}
 
Shorthand notation:
${macro}
 
Example usage:
${runtime(hostname)} - get hostname from runtime arguments
${token(hostname)} - get hostname from workflow token
${secure(access_key)} - get access key from the secure store
${function_time(time_format)} - apply the time function on the time_format provided and use the resulting value
 
The default (shorthand) usage reads from runtime arguments; the expanded notation gives users the option of using other macro types.
Examples:
ipConfig : ${hostname}:${port}
JDBC connection string : jdbc:${jdbc-plugin}://${hostname}:${sql-port}/${db-name}
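 
For example, with runtime arguments hostname=payments-db.example.com, port=3306, jdbc-plugin=mysql, sql-port=3306, and db-name=customers (hypothetical values), the two examples above would resolve at runtime to:
 
Code Block
ipConfig               : payments-db.example.com:3306
JDBC connection string : jdbc:mysql://payments-db.example.com:3306/customers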
 

 


Code Block
titlePipeline Config
"stages": [
    {
        "name": "customersDatabase",
        "plugin": {
            "name": "Database",
            "type": "batchsource",
            "properties": {
                ...
                "user": "${username}",
                "password": "${secure(sql-password)}",
                "jdbcPluginName": "jdbc",
                "jdbcPluginType": "${jdbc-type}",
                "connectionString": "jdbc:${jdbc-type}://${hostname}:${port}/${db-name}",
                "importQuery": "select * from ${table-name};"
            }
        }
    },
    {
        "name": "itemsTable",
        "plugin": {
            "name": "Table",
            "type": "batchsink",
            "properties": {
                "schema": "{\"type\":\"record\",\"name\":\"etlSchemaBody\",
                    \"fields\":[{\"name\":\"name\",\"type\":\"string\"},
                    {\"name\":\"age\",\"type\":\"int\"},{\"name\":\"emp_id\",\"type\":\"long\"}]}",
                "name": "${table-name}",
                "schema.row.field": "name"
            }
        }
    }
]

New APIs in Hydrator for Macro Support:

The DataPipeline app instantiates a plugin (using the plugin context), performs macro substitution on the plugin's configuration, and then uses the updated plugin with the macro-substituted config.

Code Block
titleMacroContext
interface MacroContext {
  /**
   * Given the macro key, return the translated value.
   */
  String getValue(String macroKey);
}
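 
As a rough illustration of how the app could apply a MacroContext to a plugin's properties, here is a minimal sketch; the MacroSubstituter class and its substituteAll helper are assumptions for this design, not an existing CDAP API.

Code Block
titleSubstitution sketch (illustrative)
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public final class MacroSubstituter {

  // Matches ${...}; the body is handed to the MacroContext for resolution.
  private static final Pattern MACRO = Pattern.compile("\\$\\{([^}]+)\\}");

  /**
   * Returns a copy of the plugin properties with every ${...} macro
   * replaced by the value the given MacroContext resolves it to.
   */
  public static Map<String, String> substituteAll(Map<String, String> properties, MacroContext context) {
    Map<String, String> substituted = new HashMap<>();
    for (Map.Entry<String, String> entry : properties.entrySet()) {
      Matcher matcher = MACRO.matcher(entry.getValue());
      StringBuffer result = new StringBuffer();
      while (matcher.find()) {
        matcher.appendReplacement(result, Matcher.quoteReplacement(context.getValue(matcher.group(1))));
      }
      matcher.appendTail(result);
      substituted.put(entry.getKey(), result.toString());
    }
    return substituted;
  }
}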


Code Block
titleMacro Types
Based on the macro type, one of the MacroContext implementations below will be used to get the value for the macro.
 
class DefaultMacroContext implements MacroContext {
  Map<String, String> runtimeArguments;

  public String getValue(String macroKey) {
    return runtimeArguments.get(macroKey);
  }
}

class SecureMacroContext implements MacroContext {
  SecureStore secureStore;

  public String getValue(String macroKey) {
    return secureStore.get(macroKey);
  }
}

class RuntimeFunctionMacro implements MacroContext {
  TimeZone timeZone;
  long logicalStartTime;
  Function<String, String> timezoneFunction;

  public String getValue(String arguments) {
    return timezoneFunction.apply(arguments);
  }
}
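 
To route the expanded syntax ${macro-type(macro)} to the right implementation, the app could parse the type prefix and look up the matching MacroContext. The sketch below assumes that design; the parsing rules and the MacroDispatcher class are illustrative, not finalized.

Code Block
titleDispatch sketch (illustrative)
import java.util.Map;

public final class MacroDispatcher {

  private final Map<String, MacroContext> contextsByType; // "runtime", "token", "secure", "function_time"
  private final MacroContext defaultContext;              // shorthand ${key} reads from runtime arguments

  public MacroDispatcher(Map<String, MacroContext> contextsByType, MacroContext defaultContext) {
    this.contextsByType = contextsByType;
    this.defaultContext = defaultContext;
  }

  /**
   * Resolves the body of a single macro, e.g. "secure(access_key)" or "hostname".
   */
  public String resolve(String macroBody) {
    int open = macroBody.indexOf('(');
    if (open < 0 || !macroBody.endsWith(")")) {
      // Shorthand notation: read from runtime arguments by default.
      return defaultContext.getValue(macroBody);
    }
    String type = macroBody.substring(0, open);
    String key = macroBody.substring(open + 1, macroBody.length() - 1);
    MacroContext context = contextsByType.get(type);
    if (context == null) {
      throw new IllegalArgumentException("Unsupported macro type: " + type);
    }
    return context.getValue(key);
  }
}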
 

 

Hydrator Plugin Changes

Currently, when we deploy a pipeline, configurePipeline is called on each plugin. We perform a few validations in the configure stage, specifically for schema, script syntax, etc. In some plugins we also create the dataset if it doesn't already exist.

The dataset to write to can be macro-substituted, so we have to defer dataset creation to initialize rather than doing it at the configure stage; see the sketch below.
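 
For example, a table sink could move its create-if-missing logic out of configurePipeline and into prepareRun, after macros have been substituted. In the minimal sketch below, datasetExists, createDataset, and tableProperties stand in for whatever the batch context and plugin actually expose; they are not confirmed API signatures.

Code Block
titleDeferred dataset creation (illustrative)
// Fragment of a hypothetical table sink; config.name may contain a macro
// such as ${table-name}, which is only resolved by the time prepareRun runs.
@Override
public void prepareRun(BatchSinkContext context) throws Exception {
  if (!context.datasetExists(config.name)) {
    // Creation is deferred here because the substituted name is unknown
    // at configure (deploy) time.
    context.createDataset(config.name, Table.class.getName(), tableProperties(config));
  }
}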

However, certain fields are used to determine the schema in the plugin; those cannot be macro-substituted, and we want to disallow macro usage for them.

The plugin can specify, via an annotation, whether a property field supports macros. This requires a CDAP platform change, as it is a new annotation.

 

Code Block
// If a pipeline config's fields do not support macros, this can be specified at the config level.
// If not specified, by default all fields can be macro-substituted.
@Macro(enabled=false)
public static class ValidatorConfig extends PluginConfig {
  @Description("Comma-separated list of validator plugins that are used in script")
  String validators;

  @Description(SCRIPT_DESCRIPTION)
  String validationScript;

  @Description("Lookup tables to use during transform. Currently supports KeyValueTable.")
  @Nullable
  String lookup;
}
Code Block
public class TableSinkConfig extends PluginConfig {
  @Name(Properties.Table.NAME)
  @Description("Name of the table. If the table does not already exist, one will be created.")
  private String name;

  @Name(Properties.Table.PROPERTY_SCHEMA)
  @Description("schema of the table as a JSON Object. If the table does not already exist, one will be " +
    "created with this schema, which will allow the table to be explored through Hive. If no schema is given, the " +
    "table created will not be explorable.")
  @Nullable
  @Macro(enabled=false)
  // The schema cannot be macro-substituted; other fields can be.
  private String schemaStr;

  @Name(Properties.Table.PROPERTY_SCHEMA_ROW_FIELD)
  @Description("The name of the record field that should be used as the row key when writing to the table.")
  private String rowField;
}

CDAP Platform Changes:

 

Code Block
titleMacro Annotation
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Applied to both config classes and individual config fields above,
// so both TYPE and FIELD targets are needed.
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD, ElementType.TYPE})
public @interface Macro {

  /**
   * Default status if macro is enabled.
   */
  boolean DEFAULT_STATUS = true;

  /**
   * Returns if macro is enabled. Default is 'true'.
   */
  boolean enabled() default DEFAULT_STATUS;
}
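 
With the annotation in place, the platform can check it reflectively before substituting a field. Below is a minimal sketch of that check; MacroPolicy is an assumed helper class, not part of the existing platform.

Code Block
titleAnnotation check sketch (illustrative)
import java.lang.reflect.Field;

public final class MacroPolicy {

  /**
   * Returns true if the given config field may be macro-substituted:
   * substitution is on by default, and turned off by @Macro(enabled=false)
   * on the field or on the config class itself.
   */
  public static boolean isMacroEnabled(Class<?> configClass, String fieldName) throws NoSuchFieldException {
    Field field = configClass.getDeclaredField(fieldName);
    Macro fieldLevel = field.getAnnotation(Macro.class);
    if (fieldLevel != null) {
      return fieldLevel.enabled();
    }
    Macro classLevel = configClass.getAnnotation(Macro.class);
    return classLevel == null ? Macro.DEFAULT_STATUS : classLevel.enabled();
  }
}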
 

 

Changes to Existing Plugins (WIP)

...