Checklist
- User Stories Documented
- User Stories Reviewed
- Design Reviewed
- APIs reviewed
- Release priorities assigned
- Test cases reviewed
- Blog post
Introduction
A CDAP pipeline is composed of plugins that users configure while developing the pipeline. While building a pipeline, a developer can provide invalid plugin configurations or schemas. For example, the output schema of a BigQuery sink plugin may not match the underlying BigQuery table. A pipeline developer can use the new validation endpoint to validate stages before deploying the pipeline. To fail fast and provide a better user experience, the validation endpoint should return all validation errors for a given stage each time it is called.
The data pipeline app exposes various error types for plugin validation, and new error types may be introduced in future releases. In the current implementation, the validation errors are defined in the data pipeline app itself. As a result, whenever plugins that use a new error type are pushed to the hub, the data pipeline artifacts must be updated as well. A better approach is to modify the data pipeline app so that its artifacts do not need to be replaced for every new type of error.
Goals
- To fail fast and provide a better user experience, introduce a new API to collect multiple validation error messages from a stage at configure time
- Decouple validation error types from the data pipeline app
- Instrument plugins to use this API to return multiple error messages to the validation endpoint
User Stories
- As a CDAP pipeline developer, when I validate a stage, I expect all invalid config properties and input/output schema fields to be highlighted on the CDAP UI with an appropriate error message and corrective action.
- As a plugin developer, I should be able to capture all validation errors while configuring the plugin so that they can all be surfaced on the CDAP UI.
- As a plugin developer, I should be able to use new validation error types without replacing the data pipeline app artifacts.
API Changes for Plugin Validation
Collect Multiple errors from plugins
To collect multiple validation errors from a stage, StageConfigurer, MultiInputStageConfigurer and MultiOutputStageConfigurer can be modified as below. The current implementation does not expose the stage name to the plugin in the configurePipeline method. Plugins need the stage name to create stage-specific errors, so it will be exposed to them through the stage configurer as below.
```java
public interface StageConfigurer {
  // ... existing methods ...

  /**
   * Get the stage name.
   *
   * @return stage name
   */
  String getStageName();
}
```
Decouple plugin error types from data pipeline app
Approach - 1
To carry error information, a new ValidationFailure class is introduced so that multiple validation failures can be collected in the stage configurer. A ValidationFailure is created with a ValidationFailure.Builder, which only allows string properties. ValidationFailure exposes methods to get the message, type and properties of a failure. Validation failures are collected in a ValidationException. Whenever a plugin finds an invalid property that other properties depend on, it can throw a ValidationException containing all the failures collected so far. This keeps plugin validation code much simpler.
```java
import io.cdap.cdap.api.annotation.Beta;

import java.util.HashMap;
import java.util.Map;

/**
 * Validation failure.
 */
@Beta
public class ValidationFailure {
  private final String message;
  private final String type;
  private final String correctiveAction;
  private final Map<String, Object> properties;

  ValidationFailure(String message, String type, String correctiveAction, Map<String, Object> properties) {
    this.message = message;
    this.type = type;
    this.correctiveAction = correctiveAction;
    this.properties = properties;
  }

  public static ValidationFailure.Builder builder(String message, String type, String correctiveAction) {
    return new ValidationFailure.Builder(message, type, correctiveAction);
  }

  public String getMessage() {
    return message;
  }

  public String getType() {
    return type;
  }

  public String getCorrectiveAction() {
    return correctiveAction;
  }

  public Map<String, Object> getProperties() {
    return properties;
  }

  /**
   * Builder for a validation failure.
   */
  public static class Builder {
    private final String message;
    private final String type;
    private final String correctiveAction;
    private final Map<String, Object> properties;

    private Builder(String message, String type, String correctiveAction) {
      this.message = message;
      this.type = type;
      this.correctiveAction = correctiveAction;
      this.properties = new HashMap<>();
    }

    // adds a string-valued property to the failure
    public ValidationFailure.Builder addProperty(String property, String value) {
      this.properties.put(property, value);
      return this;
    }

    // ...

    public ValidationFailure build() {
      return new ValidationFailure(message, type, correctiveAction, properties);
    }
  }
}
```
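The builder above passes its own `properties` map straight into the built failure, so any `addProperty` call made after `build()` would silently change a failure that was already created. The sketch below is a condensed, self-contained copy of the proposed class with one assumed change: `build()` works from a defensive, unmodifiable copy. It is an illustration of that design choice, not the proposed API.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Condensed copy of the proposed ValidationFailure. Assumed change: the built
// failure keeps an unmodifiable snapshot of the builder's properties.
final class ValidationFailure {
  private final String message;
  private final String type;
  private final String correctiveAction;
  private final Map<String, Object> properties;

  private ValidationFailure(String message, String type, String correctiveAction,
                            Map<String, Object> properties) {
    this.message = message;
    this.type = type;
    this.correctiveAction = correctiveAction;
    // Snapshot, so later builder mutations do not leak into this failure.
    this.properties = Collections.unmodifiableMap(new HashMap<>(properties));
  }

  static Builder builder(String message, String type, String correctiveAction) {
    return new Builder(message, type, correctiveAction);
  }

  String getMessage() { return message; }
  String getType() { return type; }
  String getCorrectiveAction() { return correctiveAction; }
  Map<String, Object> getProperties() { return properties; }

  static final class Builder {
    private final String message;
    private final String type;
    private final String correctiveAction;
    private final Map<String, Object> properties = new HashMap<>();

    private Builder(String message, String type, String correctiveAction) {
      this.message = message;
      this.type = type;
      this.correctiveAction = correctiveAction;
    }

    // Only string-valued properties, as described in the proposal.
    Builder addProperty(String property, String value) {
      properties.put(property, value);
      return this;
    }

    ValidationFailure build() {
      return new ValidationFailure(message, type, correctiveAction, properties);
    }
  }
}
```

With the snapshot, a builder can even be reused as a template for several similar failures without the earlier ones changing underneath the UI.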
```java
import io.cdap.cdap.api.annotation.Beta;

import java.util.ArrayList;
import java.util.List;

/**
 * Represents Validation Exception.
 */
@Beta
public class ValidationException extends RuntimeException {
  private final List<ValidationFailure> failures;

  private ValidationException(List<ValidationFailure> failures) {
    super();
    this.failures = failures;
  }

  @Override
  public String getMessage() {
    ...
  }

  /**
   * Returns list of failures.
   */
  public List<ValidationFailure> getFailures() {
    return failures;
  }

  public static ValidationException.Builder builder() {
    return new ValidationException.Builder();
  }

  /**
   * Builder to build validation exception.
   */
  public static class Builder {
    private final List<ValidationFailure> failures;

    private Builder() {
      this.failures = new ArrayList<>();
    }

    public ValidationException.Builder addFailure(ValidationFailure failure) {
      failures.add(failure);
      return this;
    }

    /**
     * Util method to create and add stage validation failure to this exception.
     *
     * @param message validation failure message
     * @param stage stage name
     * @param correctiveAction suggested action
     */
    public ValidationException.Builder addStageValidationFailure(String message, String stage,
                                                                 String correctiveAction) {
      ValidationFailure.Builder builder = ValidationFailure.builder(message, "INVALID_STAGE", correctiveAction);
      builder.addProperty("stage", stage);
      builder.addProperty("correctiveAction", correctiveAction);
      failures.add(builder.build());
      return this;
    }

    // util methods for other validation failures ...

    /**
     * Build and throw validation exception. This method can be used by plugins to throw an exception at any point
     * during validation. It throws a ValidationException if any failures were added while building the exception.
     *
     * @return a ValidationException with no failures, if no failures were added
     */
    public ValidationException buildAndThrow() {
      if (failures.isEmpty()) {
        return new ValidationException(failures);
      }
      throw new ValidationException(failures);
    }
  }
}
```
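The main claim for Approach 1 is that a plugin can stop early when a property that others depend on is invalid, while still reporting every failure found up to that point. A minimal sketch of that flow follows. It assumes a simplified ValidationException whose failures are plain strings, and `SinkValidator` with its `table`/`dataset`/`schema` checks is a hypothetical plugin, not part of the proposal.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Simplified ValidationException: failures are plain strings instead of ValidationFailure.
class ValidationException extends RuntimeException {
  private final List<String> failures;

  private ValidationException(List<String> failures) {
    super(failures.size() + " validation failure(s): " + failures);
    this.failures = Collections.unmodifiableList(new ArrayList<>(failures));
  }

  List<String> getFailures() { return failures; }

  static Builder builder() { return new Builder(); }

  static class Builder {
    private final List<String> failures = new ArrayList<>();

    Builder addFailure(String failure) {
      failures.add(failure);
      return this;
    }

    // Mirrors the proposal: throws only if at least one failure was added.
    ValidationException buildAndThrow() {
      if (failures.isEmpty()) {
        return new ValidationException(failures);
      }
      throw new ValidationException(failures);
    }
  }
}

// Hypothetical plugin validation with a dependent property: the schema check
// needs a table name, so the plugin throws early with what it has collected.
class SinkValidator {
  static void validate(String table, String dataset, String schema) {
    ValidationException.Builder errors = ValidationException.builder();
    if (dataset == null || dataset.isEmpty()) {
      errors.addFailure("dataset must be provided");
    }
    if (table == null || table.isEmpty()) {
      errors.addFailure("table must be provided");
      // Stop here: the remaining checks depend on the table.
      errors.buildAndThrow();
    }
    if (schema == null) {
      errors.addFailure("schema must be provided for table " + table);
    }
    errors.buildAndThrow();
  }
}
```

Calling `SinkValidator.validate("", "", null)` reports both the dataset and the table failure in one exception, while the schema check is skipped because it cannot be evaluated meaningfully.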
API usage in plugins
```java
@Override
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
  pipelineConfigurer.createDataset(conf.destinationFileset, FileSet.class);
  StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();
  // get the name of the stage
  String stageName = stageConfigurer.getStageName();
  ValidationException.Builder exceptionBuilder = ValidationException.builder();

  try {
    Pattern.compile(conf.filterRegex);
  } catch (Exception e) {
    // add an invalid property failure to the exception builder
    exceptionBuilder.addFailure(
      ValidationFailure.builder(e.getMessage(), "INVALID_PROPERTY", "Make sure the file regex is correct")
        .addProperty("stage", stageName)
        .addProperty("property", "filterRegex")
        .build());
  }

  if (conf.sourceFileset.equals(conf.destinationFileset)) {
    // add a stage failure to the exception builder
    exceptionBuilder.addStageValidationFailure("source and destination filesets must be different", stageName,
                                               "Provide different source and destination filesets");
  }

  // throws a ValidationException carrying every failure collected above, if there are any
  exceptionBuilder.buildAndThrow();
}
```
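This design does not show how the data pipeline app consumes the thrown exception. A minimal sketch, assuming the validation endpoint runs each stage's configure logic in turn and records failures per stage instead of stopping at the first stage that fails. `StageValidationRunner` is hypothetical, and the simplified exception carries plain strings.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Simplified exception carrying failure messages (stands in for ValidationException).
class ValidationException extends RuntimeException {
  final List<String> failures;

  ValidationException(List<String> failures) {
    super(String.join("; ", failures));
    this.failures = failures;
  }
}

// Hypothetical endpoint-side helper: each entry maps a stage name to that stage's
// configure logic, which receives its own stage name (like getStageName()).
class StageValidationRunner {
  static Map<String, List<String>> validateAll(Map<String, Consumer<String>> stages) {
    Map<String, List<String>> result = new LinkedHashMap<>();
    for (Map.Entry<String, Consumer<String>> stage : stages.entrySet()) {
      List<String> failures = new ArrayList<>();
      try {
        stage.getValue().accept(stage.getKey());
      } catch (ValidationException e) {
        // Keep every failure the stage reported, then move on to the next stage.
        failures.addAll(e.failures);
      }
      result.put(stage.getKey(), failures);
    }
    return result;
  }
}
```

Catching per stage keeps one plugin's failure from hiding problems in the others, which matches the goal of surfacing all errors on the UI at once.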
Approach - 2
A validation error can have several causes, and each cause has its own attributes. For example, when an input schema field type does not match the underlying sink schema, the cause is an input field mismatch with attributes such as the stage name, field name and suggested type. A single error message can be associated with more than one cause. This happens for plugins such as joiner and splitter, which have multiple input or output schemas for a given stage. For example, when the input schemas of a joiner are not compatible, the causes include the mismatching fields from the input schemas of the incoming stages. A validation error can therefore be represented as a list of causes, where each cause is a map from a cause attribute to its value, as shown below.
```java
import io.cdap.cdap.api.annotation.Beta;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
 * Represents failure that occurred during validation.
 */
@Beta
public class ValidationFailure {
  private final String message;
  protected final List<Map<String, Object>> causes;

  /**
   * Creates a validation failure with a message and an empty list of causes.
   *
   * @param message failure message
   */
  public ValidationFailure(String message) {
    this.message = message;
    this.causes = new ArrayList<>();
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    ValidationFailure that = (ValidationFailure) o;
    return message.equals(that.message) && causes.equals(that.causes);
  }

  @Override
  public int hashCode() {
    return Objects.hash(message, causes);
  }
}
```
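The joiner case above can be made concrete with a small check. The sketch assumes the joiner compares the type of one join field across its input stages and, when the types disagree, emits one cause per incoming stage so the UI can highlight the field in every input schema involved. `JoinerSchemaCheck` and `mismatchCauses` are hypothetical names; only the cause keys come from this design.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;

// Hypothetical joiner check producing the "list of causes" representation.
class JoinerSchemaCheck {
  // typeByInputStage maps each incoming stage name to the type of `field` in its schema.
  static List<Map<String, Object>> mismatchCauses(String stage, String field,
                                                  Map<String, String> typeByInputStage) {
    List<Map<String, Object>> causes = new ArrayList<>();
    // All input stages agree on the type: nothing to report.
    if (new HashSet<>(typeByInputStage.values()).size() <= 1) {
      return causes;
    }
    // Otherwise every incoming stage contributes one cause to the same error.
    for (String inputStage : typeByInputStage.keySet()) {
      Map<String, Object> cause = new HashMap<>();
      cause.put("stage", stage);
      cause.put("inputStage", inputStage);
      cause.put("inputField", field);
      causes.add(cause);
    }
    return causes;
  }
}
```

A single message such as "join key types differ" would then carry two causes for a two-input joiner, one per incoming stage.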
All the attributes of a cause can be tracked in a central location as below:
```java
/**
 * Failure attributes.
 */
public enum FailureAttributes {
  STAGE("stage"),              // represents stage being validated
  PROPERTY("property"),        // represents stage property
  INPUT_FIELD("inputField"),   // represents field in the input schema
  OUTPUT_FIELD("outputField"), // represents field in the output schema
  OUTPUT_PORT("outputPort"),   // represents output port for plugins such as SplitterTransform where multiple output schemas are expected
  INPUT_STAGE("inputStage"),   // represents input stage for plugins such as Joiner where multiple input schemas are expected
  // ...
  ;

  private final String name;

  FailureAttributes(String name) {
    this.name = name;
  }
}
```
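The enum only helps if failure classes actually use it for their map keys instead of string literals such as `"stage"`. A minimal sketch, assuming a `getName()` accessor is added to the enum (the proposal does not define one) and a hypothetical `Cause` helper builds cause maps from enum constants. The enum is trimmed to four constants for brevity.

```java
import java.util.HashMap;
import java.util.Map;

// Trimmed copy of the proposed enum, plus an assumed getName() accessor.
enum FailureAttributes {
  STAGE("stage"),
  PROPERTY("property"),
  INPUT_STAGE("inputStage"),
  INPUT_FIELD("inputField");

  private final String name;

  FailureAttributes(String name) {
    this.name = name;
  }

  String getName() {
    return name;
  }
}

// Hypothetical fluent helper so cause keys always come from FailureAttributes.
class Cause {
  private final Map<String, Object> attributes = new HashMap<>();

  Cause with(FailureAttributes attribute, Object value) {
    attributes.put(attribute.getName(), value);
    return this;
  }

  // Returns a copy so callers cannot mutate the helper's state.
  Map<String, Object> toMap() {
    return new HashMap<>(attributes);
  }
}
```

Routing every key through the enum keeps the keys that the UI reads in one place, which is the point of tracking the attributes centrally.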
Introduced Errors
With this approach, the following error classes, each representing a specific type of error, can be added to hydrator-common.
```java
import io.cdap.cdap.api.annotation.Beta;

import java.util.Collections;

/**
 * Represents failure that occurred during stage validation.
 */
@Beta
public class InvalidStageFailure extends ValidationFailure {
  /**
   * Creates validation failure that occurred during stage validation.
   *
   * @param message failure message
   * @param stage name of the stage that caused this validation failure
   */
  public InvalidStageFailure(String message, String stage) {
    super(message);
    causes.add(Collections.singletonMap("stage", stage));
  }
}
```
```java
import io.cdap.cdap.api.annotation.Beta;

import java.util.HashMap;
import java.util.Map;

/**
 * Represents failure that occurred during stage config property validation.
 */
@Beta
public class InvalidStagePropertyFailure extends ValidationFailure {
  /**
   * Creates validation failure that occurred during stage config property validation.
   *
   * @param message failure message
   * @param stage name of the stage that caused this validation failure
   * @param property property that is invalid
   */
  public InvalidStagePropertyFailure(String message, String stage, String property) {
    this(message, stage, new String[] {property});
  }

  /**
   * Creates validation failure that occurred during stage config property validation.
   *
   * @param message failure message
   * @param stage name of the stage that caused this validation failure
   * @param properties properties that caused this failure
   */
  public InvalidStagePropertyFailure(String message, String stage, String[] properties) {
    super(message);
    // one cause per property, so that no property overwrites another
    for (String property : properties) {
      Map<String, Object> map = new HashMap<>();
      map.put("stage", stage);
      map.put("property", property);
      causes.add(map);
    }
  }
}
```
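Putting every property into a single map would keep only the last one, because each `put("property", ...)` overwrites the previous value. A self-contained check of the one-cause-per-property behavior follows, using a condensed base class with an added `getCauses()` accessor (an assumption for inspection only) and a varargs constructor as a compact alternative to the two constructors above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Condensed Approach 2 base class; getCauses() is added here for inspection.
class ValidationFailure {
  private final String message;
  protected final List<Map<String, Object>> causes = new ArrayList<>();

  ValidationFailure(String message) { this.message = message; }

  String getMessage() { return message; }
  List<Map<String, Object>> getCauses() { return causes; }
}

// Varargs variant: covers both the single-property and multi-property cases.
class InvalidStagePropertyFailure extends ValidationFailure {
  InvalidStagePropertyFailure(String message, String stage, String... properties) {
    super(message);
    for (String property : properties) {
      // A fresh map per property, so each invalid property becomes its own cause.
      Map<String, Object> cause = new HashMap<>();
      cause.put("stage", stage);
      cause.put("property", property);
      causes.add(cause);
    }
  }
}
```

For a projection stage where both `drop` and `keep` are set, one failure with two causes lets the UI highlight both fields under a single message.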
```java
import java.util.HashMap;
import java.util.Map;

/**
 * Represents invalid input schema failure.
 */
public class InvalidInputSchemaFailure extends ValidationFailure {
  /**
   * Creates invalid input schema failure.
   *
   * @param message failure message
   * @param stage name of the stage
   * @param map map of incoming stage name to field that is invalid
   */
  public InvalidInputSchemaFailure(String message, String stage, Map<String, String> map) {
    super(message);
    for (Map.Entry<String, String> entry : map.entrySet()) {
      Map<String, Object> causeMap = new HashMap<>();
      causeMap.put("stage", stage);
      causeMap.put("inputStage", entry.getKey());
      causeMap.put("inputField", entry.getValue());
      causes.add(causeMap);
    }
  }
}
```
```java
import java.util.HashMap;
import java.util.Map;

/**
 * Represents invalid output schema failure.
 */
public class InvalidOutputSchemaFailure extends ValidationFailure {
  /**
   * Creates invalid output schema failure.
   *
   * @param message failure message
   * @param stage name of the stage
   * @param map map of outgoing port name to field that is invalid
   */
  public InvalidOutputSchemaFailure(String message, String stage, Map<String, String> map) {
    super(message);
    for (Map.Entry<String, String> entry : map.entrySet()) {
      Map<String, Object> causeMap = new HashMap<>();
      causeMap.put("stage", stage);
      causeMap.put("outputPort", entry.getKey());
      causeMap.put("outputField", entry.getValue());
      causes.add(causeMap);
    }
  }
}
```
API usage in plugins
```java
@Override
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
  pipelineConfigurer.createDataset(conf.destinationFileset, FileSet.class);
  StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();
  // get the name of the stage
  String stageName = stageConfigurer.getStageName();

  try {
    Pattern.compile(conf.filterRegex);
  } catch (Exception e) {
    // add validation error to stage configurer
    stageConfigurer.addValidationFailure(new InvalidStagePropertyFailure(e.getMessage(), stageName, "filterRegex"));
  }

  if (conf.sourceFileset.equals(conf.destinationFileset)) {
    // add validation error to stage configurer
    stageConfigurer.addValidationFailure(new InvalidStageFailure("source and destination filesets must be different",
                                                                 stageName));
  }
}
```
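In Approach 2 the plugin never throws; it appends failures to the stage configurer and returns. A runnable sketch of the same two checks as the plugin example above, assuming a hypothetical `CollectingStageConfigurer` that stores failures as plain strings. It shows that both checks run and both failures are recorded in a single call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;

// Hypothetical minimal configurer for Approach 2: failures are appended, never thrown.
class CollectingStageConfigurer {
  private final String stageName;
  private final List<String> failures = new ArrayList<>();

  CollectingStageConfigurer(String stageName) { this.stageName = stageName; }

  String getStageName() { return stageName; }

  void addValidationFailure(String failure) { failures.add(failure); }

  List<String> getValidationFailures() { return failures; }
}

// The two checks from the plugin example, with plain strings standing in for failure classes.
class FileMoveChecks {
  static void configure(CollectingStageConfigurer configurer, String filterRegex,
                        String source, String destination) {
    String stage = configurer.getStageName();
    try {
      Pattern.compile(filterRegex);
    } catch (PatternSyntaxException e) {
      configurer.addValidationFailure(stage + ".filterRegex: " + e.getDescription());
    }
    if (source.equals(destination)) {
      configurer.addValidationFailure(stage + ": source and destination filesets must be different");
    }
  }
}
```

The trade-off against Approach 1 is visible here: collection is implicit, but a plugin that wants to skip dependent checks has to return early itself rather than throw.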
Impact on UI
Type | Description | Scenario | Approach 1 JSON Response | Approach 2 JSON Response |
---|---|---|---|---|
STAGE_ERROR | Represents a validation error while configuring the stage | If an error occurs while connecting to the sink to fetch the actual schema | { | { "correctiveAction" : "Make sure correct driver is uploaded.", |
INVALID_PROPERTY | Represents an invalid configuration property | If a config property value contains characters that are not allowed by the underlying source or sink | { | { "correctiveAction" : "Either drop or keep should be empty", |
PLUGIN_NOT_FOUND | Represents a plugin not found error for a stage. This error is added by the data pipeline app | If the plugin was not found | { | { "correctiveAction" : "Please make sure the 'Mock' plugin is installed.", "pluginId" : "Mock" |
INVALID_INPUT_SCHEMA | Represents an invalid field in the input schema | If the input schemas for the joiner plugin are of different types | { | { |
INVALID_OUTPUT_SCHEMA | Represents an invalid field in the output schema | If the output schema of the plugin is not compatible with the underlying sink | { | { "correctiveAction" : "Schema should be of type 'string' at output port 'port'", |
Conclusion
There are two contracts in this design: a programmatic contract between the data pipeline app and plugins, and another between the data pipeline app and the UI. Approach 1 provides a well-defined programmatic contract between plugins and the data pipeline app. With the programmatic APIs of Approach 1, a plugin can throw an exception with all the collected errors at any point in its validation code when one of the dependent properties is invalid. This makes plugin validation code much simpler. Hence, Approach 1 is suggested.
Related Jira