Overview
With the goal of making CDAP a first-class cloud citizen, CDAP 6.0 will remove the hard dependency on Hadoop and HBase for its operation (CDAP Storage Simplification). Going forward, CDAP is intended to run in environments with or without Hadoop. This requires that CDAP become storage-engine agnostic when storing CDAP system data. One way of achieving this is to define an SPI layer for storage, and then refactor CDAP to use the SPI layer for all store operations on system data. The right storage provider implementation for the environment can be injected at runtime, thus enabling CDAP to run on multiple storage environments.
...
The TableAdmin SPI will provide operations to administer a StructuredTable. The following operations will be supported -
Create
A create operation will create a logical StructuredTable entity. The schema of the table has to be specified during the create operation.
TableSpecification spec = new TableSpecificationBuilder()
  .withFields(
    Fields.intType("k1"),
    Fields.stringType("k2"),
    Fields.longType("k3"),
    Fields.stringType("c1"),
    Fields.longType("c2"),
    Fields.stringType("c3"))
  .withPrimaryKeys("k1", "k2", "k3")
  .withIndexes("c1", "c2")
  .build();
TableId tableId = TableId.of("table");
tableAdmin.create(tableId, spec);
Columns / Fields
The schema for the columns of a table is fixed and has to be specified during creation. The schema can be changed later during an upgrade process. A table can have multiple columns, and all columns are nullable.
In the above example, the table is defined to have columns – "k1" of type int, "k2" of type string, "k3" of type long, "c1" of type string, "c2" of type long and "c3" of type string. The following types are supported as column values – string, int, long, float and double.
Primary Key
A primary key has to be specified in the table specification during the creation. The primary key will be used to address a row in the table.
...
In the above example, the composite key <"k1", "k2", "k3"> is specified as the primary key.
Index
One or more columns to index can be specified during the table creation. An index can contain only one column. During any data update operations, the corresponding indexes will be automatically updated. Indexes are not supported on a column that is part of a primary key.
In the above example, an index on column “c1” and another index on column “c2” are created.
Implementation notes
All the table definitions will be applied once during the startup of the master process. This ensures that the process can fail fast if the tables cannot be created. It also makes writing upgrade logic simpler by eliminating possible concurrent changes to table definitions from different services.
A physical table in the underlying store may not get created for every create operation. For example, in HBase a single physical table can be used to store all the logical tables for a namespace. The rows of various tables are distinguished by using the table name as the row key prefix.
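As an illustration, the following is a minimal sketch of how an HBase-backed implementation might compose such a prefixed row key; the helper method, TableId.getName() and the use of Guava's Bytes.concat are assumptions, not part of the SPI -
// Hypothetical helper in an HBase-backed implementation: prefix every row key
// with the logical table name so that many logical tables can share one
// physical table. Bytes.concat is from com.google.common.primitives.Bytes.
private byte[] toPhysicalRowKey(TableId tableId, byte[] logicalRowKey) {
  byte[] prefix = (tableId.getName() + ":").getBytes(StandardCharsets.UTF_8);
  return Bytes.concat(prefix, logicalRowKey);
}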
Exists
The exists operation allows checking whether a StructuredTable has been created or not.
Note that this may return true even when there is no underlying physical table in the backend storage. This can happen when a single underlying table is shared between multiple StructuredTable instances.
boolean exists = tableAdmin.exists(tableId);
Get Table
Get is used to obtain a StructuredTable instance.
StructuredTable table = tableAdmin.get(tableId);
Data operations
All data operations will be performed on the StructuredTable SPI. All the data operations have to be done inside a transaction. The next section provides details on using transactions.
The following data operations are supported by the SPI -
Write
A write operation allows the user to add or update columns in a table.
StructuredTable appMeta = tableContext.get(tableId);
Collection<Field> fields = Arrays.asList(
  Fields.of("k1", 10), Fields.of("k2", "kv2"), Fields.of("k3", 1000L),
  Fields.of("c1", "cv1"));
appMeta.write(fields);
In the above example, the write operation writes a value of “cv1” to the column “c1” of a single row addressed by the full composite key <“k1”, “k2”, “k3”>. This method will throw an exception if the full key is not given.
...
A schema check is done during a write; a NoSuchFieldException will be thrown if there is a schema mismatch.
There may be cases where some keys have to be salted or reversed. The SPI does not have support for this; it will have to be done by the Datasets using the SPI, as sketched below.
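For example, a dataset that needs to spread writes across regions could salt the key itself before calling the SPI. A minimal sketch, where the 16 salt buckets and the key layout are assumptions for illustration -
// Dataset-side salting: the salt is computed and prepended by the dataset,
// not by the SPI. The number of buckets (16) is arbitrary.
String runId = "run1234";
String saltedKey = Math.floorMod(runId.hashCode(), 16) + ":" + runId;
Collection<Field> keys = Arrays.asList(
  Fields.of("k1", 100), Fields.of("k2", saltedKey), Fields.of("k3", 1000L));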
Read / Scan
Data can be read from the table in a variety of ways – read a single row, scan a set of rows or scan rows using an index.
Read a single row
When the full composite key is specified, only one row corresponding to the key is returned.
StructuredTable appMeta = tableContext.get(tableId);
Collection<Field> keys = Arrays.asList(
  Fields.of("k1", 100), Fields.of("k2", "kv2"), Fields.of("k3", 1000L));
Collection<String> cols = Arrays.asList("c1");
Optional<Row> row = appMeta.read(keys, cols);
In the above example, a single row with the given row key is returned. The columns to return can be specified by using the second parameter of the read method.
The read method will throw an exception if the full composite key is not specified. A schema check is done during a read; a NoSuchFieldException will be thrown if there is a schema mismatch.
Scan a set of rows
When a partial key range is specified, multiple rows that match the partial key range are returned. The partial key has to be specified starting from the leftmost keys to the right – in the order defined in the table specification.
StructuredTable appMeta = tableContext.get(tableId);
Range range = Range.of(
  Arrays.asList(Fields.of("k1", 100), Fields.of("k2", "kv2")),
  Range.Bound.INCLUSIVE,
  Arrays.asList(Fields.of("k1", 200), Fields.of("k2", "kv3")),
  Range.Bound.EXCLUSIVE);
Filter filter = ...; // see the Filters section below for the syntax
int limit = 100;
CloseableIterator<Row> rows = appMeta.scan(range, limit, filter);
In the above example, the table is scanned with the partial key <"k1", "k2">. The rows whose keys fall within the range are returned. In the above scan, the lower bound of the range is included and the upper bound is excluded from the match.
In addition, the scan can be restricted using a filter, and the columns that are returned can be explicitly specified if all the columns are not required.
Scan with index
Rows can also be retrieved using an index. Only one index can be queried at any time in a scan operation.
StructuredTable appMeta = tableContext.get(tableId);
Field col = Fields.of("c1", "cv1");
int limit = 100;
Filter filter = ...; // see the Filters section below for the syntax
CloseableIterator<Row> rows = appMeta.scan(col, limit, filter);
In the above example, all rows that have the value "cv1" in column "c1" are returned from the table.
Filters
Filters can be applied on the row keys, or on column values with simple data types. Filters on compound column values like a JSON string are not supported. In such a case, the required field of the JSON will have to be added as a new column to support filtering on it, as shown below.
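For example, using the table defined earlier, a dataset that stores a JSON blob in column "c3" and needs to filter on its "status" field would also write that field to its own column (a hypothetical illustration) -
// The JSON blob itself cannot be filtered on, so the "status" field inside it
// is also written to its own column ("c1"), which is filterable and indexed.
String json = "{\"name\":\"app1\",\"status\":\"RUNNING\"}";
Collection<Field> fields = Arrays.asList(
  Fields.of("k1", 100), Fields.of("k2", "kv2"), Fields.of("k3", 1000L),
  Fields.of("c3", json),        // full JSON blob, not filterable
  Fields.of("c1", "RUNNING"));  // promoted field, filterable
appMeta.write(fields);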
...
Filters can be specified as below -
// k1 > 100 AND (c1 != "abc" OR c2 IN (100, 200, 300))
Filters.and(
  Filters.gt("k1", 100),
  Filters.or(
    Filters.eq("c1", "abc").negate(),
    Filters.in("c2", Set.of(100L, 200L, 300L))
  )
);
Sort
Sorting is supported on the composite key only. By default, all the rows are sorted on the composite key in ascending order. In case there is a need for a reverse sort on a key (e.g., time), the dataset will have to reverse the key itself, as sketched below.
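For example, a dataset that needs the newest rows first on a time-based key can store the inverted timestamp, so that an ascending scan of the key returns rows in descending time order. A minimal sketch -
// The inverted timestamp goes into the long key column; ascending order of
// the stored value corresponds to descending order of the actual time.
long timestamp = System.currentTimeMillis();
Collection<Field> keys = Arrays.asList(
  Fields.of("k1", 100), Fields.of("k2", "kv2"),
  Fields.of("k3", Long.MAX_VALUE - timestamp));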
Delete
The delete operation is used to delete a single row using the full composite key, or multiple rows using a partial composite key.
StructuredTable appMeta = tableContext.get(tableId);
Collection<Field> key = Arrays.asList(
  Fields.of("k1", 100), Fields.of("k2", "kv2"), Fields.of("k3", 10L));
appMeta.delete(key);
In the above example, the delete operation uses the full key to delete a single row. This method will throw an exception if the full key is not given.
The deleteAll operation uses a partial key to delete all the rows that match the partial key. The deleteAll operation may have to delete the rows in batches.
TODO: define deleteAll interface.
Transactions
Apache Tephra, which is used to implement transactions on HBase, provides Optimistic Concurrency Control using Snapshot Isolation. Similar transactional behavior on an RDBMS can be achieved using the Repeatable Read isolation level, as sketched below.
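For reference, this is how the isolation level would be set through plain JDBC; a sketch assuming the RDBMS-backed implementation manages its own connections (the dataSource is an assumption) -
// Run the statements backing one transaction at Repeatable Read isolation.
Connection connection = dataSource.getConnection();
connection.setAutoCommit(false);
connection.setTransactionIsolation(Connection.TRANSACTION_REPEATABLE_READ);
try {
  // ... execute the statements backing the StructuredTable operations ...
  connection.commit();
} catch (SQLException e) {
  connection.rollback();
  throw e;
}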
An interface similar to the existing Transactional and TxRunnable interfaces can be used -
transactional.execute(
  new TxRunnable() {
    @Override
    public void run(TableContext context) throws Exception {
      StructuredTable appMeta = context.get(tableId);
      appMeta.write(...);
      StructuredTable runs = context.get(runsTableId);
      runs.scan(...);
    }
  }
);
In the above example, a transaction is started before the run() method of TxRunnable is executed. After the run method is complete, the transaction is committed. If an exception is thrown in the run method then the transaction will be rolled back.
The Transactional object is created using a TransactionalFactory. The right implementation of the TransactionalFactory is injected at runtime, as sketched below.
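A minimal sketch of such an injection using Guice, which CDAP already uses elsewhere; the module name, the implementation class and the create() method are hypothetical -
// Hypothetical Guice module that binds the factory implementation matching
// the runtime environment (e.g., an RDBMS-backed one).
public class StorageModule extends AbstractModule {
  @Override
  protected void configure() {
    bind(TransactionalFactory.class).to(SqlTransactionalFactory.class);
  }
}

// The Transactional is then obtained through the injected factory.
Transactional transactional =
  injector.getInstance(TransactionalFactory.class).create();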
Transactions and Messaging
There are use cases where any changes to the data of a Dataset have to be published as messages. Subscribers listen to the changes and take action based on them. Currently, it is possible to publish the messages transactionally since both the messaging service and the Datasets use the same storage and transaction system. Publishing the message as part of the same transaction that also changes the data keeps everything consistent.
With the new architecture, where different storage engines can be used for messaging service and the Datasets, the same guarantees cannot be preserved out of the box.
TODO: define new semantics for transactions and messaging.
Implementation of SPI
There will be the following implementations of the SPI -
...
Future work
Admin operations
Get Specification
This operation is used to fetch the specification of a table that is defined during a create or an upgrade operation.
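A possible shape of the operation; the method name is hypothetical since the interface is yet to be defined -
TableSpecification spec = tableAdmin.getSpecification(tableId);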
Schema upgrade
Changing the schema of a table will be possible as long as the old and the new schemas are compatible.
...
The data upgrade will happen in the background, and there needs to be a way to track the progress of the upgrade. This will also require the dataset using the table to be aware of the old and the new schema versions. The dataset will need to work with both schema versions until the upgrade is complete. The details need to be fleshed out.
Delete
A delete operation will delete a table. Although the table may become invisible immediately, the data in the table may be deleted in the background.
...