Introduction
...
For each identified attribute, the statement returns the following details:
#docs: Specifies the number of documents in the sample that contain this attribute.
%docs: Specifies the percentage of documents in the sample that contain this attribute.
minitems: If the data type is an array, specifies the minimum number of elements (array size).
maxitems: If the data type is an array, specifies the maximum number of elements (array size).
samples: Displays a list of sample values for the attribute found in the sample population.
type: Specifies the identified data type of the attribute.
Thus, the type and samples details can be used to infer the type of each schema field.
However, there are some concerns:
Couchbase's number type can map to a CDAP int, long, double, or decimal. The best we can do to infer the actual CDAP type is to analyze the samples.
Suppose all values of some property are less than java.lang.Integer.MAX_VALUE except one, which equals java.lang.Long.MAX_VALUE. If the samples do not contain java.lang.Long.MAX_VALUE, we will infer the wrong CDAP type (int instead of long).
There is no way to determine whether a field is nullable. The proposal is to make all fields nullable and let the user change this manually.
INFER does not honor the SELECT query and returns the attributes of all documents. If we want to filter schema fields according to the specified query, we have to parse the query manually.
The INFER statement has been supported since Couchbase Server 4.5, so we won't be able to support versions [4.0-4.5).
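The number-type concern can be made concrete with a minimal sketch. It assumes the sample values have already been parsed into `BigDecimal`; the class `NumberTypeInference` and the enum `NumericType` are hypothetical names used for illustration and are not part of the CDAP or Couchbase APIs. The widening order (int, long, double, decimal) is also a simplifying assumption.

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.List;

// Hypothetical helper: picks the narrowest numeric type that fits all samples.
final class NumberTypeInference {

  // Hypothetical stand-in for the CDAP numeric types named in the text,
  // declared from narrowest to widest for this sketch.
  enum NumericType { INT, LONG, DOUBLE, DECIMAL }

  private static final BigInteger INT_MIN = BigInteger.valueOf(Integer.MIN_VALUE);
  private static final BigInteger INT_MAX = BigInteger.valueOf(Integer.MAX_VALUE);
  private static final BigInteger LONG_MIN = BigInteger.valueOf(Long.MIN_VALUE);
  private static final BigInteger LONG_MAX = BigInteger.valueOf(Long.MAX_VALUE);

  // Classifies a single sample value.
  static NumericType classify(BigDecimal value) {
    boolean whole = value.signum() == 0 || value.stripTrailingZeros().scale() <= 0;
    if (!whole) {
      return NumericType.DOUBLE; // any fractional value is treated as a double
    }
    BigInteger integer = value.toBigIntegerExact();
    if (integer.compareTo(INT_MIN) >= 0 && integer.compareTo(INT_MAX) <= 0) {
      return NumericType.INT;
    }
    if (integer.compareTo(LONG_MIN) >= 0 && integer.compareTo(LONG_MAX) <= 0) {
      return NumericType.LONG;
    }
    return NumericType.DECIMAL; // whole number that does not fit in a long
  }

  // Returns the widest type seen across the samples (INT for an empty list).
  static NumericType infer(List<BigDecimal> samples) {
    NumericType result = NumericType.INT;
    for (BigDecimal sample : samples) {
      NumericType candidate = classify(sample);
      if (candidate.ordinal() > result.ordinal()) {
        result = candidate;
      }
    }
    return result;
  }
}
```

If the sample population happens to contain only small values, `infer` returns `INT` even though the bucket may hold a value equal to `Long.MAX_VALUE`, which is exactly the risk described above.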
Source Splitter
The proposal is to add a "Number of Splits" source configuration property that specifies the desired number of splits to divide the query into when reading from Couchbase.
Fewer splits may be created if the query cannot be divided into the desired number of splits.
Alternatively, '0' can be used as the default value for this property, in which case the number of splits is determined from the number of map tasks (controlled by the "mapreduce.job.maps" property):
Code Block |
---|
public List<InputSplit> getSplits(JobContext job) throws IOException {
...
int targetNumTasks = job.getConfiguration().getInt(MRJobConfig.NUM_MAPS, 1);
... |
A 'SELECT COUNT(*)' query can be used to get the total number of documents, which are then divided between splits using 'OFFSET' and 'LIMIT'.
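The arithmetic behind this division can be sketched as follows. This is an illustration only: the class `OffsetLimitSplits`, the nested `Range` type, and the helper `toQuery` are hypothetical names, and the total document count is assumed to have been fetched already with 'SELECT COUNT(*)'.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: divides a document count into OFFSET/LIMIT ranges.
final class OffsetLimitSplits {

  // One split covers the documents in [offset, offset + limit).
  static final class Range {
    final long offset;
    final long limit;

    Range(long offset, long limit) {
      this.offset = offset;
      this.limit = limit;
    }
  }

  // Produces at most desiredSplits ranges; fewer when there are fewer documents.
  static List<Range> split(long totalDocs, int desiredSplits) {
    List<Range> ranges = new ArrayList<>();
    if (totalDocs <= 0 || desiredSplits <= 0) {
      return ranges;
    }
    long numSplits = Math.min(desiredSplits, totalDocs);
    long base = totalDocs / numSplits;
    long remainder = totalDocs % numSplits;
    long offset = 0;
    for (long i = 0; i < numSplits; i++) {
      // The first `remainder` splits take one extra document each.
      long limit = base + (i < remainder ? 1 : 0);
      ranges.add(new Range(offset, limit));
      offset += limit;
    }
    return ranges;
  }

  // Appends the range to a base query as N1QL LIMIT and OFFSET clauses.
  static String toQuery(String baseQuery, Range range) {
    return baseQuery + " LIMIT " + range.limit + " OFFSET " + range.offset;
  }
}
```

For example, 10 documents and 3 desired splits give ranges of 4, 3, and 3 documents, while 2 documents and 5 desired splits give only 2 ranges, which matches the note that fewer splits may be created.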
Source Properties
Section | User Configuration Label | Label Description | Options | Default | Variable | User Widget |
---|---|---|---|---|---|---|
General | Label | Label for UI. | | | | textbox |
General | Reference Name | Uniquely identified name for lineage. | | | referenceName | textbox |
General | Nodes | List of nodes to use when connecting to the Couchbase cluster. | | | nodes | csv |
General | Bucket | Couchbase bucket name. | | | bucket | textbox |
General | Select Fields | Comma-separated list of fields to be read. | | * | selectFields | textbox |
General | Conditions | Optional criteria (filters or predicates) that the result documents must satisfy. Corresponds to the WHERE clause in a N1QL SELECT statement. | | | conditions | textbox |
General | Output Schema | Specifies the schema of the documents. | | | schema | schema |
Credentials | Username | User identity for connecting to Couchbase. | | | username | textbox |
Credentials | Password | Password to use to connect to Couchbase. | | | password | password |
Error Handling | On Record Error | How to handle errors in record processing. | | Fail pipeline | on-error | radio-group (layout: block) |
Advanced | Max Parallelism | Maximum number of CPU cores that can be used to process a query. If the specified value is less than zero or greater than the total number of cores in a cluster, the system will use all available cores in the cluster. | | 0 | maxParallelism | number |
Advanced | Scan Consistency | Specifies the consistency guarantee or constraint for index scanning. | | Not Bounded | scanConsistency | select |
Advanced | Query Timeout | Number of seconds to wait before a query times out. | | 600 | timeout | number |
...
The source requires Output Schema to be set. Based on the schema, the source expects each field in a document to be of a specific Couchbase data type.
The On Record Error property lets the user decide whether the pipeline should fail, the record should be skipped, or the record should be sent to the error dataset.
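As an illustration of how the Bucket, Select Fields, and Conditions properties could combine into a N1QL statement, here is a minimal sketch. The class `N1qlQueryBuilder` and its `build` method are hypothetical and do not describe the plugin's actual implementation; the sketch assumes that Select Fields defaults to `*` and that Conditions is appended verbatim as the WHERE clause, with no validation of its contents.

```java
// Hypothetical helper: assembles a N1QL SELECT from the source properties.
final class N1qlQueryBuilder {

  static String build(String bucket, String selectFields, String conditions) {
    // The bucket name is wrapped in backticks; names containing a backtick
    // are rejected rather than escaped to keep the sketch simple.
    if (bucket == null || bucket.isEmpty() || bucket.contains("`")) {
      throw new IllegalArgumentException("Invalid bucket name: " + bucket);
    }
    String fields = isBlank(selectFields) ? "*" : selectFields.trim();
    StringBuilder query = new StringBuilder("SELECT ")
        .append(fields)
        .append(" FROM `")
        .append(bucket)
        .append('`');
    if (!isBlank(conditions)) {
      // Conditions are optional and appended as-is (assumption).
      query.append(" WHERE ").append(conditions.trim());
    }
    return query.toString();
  }

  private static boolean isBlank(String value) {
    return value == null || value.trim().isEmpty();
  }
}
```

With a bucket named `travel-sample`, fields `name, type`, and conditions `type = 'hotel'`, the sketch produces ``SELECT name, type FROM `travel-sample` WHERE type = 'hotel'``.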
...
Section | User Configuration Label | Label Description | Options | Default | Variable | User Widget |
---|---|---|---|---|---|---|
General | Label | Label for UI. | | | | textbox |
General | Reference Name | Uniquely identified name for lineage. | | | referenceName | textbox |
General | Nodes | List of nodes to use when connecting to the Couchbase cluster. | | | nodes | csv |
General | Bucket | Couchbase bucket name. | | | bucket | textbox |
General | Key Field | Specifies which of the incoming fields should be used as the document identifier. The identifier is expected to be of type string. | | | keyField | input-field-selector |
General | Operation | Type of write operation to perform. This can be set to Insert, Replace or Upsert. | | Insert | operation | radio-group |
Credentials | Username | User identity for connecting to Couchbase. | | | username | textbox |
Credentials | Password | Password to use to connect to Couchbase. | | | password | password |
Advanced | Batch Size | Size (in number of records) of the batched writes to the Couchbase bucket. Each write to Couchbase contains some overhead. To maximize bulk write throughput, maximize the amount of data stored per write. Commits of 1 MiB usually provide the best performance. Default value is 100 records. | | 100 | batchSize | number |
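The Batch Size behavior can be illustrated with a small buffering sketch. The class `BatchBuffer` is hypothetical, and the `writer` callback stands in for whatever bulk write the sink performs against Couchbase; no Couchbase SDK calls are shown or assumed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper: groups records into batches of a fixed size.
final class BatchBuffer<T> {
  private final int batchSize;
  private final Consumer<List<T>> writer; // stand-in for the actual bulk write
  private final List<T> pending = new ArrayList<>();

  BatchBuffer(int batchSize, Consumer<List<T>> writer) {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be positive");
    }
    this.batchSize = batchSize;
    this.writer = writer;
  }

  // Buffers a record and writes the batch once it reaches batchSize.
  void add(T record) {
    pending.add(record);
    if (pending.size() >= batchSize) {
      flush();
    }
  }

  // Writes any remaining records; call once after the last record.
  void flush() {
    if (pending.isEmpty()) {
      return;
    }
    writer.accept(new ArrayList<>(pending)); // copy so the writer owns its list
    pending.clear();
  }
}
```

With a batch size of 2 and five records, the writer is invoked three times: two full batches, plus one final batch of a single record after the closing `flush()`.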
...