Checklist

  •  User Stories Documented
  •  User Stories Reviewed
  •  Design Reviewed
  •  APIs reviewed
  •  Release priorities assigned
  •  Test cases reviewed
  •  Blog post

Introduction 

The capacity of Enterprise Data Warehouses (EDWs) is being exhausted by the tremendous growth in generated data. Traditional ETL processes can be used to offload infrequently used data to a Hadoop cluster. These processes run periodically (weekly or daily) and transfer data in bulk from the source to the destination. Because they run periodically, there is a delay before the data becomes available in the Hadoop cluster. Because they transfer data in bulk, they also put a heavy load on the source production systems.

Change Data Capture (CDC) can be used instead of traditional ETL for EDW offloads. CDC identifies, captures, and delivers only the changes made to the source data systems. By processing only changes, CDC extracts data from the source systems efficiently without putting much load on them. Because the changes are streamed continuously, the latency between a change occurring in the source system and the corresponding change becoming available in the target system is also greatly reduced.

Goals

  1. Ability to keep CDAP datasets in sync with the source relational tables. Changes to the data and schema of a source table configured for CDC should be applied to the CDAP datasets (HBase, Kudu, Hive, etc.).
  2. Document the setup for CDC. 

User Stories 

  • Breakdown of User-Stories 
  • User Story #1: Joe is an admin of an Enterprise Data Warehouse. He wants to offload the data to a CDAP dataset where he can perform analytics without affecting the production databases.
  • User Story #2
  • User Story #3

Design

The design needs to cover the following aspects.

  1. Configuration required to set up and integrate Oracle Golden Gate (OGG) CDC with the source database. OGG for Big Data can be set up to stream the captured change data to HDFS, HBase, Flume, and Kafka.
  2. Performing the initial load when Golden Gate is configured for existing tables.
  3. Hydrator plugins required.
  4. Propagating schema changes from the source table to the destination CDAP dataset. (How do we keep the Hive schema in sync?)

Approach

Extracting Change Data from Source Database

  1. OGG will be configured to stream the change data to a Kafka topic, for example "oggdata".
  2. Both schema changes and data changes will be sent to the same topic.
  3. The "oggdata" topic will be created with a single partition. This ensures that changes from OGG are received in order, since Kafka does not guarantee ordering across multiple partitions.
  4. DML changes (the data changes) will be streamed in the Wrapped Avro binary format, using the following wrapper schema:

    Code Block
    {
      "type" : "record",
      "name" : "generic_wrapper",
      "namespace" : "oracle.goldengate",
      "fields" : [ {
        "name" : "table_name",
        "type" : "string"
      }, {
        "name" : "schema_hash",
        "type" : "int"
      }, {
        "name" : "payload",
        "type" : "bytes"
      } ]
    }
    • table_name: the name of the source database table to which the current change record belongs.

    • schema_hash: the fingerprint of the Avro schema with which the message was generated.

    • payload: the wrapped Avro message.

  5. Schemas will be propagated in JSON-encoded Avro format. The following is a sample schema:

    Code Block
    {
      "type" : "record",
      "name" : "CUSTORD",
      "namespace" : "GG",
      "fields" : [ {
        "name" : "table",
        "type" : "string"
      }, {
        "name" : "op_type",
        "type" : "string"
      }, {
        "name" : "op_ts",
        "type" : "string"
      }, {
        "name" : "current_ts",
        "type" : "string"
      }, {
        "name" : "pos",
        "type" : "string"
      }, {
        "name" : "primary_keys",
        "type" : {
          "type" : "array",
          "items" : "string"
        }
      }, {
        "name" : "tokens",
        "type" : {
          "type" : "map",
          "values" : "string"
        },
        "default" : { }
      }, {
        "name" : "before",
        "type" : [ "null", {
          "type" : "record",
          "name" : "columns",
          "fields" : [ {
            "name" : "CUST_CODE",
            "type" : [ "null", "string" ],
            "default" : null
          }, {
            "name" : "CUST_CODE_isMissing",
            "type" : "boolean"
          }, {
            "name" : "ORDER_DATE",
            "type" : [ "null", "string" ],
            "default" : null
          }, {
            "name" : "ORDER_DATE_isMissing",
            "type" : "boolean"
          }, {
            "name" : "PRODUCT_CODE",
            "type" : [ "null", "string" ],
            "default" : null
          }, {
            "name" : "PRODUCT_CODE_isMissing",
            "type" : "boolean"
          }, {
            "name" : "ORDER_ID",
            "type" : [ "null", "string" ],
            "default" : null
          }, {
            "name" : "ORDER_ID_isMissing",
            "type" : "boolean"
          }, {
            "name" : "PRODUCT_PRICE",
            "type" : [ "null", "double" ],
            "default" : null
          }, {
            "name" : "PRODUCT_PRICE_isMissing",
            "type" : "boolean"
          }, {
            "name" : "PRODUCT_AMOUNT",
            "type" : [ "null", "double" ],
            "default" : null
          }, {
            "name" : "PRODUCT_AMOUNT_isMissing",
            "type" : "boolean"
          }, {
            "name" : "TRANSACTION_ID",
            "type" : [ "null", "string" ],
            "default" : null
          }, {
            "name" : "TRANSACTION_ID_isMissing",
            "type" : "boolean"
          } ]
        } ],
        "default" : null
      }, {
        "name" : "after",
        "type" : [ "null", "columns" ],
        "default" : null
      } ]
    }
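The wrapper format from item 4 above can be illustrated with a minimal sketch of the Avro binary encoding of the generic_wrapper record. Avro writes a record as its fields in declaration order: a string or bytes value is a zigzag varint length followed by the raw bytes, and an int is a zigzag varint. The function names (encode_wrapper, decode_wrapper) are hypothetical and exist only for illustration. A real pipeline would more likely use an Avro library than hand-written decoding.

```python
import io


def write_long(out, value):
    """Write an Avro int/long: zigzag-encode, then emit 7 bits per byte."""
    encoded = (value << 1) ^ (value >> 63)  # zigzag; valid for 64-bit signed values
    while encoded & ~0x7F:
        out.write(bytes([(encoded & 0x7F) | 0x80]))  # high bit set: more bytes follow
        encoded >>= 7
    out.write(bytes([encoded]))


def read_long(buf):
    """Read a zigzag varint as written by write_long."""
    shift = 0
    encoded = 0
    while True:
        chunk = buf.read(1)
        if not chunk:
            raise EOFError("truncated Avro varint")
        encoded |= (chunk[0] & 0x7F) << shift
        if not chunk[0] & 0x80:
            break
        shift += 7
    return (encoded >> 1) ^ -(encoded & 1)


def write_bytes(out, data):
    """Avro bytes/string body: length prefix followed by the raw bytes."""
    write_long(out, len(data))
    out.write(data)


def read_bytes(buf):
    length = read_long(buf)
    if length < 0:
        raise ValueError("negative length in Avro bytes field")
    data = buf.read(length)
    if len(data) != length:
        raise EOFError("truncated Avro bytes field")
    return data


def encode_wrapper(table_name, schema_hash, payload):
    """Serialize the three generic_wrapper fields in schema order."""
    out = io.BytesIO()
    write_bytes(out, table_name.encode("utf-8"))  # table_name: string
    write_long(out, schema_hash)                  # schema_hash: int
    write_bytes(out, payload)                     # payload: bytes
    return out.getvalue()


def decode_wrapper(message):
    """Split a wrapped message into its envelope fields; payload stays opaque."""
    buf = io.BytesIO(message)
    table_name = read_bytes(buf).decode("utf-8")
    schema_hash = read_long(buf)
    payload = read_bytes(buf)
    return {"table_name": table_name, "schema_hash": schema_hash, "payload": payload}
```

The payload is returned as raw bytes because it can only be decoded once the schema identified by table_name and schema_hash is known.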

Hydrator Plugins

  1. A CDAP streaming pipeline will be responsible for reading the change data from Kafka and creating the CDAP datasets (Kudu, HBase, Hive, etc.). The pipeline will consist of the following stages, in order: Golden Gate Source, CDC Normalizer, CDC Sink.

  2. Golden Gate Source is a simple realtime Kafka streaming source. It will forward messages to the CDC Normalizer as byte arrays.
  3. CDC Normalizer is responsible for converting the byte arrays into StructuredRecords. A received message can contain either an Avro schema (DDL operation) or a wrapped Avro message (DML operation). The output schema of the CDC Normalizer will be a union of String (representing the schema) and StructuredRecord (representing the wrapped Avro message).
  4. CDC Sink will accept, as configuration, the name of the table to be used as the schema registry. The schema registry table can be shared between multiple pipelines.
    The row key for the schema registry table will be <source_table_name_with_namespace>:<schema_fingerprint>, and the value will be the JSON-formatted Avro schema. The schema fingerprint will be computed using the SchemaNormalization.fingerprint64 method.
  5. CDC Sink:
    • Receives either a JSON-formatted Avro schema or a wrapped Avro message containing the change data.
    • If the received message is an Avro schema, inserts it into the schema registry table.
    • If the received message is a wrapped Avro message, its schema will be:

      Code Block
      {
        "type" : "record",
        "name" : "generic_wrapper",
        "namespace" : "oracle.goldengate",
        "fields" : [ {
          "name" : "table_name",
          "type" : "string"
        }, {
          "name" : "schema_hash",
          "type" : "int"
        }, {
          "name" : "payload",
          "type" : "bytes"
        } ]
      }

      Use <namespace>:<table_name>:<schema_hash> as the row key to get the associated Avro schema from the schema registry, and use that schema to deserialize the payload.

    • The deserialized Avro message will contain metadata about the operation, such as the operation type (op_type: INSERT, UPDATE, DELETE) and the operation timestamp (op_ts), along with the actual change (columns and their values), which can be used to perform the desired operation on the CDAP dataset.
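The CDC Sink behavior described above can be sketched as follows. This is a minimal in-memory illustration, not the plugin implementation. The class and function names are hypothetical, and Python dicts stand in for the schema registry table and the CDAP dataset. Change records are assumed to be already deserialized into dicts shaped like the sample CUSTORD schema, with op_type carrying the values INSERT, UPDATE, or DELETE. The sketch also assumes that a column whose *_isMissing flag is true was not captured in the record and should be left untouched.

```python
import json


def row_key(namespace, table_name, schema_hash):
    """Schema registry row key in the <namespace>:<table_name>:<schema_hash> form."""
    return f"{namespace}:{table_name}:{schema_hash}"


def present_columns(image):
    """Drop the *_isMissing flags and any column flagged as missing (assumed semantics)."""
    if image is None:
        return {}
    return {
        name: value
        for name, value in image.items()
        if not name.endswith("_isMissing") and not image.get(name + "_isMissing", False)
    }


class CdcSinkSketch:
    """Hypothetical stand-in for the CDC Sink; all state lives in plain dicts."""

    def __init__(self):
        self.schema_registry = {}  # row key -> JSON-formatted Avro schema
        self.tables = {}           # table name -> {primary key tuple -> row dict}

    def on_schema(self, schema_json, schema_hash):
        """Store a received schema under its registry row key and return the key."""
        schema = json.loads(schema_json)
        key = row_key(schema["namespace"], schema["name"], schema_hash)
        self.schema_registry[key] = schema_json
        return key

    def on_change(self, record):
        """Apply one deserialized change record to the in-memory table."""
        rows = self.tables.setdefault(record["table"], {})
        pk = record["primary_keys"]
        before = present_columns(record.get("before"))
        after = present_columns(record.get("after"))

        def key_of(image):
            return tuple(image.get(column) for column in pk)

        op = record["op_type"]
        if op == "INSERT":
            rows[key_of(after)] = after
        elif op == "UPDATE":
            # Locate the existing row by the before image when it carries the full key.
            source = before if all(column in before for column in pk) else after
            merged = {**rows.pop(key_of(source), {}), **after}
            rows[key_of(merged)] = merged
        elif op == "DELETE":
            rows.pop(key_of(before), None)
        else:
            raise ValueError(f"unsupported op_type: {op}")
```

Keying rows by the primary key columns listed in each record lets an UPDATE that changes a key column move the row instead of leaving a stale copy behind.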

Updating the CDAP dataset schema (TBD)

Open Questions

  1. How to perform the initial load when Golden Gate is configured for an existing table.
    https://docs.oracle.com/goldengate/1212/gg-winux/GWUAD/wu_initsync.htm#GWUAD557
  2. How to handle errors in propagating changes from Golden Gate to Kafka.
  3. How does truncating the source table work? This needs experimentation.
  4. How to configure Golden Gate to read from different types of databases, such as MySQL and PostgreSQL.

API changes

New Programmatic APIs

New Java APIs introduced (both user facing and internal)

Deprecated Programmatic APIs

New REST APIs

Path | Method | Description | Response Code | Response
/v3/apps/<app-id> | GET | Returns the application spec for a given application | 200 - On success; 404 - When application is not available; 500 - Any internal errors |

Deprecated REST API

Path | Method | Description
/v3/apps/<app-id> | GET | Returns the application spec for a given application

CLI Impact or Changes

  • Impact #1
  • Impact #2
  • Impact #3

UI Impact or Changes

  • Impact #1
  • Impact #2
  • Impact #3

Security Impact 

What is the impact on authorization, and how does the design address it?

Impact on Infrastructure Outages 

System behavior (if applicable, document the impact of failures in downstream components such as YARN or HBase) and how the design addresses these aspects.

Test Scenarios

Test ID | Test Description | Expected Results

Releases

Release X.Y.Z

Release X.Y.Z

Related Work

  • Work #1
  • Work #2
  • Work #3

 

Future work