Salesforce Batch Source

Introduction

Salesforce exposes an expressive query language, SOQL (Salesforce Object Query Language), through the Salesforce HTTP REST API. Extending Data Pipelines to include Salesforce data will improve usability and enable additional use cases in the CRM and Customer 360 space.


Use-case
A user can retrieve opportunities and customer information through SOQL in a data pipeline and integrate them with existing pipeline customer data, improving the overall understanding of customer interactions and the sales process.

User Stories

  • As a data pipeline user, I would like to be able to pull data from Salesforce using a SOQL query in a batch pipeline, transform it and load it into my analytics data warehouse, so that I can use it for analytics.
  • As a data pipeline user, I would like to be able to optionally specify the batch size for Salesforce API calls, so that I can guard against API limits.
  • As a data pipeline user, I would like to be able to pull data for all attributes of a given object in my batch pipeline without specifying a SOQL query, so that I do not have to manually list all the columns I want to pull.
  • As a data pipeline user, I would like to be able to pull only data that has changed since the last time an object was read, so that I can do incremental ingestion of my data and reduce load on the APIs.

Requirements

  • Authentication should use the client ID, client secret, username and password. The Salesforce instance should be derived using the login API. The user should not be required to specify the instance.
  • Once the instance is determined, a user should be able to select an object from a list of objects available in the instance. Once they select the object, they can then choose (multiple) attributes from a list of attributes available for that object. By default, all attributes should be selected.
  • A user can skip the above step, and instead choose to specify the data to pull as a SOQL query.
  • Users should be able to optionally specify the batch size for all Salesforce queries. If unspecified, a reasonable default should be documented and used.

User Stories - discussion results

Browsing objects and their properties

This will not be supported by CDAP until the connection/source/sink consolidation work is done. For now, the ability to get the schema via the UI will be implemented. Methods to get objects and their fields will also be created, so that browsing is easy to implement once the consolidation work is done.

Defaulting to the latest version of the API. Allow user to optionally override it.

The Salesforce API is not backward compatible. A given version of the Salesforce Java library supports only certain versions of the Salesforce API. Defaulting to the latest version would break things for existing customers when a new Salesforce API version is released, and they would have to download the latest Salesforce jar to fix this.

Specifying batch size

Batches are created internally by the Salesforce processing server, and their size cannot be configured, so we have to work with the batches we get. More information is in the "Salesforce MapReduce parallelization" section.

Specifying time period

Based on the discussion with Albert Shau, we have decided that automatically adding conditions like "LastModifiedDate=TODAY AND HOUR_IN_DAY(LastModifiedDate) 1" to the query is not reliable. Users can write complex queries (e.g. nested queries, queries without WHERE, or queries with multiple WHERE clauses), and trying to insert a condition into them would be very error-prone. Instead, the docs will describe how to specify this interval so that the user can easily incorporate it into the query.

Mapping types from Salesforce to CDAP

CDAP Type | Salesforce Type | Notes
Boolean | _bool |
Long | _int, _long |
Double | _double, currency, percent |
Date | date | Logical type
Timestamp_millis | datetime | Logical type
Time_millis | time | Logical type
String | picklist, multipicklist, combobox, reference, base64, textarea, phone, id, url, email, encryptedstring, datacategorygroupreference, location, address, anyType, json, complexvalue | Custom user types are also perceived as strings
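
The mapping in the table above can be sketched as a small lookup. This is only an illustration: the class and method names are hypothetical, and the CDAP side is represented by plain type names rather than real CDAP Schema objects.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

/** Hypothetical helper: maps Salesforce field type names to the CDAP type names from the table. */
final class SalesforceTypeMappingSketch {
  private static final Map<String, String> TYPES = new HashMap<>();

  static {
    TYPES.put("_bool", "Boolean");
    for (String t : Arrays.asList("_int", "_long")) {
      TYPES.put(t, "Long");
    }
    for (String t : Arrays.asList("_double", "currency", "percent")) {
      TYPES.put(t, "Double");
    }
    TYPES.put("date", "Date");
    TYPES.put("datetime", "Timestamp_millis");
    TYPES.put("time", "Time_millis");
  }

  /** Types not listed explicitly (picklist, id, custom user types, ...) fall back to String. */
  static String toCdapType(String salesforceType) {
    return TYPES.getOrDefault(salesforceType.toLowerCase(Locale.ROOT), "String");
  }
}
```

For example, toCdapType("currency") yields "Double", while toCdapType("picklist") and any custom type yield "String".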


Design

Using Bulk API to retrieve objects from SOQL

We use the Bulk API since the SOAP API can only process hundreds of records. Note that the Bulk API has its own limitations, which stem from the need to process large amounts of data. Bulk API queries don't support the following SOQL:

  • COUNT
  • ROLLUP
  • SUM
  • GROUP BY CUBE
  • OFFSET
  • Nested SOQL queries
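
One way to surface these limitations early would be a validation step that flags the unsupported constructs before a job is submitted. The sketch below is a naive, regex-based illustration under that assumption; the class name is hypothetical, and a real check would need a proper SOQL parser, since keywords inside string literals would be flagged here as well.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

/** Hypothetical pre-check for SOQL constructs that Bulk API queries do not support. */
final class BulkQuerySupportSketch {
  private static final Map<String, Pattern> UNSUPPORTED = new LinkedHashMap<>();

  static {
    int flags = Pattern.CASE_INSENSITIVE;
    UNSUPPORTED.put("COUNT", Pattern.compile("\\bCOUNT\\s*\\(", flags));
    UNSUPPORTED.put("SUM", Pattern.compile("\\bSUM\\s*\\(", flags));
    UNSUPPORTED.put("ROLLUP", Pattern.compile("\\bROLLUP\\b", flags));
    UNSUPPORTED.put("GROUP BY CUBE", Pattern.compile("\\bGROUP\\s+BY\\s+CUBE\\b", flags));
    UNSUPPORTED.put("OFFSET", Pattern.compile("\\bOFFSET\\b", flags));
    UNSUPPORTED.put("Nested SOQL query", Pattern.compile("\\(\\s*SELECT\\b", flags));
  }

  /** Returns the names of the unsupported constructs found in the query, in list order. */
  static List<String> findUnsupported(String soql) {
    List<String> found = new ArrayList<>();
    for (Map.Entry<String, Pattern> entry : UNSUPPORTED.entrySet()) {
      if (entry.getValue().matcher(soql).find()) {
        found.add(entry.getKey());
      }
    }
    return found;
  }
}
```

A query such as "SELECT Name FROM Opportunity" produces an empty list, so the plugin could proceed to create the bulk job.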


Automatically choosing API version

As per Bhooshan Mogal:

Let's use the latest one available right now for development, and not make it configurable. If there's ever an instance where we require a higher version, we'll release a new plugin.

Salesforce API v45 will be the default, non-configurable version.

Automatically retrieving sObject from query

The Salesforce API requires an sObject when starting a job. This object is the same as the table name the query selects data from. Requiring the user to specify the table name twice would be a bad user experience, so an antlr4 parser will be used to parse the SOQL query and extract the table name from it.

Parser should be tested against all queries from:

https://developer.salesforce.com/docs/atlas.en-us.soql_sosl.meta/soql_sosl/sforce_api_calls_soql_select_examples.htm

The page contains an example query for every syntax construct SOQL supports.
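
To illustrate what the parser has to extract, here is a simplified stand-in that does not use antlr4. It scans for the top-level FROM keyword, skipping nested queries in parentheses and single-quoted string literals. The class and method names are hypothetical, and this scan is not a substitute for a grammar-based parser.

```java
/** Hypothetical, simplified stand-in for the antlr4-based sObject name extraction. */
final class SObjectNameSketch {

  /** Returns the object named after the top-level FROM, or null if none is found. */
  static String fromClauseObject(String soql) {
    int depth = 0;
    boolean inString = false;
    for (int i = 0; i < soql.length(); i++) {
      char c = soql.charAt(i);
      if (inString) {
        if (c == '\\') {
          i++; // skip the escaped character
        } else if (c == '\'') {
          inString = false;
        }
        continue;
      }
      if (c == '\'') {
        inString = true;
      } else if (c == '(') {
        depth++;
      } else if (c == ')') {
        depth--;
      } else if (depth == 0 && isFromKeywordAt(soql, i)) {
        return readIdentifier(soql, i + 4);
      }
    }
    return null;
  }

  /** True if the whole word FROM (any case) starts at index i. */
  private static boolean isFromKeywordAt(String s, int i) {
    if (!s.regionMatches(true, i, "FROM", 0, 4)) {
      return false;
    }
    boolean startOk = i == 0 || Character.isWhitespace(s.charAt(i - 1)) || s.charAt(i - 1) == ')';
    boolean endOk = i + 4 < s.length() && Character.isWhitespace(s.charAt(i + 4));
    return startOk && endOk;
  }

  /** Reads the identifier (letters, digits, underscores) that follows optional whitespace. */
  private static String readIdentifier(String s, int start) {
    int i = start;
    while (i < s.length() && Character.isWhitespace(s.charAt(i))) {
      i++;
    }
    int begin = i;
    while (i < s.length() && (Character.isLetterOrDigit(s.charAt(i)) || s.charAt(i) == '_')) {
      i++;
    }
    return i > begin ? s.substring(begin, i) : null;
  }
}
```

For "SELECT Name, (SELECT LastName FROM Contacts) FROM Account" the nested FROM is skipped and "Account" is returned, which is the value that would be passed to the job as its sObject.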

Properties

    • clientId: Client ID from the connected app 
    • clientSecret: Client Secret from the connected app
    • username: Username
    • password: Password
    • query: SOQL query
    • sObjectName: Salesforce object name. If a value is provided, the plugin will get all fields for this object from Salesforce and generate the SOQL query: "select <FIELD_1, FIELD_2, ..., FIELD_N> from ${sObjectName}". Ignored if a SOQL query is provided.
    • datetimeFilter: SObject query datetime filter applied to the system field `LastModifiedDate` to allow incremental queries. The filter will be added to the SObject query as "WHERE LastModifiedDate > ${datetimeFilter}". If no value is provided, all records are read since the beginning of time. Expected formats: Salesforce Date Format (e.g. 2019-03-12T11:29:52Z) or Salesforce Date Literal (e.g. LAST_WEEK). https://developer.salesforce.com/docs/atlas.en-us.soql_sosl.meta/soql_sosl/sforce_api_calls_soql_select_dateformats.htm

    • duration: SObject query duration filter applied to system field `LastModifiedDate` to allow range query. For example, if duration is '6' (6 hours) and the pipeline runs at 9am, it will read data updated from 3am-9am. Ignored if datetimeFilter is provided.

    • offset: SObject query offset filter applied to system field `LastModifiedDate` to allow range query. For example, if duration is '6' (6 hours) and the offset is '1' (1 hour) and the pipeline runs at 9am, it will read data updated from 2am-8am. Ignored if datetimeFilter is provided.

    • outputSchema: Key-value pairs of output column and data type. If the user doesn't configure an output schema, all the response data along with metadata will be emitted from the plugin (the default column would be Records).
    • loginUrl: Salesforce OAuth2 login URL. Default is https://login.salesforce.com/services/oauth2/token. The login URL is different for Salesforce sandbox runs, which is why the user needs this option.
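
The interaction between sObjectName, datetimeFilter, duration and offset can be sketched as follows. This is an illustration only: the class and method names are hypothetical, the field list is passed in rather than fetched from Salesforce, duration and offset are taken as whole hours, and the choice of an inclusive lower bound and exclusive upper bound for the range is an assumption that the properties above do not specify.

```java
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;

/** Hypothetical sketch of building the SObject query from the plugin properties. */
final class SObjectQuerySketch {
  // Salesforce datetime format in UTC, e.g. 2019-03-12T11:29:52Z
  private static final DateTimeFormatter SOQL_DATETIME =
      DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss'Z'");

  static String buildQuery(String sObjectName, List<String> fields, String datetimeFilter,
                           int durationHours, int offsetHours, ZonedDateTime runTime) {
    StringBuilder query = new StringBuilder("SELECT ")
        .append(String.join(", ", fields))
        .append(" FROM ")
        .append(sObjectName);
    if (datetimeFilter != null && !datetimeFilter.isEmpty()) {
      // datetimeFilter wins: duration and offset are ignored
      query.append(" WHERE LastModifiedDate > ").append(datetimeFilter);
    } else if (durationHours > 0) {
      // the range ends offsetHours before the run and spans durationHours
      ZonedDateTime end = runTime.withZoneSameInstant(ZoneOffset.UTC).minusHours(offsetHours);
      ZonedDateTime start = end.minusHours(durationHours);
      query.append(" WHERE LastModifiedDate >= ").append(SOQL_DATETIME.format(start))
          .append(" AND LastModifiedDate < ").append(SOQL_DATETIME.format(end));
    }
    return query.toString();
  }
}
```

With a run at 09:00 UTC, a duration of 6 and an offset of 1, the generated range covers 02:00 to 08:00, matching the example given for the offset property.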

Input JSON

{
  "name": "HTTPSalesforce",
  "plugin": {
    "name": "HTTPSalesforce",
    "type": "batchsource",
    "label": "HTTPSalesforce",
    "properties": {
      "clientId": "XXXXXXX",
      "clientSecret": "XXXXXXXXX",
      "username": "XXXXX",
      "password": "XXXXX",
      "query": "SELECT Name FROM Opportunity",
      "outputSchema": "Name:String",
      "loginUrl": "https://login.salesforce.com/services/oauth2/token"
    }
  }
}


Example

clientId:XXXX

clientSecret:XXXXXX

username:XXX

password:XXX

query:SELECT Name FROM Opportunity

outputSchema:Name:String

loginUrl: https://login.salesforce.com/services/oauth2/token

Output (If outputSchema is configured as mentioned in above config section)

Name
United Oil Plant Standby Generators
Edge Emergency Generator
Grand Hotels Emergency Generators

Output (If outputSchema is not configured)

Records
"Name":"United Oil Plant Standby Generators","attributes":{"type":"Opportunity","url":"/services/data/v37.0/sobjects/Opportunity/0062800000CQWgJAAX"}
"Name":"Edge Emergency Generator","attributes":{"type":"Opportunity","url":"/services/data/v37.0/sobjects/Opportunity/0062800000CQWgKAAX"}
"Name":"Grand Hotels Emergency Generators","attributes":{"type":"Opportunity","url":"/services/data/v37.0/sobjects/Opportunity/0062800000CQWgIAAX"}


Salesforce Batch API Examples

OAuth2 

Using the provided clientId, clientSecret, username and password, the access token and instance URI will be fetched using the username-password flow of OAuth2.

e.g:

grant_type=password&client_id=<your_client_id>&client_secret=<your_client_secret>&username=<your_username>&password=<your_password>

The following parameters are required:

  • grant_type: Set this to password.
  • client_id: Application's client identifier.
  • client_secret: Application's client secret.
  • username: The API user's Salesforce.com username, of the form user@example.com.
  • password: The API user's Salesforce.com password.

The response would be:

{
"id":"https://login.salesforce.com/id/00D50000000IZ3ZEAW/00550000001fg5OAAQ",
"issued_at":"1296509381665",
"instance_url":"https://na1.salesforce.com",
"signature":"+Nbl5EOl/DlsvUZ4NbGDno6vn935XsWGVbwoKyXHayo=",
"access_token":"00D50000000IZ3Z!AQgAQH0Yd9M51BU_rayzAdmZ6NmT3pXZBgzkc3JTwDOGBl8BP2AREOiZzL_A2zg7etH81kTuuQPljJVsX4CPt3naL7qustlb"
}

This access token and instance URI will be used to execute queries via the Bulk and SOAP APIs.
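
As a minimal sketch of the request side of this flow, the form-encoded body shown above could be assembled like this. The class and method names are hypothetical; sending the body as a POST to the login URL and parsing the JSON response are left out.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical helper that builds the body for the OAuth2 username-password request. */
final class OAuthRequestSketch {

  static String passwordGrantBody(String clientId, String clientSecret,
                                  String username, String password) {
    Map<String, String> params = new LinkedHashMap<>();
    params.put("grant_type", "password");
    params.put("client_id", clientId);
    params.put("client_secret", clientSecret);
    params.put("username", username);
    params.put("password", password);

    StringJoiner body = new StringJoiner("&");
    for (Map.Entry<String, String> entry : params.entrySet()) {
      // values must be URL-encoded, since credentials may contain '&', '=' or spaces
      body.add(entry.getKey() + "=" + encode(entry.getValue()));
    }
    return body.toString();
  }

  private static String encode(String value) {
    try {
      return URLEncoder.encode(value, "UTF-8");
    } catch (UnsupportedEncodingException e) {
      throw new IllegalStateException("UTF-8 is always supported", e);
    }
  }
}
```

The resulting string would be sent with the content type application/x-www-form-urlencoded to the configured loginUrl.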

Submitting a job to batch API

Below is an example of submitting a job to the Salesforce Bulk API and reading all the returned batches using the Salesforce Java library. Note that in the actual code, batches will be read in parallel using MapReduce.

// Assumed imports: com.sforce.async.*, com.sforce.ws.ConnectorConfig,
// com.google.common.io.CharStreams, java.io.*, java.nio.charset.StandardCharsets, java.util.*

// create connection
String accessToken = oauth2(config.getClientId(), config.getClientSecret(),
                            config.getUsername(), config.getPassword(),
                            config.getApiVersion()); // an http request
// BulkConnection is built from a ConnectorConfig, not from a bare token.
// instanceUrl is assumed to hold the "instance_url" value from the OAuth2 response,
// and getApiVersion() is assumed to return a value such as 45.0.
ConnectorConfig connectorConfig = new ConnectorConfig();
connectorConfig.setSessionId(accessToken);
connectorConfig.setRestEndpoint(instanceUrl + "/services/async/" + config.getApiVersion());
BulkConnection bulkConnection = new BulkConnection(connectorConfig);

// create batch job
JobInfo job = new JobInfo();
job.setObject("Opportunity");
job.setOperation(OperationEnum.query);
job.setConcurrencyMode(ConcurrencyMode.Parallel);
job.setContentType(ContentType.CSV);

job = bulkConnection.createJob(job);
assert job.getId() != null;

job = bulkConnection.getJobStatus(job.getId());

// submit the query as the content of a batch
String query = "SELECT Name, Id, IsWon FROM Opportunity";
ByteArrayInputStream queryStream =
  new ByteArrayInputStream(query.getBytes(StandardCharsets.UTF_8));
bulkConnection.createBatchFromStream(job, queryStream);

BatchInfo[] batches = bulkConnection.getBatchInfoList(job.getId()).getBatchInfo();

// read all batches
List<String> results = new ArrayList<>();
for (BatchInfo batchInfo : batches) {
  waitForBatchComplete(batchInfo);
  // refresh the batch state after waiting
  BatchInfo info = bulkConnection.getBatchInfo(job.getId(), batchInfo.getId());

  if (info.getState() == BatchStateEnum.Failed) {
    throw new IllegalStateException("-------------- failed ----------" + info);
  }
  if (info.getState() != BatchStateEnum.Completed) {
    System.out.println("-------------- not completed, skipping ----------" + info);
    continue;
  }

  // read results: a completed batch exposes one or more result ids
  QueryResultList list = bulkConnection.getQueryResultList(job.getId(), info.getId());
  for (String resultId : list.getResult()) {
    try (InputStream queryResultStream =
           bulkConnection.getQueryResultStream(job.getId(), info.getId(), resultId)) {
      results.add(CharStreams.toString(
        new InputStreamReader(queryResultStream, StandardCharsets.UTF_8)));
    }
  }
}



Implementation

Plugin Methods

prepareRun:

  • Set up the MapReduce SalesforceInputFormat class.
  • Set up the lineage recorder.

configurePipeline:

Check that authentication succeeds and that the Salesforce API version and object are valid.

transform:

Build StructuredRecord from record in Salesforce format.

onRunFinish:

Get the job status from the API and make sure the job completed successfully. If not, throw an exception.

Salesforce MapReduce parallelization

One MapReduce split corresponds to a single batch returned by the Salesforce Bulk API. Batches are created internally by the Salesforce processing server, and their size cannot be configured, so we have to work with the batches we get.

Salesforce will split data into batches according to the following criteria:

  • A batch can contain a maximum of 10,000 records.
  • A batch can contain a maximum of 10,000,000 characters for all the data in a batch.
  • A field can contain a maximum of 32,000 characters.
  • A record can contain a maximum of 5,000 fields.
  • A record can contain a maximum of 400,000 characters for all its fields.
  • A batch must contain some content or an error occurs.
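
Since batch sizes cannot be configured, the number of splits can only be estimated. Assuming the record and character limits above are the only constraints that matter, the following hypothetical helper computes a lower bound on the number of batches (and therefore splits) for a given result size. The actual number is decided by Salesforce and may be higher.

```java
/** Hypothetical estimate of the minimum number of batches for a result set. */
final class BatchEstimateSketch {
  static final long MAX_RECORDS_PER_BATCH = 10_000L;
  static final long MAX_CHARS_PER_BATCH = 10_000_000L;

  /** Lower bound only: Salesforce decides the real batch boundaries. */
  static long minBatches(long recordCount, long totalChars) {
    long byRecords = ceilDiv(recordCount, MAX_RECORDS_PER_BATCH);
    long byChars = ceilDiv(totalChars, MAX_CHARS_PER_BATCH);
    return Math.max(byRecords, byChars);
  }

  private static long ceilDiv(long value, long divisor) {
    return (value + divisor - 1) / divisor;
  }
}
```

For instance, 25,000 records totalling 1,000,000 characters need at least 3 batches because of the record limit, while 5,000 records totalling 25,000,000 characters need at least 3 because of the character limit.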


Most significant methods from InputFormat and RecordReader:

SalesforceInputFormat.getSplits():

Create a Salesforce bulk job and return the batches it created as separate splits.

SalesforceRecordReader.initialize():

Wait until the current batch status is complete, then get the records from the batch. If the batch status is failed, throw an exception.

SalesforceRecordReader.nextKeyValue():

Iterate over the records from the current batch. The value is a single record from the batch. The key is NullWritable, since we don't need MapReduce to reduce any records in our case.
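
The reader contract described above can be sketched without Hadoop dependencies. The class below is a hypothetical, simplified stand-in: a real implementation would extend org.apache.hadoop.mapreduce.RecordReader and return NullWritable from getCurrentKey(). It also assumes the batch result is CSV with a header row and one record per line, which does not hold for quoted fields that contain line breaks.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;

/** Hypothetical, Hadoop-free sketch of the record reader contract for one batch. */
final class BatchRecordReaderSketch {
  private BufferedReader reader;
  private String header;
  private String currentValue;

  /** Called once per split, after the batch has completed: reads the CSV header row. */
  void initialize(Reader batchResult) {
    try {
      reader = new BufferedReader(batchResult);
      header = reader.readLine();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Advances to the next record; returns false when the batch is exhausted. */
  boolean nextKeyValue() {
    try {
      currentValue = reader.readLine();
      return currentValue != null;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** The value is a single record; the key is omitted here (NullWritable in MapReduce). */
  String getCurrentValue() {
    return currentValue;
  }

  String getHeader() {
    return header;
  }
}
```

A caller would invoke initialize() with the result stream of one batch and then loop on nextKeyValue(), passing each getCurrentValue() to transform.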

Checklist

  • User stories documented 
  • User stories reviewed 
  • Design documented 
  • Design reviewed 
  • Feature merged 
  • Examples and guides 
  • Integration tests 
  • Documentation for feature 
  • Short video demonstrating the feature