Checklist
- User Stories Documented
- User Stories Reviewed
- Design Reviewed
- APIs reviewed
- Release priorities assigned
- Test cases reviewed
- Blog post
Introduction
There is a long-standing bug where plugins that expose the same classes run into classloading issues at runtime. This is most commonly seen with Avro, Parquet, and ORC classes in the various file-based plugins.
In addition to functional breakage, these plugins all contain duplicate code, resulting in tech debt. Whenever a new format is added to one, it needs to be added to all the others; whenever a bug is fixed in one, it must be fixed in all the others.
Goals
To provide a pluggable framework for adding and modifying formats that eliminates both the classloading and code duplication issues.
User Stories
- As a pipeline developer, I want the same set of formats to be available across all file based sources and sinks
- As a pipeline developer, I want to be able to use any combination of plugins in my pipeline without taking format into consideration
- As a pipeline developer, I want to know as soon as possible if I have configured an invalid schema for my format
- As a pipeline developer, I want the format to tell me what schema should be used if it requires a specific schema
- As a pipeline developer, I want the format of my source or sink to be set as metadata on my dataset
- As a plugin developer, I want to be able to add a new format without modifying the code or widgets for plugins that use the format
- As a plugin developer, I want to be able to add a new format without any changes to CDAP platform or UI code
- As a plugin developer, I want to be able to write a format that requires additional configuration properties, like a custom delimiter
- As a plugin developer, I want to be able to write a format that uses macro enabled configuration properties
- As a plugin developer, I want to be able to provide documentation for formats
- As a CDAP administrator, I want to be able to control the set of formats that pipeline developers can use
Design
The classloading and code duplication problems that exist today can be solved if formats are made into plugins rather than embedded inside each plugin. At a high level, avro, parquet, orc, etc. are removed from the core-plugins, google-cloud, amazon-s3, etc. plugin jars and moved into their own avro-format, parquet-format, orc-format, etc. jars. The File, GCS, S3, TPFS, etc. plugins are changed to use these format plugins to set up the Hadoop input and output formats used to actually read and write data in those formats.
The design is broken down into two phases. The first phase takes place completely in plugin projects. It requires no work in the CDAP platform or UI and solves many of the problems in CDAP 5.1.0 and earlier.
The second phase requires platform work and may overlap with Connection unification work. A proposal is given that meets the requirements from a pipeline perspective, but may need to be changed in light of other dataset work.
Phase 1
In this phase, new format plugins will be created that implement the existing InputFormatProvider and OutputFormatProvider interfaces. Since these interfaces exist in the CDAP API, the format plugins do not require any parent artifact and can be used by any CDAP application or plugin. The output formats returned by these format plugins will implement OutputFormat<NullWritable, StructuredRecord> so that sinks can pass their input directly to the output format without transformation. Similarly, input formats will implement InputFormat<NullWritable, StructuredRecord>. For example, the avro output format plugin will look something like:
```java
@Plugin(type = "outputformat")
@Name("avro")
@Description("Avro output format plugin that provides the output format class name and properties " +
  "required to write Avro files.")
public class AvroOutputFormatProvider implements OutputFormatProvider {
  private final Conf conf;

  public AvroOutputFormatProvider(Conf conf) {
    this.conf = conf;
  }

  @Override
  public String getOutputFormatClassName() {
    return StructuredAvroOutputFormat.class.getName();
  }

  @Override
  public Map<String, String> getOutputFormatConfiguration() {
    Map<String, String> configuration = new HashMap<>();
    configuration.put("avro.schema.output.key", conf.schema);
    configuration.put(JobContext.OUTPUT_KEY_CLASS, AvroKey.class.getName());
    return configuration;
  }

  public static class Conf extends PluginConfig {
    @Macro
    @Nullable
    private String schema;
  }
}

/**
 * Converts StructuredRecord into GenericRecord before delegating to AvroKeyOutputFormat.
 */
public class StructuredAvroOutputFormat extends OutputFormat<NullWritable, StructuredRecord> {
  private AvroKeyOutputFormat<GenericRecord> delegate;

  @Override
  public RecordWriter<NullWritable, StructuredRecord> getRecordWriter(TaskAttemptContext context)
    throws IOException {
    Configuration hConf = context.getConfiguration();
    Schema schema = Schema.parseJson(hConf.get("avro.schema.output.key"));
    RecordWriter<AvroKey<GenericRecord>, NullWritable> delegateWriter = getDelegate().getRecordWriter(context);
    return new TransformingRecordWriter(delegateWriter, schema);
  }

  @Override
  public void checkOutputSpecs(JobContext context) throws IOException {
    getDelegate().checkOutputSpecs(context);
  }

  @Override
  public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException {
    return getDelegate().getOutputCommitter(context);
  }

  private AvroKeyOutputFormat<GenericRecord> getDelegate() {
    if (delegate == null) {
      delegate = new AvroKeyOutputFormat<>();
    }
    return delegate;
  }

  /**
   * Transforms each StructuredRecord into a GenericRecord, then delegates the actual writing.
   */
  public static class TransformingRecordWriter extends RecordWriter<NullWritable, StructuredRecord> {
    private final RecordWriter<AvroKey<GenericRecord>, NullWritable> delegate;
    private final StructuredToAvroTransformer transformer;

    public TransformingRecordWriter(RecordWriter<AvroKey<GenericRecord>, NullWritable> delegate, Schema schema) {
      this.delegate = delegate;
      this.transformer = new StructuredToAvroTransformer(schema);
    }

    @Override
    public void write(NullWritable key, StructuredRecord value) throws IOException, InterruptedException {
      delegate.write(new AvroKey<>(transformer.transform(value)), key);
    }

    @Override
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
      delegate.close(context);
    }
  }
}
```
In order to use these plugins, sinks need to register and instantiate them just like any other plugin:
```java
public class FileSink extends BatchSink<StructuredRecord, NullWritable, StructuredRecord> {
  private static final String FORMAT_PLUGIN_ID = "format";
  private final Conf config;

  public FileSink(Conf config) {
    this.config = config;
  }

  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    OutputFormatProvider format = pipelineConfigurer.usePlugin("outputformat", config.getFormat(),
                                                               FORMAT_PLUGIN_ID, config.getFormatProperties());
    // call this so validation logic in the method is performed,
    // failing pipeline deployment for badly configured formats
    format.getOutputFormatConfiguration();
  }

  @Override
  public void prepareRun(BatchSinkContext context) throws Exception {
    OutputFormatProvider outputFormatProvider = context.newPluginInstance(FORMAT_PLUGIN_ID);
    String outputFormatClass = outputFormatProvider.getOutputFormatClassName();
    // the actual output format configuration contains all format specific configuration
    // plus the path configuration
    Map<String, String> outputProperties = new HashMap<>(outputFormatProvider.getOutputFormatConfiguration());
    outputProperties.put(FileOutputFormat.OUTDIR, config.getOutputPath());
    context.addOutput(Output.of(config.getReferenceName(),
                                new SinkOutputFormatProvider(outputFormatClass, outputProperties)));
  }
}
```
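The read path would follow the same pattern using the 'inputformat' plugin type. A minimal sketch of the source side (the FileSource class, its Conf helpers, and the path property are assumptions mirroring the sink example above):
```java
public class FileSource extends BatchSource<NullWritable, StructuredRecord, StructuredRecord> {
  private static final String FORMAT_PLUGIN_ID = "format";
  private final Conf config;

  public FileSource(Conf config) {
    this.config = config;
  }

  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    InputFormatProvider format = pipelineConfigurer.usePlugin("inputformat", config.getFormat(),
                                                              FORMAT_PLUGIN_ID, config.getFormatProperties());
    // fail pipeline deployment early if the format rejects its configuration
    format.getInputFormatConfiguration();
  }

  @Override
  public void prepareRun(BatchSourceContext context) throws Exception {
    InputFormatProvider format = context.newPluginInstance(FORMAT_PLUGIN_ID);
    // the actual input format configuration is the format specific configuration plus the path
    Map<String, String> inputProperties = new HashMap<>(format.getInputFormatConfiguration());
    inputProperties.put(FileInputFormat.INPUT_DIR, config.getPath());
    context.setInput(Input.of(config.getReferenceName(),
                              new SourceInputFormatProvider(format.getInputFormatClassName(), inputProperties)));
  }
}
```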
In phase 1, instead of a separate property for format properties, plugins will simply pass their own properties down to the format plugin. In future releases, with better UI widget support, format-specific properties can be added. In the meantime, the File, GCS, S3, etc. plugins will still need to hardcode the available format plugins in a select widget for the format property.
Formats are separated into their own artifacts, with one format per artifact. If a system administrator wants to enforce that all pipelines write using the Parquet format, they can do so by deleting all the other format plugin artifacts.
Shortcomings
This approach solves the classloading and code duplication problems. However, it does not address all of the user stories:
4. As a pipeline developer, I want the format to tell me what schema should be used if it requires a specific schema
This amounts to support for a 'Get Schema' button on sources that use formats. For example, the 'text' format requires that there be a 'body' field of type 'string' and the 'blob' format requires that there be a 'body' field of type 'bytes'. The source plugin should be able to ask the format to provide the required schema and return it to the user, but there is no way to do that without platform changes. In the interim, the source will have to implement that logic using prior knowledge about formats.
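In the meantime, that interim logic might look something like the following sketch, where the source hardcodes the schemas of the formats it knows about (the endpoint shape and helper names here are illustrative):
```java
// Interim sketch: the source hardcodes the schemas of formats known to require one.
@Path("getSchema")
public Schema getSchema(Conf config) {
  switch (config.getFormat()) {
    case "text":
      // the 'text' format requires a 'body' field of type string
      return Schema.recordOf("text", Schema.Field.of("body", Schema.of(Schema.Type.STRING)));
    case "blob":
      // the 'blob' format requires a 'body' field of type bytes
      return Schema.recordOf("blob", Schema.Field.of("body", Schema.of(Schema.Type.BYTES)));
    default:
      // avro, parquet, etc. do not require a specific schema, so there is nothing to suggest
      return null;
  }
}
```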
5. As a pipeline developer, I want the format of my source or sink to be set as metadata on my dataset
This is not automatically performed by the format or the platform, though the source/sink plugin can still add it explicitly.
7. As a plugin developer, I want to be able to add a new format without any changes to CDAP platform or UI code
This is not fully realized, as source/sink plugins still need to hardcode the available format list in the widget select dropdown. With a new widget type, this can be avoided.
8. As a plugin developer, I want to be able to write a format that requires additional configuration properties, like a custom delimiter
This is not fully realized, as source/sink plugins have no idea what properties a format requires. As such, they need to introduce a new 'formatProperties' config property and pass those properties down to the format. The UI will need to look up the properties the format requires and render a nice UI for users to configure them. This should be done in a new widget type.
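For example, the source/sink config might carry the format-specific properties as an opaque JSON map. A sketch, assuming Gson for parsing (the property and helper names are illustrative):
```java
public static class Conf extends PluginConfig {
  private static final Gson GSON = new Gson();
  private static final Type MAP_TYPE = new TypeToken<Map<String, String>>() { }.getType();

  private String format;

  @Nullable
  private String formatProperties; // JSON map of properties that the chosen format understands

  PluginProperties getFormatProperties() {
    if (formatProperties == null) {
      return PluginProperties.builder().build();
    }
    // pass the map through untouched; only the format plugin knows what the properties mean
    Map<String, String> properties = GSON.fromJson(formatProperties, MAP_TYPE);
    return PluginProperties.builder().addAll(properties).build();
  }
}
```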
10. As a plugin developer, I want to be able to provide documentation for formats
UI changes are required to render format documentation in a sensible location.
Phase 2
The purpose of Phase 2 is to address all the user stories that were not completed by Phase 1. Work in phase 2 requires platform changes in the backend or UI. The changes proposed here are more forward-looking and are likely to change depending on what happens with the Connection consolidation work.
Schema Support
InputFormatProvider has no way to tell its caller that it requires a specific schema. This capability is required to support the 'Get Schema' button entirely in the format rather than in the source plugin. It can be achieved by extending the InputFormatProvider interface:
```java
public interface InputFormatProvider {
  ...

  /**
   * Return the schema for data that will be read by this input format. If null, the schema is either unknown
   * or multiple schemas may be read.
   */
  @Nullable
  Schema getSchema();
}
```
With this change, source plugins can simply delegate to their format plugin in order to implement their 'Get Schema' button functionality. This requires one additional change:
```java
public interface EndpointPluginContext {
  ...

  /**
   * Returns an instance of a plugin with the given type, name, and properties.
   */
  <T> T usePlugin(String pluginType, String pluginName, PluginProperties pluginProperties);
}
```
CDAP 5.1 and earlier only allow a plugin to get a Class<T> in a plugin endpoint. Plugins will need an actual instance of an InputFormatProvider in order to call its getSchema() method.
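With both changes in place, a source's 'Get Schema' endpoint could delegate entirely to the format plugin. A minimal sketch against the proposed APIs (the Conf helpers are assumed to match the earlier examples):
```java
@Path("getSchema")
public Schema getSchema(Conf config, EndpointPluginContext context) {
  // instantiate the configured format plugin using the proposed usePlugin method
  InputFormatProvider format = context.usePlugin("inputformat", config.getFormat(),
                                                 config.getFormatProperties());
  if (format == null) {
    throw new IllegalArgumentException(
      String.format("Could not find the '%s' format plugin.", config.getFormat()));
  }
  // null means the format does not require a specific schema
  return format.getSchema();
}
```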
UI Support
The UI will support a new widget type that provides a dropdown listing all plugins of a specific type. Each of these plugins may support a different set of properties. When a specific plugin is selected from the dropdown, the UI should render its properties according to that plugin's own widget spec. The format's documentation should also be rendered once it is selected.
```
{
  "widget-type": "plugin-list",
  "label": "Format",
  "name": "format",
  "widget-attributes": {
    "type": "inputformat",
    // which field the backend expects to contain format properties
    "propertiesName": "formatProperties",
    // whether to only show the latest version of a format plugin
    "collapseVersions": true
  }
},
// hide this so it doesn't show up to the user as a separate config property
{
  "widget-type": "hidden",
  "name": "formatProperties"
}
```
This same mechanism can be reused for other plugins, like the DB source and sink, where users currently have to know which JDBC driver plugins are installed and what their names are.
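For instance, a DB plugin widget spec might look something like the following sketch ('jdbc' is the plugin type the DB plugins use for drivers; the other attribute values are illustrative):
```
{
  "widget-type": "plugin-list",
  "label": "JDBC Driver Name",
  "name": "jdbcPluginName",
  "widget-attributes": {
    "type": "jdbc",
    "collapseVersions": true
  }
}
```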
API changes
New Programmatic APIs
One new method on InputFormatProvider (getSchema) and one new method on EndpointPluginContext (usePlugin)
Deprecated Programmatic APIs
None
New REST APIs
None
Deprecated REST APIs
None
CLI Impact or Changes
None
UI Impact or Changes
New plugin list widget required
Security Impact
None
Impact on Infrastructure Outages
None
Test Scenarios
Test ID | Test Description | Expected Results |
---|---|---|
1 | Create a pipeline that reads from a local Avro file and writes to GCS, S3, TPFS, and File sinks in Avro format | Pipeline runs successfully and each sink contains the input records in Avro format |
2 | Same as #1 except with Parquet | Same as #1 except in Parquet format |
3 | Same as #1 except with ORC | Same as #1 except in ORC format |
4 | Same as #1 except with JSON | Same as #1 except in JSON format |
Releases
Release X.Y.Z
Related Work
- Work #1
- Work #2
- Work #3