Integrating Kafka with Neo4j

Kafka is used in many organisations to shuffle data around between different services. I’m not going into what Kafka is and how to run it, as there are plenty of videos and tutorials out there. I liked the Apache Kafka explained video as it is concise and cuts down on noise. If you prefer something longer, I recommend Introduction to Apache Kafka by James Ward. What matters for this post is that Kafka provides a broker where publishers send messages to a topic and subscribers poll these topics. The Kafka Connect service allows registering plugins with the platform.

Neo4j can act as a source (writer) or destination (sink) of messages. The two available integration options are:

  • Neo4j plugin: Installed inside the database, it can act as a source of messages as well as receive messages from Kafka and interact with the database. This option also allows for CDC (change data capture) by registering a transaction handler and sending messages with the changed data to Kafka. It also provides procedures to stream query results to Kafka.

  • Kafka Connect plugin: Registered with Kafka Connect, it runs separately from Neo4j. As such, it cannot provide CDC but acts just like any other Neo4j client, using the Bolt protocol to talk to Neo4j.

To follow along, I provide the necessary docker-compose files in a repository. All code examples refer to that repository and assume that you are in a directory with the repository checked out. Each compose file uses a directory underneath $HOME/tmp/docker/ with a name that matches the compose file name.

Both options are described in detail on the plugin home page.

If all you want to do is stream data into Neo4j, I prefer the second option, running the plugin inside Kafka Connect. Let’s look at this option first.

Kafka Connect plugin

As the name implies, this is a plugin for Kafka Connect. The compose file installs two plugins when firing up the image:

confluent-hub install --no-prompt neo4j/kafka-connect-neo4j:1.0.0
confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:latest

We will use the kafka-connect-datagen plugin to generate some messages to ingest. With this, the plugins are available, but not yet configured and therefore inactive.

Before we can configure the plugins, we need to fire up the stack:

docker-compose -f kafka-sink2neo-35.yml up

This will take some time. Once ready, you can connect to the Kafka Control Center and the Neo4j Browser (neo4j/changeme).
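
Before configuring anything, it is worth checking that the Kafka Connect REST API is reachable and that both plugins were picked up. The endpoints below are part of the standard Connect REST API; port 8083 is the one used for the curl calls later in this post:

# List the connector plugins Kafka Connect picked up; the Neo4j sink and the
# datagen connector should both appear in the output
curl -s http://localhost:8083/connector-plugins

# No connectors are configured yet, so this should return an empty list: []
curl -s http://localhost:8083/connectors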

The datagen plugin comes with a few predefined templates. In this example, I used the users schema, which generates JSON messages such as these:

{"registertime":1500599586519,"userid":"User_9","regionid":"Region_5","gender":"MALE"}
{"registertime":1493882456812,"userid":"User_9","regionid":"Region_3","gender":"OTHER"}
{"registertime":1514584804940,"userid":"User_9","regionid":"Region_8","gender":"FEMALE"}
{"registertime":1498613454415,"userid":"User_7","regionid":"Region_9","gender":"FEMALE"}
{"registertime":1510970841590,"userid":"User_8","regionid":"Region_8","gender":"OTHER"}

With

curl -X POST http://localhost:8083/connectors \
  -H 'Content-Type:application/json' \
  -H 'Accept:application/json' \
  -d @configs/datagen-users.json

we create one connector. In the file configs/datagen-users.json used above, the lines

    "kafka.topic": "users",
    "quickstart": "users",

define which template to use and to which topic the messages are sent. The users topic should now show up in the Kafka Control Center.
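
For reference, the rest of configs/datagen-users.json is mostly boilerplate. I have not reproduced the file verbatim here, but a minimal sketch along these lines should work; the connector name is arbitrary, and max.interval (the maximum pause in milliseconds between generated messages) is an optional knob of the datagen connector:

{
  "name": "datagen-users",
  "config": {
    "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
    "kafka.topic": "users",
    "quickstart": "users",
    "max.interval": 1000,
    "tasks.max": "1",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false"
  }
}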

In the same way, we can create a Neo4j sink connector:

curl -X POST http://localhost:8083/connectors \
  -H 'Content-Type:application/json' \
  -H 'Accept:application/json' \
  -d @configs/neo4j.sink.json

Besides the Kafka options, the configs/neo4j.sink.json file configures the following:

{
  "name": "Neo4jSinkConnector",
  "config": {
    "topics": "users", (1)
    "connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
    "kafka.key.deserializer":"org.apache.kafka.common.serialization.ByteArrayDeserializer",
    "kafka.value.deserializer":"org.apache.kafka.common.serialization.ByteArrayDeserializer",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable":"false",
    "errors.retry.timeout": "-1",
    "errors.retry.delay.max.ms": "1000",
    "errors.tolerance": "all",
    "errors.log.enable": true,
    "errors.log.include.messages": true,
    "neo4j.server.uri": "bolt://neo4j:17687", (2)
    "neo4j.authentication.basic.username": "neo4j", (3)
    "neo4j.authentication.basic.password": "changeme", (3)
    "neo4j.topic.cypher.users":
         "MERGE (u:User{id: event.userid}) \
            on create set u.registertime=event.registertime,\
                      u.gender=event.gender , u.source='datagen' \
          MERGE (r:Region{id: event.regionid}) \
          MERGE (u)-[:FROM]->(r)" (4)
  }
}
  1. Topic(s) to pull messages from

  2. Neo4j connect URL

  3. Username/Password to connect to Neo4j

  4. A Cypher template per topic to process each message
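
Ingesting from more than one topic is just a matter of listing the additional topics and adding one neo4j.topic.cypher.<topic> entry per topic. A hedged fragment (the regions topic and its template are made up for illustration):

    "topics": "users,regions",
    "neo4j.topic.cypher.users": "MERGE (u:User{id: event.userid}) ...",
    "neo4j.topic.cypher.regions": "MERGE (r:Region{id: event.regionid}) SET r.name = event.name"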

As the fragment above shows, you can define multiple topics and a Cypher template for each topic. Messages are processed in batches. To see the queries sent to Neo4j, let’s switch on query logging via the Neo4j Browser:

call dbms.setConfigValue('dbms.logs.query.parameter_logging_enabled', 'true')
call dbms.setConfigValue('dbms.logs.query.enabled', 'true')

The query log can be found in $HOME/tmp/docker/kafka-source35/neo4j/logs/query.log. A typical entry in the log will look like this:

UNWIND {events} AS event
MERGE (u:User{id: event.userid})
  on create set u.registertime=event.registertime, u.gender=event.gender ,
    u.source='datagen'
MERGE (r:Region{id: event.regionid})
MERGE (u)-[:FROM]->(r) -
{events: [
{registertime: 1493833790153, gender: 'FEMALE', regionid: 'Region_8', userid: 'User_1'},
{registertime: 1509889577306, gender: 'FEMALE', regionid: 'Region_3', userid: 'User_3'},
{registertime: 1511248947881, gender: 'MALE', regionid: 'Region_2', userid: 'User_5'},
{registertime: 1517976858275, gender: 'OTHER', regionid: 'Region_9', userid: 'User_6'},
..snip..
]}

As you can see, the Cypher template we defined is preceded by an UNWIND, and the actual messages are passed into the template as event objects. Messages are batched for better performance. To actually see batching happen, you may have to lower the max.interval parameter of the datagen connector or start a few generators in parallel, for example as sketched below.
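
A simple way to do that without touching the repository files is to post a second datagen connector inline. The connector name below is made up, and the converter settings mirror the JSON settings of the sink shown earlier:

# Second generator on the same users topic, emitting a message roughly every 50 ms
curl -X POST http://localhost:8083/connectors \
  -H 'Content-Type:application/json' \
  -d '{
        "name": "datagen-users-fast",
        "config": {
          "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
          "kafka.topic": "users",
          "quickstart": "users",
          "max.interval": 50,
          "tasks.max": "1",
          "value.converter": "org.apache.kafka.connect.json.JsonConverter",
          "value.converter.schemas.enable": "false"
        }
      }'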

As the plugin home page contains most of the information and links, setting this up was not difficult. The biggest problem was configuring the (de)serializers and converters correctly.
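
If messages do not show up in Neo4j, the Connect REST API is the first place to look; serialization problems usually surface as failed tasks whose status output contains the stack trace:

# Show the state of the sink connector and its tasks (RUNNING vs. FAILED),
# including the last error trace for a failed task
curl -s http://localhost:8083/connectors/Neo4jSinkConnector/status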

Neo4j Streams Plugin

The other option to integrate Neo4j with Kafka is to use a plugin inside Neo4j. As with all Neo4j plugins, it is provided as a jar file that must reside in the plugins folder of your Neo4j installation. Plugins are loaded and registered at Neo4j startup, so a restart is required to add the plugin.

Unfortunately, this plugin does not yet support automatic version detection and loading during Docker startup, so it needs to be downloaded into the plugins directory manually:

mkdir -p  $HOME/tmp/docker/kafka-source35/neo4j/plugins
curl -L https://github.com/neo4j-contrib/neo4j-streams/releases/download/3.5.6/neo4j-streams-3.5.6.jar \
     --output $HOME/tmp/docker/kafka-source35/neo4j/plugins/neo4j-streams-3.5.6.jar --silent

With the plugin in place, we can fire up the Kafka components along with Neo4j 3.5:

docker-compose -f kafka-source35.yml up | tee up.log
Note
The above command pipes the output into the file up.log so it can be searched for error messages. The Kafka broker in particular is extremely spammy, making it hard to spot errors from the other services in the compose file.
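
With the output captured, something along these lines helps to cut through the broker noise (a simple case-insensitive match for common error markers):

# Scan the captured compose output for errors and exceptions
grep -iE 'error|exception' up.log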

The Neo4j service in the compose file loads the APOC plugin and configures the streams plugin to find the Kafka components:

      - NEO4J_kafka_zookeeper_connect=zookeeper:2181
      - NEO4J_kafka_bootstrap_servers=broker:9093

Streams Procedures

The plugin comes with two procedures to send data to and receive data from Kafka topics.
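
A quick way to verify that the plugin was actually loaded is to list its procedures from the Neo4j Browser (dbms.procedures is the stock Neo4j 3.5 way to do this):

// Should list streams.publish and streams.consume
CALL dbms.procedures() YIELD name, signature
WHERE name STARTS WITH 'streams'
RETURN name, signature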

Once all services are up, we can connect to the Neo4j browser and send a test message:

call streams.publish('hello', 'Hello Kafka!')

Since we don’t have a consumer defined, we must use the Control Center to see our message.
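
Alternatively, you can read the topic from the command line with the console consumer that ships with the Confluent broker image. This is a sketch assuming the compose service is called broker and listens on broker:9093, matching the bootstrap server setting shown earlier:

# Read everything published to the hello topic so far
docker-compose -f kafka-source35.yml exec broker \
  kafka-console-consumer --bootstrap-server broker:9093 --topic hello --from-beginning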

The payload streamed to Kafka can be anything, so if we want to stream some data from the example movie graph, we could do something like:

MATCH p=()--() WITH p LIMIT 5
CALL streams.publish('movies', p) RETURN p

we would get messages in the movies topic, one per path. With APOC’s background jobs, one could build a simple system that periodically pushes data to Kafka:

CALL apoc.periodic.repeat("just4fun", "
  MATCH p=()--() WITH p LIMIT 5
  CALL streams.publish('movies', p) RETURN p", 30)

It is also possible to consume messages from Kafka via streams.consume. Together with the apoc.periodic.repeat call shown above, one could build a simple data ingestion pipeline.
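
As a sketch (assuming the users topic from the first part of this post still receives messages and that its JSON payload ends up as a map under event.data), the consuming side could look like this; streams.consume reads whatever arrives within the given timeout and returns one event per message:

// Pull messages from the users topic for up to 5 seconds and upsert User nodes
CALL streams.consume('users', {timeout: 5000}) YIELD event
MERGE (u:User {id: event.data.userid})
  ON CREATE SET u.registertime = event.data.registertime,
                u.gender = event.data.gender,
                u.source = 'streams.consume'
RETURN count(u) AS usersSeen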

Change Data Capture

The Neo4j streams plugin can also perform CDC, sending the changed data to one or more Kafka topics. Change data events are created for every create, update, and delete operation. The documentation has more details on the structure of these events.

To enable CDC events, the compose file contains the following setting:

      - NEO4J_streams_source_enabled=true

Without further configuration, all CDC events are sent to a neo4j topic in Kafka. This can be fine-tuned via streams.source.topic.nodes.<TOPIC_NAME> and streams.source.topic.relationships.<TOPIC_NAME>, which allow defining different topics and controlling which labels, relationship types, and properties are sent to the configured Kafka topics.
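
In the docker-compose environment-variable notation used above, such a fine-tuned setup could look like the following sketch. The topic names are made up, and the Label{*} / TYPE{*} patterns (all properties of the given label or relationship type) follow the pattern syntax described in the plugin documentation:

      # Route CDC events for User nodes (all properties) to the topic "usernodes"
      - NEO4J_streams_source_topic_nodes_usernodes=User{*}
      # Route CDC events for FROM relationships (all properties) to the topic "userrels"
      - NEO4J_streams_source_topic_relationships_userrels=FROM{*}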