Syncing Neo4j

Neo4j provides the facilities to keep the members of a cluster in sync. These also allow for complex setups with multiple data centres and geographically distributed deployments. However, they require all members to be of the same (major) version.

If you want to keep two (or more) installations in sync that are not of the same version, you need a different technique. One reason to do so is upgrading to a new major version without downtime.

To do so, we need to capture all changes made to one database and apply them to the other databases. In my previous article, I explained various options for using Kafka to stream data between databases. In this post, I would like to explain the options available for the following scenario:

  • an existing Neo4j 3.5 installation (cluster or single)

  • a constant stream of data coming into Neo4j

  • a plan to upgrade to Neo4j 4.x without downtime

The solutions sketched in this post will allow for the above and will let you keep both the 3.5 and the 4.x installation running in parallel for some time to validate that your application works correctly with 4.x.

To capture changes in the 3.5 (source) database, we need to install and configure the Neo4j streams plugin (a minimal configuration sketch follows the list below). The plugin registers a TransactionListener and sends all data changes in Debezium format to the configured Kafka topic. On the receiving side (4.x), we have the choice between a Neo4j plugin and a Kafka Connect plugin. In this blog post, we will use the second option, as the Connect plugin has the following advantages:

  • restarting the plugin to change configuration options does not require a restart of Neo4j

  • memory and CPU are not shared with the Neo4j installation
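
For orientation, here is a minimal sketch of the source-side configuration in the 3.5 instance's neo4j.conf. The broker address is a placeholder, and the complete set of options is documented with the streams plugin; the provided compose file configures all of this already:

# neo4j.conf of the 3.5 (source) instance -- minimal sketch, broker address is a placeholder
kafka.bootstrap.servers=broker:9093
# enable the CDC source module (true is also the default)
streams.source.enabled=true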

Change events are created in Debezium format. Using the information provided in these events, the receiving side (sink) has to identify the nodes and relationships to which the changes apply. The Neo4j streams module offers two strategies for that: SourceId and Schema.

Note
I use docker-compose to create the needed infrastructure. To follow along, check out the repository. It contains instructions at the head of the file kafka-neo4j-sync.yml. At the time of writing, version 1.0.6 had just been made available via Confluent Hub. For the examples here, version 1.0.7 is needed. You can either download it and place it into the Connect plugin folder, or, if 1.0.7 is available on Confluent Hub by the time you read this, remove the comments in the command section of the connect service in the compose file. Docker compose makes it easy to start/restart/stop the services and experiment with different settings. The above compose file creates directories for the volumes under $HOME/tmp/docker/kafka-neo4j-sync/. This allows for easy deletion and a fresh starting point.

The result of the compose file looks like this:

Figure 1. Architecture overview

SourceId strategy

This strategy uses the (internal) id of a node/relationship for matching. As these ids are basically pointers to objects in the store files, they are unique to each database and can change through operations that alter the layout of the store files (especially backup/restore). As a result, the SourceId strategy stores the id from the source system in a property and adds a label to newly created nodes.

To see how this works, we create a very simple graph in the source database (3.5):

create (:A {name:'node A'})-[:RELATES_TO {prop1:true}]->(:A {name:'node B'})

The provided docker-compose file enables query logging, so we will find the following statements in the sink database's query.log:

UNWIND $events AS event
 MERGE (n:SourceEvent{sourceId: event.id})
  SET n = event.properties
  SET n.sourceId = event.id
  SET n:A
{events: [{properties: {name: 'node A'}, id: '0'}, {properties: {name: 'node B'}, id: '1'}]}

UNWIND $events AS event
  MERGE (start:SourceEvent{sourceId: event.start})
  MERGE (end:SourceEvent{sourceId: event.end})
  MERGE (start)-[r:RELATES_TO{sourceId: event.id}]->(end)
    SET r = event.properties
    SET r.sourceId = event.id
{events: [{start: '0', end: '1', id: '0', properties: {prop1: true}}]}

The first statement creates the two nodes by merging on the sourceId, while the second statement creates the relationship. To identify nodes and relationships, this strategy relies on the existence of the source node id and a label.
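
A quick way to inspect the result on the sink is to look at the replicated nodes; with the default settings they carry both the original label and the strategy label (a simple sketch using the property and label names from the statements above):

// run against the sink (4.x) database
match (n:SourceEvent) return labels(n) as labels, n.sourceId as sourceId, n.name as name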

The names of the label and the property used to store the sourceId can be configured in the Connect plugin via:

streams.sink.topic.cdc.sourceId.labelName=SourceEvent
streams.sink.topic.cdc.sourceId.idName=sourceId

The values given above are the defaults and can be omitted.

To match existing nodes and relationships, we need to add these identifiers. This needs to be done before importing the database into the new version, as (major) version upgrades often come with a different store file format and the database performs a store migration at first startup:

neo4j40            | 2020-04-02 10:29:58.782+0000 INFO  [db1] Migrating Store files (1/6):
neo4j40            | 2020-04-02 10:29:59.235+0000 INFO  [db1]   10% completed
neo4j40            | 2020-04-02 10:29:59.237+0000 INFO  [db1]   20% completed
...

leading to changed internal ids.

To circumvent that problem, we need to assign the label and property before exporting the database:

match (n) set n:SourceEvent, n.sourceId=toString(id(n));
match ()-[r]->() set r.sourceId=toString(id(r));
// for a larger graph, use apoc.periodic.iterate to prevent OOM
call apoc.periodic.iterate('match (n) return n', 'set n:SourceEvent, n.sourceId=toString(id(n))',
    {batchSize:100, iterateList:true, parallel:true});
call apoc.periodic.iterate('match ()-[r]->() return r', 'set r.sourceId=toString(id(r))',
    {batchSize:100, iterateList:true, parallel:true});
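
Before exporting, it is worth a quick sanity check that every node and relationship really carries the identifier (a simple sketch, plain Cypher):

// both queries should return 0 before the export
match (n) where not exists(n.sourceId) return count(n);
match ()-[r]->() where not exists(r.sourceId) return count(r);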

To speed up the data ingestion, we should index the sourceId property, for example via a uniqueness constraint:

CREATE CONSTRAINT ON (n:SourceEvent) ASSERT n.sourceId IS UNIQUE

After exporting/importing the data and starting the Kafka infrastructure, all changes to the source database should be reflected in the sink database.
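
A simple way to verify this (plain Cypher, not part of the plugin) is to compare node and relationship counts on both sides once the connector has caught up:

// run on both the 3.5 source and the 4.x sink; the numbers should converge
match (n) return count(n) as nodes;
match ()-[r]->() return count(r) as relationships;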

Schema strategy

The other strategy to identify nodes and relationships uses schema information, e.g. constraints. Using the same example as with the SourceId strategy above, but with a unique constraint on the name property:

create constraint on (n:A) assert n.name is unique;
// wait some time, see below
create (:A {name:'node A'})-[:RELATES_TO {prop1:true}]->(:A {name:'node B'})

The schema information is sent along with the CDC data:

 "schema": {
    "properties": {
      "name": "String"
    },
    "constraints": [
      {
        "label": "A",
        "properties": [
          "name"
        ],
        "type": "UNIQUE"
      }

Schema information is refreshed periodically; the interval can be configured via streams.source.schema.polling.interval. The default is 300,000 ms (5 minutes), which might be too long for testing purposes. The provided docker-compose file sets it to 10,000 ms.
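
For reference, this is a plain configuration property of the source plugin; a sketch of the corresponding line in the 3.5 instance's neo4j.conf (the provided compose file already sets this value):

# refresh schema information every 10 seconds instead of every 5 minutes
streams.source.schema.polling.interval=10000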

In the query.log of the 4.x instance, we will see three queries executed:

UNWIND $events AS event
  MERGE (n:A{name: event.properties.name})
    SET n = event.properties - {events: [{properties: {name: 'node A'}}]}

UNWIND $events AS event
  MERGE (n:A{name: event.properties.name})
    SET n = event.properties - {events: [{properties: {name: 'node B'}}]}

UNWIND $events AS event
  MERGE (start:A{name: event.start.name})
  MERGE (end:A{name: event.end.name})
  MERGE (start)-[r:RELATES_TO]->(end)
    SET r = event.properties -
  {events: [{start: {name: 'node A'}, end: {name: 'node B'}, properties: {prop1: true}}]}

As you can see, the schema constraint is used to merge on the nodes. Adding two properties to one node results in:

match (a:A) where id(a) = 0 set a.string="a string value", a.number=123
// -> query.log
UNWIND $events AS event
  MERGE (n:A{name: event.properties.name})
  SET n = event.properties -
  {events: [{properties: {name: 'node A', string: 'a string value', number: 123}}]}

This looks a lot easier to use than the SourceId strategy, but there is a catch. Looking at the query for the relationship, we see that it is identified by its source and target nodes and the relationship type. If your model relies on multiple relationships of the same type between two nodes, this will not work. Let’s see what happens:

match (a:A) where id(a) = 0 match (b) where id(b) = 1 create (a)-[r:RELATES_TO {prop1:false}]->(b)

Note the create (instead of merge) for the relationship. After execution, we will see two relationships between our two nodes in the source database, with prop1 set to true on one and false on the other. The query executed against the sink database:

UNWIND $events AS event
  MERGE (start:A{name: event.start.name})
  MERGE (end:A{name: event.end.name})
  MERGE (start)-[r:RELATES_TO]->(end)
    SET r = event.properties -
    {events: [{start: {name: 'node A'}, end: {name: 'node B'}, properties: {prop1: false}}]}

does not reflect this. Therefore we end up with just one RELATES_TO relationship in the sink database, with prop1 set to the latest value (false). So, while the Schema strategy is easier to configure and use, it may not suit your data model.
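
The mismatch is easy to make visible by running the same count on both sides (a small verification sketch using the node names from the example):

// the source (3.5) returns 2, the sink (4.x) returns 1
match (:A {name:'node A'})-[r:RELATES_TO]->(:A {name:'node B'}) return count(r) as rels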

Kafka considerations

I can’t give general guidance on how to configure Kafka. Just a few remarks:

  • Don’t spread the CDC events over multiple partitions. Kafka guarantees ordering only within a partition, and for database replication the order of events is important.

  • The plugins (the Neo4j plugin as well as the Connect plugin) use the official Kafka Java driver and support all configuration options provided by this driver. All configuration options that start with kafka. are passed through to the driver, as in the sketch below.
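
As an illustration of this passthrough on the source (3.5) side, a hedged sketch for neo4j.conf; the chosen values are assumptions for demonstration, not recommendations:

# standard Kafka producer options, handed to the Kafka driver unchanged by the plugin
kafka.bootstrap.servers=broker:9093
kafka.acks=all
kafka.retries=10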