User Stories

  • As a data pipeline user, I would like to be able to pull data from Salesforce in a streaming pipeline, transform it, and load it into a sink of my choice for analysis.

...

Salesforce PushTopic queries are restricted to the following format:

SELECT <comma-separated list of fields> FROM <Salesforce object> WHERE <filter criteria>

Also:

  • The SELECT statement’s field list must include Id.
  • Only one object per query is allowed.
  • The object must be valid for the specified API version.
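
The restrictions above can be checked before a query is sent to Salesforce. The following is a minimal sketch, not the plugin's actual code: the class name `PushTopicQueryValidator` is hypothetical, the WHERE clause is assumed to be optional, and the API-version check is left out because it needs a live Salesforce connection.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: checks a query against the PushTopic restrictions listed above. */
final class PushTopicQueryValidator {
  // SELECT <fields> FROM <single object> [WHERE <filter>]
  private static final Pattern QUERY = Pattern.compile(
      "^\\s*SELECT\\s+(.+?)\\s+FROM\\s+(\\w+)(?:\\s+WHERE\\s+(.+?))?\\s*$",
      Pattern.CASE_INSENSITIVE | Pattern.DOTALL);

  private PushTopicQueryValidator() { }

  /** Returns a list of problems; an empty list means the query passed these checks. */
  static List<String> validate(String query) {
    List<String> problems = new ArrayList<>();
    Matcher m = QUERY.matcher(query);
    if (!m.matches()) {
      // Also rejects "FROM A, B", because only one object name may follow FROM.
      problems.add("Query must have the form SELECT <fields> FROM <object> [WHERE <filter>]");
      return problems;
    }
    boolean hasId = Arrays.stream(m.group(1).split(","))
        .map(String::trim)
        .anyMatch(field -> field.equalsIgnoreCase("Id"));
    if (!hasId) {
      problems.add("Field list must include Id");
    }
    return problems;
  }
}
```

Returning a list of problems instead of throwing on the first one would let the widget show every issue at once.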

...

The number of PushTopics is limited to 40-100, depending on the Salesforce license type.

Design

The user will be able to use an existing topic or create a new topic from the widget using the "Create Push Topic" button. I don't like the idea of automatically creating and deleting PushTopics, since we could delete the user's own topics or create too many topics. This is problematic given the small topic limit enforced by Salesforce and the fact that CDAP sometimes cannot tell when streaming finishes. So PushTopics will be handled as follows:

If a PushTopic with the given name does not exist, create it.

If a PushTopic with the given name already exists, check whether its parameters are the same as in the widget:

  • if yes, start the plugin
  • else, throw an exception. We don't want to edit the user's PushTopics: doing so might be too intrusive and can cause problems if other clients are using the topic.
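
The create-or-verify rule above can be sketched as follows. This is only an illustration under stated assumptions: `PushTopicResolver` and `TopicSpec` are hypothetical names, and a plain `Map` stands in for the PushTopic records that would really be read from and written to Salesforce.

```java
import java.util.Map;
import java.util.Objects;

/** Hypothetical sketch of the create-or-verify rule; the Map stands in for Salesforce. */
final class PushTopicResolver {

  /** The widget parameters that define a PushTopic (field names are illustrative). */
  static final class TopicSpec {
    final String query;
    final boolean notifyCreate;
    final boolean notifyUpdate;
    final boolean notifyDelete;
    final String notifyForFields;

    TopicSpec(String query, boolean notifyCreate, boolean notifyUpdate,
              boolean notifyDelete, String notifyForFields) {
      this.query = query;
      this.notifyCreate = notifyCreate;
      this.notifyUpdate = notifyUpdate;
      this.notifyDelete = notifyDelete;
      this.notifyForFields = notifyForFields;
    }

    @Override
    public boolean equals(Object other) {
      if (!(other instanceof TopicSpec)) {
        return false;
      }
      TopicSpec that = (TopicSpec) other;
      return query.equals(that.query)
          && notifyCreate == that.notifyCreate
          && notifyUpdate == that.notifyUpdate
          && notifyDelete == that.notifyDelete
          && notifyForFields.equals(that.notifyForFields);
    }

    @Override
    public int hashCode() {
      return Objects.hash(query, notifyCreate, notifyUpdate, notifyDelete, notifyForFields);
    }
  }

  private final Map<String, TopicSpec> existingTopics;

  PushTopicResolver(Map<String, TopicSpec> existingTopics) {
    this.existingTopics = existingTopics;
  }

  /**
   * Creates the topic if it is missing, accepts it if it matches the widget,
   * and refuses to touch it otherwise. Returns true if a topic was created.
   */
  boolean ensureTopic(String name, TopicSpec requested) {
    TopicSpec existing = existingTopics.get(name);
    if (existing == null) {
      existingTopics.put(name, requested);
      return true;
    }
    if (!existing.equals(requested)) {
      throw new IllegalArgumentException(
          "PushTopic '" + name + "' already exists with different parameters");
    }
    return false;
  }
}
```

Note that the sketch never updates or deletes an existing topic, which matches the intent of not interfering with topics other clients may depend on.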

Properties

  • clientId: Client ID from the connected app 
  • clientSecret: Client Secret from the connected app
  • username: Username
  • password: Password
  • pushTopicName: Topic we subscribe to. E.g. for /topic/InvoiceStatementUpdates the name would be InvoiceStatementUpdates
  • loginUrl (default is https://login.salesforce.com/services/oauth2/token): For Salesforce sandbox runs the login URL is different, which is why the user needs this option.
  • Button *Create Schema* - populates the schema with the fields of the object by parsing the query
    -------------Create PushTopic---------------------
  • PushTopicQuery (optional) - query which is used by the PushTopic to send updates.
  • PushTopicNotifyCreate (optional) - checkBox
  • PushTopicNotifyUpdate (optional) - checkBox
  • PushTopicNotifyDelete (optional) - checkBox
  • PushTopicForFields (optional, default is Referenced) - possible values are: All, Referenced, Select, Where. We need to put a good description here, or link to https://developer.salesforce.com/docs/atlas.en-us.api_streaming.meta/api_streaming/notifications.htm
  • Button *Create PushTopic* - the button has type "getPropertyValue" (see https://github.com/cdapio/cdap/tree/develop/cdap-ui/app/directives/plugin-functions/functions).
    It allows calling a function and getting the return value in a nice format. If it is easy enough to add a new function type, we will need to add a new "doAction" type and change the title from "Property Value" to "Operation status" and "Apply" to "OK".
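
The clientId, clientSecret, username, password and loginUrl properties suggest the OAuth 2.0 username-password flow, in which these values are posted to the login URL as a form. The sketch below only builds that form body, assuming this is the flow the plugin uses; the class name `SalesforceLoginRequest` is hypothetical, and sending the request and reading `access_token` and `instance_url` from the response are left out.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical helper: builds the form body for an OAuth 2.0 username-password login. */
final class SalesforceLoginRequest {
  /** Default from the loginUrl property; sandbox orgs use a different host. */
  static final String DEFAULT_LOGIN_URL = "https://login.salesforce.com/services/oauth2/token";

  private SalesforceLoginRequest() { }

  /** Returns the application/x-www-form-urlencoded body to POST to the login URL. */
  static String formBody(String clientId, String clientSecret, String username, String password) {
    Map<String, String> params = new LinkedHashMap<>();
    params.put("grant_type", "password");
    params.put("client_id", clientId);
    params.put("client_secret", clientSecret);
    params.put("username", username);
    params.put("password", password);
    return params.entrySet().stream()
        .map(e -> encode(e.getKey()) + "=" + encode(e.getValue()))
        .collect(Collectors.joining("&"));
  }

  private static String encode(String value) {
    // Percent-encodes reserved characters such as '@' and '&'.
    return URLEncoder.encode(value, StandardCharsets.UTF_8);
  }
}
```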

Example JSON

{
  "name": "SalesforceRealtime",
  "plugin": {
    "name": "SalesforceRealtime",
    "type": "realtimesource",
    "label": "SalesforceRealtime",
    "properties": {
      "clientId": "XXXXXXX",
      "clientSecret": "XXXXXXXXX",
      "username": "XXXXX",
      "password": "XXXXX",
      "pushTopicName": "InvoiceStatementUpdates",
      "outputSchema": "Name:String",
      "loginUrl": "https://login.salesforce.com/services/oauth2/token",
      "PushTopicQuery": "",
      "PushTopicNotifyCreate": "",
      "PushTopicNotifyUpdate": "",
      "PushTopicNotifyDelete": "",
      "PushTopicForFields": "Referenced"
    }
  }
}

...

  1. Run "SELECT Query FROM PushTopic WHERE Name = 'InvoiceStatementUpdates'" to get the query of the PushTopic.
  2. Let's say the query is 'SELECT id,Name FROM Opportunities'
  3. Parse the query to get the object properties: id,Name
  4. Parse the query to get the object: Opportunities
  5. Use the SOAP API to get the fields of Opportunities
  6. Find the id and Name fields; if they are not found, throw an error
  7. Get the types of the fields via the SOAP API.
  8. Convert the Salesforce types to CDAP types
  9. Populate the schema for the user.
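
Steps 3 to 9 above can be sketched as follows. This is a minimal illustration under stated assumptions: the class `PushTopicSchemaSketch` is hypothetical, the SOAP describe call is replaced by a plain `Map` from field name to Salesforce type, and the type mapping is a small illustrative subset, not the plugin's actual conversion table.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical sketch: derives an output schema from a PushTopic query. */
final class PushTopicSchemaSketch {
  private static final Pattern SELECT_FROM = Pattern.compile(
      "^\\s*SELECT\\s+(.+?)\\s+FROM\\s+(\\w+)", Pattern.CASE_INSENSITIVE | Pattern.DOTALL);

  private PushTopicSchemaSketch() { }

  private static Matcher match(String query) {
    Matcher m = SELECT_FROM.matcher(query);
    if (!m.find()) {
      throw new IllegalArgumentException("Not a SELECT ... FROM ... query: " + query);
    }
    return m;
  }

  /** Step 3: the field names listed after SELECT. */
  static List<String> fieldNames(String query) {
    List<String> names = new ArrayList<>();
    for (String field : match(query).group(1).split(",")) {
      names.add(field.trim());
    }
    return names;
  }

  /** Step 4: the object named after FROM. */
  static String objectName(String query) {
    return match(query).group(2);
  }

  /** Step 8: illustrative mapping only; unlisted Salesforce types fall back to string. */
  static String toCdapType(String salesforceType) {
    switch (salesforceType.toLowerCase(Locale.ROOT)) {
      case "boolean":
        return "boolean";
      case "int":
        return "int";
      case "double":
      case "currency":
      case "percent":
        return "double";
      default:
        return "string";
    }
  }

  /**
   * Steps 6 to 9: describedFields maps field name to Salesforce type and stands in
   * for the SOAP describe result. Field names are matched case-insensitively.
   */
  static Map<String, String> buildSchema(String query, Map<String, String> describedFields) {
    Map<String, String> lookup = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
    lookup.putAll(describedFields);
    Map<String, String> schema = new LinkedHashMap<>();
    for (String field : fieldNames(query)) {
      String salesforceType = lookup.get(field);
      if (salesforceType == null) {
        throw new IllegalArgumentException(
            "Field not found on " + objectName(query) + ": " + field);
      }
      schema.put(field, toCdapType(salesforceType));
    }
    return schema;
  }
}
```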

Implementation

The Salesforce Streaming API will be used for the realtime plugin.

The CometD BayeuxClient is used to subscribe to events. When an event is received, its data is added to a BlockingQueue.

Class SalesforceReceiver extends Receiver from Spark Streaming and is used as a receiver stream.

SalesforceReceiver#onStart:

  1. Establish a connection to instance_url/cometd/45.0
  2. Start a thread which moves data from the BlockingQueue to the Spark stream
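
The queue hand-off described above can be sketched with JDK classes only. This is not the actual SalesforceReceiver: the class `QueueForwarder` is hypothetical, `onEvent` stands in for the CometD subscription callback, and the `sink` consumer stands in for the call that hands a record to Spark Streaming.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical JDK-only sketch of the queue hand-off between subscriber and stream. */
final class QueueForwarder {
  private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
  private final Consumer<String> sink;
  private volatile boolean running;
  private Thread worker;

  QueueForwarder(Consumer<String> sink) {
    this.sink = sink;
  }

  /** Step 1 endpoint: the streaming URL derived from the login response's instance_url. */
  static String cometdUrl(String instanceUrl) {
    return instanceUrl + "/cometd/45.0";
  }

  /** Called from the subscription callback whenever an event arrives. */
  void onEvent(String json) {
    queue.add(json);
  }

  /** Step 2: start the thread that moves messages from the queue to the sink. */
  synchronized void start() {
    running = true;
    worker = new Thread(() -> {
      try {
        // Keep going until stopped, then drain whatever is still queued.
        while (running || !queue.isEmpty()) {
          String message = queue.poll(100, TimeUnit.MILLISECONDS);
          if (message != null) {
            sink.accept(message);
          }
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }, "salesforce-queue-forwarder");
    worker.setDaemon(true);
    worker.start();
  }

  /** Signals the worker to finish and waits for it to drain the queue. */
  synchronized void stop() {
    running = false;
    try {
      worker.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}
```

Polling with a timeout instead of blocking on `take()` lets the worker notice the stop flag without needing to be interrupted.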