Introduction
CDAP traditionally runs on a Hadoop YARN cluster and uses the compute resources in YARN to run all workloads, including CDAP master services as well as user programs. This monolithic design gave a relatively easy deployment and operation model, but its disadvantages have started to show, especially as more users move storage and computation to the cloud.
Problems / Disadvantages of Current Design
Here are some of the major disadvantages of the current design:
- CDAP system workloads and user programs can interfere with each other, resulting in downtime and poor performance in some cases. Here are some of the cases we've encountered:
  - Heavy load created by the metrics system impeded user program operations on all Table datasets (including PFS).
  - Log collection in CDAP takes up significant CPU and HDFS storage if a user program emits a lot of logs.
  - The transaction system is shared between CDAP and user programs, so any user program misbehavior (e.g. generating a lot of invalid transactions) can seriously affect the CDAP system.
- User programs can affect each other, since resources such as HDFS, HBase and the CDAP transaction service are shared among all programs.
- CDAP cannot leverage remote cluster (cloud) resources for user programs.
  - If the data storage is in the cloud (e.g. S3), a lot of data needs to be transferred in and out of the cloud.
Requirements and Design Principles
- Flexible in launching the runtime in any target cluster, whether in the cloud or on premises, and whether it is the same cluster CDAP runs in or a different one.
  - A CDAP instance can use different clouds / clusters simultaneously for different program executions.
- Flexible in integrating with different runtime frameworks (MapReduce, Spark, Beam/Dataflow, whatever comes next...).
- The runtime is autonomous.
  - Once launched, it manages every runtime aspect of a program execution.
    - Logs, metrics and metadata collection are done within that runtime.
  - The runtime shouldn't initiate any communication to the CDAP master.
    - There are practical concerns about scalability and availability:
      - Scalability
        - The scaling of the CDAP master should be based only on the number of concurrent program executions, rather than on individual program logic.
      - Availability
        - Although the CDAP master should support HA, it shouldn't be in the critical path of program execution. That is, if the CDAP master is down, programs that are already running should keep running without any blockage or error.
  - The runtime provides a REST endpoint for the CDAP master to periodically poll the runtime for information updates.
    - Program states, metadata, workflow tokens, etc.
  - The runtime provides a REST endpoint for the CDAP master to control the runtime.
    - Suspend / termination
- The real-time logs and metrics collection mechanism is pluggable (via common services as in the current CDAP, per program run, relying on the cloud provider, ...).
  - This also means it can be disabled (no-op) based on launch-time configuration.
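The pluggable, disable-able collection requirement can be illustrated with a minimal sketch, assuming a Java SPI inside the runtime. The interface TelemetryCollector, its two implementations, the factory and the launch-time key runtime.telemetry.collector are all hypothetical names chosen for illustration. The point is only that the mechanism is selected from launch-time configuration, and that a no-op implementation makes collection cheap to turn off.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical SPI for real-time log and metrics collection inside the runtime.
interface TelemetryCollector {
  void emitLog(String message);
  void emitMetric(String name, long value);
}

// No-op collector: selected when collection is disabled at launch time.
final class NoOpTelemetryCollector implements TelemetryCollector {
  @Override public void emitLog(String message) { }
  @Override public void emitMetric(String name, long value) { }
}

// Keeps records in memory; a pull-based endpoint (not shown) could serve them to the master.
final class BufferingTelemetryCollector implements TelemetryCollector {
  final List<String> logs = new ArrayList<>();
  final List<String> metrics = new ArrayList<>();

  @Override public synchronized void emitLog(String message) { logs.add(message); }
  @Override public synchronized void emitMetric(String name, long value) { metrics.add(name + "=" + value); }
}

final class TelemetryCollectors {
  // Hypothetical launch-time configuration key; a missing value means "none".
  static final String COLLECTOR_KEY = "runtime.telemetry.collector";

  static TelemetryCollector fromLaunchConfig(Map<String, String> config) {
    String type = config.getOrDefault(COLLECTOR_KEY, "none");
    switch (type) {
      case "none": return new NoOpTelemetryCollector();
      case "buffer": return new BufferingTelemetryCollector();
      default: throw new IllegalArgumentException("Unknown telemetry collector: " + type);
    }
  }
}
```

Program code only ever sees TelemetryCollector, so swapping in a cloud-provider-backed implementation would not require changes in the program.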
Design
Generally speaking, the CDAP master has two main roles: Metadata Management and Runtime Management. The master also provides other services, such as transaction, stream, logs, metrics and explore; we cover those services in a later section.
Metadata Management
This role consists of the services that manage CDAP metadata for artifacts, applications, program runs and datasets. There are no big architectural changes to those services, except for the removal of the Dataset Service, which we cover in a later section.
Currently, all metadata storage is done through the Table abstraction, with various kinds of Store implementations wrapping around it to provide access patterns and transaction management. The artifact manager also uses the Location abstraction for storing artifacts. The following table shows the different implementations we either support now or plan to support in the future.
Type | Local Sandbox | YARN cluster | Cloud cluster |
---|---|---|---|
Table | LevelDB | HBase with Tephra | Cloud columnar store / SQL like store |
Location | Local FS | HDFS | Cloud blob storage |
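The layering described above, where a Store encodes an access pattern on top of a generic Table, can be sketched as follows. All names here (MetaTable, InMemoryMetaTable, RunRecordStore) and the row-key layout are hypothetical; the in-memory sorted map merely stands in for LevelDB, HBase or a cloud store from the table above.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical minimal Table abstraction: a sorted key-value store with prefix scans.
interface MetaTable {
  void put(String rowKey, String value);
  Optional<String> get(String rowKey);
  NavigableMap<String, String> scanPrefix(String prefix);
}

// In-memory stand-in for LevelDB / HBase / a cloud store.
final class InMemoryMetaTable implements MetaTable {
  private final TreeMap<String, String> rows = new TreeMap<>();

  public void put(String rowKey, String value) { rows.put(rowKey, value); }

  public Optional<String> get(String rowKey) { return Optional.ofNullable(rows.get(rowKey)); }

  public NavigableMap<String, String> scanPrefix(String prefix) {
    NavigableMap<String, String> result = new TreeMap<>();
    for (Map.Entry<String, String> e : rows.tailMap(prefix, true).entrySet()) {
      // Keys are sorted, so all keys sharing the prefix are contiguous.
      if (!e.getKey().startsWith(prefix)) break;
      result.put(e.getKey(), e.getValue());
    }
    return result;
  }
}

// A Store encodes one access pattern (run records per program) on top of any MetaTable.
final class RunRecordStore {
  private final MetaTable table;

  RunRecordStore(MetaTable table) { this.table = table; }

  private static String key(String programId, String runId) {
    return "run:" + programId + ":" + runId;
  }

  void recordState(String programId, String runId, String state) { table.put(key(programId, runId), state); }

  Optional<String> getState(String programId, String runId) { return table.get(key(programId, runId)); }

  int countRuns(String programId) { return table.scanPrefix("run:" + programId + ":").size(); }
}
```

Because RunRecordStore depends only on the MetaTable interface, moving from a YARN cluster to a cloud cluster would mean supplying another MetaTable implementation rather than rewriting the store.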
In the long term, it is desirable for CDAP not to rely on a YARN cluster in distributed mode, since YARN
Dataset Service
The Dataset Service was introduced in CDAP 2.2 (May 2014) and is mainly responsible for the following roles:
- Dataset Modules and Types management
  - A dataset module is a jar with a collection of dataset implementations.
  - A dataset type defines the mapping from a name to an implementation.
  - A type can depend on another type, so a module can depend on another module.
- Dataset Instance management
  - Maintains the association between a dataset instance and its dataset type.
  - Maintains dataset instance metadata (specifications and properties).
- Execution of Dataset Administrative Operations
  - Create / truncate / delete / upgrade of the underlying resources (e.g. HBase tables).
- Explore Service Integration
  - On dataset instance creation, update and deletion
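Because types can depend on types in other modules, adding or loading a module implies resolving its module dependencies first. A minimal sketch of that resolution, assuming module dependencies are already known as a map, is a depth-first topological sort with cycle detection. ModuleDependencyResolver and the module names in the usage are hypothetical, not the Dataset Service's actual code.

```java
import java.util.*;

// Hypothetical resolver: each dataset module lists the modules it depends on.
final class ModuleDependencyResolver {
  private final Map<String, List<String>> dependsOn = new HashMap<>();

  void addModule(String module, List<String> deps) {
    dependsOn.put(module, new ArrayList<>(deps));
  }

  // Returns the module and its transitive dependencies, each after everything it depends on.
  List<String> loadOrder(String module) {
    List<String> order = new ArrayList<>();
    visit(module, new HashSet<>(), new HashSet<>(), order);
    return order;
  }

  // Depth-first topological sort; "inProgress" holds the current path to detect cycles.
  private void visit(String m, Set<String> inProgress, Set<String> done, List<String> order) {
    if (done.contains(m)) return;
    if (!dependsOn.containsKey(m)) throw new IllegalArgumentException("Unknown module: " + m);
    if (!inProgress.add(m)) throw new IllegalStateException("Cyclic module dependency at: " + m);
    for (String dep : dependsOn.get(m)) visit(dep, inProgress, done, order);
    inProgress.remove(m);
    done.add(m);
    order.add(m);
  }
}
```

For example, with modules core, kv (depends on core) and objectstore (depends on kv and core), loadOrder("objectstore") yields core, kv, objectstore. Keeping this graph consistent across deployments is part of what the Dataset Service has to manage centrally.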
Over the past four years, CDAP has evolved a lot based on real use cases from our customers and the POCs we've done. Here is what we've learned regarding the Dataset Service:
Role | Learning |
---|---|
Dataset Modules and Types Management | |
Dataset Instance Management | |
Dataset Admin Operations | |
Explore Service Integration | |
Based on what we've learned, the Dataset Service brings more problems and restrictions than benefits. Also, the current implementation requires the runtime to talk to the Dataset Service directly for all dataset-related operations, which violates the "Runtime is autonomous" requirement described above. Therefore, removing the Dataset Service is desirable for the future CDAP.
Given how deeply CDAP relies on datasets and the Dataset Service, the removal of the Dataset Service should be done in phases, together with changes to the public API, the REST API and dataset support.
Missing Features after Dataset Service Removal
Removing the Dataset Service means certain features will no longer be provided by CDAP itself and will have to be done through applications instead. Here is the list:
- Creation of a dataset instance through the REST API with a known dataset type
  - This is a seldom used, if not unused, feature, which has little impact if removed.
- Deletion of a dataset instance through the REST API
  - Again, this is rarely used. Also, with the dataset instance management, this is not needed.
- Explore (Hive) integration
- Dataset properties association
  - Can be managed via the metadata manager.
Dataset Changes
With the removal of the Dataset Service, we need to define what a dataset is, as well as the changes to the CDAP API and their impact on applications.
Here are the API changes related to datasets:
Root Type | Description | API Changes |
---|---|---|
DatasetConfigurer | For adding dataset modules and dataset types, and for creating dataset instances, at application deployment time | |
DatasetContext | For instantiating dataset instances based on the module, type and properties | |
DatasetManager | Basically a programmatic gateway to the DatasetService for administrative operations | |
Core Dataset Interfaces | Collections of common data usage patterns via interfaces | Pretty much will stay the same with the following modifications |
Runtime Management
The Runtime Management has two main parts: the Mission Control and the Program Runtime. At a high level, the interaction looks like the following diagram:
As shown in the diagram above, there are four main components related to program execution. All of them involve interacting with the target runtime environment, and hence need to be extensible / customizable based on that environment. CDAP will define a Runtime Extension SPI for this purpose.
Component | Extension |
---|---|
Provisioner | |
Program Launcher | |
Runtime Monitor | |
Program Runtime | |
Each of the Runtime Management components will be described with more details in the following sections.
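To make the idea of a Runtime Extension SPI concrete, here is a minimal sketch assuming Java interfaces, one per component in the table above, bundled per target environment and looked up by name. Every name and signature (Provisioner, ProgramLauncher, RuntimeMonitor, ProgramRuntime, RuntimeExtension, RuntimeExtensionRegistry) is hypothetical and not the actual CDAP SPI.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical SPI contracts, one per Runtime Management component.
interface Provisioner {
  // Creates (or reuses) a cluster for the run and returns its unique identifier.
  String provision(String runId, Map<String, String> profileProperties) throws Exception;
  void deprovision(String clusterId) throws Exception;
}

interface ProgramLauncher {
  // Localizes files and starts the Program Runtime on the given cluster.
  void launch(String clusterId, String runId, Map<String, String> runtimeData) throws Exception;
}

interface RuntimeMonitor {
  // Begins polling the Program Runtime of the given run for state updates.
  void monitor(String clusterId, String runId);
}

interface ProgramRuntime {
  // Runs inside the target cluster and drives the user program to completion.
  void run(String runId, Map<String, String> runtimeData) throws Exception;
}

// Bundles the components for one kind of target environment (e.g. "ssh", "yarn", "kubernetes").
interface RuntimeExtension {
  String name();
  Provisioner provisioner();
  ProgramLauncher launcher();
  RuntimeMonitor monitor();
  ProgramRuntime programRuntime();
}

// Looks up the extension named by a run's profile.
final class RuntimeExtensionRegistry {
  private final Map<String, RuntimeExtension> extensions = new HashMap<>();

  void register(RuntimeExtension extension) {
    if (extensions.putIfAbsent(extension.name(), extension) != null) {
      throw new IllegalStateException("Duplicate runtime extension: " + extension.name());
    }
  }

  RuntimeExtension lookup(String name) {
    RuntimeExtension extension = extensions.get(name);
    if (extension == null) throw new IllegalArgumentException("No runtime extension named " + name);
    return extension;
  }
}
```

Bundling the four components under one name keeps a run's provisioning, launching, monitoring and in-cluster runtime consistent with each other for a given environment.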
Mission Control
The Mission Control consists of the Provisioner, the Program Launcher, and the Runtime Monitor. It is responsible for the whole lifecycle of a program execution.
For operational simplicity and scalability, the Provisioner, the Program Launcher, and the Runtime Monitor will run in the same JVM process, meaning they will be scaled as one unit. In the current CDAP 4, apart from provisioning, this role is mainly fulfilled by the ProgramRuntimeService, which can only run inside the CDAP master. To pave the way for scaling out the Mission Control in the future, we should have a logical separation between the Mission Control and the CDAP master. The role of the ProgramRuntimeService will be simplified to:
- Loading the program runtime data from the metadata store
  - Artifact information, runtime arguments, preferences, runtime profile, etc.
- Generating a RunId and publishing it, together with the program runtime data, to TMS (the start program request)
  - This essentially takes a snapshot of all the information needed for the program execution
This essentially makes the ProgramRuntimeService a stateless library that can be used anywhere. For example, it can be called from an HTTP handler or from the scheduler, which could be running in different processes (this is needed, as we need to scale and provide HA for the scheduler in the future).
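The simplified role can be sketched as a small class whose only state is its collaborators. This is a sketch under assumptions: RuntimeDataReader stands in for the metadata store lookup, StartRequestPublisher stands in for publishing to TMS, and a random UUID is used as a placeholder RunId. None of these names are CDAP APIs.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical snapshot of everything needed to execute one program run.
final class StartProgramRequest {
  final String programId;
  final String runId;
  final Map<String, String> runtimeData;

  StartProgramRequest(String programId, String runId, Map<String, String> runtimeData) {
    this.programId = programId;
    this.runId = runId;
    this.runtimeData = runtimeData;
  }
}

// Hypothetical: reads artifact info, runtime arguments, preferences, profile, etc.
interface RuntimeDataReader {
  Map<String, String> load(String programId, Map<String, String> userArgs);
}

// Hypothetical stand-in for publishing a message to the TMS start-program topic.
interface StartRequestPublisher {
  void publish(StartProgramRequest request);
}

// Holds no per-run state, so an HTTP handler or the scheduler can each use its own instance.
final class ProgramStarter {
  private final RuntimeDataReader reader;
  private final StartRequestPublisher publisher;

  ProgramStarter(RuntimeDataReader reader, StartRequestPublisher publisher) {
    this.reader = reader;
    this.publisher = publisher;
  }

  String start(String programId, Map<String, String> userArgs) {
    // Copy the loaded data so later metadata changes cannot alter this run's snapshot.
    Map<String, String> snapshot =
        Collections.unmodifiableMap(new HashMap<>(reader.load(programId, userArgs)));
    String runId = UUID.randomUUID().toString();  // placeholder RunId for this sketch
    publisher.publish(new StartProgramRequest(programId, runId, snapshot));
    return runId;
  }
}
```

Since everything the run needs travels in the published message, whichever process consumes it does not have to read the metadata store again.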
The Mission Control polls TMS for program start messages; upon receiving a new message, the following happens:
- The Provisioner picks up the start program request and starts the provisioning logic
  - See the detailed design of the state transitions in CDAP Provisioning
- When provisioning completes, the Provisioner notifies the Program Launcher to start the program in the cluster
  - The notification can simply be done with an in-memory method call, via some interface contract, assuming the Provisioner and the Program Launcher always run in the same process
  - The notification should include all the program runtime data plus the unique identifier of the execution cluster
- The Program Launcher starts the Program Runtime in the given cluster
  - Different (cloud) clusters might need different ways of starting the Program Runtime
- The Program Runtime sets up the CDAP runtime environment in order to provide CDAP services to the program (more on this in a later section)
- The Program Runtime launches the user program via ProgramRunner and manages the program lifecycle
- On the Mission Control side, after the Program Launcher has started the Program Runtime (step 5), it notifies the Runtime Monitor to start monitoring the running program
  - Similar to above, the notification can be done with a method call, provided the state has been persisted
  - If the process fails, upon restart the Runtime Monitor will just read from the state store and resume monitoring
  - The Run Record Corrector is part of the Runtime Monitor
- The Runtime Monitor maintains a heartbeat with the Program Runtime to get the latest state updates (more on this in a later section)
- On completion of the program execution, the Program Runtime will persist the final states into cloud storage
  - This could be optional, depending on how much information needs to be retained if there is a failure (more on this in a later section)
- The Runtime Monitor will fetch the final states (or already have them from the last heartbeat) and update the program states
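The Mission Control side of this flow can be sketched in a few lines, assuming the hand-offs are in-memory calls and that state is persisted before monitoring starts. RunState, ClusterProvisioner, RuntimeLauncher and MissionControl are hypothetical names, and the injected Map only stands in for a durable state store.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified run states.
enum RunState { PROVISIONING, LAUNCHING, RUNNING, COMPLETED, FAILED }

// Hypothetical single-method views of the Provisioner and the Program Launcher.
interface ClusterProvisioner { String provision(String runId); }  // returns a cluster id
interface RuntimeLauncher { void launch(String clusterId, String runId); }

final class MissionControl {
  private final ClusterProvisioner provisioner;
  private final RuntimeLauncher launcher;
  private final Map<String, RunState> stateStore;  // stand-in for a durable state store
  final Set<String> monitored = new HashSet<>();   // runs the Runtime Monitor is tracking

  MissionControl(ClusterProvisioner provisioner, RuntimeLauncher launcher, Map<String, RunState> stateStore) {
    this.provisioner = provisioner;
    this.launcher = launcher;
    this.stateStore = stateStore;
  }

  // Handles one start-program message polled from TMS.
  void onStartRequest(String runId) {
    stateStore.put(runId, RunState.PROVISIONING);
    String clusterId = provisioner.provision(runId);
    stateStore.put(runId, RunState.LAUNCHING);
    launcher.launch(clusterId, runId);          // in-memory call within the same process
    stateStore.put(runId, RunState.RUNNING);    // persist first, then hand off to the monitor
    monitored.add(runId);
  }

  // After a restart, resume monitoring every run whose persisted state is RUNNING.
  void recover() {
    stateStore.forEach((runId, state) -> { if (state == RunState.RUNNING) monitored.add(runId); });
  }

  // Applies the state reported by a heartbeat; stops monitoring once the run has finished.
  void onHeartbeat(String runId, RunState reported) {
    stateStore.put(runId, reported);
    if (reported == RunState.COMPLETED || reported == RunState.FAILED) monitored.remove(runId);
  }
}
```

Persisting RUNNING before the hand-off is what allows a restarted process to rebuild the set of monitored runs purely from the state store.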
Provisioner
The provisioner is the component that interacts with cloud providers for resource provisioning and de-provisioning. Generally it has the following properties:
- Resilient to failures
  - CDAP master interruptions, including restart, failover, upgrade, etc.
  - Cloud provider interruptions, which can be caused by network outages, disconnections or cloud provider availability issues.
  - Generally speaking, provisioner progress might halt during service interruptions, but it should be able to recover and continue correctly afterwards.
  - This is generally handled with an intent log and retries:
    - First log (persist) the intended action before performing the action.
    - Based on the intent log, the action will be retried upon failure, even after process termination and restart.
    - The retry should contain logic to validate whether the action has been performed or not by consulting the source of truth.
    - Once the action completes, log the completion state as well as the next intended action (if any).
- Pluggable and extensible
  - It is defined as part of the Runtime Extension SPI.
- Horizontally scalable
  - A CDAP instance could be launching thousands of programs, or even more, at the same time.
The detailed design is in CDAP Provisioning.
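The intent log and retry pattern can be sketched as follows. This is a minimal sketch under assumptions: the Map passed to IntentLog stands in for durable storage, VerifiableAction is a hypothetical interface whose alreadyPerformed method consults the source of truth (e.g. the cloud provider), and logging the next intended action as well as backoff between attempts are omitted. It is not the design in CDAP Provisioning.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Minimal intent log; the Map stands in for durable storage that survives restarts.
final class IntentLog {
  private final Map<String, String> entries;  // actionId -> "INTENDED" or "DONE"

  IntentLog(Map<String, String> storage) { this.entries = storage; }

  void logIntent(String actionId) { entries.putIfAbsent(actionId, "INTENDED"); }
  void logDone(String actionId) { entries.put(actionId, "DONE"); }
  boolean isDone(String actionId) { return "DONE".equals(entries.get(actionId)); }

  // Actions that were intended but never recorded as done, e.g. because the process died.
  List<String> pending() {
    List<String> result = new ArrayList<>();
    entries.forEach((id, status) -> { if ("INTENDED".equals(status)) result.add(id); });
    return result;
  }
}

// Hypothetical provisioning step that can check the source of truth before acting.
interface VerifiableAction {
  boolean alreadyPerformed() throws Exception;  // e.g. ask the cloud provider whether the cluster exists
  void perform() throws Exception;
}

final class IntentExecutor {
  // Logs the intent, then retries until the action is confirmed or attempts run out.
  static boolean run(IntentLog log, String actionId, VerifiableAction action, int maxAttempts) {
    log.logIntent(actionId);
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        if (!action.alreadyPerformed()) {
          action.perform();
        }
        log.logDone(actionId);
        return true;
      } catch (Exception e) {
        // Leave the intent in place; a later attempt (or a restarted process) retries it.
      }
    }
    return false;
  }
}
```

On restart, the provisioner would call pending() and feed each entry back through IntentExecutor.run; the alreadyPerformed check prevents, for example, creating a second cluster when the first creation succeeded just before a crash.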
Program Launcher
The Program Launcher is relatively simple. It is responsible for launching the CDAP runtime in the cluster provisioned by the Provisioner, which involves:
- Gathering the CDAP runtime jars, program artifacts, program runtime data, configurations, etc.
- Launching the CDAP runtime on the target runtime cluster with the information gathered in the previous step
  - This is a cluster-specific step, but it generally involves file localization and launching the CDAP runtime JVM on the cluster. Here are some possibilities:
    - ssh + scp files and run commands
    - Talk to the YARN RM directly via the RM client protocol for localization as well as launching the YARN application
    - Build and publish a Docker image (localization) and interact with the Kubernetes master to launch the CDAP runtime in a container
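As an example of the first possibility (ssh + scp), the launch step could be expressed as a short list of commands. This is a sketch only: SshLaunchPlan, the remote directory layout, the runtime.log file name and the main class passed in are hypothetical, and it assumes the ssh and scp command-line tools are installed, key-based authentication to the provisioned host is already set up, and paths contain no spaces.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical ssh-based launch plan: localize files with scp, then start the runtime JVM.
final class SshLaunchPlan {
  static List<List<String>> commands(String user, String host, String remoteDir,
                                     List<String> localFiles, String mainClass) {
    String target = user + "@" + host;
    List<List<String>> cmds = new ArrayList<>();
    // 1. Create the remote working directory.
    cmds.add(List.of("ssh", target, "mkdir", "-p", remoteDir));
    // 2. Localization: copy runtime jars, program artifacts and configuration files.
    List<String> scp = new ArrayList<>(List.of("scp"));
    scp.addAll(localFiles);
    scp.add(target + ":" + remoteDir + "/");
    cmds.add(scp);
    // 3. Start the runtime JVM in the background, detached from the ssh session.
    cmds.add(List.of("ssh", target, "cd " + remoteDir + " && nohup java -cp '*' " + mainClass
        + " < /dev/null > runtime.log 2>&1 &"));
    return cmds;
  }

  // Runs the commands in order and stops at the first non-zero exit code.
  static void execute(List<List<String>> cmds) throws IOException, InterruptedException {
    for (List<String> cmd : cmds) {
      int exit = new ProcessBuilder(cmd).inheritIO().start().waitFor();
      if (exit != 0) throw new IllegalStateException("Command failed (" + exit + "): " + cmd);
    }
  }
}
```

Building the commands separately from executing them keeps the plan easy to inspect and test; a YARN or Kubernetes launcher would replace both steps with calls to the respective cluster APIs.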
Runtime Monitor
Program Runtime
The Program Runtime is the CDAP component running inside the target execution (cloud) environment. It is responsible for:
- Interacting with the cluster resource manager to acquire resources and execute the user program
- Managing the lifecycle of that particular program run inside the cluster
- Responding to heartbeat requests from the CDAP master
- Providing a local TMS service for metadata and metrics collection
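The heartbeat responsibility fits the pull-only model from the requirements: the runtime keeps its state and recent events locally, and the CDAP master asks for everything after the last offset it has seen. In this sketch an in-memory list stands in for the local TMS topic; LocalRuntimeStatus, Heartbeat and the event string format are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical runtime-side state served to the CDAP master on each heartbeat.
final class LocalRuntimeStatus {
  private final List<String> events = new ArrayList<>();  // stand-in for a local TMS topic
  private String programState = "STARTING";

  synchronized void setState(String state) {
    programState = state;
    events.add("state=" + state);
  }

  synchronized void publish(String event) { events.add(event); }

  // Returns the current state plus every event at index >= fromOffset.
  synchronized Heartbeat heartbeat(int fromOffset) {
    int from = Math.max(0, Math.min(fromOffset, events.size()));
    return new Heartbeat(programState, new ArrayList<>(events.subList(from, events.size())), events.size());
  }

  static final class Heartbeat {
    final String state;
    final List<String> events;
    final int nextOffset;  // the master sends this back on its next heartbeat

    Heartbeat(String state, List<String> events, int nextOffset) {
      this.state = state;
      this.events = events;
      this.nextOffset = nextOffset;
    }
  }
}
```

Because the master tracks the offset, a missed or repeated heartbeat (for example while the master is restarting) only delays updates; it never requires the runtime to contact the master.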
CDAP Applications
MMDS
Wrangler
Data Pipeline
Random notes (WIP):
Data Fabric
Dataset Service
The Dataset Service is responsible for the following operations:
- Dataset modules and types management
  - Adding / removing modules and types
- Dataset instance management
  - Creation
    - Association of the dataset instance and dataset properties
    - Creation / acquisition of the underlying storage resource
    - (Optional) creation of the corresponding Explore (Hive) table