User Stories
...
- clientId: Client ID from the connected app
- clientSecret: Client Secret from the connected app
- username: Username
- password: Password
- topic: Topic to subscribe to, e.g. /events/InvoiceStatementUpdates
- loginUrl: OAuth token endpoint (default is https://login.salesforce.com/services/oauth2/token). Salesforce sandbox orgs use a different login URL, so the user needs to be able to override it.
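As an illustration of how the connection properties above fit together, here is a minimal sketch that builds the form body for Salesforce's OAuth 2.0 username-password flow, which is what a POST to loginUrl would carry. The class and method names are hypothetical and not taken from the plugin; the sketch only builds the request body and does not perform the HTTP call.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper (not part of the plugin): maps the plugin properties
// onto the parameters of the OAuth 2.0 username-password flow.
final class SalesforceLoginRequest {
  static final String DEFAULT_LOGIN_URL =
    "https://login.salesforce.com/services/oauth2/token";

  // Returns an application/x-www-form-urlencoded body for a POST to loginUrl.
  static String formBody(String clientId, String clientSecret,
                         String username, String password) {
    Map<String, String> params = new LinkedHashMap<>();
    params.put("grant_type", "password");
    params.put("client_id", clientId);
    params.put("client_secret", clientSecret);
    params.put("username", username);
    params.put("password", password);
    return params.entrySet().stream()
      .map(e -> e.getKey() + "=" + URLEncoder.encode(e.getValue(), StandardCharsets.UTF_8))
      .collect(Collectors.joining("&"));
  }

  public static void main(String[] args) {
    System.out.println("POST " + DEFAULT_LOGIN_URL);
    System.out.println(formBody("XXXXXXX", "XXXXXXXXX", "user@example.com", "XXXXX"));
  }
}
```

For a sandbox org, only the URL the body is posted to would change; the parameters stay the same.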
-------------Create push Topic---------------------
- Name PushTopicName (optional) - name of the PushTopic, which will be used in the endpoint /events/pushTopicName.
- Query PushTopicQuery (optional) - SOQL query the PushTopic uses to decide which record changes are sent as updates.
- PushTopicName (optional) - checkBox
- PushTopicNotifyCreate (optional) - checkBox
- PushTopicNotifyUpdate (optional) - checkBox
- PushTopicNotifyDelete (optional) - checkBox
- PushTopicForFields (optional, default is Referenced) - possible values are: All, Referenced, Select, Where. This option needs a good description, or a link to https://developer.salesforce.com/docs/atlas.en-us.api_streaming.meta/api_streaming/notifications.htm
- Button *Create PushTopic* - button has type "getPropertyValue" (see https://github.com/cdapio/cdap/tree/develop/cdap-ui/app/directives/plugin-functions/functions). This type allows calling a function and showing its return value in a readable format. If adding a new function type turns out to be an easy change, we will add a new "doAction" type and change the title from "Property Value" to "Operation status" and "Apply" to "OK".
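To make the intent of the PushTopic options concrete, the following sketch shows one way they might map onto the fields of a Salesforce PushTopic record (Name, Query, ApiVersion, NotifyForOperationCreate, NotifyForOperationUpdate, NotifyForOperationDelete, NotifyForFields). The class name, the mapping itself, and the fixed ApiVersion of 45.0 are assumptions made for illustration; the string escaping is deliberately minimal and does not handle control characters.

```java
// Hypothetical helper (not part of the plugin): serializes the PushTopic
// options into the JSON body of a PushTopic record.
final class PushTopicPayload {
  // Minimal JSON string quoting: escapes backslashes and double quotes only.
  private static String quote(String s) {
    return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
  }

  static String toJson(String name, String query, boolean notifyCreate,
                       boolean notifyUpdate, boolean notifyDelete,
                       String notifyForFields) {
    return "{"
      + "\"Name\":" + quote(name) + ","
      + "\"Query\":" + quote(query) + ","
      + "\"ApiVersion\":45.0,"  // assumed to match the API version used elsewhere
      + "\"NotifyForOperationCreate\":" + notifyCreate + ","
      + "\"NotifyForOperationUpdate\":" + notifyUpdate + ","
      + "\"NotifyForOperationDelete\":" + notifyDelete + ","
      + "\"NotifyForFields\":" + quote(notifyForFields)
      + "}";
  }

  public static void main(String[] args) {
    // The query below is a made-up example, not one from the design.
    System.out.println(toJson("InvoiceStatementUpdates",
      "SELECT Id, Name FROM Account", true, true, false, "Referenced"));
  }
}
```

A real implementation would more likely use a JSON library than string concatenation; the point here is only which option feeds which field.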
...
{
  "name": "SalesforceRealtime",
  "plugin": {
    "name": "SalesforceRealtime",
    "type": "realtimesource",
    "label": "SalesforceRealtime",
    "properties": {
      "clientId": "XXXXXXX",
      "clientSecret": "XXXXXXXXX",
      "username": "XXXXX",
      "password": "XXXXX",
      "topic": "/events/InvoiceStatementUpdates",
      "outputSchema": "Name:String",
      "loginUrl": "https://login.salesforce.com/services/oauth2/token",
      "pushTopicName": "",
      "pushTopicQuery": "",
      "PushTopicNotifyCreate": "",
      "PushTopicNotifyUpdate": "",
      "PushTopicNotifyDelete": "",
      "PushTopicForFields": "Referenced"
    }
  }
}
Implementation
The Salesforce Streaming API will be used for the realtime plugin.
The CometD BayeuxClient is used to subscribe to events. When an event is received, its data is added to a blockingQueue.
The SalesforceReceiver class extends Receiver from Spark Streaming and is used as the receiver stream.
SalesforceReceiver#onStart:
- 1. Establish a connection to instance_url/cometd/45.0
- 2. Start a thread which moves data from blockingQueue to the Spark stream
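The hand-off between the subscription callback and the forwarding thread can be sketched with plain JDK classes. This is not the plugin's code: QueueForwarder and its method names are hypothetical, the CometD subscription is replaced by direct calls to onMessage, and the sink stands in for whatever pushes records into Spark (in a real Receiver, presumably its store method).

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical sketch: events are queued by the subscriber callback and a
// separate thread forwards them to a sink.
final class QueueForwarder {
  private final BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>();
  private final Consumer<String> sink;
  private volatile boolean running;
  private Thread worker;

  QueueForwarder(Consumer<String> sink) {
    this.sink = sink;
  }

  // Would be called from the subscription listener when an event arrives.
  void onMessage(String eventJson) {
    blockingQueue.offer(eventJson);
  }

  // Starts the thread that moves data from the queue to the sink.
  void start() {
    running = true;
    worker = new Thread(() -> {
      try {
        // Keep draining after stop() until the queue is empty.
        while (running || !blockingQueue.isEmpty()) {
          String event = blockingQueue.poll(100, TimeUnit.MILLISECONDS);
          if (event != null) {
            sink.accept(event);
          }
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }, "salesforce-queue-forwarder");
    worker.setDaemon(true);
    worker.start();
  }

  // Signals the worker to finish and waits until queued events are forwarded.
  void stop() {
    running = false;
    try {
      worker.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  public static void main(String[] args) {
    QueueForwarder forwarder = new QueueForwarder(System.out::println);
    forwarder.start();
    forwarder.onMessage("{\"Name\":\"example\"}");
    forwarder.stop();
  }
}
```

Polling with a timeout rather than blocking on take() lets the worker notice the stop flag without needing an interrupt, which keeps shutdown simple.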