...
Existing APIs (in MapReduceContext, used in beforeSubmit):
```java
// sets a single Dataset as the output for the MapReduce job
context.setOutput(String datasetName);
context.setOutput(String datasetName, Dataset dataset);
```
New APIs (in MapReduceContext, used in beforeSubmit):
```java
// adds a Dataset to the set of output Datasets for the MapReduce job
context.addOutput(String datasetName);
context.addOutput(String datasetName, Dataset dataset);
```
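The difference between the two methods is that setOutput leaves the job with exactly one output, while each addOutput call grows a set of outputs. A minimal sketch of how a context might track that set, assuming a hypothetical OutputRegistry class: Object stands in for the platform's Dataset type, a null value means "named only, instantiate by name at runtime", and rejecting duplicate names is an illustrative choice, not something the design above specifies.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of how a job context could track its output Datasets.
// Object stands in for the real Dataset type; a null value means the Dataset
// was only named and would be instantiated by name at runtime (assumption).
class OutputRegistry {
  // Insertion order is kept so outputs are configured in the order they were added.
  private final Map<String, Object> outputs = new LinkedHashMap<>();

  // Mirrors setOutput: afterwards the job has exactly one output.
  void setOutput(String datasetName) {
    setOutput(datasetName, null);
  }

  void setOutput(String datasetName, Object dataset) {
    outputs.clear();
    outputs.put(datasetName, dataset);
  }

  // Mirrors addOutput: each call adds one more output. Rejecting duplicates is
  // an assumption, chosen so that registering the same Dataset twice fails loudly.
  void addOutput(String datasetName) {
    addOutput(datasetName, null);
  }

  void addOutput(String datasetName, Object dataset) {
    if (outputs.containsKey(datasetName)) {
      throw new IllegalArgumentException("Output already added: " + datasetName);
    }
    outputs.put(datasetName, dataset);
  }

  // Read-only view of the configured outputs, keyed by Dataset name.
  Map<String, Object> getOutputs() {
    return Collections.unmodifiableMap(outputs);
  }
}
```

In this sketch, calling setOutput after addOutput simply discards the earlier outputs; a real implementation would need to decide whether mixing the two styles in one job is allowed at all.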
New API (in our custom MultipleOutputs class, used from a Mapper or Reducer):
```java
// specifies which Dataset to write to and handles the delegation to the appropriate OutputFormat
mos.write(String datasetName, KEY key, VALUE value);
```
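The delegation that write performs can be pictured as a lookup from Dataset name to a per-output writer. The sketch below is a simplified, Hadoop-free illustration, not the custom MultipleOutputs itself: RecordSink and SketchMultipleOutputs are hypothetical names, and the sink factory stands in for asking the Dataset's OutputFormat for a record writer. Writers are created lazily on the first write to each Dataset (Hadoop's own MultipleOutputs also caches writers per output), and writing to a name that was never added as an output is rejected.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Hypothetical stand-in for a per-Dataset record writer.
interface RecordSink<K, V> {
  void write(K key, V value);
  void close();
}

// Hypothetical sketch of name-based delegation: each write is routed to the
// sink for the named Dataset, creating that sink on first use.
class SketchMultipleOutputs<K, V> {
  private final Set<String> declaredOutputs;                   // names registered via addOutput
  private final Function<String, RecordSink<K, V>> sinkFactory; // stands in for the OutputFormat
  private final Map<String, RecordSink<K, V>> openSinks = new HashMap<>();

  SketchMultipleOutputs(Set<String> declaredOutputs, Function<String, RecordSink<K, V>> sinkFactory) {
    this.declaredOutputs = declaredOutputs;
    this.sinkFactory = sinkFactory;
  }

  void write(String datasetName, K key, V value) {
    if (!declaredOutputs.contains(datasetName)) {
      throw new IllegalArgumentException("Dataset was not added as an output: " + datasetName);
    }
    // Lazily open one sink per Dataset and reuse it for later writes.
    openSinks.computeIfAbsent(datasetName, sinkFactory).write(key, value);
  }

  // Closes every sink that was opened; Datasets never written to open nothing.
  void close() {
    for (RecordSink<K, V> sink : openSinks.values()) {
      sink.close();
    }
    openSinks.clear();
  }

  int openSinkCount() {
    return openSinks.size();
  }
}
```

Creating writers lazily means a task that only ever writes to cleanCounts never opens a writer for invalidCounts.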
Example Usage:
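```java
public void beforeSubmit(MapReduceContext context) throws Exception {
  // register both Datasets as outputs of this MapReduce job
  context.addOutput("cleanCounts");
  context.addOutput("invalidCounts");
  // ...
}

public static class Counter extends Reducer<Text, IntWritable, byte[], Long> {
  private MultipleOutputs<byte[], Long> mos;

  @Override
  protected void setup(Context context) {
    mos = new MultipleOutputs<>(context);
  }

  @Override
  public void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    // sum the counts for this key
    long sum = 0;
    for (IntWritable value : values) {
      sum += value.get();
    }
    // Text.getBytes() returns the whole backing array, which may be longer than
    // the key itself; copyBytes() returns only the valid bytes
    byte[] row = key.copyBytes();
    // output to the desired Dataset
    if ( ... ) {
      mos.write("cleanCounts", row, sum);
    } else {
      mos.write("invalidCounts", row, sum);
    }
  }

  @Override
  protected void cleanup(Context context) throws IOException, InterruptedException {
    // close the writers opened for the output Datasets
    mos.close();
  }
}
```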
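Because the reducer decides per key which Dataset receives the result, that decision can be pulled out of the Hadoop plumbing and unit tested on its own. A minimal sketch, assuming a hypothetical CountRouting helper: the validity test is passed in as a predicate because the example above leaves the actual condition unspecified, and any rule used with it (such as "non-negative totals are clean") is purely illustrative.

```java
import java.util.List;
import java.util.function.BiPredicate;

// Hypothetical, Hadoop-free version of the Counter reducer's per-key logic.
final class CountRouting {
  static final String CLEAN = "cleanCounts";
  static final String INVALID = "invalidCounts";

  private CountRouting() {}

  // Sums the counts for one key, as the reducer's loop over values does.
  static long sum(List<Integer> values) {
    long total = 0;
    for (int value : values) {
      total += value;
    }
    return total;
  }

  // Returns the name of the Dataset that the (key, total) pair should go to.
  // The isClean predicate is supplied by the caller; no rule is assumed here.
  static String chooseOutput(String key, long total, BiPredicate<String, Long> isClean) {
    return isClean.test(key, total) ? CLEAN : INVALID;
  }
}
```

With a helper like this, the body of reduce shrinks to a single mos.write call whose first argument is the Dataset name returned by chooseOutput, and the routing rule can be tested without running a job.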
...