Pipeline System Application


Checklist

  • User Stories Documented
  • User Stories Reviewed
  • Design Reviewed
  • APIs reviewed
  • Release priorities assigned
  • Test cases reviewed
  • Blog post

Introduction 

As the data pipeline experience has become more sophisticated, it has become clear that there are a good amount of capabilities required during the pipeline creation process. Some of these capabilities are provided by the CDAP platform if they are generic enough, such as the listing of plugins and their properties. However, there are a class of capabilities that are specific only to pipelines, such as pipeline validation, schema propagation, plugin templates, and pipeline drafts. These have all been implemented with non-trivial logic in the UI. This type of pipeline specific logic does not belong in the UI, but in a backend service that the UI can call. These types of features were explored using the plugin endpoint feature (for get schema buttons), but that has turned out to be difficult to handle errors correctly and ends up running user code in the CDAP master. The lack of this pipeline backend layer has slowed UI development, contributed to bugs in the product, and added a lot of technical debt. By moving much of this logic to a system application, overall development speed will increase and the door will be opened to a richer set of features in the future. 

Goals

Design the pipeline system application to remove tech debt from the UI and build an architecture that supports a richer set of pipeline specific product requirements in the future.

User Stories 

  1. As a pipeline developer, I want to be able to validate a pipeline before deploying it and know exactly which stages and which fields are invalid and why
  2. As a pipeline developer, I want to be able to validate a pipeline stage and know exactly which fields are invalid and why
  3. As a pipeline developer, I want to be able to debug a single pipeline stage by specifying input and examining output and/or errors
  4. As a pipeline developer, I want the schema displayed by the UI to always match what is used during execution
  5. As a pipeline developer, I want to be able to import a pipeline spec with missing artifacts and have the option to automatically update those versions
  6. As a plugin developer, I want schema propagation to be defined in a single place
  7. As a CDAP administrator, I don't want user code to run in the CDAP master

Design

A new system application will be introduced to provide much of the more complex logic that is currently handled by the UI. The data pipeline code will be enhanced with a Service program to expose a few APIs. Where possible, APIs will be stateless. The application will be deployed at startup to the system namespace through a bootstrap action. All REST endpoints described are prefixed by 'v3/namespaces/system/apps/pipeline/services/studio/methods/v1'. 

Validate Pipeline

Pipeline validation is already done by the application when a pipeline is deployed. The logic just needs to be refactored so that it can be run during application configure time and in a service method call. The request body is exactly the same as the request to deploy a pipeline

POST v1/contexts/<namespace-id>/validations/pipeline

Request Body:
{
  "artifact": {
    "scope": "SYSTEM",
    "name": "cdap-data-pipeline",
    "version": "6.0.0"
  }
  "config": {
    "stages": [ ... ],
    "connections": [ ... ],
    ...
  }
}

Response:
{
  "errors": [
    {
      "type": "Invalid Field",
      "message": "...",
      // type specific fields
    }
  ],
  "spec": {
    "stages": [
      {
        "name": "stageName",
        "plugin": {
          "type": "pluginType",
          "name": "pluginName",
          "properties": {
            "name": "value"
          },
          "artifact": {
            "name": "core-plugins",
            "scope": "SYSTEM" | "USER",
            "version": "1.0.0"
          },
          "inputSchemas": {
            "[inputStageName]": { schema object }
          },
          "outputPorts": {
            "[outputStageName]": {
              "port": "portName",
              "schema": "portSchema"
            }
          },
          "outputSchema": { schema object },
          "errorSchema": { schema object }
        }
      },
      ...
    ],
    "connections": [
      {
        "from": "inputStageName",
        "to": "outputStageName",
        "port": "outputPort",
        "condition": true | false
      },
      ...
    ],
    "resources": {
      "virtualCores": 1,
      "memoryMB": 1024
    },
    "driverResources": { ... },
    "clientResources": { ... },
    "stageLoggingEnabled": true | false,
    "processTimingEnabled": true | false,
    "properties": {
      "name": "val",
      ...
    },
    "endingActions": [
      {
        "name": "postaction name",
        "plugin": {
          "type": "postaction",
          "name": "email",
          "properties": { ... },
          "artifact": { ... }
        }
      }
    ]
  }
}

