Introduction 

Salesforce replication plugins can help users automatically replicate their Salesforce objects to specified destinations with only a few clicks.


Use-case

A user would like to specify multiple Salesforce objects that need to be replicated to corresponding objects in the destination (e.g. BigQuery tables, GCS directories). The user specifies a set of objects in the source and connects the appropriate sink (a BigQuery dataset, a GCS bucket). The pipeline replicates data into the destination separately for each object. The objects are not joined; each is replicated separately into its corresponding destination. In addition, a user should be able to specify whether to replicate the object's data since the beginning of time, or since a given timestamp.

User Stories

  • As a data pipeline user, I would like to be able to create a pipeline to replicate a single Salesforce object to a specified destination, so that I can use the Salesforce object for analytics in my downstream processes.
  • As a data pipeline user, I would like to be able to create a pipeline to replicate multiple Salesforce objects to their corresponding destinations, so that I can use the Salesforce objects for analytics in my downstream processes.

Design considerations and constraints

Ideally, this use case would be solved using a separate pipeline template that understands the dependency between the source and the sink, in the sense that the sinks would only work with certain kinds of sources (ones that merely provide a list of objects to replicate). Another constraint of such a template is that the pipeline would not support any transformations between the source and the sink. It is essentially a single-source-to-many-sinks pipeline. It would not require users to specify an output schema, since the data is treated as opaque: the pipeline merely reads input objects and replicates them as-is to the destination(s). Such a template could expose special kinds of plugins built specifically for this use case.

However, since that solution is longer term, this document focuses on a way to achieve this using the current data pipeline template. The proposal is to provide a Salesforce batch source for replication that allows users to specify the source objects to replicate, along with a set of batch sinks (for BigQuery and GCS to start with) that replicate data from this source into their respective destinations.
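To make the contract between such a source and the sinks concrete, the snippet below is a minimal sketch in plain Java (not actual CDAP plugin code; the RawRecord class and its field names are illustrative assumptions): the source tags every record with the name of the SObject it was read from, and the multi-sink groups records by that tag so each group can be written to its own destination object.

```java
import java.util.*;

/** Minimal sketch of the source/multi-sink contract (illustrative only). */
public class ReplicationSketch {

  /** A record read from Salesforce, tagged with the SObject it belongs to. */
  static final class RawRecord {
    final String sObjectName;          // e.g. "Account", "Opportunity"
    final Map<String, Object> fields;  // field name -> value, schema treated as opaque

    RawRecord(String sObjectName, Map<String, Object> fields) {
      this.sObjectName = sObjectName;
      this.fields = fields;
    }
  }

  /** Multi-sink side: group records by their SObject tag and write each group separately. */
  static Map<String, List<RawRecord>> groupByDestination(List<RawRecord> records) {
    Map<String, List<RawRecord>> byObject = new LinkedHashMap<>();
    for (RawRecord r : records) {
      byObject.computeIfAbsent(r.sObjectName, k -> new ArrayList<>()).add(r);
    }
    return byObject;  // one destination object (table / directory) per key
  }

  public static void main(String[] args) {
    List<RawRecord> batch = List.of(
        new RawRecord("Account", Map.of("Id", "001", "Name", "Acme")),
        new RawRecord("Opportunity", Map.of("Id", "006", "StageName", "Closed Won")));
    groupByDestination(batch).forEach((obj, recs) ->
        System.out.printf("would write %d record(s) to destination for %s%n", recs.size(), obj));
  }
}
```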

Requirements

  • Support BigQuery and GCS as destinations
  • Select source object(s) and a destination. For each object, create a corresponding object in the destination (if it doesn't exist already), and push data to it. 
  • A user should be able to easily identify the destination object based on its name. The name should be unique and must contain the source object name. For uniqueness purposes, more information may be encoded in the destination object name (see the sketch after this list).
  • Support full (get all data since the beginning of time) and incremental (get data since the specified timestamp) modes. 
  • Automatically handle schema changes between source and destination. If a destination exists, but with a different schema from the source because the source's schema has been updated, the destination's schema should be updated.
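As a rough illustration of the naming requirement above (the sanitization rule and the optional suffix are assumptions, not a decided format), a destination name could be derived from the source object name like this:

```java
/** Sketch of deriving a destination object name from a source SObject name. */
public class DestinationNameSketch {

  static String destinationName(String sObjectName, String uniquenessSuffix) {
    // Keep only characters commonly accepted by BigQuery table names / GCS paths.
    String base = sObjectName.replaceAll("[^A-Za-z0-9_]", "_");
    // The source object name is always part of the destination name; extra
    // information may be appended when needed for uniqueness.
    return uniquenessSuffix == null || uniquenessSuffix.isEmpty()
        ? base
        : base + "_" + uniquenessSuffix;
  }

  public static void main(String[] args) {
    System.out.println(destinationName("Account", null));             // Account
    System.out.println(destinationName("My_Custom_Object__c", "v2")); // My_Custom_Object__c_v2
  }
}
```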

Type of plugin

  1. Batch Source
  2. Batch Sink(s)

User flow

  1. User specifies Salesforce credentials (username, password, clientId and clientSecret)
    1. The source plugin determines the Salesforce instance to use based on these credentials
  2. User selects the Salesforce objects to replicate
    1. There may be a limitation here that CDAP currently does not have the capability to list all objects for the user to select from. If that's the case, we may require the user to manually enter the objects to begin with, and then enhance this experience later (perhaps when connections and plugins are unified)
  3. User specifies whether to run a "full refresh" (pull all data since the beginning of time) or an incremental load (pull data since a specified timestamp).
  4. User specifies the destination. 
    1. This is perhaps a different set of sink plugins
    2. There is one such plugin for BigQuery and GCS
    3. For each such plugin, the user only specifies the top-level container, e.g. a bucket for GCS, a dataset for BigQuery.
  5. For all Salesforce objects selected
    • The sink plugins create a corresponding object in the destination, e.g. a directory in the GCS bucket, a table in the BigQuery dataset.
    • The sink plugins ensure that the schema of the destination objects is consistent with the schema of the source at the time of the run, and make an effort to resolve inconsistencies by updating the destination schema (see the sketch after this list)
      • Ignore deleted columns, add new columns.
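The schema reconciliation in the last step above can be sketched as follows. This is a minimal illustration with schemas represented as simple name-to-type maps (the real plugins would operate on CDAP/BigQuery schema objects): columns that are new in the source are appended to the destination, and columns that were deleted from the source are left in place.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Sketch of the "ignore deleted columns, add new columns" destination schema update. */
public class SchemaMergeSketch {

  /**
   * Merge the current source schema into an existing destination schema:
   * columns that disappeared from the source are kept as-is, columns that
   * are new in the source are appended to the destination.
   */
  static Map<String, String> mergeSchemas(Map<String, String> destination, Map<String, String> source) {
    Map<String, String> merged = new LinkedHashMap<>(destination);
    source.forEach(merged::putIfAbsent);
    return merged;
  }

  public static void main(String[] args) {
    Map<String, String> destination = new LinkedHashMap<>();
    destination.put("Id", "STRING");
    destination.put("Name", "STRING");
    destination.put("OldField", "STRING");   // no longer present in the source

    Map<String, String> source = new LinkedHashMap<>();
    source.put("Id", "STRING");
    source.put("Name", "STRING");
    source.put("NewField", "INT64");         // newly added in the source

    // Prints {Id=STRING, Name=STRING, OldField=STRING, NewField=INT64}
    System.out.println(mergeSchemas(destination, source));
  }
}
```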

Technical Design

Source Properties

| Section | Property Name | Display Name | Type | Description |
| --- | --- | --- | --- | --- |
| Authentication | username | Username | String | Salesforce username. |
| Authentication | password | Password | String | Salesforce password. |
| Authentication | clientId | Client Id | String | Application Client Id. This is also called "Consumer Key" in the Salesforce UI. To create a Client Id, the user needs to create a connected application in Salesforce first. |
| Authentication | clientSecret | Client Secret | String | Application Client Secret. This is also called "Consumer Secret" in the Salesforce UI. To create a Client Secret, the user needs to create a connected application in Salesforce first. |
| Authentication | loginUrl | Login Url | String | Salesforce OAuth2 login url. Default - https://login.salesforce.com/services/oauth2/token. The login url is different for Salesforce sandbox runs, which is why the user needs this option. |
| SObject specification | whiteList | White List | String (values separated by comma) | List of SObjects to replicate from Salesforce. By default, all SObjects are white listed. For each white listed SObject, a SOQL query will be generated: `select <FIELD_1, FIELD_2, ..., FIELD_N> from ${sObjectName}`. |
| SObject specification | blackList | Black List | String (values separated by comma) | List of SObjects NOT to replicate from Salesforce. By default, NONE of the SObjects are black listed. |
| Incremental Load Properties | datetimeFilter | Datetime Filter | String | SObject query datetime filter applied to the system field `LastModifiedDate` to allow incremental queries. The filter is applied in `>` (greater than) comparison only and is added to the SObject query in the form of a where clause (`WHERE LastModifiedDate > ${datetimeFilter}`). If no value is provided, all records are read since the beginning of time. Expected formats: Salesforce Date Format (e.g. 2019-03-12T11:29:52Z) or Salesforce Date Literal (e.g. LAST_WEEK). |
| Incremental Load Properties | duration | Duration | Integer | SObject query duration filter applied to the system field `LastModifiedDate` to allow range queries. Duration units are hours. For example, if duration is '6' (6 hours) and the pipeline runs at 9am, it reads data updated from 3am - 9am. Ignored if `datetimeFilter` is provided. |
| Incremental Load Properties | offset | Offset | Integer | SObject query offset filter applied to the system field `LastModifiedDate` to allow range queries. Offset units are hours. For example, if duration is '6' (6 hours), offset is '1' (1 hour), and the pipeline runs at 9am, it reads data updated from 2am - 8am. Ignored if `datetimeFilter` is provided. |
| Advanced | errorHandling | Error Handling | String | Strategy used to handle erroneous records. Acceptable values are Skip on error, Send to error, Stop on error. |
| Advanced | sObjectNameField | SObject Name Field | String | The name of the field that holds the SObject name. Must not be the name of any SObject column that will be read. Defaults to 'tablename'. |

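For the Authentication properties, the following is a minimal sketch of the standard Salesforce OAuth2 username-password flow the source could use to log in (placeholder credential values; error handling omitted). The token response also carries an instance_url, which is how the plugin can determine the Salesforce instance to use from the credentials alone.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.stream.Collectors;

/** Sketch of the OAuth2 username-password flow the source could use to log in. */
public class SalesforceLoginSketch {

  static String form(Map<String, String> params) {
    return params.entrySet().stream()
        .map(e -> URLEncoder.encode(e.getKey(), StandardCharsets.UTF_8) + "="
            + URLEncoder.encode(e.getValue(), StandardCharsets.UTF_8))
        .collect(Collectors.joining("&"));
  }

  public static void main(String[] args) throws Exception {
    // Default login url; a sandbox org would use a different url, e.g. under test.salesforce.com
    String loginUrl = "https://login.salesforce.com/services/oauth2/token";
    String body = form(Map.of(
        "grant_type", "password",
        "client_id", "<clientId>",         // "Consumer Key" of the connected app
        "client_secret", "<clientSecret>", // "Consumer Secret" of the connected app
        "username", "<username>",
        "password", "<password>"));

    HttpRequest request = HttpRequest.newBuilder(URI.create(loginUrl))
        .header("Content-Type", "application/x-www-form-urlencoded")
        .POST(HttpRequest.BodyPublishers.ofString(body))
        .build();
    HttpResponse<String> response =
        HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());

    // The JSON response contains "access_token" and "instance_url"; instance_url identifies
    // the Salesforce instance to talk to, derived purely from the supplied credentials.
    System.out.println(response.body());
  }
}
```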

Example of multi source plugin for DB tables: https://github.com/data-integrations/multi-table-plugins/blob/develop/docs/MultiTableDatabase-batchsource.md
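The SObject specification and Incremental Load properties can be combined into per-SObject SOQL queries along these lines. This is a sketch under assumptions: the field list per SObject and the exact comparison operators used for the duration/offset range are illustrative, not the plugin's exact behavior.

```java
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/** Sketch of how the source could turn its properties into per-SObject SOQL queries. */
public class SoqlQueryBuilderSketch {

  // Salesforce datetime format, e.g. 2019-03-12T11:29:52Z
  private static final DateTimeFormatter SF_DATETIME =
      DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss'Z'");

  static String whereClause(String datetimeFilter, Integer durationHours, Integer offsetHours,
                            ZonedDateTime runTime) {
    if (datetimeFilter != null && !datetimeFilter.isEmpty()) {
      // Explicit filter wins; duration/offset are ignored.
      return " WHERE LastModifiedDate > " + datetimeFilter;
    }
    if (durationHours != null) {
      int offset = offsetHours == null ? 0 : offsetHours;
      ZonedDateTime end = runTime.minusHours(offset);
      ZonedDateTime start = end.minusHours(durationHours);
      return " WHERE LastModifiedDate > " + SF_DATETIME.format(start.withZoneSameInstant(ZoneOffset.UTC))
          + " AND LastModifiedDate <= " + SF_DATETIME.format(end.withZoneSameInstant(ZoneOffset.UTC));
    }
    return "";  // no filter: read all records since the beginning of time
  }

  static String query(String sObjectName, List<String> fields, String where) {
    return "SELECT " + String.join(", ", fields) + " FROM " + sObjectName + where;
  }

  public static void main(String[] args) {
    // White list minus black list decides which SObjects are replicated.
    Set<String> whiteList = Set.of("Account", "Opportunity");
    Set<String> blackList = Set.of("Opportunity");
    List<String> sObjects = whiteList.stream()
        .filter(o -> !blackList.contains(o))
        .collect(Collectors.toList());

    // duration = 6, offset = 1, run at 9am -> reads data modified between 2am and 8am
    String where = whereClause(null, 6, 1, ZonedDateTime.now(ZoneOffset.UTC));
    for (String sObject : sObjects) {
      System.out.println(query(sObject, List.of("Id", "Name"), where));
    }
  }
}
```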

Sink Properties

BigQuery

  • (Optional) Project ID
  • (Optional) Service Account File Path
  • (Required) Dataset
  • (Optional) Bucket (GCS bucket to store temporary data in)
  • (Optional) Split Field (tablename by default)

Implementation: https://github.com/data-integrations/google-cloud/blob/develop/docs/BigQueryMultiTable-batchsink.md

Schema management improvements will be covered in a separate design document (they will be applicable to both the BigQuery Batch Sink and BigQuery Batch Multi Sink plugins): BigQuery Batch Sink: Schema Management

GCS

  • (Optional) Project ID
  • (Optional) Service Account File Path
  • (Required) Path
  • (Optional) Path suffix (defaults to YYYYMMDDHHmm)
  • (Optional) Format (defaults to JSON)
  • (Optional) Codec
  • (Optional) Split Field
  • (Optional) Schema (for some formats such as avro, parquet, orc)

Implementation: https://github.com/data-integrations/google-cloud/blob/develop/docs/GCSMultiFiles-batchsink.md

Schema management is not applicable for GCS since data is loaded in folders by timestamp. Each incremental run would have its own partition folder.
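To illustrate the point above, here is a sketch of one possible path layout (the base path and folder structure are assumptions based on the properties listed, not the plugin's actual layout): each SObject gets its own directory, and each incremental run lands in a timestamped partition folder using the default path suffix format.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/** Sketch of per-run GCS destination paths for replicated SObjects. */
public class GcsPathSketch {

  // Default path suffix format from the sink properties: YYYYMMDDHHmm
  private static final DateTimeFormatter PATH_SUFFIX = DateTimeFormatter.ofPattern("yyyyMMddHHmm");

  static String destinationPath(String basePath, String sObjectName, LocalDateTime runTime) {
    // One directory per SObject, one timestamped partition folder per incremental run.
    return basePath + "/" + sObjectName + "/" + PATH_SUFFIX.format(runTime) + "/";
  }

  public static void main(String[] args) {
    LocalDateTime run = LocalDateTime.of(2019, 3, 12, 11, 29);
    System.out.println(destinationPath("gs://my-bucket/salesforce", "Account", run));
    // gs://my-bucket/salesforce/Account/201903121129/
  }
}
```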

Deliverables

  1. Batch source plugin for Salesforce
  2. Batch sink plugin for BigQuery and GCS
  3. Template pipelines using these plugins for the Hub


Checklist

  • User stories documented
  • User stories reviewed
  • Design documented
  • Design reviewed
  • Feature merged
  • Examples and guides
  • Integration tests
  • Documentation for feature
  • Short video demonstrating the feature

