Class Neo4jIO

java.lang.Object
org.apache.beam.sdk.io.neo4j.Neo4jIO

public class Neo4jIO extends Object
This is a Beam IO to read from, and write data to, Neo4j.

Driver configuration

To read from or write to Neo4j you have to provide a Neo4jIO.DriverConfiguration using
Neo4jIO.DriverConfiguration.create() or Neo4jIO.DriverConfiguration.create(String, String, String) (URL, username and password). Note that subclasses of DriverConfiguration must also be Serializable.
At the level of the Neo4j driver configuration you can specify a Neo4j Config object with Neo4jIO.DriverConfiguration.withConfig(Config). This way you can configure the Neo4j driver characteristics. Likewise, you can control the characteristics of Neo4j sessions by optionally passing a SessionConfig object to Neo4jIO.ReadAll or Neo4jIO.WriteUnwind. For example, the session configuration allows you to target a specific database or set a fetch size. Finally, in rare cases you might need to configure aspects of Neo4j transactions, for example their timeout. You can do this with a Neo4j TransactionConfig object.

Neo4j Aura

If you have trouble connecting to a Neo4j Aura database please try to disable a few security algorithms in your JVM. This makes sure that the right one is picked to connect:


 Security.setProperty(
       "jdk.tls.disabledAlgorithms",
       "SSLv3, RC4, DES, MD5withRSA, DH keySize < 1024, EC keySize < 224, 3DES_EDE_CBC, anon, NULL");
 

To execute this code on GCP Dataflow you can create a class which implements JvmInitializer and overrides the JvmInitializer.onStartup() method. You need to annotate this new class with AutoService:


 @AutoService(value = JvmInitializer.class)
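
The startup code itself can be sketched as follows. This is a minimal illustration, assuming a hypothetical helper class named AuraTlsSettings that is not part of Beam or Neo4j. It uses only the JDK so that it runs on its own. On Dataflow, apply() would be called from your JvmInitializer.onStartup() implementation, which needs the Beam and AutoService dependencies and is not shown here.

```java
import java.security.Security;

// Hypothetical helper class; the name AuraTlsSettings is an assumption for illustration.
final class AuraTlsSettings {
  // Algorithm list copied from the Security.setProperty snippet in this section.
  static final String DISABLED_ALGORITHMS =
      "SSLv3, RC4, DES, MD5withRSA, DH keySize < 1024, EC keySize < 224, 3DES_EDE_CBC, anon, NULL";

  private AuraTlsSettings() {}

  // Call this before the first TLS connection is opened,
  // for example from JvmInitializer.onStartup() on a Dataflow worker.
  static void apply() {
    Security.setProperty("jdk.tls.disabledAlgorithms", DISABLED_ALGORITHMS);
  }

  public static void main(String[] args) {
    apply();
    // Print the value the JVM will now use for TLS negotiation.
    System.out.println(Security.getProperty("jdk.tls.disabledAlgorithms"));
  }
}
```

Keeping the property change in one static method makes it easy to call the same code from a local test run and from the worker startup hook.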
 

Reading from Neo4j

The readAll() source returns a bounded collection of OutputT as a PCollection<OutputT>. OutputT is the type returned by the provided Neo4jIO.RowMapper. It accepts parameters as input in the form of ParameterT as a PCollection<ParameterT>.

The following example reads ages to return the IDs of Person nodes. It runs a Cypher query for each provided age.

The mapping SerializableFunction maps input values to each execution of the Cypher statement. In the function simply return a map containing the parameters you want to set.

The Neo4jIO.RowMapper converts output Neo4j Record values to the output of the source.


 pipeline
   .apply(Create.of(40, 50, 60))
   .apply(Neo4jIO.<Integer, String>readAll()
     .withDriverConfiguration(Neo4jIO.DriverConfiguration.create("neo4j://localhost:7687", "neo4j", "password"))
     .withCypher("MATCH(n:Person) WHERE n.age = $age RETURN n.id")
     .withReadTransaction()
     .withCoder(StringUtf8Coder.of())
     .withParametersFunction( age -> Collections.singletonMap( "age", age ))
     .withRowMapper( record -> record.get(0).asString() )
   );
 

Writing to Neo4j

The Neo4jIO.WriteUnwind transform supports writing data to a graph. It collects the elements of a PCollection into batches and writes all elements of a batch to Neo4j together.
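
The batching idea can be sketched with a generic buffer-and-flush pattern. This is an illustration only, not Neo4jIO's internal code: the class BatchBufferSketch and its methods are hypothetical names, and the writer callback stands in for whatever sends a batch to Neo4j. The batch size of 5000 used in main matches the default mentioned for writeUnwind().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Illustration only: a generic buffer-and-flush pattern, not Neo4jIO's internal code.
final class BatchBufferSketch<T> {
  private final int batchSize;
  private final Consumer<List<T>> writer;
  private final List<T> buffer = new ArrayList<>();

  BatchBufferSketch(int batchSize, Consumer<List<T>> writer) {
    if (batchSize < 1) {
      throw new IllegalArgumentException("batchSize must be positive");
    }
    this.batchSize = batchSize;
    this.writer = writer;
  }

  // Buffers one element and writes the whole batch once it is full.
  void add(T element) {
    buffer.add(element);
    if (buffer.size() >= batchSize) {
      flush();
    }
  }

  // Writes whatever is buffered; call once at the end for the last partial batch.
  void flush() {
    if (buffer.isEmpty()) {
      return;
    }
    writer.accept(new ArrayList<>(buffer)); // hand over a copy, then reuse the buffer
    buffer.clear();
  }

  public static void main(String[] args) {
    BatchBufferSketch<Integer> batches =
        new BatchBufferSketch<>(5000, batch -> System.out.println("writing " + batch.size()));
    for (int i = 0; i < 12000; i++) {
      batches.add(i);
    }
    batches.flush();
  }
}
```

With 12,000 elements and a batch size of 5000, the writer receives batches of 5000, 5000 and 2000 elements.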

Like the source, to configure this sink, you have to provide a Neo4jIO.DriverConfiguration.

In the following example we'll merge a collection of Row into Person nodes. Since this is a sink, it has no output and no RowMapper is needed. The rows serve as containers for the parameters of the Cypher statement, which must be an UNWIND statement. As in the read case, the parameters SerializableFunction converts each element to a Map. The difference here is that the resulting maps are collected in a List, which in turn is stored in another Map under the name provided by the Neo4jIO.WriteUnwind.withUnwindMapName(String) method. All of this is handled automatically. You do need to provide the unwind map name so that you can reference it in the UNWIND statement.

For example:


 pipeline
   .apply(...)
   .apply(Neo4jIO.<Row>writeUnwind()
      .withDriverConfiguration(Neo4jIO.DriverConfiguration.create("neo4j://localhost:7687", "neo4j", "password"))
      .withUnwindMapName("rows")
      .withCypher("UNWIND $rows AS row MERGE(n:Person { id : row.id } ) SET n.firstName = row.first, n.lastName = row.last")
      .withParametersFunction( row -> ImmutableMap.of(
        "id", row.getString("id"),
        "first", row.getString("firstName"),
        "last", row.getString("lastName")))
    );
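
The shape of the statement parameters described in this section (one Map per element, collected in a List, stored under the unwind map name) can be sketched with plain JDK collections. This is an illustration of the data structure only, not Neo4jIO's internal code. The class UnwindParametersSketch, the wrap method, and the String[] input elements are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Illustration only: mimics the parameter shape described above, not Neo4jIO's internal code.
final class UnwindParametersSketch {
  private UnwindParametersSketch() {}

  // Converts each element to a parameter map, collects the maps in a list,
  // and stores that list in an outer map under the unwind map name.
  static <T> Map<String, Object> wrap(
      String unwindMapName,
      List<T> batch,
      Function<T, Map<String, Object>> parametersFunction) {
    List<Map<String, Object>> rows = new ArrayList<>();
    for (T element : batch) {
      rows.add(parametersFunction.apply(element));
    }
    Map<String, Object> statementParameters = new HashMap<>();
    statementParameters.put(unwindMapName, rows);
    return statementParameters;
  }

  public static void main(String[] args) {
    // Hypothetical input: each element is a String[] of {id, firstName, lastName}.
    List<String[]> batch = new ArrayList<>();
    batch.add(new String[] {"1", "Ada", "Lovelace"});
    batch.add(new String[] {"2", "Alan", "Turing"});

    Map<String, Object> parameters = wrap("rows", batch, person -> {
      Map<String, Object> row = new HashMap<>();
      row.put("id", person[0]);
      row.put("first", person[1]);
      row.put("last", person[2]);
      return row;
    });

    // In "UNWIND $rows AS row ...", $rows would be bound to the list of maps.
    System.out.println(parameters);
  }
}
```

The outer map has a single entry whose key is the unwind map name, which is why the same name must appear as the parameter in the UNWIND statement.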
 
  • Constructor Details

    • Neo4jIO

      public Neo4jIO()
  • Method Details

    • readAll

      public static <ParameterT, OutputT> Neo4jIO.ReadAll<ParameterT,OutputT> readAll()
      Read all rows using a Neo4j Cypher query.
      Type Parameters:
      ParameterT - Type of the data representing query parameters.
      OutputT - Type of the data to be read.
    • writeUnwind

      public static <ParameterT> Neo4jIO.WriteUnwind<ParameterT> writeUnwind()
      Write all rows using a Neo4j Cypher UNWIND statement. This sets a default batch size of 5000.
      Type Parameters:
      ParameterT - Type of the data representing query parameters.