Multiple File Set Sink

Introduction

As users start to make more and more generic pipelines that are configured at runtime through macros, we will need the ability to write to to multiple sinks with a single plugin since we cannot modify the DAG once a pipeline is deployed. This plugin is designed to be dynamically configured at runtime to write to multiple filesets for a single run.

Use case(s)

  • A user would like to create a single pipeline that can be used to process files from different clients. The pipeline will be completely configured using macros so that each run can be customized depending on the client file being read in. Each file contains multiple lines that start with identifiers such as 101 and 202, each of these lines should be sent to a different dataset. Each client has a different set of identifiers in the file and would like to write to a different name and number of output file sets. The user would use this plugin, in combination with conditional directives from Wrangler to dynamically configure and write the records to different filesets.
  • This plugin would almost always be configured via macros at the pipeline runtime.

User Stories

  • As a pipeline developer, I would like the ability to dynamically change the number of filesets being written to at runtime. Determining which record to write to each dataset should be based on the data in each record.
  • As a pipeline developer, I would like to define different schemas for each fileset I am writing to, which means this plugin will allow different input schemas.
  • As a pipeline developer, I would like to determine which fileset to write each record to based on one or more fields in the record.
  • As a pipeline developer, I would like to be able to compress the data written to each of these datasets.
  • As a pipeline developer, I would like to be able to partition the filesets using the runtime

Plugin Type

  • Batch Sink 

Configurable

This section defines properties that are configurable for this plugin. 

User Facing NameRequiredDefaultDescription
Sink NameYNoneNames of the Sink plugin.
TypeYAutomaticType of this sink is always batch sink.

OutputFileSets:

User Facing NameRequiredDefaultDescription
Dataset NameYNoneName of the PartitionedFileSet to which records are written. If they do not exist, they will be created. (Macro-enabled)
ExpressionYNoneExpression for file id filter and different fields filter.
DatasetTargetPathN[Namespace]/data/[Dataset name]The path where the data will be recorded. Defaults to the name of the dataset.
FileSet Properties:NNoneThe Parquet file schema of the record being written to the sink as a JSON object. If there are multiple file sets to be written to, multiple schemas of each file in JSON should be combined and written together as a single JSON. (Macro-enabled)
Compression CodecNNoneOptional parameter to determine the compression codec to use on the resulting data. Valid values are None, Snappy, GZip, and LZO.
SchemaYNoneThe schema of the record being written to the sink as a JSON Object. Must be specified manually.

 

Example

This example will write to a PartitionedFileSet named 'users'. It will write data in Parquet format compressed using Snappy compressions using the given schema. Every time the pipeline runs, the most recent run will be stored in a new partition in the PartitionedFileSet:

{
 "name": "Multiple File Set Sink",
 "type": "batchsink",
 "outputFileSets": [{
   "compressionCodec": "Snappy",
   "datasetName": "fileset0",
   "datasetTargetPaths": "",
   "expression": "(id.startsWith(\"201\") || id.startsWith(\"202\") || id.startsWith(\"203\") )&& salary >= 5000 && salary <= 7000",
   "filesetProperties": "",
   "schema": {
    "type": "record",
    "name": "fileset0",
    "fields": [{
     "name": "id",
     "type": "string"
    }, {
     "name": "first_name",
     "type": "string"
    }, {
     "name": "last_name",
     "type": "string"
    }, {
     "name": "sex",
     "type": "string"
    }, {
     "name": "address",
     "type": "string"
    }, {
     "name": "salary",
     "type": "string"
    }]
   }
  },
  {
   "compressionCodec": "Snappy",
   "datasetName": "fileset1",
   "datasetTargetPaths": "",
   "expression": "(id.startsWith(\"201\") || id.startsWith(\"202\") || id.startsWith(\"203\") )&& salary >= 5000 && salary <= 7000",
   "filesetProperties": "",
   "schema": {
    "type": "record",
    "name": "sales",
    "fields": [{
     "name": "id",
     "type": "string"
    }, {
     "name": "first_name",
     "type": "string"
    }, {
     "name": "salary",
     "type": "string"
    }]
   }
  },
  {
   "compressionCodec": "Snappy",
   "datasetName": "fileset2",
   "datasetTargetPaths": "",
   "expression": "(id.startsWith(\"201\") || id.startsWith(\"202\") || id.startsWith(\"203\") )&& salary >= 5000 && salary <= 7000",
   "filesetProperties": "",
   "schema": {
    "type": "record",
    "name": "sales",
    "fields": [{
     "name": "id",
     "type": "string"
    }, {
     "name": "first_name",
     "type": "string"
    }, {
     "name": "sex",
     "type": "string"
    }, {
     "name": "address",
     "type": "string"
    }, {
     "name": "salary",
     "type": "string"
    }]
   }
  }
 ]
}

 

Original data source is as following:

idfirst_namelast_namesexaddresssalary
202EMPLPIM MadelineHeineF 22 Rochester St, Uhome, WY   5000
202EMPLPIMMargaretMoreheadF100 Commerce Cr, Springfield, IL6000
201EMPLPIM  SeanFroulaM2105 8th St, Uhome, WY7000
203EMPLPIM  Jennifer CostelloF21 Walker Rd, Uhome, WY8000

 

 

After sink process, it will be split into follow filesets:

fileset0

idfirst_namelast_namesexaddresssalary
202EMPLPIM MadelineHeineF22 Rochester St, Uhome, WY  5000
202EMPLPIMMargaretMoreheadF100 Commerce Cr, Springfield, IL6000
201EMPLPIM  SeanFroulaM2105 8th St, Uhome, WY7000

 

fileset1

idfirst_namesalary
202EMPLPIM Madeline5000
202EMPLPIMMargaret6000

 

fileset2

idfirst_namesexaddresssalary
202EMPLPIM MadelineF22 Rochester St, Uhome, WY   5000
202EMPLPIMMargaretF100 Commerce Cr, Springfield, IL6000
203EMPLPIM  Jennifer F21 Walker Rd, Uhome, WY8000

 

Design

  • Investigate if the Wrangler core library can be used to evaluate the expression for this plugin
  • Can compression codec be changed for each dataset? or should it be set on the job level?
  • How can we write to multiple filesets from the same transform function?

 

Security

Limitation(s)

  • This can only be used to write to time partitioned filesets

 

Future Work

  • Add the ability to write to all different types of datasets
  • Add the ability to write datasets with different partitioning schemes

Test Case(s)

  • Test case #1
  • Test case #2

Sample Pipeline

Please attach one or more sample pipeline(s) and associated data. 

Pipeline #1

Pipeline #2

 

 

Table of Contents

Checklist

  • User stories documented 
  • User stories reviewed 
  • Design documented 
  • Design reviewed 
  • Feature merged 
  • Examplesandguides 
  • Integration tests 
  • Documentation for feature 
  • Short video demonstrating the feature