Table of Contents

Checklist

  •  User Stories Documented
  •  User Stories Reviewed
  •  Design Reviewed
  •  APIs reviewed
  •  Release priorities assigned
  •  Test cases reviewed
  •  Blog post

Introduction 

One common use case is that of a user running a relational database with multiple tables. They would like to create copies of those tables in a data warehouse like BigQuery in a single, simple operation. All existing data should be copied first, then new changes (inserts, updates, deletes) applied to the relational database tables should be reflected in the BigQuery tables within minutes. Newly created tables in the relational database should automatically appear in BigQuery. Tables that are deleted in the relational database should be deleted in BigQuery. Compatible schema changes should also be reflected.

Pipelines are usually not suitable for these types of use cases, which more closely resemble replication than incremental loads. It is possible to incrementally load data from a single table to a single BigQuery table if the table never has deletes or updates and has a monotonically increasing column. Most users do not have a write pattern like this, so a better solution is required. 

Goals

Design a way for users to easily create a continuously updating copy of their existing data.

Terminology

  • Replicator - A program that reads changes applied to some source storage system and applies, or replicates, them to a target storage system
  • Source - The storage system to replicate from. The first version focuses on relational databases.
  • Target - The storage system to replicate to. The first version focuses on data warehouses like BigQuery, Redshift, or Snowflake.
  • DDL event - An event involving a structure in the source, such as the creation, alteration, or deletion of a table
  • DML event - An event involving data in the source, such as the insertion, update, or deletion of a row in a table

User Stories

  • As a data admin, I want to be able to replicate data from Oracle, MySQL, or SQL Server
  • As a data admin, I want to be able to replicate data into BigQuery, Spanner, Redshift, or Snowflake
  • As a data admin, I want my replicated tables to be updated within 5 minutes of the source table being updated
  • As a data admin, if an event failed to replicate, but could possibly succeed on retry (ex: system temporarily down, somebody revoked privileges on the service account, etc), I want the replicator to be paused until the problem is fixed
  • As a data admin, if an event failed to replicate in a way that cannot succeed on retry (ex: db column type was altered, which isn't supported in the destination), I want the replicator to be paused until the problem is manually fixed by an admin
  • As a data admin, I want to know how many events failed to replicate to each target table
  • As a data admin, I do not want any events to be lost even if the replicator crashes
  • As a data admin, I do not want duplicate data in the target even if the replicator crashes
  • As a data admin, I want to be able to tell how far behind my target tables are compared to my source tables
  • As a data admin, I want to have some metrics around how quickly events are being replicated
  • As a data admin, I want to be able to pause and resume a replicator
  • As a data admin, I want to be able to delete a replicator
  • As a data admin, I want to be able to select a subset of source tables to replicate to my target
  • As a data admin, I want supported DDL events to be replicated to my destination system
  • As a data admin, I want unsupported DDL events in my source to pause the replicator and alert an admin until a manual resolution is performed


  • As a data admin, I want to be able to configure whether tables that are dropped in my source should automatically be dropped in my target
  • As a data admin, I want to have an SLO to know that X% of the time, my data is replicated within Y minutes
  • As a data admin, if an event failed to replicate for any reason, I want the replicator to retry for a configurable amount of time before stopping the replicator
  • As a data admin, I want to know how many times the replicator failed to replicate an event
  • As a data admin, I want to be able to browse my replicators in a single place
  • As a data admin, I want to be able to distinguish my replicators from pipelines and custom apps
  • As a data admin, I want to be able to see logs about my replicator in case there are issues (out of memory, permissions errors, etc.)
  • As a data admin, I want to be able to find documentation about what type of database setup I need to perform on my source database
  • As a data admin, I want to be able to test that my replicator is correctly configured before running it
  • As a data admin, I want to track field level lineage for tables that were replicated


    Design

    Approach

    At a high level, replicators are implemented by a new CDAP application that defines new 'DeltaSource' and 'DeltaTarget' plugin interfaces.

    A DeltaSource is responsible for reading change events from a database and translating them into an ordered sequence of standard DDLEvents and DMLEvents. Sources begin by taking a snapshot of the current state of the database, then consume change events from that moment on. Each event contains an Offset, which is monotonically increasing and unique (at least within a single replicator). Given an offset, the source must be able to start reading events from that offset.

    A DeltaTarget is responsible for taking the ordered sequence of events and replicating them to a storage system, as well as telling the app that it has finished replicating an event, allowing the app to store the offset for that event. Events will be sent to a target exactly once during normal operation, but may be sent more than once in error scenarios. Once an offset has been successfully persisted, events prior to that offset will never be seen again.

    Change events are represented as:

    No Format
    class DDLEvent {
      Offset offset;
      long sequenceNumber;
      String transactionId;
      boolean isSnapshot;
      DDLOperation operation; // "CREATE_DATABASE" | "DROP_DATABASE" | "CREATE_TABLE" | "DROP_TABLE" | "TRUNCATE_TABLE" | "ALTER_TABLE" | "RENAME_TABLE"
      Schema schema;
      String database;
      String prevTable; // used by renames
      String table;
      List<String> primaryKey;
    }
    
    
    class DMLEvent {
      Offset offset;
      long sequenceNumber;
      String transactionId;
      boolean isSnapshot;
      DMLOperation operation; // "INSERT" | "DELETE" | "UPDATE" | "COMMIT" ,
      String database;
      String table;
      StructuredRecord before; // null unless operation is "UPDATE" or "DELETE"
      StructuredRecord row;
    }
    
    
    interface Offset {
      
      // serialize the offset fields into the DataOutput
      void write(DataOutput out) throws IOException;
    
    
      // deserialize offset fields from the DataInput
      void readFields(DataInput in) throws IOException;
    }

    Each DeltaSource is responsible for defining its own Offset implementation. This is because different sources require different information to know where to start reading from. For example, MySQL offsets correspond to a binlog file name and a position within that file. SQL Server offsets correspond to a change tracking sequence number for the database.
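
    For illustration only, a minimal sketch of what a source-defined Offset implementation could look like for MySQL, based on the Offset interface above. The class name, fields, and getters are assumptions, not part of the proposed API.

    Code Block
    // Hypothetical MySQL offset: a binlog file name plus a position within that file.
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    public class MySqlOffset implements Offset {
      private String binlogFile;  // e.g. "mysql-bin.000003"
      private long position;      // byte position within that binlog file

      public MySqlOffset() {
        // no-arg constructor so the framework can instantiate it before calling readFields()
      }

      public MySqlOffset(String binlogFile, long position) {
        this.binlogFile = binlogFile;
        this.position = position;
      }

      public String getBinlogFile() {
        return binlogFile;
      }

      public long getPosition() {
        return position;
      }

      @Override
      public void write(DataOutput out) throws IOException {
        out.writeUTF(binlogFile);
        out.writeLong(position);
      }

      @Override
      public void readFields(DataInput in) throws IOException {
        binlogFile = in.readUTF();
        position = in.readLong();
      }
    }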

    The sequence number is a monotonically increasing number generated by the application, equal to the number of changes emitted by the source. Sources are only responsible for attaching an Offset to each Event they emit. The application will then generate a sequence number and attach it to the event before sending it to the target. This is done because a monotonically increasing number makes it much easier for targets to implement their logic in an idempotent way, which is required to correctly handle failure scenarios. In addition, the sequence number is used as a gauge metric to track progress being made by the replicator. 
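
    As a rough sketch of the idea (not the actual application code), the app could wrap the source's emitted events and attach the next sequence number before forwarding them to the target. The class name and the assumption that a sequence number can be set on an event are illustrative only.

    Code Block
    // Illustrative only: the source attaches an Offset; the application attaches the sequence number.
    public class EventDispatcher {
      private final EventConsumer consumer;
      private long nextSequenceNumber;

      public EventDispatcher(EventConsumer consumer, long lastPersistedSequenceNumber) {
        this.consumer = consumer;
        this.nextSequenceNumber = lastPersistedSequenceNumber + 1;
      }

      public void dispatch(DMLEvent event) throws Exception {
        // assumes some way to set the generated sequence number on the event
        event.setSequenceNumber(nextSequenceNumber++);
        consumer.applyDML(event);
      }
    }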

    Examples

    With Primary Key

    Source

    In this example, suppose the following queries are run on the source database:

    No Format
    CREATE DATABASE myDB;
    
    CREATE TABLE customers (id int, name varchar(50), PRIMARY KEY (id));
    
    INSERT INTO customers (id, name) VALUES (0, 'alice');
    
    UPDATE customers set id=1 where id=0;
    
    UPDATE customers set id=2 where id=1;
    
    DELETE FROM customers where id=2;
    
    INSERT into customers (id, name) VALUES (0, 'Alice'), (1, 'blob');
    
    UPDATE customers set name='Bob' where id='1';

    The source generates the following DDL events:

    offset                              | operation       | database | table     | schema                   | primary key
    <binlog:mysql-bin.000003, pos:1424> | CREATE_DATABASE | myDB     |           |                          |
    <binlog:mysql-bin.000003, pos:1462> | CREATE_TABLE    | myDB     | customers | id int, name varchar(50) | id

    followed by the following DML events:

    offset                              | operation | database | table     | before              | row
    <binlog:mysql-bin.000003, pos:1462> | INSERT    | myDB     | customers |                      | <id:0, name:alice>
    <binlog:mysql-bin.000003, pos:1482> | UPDATE    | myDB     | customers | <id:0, name:alice>   | <id:1, name:alice>
    <binlog:mysql-bin.000003, pos:1493> | UPDATE    | myDB     | customers | <id:1, name:alice>   | <id:2, name:alice>
    <binlog:mysql-bin.000003, pos:1519> | DELETE    | myDB     | customers | <id:2, name:alice>   | <id:2>
    <binlog:mysql-bin.000003, pos:1538> | INSERT    | myDB     | customers |                      | <id:0, name:Alice>
    <binlog:mysql-bin.000003, pos:1557> | INSERT    | myDB     | customers |                      | <id:1, name:blob>
    <binlog:mysql-bin.000003, pos:1598> | UPDATE    | myDB     | customers | <id:1, name:blob>    | <id:1, name:Bob>

    The sequence number is attached by the application; the source is only responsible for attaching an offset to each event and defining how to compare offsets. In the MySQL case, offsets are compared by file name first, then by position within the file.

    Target

    The BigQuery target batches DML events together and writes a batch of events to GCS. Once in GCS, it runs a BigQuery load job to load the changes into a staging table. Finally, it runs a Merge query to merge events from the staging table into the actual target table. Once that is complete, it persists the latest sequence number of events contained in the batch. DDL events are not batched together.

    For event #0, the target creates a BQ dataset named 'myDB'. Since it must assume the event may be delivered more than once, it checks whether the dataset exists before creating it. After creating the dataset, the target calls a method that tells the application to remember that the event was replicated. The application stores the offset and sequence number for that event.

    For event #1, a staging table '_staging_customers' is created that records values for the row before and after each change, along with three extra columns: _batch_id, _sequence_num, and _operation. The batch id is the timestamp of the load job. The table is partitioned on _batch_id and clustered on _sequence_num, which allows efficiently selecting the data for a specific batch while ordering by sequence number. Note that a BQ 'integer' is 8 bytes, equivalent to a Java long.

    _batch_id (timestamp) | _sequence_num (integer) | _operation (string) | _before_id (integer) | _before_name (string) | id (integer) | name (string)

    The actual target table 'customers' is also created with the same schema as the source table, except with the sequence number as an additional column:

    _sequence_num (integer) | id (integer) | name (string)

    For events #2-8, the target may decide to batch them together in different ways. Supposing they all get batched together, after the load job the staging table looks like:

    _batch_id  | _sequence_num | _operation | _before_id | _before_name | id | name
    1234567890 | 2             | INSERT     |            |              | 0  | alice
    1234567890 | 3             | UPDATE     | 0          | alice        | 1  | alice
    1234567890 | 4             | UPDATE     | 1          | alice        | 2  | alice
    1234567890 | 5             | DELETE     | 2          | alice        | 2  |
    1234567890 | 6             | INSERT     |            |              | 0  | Alice
    1234567890 | 7             | INSERT     |            |              | 1  | blob
    1234567890 | 8             | UPDATE     | 1          | blob         | 1  | Bob

    A merge query is then run to merge changes from the staging table into the final target table:

    No Format
    MERGE myDB.customers as T
    USING ($DIFF_QUERY) as D
    ON T.id = D._before_id
    WHEN MATCHED AND D._operation = "DELETE" THEN
      DELETE
    WHEN MATCHED AND D._operation IN ("INSERT", "UPDATE") THEN
      UPDATE SET id = D.id, name = D.name, _sequence_num = D._sequence_num
    WHEN NOT MATCHED AND D._operation IN ("INSERT", "UPDATE") THEN
      INSERT (id, name, _sequence_num) VALUES (D.id, D.name, D._sequence_num)

     Where the $DIFF_QUERY is:

    No Format
    SELECT A.* FROM
      (SELECT * FROM myDB._staging_customers WHERE _batch_id = 1234567890 AND _sequence_num > $LATEST_APPLIED) as A
      LEFT OUTER JOIN
      (SELECT * FROM myDB._staging_customers WHERE _batch_id = 1234567890 AND _sequence_num > $LATEST_APPLIED) as B
      ON A.id = B._before_id AND A._sequence_num < B._sequence_num
      WHERE B._before_id IS NULL

    The diff query is responsible for getting the latest change for each primary key. With the example above, it results in:

    _batch_id  | _sequence_num | _operation | _before_id | _before_name | id | name
    1234567890 | 5             | DELETE     | 2          | alice        | 2  |
    1234567890 | 6             | INSERT     |            |              | 0  | Alice
    1234567890 | 8             | UPDATE     | 1          | blob         | 1  | Bob

    The $LATEST_APPLIED variable is the max sequence number seen in the target table. This is required to ensure idempotency – events that are replayed should not be re-inserted into the final target table. The latest applied sequence number can be tracked in memory by the target, except for the first time it sees the table, where it will need to run a SELECT MAX(_sequence_num) query. 
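
    As an illustration, a minimal sketch of that initial lookup using the BigQuery Java client. The class and method names here are not part of the design; only the query itself comes from the text above.

    Code Block
    import com.google.cloud.bigquery.BigQuery;
    import com.google.cloud.bigquery.FieldValueList;
    import com.google.cloud.bigquery.QueryJobConfiguration;
    import com.google.cloud.bigquery.TableResult;

    public class LatestAppliedFetcher {

      /** Returns the max _sequence_num already merged into the target table, or 0 if there is none. */
      static long getLatestApplied(BigQuery bigQuery) throws InterruptedException {
        QueryJobConfiguration query = QueryJobConfiguration.newBuilder(
          "SELECT MAX(_sequence_num) FROM myDB.customers").build();
        TableResult result = bigQuery.query(query);
        for (FieldValueList row : result.iterateAll()) {
          // an empty table returns a single row with a NULL value
          return row.get(0).isNull() ? 0L : row.get(0).getLongValue();
        }
        return 0L;
      }
    }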

    Note: When there is a primary key, it is possible to implement the target in such a way where it doesn't need the additional sequence number column, exactly matching the source schema. However, this complicates the idempotency story, as the target would need to ensure that load and merge jobs are not run on data that was previously seen, requiring more complicated logic around using specific GCS object names and BQ load job ids.

    Without Primary Key

    If no primary key exists, a very similar set of steps occurs, except BigQuery will use all of the columns as the "primary key". 

    Note: SQL Server doesn't allow enabling change tracking on a table without a primary key.

    Source

    Suppose the following queries are run on the source database:

    No Format
    CREATE DATABASE myDB;
    
    CREATE TABLE customers (name varchar(50));
    
    INSERT INTO customers (name) VALUES ('alice'), ('alice'), ('Bob');
    
    UPDATE customers SET name = 'Alyce' WHERE name = 'alice';
    
    UPDATE customers SET name = 'Alice' WHERE name = 'Alyce';
    
    DELETE FROM customers WHERE name = 'Alice';

    The source generates the following DDL events:

    offset                              | operation       | database | table     | schema            | primary key
    <binlog:mysql-bin.000003, pos:1424> | CREATE_DATABASE | myDB     |           |                   |
    <binlog:mysql-bin.000003, pos:1462> | CREATE_TABLE    | myDB     | customers | name varchar(50)  |

    followed by the following DML events:

    offset                              | transaction id | operation | database | table     | before        | row
    <binlog:mysql-bin.000003, pos:1462> | 0              | INSERT    | myDB     | customers |               | <name:alice>
    <binlog:mysql-bin.000003, pos:1482> | 0              | INSERT    | myDB     | customers |               | <name:alice>
    <binlog:mysql-bin.000003, pos:1493> | 0              | INSERT    | myDB     | customers |               | <name:Bob>
    <binlog:mysql-bin.000003, pos:1519> | 1              | UPDATE    | myDB     | customers | <name:alice>  | <name:Alyce>
    <binlog:mysql-bin.000003, pos:1538> | 1              | UPDATE    | myDB     | customers | <name:alice>  | <name:Alyce>
    <binlog:mysql-bin.000003, pos:1557> | 2              | UPDATE    | myDB     | customers | <name:Alyce>  | <name:Alice>
    <binlog:mysql-bin.000003, pos:1598> | 2              | UPDATE    | myDB     | customers | <name:Alyce>  | <name:Alice>
    <binlog:mysql-bin.000003, pos:1603> | 3              | DELETE    | myDB     | customers | <name:Alice>  | <name:Alice>
    <binlog:mysql-bin.000003, pos:1605> | 3              | DELETE    | myDB     | customers | <name:Alice>  | <name:Alice>
    Target

    The BigQuery target loads the DML events into a staging table:

    _batch_id  | _sequence_num | _operation | _before_name | name
    1234567890 | 2             | INSERT     |              | alice
    1234567890 | 3             | INSERT     |              | alice
    1234567890 | 4             | INSERT     |              | Bob
    1234567890 | 5             | UPDATE     | alice        | Alyce
    1234567890 | 6             | UPDATE     | alice        | Alyce
    1234567890 | 7             | UPDATE     | Alyce        | Alice
    1234567890 | 8             | UPDATE     | Alyce        | Alice
    1234567890 | 9             | DELETE     | Alice        | Alice
    1234567890 | 10            | DELETE     | Alice        | Alice

    A merge query is then run to merge changes from the staging table into the final target table:

    No Format
    MERGE myDB.customers as T
    USING ($DIFF_QUERY) as D
    ON T.name = D._before_name
    WHEN MATCHED AND D._operation = "DELETE" THEN
      DELETE
    WHEN MATCHED AND D._operation IN ("INSERT", "UPDATE") THEN
      UPDATE SET name = D.name
    WHEN NOT MATCHED AND D._operation IN ("INSERT", "UPDATE") THEN
      INSERT (name) VALUES (D.name)

     Where the $DIFF_QUERY is:

    No Format
    SELECT A.* FROM
      (SELECT * FROM myDB._staging_customers WHERE _batch_id = 1234567890 AND _sequence_num > $LATEST_APPLIED) as A
      LEFT OUTER JOIN
      (SELECT * FROM myDB._staging_customers WHERE _batch_id = 1234567890 AND _sequence_num > $LATEST_APPLIED) as B
      ON A.name = B._before_name AND A._sequence_num < B._sequence_num
      WHERE B._before_name IS NULL

    The diff query is responsible for getting the latest change for each row. With the example above, it results in:

    _batch_id  | _sequence_num | _operation | _before_name | name
    1234567890 | 4             | INSERT     |              | Bob
    1234567890 | 10            | DELETE     | Alice        | Alice

    Note that if events from the same transaction are applied in different batches, it is possible to get into a state that was never possible at the source. For example, suppose events 2-7 are placed in the same batch. Then the diff query would result in:

    _batch_id  | _sequence_num | _operation | _before_name | name
    1234567890 | 4             | INSERT     |              | Bob
    1234567890 | 7             | UPDATE     | Alyce        | Alice

    This would result in the final table having a single 'Bob' row and a single 'Alice' row, a state that was never possible in the source.

    Transactions

    A best attempt will be made to honor transactional operations. For example, if 1000 rows are modified in a single transaction in the source database, the target will attempt to replicate those changes all in the same transaction.

    Transactions are supported if the following conditions are met:

    1. Source attaches a transactionId to each change event.
    2. Source emits a 'commit' DML event at the end of a transaction.
    3. Source does not interleave events from different transactions. In other words, there is always a 'commit' event for the current transaction before the next transaction starts. 
    4. Target transactionally replicates all events from a single transaction
    5. Target only saves offsets at a commit boundary

    Note that #4 is not possible for many storage systems. For example, BigQuery can atomically update multiple rows in a single table, but not across multiple tables. Also note that a target may decide to bundle multiple transactions from the source into a single transaction on the target.

    Note that #5 may require the target to save offsets in the same transaction as the data write. For example, when implementing a relational DB target, in order to avoid problems with reprocessing the same events, the target will need to write the offset to an offsets table in the DB in the same transaction that it uses to apply the DML events.
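
    For illustration, a sketch of a hypothetical relational DB target that applies the DML events of one source transaction and records the offset bookkeeping in the same JDBC transaction, so a crash can never persist one without the other. The offsets table, column names, and the toStatement() helper are made up for this sketch.

    Code Block
    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;
    import java.util.List;

    public class JdbcTransactionalApply {

      void applyTransaction(Connection conn, List<DMLEvent> events, byte[] serializedOffset) throws SQLException {
        conn.setAutoCommit(false);
        try {
          for (DMLEvent event : events) {
            // translate the change event into an INSERT/UPDATE/DELETE statement (not shown)
            try (PreparedStatement stmt = toStatement(conn, event)) {
              stmt.executeUpdate();
            }
          }
          // record how far we have replicated, inside the same transaction as the data
          try (PreparedStatement stmt = conn.prepareStatement(
              "UPDATE replicator_offsets SET offset_bytes = ? WHERE replicator_name = ?")) {
            stmt.setBytes(1, serializedOffset);
            stmt.setString(2, "my-replicator");
            stmt.executeUpdate();
          }
          conn.commit();
        } catch (SQLException e) {
          conn.rollback();
          throw e;
        }
      }

      private PreparedStatement toStatement(Connection conn, DMLEvent event) throws SQLException {
        // hypothetical helper, omitted from this sketch
        throw new UnsupportedOperationException("not shown");
      }
    }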

    Example

    Suppose the following SQL queries are run on a database:

    No Format
    START TRANSACTION;
    
    
    INSERT INTO customers (id, name) VALUES (0, 'alice'), (1, 'bob');
    UPDATE customers SET name = 'Alice' where id = 0;
    UPDATE customers SET name = 'Bob' where id = 1;
    
    
    COMMIT;
    
    
    START TRANSACTION;
    
    
    DELETE FROM customers WHERE id = 1;
    INSERT INTO customers (id, name) VALUES (1, 'Bobby');
    
    
    COMMIT;

    This would result in the following DML Events:

    operation | transactionId | table     | row
    INSERT    | tx123         | customers | <id:0, name:'alice'>
    INSERT    | tx123         | customers | <id:1, name:'bob'>
    UPDATE    | tx123         | customers | <id:0, name:'Alice'>
    UPDATE    | tx123         | customers | <id:1, name:'Bob'>
    COMMIT    | tx123         | customers |
    DELETE    | tx456         | customers | <id:1>
    INSERT    | tx456         | customers | <id:1, name:'Bobby'>
    COMMIT    | tx456         | customers |

    The BigQuery target used in the previous example would need to ensure that all the events for 'tx123' are written to the same GCS file and loaded into the staging table with the same batch id. It would be ok if both 'tx123' and 'tx456' are in the same batch, but not if a batch doesn't contain all the events from a transaction. 

    Note that support for transactions is not documented as a user story. This will be a stretch goal for the first version because the best the targets can do is single table transactions, and many data warehousing use cases do not require such strict requirements.

    Failure Handling

    In this first version, any errors encountered during the replication process will be logged, then retried until the operation succeeds. For example, if the source database is inaccessible, the replicator will keep trying to access it until it becomes accessible. Similarly, if a write to the target system fails, the target should retry until it succeeds. This may require admin intervention to modify a table in order to allow the replicator to make progress again.

    In future versions, it will likely make sense to support configurable policies, such as eventually skipping data that fails, writing the change event to files that data admins can examine at a later point in time.

    Offset/State failures

    Failures to read or write offsets and state will be repeatedly retried by the application for a configurable amount of time before the replicator fails. The replicator will not make any progress during this time.
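
    A minimal sketch of this retry policy, not the actual implementation: the timeout and sleep values stand in for whatever is made configurable, and the class name is illustrative.

    Code Block
    import java.util.concurrent.TimeUnit;

    public class OffsetCommitRetry {

      void commitWithRetry(DeltaContext context, Offset offset, long timeoutSeconds) throws InterruptedException {
        long deadline = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(timeoutSeconds);
        RuntimeException lastFailure = new IllegalStateException("retry window elapsed");
        while (System.currentTimeMillis() < deadline) {
          try {
            context.putOffset(offset);   // persist the offset via the platform
            return;
          } catch (RuntimeException e) {
            lastFailure = e;             // log and retry until the configured window elapses
            TimeUnit.SECONDS.sleep(5);
          }
        }
        throw lastFailure;               // giving up here fails the replicator
      }
    }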

    Source/Target failures

    If a source or target throws an exception at any point, the replicator will reset its offset to the last saved offset and start consuming events from that point forward. This will result in events being seen more than once. Plugins must be implemented with this in mind to ensure events are applied exactly once, even though they may be processed at least once.

    Offsets are reset so that targets don't have to remember what they have done since the last offset was persisted. If the replicator application did not reset the offset and just retried method calls, targets would be forced to keep change events in memory until they successfully store an offset, in case they encounter a transient error in the middle of a transaction. For example, the BigQuery target writes batches of events to GCS. To do this efficiently, the target should stream data to GCS rather than keep the batch in memory. However, if there is an error when writing the 50th event of a 100 event transaction, the target is stuck, because it would have to re-write the first 49 events but no longer has them.

    Config API

    The application configuration is of the same format as a pipeline configuration.

    No Format
    {
      "resources": {
        "vcores": 4,
        "memoryMB": 4096
      },
      "stages": [
        {
          "name": "my-oracle-db",
          "plugin": {
            "name": "oracle",
            "type": "cdc-origin",
            "artifact": {
              "scope": "SYSTEM",
              "name": "cdc-oracle",
              "version": "0.1.0"
            },
            "properties": {
              "user": "myuser",
              "password": "${secure(xyz)}",
              ...
            }
          }
        },
        {
          "name": "my-bq-dataset",
          "plugin": {
            "name": "bigquery",
            "type": "cdc-destination",
            "artifact": { ... },
            "properties": { ... }
          }
        }
      ],
      "connections": [
        { "from": "my-oracle-db", "to": "my-bq-dataset" }
      ]
    }

    This is more verbose than needed. This is done in order to allow for future versions of the application to support more complicated structures than a single source and a single target. For example, there may be future requirements around filtering or masking or multiple destinations. It also presents the same core API to the UI for batch, streaming, and replicator 'pipelines'. 

    Program Type

    The first version will run the source and target in a single CDAP worker. A worker is chosen over Spark because it gives full control over all the error scenarios.

    If the replicator needs to be able to run in cloud environments, there is substantial provisioner related work to run workers with cloud profiles. This is out of the scope of this document.

    Lineage

    Dataset level and field level lineage are a key part of the value offering for CDAP. Today, dataset lineage is handled by CDAP in a mostly transparent way based on dataset usage. Field level lineage is handled explicitly by applications, which call APIs that are available at prepare time. Both of these are inadequate for CDC, since table creation, deletion, and schema changes all occur while the program is running. In order to support lineage, the CDAP API will need to be enhanced to support emitting lineage at runtime.

    Code Block
    public interface WorkerContext extends RuntimeContext, ..., LineageRecorder {
    
    
    }

    LineageRecorder is only added to WorkerContext to begin with, in case this turns out to be problematic. It can be added to other program contexts, or most generally to RuntimeContext, if it is found to be useful. The implementation will need to be careful not to overload the messaging system. It will likely aggregate all lineage events from the past X amount of time and publish them all at once, similar to how metrics are published. This means lineage events can potentially be lost on program death.

    Metrics

    Replicators emit several metrics that can be used to measure progress. Metric names are prefixed with the target name. This is to allow for adding support for multiple sources/targets in the future.

    • dml.insert - # of DML inserts applied
    • dml.update - # of DML updates applied
    • dml.delete - # of DML deletes applied
    • ddl - # of DDL operations applied
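
    As a hedged illustration, a target plugin might emit these metrics through the Metrics instance exposed by DeltaContext.getMetrics(), assuming the standard CDAP Metrics count() method. The "my-bq-dataset" prefix is the target name from the config example below; how the prefix is applied is not prescribed here.

    Code Block
    // Illustrative only: increment the appropriate metric for each applied DML event.
    private void emitMetrics(DeltaContext context, DMLEvent event) {
      Metrics metrics = context.getMetrics();
      switch (event.getOperation()) {
        case INSERT:
          metrics.count("my-bq-dataset.dml.insert", 1);
          break;
        case UPDATE:
          metrics.count("my-bq-dataset.dml.update", 1);
          break;
        case DELETE:
          metrics.count("my-bq-dataset.dml.delete", 1);
          break;
      }
    }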

    Validation

    Replicators need to be validated before they are created, highlighting errors early during the creation process instead of at runtime. A similar mechanism to the one used for pipeline validation will be used, with a system service that instantiates the plugins, calls their configure methods, and returns errors. 

    Why not Pipelines?

    It is desirable that users would be able to manage their replicators in a single place, as a separate experience from pipelines. Implementing replicators with another app provides a natural way to separate them.

    CDC is a much more targeted use case than a generic pipeline. It does not require all the flexibility of different aggregators, actions, etc. Implementing CDC via pipeline plugins would require users to set up their pipelines in a very specific way, with many ways to misconfigure them. CDC as a new application allows the app to control which plugins can be used in data copies.

    Error handling is difficult to do with Spark streaming because the code doesn't have full control over what runs. For example, streaming pipelines often have issues when sources read events more quickly than the rest of the pipeline can process. Data builds up and can get dropped depending on the Spark settings and whether Receivers or InputDStreams are being used. It is not clear how to retry an event until it succeeds.

    Pipelines are very schema forward, with connections between stages usually representing a constant schema. When the data flowing between stages can be of different schemas, the tool becomes much more difficult to understand.

    CDC has a pretty standard set of CDC-specific metrics and state, such as the rate of consumption and whether an initial snapshot is being executed.

    State is much more complicated to manage in a pipeline. For example, the source would need to be able to store offsets indicating which events have already been processed. It cannot do this until it knows the sink has successfully written that data, which means the source and sink need to be able to communicate. This means the source and sink are essentially coupled together, even though pipeline APIs try to discourage it. This becomes even more complicated if somebody decides to add multiple sinks for the same source.

    API changes

    New Programmatic APIs

    The application will need to support additional sources and targets in the future. The UI will also need to be able to list available sources and targets and provide information about how they need to be configured. CDAP's plugin framework has all of this built out already.

    Ideally, sometime in the future these plugin types will be consolidated with pipeline sources and sinks and wrangler connections.

    Plugin Context

    Plugins have access to a DeltaContext class that will provide integration points to the platform. The context will be used to store and read state, and to emit metrics.

    Code Block
    public interface DeltaContext {
    
      String getReplicatorName();
    
      String getNamespace();
    
      String getRunId();
    
      Metrics getMetrics();
    
      void putOffset(Offset offset);
    
      Offset getOffset();
    
      byte[] getState(String key);
    
      void putState(String key, byte[] val);
    }

    State is used to store additional information required by the plugins, such as a history of database table changes, used to keep track of table schemas at particular offsets. Storing and fetching offsets and state is not an efficient operation and is not expected to occur frequently. Underneath, state and offsets are written via the Hadoop FileSystem API, to whatever distributed filesystem CDAP is configured to use. Each replicator has its own directory to prevent conflicts with other replicators:

    No Format
    /cdap/replicators/<namespace>/<name>/<id>/offset
    /cdap/replicators/<namespace>/<name>/<id>/state-<key>
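
    For illustration only, a sketch of a source plugin using this state to persist a serialized history of table schema changes, so it can resolve schemas for older offsets after a restart. The class, method names, and state key are made up; only putState/getState come from the DeltaContext interface above.

    Code Block
    public class SchemaHistoryStore {
      private static final String SCHEMA_HISTORY_KEY = "schema-history";

      void saveSchemaHistory(DeltaContext context, byte[] serializedHistory) {
        context.putState(SCHEMA_HISTORY_KEY, serializedHistory);
      }

      byte[] loadSchemaHistory(DeltaContext context) {
        byte[] stored = context.getState(SCHEMA_HISTORY_KEY);
        return stored == null ? new byte[0] : stored;   // no state yet on the first run
      }
    }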

    DeltaSource
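
    The source is responsible for emitting events that the destination will persist. It is also responsible for attaching a transaction ID to each event, which allows the application to group together events for the same transaction before sending them to the destination.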

    Code Block
    public interface DeltaSource {
      String PLUGIN_TYPE = "cdcSource";
    
      /**
       * Configure the source. This is called when the application is deployed.
       *
       * @param configurer configurer used to set configuration settings
       */
      void configure(Configurer configurer);
    
      /**
       * Create an event reader used to read change events.
       * This is called when the program is started and any time the
       * source needs to reset to an earlier state due to errors in the replicator.
       *
       * @param context program context
       * @param eventEmitter emitter the reader uses to emit change events
       * @return an event reader used to read change events
       */
      EventReader createReader(DeltaContext context, EventEmitter eventEmitter);
    }
    
    
    public interface EventReader extends Runnable {
    
      /**
       * Initialize the reader. Guaranteed to be called before the run method. 
       */
      void initialize(Offset offset);
    
      /**
       * Stop reading events and close any resources in use. This should cause the run method to complete.
       */
      void close();
    
    }

    Example

    Code Block
    public class MySqlEventReader implements EventReader {
      private final String appName;
      private final MySqlConfig config;
      private final EventEmitter emitter;
      private final ExecutorService executorService;
      private EmbeddedEngine engine;

      public MySqlEventReader(String appName, MySqlConfig config, EventEmitter emitter) {
        this.appName = appName;
        this.config = config;
        this.emitter = emitter;
        this.executorService = Executors.newSingleThreadScheduledExecutor();
      }

      @Override
      public void initialize(Offset offset) {
        // the MySQL source defines its own Offset implementation holding the binlog file and position
        MySqlOffset mySqlOffset = (MySqlOffset) offset;

        // Define the configuration used by Debezium MySQL
        Configuration debeziumConf = Configuration.create()
          .with("file", mySqlOffset.getBinlogFile())
          .with("pos", String.valueOf(mySqlOffset.getPosition()))
          ...
          .build();
        MySqlConnectorConfig mysqlConf = new MySqlConnectorConfig(debeziumConf);

        engine = EmbeddedEngine.create()
          .using(debeziumConf)
          .notifying(sourceRecord -> {
            // translate the Debezium record offset back into the source-defined Offset
            Map<String, ?> sourceOffset = sourceRecord.sourceOffset();
            String binlogFile = (String) sourceOffset.get("file");
            long binlogPosition = (Long) sourceOffset.get("pos");
            Offset recordOffset = new MySqlOffset(binlogFile, binlogPosition);

            StructuredRecord val = Records.convert((Struct) sourceRecord.value());
            String ddl = val.get("ddl");
            if (ddl != null) {
              emitter.emit(getDDLEvent(ddl));
              return;
            }

            StructuredRecord row = val.get("after");
            emitter.emit(new DMLEvent(...));
          })
          .build();
      }

      @Override
      public void run() {
        engine.run();
      }

      @Override
      public void close() {
        if (engine != null && engine.stop()) {
          try {
            engine.await(1, TimeUnit.MINUTES);
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        }
      }
    }


    DeltaTarget
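
    The destination is responsible for writing data to the downstream storage system. It is assumed that the program might die at any time, so the destination needs to be able to handle duplicate data. In other words, the writes for a single transaction must be atomic and idempotent. The framework will ensure that the destination is given all the data for a transaction in a single call.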

    Code Block
    public interface DeltaTarget {
      String PLUGIN_TYPE = "cdcTarget";
    
      /**
       * Configure the target. This is called when the application is deployed.
       *
       * @param configurer configurer used to set configuration settings
       */
      void configure(Configurer configurer);
    
      /**
       * Create an event consumer that replicates change events to the target system.
       *
       * @param context target context that provides access to application information and offset persistence
       * @return an event consumer that applies change events to the target system
       * @throws Exception if the consumer could not be created, which will result in the program failure
       */
      EventConsumer createConsumer(DeltaTargetContext context) throws Exception;
    }
    
    
    public interface DeltaTargetContext extends DeltaContext {
    
      /**
       * Commit changes up to the given offset. Once an offset is successfully committed, events up to that offset are
       * considered complete and will never be read again.
       *
       * @param offset offset to commit
       */
      void commitOffset(Offset offset);
    }
    
    
    
    
    public interface EventConsumer {
    
      void start();
    
      void stop();
    
      /**
       * Apply a DDL event, such as creating a table. This method must be idempotent. For example, if the event is a table
       * creation and the table already exists, this method should not fail due to an attempt to create a table that
       * already exists.
       *
       * Idempotency is required because the event can be applied multiple times in failure scenarios.
       * During normal operation, an event will be applied exactly once.
       * In failure scenarios the event will be applied at least once.
       *
       * If this method throws an Exception, the replicator offset will be reset to the last saved offset, which may
       * result in replayed change events.
       *
       * @param event ddl event to apply
       * @throws Exception if there was an error applying the DDL event
       */
      void applyDDL(DDLEvent event) throws Exception;
    
      /**
       * Apply a DML event. This method must be idempotent. For example, if there is an insert and the row already exists,
       * this method should not fail due to an attempt to insert a row that already exists, and it should not write
       * duplicate data.
       *
       * Idempotency is required because events can be applied multiple times in failure scenarios.
       * During normal operation, each event will be applied exactly once.
       * In failure scenarios each event will be applied at least once.
       * 
       * If this method throws an Exception, the replicator offset will be reset to the last saved offset, which may
       * result in replayed change events.
       *
       * @param event DML event to apply
       * @throws Exception if there was an error applying the DML event
       */
      void applyDML(DMLEvent event) throws Exception;
    
    }

    Example

    Code Block
    public class BigQueryEventConsumer implements EventConsumer {
      private final DeltaTargetContext context;
      private final BigQuery bigQuery;
      private ScheduledExecutorService executorService;
      private ScheduledFuture<?> scheduledFlush;
      private Offset latestOffset;
      private List<DMLEvent> batch;
    
      public BigQueryEventConsumer(DeltaTargetContext context, BigQuery bigQuery) {
        this.context = context;
        this.bigQuery = bigQuery;
        this.batch = new ArrayList<>();
        this.executorService = Executors.newSingleThreadScheduledExecutor();
      }
    
      @Override
      public void start() {
        scheduledFlush = executorService.scheduleAtFixedRate(() -> {
          try {
            flush();
          } catch (InterruptedException e) {
            // just return and let things end
          }
        }, 60, 60, TimeUnit.SECONDS);
      }
    
      @Override
      public void stop() {
        scheduledFlush.cancel(true);
        executorService.shutdownNow();
        try {
          executorService.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
          // just return and let everything end
        }
      }
    
      @Override
      public void applyDDL(DDLEvent event) {
        switch (event.getOperation()) {
          case CREATE_DATABASE:
            DatasetId datasetId = DatasetId.of(project, event.getDatabase());
            if (bigQuery.getDataset(datasetId) == null) {
              DatasetInfo datasetInfo = DatasetInfo.newBuilder(datasetId).setLocation(bucket.getLocation()).build();
              bigQuery.create(datasetInfo);
            }
            break;
          case DROP_DATABASE:
            datasetId = DatasetId.of(project, event.getDatabase());
            if (bigQuery.getDataset(datasetId) != null) {
              bigQuery.delete(datasetId);
            }
            break;
          case CREATE_TABLE:
            ...
          case DROP_TABLE:
            ...
          case ALTER_TABLE:
            ...
          case RENAME_TABLE:
            ...
          case TRUNCATE_TABLE:
            ...
        }
      }
    
      @Override
      public void applyDML(DMLEvent event) {
        // events are buffered in memory and periodically flushed to GCS, BQ staging, and the target table
        batch.add(event);
        latestOffset = event.getOffset();
      }
    
      private synchronized void flush() throws InterruptedException {
        // write batch to GCS
        ...
        
        // load data from GCS into staging BQ table
        ...
    
        // run merge query from staging table to target table
        ...
    
        context.commitOffset(latestOffset);
      }
    
    }
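
    The merge step in flush() is elided above. As a rough illustration only (the staging table layout, the "_op" column holding the DML operation type, and all names below are hypothetical and not defined by this design), the staging-to-target merge query could be built along these lines:

    Code Block
    import java.util.List;
    import java.util.stream.Collectors;

    public class MergeQueryBuilder {

      /**
       * Builds a BigQuery MERGE statement from a staging table into the target table.
       * Assumes the staging table has the same columns as the target plus a hypothetical
       * '_op' column indicating whether the row was an INSERT, UPDATE, or DELETE.
       */
      public static String buildMergeQuery(String dataset, String stagingTable, String targetTable,
                                           List<String> primaryKey, List<String> columns) {
        String onClause = primaryKey.stream()
          .map(k -> String.format("T.%s = S.%s", k, k))
          .collect(Collectors.joining(" AND "));
        String setClause = columns.stream()
          .map(c -> String.format("%s = S.%s", c, c))
          .collect(Collectors.joining(", "));
        String columnList = String.join(", ", columns);
        String valueList = columns.stream()
          .map(c -> "S." + c)
          .collect(Collectors.joining(", "));
        return String.format(
          "MERGE `%s.%s` T USING `%s.%s` S ON %s"
            + " WHEN MATCHED AND S._op = 'DELETE' THEN DELETE"
            + " WHEN MATCHED AND S._op != 'DELETE' THEN UPDATE SET %s"
            + " WHEN NOT MATCHED AND S._op != 'DELETE' THEN INSERT (%s) VALUES (%s)",
          dataset, targetTable, dataset, stagingTable, onClause, setClause, columnList, valueList);
      }
    }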

    System Service

    In order to help users during the pipeline creation process, a new Delta system service will be added, similar to the Wrangler and Pipeline system services. The Delta service is responsible for storing pipeline drafts, listing database tables, describing tables, and performing assessments on pipeline drafts.

    REST APIs

    Path | Method | Description | Request Body | Response
    /v1/contexts/<namespace>/drafts | GET | list all drafts within a namespace

    List of draft objects, where a draft contains the full pipeline config

    Code Block
    [
      {
        "name": "xyz",
        "created": timestamp
        "config": {
          "stages": [
            {
              "name": "source",
              "plugin": {
                "name": "oracle",
                "type": "cdcSource",
                "properties": {
                  "host": "[hostname]",
                  "port": "1433",
                  "user": "repluser",
                  "password": "${secure(kms-key)}"
                }
              }
            },
            ...
          ]
        }
      }
    ]
    /v1/contexts/<namespace>/drafts/<draft-id> | PUT | add a new draft if it doesn't exist, or overwrite an existing one if it does

    pipeline config

    Code Block
    {
      "config": {
        "stages": [ ... ]
      }
    }

    /v1/contexts/<namespace>/drafts/<draft-id> | DELETE | delete an existing draft

    /v1/contexts/<namespace>/drafts/<draft-id> | GET | get detail about a draft

    draft config object

    Code Block
    {
      "config": {
        "stages": [ ... ]
      }
    }
    /v1/contexts/<namespace>/drafts/<draft-id>/listTables | POST | list tables based on the source configuration in the draft
    Code Block
    {
      tables: [
        {
          database: "database1",
          name: "table1",
          numColumns: 50
        },
        {
          name: "table2",
          numColumns: 7
        }
      ]
    }
    /v1/contexts/<namespace>/drafts/<draft-id>/describeTable | POST | describe a specific table, based on source configuration in the draft
    Code Block
    {
      "database": "db1",
      "table": "tayble"
    }
    Code Block
    {
      primaryKey: [
        "col1", "col2", "col3"
      ],
      columns: [
        {
          name: "column1",
          type: "VARCHAR",
          nullable: true
        },
        {
          name: "column2",
          type: "INTEGER",
          nullable: false
        }
      ]
    }
    /v1/contexts/{namespace}/drafts/{id}/assessPipeline | POST | assess the pipeline draft, based on source and target configurations
    Code Block
    {
      tables: [
        {
          database: "database1",
          name: "table1",
          numColumns: 50,
          columnsNotSupported: 2,
          columnsPartiallySupported: 2
        },
        ...
      ],
      featureProblems: [
        {
           "name": "",
           "description": "",
           "suggestion": "",
           "impact": ""
        }
      ],
      connectivityProblems: [
        {
           "name": "",
           "description": "",
           "suggestion": "",
           "impact": ""
        }
      ]
    }
    /v1/contexts/{namespace}/drafts/{id}/assessTable | POST | assess a specific table, based on source and target configurations in the draft
    Code Block
    {
      "database": "db1",
      "table": "tayble"
    }
    Code Block
    {
      columns: [
        {
          sourceName: "id",
          targetName: "",
          sourceType: "",
          targetType: "",
          support: YES | NO | PARTIAL,
          transforms: [],
          suggestion: {
            details: {
              message: ...
            } 
            transforms: [
            ] 
          }
        },
        ...
      ],
      featureProblems: [
        {
           "name": "",
           "description": "",
           "suggestion": "",
           "impact": ""
        }
      ],
      connectivityProblems: [
        {
           "name": "",
           "description": "",
           "suggestion": "",
           "impact": ""
        }
      ]
    }
    /v1/contexts/{namespace}/getState | POST | get state information for each table and for the source
    Code Block
    {
      "name": [replicator name],
      "offsetBasePath": [offset base path for the replicator]
    }
    Code Block
    {
      sourceState: OK | FAILING,
      sourceError: {
        message: some string,
        stackTrace: [
          { },
          ...
        ]
      },
      tables: [
        {
          database: 
          table:
          state: SNAPSHOTTING | REPLICATING | FAILING
          error: {
            message:
            stackTrace: [ ... ]
          }
        }
      ]
    }
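
    As a small usage sketch (the host, port, and routing prefix below are placeholders that depend on how the service is exposed through the CDAP router; only the path comes from the table above), a draft's tables could be listed like this:

    Code Block
    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    public class ListTablesExample {
      public static void main(String[] args) throws Exception {
        // placeholder base URL; in practice the request is routed to the Delta system service
        String baseUrl = "http://localhost:11015";
        HttpRequest request = HttpRequest.newBuilder()
          .uri(URI.create(baseUrl + "/v1/contexts/default/drafts/my-draft/listTables"))
          .POST(HttpRequest.BodyPublishers.noBody())
          .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
          .send(request, HttpResponse.BodyHandlers.ofString());
        // the response body is the JSON table listing shown in the table above
        System.out.println(response.statusCode() + " " + response.body());
      }
    }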


    Data Model

    Drafts will be stored in a system table used by the system service, which means that hostnames, ports, and other connection properties will be stored in that table. The table has the following schema:

    namespace | generation | name | created | updated | config
    CDAP namespace that the replicator lives in | generation of the CDAP namespace | replicator name | created timestamp | updated timestamp | config object as a JSON string


    The state returned by the /contexts/{namespace}/pipelines/{id}/state endpoint will be written to the configured default CDAP distributed filesystem, since both the worker program and the system service need access to it. Offsets, sequence numbers, and state are all stored under a base directory:


    /cdap/delta/[namespace]/[replicator name]/[replicator generation]/offset.[commit timestamp]

    /cdap/delta/[namespace]/[replicator name]/[replicator generation]/state.json


    Older offsets are kept around for some time so that users can rewind a replicator to a previous offset. Each offset file contains the sequence number plus any other information set by the source plugin, and has the following format:

    Code Block
    [sequence number (long)]
    [# of entries in offset (int)]
    [key1 length (int)]
    [key1 bytes]
    [val1 length (int)]
    [val1 bytes]
    ...
    [keyN length (int)]
    [keyN bytes]
    [valN length (int)]
    [valN bytes]
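
    To make the layout concrete, below is a minimal sketch (the class and method names are illustrative, not part of the design) of encoding and decoding an offset in this format with DataOutputStream and DataInputStream:

    Code Block
    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.LinkedHashMap;
    import java.util.Map;

    public class OffsetCodec {

      /** Writes the sequence number followed by length-prefixed key/value pairs. */
      public static byte[] encode(long sequenceNumber, Map<String, byte[]> offset) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
          out.writeLong(sequenceNumber);              // [sequence number (long)]
          out.writeInt(offset.size());                // [# of entries in offset (int)]
          for (Map.Entry<String, byte[]> entry : offset.entrySet()) {
            byte[] key = entry.getKey().getBytes(StandardCharsets.UTF_8);
            out.writeInt(key.length);                 // [keyN length (int)]
            out.write(key);                           // [keyN bytes]
            out.writeInt(entry.getValue().length);    // [valN length (int)]
            out.write(entry.getValue());              // [valN bytes]
          }
        }
        return bytes.toByteArray();
      }

      /** Reads the entries back; the sequence number is returned through seqNumOut[0]. */
      public static Map<String, byte[]> decode(byte[] raw, long[] seqNumOut) throws IOException {
        Map<String, byte[]> offset = new LinkedHashMap<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(raw))) {
          seqNumOut[0] = in.readLong();
          int numEntries = in.readInt();
          for (int i = 0; i < numEntries; i++) {
            byte[] key = new byte[in.readInt()];
            in.readFully(key);
            byte[] val = new byte[in.readInt()];
            in.readFully(val);
            offset.put(new String(key, StandardCharsets.UTF_8), val);
          }
        }
        return offset;
      }
    }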



    State contains the information returned by the GET /v1/contexts/{namespace}/pipelines/{id}/state endpoint. For example:

    Code Block
    {
      sourceState: OK | ERROR,
      sourceError: {
        message: some string,
        stackTrace: [
          { },
          ...
        ]
      },
      tables: [
        {
          database: 
          table:
          state: SNAPSHOT | REPLICATE | ERROR
          error: {
            message:
            stackTrace: [ ... ]
          }
        }
      ]
    }
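
    Since the system service reads state.json while the worker updates it, the worker should never leave a partially written file visible. One way to do this, sketched below under the assumption of HDFS-style filesystem semantics (the class is illustrative, not part of the design), is to write to a temporary file and rename it into place:

    Code Block
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    public class StateWriter {
      private final FileSystem fs;
      private final Path basePath; // e.g. /cdap/delta/[namespace]/[replicator name]/[generation]

      public StateWriter(Configuration conf, Path basePath) throws IOException {
        this.fs = basePath.getFileSystem(conf);
        this.basePath = basePath;
      }

      public void writeState(String stateJson) throws IOException {
        Path tmp = new Path(basePath, "state.json.tmp");
        Path dest = new Path(basePath, "state.json");
        try (FSDataOutputStream out = fs.create(tmp, true)) {
          out.write(stateJson.getBytes(StandardCharsets.UTF_8));
        }
        // on HDFS, rename is atomic, so readers never see a half-written state.json
        // (other filesystems may provide weaker guarantees)
        fs.delete(dest, false);
        fs.rename(tmp, dest);
      }
    }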


    Deprecated Programmatic APIs

    N/A

    New REST APIs

    Path | Method | Description | Response Code | Response
    /v3/apps/<app-id> | GET | Returns the application spec for a given application

    200 - On success

    404 - When application is not available

    500 - Any internal errors







    Deprecated REST API

    N/A

    CLI Impact or Changes

    N/A

    UI Impact or Changes

    • Impact #1
    • Impact #2
    • Impact #3

    Security Impact 

    What is the impact on authorization, and how does the design address it?

    Impact on Infrastructure Outages 

    System behavior (if applicable, document the impact of downstream component failures, e.g. YARN or HBase) and how the design handles these aspects.

    Test Scenarios

    Test ID | Test Description | Expected Results












    Releases

    Release X.Y.Z

    Release X.Y.Z

    Related Work

    • Work #1
    • Work #2
    • Work #3


    Future work