Checklist

  •  User Stories Documented
  •  User Stories Reviewed
  •  Design Reviewed
  •  APIs reviewed
  •  Release priorities assigned
  •  Test cases reviewed
  •  Blog post

Introduction 

A CDAP pipeline is composed of various CDAP plugins. These plugins handle error situations caused by invalid inputs or configurations. While developing CDAP pipelines, a pipeline developer can provide invalid plugin configurations. For example, the BigQuery sink plugin can have a schema that does not match the underlying BigQuery table. A CDAP pipeline developer can use the new validation endpoint to validate the stages before deploying the pipeline. In order to fail fast, the validation endpoint should return all the validation errors from a given stage when it is called.

The data pipeline app exposes various exceptions to plugins so that an appropriate exception is thrown while validating the plugins. In future releases, new exception types can be introduced. The data pipeline app should be modified so that app artifacts do not need to be replaced for every new type of exception.

Goals

  • To fail fast, introduce a new API to collect multiple error messages from plugins at configure time
  • Decouple the various validation exception types from the data pipeline app
  • Instrument plugins to use this API to return multiple error messages for the validation endpoint

User Stories 

  • As a CDAP pipeline developer, if a pipeline contains plugin configurations that are invalid, I would like it to fail early with an appropriate error message specific to the field or config property.
  • As a CDAP pipeline developer, I should be able to incorporate new types of exceptions that are exposed by the data pipeline app in custom plugins without replacing data pipeline artifacts.

API Changes for Plugin Validation

Plugin Validation

The plugin validation endpoint is used to surface all stage-level errors at once. To collect multiple validation errors from a stage, StageConfigurer, MultiInputStageConfigurer and MultiOutputStageConfigurer can be modified as below. If one or more errors are added to the stage configurer, the pipeline deployment will fail.

Code Block
languagejava
titleStageConfigurer.java
public interface StageConfigurer {

  /**
   * get the input schema for this stage, or null if it is unknown
   *
   * @return input schema
   */
  @Nullable
  Schema getInputSchema();

  /**
   * set the output schema for this stage, or null if it is unknown
   *
   * @param outputSchema output schema for this stage
   */
  void setOutputSchema(@Nullable Schema outputSchema);

  /**
   * set the error schema for this stage, or null if it is unknown.
   * If no error schema is set, it will default to the input schema for the stage. Note that since source
   * plugins do not have an input schema, it will default to null for sources.
   *
   * @param errorSchema error schema for this stage
   */
  void setErrorSchema(@Nullable Schema errorSchema);

  /**
   * add errors for this stage to the configurer if pipeline stage is invalid.
   *
   * @param error {@link ValidationException} when a pipeline stage is invalid for any reason.
   */
  void addStageError(ValidationException error);
}
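
The collect-then-fail behaviour described above can be sketched as follows. This is only an illustration under stated assumptions: `CollectingStageConfigurer`, `getErrors()` and `failIfInvalid()` are hypothetical names that do not appear in the proposal, and the nested `ValidationException` is a simplified stand-in for the class proposed on this page.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Sketch only: none of these classes are the actual CDAP implementation.
 */
public class CollectingConfigurerSketch {

  /** Simplified stand-in for the ValidationException proposed in this document. */
  public static class ValidationException extends RuntimeException {
    public ValidationException(String message) {
      super(message);
    }
  }

  /** Hypothetical configurer that records errors instead of throwing on the first one. */
  public static class CollectingStageConfigurer {
    private final List<ValidationException> errors = new ArrayList<>();

    public void addStageError(ValidationException error) {
      errors.add(error);
    }

    public List<ValidationException> getErrors() {
      return Collections.unmodifiableList(errors);
    }

    /** Hypothetical deploy-time check: fail once, reporting every collected error. */
    public void failIfInvalid() {
      if (errors.isEmpty()) {
        return;
      }
      StringBuilder message = new StringBuilder("Stage is invalid:");
      for (ValidationException error : errors) {
        message.append(" [").append(error.getMessage()).append(']');
      }
      throw new IllegalArgumentException(message.toString());
    }
  }

  public static void main(String[] args) {
    CollectingStageConfigurer configurer = new CollectingStageConfigurer();
    configurer.addStageError(new ValidationException("filterRegex is not a valid pattern"));
    configurer.addStageError(new ValidationException("source and destination filesets must be different"));
    try {
      configurer.failIfInvalid();
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

The point of the sketch is that a plugin keeps validating after the first problem, and the caller decides when to turn the collected errors into a failure.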

ValidationException can be defined as below:

Code Block
/**
 * Exception thrown while validating a plugin.
 */
public class ValidationException extends RuntimeException {
  private static final String ERROR = "ERROR";
  protected final String type;

  public ValidationException(String message) {
    super(message);
    this.type = getType();
  }

  public ValidationException(String message, Throwable cause) {
    super(message, cause);
    this.type = getType();
  }

  /**
   * Returns type of the exception.
   */
  public String getType() {
    return ERROR;
  }

  /**
   * Returns json string which should include type, message and other attributes of the validation exception.
   */
  public String toJson() {
    // return json string containing type, error message
    return "{\"message\" : \"" + getMessage() + "\", \"type\" : \"" + getType() + "\"}";
  }
}
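
One thing to watch in a string-concatenating toJson() is that a message containing a double quote, backslash or newline (regex compilation errors often do) would produce malformed JSON. A minimal sketch of an escaping helper is shown below. `JsonEscapeSketch` and `escape` are hypothetical names, and treating a null message as an empty string is an assumption; a real implementation would more likely delegate to a JSON library.

```java
/**
 * Sketch only: a hand-rolled JSON string escaper, not part of the proposal.
 */
public class JsonEscapeSketch {

  /** Escapes the characters that would break a JSON string literal. */
  public static String escape(String value) {
    if (value == null) {
      // Assumption: a missing message is rendered as an empty string.
      return "";
    }
    StringBuilder out = new StringBuilder();
    for (int i = 0; i < value.length(); i++) {
      char c = value.charAt(i);
      switch (c) {
        case '"': out.append("\\\""); break;
        case '\\': out.append("\\\\"); break;
        case '\n': out.append("\\n"); break;
        case '\r': out.append("\\r"); break;
        case '\t': out.append("\\t"); break;
        default:
          if (c < 0x20) {
            // Remaining control characters use the \\uXXXX form.
            out.append(String.format("\\u%04x", (int) c));
          } else {
            out.append(c);
          }
      }
    }
    return out.toString();
  }

  /** Same layout as toJson() in the proposal, with both values escaped. */
  public static String toJson(String message, String type) {
    return "{\"message\" : \"" + escape(message) + "\", \"type\" : \"" + escape(type) + "\"}";
  }

  public static void main(String[] args) {
    System.out.println(toJson("Unclosed group near \"(abc\"", "ERROR"));
  }
}
```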

A new module will be introduced so that whenever a new type of exception is added, the data pipeline artifact does not need to be updated. Below are some of the exceptions that will be added to the new module.

Code Block
languagejava
titleInvalidStagePropertyException.java
/**
 * Exception used for invalid stage property.
 */
public class InvalidStagePropertyException extends ValidationException {
  private final String property;
  private static final String STAGE_ERROR = "STAGE_ERROR";

  public InvalidStagePropertyException(String message, String property) {
    super(message);
    this.property = property;
  }

  public InvalidStagePropertyException(String message, Throwable cause, String property) {
    super(message, cause);
    this.property = property;
  }

  public String getProperty() {
    return property;
  }

  @Override
  public String getType() {
    return STAGE_ERROR;
  }

  @Override
  public String toJson() {
    return "{\"message\" : \"" + getMessage() +
      "\", \"type\" : \"" + getType() +
      "\", \"property\" : \"" + property + "\"}";
  }
}



Sources and sinks can have a schema that does not match the underlying storage. A new type of exception can be introduced so that invalid schema fields can be highlighted when a schema mismatch occurs:

Code Block
languagejava
titleInvalidSchemaFieldException.java
/**
 * Exception used for invalid schema field.
 */
public class InvalidSchemaFieldException extends ValidationException {
  private final String field;
  private static final String SCHEMA_ERROR = "SCHEMA_ERROR";

  public InvalidSchemaFieldException(String message, String field) {
    super(message);
    this.field = field;
  }

  public InvalidSchemaFieldException(String message, Throwable cause, String field) {
    super(message, cause);
    this.field = field;
  }

  public String getField() {
    return field;
  }

  @Override
  public String getType() {
    return SCHEMA_ERROR;
  }

  @Override
  public String toJson() {
    return "{\"message\" : \"" + getMessage() +
      "\", \"type\" : \"" + getType() +
      "\", \"field\" : \"" + field + "\"}";
  }
}
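
As an illustration of how a source or sink might report one error per mismatched field, the sketch below compares a configured schema with the schema of the underlying table. It makes several assumptions: both schemas are modelled as field name to type name maps rather than CDAP Schema objects, `findMismatches` is a hypothetical helper, and the nested exception is a simplified stand-in without the JSON methods.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Sketch only: field-by-field schema comparison, not the actual plugin code.
 */
public class SchemaMismatchSketch {

  /** Simplified stand-in for the InvalidSchemaFieldException proposed above. */
  public static class InvalidSchemaFieldException extends RuntimeException {
    private final String field;

    public InvalidSchemaFieldException(String message, String field) {
      super(message);
      this.field = field;
    }

    public String getField() {
      return field;
    }
  }

  /**
   * Returns one error for every configured field that is missing from the table
   * or has a different type there. An empty list means the schemas agree.
   */
  public static List<InvalidSchemaFieldException> findMismatches(Map<String, String> configured,
                                                                 Map<String, String> table) {
    List<InvalidSchemaFieldException> errors = new ArrayList<>();
    for (Map.Entry<String, String> entry : configured.entrySet()) {
      String field = entry.getKey();
      String tableType = table.get(field);
      if (tableType == null) {
        errors.add(new InvalidSchemaFieldException(
          "Field '" + field + "' does not exist in the table", field));
      } else if (!tableType.equals(entry.getValue())) {
        errors.add(new InvalidSchemaFieldException(
          "Field '" + field + "' is of type '" + entry.getValue()
            + "' but the table expects '" + tableType + "'", field));
      }
    }
    return errors;
  }

  public static void main(String[] args) {
    Map<String, String> configured = new LinkedHashMap<>();
    configured.put("id", "long");
    configured.put("name", "string");
    Map<String, String> table = new LinkedHashMap<>();
    table.put("id", "long");
    table.put("name", "bytes");
    for (InvalidSchemaFieldException error : findMismatches(configured, table)) {
      System.out.println(error.getField() + ": " + error.getMessage());
    }
  }
}
```

Each returned exception could then be passed to addStageError so that the UI can highlight every offending field in one validation call.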


To validate plugins, this API can be used in plugins as below:

Code Block
@Override
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
  pipelineConfigurer.createDataset(conf.destinationFileset, FileSet.class);
  StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();
  try {
    Pattern.compile(conf.filterRegex);
  } catch (Exception e) {  
    stageConfigurer.addStageError(new InvalidStagePropertyException(e.getMessage(), "filterRegex"));
  }
  if (conf.sourceFileset.equals(conf.destinationFileset)) {
    stageConfigurer.addStageError(new ValidationException("source and destination filesets must be different"));
  }
}
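
On the application side, the decoupling goal suggests that collected errors can be rendered by calling only the base-class toJson(), so a subtype added later needs no change in the app. The following is a rough sketch of that idea. `toJsonArray` is a hypothetical helper, the array layout is an assumption rather than the documented response format of the validation endpoint, and the nested classes are trimmed copies of the ones proposed on this page.

```java
import java.util.Arrays;
import java.util.List;

/**
 * Sketch only: serializing errors without knowing their concrete types.
 */
public class ErrorSerializationSketch {

  /** Trimmed copy of the proposed base exception. */
  public static class ValidationException extends RuntimeException {
    public ValidationException(String message) {
      super(message);
    }

    public String getType() {
      return "ERROR";
    }

    public String toJson() {
      return "{\"message\" : \"" + getMessage() + "\", \"type\" : \"" + getType() + "\"}";
    }
  }

  /** Trimmed copy of a subtype that lives outside the app artifact. */
  public static class InvalidStagePropertyException extends ValidationException {
    private final String property;

    public InvalidStagePropertyException(String message, String property) {
      super(message);
      this.property = property;
    }

    @Override
    public String getType() {
      return "STAGE_ERROR";
    }

    @Override
    public String toJson() {
      return "{\"message\" : \"" + getMessage() + "\", \"type\" : \"" + getType()
        + "\", \"property\" : \"" + property + "\"}";
    }
  }

  /** Hypothetical app-side helper: relies only on the base-class contract. */
  public static String toJsonArray(List<? extends ValidationException> errors) {
    StringBuilder out = new StringBuilder("[");
    for (int i = 0; i < errors.size(); i++) {
      if (i > 0) {
        out.append(", ");
      }
      out.append(errors.get(i).toJson());
    }
    return out.append("]").toString();
  }

  public static void main(String[] args) {
    System.out.println(toJsonArray(Arrays.asList(
      new ValidationException("source and destination filesets must be different"),
      new InvalidStagePropertyException("invalid pattern", "filterRegex"))));
  }
}
```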


Impact on UI

The UI should be able to handle new error types that are introduced.

Test Scenarios

Test ID | Test Description | Expected Results

Releases

Release 6.1.0

Related Work

  • Work #1
  • Work #2
  • Work #3

Future work