Checklist

  •  User Stories Documented
  •  User Stories Reviewed
  •  Design Reviewed
  •  APIs reviewed
  •  Release priorities assigned
  •  Test cases reviewed
  •  Blog post

Introduction 

A CDAP pipeline is composed of various plugins that users configure as the pipeline is being developed. While building a pipeline, a pipeline developer can provide invalid plugin configurations or schemas. For example, the BigQuery sink plugin can have an output schema that does not match the underlying BigQuery table. A pipeline developer can use the new validation endpoint to validate the stages before deploying the pipeline. To fail fast and provide a better user experience, the validation endpoint should return all the validation errors from a given stage when it is called.

The data pipeline app exposes various error types for plugin validation. In future releases, new error types can be introduced. With the current implementation, when plugins with new error types are pushed to hub, data pipeline artifacts need to be updated for every new type of error that is introduced, because the validation errors are defined in the data pipeline app itself. A better approach would be to modify the data pipeline app so that app artifacts do not need to be replaced for every new type of error.

Goals

  • To fail fast and improve the user experience, introduce a new API to collect multiple validation error messages from a stage at configure time
  • Decouple validation error types from the data pipeline app
  • Instrument plugins to use this API to return multiple error messages for the validation endpoint

User Stories 

  • As a CDAP pipeline developer, when I validate a stage, I expect that all the invalid config properties and input/output schema fields are highlighted on CDAP UI with an appropriate error message and corrective action.
  • As a plugin developer, I should be able to capture all the validation errors while configuring the plugin so that all the validation errors can be surfaced on CDAP UI.
  • As a plugin developer, I should be able to use new validation error types without replacing data pipeline app artifacts. 

Scenarios

  • Sources
  • Transforms
  • Sinks
  • Joiner
  • Splitter

API Changes for Plugin Validation

    Collect Multiple errors from plugins

    To collect multiple validation errors from a stage, StageConfigurer, MultiInputStageConfigurer and MultiOutputStageConfigurer can be modified as below. If any validation errors are added to the stage configurer, the pipeline deployment will fail and all the errors will be returned in the response of the stage validation REST endpoint. The current implementation does not expose the stage name to the plugin in the configurePipeline method. Plugins need the stage name to create stage-specific errors, so the stage name will be exposed to plugins through the stage configurer as below.

    Code Block
    languagejava
    titleStageConfigurer.java
    public interface StageConfigurer {
    
      ...
    
      /**
       * Get the stage name.
       *
       * @return stage name
       */
      String getStageName();

      /**
       * Adds a new validation failure to the configurer.
       *
       * @param failure a validation failure
       */
      void addValidationFailure(ValidationFailure failure);

      /**
       * Throws validation exception if there are any failures that are added to the configurer through
       * addValidationFailure method.
       *
       * @throws ValidationException if there are any validation failures being carried by the configurer
       */
      void throwIfFailure() throws ValidationException;
    }
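
The collect-then-throw behaviour described above can be sketched in isolation. This is only a minimal illustration, not the data pipeline app's implementation: the classes `Failure`, `FailuresException` and `InMemoryStageConfigurer` are hypothetical stand-ins for ValidationFailure, ValidationException and a StageConfigurer implementation, reduced to the three methods discussed here.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified stand-ins for illustration only; not the actual CDAP classes.
final class ConfigurerSketch {

  /** Minimal stand-in for ValidationFailure: a message plus a failure type. */
  static final class Failure {
    final String message;
    final String type;

    Failure(String message, String type) {
      this.message = message;
      this.type = type;
    }
  }

  /** Minimal stand-in for ValidationException: carries every collected failure. */
  static final class FailuresException extends RuntimeException {
    final List<Failure> failures;

    FailuresException(List<Failure> failures) {
      super("Errors were encountered during validation.");
      // Defensive copy so later additions to the configurer do not change the exception.
      this.failures = Collections.unmodifiableList(new ArrayList<>(failures));
    }
  }

  /** Assumed in-memory configurer that accumulates failures for one stage. */
  static final class InMemoryStageConfigurer {
    private final String stageName;
    private final List<Failure> failures = new ArrayList<>();

    InMemoryStageConfigurer(String stageName) {
      this.stageName = stageName;
    }

    String getStageName() {
      return stageName;
    }

    void addValidationFailure(Failure failure) {
      failures.add(failure);
    }

    /** Throws only when at least one failure has been recorded. */
    void throwIfFailure() {
      if (!failures.isEmpty()) {
        throw new FailuresException(failures);
      }
    }
  }

  public static void main(String[] args) {
    InMemoryStageConfigurer configurer = new InMemoryStageConfigurer("fileSink");
    configurer.addValidationFailure(new Failure("Invalid regex", "InvalidProperty"));
    configurer.addValidationFailure(new Failure("Source and destination must differ", "StageError"));
    try {
      configurer.throwIfFailure();
    } catch (FailuresException e) {
      // A caller such as a validation endpoint could report every failure at once.
      for (Failure f : e.failures) {
        System.out.println(configurer.getStageName() + ": [" + f.type + "] " + f.message);
      }
    }
  }
}
```

The point of the sketch is that a plugin keeps adding failures and the caller sees all of them in one exception, rather than stopping at the first problem.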

    Decouple plugin error types from data pipeline app

    Approach - 1

    To carry error information, a new ValidationFailure class is introduced to collect multiple validation failures in the stage configurer. A failure is built using a builder (ValidationFailure.Builder in the code below), which only allows string properties. The failure exposes methods to get its message, type and properties. The validation failures are carried by ValidationException. Whenever a plugin has an invalid property that is tied to another invalid property, the plugin can throw a validation exception with all the failures collected so far. This keeps plugin validation code much simpler.

    Code Block
    languagejava
    titleValidationFailure.java
    /**
     * Represents a failure condition that occurred during validation.
     */
    @Beta
    public class ValidationFailure {
      // types of the failures
      private static final String STAGE_ERROR = "StageError";
      private static final String INVALID_PROPERTY = "InvalidProperty";
      private static final String PLUGIN_NOT_FOUND = "PluginNotFound";
      private static final String INVALID_INPUT_SCHEMA = "InvalidInputSchema";
      private static final String INVALID_OUTPUT_SCHEMA = "InvalidOutputSchema";

      // represents stage name in the failure. It is a generic property used in all the failures for a given stage
      private static final String STAGE = "stage";

      // represents configuration property in InvalidProperty failure
      private static final String CONFIG_PROPERTY = "configProperty";

      // represents plugin id in PluginNotFound failure
      private static final String PLUGIN_ID = "pluginId";
      // represents plugin type in PluginNotFound failure
      private static final String PLUGIN_TYPE = "pluginType";
      // represents plugin name in PluginNotFound failure
      private static final String PLUGIN_NAME = "pluginName";

      // represents a field in InvalidInputSchema or InvalidOutputSchema failure
      private static final String FIELD = "field";
      // represents input stage in InvalidInputSchema failure
      private static final String INPUT_STAGE = "inputStage";
      // represents output port in InvalidOutputSchema failure
      private static final String OUTPUT_PORT = "outputPort";

      private final String message;
      private final String type;
      private final String correctiveAction;
      private final Map<String, Object> properties;

      private ValidationFailure(String message, String type, @Nullable String correctiveAction,
                                Map<String, Object> properties) {
        this.message = message;
        this.type = type;
        this.correctiveAction = correctiveAction;
        this.properties = properties;
      }

      /**
       * Creates a stage validation failure.
       *
       * @param message validation failure message
       * @param stage stage name
       * @param correctiveAction corrective action
       */
      public static ValidationFailure createStageFailure(String message, String stage, @Nullable String correctiveAction) {
        Builder builder = builder(message, STAGE_ERROR);
        builder.setCorrectiveAction(correctiveAction).addProperty(STAGE, stage);
        return builder.build();
      }

      /**
       * Creates a config property validation failure.
       *
       * @param message validation failure message
       * @param stage stage name
       * @param correctiveAction corrective action
       */
      public static ValidationFailure createConfigPropertyFailure(String message, String stage,
                                                                  String property, @Nullable String correctiveAction) {
        Builder builder = builder(message, INVALID_PROPERTY);
        builder.setCorrectiveAction(correctiveAction)
          .addProperty(STAGE, stage).addProperty(CONFIG_PROPERTY, property);
        return builder.build();
      }

      /**
       * Creates a plugin not found validation failure.
       *
       * @param message validation failure message
       * @param stage stage name
       * @param correctiveAction corrective action
       */
      public static ValidationFailure createPluginNotFoundFailure(String message, String stage,
                                                                  String pluginId, String pluginName, String pluginType,
                                                                  @Nullable String correctiveAction) {
        Builder builder = builder(message, PLUGIN_NOT_FOUND);
        builder.setCorrectiveAction(correctiveAction)
          .addProperty(STAGE, stage).addProperty(PLUGIN_ID, pluginId)
          .addProperty(PLUGIN_TYPE, pluginType)
          .addProperty(PLUGIN_NAME, pluginName);
        return builder.build();
      }

      /**
       * Creates an invalid input schema failure.
       *
       * @param message validation failure message
       * @param stage stage name
       * @param field input schema field
       * @param inputStage optional input stage name. This is applicable to plugins of type {@link Joiner}.
       * @param correctiveAction optional corrective action
       * @return invalid input schema validation failure
       */
      public static ValidationFailure createInputSchemaFailure(String message, String stage, String field,
                                                               @Nullable String inputStage,
                                                               @Nullable String correctiveAction) {
        ...
      }

      /**
       * Creates an invalid output schema failure.
       *
       * @param message validation failure message
       * @param stage stage name
       * @param field output schema field
       * @param outputPort optional output port. This is applicable to plugins of type {@link SplitterTransform}.
       * @param correctiveAction optional corrective action
       * @return invalid output schema validation failure
       */
      public static ValidationFailure createOutputSchemaFailure(String message, String stage, String field,
                                                                @Nullable String outputPort,
                                                                @Nullable String correctiveAction) {
        ...
      }

      /**
       * Returns a builder for creating a {@link ValidationFailure}.
       */
      public static Builder builder(String message, String type) {
        return new Builder(message, type);
      }

      /**
       * A builder to create {@link ValidationFailure} instance.
       */
      public static class Builder {
        private final String message;
        private final String type;
        private String correctiveAction;
        private final Map<String, Object> properties;
    
        private Builder(String message, String type) {
          this.message = message;
          this.type = type;
          this.properties = new HashMap<>();
        }
    
        /**
         * Sets corrective action to rectify the failure.
         *
         * @param correctiveAction corrective action
         * @return this builder
         */
        public Builder setCorrectiveAction(String correctiveAction) {
          this.correctiveAction = correctiveAction;
          return this;
        }
    
        /**
         * Adds a property to the failure.
         *
         * @param property the name of the property
         * @param value the value of the property
         * @return this builder
         */
        public Builder addProperty(String property, String value) {
          this.properties.put(property, value);
          return this;
        }
    
        /**
         * Creates a new instance of {@link ValidationFailure}.
         *
         * @return instance of {@link ValidationFailure}
         */
        public ValidationFailure build() {
          return new ValidationFailure(message, type, correctiveAction,
                                       Collections.unmodifiableMap(new HashMap<>(properties)));
        }
      }
    
      /**
       * Returns message of this failure.
       */
      public String getMessage() {
        return message;
      }
    
      /**
       * Returns type of this failure.
       */
      public String getType() {
        return type;
      }
    
      /**
       * Returns corrective action for this failure.
       */
      @Nullable
      public String getCorrectiveAction() {
        return correctiveAction;
      }
    
      /**
       * Returns properties of this failure.
       */
      public Map<String, Object> getProperties() {
        return properties;
      }
    
      @Override
      public boolean equals(Object o) {
        if (this == o) {
          return true;
        }
        if (o == null || getClass() != o.getClass()) {
          return false;
        }
        ValidationFailure that = (ValidationFailure) o;
        return message.equals(that.message) && type.equals(that.type) &&
          Objects.equals(correctiveAction, that.correctiveAction) &&
          properties.equals(that.properties);
      }
    
      @Override
      public int hashCode() {
        return Objects.hash(message, type, correctiveAction, properties);
      }
    
      @Override
      public String toString() {
        return "ValidationFailure{" +
          "message='" + message + '\'' +
          ", type='" + type + '\'' +
          ", correctiveAction='" + correctiveAction + '\'' +
          ", properties=" + properties +
          '}';
      }
    }
    Code Block
    languagejava
    titleValidationException.java
    /**
     * Validation exception that carries multiple validation failures.
     */
    @Beta
    public class ValidationException extends RuntimeException {
      private List<ValidationFailure> failures;
    
      public ValidationException(List<ValidationFailure> failures) {
        super(failures.isEmpty() ? "Validation Exception occurred." : failures.iterator().next().getMessage());
        this.failures = failures;
      }
    
      /**
       * Returns list of failures.
       */
      public List<ValidationFailure> getFailures() {
        return failures;
      }
    }


    API usage in plugins

    Code Block
    @Override
    public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
      pipelineConfigurer.createDataset(conf.destinationFileset, FileSet.class);
      StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();
      // get the name of the stage 
      String stageName = stageConfigurer.getStageName();
      try {
        Pattern.compile(conf.filterRegex);
      } catch (Exception e) {  
        // add validation failure to stage configurer
        stageConfigurer.addValidationFailure(ValidationFailure.createConfigPropertyFailure(e.getMessage(), stageName, "filterRegex", "Make sure the file regex is correct"));
        // plugin can choose to terminate processing here
        stageConfigurer.throwIfFailure();
      }
      if (conf.sourceFileset.equals(conf.destinationFileset)) {
        // add validation failure to stage configurer
        stageConfigurer.addValidationFailure(ValidationFailure.createStageFailure("source and destination filesets must be different", stageName, "Provide different source and destination filesets"));
      }
    }


    Approach - 2

    A validation error represents an error with one or more causes, each with its own attributes. For example, when the type of an input schema field does not match the underlying sink schema, the cause is an input field mismatch with attributes such as stage name, field name and suggested type. Each error message can be associated with more than one cause. This can happen for plugins such as joiner and splitter, where a given stage has multiple input or output schemas. For example, when the input schemas for a joiner are not compatible, the causes will include the mismatching fields from the input schemas of the incoming stages. This means that a validation error can be represented as a list of causes where each cause is a map of cause attribute to its value as shown below.
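
The "list of causes, each a map of attribute to value" idea can be illustrated on its own before looking at the API. The following is a rough sketch for the joiner example only; the attribute keys `inputStage` and `inputField`, the stage names and the field names are all hypothetical and are not the constants used by the data pipeline app.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustration only: one failure with two causes, one per joiner input.
final class CauseListSketch {

  /** Builds a single cause as an attribute-to-value map (keys are assumed names). */
  static Map<String, String> inputFieldCause(String inputStage, String field) {
    Map<String, String> cause = new LinkedHashMap<>();
    cause.put("inputStage", inputStage);
    cause.put("inputField", field);
    return cause;
  }

  /** A join-key mismatch has one cause for each side of the join. */
  static List<Map<String, String>> joinKeyMismatch(String leftStage, String leftField,
                                                   String rightStage, String rightField) {
    List<Map<String, String>> causes = new ArrayList<>();
    causes.add(inputFieldCause(leftStage, leftField));
    causes.add(inputFieldCause(rightStage, rightField));
    return causes;
  }

  public static void main(String[] args) {
    // Hypothetical stages and fields feeding a joiner.
    List<Map<String, String>> causes = joinKeyMismatch("customers", "id", "orders", "customer_id");
    for (Map<String, String> cause : causes) {
      System.out.println(cause);
    }
  }
}
```

A consumer such as a UI can walk the list and highlight each field named in a cause, without knowing in advance how many inputs the stage has.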

    Code Block
    languagejava
    titleFailureCollector.java
    /**
     * Failure collector is responsible to collect {@link ValidationFailure}s.
     */
    @Beta
    public interface FailureCollector {
    
      /**
       * Add a validation failure to this failure collector. The method returns the validation failure that was added to
       * the failure collector. This failure can be used to add additional {@link ValidationFailure.Cause}s.
       * For example,
       * <code>failureCollector.addFailure("message", "action").withConfigProperty("configProperty");</code>
       *
       * @param message failure message
       * @param correctiveAction corrective action
       * @return a validation failure
       * @throws UnsupportedOperationException if the implementation does not override this method
       */
      default ValidationFailure addFailure(String message, @Nullable String correctiveAction) {
        throw new UnsupportedOperationException("Adding a failure is not supported.");
      }
    
      /**
       * Throws validation exception if there are any failures that are added to the failure collector through
       * {@link #addFailure(String, String)}.
       * If no failures are added to the collector, it will return a {@link ValidationException} with empty failure list.
       *
       * <pre>
       *   String someMethod() {
       *     switch (someVar) {
       *       // cases
       *     }
       *     // if control comes here, it means failure
       *     failureCollector.addFailure(...);
       *     // throw validation exception so that compiler knows that exception is being thrown which eliminates the
       *     // need to have a statement that returns null towards the end of this method
       *     throw failureCollector.getOrThrowException();
       *   }
       * </pre>
       *
       * @return returns a {@link ValidationException} if no failures were added to the collector
       * @throws ValidationException exception indicating validation failures
       * @throws UnsupportedOperationException if the implementation does not override this method
       */
      default ValidationException getOrThrowException() throws ValidationException {
        throw new UnsupportedOperationException("Throwing failures is not supported.");
      }
    }
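
To make the contract of `getOrThrowException` concrete, here is a small sketch of how a collector could behave. It is an assumption-laden illustration rather than the data pipeline app's implementation: `Failure`, `FailuresException`, `InMemoryCollector`, the `stageConfig` attribute key and the `parseLevel` helper are all hypothetical names introduced for this example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for illustration only; not the actual CDAP classes.
final class CollectorSketch {

  /** Stand-in for ValidationFailure: message, optional corrective action, list of causes. */
  static final class Failure {
    final String message;
    final String correctiveAction; // may be null
    final List<Map<String, String>> causes = new ArrayList<>();

    Failure(String message, String correctiveAction) {
      this.message = message;
      this.correctiveAction = correctiveAction;
    }

    /** Adds a cause that points at a config property (the key name is an assumption). */
    Failure withConfigProperty(String property) {
      Map<String, String> cause = new LinkedHashMap<>();
      cause.put("stageConfig", property);
      causes.add(cause);
      return this;
    }
  }

  /** Stand-in for ValidationException. */
  static final class FailuresException extends RuntimeException {
    final List<Failure> failures;

    FailuresException(List<Failure> failures) {
      super("Errors were encountered during validation.");
      this.failures = Collections.unmodifiableList(new ArrayList<>(failures));
    }
  }

  /** Assumed in-memory collector following the Javadoc contract above. */
  static final class InMemoryCollector {
    private final List<Failure> failures = new ArrayList<>();

    Failure addFailure(String message, String correctiveAction) {
      Failure failure = new Failure(message, correctiveAction);
      failures.add(failure);
      return failure; // returned so that causes can be chained onto it
    }

    /** Throws when failures exist; otherwise returns an exception with an empty list. */
    FailuresException getOrThrowException() {
      FailuresException exception = new FailuresException(failures);
      if (failures.isEmpty()) {
        return exception;
      }
      throw exception;
    }
  }

  /** Example of the "throw collector.getOrThrowException()" idiom in a value-returning method. */
  static int parseLevel(String level, InMemoryCollector collector) {
    switch (level) {
      case "low":
        return 1;
      case "high":
        return 2;
      default:
        collector.addFailure("Unknown level '" + level + "'", "Use 'low' or 'high'")
          .withConfigProperty("level");
        // The explicit throw tells the compiler that no return value is needed on this path.
        throw collector.getOrThrowException();
    }
  }

  public static void main(String[] args) {
    InMemoryCollector collector = new InMemoryCollector();
    System.out.println(parseLevel("high", collector));
    try {
      parseLevel("bogus", collector);
    } catch (FailuresException e) {
      System.out.println(e.failures.get(0).message + " " + e.failures.get(0).causes);
    }
  }
}
```

Returning the exception type from `getOrThrowException` is what allows the `throw` keyword at the call site, so the method does not need a dummy `return` after the failure branch.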
    Code Block
    public interface StageConfigurer {
      ....
    
      /**
       * Returns a failure collector for the stage.
       *
       * @return a failure collector
       * @throws UnsupportedOperationException if the implementation does not override this method
       */
      default FailureCollector getFailureCollector() {
        throw new UnsupportedOperationException("Getting failure collector is not supported.");
      }
    }
    
    
    Code Block
    languagejava
    titleValidationException.java
    /**
     * Validation exception that carries multiple validation failures.
     */
    @Beta
    public class ValidationException extends RuntimeException {
      private final List<ValidationFailure> failures;
    
      /**
       * Creates a validation exception with list of failures.
       *
       * @param failures list of validation failures
       */
      public ValidationException(List<ValidationFailure> failures) {
        super("Errors were encountered during validation.");
        this.failures = Collections.unmodifiableList(new ArrayList<>(failures));
      }
    
      /**
       * Returns a list of validation failures.
       */
      public List<ValidationFailure> getFailures() {
        return failures;
      }
    }
    Code Block
    languagejava
    titleValidationFailure.java
    /**
     * Represents a failure condition that occurred during validation.
     */
    @Beta
    public class ValidationFailure {
      private static final Gson GSON = new Gson();
      private final String message;
      private final String correctiveAction;
      private final List<Cause> causes;
    
      /**
       * Creates a validation failure with provided message.
       *
       * @param message validation failure message
       */
      public ValidationFailure(String message) {
        this(message, null);
      }
    
      /**
       * Creates a validation failure with provided message and corrective action.
       *
       * @param message validation failure message
       * @param correctiveAction corrective action
       */
      public ValidationFailure(String message, @Nullable String correctiveAction) {
        this.message = message;
        this.correctiveAction = correctiveAction;
        this.causes = new ArrayList<>();
      }
    
      /**
       * Adds provided cause to this validation failure.
       *
       * @param cause cause of validation failure
       * @return validation failure with provided cause
       */
      public ValidationFailure withCause(Cause cause) {
        causes.add(cause);
        return this;
      }
    
      /**
       * Adds cause attributes that represents plugin not found failure cause.
       *
       * @param pluginId plugin id
       * @param pluginName plugin name
       * @param pluginType plugin type
       * @return validation failure with plugin not found cause
       */
      public ValidationFailure withPluginNotFound(String pluginId, String pluginName, String pluginType) {
        return withPluginNotFound(pluginId, pluginName, pluginType, null, null);
      }
    
      /**
       * Adds cause attributes that represents plugin not found failure cause.
       *
       * @param pluginId plugin id
       * @param pluginName plugin name
       * @param pluginType plugin type
       * @param requestedArtifact requested artifact
       * @param suggestedArtifact suggested artifact
       * @return validation failure with plugin not found cause
       */
      public ValidationFailure withPluginNotFound(String pluginId, String pluginName, String pluginType,
                                                  @Nullable ArtifactId requestedArtifact,
                                                  @Nullable ArtifactId suggestedArtifact) {
        Cause cause = new Cause().addAttribute(CauseAttributes.PLUGIN_ID, pluginId)
          .addAttribute(CauseAttributes.PLUGIN_NAME, pluginName)
          .addAttribute(CauseAttributes.PLUGIN_TYPE, pluginType);
        if (requestedArtifact != null) {
          cause.addAttribute(CauseAttributes.REQUESTED_ARTIFACT_NAME, requestedArtifact.getName());
          cause.addAttribute(CauseAttributes.REQUESTED_ARTIFACT_SCOPE, requestedArtifact.getScope().name());
          cause.addAttribute(CauseAttributes.REQUESTED_ARTIFACT_VERSION, requestedArtifact.getVersion().getVersion());
        }
    
        if (suggestedArtifact != null) {
          cause.addAttribute(CauseAttributes.SUGGESTED_ARTIFACT_NAME, suggestedArtifact.getName());
          cause.addAttribute(CauseAttributes.SUGGESTED_ARTIFACT_SCOPE, suggestedArtifact.getScope().name());
          cause.addAttribute(CauseAttributes.SUGGESTED_ARTIFACT_VERSION, suggestedArtifact.getVersion().getVersion());
        }
        causes.add(cause);
        return this;
      }
    
      /**
       * Adds cause attributes that represents invalid stage configure property failure cause.
       *
       * @param stageConfigProperty stage config property
       * @return validation failure with invalid stage config property cause
       */
      public ValidationFailure withConfigProperty(String stageConfigProperty) {
        causes.add(new Cause().addAttribute(CauseAttributes.STAGE_CONFIG, stageConfigProperty));
        return this;
      }
    
      /**
       * Adds cause attributes for failure cause that represents an invalid element in the list associated with given stage
       * configure property.
       *
       * @param stageConfigProperty stage config property
       * @param element element in the list associated by a given stageConfigProperty
       * @return validation failure with invalid stage config property element cause
       */
      public ValidationFailure withConfigElement(String stageConfigProperty, String element) {
        causes.add(new Cause().addAttribute(CauseAttributes.STAGE_CONFIG, stageConfigProperty)
                     .addAttribute(CauseAttributes.CONFIG_ELEMENT, element));
        return this;
      }
    
      /**
       * Adds cause attributes that represents invalid input schema field failure cause.
       *
       * @param fieldName name of the input schema field
       * @param inputStage optional name of the input stage
       * @return validation failure with invalid input schema field cause
       */
      public ValidationFailure withInputSchemaField(String fieldName, @Nullable String inputStage) {
        Cause cause = new Cause().addAttribute(CauseAttributes.INPUT_SCHEMA_FIELD, fieldName);
        cause = inputStage == null ? cause : cause.addAttribute(CauseAttributes.INPUT_STAGE, inputStage);
        causes.add(cause);
        return this;
      }
    
      /**
       * Adds a cause that represents an invalid output schema field.
       *
       * @param fieldName name of the output schema field
       * @param outputPort name of the output port, or null if not applicable
       * @return validation failure with invalid output schema field cause
       */
      public ValidationFailure withOutputSchemaField(String fieldName, @Nullable String outputPort) {
        Cause cause = new Cause().addAttribute(CauseAttributes.OUTPUT_SCHEMA_FIELD, fieldName);
        cause = outputPort == null ? cause : cause.addAttribute(CauseAttributes.OUTPUT_PORT, outputPort);
        causes.add(cause);
        return this;
      }
    
      /**
       * Adds a cause that represents a stacktrace.
       *
       * @param stacktraceElements stacktrace for the error
       * @return validation failure with stacktrace
       */
      public ValidationFailure withStacktrace(StackTraceElement[] stacktraceElements) {
        causes.add(new Cause().addAttribute(CauseAttributes.STACKTRACE, GSON.toJson(stacktraceElements)));
        return this;
      }
    
      /**
       * Returns failure message.
       */
      public String getMessage() {
        return message;
      }
    
      /**
       * Returns corrective action for this failure.
       */
      @Nullable
      public String getCorrectiveAction() {
        return correctiveAction;
      }
    
      /**
       * Returns the causes of this failure.
       */
      public List<Cause> getCauses() {
        return causes;
      }
    
      @Override
      public boolean equals(Object o) {
        if (this == o) {
          return true;
        }
        if (o == null || getClass() != o.getClass()) {
          return false;
        }
        ValidationFailure failure = (ValidationFailure) o;
        return message.equals(failure.message) &&
          Objects.equals(correctiveAction, failure.correctiveAction) && causes.equals(failure.causes);
      }
    
      @Override
      public int hashCode() {
        return Objects.hash(message, correctiveAction, causes);
      }
    
      /**
       * Represents a cause of a failure.
       */
      @Beta
      public static class Cause {
        private final Map<String, String> attributes;
    
        /**
         * Creates a failure cause.
         */
        public Cause() {
          this.attributes = new HashMap<>();
        }
    
        /**
         * Adds an attribute to this cause.
         *
         * @param attribute cause attribute name
         * @param value cause attribute value
         * @return this cause
         */
        public Cause addAttribute(String attribute, String value) {
          attributes.put(attribute, value);
          return this;
        }
    
        /**
         * Returns value of the provided cause attribute.
         *
         * @param attribute attribute name
         */
        public String getAttribute(String attribute) {
          return attributes.get(attribute);
        }
    
        /**
         * Returns all the attributes of the cause.
         */
        public Map<String, String> getAttributes() {
          return Collections.unmodifiableMap(new HashMap<>(attributes));
        }
    
        @Override
        public boolean equals(Object o) {
          if (this == o) {
            return true;
          }
          if (o == null || getClass() != o.getClass()) {
            return false;
          }
          Cause cause = (Cause) o;
          return attributes.equals(cause.attributes);
        }
    
        @Override
        public int hashCode() {
          return Objects.hash(attributes);
        }
      }
    }
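To illustrate how several causes chain onto a single failure, here is a minimal, self-contained sketch. `SimpleFailure` is a hypothetical, trimmed-down stand-in for `ValidationFailure` (it stores each cause as a plain map instead of a `Cause` object), and `ChainedCausesExample` exists only for this illustration. The attribute keys `stageConfig` and `configElement` are the values this design assigns to `STAGE_CONFIG` and `CONFIG_ELEMENT`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, trimmed-down stand-in for ValidationFailure (not the CDAP class).
class SimpleFailure {
  private final String message;
  private final List<Map<String, String>> causes = new ArrayList<>();

  SimpleFailure(String message) {
    this.message = message;
  }

  // Each with* call appends one cause and returns this, so calls can be chained.
  SimpleFailure withConfigProperty(String property) {
    Map<String, String> cause = new LinkedHashMap<>();
    cause.put("stageConfig", property);
    causes.add(cause);
    return this;
  }

  SimpleFailure withConfigElement(String property, String element) {
    Map<String, String> cause = new LinkedHashMap<>();
    cause.put("stageConfig", property);
    cause.put("configElement", element);
    causes.add(cause);
    return this;
  }

  String getMessage() {
    return message;
  }

  // Read-only view so callers cannot modify the recorded causes.
  List<Map<String, String>> getCauses() {
    return Collections.unmodifiableList(causes);
  }
}

public class ChainedCausesExample {
  public static void main(String[] args) {
    SimpleFailure failure = new SimpleFailure("Source and destination filesets must be different")
      .withConfigProperty("sourceFileset")
      .withConfigProperty("destinationFileset");
    // One failure that carries two associated causes.
    System.out.println(failure.getMessage());
    System.out.println(failure.getCauses());
  }
}
```

Returning `this` from every `with*` method is what lets a plugin attach all related causes to one failure in a single expression.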


    All cause attributes can be tracked in a central location, as shown below:

    Code Block
    languagejava
    titleCauseAttributes.java
    /**
     * Cause attributes constants.
     */
    @Beta
    public class CauseAttributes {
      // Represents stage configuration property failure
      public static final String STAGE_CONFIG = "stageConfig";
      // Represents an element in the list of elements associated with a stage config property.
      // For example, in projection transform, config property 'keep' represents a list of input fields to keep. Below
      // cause attribute can be used to represent an invalid field in 'keep' config property
      public static final String CONFIG_ELEMENT = "configElement";
    
      // Represents id of the plugin
      public static final String PLUGIN_ID = "pluginId";
      // Represents type of the plugin
      public static final String PLUGIN_TYPE = "pluginType";
      // Represents name of the plugin
      public static final String PLUGIN_NAME = "pluginName";
      // Represents requested artifact name
      public static final String REQUESTED_ARTIFACT_NAME = "requestedArtifactName";
      // Represents requested artifact version
      public static final String REQUESTED_ARTIFACT_VERSION = "requestedArtifactVersion";
      // Represents requested artifact scope
      public static final String REQUESTED_ARTIFACT_SCOPE = "requestedArtifactScope";
      // Represents suggested artifact name
      public static final String SUGGESTED_ARTIFACT_NAME = "suggestedArtifactName";
      // Represents suggested artifact version
      public static final String SUGGESTED_ARTIFACT_VERSION = "suggestedArtifactVersion";
      // Represents suggested artifact scope
      public static final String SUGGESTED_ARTIFACT_SCOPE = "suggestedArtifactScope";
    
      // Represents input stage
      public static final String INPUT_STAGE = "inputStage";
      // Represents a field of input stage schema
      public static final String INPUT_SCHEMA_FIELD = "inputField";
    
      // Represents a stage output port
      public static final String OUTPUT_PORT = "outputPort";
      // Represents a field of output stage schema
      public static final String OUTPUT_SCHEMA_FIELD = "outputField";
    
      // Represents a stacktrace
      public static final String STACKTRACE = "stacktrace";
    }


    API usage in plugins

    Code Block
    languagejava
    @Override
    public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
      pipelineConfigurer.createDataset(conf.destinationFileset, FileSet.class);
      FailureCollector collector = pipelineConfigurer.getStageConfigurer().getFailureCollector();
      try {
        Pattern.compile(conf.filterRegex);
      } catch (Exception e) {
        collector.addFailure("Error encountered while compiling filter regex: " + e.getMessage(),
                             "Make sure filter regex is valid.").withConfigProperty("filterRegex");
      }
      if (conf.sourceFileset.equals(conf.destinationFileset)) {
        collector.addFailure("Source and destination filesets must be different",
                             "Make sure source and destination filesets are different")
          .withConfigProperty("sourceFileset").withConfigProperty("destinationFileset");
      }
    }
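The usage above depends on a collector that accumulates failures instead of stopping at the first one. The sketch below shows that idea in isolation, under stated assumptions: `Failure`, `InMemoryCollector`, and `CollectAllFailuresExample` are hypothetical names invented for this illustration, not CDAP classes, and the two checks simply mirror the plugin example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;

// Hypothetical minimal failure record; the ValidationFailure in this design also carries causes.
class Failure {
  final String message;
  final String correctiveAction;

  Failure(String message, String correctiveAction) {
    this.message = message;
    this.correctiveAction = correctiveAction;
  }
}

// Hypothetical collector that accumulates failures rather than throwing on the first one.
class InMemoryCollector {
  private final List<Failure> failures = new ArrayList<>();

  Failure addFailure(String message, String correctiveAction) {
    Failure failure = new Failure(message, correctiveAction);
    failures.add(failure);
    return failure;
  }

  // Returns a copy so callers cannot change the collector's state.
  List<Failure> getFailures() {
    return new ArrayList<>(failures);
  }
}

public class CollectAllFailuresExample {
  // Runs both checks from the plugin example and reports every problem found.
  static List<Failure> validate(String filterRegex, String sourceFileset, String destinationFileset) {
    InMemoryCollector collector = new InMemoryCollector();
    try {
      Pattern.compile(filterRegex);
    } catch (PatternSyntaxException e) {
      collector.addFailure("Error encountered while compiling filter regex: " + e.getMessage(),
                           "Make sure filter regex is valid.");
    }
    if (sourceFileset.equals(destinationFileset)) {
      collector.addFailure("Source and destination filesets must be different",
                           "Make sure source and destination filesets are different");
    }
    return collector.getFailures();
  }

  public static void main(String[] args) {
    // "[" is an invalid regex and both filesets match, so both failures are reported together.
    for (Failure failure : validate("[", "files", "files")) {
      System.out.println(failure.message + " -> " + failure.correctiveAction);
    }
  }
}
```

Because neither check returns early, a single validation call surfaces every problem in the stage, which is the fail-fast behavior the goals describe.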

    Impact on UI

    Each failure type below lists its description, an example scenario, and the JSON response under Approach 1 and Approach 2.

    Type: StageError
    Description: Represents validation error while configuring the stage
    Scenario: If there is any error while connecting to sink while getting actual schema
    Approach 1 JSON response:
    {
      "failures": [
        {
          "type": "StageError",
          "message": "Could not load jdbc driver class.",
          "correctiveAction": "Make sure correct driver is available.",
          "properties": {
            "stage": "src"
          }
        }
      ]
    }
    Approach 2 JSON response:
    {
      "errors": [
        {
          "message": "Could not load jdbc driver class.",
          "correctiveAction": "Make sure correct driver is available.",
          "causes": [
            {
              "stage": "src"
            }
          ]
        }
      ]
    }

    Type: InvalidProperty
    Description: Represents invalid configuration property
    Scenario: If config property value contains characters that are not allowed by underlying source or sink
    Approach 1 JSON response:
    {
      "failures": [
        {
          "type": "InvalidProperty",
          "message": "Property 'millis' should be more than 0.",
          "correctiveAction": "Make sure 'millis' is greater than 0.",
          "properties": {
            "stage": "transform",
            "configProperty": "millis"
          }
        }
      ]
    }
    Approach 2 JSON response:
    {
      "errors": [
        {
          "message": "Property 'millis' should be more than 0.",
          "correctiveAction": "Make sure 'millis' is greater than 0.",
          "causes": [
            {
              "stage": "transform",
              "stageConfig": "millis"
            }
          ]
        }
      ]
    }

    Type: PluginNotFound
    Description: Represents plugin not found error for a stage. This error will be added by the data pipeline app
    Scenario: If the plugin was not found. This error will be thrown from the data pipeline app
    Approach 1 JSON response:
    {
      "failures": [
        {
          "type": "PluginNotFound",
          "message": "Plugin named 'Mock' of type 'batchsource' not found.",
          "correctiveAction": "Please make sure the 'Mock' plugin is installed.",
          "properties": {
            "stage": "src",
            "pluginType": "batchsource",
            "pluginName": "Mock",
            "pluginId": "Mock"
          }
        }
      ]
    }
    Approach 2 JSON response:
    {
      "errors": [
        {
          "message": "Plugin named 'Mock' of type 'batchsource' not found.",
          "correctiveAction": "Please make sure the 'Mock' plugin is installed.",
          "causes": [
            {
              "stage": "src",
              "pluginType": "batchsource",
              "pluginName": "Mock",
              "pluginId": "Mock"
            }
          ]
        }
      ]
    }

    Type: InvalidInputSchema
    Description: Represents invalid schema field in input schema
    Scenario: If the input schemas for joiner plugin are of different types
    Approach 1 JSON response:
    {
      "failures": [
        {
          "type": "InvalidInputSchema",
          "message": "Invalid schema field 'id'. Different types of join keys found in source1 and source2.",
          "correctiveAction": "Type of join keys from source1 and source2 must be of same type string",
          "properties": {
            "stage": "joiner",
            "field": "id",
            "inputStage": "source1"
          }
        },
        {
          "type": "InvalidInputSchema",
          "message": "Invalid schema field 'id'. Different types of join keys found in source1 and source2.",
          "correctiveAction": "Type of join keys from source1 and source2 must be of same type string",
          "properties": {
            "stage": "joiner",
            "field": "id",
            "inputStage": "source2"
          }
        }
      ]
    }
    Approach 2 JSON response:
    {
      "failures": [
        {
          "message": "Different types of join keys found.",
          "correctiveAction": "Type of join keys from all the sources must be same.",
          "causes": [
            {
              "stage": "joiner",
              "joinKey": "source1.id,source2.id"
            },
            {
              "stage": "joiner",
              "inputStage": "source1",
              "inputField": "id"
            },
            {
              "stage": "joiner",
              "inputStage": "source2",
              "inputField": "id"
            }
          ]
        }
      ]
    }

    Type: InvalidOutputSchema
    Description: Represents invalid schema field in output schema
    Scenario: If the output schema for the plugin is not compatible with underlying sink
    Approach 1 JSON response:
    {
      "failures": [
        {
          "type": "InvalidOutputSchema",
          "message": "Invalid schema field 'email'.",
          "correctiveAction": "Schema should be of type 'string' at output port 'port'",
          "properties": {
            "stage": "splitter",
            "field": "email",
            "outputPort": "port"
          }
        }
      ]
    }
    Approach 2 JSON response:
    {
      "errors": [
        {
          "message": "Invalid schema field 'email'.",
          "correctiveAction": "Schema should be of type 'string' at output port 'port'",
          "causes": [
            {
              "stage": "splitter",
              "outputPort": "port",
              "outputField": "email"
            }
          ]
        }
      ]
    }

    Type: InvalidFieldInProperty
    Description: Represents an invalid field in property list
    Scenario: If the property represents list of fields, the failure should include the property name along with invalid field
    Approach 1 JSON response:
    {
      "failures": [
        {
          "type": "InvalidFieldInProperty",
          "message": "Unique field 'name' does not exist in the input schema.",
          "correctiveAction": "Make sure 'name' field is a correct field.",
          "properties": {
            "stage": "Deduplicate",
            "field": "uniqueFields",
            "configElement": "name"
          }
        }
      ]
    }
    Approach 2 JSON response:
    {
      "errors": [
        {
          "message": "Unique field 'name' does not exist in the input schema.",
          "correctiveAction": "Make sure 'name' field is a correct field.",
          "causes": [
            {
              "stage": "Deduplicate",
              "outputPort": "uniqueFields",
              "outputField": "name"
            }
          ]
        }
      ]
    }


    Conclusion

    There are two contracts in this design: a programmatic contract between the data pipeline app and plugins, and another between the data pipeline app and the UI. Approach 2 does not introduce the concept of a failure type, so the contract with the UI is based on cause attributes rather than on a type. If a plugin creates a custom failure and uses any of the UI-compatible attributes, the UI can still highlight them. Approach 2 also keeps related causes together under a single failure, which represents the failure better when multiple causes contribute to it. Hence, Approach 2 is suggested.
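The attribute-based UI contract can be sketched as follows. This is an illustration only: `HighlightResolver`, its `resolve` method, and the `key:value` target format are hypothetical and do not describe the actual CDAP UI. The keys `stageConfig`, `inputField`, and `outputField` are the values this design assigns in `CauseAttributes`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HighlightResolver {
  // Attribute keys this hypothetical UI knows how to highlight (values from CauseAttributes).
  private static final List<String> HIGHLIGHTABLE =
    Arrays.asList("stageConfig", "inputField", "outputField");

  // Returns "key:value" targets for every cause attribute the UI recognizes.
  // Unknown attributes are skipped, so custom failures never break the UI.
  static List<String> resolve(List<Map<String, String>> causes) {
    List<String> targets = new ArrayList<>();
    for (Map<String, String> cause : causes) {
      for (String key : HIGHLIGHTABLE) {
        String value = cause.get(key);
        if (value != null) {
          targets.add(key + ":" + value);
        }
      }
    }
    return targets;
  }

  public static void main(String[] args) {
    Map<String, String> known = new HashMap<>();
    known.put("stage", "transform");
    known.put("stageConfig", "millis");
    // A cause with only a custom attribute produces no highlight target.
    Map<String, String> custom = Collections.singletonMap("myCustomAttribute", "anything");
    System.out.println(resolve(Arrays.asList(known, custom)));
  }
}
```

Since the lookup is keyed on attribute names alone, no failure type has to be known in advance, which is why new kinds of failures need no change to the app artifact or the UI.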

    Related Jira

    CDAP-15578

    Test Scenarios

    Test ID | Test Description | Expected Results

    Releases

    Release 6.1.0

    Related Work

  • Improvements in Error Handling 
  • Pipeline System Application