...

Code Block

/**
 * Configuration parameters to be used by a {@link PartitionConsumer}.
 */
public class ConsumerConfiguration {

  ...


  /**
   * @return A predicate to be applied on {@link PartitionDetail}s to determine which partitions to include in the
   *         partition consumption.
   */
  public Predicate<PartitionDetail> getPartitionPredicate() {
    return partitionPredicate;
  }

  /**
   * @return An upper bound on the size of the working set of partitions that get serialized as part of the consumer's
   *         state.
   */
  public int getMaxWorkingSetSize() {
    return maxWorkingSetSize;
  }

  /**
   * Defines an expiration timeout, in seconds, for IN_PROGRESS partitions.
   *
   * @return number of seconds that a partition can be in progress before it is deemed failed.
   */
  public long getTimeout() {
    return timeout;
  }
}
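
The timeout semantics described for getTimeout() can be sketched as below. This is only an illustration under stated assumptions, not the CDAP implementation: the InProgressExpiry class, its isExpired method, and the use of epoch milliseconds for the start time are hypothetical names and choices introduced here.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical helper (not part of the proposed API) that applies the configured timeout. */
final class InProgressExpiry {
  // timeout in seconds, as it would be returned by ConsumerConfiguration#getTimeout()
  private final long timeoutSeconds;

  InProgressExpiry(long timeoutSeconds) {
    this.timeoutSeconds = timeoutSeconds;
  }

  /**
   * @param startedMillis time (epoch millis) at which the partition was marked IN_PROGRESS
   * @param nowMillis current time (epoch millis)
   * @return true if the partition has been in progress for longer than the timeout
   */
  boolean isExpired(long startedMillis, long nowMillis) {
    return nowMillis - startedMillis > TimeUnit.SECONDS.toMillis(timeoutSeconds);
  }
}
```

A partition for which isExpired returns true would be deemed failed, which presumably makes it eligible to be consumed again.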


Below is the API that the PartitionConsumer interface exposes:

Code Block
/**
 * Incrementally consumes new/unprocessed {@link Partition}s of a {@link PartitionedFileSet}.
 */
public interface PartitionConsumer {

  /**
   * @return list of {@link PartitionDetail}s that have not yet been processed.
   */
  List<PartitionDetail> consumePartitions();

  /**
   * @param limit upper limit on number of partitions to consume
   * @return list of {@link PartitionDetail}s that have not yet been processed.
   */
  List<PartitionDetail> consumePartitions(int limit);

  /**
   * This method must be called on any partitions returned by the {@code #consumePartitions} method.
   *
   * @param partitions list of partitions to mark as either succeeded or failed processing
   * @param succeeded whether or not processing of the specified partitions was successful
   */
  void onFinish(List<? extends Partition> partitions, boolean succeeded);

}
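
To make the contract between consumePartitions and onFinish concrete, here is a minimal in-memory sketch. Everything in it is an assumption made for illustration: the InMemoryPartitionConsumer class, the use of String keys in place of Partition objects, the addPartition method, the AVAILABLE state name, and the choice to make failed partitions available again. It does not persist state or use transactions, which a real implementation would need.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory stand-in for a PartitionConsumer; partitions are plain String keys. */
final class InMemoryPartitionConsumer {
  enum ProcessState { AVAILABLE, IN_PROGRESS }

  // working set: partition key -> processing state, kept in insertion order
  private final Map<String, ProcessState> workingSet = new LinkedHashMap<>();

  /** Adds a newly discovered partition to the working set (no-op if already present). */
  void addPartition(String key) {
    workingSet.putIfAbsent(key, ProcessState.AVAILABLE);
  }

  /** Marks up to {@code limit} available partitions as IN_PROGRESS and returns them. */
  List<String> consumePartitions(int limit) {
    List<String> consumed = new ArrayList<>();
    for (Map.Entry<String, ProcessState> entry : workingSet.entrySet()) {
      if (consumed.size() >= limit) {
        break;
      }
      if (entry.getValue() == ProcessState.AVAILABLE) {
        entry.setValue(ProcessState.IN_PROGRESS);
        consumed.add(entry.getKey());
      }
    }
    return consumed;
  }

  /** Drops succeeded partitions from the working set; makes failed ones available again. */
  void onFinish(List<String> partitions, boolean succeeded) {
    for (String key : partitions) {
      if (workingSet.get(key) != ProcessState.IN_PROGRESS) {
        throw new IllegalArgumentException("Partition is not in progress: " + key);
      }
      if (succeeded) {
        workingSet.remove(key);
      } else {
        workingSet.put(key, ProcessState.AVAILABLE);
      }
    }
  }
}
```

In this sketch, a caller that consumes two partitions and reports failure will see the same two partitions returned by the next consumePartitions call, while reporting success removes them for good.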

 

Below is an example of building a ConsumerConfiguration, which can be passed to the PartitionedFileSet when requesting a PartitionConsumer:

Code Block
ConsumerConfiguration.builder()
  .setPartitionPredicate(predicate)
  .setMaxWorkingSetSize(2000)
  .build();
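
For reference, a builder supporting the calls above might look roughly like the following sketch. It is not the actual class: the names ConsumerConfigurationSketch and PartitionDetailStub are hypothetical, java.util.function.Predicate is used as a stand-in for whichever Predicate type the real class uses, and the default values (accept every partition, working set of 1000, timeout of 3600 seconds) are assumptions chosen only so the example is complete.

```java
import java.util.function.Predicate;

/** Hypothetical stand-in for PartitionDetail, used only to keep the sketch self-contained. */
final class PartitionDetailStub {
  final String relativePath;

  PartitionDetailStub(String relativePath) {
    this.relativePath = relativePath;
  }
}

/** Sketch of an immutable configuration object with a fluent builder. */
final class ConsumerConfigurationSketch {
  private final Predicate<PartitionDetailStub> partitionPredicate;
  private final int maxWorkingSetSize;
  private final long timeout;

  private ConsumerConfigurationSketch(Predicate<PartitionDetailStub> partitionPredicate,
                                      int maxWorkingSetSize, long timeout) {
    this.partitionPredicate = partitionPredicate;
    this.maxWorkingSetSize = maxWorkingSetSize;
    this.timeout = timeout;
  }

  static Builder builder() {
    return new Builder();
  }

  Predicate<PartitionDetailStub> getPartitionPredicate() {
    return partitionPredicate;
  }

  int getMaxWorkingSetSize() {
    return maxWorkingSetSize;
  }

  long getTimeout() {
    return timeout;
  }

  static final class Builder {
    // Assumed defaults; the real defaults are not stated in this document.
    private Predicate<PartitionDetailStub> partitionPredicate = p -> true;
    private int maxWorkingSetSize = 1000;
    private long timeout = 3600;

    Builder setPartitionPredicate(Predicate<PartitionDetailStub> partitionPredicate) {
      this.partitionPredicate = partitionPredicate;
      return this;
    }

    Builder setMaxWorkingSetSize(int maxWorkingSetSize) {
      this.maxWorkingSetSize = maxWorkingSetSize;
      return this;
    }

    Builder setTimeout(long timeout) {
      this.timeout = timeout;
      return this;
    }

    ConsumerConfigurationSketch build() {
      return new ConsumerConfigurationSketch(partitionPredicate, maxWorkingSetSize, timeout);
    }
  }
}
```

Keeping the configuration immutable and collecting optional settings in a builder means that callers only specify the parameters they care about, as in the usage above where no timeout is set.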

 

Example Usage

From a worker: 

Code Block
/**
 * Worker which runs one iteration of processing new partitions.
 * Note: As we see frequent patterns, we can abstract the common parts of the example usage into CDAP code.
 */
public class CustomWorker extends AbstractWorker {

  @Override
  public void run() {
    final TransactionalPartitionConsumer partitionConsumer =
      new TransactionalPartitionConsumer(getContext(), "lines",
                                         new KVTableStatePersistor("consumingState", "state.key"));
    // request new partitions (with an upper limit of 10)
    final List<PartitionDetail> partitions = partitionConsumer.consumePartitions(10);

    boolean success = true;
    try {
      // process the partitions
    } catch (Throwable t) {
      success = false;
      // log the error
    }

    // copy the result into a final variable so that the anonymous TxRunnable can capture it
    final boolean succeeded = success;
    getContext().execute(new TxRunnable() {
      @Override
      public void run(DatasetContext context) throws Exception {
        // report the outcome for exactly the partitions that were consumed above
        partitionConsumer.onFinish(partitions, succeeded);
      }
    });
  }
}

...