Format Plugins

Checklist

  • User Stories Documented
  • User Stories Reviewed
  • Design Reviewed
  • APIs reviewed
  • Release priorities assigned
  • Test cases reviewed
  • Blog post

Introduction 

There is a long-standing bug where plugins that expose the same classes run into classloading issues at runtime. This is most commonly seen with the avro/parquet/orc classes in the various file-based plugins.

The problem arises when a pipeline uses plugins from two different artifacts that expose the same class. For example, the File source from core-plugins exposes AvroKey, and the GCS sink from google-plugins also exposes AvroKey. Two separate classloaders end up defining the class, and pipelines fail with confusing errors like "AvroKey cannot be cast to AvroKey".
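
The failure mode can be reproduced outside of CDAP. The following is a minimal sketch, not the CDAP plugin classloader itself: it compiles a hypothetical stand-in class named FakeAvroKey into a temp directory, loads it through two isolated URLClassLoader instances (standing in for the core-plugins and google-plugins artifacts), and then tries to cast an instance from one loader to the class defined by the other. It assumes it runs on a JDK, since it uses the system Java compiler.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import javax.tools.JavaCompiler;
import javax.tools.ToolProvider;

class DuplicateClassDemo {

  /** Compiles a tiny stand-in class (hypothetical name FakeAvroKey) into a temp directory. */
  static Path compileStandIn() throws Exception {
    Path dir = Files.createTempDirectory("duplicate-class-demo");
    Path source = dir.resolve("FakeAvroKey.java");
    Files.write(source, "public class FakeAvroKey {}".getBytes(StandardCharsets.UTF_8));
    JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
    if (compiler == null) {
      throw new IllegalStateException("No system Java compiler available; run on a JDK");
    }
    if (compiler.run(null, null, null, "-d", dir.toString(), source.toString()) != 0) {
      throw new IllegalStateException("Failed to compile the stand-in class");
    }
    return dir;
  }

  /** Loads the same class file through two isolated classloaders and tries to cast across them. */
  static String castAcrossLoaders() {
    try {
      URL[] classpath = {compileStandIn().toUri().toURL()};
      // A null parent means neither loader can delegate to the application classpath,
      // so each loader defines its own copy of the class.
      try (URLClassLoader corePlugins = new URLClassLoader(classpath, null);
           URLClassLoader googlePlugins = new URLClassLoader(classpath, null)) {
        Class<?> fromCore = corePlugins.loadClass("FakeAvroKey");
        Class<?> fromGoogle = googlePlugins.loadClass("FakeAvroKey");
        Object value = fromCore.getDeclaredConstructor().newInstance();
        String names = fromCore.getName() + " vs " + fromGoogle.getName() + ": ";
        try {
          fromGoogle.cast(value);
          return names + "cast succeeded";
        } catch (ClassCastException e) {
          return names + "ClassCastException";
        }
      }
    } catch (Exception e) {
      throw new IllegalStateException("Demo setup failed", e);
    }
  }

  public static void main(String[] args) {
    System.out.println(castAcrossLoaders());
  }
}
```

Both classes report the same name, yet the cast fails because class identity in the JVM is the pair of class name and defining classloader.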


In addition to functional breakage, these plugins all have duplicate code resulting in tech debt. Whenever a new format is added to one, it needs to be added to all the others. Whenever a bug is fixed in one, it must be fixed in all others.

Goals

To provide a pluggable framework for adding and modifying formats that removes both classloading and code duplication as issues. 

User Stories 

  1. As a pipeline developer, I want the same set of formats to be available across all file-based sources and sinks
  2. As a pipeline developer, I want to be able to use any combination of plugins in my pipeline without taking format into consideration
  3. As a pipeline developer, I want to know as soon as possible if I have configured an invalid schema for my format
  4. As a pipeline developer, I want the format to tell me what schema should be used if it requires a specific schema
  5. As a pipeline developer, I want the format of my source or sink to be set as metadata on my dataset
  6. As a plugin developer, I want to be able to add a new format without modifying the code or widgets for plugins that use the format
  7. As a plugin developer, I want to be able to add a new format without any changes to CDAP platform or UI code
  8. As a plugin developer, I want to be able to write a format that requires additional configuration properties, like a custom delimiter
  9. As a plugin developer, I want to be able to write a format that uses macro-enabled configuration properties
  10. As a plugin developer, I want to be able to provide documentation for formats
  11. As a CDAP administrator, I want to be able to control the set of formats that pipeline developers can use

Design

The classloading and code duplication problems that exist today can be solved by making formats into plugins rather than embedding them inside each plugin. At a high level, avro, parquet, orc, etc. are removed from the core-plugins, google-cloud, amazon-s3, etc. plugin jars and moved into their own avro-format, parquet-format, orc-format, etc. jars. The File, GCS, S3, TPFS, etc. plugins are changed to use these format plugins to set up the Hadoop input and output formats that actually read and write data in those formats.

The design is broken down into two phases. The first phase takes place completely in plugin projects. It requires no work in the CDAP platform or UI and solves many of the problems in CDAP 5.1.0 and earlier.

The second phase requires platform work and may overlap with Connection unification work. A proposal is given that meets the requirements from a pipeline perspective, but may need to be changed in light of other dataset work.

Phase 1

In this phase, new format plugins will be created that implement the existing InputFormatProvider and OutputFormatProvider interfaces. Since these interfaces exist in the CDAP API, the format plugins do not have to have any parent and can be used by any CDAP application or plugin. The output formats returned by these formats will implement OutputFormat<NullWritable, StructuredRecord> so that sinks can just pass their input directly to the output format without transformation. Similarly, input formats will implement InputFormat<NullWritable, StructuredRecord>. For example, the avro output format plugin will look something like:

@Plugin(type = "outputformat")
@Name("avro")
@Description("Avro output format plugin that provides the output format class name and properties " +
  "required to write Avro files.")
public class AvroOutputFormatProvider implements OutputFormatProvider {
  private final Conf conf;

  public AvroOutputFormatProvider(Conf conf) {
    this.conf = conf;
  }

  @Override
  public String getOutputFormatClassName() {
    return StructuredAvroOutputFormat.class.getName();
  }

  @Override
  public Map<String, String> getOutputFormatConfiguration() {
    Map<String, String> configuration = new HashMap<>();
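    // schema is nullable (for example, a macro that has not been evaluated yet), so only set it when present
    if (conf.schema != null) {
      configuration.put("avro.schema.output.key", conf.schema);
    }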
    return configuration;
  }

  public static class Conf extends PluginConfig {
    @Macro
    @Nullable
    private String schema;
  }
}

/**
 * Converts StructuredRecord into GenericRecord before delegating to AvroKeyOutputFormat.
 */
public class StructuredAvroOutputFormat extends OutputFormat<NullWritable, StructuredRecord> {
  private AvroKeyOutputFormat<GenericRecord> delegate;

  @Override
  public RecordWriter<NullWritable, StructuredRecord> getRecordWriter(TaskAttemptContext context) throws IOException {
    Configuration hConf = context.getConfiguration();
    Schema schema = Schema.parseJson(hConf.get("avro.schema.output.key"));
    RecordWriter<AvroKey<GenericRecord>, NullWritable> delegateWriter = getDelegate().getRecordWriter(context);
    return new TransformingRecordWriter(delegateWriter, schema);
  }

  @Override
  public void checkOutputSpecs(JobContext context) throws IOException {
    getDelegate().checkOutputSpecs(context);
  }

  @Override
  public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException {
    return getDelegate().getOutputCommitter(context);
  }

  private AvroKeyOutputFormat<GenericRecord> getDelegate() {
    if (delegate == null) {
      delegate = new AvroKeyOutputFormat<>();
    }
    return delegate;
  }

  /**
   * Transforms StructuredRecord into GenericRecord then delegates for the actual writing.
   */
  public static class TransformingRecordWriter extends RecordWriter<NullWritable, StructuredRecord> {
    private final RecordWriter<AvroKey<GenericRecord>, NullWritable> delegate;
    private final StructuredToAvroTransformer transformer;

    public TransformingRecordWriter(RecordWriter<AvroKey<GenericRecord>, NullWritable> delegate,
                                    Schema schema) {
      this.delegate = delegate;
      this.transformer = new StructuredToAvroTransformer(schema);
    }

    @Override
    public void write(NullWritable key, StructuredRecord value) throws IOException, InterruptedException {
      delegate.write(new AvroKey<>(transformer.transform(value)), key);
    }

    @Override
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
      delegate.close(context);
    }
  }
}

In order to use these plugins, sinks need to register them and instantiate them just like any other plugins.

public class FileSink extends BatchSink<StructuredRecord, NullWritable, StructuredRecord> {
  private static final String FORMAT_PLUGIN_ID = "format";
  private final Conf config;

  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    OutputFormatProvider format = pipelineConfigurer.usePlugin("outputformat", config.getFormat(),
                                                               FORMAT_PLUGIN_ID, config.getFormatProperties());
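    if (format == null) {
      throw new IllegalArgumentException("Format plugin '" + config.getFormat() + "' was not found.");
    }
    // call this so that the validation logic in the method runs, failing pipeline deployment for badly configured formats
    format.getOutputFormatConfiguration();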
  }

  @Override
  public final void prepareRun(BatchSinkContext context) throws Exception {
    OutputFormatProvider outputFormatProvider = context.newPluginInstance(FORMAT_PLUGIN_ID);

    String outputFormatClass = outputFormatProvider.getOutputFormatClassName();
    // actual output format contains all format specific configuration plus path configuration
    Map<String, String> outputProperties = new HashMap<>(outputFormatProvider.getOutputFormatConfiguration());
    outputProperties.put(FileOutputFormat.OUTDIR, config.getOutputPath());

    context.addOutput(Output.of(config.getReferenceName(),
                                new SinkOutputFormatProvider(outputFormatClass, outputProperties)));
  }
}
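
The sink above relies on the format plugin performing its validation inside getOutputFormatConfiguration(). A minimal sketch of what that could look like follows. It is not the actual avro format plugin: FormatProviderSketch is a stand-in for the CDAP OutputFormatProvider interface so that the example compiles on its own, ValidatingAvroProvider is a hypothetical name, and the "starts with a brace" check is only a placeholder for real schema parsing.

```java
import java.util.HashMap;
import java.util.Map;

/** Stand-in for CDAP's OutputFormatProvider so that the sketch compiles on its own. */
interface FormatProviderSketch {
  String getOutputFormatClassName();

  Map<String, String> getOutputFormatConfiguration();
}

/** Hypothetical provider that validates its schema whenever its configuration is requested. */
class ValidatingAvroProvider implements FormatProviderSketch {
  // null when the schema is a macro that has not been evaluated yet
  private final String schema;

  ValidatingAvroProvider(String schema) {
    this.schema = schema;
  }

  @Override
  public String getOutputFormatClassName() {
    // placeholder; a real provider would return the output format's Class.getName()
    return "StructuredAvroOutputFormat";
  }

  @Override
  public Map<String, String> getOutputFormatConfiguration() {
    Map<String, String> configuration = new HashMap<>();
    if (schema == null) {
      // nothing to validate until the macro is evaluated at runtime
      return configuration;
    }
    // Placeholder check: a real format would parse the schema and verify the types it supports.
    if (!schema.trim().startsWith("{")) {
      throw new IllegalArgumentException("Invalid schema for avro format: " + schema);
    }
    configuration.put("avro.schema.output.key", schema);
    return configuration;
  }
}
```

Because the sink calls this method from configurePipeline, an invalid schema surfaces as a deployment failure rather than as a task failure partway through a run.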

In phase 1, instead of a separate property for format properties, plugins will simply pass their own properties down to the format plugin. In future releases, with nicer UI widget support, format specific properties can be added. In this time frame, the File, GCS, S3, etc plugins will still need to hardcode available format plugins in a select widget for the format property.

Formats are separated into their own artifacts, with one format per artifact. If a system administrator wants to enforce that all pipelines write using the parquet format, they can do so by deleting all the other format plugins.

Shortcomings

This approach solves the classloading and code duplication problems. However, it does not address all of the user stories. It does not address:

4. As a pipeline developer, I want the format to tell me what schema should be used if it requires a specific schema

This amounts to support for a 'Get Schema' button on sources that use formats. For example, the 'text' format requires that there be a 'body' field of type 'string' and the 'blob' format requires that there be a 'body' field of type 'bytes'. The source plugin should be able to ask the format to provide the required schema and return it to the user, but there is no way to do that without platform changes. In the interim, the source will have to implement that logic using prior knowledge about formats.
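
As an illustration of the interim approach, a source could keep a small hardcoded lookup from format name to required schema. This is only a sketch: KnownFormatSchemas is a hypothetical helper, and the schema is represented as a map of field name to type name rather than a CDAP Schema so that the example is self-contained.

```java
import java.util.Collections;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;

class KnownFormatSchemas {

  /**
   * Returns the schema a format requires as a map of field name to type name,
   * or an empty Optional when the format does not dictate a schema.
   */
  static Optional<Map<String, String>> requiredSchema(String format) {
    switch (format.toLowerCase(Locale.ROOT)) {
      case "text":
        return Optional.of(Collections.singletonMap("body", "string"));
      case "blob":
        return Optional.of(Collections.singletonMap("body", "bytes"));
      default:
        // avro, parquet, etc.: nothing can be derived from the format name alone
        return Optional.empty();
    }
  }
}
```

The drawback is the one this design sets out to remove: every source that wants a 'Get Schema' button has to carry and maintain its own copy of this knowledge.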

5. As a pipeline developer, I want the format of my source or sink to be set as metadata on my dataset

This is not automatically performed by the format or the platform, though the source/sink plugin can still add it explicitly.

6. As a plugin developer, I want to be able to add a new format without modifying the code or widgets for plugins that use the format

This is not fully realized, as source/sink plugins still need to hardcode the available format list in the widget select dropdown. With a new widget type, this can be avoided.

8. As a plugin developer, I want to be able to write a format that requires additional configuration properties, like a custom delimiter

This is not fully realized, as source/sink plugins have no idea what properties a format requires. As such, they need to introduce a new 'formatProperties' config property and pass these down to the format. The UI will need to look up the properties that format requires and render a nice UI for users to configure. This should be done in a new widget type.

10. As a plugin developer, I want to be able to provide documentation for formats

UI changes are required to render format documentation in a sensible location.

Phase 2

The purpose of Phase 2 is to address all the user stories that were not completed by Phase 1. Work in Phase 2 requires platform changes in the backend or UI. The changes proposed here are more forward-looking and are likely to change depending on what happens with the Connection consolidation work.

Schema Support

InputFormatProvider has no way to tell its caller that it requires a specific schema. This is required to support the 'Get Schema' button entirely in the format and not in the source plugin. This can be achieved by extending the InputFormatProvider interface:


public interface InputFormatProvider {

  ...

  /**
   * Returns the schema of the data that will be read by this input format. Returns null if the schema is
   * unknown or if multiple schemas may be read.
   */
  @Nullable
  Schema getSchema();
}

With this change, source plugins can simply delegate to their format plugin in order to implement their 'Get Schema' button functionality. This requires one additional change:

public interface EndpointPluginContext {
  ...


  /**
   * Returns an instance of a plugin with the given type, name, and properties.
   */
  <T> T usePlugin(String pluginType, String pluginName, PluginProperties pluginProperties);
}

CDAP 5.1 and earlier only allow a plugin to get a Class<T> in a plugin endpoint. Plugins will need an actual instance of an InputFormatProvider in order to call the getSchema() method.
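
Taken together, the two proposed methods would let a source's 'Get Schema' endpoint delegate entirely to the format. The following sketch shows that flow under stated assumptions: SchemaAwareFormat and PluginInstanceContext are stand-ins for the extended InputFormatProvider and EndpointPluginContext, the schema is a plain string, plugin properties are a plain map, and InMemoryPluginContext exists only to exercise the example.

```java
import java.util.HashMap;
import java.util.Map;

/** Stand-in for the extended InputFormatProvider; the schema is a plain string here. */
interface SchemaAwareFormat {
  String getSchema(); // null when the schema is unknown
}

/** Stand-in for the proposed EndpointPluginContext method that returns plugin instances. */
interface PluginInstanceContext {
  <T> T usePlugin(String pluginType, String pluginName, Map<String, String> properties);
}

/** Hypothetical in-memory context, used only to exercise the sketch. */
class InMemoryPluginContext implements PluginInstanceContext {
  private final Map<String, Object> plugins = new HashMap<>();

  void register(String pluginType, String pluginName, Object plugin) {
    plugins.put(pluginType + ":" + pluginName, plugin);
  }

  @Override
  @SuppressWarnings("unchecked")
  public <T> T usePlugin(String pluginType, String pluginName, Map<String, String> properties) {
    // returns null when no such plugin is registered
    return (T) plugins.get(pluginType + ":" + pluginName);
  }
}

class GetSchemaEndpoint {

  /** What a source's 'Get Schema' endpoint could do: instantiate the format and ask it. */
  static String getSchema(PluginInstanceContext context, String format, Map<String, String> properties) {
    SchemaAwareFormat provider = context.usePlugin("inputformat", format, properties);
    if (provider == null) {
      throw new IllegalArgumentException("Input format '" + format + "' is not installed");
    }
    return provider.getSchema();
  }
}
```

With a Class<T> alone, the endpoint could not make the final getSchema() call, because that method depends on the configured plugin instance.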

UI Support

The UI will support a new widget type that provides a dropdown listing all plugins of a specific type. Each of these plugins may support a different set of properties. When a specific plugin is selected from the dropdown, the UI should render that plugin's properties according to its own widget spec. Format documentation should also be rendered once a format is selected.

  {
    "widget-type": "plugin-list",
    "label": "Format",
    "name": "format",
    "widget-attributes": {
      "type": "inputformat",
      // which field the backend expects to contain format properties
      "propertiesName": "formatProperties",
      // whether to only show the latest version of a format plugin
      "collapseVersions": true
    }
  },
  // hide this so it doesn't show up to the user as a separate config property
  {
    "widget-type": "hidden",
    "name": "formatProperties"
  }

This same mechanism can be re-used for other plugins, like the DB source and sink, where users currently have to know which jdbc driver plugins are installed and what their names are.
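
The collapseVersions attribute implies some selection logic when building the dropdown. The sketch below illustrates one possible interpretation, written in Java for consistency with the rest of this document rather than as UI code. PluginListSketch is a hypothetical helper, installed plugins are represented as {name, version} pairs, and versions are assumed to be purely numeric dotted strings (qualifiers such as -SNAPSHOT are not handled).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class PluginListSketch {

  /** Compares dotted numeric versions such as "1.10.0"; missing segments count as 0. */
  static int compareVersions(String left, String right) {
    String[] l = left.split("\\.");
    String[] r = right.split("\\.");
    for (int i = 0; i < Math.max(l.length, r.length); i++) {
      int lv = i < l.length ? Integer.parseInt(l[i]) : 0;
      int rv = i < r.length ? Integer.parseInt(r[i]) : 0;
      if (lv != rv) {
        return Integer.compare(lv, rv);
      }
    }
    return 0;
  }

  /**
   * Builds dropdown labels from {name, version} pairs. With collapseVersions set,
   * only the highest version of each plugin name is kept, sorted by name.
   */
  static List<String> dropdownOptions(List<String[]> installed, boolean collapseVersions) {
    List<String> options = new ArrayList<>();
    if (!collapseVersions) {
      for (String[] plugin : installed) {
        options.add(plugin[0] + " (" + plugin[1] + ")");
      }
      return options;
    }
    Map<String, String> latest = new TreeMap<>();
    for (String[] plugin : installed) {
      // keep whichever of the stored and incoming versions is higher
      latest.merge(plugin[0], plugin[1], (a, b) -> compareVersions(a, b) >= 0 ? a : b);
    }
    for (Map.Entry<String, String> entry : latest.entrySet()) {
      options.add(entry.getKey() + " (" + entry.getValue() + ")");
    }
    return options;
  }
}
```

Comparing versions segment by segment as numbers matters here: a plain string comparison would rank 1.2.0 above 1.10.0.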

API changes

New Programmatic APIs

Two new methods: getSchema() on InputFormatProvider and usePlugin() on EndpointPluginContext

Deprecated Programmatic APIs

None

New REST APIs

None

Deprecated REST API

None

CLI Impact or Changes

None

UI Impact or Changes

New plugin list widget required

Security Impact 

None

Impact on Infrastructure Outages 

None

Test Scenarios

Test ID | Test Description | Expected Results
1 | Create a pipeline that reads from a local Avro file and writes to GCS, S3, TPFS, and File sinks in avro format |
2 | Same as #1 except with Parquet |
3 | Same as #1 except with Orc |
4 | Same as #1 except with Json |

Releases

Release X.Y.Z

Release X.Y.Z

Related Work

  • Work #1
  • Work #2
  • Work #3


Future work