...
- Users can choose and install Neo4j source and sink plugins.
- Users should see the Neo4j logo on the plugin configuration page for a better experience.
- Users should get relevant information from the tool tip:
- The tool tip should describe accurately what each field is used for.
- Users should not have to specify any redundant configuration.
- Users should get field level lineage for the source and sink that is being used.
- Reference documentation should be updated to account for the changes.
- The source code for the Neo4j database plugin should be placed in a repo under data-integrations.org.
- Data pipelines using the source and sink plugins should run on both the MapReduce and Spark engines.
Fraud detection use case
Source: Neo4j website
Traditional fraud prevention measures focus on discrete data points such as specific accounts, individuals, devices or IP addresses. However, today’s sophisticated fraudsters escape detection by forming fraud rings comprised of stolen and synthetic identities. To uncover such fraud rings, it is essential to look beyond individual data points to the connections that link them.
No fraud prevention measures are perfect, but by looking beyond individual data points to the connections that link them your efforts significantly improve. Neo4j uncovers difficult-to-detect patterns that far outstrip the power of a relational database.
Enterprise organizations use Neo4j to augment their existing fraud detection capabilities to combat a variety of financial crimes including first-party bank fraud, credit card fraud, ecommerce fraud, insurance fraud and money laundering – and all in real time.
User Stories
- Users should be able to install Neo4j-specific database source and sink plugins from the Hub.
- Users should have each tool tip accurately describe what each field does.
- Users should get field level lineage information for the Neo4j source and sink.
- Users should be able to set up a pipeline without specifying redundant information.
- Users should get an updated reference document for the Neo4j source and sink.
- Users should be able to read all the DB types.
Plugin Type
- Batch Source
- Batch Sink
- Real-time Source
- Real-time Sink
- Action
- Post-Run Action
- Aggregate
- Join
- Spark Model
- Spark Compute
Design Tips
- Reference to the Neo4j driver manual: https://neo4j.com/docs/driver-manual/1.7/
- Reference to the Neo4j JDBC driver (a community-contributed driver): https://neo4j.com/developer/java-third-party/
- Reference to Cypher Query Language manual: https://neo4j.com/docs/cypher-manual/current/
Design
Neo4j Overview
Neo4j is a graph database management system with native graph storage and processing. In Neo4j, everything is stored in the form of an edge, node, or attribute. Each node and edge can have any number of attributes. Both nodes and edges can be labelled. Labels can be used to narrow searches.
Cypher Query Language
Cypher is a declarative graph query language that allows for expressive and efficient querying and updating of the graph.
Cypher is inspired by a number of different approaches and builds on established practices for expressive querying. Many of the keywords, such as WHERE and ORDER BY, are inspired by SQL. Pattern matching borrows expression approaches from SPARQL. Some of the list semantics are borrowed from languages such as Haskell and Python.
Here are a few clauses used to read from the graph:
- MATCH: The graph pattern to match. This is the most common way to get data from the graph.
- WHERE: Not a clause in its own right, but rather part of MATCH, OPTIONAL MATCH and WITH. Adds constraints to a pattern, or filters the intermediate result passing through WITH.
- RETURN: What to return.
Here’s an example of a simple Cypher query:
Code Block |
---|
MATCH (n) RETURN n |
Example of Neo4j data
The example Neo4j database contains information about persons and movies and the relationships between them.
The following CQL query returns the movies related to the person 'Meg Ryan':
Code Block |
---|
MATCH (person:Person {name: "Meg Ryan"})-[rel]-(movie) RETURN person, rel, movie |
The result of this query is:
Graph view: (image not available)
Text view:
"person" | "rel" | "movie" |
---|
{"name":"Meg Ryan","born":1961} | {"roles":["DeDe","Angelica Graynamore","Patricia Graynamore"]} | {"title":"Joe Versus the Volcano", "tagline":"A story of love, lava and burning desire.", "released":1990} |
{"name":"Meg Ryan","born":1961} | {"roles":["Sally Albright"]} | {"title":"When Harry Met Sally", "tagline":"Can two friends sleep together and still love each other in the morning?", "released":1998} |
{"name":"Meg Ryan","born":1961} | {"roles":["Kathleen Kelly"]} | {"title":"You've Got Mail", "tagline":"At odds in life... in love on-line.", "released":1998} |
{"name":"Meg Ryan","born":1961} | {"roles":["Carole"]} | {"title":"Top Gun", "tagline":"I feel the need, the need for speed.", "released":1986} |
{"name":"Meg Ryan","born":1961} | {"roles":["Annie Reed"]} | {"title":"Sleepless in Seattle", "tagline":"What if someone you never met, someone you never saw, someone you never knew was the only someone for you?", "released":1993} |
Another way to get data from Neo4j with CQL is to return individual properties instead of whole nodes and relationships:
Code Block |
---|
MATCH (person:Person {name: "Meg Ryan"})-[rel]-(movie) RETURN person.name AS name, rel.roles AS roles, movie.title AS title |
The result of this query is:
name | roles | title |
---|
"Meg Ryan" | ["DeDe", "Angelica Graynamore", "Patricia Graynamore"] | "Joe Versus the Volcano" |
"Meg Ryan" | ["Sally Albright"] | "When Harry Met Sally" |
"Meg Ryan" | ["Kathleen Kelly"] | "You've Got Mail" |
"Meg Ryan" | ["Carole"] | "Top Gun" |
"Meg Ryan" | ["Annie Reed"] | "Sleepless in Seattle" |
Source Splitter
The proposal is to add a "Splits Number" source configuration property that specifies the desired number of splits to divide the query into when reading from Neo4j.
Fewer splits may be created if the query cannot be divided into the desired number of splits.
The default value for this property can be '0', in which case the number of splits is determined by the number of map tasks (controlled by the "mapreduce.job.maps" property):
Code Block |
---|
public List<InputSplit> getSplits(JobContext job) throws IOException {
...
int targetNumTasks = job.getConfiguration().getInt(MRJobConfig.NUM_MAPS, 1);
... |
A 'MATCH ... RETURN COUNT(*)' CQL query can be used to get the total number of records, which are then divided between splits using 'SKIP' and 'LIMIT'.
Example:
In this case, each split runs the following query:
Code Block |
---|
MATCH (person:Person) RETURN person ORDER BY person.born SKIP x LIMIT y |
where 'x' and 'y' are determined for each split based on 'Splits Number' and the total record count.
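The SKIP and LIMIT values for each split can be derived from the total count with simple arithmetic. The following is a minimal sketch of one way to do that, not the plugin's actual implementation: the class `SplitPlanner`, its `Window` type, and the even-distribution strategy are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not part of the plugin): plans SKIP/LIMIT windows for a split query. */
final class SplitPlanner {

  /** One window of the ordered result set: rows [skip, skip + limit). */
  static final class Window {
    final long skip;
    final long limit;

    Window(long skip, long limit) {
      this.skip = skip;
      this.limit = limit;
    }
  }

  /**
   * Divides totalCount rows into at most splitsNumber contiguous windows.
   * Fewer windows are returned when there are fewer rows than requested splits.
   */
  static List<Window> plan(long totalCount, int splitsNumber) {
    if (totalCount < 0 || splitsNumber < 1) {
      throw new IllegalArgumentException("totalCount must be >= 0 and splitsNumber >= 1");
    }
    List<Window> windows = new ArrayList<>();
    if (totalCount == 0) {
      return windows;
    }
    long splits = Math.min(splitsNumber, totalCount);
    long base = totalCount / splits;
    long remainder = totalCount % splits;
    long skip = 0;
    for (long i = 0; i < splits; i++) {
      // The first `remainder` windows take one extra row so every row is covered exactly once.
      long limit = base + (i < remainder ? 1 : 0);
      windows.add(new Window(skip, limit));
      skip += limit;
    }
    return windows;
  }

  /** Appends ORDER BY / SKIP / LIMIT to the input query for one window. */
  static String toQuery(String inputQuery, String orderBy, Window window) {
    return inputQuery + " ORDER BY " + orderBy + " SKIP " + window.skip + " LIMIT " + window.limit;
  }

  public static void main(String[] args) {
    for (Window window : plan(10, 3)) {
      System.out.println(toQuery("MATCH (person:Person) RETURN person", "person.born", window));
    }
  }
}
```

A stable ORDER BY field matters here: without a deterministic ordering, rows could appear in more than one window or be skipped entirely.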
Source Properties
Section | User Facing Name | Widget Type | Description | Constraints |
---|
General | Label | textbox | Label for UI. | |
| Reference Name | textbox | Name that uniquely identifies this source for lineage. | Required |
| Neo4j Host | textbox | Neo4j database host. | Required |
| Neo4j Port | textbox | Neo4j database port. | Required |
| Input Query | textbox | The query to use to import data from the Neo4j database. Query example: 'MATCH (n:Label) RETURN n.property_1, n.property_2'. | Required |
Credentials | Username | textbox | User identity for connecting to Neo4j. | Required |
| Password | password | Password to use to connect to Neo4j. | Required |
Advanced | Splits Number | number | The number of splits to generate. If set to one, the orderBy is not needed. | |
| Order By | textbox | Field name used for ordering during splits generation. This is required unless numSplits is set to one. | |
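The dependency between "Splits Number" and "Order By" could be checked when the source is configured. The sketch below shows one possible check. The class and method names are hypothetical, and treating '0' (the proposed "use the number of map tasks" default) as requiring "Order By" is an assumption.

```java
/** Hypothetical validation sketch for the Splits Number / Order By pair. */
final class SourceConfigCheck {

  /**
   * Throws if the combination is invalid: Order By may be omitted only
   * when exactly one split is requested.
   */
  static void validate(int splitsNumber, String orderBy) {
    if (splitsNumber < 0) {
      throw new IllegalArgumentException("'Splits Number' must not be negative");
    }
    boolean orderByMissing = orderBy == null || orderBy.trim().isEmpty();
    // Assumption: 0 means "derive from map tasks", which may yield several splits.
    if (splitsNumber != 1 && orderByMissing) {
      throw new IllegalArgumentException("'Order By' is required unless 'Splits Number' is 1");
    }
  }

  public static void main(String[] args) {
    validate(1, null);            // single split, no ordering needed
    validate(4, "person.born");   // several splits with an ordering field
    System.out.println("configuration accepted");
  }
}
```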
Source Data Types Mapping
Neo4j Data Types | CDAP Schema Data Types |
---|
null | null |
List | array |
Map | record |
Boolean | boolean |
Integer | long |
Float | double |
String | string |
ByteArray | bytes |
Date | date |
Time | time-micros |
LocalTime | time-micros |
DateTime | timestamp-micros |
LocalDateTime | timestamp-micros |
Node (https://neo4j.com/docs/cypher-manual/3.5/syntax/values/#structural-types) | record |
Schema example for Node:
Code Block |
---|
{"name": "n", "type": {
"type": "record", "name": "n", "fields": [
{"name": "born", "type": "long"},
{"name": "name", "type": "string"},
{"name": "_id", "type": "long"},
{"name": "_labels", "type": {"type": "array", "items": "string"}}
]
}} |
Relationship (https://neo4j.com/docs/cypher-manual/3.5/syntax/values/#structural-types) | record |
Schema example for Relationship:
Code Block |
---|
{"name": "r", "type": {
"type": "record", "name": "r", "fields": [
{"name": "_startId", "type": "long"},
{"name": "roles", "type": {"type": "array", "items": "string"}},
{"name": "_type", "type": "string"},
{"name": "_endId", "type": "long"},
{"name": "_id", "type": "long"}
]
}} |
Duration (https://neo4j.com/docs/cypher-manual/3.5/syntax/temporal/#cypher-temporal-durations) | record |
A Duration represents a temporal amount, capturing the difference in time between two instants, and can be negative.
Schema example for Duration:
Code Block |
---|
{
"type": "record",
"name": "duration",
"fields": [
{"name": "duration", "type": "string"},
{"name": "seconds", "type": "long"},
{"name": "months", "type": "long"},
{"name": "days", "type": "long"},
{"name": "nanoseconds", "type": "int"}
]
} |
Point (https://neo4j.com/docs/cypher-manual/3.5/syntax/spatial/) | record |
Schema example for point 2D:
Code Block |
---|
{
"type": "record",
"name": "point_2d",
"fields": [
{"name": "crs", "type": "string"},
{"name": "x", "type": "double"},
{"name": "y", "type": "double"},
{"name": "srid", "type": "int"}
]
} |
Schema example for point 3D:
Code Block |
---|
{
"type": "record",
"name": "point_3d",
"fields": [
{"name": "crs", "type": "string"},
{"name": "x", "type": "double"},
{"name": "y", "type": "double"},
{"name": "z", "type": "double"},
{"name": "srid", "type": "int"}
]
} |
Schema example for geo point 2D:
Code Block |
---|
{
"type": "record",
"name": "geo_2d",
"fields": [
{"name": "crs", "type": "string"},
{"name": "latitude", "type": "double"},
{"name": "x", "type": "double"},
{"name": "y", "type": "double"},
{"name": "srid", "type": "int"},
{"name": "longitude", "type": "double"}
]
} |
Schema example for geo point 3D:
Code Block |
---|
{
"type": "record",
"name": "geo_3d",
"fields": [
{"name": "crs", "type": "string"},
{"name": "latitude", "type": "double"},
{"name": "x", "type": "double"},
{"name": "y", "type": "double"},
{"name": "z", "type": "double"},
{"name": "srid", "type": "int"},
{"name": "longitude", "type": "double"},
{"name": "height", "type": "double"}
]
} |
|
Path https://neo4j.com/docs/cypher-manual/3.5/syntax/values/#structural-types |
|
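In the Node schema example in the table above, the record carries the node's own properties plus two extra fields, `_id` and `_labels`. A minimal sketch of that flattening follows, using plain Java collections in place of the Neo4j driver and CDAP record types. The class name, the method signature, and the sample id are illustrative assumptions.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: flattens a node into a record-like map with _id and _labels. */
final class NodeRecordSketch {

  /** Copies the node properties, then appends the synthetic _id and _labels fields. */
  static Map<String, Object> toRecord(long id, List<String> labels, Map<String, Object> properties) {
    Map<String, Object> record = new LinkedHashMap<>(properties);
    record.put("_id", id);
    record.put("_labels", labels);
    return record;
  }

  public static void main(String[] args) {
    Map<String, Object> properties = new LinkedHashMap<>();
    properties.put("born", 1961L);
    properties.put("name", "Meg Ryan");
    // The id 42 is made up for the example.
    System.out.println(toRecord(42L, Arrays.asList("Person"), properties));
  }
}
```

A Relationship could be flattened the same way, with `_startId`, `_endId` and `_type` added next to `_id`.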
Sink Properties
Section | User Facing Name | Widget Type | Description | Constraints |
---|
General | Label | textbox | Label for UI. | |
| Reference Name | textbox | Name that uniquely identifies this sink for lineage. | Required |
| Neo4j Host | textbox | Neo4j database host. | Required |
| Neo4j Port | textbox | Neo4j database port. | Required |
| Output Query | textbox | The query to use to export data to the Neo4j database. Query example: 'CREATE (n:<label_field> $(property_1, property_2))' or 'CREATE (n:<label_field> $(*))'. | Required |
Credentials | Username | textbox | User identity for connecting to Neo4j. | Required |
| Password | password | Password to use to connect to Neo4j. | Required |
Output query additional information
The output query is based on CQL syntax, but using a CQL query with CDAP has several problems:
- neo4j-jdbc-driver can process property values only if they are primitive types or arrays thereof.
- It is difficult to relate the output data to the CQL query.
To solve these problems, the following solution is proposed:
Use the $(...) structure to mark the places where properties will be inserted.
Example of using $(...):
List of output fields: ["name", "age", "profession", "company", "rating", "position"]
Output query | Expected results |
---|
CREATE (n:Node $(*)) | Creates a node with label Node and properties ["name", "age", "profession", "company", "rating", "position"]. |
CREATE (p:Person $(name, age, profession)), (c:Company $(company, rating)) | Creates a node with label Person and properties ["name", "age", "profession"]. Creates a node with label Company and properties ["company", "rating"]. |
CREATE (p:Person $(name, profession))-[r:WorkOn $(position)]->(c:Company $(company)) | Creates a node with label Person and properties ["name", "profession"]. Creates a relationship with type WorkOn and properties ["position"]. Creates a node with label Company and properties ["company"]. |
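One way to process the $(...) structure is to rewrite each occurrence into a Cypher property map before the query is executed. The sketch below does this with a regular expression and emits named parameters such as `$name`. This is an assumption for illustration only: the class name is hypothetical, and the actual plugin may bind values differently (for example, with JDBC positional parameters).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical sketch: expands $(...) placeholders into Cypher property maps. */
final class OutputQuerySketch {

  // Matches "$(" followed by anything up to the first ")".
  private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\(([^)]*)\\)");

  /** Replaces every $(a, b) with {a: $a, b: $b}; $(*) expands to all output fields. */
  static String expand(String outputQuery, List<String> outputFields) {
    Matcher matcher = PLACEHOLDER.matcher(outputQuery);
    StringBuffer result = new StringBuffer();
    while (matcher.find()) {
      String body = matcher.group(1).trim();
      List<String> names = new ArrayList<>();
      if (body.equals("*")) {
        names.addAll(outputFields);
      } else {
        for (String raw : body.split(",")) {
          String name = raw.trim();
          if (!outputFields.contains(name)) {
            throw new IllegalArgumentException("Unknown output field: '" + name + "'");
          }
          names.add(name);
        }
      }
      StringBuilder map = new StringBuilder("{");
      for (int i = 0; i < names.size(); i++) {
        if (i > 0) {
          map.append(", ");
        }
        map.append(names.get(i)).append(": $").append(names.get(i));
      }
      map.append("}");
      // quoteReplacement keeps the literal '$' characters from being read as group references.
      matcher.appendReplacement(result, Matcher.quoteReplacement(map.toString()));
    }
    matcher.appendTail(result);
    return result.toString();
  }

  public static void main(String[] args) {
    List<String> fields = Arrays.asList("name", "age", "company", "rating");
    System.out.println(expand("CREATE (p:Person $(name, age)), (c:Company $(company, rating))", fields));
  }
}
```

Validating the names against the output schema up front makes a typo in the query fail at configuration time rather than during the pipeline run.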
Sink Data Types Mapping
CDAP Schema Data Types | Neo4j Data Types |
---|
null | null |
array | List |
boolean | Boolean |
long | Integer |
double | Float |
string | String |
bytes | ByteArray |
date | Date |
time-micros | Time |
timestamp-micros | DateTime |
record (duration schema, as shown under Source Data Types Mapping) | Duration |
record (point 2D, point 3D, geo point 2D, or geo point 3D schema, as shown under Source Data Types Mapping) | Point |
Approach
Create a neo4j-plugin module in the database-plugins project and reuse existing database-plugins code where possible. Add Neo4j-specific properties to the configuration and add support for Neo4j-specific data types. Update the UI widget JSON definitions.
Pipeline Samples
Please attach one or more sample pipeline(s) and associated data.
...