Salesforce Batch Sink
The Salesforce batch sink is responsible for using the Salesforce API to insert/upsert/update Salesforce objects. The sink should handle large batches of data (~10 GB) and should handle all object types - contacts, campaigns, opportunities, leads, and custom objects. Users should be able to upload a subset of fields.
Use Cases
- As an ETL developer, I would like to consolidate contacts data from multiple source systems and update it in Salesforce
- As an ETL developer, I would like to aggregate the results of a campaign from a data warehouse into Salesforce
Assumptions
- Compound fields such as address and geo location will be represented as nested records in the source system
User Expectations
- Users should be able to configure the login URL
- Hints should be provided in the default values
- Users should be able to configure sensitive information, such as the client id and secret, in the secure store
- Any error in the upload fails the pipeline
- Field level metadata should be captured by the sink
User Configurations
Section | User Configuration Label | Description | Default | User Widget | Early Validations |
---|---|---|---|---|---|
Authentication | Username | Salesforce username | | Text Box | Try to log in to the Bulk API with the given credentials. |
 | Password | Salesforce password | | Password | |
 | Consumer Key | Consumer Key from the connected app | | Text Box | |
 | Consumer Secret | Consumer Secret from the connected app | | Password | |
 | Login Url | For Salesforce sandbox runs the login URL is different, so the user needs this option. | https://login.salesforce.com/services/oauth2/token | Text Box | |
Advanced | SObject | Name of the Salesforce sObject, e.g. Contact, Campaign, Opportunity. | | Text Box | Check if an sObject with the given name exists in the Bulk API. |
 | Operation | Possible values are: Insert, Upsert, Update. | Insert | Select | If the operation is upsert or update, validate that the input schema contains the external id / Id field. |
 | Upsert external id field | External id field name. Used only for upsert. [5] | | Text Box | If empty and the operation is upsert, fail. If not empty and the operation is insert or update, fail. |
 | Maximum bytes per batch | If the size of the batch data is larger than the given number of bytes, split the batch. | 10,000,000 [2] | Text Box | If more than 10,000,000, fail. [3] |
 | Maximum records per batch | If there are more than the given number of records, split the batch. | 10,000 [2] | Text Box | If more than 10,000, fail. [4] |
 | Error handling | The Bulk API returns success results per row, so this is necessary [1] (unlike for source plugins). Possible values: "Skip on error" - ignores any reports about records not inserted and simply prints an error log. | Skip on error | Select | |
[1] Salesforce Bulk API will respond with a result entry for every single inserted record, either "record **ID** was inserted" or "error: **error_message**". During my testing it happened pretty often that an insert was only partially successful. For example:
- for records with some field empty it's SUCCESS, but for records where it's not, it says the field is not insertable, of the wrong type, etc.
- insertion of part of the records was successful, while the rest failed due to "Storage limit exceeded".
- some field is required, so records where it's empty get an insertion error.
[2] defaults for "Maximum bytes per batch" and "Maximum records per batch" are taken from examples in the Salesforce documentation.
[3] according to Bulk API limitations, a batch will fail if it is larger than 10 MB. So if the user sets maximum bytes to more than 10,000,000, we should tell the user that it does not make sense by failing the pipeline.
[4] according to Bulk API limitations, a batch will fail if it has more than 10,000 records. So if the user sets maximum records to more than 10,000, we should tell the user that it does not make sense by failing the pipeline.
[5] see the "Upsert and Update operations" section.
Salesforce Bulk API for INSERT and how we use it.
To insert records into Salesforce via the Bulk API, the following steps have to be taken.
- Connect to Bulk API via OAuth2
- Create a bulk job id
- Split data into batches
- Add batches to the job
- Close job
- Await every batch completion
- Check results for every batch
In this section we will go through the implementation details of every step.
STEP 1. Connect to Bulk API via OAuth2
Using the provided clientId, clientSecret, username and password, the access token and instance URL will be fetched using the username-password flow of OAuth2.
e.g:
grant_type=password&client_id=<your_client_id>&client_secret=<your_client_secret>&username=<your_username>&password=<your_password>
The following parameters are required:
Parameter | Description |
---|---|
grant_type | Set this to password. |
client_id | Application's client identifier. |
client_secret | Application's client secret. |
username | The API user's Salesforce.com username, of the form user@example.com. |
password | The API user's Salesforce.com password. |
Response would be:
{
"id":"https://login.salesforce.com/id/00D50000000IZ3ZEAW/00550000001fg5OAAQ",
"issued_at":"1296509381665",
"instance_url":"https://na1.salesforce.com",
"signature":"+Nbl5EOl/DlsvUZ4NbGDno6vn935XsWGVbwoKyXHayo=",
"access_token":"00D50000000IZ3Z!AQgAQH0Yd9M51BU_rayzAdmZ6NmT3pXZBgzkc3JTwDOGBl8BP2AREOiZzL_A2zg7etH81kTuuQPljJVsX4CPt3naL7qustlb"
}
This access token and instance URL will be used to execute requests via the Bulk and SOAP APIs.
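To make this concrete, here is a minimal sketch of the token request in Java using plain `HttpURLConnection`. The class and helper name are hypothetical, and a real implementation would add proper error handling and JSON parsing.

```java
// Illustrative sketch of the OAuth2 username-password flow described above.
// "fetchTokenResponse" is a hypothetical helper, not part of the plugin.
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;

public class OAuthTokenExample {
  static String fetchTokenResponse(String loginUrl, String clientId, String clientSecret,
                                   String username, String password) throws Exception {
    String body = "grant_type=password"
        + "&client_id=" + URLEncoder.encode(clientId, "UTF-8")
        + "&client_secret=" + URLEncoder.encode(clientSecret, "UTF-8")
        + "&username=" + URLEncoder.encode(username, "UTF-8")
        + "&password=" + URLEncoder.encode(password, "UTF-8");

    HttpURLConnection connection = (HttpURLConnection) new URL(loginUrl).openConnection();
    connection.setRequestMethod("POST");
    connection.setDoOutput(true);
    connection.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
    connection.getOutputStream().write(body.getBytes(StandardCharsets.UTF_8));

    // The response is the JSON document shown above; a real implementation would
    // parse out "access_token" and "instance_url" with a JSON library.
    try (InputStream in = connection.getInputStream();
         Scanner scanner = new Scanner(in, "UTF-8").useDelimiter("\\A")) {
      return scanner.hasNext() ? scanner.next() : "";
    }
  }
}
```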
STEP 2. Create a bulk job id
Ask the Bulk API to create a job and return its id, so we can submit batches to it later.
The job type is either insert, upsert or update.
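A sketch of this step, assuming the force-wsc async client (`com.sforce.async`) is used to talk to the Bulk API from Java; the API version in the REST endpoint is an arbitrary choice for the example.

```java
// Sketch only, assuming the force-wsc async client (com.sforce.async.*).
import com.sforce.async.BulkConnection;
import com.sforce.async.ContentType;
import com.sforce.async.JobInfo;
import com.sforce.async.OperationEnum;
import com.sforce.ws.ConnectorConfig;

public class CreateJobExample {
  static JobInfo createJob(String accessToken, String instanceUrl,
                           String sObject, OperationEnum operation,
                           String externalIdField) throws Exception {
    // Build a BulkConnection from the OAuth access token and instance URL (step 1).
    ConnectorConfig config = new ConnectorConfig();
    config.setSessionId(accessToken);
    // The API version here is an assumption; the plugin would pin its own version.
    config.setRestEndpoint(instanceUrl + "/services/async/45.0");
    BulkConnection bulkConnection = new BulkConnection(config);

    JobInfo job = new JobInfo();
    job.setObject(sObject);                  // e.g. "Contact"
    job.setOperation(operation);             // insert, upsert or update
    job.setContentType(ContentType.CSV);
    if (operation == OperationEnum.upsert) {
      job.setExternalIdFieldName(externalIdField);
    }
    return bulkConnection.createJob(job);    // returned JobInfo carries the job id
  }
}
```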
STEP 3. Split data into batches
Here we create multiple strings with CSV data. Each of them contains many records, and every string represents the data for a separate batch.
Splitting data into batches is done considering these 3 factors:
- User configurations "Maximum bytes per batch" and "Maximum records per batch" must be obeyed.
- Bulk API limitations must be obeyed. Here's the list of them:
  - (A) Batches for data loads can consist of a single CSV that is no larger than 10 MB.
  - (B) A batch can contain a maximum of 10,000 records.
  - (C) A batch can contain a maximum of 10,000,000 characters for all the data in a batch.
  - (D) A field can contain a maximum of 32,000 characters.
  - (E) A record can contain a maximum of 5,000 fields.
  - (F) A record can contain a maximum of 400,000 characters for all its fields.
  - (G) A batch must contain some content or an error occurs.
- How many records come to a specific mapper. For more details see <TODO>.

A, B, C - controlled by splitting the data correctly.
E - checked during schema validation.
D, F, G - if the data which comes into the sink exceeds these, simply let the batch fail. There is nothing we can do about these.
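A minimal sketch of this batching policy is shown below: buffer CSV lines and flush a batch whenever adding one more record would exceed the configured record or byte limits. The class is illustrative only and deliberately ignores limits D-G, which are left to the Bulk API as described above.

```java
// Hypothetical batching helper: enforces "Maximum bytes per batch" and
// "Maximum records per batch" by flushing the current CSV buffer when needed.
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class BatchSplitterSketch {
  private final String header;            // CSV header row, shared by every batch
  private final long maxBytesPerBatch;    // e.g. 10,000,000 (user configuration)
  private final long maxRecordsPerBatch;  // e.g. 10,000 (user configuration)
  private final List<String> batches = new ArrayList<>();

  private StringBuilder current;
  private long bytes;
  private long records;

  BatchSplitterSketch(String header, long maxBytesPerBatch, long maxRecordsPerBatch) {
    this.header = header;
    this.maxBytesPerBatch = maxBytesPerBatch;
    this.maxRecordsPerBatch = maxRecordsPerBatch;
    startNewBatch();
  }

  // Append one CSV line; close the current batch first if this line would not fit.
  void append(String csvLine) {
    long lineBytes = csvLine.getBytes(StandardCharsets.UTF_8).length + 1; // +1 for newline
    if (records + 1 > maxRecordsPerBatch || bytes + lineBytes > maxBytesPerBatch) {
      batches.add(current.toString());
      startNewBatch();
    }
    current.append(csvLine).append('\n');
    bytes += lineBytes;
    records++;
  }

  // Return all batches, including the partially filled last one.
  List<String> finish() {
    if (records > 0) {
      batches.add(current.toString());
    }
    return batches;
  }

  private void startNewBatch() {
    current = new StringBuilder(header).append('\n');
    bytes = header.getBytes(StandardCharsets.UTF_8).length + 1;
    records = 0;
  }
}
```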
STEP 4. Add batches to the job
Pass a ByteArrayInputStream of the CSV string to the Bulk API and ask it to create a batch from it.
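Under the same force-wsc assumption, submitting one CSV string as a batch is roughly:

```java
// Sketch: submit one CSV string as a batch of the given job (force-wsc client assumed).
import com.sforce.async.BatchInfo;
import com.sforce.async.BulkConnection;
import com.sforce.async.JobInfo;
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;

public class AddBatchExample {
  static BatchInfo addBatch(BulkConnection bulkConnection, JobInfo job, String csv)
      throws Exception {
    try (ByteArrayInputStream in =
             new ByteArrayInputStream(csv.getBytes(StandardCharsets.UTF_8))) {
      return bulkConnection.createBatchFromStream(job, in);
    }
  }
}
```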
STEP 5. Close Job
Ask Bulk API to close the job. This means that no more batches will be expected by Salesforce.
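With the same force-wsc assumption, closing the job is a single call:

```java
// Sketch: closing the job tells Salesforce that no more batches will be added to it.
import com.sforce.async.BulkConnection;
import com.sforce.async.JobInfo;

public class CloseJobExample {
  static void closeJob(BulkConnection bulkConnection, JobInfo job) throws Exception {
    bulkConnection.closeJob(job.getId());
  }
}
```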
STEP 6. Await every batch completion
Poll every batch's status until it is either Completed or Failed. The Salesforce server enforces a timeout on batch processing, so we don't need to implement any timeout logic here.
If any of the batches fail, we fail the pipeline with an exception.
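A sketch of the polling loop, again assuming the force-wsc client; the 30-second interval is an arbitrary choice for the example.

```java
// Sketch: await completion of every submitted batch by polling the Bulk API.
import com.sforce.async.BatchInfo;
import com.sforce.async.BatchStateEnum;
import com.sforce.async.BulkConnection;

public class AwaitBatchesExample {
  static void awaitCompletion(BulkConnection bulkConnection, String jobId,
                              Iterable<String> batchIds) throws Exception {
    for (String batchId : batchIds) {
      while (true) {
        BatchInfo info = bulkConnection.getBatchInfo(jobId, batchId);
        if (info.getState() == BatchStateEnum.Completed) {
          break;
        }
        if (info.getState() == BatchStateEnum.Failed) {
          // Per the design above, any failed batch fails the whole pipeline.
          throw new RuntimeException("Batch " + batchId + " failed: " + info.getStateMessage());
        }
        Thread.sleep(30_000); // polling interval chosen arbitrarily for this sketch
      }
    }
  }
}
```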
STEP 7. Check results for every batch
Query the Bulk API for the results of inserting every single record. The result is a CSV; every row of it represents the insertion result of a single record. Generally it looks like this:
Id,Success,Created,Error
fa4t2fggee,true,true,
rqewetrter,true,true,
,false,false,Field 'Name' is required and cannot be empty for sObject 'Contact'
gre3jvd245,true,true,
Records which have success=false are considered erroneous. Erroneous records are processed according to the user configuration "Error handling". For more information see the section "User Configurations".
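A sketch of reading that results CSV and applying the "Skip on error" strategy; using Apache Commons CSV for parsing is an assumption made for the example.

```java
// Sketch: check per-record results of one batch and apply the error handling strategy.
import com.sforce.async.BulkConnection;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;

import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

public class CheckResultsExample {
  static void checkResults(BulkConnection bulkConnection, String jobId, String batchId,
                           boolean skipOnError) throws Exception {
    try (InputStream results = bulkConnection.getBatchResultStream(jobId, batchId);
         CSVParser parser = CSVParser.parse(
             new InputStreamReader(results, StandardCharsets.UTF_8),
             CSVFormat.DEFAULT.withFirstRecordAsHeader())) {
      for (CSVRecord row : parser) {
        boolean success = Boolean.parseBoolean(row.get("Success"));
        if (!success) {
          String message = "Record failed to insert: " + row.get("Error");
          if (skipOnError) {
            System.err.println(message);   // "Skip on error": log and continue
          } else {
            throw new RuntimeException(message);
          }
        }
      }
    }
  }
}
```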
MapReduce Parallelization
BatchSink class uses NullWritable as key and CSVRecord as value.
BatchSink#initialize
Check which fields for given sObject are creatable by querying Salesforce SOAP API (for more information see "Policy regarding unknown or non-creatable fields")
BatchSink#transform
Create a CSVRecord instance from the StructuredRecord. In the process, convert logical types to the format expected by Salesforce.
OutputCommitter#setupJob
Create Salesforce job, which the batches will be added to.
OutputCommitter#commitJob
Close Salesforce job.
RecordWriter#constructor
Establish a connection to the Salesforce Bulk API.
RecordWriter#write
Append the record to a CSV StringBuilder. If, according to our batching policy, the record needs to go into a new batch, we submit the current CSV string to the Salesforce job as a separate batch. After that a new StringBuilder is created and the record is appended to it.
For information on how we calculate batches, please see "Splitting data into batches".
RecordWriter#close
- Submit current CSV StringBuilder as a batch to Salesforce job.
- Wait for completion of EVERY batch which was submitted by the current mapper.
- Check the results for every record in EVERY batch (submitted by the current mapper), and act on them according to the error handling strategy configured by the user.
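A compact, illustrative sketch of the RecordWriter flow described above; it reuses the same buffering idea as the batching sketch and the force-wsc calls from the earlier steps, and the class and method names are not the plugin's real ones.

```java
// Hypothetical RecordWriter-style class: buffer CSV lines, submit batches to the
// Salesforce job, then (on close) defer to the await/check logic sketched earlier.
import com.sforce.async.BatchInfo;
import com.sforce.async.BulkConnection;
import com.sforce.async.JobInfo;

import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class SalesforceRecordWriterSketch {
  private final BulkConnection bulkConnection;
  private final JobInfo job;
  private final String header;
  private final long maxBytesPerBatch;
  private final long maxRecordsPerBatch;
  private final List<BatchInfo> submittedBatches = new ArrayList<>();

  private StringBuilder csv;
  private long bytes;
  private long records;

  SalesforceRecordWriterSketch(BulkConnection bulkConnection, JobInfo job, String header,
                               long maxBytesPerBatch, long maxRecordsPerBatch) {
    this.bulkConnection = bulkConnection;
    this.job = job;
    this.header = header;
    this.maxBytesPerBatch = maxBytesPerBatch;
    this.maxRecordsPerBatch = maxRecordsPerBatch;
    startNewBatch();
  }

  // Called for every record: append it to the current CSV, submitting a batch
  // first if the record would not fit into it.
  void write(String csvLine) throws Exception {
    long lineBytes = csvLine.getBytes(StandardCharsets.UTF_8).length + 1;
    if (records + 1 > maxRecordsPerBatch || bytes + lineBytes > maxBytesPerBatch) {
      submitCurrentBatch();
    }
    csv.append(csvLine).append('\n');
    bytes += lineBytes;
    records++;
  }

  // Called on task finalization: submit the last batch, then wait for and check
  // every batch submitted by this mapper (see the earlier await/check sketches).
  void close() throws Exception {
    if (records > 0) {
      submitCurrentBatch();
    }
    // awaitCompletion(...) and checkResults(...) from the earlier sketches run here.
  }

  private void submitCurrentBatch() throws Exception {
    try (ByteArrayInputStream in =
             new ByteArrayInputStream(csv.toString().getBytes(StandardCharsets.UTF_8))) {
      submittedBatches.add(bulkConnection.createBatchFromStream(job, in));
    }
    startNewBatch();
  }

  private void startNewBatch() {
    csv = new StringBuilder(header).append('\n');
    bytes = header.getBytes(StandardCharsets.UTF_8).length + 1;
    records = 0;
  }
}
```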
Other points
Validating schema
- An sObject contains a lot of fields which cannot be inserted (non-creatable fields), like Id, IsDeleted, LastModifiedDate and many other fields which are often auto-generated. We do early validation and check if the schema contains fields which are not present or not creatable in the target sObject.
- If the operation is upsert, check that the external id field is in the schema.
- If the operation is update, check that the Id field is in the schema.
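A sketch of the describe-based check for non-creatable fields, assuming the force-wsc SOAP Partner client (`com.sforce.soap.partner`); the exact accessor names on the generated `Field` stub (e.g. `isCreateable()`) are an assumption here.

```java
// Sketch: reject schema fields that are unknown or non-creatable in the target sObject.
import com.sforce.soap.partner.DescribeSObjectResult;
import com.sforce.soap.partner.Field;
import com.sforce.soap.partner.PartnerConnection;

import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class SchemaValidationExample {
  static void validate(PartnerConnection connection, String sObject,
                       List<String> schemaFieldNames) throws Exception {
    DescribeSObjectResult describe = connection.describeSObject(sObject);
    Set<String> creatable = new HashSet<>();
    for (Field field : describe.getFields()) {
      if (field.isCreateable()) {        // accessor name assumed from the partner stubs
        creatable.add(field.getName());
      }
    }
    for (String name : schemaFieldNames) {
      if (!creatable.contains(name)) {
        throw new IllegalArgumentException(String.format(
            "Field '%s' is not present or not creatable in sObject '%s'", name, sObject));
      }
    }
  }
}
```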
Converting fields
We will have to convert logical types like date, datetime and time from long to the string formats accepted by Salesforce. Other types won't require converting.
Type | format expected by Salesforce |
---|---|
date | yyyy-mm-dd |
datetime | ISO 8601 |
time | HH:mm:ss,SSS |
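A sketch of that conversion using `java.time`; the assumed source representations (days since epoch for date, millis since epoch for datetime, millis of day for time) are illustrative and may not match the plugin's actual encodings.

```java
// Sketch: convert epoch-based logical type values into the formats from the table above.
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;

public class LogicalTypeConversionExample {
  private static final DateTimeFormatter TIME_FORMAT =
      DateTimeFormatter.ofPattern("HH:mm:ss,SSS");

  // date: days since epoch -> yyyy-mm-dd
  static String formatDate(int daysSinceEpoch) {
    return LocalDate.ofEpochDay(daysSinceEpoch).toString(); // e.g. 2019-01-31
  }

  // datetime: millis since epoch -> ISO 8601
  static String formatDatetime(long millisSinceEpoch) {
    return Instant.ofEpochMilli(millisSinceEpoch).toString(); // e.g. 2019-01-31T10:15:30Z
  }

  // time: millis of day -> HH:mm:ss,SSS
  static String formatTime(long millisOfDay) {
    return LocalTime.ofNanoOfDay(millisOfDay * 1_000_000L).format(TIME_FORMAT);
  }
}
```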
Compound fields
For source plugins we don't use nested records for compound fields. Instead we ask the user to provide sub-fields for every compound field separately:
Compound fields %s cannot be fetched when a SOQL query is given.
Please specify the individual attributes instead of compound field name in SOQL query.
For example, instead of ‘Select BillingAddress ...’, use ‘Select BillingCountry, BillingCity, BillingStreet ...
I think handling of these fields should be consistent across all Salesforce plugins, meaning the sink needs to fail with a similar error message if somebody tries to insert a compound field directly. The message would state that the user should fully qualify every sub-field they want to insert. For example, if there is a 'BillingAddress' field of type address in the sObject, the sink will expect 'BillingCountry', 'BillingCity', etc. fields.
Emitting errors cannot be done
It would be great to emit errors from this plugin, since it's very common for a part of the records to fail to insert (but not all of them). Unfortunately SinkEmitter does not support emitting errors (codelink), and even if it did, it would not help: we don't know about the errors at the transform stage. We only know about them once the batches are submitted and the results are checked, which happens on task finalization (RecordWriter#close).
Upsert and Update operations
Upsert - Salesforce requires the user to provide an external id field name. This field is used as the basis for the upsert (Salesforce decides whether objects are the same using it).
The Id field can be used for that (it is present for all Salesforce sObjects). The user can also create a custom field and mark it as "External Id" via the Salesforce UI.
Update - Specifying an external id field is not supported. For updates Salesforce always uses the 'Id' field as the basis.