Salesforce Realtime Source

User Stories

  • As a data pipeline user, I would like to be able to pull data from Salesforce in a streaming pipeline, transform it, and load it into a sink of my choice for analysis.

Requirements

  • pushTopic: should be optional. The plugin should attempt to create one if it is not specified; if creation fails, a human-readable error message should be displayed. At the end of the run, the pushTopic should be deleted if the plugin created it. If a user provides their own pushTopic, the plugin should not manage it.
  • Should support the same Object and SOQL query requirements as the Salesforce Batch Source
  • Login URL should be configurable
  • Instance URL should be derived from the response of the login API

General Overview of Salesforce Streaming API

The Salesforce Streaming API uses a publish/subscribe model. It is built on the Bayeux protocol and CometD, so the client-to-server connection is maintained through long polling.

A client can subscribe to certain events by subscribing to what is called a pushTopic.

A pushTopic is a special sObject in the Salesforce database. Example of a pushTopic created via Apex code:

PushTopic pushTopic = new PushTopic();
pushTopic.Name = 'InvoiceStatementUpdates';
// The SOQL query whose results drive the notifications.
pushTopic.Query = 'SELECT Id, Name, Status__c, Description__c FROM Invoice_Statement__c';
pushTopic.ApiVersion = 45.0;
// Send notifications for creates, updates, undeletes and deletes.
pushTopic.NotifyForOperationCreate = true;
pushTopic.NotifyForOperationUpdate = true;
pushTopic.NotifyForOperationUndelete = true;
pushTopic.NotifyForOperationDelete = true;
// Notify whenever any field referenced in the query changes.
pushTopic.NotifyForFields = 'Referenced';
insert pushTopic;

After this object is created on the server, a client can subscribe to /topic/InvoiceStatementUpdates to get updates from the query.
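
For illustration, a minimal CometD subscription in Java might look like the sketch below. This assumes CometD 3.x with the Jetty HTTP client; the instance URL and accessToken are placeholders for values returned by the login API.

import java.util.HashMap;

import org.cometd.bayeux.Message;
import org.cometd.bayeux.client.ClientSessionChannel;
import org.cometd.client.BayeuxClient;
import org.cometd.client.transport.LongPollingTransport;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.Request;

public class StreamingSubscribeExample {
  public static void main(String[] args) throws Exception {
    HttpClient httpClient = new HttpClient();
    httpClient.start();

    String accessToken = "XXXXX"; // OAuth token from the login API (placeholder)

    // Long-polling transport that adds the OAuth header to every request.
    LongPollingTransport transport = new LongPollingTransport(new HashMap<>(), httpClient) {
      @Override
      protected void customize(Request request) {
        request.header("Authorization", "OAuth " + accessToken);
      }
    };

    // The CometD endpoint is instance_url + /cometd/<API version>.
    BayeuxClient client = new BayeuxClient("https://MyInstance.salesforce.com/cometd/45.0", transport);
    client.handshake();
    client.waitFor(10000, BayeuxClient.State.CONNECTED);

    // Subscribe to the channel of the pushTopic created above.
    client.getChannel("/topic/InvoiceStatementUpdates").subscribe(
        (ClientSessionChannel channel, Message message) ->
            System.out.println("Received: " + message.getDataAsMap()));
  }
}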

Salesforce allows pushTopics to be created only for a limited set of objects:

  • Account
  • Campaign
  • Case
  • Contact
  • Lead
  • Opportunity
  • Task
  • All custom objects

Salesforce pushTopic queries are restricted to the following format:

SELECT <comma-separated list of fields> FROM <Salesforce object> WHERE <filter criteria>

Also:

  • The SELECT statement’s field list must include Id.
  • Only one object per query is allowed.
  • The object must be valid for the specified API version.
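
For example, a query satisfying these restrictions (the fields and filter are illustrative) could be:

SELECT Id, Name, Amount FROM Opportunity WHERE Amount > 1000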


The number of pushTopics is limited to 40 to 100, depending on the Salesforce license type.

Design

I don't like the idea of automatically deleting the pushTopic after creating it, since we could delete a user's own topic or create too many topics. This is problematic given the small topic limit enforced by Salesforce and CDAP's inability, in some cases, to tell when streaming has finished. pushTopics will therefore be handled as follows:

If a pushTopic with the given name does not exist, create it.

If a pushTopic with the given name already exists, check whether its parameters match those configured in the widget:

  • if yes, start the plugin
  • if not, throw an exception. Editing a user's pushTopic would be too intrusive and could cause problems if other clients are using the topic. A rough sketch of this check follows the list.
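
In Java it could look roughly like this; TopicParams, fetchByName, and create are illustrative helpers, not real plugin classes:

import java.util.Objects;

// Sketch of the pushTopic handling described above; names are illustrative.
public class PushTopicManager {

  // The subset of pushTopic parameters compared against the widget configuration.
  public static final class TopicParams {
    final String name;
    final String query;
    final String notifyForFields;

    TopicParams(String name, String query, String notifyForFields) {
      this.name = name;
      this.query = query;
      this.notifyForFields = notifyForFields;
    }

    boolean matches(TopicParams other) {
      return Objects.equals(query, other.query)
          && Objects.equals(notifyForFields, other.notifyForFields);
    }
  }

  // Placeholder: look up an existing pushTopic by name via SOQL; null if absent.
  TopicParams fetchByName(String name) {
    throw new UnsupportedOperationException("query the PushTopic sObject here");
  }

  // Placeholder: insert a new pushTopic via the Salesforce API.
  void create(TopicParams params) {
    throw new UnsupportedOperationException("insert the PushTopic sObject here");
  }

  public void ensurePushTopic(TopicParams desired) {
    TopicParams existing = fetchByName(desired.name);
    if (existing == null) {
      // Topic is missing: the plugin creates and thereafter manages it.
      create(desired);
    } else if (!existing.matches(desired)) {
      // Refuse to start rather than silently editing the user's topic.
      throw new IllegalArgumentException(
          "PushTopic '" + desired.name + "' already exists with different parameters");
    }
    // Otherwise the parameters match and the plugin can start normally.
  }
}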

Properties

  • clientId: Client ID from the connected app
  • clientSecret: Client Secret from the connected app
  • username: Username
  • password: Password
  • pushTopicName: topic we subscribe to. E.g. for /topic/InvoiceStatementUpdates the name would be InvoiceStatementUpdates
  • loginUrl (default is https://login.salesforce.com/services/oauth2/token). For Salesforce sandbox runs the login URL is different; that's why the user needs this option.
  • Button *Get Schema* - populates the schema with the object's fields by parsing the query
  • Handle errors - possible values are "Skip on error" or "Fail on error". These are strategies for handling records which cannot be transformed: "Skip on error" skips the record, "Fail on error" fails the pipeline if at least one erroneous record is found.
    ------------- Create Push Topic ---------------------
  • PushTopicQuery (optional) - the query the push topic uses to send updates.
  • PushTopicNotifyCreate (optional) - checkbox
  • PushTopicNotifyUpdate (optional) - checkbox
  • PushTopicNotifyDelete (optional) - checkbox
  • PushTopicNotifyForFields (optional, default is Referenced) - possible values are All, Referenced, Select, Where. We need to put a good description here, or link to https://developer.salesforce.com/docs/atlas.en-us.api_streaming.meta/api_streaming/notifications.htm
    ------------------------------------
  • PushTopicSObject (optional) - we can specify either this or the query

Example JSON

{
    "name": "SalesforceRealtime",
    "plugin": {
        "name": "SalesforceRealtime",
        "type": "realtimesource",
        "label": "SalesforceRealtime",
        "properties": {
            "clientId": "XXXXXXX",
            "clientSecret": "XXXXXXXXX",
            "username": "XXXXX",
            "password": "XXXXX",
            "pushTopicName": "InvoiceStatementUpdates",
            "outputSchema": "Name:String",
            "loginUrl": "https://login.salesforce.com/services/oauth2/token",
            "handleErrors": "Skip on error",
            "PushTopicQuery": "",
            "PushTopicNotifyCreate": "",
            "PushTopicNotifyUpdate": "",
            "PushTopicNotifyDelete": "",
            "PushTopicNotifyForFields": "Referenced"
        }
    }
}

Get Schema button design

Let's say we have a pushTopic InvoiceStatementUpdates

  1. Run SELECT Query FROM PushTopic WHERE Name = 'InvoiceStatementUpdates' to get the query of the push topic.
  2. Let's say the query is 'SELECT Id, Name FROM Opportunity'
  3. Parse the query to get the field list: Id, Name
  4. Parse the query to get the object: Opportunity
  5. Use the SOAP API to get the fields of Opportunity
  6. Find the Id and Name fields; if they are not found, throw an error
  7. Get the types of the fields via the SOAP API.
  8. Convert Salesforce types to CDAP types
  9. Populate the schema for the user (a condensed sketch of the parsing follows below).
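
A condensed sketch of steps 2-4 and 8 in Java; the regex and the type table are simplified assumptions, and the SOAP describe calls (steps 5-7) are left as a comment:

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class SchemaFromPushTopicQuery {

  // PushTopic queries are restricted to SELECT <fields> FROM <object> [WHERE ...],
  // so a simple pattern suffices for this sketch.
  private static final Pattern QUERY_PATTERN =
      Pattern.compile("(?i)SELECT\\s+(.+?)\\s+FROM\\s+(\\w+)");

  // A few illustrative Salesforce-to-CDAP type mappings.
  private static final Map<String, String> TYPE_MAPPING = Map.of(
      "id", "string",
      "string", "string",
      "boolean", "boolean",
      "double", "double",
      "date", "date");

  public static void main(String[] args) {
    String query = "SELECT Id, Name FROM Opportunity";
    Matcher m = QUERY_PATTERN.matcher(query);
    if (!m.find()) {
      throw new IllegalArgumentException("Unsupported pushTopic query: " + query);
    }
    List<String> fields = Arrays.asList(m.group(1).split("\\s*,\\s*"));
    String object = m.group(2);
    // Steps 5-7: describe 'object' via the SOAP API, verify each field exists,
    // read its Salesforce type, and map it to a CDAP type with TYPE_MAPPING.
    System.out.println("Object: " + object + ", fields: " + fields);
  }
}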

Implementation

The Salesforce Streaming API will be used for the realtime plugin.

The CometD BayeuxClient is used to subscribe to events. When an event is received, the data is added to a BlockingQueue.

The class SalesforceReceiver extends Receiver from Spark Streaming and is used as the receiver stream.

SalesforceReceiver#onStart:

  1. Establish a connection to instance_url/cometd/45.0
  2. Start a thread that moves data from the BlockingQueue to the Spark stream (see the sketch below)
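
A minimal sketch of the receiver, with the CometD wiring from the earlier example reduced to a placeholder (only the Receiver API calls are real Spark methods):

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;

public class SalesforceReceiver extends Receiver<String> {

  private final BlockingQueue<String> messages = new LinkedBlockingQueue<>();

  public SalesforceReceiver() {
    super(StorageLevel.MEMORY_AND_DISK_2());
  }

  @Override
  public void onStart() {
    // 1. Connect to instance_url/cometd/45.0 and subscribe to the pushTopic channel;
    //    the CometD listener offers each event payload to 'messages'.
    startCometdClient();

    // 2. Move data from the blocking queue into the Spark stream on its own thread.
    new Thread(() -> {
      while (!isStopped()) {
        try {
          store(messages.take());
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          return;
        }
      }
    }, "salesforce-receiver").start();
  }

  @Override
  public void onStop() {
    // Disconnect the CometD client here.
  }

  // Placeholder: establish the BayeuxClient connection as shown earlier and add a
  // channel listener that calls messages.offer(message.getJSON()).
  private void startCometdClient() {
  }
}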