
Introduction

As a user of the Pipeline tool, I would like to configure a single sink plugin that writes to multiple file sets depending on a value in each record. This is useful for building a generic pipeline that can write to a different number of sinks on each run.

Use case(s)

  • A user would like to create a single pipeline that can process files from different clients. The pipeline is configured using macros so that each run can be customized for the file being read. The file contains lines that start with identifiers such as 101 and 202, and each of these lines should be sent to a different dataset. Each client has a different set of identifiers in its files and therefore needs to write to a different number of output file sets. The user would use this plugin, in combination with conditional directives from Wrangler, to dynamically configure the pipeline and write the records to different file sets (see the sketch after this list).
  • This plugin would almost always be configured via macros.
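
To illustrate the record-routing idea only (this is not the plugin implementation): each record is evaluated against every configured file set's filter, and it is written to every file set whose filter matches, so one record can land in several file sets. The class and method names below (OutputFileSetSpec, routeRecord) are hypothetical.

{code}
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical sketch: route one record to every file set whose filter matches.
public class RoutingSketch {

    // Hypothetical holder for one configured output file set.
    static class OutputFileSetSpec {
        final String datasetName;
        final Predicate<Map<String, Object>> filter;   // stands in for the configured expression

        OutputFileSetSpec(String datasetName, Predicate<Map<String, Object>> filter) {
            this.datasetName = datasetName;
            this.filter = filter;
        }
    }

    // Returns the names of all file sets that should receive this record.
    static List<String> routeRecord(Map<String, Object> record, List<OutputFileSetSpec> specs) {
        List<String> targets = new ArrayList<>();
        for (OutputFileSetSpec spec : specs) {
            if (spec.filter.test(record)) {
                targets.add(spec.datasetName);
            }
        }
        return targets;
    }

    public static void main(String[] args) {
        // Two illustrative filters in the same spirit as the Example section below.
        List<OutputFileSetSpec> specs = List.of(
            new OutputFileSetSpec("fileset0", r -> (int) r.get("salary") >= 5000),
            new OutputFileSetSpec("fileset1", r -> ((String) r.get("id")).startsWith("202"))
        );

        Map<String, Object> record = new LinkedHashMap<>();
        record.put("id", "202EMPLPIM");
        record.put("salary", 6000);

        System.out.println(routeRecord(record, specs));   // [fileset0, fileset1]
    }
}
{code}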

User Stories

  • User story #1
  • User story #2
  • User story #3
  • User story #m

Plugin Type

  • Batch Sink 

Configurable

This section defines properties that are configurable for this plugin. 

User Facing Name | Required | Default   | Description
Sink Name        | Y        | None      | Name of the sink plugin.
Type             | Y        | Automatic | The type of this sink is always batch sink.

OutputFileSets:

User Facing Name    | Required | Default                       | Description
Dataset Name        | Y        | None                          | Name of the PartitionedFileSet to which records are written. If it does not exist, it will be created. (Macro-enabled)
Expression          | Y        | None                          | Expression used to filter records by id and other fields; records that satisfy the expression are written to this file set.
Dataset Target Path | N        | [Namespace]/data/[Dataset name] | The path where the data will be written. Defaults to the name of the dataset.
FileSet Properties  | N        | None                          | The Parquet file schema of the record being written to the sink, as a JSON object. If there are multiple file sets to write to, the schemas of the individual files should be combined into a single JSON object. (Macro-enabled)
Compression Codec   | N        | None                          | Optional compression codec to use on the resulting data. Valid values are None, Snappy, GZip, and LZO.
Schema              | Y        | None                          | The schema of the record being written to the sink, as a JSON object. Must be specified manually.
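
A rough sketch of how these properties might map onto a CDAP plugin configuration class, assuming the standard co.cask.cdap.api.plugin.PluginConfig base class and the @Macro/@Description annotations; the field and property names here are illustrative, not final.

{code}
import co.cask.cdap.api.annotation.Description;
import co.cask.cdap.api.annotation.Macro;
import co.cask.cdap.api.annotation.Name;
import co.cask.cdap.api.plugin.PluginConfig;

// Illustrative config class for the multiple-file-set sink; property names are not final.
public class MultipleFileSetSinkConfig extends PluginConfig {

  @Name("name")
  @Description("Name of the sink plugin.")
  private String name;

  @Name("outputFileSets")
  @Macro
  @Description("JSON array describing the output file sets: dataset name, filter expression, "
      + "dataset target path, file set properties, compression codec, and schema for each one.")
  private String outputFileSets;
}
{code}

Because outputFileSets is macro-enabled, its value can be supplied at runtime, which is how a single deployed pipeline can write to a different number of file sets on each run.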

 

Example

This example writes to three PartitionedFileSets named 'fileset0', 'fileset1', and 'fileset2'. Data is written in Parquet format, compressed with Snappy, using the schema given for each file set. Every time the pipeline runs, the output of that run is stored in a new partition of each PartitionedFileSet:

{code}
{
    "name": "Multiple File Set Sink",
    "type": "batchsink",
    "outputFileSets": [
        {
            "datasetName": "fileset0",
            "expression": "(id.startsWith('201') || id.startsWith('202') || id.startsWith('203')) && salary >= 5000 && salary <= 7000",
            "datasetTargetPath": "",
            "filesetProperties": "",
            "compressionCodec": "Snappy",
            "schema": {
                "fields": [
                    {"name": "id", "type": "string"},
                    {"name": "first_name", "type": "string"},
                    {"name": "last_name", "type": "string"},
                    {"name": "sex", "type": "string"},
                    {"name": "address", "type": "string"},
                    {"name": "salary", "type": "int"}
                ]
            }
        },
        {
            "datasetName": "fileset1",
            "expression": "id.startsWith('202') && salary >= 5000 && salary <= 7000",
            "datasetTargetPath": "",
            "filesetProperties": "",
            "compressionCodec": "Snappy",
            "schema": {
                "fields": [
                    {"name": "id", "type": "string"},
                    {"name": "first_name", "type": "string"},
                    {"name": "salary", "type": "int"}
                ]
            }
        },
        {
            "datasetName": "fileset2",
            "expression": "(id.startsWith('202') || id.startsWith('203')) && salary >= 5000 && salary < 9000",
            "datasetTargetPath": "",
            "filesetProperties": "",
            "compressionCodec": "Snappy",
            "schema": {
                "fields": [
                    {"name": "id", "type": "string"},
                    {"name": "first_name", "type": "string"},
                    {"name": "sex", "type": "string"},
                    {"name": "address", "type": "string"},
                    {"name": "salary", "type": "int"}
                ]
            }
        }
    ]
}
{code}

 

The original data source is as follows:

id         | first_name | last_name | sex | address                          | salary
202EMPLPIM | Madeline   | Heine     | F   | 22 Rochester St, Uhome, WY       | 5000
202EMPLPIM | Margaret   | Morehead  | F   | 100 Commerce Cr, Springfield, IL | 6000
201EMPLPIM | Sean       | Froula    | M   | 2105 8th St, Uhome, WY           | 7000
203EMPLPIM | Jennifer   | Costello  | F   | 21 Walker Rd, Uhome, WY          | 8000

 

 

After the sink processes the records, they are split into the following file sets:

fileset0

id         | first_name | last_name | sex | address                          | salary
202EMPLPIM | Madeline   | Heine     | F   | 22 Rochester St, Uhome, WY       | 5000
202EMPLPIM | Margaret   | Morehead  | F   | 100 Commerce Cr, Springfield, IL | 6000
201EMPLPIM | Sean       | Froula    | M   | 2105 8th St, Uhome, WY           | 7000

 

fileset1

id         | first_name | salary
202EMPLPIM | Madeline   | 5000
202EMPLPIM | Margaret   | 6000

 

fileset2

id         | first_name | sex | address                          | salary
202EMPLPIM | Madeline   | F   | 22 Rochester St, Uhome, WY       | 5000
202EMPLPIM | Margaret   | F   | 100 Commerce Cr, Springfield, IL | 6000
203EMPLPIM | Jennifer   | F   | 21 Walker Rd, Uhome, WY          | 8000

 

Design

  • Investigate whether the Wrangler core library can be used to evaluate the filter expression for this plugin (a generic-evaluator alternative is sketched after this list)
  • Can the compression codec be set per dataset, or should it be set at the job level?
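
If the Wrangler core library turns out not to be suitable, a generic expression library could evaluate the configured filters instead. The following is a minimal sketch assuming Apache Commons JEXL 3 (not part of the current design) and a hypothetical scheme where each record field is bound as an expression variable.

{code}
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.jexl3.JexlBuilder;
import org.apache.commons.jexl3.JexlContext;
import org.apache.commons.jexl3.JexlEngine;
import org.apache.commons.jexl3.JexlExpression;
import org.apache.commons.jexl3.MapContext;

// Minimal sketch: evaluate one configured filter expression against one record.
public class ExpressionSketch {
  public static void main(String[] args) {
    JexlEngine jexl = new JexlBuilder().create();

    // Same style of expression as in the Example section above.
    JexlExpression expr = jexl.createExpression(
        "id.startsWith('202') && salary >= 5000 && salary <= 7000");

    // Record fields exposed as expression variables (hypothetical binding scheme).
    Map<String, Object> fields = new HashMap<>();
    fields.put("id", "202EMPLPIM");
    fields.put("salary", 6000);

    JexlContext context = new MapContext(fields);
    boolean matches = Boolean.TRUE.equals(expr.evaluate(context));
    System.out.println(matches ? "write to fileset1" : "skip fileset1");
  }
}
{code}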

 

Security

Limitation(s)

Future Work

  • Some future work – HYDRATOR-99999
  • Another future work – HYDRATOR-99999

Test Case(s)

  • Test case #1
  • Test case #2

Sample Pipeline

Please attach one or more sample pipeline(s) and associated data. 

Pipeline #1

Pipeline #2

 

 


Checklist

  • User stories documented 
  • User stories reviewed 
  • Design documented 
  • Design reviewed 
  • Feature merged 
  • Examples and guides 
  • Integration tests 
  • Documentation for feature 
  • Short video demonstrating the feature