CDAP Replication Deployment

This document lists the detailed steps required for setting up CDAP replication.

Cluster Setup

CDAP replication relies on the cluster administrator setting up replication on HBase, HDFS, Hive, and Kafka. It is assumed that CDAP is only running on the master cluster. It is assumed that you have not started CDAP before any of these steps.

HBase

  • Install the relevant cdap-hbase-compat package on all HBase nodes in your cluster in order to use the replication status coprocessors. Note that due to HBase limitations, these coprocessors cannot be used on HBase 0.96 or 0.98. Available compat packages are:

    1. cdap-hbase-compat-1.0
    2. cdap-hbase-compat-1.0-cdh
    3. cdap-hbase-compat-1.0-cdh5.5.0
    4. cdap-hbase-compat-1.1
    5. cdap-hbase-compat-1.2-cdh5.7.0
  • Modify hbase-site.xml on all HBase nodes to enable HBase replication, and to use the CDAP replication status coprocessors:

    <property>
      <name>hbase.replication</name>
      <value>true</value>
    </property>
    <property>
      <name>hbase.coprocessor.regionserver.classes</name>
      <value>co.cask.cdap.data2.replication.LastReplicateTimeObserver</value>
    </property>
    <property>
      <name>hbase.coprocessor.wal.classes</name>
      <value>co.cask.cdap.data2.replication.LastWriteTimeObserver</value>
    </property>
  • Modify hbase-env.sh on all HBase nodes to include the HBase coprocessor in the classpath:

    export HBASE_CLASSPATH="$HBASE_CLASSPATH:/opt/cdap/<hbase-compat-version>/coprocessor/*"
     
    # for example, if you're on cdh5.5.x and have installed the cdap-hbase-compat-1.0-cdh5.5.0 package:
    export HBASE_CLASSPATH="$HBASE_CLASSPATH:/opt/cdap/hbase-compat-1.0-cdh5.5.0/coprocessor/*"
  • Restart HBase master and regionservers
  • Enable replication from master to slave:

    master hbase shell> add_peer '[slave-name]', '[slave-zookeeper-quorum]:/[slave-zk-node]'
     
    example:
    master hbase shell> add_peer 'hbase2', 'hbase2-master.domain.net:2181:/hbase'
  • Enable replication from slave to master:

    slave hbase shell> add_peer '[master-name]', '[master-zookeeper-quorum]:/[master-zk-node]'
     
    example:
    slave hbase shell> add_peer 'hbase1', 'hbase1-master.domain.net:2181:/hbase'
  • Confirm that replication is working:

    master hbase shell> create 'repltest', 'f'
    slave hbase shell> create 'repltest', 'f'
    master hbase shell> enable_table_replication 'repltest'
    slave hbase shell> alter 'repltest', { 'NAME' => 'f', 'REPLICATION_SCOPE' => 1 }
    master hbase shell> put 'repltest', 'masterrow', 'f:v1', 'v1'
    slave hbase shell> put 'repltest', 'slaverow', 'f:v1', 'v1'
    master hbase shell> scan 'repltest'
    slave hbase shell> scan 'repltest'

HDFS

Set up HDFS replication using the solution provided by your distribution. HDFS does not have true replication, but it is usually achieved by scheduling regular distcp jobs.

Hive

Set up replication for the database backing your Hive Metastore. Note that this will simply replicate the Hive metadata (which tables exist, table metadata, etc.), but not the data itself. It is assumed you will not be running Hive queries on the slave until a manual failover occurs.

