Validator Transform

User Stories

A user wants to create a pipeline that reads from a source (in batch or realtime) and validates that the data is as expected. Invalid data should be written to a dataset to be examined later. Valid data should be passed on to the next stage.

For example, a user wants to create a pipeline that reads log files in batch and validates that the ‘date’ field is an actual date, the ‘request’ field is a valid URL, and the response size is below 1 MB. For valid data, they want to perform some other transforms before writing to another fileset. For invalid data, they want to store it in an error dataset called “invalidlogs”.

First, the user wants to find out whether there are validation functions that meet their needs. They make a call to get plugins of type ‘validator’ for the etl-batch artifact:

 

 

GET /artifacts/etl-batch/versions/3.2.0/extensions/validator
[
 {
   "name": "apache",
   "description": "Apache validator functions from org.apache.commons.validator.routines package. Includes isDate(), isURL(), ...",
   "className": "co.cask.cdap.etl.validator.ApacheFunctions"
 },
 {
   "name": "guava",
   "description": "Guava methods",
   "className": "co.cask.cdap.etl.validator.GuavaFunctions"
 },
 ...
]

 

Based on the validation functions available, the user creates an etl-batch app:


PUT /apps/logfileValidator
{
 "artifact": {
   "name": "etl-batch",
   "version": "3.2.0"
 },
 "config": {
   "source": { ... },
   "transform": [
     {
       "name": "validator",
       "errorDataset": "invalidlogs",
       "properties": {
         "validators": "apache,guava", // UI can populate based on rest call
         "validationScript": "function isValid(input) {
           var isValid = true;
           var errMsg = "";
           var errCode = 0;
           if (!apacheValidator.isDate(input.date)) {
             isValid = false;
             errMsg = input.date + " is an invalid date";
             errCode = 5;
           } else if (!apacheValidator.isURL(input.request)) {
             isValid = false;
             errMsg = input.request + " is an invalid url";
             errCode = 7;
           } else if (!apacheValidator.isInRange(input.content_length, 0, 1024 * 1024)) {
             isValid = false;
             errMsg = input.content_length + " is greater than 1MB";
             errCode = 12;
           }

           return {
             'isValid': isValid,
             'errMsg': errMsg,
             'errCode': errCode
           }
         }"
       }
     },
     ...
   ],
   "sink": { ... }
 }
}
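
For readers more at home in Java, the first-failure-wins logic of the validation script above can be mirrored with JDK classes only. This is a rough sketch, not the plugin's implementation: `ValidationResult`, `LogRecordCheck`, the ISO-8601 date format, and the use of `java.net.URI` in place of the Apache routines are all assumptions made for illustration. The error codes 5, 7, and 12 are copied from the script.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.time.LocalDate;
import java.time.format.DateTimeParseException;

/** Hypothetical result type mirroring the object returned by the script. */
final class ValidationResult {
  final boolean isValid;
  final String errMsg;
  final int errCode;

  ValidationResult(boolean isValid, String errMsg, int errCode) {
    this.isValid = isValid;
    this.errMsg = errMsg;
    this.errCode = errCode;
  }
}

/** Hypothetical stand-in for the script: checks date, then request, then size. */
final class LogRecordCheck {
  static final long MAX_BYTES = 1024L * 1024L;

  static ValidationResult check(String date, String request, long contentLength) {
    if (!isIsoDate(date)) {
      return new ValidationResult(false, date + " is an invalid date", 5);
    }
    if (!isAbsoluteUrl(request)) {
      return new ValidationResult(false, request + " is an invalid url", 7);
    }
    if (contentLength < 0 || contentLength > MAX_BYTES) {
      return new ValidationResult(false, contentLength + " is not between 0 and 1MB", 12);
    }
    return new ValidationResult(true, "", 0);
  }

  // Assumption: dates are ISO-8601 (yyyy-MM-dd); the plugin would use the Apache routines.
  private static boolean isIsoDate(String s) {
    if (s == null) {
      return false;
    }
    try {
      LocalDate.parse(s);
      return true;
    } catch (DateTimeParseException e) {
      return false;
    }
  }

  // Assumption: a "valid url" is an absolute URI with both a scheme and a host.
  private static boolean isAbsoluteUrl(String s) {
    if (s == null) {
      return false;
    }
    try {
      URI uri = new URI(s);
      return uri.getScheme() != null && uri.getHost() != null;
    } catch (URISyntaxException e) {
      return false;
    }
  }
}
```

As in the script, only the first failing check is reported, so a record with both a bad date and a bad URL is stored with error code 5.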

Now the user wants to add their own custom validation function that validates that a particular field matches a configurable regex. They implement the ‘Validator’ interface, which will be added to cdap-etl-api.


public interface Validator {
 
 Set<String> getPackages();

 Object getValidator();

}

 

For example:


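@Plugin(type = "validator")
@Name("dateValidator")
@Description("Date Validator using apache commons project.")
public class DateValidator implements Validator {

 @Override
 public Set<String> getPackages() {
   return Sets.newHashSet("org.apache.commons.validator.routines");
 }

 @Override
 public Object getValidator() {
   // Fully qualified: the Apache class shares its simple name with this plugin class.
   return org.apache.commons.validator.routines.DateValidator.getInstance();
 }
}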
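
The regex validator from the user story could follow the same pattern. The sketch below is only an illustration: `RegexValidator` and `RegexFunctions` are hypothetical names, the regex is passed through a constructor because the source does not say how plugin configuration reaches a validator, the CDAP annotations are left out so the code compiles without dependencies, and the `Validator` interface is repeated locally for the same reason.

```java
import java.util.Collections;
import java.util.Set;
import java.util.regex.Pattern;

/** Local copy of the proposed interface so the sketch compiles on its own. */
interface Validator {
  Set<String> getPackages();

  Object getValidator();
}

/** Hypothetical object handed to the script, e.g. called as regexValidator.matches(input.field). */
final class RegexFunctions {
  private final Pattern pattern;

  RegexFunctions(String regex) {
    this.pattern = Pattern.compile(regex);
  }

  /** Returns true only when the whole value matches the configured regex. */
  public boolean matches(String value) {
    return value != null && pattern.matcher(value).matches();
  }
}

/** Hypothetical plugin class; @Plugin, @Name and @Description are omitted here. */
final class RegexValidator implements Validator {
  private final String regex;

  RegexValidator(String regex) {
    this.regex = regex;
  }

  @Override
  public Set<String> getPackages() {
    // Nothing to import: the script only calls methods on the returned object.
    return Collections.emptySet();
  }

  @Override
  public Object getValidator() {
    return new RegexFunctions(regex);
  }
}
```

Compiling the pattern once in the `RegexFunctions` constructor keeps the per-record cost to a single match, which matters if the script is evaluated for every record in the pipeline.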

 

The user builds their jar, then adds the artifact using the REST API:

 

POST /artifacts/custom-validators --data-binary @custom-validators-1.0.0.jar -H 'Artifact-Extends: etl-batch[3.2.0,4.0.0)/etl-realtime[3.2.0,4.0.0)'
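
The Artifact-Extends value above appears to list the parent artifacts separated by '/', each followed by a version range in interval notation. The following sketch shows how such a value could be split apart. It assumes only the `name[lower,upper)` form shown in the example, `ParentRange` and `ArtifactExtendsParser` are hypothetical names, and it is not CDAP's own parser.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical holder for one parent entry such as etl-batch[3.2.0,4.0.0). */
final class ParentRange {
  final String name;
  final String lower; // inclusive bound, written after '['
  final String upper; // exclusive bound, written before ')'

  ParentRange(String name, String lower, String upper) {
    this.name = name;
    this.lower = lower;
    this.upper = upper;
  }
}

final class ArtifactExtendsParser {
  // Matches: name, '[', lower version, ',', upper version, ')'
  private static final Pattern ENTRY =
      Pattern.compile("([^\\[/]+)\\[([^,]+),([^)]+)\\)");

  /** Splits the header value on '/' and parses each parent entry. */
  static List<ParentRange> parse(String headerValue) {
    List<ParentRange> parents = new ArrayList<>();
    for (String entry : headerValue.split("/")) {
      Matcher m = ENTRY.matcher(entry.trim());
      if (!m.matches()) {
        throw new IllegalArgumentException("Unrecognized parent entry: " + entry);
      }
      parents.add(new ParentRange(m.group(1), m.group(2).trim(), m.group(3).trim()));
    }
    return parents;
  }
}
```

Read this way, the example header declares that the custom validators are available to etl-batch and etl-realtime from version 3.2.0 up to, but not including, 4.0.0.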

 

The new plugin now shows up in the list plugins call:


GET /artifacts/etl-batch/versions/3.2.0/extensions/validator
[
 {
   "name": "apache",
   "description": "Apache validator functions from org.apache.commons.validator.routines package. Includes isDate(), isURL(), ...",
   "className": "co.cask.cdap.etl.validator.ApacheFunctions"
 },
 {
   "name": "guava",
   "description": "Guava methods",
   "className": "co.cask.cdap.etl.validator.GuavaFunctions"
 },
 {
   "name": "dateValidator",
   ...
 },
 ...
]

Implementation

Internally, the ETL app needs to change so that transforms can write to an error dataset, and a Validator transform needs to be added to etl-lib. The Validator transform will look something like:

 
@Plugin(type = "transform")
@Name("Validator")
@Description("Validates a record, writing to an error dataset if the record is invalid. Otherwise it passes the record on to the next stage.")
public class ValidatorTransform extends Transform<StructuredRecord, StructuredRecord> {
 private ValidatorConfig config;
 private List<Validator> validators;


 // NEW METHOD: today it exists in source/sink but not in transform
 @Override
 public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
   for (String validator : config.validators.split(",")) {
     // register each validator plugin, using its name as the plugin id
     String name = validator.trim();
     pipelineConfigurer.usePluginClass("validator", name, name, PluginProperties.builder().build());
   }
 }


 @Override
 public void initialize(TransformContext context) throws Exception {
   validators = new ArrayList<>();
   for (String validator : config.validators.split(",")) {
     // NEW METHOD: today it doesn't exist in TransformContext, but does in
     //             sink,source contexts
     // assumes the plugin id is the validator name
     validators.add(context.<Validator>newPluginInstance(validator.trim()));
   }
   // build base javascript that calls importPackage on packages and
   // sets up global variables
 }


 public void transform(StructuredRecord input, Emitter<StructuredRecord> emitter) {
   ErrorResult errResult = // run javascript
   if (errResult.isInvalid()) {
     emitter.emitError(errResult);
   } else {
     emitter.emit(input);
   }
 }


 public static class ValidatorConfig extends PluginConfig {
   @Description("list of validator plugins...")
   private String validators;

   @Description("javascript that defines the isValid(input) function")
   private String validationScript;
 }
}
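
The routing done by transform() can be tried outside of CDAP with a few stand-in types. This is a minimal sketch under stated assumptions: `RoutingEmitter`, `InMemoryEmitter`, and `RoutingSketch` are hypothetical names, records are plain maps instead of StructuredRecord, the fields of `ErrorResult` beyond isInvalid() are guesses, and a Java function takes the place of the JavaScript evaluation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical shape of the script outcome; the design only shows isInvalid(). */
final class ErrorResult {
  final boolean invalid;
  final int errCode;
  final String errMsg;

  ErrorResult(boolean invalid, int errCode, String errMsg) {
    this.invalid = invalid;
    this.errCode = errCode;
    this.errMsg = errMsg;
  }

  boolean isInvalid() {
    return invalid;
  }
}

/** Stand-in for Emitter extended with the proposed emitError method. */
interface RoutingEmitter {
  void emit(Map<String, String> record);

  void emitError(ErrorResult error);
}

/** Collects output in memory; a real pipeline would write errors to the error dataset. */
final class InMemoryEmitter implements RoutingEmitter {
  final List<Map<String, String>> valid = new ArrayList<>();
  final List<ErrorResult> errors = new ArrayList<>();

  @Override
  public void emit(Map<String, String> record) {
    valid.add(record);
  }

  @Override
  public void emitError(ErrorResult error) {
    errors.add(error);
  }
}

/** Routes each record according to the result of the validation function. */
final class RoutingSketch {
  static void transform(Map<String, String> input,
                        Function<Map<String, String>, ErrorResult> script,
                        RoutingEmitter emitter) {
    ErrorResult result = script.apply(input);
    if (result.isInvalid()) {
      emitter.emitError(result);   // invalid: goes to the error output only
    } else {
      emitter.emit(input);         // valid: passed on to the next stage unchanged
    }
  }
}
```

Each record ends up in exactly one of the two outputs, which is the property the error dataset relies on: nothing invalid reaches the next stage, and nothing valid is diverted.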