...
If namespaces are to be migrated from one cluster to another cluster, both source and destination clusters should be running the CDAP with the same version.
Both clusters time need to have the same timestamp, Andreas: what is the timestamp of a cluster? be in sync, otherwise the information such as TTL for the stream will be invalidated.
All programs on the old CDAP cluster are stopped and no additional activity (such as ingesting the data to the stream) is happening on the old cluster.
There are no pending events in the flow queue before the migration process starts. It will be nice to have the support from the platform for draining the queue data.
We will not migrate any transient data such as stream state store Andreas: I would think that stream states have to migrated. Otherwisethe
new cluster will process all data again. andqueue
states. Migration will only be started once the existingdata
is processed.
Scope for 3.3:
In 3.3 we will focus on the migration of the datasets and streams associated with the applications.
Migration of the application consists of the following steps:
Ensure that there is no unprocessed data in the application. This will be manual process for 3.3.
Migrate the datasets and streams associated with the application.
Deploy the application in the new namespace.
We will not migrate the operational data such as metrics and logs as a part of 3.3.
Following are some possible use cases for the migrating the application datasets/streams:
Migrate the dataset X from namespace ‘NS1’ to namespace ‘NS2’. [NS1.X -> NS2.X]
Migrate the dataset X from ‘NS1’ to ‘NS2’ updating its name to Y. [NS1.X -> NS2.Y]
Make the copy of the dataset X in ‘NS1’. [NS1.X -> NS1.Y]
Once the application data is migrated, application will need to be deployed manually. If there are any changes in the dataset names, corresponding changes in the application are required. Migration of the application forms a baseline case for migrating the namespaces.
Migration of the HDFS directories:
CDAP stores FileSets and Streams in HDFS. We can figure out the filesets and the streams used in the namespace and can use distcp to migrate the corresponding HDFS files to the new cluster. Andreas: Can we preserve the file modification time? FileSets also support reading from the files which are managed by the external process. This allows accessing the existing locations outside of the CDAP. We will need to migrate those files as well, however doing so may cause conflicts if the file location already exists on the target cluster. Andreas: It is questionable whether that should be supported. If the files are managed by an outside process, then that process must start doing so in the new cluster, and that may be in a different location.
Migrating the user USER datasets:
User datasets (except FileSets) are stored as HBase tables. However we cannot directly copy those tables over to the target cluster. Every HBase cell can have multiple versions marked by the transaction-ids. These transaction-ids generated by the Transaction Manager on the old cluster may conflict with the transaction-ids generated on the new cluster. For e.g. Consider tx1 is the latest transaction id generated for the table on the old cluster. If we simply copy the table, then the copied table will also have transaction id as tx1. However the new cluster might have generated the transaction id tx1 and its invalidated. In this case, the copied data from the new table will not be visible on the new cluster.
Potential solutions:
Check if it is possible to inject CDAP HBase implementation in the CopyTable utility. Andreas: How does that help?
Utility | Description | Pros | Cons |
---|---|---|---|
CopyTable | CopyTable utility allows copying HBase tables across the clusters. |
|
|
Snapshots | Snapshot captures the state of the table. Using ExportSnapshot utility the snapshot can be copied to the destination cluster and then can be cloned to the table on the destination cluster. |
|
|
BulkLoad | Allows bulk loading of the HFiles. |
|
|
Custom MapReduce programs | The process would be -
|
|
|
Dataset Framework support | Dataset framework can support admin operations for exporting and importing the data. |
|
|
NOTE: So far, the custom MapReduce program seems promising solution, but this does not address the TTL. MapReduce program in source cluster will copy the HBase table data to the file. For the MapReduce program we can have DatasetInputFormat as the input format for the job. However using DatasetInputFormat has a limitation in which if the dataset used is custom dataset, we will need to pass the dataset jar as well. Another option is for platform to support the TableInputFormat which will simply read from the underlying HBase tables. (How to handle the datasets which are not BatchReadable?)
Overview of the process for migrating the HBase tables:
- User will need to provide the namespace name, app name, and dataset name as an input to the tool.
- Tool will then figure out the underlying HBase tables associated with the dataset by using the DatasetSpecifications.
- For each table associated with the dataset, tool will start the MapReduce program to read from the table and write to the HFiles.
In order to read the valid version from the HBase table, we will need to get the transaction from the Transaction Server.
Code Block // Instantiate the TransactionSystemClient private final TransactionSystemClient txClient; Injector injector = createInjector(CConfiguration.create(), HBaseConfiguration.create()); this.txClient = injector.getInstance(TransactionSystemClient.class); Transaction tx = txClient.startLong();
Transaction created in the above step will need to be pass to the Scan object while setting up the MapReduce job.
Code Block // Create instance of the TransactionCodec for encoding the transaction. TransactionCodec txCodec = new TransactionCodec(); // Configure the scan instance to be passed to the TableMapReduceUtil class with the transaction details Scan scan = new Scan(); scan.setAttribute("cask.tx", txCodec.encode(tx)); // Set up the Mapper job to use the scan we just created TableMapReduceUtil.initTableMapperJob(hbaseTableName, scan, Import.KeyValueImporter.class, null, null, job); // MapReduce job can then be configured to write to the HFiles using HFileOutputFormat HFileOutputFormat2.setOutputPath(job, bulkloadDir);
- Directory created after running the above MapReduce program will need to be manually copied to the destination cluster using distcp utility.
- On the destination cluster the HFiles can be loaded into the destination table using BulkUpload utility.
- Along with the tables we will also need to migrate the metadata associated with the datasets to the destination cluster.
CDAP stores FileSets and Streams in HDFS. We can figure out the filesets and the streams used in the namespace and can use distcp to migrate the corresponding HDFS files to the new cluster. Andreas: Can we preserve the file modification time? (SAGAR: distcp preserves the timestamp)
Andreas: What about partitioned file sets? How will the meta data be migrated? And how can the BatchPartitionConsumer's states be migrated? (SAGAR: TBD)
Solving namespace migration use cases with the application migration scheme (NOT FOR 3.3):
Migrate data from namespace ‘NS1’ on the old cluster to the namespace ‘NS2’ on the new cluster. ‘NS2’ can be on the same cluster or on the new cluster.
Code Block for each app : 'NS1'.applications for each dataset : app.datasets migrate the dataset either using distcp (if it is filesets) or using hbase table copy operation for each stream : app.streams migrate the stream files using distcp
Merging of the namespaces ‘NS1’ and ‘NS2’ into namespace ‘NS3’. Namespace ‘NS3’ can be on the same cluster or on the new cluster.
Code Block for each namespace 'NS' : Namespaces to merge for each app : 'NS'.applications-to-migrate for each dataset : app.datasets migrate the dataset either using distcp (if it is filesets) or using hbase table copy operation for each stream : app.streams migrate the stream files using distcp
Splitting of the namespace ‘NS1’ into multiple namespaces say ‘NS2’ and ‘NS3’. Namespaces ‘NS2’ and ‘NS3’ can exist on the existing cluster or on the new cluster.
Code Block // TBD