Checklist

  •  User Stories Documented
  •  User Stories Reviewed
  •  Design Reviewed
  •  APIs reviewed
  •  Release priorities assigned
  •  Test cases reviewed
  •  Blog post

Introduction 

A CDAP pipeline is composed of plugins that users configure while developing the pipeline. While building a pipeline, a developer can provide invalid plugin configurations or schemas. For example, the BigQuery sink plugin can have an output schema that does not match the underlying BigQuery table. A pipeline developer can use the new validation endpoint to validate stages before deploying the pipeline. To fail fast and give a better user experience, the validation endpoint should return all validation errors from a given stage in a single call.

The data pipeline app exposes various error types for plugin validation, and future releases can introduce new ones. In the current implementation the validation errors are defined in the data pipeline app itself, so whenever plugins with new error types are pushed to the hub, the data pipeline artifacts must be updated for every new error type. A better approach is to modify the data pipeline app so that app artifacts do not need to be replaced for every new type of error.

Goals

  • To fail fast and give a better user experience, introduce a new API to collect multiple validation error messages from a stage at configure time
  • Decouple validation error types from the data pipeline app
  • Instrument plugins to use this API to return multiple error messages for the validation endpoint

User Stories 

  • As a CDAP pipeline developer, when I validate a stage, I expect that all the invalid config properties and input/output schema fields are highlighted on CDAP UI with appropriate error message and corrective action.
  • As a plugin developer, I should be able to capture all the validation errors while configuring the plugin so that all the validation errors can be surfaced on CDAP UI.
  • As a plugin developer, I should be able to use new validation error types without replacing data pipeline app artifacts. 

API Changes for Plugin Validation

Collect Multiple errors from plugins

To collect multiple validation errors from a stage, StageConfigurer, MultiInputStageConfigurer and MultiOutputStageConfigurer can be modified as below. The current implementation does not expose the stage name to the plugin in the configurePipeline method. Plugins need the stage name to create stage-specific errors, so it will be exposed to plugins through the stage configurer as below.

Code Block
languagejava
titleStageConfigurer.java
public interface StageConfigurer {

  ...

  /**
   * Get the stage name.
   *
   * @return stage name
   */
  String getStageName();

  /**
   * Adds a new validation failure to the configurer.
   *
   * @param failure a validation failure
   */
  void addValidationFailure(ValidationFailure failure);

  /**
   * Throws validation exception if there are any failures that are added to the configurer through
   * addValidationFailure method.
   *
   * @throws ValidationException if there are any validation failures being carried by the configurer
   */
  void throwIfFailure() throws ValidationException;
}
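
The collect-then-throw contract described above can be illustrated with a minimal sketch. It is not the data pipeline app's implementation: Failure, FailuresException and CollectingConfigurer are simplified, hypothetical stand-ins for the proposed ValidationFailure, ValidationException and StageConfigurer, and the stage name "fileMove" is made up for the example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class CollectingConfigurerSketch {

  /** Hypothetical, message-only stand-in for the proposed ValidationFailure. */
  static final class Failure {
    final String message;

    Failure(String message) {
      this.message = message;
    }
  }

  /** Hypothetical stand-in for the proposed ValidationException. */
  static final class FailuresException extends RuntimeException {
    final List<Failure> failures;

    FailuresException(List<Failure> failures) {
      // Only constructed with a non-empty list, so get(0) is safe here.
      super(failures.get(0).message);
      this.failures = failures;
    }
  }

  /** Hypothetical configurer that accumulates failures instead of failing on the first one. */
  static final class CollectingConfigurer {
    private final String stageName;
    private final List<Failure> failures = new ArrayList<>();

    CollectingConfigurer(String stageName) {
      this.stageName = stageName;
    }

    String getStageName() {
      return stageName;
    }

    void addValidationFailure(Failure failure) {
      failures.add(failure);
    }

    void throwIfFailure() {
      if (!failures.isEmpty()) {
        // Hand out an immutable snapshot of everything collected so far.
        throw new FailuresException(Collections.unmodifiableList(new ArrayList<>(failures)));
      }
    }
  }

  /** Runs two independent checks and returns how many failures were reported together. */
  static int validate(String regex, String source, String destination) {
    CollectingConfigurer configurer = new CollectingConfigurer("fileMove");
    if (regex.isEmpty()) {
      configurer.addValidationFailure(new Failure("filterRegex must not be empty"));
    }
    if (source.equals(destination)) {
      configurer.addValidationFailure(new Failure("source and destination must be different"));
    }
    try {
      configurer.throwIfFailure();
      return 0;
    } catch (FailuresException e) {
      return e.failures.size();
    }
  }

  public static void main(String[] args) {
    System.out.println(validate("", "a", "a"));
    System.out.println(validate(".*", "a", "b"));
  }
}
```

Because both checks run before throwIfFailure is called, a single validation call reports both problems at once.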

Decouple plugin error types from data pipeline app

Approach - 1

To carry error information, a new ValidationFailure class is introduced, and the stage configurer collects multiple instances of it. A failure is built using ValidationFailure.Builder, which only allows string properties, and ValidationFailure exposes methods to get the message, type and properties of a failure. The collected failures are carried by ValidationException. Whenever a plugin finds an invalid property that another property's validation depends on, it can throw a validation exception with all the errors collected so far. This keeps plugin validation code much simpler.

Code Block
languagejava
titleValidationFailure.java
/**
 * Represents an error condition occurred during validation.
 */
@Beta
public class ValidationFailure {
  // types of the failures
  private static final String STAGE_ERROR = "StageError";
  private static final String INVALID_PROPERTY = "InvalidProperty";
  private static final String PLUGIN_NOT_FOUND = "PluginNotFound";
  private static final String INVALID_INPUT_SCHEMA = "InvalidInputSchema";
  private static final String INVALID_OUTPUT_SCHEMA = "InvalidOutputSchema";

  // represents stage name in the failure. It is a generic property used in all the failures for a given stage
  private static final String STAGE = "stage";

  // represents configuration property in InvalidProperty failure
  private static final String CONFIG_PROPERTY = "configProperty";

  // represents plugin id in PluginNotFound failure
  private static final String PLUGIN_ID = "pluginId";
  // represents plugin type in PluginNotFound failure
  private static final String PLUGIN_TYPE = "pluginType";
  // represents plugin name in PluginNotFound failure
  private static final String PLUGIN_NAME = "pluginName";

  // represents a field in InvalidInputSchema or InvalidOutputSchema failure
  private static final String FIELD = "field";
  // represents input stage in InvalidInputSchema failure
  private static final String INPUT_STAGE = "inputStage";
  // represents output port in InvalidOutputSchema failure
  private static final String OUTPUT_PORT = "outputPort";

  private final String message;
  private final String type;
  private final String correctiveAction;
  private final Map<String, Object> properties;

  private ValidationFailure(String message, String type, @Nullable String correctiveAction,
                            Map<String, Object> properties) {
    this.message = message;
    this.type = type;
    this.correctiveAction = correctiveAction;
    this.properties = properties;
  }

  /**
   * Creates a stage validation failure.
   *
   * @param message validation failure message
   * @param stage stage name
   * @param correctiveAction corrective action
   */
  public static ValidationFailure createStageFailure(String message, String stage, @Nullable String correctiveAction) {
    Builder builder = builder(message, STAGE_ERROR);
    builder.setCorrectiveAction(correctiveAction).addProperty(STAGE, stage);
    return builder.build();
  }

  /**
   * Creates a config property validation failure.
   *
   * @param message validation failure message
   * @param stage stage name
   * @param property name of the invalid config property
   * @param correctiveAction corrective action
   */
  public static ValidationFailure createConfigPropertyFailure(String message, String stage,
                                                              String property, @Nullable String correctiveAction) {
    Builder builder = builder(message, INVALID_PROPERTY);
    builder.setCorrectiveAction(correctiveAction)
      .addProperty(STAGE, stage).addProperty(CONFIG_PROPERTY, property);
    return builder.build();
  }

  /**
   * Creates a plugin not found validation failure.
   *
   * @param message validation failure message
   * @param stage stage name
   * @param pluginId plugin id
   * @param pluginName plugin name
   * @param pluginType plugin type
   * @param correctiveAction corrective action
   */
  public static ValidationFailure createPluginNotFoundFailure(String message, String stage,
                                                              String pluginId, String pluginName, String pluginType,
                                                              @Nullable String correctiveAction) {
    Builder builder = builder(message, PLUGIN_NOT_FOUND);
    builder.setCorrectiveAction(correctiveAction)
      .addProperty(STAGE, stage).addProperty(PLUGIN_ID, pluginId)
      .addProperty(PLUGIN_TYPE, pluginType)
      .addProperty(PLUGIN_NAME, pluginName);
    return builder.build();
  }

  /**
   * Creates an invalid input schema failure.
   *
   * @param message validation failure message
   * @param stage stage name
   * @param field input schema field
   * @param inputStage optional input stage name. This is applicable to plugins of type {@link Joiner}.
   * @param correctiveAction optional corrective action
   * @return invalid input schema validation failure
   */
  public static ValidationFailure createInputSchemaFailure(String message, String stage, String field,
                                                           @Nullable String inputStage,
                                                           @Nullable String correctiveAction) {
    ...
  }

  /**
   * Creates an invalid output schema failure.
   *
   * @param message validation failure message
   * @param stage stage name
   * @param field output schema field
   * @param outputPort optional output port. This is applicable to plugins of type {@link SplitterTransform}.
   * @param correctiveAction optional corrective action
   * @return invalid output schema validation failure
   */
  public static ValidationFailure createOutputSchemaFailure(String message, String stage, String field,
                                                            @Nullable String outputPort,
                                                            @Nullable String correctiveAction) {
   ....
  }

  /**
   * Returns a builder for creating a {@link ValidationFailure}.
   */
  public static Builder builder(String message, String type) {
    return new Builder(message, type);
  }

  /**
   * A builder to create {@link ValidationFailure} instance.
   */
  public static class Builder {
    private final String message;
    private final String type;
    private String correctiveAction;
    private final Map<String, Object> properties;

    private Builder(String message, String type) {
      this.message = message;
      this.type = type;
      this.properties = new HashMap<>();
    }

    /**
     * Sets corrective action to rectify the failure.
     *
     * @param correctiveAction corrective action
     * @return this builder
     */
    public Builder setCorrectiveAction(@Nullable String correctiveAction) {
      this.correctiveAction = correctiveAction;
      return this;
    }

    /**
     * Adds a property to the failure.
     *
     * @param property the name of the property
     * @param value the value of the property
     * @return this builder
     */
    public Builder addProperty(String property, String value) {
      this.properties.put(property, value);
      return this;
    }

    /**
     * Creates a new instance of {@link ValidationFailure}.
     *
     * @return instance of {@link ValidationFailure}
     */
    public ValidationFailure build() {
      return new ValidationFailure(message, type, correctiveAction,
                                   Collections.unmodifiableMap(new HashMap<>(properties)));
    }
  }

  /**
   * Returns message of this failure.
   */
  public String getMessage() {
    return message;
  }

  /**
   * Returns type of this failure.
   */
  public String getType() {
    return type;
  }

  /**
   * Returns corrective action for this failure.
   */
  @Nullable
  public String getCorrectiveAction() {
    return correctiveAction;
  }

  /**
   * Returns properties of this failure.
   */
  public Map<String, Object> getProperties() {
    return properties;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    ValidationFailure that = (ValidationFailure) o;
    return message.equals(that.message) && type.equals(that.type) &&
      Objects.equals(correctiveAction, that.correctiveAction) &&
      properties.equals(that.properties);
  }

  @Override
  public int hashCode() {
    return Objects.hash(message, type, correctiveAction, properties);
  }

  @Override
  public String toString() {
    return "ValidationFailure{" +
      "message='" + message + '\'' +
      ", type='" + type + '\'' +
      ", correctiveAction='" + correctiveAction + '\'' +
      ", properties=" + properties +
      '}';
  }
}
Code Block
languagejava
titleValidationException.java
/**
 * Validation exception that carries multiple validation failures.
 */
@Beta
public class ValidationException extends RuntimeException {
  private final List<ValidationFailure> failures;

  public ValidationException(List<ValidationFailure> failures) {
    super(failures.isEmpty() ? "Validation Exception occurred." : failures.iterator().next().getMessage());
    this.failures = failures;
  }

  /**
   * Returns list of failures.
   */
  public List<ValidationFailure> getFailures() {
    return failures;
  }
}


API usage in plugins

Code Block
@Override
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
  pipelineConfigurer.createDataset(conf.destinationFileset, FileSet.class);
  StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();
  // get the name of the stage 
  String stageName = stageConfigurer.getStageName();
  try {
    Pattern.compile(conf.filterRegex);
  } catch (Exception e) {  
    // add validation failure to stage configurer
    stageConfigurer.addValidationFailure(ValidationFailure.createConfigPropertyFailure(e.getMessage(), stageName, "filterRegex", "Make sure the file regex is correct"));
    // plugin can choose to terminate processing here
    stageConfigurer.throwIfFailure();
  }
  if (conf.sourceFileset.equals(conf.destinationFileset)) {
    // add validation failure to stage configurer
    stageConfigurer.addValidationFailure(ValidationFailure.createStageFailure("source and destination filesets must be different", stageName, "Provide different source and destination filesets"));
  }
}
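
The first user story asks that every invalid config property be highlighted. As a rough sketch of how a consumer of Approach 1 failures might do that, the example below groups failure messages by the "configProperty" property. The Failure class and the helper names are hypothetical simplifications; only the property keys "stage" and "configProperty" are taken from the ValidationFailure class above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class FailureGroupingSketch {

  /** Hypothetical stand-in for a failure: a message plus string properties. */
  static final class Failure {
    final String message;
    final Map<String, String> properties;

    Failure(String message, Map<String, String> properties) {
      this.message = message;
      this.properties = properties;
    }
  }

  /** Builds a failure that is tied to a single config property of a stage. */
  static Failure propertyFailure(String message, String stage, String property) {
    Map<String, String> properties = new LinkedHashMap<>();
    properties.put("stage", stage);
    properties.put("configProperty", property);
    return new Failure(message, properties);
  }

  /** Groups messages by config property; failures without that property are skipped. */
  static Map<String, List<String>> messagesByConfigProperty(List<Failure> failures) {
    Map<String, List<String>> result = new LinkedHashMap<>();
    for (Failure failure : failures) {
      String property = failure.properties.get("configProperty");
      if (property != null) {
        result.computeIfAbsent(property, key -> new ArrayList<>()).add(failure.message);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    List<Failure> failures = Arrays.asList(
      propertyFailure("Can not specify both drop and keep.", "projection", "drop"),
      propertyFailure("Can not specify both drop and keep.", "projection", "keep"),
      new Failure("Could not load jdbc driver.", Collections.singletonMap("stage", "src")));
    System.out.println(messagesByConfigProperty(failures).keySet());
  }
}
```

Stage-level failures carry no "configProperty" entry, so they fall out of the grouping and would have to be shown at the stage level instead.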


Approach - 2

A validation error can have several causes, each with its own attributes. For example, when an input schema field type does not match the underlying sink schema, the cause is an input field mismatch with attributes such as stage name, field name and suggested type. Each error message can be associated with more than one cause. This happens for plugins such as joiner and splitter, where a stage has multiple input or output schemas. For example, when the input schemas for a joiner are not compatible, the causes include the mismatching fields from the input schemas of the incoming stages. A validation error can therefore be represented as a list of causes, where each cause is a map from cause attribute to value, as shown below.
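
The "list of causes" shape can be sketched with plain collections. This is only an illustration: joinKeyCauses is a hypothetical helper, and the attribute names "stage", "inputStage" and "inputField" are the ones used in the JSON responses later in this document.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CauseListSketch {

  /**
   * Builds one cause per incoming stage for a join key whose type differs between inputs.
   * Each cause is a map of cause attribute to value.
   */
  static List<Map<String, String>> joinKeyCauses(String stage, String field, List<String> inputStages) {
    List<Map<String, String>> causes = new ArrayList<>();
    for (String inputStage : inputStages) {
      Map<String, String> cause = new LinkedHashMap<>();
      cause.put("stage", stage);
      cause.put("inputStage", inputStage);
      cause.put("inputField", field);
      causes.add(cause);
    }
    return causes;
  }

  public static void main(String[] args) {
    // One error message, two causes: the same field in each of the two joiner inputs.
    for (Map<String, String> cause : joinKeyCauses("joiner", "id", Arrays.asList("source1", "source2"))) {
      System.out.println(cause);
    }
  }
}
```

A single message such as "Different types of join keys found" then points at one field in every incoming stage, so each location can be highlighted separately.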

Code Block
languagejava
titleStageConfigurer.java
public interface StageConfigurer {

  ...

  /**
   * Add validation failure to this configurer.
   *
   * @param message failure message
   * @param correctiveAction corrective action
   * @return a validation failure
   */
  ValidationFailure addFailure(String message, @Nullable String correctiveAction);

  /**
   * Throws validation exception if there are any failures that are added to the configurer through
   * {@link #addFailure(String, String)}.
   *
   * @throws ValidationException if there are any validation failures
   */
  void throwIfFailure() throws ValidationException;
}


Code Block
languagejava
titleValidationException.java
/**
 * Validation exception that carries multiple validation failures.
 */
@Beta
public class ValidationException extends RuntimeException {
  private final List<ValidationFailure> failures;

  /**
   * Creates a validation exception with list of failures.
   *
   * @param failures list of validation failures
   */
  public ValidationException(List<ValidationFailure> failures) {
    super(failures.isEmpty() ? "Validation Exception occurred." : failures.iterator().next().getMessage());
    this.failures = failures;
  }

  /**
   * Returns a list of validation failures.
   */
  public List<ValidationFailure> getFailures() {
    return failures;
  }
}

Code Block
languagejava
titleValidationFailure.java
/**
 * Represents a failure condition occurred during validation.
 */
@Beta
public class ValidationFailure {
  private final String message;
  private final String correctiveAction;
  private final List<Cause> causes;

  /**
   * Creates a validation failure with provided message and corrective action.
   *
   * @param message validation failure message
   * @param correctiveAction corrective action
   */
  public ValidationFailure(String message, String correctiveAction) {
    this.message = message;
    this.correctiveAction = correctiveAction;
    this.causes = new ArrayList<>();
  }

  /**
   * Adds provided cause to this validation failure.
   *
   * @param cause cause of validation failure
   * @return validation failure with provided cause
   */
  public ValidationFailure withCause(Cause cause) {
    causes.add(cause);
    return this;
  }

  /**
   * Adds cause attributes that represents plugin not found failure cause.
   *
   * @param pluginId plugin id
   * @param pluginName plugin name
   * @param pluginType plugin type
   * @return validation failure with plugin not found cause
   */
  public ValidationFailure withPluginNotFoundCause(String pluginId, String pluginName, String pluginType) {
    causes.add(new Cause().with(CauseAttributes.PLUGIN_ID, pluginId).with(CauseAttributes.PLUGIN_NAME, pluginName)
                 .with(CauseAttributes.PLUGIN_TYPE, pluginType));
    return this;
  }

  /**
   * Adds cause attributes that represents plugin configure failure cause.
   *
   * @param pluginConfig plugin config property
   * @return validation failure with invalid plugin config property cause
   */
  public ValidationFailure withPluginConfigCause(String pluginConfig) {
    causes.add(new Cause().with(CauseAttributes.PLUGIN_CONFIG, pluginConfig));
    return this;
  }

  /**
   * Adds cause attributes that represents invalid input schema field failure cause.
   *
   * @param fieldName name of the input schema field
   * @param inputStage input stage name
   * @return validation failure with invalid input schema field cause
   */
  public ValidationFailure withInvalidInputSchemaCause(String fieldName, @Nullable String inputStage) {
    causes.add(new Cause().with(CauseAttributes.INPUT_STAGE, inputStage)
                 .with(CauseAttributes.INPUT_SCHEMA_FIELD, fieldName));
    return this;
  }

  /**
   * Adds cause attributes that represents invalid output schema field failure cause.
   *
   * @param fieldName name of the output schema field
   * @param outputPort output port name
   * @return validation failure with invalid output schema field cause
   */
  public ValidationFailure withInvalidOutputSchemaCause(String fieldName, @Nullable String outputPort) {
    causes.add(new Cause().with(CauseAttributes.OUTPUT_PORT, outputPort)
                 .with(CauseAttributes.OUTPUT_SCHEMA_FIELD, fieldName));
    return this;
  }

  /**
   * Returns failure message.
   */
  public String getMessage() {
    return message;
  }

  /**
   * Returns corrective action for this failure.
   */
  @Nullable
  public String getCorrectiveAction() {
    return correctiveAction;
  }

  /**
   * Returns causes that caused this failure.
   */
  public List<Cause> getCauses() {
    return causes;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    ValidationFailure failure = (ValidationFailure) o;
    return message.equals(failure.message) &&
      Objects.equals(correctiveAction, failure.correctiveAction) && causes.equals(failure.causes);
  }

  @Override
  public int hashCode() {
    return Objects.hash(message, correctiveAction, causes);
  }

  /**
   * Represents a cause of a failure.
   */
  @Beta
  public static class Cause {
    private final Map<String, String> attributes;

    /**
     * Creates a failure cause.
     */
    public Cause() {
      this.attributes = new HashMap<>();
    }

    /**
     * Adds attributes to this cause.
     *
     * @param attribute cause attribute name
     * @param value cause attribute value
     * @return this cause
     */
    public Cause with(String attribute, String value) {
      attributes.put(attribute, value);
      return this;
    }

    /**
     * Returns cause attributes.
     */
    public Map<String, String> getAttributes() {
      return attributes;
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) {
        return true;
      }
      if (o == null || getClass() != o.getClass()) {
        return false;
      }
      Cause cause = (Cause) o;
      return attributes.equals(cause.attributes);
    }

    @Override
    public int hashCode() {
      return Objects.hash(attributes);
    }
  }
}


All the attributes of a cause can be tracked at central location as below: 

Code Block
languagejava
titleCauseAttributes.java
/**
 * Cause attributes constants.
 */
@Beta
public final class CauseAttributes {
  // represents plugin configuration property failure
  public static final String PLUGIN_CONFIG = "pluginConfig";
  // represents id of the plugin
  public static final String PLUGIN_ID = "pluginId";
  // represents type of the plugin
  public static final String PLUGIN_TYPE = "pluginType";
  // represents name of the plugin
  public static final String PLUGIN_NAME = "pluginName";
  // represents input stage
  public static final String INPUT_STAGE = "inputStage";
  // represents field of input stage schema
  public static final String INPUT_SCHEMA_FIELD = "inputField";
  // represents a port of stage output
  public static final String OUTPUT_PORT = "outputPort";
  // represents a field of output port schema
  public static final String OUTPUT_SCHEMA_FIELD = "outputField";

  private CauseAttributes() {
    // no-op
  }
}


API usage in plugins

Code Block
@Override
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
  pipelineConfigurer.createDataset(conf.destinationFileset, FileSet.class);
  StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();
  try {
    Pattern.compile(conf.filterRegex);
  } catch (Exception e) {  
    // add validation error to stage configurer
    stageConfigurer.addFailure(new ValidationFailure(e.getMessage(), "Provide a valid regex for property 'filterRegex'.").withPluginConfigCause("filterRegex"));
  }
  if (conf.sourceFileset.equals(conf.destinationFileset)) {
    // add validation error to stage configurer
    stageConfigurer.addFailure(new ValidationFailure("source and destination filesets must be different", "Provide different fileset for source and destination.")
            .withPluginConfigCause("sourceFileset").withPluginConfigCause("destinationFileset"));
  }
}

Impact on UI

stagesrc pluginTypebatchsource",
pluginNameMock",pluginId "Mock" }] },InvalidInputSchemaInvalidschemafield 'id'. Different types of join keys found in source1 and source2Typeofjoinkeysfrom source1 and source2 must be of same type stringjoinerfieldid",
"inputStage": "source2InvalidInputSchemaUnsupportedtype'bytes'forinputschema field 'name'Changethetypeoftheschema field 'name' to be 'string'sinkfieldname, "inputStage": "database"
TypeDescriptionScenarioApproach - 1 - Json ResponseApproach - 2 - Json Response
StageErrorRepresents validation error while configuring the stageIf there is any error while connecting to sink while getting actual schema
{
"failures": [
{
      
"
type": "
StageError",
"
message": "
Could not load jdbc driver.",
"
correctiveAction": "
Make sure correct driver is uploaded.",
"
properties"
: 
{
 "stage": "src"
}
}
]
}
InvalidInputSchemaRepresents invalid schema field in input schemaIf the input schemas for joiner plugin is of different types



{
"failureserrors": [
{
"typemessage": "InvalidInputSchemaCould not load jdbc driver.",
      "messagecorrectiveAction" : "InvalidMake schemasure field 'id'. Different types of join keys found in source1 and source2correct driver is uploaded.",
"correctiveActioncauses": "Type[
of join keys from source1 and source2 must be of same type string", {
"properties": {
"stage": "joinersrc",
"field": "id",}
"inputStage": "source1"]
}
]
}

InvalidProperty
Description: Represents an invalid configuration property.
Scenario: If a config property value contains characters that are not allowed by the underlying source or sink.
Approach 1 JSON response:
{
  "failures": [
    {
      "type": "InvalidProperty",
      "message": "Can not specify both drop and keep.",
      "correctiveAction": "Either drop or keep should be empty",
      "properties": {
        "stage": "projection",
        "configProperty": "drop"
      }
    },
    {
      "type": "InvalidProperty",
      "message": "Can not specify both drop and keep.",
      "correctiveAction": "Either drop or keep should be empty",
      "properties": {
        "stage": "projection",
        "configProperty": "keep"
      }
    }
  ]
}
Approach 2 JSON response:
{
  "errors": [
    {
      "message": "Can not specify both drop and keep",
      "correctiveAction": "Either drop or keep should be empty",
      "causes": [
        {
          "stage": "projection",
          "stageConfig": "keep"
        },
        {
          "stage": "projection",
          "stageConfig": "drop"
        }
      ]
    }
  ]
}

PluginNotFound
Description: Represents a plugin not found error for a stage. This error will be added by the data pipeline app.
Scenario: If the plugin was not found. This error will be thrown from the data pipeline app.
Approach 1 JSON response:
{
  "failures": [
    {
      "type": "PluginNotFound",
      "message": "Plugin named 'Mock' of type 'batchsource' not found.",
      "correctiveAction": "Please make sure the 'Mock' plugin is installed.",
      "properties": {
        "stage": "src",
        "pluginType": "batchsource",
        "pluginName": "Mock",
        "pluginId": "Mock"
      }
    }
  ]
}
Approach 2 JSON response:
{
  "errors": [
    {
      "message": "Plugin named 'Mock' of type 'batchsource' not found.",
      "correctiveAction": "Please make sure the 'Mock' plugin is installed.",
      "causes": [
        {
          "stage": "src",
          "pluginType": "batchsource",
          "pluginName": "Mock",
          "pluginId": "Mock"
        }
      ]
    }
  ]
}

InvalidInputSchema
Description: Represents an invalid schema field in the input schema.
Scenario: If the input schemas for the joiner plugin are of different types.
Approach 1 JSON response:
{
  "failures": [
    {
      "type": "InvalidInputSchema",
      "message": "Invalid schema field 'id'. Different types of join keys found in source1 and source2.",
      "correctiveAction": "Type of join keys from source1 and source2 must be of same type string",
      "properties": {
        "stage": "joiner",
        "field": "id",
        "inputStage": "source1"
      }
    },
    {
      "type": "InvalidInputSchema",
      "message": "Invalid schema field 'id'. Different types of join keys found in source1 and source2.",
      "correctiveAction": "Type of join keys from source1 and source2 must be of same type string",
      "properties": {
        "stage": "joiner",
        "field": "id",
        "inputStage": "source2"
      }
    },
    {
      "type": "InvalidInputSchema",
      "message": "Unsupported type 'bytes' for input schema field 'name'.",
      "correctiveAction": "Change the type of the schema field 'name' to be 'string'",
      "properties": {
        "stage": "sink",
        "field": "name",
        "inputStage": "database"
      }
    }
  ]
}
Approach 2 JSON response:
{
  "failures": [
    {
      "message": "Invalid schema field 'id'. Different types of join keys found.",
      "correctiveAction": "Type of join keys from source1 and source2 must be of same type string",
      "causes": [
        {
          "stage": "joiner",
          "inputStage": "source1",
          "inputField": "id"
        },
        {
          "stage": "joiner",
          "inputStage": "source2",
          "inputField": "id"
        }
      ]
    },
    {
      "message": "Unsupported type 'bytes' for input schema field 'name'.",
      "correctiveAction": "Change the type of the schema field 'name' to be 'string'",
      "causes": [
        {
          "stage": "sink",
          "inputStage": "database",
          "inputField": "name"
        }
      ]
    }
  ]
}


InvalidOutputSchema
Description: Represents an invalid schema field in the output schema.
Scenario: If the output schema for the plugin is not compatible with the underlying sink.
Approach 1 JSON response:
{
  "failures": [
    {
      "type": "InvalidOutputSchema",
      "message": "Invalid schema field 'email'.",
      "correctiveAction": "Schema should be of type 'string' at output port 'port'",
      "properties": {
        "stage": "splitter",
        "field": "email",
        "outputPort": "port"
      }
    }
  ]
}
Approach 2 JSON response:
{
  "errors": [
    {
      "message": "Invalid schema field 'email'.",
      "correctiveAction": "Schema should be of type 'string' at output port 'port'",
      "causes": [
        {
          "stage": "splitter",
          "outputPort": "port",
          "outputField": "email"
        }
      ]
    }
  ]
}

Conclusion

This design has two contracts: a programmatic contract between the data pipeline app and plugins, and another between the data pipeline app and the UI. Approach 2 does not introduce the concept of a failure type, so the contract with the UI is based on cause attributes rather than on the type. As a result, if a plugin creates a custom failure that uses any of the UI-compatible attributes, the UI can still highlight the affected elements. Approach 2 also associates related causes with a single failure, which represents the failure better when several causes contribute to it. Hence, Approach 2 is suggested.
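
To make the attribute-based contract concrete, the sketch below shows how a consumer might pick highlight targets by looking only at cause attributes. It is not the actual CDAP UI logic: the class, its methods, and the wording of the targets are hypothetical. The attribute names (`stage`, `stageConfig`, `inputStage`, `inputField`, `outputPort`, `outputField`) are taken from the Approach 2 responses above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical consumer-side sketch; not the actual CDAP UI logic.
class CauseHighlightSketch {

  /** Maps one cause to a highlight target using only its attributes, or null if none apply. */
  static String highlightFor(Map<String, String> cause) {
    String stage = cause.get("stage");
    if (cause.containsKey("stageConfig")) {
      return stage + ": config property '" + cause.get("stageConfig") + "'";
    }
    if (cause.containsKey("inputField")) {
      return stage + ": input field '" + cause.get("inputField")
          + "' from stage '" + cause.get("inputStage") + "'";
    }
    if (cause.containsKey("outputField")) {
      return stage + ": output field '" + cause.get("outputField")
          + "' at port '" + cause.get("outputPort") + "'";
    }
    return null; // no attribute recognized by this sketch
  }

  /** Collects highlight targets for all causes of one failure, keeping them grouped together. */
  static List<String> highlights(List<Map<String, String>> causes) {
    List<String> targets = new ArrayList<>();
    for (Map<String, String> cause : causes) {
      String target = highlightFor(cause);
      if (target != null) {
        targets.add(target);
      }
    }
    return targets;
  }

  /** Builds a cause from alternating key/value strings. */
  static Map<String, String> cause(String... keyValues) {
    Map<String, String> cause = new LinkedHashMap<>();
    for (int i = 0; i + 1 < keyValues.length; i += 2) {
      cause.put(keyValues[i], keyValues[i + 1]);
    }
    return cause;
  }

  public static void main(String[] args) {
    List<Map<String, String>> causes = Arrays.asList(
        cause("stage", "joiner", "inputStage", "source1", "inputField", "id"),
        cause("stage", "joiner", "inputStage", "source2", "inputField", "id"));
    highlights(causes).forEach(System.out::println);
  }
}
```

Nothing in this sketch branches on a failure type, so a custom failure that carries any of these attributes would be handled the same way as a built-in one.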

Related Jira

CDAP-15578 (Cask Community Issue Tracker)

Related Work

Releases

Release 6.1.0