For example, to setup MySQL replication, follow the steps at https://dev.mysql.com/doc/refman/5.7/en/replication-howto.html, which amount to:

  1. Modify my.cnf on the master to set a server-id and use bin logging:

    [mysqld]
    log-bin=mysql-bin
    binlog_format=row
    server-id=1
  2. Restart mysql on the master

  3. Modify my.cnf on the slave to set a server-id:

    [mysqld]
    server-id=2
  4. Restart MySQL on the slave

  5. Create a replication user on the master:

    mysql> CREATE USER 'repl'@'%' IDENTIFIED BY 'slavepass';
    mysql> GRANT REPLICATION SLAVE ON *.* TO 'repl'@'%';

    In a production setup, you probably want to restrict the hostname instead of using '%'. Depending on your MySQL version, you may need to use old_password:

    mysql> SET PASSWORD FOR 'repl'@'%' = OLD_PASSWORD('slavepass');
  6. Obtain the master status:

    mysql > SHOW MASTER STATUS;
    +------------------+----------+--------------+------------------+
    | File             | Position | Binlog_Do_DB | Binlog_Ignore_DB |
    +------------------+----------+--------------+------------------+
    | mysql-bin.000003 | 73       | test         | manual,mysql     |
    +------------------+----------+--------------+------------------+
  7. Set master on the slave:

    mysql> CHANGE MASTER TO
        ->     MASTER_HOST='master_host_name',
        ->     MASTER_USER='replication_user_name', // repl
        ->     MASTER_PASSWORD='replication_password', //slavepass
        ->     MASTER_LOG_FILE='recorded_log_file_name', //mysql-bin.000003
        ->     MASTER_LOG_POS=recorded_log_position; // 73
  8. Start the slave:

    mysql> start slave
  9. Verify slave status: 

    mysql> SHOW SLAVE STATUS\G
    *************************** 1. row ***************************
                   Slave_IO_State: Waiting for master to send event
                      Master_Host: localhost
                      Master_User: root
                      Master_Port: 13000
                    Connect_Retry: 60
                  Master_Log_File: master-bin.000002
              Read_Master_Log_Pos: 1307
                   Relay_Log_File: slave-relay-bin.000003
                    Relay_Log_Pos: 1508
            Relay_Master_Log_File: master-bin.000002
                 Slave_IO_Running: Yes
                Slave_SQL_Running: Yes
                  Replicate_Do_DB:
              Replicate_Ignore_DB:
               Replicate_Do_Table:
           Replicate_Ignore_Table:
          Replicate_Wild_Do_Table:
      Replicate_Wild_Ignore_Table:
                       Last_Errno: 0
                       Last_Error:
                     Skip_Counter: 0
              Exec_Master_Log_Pos: 1307
                  Relay_Log_Space: 1858
                  Until_Condition: None
                   Until_Log_File:
                    Until_Log_Pos: 0
               Master_SSL_Allowed: No
               Master_SSL_CA_File:
               Master_SSL_CA_Path:
                  Master_SSL_Cert:
                Master_SSL_Cipher:
                   Master_SSL_Key:
            Seconds_Behind_Master: 0
    Master_SSL_Verify_Server_Cert: No
                    Last_IO_Errno: 0
                    Last_IO_Error:
                   Last_SQL_Errno: 0
                   Last_SQL_Error:
      Replicate_Ignore_Server_Ids:
                 Master_Server_Id: 1
                      Master_UUID: 3e11fa47-71ca-11e1-9e33-c80aa9429562
                 Master_Info_File: /var/mysqld.2/data/master.info
                        SQL_Delay: 0
              SQL_Remaining_Delay: NULL
          Slave_SQL_Running_State: Slave has read all relay log; waiting for the slave I/O thread to update it
               Master_Retry_Count: 10
                      Master_Bind:
          Last_IO_Error_Timestamp:
         Last_SQL_Error_Timestamp:
                   Master_SSL_Crl:
               Master_SSL_Crlpath:
               Retrieved_Gtid_Set: 3e11fa47-71ca-11e1-9e33-c80aa9429562:1-5
                Executed_Gtid_Set: 3e11fa47-71ca-11e1-9e33-c80aa9429562:1-5
                    Auto_Position: 1
             Replicate_Rewrite_DB:
                     Channel_name:

Kafka

