User Stories
...
Salesforce PushTopic queries are restricted to the following format:
SELECT <comma-separated list of fields> FROM <Salesforce object> WHERE <filter criteria>
Also:
- The SELECT statement’s field list must include Id.
- Only one object per query is allowed.
- The object must be valid for the specified API version.
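The first two rules above can be pre-checked before a query is sent to Salesforce. Below is a minimal sketch of such a check; the class `PushTopicQueryCheck` and its method are hypothetical and not part of the plugin. It assumes the WHERE clause may be omitted, and it cannot verify that the object is valid for the API version, since that requires a call to Salesforce.

```java
import java.util.Arrays;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper (not part of the plugin): a rough offline pre-check
// of the PushTopic query rules listed above.
final class PushTopicQueryCheck {
    // SELECT <fields> FROM <single object> [WHERE <filter criteria>]
    // Assumption: the WHERE clause is treated as optional.
    private static final Pattern SHAPE = Pattern.compile(
        "^\\s*SELECT\\s+(.+?)\\s+FROM\\s+(\\w+)(?:\\s+WHERE\\s+(.+?))?\\s*$",
        Pattern.CASE_INSENSITIVE | Pattern.DOTALL);

    static boolean looksValid(String query) {
        Matcher m = SHAPE.matcher(query);
        if (!m.matches()) {
            return false; // wrong overall shape, or more than one object after FROM
        }
        // The field list must include Id.
        return Arrays.stream(m.group(1).split(","))
            .map(String::trim)
            .anyMatch(field -> field.equalsIgnoreCase("Id"));
    }
}
```

For example, `PushTopicQueryCheck.looksValid("SELECT Name FROM Account")` is rejected because the field list lacks Id. Salesforce remains the authority on whether a query is accepted; this only catches obvious mistakes early.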
...
- clientId: Client ID from the connected app
- clientSecret: Client Secret from the connected app
- username: Username
- password: Password
- pushTopicName: the topic to subscribe to. E.g. for /topics/InvoiceStatementUpdates the name would be InvoiceStatementUpdates
- loginUrl (default is https://login.salesforce.com/services/oauth2/token): the login URL is different for Salesforce sandbox runs, which is why the user needs this option.
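To illustrate how the credentials above could be used, here is a minimal sketch that builds the form body for a token request to loginUrl. It assumes the plugin uses the Salesforce OAuth username-password flow (`grant_type=password`), which this page does not state explicitly. The class `TokenRequestForm` is hypothetical, and the sketch needs Java 10 or later for `URLEncoder.encode(String, Charset)`.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper (not part of the plugin): builds the
// application/x-www-form-urlencoded body for a POST to loginUrl.
final class TokenRequestForm {
    static String build(String clientId, String clientSecret,
                        String username, String password) {
        Map<String, String> form = new LinkedHashMap<>();
        form.put("grant_type", "password"); // assumption: username-password flow
        form.put("client_id", clientId);
        form.put("client_secret", clientSecret);
        form.put("username", username);
        form.put("password", password);
        // URL-encode every value so characters such as '@' or '&' survive.
        return form.entrySet().stream()
            .map(e -> e.getKey() + "=" + URLEncoder.encode(e.getValue(), StandardCharsets.UTF_8))
            .collect(Collectors.joining("&"));
    }
}
```

In that flow the token response carries an `access_token` and an `instance_url`. The same code should work against a sandbox by changing only loginUrl.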
-------------Create push Topic---------------------
- PushTopicName (optional) - name of the PushTopic, which will be used in the endpoint /topics/pushTopicName.
- PushTopicQuery (optional) - query which is used by the PushTopic to send updates.
- PushTopicNofityCreate (optional) - checkBox
- PushTopicNotifyUpdate (optional) - checkBox
- PushTopicNotifyDelete (optional) - checkBox
- PushTopicForFields (optional, default is Referenced) - possible values are: All, Referenced, Select, Where. This needs a good description, or a link to https://developer.salesforce.com/docs/atlas.en-us.api_streaming.meta/api_streaming/notifications.htm
- Button *Create PushTopic* - the button has type "getPropertyValue" (see https://github.com/cdapio/cdap/tree/develop/cdap-ui/app/directives/plugin-functions/functions).
This type allows calling a function and showing its return value in a readable format. If adding a new function type is easy enough, we will add a new "doAction" type and change the title from "Property Value" to "Operation status" and "Apply" to "OK".
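As a rough illustration of what the *Create PushTopic* action might send, the sketch below maps the properties above onto fields of the Salesforce PushTopic object (Name, Query, ApiVersion, NotifyForOperationCreate/Update/Delete, NotifyForFields). The class `PushTopicRecord` is hypothetical, the mapping from property names to fields is an assumption, and ApiVersion 45.0 is assumed only because it matches the /cometd/45.0 endpoint mentioned in the Implementation section.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper (not part of the plugin): turns the "Create push Topic"
// properties into the field map of a PushTopic record.
final class PushTopicRecord {
    private static final List<String> FOR_FIELDS =
        Arrays.asList("All", "Referenced", "Select", "Where");

    static Map<String, Object> build(String name, String query,
                                     boolean notifyCreate, boolean notifyUpdate,
                                     boolean notifyDelete, String forFields) {
        // An empty value falls back to the documented default, Referenced.
        String effective = (forFields == null || forFields.isEmpty()) ? "Referenced" : forFields;
        if (!FOR_FIELDS.contains(effective)) {
            throw new IllegalArgumentException("Unsupported PushTopicForFields: " + forFields);
        }
        Map<String, Object> record = new LinkedHashMap<>();
        record.put("Name", name);
        record.put("Query", query);
        record.put("ApiVersion", 45.0); // assumption: same version as /cometd/45.0
        record.put("NotifyForOperationCreate", notifyCreate);
        record.put("NotifyForOperationUpdate", notifyUpdate);
        record.put("NotifyForOperationDelete", notifyDelete);
        record.put("NotifyForFields", effective);
        return record;
    }
}
```

Rejecting unknown PushTopicForFields values on the client side would let the button report a clear "Operation status" message instead of a raw API error.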
...
{
  "name": "SalesforceRealtime",
  "plugin": {
    "name": "SalesforceRealtime",
    "type": "realtimesource",
    "label": "SalesforceRealtime",
    "properties": {
      "clientId": "XXXXXXX",
      "clientSecret": "XXXXXXXXX",
      "username": "XXXXX",
      "password": "XXXXX",
      "topic": "/events/InvoiceStatementUpdates",
      "outputSchema": "Name:String",
      "loginUrl": "https://login.salesforce.com/services/oauth2/token",
      "pushTopicName": "",
      "pushTopicQuery": "",
      "PushTopicNofityCreate": "",
      "PushTopicNotifyUpdate": "",
      "PushTopicNotifyDelete": "",
      "PushTopicForFields": "Referenced"
    }
  }
}
Implementation
The Salesforce Streaming API will be used for the realtime plugin.
The CometD BayeuxClient is used to subscribe to events. When an event is received, its data is added to a blockingQueue.
Class SalesforceReceiver extends Receiver from Spark Streaming and is used as a receiver stream.
SalesforceReceiver#onStart:
- 1. Establish a connection to instance_url/cometd/45.0
- 2. Start a thread which moves data from blockingQueue to the Spark stream
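Step 2 can be sketched with the standard library alone. In the sketch below, the class `QueueDrainer` is hypothetical, messages are plain strings, and a `Consumer<String>` stands in for the Spark side; in the real receiver the sink would presumably be the receiver's `store` method. Flushing the remaining items on shutdown is a design choice of this sketch, not something this page specifies.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical helper (not part of the plugin): moves messages from a
// blocking queue to a sink on a background thread.
final class QueueDrainer {
    private final BlockingQueue<String> queue;
    private final Consumer<String> sink;
    private final Thread worker;

    QueueDrainer(BlockingQueue<String> queue, Consumer<String> sink) {
        this.queue = queue;
        this.sink = sink;
        this.worker = new Thread(this::drain, "salesforce-queue-drainer");
        this.worker.setDaemon(true);
    }

    void start() {
        worker.start();
    }

    // Interrupts the worker and waits until it has flushed the queue.
    void stop() {
        worker.interrupt();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void drain() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                sink.accept(queue.take()); // blocks until a message arrives
            }
        } catch (InterruptedException e) {
            // Interrupted while waiting: fall through and flush what is left.
        }
        String rest;
        while ((rest = queue.poll()) != null) {
            sink.accept(rest);
        }
    }
}
```

With this shape, the CometD message listener only needs to call `queue.offer(...)`, so a slow Spark side does not run on the listener's thread. An unbounded `LinkedBlockingQueue` can still grow without limit under sustained load.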