Yet another ETL tool, you ask? Well, kind of. But this is not a complete tool; it’s a set of LEGO bricks you can stack together to build an ETL pipeline. There are plenty of enterprise-grade ETL solutions out there and, if you have access to them, use them. But often enough—even in large organisations—I encounter teams that cannot or do not want to use these solutions.
Working as a consultant for a database company, I often help customers load data into Neo4j. Often, this starts off as a collection of LOAD CSV scripts during a PoC phase. When moving into MVP or production, these scripts lack some core requirements I deem essential:
Logging of performed tasks and their outcomes
Error handling
Data validation
Streaming and batching
This toolbox provides all of them, and a few more, while all you have to do is place the Cypher commands (you may already have them from the PoC phase) into Python, add some code to set up the environment (with support from the toolbox), and optionally provide some Pydantic classes for data validation.
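To give an idea of what such an optional validation class might look like, here is a minimal sketch using Pydantic. The class name `AgencyRow`, its fields, and the helper `validate_rows` are assumptions made for illustration; how the toolbox actually wires a model into a task is described in its documentation, not here.

```python
from typing import Optional

from pydantic import BaseModel, Field, ValidationError


class AgencyRow(BaseModel):
    """Hypothetical model for one CSV row; the field names are assumptions."""

    id: str = Field(..., min_length=1)    # required, must not be empty
    name: str = Field(..., min_length=1)  # required, must not be empty
    url: str
    timezone: str
    lang: Optional[str] = None            # optional columns default to None
    phone: Optional[str] = None


def validate_rows(rows):
    """Split raw dict rows into validated models and (row, error) pairs."""
    valid, invalid = [], []
    for row in rows:
        try:
            valid.append(AgencyRow(**row))
        except ValidationError as exc:
            invalid.append((row, str(exc)))
    return valid, invalid


rows = [
    {"id": "a1", "name": "Metro", "url": "https://example.org", "timezone": "Europe/Berlin"},
    {"id": "", "name": "Broken", "url": "https://example.org", "timezone": "Europe/Berlin"},
]
valid, invalid = validate_rows(rows)
```

Rows that fail validation are collected together with their error message instead of aborting the run, which is one possible way to combine validation with error handling.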
Tasks
Example class to load CSV data into Neo4j (from the GTFS Example):
class LoadAgenciesTask(CSVLoad2Neo4jTask):
    def __init__(self, context: ETLContext, file: Path):
        super().__init__(context, file)

    def _query(self):
        # Each batch of CSV rows is passed to the query as the $batch parameter.
        return """ UNWIND $batch AS row
            MERGE (a:Agency {id: row.id})
            SET a.name = row.name,
                a.url = row.url,
                a.timezone = row.timezone,
                a.lang = row.lang,
                a.phone = row.phone,
                a.fareUrl = row.fare_url,
                a.email = row.email
        """
The above reads the CSV data in batches and writes it to the graph in batches, while collecting and reporting statistics such as the number of nodes, relationships, and properties added.
These tasks can then be placed into pipelines and executed. (Parallel)TaskGroups allow building more structured pipelines. An example pipeline (again from GTFS):
schema = SchemaTask(context=context)
init_group = TaskGroup(context=context, tasks=[schema], name="schema-init")
tasks = [
    LoadAgenciesTask(context=context, file=input_directory / LoadAgenciesTask.file_name()),
    LoadRoutesTask(context=context, file=input_directory / LoadRoutesTask.file_name()),
    LoadStopsTask(context=context, file=input_directory / LoadStopsTask.file_name()),
    LoadTripsTask(context=context, file=input_directory / LoadTripsTask.file_name()),
    LoadCalendarTask(context=context, file=input_directory / LoadCalendarTask.file_name()),
    LoadStopTimesTask(context=context, file=input_directory / LoadStopTimesTask.file_name()),
]
csv_group = TaskGroup(context=context, tasks=tasks, name="csv-loading")
post_group = TaskGroup(context=context, tasks=[CreateSequenceTask(context=context)], name="post-processing")
all_group = TaskGroup(context=context, tasks=[init_group, csv_group, post_group], name="main")
context.reporter.register_tasks(all_group)
all_group.execute()
Reporting
Reporting is always done to stdout and file. Optionally, depending on a single environment variable, information about performed tasks and pipelines is also written to a Neo4j database.
A simple NeoDash dashboard (this still needs some love) is provided for viewing these statistics. Because the statistics are updated while the pipeline runs, the dashboard also shows the progress of running pipelines.
The same information can be viewed from the CLI via provided commands. The provided CLI component makes embedding this into one’s own program easy. Example of viewing the details of one specific pipeline execution:
$ python <your-cli>.py detail 69260954-0b94-4043-be1b-f99ce5a64d3a
Showing details for run ID: 69260954-0b94-4043-be1b-f99ce5a64d3a
+-------------------------------------------------------------------------------+----------+-----------+------------+-----------+
| task | status | batches | duration | changes |
|-------------------------------------------------------------------------------+----------+-----------+------------+-----------|
| TaskGroup(schema-init) | success | | 0:00:00 | 0 |
| Task(SchemaTask) | success | | 0:00:00 | 0 |
| TaskGroup(csv-loading) | success | | 0:00:57 | 4566469 |
| LoadAgenciesTask(/Users/bert/Downloads/mdb-2333-202412230030/agency.txt) | success | 1 / - | 0:00:00 | 6 |
| LoadRoutesTask(/Users/bert/Downloads/mdb-2333-202412230030/routes.txt) | success | 1 / - | 0:00:00 | 1495 |
| LoadStopsTask(/Users/bert/Downloads/mdb-2333-202412230030/stops.txt) | success | 1 / - | 0:00:00 | 33360 |
| LoadTripsTask(/Users/bert/Downloads/mdb-2333-202412230030/trips.txt) | success | 19 / - | 0:00:03 | 733552 |
| LoadCalendarTask(/Users/bert/Downloads/mdb-2333-202412230030/calendar.txt) | success | 1 / - | 0:00:00 | 424 |
| LoadStopTimesTask(/Users/bert/Downloads/mdb-2333-202412230030/stop_times.txt) | success | 380 / - | 0:00:54 | 3797632 |
| TaskGroup(post-processing) | success | | 0:00:07 | 0 |
| Task(CreateSequenceTask) | success | | 0:00:07 | 0 |
+-------------------------------------------------------------------------------+----------+-----------+------------+-----------+
Data Sources (and sinks)
At the time of writing, the ETL lib supports reading from:
CSV files
SQL databases
Neo4j databases
See Data Sources and Data Sinks for more information.
Since tasks are internally composed of BatchProcessors, adding new sources and sinks is fairly easy. I plan to add support for Parquet and Avro soon.
Parallel Data Loading
Ever since I read the excellent Mix and Batch blog post by my colleague Eric Monk, I wanted to build tooling for this in pure Python. I also wanted to hold off on this blog post until I had implemented it in this library.
In short, this technique allows data to be split into batches that do not overlap. This is especially important when writing relationships. It does so by placing data onto a grid whose rows and columns are derived from the data (say, the last digit of the node ID). The cells form the batches. No two cells in a diagonal stripe share a row or a column, so the batches in one diagonal stripe can be loaded in parallel.
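A minimal sketch of the grid idea, assuming integer node IDs and a 10 x 10 grid keyed by the last digit of the start and end node ID. The names `cell_for` and `diagonals` are hypothetical and not the library's API. The sketch assigns relationships to cells and checks that the cells along a wrap-around diagonal never share a row or a column.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

GRID_SIZE = 10  # rows and columns derived from the last digit of an ID


def cell_for(start_id: int, end_id: int, size: int = GRID_SIZE) -> Tuple[int, int]:
    """Map a relationship to a (row, column) cell by the last digit of each ID."""
    return start_id % size, end_id % size


def diagonals(size: int = GRID_SIZE) -> List[List[Tuple[int, int]]]:
    """Return all wrap-around diagonals of a size x size grid."""
    return [[(row, (row + shift) % size) for row in range(size)] for shift in range(size)]


# Hypothetical relationships as (start node ID, end node ID) pairs.
relationships = [(11, 42), (21, 52), (13, 44), (35, 36), (17, 98)]
grid: Dict[Tuple[int, int], List[Tuple[int, int]]] = defaultdict(list)
for start_id, end_id in relationships:
    grid[cell_for(start_id, end_id)].append((start_id, end_id))

for stripe in diagonals():
    rows = [row for row, _ in stripe]
    cols = [col for _, col in stripe]
    # Each row and each column occurs exactly once per diagonal.
    assert len(set(rows)) == GRID_SIZE and len(set(cols)) == GRID_SIZE
```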
Incoming data is prefetched and placed into the buffer. When a diagonal stripe of cells fulfils the criteria to be released (especially when each cell has at least batch_size items), this diagonal stripe is released to the next step (here, writing to the database).
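The buffering step could look roughly like the following sketch. It is a simplification under stated assumptions: the release rule is reduced to "every cell of the diagonal holds at least batch_size items", the whole cell content is handed over on release, and leftovers are flushed at the end of the input. The function name `release_stripes` and the row shape are hypothetical; the library's real criteria may differ.

```python
from collections import defaultdict
from typing import Dict, Iterable, Iterator, List, Tuple

Row = Tuple[int, int]  # hypothetical (start node ID, end node ID) pair


def release_stripes(rows: Iterable[Row], size: int, batch_size: int) -> Iterator[List[List[Row]]]:
    """Buffer rows per cell and yield a diagonal stripe once all its cells are full."""
    buffer: Dict[Tuple[int, int], List[Row]] = defaultdict(list)
    for start_id, end_id in rows:
        row, col = start_id % size, end_id % size
        buffer[(row, col)].append((start_id, end_id))
        shift = (col - row) % size  # index of the diagonal this cell lies on
        stripe = [(r, (r + shift) % size) for r in range(size)]
        if all(len(buffer[cell]) >= batch_size for cell in stripe):
            # Hand over the content of every cell; these batches can be written in parallel.
            yield [buffer.pop(cell) for cell in stripe]
    # Flush whatever is left at the end of the input, stripe by stripe.
    for shift in range(size):
        stripe = [(r, (r + shift) % size) for r in range(size)]
        leftovers = [buffer.pop(cell) for cell in stripe if buffer.get(cell)]
        if leftovers:
            yield leftovers


stripes = list(release_stripes([(0, 0), (1, 1), (0, 1)], size=2, batch_size=1))
print(stripes)  # [[[(0, 0)], [(1, 1)]], [[(0, 1)]]]
```

In the tiny run above, the main diagonal is released as soon as both of its cells hold one item, while the single row on the other diagonal is only emitted by the final flush.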
I implemented an example to showcase this.
When enabling DEBUG mode, the matrix is printed:
2025-10-03 17:06:44,498 - DEBUG - etl_lib.core.SplittingBatchProcessor.SplittingBatchProcessor - [prefetcher] - buffer matrix:
+-----+---------+---------+---------+--------+-------+--------+---------+--------+---------+---------+
| | c00 | c01 | c02 | c03 | c04 | c05 | c06 | c07 | c08 | c09 |
|-----+---------+---------+---------+--------+-------+--------+---------+--------+---------+---------|
| r00 | 13849 | 12486 | 4432 | 10059 | 6857 | 256 | 12957 | 6340 | 8184 | [11044] |
| r01 | [13992] | 4001 | 5114 | 5475 | 0 | 339 | 9096 | 4723 | 5873 | 914 |
| r02 | 11076 | [15054] | 2666 | 14738 | 245 | 0 | 9758 | 7168 | 9989 | 7092 |
| r03 | 11261 | 8105 | [12767] | 3242 | 2830 | 103 | 2940 | 3596 | 6417 | 5535 |
| r04 | 7970 | 3685 | 302 | [6449] | 3749 | 949 | 245 | 0 | 2731 | 1912 |
| r05 | 142 | 488 | 161 | 129 | [951] | 492 | 571 | 42 | 0 | 163 |
| r06 | 11153 | 8887 | 5045 | 7217 | 491 | [1380] | 2823 | 6352 | 309 | 0 |
| r07 | 377 | 4197 | 3559 | 4306 | 700 | 152 | [11794] | 2355 | 1929 | 380 |
| r08 | 10475 | 4772 | 6786 | 11268 | 4587 | 403 | 3790 | [5371] | 8642 | 9080 |
| r09 | 5053 | 394 | 0 | 6540 | 2295 | 521 | 782 | 637 | [10871] | 1796 |
+-----+---------+---------+---------+--------+-------+--------+---------+--------+---------+---------+
You can see the cells with their current number of items. The cells shown in brackets, such as [11044], are the ones being released at that moment.
To use it, you only need to provide the Cypher to write the batches. See Mix and Batch in the documentation.
The achievable speed-up depends on many factors, so it is hard to predict. Influencing factors include available CPU cores, network latency, available I/O, and others. Unscientific tests loading the NYC taxi data, on my local laptop with Neo4j running in Docker and on a GCP instance, gave the following results:
| Environment | Sequential (min) | Parallel (min) |
|---|---|---|
| Docker | 35 | 19 |
| GCP | 59 | 27 |
All numbers are minutes to run the full import against an empty database (run multiple times and averaged).
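For reference, the speed-up implied by these numbers is simply the sequential time divided by the parallel time. The short calculation below uses only the values from the table:

```python
# Minutes taken from the table above: (sequential, parallel).
timings = {"Docker": (35, 19), "GCP": (59, 27)}

speedups = {env: round(seq / par, 2) for env, (seq, par) in timings.items()}
print(speedups)  # {'Docker': 1.84, 'GCP': 2.19}
```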
There is also another option to process data in parallel using ParallelTaskGroups. This is much simpler but only works if the data written is known not to touch the same parts in the graph.
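Conceptually, this simpler kind of parallelism amounts to running independent tasks side by side. The sketch below uses Python's `concurrent.futures` to show the idea. It is not how ParallelTaskGroup is implemented, and the function `run_in_parallel` and the task names are hypothetical stand-ins.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict


def run_in_parallel(tasks: Dict[str, Callable[[], int]], max_workers: int = 4) -> Dict[str, int]:
    """Run independent callables concurrently and collect each result by name."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {name: pool.submit(task) for name, task in tasks.items()}
        # result() re-raises any exception from the task, so failures are not lost.
        return {name: future.result() for name, future in futures.items()}


# Hypothetical stand-ins for load tasks that write to disjoint parts of the graph;
# each returns the number of changes it made.
results = run_in_parallel({
    "load-agencies": lambda: 6,
    "load-calendar": lambda: 424,
})
print(results)  # {'load-agencies': 6, 'load-calendar': 424}
```

The precondition from the text still applies: this is only safe when the tasks are known not to write to the same nodes or relationships.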
Testing
Last, but not least, the lib contains pytest fixtures and functions to allow testing of the pipelines. It uses the excellent TestContainers library to run tests with the DB running in a Docker environment.

