
Related JIRA: CDAP-2756 (Cask Community Issue Tracker)

Use Cases:

  • Validator Filter: All records of a transform that are invalid go into one dataset; the remainder go into another.
  • Writing the same output data to two separate outputs, with different formats.


Existing APIs (in MapReduceContext, used in beforeSubmit):

Code Block
languagejava
// sets a single Dataset as the output for the MapReduce job
context.setOutput(String datasetName);
context.setOutput(String datasetName, Dataset dataset);
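For context, typical usage of the existing single-output API looks like this (the dataset name is just for illustration):

Code Block
languagejava
public void beforeSubmit(MapReduceContext context) throws Exception {
  // existing behavior: exactly one Dataset is registered as the job's output
  context.setOutput("cleanCounts");
}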

 

New APIs (in MapReduceContext, used in beforeSubmit):

Code Block
languagejava
// specify a Dataset and arguments, to be used as an output for the MapReduce job:
context.addOutput(String datasetName);
context.addOutput(String datasetName, Map<String, String> arguments);
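For illustration, the arguments overload might be called from beforeSubmit as below; the argument key and value are hypothetical, not a defined CDAP setting:

Code Block
languagejava
public void beforeSubmit(MapReduceContext context) throws Exception {
  Map<String, String> outputArgs = new HashMap<>();
  // hypothetical runtime argument interpreted by the target Dataset
  outputArgs.put("output.partition", "2015-06-01");
  context.addOutput("invalidCounts", outputArgs);
}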

 

New APIs (in our custom MultipleOutputs, to be used from the mapper/reducer). Note that these will be custom mapper, reducer, and context classes that override the Hadoop classes, providing the additional functionality of writing to multiple outputs:

Code Block
languagejava
// specifies which Dataset to write to and handles the delegation to the appropriate OutputFormat:
context.write(String datasetName, KEY key, VALUE value);

 

New APIs (in BatchSinkContext, used in prepareRun of the BatchSink):

Code Block
languagejava
// specify a Dataset and arguments, to be used as an output for the Adapter job:
context.addOutput(String datasetName);
context.addOutput(String datasetName, Map<String, String> arguments); 
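For example, a sink's prepareRun could register its Dataset as one of the adapter job's outputs; a minimal sketch, with an illustrative dataset name:

Code Block
languagejava
@Override
public void prepareRun(BatchSinkContext context) {
  // register this sink's Dataset as one of the job's outputs;
  // other sinks in the same adapter can add their own outputs as well
  context.addOutput("invalidCounts");
}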

Example Usage:

Code Block
languagejava
public void beforeSubmit(MapReduceContext context) throws Exception {
  context.addOutput("cleanCounts");
  context.addOutput("invalidCounts");
  // ...
}

public static class Counter extends AbstractReducer<Text, IntWritable, byte[], Long> {

  @Override
  public void reduce(Text key, Iterable<IntWritable> values, Context context) {
    // do computation and output to the desired dataset
    if ( ... ) {
      context.write("cleanCounts", key.getBytes(), val);
    } else {
      context.write("invalidCounts", key.getBytes(), val);
    }
  }
}

Approach:

Take an approach similar to org.apache.hadoop.mapreduce.lib.output.MultipleOutputs.
The Datasets to be written to must be defined in advance, in the beforeSubmit of the MapReduce job.
In the mapper/reducer, the user specifies the name of the output Dataset, and our helper class (MultipleOutputs) determines the appropriate OutputFormat and configuration for writing.
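A rough sketch of the delegation inside the proposed MultipleOutputs helper follows; the field and method names are assumptions, not the final API, and the per-Dataset OutputFormat resolution is omitted:

Code Block
languagejava
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

// Illustrative sketch only; the real class must resolve each Dataset's
// OutputFormat and configuration, which is omitted here.
public class MultipleOutputs {
  // one RecordWriter per output Dataset, keyed by dataset name
  private final Map<String, RecordWriter<Object, Object>> recordWriters = new HashMap<>();

  public MultipleOutputs(TaskAttemptContext context) {
    // create a RecordWriter for each Dataset registered in beforeSubmit
    // (OutputFormat lookup and instantiation omitted)
  }

  public void write(String datasetName, Object key, Object value)
      throws IOException, InterruptedException {
    RecordWriter<Object, Object> writer = recordWriters.get(datasetName);
    if (writer == null) {
      throw new IllegalArgumentException("Output dataset was not configured: " + datasetName);
    }
    // delegate to the RecordWriter created from the dataset's OutputFormat
    writer.write(key, value);
  }

  public void close(TaskAttemptContext context) throws IOException, InterruptedException {
    // flush and close every underlying writer
    for (RecordWriter<Object, Object> writer : recordWriters.values()) {
      writer.close(context);
    }
  }
}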

The MapperWrapper and ReducerWrapper will be responsible for instantiating the MultipleOutputs class and setting it on the user's mapper/reducer, in a similar fashion to how Metrics are set. They will also be responsible for closing the MultipleOutputs object.
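A sketch of the intended wrapper lifecycle, reusing the MultipleOutputs sketch above (the injection helper below is hypothetical; the real mechanism would mirror how Metrics are injected):

Code Block
languagejava
import java.io.IOException;
import org.apache.hadoop.mapreduce.Reducer;

// Illustrative sketch; CDAP's actual wrapper internals may differ.
public class ReducerWrapper<KI, VI, KO, VO> extends Reducer<KI, VI, KO, VO> {

  private final Reducer<KI, VI, KO, VO> userReducer;

  public ReducerWrapper(Reducer<KI, VI, KO, VO> userReducer) {
    this.userReducer = userReducer;
  }

  @Override
  public void run(Context context) throws IOException, InterruptedException {
    MultipleOutputs multipleOutputs = new MultipleOutputs(context);
    // hand the MultipleOutputs instance to the user's reducer,
    // analogous to how Metrics are set today (injection helper is hypothetical)
    injectMultipleOutputs(userReducer, multipleOutputs);
    try {
      userReducer.run(context);
    } finally {
      // the wrapper, not the user, is responsible for closing
      multipleOutputs.close(context);
    }
  }

  private void injectMultipleOutputs(Reducer<KI, VI, KO, VO> reducer, MultipleOutputs outputs) {
    // e.g. reflection on an annotated field, as is done for Metrics (assumption)
  }
}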

Deprecate setting the output dataset from the configure method, as it provides no utility over setting it in beforeSubmit.

New APIs in BatchSinkContext will simply delegate to MapReduceContext's new APIs for having multiple output Datasets.
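A minimal sketch of that delegation, assuming the sink context implementation wraps the job's MapReduceContext (the class name is illustrative):

Code Block
languagejava
import java.util.Map;

// Illustrative only; other BatchSinkContext methods are omitted for brevity.
public class BasicBatchSinkContext implements BatchSinkContext {
  private final MapReduceContext mapReduceContext;

  public BasicBatchSinkContext(MapReduceContext mapReduceContext) {
    this.mapReduceContext = mapReduceContext;
  }

  @Override
  public void addOutput(String datasetName) {
    // delegate straight to the MapReduce job's output registration
    mapReduceContext.addOutput(datasetName);
  }

  @Override
  public void addOutput(String datasetName, Map<String, String> arguments) {
    mapReduceContext.addOutput(datasetName, arguments);
  }
}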

Questions:

  • The naming of the MultipleOutputs class that we expose is open to change.
  • Does the MultipleOutputs class go into cdap-api? If so, how do we expose it to the user for instantiation (it has Hadoop dependencies)?
  • Should we allow the user to write to non-Dataset files from our MultipleOutputs class? I suggest no, for simplicity. What this disallows is writing to both a Dataset and non-Dataset files from the same MapReduce.
  • Should we restrict users from simply calling context.write(k, v) after having set multiple Datasets as the output?