The Salesforce batch sink is responsible for using the Salesforce API to insert/upsert/update Salesforce objects. The sink should handle large batches of data (~10 GB) and all object types: contacts, campaigns, opportunities, leads, custom objects. Users should be able to upload a subset of fields.

...

| Section | User Configuration Label | Description | Default | User Widget | Early Validations |
| --- | --- | --- | --- | --- | --- |
| Authentication | Username | Salesforce username | | Text Box | Try to log in to Bulk API with the given credentials. |
| | Password | | | Password | |
| | Consumer Key | Consumer Key from the connected app | | Text Box | |
| | Consumer Secret | Consumer Secret from the connected app | | Password | |
| | Login Url | For Salesforce sandbox runs the login url is different, which is why the user needs this option. | https://login.salesforce.com/services/oauth2/token | Text Box | |
| Advanced | SObject | Name of Salesforce sObject - ex: Contact, Campaign, Opportunity. | | Text Box | Check if sObject with given name exists in Bulk API. |
| | Operation | Possible values are: Insert, Upsert [5], Update [5] | Insert | Select | If operation is upsert or update, validate input schema to contain id/external id fields. |
| | Upsert external id field | External id field name. Used only for upsert. [5] | | Text Box | If empty and operation is upsert, fail. If not empty and operation is insert or update, fail. |
| | Maximum bytes per batch | If size of batch data is larger than the given number of bytes, split the batch. | 10,000,000 [2] | Text Box | If more than 10,000,000, then fail [3] |
| | Maximum records per batch | If there are more than the given number of records, split the batch. | 10,000 [2] | Text Box | If more than 10,000, fail [4] |
| | Error handling | Bulk API will return success results per row so this is necessary [1] (unlike for source plugins). Possible values: "Skip on error" - ignores any reports about records not inserted and simply prints an error log. "Stop on error" - fails the pipeline if any of the records failed on insertion. | Skip on error | Select | |
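
The early validations for the Advanced settings can be sketched roughly as follows. This is a minimal sketch, not the plugin's actual code: the class name `SinkConfigValidator`, the method signature, and the error texts are hypothetical. Only the rules themselves (the external id field is required for upsert and forbidden otherwise, and the two batch limits) come from the table above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Hypothetical helper: names and messages are illustrative, not taken from the plugin.
final class SinkConfigValidator {
  // Upper bounds from the "Early Validations" column, see footnotes [3] and [4].
  static final long MAX_BYTES_PER_BATCH_LIMIT = 10_000_000L;
  static final long MAX_RECORDS_PER_BATCH_LIMIT = 10_000L;

  /** Returns a list of problems; an empty list means the configuration passed. */
  static List<String> validate(String operation, String externalIdField,
                               long maxBytesPerBatch, long maxRecordsPerBatch) {
    List<String> errors = new ArrayList<>();
    String op = operation == null ? "" : operation.trim().toLowerCase(Locale.ROOT);
    boolean hasExternalId = externalIdField != null && !externalIdField.trim().isEmpty();

    if (!op.equals("insert") && !op.equals("upsert") && !op.equals("update")) {
      errors.add("Unknown operation: " + operation);
    } else if (op.equals("upsert") && !hasExternalId) {
      errors.add("Upsert requires an external id field");
    } else if (!op.equals("upsert") && hasExternalId) {
      errors.add("External id field may only be set for upsert");
    }
    if (maxBytesPerBatch > MAX_BYTES_PER_BATCH_LIMIT) {
      errors.add("Maximum bytes per batch exceeds " + MAX_BYTES_PER_BATCH_LIMIT);
    }
    if (maxRecordsPerBatch > MAX_RECORDS_PER_BATCH_LIMIT) {
      errors.add("Maximum records per batch exceeds " + MAX_RECORDS_PER_BATCH_LIMIT);
    }
    return errors;
  }
}
```

Collecting all problems in a list, rather than failing on the first one, lets the user fix several settings in one pass.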

[1] Salesforce Bulk API responds with a result entry for every single record inserted: either "record **ID** was inserted" or "error: **error_message**". During my testing it happened pretty often that an insert was only partially successful. For example:

...

[4] According to Bulk API Limitations, a batch will fail if it has more than 10,000 records. So if the user sets maximum records to more than 10,000, we should tell the user that it does not make sense by failing the pipeline.

[5] see Upsert and update section.

Salesforce Bulk API for INSERT and how we use it.

...

Ask Bulk API to create a job for us and return its id, so we can submit batches to it later.

The job type is set to insert, not upsert, since for an upsert we need to know the IDs of every record we upsert or update.

STEP 3. Split data into batches

Here we create multiple strings with CSV data. Each of them contains multiple records. Each string represents the data for a separate batch.

Splitting data into batches is done considering these 3 factors:

  1. User configurations "Maximum bytes per batch" and "Maximum records per batch" must be obeyed.
  2. Bulk API limitations must be obeyed. Here's the list of them:
    A. Batches for data loads can consist of a single CSV that is no larger than 10 MB.
    B. A batch can contain a maximum of 10,000 records.
    C. A batch can contain a maximum of 10,000,000 characters for all the data in a batch.
    D. A field can contain a maximum of 32,000 characters.
    E. A record can contain a maximum of 5,000 fields.
    F. A record can contain a maximum of 400,000 characters for all its fields.
    G. A batch must contain some content or an error occurs.
    How we handle these:

    A,B,C - controlled by splitting the data correctly.
    E - checked during schema validation.
    D,F,G - if the data arriving at the sink exceeds these, simply let the batch fail. Nothing we can do about these.

  3. How many records come to a specific mapper. For more details see <TODO>.
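
The splitting policy for the byte and record limits could look roughly like the sketch below. It is an illustration under assumptions: `CsvBatchSplitter` and its signature are hypothetical, rows are assumed to be already CSV-escaped, the header row is assumed to be repeated at the top of every batch, and size is measured in UTF-8 bytes. A single row that is larger than the limit still gets its own batch and is left to fail on the Salesforce side.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: illustrates the splitting policy only.
final class CsvBatchSplitter {

  /** Splits rows into CSV strings, each within maxBytes and maxRecords where possible. */
  static List<String> split(String header, List<String> rows, long maxBytes, int maxRecords) {
    List<String> batches = new ArrayList<>();
    long headerBytes = byteLength(header) + 1; // +1 for the trailing newline
    StringBuilder current = new StringBuilder(header).append('\n');
    long currentBytes = headerBytes;
    int currentRecords = 0;

    for (String row : rows) {
      long rowBytes = byteLength(row) + 1;
      boolean overflow = currentRecords + 1 > maxRecords || currentBytes + rowBytes > maxBytes;
      // Never flush an empty batch: a batch without content is an error.
      if (currentRecords > 0 && overflow) {
        batches.add(current.toString());
        current = new StringBuilder(header).append('\n');
        currentBytes = headerBytes;
        currentRecords = 0;
      }
      current.append(row).append('\n');
      currentBytes += rowBytes;
      currentRecords++;
    }
    if (currentRecords > 0) {
      batches.add(current.toString());
    }
    return batches;
  }

  private static long byteLength(String s) {
    return s.getBytes(StandardCharsets.UTF_8).length;
  }
}
```

With an empty input the method returns no batches at all, so nothing is submitted.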

STEP 4. Add batches to CSV

Pass a ByteArrayInputStream of the CSV string to Bulk API and ask it to create a batch using it.
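
A minimal sketch of this step, under assumptions: the `BatchSubmitter` interface below is a hypothetical stand-in for whatever Bulk API client call creates a batch from a stream, and UTF-8 is assumed as the encoding. Only the use of a ByteArrayInputStream over the CSV string comes from the text above.

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: shows how a CSV string becomes the stream handed to the API client.
final class BatchStreams {

  /** Hypothetical stand-in for the Bulk API client call; returns a batch id. */
  interface BatchSubmitter {
    String submit(InputStream csv);
  }

  /** Wraps the CSV string in an in-memory stream and submits it as one batch. */
  static String submitCsv(String csv, BatchSubmitter submitter) {
    // No temporary file is needed; the bytes stay in memory.
    InputStream in = new ByteArrayInputStream(csv.getBytes(StandardCharsets.UTF_8));
    return submitter.submit(in);
  }
}
```

Because multi-byte characters take more than one byte in UTF-8, the byte length of the stream can exceed the character length of the string.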

STEP 5. Close Job

Ask Bulk API to close the job. This means that no more batches will be expected by Salesforce.

...

Code Block
Id,Success,Created,Error
fa4t2fggee,true,true,
rqewetrter,true,true,
,false,false,Field 'Name' is required and cannot be empty for sObject 'Contact'
gre3jvd245,true,true,

Records which have success=false are considered erroneous. Erroneous records are processed according to the user configuration "Error handling". For more information see the section "User configurations".
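
Checking a batch result and applying the error handling strategy might look like the sketch below. The class `BatchResultChecker`, the enum, and the log text are hypothetical. The parsing is deliberately simplified and assumes Error is the last column, so a plain split with a limit of 4 is enough. Real code should use a proper CSV parser, since values may be quoted.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: simplified parsing of the "Id,Success,Created,Error" result CSV.
final class BatchResultChecker {
  enum ErrorHandling { SKIP_ON_ERROR, STOP_ON_ERROR }

  /** Returns the error messages of all rows with Success != true. */
  static List<String> failedRows(String resultCsv) {
    List<String> errors = new ArrayList<>();
    String[] lines = resultCsv.split("\\r?\\n");
    for (int i = 1; i < lines.length; i++) { // line 0 is the header
      if (lines[i].isEmpty()) {
        continue;
      }
      // Limit 4 keeps commas inside the trailing Error column intact.
      String[] cols = lines[i].split(",", 4);
      boolean success = cols.length > 1 && Boolean.parseBoolean(cols[1].trim());
      if (!success) {
        errors.add(cols.length > 3 ? cols[3] : "");
      }
    }
    return errors;
  }

  /** Logs failures; throws if the strategy is STOP_ON_ERROR and any record failed. */
  static int handle(String resultCsv, ErrorHandling strategy) {
    List<String> errors = failedRows(resultCsv);
    for (String error : errors) {
      System.err.println("Record failed: " + error);
    }
    if (strategy == ErrorHandling.STOP_ON_ERROR && !errors.isEmpty()) {
      throw new IllegalStateException(errors.size() + " record(s) failed");
    }
    return errors.size();
  }
}
```

Applied to the result shown above, `failedRows` reports one failed record, the row with the "Field 'Name' is required" message.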

...

RecordWriter#constructor

Establish connection to Salesforce Bulk API.

RecordWriter#write

Append a record to a CSV StringBuilder. If, according to our batching policy, the record needs to go into a new batch, we submit the CSV string to the Salesforce job as a separate batch. After that a new string is created and the record is appended to it.

For information on how we calculate batches please see Splitting data into batches.

RecordWriter#close

  1. Submit the current CSV StringBuilder as a batch to the Salesforce job.
  2. Wait for completion of EVERY batch which was submitted by current mapper.
  3. Check results for every record in EVERY batch (submitted by current mapper), and act on them according to the error handling strategy configured by user.

Other points


...

Some points on behavior if schema contains fields which are not present or are not editable (creatable) in target sObject.

Let's consider a case where user wants to copy Contacts from one Salesforce instance to another. This is done by simply connecting Salesforce Source and Sink in ETL.

...

Validating schema

  1. SObject contains a lot of fields which cannot be inserted (non-creatable fields) like Id, isDeleted,

...

  1. LastModifiedDate and a lot of other fields which are often auto-generated.

...

These fields are different for every sObject. Good news is that we can query Salesforce SOAP API and check if any field in sObject is creatable.

...

  1. We do the early validation and check if schema contains fields which are not present or not creatable in target sObject.
  2. If operation is upsert, check if external id field is in schema.
  3. If operation is update, check if Id field is in schema.
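
The three checks can be sketched as below. This is a sketch under assumptions: `SinkSchemaValidator` is a hypothetical name, the map of field name to "creatable" flag is assumed to have been fetched from Salesforce beforehand, and field names are compared case-sensitively for simplicity. Following the "Upsert and Update operations" section, it treats the external id check as applying to upsert, and it exempts the key field (Id for update, the external id field for upsert) from the creatable check.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper: names and messages are illustrative, not taken from the plugin.
final class SinkSchemaValidator {

  /** Returns a list of problems; an empty list means the schema passed. */
  static List<String> validate(Set<String> schemaFields, Map<String, Boolean> creatableByField,
                               String operation, String externalIdField) {
    List<String> errors = new ArrayList<>();

    // Key field that identifies existing records: none for insert.
    String keyField = null;
    if ("upsert".equalsIgnoreCase(operation)) {
      keyField = externalIdField;
    } else if ("update".equalsIgnoreCase(operation)) {
      keyField = "Id";
    }
    if (keyField != null && !schemaFields.contains(keyField)) {
      errors.add("Schema must contain key field '" + keyField + "' for " + operation);
    }

    for (String field : schemaFields) {
      if (field.equals(keyField)) {
        continue; // assumption: the key field need not be creatable
      }
      Boolean creatable = creatableByField.get(field);
      if (creatable == null) {
        errors.add("Field '" + field + "' is not present in the sObject");
      } else if (!creatable) {
        errors.add("Field '" + field + "' is not creatable");
      }
    }
    return errors;
  }
}
```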

Converting fields

We will have to convert logical types like date, datetime, time from long to string format accepted by Salesforce. Other types won't require converting.
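
A possible shape for these conversions is sketched below. Everything about the encodings is an assumption for illustration: dates as days since the epoch, timestamps as microseconds since the epoch, times as microseconds since midnight, and ISO 8601 style output in UTC. The class and method names are hypothetical, and the exact formats Salesforce accepts should be checked against its documentation.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: the input encodings and output formats are assumptions.
final class SalesforceDateFormats {
  private static final DateTimeFormatter DATE_TIME =
      DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'").withZone(ZoneOffset.UTC);
  private static final DateTimeFormatter TIME =
      DateTimeFormatter.ofPattern("HH:mm:ss.SSS'Z'");

  /** Date as days since 1970-01-01, formatted as yyyy-MM-dd. */
  static String formatDate(int daysSinceEpoch) {
    return LocalDate.ofEpochDay(daysSinceEpoch).toString();
  }

  /** Timestamp as microseconds since the epoch, formatted in UTC with millisecond precision. */
  static String formatDateTime(long microsSinceEpoch) {
    long seconds = Math.floorDiv(microsSinceEpoch, 1_000_000L);
    long nanos = Math.floorMod(microsSinceEpoch, 1_000_000L) * 1_000L;
    return DATE_TIME.format(Instant.ofEpochSecond(seconds, nanos));
  }

  /** Time of day as microseconds since midnight, formatted with millisecond precision. */
  static String formatTime(long microsSinceMidnight) {
    return TIME.format(LocalTime.ofNanoOfDay(microsSinceMidnight * 1_000L));
  }
}
```

Using `floorDiv` and `floorMod` keeps timestamps before 1970 correct, since plain division would round toward zero.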

...

I think handling of these fields should be consistent across all Salesforce plugins. That is, the sink needs to fail with a similar error message if somebody tries to insert a compound field directly. The message would state that the user should fully qualify every sub-field they want to insert. For example, if there is a 'BillingAddress' field of type address in the sObject, the sink will expect 'BillingCountry', 'BillingCity', etc. fields.

Emitting errors cannot be done

It would be great to emit errors from this plugin, since it's very common for some of the records (but not all of them) to fail to insert. Unfortunately SinkEmitter does not support emitting errors (codelink), and even if it did it would not help: at the transform stage we don't know whether a record is an error. We only know that once the batches are submitted and results are checked, which happens on task finalization (RecordWriter#close).

There is CDAP-15453 (Cask Community Issue Tracker), however, to address this.

Upsert and Update operations

Upsert - Salesforce requires the user to provide an external id field name. This field is used as the basis for the upsert (Salesforce uses it to decide whether objects are the same).

The Id field (which is present for all Salesforce sObjects) can be used for that. The user can also create a custom field and mark it as "External Id" via the Salesforce UI.

Update - Specifying an external id field is not supported. For updates Salesforce will always use the 'Id' field as the basis.