Introduction
A CDAP pipeline is composed of various plugins that users configure while the pipeline is being developed. While building a pipeline, a pipeline developer can provide an invalid plugin configuration or schema. For example, the BigQuery sink plugin can have an output schema that does not match the underlying BigQuery table. A pipeline developer can use the new validation endpoint to validate the stages before deploying the pipeline. To fail fast and provide a better user experience, the validation endpoint should return all the validation errors from a given stage when it is called.
The data pipeline app exposes various error types for plugin validation. In future releases, new error types can be introduced. With the current implementation, when plugins with new error types are pushed to hub, data pipeline artifacts need to be updated for every new type of error that is introduced. This is because the validation errors are defined in the data pipeline app itself. A better approach would be to modify the data pipeline app so that app artifacts do not need to be replaced for every new type of error.
Goals
- To fail fast and provide a better user experience, introduce a new API to collect multiple validation error messages from a stage at configure time
- Decouple validation error types from the data pipeline app
- Instrument plugins to use this API to return multiple error messages for the validation endpoint
User Stories
- As a CDAP pipeline developer, when I validate a stage, I expect that all the invalid config properties and input/output schema fields are highlighted on CDAP UI with an appropriate error message and corrective action.
- As a plugin developer, I should be able to capture all the validation errors while configuring the plugin so that all the validation errors can be surfaced on CDAP UI.
- As a plugin developer, I should be able to use new validation error types without replacing data pipeline app artifacts.
API Changes for Plugin Validation
Collect multiple errors from plugins
To collect multiple validation errors from a stage, StageConfigurer, MultiInputStageConfigurer and MultiOutputStageConfigurer can be modified as below. If any validation errors are added to the stage configurer, the pipeline deployment will fail and all the errors will be returned in the response of the stage validation REST endpoint.
The current implementation does not expose the stage name to the plugin in the configurePipeline method. Plugins need the stage name to create stage-specific errors, so the stage name will be exposed to plugins through the stage configurer as below.
Code Block |
---|
language | java |
---|
title | StageConfigurer.java |
---|
|
public interface StageConfigurer {
  ...
  /**
   * Gets the stage name.
   *
   * @return stage name
   */
  String getStageName();

  /**
   * Adds a new validation failure to the configurer.
   *
   * @param failure a validation failure
   */
  void addValidationFailure(ValidationFailure failure);

  /**
   * Throws a validation exception if any failures have been added to the configurer through the
   * addValidationFailure method.
   *
   * @throws ValidationException if there are any validation failures being carried by the configurer
   */
  void throwIfFailure() throws ValidationException;
} |
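The collect-then-throw behaviour described for the stage configurer can be pictured with a minimal sketch. It is not the data pipeline app's implementation: `SketchFailure`, `SketchValidationException` and `CollectingStageConfigurer` are hypothetical stand-ins introduced only for illustration, and only the three method names come from the interface above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-in for the ValidationFailure type described in this document. */
final class SketchFailure {
  final String message;

  SketchFailure(String message) {
    this.message = message;
  }
}

/** Hypothetical stand-in for ValidationException: carries every collected failure. */
final class SketchValidationException extends RuntimeException {
  final List<SketchFailure> failures;

  SketchValidationException(List<SketchFailure> failures) {
    super("Stage validation failed with " + failures.size() + " failure(s)");
    this.failures = Collections.unmodifiableList(new ArrayList<>(failures));
  }
}

/** Illustrative configurer that accumulates failures instead of failing on the first one. */
final class CollectingStageConfigurer {
  private final String stageName;
  private final List<SketchFailure> failures = new ArrayList<>();

  CollectingStageConfigurer(String stageName) {
    this.stageName = stageName;
  }

  String getStageName() {
    return stageName;
  }

  void addValidationFailure(SketchFailure failure) {
    failures.add(failure);
  }

  /** Throws only when at least one failure has been added; otherwise does nothing. */
  void throwIfFailure() {
    if (!failures.isEmpty()) {
      throw new SketchValidationException(failures);
    }
  }
}
```

A plugin would keep calling `addValidationFailure` for independent problems and call `throwIfFailure` only at points where continuing validation no longer makes sense.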
Decouple plugin error types from data pipeline app
Approach - 1
To carry error information, a new ValidationFailure class is introduced so that multiple validation failures can be collected in the stage configurer. Instances are created through a builder, which only accepts string properties. The failure exposes methods to get its message, type and properties. The collected failures are carried by a ValidationException. Whenever a plugin finds an invalid property that further validation depends on, it can throw a validation exception with all the failures collected so far. This keeps plugin validation code much simpler.
Code Block |
---|
language | java |
---|
title | ValidationFailure.java |
---|
|
/**
 * Represents an error condition occurred during validation.
 */
@Beta
public class ValidationFailure {
  // types of the failures
  private static final String STAGE_ERROR = "StageError";
  private static final String INVALID_PROPERTY = "InvalidProperty";
  private static final String PLUGIN_NOT_FOUND = "PluginNotFound";
  private static final String INVALID_INPUT_SCHEMA = "InvalidInputSchema";
  private static final String INVALID_OUTPUT_SCHEMA = "InvalidOutputSchema";

  // represents stage name in the failure. It is a generic property used in all the failures for a given stage
  private static final String STAGE = "stage";
  // represents configuration property in InvalidProperty failure
  private static final String CONFIG_PROPERTY = "configProperty";
  // represents plugin id in PluginNotFound failure
  private static final String PLUGIN_ID = "pluginId";
  // represents plugin type in PluginNotFound failure
  private static final String PLUGIN_TYPE = "pluginType";
  // represents plugin name in PluginNotFound failure
  private static final String PLUGIN_NAME = "pluginName";
  // represents a field in InvalidInputSchema or InvalidOutputSchema failure
  private static final String FIELD = "field";
  // represents input stage in InvalidInputSchema failure
  private static final String INPUT_STAGE = "inputStage";
  // represents output port in InvalidOutputSchema failure
  private static final String OUTPUT_PORT = "outputPort";

  private final String message;
  private final String type;
  private final String correctiveAction;
  private final Map<String, Object> properties;

  private ValidationFailure(String message, String type, @Nullable String correctiveAction,
                            Map<String, Object> properties) {
    this.message = message;
    this.type = type;
    this.correctiveAction = correctiveAction;
    this.properties = properties;
  }

  /**
   * Creates a stage validation failure.
   *
   * @param message validation failure message
   * @param stage stage name
   * @param correctiveAction corrective action
   */
  public static ValidationFailure createStageFailure(String message, String stage, @Nullable String correctiveAction) {
    Builder builder = builder(message, STAGE_ERROR);
    builder.setCorrectiveAction(correctiveAction).addProperty(STAGE, stage);
    return builder.build();
  }

  /**
   * Creates a config property validation failure.
   *
   * @param message validation failure message
   * @param stage stage name
   * @param correctiveAction corrective action
   */
  public static ValidationFailure createConfigPropertyFailure(String message, String stage,
                                                              String property, @Nullable String correctiveAction) {
    Builder builder = builder(message, INVALID_PROPERTY);
    builder.setCorrectiveAction(correctiveAction)
      .addProperty(STAGE, stage).addProperty(CONFIG_PROPERTY, property);
    return builder.build();
  }

  /**
   * Creates a plugin not found validation failure.
   *
   * @param message validation failure message
   * @param stage stage name
   * @param correctiveAction corrective action
   */
  public static ValidationFailure createPluginNotFoundFailure(String message, String stage,
                                                              String pluginId, String pluginName, String pluginType,
                                                              @Nullable String correctiveAction) {
    Builder builder = builder(message, PLUGIN_NOT_FOUND);
    builder.setCorrectiveAction(correctiveAction)
      .addProperty(STAGE, stage).addProperty(PLUGIN_ID, pluginId)
      .addProperty(PLUGIN_TYPE, pluginType)
      .addProperty(PLUGIN_NAME, pluginName);
    return builder.build();
  }

  /**
   * Creates an invalid input schema failure.
   *
   * @param message validation failure message
   * @param stage stage name
   * @param field input schema field
   * @param inputStage optional input stage name. This is applicable to plugins of type {@link Joiner}.
   * @param correctiveAction optional corrective action
   * @return invalid input schema validation failure
   */
  public static ValidationFailure createInputSchemaFailure(String message, String stage, String field,
                                                           @Nullable String inputStage,
                                                           @Nullable String correctiveAction) {
    ...
  }

  /**
   * Creates an invalid output schema failure.
   *
   * @param message validation failure message
   * @param stage stage name
   * @param field output schema field
   * @param outputPort optional output port. This is applicable to plugins of type {@link SplitterTransform}.
   * @param correctiveAction optional corrective action
   * @return invalid output schema validation failure
   */
  public static ValidationFailure createOutputSchemaFailure(String message, String stage, String field,
                                                            @Nullable String outputPort,
                                                            @Nullable String correctiveAction) {
    ....
  }

  /**
   * Returns a builder for creating a {@link ValidationFailure}.
   */
  public static Builder builder(String message, String type) {
    return new Builder(message, type);
  }

  /**
   * A builder to create {@link ValidationFailure} instance.
   */
  public static class Builder {
    private final String message;
    private final String type;
    private String correctiveAction;
    private final Map<String, Object> properties;

    private Builder(String message, String type) {
      this.message = message;
      this.type = type;
      this.properties = new HashMap<>();
    }

    /**
     * Sets corrective action to rectify the failure.
     *
     * @param correctiveAction corrective action
     * @return this builder
     */
    public Builder setCorrectiveAction(String correctiveAction) {
      this.correctiveAction = correctiveAction;
      return this;
    }

    /**
     * Adds a property to the failure.
     *
     * @param property the name of the property
     * @param value the value of the property
     * @return this builder
     */
    public Builder addProperty(String property, String value) {
      this.properties.put(property, value);
      return this;
    }

    /**
     * Creates a new instance of {@link ValidationFailure}.
     *
     * @return instance of {@link ValidationFailure}
     */
    public ValidationFailure build() {
      return new ValidationFailure(message, type, correctiveAction,
                                   Collections.unmodifiableMap(new HashMap<>(properties)));
    }
  }

  /**
   * Returns message of this failure.
   */
  public String getMessage() {
    return message;
  }

  /**
   * Returns type of this failure.
   */
  public String getType() {
    return type;
  }

  /**
   * Returns corrective action for this failure.
   */
  @Nullable
  public String getCorrectiveAction() {
    return correctiveAction;
  }

  /**
   * Returns properties of this failure.
   */
  public Map<String, Object> getProperties() {
    return properties;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    ValidationFailure that = (ValidationFailure) o;
    return message.equals(that.message) && type.equals(that.type) &&
      Objects.equals(correctiveAction, that.correctiveAction) &&
      properties.equals(that.properties);
  }

  @Override
  public int hashCode() {
    return Objects.hash(message, type, correctiveAction, properties);
  }

  @Override
  public String toString() {
    return "ValidationFailure{" +
      "message='" + message + '\'' +
      ", type='" + type + '\'' +
      ", correctiveAction='" + correctiveAction + '\'' +
      ", properties=" + properties +
      '}';
  }
} |
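The decoupling goal can be illustrated with a small sketch: because a failure type is just a string chosen by the caller of the builder, a plugin could in principle introduce a new type without any change to the data pipeline app. The code below assumes a trimmed-down stand-in (`TypedFailure`) rather than the real class, and the `ConnectionError` type and `host` property are made-up names used only for illustration.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Trimmed-down, hypothetical stand-in for a builder-based validation failure. */
final class TypedFailure {
  final String message;
  final String type;
  final Map<String, Object> properties;

  private TypedFailure(String message, String type, Map<String, Object> properties) {
    this.message = message;
    this.type = type;
    this.properties = properties;
  }

  static Builder builder(String message, String type) {
    return new Builder(message, type);
  }

  static final class Builder {
    private final String message;
    private final String type;
    private final Map<String, Object> properties = new HashMap<>();

    private Builder(String message, String type) {
      this.message = message;
      this.type = type;
    }

    /** Only string values are accepted, mirroring the constraint described in the text. */
    Builder addProperty(String property, String value) {
      properties.put(property, value);
      return this;
    }

    TypedFailure build() {
      // Copy the map so later builder changes cannot leak into the built failure.
      return new TypedFailure(message, type, Collections.unmodifiableMap(new HashMap<>(properties)));
    }
  }
}

final class CustomFailureTypeDemo {
  /** "ConnectionError" and "host" are illustrative names, not types defined by CDAP. */
  static TypedFailure connectionFailure(String stage, String host) {
    return TypedFailure.builder("Unable to reach " + host, "ConnectionError")
      .addProperty("stage", stage)
      .addProperty("host", host)
      .build();
  }
}
```

The trade-off of this design is that the consumer of the failure has to interpret type and property names as plain strings, so any new type still needs matching handling on the UI side.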
Code Block |
---|
language | java |
---|
title | ValidationException.java |
---|
|
/**
* Validation exception that carries multiple validation failures.
*/
@Beta
public class ValidationException extends RuntimeException {
private List<ValidationFailure> failures;
public ValidationException(List<ValidationFailure> failures) {
super(failures.isEmpty() ? "Validation Exception occurred." : failures.iterator().next().getMessage());
this.failures = failures;
}
/**
* Returns list of failures.
*/
public List<ValidationFailure> getFailures() {
return failures;
}
} |
API usage in plugins
Code Block |
---|
@Override
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
pipelineConfigurer.createDataset(conf.destinationFileset, FileSet.class);
StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();
// get the name of the stage
String stageName = stageConfigurer.getStageName();
try {
Pattern.compile(conf.filterRegex);
} catch (Exception e) {
// add validation failure to stage configurer
stageConfigurer.addValidationFailure(ValidationFailure.createConfigPropertyFailure(e.getMessage(), stageName, "filterRegex", "Make sure the file regex is correct"));
// plugin can choose to terminate processing here
stageConfigurer.throwIfFailure();
}
if (conf.sourceFileset.equals(conf.destinationFileset)) {
// add validation failure to stage configurer
stageConfigurer.addValidationFailure(ValidationFailure.createStageFailure("source and destination filesets must be different", stageName, "Provide different source and destination filesets"));
}
} |
Approach - 2
A validation error can have several causes, each with its own attributes. For example, when the type of an input schema field does not match the underlying sink schema, the cause is an input field mismatch with attributes such as stage name, field name and suggested type. Each error message can be associated with more than one cause. This can happen for plugins such as joiner and splitter, where a given stage has multiple input or output schemas. For example, when the input schemas of a joiner are not compatible, the causes will include the mismatching fields from the input schemas of the incoming stages. A validation error can therefore be represented as a list of causes, where each cause is a map from cause attribute to value, as shown below.
Code Block |
---|
language | java |
---|
title | FailureCollector.java |
---|
|
/**
* Failure collector is responsible to collect {@link ValidationFailure}s.
*/
@Beta
public interface FailureCollector {
/**
* Add a validation failure to this failure collector. The method returns the validation failure that was added to
* the failure collector. This failure can be used to add additional {@link ValidationFailure.Cause}s.
* For example,
* <code>failureCollector.addFailure("message", "action").withConfigProperty("configProperty");</code>
*
* @param message failure message
* @param correctiveAction corrective action
* @return a validation failure
* @throws UnsupportedOperationException if the implementation does not override this method
*/
default ValidationFailure addFailure(String message, @Nullable String correctiveAction) {
throw new UnsupportedOperationException("Adding a failure is not supported.");
}
/**
* Throws validation exception if there are any failures that are added to the failure collector through
* {@link #addFailure(String, String)}.
* If no failures are added to the collector, it will return a {@link ValidationException} with empty failure list.
*
* <pre>
* String someMethod() {
* switch (someVar) {
* // cases
* }
* // if control comes here, it means failure
* failureCollector.addFailure(...);
* // throw validation exception so that compiler knows that exception is being thrown which eliminates the need to
* // have a statement that returns null towards the end of this method
* throw failureCollector.getOrThrowException();
* }
* </pre>
*
* @return returns a {@link ValidationException} if no failures were added to the collector
* @throws ValidationException exception indicating validation failures
* @throws UnsupportedOperationException if the implementation does not override this method
*/
default ValidationException getOrThrowException() throws ValidationException {
throw new UnsupportedOperationException("Throwing failures is not supported.");
}
} |
Code Block |
---|
public interface StageConfigurer {
....
/**
* Returns a failure collector for the stage.
*
* @return a failure collector
* @throws UnsupportedOperationException if the implementation does not override this method
*/
default FailureCollector getFailureCollector() {
throw new UnsupportedOperationException("Getting failure collector is not supported.");
}
}
|
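One way to read the FailureCollector contract is through a minimal in-memory sketch. It assumes hypothetical stand-in types (`DemoFailure`, `DemoValidationException`, `InMemoryFailureCollector`) and is not the collector shipped with the data pipeline app; it only follows the documented behaviour of returning the added failure and of returning, rather than throwing, an exception with an empty failure list when nothing was collected.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-in for a validation failure with an optional corrective action. */
final class DemoFailure {
  final String message;
  final String correctiveAction; // may be null

  DemoFailure(String message, String correctiveAction) {
    this.message = message;
    this.correctiveAction = correctiveAction;
  }
}

/** Hypothetical stand-in for ValidationException carrying an immutable copy of the failures. */
final class DemoValidationException extends RuntimeException {
  final List<DemoFailure> failures;

  DemoValidationException(List<DemoFailure> failures) {
    super("Errors were encountered during validation.");
    this.failures = Collections.unmodifiableList(new ArrayList<>(failures));
  }
}

/** Illustrative collector that keeps failures in a list. */
final class InMemoryFailureCollector {
  private final List<DemoFailure> failures = new ArrayList<>();

  /** Records the failure and returns it so the caller can attach more detail. */
  DemoFailure addFailure(String message, String correctiveAction) {
    DemoFailure failure = new DemoFailure(message, correctiveAction);
    failures.add(failure);
    return failure;
  }

  /** Throws when failures exist; otherwise returns an exception with an empty failure list. */
  DemoValidationException getOrThrowException() {
    DemoValidationException exception = new DemoValidationException(failures);
    if (failures.isEmpty()) {
      return exception;
    }
    throw exception;
  }
}
```

Returning the exception type lets callers write `throw collector.getOrThrowException();` at the end of a method, which satisfies the compiler without a dummy return statement.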
Code Block |
---|
language | java |
---|
title | ValidationException.java |
---|
|
/**
* Validation exception that carries multiple validation failures.
*/
@Beta
public class ValidationException extends RuntimeException {
private final List<ValidationFailure> failures;
/**
* Creates a validation exception with list of failures.
*
* @param failures list of validation failures
*/
public ValidationException(List<ValidationFailure> failures) {
super("Errors were encountered during validation.");
this.failures = Collections.unmodifiableList(new ArrayList<>(failures));
}
/**
* Returns a list of validation failures.
*/
public List<ValidationFailure> getFailures() {
return failures;
}
} |
Code Block |
---|
language | java |
---|
title | ValidationFailure.java |
---|
|
/**
* Represents a failure condition occurred during validation.
*/
@Beta
public class ValidationFailure {
private static final Gson GSON = new Gson();
private final String message;
private final String correctiveAction;
private final List<Cause> causes;
/**
* Creates a validation failure with provided message.
*
* @param message validation failure message
*/
public ValidationFailure(String message) {
this(message, null);
}
/**
* Creates a validation failure with provided message and corrective action.
*
* @param message validation failure message
* @param correctiveAction corrective action
*/
public ValidationFailure(String message, @Nullable String correctiveAction) {
this.message = message;
this.correctiveAction = correctiveAction;
this.causes = new ArrayList<>();
}
/**
* Adds provided cause to this validation failure.
*
* @param cause cause of validation failure
* @return validation failure with provided cause
*/
public ValidationFailure withCause(Cause cause) {
causes.add(cause);
return this;
}
/**
* Adds cause attributes that represents plugin not found failure cause.
*
* @param pluginId plugin id
* @param pluginName plugin name
* @param pluginType plugin type
* @return validation failure with plugin not found cause
*/
public ValidationFailure withPluginNotFound(String pluginId, String pluginName, String pluginType) {
return withPluginNotFound(pluginId, pluginName, pluginType, null, null);
}
/**
* Adds cause attributes that represents plugin not found failure cause.
*
* @param pluginId plugin id
* @param pluginName plugin name
* @param pluginType plugin type
* @param requestedArtifact requested artifact
* @param suggestedArtifact suggested artifact
* @return validation failure with plugin not found cause
*/
public ValidationFailure withPluginNotFound(String pluginId, String pluginName, String pluginType,
@Nullable ArtifactId requestedArtifact,
@Nullable ArtifactId suggestedArtifact) {
Cause cause = new Cause().addAttribute(CauseAttributes.PLUGIN_ID, pluginId)
.addAttribute(CauseAttributes.PLUGIN_NAME, pluginName)
.addAttribute(CauseAttributes.PLUGIN_TYPE, pluginType);
if (requestedArtifact != null) {
cause.addAttribute(CauseAttributes.REQUESTED_ARTIFACT_NAME, requestedArtifact.getName());
cause.addAttribute(CauseAttributes.REQUESTED_ARTIFACT_SCOPE, requestedArtifact.getScope().name());
cause.addAttribute(CauseAttributes.REQUESTED_ARTIFACT_VERSION, requestedArtifact.getVersion().getVersion());
}
if (suggestedArtifact != null) {
cause.addAttribute(CauseAttributes.SUGGESTED_ARTIFACT_NAME, suggestedArtifact.getName());
cause.addAttribute(CauseAttributes.SUGGESTED_ARTIFACT_SCOPE, suggestedArtifact.getScope().name());
cause.addAttribute(CauseAttributes.SUGGESTED_ARTIFACT_VERSION, suggestedArtifact.getVersion().getVersion());
}
causes.add(cause);
return this;
}
/**
* Adds cause attributes that represents invalid stage configure property failure cause.
*
* @param stageConfigProperty stage config property
* @return validation failure with invalid stage config property cause
*/
public ValidationFailure withConfigProperty(String stageConfigProperty) {
causes.add(new Cause().addAttribute(CauseAttributes.STAGE_CONFIG, stageConfigProperty));
return this;
}
/**
* Adds cause attributes for failure cause that represents an invalid element in the list associated with given stage
* configure property.
*
* @param stageConfigProperty stage config property
* @param element element in the list associated by a given stageConfigProperty
* @return validation failure with invalid stage config property element cause
*/
public ValidationFailure withConfigElement(String stageConfigProperty, String element) {
causes.add(new Cause().addAttribute(CauseAttributes.STAGE_CONFIG, stageConfigProperty)
.addAttribute(CauseAttributes.CONFIG_ELEMENT, element));
return this;
}
/**
* Adds cause attributes that represents invalid input schema field failure cause.
*
* @param fieldName name of the input schema field
* @param inputStage name of the input stage
* @return validation failure with invalid input schema field cause
*/
public ValidationFailure withInputSchemaField(String fieldName, @Nullable String inputStage) {
Cause cause = new Cause().addAttribute(CauseAttributes.INPUT_SCHEMA_FIELD, fieldName);
cause = inputStage == null ? cause : cause.addAttribute(CauseAttributes.INPUT_STAGE, inputStage);
causes.add(cause);
return this;
}
/**
* Adds cause attributes that represents invalid output schema field failure cause.
*
* @param fieldName name of the output schema field
* @param outputPort name of the output port
* @return validation failure with invalid output schema field cause
*/
public ValidationFailure withOutputSchemaField(String fieldName, @Nullable String outputPort) {
Cause cause = new Cause().addAttribute(CauseAttributes.OUTPUT_SCHEMA_FIELD, fieldName);
cause = outputPort == null ? cause : cause.addAttribute(CauseAttributes.OUTPUT_PORT, outputPort);
causes.add(cause);
return this;
}
/**
* Adds cause attributes that represents a stacktrace.
*
* @param stacktraceElements stacktrace for the error
* @return validation failure with stacktrace
*/
public ValidationFailure withStacktrace(StackTraceElement[] stacktraceElements) {
causes.add(new Cause().addAttribute(CauseAttributes.STACKTRACE, GSON.toJson(stacktraceElements)));
return this;
}
/**
* Returns failure message.
*/
public String getMessage() {
return message;
}
/**
* Returns corrective action for this failure.
*/
@Nullable
public String getCorrectiveAction() {
return correctiveAction;
}
/**
* Returns the causes of this failure.
*/
public List<Cause> getCauses() {
return causes;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ValidationFailure failure = (ValidationFailure) o;
return message.equals(failure.message) &&
Objects.equals(correctiveAction, failure.correctiveAction) && causes.equals(failure.causes);
}
@Override
public int hashCode() {
return Objects.hash(message, correctiveAction, causes);
}
/**
* Represents a cause of a failure.
*/
@Beta
public static class Cause {
private final Map<String, String> attributes;
/**
* Creates a failure cause.
*/
public Cause() {
this.attributes = new HashMap<>();
}
/**
* Adds an attribute to this cause.
*
* @param attribute cause attribute name
* @param value cause attribute value
* @return this cause
*/
public Cause addAttribute(String attribute, String value) {
attributes.put(attribute, value);
return this;
}
/**
* Returns value of the provided cause attribute.
*
* @param attribute attribute name
*/
public String getAttribute(String attribute) {
return attributes.get(attribute);
}
/**
* Returns all the attributes of the cause.
*/
public Map<String, String> getAttributes() {
return Collections.unmodifiableMap(new HashMap<>(attributes));
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Cause cause = (Cause) o;
return attributes.equals(cause.attributes);
}
@Override
public int hashCode() {
return Objects.hash(attributes);
}
}
} |
All cause attributes are defined as constants in one central location, as shown below:
Code Block |
---|
language | java |
---|
title | CauseAttributes.java |
---|
|
/**
* Cause attributes constants.
*/
@Beta
public class CauseAttributes {
// Represents stage configuration property failure
public static final String STAGE_CONFIG = "stageConfig";
// Represents an element in the list of elements associated with a stage config property.
// For example, in projection transform, config property 'keep' represents a list of input fields to keep. Below
// cause attribute can be used to represent an invalid field in 'keep' config property
public static final String CONFIG_ELEMENT = "configElement";
// Represents id of the plugin
public static final String PLUGIN_ID = "pluginId";
// Represents type of the plugin
public static final String PLUGIN_TYPE = "pluginType";
// Represents name of the plugin
public static final String PLUGIN_NAME = "pluginName";
// Represents requested artifact name
public static final String REQUESTED_ARTIFACT_NAME = "requestedArtifactName";
// Represents requested artifact version
public static final String REQUESTED_ARTIFACT_VERSION = "requestedArtifactVersion";
// Represents requested artifact scope
public static final String REQUESTED_ARTIFACT_SCOPE = "requestedArtifactScope";
// Represents suggested artifact name
public static final String SUGGESTED_ARTIFACT_NAME = "suggestedArtifactName";
// Represents suggested artifact version
public static final String SUGGESTED_ARTIFACT_VERSION = "suggestedArtifactVersion";
// Represents suggested artifact scope
public static final String SUGGESTED_ARTIFACT_SCOPE = "suggestedArtifactScope";
// Represents input stage
public static final String INPUT_STAGE = "inputStage";
// Represents a field of input stage schema
public static final String INPUT_SCHEMA_FIELD = "inputField";
// Represents a stage output port
public static final String OUTPUT_PORT = "outputPort";
// Represents a field of output stage schema
public static final String OUTPUT_SCHEMA_FIELD = "outputField";
// Represents a stacktrace
public static final String STACKTRACE = "stacktrace";
} |
API usage in plugins
Code Block |
---|
|
@Override
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
pipelineConfigurer.createDataset(conf.destinationFileset, FileSet.class);
// Failures are collected instead of thrown, so every check below runs
FailureCollector collector = pipelineConfigurer.getStageConfigurer().getFailureCollector();
try {
Pattern.compile(conf.filterRegex);
} catch (Exception e) {
// One failure with a single cause that points at the offending config property
collector.addFailure("Error encountered while compiling filter regex: " + e.getMessage(),
"Make sure filter regex is valid.").withConfigProperty("filterRegex");
}
if (conf.sourceFileset.equals(conf.destinationFileset)) {
// One failure with two associated causes, one per config property involved
collector.addFailure("Source and destination filesets must be different",
"Make sure source and destination filesets are different")
.withConfigProperty("sourceFileset").withConfigProperty("destinationFileset");
}
} |
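To illustrate how the calls above accumulate failures rather than stopping at the first one, here is a minimal, self-contained sketch. `SimpleCollector` and `Failure` are hypothetical stand-ins for the `FailureCollector` and `ValidationFailure` types of this design, reduced to what the example needs, and `validate` is an illustrative helper rather than part of the proposed API. Only the attribute key "stageConfig" is taken from `CauseAttributes`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;

// Hypothetical, simplified stand-ins for FailureCollector and ValidationFailure.
class CollectorSketch {

  static class Failure {
    final String message;
    final String correctiveAction;
    final List<Map<String, String>> causes = new ArrayList<>();

    Failure(String message, String correctiveAction) {
      this.message = message;
      this.correctiveAction = correctiveAction;
    }

    // Each call adds one cause; returning 'this' allows chaining several causes on one failure.
    Failure withConfigProperty(String property) {
      Map<String, String> cause = new HashMap<>();
      cause.put("stageConfig", property); // same key as CauseAttributes.STAGE_CONFIG
      causes.add(cause);
      return this;
    }
  }

  static class SimpleCollector {
    final List<Failure> failures = new ArrayList<>();

    // Records the failure and returns it so that causes can be attached.
    Failure addFailure(String message, String correctiveAction) {
      Failure failure = new Failure(message, correctiveAction);
      failures.add(failure);
      return failure;
    }
  }

  // Mirrors the two checks of the plugin example: the second check runs even if the first fails.
  static List<Failure> validate(String filterRegex, String sourceFileset, String destinationFileset) {
    SimpleCollector collector = new SimpleCollector();
    try {
      Pattern.compile(filterRegex);
    } catch (PatternSyntaxException e) {
      collector.addFailure("Error encountered while compiling filter regex: " + e.getMessage(),
                           "Make sure filter regex is valid.").withConfigProperty("filterRegex");
    }
    if (sourceFileset.equals(destinationFileset)) {
      collector.addFailure("Source and destination filesets must be different",
                           "Make sure source and destination filesets are different")
        .withConfigProperty("sourceFileset").withConfigProperty("destinationFileset");
    }
    return collector.failures;
  }

  public static void main(String[] args) {
    // Both inputs are invalid, so both failures are reported in a single pass.
    for (Failure failure : validate("[unclosed", "files", "files")) {
      System.out.println(failure.message + " -> " + failure.causes);
    }
  }
}
```

With an invalid regex and identical filesets, the sketch reports two failures from one validation pass; the second failure carries two causes, which is how associated causes are grouped under a single failure.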
Impact on UI
Type | Description | Scenario | Approach 1 JSON response | Approach 2 JSON response |
---|
StageError | Represents validation error while configuring the stage | If an error occurs while connecting to the sink to get the actual schema | { "failures": [ { "type": "StageError", "message": "Could not load jdbc driver class.", "correctiveAction": "Make sure correct driver is available.", "properties": { "stage": "src" } } ] } | { "errors": [ { "message": "Could not load jdbc driver class.", "correctiveAction" : "Make sure correct driver is available.", "causes": [ { "stage": "src" } ] } ] } |
InvalidProperty | Represents invalid configuration property | If config property value contains characters that are not allowed by underlying source or sink | { "failures": [ { "type": "InvalidProperty", "message": "Property 'millis' should be more than 0.", "correctiveAction": "Make sure 'millis' is greater than 0.", "properties": { "stage": "transform", "configProperty": "millis" } } ] } | { "errors": [ { "message": "Property 'millis' should be more than 0.", "correctiveAction" : "Make sure 'millis' is greater than 0.", "causes": [ { "stage": "transform", "stageConfig": "millis" } ] } ] } |
PluginNotFound | Represents plugin not found error for a stage. This error will be added by the data pipeline app | If the plugin was not found. This error will be thrown from the data pipeline app | { "failures": [ { "type": "PluginNotFound", "message": "Plugin named 'Mock' of type 'batchsource' not found.", "correctiveAction": "Please make sure the 'Mock' plugin is installed.", "properties": { "stage": "src", "pluginType": "batchsource", "pluginName": "Mock", "pluginId": "Mock" } } ] } | { "errors": [ { "message": "Plugin named 'Mock' of type 'batchsource' not found.", "correctiveAction" : "Please make sure the 'Mock' plugin is installed.", "causes": [ { "stage": "src", "pluginType": "batchsource", "pluginName": "Mock", "pluginId" : "Mock" } ] } ] } |
InvalidInputSchema | Represents invalid schema field in input schema | If the input schemas of a joiner plugin have join keys of different types | { "failures": [ { "type": "InvalidInputSchema", "message": "Invalid schema field 'id'. Different types of join keys found in source1 and source2.", "correctiveAction": "Type of join keys from source1 and source2 must be of same type string", "properties": { "stage": "joiner", "field": "id", "inputStage": "source1" } }, { "type": "InvalidInputSchema", "message": "Invalid schema field 'id'. Different types of join keys found in source1 and source2.", "correctiveAction": "Type of join keys from source1 and source2 must be of same type string", "properties": { "stage": "joiner", "field": "id", "inputStage": "source2" } } ] } | { "failures": [ { "message": "Different types of join keys found.", "correctiveAction" : "Type of join keys from all the sources must be same.", "causes": [ { "stage": "joiner", "joinKey": "source1.id,source2.id" }, { "stage": "joiner", "inputStage": "source1", "inputField": "id" }, { "stage": "joiner", "inputStage": "source2", "inputField": "id" } ] } ] } |
InvalidOutputSchema | Represents invalid schema field in output schema | If the output schema for the plugin is not compatible with underlying sink | { "failures": [ { "type": "InvalidOutputSchema", "message": "Invalid schema field 'email'.", "correctiveAction": "Schema should be of type 'string' at output port 'port'", "properties": { "stage": "splitter", "field": "email", "outputPort": "port" } } ] } | { "errors": [ { "message": "Invalid schema field 'email'.", "correctiveAction" : "Schema should be of type 'string' at output port 'port'", "causes": [ { "stage": "splitter", "outputPort": "port", "outputField": "email" } ] } ] } |
InvalidFieldInProperty | Represents an invalid field in property list | If the property represents a list of fields, the failure should include the property name along with the invalid field | { "failures": [ { "type": "InvalidFieldInProperty", "message": "Unique field 'name' does not exist in the input schema.", "correctiveAction": "Make sure 'name' field is a correct field.", "properties": { "stage": "Deduplicate", "field": "uniqueFields", "configElement": "name" } } ] } | { "errors": [ { "message": "Unique field 'name' does not exist in the input schema.", "correctiveAction" : "Make sure 'name' field is a correct field.", "causes": [ { "stage": "Deduplicate", "stageConfig": "uniqueFields", "configElement": "name" } ] } ] } |
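The Approach 2 response shape can be sketched as follows. This is an illustration under stated assumptions, not the app's serialization code: `Approach2JsonSketch`, `quote` and `toJson` are hypothetical names, the JSON is assembled by hand only to keep the example dependency-free (a real implementation would more likely use a JSON library), and the "stage" attribute is assumed to have been added to each cause already.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper that renders one failure in the Approach 2 shape.
class Approach2JsonSketch {

  // Minimal string quoting: escapes backslashes and double quotes only (no control characters).
  static String quote(String s) {
    return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
  }

  // Renders message, optional corrective action, and the list of causes as a JSON object.
  static String toJson(String message, String correctiveAction, List<Map<String, String>> causes) {
    StringBuilder sb = new StringBuilder();
    sb.append("{\"message\":").append(quote(message));
    if (correctiveAction != null) {
      // Corrective action is nullable, so it is omitted when absent.
      sb.append(",\"correctiveAction\":").append(quote(correctiveAction));
    }
    sb.append(",\"causes\":[");
    for (int i = 0; i < causes.size(); i++) {
      if (i > 0) {
        sb.append(",");
      }
      sb.append("{");
      int written = 0;
      for (Map.Entry<String, String> attribute : causes.get(i).entrySet()) {
        if (written++ > 0) {
          sb.append(",");
        }
        sb.append(quote(attribute.getKey())).append(":").append(quote(attribute.getValue()));
      }
      sb.append("}");
    }
    return sb.append("]}").toString();
  }

  public static void main(String[] args) {
    // LinkedHashMap keeps the attributes in insertion order for readable output.
    Map<String, String> cause = new LinkedHashMap<>();
    cause.put("stage", "transform");
    cause.put("stageConfig", "millis");
    List<Map<String, String>> causes = new ArrayList<>();
    causes.add(cause);
    System.out.println(toJson("Property 'millis' should be more than 0.",
                              "Make sure 'millis' is greater than 0.", causes));
  }
}
```

Note that the object carries no "type" field: everything a consumer needs is expressed through the attributes of each cause.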
Conclusion
This design involves two contracts: a programmatic contract between the data pipeline app and plugins, and another between the data pipeline app and the UI. Approach 2 does not introduce the concept of a failure type, so the contract with the UI is based on cause attributes rather than on types. As a result, if a plugin creates a custom failure that uses any of the UI-compatible attributes, the UI can still highlight it. Approach 2 also groups associated causes under a single failure, which represents the failure better when several causes contribute to it. Hence, Approach 2 is suggested.
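The attribute-based contract with the UI can be sketched as follows. This is a minimal illustration, not the actual UI logic: `HighlightSketch`, `cause` and `highlightTargets` are hypothetical names, and the target strings are an invented format. Only the attribute keys are taken from `CauseAttributes`. Each cause is mapped to a highlight target purely from the attributes it carries, so no failure type is needed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical UI-side helper; only the attribute keys come from CauseAttributes.
class HighlightSketch {
  static final String STAGE_CONFIG = "stageConfig";
  static final String CONFIG_ELEMENT = "configElement";
  static final String INPUT_STAGE = "inputStage";
  static final String INPUT_SCHEMA_FIELD = "inputField";
  static final String OUTPUT_PORT = "outputPort";
  static final String OUTPUT_SCHEMA_FIELD = "outputField";

  // Small helper to build a cause from alternating key/value arguments.
  static Map<String, String> cause(String... keyValues) {
    Map<String, String> attributes = new HashMap<>();
    for (int i = 0; i + 1 < keyValues.length; i += 2) {
      attributes.put(keyValues[i], keyValues[i + 1]);
    }
    return attributes;
  }

  // Decides what to highlight by looking only at which attributes a cause carries.
  static List<String> highlightTargets(List<Map<String, String>> causes) {
    List<String> targets = new ArrayList<>();
    for (Map<String, String> cause : causes) {
      if (cause.containsKey(STAGE_CONFIG)) {
        String element = cause.get(CONFIG_ELEMENT);
        targets.add("property:" + cause.get(STAGE_CONFIG) + (element == null ? "" : "[" + element + "]"));
      } else if (cause.containsKey(INPUT_SCHEMA_FIELD)) {
        String stage = cause.get(INPUT_STAGE);
        targets.add("input:" + (stage == null ? "" : stage + ".") + cause.get(INPUT_SCHEMA_FIELD));
      } else if (cause.containsKey(OUTPUT_SCHEMA_FIELD)) {
        String port = cause.get(OUTPUT_PORT);
        targets.add("output:" + (port == null ? "" : port + ".") + cause.get(OUTPUT_SCHEMA_FIELD));
      }
      // Causes without a recognized attribute yield no highlight target.
    }
    return targets;
  }

  public static void main(String[] args) {
    List<Map<String, String>> causes = new ArrayList<>();
    causes.add(cause("stage", "joiner", INPUT_STAGE, "source1", INPUT_SCHEMA_FIELD, "id"));
    causes.add(cause("stage", "transform", STAGE_CONFIG, "millis"));
    causes.add(cause("stage", "src"));
    System.out.println(highlightTargets(causes));
  }
}
```

A custom failure created by a plugin is handled by the same branches as long as it uses one of these attribute keys, which is the decoupling the conclusion argues for.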
Jira: CDAP-15578 (Cask Community Issue Tracker)
Releases
Release 6.1.0