Goals

  1. JIRA: CDAP-3980: Multiple input datasets for MapReduce.

Checklist

  • User stories documented (Ali)
  • User stories reviewed (Nitin)
  • Design documented (Ali)
  • Design reviewed (Albert/Terence/Andreas)
  • Feature merged (Ali)
  • Examples and guides (Ali)
  • Integration tests (Ali) 
  • Documentation for feature (Ali)
  • Blog post

Use Cases

  1. A developer wants to compute an aggregation (for instance, word count) across data that is stored in multiple datasets.
  2. Joining in MapReduce:
    A developer wants to load data from a 'customers' dataset, which holds each customer's details, and from a 'transactions' dataset, which holds less information about the customer but more about each transaction. The developer should be able to join the data of these two datasets (see Use Case #2 on Cask Hydrator++).

User Stories

  1. A developer should be able to set multiple datasets as input to one MapReduce job.
    1. All datasets have the same type.
    2. The datasets have different types (this will require a different Mapper class per type). The restriction here is that all of the Mappers must have the same output type, because there is a single Reducer class.
  2. A developer should be able to read from different partitions of a PartitionedFileSet (multiple time ranges of a TimePartitionedFileSet).
  3. A developer should be able to know which input they are processing data from, in their Mapper/Reducer.
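
The stories above (one Mapper per input, a shared Mapper output type, and knowing which input a record came from) can be illustrated with a minimal plain-Java sketch of a reduce-side join. It uses no Hadoop or CDAP types. The names MultiInputJoinSketch, Tagged, and RecordMapper and the comma-separated record layouts are assumptions made only for this illustration, not part of the proposed API.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java model of a reduce-side join over two named inputs (illustrative only). */
final class MultiInputJoinSketch {

  /** Shared mapper output type: a value tagged with the name of the input it came from. */
  static final class Tagged {
    final String inputName;
    final String value;

    Tagged(String inputName, String value) {
      this.inputName = inputName;
      this.value = value;
    }
  }

  /** Stand-in for a Mapper: turns one raw record into a (join key, tagged value) pair. */
  interface RecordMapper {
    Map.Entry<String, Tagged> map(String rawRecord);
  }

  // Assumed record layout for 'customers': "customerId,name"
  static final RecordMapper CUSTOMER_MAPPER = raw -> {
    String[] f = raw.split(",", 2);
    return new AbstractMap.SimpleEntry<>(f[0], new Tagged("customers", f[1]));
  };

  // Assumed record layout for 'transactions': "txId,customerId,amount"
  static final RecordMapper TRANSACTION_MAPPER = raw -> {
    String[] f = raw.split(",", 3);
    return new AbstractMap.SimpleEntry<>(f[1], new Tagged("transactions", f[0] + ":" + f[2]));
  };

  /** Runs each input through its own mapper, groups by key (the shuffle), then reduces. */
  static List<String> join(Map<String, List<String>> inputs, Map<String, RecordMapper> mappers) {
    Map<String, List<Tagged>> shuffled = new TreeMap<>();
    for (Map.Entry<String, List<String>> input : inputs.entrySet()) {
      RecordMapper mapper = mappers.get(input.getKey());
      for (String raw : input.getValue()) {
        Map.Entry<String, Tagged> kv = mapper.map(raw);
        shuffled.computeIfAbsent(kv.getKey(), k -> new ArrayList<>()).add(kv.getValue());
      }
    }
    List<String> joined = new ArrayList<>();
    for (Map.Entry<String, List<Tagged>> group : shuffled.entrySet()) {
      joined.addAll(reduce(group.getKey(), group.getValue()));
    }
    return joined;
  }

  /** Single reducer: the tag tells customer details apart from transactions. */
  static List<String> reduce(String customerId, List<Tagged> values) {
    String name = null;
    List<String> transactions = new ArrayList<>();
    for (Tagged v : values) {
      if (v.inputName.equals("customers")) {
        name = v.value;
      } else {
        transactions.add(v.value);
      }
    }
    List<String> out = new ArrayList<>();
    if (name != null) { // inner join: transactions without a matching customer are dropped
      for (String tx : transactions) {
        out.add(customerId + "," + name + "," + tx);
      }
    }
    return out;
  }
}
```

In this sketch the input name travels inside the mapper output value. Whether the real feature exposes the input name through the task context instead is a design choice this page leaves open.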

Approach for CDAP-3980 (wip)

There already exists a MultipleInputs class in Hadoop, which supports MapReduce jobs that have multiple input paths with a different InputFormat and Mapper for each path.
Two downsides to this are:

  1. If a user uses this functionality, their Mapper class can no longer implement ProgramLifeCycle<MapReduceTaskContext> and expect its initialize/destroy methods to be called.
  2. Datasets cannot be used as input with this implementation.

 

Summary of approach:

  1. The existing APIs exposed in the Hadoop MultipleInputs class:

    public static void addInputPath(Job job, Path path, Class<? extends InputFormat> inputFormatClass);
    public static void addInputPath(Job job, Path path, Class<? extends InputFormat> inputFormatClass, Class<? extends Mapper> mapperClass);
    
  2. There are several 'setInput' methods on MapReduceContext. These will be deprecated and replaced by the ones in the code snippet below. The setInput methods in AbstractMapReduce will also be deprecated.
    Since the different inputs may have differing formats, a different Mapper class may be required for each input.

    Note that our APIs are optimized for Datasets, not files (with multiple inputs, users cannot use the methods of FileInputFormat on the job).

  3. A new Input class will be used to define an input to the MapReduce job (currently, there are 7 'setInput' methods available on MapReduceContext):

    // The Input class will hold the following three fields:
    public class Input {
      private final String datasetName;
      private final Map<String, String> arguments;
      private final List<Split> splits;
    
      // ...
    
    } 
    
    
    // The new APIs on MapReduceContext will be:
    void addInput(Input input);
    void addInput(StreamBatchReadable stream);
    void addInput(String inputName, InputFormatProvider inputFormatProvider);
  4. Similar to Hadoop MultipleInputs, we will have a DelegatingInputFormat that delegates record reading and split computation (getSplits) to other input formats. Ours will also support Datasets, whereas the existing one in the Hadoop libraries only supports paths.
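
The delegation idea in step 4 can be sketched in plain Java as follows. This is only a model of the technique, not the planned implementation: SimpleInputFormat, TaggedSplit, DelegatingFormat, and the inMemory helper are hypothetical names, and splits and records are plain strings rather than Hadoop InputSplit and RecordReader objects.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of an input format that delegates to one format per named input. */
final class DelegatingInputSketch {

  /** Stand-in for an InputFormat: lists its splits and reads the records of one split. */
  interface SimpleInputFormat {
    List<String> getSplits();
    Iterator<String> createRecordReader(String split);
  }

  /** A split tagged with the name of the input that produced it. */
  static final class TaggedSplit {
    final String inputName;
    final String split;

    TaggedSplit(String inputName, String split) {
      this.inputName = inputName;
      this.split = split;
    }
  }

  /** Collects splits from every registered input and routes reads back to the owner. */
  static final class DelegatingFormat {
    private final Map<String, SimpleInputFormat> delegates = new LinkedHashMap<>();

    void addInput(String inputName, SimpleInputFormat format) {
      if (delegates.putIfAbsent(inputName, format) != null) {
        throw new IllegalArgumentException("Input already added: " + inputName);
      }
    }

    /** Asks each delegate for its splits and tags them with the input name. */
    List<TaggedSplit> getSplits() {
      List<TaggedSplit> all = new ArrayList<>();
      for (Map.Entry<String, SimpleInputFormat> e : delegates.entrySet()) {
        for (String split : e.getValue().getSplits()) {
          all.add(new TaggedSplit(e.getKey(), split));
        }
      }
      return all;
    }

    /** Uses the tag on the split to pick the delegate that reads its records. */
    Iterator<String> createRecordReader(TaggedSplit tagged) {
      SimpleInputFormat format = delegates.get(tagged.inputName);
      if (format == null) {
        throw new IllegalArgumentException("Unknown input: " + tagged.inputName);
      }
      return format.createRecordReader(tagged.split);
    }
  }

  /** Hypothetical in-memory format for demonstration: split name -> records. */
  static SimpleInputFormat inMemory(Map<String, List<String>> recordsBySplit) {
    return new SimpleInputFormat() {
      @Override
      public List<String> getSplits() {
        return new ArrayList<>(recordsBySplit.keySet());
      }

      @Override
      public Iterator<String> createRecordReader(String split) {
        return recordsBySplit.get(split).iterator();
      }
    };
  }
}
```

Because every split carries its input name, the same tag could also be used to select a per-input Mapper, which is how the sketch relates to the requirement that different inputs may need different Mapper classes.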

 
