Skip to end of metadata
Go to start of metadata

You are viewing an old version of this page. View the current version.

Compare with Current View Page History

Version 1 Current »

MirrorMaker is not much more than a Kafka client that consumes from source topics and writes the same messages to some destination. As such, MirrorMaker does not offer any guarantees about a message from the source being written to the same partition and offset in the destination. The log saver and metrics processor store Kafka offsets per topic partition in HBase, and will need to be modified to handle a cluster failover.

 

To start, we will require that the 'logs.user-v2' and 'metrics' topics have the same number of partitions in both clusters. This will guarantee that a message written to partition X in the master will also be written to the same partition X in the slave. Adding partitions to a topic will need to be handled specially.

  • If metrics and logs need to be preserved, cluster administrators must stop all cdap programs and cdap services, make sure the slave has caught up to the master, then add the same number of partitions to the topic in both master and slave clusters.
  • If metrics and logs do not need to be preserved, cluster administrators can stop MirrorMaker, add the same number of partitions in both master and slave clusters, then start up MirrorMaker using offsets that are after partition change. Note that this only results in data loss if the failover occurs before the messages in Kafka before the partition change are processed by the master.

Log Saver

The log saver services stores two sets of Kafka offsets in the 'log.meta' table. The first uses row prefix 101, and is used by LogMetricsPlugin to track to emit metrics for the number of log messages emitted at each log level. The second uses row prefix 100 and is used by KafkaLogWriterPlugin to track which log messages have been saved to hdfs. Kafka offset is stored in column 'nextOffset', and max event time in 'maxEventTime'.

