Decoupling CDAP System Storage from Hadoop

Author: Andreas Neumann
Created: October 2018
Last updated: November 2018
Status: Draft

Tl;dr

This document discusses the motivation for decoupling the CDAP system from Hadoop storage and Tephra transactions. It classifies all CDAP system storage into different types based on common access patterns, and suggests possible alternative implementations.

Motivation

Traditionally (up to versions 5.x), CDAP has been squarely built on top of the Hadoop stack. It depends tightly on HDFS for storing application and plugin artifacts, on HBase for storing and indexing its metadata, and on both of these for user-defined datasets. For HBase tables, a sophisticated (and quite complex) transaction system, Apache Tephra, provides ACID properties.

Recently, CDAP has seen more and more use with data outside the Hadoop ecosystem. For example, CDAP pipelines have been used to ingest data from a mainframe and write to a relational database. In such use cases, Hadoop is the platform on which CDAP and its pipelines execute. However, from the user’s perspective there is no need to have Hadoop for any purpose other than running CDAP.

Hadoop itself has proven to be operationally expensive and hard to manage; its footprint is large, and many users of Hadoop have moved on to Spark as the compute engine of choice - indeed, CDAP pipelines can run equally well in Spark as in Hadoop. This raises the question of whether Hadoop should be required for CDAP at all. In environments where Hadoop is not installed, the hurdle to adoption is significant.

One such environment is the Cloud: Even though major Cloud providers support Hadoop (for example, Google’s Dataproc and Amazon’s EMR), the preferred mode of operation is to spin up a short-lived Hadoop cluster to run some workloads over data in Cloud storage, and then tear the cluster down. In version 5.0, CDAP started supporting such scenarios. The user experience, however, is sub-optimal, because the user still needs to operate a full-blown Hadoop cluster to run CDAP.

To become a first-class Cloud citizen, CDAP needs to be able to operate without Hadoop. Decoupling from Hadoop involves many aspects, including storage, execution runtime, security and coordination. This document focuses on the decoupling of CDAP from Hadoop storage, while at the same time simplifying the implementation and reducing complexity of operation.

HBase and Transactions

One of the main sources of operational complexity in CDAP is HBase. All CDAP system datasets are based either on HBase tables or HDFS files, or a combination of both. While HDFS is “just” a file system, CDAP’s HBase tables are trickier: Most of them are transactional tables, by means of Apache Tephra, which provides ACID semantics for HBase using multi-version optimistic concurrency control (MV/OCC). While ACID is helpful in ensuring the consistency of the system metadata, Tephra is operationally difficult:

  • It relies strongly on HBase Coprocessors, which are a plugin mechanism to hook into the RPC endpoints of HBase. Coprocessors run inside HBase’s region servers without proper isolation, and any failure in a coprocessor can bring down the entire server. This happens particularly after an HBase upgrade: Coprocessor APIs are considered “internal” and do not have the strict compatibility guarantees of semantic versioning. In fact, these APIs have historically changed in an incompatible way with almost every HBase release. The consequence is that after an upgrade, the region servers cannot load the coprocessors and fail, making the HBase tables unavailable until manual intervention upgrades the coprocessors to a compatible version. This has proven to be one of the most error-prone processes in CDAP operations.

  • The need for coprocessors also makes it hard to port Tephra to other database systems. Tephra would require a redesign in order to use it with, for example, Google’s BigTable.

  • Under failure conditions, Tephra generates “invalid” transactions. These are transactions that have failed but could not be rolled back, hence their “dirty” writes remain in HBase and must be filtered from all subsequent reads. In combination with bad application code that consistently causes these failure scenarios, Tephra can accumulate a significant number of invalid transactions (hundreds of thousands), to a degree that seriously impacts the performance of data operations. Tephra has a way to “prune” these invalid transactions, but it is operationally complex and prone to failure.

For these reasons, decoupling CDAP from Hadoop as a storage medium should also remove the dependency on Apache Tephra, at least for system data: Is there a more lightweight approach for keeping the system metadata consistent? The answer depends on the characteristics of each of the different types of system metadata and how they are accessed.

Explore

CDAP allows exploring user datasets using SQL. This is implemented by running a Hive Server in one of CDAP’s system containers (the Explore Service). When CDAP runs outside of Hadoop, Hive is not available either.

Note that a pipeline might still write to a sink that supports SQL. For example, in Google Cloud, any data stored in GCS or BigTable can be used with BigQuery. There is a possibility that a Dataset will still be able to register itself as an “external table” in BigQuery. However, this integration will have to be done by the dataset code, directly calling BigQuery APIs. CDAP will not have an explore service to facilitate that.

Querying can then happen directly through the interface (be it a command line or a UI) of that storage engine. However, this disrupts the user experience, because the user has to switch to a different UI or web page in order to query the data written by their pipelines. It would be more convenient to query directly from the CDAP UI. One way to achieve that is for a dataset to declare, as part of its properties, how it can be queried, for example in the form of a JDBC driver and connection string. The role of CDAP’s explore service is then reduced to delegating all queries via JDBC.
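
As an illustration only, the sketch below (in Java) shows what such a delegation could look like. The property keys and the class name are hypothetical and not part of any existing CDAP API; the real keys would be defined by the dataset that declares them.

  // Hypothetical sketch: delegate a query to the engine that a dataset declared
  // in its properties. The keys "query.jdbc.driver" and "query.jdbc.url" are
  // made up for this example.
  import java.sql.Connection;
  import java.sql.DriverManager;
  import java.sql.ResultSet;
  import java.sql.Statement;
  import java.util.Map;

  public class JdbcQueryDelegate {

    // Executes a SQL query against the storage engine declared by the dataset and
    // returns the open ResultSet; the caller is responsible for closing it.
    public ResultSet execute(Map<String, String> datasetProperties, String sql) throws Exception {
      String driverClass = datasetProperties.get("query.jdbc.driver");
      String connectionUrl = datasetProperties.get("query.jdbc.url");

      Class.forName(driverClass);                         // load the declared JDBC driver
      Connection conn = DriverManager.getConnection(connectionUrl);
      Statement stmt = conn.createStatement();
      return stmt.executeQuery(sql);                      // delegate the query as-is
    }
  }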

Architectural and Design Principles

The goal of this work is to make CDAP more portable across runtime environments: It should be able to run either squarely on top of a Hadoop stack or in a completely Hadoop-free environment (such as the Cloud). This document only addresses the storage aspects of this undertaking.

To achieve such portability, the CDAP Master needs to be able to use different storage media for its system metadata and data, depending on the environment it is deployed in. This requires that all system storage used by CDAP be abstracted behind a Storage Provider Interface (SPI). This SPI should not expose transactions to its clients - implementations of the SPI can rely on transactions to achieve consistency, but should not surface the details of the transaction system to the outside.
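
A minimal sketch of what such a table-style SPI could look like is shown below; all names are hypothetical. Note that transactions appear nowhere in the interface - a provider may use them internally (for example, one SQL transaction per call) to guarantee consistency.

  import java.io.Closeable;
  import java.io.IOException;
  import java.util.Iterator;
  import java.util.Map;
  import java.util.Optional;

  // Hypothetical SPI: a key-ordered table with CRUD operations and range scans.
  public interface SystemTable extends Closeable {
    void upsert(byte[] key, byte[] value) throws IOException;   // create or update a row
    Optional<byte[]> read(byte[] key) throws IOException;       // point lookup
    Iterator<Map.Entry<byte[], byte[]>> scan(byte[] startKey, byte[] stopKey)
        throws IOException;                                     // range scan in key order
    void delete(byte[] key) throws IOException;
  }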

Also, for good operability, an implementation of the SPI should not rely on customization of the storage engines it uses. For example, the current OSS implementation relies on HBase coprocessors for transactions and efficient increments. This makes it operationally difficult, if not infeasible, because it assumes that CDAP has the authority to install coprocessors. Therefore, this design focuses on implementations that rely solely on the external APIs of a storage engine, not on how these APIs are implemented or can be customized by a power user.

Note that the current Hadoop-based implementation of the system storage does not comply with these design principles. For a transitional time, however, this implementation will continue to be supported, for compatibility with existing deployments. In fact, the first goal of this design is to introduce SPIs and refactor the current implementation to these SPIs without loss of functionality. The next step will then be to implement additional storage providers for new deployments that do not require Hadoop.

High-Level Breakdown

The following sections detail the different types of data and their characteristics.

Entity and Artifact Store

For each of the existing CDAP entities - such as namespaces, apps, programs, datasets, and artifacts - CDAP maintains a “specification” in a store named MDS (MetaDataStore). Note that even though this is named “metadata”, it really only contains system properties and cannot represent true metadata such as tags, business metadata, or lineage. MDS is really more of a key/value store mapping entity IDs to JSON specs. MDS is implemented as a transactional HBase table (with Apache Tephra as the transaction manager). Operations are mainly CRUD, with a few exceptions that rely on key order for efficient scanning (such as listing all datasets in a namespace).

As of CDAP 5.0, datasets (and dataset types and modules) are stored in a separate MDS table, managed by a separate service. As part of this refactoring, this will likely change and all entities will be stored in the same store.

In the MDS context, transactions are only used to protect from concurrent modification of the same data. Interestingly enough, each entity’s metadata is only ever modified by the same CDAP system service (for example, dataset metadata are only modified by the dataset service), and this protection could easily be achieved by an in-memory mechanism inside that service. Although transactions can also provide atomicity for multi-write operations, MDS does not make use of that capability: It starts a transaction for each mutation. One goal of refactoring MDS, however, is to correct that: If, for example, an app deployment creates an app, a few datasets and schedules, then either all or none of these should go through.
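
For illustration, the sketch below shows how an app deployment could group its writes into one atomic batch through a hypothetical EntityStore SPI. The interface and its methods are made up for this example; whether atomicity is provided by a SQL transaction or some other mechanism is left entirely to the provider and never visible to the caller.

  import java.util.Map;
  import java.util.function.Consumer;

  // Hypothetical SPI: a store that applies a group of mutations atomically.
  interface EntityStore {
    interface Batch {
      void write(String entityId, String jsonSpec);
      void delete(String entityId);
    }
    // Applies all mutations recorded by the consumer, or none of them.
    void atomic(Consumer<Batch> mutations);
  }

  // Example: deploying an app writes the app spec plus its dataset specs as one
  // atomic unit, so a partially deployed app can never be observed.
  class AppDeployer {
    private final EntityStore store;

    AppDeployer(EntityStore store) {
      this.store = store;
    }

    void deploy(String appId, String appJson, Map<String, String> datasetSpecs) {
      store.atomic(batch -> {
        batch.write(appId, appJson);
        datasetSpecs.forEach(batch::write);   // datasets (and schedules) go in the same batch
      });
    }
  }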

Some of the entities (namely, applications, plugins, and datasets) also rely on code artifacts. The artifacts are stored in HDFS, with references to the HDFS locations in the MDS entries.

Runtime/History Store

Any time a program runs, its lifecycle is recorded along with its run record. Both the lifecycle state and the run record evolve over the course of the execution. Once execution finishes, neither will ever change again (except that they may be removed due to TTL expiry). Mutations of this metadata are never applied directly; instead they are emitted to a message bus. A single CDAP service listens to these messages and sequentially applies all changes.
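
To illustrate this single-writer pattern, the sketch below polls state-change messages and applies them in order; the message format, the fetcher, and the store interface are all hypothetical.

  import java.util.Iterator;

  // Hypothetical sketch: one consumer applies all run-record mutations sequentially,
  // so no concurrent writers touch the store and no locking or transaction is needed.
  class RunRecordWriter {

    static final class ProgramStateMessage {
      String messageId;
      String programRunId;
      String newState;          // e.g. STARTING, RUNNING, COMPLETED
    }

    interface MessageFetcher {  // hypothetical message-bus client
      Iterator<ProgramStateMessage> poll(String topic, String afterMessageId);
    }

    interface RunRecordStore {  // hypothetical store SPI
      void updateState(String programRunId, String newState);
      void saveLastProcessed(String messageId);   // checkpoint so each message is applied once
      String readLastProcessed();
    }

    private final MessageFetcher fetcher;
    private final RunRecordStore store;

    RunRecordWriter(MessageFetcher fetcher, RunRecordStore store) {
      this.fetcher = fetcher;
      this.store = store;
    }

    void processOnce(String topic) {
      Iterator<ProgramStateMessage> messages = fetcher.poll(topic, store.readLastProcessed());
      while (messages.hasNext()) {
        ProgramStateMessage msg = messages.next();
        store.updateState(msg.programRunId, msg.newState);  // applied strictly in order
        store.saveLastProcessed(msg.messageId);
      }
    }
  }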

Metadata Store and Index

There are several metadata datasets, all of which are transactional tables. Conceptually, all metadata is emitted via the message bus, and consumed and applied sequentially by the metadata service. (However, this is only half true today - it will require some refactoring in CDAP to realize that concept.)

Usage

This dataset records every access to a dataset by a program. It is an append-only store. Reads typically scan the table and aggregate for each program, when and how often it accessed the dataset. It appears that this dataset is redundant, and there is a chance that it could be removed or merged with the Lineage dataset.

Lineage

For every run of a program, CDAP records what datasets are read and written by that run. This is an append-only dataset, and read access is similar to the usage dataset.

Field Lineage

For every run of a program, CDAP records the operations performed by that program on the fields of its inputs to process and compute the fields of its outputs. Similar to the lineage store, this is an append-only store. Each entry maps a program run id to a JSON blob representing all operations. For storage efficiency, this store is deduplicated: each row stores a checksum of the JSON (most programs emit the exact same JSON for every run), and each checksum is mapped to the corresponding JSON in a separate row. Also, this store is indexed by various fields such as the input and output datasets.
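
For illustration, the sketch below shows the checksum-based deduplication against a hypothetical store interface; the hashing scheme (SHA-256) is an assumption, not necessarily what CDAP uses.

  import java.nio.charset.StandardCharsets;
  import java.security.MessageDigest;

  class FieldLineageWriter {

    interface FieldLineageStore {                        // hypothetical SPI
      boolean checksumExists(String checksum);
      void writeOperations(String checksum, String operationsJson);
      void writeRun(String programRunId, String checksum);
    }

    private final FieldLineageStore store;

    FieldLineageWriter(FieldLineageStore store) {
      this.store = store;
    }

    void record(String programRunId, String operationsJson) throws Exception {
      byte[] hash = MessageDigest.getInstance("SHA-256")
          .digest(operationsJson.getBytes(StandardCharsets.UTF_8));
      StringBuilder checksum = new StringBuilder();
      for (byte b : hash) {
        checksum.append(String.format("%02x", b));
      }
      String key = checksum.toString();
      if (!store.checksumExists(key)) {
        store.writeOperations(key, operationsJson);   // the JSON is stored once per distinct value
      }
      store.writeRun(programRunId, key);              // every run only stores the small checksum
    }
  }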

Metadata Dataset

The Metadata dataset stores and indexes all the system and business metadata (properties, tags and descriptions) for all entities in CDAP. In addition to the system properties that are derived from the MDS specs (see above), it stores business metadata: tags and properties explicitly defined by the user.

All metadata in this store is indexed for free-text search. However, this search is not very powerful, as it is a home-grown implementation of tokenization and indexing on top of HBase. To make the search more powerful, it needs to be implemented with an actual text search engine such as Elasticsearch or Solr.
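
As an illustration of what that could look like, the sketch below pushes entity metadata into Elasticsearch through its REST document API. The index name, host, and document layout are assumptions; a real implementation would sit behind the metadata SPI and handle batching, retries, and index mappings.

  import java.net.URI;
  import java.net.http.HttpClient;
  import java.net.http.HttpRequest;
  import java.net.http.HttpResponse;

  class MetadataIndexer {
    private final HttpClient client = HttpClient.newHttpClient();
    // Hypothetical index name on a local Elasticsearch instance.
    private final String baseUrl = "http://localhost:9200/cdap-metadata/_doc/";

    void index(String entityId, String tagsJsonArray, String propertiesJsonObject) throws Exception {
      // Tags and properties become analyzed fields, so they are found by free-text search.
      String doc = "{\"entity\":\"" + entityId + "\","
          + "\"tags\":" + tagsJsonArray + ","
          + "\"properties\":" + propertiesJsonObject + "}";
      HttpRequest request = HttpRequest.newBuilder(URI.create(baseUrl + entityId))
          .header("Content-Type", "application/json")
          .PUT(HttpRequest.BodyPublishers.ofString(doc))
          .build();
      HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
      if (response.statusCode() >= 300) {
        throw new RuntimeException("Indexing failed: " + response.body());
      }
    }
  }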

Job Queue

The job queue is the working data set of the scheduler. Any job that is triggered by an event waits in the job queue for all its constraints to be fulfilled. Once that happens, the job is submitted for execution and permanently removed from the job queue.

That is, the job queue is a short-lived store that only contains transient state. It is important that this state is not lost when the scheduler crashes or restarts. One feasible approach to reimplementing the job queue is to keep it in memory and persist snapshots to disk, with all mutations between snapshots persisted to a write-ahead log.
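
The sketch below illustrates that approach with an in-memory map, a write-ahead log, and periodic snapshots; the file locations and the serialization format are placeholders. On restart, the scheduler would load the latest snapshot and replay the log.

  import java.io.IOException;
  import java.nio.charset.StandardCharsets;
  import java.nio.file.Files;
  import java.nio.file.Path;
  import java.nio.file.Paths;
  import java.nio.file.StandardOpenOption;
  import java.util.Map;
  import java.util.concurrent.ConcurrentHashMap;

  class JobQueue {
    private final Map<String, String> jobs = new ConcurrentHashMap<>();        // jobId -> serialized job
    private final Path wal = Paths.get("/data/scheduler/job-queue.wal");       // placeholder path
    private final Path snapshot = Paths.get("/data/scheduler/job-queue.snap"); // placeholder path

    synchronized void put(String jobId, String job) throws IOException {
      appendToWal("PUT\t" + jobId + "\t" + job + "\n");   // durable before visible in memory
      jobs.put(jobId, job);
    }

    synchronized void remove(String jobId) throws IOException {
      appendToWal("DEL\t" + jobId + "\n");
      jobs.remove(jobId);
    }

    private void appendToWal(String entry) throws IOException {
      Files.write(wal, entry.getBytes(StandardCharsets.UTF_8),
          StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    }

    // Called periodically: persist the full map, after which the WAL can start over.
    synchronized void snapshot() throws IOException {
      StringBuilder sb = new StringBuilder();
      jobs.forEach((id, job) -> sb.append(id).append('\t').append(job).append('\n'));
      Files.write(snapshot, sb.toString().getBytes(StandardCharsets.UTF_8));
      Files.write(wal, new byte[0]);   // truncate the log once the snapshot is durable
    }
  }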

Operational Stores

Metrics

The metrics store is really a time-series OLAP cube. It is implemented as a set of non-transactional HBase tables, so it does not rely on the Tephra coprocessors. However, it employs a different type of coprocessor to implement readless increments.

This store can possibly be replaced by integrating with a metrics system such as Stackdriver.     

Logs

Logs are stored as Avro files on the file system, with an index over these files (start and end time) in HBase tables. The log saver also maintains, in HBase, its offsets into the Kafka topics used for log collection. If CDAP integrates with a different log collection and aggregation system such as Stackdriver, these files and tables are no longer required. In OSS, this could remain in HBase or move to a pure file-based store.

Messaging

CDAP’s built-in messaging system (TMS) uses HBase tables to implement the message topic. It relies partly on Tephra and partly on its own coprocessors to implement transactional and order-preserving consumer semantics. Integrating CDAP with an external messaging system would remove the need for these tables.

File System

The CDAP system stores only a few things in the file system:

  • Artifacts: Jar files containing the code for plugins and applications

  • Dataset Modules: Jar files containing the code of dataset types (only used for explore)

  • Logs: All logs collected by the log saver are stored as Avro files

  • Reports: The Operational Dashboard & Reports system application stores its reports in the file system.

  • Transaction state: The transaction service writes its snapshots and write-ahead logs to the file system. This may not be needed in a cloud deployment where transactions are not supported.

  • Local datasets: Pipelines (that execute as workflows) write temporary files to “local datasets”, which are short-lived FileSet datasets on HDFS.

Currently this file system is HDFS, and it needs to be replaceable by a different file system, such as GCS or S3, or persistent attached disks.

System Apps

CDAP includes several system applications that depend on datasets, for example Wrangler, Rules Engine, and Operational Dashboard & Reports. Currently, these datasets are transactional HBase tables, and they need alternative implementations in a non-Hadoop deployment. The design for this intersects largely with the redesign of these system apps.

Summary of Current System Datasets

Entity Store - MDS

  • app.meta, artifact.meta, datasets.instance, config.store.table, datasets.type, owner.meta
    Purpose: Key/value tables mapping entities to their JSON specs
    Patterns: CRUD, range scan, low volume
    Suggestion: RDBM

Run History

  • app.meta, workflow.stats
    Purpose: Run records, program status, workflow tokens
    Patterns: Small “live set”, then read-only; get & scan
    Suggestion: RDBM, separate from entity store

Schedule Store

  • schedulestore
    Purpose: Dataset to store Quartz schedules
    Patterns: Only used by Quartz, maybe obsolete; CRUD
    Suggestion: RDBM

  • schedule.store.it.d, schedule.store.it.i
    Purpose: CDAP schedule store, indexed by trigger key
    Patterns: Put, get, index get
    Suggestion: RDBM

  • job.queue
    Purpose: In-memory table for active scheduler jobs, backed by a key/value table
    Patterns: Transient; put, update, get, scan
    Suggestion: In-memory with sync to file system or DB

Metrics and Logs

  • metrics.v2.entity, metrics.v3.table.ts.N
    Purpose: Metrics tables for entities and aggregates of N seconds
    Patterns: Time series; batch put, increment, scan
    Suggestion: LevelDB, external metrics system, InfluxDB?

  • metrics.kafka.meta
    Purpose: Offsets for metrics topic consumers
    Patterns: Get/put
    Suggestion: RDBM

  • log.meta
    Purpose: Offsets for log Kafka topic consumers, and log file metadata/index
    Patterns: Get/put
    Suggestion: RDBM

Metadata

  • v2.(business|system).metadata_index.(d|i)
    Purpose: Indexed tables for system and business metadata
    Patterns: Full-text index, search
    Suggestion: Elasticsearch, Solr, embedded Lucene

  • usage.registry
    Purpose: Records all dataset access by programs
    Suggestion: RDBM, or replace with lineage store

  • lineage.access_registry
    Purpose: Records all dataset access by programs, for lineage
    Patterns: Append, scan
    Suggestion: RDBM

  • fieldlineage.info
    Purpose: Records operations performed by programs on the input/output fields
    Patterns: Append, get, index scan
    Suggestion: RDBM

Miscellaneous

  • routestore.kv
    Purpose: Stores service routes, only in sandbox (ZooKeeper in distributed mode)
    Suggestion: Deprecated; remove

System Apps’ Datasets

  • program.heartbeat
    Purpose: Dashboard & Reports
    Patterns: tbd
    Suggestion: May not be needed once run records are in RDBM

  • connections, workspace.*, recipes
    Purpose: Wrangler/Dataprep
    Patterns: tbd
    Suggestion: May need separate design

HDFS-based

  • Local Datasets
    Purpose: Pipelines
    Patterns: Batch file read/write
    Suggestion: GCS, S3, transient HDFS

  • Logs
    Purpose: Central log collection
    Patterns: Batch write, batch read
    Suggestion: GCS, S3, persistent disks

  • Artifacts
    Purpose: Jars containing apps, plugins, datasets
    Patterns: Entire file read, write, copy
    Suggestion: Object store (GCS, S3), persistent disks


Based on this, the following types of storage are required:

  1. A data store - for storing entities, specs, run records, simple metadata, etc., in a CRUD fashion. The implementation will be SQL-based.

  2. A searchable metadata store - for storing metadata (tags/properties, lineage) and searching metadata using structured or free-text search. This will be implemented with a search engine like Elasticsearch or Solr. 

  3. A medium to persist the scheduler’s in-memory job queue

  4. A metrics store - for storing and retrieving metrics. Likely to be implemented using LevelDB on persistent storage, reusing much of the existing sandbox code.

  5. A persistent file system - for storing artifacts, logs, possibly metrics. 

Storage Provider Interfaces (SPI)

For each storage paradigm, all code in CDAP that uses this storage must be agnostic to the implementation of that paradigm: a Storage Provider Interface (SPI) serves as the abstraction between CDAP and storage implementations. CDAP will be configured, most likely through cdap-site.xml, as to which storage provider to use and what its parameters are. It will then inject that provider at runtime. This poses a couple of challenges:

  • The SPI should encapsulate how a provider implements transactions. But at least one of the available providers will be the current, HBase- and HDFS-based dataset store, which needs a dataset framework and a transaction service client in order to function. How can these to-be-deprecated legacy services be made available to the storage provider without exposing them in the SPI?

  • CDAP’s current datasets require that the caller include the dataset code in the classpath. For a dataset-based implementation, that means that HBase and HDFS, and hence a lot of Hadoop common libraries, must be in the classpath of the CDAP system services. But when running in GKE, it is undesirable to include all the Hadoop dependencies. The CDAP master currently has no way of packaging different dependencies based on whether datasets are used or not - it simply assumes that datasets are needed and packages all system dataset types into each CDAP container. This will have to change.

To load the storage providers, the existing extension loader of CDAP can be used. It is already used to load the Spark runtime, which also requires transactions and a dataset framework. Because each of the storage providers has its own SPI, a separate extension loader will be required for each SPI.
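
For illustration, the sketch below loads a provider by name with Java’s ServiceLoader. The StorageProvider interface, the configuration key, and the provider names are hypothetical; CDAP’s actual extension loader additionally isolates each extension in its own classloader.

  import java.util.Map;
  import java.util.ServiceLoader;

  // Hypothetical SPI that every storage provider implements.
  interface StorageProvider {
    String getName();                                 // e.g. "hbase" or "postgres"
    void initialize(Map<String, String> siteConfig);  // provider-specific parameters
  }

  class StorageProviderLoader {
    // Picks the provider named by a (hypothetical) cdap-site.xml key and initializes it.
    static StorageProvider load(Map<String, String> siteConfig) {
      String wanted = siteConfig.getOrDefault("system.storage.implementation", "hbase");
      for (StorageProvider provider : ServiceLoader.load(StorageProvider.class)) {
        if (provider.getName().equals(wanted)) {
          provider.initialize(siteConfig);
          return provider;
        }
      }
      throw new IllegalStateException("No storage provider named " + wanted + " found");
    }
  }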

Breakdown

The following high-level tasks need to be completed:

  • Design the architecture for defining SPIs and injecting provider implementation at runtime
  • Define the SPI for each storage type
  • Refactor existing system datasets to use the SPIs
  • Implement new storage providers for each runtime environment (Kubernetes, Google Cloud, AWS, Azure, etc.)