...
```java
public class PartitionConsumer {

  public Iterator<Partition> getPartitions(PartitionedFileSet pfs,
                                           PartitionConsumerConfiguration partitionConsumerConfiguration) {
    PartitionConsumerState state = partitionConsumerConfiguration.readState();
    // get new partitions in the dataset that match the configured PartitionFilter,
    // up to the remaining capacity of the working set
    Iterator<Partition> partitions =
      pfs.consumePartitions(state.getPointer(),
                            getMaxWorkingSetSize() - state.getSize(),
                            getPartitionFilter());
    // add these new partitions into the working set
    state.addPartitions(partitions);
    // get a number of partitions from the working set, mark them IN_PROGRESS, and return them
    Iterator<Partition> partitionsToProcess = state.getPartitions(getProcessingLimit());
    partitionConsumerConfiguration.persistState(state);
    // the enclosing transaction must be committed now, so that other instances of the
    // processing entity see these partitions as IN_PROGRESS
    return partitionsToProcess;
  }
}
```
Example Usage
From a worker:
```java
/**
 * Worker which runs one iteration of processing new partitions.
 * Note: As we see frequent patterns, we can abstract the common parts of the example usage into CDAP code.
 */
public class CustomWorker extends AbstractWorker {

  // held as fields so that they can be referenced from within the TxRunnables below
  private PartitionConsumerResult partitionsToProcess;
  private boolean success = true;

  @Override
  public void run() {
    final PartitionConsumer partitionConsumer = new PartitionConsumer();
    final PartitionConsumerConfiguration pcc = ... // Design of user-defined/implemented configuration - TBD

    getContext().execute(new TxRunnable() {
      @Override
      public void run(DatasetContext context) throws Exception {
        PartitionedFileSet pfs = context.getDataset("customPFS");
        partitionsToProcess = partitionConsumer.getPartitions(pfs, pcc);
      }
    });

    try {
      // process partitionsToProcess
    } catch (Throwable t) {
      success = false;
      // log the error
    }

    getContext().execute(new TxRunnable() {
      @Override
      public void run(DatasetContext context) throws Exception {
        if (success) {
          // by default, removes the partitions from the working set because they have been processed successfully
          pcc.onSuccess(partitionsToProcess);
        } else {
          // by default, leaves the partitions in the working set and simply marks them as NEW again
          pcc.onFailure(partitionsToProcess);
        }
      }
    });
  }
}
```
From a MapReduce:
```java
/**
 * MapReduce job which incrementally processes new partitions of a {@link PartitionedFileSet}.
 */
public class DataCleansingMapReduce extends AbstractMapReduce {

  BatchPartitionConsumer batchPartitionConsumer;

  @Override
  public void beforeSubmit(MapReduceContext context) throws Exception {
    PartitionConsumerConfiguration pcc = ... // Design of user-defined/implemented configuration - TBD
    batchPartitionConsumer = new BatchPartitionConsumer(pcc);
    PartitionedFileSet rawRecords = batchPartitionConsumer.getConfiguredDataset(context, DataCleansing.RAW_RECORDS);
    context.setInput(DataCleansing.RAW_RECORDS, rawRecords);
    // set the output dataset as well as mapper and reducer classes
    ...
  }

  @Override
  public void onFinish(boolean succeeded, MapReduceContext context) throws Exception {
    if (succeeded) {
      batchPartitionConsumer.onSuccess();
    } else {
      batchPartitionConsumer.onFailure();
    }
  }

  // define the mapper and reducer classes
  ...
}
```
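For reference, a minimal sketch of the BatchPartitionConsumer wrapper implied by this usage follows. Only the constructor and the getConfiguredDataset(), onSuccess(), and onFailure() calls appear in the example above; the fields and the way the consumed partitions are remembered between beforeSubmit() and onFinish() are assumptions made purely for illustration.

```java
// A possible shape for BatchPartitionConsumer, inferred from the usage above.
// Everything not shown in the example (fields, how input partitions are wired into the job)
// is an assumption for illustration only.
public class BatchPartitionConsumer {
  private final PartitionConsumerConfiguration pcc;
  private final PartitionConsumer partitionConsumer = new PartitionConsumer();
  private PartitionConsumerResult partitionsToProcess;

  public BatchPartitionConsumer(PartitionConsumerConfiguration pcc) {
    this.pcc = pcc;
  }

  public PartitionedFileSet getConfiguredDataset(MapReduceContext context, String datasetName) {
    PartitionedFileSet pfs = context.getDataset(datasetName);
    // consume the next batch of partitions from the working set and remember them,
    // so that onSuccess()/onFailure() can report back on exactly these partitions
    partitionsToProcess = partitionConsumer.getPartitions(pfs, pcc);
    // (not shown) restrict the dataset's input to the consumed partitions before returning it
    return pfs;
  }

  public void onSuccess() {
    pcc.onSuccess(partitionsToProcess);
  }

  public void onFailure() {
    pcc.onFailure(partitionsToProcess);
  }
}
```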
Pending Questions
- What will be the format of the state / WorkingSet? How will it be serialized/deserialized?
- Instead of a numerical getProcessingLimit(), an alternative is to allow the user to define a callback that is invoked for each partition being fetched from the working set and returns true or false to indicate whether more partitions should be added. This would let users take the partition's metadata into account (which could, for instance, encode the size of the files in that partition) and dynamically determine how many partitions to process; see the sketch below.
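
To make the callback alternative concrete, a minimal sketch is shown below. The interface name (PartitionAcceptor), its method signature, the metadata accessor, and the "size" metadata key are all assumptions made for illustration; none of them are part of the current design.

```java
// Hypothetical callback-based alternative to a numeric getProcessingLimit().
// The interface name, its method, and the metadata accessor used below are assumptions.
public interface PartitionAcceptor {
  // return true to take this partition from the working set and keep fetching,
  // false to stop fetching further partitions
  boolean accept(Partition partition);
}

// Example implementation: accept partitions until their combined size exceeds a threshold.
// Assumes the partition's metadata carries a "size" entry (in bytes) - purely illustrative.
public class SizeLimitingAcceptor implements PartitionAcceptor {
  private final long maxTotalBytes;
  private long acceptedBytes = 0;

  public SizeLimitingAcceptor(long maxTotalBytes) {
    this.maxTotalBytes = maxTotalBytes;
  }

  @Override
  public boolean accept(Partition partition) {
    long size = Long.parseLong(partition.getMetadata().get("size"));
    if (acceptedBytes + size > maxTotalBytes) {
      return false;
    }
    acceptedBytes += size;
    return true;
  }
}
```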