Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

Checklist

  •  User Stories Documented
  •  User Stories Reviewed
  •  Design Reviewed
  •  APIs reviewed
  •  Release priorities assigned
  •  Test cases reviewed
  •  Blog post

Introduction 

Phase 1 of replication is to support a hot-cold setup where CDAP data is replicated from one cluster to another using existing tools for replicating underlying infrastructure.

Goals

Allow manual failover from a hot cluster to a cold cluster.

User Stories 

  • As a cluster administrator, I want to be able to configure CDAP so that all HBase tables created by CDAP are set up to replicate data to another cluster
  • As a cluster administrator, I want to be able to manually stop CDAP in one cluster and start it in another cluster with the exact same state
  • As a cluster administrator, I want to be able to have a way to know when it is safe to start the cold cluster after the hot one has been shut down

Design

CDAP stores state in several systems:

 

HDFS

  • Transaction snapshots
  • Artifacts (jars)
  • Streams
  • FileSet based datasets
  • Program logs

HBase

  • CDAP entity metadata (program specifications, schedules, run history, metrics, etc.)
  • Table based datasets
  • Kafka offsets for metrics and logs
  • Flow queues
  • Messaging system data

Kafka

  • unprocessed metrics
  • unsaved log messages

Hive

  • Explorable CDAP datasets and their partitions

 

For phase 1, much of the responsiblity for data replication falls to the cluster administrator. It is assumed that replication of HDFS, Hive, and Kafka will be handled by the cluster administrator. HDFS is usually done through regularly scheduled distcp jobs, or by using some distro specific tools, such as Cloudera's Backup and Data Recovery (http://www.cloudera.com/documentation/enterprise/latest/topics/cm_bdr_about.html). Kafka can be done using MirrorMaker. Hive can be done by replicating the data (HDFS and/or HBase), and by replication the metastore through whatever replication mechanisms are available to the relational DB backing the metastore. All of this can be setup outside of CDAP.

One thing CDAP needs to ensure is that there are no cluster specific values in any of the metadata. For example, the namenode should not be in any of the system metadata, otherwise things will fail when the data is replicated over to the slave and the slave is started.

HBase DDL

 

Design

HBase DDL will require some hooks in CDAP, because replication must be setup for every table when it is created, and before any data is written to it. Design details are at HBase DDL SPI.

Replication Status

Cluster administrators will require a way to tell when it is safe for a cold cluster to be started up. In other words, they need to be able to tell when all necessary data has been replicated. HBase shell already includes a command that helps:

Code Block
hbase(main):030:0> status 'replication', 'source'
version 1.1.2.2.3.4.7-4
1 live servers
    [hostname]:
       SOURCE: PeerID=1, AgeOfLastShippedOp=29312, SizeOfLogQueue=0, TimeStampsOfLastShippedOp=Thu Nov 10 22:51:55 UTC 2016, Replication Lag=29312

