...

An improved solution would be for the workflow consuming the partitions to keep track of two things each time it consumes partitions: the set of in-progress transactions and the write pointer of the transaction in which the partitions are consumed. The write pointer serves as a high water mark from which to start scanning for new partitions the next time partitions are polled. The set of in-progress transactions identifies transactions that may still create partitions below that high water mark.
Then, each time the partition consumer runs and polls for additional partitions, it scans by time from the previous transaction's write pointer up to the current time. It must also check whether any of the previously in-progress transactions have since finished and whether they correspond to new partitions, by doing lookups or a scan on the index table. This ensures that partitions added in transactions that committed later are still processed.
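
The polling steps described above can be sketched with a small in-memory model. This is only an illustration under stated assumptions, not the proposed implementation: the class PartitionPollSketch, its State and Result types, and the poll method are hypothetical names, and a NavigableMap keyed by write pointer stands in for the index table. The sketch also assumes that write pointers only increase between polls and that aborted transactions leave no entries in the index.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.NavigableSet;
import java.util.TreeSet;

// Hypothetical in-memory model of the polling logic; none of these names are CDAP or Tephra APIs.
final class PartitionPollSketch {

  // State remembered between polls.
  static final class State {
    final long startPointer;              // high water mark: where the next scan begins
    final NavigableSet<Long> inProgress;  // transactions that were still running below that mark

    State(long startPointer, NavigableSet<Long> inProgress) {
      this.startPointer = startPointer;
      this.inProgress = inProgress;
    }

    static State initial() {
      return new State(0L, new TreeSet<Long>());
    }
  }

  // Partitions found by one poll, plus the state to pass to the next poll.
  static final class Result {
    final List<String> partitions;
    final State nextState;

    Result(List<String> partitions, State nextState) {
      this.partitions = partitions;
      this.nextState = nextState;
    }
  }

  // index: write pointer -> names of partitions created by that transaction (assumed stand-in
  // for the index table). writePointer and running describe the current transaction.
  static Result poll(NavigableMap<Long, List<String>> index, State previous,
                     long writePointer, NavigableSet<Long> running) {
    List<String> found = new ArrayList<>();

    // 1. Transactions that were running last time but have finished since: look them up directly.
    for (long tx : previous.inProgress) {
      if (!running.contains(tx) && index.containsKey(tx)) {
        found.addAll(index.get(tx));
      }
    }

    // 2. Scan from the previous high water mark (inclusive) to the current write pointer
    //    (exclusive), skipping transactions that are still running.
    for (Map.Entry<Long, List<String>> entry
        : index.subMap(previous.startPointer, true, writePointer, false).entrySet()) {
      if (!running.contains(entry.getKey())) {
        found.addAll(entry.getValue());
      }
    }

    // 3. Remember transactions still running below the new high water mark;
    //    their partitions may become visible in a later poll.
    NavigableSet<Long> stillRunning = new TreeSet<>(running.headSet(writePointer, false));
    return new Result(found, new State(writePointer, stillRunning));
  }
}
```

In this model, a partition written by a transaction that was still running during one poll is picked up by the direct lookup in step 1 of a later poll, even though its write pointer lies below the new scan range.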

Andreas: You should mention that this requires storing and indexing the write pointer of the transaction, instead of the actual creation time.

Andreas: Also, instead of using the previous transaction's write pointer as the start of the next scan, it is safer to use the upper bound of the read pointer (Tephra does not guarantee that read pointer and write version are the same).

Andreas: Also, make sure to exclude the current write pointer from the scan. Partitions created in the same transaction will not be visible (which I think is tolerable). If we included the current write pointer, there could be a partition that is created in the same transaction, but after the scan. That partition would not be included this time, and also not next time (because we would scan only after the write pointer).

Note that these are just the APIs defined on the PartitionedFileSet class and a simple consumer.

APIs:
PartitionedFileSet.java:

...


This PartitionConsumer would be a simple consumer (without persistence) that calls PartitionedFileSet's consumePartitions method with a PartitionConsumerState and keeps track of the returned PartitionConsumerState to pass in on the next call. With this approach, users of the PartitionConsumer (for instance, a periodically scheduled workflow or a thread in a scheduler) can manage persistence of the state themselves.
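
The state hand-off described above might look roughly like the following generic sketch. It does not reproduce the API elided on this page: SimpleConsumerSketch, Batch, consume, and getState are hypothetical names, the type parameter S stands in for PartitionConsumerState, and the Function passed to the constructor stands in for the dataset's consume call.

```java
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch of a non-persistent consumer; S plays the role of the consumer state
// and P the role of a partition. None of these names are taken from the actual API.
final class SimpleConsumerSketch<S, P> {

  // What one consume call returns: the new state plus the partitions found.
  static final class Batch<S, P> {
    final S state;
    final List<P> partitions;

    Batch(S state, List<P> partitions) {
      this.state = state;
      this.partitions = partitions;
    }
  }

  private final Function<S, Batch<S, P>> source;  // stand-in for the dataset's consume call
  private S state;                                // state returned by the previous call

  SimpleConsumerSketch(Function<S, Batch<S, P>> source, S initialState) {
    this.source = source;
    this.state = initialState;
  }

  // Passes the remembered state to the source and keeps the returned state for the next call.
  List<P> consume() {
    Batch<S, P> batch = source.apply(state);
    state = batch.state;
    return batch.partitions;
  }

  // Exposed so that the caller (for example a scheduled workflow) can persist it itself.
  S getState() {
    return state;
  }
}
```

Because the state lives only in memory here, a caller that needs to survive restarts would store the value of getState() after each run and supply it as the initial state of the next consumer instance.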

Andreas: Can you mention that this becomes more complicated if there are overlapping runs of the same consumer?


Related JIRAs:

CDAP-2746
CDAP-2747