Class Neo4jIO
Driver configuration
To read from or write to Neo4j you have to provide a Neo4jIO.DriverConfiguration using
Neo4jIO.DriverConfiguration.create() or Neo4jIO.DriverConfiguration.create(String, String, String)
(URL, username and password). Note that subclasses of DriverConfiguration must also be Serializable.
At the level of the Neo4j driver configuration you can specify a Neo4j Config object with
Neo4jIO.DriverConfiguration.withConfig(Config). This way you can configure the Neo4j driver
characteristics. Likewise, you can control the characteristics of Neo4j sessions by optionally
passing a SessionConfig object to Neo4jIO.ReadAll or Neo4jIO.WriteUnwind. For example,
the session configuration lets you target a specific database or set a fetch size.
Finally, in even rarer cases you might need to configure the various aspects of Neo4j
transactions, for example their timeout. You can do this with a Neo4j TransactionConfig object.
Neo4j Aura
If you have trouble connecting to a Neo4j Aura database please try to disable a few security algorithms in your JVM. This makes sure that the right one is picked to connect:
Security.setProperty(
"jdk.tls.disabledAlgorithms",
"SSLv3, RC4, DES, MD5withRSA, DH keySize < 1024, EC keySize < 224, 3DES_EDE_CBC, anon, NULL");
To execute this code on GCP Dataflow you can create a class which implements JvmInitializer
and overrides the JvmInitializer.onStartup() method. You need to annotate
this new class with AutoService:
@AutoService(value = JvmInitializer.class)
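The property change above can be wrapped in a small helper so that it is applied in one place. The following is a minimal sketch using only the JDK; the class name AuraTlsSetup and the method name disableLegacyTlsAlgorithms are hypothetical and not part of Beam or the Neo4j driver. On Dataflow, a JvmInitializer.onStartup() implementation could call such a helper. It is assumed here that the call happens before the first TLS connection is opened.

```java
import java.security.Security;

/** Hypothetical helper (not part of Beam or Neo4j) that applies the TLS property shown above. */
final class AuraTlsSetup {
  /** The algorithm list from the Neo4j Aura note above, copied verbatim. */
  public static final String DISABLED_ALGORITHMS =
      "SSLv3, RC4, DES, MD5withRSA, DH keySize < 1024, EC keySize < 224, 3DES_EDE_CBC, anon, NULL";

  private AuraTlsSetup() {}

  /** Sets jdk.tls.disabledAlgorithms and returns the value that is now in effect. */
  public static String disableLegacyTlsAlgorithms() {
    Security.setProperty("jdk.tls.disabledAlgorithms", DISABLED_ALGORITHMS);
    return Security.getProperty("jdk.tls.disabledAlgorithms");
  }

  public static void main(String[] args) {
    // Print the effective value so the change can be checked in worker logs.
    System.out.println(disableLegacyTlsAlgorithms());
  }
}
```

Returning the effective value makes it easy to log or assert that the property was actually applied.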
Reading from Neo4j
The readAll() source returns a bounded collection of OutputT as a PCollection<OutputT>.
OutputT is the type returned by the provided Neo4jIO.RowMapper. It accepts
parameters as input in the form of ParameterT as a PCollection<ParameterT>.
The following example reads ages to return the IDs of Person nodes. It runs a Cypher query for each provided age.
The mapping SerializableFunction maps input values to each execution of the Cypher
statement. In the function simply return a map containing the parameters you want to set.
The Neo4jIO.RowMapper converts output Neo4j Record values to the output of the source.
pipeline
.apply(Create.of(40, 50, 60))
.apply(Neo4jIO.<Integer, String>readAll()
.withDriverConfiguration(Neo4jIO.DriverConfiguration.create("neo4j://localhost:7687", "neo4j", "password"))
.withCypher("MATCH(n:Person) WHERE n.age = $age RETURN n.id")
.withReadTransaction()
.withCoder(StringUtf8Coder.of())
.withParametersFunction( age -> Collections.singletonMap( "age", age ))
.withRowMapper( record -> record.get(0).asString() )
);
Writing to Neo4j
The Neo4jIO.WriteUnwind transform supports writing data to a graph. It writes a PCollection
to the graph by collecting a batch of elements after which all elements in the batch
are written together to Neo4j.
Like the source, to configure this sink, you have to provide a Neo4jIO.DriverConfiguration.
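The batch collection described above can be pictured with a short sketch in plain Java. This is an illustration of the general buffer-then-flush idea only, not Beam's implementation: the class BatchBuffer and its writer callback are hypothetical names, and the callback stands in for the single write to Neo4j. The batch size is a constructor argument here; writeUnwind() itself defaults to 5000.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical illustration of collecting elements into batches; not Beam's implementation. */
final class BatchBuffer<T> {
  private final int batchSize;
  private final Consumer<List<T>> writer; // stands in for one write of a whole batch
  private final List<T> buffer = new ArrayList<>();

  public BatchBuffer(int batchSize, Consumer<List<T>> writer) {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be positive");
    }
    this.batchSize = batchSize;
    this.writer = writer;
  }

  /** Buffers one element and writes the batch once it is full. */
  public void add(T element) {
    buffer.add(element);
    if (buffer.size() >= batchSize) {
      flush();
    }
  }

  /** Writes whatever is buffered; call this at the end so a partial batch is not lost. */
  public void flush() {
    if (buffer.isEmpty()) {
      return;
    }
    writer.accept(new ArrayList<>(buffer)); // hand over a copy, then reuse the buffer
    buffer.clear();
  }

  public static void main(String[] args) {
    BatchBuffer<Integer> batches =
        new BatchBuffer<>(3, batch -> System.out.println("writing " + batch));
    for (int i = 1; i <= 7; i++) {
      batches.add(i);
    }
    batches.flush();
  }
}
```

The final flush() matters: without it, the last partial batch would never be written.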
In the following example we'll merge a collection of Row
into Person nodes. Since this is a sink it has no output and as such no RowMapper is needed. The
rows are used as a container for the parameters of the Cypher statement. The Cypher statement
needs to be an UNWIND statement. Like in the read case, the parameters SerializableFunction
converts parameter values to a Map. The difference here is that the
resulting Map is stored in a List (containing maps) which in turn is stored in another
Map under the name provided by the Neo4jIO.WriteUnwind.withUnwindMapName(String) method. All of
this is handled automatically. You do need to provide the unwind map name so that you can
reference it in the UNWIND statement.
For example:
pipeline
.apply(...)
.apply(Neo4jIO.<Row>writeUnwind()
.withDriverConfiguration(Neo4jIO.DriverConfiguration.create("neo4j://localhost:7687", "neo4j", "password"))
.withUnwindMapName("rows")
.withCypher("UNWIND $rows AS row MERGE(n:Person { id : row.id } ) SET n.firstName = row.first, n.lastName = row.last")
.withParametersFunction( row -> ImmutableMap.of(
"id", row.getString("id"),
"first", row.getString("firstName"),
"last", row.getString("lastName")))
);
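The nesting of parameters described above (one Map per element, collected in a List, stored in an outer Map under the unwind map name) can be sketched in plain Java as follows. The class UnwindParameters and its build method are hypothetical and only illustrate the shape of the data that an UNWIND statement such as "UNWIND $rows AS row ..." would receive; the transform builds this structure for you.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical illustration of the UNWIND parameter structure; not Beam's implementation. */
final class UnwindParameters {
  private UnwindParameters() {}

  /** Maps each element to its parameter Map and stores the resulting List under unwindMapName. */
  public static <T> Map<String, Object> build(
      String unwindMapName, List<T> batch, Function<T, Map<String, Object>> parametersFunction) {
    List<Map<String, Object>> rows = new ArrayList<>();
    for (T element : batch) {
      rows.add(parametersFunction.apply(element)); // one Map per element
    }
    Map<String, Object> parameters = new HashMap<>();
    parameters.put(unwindMapName, rows); // referenced in Cypher as $<unwindMapName>
    return parameters;
  }

  public static void main(String[] args) {
    // Each String[] stands in for a Row with id, firstName and lastName.
    List<String[]> people =
        Arrays.asList(
            new String[] {"1", "Ada", "Lovelace"}, new String[] {"2", "Alan", "Turing"});
    Map<String, Object> parameters =
        build(
            "rows",
            people,
            person -> {
              Map<String, Object> row = new LinkedHashMap<>();
              row.put("id", person[0]);
              row.put("first", person[1]);
              row.put("last", person[2]);
              return row;
            });
    System.out.println(parameters);
  }
}
```

The keys of each inner Map ("id", "first", "last") are the names used after "row." in the Cypher statement, and the outer key matches the name passed to withUnwindMapName(String).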
Nested Class Summary
Nested Classes
Modifier and Type | Description
static class | This describes all the information needed to create a Neo4j Session.
static class | Wraps a Neo4jIO.DriverConfiguration to provide a Driver.
static class | This is the class which handles the work behind the readAll() method.
static interface | An interface used by Neo4jIO.ReadAll for converting each row of a Neo4j Result record Record into an element of the resulting PCollection.
static class | This is the class which handles the work behind the writeUnwind() method.
Constructor Summary
Constructors
Neo4jIO()
Method Summary
Modifier and Type | Method | Description
static <ParameterT, OutputT> Neo4jIO.ReadAll<ParameterT, OutputT> | readAll() | Read all rows using a Neo4j Cypher query.
static <ParameterT> Neo4jIO.WriteUnwind<ParameterT> | writeUnwind() | Write all rows using a Neo4j Cypher UNWIND statement.
Constructor Details
Neo4jIO
public Neo4jIO()
Method Details
readAll
Read all rows using a Neo4j Cypher query.
Type Parameters:
ParameterT - Type of the data representing query parameters.
OutputT - Type of the data to be read.
writeUnwind
Write all rows using a Neo4j Cypher UNWIND statement. This sets a default batch size of 5000.
Type Parameters:
ParameterT - Type of the data representing query parameters.