...
...
...
...
...
...
...
...
Use Cases:
- Validator Filter: All records of a transform that are invalid go into one dataset; the remainder go into another.
- Writing the same output data to two separate outputs, with different formats.
API:
Existing APIs (in MapReduceContext); sets a single Dataset as the output for the MapReduce job:, used in beforeSubmit):
// sets a single Dataset as the output for the MapReduce job
context.setOutput(String datasetName);
context.setOutput(String datasetName, Dataset dataset);
Additional New APIs (in MapReduceContext); adds a Dataset to the set of output Datasets for the MapReduce , used in beforeSubmit):
// adds a Dataset to the set of output Datasets for the MapReduce job:
context.addOutput(String datasetName);
context.addOutput(String datasetName, Dataset dataset);
New APIs (in our custom MultipleOutputs, to be used from mapper/reducer):
// specifies which Dataset to write to and handles the delegation to the appropriate OutputFormat:
mos.write(String datasetName, KEY key, VALUE value);
Example Usage:
public void beforeSubmit(MapReduceContext context) throws Exception {
context.setOutput();
context.addOutput("cleanCounts");
context.addOutput("invalidCounts");
// ...
}
public static class Counter extends Reducer<Text, IntWritable, byte[], Long> {
private MultipleOutputs<byte[], Long> mos;
@Override
protected void setup(Context context) {
mos = new MultipleOutputs<>(context);
}
@Override
public void reduce(Text key, Iterable<IntWritable> values, Context context) {
// do computation and output to the desired dataset
if ( ... ) {
mos.write("cleanCounts", key.getBytes(), val);
} else {
mos.write("invalidCounts", key.getBytes(), val);
}
}
@Override
protected void cleanup(Context context) {
mos.close();
}
}
Approach:
Take an approach similar to org.apache.hadoop.mapreduce.lib.output.MultipleOutputs.
The Datasets to be written to must be defined in advance, in the beforeSubmit of the MapReduce job.
In the mapper/reducer, the user specifies the name of the output Dataset, and our helper class (MultipleOutputs) determines the appropriate OutputFormat and configuration for writing.
Questions:
Should we also allow specifying in the configure method? I suggest we deprecate the setting of output dataset from the configure method as it provides no utility over setting it in the beforeSubmit.
Naming of the MultipleOutputs class that we expose is up for change.
Does the MultipleOutputs class go into cdap-api? If so, how do we expose it to the user for instantiation (it has hadoop dependencies)?
Should we allow the user to write to non-Dataset files from our MultipleOutputs class?