Table of Contents |
---|
This document lists the detailed steps required for setting up Replication for CDAP :replication.
Cluster Setup
CDAP replication relies on the cluster administrator setting up replication on HBase, HDFS, Hive, and Kafka. It is assumed that CDAP is only running on the master cluster. It is assumed that you have not started CDAP before any of these steps.
HBase
Install the relevant
cdap-hbase-compat
package package on all hbase HBase nodes in your cluster . Compat packages are:- cdap-hbase-compat-0.96 cdap-hbase-compat-0.98
- cdap-hbase-compat-1.0
- cdap-hbase-compat-1.0-cdh
- cdap-hbase-compat-1.0-cdh5.5.0
- cdap-hbase-compat-1.1
- cdap-hbase-compat-1.2-cdh5.7.0
Modify
hbase-site.xml
on on all hbase HBase nodes to enable hbase replicationHBase replication, and to use the CDAP replication status coprocessors:Code Block <property> <name>hbase.replication</name> <value>true</value> </property> <property> <name>hbase.coprocessor.regionserver.classes</name> <value>co.cask.cdap.data2.replication.LastReplicateTimeObserver</value> </property> <property> <name>hbase.coprocessor.wal.classes</name> <value>co.cask.cdap.data2.replication.LastWriteTimeObserver</value> </property>
Modify
hbase-env.sh
on on all hbase nodes HBase nodes to include the hbase coprocessor HBase coprocessor in the classpath classpath:Code Block export HBASE_CLASSPATH="$HBASE_CLASSPATH:/opt/cdap/<hbase-compat-version>/coprocessor/*" # for example, if you're on cdh5.5.x and have installed the cdap-hbase-compat-1.0-cdh5.5.0 package: export HBASE_CLASSPATH="$HBASE_CLASSPATH:/opt/cdap/hbase-compat-1.0-cdh5.5.0/coprocessor/*"
- Restart hbase master HBase master and regionservers
in order to use the replication status coprocessors. Note that due to HBase limitations, these coprocessors cannot be used on HBase 0.96 or 0.98. Available compat packages are:
HDFS
Setup HDFS replication using the solution provided by your distribution. HDFS does not have true replication, but is usually achieved by scheduling regular distcp jobs.
Hive
Setup replication for the database backing your Hive Metastore. Note that this will simply replicate the Hive metadata (which tables exist, table metadata, etc.), but not the data itself. It is assumed you will not be running Hive queries on the slave until a manual failover occurs.
For example, to setup MySQL replication, follow the steps at https://dev.mysql.com/doc/refman/5.7/en/replication-howto.html, which amount to:
...
Modify my.cnf on the master to set a server-id and use bin logging
Code Block |
---|
[mysqld]
log-bin=mysql-bin
server-id=1 |
...
Restart mysql on master
...
Modify my.cnf on the slave to set a server-id
Code Block |
---|
[mysqld]
server-id=2 |
...
Restart mysql on slave
...
Create a replication user on the master
Code Block |
---|
mysql> CREATE USER 'repl'@'%.mydomain.com' IDENTIFIED BY 'slavepass';
mysql> GRANT REPLICATION SLAVE ON *.* TO 'repl'@'%.mydomain.com'; |
Obtain the master status
...
Enable replication from master to slave:
Code Block master hbase shell> add_peer '[slave-name]', '[slave-zookeeper-quorum]:/[slave-zk-node]' example: master hbase shell> add_peer 'hbase2', 'hbase2-master.domain.net:2181:/hbase'
Enable replication from slave to master:
Code Block slave hbase shell> add_peer '[master-name]', '[master-zookeeper-quorum]:/[master-zk-node]' example: slave hbase shell> add_peer 'hbase1', 'hbase1-master.domain.net:2181:/hbase'
Confirm that replication is working:
Code Block master hbase shell> create 'repltest', 'f' slave hbase shell> create 'repltest', 'f' master hbase shell> enable_table_replication 'repltest' slave hbase shell> alter 'repltest', { 'NAME' => 'f', 'REPLICATION_SCOPE' => 1 } master hbase shell> put 'repltest', 'masterrow', 'f:v1', 'v1' slave hbase shell> put 'repltest', 'slaverow', 'f:v1', 'v1' master hbase shell> scan 'repltest' slave hbase shell> scan 'repltest'
HDFS
Set up HDFS replication using the solution provided by your distribution. HDFS does not have true replication, but it is usually achieved by scheduling regular distcp
jobs.
Hive
Set up replication for the database backing your Hive Metastore. Note that this will simply replicate the Hive metadata (which tables exist, table metadata, etc.), but not the data itself. It is assumed you will not be running Hive queries on the slave until a manual failover occurs.
For example, to setup MySQL replication, follow the steps at https://dev.mysql.com/doc/refman/5.7/en/replication-howto.html, which amount to:
Modify my.cnf on the master to set a server-id and use bin logging:
Code Block [mysqld] log-bin=mysql-bin binlog_format=row server-id=1
Restart mysql on the master
Modify my.cnf on the slave to set a server-id:
Code Block [mysqld] server-id=2
Restart MySQL on the slave
Create a replication user on the master:
Code Block mysql> CREATE USER 'repl'@'%' IDENTIFIED BY 'slavepass'; mysql> GRANT REPLICATION SLAVE ON *.* TO 'repl'@'%';
In a production setup, you probably want to restrict the hostname instead of using '%'. Depending on your MySQL version, you may need to use
old_password
:Code Block mysql> SET PASSWORD FOR 'repl'@'%' = OLD_PASSWORD('slavepass');
Obtain the master status:
Code Block mysql > SHOW MASTER STATUS; +------------------+----------+--------------+------------------+ | File | Position | Binlog_Do_DB | Binlog_Ignore_DB | +------------------+----------+--------------+------------------+ | mysql-bin.000003 | 73 | test | manual,mysql | +------------------+----------+--------------+------------------+
Set master on the slave:
Code Block mysql> CHANGE MASTER TO -> MASTER_HOST='master_host_name', -> MASTER_USER='replication_user_name', // repl -------+ | > MASTER_PASSWORD='replication_password', //slavepass -> MASTER_LOG_FILE='recorded_log_file_name', //mysql-bin.000003 | 73 | test -> MASTER_LOG_POS=recorded_log_position; // 73
Start the slave:
Code Block mysql> start slave
Verify slave status:
Code Block mysql> SHOW SLAVE STATUS\G *************************** 1. row *************************** | manual,mysql | +------------------+----------+--------------+------------------+
Set master on the slave
Code Block mysql> CHANGE MASTER TO -> MASTER_HOST='master_host_name', -> MASTER_USER='replication_user_name', // repl -> MASTER_PASSWORD='replication_password', //slavepass -> MASTER_LOG_FILE='recorded_log_file_name', //mysql-bin.000003 -> MASTER_LOG_POS=recorded_log_position; // 73
Start slave
Code Block mysql> start slave
Verify slave status
Code Block mysql> SHOW SLAVE STATUS\G *************************** 1. row *************************** Slave_IO_State: Waiting for master to send event Master_Host: localhost Master_User: root Master_Port: 13000 Connect_Retry: 60 Master_Log_File: master-bin.000002 Read_Master_Log_Pos: 1307 Relay_Log_File: slave-relay-bin.000003 SlaveRelay_IOLog_StatePos: Waiting1508 for master to send event Relay_Master_Log_File: master-bin.000002 MasterSlave_IO_Host: localhostRunning: Yes Slave_SQL_Running: Yes Master_User Replicate_Do_DB: root Replicate_Ignore_DB: Master_Port: 13000 Replicate_Do_Table: Replicate_Ignore_Table: Connect_RetryReplicate_Wild_Do_Table: 60 Replicate_Wild_Ignore_Table: Master_Log_File: master-bin.000002 Last_Errno: 0 Read_Master_Log_Pos: 1307 RelayLast_Log_FileError: slave-relay-bin.000003 RelaySkip_Log_PosCounter: 15080 RelayExec_Master_Log_FilePos: 1307 master-bin.000002 SlaveRelay_IOLog_RunningSpace: 1858 Yes SlaveUntil_SQL_RunningCondition: YesNone Replicate_Do_DB: ReplicateUntil_IgnoreLog_DBFile: Replicate_Do_Table Until_Log_Pos: 0 Replicate_Ignore_Table: Master_SSL_Allowed: No Replicate_Wild_Do_Table: ReplicateMaster_WildSSL_IgnoreCA_TableFile: Master_SSL_CA_Path: Last_Errno: 0 Master_SSL_Cert: Last_ErrorMaster_SSL_Cipher: Skip_Counter: 0Master_SSL_Key: Seconds_Behind_Master: 0 ExecMaster_SSL_MasterVerify_LogServer_PosCert: 1307No RelayLast_LogIO_SpaceErrno: 0 1858 UntilLast_IO_ConditionError: None UntilLast_Log_File:SQL_Errno: 0 Last_SQL_Error: UntilReplicate_Ignore_LogServer_PosIds: 0 Master_SSLServer_AllowedId: No1 Master_SSL_CA_FileUUID: 3e11fa47-71ca-11e1-9e33-c80aa9429562 Master_SSLInfo_CA_Path: File: /var/mysqld.2/data/master.info Master_SSL_Cert: SQL_Delay: 0 Master_SSL_Cipher: SQL_Remaining_Delay: NULL MasterSlave_SQL_SSLRunning_KeyState: Slave has read all relay log; waiting Seconds_Behind_Master: 0 Master_SSL_Verify_Server_Cert: No for the slave I/O thread to update it LastMaster_IORetry_ErrnoCount: 010 LastMaster_IO_ErrorBind: Last_IO_Error_Timestamp: Last_SQL_Error_ErrnoTimestamp: 0 LastMaster_SQL_Error: Replicate_Ignore_Server_IdsSSL_Crl: Master_ServerSSL_IdCrlpath: 1 Master_UUIDRetrieved_Gtid_Set: 3e11fa47-71ca-11e1-9e33-c80aa9429562:1-5 MasterExecuted_InfoGtid_File: /var/mysqld.2/data/master.infoSet: 3e11fa47-71ca-11e1-9e33-c80aa9429562:1-5 SQL_Delay: 0 SQL_Remaining_Delay: NULLAuto_Position: 1 SlaveReplicate_SQLRewrite_Running_StateDB: Slave has read all relay log; waiting for the slave I/O thread to update it Master_Retry_Count: 10 Master_Bind: Last_IO_Error_Timestamp: Last_SQL_Error_Timestamp: Master_SSL_Crl: Master_SSL_Crlpath: Retrieved_Gtid_Set: 3e11fa47-71ca-11e1-9e33-c80aa9429562:1-5 Executed_Gtid_Set: 3e11fa47-71ca-11e1-9e33-c80aa9429562:1-5 Auto_Position: 1 Replicate_Rewrite_DB: Channel_name:
Kafka
Setup replication for the Kafka brokers you are using. Kafka MirrorMaker is the most common solution.
CDAP Setup
CDAP requires that you provide an extension that will perform HBase related DDL operations on both clusters instead of just one. To create an extension, you must implement the HBaseDDLExecutor class:
...
Channel_name:
Kafka
Set up replication for the Kafka brokers you are using. Kafka MirrorMaker (see https://kafka.apache.org/documentation.html#basic_ops_mirror_maker and https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=27846330) is the most common solution.
CDAP Setup
CDAP requires that you provide an extension that will perform HBase-related DDL operations on both clusters instead of on just one. To create an extension, you must implement the HBaseDDLExecutor
class:
Code Block |
---|
/** * Interface providing the HBase DDL operations. */ public interface HBaseDDLExecutor extends Closeable { /** * Initialize the {@link HBaseDDLExecutor}. * @param context the context for the executor */ void initialize(HBaseDDLExecutorContext context); /** * Create the specified namespace if it does not exist. * * @param name the namespace to create * @throws IOException if a remote or network exception occurs */ boolean createNamespaceIfNotExists(String name) throws IOException; /** * Delete the specified namespace if it exists. * * @param name the namespace to delete * @throws IOException if a remote or network exception occurs * @throws IllegalStateException if there are tables in the namespace */ void initializedeleteNamespaceIfExists(HBaseDDLExecutorContextString contextname) throws IOException; /** * Create the specified namespacetable if it does not exist. * * @param descriptor the descriptor for the table to create * @param name @param splitKeys the initial split keys for the namespacetable to create * @throws IOException if a remote or network exception occurs */ void createNamespaceIfNotExists(String name)createTableIfNotExists(TableDescriptor descriptor, @Nullable byte[][] splitKeys) throws IOException; /** * DeleteEnable the specified namespacetable if it existsis disabled. * * @param namenamespace the namespace of the table to deleteenable * @throws@param IOExceptionname ifthe aname remoteof orthe networktable exceptionto occursenable * @throws IllegalStateExceptionIOException if therea areremote tablesor innetwork theexception namespaceoccurs */ void deleteNamespaceIfExistsenableTableIfDisabled(String namespace, String name) throws IOException; /** * CreateDisable the specified table if it doesis not existenabled. * * @param descriptornamespace the descriptornamespace forof the table to createdisable * @param splitKeysname the initialname splitof keysthe fortable theto tabledisable * @throws IOException if a remote or network exception occurs */ void createTableIfNotExists(TableDescriptor descriptor, @Nullable byte[][] splitKeys) void disableTableIfEnabled(String namespace, String name) throws IOException; /** * EnableModify the specified table. The iftable itmust isbe disabled. * * @param namespace the namespace of the table to enablemodify * @param name the name of the table to modify * @param descriptor the table to enable descriptor for the table * @throws IOException if a remote or network exception occurs * @throws IOExceptionIllegalStateException if athe remotespecified ortable networkis exceptionnot occursdisabled */ void enableTableIfDisabledmodifyTable(String namespace, String name, TableDescriptor descriptor) throws IOException; /** * DisableTruncate the specified table. The iftable itmust isbe enableddisabled. * * @param namespace the namespace of the table to disabletruncate * @param name the name of the table to disabletruncate * @throws IOException if a remote or network exception occurs exception occurs * @throws IllegalStateException if the specified table is not disabled */ void disableTableIfEnabledtruncateTable(String namespace, String name) throws IOException; /** * ModifyDelete the table specifiedif it tableexists. The table must be disabled. * * @param namespace the namespace of the table to delete * @param name the table to modify * @param name the name of the table to modify * @param descriptor the descriptor for the table delete * @throws IOException if a remote or network exception occurs * @throws IllegalStateException if the specified table is not disabled */ void deleteTableIfExists(String namespace, String name) throws IOException; /** * @throwsGrant IOExceptionpermissions ifon a remotetable orto networkusers exceptionor occursgroups. * @throws IllegalStateException if the* specified table@param isnamespace notthe disablednamespace of the table */ void* modifyTable(String namespace, String@param name the name, TableDescriptorof descriptor) throws IOException;the table /** @param permissions A *map Truncatefrom theuser specifiedname table.to Thethe tablepermissions mustfor bethat disabled.user, given as a *string containing * @param namespace* the namespace of the table to truncate * @param name the name of the table to truncateonly the characters * @throws IOException if a remote or network exception occurs'a' (Admin), 'c' (Create), 'r' (Read), 'w' (Write), and 'x' (Execute). * @throws IllegalStateExceptionIOException if theanything specifiedgoes table is not disabled wrong */ void truncateTablegrantPermissions(String namespace, String name, Map<String, String> permissions) throws IOException; /** * Delete the table if it exists. The table must be disabled. * * @param namespace the namespace of the table to delete * @param name the table to delete * @throws IOException if a remote or network exception occurs * @throws IllegalStateException if the specified table is not disabled */ void deleteTableIfExists(String namespace, String name) throws IOException; } |
To deploy your extension:
Create an extension directory
Code Block $ mkdir -p /opt/cdap/master/ext/hbase/repl
Copy your jar to the directory
Code Block $ cp myextension.jar /opt/cdap/master/ext/hbase/repl/
Modify cdap-site.xml to use your implementation of HBaseDDLExecutor
Code Block <property> <name>hbase.ddlexecutor.extension.dir</name> <value>/opt/cdap/master/ext/hbase</value> </property>
Modify cdap-site.xml with any properties required by your executor. Anything prefixed by 'cdap.hbase.spi.hbase.' will be available through the Context object passed into your executor's initialize method
Code Block <property> <name>cdap.hbase.spi.hbase.zookeeper.quorum</name> <value>hbase-master-i18003-1000.dev.continuuity.net:2181/cdap</value> </property> <property> <name>cdap.hbase.spi.hbase.zookeeper.session.timeout</name> <value>60000</value> </property> <property> <name>cdap.hbase.spi.hbase.cluster.distributed</name> <value>true</value> </property> <property> <name>cdap.hbase.spi.hbase.bulkload.staging.dir</name> <value>/tmp/hbase-staging</value> </property> <property> <name>cdap.hbase.spi.hbase.replication</name> <value>true</value> </property>
Before starting CDAP on the master cluster, load the HBase coprocessors required by CDAP onto the slave cluster
Code Block |
---|
$ cdap setup coprocessors |
Start CDAP on the master cluster
Code Block |
---|
$ cdap master start |
...
} |
Sample implementation of the SPI is located here.
To deploy your extension, run these steps on both your master and slave clusters:
Create an extension directory:
Code Block $ mkdir -p /opt/cdap/master/ext/hbase/repl
Copy your JAR to the directory:
Code Block $ cp myextension.jar /opt/cdap/master/ext/hbase/repl/
Modify
cdap-site.xml
to use your implementation ofHBaseDDLExecutor
:Code Block <property> <name>hbase.ddlexecutor.extension.dir</name> <value>/opt/cdap/master/ext/hbase</value> </property>
Modify
cdap-site.xml
with any properties required by your executor. Anything prefixed by 'cdap.hbase.spi.hbase.' will be available through the Context object passed into your executor's initialize method:Code Block <property> <name>cdap.hbase.spi.hbase.zookeeper.quorum</name> <value>hbase-master-i18003-1000.dev.continuuity.net:2181/cdap</value> </property> <property> <name>cdap.hbase.spi.hbase.zookeeper.session.timeout</name> <value>60000</value> </property> <property> <name>cdap.hbase.spi.hbase.cluster.distributed</name> <value>true</value> </property> <property> <name>cdap.hbase.spi.hbase.bulkload.staging.dir</name> <value>/tmp/hbase-staging</value> </property> <property> <name>cdap.hbase.spi.hbase.replication</name> <value>true</value> </property>
Before starting CDAP on the master cluster, run a command on the slave cluster to load the HBase coprocessors required by CDAP onto the slave's HDFS:
Code Block slave$ cdap setup coprocessors
Start CDAP on the master cluster:
Code Block master$ cdap master start
Manual Failover Procedure
- Stop all CDAP programs on the master cluster
- Stop CDAP on the master cluster
- Copy any HDFS files that have not yet been copied using either your distro's solution or
distcp
Run the replication status tool to get cluster state:
Code Block master$ cdap run co.cask.cdap.data.tools.ReplicationStatusTool -m -o /tmp/master_state
Copy the master state onto your slave cluster:
Code Block master$ scp /tmp/master_state <slave>:/tmp/master_state
Verify replication has copied the required data onto the slave:
Code Block slave$ cdap run co.cask.cdap.data.tools.ReplicationStatusTool -i /tmp/master_state ... Master and Slave Checksums match. HDFS Replication is complete. HBase Replication is complete.
Run Hive's metatool to update locations for Hive tables:
Code Block slave$ hive --service metatool -updateLocation hdfs://[slave-namenode-host]:[slave-namenode-port] hdfs://[master-namenode-host]:[master-namenode-port] -tablePropKey avro.schema.url -serdePropKey avro.schema.url
Start CDAP on the slave:
Code Block slave$ cdap master start