Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

User Stories

...

Salesforce pushTopic queries are restricted to below format:

SELECT <comma-separated list of fields> FROM <Salesforce object> WHERE <filter criteria>

Also:

  • The SELECT statement’s field list must include Id.
  • Only one object per query is allowed.
  • The object must be valid for the specified API version.

...

  • clientId: Client ID from the connected app 
  • cllientSecret: Client Secret from the connected app
  • username: Username
  • password: Password
  • topicpushTopicNametopic we subscribe to. E.g. for /topics/InvoiceStatementUpdates name would be InvoiceStatementUpdates
  • loginUrl (default is https://login.salesforce.com/services/oauth2/tokenFor Salesforce sandbox runs login url is different. That's why user needs this option.
    -------------Create push Topic---------------------PushTopicName (optional) - name of pushTopic, which will be used in endpoint /topics/pushTopicName.
  • PushTopicQuery (optional) - query which is used by push Topic to send updates.
  • PushTopicNofityCreate (optional) - checkBox
  • PushTopicNotifyUpdate (optional) - checkBox
  • PushTopicNotifyDelete (optional) - checkBox
  • PushTopicForFields (optional, default is Referenced) - possible values are: All, Referenced, Select, Where. We need to put a good description here. Or to link to https://developer.salesforce.com/docs/atlas.en-us.api_streaming.meta/api_streaming/notifications.htm
  • Button *Create PushTopic* - button has type "getPropertyValue". https://github.com/cdapio/cdap/tree/develop/cdap-ui/app/directives/plugin-functions/functions
    it allows to call function and to get a return in a nice format. If it's easy enough change to add a new function-type, we will need to add new "doAction" type and change title from "Property Value" to "Operation status" and "Apply" to "OK".
     

...

Code Block
{
        "name": "SalesforceRealtime",
        "plugin": {
        "name": "SalesforceRealtime",
        "type": "realtimesource",
        "label": "SalesforceRealtime",
        "properties": {
           "clientId": "XXXXXXX",
           "clientSecret": "XXXXXXXXX",
           "username": "XXXXX",
           "password": "XXXXX",
           "topic": "/events/InvoiceStatementUpdates", 
           "outputSchema": "Name:String",                 
           "loginUrl": "https://login.salesforce.com/services/oauth2/token",
		   "pushTopicName": "",
           "pushTopicQuery": "",
           "PushTopicNofityCreate": "",
           "PushTopicNotifyUpdate": "",
           "PushTopicNotifyDelete": "",
           "PushTopicForFields": "Referenced"
         }
       }
 }


Implementation

Salesforce streaming API will be used for realtime plugin.

Cometd BayeuxClient is used to subscribe to events. When event is received the data is added to a blockingQueue.

Class SalesforceReceiver extends Receiver from Spark Streaming and is used as a receiver stream.

SalesforceReceiver#onStart:

  • 1. Establish connection to instance_url/cometd/45.0
  • 2. Start thread which moves data from blockingQueue to Spark Stream