
Checklist

  •  User Stories Documented
  •  User Stories Reviewed
  •  Design Reviewed
  •  APIs reviewed
  •  Release priorities assigned
  •  Test cases reviewed
  •  Blog post

Introduction 

There have been multiple requests to support sending records to different outputs depending on some condition. Other similar pipelining solutions have this capability.

Goals

The goal is to support pipeline use cases that require sending some records to one output stage and other records to another output stage. 

Use Cases

A pipeline is processing records that contain a union field. The field is a union of 5 possible records. The pipeline developer wants to insert a stage that will have five different output ports, one for each possible schema in the union.

A pipeline is processing records that contain a nullable 'email' field. Most records contain an email, but some do not. The pipeline developer wants to insert a stage that sends all records with empty emails on to a join stage to populate the email field. 
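
The union use case above amounts to choosing an output port from the runtime type of the union value. The following is a minimal, self-contained sketch of that routing decision; the type-to-port mapping, port names, and fallback error port are illustrative only and not part of any proposed API:

```java
import java.util.HashMap;
import java.util.Map;

public class UnionRouting {
  // Map each possible member type of the union to an output port name.
  // These types and port names are hypothetical examples.
  private static final Map<Class<?>, String> PORT_BY_TYPE = new HashMap<>();
  static {
    PORT_BY_TYPE.put(Integer.class, "intPort");
    PORT_BY_TYPE.put(String.class, "stringPort");
    PORT_BY_TYPE.put(Double.class, "doublePort");
  }

  // Choose the output port for a union value, falling back to an error port
  // for unexpected member types.
  static String choosePort(Object unionValue) {
    return PORT_BY_TYPE.getOrDefault(unionValue.getClass(), "errorPort");
  }

  public static void main(String[] args) {
    System.out.println(choosePort(42));      // intPort
    System.out.println(choosePort("hello")); // stringPort
  }
}
```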

User Stories 

  • As a pipeline developer, I want to insert a stage that sends records to different output ports
  • As a pipeline developer, I want to know how many output ports a stage supports
  • As a pipeline developer, I want to see the schema for each output port
  • As a plugin developer, I want to specify which records are sent to which output port
  • As a plugin developer, I want to be able to specify different output ports depending on input schema and configuration
  • As a plugin developer, I want to be able to modify records before they are sent to an output port

Design

We will introduce a new 'splitter' plugin type:

Code Block
public abstract class Splitter<T> implements StageLifecycle<SplitterContext>, MultiOutputPipelineConfigurable {
  public static final String PLUGIN_TYPE = "splitter";
 
  /**
   * Configure the pipeline. This is run once when the pipeline is being published.
   * This is where you perform any static logic, like creating required datasets, performing schema validation,
   * setting output schema, and things of that nature.
   *
   * @param configurer the configurer used to add required datasets and streams
   */
  @Override
  public void configurePipeline(MultiOutputPipelineConfigurer configurer) {
    // no-op
  }
 
  /**
   * Initialize the splitter.
   */
  @Override
  public void initialize(SplitterContext context) throws Exception {
    //no-op
  }

  /**
   * Destroy the splitter.
   */
  @Override
  public void destroy() {
    //no-op
  }
 
  public abstract void transform(T input, MultiOutputEmitter<T> emitter);
}
 
public interface MultiOutputEmitter<T> {
 
  void emit(String output, Object record);
 
  void emitError(InvalidEntry<T> invalidEntry);
}
 
public interface MultiOutputPipelineConfigurable {
 
  void configurePipeline(MultiOutputPipelineConfigurer multiOutputPipelineConfigurer);
}
 
public interface MultiOutputPipelineConfigurer extends PipelineConfigurer {
  
  MultiOutputStageConfigurer getMultiOutputStageConfigurer();
}
 
public interface MultiOutputStageConfigurer {
 
  Schema getInputSchema();
 
  void setOutputSchemas(Map<String, Schema> outputSchemas);
}
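
As a sketch of how a plugin developer might use these APIs, the following self-contained example routes records by whether a nullable 'email' field is set (the second use case above). The Emitter interface and NullFieldSplitter class are simplified stand-ins for the MultiOutputEmitter and Splitter types, records are modeled as plain field maps rather than structured records, and the port names are illustrative:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-in for the MultiOutputEmitter interface above.
interface Emitter<T> {
  void emit(String output, T record);
}

// Hypothetical splitter that routes records (modeled here as field maps) to a
// 'null' port when the configured field is unset and a 'nonnull' port otherwise.
class NullFieldSplitter {
  private final String fieldName;

  NullFieldSplitter(String fieldName) {
    this.fieldName = fieldName;
  }

  // Decide which output port a record belongs to.
  String choosePort(Map<String, Object> input) {
    return input.get(fieldName) == null ? "null" : "nonnull";
  }

  // Mirrors Splitter.transform: inspect the record and emit it to one port.
  void transform(Map<String, Object> input, Emitter<Map<String, Object>> emitter) {
    emitter.emit(choosePort(input), input);
  }
}

public class SplitterSketch {
  public static void main(String[] args) {
    // Collect emitted records per port so the routing can be inspected.
    Map<String, List<Map<String, Object>>> ports = new HashMap<>();
    Emitter<Map<String, Object>> emitter =
        (output, record) -> ports.computeIfAbsent(output, k -> new ArrayList<>()).add(record);

    NullFieldSplitter splitter = new NullFieldSplitter("email");

    Map<String, Object> withEmail = new HashMap<>();
    withEmail.put("name", "alice");
    withEmail.put("email", "alice@example.com");

    Map<String, Object> withoutEmail = new HashMap<>();
    withoutEmail.put("name", "bob");

    splitter.transform(withEmail, emitter);
    splitter.transform(withoutEmail, emitter);

    System.out.println(ports.get("nonnull").size()); // 1
    System.out.println(ports.get("null").size());    // 1
  }
}
```

A record emitted with emit("null", record) would then flow to whichever stage the pipeline developer connected to that port, such as the join stage in the use case above.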

 

Approach

Approach #1

Approach #2

API changes

New Programmatic APIs

The Splitter plugin type and the MultiOutputEmitter, MultiOutputPipelineConfigurable, MultiOutputPipelineConfigurer, and MultiOutputStageConfigurer interfaces shown in the Design section above.

Deprecated Programmatic APIs

None

New REST APIs

None

Deprecated REST API

None

CLI Impact or Changes

None

UI Impact or Changes

The UI will need enhancements to support multiple outputs: a way to display and connect the multiple output ports of a stage, and a way to manage the schema for each output port.

Security Impact 

None

Impact on Infrastructure Outages 

None

Test Scenarios

Test ID | Test Description | Expected Results
--------|------------------|-----------------

Releases

Release X.Y.Z

Release X.Y.Z

Related Work

  • Work #1
  • Work #2
  • Work #3

 

Future work