This document lists the detailed steps required to set up replication for CDAP.

Cluster Setup

CDAP replication relies on the cluster administrator setting up replication on HBase, HDFS, Hive, and Kafka. These steps assume that CDAP runs only on the master cluster and that you have not started CDAP before performing any of them.

HBase

  • Install the relevant cdap-hbase-compat package on all HBase nodes in your cluster in order to use the replication status coprocessors. Note that due to HBase limitations, these coprocessors cannot be used on HBase 0.96 or 0.98. Available compat packages are:

    1. cdap-hbase-compat-1.0
    2. cdap-hbase-compat-1.0-cdh
    3. cdap-hbase-compat-1.0-cdh5.5.0
    4. cdap-hbase-compat-1.1
    5. cdap-hbase-compat-1.2-cdh5.7.0
  • Modify hbase-site.xml on all HBase nodes to enable HBase replication and to use the CDAP replication status coprocessors:

    Code Block
    <property>
      <name>hbase.replication</name>
      <value>true</value>
    </property>
    <property>
      <name>hbase.coprocessor.regionserver.classes</name>
      <value>co.cask.cdap.data2.replication.LastReplicateTimeObserver</value>
    </property>
    <property>
      <name>hbase.coprocessor.wal.classes</name>
      <value>co.cask.cdap.data2.replication.LastWriteTimeObserver</value>
    </property>
  • Modify hbase-env.sh on all HBase nodes to include the HBase coprocessor in the classpath:

    Code Block
    export HBASE_CLASSPATH="$HBASE_CLASSPATH:/opt/cdap/<hbase-compat-version>/coprocessor/*"
     
    # for example, if you're on cdh5.5.x and have installed the cdap-hbase-compat-1.0-cdh5.5.0 package:
    export HBASE_CLASSPATH="$HBASE_CLASSPATH:/opt/cdap/hbase-compat-1.0-cdh5.5.0/coprocessor/*"
  • Restart the HBase master and regionservers

  • Enable replication from master to slave:

    Code Block
    master hbase shell> add_peer '[slave-name]', '[slave-zookeeper-quorum]:/[slave-zk-node]'
     
    example:
    master hbase shell> add_peer 'hbase2', 'hbase2-master.domain.net:2181:/hbase'
  • Enable replication from slave to master:

    Code Block
    slave hbase shell> add_peer '[master-name]', '[master-zookeeper-quorum]:/[master-zk-node]'
     
    example:
    slave hbase shell> add_peer 'hbase1', 'hbase1-master.domain.net:2181:/hbase'
  • Confirm that replication is working:

    Code Block
    master hbase shell> create 'repltest', 'f'
    slave hbase shell> create 'repltest', 'f'
    master hbase shell> enable_table_replication 'repltest'
    slave hbase shell> alter 'repltest', { 'NAME' => 'f', 'REPLICATION_SCOPE' => 1 }
    master hbase shell> put 'repltest', 'masterrow', 'f:v1', 'v1'
    slave hbase shell> put 'repltest', 'slaverow', 'f:v1', 'v1'
    master hbase shell> scan 'repltest'
    slave hbase shell> scan 'repltest'
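
The confirmation step above succeeds when both test rows show up in the scan on each cluster. The following is a minimal sketch of that check, assuming the scan output has been captured as text; the function name both_rows_replicated is hypothetical and not part of HBase or CDAP.

```shell
# Hypothetical helper (not part of HBase or CDAP): reads the text output of
# "scan 'repltest'" on stdin and succeeds only if both test rows are present.
both_rows_replicated() {
  # Capture stdin once so it can be searched twice
  scan_output="$(cat)"
  printf '%s\n' "$scan_output" | grep -q 'masterrow' &&
    printf '%s\n' "$scan_output" | grep -q 'slaverow'
}
```

One way to use it, assuming the hbase command is on the PATH, is to pipe the scan into the function on each cluster, for example: echo "scan 'repltest'" | hbase shell | both_rows_replicated && echo "replication OK".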

HDFS

Set up HDFS replication using the solution provided by your distribution. HDFS does not have true replication, but it is usually achieved by scheduling regular distcp jobs.
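
As an illustration of the distcp approach, the sketch below only builds the command line for an incremental copy from the master to the slave. The function name distcp_command, the NameNode addresses, and the /cdap path are assumptions for the example; use the paths and scheduling mechanism recommended by your distribution.

```shell
# Hypothetical helper: print (but do not run) a distcp command that copies
# one path from the master NameNode to the same path on the slave NameNode.
# $1 = master NameNode host:port, $2 = slave NameNode host:port, $3 = path
distcp_command() {
  # -update copies only files that are missing or differ on the target
  printf 'hadoop distcp -update hdfs://%s%s hdfs://%s%s\n' \
    "$1" "$3" "$2" "$3"
}

# Example with placeholder hosts and a placeholder directory
distcp_command master-nn.example.com:8020 slave-nn.example.com:8020 /cdap
```

Printing the command first makes it easy to review before a scheduler such as cron runs it at regular intervals.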

Hive

Set up replication for the database backing your Hive Metastore. Note that this will simply replicate the Hive metadata (which tables exist, table metadata, etc.), but not the data itself. It is assumed you will not be running Hive queries on the slave until a manual failover occurs.

For example, to set up MySQL replication, follow the steps at https://dev.mysql.com/doc/refman/5.7/en/replication-howto.html, which amount to:

  1. Modify my.cnf on the master to set a server-id and enable binary logging:

    Code Block
    [mysqld]
    log-bin=mysql-bin
    binlog_format=row
    server-id=1
  2. Restart MySQL on the master

  3. Modify my.cnf on the slave to set a server-id:

    Code Block
    [mysqld]
    server-id=2
  4. Restart MySQL on the slave

  5. Create a replication user on the master:

    Code Block
    mysql> CREATE USER 'repl'@'%' IDENTIFIED BY 'slavepass';
    mysql> GRANT REPLICATION SLAVE ON *.* TO 'repl'@'%';

    In a production setup, you probably want to restrict the hostname instead of using '%'. Depending on your MySQL version, you may need to use old_password:

    Code Block
    mysql> SET PASSWORD FOR 'repl'@'%' = OLD_PASSWORD('slavepass');
  6. Obtain the master status:

    Code Block
    mysql> SHOW MASTER STATUS;
    +------------------+----------+--------------+------------------+
    | File             | Position | Binlog_Do_DB | Binlog_Ignore_DB |
    +------------------+----------+--------------+------------------+
    | mysql-bin.000003 | 73       | test         | manual,mysql     |
    +------------------+----------+--------------+------------------+
  7. Set master on the slave:

    Code Block
    mysql> CHANGE MASTER TO
        ->     MASTER_HOST='master_host_name',
        ->     MASTER_USER='replication_user_name',      -- repl
        ->     MASTER_PASSWORD='replication_password',   -- slavepass
        ->     MASTER_LOG_FILE='recorded_log_file_name', -- mysql-bin.000003
        ->     MASTER_LOG_POS=recorded_log_position;     -- 73
  8. Start the slave:

    Code Block
    mysql> START SLAVE;
  9. Verify slave status:

    Code Block
    mysql> SHOW SLAVE STATUS\G
    *************************** 1. row ***************************
                   Slave_IO_State: Waiting for master to send event
                      Master_Host: localhost
                      Master_User: root
                      Master_Port: 13000
                    Connect_Retry: 60
                  Master_Log_File: master-bin.000002
              Read_Master_Log_Pos: 1307
                   Relay_Log_File: slave-relay-bin.000003
                    Relay_Log_Pos: 1508
            Relay_Master_Log_File: master-bin.000002
                 Slave_IO_Running: Yes
                Slave_SQL_Running: Yes
                  Replicate_Do_DB:
              Replicate_Ignore_DB:
               Replicate_Do_Table:
           Replicate_Ignore_Table:
          Replicate_Wild_Do_Table:
      Replicate_Wild_Ignore_Table:
                       Last_Errno: 0
                       Last_Error:
                     Skip_Counter: 0
              Exec_Master_Log_Pos: 1307
                  Relay_Log_Space: 1858
                  Until_Condition: None
                   Until_Log_File:
                    Until_Log_Pos: 0
               Master_SSL_Allowed: No
               Master_SSL_CA_File:
               Master_SSL_CA_Path:
                  Master_SSL_Cert:
                Master_SSL_Cipher:
                   Master_SSL_Key:
            Seconds_Behind_Master: 0
    Master_SSL_Verify_Server_Cert: No
                    Last_IO_Errno: 0
                    Last_IO_Error:
                   Last_SQL_Errno: 0
                   Last_SQL_Error:
      Replicate_Ignore_Server_Ids:
                 Master_Server_Id: 1
                      Master_UUID: 3e11fa47-71ca-11e1-9e33-c80aa9429562
                 Master_Info_File: /var/mysqld.2/data/master.info
                        SQL_Delay: 0
              SQL_Remaining_Delay: NULL
          Slave_SQL_Running_State: Slave has read all relay log; waiting for the slave I/O thread to update it
               Master_Retry_Count: 10
                      Master_Bind:
          Last_IO_Error_Timestamp:
         Last_SQL_Error_Timestamp:
                   Master_SSL_Crl:
               Master_SSL_Crlpath:
               Retrieved_Gtid_Set: 3e11fa47-71ca-11e1-9e33-c80aa9429562:1-5
                Executed_Gtid_Set: 3e11fa47-71ca-11e1-9e33-c80aa9429562:1-5
                    Auto_Position: 1
             Replicate_Rewrite_DB:
                     Channel_name:
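
In the status output, the two fields that matter most for a quick health check are Slave_IO_Running and Slave_SQL_Running, which should both report Yes. The following is a minimal sketch of an automated check, assuming the SHOW SLAVE STATUS\G output is available as text; the function name slave_threads_running is hypothetical.

```shell
# Hypothetical helper: reads SHOW SLAVE STATUS\G output on stdin and succeeds
# only if both the I/O thread and the SQL thread report "Yes".
slave_threads_running() {
  # Capture stdin once so it can be searched twice
  status_output="$(cat)"
  # The trailing colon in each pattern keeps Slave_SQL_Running_State from matching
  printf '%s\n' "$status_output" |
    grep -Eq '^[[:space:]]*Slave_IO_Running:[[:space:]]*Yes[[:space:]]*$' &&
    printf '%s\n' "$status_output" |
    grep -Eq '^[[:space:]]*Slave_SQL_Running:[[:space:]]*Yes[[:space:]]*$'
}
```

On the slave, this could be combined with the mysql client, for example: mysql -e 'SHOW SLAVE STATUS\G' | slave_threads_running && echo "replication threads are running".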

Kafka

Set up replication for the Kafka brokers you are using. Kafka MirrorMaker (see https://kafka.apache.org/documentation.html#basic_ops_mirror_maker and https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=27846330) is the most common solution.
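
As a rough illustration, the legacy MirrorMaker tool described in the first link is started with a consumer configuration (pointing at the source cluster), a producer configuration (pointing at the target cluster), and a topic whitelist. The sketch below only prints such a command line; the function name mirror_maker_command, the file names, and the topic are placeholders, and the topics you need to mirror depend on your CDAP configuration.

```shell
# Hypothetical helper: print (but do not run) a MirrorMaker command line.
# $1 = consumer properties file (source cluster)
# $2 = producer properties file (target cluster)
# $3 = whitelist of topics to mirror (a regular expression)
mirror_maker_command() {
  printf 'kafka-mirror-maker.sh --consumer.config %s --producer.config %s --whitelist %s\n' \
    "$1" "$2" "$3"
}

# Example with placeholder file names and a placeholder topic
mirror_maker_command consumer.properties producer.properties my-topic
```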

CDAP Setup

CDAP requires that you provide an extension that will perform HBase-related DDL operations on both clusters instead of on just one. To create an extension, you must implement the HBaseDDLExecutor interface:

Code Block
/**
 * Interface providing the HBase DDL operations.
 */
public interface HBaseDDLExecutor extends Closeable {
  /**
   * Initialize the {@link HBaseDDLExecutor}.
   * @param context the context for the executor
   */
  void initialize(HBaseDDLExecutorContext context);

  /**
   * Create the specified namespace if it does not exist.
   *
   * @param name the namespace to create
   * @throws IOException if a remote or network exception occurs
   */
  boolean createNamespaceIfNotExists(String name) throws IOException;

  /**
   * Delete the specified namespace if it exists.
   *
   * @param name the namespace to delete
   * @throws IOException if a remote or network exception occurs
   * @throws IllegalStateException if there are tables in the namespace
   */
  void deleteNamespaceIfExists(String name) throws IOException;

  /**
   * Create the specified table if it does not exist.
   *
   * @param descriptor the descriptor for the table to create
   * @param splitKeys the initial split keys for the table
   * @throws IOException if a remote or network exception occurs
   */
  void createTableIfNotExists(TableDescriptor descriptor, @Nullable byte[][] splitKeys)
    throws IOException;

  /**
   * Enable the specified table if it is disabled.
   *
   * @param namespace the namespace of the table to enable
   * @param name the name of the table to enable
   * @throws IOException if a remote or network exception occurs
   */
  void enableTableIfDisabled(String namespace, String name) throws IOException;

  /**
   * Disable the specified table if it is enabled.
   *
   * @param namespace the namespace of the table to disable
   * @param name the name of the table to disable
   * @throws IOException if a remote or network exception occurs
   */
  void disableTableIfEnabled(String namespace, String name) throws IOException;

  /**
   * Modify the specified table. The table must be disabled.
   *
   * @param namespace the namespace of the table to modify
   * @param name the name of the table to modify
   * @param descriptor the descriptor for the table
   * @throws IOException if a remote or network exception occurs
   * @throws IllegalStateException if the specified table is not disabled
   */
  void modifyTable(String namespace, String name, TableDescriptor descriptor) throws IOException;

  /**
   * Truncate the specified table. The table must be disabled.
   *
   * @param namespace the namespace of the table to truncate
   * @param name the name of the table to truncate
   * @throws IOException if a remote or network exception occurs
   * @throws IllegalStateException if the specified table is not disabled
   */
  void truncateTable(String namespace, String name) throws IOException;

  /**
   * Delete the table if it exists. The table must be disabled.
   *
   * @param namespace the namespace of the table to delete
   * @param name the table to delete
   * @throws IOException if a remote or network exception occurs
   * @throws IllegalStateException if the specified table is not disabled
   */
  void deleteTableIfExists(String namespace, String name) throws IOException;

  /**
   * Grant permissions on a table to users or groups.
   *
   * @param namespace the namespace of the table
   * @param name the name of the table
   * @param permissions A map from user name to the permissions for that user, given as a string containing
   *                    only characters 'a' (Admin), 'c' (Create), 'r' (Read), 'w' (Write), and 'x' (Execute).
   * @throws IOException if anything goes wrong
   */
  void grantPermissions(String namespace, String name, Map<String, String> permissions) throws IOException;
}


A sample implementation of the SPI is located here.

To deploy your extension, run these steps on both your master and slave clusters:

  1. Create an extension directory:

    Code Block
    $ mkdir -p /opt/cdap/master/ext/hbase/repl
  2. Copy your JAR to the directory:

    Code Block
    $ cp myextension.jar /opt/cdap/master/ext/hbase/repl/
  3. Modify cdap-site.xml to use your implementation of HBaseDDLExecutor:

    Code Block
    <property>
      <name>hbase.ddlexecutor.extension.dir</name>
      <value>/opt/cdap/master/ext/hbase</value>
    </property>
  4. Modify cdap-site.xml with any properties required by your executor. Anything prefixed by 'cdap.hbase.spi.hbase.' will be available through the Context object passed into your executor's initialize method:

    Code Block
    <property>
      <name>cdap.hbase.spi.hbase.zookeeper.quorum</name>
      <value>hbase-master-i18003-1000.dev.continuuity.net:2181/cdap</value>
    </property>
    <property>
      <name>cdap.hbase.spi.hbase.zookeeper.session.timeout</name>
      <value>60000</value>
    </property>
    <property>
      <name>cdap.hbase.spi.hbase.cluster.distributed</name>
      <value>true</value>
    </property>
    <property>
      <name>cdap.hbase.spi.hbase.bulkload.staging.dir</name>
      <value>/tmp/hbase-staging</value>
    </property>
    <property>
      <name>cdap.hbase.spi.hbase.replication</name>
      <value>true</value>
    </property>
  5. Before starting CDAP on the master cluster, run a command on the slave cluster to load the HBase coprocessors required by CDAP onto the slave's HDFS:

    Code Block
    slave$ cdap setup coprocessors
  6. Start CDAP on the master cluster:

    Code Block
    master$ cdap master start

...

Manual Failover Procedure

  1. Stop all CDAP programs on the master cluster
  2. Stop CDAP on the master cluster
  3. Copy any HDFS files that have not yet been copied, using either your distro's solution or distcp
  4. Run the replication status tool to get the cluster state:

    Code Block
    master$ cdap run co.cask.cdap.data.tools.ReplicationStatusTool -m -o /tmp/master_state
  5. Copy the master state onto your slave cluster:

    Code Block
    master$ scp /tmp/master_state <slave>:/tmp/master_state
  6. Verify that replication has copied the required data onto the slave:

    Code Block
    slave$ cdap run co.cask.cdap.data.tools.ReplicationStatusTool -i /tmp/master_state
    ...
    Master and Slave Checksums match. HDFS Replication is complete.
    HBase Replication is complete.
  7. Run Hive's metatool to update locations for Hive tables:

    Code Block
    slave$ hive --service metatool -updateLocation hdfs://[slave-namenode-host]:[slave-namenode-port] hdfs://[master-namenode-host]:[master-namenode-port] -tablePropKey avro.schema.url -serdePropKey avro.schema.url
  8. Start CDAP on the slave:

    Code Block
    slave$ cdap master start
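
In the metatool step, the argument order is easy to get wrong: the new location (the slave NameNode) comes first and the old location (the master NameNode) second. The following is a minimal sketch that assembles the command from the two addresses so it can be reviewed before running; the function name metatool_command and the host names are placeholders.

```shell
# Hypothetical helper: print (but do not run) the metatool command used
# during failover.
# $1 = slave NameNode host:port (new location)
# $2 = master NameNode host:port (old location)
metatool_command() {
  printf 'hive --service metatool -updateLocation hdfs://%s hdfs://%s -tablePropKey avro.schema.url -serdePropKey avro.schema.url\n' \
    "$1" "$2"
}

# Example with placeholder hosts
metatool_command slave-nn.example.com:8020 master-nn.example.com:8020
```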

...