...
Code Block |
---|
language | java |
---|
linenumbers | true |
---|
|
/**
 * Configuration parameters to be used by a {@link PartitionConsumer}.
 */
public class ConsumerConfiguration {
  ...

  /**
   * @return A predicate to be applied on {@link PartitionDetail}s to determine which partitions to include in the
   *         partition consumption.
   */
  public Predicate<PartitionDetail> getPartitionPredicate() {
    return partitionPredicate;
  }

  /**
   * @return An upper bound on the size of the working set of partitions that get serialized as part of the consumer's
   *         state.
   */
  public int getMaxWorkingSetSize() {
    return maxWorkingSetSize;
  }

  /**
   * Defines an expiration timeout, in seconds, of IN_PROGRESS partitions.
   *
   * @return number of seconds that a partition can be in progress before it is deemed failed.
   */
  public long getTimeout() {
    return timeout;
  }
}
|
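To make the timeout semantics concrete, here is a minimal sketch of how IN_PROGRESS partitions might be expired. It is not the CDAP implementation: the class `InProgressExpirySketch`, the `expire` method, and the use of plain strings and epoch seconds to represent partitions and start times are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Hypothetical helper; illustrates the timeout rule only. */
class InProgressExpirySketch {

  /**
   * Removes and returns the partitions that have been in progress for more than
   * timeoutSeconds. Keys are partition names, values are the time (in seconds) at
   * which the partition was marked IN_PROGRESS.
   */
  static List<String> expire(Map<String, Long> inProgressSince, long nowSeconds, long timeoutSeconds) {
    List<String> expired = new ArrayList<>();
    Iterator<Map.Entry<String, Long>> it = inProgressSince.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<String, Long> entry = it.next();
      if (nowSeconds - entry.getValue() > timeoutSeconds) {
        expired.add(entry.getKey());
        // Iterator.remove() is the safe way to delete while iterating.
        it.remove();
      }
    }
    return expired;
  }
}
```

What happens to an expired partition afterwards (retried, or dropped) is a separate design decision that this sketch does not take a position on.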
Below is the API that the PartitionConsumer interface exposes:
Code Block |
---|
language | java |
---|
linenumbers | true |
---|
|
/**
 * Incrementally consumes new/unprocessed {@link Partition}s of a {@link PartitionedFileSet}.
 */
public interface PartitionConsumer {

  /**
   * @return list of {@link PartitionDetail}s that have not yet been processed.
   */
  List<PartitionDetail> consumePartitions();

  /**
   * @param limit upper limit on number of partitions to consume
   * @return list of {@link PartitionDetail}s that have not yet been processed.
   */
  List<PartitionDetail> consumePartitions(int limit);

  /**
   * This method must be called on any partitions returned by the {@code #consumePartitions} method.
   *
   * @param partitions list of partitions to mark as either succeeded or failed processing
   * @param succeeded whether or not processing of the specified partitions was successful
   */
  void onFinish(List<? extends Partition> partitions, boolean succeeded);
} |
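The consume/finish contract can be pictured with a small in-memory model of the working set. This is a sketch under stated assumptions, not the CDAP implementation: `WorkingSetSketch` and all of its methods are hypothetical, partitions are represented by plain strings, and returning failed partitions to NEW for a later retry is an assumed policy that the page does not specify.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory stand-in for a consumer's working set. */
class WorkingSetSketch {
  enum Status { NEW, IN_PROGRESS }

  // Insertion-ordered so that partitions are handed out in discovery order.
  private final Map<String, Status> workingSet = new LinkedHashMap<>();
  private final int maxWorkingSetSize;

  WorkingSetSketch(int maxWorkingSetSize) {
    this.maxWorkingSetSize = maxWorkingSetSize;
  }

  /** Adds discovered partitions as NEW until the working set is full; returns how many were added. */
  int addNewPartitions(List<String> discovered) {
    int added = 0;
    for (String partition : discovered) {
      if (workingSet.size() >= maxWorkingSetSize) {
        break;
      }
      if (!workingSet.containsKey(partition)) {
        workingSet.put(partition, Status.NEW);
        added++;
      }
    }
    return added;
  }

  /** Marks up to 'limit' NEW partitions as IN_PROGRESS and returns them. */
  List<String> consumePartitions(int limit) {
    List<String> consumed = new ArrayList<>();
    for (Map.Entry<String, Status> entry : workingSet.entrySet()) {
      if (consumed.size() >= limit) {
        break;
      }
      if (entry.getValue() == Status.NEW) {
        entry.setValue(Status.IN_PROGRESS);
        consumed.add(entry.getKey());
      }
    }
    return consumed;
  }

  /** Removes succeeded partitions; returns failed ones to NEW (assumed retry policy). */
  void onFinish(List<String> partitions, boolean succeeded) {
    for (String partition : partitions) {
      if (workingSet.get(partition) != Status.IN_PROGRESS) {
        continue; // ignore partitions that were never handed out
      }
      if (succeeded) {
        workingSet.remove(partition);
      } else {
        workingSet.put(partition, Status.NEW);
      }
    }
  }

  int size() {
    return workingSet.size();
  }
}
```

In this model the maximum working set size bounds how much state has to be persisted, while the `limit` argument only bounds how many partitions a single call hands out.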
Below is an example of building a ConsumerConfiguration, which can be passed to the PartitionedFileSet when requesting a PartitionConsumer:
Code Block |
---|
language | java |
---|
linenumbers | true |
---|
|
ConsumerConfiguration.builder()
.setPartitionPredicate(predicate)
.setMaxWorkingSetSize(2000)
.build(); |
Example Usage
From a worker:
Code Block |
---|
language | java |
---|
linenumbers | true |
---|
|
/**
 * Worker which runs one iteration of processing new partitions.
 * Note: As we see frequent patterns, we can abstract the common parts of the example usage into CDAP code.
 */
public class CustomWorker extends AbstractWorker {

  @Override
  public void run() {
    final TransactionalPartitionConsumer partitionConsumer =
      new TransactionalPartitionConsumer(getContext(), "lines",
                                         new KVTableStatePersistor("consumingState", "state.key"));

    // request new partitions (with an upper limit of 10)
    final List<PartitionDetail> partitionsToProcess = partitionConsumer.consumePartitions(10);

    boolean success = true;
    try {
      // process partitionsToProcess
    } catch (Throwable t) {
      success = false;
      // log the error
    }

    // copy into a final variable so that it can be referenced from the anonymous class
    final boolean succeeded = success;
    getContext().execute(new TxRunnable() {
      @Override
      public void run(DatasetContext context) throws Exception {
        partitionConsumer.onFinish(partitionsToProcess, succeeded);
      }
    });
  }
} |
From a MapReduce:
Code Block |
---|
language | java |
---|
linenumbers | true |
---|
|
/**
 * MapReduce job which incrementally processes new partitions of a {@link PartitionedFileSet}.
 */
public class DataCleansingMapReduce extends AbstractMapReduce {
  private PartitionBatchInput.BatchPartitionCommitter partitionCommitter;

  @Override
  public void beforeSubmit(MapReduceContext context) throws Exception {
    partitionCommitter =
      PartitionBatchInput.setInput(context, DataCleansing.RAW_RECORDS,
                                   new KVTableStatePersistor(DataCleansing.CONSUMING_STATE, "state.key"));
    // set the output dataset as well as mapper and reducer classes
    ...
  }

  @Override
  public void onFinish(boolean succeeded, MapReduceContext context) throws Exception {
    partitionCommitter.onFinish(succeeded);
  }

  // define the mapper and reducer classes
  ...
} |
...
- What will be the format of the state / WorkingSet? How will it be serialized/deserialized?
Answer: We will use the current state (PartitionConsumerState.java) as the cursor when requesting new partitions. In addition, we will have to store a working set of partitions. This will be two lists: a 'NEW' list of partitions that are ready to be consumed, and an 'IN PROGRESS' list of partitions that are currently being consumed. Each Partition will need to be a PartitionDetail, so that it also has its metadata. We will need to define a toBytes() method similar to the existing PartitionConsumerState#toBytes method. In addition, we will define a toJson() serialization for human readability.
- TBD: API for how the PartitionConsumerConfiguration will be configured.
- Limit the type of dataset (Table) that can be written to, for simplification of reading? This will make it easier to build inspection/debugging tools around the consuming states.
For now, there's a helper class: KVTableStatePersistor.
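As an illustration of the state format discussed in the first answer above (a cursor plus a 'NEW' list and an 'IN PROGRESS' list), the following sketch round-trips such a state through a byte array. It rests on several assumptions: `WorkingSetStateSketch` is a hypothetical class, the cursor is modelled as a single long, and partitions are plain strings rather than PartitionDetail objects with metadata. The actual PartitionConsumerState#toBytes layout is not reproduced here.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical consumer state: a cursor plus NEW and IN PROGRESS partition lists. */
class WorkingSetStateSketch {
  final long cursor; // assumed stand-in for the existing consumer-state cursor
  final List<String> newPartitions;
  final List<String> inProgressPartitions;

  WorkingSetStateSketch(long cursor, List<String> newPartitions, List<String> inProgressPartitions) {
    this.cursor = cursor;
    this.newPartitions = new ArrayList<>(newPartitions);
    this.inProgressPartitions = new ArrayList<>(inProgressPartitions);
  }

  /** Layout: cursor (long), then each list as a count (int) followed by UTF strings. */
  byte[] toBytes() {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeLong(cursor);
      writeList(out, newPartitions);
      writeList(out, inProgressPartitions);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  /** Reads back a state written by toBytes(). */
  static WorkingSetStateSketch fromBytes(byte[] data) {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
      long cursor = in.readLong();
      List<String> fresh = readList(in);
      List<String> inProgress = readList(in);
      return new WorkingSetStateSketch(cursor, fresh, inProgress);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  private static void writeList(DataOutputStream out, List<String> items) throws IOException {
    out.writeInt(items.size());
    for (String item : items) {
      out.writeUTF(item);
    }
  }

  private static List<String> readList(DataInputStream in) throws IOException {
    int size = in.readInt();
    List<String> items = new ArrayList<>(size);
    for (int i = 0; i < size; i++) {
      items.add(in.readUTF());
    }
    return items;
  }
}
```

A byte array of this kind is what a state persistor would store under its key. A human-readable toJson() form, as proposed above, would carry the same three fields.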