Checklist
- User Stories Documented
- User Stories Reviewed
- Design Reviewed
- APIs reviewed
- Release priorities assigned
- Test cases reviewed
- Blog post
Introduction
Phase 1 of replication is to support a hot-cold setup where CDAP data is replicated from one cluster to another using existing tools for replicating underlying infrastructure.
Goals
Allow manual failover from a hot cluster to a cold cluster.
User Stories
- As a cluster administrator, I want to be able to configure CDAP so that all HBase tables created by CDAP are set up to replicate data to another cluster
- As a cluster administrator, I want to be able to manually stop CDAP in one cluster and start it in another cluster with the exact same state
- As a cluster administrator, I want a way to know when it is safe to start the cold cluster after the hot one has been shut down
Design
CDAP stores state in several systems:
HDFS
- Transaction snapshots
- Artifacts (jars)
- Streams
- FileSet based datasets
- Program logs
HBase
- CDAP entity metadata (program specifications, schedules, run history, metrics, etc.)
- Table based datasets
- Kafka offsets for metrics and logs
Kafka
- unprocessed metrics
- unsaved log messages
Hive
- Explorable CDAP datasets and their partitions
For phase 1, much of the responsibility for data replication falls to the cluster administrator, who is assumed to handle replication of HDFS, Hive, and Kafka. HDFS replication is usually done through regularly scheduled distcp jobs, or by using distro-specific tools such as Cloudera's Backup and Data Recovery (http://www.cloudera.com/documentation/enterprise/latest/topics/cm_bdr_about.html). Kafka replication can be done using MirrorMaker. Hive replication can be done by replicating the data (HDFS and/or HBase) and by replicating the metastore through whatever replication mechanisms are available to the relational DB behind the metastore. All of this can be set up outside of CDAP.
HBase DDL
HBase, however, will require some hooks in CDAP, because replication must be set up for every table when it is created, before any data is written to it. CDAP will define an interface to create, modify, and delete HBase tables. By default, it will be implemented by the current code, which only creates tables in the local HBase instance. Another implementation can be used by setting a property in cdap-site.xml that specifies the class to use. The jar containing the class must be included in the CDAP classpath. This custom class could, for example, make an HTTP call to an external service to create the needed HBase tables.
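As a rough illustration of the class-selection step described above, the following sketch loads an executor implementation by class name and falls back to a local default. It is not CDAP code: the property key `hbase.ddl.executor.class`, the stand-in `DdlExecutor` interface, and both implementation classes are hypothetical names chosen for this example, and `java.util.Properties` stands in for the values read from cdap-site.xml.

```java
import java.util.Properties;

final class DdlExecutorLoader {
  /** Hypothetical stand-in for the DDL executor interface; reduced to one method for brevity. */
  interface DdlExecutor {
    String describe();
  }

  /** Stand-in for the default behaviour: operate on the local HBase instance. */
  static final class LocalDdlExecutor implements DdlExecutor {
    @Override
    public String describe() {
      return "creates tables in the local HBase instance";
    }
  }

  /** Stand-in for a custom implementation that would delegate to an external service. */
  static final class ExternalServiceDdlExecutor implements DdlExecutor {
    @Override
    public String describe() {
      return "asks an external service to create tables";
    }
  }

  /** Hypothetical property key; the real key is not specified in this document. */
  static final String EXECUTOR_CLASS_KEY = "hbase.ddl.executor.class";

  /** Returns the configured executor, or the local default when the property is unset. */
  static DdlExecutor load(Properties conf) throws ReflectiveOperationException {
    String className = conf.getProperty(EXECUTOR_CLASS_KEY);
    if (className == null || className.isEmpty()) {
      return new LocalDdlExecutor();
    }
    // The named class must be on the classpath and have a no-argument constructor.
    return Class.forName(className)
        .asSubclass(DdlExecutor.class)
        .getDeclaredConstructor()
        .newInstance();
  }

  public static void main(String[] args) throws ReflectiveOperationException {
    Properties conf = new Properties();
    System.out.println(load(conf).describe());
    conf.setProperty(EXECUTOR_CLASS_KEY, ExternalServiceDdlExecutor.class.getName());
    System.out.println(load(conf).describe());
  }
}
```

Loading by class name keeps the default path free of extra dependencies, at the cost of surfacing a misconfigured class name only at startup.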
Java SPI
Code Block |
---|
import java.io.IOException;

import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;

/**
 * Executes HBase DDL operations.
 */
public interface HBaseDDLExecutor {

  /**
   * Create the specified namespace if it does not exist.
   *
   * @param name the namespace to create
   * @throws IOException if a remote or network exception occurs
   */
  void createNamespaceIfNotExists(String name) throws IOException;

  /**
   * Delete the specified namespace if it exists.
   *
   * @param name the namespace to delete
   * @throws IOException if a remote or network exception occurs
   */
  void deleteNamespaceIfExists(String name) throws IOException;

  /**
   * Create the specified table if it does not exist.
   *
   * @param descriptor the descriptor for the table to create
   * @param splitKeys the split keys for the initial regions of the table
   * @throws IOException if a remote or network exception occurs
   */
  void createTableIfNotExists(HTableDescriptor descriptor, byte[][] splitKeys) throws IOException;

  /**
   * Enable the specified table.
   *
   * @param name the table to enable
   * @throws IOException if a remote or network exception occurs
   * @throws NotFoundException if the specified table does not exist
   */
  void enableTable(TableName name) throws IOException;

  /**
   * Disable the specified table.
   *
   * @param name the table to disable
   * @throws IOException if a remote or network exception occurs
   * @throws NotFoundException if the specified table does not exist
   */
  void disableTable(TableName name) throws IOException;

  /**
   * Modify the specified table.
   *
   * @param name the table to modify
   * @param descriptor the descriptor for the table
   * @throws IOException if a remote or network exception occurs
   * @throws NotFoundException if the specified table does not exist
   */
  void modifyTable(TableName name, HTableDescriptor descriptor) throws IOException;

  /**
   * Delete the table if it exists.
   *
   * @param name the table to delete
   * @throws IOException if a remote or network exception occurs
   */
  void deleteTableIfExists(TableName name) throws IOException;
} |
The default implementation will simply use the existing HBaseTableUtil. There can be another implementation that makes REST calls for each method, leaving actual HBase operations up to an external service.
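The REST-backed variant mentioned above could look roughly like the sketch below. To stay self-contained it does not implement the real interface: table and namespace names are plain strings, the table descriptor is an opaque JSON string, and the `Transport` interface and the `/namespaces/...` paths are hypothetical placeholders for whatever the external service actually exposes.

```java
import java.io.IOException;

final class RestDdlSketch {
  /** Hypothetical HTTP transport: sends one request and returns the response status code. */
  interface Transport {
    int send(String method, String path, String body) throws IOException;
  }

  /** Translates each DDL operation into one call against an assumed external service. */
  static final class RestDdlExecutor {
    private final Transport transport;

    RestDdlExecutor(Transport transport) {
      this.transport = transport;
    }

    void createNamespaceIfNotExists(String name) throws IOException {
      call("PUT", "/namespaces/" + name, "");
    }

    void deleteNamespaceIfExists(String name) throws IOException {
      call("DELETE", "/namespaces/" + name, "");
    }

    void createTableIfNotExists(String namespace, String table, String descriptorJson)
        throws IOException {
      call("PUT", "/namespaces/" + namespace + "/tables/" + table, descriptorJson);
    }

    void deleteTableIfExists(String namespace, String table) throws IOException {
      call("DELETE", "/namespaces/" + namespace + "/tables/" + table, "");
    }

    /** Any non-2xx status is surfaced as an IOException, matching the interface contract. */
    private void call(String method, String path, String body) throws IOException {
      int status = transport.send(method, path, body);
      if (status < 200 || status >= 300) {
        throw new IOException(method + " " + path + " failed with HTTP " + status);
      }
    }
  }
}
```

Keeping the transport behind a small interface lets the executor be tested without a network, and leaves the choice of HTTP client to the deployment.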
Replication Status
Cluster administrators will require a way to tell when it is safe for a cold cluster to be started up. In other words, they need to be able to tell when all necessary data has been replicated. HBase shell already includes a command that helps:
Code Block |
---|
hbase(main):030:0> status 'replication', 'source'
version 1.1.2.2.3.4.7-4
1 live servers
    [hostname]:
        SOURCE: PeerID=1, AgeOfLastShippedOp=29312, SizeOfLogQueue=0, TimeStampsOfLastShippedOp=Thu Nov 10 22:51:55 UTC 2016, Replication Lag=29312 |
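A script that polls this output could extract the source metrics along the lines of the sketch below. The parsing follows the sample line above. The `looksCaughtUp` rule (empty log queue and lag under a caller-supplied threshold, with the lag assumed to be in milliseconds) is an assumption made for illustration, not a criterion defined by this design or by HBase.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class ReplicationStatusSketch {
  /** Parses the "SOURCE: key=value, key=value, ..." part of a status line into a map. */
  static Map<String, String> parseSourceLine(String line) {
    String marker = "SOURCE:";
    int start = line.indexOf(marker);
    if (start < 0) {
      throw new IllegalArgumentException("No SOURCE section in: " + line);
    }
    Map<String, String> metrics = new LinkedHashMap<>();
    for (String pair : line.substring(start + marker.length()).split(",")) {
      int eq = pair.indexOf('=');
      if (eq > 0) {
        metrics.put(pair.substring(0, eq).trim(), pair.substring(eq + 1).trim());
      }
    }
    return metrics;
  }

  /** Assumed rule of thumb: nothing queued and the reported lag is within the threshold. */
  static boolean looksCaughtUp(Map<String, String> metrics, long maxLagMillis) {
    long queued = Long.parseLong(metrics.get("SizeOfLogQueue"));
    long lag = Long.parseLong(metrics.get("Replication Lag"));
    return queued == 0 && lag <= maxLagMillis;
  }

  public static void main(String[] args) {
    String line = "SOURCE: PeerID=1, AgeOfLastShippedOp=29312, SizeOfLogQueue=0, "
        + "TimeStampsOfLastShippedOp=Thu Nov 10 22:51:55 UTC 2016, Replication Lag=29312";
    Map<String, String> metrics = parseSourceLine(line);
    System.out.println(metrics);
    System.out.println("caught up within 60s: " + looksCaughtUp(metrics, 60_000));
  }
}
```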
HBase also includes a MapReduce job that can be used to verify replicated data (https://hbase.apache.org/book.html#_verifying_replicated_data). It must be run on the master (source) cluster.
Code Block |
---|
$ HADOOP_CLASSPATH=`hbase classpath` hadoop jar /usr/hdp/current/hbase-master/lib/hbase-server-1.1.2.2.3.4.7-4.jar verifyrep <peer id> <table>
...
  Map-Reduce Framework
    Map input records=1
    Map output records=0
    Input split bytes=103
    Spilled Records=0
    Failed Shuffles=0
    Merged Map outputs=0
    GC time elapsed (ms)=64
    CPU time spent (ms)=1810
    Physical memory (bytes) snapshot=255139840
    Virtual memory (bytes) snapshot=916021248
    Total committed heap usage (bytes)=287309824
  org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication$Verifier$Counters
    BADROWS=1
    CONTENT_DIFFERENT_ROWS=1 |
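Under the VerifyReplication counters, a fully replicated table reports only the GOODROWS counter. Any BADROWS or CONTENT_DIFFERENT_ROWS count, as in the sample output above, means some rows do not match on the peer cluster.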
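The counter check described above can be automated with a small parser over the job output. The sketch below is illustrative only: it treats every `name=number` line as a counter and flags a mismatch when BADROWS or CONTENT_DIFFERENT_ROWS is present with a nonzero value. The class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

final class VerifyRepCounters {
  /** Collects every "name=number" line of the job output; other lines are ignored. */
  static Map<String, Long> parseCounters(String jobOutput) {
    Map<String, Long> counters = new HashMap<>();
    for (String rawLine : jobOutput.split("\\R")) {
      String line = rawLine.trim();
      int eq = line.lastIndexOf('=');
      if (eq <= 0) {
        continue;
      }
      try {
        counters.put(line.substring(0, eq).trim(),
            Long.parseLong(line.substring(eq + 1).trim()));
      } catch (NumberFormatException e) {
        // Not a counter line (for example the command line itself); skip it.
      }
    }
    return counters;
  }

  /** True when either of the mismatch counters named in this document is nonzero. */
  static boolean hasMismatches(Map<String, Long> counters) {
    return counters.getOrDefault("BADROWS", 0L) > 0
        || counters.getOrDefault("CONTENT_DIFFERENT_ROWS", 0L) > 0;
  }

  public static void main(String[] args) {
    String output = String.join("\n",
        "  Map-Reduce Framework",
        "    Map input records=1",
        "  org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication$Verifier$Counters",
        "    BADROWS=1",
        "    CONTENT_DIFFERENT_ROWS=1");
    System.out.println("mismatches found: " + hasMismatches(parseCounters(output)));
  }
}
```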
Kafka offset mismatches
MirrorMaker is not much more than a Kafka client that consumes from source topics and writes the same messages to some destination. As such, partitions and offsets are not guaranteed to be the same. The log saver, metrics processor, and their corresponding fetch endpoints will need to be able to handle the fact that Kafka offsets can be different in the hot and cold clusters.
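The toy model below illustrates why a saved offset cannot simply be reused on the cold cluster. It does not use Kafka at all: `ToyPartition` is a hypothetical in-memory log, and the starting offset of 100 on the hot side is an arbitrary assumption standing in for a partition whose older messages have already been removed by retention.

```java
import java.util.ArrayList;
import java.util.List;

final class OffsetMismatchDemo {
  /** A toy append-only log: a message's offset is baseOffset plus its position. */
  static final class ToyPartition {
    private final long baseOffset;
    private final List<String> messages = new ArrayList<>();

    ToyPartition(long baseOffset) {
      this.baseOffset = baseOffset;
    }

    /** Appends a message and returns the offset it was assigned. */
    long append(String message) {
      messages.add(message);
      return baseOffset + messages.size() - 1;
    }

    /** Returns the message at the offset, or null if the offset is not in this log. */
    String read(long offset) {
      long index = offset - baseOffset;
      if (index < 0 || index >= messages.size()) {
        return null;
      }
      return messages.get((int) index);
    }
  }

  /** Copies messages the way a consume-and-produce mirror would: by re-appending them. */
  static void mirror(ToyPartition source, ToyPartition destination) {
    for (String message : source.messages) {
      destination.append(message);
    }
  }

  public static void main(String[] args) {
    ToyPartition hot = new ToyPartition(100);
    for (int i = 0; i < 5; i++) {
      hot.append("m" + i);
    }
    ToyPartition cold = new ToyPartition(0);
    mirror(hot, cold);

    long savedOffset = 103; // offset checkpointed against the hot cluster
    System.out.println("hot:  " + hot.read(savedOffset));
    System.out.println("cold: " + cold.read(savedOffset));
  }
}
```

The same message exists in both logs, but under a different offset, so any offset persisted by the log saver or metrics processor on the hot cluster needs some form of translation or a different resume strategy on the cold cluster.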
Approach
Approach #1
Approach #2
API changes
New Programmatic APIs
New Java APIs introduced (both user facing and internal)
Deprecated Programmatic APIs
New REST APIs
Path | Method | Description | Response Code | Response |
---|---|---|---|---|
/v3/apps/<app-id> | GET | Returns the application spec for a given application | 200 - On success; 404 - When application is not available; 500 - Any internal errors | |
Deprecated REST API
Path | Method | Description |
---|---|---|
/v3/apps/<app-id> | GET | Returns the application spec for a given application |
CLI Impact or Changes
- Impact #1
- Impact #2
- Impact #3
UI Impact or Changes
- Impact #1
- Impact #2
- Impact #3
Security Impact
What is the impact on authorization, and how does the design take care of this aspect?
Impact on Infrastructure Outages
System behavior (if applicable, document the impact of downstream component failures such as YARN or HBase) and how the design takes care of these aspects
Test Scenarios
Test ID | Test Description | Expected Results |
---|---|---|
Releases
Release 4.0.0
Release X.Y.Z
Related Work
- Work #1
- Work #2
- Work #3