- Created by Russ Savage, last modified on Jul 10, 2017
Introduction
As a user of the Pipeline tool, I would like to define a single sink plugin that writes to multiple file sets depending on a value in the message. This is useful for building a generic pipeline that can write to a different number of sinks on each run.
Use case(s)
- A user would like to create a single pipeline that processes files from different clients. The pipeline is configured using macros so that each run can be customized for the file being read. The file contains lines that start with identifiers such as 101 and 202; each of these lines should be sent to a different dataset. Each client has a different set of identifiers in its file and needs to write to a different number of output file sets. The user would use this plugin, in combination with conditional directives from Wrangler, to dynamically configure and write the records to different file sets.
- This plugin would almost always be configured via macros.
User Stories
- User story #1
- User story #2
- User story #3
- User story #m
Plugin Type
- Batch Sink
Configurable
This section defines properties that are configurable for this plugin.
User Facing Name | Required | Default | Description |
---|---|---|---|
Sink Name | Y | None | Name of the sink plugin. |
Type | Y | Automatic | The type of this sink; always batch sink. |
OutputFileSets:
User Facing Name | Required | Default | Description |
---|---|---|---|
Dataset Name | Y | None | Name of the PartitionedFileSet to which records are written. If it does not exist, it will be created. (Macro-enabled) |
Expression | Y | None | Expression that determines which records are written to this file set; records are filtered by id and by field values. |
DatasetTargetPath | N | [Namespace]/data/[Dataset Name] | The path to which the data is written. Defaults to the name of the dataset under the namespace's data directory. |
FileSet Properties | N | None | The Parquet file schema of the record being written to the sink, as a JSON object. When writing to multiple file sets, the JSON schemas for the individual files should be combined into a single JSON object. (Macro-enabled) |
Compression Codec | N | None | Optional parameter to determine the compression codec to use on the resulting data. Valid values are None, Snappy, GZip, and LZO. |
Schema | Y | None | The schema of the record being written to the sink, as a JSON object. Must be specified manually. |
Example
This example writes to three PartitionedFileSets named 'fileset0', 'fileset1', and 'fileset2'. It writes data in Parquet format, compressed using Snappy, using the given schemas. Every time the pipeline runs, the most recent run is stored in a new partition of each PartitionedFileSet:
{code}
{
  "name": "Multiple File Set Sink",
  "type": "batchsink",
  "outputFileSets": [
    {
      "datasetName": "fileset0",
      "expression": "(id.startsWith('201') || id.startsWith('202') || id.startsWith('203')) && salary >= 5000 && salary <= 7000",
      "datasetTargetPaths": "",
      "filesetProperties": "",
      "compressionCodec": "Snappy",
      "schema": {
        "fields": [
          { "name": "id", "type": "string" },
          { "name": "first name", "type": "string" },
          { "name": "last name", "type": "string" },
          { "name": "sex", "type": "char" },
          { "name": "address", "type": "string" },
          { "name": "salary", "type": "int" }
        ]
      }
    },
    {
      "datasetName": "fileset1",
      "expression": "id.startsWith('202') && salary >= 5000 && salary <= 7000",
      "datasetTargetPaths": "",
      "filesetProperties": "",
      "compressionCodec": "Snappy",
      "schema": {
        "fields": [
          { "name": "id", "type": "string" },
          { "name": "first name", "type": "string" },
          { "name": "salary", "type": "int" }
        ]
      }
    },
    {
      "datasetName": "fileset2",
      "expression": "(id.startsWith('202') || id.startsWith('203')) && salary >= 5000 && salary < 9000",
      "datasetTargetPaths": "",
      "filesetProperties": "",
      "compressionCodec": "Snappy",
      "schema": {
        "fields": [
          { "name": "id", "type": "string" },
          { "name": "first name", "type": "string" },
          { "name": "sex", "type": "char" },
          { "name": "address", "type": "string" },
          { "name": "salary", "type": "int" }
        ]
      }
    }
  ]
}
{code}
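To make the expression semantics concrete, here is a minimal Python sketch of how one configured expression decides whether a record belongs in a file set. This is illustrative only: the actual plugin would run on the JVM, and `matches_fileset0` is a hypothetical stand-in for whatever expression engine (for example, the Wrangler core library) evaluates the configured string against a record's fields.

```python
def matches_fileset0(record):
    """Stand-in for evaluating fileset0's configured expression:
    (id.startsWith('201') || id.startsWith('202') || id.startsWith('203'))
      && salary >= 5000 && salary <= 7000
    """
    rid, salary = record["id"], record["salary"]
    return (
        (rid.startswith("201") or rid.startswith("202") or rid.startswith("203"))
        and 5000 <= salary <= 7000
    )

# A 202-prefixed id with a salary inside the range matches fileset0.
record = {"id": "202EMPLPIM", "first_name": "Madeline", "salary": 5000}
print(matches_fileset0(record))
```

Each record is evaluated against every file set's expression independently, so a single record may be written to more than one file set.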
The original data source is as follows:
id | first_name | last_name | sex | address | salary |
---|---|---|---|---|---|
202EMPLPIM | Madeline | Heine | F | 22 Rochester St, Uhome, WY | 5000 |
202EMPLPIM | Margaret | Morehead | F | 100 Commerce Cr, Springfield, IL | 6000 |
201EMPLPIM | Sean | Froula | M | 2105 8th St, Uhome, WY | 7000 |
203EMPLPIM | Jennifer | Costello | F | 21 Walker Rd, Uhome, WY | 8000 |
After the sink runs, the records are split into the following file sets:
fileset0
id | first_name | last_name | sex | address | salary |
---|---|---|---|---|---|
202EMPLPIM | Madeline | Heine | F | 22 Rochester St, Uhome, WY | 5000 |
202EMPLPIM | Margaret | Morehead | F | 100 Commerce Cr, Springfield, IL | 6000 |
201EMPLPIM | Sean | Froula | M | 2105 8th St, Uhome, WY | 7000 |
fileset1
id | first_name | salary |
---|---|---|
202EMPLPIM | Madeline | 5000 |
202EMPLPIM | Margaret | 6000 |
fileset2
id | first_name | sex | address | salary |
---|---|---|---|---|
202EMPLPIM | Madeline | F | 22 Rochester St, Uhome, WY | 5000 |
202EMPLPIM | Margaret | F | 100 Commerce Cr, Springfield, IL | 6000 |
203EMPLPIM | Jennifer | F | 21 Walker Rd, Uhome, WY | 8000 |
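The split above can be reproduced with a short Python sketch. It is illustrative only: the plugin itself would be a Java batch sink, and the lambdas here stand in for the configured expressions, while the field lists stand in for each file set's schema projection.

```python
# Sample input rows, taken from the table above.
ROWS = [
    {"id": "202EMPLPIM", "first_name": "Madeline", "last_name": "Heine",
     "sex": "F", "address": "22 Rochester St, Uhome, WY", "salary": 5000},
    {"id": "202EMPLPIM", "first_name": "Margaret", "last_name": "Morehead",
     "sex": "F", "address": "100 Commerce Cr, Springfield, IL", "salary": 6000},
    {"id": "201EMPLPIM", "first_name": "Sean", "last_name": "Froula",
     "sex": "M", "address": "2105 8th St, Uhome, WY", "salary": 7000},
    {"id": "203EMPLPIM", "first_name": "Jennifer", "last_name": "Costello",
     "sex": "F", "address": "21 Walker Rd, Uhome, WY", "salary": 8000},
]

# One (expression, output schema) pair per configured file set.
FILESETS = {
    "fileset0": (
        lambda r: r["id"][:3] in ("201", "202", "203")
                  and 5000 <= r["salary"] <= 7000,
        ["id", "first_name", "last_name", "sex", "address", "salary"],
    ),
    "fileset1": (
        lambda r: r["id"].startswith("202") and 5000 <= r["salary"] <= 7000,
        ["id", "first_name", "salary"],
    ),
    "fileset2": (
        lambda r: r["id"][:3] in ("202", "203") and 5000 <= r["salary"] < 9000,
        ["id", "first_name", "sex", "address", "salary"],
    ),
}

def route(rows):
    """Write each row to every file set whose expression it satisfies,
    projected onto that file set's schema."""
    out = {name: [] for name in FILESETS}
    for row in rows:
        for name, (expr, schema) in FILESETS.items():
            if expr(row):
                out[name].append({field: row[field] for field in schema})
    return out

result = route(ROWS)
for name, rows in result.items():
    print(name, [r["first_name"] for r in rows])
```

Note that a single record can land in several file sets (Madeline and Margaret appear in all three), and that each copy carries only the fields in that file set's schema.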
Design
- Investigate whether the Wrangler core library can be used to evaluate the expression for this plugin
- Can the compression codec be set per dataset, or should it be set at the job level?
Security
Limitation(s)
Future Work
- Some future work – HYDRATOR-99999
- Another future work – HYDRATOR-99999
Test Case(s)
- Test case #1
- Test case #2
Sample Pipeline
Please attach one or more sample pipeline(s) and associated data.
Pipeline #1
Pipeline #2
Checklist
- User stories documented
- User stories reviewed
- Design documented
- Design reviewed
- Feature merged
- Examples and guides
- Integration tests
- Documentation for feature
- Short video demonstrating the feature