Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  1. User should be able to run a workflow to process new partitions of a PFS.
  2. User should be able to limit the number of partitions I process in each run of the workflow.
  3. User should be able to apply a PartitionFilter over the partitions to be processed by a workflow.
  4. User should be able to run multiple instances of the workflow, each processing its own set of partitions.
  5. User should be able to retry the processing of a partition a configurable number of times, if there is a failure to process it.
  6. User should be able to start or end the incremental processing at a particular start/end timestamp.
  7. User should be able to execute queries to query for new partitions outside of a workflow/MapReduce (for instance, from a Worker).

Related JIRAs:

Jira Legacy
serverCask Community Issue Tracker
serverId45b48dee-c8d6-34f0-9990-e6367dc2fe4b
keyCDAP-3102
 BatchPartitionConsumer should support a limit to read

https://issues.cask.co/browse/CDAP-3103 BatchPartitionConsumer should allow a partition filter

...

Code Block
languagejava
linenumberstrue
public interface PartitionConsumerConfiguration {

  // defines how/where the state is persisted and read from
  public byte[] readState();
  public void persistState(byte[] state);

  // defines a PartitionFilter, defining which partitions to entirely omit from the querying/processing
  public PartitionFilter getPartitionFilter();

  // defines an upper bound on how many partitions can be in the working set at any given time/**
 * Configuration parameters to be used by a {@link PartitionConsumer}.
 */
public class ConsumerConfiguration {

  ...

  /**
   * @return A predicate to be applied on {@link PartitionDetail}s to determine which partitions to include in the
   *         partition consumption.
   */
  public Predicate<PartitionDetail> getPartitionPredicate() {
    return partitionPredicate;
  }

  /**
   * @return An upper bound on the size of the working set of partitions that get serialized as part of the consumer's
   *         state.
   */
  public int getMaxWorkingSetSize(); {
  // defines anreturn uppermaxWorkingSetSize;
bound on how}
many
partitions can be/**
processed by a run* ofDefines aan workflowexpiration instancetimeout, in seconds, publicof int getProcessingLimit();IN_PROGRESS partitions
   //* called@return when processingnumber of theseconds retrievedthat partitionsa ispartition successfulcan be in publicprogress void onSuccess();
 
  // called when there is an error during processing of partitions
  public void onFailure();
}

 

...

before it is deemed failed.
   */
  public long getTimeout() {
    return timeout;
  }


Below is the API that the PartitionConsumer class exposes:

Code Block
languagejava
linenumberstrue
public class PartitionConsumer {

  public Iterator<Partition> getPartitions(PartitionedFileSet pfs, PartitionConsumerConfiguration, partitionConsumerConfiguration) {
    PartitionConsumerState state = partitionConsumerConfiguration.readState();
    // get new partitions in the dataset that match a partition PartitionFilter
    Iterator<Partition> partitions =
      pfsDataset.consumePartitions(state.getPointer(), limit = getMaxWorkingSetSize() - state.getSize(), getPartitionFilter());
    // add these new partitions into the working set.
    state.addPartitions(partitions);

    // get a number of partitions from the working set, mark them IN_PROGRESS, and return them.
    Iterator<Partition> partitionsToProcess = state.getPartitions(getProcessingLimit());
    partitionConsumerConfiguration.persistState(state);
    // need to commit this transaction now, so that other instances of the processing entity see these partitions as IN_PROGRESS.
    return partitionsToProcess;
  }
} 

 

Example Usage

/**
 * Incrementally consumes new/unprocessed {@link Partition}s of a {@link PartitionedFileSet}.
 */
public interface PartitionConsumer {

  /**
   * @return list of {@link PartitionDetail}s that have not yet processed.
   */
  List<PartitionDetail> consumePartitions();

  /**
   * @param limit upper limit on number of partitions to consume
   * @return list of {@link PartitionDetail}s that have not yet processed.
   */
  List<PartitionDetail> consumePartitions(int limit);

