The Salesforce batch sink is responsible for using the Salesforce API to upsert Salesforce objects. The sink should handle large batches of data (~10 GB) and all object types - contacts, campaigns, opportunities, leads, and custom objects. Users should be able to upload a subset of fields.

...

  • Users should be able to configure the login URL
    • Hints should be provided in the default value
  • Users should be able to configure sensitive information such as the client ID and secret in the secure store
  • Any error in the upload fails the pipeline
  • Field-level metadata should be captured by the sink

User Configurations

| Section | User Configuration Label | Label Description | Options | Default | Variable | User Widget |
| --- | --- | --- | --- | --- | --- | --- |
| Standard | Login REST URL | A REST API for Salesforce cloud | | | salesforce-rest-url | Text Box |
| | Object Type | Type of Salesforce object - ex: contacts, campaigns, opportunities | | | record-type | Drop down |
| | Username | Salesforce username | | | username | Text Box |
| | Shared Secret | Salesforce password | | | password | Text Box |
| Advanced | Batch Size | Batch size of the upload | | | batch-size | |
| Section | User Configuration Label | Label Description | Default | User Widget | Early Validations |
| --- | --- | --- | --- | --- | --- |
| Authentication | Username | Salesforce username | | Text Box | Try to log in to the Bulk API with the given credentials. |
| | Password | | | Password | |
| | Consumer Key | Consumer Key from the connected app | | Text Box | |
| | Consumer Secret | Consumer Secret from the connected app | | Password | |
| | Login Url | For Salesforce sandbox runs the login URL is different, so the user needs this option. | https://login.salesforce.com/services/oauth2/token | Text Box | |
| Advanced | SObject | Name of the Salesforce sObject - ex: Contact, Campaign, Opportunity. | | Text Box | Check if an sObject with the given name exists in the Bulk API. |
| | Maximum bytes per batch | If the size of the batch data is larger than the given number of bytes, split the batch. | 10,000,000 [2] | Text Box | If set to more than 10,000,000, fail [3] |
| | Maximum records per batch | If there are more than the given number of records, split the batch. | 10,000 [2] | Text Box | If set to more than 10,000, fail [4] |
| | Error handling | The Bulk API returns a success result per row, so this is necessary [1] (unlike for source plugins). Possible values: "Skip on error" - ignores any reports about records not inserted; "Send to error" - sends records which failed to insert to the error handler; "Stop on error" - fails the pipeline if any record failed on insertion. | | Select | |
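To make the configuration surface concrete, here is a minimal sketch of how these options could map onto a CDAP PluginConfig class. This is an illustration only: the field names, the defaults noted in comments, and the annotation packages are assumptions (they vary by CDAP version), not the final plugin code.

Code Block
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Macro;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.plugin.PluginConfig;

import javax.annotation.Nullable;

// Illustrative sketch only: field names mirror the table above.
public class SalesforceSinkConfig extends PluginConfig {

  @Name("username")
  @Description("Salesforce username")
  @Macro
  private String username;

  @Name("password")
  @Description("Salesforce password")
  @Macro
  private String password;

  @Name("consumerKey")
  @Description("Consumer Key from the connected app")
  @Macro
  private String consumerKey;

  @Name("consumerSecret")
  @Description("Consumer Secret from the connected app")
  @Macro
  private String consumerSecret;

  @Name("loginUrl")
  @Description("Login URL; differs for Salesforce sandbox runs")
  @Macro
  private String loginUrl; // default: https://login.salesforce.com/services/oauth2/token

  @Name("sObject")
  @Description("Name of the Salesforce sObject, e.g. Contact, Campaign, Opportunity")
  @Macro
  private String sObject;

  @Name("maxBytesPerBatch")
  @Description("If batch data is larger than this number of bytes, split the batch")
  @Nullable
  @Macro
  private Long maxBytesPerBatch; // default 10,000,000; values above 10,000,000 fail validation

  @Name("maxRecordsPerBatch")
  @Description("If a batch has more than this number of records, split the batch")
  @Nullable
  @Macro
  private Long maxRecordsPerBatch; // default 10,000; values above 10,000 fail validation

  @Name("errorHandling")
  @Description("Skip on error, Send to error, or Stop on error")
  private String errorHandling;
}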

[1] The Salesforce Bulk API will respond with a result entry for every inserted record: either "record **ID** was inserted" or "error: **error_message**". During my testing it happened pretty often that the insert was only partially successful. For example:

  • for records where some field is empty the result is SUCCESS, but for records where it is not, it says the field is not insertable, is of the wrong type, etc.
  • insertion of part of the records was successful, while the rest failed due to "Storage limit exceeded".
  • some field is required, so records where it is empty get an insertion error.

[2] The defaults for "Maximum bytes per batch" and "Maximum records per batch" are taken from examples in the Salesforce documentation.

[3] According to the Bulk API limitations, a batch will fail if it is larger than 10 MB. So if the user sets the maximum bytes to more than 10,000,000 we should tell the user that it does not make sense by failing the pipeline.

[4] According to the Bulk API limitations, a batch will fail if it has more than 10,000 records. So if the user sets the maximum records to more than 10,000 we should tell the user that it does not make sense by failing the pipeline.

Salesforce Bulk API for INSERT and how we use it.

To insert records into Salesforce via the Bulk API, the following steps have to be taken.

  1. Connect to Bulk API via OAuth2
  2. Create a bulk job id
  3. Split data into batches
  4. Add batches to the job
  5. Close job
  6. Await every batch completion
  7. Check results for every batch

In this section we will go through the implementation details of every step.

STEP 1.  Connect to Bulk API via OAuth2

Using the provided clientId, clientSecret, username and password, the access token and instance URL will be fetched using the username-password flow of OAuth2.

e.g:

grant_type=password&client_id=<your_client_id>&client_secret=<your_client_secret>&username=<your_username>&password=<your_password>

The following parameters are required:

  • grant_type - Set this to password.
  • client_id - Application's client identifier.
  • client_secret - Application's client secret.
  • username - The API user's Salesforce.com username, of the form user@example.com.
  • password - The API user's Salesforce.com password.


The response would be:

{
"id":"https://login.salesforce.com/id/00D50000000IZ3ZEAW/00550000001fg5OAAQ",
"issued_at":"1296509381665",
"instance_url":"https://na1.salesforce.com",
"signature":"+Nbl5EOl/DlsvUZ4NbGDno6vn935XsWGVbwoKyXHayo=",
"access_token":"00D50000000IZ3Z!AQgAQH0Yd9M51BU_rayzAdmZ6NmT3pXZBgzkc3JTwDOGBl8BP2AREOiZzL_A2zg7etH81kTuuQPljJVsX4CPt3naL7qustlb"
}

This access token and instance URL will be used to execute queries via the Bulk and SOAP APIs.
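For illustration, below is a minimal sketch of this token request using Java 11's built-in HTTP client. The endpoint and form parameters come from the example above; the placeholder values, the class name, and the omission of JSON parsing and error handling are simplifications, not the final implementation.

Code Block
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

public class SalesforceOAuthExample {

  public static void main(String[] args) throws Exception {
    String loginUrl = "https://login.salesforce.com/services/oauth2/token";

    // Username-password flow: all five parameters are required.
    String form = "grant_type=password"
        + "&client_id=" + enc("<your_client_id>")
        + "&client_secret=" + enc("<your_client_secret>")
        + "&username=" + enc("<your_username>")
        + "&password=" + enc("<your_password>");

    HttpRequest request = HttpRequest.newBuilder()
        .uri(URI.create(loginUrl))
        .header("Content-Type", "application/x-www-form-urlencoded")
        .POST(HttpRequest.BodyPublishers.ofString(form))
        .build();

    HttpResponse<String> response = HttpClient.newHttpClient()
        .send(request, HttpResponse.BodyHandlers.ofString());

    // The JSON body contains "access_token" and "instance_url";
    // parse it with a JSON library of your choice.
    System.out.println(response.body());
  }

  private static String enc(String value) {
    return URLEncoder.encode(value, StandardCharsets.UTF_8);
  }
}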

STEP 2. Create a bulk job id

Ask the Bulk API to create a job and return its ID, so that we can submit batches to it later.

The job operation is set to insert, not upsert, since for upsert we would need to know the ID of every record we update.
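A sketch of this step, assuming the plugin talks to the Bulk API through the Salesforce WSC library (com.sforce.async); the library choice, the helper class, and the API version in the endpoint are assumptions:

Code Block
import com.sforce.async.AsyncApiException;
import com.sforce.async.BulkConnection;
import com.sforce.async.ContentType;
import com.sforce.async.JobInfo;
import com.sforce.async.OperationEnum;
import com.sforce.ws.ConnectorConfig;

public final class BulkJobFactory {

  /** Creates a BulkConnection from the OAuth result of step 1. */
  public static BulkConnection connect(String accessToken, String instanceUrl)
      throws AsyncApiException {
    ConnectorConfig config = new ConnectorConfig();
    config.setSessionId(accessToken);
    config.setRestEndpoint(instanceUrl + "/services/async/45.0"); // example API version
    return new BulkConnection(config);
  }

  /** Creates an insert job for the given sObject and returns it with its ID assigned. */
  public static JobInfo createInsertJob(BulkConnection connection, String sObjectName)
      throws AsyncApiException {
    JobInfo job = new JobInfo();
    job.setObject(sObjectName);             // e.g. "Contact"
    job.setOperation(OperationEnum.insert); // insert, not upsert
    job.setContentType(ContentType.CSV);
    return connection.createJob(job);
  }
}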

STEP 3. Split data into batches

Here we create multiple CSV files. Each of them contains a multitude of records.

Splitting data into batches is done considering these three factors (a splitting sketch follows the list):

  1. User configurations "Maximum bytes per batch" and "Maximum records per batch" must be obeyed.
  2. Bulk API limitations must be obeyed. Here's the list of them:
    A. Batches for data loads can consist of a single CSV file that is no larger than 10 MB.
    B. A batch can contain a maximum of 10,000 records.
    C. A batch can contain a maximum of 10,000,000 characters for all the data in a batch.
    D. A field can contain a maximum of 32,000 characters.
    E. A record can contain a maximum of 5,000 fields.
    F. A record can contain a maximum of 400,000 characters for all its fields.
    G. A batch must contain some content or an error occurs.
    How we handle these:

    A, B, C - controlled by splitting the data correctly.
    E - checked during schema validation.
    D, F, G - if the data that comes into the sink exceeds these, simply let the batch fail. There is nothing we can do about these.

  3. How many records come to a specific mapper. For more details see <TODO>.
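A minimal sketch of the splitting logic for factor 1 and limitations A-C, assuming records arrive already serialized as CSV lines; the class and method names are illustrative, and the real implementation would also prepend the CSV header to every batch:

Code Block
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public final class CsvBatchSplitter {

  /**
   * Groups CSV record lines into batches so that no batch exceeds
   * maxBytesPerBatch bytes or maxRecordsPerBatch records.
   * Each returned string is the content of one batch file.
   */
  public static List<String> split(List<String> csvLines,
                                   long maxBytesPerBatch,
                                   int maxRecordsPerBatch) {
    List<String> batches = new ArrayList<>();
    StringBuilder current = new StringBuilder();
    long bytesInBatch = 0;
    int recordsInBatch = 0;

    for (String line : csvLines) {
      long lineBytes = line.getBytes(StandardCharsets.UTF_8).length + 1; // +1 for '\n'

      // Start a new batch if adding this record would violate either limit.
      if (recordsInBatch > 0
          && (bytesInBatch + lineBytes > maxBytesPerBatch || recordsInBatch >= maxRecordsPerBatch)) {
        batches.add(current.toString());
        current = new StringBuilder();
        bytesInBatch = 0;
        recordsInBatch = 0;
      }

      current.append(line).append('\n');
      bytesInBatch += lineBytes;
      recordsInBatch++;
    }

    if (recordsInBatch > 0) {
      batches.add(current.toString()); // limitation G: a batch must contain some content
    }
    return batches;
  }
}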

STEP 4. Add batches to the job

Pass a FileInputStream of the CSV file to the Bulk API and ask it to create a batch using this file.
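Continuing the WSC-based sketch (same assumptions as in step 2), adding one CSV file as a batch could look like this:

Code Block
import com.sforce.async.AsyncApiException;
import com.sforce.async.BatchInfo;
import com.sforce.async.BulkConnection;
import com.sforce.async.JobInfo;

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Path;

public final class BatchUploader {

  /** Uploads one CSV batch file to the job and returns the created batch's metadata. */
  public static BatchInfo addBatch(BulkConnection connection, JobInfo job, Path csvFile)
      throws AsyncApiException, IOException {
    try (InputStream csvStream = new FileInputStream(csvFile.toFile())) {
      return connection.createBatchFromStream(job, csvStream);
    }
  }
}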

STEP 5. Close Job

Ask the Bulk API to close the job. This means that no more batches will be expected by Salesforce.
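With the same assumed library, closing the job is a single call:

Code Block
import com.sforce.async.AsyncApiException;
import com.sforce.async.BulkConnection;
import com.sforce.async.JobInfo;

public final class JobCloser {

  /** Closes the job; Salesforce will not accept new batches for it afterwards. */
  public static JobInfo close(BulkConnection connection, String jobId) throws AsyncApiException {
    return connection.closeJob(jobId);
  }
}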

STEP 6. Await every batch completion

Poll every batch's status until the status is either Completed or Failed. The Salesforce server enforces a timeout on batch processing, so we don't need to implement any timeout logic here.

If any of the batches fail, we fail the pipeline with an exception.
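A sketch of the waiting loop under the same WSC assumption; the poll interval is arbitrary and the exception type is illustrative:

Code Block
import com.sforce.async.AsyncApiException;
import com.sforce.async.BatchInfo;
import com.sforce.async.BatchStateEnum;
import com.sforce.async.BulkConnection;

import java.util.List;

public final class BatchAwaiter {

  /** Polls each batch until it is Completed or Failed; throws if any batch failed. */
  public static void awaitCompletion(BulkConnection connection, String jobId, List<String> batchIds)
      throws AsyncApiException, InterruptedException {
    for (String batchId : batchIds) {
      while (true) {
        BatchInfo info = connection.getBatchInfo(jobId, batchId);
        if (info.getState() == BatchStateEnum.Completed) {
          break;
        }
        if (info.getState() == BatchStateEnum.Failed) {
          throw new RuntimeException("Batch " + batchId + " failed: " + info.getStateMessage());
        }
        Thread.sleep(5000); // arbitrary poll interval; Salesforce enforces its own timeout
      }
    }
  }
}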

STEP 7. Check results for every batch

Query the Bulk API for the results of inserting every single record. The result will be a CSV containing one row per record with that record's insertion result. Generally it will look like this:

Code Block
Id,Success,Created,Error
fa4t2fggee,true,true,
rqewetrter,true,true,
,false,false,Field 'Name' is required and cannot be empty for sObject 'Contact'
gre3jvd245,true,true,

Records which have either success=false or created=false are considered erroneous. Erroneous records are processed according to the "Error handling" user configuration. For more information see the section "User Configurations".
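A sketch of reading and classifying the per-record results, again assuming the WSC library and the column layout shown above; the error handling in the comment is where the user's "Error handling" choice would apply:

Code Block
import com.sforce.async.AsyncApiException;
import com.sforce.async.BulkConnection;
import com.sforce.async.CSVReader;

import java.io.IOException;
import java.util.List;

public final class BatchResultChecker {

  /** Reports every erroneous record (success=false or created=false) in one batch. */
  public static void checkResults(BulkConnection connection, String jobId, String batchId)
      throws AsyncApiException, IOException {
    CSVReader reader = new CSVReader(connection.getBatchResultStream(jobId, batchId));

    List<String> header = reader.nextRecord(); // Id, Success, Created, Error
    List<String> row;
    while ((row = reader.nextRecord()) != null) {
      boolean success = Boolean.parseBoolean(row.get(header.indexOf("Success")));
      boolean created = Boolean.parseBoolean(row.get(header.indexOf("Created")));
      if (!success || !created) {
        // Handle according to the "Error handling" configuration: skip, send to error, or fail.
        System.err.println("Record failed: " + row.get(header.indexOf("Error")));
      }
    }
  }
}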



MapReduce Parallelization

<TODO>


Other TODOs:

  • Converting logical types
  • Compound fields
  • Filtering non-existent and non-creatable fields.