Goals

Checklist

  •  User stories documented (Albert/Vinisha)
  •  User stories reviewed (Nitin)
  •  Design documented (Albert/Vinisha)
  •  Design reviewed (Terence/Andreas)
  •  Feature merged ()
  •  Examples and guides ()
  •  Integration tests () 
  •  Documentation for feature ()
  •  Blog post

...

  1. A data scientist wants to join data from multiple datasets to run a machine learning model. The first dataset, Customers, contains cust_id, cust_name and location. The second dataset, Purchases, contains purchase_id, cust_id and store. The third dataset, Transactions, contains cust_id, transaction_date and purchase_amount. The data scientist wants to inner join these datasets on cust_id.

    Customers:

    cust_id  cust_name  location
    1        Alex       Canada
    2        Bob        California
    3        Cathie     Texas

    Purchases:

    purchase_id  cust_id  store
    23           1        Walmart
    24           2        Dstore
    25           2        Walmart

    Transactions:

    cust_id  transaction_date  purchase_amount
    1        2015-01-23        20
    2        2015-01-12        400
    3        2014-03-28        600

    Joining the three datasets above gives the following output. Cathie (cust_id 3) has no purchases, so the inner join produces no output for her:
    1 -> <Customers: (1, Alex, Canada), Purchases: (23, 1, Walmart), Transactions: (1, 2015-01-23, 20)>

    2 -> <Customers: (2, Bob, California), Purchases: (24, 2, Dstore), Transactions: (2, 2015-01-12, 400)>

    2 -> <Customers: (2, Bob, California), Purchases: (25, 2, Walmart), Transactions: (2, 2015-01-12, 400)>

  2. A data scientist wants to join data from multiple datasets. The first dataset, Customers, contains cust_id, cust_name and location. The second dataset, Purchases, contains purchase_id, cust_id and store. The third dataset, Transactions, contains purchase_id, transaction_date and purchase_amount. The data scientist wants to inner join Customers and Purchases on cust_id, and then join the resulting dataset to Transactions on purchase_id.

    Customers:

    cust_id  cust_name  location
    1        Alex       Canada
    2        Bob        California
    3        Cathie     Texas

    Purchases:

    purchase_id  cust_id  store
    23           1        Walmart
    24           2        Dstore
    25           2        Walmart

    CustomersPurchases:

    cust_id  cust_name  location    purchase_id  store
    1        Alex       Canada      23           Walmart
    2        Bob        California  24           Dstore
    2        Bob        California  25           Walmart

    Transactions:  

    purchase_id  transaction_date  purchase_amount
    23           2015-01-23        20
    24           2015-01-12        400
    25           2014-03-28        600

    CustomersPurchasesTransactions:

    purchase_id  cust_id  cust_name  location    store    transaction_date  purchase_amount
    23           1        Alex       Canada      Walmart  2015-01-23        20
    24           2        Bob        California  Dstore   2015-01-12        400
    25           2        Bob        California  Walmart  2014-03-28        600

  3. A data scientist wants to join two or more datasets and keep only some of the columns in the join output. For example, the first dataset, Customers, has cust_id, cust_name and location. The second dataset, Purchases, has purchase_id, cust_id and store. After an inner join of these two datasets on cust_id, the output dataset should contain only cust_id, cust_name and purchase_id.

    Customers:

    cust_id  cust_name  location
    1        Alex       Canada
    2        Bob        California
    3        Cathie     Texas

    Purchases:

    purchase_id  cust_id  store
    23           1        Walmart
    24           2        Dstore
    25           2        Walmart

    CustomersPurchases:

    cust_id  cust_name  purchase_id
    1        Alex       23
    2        Bob        24
    2        Bob        25
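The expected outputs of the three use cases can be checked with a small in-memory sketch. It only illustrates the intended inner-join semantics and is not the Joiner plugin: the `inner_join` helper, the constant names and the tuple layout are all hypothetical.

```python
from collections import defaultdict
from itertools import product

# Sample records from the use cases, as tuples in schema order (hypothetical layout).
CUSTOMERS = [(1, "Alex", "Canada"), (2, "Bob", "California"), (3, "Cathie", "Texas")]
PURCHASES = [(23, 1, "Walmart"), (24, 2, "Dstore"), (25, 2, "Walmart")]
TXN_BY_CUST = [(1, "2015-01-23", 20), (2, "2015-01-12", 400), (3, "2014-03-28", 600)]
TXN_BY_PURCHASE = [(23, "2015-01-23", 20), (24, "2015-01-12", 400), (25, "2014-03-28", 600)]


def inner_join(sources, key_fns):
    """Inner-join named sources on a common key (hypothetical helper).

    sources: stage name -> list of records
    key_fns: stage name -> function returning a record's join key
    Returns (key, {stage name: record}) for every combination of matching records.
    """
    names = list(sources)
    grouped = {name: defaultdict(list) for name in names}
    for name in names:
        for rec in sources[name]:
            grouped[name][key_fns[name](rec)].append(rec)
    # Inner join: keep only keys that appear in every source.
    common = set.intersection(*(set(grouped[name]) for name in names))
    return [
        (key, dict(zip(names, combo)))
        for key in sorted(common)
        for combo in product(*(grouped[name][key] for name in names))
    ]


# Use case 1: join all three datasets on cust_id.
use_case_1 = inner_join(
    {"Customers": CUSTOMERS, "Purchases": PURCHASES, "Transactions": TXN_BY_CUST},
    {"Customers": lambda r: r[0], "Purchases": lambda r: r[1], "Transactions": lambda r: r[0]},
)

# Use case 2: Customers with Purchases on cust_id, then that result with Transactions on purchase_id.
cp_matches = inner_join(
    {"Customers": CUSTOMERS, "Purchases": PURCHASES},
    {"Customers": lambda r: r[0], "Purchases": lambda r: r[1]},
)
# (cust_id, cust_name, location) + (purchase_id, store)
customers_purchases = [m["Customers"] + (m["Purchases"][0], m["Purchases"][2]) for _, m in cp_matches]
cpt_matches = inner_join(
    {"CustomersPurchases": customers_purchases, "Transactions": TXN_BY_PURCHASE},
    {"CustomersPurchases": lambda r: r[3], "Transactions": lambda r: r[0]},
)
# purchase_id, cust_id, cust_name, location, store, transaction_date, purchase_amount
use_case_2 = [
    (key,) + m["CustomersPurchases"][:3] + (m["CustomersPurchases"][4],) + m["Transactions"][1:]
    for key, m in cpt_matches
]

# Use case 3: same join as the first step of use case 2, keeping only three columns.
use_case_3 = [(m["Customers"][0], m["Customers"][1], m["Purchases"][0]) for _, m in cp_matches]
```

Cathie (cust_id 3) appears in none of the outputs because she has no purchases, which is the inner-join behavior the use cases describe.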

...

  1. Hydrator will pass input records from Customers and Purchases to the Joiner plugin one at a time, together with the name of the stage each record came from.
  2. The plugin's joinOn() method will interpret the schema of that stage and return the join key for the record.
    For example, for Customers, joinOn(Customers, (1, Alex, Canada)) will return 1; for Purchases, joinOn(Purchases, (23, 1, Walmart)) will return 1.
  3. Hydrator will be responsible for joining the records according to the join type, using one of the following two engines:
    1. For MapReduce:
      In the map() phase, joinOn() will be called for each record from each stage, and an intermediate output consisting of the join key and the record tagged with its stage name will be emitted. For example:
      For the Customers record (1, Alex, Canada) => intermediate output: <1, Customers: (1, Alex, Canada)>
      For the Purchases record (23, 1, Walmart) => intermediate output: <1, Purchases: (23, 1, Walmart)>
      In the reduce() phase, Hydrator will join the intermediate output records that share a join key, according to the join type. For example:
      For cust_id = 1, the output of an inner join will be: 1 -> <Customers: (1, Alex, Canada), Purchases: (23, 1, Walmart)>
    2. For Spark:
      Spark provides a join API for pair RDDs. Hydrator can call joinOn() on each record from each source to build a pair RDD keyed by the join key, and then use Spark's join API on the pair RDDs: rdd1.join(rdd2).
      To join more than two sources on the same key, Spark's join() can be called multiple times.
      For example, for the inputs Customers: (1, Alex, Canada) and Purchases: (23, 1, Walmart), the output of an inner join will be: 1 -> <Customers: (1, Alex, Canada), Purchases: (23, 1, Walmart)>
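The two execution strategies can be simulated on plain Python lists. This is a sketch under stated assumptions, not Hydrator code: `join_on` is a hypothetical stand-in for the plugin's joinOn() that reads a single assumed key field, cust_id, from hard-coded schemas; `reduce_phase` folds the shuffle into the reducer; and `pair_join` only mimics the (k, (v1, v2)) result shape of Spark's pair-RDD join (PySpark's `RDD.join` returns the same shape).

```python
from collections import defaultdict
from itertools import product

# Hypothetical schemas: field names per stage, used to locate the join key.
SCHEMAS = {
    "Customers": ("cust_id", "cust_name", "location"),
    "Purchases": ("purchase_id", "cust_id", "store"),
    "Transactions": ("cust_id", "transaction_date", "purchase_amount"),
}
JOIN_FIELD = "cust_id"  # assumed single-field join key


def join_on(stage_name, record):
    """Stand-in for the plugin's joinOn(): read the join key using the stage's schema."""
    return record[SCHEMAS[stage_name].index(JOIN_FIELD)]


# --- MapReduce ---------------------------------------------------------
def map_phase(inputs):
    """map(): emit <join key, (stage name, record)> for every (stage name, record) input."""
    for stage_name, record in inputs:
        yield join_on(stage_name, record), (stage_name, record)


def reduce_phase(intermediate, stages):
    """Shuffle (group by key), then reduce(): inner-join the tagged records of each key."""
    groups = defaultdict(lambda: defaultdict(list))
    for key, (stage_name, record) in intermediate:
        groups[key][stage_name].append(record)
    output = []
    for key in sorted(groups):
        per_stage = [groups[key][s] for s in stages]
        # Inner join: product() yields nothing if any stage has no record for this key.
        for combo in product(*per_stage):
            output.append((key, dict(zip(stages, combo))))
    return output


# --- Spark ---------------------------------------------------------------
def pair_join(rdd1, rdd2):
    """Mimics a pair-RDD join on lists of (key, value): (k, (v1, v2)) per matching pair."""
    right = defaultdict(list)
    for k, v in rdd2:
        right[k].append(v)
    return [(k, (v1, v2)) for k, v1 in rdd1 for v2 in right.get(k, [])]


def to_pairs(stage_name, records):
    """Key each record by joinOn(), as Hydrator would before calling join()."""
    return [(join_on(stage_name, r), r) for r in records]


inputs = [("Customers", (1, "Alex", "Canada")), ("Purchases", (23, 1, "Walmart"))]
mr_output = reduce_phase(map_phase(inputs), ["Customers", "Purchases"])

customers = to_pairs("Customers", [(1, "Alex", "Canada")])
purchases = to_pairs("Purchases", [(23, 1, "Walmart")])
transactions = to_pairs("Transactions", [(1, "2015-01-23", 20)])
spark_output = pair_join(customers, purchases)
# More than two sources: call join() again on the previous result.
three_way = pair_join(spark_output, transactions)
```

Chaining joins nests the values as ((v1, v2), v3), so a Spark-based implementation would need to flatten them back into per-stage records before handing them on.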


Implementation:

The plugin's config.json will look like the following. Because join keys are specified with special characters (such as '.', '=' and '&'), stage names will need to be validated.

...
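One possible stage-name check is sketched below, assuming the reserved characters are exactly the '.', '=' and '&' separators used in the join key syntax, and additionally assuming that stage names must be unique so every key reference is unambiguous. The function name and the exact rule are hypothetical.

```python
# Characters that the join key syntax uses as separators
# (stage.field, "=" between stages, "&" between conditions). Assumed set.
RESERVED_CHARS = frozenset(".=&")


def validate_stage_names(stage_names):
    """Reject stage names that are empty, duplicated, or contain a reserved character.

    Hypothetical rule for illustration; the design only states that stage names must be validated.
    """
    seen = set()
    for name in stage_names:
        if not name:
            raise ValueError("Stage names must not be empty")
        bad = sorted(RESERVED_CHARS & set(name))
        if bad:
            raise ValueError(f"Stage name {name!r} contains reserved characters {bad}")
        if name in seen:
            raise ValueError(f"Duplicate stage name {name!r}")
        seen.add(name)


validate_stage_names(["Customers", "Purchases", "Transactions"])  # passes silently
```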

  • Joins can be performed only on equality conditions.
  • For composite keys, join conditions can be combined only with AND, not with OR.
  • The Joiner plugin will be available only for batch pipelines, not realtime pipelines, because it requires MapReduce or Spark programs to execute.
  • The Joiner plugin will not support a 'where' clause as a SQL join does.
  • The Joiner plugin requires a join key from every source for each join condition. For example, with three inputs, Customers, Transactions and Purchases, the join keys must be Customers.id=Transactions.id=Purchases.id&Customers.name=Transactions.name=Purchases.name. We cannot use Customers.id=Transactions.id=Purchases.id&Customers.name=Purchases.name, because its second condition has no key from Transactions. To get the same result as that condition, we can use the Joiner plugin twice in the data pipeline: first join Customers and Purchases on Customers.id=Purchases.id&Customers.name=Purchases.name, and then join the result with Transactions on Transactions.id.
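The rule that every condition needs a key from every source can be enforced while parsing the join key string. A minimal sketch, assuming the syntax shown in the examples above (`<stage>.<field>` terms separated by '=', conditions combined with '&' meaning AND); `parse_join_keys` and the intermediate stage name `CustomersPurchases` are hypothetical.

```python
def parse_join_keys(expression, stages):
    """Parse "A.f=B.f=C.f&A.g=B.g=C.g" into one {stage: field} dict per condition.

    Each '&'-separated condition (AND only) is an equality over '='-separated
    <stage>.<field> terms and must reference every stage in `stages` exactly once.
    """
    conditions = []
    for condition in expression.split("&"):
        fields = {}
        for term in condition.split("="):
            stage, dot, field = term.strip().partition(".")
            if not dot or not stage or not field:
                raise ValueError(f"Expected <stage>.<field>, got {term!r}")
            if stage in fields:
                raise ValueError(f"Stage {stage!r} appears twice in {condition!r}")
            fields[stage] = field
        missing = sorted(set(stages) - set(fields))
        unknown = sorted(set(fields) - set(stages))
        if missing or unknown:
            raise ValueError(f"Condition {condition!r}: missing {missing}, unknown {unknown}")
        conditions.append(fields)
    return conditions


stages = ["Customers", "Transactions", "Purchases"]
valid = parse_join_keys(
    "Customers.id=Transactions.id=Purchases.id"
    "&Customers.name=Transactions.name=Purchases.name",
    stages,
)

rejected = None
try:
    parse_join_keys("Customers.id=Transactions.id=Purchases.id&Customers.name=Purchases.name", stages)
except ValueError as err:
    rejected = str(err)  # the second condition has no key from Transactions

# Workaround with two Joiner stages; "CustomersPurchases" names the first join's output (hypothetical).
step1 = parse_join_keys("Customers.id=Purchases.id&Customers.name=Purchases.name", ["Customers", "Purchases"])
step2 = parse_join_keys("CustomersPurchases.id=Transactions.id", ["CustomersPurchases", "Transactions"])
```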