...
The proposal is to use an internal stage, since files can then be downloaded directly with the SnowflakeConnection#downloadStream method.
Sink Plugin
For the Sink, it's possible to write data to internal stage files first (sized according to the File Sizing Recommendations) and then load them with the COPY INTO <table> command.
Example of loading files from a named internal stage into a table:
Code Block |
---|
copy into mytable
from @my_int_stage; |
Data files can be uploaded directly from a stream to an internal stage:
Code Block |
---|
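// Requires java.sql.*, java.io.*, and net.snowflake.client.jdbc.SnowflakeConnection.
// try-with-resources closes both the connection and the input stream.
try (Connection connection = DriverManager.getConnection(url, prop);
     InputStream fileInputStream = new FileInputStream("/tmp/test.csv")) {
  // Upload the stream to stage MYSTAGE under the prefix testUploadStream;
  // the final argument (true) asks the driver to compress the data.
  connection.unwrap(SnowflakeConnection.class)
      .uploadStream("MYSTAGE", "testUploadStream", fileInputStream, "destFile.csv", true);
} |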
Files can be staged in one of the following locations:
- Named internal stage (or table/user stage). Files can be staged using the PUT command.
- Named external stage that references an external location (AWS S3, Google Cloud Storage, or Microsoft Azure).
- External location (AWS S3, Google Cloud Storage, or Microsoft Azure).
The proposal is to use an internal stage, since files can be uploaded directly with the SnowflakeConnection#uploadStream method.
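The stage-then-load flow described above ends with a COPY INTO <table> statement that points at the staged files. The following is a minimal sketch of how such a statement could be assembled, not the plugin's actual implementation. The class name CopyIntoStatement, the method forStage, and the validation patterns are hypothetical and introduced only for illustration; the sketch assumes plain, unquoted identifiers.

```java
import java.util.regex.Pattern;

/** Hypothetical helper; not part of the Snowflake JDBC driver or CDAP. */
final class CopyIntoStatement {
  // Assumption: only plain, unquoted (optionally dot-qualified) identifiers are accepted.
  private static final Pattern IDENTIFIER =
      Pattern.compile("[A-Za-z_][A-Za-z0-9_$]*(\\.[A-Za-z_][A-Za-z0-9_$]*)*");
  // Assumption: stage paths are limited to a conservative character set.
  private static final Pattern STAGE_PATH = Pattern.compile("[A-Za-z0-9_./-]*");

  /** Builds "COPY INTO <table> FROM @<stage>[/<path>]" after validating the inputs. */
  static String forStage(String table, String stage, String path) {
    if (!IDENTIFIER.matcher(table).matches()) {
      throw new IllegalArgumentException("Unsupported table name: " + table);
    }
    if (!IDENTIFIER.matcher(stage).matches()) {
      throw new IllegalArgumentException("Unsupported stage name: " + stage);
    }
    if (!STAGE_PATH.matcher(path).matches()) {
      throw new IllegalArgumentException("Unsupported stage path: " + path);
    }
    String location = path.isEmpty() ? stage : stage + "/" + path;
    return "COPY INTO " + table + " FROM @" + location;
  }

  public static void main(String[] args) {
    System.out.println(forStage("mytable", "my_int_stage", ""));
    System.out.println(forStage("mytable", "MYSTAGE", "testUploadStream"));
  }
}
```

Validating the names before concatenation is a design choice of this sketch: the statement is built from user-supplied configuration, so rejecting unexpected characters avoids passing arbitrary SQL to Snowflake.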
Option 3
Although it is possible to create a Batch Sink Plugin that accepts file locations and uses the COPY INTO <table> command (or Snowpipe) to load data files into Snowflake, this approach is not recommended:
- The COPY INTO <table> command operates on the locations of raw data files, so this sink could not be used in the same way as other Database Sinks.
- No transformations could be applied to the data on the CDAP side, since the pipeline would operate on file locations rather than on the data itself. Transformations would still be possible on the Snowflake side using Transformation Parameters.
Source Properties
...
Source Splitter
The proposal is to determine the number of splits from the number of staged files created by the COPY INTO <location> command. The number of resulting files can be controlled with the MAX_FILE_SIZE copy option. The proposal is to add a "Maximum Split Size" Source configuration property that maps to the MAX_FILE_SIZE copy option.
The LIST command returns a list of files that have been staged.
Example of listing the files that match a regular expression (i.e. all file names containing the string data_0) in a named stage (my_csv_stage) with a prefix (/analysis/):
Code Block |
---|
list @my_csv_stage/analysis/ pattern='.*data_0.*';
+--------------------+------+----------------------------------+------------------------------+
| name | size | md5 | last_modified |
|--------------------+------+----------------------------------+------------------------------|
| employees01.csv.gz | 288 | a851f2cc56138b0cd16cb603a97e74b1 | Tue, 9 Jan 2018 15:31:44 GMT |
| employees02.csv.gz | 288 | 125f5645ea500b0fde0cdd5f54029db9 | Tue, 9 Jan 2018 15:31:44 GMT |
| employees03.csv.gz | 304 | eafee33d3e62f079a054260503ddb921 | Tue, 9 Jan 2018 15:31:45 GMT |
| employees04.csv.gz | 304 | 9984ab077684fbcec93ae37479fa2f4d | Tue, 9 Jan 2018 15:31:44 GMT |
| employees05.csv.gz | 304 | 8ad4dc63a095332e158786cb6e8532d0 | Tue, 9 Jan 2018 15:31:44 GMT |
+--------------------+------+----------------------------------+------------------------------+ |
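The splitter proposal above can be sketched as two steps: build the COPY INTO <location> statement with MAX_FILE_SIZE taken from the "Maximum Split Size" property, then create one split per row returned by LIST. This is a hedged illustration only. The class StagedFileSplits, its methods, and the employees table in the usage example are hypothetical, and rows are passed in as plain {name, size} string pairs rather than read from a live connection.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of file-based splitting; not the plugin's actual code. */
final class StagedFileSplits {

  /** One split per staged file; the fields mirror the name and size columns of LIST. */
  static final class Split {
    final String fileName;
    final long sizeBytes;

    Split(String fileName, long sizeBytes) {
      this.fileName = fileName;
      this.sizeBytes = sizeBytes;
    }
  }

  /** Builds the unload statement; maxSplitSizeBytes maps to the MAX_FILE_SIZE copy option. */
  static String unloadStatement(String stagePath, String importQuery, long maxSplitSizeBytes) {
    if (maxSplitSizeBytes <= 0) {
      throw new IllegalArgumentException("Maximum split size must be positive");
    }
    return "COPY INTO @" + stagePath + " FROM (" + importQuery + ")"
        + " MAX_FILE_SIZE = " + maxSplitSizeBytes;
  }

  /** Converts LIST rows, given as {name, size} pairs, into one split per file. */
  static List<Split> toSplits(List<String[]> listRows) {
    List<Split> splits = new ArrayList<>();
    for (String[] row : listRows) {
      splits.add(new Split(row[0], Long.parseLong(row[1].trim())));
    }
    return splits;
  }

  public static void main(String[] args) {
    // "employees" is a made-up table name used only for this example.
    System.out.println(
        unloadStatement("my_csv_stage/analysis/", "SELECT * FROM employees", 16777216L));
  }
}
```

With this approach the number of splits is not configured directly; it follows from how many files Snowflake produces for the chosen maximum file size.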
Source Properties
Option 1
Section | User Configuration Label | Label Description | Options | Default | Variable | User Widget |
---|---|---|---|---|---|---|
General | Label | Label for UI. | | | | textbox |
General | Reference Name | Unique name used for lineage. | | | referenceName | textbox |
General | Account Name | Full name of the Snowflake account. | | | accountName | textbox |
General | Database | Name of the database to connect to. | | | database | textbox |
General | Import Query | Query used to import data. | | | importQuery | textarea |
Credentials | Username | User identity for connecting to the specified database. | | | username | textbox |
Credentials | Password | Password to use to connect to the specified database. | | | password | password |
Key Pair Authentication | Key Pair Authentication Enabled | If true, the plugin will perform Key Pair authentication. | | False | keyPairEnabled | toggle |
Key Pair Authentication | Key File Path | Path to the private key file. | | | path | textbox |
OAuth2 | OAuth2 Enabled | If true, the plugin will perform OAuth2 authentication. | | False | oauth2Enabled | toggle |
OAuth2 | Auth URL | Endpoint for the authorization server used to retrieve the authorization code. | | | authUrl | textbox |
OAuth2 | Token URL | Endpoint for the resource server, which exchanges the authorization code for an access token. | | | tokenUrl | textbox |
OAuth2 | Client ID | Client identifier obtained during the Application registration process. | | | clientId | textbox |
OAuth2 | Client Secret | Client secret obtained during the Application registration process. | | | clientSecret | password |
OAuth2 | Scopes | Scope of the access request, which might have multiple space-separated values. | | | scopes | textbox |
OAuth2 | Refresh Token | Token used to obtain the access token, which is the end product of OAuth2. | | | refreshToken | textbox |
Advanced | Bounding Query | Bounding Query should return the min and max of the values of the 'splitBy' field. For example, 'SELECT MIN(id),MAX(id) FROM table'. Not required if numSplits is set to one. | | | boundingQuery | textarea |
Advanced | Split-By Field Name | Name of the field used to generate splits. Not required if numSplits is set to one. | | | splitBy | textbox |
Advanced | Number of Splits to Generate | Number of splits to generate. | | | numSplits | textbox |
Advanced | Connection Arguments | A list of arbitrary string tag/value pairs as connection arguments. See: https://docs.snowflake.net/manuals/user-guide/jdbc-configure.html#jdbc-driver-connection-string | | | connectionArguments | keyvalue |
Notes:
- Refer to Plugin OAuth2 Common Module for the design of the common OAuth2 module.
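The Bounding Query, Split-By Field Name, and Number of Splits properties in Option 1 together imply range-based splitting: the min and max returned by the bounding query are divided into numSplits contiguous ranges. A minimal sketch of that arithmetic follows, assuming an integer split-by field whose range fits in a long. The class RangeSplitter and its methods are hypothetical, and the way a condition would be attached to the import query is left open because the source does not specify it.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of range-based split generation for an integer split-by field. */
final class RangeSplitter {

  /** Returns at most numSplits inclusive {lower, upper} ranges that together cover [min, max]. */
  static List<long[]> split(long min, long max, int numSplits) {
    if (numSplits < 1 || max < min) {
      throw new IllegalArgumentException("Expected numSplits >= 1 and min <= max");
    }
    long total = max - min + 1;                        // number of distinct values
    int splits = (int) Math.min((long) numSplits, total);
    long base = total / splits;                        // minimum values per split
    long remainder = total % splits;                   // first 'remainder' splits get one extra
    List<long[]> ranges = new ArrayList<>();
    long lower = min;
    for (int i = 0; i < splits; i++) {
      long size = base + (i < remainder ? 1 : 0);
      long upper = lower + size - 1;
      ranges.add(new long[] {lower, upper});
      lower = upper + 1;
    }
    return ranges;
  }

  /** Renders one range as a SQL condition on the split-by field. */
  static String condition(String splitBy, long[] range) {
    return splitBy + " >= " + range[0] + " AND " + splitBy + " <= " + range[1];
  }

  public static void main(String[] args) {
    for (long[] range : split(1, 10, 3)) {
      System.out.println(condition("id", range));
    }
  }
}
```

For min = 1, max = 10 and three splits, the ranges are 1 to 4, 5 to 7, and 8 to 10. When numSplits is one, a single range covers everything, which is why the bounding query and split-by field are not required in that case.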
Option 2
Section | User Configuration Label | Label Description | Options | Default | Variable | User Widget |
---|---|---|---|---|---|---|
General | Label | Label for UI. | | | | textbox |
General | Reference Name | Unique name used for lineage. | | | referenceName | textbox |
General | Account Name | Full name of the Snowflake account. | | | accountName | textbox |
General | Database | Name of the database to connect to. | | | database | textbox |
General | Import Query | Query used to import data. | | | importQuery | textarea |
Credentials | Username | User identity for connecting to the specified database. | | | username | textbox |
Credentials | Password | Password to use to connect to the specified database. | | | password | password |
Key Pair Authentication | Key Pair Authentication Enabled | If true, the plugin will perform Key Pair authentication. | | False | keyPairEnabled | toggle |
Key Pair Authentication | Key File Path | Path to the private key file. | | | path | textbox |
OAuth2 | OAuth2 Enabled | If true, the plugin will perform OAuth2 authentication. | | False | oauth2Enabled | toggle |
OAuth2 | Auth URL | Endpoint for the authorization server used to retrieve the authorization code. | | | authUrl | textbox |
OAuth2 | Token URL | Endpoint for the resource server, which exchanges the authorization code for an access token. | | | tokenUrl | textbox |
OAuth2 | Client ID | Client identifier obtained during the Application registration process. | | | clientId | textbox |
OAuth2 | Client Secret | Client secret obtained during the Application registration process. | | | clientSecret | password |
OAuth2 | Scopes | Scope of the access request, which might have multiple space-separated values. | | | scopes | textbox |
OAuth2 | Refresh Token | Token used to obtain the access token, which is the end product of OAuth2. | | | refreshToken | textbox |
Advanced | Maximum Split Size | Maximum split size, specified in bytes. | | | maxSplitSize | number |
Advanced | Connection Arguments | A list of arbitrary string tag/value pairs as connection arguments. See: https://docs.snowflake.net/manuals/user-guide/jdbc-configure.html#jdbc-driver-connection-string | | | connectionArguments | keyvalue |
Notes:
- Refer to Plugin OAuth2 Common Module for the design of the common OAuth2 module.
Option 2
TODO
- The table above is similar to the Option1.1 table except for the splitter-related properties. Refer to the Design section for the proposed splitter design.
Source Data Types Mapping
...
Sink Properties
...
Section | User Configuration Label | Label Description | Options | Default | Variable | User Widget |
---|---|---|---|---|---|---|
General | Label | Label for UI. | | | | textbox |
General | Reference Name | Unique name used for lineage. | | | referenceName | textbox |
General | Account Name | Full name of the Snowflake account. | | | accountName | textbox |
General | Database | Name of the database to connect to. | | | database | textbox |
General | Table Name | Name of the database table to write to. | | | table | textbox |
Credentials | Username | User identity for connecting to the specified database. | | | username | textbox |
Credentials | Password | Password to use to connect to the specified database. | | | password | password |
Key Pair Authentication | Key Pair Authentication Enabled | If true, the plugin will perform Key Pair authentication. | | False | keyPairEnabled | toggle |
Key Pair Authentication | Key File Path | Path to the private key file. | | | path | textbox |
OAuth2 | OAuth2 Enabled | If true, the plugin will perform OAuth2 authentication. | | False | oauth2Enabled | toggle |
OAuth2 | Auth URL | Endpoint for the authorization server used to retrieve the authorization code. | | | authUrl | textbox |
OAuth2 | Token URL | Endpoint for the resource server, which exchanges the authorization code for an access token. | | | tokenUrl | textbox |
OAuth2 | Client ID | Client identifier obtained during the Application registration process. | | | clientId | textbox |
OAuth2 | Client Secret | Client secret obtained during the Application registration process. | | | clientSecret | password |
OAuth2 | Scopes | Scope of the access request, which might have multiple space-separated values. | | | scopes | textbox |
OAuth2 | Refresh Token | Token used to obtain the access token, which is the end product of OAuth2. | | | refreshToken | textbox |
Advanced | Connection Arguments | A list of arbitrary string tag/value pairs as connection arguments. See: https://docs.snowflake.net/manuals/user-guide/jdbc-configure.html#jdbc-driver-connection-string | | | connectionArguments | keyvalue |
Option 2
...
Sink Data Types Mapping
CDAP Schema Data Type | Snowflake Data Types | Comment |
---|---|---|
boolean | BOOLEAN | |
bytes | BINARY | |
date | DATE | |
double | FLOAT | Snowflake uses double-precision (64-bit) IEEE 754 floating point numbers. |
decimal | NUMBER(p, s) | |
float | FLOAT | |
int | NUMBER(p, s) | Where p >= 10. With sufficient precision, primitive values can safely be written as values of the decimal logical type. |
long | NUMBER(p, s) | Where p >= 19. With sufficient precision, primitive values can safely be written as values of the decimal logical type. |
string | VARCHAR | |
time | TIME | |
timestamp | TIMESTAMP_NTZ | |
array | ARRAY | |
record | OBJECT | |
enum | VARCHAR | |
map | OBJECT | |
union | VARIANT |
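The mapping above could be held in a simple lookup table, and the precision bounds for int and long can be checked by counting the decimal digits of the largest values those types hold. The sketch below is illustrative only: the class SinkTypeMapping and its methods are hypothetical, CDAP types are represented by their lowercase names as in the table, and NUMBER is given without its precision and scale parameters.

```java
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical lookup mirroring the Sink Data Types Mapping table. */
final class SinkTypeMapping {
  private static final Map<String, String> CDAP_TO_SNOWFLAKE = new HashMap<>();

  static {
    CDAP_TO_SNOWFLAKE.put("boolean", "BOOLEAN");
    CDAP_TO_SNOWFLAKE.put("bytes", "BINARY");
    CDAP_TO_SNOWFLAKE.put("date", "DATE");
    CDAP_TO_SNOWFLAKE.put("double", "FLOAT");
    CDAP_TO_SNOWFLAKE.put("decimal", "NUMBER");
    CDAP_TO_SNOWFLAKE.put("float", "FLOAT");
    CDAP_TO_SNOWFLAKE.put("int", "NUMBER");
    CDAP_TO_SNOWFLAKE.put("long", "NUMBER");
    CDAP_TO_SNOWFLAKE.put("string", "VARCHAR");
    CDAP_TO_SNOWFLAKE.put("time", "TIME");
    CDAP_TO_SNOWFLAKE.put("timestamp", "TIMESTAMP_NTZ");
    CDAP_TO_SNOWFLAKE.put("array", "ARRAY");
    CDAP_TO_SNOWFLAKE.put("record", "OBJECT");
    CDAP_TO_SNOWFLAKE.put("enum", "VARCHAR");
    CDAP_TO_SNOWFLAKE.put("map", "OBJECT");
    CDAP_TO_SNOWFLAKE.put("union", "VARIANT");
  }

  /** Returns the Snowflake type name for a CDAP type name, or fails for unknown types. */
  static String snowflakeType(String cdapType) {
    String type = CDAP_TO_SNOWFLAKE.get(cdapType);
    if (type == null) {
      throw new IllegalArgumentException("Unmapped CDAP type: " + cdapType);
    }
    return type;
  }

  /** Number of decimal digits needed to represent the given value. */
  static int digits(long value) {
    return BigDecimal.valueOf(value).precision();
  }

  public static void main(String[] args) {
    System.out.println(digits(Integer.MAX_VALUE)); // prints 10
    System.out.println(digits(Long.MAX_VALUE));    // prints 19
  }
}
```

The digit counts explain the bounds in the Comment column: the largest int has 10 decimal digits and the largest long has 19, so a NUMBER column needs at least that much precision to hold every value of the type.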
...