User Stories
- As a data pipeline user, I would like to be able to pull data from Salesforce in a streaming pipeline, transform it, and load it into a sink of my choice for analysis.
...
Salesforce PushTopic queries are restricted to the following format:
SELECT <comma-separated list of fields> FROM <Salesforce object> WHERE <filter criteria>
Also:
- The SELECT statement’s field list must include Id.
- Only one object per query is allowed.
- The object must be valid for the specified API version.
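The restrictions above can be checked locally before a PushTopic is created or a schema is built. The following Java sketch uses a regex-based check. The class names `PushTopicQueryParser` and `ParsedPushTopicQuery` are hypothetical. The sketch treats the WHERE clause as optional, which is an assumption on top of the format shown. It leaves the API-version rule to Salesforce, because that rule cannot be verified offline.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** The parts of a PushTopic query after a successful check (hypothetical type). */
final class ParsedPushTopicQuery {
    final List<String> fields;
    final String sObject;
    final String whereClause; // null when the query has no WHERE clause

    ParsedPushTopicQuery(List<String> fields, String sObject, String whereClause) {
        this.fields = fields;
        this.sObject = sObject;
        this.whereClause = whereClause;
    }
}

/** Hypothetical helper that checks the PushTopic query restrictions before calling Salesforce. */
final class PushTopicQueryParser {
    // SELECT <fields> FROM <object> [WHERE <criteria>], keywords matched case-insensitively.
    private static final Pattern QUERY = Pattern.compile(
            "^\\s*SELECT\\s+(.+?)\\s+FROM\\s+(.+?)(?:\\s+WHERE\\s+(.+))?\\s*$",
            Pattern.CASE_INSENSITIVE | Pattern.DOTALL);
    // A single API name such as Account or Invoice_Statement__c.
    private static final Pattern OBJECT_NAME = Pattern.compile("[A-Za-z][A-Za-z0-9_]*");

    static ParsedPushTopicQuery parse(String query) {
        Matcher m = QUERY.matcher(query);
        if (!m.matches()) {
            throw new IllegalArgumentException(
                    "Expected SELECT <fields> FROM <object> WHERE <criteria>, got: " + query);
        }
        List<String> fields = new ArrayList<>();
        for (String field : m.group(1).split(",")) {
            String name = field.trim();
            if (name.isEmpty()) {
                throw new IllegalArgumentException("Empty entry in SELECT field list: " + query);
            }
            fields.add(name);
        }
        // Rule 1: the field list must include Id (SOQL names are case-insensitive).
        if (fields.stream().noneMatch(f -> f.equalsIgnoreCase("Id"))) {
            throw new IllegalArgumentException("The SELECT field list must include Id: " + query);
        }
        // Rule 2: exactly one object name after FROM (e.g. "Account, Contact" is rejected).
        String sObject = m.group(2).trim();
        if (!OBJECT_NAME.matcher(sObject).matches()) {
            throw new IllegalArgumentException("Only one object per query is allowed: " + sObject);
        }
        // Rule 3 (object valid for the API version) can only be checked by Salesforce itself.
        String where = m.group(3) == null ? null : m.group(3).trim();
        return new ParsedPushTopicQuery(fields, sObject, where);
    }
}
```

The parsed field list and object name are the same pieces the Create Schema flow needs later.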
...
- clientId: Client ID from the connected app
- clientSecret: Client Secret from the connected app
- username: Username
- password: Password
- pushTopicName: name of the PushTopic to subscribe to. For example, for the channel /topic/InvoiceStatementUpdates the name is InvoiceStatementUpdates.
- loginUrl: OAuth2 token URL (default: https://login.salesforce.com/services/oauth2/token). Salesforce sandbox orgs use a different login URL (https://test.salesforce.com/services/oauth2/token), which is why this option is exposed to the user.
- Button *Create Schema*: populates the schema with the fields of the queried object by parsing the PushTopic query.
- Handle errors: strategy for records that cannot be transformed. Possible values are "Skip on error" (the record is skipped) and "Fail on error" (the pipeline fails if at least one erroneous record is found).
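The connection properties correspond to the Salesforce OAuth 2.0 username-password flow. In that flow, `grant_type=password`, `client_id`, `client_secret`, `username` and `password` are sent as a form-encoded POST to the login URL. The following Java 11 sketch only builds the request and does not send it or parse the response. The class and method names are hypothetical.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical helper that builds the OAuth2 token request from the plugin properties. */
final class SalesforceLogin {
    static final String DEFAULT_LOGIN_URL = "https://login.salesforce.com/services/oauth2/token";

    /** Form-encodes the username-password flow parameters in a fixed order. */
    static String tokenRequestBody(String clientId, String clientSecret,
                                   String username, String password) {
        Map<String, String> params = new LinkedHashMap<>();
        params.put("grant_type", "password");
        params.put("client_id", clientId);
        params.put("client_secret", clientSecret);
        params.put("username", username);
        params.put("password", password);
        return params.entrySet().stream()
                .map(e -> encode(e.getKey()) + "=" + encode(e.getValue()))
                .collect(Collectors.joining("&"));
    }

    /** Builds (but does not send) the POST; an empty loginUrl falls back to the default. */
    static HttpRequest tokenRequest(String loginUrl, String clientId, String clientSecret,
                                    String username, String password) {
        String url = (loginUrl == null || loginUrl.isEmpty()) ? DEFAULT_LOGIN_URL : loginUrl;
        String body = tokenRequestBody(clientId, clientSecret, username, password);
        return HttpRequest.newBuilder(URI.create(url))
                .header("Content-Type", "application/x-www-form-urlencoded")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }

    private static String encode(String value) {
        return URLEncoder.encode(value, StandardCharsets.UTF_8);
    }
}
```

The JSON response of this flow contains `access_token` and `instance_url`. The `instance_url` value is the host the receiver connects to for CometD.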
-------------Create push Topic---------------------
- PushTopicQuery (optional): query used by the PushTopic to send updates.
- PushTopicNotifyCreate (optional): checkbox
- PushTopicNotifyUpdate (optional): checkbox
- PushTopicNotifyDelete (optional): checkbox
- PushTopicNotifyForFields (optional, default is Referenced): possible values are All, Referenced, Select and Where. They determine which field changes are evaluated for notifications: All (any field change on records that match the WHERE clause), Referenced (changes to fields referenced in the SELECT and WHERE clauses), Select (changes to fields in the SELECT clause) and Where (changes to fields in the WHERE clause). See https://developer.salesforce.com/docs/atlas.en-us.api_streaming.meta/api_streaming/notifications.htm
------------------------------------
- PushTopicSObject (optional): an alternative to PushTopicQuery; specify either this or the query.
- PushTopicInterval (optional): interval in minutes since which updates are gathered. Only applies when PushTopicSObject is specified instead of a SOQL query.
Example JSON
```json
{
  "name": "SalesforceRealtime",
  "plugin": {
    "name": "SalesforceRealtime",
    "type": "realtimesource",
    "label": "SalesforceRealtime",
    "properties": {
      "clientId": "XXXXXXX",
      "clientSecret": "XXXXXXXXX",
      "username": "XXXXX",
      "password": "XXXXX",
      "pushTopicName": "InvoiceStatementUpdates",
      "outputSchema": "Name:String",
      "loginUrl": "https://login.salesforce.com/services/oauth2/token",
      "handleErrors": "Skip on error",
      "PushTopicQuery": "",
      "PushTopicNotifyCreate": "",
      "PushTopicNotifyUpdate": "",
      "PushTopicNotifyDelete": "",
      "PushTopicNotifyForFields": "Referenced"
    }
  }
}
```
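One possible implementation of the Create push Topic options is to insert a PushTopic record through the Salesforce REST API, for example with a POST to `/services/data/v45.0/sobjects/PushTopic`. The following sketch assumes that route and only builds the JSON body. It maps the checkboxes to the PushTopic fields `NotifyForOperationCreate`, `NotifyForOperationUpdate` and `NotifyForOperationDelete`, and maps PushTopicNotifyForFields to `NotifyForFields`. The class name is hypothetical. Treating an empty checkbox value as false is an assumption, and Salesforce's own defaults may differ.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical builder for the JSON body of a new PushTopic record. */
final class PushTopicBody {
    private static final List<String> NOTIFY_FOR_FIELDS =
            Arrays.asList("All", "Referenced", "Select", "Where");

    static String build(String name, String query, String apiVersion, String notifyCreate,
                        String notifyUpdate, String notifyDelete, String notifyForFields) {
        // An empty PushTopicNotifyForFields falls back to the documented default.
        String forFields = (notifyForFields == null || notifyForFields.isEmpty())
                ? "Referenced" : notifyForFields;
        if (!NOTIFY_FOR_FIELDS.contains(forFields)) {
            throw new IllegalArgumentException(
                    "PushTopicNotifyForFields must be one of " + NOTIFY_FOR_FIELDS + ": " + forFields);
        }
        // Checkbox values arrive as strings; anything but a case-insensitive "true" (e.g. "") is unchecked.
        return "{\"Name\":" + quote(name)
                + ",\"Query\":" + quote(query)
                + ",\"ApiVersion\":" + Double.parseDouble(apiVersion)
                + ",\"NotifyForOperationCreate\":" + Boolean.parseBoolean(notifyCreate)
                + ",\"NotifyForOperationUpdate\":" + Boolean.parseBoolean(notifyUpdate)
                + ",\"NotifyForOperationDelete\":" + Boolean.parseBoolean(notifyDelete)
                + ",\"NotifyForFields\":" + quote(forFields)
                + "}";
    }

    /** Minimal JSON string escaping for quotes, backslashes and control characters. */
    private static String quote(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"': sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.append('"').toString();
    }
}
```

PushTopicSObject and PushTopicInterval are not covered by this sketch.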
...
- Run the SOQL query SELECT Query FROM PushTopic WHERE Name = 'InvoiceStatementUpdates' to get the query of the push topic.
- Let's say the query is 'SELECT Id, Name FROM Opportunity'.
- Parse the query to get the object's fields: Id, Name.
- Parse the query to get the object: Opportunity.
- Use the SOAP API (describeSObject) to get the fields of Opportunity.
- Find the Id and Name fields; if either is not found, throw an error.
- Get the types of the fields via the SOAP API.
- Convert Salesforce types to CDAP types.
- Populate the schema for the user.
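The field-matching and type-conversion steps above could look like the following sketch. It assumes the describeSObject result has already been reduced to a map from field name to Salesforce field type. It uses a simplified `CdapType` enum in place of the CDAP `Schema` API. The enum, the class name and the type mapping are all assumptions, not the plugin's actual mapping.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

/** Simplified stand-in for CDAP schema types; not the real CDAP Schema API. */
enum CdapType { BOOLEAN, INT, LONG, DOUBLE, STRING, DATE, TIMESTAMP }

/** Hypothetical helper that turns described Salesforce fields into an ordered schema. */
final class SchemaBuilder {
    /** Assumed mapping from Salesforce SOAP field types to CDAP types. */
    static CdapType toCdapType(String salesforceType) {
        switch (salesforceType.toLowerCase(Locale.ROOT)) {
            case "boolean": return CdapType.BOOLEAN;
            case "int": return CdapType.INT;
            case "long": return CdapType.LONG;
            case "double":
            case "currency":
            case "percent": return CdapType.DOUBLE;
            case "date": return CdapType.DATE;
            case "datetime": return CdapType.TIMESTAMP;
            default: return CdapType.STRING; // id, string, reference, picklist, textarea, email, ...
        }
    }

    /**
     * @param queryFields     fields parsed from the PushTopic query, e.g. [Id, Name]
     * @param describedFields field name to Salesforce type, as returned by describing the object
     */
    static Map<String, CdapType> buildSchema(List<String> queryFields,
                                             Map<String, String> describedFields) {
        // SOQL field names are case-insensitive, so look them up by lower-case name.
        Map<String, String> canonicalNames = new HashMap<>();
        for (String name : describedFields.keySet()) {
            canonicalNames.put(name.toLowerCase(Locale.ROOT), name);
        }
        Map<String, CdapType> schema = new LinkedHashMap<>(); // keeps the query's field order
        for (String field : queryFields) {
            String name = canonicalNames.get(field.toLowerCase(Locale.ROOT));
            if (name == null) {
                throw new IllegalArgumentException("Field '" + field + "' not found on the object");
            }
            schema.put(name, toCdapType(describedFields.get(name)));
        }
        return schema;
    }
}
```

Sending every unrecognized type to STRING keeps schema generation from failing on less common field types, at the cost of losing type information for them.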
Implementation
The Salesforce Streaming API will be used for the realtime plugin.
A CometD BayeuxClient is used to subscribe to events. When an event is received, its data is added to a BlockingQueue.
The class SalesforceReceiver extends Spark Streaming's Receiver and is used as the receiver for the input stream.
SalesforceReceiver#onStart:
1. Establish a connection to instance_url/cometd/45.0.
2. Start a thread that moves data from the BlockingQueue to the Spark stream.
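The queue handoff between the CometD listener and the Spark receiver can be sketched with the JDK alone. In the sketch:
- `onMessage` stands in for the CometD subscription callback.
- The `store` callback stands in for Spark's `Receiver#store`.
- The `fail` callback stands in for stopping the receiver with an error.

Applying the Handle errors strategy in the draining thread is an assumption about where record conversion happens. All class names are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;

/** The two "Handle errors" options from the plugin configuration. */
enum ErrorHandling {
    SKIP_ON_ERROR("Skip on error"), FAIL_ON_ERROR("Fail on error");

    private final String label;

    ErrorHandling(String label) { this.label = label; }

    static ErrorHandling fromLabel(String label) {
        for (ErrorHandling option : values()) {
            if (option.label.equals(label)) return option;
        }
        throw new IllegalArgumentException("Unknown error handling option: " + label);
    }
}

/** Hypothetical receiver core: a listener fills the queue, a worker thread drains it. */
final class QueueHandoff<R> {
    private final BlockingQueue<Map<String, Object>> queue = new LinkedBlockingQueue<>();
    private final Function<Map<String, Object>, R> converter;
    private final Consumer<R> store;        // stand-in for Receiver#store
    private final Consumer<Exception> fail; // stand-in for stopping the receiver with an error
    private final ErrorHandling errorHandling;
    private volatile boolean running;
    private Thread worker;

    QueueHandoff(Function<Map<String, Object>, R> converter, Consumer<R> store,
                 Consumer<Exception> fail, ErrorHandling errorHandling) {
        this.converter = converter;
        this.store = store;
        this.fail = fail;
        this.errorHandling = errorHandling;
    }

    /** Would be called from the CometD subscription listener with the event's data. */
    void onMessage(Map<String, Object> data) {
        queue.offer(data); // unbounded queue, so offer always succeeds
    }

    /** Step 2 of onStart: start the thread that moves events from the queue to the stream. */
    void start() {
        running = true;
        worker = new Thread(this::drain, "salesforce-queue-drain");
        worker.setDaemon(true);
        worker.start();
    }

    /** Stops draining; events still in the queue are dropped. */
    void stop() {
        running = false;
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void drain() {
        while (running) {
            Map<String, Object> event;
            try {
                // Poll with a timeout so the loop notices stop() even when no events arrive.
                event = queue.poll(100, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            if (event == null) continue;
            R record;
            try {
                record = converter.apply(event);
            } catch (RuntimeException e) {
                if (errorHandling == ErrorHandling.FAIL_ON_ERROR) {
                    running = false;
                    fail.accept(e);
                    return;
                }
                continue; // "Skip on error": drop the record and keep going
            }
            store.accept(record);
        }
    }
}
```

Decoupling the listener from the draining thread keeps the CometD callback short. Conversion and storage happen on the receiver's own thread.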