Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
language: java
linenumbers: true
/**
 * Defines configuration, used in the Partition consuming and processing pipeline.
 */
public interface PartitionConsumerConfiguration {

  // defines how/where the state is persisted and read from
  public byte[] readState();
  public void persistState(byte[] state);
 
  // defines a PartitionFilter, defining which partitions to entirely omit from the querying/processing
  public PartitionFilter getPartitionFilter();

  // defines an upper bound on how many partitions can be in the working set at any given time
  public int getMaxWorkingSetSize();
 
  // defines whether to continue consuming partitions or not, when retrieving partitions from the working set
  public PartitionAccepter getConsumingAccepter();

  // defines an expiration timeout, in seconds, of IN_PROGRESS partitions
  public int getTimeOut();

  // called when processing of the retrieved partitions is successful
  public void onSuccess();
 
  // called when there is an error during consuming of partitions
  public void onFailure();
}

...

Code Block
language: java
linenumbers: true
public class PartitionConsumer {

  // TODO: move params to constructor?
  public Iterator<Partition> getPartitions(PartitionedFileSet pfs, PartitionConsumerConfiguration, partitionConsumerConfiguration) {
    PartitionConsumerState state = partitionConsumerConfiguration.readState();
    // get new partitions in the dataset that match a partition PartitionFilter
    int limit = getMaxWorkingSetSize() - state.getSize();
    Iterator<Partition> partitions = pfsDataset.consumePartitions(state.getPointer(), limit, getPartitionFilter());
    // add these new partitions into the working set.
    state.addPartitions(partitions);

    // get a number of partitions from the working set, mark them IN_PROGRESS, and return them.
    Iterator<Partition> partitionsToProcess = state.getPartitions(getConsumingAccepter());
    partitionConsumerConfiguration.persistState(state);
    // need to commit this transaction now, so that other instances of the processing entity see these partitions as IN_PROGRESS.
    return partitionsToProcess;
  }
} 

...