Overview
With the goal of making CDAP a first-class cloud citizen, CDAP 6.0 will remove its hard dependency on Hadoop and HBase. Going forward, CDAP is intended to run in environments with or without Hadoop. This requires CDAP to be agnostic of the storage engine used for its system data. One way of achieving this is to define an SPI layer for storage, and then refactor CDAP to use the SPI layer for all store operations on system data. The right storage provider implementation for the environment can then be injected at runtime, enabling CDAP to run on multiple storage environments.
...
There are two layers at which the datastore interactions can be abstracted -
- Dataset
In this approach, each dataset will have custom code for each storage backend. The advantage of this approach is that the dataset will get to use the raw APIs of the backend store, and hence can optimize heavily for performance.
However, the disadvantage is that this leads to logic duplication and can cause long-term maintenance issues. Also, adding a new storage backend will require reimplementing all the existing datasets.
- A layer below the dataset
An SPI will define the basic operations that can be performed on the store, such as create, write, scan and delete. A dataset will then use the SPI to implement its business logic. Since the SPI provides a single interface to the underlying store, only one implementation of the dataset is needed no matter how many storage backends exist. Adding a new storage backend then only needs a new implementation of the SPI; the existing dataset implementations do not have to change.
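The second approach can be sketched as follows. This is a minimal illustration, not the actual CDAP SPI: the names StoreSpi, InMemoryStore and CounterDataset are hypothetical, and keys and values are simplified to strings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical SPI: the only surface a dataset is allowed to touch.
interface StoreSpi {
  void write(String key, String value);
  Optional<String> read(String key);
  List<String> scan(String keyPrefix); // values of all keys with the prefix, in key order
  void delete(String key);
}

// One backend implementation. Another backend would implement the same interface.
final class InMemoryStore implements StoreSpi {
  private final TreeMap<String, String> rows = new TreeMap<>();

  @Override public void write(String key, String value) { rows.put(key, value); }

  @Override public Optional<String> read(String key) { return Optional.ofNullable(rows.get(key)); }

  @Override public List<String> scan(String keyPrefix) {
    List<String> result = new ArrayList<>();
    // Keys sharing a prefix are contiguous in sorted order, so stop at the first mismatch.
    for (Map.Entry<String, String> e : rows.tailMap(keyPrefix, true).entrySet()) {
      if (!e.getKey().startsWith(keyPrefix)) {
        break;
      }
      result.add(e.getValue());
    }
    return result;
  }

  @Override public void delete(String key) { rows.remove(key); }
}

// Dataset business logic, written once against the SPI.
final class CounterDataset {
  private final StoreSpi store;

  CounterDataset(StoreSpi store) { this.store = store; }

  long increment(String name) {
    long next = store.read("counter:" + name).map(Long::parseLong).orElse(0L) + 1;
    store.write("counter:" + name, Long.toString(next));
    return next;
  }
}
```

CounterDataset never references InMemoryStore directly, so supporting another backend in this sketch would mean adding one more StoreSpi implementation and nothing else.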
...
TableAdmin SPI will provide operations to administer a StructuredTable. The following operations will be supported -
Create
A create operation will create a logical StructuredTable entity. The schema of the table has to be specified during the create operation.
...
tableAdmin.create(tableId, spec);
Columns / Fields
The schema for the columns of a table is fixed and has to be specified during creation. The schema can be changed later during an upgrade process. A table can have multiple columns, and all columns are nullable.
In the above example, the table is defined to have columns – “k1” of type int, “k2” of type string, “k3” of type long, “c1” of type int, “c2” of type long and “c3” of type string. The following types are supported as column values – string, int, long, float and double.
Primary Key
A primary key has to be specified in the table specification during the creation. The primary key will be used to address a row in the table.
...
In the above example, the composite key <“k1”, “k2”, “k3”> is specified as the primary key.
Index
One or more columns to index can be specified during the table creation. An index can contain only one column. During any data update operations, the corresponding indexes will be automatically updated. Indexes are not supported on a column that is part of a primary key.
In the above example, an index on column “c1” and another index on column “c2” are created.
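The rules stated above for columns, primary key and indexes can be checked mechanically. The following is a sketch only: TableSpecCheck and its validate method are hypothetical, the specification is reduced to plain collections, and IllegalArgumentException is an assumed choice of exception.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;

final class TableSpecCheck {
  // Column types listed as supported in this document.
  static final Set<String> SUPPORTED_TYPES = Set.of("string", "int", "long", "float", "double");

  // Returns normally if the specification follows the rules, otherwise throws.
  static void validate(Map<String, String> columnTypes, List<String> primaryKey, List<String> indexes) {
    for (Map.Entry<String, String> column : columnTypes.entrySet()) {
      if (!SUPPORTED_TYPES.contains(column.getValue())) {
        throw new IllegalArgumentException("Unsupported type for column " + column.getKey());
      }
    }
    if (primaryKey.isEmpty()) {
      throw new IllegalArgumentException("A primary key is required");
    }
    for (String key : primaryKey) {
      if (!columnTypes.containsKey(key)) {
        throw new IllegalArgumentException("Unknown primary key column " + key);
      }
    }
    for (String indexed : indexes) {
      if (!columnTypes.containsKey(indexed)) {
        throw new IllegalArgumentException("Unknown index column " + indexed);
      }
      // Indexes are not supported on primary key columns.
      if (primaryKey.contains(indexed)) {
        throw new IllegalArgumentException("Index on primary key column " + indexed);
      }
    }
  }
}
```

With the columns from the example, a primary key of k1, k2, k3 and indexes on c1 and c2 pass this check, while an index on k1 is rejected.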
Implementation notes
All the table definitions will be applied once during the startup of the master process. This ensures that the process can fail fast if the tables cannot be created. It also makes writing upgrade logic simpler, because it eliminates possible concurrent changes to table definitions from different services.
A physical table in the underlying store may not get created for every create operation. For example, in HBase a single physical table can be used to store all the logical tables for a namespace. The rows of various tables are distinguished by using the table name as the row key prefix.
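A row key prefix of this kind could be built as in the sketch below. The RowKeys class is hypothetical, and the 0x00 separator after the table name is an assumption added here so that a table named "app" does not also match rows of a table named "apps". It presumes table names never contain a zero byte.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class RowKeys {
  // Prefix = UTF-8 table name followed by a single 0x00 separator byte.
  static byte[] tablePrefix(String tableName) {
    byte[] name = tableName.getBytes(StandardCharsets.UTF_8);
    return Arrays.copyOf(name, name.length + 1); // copyOf pads with 0x00
  }

  // Physical row key = table prefix + logical row key.
  static byte[] physicalKey(String tableName, byte[] logicalKey) {
    byte[] prefix = tablePrefix(tableName);
    byte[] key = Arrays.copyOf(prefix, prefix.length + logicalKey.length);
    System.arraycopy(logicalKey, 0, key, prefix.length, logicalKey.length);
    return key;
  }

  // True if the physical row key belongs to the given logical table.
  static boolean belongsTo(String tableName, byte[] physicalKey) {
    byte[] prefix = tablePrefix(tableName);
    return physicalKey.length >= prefix.length
        && Arrays.equals(Arrays.copyOf(physicalKey, prefix.length), prefix);
  }
}
```

Scanning one logical table then amounts to a prefix scan of the physical table using tablePrefix(tableName).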
Exists
The exists operation checks whether a StructuredTable has been created.
...
boolean exists = tableAdmin.exists(tableId);
Get Table
Get is used to obtain a StructuredTable instance.
...
The following data operations are supported by the SPI -
Write
A write operation allows the user to add or update columns in a table.
...
There may be cases where some keys have to be salted or reversed. The SPI does not support this; it will have to be done by the Datasets using the SPI.
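As an illustration of what a dataset might do before handing keys to the SPI, here is a sketch with a hypothetical KeyTransforms helper. The two-digit bucket prefix and the use of String.hashCode are assumptions made for the example, and the bucket count is presumed to be at most 100.

```java
final class KeyTransforms {
  // Salting: prefix the key with a bucket derived from its hash, so that
  // sequential keys are spread across the key space.
  static String salt(String key, int buckets) {
    int bucket = Math.floorMod(key.hashCode(), buckets);
    return String.format("%02d:%s", bucket, key);
  }

  // Strips the bucket prefix added by salt().
  static String unsalt(String saltedKey) {
    return saltedKey.substring(saltedKey.indexOf(':') + 1);
  }

  // Reversing: store the key back to front, so that keys sharing a common
  // leading part no longer sort next to each other.
  static String reverse(String key) {
    return new StringBuilder(key).reverse().toString();
  }
}
```

A salted key can only be scanned in its original order by querying every bucket and merging the results, which is one reason this logic sits in the dataset rather than in the SPI.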
Read / Scan
Data can be read from the table in a variety of ways – read a single row, scan a set of rows or scan rows using an index.
Read a single row
When the full composite key is specified, only one row corresponding to the key is returned.
...
The read method throws an exception if the full composite key is not specified. The schema is also checked during the read, and a NoSuchFieldException is thrown if there is a schema mismatch.
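The two checks could look like the sketch below. ReadCheck is a hypothetical helper with the example table's schema hard-coded. The document does not name the exception used for an incomplete key, so IllegalArgumentException is an assumption. NoSuchFieldException is the standard java.lang checked exception.

```java
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class ReadCheck {
  // Schema of the example table used in this document.
  static final List<String> PRIMARY_KEY = List.of("k1", "k2", "k3");
  static final Set<String> COLUMNS = Set.of("k1", "k2", "k3", "c1", "c2", "c3");

  // Validates a single-row read before it is sent to the store.
  static void validateRead(Map<String, Object> key, Collection<String> requestedColumns)
      throws NoSuchFieldException {
    // A single-row read needs a value for every primary key column.
    if (!key.keySet().equals(Set.copyOf(PRIMARY_KEY))) {
      throw new IllegalArgumentException(
          "Full primary key " + PRIMARY_KEY + " is required, got " + key.keySet());
    }
    // Schema check: every requested column must exist in the table.
    for (String column : requestedColumns) {
      if (!COLUMNS.contains(column)) {
        throw new NoSuchFieldException(column);
      }
    }
  }
}
```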
Scan a set of rows
When a partial key range is specified, multiple rows that match the partial key range are returned. The partial key has to be specified starting from the leftmost keys to the right – in the order defined in the table specification.
...
In addition, the scan can be restricted using a filter, and the columns that are returned can be explicitly specified if all the columns are not required.
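The leftmost-prefix rule for partial keys can be illustrated with a sorted map. This is a sketch under simplifying assumptions: PartialKeyScan is hypothetical, all key fields are strings, each row carries a single string payload, and filters and column projection are left out.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.TreeMap;

final class PartialKeyScan {
  // Composite keys are compared field by field, left to right.
  // A key that is a prefix of another sorts before it.
  static final Comparator<List<String>> KEY_ORDER = (a, b) -> {
    for (int i = 0; i < Math.min(a.size(), b.size()); i++) {
      int c = a.get(i).compareTo(b.get(i));
      if (c != 0) {
        return c;
      }
    }
    return Integer.compare(a.size(), b.size());
  };

  private final TreeMap<List<String>, String> rows = new TreeMap<>(KEY_ORDER);

  void write(List<String> fullKey, String payload) {
    rows.put(List.copyOf(fullKey), payload);
  }

  // Returns the keys of all rows whose key starts with the given leftmost fields.
  List<List<String>> scan(List<String> partialKey) {
    List<List<String>> result = new ArrayList<>();
    // Matching keys are contiguous and start at the partial key's position.
    for (List<String> key : rows.tailMap(partialKey, true).keySet()) {
      if (key.size() < partialKey.size()
          || !key.subList(0, partialKey.size()).equals(partialKey)) {
        break;
      }
      result.add(key);
    }
    return result;
  }
}
```

The scan is a single contiguous range only because the partial key is a leftmost prefix. Specifying just a middle or trailing key field would match rows scattered across the table.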
Scan with index
Rows can also be retrieved using an index. Only one index can be queried at any time in a scan operation.
...
In the above example, all rows whose column “c1” has the value “cv1” are returned from the table.
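Automatic index maintenance on write, together with a lookup by indexed value, can be sketched as below. IndexedTable is a hypothetical in-memory stand-in, not the SPI implementation. It supports one single-column index, and for brevity a write replaces the whole row rather than merging columns.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

final class IndexedTable {
  private final String indexedColumn;
  private final Map<String, Map<String, Object>> rows = new TreeMap<>();
  // Index: column value -> primary keys of the rows holding that value.
  private final Map<Object, Set<String>> index = new HashMap<>();

  IndexedTable(String indexedColumn) {
    this.indexedColumn = indexedColumn;
  }

  void write(String primaryKey, Map<String, Object> columns) {
    Map<String, Object> old = rows.put(primaryKey, new HashMap<>(columns));
    // Remove the stale index entry left by the previous version of the row.
    if (old != null && old.get(indexedColumn) != null) {
      Object oldValue = old.get(indexedColumn);
      Set<String> keys = index.get(oldValue);
      keys.remove(primaryKey);
      if (keys.isEmpty()) {
        index.remove(oldValue);
      }
    }
    // Add the index entry for the new value, if the column is set.
    Object value = columns.get(indexedColumn);
    if (value != null) {
      index.computeIfAbsent(value, v -> new TreeSet<>()).add(primaryKey);
    }
  }

  // Returns the primary keys of all rows whose indexed column equals the value.
  Set<String> scanByIndex(Object value) {
    return Collections.unmodifiableSet(index.getOrDefault(value, Collections.emptySet()));
  }
}
```

The caller only issues writes. Keeping the index consistent is entirely the table's job, which mirrors the behavior described for the SPI.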
Filters
Filters can be applied on the row keys, or on column values with simple data types. Filters on compound column values, such as a JSON string, are not supported. In such a case, the JSON field will have to be added as a new column to support filtering on it.
...
Filters.in("c2", Set.of("a", "b", "c"))
)
);
Sort
Sorting is supported on the composite key only. By default, all the rows are sorted on the composite key in ascending order. If a reverse sort is needed for a key (e.g., time), then the dataset will have to reverse the key.
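One common way for a dataset to get newest-first ordering from an ascending sort is to store an inverted timestamp as the key. The sketch below assumes non-negative millisecond timestamps. ReverseTimeKey is a hypothetical helper, and a TreeMap stands in for the ascending-sorted table.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

final class ReverseTimeKey {
  // Larger timestamps map to smaller keys, so an ascending scan yields newest first.
  // Applying invert twice returns the original timestamp.
  static long invert(long timestampMillis) {
    return Long.MAX_VALUE - timestampMillis;
  }

  // Stores events under inverted keys and reads them back in ascending key order.
  static List<String> newestFirst(long[] timestamps, String[] events) {
    TreeMap<Long, String> table = new TreeMap<>();
    for (int i = 0; i < timestamps.length; i++) {
      table.put(invert(timestamps[i]), events[i]);
    }
    return new ArrayList<>(table.values());
  }
}
```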
Delete
Delete operation is used to delete a single row using the composite key, or multiple rows using a partial composite key.
...
The Transactional object is created using a TransactionalFactory. The right implementation of the TransactionalFactory object is injected during runtime.
Transactions and Messaging
There are use cases where any change to the data of a Dataset has to be published as a message. Subscribers listen for the changes and take action based on them. Currently, it is possible to publish the messages transactionally since both the messaging service and the Datasets use the same storage and transaction system. Publishing the message as part of the same transaction that changes the data keeps everything consistent.
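The consistency property described here, where the data change and the message become visible together or not at all, is illustrated by the toy sketch below. TxSketch and TxBody are hypothetical and purely in-memory. They are not how the CDAP transaction system or messaging service work.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class TxSketch {
  final Map<String, String> data = new HashMap<>(); // committed dataset state
  final List<String> topic = new ArrayList<>();     // committed messages

  interface TxBody {
    void run(Map<String, String> pendingData, List<String> pendingMessages) throws Exception;
  }

  // Applies the data changes and the messages together, or neither if the body fails.
  boolean execute(TxBody body) {
    Map<String, String> pendingData = new HashMap<>();
    List<String> pendingMessages = new ArrayList<>();
    try {
      body.run(pendingData, pendingMessages);
    } catch (Exception e) {
      return false; // rollback: the buffered changes are simply dropped
    }
    data.putAll(pendingData);
    topic.addAll(pendingMessages);
    return true;
  }
}
```

If the data and the messages lived in separate systems without a shared transaction, a failure between the two commit steps could leave a data change without its message, or a message without its data change.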
...
Future work
Admin operations
Get Specification
This operation is used to fetch the specification of a table that is defined during a create or an upgrade operation.
Schema upgrade
Changing the schema of a table will be possible as long as the old and the new schemas are compatible.
...
The data upgrade will happen in the background, and there needs to be a way to track its progress. The dataset using the table will also need to be aware of the old and the new schema versions, and will need to work with both until the upgrade is complete. The details need to be fleshed out.
Delete
A delete operation will delete a table. Although the table may become invisible immediately, the data in the table may be deleted in the background.
...