...
Code Block | ||||
---|---|---|---|---|
| ||||
/**
* Defines configuration, used in the Partition consuming and processing pipeline.
*/
public interface PartitionConsumerConfiguration {
// defines how/where the state is persisted and read from
public byte[] readState();
public void persistState(byte[] state);
// defines a PartitionFilter, defining which partitions to entirely omit from the querying/processing
public PartitionFilter getPartitionFilter();
// defines an upper bound on how many partitions can be in the working set at any given time
public int getMaxWorkingSetSize();
// defines whether to continue consuming partitions or not, when retrieving partitions from the working set
public PartitionAccepter getConsumingAccepter();
// defines an expiration timeout, in seconds, of IN_PROGRESS partitions
public int getTimeOut();
// called when processing of the retrieved partitions is successful
public void onSuccess();
// called when there is an error during consuming of partitions
public void onFailure();
} |
...
Code Block | ||||
---|---|---|---|---|
| ||||
public class PartitionConsumer { // TODO: move params to constructor? public Iterator<Partition> getPartitions(PartitionedFileSet pfs, PartitionConsumerConfiguration, partitionConsumerConfiguration) { PartitionConsumerState state = partitionConsumerConfiguration.readState(); // get new partitions in the dataset that match a partition PartitionFilter int limit = getMaxWorkingSetSize() - state.getSize(); Iterator<Partition> partitions = pfsDataset.consumePartitions(state.getPointer(), limit, getPartitionFilter()); // add these new partitions into the working set. state.addPartitions(partitions); // get a number of partitions from the working set, mark them IN_PROGRESS, and return them. Iterator<Partition> partitionsToProcess = state.getPartitions(getConsumingAccepter()); partitionConsumerConfiguration.persistState(state); // need to commit this transaction now, so that other instances of the processing entity see these partitions as IN_PROGRESS. return partitionsToProcess; } } |
...