Introduction
Salesforce replication plugins can help users automatically replicate their Salesforce objects to specified destinations with only a few clicks.
User Stories
- As a data pipeline user, I would like to be able to create a pipeline to replicate a single Salesforce object to a specified destination, so that I can use the Salesforce object for analytics in my downstream processes.
- As a data pipeline user, I would like to be able to create a pipeline to replicate multiple Salesforce objects to their corresponding destinations, so that I can use the Salesforce objects for analytics in my downstream processes.
Design considerations and constraints
Ideally, this use case would be solved with a separate pipeline template that is aware of the dependency between the source and the sinks: the sinks would only work with certain kinds of sources (those that merely provide a list of objects to replicate). Such a template would also not support any transformations between the source and the sinks. It is essentially a single-source-to-many-sinks pipeline. Users would not need to specify an output schema, because schema handling is a black box: the pipeline reads input objects and replicates them as-is to the destination(s). Such a template might expose special kinds of plugins for this use case.
However, because that solution is longer term, this document focuses on achieving this with the current data pipeline template. The proposal provides a Salesforce batch source for replication that lets users specify the source objects to replicate. It also provides a set of batch sinks (BigQuery and GCS to start with) that replicate data from this source into their respective destinations.
Requirements
- Support BigQuery and GCS as destinations
- Select source object(s) and a destination. For each object, create a corresponding object in the destination (if it doesn't exist already), and push data to it.
- A user should be able to easily identify the destination object based on its name. The name should be unique, and must contain the source object name. For uniqueness purposes, more information may be encoded in the destination object name.
- Support full (get all data since the beginning of time) and incremental (get data since the specified timestamp) modes.
- Automatically handle schema changes between source and destination. If a destination exists, but with a different schema from the source because the source's schema has been updated, the destination's schema should be updated.
Type of plugin
- Batch Source
- Batch Sink(s)
User flow
- User specifies Salesforce credentials (username, password, clientId and clientSecret)
- The source plugin determines the Salesforce instance to use based on these credentials
- User selects the Salesforce objects to replicate
- There may be a limitation here that CDAP currently does not have the capability to list all objects for the user to select from. If that's the case, we may require the user to manually enter the objects to begin with, and then enhance this experience later (perhaps when connections and plugins are unified)
- User specifies whether to run a "full refresh" (pull all data since the beginning of time) or an incremental load (pull data since the specified timestamp).
- User specifies the destination.
- This is perhaps a different set of sink plugins
- There is one such plugin each for BigQuery and GCS
- For each such plugin, the user only specifies the top level container. e.g. For GCS, user specifies a bucket. For BigQuery, a dataset.
- For all Salesforce objects selected
- The sink plugins create a corresponding object in the destination, e.g. a directory in the GCS bucket or a table in the BigQuery dataset.
- The sink plugins ensure that the schema of the destination objects is consistent with the schema of the source, at the time of the run, and make an effort to resolve inconsistencies by updating the destination schema
- Ignore deleted columns, add new columns.
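The "ignore deleted columns, add new columns" rule can be illustrated with a minimal sketch. It assumes a schema is simply a mapping from column name to type name; `reconcile_schema` and the sample column names are hypothetical and are not part of any existing plugin API. Type changes on existing columns are not covered by the rule above, so the sketch leaves them untouched.

```python
from typing import Dict, List, Tuple

# Simplified schema model (assumption): column name -> type name.
Schema = Dict[str, str]


def reconcile_schema(source: Schema, destination: Schema) -> Tuple[Schema, List[str]]:
    """Return the updated destination schema and the names of added columns.

    Columns that were deleted from the source are kept in the destination
    (the deletion is ignored); columns that are new in the source are added.
    """
    updated = dict(destination)  # keep every existing destination column
    added: List[str] = []
    for name, type_name in source.items():
        if name not in updated:
            updated[name] = type_name
            added.append(name)
    return updated, added


# "Fax" was deleted at the source, "Rating" was added at the source.
source = {"Id": "string", "Name": "string", "Rating": "string"}
destination = {"Id": "string", "Name": "string", "Fax": "string"}
updated, added = reconcile_schema(source, destination)
```

Here `added` contains only `Rating`, and `Fax` remains in the destination schema.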
Technical Design
Source Properties
Section | Property Name | Display Name | Type | Description |
---|---|---|---|---|
Authentication | username | Username | String | Salesforce username. |
Authentication | password | Password | String | Salesforce password. |
Authentication | clientId | Client Id | String | Application Client Id. This is also called "Consumer Key" in the Salesforce UI. To create a Client Id, the user first needs to create a connected application in Salesforce. |
Authentication | clientSecret | Client Secret | String | Application Client Secret. This is also called "Consumer Secret" in the Salesforce UI. To create a Client Secret, the user first needs to create a connected application in Salesforce. |
Authentication | loginUrl | Login Url | String | Salesforce OAuth2 login URL. Default: https://login.salesforce.com/services/oauth2/token |
SObject specification | whiteList | White List | String (values separated by comma) | List of SObjects to replicate from Salesforce. By default, all SObjects are whitelisted. For each whitelisted SObject, a SOQL query is generated: `select <FIELD_1, FIELD_2, ..., FIELD_N> from ${sObjectName}`. |
SObject specification | blackList | Black List | String (values separated by comma) | List of SObjects NOT to replicate from Salesforce. By default, none of the SObjects are blacklisted. |
Incremental Load Properties | datetimeFilter | Datetime Filter | String | Datetime filter applied to the system field `LastModifiedDate` to allow incremental queries. The filter supports only the `>` (greater than) comparison and is added to the SObject query as a where clause (`WHERE LastModifiedDate > ${datetimeFilter}`). If no value is provided, all records since the beginning of time are read. |
Incremental Load Properties | duration | Duration | Integer | Duration filter applied to the system field `LastModifiedDate` to allow range queries. The unit is hours. For example, if duration is '6' (6 hours) and the pipeline runs at 9am, it reads data updated from 3am to 9am. Ignored if `datetimeFilter` is provided. |
Incremental Load Properties | offset | Offset | Integer | Offset filter applied to the system field `LastModifiedDate` to allow range queries. The unit is hours. For example, if duration is '6' (6 hours), offset is '1' (1 hour), and the pipeline runs at 9am, it reads data updated from 2am to 8am. Ignored if `datetimeFilter` is provided. |
Advanced | errorHandling | Error Handling | String | Strategy used to handle erroneous records. Acceptable values are Skip on error, Send to error, Stop on error. |
Advanced | sObjectNameField | SObject Name Field | String | The name of the field that holds the SObject name. Must not be the name of any SObject column that will be read. Defaults to 'tablename'. |
Example of multi source plugin for DB tables: https://github.com/data-integrations/multi-table-plugins/blob/develop/docs/MultiTableDatabase-batchsource.md
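As a rough illustration of how the SObject specification and incremental load properties could combine into a SOQL query, here is a minimal sketch. All function names are hypothetical. It assumes the run time is given in UTC, that the upper bound of the duration/offset window is exclusive (the properties above only define the `>` comparison for `datetimeFilter`), and that no filter is applied when neither `datetimeFilter` nor `duration` is set.

```python
from datetime import datetime, timedelta, timezone
from typing import List, Optional

# SOQL datetime literals are written unquoted in ISO 8601 form.
SOQL_DATETIME = "%Y-%m-%dT%H:%M:%SZ"


def parse_list(value: str) -> List[str]:
    """Split a comma-separated property value, dropping blanks."""
    return [item.strip() for item in value.split(",") if item.strip()]


def select_sobjects(available: List[str], white_list: str = "",
                    black_list: str = "") -> List[str]:
    """Empty white list means all SObjects; the black list is then excluded."""
    allowed = parse_list(white_list) or list(available)
    blocked = set(parse_list(black_list))
    return [name for name in allowed if name not in blocked]


def build_where_clause(run_time: datetime, datetime_filter: Optional[str] = None,
                       duration: int = 0, offset: int = 0) -> str:
    """Build the LastModifiedDate filter; duration and offset are in hours."""
    if datetime_filter:
        # duration and offset are ignored when datetimeFilter is provided
        return f"WHERE LastModifiedDate > {datetime_filter}"
    if duration <= 0:
        return ""  # assumption: no filter, read everything
    end = run_time - timedelta(hours=offset)
    start = end - timedelta(hours=duration)
    return (f"WHERE LastModifiedDate > {start.strftime(SOQL_DATETIME)} "
            f"AND LastModifiedDate < {end.strftime(SOQL_DATETIME)}")


def build_query(sobject_name: str, fields: List[str], where_clause: str = "") -> str:
    query = f"SELECT {', '.join(fields)} FROM {sobject_name}"
    return f"{query} {where_clause}".strip()


# Pipeline runs at 9am with duration 6 and offset 1: the window is 2am to 8am.
run_time = datetime(2019, 3, 12, 9, 0, tzinfo=timezone.utc)
where = build_where_clause(run_time, duration=6, offset=1)
query = build_query("Account", ["Id", "Name"], where)
```

In a real plugin the field list would come from describing each SObject in Salesforce; here it is passed in directly to keep the sketch self-contained.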
Sink Properties
BigQuery
- (Optional) Project ID
- (Optional) Service Account File Path
- (Required) Dataset
- (Optional) Bucket (GCS bucket to store temporary data in)
- (Optional) Split Field (tablename by default)
Implementation: https://github.com/data-integrations/google-cloud/blob/develop/docs/BigQueryMultiTable-batchsink.md
Schema management improvements will be covered in a different design document, BigQuery Batch Sink: Schema Management. They will apply to both the BigQuery Batch Sink and BigQuery Batch Multi Sink plugins.
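To illustrate the role of the split field, the sketch below groups incoming records by the value of that field so that each group can be written to its own table in the dataset. It assumes records are plain dictionaries; `split_by_field` is a hypothetical helper, not the sink's actual implementation.

```python
from collections import defaultdict
from typing import Any, Dict, List

Record = Dict[str, Any]


def split_by_field(records: List[Record],
                   split_field: str = "tablename") -> Dict[str, List[Record]]:
    """Group records by the value of the split field (one group per table)."""
    groups: Dict[str, List[Record]] = defaultdict(list)
    for record in records:
        groups[record[split_field]].append(record)
    return dict(groups)


records = [
    {"tablename": "Account", "Id": "001"},
    {"tablename": "Lead", "Id": "00Q"},
    {"tablename": "Account", "Id": "002"},
]
tables = split_by_field(records)
```

With this input, `tables` has two keys, `Account` (two records) and `Lead` (one record). The default of `tablename` matches the default of the source's `sObjectNameField`, which is what lets the source and sink agree without extra configuration.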
GCS
- (Optional) Project ID
- (Optional) Service Account File Path
- (Required) Path
- (Optional) Path suffix (defaults to YYYYMMDDHHmm)
- (Optional) Format (defaults to JSON)
- (Optional) Codec
- (Optional) Split Field
- (Optional) Schema (for some formats, such as Avro, Parquet, and ORC)
Implementation: https://github.com/data-integrations/google-cloud/blob/develop/docs/GCSMultiFiles-batchsink.md
Schema management is not applicable for GCS since data is loaded in folders by timestamp. Each incremental run would have its own partition folder.
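The per-run partition folder could be derived as in the following sketch. The layout `<path>/<object name>/<suffix>` is an assumption made for illustration, and `output_directory` is a hypothetical helper; the suffix uses the Python equivalent of the default YYYYMMDDHHmm pattern.

```python
from datetime import datetime, timezone


def output_directory(base_path: str, sobject_name: str, run_time: datetime) -> str:
    """Build the output folder for one SObject and one pipeline run."""
    suffix = run_time.strftime("%Y%m%d%H%M")  # YYYYMMDDHHmm
    return f"{base_path.rstrip('/')}/{sobject_name}/{suffix}"


run_time = datetime(2019, 3, 12, 9, 5, tzinfo=timezone.utc)
directory = output_directory("gs://my-bucket/salesforce/", "Account", run_time)
```

Because the suffix changes with each run, two incremental runs never write into the same folder, which is why no schema reconciliation is needed on the GCS side.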
Deliverables
- Batch source plugin for Salesforce
- Batch sink plugin for BigQuery and GCS
- Template pipelines using these plugins for the Hub
Checklist
- User stories documented
- User stories reviewed
- Design documented
- Design reviewed
- Feature merged
- Examples and guides
- Integration tests
- Documentation for feature
- Short video demonstrating the feature
...