...

  1. If namespaces are to be migrated from one cluster to another, both the source and destination clusters must be running the same version of CDAP.

  2. The clocks of both clusters need to be in sync, otherwise information such as the TTL for a stream will be invalidated.

  3. All programs on the old CDAP cluster are stopped and no additional activity (such as ingesting data into a stream) is happening on the old cluster.

  4. There are no pending events in the flow queues before the migration process starts. It would be nice to have platform support for draining the queue data.

  5. We will not migrate any transient data such as the stream state store and queue states. Migration will only be started once the existing data has been processed.

Scope for 3.3:

In 3.3 we will focus on migrating the datasets and streams associated with applications.

Migration of the application consists of the following steps:

  1. Ensure that there is no unprocessed data in the application. This will be a manual process in 3.3.

  2. Migrate the datasets and streams associated with the application.

  3. Deploy the application in the new namespace.

  4. We will not migrate operational data such as metrics and logs as part of 3.3.

Following are some possible use cases for migrating application datasets/streams:

  1. Migrate the dataset X from namespace ‘NS1’ to namespace ‘NS2’. [NS1.X -> NS2.X]

  2. Migrate the dataset X from ‘NS1’ to ‘NS2’, updating its name to Y. [NS1.X -> NS2.Y]

  3. Make a copy of the dataset X in ‘NS1’. [NS1.X -> NS1.Y]

Once the application data is migrated, the application will need to be deployed manually. If there are any changes in the dataset names, corresponding changes in the application are required. Migration of the application forms a baseline case for migrating namespaces.

Migration of the HDFS directories:  

CDAP stores FileSets and Streams in HDFS. We can determine the FileSets and streams used in the namespace and use distcp to migrate the corresponding HDFS files to the new cluster. FileSets also support reading from files that are managed by an external process, which allows accessing existing locations outside of CDAP. We will need to migrate those files as well; however, doing so may cause conflicts if the file location already exists on the target cluster.
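
As an illustration, the copy could also be driven programmatically rather than from the command line. This is a minimal sketch only, assuming a Hadoop 2.x hadoop-distcp dependency (org.apache.hadoop.tools.DistCp / DistCpOptions); the source and target paths are illustrative, not the actual CDAP layout.

  Code Block
  // Sketch only: invoking distcp programmatically through Hadoop's DistCp API.
  // The source and target paths below are illustrative.
  Configuration conf = new Configuration();
  DistCpOptions options = new DistCpOptions(
      Collections.singletonList(new Path("hdfs://old-cluster/path/to/fileset-or-stream")),
      new Path("hdfs://new-cluster/path/to/fileset-or-stream"));

  // Submits the copy as a MapReduce job; equivalent to running 'hadoop distcp' from the shell.
  new DistCp(conf, options).execute();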

Migrating the user datasets:

User datasets (except FileSets) are stored as HBase tables. However, we cannot simply copy those tables over to the target cluster: every HBase cell can have multiple versions, marked by transaction IDs. The transaction IDs generated by the Transaction Manager on the old cluster may conflict with the transaction IDs generated on the new cluster. For example, consider that tx1 is the latest transaction ID generated for the table on the old cluster. If we simply copy the table, the copied table will also have cells versioned with tx1. However, the new cluster might already have generated transaction ID tx1 and invalidated it. In that case, the copied data in the new table will not be visible on the new cluster.
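
To make the conflict concrete, here is a hypothetical sketch (not actual CDAP/Tephra code; the method and parameter names are made up for illustration) of how snapshot-isolation visibility filtering treats a copied cell version:

  Code Block
  // Hypothetical illustration only: a simplified visibility check in the spirit of
  // snapshot isolation. A cell version (transaction ID) copied from the old cluster is
  // visible on the new cluster only if the new cluster's transaction snapshot considers
  // it committed: at or below the read pointer and neither invalidated nor in progress.
  boolean isVisible(long cellVersion, long readPointer,
                    Set<Long> invalidTxIds, Set<Long> inProgressTxIds) {
    return cellVersion <= readPointer
        && !invalidTxIds.contains(cellVersion)
        && !inProgressTxIds.contains(cellVersion);
  }

  // Example: if cells are copied with version tx1 but tx1 is already in the new
  // cluster's invalid list, the copied cells are filtered out of every read.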

Potential solutions:

  • Check if it is possible to inject the CDAP HBase implementation into the CopyTable utility.


  • Write a custom MapReduce program to copy the HBase user datasets. This MapReduce program will get the transaction ID from the Transaction Manager running on the new cluster.
    Possible approaches explored:

     

    Utility: CopyTable
    Description: The CopyTable utility allows copying HBase tables across clusters.
    Pros:
      • Simple to use
      • No issues with TTL
    Cons:
      • Causes read load when reading from the source table.
      • Causes write load when writing to the destination table.
      • May not work when the Hadoop distributions are wire-incompatible.
      • Not suitable for our use case, since the timestamps of the cells are preserved. We need to pick up only valid rows from the source cluster and update the timestamp with a valid transaction ID on the destination cluster.

    Utility: Snapshots
    Description: A snapshot captures the state of a table. Using the ExportSnapshot utility, the snapshot can be copied to the destination cluster and then cloned into a table on the destination cluster.
    Pros:
      • Simple to use
      • No issues with TTL
      • Quick compared to CopyTable.
      • Minimal read/write load, since it bypasses the HBase read/write paths.
    Cons:
      • Since the snapshot needs to be exported to the destination HDFS cluster, it still generates some I/O load.
      • The HBase user needs read permission on the source HDFS cluster and write permission on the destination HDFS cluster.
      • Similar to CopyTable, this does not solve our use case, since we need to pick up only valid rows from the source cluster and update the timestamp with a valid transaction ID on the destination cluster.

    Utility: BulkLoad
    Description: Allows bulk loading of HFiles.
    Pros:
      • No issues with TTL
      • Bypasses the write path completely.
    Cons:
      • This only takes care of uploading data to the new cluster; we still need some other utility to download the data from the old table.
      • Similar to the other options, we won't be able to update the transaction ID correctly while uploading.

    Utility: Custom MapReduce programs
    Description: The process would be:
      • A MapReduce program on the source cluster copies the data in the HBase table to the HDFS filesystem in HFile or CSV format.
      • The files are then copied to the destination HDFS cluster.
      • A MapReduce program on the destination cluster reads the files and stores the data in the destination HBase table.
    Pros:
      • We will always read the data corresponding to valid transactions.
      • We have control over which transaction ID the data is written with on the destination table.
    Cons:
      • TTL will not be honored.
      • Causes read load when reading from the source cluster.
      • Causes write load when writing to the destination cluster.
      • More work is involved, since this is a custom implementation.

    Utility: Dataset Framework support
    Description: The Dataset Framework can support admin operations for exporting and importing data.
    Pros:
      • Cleaner solution.
    Cons:
      • TTL will not be honored.
      • If there is a large amount of data in the HBase table, the export/import operation will be expensive.
      • More work is involved, since this is a custom implementation.

     

    NOTE: So far, the custom MapReduce program seems to be the most promising solution, but it does not address TTL. The MapReduce program on the source cluster will copy the HBase table data to a file. For this MapReduce program we can use DatasetInputFormat as the input format of the job. However, DatasetInputFormat has a limitation: if the dataset is a custom dataset, we will need to pass the dataset jar as well. Another option is for the platform to support a TableInputFormat which simply reads from the underlying HBase tables. (How do we handle datasets which are not BatchReadable?)

    Overview of the process for migrating the HBase tables:

    1. The user will need to provide the namespace name, app name, and dataset name as input to the tool.
    2. The tool will then figure out the underlying HBase tables associated with the dataset by using the DatasetSpecifications.
    3. For each table associated with the dataset, the tool will start a MapReduce program to read from the table and write to HFiles.
    4. In order to read the valid versions from the HBase table, we will need to get a transaction from the Transaction Server.

      Code Block
      // Create a Guice injector with the CDAP and HBase configurations
      // (createInjector here is a helper that installs the required CDAP modules)
      Injector injector = createInjector(CConfiguration.create(), HBaseConfiguration.create());

      // Obtain the TransactionSystemClient and start a long-running transaction
      TransactionSystemClient txClient = injector.getInstance(TransactionSystemClient.class);
      Transaction tx = txClient.startLong();
    5. The transaction created in the above step will need to be passed to the Scan object while setting up the MapReduce job.

      Code Block
      // Create instance of the TransactionCodec for encoding the transaction.
      TransactionCodec txCodec = new TransactionCodec();
       
      // Configure the scan instance to be passed to the TableMapReduceUtil class with the transaction details
      Scan scan = new Scan();
      scan.setAttribute("cask.tx", txCodec.encode(tx));
       
      // Set up the Mapper job to use the scan we just created
      TableMapReduceUtil.initTableMapperJob(hbaseTableName, scan, Import.KeyValueImporter.class, null, null, job);
      
      // The MapReduce job can then be configured to write HFiles using HFileOutputFormat2
      HFileOutputFormat2.setOutputPath(job, bulkloadDir);
    6. The directory created by running the above MapReduce program will need to be manually copied to the destination cluster using the distcp utility.
    7. On the destination cluster, the HFiles can be loaded into the destination table using the HBase bulk load utility (see the sketch after this list).
    8. Along with the tables, we will also need to migrate the metadata associated with the datasets to the destination cluster.
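
    To make step 7 concrete, the following is a minimal sketch of the bulk load on the destination cluster, assuming the HBase 0.98/1.x client API; the table name and HFile directory are illustrative.

      Code Block
      // Sketch only: bulk-loading the copied HFiles into the destination HBase table.
      // Equivalent to running HBase's 'completebulkload' tool from the command line.
      // The table name and HFile directory below are illustrative.
      Configuration hConf = HBaseConfiguration.create();
      HTable destTable = new HTable(hConf, "destinationTable");
      new LoadIncrementalHFiles(hConf).doBulkLoad(new Path("/path/to/bulkloadDir"), destTable);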

    Migration of the HDFS directories:  

    CDAP stores FileSets and Streams in HDFS. We can figure out the FileSets and streams used in the namespace and use distcp to migrate the corresponding HDFS files to the new cluster. Andreas: Can we preserve the file modification time? (SAGAR: distcp preserves the timestamp)

    Andreas: What about partitioned file sets? How will the metadata be migrated? And how can the BatchPartitionConsumer's state be migrated? (SAGAR: TBD)

    Solving namespace migration use cases with the application migration scheme (NOT FOR 3.3):

    1. Migrate data from namespace ‘NS1’ on the old cluster to namespace ‘NS2’. ‘NS2’ can be on the same cluster or on a new cluster.

      Code Block
       for each app : 'NS1'.applications
          for each dataset : app.datasets
             migrate the dataset using distcp (if it is a fileset) or an hbase table copy operation
          for each stream : app.streams
             migrate the stream files using distcp


    2. Merging of the namespaces ‘NS1’ and ‘NS2’ into namespace ‘NS3’. Namespace ‘NS3’ can be on the same cluster or on a new cluster.

      Code Block
       for each namespace 'NS' : namespaces to merge
          for each app : 'NS'.applications-to-migrate
             for each dataset : app.datasets
                migrate the dataset using distcp (if it is a fileset) or an hbase table copy operation
             for each stream : app.streams
                migrate the stream files using distcp
    3. Splitting of the namespace ‘NS1’ into multiple namespaces, say ‘NS2’ and ‘NS3’. Namespaces ‘NS2’ and ‘NS3’ can exist on the existing cluster or on a new cluster.

      Code Block
      // TBD