...
PartitionConsumerState.java:
long startVersion;
long[] versionsToCheck;
byte[] toBytes();
static PartitionConsumerState fromBytes(byte[]);
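For illustration, a fixed-width encoding of these two fields would be sufficient for toBytes and fromBytes. The sketch below assumes exactly that encoding, plus a constructor and a FROM_BEGINNING constant with an empty state; the actual serialization format is not specified in this document:

import java.nio.ByteBuffer;

public class PartitionConsumerState {
  // assumed meaning: start consuming from the beginning, with no in-progress versions to re-check
  public static final PartitionConsumerState FROM_BEGINNING =
      new PartitionConsumerState(0L, new long[0]);

  private final long startVersion;
  private final long[] versionsToCheck;

  public PartitionConsumerState(long startVersion, long[] versionsToCheck) {
    this.startVersion = startVersion;
    this.versionsToCheck = versionsToCheck;
  }

  public byte[] toBytes() {
    // 8 bytes for startVersion, followed by 8 bytes per version to check
    ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES * (1 + versionsToCheck.length));
    buffer.putLong(startVersion);
    for (long version : versionsToCheck) {
      buffer.putLong(version);
    }
    return buffer.array();
  }

  public static PartitionConsumerState fromBytes(byte[] bytes) {
    ByteBuffer buffer = ByteBuffer.wrap(bytes);
    long startVersion = buffer.getLong();
    long[] versionsToCheck = new long[buffer.remaining() / Long.BYTES];
    for (int i = 0; i < versionsToCheck.length; i++) {
      versionsToCheck[i] = buffer.getLong();
    }
    return new PartitionConsumerState(startVersion, versionsToCheck);
  }
}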
PartitionConsumer.java:
public Iterator<Partition> consumePartitions();
This PartitionConsumer would be a simple consumer (without persistence of its own) that calls the PartitionedFileSet's consumePartitions method with a PartitionConsumerState and keeps track of the previously returned PartitionConsumerState, to pass in on the next call. With this approach, users of this PartitionConsumer (for instance, a periodically scheduled workflow or a thread in a scheduler) can manage its persistence themselves, as sketched below.
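The following is a minimal sketch of such a consumer, assuming the PartitionedFileSet's consumePartitions(PartitionConsumerState) returns a PartitionConsumerResult as used in the example further down; the class name SimplePartitionConsumer and the getStateBytes accessor are illustrative, not part of the proposed API:

import java.util.Iterator;

public class SimplePartitionConsumer {
  private final PartitionedFileSet partitionedFileSet;
  // the state returned by the previous call, passed in on the next call
  private PartitionConsumerState currentState = PartitionConsumerState.FROM_BEGINNING;

  public SimplePartitionConsumer(PartitionedFileSet partitionedFileSet) {
    this.partitionedFileSet = partitionedFileSet;
  }

  public Iterator<Partition> consumePartitions() {
    PartitionConsumerResult result = partitionedFileSet.consumePartitions(currentState);
    // remember the returned state so the next call continues where this one left off
    currentState = result.getPartitionConsumerState();
    return result.getPartitionIterator();
  }

  // callers (e.g. a scheduled workflow or a scheduler thread) can persist this between runs
  public byte[] getStateBytes() {
    return currentState.toBytes();
  }
}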
This approach is complicated by overlapping runs of the same consumer. For instance, if a workflow runs every minute, a second run can start before the first has committed. The first run may not yet have saved its partition state, so the second run would consume partitions based on the same PartitionConsumerState that the first run started with.
Example Usage:
The initial implementation will support a non-concurrent, periodically running MapReduce job:
@Override
public void beforeSubmit(MapReduceContext context) throws Exception {
  ...
  // get the current state of the PartitionConsumer (the user is currently responsible for storing state between runs)
  byte[] state = keyValueTable.read(STATE_KEY);
  PartitionConsumerState initialPartitionConsumerState;
  if (state == null) {
    initialPartitionConsumerState = PartitionConsumerState.FROM_BEGINNING;
  } else {
    initialPartitionConsumerState = PartitionConsumerState.fromBytes(state);
  }
  PartitionedFileSet results = context.getDataset("lines");
  PartitionConsumerResult partitionConsumerResult = results.consumePartitions(initialPartitionConsumerState);
  // keep track of the final state to store in onFinish
  finalPartitionConsumerState = partitionConsumerResult.getPartitionConsumerState();
  // set the dataset as the input, with the specified partitions to process
  Map<String, String> inputArgs = Maps.newHashMap();
  PartitionedFileSetArguments.addPartitions(inputArgs, partitionConsumerResult.getPartitionIterator());
  PartitionedFileSet input = context.getDataset("lines", inputArgs);
  context.setInput("lines", input);
  ...
}

@Override
public void onFinish(boolean succeeded, MapReduceContext context) throws Exception {
  if (succeeded) {
    ...
    // persist the consumer state only on success, so that a failed run's partitions are consumed again
    keyValueTable.write(STATE_KEY, finalPartitionConsumerState.toBytes());
  }
  super.onFinish(succeeded, context);
}
Related JIRAs:
CDAP-2746 (Cask Community Issue Tracker)
CDAP-2747 (Cask Community Issue Tracker)