...
- User should be able to run a workflow to process new partitions of a PartitionedFileSet (PFS).
- User should be able to limit the number of partitions processed in each run of the workflow.
- User should be able to apply a PartitionFilter over the partitions to be processed by a workflow (see the sketch after this list).
- User should be able to run multiple instances of the workflow, each processing its own set of partitions.
- User should be able to retry the processing of a partition a configurable number of times, if there is a failure to process it.
- User should be able to start or end the incremental processing at a particular start/end timestamp.
- User should be able to query for new partitions outside of a workflow/MapReduce (for instance, from a Worker).
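As an illustration of the filtering and timestamp requirements above, a PartitionFilter restricting consumption to a time window could be built as in the following sketch. It assumes the PartitionedFileSet is partitioned by a long-valued key field named "time"; the field name and the bounds are examples only, not part of this design.
Code Block
// Hypothetical example: only consume partitions whose "time" key field falls within [startTs, endTs).
// The field name "time" is an assumption about how the PartitionedFileSet is partitioned.
long startTs = 1420070400000L; // example start timestamp (epoch millis)
long endTs   = 1451606400000L; // example end timestamp (epoch millis)

PartitionFilter filter = PartitionFilter.builder()
  .addRangeCondition("time", startTs, endTs)
  .build();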
Related JIRAs:
- CDAP-3102: BatchPartitionConsumer should support a limit to read (https://issues.cask.co/browse/CDAP-3102)
- CDAP-3103: BatchPartitionConsumer should allow a partition filter (https://issues.cask.co/browse/CDAP-3103)
...
Code Block
/**
 * Configuration parameters to be used by a {@link PartitionConsumer}.
 */
public class ConsumerConfiguration {
  ...

  /**
   * @return A predicate to be applied on {@link PartitionDetail}s to determine which partitions to include in the
   *         partition consumption.
   */
  public Predicate<PartitionDetail> getPartitionPredicate() {
    return partitionPredicate;
  }

  /**
   * @return An upper bound on the size of the working set of partitions that get serialized as part of the
   *         consumer's state.
   */
  public int getMaxWorkingSetSize() {
    return maxWorkingSetSize;
  }

  /**
   * Defines an expiration timeout, in seconds, of IN_PROGRESS partitions.
   *
   * @return number of seconds that a partition can be in progress before it is deemed failed.
   */
  public long getTimeout() {
    return timeout;
  }
}
Below is the API that the PartitionConsumer interface exposes:
Code Block
/**
 * Incrementally consumes new/unprocessed {@link Partition}s of a {@link PartitionedFileSet}.
 */
public interface PartitionConsumer {

  /**
   * @return list of {@link PartitionDetail}s that have not yet been processed.
   */
  List<PartitionDetail> consumePartitions();

  /**
   * @param limit upper limit on the number of partitions to consume
   * @return list of {@link PartitionDetail}s that have not yet been processed.
   */
  List<PartitionDetail> consumePartitions(int limit);

  /**
   * This method must be called on any partitions returned by the {@code #consumePartitions} methods.
   *
   * @param partitions list of partitions to mark as either having succeeded or failed processing
   * @param succeeded whether or not processing of the specified partitions was successful
   */
  void onFinish(List<? extends Partition> partitions, boolean succeeded);
}
With the above ConsumerConfiguration, the internal flow of querying and processing partitions will look roughly like the following sketch:
Code Block
public class PartitionConsumer {
  // sketch of the internal flow; field/method names such as statePersistor are illustrative,
  // and parameters may move to the constructor
  public List<PartitionDetail> consumePartitions(PartitionedFileSet pfs, ConsumerConfiguration configuration, int limit) {
    // read the persisted consumer state (cursor plus working set)
    PartitionConsumerState state = statePersistor.readState();
    // query for new partitions matching the configured predicate, bounded by the remaining
    // capacity of the working set
    int numToQuery = configuration.getMaxWorkingSetSize() - state.getSize();
    Iterator<PartitionDetail> partitions =
      pfs.consumePartitions(state.getPointer(), numToQuery, configuration.getPartitionPredicate());
    // add these new partitions into the working set
    state.addPartitions(partitions);
    // take up to 'limit' partitions from the working set, mark them IN_PROGRESS, and return them
    List<PartitionDetail> partitionsToProcess = state.getPartitions(limit);
    statePersistor.persistState(state);
    // this transaction must be committed now, so that other instances of the processing entity
    // see these partitions as IN_PROGRESS
    return partitionsToProcess;
  }
}
Below is an example usage of the ConsumerConfiguration class, which can be passed to the PartitionedFileSet when requesting a PartitionConsumer:
Code Block
ConsumerConfiguration.builder()
.setPartitionPredicate(predicate)
.setMaxWorkingSetSize(2000)
  .build();
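For illustration, the predicate passed to setPartitionPredicate above could be defined as in the following sketch. It assumes the Predicate interface exposes a single apply(PartitionDetail) method and that partitions carry a long-valued key field named "time"; both are assumptions for the example, not part of the proposed API.
Code Block
// Hypothetical predicate: accept only partitions whose "time" key field falls within a window.
Predicate<PartitionDetail> predicate = new Predicate<PartitionDetail>() {
  private final long startTime = 1420070400000L; // example lower bound (epoch millis)
  private final long endTime   = 1451606400000L; // example upper bound (epoch millis)

  @Override
  public boolean apply(PartitionDetail partitionDetail) {
    Object time = partitionDetail.getPartitionKey().getField("time");
    return time instanceof Long && (Long) time >= startTime && (Long) time < endTime;
  }
};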
Example Usage
From a worker:
Code Block
/**
 * Worker which runs one iteration of processing new partitions.
 * Note: As we see frequent patterns, we can abstract the common parts of the example usage into CDAP code.
 */
public class CustomWorker extends AbstractWorker {

  @Override
  public void run() {
    TransactionalPartitionConsumer partitionConsumer =
      new TransactionalPartitionConsumer(getContext(), "lines",
                                         new KVTableStatePersistor("consumingState", "state.key"));
    // request new partitions (with an upper limit of 10)
    final List<PartitionDetail> partitions = partitionConsumer.consumePartitions(10);

    boolean success = true;
    try {
      // process the partitions
    } catch (Throwable t) {
      success = false;
      // log the error
    }

    partitionConsumer.onFinish(partitions, success);
  }
}
From a MapReduce:
Code Block
/**
 * MapReduce job which incrementally processes new partitions of a {@link PartitionedFileSet}.
 */
public class DataCleansingMapReduce extends AbstractMapReduce {

  private PartitionBatchInput.BatchPartitionCommitter partitionCommitter;

  @Override
  public void beforeSubmit(MapReduceContext context) throws Exception {
    partitionCommitter =
      PartitionBatchInput.setInput(context, DataCleansing.RAW_RECORDS,
                                   new KVTableStatePersistor(DataCleansing.CONSUMING_STATE, "state.key"));
    // set the output dataset as well as the mapper and reducer classes
    ...
  }

  @Override
  public void onFinish(boolean succeeded, MapReduceContext context) throws Exception {
    partitionCommitter.onFinish(succeeded);
  }

  // define the mapper and reducer classes
  ...
}
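For completeness, a mapper for such a job could look like the following sketch. The record format and the isValid() check are hypothetical; only the standard Hadoop Mapper signature is assumed.
Code Block
// Uses org.apache.hadoop.io.{LongWritable, Text, NullWritable} and org.apache.hadoop.mapreduce.Mapper.
public static class DataCleansingMapper extends Mapper<LongWritable, Text, NullWritable, Text> {

  @Override
  public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    // keep only records that pass a placeholder validity check; a real job would validate against its schema
    if (isValid(value.toString())) {
      context.write(NullWritable.get(), value);
    }
  }

  private boolean isValid(String record) {
    // placeholder validation logic
    return record != null && !record.isEmpty();
  }
}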
...
- What will be the format of the state / WorkingSet? How will it be serialized/deserialized?
Answer: We will use the current state (PartitionConsumerState.java) as the cursor when requesting new partitions. In addition, we will have to store a working set of partitions. This will be two lists: a 'NEW' list of partitions, ready to be consumed, and an 'IN PROGRESS' list of partitions that are currently being consumed. Each Partition will need to be a PartitionDetail, so that it also carries its metadata. We will need to define a toBytes() method similar to the existing PartitionConsumerState#toBytes method. In addition, we will define a toJson() serialization for human readability. A sketch of such a working set appears at the end of this section.
- TBD: API for how the ConsumerConfiguration will be configured.
- TBD: Limit the type of dataset (Table) that can be written to, for simplification of reading? This would make it easier to build inspection/debugging tools around the consuming state.
For now, there's a helper class: KVTableStatePersistor.
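As a rough illustration of the working set described in the answer above (class and member names here are hypothetical, not a final design):
Code Block
/**
 * Hypothetical sketch of the consumer's working set, kept alongside the PartitionConsumerState cursor.
 */
public class ConsumerWorkingSet {
  // partitions that have been discovered but not yet handed out for processing
  private final List<PartitionDetail> newPartitions = new ArrayList<>();
  // partitions currently being consumed, along with the timestamp at which they were handed out,
  // so that the timeout of IN_PROGRESS partitions can be enforced
  private final List<InProgressPartition> inProgressPartitions = new ArrayList<>();

  public byte[] toBytes() {
    // binary serialization, similar to the existing PartitionConsumerState#toBytes
    ...
  }

  public String toJson() {
    // human-readable serialization, to ease inspection/debugging of the consuming state
    ...
  }
}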