Checklist
- User Stories Documented
- User Stories Reviewed
- Design Reviewed
- APIs reviewed
- Release priorities assigned
- Test cases reviewed
- Blog post
Introduction
One common use case is that of a user running a relational database with multiple tables. They would like to create copies of those tables in a data warehouse like BigQuery in a single, simple operation. All existing data should be copied first, then new changes (inserts, updates, deletes) applied to the relational db tables should be reflected in the BigQuery tables within minutes. Newly created tables in the relational db should automatically appear in BigQuery. Tables that are deleted in the relational db should be deleted in BigQuery. Compatible schema changes should also be reflected.
Pipelines are usually not suitable for these types of use cases, which more closely resemble replication than incremental loads. It is possible to incrementally load data from a single table to a single BigQuery table if the table never has deletes or updates and has a monotonically increasing column. Most users do not have a write pattern like this, so a better solution is required.
Goals
Design a way for users to easily create a continuously updating copy of their existing data.
Terminology
- Replicator - A program that reads changes applied to some source storage system and applies (replicates) them to a target storage system
- Source - The storage system to replicate from. The first version focuses on relational databases.
- Target - The storage system to replicate to. The first version focuses on data warehouses like BigQuery, Redshift, or Snowflake.
- DDL event - An event involving a structure in the source, such as the creation, alteration, or deletion of a table
- DML event - An event involving data in the source, such as the insertion, update, or deletion of a row in a table
User Stories
- As a data admin, I want to be able to replicate data from Oracle, MySQL, or SQL Server
- As a data admin, I want to be able to replicate data into BigQuery, Spanner, Redshift, or Snowflake
- As a data admin, I want my replicated tables to be updated within 5 minutes of the source table being updated
- As a data admin, if an event failed to replicate but could succeed on retry (e.g., the system is temporarily down or somebody revoked privileges on the service account), I want the replicator to be paused until the problem is fixed
- As a data admin, if an event failed to replicate in a way that cannot succeed on retry (e.g., a db column type was altered in a way the destination does not support), I want the replicator to be paused until the problem is manually fixed by an admin
- As a data admin, I want to know how many events failed to replicate to each target table
- As a data admin, I do not want any events to be lost even if the replicator crashes
- As a data admin, I do not want duplicate data in the target even if the replicator crashes
- As a data admin, I want to be able to tell how far behind my target tables are compared to my source tables
- As a data admin, I want to have some metrics around how quickly events are being replicated
- As a data admin, I want to be able to pause and resume a replicator
- As a data admin, I want to be able to delete a replicator
- As a data admin, I want to be able to select a subset of source tables to replicate to my target
- As a data admin, I want supported DDL events to be replicated to my destination system
- As a data admin, I want unsupported DDL events in my source to pause the replicator and alert an admin until a manual resolution is performed
- As a data admin, I want to be able to configure whether tables that are dropped in my source should automatically be dropped in my target
- As a data admin, I want to be able to see logs about my replicator in case there are issues (out of memory, permissions errors, etc)
- As a data admin, I want to be able to find documentation about what type of database setup I need to perform on my source database
- As a data admin, I want to be able to browse my replicators in a single place
- As a data admin, I want to be able to distinguish my replicators from pipelines and custom apps
- As a data admin, I want to be able to test that my replicator is correctly configured before running it
Design
Approach
This section describes the high-level approach, then goes through an example of consuming events from MySQL into BigQuery.
At a high level, replicators are implemented by connecting a 'Source' and a 'Target'. Sources and Targets define an API that can be implemented by plugins.
Sources are responsible for reading change events from a database and translating them into an ordered sequence of standard DDLEvents and DMLEvents. Each event contains a 'Sequence Number', which is a monotonically increasing, unique (at least within a single replicator) value. Given a sequence number, the source must be able to start reading events from that number. Sources begin by taking a snapshot of the current state of the database, then begin consuming change events from that moment on.
Targets are responsible for taking the ordered sequence of events, replicating them to the downstream storage system, and then persisting the sequence number for those events. During normal operation each event is sent to a target exactly once, but in error scenarios an event may be sent more than once (at-least-once delivery). Once a sequence number has been successfully persisted, events prior to that number will never be seen again.
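A minimal sketch of the sequence-number contract described above, assuming a simplified in-memory source and target. `ChangeEvent`, `EventSource.readAfter`, `EventTarget.persist`, and `Replicator.run` are hypothetical names, not the actual plugin API. The sketch also assumes the persisted number marks the last fully replicated event, so reading resumes just after it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified change event: only the field needed for checkpointing.
final class ChangeEvent {
  final long sequenceNum;

  ChangeEvent(long sequenceNum) {
    this.sequenceNum = sequenceNum;
  }
}

// Hypothetical source contract: resume reading just after a given sequence number.
interface EventSource {
  List<ChangeEvent> readAfter(long sequenceNum);
}

// Hypothetical target contract: apply an event, then persist its sequence number.
interface EventTarget {
  void apply(ChangeEvent event);

  void persist(long sequenceNum);

  long lastPersisted(); // -1 if nothing has been persisted yet
}

// In-memory source backed by an ordered change log.
final class ListSource implements EventSource {
  private final List<ChangeEvent> log;

  ListSource(List<ChangeEvent> log) {
    this.log = log;
  }

  @Override
  public List<ChangeEvent> readAfter(long sequenceNum) {
    List<ChangeEvent> result = new ArrayList<>();
    for (ChangeEvent event : log) {
      if (event.sequenceNum > sequenceNum) {
        result.add(event);
      }
    }
    return result;
  }
}

// In-memory target that records which events it applied.
final class RecordingTarget implements EventTarget {
  final List<Long> applied = new ArrayList<>();
  private long persisted = -1;

  @Override
  public void apply(ChangeEvent event) {
    applied.add(event.sequenceNum);
  }

  @Override
  public void persist(long sequenceNum) {
    persisted = sequenceNum;
  }

  @Override
  public long lastPersisted() {
    return persisted;
  }
}

final class Replicator {
  // Replicates every event after the last persisted sequence number. If the
  // program dies between apply() and persist(), that event is re-sent on
  // restart, which is the at-least-once case the target must tolerate.
  static void run(EventSource source, EventTarget target) {
    for (ChangeEvent event : source.readAfter(target.lastPersisted())) {
      target.apply(event);
      target.persist(event.sequenceNum);
    }
  }
}
```

Because `persist` is only called after `apply` succeeds, a crash between the two causes the last event to be delivered again on restart; this is why targets must be able to handle duplicates.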
Change events are represented as:
No Format |
---|
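import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import java.util.List;

enum DDLOperation { CREATE_DATABASE, DROP_DATABASE, CREATE_TABLE, DROP_TABLE, TRUNCATE_TABLE, ALTER_TABLE, RENAME_TABLE }

enum DMLOperation { INSERT, DELETE, UPDATE }

// A change to a structure in the source (a database or a table).
class DDLEvent {
  long sequenceNum;        // monotonically increasing, unique within a replicator
  boolean isSnapshot;      // true if emitted while taking the initial snapshot
  DDLOperation operation;
  Schema schema;
  String database;
  String prevTable;        // previous table name, used by RENAME_TABLE
  String table;
  List<String> primaryKey;
}

// A change to the data in a source table.
class DMLEvent {
  long sequenceNum;
  boolean isSnapshot;
  DMLOperation operation;
  String database;
  String table;
  StructuredRecord row;    // the affected row
} |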
Example 1 - Primary Key
Source
In this example, suppose the following queries are run on the source database:
No Format |
---|
CREATE DATABASE myDB;
CREATE TABLE customers (id int, name varchar(50), PRIMARY KEY (id));
INSERT INTO customers (id, name) VALUES (0, 'alice');
UPDATE customers set name='ABC' where id=0;
DELETE FROM customers where id=0;
INSERT into customers (id, name) VALUES (0, 'Alice'), (1, 'blob');
UPDATE customers set name='Bob' where id=1; |
The source generates the following DDL events:
sequence # | operation | database | table | schema | primary key |
---|---|---|---|---|---|
0 | CREATE_DATABASE | myDB | | | |
1 | CREATE_TABLE | myDB | customers | id int, name varchar(50) | id |
followed by the following DML events:
sequence # | operation | database | table | row |
---|---|---|---|---|
2 | INSERT | myDB | customers | <id:0, name:alice> |
3 | UPDATE | myDB | customers | <id:0, name:ABC> |
4 | DELETE | myDB | customers | <id:0> |
5 | INSERT | myDB | customers | <id:0, name:Alice> |
6 | INSERT | myDB | customers | <id:1, name:blob> |
7 | UPDATE | myDB | customers | <id:1, name:Bob> |
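To make the expected target state concrete, the following sketch applies the DML events above to an in-memory table keyed by the primary key. It assumes INSERT and UPDATE events carry the full row, so both can be treated as upserts. `RowChange`, `KeyedTable`, and `PrimaryKeyExample` are hypothetical illustration names.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified DML event for this example: operation, primary key, and name column.
final class RowChange {
  final long sequenceNum;
  final String operation; // "INSERT" | "UPDATE" | "DELETE"
  final int id;
  final String name;      // null for DELETE

  RowChange(long sequenceNum, String operation, int id, String name) {
    this.sequenceNum = sequenceNum;
    this.operation = operation;
    this.id = id;
    this.name = name;
  }
}

final class KeyedTable {
  // Target rows keyed by primary key; TreeMap keeps them ordered by id.
  final Map<Integer, String> rows = new TreeMap<>();

  // INSERT and UPDATE are applied as upserts and DELETE removes the key.
  void apply(RowChange change) {
    switch (change.operation) {
      case "INSERT":
      case "UPDATE":
        rows.put(change.id, change.name);
        break;
      case "DELETE":
        rows.remove(change.id);
        break;
      default:
        throw new IllegalArgumentException("Unknown operation " + change.operation);
    }
  }
}

final class PrimaryKeyExample {
  // The DML events from the table above (sequence numbers 2 through 7).
  static final List<RowChange> EVENTS = List.of(
      new RowChange(2, "INSERT", 0, "alice"),
      new RowChange(3, "UPDATE", 0, "ABC"),
      new RowChange(4, "DELETE", 0, null),
      new RowChange(5, "INSERT", 0, "Alice"),
      new RowChange(6, "INSERT", 1, "blob"),
      new RowChange(7, "UPDATE", 1, "Bob"));

  static Map<Integer, String> replicate(List<RowChange> events) {
    KeyedTable table = new KeyedTable();
    for (RowChange change : events) {
      table.apply(change);
    }
    return table.rows;
  }
}
```

Applying events 2 through 7 leaves `{0=Alice, 1=Bob}`. Replaying, in order, events that were already applied (as can happen after a crash) converges to the same state, because each key ends up with the value from its last event.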
Target
Application
A new artifact named cdap-data-transfer is added to CDAP. The artifact supports a configuration that specifies what to read from and what to write to. To avoid confusion with similar concepts in Wrangler and pipelines, this document refers to these as an origin and a destination. Origins and destinations are both pluggable and will ideally be consolidated with Wrangler connections and pipeline sources/sinks in the future. The application is responsible for keeping track of which events it has read, ensuring that no data is lost, emitting relevant metrics, and exposing any additional information required by the UI.
A data transfer is created by creating an application from the cdap-data-transfer artifact along with a configuration. A single app is responsible for consuming events for a database: it is one app per database, not one app per table. This is needed because relational databases support transactions across multiple tables, so updates to multiple tables need to be handled in a single program.
Transactions
It is important to note that preserving transactions is not one of the user stories for the initial version. For example, suppose 1000 rows are updated in a single transaction in the origin. It might be the case that 500 of those rows are applied to the destination, with the other 500 applied soon thereafter. From a user perspective, this means that a user who queries the destination may see results that could never have been returned by querying the origin.
Atomicity can be added in a relatively straightforward manner if the change log for an origin guarantees that all events for a transaction are grouped together. If this property does not hold, it is more complicated. This complexity, plus the fact that most data warehouse use cases do not require such a strong guarantee, is why transactions are not handled in the first version.
Config API
The application configuration uses the same format as a pipeline configuration.
No Format |
---|
{
  "resources": { "vcores": 4, "memoryMB": 4096 },
  "stages": [
    {
      "name": "my-oracle-db",
      "plugin": {
        "name": "oracle",
        "type": "cdc-origin",
        "artifact": { "scope": "SYSTEM", "name": "cdc-oracle", "version": "0.1.0" },
        "properties": { "user": "myuser", "password": "${secure(xyz)}", ... }
      }
    },
    {
      "name": "my-bq-dataset",
      "plugin": {
        "name": "bigquery",
        "type": "cdc-destination",
        "artifact": { ... },
        "properties": { ... }
      }
    }
  ],
  "connections": [ { "from": "my-oracle-db", "to": "my-bq-dataset" } ]
} |
This is more verbose than strictly needed, so that future versions of the application can support structures more complicated than a single origin and a single destination, for example filtering, masking, or multiple destinations.
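As an illustration of how the application might enforce the initial single-origin, single-destination shape while keeping the general format, here is a hypothetical validation sketch. `StageConfig`, `Connection`, and `ReplicatorConfig` are assumed, trimmed-down mirrors of the JSON above; JSON parsing is omitted.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, trimmed-down mirror of one entry in "stages".
final class StageConfig {
  final String name;
  final String pluginType; // e.g. "cdc-origin" or "cdc-destination"

  StageConfig(String name, String pluginType) {
    this.name = name;
    this.pluginType = pluginType;
  }
}

// Hypothetical mirror of one entry in "connections".
final class Connection {
  final String from;
  final String to;

  Connection(String from, String to) {
    this.from = from;
    this.to = to;
  }
}

final class ReplicatorConfig {
  final List<StageConfig> stages;
  final List<Connection> connections;

  ReplicatorConfig(List<StageConfig> stages, List<Connection> connections) {
    this.stages = stages;
    this.connections = connections;
  }

  // Accepts only the shape supported initially: one origin, one destination,
  // and a single connection from the origin to the destination.
  void validate() {
    if (stages.size() != 2) {
      throw new IllegalArgumentException("Expected exactly two stages, found " + stages.size());
    }
    StageConfig origin = onlyStageOfType("cdc-origin");
    StageConfig destination = onlyStageOfType("cdc-destination");
    boolean connected = connections.size() == 1
        && connections.get(0).from.equals(origin.name)
        && connections.get(0).to.equals(destination.name);
    if (!connected) {
      throw new IllegalArgumentException(
          "Expected one connection from " + origin.name + " to " + destination.name);
    }
  }

  private StageConfig onlyStageOfType(String pluginType) {
    List<StageConfig> matches = new ArrayList<>();
    for (StageConfig stage : stages) {
      if (stage.pluginType.equals(pluginType)) {
        matches.add(stage);
      }
    }
    if (matches.size() != 1) {
      throw new IllegalArgumentException("Expected exactly one " + pluginType + " stage");
    }
    return matches.get(0);
  }
}
```

Keeping the graph-shaped format while rejecting anything beyond one origin and one destination means later versions can relax `validate()` without changing the config format.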
Plugins
The application will need to support additional origins and destinations in the future. The UI will also need to list available origins and destinations and provide information about how to configure them. CDAP's plugin framework already provides all of this.
Ideally, sometime in the future these plugin types will be consolidated with pipeline sources and sinks and wrangler connections.
Origin
The origin is responsible for emitting events that the destination will persist. It is also responsible for attaching a transaction ID to each event that will allow the application to group together events for the same transaction before sending them to the destination.
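The grouping step can be sketched as follows, assuming the origin emits all events for a transaction contiguously (the straightforward case described under Transactions). `TxEvent` and `TransactionGrouper` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical event carrying the transaction ID attached by the origin.
final class TxEvent {
  final String transactionId;
  final long sequenceNum;

  TxEvent(String transactionId, long sequenceNum) {
    this.transactionId = transactionId;
    this.sequenceNum = sequenceNum;
  }
}

final class TransactionGrouper {
  // Splits an ordered event stream into batches, one per transaction.
  // Assumes events of a transaction are contiguous, so a new batch starts
  // whenever the transaction ID changes.
  static List<List<TxEvent>> group(List<TxEvent> events) {
    List<List<TxEvent>> batches = new ArrayList<>();
    List<TxEvent> current = new ArrayList<>();
    String currentTx = null;
    for (TxEvent event : events) {
      if (currentTx != null && !currentTx.equals(event.transactionId)) {
        batches.add(current);
        current = new ArrayList<>();
      }
      current.add(event);
      currentTx = event.transactionId;
    }
    if (!current.isEmpty()) {
      batches.add(current);
    }
    return batches;
  }
}
```

In a streaming setting, a batch is only known to be complete once the next transaction starts or the origin signals a commit; this sketch works on a finite list for simplicity.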
Destination
The destination is responsible for writing data to the downstream storage system. Because the program might die at any time, the destination must be able to handle duplicate data; in other words, the writes for a single transaction must be atomic and idempotent. The framework ensures that the destination is given all the data for a transaction in a single call.
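One way to meet the atomic and idempotent requirement is to store the sequence number of the last applied transaction together with the data, and skip any transaction at or below it. The sketch below assumes an in-memory table where a reference swap stands in for a warehouse commit; `IdempotentDestination` and `Upsert` are hypothetical names, and a real destination would need the storage system's own atomic write mechanism.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical write for this example: key and new value (null means delete).
final class Upsert {
  final String key;
  final String value;

  Upsert(String key, String value) {
    this.key = key;
    this.value = value;
  }
}

final class IdempotentDestination {
  // Table contents and the last applied sequence number are replaced together,
  // so one is never updated without the other.
  private Map<String, String> table = new HashMap<>();
  private long lastAppliedSequenceNum = -1;

  // Applies all writes of one transaction, identified by the sequence number of
  // its last event. Returns false if the transaction was already applied, for
  // example because it was redelivered after a crash.
  synchronized boolean applyTransaction(long sequenceNum, List<Upsert> writes) {
    if (sequenceNum <= lastAppliedSequenceNum) {
      return false; // duplicate delivery: skipping keeps the write idempotent
    }
    Map<String, String> staged = new HashMap<>(table); // copy-on-write staging
    for (Upsert write : writes) {
      if (write.value == null) {
        staged.remove(write.key);
      } else {
        staged.put(write.key, write.value);
      }
    }
    // "Commit": swap in the new contents and the checkpoint together.
    table = staged;
    lastAppliedSequenceNum = sequenceNum;
    return true;
  }

  synchronized Map<String, String> snapshot() {
    return new HashMap<>(table);
  }
}
```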
Program
A data transfer will run as a CDAP worker.
Lineage
Dataset-level and field-level lineage are a key part of the value offering for CDAP. Today, dataset lineage is handled by CDAP in a mostly transparent way based on dataset usage. Field-level lineage is handled explicitly by applications, which call APIs that are available at prepare time. Neither approach is adequate for CDC, since table creation, deletion, and schema changes all occur while the program is running. To support lineage, the CDAP Worker API will need to be enhanced to support emitting lineage at runtime.
Code Block |
---|
public interface WorkerContext extends RuntimeContext, ..., LineageRecorder { } |
To begin with, LineageRecorder is only added to WorkerContext, in case this turns out to be problematic. It can be added to other program contexts, or more generally to RuntimeContext, if it proves useful. The implementation will need to be careful not to overload the messaging system. It will likely aggregate all lineage events over the past X amount of time and publish them all at once, similar to how metrics are published. This means lineage events can potentially be lost if the program dies.
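A minimal sketch of the aggregate-then-publish idea, assuming lineage entries can be represented as strings and that a callback stands in for the messaging system. `BufferedLineageRecorder` is hypothetical and is not the proposed `LineageRecorder` interface, whose methods this document does not specify.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

// Hypothetical recorder that aggregates lineage in memory and publishes it at
// most once per interval, similar to how metrics are batched.
final class BufferedLineageRecorder {
  private final long flushIntervalMillis;
  private final LongSupplier clock;                // injectable for testing
  private final Consumer<List<String>> publisher;  // stands in for the messaging system
  private final Set<String> pending = new LinkedHashSet<>(); // de-duplicates repeated entries
  private long lastFlushMillis;

  BufferedLineageRecorder(long flushIntervalMillis, LongSupplier clock,
                          Consumer<List<String>> publisher) {
    this.flushIntervalMillis = flushIntervalMillis;
    this.clock = clock;
    this.publisher = publisher;
    this.lastFlushMillis = clock.getAsLong();
  }

  // Records a lineage entry. Anything still pending when the program dies is
  // lost, matching the trade-off described above.
  synchronized void record(String lineageEntry) {
    pending.add(lineageEntry);
    long now = clock.getAsLong();
    if (now - lastFlushMillis >= flushIntervalMillis) {
      flush(now);
    }
  }

  private void flush(long now) {
    if (!pending.isEmpty()) {
      publisher.accept(new ArrayList<>(pending));
      pending.clear();
    }
    lastFlushMillis = now;
  }
}
```

This sketch only checks the interval when a new entry is recorded; a real implementation would likely also flush on a timer and on clean shutdown.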
Examples
MySQL Origin
BigQuery Destination
Why not Pipelines?
There are product requirements around users managing their data copies in a single place, as a separate experience from pipelines. Implementing copies with another app provides a natural way to separate them.
CDC is a much more targeted use case than a generic pipeline. It does not need the flexibility of different aggregators, actions, etc. Implementing CDC via pipeline plugins would require users to set up their pipelines in a very specific way, with many opportunities for misconfiguration. Implementing CDC as a new application allows the app to control which plugins can be used in data copies.
Pipelines are very schema-oriented, with each connection between stages usually carrying a single, constant schema. When the data flowing between stages can have different schemas, the tool becomes much more difficult to understand.
CDC has a fairly standard set of CDC-specific metrics and state, such as the rate of consumption and whether an initial snapshot is being executed.
State is much more complicated to manage in a pipeline. For example, the source would need to be able to store offsets indicating which events have already been processed. It cannot do this until it knows the sink has successfully written that data, which means the source and sink need to be able to communicate. This becomes even more complicated if somebody decides to add multiple sinks for the same source.
API changes
New Programmatic APIs
New Java APIs introduced (both user facing and internal)
Deprecated Programmatic APIs
New REST APIs
Path | Method | Description | Response Code | Response |
---|---|---|---|---|
/v3/apps/<app-id> | GET | Returns the application spec for a given application | 200 - On success; 404 - When application is not available; 500 - Any internal errors | |
Deprecated REST API
Path | Method | Description |
---|---|---|
/v3/apps/<app-id> | GET | Returns the application spec for a given application |
CLI Impact or Changes
- Impact #1
- Impact #2
- Impact #3
UI Impact or Changes
- Impact #1
- Impact #2
- Impact #3
Security Impact
What's the impact on Authorization and how does the design take care of this aspect
Impact on Infrastructure Outages
System behavior (if applicable - document impact on downstream [ YARN, HBase etc ] component failures) and how does the design take care of these aspects
Test Scenarios
Test ID | Test Description | Expected Results |
---|---|---|
Releases
Release X.Y.Z
Release X.Y.Z
Related Work
- Work #1
- Work #2
- Work #3