User Stories

  • As a data pipeline user, I would like to be able to pull data from Salesforce in a streaming pipeline, transform it, and load it into a sink of my choice for analysis.
    TBD

Requirements

TBD: Bhooshan Mogal, Albert Shau, please fill these in.

General Overview of Salesforce Streaming API

The Salesforce Streaming API uses a publish/subscribe model. The Bayeux protocol and CometD are used, so the client-to-server connection is maintained through long polling.

A client can subscribe to certain events by subscribing to what is called a PushTopic.

A PushTopic is a special sObject in the Salesforce database. Example of a PushTopic created via Apex code:

PushTopic pushTopic = new PushTopic();
pushTopic.Name = 'InvoiceStatementUpdates';
pushTopic.Query = 'SELECT Id, Name, Status__c, Description__c FROM Invoice_Statement__c';
pushTopic.ApiVersion = 45.0;
pushTopic.NotifyForOperationCreate = true;
pushTopic.NotifyForOperationUpdate = true;
pushTopic.NotifyForOperationUndelete = true;
pushTopic.NotifyForOperationDelete = true;
pushTopic.NotifyForFields = 'Referenced';
insert pushTopic;

After this object is created on the server, a client can subscribe to /events/InvoiceStatementUpdates to receive updates from the query.

Salesforce allows creating PushTopics only for a limited set of objects:

  • Account
  • Campaign
  • Case
  • Contact
  • Lead
  • Opportunity
  • Task
  • All custom objects

The number of PushTopics is limited to 40-100, depending on the Salesforce license type.

Design

The user will be able to use an existing topic or create a new one via the widget, using a "Create Push Topic" button. I don't like the idea of automatically creating and deleting PushTopics, since we could delete users' own topics or create too many of them. This is problematic given the small topic limit enforced by Salesforce and CDAP's inability, in some cases, to tell when streaming has finished.
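
For reference, a minimal sketch of what the "Create Push Topic" button could do behind the scenes: creating the same PushTopic as the Apex example above through the Salesforce REST API. The class and method names are illustrative, and instanceUrl and accessToken are assumed to come from the OAuth login step.

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class PushTopicCreator {

  // Creates a PushTopic record via the Salesforce REST sobjects endpoint.
  public static void createPushTopic(String instanceUrl, String accessToken) throws Exception {
    URL url = new URL(instanceUrl + "/services/data/v45.0/sobjects/PushTopic/");
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setRequestMethod("POST");
    conn.setRequestProperty("Authorization", "Bearer " + accessToken);
    conn.setRequestProperty("Content-Type", "application/json");
    conn.setDoOutput(true);

    // Same PushTopic definition as the Apex example, expressed as a REST payload.
    String body = "{"
        + "\"Name\": \"InvoiceStatementUpdates\","
        + "\"Query\": \"SELECT Id, Name, Status__c, Description__c FROM Invoice_Statement__c\","
        + "\"ApiVersion\": 45.0,"
        + "\"NotifyForOperationCreate\": true,"
        + "\"NotifyForOperationUpdate\": true,"
        + "\"NotifyForOperationUndelete\": true,"
        + "\"NotifyForOperationDelete\": true,"
        + "\"NotifyForFields\": \"Referenced\""
        + "}";
    try (OutputStream os = conn.getOutputStream()) {
      os.write(body.getBytes(StandardCharsets.UTF_8));
    }

    // 201 Created is expected on success; anything else should surface as an error in the widget.
    if (conn.getResponseCode() != 201) {
      throw new IllegalStateException("Failed to create PushTopic, HTTP " + conn.getResponseCode());
    }
  }
}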

Properties

Example JSON

{
  "name": "SalesforceRealtime",
  "plugin": {
    "name": "SalesforceRealtime",
    "type": "realtimesource",
    "label": "SalesforceRealtime",
    "properties": {
      "clientId": "XXXXXXX",
      "clientSecret": "XXXXXXXXX",
      "username": "XXXXX",
      "password": "XXXXX",
      "topic": "/events/InvoiceStatementUpdates",
      "outputSchema": "Name:String",
      "loginUrl": "https://login.salesforce.com/services/oauth2/token"
    }
  }
}
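
For illustration, a sketch of how the clientId, clientSecret, username, password and loginUrl properties could be used to obtain a session, assuming the standard OAuth 2.0 username-password flow. The class name is hypothetical; the JSON response carries the access_token and instance_url that the streaming connection needs.

import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;

public class SalesforceLogin {

  // Performs the OAuth 2.0 username-password flow against the configured loginUrl.
  public static String login(String loginUrl, String clientId, String clientSecret,
                             String username, String password) throws Exception {
    String form = "grant_type=password"
        + "&client_id=" + URLEncoder.encode(clientId, "UTF-8")
        + "&client_secret=" + URLEncoder.encode(clientSecret, "UTF-8")
        + "&username=" + URLEncoder.encode(username, "UTF-8")
        + "&password=" + URLEncoder.encode(password, "UTF-8");

    HttpURLConnection conn = (HttpURLConnection) new URL(loginUrl).openConnection();
    conn.setRequestMethod("POST");
    conn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
    conn.setDoOutput(true);
    try (OutputStream os = conn.getOutputStream()) {
      os.write(form.getBytes(StandardCharsets.UTF_8));
    }

    // The JSON response contains "access_token" and "instance_url";
    // the real plugin would parse it with a JSON library.
    try (InputStream in = conn.getInputStream();
         Scanner scanner = new Scanner(in, StandardCharsets.UTF_8.name()).useDelimiter("\\A")) {
      return scanner.hasNext() ? scanner.next() : "";
    }
  }
}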


Implementation

The Salesforce Streaming API will be used for the realtime plugin.

The CometD BayeuxClient is used to subscribe to events. When an event is received, the data is added to a blockingQueue.
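
A minimal sketch of that subscription logic, assuming the CometD Java client and Jetty HttpClient as dependencies. The class name and queue handling are illustrative; the access token and instance URL are assumed to come from the login step.

import java.util.HashMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import org.cometd.bayeux.Message;
import org.cometd.bayeux.client.ClientSessionChannel;
import org.cometd.client.BayeuxClient;
import org.cometd.client.transport.LongPollingTransport;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.Request;

public class SalesforceEventListener {

  private final BlockingQueue<String> messages = new LinkedBlockingQueue<>();

  public BlockingQueue<String> getMessages() {
    return messages;
  }

  public void subscribe(String instanceUrl, String accessToken, String topic) throws Exception {
    HttpClient httpClient = new HttpClient();
    httpClient.start();

    // Add the OAuth token to every long-polling request.
    LongPollingTransport transport =
        new LongPollingTransport(new HashMap<String, Object>(), httpClient) {
          @Override
          protected void customize(Request request) {
            request.header("Authorization", "Bearer " + accessToken);
          }
        };

    BayeuxClient client = new BayeuxClient(instanceUrl + "/cometd/45.0", transport);
    client.handshake();
    if (!client.waitFor(10_000, BayeuxClient.State.CONNECTED)) {
      throw new IllegalStateException("Could not connect to the Salesforce Streaming API");
    }

    // Every event pushed on the topic is added to the blockingQueue as raw JSON.
    client.getChannel(topic).subscribe(new ClientSessionChannel.MessageListener() {
      @Override
      public void onMessage(ClientSessionChannel channel, Message message) {
        messages.add(message.getJSON());
      }
    });
  }
}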

The SalesforceReceiver class extends Receiver from Spark Streaming and is used as a receiver stream.

SalesforceReceiver#onStart:

  1. Establish a connection to instance_url/cometd/45.0.
  2. Start a thread which moves data from the blockingQueue to the Spark stream (see the sketch below).
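
A sketch of the receiver under those assumptions, reusing the hypothetical SalesforceEventListener from the previous snippet: event JSON is drained from the blockingQueue and handed to the Spark stream via store().

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;

public class SalesforceReceiver extends Receiver<String> {

  private final String instanceUrl;
  private final String accessToken;
  private final String topic;

  public SalesforceReceiver(String instanceUrl, String accessToken, String topic) {
    super(StorageLevel.MEMORY_AND_DISK_2());
    this.instanceUrl = instanceUrl;
    this.accessToken = accessToken;
    this.topic = topic;
  }

  @Override
  public void onStart() {
    // Step 1: establish the CometD connection to instance_url/cometd/45.0 and subscribe to the topic.
    // Step 2: move events from the blockingQueue into the Spark stream on a separate thread.
    new Thread(() -> {
      try {
        SalesforceEventListener listener = new SalesforceEventListener();
        listener.subscribe(instanceUrl, accessToken, topic);
        BlockingQueue<String> queue = listener.getMessages();
        while (!isStopped()) {
          // Poll with a timeout so the loop notices isStopped() and exits cleanly.
          String event = queue.poll(1, TimeUnit.SECONDS);
          if (event != null) {
            store(event);
          }
        }
      } catch (Exception e) {
        restart("Error receiving events from Salesforce", e);
      }
    }, "salesforce-receiver").start();
  }

  @Override
  public void onStop() {
    // The receiver thread exits once isStopped() returns true; the CometD client could be disconnected here.
  }
}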
      