  /**
   * This method must be called on any partitions returned by the {@code #consumePartitions} method.
   *
   * @param partitions list of partitions to mark as either succeeded or failed processing
   * @param succeeded whether or not processing of the specified partitions was successful
   */
  void onFinish(List<? extends Partition> partitions, boolean succeeded);

}

 

Below is usage of the ConsumerConfiguration class, which can be passed to the PartitionedFileSet, when requesting a PartitionConsumer

Code Block
languagejava
linenumberstrue
ConsumerConfiguration.builder()
  .setPartitionPredicate(predicate)
  .setMaxWorkingSetSize(2000)
  .build();

 

Example Usage

From a worker: 

Code Block
languagejava
linenumberstrue
/**
 * Worker which runs one iteration of processing new partitions.
 * Note: As we see frequent patterns, we can abstract the common parts of the example usage into CDAP code.
 */
public class CustomWorker extends AbstractWorker {

  @Override
  public void run() {
    PartitionConsumerTransactionalPartitionConsumer partitionConsumer =
      new PartitionConsumerTransactionalPartitionConsumer(getContext();, "lines",
   PartitionConsumerConfiguration pcc = ... // Design of user-defined configuration - TBD     PartitionConsumerResult partitionsToProcess;      getContext().execute(new TxRunnable() {       @Override       public voidnew run(DatasetContext context) throws Exception {
        PartitionedFileSet pfs = context.getDataset("customPFS"); KVTableStatePersistor("consumingState", "state.key"));
    // request new partitions (with an upper limit of 10)
    final List<PartitionDetail> partitions  partitionsToProcess = partitionConsumer.getPartitions(pfs, pccconsumePartitions(10);
 
    } 
   }); 
    boolean success = true;
    try {
      // process partitionsToProcess
    } catch (Throwable t) {
      success = false;
      // log the error
    }
 
    getContext().execute(new TxRunnable() {
      @Override
      public void run(DatasetContext context) throws Exception {
        if (partitionConsumer.onFinish(partitionsToProcess, success);
 {     }
    });
// this will, by default, remove them from the working set because they have successfully been processed
          pcc.onSuccess(partitionsToProcess);
        } else  }
}


From a MapReduce:

Code Block
languagejava
linenumberstrue
/**
 * MapReduce job which incrementally processes new partitions of a {@link PartitionedFileSet}.
 */
public class DataCleansingMapReduce extends AbstractMapReduce {
 
  private PartitionBatchInput.BatchPartitionCommitter partitionCommitter;

  @Override
  public void beforeSubmit(MapReduceContext context) throws Exception {
    partitionCommitter =
     // this will, by default, leave the partitions in the working set, and simply mark them as NEW PartitionBatchInput.setInput(context, DataCleansing.RAW_RECORDS,
                                   new  pcc.onFailure(partitionsToProcess); 
KVTableStatePersistor(DataCleansing.CONSUMING_STATE, "state.key"));
    // set the output dataset as well as mapper and reducer classes
    ...
  }

  @Override
  public void onFinish(boolean succeeded, MapReduceContext context) throws Exception }{
    }partitionCommitter.onFinish(succeeded);
  }
 
  // define the mapper and reducer classes
  ...
}


Pending Questions

  1. What will be the format of the state / WorkingSet? How will it be serialized/deserialized?
    Instead of a numerical getProcessingLimit(), an alternative is to allow user to define a callback that will be called for each partition being fetched from the working set, and it returns either true or false to specify whether to add more partitions or not. This will allow users to take into consideration the partition's metadata (which could encode size of the files in that partition) and dynamically determine how many partitions to process.Answer: We will use the current state (PartitionConsumerState.java) as the cursor when requesting new partitions. In addition, we will have to store a working set of partitions. This will be two lists - a 'NEW' list of partitions, ready to be consumed as well as an 'IN PROGRESS' list of partitions that are currently being consumed. Each Partition will need to be a PartitionDetail, so that it also has its metadata. We will need to define a toBytes() method similar to the existing PartitionConsumerState#toBytes method. In addition, we will define a toJson() serialization for human readability.
  2. Limit the type of dataset (Table) that can be written to, for simplification of reading? This will make it easier to make inspection/debugging tools around the consuming states.
    For now, there's a helper class: KVTableStatePersistor.