
User Stories 

  • As a data pipeline user, I would like to be able to track data changes in Salesforce and load them into a sink.

Requirements

  • Login URL should be configurable
  • List of objects should be configurable

General Overview of Salesforce CDC API

According to the Salesforce documentation, a client can subscribe to change events for all objects, for a single standard object, or for a single custom object. The channels are:

  • All Change Events

/data/ChangeEvents

  • A Standard Object

/data/<Standard_Object_Name>ChangeEvent

  • For example, the channel to subscribe to change events for Account records is:

/data/AccountChangeEvent

  • A Custom Object

/data/<Custom_Object_Name>__ChangeEvent

  • For example, the channel to subscribe to change events for Employee__c custom object records (the __c suffix is dropped from the name) is:

/data/Employee__ChangeEvent

Whenever a record of a subscribed object changes, Salesforce publishes a change event notification on the corresponding channel.
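The channel naming rules above could be applied to the comma-separated objects property roughly as follows. This is a minimal sketch, not the plugin's code: the class and method names (ChangeEventChannels, channelFor, channelsFor) are hypothetical, and it assumes that any API name ending in __c is a custom object.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: maps object API names to Salesforce CDC channel names. */
final class ChangeEventChannels {
    static final String ALL_CHANGE_EVENTS = "/data/ChangeEvents";

    private ChangeEventChannels() {}

    /** Maps one object API name (e.g. "Account" or "Employee__c") to its channel. */
    static String channelFor(String objectApiName) {
        String name = objectApiName.trim();
        if (name.endsWith("__c")) {
            // Custom object: drop the "__c" suffix, then append "__ChangeEvent".
            return "/data/" + name.substring(0, name.length() - 3) + "__ChangeEvent";
        }
        // Standard object: append "ChangeEvent" directly to the name.
        return "/data/" + name + "ChangeEvent";
    }

    /** Returns the channels to subscribe to; a missing or empty list means all change events. */
    static List<String> channelsFor(String objectsProperty) {
        List<String> channels = new ArrayList<>();
        if (objectsProperty != null) {
            for (String part : objectsProperty.split(",")) {
                if (!part.isBlank()) {
                    channels.add(channelFor(part));
                }
            }
        }
        if (channels.isEmpty()) {
            channels.add(ALL_CHANGE_EVENTS);
        }
        return channels;
    }
}
```

For example, channelsFor("Task,Employee__c") yields /data/TaskChangeEvent and /data/Employee__ChangeEvent, while an empty property falls back to /data/ChangeEvents.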


Design

Properties:

  • clientId: Client ID from the connected app

  • clientSecret: Client Secret from the connected app

  • username: Username

  • password: Password

  • loginUrl: OAuth token endpoint used to log in (default: https://login.salesforce.com/services/oauth2/token). Salesforce sandbox orgs use a different login URL (https://test.salesforce.com/services/oauth2/token), which is why this option is configurable.

  • handleErrors: strategy for handling records that cannot be transformed. Possible values are "Skip on error" (the erroneous record is skipped) and "Fail on error" (the pipeline fails if at least one erroneous record is found).

  • objects: comma-separated list of object API names (for example, Task for a standard object and Employee__c for a custom object). If the list is empty, the plugin subscribes to all change events.
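The two handleErrors strategies could be implemented along these lines. This is a hedged sketch: ErrorHandling, RecordTransformer and transformAll are hypothetical names, and the transformation step is passed in as a plain java.util.function.Function rather than any specific pipeline API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical enum for the handleErrors property values. */
enum ErrorHandling {
    SKIP_ON_ERROR("Skip on error"),
    FAIL_ON_ERROR("Fail on error");

    private final String label;

    ErrorHandling(String label) {
        this.label = label;
    }

    /** Parses the label used in the plugin configuration JSON. */
    static ErrorHandling fromLabel(String label) {
        for (ErrorHandling handling : values()) {
            if (handling.label.equals(label)) {
                return handling;
            }
        }
        throw new IllegalArgumentException("Unsupported handleErrors value: " + label);
    }
}

final class RecordTransformer {
    private RecordTransformer() {}

    /** Transforms every input, skipping or failing on bad records depending on the strategy. */
    static <I, O> List<O> transformAll(List<I> inputs, Function<I, O> transform, ErrorHandling handling) {
        List<O> outputs = new ArrayList<>();
        for (I input : inputs) {
            try {
                outputs.add(transform.apply(input));
            } catch (RuntimeException e) {
                if (handling == ErrorHandling.FAIL_ON_ERROR) {
                    // One erroneous record is enough to fail the pipeline.
                    throw new IllegalStateException("Failed to transform record: " + input, e);
                }
                // SKIP_ON_ERROR: drop the erroneous record and continue with the next one.
            }
        }
        return outputs;
    }
}
```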

Example JSON

{
  "name": "CDCSalesforce",
  "plugin": {
    "name": "CDCSalesforce",
    "type": "realtimesource",
    "label": "CDCSalesforce",
    "properties": {
      "clientId": "XXXXXXX",
      "clientSecret": "XXXXXXXXX",
      "username": "XXXXX",
      "password": "XXXXX",
      "loginUrl": "https://login.salesforce.com/services/oauth2/token",
      "handleErrors": "Skip on error",
      "objects": "XXXXX,XXXXX"
    }
  }
}
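The clientId, clientSecret, username, password and loginUrl properties together correspond to the Salesforce OAuth 2.0 username-password flow: a form-encoded POST to the token endpoint with grant_type=password. The sketch below only builds that request with the JDK's java.net.http API (Java 11+) and does not send it. SalesforceLogin, formBody and tokenRequest are hypothetical names, not part of the plugin.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical helper that builds the OAuth token request from the plugin properties. */
final class SalesforceLogin {
    private SalesforceLogin() {}

    /** URL-encodes the username-password flow parameters as a form body. */
    static String formBody(String clientId, String clientSecret, String username, String password) {
        Map<String, String> params = new LinkedHashMap<>();
        params.put("grant_type", "password");
        params.put("client_id", clientId);
        params.put("client_secret", clientSecret);
        params.put("username", username);
        params.put("password", password);
        return params.entrySet().stream()
            .map(e -> URLEncoder.encode(e.getKey(), StandardCharsets.UTF_8) + "="
                + URLEncoder.encode(e.getValue(), StandardCharsets.UTF_8))
            .collect(Collectors.joining("&"));
    }

    /** Builds (but does not send) the POST request to the configured loginUrl. */
    static HttpRequest tokenRequest(String loginUrl, String clientId, String clientSecret,
                                    String username, String password) {
        return HttpRequest.newBuilder(URI.create(loginUrl))
            .header("Content-Type", "application/x-www-form-urlencoded")
            .POST(HttpRequest.BodyPublishers.ofString(
                formBody(clientId, clientSecret, username, password)))
            .build();
    }
}
```

Sending the request with java.net.http.HttpClient returns a JSON response whose access_token and instance_url would then be used for the CometD handshake and for later REST calls.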


DML

The plugin subscribes to the channels for the configured objects, so it receives an event whenever a record of one of those objects changes in Salesforce.

Event payload example

{
  "data": {
    "schema": "IeRuaY6cbI_HsV8Rv1Mc5g",
    "payload": {
      "ChangeEventHeader": {
        "entityName": "Account",
        "recordIds": [
          "<record_ID>"
        ],
        "changeType": "CREATE",
        "changeOrigin": "com.salesforce.core",
        "transactionKey": "001b7375-0086-250e-e6ca-b99bc3a8b69f",
        "sequenceNumber": 1,
        "isTransactionEnd": true,
        "commitTimestamp": 1501010206653,
        "commitNumber": 92847272780,
        "commitUser": "<User_ID>"
      },
      "Name": "Acme",
      "Description": "Worldwide leader in gadgets of the future.",
      "OwnerId": "<Owner_ID>",
      "CreatedDate": "2018-03-11T19:16:44Z",
      "CreatedById": "<User_ID>",
      "LastModifiedDate": "2018-03-11T19:16:44Z",
      "LastModifiedById": "<User_ID>"
    },
    "event": {
      "replayId": 6
    }
  },
  "channel": "/data/ChangeEvents"
}

A change event message carries only the fields that were changed, so the plugin requests the full record from Salesforce using the record ID(s) listed in the recordIds header field.
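Pulling the fields the plugin needs out of the data object shown above could look like the sketch below. It assumes the event data is available as a nested Map<String, Object> (CometD's Message.getDataAsMap() returns data in that shape); ChangeEventInfo and ChangeEvents are hypothetical names. Because JSON arrays may be decoded as either a List or an Object[] depending on the JSON library, both are accepted.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical holder for the parts of a change event the plugin needs. */
final class ChangeEventInfo {
    final String entityName;
    final String changeType;
    final List<String> recordIds;
    final Set<String> eventFields; // record fields carried by the event itself
    final String schemaId;
    final long replayId;

    ChangeEventInfo(String entityName, String changeType, List<String> recordIds,
                    Set<String> eventFields, String schemaId, long replayId) {
        this.entityName = entityName;
        this.changeType = changeType;
        this.recordIds = recordIds;
        this.eventFields = eventFields;
        this.schemaId = schemaId;
        this.replayId = replayId;
    }
}

final class ChangeEvents {
    private ChangeEvents() {}

    /** Parses the "data" object of a change event message. */
    @SuppressWarnings("unchecked")
    static ChangeEventInfo parse(Map<String, Object> data) {
        Map<String, Object> payload = (Map<String, Object>) data.get("payload");
        Map<String, Object> header = (Map<String, Object>) payload.get("ChangeEventHeader");
        Map<String, Object> event = (Map<String, Object>) data.get("event");

        // Accept recordIds decoded either as Object[] or as a List.
        Object rawIds = header.get("recordIds");
        List<String> recordIds = new ArrayList<>();
        if (rawIds instanceof Object[]) {
            for (Object id : (Object[]) rawIds) {
                recordIds.add(String.valueOf(id));
            }
        } else if (rawIds instanceof List) {
            for (Object id : (List<Object>) rawIds) {
                recordIds.add(String.valueOf(id));
            }
        }

        // Every payload key other than the header is a record field sent with the event.
        Set<String> eventFields = new LinkedHashSet<>(payload.keySet());
        eventFields.remove("ChangeEventHeader");

        return new ChangeEventInfo(
            (String) header.get("entityName"),
            (String) header.get("changeType"),
            recordIds,
            eventFields,
            (String) data.get("schema"),
            ((Number) event.get("replayId")).longValue());
    }
}
```

The recordIds are what the full-record request would use; the schema ID and replayId are kept for the schema check and for resuming the subscription.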

Event type mapping

Incoming (Salesforce changeType)    Outgoing (operation)
CREATE                              INSERT
UPDATE                              UPDATE
DELETE                              DELETE
UNDELETE                            INSERT
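The mapping table translates directly into a lookup. In this sketch Operation and ChangeTypeMapping are hypothetical names. Salesforce also defines gap and overflow change types (for example GAP_UPDATE and GAP_OVERFLOW) that the table does not cover, so the sketch rejects any unlisted value instead of guessing a mapping.

```java
import java.util.Map;

/** Hypothetical outgoing operation type. */
enum Operation { INSERT, UPDATE, DELETE }

final class ChangeTypeMapping {
    // Incoming Salesforce changeType -> outgoing operation, as in the table above.
    private static final Map<String, Operation> MAPPING = Map.of(
        "CREATE", Operation.INSERT,
        "UPDATE", Operation.UPDATE,
        "DELETE", Operation.DELETE,
        "UNDELETE", Operation.INSERT);

    private ChangeTypeMapping() {}

    static Operation toOperation(String changeType) {
        Operation operation = MAPPING.get(changeType);
        if (operation == null) {
            throw new IllegalArgumentException("Unsupported changeType: " + changeType);
        }
        return operation;
    }
}
```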

DDL

Salesforce does not publish events for schema changes, so the plugin has to check the schema on every data change event. To do so, it requests the entity's schema by its name.

Schema payload example

{
  "name" : "Low_Ink__e",
  "namespace" : "com.sforce.eventbus",
  "doc" : "43.0",
  "type" : "record",
  "fields" : [ {
    "name" : "CreatedDate",
    "type" : "long",
    "doc" : "CreatedDate:DateTime"
  }, {
    "name" : "CreatedById",
    "type" : "string",
    "doc" : "CreatedBy:EntityId"
  }, {
    "name" : "Printer_Model__c",
    "type" : [ "null", "string" ],
    "doc" : "Data:Text",
    "default" : null
  }, {
    "name" : "Serial_Number__c",
    "type" : [ "null", "string" ],
    "doc" : "Data:Text",
    "default" : null
  }, {
    "name" : "Ink_Percentage__c",
    "type" : [ "null", "double" ],
    "doc" : "Data:Double",
    "default" : null
  } ],
  "uuid" : "5E5OtZj5_Gm6Vax9XMXH9A"
}

The plugin then compares the response with the previously stored schema of the entity, if one exists, and sends a schema change event if they differ. If there is no previous schema, a create event is sent instead.
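The comparison step could be kept in memory per entity as sketched below. SchemaTracker and DdlAction are hypothetical names, and the sketch assumes each schema is reduced to a map from field name to a type description (for example "long" or ["null", "string"] rendered as a string). A real implementation would also need to decide whether to persist this state across restarts.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical outcome of comparing a fetched schema with the stored one. */
enum DdlAction { NONE, CREATE, ALTER }

final class SchemaTracker {
    // Last known schema per entity: field name -> type description.
    private final Map<String, Map<String, String>> knownSchemas = new HashMap<>();

    /** Stores the latest schema and reports which DDL event, if any, should be sent. */
    DdlAction onSchema(String entityName, Map<String, String> fields) {
        Map<String, String> previous = knownSchemas.put(entityName, new HashMap<>(fields));
        if (previous == null) {
            // No previous info about this entity: send a create event.
            return DdlAction.CREATE;
        }
        // Map.equals compares entries, so any added, removed or retyped field counts as a change.
        return previous.equals(fields) ? DdlAction.NONE : DdlAction.ALTER;
    }
}
```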


Implementation

The realtime plugin uses the Salesforce Change Data Capture (CDC) API.

The CometD BayeuxClient is used to subscribe to the change event channels.
