Checklist
- User Stories Documented
- User Stories Reviewed
- Design Reviewed
- APIs reviewed
- Release priorities assigned
- Test cases reviewed
- Blog post
Introduction
One common use case is that of a user running a relational database with multiple tables. They would like to create copies of those tables in a data warehouse like BigQuery in a single, simple operation. All existing data should be copied first, then new changes (inserts, updates, deletes) that are applied to the relational db tables should be reflected in the BigQuery tables within minutes. Newly created tables in the relational db should automatically appear in BigQuery. Tables that are deleted in the relational db should be deleted in BigQuery. Compatible schema changes should also be reflected.
Pipelines are usually not suitable for these types of use cases, which more closely resemble replication than incremental loads. It is possible to incrementally load data from a single table to a single BigQuery table if the table never has deletes or updates and has a monotonically increasing column. Most users do not have a write pattern like this, so a better solution is required.
Goals
Design a way for users to easily create a continuously updating copy of their existing data.
Terminology
- Replicator - A program that reads changes applied to some source storage system and applies (replicates) them to a target storage system
- Source - The storage system to replicate from. The first version focuses on relational databases.
- Target - The storage system to replicate to. The first version focuses on data warehouses like BigQuery, Redshift, or Snowflake.
- DDL event - An event involving a structure in the source, such as the creation, alteration, or deletion of a table
- DML event - An event involving data in the source, such as the insertion, update, or deletion of a row in a table
User Stories
- As a data admin, I want to be able to replicate data from Oracle, MySQL, or SQL Server
- As a data admin, I want to be able to replicate data into BigQuery, Spanner, Redshift, or Snowflake
- As a data admin, I want my replicated tables to be updated within 5 minutes of the source table being updated
- As a data admin, if an event failed to replicate, but could possibly succeed on retry (ex: system temporarily down, somebody revoked privileges on the service account, etc), I want the replicator to be paused until the problem is fixed
- As a data admin, if an event failed to replicate in a way that cannot succeed on retry (ex: db column type was altered, which isn't supported in the destination), I want the replicator to be paused until the problem is manually fixed by an admin
- As a data admin, I want to know how many events failed to replicate to each target table
- As a data admin, I do not want any events to be lost even if the replicator crashes
- As a data admin, I do not want duplicate data in the target even if the replicator crashes
- As a data admin, I want to be able to tell how far behind my target tables are compared to my source tables
- As a data admin, I want to have some metrics around how quickly events are being replicated
- As a data admin, I want to be able to pause and resume a replicator
- As a data admin, I want to be able to delete a replicator
- As a data admin, I want to be able to select a subset of source tables to replicate to my target
- As a data admin, I want supported DDL events to be replicated to my destination system
- As a data admin, I want unsupported DDL events in my source to pause the replicator and alert an admin until a manual resolution is performed
- As a data admin, I want to be able to configure whether tables that are dropped in my source should automatically be dropped in my target
- As a data admin, I want to be able to see logs about my replicator in case there are issues (out of memory, permissions errors, etc)
- As a data admin, I want to be able to find documentation about what type of database setup I need to perform on my source database
- As a data admin, I want to be able to browse my replicators in a single place
- As a data admin, I want to be able to distinguish my replicators from pipelines and custom apps
- As a data admin, I want to be able to test that my replicator is correctly configured before running it
Design
Approach
At a high level, replicators are implemented by a new CDAP application that defines new 'DeltaSource' and 'DeltaTarget' plugin interfaces.
A DeltaSource is responsible for reading change events from a database and translating them into an ordered sequence of standard DDLEvents and DMLEvents. A source begins by taking a snapshot of the current state of the database, then consumes change events from that moment on. Each event contains an Offset, which is monotonically increasing and unique (at least within a single replicator). Given an offset, the source must be able to resume reading events from that offset.
A DeltaTarget is responsible for taking the ordered sequence of events and replicating them to a storage system, as well as telling the app that it has finished replicating an event, allowing the app to store the offset for that event. Events will be sent to a target exactly once during normal operation, but may be sent more than once in error scenarios (at-least-once delivery). Once an offset has been successfully persisted, events prior to that offset will never be seen again.
Change events are represented as:
    class DDLEvent {
      Offset offset;
      long sequenceNumber;
      boolean isSnapshot;
      // "CREATE_DATABASE" | "DROP_DATABASE" | "CREATE_TABLE" | "DROP_TABLE" |
      // "TRUNCATE_TABLE" | "ALTER_TABLE" | "RENAME_TABLE"
      DDLOperation operation;
      Schema schema;
      String database;
      String prevTable; // used by renames
      String table;
      List<String> primaryKey;
    }

    class DMLEvent {
      Offset offset;
      long sequenceNumber;
      boolean isSnapshot;
      DMLOperation operation; // "INSERT" | "DELETE" | "UPDATE"
      String database;
      String table;
      StructuredRecord row;
    }

    class Offset implements Comparable<Offset> {
      Map<String, byte[]> offset;
    }
Offsets are a map of values because different sources require different information to know where to start reading from. For example, MySQL events correspond to a specific position within a binlog file.
The sequence number is a monotonically increasing number generated by the application. Sources are only responsible for attaching an Offset to each Event they emit. The application will then generate a sequence number and attach it to the event before sending it to the target. This is done because a monotonically increasing number makes it much easier for targets to implement their logic in an idempotent way, which is required to correctly handle failure scenarios.
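The division of labor described above can be pictured with a small in-memory sketch. Everything in it is hypothetical and for illustration only: `Sequencer` stands in for the application handing out sequence numbers, and `ReplaySafeTarget` stands in for a target that uses those numbers to ignore events it has already applied. Neither name is part of the CDAP API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical: the application side, handing out monotonically increasing numbers.
class Sequencer {
  private final AtomicLong next;

  Sequencer(long firstSequenceNumber) {
    this.next = new AtomicLong(firstSequenceNumber);
  }

  long nextSequenceNumber() {
    return next.getAndIncrement();
  }
}

// Hypothetical: a target that tolerates replays by remembering the highest
// sequence number it has applied so far.
class ReplaySafeTarget {
  private long lastApplied = -1L;                  // nothing applied yet
  private final List<String> applied = new ArrayList<>();

  /** Applies the event unless it was already applied; returns true if it was applied. */
  boolean apply(long sequenceNumber, String description) {
    if (sequenceNumber <= lastApplied) {
      return false;                                // replayed event, skip it
    }
    applied.add(description);
    lastApplied = sequenceNumber;                  // a real target would persist this
    return true;
  }

  List<String> getApplied() {
    return applied;
  }
}
```

In this sketch a replayed event is detected with a single numeric comparison, which is the property that makes a plain `long` more convenient for targets than a source-specific offset map.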
Example
Source
In this example, suppose the following queries are run on the source database:
    CREATE DATABASE myDB;
    CREATE TABLE customers (id int, name varchar(50), PRIMARY KEY (id));
    INSERT INTO customers (id, name) VALUES (0, 'alice');
    UPDATE customers SET name='ABC' WHERE id=0;
    DELETE FROM customers WHERE id=0;
    INSERT INTO customers (id, name) VALUES (0, 'Alice'), (1, 'blob');
    UPDATE customers SET name='Bob' WHERE id='1';
The source generates the following DDL events:
offset | sequence # | operation | database | table | schema | primary key |
---|---|---|---|---|---|---|
<binlog:mysql-bin.000003, pos:1424> | 0 | CREATE_DATABASE | myDB | | | |
<binlog:mysql-bin.000003, pos:1462> | 1 | CREATE_TABLE | myDB | customers | id int, name varchar(50) | id |
followed by the following DML events:
offset | sequence # | operation | database | table | row |
---|---|---|---|---|---|
<binlog:mysql-bin.000003, pos:1462> | 2 | INSERT | myDB | customers | <id:0, name:alice> |
<binlog:mysql-bin.000003, pos:1482> | 3 | UPDATE | myDB | customers | <id:0, name:ABC> |
<binlog:mysql-bin.000003, pos:1519> | 4 | DELETE | myDB | customers | <id:0> |
<binlog:mysql-bin.000003, pos:1538> | 5 | INSERT | myDB | customers | <id:0, name:Alice> |
<binlog:mysql-bin.000003, pos:1557> | 6 | INSERT | myDB | customers | <id:1, name:blob> |
<binlog:mysql-bin.000003, pos:1598> | 7 | UPDATE | myDB | customers | <id:1, name:Bob> |
The sequence number is attached by the application. The source is only responsible for attaching an offset to each event and defining how to compare offsets. In the MySQL case, offsets are compared by filename first, then by position within the file.
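A minimal sketch of that comparison rule follows. `BinlogOffset` is a hypothetical value class invented for this example (the real Offset is a map of byte arrays), and the sketch assumes binlog file names carry a fixed-width, zero-padded numeric suffix so that plain string comparison orders them correctly.

```java
import java.util.Comparator;

// Hypothetical value class for a MySQL binlog position; names are illustrative.
final class BinlogOffset implements Comparable<BinlogOffset> {
  // Compare by file name first, then by position within the file.
  // Assumption: file names such as "mysql-bin.000003" have a fixed-width suffix,
  // so lexicographic order matches chronological order.
  private static final Comparator<BinlogOffset> ORDER =
      Comparator.comparing((BinlogOffset o) -> o.file)
          .thenComparingLong(o -> o.position);

  final String file;      // e.g. "mysql-bin.000003"
  final long position;    // byte position within that file

  BinlogOffset(String file, long position) {
    this.file = file;
    this.position = position;
  }

  @Override
  public int compareTo(BinlogOffset other) {
    return ORDER.compare(this, other);
  }
}
```

With this ordering, an offset in `mysql-bin.000004` sorts after every offset in `mysql-bin.000003`, regardless of position.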
Target
The BigQuery target batches DML events together and writes a batch of events to GCS. Once in GCS, it runs a BigQuery load job to load the changes into a staging table. Finally, it runs a Merge query to merge events from the staging table into the actual target table. Once that is complete, it persists the latest sequence number of events contained in the batch. DDL events are not batched together.
For event #0, the target creates a BQ dataset named 'myDB'. Since it must assume the event occurs at least once, it checks if the dataset exists before creating it. After creating the dataset, the target calls a method that tells the application to remember that the event was replicated. The application stores the offset and sequence number for that event.
For event #1, a staging table '_staging_customers' is created with the same schema as the source table, plus three extra columns: _batchId, _sequenceNum, and _operation. The batch id is the timestamp of the load job. The table is partitioned on _batchId and clustered on _sequenceNum, which allows efficiently selecting the data for a specific batch while ordering by sequence number. Note that a BQ 'integer' is 8 bytes, equivalent to a Java long.
_batchId (timestamp) | _sequenceNum (integer) | _operation (string) | id (integer) | name (string) |
---|---|---|---|---|
The actual target table 'customers' is also created with the same schema as the source table, plus the sequence number as an additional column:
_sequenceNumber (integer) | id (integer) | name (string) |
---|---|---|
For events #2-7, the target may decide to batch them together in different ways. Supposing they all get batched together, after the load job, the staging table looks like:
_batchId | _sequenceNum | _operation | id | name |
---|---|---|---|---|
1234567890 | 2 | INSERT | 0 | alice |
1234567890 | 3 | UPDATE | 0 | ABC |
1234567890 | 4 | DELETE | 0 | |
1234567890 | 5 | INSERT | 0 | Alice |
1234567890 | 6 | INSERT | 1 | blob |
1234567890 | 7 | UPDATE | 1 | Bob |
A merge query is then run to merge changes from the staging table into the final target table:
    MERGE myDB.customers AS T
    USING ($DIFF_QUERY) AS D
    ON T.id = D.id
    WHEN MATCHED AND D._sequenceNum > T._sequenceNumber AND D._operation = "DELETE" THEN
      DELETE
    WHEN MATCHED AND D._sequenceNum > T._sequenceNumber AND (D._operation = "INSERT" OR D._operation = "UPDATE") THEN
      UPDATE SET id = D.id, name = D.name, _sequenceNumber = D._sequenceNum
    WHEN NOT MATCHED AND (D._operation = "INSERT" OR D._operation = "UPDATE") THEN
      INSERT (_sequenceNumber, id, name) VALUES (D._sequenceNum, D.id, D.name)
Where the $DIFF_QUERY is:
    SELECT S.*
    FROM (SELECT * FROM myDB._staging_customers WHERE _batchId = 1234567890) AS S
    JOIN (
      SELECT id, MAX(_sequenceNum) AS maxSequenceNum
      FROM myDB._staging_customers
      WHERE _batchId = 1234567890
      GROUP BY id) AS M
    ON S.id = M.id AND S._sequenceNum = M.maxSequenceNum
The diff query is responsible for getting the latest change for each primary key. With the example above, it results in:
_batchId | _sequenceNum | _operation | id | name |
---|---|---|---|---|
1234567890 | 5 | INSERT | 0 | Alice |
1234567890 | 7 | UPDATE | 1 | Bob |
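The same "latest change per primary key" selection can be sketched in memory, which may help when reasoning about what the diff query returns. `StagedChange` and `DiffQuery` are hypothetical names used only for this illustration. They model one batch of the staging table and are not how the BigQuery target is implemented.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in for one row of the staging table.
class StagedChange {
  final long sequenceNum;
  final String operation;   // "INSERT" | "UPDATE" | "DELETE"
  final int id;             // primary key of the source row
  final String name;        // null for DELETE rows

  StagedChange(long sequenceNum, String operation, int id, String name) {
    this.sequenceNum = sequenceNum;
    this.operation = operation;
    this.id = id;
    this.name = name;
  }
}

class DiffQuery {
  /** Keeps, for each primary key, only the change with the highest sequence number. */
  static List<StagedChange> latestPerKey(List<StagedChange> batch) {
    // LinkedHashMap keeps keys in first-seen order, so the output is deterministic.
    Map<Integer, StagedChange> latest = new LinkedHashMap<>();
    for (StagedChange change : batch) {
      StagedChange current = latest.get(change.id);
      if (current == null || change.sequenceNum > current.sequenceNum) {
        latest.put(change.id, change);
      }
    }
    return new ArrayList<>(latest.values());
  }
}
```

Fed the six staged rows from the example, this returns two rows: sequence number 5 (INSERT, Alice) for id 0 and sequence number 7 (UPDATE, Bob) for id 1.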
When there is no primary key, the sequence number is essentially used as the primary key.
Note: when there is a primary key, it is possible to implement the target in such a way that it doesn't need the additional sequence number column and exactly matches the source schema. However, this complicates the idempotency story, as the target would need to ensure that load and merge jobs are not run on data that was previously seen, requiring more complicated logic around using specific GCS object names and BQ load job ids. In addition, the sequence number is often desirable in case a user wants to modify their replicator somehow, but configure it to start consuming from a specific sequence number instead of from the beginning.
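To illustrate why keeping the sequence number in the target simplifies idempotency, here is a rough in-memory model of the merge step. It assumes each target row stores the sequence number of the last change applied to it. `TargetTable` and its methods are hypothetical names for this sketch, not the BigQuery target's code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of the target table, keyed by primary key.
class TargetTable {
  static final class Row {
    final long sequenceNumber;   // sequence number of the last change applied
    final String name;

    Row(long sequenceNumber, String name) {
      this.sequenceNumber = sequenceNumber;
      this.name = name;
    }
  }

  private final Map<Integer, Row> rows = new HashMap<>();

  /** Applies one change, ignoring it if the row already reflects a newer or equal change. */
  void merge(long sequenceNum, String operation, int id, String name) {
    Row existing = rows.get(id);
    if (existing != null && sequenceNum <= existing.sequenceNumber) {
      return;                                      // stale or replayed change: no-op
    }
    if ("DELETE".equals(operation)) {
      rows.remove(id);                             // no-op when the row is absent
    } else {
      rows.put(id, new Row(sequenceNum, name));    // update if present, insert otherwise
    }
  }

  Row get(int id) {
    return rows.get(id);
  }

  int size() {
    return rows.size();
  }
}
```

Running the same batch through `merge` twice leaves the table unchanged the second time, because every replayed change fails the sequence number comparison.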
Failure Handling
In this first version, any errors encountered during the replication process will be logged, then retried until the operation succeeds. For example, if the source database is inaccessible, the replicator will keep trying to access it until it becomes accessible. Similarly, if a write to the target system fails, the target should retry until it succeeds. This may require admin intervention to modify a table in order to allow the replicator to make progress again.
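A minimal sketch of the log-and-retry behavior is shown below. `RetryForever` is a hypothetical helper, and the capped exponential backoff between attempts is an assumption added for illustration, since the design does not say how long to wait between retries.

```java
import java.util.concurrent.Callable;

// Hypothetical helper: retries an operation until it succeeds, logging each failure.
class RetryForever {
  static <T> T call(Callable<T> operation, long initialDelayMillis, long maxDelayMillis)
      throws InterruptedException {
    long delay = initialDelayMillis;
    while (true) {
      try {
        return operation.call();
      } catch (InterruptedException e) {
        throw e;                                   // let shutdown requests through
      } catch (Exception e) {
        System.err.println("Operation failed, retrying in " + delay + " ms: " + e.getMessage());
        Thread.sleep(delay);
        // Assumption: capped exponential backoff; not specified by the design.
        delay = Math.min(delay * 2, maxDelayMillis);
      }
    }
  }
}
```

Because the loop never gives up, a permanently failing operation blocks the replicator until someone fixes the underlying problem, which matches the admin-intervention case described above.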
In future versions, it will likely make sense to support configurable policies, such as eventually skipping data that fails and writing the failed change events to files that data admins can examine at a later point in time.
Transactions
It is important to note that preservation of transactions is not one of the user stories for the initial version. For example, suppose 1000 rows are updated in a single transaction in the source. It might be the case that 500 of those rows are applied to the target, with the other 500 applied soon thereafter. What this means from a user perspective is that a user who queries the target may get results that were never possible to get if the source were queried.
Honoring transactions at a table level can be added in a relatively straightforward manner if each event is enhanced to include a transaction id, where it is guaranteed that all events for a transaction are grouped together. That is, events from different transactions cannot be interleaved. If this is the case, targets can make sure to atomically apply changes from the same transaction.
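If events did carry a transaction id and were never interleaved, grouping them could look roughly like the sketch below. `TxEvent` and `TransactionBatcher` are hypothetical names introduced for this example. The event classes defined earlier do not have a transaction id field.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical event carrying a transaction id.
class TxEvent {
  final String transactionId;
  final long sequenceNumber;

  TxEvent(String transactionId, long sequenceNumber) {
    this.transactionId = transactionId;
    this.sequenceNumber = sequenceNumber;
  }
}

class TransactionBatcher {
  /** Splits an ordered, non-interleaved event stream into one batch per transaction. */
  static List<List<TxEvent>> groupByTransaction(List<TxEvent> events) {
    List<List<TxEvent>> batches = new ArrayList<>();
    List<TxEvent> current = null;
    String currentTx = null;
    for (TxEvent event : events) {
      // A change in transaction id marks the start of a new batch.
      if (current == null || !event.transactionId.equals(currentTx)) {
        current = new ArrayList<>();
        batches.add(current);
        currentTx = event.transactionId;
      }
      current.add(event);
    }
    return batches;
  }
}
```

A target could then apply each inner list atomically. The single pass only works because of the non-interleaving guarantee. Without it, events would have to be buffered until each transaction's commit is seen.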
Config API
The application configuration uses the same format as a pipeline configuration.
    {
      "resources": {
        "vcores": 4,
        "memoryMB": 4096
      },
      "stages": [
        {
          "name": "my-oracle-db",
          "plugin": {
            "name": "oracle",
            "type": "cdc-origin",
            "artifact": {
              "scope": "SYSTEM",
              "name": "cdc-oracle",
              "version": "0.1.0"
            },
            "properties": {
              "user": "myuser",
              "password": "${secure(xyz)}",
              ...
            }
          }
        },
        {
          "name": "my-bq-dataset",
          "plugin": {
            "name": "bigquery",
            "type": "cdc-destination",
            "artifact": { ... },
            "properties": { ... }
          }
        }
      ],
      "connections": [
        {
          "from": "my-oracle-db",
          "to": "my-bq-dataset"
        }
      ]
    }
This is more verbose than a single source and a single target require. The extra structure allows future versions of the application to support more complicated topologies. For example, there may be future requirements around filtering, masking, or multiple destinations.
Program
The first version will run the source and target in a single CDAP worker. A worker is chosen over Spark because it gives full control over all the error scenarios.
Lineage
Dataset level and field level lineage is a key part of the value offering for CDAP. Today, the dataset lineage is handled by CDAP in a mostly transparent way based on dataset usage. Field level lineage is handled explicitly by applications by calling APIs that are available at prepare time. Both of these are inadequate for CDC, since table creation, deletion, and schema changes all occur when the program is running. In order to support lineage, the CDAP Worker API will need to be enhanced to support emitting lineage at runtime.
public interface WorkerContext extends RuntimeContext, ..., LineageRecorder { }
LineageRecorder is initially added only to WorkerContext, in case this turns out to be problematic. It can later be added to other program contexts, or most generally to RuntimeContext, if it is found to be useful. The implementation will need to be careful not to overload the messaging system. It will likely aggregate all lineage events from the past X amount of time and publish them all at once, similar to how metrics are published. This means lineage events can potentially be lost on program death.
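The aggregate-then-publish idea can be sketched as a small buffer. `LineageBuffer` is a hypothetical class, lineage events are modeled as plain strings, and the publisher is an arbitrary callback. The actual LineageRecorder contract and messaging calls are not specified here.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical buffer that collects lineage events and publishes them in one message.
class LineageBuffer {
  // A set drops repeated events, so a busy table does not inflate the message.
  private final Set<String> pending = new LinkedHashSet<>();
  private final Consumer<List<String>> publisher;

  LineageBuffer(Consumer<List<String>> publisher) {
    this.publisher = publisher;
  }

  synchronized void record(String lineageEvent) {
    pending.add(lineageEvent);
  }

  /** Publishes everything recorded since the last flush as a single message. */
  synchronized void flush() {
    if (pending.isEmpty()) {
      return;                                      // nothing to publish
    }
    publisher.accept(new ArrayList<>(pending));
    pending.clear();
  }
}
```

Calling `flush()` on a fixed schedule bounds the load on the messaging system. Anything recorded after the last flush is lost if the program dies, which is the trade-off noted above.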
Why not Pipelines?
There are product requirements around users managing their data copies in a single place, as a separate experience from pipelines. Implementing copies with another app provides a natural way to separate them.
CDC is a much more targeted use case than a generic pipeline. It does not require all the flexibility with different aggregators, actions, etc. Implementing CDC via pipeline plugins would require users to set up their pipelines in a very specific way, with many ways to misconfigure their pipelines. CDC as a new application allows the app to control which plugins can be used in data copies.
Pipelines are very schema forward, with connections between stages usually representing a constant schema. When the data flowing between stages can be of different schemas, the tool becomes much more difficult to understand.
CDC has a fairly standard set of metrics and state that are CDC-specific, such as the rate of consumption and whether an initial snapshot is being executed.
State is much more complicated to manage in a pipeline. For example, the source would need to be able to store offsets indicating which events have already been processed. It cannot do this until it knows the sink has successfully written that data, which means the source and sink need to be able to communicate. This becomes even more complicated if somebody decides to add multiple sinks for the same source.
API changes
New Programmatic APIs
The application will need to support additional sources and targets in the future. The UI will also need to be able to list available sources and targets and provide information about how they need to be configured. CDAP's plugin framework has all of this built out already.
Ideally, sometime in the future these plugin types will be consolidated with pipeline sources and sinks and wrangler connections.
DeltaSource
The source is responsible for emitting events that the target will persist. It is also responsible for attaching a transaction ID to each event, which allows the application to group together events for the same transaction before sending them to the target.
DeltaTarget
The target is responsible for writing data to the downstream storage system. It is assumed that the program might die at any time, so the target needs to be able to handle duplicate data. In other words, the writes for a single transaction must be atomic and idempotent. The framework will ensure that the target is given all the data for a transaction in a single call.
Deprecated Programmatic APIs
N/A
New REST APIs
Path | Method | Description | Response Code | Response |
---|---|---|---|---|
/v3/apps/<app-id> | GET | Returns the application spec for a given application | 200 - On success; 404 - When application is not available; 500 - Any internal errors | |
Deprecated REST API
N/A
CLI Impact or Changes
N/A
UI Impact or Changes
- Impact #1
- Impact #2
- Impact #3
Security Impact
What's the impact on Authorization and how does the design take care of this aspect
Impact on Infrastructure Outages
System behavior (if applicable - document impact on downstream [ YARN, HBase etc ] component failures) and how does the design take care of these aspect
Test Scenarios
Test ID | Test Description | Expected Results |
---|---|---|
Releases
Release X.Y.Z
Release X.Y.Z
Related Work
- Work #1
- Work #2
- Work #3