Couchbase database plugin

Introduction

A separate database plugin to support Couchbase-specific features and configurations.

Use-Case

  • Users can choose and install Couchbase source and sink plugins.
  • Users should see the Couchbase logo on the plugin configuration page for a better experience.
  • Users should get relevant information from the tool tip:
    • The tool tip should accurately describe what each field is used for.
  • Users should not have to specify any redundant configuration.
  • Users should get field-level lineage for the source and sink that is being used.
  • Reference documentation should be updated to account for the changes.
  • The source code for the Couchbase database plugin should be placed in a repo under the data-integrations org.
  • The data pipeline using source and sink plugins should run on both the MapReduce and Spark engines.

User Stories

  • Users should be able to install Couchbase-specific database source and sink plugins from the Hub.
  • Users should have each tool tip accurately describe what each field does.
  • Users should get field-level lineage information for the Couchbase source and sink.
  • Users should be able to set up a pipeline without specifying redundant information.
  • Users should get an updated reference document for the Couchbase source and sink.
  • Users should be able to read all the DB types.

Plugin Type

  • Batch Source
  • Batch Sink 
  • Real-time Source
  • Real-time Sink
  • Action
  • Post-Run Action
  • Aggregate
  • Join
  • Spark Model
  • Spark Compute

Couchbase Overview

Couchbase Server is an open-source, distributed, NoSQL document-oriented engagement database. It exposes a fast key-value store with a managed cache for sub-millisecond data operations, purpose-built indexers for fast queries, and a powerful query engine for executing SQL-like queries. Sub-millisecond data operations are provided by powerful services for querying and indexing, and by a feature-rich, document-oriented query language, N1QL. Multiple instances of Couchbase Server can be combined into a single cluster. Couchbase Server stores data as items. Each item consists of a key, by which the item is referenced, and an associated value, which must be either binary or a JSON document. Items are stored in named Buckets.

The Couchbase Data Model

The Couchbase data model is based on JSON, which provides a simple, lightweight, human-readable notation. It supports basic data types, such as numbers and strings, and complex types, such as embedded documents and arrays. Couchbase Server does not enforce uniformity: document structures can vary.

N1QL Query Language

The N1QL query language embraces the JSON document model and uses SQL-like syntax. In N1QL, you operate on JSON documents, and the result of your operation is another JSON document.

A basic N1QL query has the following parts:

  • SELECT — The fields of each document to return.
  • FROM — The data bucket in which to look.
  • WHERE — The conditions that the document must satisfy.

Here’s an example of a basic N1QL query and the JSON document it returns. The query asks for the country that is associated with the airline Excel Airways:

SELECT country FROM `travel-sample` WHERE name = "Excel Airways";


Note that any identifier (such as a bucket name) that contains a hyphen character must be enclosed in backtick (`) characters.
The results:

{
    "requestID": "9e1cd084-f45e-4059-9e7a-edec30f60dd2",
    "signature": {
        "country": "json"
	},
    "results": [
        {
            "country": "United Kingdom"
        }
    ],
    "status": "success",
	"metrics": {
        "elapsedTime": "7.42097249s",
        "executionTime": "7.420925841s",
        "resultCount": 1,
        "resultSize": 51
    }
}


Design Tips

Couchbase Java SDK reference: https://docs.couchbase.com/java-sdk/2.7/start-using-sdk.html


Design

The suggestion is to create a new Maven project in its own repository: https://github.com/data-integrations/couchbase

Although JDBC drivers for Couchbase are available from Simba and CData, they are paid products. A better solution therefore seems to be the official Couchbase Java SDK, which is distributed under the Apache-2.0 license.

Compatibility

The suggestion is to use Java SDK 2.7, which is compatible with Couchbase Server versions 4.0-4.5, 4.6, 5.0-5.5, and 6.0.

Couchbase Server 3.x cannot be supported.

Output schema inference

Couchbase Server 4.5 introduced INFER, an N1QL statement that infers the metadata of documents. This can be used to infer the Output Schema.

Consider a sample `test-bucket` bucket that contains the following three documents:

Bucket documents
	[
        {
            "test-bucket": {
                "boolean": false,
                "boolean_array": [],
                "created": 666028241280339,
                "date_as_string": "2006-03-02T15:04:05.567+08:00",
                "null": null,
                "number_array": [],
                "number_big_int": 0,
                "number_decimal": 0,
                "number_double": 0,
                "number_int": 0,
                "number_long": 0,
                "object": {
                    "key": "value"
                },
                "object_array": [],
                "object_map": {
                    "key": "value"
                },
                "string": ""
            }
        },
        {
            "test-bucket": {
                "boolean": false,
                "boolean_array": [
                    true,
                    false
                ],
                "created": 666028241251904,
                "date_as_string": "2006-03-02T15:04:05.567+08:00",
                "null": null,
                "number_array": [
                    -9223372036854775808,
                    0,
                    9223372036854775807
                ],
                "number_big_int": 18446744073709552000,
                "number_decimal": 3.14,
                "number_double": 179769313486231570000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000,
                "number_int": 2147483647,
                "number_long": 9223372036854775807,
                "object": {
                    "key": "value"
                },
                "object_array": [],
                "object_map": {
                    "key": "value"
                },
                "string": "string_value"
            }
        },
        {
            "test-bucket": {
                "boolean": true,
                "boolean_array": [
                    true,
                    false
                ],
                "created": 666028240839735,
                "date_as_string": "2006-01-02T15:04:05.567+08:00",
                "null": null,
                "number_array": [
                    -9223372036854775808,
                    0,
                    9223372036854775807
                ],
                "number_big_int": -18446744073709552000,
                "number_decimal": 3.14,
                "number_double": 0.000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000005,
                "number_int": -2147483648,
                "number_long": -9223372036854775808,
                "object": {
                    "key": "value"
                },
                "object_array": [
                    {
                        "key": "value1"
                    },
                    {
                        "key": "value2"
                    },
                    {
                        "key": "value3"
                    }
                ],
                "object_map": {
                    "key": "value"
                },
                "string": "string_value"
            }
        }
    ]
The INFER `test-bucket` statement returns:
INFER statement result
	[
        [
            {
                "#docs": 3,
                "$schema": "http://json-schema.org/schema#",
                "Flavor": "",
                "properties": {
                    "boolean": {
                        "#docs": 3,
                        "%docs": 100,
                        "samples": [
                            false,
                            true
                        ],
                        "type": "boolean"
                    },
                    "boolean_array": {
                        "#docs": 3,
                        "%docs": 100,
                        "items": {
                            "type": "boolean"
                        },
                        "maxItems": 2,
                        "minItems": 0,
                        "samples": [
                            [],
                            [
                                true,
                                false
                            ]
                        ],
                        "type": "array"
                    },
                    "created": {
                        "#docs": 3,
                        "%docs": 100,
                        "samples": [
                            666028240839735,
                            666028241251904,
                            666028241280339
                        ],
                        "type": "number"
                    },
                    "date_as_string": {
                        "#docs": 3,
                        "%docs": 100,
                        "samples": [
                            "2006-01-02T15:04:05.567+08:00",
                            "2006-03-02T15:04:05.567+08:00"
                        ],
                        "type": "string"
                    },
                    "null": {
                        "#docs": 3,
                        "%docs": 100,
                        "samples": [
                            null
                        ],
                        "type": "null"
                    },
                    "number_array": {
                        "#docs": 3,
                        "%docs": 100,
                        "items": {
                            "type": "number"
                        },
                        "maxItems": 3,
                        "minItems": 0,
                        "samples": [
                            [],
                            [
                                -9223372036854775808,
                                0,
                                9223372036854775807
                            ]
                        ],
                        "type": "array"
                    },
                    "number_big_int": {
                        "#docs": 3,
                        "%docs": 100,
                        "samples": [
                            -18446744073709552000,
                            0,
                            18446744073709552000
                        ],
                        "type": "number"
                    },
                    "number_decimal": {
                        "#docs": 3,
                        "%docs": 100,
                        "samples": [
                            0,
                            3.14
                        ],
                        "type": "number"
                    },
                    "number_double": {
                        "#docs": 3,
                        "%docs": 100,
                        "samples": [
                            0,
                            0.000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000005,
                            179769313486231570000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000
                        ],
                        "type": "number"
                    },
                    "number_int": {
                        "#docs": 3,
                        "%docs": 100,
                        "samples": [
                            -2147483648,
                            0,
                            2147483647
                        ],
                        "type": "number"
                    },
                    "number_long": {
                        "#docs": 3,
                        "%docs": 100,
                        "samples": [
                            -9223372036854775808,
                            0,
                            9223372036854775807
                        ],
                        "type": "number"
                    },
                    "object": {
                        "#docs": 3,
                        "%docs": 100,
                        "properties": {
                            "key": {
                                "#docs": 3,
                                "%docs": 100,
                                "samples": [
                                    "value"
                                ],
                                "type": "string"
                            }
                        },
                        "samples": [
                            {
                                "key": "value"
                            }
                        ],
                        "type": "object"
                    },
                    "object_array": {
                        "#docs": 3,
                        "%docs": 100,
                        "items": {
                            "#docs": 3,
                            "$schema": "http://json-schema.org/schema#",
                            "Flavor": "",
                            "properties": {
                                "key": {
                                    "type": "string"
                                }
                            },
                            "type": "object"
                        },
                        "maxItems": 3,
                        "minItems": 0,
                        "samples": [
                            [],
                            [
                                {
                                    "key": "value1"
                                },
                                {
                                    "key": "value2"
                                },
                                {
                                    "key": "value3"
                                }
                            ]
                        ],
                        "type": "array"
                    },
                    "object_map": {
                        "#docs": 3,
                        "%docs": 100,
                        "properties": {
                            "key": {
                                "#docs": 3,
                                "%docs": 100,
                                "samples": [
                                    "value"
                                ],
                                "type": "string"
                            }
                        },
                        "samples": [
                            {
                                "key": "value"
                            }
                        ],
                        "type": "object"
                    },
                    "string": {
                        "#docs": 3,
                        "%docs": 100,
                        "samples": [
                            "string_value",
                            ""
                        ],
                        "type": "string"
                    }
                },
                "type": "object"
            }
        ]
    ]

Let's consider a single attribute, the 'number_double' property. The value of this property is 0 in the first document, java.lang.Double.MAX_VALUE in the second, and java.lang.Double.MIN_VALUE in the third.

...
                    "number_double": {
                        "#docs": 3,
                        "%docs": 100,
                        "samples": [
                            0,
                            0.000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000005,
                            179769313486231570000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000
                        ],
                        "type": "number"
                    }
...


For each identified attribute, the statement returns the following details:

  • #docs: Specifies the number of documents in the sample that contain this attribute.

  • %docs: Specifies the percentage of documents in the sample that contain this attribute.

  • minItems: If the data type is an array, specifies the minimum number of elements (array size).

  • maxItems: If the data type is an array, specifies the maximum number of elements (array size).

  • samples: Displays a list of sample values for the attribute found in the sample population.

  • type: Specifies the identified data type of the attribute.

Thus, we can use the type and samples details to infer the type of each schema field.

However, there are some concerns:

  • Couchbase's number type can be mapped to a CDAP int, long, double, or decimal. To infer the actual CDAP type, the best we can do is analyze the samples.

          Suppose all values of some property are less than java.lang.Integer.MAX_VALUE except one, which equals java.lang.Long.MAX_VALUE. If the samples do not contain java.lang.Long.MAX_VALUE, we will infer an incorrect CDAP type (int instead of long).

  • There is no way to determine whether a field is nullable. The proposal is to make all fields nullable and let the user change this manually.

  • INFER does not honor the SELECT query and returns all document attributes. If we want to filter out schema fields according to the specified query, we have to parse the query manually.

  • The INFER statement is supported since Couchbase Server 4.5, so we won't be able to support versions [4.0-4.5).
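The first concern can be made concrete with a small sketch. It assumes the sample values have already been extracted from the INFER result and parsed into BigDecimal values. The class name, the enum, and the rule that any fractional value maps to double are illustrative assumptions, not part of any Couchbase or CDAP API.

```java
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper: picks the narrowest numeric type that fits every sampled value. */
final class NumberTypeInference {

    /** Candidate CDAP numeric types, ordered from narrowest to widest. */
    enum CdapNumberType { INT, LONG, DOUBLE, DECIMAL }

    private static final BigDecimal INT_MIN = BigDecimal.valueOf(Integer.MIN_VALUE);
    private static final BigDecimal INT_MAX = BigDecimal.valueOf(Integer.MAX_VALUE);
    private static final BigDecimal LONG_MIN = BigDecimal.valueOf(Long.MIN_VALUE);
    private static final BigDecimal LONG_MAX = BigDecimal.valueOf(Long.MAX_VALUE);

    /** Returns the widest type required by any sample; INT when there are no samples. */
    static CdapNumberType infer(List<BigDecimal> samples) {
        CdapNumberType widest = CdapNumberType.INT;
        for (BigDecimal sample : samples) {
            CdapNumberType required = classify(sample);
            if (required.compareTo(widest) > 0) {
                widest = required;
            }
        }
        return widest;
    }

    /** Classifies a single value. Assumption: any fractional value maps to DOUBLE. */
    static CdapNumberType classify(BigDecimal value) {
        boolean integral = value.signum() == 0 || value.stripTrailingZeros().scale() <= 0;
        if (!integral) {
            return CdapNumberType.DOUBLE;
        }
        if (value.compareTo(INT_MIN) >= 0 && value.compareTo(INT_MAX) <= 0) {
            return CdapNumberType.INT;
        }
        if (value.compareTo(LONG_MIN) >= 0 && value.compareTo(LONG_MAX) <= 0) {
            return CdapNumberType.LONG;
        }
        return CdapNumberType.DECIMAL; // integral, but outside the long range
    }

    public static void main(String[] args) {
        List<BigDecimal> samples =
            Arrays.asList(new BigDecimal("0"), new BigDecimal("9223372036854775807"));
        System.out.println(infer(samples)); // prints LONG
    }
}
```

The result is only as good as the samples: if the one value that needs a wider type is not sampled, the inferred type is too narrow, which is exactly the risk described above. The user should therefore be able to override the inferred type.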


Source Splitter

The proposal is to add a "Number of Splits" source configuration property, which specifies the desired number of splits to divide the query into when reading from Couchbase.

Fewer splits may be created if the query cannot be divided into the desired number of splits.

We can also use '0' as the default value for this configuration property and, in that case, determine the number of splits from the number of map tasks (controlled by the "mapreduce.job.maps" property):

public List<InputSplit> getSplits(JobContext job) throws IOException {

    ...

    // MRJobConfig.NUM_MAPS is the "mapreduce.job.maps" property; fall back to 1 if it is not set.
    int targetNumTasks = job.getConfiguration().getInt(MRJobConfig.NUM_MAPS, 1);

    ...
}

A 'SELECT COUNT(*)' query can be used to get the total number of documents, which are then divided between splits using 'OFFSET' and 'LIMIT'.
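As a minimal sketch of that division, the following helper turns a document count and a desired number of splits into OFFSET/LIMIT windows. The class and method names are hypothetical, and the document count is assumed to have been obtained from the COUNT(*) query beforehand.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: divides a document count into OFFSET/LIMIT windows. */
final class SplitPlanner {

    /** One OFFSET/LIMIT window over the query result. */
    static final class Split {
        final long offset;
        final long limit;

        Split(long offset, long limit) {
            this.offset = offset;
            this.limit = limit;
        }
    }

    /** Uses the configured value, or the number of map tasks when it is 0 (the default). */
    static int effectiveSplits(int configuredSplits, int mapTasks) {
        return configuredSplits > 0 ? configuredSplits : Math.max(1, mapTasks);
    }

    /** Creates at most desiredSplits windows that together cover every document exactly once. */
    static List<Split> plan(long totalDocuments, int desiredSplits) {
        if (totalDocuments < 0 || desiredSplits < 1) {
            throw new IllegalArgumentException("Invalid arguments");
        }
        // Fewer splits are created when there are fewer documents than desired splits.
        long numSplits = Math.min(desiredSplits, totalDocuments);
        List<Split> splits = new ArrayList<>();
        if (numSplits == 0) {
            return splits;
        }
        long base = totalDocuments / numSplits;
        long remainder = totalDocuments % numSplits;
        long offset = 0;
        for (long i = 0; i < numSplits; i++) {
            // The first 'remainder' splits take one extra document each.
            long limit = base + (i < remainder ? 1 : 0);
            splits.add(new Split(offset, limit));
            offset += limit;
        }
        return splits;
    }

    public static void main(String[] args) {
        for (Split split : plan(10, 3)) {
            System.out.println("OFFSET " + split.offset + " LIMIT " + split.limit);
        }
    }
}
```

For the windows to be disjoint, the underlying query presumably needs a stable ordering, and documents added or removed between the count and the reads can still shift the windows.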


Source Properties

| Section | User Configuration Label | Label Description | Options | Default | Variable | User Widget |
|---|---|---|---|---|---|---|
| General | Label | Label for UI. | | | | textbox |
| | Reference Name | Uniquely identified name for lineage. | | | referenceName | textbox |
| | Nodes | List of nodes to use when connecting to the Couchbase cluster. | | | nodes | csv |
| | Bucket | Couchbase Bucket name. | | | bucket | textbox |
| | Select Fields | Comma-separated list of fields to be read. | | * | selectFields | textbox |
| | Conditions | Optional criteria (filters or predicates) that the result documents must satisfy. Corresponds to the WHERE clause in the N1QL SELECT statement. | | | conditions | textbox |
| | Output Schema | Specifies the schema of the documents. | | | schema | schema |
| | Number of Splits | Desired number of splits to divide the query into when reading from Couchbase. Fewer splits may be created if the query cannot be divided into the desired number of splits. If the specified value is zero, the plugin will use the number of map tasks as the number of splits. | | 0 | numSplits | number |
| Credentials | Username | User identity for connecting to Couchbase. | | | username | textbox |
| | Password | Password to use to connect to Couchbase. | | | password | password |
| Error Handling | On Record Error | How to handle errors in record processing. | Skip error, Send to error, Fail pipeline | Fail pipeline | on-error | radio-group (layout: block) |
| Advanced | Sample Size | Specifies the number of documents to randomly sample in the bucket when inferring the schema. The default sample size is 1000 documents. If a bucket contains fewer documents than the specified number, then all the documents in the bucket will be used. | | 1000 | sampleSize | number |
| | Max Parallelism | Maximum number of CPU cores that can be used to process a query. If the specified value is less than zero or greater than the total number of cores in a cluster, the system will use all available cores in the cluster. | | 0 | maxParallelism | number |
| | Scan Consistency | Specifies the consistency guarantee or constraint for index scanning. | Not Bounded, At Plus, Request Plus, Statement Plus | Not Bounded | scanConsistency | select |
| | Query Timeout | Number of seconds to wait before a query times out. | | 600 | timeout | number |
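The Bucket, Select Fields, and Conditions properties together describe a N1QL SELECT statement. The sketch below shows one way they could be assembled; the class and method names are hypothetical, and rejecting bucket names that contain a backtick is an assumption made to keep the quoting simple.

```java
/** Hypothetical helper: assembles a N1QL SELECT statement from the source properties. */
final class N1qlStatementBuilder {

    /**
     * @param bucket       value of the Bucket property
     * @param selectFields value of the Select Fields property; "*" is used when blank
     * @param conditions   value of the Conditions property; the WHERE clause is omitted when blank
     */
    static String build(String bucket, String selectFields, String conditions) {
        if (bucket == null || bucket.isEmpty() || bucket.indexOf('`') >= 0) {
            throw new IllegalArgumentException("Invalid bucket name: " + bucket);
        }
        boolean noFields = selectFields == null || selectFields.trim().isEmpty();
        String fields = noFields ? "*" : selectFields.trim();

        // The bucket name is always wrapped in backticks so that names containing a hyphen work.
        StringBuilder statement = new StringBuilder("SELECT ")
            .append(fields)
            .append(" FROM `")
            .append(bucket)
            .append('`');
        if (conditions != null && !conditions.trim().isEmpty()) {
            statement.append(" WHERE ").append(conditions.trim());
        }
        return statement.toString();
    }

    public static void main(String[] args) {
        System.out.println(build("travel-sample", "country", "name = \"Excel Airways\""));
    }
}
```

The Conditions value is appended verbatim in this sketch, so the statement is only as valid as the text the user provides.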

Notes:

Source Data Types Mapping

The source requires the Output Schema to be set. Based on the schema, the source expects each document field to be of a specific Couchbase data type.

The On Record Error property lets the user decide whether the pipeline should fail, the record should be skipped, or the record should be sent to the error dataset.

The following table shows which Couchbase data types can be read as which CDAP types.

| Couchbase Data Type | CDAP Schema Data Type |
|---|---|
| Boolean | boolean |
| Number | int, long, double, decimal, string |
| String | string |
| Array | array |
| Object | record, map |

An Object can be read as a record. The following schema:

{"type":"record","name":"object","fields":[{"name":"inner_field","type":"string"}]}

is used for the 'object' field:

{
    "object" : {
        "inner_field" : "val"
    }
}

An Object can also be read as a map. The following schema:

{"type":"map","keys":"string","values":"string"}

is used for the 'object' field:

{
    "object" : {
        "inner_field" : "val"
    }
}


Notes:

See: 


Sink Properties

| Section | User Configuration Label | Label Description | Options | Default | Variable | User Widget |
|---|---|---|---|---|---|---|
| General | Label | Label for UI. | | | | textbox |
| | Reference Name | Uniquely identified name for lineage. | | | referenceName | textbox |
| | Nodes | List of nodes to use when connecting to the Couchbase cluster. | | | nodes | csv |
| | Bucket | Couchbase Bucket name. | | | bucket | textbox |
| | Key Field | Allows the user to specify which of the incoming fields should be used as a document identifier. The identifier is expected to be of type string. | | | keyField | input-field-selector |
| | Operation | Type of write operation to perform. This can be set to Insert, Replace or Upsert. | Insert, Replace, Upsert | Insert | operation | radio-group |
| Credentials | Username | User identity for connecting to Couchbase. | | | username | textbox |
| | Password | Password to use to connect to Couchbase. | | | password | password |
| Advanced | Batch Size | Size (in number of records) of the batched writes to the Couchbase bucket. Each write to Couchbase incurs some overhead. To maximize bulk write throughput, maximize the amount of data stored per write. Commits of 1 MiB usually provide the best performance. Default value is 100 records. | | 100 | batchSize | number |
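The Batch Size behaviour can be illustrated with a generic buffer that collects records and hands them to a writer once the configured number is reached. This is a sketch under assumptions: the class name is hypothetical, and the writer callback stands in for whatever bulk write the sink would actually perform against Couchbase.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical helper: groups records into batches of a fixed size before writing. */
final class BatchBuffer<T> {
    private final int batchSize;
    private final Consumer<List<T>> writer;
    private final List<T> pending = new ArrayList<>();

    /**
     * @param batchSize number of records per write (the Batch Size property)
     * @param writer    callback that performs the actual write; a placeholder in this sketch
     */
    BatchBuffer(int batchSize, Consumer<List<T>> writer) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.writer = writer;
    }

    /** Buffers a record and writes the batch once it is full. */
    void add(T record) {
        pending.add(record);
        if (pending.size() >= batchSize) {
            flush();
        }
    }

    /** Writes any buffered records; intended to be called once more when the input ends. */
    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        // Pass a copy so the writer is unaffected when the buffer is cleared.
        writer.accept(new ArrayList<>(pending));
        pending.clear();
    }
}
```

A caller would add every incoming record and call flush() at the end so that a final, partially filled batch is not lost.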

Notes:

Sink Data Types Mapping

| CDAP Schema Data Type | Couchbase Data Type |
|---|---|
| boolean | Boolean |
| bytes | base64 encoded String |
| date | String |
| double | Number |
| decimal | Number |
| float | Number |
| int | Number |
| long | Number |
| string | String |
| time | String |
| timestamp | String |
| array | Array |
| record | Object |
| enum | String |
| map | Object |
| union | Depends on the actual value |

For example, if the schema is the union:

["string","int","long"]

and the value is actually a long, the resulting document will have the field as a Number. If a different record comes in with the value as a string, the resulting document will have a String for that field.
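The "depends on the actual value" rule for unions amounts to dispatching on the runtime type of each value. The sketch below covers only scalar values; the class name is hypothetical, and the ISO-8601 text produced for date, time, and timestamp values via toString() is an assumption, since the string format is not specified here.

```java
import java.time.temporal.Temporal;
import java.util.Base64;

/** Hypothetical helper: converts a scalar field value to its JSON-compatible form. */
final class SinkValueConverter {

    static Object toJsonValue(Object value) {
        if (value == null) {
            return null;
        }
        if (value instanceof byte[]) {
            // bytes are written as a base64 encoded String
            return Base64.getEncoder().encodeToString((byte[]) value);
        }
        if (value instanceof Boolean || value instanceof Number || value instanceof String) {
            // Boolean, Number, and String values map directly to their JSON counterparts
            return value;
        }
        if (value instanceof Temporal) {
            // Assumption: java.time values (date, time, timestamp) are written as ISO-8601 text
            return value.toString();
        }
        if (value instanceof Enum) {
            return ((Enum<?>) value).name();
        }
        throw new IllegalArgumentException("Unsupported value type: " + value.getClass().getName());
    }

    public static void main(String[] args) {
        // The same union field can produce a Number for one record and a String for another.
        System.out.println(toJsonValue(42L) instanceof Number);
        System.out.println(toJsonValue("forty-two") instanceof String);
    }
}
```

Arrays, records, and maps would need the same conversion applied recursively to their elements, which is left out to keep the example short.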

Action Properties


| Section | User Configuration Label | Label Description | Options | Default | Variable | User Widget |
|---|---|---|---|---|---|---|
| General | Label | Label for UI. | | | | textbox |
| | Nodes | List of nodes to use when connecting to the Couchbase cluster. | | | nodes | csv |
| | Bucket | Couchbase Bucket name. | | | bucket | textbox |
| | Query | N1QL query to run. | | | query | textbox |
| Credentials | Username | User identity for connecting to Couchbase. | | | username | |
| | Password | Password to use to connect to Couchbase. | | | password | |
| Advanced | Max Parallelism | Maximum number of CPU cores that can be used to process a query. If the specified value is less than zero or greater than the total number of cores in a cluster, the system will use all available cores in the cluster. | | 0 | maxParallelism | |
| | Scan Consistency | Specifies the consistency guarantee or constraint for index scanning. | Not Bounded, At Plus, Request Plus, Statement Plus | Not Bounded | scanConsistency | |
| | Query Timeout | Number of seconds to wait before a query times out. | | 600 | timeout | |



Approach

Create a new Maven project in its own repository.

Pipeline Samples


Releases

Release X.Y.Z

Related Work

Database plugin enhancements