Other designs

This page contains another design that was proposed to implement join. This design is not being implemented because it is a more generic, MapReduce-style design that is not optimal for Spark.

Design:

To support join across multiple stages, we introduce a new plugin type: Reducer. The Java API for this plugin looks as below:

Reducer.java

 

public interface Reducer<REDUCE_KEY, REDUCE_VALUE, OUT_VALUE, OUT> {

  /**
   * Emit the reduce key(s) for a given input value, along with the name of the stage the value belongs to. If no reduce key is emitted, the input value
   * is filtered out.
   *
   * @param reduceValue the value to reduce, together with the name of the stage it belongs to
   * @param emitter the emitter to emit zero or more key-value pairs of reduce key and stage-tagged intermediate value
   * @throws Exception if there is some error getting the key
   */
  void reduceOn(ReducerInput<REDUCE_VALUE> reduceValue, Emitter<KeyValue<REDUCE_KEY, ReducerOutput<OUT_VALUE>>> emitter) throws Exception;

  /**
   * Reduce objects into zero or more output objects.
   *
   * @param reduceKey the key to be reduced on
   * @param outValues an iterable over all tagged objects that have the same reduce key
   * @param emitter the emitter to emit reduced values
   * @throws Exception if there is some error
   */
  void reduce(REDUCE_KEY reduceKey, Iterable<ReducerOutput<OUT_VALUE>> outValues, Emitter<OUT> emitter) throws Exception;

}

 


ReducerInput.java

 

/**
 * Defines immutable input to Reducer
 * @param <REDUCE_VALUE>
 */
public class ReducerInput<REDUCE_VALUE> {
  // stage the reduceValue belongs to
  private final String stageName;
  // reduce value
  private final REDUCE_VALUE reduceValue;

  public ReducerInput(String stageName, REDUCE_VALUE reduceValue) {
    this.stageName = stageName;
    this.reduceValue = reduceValue;
  }

  public String getStageName() {
    return stageName;
  }

  public REDUCE_VALUE getReduceValue() {
    return reduceValue;
  }
}

 

ReducerOutput.java

 

/**
 * Defines intermediate output of Reducer.reduceOn()
 * @param <OUT_VALUE>
 */
public class ReducerOutput<OUT_VALUE> {
  // stage the outValue belongs to
  private String stageName;
  // out value
  private OUT_VALUE outValue;
 
  public String getStageName() {
    return stageName;
  }
 
  public OUT_VALUE getOutValue() {
    return outValue;
  }
 
  public void setStageName(String stageName) {
    this.stageName = stageName;
  }
 
  public void setOutValue(OUT_VALUE outValue) {
    this.outValue = outValue;
  }
}

 

To understand this API better, let's take an example. We want to inner join the Customers dataset to the Purchases dataset on cust_id.

Customers:

cust_id | cust_name | location
1       | Alex      | Canada
2       | Bob       | California
3       | Cathie    | Texas

 

Purchases:

purchase_id | cust_id | store
23          | 1       | Walmart
24          | 2       | Dstore
25          | 2       | Walmart

  1. Hydrator will provide input records, one by one, from Customers and Purchases to the plugin's reduceOn() method, along with the name of the stage the record is coming from. For example,
    for Customers, reduceOn() will get <Customers: (1, Alex, Canada)>, <Customers: (2, Bob, California)>
  2. In reduceOn(), the plugin will emit intermediate output records with the join key and the value tagged with the stage name the record belongs to. The plugin developer can change the intermediate output value if she wants.
    For example, for <Customers: (1, Alex, Canada)> => <1, Customers: (1, Alex, Canada)> will be emitted as intermediate output
  3. Hydrator will group the emitted keys and provide an iterable over the intermediate output values (see the sketch after this list). For example,
    1 -> <Customers: (1, Alex, Canada), Purchases: (23, 1, Walmart)>
    2 -> <Customers: (2, Bob, California), Purchases: (24, 2, Dstore), Purchases: (25, 2, Walmart)>
  4. The plugin developer, in the reduce() method, is responsible for actually joining the records. So the final output after inner joining records from Customers and Purchases on cust_id will be:
     1 -> <Customers: (1, Alex, Canada), Purchases: (23, 1, Walmart)>
     2 -> <Customers: (2, Bob, California), Purchases: (24, 2, Dstore)>, <Customers: (2, Bob, California), Purchases: (25, 2, Walmart)>
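
To make steps 2 and 3 concrete, below is a toy, self-contained sketch of the keying and grouping that Hydrator would perform. Plain strings stand in for StructuredRecords; the class and variable names here are illustrative only and are not part of the proposed API.

ReduceOnGroupingExample.java (illustrative sketch)

import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ReduceOnGroupingExample {

  public static void main(String[] args) {
    // Step 2: each record has been keyed on cust_id and tagged with its stage name.
    List<String[]> keyed = Arrays.asList(
      new String[] {"1", "Customers: (1, Alex, Canada)"},
      new String[] {"2", "Customers: (2, Bob, California)"},
      new String[] {"1", "Purchases: (23, 1, Walmart)"},
      new String[] {"2", "Purchases: (24, 2, Dstore)"},
      new String[] {"2", "Purchases: (25, 2, Walmart)"});

    // Step 3: group the tagged values by the emitted reduce key.
    Map<String, List<String>> grouped = new LinkedHashMap<>();
    for (String[] kv : keyed) {
      if (!grouped.containsKey(kv[0])) {
        grouped.put(kv[0], new ArrayList<String>());
      }
      grouped.get(kv[0]).add(kv[1]);
    }

    // Prints:
    // 1 -> [Customers: (1, Alex, Canada), Purchases: (23, 1, Walmart)]
    // 2 -> [Customers: (2, Bob, California), Purchases: (24, 2, Dstore), Purchases: (25, 2, Walmart)]
    for (Map.Entry<String, List<String>> entry : grouped.entrySet()) {
      System.out.println(entry.getKey() + " -> " + entry.getValue());
    }
  }
}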

For MapReduce:

In the map phase, Hydrator will call reduceOn() on each record from each stage. In the reduce phase, Hydrator will call reduce() with the list of intermediate output records for each join key.
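
As a rough illustration, below is a skeleton of how the two plugin methods would map onto the two phases, written against the plain Hadoop MapReduce API rather than Hydrator's internal wrappers. The class names, placeholder keys, and serialization are hypothetical; the sketch only shows where reduceOn() and reduce() would be invoked.

ReducerPluginJobSketch.java (illustrative sketch)

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ReducerPluginJobSketch {

  // Map phase: the engine would call plugin.reduceOn() here and write each emitted
  // KeyValue<REDUCE_KEY, ReducerOutput<OUT_VALUE>> as a serialized (key, tagged value) pair.
  public static class ReduceOnMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable offset, Text record, Context context) throws IOException, InterruptedException {
      // placeholder serialization; the real engine would encode the reduce key and
      // the stage-tagged intermediate value emitted by reduceOn()
      context.write(new Text("reduceKey"), new Text("stageName:taggedValue"));
    }
  }

  // Reduce phase: the engine would call plugin.reduce() here with all tagged values
  // that share the same reduce key, and write whatever the plugin emits.
  // (Hadoop's Reducer is fully qualified to avoid clashing with the plugin interface above.)
  public static class PluginReducer extends org.apache.hadoop.mapreduce.Reducer<Text, Text, NullWritable, Text> {
    @Override
    protected void reduce(Text reduceKey, Iterable<Text> taggedValues, Context context) throws IOException, InterruptedException {
      for (Text value : taggedValues) {
        context.write(NullWritable.get(), value);
      }
    }
  }
}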

 

For Spark:

In Spark, instead of using Spark's join() API, we can use groupByKey() to group all the records by key, which gives us an RDD of type [K, Iterable[V]] that can be passed to reduce().
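
Below is a toy, self-contained sketch of that shape, assuming the Spark 1.x Java API. Plain strings stand in for StructuredRecords, and the inner-join logic inside the flatMap is only a stand-in for the plugin's reduce() call; the class name and data are illustrative only.

GroupByKeyJoinSketch.java (illustrative sketch)

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;

import scala.Tuple2;

public class GroupByKeyJoinSketch {

  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("join-sketch").setMaster("local"));

    // Intermediate output of reduceOn(): (cust_id, "<stageName>:<record>") pairs.
    JavaPairRDD<String, String> keyed = sc.parallelizePairs(Arrays.asList(
      new Tuple2<>("1", "Customers:(1, Alex, Canada)"),
      new Tuple2<>("2", "Customers:(2, Bob, California)"),
      new Tuple2<>("1", "Purchases:(23, 1, Walmart)"),
      new Tuple2<>("2", "Purchases:(24, 2, Dstore)"),
      new Tuple2<>("2", "Purchases:(25, 2, Walmart)")));

    // groupByKey() plays the role of the shuffle; the flatMap plays the role of reduce().
    JavaRDD<String> joined = keyed.groupByKey().flatMap(
      new FlatMapFunction<Tuple2<String, Iterable<String>>, String>() {
        @Override
        public Iterable<String> call(Tuple2<String, Iterable<String>> group) {
          List<String> out = new ArrayList<>();
          for (String left : group._2()) {
            for (String right : group._2()) {
              // pair records only across different stages (inner join semantics)
              if (left.startsWith("Customers") && right.startsWith("Purchases")) {
                out.add(left + " + " + right);
              }
            }
          }
          return out;
        }
      });

    System.out.println(joined.collect());
    sc.stop();
  }
}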

Implementation 

The config for this plugin will look like:

config.json

 

{
  "stages": [
    {
      "name": "customers",
      "plugin": {
        "name": "Table",
        "type": "batchsource", ...
      }
    },
    {
      "name": "purchases",
      "plugin": {
        "name": "Table",
        "type": "batchsource", ...
      }
    },
    {
      "name": "transactions",
      "plugin": {
        "name": "Table",
        "type": "batchsource", ...
      }
    },
    {
      "name": "join",
      "plugin": {
        "name": "InnerJoin",
        "type": "reducer",
        "properties": {
          "reduceKeys": "customers.cust_id, purchases.cust_id, transactions.cust_id, <stageName>.<join_field>.."
        }
      }
    },
    ...
  ],
  "connections": [
    { "from": "customers", "to": "join" },
    { "from": "purchases", "to": "join" },
    { "from": "transactions", "to": "join" },
    { "from": "join", "to": "sink" }
  ]
}

 

 

Below is a sample of how we can implement an InnerJoin plugin that joins records from two stages using the above Java APIs.

InnerJoin.java

 

@Plugin(type = "reducer")
@Name("InnerJoin")
public class InnerJoin implements Reducer<StructuredRecord, StructuredRecord, StructuredRecord, StructuredRecord> {
  private final InnerJoinConfig config;
  private Map<String, String> stageNameToJoinKey;

  public InnerJoin(InnerJoinConfig config) {
    this.config = config;
  }

  public static class InnerJoinConfig extends PluginConfig {
    private String reduceKeys;

    public Map<String, String> getStageNameToJoinKey() {
      Map<String, String> fields = new HashMap<>();
      // parse the stage name and join key for each stage from the comma-separated "reduceKeys" property
      for (String field : Splitter.on(',').trimResults().split(reduceKeys)) {
        Iterable<String> list = Splitter.on('.').trimResults().split(field);
        Iterator<String> iterator = list.iterator();
        String stageName = iterator.next();
        String joinKey = iterator.next();
        fields.put(stageName, joinKey);
      }
      return fields;
    }
  }

  public void configurePipeline(PipelineConfigurer configurer) {
    StageConfigurer stageConfigurer = configurer.getStageConfigurer();
    // getSchema() on the config is omitted here for brevity
    stageConfigurer.setOutputSchema(config.getSchema());
    stageNameToJoinKey = config.getStageNameToJoinKey();
  }

  public void reduceOn(ReducerInput<StructuredRecord> input, Emitter<KeyValue<StructuredRecord, ReducerOutput<StructuredRecord>>> emitter) {
    // emit a key-value pair with the key being the join key and the value being the out value tagged with the stage name
    String stageName = input.getStageName();
    String joinKey = stageNameToJoinKey.get(stageName);
    ReducerOutput<StructuredRecord> output = new ReducerOutput<>();
    // tag the out value with its stage name
    output.setStageName(stageName);
    output.setOutValue(getRecordSubset(input, joinKey));
    // getRecordSubset() and getKeyedRecord() are helper methods, not shown here
    emitter.emit(getKeyedRecord(joinKey, output));
  }

  public void reduce(StructuredRecord joinKey, Iterable<ReducerOutput<StructuredRecord>> outRecords, Emitter<StructuredRecord> emitter) {
    // inner join records from different stages that share the same joinKey;
    // the stage-name ordering ensures each cross-stage pair is emitted only once
    for (ReducerOutput<StructuredRecord> record : outRecords) {
      for (ReducerOutput<StructuredRecord> anotherRecord : outRecords) {
        if (record.getStageName().compareTo(anotherRecord.getStageName()) < 0) {
          emitter.emit(getReducedOutput(record, anotherRecord));
        }
      }
    }
  }
}

 

Pros:

  • The plugin API is very generic. The plugin developer has flexibility and control over the operations performed on grouped records in the reduce() method.

Cons:

  • It is not inherently suited to Spark. Because this is more of a map/reduce-style API, it will not use Spark's join API, so it may not be the most optimized approach for implementing join on Spark.