Checklist
- User Stories Documented
- User Stories Reviewed
- Design Reviewed
- APIs reviewed
- Release priorities assigned
- Test cases reviewed
- Blog post
Introduction
Phase 1 of replication is to support a hot-cold setup where CDAP data is replicated from one cluster to another using existing tools for replicating underlying infrastructure.
Goals
Allow manual failover from a hot cluster to a cold cluster.
User Stories
- As a cluster administrator, I want to be able to configure CDAP so that all HBase tables created by CDAP are set up to replicate data to another cluster
- As a cluster administrator, I want to be able to manually stop CDAP in one cluster and start it in another cluster with the exact same state
- As a cluster administrator, I want a way to know when it is safe to start the cold cluster after the hot one has been shut down
Design
CDAP stores state in several systems:
HDFS
- Transaction snapshots
- Artifacts (jars)
- Streams
- FileSet based datasets
- Program logs
HBase
- CDAP entity metadata (program specifications, schedules, run history, metrics, etc.)
- Table based datasets
- Kafka offsets for metrics and logs
- Flow queues
- Messaging system data
Kafka
- Unprocessed metrics
- Unsaved log messages
Hive
- Explorable CDAP datasets and their partitions
For phase 1, much of the responsibility for data replication falls to the cluster administrator. It is assumed that replication of HDFS, Hive, and Kafka will be handled by the cluster administrator. HDFS replication is usually done through regularly scheduled distcp jobs, or by using distro-specific tools such as Cloudera's Backup and Disaster Recovery (http://www.cloudera.com/documentation/enterprise/latest/topics/cm_bdr_about.html). Kafka replication can be done using MirrorMaker. Hive can be replicated by replicating the data (HDFS and/or HBase) and by replicating the metastore through whatever replication mechanisms are available to the relational DB backing the metastore. All of this can be set up outside of CDAP.
HBase DDL
HBase, however, will require some hooks in CDAP, because replication must be set up for every table when it is created, before any data is written to it. CDAP will define an interface to create, modify, and delete HBase tables. Instead of just creating a table in the local HBase instance, we need to create the table in both the master and slave instances and set up replication from the master to the slave. We can do this by introducing an SPI for HBase DDL operations, where the default implementation is the current single-cluster implementation, and users can plug in their own implementation that creates tables and sets up replication as needed.
Java SPI
Code Block |
---|
/**
 * Executes HBase DDL operations.
 */
public interface HBaseDDLExecutor {

  /**
   * Create the specified namespace if it does not exist.
   *
   * @param name the namespace to create
   * @throws IOException if a remote or network exception occurs
   */
  void createNamespaceIfNotExists(String name) throws IOException;

  /**
   * Delete the specified namespace if it exists.
   *
   * @param name the namespace to delete
   * @throws IOException if a remote or network exception occurs
   * @throws IllegalStateException if there are tables in the namespace
   */
  void deleteNamespaceIfExists(String name) throws IOException;

  /**
   * Create the specified table if it does not exist.
   *
   * @param descriptor the descriptor for the table to create
   * @param splitKeys the initial split keys for the table
   * @throws IOException if a remote or network exception occurs
   * @throws NotFoundException if the namespace for the specified table does not exist
   */
  void createTableIfNotExists(HTableDescriptor descriptor, byte[][] splitKeys) throws IOException;

  /**
   * Enable the specified table.
   *
   * @param name the table to enable
   * @throws IOException if a remote or network exception occurs
   * @throws NotFoundException if the specified table does not exist
   */
  void enableTable(TableName name) throws IOException;

  /**
   * Disable the specified table.
   *
   * @param name the table to disable
   * @throws IOException if a remote or network exception occurs
   * @throws NotFoundException if the specified table does not exist
   */
  void disableTable(TableName name) throws IOException;

  /**
   * Modify the specified table.
   *
   * @param name the table to modify
   * @param descriptor the descriptor for the table
   * @throws IOException if a remote or network exception occurs
   * @throws NotFoundException if the specified table does not exist
   */
  void modifyTable(TableName name, HTableDescriptor descriptor) throws IOException;

  /**
   * Truncate the specified table.
   *
   * @param name the table to truncate
   * @throws IOException if a remote or network exception occurs
   * @throws NotFoundException if the specified table does not exist
   */
  void truncateTable(TableName name) throws IOException;

  /**
   * Delete the table if it exists.
   *
   * @param name the table to delete
   * @throws IOException if a remote or network exception occurs
   * @throws NotFoundException if the namespace for the specified table does not exist
   */
  void deleteTableIfExists(TableName name) throws IOException;
}
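One way a pluggable implementation might apply each DDL operation to both clusters is a thin wrapper that runs the same call against the master and then the slave, and reports success only when both succeed. The sketch below is illustrative only: `ClusterDdl` and `FanOutDdl` are hypothetical names rather than CDAP classes, only one operation is shown, and the real HBase calls and replication peer setup are omitted.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the per-cluster part of a DDL executor. */
interface ClusterDdl {
  void createTableIfNotExists(String table) throws IOException;
}

/** Hypothetical wrapper that applies the same DDL call to every cluster in order. */
final class FanOutDdl implements ClusterDdl {
  private final List<ClusterDdl> clusters;

  FanOutDdl(List<ClusterDdl> clusters) {
    this.clusters = new ArrayList<>(clusters);
  }

  @Override
  public void createTableIfNotExists(String table) throws IOException {
    // A failure on any cluster propagates to the caller. Because the
    // operation is "create if not exists", calling it again after a
    // partial failure is safe for clusters that already succeeded.
    for (ClusterDdl cluster : clusters) {
      cluster.createTableIfNotExists(table);
    }
  }
}
```

A caller would construct `FanOutDdl` with one `ClusterDdl` per cluster (master first) and retry the whole call on an `IOException`.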
The default implementation will simply use the existing HBaseTableUtil. There can be another implementation that makes REST calls for each method, leaving the actual HBase operations and auth up to an external service. For example, an analogous RESTful API could be:
Method | Path | Request Body | Description |
---|---|---|---|
PUT | /namespaces/<namespace> | | Create namespace if it doesn't exist. No-op if it already exists. |
PUT | /namespaces/<namespace>/tables/<table> | HTableDescriptor contents, split keys | Create table if it doesn't exist. No-op if it already exists. |
PUT | /namespaces/<namespace>/tables/<table>/properties | HTableDescriptor contents | Modify an existing table. |
POST | /namespaces/<namespace>/tables/<table>/enable | | Enable an existing table. |
POST | /namespaces/<namespace>/tables/<table>/disable | | Disable an existing table. |
POST | /namespaces/<namespace>/tables/<table>/truncate | | Truncate an existing table. |
DELETE | /namespaces/<namespace> | | Delete a namespace. |
DELETE | /namespaces/<namespace>/tables/<table> | | Delete a table. |
where the user is passed in the request headers. Each endpoint must be idempotent, because an operation could fail in one or more HBase instances but succeed in another. In such cases, the client will retry the request. A 200 should be returned only if the operation succeeded in all HBase instances.
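The retry behavior described above can be sketched on the client side as follows. This is a minimal illustration, not CDAP code: `IdempotentRetry` and `callUntilOk` are hypothetical names, the HTTP call is abstracted as a supplier of status codes, and backoff between attempts is left out.

```java
import java.util.function.IntSupplier;

/** Hypothetical client-side retry helper for an idempotent DDL endpoint. */
final class IdempotentRetry {
  private IdempotentRetry() { }

  /**
   * Repeats the request until it returns HTTP 200 or the attempts run out.
   *
   * @param request performs one HTTP call and returns its status code
   * @param maxAttempts maximum number of calls, must be at least 1
   * @return true if some attempt returned 200
   */
  static boolean callUntilOk(IntSupplier request, int maxAttempts) {
    if (maxAttempts < 1) {
      throw new IllegalArgumentException("maxAttempts must be at least 1");
    }
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      if (request.getAsInt() == 200) {
        return true;
      }
      // A non-200 may mean that some HBase instances succeeded and others
      // did not. Repeating the identical request is only safe because the
      // endpoint is required to be idempotent.
    }
    return false;
  }
}
```

A real client would likely add a delay between attempts and distinguish retryable failures from permanent ones, such as an authorization error.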
Coprocessors
One difficulty will be in handling the coprocessor jar. Today, when a Table is being created, its coprocessor jar is also built and placed on HDFS.
One way to handle this is to send the jar contents as part of the table creation request (Base64 encoded for example). However, this would be an issue if the master and slave clusters are running different versions of HBase, which require different coprocessors. In order for it to work, we would have to somehow consolidate all coprocessors into a single one that works for all supported HBase versions. This type of thing may be required to support resiliency against rolling HBase upgrades anyway.
If that is not possible, an alternative is for each CDAP instance to pre-build these coprocessor jars and place them on HDFS in pre-determined locations. Instead of building the jar on demand, we simply use the correct location. For example, there may be a tool that users have to run that builds coprocessors compatible with the cluster's HBase version and puts them on HDFS.
Replication Status
Cluster administrators will require a way to tell when it is safe for a cold cluster to be started up. In other words, they need to be able to tell when all necessary data has been replicated. HBase shell already includes a command that helps:
Code Block |
---|
hbase(main):030:0> status 'replication', 'source'
version 1.1.2.2.3.4.7-4
1 live servers
    [hostname]:
        SOURCE: PeerID=1, AgeOfLastShippedOp=29312, SizeOfLogQueue=0, TimeStampsOfLastShippedOp=Thu Nov 10 22:51:55 UTC 2016, Replication Lag=29312
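For administrators who want to script this check, the `SOURCE:` line can be split into its fields. The sketch below assumes the line has the same `Key=value, Key=value` shape as the output above; `ReplicationStatusLine` is a hypothetical helper, and which thresholds count as "caught up" is left to the caller.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical parser for one SOURCE line of "status 'replication', 'source'". */
final class ReplicationStatusLine {
  // Keys may contain spaces (for example "Replication Lag"); values run to the next comma.
  private static final Pattern FIELD = Pattern.compile("([A-Za-z ]+)=([^,]*)");

  private ReplicationStatusLine() { }

  /** Returns the fields after "SOURCE:" in the order they appear. */
  static Map<String, String> parse(String line) {
    int start = line.indexOf("SOURCE:");
    String body = start >= 0 ? line.substring(start + "SOURCE:".length()) : line;
    Map<String, String> fields = new LinkedHashMap<>();
    Matcher matcher = FIELD.matcher(body);
    while (matcher.find()) {
      fields.put(matcher.group(1).trim(), matcher.group(2).trim());
    }
    return fields;
  }

  /** Returns a numeric field, failing loudly if it is absent. */
  static long longField(Map<String, String> fields, String name) {
    String value = fields.get(name);
    if (value == null) {
      throw new IllegalArgumentException("Missing field: " + name);
    }
    return Long.parseLong(value);
  }
}
```

For the sample line above, `longField(fields, "SizeOfLogQueue")` yields 0 and `longField(fields, "Replication Lag")` yields 29312.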
HBase also includes a MapReduce job that can be used to verify replicated data (https://hbase.apache.org/book.html#_verifying_replicated_data). It must be run on the master cluster.
Code Block |
---|
$ HADOOP_CLASSPATH=`hbase classpath` hadoop jar /usr/hdp/current/hbase-master/lib/hbase-server-1.1.2.2.3.4.7-4.jar verifyrep <peer id> <table>
...
    Map-Reduce Framework
        Map input records=1
        Map output records=0
        Input split bytes=103
        Spilled Records=0
        Failed Shuffles=0
        Merged Map outputs=0
        GC time elapsed (ms)=64
        CPU time spent (ms)=1810
        Physical memory (bytes) snapshot=255139840
        Virtual memory (bytes) snapshot=916021248
        Total committed heap usage (bytes)=287309824
    org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication$Verifier$Counters
        BADROWS=1
        CONTENT_DIFFERENT_ROWS=1
Under the HBase counters, you only want to see the GOODROWS counter, and not BADROWS or CONTENT_DIFFERENT_ROWS.
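The counter check can be automated by scanning the job output for the verifier's counter group. The sketch below assumes the output lists one `NAME=value` counter per line under the group header, as in the output above. `VerifyRepCounters` is a hypothetical helper name.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical parser for the VerifyReplication counter group in job output. */
final class VerifyRepCounters {
  private static final String GROUP_SUFFIX = "VerifyReplication$Verifier$Counters";
  private static final Pattern COUNTER = Pattern.compile("([A-Z_]+)=(\\d+)");

  private VerifyRepCounters() { }

  /** Collects the counters listed directly under the verifier's group header. */
  static Map<String, Long> parse(String jobOutput) {
    Map<String, Long> counters = new LinkedHashMap<>();
    boolean inGroup = false;
    for (String raw : jobOutput.split("\\R")) {
      String line = raw.trim();
      if (line.endsWith(GROUP_SUFFIX)) {
        inGroup = true;
        continue;
      }
      if (!inGroup) {
        continue;
      }
      Matcher matcher = COUNTER.matcher(line);
      if (!matcher.matches()) {
        break; // first non-counter line ends the group
      }
      counters.put(matcher.group(1), Long.parseLong(matcher.group(2)));
    }
    return counters;
  }

  /** True if any counter other than GOODROWS is greater than zero. */
  static boolean hasMismatch(Map<String, Long> counters) {
    for (Map.Entry<String, Long> entry : counters.entrySet()) {
      if (!"GOODROWS".equals(entry.getKey()) && entry.getValue() > 0) {
        return true;
      }
    }
    return false;
  }
}
```

Note that an empty counter map (for example, if the group header is missing from the output) is reported as no mismatch, so a caller may want to treat that case separately.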
Kafka offset mismatches
MirrorMaker is not much more than a Kafka client that consumes from source topics and writes the same messages to some destination. As such, MirrorMaker does not offer any guarantees about a message from the source being written to the same partition and offset in the destination. The log saver and metrics processor store Kafka offsets per topic partition in HBase, and will need to be modified to handle a cluster failover.
To start, we will require that the 'logs.user-v2' and 'metrics' topics have the same number of partitions in both clusters. This will guarantee that a message written to partition X in the master will also be written to the same partition X in the slave. Adding partitions to a topic will need to be handled specially.
- If metrics and logs need to be preserved, cluster administrators must stop all CDAP programs and CDAP services, make sure the slave has caught up to the master, then add the same number of partitions to the topic in both the master and slave clusters.
- If metrics and logs do not need to be preserved, cluster administrators can stop MirrorMaker, add the same number of partitions in both the master and slave clusters, then start up MirrorMaker using offsets that are after the partition change. Note that this results in data loss only if a failover occurs before the master has processed the messages that were written to Kafka before the partition change.
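Why matching partition counts matter can be illustrated with a generic hash-based partitioner. The sketch below is not Kafka's partitioner: `partitionFor` and its use of `String.hashCode` are illustrative assumptions. It only shows that, when a partition is chosen as a hash of the key modulo the partition count, the same key lands on the same partition number only if both clusters use the same count.

```java
/** Hypothetical hash-modulo partitioner, for illustration only. */
final class HashPartitioning {
  private HashPartitioning() { }

  /** Maps a key to a partition in [0, numPartitions). */
  static int partitionFor(String key, int numPartitions) {
    if (numPartitions < 1) {
      throw new IllegalArgumentException("numPartitions must be at least 1");
    }
    // floorMod keeps the result non-negative even for negative hash codes.
    return Math.floorMod(key.hashCode(), numPartitions);
  }
}
```

With this function, the key "abc" maps to partition 4 when there are 10 partitions, but to partition 6 when there are 12, which is the kind of mismatch that equal partition counts avoid.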
Log Saver
The log saver service stores two sets of Kafka offsets in the 'log.meta' table. The first uses row prefix 101 and is used by LogMetricsPlugin to track its progress in emitting metrics for the number of log messages at each log level. The second uses row prefix 100 and is used by KafkaLogWriterPlugin to track which log messages have been saved to HDFS. The Kafka offset is stored in column 'nextOffset', and the max event time in 'maxEventTime'.
Code Block |
---|
hbase(main):003:0> scan 'cdap_system:log.meta'
ROW                                               COLUMN+CELL
 \x00\x00\x00dlogs.user-v2\x00\x00\x00\x00 column=d:maxEventTime, timestamp=1478732791857000000, value=\x00\x00\x01XKW\xA1\xE7
 \x00\x00\x00dlogs.user-v2\x00\x00\x00\x00 column=d:nextOffset, timestamp=1478732791857000000, value=\x00\x00\x00\x00\x00\x00\x13\xC9
 \x00\x00\x00dlogs.user-v2\x00\x00\x00\x02 column=d:maxEventTime, timestamp=1478732282540000000, value=\x00\x00\x01XKLW\xED
 \x00\x00\x00dlogs.user-v2\x00\x00\x00\x02 column=d:nextOffset, timestamp=1478732282540000000, value=\x00\x00\x00\x00\x00\x00\x0D\x1F
 \x00\x00\x00dlogs.user-v2\x00\x00\x00\x04 column=d:maxEventTime, timestamp=1478727911132000000, value=\x00\x00\x01XK\x0D:\x9F
 \x00\x00\x00dlogs.user-v2\x00\x00\x00\x04 column=d:nextOffset, timestamp=1478727911132000000, value=\x00\x00\x00\x00\x00\x1B\xBA\xC0
 \x00\x00\x00dlogs.user-v2\x00\x00\x00\x05 column=d:maxEventTime, timestamp=1478727731822000000, value=\x00\x00\x01XK\x09\xE2\xD7
 \x00\x00\x00dlogs.user-v2\x00\x00\x00\x05 column=d:nextOffset, timestamp=1478727731822000000, value=\x00\x00\x00\x00\x00\x00\x01\xA7
 \x00\x00\x00dlogs.user-v2\x00\x00\x00\x06 column=d:maxEventTime, timestamp=1478727552152000000, value=\x00\x00\x01XK\x07\x17\x9F
 \x00\x00\x00dlogs.user-v2\x00\x00\x00\x06 column=d:nextOffset, timestamp=1478727552152000000, value=\x00\x00\x00\x00\x00\x00\x00\xC2
 \x00\x00\x00dlogs.user-v2\x00\x00\x00\x08 column=d:maxEventTime, timestamp=1478727911144000000, value=\x00\x00\x01XK\x0D9(
 \x00\x00\x00dlogs.user-v2\x00\x00\x00\x08 column=d:nextOffset, timestamp=1478727911144000000, value=\x00\x00\x00\x00\x00\x00P\x1E
 \x00\x00\x00dlogs.user-v2\x00\x00\x00\x09 column=d:maxEventTime, timestamp=1478723387161000000, value=\x00\x00\x01XJ\xC7\xAF\x09
 \x00\x00\x00dlogs.user-v2\x00\x00\x00\x09 column=d:nextOffset, timestamp=1478723387161000000, value=\x00\x00\x00\x00\x00\x00\x00\x06
 \x00\x00\x00elogs.user-v2\x00\x00\x00\x00 column=d:maxEventTime, timestamp=1478732944517000000, value=\x00\x00\x01XKW\xA1\xE7
 \x00\x00\x00elogs.user-v2\x00\x00\x00\x00 column=d:nextOffset, timestamp=1478732944517000000, value=\x00\x00\x00\x00\x00\x00\x13\xC9
 \x00\x00\x00elogs.user-v2\x00\x00\x00\x01 column=d:maxEventTime, timestamp=1478732944555000000, value=\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF
 \x00\x00\x00elogs.user-v2\x00\x00\x00\x01 column=d:nextOffset, timestamp=1478732944555000000, value=\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF
 \x00\x00\x00elogs.user-v2\x00\x00\x00\x02 column=d:maxEventTime, timestamp=1478732944572000000, value=\x00\x00\x01XKLW\xED
 \x00\x00\x00elogs.user-v2\x00\x00\x00\x02 column=d:nextOffset, timestamp=1478732944572000000, value=\x00\x00\x00\x00\x00\x00\x0D\x1F
 \x00\x00\x00elogs.user-v2\x00\x00\x00\x03 column=d:maxEventTime, timestamp=1478730708668000000, value=\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF
 \x00\x00\x00elogs.user-v2\x00\x00\x00\x03 column=d:nextOffset, timestamp=1478730708668000000, value=\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF
 \x00\x00\x00elogs.user-v2\x00\x00\x00\x04 column=d:maxEventTime, timestamp=1478729459940000000, value=\x00\x00\x01XK\x0E\xEA#
 \x00\x00\x00elogs.user-v2\x00\x00\x00\x04 column=d:nextOffset, timestamp=1478729459940000000, value=\x00\x00\x00\x00\x00\x1D\xD2\x9B
 \x00\x00\x00elogs.user-v2\x00\x00\x00\x05 column=d:maxEventTime, timestamp=1478727913009000000, value=\x00\x00\x01XK\x09\xE2\xD7
 \x00\x00\x00elogs.user-v2\x00\x00\x00\x05 column=d:nextOffset, timestamp=1478727913009000000, value=\x00\x00\x00\x00\x00\x00\x01\xA7
 \x00\x00\x00elogs.user-v2\x00\x00\x00\x06 column=d:maxEventTime, timestamp=1478727913079000000, value=\x00\x00\x01XK\x07\x17\x9F
 \x00\x00\x00elogs.user-v2\x00\x00\x00\x06 column=d:nextOffset, timestamp=1478727913079000000, value=\x00\x00\x00\x00\x00\x00\x00\xC2
 \x00\x00\x00elogs.user-v2\x00\x00\x00\x07 column=d:maxEventTime, timestamp=1478727913140000000, value=\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF
 \x00\x00\x00elogs.user-v2\x00\x00\x00\x07 column=d:nextOffset, timestamp=1478727913140000000, value=\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF
 \x00\x00\x00elogs.user-v2\x00\x00\x00\x08 column=d:maxEventTime, timestamp=1478727913228000000, value=\x00\x00\x01XK\x0DLD
 \x00\x00\x00elogs.user-v2\x00\x00\x00\x08 column=d:nextOffset, timestamp=1478727913228000000, value=\x00\x00\x00\x00\x00\x00PH
 \x00\x00\x00elogs.user-v2\x00\x00\x00\x09 column=d:maxEventTime, timestamp=1478727913291000000, value=\x00\x00\x01XJ\xC7\xAF\x09
 \x00\x00\x00elogs.user-v2\x00\x00\x00\x09 column=d:nextOffset, timestamp=1478727913291000000, value=\x00\x00\x00\x00\x00\x00\x00\x06
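To make the scan output easier to read, the row keys and values can be decoded. The layout used below is inferred from the output above rather than taken from CDAP source: a 4-byte big-endian prefix (100 prints as `\x00\x00\x00d`, 101 as `\x00\x00\x00e`), the topic name, and a 4-byte big-endian partition, with 8-byte big-endian values. `LogMetaRow` is a hypothetical helper.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical decoder for 'log.meta' row keys, based on the scan output. */
final class LogMetaRow {
  final int prefix;
  final String topic;
  final int partition;

  private LogMetaRow(int prefix, String topic, int partition) {
    this.prefix = prefix;
    this.topic = topic;
    this.partition = partition;
  }

  /** Splits a row key into prefix, topic, and partition. */
  static LogMetaRow decode(byte[] rowKey) {
    if (rowKey.length < 8) {
      throw new IllegalArgumentException("Row key too short: " + rowKey.length);
    }
    ByteBuffer buffer = ByteBuffer.wrap(rowKey); // big-endian by default
    int prefix = buffer.getInt();
    byte[] topic = new byte[rowKey.length - 8];
    buffer.get(topic);
    int partition = buffer.getInt();
    return new LogMetaRow(prefix, new String(topic, StandardCharsets.UTF_8), partition);
  }

  /** Decodes an 8-byte cell value such as 'nextOffset' or 'maxEventTime'. */
  static long decodeLong(byte[] value) {
    if (value.length != 8) {
      throw new IllegalArgumentException("Expected 8 bytes, got " + value.length);
    }
    return ByteBuffer.wrap(value).getLong();
  }
}
```

Under this reading, the 'nextOffset' value `\x00\x00\x00\x00\x00\x00\x13\xC9` for partition 0 decodes to 5065, and the all-`\xFF` values decode to -1.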
The log saver needs only one change: on startup, if the stored offset does not match the stored event time, it does a binary search for the correct offset.
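The binary search could look roughly like the sketch below. It assumes that event times do not decrease as offsets increase within a partition, which may hold only approximately in practice, and it abstracts the Kafka read as a function from offset to event time. `OffsetSearch` and `firstOffsetAtOrAfter` are hypothetical names. Starting at the first message with an event time at or after the stored one re-reads any messages that share that timestamp, which is the duplication noted under Edge Conditions.

```java
import java.util.function.LongUnaryOperator;

/** Hypothetical binary search for a Kafka offset by event time. */
final class OffsetSearch {
  private OffsetSearch() { }

  /**
   * Finds the smallest offset in [low, high) whose event time is at or
   * after the target, or returns high if there is no such offset.
   *
   * @param low first valid offset in the partition
   * @param high one past the last valid offset in the partition
   * @param targetTime the stored max event time
   * @param eventTimeAt reads the event time of the message at an offset
   */
  static long firstOffsetAtOrAfter(long low, long high, long targetTime,
                                   LongUnaryOperator eventTimeAt) {
    long lo = low;
    long hi = high;
    while (lo < hi) {
      long mid = lo + (hi - lo) / 2; // avoids overflow for large offsets
      if (eventTimeAt.applyAsLong(mid) < targetTime) {
        lo = mid + 1; // everything up to mid is too old
      } else {
        hi = mid; // mid qualifies, keep looking for an earlier one
      }
    }
    return lo;
  }
}
```

Each step reads one message, so the search costs roughly log2(high - low) reads per partition.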
Metrics Processor
The metrics processor service stores its offsets in the 'metrics.kafka.meta' table. Row key is topic.partition, and the offset is stored under column 'o'. The logic is handled in KafkaConsumerMetaTable.
Code Block |
---|
hbase(main):005:0> scan 'cdap_system:metrics.kafka.meta'
ROW                COLUMN+CELL
 metrics.00 column=d:o, timestamp=1478732942360, value=\x00\x00\x00\x00\x00\x00YQ
 metrics.01 column=d:o, timestamp=1478732942360, value=\x00\x00\x00\x00\x00\x06\xF6)
 metrics.02 column=d:o, timestamp=1478732942360, value=\x00\x00\x00\x00\x00\x00'\xA6
 metrics.03 column=d:o, timestamp=1478732942360, value=\x00\x00\x00\x00\x00\x00\x85\xFD
 metrics.04 column=d:o, timestamp=1478732942360, value=\x00\x00\x00\x00\x00\x00\x12\x18
 metrics.05 column=d:o, timestamp=1478729530145, value=\x00\x00\x00\x00\x00\x00\x0D\x96
 metrics.06 column=d:o, timestamp=1478729530145, value=\x00\x00\x00\x00\x00\x00\x03\xAA
 metrics.07 column=d:o, timestamp=1478732942360, value=\x00\x00\x00\x00\x00\x00\x06\xBE
 metrics.08 column=d:o, timestamp=1478729530145, value=\x00\x00\x00\x00\x00\x00*\xFD
 metrics.09 column=d:o, timestamp=1478732942360, value=\x00\x00\x00\x00\x00\x01:\x0D
10 row(s) in 0.0330 seconds
The metrics processor needs to be changed to also store the timestamp for each offset. On startup, if the offset and timestamp don't match, it does a binary search for the correct offset for that timestamp.
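The startup decision might be structured as in the sketch below. It assumes the stored timestamp is that of the message at the stored offset, which the design does not spell out, and it takes both the Kafka read and the fallback search as functions so that it stays self-contained. `OffsetCheckpoint` and `resumeOffset` are hypothetical names.

```java
import java.util.function.LongUnaryOperator;

/** Hypothetical checkpoint pairing a Kafka offset with a message timestamp. */
final class OffsetCheckpoint {
  final long offset;
  final long timestamp;

  OffsetCheckpoint(long offset, long timestamp) {
    this.offset = offset;
    this.timestamp = timestamp;
  }

  /**
   * Decides where to resume consuming after startup.
   *
   * @param timestampAt returns the timestamp of the local message at an
   *                    offset, or -1 if there is no message at that offset
   * @param searchByTimestamp returns the offset to use for a timestamp,
   *                          for example through a binary search
   */
  long resumeOffset(LongUnaryOperator timestampAt, LongUnaryOperator searchByTimestamp) {
    if (timestampAt.applyAsLong(offset) == timestamp) {
      // The local Kafka agrees with the checkpoint, so no failover
      // adjustment is needed.
      return offset;
    }
    // The offset came from another cluster, so locate the same point by time.
    return searchByTimestamp.applyAsLong(timestamp);
  }
}
```

On the cluster that wrote the checkpoint the first branch is taken, so the extra search cost is paid only after a failover.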
Edge Conditions
If there are multiple Kafka messages for the same timestamp, some logs and metrics for that timestamp may be duplicated in the slave.
Approach
Approach #1
Approach #2
API changes
New Programmatic APIs
New Java APIs introduced (both user facing and internal)
Deprecated Programmatic APIs
No programmatic APIs will be deprecated
New REST APIs
No new REST APIs will be added to the platform. There may be new REST APIs used by an external service used to handle table DDL and replication.
Deprecated REST API
No REST APIs will be deprecated
CLI Impact or Changes
- No Changes
UI Impact or Changes
- No Changes
Security Impact
What's the impact on Authorization and how does the design take care of this aspect
Impact on Infrastructure Outages
With replication, a second cluster is now required. If the cold cluster suffers an outage, replication will eventually catch up once service is restored. This assumes the outage lasts for less time than HBase keeps events in its WAL.
Test Scenarios
Test ID | Test Description | Expected Results |
---|---|---|
Releases
Release 4.1.0
Related Work
- Work #1
- Work #2
- Work #3