...

  •  User stories documented (Ali)
  •  User stories reviewed (Nitin)
  •  Design documented (Ali)
  •  Design reviewed (Albert/Terence/Andreas)
  •  Feature merged (Ali)
  •  Examples and guides (Ali)
  •  Integration tests (Ali) 
  •  Documentation for feature (Ali)
  •  Blog post

Misc section.

...

Use Cases

...

  1. The user wants to process data from multiple datasets in one MapReduce job.
      For example, the user wants to load data from a 'users' table that contains a user id and various attributes such as age, gender, email, etc. The user also wants to load data from a 'purchases' table that contains a user id, item id, purchase time, and purchase price. The user then wants to join both tables on user id, and then run a collaborative filtering algorithm to generate a model that can be used to recommend shopping items to people. (from Cask Hydrator++)
      1. Each dataset has the same type.
      2. Each dataset has different types (this will require different Mapper classes). Note that the restriction here is that each of the Mappers must have the same output type, since there is a single Reducer class (see the sketch after this list).
  2. Performing a join on a particular key, from two different datasets (see Use Case #2 on Cask Hydrator++).
  3. Reading from different partitions of a PartitionedFileSet (for example, multiple time ranges of a TimePartitionedFileSet).
  4. The user should be able to know which input they are processing data from (in their mapper/reducer).
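  To illustrate the "different types" sub-case above, the following is a minimal sketch, not taken from the actual implementation: the class names, record formats, and the "U|"/"P|" tagging scheme are all hypothetical. It shows two Mapper classes with different input handling but identical output types (user id as the key), so that a single Reducer class can join records from both inputs.

  Code Block
  languagejava
  import java.io.IOException;
  import java.util.ArrayList;
  import java.util.List;

  import org.apache.hadoop.io.LongWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Mapper;
  import org.apache.hadoop.mapreduce.Reducer;

  // Hypothetical mapper for the 'users' input ("userId,age,gender,email" lines).
  // Emits the user id as the key and tags the record so the reducer can tell
  // which input it came from.
  class UserMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      String[] fields = value.toString().split(",");
      context.write(new Text(fields[0]), new Text("U|" + value));
    }
  }

  // Hypothetical mapper for the 'purchases' input
  // ("userId,itemId,purchaseTime,purchasePrice" lines). Same output types as UserMapper.
  class PurchaseMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      String[] fields = value.toString().split(",");
      context.write(new Text(fields[0]), new Text("P|" + value));
    }
  }

  // Single reducer that receives records from both mappers for a given user id
  // and emits one joined record per (user, purchase) pair.
  class JoinReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text userId, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
      List<String> users = new ArrayList<>();
      List<String> purchases = new ArrayList<>();
      for (Text value : values) {
        String record = value.toString();
        if (record.startsWith("U|")) {
          users.add(record.substring(2));
        } else {
          purchases.add(record.substring(2));
        }
      }
      for (String user : users) {
        for (String purchase : purchases) {
          context.write(userId, new Text(user + "," + purchase));
        }
      }
    }
  }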

    User Stories

    Approach for CDAP-3980

    There already exists a MultipleInputs class in Hadoop, which supports MapReduce jobs that have multiple input paths with a different InputFormat and Mapper for each path.
    Two downsides to this are:

    1. If a user uses this functionality, their mapper class can no longer implement ProgramLifeCycle<MapReduceTaskContext> and expect the initialize/destroy methods to be called.
    2. Datasets cannot be used as the input with this implementation.

      

    Summary of approach:

    1. The existing APIs exposed in the Hadoop MultipleInputs class are:

      Code Block
      languagejava
      public static void addInputPath(Job job, Path path, Class<? extends InputFormat> inputFormatClass);
      public static void addInputPath(Job job, Path path, Class<? extends InputFormat> inputFormatClass, Class<? extends Mapper> mapperClass);
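
      For reference, a typical use of this Hadoop API looks like the following sketch (the paths and the Mapper/Reducer class names are hypothetical):

      Code Block
      languagejava
      Job job = Job.getInstance(new Configuration(), "join-users-purchases");
      // Each input path gets its own InputFormat and Mapper; all of the mappers
      // must produce the same output key/value types for the single reducer.
      MultipleInputs.addInputPath(job, new Path("/data/users"),
                                  TextInputFormat.class, UserMapper.class);
      MultipleInputs.addInputPath(job, new Path("/data/purchases"),
                                  TextInputFormat.class, PurchaseMapper.class);
      job.setReducerClass(JoinReducer.class);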
      
    2. There are several 'setInput' methods on MapReduceContext. These will be deprecated and replaced by corresponding addInput methods (the ones in the code snippet below). The setInput methods in AbstractMapReduce will also be deprecated.

      Since the different inputs may have differing formats, a different Mapper class may be required for each input.

      Note that our APIs are optimized for Datasets, and not files (the user can not use the methods of FileInputFormat on the job when using multiple inputs).

    3. A new Input class will be used to define the input to the MapReduce program (currently, there are 7 'setInput' methods available on the MapReduceContext):

      Code Block
      // Input class will consist of the following three things:
      public class Input {
        private final String datasetName;
        private final Map<String, String> arguments;
        private final List<Split> splits;
      
        // ...
      
      } 
      
      
      // The new APIs on MapReduceContext will be:
      void addInput(Input input);
      void addInput(StreamBatchReadable stream);
      void addInput(String inputName, InputFormatProvider inputFormatProvider);
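
      A hypothetical usage sketch of the proposed API from within a MapReduce program follows. The Input constructor used here is an assumption based on the fields listed above; the design does not yet specify how an Input is constructed.

      Code Block
      languagejava
      @Override
      public void beforeSubmit(MapReduceContext context) throws Exception {
        // Register two Dataset inputs for the same job (hypothetical sketch;
        // empty runtime arguments and null splits, i.e. read the whole dataset).
        context.addInput(new Input("users", Collections.<String, String>emptyMap(), null));
        context.addInput(new Input("purchases", Collections.<String, String>emptyMap(), null));
      }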
    4. Similar to Hadoop MultipleInputs, we will have a DelegatingInputFormat that delegates the record-reading and getSplits to other input formats. Ours will also support Datasets, whereas the existing one in the Hadoop libraries only supports paths.
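
      To illustrate the delegation pattern, here is a deliberately simplified sketch (not the actual implementation): it keeps an in-memory map from split to delegate instead of tagging and serializing the splits, and it omits Mapper selection and Dataset support.

      Code Block
      languagejava
      import java.io.IOException;
      import java.util.ArrayList;
      import java.util.HashMap;
      import java.util.List;
      import java.util.Map;

      import org.apache.hadoop.mapreduce.InputFormat;
      import org.apache.hadoop.mapreduce.InputSplit;
      import org.apache.hadoop.mapreduce.JobContext;
      import org.apache.hadoop.mapreduce.RecordReader;
      import org.apache.hadoop.mapreduce.TaskAttemptContext;

      // Simplified sketch of an InputFormat that delegates getSplits and
      // record-reading to a set of underlying InputFormats.
      class SketchDelegatingInputFormat<K, V> extends InputFormat<K, V> {

        private final List<InputFormat<K, V>> delegates;
        private final Map<InputSplit, InputFormat<K, V>> splitToDelegate = new HashMap<>();

        SketchDelegatingInputFormat(List<InputFormat<K, V>> delegates) {
          this.delegates = delegates;
        }

        @Override
        public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
          List<InputSplit> allSplits = new ArrayList<>();
          for (InputFormat<K, V> delegate : delegates) {
            for (InputSplit split : delegate.getSplits(context)) {
              // Remember which delegate produced the split, so that record-reading
              // can later be routed back to the same input format.
              splitToDelegate.put(split, delegate);
              allSplits.add(split);
            }
          }
          return allSplits;
        }

        @Override
        public RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
          // Route record-reading to the InputFormat that produced this split.
          return splitToDelegate.get(split).createRecordReader(split, context);
        }
      }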