Namespace is required in the request in order for the app to pick up the correct plugin, since plugin artifacts can be in user scope.

The response contains whether the pipeline is valid, errors if it is invalid, and the spec if it is valid. This is the same spec that can be found in the program properties of a deployed pipeline. It is similar to the pipeline config except it contains the exact plugin artifact that will be used (different than input if the input contains a version range or missing field), and it is enhanced with more information about schema.

If the error is not specific to a stage (for example, the pipeline structure is invalid), the stage name and field name will be missing. In order to capture this information, the plugin APIs will be extended to include a validate method that can throw a specific type of exception. This method will be called at configure time when the pipeline is deployed, as well as when the validate service endpoint is called.

public class InvalidStageException extends RuntimeException {
  private final List<Exception> errors;

  public InvalidStageException(Collection<Exception> errors);

  private final List<Exception> getErrors();
}

public class InvalidFieldException extends RuntimeException {
  private final String fieldName;
  
  public InvalidFieldException(String fieldName, String message, Throwable cause) { 
    super(message, cause);
    this.fieldName = fieldName;
  }

  public String getFieldName() {
    return fieldName;
  }
}


public interface PipelineConfigurable {
  
  void configurePipeline(PipelineConfigurer pipelineConfigurer) throws InvalidStageException;

}

This allows plugins to write validation code that looks something like:

@Override
public void propagateSchema(StageConfigurer stageConfigurer) throws InvalidStageException {
  List<Exception> configErrors = new ArrayList<>();
  if (config.getNumPartitions() < 1) {
    configErrors.add(new InvalidFieldException("numPartitions", "The number of partitions must be at least 1."));
  }
  if (!configErrors.isEmpty()) {
    throw new InvalidStageException(configErrors);
  }
}

Which would result in errors in the REST response like:

"errors": [
  {
    "type": "Invalid Field",
    "message": "The number of partitions must be at least 1.",
    "stage": "stage-4",
    "field": "numPartitions"
  }
]

Another type of error could be due to a plugin artifact that is missing. This specific type of error is fairly common during pipeline import, when a user imports a pipeline from a different version of CDAP, and the artifact versions do not match.  The error would look like:

"errors": [
  {
    "stage": "stage-5",
    "type": "Artifact Not Found",
    "message": "No plugin named 'table' could be found.",
    "suggestedArtifact": {
      "scope": "SYSTEM",
      "name": "core-plugins",
      "version": "6.0.0"
    }
  }
]

Validate Stage

Validation for a single stage is very similar to that of an entire pipeline. Instead of passing the entire pipeline config, the client passes just the input schema and the stage config. Instead of responding with the entire pipeline spec, just stage spec is returned.

POST v1/contexts/<namespace-id>/validations/stage

Request Body:
{
  "stage": {
    "name": "stageName",
    "plugin": {
      "type": "pluginType",
      "name": "pluginName",
      "artifact": {
        "scope": "SYSTEM" | "USER",
        "name": "core-plugins",
        "version": "1.0.0"
      },
      "properties": {
        "name": "val",
        ...
      }
    }
  },
  "inputSchemas": [
    {
      "stage": "abc",
      "schema": { schema object}
    },
    ...
  ]       
}

