Resolving Kafka Offset Mismatches
MirrorMaker is not much more than a Kafka client that consumes from source topics and writes the same messages to a destination cluster. As such, MirrorMaker offers no guarantee that a message from the source is written to the same partition and offset in the destination. The log saver and metrics processor store Kafka offsets per topic partition in HBase and will need to be modified to handle a cluster failover.
CDAP System Services
To start, we will require that the 'logs.user-v2' and 'metrics' topics have the same number of partitions in both clusters. This guarantees that a message written to partition X in the master is also written to partition X in the slave. Adding partitions to a topic will need to be handled specially (a sketch of a partition-count check follows the list below):
- If metrics and logs need to be preserved, cluster administrators must stop all CDAP programs and CDAP services, make sure the slave has caught up to the master, and then add the same number of partitions to the topic in both the master and slave clusters.
- If metrics and logs do not need to be preserved, cluster administrators can stop MirrorMaker, add the same number of partitions in both the master and slave clusters, and then start MirrorMaker using offsets from after the partition change. Note that this results in data loss only if the failover occurs before the master has processed the messages written to Kafka prior to the partition change.
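For illustration, here is a minimal sketch of a pre-flight check for the partition-count requirement, assuming the Kafka 0.8 SimpleConsumer metadata API; the PartitionCountCheck class and the broker addresses are placeholders, not part of CDAP:

import java.util.Collections;

import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;

// Sketch: verify that a topic has the same number of partitions in the master
// and slave clusters before (re)starting MirrorMaker.
public final class PartitionCountCheck {

  public static void main(String[] args) {
    String topic = "logs.user-v2";
    // Placeholder broker addresses; replace with real brokers from each cluster.
    int masterCount = partitionCount("master-broker.example.com", 9092, topic);
    int slaveCount = partitionCount("slave-broker.example.com", 9092, topic);
    if (masterCount != slaveCount) {
      throw new IllegalStateException(
        String.format("Topic %s has %d partitions in master but %d in slave",
                      topic, masterCount, slaveCount));
    }
  }

  private static int partitionCount(String broker, int port, String topic) {
    // Kafka 0.8 metadata request through SimpleConsumer.
    SimpleConsumer consumer = new SimpleConsumer(broker, port, 10000, 64 * 1024, "partition-check");
    try {
      TopicMetadataResponse response =
        consumer.send(new TopicMetadataRequest(Collections.singletonList(topic)));
      for (TopicMetadata metadata : response.topicsMetadata()) {
        if (topic.equals(metadata.topic())) {
          return metadata.partitionsMetadata().size();
        }
      }
      throw new IllegalStateException("No metadata returned for topic " + topic);
    } finally {
      consumer.close();
    }
  }
}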
Log Saver
The log saver service stores two sets of Kafka offsets in the 'log.meta' table. The first uses row prefix 101 and is used by LogMetricsPlugin to track the offsets used when emitting metrics for the number of log messages at each log level. The second uses row prefix 100 and is used by KafkaLogWriterPlugin to track which log messages have been saved to HDFS. The Kafka offset is stored in the 'nextOffset' column, and the maximum event time in the 'maxEventTime' column.
hbase(main):003:0> scan 'cdap_system:log.meta'
ROW                                         COLUMN+CELL
 \x00\x00\x00dlogs.user-v2\x00\x00\x00\x00  column=d:maxEventTime, timestamp=1478732791857000000, value=\x00\x00\x01XKW\xA1\xE7
 \x00\x00\x00dlogs.user-v2\x00\x00\x00\x00  column=d:nextOffset, timestamp=1478732791857000000, value=\x00\x00\x00\x00\x00\x00\x13\xC9
 \x00\x00\x00dlogs.user-v2\x00\x00\x00\x02  column=d:maxEventTime, timestamp=1478732282540000000, value=\x00\x00\x01XKLW\xED
 \x00\x00\x00dlogs.user-v2\x00\x00\x00\x02  column=d:nextOffset, timestamp=1478732282540000000, value=\x00\x00\x00\x00\x00\x00\x0D\x1F
 \x00\x00\x00dlogs.user-v2\x00\x00\x00\x04  column=d:maxEventTime, timestamp=1478727911132000000, value=\x00\x00\x01XK\x0D:\x9F
 \x00\x00\x00dlogs.user-v2\x00\x00\x00\x04  column=d:nextOffset, timestamp=1478727911132000000, value=\x00\x00\x00\x00\x00\x1B\xBA\xC0
 \x00\x00\x00dlogs.user-v2\x00\x00\x00\x05  column=d:maxEventTime, timestamp=1478727731822000000, value=\x00\x00\x01XK\x09\xE2\xD7
 \x00\x00\x00dlogs.user-v2\x00\x00\x00\x05  column=d:nextOffset, timestamp=1478727731822000000, value=\x00\x00\x00\x00\x00\x00\x01\xA7
 \x00\x00\x00dlogs.user-v2\x00\x00\x00\x06  column=d:maxEventTime, timestamp=1478727552152000000, value=\x00\x00\x01XK\x07\x17\x9F
 \x00\x00\x00dlogs.user-v2\x00\x00\x00\x06  column=d:nextOffset, timestamp=1478727552152000000, value=\x00\x00\x00\x00\x00\x00\x00\xC2
 \x00\x00\x00dlogs.user-v2\x00\x00\x00\x08  column=d:maxEventTime, timestamp=1478727911144000000, value=\x00\x00\x01XK\x0D9(
 \x00\x00\x00dlogs.user-v2\x00\x00\x00\x08  column=d:nextOffset, timestamp=1478727911144000000, value=\x00\x00\x00\x00\x00\x00P\x1E
 \x00\x00\x00dlogs.user-v2\x00\x00\x00\x09  column=d:maxEventTime, timestamp=1478723387161000000, value=\x00\x00\x01XJ\xC7\xAF\x09
 \x00\x00\x00dlogs.user-v2\x00\x00\x00\x09  column=d:nextOffset, timestamp=1478723387161000000, value=\x00\x00\x00\x00\x00\x00\x00\x06
 \x00\x00\x00elogs.user-v2\x00\x00\x00\x00  column=d:maxEventTime, timestamp=1478732944517000000, value=\x00\x00\x01XKW\xA1\xE7
 \x00\x00\x00elogs.user-v2\x00\x00\x00\x00  column=d:nextOffset, timestamp=1478732944517000000, value=\x00\x00\x00\x00\x00\x00\x13\xC9
 \x00\x00\x00elogs.user-v2\x00\x00\x00\x01  column=d:maxEventTime, timestamp=1478732944555000000, value=\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF
 \x00\x00\x00elogs.user-v2\x00\x00\x00\x01  column=d:nextOffset, timestamp=1478732944555000000, value=\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF
 \x00\x00\x00elogs.user-v2\x00\x00\x00\x02  column=d:maxEventTime, timestamp=1478732944572000000, value=\x00\x00\x01XKLW\xED
 \x00\x00\x00elogs.user-v2\x00\x00\x00\x02  column=d:nextOffset, timestamp=1478732944572000000, value=\x00\x00\x00\x00\x00\x00\x0D\x1F
 \x00\x00\x00elogs.user-v2\x00\x00\x00\x03  column=d:maxEventTime, timestamp=1478730708668000000, value=\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF
 \x00\x00\x00elogs.user-v2\x00\x00\x00\x03  column=d:nextOffset, timestamp=1478730708668000000, value=\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF
 \x00\x00\x00elogs.user-v2\x00\x00\x00\x04  column=d:maxEventTime, timestamp=1478729459940000000, value=\x00\x00\x01XK\x0E\xEA#
 \x00\x00\x00elogs.user-v2\x00\x00\x00\x04  column=d:nextOffset, timestamp=1478729459940000000, value=\x00\x00\x00\x00\x00\x1D\xD2\x9B
 \x00\x00\x00elogs.user-v2\x00\x00\x00\x05  column=d:maxEventTime, timestamp=1478727913009000000, value=\x00\x00\x01XK\x09\xE2\xD7
 \x00\x00\x00elogs.user-v2\x00\x00\x00\x05  column=d:nextOffset, timestamp=1478727913009000000, value=\x00\x00\x00\x00\x00\x00\x01\xA7
 \x00\x00\x00elogs.user-v2\x00\x00\x00\x06  column=d:maxEventTime, timestamp=1478727913079000000, value=\x00\x00\x01XK\x07\x17\x9F
 \x00\x00\x00elogs.user-v2\x00\x00\x00\x06  column=d:nextOffset, timestamp=1478727913079000000, value=\x00\x00\x00\x00\x00\x00\x00\xC2
 \x00\x00\x00elogs.user-v2\x00\x00\x00\x07  column=d:maxEventTime, timestamp=1478727913140000000, value=\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF
 \x00\x00\x00elogs.user-v2\x00\x00\x00\x07  column=d:nextOffset, timestamp=1478727913140000000, value=\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF
 \x00\x00\x00elogs.user-v2\x00\x00\x00\x08  column=d:maxEventTime, timestamp=1478727913228000000, value=\x00\x00\x01XK\x0DLD
 \x00\x00\x00elogs.user-v2\x00\x00\x00\x08  column=d:nextOffset, timestamp=1478727913228000000, value=\x00\x00\x00\x00\x00\x00PH
 \x00\x00\x00elogs.user-v2\x00\x00\x00\x09  column=d:maxEventTime, timestamp=1478727913291000000, value=\x00\x00\x01XJ\xC7\xAF\x09
 \x00\x00\x00elogs.user-v2\x00\x00\x00\x09  column=d:nextOffset, timestamp=1478727913291000000, value=\x00\x00\x00\x00\x00\x00\x00\x06
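For reference, here is a minimal sketch (not CDAP code) of how a cell from this scan could be decoded. It assumes the layout visible above: a 4-byte integer row prefix (100 or 101), the topic name, and a 4-byte partition id in the row key, with 'nextOffset' and 'maxEventTime' stored as 8-byte big-endian longs (the all-\xFF values presumably mark partitions where nothing has been processed yet).

import org.apache.hadoop.hbase.util.Bytes;

// Sketch: decode a 'log.meta' cell, assuming the layout shown in the scan above.
public final class LogMetaDecoder {

  public static void decode(byte[] rowKey, byte[] nextOffsetValue, byte[] maxEventTimeValue) {
    // Row key: 4-byte prefix (100 = KafkaLogWriterPlugin, 101 = LogMetricsPlugin),
    // followed by the topic name and a 4-byte partition id.
    int prefix = Bytes.toInt(rowKey, 0);
    String topic = Bytes.toString(rowKey, 4, rowKey.length - 8);
    int partition = Bytes.toInt(rowKey, rowKey.length - 4);

    // Values are 8-byte big-endian longs; all-0xFF decodes to -1 (assumed "nothing stored yet").
    long nextOffset = Bytes.toLong(nextOffsetValue);
    long maxEventTime = Bytes.toLong(maxEventTimeValue);  // epoch milliseconds

    System.out.printf("prefix=%d topic=%s partition=%d nextOffset=%d maxEventTime=%d%n",
                      prefix, topic, partition, nextOffset, maxEventTime);
  }
}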
The log saver only needs to be changed so that, on startup, if the stored offset does not match the stored event time, it performs a binary search for the correct offset. This is possible because log messages in Kafka contain the timestamp of the log message.
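A minimal sketch of that binary search follows. The helpers earliestOffset, latestOffset, and fetchEventTime are hypothetical placeholders (the first two would come from Kafka's offset request API, the last reads the message at a given offset and decodes its log event time), and it assumes event times within a partition are approximately non-decreasing.

// Sketch of the offset-recovery binary search described above.
public abstract class OffsetRecovery {

  protected abstract long fetchEventTime(int partition, long offset);
  protected abstract long earliestOffset(int partition);
  protected abstract long latestOffset(int partition);

  /**
   * Returns the first offset whose event time is greater than the stored max event time,
   * i.e. the next message to process after a failover.
   */
  public long findOffsetForEventTime(int partition, long storedMaxEventTime) {
    long lo = earliestOffset(partition);
    long hi = latestOffset(partition) - 1;
    while (lo <= hi) {
      long mid = lo + (hi - lo) / 2;
      if (fetchEventTime(partition, mid) <= storedMaxEventTime) {
        lo = mid + 1;   // message at 'mid' was already processed; look later
      } else {
        hi = mid - 1;   // message at 'mid' is newer than anything processed; look earlier
      }
    }
    return lo;          // next offset to process after the failover
  }
}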
Metrics Processor
The metrics processor service stores its offsets in the 'metrics.kafka.meta' table. The row key is topic.partition, and the offset is stored under the column 'o'. This logic is handled in KafkaConsumerMetaTable.
hbase(main):005:0> scan 'cdap_system:metrics.kafka.meta'
ROW          COLUMN+CELL
 metrics.00  column=d:o, timestamp=1478732942360, value=\x00\x00\x00\x00\x00\x00YQ
 metrics.01  column=d:o, timestamp=1478732942360, value=\x00\x00\x00\x00\x00\x06\xF6)
 metrics.02  column=d:o, timestamp=1478732942360, value=\x00\x00\x00\x00\x00\x00'\xA6
 metrics.03  column=d:o, timestamp=1478732942360, value=\x00\x00\x00\x00\x00\x00\x85\xFD
 metrics.04  column=d:o, timestamp=1478732942360, value=\x00\x00\x00\x00\x00\x00\x12\x18
 metrics.05  column=d:o, timestamp=1478729530145, value=\x00\x00\x00\x00\x00\x00\x0D\x96
 metrics.06  column=d:o, timestamp=1478729530145, value=\x00\x00\x00\x00\x00\x00\x03\xAA
 metrics.07  column=d:o, timestamp=1478732942360, value=\x00\x00\x00\x00\x00\x00\x06\xBE
 metrics.08  column=d:o, timestamp=1478729530145, value=\x00\x00\x00\x00\x00\x00*\xFD
 metrics.09  column=d:o, timestamp=1478732942360, value=\x00\x00\x00\x00\x00\x01:\x0D
10 row(s) in 0.0330 seconds
The metrics processor needs to be changed to also store a timestamp for each offset. On startup, if the stored offset and timestamp no longer match, it performs a binary search for the correct offset for that timestamp. This is possible because metrics messages in Kafka contain the timestamp of the metric.
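A minimal sketch of that change follows. It uses hypothetical helpers rather than the existing KafkaConsumerMetaTable API: fetchMetricTimestamp reads the message at a given offset and decodes its metric timestamp, findOffsetForTimestamp would be the same binary search shown above, and readMeta/writeMeta stand in for the HBase reads and writes.

import org.apache.hadoop.hbase.util.Bytes;

// Sketch of the proposed metrics processor change, not the existing CDAP code.
public abstract class MetricsOffsetStoreSketch {

  protected abstract long fetchMetricTimestamp(String topic, int partition, long offset);
  protected abstract long findOffsetForTimestamp(String topic, int partition, long timestamp);
  protected abstract byte[] readMeta(byte[] rowKey);
  protected abstract void writeMeta(byte[] rowKey, byte[] value);

  /** Persist both the offset and the timestamp of the message at that offset. */
  public void saveOffset(String topic, int partition, long offset, long timestamp) {
    byte[] rowKey = Bytes.toBytes(topic + "." + partition);
    writeMeta(rowKey, Bytes.add(Bytes.toBytes(offset), Bytes.toBytes(timestamp)));
  }

  /** On startup, validate the stored offset against Kafka and correct it if needed. */
  public long getStartOffset(String topic, int partition, long defaultOffset) {
    byte[] value = readMeta(Bytes.toBytes(topic + "." + partition));
    if (value == null || value.length != 2 * Bytes.SIZEOF_LONG) {
      return defaultOffset;
    }
    long storedOffset = Bytes.toLong(value, 0);
    long storedTimestamp = Bytes.toLong(value, Bytes.SIZEOF_LONG);
    // After a failover, the offset may point at a different message in the slave cluster.
    if (fetchMetricTimestamp(topic, partition, storedOffset) != storedTimestamp) {
      return findOffsetForTimestamp(topic, partition, storedTimestamp);
    }
    return storedOffset;
  }
}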
Edge Conditions
If there are multiple Kafka messages for the same timestamp, some logs and metrics for that timestamp may be duplicated in the slave.
Kafka Flowlet Library
There is a library that can be used to write a flowlet that reads from Kafka (https://github.com/caskdata/cdap-packs/tree/develop/cdap-kafka-pack).
If a developer is using that library, it is up to the developer to handle the scenario where an offset may point to different data after a failover, because the correct behavior requires application-specific knowledge. For example, some applications may not be able to self-correct if the Kafka messages don't contain some sort of monotonically increasing field, some applications may decide it is acceptable to re-process Kafka messages, and some may require manual intervention to set the new offset through runtime arguments.
In any case, the developer has to override the method in the flowlet that gets the start offset, and probably also the method that saves the read offsets. For example, it could look like:
@Override
protected void saveReadOffsets(Map<TopicPartition, Long> offsets) {
  KeyValueTable offsetStore = getOffsetStore();
  if (offsetStore == null) {
    return;
  }
  for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet()) {
    TopicPartition topicPartition = entry.getKey();
    Long kafkaOffset = entry.getValue();
    // Get the message id at this offset (assumes messages in Kafka have a monotonically increasing id)
    long messageId = getMessageId(topicPartition, kafkaOffset);
    // Store both the offset and the message id
    offsetStore.write(getStoreKey(topicPartition),
                      Bytes.add(Bytes.toBytes(kafkaOffset), Bytes.toBytes(messageId)));
  }
}

@Override
protected Long getBeginOffset(TopicPartition topicPartition) {
  KeyValueTable offsetStore = getOffsetStore();
  if (offsetStore == null) {
    return getDefaultOffset(topicPartition);
  }
  // The value is two 8-byte longs: the Kafka offset followed by the message id at that offset
  byte[] value = offsetStore.read(getStoreKey(topicPartition));
  if (value == null || value.length != 2 * Bytes.SIZEOF_LONG) {
    return getDefaultOffset(topicPartition);
  }

  long storedKafkaOffset = Bytes.toLong(value);
  long storedMessageId = Bytes.toLong(value, Bytes.SIZEOF_LONG);
  // A mismatch can happen after a cluster failover
  if (getMessageId(topicPartition, storedKafkaOffset) != storedMessageId) {
    return findKafkaOffset(storedMessageId);
  }

  return storedKafkaOffset;
}
In this example, both the Kafka offset and a message id corresponding to that offset are stored in the offset table. On program start, the message id from the offset store is compared to the actual message id in Kafka, and corrections are made if they do not match. This contrasts with the base class implementation (https://github.com/caskdata/cdap-packs/blob/develop/cdap-kafka-pack/cdap-kafka-flow/cdap-kafka-flow-compat-0.8/src/main/java/co/cask/cdap/kafka/flow/Kafka08ConsumerFlowlet.java), which just stores the Kafka offset in the offset table and always assumes that it is correct.