Class Neo4jIO
Driver configuration
To read from or write to Neo4j, you have to provide a Neo4jIO.DriverConfiguration using
Neo4jIO.DriverConfiguration.create() or Neo4jIO.DriverConfiguration.create(String, String, String) (URL, username and password). Note that subclasses of DriverConfiguration must also be
Serializable.
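Why Serializable? Beam's Java SDK serializes a transform's configuration and ships it to the workers, so every object reachable from a DriverConfiguration has to survive Java serialization. The following JDK-only sketch illustrates this with hypothetical classes that are not part of Neo4jIO: MyDriverSettings, ConnectionHandle and BrokenDriverSettings. A class whose fields are all serializable round-trips intact. A non-serializable field makes serialization fail with a NotSerializableException.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for a DriverConfiguration subclass: all fields are
// Strings, which are Serializable, so the whole object can be shipped to workers.
class MyDriverSettings implements Serializable {
  private static final long serialVersionUID = 1L;
  final String url;
  final String username;

  MyDriverSettings(String url, String username) {
    this.url = url;
    this.username = username;
  }
}

// Hypothetical field type that does NOT implement Serializable (think of a live connection).
class ConnectionHandle {}

// Declares Serializable, but its field breaks serialization at runtime.
class BrokenDriverSettings implements Serializable {
  private static final long serialVersionUID = 1L;
  final ConnectionHandle handle = new ConnectionHandle();
}

class SerializationCheck {
  // Writes the object to bytes and reads it back, roughly what happens when a
  // pipeline's configuration is sent to remote workers.
  static Object roundTrip(Object value) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(value);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      // NotSerializableException (a subclass of IOException) ends up here.
      throw new IllegalStateException("Serialization failed: " + e, e);
    }
  }
}
```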
At the level of the Neo4j driver configuration, you can specify a Neo4j Config object with
Neo4jIO.DriverConfiguration.withConfig(Config) to configure the characteristics of the Neo4j driver.
Likewise, you can control the characteristics of Neo4j sessions by optionally
passing a SessionConfig object to Neo4jIO.ReadAll or Neo4jIO.WriteUnwind. For example,
a session configuration lets you target a specific database or set a fetch size.
Finally, in even rarer cases you might need to configure aspects of Neo4j
transactions, such as their timeout. You can do this with a Neo4j TransactionConfig
object.
Neo4j Aura
If you have trouble connecting to a Neo4j Aura database, try disabling a few security algorithms in your JVM. This ensures that the right one is picked when connecting:
import java.security.Security;
Security.setProperty(
"jdk.tls.disabledAlgorithms",
"SSLv3, RC4, DES, MD5withRSA, DH keySize < 1024, EC keySize < 224, 3DES_EDE_CBC, anon, NULL");
To execute this code on GCP Dataflow, create a class that implements JvmInitializer and put the code in its JvmInitializer.onStartup() method. Annotate
this new class with AutoService:
@AutoService(value = JvmInitializer.class)
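Below is a minimal sketch of such an initializer that runs with only the JDK. TlsInitializer is a hypothetical name. The Beam-specific parts, the JvmInitializer interface and the AutoService annotation, appear only in comments because they require the Beam SDK and AutoService on the classpath. The property value is copied verbatim from the call above.

```java
import java.security.Security;

// Hypothetical initializer. In a real Dataflow job the declaration would look like
//   @AutoService(value = JvmInitializer.class)
//   public class TlsInitializer implements JvmInitializer { ... }
// which needs the Beam SDK and AutoService; this sketch keeps only the JDK part.
class TlsInitializer {
  // Algorithms to disable, copied from the Security.setProperty call in the text.
  static final String DISABLED_ALGORITHMS =
      "SSLv3, RC4, DES, MD5withRSA, DH keySize < 1024, EC keySize < 224, 3DES_EDE_CBC, anon, NULL";

  // Mirrors JvmInitializer.onStartup(): meant to run when the worker JVM starts,
  // before any TLS connection to Neo4j Aura is opened.
  public void onStartup() {
    Security.setProperty("jdk.tls.disabledAlgorithms", DISABLED_ALGORITHMS);
  }
}
```

Setting the property at JVM startup matters because the JDK typically reads jdk.tls.disabledAlgorithms when its TLS support is first initialized. Changing it after a connection has been made may therefore have no effect.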
Reading from Neo4j
The readAll() source returns a bounded collection of OutputT as a
PCollection<OutputT>. OutputT is the type returned by the provided Neo4jIO.RowMapper. It accepts
query parameters as input in the form of a PCollection<ParameterT>.
The following example reads ages to return the IDs of Person nodes. It runs a Cypher query for each provided age.
The parameters SerializableFunction maps each input value to the parameters of one execution of the Cypher
statement: simply return a map containing the parameters you want to set.
The Neo4jIO.RowMapper converts each Neo4j Record returned by the query into an element of the output PCollection.
pipeline
.apply(Create.of(40, 50, 60))
.apply(Neo4jIO.<Integer, String>readAll()
.withDriverConfiguration(Neo4jIO.DriverConfiguration.create("neo4j://localhost:7687", "neo4j", "password"))
.withCypher("MATCH(n:Person) WHERE n.age = $age RETURN n.id")
.withReadTransaction()
.withCoder(StringUtf8Coder.of())
.withParametersFunction( age -> Collections.singletonMap( "age", age ))
// record.get(0) is the first returned column, n.id
.withRowMapper( record -> record.get(0).asString() )
);
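To make the per-element behavior of readAll() concrete, here is an in-memory model in plain Java. It is not Beam or Neo4j code. ReadAllModel, Person and the list-based "graph" are hypothetical stand-ins, and a java.util.function.Function takes the place of the Cypher query. The model shows the three roles from the example: the parameters function builds a map for each input element, the query runs once per map, and the row mapper turns each returned record into an output element.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// In-memory model of readAll(); NOT Beam or Neo4j code.
class ReadAllModel {
  // Hypothetical stand-in for a Person node with id and age properties.
  static final class Person {
    final String id;
    final int age;

    Person(String id, int age) {
      this.id = id;
      this.age = age;
    }
  }

  // For every input element: build its parameter map, run the query once with it,
  // and map each returned row to an output element.
  static <P, R, O> List<O> readAll(
      List<P> inputs,
      Function<P, Map<String, Object>> parametersFunction,
      Function<Map<String, Object>, List<R>> query,
      Function<R, O> rowMapper) {
    List<O> output = new ArrayList<>();
    for (P input : inputs) {
      Map<String, Object> parameters = parametersFunction.apply(input);
      for (R row : query.apply(parameters)) {
        output.add(rowMapper.apply(row));
      }
    }
    return output;
  }

  // The example from the text: ages 40, 50 and 60 in, IDs of matching Person nodes out.
  static List<String> idsForAges(List<Person> graph) {
    // Plays the role of: MATCH (n:Person) WHERE n.age = $age RETURN n.id
    Function<Map<String, Object>, List<Person>> query = parameters -> {
      List<Person> matches = new ArrayList<>();
      for (Person person : graph) {
        if (parameters.get("age").equals(person.age)) {
          matches.add(person);
        }
      }
      return matches;
    };
    return readAll(
        Arrays.asList(40, 50, 60),
        age -> Collections.<String, Object>singletonMap("age", age),
        query,
        person -> person.id);
  }
}
```

Unlike this sequential loop, a Beam PCollection has no defined order, so the IDs produced by the real transform can arrive in any order.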
Writing to Neo4j
The Neo4jIO.WriteUnwind transform writes the elements of a PCollection to a graph. It collects the elements into batches and writes all elements of a batch
to Neo4j together.
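The batching described above can be sketched in plain Java. This sketch illustrates the idea and is not Neo4jIO's implementation. Batcher is a hypothetical helper, and each inner list stands for one batch that would be written to Neo4j in a single call. For reference, writeUnwind() sets a default batch size of 5000.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: buffers elements until batchSize is reached, then emits the
// whole batch; a final partial batch is emitted at the end.
class Batcher {
  static <T> List<List<T>> toBatches(Iterable<T> elements, int batchSize) {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be positive");
    }
    List<List<T>> batches = new ArrayList<>();
    List<T> current = new ArrayList<>();
    for (T element : elements) {
      current.add(element);
      if (current.size() == batchSize) {
        batches.add(current); // full batch: this is where it would be written to Neo4j
        current = new ArrayList<>();
      }
    }
    if (!current.isEmpty()) {
      batches.add(current); // flush the leftover, partially filled batch
    }
    return batches;
  }
}
```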
Like the source, to configure this sink, you have to provide a Neo4jIO.DriverConfiguration.
In the following example we merge a collection of Row objects
into Person nodes. Since this is a sink, it has no output, so no RowMapper is needed. The
rows serve as containers for the parameters of the Cypher statement, which needs to be an
UNWIND statement. As in the read case, the parameters SerializableFunction converts each parameter value to a Map. The difference is that the
resulting maps of a batch are collected in a List, which in turn is stored in another
Map under the name provided by the Neo4jIO.WriteUnwind.withUnwindMapName(String) method. All of
this is handled automatically. You only need to provide the unwind map name so that you can
reference it in the UNWIND statement.
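The resulting parameter shape can be sketched in plain Java. UnwindParameters is a hypothetical helper; Neo4jIO builds this structure for you. The parameters function is applied to every element of a batch, and the list of resulting maps is stored under the unwind map name.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical helper modelling the parameters of one UNWIND query for one batch:
//   { unwindMapName : [ parametersFunction(e1), parametersFunction(e2), ... ] }
class UnwindParameters {
  static <T> Map<String, Object> forBatch(
      List<T> batch, String unwindMapName, Function<T, Map<String, Object>> parametersFunction) {
    List<Map<String, Object>> rows = new ArrayList<>();
    for (T element : batch) {
      rows.add(parametersFunction.apply(element)); // one map per element of the batch
    }
    return Collections.singletonMap(unwindMapName, rows);
  }
}
```

With the unwind map name "rows", the statement UNWIND $rows AS row ... binds each of these maps to row in turn. That is why the write example below can refer to row.id, row.first and row.last.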
For example:
pipeline
.apply(...)
.apply(Neo4jIO.<Row>writeUnwind()
.withDriverConfiguration(Neo4jIO.DriverConfiguration.create("neo4j://localhost:7687", "neo4j", "password"))
.withUnwindMapName("rows")
.withCypher("UNWIND $rows AS row MERGE(n:Person { id : row.id } ) SET n.firstName = row.first, n.lastName = row.last")
.withParametersFunction( row -> ImmutableMap.of(
"id", row.getString("id"),
"first", row.getString("firstName"),
"last", row.getString("lastName")))
);
Nested Class Summary
Nested Classes
static class Neo4jIO.DriverConfiguration: This describes all the information needed to create a Neo4j Session.
static class Neo4jIO.DriverProviderFromDriverConfiguration: Wraps a Neo4jIO.DriverConfiguration to provide a Driver.
static class Neo4jIO.ReadAll: This is the class which handles the work behind the readAll() method.
static interface Neo4jIO.RowMapper: An interface used by Neo4jIO.ReadAll for converting each row of a Neo4j Result (a Record) into an element of the resulting PCollection.
static class Neo4jIO.WriteUnwind: This is the class which handles the work behind the writeUnwind() method.
Constructor Summary
Constructors
Neo4jIO()
Method Summary
static <ParameterT,OutputT> Neo4jIO.ReadAll<ParameterT,OutputT> readAll(): Read all rows using a Neo4j Cypher query.
static <ParameterT> Neo4jIO.WriteUnwind<ParameterT> writeUnwind(): Write all rows using a Neo4j Cypher UNWIND statement.
Constructor Details
Neo4jIO
public Neo4jIO()
Method Details
readAll
Read all rows using a Neo4j Cypher query.
Type Parameters:
ParameterT - Type of the data representing query parameters.
OutputT - Type of the data to be read.
writeUnwind
Write all rows using a Neo4j Cypher UNWIND statement. This sets a default batch size of 5000.
Type Parameters:
ParameterT - Type of the data representing query parameters.