...

Code Block
languagejava
linenumberstrue
public interface PartitionConsumerConfiguration {

  // defines how/where the state is persisted and read from
  public byte[] readState();
  public void persistState(byte[] state);
 
  // defines a PartitionFilter, defining which partitions to entirely omit from the querying/processing
  public PartitionFilter getPartitionFilter();

  // defines an upper bound on how many partitions can be in the working set at any given time
  public int getMaxWorkingSetSize();
 
  // defines whether to continue consuming partitions or not, when retrieving partitions from the working set
  public PartitionAccepter getConsumingAccepter();

  // defines an expiration timeout, in seconds, of IN_PROGRESS partitions
  public int getTimeOut();

  // called when processing of the retrieved partitions is successful
  public void onSuccess();
 
  // called when there is an error during consuming of partitions
  public void onFailure();
}
Code Block
languagejava
linenumberstrue
/**
 * Responsible for determining whether to accept more Partitions. Generally used to dynamically determine the number of partitions to consume.
 */
public interface PartitionAccepter {
  public enum Response {
    CONTINUE, STOP;
  }

  public Response accept(PartitionDetail partitionDetail);
}
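
For example, an accepter can bound a run by data volume rather than by a fixed partition count. The following is a minimal, illustrative sketch (not part of the proposed API), assuming each partition's metadata encodes the size of its files under a hypothetical "size.bytes" key:

Code Block
languagejava
linenumberstrue
/**
 * Illustrative sketch only: stops accepting partitions once a byte budget is reached.
 * Assumes a hypothetical "size.bytes" entry in each partition's metadata.
 */
public class SizeBasedAccepter implements PartitionAccepter {
  private final long maxBytes;
  private long acceptedBytes;

  public SizeBasedAccepter(long maxBytes) {
    this.maxBytes = maxBytes;
  }

  @Override
  public Response accept(PartitionDetail partitionDetail) {
    String size = partitionDetail.getMetadata().get("size.bytes");
    // if a partition carries no size metadata, conservatively count it as zero bytes
    acceptedBytes += size == null ? 0 : Long.parseLong(size);
    return acceptedBytes < maxBytes ? Response.CONTINUE : Response.STOP;
  }
}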

 

With the above methods implemented in PartitionConsumerConfiguration, the flow of querying and processing will look like:

Code Block
languagejava
linenumberstrue
public class PartitionConsumer {

  // TODO: move params to constructor?
  public Iterator<Partition> getPartitions(PartitionedFileSet pfs, PartitionConsumerConfiguration partitionConsumerConfiguration) {
    // deserialize the consumer's state (cursor + working set) from the persisted bytes
    PartitionConsumerState state = PartitionConsumerState.fromBytes(partitionConsumerConfiguration.readState());
    // get new partitions in the dataset that match the configured PartitionFilter
    int limit = partitionConsumerConfiguration.getMaxWorkingSetSize() - state.getSize();
    Iterator<Partition> partitions = pfs.consumePartitions(state.getPointer(), limit, partitionConsumerConfiguration.getPartitionFilter());
    // add these new partitions into the working set
    state.addPartitions(partitions);

    // get a number of partitions from the working set, mark them IN_PROGRESS, and return them
    Iterator<Partition> partitionsToProcess = state.getPartitions(partitionConsumerConfiguration.getConsumingAccepter());
    partitionConsumerConfiguration.persistState(state.toBytes());
    // this transaction needs to be committed now, so that other instances of the processing entity see these partitions as IN_PROGRESS
    return partitionsToProcess;
  }
} 

...

Code Block
languagejava
linenumberstrue
/**
 * Worker which runs one iteration of processing new partitions.
 * Note: As we see frequent patterns, we can abstract the common parts of the example usage into CDAP code.
 */
public class CustomWorker extends AbstractWorker {

  // held in fields so that they are accessible from within the TxRunnable closures below
  private Iterator<Partition> partitionsToProcess;
  private boolean success;

  @Override
  public void run() {
    PartitionConsumer partitionConsumer = new PartitionConsumer();
    PartitionConsumerConfiguration pcc = ... // Design of user defined/implemented configuration - TBD

    getContext().execute(new TxRunnable() {
      @Override
      public void run(DatasetContext context) throws Exception {
        PartitionedFileSet pfs = context.getDataset("customPFS");
        partitionsToProcess = partitionConsumer.getPartitions(pfs, pcc);
      }
    });

    success = true;
    try {
      // process partitionsToProcess
    } catch (Throwable t) {
      success = false;
      // log the error
    }

    getContext().execute(new TxRunnable() {
      @Override
      public void run(DatasetContext context) throws Exception {
        if (success) {
          // this will, by default, remove the partitions from the working set, because they have successfully been processed
          pcc.onSuccess();
        } else {
          // this will, by default, leave the partitions in the working set, and simply mark them as NEW
          pcc.onFailure();
        }
      }
    });
  }
}
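
Since the configuration design is still TBD (see the open questions below), the following is one possible sketch of an implementation, assuming the state is persisted to a CDAP KeyValueTable under a fixed key. All names and constants are hypothetical, and the accepter is the SizeBasedAccepter sketched earlier:

Code Block
languagejava
linenumberstrue
/**
 * Illustrative sketch only (the configuration API is still TBD): a configuration
 * that persists the consumer state in a KeyValueTable under a fixed key.
 */
public class TableBackedConsumerConfiguration implements PartitionConsumerConfiguration {
  private static final byte[] STATE_KEY = { 's' };
  private final KeyValueTable stateTable;

  public TableBackedConsumerConfiguration(KeyValueTable stateTable) {
    this.stateTable = stateTable;
  }

  @Override
  public byte[] readState() {
    return stateTable.read(STATE_KEY);
  }

  @Override
  public void persistState(byte[] state) {
    stateTable.write(STATE_KEY, state);
  }

  @Override
  public PartitionFilter getPartitionFilter() {
    return null; // no filter: consider all partitions of the dataset
  }

  @Override
  public int getMaxWorkingSetSize() {
    return 1000;
  }

  @Override
  public PartitionAccepter getConsumingAccepter() {
    return new SizeBasedAccepter(1024L * 1024 * 1024); // at most ~1GB of data per run
  }

  @Override
  public int getTimeOut() {
    return 3600; // expire IN_PROGRESS partitions after one hour
  }

  @Override
  public void onSuccess() {
    // default behavior: processed partitions are removed from the working set
  }

  @Override
  public void onFailure() {
    // default behavior: partitions remain in the working set, marked as NEW again
  }
}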


From a MapReduce:

Code Block
languagejava
linenumberstrue
/**
 * MapReduce job which incrementally processes new partitions of a {@link PartitionedFileSet}.
 */
public class DataCleansingMapReduce extends AbstractMapReduce {
 
  BatchPartitionConsumer batchPartitionConsumer;

  @Override
  public void beforeSubmit(MapReduceContext context) throws Exception {
    PartitionConsumerConfiguration pcc = ... // Design of user defined/implemented configuration - TBD
    batchPartitionConsumer = new BatchPartitionConsumer(pcc);
    // consumes a batch of partitions from the working set and registers it as the job's input
    PartitionedFileSet rawRecords = batchPartitionConsumer.getConfiguredDataset(context, DataCleansing.RAW_RECORDS);
    context.setInput(DataCleansing.RAW_RECORDS, rawRecords);
    // set the output dataset as well as mapper and reducer classes
    ...
  }

  @Override
  public void onFinish(boolean succeeded, MapReduceContext context) throws Exception {
    if (succeeded) {
      batchPartitionConsumer.onSuccess();
    } else {
      batchPartitionConsumer.onFailure();
    }
  }
 
  // define the mapper and reducer classes
  ...
}
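
BatchPartitionConsumer itself is not yet specified. One possible shape, sketched here purely for illustration, is a thin wrapper around PartitionConsumer that remembers the consumed batch so that onSuccess/onFailure can be delegated to the configuration:

Code Block
languagejava
linenumberstrue
/**
 * Illustrative sketch only: a thin wrapper around PartitionConsumer for use
 * from MapReduce. All method names here are assumptions, not a settled API.
 */
public class BatchPartitionConsumer {
  private final PartitionConsumerConfiguration pcc;
  private Iterator<Partition> consumedPartitions;

  public BatchPartitionConsumer(PartitionConsumerConfiguration pcc) {
    this.pcc = pcc;
  }

  // consumes a batch of partitions from the dataset's working set, remembers it,
  // and returns the dataset so that the caller can set it as the job's input
  public PartitionedFileSet getConfiguredDataset(MapReduceContext context, String datasetName) {
    PartitionedFileSet pfs = context.getDataset(datasetName);
    consumedPartitions = new PartitionConsumer().getPartitions(pfs, pcc);
    return pfs;
  }

  public void onSuccess() {
    pcc.onSuccess();
  }

  public void onFailure() {
    pcc.onFailure();
  }
}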

...

  1. What will be the format of the state / WorkingSet? How will it be serialized/deserialized?
    Instead of a numerical getProcessingLimit(), an alternative is to let the user define a callback that is invoked for each partition being fetched from the working set and returns either true or false, specifying whether to accept more partitions or not. This allows users to take the partition's metadata into consideration (which could, for instance, encode the size of the files in that partition) and dynamically determine how many partitions to process.
    Answer: We will use the current state (PartitionConsumerState.java) as the cursor when requesting new partitions. In addition, we will have to store a working set of partitions. This will be two lists - a 'NEW' list of partitions, ready to be consumed, as well as an 'IN_PROGRESS' list of partitions that are currently being consumed. Each Partition will need to be a PartitionDetail, so that it also has its metadata. We will need to define a toBytes() method similar to the existing PartitionConsumerState#toBytes method. In addition, we will define a toJson() serialization for human readability. A sketch of this structure appears after this list.
  2. TBD - API for how the PartitionConsumerConfiguration will be configured.
  3. Should we limit the type of dataset (e.g. Table) that the state can be written to, for simplification of reading? This would make it easier to build inspection/debugging tools around the consuming states.
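
A minimal sketch of the working set structure described in the answer to question 1; the names are hypothetical and the serialization bodies are elided:

Code Block
languagejava
linenumberstrue
/**
 * Illustrative sketch only: the working set described above, holding a 'NEW' list
 * and an 'IN_PROGRESS' list of PartitionDetails alongside the cursor into the dataset.
 */
public class PartitionWorkingSet {
  private final List<PartitionDetail> newPartitions = new ArrayList<>();
  private final List<PartitionDetail> inProgressPartitions = new ArrayList<>();
  private PartitionConsumerState cursor;

  // binary serialization, analogous to the existing PartitionConsumerState#toBytes
  public byte[] toBytes() { ... }

  // human-readable serialization, to ease inspection/debugging tooling
  public String toJson() { ... }
}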