HBase also includes a mapreduce job that can be used to verify replicated data (https://hbase.apache.org/book.html#_verifying_replicated_data).  It must be run on the master cluster.

Code Block
$ HADOOP_CLASSPATH=`hbase classpath` hadoop jar /usr/hdp/current/hbase-master/lib/hbase-server-1.1.2.2.3.4.7-4.jar verifyrep <peer id> <table>
...
	Map-Reduce Framework
		Map input records=1
		Map output records=0
		Input split bytes=103
		Spilled Records=0
		Failed Shuffles=0
		Merged Map outputs=0
		GC time elapsed (ms)=64
		CPU time spent (ms)=1810
		Physical memory (bytes) snapshot=255139840
		Virtual memory (bytes) snapshot=916021248
		Total committed heap usage (bytes)=287309824
	org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication$Verifier$Counters
		BADROWS=1
		CONTENT_DIFFERENT_ROWS=1

Under the HBase counters, you only want to see the GOODROWS counter, and not BADROWS or CONTENT_DIFFERENT_ROWS.

Design details are at Replication Status Tool

Kafka offset mismatches

MirrorMaker is not much more than a Kafka client that consumes from source topics and writes the same messages to some destination. As such, MirrorMaker does not offer any guarantees about a message from the source being written to the same partition and offset in the destination. The log saver and metrics processor store Kafka offsets per topic partition in HBase, and will need to be modified to handle a cluster failover.

 

To start, we will require that the 'logs.user-v2' and 'metrics' topics have the same number of partitions in both clusters. This will guarantee that a message written to partition X in the master will also be written to the same partition X in the slave. Adding partitions to a topic will need to be handled specially.

  • If metrics and logs need to be preserved, cluster administrators must stop all cdap programs and cdap services, make sure the slave has caught up to the master, then add the same number of partitions to the topic in both master and slave clusters.
  • If metrics and logs do not need to be preserved, cluster administrators can stop MirrorMaker, add the same number of partitions in both master and slave clusters, then start up MirrorMaker using offsets that are after partition change. Note that this only results in data loss if the failover occurs before the messages in Kafka before the partition change are processed by the master.

Log Saver

The log saver services stores two sets of Kafka offsets in the 'log.meta' table. The first uses row prefix 101, and is used by LogMetricsPlugin to track to emit metrics for the number of log messages emitted at each log level. The second uses row prefix 100 and is used by KafkaLogWriterPlugin to track which log messages have been saved to hdfs. Kafka offset is stored in column 'nextOffset', and max event time in 'maxEventTime'.

Code Block
hbase(main):003:0> scan 'cdap_system:log.meta'
ROW                                         COLUMN+CELL
 \x00\x00\x00dlogs.user-v2\x00\x00\x00\x00  column=d:maxEventTime, timestamp=1478732791857000000, value=\x00\x00\x01XKW\xA1\xE7
 \x00\x00\x00dlogs.user-v2\x00\x00\x00\x00  column=d:nextOffset, timestamp=1478732791857000000, value=\x00\x00\x00\x00\x00\x00\x13\xC9
 \x00\x00\x00dlogs.user-v2\x00\x00\x00\x02  column=d:maxEventTime, timestamp=1478732282540000000, value=\x00\x00\x01XKLW\xED
 \x00\x00\x00dlogs.user-v2\x00\x00\x00\x02  column=d:nextOffset, timestamp=1478732282540000000, value=\x00\x00\x00\x00\x00\x00\x0D\x1F
 \x00\x00\x00dlogs.user-v2\x00\x00\x00\x04  column=d:maxEventTime, timestamp=1478727911132000000, value=\x00\x00\x01XK\x0D:\x9F
 \x00\x00\x00dlogs.user-v2\x00\x00\x00\x04  column=d:nextOffset, timestamp=1478727911132000000, value=\x00\x00\x00\x00\x00\x1B\xBA\xC0
 \x00\x00\x00dlogs.user-v2\x00\x00\x00\x05  column=d:maxEventTime, timestamp=1478727731822000000, value=\x00\x00\x01XK\x09\xE2\xD7
 \x00\x00\x00dlogs.user-v2\x00\x00\x00\x05  column=d:nextOffset, timestamp=1478727731822000000, value=\x00\x00\x00\x00\x00\x00\x01\xA7
 \x00\x00\x00dlogs.user-v2\x00\x00\x00\x06  column=d:maxEventTime, timestamp=1478727552152000000, value=\x00\x00\x01XK\x07\x17\x9F
 \x00\x00\x00dlogs.user-v2\x00\x00\x00\x06  column=d:nextOffset, timestamp=1478727552152000000, value=\x00\x00\x00\x00\x00\x00\x00\xC2
 \x00\x00\x00dlogs.user-v2\x00\x00\x00\x08  column=d:maxEventTime, timestamp=1478727911144000000, value=\x00\x00\x01XK\x0D9(
 \x00\x00\x00dlogs.user-v2\x00\x00\x00\x08  column=d:nextOffset, timestamp=1478727911144000000, value=\x00\x00\x00\x00\x00\x00P\x1E
 \x00\x00\x00dlogs.user-v2\x00\x00\x00\x09  column=d:maxEventTime, timestamp=1478723387161000000, value=\x00\x00\x01XJ\xC7\xAF\x09
 \x00\x00\x00dlogs.user-v2\x00\x00\x00\x09  column=d:nextOffset, timestamp=1478723387161000000, value=\x00\x00\x00\x00\x00\x00\x00\x06
 \x00\x00\x00elogs.user-v2\x00\x00\x00\x00  column=d:maxEventTime, timestamp=1478732944517000000, value=\x00\x00\x01XKW\xA1\xE7
 \x00\x00\x00elogs.user-v2\x00\x00\x00\x00  column=d:nextOffset, timestamp=1478732944517000000, value=\x00\x00\x00\x00\x00\x00\x13\xC9
 \x00\x00\x00elogs.user-v2\x00\x00\x00\x01  column=d:maxEventTime, timestamp=1478732944555000000, value=\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF
 \x00\x00\x00elogs.user-v2\x00\x00\x00\x01  column=d:nextOffset, timestamp=1478732944555000000, value=\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF
 \x00\x00\x00elogs.user-v2\x00\x00\x00\x02  column=d:maxEventTime, timestamp=1478732944572000000, value=\x00\x00\x01XKLW\xED
 \x00\x00\x00elogs.user-v2\x00\x00\x00\x02  column=d:nextOffset, timestamp=1478732944572000000, value=\x00\x00\x00\x00\x00\x00\x0D\x1F
 \x00\x00\x00elogs.user-v2\x00\x00\x00\x03  column=d:maxEventTime, timestamp=1478730708668000000, value=\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF
 \x00\x00\x00elogs.user-v2\x00\x00\x00\x03  column=d:nextOffset, timestamp=1478730708668000000, value=\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF
 \x00\x00\x00elogs.user-v2\x00\x00\x00\x04  column=d:maxEventTime, timestamp=1478729459940000000, value=\x00\x00\x01XK\x0E\xEA#
 \x00\x00\x00elogs.user-v2\x00\x00\x00\x04  column=d:nextOffset, timestamp=1478729459940000000, value=\x00\x00\x00\x00\x00\x1D\xD2\x9B
 \x00\x00\x00elogs.user-v2\x00\x00\x00\x05  column=d:maxEventTime, timestamp=1478727913009000000, value=\x00\x00\x01XK\x09\xE2\xD7
 \x00\x00\x00elogs.user-v2\x00\x00\x00\x05  column=d:nextOffset, timestamp=1478727913009000000, value=\x00\x00\x00\x00\x00\x00\x01\xA7
 \x00\x00\x00elogs.user-v2\x00\x00\x00\x06  column=d:maxEventTime, timestamp=1478727913079000000, value=\x00\x00\x01XK\x07\x17\x9F
 \x00\x00\x00elogs.user-v2\x00\x00\x00\x06  column=d:nextOffset, timestamp=1478727913079000000, value=\x00\x00\x00\x00\x00\x00\x00\xC2
 \x00\x00\x00elogs.user-v2\x00\x00\x00\x07  column=d:maxEventTime, timestamp=1478727913140000000, value=\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF
 \x00\x00\x00elogs.user-v2\x00\x00\x00\x07  column=d:nextOffset, timestamp=1478727913140000000, value=\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF
 \x00\x00\x00elogs.user-v2\x00\x00\x00\x08  column=d:maxEventTime, timestamp=1478727913228000000, value=\x00\x00\x01XK\x0DLD
 \x00\x00\x00elogs.user-v2\x00\x00\x00\x08  column=d:nextOffset, timestamp=1478727913228000000, value=\x00\x00\x00\x00\x00\x00PH
 \x00\x00\x00elogs.user-v2\x00\x00\x00\x09  column=d:maxEventTime, timestamp=1478727913291000000, value=\x00\x00\x01XJ\xC7\xAF\x09
 \x00\x00\x00elogs.user-v2\x00\x00\x00\x09  column=d:nextOffset, timestamp=1478727913291000000, value=\x00\x00\x00\x00\x00\x00\x00\x06

Log saver only needs to be changed so that on start up, if the offset does not match the event time, it does a binary search for the correct offset.

Metrics Processor

The metrics processor service stores its offsets in the 'metrics.kafka.meta' table. Row key is topic.partition, and the offset is stored under column 'o'. The logic is handled in KafkaConsumerMetaTable.

Code Block
hbase(main):005:0> scan 'cdap_system:metrics.kafka.meta'
ROW                                         COLUMN+CELL
 metrics.00                                 column=d:o, timestamp=1478732942360, value=\x00\x00\x00\x00\x00\x00YQ
 metrics.01                                 column=d:o, timestamp=1478732942360, value=\x00\x00\x00\x00\x00\x06\xF6)
 metrics.02                                 column=d:o, timestamp=1478732942360, value=\x00\x00\x00\x00\x00\x00'\xA6
 metrics.03                                 column=d:o, timestamp=1478732942360, value=\x00\x00\x00\x00\x00\x00\x85\xFD
 metrics.04                                 column=d:o, timestamp=1478732942360, value=\x00\x00\x00\x00\x00\x00\x12\x18
 metrics.05                                 column=d:o, timestamp=1478729530145, value=\x00\x00\x00\x00\x00\x00\x0D\x96
 metrics.06                                 column=d:o, timestamp=1478729530145, value=\x00\x00\x00\x00\x00\x00\x03\xAA
 metrics.07                                 column=d:o, timestamp=1478732942360, value=\x00\x00\x00\x00\x00\x00\x06\xBE
 metrics.08                                 column=d:o, timestamp=1478729530145, value=\x00\x00\x00\x00\x00\x00*\xFD
 metrics.09                                 column=d:o, timestamp=1478732942360, value=\x00\x00\x00\x00\x00\x01:\x0D
10 row(s) in 0.0330 seconds

Metrics processor needs to be changed to also store the timestamp for each offset. On startup, if the offset and timestamp don't match, it does a binary search for the correct offset for that timestamp.

Edge Conditions

If there are multiple Kafka messages for the same timestamp, some logs and metrics for that timestamp may be duplicated in the slave. 

Approach

Approach #1

Approach #2

API changes

New Programmatic APIs

New Java APIs introduced (both user facing and internal)Design details are at Resolving Kafka Offset Mismatches.

API changes

New Programmatic APIs

See HBase DDL SPI

Deprecated Programmatic APIs

No programmatic APIs will be deprecated 

New REST APIs

No new REST APIs will be added to the platform. There may be new REST APIs used by an external service used to handle table DDL and replication.

Deprecated REST API

No REST APIs will be deprecated

CLI Impact or Changes

  • No Changes

UI Impact or Changes

  • No Changes

Security Impact 

What's the impact on Authorization and how does the design take care of this aspectCluster administrators are responsible for setting up replication, but we should understand what is required from a security perspective to replicate hdfs and hbase data.

Impact on Infrastructure Outages 

With replication, there is now another cluster that is required. If the cold cluster suffers an outage, replication will eventually catch up once service is restored. This assumes the outage lasts is for a shorter duration of time than how long HBase keeps events in its WAL.

Test Scenarios

Test IDTest DescriptionExpected Results
   
   
   
   

Releases

Release 4.1.

Phase 1 work is scheduled for release 4.1.0.

Related Work

  • Work #1
  • Work #2
  • Work #3

 

Future work