Skip to end of metadata
Go to start of metadata

You are viewing an old version of this page. View the current version.

Compare with Current View Page History

Version 1 Next »

Requirements

  1. User should be able to run a workflow to process new partitions of a PFS.
  2. User should be able to limit the number of partitions I process in each run of the workflow.
  3. User should be able to apply a PartitionFilter over the partitions to be processed by a workflow.
  4. User should be able to run multiple instances of the workflow, each processing its own set of partitions.
  5. User should be able to retry the processing of a partition a configurable number of times, if there is a failure to process it.
  6. User should be able to start or end the incremental processing at a particular start/end timestamp.
  7. User should be able to execute queries for partitions outside of a workflow/MapReduce (for instance, from a Worker).

Related JIRAs:

https://issues.cask.co/browse/CDAP-3103 BatchPartitionConsumer should allow a partition filter

https://issues.cask.co/browse/CDAP-3944 Multiple BatchPartitionConsumer consumers

 

Overview

A majority of the underlying/necessary implementation to fulfill incremental partition consuming was done in the first implementation of BatchPartitionConsumer, in CDAP 3.1.0. This involved having indexing each partition by its creation time (more correctly, its transaction’s write pointer).

https://issues.cask.co/browse/CDAP-2747

https://github.com/caskdata/cdap/pull/3075

 

The pending work is improving this implementation to allow features such as multiple workflow instances (which involves keeping a more complex state) and customization of the query and process pipeline. 

 

Overall Design

Overall, the flow of incremental processing of partitions will compose of these components:

  1. Read the current processing state. This will encode the working set of partitions.
  2. Query for new partitions from the PFS and add them to this state / working set.
  3. The current processing instance can select partitions from this working set and mark them as IN_PROGRESS.
  4. Persist the state (which includes the working set).
  5. Perform the custom processing on these partitions
  6. Execute some onSuccess or onFinish callbacks for these processed partitions. For instance, marking them as NEW again upon failure, or removing them from the working set on success.

 

API Design

public interface PartitionConsumerConfiguration {

  // defines how/where the state is persisted and read from
  public byte[] readState();
  public void persistState(byte[] state);

  // defines a PartitionFilter, defining which partitions to entirely omit from the querying/processing
  public PartitionFilter getPartitionFilter();

  // defines an upper bound on how many partitions can be in the working set at any given time
  public int getMaxWorkingSetSize();

  // defines an upper bound on how many partitions can be processed by a run of a workflow instance
  public int getProcessingLimit();

  // called when processing of the retrieved partitions is successful
  public void onSuccess();
 
  // called when there is an error during processing of partitions
  public void onFailure();
}

 

With the above methods implemented in PartitionConsumerConfiguration, the flow of query and process will look like:

public class PartitionConsumer {

  public Iterator<Partition> getPartitions(PartitionedFileSet pfs, PartitionConsumerConfiguration, partitionConsumerConfiguration) {
    PartitionConsumerState state = partitionConsumerConfiguration.readState();
    // get new partitions in the dataset that match a partition PartitionFilter
    Iterator<Partition> partitions =
      pfsDataset.consumePartitions(state.getPointer(), limit = getMaxWorkingSetSize() - state.getSize(), getPartitionFilter());
    // add these new partitions into the working set.
    state.addPartitions(partitions);

    // get a number of partitions from the working set, mark them IN_PROGRESS, and return them.
    Iterator<Partition> partitionsToProcess = state.getPartitions(getProcessingLimit());
    partitionConsumerConfiguration.persistState(state);
    // need to commit this transaction now, so that other instances of the processing entity see these partitions as IN_PROGRESS.
    return partitionsToProcess;
  }
} 

 

Example Usage

/**
 * Worker which runs one iteration of processing new partitions.
 * Note: As we see frequent patterns, we can abstract the common parts of the example usage into CDAP code.
 */
public class CustomWorker extends AbstractWorker {

  @Override
  public void run() {
    PartitionConsumer partitionConsumer = new PartitionConsumer();
    PartitionConsumerConfiguration pcc = ... // Design of user-defined configuration - TBD
    PartitionConsumerResult partitionsToProcess;

    getContext().execute(new TxRunnable() {
      @Override
      public void run(DatasetContext context) throws Exception {
        PartitionedFileSet pfs = context.getDataset("customPFS"); 
        partitionsToProcess = partitionConsumer.getPartitions(pfs, pcc);
      }
    });

    boolean success = true;
    try {
      // process partitionsToProcess
    } catch (Throwable t) {
      success = false;
      // log the error
    }

    getContext().execute(new TxRunnable() {
      @Override
      public void run(DatasetContext context) throws Exception {
        if (success) {
          // this will, by default, remove them from the working set because they have successfully been processed
          pcc.onSuccess(partitionsToProcess);
        } else {
          // this will, by default, leave the partitions in the working set, and simply mark them as NEW
          pcc.onFailure(partitionsToProcess); 
        }
      }
    });
  }
}

Pending Questions

  1. What will be the format of the state / WorkingSet? How will it be serialized/deserialized?
  2. Instead of a numerical getProcessingLimit(), an alternative is to allow user to define a callback that will be called for each partition being fetched from the working set, and it returns either true or false to specify whether to add more partitions or not. This will allow users to take into consideration the partition's metadata (which could encode size of the files in that partition) and dynamically determine how many partitions to process.
  • No labels