Author: Andreas Neumann
Created: October 2018
Last updated: November 2018
Status: Draft

Tl;dr

This document discusses the motivation for decoupling the CDAP system from Hadoop storage and Tephra transactions. It classifies all CDAP system storage into different types based on common access patterns, and suggests possible alternative implementations.

...

Summary of Current System Datasets

Entity Store - MDS

  Dataset(s):  app.meta, artifact.meta, datasets.instance, config.store.table, datasets.type, owner.meta
  Purpose:     Key/value tables mapping entities to their JSON specs
  Patterns:    CRUD, range scan, low volume
  Suggestion:  RDBMS

Run History

  Dataset(s):  app.meta, workflow.stats
  Purpose:     Run records, program status, workflow tokens
  Patterns:    Small “live set”, then read-only; get & scan
  Suggestion:  RDBMS, separate from the entity store

Schedule Store

  Dataset(s):  schedulestore
  Purpose:     Dataset to store Quartz schedules
  Patterns:    Used only by Quartz, maybe obsolete; CRUD
  Suggestion:  RDBMS

  Dataset(s):  schedule.store.it.d, schedule.store.it.i
  Purpose:     CDAP schedule store, indexed by trigger key
  Patterns:    Put, get, index get
  Suggestion:  RDBMS

  Dataset(s):  job.queue
  Purpose:     In-memory table for active scheduler jobs, backed by a k/v table
  Patterns:    Transient; put, update, get, scan
  Suggestion:  In-memory with sync to file system or DB

Metrics and Logs

  Dataset(s):  metrics.v2.entity, metrics.v3.table.ts.N
  Purpose:     Metrics tables for entities, and aggregates over N seconds
  Patterns:    Time series: batch put, increment, scan
  Suggestion:  LevelDB, external metrics system, InfluxDB?

  Dataset(s):  metrics.kafka.meta
  Purpose:     Offsets for metrics topic consumers
  Patterns:    Get/put
  Suggestion:  RDBMS

  Dataset(s):  log.meta
  Purpose:     Offsets for log Kafka topic consumers, and log file metadata/index
  Patterns:    Get/put
  Suggestion:  RDBMS

Metadata

  Dataset(s):  v2.(business|system).metadata_index.(d|i)
  Purpose:     Indexed tables for system and business metadata
  Patterns:    Full-text index, search
  Suggestion:  Elasticsearch, Solr, or embedded Lucene

  Dataset(s):  usage.registry
  Purpose:     Records all dataset access by programs
  Suggestion:  RDBMS, or replace with the lineage store

  Dataset(s):  lineage.access_registry
  Purpose:     Records all dataset access by programs, for lineage
  Patterns:    Append, scan
  Suggestion:  RDBMS

  Dataset(s):  fieldlineage.info
  Purpose:     Records operations performed by programs on the input/output fields
  Patterns:    Append, get, index scan
  Suggestion:  RDBMS

Miscellaneous

  Dataset(s):  routestore.kv
  Purpose:     Stores service routes; only used in sandbox (ZooKeeper in distributed mode)
  Suggestion:  Deprecated: remove

System Apps’ Datasets

  Dataset(s):  program.heartbeat
  Purpose:     Dashboard & Reports
  Patterns:    TBD
  Suggestion:  May not be needed once run records are in an RDBMS

  Dataset(s):  connections, workspace.*, recipes
  Purpose:     Wrangler/Dataprep
  Patterns:    TBD
  Suggestion:  May need a separate design

HDFS-based

  Storage:     Local Datasets
  Purpose:     Pipelines
  Patterns:    Batch file read/write
  Suggestion:  GCS, S3, transient HDFS

  Storage:     Logs
  Purpose:     Central log collection
  Patterns:    Batch write, batch read
  Suggestion:  GCS, S3, persistent disks

  Storage:     Artifacts
  Purpose:     Jars containing apps, plugins, datasets
  Patterns:    Entire-file read, write, copy
  Suggestion:  Object store: GCS, S3, persistent disks

Based on this, the following types of storage are required:

  1. A data store - for storing entities, specs, run records, simple metadata, etc., in a CRUD fashion. The implementation will be SQL-based (a sketch of a possible interface follows this list). 

  2. A searchable metadata store - for storing metadata (tags/properties, lineage) and searching metadata using structured or free-text search. This will be implemented with a search engine like Elasticsearch or Solr. 

  3. A medium to persist the scheduler’s in-memory job queue.

  4. A metrics store - for storing and retrieving metrics. Likely to be implemented using LevelDB on persistent storage, reusing much of the existing sandbox code.  

  5. A persistent file system - for storing artifacts, logs, possibly metrics. 
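
To make the first item concrete, here is a minimal sketch of what a CRUD-style interface for the data store could look like. The interface and method names (EntityTable, upsert, scanByPrefix) are illustrative only, not the actual SPI; a SQL-backed provider would map each such table to a relational table with a key column and a value column holding the JSON spec.

    import java.util.Collection;
    import java.util.Optional;

    // Hypothetical table abstraction for the entity and run-record stores (names
    // are illustrative, not the final SPI). It covers the access patterns from
    // the summary above: CRUD, point reads, and range scans.
    public interface EntityTable {

      // Create or update the row identified by the given key.
      void upsert(String key, String jsonSpec);

      // Read a single row by key.
      Optional<String> read(String key);

      // Range scan: return the values of all rows whose key starts with the prefix.
      Collection<String> scanByPrefix(String keyPrefix);

      // Delete the row identified by the given key.
      void delete(String key);
    }

A JDBC-based provider could implement this with plain SQL (an upsert such as PostgreSQL's INSERT ... ON CONFLICT, a SELECT by key, and a SELECT with a range predicate), which is what "RDBMS" in the table above refers to.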

...

  • The SPI should encapsulate how a provider implements transactions. But at least one of the available providers will be the current HBase- and HDFS-based dataset store, which needs a dataset framework and a transaction service client in order to function. How can these to-be-deprecated legacy services be made available to the storage provider without exposing them in the SPI? (One possible shape is sketched below.)

  • CDAP’s current datasets require that the caller includes the dataset code in the classpath. For a dataset-based implementation, that means that HBase and HDFS, and hence many Hadoop common libraries, must be in the classpath of CDAP system services. But when running in Kubernetes (GKE), it is undesirable to include all the Hadoop dependencies. The CDAP master currently has no way of packaging different dependencies based on whether datasets are used or not - it simply assumes that datasets are needed and packages all system dataset types into each CDAP container. This will have to change.

  • Storage provider implementations must be injected at runtime with classpath isolation. CDAP’s current mechanism for that is plugins. But plugins do not have a clean way to bind them to an interface that they implement; rather, they specify a plugin type such as “jdbc” or “batchsource”, and the pipeline system understands these types. One option is to introduce new types (“system.entity” or “system.meta”) for the system SPIs. Ideally, these types would not be visible to applications, as they are only meant to be used by the CDAP system. Alternatively, a mechanism similar to plugins could be introduced for SPIs (see the sketch below).
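
As an illustration of the two questions above, the following sketch shows one possible shape: the SPI declares only what the CDAP system needs, and a provider receives its environment through a plain configuration context. A legacy HBase/HDFS provider can construct its dataset framework and transaction service client inside its own module, behind this interface, so those services never appear in the SPI. All names here (StorageProviderContext, EntityStoreProvider) are hypothetical, not the final SPI.

    import java.util.Map;

    // Hypothetical provider context: carries only configuration, so the SPI does
    // not expose legacy services. A Hadoop-based provider reads its HBase/HDFS
    // settings from here and builds its own dataset framework and transaction
    // client internally; a cloud-native provider might read a JDBC URL instead.
    public interface StorageProviderContext {
      Map<String, String> getProperties();
    }

    // Hypothetical SPI for the entity/run-record store.
    public interface EntityStoreProvider {

      // Name used in CDAP configuration (e.g. cdap-site.xml) to select this provider.
      String getName();

      // Called once after the provider has been loaded in its isolated classloader.
      void initialize(StorageProviderContext context);

      // Returns a table abstraction like the EntityTable sketched earlier.
      EntityTable getTable(String tableName);
    }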

To load the storage provider, CDAP’s existing extension loader can be used. It is already used to load the Spark runtime, which also requires transactions and a dataset framework. Because each of the storage providers has its own SPI, a separate extension loader will be required for each SPI. 
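
For illustration, here is a simplified version of that loading pattern using the standard Java ServiceLoader with a child classloader per extension jar. This is a generic sketch of the idea, not the actual CDAP extension loader API, and the class names are hypothetical.

    import java.io.File;
    import java.net.URL;
    import java.net.URLClassLoader;
    import java.util.ServiceLoader;

    // Simplified sketch of loading a storage provider from its own jar in an
    // isolated classloader. A real extension loader would use a filtering parent
    // classloader so that only the SPI classes are shared with the provider.
    public final class StorageProviderLoader {

      public static EntityStoreProvider load(File extensionJar, String providerName) throws Exception {
        URLClassLoader extensionClassLoader = new URLClassLoader(
            new URL[] { extensionJar.toURI().toURL() },
            StorageProviderLoader.class.getClassLoader());
        // The provider jar declares its implementation via META-INF/services.
        for (EntityStoreProvider provider : ServiceLoader.load(EntityStoreProvider.class, extensionClassLoader)) {
          if (providerName.equals(provider.getName())) {
            return provider;
          }
        }
        throw new IllegalArgumentException("No storage provider named '" + providerName + "' in " + extensionJar);
      }
    }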

Breakdown

The following high-level tasks need to be completed:

  • Design the architecture for defining SPIs and injecting provider implementation at runtime
  • Define the SPI for each storage type
  • Refactor existing system datasets to use the SPIs
  • Implement new storage providers for each runtime environment (Kubernetes, Google Cloud, AWS, Azure, etc.)