hbase(main):003:0> scan 'cdap_system:log.meta'
ROW                                         COLUMN+CELL
 \x00\x00\x00dlogs.user-v2\x00\x00\x00\x00  column=d:maxEventTime, timestamp=1478732791857000000, value=\x00\x00\x01XKW\xA1\xE7
 \x00\x00\x00dlogs.user-v2\x00\x00\x00\x00  column=d:nextOffset, timestamp=1478732791857000000, value=\x00\x00\x00\x00\x00\x00\x13\xC9
 \x00\x00\x00dlogs.user-v2\x00\x00\x00\x02  column=d:maxEventTime, timestamp=1478732282540000000, value=\x00\x00\x01XKLW\xED
 \x00\x00\x00dlogs.user-v2\x00\x00\x00\x02  column=d:nextOffset, timestamp=1478732282540000000, value=\x00\x00\x00\x00\x00\x00\x0D\x1F
 \x00\x00\x00dlogs.user-v2\x00\x00\x00\x04  column=d:maxEventTime, timestamp=1478727911132000000, value=\x00\x00\x01XK\x0D:\x9F
 \x00\x00\x00dlogs.user-v2\x00\x00\x00\x04  column=d:nextOffset, timestamp=1478727911132000000, value=\x00\x00\x00\x00\x00\x1B\xBA\xC0
 \x00\x00\x00dlogs.user-v2\x00\x00\x00\x05  column=d:maxEventTime, timestamp=1478727731822000000, value=\x00\x00\x01XK\x09\xE2\xD7
 \x00\x00\x00dlogs.user-v2\x00\x00\x00\x05  column=d:nextOffset, timestamp=1478727731822000000, value=\x00\x00\x00\x00\x00\x00\x01\xA7
 \x00\x00\x00dlogs.user-v2\x00\x00\x00\x06  column=d:maxEventTime, timestamp=1478727552152000000, value=\x00\x00\x01XK\x07\x17\x9F
 \x00\x00\x00dlogs.user-v2\x00\x00\x00\x06  column=d:nextOffset, timestamp=1478727552152000000, value=\x00\x00\x00\x00\x00\x00\x00\xC2
 \x00\x00\x00dlogs.user-v2\x00\x00\x00\x08  column=d:maxEventTime, timestamp=1478727911144000000, value=\x00\x00\x01XK\x0D9(
 \x00\x00\x00dlogs.user-v2\x00\x00\x00\x08  column=d:nextOffset, timestamp=1478727911144000000, value=\x00\x00\x00\x00\x00\x00P\x1E
 \x00\x00\x00dlogs.user-v2\x00\x00\x00\x09  column=d:maxEventTime, timestamp=1478723387161000000, value=\x00\x00\x01XJ\xC7\xAF\x09
 \x00\x00\x00dlogs.user-v2\x00\x00\x00\x09  column=d:nextOffset, timestamp=1478723387161000000, value=\x00\x00\x00\x00\x00\x00\x00\x06
 \x00\x00\x00elogs.user-v2\x00\x00\x00\x00  column=d:maxEventTime, timestamp=1478732944517000000, value=\x00\x00\x01XKW\xA1\xE7
 \x00\x00\x00elogs.user-v2\x00\x00\x00\x00  column=d:nextOffset, timestamp=1478732944517000000, value=\x00\x00\x00\x00\x00\x00\x13\xC9
 \x00\x00\x00elogs.user-v2\x00\x00\x00\x01  column=d:maxEventTime, timestamp=1478732944555000000, value=\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF
 \x00\x00\x00elogs.user-v2\x00\x00\x00\x01  column=d:nextOffset, timestamp=1478732944555000000, value=\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF
 \x00\x00\x00elogs.user-v2\x00\x00\x00\x02  column=d:maxEventTime, timestamp=1478732944572000000, value=\x00\x00\x01XKLW\xED
 \x00\x00\x00elogs.user-v2\x00\x00\x00\x02  column=d:nextOffset, timestamp=1478732944572000000, value=\x00\x00\x00\x00\x00\x00\x0D\x1F
 \x00\x00\x00elogs.user-v2\x00\x00\x00\x03  column=d:maxEventTime, timestamp=1478730708668000000, value=\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF
 \x00\x00\x00elogs.user-v2\x00\x00\x00\x03  column=d:nextOffset, timestamp=1478730708668000000, value=\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF
 \x00\x00\x00elogs.user-v2\x00\x00\x00\x04  column=d:maxEventTime, timestamp=1478729459940000000, value=\x00\x00\x01XK\x0E\xEA#
 \x00\x00\x00elogs.user-v2\x00\x00\x00\x04  column=d:nextOffset, timestamp=1478729459940000000, value=\x00\x00\x00\x00\x00\x1D\xD2\x9B
 \x00\x00\x00elogs.user-v2\x00\x00\x00\x05  column=d:maxEventTime, timestamp=1478727913009000000, value=\x00\x00\x01XK\x09\xE2\xD7
 \x00\x00\x00elogs.user-v2\x00\x00\x00\x05  column=d:nextOffset, timestamp=1478727913009000000, value=\x00\x00\x00\x00\x00\x00\x01\xA7
 \x00\x00\x00elogs.user-v2\x00\x00\x00\x06  column=d:maxEventTime, timestamp=1478727913079000000, value=\x00\x00\x01XK\x07\x17\x9F
 \x00\x00\x00elogs.user-v2\x00\x00\x00\x06  column=d:nextOffset, timestamp=1478727913079000000, value=\x00\x00\x00\x00\x00\x00\x00\xC2
 \x00\x00\x00elogs.user-v2\x00\x00\x00\x07  column=d:maxEventTime, timestamp=1478727913140000000, value=\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF
 \x00\x00\x00elogs.user-v2\x00\x00\x00\x07  column=d:nextOffset, timestamp=1478727913140000000, value=\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF
 \x00\x00\x00elogs.user-v2\x00\x00\x00\x08  column=d:maxEventTime, timestamp=1478727913228000000, value=\x00\x00\x01XK\x0DLD
 \x00\x00\x00elogs.user-v2\x00\x00\x00\x08  column=d:nextOffset, timestamp=1478727913228000000, value=\x00\x00\x00\x00\x00\x00PH
 \x00\x00\x00elogs.user-v2\x00\x00\x00\x09  column=d:maxEventTime, timestamp=1478727913291000000, value=\x00\x00\x01XJ\xC7\xAF\x09
 \x00\x00\x00elogs.user-v2\x00\x00\x00\x09  column=d:nextOffset, timestamp=1478727913291000000, value=\x00\x00\x00\x00\x00\x00\x00\x06

Log saver only needs to be changed so that on start up, if the offset does not match the event time, it does a binary search for the correct offset.

Metrics Processor

The metrics processor service stores its offsets in the 'metrics.kafka.meta' table. Row key is topic.partition, and the offset is stored under column 'o'. The logic is handled in KafkaConsumerMetaTable.

hbase(main):005:0> scan 'cdap_system:metrics.kafka.meta'
ROW                                         COLUMN+CELL
 metrics.00                                 column=d:o, timestamp=1478732942360, value=\x00\x00\x00\x00\x00\x00YQ
 metrics.01                                 column=d:o, timestamp=1478732942360, value=\x00\x00\x00\x00\x00\x06\xF6)
 metrics.02                                 column=d:o, timestamp=1478732942360, value=\x00\x00\x00\x00\x00\x00'\xA6
 metrics.03                                 column=d:o, timestamp=1478732942360, value=\x00\x00\x00\x00\x00\x00\x85\xFD
 metrics.04                                 column=d:o, timestamp=1478732942360, value=\x00\x00\x00\x00\x00\x00\x12\x18
 metrics.05                                 column=d:o, timestamp=1478729530145, value=\x00\x00\x00\x00\x00\x00\x0D\x96
 metrics.06                                 column=d:o, timestamp=1478729530145, value=\x00\x00\x00\x00\x00\x00\x03\xAA
 metrics.07                                 column=d:o, timestamp=1478732942360, value=\x00\x00\x00\x00\x00\x00\x06\xBE
 metrics.08                                 column=d:o, timestamp=1478729530145, value=\x00\x00\x00\x00\x00\x00*\xFD
 metrics.09                                 column=d:o, timestamp=1478732942360, value=\x00\x00\x00\x00\x00\x01:\x0D
10 row(s) in 0.0330 seconds

Metrics processor needs to be changed to also store the timestamp for each offset. On startup, if the offset and timestamp don't match, it does a binary search for the correct offset for that timestamp.

Edge Conditions

If there are multiple Kafka messages for the same timestamp, some logs and metrics for that timestamp may be duplicated in the slave. 

  • No labels