Hydrator 3.5 - Joins

Goals

Checklist

  • User stories documented (Albert/Vinisha)
  • User stories reviewed (Nitin)
  • Design documented (Albert/Vinisha)
  • Design reviewed (Terence/Andreas)
  • Feature merged ()
  • Examples and guides ()
  • Integration tests () 
  • Documentation for feature ()
  • Blog post

Use Cases

  1. A data scientist wants to join data from multiple datasets to run a machine learning model. The first dataset, Customers, contains cust_id, cust_name, and location. The second dataset, Purchases, contains purchase_id, cust_id, and store. The third dataset, Transactions, contains cust_id, transaction_date, and purchase_amount. The data scientist wants to inner join these datasets on cust_id.

    Customers:

    cust_id | cust_name | location
    1       | Alex      | Canada
    2       | Bob       | California
    3       | Cathie    | Texas

    Purchases:

    purchase_id | cust_id | store
    23          | 1       | Walmart
    24          | 2       | Dstore
    25          | 2       | Walmart

    Transactions:

    cust_id | transaction_date | purchase_amount
    1       | 2015-01-23       | 20
    2       | 2015-01-12       | 400
    3       | 2014-03-28       | 600

     

    Joining the above three datasets would give the following output:
    1 -> <Customers: (1, Alex, Canada), Purchases: (23, 1, Walmart), Transactions: (1, 2015-01-23, 20)>

    2 -> <Customers: (2, Bob, California), Purchases: (24, 2, Dstore), Transactions: (2, 2015-01-12, 400)>

    2 -> <Customers: (2, Bob, California), Purchases: (25, 2, Walmart), Transactions: (2, 2015-01-12, 400)>

  2. A data scientist wants to join data from multiple datasets. The first dataset, Customers, contains cust_id, cust_name, and location. The second dataset, Purchases, contains purchase_id, cust_id, and store. The third dataset, Transactions, contains purchase_id, transaction_date, and purchase_amount. The data scientist wants to inner join Customers and Purchases on cust_id; the resulting dataset can then be joined to Transactions on purchase_id.

    Customers:

    cust_id | cust_name | location
    1       | Alex      | Canada
    2       | Bob       | California
    3       | Cathie    | Texas

    Purchases:

    purchase_id | cust_id | store
    23          | 1       | Walmart
    24          | 2       | Dstore
    25          | 2       | Walmart

    CustomersPurchases:

    cust_id | cust_name | location   | purchase_id | store
    1       | Alex      | Canada     | 23          | Walmart
    2       | Bob       | California | 24          | Dstore
    2       | Bob       | California | 25          | Walmart

    Transactions:  

    purchase_id | transaction_date | purchase_amount
    23          | 2015-01-23       | 20
    24          | 2015-01-12       | 400
    25          | 2014-03-28       | 600

    CustomersPurchasesTransactions:

    purchase_id | cust_id | cust_name | location   | store   | transaction_date | purchase_amount
    23          | 1       | Alex      | Canada     | Walmart | 2015-01-23       | 20
    24          | 2       | Bob       | California | Dstore  | 2015-01-12       | 400
    25          | 2       | Bob       | California | Walmart | 2014-03-28       | 600
  3. A data scientist wants to join two or more datasets and wants to select which columns are present in the output of the join. For example, Customers has cust_id, cust_name, and location. The second dataset, Purchases, has purchase_id, cust_id, and store. After an inner join of these two datasets on cust_id, the output dataset should have cust_id, cust_name, and purchase_id.

    Customers:

    cust_id | cust_name | location
    1       | Alex      | Canada
    2       | Bob       | California
    3       | Cathie    | Texas

    Purchases:

    purchase_id | cust_id | store
    23          | 1       | Walmart
    24          | 2       | Dstore
    25          | 2       | Walmart

    CustomersPurchases:

    cust_id | cust_name | purchase_id
    1       | Alex      | 23
    2       | Bob       | 24
    2       | Bob       | 25

User Stories

  1. As a pipeline developer, I want to be able to join (inner, left outer, right outer, full outer) two or more stage outputs on some common fields, or do a cross join.
  2. As a pipeline developer, I want to be able to get metrics on the number of records into and out of the join.
  3. [UI] As a pipeline developer, I want to be able to see the schemas of all inputs into the join, and the schema output by the join.
  4. As a pipeline developer, I want to be able to choose whether the pipeline with the join runs with MapReduce or Spark.
  5. As a plugin developer, I want to be able to write a plugin that gets data from multiple stages and joins them.

Design

To support joins on two or more stages, we will need a new plugin type. We proposed 2 designs for this plugin; after discussion, we decided to implement the join with Design 1.

Design 1:

In this design we introduce a new Joiner plugin type. The API for this plugin will look like:

Joiner.java
public interface Joiner<RECORD_KEY, RECORD> {

  /**
   * Returns the join key for a record, based on the record and the name of the
   * stage to which the record belongs.
   *
   * @param stageName name of the stage to which the record belongs
   * @param record record which needs to be joined
   * @return the join key
   * @throws Exception if there is an error getting the join key
   */
  RECORD_KEY joinOn(String stageName, RECORD record) throws Exception;
}
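As an illustration only (not part of the proposal), a minimal implementation of this interface might look as follows, assuming records are plain maps of field name to value and every input stage keys on cust_id. The interface is inlined so the sketch is self-contained, and CustIdJoiner is a hypothetical name:

```java
import java.util.Map;

// Inlined copy of the proposed Joiner interface, so this sketch compiles standalone.
interface Joiner<RECORD_KEY, RECORD> {
  RECORD_KEY joinOn(String stageName, RECORD record) throws Exception;
}

// Hypothetical joiner: records are maps, and every stage keys on "cust_id".
// A joiner for stages whose key fields have different names would branch on stageName.
class CustIdJoiner implements Joiner<Object, Map<String, Object>> {
  @Override
  public Object joinOn(String stageName, Map<String, Object> record) {
    return record.get("cust_id");
  }
}
```

For the example datasets below, both joinOn("Customers", (1, Alex, Canada)) and joinOn("Purchases", (23, 1, Walmart)) would return 1.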

 

To understand the API, let's take a simple example: we want to inner join the Customers dataset to Purchases on cust_id.

Customers:

cust_id | cust_name | location
1       | Alex      | Canada
2       | Bob       | California
3       | Cathie    | Texas

Purchases:

purchase_id | cust_id | store
23          | 1       | Walmart
24          | 2       | Dstore
25          | 2       | Walmart

 

  1. Hydrator will provide input records from Customers and Purchases, one by one, to the Joiner plugin, along with the name of the stage each record belongs to.
  2. The joinOn() method in the plugin will interpret the schema for that stage and return the join key for each record. 
    For example, for Customers, joinOn(Customers, (1, Alex, Canada)) will return 1; for Purchases, joinOn(Purchases, (23, 1, Walmart)) will return 1.
  3. Hydrator will be responsible for joining records based on the type of join, using one of the following two engines. 
    1. For MapReduce:
      In the map() phase, joinOn() will be called for each stage's records, and intermediate output with tagged records will be emitted. For example:
      For the Customers record (1, Alex, Canada) => Intermediate output: <1, Customers: (1, Alex, Canada)>
      For the Purchases record (23, 1, Walmart) => Intermediate output: <1, Purchases: (23, 1, Walmart)>
      In the reduce() phase, depending on the join type, Hydrator will join the intermediate output records. For example,
      for cust_id = 1, the output of the inner join will be: 1 -> <Customers: (1, Alex, Canada), Purchases: (23, 1, Walmart)>
    2. For Spark:
      Spark has a join API for paired RDDs. Hydrator can call joinOn() for each record from each source to build the paired RDDs, and then use the Spark join API: rdd1.join(rdd2). 
      To join more than 2 sources on the same key, the Spark join() API can be called multiple times.
      For example, for inputs Customers: (1, Alex, Canada) and Purchases: (23, 1, Walmart), after the inner join the output will be: 1 -> <Customers: (1, Alex, Canada), Purchases: (23, 1, Walmart)>
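The tag-and-reduce flow in step 3 can be sketched in plain Java (no Hadoop or Spark; the class and method names here are illustrative, not part of the proposal). The "map" side tags each record with its stage name and groups by join key; the "reduce" side emits, for each key present in every stage, one output per combination of records (an inner join):

```java
import java.util.*;

// Illustrative sketch of the tag-and-reduce inner join described above.
class InnerJoinSketch {

  // "Map" phase + shuffle: tag each record with its stage and group by join key.
  static Map<Object, Map<String, List<Map<String, Object>>>> shuffle(
      Map<String, List<Map<String, Object>>> inputsByStage, String keyField) {
    Map<Object, Map<String, List<Map<String, Object>>>> grouped = new LinkedHashMap<>();
    for (Map.Entry<String, List<Map<String, Object>>> stage : inputsByStage.entrySet()) {
      for (Map<String, Object> record : stage.getValue()) {
        Object key = record.get(keyField);  // what joinOn() would return
        grouped.computeIfAbsent(key, k -> new LinkedHashMap<>())
               .computeIfAbsent(stage.getKey(), s -> new ArrayList<>())
               .add(record);
      }
    }
    return grouped;
  }

  // "Reduce" phase: for an inner join, only keys present in every stage produce
  // output; each output takes one record from each stage (cross product per key).
  static List<Map<String, Map<String, Object>>> reduce(
      Map<Object, Map<String, List<Map<String, Object>>>> grouped, int numOfInputs) {
    List<Map<String, Map<String, Object>>> joined = new ArrayList<>();
    for (Map<String, List<Map<String, Object>>> byStage : grouped.values()) {
      if (byStage.size() < numOfInputs) {
        continue;  // key missing from at least one stage: no inner-join output
      }
      List<Map<String, Map<String, Object>>> partial = new ArrayList<>();
      partial.add(new LinkedHashMap<>());
      for (Map.Entry<String, List<Map<String, Object>>> stage : byStage.entrySet()) {
        List<Map<String, Map<String, Object>>> next = new ArrayList<>();
        for (Map<String, Map<String, Object>> combo : partial) {
          for (Map<String, Object> record : stage.getValue()) {
            Map<String, Map<String, Object>> extended = new LinkedHashMap<>(combo);
            extended.put(stage.getKey(), record);
            next.add(extended);
          }
        }
        partial = next;
      }
      joined.addAll(partial);
    }
    return joined;
  }
}
```

For the Customers/Purchases example this produces three joined records: one for cust_id 1 and two for cust_id 2 (one per matching purchase); cust_id 3 is dropped because Purchases has no record for it.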


Implementation:

The config.json for a pipeline using this plugin will look like the following. As we are using special characters to specify join keys, we will need to validate stage names.

{
  "stages": [
    {
      "name": "customers",
      "plugin": {
        "name": "Table",
        "type": "batchsource", ...
      }
    },
    {
      "name": "purchases",
      "plugin": {
        "name": "Table",
        "type": "batchsource", ...
      }
    },
    {
      "name": "transactions",
      "plugin": {
        "name": "Table",
        "type": "batchsource", ...
      }
    },
    {
      "name": "joinStage",
      "plugin": {
        "name": "Join",
        "type": "batchjoiner",
        "properties": 
		{ 
            "joinKeys": "customers.cust_id,purchases.cust_id,transactions.cust_id, <stageName1>.<join_field1>:customers.cust_name..<stageName1>.<join_field2>..",// : will be used to separate composite join key from each stage. Total number of join keys will be x * numOfInputs.
			"joinType": "innerjoin/outerjoin", // Only 2 join types are needed
			"numOfInputs": "3" // number of stages connected to joiner plugin
			"fieldsToSelect": "customers.cust_id,transactions.cust_id,<stageName>.<out_field>", // fields to be selected (without rename) in join output
			"fieldsToRename": "customers.cust_id:id,purchases.cust_id:customer_id,<stageName>.<old_field_name>:<new_field_name>", 
			"requiredInputs": "transactions,<stageName>" // Required only for outer join, to decide which stage values will be null.
        }
      }
    },
    ...
  ],
  "connections": [
    { "from": "customers", "to": "joinStage" },
    { "from": "purchases", "to": "joinStage" },
    { "from": "transactions", "to": "joinStage" },
    { "from": "innerJoin", "to": "sink" },
  ]
}
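This is also why stage names need validation: '.' and ',' act as delimiters inside the property values. A hypothetical parser for the simple (non-composite) form of joinKeys could look like the following; JoinKeyParser is an illustrative name, not proposed API:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical parser for the simple (non-composite) form of the "joinKeys"
// property: a comma-separated list of <stageName>.<fieldName> entries.
class JoinKeyParser {
  static Map<String, String> parse(String joinKeys) {
    Map<String, String> keyFieldByStage = new LinkedHashMap<>();
    for (String entry : joinKeys.split(",")) {
      String[] parts = entry.trim().split("\\.", 2);  // split on the first '.'
      if (parts.length != 2 || parts[0].isEmpty() || parts[1].isEmpty()) {
        throw new IllegalArgumentException("Expected <stage>.<field>, got: " + entry);
      }
      keyFieldByStage.put(parts[0], parts[1]);
    }
    return keyFieldByStage;
  }
}
```

A stage name containing '.' or ',' would make such a value ambiguous, which is the validation concern mentioned above.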

Outer join:

With this design, an outer join can be performed on more than two stages. Stages that are not listed in requiredInputs will have null values for their fields if they do not match the join criteria. requiredInputs is only considered for the outer join type; an inner join will not take it into consideration.

Output Schema:

As the join plugin takes multiple input schemas, there is a possibility of duplicate field names across schemas, such as customers.cust_id and purchases.cust_id. The user can remove/rename/select fields using the config properties above, similar to the Projection plugin. In order to populate the output schema in the UI, the app can provide a getOutputSchema REST endpoint. If the specified output schema contains repeating fields, the backend should send an error message listing them to the user. The user can then modify the output schema by dropping/renaming/selecting fields, and the UI will call the getOutputSchema REST endpoint again. If none of the drop/select/rename properties are used, by default the output schema will contain all fields, with field names of the form <stageName>.<fieldName>.

fieldsToDrop: All fields will be in the output schema except these fields

fieldsToSelect: Only these fields will be in output schema

fieldsToRename: Fields to be renamed
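As a sketch (assuming joined records are maps whose fields are already qualified as <stageName>.<fieldName>), applying fieldsToSelect and fieldsToRename to a single record could look like this; FieldSelector is an illustrative name, not proposed API:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: applies "fieldsToSelect" and "fieldsToRename" to one joined
// record whose field names are already qualified as <stageName>.<fieldName>.
class FieldSelector {
  static Map<String, Object> apply(Map<String, Object> joinedRecord,
                                   List<String> fieldsToSelect,
                                   Map<String, String> fieldsToRename) {
    Map<String, Object> out = new LinkedHashMap<>();
    for (String field : fieldsToSelect) {
      // A selected field keeps its value; renaming only changes the output name.
      String outName = fieldsToRename.getOrDefault(field, field);
      out.put(outName, joinedRecord.get(field));
    }
    return out;
  }
}
```

With fieldsToSelect = customers.cust_id,customers.cust_name,purchases.purchase_id and fieldsToRename = customers.cust_id:id, the output record would contain id, customers.cust_name, and purchases.purchase_id, matching use case 3.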

 

Pros:

  • The plugin does not need to worry about join logic; Hydrator will be responsible for the joins. This makes joiner plugin development easy. 

Cons:

  • Hydrator will contain all of the join logic, and the plugin will only be responsible for emitting keys. This will make the Hydrator app heavy.

Assumptions

  • Join can be applied only on equality (equi-join).
  • For composite keys, join conditions are combined only with AND, not OR.
  • The Join plugin will be available for batch and not realtime, as it requires MapReduce or Spark programs to be executed.
  • The Join plugin will not support a 'where' clause like a SQL join.
  • The Join plugin requires a join key from each source for each join condition. For example, if we have 3 inputs, Customers, Transactions, and Purchases, then the join keys should be Customers.id=Transactions.id=Purchases.id&Customers.name=Transactions.name=Purchases.name. We cannot have Customers.id=Transactions.id=Purchases.id&Customers.name=Purchases.name. To achieve the same result as that condition, we can use the Joiner plugin twice in the pipeline: first join on Customers.id=Purchases.id&Customers.name=Purchases.name, and then join the result to Transactions on id.