The primary use case for this feature is writing records to multiple partitions, where the partition can vary based upon the data in the key/value.
For instance, the records being processed may encode an event time of that record, which needs to be one of the partition keys.

API (purely programmatic)

The user implements a DynamicPartitioner, which defines the PartitionKey to use for each record.
For each distinct partition key that this DynamicPartitioner returns, a partition will be created in the output PartitionedFileSet dataset.

/**
 * Responsible for dynamically determining a {@link PartitionKey}.
 */
public interface DynamicPartitioner<K, V> {

  /**
   * Determine the PartitionKey for the key-value pair to be written to.
   *
   * @param key the key to be written
   * @param value the value to be written
   * @param logicalStartTime see {@link co.cask.cdap.api.mapreduce.MapReduceContext#getLogicalStartTime}
   * @return the {@link PartitionKey} for the key-value pair to be written to.
   */
  PartitionKey getPartitionKey(K key, V value, long logicalStartTime);
}

For example, a partitioner that partitions by the job's logical start time and the record's value:

public static final class CustomDynamicPartitioner implements DynamicPartitioner<byte[], Long> {
  @Override
  public PartitionKey getPartitionKey(byte[] key, Long value, long logicalStartTime) {
    return PartitionKey.builder().addLongField("time", logicalStartTime).addLongField("value", value).build();
  }
}
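
For the event-time use case mentioned at the top of this page, a partitioner would first have to turn the record's event time into partition fields. The following is a minimal sketch of that step only, assuming the event time is available as epoch milliseconds and is interpreted in UTC. EventTimeFields and the field names (year, month, day, hour) are hypothetical and not part of the CDAP API.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper for illustration; not part of the CDAP API. */
final class EventTimeFields {
  private EventTimeFields() { }

  /**
   * Splits an event time (epoch milliseconds, interpreted in UTC) into
   * ordered partition fields. The field names are assumptions.
   */
  static Map<String, Integer> fromEpochMillis(long eventTimeMillis) {
    ZonedDateTime t = Instant.ofEpochMilli(eventTimeMillis).atZone(ZoneOffset.UTC);
    // LinkedHashMap keeps the fields in year > month > day > hour order.
    Map<String, Integer> fields = new LinkedHashMap<>();
    fields.put("year", t.getYear());
    fields.put("month", t.getMonthValue());
    fields.put("day", t.getDayOfMonth());
    fields.put("hour", t.getHour());
    return fields;
  }
}
```

Inside getPartitionKey, each of these fields would then be added to the PartitionKey builder, in the same way the example above adds "time" and "value". Records with different event times would then land in different partitions, regardless of the job's logicalStartTime.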


In the beforeSubmit() method of the user's MapReduce job, add the PartitionedFileSet dataset as an output, with the user's DynamicPartitioner class set in its output arguments:

public void beforeSubmit(MapReduceContext context) throws Exception {
  // define input and other setup
  // ...

  Map<String, String> outputArgs = new HashMap<>();
  // alternative to setting the PartitionKey, set a DynamicPartitioner class
  PartitionedFileSetArguments.setDynamicPartitioner(outputArgs, CustomDynamicPartitioner.class);
  context.addOutput("outputLines", outputArgs);
}

 
Alternatives:
1) To give the user more flexibility, the DynamicPartitioner could be passed CDAP's MapReduceTaskContext or Hadoop's TaskAttemptContext instead of the logicalStartTime. Passing in the latter would require moving the DynamicPartitioner interface into a new cdap-api-hadoop module.

High-level Implementation Design

Currently, when a PartitionedFileSet dataset is used as an output for MapReduce, a single partition must be set as the output partition.
This means that the MapReduce job writes to a single output directory and registers a single partition in the PartitionedFileSet's metadata table.
Dynamic Partitioning will allow users to specify a PartitionKey based upon the records being processed by the MapReduce job.


If no output PartitionKey is set, we will look for a specified DynamicPartitioner, which maps records to PartitionKeys.
We will also have to implement our own DynamicPartitioningOutputFormat, which is responsible for writing to multiple paths, depending on the PartitionKey.
We will also need to implement an OutputCommitter, which is responsible for creating partitions for each of the partition keys written to.
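
The division of work between the output format and the committer can be modeled in memory as follows. This is only a sketch of the intended flow, not the planned implementation: DynamicPartitioningSketch and its methods are hypothetical, a partition key is represented by its relative path as a plain String, and a "file" is just a list of values.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;

/** Hypothetical in-memory model; none of these names are CDAP or Hadoop APIs. */
final class DynamicPartitioningSketch<K, V> {
  // Stand-in for a DynamicPartitioner: maps a record to its partition's relative path.
  private final BiFunction<K, V, String> partitioner;
  // One "file" (a list of values) per partition path, created lazily on first write.
  private final Map<String, List<V>> writers = new LinkedHashMap<>();

  DynamicPartitioningSketch(BiFunction<K, V, String> partitioner) {
    this.partitioner = partitioner;
  }

  /** Output format step: route the record to the writer for its partition. */
  void write(K key, V value) {
    String path = partitioner.apply(key, value);
    writers.computeIfAbsent(path, p -> new ArrayList<>()).add(value);
  }

  /** Committer step: every path that was written to is one partition to register. */
  Set<String> commit() {
    return new LinkedHashSet<>(writers.keySet());
  }

  /** Values written under the given partition path (empty if none). */
  List<V> contents(String path) {
    return writers.getOrDefault(path, Collections.emptyList());
  }
}
```

For instance, writing the values 1, 2 and 3 with a partitioner of (k, v) -> "value=" + (v % 2) yields two partitions at commit time, "value=1" and "value=0". In the real design, the committer would register these in the PartitionedFileSet's metadata table rather than return them.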