Response:
{
  "errors": [
    {
      "type": "Invalid Field",
      "message": "..."
    }
  ],
  "spec": {
    "name": "stageName",
    "plugin": {
      "type": "pluginType",
      "name": "pluginName",
      "properties": {
        "name": "value"
      },
      "artifact": {
        "name": "core-plugins",
        "scope": "SYSTEM" | "USER",
        "version": "1.0.0"
      },
      "inputSchemas": {
        "[inputStageName]": { schema object }
      },
      "portSchemas": {
        "[portName]": { schema object } 
      }
      "outputPorts": {
        "[outputStageName]": {
          "port": "portName",
          "schema": "portSchema"
        }
      },
      "outputSchema": { schema object },
      "errorSchema": { schema object }
    }
  }
}

This endpoint can be used to automatically update the output schema of a stage after a user changes the configuration of the stage. This would replace the 'Get Schema' button that is present in several plugins, removing the possibility of an invalid pipeline because somebody forgot to click the button.

Stage Debugging (Future Work)

It is often useful to be able to send input into a single pipeline stage and see what it would output. This is useful to debug a pipeline stage, and can be useful just to figure out how exactly a pipeline behaves. The request body is similar to the validate stage body, except it contains an additional section defining what the input stages are, the schema of the records from that stage, and the records from that stage. The response contains the output records, error records, or any exception that was generated while attempting to generate the output. This will be implemented initially for transform plugin types, and can be expanded as needed.

POST v1/contexts/<namespace-id>/validations/output


Request Body:
{
  "namespace": "default",
  "stage": {
    "name": "stageName",
    "plugin": {
      "type": "pluginType",
      "name": "pluginName",
      "artifact": {
        "scope": "SYSTEM" | "USER",
        "name": "core-plugins",
        "version": "1.0.0"
      },
      "properties": {
        "name": "val",
        ...
      }
    }
  },
  "input": [
    {
      "stage": "input stage name",
      "schema": { schema of data coming from that stage },
      "records": [
        {
          "fieldName": type appropriate value
        },
        ...
      ]
    },
    ...
  ]       
}


Response:
{
  "errors": [
    {
      "message": "exception message",
      "trace": "exception stack trace"
    },
    ...
  ],
  "output": {
    "schema": { output schema },
    "records": [
      {
        "fieldName": type appropriate value
      },
      ...
    ] 
  }
}

Plugin Endpoints (Future Work)

Plugins used to use a Plugin Endpoints feature to run custom code in the cdap master to calculate output schema. The plugin code would look something like:

public class MyPlugin extends Transform {
  private final MyConfig conf;
  
  // called when 'Get Schema' button is pressed
  @Path("getSchema")
  public Schema getSchema(SchemaRequest request, EndpointPluginContext context) throws Exception {
    ...
  }


  static class SchemaRequest extends MyConf {
    private final Schema inputSchema;
  }
}

The current implementation of this feature is going to be removed, as it executes user code in the cdap master, and it does a poor job of error handling. 

The new validation endpoint now removes the need for the 'get schema' use case, but there still might be times when it makes sense for a plugin to provide additional functionality that is specific to that plugin and not generic across all plugins. For example, the predictor plugin from MMDS might want to expose an endpoint that lists the available experiments, or lists the available models within an experiment. 

Now that these plugins are actually instantiated within a CDAP Service program, these endpoints can be added as a system service API

POST v1/contexts/<namespace-id>/methods/{method-name}


Request Body:
{
  "plugin": {
    "type": "pluginType",
    "name": "pluginName",
    "artifact": {
      "scope": "SYSTEM" | "USER",
      "name": "core-plugins",
      "version": "1.0.0"
    }
  },
  "request": { // json representation of the object passed into the plugin endpoint method
    "inputSchema": { ... },
    "x": "y",
    ...
  }     
}

 This means there is a restriction where the Path must be a single element – it cannot contain a '/'.


Plugin Changes

Currently, many plugins perform validation and schema propagation in multiple places. They will often look something like:

public class MyPlugin extends Transform {
  private final MyConfig conf;
  
  @Override
  public void configurePipeline(PipelineConfigurer configurer) {
    // validate config properties that were not macros
    Schema inputSchema = configurer.getStageConfigurer().getInputSchema();
    validate(inputSchema);
    configurer.getStageConfigurer().setOutputSchema(getOutputSchema(inputSchema));
    // setup datasets
    ...
  }

