...
```java
/**
 * Defines configuration used in the partition consuming and processing pipeline.
 */
public interface PartitionConsumerConfiguration {

  // defines how/where the state is persisted and read from
  public byte[] readState();
  public void persistState(byte[] state);

  // defines a PartitionFilter, specifying which partitions to omit entirely from querying/processing
  public PartitionFilter getPartitionFilter();

  // defines an upper bound on how many partitions can be in the working set at any given time
  public int getMaxWorkingSetSize();

  // defines whether to continue consuming partitions when retrieving partitions from the working set
  public PartitionAccepter getConsumingAccepter();

  // defines an expiration timeout, in seconds, for IN_PROGRESS partitions
  public int getTimeOut();

  // called when processing of the retrieved partitions succeeds
  public void onSuccess(PartitionConsumerResult partitionConsumerResult);

  // called when there is an error while consuming partitions
  public void onFailure(PartitionConsumerResult partitionConsumerResult);
}
```
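For illustration, a minimal sketch of one possible implementation follows, persisting its state in a CDAP KeyValueTable. The class name, the state row key, and all of the returned limits are assumptions made for this example, not part of the design.

```java
import co.cask.cdap.api.dataset.lib.KeyValueTable;
import co.cask.cdap.api.dataset.lib.PartitionFilter;

/**
 * Illustrative sketch only: persists the consumer's state in a KeyValueTable.
 * The row key and all returned values are hypothetical choices.
 */
public class SimplePartitionConsumerConfiguration implements PartitionConsumerConfiguration {
  private static final byte[] STATE_KEY = { 's' }; // hypothetical row key for the state

  private final KeyValueTable stateTable; // supplied by the caller, e.g. via context.getDataset(...)

  public SimplePartitionConsumerConfiguration(KeyValueTable stateTable) {
    this.stateTable = stateTable;
  }

  @Override
  public byte[] readState() {
    return stateTable.read(STATE_KEY);
  }

  @Override
  public void persistState(byte[] state) {
    stateTable.write(STATE_KEY, state);
  }

  @Override
  public PartitionFilter getPartitionFilter() {
    return null; // assumption: returning no filter means no partitions are omitted
  }

  @Override
  public int getMaxWorkingSetSize() {
    return 100; // illustrative cap on the working set
  }

  @Override
  public PartitionAccepter getConsumingAccepter() {
    return new LimitingAccepter(10); // see the PartitionAccepter sketch below
  }

  @Override
  public int getTimeOut() {
    return 60 * 60; // expire IN_PROGRESS partitions after one hour
  }

  @Override
  public void onSuccess(PartitionConsumerResult partitionConsumerResult) {
    // e.g., emit metrics or log the partitions that were processed
  }

  @Override
  public void onFailure(PartitionConsumerResult partitionConsumerResult) {
    // e.g., log the error so the failed partitions can be investigated
  }
}
```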
```java
/**
 * Responsible for determining whether to accept more Partitions.
 * Generally used to dynamically determine the number of partitions to consume.
 */
public interface PartitionAccepter {
  public enum Response {
    CONTINUE, STOP
  }

  public Response accept(PartitionDetail partitionDetail);
}
```
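As a concrete example, the sketch below caps each run at a fixed number of partitions; the class name and the counting strategy are assumptions for illustration, referenced by the configuration sketch above.

```java
import co.cask.cdap.api.dataset.lib.PartitionDetail;

/**
 * Illustrative sketch only: accepts up to 'limit' partitions per run, then
 * instructs the consumer to stop handing out more.
 */
public class LimitingAccepter implements PartitionAccepter {
  private final int limit;
  private int acceptedSoFar = 0;

  public LimitingAccepter(int limit) {
    this.limit = limit;
  }

  @Override
  public Response accept(PartitionDetail partitionDetail) {
    // CONTINUE until 'limit' partitions have been accepted in this run
    return acceptedSoFar++ < limit ? Response.CONTINUE : Response.STOP;
  }
}
```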
...
```java
/**
 * Worker which runs one iteration of processing new partitions.
 * Note: As we see frequent patterns, we can abstract the common parts of the example usage into CDAP code.
 */
public class CustomWorker extends AbstractWorker {

  @Override
  public void run() {
    final PartitionConsumer partitionConsumer = new PartitionConsumer();
    final PartitionConsumerConfiguration pcc = ... // user-defined/implemented PartitionConsumerConfiguration (design TBD)

    // holder, so that the transaction body below can assign the result
    final AtomicReference<PartitionConsumerResult> partitionsToProcess = new AtomicReference<>();
    getContext().execute(new TxRunnable() {
      @Override
      public void run(DatasetContext context) throws Exception {
        PartitionedFileSet pfs = context.getDataset("customPFS");
        partitionsToProcess.set(partitionConsumer.getPartitions(pfs, pcc));
      }
    });

    boolean success = true;
    try {
      // process the partitions in partitionsToProcess
    } catch (Throwable t) {
      success = false;
      // log the error
    }

    final boolean succeeded = success;
    getContext().execute(new TxRunnable() {
      @Override
      public void run(DatasetContext context) throws Exception {
        // report the outcome through the configured callbacks
        if (succeeded) {
          pcc.onSuccess(partitionsToProcess.get());
        } else {
          pcc.onFailure(partitionsToProcess.get());
        }
      }
    });
  }
}
```
...
```java
/**
 * MapReduce job which incrementally processes new partitions of a {@link PartitionedFileSet}.
 */
public class DataCleansingMapReduce extends AbstractMapReduce {

  private BatchPartitionConsumer batchPartitionConsumer;

  @Override
  public void beforeSubmit(MapReduceContext context) throws Exception {
    PartitionConsumerConfiguration pcc = ... // user-defined/implemented PartitionConsumerConfiguration (design TBD)
    // keep the returned consumer so that onFinish can delegate to it (sketch; API TBD)
    batchPartitionConsumer = BatchPartitionConsumer.setInput(context, pfsName, pcc);
    // set the output dataset as well as the mapper and reducer classes
    ...
  }

  @Override
  public void onFinish(boolean succeeded, MapReduceContext context) throws Exception {
    batchPartitionConsumer.onFinish(succeeded);
  }

  // define the mapper and reducer classes
  ...
}
```
...