...

Existing APIs (in MapReduceContext, used in beforeSubmit):

```java
// sets a single Dataset as the output for the MapReduce job
context.setOutput(String datasetName);
context.setOutput(String datasetName, Dataset dataset);
```

New APIs (in MapReduceContext, used in beforeSubmit):

```java
// adds a Dataset to the set of output Datasets for the MapReduce job
context.addOutput(String datasetName);
context.addOutput(String datasetName, Dataset dataset);
```
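
The difference between the existing and the proposed calls can be modeled in a few lines. The sketch below is a minimal, hypothetical stand-in for the output bookkeeping inside MapReduceContext: `OutputRegistry` and `getOutputs()` are illustrative names, not CDAP APIs. It assumes that `setOutput` replaces any previously configured output, that `addOutput` appends to an ordered set, and that adding the same name twice is a no-op; the actual implementation may behave differently.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical model of the output configuration held by MapReduceContext.
// Assumption: setOutput replaces, addOutput accumulates.
class OutputRegistry {
  // LinkedHashSet keeps outputs in the order they were added
  private final Set<String> outputs = new LinkedHashSet<>();

  // existing behavior: the job has exactly one output Dataset
  void setOutput(String datasetName) {
    outputs.clear();
    outputs.add(datasetName);
  }

  // proposed behavior: add one more output Dataset to the job;
  // adding the same name twice has no effect here (an assumption)
  void addOutput(String datasetName) {
    outputs.add(datasetName);
  }

  // read-only view of the configured output Dataset names
  Set<String> getOutputs() {
    return Collections.unmodifiableSet(outputs);
  }
}
```

With this model, calling `addOutput("cleanCounts")` and then `addOutput("invalidCounts")` leaves both names configured, while a later `setOutput(...)` would collapse the job back to a single output.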

New APIs (in our custom MultipleOutputs, to be used from mapper/reducer):

```java
// specifies which Dataset to write to and delegates the write to that Dataset's OutputFormat
mos.write(String datasetName, KEY key, VALUE value);
```
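
The delegation that `mos.write` performs can be sketched as a lookup from Dataset name to a per-Dataset writer. Everything here is hypothetical and simplified: `DatasetWriter` stands in for the RecordWriter that a Dataset's OutputFormat would supply, `InMemoryWriter` exists only so the sketch can run, and `RoutingOutputs` is an illustration of the routing idea, not the proposed class itself. It assumes that writing to a name that was never registered through `addOutput` should fail fast.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the RecordWriter a Dataset's OutputFormat would supply.
interface DatasetWriter<K, V> {
  void write(K key, V value);
}

// Hypothetical in-memory writer, used only so the sketch runs without a cluster.
class InMemoryWriter<K, V> implements DatasetWriter<K, V> {
  final List<Map.Entry<K, V>> records = new ArrayList<>();

  @Override
  public void write(K key, V value) {
    records.add(new AbstractMap.SimpleEntry<>(key, value));
  }
}

// Sketch of the routing behind mos.write(datasetName, key, value):
// find the writer registered for the named Dataset and delegate to it.
class RoutingOutputs<K, V> {
  private final Map<String, DatasetWriter<K, V>> writers = new HashMap<>();

  // in the proposal, registration would follow from context.addOutput(...) in beforeSubmit
  void register(String datasetName, DatasetWriter<K, V> writer) {
    writers.put(datasetName, writer);
  }

  void write(String datasetName, K key, V value) {
    DatasetWriter<K, V> writer = writers.get(datasetName);
    if (writer == null) {
      // assumption: writing to a Dataset that was never added as an output is an error
      throw new IllegalArgumentException("Dataset is not a configured output: " + datasetName);
    }
    writer.write(key, value);
  }
}
```

A fuller implementation would probably create each writer lazily from the Dataset's OutputFormat on first use and close all of them in `close()`. Hadoop's own `org.apache.hadoop.mapreduce.lib.output.MultipleOutputs` follows that pattern for its named outputs, caching one RecordWriter per output.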

Example Usage:

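```java
import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
// MapReduceContext comes from CDAP; MultipleOutputs is the proposed custom class
// (not org.apache.hadoop.mapreduce.lib.output.MultipleOutputs)

public void beforeSubmit(MapReduceContext context) throws Exception {
  // register both Datasets as outputs of the job
  context.addOutput("cleanCounts");
  context.addOutput("invalidCounts");
  // ...
}

public static class Counter extends Reducer<Text, IntWritable, byte[], Long> {
  private MultipleOutputs<byte[], Long> mos;

  @Override
  protected void setup(Context context) {
    // wrap the task context so records can be routed to any registered output Dataset
    mos = new MultipleOutputs<>(context);
  }

  @Override
  public void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    // sum the counts for this key
    long sum = 0;
    for (IntWritable value : values) {
      sum += value.get();
    }
    // Text.getBytes() returns the backing array, which can be longer than the
    // actual content, so copy only the first getLength() bytes
    byte[] row = Arrays.copyOf(key.getBytes(), key.getLength());

    // isValid(...) is a hypothetical, application-specific check
    if (isValid(key)) {
      mos.write("cleanCounts", row, sum);
    } else {
      mos.write("invalidCounts", row, sum);
    }
  }

  @Override
  protected void cleanup(Context context) throws IOException, InterruptedException {
    // close the MultipleOutputs so it can release the writers it opened
    mos.close();
  }
}
```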

...