Goals
Checklist
- User stories documented (Albert/Vinisha)
- User stories reviewed (Nitin)
- Design documented (Albert/Vinisha)
- Design reviewed (Terence/Andreas)
- Feature merged ()
- Examples and guides ()
- Integration tests ()
- Documentation for feature ()
- Blog post
...
- A data scientist wants to join data from multiple datasets to run a machine learning model. The first dataset, Customers, contains cust_id, cust_name and location. The second dataset, Purchases, contains purchase_id, cust_id and store. The third dataset, Transactions, contains cust_id, transaction_date and purchase_amount. The data scientist wants to inner join these datasets on cust_id.
Customers:

| cust_id | cust_name | location |
|---------|-----------|------------|
| 1 | Alex | Canada |
| 2 | Bob | California |
| 3 | Cathie | Texas |

Purchases:

| purchase_id | cust_id | store |
|-------------|---------|---------|
| 23 | 1 | Walmart |
| 24 | 2 | Dstore |
| 25 | 2 | Walmart |

Transactions:

| cust_id | transaction_date | purchase_amount |
|---------|------------------|-----------------|
| 1 | 2015-01-23 | 20 |
| 2 | 2015-01-12 | 400 |
| 3 | 2014-03-28 | 600 |

Joining the above three datasets on cust_id gives the following output:
1 -> <Customers: (1, Alex, Canada), Purchases: (23, 1, Walmart), Transactions: (1, 2015-01-23, 20)>
2 -> <Customers: (2, Bob, California), Purchases: (24, 2, Dstore), Transactions: (2, 2015-01-12, 400)>
2 -> <Customers: (2, Bob, California), Purchases: (25, 2, Walmart), Transactions: (2, 2015-01-12, 400)>
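The semantics of this user story can be sketched in plain Python (illustrative only — this is not the Hydrator plugin API; the record tuples and helper names here are assumptions):

```python
from collections import defaultdict

def inner_join_on_key(datasets, key_of):
    """Inner-join several named datasets on a shared key.

    datasets: dict mapping stage name -> list of record tuples
    key_of:   dict mapping stage name -> function extracting the join key
    Returns a list of (key, {stage: record, ...}) combinations.
    """
    # Group each stage's records by join key.
    grouped = {stage: defaultdict(list) for stage in datasets}
    for stage, records in datasets.items():
        for record in records:
            grouped[stage][key_of[stage](record)].append(record)

    # A key survives an inner join only if every stage has it.
    common_keys = set.intersection(*(set(g) for g in grouped.values()))

    output = []
    for key in sorted(common_keys):
        # Cross product of the matching records from each stage.
        combos = [{}]
        for stage in datasets:
            combos = [dict(c, **{stage: r})
                      for c in combos for r in grouped[stage][key]]
        output.extend((key, c) for c in combos)
    return output

customers = [(1, "Alex", "Canada"), (2, "Bob", "California"), (3, "Cathie", "Texas")]
purchases = [(23, 1, "Walmart"), (24, 2, "Dstore"), (25, 2, "Walmart")]
transactions = [(1, "2015-01-23", 20), (2, "2015-01-12", 400), (3, "2014-03-28", 600)]

result = inner_join_on_key(
    {"Customers": customers, "Purchases": purchases, "Transactions": transactions},
    {"Customers": lambda r: r[0], "Purchases": lambda r: r[1], "Transactions": lambda r: r[0]},
)
for key, combo in result:
    print(key, "->", combo)
```

Note that cust_id 3 (Cathie) has no purchase, so it drops out of the inner join, while cust_id 2 appears twice — once per matching purchase.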
- A data scientist wants to join data from multiple datasets. The first dataset, Customers, contains cust_id, cust_name and location. The second dataset, Purchases, contains purchase_id, cust_id and store. The third dataset, Transactions, contains purchase_id, transaction_date and purchase_amount. The data scientist wants to inner join Customers and Purchases on cust_id; the resulting dataset can then be joined to Transactions on purchase_id.
Customers:
| cust_id | cust_name | location |
|---------|-----------|------------|
| 1 | Alex | Canada |
| 2 | Bob | California |
| 3 | Cathie | Texas |

Purchases:

| purchase_id | cust_id | store |
|-------------|---------|---------|
| 23 | 1 | Walmart |
| 24 | 2 | Dstore |
| 25 | 2 | Walmart |

CustomersPurchases:

| cust_id | cust_name | location | purchase_id | store |
|---------|-----------|------------|-------------|---------|
| 1 | Alex | Canada | 23 | Walmart |
| 2 | Bob | California | 24 | Dstore |
| 2 | Bob | California | 25 | Walmart |

Transactions:

| purchase_id | transaction_date | purchase_amount |
|-------------|------------------|-----------------|
| 23 | 2015-01-23 | 20 |
| 24 | 2015-01-12 | 400 |
| 25 | 2014-03-28 | 600 |

CustomersPurchasesTransactions:

| purchase_id | cust_id | cust_name | location | store | transaction_date | purchase_amount |
|-------------|---------|-----------|------------|---------|------------------|-----------------|
| 23 | 1 | Alex | Canada | Walmart | 2015-01-23 | 20 |
| 24 | 2 | Bob | California | Dstore | 2015-01-12 | 400 |
| 25 | 2 | Bob | California | Walmart | 2014-03-28 | 600 |

- A data scientist wants to join two or more datasets and select which columns are present in the output of the join. For example, Customers has cust_id, cust_name and location. The second dataset, Purchases, has purchase_id, cust_id and store. After an inner join of these two datasets on cust_id, the output dataset should have cust_id, cust_name and purchase_id.
Customers:
| cust_id | cust_name | location |
|---------|-----------|------------|
| 1 | Alex | Canada |
| 2 | Bob | California |
| 3 | Cathie | Texas |

Purchases:

| purchase_id | cust_id | store |
|-------------|---------|---------|
| 23 | 1 | Walmart |
| 24 | 2 | Dstore |
| 25 | 2 | Walmart |

CustomersPurchases:

| cust_id | cust_name | purchase_id |
|---------|-----------|-------------|
| 1 | Alex | 23 |
| 2 | Bob | 24 |
| 2 | Bob | 25 |
...
- Hydrator will provide input records from Customers and Purchases, one by one, along with the stage name, to the Joiner plugin.
- The joinOn() method in the plugin will interpret the schema for that stage and return the join key for each record.
For example, for Customers, joinOn(Customers, (1, Alex, Canada)) will return 1; for Purchases, joinOn(Purchases, (23, 1, Walmart)) will return 1.
- Hydrator will be responsible for joining records based on the type of join, using the following two engines:
- For Map Reduce:
In the map() phase, joinOn() will be called for each stage record, and an intermediate output with the tagged record will be emitted. For example:
For Customers record, (1, Alex, Canada) => Intermediate output: <1, Customers: (1, Alex, Canada)>
For Purchases record, (23, 1, Walmart) => Intermediate output: <1, Purchases: (23, 1, Walmart)>
In the reduce() phase, depending on the plugin type, Hydrator will join the intermediate output records. For example:
For cust_id = 1, the output of the inner join will be: 1 -> <Customers: (1, Alex, Canada), Purchases: (23, 1, Walmart)>
- For Spark:
Spark provides a join API for paired RDDs. joinOn() can be called from Hydrator for each record from each source, and the Spark join API can then be used on the paired RDDs: rdd1.join(rdd2).
To join more than two sources on the same key, the Spark join() API can be called multiple times.
For example, for inputs Customers: (1, Alex, Canada) and Purchases: (23, 1, Walmart), after an inner join the output will be: 1 -> <Customers: (1, Alex, Canada), Purchases: (23, 1, Walmart)>
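The tag-and-group flow described for both engines can be emulated in plain Python (a sketch of the shuffle semantics under assumed record layouts, not the actual MapReduce or Spark code):

```python
from collections import defaultdict
from itertools import product

def map_phase(stage, records, join_on):
    """Emit <join_key, (stage, record)> pairs, mirroring the map() tagging."""
    return [(join_on(stage, r), (stage, r)) for r in records]

def reduce_phase(tagged, stages):
    """Group tagged records by key. An inner join keeps a key only when
    every stage contributed at least one record, then emits the cross
    product of the matching records."""
    by_key = defaultdict(lambda: defaultdict(list))
    for key, (stage, record) in tagged:
        by_key[key][stage].append(record)

    joined = {}
    for key, per_stage in by_key.items():
        if all(stage in per_stage for stage in stages):
            joined[key] = list(product(*(per_stage[s] for s in stages)))
    return joined

# Mirrors the plugin's joinOn(): pick the cust_id field per stage.
def join_on(stage, record):
    return record[0] if stage == "Customers" else record[1]

customers = [(1, "Alex", "Canada"), (2, "Bob", "California")]
purchases = [(23, 1, "Walmart"), (24, 2, "Dstore")]

tagged = (map_phase("Customers", customers, join_on)
          + map_phase("Purchases", purchases, join_on))
result = reduce_phase(tagged, ["Customers", "Purchases"])
# result[1] == [((1, 'Alex', 'Canada'), (23, 1, 'Walmart'))]
```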
Implementation:
The config.json for the plugin will look like the following. As special characters are used for providing join keys, stage names will need to be validated.
...
- Join can be applied only on equality
- Join is applied only with AND for composite keys, not OR
- The Joiner plugin will be available for batch and not realtime, as it requires MapReduce or Spark programs to be executed
- The Joiner plugin will not support a 'where' clause like a SQL join
- The Joiner plugin requires a join key from each source for each join condition. For example, with three inputs Customers, Transactions and Purchases, the join keys must be of the form Customers.id=Transactions.id=Purchases.id&Customers.name=Transactions.name=Purchases.name. A condition like Customers.id=Transactions.id=Purchases.id&Customers.name=Purchases.name is not allowed. To achieve the same result, the Joiner plugin can be used twice in the data pipeline: first join on Customers.id=Purchases.id&Customers.name=Purchases.name, then join the result to Transactions on Transactions.id.
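The two-step workaround above can be sketched as follows (illustrative Python; the field layouts and sample data are assumptions):

```python
def inner_join(left, right, left_key, right_key):
    """Generic inner join: concatenate matching rows from left and right."""
    return [l + r for l in left for r in right if left_key(l) == right_key(r)]

# Assumed layouts: Customers (id, name), Purchases (id, name, store),
# Transactions (id, amount).
customers = [(1, "Alex"), (2, "Bob")]
purchases = [(1, "Alex", "Walmart"), (2, "Bob", "Dstore")]
transactions = [(1, 100), (2, 400)]

# Step 1: composite key, Customers.id = Purchases.id AND
# Customers.name = Purchases.name.
step1 = inner_join(
    customers, purchases,
    left_key=lambda c: (c[0], c[1]),
    right_key=lambda p: (p[0], p[1]),
)

# Step 2: join the intermediate result to Transactions on id only.
step2 = inner_join(step1, transactions,
                   left_key=lambda row: row[0],
                   right_key=lambda t: t[0])
```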