...

Code Block
Pipeline Config
"stages": [
    {
        "name": "Database",
        "plugin": {
            "name": "Database",
            "type": "batchsource",
            "properties": {
                ...
                "user": "${username}",
                "password": "${secure(sql-password)}",
                "jdbcPluginName": "jdbc",
                "jdbcPluginType": "${jdbc-type}",
                "connectionString": "jdbc:${jdbc-type}://${hostname}:${port}/${db-name}",
                "importQuery": "select * from ${table-name};"
            }
        }
    },
    {
        "name": "Table",
        "plugin": {
            "name": "Table",
            "type": "batchsink",                                        
            "properties": {
                "schema": "{\"type\":\"record\",\"name\":\"etlSchemaBody\",
                \"fields\":[{\"name\":\"name\",\"type\":\"string\"},
                {\"name\":\"age\",\"type\":\"int\"},{\"name\":\"emp_id\",\"type\":\"long\"}]}",
                "name": "${table-name}",
                "schema.row.field": "name"
            }
        }
    }
]
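
To illustrate how the macros in the config above might be resolved, here is a minimal sketch. It assumes a flat map of runtime arguments and a separate map standing in for the secure store; the class name `MacroSubstitutionSketch` and the `substitute` method are hypothetical and not part of CDAP. Nested macros are not handled.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class MacroSubstitutionSketch {
  // Matches ${key} and ${secure(key)}. Group 1 is the secure key, group 2 the plain key.
  private static final Pattern MACRO =
      Pattern.compile("\\$\\{(?:secure\\(([^)}]+)\\)|([^}]+))\\}");

  // Replaces every macro in 'value'. Both maps are assumptions of this sketch:
  // 'arguments' stands in for runtime arguments, 'secureStore' for a secure key store.
  static String substitute(String value, Map<String, String> arguments,
                           Map<String, String> secureStore) {
    Matcher matcher = MACRO.matcher(value);
    StringBuffer result = new StringBuffer();
    while (matcher.find()) {
      String secureKey = matcher.group(1);
      String key = secureKey != null ? secureKey : matcher.group(2);
      Map<String, String> source = secureKey != null ? secureStore : arguments;
      String replacement = source.get(key);
      if (replacement == null) {
        throw new IllegalArgumentException("No value for macro: " + matcher.group());
      }
      // quoteReplacement keeps '$' and '\' in the value from being treated as group references.
      matcher.appendReplacement(result, Matcher.quoteReplacement(replacement));
    }
    matcher.appendTail(result);
    return result.toString();
  }

  public static void main(String[] args) {
    Map<String, String> arguments = new HashMap<>();
    arguments.put("jdbc-type", "mysql");
    arguments.put("hostname", "localhost");
    arguments.put("port", "3306");
    arguments.put("db-name", "test");
    Map<String, String> secureStore = new HashMap<>();
    secureStore.put("sql-password", "s3cret");

    System.out.println(substitute(
        "jdbc:${jdbc-type}://${hostname}:${port}/${db-name}", arguments, secureStore));
    System.out.println(substitute("${secure(sql-password)}", arguments, secureStore));
  }
}
```

Failing fast on a missing value is a design choice of this sketch; the document does not say how unresolved macros should be treated.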

Scoping:

If macro substitution is performed at the DataPipeline app level, it will be possible to scope macros at the stage-name level if the user wants that.
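
One way stage-level scoping could work is sketched below. The `<stageName>.<key>` prefix convention is purely an assumption made for illustration, since the document does not define a scoping syntax; `ScopedMacroLookupSketch` and `lookup` are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

final class ScopedMacroLookupSketch {
  // Assumption of this sketch: a stage-scoped argument is stored as "<stageName>.<key>".
  // A scoped value wins over the unscoped one; returns null if neither is present.
  static String lookup(String stageName, String key, Map<String, String> arguments) {
    String scoped = arguments.get(stageName + "." + key);
    return scoped != null ? scoped : arguments.get(key);
  }

  public static void main(String[] args) {
    Map<String, String> arguments = new HashMap<>();
    arguments.put("table-name", "employees");
    arguments.put("Table.table-name", "employees_copy");

    // The "Database" stage has no scoped value, so it falls back to the global one.
    System.out.println(lookup("Database", "table-name", arguments));
    // The "Table" stage picks up its scoped override.
    System.out.println(lookup("Table", "table-name", arguments));
  }
}
```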

...

Currently, when we deploy a pipeline, configurePipeline is called on each plugin. We perform a few validations in the configure stage, specifically for schema, script syntax, etc. Some plugins also create a dataset at this stage if it doesn't already exist.

The dataset to write to can be macro-substituted, so we have to defer dataset creation to prepareRun rather than doing it at the configure stage.
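
The deferral described above can be sketched roughly as follows. The types here (`InMemoryDatasets`, `TableSink`) are hypothetical stand-ins written for this example and are not the CDAP interfaces; only the method names configurePipeline and prepareRun come from the text.

```java
import java.util.HashSet;
import java.util.Set;

final class DeferredDatasetCreationSketch {
  /** Hypothetical in-memory stand-in for dataset administration; not the CDAP API. */
  static final class InMemoryDatasets {
    private final Set<String> names = new HashSet<>();

    boolean exists(String name) {
      return names.contains(name);
    }

    void create(String name) {
      names.add(name);
    }
  }

  /** Hypothetical sink whose configured dataset name may be a macro such as ${table-name}. */
  static final class TableSink {
    private final String configuredName;

    TableSink(String configuredName) {
      this.configuredName = configuredName;
    }

    // Configure time: the name may still be an unresolved macro, so only check
    // that something was configured. No dataset is created here.
    void configurePipeline(InMemoryDatasets datasets) {
      if (configuredName == null || configuredName.isEmpty()) {
        throw new IllegalArgumentException("Dataset name must be configured");
      }
    }

    // Run time: the caller passes the name after macro substitution,
    // and the dataset is created only if it does not exist yet.
    void prepareRun(String resolvedName, InMemoryDatasets datasets) {
      if (!datasets.exists(resolvedName)) {
        datasets.create(resolvedName);
      }
    }
  }

  public static void main(String[] args) {
    InMemoryDatasets datasets = new InMemoryDatasets();
    TableSink sink = new TableSink("${table-name}");
    sink.configurePipeline(datasets);
    System.out.println("exists after configure: " + datasets.exists("employees"));
    sink.prepareRun("employees", datasets);
    System.out.println("exists after prepareRun: " + datasets.exists("employees"));
  }
}
```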

...

PluginConfigurer no longer needs to extend DatasetConfigurer, as it can no longer create datasets.

 

However, certain fields are used to determine the schema in the plugin. These cannot be macro-substituted, because schema validation is essential at configure time, so we want to disallow macro usage for them.

One option for this is to use annotations in plugins.

Using an annotation, the plugin can specify whether a property field supports macros. This will require a CDAP platform change, as it's a new annotation.

 

...

There are two ways of handling this:

1) At Platform Level

2) At DataPipeline App level

 

Platform Level:

Plugins can use the "@Macro" annotation to specify whether a plugin field can be a macro, and to provide a configure-value to use at configure time to instantiate the plugin.

When a plugin instance is instantiated at configure time, macros cannot be substituted.

  • If we want to keep the field containing the macro as is, the field always has to be a String. This is limiting for plugin developers, as they have to do the type casting themselves to use macros on fields of types other than String.
  • A configure-value works around that, but the plugin developer has to know that this value will be used at configure time. It might also seem unnecessary to the plugin developer, as the configure-value isn't useful except to avoid failure at configure time.

 

Code Block
public class TableSinkConfig extends PluginConfig {
  @Name(Properties.Table.NAME)
  @Description("Name of the table. If the table does not already exist, one will be created.")
  @Macro(enabled=true) // The name of the table can be specified by a runtime macro, by default macros are disabled for fields.
  private String name;

  @Name(Properties.Table.PROPERTY_SCHEMA)
  @Description("schema of the table as a JSON Object. If the table does not already exist, one will be " +
    "created with this schema, which will allow the table to be explored through Hive. If no schema is given, the " +
    "table created will not be explorable.")
  @Nullable
  // The schema should not be substituted by a runtime macro
  private String schemaStr;

  @Name(Properties.Table.PROPERTY_SCHEMA_ROW_FIELD)
  @Description("The name of the record field that should be used as the row key when writing to the table.")
  @Macro(enabled=true) // The name of the row field can also be specified by a runtime macro
  private String rowField;
}

...

 

This will require a CDAP platform-level change, as it's a new annotation. PluginInstantiator has to understand the annotation and set fields appropriately.

 

Code Block
Macro Annotation
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
public @interface Macro {

  /**
   * Default macro status: macros are disabled unless explicitly enabled.
   */
  boolean DEFAULT_STATUS = false;

  /**
   * Returns whether macros are enabled for the annotated field. Default is 'false'.
   */
  boolean enabled() default DEFAULT_STATUS;
}
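
As a rough illustration of how an instantiator could use such an annotation, the sketch below reads it via reflection and reports String fields that contain macro syntax without being macro-enabled. It declares its own field-targeted copy of the annotation so that it compiles on its own; `MacroFieldCheckSketch`, `SinkConfig`, and `findDisallowedMacros` are hypothetical names, and this is not how PluginInstantiator is actually implemented.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

final class MacroFieldCheckSketch {
  /** Local copy of the proposed annotation, declared here only to keep the sketch self-contained. */
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.FIELD)
  @interface Macro {
    boolean enabled() default false;
  }

  /** Hypothetical config: 'name' may be a macro, 'schemaStr' may not. */
  static final class SinkConfig {
    @Macro(enabled = true)
    String name;

    String schemaStr;

    SinkConfig(String name, String schemaStr) {
      this.name = name;
      this.schemaStr = schemaStr;
    }
  }

  // Returns the names of String fields that contain "${" but are not macro-enabled.
  static List<String> findDisallowedMacros(Object config) {
    List<String> offending = new ArrayList<>();
    for (Field field : config.getClass().getDeclaredFields()) {
      if (field.getType() != String.class) {
        continue; // this sketch only inspects String fields
      }
      field.setAccessible(true);
      String value;
      try {
        value = (String) field.get(config);
      } catch (IllegalAccessException e) {
        throw new IllegalStateException("Cannot read field " + field.getName(), e);
      }
      Macro macro = field.getAnnotation(Macro.class);
      boolean macroAllowed = macro != null && macro.enabled();
      if (value != null && value.contains("${") && !macroAllowed) {
        offending.add(field.getName());
      }
    }
    return offending;
  }

  public static void main(String[] args) {
    System.out.println(findDisallowedMacros(new SinkConfig("${table-name}", "{}")));
    System.out.println(findDisallowedMacros(new SinkConfig("employees", "${schema}")));
  }
}
```

A check like this would run at configure time, so that a macro in a schema-determining field is rejected before the pipeline is deployed.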
 

...