The Salesforce batch sink is responsible for using the Salesforce API to upsert Salesforce objects. The sink should handle large batches of data (~10 GB) and should handle all object types: contacts, campaigns, opportunities, leads, and custom objects. Users should be able to upload a subset of fields.

...

| Section | User Configuration Label | Description | Default | User Widget | Early Validations |
|---|---|---|---|---|---|
| Authentication | Username | Salesforce username | | Text Box | Try to log in to the Bulk API with the given credentials. |
| | Password | Salesforce password | | Password | |
| | Consumer Key | Consumer Key from the connected app | | Text Box | |
| | Consumer Secret | Consumer Secret from the connected app | | Password | |
| | Login Url | For Salesforce sandbox runs the login URL is different, so the user needs this option. | https://login.salesforce.com/services/oauth2/token | Text Box | |
| Advanced | SObject | Name of the Salesforce sObject, e.g. Contact, Campaign, Opportunity. | | Text Box | Check if an sObject with the given name exists in the Bulk API. |
| | Maximum bytes per batch | If the size of the batch data is larger than the given number of bytes, split the batch. | 10,000,000 | Text Box | Fail if more than 10,000,000. |
| | Maximum records per batch | If there are more than the given number of records, split the batch. | 10,000 | Text Box | Fail if more than 10,000. |
| | Error handling | The Bulk API returns success results per row, so this is necessary (unlike for source plugins). Possible values: "Skip on error" ignores any reports about records not inserted; "Send to error" sends records which failed to insert to the error handler; "Stop on error" fails the pipeline if any record failed on insertion. | | Select | |
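The early validations on the batch limits above could be sketched as follows. This is a minimal illustration; the class and method names are hypothetical, not from the actual plugin.

```java
// Illustrative sketch of the early validations for the batch limit settings.
// Class and method names are hypothetical.
public class BatchLimitValidator {
    // Hard caps matching the defaults in the configuration table above.
    public static final long MAX_BYTES_PER_BATCH = 10_000_000L;
    public static final long MAX_RECORDS_PER_BATCH = 10_000L;

    // Fails fast if the user configures a limit above what we accept.
    public static void validate(long maxBytesPerBatch, long maxRecordsPerBatch) {
        if (maxBytesPerBatch > MAX_BYTES_PER_BATCH) {
            throw new IllegalArgumentException(
                "Maximum bytes per batch cannot exceed " + MAX_BYTES_PER_BATCH);
        }
        if (maxRecordsPerBatch > MAX_RECORDS_PER_BATCH) {
            throw new IllegalArgumentException(
                "Maximum records per batch cannot exceed " + MAX_RECORDS_PER_BATCH);
        }
    }
}
```

Running the check at configure time surfaces bad limits before the pipeline is deployed.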

...

The job type is set to insert, not upsert, since for upsert we would need to know the Id of every record we update.

STEP 3. Split data into batches

Here we create multiple CSV files, each containing a multitude of records.
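The split decision described above (driven by the "Maximum bytes per batch" and "Maximum records per batch" settings) could be sketched like this. It is a simplified, hypothetical helper, not the plugin's actual code.

```java
// Hypothetical sketch of the batching decision: a record goes into a new
// batch when appending it would exceed either configured limit.
public class BatchSplitter {
    private final long maxBytes;
    private final long maxRecords;
    private long currentBytes = 0;
    private long currentRecords = 0;

    public BatchSplitter(long maxBytes, long maxRecords) {
        this.maxBytes = maxBytes;
        this.maxRecords = maxRecords;
    }

    // Returns true if the caller should close the current CSV file and
    // start a new batch before writing a record of the given size.
    public boolean add(long recordBytes) {
        boolean startNew = currentRecords > 0
            && (currentBytes + recordBytes > maxBytes || currentRecords + 1 > maxRecords);
        if (startNew) {
            currentBytes = 0;
            currentRecords = 0;
        }
        currentBytes += recordBytes;
        currentRecords++;
        return startNew;
    }
}
```

Note that a single record larger than the byte limit still goes into its own batch; there is no way to split one record.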

...

BatchSink class uses NullWritable as key and CSVRecord as value.


  • BatchSink#initialize: load creatable fields
  • BatchSink#transform: convert
  • OutputCommitter#
  • OutputCommitter#
  • OutputCommitter#

Other TODOs:

...

Check which fields of the given sObject are creatable by querying the Salesforce SOAP API (for more information, see "Policy regarding unknown or non-creatable fields").

BatchSink#transform

Create a CSVRecord instance from the StructuredRecord. In the process, convert logical types to the expected format.

OutputCommitter#setupJob

Create Salesforce job, which the batches will be added to.

OutputCommitter#commitJob

Close Salesforce job.

RecordWriter#constructor

Create a tmp folder and establish a connection to the Salesforce Bulk API.

RecordWriter#write

Append a record to a CSV file. If, according to our batching policy, the record needs to go into a new batch, we close the file and submit it to the Salesforce job as a separate batch. After that, a new file is created and the record is appended to it.

For information on how we calculate batches please see Splitting data into batches.

RecordWriter#close

  1. Close the currently open CSV file and submit it as a batch to the Salesforce job.
  2. Wait for completion of EVERY batch which was submitted by the current mapper.
  3. Check results for every record in EVERY batch (submitted by the current mapper), and act on them according to the error-handling strategy configured by the user.
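Step 3 above, acting on per-record results, could look roughly like this. The types and names are stand-ins for illustration, not the real plugin or Salesforce API classes.

```java
// Illustrative sketch of applying the configured error-handling strategy
// to the records that the Bulk API reported as failed.
import java.util.ArrayList;
import java.util.List;

public class ErrorStrategyHandler {
    public enum Strategy { SKIP_ON_ERROR, SEND_TO_ERROR, STOP_ON_ERROR }

    // Returns the records routed to the error handler; throws when the
    // strategy says the whole pipeline must fail.
    public static List<String> handle(Strategy strategy, List<String> failedRecords) {
        switch (strategy) {
            case SKIP_ON_ERROR:
                return new ArrayList<>();   // ignore failures entirely
            case SEND_TO_ERROR:
                return failedRecords;       // forward to the error handler
            case STOP_ON_ERROR:
                if (!failedRecords.isEmpty()) {
                    throw new RuntimeException(
                        failedRecords.size() + " record(s) failed to insert");
                }
                return new ArrayList<>();
            default:
                throw new IllegalStateException("Unknown strategy");
        }
    }
}
```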

Other points

Policy regarding unknown or non-creatable fields

Some points on behavior if the schema contains fields which are not present, or are not editable (creatable), in the target sObject.

Let's consider a case where a user wants to copy Contacts from one Salesforce instance to another. This is done by simply connecting a Salesforce source and sink in an ETL pipeline.

But this will inevitably fail; the reason is that any sObject contains a lot of fields which cannot be inserted, like Id, IsDeleted, LastModifiedDate, and many others which are often auto-generated.

These fields are different for every sObject. The good news is that we can query the Salesforce SOAP API and check whether any field of an sObject is creatable.

Based on the above, I propose that we skip any non-creatable or nonexistent fields. We simply produce a log message with the list of fields that were ignored.
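The proposed policy could be sketched as a simple filter. Here the set of creatable fields is passed in; in the real plugin it would come from a SOAP describe call. Names are illustrative.

```java
// Hypothetical sketch of the proposed policy: keep only schema fields that
// are creatable in the target sObject and log the rest.
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

public class CreatableFieldFilter {
    public static List<String> filter(List<String> schemaFields, Set<String> creatable) {
        List<String> kept = new ArrayList<>();
        List<String> ignored = new ArrayList<>();
        for (String field : schemaFields) {
            if (creatable.contains(field)) {
                kept.add(field);
            } else {
                ignored.add(field);
            }
        }
        if (!ignored.isEmpty()) {
            // In the real plugin this would be a logger call, not stdout.
            System.out.println("Ignoring non-creatable fields: " + ignored);
        }
        return kept;
    }
}
```

With this in place, the source-to-sink copy scenario above works: auto-generated fields are dropped with a warning instead of failing the whole pipeline.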

Converting fields

We will have to convert logical types like date, datetime, and time from long to the string format accepted by Salesforce. Other types won't require conversion.

| Type | Format expected by Salesforce |
|---|---|
| date | yyyy-mm-dd |
| datetime | ISO 8601 |
| time | HH:mm:ss,SSS |
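The conversions in the table above could be done with java.time. The input representations (days since epoch, millis since epoch, millis of day) are assumptions about how the pipeline encodes logical types as longs; the class name is illustrative.

```java
// Sketch of converting logical type values (encoded as numbers) to the
// string formats in the table above, using java.time.
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;

public class LogicalTypeConverter {
    private static final DateTimeFormatter TIME_FMT =
        DateTimeFormatter.ofPattern("HH:mm:ss,SSS");

    // date: days since epoch -> yyyy-MM-dd
    public static String convertDate(int epochDays) {
        return LocalDate.ofEpochDay(epochDays).toString();
    }

    // datetime: millis since epoch -> ISO 8601 (UTC)
    public static String convertDatetime(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis).toString();
    }

    // time: millis of day -> HH:mm:ss,SSS
    public static String convertTime(long millisOfDay) {
        return LocalTime.ofNanoOfDay(millisOfDay * 1_000_000L).format(TIME_FMT);
    }
}
```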

Compound fields

For source plugins we don't use nested records for compound fields. Instead, we ask the user to provide the sub-fields of every compound object separately:


Compound fields %s cannot be fetched when a SOQL query is given.
Please specify the individual attributes instead of compound field name in SOQL query.
For example, instead of ‘Select BillingAddress ...’, use ‘Select BillingCountry, BillingCity, BillingStreet ...’

I think handling of these fields should be consistent across all Salesforce plugins, meaning the sink needs to fail with a similar error message if somebody tries to insert a compound field directly. The message would state that the user should fully qualify every sub-field they want to insert.
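Such a validation could be sketched as below. The set of compound field names is illustrative (a real implementation would derive it from the sObject describe result rather than hard-code it).

```java
// Hypothetical validation mirroring the source-plugin behavior: fail if the
// schema references a compound field directly instead of its sub-fields.
import java.util.List;
import java.util.Set;

public class CompoundFieldValidator {
    // Illustrative examples only; the real list depends on the sObject.
    private static final Set<String> COMPOUND_FIELDS =
        Set.of("BillingAddress", "ShippingAddress", "MailingAddress");

    public static void validate(List<String> schemaFields) {
        for (String field : schemaFields) {
            if (COMPOUND_FIELDS.contains(field)) {
                throw new IllegalArgumentException(
                    "Compound field " + field + " cannot be inserted directly. "
                    + "Please specify its individual sub-fields instead.");
            }
        }
    }
}
```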