Note: Additional details and implementation specifics will be added over time.

A key requirement is to continuously process the partitions of a PartitionedFileSet dataset from within a periodically running MapReduce job, scheduled by a Workflow. Each time the Workflow runs, it should process all of the partitions created since the last one it processed, so that after the run it has processed all existing partitions.
Other possible use cases include:

  • a worker that looks for new partitions continuously
  • a workflow that runs periodically and consumes all new partitions each time
  • a workflow that catches up with old partitions while new partitions are created
  • a scheduler that triggers a workflow every time there is a new partition

To do this, we will add an index on the creation time of partitions as they are added to the dataset, so that each run does not have to scan all of the partitions. One possibility is then for the workflow to maintain the end timestamp it used when requesting partitions by time (lastProcessedTimestamp). In its next run, the workflow can simply request from the dataset the partitions created after lastProcessedTimestamp, up until the then-current time.

However, there is an issue with this idea: a partition could have a creation time earlier than lastProcessedTimestamp but not yet be committed (and therefore not visible) when the workflow requests partitions up until lastProcessedTimestamp. Even though the partition becomes visible after lastProcessedTimestamp, the next scan from lastProcessedTimestamp onwards will not include it, because its creation time falls outside the scanned range. While this approach is simple, it can skip partitions entirely, so it is not good enough.
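The gap described above can be reproduced with a small in-memory simulation. This is only a sketch. CreationTimeIndex, NaiveTimestampConsumer and the integer timestamps are hypothetical stand-ins, not CDAP classes. It also assumes that a partition becomes visible only once its transaction commits.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical in-memory stand-in for the creation-time index (not the CDAP API). */
class CreationTimeIndex {
  private static final class Entry {
    final long creationTime;
    final String partition;

    Entry(long creationTime, String partition) {
      this.creationTime = creationTime;
      this.partition = partition;
    }
  }

  // Only partitions whose transaction has committed are visible to readers.
  private final List<Entry> visible = new ArrayList<>();

  /** Called when the transaction that created the partition commits. */
  void commit(long creationTime, String partition) {
    visible.add(new Entry(creationTime, partition));
  }

  /** Visible partitions with startExclusive < creationTime <= endInclusive. */
  List<String> scan(long startExclusive, long endInclusive) {
    return visible.stream()
        .filter(e -> e.creationTime > startExclusive && e.creationTime <= endInclusive)
        .map(e -> e.partition)
        .collect(Collectors.toList());
  }
}

/** The simple approach: remember only the end timestamp of the previous request. */
class NaiveTimestampConsumer {
  private long lastProcessedTimestamp = Long.MIN_VALUE;

  List<String> poll(CreationTimeIndex index, long now) {
    List<String> partitions = index.scan(lastProcessedTimestamp, now);
    lastProcessedTimestamp = now;
    return partitions;
  }
}

class NaiveTimestampDemo {
  public static void main(String[] args) {
    CreationTimeIndex index = new CreationTimeIndex();
    NaiveTimestampConsumer consumer = new NaiveTimestampConsumer();

    index.commit(3, "p1");                        // created at t=3, committed immediately
    // p2 is created at t=5, but its transaction is still running at t=10.
    System.out.println(consumer.poll(index, 10)); // [p1]

    index.commit(5, "p2");                        // p2's transaction commits at t=12
    System.out.println(consumer.poll(index, 20)); // [] because 5 is outside (10, 20]
  }
}
```

The partition created at t=5 is never returned. The second request only covers creation times in (10, 20], and the first request ran before the partition was visible.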

An improved solution is for the workflow consuming the partitions to keep track of two things each time it consumes partitions: the read pointer + 1 of the transaction in which the partitions are consumed, and that transaction's set of in-progress transactions. The read pointer + 1 is used as a high water mark from which to start scanning for new partitions the next time partitions are polled. The in-progress transactions mark versions below this high water mark at which new partitions may still appear, once those transactions commit. In-progress transactions above the read pointer are not tracked, because the next scan, starting at the high water mark, will cover them. Note that all of this requires storing and indexing each partition by the write pointer of the transaction in which it is added, rather than by its actual creation time.
Then, each time the partition consumer runs and polls for additional partitions, it scans the index from the previous transaction's read pointer + 1 up to, but not including, the current transaction's read pointer + 1. It also checks whether any of the previously in-progress transactions are no longer running and correspond to new partitions, by doing a lookup (or a small scan) on the index table for each of these versions. This ensures that partitions added in transactions that committed later are still processed.
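The polling procedure can be sketched as follows, under several simplifying assumptions:

  • the index is a hypothetical in-memory map keyed by write pointer;
  • each transaction adds at most one partition;
  • a transaction snapshot is reduced to a read pointer plus a list of in-progress write pointers, a simplification of Tephra's transaction model.

VersionIndexConsumer and its methods are illustrative names, not CDAP APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/**
 * Hypothetical in-memory model of the version-based consumer. Partitions are indexed by the
 * write pointer of the transaction that added them (one partition per transaction, for brevity).
 */
class VersionIndexConsumer {
  // Committed (visible) partitions, keyed by write pointer.
  private final TreeMap<Long, String> committed = new TreeMap<>();

  // Consumer state carried from one poll to the next.
  long startVersion = 0;
  long[] versionsToCheck = new long[0];

  /** Models the commit of the transaction with this write pointer. */
  void commit(long writePointer, String partition) {
    committed.put(writePointer, partition);
  }

  /** One poll, run in a transaction with the given read pointer and in-progress versions. */
  List<String> poll(long readPointer, long[] inProgress) {
    Set<Long> running = new HashSet<>();
    for (long v : inProgress) {
      running.add(v);
    }
    List<String> result = new ArrayList<>();

    // 1. Look up versions that were in progress last time and have committed since.
    for (long v : versionsToCheck) {
      String partition = committed.get(v);
      if (!running.contains(v) && partition != null) {
        result.add(partition);
      }
    }
    // 2. Scan [previous readPointer + 1, current readPointer + 1), skipping running versions.
    if (startVersion <= readPointer) {
      for (Map.Entry<Long, String> e
          : committed.subMap(startVersion, true, readPointer, true).entrySet()) {
        if (!running.contains(e.getKey())) {
          result.add(e.getValue());
        }
      }
    }
    // 3. New state: the next scan starts at readPointer + 1; remember the in-progress
    //    versions below it, since their partitions may appear once they commit.
    startVersion = readPointer + 1;
    versionsToCheck = Arrays.stream(inProgress).filter(v -> v <= readPointer).sorted().toArray();
    return result;
  }

  public static void main(String[] args) {
    VersionIndexConsumer consumer = new VersionIndexConsumer();
    consumer.commit(3, "p1");
    // The transaction with write pointer 5 is still running during the first poll.
    System.out.println(consumer.poll(10, new long[] {5})); // [p1]
    consumer.commit(5, "p2");
    consumer.commit(12, "p3");
    System.out.println(consumer.poll(20, new long[0]));    // [p2, p3]
  }
}
```

The partition written at version 5 is missed by the first poll but picked up by the second through versionsToCheck. In step 3, versions that are still in progress from earlier polls carry over automatically, because they remain in the current in-progress list.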

Andreas: Also, instead of using the previous transaction's write pointer as the start of the next scan, it is safer to use the upper bound of the read pointer (Tephra does not guarantee that the read pointer and the write version are the same).

Andreas: Also, make sure to exclude the current write pointer from the scan. Partitions created in the same transaction will then not be visible (which I think is tolerable). If we included the current write pointer, a partition could be created in the same transaction but after the scan. It would not be included this time, and also not next time (because we would only scan after the write pointer).
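A small check shows why the read pointer is the safer place to start the next scan. This sketch makes two assumptions, both taken from the design above:

  • in-progress versions above the read pointer are not kept in versionsToCheck;
  • the consuming transaction's write pointer is greater than its read pointer, so a scan bounded by the read pointer never includes the current write pointer.

NextStartChoice and coveredNextTime are hypothetical names.

```java
import java.util.Arrays;

/**
 * Hypothetical check: after a poll, will a given version still be seen by the next poll?
 * It is covered if it is at or above the next scan's start, or tracked in versionsToCheck.
 */
class NextStartChoice {
  static boolean coveredNextTime(long version, long nextStart, long[] versionsToCheck) {
    return version >= nextStart || Arrays.stream(versionsToCheck).anyMatch(v -> v == version);
  }

  public static void main(String[] args) {
    long readPointer = 100;
    long writePointer = 105;        // the consuming transaction's own version
    long[] versionsToCheck = {95};  // in-progress versions at or below the read pointer
    // 102 and 104 belong to other in-progress transactions (above the read pointer, so not
    // tracked); 105 is a partition added by the consuming transaction after its scan.
    for (long v : new long[] {102, 104, 105}) {
      boolean fromReadPointer = coveredNextTime(v, readPointer + 1, versionsToCheck);
      boolean afterWritePointer = coveredNextTime(v, writePointer + 1, versionsToCheck);
      System.out.println(v + ": start at readPointer + 1 covers it: " + fromReadPointer
          + ", start after writePointer covers it: " + afterWritePointer);
    }
  }
}
```

Starting the next scan at readPointer + 1 still covers versions 102, 104 and 105. Starting after the write pointer (at 106) covers none of them, so any partitions added at those versions would be skipped permanently.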

...

PartitionConsumerState.java:

Code Block
languagejava
long startVersion;
long[] versionsToCheck;

byte[] toBytes();
static PartitionConsumerState fromBytes(byte[] bytes);
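One possible way to implement toBytes and fromBytes is with java.nio.ByteBuffer. Two details are assumptions, not part of this design: the binary layout ([startVersion][count][versionsToCheck...]), and the meaning of FROM_BEGINNING, which the Example Usage refers to. Here FROM_BEGINNING is taken to be version 0 with nothing to re-check.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/**
 * Sketch of PartitionConsumerState with an assumed binary layout:
 * [startVersion: 8 bytes][count: 4 bytes][versionsToCheck: 8 bytes each].
 */
final class PartitionConsumerState {
  /** Assumption: start from version 0 with no versions to re-check. */
  static final PartitionConsumerState FROM_BEGINNING = new PartitionConsumerState(0L, new long[0]);

  final long startVersion;       // high water mark for the next scan (inclusive)
  final long[] versionsToCheck;  // in-progress versions below startVersion

  PartitionConsumerState(long startVersion, long[] versionsToCheck) {
    this.startVersion = startVersion;
    this.versionsToCheck = versionsToCheck.clone(); // defensive copy
  }

  byte[] toBytes() {
    ByteBuffer buf = ByteBuffer.allocate(
        Long.BYTES + Integer.BYTES + Long.BYTES * versionsToCheck.length);
    buf.putLong(startVersion);
    buf.putInt(versionsToCheck.length);
    for (long v : versionsToCheck) {
      buf.putLong(v);
    }
    return buf.array();
  }

  static PartitionConsumerState fromBytes(byte[] bytes) {
    ByteBuffer buf = ByteBuffer.wrap(bytes);
    long startVersion = buf.getLong();
    long[] versions = new long[buf.getInt()];
    for (int i = 0; i < versions.length; i++) {
      versions[i] = buf.getLong();
    }
    return new PartitionConsumerState(startVersion, versions);
  }

  @Override
  public String toString() {
    return "PartitionConsumerState{startVersion=" + startVersion
        + ", versionsToCheck=" + Arrays.toString(versionsToCheck) + "}";
  }
}

class PartitionConsumerStateDemo {
  public static void main(String[] args) {
    PartitionConsumerState state = new PartitionConsumerState(101L, new long[] {95L, 98L});
    System.out.println(PartitionConsumerState.fromBytes(state.toBytes()));
    // PartitionConsumerState{startVersion=101, versionsToCheck=[95, 98]}
  }
}
```

A fixed-width layout keeps the encoding trivial to parse. A real implementation might prefer a versioned format, so that fields can be added later without breaking stored state.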

PartitionConsumer.java:

Code Block
languagejava
public Iterator<Partition> consumePartitions();


This PartitionConsumer would be a simple consumer (without persistence) that calls PartitionedFileSet's consumePartitions method with a PartitionConsumerState and keeps track of the returned PartitionConsumerState, to pass in on the next call. With this approach, users of this PartitionConsumer (for instance, a periodically scheduled workflow or a thread in a scheduler) manage its persistence themselves.
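A minimal sketch of such a non-persistent consumer follows. ConsumeResult and SimplePartitionConsumer are hypothetical. The Function passed to the constructor stands in for the dataset's consumePartitions(state) call, and the state type is left generic.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

/** Hypothetical result of one consumePartitions(state) call: partitions plus the next state. */
final class ConsumeResult<P, S> {
  final List<P> partitions;
  final S nextState;

  ConsumeResult(List<P> partitions, S nextState) {
    this.partitions = partitions;
    this.nextState = nextState;
  }
}

/** Non-persistent consumer: keeps the state returned by the previous call in memory only. */
final class SimplePartitionConsumer<P, S> {
  // Stands in for a call such as dataset.consumePartitions(state).
  private final Function<S, ConsumeResult<P, S>> consumeFn;
  private S state;

  SimplePartitionConsumer(Function<S, ConsumeResult<P, S>> consumeFn, S initialState) {
    this.consumeFn = consumeFn;
    this.state = initialState;
  }

  Iterator<P> consumePartitions() {
    ConsumeResult<P, S> result = consumeFn.apply(state);
    state = result.nextState; // passed in on the next call
    return result.partitions.iterator();
  }

  /** Exposed so that the caller (e.g. a workflow) can persist it between runs. */
  S getState() {
    return state;
  }
}

class SimplePartitionConsumerDemo {
  public static void main(String[] args) {
    // Fake dataset: the state is the index of the next unconsumed partition.
    List<String> partitions = new ArrayList<>(List.of("a", "b"));
    SimplePartitionConsumer<String, Integer> consumer = new SimplePartitionConsumer<String, Integer>(
        s -> new ConsumeResult<>(
            new ArrayList<>(partitions.subList(s, partitions.size())), partitions.size()),
        0);
    consumer.consumePartitions().forEachRemaining(System.out::println); // a, b
    partitions.add("c");
    consumer.consumePartitions().forEachRemaining(System.out::println); // c
  }
}
```

The caller is responsible for persisting getState() between runs and restoring it as the initial state.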
Managing this persistence becomes more complicated if there are overlapping runs of the same consumer. For instance, if a workflow runs every minute, a second run can start before the first has committed. Because the first run may not yet have saved its consumer state, the second run would consume partitions based on the same PartitionConsumerState that the first run started with.
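The overlapping-run problem can be shown with a simplified model in which the persisted state is just the index of the next unconsumed partition. StateStore and ConsumerRun are hypothetical. Each run reads the state when it starts and writes it only when it finishes, as in the Example Usage.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for where the consumer state is persisted between runs. */
final class StateStore {
  int saved = 0; // index of the next unconsumed partition (a simplified PartitionConsumerState)
}

/** One run: reads the state when it starts, saves the new state only when it finishes. */
final class ConsumerRun {
  final List<String> toProcess;
  private final int stateToSave;
  private final StateStore store;

  ConsumerRun(List<String> partitions, StateStore store) {
    this.store = store;
    int start = store.saved; // state as of the start of this run
    this.toProcess = new ArrayList<>(partitions.subList(start, partitions.size()));
    this.stateToSave = partitions.size();
  }

  void finish() {
    store.saved = stateToSave;
  }
}

class OverlappingRunsDemo {
  public static void main(String[] args) {
    List<String> partitions = List.of("p1", "p2", "p3");
    StateStore store = new StateStore();
    ConsumerRun first = new ConsumerRun(partitions, store);  // starts at minute 0
    ConsumerRun second = new ConsumerRun(partitions, store); // starts at minute 1, first not done
    first.finish();
    second.finish();
    System.out.println(first.toProcess);  // [p1, p2, p3]
    System.out.println(second.toProcess); // [p1, p2, p3]: the same partitions again
  }
}
```

The second run reads the state before the first run saves it, so both runs process p1, p2 and p3.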


Example Usage:
The initial implementation will support a non-concurrent, periodically running MapReduce job:

Code Block
languagejava
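// holds the state computed in beforeSubmit, to be persisted in onFinish
private PartitionConsumerState finalPartitionConsumerState;

@Override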
public void beforeSubmit(MapReduceContext context) throws Exception {
  ...
  // get the current state of the PartitionConsumer (user is currently responsible for storing state between runs)
  byte[] state = keyValueTable.read(STATE_KEY);
  PartitionConsumerState initialPartitionConsumerState;
  if (state == null) {
    initialPartitionConsumerState = PartitionConsumerState.FROM_BEGINNING;
  } else {
    initialPartitionConsumerState = PartitionConsumerState.fromBytes(state);
  }

  PartitionedFileSet results = context.getDataset("lines");
  PartitionConsumerResult partitionConsumerResult = results.consumePartitions(initialPartitionConsumerState);
  // keep track of the final state to store in onFinish
  finalPartitionConsumerState = partitionConsumerResult.getPartitionConsumerState();
 
  // set the dataset as the input, with the specified partitions to process
  Map<String, String> inputArgs = Maps.newHashMap();
  PartitionedFileSetArguments.addPartitions(inputArgs, partitionConsumerResult.getPartitionIterator());
  PartitionedFileSet input = context.getDataset("lines", inputArgs);
  context.setInput("lines", input);
  ...
}

@Override
public void onFinish(boolean succeeded, MapReduceContext context) throws Exception {
  if (succeeded) {
    ...
    keyValueTable.write(STATE_KEY, finalPartitionConsumerState.toBytes());
  }
  super.onFinish(succeeded, context);
} 




Related JIRAs:

  • CDAP-2746 (Cask Community Issue Tracker)
  • CDAP-2747 (Cask Community Issue Tracker)