CDAP Datastore SPI Design

Overview

With the goal of making CDAP a first-class cloud citizen, CDAP 6.0 will remove the hard dependency on Hadoop and HBase for its operation. Going forward, CDAP is intended to run in environments with or without Hadoop. This requires CDAP to become storage-engine agnostic for storing its system data. One way of achieving this is to define an SPI layer for storage, and then refactor CDAP to use the SPI layer for all store operations on system data. The right storage provider implementation for the environment can be injected at runtime, thus enabling CDAP to run on multiple storage environments.

In non-Hadoop environments, an RDBMS with a SQL interface is a good choice to replace HBase. A SQL-based store is readily available both in open source and from the various cloud providers. Also, a vast majority of users are familiar with operating a SQL-based store. This document discusses the design of the storage SPI with respect to abstracting HBase and a SQL-based store.

Level of Abstraction

There are two layers at which the datastore interactions can be abstracted -

  • Dataset

In this approach, each dataset will have custom code for each storage backend. The advantage of this approach is that the dataset will get to use the raw APIs of the backend store, and hence can optimize heavily for performance.

However, the disadvantage is that this leads to logic duplication and can cause long-term maintenance issues. Also, adding a new storage backend will require reimplementing all the existing datasets.

  • A layer below the dataset

An SPI will define the basic operations that can be performed on the store - like create, write, scan, delete, etc. A dataset will then use the SPI to implement its business logic. Since the SPI provides a single interface to the underlying store, there needs to be only one implementation of each dataset no matter how many storage backends there are. Adding a new storage backend now only requires a new implementation of the SPI; the existing dataset implementations do not have to change.

However, using a single interface to abstract multiple backends means that only features common across the backends can be exposed through the SPI. This can lead to less optimal usage of the underlying storage.

In the case of the CDAP Datastore for system data, since the amount of system data is not going to be significantly large (about 10-15 million rows across all Datasets), the need for heavy optimization is minimal. Minimizing the code duplication across the 20-odd datasets will provide better returns.

SPI Specification

The Datastore SPI will expose the following abstractions (a sketch of the corresponding interfaces follows the list) -

  1. StructuredTable is the basic abstraction that the SPI will expose. All the data operations that the SPI supports will be centered around the StructuredTable abstraction. A StructuredTable is a logical entity that contains rows and columns.

  2. A TableAdmin is used to perform admin operations on a StructuredTable instance.

  3. A Transactional abstraction will be used to execute the StructuredTable data operations in a transaction.
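
A minimal sketch of how these abstractions could look as Java interfaces is shown below. The method names and signatures here are assumptions for illustration only; the actual operations are described in the sections that follow.

// Illustrative sketch only - method names and signatures are assumptions,
// not the final SPI. Types such as Field, Row, Range and Filter are
// described later in this document.
public interface StructuredTable {
  void write(Collection<Field> fields) throws NoSuchFieldException;
  Optional<Row> read(Collection<Field> keys, Collection<String> columns) throws NoSuchFieldException;
  CloseableIterator<Row> scan(Range<Field> range, int limit, Filter filter);
  void delete(Collection<Field> keys);
}

public interface TableAdmin {
  void create(TableId tableId, TableSpecification spec) throws IOException;
  boolean exists(TableId tableId);
  StructuredTable get(TableId tableId);
}

public interface Transactional {
  void execute(TxRunnable runnable) throws TransactionException;
}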

Admin operations

The TableAdmin SPI will provide operations to administer a StructuredTable. The following operations will be supported -

Create

A create operation will create a logical StructuredTable entity. The schema of the table has to be specified during the create operation.

TableSpecification spec = new TableSpecificationBuilder()
                          .withFields(
                             Fields.intType("k1"),
                             Fields.stringType("k2"),
                             Fields.longType("k3"),
                             Fields.intType("c1"),
                             Fields.longType("c2"),
                             Fields.stringType("c3"))
                          .withPrimaryKeys("k1", "k2", "k3")
                          .withIndexes("c1", "c2")
                          .build();
TableId tableId = TableId.of("table");
tableAdmin.create(tableId, spec);

Columns / Fields

The schema for the columns of a table is fixed and has to be specified during creation. The schema can be changed later during an upgrade process. A table can have multiple columns, and all columns are nullable.

In the above example, the table is defined to have columns – “k1” of type int, “k2” of type string, “k3” of type long,  “c1” of type int, “c2” of type long and “c3” of type string. The following types are supported as column values – string, int, long, float and double.

Primary Key

A primary key has to be specified in the table specification during the creation. The primary key will be used to address a row in the table.

A primary key can be composed of multiple keys. All the keys of a composite key are non-nullable and must be specified during writes. When a partial composite key is specified during a scan, the leftmost keys of the composite key always have to be specified; keys can only be dropped from the right. For example, for the composite key <”k1”, “k2”, “k3”>, the partial keys <”k1”> and <”k1”, “k2”> are valid, but <”k2”> or <”k1”, “k3”> are not. The order of the composite key is as defined in the table specification.

The following types can be used to compose a primary key – string, int and long.

In the above example, the composite key <”k1”, “k2”, “k3”> is specified as the primary key.

Index

One or more columns to index can be specified during the table creation. An index can contain only one column. During any data update operations, the corresponding indexes will be automatically updated. Indexes are not supported on a column that is part of a primary key.

In the above example, an index on column “c1” and another index on column “c2” are created.

Implementation notes

All the table definitions will be applied once during the startup of the master process. This ensures that the process can fail fast if the tables cannot be created. It also makes writing upgrade logic simpler by eliminating possible concurrent changes to table definitions from different services.

A physical table in the underlying store may not get created for every create operation. For example, in HBase a single physical table can be used to store all the logical tables for a namespace. The rows of various tables are distinguished by using the table name as the row key prefix.
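
For illustration, the sketch below shows one way a shared physical table could compose its row keys; the helper method and the separator are assumptions, not part of the SPI.

// Sketch only: one possible way to build a physical row key in a shared table.
// The logical table name is prefixed to the encoded primary key so that rows
// of different logical tables do not collide.
byte[] buildRowKey(String tableName, byte[] encodedPrimaryKey) {
  byte[] prefix = (tableName + ":").getBytes(StandardCharsets.UTF_8); // hypothetical separator
  byte[] rowKey = new byte[prefix.length + encodedPrimaryKey.length];
  System.arraycopy(prefix, 0, rowKey, 0, prefix.length);
  System.arraycopy(encodedPrimaryKey, 0, rowKey, prefix.length, encodedPrimaryKey.length);
  return rowKey;
}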

Exists

The exists operation allows checking whether a StructuredTable has been created.

Note that this may return true even when there is no underlying physical table in the backend storage. This can happen when a single underlying table is shared between multiple StructuredTable instances.

boolean exists = tableAdmin.exists(tableId);

Get Table

Get is used to obtain a StructuredTable instance.

StructuredTable table = tableAdmin.get(tableId);

Data operations

All data operations will be performed on the StructuredTable SPI. All data operations have to be done inside a transaction; the Transactions section below provides details on using transactions.

The following data operations are supported by the SPI -

Write

A write operation allows the user to add or update columns in a table.

StructuredTable appMeta = tableContext.get(tableId);
Collection<Field> fields = Arrays.asList(Fields.of("k1", 10),
                    Fields.of("k2", "kv2"), Fields.of("k3", 1000L),
                    Fields.of("c1", 11));
appMeta.write(fields);

In the above example, the write operation writes the value 11 to the column “c1” of a single row addressed by the full composite key <“k1”, “k2”, “k3”>. This method will throw an exception if the full key is not given.

There are not many use cases that write to more than one row in a single write call. Hence, writing to multiple rows in a single call will not be supported as part of the SPI; this will have to be done as a combination of scan and write, as sketched below.
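
For example, a bulk update can be expressed as a scan followed by a write per matching row, all within one transaction. This is a minimal sketch; the getKeyFields accessor on Row is a hypothetical helper, not part of the SPI.

// Sketch only: updating multiple rows as a combination of scan and write.
try (CloseableIterator<Row> rows = appMeta.scan(range, limit, filter)) {
  while (rows.hasNext()) {
    Row row = rows.next();
    Collection<Field> fields = new ArrayList<>(row.getKeyFields()); // hypothetical accessor
    fields.add(Fields.of("c1", 11));  // overwrite one column for every matching row
    appMeta.write(fields);
  }
}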

A schema check is done during a write; a NoSuchFieldException will be thrown if there is a schema mismatch.

There may be cases where some keys have to be salted or reversed. The SPI does not have support for this; it will have to be done by the Datasets using the SPI.

Read / Scan

Data can be read from the table in a variety of ways – read a single row, scan a set of rows or scan rows using an index.

Read a single row

When the full composite key is specified, only one row corresponding to the key is returned.

StructuredTable appMeta = tableContext.get(tableId);
Collection<Field> keys = Arrays.asList(Fields.of("k1", 100),
                    Fields.of("k2", "kv2"), Fields.of("k3", 1000L));
Collection<String> cols = Arrays.asList("c1");
Optional<Row> row = appMeta.read(keys, cols);

In the above example, a single row with the given row key is returned. The columns to return can be specified by using the second parameter of the read method.

The read method will throw an exception if the full composite key is not specified. A schema check is done during a read; a NoSuchFieldException will be thrown if there is a schema mismatch.

Scan a set of rows

When a partial key range is specified, multiple rows that match the partial key range are returned. The partial key has to be specified starting from the leftmost keys to the right – in the order defined in the table specification.

StructuredTable appMeta = tableContext.get(tableId);
Range<Field> range = 
  Range.of(
   Arrays.asList(Fields.of("k1", 100), Fields.of("k2", "kv2")),
     INCLUSIVE,
   Arrays.asList(Fields.of("k1", 200), Fields.of("k2", "kv3")),
     EXCLUSIVE
  );
Filter filter = // see the Filters section below for the filter syntax
int limit = 100;
CloseableIterator<Row> rows = appMeta.scan(range, limit, filter);

In the above example, the table is scanned with the partial key <”k1”, “k2”>. The rows whose keys match the range are returned. In the above scan, the lower bound of the range is included and the upper bound is excluded from the match.

In addition, the scan can be restricted using a filter, and the columns to return can be explicitly specified if not all the columns are required.

Scan with index

Rows can also be retrieved using an index. Only one index can be queried at any time in a scan operation.

StructuredTable appMeta = tableContext.get(tableId);
Field col = Fields.of("c1", 11);
CloseableIterator<Row> rows = appMeta.scan(col, limit, filter);

In the above example, all rows that have the value 11 in column “c1” are returned from the table.

Filters

Filters can be applied on the row keys, or on column values with simple data types. Filters on compound column values like a JSON string are not supported. In such cases, the relevant field of the JSON will have to be added as a new column to support filtering on it.

The following filters are supported in a scan -

  1. Equality

  2. Range

  3. Not

  4. Set contains

  5. Boolean combination of the above


Any other filters not in the above list will have to be implemented as a client-side filter; a sketch is shown after the filter example below.

Filters can be specified as below -

// k1 > 100 AND (c3 != "abc" OR c1 in (1, 2, 3))
Filters.and(
  Filters.gt("k1", 100),
  Filters.or(
    Filters.eq("c3", "abc").negate(),
    Filters.in("c1", Set.of(1, 2, 3))
  )
);
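
For predicates outside the supported set, the dataset can filter the scan results itself. A minimal sketch, assuming a hypothetical getString accessor on Row:

// Sketch only: client-side filtering for a predicate the SPI cannot express,
// e.g. a regular expression match on a string column.
List<Row> matches = new ArrayList<>();
try (CloseableIterator<Row> rows = appMeta.scan(range, limit, filter)) {
  while (rows.hasNext()) {
    Row row = rows.next();
    String c3 = row.getString("c3"); // hypothetical accessor
    if (c3 != null && c3.matches("app-.*")) {
      matches.add(row);
    }
  }
}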

Sort

Sorting is supported on the composite key only. By default, all the rows are sorted on the composite key in ascending order. In case there is a need for a reverse sort on a key (e.g., time), the dataset will have to reverse the key, as sketched below.
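
A minimal sketch of the key reversal, assuming the dataset stores an inverted timestamp in the long key column “k3”:

// Sketch only: store an inverted timestamp so that the default ascending sort
// on the composite key returns the newest rows first.
long invertedTs = Long.MAX_VALUE - System.currentTimeMillis();
appMeta.write(Arrays.asList(
    Fields.of("k1", 100),
    Fields.of("k2", "kv2"),
    Fields.of("k3", invertedTs)));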

Delete

The delete operation is used to delete a single row using the full composite key, or multiple rows using a partial composite key.

StructuredTable appMeta = tableContext.get(tableId);
Collection<Field> key = Arrays.asList(Fields.of("k1", 100),
                         Fields.of("k2", "kv2"), Fields.of("k3", 10L));
appMeta.delete(key);

In the above example, the delete operation uses the full key to delete a single row. This method will throw an exception if the full key is not given.

The deleteAll operation uses a partial key to delete all the rows that match the partial key. The deleteAll operation may have to delete the rows in batches.

TODO: define deleteAll interface.

Transactions

Apache Tephra, which is used to implement transactions on HBase, provides optimistic concurrency control using snapshot isolation. Similar transactional behavior on an RDBMS can be achieved using the Repeatable Read isolation level.
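
For illustration, a SQL-backed implementation could run each transaction at the Repeatable Read isolation level using plain JDBC, as sketched below; the dataSource is an assumption, and connection pooling and retries are omitted.

// Sketch only: executing a unit of work at Repeatable Read isolation over JDBC.
try (Connection conn = dataSource.getConnection()) {
  conn.setTransactionIsolation(Connection.TRANSACTION_REPEATABLE_READ);
  conn.setAutoCommit(false);
  try {
    // ... perform the StructuredTable operations using this connection ...
    conn.commit();
  } catch (Exception e) {
    conn.rollback();
    throw e;
  }
}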

An interface similar to the existing Transactional and TxRunnable interfaces can be used -

transactional.execute(
  new TxRunnable() {
    @Override
    public void run(TableContext context) throws Exception {
      StructuredTable appMeta = context.get(tableId);
      appMeta.write(...);

      StructuredTable runs = context.get(runsTableId);
      runs.scan(...);
    }
  }
);

In the above example, a transaction is started before the run() method of TxRunnable is executed. After the run method completes, the transaction is committed. If an exception is thrown in the run method, the transaction is rolled back.

The Transactional object is created using a TransactionalFactory. The right implementation of the TransactionalFactory is injected at runtime.

Transactions and Messaging

There are use cases where any change to the data of a Dataset has to be published as a message. Subscribers listen to the changes and take action based on them. Currently, it is possible to publish the messages transactionally since both the messaging service and the Datasets use the same storage and transaction system. Publishing the message as part of the same transaction that changes the data keeps everything consistent.

With the new architecture, where different storage engines can be used for the messaging service and the Datasets, the same guarantees cannot be preserved out of the box.

TODO: define new semantics for transactions and messaging.

Implementation of SPI

There will be the following implementations of the SPI -

  1. LevelDB

  2. HBase

  3. PostgreSQL


The right implementation of the SPI will be loaded using AbstractExtensionLoader.

Future work

Admin operations

Get Specification

This operation is used to fetch the specification of a table that is defined during a create or an upgrade operation.

Schema upgrade

Changing the schema of a table will be possible as long as the old and the new schemas are compatible.

Schema upgrade also may need data upgrade in some cases -

  1. Adding an index

  2. Creating a new column from a JSON field of an existing column


The data upgrade will happen in the background, and there needs to be a way to track the progress of the upgrade. This will also need the dataset using the table to be aware of the old and the new schema versions. The dataset will need to work with both schema versions until the upgrade is complete. The details need to be fleshed out.

Delete

A delete operation will delete a table. Although the table may become invisible immediately, the data in the table may be deleted in the background.