Skip to end of metadata
Go to start of metadata

You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 21 Next »

User Stories

  • As a data pipeline user, I would like to be able to pull data from Salesforce in a streaming pipeline, transform it, and load it into a sink of my choice for analysis.
    TBD

Requirements

  • Pushtopic: should be optional. The plugin should attempt to create one if it is not specified. If there is an error creating it, a human readable error message should be displayed. At the end of the run, the pushtopic should be deleted, if the plugin created it. If a user overrides it, the plugin should not manage the pushTopic
  • Should support the same requirements regarding Object and SOQL query as in the Salesforce Batch Source
  • Login URL should be configurable
  • Instance URL should be derived from the response of the login API

General Overview of Salesforce Streaming API

Salesforce Streaming API uses publish/subscribe model. Bayeux protocol and CometD are used, so that the client-to-server connection is maintained through long polling. 

Client can subscribe to certain events by subscribing to what is called a pushTopic.

A pushTopic is a special sObject in salesforce DB. Example of pushTopic created via apex Code:

PushTopic pushTopic = new PushTopic();
pushTopic.Name = 'InvoiceStatementUpdates';
pushTopic.Query = 'SELECT Id, Name, Status__c, Description__c FROM Invoice_Statement__c';
pushTopic.ApiVersion = 45.0;
pushTopic.NotifyForOperationCreate = true;
pushTopic.NotifyForOperationUpdate = true;
pushTopic.NotifyForOperationUndelete = true;
pushTopic.NotifyForOperationDelete = true;
pushTopic.NotifyForFields = 'Referenced';
insert pushTopic;

After this object is created on server, client can subscribe to /events/InvoiceStatementUpdates to get updates from the query.

Salesforce allows to create pushTopics only for limited set of objects:

  • Account
  • Campaign
  • Case
  • Contact
  • Lead
  • Opportunity
  • Task
  • All custom objects

Salesforce pushTopic queries are restricted to below format:

SELECT <comma-separated list of fields> FROM <Salesforce object> WHERE <filter criteria>

Also:

  • The SELECT statement’s field list must include Id.
  • Only one object per query is allowed.
  • The object must be valid for the specified API version.


Number of pushTopics is limited to 40-100 depending on Salesforce license type.

Design

User will be able to use existing topic or to create a new topic via widget using "Create Push Topic" button. I don't like idea of automatically creating and deleting pushTopic. Since we can delete users own topics or create too much topics. This is problematic with a small topic limit enforced by Salesforce and inability of cdap to tell when streaming finishes sometimes.  

Properties

  • clientId: Client ID from the connected app 
  • cllientSecret: Client Secret from the connected app
  • username: Username
  • password: Password
  • pushTopicNametopic we subscribe to. E.g. for /topics/InvoiceStatementUpdates name would be InvoiceStatementUpdates
  • loginUrl (default is https://login.salesforce.com/services/oauth2/tokenFor Salesforce sandbox runs login url is different. That's why user needs this option.
    -------------Create push Topic---------------------
  • PushTopicQuery (optional) - query which is used by push Topic to send updates.
  • PushTopicNofityCreate (optional) - checkBox
  • PushTopicNotifyUpdate (optional) - checkBox
  • PushTopicNotifyDelete (optional) - checkBox
  • PushTopicForFields (optional, default is Referenced) - possible values are: All, Referenced, Select, Where. We need to put a good description here. Or to link to https://developer.salesforce.com/docs/atlas.en-us.api_streaming.meta/api_streaming/notifications.htm
  • Button *Create PushTopic* - button has type "getPropertyValue". https://github.com/cdapio/cdap/tree/develop/cdap-ui/app/directives/plugin-functions/functions
    it allows to call function and to get a return in a nice format. If it's easy enough change to add a new function-type, we will need to add new "doAction" type and change title from "Property Value" to "Operation status" and "Apply" to "OK".
     

Example JSON

{
        "name": "SalesforceRealtime",
        "plugin": {
        "name": "SalesforceRealtime",
        "type": "realtimesource",
        "label": "SalesforceRealtime",
        "properties": {
           "clientId": "XXXXXXX",
           "clientSecret": "XXXXXXXXX",
           "username": "XXXXX",
           "password": "XXXXX",
           "topic": "/events/InvoiceStatementUpdates", 
           "outputSchema": "Name:String",                 
           "loginUrl": "https://login.salesforce.com/services/oauth2/token",
		   "pushTopicName": "",
           "pushTopicQuery": "",
           "PushTopicNofityCreate": "",
           "PushTopicNotifyUpdate": "",
           "PushTopicNotifyDelete": "",
           "PushTopicForFields": "Referenced"
         }
       }
 }


Implementation

Salesforce streaming API will be used for realtime plugin.

Cometd BayeuxClient is used to subscribe to events. When event is received the data is added to a blockingQueue.

Class SalesforceReceiver extends Receiver from Spark Streaming and is used as a receiver stream.

SalesforceReceiver#onStart:

  • 1. Establish connection to instance_url/cometd/45.0
  • 2. Start thread which moves data from blockingQueue to Spark Stream
      
  • No labels