...

```java
/**
 * Configuration parameters to be used by a {@link PartitionConsumer}.
 */
public class ConsumerConfiguration {

  // ...

  /**
   * @return A predicate to be applied on {@link PartitionDetail}s to determine which partitions to include in the
   *         partition consumption.
   */
  public Predicate<PartitionDetail> getPartitionPredicate() {
    return partitionPredicate;
  }

  /**
   * @return An upper bound on the size of the working set of partitions that get serialized as part of the
   *         consumer's state.
   */
  public int getMaxWorkingSetSize() {
    return maxWorkingSetSize;
  }

  /**
   * Defines an expiration timeout, in seconds, of IN_PROGRESS partitions.
   *
   * @return number of seconds that a partition can be in progress before it is deemed failed.
   */
  public long getTimeout() {
    return timeout;
  }
}
```
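The two limits above can be illustrated with a small helper. This is a sketch only: it assumes the working set records the time at which each partition was marked IN_PROGRESS, and the names WorkingSetLimits, isExpired and remainingCapacity are hypothetical, not part of the CDAP API.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper illustrating how ConsumerConfiguration's limits could be applied.
final class WorkingSetLimits {

  private WorkingSetLimits() {
  }

  /**
   * Returns true if a partition marked IN_PROGRESS at startedAtMillis has been in progress for longer than
   * timeoutSeconds (as returned by ConsumerConfiguration#getTimeout) at time nowMillis.
   */
  static boolean isExpired(long startedAtMillis, long nowMillis, long timeoutSeconds) {
    return nowMillis - startedAtMillis > TimeUnit.SECONDS.toMillis(timeoutSeconds);
  }

  /**
   * Returns how many new partitions may still be added to a working set that currently holds
   * workingSetSize entries without exceeding maxWorkingSetSize. Never negative.
   */
  static int remainingCapacity(int maxWorkingSetSize, int workingSetSize) {
    return Math.max(0, maxWorkingSetSize - workingSetSize);
  }
}
```

For example, with a timeout of 60 seconds, a partition that has been in progress for 61 seconds is deemed failed, and a working set of 1990 entries with a maximum of 2000 leaves room for 10 new partitions.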


Below is the API exposed by the PartitionConsumer interface:

```java
/**
 * Incrementally consumes new/unprocessed {@link Partition}s of a {@link PartitionedFileSet}.
 */
public interface PartitionConsumer {

  /**
   * @return list of {@link PartitionDetail}s that have not yet been processed.
   */
  List<PartitionDetail> consumePartitions();

  /**
   * @param limit upper limit on number of partitions to consume
   * @return list of {@link PartitionDetail}s that have not yet been processed.
   */
  List<PartitionDetail> consumePartitions(int limit);

  /**
   * This method must be called on any partitions returned by the {@code #consumePartitions} method.
   *
   * @param partitions list of partitions to mark as either succeeded or failed processing
   * @param succeeded whether or not processing of the specified partitions was successful
   */
  void onFinish(List<? extends Partition> partitions, boolean succeeded);
}
```
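To illustrate the contract of consumePartitions and onFinish, here is a minimal in-memory sketch. It is not the CDAP implementation: partitions are modeled as String keys rather than PartitionDetail objects, nothing is persisted or transactional, and it assumes that partitions which fail processing are simply returned to the 'NEW' list so that a later call can consume them again. The class name InMemoryPartitionConsumer is hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical in-memory illustration of the PartitionConsumer contract (not the CDAP implementation).
// Partitions are modeled as String keys; nothing is persisted and no transactions are involved.
final class InMemoryPartitionConsumer {

  private final Set<String> newPartitions = new LinkedHashSet<>();        // the 'NEW' list
  private final Set<String> inProgressPartitions = new LinkedHashSet<>(); // the 'IN PROGRESS' list
  private final Set<String> processedPartitions = new LinkedHashSet<>();

  /** Simulates partitions being added to the PartitionedFileSet; already-known partitions are ignored. */
  void addNewPartitions(List<String> partitions) {
    for (String partition : partitions) {
      if (!inProgressPartitions.contains(partition) && !processedPartitions.contains(partition)) {
        newPartitions.add(partition);
      }
    }
  }

  /** Moves up to limit partitions from NEW to IN PROGRESS, in insertion order, and returns them. */
  List<String> consumePartitions(int limit) {
    List<String> consumed = new ArrayList<>();
    for (String partition : newPartitions) {
      if (consumed.size() >= limit) {
        break;
      }
      consumed.add(partition);
    }
    newPartitions.removeAll(consumed);
    inProgressPartitions.addAll(consumed);
    return consumed;
  }

  /** Marks partitions as processed on success; on failure, returns them to NEW (an assumption of this sketch). */
  void onFinish(List<String> partitions, boolean succeeded) {
    // validate first, so that an invalid call leaves the state unchanged
    for (String partition : partitions) {
      if (!inProgressPartitions.contains(partition)) {
        throw new IllegalArgumentException("Partition is not in progress: " + partition);
      }
    }
    for (String partition : partitions) {
      inProgressPartitions.remove(partition);
      if (succeeded) {
        processedPartitions.add(partition);
      } else {
        newPartitions.add(partition);
      }
    }
  }

  int newCount() {
    return newPartitions.size();
  }

  int inProgressCount() {
    return inProgressPartitions.size();
  }
}
```

Calling onFinish with partitions that were never returned by consumePartitions is rejected, which mirrors the requirement that onFinish be called on exactly the partitions handed out.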

 

Below is example usage of the ConsumerConfiguration builder. The resulting configuration can be passed to the PartitionedFileSet when requesting a PartitionConsumer:

```java
ConsumerConfiguration.builder()
  .setPartitionPredicate(predicate)
  .setMaxWorkingSetSize(2000)
  .build();
```
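The builder call above follows the usual builder pattern. The sketch below shows one way such a builder could be structured. It is hypothetical: the class name ConsumerConfigurationSketch, the setTimeout setter, the type parameter standing in for PartitionDetail, and the default values are assumptions made for illustration, and java.util.function.Predicate is used in place of whatever Predicate type the real class uses.

```java
import java.util.Objects;
import java.util.function.Predicate;

// Hypothetical sketch of a builder-based configuration class (not the CDAP implementation).
// The type parameter P stands in for PartitionDetail; the defaults below are placeholders.
final class ConsumerConfigurationSketch<P> {

  private final Predicate<P> partitionPredicate;
  private final int maxWorkingSetSize;
  private final long timeout;

  private ConsumerConfigurationSketch(Predicate<P> partitionPredicate, int maxWorkingSetSize, long timeout) {
    this.partitionPredicate = partitionPredicate;
    this.maxWorkingSetSize = maxWorkingSetSize;
    this.timeout = timeout;
  }

  static <P> Builder<P> builder() {
    return new Builder<>();
  }

  Predicate<P> getPartitionPredicate() {
    return partitionPredicate;
  }

  int getMaxWorkingSetSize() {
    return maxWorkingSetSize;
  }

  long getTimeout() {
    return timeout;
  }

  static final class Builder<P> {
    private Predicate<P> partitionPredicate = partition -> true; // placeholder default: accept all partitions
    private int maxWorkingSetSize = 1000;                        // placeholder default
    private long timeout = 60L;                                  // placeholder default, in seconds

    Builder<P> setPartitionPredicate(Predicate<P> partitionPredicate) {
      this.partitionPredicate = Objects.requireNonNull(partitionPredicate, "partitionPredicate");
      return this;
    }

    Builder<P> setMaxWorkingSetSize(int maxWorkingSetSize) {
      if (maxWorkingSetSize <= 0) {
        throw new IllegalArgumentException("maxWorkingSetSize must be positive");
      }
      this.maxWorkingSetSize = maxWorkingSetSize;
      return this;
    }

    Builder<P> setTimeout(long timeoutSeconds) {
      if (timeoutSeconds <= 0) {
        throw new IllegalArgumentException("timeout must be positive");
      }
      this.timeout = timeoutSeconds;
      return this;
    }

    ConsumerConfigurationSketch<P> build() {
      return new ConsumerConfigurationSketch<>(partitionPredicate, maxWorkingSetSize, timeout);
    }
  }
}
```

Validating in the setters means an invalid value fails at configuration time rather than later, during consumption.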

 

Example Usage

From a worker: 

```java
/**
 * Worker which runs one iteration of processing new partitions.
 * Note: As we see frequent patterns, we can abstract the common parts of the example usage into CDAP code.
 */
public class CustomWorker extends AbstractWorker {

  @Override
  public void run() {
    final TransactionalPartitionConsumer partitionConsumer =
      new TransactionalPartitionConsumer(getContext(), "lines",
                                         new KVTableStatePersistor("consumingState", "state.key"));
    // request new partitions (with an upper limit of 10)
    final List<PartitionDetail> partitions = partitionConsumer.consumePartitions(10);

    boolean success = true;
    try {
      // process the partitions
    } catch (Throwable t) {
      success = false;
      // log the error
    }

    // copy into an effectively final variable so that the anonymous TxRunnable can reference it
    final boolean succeeded = success;
    getContext().execute(new TxRunnable() {
      @Override
      public void run(DatasetContext context) throws Exception {
        // report the processing outcome for the consumed partitions
        partitionConsumer.onFinish(partitions, succeeded);
      }
    });
  }
}
```
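The worker's Javadoc notes that the common parts of this pattern could be abstracted into CDAP code. A minimal sketch of such an abstraction follows, assuming a simplified consumer interface is enough; SimpleConsumer and ConsumeOnce are hypothetical names. It guarantees that onFinish is called exactly once with the outcome of processing. Transactions are deliberately left out; in the worker above, onFinish runs inside its own transaction.

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical abstraction of the worker's consume / process / onFinish pattern.
// Transactions are not modeled here.
final class ConsumeOnce {

  private ConsumeOnce() {
  }

  /** Simplified stand-in for PartitionConsumer, over an arbitrary partition type P. */
  interface SimpleConsumer<P> {
    List<P> consumePartitions(int limit);

    void onFinish(List<? extends P> partitions, boolean succeeded);
  }

  /**
   * Consumes up to limit partitions, passes them to processor, and always reports the outcome through
   * onFinish, even if processing throws. Returns whether processing succeeded.
   */
  static <P> boolean runOnce(SimpleConsumer<P> consumer, int limit, Consumer<List<P>> processor) {
    List<P> partitions = consumer.consumePartitions(limit);
    boolean succeeded = false;
    try {
      processor.accept(partitions);
      succeeded = true;
    } catch (RuntimeException e) {
      // a real worker would log the error here; the partitions are reported as failed below
    } finally {
      consumer.onFinish(partitions, succeeded);
    }
    return succeeded;
  }
}
```

Putting onFinish in a finally block means even an Error thrown during processing still marks the partitions as failed before the Error propagates.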


From a MapReduce program:

```java
/**
 * MapReduce job which incrementally processes new partitions of a {@link PartitionedFileSet}.
 */
public class DataCleansingMapReduce extends AbstractMapReduce {

  private PartitionBatchInput.BatchPartitionCommitter partitionCommitter;

  @Override
  public void beforeSubmit(MapReduceContext context) throws Exception {
    // use the new partitions of the raw records dataset as input, persisting the consuming state in a key-value table
    partitionCommitter =
      PartitionBatchInput.setInput(context, DataCleansing.RAW_RECORDS,
                                   new KVTableStatePersistor(DataCleansing.CONSUMING_STATE, "state.key"));
    // set the output dataset as well as mapper and reducer classes
    // ...
  }

  @Override
  public void onFinish(boolean succeeded, MapReduceContext context) throws Exception {
    // mark the consumed partitions as succeeded or failed, depending on the outcome of the job
    partitionCommitter.onFinish(succeeded);
  }

  // define the mapper and reducer classes
  // ...
}
```
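Both examples persist the consumer's state through KVTableStatePersistor, under the key "state.key". Conceptually, a state persistor only needs a read/persist pair for an opaque byte array. The following sketch assumes exactly that; MapStatePersistor is a hypothetical stand-in that uses a HashMap in place of a key-value table and does not model transactions.

```java
import java.util.Arrays;
import java.util.Map;

// Hypothetical sketch of a state persistor: the consumer's serialized state is stored under a fixed key.
// A Map stands in for the key-value table dataset; transactions are not modeled.
final class MapStatePersistor {

  private final Map<String, byte[]> table;
  private final String stateKey;

  MapStatePersistor(Map<String, byte[]> table, String stateKey) {
    this.table = table;
    this.stateKey = stateKey;
  }

  /** Returns a copy of the previously persisted state, or null if no state has been persisted yet. */
  byte[] readState() {
    byte[] state = table.get(stateKey);
    return state == null ? null : Arrays.copyOf(state, state.length);
  }

  /** Stores a copy of the given state, replacing any previous state. */
  void persistState(byte[] state) {
    table.put(stateKey, Arrays.copyOf(state, state.length));
  }
}
```

Copying on read and write keeps callers from mutating the stored state by accident.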

...

  1. What will be the format of the state / working set? How will it be serialized/deserialized?
    Answer: We will use the current state (PartitionConsumerState.java) as the cursor when requesting new partitions. In addition, we will store a working set of partitions, consisting of two lists: a 'NEW' list of partitions that are ready to be consumed, and an 'IN PROGRESS' list of partitions that are currently being consumed. Each partition will need to be stored as a PartitionDetail, so that its metadata is available as well. We will define a toBytes() method similar to the existing PartitionConsumerState#toBytes method, as well as a toJson() serialization for human readability.
  2. Should we limit the type of dataset (Table) that the state can be written to, to simplify reading it? This would make it easier to build inspection/debugging tools around the consuming states.
    For now, there is a helper class: KVTableStatePersistor.
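Answer 1 can be made concrete with a small serialization sketch. It assumes the state consists of a long cursor (standing in for PartitionConsumerState) plus the 'NEW' and 'IN PROGRESS' lists, with partitions modeled as String keys instead of PartitionDetail objects; the class WorkingSetState and its method names are hypothetical. toBytes() and fromBytes() give the compact binary form, and toJson() gives the human-readable one.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of a serializable working set: a cursor plus the 'NEW' and 'IN PROGRESS' lists.
// Partitions are modeled as String keys; a real implementation would also carry PartitionDetail metadata.
final class WorkingSetState {

  final long cursor;
  final List<String> newPartitions;
  final List<String> inProgressPartitions;

  WorkingSetState(long cursor, List<String> newPartitions, List<String> inProgressPartitions) {
    this.cursor = cursor;
    this.newPartitions = Collections.unmodifiableList(new ArrayList<>(newPartitions));
    this.inProgressPartitions = Collections.unmodifiableList(new ArrayList<>(inProgressPartitions));
  }

  /** Binary form: the cursor, then each list as a count followed by its keys (writeUTF: max 65535 bytes each). */
  byte[] toBytes() {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeLong(cursor);
      writeList(out, newPartitions);
      writeList(out, inProgressPartitions);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  static WorkingSetState fromBytes(byte[] data) {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
      long cursor = in.readLong();
      List<String> newPartitions = readList(in);
      List<String> inProgressPartitions = readList(in);
      return new WorkingSetState(cursor, newPartitions, inProgressPartitions);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Human-readable form for inspection/debugging; only quotes and backslashes in keys are escaped. */
  String toJson() {
    return "{\"cursor\":" + cursor
        + ",\"new\":" + jsonArray(newPartitions)
        + ",\"inProgress\":" + jsonArray(inProgressPartitions) + "}";
  }

  private static void writeList(DataOutputStream out, List<String> list) throws IOException {
    out.writeInt(list.size());
    for (String key : list) {
      out.writeUTF(key);
    }
  }

  private static List<String> readList(DataInputStream in) throws IOException {
    int size = in.readInt();
    List<String> list = new ArrayList<>(size);
    for (int i = 0; i < size; i++) {
      list.add(in.readUTF());
    }
    return list;
  }

  private static String jsonArray(List<String> list) {
    StringBuilder sb = new StringBuilder("[");
    for (int i = 0; i < list.size(); i++) {
      if (i > 0) {
        sb.append(',');
      }
      String escaped = list.get(i).replace("\\", "\\\\").replace("\"", "\\\"");
      sb.append('"').append(escaped).append('"');
    }
    return sb.append(']').toString();
  }
}
```

Keeping the binary and JSON forms separate lets the stored state stay compact while inspection/debugging tools can still render it readably.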