...
Code Block | ||
---|---|---|
| ||
public void beforeSubmit(MapReduceContext context) throws Exception { context.addOutput("cleanCounts"); context.addOutput("invalidCounts"); // ... } public static class Counter extends Reducer<Text, IntWritable, byte[], Long> { private MultipleOutputs<byte[], Long>MultipleOutputs mos; @Override public void reduce(Text key, Iterable<IntWritable> values, Context context) { // do computation and output to the desired dataset if ( ... ) { mos.write("cleanCounts", key.getBytes(), val); } else { mos.write("invalidCounts", key.getBytes(), val); } } } |
...