Multiple Partition Consumers and Other Customizability
Requirements
- User should be able to run a workflow to process new partitions of a PFS.
- User should be able to limit the number of partitions processed in each run of the workflow.
- User should be able to apply a PartitionFilter over the partitions to be processed by a workflow.
- User should be able to run multiple instances of the workflow, each processing its own set of partitions.
- User should be able to retry the processing of a partition a configurable number of times, if there is a failure to process it.
- User should be able to start or end the incremental processing at a particular start/end timestamp.
- User should be able to query for new partitions outside of a workflow/MapReduce (for instance, from a Worker).
Related JIRAs:
https://issues.cask.co/browse/CDAP-3103 BatchPartitionConsumer should allow a partition filter
https://issues.cask.co/browse/CDAP-3944 Multiple BatchPartitionConsumer consumers
Overview
Most of the underlying implementation needed for incremental partition consumption was done in the first implementation of BatchPartitionConsumer, in CDAP 3.1.0. This involved indexing each partition by its creation time (more precisely, its transaction’s write pointer).
https://issues.cask.co/browse/CDAP-2747
https://github.com/caskdata/cdap/pull/3075
The pending work is to extend this implementation to support features such as multiple workflow instances (which requires keeping more complex state) and customization of the query and processing pipeline.
Overall Design
Overall, the flow of incremental processing of partitions will consist of these steps:
- Read the current processing state. This will encode the working set of partitions.
- Query for new partitions from the PFS and add them to this state / working set.
- The current processing instance can select partitions from this working set and mark them as IN_PROGRESS.
- Persist the state (which includes the working set).
- Perform the custom processing on these partitions.
- Execute some onSuccess or onFinish callbacks for these processed partitions. For instance, marking them as NEW again upon failure, or removing them from the working set on success.
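The select / mark / finish steps above can be illustrated with a minimal in-memory sketch. It is not CDAP code: the class `WorkingSetSketch`, its method names, and the use of plain string keys in place of partitions are all assumptions made for illustration, and the read/persist steps (1 and 4) are left out.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory model of the working set; not part of the CDAP API. */
class WorkingSetSketch {
  enum Status { NEW, IN_PROGRESS }

  // partition key -> status; insertion order stands in for creation order
  private final Map<String, Status> workingSet = new LinkedHashMap<>();

  /** Step 2: add newly discovered partitions to the working set as NEW. */
  void addNewPartitions(List<String> partitionKeys) {
    for (String key : partitionKeys) {
      workingSet.putIfAbsent(key, Status.NEW);
    }
  }

  /** Step 3: select up to 'limit' NEW partitions and mark them IN_PROGRESS. */
  List<String> consume(int limit) {
    List<String> selected = new ArrayList<>();
    for (Map.Entry<String, Status> entry : workingSet.entrySet()) {
      if (selected.size() >= limit) {
        break;
      }
      if (entry.getValue() == Status.NEW) {
        entry.setValue(Status.IN_PROGRESS);
        selected.add(entry.getKey());
      }
    }
    return selected;
  }

  /** Step 6: remove partitions on success, or mark them NEW again on failure. */
  void onFinish(List<String> partitionKeys, boolean succeeded) {
    for (String key : partitionKeys) {
      if (succeeded) {
        workingSet.remove(key);
      } else if (workingSet.containsKey(key)) {
        workingSet.put(key, Status.NEW);
      }
    }
  }

  int size() {
    return workingSet.size();
  }

  Status statusOf(String key) {
    return workingSet.get(key);
  }
}
```

Because a second instance that reads the same state would skip partitions already marked IN_PROGRESS, this is the property that lets multiple workflow instances each process their own set of partitions.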
API Design
/**
 * Configuration parameters to be used by a {@link PartitionConsumer}.
 */
public class ConsumerConfiguration {

  ...

  /**
   * @return A predicate to be applied on {@link PartitionDetail}s to determine which partitions to include in the
   *         partition consumption.
   */
  public Predicate<PartitionDetail> getPartitionPredicate() {
    return partitionPredicate;
  }

  /**
   * @return An upper bound on the size of the working set of partitions that get serialized as part of the consumer's
   *         state.
   */
  public int getMaxWorkingSetSize() {
    return maxWorkingSetSize;
  }

  /**
   * Defines an expiration timeout, in seconds, of IN_PROGRESS partitions.
   *
   * @return number of seconds that a partition can be in progress before it is deemed failed.
   */
  public long getTimeout() {
    return timeout;
  }
}
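The timeout semantics described for getTimeout() can be sketched as below. This is only an illustration under stated assumptions: the class `TimeoutSketch`, the method `findExpired`, and the idea of tracking a start timestamp per IN_PROGRESS partition are hypothetical and are not taken from the CDAP code base. What happens to an expired partition (for example, returning it to NEW and counting the attempt as a failure) is likewise left open here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper illustrating the IN_PROGRESS expiration check; not CDAP code. */
class TimeoutSketch {

  /**
   * Returns the keys of IN_PROGRESS partitions whose processing started more than
   * timeoutSeconds before nowMillis, and which are therefore deemed failed.
   *
   * @param inProgressSinceMillis partition key -> time (epoch millis) at which it was marked IN_PROGRESS
   * @param timeoutSeconds the configured timeout, in seconds
   * @param nowMillis the current time, in epoch millis
   */
  static List<String> findExpired(Map<String, Long> inProgressSinceMillis, long timeoutSeconds, long nowMillis) {
    long timeoutMillis = TimeUnit.SECONDS.toMillis(timeoutSeconds);
    List<String> expired = new ArrayList<>();
    for (Map.Entry<String, Long> entry : inProgressSinceMillis.entrySet()) {
      if (nowMillis - entry.getValue() > timeoutMillis) {
        expired.add(entry.getKey());
      }
    }
    return expired;
  }
}
```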
Below is the API that the PartitionConsumer interface exposes:
/**
 * Incrementally consumes new/unprocessed {@link Partition}s of a {@link PartitionedFileSet}.
 */
public interface PartitionConsumer {

  /**
   * @return list of {@link PartitionDetail}s that have not yet been processed.
   */
  List<PartitionDetail> consumePartitions();

  /**
   * @param limit upper limit on number of partitions to consume
   * @return list of {@link PartitionDetail}s that have not yet been processed.
   */
  List<PartitionDetail> consumePartitions(int limit);

  /**
   * This method must be called on any partitions returned by the {@code #consumePartitions} method.
   *
   * @param partitions list of partitions to mark as either succeeded or failed processing
   * @param succeeded whether or not processing of the specified partitions was successful
   */
  void onFinish(List<? extends Partition> partitions, boolean succeeded);
}
Below is an example of building a ConsumerConfiguration, which can be passed to the PartitionedFileSet when requesting a PartitionConsumer:
ConsumerConfiguration.builder()
  .setPartitionPredicate(predicate)
  .setMaxWorkingSetSize(2000)
  .build();
Example Usage
From a worker:
/**
 * Worker which runs one iteration of processing new partitions.
 * Note: As we see frequent patterns, we can abstract the common parts of the example usage into CDAP code.
 */
public class CustomWorker extends AbstractWorker {

  @Override
  public void run() {
    final TransactionalPartitionConsumer partitionConsumer =
      new TransactionalPartitionConsumer(getContext(), "lines",
                                         new KVTableStatePersistor("consumingState", "state.key"));

    // request new partitions (with an upper limit of 10)
    final List<PartitionDetail> partitionsToProcess = partitionConsumer.consumePartitions(10);

    boolean success = true;
    try {
      // process partitionsToProcess
    } catch (Throwable t) {
      success = false;
      // log the error
    }

    // copy the result into a final variable so that the anonymous class below can capture it
    final boolean succeeded = success;
    getContext().execute(new TxRunnable() {
      @Override
      public void run(DatasetContext context) throws Exception {
        partitionConsumer.onFinish(partitionsToProcess, succeeded);
      }
    });
  }
}
From a MapReduce:
/**
 * MapReduce job which incrementally processes new partitions of a {@link PartitionedFileSet}.
 */
public class DataCleansingMapReduce extends AbstractMapReduce {

  private PartitionBatchInput.BatchPartitionCommitter partitionCommitter;

  @Override
  public void beforeSubmit(MapReduceContext context) throws Exception {
    partitionCommitter =
      PartitionBatchInput.setInput(context, DataCleansing.RAW_RECORDS,
                                   new KVTableStatePersistor(DataCleansing.CONSUMING_STATE, "state.key"));

    // set the output dataset as well as mapper and reducer classes
    ...
  }

  @Override
  public void onFinish(boolean succeeded, MapReduceContext context) throws Exception {
    partitionCommitter.onFinish(succeeded);
  }

  // define the mapper and reducer classes
  ...
}
Pending Questions
- What will be the format of the state / WorkingSet? How will it be serialized/deserialized?
Answer: We will use the current state (PartitionConsumerState.java) as the cursor when requesting new partitions. In addition, we will have to store a working set of partitions. This will be two lists: a 'NEW' list of partitions that are ready to be consumed, and an 'IN PROGRESS' list of partitions that are currently being consumed. Each Partition will need to be a PartitionDetail, so that it also has its metadata. We will need to define a toBytes() method similar to the existing PartitionConsumerState#toBytes method. In addition, we will define a toJson() serialization for human readability.
- Should we limit the type of dataset (Table) that the state can be written to, to simplify reading it? This would make it easier to build inspection/debugging tools around the consuming states.
For now, there's a helper class: KVTableStatePersistor.
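As a rough illustration of the state format discussed in the answer above, the sketch below serializes the two lists of a working set to bytes and back. It is an assumption-laden simplification: the class `WorkingSetStateSketch` and its byte layout are hypothetical, partitions are reduced to string keys (the real state would hold PartitionDetail objects with their metadata, plus the PartitionConsumerState cursor), and the toJson() form is not shown.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical working-set state with a simple length-prefixed byte encoding; not CDAP code. */
class WorkingSetStateSketch {
  final List<String> newPartitions;
  final List<String> inProgressPartitions;

  WorkingSetStateSketch(List<String> newPartitions, List<String> inProgressPartitions) {
    this.newPartitions = new ArrayList<>(newPartitions);
    this.inProgressPartitions = new ArrayList<>(inProgressPartitions);
  }

  /** Encodes the NEW list followed by the IN PROGRESS list. */
  byte[] toBytes() {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      writeList(out, newPartitions);
      writeList(out, inProgressPartitions);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  /** Decodes a state previously produced by {@link #toBytes()}. */
  static WorkingSetStateSketch fromBytes(byte[] data) {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
      List<String> newPartitions = readList(in);
      List<String> inProgressPartitions = readList(in);
      return new WorkingSetStateSketch(newPartitions, inProgressPartitions);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // each list is written as its size followed by its elements
  private static void writeList(DataOutputStream out, List<String> keys) throws IOException {
    out.writeInt(keys.size());
    for (String key : keys) {
      out.writeUTF(key);
    }
  }

  private static List<String> readList(DataInputStream in) throws IOException {
    int size = in.readInt();
    List<String> keys = new ArrayList<>(size);
    for (int i = 0; i < size; i++) {
      keys.add(in.readUTF());
    }
    return keys;
  }
}
```

A persistor such as KVTableStatePersistor could then store the resulting byte array under a single key, although how it actually stores the state is not specified here.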