Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
language: java
linenumbers: true
/**
 * Defines configuration, used in the Partition consuming and processing pipeline.
 */
public interface PartitionConsumerConfiguration {

  // defines how/where the state is persisted and read from
  public byte[] readState();
  public void persistState(byte[] state);
 
  // defines a PartitionFilter, defining which partitions to entirely omit from the querying/processing
  public PartitionFilter getPartitionFilter();

  // defines an upper bound on how many partitions can be in the working set at any given time
  public int getMaxWorkingSetSize();
 
  // defines whether to continue consuming partitions or not, when retrieving partitions from the working set
  public PartitionAccepter getConsumingAccepter();

  // defines an expiration timeout, in seconds, of IN_PROGRESS partitions
  public int getTimeOut();

  // called when processing of the retrieved partitions is successful
  public void onSuccess(PartitionConsumerResult partitionConsumerResult);
 
  // called when there is an error during consuming of partitions
  public void onFailure(PartitionConsumerResult partitionConsumerResult);
}
Code Block
language: java
linenumbers: true
/**
 * Decides, one partition at a time, whether more partitions should be accepted.
 * Typically used to determine dynamically how many partitions get consumed.
 * NOTE(review): whether the partition that yields STOP is itself consumed is not specified here — TBD.
 */
public interface PartitionAccepter {

  /**
   * Evaluates a single candidate partition.
   *
   * @param partitionDetail the partition under consideration
   * @return {@link Response#CONTINUE} to keep accepting, or {@link Response#STOP} to stop
   */
  Response accept(PartitionDetail partitionDetail);

  /** Possible outcomes of {@link #accept(PartitionDetail)}. */
  enum Response {
    /** Keep accepting partitions. */
    CONTINUE,
    /** Stop accepting partitions. */
    STOP
  }
}

...

Code Block
language: java
linenumbers: true
/**
 * Worker which runs one iteration of processing new partitions.
 * Note: As we see frequent patterns, we can abstract the common parts of the example usage into CDAP code.
 */
public class CustomWorker extends AbstractWorker {

  @Override
  public void run() {
    PartitionConsumer partitionConsumer = new PartitionConsumer();
    PartitionConsumerConfiguration pcc = ... // Design of user defined/implemented configuration - TBD-implemented/-defined PartitionConsumerConfiguration
    PartitionConsumerResult partitionsToProcess;

    getContext().execute(new TxRunnable() {
      @Override
      public void run(DatasetContext context) throws Exception {
        PartitionedFileSet pfs = context.getDataset("customPFS"); 
        partitionsToProcess = partitionConsumer.getPartitions(pfs, pcc);
      }
    });

    boolean success = true;
    try {
      // process partitionsToProcess
    } catch (Throwable t) {
      success = false;
      // log the error
    }
 
    getContext().execute(new TxRunnable() {
      @Override
      public void run(DatasetContext context) throws Exception {
        pcc.onFinish(success, partitionsToProcess);
      }
    });
  }
}

...

Code Block
language: java
linenumbers: true
/**
 * MapReduce job which incrementally processes new partitions of a {@link PartitionedFileSet}.
 */
public class DataCleansingMapReduce extends AbstractMapReduce {

  // Handle obtained when configuring the input in beforeSubmit; used in onFinish to report the outcome.
  BatchPartitionConsumer batchPartitionConsumer;

  @Override
  public void beforeSubmit(MapReduceContext context) throws Exception {
    PartitionConsumerConfiguration pcc = ... // user-implemented/-defined PartitionConsumerConfiguration; design TBD
    // Keep the handle: without it, onFinish would dereference a null field.
    // NOTE(review): assumes setInput returns the BatchPartitionConsumer for this run — confirm against its API.
    batchPartitionConsumer = BatchPartitionConsumer.setInput(context, pfsName, pcc);
    // set the output dataset as well as mapper and reducer classes
    ...
  }

  @Override
  public void onFinish(boolean succeeded, MapReduceContext context) throws Exception {
    // Guard against beforeSubmit having failed before the input was configured.
    if (batchPartitionConsumer != null) {
      batchPartitionConsumer.onFinish(succeeded);
    }
  }

  // define the mapper and reducer classes
  ...
}

...