User Stories
- As a data pipeline user, I would like to be able to track data changes in Salesforce and load them into a sink.
- Login URL should be configurable
- List of objects should be configurable
General Overview of Salesforce CDC API
According to the documentation, Salesforce allows subscribing to change events for all objects, for a standard object, or for a custom object:
- All Change Events
- A Standard Object
- For example, the channel to subscribe to change events for Account records is:
- A Custom Object
- For example, the channel to subscribe to change events for Employee__c custom object records is:
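The channel naming can be sketched as follows. This is a minimal illustration that assumes the convention described in the Salesforce Change Data Capture documentation (`/data/<Object>ChangeEvent` for standard objects, `/data/<Object>__ChangeEvent` for custom objects, `/data/ChangeEvents` for everything). The function and constant names are hypothetical and not part of the plugin.

```python
# Channel that delivers change events for every enabled object.
ALL_CHANGES_CHANNEL = "/data/ChangeEvents"


def channel_for(object_name: str) -> str:
    """Return the change-event channel for a single Salesforce object."""
    name = object_name.strip()
    if not name:
        raise ValueError("object name must not be empty")
    if name.endswith("__c"):
        # Custom object: the "__c" suffix is replaced by "__ChangeEvent".
        return f"/data/{name[:-3]}__ChangeEvent"
    # Standard object: "ChangeEvent" is appended to the object name.
    return f"/data/{name}ChangeEvent"
```

Under these assumptions, `channel_for("Account")` yields the standard-object channel and `channel_for("Employee__c")` the custom-object one.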
When an entity is changed, we receive a notification event for it.
consumerKey: Application Consumer Key from the connected app. This is also known as the OAuth client ID.
consumerSecret: Application Consumer Secret. This is also known as the OAuth client secret. A Salesforce connected application must be created in order to get a client secret.
username: Username to use when connecting to Salesforce.
password: Password to use when connecting to Salesforce.
loginUrl: Salesforce login URL to authenticate against. The default value is https://login.salesforce.com. Salesforce sandboxes use a different login URL, which is why this option is configurable.
objects: Objects to read change events from, separated by "," (for example: Task for a standard object and Employee__c for a custom object). If the list is empty, the subscription for all events is used.
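To show how the credential properties and the login URL might fit together, here is a minimal sketch that builds (but does not send) a token request. It assumes the OAuth 2.0 username-password flow against `<loginUrl>/services/oauth2/token`. The source does not state which flow the plugin uses, so treat the flow and the helper name `build_token_request` as assumptions.

```python
from urllib.parse import urlencode
from urllib.request import Request

DEFAULT_LOGIN_URL = "https://login.salesforce.com"


def build_token_request(consumer_key: str, consumer_secret: str,
                        username: str, password: str,
                        login_url: str = "") -> Request:
    """Build a POST request for an access token (assumed password flow)."""
    # Fall back to the default login URL when the property is left empty.
    base = (login_url or DEFAULT_LOGIN_URL).rstrip("/")
    form = urlencode({
        "grant_type": "password",
        "client_id": consumer_key,         # consumer key = OAuth client ID
        "client_secret": consumer_secret,  # consumer secret = OAuth client secret
        "username": username,
        "password": password,
    })
    return Request(f"{base}/services/oauth2/token",
                   data=form.encode("utf-8"), method="POST")
```

For a sandbox, a different `login_url` (for example `https://test.salesforce.com`) would be passed in, which is the reason the property needs to be configurable.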
Example JSON
{
  "name": "CDCSalesforce",
  "plugin": {
    "name": "CDCSalesforce",
    "type": "realtimesource",
    "label": "CDCSalesforce",
    "properties": {
      "consumerKey": "XXXXXXX",
      "consumerSecret": "XXXXXXXXX",
      "username": "XXXXX",
      "password": "XXXXX",
      "loginUrl": "",
      "objects": "XXXXX,XXXXX"
    }
  }
}
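A configuration of this shape could be read along the following lines. This is only a sketch: it assumes the property names `consumerKey`, `consumerSecret`, `username`, `password`, `loginUrl` and `objects`, and the `SourceConfig` class and `parse_config` function are hypothetical helpers, not the plugin's actual classes.

```python
import json
from dataclasses import dataclass
from typing import List

DEFAULT_LOGIN_URL = "https://login.salesforce.com"
REQUIRED = ("consumerKey", "consumerSecret", "username", "password")


@dataclass
class SourceConfig:
    consumer_key: str
    consumer_secret: str
    username: str
    password: str
    login_url: str
    objects: List[str]


def parse_config(text: str) -> SourceConfig:
    """Parse the stage JSON and apply defaults for optional properties."""
    props = json.loads(text)["plugin"]["properties"]
    missing = [key for key in REQUIRED if not props.get(key)]
    if missing:
        raise ValueError("missing properties: " + ", ".join(missing))
    # An empty list means "subscribe to all change events".
    objects = [o.strip() for o in props.get("objects", "").split(",") if o.strip()]
    return SourceConfig(
        consumer_key=props["consumerKey"],
        consumer_secret=props["consumerSecret"],
        username=props["username"],
        password=props["password"],
        login_url=props.get("loginUrl") or DEFAULT_LOGIN_URL,
        objects=objects,
    )
```

Keeping the "empty list means all events" rule in one place makes the later subscription step a simple branch on `objects`.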
We subscribe to the appropriate topics (/data/ChangeEvents for all entities) to receive an event whenever something changes in the database.
Because an event message contains only the fields that were changed, we have to request the full record data using its record ID.
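The follow-up request for the full record could be derived from the event like this. The sketch assumes the change event carries a `ChangeEventHeader` with `entityName`, `recordIds` and `changeType`, and that records are fetched through the REST `sobjects` resource. The API version, the instance URL and the function name are illustrative assumptions, as is the choice to skip deleted records.

```python
from typing import List

API_VERSION = "v45.0"  # assumption: any API version that supports CDC


def record_urls(event: dict, instance_url: str) -> List[str]:
    """Return one REST URL per record referenced by a change event."""
    header = event["data"]["payload"]["ChangeEventHeader"]
    if header["changeType"] == "DELETE":
        # Assumption: a deleted record can no longer be fetched, so skip it.
        return []
    base = instance_url.rstrip("/")
    entity = header["entityName"]
    return [
        f"{base}/services/data/{API_VERSION}/sobjects/{entity}/{record_id}"
        for record_id in header["recordIds"]
    ]
```

A single event may reference several record IDs, so the sketch returns a list rather than a single URL.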
Events type mapping
Because there are no events for schema changes, we have to check the schema on every data change event. To do so, we will request the schema by its name.
Schema payload example
{
  "name": "Low_Ink__e",
  "namespace": "com.sforce.eventbus",
  "doc": "43.0",
  "type": "record",
  "fields": [
    { "name": "CreatedDate", "type": "long", "doc": "CreatedDate:DateTime" },
    { "name": "CreatedById", "type": "string", "doc": "CreatedBy:EntityId" },
    { "name": "Printer_Model__c", "type": [ "null", "string" ], "doc": "Data:Text", "default": null },
    { "name": "Serial_Number__c", "type": [ "null", "string" ], "doc": "Data:Text", "default": null },
    { "name": "Ink_Percentage__c", "type": [ "null", "double" ], "doc": "Data:Double", "default": null }
  ],
  "uuid": "5E5OtZj5_Gm6Vax9XMXH9A"
}
Then we will compare the response with the previous information about the entity, if it exists, and send a change event if necessary. If there is no previous information, then a create event will be sent.
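The comparison described above might look roughly like this. The sketch keeps the last seen field list per entity in memory and reports whether a create event, a change event, or nothing should be emitted. The `SchemaTracker` class and the `"create"`/`"change"` labels are hypothetical, and comparing only field names and types is a simplifying assumption.

```python
from typing import Dict, Optional


class SchemaTracker:
    """Remembers the last schema per entity and classifies new responses."""

    def __init__(self) -> None:
        self._known: Dict[str, list] = {}

    def check(self, schema: dict) -> Optional[str]:
        name = schema["name"]
        # Reduce the payload to (field name, field type) pairs for comparison.
        fields = [(f["name"], f["type"]) for f in schema["fields"]]
        previous = self._known.get(name)
        self._known[name] = fields
        if previous is None:
            return "create"   # no previous info: send a create event
        if previous != fields:
            return "change"   # schema differs: send a change event
        return None           # schema unchanged: nothing to send
```

Because the tracker is consulted on every data change event, the unchanged case should stay cheap, which is why only a reduced field list is stored.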
- Offset for event's topic is not supported
- Transactional operations are not supported yet
The Salesforce CDC API will be used for the realtime plugin.
The CometD BayeuxClient is used to subscribe to events.
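For orientation, the Bayeux messages that a CometD client exchanges when subscribing have roughly the shape below. This is a sketch of the protocol messages only, written in Python for brevity. The BayeuxClient builds and sends the equivalent messages itself, and the helper names here are hypothetical.

```python
import itertools

_message_ids = itertools.count(1)


def handshake_message() -> dict:
    """First message of a Bayeux session; the server answers with a clientId."""
    return {
        "id": str(next(_message_ids)),
        "channel": "/meta/handshake",
        "version": "1.0",
        "supportedConnectionTypes": ["long-polling"],
    }


def subscribe_message(client_id: str, channel: str) -> dict:
    """Subscription request for one channel, e.g. /data/ChangeEvents."""
    return {
        "id": str(next(_message_ids)),
        "channel": "/meta/subscribe",
        "clientId": client_id,
        "subscription": channel,
    }
```

No replay extension is included, which matches the stated limitation that offsets for the event topic are not supported.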