Checklist

  •  User Stories Documented
  •  User Stories Reviewed
  •  Design Reviewed
  •  APIs reviewed
  •  Release priorities assigned
  •  Test cases reviewed
  •  Blog post

Introduction 

One common use case is that of a user running a relational database with multiple tables. They would like to create copies of those tables in a data warehouse like BigQuery in a single, simple operation. All existing data should be copied first, then new changes (inserts, updates, deletes) applied to the relational db tables should be reflected in the BigQuery tables within minutes. Newly created tables in the relational db should automatically appear in BigQuery. Tables that are deleted in the relational db should be deleted in BigQuery. Compatible schema changes should also be reflected.

Pipelines are usually not suitable for these types of use cases, which more closely resemble replication than incremental loads. It is possible to incrementally load data from a single table to a single BigQuery table if the table never has deletes or updates and has a monotonically increasing column. Most users do not have a write pattern like this, so a better solution is required. 
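To make the limitation concrete, the incremental load pattern only works for append-only tables: each run reads rows beyond the last value seen in the monotonically increasing column, so updates and deletes to already-copied rows are never picked up. A rough JDBC-style sketch (table and column names are hypothetical):

Code Block
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

// Illustrative only: incremental load keyed on a monotonically increasing 'id' column.
// Rows with id <= lastMaxId are never re-read, so updates and deletes to them are missed.
public class IncrementalLoad {
  public static long loadNewRows(Connection conn, long lastMaxId) throws SQLException {
    try (PreparedStatement stmt = conn.prepareStatement(
        "SELECT id, name FROM customers WHERE id > ? ORDER BY id")) {
      stmt.setLong(1, lastMaxId);
      try (ResultSet rs = stmt.executeQuery()) {
        while (rs.next()) {
          // append the row to the warehouse table (details omitted)
          lastMaxId = rs.getLong("id");
        }
      }
    }
    return lastMaxId; // persisted and used as the lower bound for the next run
  }
}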

Goals

Design a way for users to easily create a continuously updating copy of their existing data.

Terminology

  • Replicator - A program that reads changes applied to some source storage system and applies (replicates) them to a target storage system
  • Source - The storage system to replicate from. The first version focuses on relational databases.
  • Target - The storage system to replicate to. The first version focuses on data warehouses like BigQuery, Redshift, or Snowflake.
  • DDL event - An event involving a structure in the source, such as the creation, alteration, or deletion of a table
  • DML event - An event involving data in the source, such as the insertion, update, or deletion of a row in a table

User Stories

  1. As a data admin, I want to be able to replicate data from Oracle, MySQL, or SQL Server
  2. As a data admin, I want to be able to replicate data into BigQuery, Spanner, Redshift, or Snowflake
  3. As a data admin, I want my replicated tables to be updated within 5 minutes of the source table being updated
  4. As a data admin, if an event failed to replicate, but could possibly succeed on retry (ex: system temporarily down, somebody revoked privileges on the service account, etc), I want the replicator to be paused until the problem is fixed
  5. As a data admin, if an event failed to replicate in a way that cannot succeed on retry (ex: db column type was altered, which isn't supported in the destination), I want the replicator to be paused until the problem is manually fixed by an admin
  6. As a data admin, I want to know how many events failed to replicate to each target table
  7. As a data admin, I do not want any events to be lost even if the replicator crashes
  8. As a data admin, I do not want duplicate data in the target even if the replicator crashes
  9. As a data admin, I want to be able to tell how far behind my target tables are compared to my source tables
  10. As a data admin, I want to have some metrics around how quickly events are being replicated
  11. As a data admin, I want to be able to pause and resume a replicator
  12. As a data admin, I want to be able to delete a replicator
  13. As a data admin, I want to be able to select a subset of source tables to replicate to my target
  14. As a data admin, I want supported DDL events to be replicated to my destination system
  15. As a data admin, I want unsupported DDL events in my source to pause the replicator and alert an admin until a manual resolution is performed
  16. As a data admin, I want to be able to configure whether tables that are dropped in my source should automatically be dropped in my target
  17. As a data admin, I want to be able to see logs about my replicator in case there are issues (out of memory, permissions errors, etc)
  18. As a data admin, I want to be able to find documentation about what type of database setup I need to perform on my source database
  19. As a data admin, I want to be able to browse my replicators in a single place
  20. As a data admin, I want to be able to distinguish my replicators from pipelines and custom apps
  21. As a data admin, I want to be able to test that my replicator is correctly configured before running it

Design

Approach

This section describes the approach at a high level. Replicators are implemented by connecting a 'Source' and a 'Target'. Sources and Targets define an API that can be implemented by plugins.


Sources are responsible for reading change events from a database and translating them into an ordered sequence of standard DDLEvents and DMLEvents. Each event contains a 'Sequence Number', which is a monotonically increasing value that is unique at least within a single replicator. Given a sequence number, the source must be able to start reading events from that number. Sources begin by taking a snapshot of the current state of the database, then consume change events from that moment on.


Targets are responsible for taking the ordered sequence of events, replicating them to the downstream storage system, and then persisting the sequence number for those events. Events will be sent to a target exactly once during normal operation, but may be sent more than once in error scenarios. Once a sequence number has been successfully persisted, events prior to that number will never be seen again.

Change events are represented as:

No Format
class DDLEvent {
  long sequenceNum;        // monotonically increasing, unique within a replicator
  boolean isSnapshot;      // true if the event was generated from the initial snapshot
  DDLOperation operation;  // "CREATE_DATABASE" | "DROP_DATABASE" | "CREATE_TABLE" | "DROP_TABLE" | "TRUNCATE_TABLE" | "ALTER_TABLE" | "RENAME_TABLE"
  Schema schema;
  String database;
  String prevTable;        // previous table name, used by renames
  String table;
  List<String> primaryKey;
}


class DMLEvent {
  long sequenceNum;        // monotonically increasing, unique within a replicator
  boolean isSnapshot;      // true if the event was generated from the initial snapshot
  DMLOperation operation;  // "INSERT" | "DELETE" | "UPDATE"
  String database;
  String table;
  StructuredRecord row;
}


Example 1 - Primary Key

Source

In this example, suppose the following queries are run on the source database:

No Format
CREATE DATABASE myDB;


CREATE TABLE customers (id int, name varchar(50), PRIMARY KEY (id));


INSERT INTO customers (id, name) VALUES (0, 'alice');


UPDATE customers set name='ABC' where id=0;


DELETE FROM customers where id=0;


INSERT into customers (id, name) VALUES (0, 'Alice'), (1, 'blob');


UPDATE customers set name='Bob' where id=1;

The source generates the following DDL events:

| sequence # | operation | database | table | schema | primary key |
|---|---|---|---|---|---|
| 0 | CREATE_DATABASE | myDB | | | |
| 1 | CREATE_TABLE | myDB | customers | id int, name varchar(50) | id |

followed by the following DML events:

| sequence # | operation | database | table | row |
|---|---|---|---|---|
| 2 | INSERT | myDB | customers | <id:0, name:alice> |
| 3 | UPDATE | myDB | customers | <id:0, name:ABC> |
| 4 | DELETE | myDB | customers | <id:0> |
| 5 | INSERT | myDB | customers | <id:0, name:Alice> |
| 6 | INSERT | myDB | customers | <id:1, name:blob> |
| 7 | UPDATE | myDB | customers | <id:1, name:Bob> |

Target
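
The Target side of this example is not fleshed out yet. As a purely illustrative sketch (not the finalized BigQuery design), a target can apply the DML events idempotently by keying on the primary key from the CREATE_TABLE event: INSERTs and UPDATEs become upserts and DELETEs remove the row, so replaying events after a crash converges to the same final state. Using an in-memory map in place of a real warehouse table:

Code Block
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: an in-memory "table" keyed on the primary key (id) stands in for the
// real warehouse table. Upserts and deletes by key are idempotent, so replaying any suffix
// of the event sequence produces the same final state.
public class ApplyExample {
  public static void main(String[] args) {
    Map<Integer, String> customers = new LinkedHashMap<>(); // id -> name

    customers.put(0, "alice"); // seq 2 INSERT
    customers.put(0, "ABC");   // seq 3 UPDATE
    customers.remove(0);       // seq 4 DELETE
    customers.put(0, "Alice"); // seq 5 INSERT
    customers.put(1, "blob");  // seq 6 INSERT
    customers.put(1, "Bob");   // seq 7 UPDATE

    System.out.println(customers); // {0=Alice, 1=Bob}
  }
}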



Application

A new artifact named cdap-data-transfer is added to CDAP. The artifact supports a configuration that specifies what to read from and what to write to. In order to avoid confusion with similar concepts in wrangler and pipelines, this document will refer to them as an origin and a destination. Origins and destinations are both pluggable and will ideally be consolidated with wrangler connections and pipeline sources/sinks in the future. The application is responsible for keeping track of what events it has read, ensuring that no data is lost, emitting relevant metrics, and exposing any additional information required by the UI.

A data transfer is created by deploying an application that uses the cdap-data-transfer artifact along with a configuration. A single app is responsible for consuming events for an entire database: it is not one app per table, but one app per database. This is needed because relational databases support transactions across multiple tables, so updates to multiple tables need to be handled in a single program.

Transactions

It is important to note that preservation of transactions is not one of the user stories for the initial version. For example, suppose 1000 rows are updated in a single transaction in the origin. It might be the case that 500 of those rows are applied to the destination first, with the other 500 applied soon thereafter. What this means from a user perspective is that a query against the destination may return results that could never have been returned by querying the origin.

Ensuring atomicity can be added in a relatively straightforward manner if the change log for an origin ensures that all events for a transaction are grouped together. If this property does not hold, it is a bit more complicated. This complexity, plus the fact that most data warehouse use cases do not require such a strong guarantee, is why transactions are not handled in the first version.

Config API

The application configuration uses the same format as a pipeline configuration.

No Format
{
  "resources": {
    "vcores": 4,
    "memoryMB": 4096
  },
  "stages": [
    {
      "name": "my-oracle-db",
      "plugin": {
        "name": "oracle",
        "type": "cdc-origin",
        "artifact": {
          "scope": "SYSTEM",
          "name": "cdc-oracle",
          "version": "0.1.0"
        },
        "properties": {
          "user": "myuser",
          "password": "${secure(xyz)}",
          ...
        }
      }
    },
    {
      "name": "my-bq-dataset",
      "plugin": {
        "name": "bigquery",
        "type": "cdc-destination",
        "artifact": { ... },
        "properties": { ... }
      }
    }
  ],
  "connections": [
    { "from": "my-oracle-db", "to": "my-bq-dataset" }
  ]
}

This is more verbose than strictly necessary in order to allow future versions of the application to support more complicated structures than a single origin and a single destination. For example, there may be future requirements around filtering, masking, or multiple destinations.

Plugins

The application will need to support additional origins and destinations in the future. The UI will also need to be able to list available origins and destinations and provide information about how they need to be configured. CDAP's plugin framework has all of this built out already.

Ideally, sometime in the future these plugin types will be consolidated with pipeline sources and sinks and wrangler connections.

Origin

The origin is responsible for emitting events that the destination will persist. It is also responsible for attaching a transaction ID to each event that will allow the application to group together events for the same transaction before sending them to the destination.
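
The code block for the origin API was left empty in this draft. A minimal sketch of what it might look like, assuming hypothetical names and signatures (not a finalized API):

Code Block
// Hypothetical sketch; names and signatures are assumptions, not a finalized API.
public interface CDCOrigin {
  // 'offset' is the last sequence number the destination durably applied, or -1 for a
  // fresh start. The origin snapshots existing data first, then streams change events.
  void initialize(long offset) throws Exception;

  // Blocks until the next change event is available, in sequence number order.
  ChangeEvent next() throws InterruptedException;

  void close();
}

// Hypothetical envelope pairing a DDL or DML event with its transaction ID, so the
// application can group a transaction's events before calling the destination.
class ChangeEvent {
  private final String transactionId;
  private final DDLEvent ddl; // null if this is a DML event
  private final DMLEvent dml; // null if this is a DDL event

  ChangeEvent(String transactionId, DDLEvent ddl, DMLEvent dml) {
    this.transactionId = transactionId;
    this.ddl = ddl;
    this.dml = dml;
  }

  String getTransactionId() { return transactionId; }
  DDLEvent getDDL() { return ddl; }
  DMLEvent getDML() { return dml; }
}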


Destination

The destination is responsible for writing data to the downstream storage system. It is assumed that the program might die at any time, so the destination needs to be able to handle duplicate data. In other words, the writes for a single transaction must be atomic and idempotent. The framework will ensure that the destination is given all the data for a transaction in a single call.
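
The destination code block is likewise empty in this draft. A minimal sketch under the same assumptions:

Code Block
import java.util.List;

// Hypothetical sketch; names and signatures are assumptions, not a finalized API.
public interface CDCDestination {
  void initialize() throws Exception;

  // Applies all events for a single transaction. The write must be atomic and idempotent:
  // if the program dies and the same batch is replayed, the downstream state is unchanged.
  void applyTransaction(List<ChangeEvent> events) throws Exception;

  // Highest sequence number that has been durably applied. The framework persists this so
  // the origin can be resumed from the correct position after a restart.
  long committedSequenceNumber();

  void close();
}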


Program

A data transfer will run as a CDAP worker.
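
A rough sketch of the worker's main loop, assuming the hypothetical CDCOrigin and CDCDestination interfaces above; this is illustrative, not the actual implementation:

Code Block
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;

// Illustrative only: buffer events per transaction, apply each transaction atomically,
// then persist the committed sequence number so the origin can resume after a crash.
public class TransferWorker implements Runnable {
  private final CDCOrigin origin;
  private final CDCDestination destination;
  private final LongConsumer offsetStore; // e.g. backed by application state
  private volatile boolean stopped = false;

  public TransferWorker(CDCOrigin origin, CDCDestination destination, LongConsumer offsetStore) {
    this.origin = origin;
    this.destination = destination;
    this.offsetStore = offsetStore;
  }

  @Override
  public void run() {
    List<ChangeEvent> batch = new ArrayList<>();
    String currentTx = null;
    try {
      while (!stopped) {
        ChangeEvent event = origin.next();
        boolean transactionBoundary = currentTx != null && !currentTx.equals(event.getTransactionId());
        if (transactionBoundary && !batch.isEmpty()) {
          destination.applyTransaction(batch);                        // atomic + idempotent
          offsetStore.accept(destination.committedSequenceNumber());  // resume point
          batch = new ArrayList<>();
        }
        currentTx = event.getTransactionId();
        batch.add(event);
      }
    } catch (Exception e) {
      // In the real application this would pause the replicator and surface the error.
      throw new RuntimeException(e);
    }
  }

  public void stop() {
    stopped = true;
  }
}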

Lineage

Dataset level and field level lineage are a key part of the value offering for CDAP. Today, dataset lineage is handled by CDAP in a mostly transparent way based on dataset usage. Field level lineage is handled explicitly by applications by calling APIs that are available at prepare time. Both of these are inadequate for CDC, since table creation, deletion, and schema changes all occur while the program is running. In order to support lineage, the CDAP Worker API will need to be enhanced to support emitting lineage at runtime.

Code Block
public interface WorkerContext extends RuntimeContext, ..., LineageRecorder {


}

LineageRecorder is only added to WorkerContext to begin with, in case this turns out to be problematic. It can be added to other program contexts, or most generally to RuntimeContext, if it is found to be useful. The implementation will need to be careful not to overload the messaging system. It will likely aggregate all lineage events from the past X amount of time and publish them all at once, similar to how metrics are published. This means lineage events can potentially be lost on program death.
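
As a rough illustration of the time-based aggregation described above (names are assumptions, not the actual implementation), lineage events could be buffered and published on a fixed interval, accepting that anything not yet flushed is lost if the program dies:

Code Block
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Illustrative only: buffer lineage events and publish them periodically, similar to how
// metrics are published, to avoid overloading the messaging system.
public class LineageBuffer<T> {
  private final List<T> buffer = new ArrayList<>();
  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

  public LineageBuffer(Consumer<List<T>> publisher, long flushIntervalSeconds) {
    scheduler.scheduleAtFixedRate(() -> {
      List<T> toPublish;
      synchronized (buffer) {
        if (buffer.isEmpty()) {
          return;
        }
        toPublish = new ArrayList<>(buffer);
        buffer.clear();
      }
      publisher.accept(toPublish); // one publish per interval bounds messaging load
    }, flushIntervalSeconds, flushIntervalSeconds, TimeUnit.SECONDS);
  }

  public void record(T lineageEvent) {
    synchronized (buffer) {
      buffer.add(lineageEvent);
    }
  }

  public void shutdown() {
    scheduler.shutdown();
  }
}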

Examples

MySQL Origin

Code Block


BigQuery Destination

Code Block


Why not Pipelines?

There are product requirements around users managing their data copies in a single place, as a separate experience from pipelines. Implementing copies with another app provides a natural way to separate them.

CDC is a much more targeted use case than a generic pipeline. It does not require all the flexibility of different aggregators, actions, etc. Implementing CDC via pipeline plugins would require users to set up their pipelines in a very specific way, with many ways to misconfigure them. CDC as a new application allows the app to control which plugins can be used in data copies.

Pipelines are very schema forward, with connections between stages usually representing a constant schema. When the data flowing between stages can be of different schemas, the tool becomes much more difficult to understand.

CDC has a fairly standard set of CDC-specific metrics and state: rate of consumption, whether an initial snapshot is being executed, and so on.

State is much more complicated to manage in a pipeline. For example, the source would need to be able to store offsets indicating which events have already been processed. It cannot do this until it knows the sink has successfully written that data, which means the source and sink need to be able to communicate. This becomes even more complicated if somebody decides to add multiple sinks for the same source.

API changes

New Programmatic APIs

New Java APIs introduced (both user facing and internal)

Deprecated Programmatic APIs

New REST APIs

| Path | Method | Description | Response Code | Response |
|---|---|---|---|---|
| /v3/apps/<app-id> | GET | Returns the application spec for a given application | 200 - On success; 404 - When application is not available; 500 - Any internal errors | |







Deprecated REST API

| Path | Method | Description |
|---|---|---|
| /v3/apps/<app-id> | GET | Returns the application spec for a given application |

CLI Impact or Changes

  • Impact #1
  • Impact #2
  • Impact #3

UI Impact or Changes

  • Impact #1
  • Impact #2
  • Impact #3

Security Impact 

What's the impact on Authorization and how does the design take care of this aspect

Impact on Infrastructure Outages 

System behavior (if applicable - document impact on downstream [ YARN, HBase etc ] component failures) and how does the design take care of these aspect

Test Scenarios

| Test ID | Test Description | Expected Results |
|---|---|---|












Releases

Release X.Y.Z

Release X.Y.Z

Related Work

  • Work #1
  • Work #2
  • Work #3


Future work