Couchbase database plugin
Introduction
A separate database plugin to support Couchbase-specific features and configurations.
Use-Case
- Users can choose and install Couchbase source and sink plugins.
- Users should see the Couchbase logo on the plugin configuration page for a better experience.
- Users should get relevant information from the tool tip:
  - The tool tip should accurately describe what each field is used for.
- Users should not have to specify any redundant configuration.
- Users should get field-level lineage for the source and sink that are being used.
- Reference documentation should be updated to account for the changes.
- The source code for the Couchbase database plugin should be placed in a repository under the data-integrations organization.
- Data pipelines using the source and sink plugins should run on both the MapReduce and Spark engines.
User Stories
- Users should be able to install Couchbase-specific database source and sink plugins from the Hub.
- Users should have each tool tip accurately describe what each field does.
- Users should get field-level lineage information for the Couchbase source and sink.
- Users should be able to set up a pipeline without specifying redundant information.
- Users should get an updated reference document for the Couchbase source and sink.
- Users should be able to read all Couchbase data types.
Plugin Type
- Batch Source
- Batch Sink
- Real-time Source
- Real-time Sink
- Action
- Post-Run Action
- Aggregate
- Join
- Spark Model
- Spark Compute
Couchbase Overview
Couchbase Server is an open-source, distributed, NoSQL document-oriented engagement database. It exposes a fast key-value store with a managed cache for sub-millisecond data operations, purpose-built indexers for fast queries, and a powerful query engine for executing SQL-like queries written in N1QL, a feature-rich, document-oriented query language. Multiple instances of Couchbase Server can be combined into a single cluster. Couchbase Server stores data as items. Each item consists of a key, by which the item is referenced, and an associated value, which must be either binary or a JSON document. Items are stored in named buckets.
The Couchbase Data Model
The Couchbase data model is based on JSON, which provides a simple, lightweight, human-readable notation. It supports basic data types, such as numbers and strings, and complex types, such as embedded documents and arrays. Couchbase Server does not enforce uniformity: document structures can vary.
N1QL Query Language
N1QL Query Language embraces the JSON document model and uses SQL-like syntax. In N1QL, you operate on JSON documents, and the result of your operation is another JSON document.
A basic N1QL query has the following parts:
- SELECT — The fields of each document to return.
- FROM — The data bucket in which to look.
- WHERE — The conditions that the document must satisfy.
Here’s an example of a basic N1QL query and the JSON document it returns. The query asks for the country that is associated with the airline Excel Airways:
SELECT country FROM `travel-sample` WHERE name = "Excel Airways";
Note that identifiers, such as bucket names, that contain a hyphen must be enclosed in backtick (`) characters.
The results:
{
  "requestID": "9e1cd084-f45e-4059-9e7a-edec30f60dd2",
  "signature": { "country": "json" },
  "results": [ { "country": "United Kingdom" } ],
  "status": "success",
  "metrics": {
    "elapsedTime": "7.42097249s",
    "executionTime": "7.420925841s",
    "resultCount": 1,
    "resultSize": 51
  }
}
Design Tips
Couchbase Java SDK reference: https://docs.couchbase.com/java-sdk/2.7/start-using-sdk.html
Design
The suggestion is to create a new Maven project in its own repository: https://github.com/data-integrations/couchbase
Although JDBC drivers for Couchbase are available from Simba and CData, they are paid products. A better solution therefore seems to be the official Couchbase Java SDK, which is licensed under Apache 2.0.
Compatibility
The suggestion is to use Java SDK 2.7, which is compatible with Couchbase Server versions 4.0-4.5, 4.6, 5.0-5.5, and 6.0.
Couchbase Server 3.x cannot be supported.
Output schema inference
Couchbase Server 4.5 introduces INFER, a N1QL statement that infers the metadata of documents. This can be used to infer the Output Schema.
For example, consider a sample bucket `test-bucket` containing three documents, and focus on a single attribute, the 'number_double' property. Its value is java.lang.Double.MIN_VALUE in the first document, java.lang.Double.MAX_VALUE in the second, and 0 in the third. For this property, the INFER `test-bucket` statement returns:
... "number_double": { "#docs": 3, "%docs": 100, "samples": [ 0, 0.000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000005, 179769313486231570000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000 ], "type": "number" } ...
For each identified attribute, the statement returns the following details:
- #docs: Specifies the number of documents in the sample that contain this attribute.
- %docs: Specifies the percentage of documents in the sample that contain this attribute.
- minitems: If the data type is an array, specifies the minimum number of elements (array size).
- maxitems: If the data type is an array, specifies the maximum number of elements (array size).
- samples: Displays a list of sample values for the attribute found in the sample population.
- type: Specifies the identified data type of the attribute.
Thus, the type and samples information can be used to infer the type of each schema field.
However, there are some concerns:
- Couchbase's Number type can be mapped to a CDAP int, long, double, or decimal. The actual CDAP type can only be inferred on a best-effort basis by analyzing the samples.
- Suppose all values of a property are less than java.lang.Integer.MAX_VALUE, except one that equals java.lang.Long.MAX_VALUE. If the samples do not contain that value, the inferred CDAP type (int) will be wrong.
- There is no way to determine whether a field is nullable. The proposal is to make all fields nullable and let the user change this manually.
- INFER does not honor the SELECT query and returns all document attributes. To restrict the schema fields to those selected by the query, the query would have to be parsed manually.
- The INFER statement is only supported since Couchbase Server 4.5, so versions from 4.0 up to (but not including) 4.5 cannot be supported.
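The best-effort number inference described above could look roughly like the following minimal sketch. It assumes the INFER "samples" for a "number" attribute have already been parsed into BigDecimal values; the class, enum, and method names are hypothetical and do not come from CDAP or the Couchbase SDK. As the concerns note, values missing from the samples cannot influence the result.

```java
import java.math.BigDecimal;
import java.util.List;
import java.util.Locale;

// Hypothetical helper: picks a CDAP numeric type from the INFER "samples"
// of an attribute whose "type" is "number". Best effort only.
final class NumberTypeInference {

  enum CdapType { INT, LONG, DOUBLE }

  private static final BigDecimal INT_MIN = BigDecimal.valueOf(Integer.MIN_VALUE);
  private static final BigDecimal INT_MAX = BigDecimal.valueOf(Integer.MAX_VALUE);
  private static final BigDecimal LONG_MIN = BigDecimal.valueOf(Long.MIN_VALUE);
  private static final BigDecimal LONG_MAX = BigDecimal.valueOf(Long.MAX_VALUE);

  static CdapType inferNumberType(List<BigDecimal> samples) {
    if (samples.isEmpty()) {
      return CdapType.DOUBLE; // nothing to analyze: fall back to the widest numeric type
    }
    boolean fitsInt = true;
    for (BigDecimal value : samples) {
      // A value is integral if it has no non-zero fractional digits.
      boolean integral = value.signum() == 0 || value.stripTrailingZeros().scale() <= 0;
      if (!integral || value.compareTo(LONG_MIN) < 0 || value.compareTo(LONG_MAX) > 0) {
        return CdapType.DOUBLE; // fractional, or outside the long range
      }
      if (value.compareTo(INT_MIN) < 0 || value.compareTo(INT_MAX) > 0) {
        fitsInt = false;
      }
    }
    return fitsInt ? CdapType.INT : CdapType.LONG;
  }

  // Every inferred field is made nullable, e.g. ["long","null"] in CDAP's JSON schema notation.
  static String nullableTypeJson(CdapType type) {
    return "[\"" + type.name().toLowerCase(Locale.ROOT) + "\",\"null\"]";
  }
}
```

Applied to the 'number_double' samples shown earlier, the sketch returns DOUBLE because Double.MIN_VALUE has a fractional part.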
Source Splitter
The proposal is to add "Number of Splits" Source configuration property, which allows specifying the desired number of splits to divide the query into when reading from Couchbase.
Fewer splits may be created if the query cannot be divided into the desired number of splits.
'0' can be used as the default value for this property, in which case the number of splits is determined by the number of map tasks (controlled by the "mapreduce.job.maps" property):
public List<InputSplit> getSplits(JobContext job) throws IOException {
  // ...
  int targetNumTasks = job.getConfiguration().getInt(MRJobConfig.NUM_MAPS, 1);
  // ...
}
A 'SELECT COUNT(*)' query can be used to get the total number of documents, which are then divided between the splits using 'OFFSET' and 'LIMIT'.
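The splitting arithmetic can be sketched as follows. This is a minimal illustration, not the plugin's implementation: the class and method names are hypothetical, the document count is assumed to come from the SELECT COUNT(*) query, and the ORDER BY META().id clause is an added assumption, since N1QL does not guarantee result order without ORDER BY and unordered OFFSET/LIMIT ranges could overlap or skip documents.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of dividing a document count into OFFSET/LIMIT splits.
final class OffsetLimitSplitter {

  // One split covers the documents at positions [offset, offset + limit).
  static final class Split {
    final long offset;
    final long limit;

    Split(long offset, long limit) {
      this.offset = offset;
      this.limit = limit;
    }
  }

  // A configured value of 0 means "use the number of map tasks" (mapreduce.job.maps).
  static int resolveNumSplits(int configuredSplits, int targetNumTasks) {
    if (configuredSplits < 0) {
      throw new IllegalArgumentException("Number of Splits must not be negative");
    }
    return configuredSplits == 0 ? Math.max(1, targetNumTasks) : configuredSplits;
  }

  // Divides totalDocs into at most numSplits contiguous ranges; fewer splits
  // are created when there are fewer documents than requested splits.
  static List<Split> computeSplits(long totalDocs, int numSplits) {
    if (numSplits < 1) {
      throw new IllegalArgumentException("numSplits must be positive");
    }
    List<Split> splits = new ArrayList<>();
    long count = Math.min(numSplits, totalDocs); // 0 documents -> no splits
    long offset = 0;
    for (long i = 0; i < count; i++) {
      // Spread the remainder over the first splits so sizes differ by at most one.
      long limit = totalDocs / count + (i < totalDocs % count ? 1 : 0);
      splits.add(new Split(offset, limit));
      offset += limit;
    }
    return splits;
  }

  // Assumes baseQuery has no ORDER BY/LIMIT/OFFSET of its own.
  static String splitQuery(String baseQuery, Split split) {
    return baseQuery + " ORDER BY META().id LIMIT " + split.limit + " OFFSET " + split.offset;
  }
}
```

For example, 10 documents and 3 requested splits yield ranges of 4, 3, and 3 documents starting at offsets 0, 4, and 7.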
Source Properties
| Section | User Configuration Label | Label Description | Options | Default | Variable | User Widget |
|---|---|---|---|---|---|---|
| General | Label | Label for UI. | | | | textbox |
| | Reference Name | Name used to uniquely identify this source for lineage. | | | referenceName | textbox |
| | Nodes | List of nodes to use when connecting to the Couchbase cluster. | | | nodes | csv |
| | Bucket | Couchbase bucket name. | | | bucket | textbox |
| | Select Fields | Comma-separated list of fields to be read. | | * | selectFields | textbox |
| | Conditions | Optional criteria (filters or predicates) that the result documents must satisfy. Corresponds to the WHERE clause of an N1QL SELECT statement. | | | conditions | textbox |
| | Output Schema | Specifies the schema of the documents. | | | schema | schema |
| | Number of Splits | Desired number of splits to divide the query into when reading from Couchbase. Fewer splits may be created if the query cannot be divided into the desired number of splits. If the specified value is zero, the plugin uses the number of map tasks as the number of splits. | | 0 | numSplits | number |
| Credentials | Username | User identity for connecting to Couchbase. | | | username | textbox |
| | Password | Password to use to connect to Couchbase. | | | password | password |
| Error Handling | On Record Error | How to handle errors in record processing. | | Fail pipeline | on-error | radio-group (layout: block) |
| Advanced | Sample Size | Specifies the number of documents to randomly sample in the bucket when inferring the schema. The default sample size is 1000 documents. If a bucket contains fewer documents than the specified number, all the documents in the bucket are used. | | 1000 | sampleSize | number |
| | Max Parallelism | Maximum number of CPU cores that can be used to process a query. If the specified value is less than zero or greater than the total number of cores in the cluster, the system uses all available cores in the cluster. | | 0 | maxParallelism | number |
| | Scan Consistency | Specifies the consistency guarantee or constraint for index scanning. | | Not Bounded | scanConsistency | select |
| | Query Timeout | Number of seconds to wait before a query times out. | | 600 | timeout | number |
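The Bucket, Select Fields, Conditions, and Sample Size properties together determine the N1QL statements the source would issue. The following minimal sketch shows one way to assemble them; the class and method names are hypothetical, and it relies on INFER accepting a `sample_size` option. The bucket name is always backtick-quoted, since bucket names may contain hyphens. Note that N1QL nests the result of `SELECT *` under the bucket name; how that wrapper is unwrapped is left out here.

```java
// Hypothetical helper that turns source properties into N1QL statements.
final class SourceStatements {

  // Bucket names may contain '-', so they are always quoted with backticks.
  static String escapeIdentifier(String name) {
    if (name.indexOf('`') >= 0) {
      throw new IllegalArgumentException("Unexpected backtick in identifier: " + name);
    }
    return "`" + name + "`";
  }

  // Select Fields defaults to '*'; Conditions, if present, becomes the WHERE clause.
  static String selectStatement(String bucket, String selectFields, String conditions) {
    String fields = selectFields == null || selectFields.trim().isEmpty() ? "*" : selectFields.trim();
    StringBuilder sb = new StringBuilder("SELECT ")
        .append(fields)
        .append(" FROM ")
        .append(escapeIdentifier(bucket));
    if (conditions != null && !conditions.trim().isEmpty()) {
      sb.append(" WHERE ").append(conditions.trim());
    }
    return sb.toString();
  }

  // Sample Size is passed to INFER as its sample_size option.
  static String inferStatement(String bucket, int sampleSize) {
    return "INFER " + escapeIdentifier(bucket) + " WITH {\"sample_size\": " + sampleSize + "}";
  }
}
```

With bucket "travel-sample", fields "country", and condition name = "Excel Airways", the sketch reproduces the basic N1QL query from the overview (without the trailing semicolon).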
Notes:
- The Output Schema can be inferred with INFER, an N1QL statement introduced in Couchbase Server 4.5.
- Query Workbench, the native Couchbase UI tool, also allows specifying query preferences such as Max Parallelism, Scan Consistency, and Query Timeout.
- For more information about the 'Scan Consistency' configuration option, see N1QL REST API.
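The Max Parallelism, Scan Consistency, and Query Timeout properties (shared by the source and the action) correspond to N1QL REST API request parameters. The following minimal sketch maps them to those parameters under stated assumptions: the class and method names are hypothetical, and it assumes the REST parameter names `max_parallelism`, `scan_consistency`, and `timeout`, with the timeout given as a duration string such as "600s". The plugin itself would more likely set these through the Java SDK.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical mapping of query preferences onto N1QL REST API parameters.
final class QueryPreferences {

  // Scan consistency levels, named after the N1QL REST API values.
  enum ScanConsistency { NOT_BOUNDED, REQUEST_PLUS, STATEMENT_PLUS }

  static Map<String, String> restParameters(String statement, int maxParallelism,
      ScanConsistency scanConsistency, int timeoutSeconds) {
    if (timeoutSeconds <= 0) {
      throw new IllegalArgumentException("Query Timeout must be positive");
    }
    Map<String, String> params = new LinkedHashMap<>();
    params.put("statement", statement);
    // Passed through unchanged; the query service interprets zero or out-of-range values.
    params.put("max_parallelism", Integer.toString(maxParallelism));
    params.put("scan_consistency", scanConsistency.name().toLowerCase(Locale.ROOT));
    // Duration string in seconds, e.g. "600s".
    params.put("timeout", timeoutSeconds + "s");
    return params;
  }
}
```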
Source Data Types Mapping
The source requires the Output Schema to be set. Based on this schema, the source expects each field in a document to be of a specific Couchbase data type.
The On Record Error property lets the user decide whether, when a record cannot be processed, the pipeline should fail, the record should be skipped, or the record should be sent to the error dataset.
The following table shows what Couchbase data types can be read as CDAP types.
Couchbase Data Types | CDAP Schema Data Type |
---|---|
Boolean | boolean |
Number | int, long, double, decimal, string |
String | string |
Array | array |
Object | record (for example, the schema {"type":"record","name":"object","fields":[{"name":"inner_field","type":"string"}]} is used for the 'object' field of { "object" : { "inner_field" : "val" } }) |
Object | map (for example, the schema {"type":"map","keys":"string","values":"string"} is used for the 'object' field of { "object" : { "inner_field" : "val" } }) |
Notes:
See:
- https://docs.couchbase.com/server/6.0/n1ql/n1ql-language-reference/datatypes.html
- https://docs.couchbase.com/server/6.5/n1ql/n1ql-language-reference/literals.html
- https://docs.couchbase.com/java-sdk/current/document-operations.html#java-formats-non-json
- https://docs.couchbase.com/sdk-api/couchbase-java-client-2.6.0/index.html?com/couchbase/client/java/document/json/JsonObject.html
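Because a Couchbase Number can be read as several CDAP types, the source has to convert each value to the type declared in the Output Schema. A minimal sketch, assuming the value arrives as a java.lang.Number (names are hypothetical), might look like this. A value that does not fit raises an exception, which the On Record Error property would then handle. CDAP decimals also carry precision and scale, which this sketch ignores.

```java
import java.math.BigDecimal;

// Hypothetical conversion of a Couchbase Number into the CDAP type from the Output Schema.
final class NumberConversion {

  enum TargetType { INT, LONG, DOUBLE, DECIMAL, STRING }

  static Object convert(Number value, TargetType target) {
    BigDecimal exact = new BigDecimal(value.toString());
    switch (target) {
      case INT:
        return exact.intValueExact(); // throws ArithmeticException on overflow or fraction
      case LONG:
        return exact.longValueExact(); // same checks for the long range
      case DOUBLE:
        return value.doubleValue();
      case DECIMAL:
        return exact; // precision/scale from the schema are not enforced here
      case STRING:
        return value.toString();
      default:
        throw new IllegalArgumentException("Unsupported target type: " + target);
    }
  }
}
```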
Sink Properties
| Section | User Configuration Label | Label Description | Options | Default | Variable | User Widget |
|---|---|---|---|---|---|---|
| General | Label | Label for UI. | | | | textbox |
| | Reference Name | Name used to uniquely identify this sink for lineage. | | | referenceName | textbox |
| | Nodes | List of nodes to use when connecting to the Couchbase cluster. | | | nodes | csv |
| | Bucket | Couchbase bucket name. | | | bucket | textbox |
| | Key Field | Specifies which of the incoming fields should be used as the document identifier. The identifier is expected to be of type string. | | | keyField | input-field-selector |
| | Operation | Type of write operation to perform. This can be set to Insert, Replace, or Upsert. | Insert, Replace, Upsert | Insert | operation | radio-group |
| Credentials | Username | User identity for connecting to Couchbase. | | | username | textbox |
| | Password | Password to use to connect to Couchbase. | | | password | password |
| Advanced | Batch Size | Size (in number of records) of the batched writes to the Couchbase bucket. Each write to Couchbase carries some overhead, so to maximize bulk write throughput, maximize the amount of data stored per write. Commits of 1 MiB usually provide the best performance. The default value is 100 records. | | 100 | batchSize | number |
Notes:
- The ID of the document is mandatory in Couchbase.
- For more information about batching, see Batching Operations.
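The sink's write path can be modeled in memory to illustrate the Key Field, Operation, and Batch Size properties. In this minimal sketch, which is hypothetical and uses no Couchbase API, a HashMap stands in for the bucket and records are plain maps. Insert fails if the document already exists, Replace fails if it does not, and Upsert creates or overwrites it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of the sink write semantics.
final class SinkWriteModel {

  enum Operation { INSERT, REPLACE, UPSERT }

  // Stands in for a Couchbase bucket: document ID -> document content.
  final Map<String, Map<String, Object>> bucket = new HashMap<>();

  // The document ID is mandatory and taken from the Key Field, which must be a string.
  static String documentId(Map<String, Object> record, String keyField) {
    Object key = record.get(keyField);
    if (!(key instanceof String)) {
      throw new IllegalArgumentException("Key field '" + keyField + "' must be a non-null string");
    }
    return (String) key;
  }

  void write(Map<String, Object> record, String keyField, Operation op) {
    String id = documentId(record, keyField);
    boolean exists = bucket.containsKey(id);
    if (op == Operation.INSERT && exists) {
      throw new IllegalStateException("Document already exists: " + id);
    }
    if (op == Operation.REPLACE && !exists) {
      throw new IllegalStateException("Document does not exist: " + id);
    }
    bucket.put(id, record); // UPSERT always creates or overwrites
  }

  // Groups records into batches of at most batchSize records.
  static <T> List<List<T>> batches(List<T> records, int batchSize) {
    if (batchSize < 1) {
      throw new IllegalArgumentException("Batch Size must be positive");
    }
    List<List<T>> result = new ArrayList<>();
    for (int i = 0; i < records.size(); i += batchSize) {
      result.add(new ArrayList<>(records.subList(i, Math.min(i + batchSize, records.size()))));
    }
    return result;
  }
}
```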
Sink Data Types Mapping
CDAP Schema Data Type | Couchbase Data Types |
---|---|
boolean | Boolean |
bytes | Base64-encoded String |
date | String |
double | Number |
decimal | Number |
float | Number |
int | Number |
long | Number |
string | String |
time | String |
timestamp | String |
array | Array |
record | Object |
enum | String |
map | Object |
union | Depends on the actual value. For example, if the field is a union of ["string","int","long"] and the value is a long, the resulting document will have the field as a Number. If a different record arrives with a string value, the resulting document will have a String for that field. |
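The sink mapping above can be sketched as a recursive conversion to JSON-compatible Java values. This minimal sketch assumes (these are assumptions, not CDAP guarantees) that bytes arrive as byte[] or ByteBuffer, that date, time, and timestamp values arrive as java.time objects, and that nested records have already been turned into maps. Because the conversion looks at the runtime value, union fields naturally follow the "depends on the actual value" rule.

```java
import java.nio.ByteBuffer;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Base64;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical conversion of CDAP field values into JSON-compatible values.
final class SinkValueConversion {

  static Object toJsonValue(Object value) {
    if (value == null || value instanceof Boolean || value instanceof Number || value instanceof String) {
      return value; // boolean, numeric types and string map directly
    }
    if (value instanceof byte[]) {
      return Base64.getEncoder().encodeToString((byte[]) value);
    }
    if (value instanceof ByteBuffer) {
      ByteBuffer copy = ((ByteBuffer) value).duplicate(); // leave the caller's position untouched
      byte[] bytes = new byte[copy.remaining()];
      copy.get(bytes);
      return Base64.getEncoder().encodeToString(bytes);
    }
    if (value instanceof LocalDate || value instanceof LocalTime || value instanceof ZonedDateTime) {
      return value.toString(); // date, time and timestamp become ISO-8601 strings
    }
    if (value instanceof Enum) {
      return ((Enum<?>) value).name();
    }
    if (value instanceof List) {
      List<Object> out = new ArrayList<>();
      for (Object item : (List<?>) value) {
        out.add(toJsonValue(item));
      }
      return out; // array -> Array
    }
    if (value instanceof Map) {
      Map<String, Object> out = new LinkedHashMap<>();
      for (Map.Entry<?, ?> e : ((Map<?, ?>) value).entrySet()) {
        out.put(String.valueOf(e.getKey()), toJsonValue(e.getValue()));
      }
      return out; // record and map -> Object
    }
    throw new IllegalArgumentException("Unsupported value type: " + value.getClass().getName());
  }
}
```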
Action Properties
| Section | User Configuration Label | Label Description | Options | Default | Variable | User Widget |
|---|---|---|---|---|---|---|
| General | Label | Label for UI. | | | | textbox |
| | Nodes | List of nodes to use when connecting to the Couchbase cluster. | | | nodes | csv |
| | Bucket | Couchbase bucket name. | | | bucket | textbox |
| | Query | N1QL query to run. | | | query | textbox |
| Credentials | Username | User identity for connecting to Couchbase. | | | username | textbox |
| | Password | Password to use to connect to Couchbase. | | | password | password |
| Advanced | Max Parallelism | Maximum number of CPU cores that can be used to process a query. If the specified value is less than zero or greater than the total number of cores in the cluster, the system uses all available cores in the cluster. | | 0 | maxParallelism | number |
| | Scan Consistency | Specifies the consistency guarantee or constraint for index scanning. | | Not Bounded | scanConsistency | select |
| | Query Timeout | Number of seconds to wait before a query times out. | | 600 | timeout | number |
Approach
Create a new Maven project in its own repository.