Checklist

  •  User Stories Documented
  •  User Stories Reviewed
  •  Design Reviewed
  •  APIs reviewed
  •  Release priorities assigned
  •  Test cases reviewed
  •  Blog post

Introduction 

A CDAP pipeline is composed of plugins that users configure as the pipeline is developed. While building a pipeline, a developer can provide an invalid plugin configuration or schema. For example, the BigQuery sink plugin can have an output schema that does not match the underlying BigQuery table. A pipeline developer can use the new validation endpoint to validate stages before deploying the pipeline. To fail fast and give a better user experience, the validation endpoint should return all the validation errors from a given stage when it is called.

The data pipeline app exposes various error types for plugin validation, and new error types can be introduced in future releases. With the current implementation, when plugins with new error types are pushed to the hub, the data pipeline artifacts need to be updated for every new type of error, because the validation errors are defined in the data pipeline app itself. A better approach is to modify the data pipeline app so that app artifacts do not need to be replaced for every new type of error.

Goals

  • To fail fast and give a better user experience, introduce a new API to collect multiple error messages from a stage at configure time
  • Decouple validation error types from the data pipeline app
  • Instrument plugins to use this API to return multiple error messages from the validation endpoint

User Stories 

  • As a CDAP pipeline developer, when I validate a stage, I expect all the invalid config properties and input/output schema fields to be highlighted on the CDAP UI with an appropriate error message.
  • As a plugin developer, I should be able to capture all the validation errors while configuring the plugin so that they can all be surfaced on the CDAP UI.
  • As a plugin developer, I should be able to use new validation error types without replacing data pipeline app artifacts.

API Changes for Plugin Validation

Collect Multiple errors from plugins

To collect multiple validation errors from a stage, StageConfigurer, MultiInputStageConfigurer and MultiOutputStageConfigurer can be modified as below. If any validation errors are added to the stage configurer, pipeline deployment will fail and all the errors will be returned in the response of the stage validation REST endpoint.

The current implementation does not expose the stage name to the plugin in the configurePipeline method. Plugins need the stage name to create stage-specific errors, so it will be exposed through the stage configurer as below.

Code Block
languagejava
titleStageConfigurer.java
public interface StageConfigurer {

  ...

  /**
   * Gets the stage name.
   * @return stage name
   */
  String getStageName();

  /**
   * Adds a validation failure.
   * @param e validation failure
   */
  void addValidationFailure(ValidationFailure e);
}
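
A minimal sketch of how a stage configurer could accumulate failures, and how the deployment path could reject a stage once any failure has been recorded. `CollectingStageConfigurer`, `Failure` and `failIfInvalid` are hypothetical names used only for illustration; they are not part of the CDAP API, and `Failure` is a simplified stand-in for the failure class passed to the stage configurer.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for the validation failure type; it only carries a message here.
final class Failure {
  final String message;

  Failure(String message) {
    this.message = message;
  }
}

// Hypothetical configurer that records every failure instead of stopping at the first one.
final class CollectingStageConfigurer {
  private final String stageName;
  private final List<Failure> failures = new ArrayList<>();

  CollectingStageConfigurer(String stageName) {
    this.stageName = stageName;
  }

  String getStageName() {
    return stageName;
  }

  void addValidationFailure(Failure failure) {
    failures.add(failure);
  }

  // Read-only view, e.g. for building the validation endpoint response.
  List<Failure> getFailures() {
    return Collections.unmodifiableList(failures);
  }

  // Assumed to be called after the plugin's configurePipeline returns:
  // deployment fails if anything was recorded.
  void failIfInvalid() {
    if (!failures.isEmpty()) {
      throw new IllegalArgumentException(
        "Stage '" + stageName + "' has " + failures.size() + " validation failure(s)");
    }
  }
}
```

The point of the design is that the plugin keeps running its checks after the first problem, so one validation call can report everything that is wrong with the stage.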

Decouple plugin error types from data pipeline app

Approach - 1

To carry the error information, a new ValidationFailure class is introduced to collect multiple validation errors in the stage configurer. A ValidationFailure can be built using a ValidationFailureBuilder, which only allows string properties. This defines a clear contract between plugins and the data pipeline app: because a failure type is just a string, a new type of failure does not require a new class in the data pipeline app.

Code Block
languagejava
titleValidationFailure.java
import java.util.Map;

/**
 * Represents failure that occurred during validation.
 */
@Beta
public class ValidationFailure {
  private final String message;
  private final String type;
  private final Map<String, Object> properties;

  public ValidationFailure(String message, String type, Map<String, Object> map) {
    this.message = message;
    this.type = type;
    this.properties = map;
  }

  public static ValidationFailureBuilder builder() {
    return new DefaultValidationFailureBuilder();
  }
}
Code Block
languagejava
titleValidationFailureBuilder.java
/**
 * Validation failure builder.
 */
@Beta
public interface ValidationFailureBuilder {

  /**
   * Sets failure message.
   * @param message failure message
   */
  ValidationFailureBuilder setMessage(String message);

  /**
   * Sets failure type.
   * @param type failure type
   */
  ValidationFailureBuilder setType(String type);

  /**
   * Adds a property related to ValidationFailure.
   * @param propertyName name of the property
   * @param value value of the property
   * @return this builder
   */
  ValidationFailureBuilder addProperty(String propertyName, String value);

  /**
   * Builds validation failure.
   */
  ValidationFailure build();
}

Code Block
languagejava
titleDefaultValidationFailureBuilder.java
import java.util.HashMap;
import java.util.Map;

/**
 * Default validation failure builder.
 */
@Beta
public class DefaultValidationFailureBuilder implements ValidationFailureBuilder {
  private String message;
  private String type;
  private final Map<String, Object> properties;

  public DefaultValidationFailureBuilder() {
    this.properties = new HashMap<>();
  }

  @Override
  public ValidationFailureBuilder setMessage(String message) {
    this.message = message;
    return this;
  }

  @Override
  public ValidationFailureBuilder setType(String type) {
    this.type = type;
    return this;
  }

  @Override
  public ValidationFailureBuilder addProperty(String propertyName, String value) {
    properties.put(propertyName, value);
    return this;
  }

  @Override
  public ValidationFailure build() {
    return new ValidationFailure(message, type, properties);
  }
}
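
As a usage sketch, a builder of this kind could be driven as shown here. The classes are trimmed, self-contained stand-ins (`FailureSketch` and `BuilderUsage` are hypothetical names), and `toResponse` is only an assumption about how the app might flatten a failure into the JSON object returned by the validation endpoint.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Trimmed stand-in for a validation failure together with its builder.
final class FailureSketch {
  private final String message;
  private final String type;
  private final Map<String, Object> properties;

  private FailureSketch(String message, String type, Map<String, Object> properties) {
    this.message = message;
    this.type = type;
    this.properties = properties;
  }

  static Builder builder() {
    return new Builder();
  }

  // Assumption: type, message and every property become top-level keys of the response object.
  Map<String, Object> toResponse() {
    Map<String, Object> response = new LinkedHashMap<>();
    response.put("type", type);
    response.put("message", message);
    response.putAll(properties);
    return response;
  }

  static final class Builder {
    private String message;
    private String type;
    private final Map<String, Object> properties = new HashMap<>();

    Builder setMessage(String message) {
      this.message = message;
      return this;
    }

    Builder setType(String type) {
      this.type = type;
      return this;
    }

    Builder addProperty(String name, String value) {
      properties.put(name, value);
      return this;
    }

    FailureSketch build() {
      // Copy so that later builder calls cannot mutate an already built failure.
      return new FailureSketch(message, type, new HashMap<>(properties));
    }
  }
}

final class BuilderUsage {
  // Builds one failure for an invalid 'drop' property of a 'projection' stage.
  static Map<String, Object> invalidProperty() {
    return FailureSketch.builder()
      .setMessage("Can not specify both drop and keep. One should be empty or null")
      .setType("INVALID_PROPERTY")
      .addProperty("stage", "projection")
      .addProperty("property", "drop")
      .build()
      .toResponse();
  }
}
```

Because the type is a plain string and the properties are a string map, a plugin can introduce a new failure type without any new class being added to the data pipeline app.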

Below is a helper class, exposed through cdap-etl-api, that helps build different types of failures.

Code Block
/**
 * Helper class to build validation failures.
 */
public final class ValidationFailures {

  /**
   * Builds stage validation failure.
   * @param message failure message
   * @param stage stage for which failure happened
   * @return validation failure
   */
  public static ValidationFailure createStageValidationFailure(String message, String stage) {
    ValidationFailureBuilder builder = ValidationFailure.builder();
    builder.setMessage(message);
    builder.setType("STAGE_ERROR");
    builder.addProperty("stage", stage);
    return builder.build();
  }

  private ValidationFailures() {
    // no-op
  }
}
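
The plugin usage example in the next section also calls `createFieldValidationFailure`, which is not listed above. One possible shape for it, mirroring the stage helper, is sketched here. The type string `INVALID_PROPERTY` and the `property` key are assumptions taken from the JSON in the Impact on UI section; `FieldFailureSketch` and the map-based return value are hypothetical simplifications so that the snippet compiles on its own.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class FieldFailureSketch {

  // Hypothetical counterpart of the stage helper for a single invalid config property.
  // A plain map stands in for the failure object to keep the sketch self-contained.
  static Map<String, String> createFieldValidationFailure(String message, String stage, String property) {
    Map<String, String> failure = new LinkedHashMap<>();
    failure.put("message", message);
    failure.put("type", "INVALID_PROPERTY"); // assumed type string
    failure.put("stage", stage);
    failure.put("property", property);       // assumed property key
    return failure;
  }

  private FieldFailureSketch() {
    // no-op
  }
}
```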


API usage in plugins

Code Block
@Override
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
  pipelineConfigurer.createDataset(conf.destinationFileset, FileSet.class);
  StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();
  // get the name of the stage
  String stageName = stageConfigurer.getStageName();
  try {
    Pattern.compile(conf.filterRegex);
  } catch (Exception e) {
    // add validation failure to stage configurer
    stageConfigurer.addValidationFailure(
      ValidationFailures.createFieldValidationFailure(e.getMessage(), stageName, "filterRegex"));
  }
  if (conf.sourceFileset.equals(conf.destinationFileset)) {
    // add validation failure to stage configurer
    stageConfigurer.addValidationFailure(
      ValidationFailures.createStageValidationFailure("source and destination filesets must be different",
                                                      stageName));
  }
}


Approach - 2

A validation error represents an error with one or more causes, where each cause has its own attributes. For example, when the type of an input schema field does not match the underlying sink schema, the cause is an input field mismatch with attributes such as stage name, field name and suggested type. An error message can be associated with more than one cause. This can happen for plugins such as joiner and splitter, where a given stage has multiple input or output schemas. For example, when the input schemas of a joiner are not compatible, the causes will include the mismatching fields from the input schemas of the incoming stages. A validation error can therefore be represented as a list of causes, where each cause is a map of cause attribute to its value, as shown below.

Code Block
languagejava
titleValidationError.java
/**
 * Represents error that occurred during validation.
 */
@Beta
public class ValidationError {
  private final String message;
  protected final List<Map<String, Object>> causes;

  /**
   * Creates a validation error with a message and an empty list of causes.
   * @param message error message
   */
  public ValidationError(String message) {
    this.message = message;
    this.causes = new ArrayList<>();
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    ValidationError that = (ValidationError) o;
    return message.equals(that.message) && causes.equals(that.causes);
  }

  @Override
  public int hashCode() {
    return Objects.hash(message, causes);
  }
  
}

All the attributes of a cause can be tracked at a central location as below:

Code Block
languagejava
titleCauseAttributes.java
/**
 * Cause attributes.
 */
public enum CauseAttributes {
  STAGE("stage"), // represents stage being validated
  PROPERTY("property"), // represents stage property
  INPUT_FIELD("input_field"), // represents field in the input schema
  OUTPUT_FIELD("output_field"), // represents field in the output schema
  OUTPUT_PORT("output_port"), // represents output port for plugins such as SplitterTransform where multiple output schemas are expected
  INPUT_STAGE("input_stage"); // represents input stage for plugins such as Joiner where multiple input schemas are expected
  // ..

  private final String name;

  CauseAttributes(String name) {
    this.name = name;
  }
}


Introduced Errors

With this approach, the following error classes, each representing a specific type of error, can be added to hydrator-common.

Code Block
languagejava
titleInvalidStageError.java
/**
 * Represents error that occurred during stage validation.
 */
@Beta
public class InvalidStageError extends ValidationError {
  /**
   * Creates validation error that occurred during stage validation.
   * @param message error message
   * @param stage name of the stage that caused this validation error
   */
  public InvalidStageError(String message, String stage) {
    super(message);
    causes.add(Collections.singletonMap("stage", stage));
  }
}
Code Block
languagejava
titleInvalidStagePropertyError.java
/**
 * Represents error that occurred during stage config property validation.
 */
@Beta
public class InvalidStagePropertyError extends ValidationError {
  /**
   * Creates validation error for an invalid stage property.
   * @param message error message
   * @param stage name of the stage that caused this validation error
   * @param property property that is invalid
   */
  public InvalidStagePropertyError(String message, String stage, String property) {
    super(message);
    Map<String, Object> map = new HashMap<>();
    map.put("stage", stage);
    map.put("property", property);
    causes.add(map);
  }

  /**
   * Creates validation error for multiple invalid stage properties.
   * @param message error message
   * @param stage name of the stage that caused this validation error
   * @param properties properties that caused this error
   */
  public InvalidStagePropertyError(String message, String stage, String[] properties) {
    super(message);
    // one cause per property, so that every offending property is reported
    for (String property : properties) {
      Map<String, Object> map = new HashMap<>();
      map.put("stage", stage);
      map.put("property", property);
      causes.add(map);
    }
  }
}
Code Block
languagejava
titleInvalidInputSchemaError.java
/**
 * Represents invalid input schema error.
 */
public class InvalidInputSchemaError extends ValidationError {

  /**
   * Creates invalid input schema error.
   * @param message error message
   * @param stage name of the stage 
   * @param map map of incoming stage name to field that is invalid.
   */
  public InvalidInputSchemaError(String message, String stage, Map<String, String> map) {
    super(message);
    for (Map.Entry<String, String> entry : map.entrySet()) {
      Map<String, Object> causeMap = new HashMap<>();
      causeMap.put("stage", stage);
      causeMap.put("input_stage", entry.getKey());
      causeMap.put("input_field", entry.getValue());
      causes.add(causeMap);
    }
  }
}
Code Block
languagejava
titleInvalidOutputSchemaError.java
/**
 * Represents invalid output schema error.
 */
public class InvalidOutputSchemaError extends ValidationError {

  /**
   * Creates invalid output schema error.
   * @param message error message
   * @param stage name of the stage
   * @param map map of output port name to field that is invalid
   */
  public InvalidOutputSchemaError(String message, String stage, Map<String, String> map) {
    super(message);
    for (Map.Entry<String, String> entry : map.entrySet()) {
      Map<String, Object> causeMap = new HashMap<>();
      causeMap.put("stage", stage);
      causeMap.put("output_port", entry.getKey());
      causeMap.put("output_field", entry.getValue());
      causes.add(causeMap);
    }
  }
}

API usage in plugins

API usage in the plugins follows the same pattern as in Approach - 1.

Code Block
@Override
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
  pipelineConfigurer.createDataset(conf.destinationFileset, FileSet.class);
  StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();
  // get the name of the stage 
  String stageName = stageConfigurer.getStageName();
  try {
    Pattern.compile(conf.filterRegex);
  } catch (Exception e) {  
    // add validation error to stage configurer
    stageConfigurer.addValidationError(new InvalidStagePropertyError(e.getMessage(), stageName, "filterRegex"));
  }
  if (conf.sourceFileset.equals(conf.destinationFileset)) {
    // add validation error to stage configurer
    stageConfigurer.addValidationError(new InvalidStageError("source and destination filesets must be different", stageName));
  }
}
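
The example above only reports single-cause errors. For the multi-input case described for joiners, a check could look roughly like the following sketch. It is self-contained and illustrative only: `JoinKeyCheck` and `mismatchCauses` are hypothetical names, schemas are reduced to a map of field name to type name, and a plain list of maps stands in for the `causes` field of ValidationError.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class JoinKeyCheck {

  // inputSchemas: input stage name -> (field name -> type name), a simplified stand-in for real schemas.
  // Returns one cause per input stage when the join key does not have the same type everywhere.
  static List<Map<String, Object>> mismatchCauses(String stage, String joinKey,
                                                  Map<String, Map<String, String>> inputSchemas) {
    Set<String> types = new HashSet<>();
    for (Map<String, String> schema : inputSchemas.values()) {
      // a missing key yields null, which counts as its own distinct "type"
      types.add(schema.get(joinKey));
    }
    List<Map<String, Object>> causes = new ArrayList<>();
    if (types.size() <= 1) {
      return causes; // all inputs agree on the key type, nothing to report
    }
    for (String inputStage : inputSchemas.keySet()) {
      Map<String, Object> cause = new LinkedHashMap<>();
      cause.put("stage", stage);
      cause.put("input_stage", inputStage);
      cause.put("input_field", joinKey);
      causes.add(cause);
    }
    return causes;
  }

  private JoinKeyCheck() {
    // no-op
  }
}
```

A single error message with these causes lets the UI point at the same field in every incoming schema instead of repeating the message once per input.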

Impact on UI

Type: STAGE_ERROR
Description: Represents validation error while configuring the stage
Scenario: If there is any error while connecting to sink while getting actual schema
Approach - 1 - Json Response:
{
  "errors": [
    {
      "type": "STAGE_ERROR",
      "stage": "src",
      "message": "Could not load jdbc driver."
    }
  ]
}
Approach - 2 - Json Response:
{
  "errors": [
    {
      "message": "Could not load jdbc driver.",
      "causes": [
        {
          "stage": "src"
        }
      ]
    }
  ]
}

Type: INVALID_PROPERTY
Description: Represents invalid configuration property
Scenario: If config property value contains characters that are not allowed by underlying source or sink
Approach - 1 - Json Response:
{
  "errors": [
    {
      "type": "INVALID_PROPERTY",
      "stage": "projection",
      "message": "Can not specify both drop and keep. One should be empty or null",
      "property": "drop"
    },
    {
      "type": "INVALID_PROPERTY",
      "stage": "projection",
      "message": "Can not specify both drop and keep. One should be empty or null",
      "property": "keep"
    }
  ]
}
Approach - 2 - Json Response:
{
  "errors": [
    {
      "message": "Can not specify both drop and keep. One should be empty or null",
      "causes": [
        {
          "stage": "projection",
          "property": "keep"
        },
        {
          "stage": "projection",
          "property": "drop"
        }
      ]
    }
  ]
}

Type: PLUGIN_NOT_FOUND
Description: Represents plugin not found error for a stage
Scenario: If the plugin was not found. This error will be thrown from the data pipeline app
Approach - 1 - Json Response:
{
  "errors": [
    {
      "stage": "src",
      "type": "PLUGIN_NOT_FOUND",
      "message": "Plugin named 'Mock' of type 'batchsource' not found.",
      "pluginType": "batchsource",
      "pluginName": "Mock",
      "requestedArtifact": {
        "scope": "USER",
        "name": "app-mocks-ghost",
        "version": "1.0.0"
      }
    }
  ]
}
Approach - 2 - Json Response:
{
  "errors": [
    {
      "message": "Plugin named 'Mock' of type 'batchsource' not found.",
      "causes": [
        {
          "stage": "src",
          "pluginType": "batchsource",
          "pluginName": "Mock",
          "requestedArtifact": {
            "scope": "USER",
            "name": "app-mocks-ghost",
            "version": "1.0.0"
          }
        }
      ]
    }
  ]
}

Type: INVALID_INPUT_SCHEMA
Description: Represents invalid schema field in input schema
Scenario: If the input schemas for joiner plugin are of different types
Approach - 1 - Json Response:
{
  "errors": [
    {
      "type": "INVALID_INPUT_SCHEMA",
      "stage": "joiner",
      "message": "Invalid schema field 'id'. Different types of join keys found in source1 and source2.",
      "field": "id"
    }
  ]
}
Approach - 2 - Json Response:
{
  "errors": [
    {
      "message": "Invalid schema field 'id'. Different types of join keys found.",
      "causes": [
        {
          "stage": "joiner",
          "input_stage": "source1",
          "input_field": "id"
        },
        {
          "stage": "joiner",
          "input_stage": "source2",
          "input_field": "id"
        }
      ]
    }
  ]
}

Type: INVALID_OUTPUT_SCHEMA
Description: Represents invalid schema field in output schema
Scenario: If the output schema for the plugin is not compatible with underlying sink
Approach - 1 - Json Response:
{
  "errors": [
    {
      "type": "INVALID_OUTPUT_SCHEMA",
      "stage": "splitter",
      "message": "Invalid schema field 'email'. It should be of type 'string' at output port 'port'",
      "field": "email"
    }
  ]
}
Approach - 2 - Json Response:
{
  "errors": [
    {
      "message": "Invalid schema field 'email'. It should be of type 'string'",
      "causes": [
        {
          "stage": "splitter",
          "output_port": "port",
          "output_field": "email"
        }
      ]
    }
  ]
}

Conclusion

Approach 1 uses the type parameter to distinguish every error, while Approach 2 does not use a type to identify the error. With Approach 1, the UI must be aware of each type of error the backend exposes in order to render it. With Approach 2, the UI only needs to know how each cause attribute should be rendered, regardless of the type of error. Hence, Approach 2 is suggested.
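
The rendering argument can be illustrated with a small sketch of a client that decides what to highlight purely from the attributes present in each cause, without looking at an error type. It is written in Java only for consistency with the rest of this page; `CauseRenderer`, `highlights` and the highlight strings are hypothetical, and the attribute keys are the ones used in the Approach 2 JSON responses.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class CauseRenderer {

  // Maps one cause to the UI elements to highlight, based only on which attributes are present.
  static List<String> highlights(Map<String, Object> cause) {
    List<String> targets = new ArrayList<>();
    Object stage = cause.get("stage");
    if (cause.containsKey("property")) {
      targets.add("config property '" + cause.get("property") + "' of stage '" + stage + "'");
    }
    if (cause.containsKey("input_field")) {
      targets.add("input field '" + cause.get("input_field")
                    + "' from stage '" + cause.get("input_stage") + "'");
    }
    if (cause.containsKey("output_field")) {
      targets.add("output field '" + cause.get("output_field")
                    + "' at port '" + cause.get("output_port") + "'");
    }
    if (targets.isEmpty()) {
      // no finer-grained attribute: fall back to highlighting the whole stage
      targets.add("stage '" + stage + "'");
    }
    return targets;
  }

  private CauseRenderer() {
    // no-op
  }
}
```

A new kind of error that reuses existing attributes would be rendered by this code unchanged, which is the property the conclusion relies on.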


Related Jira

CDAP-15578 (Cask Community Issue Tracker)

Related Work

Releases

Release 6.1.0