Overview

With the goal of making CDAP a first-class cloud citizen, CDAP 6.0 will remove the hard dependency on Hadoop and HBase for its operation (CDAP Storage Simplification). Going forward, CDAP is intended to run in environments with or without Hadoop. This requires CDAP to be storage-engine agnostic when storing its system data. One way of achieving this is to define an SPI layer for storage, and then refactor CDAP to use the SPI layer for all store operations on system data. The right storage provider implementation for the environment can be injected at runtime, thus enabling CDAP to run on multiple storage environments.

...

The TableAdmin SPI will provide operations to administer a StructuredTable. The following operations will be supported:

Create

A create operation will create a logical StructuredTable entity. The schema of the table has to be specified during the create operation.

...

tableAdmin.create(tableId, spec);

Columns / Fields

The schema for the columns of a table is fixed and has to be specified during creation. The schema can be changed later during an upgrade process. A table can have multiple columns, and all columns are nullable.

In the above example, the table is defined to have columns – “k1” of type int, “k2” of type string, “k3” of type long, “c1” of type int, “c2” of type long and “c3” of type string. The following types are supported as column values – string, int, long, float and double.

Primary Key

A primary key has to be specified in the table specification during the creation. The primary key will be used to address a row in the table.

...

In the above example, the composite key <“k1”, “k2”, “k3”> is specified as the primary key.

Index

One or more columns to index can be specified during the table creation. An index can contain only one column. During any data update operations, the corresponding indexes will be automatically updated. Indexes are not supported on a column that is part of a primary key.

In the above example, an index on column “c1” and another index on column “c2” are created.
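The rules stated in this section for columns, primary keys, and indexes can be checked when a specification is built. The following is a minimal sketch, not the actual CDAP SPI: the class `TableSpecSketch`, its fields, and the choice of `IllegalArgumentException` are hypothetical and only illustrate the stated constraints.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration only; these names are not part of the CDAP SPI.
final class TableSpecSketch {
  // Column value types listed in this document.
  static final Set<String> SUPPORTED_TYPES = Set.of("string", "int", "long", "float", "double");

  final Map<String, String> columns; // column name -> type
  final List<String> primaryKey;     // ordered composite key
  final List<String> indexes;        // one column per index

  TableSpecSketch(Map<String, String> columns, List<String> primaryKey, List<String> indexes) {
    for (Map.Entry<String, String> column : columns.entrySet()) {
      if (!SUPPORTED_TYPES.contains(column.getValue())) {
        throw new IllegalArgumentException("Unsupported type for column " + column.getKey());
      }
    }
    if (primaryKey.isEmpty()) {
      throw new IllegalArgumentException("A primary key has to be specified");
    }
    for (String key : primaryKey) {
      if (!columns.containsKey(key)) {
        throw new IllegalArgumentException("Unknown primary key column " + key);
      }
    }
    for (String index : indexes) {
      if (!columns.containsKey(index)) {
        throw new IllegalArgumentException("Unknown index column " + index);
      }
      // Indexes are not supported on a column that is part of the primary key.
      if (primaryKey.contains(index)) {
        throw new IllegalArgumentException("Cannot index primary key column " + index);
      }
    }
    this.columns = new LinkedHashMap<>(columns);
    this.primaryKey = List.copyOf(primaryKey);
    this.indexes = List.copyOf(indexes);
  }

  // Builds the table described in the text: keys k1, k2, k3 and indexes on c1 and c2.
  static TableSpecSketch example() {
    Map<String, String> columns = new LinkedHashMap<>();
    columns.put("k1", "int");
    columns.put("k2", "string");
    columns.put("k3", "long");
    columns.put("c1", "int");
    columns.put("c2", "long");
    columns.put("c3", "string");
    return new TableSpecSketch(columns, List.of("k1", "k2", "k3"), List.of("c1", "c2"));
  }
}
```

Validating at construction time fits the fail-fast goal: an invalid specification is rejected before any table is created.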

Implementation notes

All the table definitions will be applied once during the startup of the master process. This ensures that the process can fail fast if the tables cannot be created. It also simplifies the upgrade logic, because different services can no longer make concurrent changes to table definitions.

A physical table in the underlying store may not get created for every create operation. For example, in HBase a single physical table can be used to store all the logical tables for a namespace. The rows of various tables are distinguished by using the table name as the row key prefix.
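The row key prefix scheme can be sketched as follows. This is an illustration under assumptions, not the HBase implementation: a sorted in-memory map stands in for the single physical table, and the class `PrefixedStoreSketch` and the `:` separator are hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: a sorted map stands in for one physical table.
final class PrefixedStoreSketch {
  // Assumed separator; logical table names must not contain it.
  private static final char SEPARATOR = ':';
  private final TreeMap<String, String> physicalTable = new TreeMap<>();

  void put(String logicalTable, String rowKey, String value) {
    physicalTable.put(logicalTable + SEPARATOR + rowKey, value);
  }

  String get(String logicalTable, String rowKey) {
    return physicalTable.get(logicalTable + SEPARATOR + rowKey);
  }

  // Returns only the rows of one logical table, with the prefix stripped.
  Map<String, String> scan(String logicalTable) {
    String prefix = logicalTable + SEPARATOR;
    Map<String, String> rows = new TreeMap<>();
    // Keys sharing a prefix are contiguous in sorted order, so stop at the first mismatch.
    for (Map.Entry<String, String> entry : physicalTable.tailMap(prefix, true).entrySet()) {
      if (!entry.getKey().startsWith(prefix)) {
        break;
      }
      rows.put(entry.getKey().substring(prefix.length()), entry.getValue());
    }
    return rows;
  }
}
```

Because all rows of a logical table share a prefix, a scan of one logical table becomes a single contiguous range scan of the physical table.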

Exists

The exists operation checks whether a StructuredTable has been created.

...

boolean exists = tableAdmin.exists(tableId);

Get Table

Get is used to obtain a StructuredTable instance.

...

The following data operations are supported by the SPI:

Write

A write operation allows the user to add or update columns in a table.

...

There may be cases where some keys have to be salted or reversed. The SPI does not support this; it will have to be done by the Datasets that use the SPI.
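As an illustration of what a Dataset might do before handing a key to the SPI, the sketch below salts a string key with a hash-derived bucket prefix. The class `KeySaltSketch`, the bucket count, and the `NN-` prefix format are assumptions made for this example and are not defined by the SPI.

```java
// Hypothetical helper a Dataset could apply before writing a key through the SPI.
final class KeySaltSketch {
  // Prefixes the key with a bucket number derived from its hash,
  // which spreads otherwise sequential keys across the key space.
  static String salt(String key, int buckets) {
    int bucket = Math.floorMod(key.hashCode(), buckets);
    return String.format("%02d-%s", bucket, key);
  }

  // Removes the bucket prefix added by salt().
  static String unsalt(String saltedKey) {
    return saltedKey.substring(saltedKey.indexOf('-') + 1);
  }
}
```

The trade-off is that salted keys no longer sort in their natural order, so a range scan has to be issued once per bucket and the results merged by the Dataset.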

Read / Scan

Data can be read from the table in a variety of ways – read a single row, scan a set of rows or scan rows using an index.

Read a single row

When the full composite key is specified, only one row corresponding to the key is returned.

...

The read method throws an exception if the full composite key is not specified. A schema check is done during the read, and a NoSuchFieldException is thrown if there is a schema mismatch.

Scan a set of rows

When a partial key range is specified, multiple rows that match the partial key range are returned. The partial key has to be specified starting from the leftmost keys to the right – in the order defined in the table specification.

...

In addition, the scan can be restricted using a filter, and the columns that are returned can be explicitly specified if all the columns are not required.
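The leftmost-prefix rule for partial keys can be sketched as below. This is a simplified illustration: the class `PartialKeyScanSketch` is hypothetical, rows are represented only by their composite key values in primary key order, and a linear pass replaces the range scan a real store would perform.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of matching rows against a partial composite key.
final class PartialKeyScanSketch {
  // Each row is its full composite key, with values in primary key order.
  static List<List<Object>> scan(List<List<Object>> rows, List<Object> partialKey) {
    List<List<Object>> matches = new ArrayList<>();
    for (List<Object> row : rows) {
      // The partial key must match the leading key values, left to right.
      if (partialKey.size() <= row.size()
          && row.subList(0, partialKey.size()).equals(partialKey)) {
        matches.add(row);
      }
    }
    return matches;
  }
}
```

With keys in the order <k1, k2, k3>, a partial key of (1, "a") matches every row whose k1 is 1 and k2 is "a", while a partial key that supplies only a k2 value matches nothing, because the leftmost key cannot be skipped.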

Scan with index

Rows can also be retrieved using an index. Only one index can be queried at any time in a scan operation.

...

In the above example, all rows whose column “c1” has the value “cv1” are returned from the table.
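One way to picture an index that is updated automatically on every write is the in-memory sketch below. It is an assumption-laden illustration rather than the SPI implementation: the class `IndexedTableSketch` is hypothetical, it supports a single indexed column, and all values are strings for brevity.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch of a table with one single-column index.
final class IndexedTableSketch {
  private final String indexedColumn;
  private final Map<String, Map<String, String>> rows = new HashMap<>(); // row key -> columns
  private final Map<String, Set<String>> index = new HashMap<>();        // indexed value -> row keys

  IndexedTableSketch(String indexedColumn) {
    this.indexedColumn = indexedColumn;
  }

  // Adds or updates columns of a row and keeps the index in step with the data.
  void write(String rowKey, Map<String, String> columns) {
    Map<String, String> row = rows.computeIfAbsent(rowKey, k -> new HashMap<>());
    String oldValue = row.get(indexedColumn);
    row.putAll(columns);
    String newValue = row.get(indexedColumn);
    if (oldValue != null && !oldValue.equals(newValue)) {
      index.get(oldValue).remove(rowKey); // drop the stale index entry
    }
    if (newValue != null) {
      index.computeIfAbsent(newValue, v -> new TreeSet<>()).add(rowKey);
    }
  }

  // Returns the keys of all rows whose indexed column equals the given value.
  Set<String> scanByIndex(String value) {
    return index.getOrDefault(value, Collections.emptySet());
  }
}
```

Updating the index inside the write path means callers never maintain index entries themselves, which matches the behaviour described for data update operations.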

Filters

Filters can be applied on the row keys, or on column values with simple data types. Filters on compound column values such as a JSON string are not supported. In such a case, the field of the JSON will have to be added as a new column to support filtering on that field.

...

    Filters.in("c2", Set.of("a", "b", "c"))
  )
);

Sort

Sorting is supported on the composite key only. By default, all the rows are sorted on the composite key in ascending order. If a reverse sort is needed for a key (e.g., time), the dataset will have to reverse the key.
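A common way for a dataset to reverse a time key is to store `Long.MAX_VALUE - timestamp`, so that the ascending key order yields the newest rows first. The sketch below assumes non-negative timestamps; the class `ReverseTimeKeySketch` is hypothetical, and a `TreeMap` stands in for a table sorted in ascending key order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical sketch of a dataset reversing a time key before writing it.
final class ReverseTimeKeySketch {
  // Larger (newer) timestamps map to smaller keys. Assumes timestamp >= 0.
  static long reverse(long timestamp) {
    return Long.MAX_VALUE - timestamp;
  }

  // Recovers the original timestamp from a reversed key.
  static long restore(long reversedKey) {
    return Long.MAX_VALUE - reversedKey;
  }

  // Stores timestamps under reversed keys and reads them back in ascending key order.
  static List<Long> newestFirst(List<Long> timestamps) {
    TreeMap<Long, Long> table = new TreeMap<>();
    for (long timestamp : timestamps) {
      table.put(reverse(timestamp), timestamp);
    }
    return new ArrayList<>(table.values());
  }
}
```

The store still sorts in ascending order only; the descending view is purely a consequence of how the dataset encodes the key.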

Delete

The delete operation deletes a single row using the full composite key, or multiple rows using a partial composite key.

...

The Transactional object is created using a TransactionalFactory. The right implementation of the TransactionalFactory object is injected during runtime.

Transactions and Messaging

There are use cases where any changes to the data of a Dataset have to be published as a message. Subscribers listen to the changes and take action based on them. Currently, it is possible to publish the messages transactionally, since both the messaging service and the Datasets use the same storage and transaction system. Publishing the message as part of the same transaction that changes the data keeps everything consistent.

...

Future work

Admin operations

Get Specification

This operation is used to fetch the specification of a table that is defined during a create or an upgrade operation.

Schema upgrade

Changing the schema of a table will be possible as long as the old and the new schemas are compatible.

...

The data upgrade will happen in the background, and there needs to be a way to track the progress of the upgrade. The dataset using the table will also need to be aware of the old and the new schema versions, and will need to work with both versions until the upgrade is complete. The details need to be fleshed out.

Delete

A delete operation will delete a table. Although the table may become invisible immediately, the data in the table may be deleted in the background.
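Since this is future work, the following is only one possible shape for a delete that hides the table at once and removes its data later. Everything here is an assumption: the class `DeferredDeleteSketch`, its methods, and the explicit `runCleanup()` call that stands in for a background job.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of deferred table deletion; not a committed design.
final class DeferredDeleteSketch {
  private final Map<String, List<String>> tables = new HashMap<>();    // visible tables -> rows
  private final List<List<String>> pendingCleanup = new ArrayList<>(); // data awaiting removal

  void create(String table, List<String> rows) {
    tables.put(table, new ArrayList<>(rows));
  }

  boolean exists(String table) {
    return tables.containsKey(table);
  }

  // The table becomes invisible immediately; its data is only queued for removal.
  void delete(String table) {
    List<String> rows = tables.remove(table);
    if (rows != null) {
      pendingCleanup.add(rows);
    }
  }

  // Number of rows still stored for tables that were already deleted.
  int pendingRows() {
    int count = 0;
    for (List<String> rows : pendingCleanup) {
      count += rows.size();
    }
    return count;
  }

  // Stands in for the background job that physically removes the data.
  void runCleanup() {
    pendingCleanup.clear();
  }
}
```

Separating visibility from physical removal keeps the delete call fast even for large tables, at the cost of storage being reclaimed later.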

...