Compute Cloud Support

Introduction

CDAP traditionally runs on a Hadoop YARN cluster and uses the compute resources in YARN to run all workloads, including CDAP master services as well as user programs. This monolithic design gives a relatively easy deployment and operation model, but its disadvantages are starting to show, especially as more users move storage and computation to the cloud.

Problems / Disadvantages of Current Design

Here are some of the major disadvantages with the current design:

  • CDAP system workloads and user programs can interfere with each other, resulting in downtime and poor performance in some cases. Here are some of the cases we've encountered before:
    • Heavy load created by the metrics system impeded user program operations on any Table dataset (including PFS)
    • Logs collected by CDAP take up significant CPU and HDFS storage if a user program emits a lot of logs
    • The transaction system is shared between CDAP and user programs, so any user program misbehavior (e.g. generating a lot of invalid transactions) can seriously affect the CDAP system
  • User programs can affect each other, since resources such as HDFS, HBase and the CDAP transaction service are shared among all programs
  • CDAP cannot leverage any remote cluster (cloud) resources for user programs
    • If the data storage is in the cloud (e.g. S3), a lot of data needs to be transferred in and out of the cloud

Requirements and Design Principles

  • Flexible in launching the runtime in any target cluster, whether in the cloud or on premise, and whether it is the same cluster CDAP runs in or a different one.
    • A CDAP instance can use different clouds / clusters simultaneously for different program executions
  • Flexible in integrating with different runtime frameworks (MR, Spark, Beam/DataFlow, whatever comes next...).
  • Runtime is autonomous.
    • Once launched, it manages every runtime aspect of a program execution.
      • Logs, metrics, metadata collection are done within that runtime
    • Runtime shouldn't initiate any communication to CDAP master
      • There are practical concerns about scalability and availability
        • Scalability
          • The scaling of the CDAP master should be based only on the number of concurrent program executions, rather than on individual program logic
        • Availability
          • Although the CDAP master should support HA, it shouldn't be in the critical path of program execution. That is, if the CDAP master is down, programs that are already running should keep running without any blockage or error
    • The runtime provides a REST endpoint for the CDAP master to periodically poll the runtime for information updates
      • Program states, metadata, workflow tokens, etc.
    • The runtime provides a REST endpoint for the CDAP master to control the runtime
      • Suspend / Termination
  • The real-time logs and metrics collection mechanism is pluggable (via common services as in current CDAP, per program run, relying on the cloud provider, ...)
    • This also means it can be disabled (no-op) based on launch-time configuration.
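
The pluggable, optionally disabled collection mechanism described in the last requirement could look roughly like the following. This is only a sketch: the LogCollector interface, the BufferingCollector stand-in and the "runtime.log.collector" configuration key are hypothetical names introduced for illustration and are not part of any existing CDAP API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Sketch: picking a log collection mechanism from launch-time configuration. */
final class LogCollectionSketch {

  /** Hypothetical extension point; not an existing CDAP interface. */
  interface LogCollector {
    void collect(String logLine);
  }

  /** Collection disabled: every log line is dropped. */
  static final LogCollector NO_OP = logLine -> { };

  /** Stand-in for a real backend (common service, cloud provider, ...); buffers lines in memory. */
  static final class BufferingCollector implements LogCollector {
    private final List<String> lines = new ArrayList<>();

    @Override
    public void collect(String logLine) {
      lines.add(logLine);
    }

    List<String> lines() {
      return lines;
    }
  }

  /** Hypothetical launch-time configuration key. */
  static final String COLLECTOR_KEY = "runtime.log.collector";

  /** Returns the collector named in the launch configuration; defaults to no-op. */
  static LogCollector create(Map<String, String> launchConfig) {
    String type = launchConfig.getOrDefault(COLLECTOR_KEY, "none");
    switch (type) {
      case "none":
        return NO_OP;
      case "buffer":
        return new BufferingCollector();
      default:
        throw new IllegalArgumentException("Unknown log collector: " + type);
    }
  }
}
```

The point of the design is that the program runtime only sees the LogCollector contract, so switching the backend, or turning collection off, is a configuration change rather than a code change.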

Design

CDAP Architecture

The guiding principles of the architecture are as follows:

  • A stable and scalable core platform that provides essential functionalities
    • An application API for Data Application development
    • Management and operation of Data Applications
    • A central catalog for entities' metadata
  • Provide a well-defined API to support additional capabilities
    • Enable fast iteration of new ideas
    • Allow running data / compute intensive jobs for the system (e.g. Spark, Elasticsearch, etc.)
    • Individual extended systems can be turned on / off independently

CDAP Core System

The major components of the CDAP core are as follows:

  • Metadata Catalog
    • Responsible for collecting, storing, serving and exporting metadata as defined by users and applications
  • Artifacts and Application Repository
    • Responsible for artifact and application deployment, update (versioning), and removal, and for surfacing that information
  • Runtime Manager
    • Responsible for all aspects of the program lifecycle, including resource provisioning, execution, run records, etc.
  • Scheduler
    • Responsible for launching programs based on triggers and constraints
  • Transaction
    • Apache Tephra to provide transactional operations on HBase
    • Note: the Transaction Service may not be available to the application runtime, depending on the target execution cluster, and we may eventually remove it
  • Metrics and Monitoring
    • Responsible for collecting and querying metrics
    • Note: may integrate with external metrics systems in the future
  • TMS (Transactional Messaging System)
    • Provides messaging service for event based design to decouple components

CDAP Extended System

 

Metadata Management

This contains the various services that manage CDAP metadata for artifacts, applications, program runs and datasets. There are no big architectural changes to those services, except for the removal of the Dataset Service, which we will cover in a later section.

Currently, all metadata storage is done through the Table abstraction, with various Store implementations wrapping it for access patterns and transaction management. The artifact manager also uses the Location abstraction for storing artifacts. The following table shows the different implementations we either support now or plan to support in the future.

Type     | Local Sandbox | YARN cluster      | Cloud cluster
Table    | LevelDB       | HBase with Tephra | Cloud columnar store / SQL-like store
Location | Local FS      | HDFS              | Cloud blob storage

In the long term, it is desirable not to have CDAP rely on a YARN cluster in distributed mode, since YARN 

Dataset Service

The Dataset Service was first introduced in CDAP 2.2 (May 2014) and is mainly responsible for the following roles:

  • Dataset Modules and Types management
    • A dataset module is a jar containing a collection of dataset implementations
    • A dataset type defines the name-to-implementation mapping
      • A type can depend on another type, which means a module can depend on another module
  • Dataset Instance management
    • Maintain information that associates a dataset instance with its dataset type
    • Maintain dataset instance metadata (specifications and properties)
  • Execution of Dataset Administrative Operations
    • Create / Truncate / Delete / Upgrade of the underlying resources (e.g. HBase Table).
  • Explore Service Integration
    • On dataset instance creation, update and deletion

In the past four years, CDAP has evolved a lot based on real use cases from our customers and the POCs we've done. Here is what we've learned regarding the Dataset Service:

Role | Learning
Dataset Modules and Types Management
  • Very complicated, with hard-to-understand dependency management
  • Never actually needed, as there is no real use case for such complexity
  • Unresolved classloading problems when multiple jars are involved
  • Does not work well with dataset types defined inside plugin artifacts
Dataset Instance Management
  • Useful for providing a default set of dataset properties, but could be replaced / superseded by preferences and runtime arguments
  • Dataset type association is useful only for core datasets, and becomes too restrictive when a dataset instance is not tied to dataset type code.
    • For example, in a data pipeline, it is quite common to have a "dataset" defined as a source / sink, with the platform tracking lineage information for it, but there is no dataset type code since the data logic code is part of the plugin (e.g. an InputFormat implementation).
    • Currently we have an ExternalDataset as a workaround. It defines a dataset implementation without data logic, merely to satisfy the Dataset Service.
Dataset Admin Operations
  • Was designed to handle impersonation before the requirements on impersonation existed. It turned out to be an over-engineered design.
  • With CDAP program impersonation support, those admin operations can be done directly from the user program container if necessary.
Explore Service Integration
  • Hive-specific queries are coded inside the Dataset Service, rather than being exposed to dataset implementations so that each dataset can make its own decisions.
  • The integration is only with Hive (which is more of an Explore problem than a Dataset Service problem), making it hard to integrate with other types of query engine (e.g. BigQuery).

Based on what we've learned, the Dataset Service creates more problems / restrictions than the benefits it brings. Also, the current implementation requires the runtime to talk to the Dataset Service directly for all dataset-related operations, which violates the "Runtime is autonomous" requirement described above. Therefore, removing the Dataset Service is desirable for the future of CDAP.

Given how deeply we rely on Dataset and the Dataset Service, the removal of the Dataset Service should be done in phases, together with changes to the public API, the REST API and dataset support.

Missing Features after Dataset Service Removal

The removal of the Dataset Service means certain CDAP features will no longer be available and will instead have to be implemented by the application. Here is the list:

  • Creation of Dataset Instance through REST API with known Dataset Type
    • This is a seldom used, if not unused, feature, so removing it has little impact
  • Deletion of Dataset Instance through REST API
    • Again, this is rarely used. Also, once dataset instance management is removed, this is no longer needed
  • Explore (Hive) integration
  • Dataset properties association
    • Can be managed via metadata manager

Dataset Changes

With the removal of the Dataset Service, we need to define what a Dataset is, as well as the changes to the CDAP API and the impact on applications.

Here are the API changes related to Dataset:

Root Type | Description | API Changes
DatasetConfigurer

Description: For adding Dataset Modules and Dataset Types, and for creating Dataset Instances, at application deployment time

API changes:

  • Removal of DatasetModule, DatasetDefinition and related classes
    • Dataset modules and types are not supported anymore
  • The properties provided via the createDataset method will be stored inside the application specification
    • No more centralized management of datasets
DatasetContext

Description: For instantiation of Dataset instances based on the module, type and properties

API changes:

  • The original set of getDataset methods will be replaced with one that takes an explicit Type object
    • This also allows the Dataset class to be defined inside a plugin artifact
  • Each Dataset instance will have the DatasetContext instance injected in order to get embedded Datasets
  • Dataset properties will come from the application specification, preferences, runtime arguments, and properties explicitly provided via the API
DatasetManager

Description: Basically a programmatic gateway to the DatasetService for administrative operations

  • Create, Upgrade, Truncate, Delete, Exists
  • Metadata retrieval (type, properties)

API changes:

  • Removal of DatasetManager and related classes
  • The Dataset implementation itself is (optionally) responsible for admin operations
    • Performed directly from the user program
  • Underlying resources can also be managed separately, outside of CDAP
  • Core Datasets will expose methods / classes for admin operations
Core Dataset Interfaces

Description: Collections of common data usage patterns via interfaces

  • Provide abstraction over the actual implementation (mainly standalone vs distributed)

API changes: These will mostly stay the same, with the following modifications

  • Expose methods / classes to perform administrative operations
  • Have an SPI contract for cloud providers to supply cloud-native Dataset implementations (more below)
    • e.g. Table -> BigTable, FileSet -> Blob Storage
Runtime Management

Runtime Management has two main parts, the Mission Control and the Program Runtime. At a high level, the interaction looks like the following diagram:

As shown in the diagram above, there are four main components related to program execution. All of them involve interacting with the target runtime environment, and hence need to be extensible / customizable based on the runtime environment. CDAP will have a Runtime Extension SPI defined for this purpose.

Component | Extension
Provisioner
  • Provision a cluster from the Cloud Provider (CP)
  • Remove the cluster from the CP on execution completion / timeout
  • Retrieve cluster information from the CP for reporting purposes
Program Launcher
  • Prepare and launch the CDAP runtime on the provisioned cluster
  • Varies based on the target cluster configuration (firewall / authentication, YARN, Kubernetes, etc.)
Runtime Monitor
  • Interact with the runtime cluster to monitor the status of the cluster application and containers
  • Similar to the Program Launcher, it varies based on the target cluster configuration
  • Perform heartbeat calls with the Program Runtime. The direction of the connection might vary
Program Runtime
  • Runs on the runtime cluster and interacts with the cluster manager for resources
  • Varies based on the cluster manager (YARN, Kubernetes)
  • Declares the log collection mechanism
    • CDAP will provide an extensible logging framework for collection and retrieval
    • The Program Runtime extension just chooses among the supported logging frameworks and provides configurations

Each of the Runtime Management components is described in more detail in the following sections.

Mission Control

The Mission Control consists of the Provisioner, the Program Launcher, and the Runtime Monitor. It is responsible for the whole lifecycle of a program execution.

For operational simplicity and scalability, the Provisioner, the Program Launcher, and the Runtime Monitor will run in the same JVM process, meaning they will be scaled as one unit. In the current CDAP 4, minus the provisioner, this role is mainly fulfilled by the ProgramRuntimeService, which can only run inside the CDAP master. In order to pave the way for scaling out the Mission Control in the future, we should have a logical separation between the Mission Control and the CDAP master. The role of the ProgramRuntimeService will be simplified to:

  1. Load the program runtime data from the metadata store
    • Artifact information, runtime arguments, preferences, runtime profile, etc.
  2. Generate a RunId and publish it together with the program runtime data to TMS (start program request)
    • This is essentially taking a snapshot of all the information needed for the program execution

This essentially makes the ProgramRuntimeService a stateless library that can be used anywhere. For example, it can be called from an HTTP handler or from the scheduler, and they could be running in different processes (which is needed, as we need to scale and have HA for the scheduler in the future).
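
The two simplified steps of the ProgramRuntimeService can be sketched as a stateless helper. Everything here is illustrative: MetadataStore and MessagePublisher are hypothetical stand-ins for the metadata store and TMS, the "program.start" topic name is made up, and a random UUID is used in place of however CDAP actually generates a RunId.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/** Sketch: a stateless "start program" request publisher. */
final class StartRequestSketch {

  /** Immutable snapshot of everything needed for one program execution. */
  static final class StartProgramRequest {
    final String runId;
    final String programId;
    final Map<String, String> runtimeData;

    StartProgramRequest(String runId, String programId, Map<String, String> runtimeData) {
      this.runId = runId;
      this.programId = programId;
      // Defensive copy, so later changes in the metadata store do not affect this run.
      this.runtimeData = Collections.unmodifiableMap(new HashMap<>(runtimeData));
    }
  }

  /** Hypothetical stand-in for the metadata store. */
  interface MetadataStore {
    Map<String, String> loadRuntimeData(String programId);
  }

  /** Hypothetical stand-in for a TMS publisher. */
  interface MessagePublisher {
    void publish(String topic, StartProgramRequest request);
  }

  /** Hypothetical topic name. */
  static final String START_TOPIC = "program.start";

  /** Step 1: load the runtime data. Step 2: generate a run id and publish the snapshot. */
  static String requestStart(String programId, MetadataStore store, MessagePublisher publisher) {
    Map<String, String> runtimeData = store.loadRuntimeData(programId);
    String runId = UUID.randomUUID().toString(); // placeholder for the real RunId generation
    publisher.publish(START_TOPIC, new StartProgramRequest(runId, programId, runtimeData));
    return runId;
  }
}
```

Since requestStart keeps no state of its own, the same call can be made from an HTTP handler or from the scheduler, in whichever process they happen to run.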

Mission Control polls TMS for start program messages; upon receiving a new message, the following happens:

  1. The Provisioner picks up the start program request and starts the provisioning logic
  2. When provisioning completes, it notifies the Program Launcher to start the program in the cluster
    • The notification can simply be done with an in-memory method call, via some interface contract, assuming the Provisioner and the Program Launcher always run in the same process
    • The notification should include all the program runtime data plus the unique identifier of the execution cluster
  3. The Program Launcher starts the Program Runtime in the given cluster
    • Different (cloud) clusters might need different ways of starting the Program Runtime
  4. The Program Runtime sets up the CDAP runtime environment in order to provide CDAP services to the program (more on this in a later section)
  5. The Program Runtime launches the user program via ProgramRunner and manages the program lifecycle
  6. On the Mission Control side, after the Program Launcher has started the Program Runtime (step 3), it notifies the Runtime Monitor to start monitoring the running program
    • Similar to above, the notification can be done with a method call, provided the state has been persisted
      • If the process fails, upon restart the Runtime Monitor will just read from the state store and resume monitoring
      • The Run Record Corrector is part of the Runtime Monitor
    • The Runtime Monitor maintains a heartbeat with the Program Runtime to get the latest state updates (more on this in a later section)
  7. On completion of the program execution, the Program Runtime will persist the final states into cloud storage
    1. This could be optional, depending on how much information needs to be retained if there is a failure (more on this in a later section)
    2. The Runtime Monitor will fetch the final states (or will already have them from the last heartbeat) and update the program states
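
The Mission Control side of the steps above (provision, launch, then monitor, with the state persisted before each in-memory hand-off) might be wired together roughly as below. This is a sketch under assumptions: the four interfaces and the "PROVISIONED" / "LAUNCHED" state names are hypothetical and only hint at what the Runtime Extension SPI could look like.

```java
/** Sketch: in-process hand-off between Provisioner, Program Launcher and Runtime Monitor. */
final class MissionControlSketch {

  /** Hypothetical SPI: provisions a cluster and returns its unique identifier. */
  interface Provisioner {
    String provision(String runId);
  }

  /** Hypothetical SPI: starts the Program Runtime on the given cluster. */
  interface ProgramLauncher {
    void launch(String runId, String clusterId);
  }

  /** Hypothetical SPI: starts monitoring the running program. */
  interface RuntimeMonitor {
    void monitor(String runId, String clusterId);
  }

  /** Hypothetical state store, read again after a Mission Control restart. */
  interface StateStore {
    void save(String runId, String state);
  }

  private final Provisioner provisioner;
  private final ProgramLauncher launcher;
  private final RuntimeMonitor monitor;
  private final StateStore stateStore;

  MissionControlSketch(Provisioner provisioner, ProgramLauncher launcher,
                       RuntimeMonitor monitor, StateStore stateStore) {
    this.provisioner = provisioner;
    this.launcher = launcher;
    this.monitor = monitor;
    this.stateStore = stateStore;
  }

  /** Handles one start program request picked up from the message queue. */
  void onStartRequest(String runId) {
    String clusterId = provisioner.provision(runId);
    stateStore.save(runId, "PROVISIONED");
    // Plain method call: all three components live in the same process.
    launcher.launch(runId, clusterId);
    // Persist before notifying the monitor, so monitoring can resume after a crash.
    stateStore.save(runId, "LAUNCHED");
    monitor.monitor(runId, clusterId);
  }
}
```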

Provisioner

The provisioner is the component that interacts with cloud providers for resource provisioning and de-provisioning. Generally it has the following properties:

  • Resilient to failures
    • CDAP master interruptions, including restart, failover, upgrade, etc.
    • Cloud provider interruptions, which can be caused by network outage, disconnection or cloud provider availability issues.
    • Generally speaking, the provisioner's progress might be halted during service interruptions, but it should be able to recover and continue correctly afterwards.
    • This is generally handled with an intent log and retries
      • First log (persist) the intended action before performing the action
      • Based on the intent log, the action will be retried upon failure, even after process termination and restart
        • The retry should contain logic to validate whether the action has already been performed, by consulting the source of truth
      • Once the action completes, log the completion state as well as the next intended action (if any).
  • Pluggable and Extensible
    • It is defined as part of the Runtime Extension SPI
  • Horizontally Scalable
    • A CDAP instance could be launching thousands or even more programs at the same time

The detailed design is in CDAP Provisioning

Program Launcher

The Program Launcher is relatively simple. It is responsible for launching the CDAP runtime in the cluster provisioned by the Provisioner, which involves:

  1. Gather CDAP runtime jars, program artifacts, program runtime data, configurations, etc.
  2. Launch the CDAP runtime on the target runtime cluster with the information gathered in step 1
    • This is a cluster-specific step, but generally involves file localization and launching the CDAP runtime JVM on the cluster. Here are some possibilities:
      • Use ssh + scp to copy files and run commands
      • Talk to the YARN RM directly via the RM Client protocol for localization as well as for launching the YARN application
      • Build and publish a docker image (localization) and interact with the Kubernetes master to launch the CDAP runtime in a container

Runtime Monitor

The Runtime Monitor collects information emitted by the running program, including:

  • Program lifecycle states
    • Starting, Running, Suspended, Completed, Killed, Failed
  • Metadata
    • Lineage, metadata, workflow tokens, 

 

Program Runtime

The Program Runtime is the CDAP component running inside the target execution (cloud) environment. It is responsible for:

  • Interacting with the cluster resource manager to acquire resources and execute the user program
  • Managing the lifecycle of that particular program run inside the cluster
  • Responding to heartbeat requests from the CDAP master
  • Providing a local TMS service for metadata and metrics collection
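
The heartbeat between the Runtime Monitor and the Program Runtime can be sketched as a simple polling loop over the lifecycle states listed earlier. Several things here are assumptions made for illustration: RuntimeEndpoint is a hypothetical stand-in for the REST call, Completed / Killed / Failed are treated as the terminal states, the monitor is the side that initiates the call, and the wait between polls is omitted.

```java
/** Sketch: polling the Program Runtime until the program reaches a terminal state. */
final class HeartbeatSketch {

  /** The program lifecycle states listed for the Runtime Monitor. */
  enum ProgramState {
    STARTING, RUNNING, SUSPENDED, COMPLETED, KILLED, FAILED;

    /** ASSUMPTION: completed, killed and failed runs never change state again. */
    boolean isTerminal() {
      return this == COMPLETED || this == KILLED || this == FAILED;
    }
  }

  /** Hypothetical stand-in for the heartbeat REST call to the Program Runtime. */
  interface RuntimeEndpoint {
    ProgramState heartbeat() throws Exception;
  }

  /**
   * Polls up to maxPolls times and returns the last state seen. A failed
   * heartbeat keeps the last known state and simply polls again.
   */
  static ProgramState monitor(RuntimeEndpoint endpoint, int maxPolls) {
    ProgramState last = ProgramState.STARTING;
    for (int i = 0; i < maxPolls && !last.isTerminal(); i++) {
      try {
        last = endpoint.heartbeat();
      } catch (Exception e) {
        // Runtime unreachable: keep the last known state and try again.
      }
    }
    return last;
  }
}
```

Because a failed heartbeat only delays the state update, a temporary outage on either side does not affect the running program itself.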

CDAP Applications

MMDS

Wrangler

Data Pipeline

Random notes (WIP):

Data Fabric

Dataset Service

The Dataset Service is responsible for the following operations:

  • Dataset modules and types management
    • Addition / removal of modules and types
  • Dataset instance management
    • Creation
      • Association of dataset instance and dataset properties
      • Creation / acquisition of the underlying storage resource
      • (Optional) creation of the corresponding explore (Hive) table

 

Transaction

Core Datasets

Table

File Set and Partitioned File Set

Streams

Audit / Lineage / Metadata collection

Operations

Real-time logs

Real-time metrics and cross runs aggregation

TMS