Checklist

  •  User Stories Documented
  •  User Stories Reviewed
  •  Design Reviewed
  •  APIs reviewed
  •  Release priorities assigned
  •  Test cases reviewed
  •  Blog post

Introduction 

A CDAP pipeline is composed of various plugins that users configure as they develop the pipeline. While building a CDAP pipeline, a pipeline developer can provide invalid plugin configurations. For example, the BigQuery sink plugin can have a schema that does not match the underlying BigQuery table. A CDAP pipeline developer can use the new validation endpoint to validate stages before deploying the pipeline. To fail fast, the validation endpoint should return all the validation errors for a given stage in a single call.

The data pipeline app exposes various exception types to plugins so that the appropriate exception is thrown while validating a plugin. New exception types may be introduced in future releases. When plugins that use new exception types are pushed to the Hub, the data pipeline artifacts must be upgraded for every new exception type. A better approach is to modify the data pipeline app so that its artifacts do not need to be replaced for every new exception type.

Goals

  • To fail fast, introduce a new API to collect multiple error messages from plugins at configure time.
  • Decouple the various validation exception types from the data pipeline app.
  • Instrument plugins to use this API to return multiple error messages to the validation endpoint.

User Stories 

  • As a CDAP pipeline developer, when I validate a stage, I expect all invalid config properties and schema fields to be highlighted in the CDAP UI with an appropriate error message.
  • As a plugin developer, I should be able to use new validation exception types without replacing data pipeline app artifacts. 

API Changes for Plugin Validation

Approach 1

Surface multiple validation errors from plugin

To collect multiple validation errors from a stage, StageConfigurer, MultiInputStageConfigurer and MultiOutputStageConfigurer can be modified as below. If any validation errors are added to the stage configurer, pipeline deployment fails and all the errors are returned in the response of the stage validation REST endpoint.

Code Block
languagejava
titleStageConfigurer.java
public interface StageConfigurer {

  /**
   * get the input schema for this stage, or null if it is unknown
   *
   * @return input schema
   */
  @Nullable
  Schema getInputSchema();

  /**
   * set the output schema for this stage, or null if it is unknown
   *
   * @param outputSchema output schema for this stage
   */
  void setOutputSchema(@Nullable Schema outputSchema);

  /**
   * set the error schema for this stage, or null if it is unknown.
   * If no error schema is set, it will default to the input schema for the stage. Note that since source
   * plugins do not have an input schema, it will default to null for sources.
   *
   * @param errorSchema error schema for this stage
   */
  void setErrorSchema(@Nullable Schema errorSchema);

  /**
   * add a validation error for this stage to the configurer if the pipeline stage is invalid.
   *
   * @param error {@link ValidationException} when a pipeline stage is invalid for any reason.
   */
  void addValidationError(ValidationException error);
}
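The fail-fast contract above can be sketched without CDAP dependencies. A configurer accumulates every error instead of throwing on the first one, and a deploy-time check fails if anything was recorded. This is a minimal sketch under stated assumptions: CollectingStageConfigurer, ValidationFailedException and Deployer are hypothetical names, the schema methods are omitted, and ValidationException is reduced to its message.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Simplified stand-in for the proposed ValidationException (type and properties omitted).
class ValidationException extends RuntimeException {
  ValidationException(String message) {
    super(message);
  }
}

// Hypothetical configurer that records every validation error a plugin adds.
class CollectingStageConfigurer {
  private final List<ValidationException> errors = new ArrayList<>();

  void addValidationError(ValidationException error) {
    errors.add(error);
  }

  // Read-only view of the collected errors, in the order they were added.
  List<ValidationException> getValidationErrors() {
    return Collections.unmodifiableList(errors);
  }
}

// Hypothetical deploy-time failure that carries every collected error at once.
class ValidationFailedException extends RuntimeException {
  private final List<ValidationException> errors;

  ValidationFailedException(List<ValidationException> errors) {
    super(errors.size() + " validation error(s) found");
    this.errors = errors;
  }

  List<ValidationException> getErrors() {
    return errors;
  }
}

class Deployer {
  // Throws if the stage reported at least one error; returns normally otherwise.
  static void checkDeployable(CollectingStageConfigurer configurer) {
    List<ValidationException> errors = configurer.getValidationErrors();
    if (!errors.isEmpty()) {
      throw new ValidationFailedException(new ArrayList<>(errors));
    }
  }
}
```

The failure carries the whole list, so the validation endpoint could return every error from a stage in one response instead of one error per round trip.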

Decoupling exception types from data pipeline app

A new ValidationException is introduced to . It carries the type of the exception and any additional properties for that exception. Each specific type of plugin validation exception can extend this class in a new CDAP module. This makes it easy to add new exception types for plugin validation without updating the data pipeline artifact for each new type.

Code Block
languagejava
titleValidationException.java
/**
 * Represents some sort of error that occurred during validation.
 */
@Beta
public class ValidationException extends RuntimeException {
  private static final String TYPE = "ERROR";
  protected final Map<String, String> props = new HashMap<>();

  public ValidationException(String message) {
    super(message);
  }

  public ValidationException(String message, Throwable cause) {
    super(message, cause);
  }

  /**
   * Returns the type of the validation exception. Type has to be unique for every validation exception.
   */
  public String getType() {
    return TYPE;
  }

  /**
   * Returns additional properties for the error.
   */
  public Map<String, String> getProperties() {
    return props;
  }
}
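The data pipeline app only needs getType(), getMessage() and getProperties(), so it can serialize any subclass without compiling against it. That is what lets new exception types ship without a new artifact. Below is a minimal sketch. It assumes a JSON shape in which the properties are nested under a "properties" key, since the exact response format is not specified here. ErrorSerializer is a hypothetical helper. InvalidConnectionException is a hypothetical type standing in for one added later in the separate module. The JSON is hand-built only to keep the example dependency-free.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Trimmed copy of the Approach 1 base class; properties are exposed as a read-only view.
class ValidationException extends RuntimeException {
  private static final String TYPE = "ERROR";
  protected final Map<String, String> props = new HashMap<>();

  ValidationException(String message) {
    super(message);
  }

  public String getType() {
    return TYPE;
  }

  public Map<String, String> getProperties() {
    return Collections.unmodifiableMap(props);
  }
}

// Hypothetical subclass from the new module; ErrorSerializer never references it.
class InvalidConnectionException extends ValidationException {
  InvalidConnectionException(String message, String stage, String connection) {
    super(message);
    props.put("stage", stage);
    props.put("connection", connection);
  }

  @Override
  public String getType() {
    return "INVALID_CONNECTION";
  }
}

// Hypothetical helper: renders any ValidationException from its type, message and properties.
class ErrorSerializer {
  static String toJson(List<? extends ValidationException> errors) {
    StringBuilder sb = new StringBuilder("[");
    for (int i = 0; i < errors.size(); i++) {
      ValidationException e = errors.get(i);
      if (i > 0) {
        sb.append(',');
      }
      sb.append("{\"type\":").append(quote(e.getType()))
        .append(",\"message\":").append(quote(e.getMessage()))
        .append(",\"properties\":{");
      boolean first = true;
      // TreeMap sorts keys so the output order is deterministic.
      for (Map.Entry<String, String> p : new TreeMap<>(e.getProperties()).entrySet()) {
        if (!first) {
          sb.append(',');
        }
        first = false;
        sb.append(quote(p.getKey())).append(':').append(quote(p.getValue()));
      }
      sb.append("}}");
    }
    return sb.append(']').toString();
  }

  // Minimal JSON string escaping for quotes, backslashes and control characters.
  private static String quote(String s) {
    if (s == null) {
      return "null";
    }
    StringBuilder sb = new StringBuilder("\"");
    for (char c : s.toCharArray()) {
      if (c == '"' || c == '\\') {
        sb.append('\\').append(c);
      } else if (c < 0x20) {
        sb.append(String.format("\\u%04x", (int) c));
      } else {
        sb.append(c);
      }
    }
    return sb.append('"').toString();
  }
}
```

Nesting the properties under their own key avoids clashes with the type and message fields. Flattening them would also work if the endpoint reserved those two names.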


A new CDAP module will be introduced to hold new exception types for plugins. Plugins will declare this module as a compile-time dependency, so when a new exception type is added, plugins can use it simply by installing the module locally. Below are some of the exception types that will be added to this module.

Code Block
/**
 * Represents some sort of error that occurred during stage validation.
 */
@Beta
public class InvalidStageException extends ValidationException {
  private static final String TYPE = "STAGE_ERROR";

  public InvalidStageException(String message, String stage) {
    super(message);
    props.put("stage", stage);
  }

  public InvalidStageException(String message, Throwable cause, String stage) {
    super(message, cause);
    props.put("stage", stage);
  }

  @Override
  public String getType() {
    return TYPE;
  }
}
Code Block
languagejava
titleInvalidStagePropertyException.java
/**
 * Represents invalid stage property error that occurred during stage validation.
 */
public class InvalidStagePropertyException extends InvalidStageException {
  private static final String TYPE = "INVALID_FIELD";

  public InvalidStagePropertyException(String message, String stage, String property) {
    super(message, stage);
    props.put("property", property);
  }

  public InvalidStagePropertyException(String message, Throwable cause, String stage, String property) {
    super(message, cause, stage);
    props.put("property", property);
  }

  @Override
  public String getType() {
    return TYPE;
  }
}

Sources and sinks can have a schema that does not match the underlying storage. A new exception type can be introduced so that invalid schema fields can be highlighted when a schema mismatch occurs:

Code Block
languagejava
titleInvalidSchemaFieldException.java
/**
 * Represents schema mismatch that occurred during stage validation.
 */
@Beta
public class InvalidSchemaFieldException extends InvalidStageException {
  private static final String TYPE = "INVALID_SCHEMA";

  public InvalidSchemaFieldException(String message, String stage, String field) {
    super(message, stage);
    props.put("field", field);
  }

  public InvalidSchemaFieldException(String message, Throwable cause, String stage, String field) {
    super(message, cause, stage);
    props.put("field", field);
  }

  @Override
  public String getType() {
    return TYPE;
  }
}

The plugins can add these exceptions to the stage configurer as below:

Code Block
@Override
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
  pipelineConfigurer.createDataset(conf.destinationFileset, FileSet.class);
  StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();
  // STAGE_NAME is a placeholder; this design does not specify how a plugin obtains its stage name.
  try {
    Pattern.compile(conf.filterRegex);
  } catch (PatternSyntaxException e) {
    // Record the error and keep validating, so that all errors are reported together.
    stageConfigurer.addValidationError(new InvalidStagePropertyException(e.getMessage(), STAGE_NAME, "filterRegex"));
  }
  if (conf.sourceFileset.equals(conf.destinationFileset)) {
    stageConfigurer.addValidationError(new ValidationException("source and destination filesets must be different"));
  }
}
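The important property of the example above is that both checks run even when the first one fails. Here is a runnable sketch of the same checks. It assumes a hypothetical FileMoveConfig with the three fields the example uses. A returned list stands in for StageConfigurer, and the exception classes are trimmed stand-ins for the proposed types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;

// Minimal stand-ins for the proposed exception types (properties kept as plain fields).
class ValidationException extends RuntimeException {
  ValidationException(String message) {
    super(message);
  }
}

class InvalidStagePropertyException extends ValidationException {
  final String stage;
  final String property;

  InvalidStagePropertyException(String message, String stage, String property) {
    super(message);
    this.stage = stage;
    this.property = property;
  }
}

// Hypothetical plugin config with the fields used in the example above.
class FileMoveConfig {
  final String filterRegex;
  final String sourceFileset;
  final String destinationFileset;

  FileMoveConfig(String filterRegex, String sourceFileset, String destinationFileset) {
    this.filterRegex = filterRegex;
    this.sourceFileset = sourceFileset;
    this.destinationFileset = destinationFileset;
  }
}

class FileMoveValidator {
  // Runs every check and records each failure instead of stopping at the first one.
  // The returned list plays the role of the errors added to the stage configurer.
  static List<ValidationException> validate(String stageName, FileMoveConfig conf) {
    List<ValidationException> errors = new ArrayList<>();
    try {
      Pattern.compile(conf.filterRegex);
    } catch (PatternSyntaxException e) {
      errors.add(new InvalidStagePropertyException(e.getDescription(), stageName, "filterRegex"));
    }
    if (conf.sourceFileset.equals(conf.destinationFileset)) {
      errors.add(new ValidationException("source and destination filesets must be different"));
    }
    return errors;
  }
}
```

With an unclosed regex such as "[" and identical filesets, both errors come back together rather than only the first.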


Approach 2

In Approach 1, the contract of the getProperties() method is to return a Map<String, String>. For example, a stage validation exception can include stage → stage_name as a key-value pair in the properties map. This map will be included in the JSON response of the validation endpoint along with the type and message. This contract limits the types of properties that can be added to the JSON response: for example, once a pipeline validation endpoint is implemented, the additional properties of an error may include non-string objects.

Another option is to return a StructuredRecord with the additional properties. A StructuredRecord places no limit on the types of properties that can be returned in the JSON response, but each specific exception has to build a StructuredRecord for its additional properties. Given the limitation of the Map<String, String> contract, using a StructuredRecord for exception properties is the better approach.

Code Block
languagejava
titleValidationException.java
/**
 * Represents some sort of error that occurred during validation.
 */
@Beta
public class ValidationException extends RuntimeException {
  private static final String TYPE = "ERROR";

  public ValidationException(String message) {
    super(message);
  }

  public ValidationException(String message, Throwable cause) {
    super(message, cause);
  }

  /**
   * Returns the type of the validation exception. Type has to be unique for every validation exception.
   */
  public String getType() {
    return TYPE;
  }

  /**
   * Returns additional properties for the error as a {@link StructuredRecord}. If there are no additional properties
   * for this error, then return null.
   */
  @Nullable
  public StructuredRecord getProperties() {
    return null;
  }
}
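The motivation for Approach 2 can be illustrated without CDAP. Take a hypothetical pipeline-level error that lists several stages. Under a Map<String, String> contract it must squeeze the list into one string (for example "a,b"), whereas typed values map to native JSON arrays and numbers. The sketch below does not use StructuredRecord. Plain Java maps and lists stand in for typed properties, and JsonValues is a hypothetical hand-rolled writer, not a CDAP API.

```java
import java.util.List;
import java.util.Map;

// Hypothetical writer: renders property values as JSON, keeping lists, maps,
// numbers and booleans in their native JSON form.
class JsonValues {
  static String write(Object value) {
    if (value == null) {
      return "null";
    }
    if (value instanceof Number || value instanceof Boolean) {
      // Non-finite doubles (NaN, Infinity) are not valid JSON; not handled in this sketch.
      return value.toString();
    }
    if (value instanceof List) {
      StringBuilder sb = new StringBuilder("[");
      List<?> list = (List<?>) value;
      for (int i = 0; i < list.size(); i++) {
        if (i > 0) {
          sb.append(',');
        }
        sb.append(write(list.get(i)));
      }
      return sb.append(']').toString();
    }
    if (value instanceof Map) {
      StringBuilder sb = new StringBuilder("{");
      boolean first = true;
      for (Map.Entry<?, ?> entry : ((Map<?, ?>) value).entrySet()) {
        if (!first) {
          sb.append(',');
        }
        first = false;
        sb.append(write(String.valueOf(entry.getKey()))).append(':').append(write(entry.getValue()));
      }
      return sb.append('}').toString();
    }
    // Anything else becomes a JSON string; only backslashes and quotes are escaped here.
    return "\"" + value.toString().replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
  }
}
```

With string-only properties, a stage list serializes as {"stages":"a,b"}, and every client must know the comma convention. With typed properties, it serializes as {"stages":["a","b"]}.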


Below are the corresponding specific exception types built on the Approach 2 ValidationException API:

Code Block
languagejava
titleInvalidStageException.java
/**
 * Represents some sort of error that occurred during stage validation.
 */
@Beta
public class InvalidStageException extends ValidationException {
  private static final String TYPE = "STAGE_ERROR";
  private final Schema schema;
  private final StructuredRecord record;

  public InvalidStageException(String message, String stage) {
    super(message);
    schema = Schema.recordOf("StageError", Schema.Field.of("stage", Schema.of(Schema.Type.STRING)));
    record = StructuredRecord.builder(schema).set("stage", stage).build();
  }

  public InvalidStageException(String message, Throwable cause, String stage) {
    super(message, cause);
    schema = Schema.recordOf("StageError", Schema.Field.of("stage", Schema.of(Schema.Type.STRING)));
    record = StructuredRecord.builder(schema).set("stage", stage).build();
  }

  @Override
  public String getType() {
    return TYPE;
  }

  @Nullable
  @Override
  public StructuredRecord getProperties() {
    return record;
  }
}
Code Block
languagejava
titleInvalidSchemaFieldException.java
/**
 * Represents schema mismatch that occurred during stage validation.
 */
@Beta
public class InvalidSchemaFieldException extends ValidationException {
  private static final String TYPE = "INVALID_SCHEMA";
  private final Schema schema;
  private final StructuredRecord record;

  public InvalidSchemaFieldException(String message, String stage, String field) {
    super(message);
    schema = Schema.recordOf("InvalidSchema",
                             Schema.Field.of("stage", Schema.of(Schema.Type.STRING)),
                             Schema.Field.of("field", Schema.of(Schema.Type.STRING)));
    record = StructuredRecord.builder(schema)
      .set("stage", stage)
      .set("field", field)
      .build();
  }

  public InvalidSchemaFieldException(String message, Throwable cause, String stage, String field) {
    super(message, cause);
    schema = Schema.recordOf("InvalidSchema",
                             Schema.Field.of("stage", Schema.of(Schema.Type.STRING)),
                             Schema.Field.of("field", Schema.of(Schema.Type.STRING)));
    record = StructuredRecord.builder(schema)
      .set("stage", stage)
      .set("field", field)
      .build();
  }

  @Override
  public String getType() {
    return TYPE;
  }

  @Nullable
  @Override
  public StructuredRecord getProperties() {
    return record;
  }
}
Code Block
languagejava
titleInvalidStagePropertyException.java
/**
 * Represents invalid stage property error that occurred during stage validation.
 */
public class InvalidStagePropertyException extends ValidationException {
  private static final String TYPE = "INVALID_FIELD";
  private final Schema schema;
  private final StructuredRecord record;

  public InvalidStagePropertyException(String message, String stage, String property) {
    super(message);
    schema = Schema.recordOf("InvalidField",
                             Schema.Field.of("stage", Schema.of(Schema.Type.STRING)),
                             Schema.Field.of("property", Schema.of(Schema.Type.STRING)));
    record = StructuredRecord.builder(schema).set("stage", stage).set("property", property).build();
  }

  public InvalidStagePropertyException(String message, Throwable cause, String stage, String property) {
    super(message, cause);
    schema = Schema.recordOf("InvalidField",
                             Schema.Field.of("stage", Schema.of(Schema.Type.STRING)),
                             Schema.Field.of("property", Schema.of(Schema.Type.STRING)));
    record = StructuredRecord.builder(schema).set("stage", stage).set("property", property).build();
  }

  @Override
  public String getType() {
    return TYPE;
  }

  @Nullable
  @Override
  public StructuredRecord getProperties() {
    return record;
  }
}


Deprecated APIs

The following exception APIs will be deprecated:

  • ValidationError
  • ValidationErrorSerDe
  • PluginNotFoundError
  • InvalidConfigPropertyError
  • StageValidationError
  • StageValidationResponse

Impact on UI

The UI should be able to handle new error types as they are introduced. For example, for invalid stage properties, the UI should highlight all the invalid properties of a given stage. For a schema mismatch, the UI should highlight the mismatched schema fields. The UI should also handle types that are not
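The highlighting rules above can be driven entirely by the error type. The following minimal Java sketch models only that dispatch logic, not the UI itself. ErrorView and HighlightPlanner are hypothetical names. The type strings and property keys are taken from the Approach 1 exceptions. The default branch is an assumed fallback, not specified behavior.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical client-side view of one validation error: type, message and string properties.
class ErrorView {
  final String type;
  final String message;
  final Map<String, String> properties;

  ErrorView(String type, String message, Map<String, String> properties) {
    this.type = type;
    this.message = message;
    this.properties = properties;
  }
}

class HighlightPlanner {
  // Turns each error into a highlight target based on its type.
  static List<String> plan(List<ErrorView> errors) {
    List<String> targets = new ArrayList<>();
    for (ErrorView e : errors) {
      switch (e.type) {
        case "INVALID_FIELD":
          targets.add("property " + e.properties.get("property") + ": " + e.message);
          break;
        case "INVALID_SCHEMA":
          targets.add("schema field " + e.properties.get("field") + ": " + e.message);
          break;
        default:
          // Assumed fallback: unrecognized types are still shown, at stage level.
          targets.add("stage: " + e.message);
          break;
      }
    }
    return targets;
  }
}
```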

Test Scenarios

Test ID | Test Description | Expected Results

Releases

Release 6.1.0

Related Work

  • Work #1
  • Work #2
  • Work #3

Future work
