Multiple File Set Sink
- Russ Savage
- Chaoran Wang
Introduction
As users build more generic pipelines that are configured at runtime through macros, we need the ability to write to multiple sinks with a single plugin, since the DAG cannot be modified once a pipeline is deployed. This plugin is designed to be configured dynamically at runtime to write to multiple filesets in a single run.
Use case(s)
- A user would like to create a single pipeline that can process files from different clients. The pipeline will be configured entirely through macros so that each run can be customized for the client file being read. Each file contains multiple lines that start with identifiers such as 101 and 202, and the lines for each identifier should be sent to a different dataset. Each client has a different set of identifiers in its files and needs a different number of output filesets with different names. The user would use this plugin, in combination with conditional directives from Wrangler, to dynamically configure the pipeline and write the records to different filesets.
- This plugin would almost always be configured via macros at the pipeline runtime.
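As an illustration of runtime configuration, the sketch below resolves `${...}` placeholders in a sink configuration from a map of runtime arguments. It uses Python's `string.Template` only as a stand-in for the pipeline's macro substitution. The `resolve_macros` helper, the argument names (`client`, `codec`), and the dataset name are hypothetical.

```python
from string import Template


def resolve_macros(config, runtime_args):
    """Recursively replace ${name} placeholders in every string value."""
    if isinstance(config, str):
        # substitute() raises KeyError when a placeholder has no runtime value
        return Template(config).substitute(runtime_args)
    if isinstance(config, list):
        return [resolve_macros(item, runtime_args) for item in config]
    if isinstance(config, dict):
        return {key: resolve_macros(value, runtime_args)
                for key, value in config.items()}
    return config


# Hypothetical macro-enabled configuration for one output fileset
template = {
    "outputFileSets": [
        {"datasetName": "${client}_payroll", "compressionCodec": "${codec}"},
    ]
}

resolved = resolve_macros(template, {"client": "acme", "codec": "Snappy"})
print(resolved["outputFileSets"][0])
```

Failing on an unresolved placeholder, rather than writing to a dataset literally named `${client}_payroll`, is a deliberate choice in this sketch.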
User Stories
- As a pipeline developer, I would like the ability to dynamically change the number of filesets being written to at runtime. Determining which record to write to each dataset should be based on the data in each record.
- As a pipeline developer, I would like to define different schemas for each fileset I am writing to, which means this plugin will allow different input schemas.
- As a pipeline developer, I would like to determine which fileset to write each record to based on one or more fields in the record.
- As a pipeline developer, I would like to be able to compress the data written to each of these datasets.
- As a pipeline developer, I would like to be able to partition the filesets using the runtime
Plugin Type
- Batch Sink
Configurable
This section defines properties that are configurable for this plugin.
User Facing Name | Required | Default | Description |
---|---|---|---|
Sink Name | Y | None | Name of the sink plugin. |
Type | Y | Automatic | Type of this sink is always batch sink. |
OutputFileSets:
User Facing Name | Required | Default | Description |
---|---|---|---|
Dataset Name | Y | None | Name of the PartitionedFileSet to which records are written. If it does not exist, it will be created. (Macro-enabled) |
Expression | Y | None | Boolean expression evaluated against each record. Records that satisfy it are written to this fileset. The expression can filter on the file identifier and on other fields. |
DatasetTargetPath | N | [Namespace]/data/[Dataset name] | The path where the data will be recorded. Defaults to the name of the dataset. |
FileSet Properties: | N | None | The Parquet file schema of the record being written to the sink as a JSON object. If there are multiple file sets to be written to, multiple schemas of each file in JSON should be combined and written together as a single JSON. (Macro-enabled) |
Compression Codec | N | None | Optional parameter to determine the compression codec to use on the resulting data. Valid values are None, Snappy, GZip, and LZO. |
Schema | Y | None | The schema of the record being written to the sink as a JSON Object. Must be specified manually. |
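The required flags and defaults in the table above can be sketched as a small validation step. This is only an illustration, not the plugin's implementation: the `normalize_output_fileset` helper is hypothetical, the property keys follow the JSON example in this document, and treating a missing codec as `None` (no compression) is an assumption.

```python
VALID_CODECS = {"None", "Snappy", "GZip", "LZO"}
REQUIRED = ("datasetName", "expression", "schema")


def normalize_output_fileset(entry, namespace):
    """Check required properties and fill in defaults for one output fileset."""
    for name in REQUIRED:
        if not entry.get(name):
            raise ValueError(f"missing required property: {name}")

    # Assumption: an absent or empty codec means no compression
    codec = entry.get("compressionCodec") or "None"
    if codec not in VALID_CODECS:
        raise ValueError(f"unsupported compression codec: {codec}")

    # Default target path from the table: [Namespace]/data/[Dataset name]
    target = entry.get("datasetTargetPaths") or (
        f"{namespace}/data/{entry['datasetName']}"
    )
    return {**entry, "compressionCodec": codec, "datasetTargetPaths": target}


normalized = normalize_output_fileset(
    {
        "datasetName": "fileset0",
        "expression": "salary >= 5000",
        "schema": {"type": "record", "name": "fileset0", "fields": []},
        "datasetTargetPaths": "",
    },
    namespace="default",
)
print(normalized["datasetTargetPaths"], normalized["compressionCodec"])
```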
Example
This example writes to three PartitionedFileSets named 'fileset0', 'fileset1', and 'fileset2'. Data is written in Parquet format, compressed with Snappy, using the schema given for each fileset. Every time the pipeline runs, the output of that run is stored in a new partition of each PartitionedFileSet:
{
  "name": "Multiple File Set Sink",
  "type": "batchsink",
  "outputFileSets": [
    {
      "compressionCodec": "Snappy",
      "datasetName": "fileset0",
      "datasetTargetPaths": "",
      "expression": "(id.startsWith(\"201\") || id.startsWith(\"202\") || id.startsWith(\"203\") )&& salary >= 5000 && salary <= 7000",
      "filesetProperties": "",
      "schema": {
        "type": "record",
        "name": "fileset0",
        "fields": [
          { "name": "id", "type": "string" },
          { "name": "first_name", "type": "string" },
          { "name": "last_name", "type": "string" },
          { "name": "sex", "type": "string" },
          { "name": "address", "type": "string" },
          { "name": "salary", "type": "string" }
        ]
      }
    },
    {
      "compressionCodec": "Snappy",
      "datasetName": "fileset1",
      "datasetTargetPaths": "",
      "expression": "(id.startsWith(\"201\") || id.startsWith(\"202\") || id.startsWith(\"203\") )&& salary >= 5000 && salary <= 7000",
      "filesetProperties": "",
      "schema": {
        "type": "record",
        "name": "sales",
        "fields": [
          { "name": "id", "type": "string" },
          { "name": "first_name", "type": "string" },
          { "name": "salary", "type": "string" }
        ]
      }
    },
    {
      "compressionCodec": "Snappy",
      "datasetName": "fileset2",
      "datasetTargetPaths": "",
      "expression": "(id.startsWith(\"201\") || id.startsWith(\"202\") || id.startsWith(\"203\") )&& salary >= 5000 && salary <= 7000",
      "filesetProperties": "",
      "schema": {
        "type": "record",
        "name": "sales",
        "fields": [
          { "name": "id", "type": "string" },
          { "name": "first_name", "type": "string" },
          { "name": "sex", "type": "string" },
          { "name": "address", "type": "string" },
          { "name": "salary", "type": "string" }
        ]
      }
    }
  ]
}
The original data source is as follows:
id | first_name | last_name | sex | address | salary |
---|---|---|---|---|---|
202EMPLPIM | Madeline | Heine | F | 22 Rochester St, Uhome, WY | 5000 |
202EMPLPIM | Margaret | Morehead | F | 100 Commerce Cr, Springfield, IL | 6000 |
201EMPLPIM | Sean | Froula | M | 2105 8th St, Uhome, WY | 7000 |
203EMPLPIM | Jennifer | Costello | F | 21 Walker Rd, Uhome, WY | 8000 |
After the sink processes the data, it is split into the following filesets:
fileset0
id | first_name | last_name | sex | address | salary |
---|---|---|---|---|---|
202EMPLPIM | Madeline | Heine | F | 22 Rochester St, Uhome, WY | 5000 |
202EMPLPIM | Margaret | Morehead | F | 100 Commerce Cr, Springfield, IL | 6000 |
201EMPLPIM | Sean | Froula | M | 2105 8th St, Uhome, WY | 7000 |
fileset1
id | first_name | salary |
---|---|---|
202EMPLPIM | Madeline | 5000 |
202EMPLPIM | Margaret | 6000 |
fileset2
id | first_name | sex | address | salary |
---|---|---|---|---|
202EMPLPIM | Madeline | F | 22 Rochester St, Uhome, WY | 5000 |
202EMPLPIM | Margaret | F | 100 Commerce Cr, Springfield, IL | 6000 |
203EMPLPIM | Jennifer | F | 21 Walker Rd, Uhome, WY | 8000 |
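The routing idea behind this example can be sketched in a few lines: each output fileset pairs a filter with a list of schema fields, and every record is checked against every fileset, so one record may land in several of them. This is a minimal sketch, not the plugin's evaluation logic. The `fileset0` filter is a hand translation of the expression in the configuration above, `salary` is assumed to be numeric so that it can be compared, and the `over_7000` fileset and the `route` helper are hypothetical.

```python
records = [
    {"id": "202EMPLPIM", "first_name": "Madeline", "last_name": "Heine",
     "sex": "F", "address": "22 Rochester St, Uhome, WY", "salary": 5000},
    {"id": "202EMPLPIM", "first_name": "Margaret", "last_name": "Morehead",
     "sex": "F", "address": "100 Commerce Cr, Springfield, IL", "salary": 6000},
    {"id": "201EMPLPIM", "first_name": "Sean", "last_name": "Froula",
     "sex": "M", "address": "2105 8th St, Uhome, WY", "salary": 7000},
    {"id": "203EMPLPIM", "first_name": "Jennifer", "last_name": "Costello",
     "sex": "F", "address": "21 Walker Rd, Uhome, WY", "salary": 8000},
]


def fileset0_filter(record):
    """Hand translation of the fileset0 expression from the example config."""
    return (record["id"].startswith(("201", "202", "203"))
            and 5000 <= record["salary"] <= 7000)


outputs = [
    {"datasetName": "fileset0",
     "matches": fileset0_filter,
     "fields": ["id", "first_name", "last_name", "sex", "address", "salary"]},
    # Hypothetical second fileset with a narrower schema
    {"datasetName": "over_7000",
     "matches": lambda record: record["salary"] > 7000,
     "fields": ["id", "first_name", "salary"]},
]


def route(records, outputs):
    """Send each record to every fileset whose filter it satisfies."""
    routed = {output["datasetName"]: [] for output in outputs}
    for record in records:
        for output in outputs:
            if output["matches"](record):
                # Keep only the fields declared in this fileset's schema
                projected = {name: record[name] for name in output["fields"]}
                routed[output["datasetName"]].append(projected)
    return routed


routed = route(records, outputs)
for name, rows in routed.items():
    print(name, [row["first_name"] for row in rows])
```

Projecting each record onto the fileset's own field list is what allows a different schema per fileset, as the user stories require.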
Design
- Investigate if the Wrangler core library can be used to evaluate the expression for this plugin
- Can the compression codec be changed for each dataset, or should it be set at the job level?
- How can we write to multiple filesets from the same transform function?
Security
Limitation(s)
- This can only be used to write to time partitioned filesets
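To make the time-partitioning limitation concrete, the sketch below derives a partition key from a run's start time and maps it to an output location. It assumes a partition key of year, month, day, hour, and minute. The path layout and both helper functions are hypothetical and are not taken from the plugin.

```python
from datetime import datetime, timezone


def time_partition_key(run_time):
    """Build a minute-granularity partition key from the run's start time."""
    return {
        "year": run_time.year,
        "month": run_time.month,
        "day": run_time.day,
        "hour": run_time.hour,
        "minute": run_time.minute,
    }


def partition_path(base_path, key):
    """Hypothetical layout: <base>/<yyyy-MM-dd>/<HH-mm>."""
    return (
        f"{base_path}/{key['year']:04d}-{key['month']:02d}-{key['day']:02d}"
        f"/{key['hour']:02d}-{key['minute']:02d}"
    )


run_time = datetime(2017, 3, 9, 14, 5, 30, tzinfo=timezone.utc)
key = time_partition_key(run_time)
print(partition_path("default/data/fileset0", key))
```

Because the key is derived only from the run time, two runs that start in the same minute would map to the same partition in this sketch.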
Future Work
- Add the ability to write to all different types of datasets
- Add the ability to write datasets with different partitioning schemes
Test Case(s)
- Test case #1
- Test case #2
Sample Pipeline
Please attach one or more sample pipeline(s) and associated data.
Pipeline #1
Pipeline #2
Checklist
- User stories documented
- User stories reviewed
- Design documented
- Design reviewed
- Feature merged
- Examples and guides
- Integration tests
- Documentation for feature
- Short video demonstrating the feature