Hydrator 3.5 - Joins
Goals
Checklist
- User stories documented (Albert/Vinisha)
- User stories reviewed (Nitin)
- Design documented (Albert/Vinisha)
- Design reviewed (Terence/Andreas)
- Feature merged ()
- Examples and guides ()
- Integration tests ()
- Documentation for feature ()
- Blog post
Use Cases
- A data scientist wants to join data from multiple datasets to run a machine learning model. The first dataset, Customers, contains cust_id, cust_name and location. The second dataset, Purchases, contains purchase_id, cust_id and store. The third dataset, Transactions, contains cust_id, transaction_date and purchase_amount. The data scientist wants to inner join these datasets on cust_id.
Customers:
cust_id | cust_name | location |
---|---|---|
1 | Alex | Canada |
2 | Bob | California |
3 | Cathie | Texas |

Purchases:
purchase_id | cust_id | store |
---|---|---|
23 | 1 | Walmart |
24 | 2 | Dstore |
25 | 2 | Walmart |

Transactions:
cust_id | transaction_date | purchase_amount |
---|---|---|
1 | 2015-01-23 | 20 |
2 | 2015-01-12 | 400 |
3 | 2014-03-28 | 600 |

Joining the above three datasets would give us the following output:
1 -> <Customers: (1, Alex, Canada), Purchases: (23, 1, Walmart), Transactions: (1, 2015-01-23, 20)>
2 -> <Customers: (2, Bob, California), Purchases: (24, 2, Dstore), Transactions: (2, 2015-01-12, 400)>
2 -> <Customers: (2, Bob, California), Purchases: (25, 2, Walmart), Transactions: (2, 2015-01-12, 400)>
- A data scientist wants to join data from multiple datasets. The first dataset, Customers, contains cust_id, cust_name and location. The second dataset, Purchases, contains purchase_id, cust_id and store. The third dataset, Transactions, contains purchase_id, transaction_date and purchase_amount. The data scientist wants to inner join Customers and Purchases on cust_id, and then join the resulting dataset to Transactions on purchase_id.
Customers:
cust_id | cust_name | location |
---|---|---|
1 | Alex | Canada |
2 | Bob | California |
3 | Cathie | Texas |

Purchases:
purchase_id | cust_id | store |
---|---|---|
23 | 1 | Walmart |
24 | 2 | Dstore |
25 | 2 | Walmart |

CustomersPurchases:
cust_id | cust_name | location | purchase_id | store |
---|---|---|---|---|
1 | Alex | Canada | 23 | Walmart |
2 | Bob | California | 24 | Dstore |
2 | Bob | California | 25 | Walmart |

Transactions:
purchase_id | transaction_date | purchase_amount |
---|---|---|
23 | 2015-01-23 | 20 |
24 | 2015-01-12 | 400 |
25 | 2014-03-28 | 600 |

CustomersPurchasesTransactions:
purchase_id | cust_id | cust_name | location | store | transaction_date | purchase_amount |
---|---|---|---|---|---|---|
23 | 1 | Alex | Canada | Walmart | 2015-01-23 | 20 |
24 | 2 | Bob | California | Dstore | 2015-01-12 | 400 |
25 | 2 | Bob | California | Walmart | 2014-03-28 | 600 |

- A data scientist wants to join two or more datasets and select which columns are present in the output of the join. For example, Customers has cust_id, cust_name and location, and the second dataset, Purchases, has purchase_id, cust_id and store. After an inner join of these 2 datasets on cust_id, the output dataset should have cust_id, cust_name and purchase_id.
Customers:
cust_id | cust_name | location |
---|---|---|
1 | Alex | Canada |
2 | Bob | California |
3 | Cathie | Texas |

Purchases:
purchase_id | cust_id | store |
---|---|---|
23 | 1 | Walmart |
24 | 2 | Dstore |
25 | 2 | Walmart |

CustomersPurchases:
cust_id | cust_name | purchase_id |
---|---|---|
1 | Alex | 23 |
2 | Bob | 24 |
2 | Bob | 25 |

User Stories
- As a pipeline developer, I want to be able to join (inner, left outer, right outer, full outer) two or more stage outputs on some common fields, or do a cross join.
- As a pipeline developer, I want to be able to get metrics on the number of records going into and coming out of the join.
- [UI] As a pipeline developer, I want to be able to see the schemas of all inputs to the join, and the schema output by the join.
- As a pipeline developer, I want to be able to choose whether the pipeline with the join runs with MapReduce or Spark.
- As a plugin developer, I want to be able to write a plugin that gets data from multiple stages and joins them.
Design
To support joins on two or more stages, we will need a new plugin type. Two designs were proposed for this plugin; after discussion, we have decided to implement join with Design 1.
Design 1:
In this design we introduce a new Joiner plugin type. The API for this plugin will look like:
```java
public interface Joiner<RECORD_KEY, RECORD> {

  /**
   * Returns the join key for a record, based on the record and the name of the stage it belongs to.
   *
   * @param stageName name of the stage the record belongs to
   * @param record input record which needs to be joined
   * @return the join key
   * @throws Exception if there is an error getting the key for the join
   */
  public RECORD_KEY joinOn(String stageName, RECORD record) throws Exception;
}
```
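As an illustration only, a plugin implementing this interface could look roughly like the sketch below. The `FieldJoiner` class, the map-based record representation and the per-stage join field configuration are all assumptions made for the example and are not part of the proposed API; the interface is repeated so that the sketch compiles on its own.

```java
import java.util.Map;

// The interface from the design, repeated so this sketch is self-contained.
interface Joiner<RECORD_KEY, RECORD> {
  RECORD_KEY joinOn(String stageName, RECORD record) throws Exception;
}

// Hypothetical plugin: a record is modelled as a field-name -> value map, and the
// join field of each stage is passed in through the constructor.
class FieldJoiner implements Joiner<Object, Map<String, Object>> {
  private final Map<String, String> joinFieldByStage;

  FieldJoiner(Map<String, String> joinFieldByStage) {
    this.joinFieldByStage = joinFieldByStage;
  }

  @Override
  public Object joinOn(String stageName, Map<String, Object> record) {
    String field = joinFieldByStage.get(stageName);
    if (field == null) {
      throw new IllegalArgumentException("No join field configured for stage " + stageName);
    }
    // The join key is simply the value of the configured field.
    return record.get(field);
  }
}

class FieldJoinerDemo {
  public static void main(String[] args) {
    FieldJoiner joiner = new FieldJoiner(Map.of("Customers", "cust_id", "Purchases", "cust_id"));
    Map<String, Object> customer = Map.of("cust_id", 1, "cust_name", "Alex", "location", "Canada");
    Map<String, Object> purchase = Map.of("purchase_id", 23, "cust_id", 1, "store", "Walmart");
    System.out.println(joiner.joinOn("Customers", customer)); // prints 1
    System.out.println(joiner.joinOn("Purchases", purchase)); // prints 1
  }
}
```

Both records yield the same key, so the engine would place them in the same join group.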
To understand the API, let's take a simple example: we want to inner join the Customers dataset to Purchases on cust_id.
Customers:
cust_id | cust_name | location |
---|---|---|
1 | Alex | Canada |
2 | Bob | California |
3 | Cathie | Texas |
Purchases:
purchase_id | cust_id | store |
---|---|---|
23 | 1 | Walmart |
24 | 2 | Dstore |
25 | 2 | Walmart |
- Hydrator will provide input records from Customers and Purchases, one by one, to the Joiner plugin along with the stage name.
- The joinOn() method in the plugin will interpret the schema for that stage and return the join key for each record.
  For example: for Customers, joinOn(Customers, (1, Alex, Canada)) will return 1; for Purchases, joinOn(Purchases, (23, 1, Walmart)) will return 1.
- Hydrator will be responsible for joining records based on the type of join, using one of the following 2 engines.
  - For MapReduce:
    In the map() phase, joinOn() will be called for each stage record, and intermediate output with the tagged record will be emitted. For example:
    For the Customers record (1, Alex, Canada) => Intermediate output: <1, Customers: (1, Alex, Canada)>
    For the Purchases record (23, 1, Walmart) => Intermediate output: <1, Purchases: (23, 1, Walmart)>
    In the reduce() phase, depending on the join type, Hydrator will join the intermediate output records. For example, for cust_id = 1, the output of an inner join will be: 1 -> <Customers: (1, Alex, Canada), Purchases: (23, 1, Walmart)>
  - For Spark:
    Spark has a join API for pair RDDs. Hydrator can call joinOn() for each record from each source to build pair RDDs, and then use the Spark join API on them: rdd1.join(rdd2).
    To join more than 2 sources on the same key, the Spark join() API can be called multiple times.
    For example, for inputs Customers: (1, Alex, Canada) and Purchases: (23, 1, Walmart), the output after an inner join will be: 1 -> <Customers: (1, Alex, Canada), Purchases: (23, 1, Walmart)>
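The tag-then-group flow described for MapReduce can be sketched in plain Java, without any Hadoop or Spark dependency. This is only a simulation of the idea, not the Hydrator implementation: `TaggedRecord`, `mapPhase`, `reduceInner` and the `keyIndexByStage` map (a stand-in for joinOn()) are hypothetical names introduced for the example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// A record tagged with the name of the stage it came from (the map() output value).
class TaggedRecord {
  final String stageName;
  final List<Object> fields;

  TaggedRecord(String stageName, List<Object> fields) {
    this.stageName = stageName;
    this.fields = fields;
  }

  @Override
  public String toString() {
    return stageName + ": " + fields;
  }
}

class TaggedJoinSketch {

  // "Map" phase: compute the join key of every record and group tagged records by key.
  // keyIndexByStage is a hypothetical stand-in for joinOn(): the position of the
  // join field within the records of each stage.
  static Map<Object, List<TaggedRecord>> mapPhase(List<TaggedRecord> input,
                                                  Map<String, Integer> keyIndexByStage) {
    Map<Object, List<TaggedRecord>> grouped = new LinkedHashMap<>();
    for (TaggedRecord record : input) {
      Object key = record.fields.get(keyIndexByStage.get(record.stageName));
      grouped.computeIfAbsent(key, k -> new ArrayList<>()).add(record);
    }
    return grouped;
  }

  // "Reduce" phase for an inner join: for one key, emit every combination that takes
  // one record from each stage. If any stage has no record, nothing is emitted.
  static List<List<TaggedRecord>> reduceInner(List<TaggedRecord> records, List<String> stages) {
    List<List<TaggedRecord>> rows = new ArrayList<>();
    rows.add(new ArrayList<>());
    for (String stage : stages) {
      List<List<TaggedRecord>> next = new ArrayList<>();
      for (List<TaggedRecord> partial : rows) {
        for (TaggedRecord record : records) {
          if (record.stageName.equals(stage)) {
            List<TaggedRecord> extended = new ArrayList<>(partial);
            extended.add(record);
            next.add(extended);
          }
        }
      }
      rows = next;
    }
    return rows;
  }

  public static void main(String[] args) {
    List<TaggedRecord> input = List.of(
        new TaggedRecord("Customers", List.of(1, "Alex", "Canada")),
        new TaggedRecord("Customers", List.of(2, "Bob", "California")),
        new TaggedRecord("Customers", List.of(3, "Cathie", "Texas")),
        new TaggedRecord("Purchases", List.of(23, 1, "Walmart")),
        new TaggedRecord("Purchases", List.of(24, 2, "Dstore")),
        new TaggedRecord("Purchases", List.of(25, 2, "Walmart")));
    List<String> stages = List.of("Customers", "Purchases");
    Map<Object, List<TaggedRecord>> grouped =
        mapPhase(input, Map.of("Customers", 0, "Purchases", 1));
    for (Map.Entry<Object, List<TaggedRecord>> entry : grouped.entrySet()) {
      for (List<TaggedRecord> row : reduceInner(entry.getValue(), stages)) {
        System.out.println(entry.getKey() + " -> " + row);
      }
    }
  }
}
```

Running the sketch on the Customers and Purchases data yields one joined row for cust_id 1, two for cust_id 2, and none for cust_id 3, since Cathie has no purchases.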
Implementation:
The config.json for a pipeline using the plugin will look like the following. Because special characters are used as delimiters when providing join keys, we will need to validate stage names.
```json
{
  "stages": [
    {
      "name": "customers",
      "plugin": {
        "name": "Table",
        "type": "batchsource", ...
      }
    },
    {
      "name": "purchases",
      "plugin": {
        "name": "Table",
        "type": "batchsource", ...
      }
    },
    {
      "name": "transactions",
      "plugin": {
        "name": "Table",
        "type": "batchsource", ...
      }
    },
    {
      "name": "joinStage",
      "plugin": {
        "name": "Join",
        "type": "batchjoiner",
        "properties": {
          // ":" will be used to separate composite join keys from each stage.
          // Total number of join keys will be x * numOfInputs.
          "joinKeys": "customers.cust_id,purchases.cust_id,transactions.cust_id, <stageName1>.<join_field1>:customers.cust_name..<stageName1>.<join_field2>..",
          // Only 2 join types are needed.
          "joinType": "innerjoin/outerjoin",
          // Number of stages connected to the joiner plugin.
          "numOfInputs": "3",
          // Fields to be selected (without rename) in the join output.
          "fieldsToSelect": "customers.cust_id,transactions.cust_id,<stageName>.<out_field>",
          "fieldsToRename": "customers.cust_id:id,purchases.cust_id:customer_id,<stageName>.<old_field_name>:<new_field_name>",
          // Required only for outer join, to decide which stage values will be null.
          "requiredInputs": "transactions,<stageName>"
        }
      }
    },
    ...
  ],
  "connections": [
    { "from": "customers", "to": "joinStage" },
    { "from": "purchases", "to": "joinStage" },
    { "from": "transactions", "to": "joinStage" },
    { "from": "joinStage", "to": "sink" }
  ]
}
```
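One possible form of the stage name validation mentioned above is sketched here. It assumes that the reserved characters are exactly the delimiters that appear in the property values of the config ('.', ',' and ':'); the actual rule set is not specified in this design, and `StageNameValidator` is a hypothetical name.

```java
class StageNameValidator {
  // Assumption: these are the delimiters used in joinKeys, fieldsToSelect and fieldsToRename.
  private static final String RESERVED = ".,:";

  // Returns true if the stage name is non-empty and contains no reserved delimiter.
  static boolean isValid(String stageName) {
    if (stageName == null || stageName.isEmpty()) {
      return false;
    }
    for (char c : stageName.toCharArray()) {
      if (RESERVED.indexOf(c) >= 0) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    System.out.println(isValid("customers"));    // true
    System.out.println(isValid("my.customers")); // false: '.' separates stage name and field
  }
}
```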
Outer join:
With this design, an outer join can be performed on more than two stages. Every nullable input stage will have null values for its fields when its records do not match the join criteria. Nullable inputs are only relevant for the outer join type; an inner join will not take them into consideration.
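The null-padding behaviour can be sketched as follows for a single join key. The sketch assumes that every stage not listed in requiredInputs is treated as a nullable input, and that no row is emitted for a key when a required stage has no matching record; both points are assumptions of the example, and `OuterJoinSketch` and `joinForKey` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

class OuterJoinSketch {

  // recordsByStage holds, for ONE join key, the matching records of each stage.
  // Assumption: stages not in requiredStages are nullable and are padded with null.
  static List<Map<String, Object>> joinForKey(List<String> stages,
                                              Map<String, List<Object>> recordsByStage,
                                              Set<String> requiredStages) {
    List<Map<String, Object>> rows = new ArrayList<>();
    rows.add(new LinkedHashMap<>());
    for (String stage : stages) {
      List<Object> matches = recordsByStage.getOrDefault(stage, Collections.emptyList());
      if (matches.isEmpty()) {
        if (requiredStages.contains(stage)) {
          return new ArrayList<>(); // a required stage has no match: no output for this key
        }
        matches = Collections.singletonList(null); // nullable stage: pad with null
      }
      List<Map<String, Object>> next = new ArrayList<>();
      for (Map<String, Object> partial : rows) {
        for (Object record : matches) {
          Map<String, Object> extended = new LinkedHashMap<>(partial);
          extended.put(stage, record);
          next.add(extended);
        }
      }
      rows = next;
    }
    return rows;
  }

  public static void main(String[] args) {
    List<String> stages = List.of("Customers", "Purchases");
    // cust_id = 3 (Cathie) has a Customers record but no Purchases record.
    Map<String, List<Object>> cathie = Map.of("Customers", List.of("(3, Cathie, Texas)"));
    System.out.println(joinForKey(stages, cathie, Set.of("Customers")));
    System.out.println(joinForKey(stages, cathie, Set.of("Customers", "Purchases")));
  }
}
```

With only Customers required, Cathie appears in the output with a null Purchases side; with both stages required, the key produces no output, which matches inner join behaviour.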
Output Schema:
Because the join plugin takes multiple input schemas, different schemas may contain fields with the same name, such as customers.cust_id and purchases.cust_id. The user can remove, rename or select fields using the config properties above, similar to the projection plugin. To populate the output schema in the UI, the app can provide a getOutputSchema REST endpoint. If the specified output schema contains repeated field names, the backend should send the user an error message listing the repeated fields. The user can then modify the output schema by dropping, renaming or selecting fields, and the UI will call the getOutputSchema REST endpoint again. If none of the drop, select or rename properties is used, the output schema will by default contain all the fields, with field names of the form <stageName>.<fieldName>.
fieldsToDrop: All fields will be in the output schema except these fields
fieldsToSelect: Only these fields will be in output schema
fieldsToRename: Fields to be renamed
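The repeated-field check could be sketched as below. The sketch assumes that a selected field keeps its bare field name in the output unless fieldsToRename gives it a new name; that naming rule is an assumption of the example, and `OutputSchemaSketch` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class OutputSchemaSketch {

  // Output name of a qualified field "<stageName>.<fieldName>": the rename target if
  // one is configured, otherwise the bare field name (an assumption of this sketch).
  static String outputName(String qualified, Map<String, String> renames) {
    String renamed = renames.get(qualified);
    if (renamed != null) {
      return renamed;
    }
    return qualified.substring(qualified.indexOf('.') + 1);
  }

  // Output field names in selection order.
  static List<String> outputNames(List<String> selected, Map<String, String> renames) {
    List<String> names = new ArrayList<>();
    for (String qualified : selected) {
      names.add(outputName(qualified, renames));
    }
    return names;
  }

  // Output names that occur more than once; these would be reported back to the user.
  static Set<String> repeatedFields(List<String> selected, Map<String, String> renames) {
    Set<String> seen = new HashSet<>();
    Set<String> repeated = new LinkedHashSet<>();
    for (String name : outputNames(selected, renames)) {
      if (!seen.add(name)) {
        repeated.add(name);
      }
    }
    return repeated;
  }

  public static void main(String[] args) {
    List<String> selected = List.of("customers.cust_id", "customers.cust_name", "purchases.cust_id");
    System.out.println(repeatedFields(selected, Map.of()));                                   // [cust_id]
    System.out.println(repeatedFields(selected, Map.of("purchases.cust_id", "customer_id"))); // []
  }
}
```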
Pros:
- The plugin does not need to worry about join logic; Hydrator will be responsible for joins. This makes join plugin development easy.
Cons:
- Hydrator will contain all the join logic, and the plugin will only be responsible for emitting keys. This will make the Hydrator app heavy.
Assumptions
- Join can be applied only on equality
- For composite keys, join conditions are combined only with AND, not with OR.
- The join plugin will be available for batch and not for realtime, as it requires MapReduce and Spark programs to be executed.
- The join plugin will not use a 'where' clause like a SQL join.
- The join plugin requires a join key from each source for each join condition. For example, if we have 3 inputs from Customers, Transactions and Purchases, then the join keys should be Customers.id=Transactions.id=Purchases.id&Customers.name=Transactions.name=Purchases.name. We cannot have Customers.id=Transactions.id=Purchases.id&Customers.name=Purchases.name. To achieve the same result as that condition, we can use the Joiner plugin twice in the data pipeline: first join on Customers.id=Purchases.id&Customers.name=Purchases.name, and then join the result on Transactions.id.
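The equality-only, AND-only assumptions fit naturally with representing a composite join key as an ordered list of field values: two records join only if every key component is equal. The following is a minimal sketch of that idea; `CompositeKeySketch` and the map-based record representation are hypothetical, and the id and name fields follow the example above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class CompositeKeySketch {

  // Builds the composite key of a record from the configured join fields, in order.
  // List equality compares element by element, which gives AND semantics on equality.
  static List<Object> compositeKey(Map<String, Object> record, List<String> joinFields) {
    List<Object> key = new ArrayList<>();
    for (String field : joinFields) {
      key.add(record.get(field));
    }
    return key;
  }

  public static void main(String[] args) {
    List<String> joinFields = List.of("id", "name");
    Map<String, Object> customer = Map.of("id", 1, "name", "Alex", "location", "Canada");
    Map<String, Object> purchase = Map.of("id", 1, "name", "Alex", "store", "Walmart");
    Map<String, Object> other = Map.of("id", 1, "name", "Bob", "store", "Dstore");
    // Same id AND same name: the keys match.
    System.out.println(compositeKey(customer, joinFields).equals(compositeKey(purchase, joinFields))); // true
    // Same id but different name: the keys do not match.
    System.out.println(compositeKey(customer, joinFields).equals(compositeKey(other, joinFields)));    // false
  }
}
```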