Open Questions
JavaScript and Python transform plugins should be able to get Date/Time fields from the context. This is challenging for the JavaScript transform because it receives a JSON record; JavaScript also does not support microsecond-level timestamps.
Table of Contents
Checklist
- User Stories Documented
- User Stories Reviewed
- Design Reviewed
- APIs reviewed
- Release priorities assigned
- Test cases reviewed
- Blog post
Introduction
CDAP Schema is a superset of Avro schema: Avro schemas only support string keys for maps, whereas CDAP schemas can have arbitrary types as keys. Starting with Avro 1.8, date and time are supported as Avro logical types, and CDAP Schema should support them as well.
Goals
The goal of this feature is to support date/time as CDAP Schema data types. This support should also be surfaced in plugins so that plugin developers can use it.
User Stories
- As a pipeline developer, I should be able to read date and time as first-class types from sources such as datasets or databases.
- As a pipeline developer, I should be able to write date and time as first-class types to sinks such as datasets or databases.
- As a plugin developer, I should be able to create structured records with date and time field types.
- As a CDAP user, I should be able to explore CDAP datasets that have date and time fields.
- As a CDAP application developer, I should be able to programmatically create dataset schemas with date and time fields.
Design
In Avro 1.8, date/time is represented as a LogicalType. A logical type is a primitive type with the extra attribute `logicalType`. The table below lists the Avro types and schemas supported in CDAP 5.1. Please note that the other logical Avro types, Duration and Decimal, are out of scope for this feature.
Avro Type | Avro Schema | Description |
---|---|---|
Date | { "type": "int", "logicalType": "date" } | the int stores the number of days from the Unix epoch |
Timestamp in milliseconds | { "type": "long", "logicalType": "timestamp-millis" } | the long stores the number of milliseconds from the Unix epoch |
Timestamp in microseconds | { "type": "long", "logicalType": "timestamp-micros" } | the long stores the number of microseconds from the Unix epoch |
Time in milliseconds | { "type": "int", "logicalType": "time-millis" } | the int stores the number of milliseconds after midnight |
Time in microseconds | { "type": "long", "logicalType": "time-micros" } | the long stores the number of microseconds after midnight |
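For illustration, the primitive encodings in the table above can be reproduced with plain `java.time` (a sketch using standard Java only, not CDAP or Avro classes; the class name is illustrative):

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;

public class LogicalTypeEncodings {
    public static void main(String[] args) {
        // date: int storing the number of days from the Unix epoch
        int dateValue = (int) LocalDate.of(2018, 9, 1).toEpochDay(); // 17775

        // timestamp-micros: long storing microseconds from the Unix epoch
        Instant instant = Instant.parse("2018-09-01T12:30:00Z");
        long timestampMicros = instant.getEpochSecond() * 1_000_000L + instant.getNano() / 1_000L;

        // time-millis: int storing milliseconds after midnight
        int timeMillis = (int) (LocalTime.of(12, 30).toNanoOfDay() / 1_000_000L); // 45_000_000

        System.out.println(dateValue + " " + timestampMicros + " " + timeMillis);
    }
}
```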
To support date and time in CDAP, we will have to map the Avro logical types to corresponding JDBC and Hive types. The table below shows the mapping of each logical Avro type to the corresponding Hive and relational database types.
Avro Type | Hive Type | JDBC Type | MySQL | Oracle | MS SQL | BigQuery | Spanner | Redshift |
---|---|---|---|---|---|---|---|---|
Date | Date | java.sql.Date | DATE | N/A | DATE | DATE | DATE | DATE |
Timestamp in milliseconds | Timestamp | java.sql.Timestamp | TIMESTAMP | DATE/TIMESTAMP | DATETIME | N/A | N/A | N/A |
Timestamp in microseconds | Timestamp | java.sql.Timestamp | TIMESTAMP | DATE/TIMESTAMP | DATETIME2 | TIMESTAMP | TIMESTAMP | TIMESTAMP/TIMESTAMPZ |
Time in milliseconds | -- (not explorable) | java.sql.Time | N/A | N/A | TIME | N/A | N/A | N/A |
Time in microseconds | -- (not explorable) | java.sql.Timestamp | N/A | N/A | TIME | TIME | N/A | N/A |
Please note that the logical type `Time` does not correspond to any Hive type, so datasets with the Avro `time` type will not be explorable. For ObjectMappedTable, CDAP supports conversion of Java POJOs to Schema objects; CDAP should also support converting Date and Timestamp fields of a Java class to the corresponding CDAP Schema types.
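As a sketch of what the mapping table implies for plugin code, the JDBC types above can be converted to the logical types' primitive encodings roughly as follows (the helper class and method names here are illustrative, not an existing CDAP API):

```java
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.time.Instant;

public class JdbcLogicalTypeConversions {
    // date logical type: int days from the Unix epoch
    public static int toEpochDays(Date date) {
        return (int) date.toLocalDate().toEpochDay();
    }

    // timestamp-micros logical type: long microseconds from the Unix epoch
    public static long toEpochMicros(Timestamp timestamp) {
        Instant instant = timestamp.toInstant();
        return instant.getEpochSecond() * 1_000_000L + instant.getNano() / 1_000L;
    }

    // time-millis logical type: int milliseconds after midnight
    public static int toMillisOfDay(Time time) {
        return (int) (time.toLocalTime().toNanoOfDay() / 1_000_000L);
    }
}
```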
NOTE:
- Since streams are deprecated, we will not be supporting Date/Time types for streams
Usage in plugins:
CDAP logical types support two resolutions, millis and micros, for the timestamp and time logical types. This means plugins should support both resolutions in input and output schemas. However, when generating plugin schemas for databases such as MySQL or BigQuery, plugins should prefer the more granular unit (micros) wherever possible, because many datetime/timestamp types in relational and cloud databases are more granular than milliseconds.
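The granularity argument can be seen in a two-line sketch: down-converting micros to millis is lossy, so a plugin that reads a microsecond-precision column into timestamp-millis cannot round-trip the value (plain Java, illustrative only):

```java
public class GranularityLoss {
    public static void main(String[] args) {
        long micros = 1_535_805_000_123_456L;        // microsecond-precision timestamp
        long millis = Math.floorDiv(micros, 1_000L); // stored as timestamp-millis
        long roundTripped = millis * 1_000L;         // 1_535_805_000_123_000: last 456 micros lost
        System.out.println(micros != roundTripped);  // true
    }
}
```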
New Programmatic APIs
Changes in cdap dataset schema to support logical avro types:
Code Block

public final class Schema implements Serializable {
  private final Type type;
  @Nullable
  private final LogicalType logicalType;

  public enum Type {
    NULL(true),
    BOOLEAN(true),
    ...
  }

  public enum LogicalType {
    TIME_MILLIS,      // primitive type INT
    TIME_MICROS,      // primitive type LONG
    TIMESTAMP_MILLIS, // primitive type LONG
    TIMESTAMP_MICROS, // primitive type LONG
    DATE              // primitive type INT
  }

  /**
   * Creates a {@link Schema} for the given logical type.
   *
   * @param type LogicalType of the schema to create.
   * @return A {@link Schema} with the given type.
   */
  public static Schema of(LogicalType type) {
    if (type.equals(LogicalType.DATE)) {
      return new Schema(Type.INT, type, null, null, null, null, null, null, null);
    } else if (type.equals(LogicalType.TIMESTAMP_MILLIS)) {
      return new Schema(Type.LONG, type, null, null, null, null, null, null, null);
    }
    ...
  }

  private Schema(Type type, @Nullable LogicalType logicalType,
                 @Nullable Set<String> enumValues,   // Not null for enum type
                 @Nullable Schema componentSchema,   // Not null for array type
                 @Nullable Schema keySchema, @Nullable Schema valueSchema,           // Not null for map type
                 @Nullable String recordName, @Nullable Map<String, Field> fieldMap, // Not null for record type
                 @Nullable List<Schema> unionSchemas) {                              // Not null for union type
    this.type = type;
    this.logicalType = logicalType;
    ...
  }

  /**
   * @return The {@link LogicalType} that this schema represents.
   */
  @Nullable
  public LogicalType getLogicalType() {
    return logicalType;
  }
}
API changes in StructuredRecord for getting and setting date/time fields. Please note that the setters accept both Java 7 and Java 8 types, whereas the getters return only Java 8 types. To read a field as a Java 7 type, use StructuredRecord#get(), which returns the underlying Avro long/int value; the caller can then cast or convert the object as needed.
Code Block: StructuredRecord.java

/**
 * Get the value of date field in the record.
 * @param fieldName field to get.
 * @return Java 8 {@link LocalDate}, to get Java 7 {@link Date}, please use the get method
 */
public LocalDate getDate(String fieldName) { }

/**
 * Get the value of time field in the record.
 * @param fieldName field to get.
 * @return Java 8 {@link LocalTime}, to get Java 7 {@link Date}, please use the get method
 */
public LocalTime getTime(String fieldName) { }

/**
 * Get the value of timestamp field in the record. UTC by default.
 * @param fieldName field to get.
 * @return Java 8 {@link ZonedDateTime}, to get Java 7 {@link Date}, please use the get method
 */
public ZonedDateTime getTimestamp(String fieldName) { }

/**
 * Get the value of timestamp field in the record in the given time zone.
 * @param fieldName field to get.
 * @param zoneId time zone for the returned timestamp.
 * @return Java 8 {@link ZonedDateTime}, to get Java 7 {@link Date}, please use the get method
 */
public ZonedDateTime getTimestamp(String fieldName, ZoneId zoneId) { }

/**
 * Set the date field to the given value
 * @param fieldName Name of the field to set
 * @param value Value for the field
 * @return This builder
 * @throws UnexpectedFormatException if the field is not in the schema, or the field is not nullable but a null
 * value is given
 */
public Builder setDate(String fieldName, @Nullable LocalDate value) { }

/**
 * Set the time field to the given value
 * @param fieldName Name of the field to set
 * @param value Value for the field
 * @return This builder
 * @throws UnexpectedFormatException if the field is not in the schema, or the field is not nullable but a null
 * value is given
 */
public Builder setTime(String fieldName, @Nullable LocalTime value) { }

/**
 * Set the timestamp field to the given value
 * @param fieldName Name of the field to set
 * @param value Value for the field
 * @return This builder
 * @throws UnexpectedFormatException if the field is not in the schema, or the field is not nullable but a null
 * value is given
 */
public Builder setTimestamp(String fieldName, @Nullable ZonedDateTime value) { }
Example of adding an Avro logical type field in CDAP Schema:

Code Block

public class SchemaExample {
  Schema schema = Schema.recordOf(
    "logicalTypeRecord",
    Schema.Field.of("id", Schema.of(Schema.Type.STRING)),
    Schema.Field.of("name", Schema.of(Schema.Type.STRING)),
    Schema.Field.of("date", Schema.of(Schema.LogicalType.DATE)),
    Schema.Field.of("timestamp", Schema.of(Schema.LogicalType.TIMESTAMP_MILLIS)));
}
Example usage in StructuredRecord:

Code Block

StructuredRecord.Builder builder = StructuredRecord.builder(schema);
// Java 8 date support
builder.setDate(dateField1, LocalDate.now());
// Java 7 date support
builder.setDate(dateField2, new Date());
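On the read side, the relationship between the Java 8 getters and the raw value returned by StructuredRecord#get() can be sketched with plain java.time (the record calls are part of the proposed API and so appear only in comments; the class name is illustrative):

```java
import java.time.LocalDate;

public class RawValueEquivalence {
    public static void main(String[] args) {
        // record.get("dateField1") would return the underlying Avro int:
        int rawDays = 17775;
        // record.getDate("dateField1") would return the Java 8 view of the same value:
        LocalDate asLocalDate = LocalDate.ofEpochDay(rawDays);
        System.out.println(asLocalDate); // 2018-09-01
    }
}
```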
Schema for the plugins in the pipeline spec will be the same as the Avro logical type schema. Below are examples of how each Avro logical type schema will be represented in the pipeline spec:

Code Block

Date:
{
"properties": {
"schema": {
"type": "record",
"name": "etlSchemaBody",
"fields": [
{
"name": "offset",
"type": "long"
},
{
"name": "body",
"type": "string"
},
{
"name": "Date",
"type": {
"type": "int",
"logicalType": "date"
}
},
{
"name": "map",
"type": {
"type": "map",
"keys": "long",
"values": "float"
}
}
]
}
}
.......
}
Timestamp micros:
{
"properties": {
"schema": {
"type": "record",
"name": "etlSchemaBody",
"fields": [
{
"name": "offset",
"type": "long"
},
{
"name": "body",
"type": "string"
},
{
"name": "Timestamp-Micro",
"type": {
"type": "long",
"logicalType": "timestamp-micros"
}
},
{
"name": "map",
"type": {
"type": "map",
"keys": "long",
"values": "float"
}
}
]
}
}
.......
}
Timestamp millis:
{
"properties": {
"schema": {
"type": "record",
"name": "etlSchemaBody",
"fields": [
{
"name": "offset",
"type": "long"
},
{
"name": "body",
"type": "string"
},
{
"name": "Timestamp-Millis",
"type": {
"type": "long",
"logicalType": "timestamp-millis"
}
},
{
"name": "map",
"type": {
"type": "map",
"keys": "long",
"values": "float"
}
}
]
}
}
.......
}
Time in micros:
{
"properties": {
"schema": {
"type": "record",
"name": "etlSchemaBody",
"fields": [
{
"name": "offset",
"type": "long"
},
{
"name": "body",
"type": "string"
},
{
"name": "Time",
"type": {
"type": "long",
"logicalType": "time-micros"
}
},
{
"name": "map",
"type": {
"type": "map",
"keys": "long",
"values": "float"
}
}
]
}
}
.......
}
Time in millis:
{
"properties": {
"schema": {
"type": "record",
"name": "etlSchemaBody",
"fields": [
{
"name": "offset",
"type": "long"
},
{
"name": "body",
"type": "string"
},
{
"name": "Time",
"type": {
"type": "int",
"logicalType": "time-millis"
}
},
{
"name": "map",
"type": {
"type": "map",
"keys": "long",
"values": "float"
}
}
]
}
}
.......
}
Limitation
- We will not support table creation with date/time types from database plugins. Each database has different date and time types, and it is hard to know at the plugin level which specific type is needed for table creation. Also, when running a pipeline for data transformation or migration, creating the table on each pipeline run is not a common use case.
Date/Time support in Data Prep
Below is the list of existing directives supported for date/time conversion:
Directive | Input | Output | Supported |
---|---|---|---|
parse-as-date | String | Timestamp | yes |
parse-as-simple-date | String | Timestamp | yes |
diff-date | Timestamp | long(millis) | yes |
format-date | Timestamp | String | yes |
Data Prep does not have any directive to convert a long to a Timestamp. This conversion may be needed if the data is stored as CSV or JSON. To parse such data, we will add a new directive, `parse-timestamp`:
Code Block

parse-timestamp <column_name> <time-unit>
New Directive | Supported input column type | Supported Output column type | Description |
---|---|---|---|
parse-timestamp | long (timestamp in millis) | Timestamp | Convert long representing unix timestamp in millis to Timestamp |
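The conversion that the proposed parse-timestamp directive would perform on a column value can be sketched as follows (a hypothetical helper, not the actual directive implementation):

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;

public class ParseTimestampSketch {
    // long unix timestamp in millis -> timestamp value (UTC by default)
    public static ZonedDateTime parseMillis(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis).atZone(ZoneOffset.UTC);
    }

    public static void main(String[] args) {
        // e.g. a long column value read from CSV or JSON
        System.out.println(parseMillis(1_535_805_000_000L));
    }
}
```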
List of directives not supported for columns with date/time types:
- Find and replace
- find-and-replace
- Extract
- extract-regex-groups
- split-to-columns
- cut-character
- Explode:
- split-to-rows
- flatten
- Change type
- set-type
- Filter
- Custom Transform
UI Impact or Changes
Below is the mapping of backend types to UI types.
Backend schema type | UI type |
---|---|
date | Date |
time-micros | Time |
timestamp-micros | Timestamp |
time-millis | int |
timestamp-millis | long |
Column types in the Data Prep UI will be the same as the types in the pipeline UI.
Support for Datetime in Dataprep and in Schema
Test Scenarios
Please note that the tests should be conducted against MySQL, MS SQL, Oracle, BigQuery, and Redshift databases.
Test ID | Test Description | Expected Results |
---|---|---|
1. | Using cdap pipeline, read records with Date type from a mysql table using DatabaseSource plugin | The correct date should be read from the mysql table |
2. | Using cdap pipeline, write records with Date type to a mysql table using DatabaseSink plugin | The correct date should be written to the mysql table |
3. | Using cdap pipeline, read records with Timestamp type from a mysql table using DatabaseSource plugin | The correct timestamp should be read from the mysql table |
4. | Using cdap pipeline, write records with Timestamp type to a mysql table using DatabaseSink plugin | The correct timestamp should be written to the mysql table |
5. | Using cdap pipeline, read records with Timestamp in microseconds from a mysql table using DatabaseSource plugin | The correct timestamp along with micro seconds should be read from the mysql table |
6. | Using cdap pipeline, write records with Timestamp in microseconds to a mysql table using DatabaseSink plugin | The correct timestamp along with micro seconds should be written to the mysql table |
7. | Using cdap pipeline, read records with Time type from a mysql table using DatabaseSource plugin | The correct time should be read from the mysql table |
8. | Using cdap pipeline, write records with Time type to a mysql table using DatabaseSink plugin | The correct time should be written to the mysql table |
9. | Using cdap pipeline, write timestamp to cdap Table dataset | The correct timestamp should be written to cdap table dataset |
10. | Explore cdap table dataset which has timestamp in its schema | Correct timestamp values should be visible |
11. | Using cdap pipeline, write date to cdap Table dataset | The correct date should be written to cdap table dataset |
12. | Explore cdap table dataset which has date in its schema | Correct date values should be visible |
13. | Using cdap pipeline, write timestamp to cdap avro partitioned fileset | The correct timestamp should be written to cdap fileset |
14. | Explore cdap fileset dataset which has timestamp in its schema | Correct timestamp values should be visible |
15. | Using cdap pipeline, write date to cdap avro partitioned fileset | The correct date should be written to cdap fileset |
16. | Explore cdap fileset dataset which has date in its schema | Correct date values should be visible |
17. | Using cdap pipeline, write timestamp to cdap parquet partitioned fileset | The correct timestamp should be written to cdap fileset |
18. | Explore cdap parquet fileset dataset which has timestamp in its schema | Correct timestamp values should be visible |
19. | Using cdap pipeline, write date to cdap parquet partitioned fileset | The correct date should be written to cdap fileset |
20. | Explore cdap parquet fileset dataset which has date in its schema | Correct date values should be visible |
Releases
Targeted release: CDAP 5.1
Future work
Support other logical Avro types like Decimal and Duration.