...
Code Block
/**
 * Configuration parameters to be used by a {@link PartitionConsumer}.
 */
public class ConsumerConfiguration {
  ...

  /**
   * @return A predicate to be applied on {@link PartitionDetail}s to determine which partitions to include in the
   *         partition consumption.
   */
  public Predicate<PartitionDetail> getPartitionPredicate() {
    return partitionPredicate;
  }

  /**
   * @return An upper bound on the size of the working set of partitions that get serialized as part of the
   *         consumer's state.
   */
  public int getMaxWorkingSetSize() {
    return maxWorkingSetSize;
  }

  /**
   * Defines an expiration timeout, in seconds, of IN_PROGRESS partitions.
   *
   * @return number of seconds that a partition can be in progress before it is deemed failed.
   */
  public long getTimeout() {
    return timeout;
  }
}
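For example, the partition predicate can be used to restrict which partitions enter the working set. The following is an illustrative sketch only, assuming the dataset's partitioning has a string field named "region" (a made-up field, not defined anywhere in this document):

Code Block

// Illustrative sketch: accept only partitions whose (assumed) "region" key field equals "us-east"
Predicate<PartitionDetail> predicate = new Predicate<PartitionDetail>() {
  @Override
  public boolean apply(PartitionDetail partitionDetail) {
    // the field name "region" is an assumption for this example
    return "us-east".equals(partitionDetail.getPartitionKey().getField("region"));
  }
};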
Below is the API that the PartitionConsumer interface exposes:
Code Block
/**
 * Incrementally consumes new/unprocessed {@link Partition}s of a {@link PartitionedFileSet}.
 */
public interface PartitionConsumer {

  /**
   * @return list of {@link PartitionDetail}s that have not yet been processed.
   */
  List<PartitionDetail> consumePartitions();

  /**
   * @param limit upper limit on number of partitions to consume
   * @return list of {@link PartitionDetail}s that have not yet been processed.
   */
  List<PartitionDetail> consumePartitions(int limit);

  /**
   * This method must be called on any partitions returned by the {@code #consumePartitions} method.
   *
   * @param partitions list of partitions to mark as having either succeeded or failed processing
   * @param succeeded whether or not processing of the specified partitions was successful
   */
  void onFinish(List<? extends Partition> partitions, boolean succeeded);
}
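As a rough sketch of how the limit overload might be used, a caller could drain the unprocessed partitions in batches; the consumer instance and the batch size of 100 below are assumptions for illustration, not part of the API above:

Code Block

// Illustrative sketch: repeatedly consume up to 100 unprocessed partitions until none remain
void drain(PartitionConsumer consumer) {
  List<PartitionDetail> batch;
  while (!(batch = consumer.consumePartitions(100)).isEmpty()) {
    boolean succeeded = true;
    try {
      // process the partitions in 'batch'
    } catch (Throwable t) {
      succeeded = false;
    }
    // every partition returned by consumePartitions must be reported back via onFinish
    consumer.onFinish(batch, succeeded);
  }
}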
With the above configuration methods implemented in PartitionConsumerConfiguration, the internal flow of querying and processing partitions will look like:
Code Block
public class PartitionConsumer {
  // move params to constructor?
  public Iterator<Partition> getPartitions(PartitionedFileSet pfsDataset,
                                           PartitionConsumerConfiguration partitionConsumerConfiguration) {
    PartitionConsumerState state = partitionConsumerConfiguration.readState();
    // get new partitions in the dataset that match a PartitionFilter, up to the remaining capacity of the working set
    int limit = partitionConsumerConfiguration.getMaxWorkingSetSize() - state.getSize();
    Iterator<Partition> partitions =
      pfsDataset.consumePartitions(state.getPointer(), limit, partitionConsumerConfiguration.getPartitionFilter());
    // add these new partitions into the working set
    state.addPartitions(partitions);
    // get a number of partitions from the working set, mark them IN_PROGRESS, and return them
    Iterator<Partition> partitionsToProcess =
      state.getPartitions(partitionConsumerConfiguration.getConsumingAccepter());
    partitionConsumerConfiguration.persistState(state);
    // need to commit this transaction now, so that other instances of the processing entity
    // see these partitions as IN_PROGRESS
    return partitionsToProcess;
  }
}

/**
 * Responsible for determining whether to accept more Partitions.
 * Generally used to dynamically determine the number of partitions to consume.
 */
public interface PartitionAccepter {
  enum Response {
    CONTINUE, STOP
  }

  Response accept(PartitionDetail partitionDetail);
}
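The working set referenced above is only sketched here. Purely as a hypothetical illustration (none of these names are part of the proposed API), each entry of the working set could track its own state and claim time, which is also what the IN_PROGRESS timeout in ConsumerConfiguration would act on:

Code Block

// Hypothetical sketch of a working-set entry; not part of the proposed API
public class WorkingSetEntry {
  public enum ProcessState { AVAILABLE, IN_PROGRESS }

  private final PartitionKey partitionKey;
  private ProcessState processState = ProcessState.AVAILABLE;
  private long claimedTimestamp;

  public WorkingSetEntry(PartitionKey partitionKey) {
    this.partitionKey = partitionKey;
  }

  // called when the partition is handed out for processing
  public void claim(long nowMillis) {
    this.processState = ProcessState.IN_PROGRESS;
    this.claimedTimestamp = nowMillis;
  }

  // a partition that has been IN_PROGRESS longer than the configured timeout is considered failed
  // and can be handed out again
  public boolean isExpired(long nowMillis, long timeoutSeconds) {
    return processState == ProcessState.IN_PROGRESS
        && nowMillis - claimedTimestamp > timeoutSeconds * 1000L;
  }

  public PartitionKey getPartitionKey() {
    return partitionKey;
  }
}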
Below is the usage of the ConsumerConfiguration class, which can be passed to the PartitionedFileSet when requesting a PartitionConsumer:
Code Block
ConsumerConfiguration.builder()
.setPartitionPredicate(predicate)
.setMaxWorkingSetSize(2000)
.build();
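To show how the built configuration could reach the consumer, here is a hedged sketch from within a program (e.g. the worker below, where getContext() is available); it assumes the result of the builder above is held in a variable named configuration, and that the consumer constructor accepts an extra ConsumerConfiguration argument, mirroring the constructor used in the worker example below:

Code Block

// Sketch only: 'configuration' is the ConsumerConfiguration built above;
// the trailing constructor argument is an assumed overload
TransactionalPartitionConsumer partitionConsumer =
  new TransactionalPartitionConsumer(getContext(), "lines",
                                     new KVTableStatePersistor("consumingState", "state.key"),
                                     configuration);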
Example Usage
From a worker:
Code Block
/**
 * Worker which runs one iteration of processing new partitions.
 * Note: As we see frequent patterns, we can abstract the common parts of the example usage into CDAP code.
 */
public class CustomWorker extends AbstractWorker {

  @Override
  public void run() {
    final TransactionalPartitionConsumer partitionConsumer =
      new TransactionalPartitionConsumer(getContext(), "lines",
                                         new KVTableStatePersistor("consumingState", "state.key"));
    // request new partitions (with an upper limit of 10)
    final List<PartitionDetail> partitions = partitionConsumer.consumePartitions(10);

    boolean success = true;
    try {
      // process the partitions
    } catch (Throwable t) {
      success = false;
      // log the error
    }

    final boolean succeeded = success;
    getContext().execute(new TxRunnable() {
      @Override
      public void run(DatasetContext context) throws Exception {
        // mark the partitions as either succeeded or failed, so successful ones are not consumed again
        partitionConsumer.onFinish(partitions, succeeded);
      }
    });
  }
}
...