Checklist

  •  User Stories Documented
  •  User Stories Reviewed
  •  Design Reviewed
  •  APIs reviewed
  •  Release priorities assigned
  •  Test cases reviewed
  •  Blog post

Introduction 

A CDAP pipeline is composed of various plugins that users configure while developing the pipeline. While building a pipeline, a developer may provide invalid plugin configurations. For example, the BigQuery sink plugin can have a schema that does not match the underlying BigQuery table. Pipeline developers can use the new validation endpoint to validate stages before deploying the pipeline. To fail fast, the validation endpoint should return all the validation errors for a given stage in a single call.

The data pipeline app exposes various error types for plugin validation, and new error types may be introduced in future releases. Because the validation errors are defined in the data pipeline app itself, the data pipeline artifacts must be upgraded for every new type of error whenever plugins that use it are pushed to the Hub. A better approach is to modify the data pipeline app so that its artifacts do not need to be replaced for every new type of error.

Goals

  • To fail fast, introduce a new API to collect multiple error messages from plugins at configure time
  • Decouple the various validation error types from the data pipeline app
  • Instrument plugins to use this API to return multiple error messages to the validation endpoint

User Stories 

  • As a CDAP pipeline developer, when I validate a stage, I expect all invalid config properties and schema fields to be highlighted in the CDAP UI with an appropriate error message.
  • As a plugin developer, I should be able to report all validation errors while configuring the plugin, for a better user experience.
  • As a plugin developer, I should be able to use new validation error types without replacing the data pipeline app artifacts.

API Changes for Plugin Validation

Approach 1

Collect multiple errors from plugins 

To collect multiple validation errors from a stage, StageConfigurer, MultiInputStageConfigurer and MultiOutputStageConfigurer can be modified as shown below. If any validation errors are added to the stage configurer, pipeline deployment fails and all the errors are returned in the response of the stage validation REST endpoint.

Code block (Java): StageConfigurer.java
public interface StageConfigurer {

  ...

  /**
   * Adds a validation error for this stage. Call it once for each problem found, so that
   * all errors for the stage can be reported together.
   *
   * @param error {@link ValidationError} describing why the pipeline stage is invalid
   */
  void addValidationError(ValidationError error);
}
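As a sketch of the intended semantics (not the CDAP implementation), the example below uses hypothetical stand-ins: `CollectingStageConfigurer` records every error instead of failing on the first one, and `failIfInvalid` plays the role of the deployment check, failing only after all errors for the stage have been collected.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class CollectingConfigurerSketch {

  /** Simplified, hypothetical stand-in for the proposed ValidationError class. */
  static class ValidationError {
    private final String message;

    ValidationError(String message) {
      this.message = message;
    }

    String getMessage() {
      return message;
    }
  }

  /** Hypothetical configurer that records every error instead of failing on the first one. */
  static class CollectingStageConfigurer {
    private final List<ValidationError> errors = new ArrayList<>();

    void addValidationError(ValidationError error) {
      errors.add(error);
    }

    List<ValidationError> getValidationErrors() {
      return Collections.unmodifiableList(errors);
    }
  }

  /** Hypothetical exception that carries all collected errors back to the caller. */
  static class StageValidationException extends RuntimeException {
    private final List<ValidationError> errors;

    StageValidationException(List<ValidationError> errors) {
      super(errors.size() + " validation error(s)");
      this.errors = errors;
    }

    List<ValidationError> getErrors() {
      return errors;
    }
  }

  /** Deployment check: runs after the plugin has configured itself and fails with every error. */
  static void failIfInvalid(CollectingStageConfigurer configurer) {
    List<ValidationError> errors = configurer.getValidationErrors();
    if (!errors.isEmpty()) {
      throw new StageValidationException(new ArrayList<>(errors));
    }
  }

  public static void main(String[] args) {
    CollectingStageConfigurer configurer = new CollectingStageConfigurer();
    // The plugin reports each problem it finds and keeps going.
    configurer.addValidationError(new ValidationError("Invalid config for property 'port'"));
    configurer.addValidationError(new ValidationError("source and destination filesets must be different"));
    try {
      failIfInvalid(configurer);
    } catch (StageValidationException e) {
      // Both messages are printed, not just the first.
      for (ValidationError error : e.getErrors()) {
        System.out.println(error.getMessage());
      }
    }
  }
}
```

The design choice this mirrors is that adding an error is not itself a failure; the failure decision is deferred until the stage has been fully configured.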

Decoupling plugin error types from data pipeline app

A new ValidationError class is introduced to collect multiple validation errors in the stage configurer. The data pipeline app exposes this class to plugins. New error types can then be added to hydrator-common, which plugins include as a compile-time dependency. This approach makes it easy to add new types of validation errors for plugins without updating the data pipeline artifact for each new error type. The base ValidationError class is shown below.

Code block (Java): ValidationError.java
import io.cdap.cdap.api.annotation.Beta;
import java.util.Objects;

/**
 * Represents an error that occurred during validation.
 */
@Beta
public class ValidationError {
  private final String message;
  private final String type;

  /**
   * Creates an error with the provided error message.
   * @param message error message
   */
  public ValidationError(String message) {
    this.message = message;
    // Subclasses override getType() to return a constant, so calling it here is safe.
    this.type = getType();
  }

  /**
   * Returns the type of the error.
   */
  public String getType() {
    return "ERROR";
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    ValidationError error = (ValidationError) o;
    return message.equals(error.message) &&
      type.equals(error.type);
  }

  @Override
  public int hashCode() {
    return Objects.hash(message, type);
  }
}
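One way to see why new error types need no change to the app: if the validation endpoint serializes errors generically, for example with a reflection-based JSON serializer (an assumption; this design does not say how the response is built), then subclass fields such as `stage` and `property` appear in the response without the app referencing the subclass. The sketch below uses simplified copies of the classes and a hypothetical `toMap` helper in place of a JSON library.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ErrorSerializationSketch {

  /** Simplified copy of the proposed base class (without @Beta, equals and hashCode). */
  static class ValidationError {
    private final String message;
    private final String type;

    ValidationError(String message) {
      this.message = message;
      // Subclasses return a constant from getType(), so calling it here is safe.
      this.type = getType();
    }

    public String getType() {
      return "ERROR";
    }
  }

  /** A new error type defined outside the app, e.g. in a library that plugins depend on. */
  static class InvalidStagePropertyError extends ValidationError {
    private final String stage;
    private final String property;

    InvalidStagePropertyError(String message, String stage, String property) {
      super(message);
      this.stage = stage;
      this.property = property;
    }

    @Override
    public String getType() {
      return "INVALID_FIELD";
    }
  }

  /**
   * Hypothetical generic serializer: copies every instance field of the runtime class and its
   * superclasses into a map, so it never needs to know which subclass it was given.
   */
  static Map<String, Object> toMap(ValidationError error) {
    List<Class<?>> hierarchy = new ArrayList<>();
    for (Class<?> c = error.getClass(); c != Object.class; c = c.getSuperclass()) {
      hierarchy.add(0, c); // base-class fields come first
    }
    Map<String, Object> result = new LinkedHashMap<>();
    try {
      for (Class<?> c : hierarchy) {
        for (Field field : c.getDeclaredFields()) {
          if (Modifier.isStatic(field.getModifiers()) || field.isSynthetic()) {
            continue;
          }
          field.setAccessible(true);
          result.put(field.getName(), field.get(error));
        }
      }
    } catch (IllegalAccessException e) {
      throw new IllegalStateException(e);
    }
    return result;
  }

  public static void main(String[] args) {
    ValidationError error = new InvalidStagePropertyError("Invalid config for property 'port'", "src", "port");
    System.out.println(toMap(error));
  }
}
```

Here `toMap` only references the base class, so adding another subclass to a plugin-side library changes nothing in the code that builds the response.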


The following error types will be added to hydrator-common. Note that this list will keep evolving as new types of errors are added to the module.


Code block (Java): InvalidStageError.java
/**
 * Represents some sort of error that occurred during stage validation.
 */
@Beta
public class InvalidStageError extends ValidationError {
  private final String stage;

  /**
   * Creates an error that represents a stage validation error.
   *
   * @param message error message
   * @param stage name of the stage
   */
  public InvalidStageError(String message, String stage) {
    super(message);
    this.stage = stage;
  }

  @Override
  public String getType() {
    return "STAGE_ERROR";
  }
}

Code block (Java): InvalidStagePropertyError.java
/**
 * Represents an invalid stage property error that occurred during stage validation.
 */
@Beta
public class InvalidStagePropertyError extends InvalidStageError {
  private final String property;

  public InvalidStagePropertyError(String message, String stage, String property) {
    super(message, stage);
    this.property = property;
  }

  @Override
  public String getType() {
    return "INVALID_FIELD";
  }
}

Code block (Java): InvalidSchemaFieldError.java
/**
 * Represents a schema mismatch that occurred during stage validation.
 */
@Beta
public class InvalidSchemaFieldError extends InvalidStageError {
  private final String field;
  private final boolean isInputField;

  /**
   * Creates an error that represents a schema mismatch.
   *
   * @param message error message
   * @param stage stage of the field
   * @param field invalid field
   * @param isInputField if true, the field is from the input schema; otherwise it is from the output schema
   */
  public InvalidSchemaFieldError(String message, String stage, String field, boolean isInputField) {
    super(message, stage);
    this.field = field;
    this.isInputField = isInputField;
  }

  @Override
  public String getType() {
    return "INVALID_SCHEMA";
  }
}
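For the BigQuery example from the introduction, a sink could compare its input schema with the table schema and report one InvalidSchemaFieldError per mismatching field. The sketch below is hypothetical: schemas are simplified to maps from field name to type name, and the error class is a minimal stand-in rather than the hydrator-common class.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SchemaCheckSketch {

  /** Minimal, hypothetical stand-in for InvalidSchemaFieldError. */
  static class InvalidSchemaFieldError {
    final String message;
    final String stage;
    final String field;
    final boolean isInputField;

    InvalidSchemaFieldError(String message, String stage, String field, boolean isInputField) {
      this.message = message;
      this.stage = stage;
      this.field = field;
      this.isInputField = isInputField;
    }
  }

  /**
   * Reports every input field that is missing from the table or has a different type,
   * instead of stopping at the first mismatch.
   */
  static List<InvalidSchemaFieldError> checkInputSchema(String stage,
                                                        Map<String, String> inputSchema,
                                                        Map<String, String> tableSchema) {
    List<InvalidSchemaFieldError> errors = new ArrayList<>();
    for (Map.Entry<String, String> entry : inputSchema.entrySet()) {
      String name = entry.getKey();
      String expected = tableSchema.get(name);
      if (expected == null) {
        errors.add(new InvalidSchemaFieldError(
          String.format("Field '%s' does not exist in the table", name), stage, name, true));
      } else if (!expected.equals(entry.getValue())) {
        errors.add(new InvalidSchemaFieldError(
          String.format("Invalid schema for the field '%s': expected %s but found %s",
                        name, expected, entry.getValue()), stage, name, true));
      }
    }
    return errors;
  }

  public static void main(String[] args) {
    Map<String, String> input = new LinkedHashMap<>();
    input.put("id", "long");
    input.put("name", "int");
    input.put("extra", "string");
    Map<String, String> table = new LinkedHashMap<>();
    table.put("id", "long");
    table.put("name", "string");
    for (InvalidSchemaFieldError e : checkInputSchema("sink", input, table)) {
      System.out.println(e.field + ": " + e.message);
    }
  }
}
```

Because the fields being checked come from the input schema, each error is created with `isInputField` set to true; a check on the output schema would pass false.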

Usage of the API in Plugins

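Code block (Java)
@Override
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
  pipelineConfigurer.createDataset(conf.destinationFileset, FileSet.class);
  StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();
  // Hypothetical: this design does not specify how a plugin obtains its own stage name.
  String stageName = conf.getStageName();
  // Report an invalid regex against the 'filterRegex' property, then keep validating.
  try {
    Pattern.compile(conf.filterRegex);
  } catch (PatternSyntaxException e) {
    stageConfigurer.addValidationError(
      new InvalidStagePropertyError(e.getMessage(), stageName, "filterRegex"));
  }
  // Report a stage-level error; every error added here is returned together.
  if (conf.sourceFileset.equals(conf.destinationFileset)) {
    stageConfigurer.addValidationError(
      new InvalidStageError("source and destination filesets must be different", stageName));
  }
}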

Impact on UI

The UI should be able to handle newly introduced error types. For example, for invalid stage properties, the UI should highlight all the invalid properties of the stage; for a schema mismatch, it should highlight the mismatching schema fields.

Below are example responses from the validation endpoint for each type of validation error:

STAGE_ERROR: Represents a validation error that occurred while configuring the stage.

{
  "type": "STAGE_ERROR",
  "stage": "src",
  "message": "source and destination filesets must be different"
}

INVALID_FIELD: Represents an invalid configuration property.

{
  "type": "INVALID_FIELD",
  "stage": "src",
  "message": "Invalid config for property 'port'",
  "property": "port"
}

PLUGIN_NOT_FOUND: Represents a plugin that could not be found for a stage.

{
  "errors": [
    {
      "stage": "src",
      "type": "PLUGIN_NOT_FOUND",
      "message": "Plugin named 'Mock' of type 'batchsource' not found.",
      "pluginType": "batchsource",
      "pluginName": "Mock",
      "requestedArtifact": {
        "scope": "USER",
        "name": "app-mocks-ghost",
        "version": "1.0.0"
      },
      "suggestedArtifact": {
        "scope": "USER",
        "name": "app-mocks",
        "version": "1.0.0"
      }
    }
  ]
}

INVALID_SCHEMA: Represents an invalid schema field.

{
  "type": "INVALID_SCHEMA",
  "stage": "sink",
  "message": "Invalid schema for the field 'name'",
  "field": "name"
}
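On the UI side, responses like these could be grouped per stage to decide what to highlight. The sketch is written in Java for consistency with the rest of this page and is purely illustrative: errors are assumed to be already parsed from JSON into flat string maps (nested objects such as `requestedArtifact` are ignored), and `StageHighlights` is a hypothetical structure. Unrecognized types fall back to showing only the message, which lets the UI cope with error types added later.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class HighlightSketch {

  /** What the UI would highlight for a single stage (hypothetical structure). */
  static class StageHighlights {
    final List<String> messages = new ArrayList<>();
    final Set<String> properties = new LinkedHashSet<>();
    final Set<String> schemaFields = new LinkedHashSet<>();
  }

  /** Builds a flat error map from alternating key/value arguments. */
  static Map<String, String> error(String... keyValues) {
    if (keyValues.length % 2 != 0) {
      throw new IllegalArgumentException("expected key/value pairs");
    }
    Map<String, String> map = new LinkedHashMap<>();
    for (int i = 0; i < keyValues.length; i += 2) {
      map.put(keyValues[i], keyValues[i + 1]);
    }
    return map;
  }

  /** Groups errors by stage and collects the properties and schema fields to highlight. */
  static Map<String, StageHighlights> group(List<Map<String, String>> errors) {
    Map<String, StageHighlights> byStage = new LinkedHashMap<>();
    for (Map<String, String> error : errors) {
      StageHighlights highlights =
        byStage.computeIfAbsent(error.getOrDefault("stage", ""), stage -> new StageHighlights());
      highlights.messages.add(error.get("message"));
      switch (error.getOrDefault("type", "")) {
        case "INVALID_FIELD":
          highlights.properties.add(error.get("property"));
          break;
        case "INVALID_SCHEMA":
          highlights.schemaFields.add(error.get("field"));
          break;
        default:
          // STAGE_ERROR, PLUGIN_NOT_FOUND and any type added later: show the message only.
          break;
      }
    }
    return byStage;
  }

  public static void main(String[] args) {
    List<Map<String, String>> errors = new ArrayList<>();
    errors.add(error("type", "INVALID_FIELD", "stage", "src",
                     "message", "Invalid config for property 'port'", "property", "port"));
    errors.add(error("type", "INVALID_SCHEMA", "stage", "sink",
                     "message", "Invalid schema for the field 'name'", "field", "name"));
    group(errors).forEach((stage, h) ->
      System.out.println(stage + " -> properties " + h.properties + ", fields " + h.schemaFields));
  }
}
```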

Future Work

  • Add more types of errors to hydrator-common
  • Error codes
  • Corrections/suggestions along with the error message

Test Scenarios

Test ID | Test Description | Expected Results

Releases

Release 6.1.0

Related Work

  • Work #1
  • Work #2
  • Work #3