...

The proposal is to use an internal stage, since files can then be downloaded directly using the SnowflakeConnection#downloadStream method.
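
A minimal sketch of reading a staged file through the JDBC driver (the stage and file names are illustrative):

Code Block
Connection connection = DriverManager.getConnection(url, prop);

// download a staged file as a stream; "true" decompresses gzip-compressed stage files
InputStream downloadedFile = connection.unwrap(SnowflakeConnection.class)
    .downloadStream("MYSTAGE", "result/data_0_0_0.csv.gz", true);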


Source Splitter

The proposal is to determine the number of splits from the number of staged files created by the COPY INTO <location> command. The number of resulting files can be controlled with the MAX_FILE_SIZE copy option, which will be exposed as a "Maximum Split Size" source configuration property.
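
For example (the table, stage path, and size are illustrative), unloading a table into staged files of at most 32 MB each:

Code Block
copy into @my_int_stage/unload/data_
from mytable
file_format = (type = csv)
max_file_size = 33554432;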

The LIST command returns a list of files that have been staged.

Example of listing the files that match a regular expression (i.e. all file names containing the string data_0) in a named stage (my_csv_stage) with a prefix (/analysis/):

Code Block
list @my_csv_stage/analysis/ pattern='.*data_0.*';

+--------------------+------+----------------------------------+------------------------------+
| name               | size | md5                              | last_modified                |
|--------------------+------+----------------------------------+------------------------------|
| employees01.csv.gz |  288 | a851f2cc56138b0cd16cb603a97e74b1 | Tue, 9 Jan 2018 15:31:44 GMT |
| employees02.csv.gz |  288 | 125f5645ea500b0fde0cdd5f54029db9 | Tue, 9 Jan 2018 15:31:44 GMT |
| employees03.csv.gz |  304 | eafee33d3e62f079a054260503ddb921 | Tue, 9 Jan 2018 15:31:45 GMT |
| employees04.csv.gz |  304 | 9984ab077684fbcec93ae37479fa2f4d | Tue, 9 Jan 2018 15:31:44 GMT |
| employees05.csv.gz |  304 | 8ad4dc63a095332e158786cb6e8532d0 | Tue, 9 Jan 2018 15:31:44 GMT |
+--------------------+------+----------------------------------+------------------------------+
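
A hypothetical sketch of how the splitter could turn the LIST output into one split per staged file (the stage path and split handling are illustrative, not the final design):

Code Block
// run LIST through the JDBC connection and collect the staged file names
try (Statement statement = connection.createStatement();
     ResultSet stagedFiles = statement.executeQuery("list @my_csv_stage/analysis/")) {
  List<String> splitFiles = new ArrayList<>();
  while (stagedFiles.next()) {
    // the "name" column of the LIST output holds the staged file name
    splitFiles.add(stagedFiles.getString("name"));
  }
  // each entry would back a single source split that later reads the file
  // via SnowflakeConnection#downloadStream
}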


Sink Plugin

For the Sink, it's possible to write data to files in an internal stage first (following the File Sizing Recommendations) and then use the COPY INTO <table> command.

Example of loading files from a named internal stage into a table:

Code Block
copy into mytable
from @my_int_stage;


Data files can be uploaded directly from a stream to an internal stage:

Code Block
Connection connection = DriverManager.getConnection(url, prop);
File file = new File("/tmp/test.csv");
FileInputStream fileInputStream = new FileInputStream(file);

// upload file stream to user stage
connection.unwrap(SnowflakeConnection.class).uploadStream("MYSTAGE", "testUploadStream",
   fileInputStream, "destFile.csv", true);

Files can be staged in one of the following locations:

  • Named internal stage (or table/user stage). Files can be staged using the PUT command (see the example after this list).
  • Named external stage that references an external location (AWS S3, Google Cloud Storage, or Microsoft Azure).
  • External location (AWS S3, Google Cloud Storage, or Microsoft Azure).
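
For reference, a minimal PUT example for staging a local file into a named internal stage (the file path and stage name are illustrative):

Code Block
put file:///tmp/data/part-0000.csv @my_int_stage auto_compress=true;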

The proposal is to use an internal stage, since files can be uploaded directly using the SnowflakeConnection#uploadStream method.

Option 3

Although it is possible to create a Batch Sink plugin that accepts file locations and uses the COPY INTO <table> command (or Snowpipe) to load data files into Snowflake, this does not seem to be a good idea:

  • The COPY INTO <table> command operates on the locations of raw data files, so such a sink could not be used in the same way as other Database Sinks.
  • No transformations can be applied to the data in CDAP, since the plugin would operate on file locations rather than on the actual records. It is still possible to perform a transformation on the Snowflake side using Transformation Parameters (see the example below).
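
A minimal illustration of such a load-time transformation (the table, stage, and column positions are hypothetical):

Code Block
-- reorder the columns of the staged CSV files while loading them into the target table
copy into mytable (id, name)
from (select t.$2, t.$1 from @my_int_stage t);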

Source Properties

Option 1

| Section | User Configuration Label | Label Description | Options | Default | Variable | User Widget |
|---|---|---|---|---|---|---|
| General | Label | Label for UI. |  |  |  | textbox |
|  | Reference Name | Uniquely identifiable name used for lineage. |  |  | referenceName | textbox |
|  | Account Name | Full name of the Snowflake account. |  |  | accountName | textbox |
|  | Database | Database name to connect to. |  |  | database | textbox |
|  | Import Query | Query used to import data. |  |  | importQuery | textarea |
| Credentials | Username | User identity for connecting to the specified database. |  |  | username | textbox |
|  | Password | Password to use to connect to the specified database. |  |  | password | password |
| Key Pair Authentication | Key Pair Authentication Enabled | If true, the plugin will perform Key Pair authentication. | True, False | False | keyPairEnabled | toggle |
|  | Key File Path | Path to the private key file. |  |  | path | textbox |
| OAuth2 | OAuth2 Enabled | If true, the plugin will perform OAuth2 authentication. | True, False | False | oauth2Enabled | toggle |
|  | Auth URL | Endpoint for the authorization server used to retrieve the authorization code. |  |  | authUrl | textbox |
|  | Token URL | Endpoint for the resource server, which exchanges the authorization code for an access token. |  |  | tokenUrl | textbox |
|  | Client ID | Client identifier obtained during the Application registration process. |  |  | clientId | textbox |
|  | Client Secret | Client secret obtained during the Application registration process. |  |  | clientSecret | password |
|  | Scopes | Scope of the access request, which might have multiple space-separated values. |  |  | scopes | textbox |
|  | Refresh Token | Token used to receive the access token, which is the end product of OAuth2. |  |  | refreshToken | textbox |
| Advanced | Bounding Query | Bounding Query should return the min and max of the values of the 'splitBy' field. For example, 'SELECT MIN(id), MAX(id) FROM table'. Not required if numSplits is set to one. |  |  | boundingQuery | textarea |
|  | Split-By Field Name | Field name that will be used to generate splits. Not required if numSplits is set to one. |  |  | splitBy | textbox |
|  | Number of Splits to Generate | Number of splits to generate. |  |  | numSplits | textbox |
|  | Connection Arguments | A list of arbitrary string tag/value pairs as connection arguments. See: https://docs.snowflake.net/manuals/user-guide/jdbc-configure.html#jdbc-driver-connection-string |  |  | connectionArguments | keyvalue |

Notes:

Option 2

| Section | User Configuration Label | Label Description | Options | Default | Variable | User Widget |
|---|---|---|---|---|---|---|
| General | Label | Label for UI. |  |  |  | textbox |
|  | Reference Name | Uniquely identifiable name used for lineage. |  |  | referenceName | textbox |
|  | Account Name | Full name of the Snowflake account. |  |  | accountName | textbox |
|  | Database | Database name to connect to. |  |  | database | textbox |
|  | Import Query | Query used to import data. |  |  | importQuery | textarea |
| Credentials | Username | User identity for connecting to the specified database. |  |  | username | textbox |
|  | Password | Password to use to connect to the specified database. |  |  | password | password |
| Key Pair Authentication | Key Pair Authentication Enabled | If true, the plugin will perform Key Pair authentication. | True, False | False | keyPairEnabled | toggle |
|  | Key File Path | Path to the private key file. |  |  | path | textbox |
| OAuth2 | OAuth2 Enabled | If true, the plugin will perform OAuth2 authentication. | True, False | False | oauth2Enabled | toggle |
|  | Auth URL | Endpoint for the authorization server used to retrieve the authorization code. |  |  | authUrl | textbox |
|  | Token URL | Endpoint for the resource server, which exchanges the authorization code for an access token. |  |  | tokenUrl | textbox |
|  | Client ID | Client identifier obtained during the Application registration process. |  |  | clientId | textbox |
|  | Client Secret | Client secret obtained during the Application registration process. |  |  | clientSecret | password |
|  | Scopes | Scope of the access request, which might have multiple space-separated values. |  |  | scopes | textbox |
|  | Refresh Token | Token used to receive the access token, which is the end product of OAuth2. |  |  | refreshToken | textbox |
| Advanced | Maximum Split Size | Maximum split size, specified in bytes. |  |  | maxSplitSize | number |
|  | Connection Arguments | A list of arbitrary string tag/value pairs as connection arguments. See: https://docs.snowflake.net/manuals/user-guide/jdbc-configure.html#jdbc-driver-connection-string |  |  | connectionArguments | keyvalue |

Notes:

  • The table above is similar to the Option 1 table, except for the splitter-related properties. Please refer to the Design section for the proposed splitter design.

Source Data Types Mapping

...

Sink Properties

...

| Section | User Configuration Label | Label Description | Options | Default | Variable | User Widget |
|---|---|---|---|---|---|---|
| General | Label | Label for UI. |  |  |  | textbox |
|  | Reference Name | Uniquely identifiable name used for lineage. |  |  | referenceName | textbox |
|  | Account Name | Full name of the Snowflake account. |  |  | accountName | textbox |
|  | Database | Database name to connect to. |  |  | database | textbox |
|  | Table Name | Name of the database table to write to. |  |  | table | textbox |
| Credentials | Username | User identity for connecting to the specified database. |  |  | username | textbox |
|  | Password | Password to use to connect to the specified database. |  |  | password | password |
| Key Pair Authentication | Key Pair Authentication Enabled | If true, the plugin will perform Key Pair authentication. | True, False | False | keyPairEnabled | toggle |
|  | Key File Path | Path to the private key file. |  |  | path | textbox |
| OAuth2 | OAuth2 Enabled | If true, the plugin will perform OAuth2 authentication. | True, False | False | oauth2Enabled | toggle |
|  | Auth URL | Endpoint for the authorization server used to retrieve the authorization code. |  |  | authUrl | textbox |
|  | Token URL | Endpoint for the resource server, which exchanges the authorization code for an access token. |  |  | tokenUrl | textbox |
|  | Client ID | Client identifier obtained during the Application registration process. |  |  | clientId | textbox |
|  | Client Secret | Client secret obtained during the Application registration process. |  |  | clientSecret | password |
|  | Scopes | Scope of the access request, which might have multiple space-separated values. |  |  | scopes | textbox |
|  | Refresh Token | Token used to receive the access token, which is the end product of OAuth2. |  |  | refreshToken | textbox |
| Advanced | Connection Arguments | A list of arbitrary string tag/value pairs as connection arguments. See: https://docs.snowflake.net/manuals/user-guide/jdbc-configure.html#jdbc-driver-connection-string |  |  | connectionArguments | keyvalue |

Option 2

...

Sink Data Types Mapping

| CDAP Schema Data Type | Snowflake Data Type | Comment |
|---|---|---|
| boolean | BOOLEAN |  |
| bytes | BINARY |  |
| date | DATE |  |
| double | FLOAT | Snowflake uses double-precision (64-bit) IEEE 754 floating-point numbers. |
| decimal | NUMBER(p, s) |  |
| float | FLOAT |  |
| int | NUMBER(p, s) | Where p >= 10. It is safe to write primitives as values of the decimal logical type as long as the precision is valid. |
| long | NUMBER(p, s) | Where p >= 19. It is safe to write primitives as values of the decimal logical type as long as the precision is valid. |
| string | VARCHAR |  |
| time | TIME |  |
| timestamp | TIMESTAMP_NTZ |  |
| array | ARRAY |  |
| record | OBJECT |  |
| enum | VARCHAR |  |
| map | OBJECT |  |
| union | VARIANT |  |
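
As a hypothetical illustration of this mapping, a CDAP record with long, string, double, and timestamp fields would correspond to a table such as:

Code Block
-- hypothetical target table for a CDAP record with fields
-- id: long, name: string, price: double, created: timestamp
create table mytable (
  id      number(19, 0),
  name    varchar,
  price   float,
  created timestamp_ntz
);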

...