...
...
API
The user must extend the abstract DynamicPartitioner class, which defines the PartitionKey to use for each record.
For each distinct PartitionKey that this DynamicPartitioner returns, a partition is created in the output PartitionedFileSet dataset.
Code Block | ||
---|---|---|
| ||
public static final class CustomDynamicPartitioner extends DynamicPartitioner<Long, Long> {
  private long logicalStartTime;
  @Override
  public void initialize(MapReduceTaskContext<Long, Long> mapReduceTaskContext) {
    // capture the logical start time once, before any records are partitioned
    this.logicalStartTime = mapReduceTaskContext.getLogicalStartTime();
  }
  @Override
  public PartitionKey getPartitionKey(Long key, Long value) {
    // one partition per (logical start time, record key) combination
    return PartitionKey.builder()
      .addLongField("time", logicalStartTime)
      .addLongField("other", key)
      .build();
  }
} |
In the beforeSubmit() method of the user's MapReduce job, add the PartitionedFileSet dataset as an output, with the user's DynamicPartitioner class set in the output arguments:
Code Block | ||
---|---|---|
| ||
public void beforeSubmit(MapReduceContext context) throws Exception {
// define input and other setup
// ...
Map<String, String> outputArgs = new HashMap<>();
// alternative to setting the PartitionKey, set a DynamicPartitioner class
PartitionedFileSetArguments.setDynamicPartitioner(outputArgs, CustomDynamicPartitioner.class);
context.addOutput("outputLines", outputArgs);
}
|
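To illustrate what "one partition per returned PartitionKey" means, here is a minimal in-memory sketch. It does not use CDAP: `PartitionGroupingSketch`, `KeyFunction`, and the `Map<String, Long>` stand-in for a partition key are all hypothetical names introduced for illustration, and the logical start time is a made-up value.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in types for illustration only; these are NOT CDAP classes.
class PartitionGroupingSketch {

  /** Stand-in for a partitioner: maps a key-value pair to a partition key (field name -> value). */
  interface KeyFunction<K, V> {
    Map<String, Long> partitionKeyFor(K key, V value);
  }

  /** Groups record values by the partition key computed for each record. */
  static <K, V> Map<Map<String, Long>, List<V>> group(List<Map.Entry<K, V>> records,
                                                      KeyFunction<K, V> fn) {
    Map<Map<String, Long>, List<V>> partitions = new LinkedHashMap<>();
    for (Map.Entry<K, V> record : records) {
      Map<String, Long> pk = fn.partitionKeyFor(record.getKey(), record.getValue());
      // the first record seen for a key creates the partition; later ones are appended
      partitions.computeIfAbsent(pk, k -> new ArrayList<>()).add(record.getValue());
    }
    return partitions;
  }

  public static void main(String[] args) {
    long logicalStartTime = 1000L; // assumed value, for illustration
    // mirrors the "time" and "other" fields of the example partitioner
    KeyFunction<Long, Long> fn = (key, value) -> {
      Map<String, Long> pk = new TreeMap<>();
      pk.put("time", logicalStartTime);
      pk.put("other", key);
      return pk;
    };
    List<Map.Entry<Long, Long>> records =
      List.of(Map.entry(1L, 10L), Map.entry(2L, 20L), Map.entry(1L, 30L));
    group(records, fn).forEach((pk, values) -> System.out.println(pk + " -> " + values));
  }
}
```

With the three sample records, two distinct keys are produced, so two partitions would result: the records with key 1 share a partition and the record with key 2 gets its own.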
...
...
High-level Implementation Design
...
If this output PartitionKey is missing, we will look for a specified DynamicPartitioner, which maps each record to a PartitionKey.
We will also have to implement our own DynamicPartitioningOutputFormat, which is responsible for writing to multiple paths, depending on the PartitionKey of each record.
We will also need to implement an OutputCommitter, which is responsible for creating a partition for each of the partition keys written to.
Code Block | ||||
---|---|---|---|---|
| ||||
/**
* Responsible for dynamically determining a {@link PartitionKey}.
* For each K, V pair, the getPartitionKey(K, V) method is called to determine a PartitionKey.
*
* @param <K> Type of key
* @param <V> Type of value
*/
public abstract class DynamicPartitioner<K, V> {
/**
* Initializes a DynamicPartitioner.
* <p>
* This method will be called only once per {@link DynamicPartitioner} instance. It is the first method call
* on that instance.
* </p>
* @param mapReduceTaskContext the mapReduceTaskContext for the task that this DynamicPartitioner is running in.
*
*/
public void initialize(MapReduceTaskContext<K, V> mapReduceTaskContext) {
// do nothing by default
}
/**
* Destroys a DynamicPartitioner.
* <p>
* This method will be called only once per {@link DynamicPartitioner} instance. It is the last method call
* on that instance.
* </p>
*/
public void destroy() {
// do nothing by default
}
/**
* Determine the PartitionKey for the key-value pair to be written to.
*
* @param key the key to be written
* @param value the value to be written
* @return the {@link PartitionKey} for the key-value pair to be written to.
*/
public abstract PartitionKey getPartitionKey(K key, V value);
} |
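The write path described in this section (route each record to a path derived from its partition key, then create one partition per key at commit time) could look roughly like the following. This is a sketch under stated assumptions, not the actual DynamicPartitioningOutputFormat or OutputCommitter: `DynamicOutputSketch`, the `part-0` file name, the tab-separated record format, and the use of a relative path string in place of a PartitionKey are all hypothetical, and it writes to a local temporary directory instead of a PartitionedFileSet.

```java
import java.io.IOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;

// Hypothetical sketch; class and method names are illustrative and are not CDAP or Hadoop APIs.
class DynamicOutputSketch implements AutoCloseable {
  private final Path baseDir;
  // stand-in for a partitioner: maps a record to the relative path of its partition
  private final BiFunction<Long, Long, String> partitioner;
  private final Map<String, Writer> writers = new LinkedHashMap<>();

  DynamicOutputSketch(Path baseDir, BiFunction<Long, Long, String> partitioner) {
    this.baseDir = baseDir;
    this.partitioner = partitioner;
  }

  /** Routes one record to the file of its partition, opening a writer on first use. */
  void write(Long key, Long value) throws IOException {
    String relativePath = partitioner.apply(key, value);
    Writer writer = writers.get(relativePath);
    if (writer == null) {
      Path dir = baseDir.resolve(relativePath);
      Files.createDirectories(dir);
      writer = Files.newBufferedWriter(dir.resolve("part-0"), StandardCharsets.UTF_8);
      writers.put(relativePath, writer);
    }
    writer.write(key + "\t" + value + "\n");
  }

  /** Closes every open writer so that all partition files are complete. */
  @Override
  public void close() throws IOException {
    for (Writer writer : writers.values()) {
      writer.close();
    }
  }

  /** Commit step: the partitions to create are exactly the paths that were written to. */
  Set<String> partitionsToCommit() {
    return new LinkedHashSet<>(writers.keySet());
  }

  public static void main(String[] args) throws IOException {
    Path base = Files.createTempDirectory("pfs-sketch");
    long logicalStartTime = 1000L; // assumed value, for illustration
    DynamicOutputSketch out =
      new DynamicOutputSketch(base, (k, v) -> "time=" + logicalStartTime + "/other=" + k);
    out.write(1L, 10L);
    out.write(2L, 20L);
    out.write(1L, 30L);
    out.close();
    for (String partition : out.partitionsToCommit()) {
      System.out.println("create partition for " + partition);
    }
  }
}
```

Keeping one writer per partition path means a task holds as many open files as it sees distinct keys, so a partitioner that returns many distinct keys per task would need some bound or eviction policy that this sketch does not attempt.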