Set up replication for the Kafka brokers you are using. Kafka MirrorMaker (see https://kafka.apache.org/documentation.html#basic_ops_mirror_maker and https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=27846330) is the most common solution.

CDAP Setup

CDAP requires that you provide an extension that will perform HBase-related DDL operations on both clusters instead of on just one. To create an extension, you must implement the HBaseDDLExecutor class:

/**
 * Interface providing the HBase DDL operations.
 */
public interface HBaseDDLExecutor extends Closeable {
  /**
   * Initialize the {@link HBaseDDLExecutor}.
   * @param context the context for the executor
   */
  void initialize(HBaseDDLExecutorContext context);

  /**
   * Create the specified namespace if it does not exist.
   *
   * @param name the namespace to create
   * @throws IOException if a remote or network exception occurs
   */
  boolean createNamespaceIfNotExists(String name) throws IOException;

  /**
   * Delete the specified namespace if it exists.
   *
   * @param name the namespace to delete
   * @throws IOException if a remote or network exception occurs
   * @throws IllegalStateException if there are tables in the namespace
   */
  void deleteNamespaceIfExists(String name) throws IOException;

  /**
   * Create the specified table if it does not exist.
   *
   * @param descriptor the descriptor for the table to create
   * @param splitKeys the initial split keys for the table
   * @throws IOException if a remote or network exception occurs
   */
  void createTableIfNotExists(TableDescriptor descriptor, @Nullable byte[][] splitKeys)
    throws IOException;

  /**
   * Enable the specified table if it is disabled.
   *
   * @param namespace the namespace of the table to enable
   * @param name the name of the table to enable
   * @throws IOException if a remote or network exception occurs
   */
  void enableTableIfDisabled(String namespace, String name) throws IOException;

  /**
   * Disable the specified table if it is enabled.
   *
   * @param namespace the namespace of the table to disable
   * @param name the name of the table to disable
   * @throws IOException if a remote or network exception occurs
   */
  void disableTableIfEnabled(String namespace, String name) throws IOException;

  /**
   * Modify the specified table. The table must be disabled.
   *
   * @param namespace the namespace of the table to modify
   * @param name the name of the table to modify
   * @param descriptor the descriptor for the table
   * @throws IOException if a remote or network exception occurs
   * @throws IllegalStateException if the specified table is not disabled
   */
  void modifyTable(String namespace, String name, TableDescriptor descriptor) throws IOException;

  /**
   * Truncate the specified table. The table must be disabled.
   *
   * @param namespace the namespace of the table to truncate
   * @param name the name of the table to truncate
   * @throws IOException if a remote or network exception occurs
   * @throws IllegalStateException if the specified table is not disabled
   */
  void truncateTable(String namespace, String name) throws IOException;

  /**
   * Delete the table if it exists. The table must be disabled.
   *
   * @param namespace the namespace of the table to delete
   * @param name the table to delete
   * @throws IOException if a remote or network exception occurs
   * @throws IllegalStateException if the specified table is not disabled
   */
  void deleteTableIfExists(String namespace, String name) throws IOException;
 
  /**
   * Grant permissions on a table to users or groups.
   *
   * @param namespace the namespace of the table
   * @param name the name of the table
   * @param permissions A map from user name to the permissions for that user, given as a string containing
   *                    only the characters 'a' (Admin), 'c' (Create), 'r' (Read), 'w' (Write), and 'x' (Execute).
   * @throws IOException if anything goes wrong
   */
  void grantPermissions(String namespace, String name, Map<String, String> permissions) throws IOException;
}


Sample implementation of the SPI is located here.

To deploy your extension, run these steps on both your master and slave clusters:

  1. Create an extension directory:

    $ mkdir -p /opt/cdap/master/ext/hbase/repl
  2. Copy your JAR to the directory:

    $ cp myextension.jar /opt/cdap/master/ext/hbase/repl/
  3. Modify cdap-site.xml to use your implementation of HBaseDDLExecutor:

    <property>
      <name>hbase.ddlexecutor.extension.dir</name>
      <value>/opt/cdap/master/ext/hbase</value>
    </property>
  4. Modify cdap-site.xml with any properties required by your executor. Anything prefixed by 'cdap.hbase.spi.hbase.' will be available through the Context object passed into your executor's initialize method:

    <property>
      <name>cdap.hbase.spi.hbase.zookeeper.quorum</name>
      <value>hbase-master-i18003-1000.dev.continuuity.net:2181/cdap</value>
    </property>
    <property>
      <name>cdap.hbase.spi.hbase.zookeeper.session.timeout</name>
      <value>60000</value>
    </property>
    <property>
      <name>cdap.hbase.spi.hbase.cluster.distributed</name>
      <value>true</value>
    </property>
    <property>
      <name>cdap.hbase.spi.hbase.bulkload.staging.dir</name>
      <value>/tmp/hbase-staging</value>
    </property>
    <property>
      <name>cdap.hbase.spi.hbase.replication</name>
      <value>true</value>
    </property>
  5. Before starting CDAP on the master cluster, run a command on the slave cluster to load the HBase coprocessors required by CDAP onto the slave's HDFS:

    slave$ cdap setup coprocessors
  6. Start CDAP on the master cluster:

    master$ cdap master start

Manual Failover Procedure

  1. Stop all CDAP programs on the master cluster
  2. Stop CDAP on the master cluster
  3. Copy any HDFS files that have not yet been copied using either your distro's solution or distcp
  4. Run the replication status tool to get cluster state:

    master$ cdap run co.cask.cdap.data.tools.ReplicationStatusTool -m -o /tmp/master_state
  5. Copy the master state onto your slave cluster:

    master$ scp /tmp/master_state <slave>:/tmp/master_state
  6. Verify replication has copied the required data onto the slave:

    slave$ cdap run co.cask.cdap.data.tools.ReplicationStatusTool -i /tmp/master_state
    ...
    Master and Slave Checksums match. HDFS Replication is complete.
    HBase Replication is complete.
  7. Run Hive's metatool to update locations for Hive tables:

    slave$ hive --service metatool -updateLocation hdfs://[slave-namenode-host]:[slave-namenode-port] hdfs://[master-namenode-host]:[master-namenode-port] -tablePropKey avro.schema.url -serdePropKey avro.schema.url
  8. Start CDAP on the slave:

    slave$ cdap master start