...
API (purely programmatic)
Users will need to extend DynamicPartitioner and implement its abstract method, which defines the PartitionKey to use for each record.
For each distinct partition key that the DynamicPartitioner returns, a partition will be created in the output PartitionedFileSet dataset.
```java
/**
 * Responsible for dynamically determining a {@link PartitionKey}.
 *
 * @param <K> Type of key
 * @param <V> Type of value
 */
public abstract class DynamicPartitioner<K, V> {

  /**
   * Initializes a DynamicPartitioner.
   * <p>
   * This method will be called only once per {@link DynamicPartitioner} instance.
   * </p>
   *
   * @param logicalStartTime see {@link co.cask.cdap.api.mapreduce.MapReduceContext#getLogicalStartTime}
   */
  public void initialize(long logicalStartTime) {
    // do nothing by default
  }

  /**
   * Determines the PartitionKey for the key-value pair to be written.
   *
   * @param key the key to be written
   * @param value the value to be written
   * @param logicalStartTime see {@link co.cask.cdap.api.mapreduce.MapReduceContext#getLogicalStartTime}
   * @return the {@link PartitionKey} for the key-value pair to be written
   */
  public abstract PartitionKey getPartitionKey(K key, V value, long logicalStartTime);
}
```
```java
public static final class CustomDynamicPartitioner extends DynamicPartitioner<byte[], Long> {
  private long logicalStartTime;

  @Override
  public void initialize(long logicalStartTime) {
    this.logicalStartTime = logicalStartTime;
  }

  @Override
  public PartitionKey getPartitionKey(byte[] key, Long value, long logicalStartTime) {
    return PartitionKey.builder()
      .addLongField("time", logicalStartTime)
      .addLongField("other", value)
      .build();
  }
}
```
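To see the mechanics end-to-end, here is a self-contained sketch of how a partitioner like the one above maps each record to a partition key. The `PartitionKey` and `DynamicPartitioner` classes below are minimal stand-ins for the CDAP types (the real ones live in `co.cask.cdap.api.dataset.lib`), included only so the example compiles on its own.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Minimal stand-in for CDAP's PartitionKey: an ordered map of named fields.
class PartitionKey {
    final Map<String, Object> fields;
    private PartitionKey(Map<String, Object> fields) { this.fields = fields; }
    static Builder builder() { return new Builder(); }
    static final class Builder {
        private final Map<String, Object> fields = new LinkedHashMap<>();
        Builder addLongField(String name, long value) { fields.put(name, value); return this; }
        PartitionKey build() { return new PartitionKey(fields); }
    }
}

// Minimal stand-in for the abstract class defined above.
abstract class DynamicPartitioner<K, V> {
    void initialize(long logicalStartTime) { /* do nothing by default */ }
    abstract PartitionKey getPartitionKey(K key, V value, long logicalStartTime);
}

public class Demo {
    // Mirrors the CustomDynamicPartitioner example above.
    static final class CustomDynamicPartitioner extends DynamicPartitioner<byte[], Long> {
        private long logicalStartTime;
        @Override void initialize(long logicalStartTime) { this.logicalStartTime = logicalStartTime; }
        @Override PartitionKey getPartitionKey(byte[] key, Long value, long logicalStartTime) {
            return PartitionKey.builder()
                .addLongField("time", logicalStartTime)
                .addLongField("other", value)
                .build();
        }
    }

    public static void main(String[] args) {
        long logicalStartTime = 1234L;
        CustomDynamicPartitioner partitioner = new CustomDynamicPartitioner();
        partitioner.initialize(logicalStartTime);
        // Each record's key-value pair yields a partition key; records that
        // produce the same key are written to the same partition.
        PartitionKey pk = partitioner.getPartitionKey(new byte[]{1}, 42L, logicalStartTime);
        System.out.println(pk.fields); // {time=1234, other=42}
    }
}
```

Records with distinct values of `value` here would land in distinct partitions of the output dataset, since each produces a different partition key.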
In the beforeSubmit() method of the user's MapReduce job, set the PartitionedFileSet dataset, configured with the user's DynamicPartitioner, as the job's output.
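One possible shape for that wiring is sketched below. The `PartitionedFileSetArguments` helper and its argument key are assumptions for illustration, not the confirmed CDAP API; the idea is simply that the partitioner class is recorded in the dataset's output arguments before the job is submitted.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a CDAP helper that records which
// DynamicPartitioner class the output format should instantiate.
// The argument key name here is an assumption, not the real constant.
class PartitionedFileSetArguments {
    static final String DYNAMIC_PARTITIONER_ARG = "output.dynamic.partitioner.class";
    static void setDynamicPartitioner(Map<String, String> args, Class<?> partitioner) {
        args.put(DYNAMIC_PARTITIONER_ARG, partitioner.getName());
    }
}

public class BeforeSubmitSketch {
    // Placeholder for the user's partitioner from the example above.
    static class CustomDynamicPartitioner { }

    public static void main(String[] args) {
        // In beforeSubmit(MapReduceContext context), build the output
        // arguments and register the dataset as the job's output, e.g.:
        //   context.addOutput("myPartitionedFileSet", outputArgs);
        // ("myPartitionedFileSet" is a hypothetical dataset name.)
        Map<String, String> outputArgs = new HashMap<>();
        PartitionedFileSetArguments.setDynamicPartitioner(outputArgs, CustomDynamicPartitioner.class);
        System.out.println(outputArgs.get(PartitionedFileSetArguments.DYNAMIC_PARTITIONER_ARG));
    }
}
```

At runtime, the output format would read this argument back, instantiate the named class, call initialize() once, and then call getPartitionKey() for each record written.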
...