  @Override
  public void prepareRun(BatchSinkContext context) {
    // validate again to validate any config properties that were macros
    validate(context.getInputSchema());
    // setup run
    ...
  }
   
  // called when 'Get Schema' button is pressed
  @Path("getSchema")
  public Schema getSchema(SchemaRequest request) throws Exception {
    validate(request.inputSchema);
    return getOutputSchema(request.inputSchema);
  }

  private void validate(@Nullable Schema inputSchema) {
    if (!conf.containsMacro("f1") && conf.getF1() >= 100) {
      throw new IllegalArgumentException("f1 must be less than 100);
    }
    ...
  }

  private Schema getOutputSchema(@Nullable Schema inputSchema) {
    // generate output schema based on config properties
  }


  static class SchemaRequest extends MyConf {
    private final Schema inputSchema;
  }
}

Validation is done at configure time to catch any errors as early as possible. It is also done when preparing a run, as many properties could have been macros. It is also done when getting the output schema for the 'Get Schema' button. Similarly, schema propagation is done at configure time when possible, and for the 'Get Schema' button. It should also be done at prepare time (was never added due to time), since schema can be a macro as well.

There is a lot of duplication. The contract for configurePipeline() can be extended so that it can be called before pipeline deployment for validation and schema propagation. In addition, it can be called when preparing a run so that full validation can take place now that macros have been evaluated, and schema that couldn't have been determined at deployment can now be propagated. This removes the need for the plugin developer to explicitly validate in two places, and removes the need to explicitly get schema in two places. The code is then simplified to just be:

public class MySource extends Transform {
  private final MyConfig conf;
  
  @Override
  public void configurePipeline(PipelineConfigurer configurer) {
    // validate config properties that were not macros
    Schema inputSchema = configurer.getStageConfigurer().getInputSchema();
    validate(inputSchema);
    configurer.getStageConfigurer().setOutputSchema(getOutputSchema(inputSchema));
    // setup datasets
    ...
  }

  @Override
  public void prepareRun(BatchSinkContext context) {
    // setup run
    ...
  }

  private void validate(@Nullable Schema inputSchema) {
    if (!conf.containsMacro("f1") && conf.getF1() >= 100) {
      throw new IllegalArgumentException("f1 must be less than 100);
    }
    ...
  }

}


API changes

New Programmatic APIs

There are new Exceptions added to the plugin APIs

Deprecated Programmatic APIs

None

New REST APIs

PathMethodDescriptionResponse CodeResponse
v3/namespaces/system/apps/pipelines/services/studio/methods/v1/contexts/<namespace-id>/validations/pipelinePOSTValidates a pipeline config

Pipeline spec or errors

v3/namespaces/system/apps/pipelines/services/studio/methods/v1/contexts/<namespace-id>/validations/stagePOSTValidates a single pipeline stage
Stage spec or errors
v3/namespaces/system/apps/pipelines/services/studio/methods/v1/contexts/<namespace-id>/validations/outputPOSTReturns what the stage would output given its configuration and some provided input
Output records or errors

Deprecated REST API

None

CLI Impact or Changes

None

UI Impact or Changes

UI will need to change how the 'Get Schema' buttons are handled to call the new stage validation endpoint.

When the UI wants to remove it's own schema propagation logic and add validation, it will need to use the new validation endpoints.

Security Impact 

What's the impact on Authorization and how does the design take care of this aspect

Impact on Infrastructure Outages 

Users will not be able to use the pipeline studio if the new pipeline service is down.

Test Scenarios

Test IDTest DescriptionExpected Results












Releases

Release 6.0.0

Implement validate endpoints in order to ensure user code no longer runs in the CDAP master

Related Work

  • Work #1
  • Work #2
  • Work #3


Future work

Stage Debugging