Author: Andreas Neumann
Created: October 2018
Last updated: November 2018
Status: Draft
Tl;dr
This document discusses the motivation for decoupling the CDAP system from Hadoop storage and Tephra transactions. It classifies all CDAP system storage into types based on common access patterns, and suggests possible alternative implementations.
...
Summary of Current System Datasets
| Dataset | Purpose | Patterns | Suggestion |
| --- | --- | --- | --- |
| **Entity Store - MDS** | | | |
| app.meta | Key/value tables mapping entities to their JSON specs | CRUD, range scan, low volume | RDBM |
| **Run History** | | | |
| app.meta | Run records | Small “live set”, then read-only, get & scan | RDBM, separate from entity store |
| **Schedule Store** | | | |
| schedulestore | Dataset to store Quartz schedules | Only used by Quartz, maybe obsolete, CRUD | RDBM |
| | CDAP schedule store, indexed by trigger key | Put, get, index get | RDBM |
| job.queue | In-memory table for active scheduler jobs, backed by k/v table | Transient, put, update, get, scan | In-memory with sync to file system or DB |
| **Metrics and Logs** | | | |
| metrics.v2.entity, metrics.v3.table.ts.N | Metrics tables for entities and aggregates of N seconds | Time series: batch put, increment, scan | LevelDB, external metrics system, InfluxDB? |
| metrics.kafka.meta | Offsets for metrics topic consumers | Get/put | RDBM |
| log.meta | Offsets for log Kafka topic consumers and log file metadata/index | Get/put | RDBM |
| **Metadata** | | | |
| v2.(business\|system). | Indexed tables for system and business metadata | Full-text index, search | Elasticsearch, Solr, embedded Lucene |
| usage.registry | Records all dataset access by programs | | RDBM, or replace with lineage store |
| lineage. | Records all dataset access by programs, for lineage | Append, scan | RDBM |
| | Records operations performed by programs on the input/output fields | Append, get, index scan | RDBM |
| **Miscellaneous** | | | |
| routestore.kv | Stores service routes, only in sandbox (ZK in distributed) | | Deprecated: remove |
| **System Apps’ Datasets** | | | |
| program.heartbeat | Dashboard & Reports | tbd | May not be needed once run records are in RDBM |
| connections | Wrangler/Dataprep | tbd | May need separate design |
| **HDFS-based** | | | |
| Local Datasets | Pipelines | Batch file read/write | GCS, S3, transient HDFS |
| Logs | Central log collection | Batch write, batch read | GCS, S3, persistent disks |
| Artifacts | Jars containing apps, plugins, datasets | Entire file read, write, copy (object store) | GCS, S3, persistent disks |
Based on this, the following types of storage are required:
- A data store - for storing entities, specs, run records, simple metadata, etc., in a CRUD fashion. The implementation will be SQL-based (see the sketch after this list).
- A searchable metadata store - for storing metadata (tags/properties, lineage) and searching metadata using structured or free-text search. This will be implemented with a search engine like Elasticsearch or Solr.
- A medium to persist the scheduler’s in-memory job queue.
- A metrics store - for storing and retrieving metrics. Likely to be implemented using LevelDB on persistent storage, reusing much of the existing sandbox code.
- A persistent file system - for storing artifacts, logs, possibly metrics.
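To make the access patterns for the data store concrete, the sketch below shows what a CRUD-plus-range-scan SPI over string keys and JSON-encoded values could look like. It is only an illustration of the patterns listed above, not a proposed design; the interface name, method signatures, and table parameter are all hypothetical.

```java
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;

/**
 * Hypothetical sketch of a data store SPI: CRUD plus ordered range scans.
 * A SQL-backed provider would map each logical table to a relational table.
 */
public interface EntityStoreProvider {

  /** Create or update the JSON spec stored under the given key. */
  void write(String table, String key, String jsonSpec) throws IOException;

  /** Return the JSON spec for the key, or null if it does not exist. */
  String read(String table, String key) throws IOException;

  /** Scan all entries with keys in [startKey, stopKey), in key order. */
  Iterator<Map.Entry<String, String>> scan(String table, String startKey, String stopKey)
      throws IOException;

  /** Delete the entry for the key, if it exists. */
  void delete(String table, String key) throws IOException;
}
```

An interface of this shape is intentionally narrow: everything in the entity store, run history, and schedule store tables above can be expressed as puts, gets, deletes, and key-range scans, which keeps a relational implementation straightforward.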
...
The SPI should encapsulate how a provider implements transactions. However, at least one of the available providers will be the current HBase/HDFS dataset-based store, which needs a dataset framework and a transaction service client in order to function. How can these to-be-deprecated legacy services be made available to the storage provider without exposing them in the SPI?
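One possible answer is for the SPI to expose only a “run this work atomically” contract and leave everything else to the provider. The sketch below illustrates that idea under stated assumptions: all interface names are made up for this document, and only the Tephra TransactionSystemClient is an existing class; the legacy provider would receive it (and the dataset framework) through its own construction or injection, so neither appears in the SPI.

```java
// Hypothetical sketch only -- type names are illustrative, not an actual CDAP API.
import org.apache.tephra.TransactionSystemClient;

/** A unit of work to be executed atomically against the storage provider. */
interface TxRunnable {
  void run(StorageContext context) throws Exception;
}

/** Provider-specific handle through which the work accesses its tables (details omitted). */
interface StorageContext {
}

/** The only transaction-related contract the SPI exposes. */
interface TransactionRunner {
  void run(TxRunnable work) throws Exception;
}

/**
 * A legacy, dataset-based provider can still be fully transactional: it is constructed
 * with the existing transaction service client (and dataset framework), wraps the work
 * in a Tephra transaction, and none of that leaks into the SPI above.
 */
class LegacyDatasetTransactionRunner implements TransactionRunner {
  private final TransactionSystemClient txClient;

  LegacyDatasetTransactionRunner(TransactionSystemClient txClient) {
    this.txClient = txClient;
  }

  @Override
  public void run(TxRunnable work) throws Exception {
    // Start a transaction via txClient, execute the work against dataset-backed
    // tables, then commit or invalidate on failure. Details omitted in this sketch.
  }
}
```

A SQL-based provider would implement the same TransactionRunner with a database transaction, and a non-transactional provider could simply execute the work directly; callers never see the difference.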
CDAP’s current datasets require that the caller include the dataset code in the classpath. For a dataset-based implementation, that means that HBase and HDFS, and hence many Hadoop common libraries, must be on the classpath of CDAP system services. But when running in GKE, it is undesirable to include all the Hadoop dependencies. The CDAP master currently has no way of packaging different dependencies based on whether datasets are used or not: it simply assumes that datasets are needed and packages all system dataset types into each CDAP container. This will have to change.

Storage provider implementations must be injected at runtime with classpath isolation. CDAP’s current mechanism for that is plugins. But plugins do not have a clean way to bind them to an interface that they implement. Instead, they specify a plugin type such as “jdbc” or “batchsource”, and the pipeline system understands these types. One option is to introduce new types (“system.entity” or “system.meta”) for the system SPIs. Ideally, these types would not be visible to applications, as they are only meant to be used by the CDAP system. Alternatively, a mechanism similar to plugins could be introduced for SPIs.
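To make the classpath-isolation idea concrete, the sketch below loads a provider jar in its own classloader whose parent exposes only the SPI classes, and discovers the implementation via java.util.ServiceLoader. This illustrates the general technique rather than the eventual plugin/SPI mechanism; StorageProviderLoader and its parameters are invented names for this example.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Path;
import java.util.ServiceLoader;

public final class StorageProviderLoader {

  /**
   * Loads an implementation of the given SPI interface from a provider jar.
   * The parent classloader should expose only the SPI (and JDK) classes, so the
   * provider's own dependencies (e.g. Hadoop, HBase) never leak into CDAP services.
   */
  public static <T> T load(Path providerJar, Class<T> spi, ClassLoader spiOnlyParent)
      throws Exception {
    URLClassLoader providerClassLoader =
        new URLClassLoader(new URL[] { providerJar.toUri().toURL() }, spiOnlyParent);
    ServiceLoader<T> providers = ServiceLoader.load(spi, providerClassLoader);
    for (T provider : providers) {
      // The first implementation declared in META-INF/services wins.
      return provider;
    }
    throw new IllegalStateException(
        "No implementation of " + spi.getName() + " found in " + providerJar);
  }
}
```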
Breakdown
The following high-level tasks need to be completed:
- Design the architecture for defining SPIs and injecting provider implementations at runtime
- Define the SPI for each storage type
- Refactor existing system datasets to use the SPIs
- Implement new storage providers for each runtime environment (Kubernetes, Google Cloud, AWS, Azure, etc.)