Order By

Introduction 

Order By is an aggregator plugin that sorts rows by the fields you specify, each in either ascending or descending order.

Use-case

A user is processing a web access log and, as part of the data pipeline, aggregates response codes. The user uses the “GROUP BY” aggregation plugin in the pipeline to count the web responses in each response category. Before the results are written to the output file, the user wants to sort them by count.

User Stories

  • User should be able to specify a single field or a composite of several fields for sorting the records in the output
  • User should be able to specify a field (of a basic type) from a nested structure for sorting the records
  • User should be able to specify, for each field, whether the records are sorted in ascending or descending order
  • User can specify only basic types (String, Int, Long, Short, Float, Double, Byte) as sort keys; if any other type is specified, an error is thrown to notify the user
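The last user story can be sketched as a small type check. This is only an illustration under stated assumptions: the class name `SortKeyTypeCheck`, the method `validate`, the exception type, and the choice to reject null values are hypothetical and not part of the plugin design.

```java
import java.util.List;
import java.util.Set;

/** Hypothetical helper: checks that a sort key value has one of the supported basic types. */
final class SortKeyTypeCheck {

  // The basic types listed in the user stories, as their Java wrapper classes.
  private static final Set<Class<?>> SUPPORTED = Set.of(
      String.class, Integer.class, Long.class, Short.class,
      Float.class, Double.class, Byte.class);

  /**
   * Throws IllegalArgumentException if the value cannot be used as a sort key.
   * Assumption: a null value is treated as unsupported.
   */
  static void validate(String fieldName, Object value) {
    if (value == null || !SUPPORTED.contains(value.getClass())) {
      String type = (value == null) ? "null" : value.getClass().getSimpleName();
      throw new IllegalArgumentException(
          "Field '" + fieldName + "' has unsupported sort key type: " + type);
    }
  }

  public static void main(String[] args) {
    validate("Age", 29);           // accepted: Integer
    validate("Zip Code", 32826L);  // accepted: Long
    try {
      validate("tags", List.of("a"));  // rejected: a list is not a basic type
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

In the real plugin this check would more likely run against the field's schema type than against a runtime value; the sketch only shows the accept/reject rule.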

Example

The following simple example shows how Order By would work.

Input

First Name | Last Name | Age | Zip Code
Joltie     | Root      | 29  | 32826
Henry      | Zilka     | 62  | 96789
Baby       | Trump     | 10  | 76563
Donald     | Trump     | 70  | 34566
Ivanka     | Trump     | 34  | 94306
Bipasha    | Basu      | 39  | 67543
BabyII     | Trump     | 10  | 32816

Configuration is specified as follows
  • Input Schema
    • First Name, String
    • Last Name, String
    • Age, Int
    • Zip Code, Long
  • Sort by 
    • Last Name, Ascending
    • Age, Ascending
    • Zip Code, Descending
  • Output Schema
    • First Name, String
    • Last Name, String
    • Age, Int
    • Zip Code, Long

Output is as follows


First Name | Last Name | Age | Zip Code
Bipasha    | Basu      | 39  | 67543
Joltie     | Root      | 29  | 32826
Baby       | Trump     | 10  | 76563
BabyII     | Trump     | 10  | 32816
Ivanka     | Trump     | 34  | 94306
Donald     | Trump     | 70  | 34566
Henry      | Zilka     | 62  | 96789
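The ordering in this example can be reproduced with a chained comparator. The sketch below uses plain `java.util.Comparator` rather than the Hadoop comparators described in the design, and the names `OrderByExample` and `Person` are hypothetical stand-ins for the plugin and its StructuredRecord.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class OrderByExample {

  /** Hypothetical row type mirroring the example schema. */
  static final class Person {
    final String firstName;
    final String lastName;
    final int age;
    final long zipCode;

    Person(String firstName, String lastName, int age, long zipCode) {
      this.firstName = firstName;
      this.lastName = lastName;
      this.age = age;
      this.zipCode = zipCode;
    }
  }

  /** Last Name ascending, then Age ascending, then Zip Code descending. */
  static final Comparator<Person> ORDER =
      Comparator.comparing((Person p) -> p.lastName)
          .thenComparingInt(p -> p.age)
          .thenComparing(Comparator.comparingLong((Person p) -> p.zipCode).reversed());

  /** The rows of the input table, in their original order. */
  static List<Person> exampleInput() {
    return List.of(
        new Person("Joltie", "Root", 29, 32826L),
        new Person("Henry", "Zilka", 62, 96789L),
        new Person("Baby", "Trump", 10, 76563L),
        new Person("Donald", "Trump", 70, 34566L),
        new Person("Ivanka", "Trump", 34, 94306L),
        new Person("Bipasha", "Basu", 39, 67543L),
        new Person("BabyII", "Trump", 10, 32816L));
  }

  /** Returns a sorted copy; the input list is left untouched. */
  static List<Person> sort(List<Person> input) {
    List<Person> sorted = new ArrayList<>(input);
    sorted.sort(ORDER);
    return sorted;
  }

  public static void main(String[] args) {
    for (Person p : sort(exampleInput())) {
      System.out.println(p.firstName + " " + p.lastName + " " + p.age + " " + p.zipCode);
    }
  }
}
```

The two Trump rows with age 10 tie on the first two keys, so the descending Zip Code key decides their order: 76563 comes before 32816.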

Implementation Tips

  • Investigate how ‘Group Comparator’ and ‘Sort Comparator’ work together and how they can be used to achieve the functionality for this plugin.
  • Build a simple map-reduce program to understand how the above functionality works: implement a Sort Comparator using StructuredRecord.
  • If the above works, then the Data Pipeline Application Template needs to be modified to set the sort comparator class, and this shouldn’t affect the other plugins.

Design 

The Order By plugin will use the Secondary Sort technique to sort the values (in ascending or descending order) passed to each reducer.

The classes that make up the plugin are outlined below:

CompositeKey.java

import org.apache.hadoop.io.WritableComparable;

/**
 * Custom Writable for the composite key.
 */
public class CompositeKey implements WritableComparable<CompositeKey> {

  private String structuredRecordJSON;  // StructuredRecord, received as a JSON string from the mapper.
  private String sortFieldsJSON;  // List of fields to sort on, received as a JSON string from the mapper.

  // write(DataOutput) and readFields(DataInput), required by Writable, must also be implemented.

  /**
   * This comparator controls the sort order of the keys.
   */
  @Override
  public int compareTo(CompositeKey other) {
      // Compare the StructuredRecord objects parsed from the JSON strings, field by field.
  }
}

CompositeKeyMapper.java

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Mapper class for Order By.
 */
public class CompositeKeyMapper extends Mapper<LongWritable, Text, CompositeKey, Text> {

  /**
   * @param key blank object received for the key
   * @param value StructuredRecord received as a JSON string
   * @param context object containing the configuration properties for the current job
   */
  @Override
  protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
  }
}

CompositeKeyReducer.java

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Reducer class for Order By.
 */
public class CompositeKeyReducer extends Reducer<CompositeKey, Text, CompositeKey, Text> {

  /**
   * @param key CompositeKey used for sorting
   * @param values StructuredRecords received as JSON strings
   * @param context object containing the configuration properties for the current job
   */
  @Override
  protected void reduce(CompositeKey key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
  }
}

CompositeKeyComparator.java

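import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Custom sort class to sort the received structured records.
 */
public class CompositeKeyComparator extends WritableComparator {

  protected CompositeKeyComparator() {
    // createInstances = true so that keys are deserialized before compare() is called.
    super(CompositeKey.class, true);
  }

  @Override
  public int compare(WritableComparable wc1, WritableComparable wc2) {
    CompositeKey compositeKey1 = (CompositeKey) wc1;
    CompositeKey compositeKey2 = (CompositeKey) wc2;
    return compositeKey1.compareTo(compositeKey2);
  }
}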

 

  • The plugin will also implement CompositeGroupComparator.java (a custom group comparator), which controls which keys are grouped together into a single call to the reduce() method, and CompositePartitioner.java (a custom partitioner class), which ensures that all data with the same key (a combination of some of the specified keys) is sent to the same reducer.
  • The prepareRun() method will register all the custom classes required for Order By.
  • To sort on nested records, the field name should be the fully qualified name, starting from the parent.

    {
    	"productName": "Angels & Demons",
    	"quantity" : 1,
    	"shippingAddress":{
    		"city": "Columbia",
    		"state" : "Illinois",
    		"zip" : 12568
    	},
    	"billingAddress":{
    		"city": "California City",
    		"state" : "California",
    		"zip" : 11220
    	}
    }

    For example, if the user wants to sort the order above in descending order by the zip code in shippingAddress, the field should be specified as "shippingAddress->zip:desc".
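How a fully qualified name might be resolved against a nested record can be sketched as follows. This is an assumption-laden illustration: it models the record as nested `java.util.Map` instances rather than a StructuredRecord, and the class `NestedFieldLookup` and method `resolve` are hypothetical names.

```java
import java.util.Map;

final class NestedFieldLookup {

  /** Walks a path such as "shippingAddress->zip" through nested maps and returns the leaf value. */
  static Object resolve(Map<String, Object> record, String path) {
    Object current = record;
    for (String part : path.split("->")) {
      if (!(current instanceof Map)) {
        // An earlier segment pointed at a basic value, so there is nothing to descend into.
        throw new IllegalArgumentException("'" + part + "' has no parent record in path: " + path);
      }
      Map<?, ?> map = (Map<?, ?>) current;
      if (!map.containsKey(part)) {
        throw new IllegalArgumentException("Unknown field '" + part + "' in path: " + path);
      }
      current = map.get(part);
    }
    return current;
  }

  /** The order record shown above, built from nested maps. */
  static Map<String, Object> exampleOrder() {
    Map<String, Object> shipping = Map.of("city", "Columbia", "state", "Illinois", "zip", 12568);
    Map<String, Object> billing = Map.of("city", "California City", "state", "California", "zip", 11220);
    return Map.of(
        "productName", "Angels & Demons",
        "quantity", 1,
        "shippingAddress", shipping,
        "billingAddress", billing);
  }

  public static void main(String[] args) {
    System.out.println(resolve(exampleOrder(), "shippingAddress->zip"));
    System.out.println(resolve(exampleOrder(), "productName"));
  }
}
```

A top-level field is simply a path with a single segment, so the same lookup covers both flat and nested sort keys.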

 

EXAMPLE

This example accepts a structured nested record containing shippingAddress and billingAddress and sorts the records in ascending order of productName and descending order of the zip code in shippingAddress.

{
	"name": "OrderBy",
	"plugin": {
		"name": "OrderBy",
		"type": "transform",
		"label": "OrderBy",
		"properties": {
			"sortFieldList": "productName:asc,shippingAddress->zip:desc"
		}
	}
}
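One way the sortFieldList property could be parsed is sketched below. The "field:order" grammar is inferred from the configuration above; the class `SortFieldListParser`, the `SortField` type, and the error handling are hypothetical and only illustrate the idea.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

final class SortFieldListParser {

  /** Hypothetical holder for one parsed entry of sortFieldList. */
  static final class SortField {
    final String path;        // e.g. "productName" or "shippingAddress->zip"
    final boolean ascending;  // true for "asc", false for "desc"

    SortField(String path, boolean ascending) {
      this.path = path;
      this.ascending = ascending;
    }
  }

  /** Parses a comma-separated list of "field:order" entries, where order is asc or desc. */
  static List<SortField> parse(String sortFieldList) {
    List<SortField> fields = new ArrayList<>();
    for (String entry : sortFieldList.split(",")) {
      String[] parts = entry.trim().split(":");
      if (parts.length != 2 || parts[0].trim().isEmpty()) {
        throw new IllegalArgumentException("Expected 'field:order' but got: " + entry);
      }
      String order = parts[1].trim().toLowerCase(Locale.ROOT);
      if (!order.equals("asc") && !order.equals("desc")) {
        throw new IllegalArgumentException("Sort order must be asc or desc but got: " + parts[1]);
      }
      fields.add(new SortField(parts[0].trim(), order.equals("asc")));
    }
    return fields;
  }

  public static void main(String[] args) {
    for (SortField f : parse("productName:asc,shippingAddress->zip:desc")) {
      System.out.println(f.path + " " + (f.ascending ? "ascending" : "descending"));
    }
  }
}
```

The entries keep their order in the returned list, which matters because earlier fields take precedence when records are compared.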


 

 

Checklist

  • User stories documented 
  • User stories reviewed 
  • Design documented 
  • Design reviewed 
  • Feature merged 
  • Examples and guides 
  • Integration tests 
  • Documentation for feature 
  • Short video demonstrating the feature