Decoupling of CDAP Metadata Store

Author: Andreas Neumann
Created: November 2018
Last updated: January 2019
Status: Draft
Main Document: Decoupling CDAP System Storage from Hadoop

TL;DR

This document details the design of the Metadata SPI for CDAP 6.x, which allows different implementations of the metadata backend, and describes an implementation for the case where an Elasticsearch instance is available.

Motivation

The current CDAP (5.x) metadata store is built using CDAP tables and Tephra transactions. This has the following drawbacks:

  • CDAP Tables are backed by HBase, which is operationally complex. In addition, HBase is not available in non-Hadoop environments such as Cloud or Kubernetes.

  • Tephra Transactions are tightly coupled with HBase coprocessors and would be hard to implement for other storage engines that might substitute HBase.

  • Text search over metadata is a “poor man’s search”: it supports only simple word-prefix matching. Richer query functionality is required, for example:

    • credit card transactions anonymized:true
    • sales transactions schema:zip

  • The current indexing is inefficient, and search requires large table scans.

  • The current storage layout only supports CDAP entities. In some scenarios, such as hybrid cloud or multi-cloud, the metadata store needs to be able to represent foreign entities, such as an on-prem RDBMS not managed by CDAP.

Arguably, two major efforts are required to address these shortcomings:

  1. Allow using a full-blown search engine for indexing and searching the metadata

  2. Improve the metadata representation to be more generic, capable of expressing unknown/foreign entities.

Choice of Search Engine

To use a search engine in CDAP, there are two options:

  • Develop a (CDAP system) service that uses and embeds a search library.

  • Deploy a stand-alone search service such as Elasticsearch.

The following sections will discuss these options.

Develop a System Service

Developing a CDAP system service that embeds a search library and maintains its indexes in persistent storage has the advantage that it would work almost unmodified in Hadoop and in any non-Hadoop setup. Similar to other system services, the CDAP master will configure and manage this service, and CDAP will have full control over all its characteristics, similar to the existing Metadata Service in CDAP 5.x.

The most popular and mature open source search library is Lucene. It is truly a library with an API to index documents and to execute queries. The client that embeds Lucene must provide the storage (file system) to persist the indexes and implement all the RPC interfaces.

While this gives complete control over what the search engine does, the development effort is high, and a lot of that effort would go into features such as scale-out and availability that have been implemented in projects such as Solr and Elasticsearch - why reinvent the wheel?

Deploy a stand-alone search service

Assuming that there is an external search service that can store, index, and query CDAP’s metadata, CDAP can communicate with that search service via RPC. The search service can be managed by the customer outside of CDAP, or, when running in Kubernetes, the CDAP master can deploy it in Kubernetes. In both cases, CDAP will treat this service as an external service.

The two obvious choices for such a search service are Solr and Elasticsearch. Both are open source and built on top of Lucene, and their capabilities are comparable. However, Solr requires ZooKeeper (ZK) to be installed, and it is not straightforward to run ZK in Kubernetes. Also, Elasticsearch is readily available on GKE and is therefore preferable to Solr when running in GCP.

The advantage of this approach is that it works well in both Hadoop and non-Hadoop environments. On-prem users of open source CDAP can stand up an Elasticsearch instance, or reuse an existing one, and connect CDAP to it. For Cloud or Kubernetes setups, the CDAP master, or the agent that installs CDAP, can deploy an Elasticsearch instance for CDAP to use.

Elasticsearch for CDAP Metadata

As detailed above, Elasticsearch is the most suitable (and only viable) implementation for a CDAP metadata store.

  • It is readily available on Kubernetes and many OSS customers already run Elasticsearch

  • Elasticsearch is scalable and operationally hardened by a large community

  • It is the solution with the least anticipated development effort.  

Design

Requirements

  1. Ability to configure different storage providers for Metadata storage, retrieval, and search

  2. Store and modify the metadata for an entity

  3. Retrieve the metadata for an entity, by lookup on entity id

  4. Find entities by searching their names, properties, descriptions and schema.

  5. Secure search that returns only what the user has access to.

Not in Scope

  1. Store and retrieve the history of an entity’s metadata

  2. Structured search across entities, especially parent/child and siblings. For example, search on the description of a dataset AND a property of one of its fields.

API

Entity Identification

In current CDAP 5.x, an entity is represented as an object of MetadataEntity, which uses an ordered sequence of key-value pairs, for example:

    namespace=myns,app=myapp,workflow=wf1

The Metadata SPI will also use this representation.  
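
As a rough illustration of this representation, the following sketch models an entity as an ordered list of key-value pairs and parses the textual form shown above. The class and method names (EntitySketch, parse, valueOf) are hypothetical and are not the actual MetadataEntity API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for MetadataEntity: an ordered sequence of key-value pairs.
final class EntitySketch {
  private final List<String[]> pairs = new ArrayList<>();

  // Parses the textual form used in this document, e.g. "namespace=myns,app=myapp".
  static EntitySketch parse(String text) {
    EntitySketch entity = new EntitySketch();
    for (String pair : text.split(",")) {
      int eq = pair.indexOf('=');
      if (eq <= 0) {
        throw new IllegalArgumentException("Expected key=value but got: " + pair);
      }
      entity.pairs.add(new String[] {pair.substring(0, eq), pair.substring(eq + 1)});
    }
    return entity;
  }

  // Returns the value for the given key, or null if the entity has no such key.
  String valueOf(String key) {
    for (String[] pair : pairs) {
      if (pair[0].equals(key)) {
        return pair[1];
      }
    }
    return null;
  }

  // Renders the pairs in their original order.
  @Override
  public String toString() {
    StringBuilder sb = new StringBuilder();
    for (String[] pair : pairs) {
      if (sb.length() > 0) {
        sb.append(',');
      }
      sb.append(pair[0]).append('=').append(pair[1]);
    }
    return sb.toString();
  }
}
```

For example, EntitySketch.parse("namespace=myns,app=myapp,workflow=wf1").valueOf("app") yields "myapp", and toString() reproduces the input because the order of the pairs is retained.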

Metadata Representation

Metadata consists of tags and properties associated with an entity. Each property or tag has a scope: SYSTEM for system-generated metadata (for the most part, technical metadata), and USER for user-defined metadata (a.k.a. business metadata). In CDAP 5.x, the two metadata scopes were implemented as two separate tables, and the API reflected that design by using a metadata record that can hold the tags and properties of only one scope. It would be better to hide that implementation detail and use a more generic metadata representation, as follows:

    class Metadata {
      MetadataEntity entity;
      List<Tag>      tags;
      List<Property> properties;
    }
    class Tag {
      Scope scope;
      String name;
    }
    class Property {
      Scope scope;
      String name;
      String value;
    }
    enum Scope { SYSTEM, USER }

This has the following advantages:

  • Metadata from both scopes can be represented in a single object

  • Both tags and properties can be extended in the future with optional fields (e.g., creation date or modification date)

  • If needed, more scopes can be added by extending the Scope enum, without further code changes

Manipulating Metadata

The scenarios in which metadata is retrieved, created, changed, or deleted are:

  • Retrieval of metadata: The system or an external client or user retrieves the metadata for an entity. In some cases, this client is interested only in tags or only in properties, and/or only in metadata in a specific scope.

          Metadata getMetadata(MetadataEntity entity,
                               @Nullable Scope scope);
          List<Tag> getTags(MetadataEntity entity,
                            @Nullable Scope scope);
          List<Property> getProperties(MetadataEntity entity,
                                       @Nullable Scope scope);

All of these methods may throw an IOException for errors caused by the underlying storage. The same will apply to all subsequent methods.

None of these methods ever return null. If no metadata is found, they return an empty metadata object, an empty list of tags, or an empty list of properties.

Why not throw a NotFoundException? That would mean that we can distinguish between an existing entity that has no metadata, and an entity that does not exist. However, the metadata store does not know what entities exist. It cannot rely on the clients (the CDAP system and users) to create every entity explicitly before adding metadata. Therefore this API is agnostic to the existence of entities: it only manages the metadata.  
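
The retrieval contract described above can be sketched with a small in-memory store. This is only an illustration under simplifying assumptions: InMemoryMetadataStore and its put method are hypothetical, the entity is reduced to a plain string instead of a MetadataEntity, and the IOException that a real storage backend may throw is omitted.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

enum Scope { SYSTEM, USER }

final class Tag {
  final Scope scope;
  final String name;
  Tag(Scope scope, String name) { this.scope = scope; this.name = name; }
}

final class Property {
  final Scope scope;
  final String name;
  final String value;
  Property(Scope scope, String name, String value) {
    this.scope = scope; this.name = name; this.value = value;
  }
}

final class Metadata {
  final String entity; // simplified: the document uses MetadataEntity here
  final List<Tag> tags;
  final List<Property> properties;
  Metadata(String entity, List<Tag> tags, List<Property> properties) {
    this.entity = entity; this.tags = tags; this.properties = properties;
  }
}

// Hypothetical in-memory store, used only to illustrate the retrieval contract.
final class InMemoryMetadataStore {
  private final Map<String, Metadata> byEntity = new HashMap<>();

  void put(Metadata metadata) { byEntity.put(metadata.entity, metadata); }

  // Never returns null: an unknown entity yields an empty Metadata object.
  // A null scope means "all scopes".
  Metadata getMetadata(String entity, Scope scope) {
    List<Tag> tags = new ArrayList<>();
    List<Property> properties = new ArrayList<>();
    Metadata stored = byEntity.get(entity);
    if (stored != null) {
      for (Tag tag : stored.tags) {
        if (scope == null || tag.scope == scope) { tags.add(tag); }
      }
      for (Property property : stored.properties) {
        if (scope == null || property.scope == scope) { properties.add(property); }
      }
    }
    return new Metadata(entity, tags, properties);
  }

  List<Tag> getTags(String entity, Scope scope) {
    return getMetadata(entity, scope).tags;
  }

  List<Property> getProperties(String entity, Scope scope) {
    return getMetadata(entity, scope).properties;
  }
}
```

In this sketch the store never checks whether an entity exists: a lookup for an entity it has never seen simply returns empty lists, which mirrors the reasoning above.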

  • Entity creation: When an entity is created, or replaced with a new version, the system generates technical metadata for this entity. Examples include:

    • The type and schema of a dataset

    • The tag batch for a MapReduce program

    • The creation time of the entity

  • Entity update: When an entity is created, this metadata is written for the first time. When the entity is updated, the system metadata is replaced. However, some of the original metadata needs to be preserved. In CDAP 5.x, preserved metadata is:

    • The creation time. This should never change - it is an unmodifiable property.

    • The description. If the newer version of the entity, for whatever reason, lacks a description, the existing description is preserved. This is particularly useful if the description was set separately from entity creation.

Hence, for entity update, there needs to be a way to specify which metadata properties to preserve, and how.

For the same reason as discussed above, the metadata store is agnostic to which entities exist and which do not. Therefore there will be a single API to create or replace the system metadata for an entity:

          class Directive {
            Scope scope;
            String name;
            Action action;
            Kind kind;
          }
          enum Action { KEEP, PRESERVE }
          enum Kind { PROPERTY, TAG }
          void replace(Metadata metadata, List<Directive> directives);

A directive specifies a scope and the name of a tag or property, and the way that this tag or property needs to be preserved. All existing properties or tags for the entity will be removed, unless they are contained in the new metadata, or there is a directive for them.
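
This document does not define how the two actions differ, so the following sketch assumes one plausible reading: KEEP always retains the existing value (as for the creation time), while PRESERVE retains the existing value only when the new metadata has no value for that name (as for the description). For brevity, the sketch handles only the properties of a single scope and represents directives as a map from property name to action. ReplaceSketch and its signature are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Assumed semantics (not defined in this document):
//   KEEP:     the existing value always wins, e.g. the creation time
//   PRESERVE: the existing value is retained only if the new metadata lacks it,
//             e.g. the description
enum Action { KEEP, PRESERVE }

final class ReplaceSketch {
  // Computes the properties that are stored after a replace.
  // Scope and kind are omitted: 'directives' maps a property name to its action.
  static Map<String, String> replace(Map<String, String> existing,
                                     Map<String, String> incoming,
                                     Map<String, Action> directives) {
    // Start from the new metadata; existing properties without a directive are dropped.
    Map<String, String> result = new HashMap<>(incoming);
    for (Map.Entry<String, Action> directive : directives.entrySet()) {
      String name = directive.getKey();
      String oldValue = existing.get(name);
      if (oldValue == null) {
        continue; // there is nothing to retain
      }
      if (directive.getValue() == Action.KEEP) {
        result.put(name, oldValue);         // the existing value overrides the new one
      } else {
        result.putIfAbsent(name, oldValue); // the existing value only fills a gap
      }
    }
    return result;
  }
}
```

For example, given existing properties creation-time=100, description=Sales data, and owner=alice, new properties creation-time=200 and schema=zip, and directives KEEP for creation-time and PRESERVE for description, the result is creation-time=100, description=Sales data, and schema=zip. The owner property is removed because it is neither in the new metadata nor covered by a directive.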

  • Entity deletion: When an entity is deleted, all system and user metadata for the entity is removed. Furthermore, if the entity has any sub-entities, then all metadata for the sub-entities must be deleted, too. In CDAP 5.x, the metadata system relies on the caller to ensure that all sub-entities have already been deleted along with their metadata. Hence the consistency of the metadata store relies on the behaviour of its clients. That is a suboptimal choice. It is better to enforce deletion of sub-entity metadata in the metadata store itself.

          void drop(MetadataEntity entity);
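
A minimal sketch of store-side cascading deletion follows. It assumes that a sub-entity is any entity whose ordered key-value sequence starts with the sequence of the dropped entity; this document does not define the relationship formally. DropSketch is hypothetical, entities are reduced to lists of "key=value" strings, and the full scan over all keys is for illustration only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class DropSketch {
  // Maps an entity (an ordered list of "key=value" parts) to its tags.
  private final Map<List<String>, List<String>> tagsByEntity = new HashMap<>();

  void addTags(List<String> entity, String... tags) {
    tagsByEntity.computeIfAbsent(entity, e -> new ArrayList<>()).addAll(Arrays.asList(tags));
  }

  boolean hasMetadata(List<String> entity) {
    return tagsByEntity.containsKey(entity);
  }

  // Removes the metadata of the entity and of every entity whose parts start
  // with the parts of the given entity (assumed definition of a sub-entity).
  void drop(List<String> entity) {
    tagsByEntity.keySet().removeIf(candidate ->
        candidate.size() >= entity.size()
            && candidate.subList(0, entity.size()).equals(entity));
  }
}
```

With this sketch, dropping namespace=myns,app=myapp also removes the metadata of namespace=myns,app=myapp,workflow=wf1, but leaves namespace=myns,app=other untouched, without the caller having to enumerate the sub-entities.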

  • External metadata update: Setting, replacing, or removing properties and tags after the entity has been created. This happens in a few use cases:

    • A user manipulates the tags and properties. This almost always applies only to scope USER, although there may be exceptional cases where a user needs to correct system metadata, for example, the description.

    • The system updates a tag or property. For example, when a profile is assigned to a program, CDAP adds that profile name as a system property.

    • As a variation of the previous case, the system needs to update properties for many entities. For example, when a profile is assigned to a namespace, all programs and schedules in the namespace receive the profile name as a property value. For efficiency, the metadata store should offer a batch update.

For these kinds of operations the APIs are:

          addProperties(Scope, Map<String, String>);
          setProperties(Scope, Map<String, String>);
          removeProperties(Scope, List<String>);
          addTags(Scope, List<String>);
          setTags(Scope, List<String>);
          removeTags(Scope, List<String>);

Batch write:

          addProperties(Scope, Map<EntityId, Map<String, String>>);

  • Search: The current search API in CDAP 5.x is sufficient. However, the results will be returned in the form of Metadata objects. Details TBD.

Security

TBD