Skip to end of metadata
Go to start of metadata

You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 12 Next »

Goals

  1. JIRA: CDAP-3980: Multiple input datasets for MapReduce.

Checklist

  • User stories documented (Ali)
  • User stories reviewed (Nitin)
  • Design documented (Ali)
  • Design reviewed (Albert/Terence/Andreas)
  • Feature merged (Ali)
  • Examples and guides (Ali)
  • Integration tests (Ali) 
  • Documentation for feature (Ali)
  • Blog post

Use Cases

  1. A developer wants to compute an aggregation (for instance, word count) across data that is stored in multiple datasets.
  2. Joining in MapReduce:
    A developer wants to load data from a 'customers' dataset which has the customer's details. The developer then wants to load a 'transactions' dataset which holds less information about the customer, but more about a particular transaction. The developer should be able to join the data of these two datasets. (see Use Case #2 on Cask Hydrator++).

User Stories

  1. A developer should be able to set multiple datasets as input to one MapReduce job.
    1. The datasets have the same type.
    2. The datasets have different types (this will require different Mapper classes). Note that the restriction here is that each of the Mappers must have the same output type (single Reducer class).
  2. A developer should be able to read from different partitions of a PartitionedFileSet (multiple time ranges of a TimePartitionedFileSet).
  3. A developer should be able to know which input they are processing data from, in their Mapper/Reducer.
  4. A developer should be able to use Cask Hydrator to set up multiple sources in their pipeline.
  5. A developer should be able to use Cask Hydrator to perform a join across two branches in their pipeline (See User Story #7 on Cask Hydrator++).


API Changes

 

There are several 'setInput' methods on MapReduceContext. These will be deprecated and replaced by the ones in the code snippet below. The setInput methods in AbstractMapReduce will also be deprecated.
Since the different inputs may have differing formats, a different Mapper class may be required for each input, and so our APIs will need to depend on Hadoop modules.

Note that our APIs are optimized for Datasets, and not files (User can not use the methods of FileInputformat on the job, with multiple inputs).

New Input class will be used to define the input to the MapReduce (currently, there are 7 'setInput' methods available on the MapReduceContext):

 

// Input class will encapsulate one of the following three things:
//   1. Dataset - name, args, splits
//   2. Stream - StreamBatchReadable
//   3. InputFormatProvider
public class Input {

}
// The new APIs on MapReduceContext will be:
void addInput(Input input);
void addInput(Input input, Class<?> mapperClass);


Approach for CDAP-3980 (wip)

There already exists a MultipleInputs class in Hadoop, which supports MapReduce jobs that have multiple input paths with a different InputFormat and Mapper for each path.
Two downsides to this are:

  1. If user uses this functionality, their mapper class can no longer implement ProgramLifeCycle<MapReduceTaskContext> and expect initialize/destroy methods to be called.
  2. Datasets can not be used as the input with this implementation.

 

Summary of approach:

  1. The existing APIs exposed in the Hadoop Multiple Inputs class:

    public static void addInputPath(Job job, Path path, Class<? extends InputFormat> inputFormatClass);
    public static void addInputPath(Job job, Path path, Class<? extends InputFormat> inputFormatClass, Class<? extends Mapper> mapperClass);
    
  2. Similar to Hadoop MultipleInputs, we will have a DelegatingInputFormat that delegates the record-reading and getSplits to other input formats. Ours will also support Datasets, whereas the existing one in Hadoop libraries only supports paths.

  • No labels