User Stories

  • As a data pipeline user, I would like to be able to pull data from Salesforce in a streaming pipeline, transform it, and load it into a sink of my choice for analysis.
    TBD

Requirements

  • PushTopic: should be optional. The plugin should attempt to create one if it is not specified. If there is an error creating it, a human-readable error message should be displayed. At the end of the run, the pushTopic should be deleted if the plugin created it. If a user overrides it, the plugin should not manage the pushTopic.
  • Should support the same requirements regarding Object and SOQL query as in the Salesforce Batch Source
  • Login URL should be configurable
  • Instance URL should be derived from the response of the login API
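
The last two requirements can be illustrated with a minimal sketch. It assumes the OAuth2 username-password flow, in which the login request is a form-encoded POST to the configured login URL and the JSON response carries an instance_url field. The class and method names are hypothetical, the HTTP call itself is omitted, and the regex stands in for a real JSON parser.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical helper, not part of the plugin: builds the login request body
// and derives the instance URL from the login response.
class SalesforceLoginSketch {

  // Form-encoded body to POST to the configurable loginUrl.
  static String buildLoginForm(String clientId, String clientSecret,
                               String username, String password) {
    Map<String, String> params = new LinkedHashMap<>();
    params.put("grant_type", "password");
    params.put("client_id", clientId);
    params.put("client_secret", clientSecret);
    params.put("username", username);
    params.put("password", password);
    return params.entrySet().stream()
        .map(e -> e.getKey() + "=" + URLEncoder.encode(e.getValue(), StandardCharsets.UTF_8))
        .collect(Collectors.joining("&"));
  }

  // Extracts instance_url from the JSON login response.
  // Assumption: a regex keeps the sketch dependency-free; real code would use a JSON parser.
  static String extractInstanceUrl(String loginResponseJson) {
    Matcher m = Pattern.compile("\"instance_url\"\\s*:\\s*\"([^\"]+)\"")
        .matcher(loginResponseJson);
    if (!m.find()) {
      throw new IllegalArgumentException("Login response does not contain instance_url");
    }
    return m.group(1);
  }

  public static void main(String[] args) {
    // Made-up sample response, for illustration only.
    String sample = "{\"access_token\":\"XXXXX\",\"instance_url\":\"https://example.my.salesforce.com\"}";
    System.out.println(buildLoginForm("XXXXXXX", "XXXXXXXXX", "user@example.com", "XXXXX"));
    System.out.println(extractInstanceUrl(sample));
  }
}
```

Keeping the login URL as a plain parameter of the request means a sandbox run only needs a different loginUrl value; nothing else in the flow changes.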

...

Salesforce pushTopic queries are restricted to the following format:

SELECT <comma-separated list of fields> FROM <Salesforce object> WHERE <filter criteria>

Also:

  • The SELECT statement’s field list must include Id.
  • Only one object per query is allowed.
  • The object must be valid for the specified API version.
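
The first two restrictions can be checked before a pushTopic is created. The following is a rough sketch with a hypothetical class name; it uses a simple regex rather than a full SOQL parser, and it does not check the third restriction, since validity for an API version can only be verified against Salesforce.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical validator for the pushTopic query restrictions listed above.
class PushTopicQueryValidator {

  // SELECT <fields> FROM <object> [WHERE <filter>]
  private static final Pattern QUERY = Pattern.compile(
      "^\\s*SELECT\\s+(.+?)\\s+FROM\\s+(.+?)(?:\\s+WHERE\\s+(.+))?\\s*$",
      Pattern.CASE_INSENSITIVE | Pattern.DOTALL);

  private static final Pattern OBJECT_NAME = Pattern.compile("[A-Za-z_][A-Za-z0-9_]*");

  // Returns human-readable problems; an empty list means the query passed these checks.
  static List<String> validate(String query) {
    List<String> errors = new ArrayList<>();
    Matcher m = QUERY.matcher(query);
    if (!m.matches()) {
      errors.add("Query must have the form SELECT <fields> FROM <object> WHERE <filter>");
      return errors;
    }
    boolean hasId = Arrays.stream(m.group(1).split(","))
        .map(String::trim)
        .anyMatch(field -> field.equalsIgnoreCase("Id"));
    if (!hasId) {
      errors.add("The field list must include Id");
    }
    if (!OBJECT_NAME.matcher(m.group(2).trim()).matches()) {
      errors.add("Only one object per query is allowed");
    }
    return errors;
  }

  public static void main(String[] args) {
    System.out.println(validate("SELECT Id, Name FROM Account WHERE Name != null"));
    System.out.println(validate("SELECT Name FROM Account"));
  }
}
```

Returning a list of messages rather than throwing on the first problem makes it easier to show the user every issue with the query at once.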

...

The number of pushTopics is limited to 40-100, depending on the Salesforce license type.

Design

The user will be able to use an existing topic or create a new one from the widget using the "Create Push Topic" button. I don't like the idea of automatically creating and deleting pushTopics, since we could delete the user's own topics or create too many topics. This is problematic given the small topic limit enforced by Salesforce and the fact that CDAP sometimes cannot tell when streaming finishes. PushTopics will therefore be handled as follows:

If a pushTopic with the given name does not exist, create it.

If a pushTopic with the given name already exists, check whether its parameters are the same as in the widget:

  • if yes, start the plugin
  • else, throw an exception. We don't want to edit the user's pushTopics: that might be too intrusive and can cause problems if other clients are using the topic.
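
The decision above can be sketched as a small pure function. All names here (PushTopicSpec, Action, reconcile) are hypothetical, and looking up the existing pushTopic in Salesforce is assumed to happen elsewhere.

```java
import java.util.Objects;
import java.util.Optional;

// Hypothetical sketch of the create / reuse / fail decision for a pushTopic.
class PushTopicReconciler {

  // The pushTopic parameters as configured in the widget or as read back from Salesforce.
  static final class PushTopicSpec {
    final String name;
    final String query;
    final boolean notifyCreate;
    final boolean notifyUpdate;
    final boolean notifyDelete;
    final String notifyForFields;

    PushTopicSpec(String name, String query, boolean notifyCreate, boolean notifyUpdate,
                  boolean notifyDelete, String notifyForFields) {
      this.name = name;
      this.query = query;
      this.notifyCreate = notifyCreate;
      this.notifyUpdate = notifyUpdate;
      this.notifyDelete = notifyDelete;
      this.notifyForFields = notifyForFields;
    }

    boolean sameParametersAs(PushTopicSpec other) {
      return Objects.equals(name, other.name)
          && Objects.equals(query, other.query)
          && notifyCreate == other.notifyCreate
          && notifyUpdate == other.notifyUpdate
          && notifyDelete == other.notifyDelete
          && Objects.equals(notifyForFields, other.notifyForFields);
    }
  }

  enum Action { CREATE, USE_EXISTING }

  // desired: parameters from the widget; existing: pushTopic with the same name, if any.
  static Action reconcile(PushTopicSpec desired, Optional<PushTopicSpec> existing) {
    if (!existing.isPresent()) {
      return Action.CREATE;
    }
    if (existing.get().sameParametersAs(desired)) {
      return Action.USE_EXISTING;
    }
    // Never edit a pushTopic the plugin did not create with these parameters.
    throw new IllegalStateException("PushTopic '" + desired.name
        + "' already exists with different parameters");
  }
}
```

Separating the decision from the Salesforce calls keeps the rule easy to unit test without a live connection.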

Properties

  • clientId: Client ID from the connected app 
  • clientSecret: Client Secret from the connected app
  • username: Username
  • password: Password
  • pushTopicName: topic we subscribe to. E.g. for /topics/InvoiceStatementUpdates the name would be InvoiceStatementUpdates
  • loginUrl (default is https://login.salesforce.com/services/oauth2/token): for Salesforce sandbox runs the login URL is different, which is why the user needs this option.
  • Button *Create Schema* - populates the schema with the fields of the object by parsing the query
  • Handle errors - possible values are: "Skip on error" or "Fail on error". These are strategies for handling records which cannot be transformed. "Skip on error" skips the record; "Fail on error" fails the pipeline if at least one erroneous record is found.
    -------------Create push Topic---------------------
  • PushTopicQuery (optional) - query which is used by push Topic to send updates.
  • PushTopicNotifyCreate (optional) - checkBox
  • PushTopicNotifyUpdate (optional) - checkBox
  • PushTopicNotifyDelete (optional) - checkBox
  • PushTopicNotifyForFields (optional, default is Referenced) - possible values are: All, Referenced, Select, Where. We need to put a good description here, or link to https://developer.salesforce.com/docs/atlas.en-us.api_streaming.meta/api_streaming/notifications.htm
    Button *Create PushTopic* - the button has type "getPropertyValue" (see https://github.com/cdapio/cdap/tree/develop/cdap-ui/app/directives/plugin-functions/functions).
    It allows calling a function and getting the return value in a nice format. If it is easy enough to add a new function type, we will need to add a new "doAction" type and change the title from "Property Value" to "Operation status" and "Apply" to "OK".
    ------------------------------------
  • PushTopicSObject (optional) - we can either specify this or query

Example JSON

{
    "name": "SalesforceRealtime",
    "plugin": {
        "name": "SalesforceRealtime",
        "type": "realtimesource",
        "label": "SalesforceRealtime",
        "properties": {
            "clientId": "XXXXXXX",
            "clientSecret": "XXXXXXXXX",
            "username": "XXXXX",
            "password": "XXXXX",
            "pushTopicName": "/events/InvoiceStatementUpdates",
            "outputSchema": "Name:String",
            "loginUrl": "https://login.salesforce.com/services/oauth2/token",
            "handleErrors": "Skip on error",
            "PushTopicQuery": "",
            "PushTopicNotifyCreate": "",
            "PushTopicNotifyUpdate": "",
            "PushTopicNotifyDelete": "",
            "PushTopicNotifyForFields": "Referenced"
        }
    }
}

Get Schema button design

Let's say we have a pushTopic InvoiceStatementUpdates

  1. Run the query "SELECT Query FROM PushTopic WHERE Name = 'InvoiceStatementUpdates'" to get the query of the pushTopic.
  2. Let's say the query is 'SELECT id,Name FROM Opportunities'
  3. Parse the query to get the object properties: id,Name
  4. Parse the query to get the object: Opportunities
  5. Use the SOAP API to get the fields of Opportunities
  6. Find the id and Name fields; if either is not found, throw an error
  7. Get the types of the fields via the SOAP API.
  8. Convert Salesforce types to CDAP types
  9. Populate the schema for user.
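
Steps 3, 4, 6 and 8 can be sketched without a Salesforce connection. In the sketch below the class name is hypothetical, the describedFields map stands in for the result of the SOAP API call, and the type mapping is an illustrative assumption rather than the plugin's actual conversion table.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch: derive an output schema from a pushTopic query.
class PushTopicSchemaSketch {

  private static final Pattern QUERY = Pattern.compile(
      "^\\s*SELECT\\s+(.+?)\\s+FROM\\s+(\\w+)", Pattern.CASE_INSENSITIVE | Pattern.DOTALL);

  private static Matcher match(String query) {
    Matcher m = QUERY.matcher(query);
    if (!m.find()) {
      throw new IllegalArgumentException("Cannot parse pushTopic query: " + query);
    }
    return m;
  }

  // Step 3: field names in the SELECT list.
  static List<String> parseFields(String query) {
    List<String> fields = new ArrayList<>();
    for (String field : match(query).group(1).split(",")) {
      fields.add(field.trim());
    }
    return fields;
  }

  // Step 4: object name in the FROM clause.
  static String parseObject(String query) {
    return match(query).group(2);
  }

  // Step 8: assumed mapping from Salesforce field types to CDAP type names.
  static String toCdapType(String salesforceType) {
    switch (salesforceType.toLowerCase(Locale.ROOT)) {
      case "boolean": return "BOOLEAN";
      case "int": return "INT";
      case "double":
      case "currency":
      case "percent": return "DOUBLE";
      default: return "STRING";
    }
  }

  // Steps 6 and 8: look up each selected field (case-insensitively) and convert its type.
  // describedFields maps field name to Salesforce type, as the SOAP describe call would report.
  static Map<String, String> buildSchema(String query, Map<String, String> describedFields) {
    Map<String, String> lookup = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
    lookup.putAll(describedFields);
    Map<String, String> schema = new LinkedHashMap<>();
    for (String field : parseFields(query)) {
      String salesforceType = lookup.get(field);
      if (salesforceType == null) {
        throw new IllegalArgumentException(
            "Field '" + field + "' not found on object " + parseObject(query));
      }
      schema.put(field, toCdapType(salesforceType));
    }
    return schema;
  }
}
```

The lookup is case-insensitive because the example query selects "id" while Salesforce reports the field as "Id".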

Implementation

The Salesforce Streaming API will be used for the realtime plugin.

The CometD BayeuxClient is used to subscribe to events. When an event is received, its data is added to a blocking queue.

Class SalesforceReceiver extends Receiver from Spark Streaming and is used as a receiver stream.

SalesforceReceiver#onStart:

  1. Establish a connection to instance_url/cometd/45.0
  2. Start a thread which moves data from the blocking queue to the Spark stream
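
The queue hand-off in step 2 can be sketched with plain java.util.concurrent types. This is not the plugin's SalesforceReceiver: the class name is hypothetical, the Consumer stands in for the store method of Spark's Receiver, and the CometD subscription that would call onEvent is omitted. Draining leftover events in onStop is a choice made for this sketch only.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical sketch of the receiver's queue-draining thread.
class QueueDrainingReceiverSketch {

  private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
  private final Consumer<String> store; // stands in for Receiver#store
  private volatile boolean stopped;
  private Thread drainThread;

  QueueDrainingReceiverSketch(Consumer<String> store) {
    this.store = store;
  }

  // Would be called by the CometD message listener for every received event.
  void onEvent(String eventJson) {
    queue.add(eventJson);
  }

  // Must return quickly, so the draining loop runs on its own thread.
  void onStart() {
    stopped = false;
    drainThread = new Thread(() -> {
      while (!stopped) {
        try {
          // Poll with a timeout so the loop notices the stop flag.
          String event = queue.poll(100, TimeUnit.MILLISECONDS);
          if (event != null) {
            store.accept(event);
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          return;
        }
      }
    }, "salesforce-queue-drain");
    drainThread.setDaemon(true);
    drainThread.start();
  }

  // Stops the thread, then hands over anything still queued.
  void onStop() {
    stopped = true;
    if (drainThread != null) {
      try {
        drainThread.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
    String event;
    while ((event = queue.poll()) != null) {
      store.accept(event);
    }
  }
}
```

The queue decouples the CometD callback thread from the thread that feeds Spark, so a slow downstream does not block event delivery from Salesforce.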