User Stories
- As a data pipeline user, I would like to be able to pull data from Salesforce in a streaming pipeline, transform it, and load it into a sink of my choice for analysis.
TBD
Requirements
- PushTopic: should be optional. The plugin should attempt to create one if none is specified. If creation fails, a human-readable error message should be displayed. At the end of the run, the plugin should delete the pushTopic if the plugin created it. If the user supplies their own pushTopic, the plugin should not manage it.
- Should support the same requirements regarding Object and SOQL query as in the Salesforce Batch Source
- Login URL should be configurable
- Instance URL should be derived from the response of the login API
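The last two requirements can be sketched as follows. This is a minimal illustration, assuming the plugin uses the OAuth 2.0 username-password grant against the configured login URL and that the token response is a JSON object containing an instance_url field. The class and method names are hypothetical, and a real plugin would use a JSON parser rather than a regular expression.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of the plugin: builds the login request body
// and derives the instance URL from the login response.
final class SalesforceLoginSketch {
  // Naive match for "instance_url":"<value>"; does not handle JSON escapes.
  private static final Pattern INSTANCE_URL =
      Pattern.compile("\"instance_url\"\\s*:\\s*\"([^\"]+)\"");

  /** Builds the form-encoded body to POST to the configured loginUrl. */
  static String buildLoginBody(String clientId, String clientSecret,
                               String username, String password) {
    Map<String, String> params = new LinkedHashMap<>();
    params.put("grant_type", "password");
    params.put("client_id", clientId);
    params.put("client_secret", clientSecret);
    params.put("username", username);
    params.put("password", password);

    StringBuilder body = new StringBuilder();
    for (Map.Entry<String, String> e : params.entrySet()) {
      if (body.length() > 0) {
        body.append('&');
      }
      body.append(encode(e.getKey())).append('=').append(encode(e.getValue()));
    }
    return body.toString();
  }

  /** Extracts instance_url from the login response, or fails with a readable message. */
  static String extractInstanceUrl(String responseJson) {
    Matcher m = INSTANCE_URL.matcher(responseJson);
    if (!m.find()) {
      throw new IllegalArgumentException("Login response does not contain instance_url");
    }
    return m.group(1);
  }

  private static String encode(String value) {
    try {
      return URLEncoder.encode(value, "UTF-8");
    } catch (UnsupportedEncodingException e) {
      throw new IllegalStateException(e); // UTF-8 is always available
    }
  }
}
```

The instance URL returned by extractInstanceUrl would then serve as the base for all later calls, so the user never has to configure it.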
General Overview of Salesforce Streaming API
Salesforce Streaming API uses a publish/subscribe model. It relies on the Bayeux protocol and CometD, so the client-to-server connection is maintained through long polling.
A client receives events by subscribing to what is called a pushTopic.
A pushTopic is a special sObject in the Salesforce database. Example of a pushTopic created via Apex code:
PushTopic pushTopic = new PushTopic();
pushTopic.Name = 'InvoiceStatementUpdates';
pushTopic.Query = 'SELECT Id, Name, Status__c, Description__c FROM Invoice_Statement__c';
pushTopic.ApiVersion = 45.0;
pushTopic.NotifyForOperationCreate = true;
pushTopic.NotifyForOperationUpdate = true;
pushTopic.NotifyForOperationUndelete = true;
pushTopic.NotifyForOperationDelete = true;
pushTopic.NotifyForFields = 'Referenced';
insert pushTopic;
After this object is created on the server, a client can subscribe to /topic/InvoiceStatementUpdates to receive updates for records matching the query.
Salesforce allows pushTopics to be created only for a limited set of objects:
- Account
- Campaign
- Case
- Contact
- Lead
- Opportunity
- Task
- All custom objects
Salesforce pushTopic queries are restricted to the following format:
SELECT <comma-separated list of fields> FROM <Salesforce object> WHERE <filter criteria>
Also:
- The SELECT statement’s field list must include Id.
- Only one object per query is allowed.
- The object must be valid for the specified API version.
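The first two restrictions can be checked on the client before a pushTopic is created. The sketch below is only an illustration: the class name and error messages are hypothetical, the WHERE clause is treated as optional (as in the Apex example above), and the check is purely syntactic, so it cannot tell whether the object is valid for a given API version.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical pre-check for a pushTopic query; not part of the plugin.
final class PushTopicQueryCheck {
  // SELECT <fields> FROM <single object> [WHERE <criteria>], case-insensitive.
  private static final Pattern QUERY = Pattern.compile(
      "^\\s*SELECT\\s+(.+?)\\s+FROM\\s+(\\w+)(\\s+WHERE\\s+.+)?\\s*$",
      Pattern.CASE_INSENSITIVE | Pattern.DOTALL);

  /** Returns an error message, or an empty Optional if the query passes the checks. */
  static Optional<String> validate(String query) {
    Matcher m = QUERY.matcher(query);
    if (!m.matches()) {
      return Optional.of(
          "Query must have the form SELECT <fields> FROM <object> [WHERE <criteria>]");
    }
    // The field list must contain Id as one of its comma-separated entries.
    for (String field : m.group(1).split(",")) {
      if (field.trim().equalsIgnoreCase("Id")) {
        return Optional.empty();
      }
    }
    return Optional.of("The SELECT field list must include Id");
  }
}
```

For example, validate("SELECT Name FROM Account") reports the missing Id field, while a query listing two objects after FROM is rejected by the format check.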
The number of pushTopics is limited to 40-100, depending on the Salesforce license type.
Design
The user will be able to either use an existing topic or create a new one from the widget using the "Create Push Topic" button. I don't like the idea of automatically creating and deleting the pushTopic, since we could delete a user's own topics or create too many topics. This is problematic given the small topic limit enforced by Salesforce and the fact that CDAP sometimes cannot tell when streaming finishes.
Properties
- clientId: Client ID from the connected app
- clientSecret: Client Secret from the connected app
- username: Username
- password: Password
- pushTopicName: the topic to subscribe to. E.g. for /topic/InvoiceStatementUpdates the name would be InvoiceStatementUpdates
- loginUrl (default is https://login.salesforce.com/services/oauth2/token): the login URL is different for Salesforce sandbox runs, which is why the user needs this option.
-------------Create push Topic---------------------
- PushTopicQuery (optional) - query which is used by the pushTopic to send updates.
- PushTopicNofityCreate (optional) - checkBox
- PushTopicNotifyUpdate (optional) - checkBox
- PushTopicNotifyDelete (optional) - checkBox
- PushTopicForFields (optional, default is Referenced) - possible values are: All, Referenced, Select, Where. We need to put a good description here, or link to https://developer.salesforce.com/docs/atlas.en-us.api_streaming.meta/api_streaming/notifications.htm
- Button *Create PushTopic* - the button has type "getPropertyValue" (https://github.com/cdapio/cdap/tree/develop/cdap-ui/app/directives/plugin-functions/functions). It allows calling a function and getting the return value in a nice format. If adding a new function type is an easy enough change, we will add a new "doAction" type and change the title from "Property Value" to "Operation status" and "Apply" to "OK".
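What the *Create PushTopic* button might send can be sketched as below. This is an assumption, not something this design specifies: it supposes the plugin creates the pushTopic through the Salesforce REST sObject endpoint on the instance URL, using the same field names as the Apex example earlier. The class and method names are hypothetical, and a real implementation would build the JSON with a proper library.

```java
// Hypothetical sketch of the request behind the "Create PushTopic" button.
// Assumption: the pushTopic is created via the REST sObject endpoint.
final class PushTopicRequestSketch {

  /** URL to POST to, e.g. <instance_url>/services/data/v45.0/sobjects/PushTopic. */
  static String createUrl(String instanceUrl, String apiVersion) {
    return instanceUrl + "/services/data/v" + apiVersion + "/sobjects/PushTopic";
  }

  /** JSON body using the PushTopic field names from the Apex example. */
  static String createBody(String name, String query, String apiVersion,
                           boolean notifyCreate, boolean notifyUpdate,
                           boolean notifyDelete, String notifyForFields) {
    StringBuilder json = new StringBuilder("{");
    json.append("\"Name\":\"").append(escape(name)).append("\",");
    json.append("\"Query\":\"").append(escape(query)).append("\",");
    json.append("\"ApiVersion\":").append(apiVersion).append(',');
    json.append("\"NotifyForOperationCreate\":").append(notifyCreate).append(',');
    json.append("\"NotifyForOperationUpdate\":").append(notifyUpdate).append(',');
    json.append("\"NotifyForOperationDelete\":").append(notifyDelete).append(',');
    json.append("\"NotifyForFields\":\"").append(escape(notifyForFields)).append('"');
    return json.append('}').toString();
  }

  // Minimal escaping of backslashes and double quotes only.
  private static String escape(String value) {
    return value.replace("\\", "\\\\").replace("\"", "\\\"");
  }
}
```

The three boolean arguments correspond to the PushTopicNofityCreate, PushTopicNotifyUpdate and PushTopicNotifyDelete checkboxes, and notifyForFields to PushTopicForFields.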
Example JSON
{
  "name": "SalesforceRealtime",
  "plugin": {
    "name": "SalesforceRealtime",
    "type": "realtimesource",
    "label": "SalesforceRealtime",
    "properties": {
      "clientId": "XXXXXXX",
      "clientSecret": "XXXXXXXXX",
      "username": "XXXXX",
      "password": "XXXXX",
      "pushTopicName": "InvoiceStatementUpdates",
      "outputSchema": "Name:String",
      "loginUrl": "https://login.salesforce.com/services/oauth2/token",
      "PushTopicQuery": "",
      "PushTopicNofityCreate": "",
      "PushTopicNotifyUpdate": "",
      "PushTopicNotifyDelete": "",
      "PushTopicForFields": "Referenced"
    }
  }
}
Implementation
The Salesforce Streaming API will be used for the realtime plugin.
The CometD BayeuxClient is used to subscribe to events. When an event is received, its data is added to a blockingQueue.
The class SalesforceReceiver extends Receiver from Spark Streaming and is used as the receiver stream.
SalesforceReceiver#onStart:
1. Establish a connection to instance_url/cometd/45.0
2. Start a thread which moves data from blockingQueue to the Spark stream
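The hand-off between the subscription listener and Spark can be sketched without either library. In this minimal sketch, onMessage stands in for the BayeuxClient message listener and the sink callback stands in for the receiver's store call. The class name, the 100 ms poll timeout and the drain-on-stop behaviour are assumptions made for illustration, not part of the design.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical stand-in for the queue hand-off inside SalesforceReceiver.
final class QueueForwarderSketch {
  private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
  private volatile boolean running;
  private Thread worker;

  /** Step 1 target: <instance_url>/cometd/<apiVersion>. */
  static String cometdEndpoint(String instanceUrl, String apiVersion) {
    return instanceUrl + "/cometd/" + apiVersion;
  }

  /** Called by the subscription listener for every received event. */
  void onMessage(String eventJson) {
    queue.add(eventJson);
  }

  /** Step 2: start a thread that moves queued events to the sink. */
  void start(Consumer<String> sink) {
    running = true;
    worker = new Thread(() -> {
      // Keep going until stopped, then drain whatever is still queued.
      while (running || !queue.isEmpty()) {
        try {
          String event = queue.poll(100, TimeUnit.MILLISECONDS);
          if (event != null) {
            sink.accept(event);
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          return;
        }
      }
    }, "salesforce-queue-forwarder");
    worker.setDaemon(true);
    worker.start();
  }

  /** Signals the worker to stop and waits until the queue has been drained. */
  void stop() {
    running = false;
    try {
      worker.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}
```

Polling with a timeout, rather than blocking indefinitely on take, lets the worker notice the stop flag, which matters because the receiver must shut down cleanly when the pipeline stops.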