Order By
- Nitin Motgi
- Yue Gao
- Ananya Bhattacharya
Introduction
Use-case
User Stories
- Users should be able to specify a single field or a composite of fields for sorting the records in the output.
- Users should be able to specify a field of a basic type from a nested structure for sorting the records.
- Users should be able to specify, for each field, how the records are sorted (ascending or descending).
- Users can specify only fields of basic types (String, Int, Long, Short, Float, Double, Byte) as keys; if any other type is specified, an error is thrown to notify the user.
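The type restriction in the last user story could be checked along the following lines. This is only a minimal sketch: the FieldType enum and the SortKeyValidator class are hypothetical names introduced for illustration, and the plugin's real schema types may differ.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical type tags for this sketch; not the plugin's actual schema API.
enum FieldType { STRING, INT, LONG, SHORT, FLOAT, DOUBLE, BYTE, BOOLEAN, ARRAY, MAP, RECORD }

final class SortKeyValidator {
  // The basic types listed in the user stories as valid sort keys.
  static final Set<FieldType> SORTABLE = EnumSet.of(
      FieldType.STRING, FieldType.INT, FieldType.LONG, FieldType.SHORT,
      FieldType.FLOAT, FieldType.DOUBLE, FieldType.BYTE);

  // Throws if the field's type is not one of the basic types allowed as a sort key.
  static void validate(String fieldName, FieldType type) {
    if (!SORTABLE.contains(type)) {
      throw new IllegalArgumentException(
          "Field '" + fieldName + "' has type " + type + ", which cannot be used as a sort key");
    }
  }
}
```

Running such a check once at configuration time, rather than per record, would surface the error before any data is processed.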
Example
The following simple example shows how Order By would work.
Input
First Name | Last Name | Age | Zip Code |
Joltie | Root | 29 | 32826 |
Henry | Zilka | 62 | 96789 |
Baby | Trump | 10 | 76563 |
Donald | Trump | 70 | 34566 |
Ivanka | Trump | 34 | 94306 |
Bipasha | Basu | 39 | 67543 |
BabyII | Trump | 10 | 32816 |
- Input Schema
- First Name, String
- Last Name, String
- Age, Int
- Zip Code, Long
- Sort by
- Last Name, Ascending
- Age, Ascending
- Zip Code, Descending
- Output Schema
- First Name, String
- Last Name, String
- Age, Int
- Zip Code, Long
Output is as follows
First Name | Last Name | Age | Zip Code |
Bipasha | Basu | 39 | 67543 |
Joltie | Root | 29 | 32826 |
Baby | Trump | 10 | 76563 |
BabyII | Trump | 10 | 32816 |
Ivanka | Trump | 34 | 94306 |
Donald | Trump | 70 | 34566 |
Henry | Zilka | 62 | 96789 |
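The ordering in this example can be reproduced outside map-reduce with a plain comparator chain. The sketch below is only an illustration of the expected ordering; the Person and OrderByExample classes are hypothetical and are not part of the plugin.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical holder for one row of the example table.
final class Person {
  final String firstName;
  final String lastName;
  final int age;
  final long zipCode;

  Person(String firstName, String lastName, int age, long zipCode) {
    this.firstName = firstName;
    this.lastName = lastName;
    this.age = age;
    this.zipCode = zipCode;
  }
}

final class OrderByExample {
  // Last Name ascending, then Age ascending, then Zip Code descending.
  static final Comparator<Person> ORDER =
      Comparator.comparing((Person p) -> p.lastName)
          .thenComparingInt(p -> p.age)
          .thenComparing(Comparator.comparingLong((Person p) -> p.zipCode).reversed());

  // The rows of the Input table above.
  static List<Person> input() {
    List<Person> rows = new ArrayList<>();
    rows.add(new Person("Joltie", "Root", 29, 32826L));
    rows.add(new Person("Henry", "Zilka", 62, 96789L));
    rows.add(new Person("Baby", "Trump", 10, 76563L));
    rows.add(new Person("Donald", "Trump", 70, 34566L));
    rows.add(new Person("Ivanka", "Trump", 34, 94306L));
    rows.add(new Person("Bipasha", "Basu", 39, 67543L));
    rows.add(new Person("BabyII", "Trump", 10, 32816L));
    return rows;
  }

  // Sorts the input rows and returns their first names in sorted order.
  static List<String> sortedFirstNames() {
    List<Person> rows = input();
    rows.sort(ORDER);
    List<String> names = new ArrayList<>();
    for (Person p : rows) {
      names.add(p.firstName);
    }
    return names;
  }
}
```

Note how the two Trump records with age 10 are ordered by the third key only: Baby (76563) comes before BabyII (32816) because Zip Code is sorted in descending order.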
Implementation Tips
- Investigate how the ‘Group Comparator’ and ‘Sort Comparator’ work together and how they can be used to achieve the functionality for this plugin.
- Build a simple map-reduce program to understand how the above functionality works: implement a Sort Comparator using StructuredRecord.
- If the above works, the Data Pipeline Application Template needs to be modified to set the sort comparator class, and this shouldn’t affect the other plugins.
Design
The order by plugin will use the Secondary Sort technique to sort the values (in ascending or descending order) passed to each reducer.
The plugin classes are outlined below:
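CompositeKey.java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

/**
 * Custom Writable for the composite key.
 */
public class CompositeKey implements WritableComparable<CompositeKey> {
  // StructuredRecord, received as a JSON string from the mapper.
  private String structureRecordJSON;
  // List of fields to sort by, received as a JSON string from the mapper.
  private String sortFieldsJSON;

  @Override
  public void write(DataOutput out) throws IOException {
    Text.writeString(out, structureRecordJSON);
    Text.writeString(out, sortFieldsJSON);
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    structureRecordJSON = Text.readString(in);
    sortFieldsJSON = Text.readString(in);
  }

  /**
   * This comparator controls the sort order of the keys.
   */
  @Override
  public int compareTo(CompositeKey other) {
    // Compare the StructuredRecord objects parsed from the JSON strings.
    throw new UnsupportedOperationException("Comparison logic is not yet implemented");
  }
}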
CompositeKeyMapper.java
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Mapper class for Order By.
 */
public class CompositeKeyMapper extends Mapper<LongWritable, Text, CompositeKey, Text> {
  /**
   * @param key blank object received for the key
   * @param value StructuredRecord received as a JSON string
   * @param context object containing the configuration properties for the current job
   */
  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
  }
}
CompositeKeyReducer.java
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Reducer class for Order By.
 */
public class CompositeKeyReducer extends Reducer<CompositeKey, Text, CompositeKey, Text> {
  /**
   * @param key CompositeKey used for sorting
   * @param values StructuredRecords received as JSON strings
   * @param context object containing the configuration properties for the current job
   */
  @Override
  protected void reduce(CompositeKey key, Iterable<Text> values, Context context)
      throws IOException, InterruptedException {
  }
}
CompositeKeyComparator.java
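import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Custom sort class to sort the received structured records.
 */
public class CompositeKeyComparator extends WritableComparator {
  public CompositeKeyComparator() {
    // Register the key class and create instances so that serialized keys
    // can be deserialized before they are compared.
    super(CompositeKey.class, true);
  }

  @Override
  public int compare(WritableComparable wc1, WritableComparable wc2) {
    CompositeKey compositeKey1 = (CompositeKey) wc1;
    CompositeKey compositeKey2 = (CompositeKey) wc2;
    return compositeKey1.compareTo(compositeKey2);
  }
}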
- The plugin will also implement CompositeGroupComparator.java (a custom group comparator) to control which keys are grouped together into a single call to the reduce() method, and CompositePartitioner.java (a custom partitioner class) to ensure that all data with the same keys (a combination of some of the keys specified) is sent to the same reducer.
- The prepareRun() method will register all the custom classes required for order-by.
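The partitioning idea can be illustrated without any Hadoop dependency. The sketch below is an assumption-laden illustration, not the plugin's CompositePartitioner: the NaturalKeyPartitioner class is hypothetical, and it assumes the part of the composite key used for partitioning is available as a string. The formula is the one Hadoop's default HashPartitioner applies to a key's hash code.

```java
final class NaturalKeyPartitioner {
  // Masks the sign bit so the result is non-negative, then takes the modulus.
  // Equal keys always produce the same partition index.
  static int partitionFor(String naturalKey, int numPartitions) {
    return (naturalKey.hashCode() & Integer.MAX_VALUE) % numPartitions;
  }
}
```

Because the partition depends only on the chosen part of the key, every record sharing that part reaches the same reducer, where the sort comparator then determines the order within the reducer's input.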
To sort on a field in a nested record, specify the fully qualified name of the field, starting from the parent.
{
  "productName": "Angels & Demons",
  "quantity": 1,
  "shippingAddress": {
    "city": "Columbia",
    "state": "Illinois",
    "zip": 12568
  },
  "billingAddress": {
    "city": "California City",
    "state": "California",
    "zip": 11220
  }
}
For example, if the user wants to sort the above order in descending order of the zip code present in shippingAddress, the field should be specified as "shippingAddress->zip:desc".
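Resolving such a fully qualified name amounts to walking the record one segment at a time. The sketch below is a rough illustration that uses nested maps as a stand-in for StructuredRecord; the NestedFieldResolver class and its methods are hypothetical names, and returning null for a missing segment is an assumption rather than documented plugin behavior.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class NestedFieldResolver {
  // Walks a "parent->child" path through nested maps.
  // Returns null if a segment is missing or an intermediate value is not a map.
  static Object resolve(Map<String, Object> record, String path) {
    Object current = record;
    for (String segment : path.split("->")) {
      if (!(current instanceof Map)) {
        return null;
      }
      current = ((Map<?, ?>) current).get(segment);
    }
    return current;
  }

  // Builds a subset of the order record shown above (billingAddress omitted for brevity).
  static Map<String, Object> sampleOrder() {
    Map<String, Object> shipping = new LinkedHashMap<>();
    shipping.put("city", "Columbia");
    shipping.put("state", "Illinois");
    shipping.put("zip", 12568);

    Map<String, Object> order = new LinkedHashMap<>();
    order.put("productName", "Angels & Demons");
    order.put("quantity", 1);
    order.put("shippingAddress", shipping);
    return order;
  }
}
```

With this sketch, resolving "shippingAddress->zip" against the sample order yields the integer 12568, and a top-level name such as "productName" is handled by the same loop as a path with a single segment.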
EXAMPLE
This example accepts a structured nested record containing shippingAddress and billingAddress as nested records, and sorts the records in ascending order of productName and descending order of the zip code present in shippingAddress.
{
  "name": "OrderBy",
  "plugin": {
    "name": "OrderBy",
    "type": "transform",
    "label": "OrderBy",
    "properties": {
      "sortFieldList": "productName:asc,shippingAddress->zip:desc"
    }
  }
}
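The sortFieldList value is a comma-separated list of field:order pairs. One possible way to parse it is sketched below; the SortField and SortFieldListParser classes are hypothetical, and rejecting anything other than "asc" or "desc" is an assumption about how the plugin would validate its configuration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical value class: one sort field and its direction.
final class SortField {
  final String path;
  final boolean ascending;

  SortField(String path, boolean ascending) {
    this.path = path;
    this.ascending = ascending;
  }
}

final class SortFieldListParser {
  // Parses e.g. "productName:asc,shippingAddress->zip:desc" into an ordered list.
  static List<SortField> parse(String sortFieldList) {
    List<SortField> fields = new ArrayList<>();
    for (String entry : sortFieldList.split(",")) {
      String[] parts = entry.trim().split(":");
      if (parts.length != 2 || parts[0].trim().isEmpty()) {
        throw new IllegalArgumentException("Expected 'field:asc' or 'field:desc' but got '" + entry + "'");
      }
      String order = parts[1].trim();
      if (!order.equals("asc") && !order.equals("desc")) {
        throw new IllegalArgumentException("Unknown sort order '" + order + "' in '" + entry + "'");
      }
      fields.add(new SortField(parts[0].trim(), order.equals("asc")));
    }
    return fields;
  }
}
```

The list order is preserved, so the first entry acts as the primary sort key and later entries only break ties.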
Checklist
- User stories documented
- User stories reviewed
- Design documented
- Design reviewed
- Feature merged
- Examples and guides
- Integration tests
- Documentation for feature
- Short video demonstrating the feature