Salesforce Batch Source
- Todd Greenstein
- Arina Ielchiieva
- Shashank
Introduction
Salesforce exposes an expressive query language "SOQL" (Salesforce Object Query Language) through the Salesforce HTTP REST API. Extending the capabilities of Data Pipelines to include Salesforce Data will allow for improved usability and additional use cases within the CRM and Customer 360 space.
User Stories
- As a data pipeline user, I would like to be able to pull data from Salesforce using a SOQL query in a batch pipeline, transform it and load it into my analytics data warehouse, so that I can use it for analytics
- As a data pipeline user, I would like to be able to optionally specify the batch size for Salesforce API calls, so that I can guard against API limits
- As a data pipeline user, I would like to be able to pull data for all attributes of a given object in my batch pipeline without specifying a SOQL query, so that I do not have to manually list all the columns I want to pull.
- As a data pipeline user, I would like to support pull only data that has changed since the last time an object was read, so that I can do incremental ingestion of my data and reduce load on the APIs.
Requirements
- For authentication, the user provides a client ID, client secret, username and password. The Salesforce instance should be derived using the login API; the user should not be required to specify the instance.
- Once the instance is determined, a user should be able to select an object from a list of objects available in the instance. Once the object is selected, the user can then choose multiple attributes from the list of attributes available for that object. By default, all attributes should be selected.
- A user can skip the above step, and instead choose to specify the data to pull as a SOQL query.
- Users should be able to optionally specify the batch size for all Salesforce queries. If unspecified, a reasonable default should be documented and used.
User Stories - discussion results
Browsing objects and their properties
This will not be supported by CDAP until the connection/source/sink consolidation work is done. For now, the ability to get the schema via the UI will be implemented. Methods to list objects and their fields will also be created, so that browsing can be implemented easily once the connection/source/sink consolidation work is done.
Defaulting to the latest version of the API. Allow user to optionally override it.
The Salesforce API is not backward compatible: a given version of the Salesforce Java library supports only certain versions of the Salesforce API. Defaulting to the latest version would break things for existing customers whenever a new Salesforce API version is released, and they would have to download the latest Salesforce jar to fix it.
Specifying batch size
Batches are created internally by the Salesforce processing server and their size cannot be configured, so we just have to go with what we get. More information is in the "Salesforce MapReduce parallelization" section.
Specifying time period
Based on the discussion with Albert Shau, we have decided that adding conditions like "LastModifiedDate = TODAY AND HOUR_IN_DAY(LastModifiedDate) > 1" into the query is not reliable: given the complex queries a user can have (e.g. nested queries, queries without a WHERE clause, or queries with multiple WHERE statements), it would be very error-prone to try to insert something inside. So the approach will be to describe in the docs how to specify this interval, so the user can easily incorporate it into the query.
Mapping types from Salesforce to CDAP
CDAP Type | Salesforce Type | Notes |
---|---|---|
Boolean | _bool | |
Long | _int, _long | |
Double | _double, currency, percent | |
Date | date | Logical type |
Timestamp_millis | datetime | Logical type |
Time_millis | time | Logical type |
String | picklist, multipicklist, combobox, reference, base64, textarea, phone, id, url, email, encryptedstring, datacategorygroupreference, location, address, anyType, json, complexvalue | Custom user types are also perceived as strings |
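A minimal sketch of this mapping in Java, assuming the Salesforce field type is available as a lowercase string and using CDAP's Schema API (the method name is illustrative only):

// Maps a Salesforce field type to a CDAP schema, falling back to string for everything else.
// Uses io.cdap.cdap.api.data.schema.Schema (co.cask.cdap.api.data.schema.Schema in older CDAP versions).
private static Schema toCdapFieldSchema(String salesforceType) {
  switch (salesforceType) {
    case "_bool":
      return Schema.of(Schema.Type.BOOLEAN);
    case "_int":
    case "_long":
      return Schema.of(Schema.Type.LONG);
    case "_double":
    case "currency":
    case "percent":
      return Schema.of(Schema.Type.DOUBLE);
    case "date":
      return Schema.of(Schema.LogicalType.DATE);
    case "datetime":
      return Schema.of(Schema.LogicalType.TIMESTAMP_MILLIS);
    case "time":
      return Schema.of(Schema.LogicalType.TIME_MILLIS);
    default:
      // picklist, reference, textarea, custom types and everything else become strings
      return Schema.of(Schema.Type.STRING);
  }
}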
Design
Using Bulk API to retrieve objects from SOQL
We use the Bulk API since the SOAP API can only process hundreds of records. Please note the Bulk API has its own limitations, caused by the necessity to process large amounts of data. A Bulk API query doesn't support the following SOQL constructs:
- COUNT
- ROLLUP
- SUM
- GROUP BY CUBE
- OFFSET
- Nested SOQL queries
Automatically choosing API version
As per Bhooshan Mogal:
Let's use the latest one available right now for development, and not make it configurable. If there's ever an instance where we require a higher version, we'll release a new plugin.
Salesforce API v45 will be the default, non-configurable version.
Automatically retrieving sObject from query
The Salesforce API requires an sObject name when starting a job. This object is the same as the table name we select data from. It is a bad user experience if the user has to specify the table name twice, so an antlr4 parser will be used to parse the SOQL query and extract the table name from it.
The parser should be tested against all queries from the SOQL reference documentation; that page contains an example query for every syntax construct SOQL supports.
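For illustration only, a simplified extraction without a full grammar could look like the sketch below; the actual implementation will use an ANTLR4-generated parser so that nested queries, aliases and other constructs are handled correctly. The method name and regex are hypothetical:

// Hypothetical helper; the real plugin will rely on an ANTLR4 parser instead of a regex.
// Uses java.util.regex.Pattern and java.util.regex.Matcher.
private static String getSObjectName(String soql) {
  Matcher matcher = Pattern.compile("(?i)\\bFROM\\s+([A-Za-z0-9_]+)").matcher(soql);
  if (!matcher.find()) {
    throw new IllegalArgumentException("Cannot find sObject name in query: " + soql);
  }
  return matcher.group(1);
}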
Properties
- clientId: Client ID from the connected app
- clientSecret: Client Secret from the connected app
- username: Username
- password: Password
- query: SOQL query
- sObjectName: Salesforce object name. If a value is provided, the plugin will get all fields for this object from Salesforce and generate the SOQL query: "select <FIELD_1, FIELD_2, ..., FIELD_N> from ${sObjectName}". Ignored if a SOQL query is provided.
- datetimeFilter: SObject query datetime filter applied to the system field `LastModifiedDate` to allow incremental queries. The filter will be added to the SObject query as "WHERE LastModifiedDate > ${datetimeFilter}". If no value is provided, all records since the beginning of time will be read. Expected formats: Salesforce Date Format (e.g. 2019-03-12T11:29:52Z) or Salesforce Date Literal (e.g. LAST_WEEK). https://developer.salesforce.com/docs/atlas.en-us.soql_sosl.meta/soql_sosl/sforce_api_calls_soql_select_dateformats.htm
- duration: SObject query duration filter applied to the system field `LastModifiedDate` to allow range queries. For example, if the duration is '6' (6 hours) and the pipeline runs at 9am, it will read data updated from 3am to 9am. Ignored if datetimeFilter is provided.
- offset: SObject query offset filter applied to the system field `LastModifiedDate` to allow range queries. For example, if the duration is '6' (6 hours), the offset is '1' (1 hour) and the pipeline runs at 9am, it will read data updated from 2am to 8am. Ignored if datetimeFilter is provided. See the sketch after this list for how these filters could be applied.
- outputSchema: Key-value pairs of output column and datatype. If the user doesn't configure an output schema, then all the response data along with metadata will be emitted from the plugin (the default column is Records).
- loginUrl (default is https://login.salesforce.com/services/oauth2/token): for Salesforce sandbox runs the login URL is different, which is why the user needs this option.
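A minimal sketch of how the generated SObject query could be assembled from these properties (the enclosing class and method name are illustrative, not the final implementation):

// Illustrative only: builds the generated SOQL for the sObjectName mode.
// Uses java.time.Instant, java.time.temporal.ChronoUnit and java.util.List.
private String buildSObjectQuery(String sObjectName, List<String> fieldNames,
                                 String datetimeFilter, Integer duration, Integer offset) {
  StringBuilder query = new StringBuilder("SELECT ")
    .append(String.join(", ", fieldNames))
    .append(" FROM ").append(sObjectName);
  if (datetimeFilter != null && !datetimeFilter.isEmpty()) {
    query.append(" WHERE LastModifiedDate > ").append(datetimeFilter);
  } else if (duration != null) {
    // duration and offset are in hours; compute the [start, end) range relative to the run time
    long offsetHours = offset == null ? 0 : offset;
    Instant end = Instant.now().minus(offsetHours, ChronoUnit.HOURS).truncatedTo(ChronoUnit.SECONDS);
    Instant start = end.minus(duration, ChronoUnit.HOURS);
    query.append(" WHERE LastModifiedDate > ").append(start)
         .append(" AND LastModifiedDate < ").append(end);
  }
  return query.toString();
}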
Input JSON
{ "name": "HTTPSalesforce", "plugin": { "name": "HTTPSalesforce", "type": "batchsource", "label": "HTTPSalesforce", "properties": { "clientId": "XXXXXXX", "clientSecret": "XXXXXXXXX", "username": "XXXXX", "password": "XXXXX", "query": "SELECT Name FROM Opportunity", "outputSchema": "Name:String", "loginUrl": "https://login.salesforce.com/services/oauth2/token" } } }
Example
clientId:XXXX
clientSecret:XXXXXX
username:XXX
password:XXX
query:SELECT Name FROM Opportunity
outputSchema:Name:String
loginUrl: https://login.salesforce.com/services/oauth2/token
Output (If outputSchema is configured as mentioned in above config section)
Name |
---|
United Oil Plant Standby Generators |
Edge Emergency Generator |
Grand Hotels Emergency Generators |
Output (If outputSchema is not configured)
Records |
---|
"Name":"United Oil Plant Standby Generators","attributes":{"type":"Opportunity","url":"/services/data/v37.0/sobjects/Opportunity/0062800000CQWgJAAX"} |
"Name":"Edge Emergency Generator","attributes":{"type":"Opportunity","url":"/services/data/v37.0/sobjects/Opportunity/0062800000CQWgKAAX"} |
"Name":"Grand Hotels Emergency Generators","attributes":{"type":"Opportunity","url":"/services/data/v37.0/sobjects/Opportunity/0062800000CQWgIAAX"} |
Salesforce Batch API Examples
OAuth2
Using the provided clientId, clientSecret, username and password, an access token and instance URL will be fetched using the username-password flow of OAuth2.
e.g.:
grant_type=password&client_id=<your_client_id>&client_secret=<your_client_secret>&username=<your_username>&password=<your_password>
The following parameters are required:
Parameter | Description |
---|---|
grant_type | Set this to password. |
client_id | Application's client identifier. |
client_secret | Application's client secret. |
username | The API user's Salesforce.com username, of the form user@example.com. |
password | The API user's Salesforce.com password. |
The response would be:
{
"id":"https://login.salesforce.com/id/00D50000000IZ3ZEAW/00550000001fg5OAAQ",
"issued_at":"1296509381665",
"instance_url":"https://na1.salesforce.com",
"signature":"+Nbl5EOl/DlsvUZ4NbGDno6vn935XsWGVbwoKyXHayo=",
"access_token":"00D50000000IZ3Z!AQgAQH0Yd9M51BU_rayzAdmZ6NmT3pXZBgzkc3JTwDOGBl8BP2AREOiZzL_A2zg7etH81kTuuQPljJVsX4CPt3naL7qustlb"
}
This access token and instance URL will be used to execute the queries via the Bulk and SOAP APIs.
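A minimal sketch of this token request, assuming a plain HttpURLConnection (the actual plugin may use a different HTTP client; the method name is illustrative):

// Illustrative only: POSTs the username-password grant to the login URL and returns the raw JSON response.
// Uses java.net.HttpURLConnection, java.net.URL, java.net.URLEncoder, java.nio.charset.StandardCharsets
// and Guava's com.google.common.io.ByteStreams.
private static String fetchTokenResponse(String loginUrl, String clientId, String clientSecret,
                                         String username, String password) throws IOException {
  String body = String.format(
    "grant_type=password&client_id=%s&client_secret=%s&username=%s&password=%s",
    URLEncoder.encode(clientId, "UTF-8"), URLEncoder.encode(clientSecret, "UTF-8"),
    URLEncoder.encode(username, "UTF-8"), URLEncoder.encode(password, "UTF-8"));
  HttpURLConnection connection = (HttpURLConnection) new URL(loginUrl).openConnection();
  connection.setRequestMethod("POST");
  connection.setDoOutput(true);
  connection.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
  try (OutputStream out = connection.getOutputStream()) {
    out.write(body.getBytes(StandardCharsets.UTF_8));
  }
  // the JSON response contains access_token and instance_url, as in the example above
  try (InputStream in = connection.getInputStream()) {
    return new String(ByteStreams.toByteArray(in), StandardCharsets.UTF_8);
  }
}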
Submitting a job to batch API
Below is an example of submitting a job to the Salesforce Bulk API and reading all the returned batches using the Salesforce Java library. Note that in the actual code, reading batches will be done in parallel using MapReduce.
// Uses the Salesforce WSC async client (com.sforce.async.*) and Guava's CharStreams/Charsets.

// create connection
String accessToken = oauth2(config.getClientId(), config.getClientSecret(),
                            config.getUsername(), config.getPassword(),
                            config.getApiVersion()); // an http request
// simplified: the real BulkConnection is constructed from a ConnectorConfig carrying the
// session id (access token) and the async REST endpoint of the instance
BulkConnection bulkConnection = new BulkConnection(accessToken);

// create batch job
JobInfo job = new JobInfo();
job.setObject("Opportunity");
job.setOperation(OperationEnum.query);
job.setConcurrencyMode(ConcurrencyMode.Parallel);
job.setContentType(ContentType.CSV);
job = bulkConnection.createJob(job);
assert job.getId() != null;
job = bulkConnection.getJobStatus(job.getId());

// submit the query as a batch
String query = "SELECT Name, Id, IsWon FROM Opportunity";
ByteArrayInputStream bout = new ByteArrayInputStream(query.getBytes());
BatchInfo info = bulkConnection.createBatchFromStream(job, bout);

// read all batches and collect their results
List<String> results = new ArrayList<>();
BatchInfo[] batches = bulkConnection.getBatchInfoList(job.getId()).getBatchInfo();
for (BatchInfo batchInfo : batches) {
  waitForBatchComplete(batchInfo);
  info = bulkConnection.getBatchInfo(job.getId(), batchInfo.getId());
  if (info.getState() == BatchStateEnum.Completed) {
    QueryResultList list = bulkConnection.getQueryResultList(job.getId(), batchInfo.getId());
    for (String resultId : list.getResult()) {
      InputStream queryResultStream = bulkConnection.getQueryResultStream(job.getId(), info.getId(), resultId);
      results.add(CharStreams.toString(new InputStreamReader(queryResultStream, Charsets.UTF_8)));
    }
  } else if (info.getState() == BatchStateEnum.Failed) {
    System.out.println("-------------- failed ----------" + info);
  }
}
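The waitForBatchComplete helper above is not shown in the snippet; a minimal polling sketch could look like the following (the signature is hypothetical, since the real helper also needs the connection and the job id to refresh the batch status):

// Hypothetical helper: polls the batch status until it is Completed or Failed.
// Uses com.sforce.async.* and java.util.concurrent.TimeUnit.
private static void waitForBatchComplete(BulkConnection bulkConnection, String jobId, String batchId)
    throws AsyncApiException, InterruptedException {
  while (true) {
    BatchInfo batchInfo = bulkConnection.getBatchInfo(jobId, batchId);
    if (batchInfo.getState() == BatchStateEnum.Completed || batchInfo.getState() == BatchStateEnum.Failed) {
      return;
    }
    Thread.sleep(TimeUnit.SECONDS.toMillis(10)); // wait before polling again
  }
}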
Implementation
Plugin Methods
prepareRun:
- set up the MapReduce SalesforceInputFormat class
- set up the lineage recorder
configurePipeline:
Check if authentication can be done and if the Salesforce API version and object are valid.
transform:
Build a StructuredRecord from the record in Salesforce format.
onRunFinish:
Get the job status from the API and make sure it completed successfully. If not, throw an exception.
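A rough skeleton of the plugin class, assuming CDAP's BatchSource API (the config class, the plugin name and the input value type are illustrative):

// Sketch only: shows where the lifecycle methods described above live.
// Uses the CDAP plugin API (BatchSource, PipelineConfigurer, BatchSourceContext, KeyValue, Emitter, StructuredRecord).
@Plugin(type = BatchSource.PLUGIN_TYPE)
@Name("Salesforce")
public class SalesforceBatchSource extends BatchSource<NullWritable, Map<String, String>, StructuredRecord> {

  private final SalesforceSourceConfig config; // hypothetical config class holding the properties above

  public SalesforceBatchSource(SalesforceSourceConfig config) {
    this.config = config;
  }

  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    // validate credentials, the Salesforce API version and the sObject/query here
  }

  @Override
  public void prepareRun(BatchSourceContext context) {
    // set up SalesforceInputFormat as the input and register the lineage recorder
  }

  @Override
  public void transform(KeyValue<NullWritable, Map<String, String>> input, Emitter<StructuredRecord> emitter) {
    // build a StructuredRecord from the Salesforce record and emit it
  }

  @Override
  public void onRunFinish(boolean succeeded, BatchSourceContext context) {
    // check the bulk job status via the API and throw an exception if it did not complete successfully
  }
}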
Salesforce MapReduce parallelization
One MapReduce split corresponds to a single batch returned by the Salesforce Bulk API. Batches are created internally by the Salesforce processing server and their size cannot be configured, so we just have to go with what we get.
Salesforce will split data into batches according to the following criteria:
- A batch can contain a maximum of 10,000 records.
- A batch can contain a maximum of 10,000,000 characters for all the data in a batch.
- A field can contain a maximum of 32,000 characters.
- A record can contain a maximum of 5,000 fields.
- A record can contain a maximum of 400,000 characters for all its fields.
- A batch must contain some content or an error occurs.
Most significant methods from InputFormat and RecordReader:
SalesforceInputFormat.getSplits():
Create a Salesforce bulk job and return the created batches as separate splits.
SalesforceRecordReader.initialize():
Wait for the current batch status to become 'completed' and get the records from the batch. If the batch status is 'failed', throw an exception.
SalesforceRecordReader.nextKeyValue():
Iterate over the records from the current batch. As the value we will use a single record from the batch; as the key we will use NullWritable, since we don't need MapReduce to reduce any records in our case.
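A minimal sketch of the split creation, assuming the Hadoop InputFormat API; SalesforceSplit, SalesforceRecordReader's no-arg constructor and the helpers that create the bulk connection and query job are hypothetical:

// Sketch only: one InputSplit per Salesforce batch.
// Uses org.apache.hadoop.mapreduce.* and com.sforce.async.*.
public class SalesforceInputFormat extends InputFormat<NullWritable, Map<String, String>> {

  @Override
  public List<InputSplit> getSplits(JobContext context) throws IOException {
    try {
      BulkConnection bulkConnection = createBulkConnection(context.getConfiguration()); // hypothetical helper
      JobInfo job = createQueryJob(bulkConnection, context.getConfiguration());         // hypothetical helper
      List<InputSplit> splits = new ArrayList<>();
      for (BatchInfo batchInfo : bulkConnection.getBatchInfoList(job.getId()).getBatchInfo()) {
        splits.add(new SalesforceSplit(job.getId(), batchInfo.getId())); // hypothetical split class
      }
      return splits;
    } catch (AsyncApiException e) {
      throw new IOException("Failed to create Salesforce bulk job", e);
    }
  }

  @Override
  public RecordReader<NullWritable, Map<String, String>> createRecordReader(InputSplit split, TaskAttemptContext context) {
    // waits for the batch to complete in initialize() and iterates its records in nextKeyValue()
    return new SalesforceRecordReader();
  }
}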
Checklist
- User stories documented
- User stories reviewed
- Design documented
- Design reviewed
- Feature merged
- Examples and guides
- Integration tests
- Documentation for feature
- Short video demonstrating the feature