Change Data Capture




Checklist

  • User Stories Documented
  • User Stories Reviewed
  • Design Reviewed
  • APIs reviewed
  • Release priorities assigned
  • Test cases reviewed
  • Blog post

Introduction 

One common use case is that of a user running a relational database with multiple tables. They would like to create copies of those tables in a data warehouse like BigQuery in a single, simple operation. All existing data should be copied first, then new changes (inserts, updates, deletes) that are applied to the relational db tables should be reflected in the BigQuery tables within minutes. Newly created tables in the relational db should automatically appear in BigQuery. Tables that are deleted in the relational db should be deleted in BigQuery. Compatible schema changes should also be reflected.

Pipelines are usually not suitable for these types of use cases, which more closely resemble replication than incremental loads. It is possible to incrementally load data from a single table to a single BigQuery table if the table never has deletes or updates and has a monotonically increasing column. Most users do not have a write pattern like this, so a better solution is required. 

Goals

Design a way for users to easily create a continuously updating copy of their existing data.

Terminology

  • Replicator - A program that reads changes applied to some source storage system and applies (replicates) them to a target storage system
  • Source - The storage system to replicate from. The first version focuses on relational databases.
  • Target - The storage system to replicate to. The first version focuses on data warehouses like BigQuery, Redshift, or Snowflake.
  • DDL event - An event involving a structure in the source, such as the creation, alteration, or deletion of a table
  • DML event - An event involving data in the source, such as the insertion, update, or deletion of a row in a table

User Stories

  1. As a data admin, I want to be able to replicate data from Oracle, MySQL, or SQL Server
  2. As a data admin, I want to be able to replicate data into BigQuery, Spanner, Redshift, or Snowflake
  3. As a data admin, I want to have an SLO to know that X% of the time, my data is replicated within Y minutes
  4. As a data admin, if an event failed to replicate for any reason, I want the replicator to retry for a configurable amount of time before stopping the replicator
  5. As a data admin, I want to know how many times the replicator failed to replicate an event
  6. As a data admin, I do not want any events to be lost even if the replicator crashes
  7. As a data admin, I do not want duplicate data in the target even if the replicator crashes
  8. As a data admin, I want to be able to tell how far behind my target tables are compared to my source tables
  9. As a data admin, I want to have some metrics around how quickly events are being replicated
  10. As a data admin, I want to be able to pause and resume a replicator
  11. As a data admin, I want to be able to delete a replicator
  12. As a data admin, I want to be able to select a subset of source tables to replicate to my target
  13. As a data admin, I want supported DDL events to be replicated to my destination system
  14. As a data admin, I want to be able to see logs about my replicator in case there are issues (out of memory, permissions errors, etc)
  15. As a data admin, I want to be able to find documentation about what type of database setup I need to perform on my source database
  16. As a data admin, I want to be able to test that my replicator is correctly configured before running it
  17. As a data admin, I want to track field level lineage for tables that were replicated

Design

Approach

At a high level, replicators are implemented by a new CDAP application that defines new 'DeltaSource' and 'DeltaTarget' plugin interfaces.

A DeltaSource is responsible for reading change events from a database and translating them into an ordered sequence of standard DDLEvents and DMLEvents. Sources begin by taking a snapshot of the current state of the database, then begin consuming change events from that moment on. Each event contains an Offset, which is monotonically increasing and unique (at least within a single replicator). Given an offset, the source must be able to start reading events from that offset.

A DeltaTarget is responsible for taking the ordered sequence of events and replicating them to a storage system, as well as telling the app that it has finished replicating an event, allowing the app to store the offset for that event. Events will be sent to a target exactly once during normal operation, but may be sent more than once in error scenarios (at-least-once delivery). Once an offset has been successfully persisted, events prior to that offset will never be seen again.

Change events are represented as:

class DDLEvent {
  Offset offset;
  long sequenceNumber;
  String transactionId;
  boolean isSnapshot;
  DDLOperation operation; // "CREATE_DATABASE" | "DROP_DATABASE" | "CREATE_TABLE" | "DROP_TABLE" | "TRUNCATE_TABLE" | "ALTER_TABLE" | "RENAME_TABLE"
  Schema schema;
  String database;
  String prevTable; // used by renames
  String table;
  List<String> primaryKey;
}


class DMLEvent {
  Offset offset;
  long sequenceNumber;
  String transactionId;
  boolean isSnapshot;
  DMLOperation operation; // "INSERT" | "DELETE" | "UPDATE" | "COMMIT" ,
  String database;
  String table;
  StructuredRecord before; // null unless operation is "UPDATE" or "DELETE"
  StructuredRecord row;
}


interface Offset {
  
  // serialize the offset fields into the DataOutput
  void write(DataOutput out) throws IOException;


  // deserialize offset fields from the DataInput
  void readFields(DataInput in) throws IOException;
}

Each DeltaSource is responsible for defining its own Offset implementation. This is because different sources require different information to know where to start reading from. For example, MySQL offsets correspond to a binlog file name and a position within that file. SQL Server offsets correspond to a change tracking sequence number for the database.
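As an illustration of the Offset contract, the following is a minimal sketch of what a MySQL offset could look like. The class name MySqlOffset, its fields, and the toBytes/fromBytes helpers are hypothetical and not part of the design; only the Offset interface is taken from this document.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Offset contract as defined in this document.
interface Offset {
  void write(DataOutput out) throws IOException;

  void readFields(DataInput in) throws IOException;
}

// Hypothetical MySQL offset: a binlog file name plus a position within that file.
final class MySqlOffset implements Offset {
  private String binlogFile = "";
  private long position;

  // No-arg constructor so that an instance can be populated through readFields.
  MySqlOffset() {
  }

  MySqlOffset(String binlogFile, long position) {
    this.binlogFile = binlogFile;
    this.position = position;
  }

  String getBinlogFile() {
    return binlogFile;
  }

  long getPosition() {
    return position;
  }

  @Override
  public void write(DataOutput out) throws IOException {
    out.writeUTF(binlogFile);
    out.writeLong(position);
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    binlogFile = in.readUTF();
    position = in.readLong();
  }

  // Illustrative helper: serialize any Offset to bytes, as the application might before persisting it.
  static byte[] toBytes(Offset offset) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      offset.write(out);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  // Illustrative helper: rebuild a MySqlOffset from previously serialized bytes.
  static MySqlOffset fromBytes(byte[] data) {
    MySqlOffset offset = new MySqlOffset();
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
      offset.readFields(in);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return offset;
  }
}
```

A round trip such as MySqlOffset.fromBytes(MySqlOffset.toBytes(new MySqlOffset("mysql-bin.000003", 1424L))) yields an offset with the same file name and position.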

The sequence number is a monotonically increasing number generated by the application, equal to the number of changes emitted by the source. Sources are only responsible for attaching an Offset to each Event they emit. The application will then generate a sequence number and attach it to the event before sending it to the target. This is done because a monotonically increasing number makes it much easier for targets to implement their logic in an idempotent way, which is required to correctly handle failure scenarios. In addition, the sequence number is used as a gauge metric to track progress being made by the replicator. 
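The sequence numbering described above could be sketched roughly as follows. SequenceAssigner and SequencedEvent are hypothetical names, and the resetTo method reflects an assumption that replayed events receive the same sequence numbers they had the first time, which is what lets targets detect replays.

```java
// Hypothetical wrapper pairing an event with the sequence number assigned by the application.
final class SequencedEvent<E> {
  final long sequenceNumber;
  final E event;

  SequencedEvent(long sequenceNumber, E event) {
    this.sequenceNumber = sequenceNumber;
    this.event = event;
  }
}

// Hypothetical helper owned by the application, not by the source or target plugins.
final class SequenceAssigner {
  private long next;

  // lastCommitted is the sequence number stored with the last saved offset, or -1 for a new replicator.
  SequenceAssigner(long lastCommitted) {
    this.next = lastCommitted + 1;
  }

  // Attach the next sequence number to an event emitted by the source.
  <E> SequencedEvent<E> assign(E event) {
    return new SequencedEvent<>(next++, event);
  }

  // After a failure, numbering restarts right after the last committed event,
  // so replayed events get the same numbers as before (assumption of this sketch).
  void resetTo(long lastCommitted) {
    this.next = lastCommitted + 1;
  }
}
```

With this shape, the first event of a new replicator gets sequence number 0, matching the numbering used in the examples that follow.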

Examples

With Primary Key

Source

In this example, suppose the following queries are run on the source database:

CREATE DATABASE myDB;

CREATE TABLE customers (id int, name varchar(50), PRIMARY KEY (id));

INSERT INTO customers (id, name) VALUES (0, 'alice');

UPDATE customers set id=1 where id=0;

UPDATE customers set id=2 where id=1;

DELETE FROM customers where id=2;

INSERT into customers (id, name) VALUES (0, 'Alice'), (1, 'blob');

UPDATE customers set name='Bob' where id='1';

The source generates the following DDL events:

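offset                              | operation       | database | table     | schema                   | primary key
------------------------------------|-----------------|----------|-----------|--------------------------|------------
<binlog:mysql-bin.000003, pos:1424> | CREATE_DATABASE | myDB     |           |                          |
<binlog:mysql-bin.000003, pos:1462> | CREATE_TABLE    | myDB     | customers | id int, name varchar(50) | id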

followed by the following DML events:

offset                              | operation | database | table     | before              | row
------------------------------------|-----------|----------|-----------|---------------------|--------------------
<binlog:mysql-bin.000003, pos:1462> | INSERT    | myDB     | customers |                     | <id:0, name:alice>
<binlog:mysql-bin.000003, pos:1482> | UPDATE    | myDB     | customers | <id:0, name:alice>  | <id:1, name:alice>
<binlog:mysql-bin.000003, pos:1493> | UPDATE    | myDB     | customers | <id:1, name:alice>  | <id:2, name:alice>
<binlog:mysql-bin.000003, pos:1519> | DELETE    | myDB     | customers | <id:2, name:alice>  | <id:2>
<binlog:mysql-bin.000003, pos:1538> | INSERT    | myDB     | customers |                     | <id:0, name:Alice>
<binlog:mysql-bin.000003, pos:1557> | INSERT    | myDB     | customers |                     | <id:1, name:blob>
<binlog:mysql-bin.000003, pos:1598> | UPDATE    | myDB     | customers | <id:1, name:blob>   | <id:1, name:Bob>

The sequence number is attached by the application; the source is only responsible for attaching an offset to each event and defining how to compare offsets. In the MySQL case, offsets are compared by filename first, then position within the file.
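The MySQL comparison rule can be sketched as a comparator. BinlogOffset is a hypothetical class, and the sketch assumes all binlog files share the same base name and differ only in a numeric suffix (as in mysql-bin.000003), so files are ordered by that suffix rather than by raw string comparison.

```java
import java.util.Comparator;

// Hypothetical value class holding the two parts of a MySQL offset.
final class BinlogOffset {
  final String file;
  final long pos;

  BinlogOffset(String file, long pos) {
    this.file = file;
    this.pos = pos;
  }

  // Numeric suffix of the binlog file name, e.g. 3 for "mysql-bin.000003".
  // Assumes the name ends with '.' followed by digits.
  long fileIndex() {
    return Long.parseLong(file.substring(file.lastIndexOf('.') + 1));
  }

  // Order by binlog file first, then by position within the file.
  static final Comparator<BinlogOffset> ORDER =
      Comparator.comparingLong(BinlogOffset::fileIndex).thenComparingLong(o -> o.pos);
}
```

For example, BinlogOffset.ORDER.compare(new BinlogOffset("mysql-bin.000003", 1424), new BinlogOffset("mysql-bin.000003", 1462)) is negative, and any offset in mysql-bin.000004 sorts after every offset in mysql-bin.000003.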

Target

The BigQuery target batches DML events together and writes a batch of events to GCS. Once in GCS, it runs a BigQuery load job to load the changes into a staging table. Finally, it runs a Merge query to merge events from the staging table into the actual target table. Once that is complete, it persists the latest sequence number of events contained in the batch. DDL events are not batched together.

For event #0, the target creates a BQ dataset named 'myDB'. Since it must assume the event occurs at least once, it checks if the dataset exists before creating it. After creating the dataset, the target calls a method that tells the application to remember that the event was replicated. The application stores the offset and sequence number for that event.

For event #1, a staging table '_staging_customers' is created that records values for the row before and after the change along with 3 extra columns – batchId, sequenceNum, and operation. Batch id is just the current timestamp of the load job. The table is partitioned on batchId and clustered on sequenceNum. This allows efficiently selecting data for a specific batchId while ordering by sequenceNum. Note that a BQ 'integer' is 8 bytes, equivalent to a Java long.

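_batch_id (timestamp) | _sequence_num (integer) | _operation (string) | _before_id (integer) | _before_name (string) | id (integer) | name (string)
----------------------|-------------------------|---------------------|----------------------|-----------------------|--------------|--------------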

The actual target table 'customers' is also created with the same schema as the source table, except with the sequence number as an additional column:

_sequence_num (integer) | id (integer) | name (string)
------------------------|--------------|--------------

For events #2-7, the target may decide to batch them together in different ways. Supposing they all get batched together, after the load job, the staging table looks like:

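_batch_id  | _sequence_num | _operation | _before_id | _before_name | id | name
-----------|---------------|------------|------------|--------------|----|------
1234567890 | 2             | INSERT     |            |              | 0  | alice
1234567890 | 3             | UPDATE     | 0          | alice        | 1  | alice
1234567890 | 4             | UPDATE     | 1          | alice        | 2  | alice
1234567890 | 5             | DELETE     | 2          | alice        | 2  |
1234567890 | 6             | INSERT     |            |              | 0  | Alice
1234567890 | 7             | INSERT     |            |              | 1  | blob
1234567890 | 8             | UPDATE     | 1          | blob         | 1  | Bob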

A merge query is then run to merge changes from the staging table into the final target table:

MERGE myDB.customers AS T
USING ($DIFF_QUERY) AS D
ON T.id = D._before_id
WHEN MATCHED AND D._operation = "DELETE" THEN
  DELETE
WHEN MATCHED AND D._operation IN ("INSERT", "UPDATE") THEN
  UPDATE SET id = D.id, name = D.name
WHEN NOT MATCHED AND D._operation IN ("INSERT", "UPDATE") THEN
  INSERT (id, name) VALUES (D.id, D.name)

Where the $DIFF_QUERY is:

SELECT A.* FROM
  (SELECT * FROM myDB._staging_customers WHERE _batch_id = 1234567890 AND _sequence_num > $LATEST_APPLIED) as A
  LEFT OUTER JOIN
  (SELECT * FROM myDB._staging_customers WHERE _batch_id = 1234567890 AND _sequence_num > $LATEST_APPLIED) as B
  ON A.id = B._before_id AND A._sequence_num < B._sequence_num
  WHERE B._before_id IS NULL

The diff query is responsible for getting the latest change for each primary key. With the example above, it results in:

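_batch_id  | _sequence_num | _operation | _before_id | _before_name | id | name
-----------|---------------|------------|------------|--------------|----|------
1234567890 | 5             | DELETE     | 2          | alice        | 2  |
1234567890 | 6             | INSERT     |            |              | 0  | Alice
1234567890 | 8             | UPDATE     |            |              | 1  | Bob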

The $LATEST_APPLIED variable is the max sequence number seen in the target table. This is required to ensure idempotency – events that are replayed should not be re-inserted into the final target table. The latest applied sequence number can be tracked in memory by the target, except for the first time it sees the table, where it will need to run a SELECT MAX(_sequence_num) query. 
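To make the selection rule of the diff query easier to follow, here is an in-memory sketch of the same logic in Java. It is not how the target runs the query (that happens in BigQuery); StagedChange and DiffQuery are hypothetical names, and the single integer key mirrors the 'customers' example.

```java
import java.util.ArrayList;
import java.util.List;

// One row of the staging table from the primary-key example.
final class StagedChange {
  final long sequenceNum;
  final String operation;
  final Integer beforeId;  // null for inserts
  final Integer id;
  final String name;

  StagedChange(long sequenceNum, String operation, Integer beforeId, Integer id, String name) {
    this.sequenceNum = sequenceNum;
    this.operation = operation;
    this.beforeId = beforeId;
    this.id = id;
    this.name = name;
  }
}

final class DiffQuery {
  // Mirrors $DIFF_QUERY: among changes newer than latestApplied, keep a change only if
  // no later change in the batch has a _before_id equal to this change's id.
  // Quadratic in the batch size, which is fine for a sketch.
  static List<StagedChange> latestChanges(List<StagedChange> batch, long latestApplied) {
    List<StagedChange> result = new ArrayList<>();
    for (StagedChange a : batch) {
      if (a.sequenceNum <= latestApplied) {
        continue;  // already applied to the target table
      }
      boolean superseded = false;
      for (StagedChange b : batch) {
        if (b.sequenceNum > latestApplied
            && b.sequenceNum > a.sequenceNum
            && b.beforeId != null
            && b.beforeId.equals(a.id)) {
          superseded = true;
          break;
        }
      }
      if (!superseded) {
        result.add(a);
      }
    }
    return result;
  }
}
```

Run against the seven staged events above with latestApplied set to 1, this keeps the changes with sequence numbers 5, 6, and 8. Run with latestApplied set to 8 (a full replay of the batch), it keeps nothing, which is the idempotency property the paragraph above describes.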

Note: When there is a primary key, it is possible to implement the target in such a way where it doesn't need the additional sequence number column, exactly matching the source schema. However, this complicates the idempotency story, as the target would need to ensure that load and merge jobs are not run on data that was previously seen, requiring more complicated logic around using specific GCS object names and BQ load job ids.

Without Primary Key

If no primary key exists, a very similar set of steps occurs, except BigQuery will use all of the columns as the "primary key". 

Note: SQL Server doesn't allow enabling change tracking on a table without a primary key.

Source

Suppose the following queries are run on the source database:

CREATE DATABASE myDB;

CREATE TABLE customers (name varchar(50));

INSERT INTO customers (name) VALUES ('alice'), ('alice'), ('Bob');

UPDATE customers SET name = 'Alyce' WHERE name = 'alice';

UPDATE customers SET name = 'Alice' WHERE name = 'Alyce';

DELETE FROM customers WHERE name = 'alice';

The source generates the following DDL events:

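offset                              | operation       | database | table     | schema           | primary key
------------------------------------|-----------------|----------|-----------|------------------|------------
<binlog:mysql-bin.000003, pos:1424> | CREATE_DATABASE | myDB     |           |                  |
<binlog:mysql-bin.000003, pos:1462> | CREATE_TABLE    | myDB     | customers | name varchar(50) |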

followed by the following DML events:

offset                              | transaction id | operation | database | table     | before       | row
------------------------------------|----------------|-----------|----------|-----------|--------------|-------------
<binlog:mysql-bin.000003, pos:1462> | 0              | INSERT    | myDB     | customers |              | <name:alice>
<binlog:mysql-bin.000003, pos:1482> | 0              | INSERT    | myDB     | customers |              | <name:alice>
<binlog:mysql-bin.000003, pos:1493> | 0              | INSERT    | myDB     | customers |              | <name:Bob>
<binlog:mysql-bin.000003, pos:1519> | 1              | UPDATE    | myDB     | customers | <name:alice> | <name:Alyce>
<binlog:mysql-bin.000003, pos:1538> | 1              | UPDATE    | myDB     | customers | <name:alice> | <name:Alyce>
<binlog:mysql-bin.000003, pos:1557> | 2              | UPDATE    | myDB     | customers | <name:Alyce> | <name:Alice>
<binlog:mysql-bin.000003, pos:1598> | 2              | UPDATE    | myDB     | customers | <name:Alyce> | <name:Alice>
<binlog:mysql-bin.000003, pos:1603> | 3              | DELETE    | myDB     | customers | <name:Alice> | <name:Alice>
<binlog:mysql-bin.000003, pos:1605> | 3              | DELETE    | myDB     | customers | <name:Alice> | <name:Alice>

Target

The BigQuery target loads the DML events into a staging table:


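_batch_id  | _sequence_num | _operation | _before_name | name
-----------|---------------|------------|--------------|------
1234567890 | 2             | INSERT     |              | alice
1234567890 | 3             | INSERT     |              | alice
1234567890 | 4             | INSERT     |              | Bob
1234567890 | 5             | UPDATE     | alice        | Alyce
1234567890 | 6             | UPDATE     | alice        | Alyce
1234567890 | 7             | UPDATE     | Alyce        | Alice
1234567890 | 8             | UPDATE     | Alyce        | Alice
1234567890 | 9             | DELETE     | Alice        | Alice
1234567890 | 10            | DELETE     | Alice        | Alice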

A merge query is then run to merge changes from the staging table into the final target table:

MERGE myDB.customers AS T
USING ($DIFF_QUERY) AS D
ON T.name = D._before_name
WHEN MATCHED AND D._operation = "DELETE" THEN
  DELETE
WHEN MATCHED AND D._operation IN ("INSERT", "UPDATE") THEN
  UPDATE SET name = D.name
WHEN NOT MATCHED AND D._operation IN ("INSERT", "UPDATE") THEN
  INSERT (name) VALUES (D.name)

Where the $DIFF_QUERY is:

SELECT A.* FROM
  (SELECT * FROM myDB._staging_customers WHERE _batch_id = 1234567890 AND _sequence_num > $LATEST_APPLIED) as A
  LEFT OUTER JOIN
  (SELECT * FROM myDB._staging_customers WHERE _batch_id = 1234567890 AND _sequence_num > $LATEST_APPLIED) as B
  ON A.name = B._before_name AND A._sequence_num < B._sequence_num
  WHERE B._before_name IS NULL

The diff query is responsible for getting the latest change for each row. With the example above, it results in:

_batch_id  | _sequence_num | _operation | _before_name | name
-----------|---------------|------------|--------------|------
1234567890 | 4             | INSERT     |              | Bob
1234567890 | 10            | DELETE     | Alice        | Alice

Note that if events from the same transaction are applied in different batches, it is possible to get into a state that was never possible at the source. For example, suppose events 2-7 are placed in the same batch. Then the diff query would result in:

_batch_id  | _sequence_num | _operation | _before_name | name
-----------|---------------|------------|--------------|------
1234567890 | 4             | INSERT     |              | Bob
1234567890 | 7             | UPDATE     | Alyce        | Alice

This would result in the final table having one 'Bob' row and one 'Alice' row, a state that was never possible in the source.

Transactions

A best attempt will be made to honor transactional operations. For example, if 1000 rows are modified in a single transaction in the source database, the target will attempt to replicate those changes all in the same transaction.

Transactions are supported if the following conditions are met:

  1. Source attaches a transactionId to each change event.
  2. Source emits a 'commit' DML event at the end of a transaction.
  3. Source does not interleave events from different transactions. In other words, there is always a 'commit' event for the current transaction before the next transaction starts. 
  4. Target transactionally replicates all events from a single transaction
  5. Target only saves offsets at a commit boundary

Note that #4 is not possible for many storage systems. For example, BigQuery can atomically update multiple rows in a single table, but not across multiple tables. Also note that a target may decide to bundle multiple transactions from the source into a single transaction on the target.

Note that #5 may require the target to save offsets in the same transaction as the data write. For example, when implementing a relational DB target, in order to avoid problems with reprocessing the same events, the target will need to write the offset to an offsets table in the DB in the same transaction that it uses to apply the DML events.

Example

Suppose the following SQL queries are run on a database:

START TRANSACTION;


INSERT INTO customers (id, name) VALUES (0, 'alice'), (1, 'bob');
UPDATE customers SET name = 'Alice' where id = 0;
UPDATE customers SET name = 'Bob' where id = 1;


COMMIT;


START TRANSACTION;


DELETE FROM customers WHERE id = 1;
INSERT INTO customers (id, name) VALUES (1, 'Bobby');


COMMIT;

This would result in the following DML Events:

operation | transactionId | table     | row
----------|---------------|-----------|-----------------------
INSERT    | tx123         | customers | <id:0, name:'alice'>
INSERT    | tx123         | customers | <id:1, name:'bob'>
UPDATE    | tx123         | customers | <id:0, name:'Alice'>
UPDATE    | tx123         | customers | <id:1, name:'Bob'>
COMMIT    | tx123         | customers |
DELETE    | tx456         | customers | <id:1>
INSERT    | tx456         | customers | <id:1, name:'Bobby'>
COMMIT    | tx456         | customers |

The BigQuery target used in the previous example would need to ensure that all the events for 'tx123' are written to the same GCS file and loaded into the staging table with the same batch id. It would be ok if both 'tx123' and 'tx456' are in the same batch, but not if a batch doesn't contain all the events from a transaction. 
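One way a target could respect this constraint is to close batches only at commit boundaries. The following is a minimal sketch under that assumption; TxEvent, TransactionBatcher, and the minBatchSize parameter are hypothetical and do not correspond to any existing plugin class.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal stand-in for a DML event: only the fields needed for batching.
final class TxEvent {
  final String operation;  // "INSERT", "UPDATE", "DELETE", or "COMMIT"
  final String transactionId;

  TxEvent(String operation, String transactionId) {
    this.operation = operation;
    this.transactionId = transactionId;
  }
}

// Buffers events and only closes a batch on a COMMIT, so a transaction is never split across batches.
final class TransactionBatcher {
  private final int minBatchSize;
  private final List<TxEvent> current = new ArrayList<>();
  private final List<List<TxEvent>> closed = new ArrayList<>();

  // minBatchSize is the smallest number of buffered events that triggers closing a batch at a commit.
  TransactionBatcher(int minBatchSize) {
    this.minBatchSize = minBatchSize;
  }

  void add(TxEvent event) {
    current.add(event);
    // Relies on the source not interleaving transactions (condition 3 above).
    if ("COMMIT".equals(event.operation) && current.size() >= minBatchSize) {
      closed.add(new ArrayList<>(current));
      current.clear();
    }
  }

  // Batches that are complete and safe to load.
  List<List<TxEvent>> closedBatches() {
    return closed;
  }

  // Number of buffered events that still belong to an open batch.
  int pendingEvents() {
    return current.size();
  }
}
```

With the eight events above, a minBatchSize of 1 produces two batches (five events for 'tx123', three for 'tx456'), while a minBatchSize of 6 produces a single batch containing both transactions. In neither case is a transaction split.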

Note that support for transactions is not documented as a user story. This will be a stretch goal for the first version because the best the targets can do is single table transactions, and many data warehousing use cases do not need such strict guarantees.

Failure Handling

In this first version, any errors encountered during the replication process will be logged, then retried until the operation succeeds. For example, if the source database is inaccessible, the replicator will keep trying to access it until it becomes accessible. Similarly, if a write to the target system fails, the target should retry until it succeeds. This may require admin intervention to modify a table in order to allow the replicator to make progress again.

In future versions, it will likely make sense to support configurable policies, such as eventually skipping data that fails and writing the failed change events to files that data admins can examine at a later point in time.

Offset/State failures

Failures to read or write offsets and state will be repeatedly retried by the application for a configurable amount of time before the replicator fails. The replicator will not make any progress during this time.
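The retry behavior for offset and state operations could look roughly like the sketch below. The Retries class, the fixed delay between attempts, and the use of unchecked exceptions are all assumptions made for illustration; the design does not specify a backoff strategy.

```java
import java.time.Duration;
import java.util.function.Supplier;

final class Retries {
  // Calls the operation until it succeeds or the timeout elapses.
  // When there is not enough time left for another attempt, the last failure is rethrown,
  // which in the replicator would correspond to failing the run.
  static <T> T callWithRetries(Supplier<T> operation, Duration timeout, Duration delay) {
    long deadline = System.nanoTime() + timeout.toNanos();
    while (true) {
      try {
        return operation.get();
      } catch (RuntimeException e) {
        long remaining = deadline - System.nanoTime();
        if (remaining < delay.toNanos()) {
          throw e;
        }
        try {
          Thread.sleep(delay.toMillis());
        } catch (InterruptedException interrupted) {
          // Preserve the interrupt status and give up with the original failure.
          Thread.currentThread().interrupt();
          throw e;
        }
      }
    }
  }
}
```

A caller would wrap each offset or state access, for example Retries.callWithRetries(() -> readOffset(), Duration.ofMinutes(5), Duration.ofSeconds(1)), where readOffset is a placeholder for whatever storage call is being retried.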

Source/Target failures

If a source or target throws an exception at any point, the replicator will reset its offset to the last saved offset and start consuming events from that point forward. This will result in events being seen more than once. Plugins must be implemented with this in mind to ensure events are applied exactly once, even though they may be processed at least once.

Offsets are reset so that targets don't have to remember what they have done since the last offset was persisted. If the replicator application did not reset the offset, and just retried method calls, targets would be forced to keep change events in memory until they successfully store an offset. They would have to in case they encounter a transient error in the middle of a transaction. For example, the BigQuery target writes batches of events to GCS. In order to do it efficiently, the target should stream data to GCS and not keep the batch in memory. However, if there is an error when writing the 50th event in a 100 event transaction, the target is now stuck because it has to re-write the first 49 events again. 
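The reset-and-replay behavior can be illustrated with a small simulation. Everything here is hypothetical: events are plain strings, the offset is simply an index into a list, and the target fails once on purpose. The point of the sketch is that a target which remembers the latest applied sequence number ends up applying each event exactly once, even though some events are delivered more than once.

```java
import java.util.ArrayList;
import java.util.List;

final class ReplaySketch {

  // Target that applies each sequence number at most once and fails a single time at a chosen event.
  static final class FlakyTarget {
    final List<String> applied = new ArrayList<>();
    private long latestApplied = -1;
    private long failAt;

    FlakyTarget(long failAt) {
      this.failAt = failAt;
    }

    void apply(long sequenceNum, String event) {
      if (sequenceNum == failAt) {
        failAt = -1;  // fail only the first time this event is seen
        throw new IllegalStateException("transient write failure");
      }
      if (sequenceNum <= latestApplied) {
        return;  // replayed event that was already applied
      }
      applied.add(event);
      latestApplied = sequenceNum;
    }
  }

  // Delivers all events to the target, saving the offset every commitInterval events.
  // On a failure, delivery restarts from the last saved offset. Returns the number of deliveries.
  static int replicate(List<String> events, FlakyTarget target, int commitInterval) {
    int committed = 0;  // index of the first event not covered by a saved offset
    int next = 0;
    int deliveries = 0;
    while (next < events.size()) {
      try {
        deliveries++;
        target.apply(next, events.get(next));
        next++;
        if (next % commitInterval == 0) {
          committed = next;
        }
      } catch (RuntimeException e) {
        next = committed;  // reset to the last saved offset and replay from there
      }
    }
    return deliveries;
  }
}
```

With five events, an offset saved every two events, and a failure on the fourth event, the loop makes seven deliveries (five events, one failed attempt, one replay of the third event), while the target's applied list still contains each event exactly once and in order.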

Config API

The application configuration uses the same format as a pipeline configuration.

{
  "resources": {
    "vcores": 4,
    "memoryMB": 4096
  },
  "stages": [
    {
      "name": "my-oracle-db",
      "plugin": {
        "name": "oracle",
        "type": "cdc-origin",
        "artifact": {
          "scope": "SYSTEM",
          "name": "cdc-oracle",
          "version": "0.1.0"
        },
        "properties": {
          "user": "myuser",
          "password": "${secure(xyz)}",
          ...
        }
      }
    },
    {
      "name": "my-bq-dataset",
      "plugin": {
        "name": "bigquery",
        "type": "cdc-destination",
        "artifact": { ... },
        "properties": { ... }
      }
    }
  ],
  "connections": [
    { "from": "my-oracle-db", "to": "my-bq-dataset" }
  ]
}

This is more verbose than needed. This is done in order to allow for future versions of the application to support more complicated structures than a single source and a single target. For example, there may be future requirements around filtering or masking or multiple destinations. It also presents the same core API to the UI for batch, streaming, and replicator 'pipelines'. 

Program Type

The first version will run the source and target in a single CDAP worker. A worker is chosen over Spark because it gives full control over all the error scenarios.

If the replicator needs to be able to run in cloud environments, there is substantial provisioner related work to run workers with cloud profiles. This is out of the scope of this document.

Lineage

Dataset level and field level lineage is a key part of the value offering for CDAP. Today, the dataset lineage is handled by CDAP in a mostly transparent way based on dataset usage. Field level lineage is handled explicitly by applications by calling APIs that are available at prepare time. Both of these are inadequate for CDC, since table creation, deletion, and schema changes all occur when the program is running. In order to support lineage, the CDAP API will need to be enhanced to support emitting lineage at runtime.

public interface WorkerContext extends RuntimeContext, ..., LineageRecorder {


}

LineageRecorder is only added to WorkerContext to begin with in case this turns out to be problematic. It can be added to other program contexts, or most generally to RuntimeContext, if it is found to be useful. The implementation will need to be careful not to overload the messaging system. It will likely aggregate all lineage events in the past X amount of time and publish it all at once, similar to how metrics are published. This means lineage events can potentially be lost on program death. 

Metrics

Replicators emit several metrics that can be used to measure progress. Metric names are prefixed with the target name. This is to allow for adding support for multiple sources/targets in the future.

dml.insert - # of DML inserts applied

dml.update - # of DML updates applied

dml.delete - # of DML deletes applied

ddl - # of DDL operations applied
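As a rough sketch of how these counters might be tracked, the class below keeps them in memory, keyed by the prefixed metric name. ReplicatorMetrics is a hypothetical name, and the "." separator between the target name and the metric name is an assumption; a real plugin would emit through the Metrics object provided by the platform rather than a map.

```java
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;

final class ReplicatorMetrics {
  private final String targetName;
  private final Map<String, Long> counters = new TreeMap<>();

  ReplicatorMetrics(String targetName) {
    this.targetName = targetName;
  }

  // Counts an applied DML operation. Only INSERT, UPDATE, and DELETE map to a metric;
  // other operations such as COMMIT are ignored.
  void onDml(String operation) {
    String op = operation.toLowerCase(Locale.ROOT);
    if (op.equals("insert") || op.equals("update") || op.equals("delete")) {
      increment("dml." + op);
    }
  }

  // Counts an applied DDL operation.
  void onDdl() {
    increment("ddl");
  }

  // Copy of the counters, keyed by "<target name>.<metric name>".
  Map<String, Long> snapshot() {
    return new TreeMap<>(counters);
  }

  private void increment(String metricName) {
    counters.merge(targetName + "." + metricName, 1L, Long::sum);
  }
}
```

For a target stage named 'my-bq-dataset', two inserts would show up under the key 'my-bq-dataset.dml.insert' with a value of 2.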

Validation

Replicators need to be validated before they are created, highlighting errors early during the creation process instead of at runtime. A similar mechanism to the one used for pipeline validation will be used, with a system service that instantiates the plugins, calls their configure methods, and returns errors. 

Why not Pipelines?

It is desirable that users would be able to manage their replicators in a single place, as a separate experience from pipelines. Implementing replicators with another app provides a natural way to separate them.

CDC is a much more targeted use case than a generic pipeline. It does not require all the flexibility with different aggregators, actions, etc. Implementing CDC via pipeline plugins would require users to set up their pipelines in a very specific way, with many ways to misconfigure their pipelines. CDC as a new application allows the app to control which plugins can be used in data copies.

Error handling is difficult to do with Spark streaming because the code doesn't have full control over what runs. For example, streaming pipelines often have issues when sources read events more quickly than the rest of the pipeline can process. Data builds up and can get dropped depending on the Spark settings and whether Receivers or InputDStreams are being used. It is not clear how to retry an event until it succeeds.

Pipelines are very schema forward, with connections between stages usually representing a constant schema. When the data flowing between stages can be of different schemas, the tool becomes much more difficult to understand.

CDC has a pretty standard set of metrics and state that are CDC specific. Rate of consumption, whether an initial snapshot is being executed, etc.

State is much more complicated to manage in a pipeline. For example, the source would need to be able to store offsets indicating which events have already been processed. It cannot do this until it knows the sink has successfully written that data, which means the source and sink need to be able to communicate. This means the source and sink are essentially coupled together, even though pipeline APIs try to discourage this. This becomes even more complicated if somebody decides to add multiple sinks for the same source.

API changes

New Programmatic APIs

The application will need to support additional sources and targets in the future. The UI will also need to be able to list available sources and targets and provide information about how they need to be configured. CDAP's plugin framework has all of this built out already.

Ideally, sometime in the future these plugin types will be consolidated with pipeline sources and sinks and wrangler connections.

Plugin Context

Plugins have access to a DeltaContext class that will provide integration points to the platform. The context will be used to store and read state, and to emit metrics.

public interface DeltaContext {

  String getReplicatorName();

  String getNamespace();

  String getRunId();

  Metrics getMetrics();

  void putOffset(Offset offset);

  Offset getOffset();

  byte[] getState(String key);

  void putState(String key, byte[] val);
}

State is used to store additional information required by the plugins, such as a history of database table changes, used to keep track of table schemas at particular offsets. Storing and fetching offsets and state is not an efficient operation and is not expected to occur frequently. Underneath, state and offsets are written via the Hadoop FileSystem API, to whatever distributed filesystem CDAP is configured to use. Each replicator has its own directory to prevent conflicts with other replicators:

/cdap/replicators/<namespace>/<name>/<id>/offset
/cdap/replicators/<namespace>/<name>/<id>/state-<key>

DeltaSource

public interface DeltaSource {
  String PLUGIN_TYPE = "cdcSource";

  /**
   * Configure the source. This is called when the application is deployed.
   *
   * @param configurer configurer used to set configuration settings
   */
  void configure(Configurer configurer);

  /**
   * Create an event reader used to read change events.
   * This is called when the program is started and any time the 
   * source needs to reset to an earlier state due to errors in the replicator.
   *
   * @param context program context
   * @param eventEmitter emitter used to emit change events
   * @return an event reader used to read change events
   */
  EventReader createReader(DeltaContext context, EventEmitter eventEmitter);
}


public interface EventReader extends Runnable {

  /**
   * Initialize the reader. Guaranteed to be called before the run method. 
   */
  void initialize(Offset offset);

  /**
   * Stop reading events and close any resources in use. This should cause the run method to complete.
   */
  void close();

}

Example

public class MySqlEventReader implements EventReader {
  private final String appName;
  private final MySqlConfig config;
  private final EventEmitter emitter;
  private EmbeddedEngine engine;

  public MySqlEventReader(String appName, MySqlConfig config, EventEmitter emitter) {
    this.appName = appName;
    this.config = config;
    this.emitter = emitter;
  }

  @Override
  public void initialize(Offset offset) {
    // This example assumes a concrete, map-backed Offset class whose get() method
    // returns a Map<String, byte[]> holding the binlog file name and position.
    String fileStr = Bytes.toString(offset.get().getOrDefault("file", Bytes.toBytes("")));
    byte[] posBytes = offset.get().get("pos");
    String pos = posBytes == null ? "" : Long.toString(Bytes.toLong(posBytes));
    // Define the configuration used by Debezium MySQL
    Configuration debeziumConf = Configuration.create()
      .with("file", fileStr)
      .with("pos", pos)
      ...
      .build();

    engine = EmbeddedEngine.create()
      .using(debeziumConf)
      .notifying(sourceRecord -> {
        // Translate the Debezium source offset into the offset attached to emitted events
        Map<String, ?> sourceOffset = sourceRecord.sourceOffset();
        String binlogFile = (String) sourceOffset.get("file");
        long binlogPosition = (Long) sourceOffset.get("pos");
        Map<String, byte[]> deltaOffset = new HashMap<>(2);
        deltaOffset.put("file", Bytes.toBytes(binlogFile));
        deltaOffset.put("pos", Bytes.toBytes(binlogPosition));
        Offset recordOffset = new Offset(deltaOffset);

        // Convert the record before inspecting it, so DDL statements can be detected
        StructuredRecord val = Records.convert((Struct) sourceRecord.value());
        String ddl = val.get("ddl");
        if (ddl != null) {
          emitter.emit(getDDLEvent(ddl));
          return;
        }

        StructuredRecord row = val.get("after");
        emitter.emit(new DMLEvent(...));
      })
      .build();
  }

  @Override
  public void run() {
    engine.run();
  }

  @Override
  public void close() {
    try {
      // Ask the engine to stop, then wait for the run method to complete
      if (engine != null && engine.stop()) {
        engine.await(1, TimeUnit.MINUTES);
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}


DeltaTarget

public interface DeltaTarget {
  String PLUGIN_TYPE = "cdcTarget";

  /**
   * Configure the target. This is called when the application is deployed.
   *
   * @param configurer configurer used to set configuration settings
   */
  void configure(Configurer configurer);

  /**
   * Create an event consumer that replicates change events to the target system.
   *
   * @param context target context that provides access to application information and offset persistence
   * @return an event consumer that applies change events to the target system
   * @throws Exception if the consumer could not be created, which will result in the program failure
   */
  EventConsumer createConsumer(DeltaContext context) throws Exception;
}


public interface DeltaTargetContext extends DeltaRuntimeContext {

  /**
   * Commit changes up to the given offset. Once an offset is successfully committed, events up to that offset are
   * considered complete and will never be read again.
   *
   * @param offset offset to commit
   */
  void commitOffset(Offset offset);
}




public interface EventConsumer {

  void start();

  void stop();

  /**
   * Apply a DDL event, such as creating a table. This method must be idempotent. For example, if the event is a table
   * creation and the table already exists, this method should not fail due to an attempt to create a table that
   * already exists.
   *
   * Idempotency is required because the event can be applied multiple times in failure scenarios.
   * During normal operation, an event will be applied exactly once.
   * In failure scenarios the event will be applied at least once.
   *
   * If this method throws an Exception, the replicator offset will be reset to the last saved offset, which may
   * result in replayed change events.
   *
   * @param event DDL event to apply
   * @throws Exception if there was an error applying the DDL event
   */
  void applyDDL(DDLEvent event) throws Exception;

  /**
   * Apply a DML event. This method must be idempotent. For example, if there is an insert and the row already exists,
   * this method should not fail due to an attempt to insert a row that already exists, and it should not write
   * duplicate data.
   *
   * Idempotency is required because events can be applied multiple times in failure scenarios.
   * During normal operation, each event will be applied exactly once.
   * In failure scenarios each event will be applied at least once.
   * 
   * If this method throws an Exception, the replicator offset will be reset to the last saved offset, which may
   * result in replayed change events.
   *
   * @param event DML event to apply
   * @throws Exception if there was an error applying the DML event
   */
  void applyDML(DMLEvent event) throws Exception;

}
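
The idempotency requirement on applyDDL and applyDML can be illustrated with a minimal, self-contained sketch. It does not use the Delta API: Change, Op, and the map that plays the role of the target table are hypothetical stand-ins. Inserts and updates are applied as upserts and deleting a missing row is a no-op, so replaying the events after the last committed offset leaves the target in the same state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class IdempotentApplySketch {

  // Hypothetical stand-in for the operation carried by a DML event.
  enum Op { INSERT, UPDATE, DELETE }

  // Hypothetical stand-in for a DML event: operation, primary key, and row value.
  static final class Change {
    final Op op;
    final String key;
    final String value;

    Change(Op op, String key, String value) {
      this.op = op;
      this.key = key;
      this.value = value;
    }
  }

  // Applies one change to a map that plays the role of the target table.
  // Inserts and updates are both upserts and deleting a missing row is a no-op,
  // so replaying a change never fails and never duplicates data.
  static void apply(Map<String, String> table, Change change) {
    switch (change.op) {
      case INSERT:
      case UPDATE:
        table.put(change.key, change.value);
        break;
      case DELETE:
        table.remove(change.key);
        break;
    }
  }

  // Applies every change once, then replays the changes after the last committed
  // position, as would happen after a failure between two offset commits.
  static Map<String, String> applyWithReplay(List<Change> changes, int committed) {
    Map<String, String> table = new HashMap<>();
    for (Change change : changes) {
      apply(table, change);
    }
    for (Change change : changes.subList(committed, changes.size())) {
      apply(table, change);
    }
    return table;
  }

  public static void main(String[] args) {
    List<Change> changes = new ArrayList<>();
    changes.add(new Change(Op.INSERT, "1", "alice"));
    changes.add(new Change(Op.UPDATE, "1", "alicia"));
    changes.add(new Change(Op.INSERT, "2", "bob"));
    changes.add(new Change(Op.DELETE, "2", null));

    Map<String, String> noReplay = applyWithReplay(changes, changes.size());
    Map<String, String> fullReplay = applyWithReplay(changes, 0);
    System.out.println(noReplay.equals(fullReplay)); // prints "true"
  }
}
```

A real target would get the same effect from its storage system, for example with "create if not exists" for DDL and a keyed upsert or merge for DML.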

Example

public class BigQueryEventConsumer implements EventConsumer {
  private final DeltaTargetContext context;
  private final BigQuery bigQuery;
  private final ScheduledExecutorService executorService;
  private ScheduledFuture<?> scheduledFlush;
  private Offset latestOffset;
  // events buffered since the last flush
  private final List<DMLEvent> batch;

  public BigQueryEventConsumer(DeltaTargetContext context, BigQuery bigQuery) {
    this.context = context;
    this.bigQuery = bigQuery;
    this.executorService = Executors.newSingleThreadScheduledExecutor();
    this.batch = new ArrayList<>();
  }

  @Override
  public void start() {
    scheduledFlush = executorService.scheduleAtFixedRate(() -> {
      try {
        flush();
      } catch (InterruptedException e) {
        // just return and let things end
      }
    }, 60, 60, TimeUnit.SECONDS);
  }

  @Override
  public void stop() {
    scheduledFlush.cancel(true);
    executorService.shutdownNow();
    try {
      executorService.awaitTermination(10, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      // just return and let everything end
    }
  }

  @Override
  public void applyDDL(DDLEvent event) {
    switch (event.getOperation()) {
      case CREATE_DATABASE:
        DatasetId datasetId = DatasetId.of(project, event.getDatabase());
        if (bigQuery.getDataset(datasetId) == null) {
          DatasetInfo datasetInfo = DatasetInfo.newBuilder(datasetId).setLocation(bucket.getLocation()).build();
          bigQuery.create(datasetInfo);
        }
        break;
      case DROP_DATABASE:
        datasetId = DatasetId.of(project, event.getDatabase());
        if (bigQuery.getDataset(datasetId) != null) {
          bigQuery.delete(datasetId);
        }
        break;
      case CREATE_TABLE:
        ...
      case DROP_TABLE:
        ...
      case ALTER_TABLE:
        ...
      case RENAME_TABLE:
        ...
      case TRUNCATE_TABLE:
        ...
    }
  }

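  @Override
  public synchronized void applyDML(DMLEvent event) {
    // buffer the event; flush() writes the batch to GCS and merges it into BigQuery.
    // synchronized because flush() runs on the scheduler thread.
    batch.add(event);
    // assumes the event exposes the offset it was read at
    latestOffset = event.getOffset();
  }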

  private synchronized void flush() throws InterruptedException {
    // write batch to GCS
    ...
    
    // load data from GCS into staging BQ table
    ...

    // run merge query from staging table to target table
    ...

    context.commitOffset(latestOffset);
  }

}
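
The "run merge query from staging table to target table" step in flush() could look roughly like the sketch below. This is an illustration under stated assumptions, not the actual plugin logic: the staging table is assumed to hold at most one row per primary key (the latest change in the batch), and the _op column, the table names, and the buildMerge helper are all hypothetical. The statement follows BigQuery's MERGE syntax. Running it twice against the same staging data leaves the target unchanged, which is what makes a replayed batch safe.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class MergeQuerySketch {

  // Builds a MERGE statement that applies staged changes to the target table.
  // Assumes at least one non-key column, one staged row per primary key, and a
  // hypothetical _op column in the staging table holding the operation type.
  static String buildMerge(String target, String staging, List<String> keys, List<String> columns) {
    String on = keys.stream()
        .map(k -> "T." + k + " = S." + k)
        .collect(Collectors.joining(" AND "));
    // Key columns identify the row, so only non-key columns are updated.
    String set = columns.stream()
        .filter(c -> !keys.contains(c))
        .map(c -> c + " = S." + c)
        .collect(Collectors.joining(", "));
    String insertColumns = String.join(", ", columns);
    String insertValues = columns.stream()
        .map(c -> "S." + c)
        .collect(Collectors.joining(", "));

    return "MERGE `" + target + "` T\n"
        + "USING `" + staging + "` S\n"
        + "ON " + on + "\n"
        + "WHEN MATCHED AND S._op = 'DELETE' THEN DELETE\n"
        + "WHEN MATCHED THEN UPDATE SET " + set + "\n"
        + "WHEN NOT MATCHED AND S._op != 'DELETE' THEN\n"
        + "  INSERT (" + insertColumns + ") VALUES (" + insertValues + ")";
  }

  public static void main(String[] args) {
    // Hypothetical dataset, table, and column names.
    System.out.println(buildMerge(
        "dataset1.users", "dataset1.users_staging",
        Arrays.asList("id"), Arrays.asList("id", "name", "email")));
  }
}
```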

System Service

To help users during the pipeline creation process, a new Delta system service will be added, similar to the Wrangler and Pipeline system services. The Delta service is responsible for storing pipeline drafts, listing database tables, describing tables, and assessing pipeline drafts.

REST APIs

Each endpoint is listed with its path, method, description, request body, and response.

Path: /v1/contexts/<namespace>/drafts
Method: GET
Description: list all drafts within a namespace
Response: list of draft objects, where a draft contains the full pipeline config

[
  {
    "name": "xyz",
    "created": timestamp,
    "config": {
      "stages": [
        {
          "name": "source",
          "plugin": {
            "name": "oracle",
            "type": "cdcSource",
            "properties": {
              "host": "[hostname]",
              "port": "1521",
              "user": "repluser",
              "password": "${secure(kms-key)}"
            }
          }
        },
        ...
      ]
    }
  }
]

Path: /v1/contexts/<namespace>/drafts/<draft-id>
Method: PUT
Description: add a new draft if it doesn't exist, or overwrite an existing one if it does
Request Body: pipeline config

{
  "config": {
    "stages": [ ... ]
  }
}

Path: /v1/contexts/<namespace>/drafts/<draft-id>
Method: DELETE
Description: delete an existing draft

Path: /v1/contexts/<namespace>/drafts/<draft-id>
Method: GET
Description: get detail about a draft
Response: draft config object

{
  "config": {
    "stages": [ ... ]
  }
}

Path: /v1/contexts/<namespace>/drafts/<draft-id>/listTables
Method: POST
Description: list tables based on the source configuration in the draft
Response:

{
  tables: [
    {
      database: "database1",
      name: "table1",
      numColumns: 50
    },
    {
      name: "table2",
      numColumns: 7
    }
  ]
}

Path: /v1/contexts/<namespace>/drafts/<draft-id>/describeTable
Method: POST
Description: describe a specific table, based on source configuration in the draft
Request Body:

{
  "database": "db1",
  "table": "tayble"
}

Response:

{
  primaryKey: [
    "col1", "col2", "col3"
  ],
  columns: [
    {
      name: "column1",
      type: "VARCHAR",
      nullable: true
    },
    {
      name: "column2",
      type: "INTEGER",
      nullable: false
    }
  ]
}

Path: /v1/contexts/{namespace}/drafts/{id}/assessPipeline
Method: POST
Description: assess the pipeline draft, based on source and target configurations
Response:

{
  tables: [
    {
      database: "database1",
      name: "table1",
      numColumns: 50,
      columnsNotSupported: 2,
      columnsPartiallySupported: 2
    },
    ...
  ],
  featureProblems: [
    {
       "name": "",
       "description": "",
       "suggestion": "",
       "impact": ""
    }
  ],
  connectivityProblems: [
    {
       "name": "",
       "description": "",
       "suggestion": "",
       "impact": ""
    }
  ]
}

Path: /v1/contexts/{namespace}/drafts/{id}/assessTable
Method: POST
Description: assess a specific table, based on source and target configurations in the draft
Request Body:

{
  "database": "db1",
  "table": "tayble"
}

Response:

{
  columns: [
    {
      sourceName: "id",
      targetName: "",
      sourceType: "",
      targetType: "",
      support: YES | NO | PARTIAL,
      transforms: [],
      suggestion: {
        details: {
          message: ...
        },
        transforms: [
        ]
      }
    },
    ...
  ],
  featureProblems: [
    {
       "name": "",
       "description": "",
       "suggestion": "",
       "impact": ""
    }
  ],
  connectivityProblems: [
    {
       "name": "",
       "description": "",
       "suggestion": "",
       "impact": ""
    }
  ]
}

Path: /v1/contexts/{namespace}/getState
Method: POST
Description: get state information for each table and for the source
Request Body:

{
  "name": [replicator name],
  "offsetBasePath": [offset base path for the replicator]
}

Response:

{
  sourceState: OK | FAILING,
  sourceError: {
    message: some string,
    stackTrace: [
      { },
      ...
    ]
  },
  tables: [
    {
      database: 
      table:
      state: SNAPSHOTTING | REPLICATING | FAILING
      error: {
        message:
        stackTrace: [ ... ]
      }
    }
  ]
}
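
As an illustration of how a client might call the draft endpoints, here is a minimal sketch that builds the PUT and listTables requests with the JDK's java.net.http API (Java 11+). The base URL, the namespace, and the draft id are hypothetical, the helper names are made up for this example, and path segments are assumed not to need URL encoding. The sketch only constructs the requests and does not contact a server.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;

public class DraftRequestSketch {

  // Builds the URI of a draft from the path layout in the table above.
  // baseUrl is an assumption: the address at which the Delta service is reachable.
  static URI draftUri(String baseUrl, String namespace, String draftId) {
    return URI.create(baseUrl + "/v1/contexts/" + namespace + "/drafts/" + draftId);
  }

  // PUT creates the draft if it does not exist and overwrites it otherwise.
  static HttpRequest putDraft(String baseUrl, String namespace, String draftId, String configJson) {
    return HttpRequest.newBuilder(draftUri(baseUrl, namespace, draftId))
        .header("Content-Type", "application/json")
        .PUT(HttpRequest.BodyPublishers.ofString(configJson, StandardCharsets.UTF_8))
        .build();
  }

  // POST to listTables takes no request body; the tables come from the draft's source config.
  static HttpRequest listTables(String baseUrl, String namespace, String draftId) {
    URI uri = URI.create(draftUri(baseUrl, namespace, draftId) + "/listTables");
    return HttpRequest.newBuilder(uri)
        .POST(HttpRequest.BodyPublishers.noBody())
        .build();
  }

  public static void main(String[] args) {
    String baseUrl = "http://localhost:8080"; // hypothetical address
    String config = "{\"config\": {\"stages\": []}}";

    HttpRequest put = putDraft(baseUrl, "default", "draft1", config);
    HttpRequest list = listTables(baseUrl, "default", "draft1");
    System.out.println(put.method() + " " + put.uri());
    System.out.println(list.method() + " " + list.uri());
  }
}
```

Sending a request would then be a call such as HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()).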


Data Model

Drafts will be stored in a system table used by the system service, which means hostnames, ports, etc. will be stored in the table. The table has the following schema:

  • namespace - CDAP namespace that the replicator lives in
  • generation - generation of the CDAP namespace
  • name - replicator name
  • created - created timestamp
  • updated - updated timestamp
  • config - config object as a JSON string


The state returned by the POST /v1/contexts/{namespace}/getState endpoint will be written to the configured default CDAP distributed filesystem, as both the worker program and the system service need access to it. Offsets, sequence numbers, and state are all stored in a base directory:


/cdap/delta/[namespace]/[replicator name]/[replicator generation]/offset.[commit timestamp]

/cdap/delta/[namespace]/[replicator name]/[replicator generation]/state.json
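
The layout above can be sketched as a few path helpers, plus a lookup that picks the newest offset file committed at or before a given time (for example, when choosing an offset to restart from). This is a sketch under assumptions the design does not state: the generation is treated as a number, the commit timestamp as epoch milliseconds written in decimal, and all method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class OffsetPathSketch {

  private static final String OFFSET_PREFIX = "offset.";

  // Base directory for one generation of a replicator.
  static String baseDir(String namespace, String replicator, long generation) {
    return "/cdap/delta/" + namespace + "/" + replicator + "/" + generation;
  }

  static String offsetPath(String namespace, String replicator, long generation, long commitTimestamp) {
    return baseDir(namespace, replicator, generation) + "/" + OFFSET_PREFIX + commitTimestamp;
  }

  static String statePath(String namespace, String replicator, long generation) {
    return baseDir(namespace, replicator, generation) + "/state.json";
  }

  // Returns the name of the newest offset file committed at or before the given time.
  // File names that are not of the form offset.<number> are ignored.
  static Optional<String> latestAtOrBefore(List<String> fileNames, long timestamp) {
    String best = null;
    long bestTimestamp = 0L;
    for (String name : fileNames) {
      if (!name.startsWith(OFFSET_PREFIX)) {
        continue;
      }
      long committed;
      try {
        committed = Long.parseLong(name.substring(OFFSET_PREFIX.length()));
      } catch (NumberFormatException e) {
        continue;
      }
      if (committed <= timestamp && (best == null || committed > bestTimestamp)) {
        best = name;
        bestTimestamp = committed;
      }
    }
    return Optional.ofNullable(best);
  }

  public static void main(String[] args) {
    System.out.println(offsetPath("default", "repl1", 1L, 1000L));
    System.out.println(statePath("default", "repl1", 1L));
    List<String> files = Arrays.asList("offset.1000", "offset.2000", "state.json", "offset.3000");
    System.out.println(latestAtOrBefore(files, 2500L).orElse("none"));
  }
}
```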


Older offsets are kept around for some time to allow users to rewind pipelines to a previous offset. They contain the sequence number, plus any other information set by the source plugin. They are of the following format:

[sequence number (long)]
[# of entries in offset (int)]
[key1 length (int)]
[key1 bytes]
[val1 length (int)]
[val1 bytes]
...
[keyN length (int)]
[keyN bytes]
[valN length (int)]
[valN bytes]
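
The offset format above can be written and read with plain data streams. The sketch below is one possible reading of the layout, with assumptions labeled: integers and longs are big-endian (the DataOutputStream default), keys are UTF-8 strings, values are opaque bytes, and the class and method names are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

public class OffsetCodecSketch {

  // Decoded form of an offset file: the sequence number plus the source plugin's entries.
  static final class DecodedOffset {
    final long sequenceNumber;
    final Map<String, byte[]> entries;

    DecodedOffset(long sequenceNumber, Map<String, byte[]> entries) {
      this.sequenceNumber = sequenceNumber;
      this.entries = entries;
    }
  }

  // Writes: sequence number, entry count, then length-prefixed key and value bytes per entry.
  static byte[] encode(long sequenceNumber, Map<String, byte[]> entries) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeLong(sequenceNumber);
      out.writeInt(entries.size());
      for (Map.Entry<String, byte[]> entry : entries.entrySet()) {
        byte[] key = entry.getKey().getBytes(StandardCharsets.UTF_8); // assumed key encoding
        out.writeInt(key.length);
        out.write(key);
        out.writeInt(entry.getValue().length);
        out.write(entry.getValue());
      }
    }
    return bytes.toByteArray();
  }

  // Reads the fields back in the same order they were written.
  static DecodedOffset decode(byte[] data) throws IOException {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
      long sequenceNumber = in.readLong();
      int count = in.readInt();
      Map<String, byte[]> entries = new LinkedHashMap<>();
      for (int i = 0; i < count; i++) {
        byte[] key = new byte[in.readInt()];
        in.readFully(key);
        byte[] value = new byte[in.readInt()];
        in.readFully(value);
        entries.put(new String(key, StandardCharsets.UTF_8), value);
      }
      return new DecodedOffset(sequenceNumber, entries);
    }
  }

  public static void main(String[] args) throws IOException {
    Map<String, byte[]> entries = new LinkedHashMap<>();
    // Hypothetical entry a source plugin might store in its offset.
    entries.put("binlog.file", "mysql-bin.000003".getBytes(StandardCharsets.UTF_8));
    DecodedOffset decoded = decode(encode(42L, entries));
    System.out.println(decoded.sequenceNumber + " " + decoded.entries.keySet());
  }
}
```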



State contains the information returned by the POST /v1/contexts/{namespace}/getState endpoint. For example:

{
  sourceState: OK | FAILING,
  sourceError: {
    message: some string,
    stackTrace: [
      { },
      ...
    ]
  },
  tables: [
    {
      database: 
      table:
      state: SNAPSHOTTING | REPLICATING | FAILING
      error: {
        message:
        stackTrace: [ ... ]
      }
    }
  ]
}


Deprecated Programmatic APIs

N/A

New REST APIs

Path: /v3/apps/<app-id>
Method: GET
Description: Returns the application spec for a given application
Response Code:

200 - On success

404 - When application is not available

500 - Any internal errors

Deprecated REST API

N/A

CLI Impact or Changes

N/A

UI Impact or Changes

  • Impact #1
  • Impact #2
  • Impact #3

Security Impact 

What's the impact on Authorization and how does the design take care of this aspect

Impact on Infrastructure Outages 

System behavior (if applicable, document the impact of failures in downstream components such as YARN or HBase) and how the design takes care of these aspects

Test Scenarios

Test ID | Test Description | Expected Results

Releases

Release X.Y.Z

Release X.Y.Z

Related Work

  • Work #1
  • Work #2
  • Work #3


Future work