Neo4j’s reactive driver, when dealing with streaming queries, doesn’t actually "drop" data; instead, it employs a sophisticated mechanism to prevent overwhelming the client application.
Imagine you’ve got a massive graph database, and you’re asking for a huge dataset back, like "all users and their friends, recursively, 10 levels deep." If your application is trying to process this one record at a time, but the database is churning out results much faster than your application can handle, you’re going to run into trouble. The reactive driver’s job is to gracefully manage this mismatch. It doesn’t just blast all the data at you and hope for the best. Instead, it uses something called "backpressure."
Think of it like a garden hose. If you turn the tap on full blast and try to catch the water in a tiny teacup, you’ll make a mess. Backpressure is like the hose automatically sensing that the teacup can’t handle the flow and throttling itself down. In the context of Neo4j, this means the driver signals to the database, "Hey, slow down! I can’t keep up right now." This signal travels all the way back to the Neo4j server, which then pauses sending more results until the driver indicates it’s ready for more.
Let’s see this in action. Suppose we have a simple Neo4j graph and we want to stream a large number of nodes.
import org.neo4j.driver.AuthTokens;
import org.neo4j.driver.Driver;
import org.neo4j.driver.GraphDatabase;
import org.neo4j.driver.Record;
import org.neo4j.driver.reactive.RxSession;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ReactiveStreamingExample {
    public static void main(String[] args) {
        // Assumes a running Neo4j instance and a Java driver version that provides the RxSession API (4.x)
        try (Driver driver = GraphDatabase.driver("bolt://localhost:7687",
                AuthTokens.basic("neo4j", "password"))) {

            // Create some dummy data for demonstration; nothing runs until the publisher is subscribed
            Flux.usingWhen(
                    Mono.fromSupplier(driver::rxSession),
                    session -> session.run("CREATE (:Item {id: 1}), (:Item {id: 2}), (:Item {id: 3})").consume(),
                    RxSession::close)
                .blockLast();

            // A query that might produce a lot of data; the session is closed when the stream terminates
            Flux<Record> results = Flux.usingWhen(
                    Mono.fromSupplier(driver::rxSession),
                    session -> session.run("MATCH (n:Item) RETURN n").records(),
                    RxSession::close);

            // Now, let's process these results with a delay to simulate a slow consumer
            results
                .doOnNext(record -> {
                    System.out.println("Processing: " + record.get("n").asNode().get("id").asInt());
                    try {
                        // Simulate a slow processing step
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                })
                .doOnError(error -> System.err.println("Error: " + error.getMessage()))
                .doOnComplete(() -> System.out.println("Stream finished."))
                // Block until the stream completes so the driver is not closed while records are in flight
                .blockLast();
        }
    }
}
In this example, the Thread.sleep(500) inside the doOnNext operator simulates our application taking a significant amount of time to process each Item node. Without backpressure, the Neo4j driver would try to fetch all the Item nodes from the database as quickly as possible, and our slow processing would eventually lead to memory issues or errors as the results queue up.
However, because we’re using Reactor’s Flux (which is built on the Reactive Streams specification), records only flow in response to demand from the subscriber. The Thread.sleep in doOnNext runs on the thread that delivers each record, so the next record cannot be handed over until the current one has been processed. Be aware that a plain subscribe() or blockLast() requests an unbounded number of elements up front; to cap how many records are outstanding at any time, bound the demand explicitly, for example with limitRate(n) or a custom BaseSubscriber. The Neo4j driver, in turn, communicates the demand it sees to the Neo4j server through the Bolt protocol, and the server holds back further results until the client asks for more.
The core of this mechanism lies in the Reactive Streams Publisher, Subscriber, and Subscription interfaces. When a Subscriber subscribes, the Publisher hands it a Subscription, and the Subscriber calls request(long n) on that Subscription to say how many elements it is ready to receive. When a Subscriber (like our Reactor Flux processing pipeline) cannot keep up, it stops calling request(long n) or requests smaller numbers of elements. The Publisher (the Neo4j driver) must not emit more than was requested, and it adjusts how much it pulls from its own upstream (the Neo4j server) accordingly.
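To make the request(n) handshake concrete, here is a minimal single-threaded sketch using the JDK's java.util.concurrent.Flow interfaces, which mirror the Reactive Streams ones. It is not the Neo4j driver's implementation: CountingPublisher is a hypothetical stand-in for the driver, and PausingSubscriber is a hypothetical consumer that stops requesting after a fixed number of items.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

public class DemandSketch {

    /** Hypothetical stand-in for the driver: emits 1..total, but never more than was requested. */
    public static final class CountingPublisher implements Flow.Publisher<Integer> {
        private final int total;
        public final List<Long> requestsSeen = new ArrayList<>();

        public CountingPublisher(int total) { this.total = total; }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 1;
                private long demand = 0;
                private boolean emitting = false;
                private boolean done = false;

                @Override
                public void request(long n) {
                    if (done) return;
                    if (n <= 0) {
                        done = true;
                        subscriber.onError(new IllegalArgumentException("n must be positive"));
                        return;
                    }
                    requestsSeen.add(n);
                    // Accumulate demand, saturating at Long.MAX_VALUE on overflow
                    demand = (demand + n < 0) ? Long.MAX_VALUE : demand + n;
                    if (emitting) return; // re-entrant call from onNext; the outer loop drains it
                    emitting = true;
                    while (demand > 0 && next <= total && !done) {
                        demand--;
                        subscriber.onNext(next++);
                    }
                    emitting = false;
                    if (next > total && !done) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() { done = true; }
            });
        }
    }

    /** Hypothetical consumer: requests one item at a time and stalls after maxItems. */
    public static final class PausingSubscriber implements Flow.Subscriber<Integer> {
        private final int maxItems;
        private Flow.Subscription subscription;
        public final List<Integer> received = new ArrayList<>();
        public boolean completed = false;

        public PausingSubscriber(int maxItems) { this.maxItems = maxItems; }

        @Override public void onSubscribe(Flow.Subscription s) { subscription = s; s.request(1); }

        @Override public void onNext(Integer item) {
            received.add(item);
            if (received.size() < maxItems) subscription.request(1); // ask only when ready
        }

        @Override public void onError(Throwable t) { throw new IllegalStateException(t); }
        @Override public void onComplete() { completed = true; }
    }

    public static void main(String[] args) {
        CountingPublisher publisher = new CountingPublisher(5);
        PausingSubscriber subscriber = new PausingSubscriber(2);
        publisher.subscribe(subscriber);
        System.out.println("received = " + subscriber.received);
        System.out.println("requests = " + publisher.requestsSeen);
        System.out.println("completed = " + subscriber.completed);
    }
}
```

With five items available and a consumer that stalls after two, only items 1 and 2 are delivered and the stream does not complete. The remaining items are not lost; they simply stay with the publisher until more demand arrives.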
The Neo4j reactive driver is built on the Netty transport layer, which is designed for asynchronous, non-blocking I/O. When a Flux (or any other Reactive Streams Publisher, depending on the specific reactive API used) is subscribed to, the driver initiates the query and starts receiving records, which it then emits downstream. If the downstream consumer signals that it’s not ready for more data (by not requesting more elements), the driver stops asking the server for further records. On Bolt 4.0 and later this flow control is explicit: the server sends records only in response to pull requests from the driver. This prevents the driver from buffering an unbounded amount of data.
A common misconception is that backpressure means the database is "dropping" records. It’s not. It’s merely pausing the delivery of those records. The records are still there, waiting to be sent once the client signals it’s ready. The database server maintains the state of the query and the results it has produced up to the point of pausing.
If you’re experiencing high memory usage or errors that look like data loss with streaming queries, it’s almost always because the backpressure mechanism isn’t being fully utilized or is being bypassed. This can happen if you’re not using a truly reactive stream processing library, or if you’re collecting all results into a blocking collection before processing, or if your processing logic itself isn’t properly integrated into the reactive stream.
One subtle point is that RxSession.run() does not return a Publisher directly; it returns an RxResult. Calling .records() on that RxResult gives you a Publisher<Record>, which you can wrap with Flux.from(...) to get a Flux<Record>. If your processing logic after .records() is slow and requests records in bounded amounts, the backpressure propagates correctly. However, if you were to, for instance, call .collectList() immediately on the Flux<Record>, you’d be asking the driver to fetch all records into memory before you even start processing, defeating the purpose of streaming and backpressure.
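The difference in demand can be observed with Reactor alone. The sketch below is an illustration under stated assumptions, not driver code: Flux.range stands in for the Flux<Record> obtained from .records(), hide() is added only to switch off Reactor's operator fusion so that request signals are visible, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

public class StreamVersusCollect {

    /** Request sizes seen by the source when downstream demand is capped with limitRate. */
    public static List<Long> demandWithLimitRate(int total, int batch) {
        List<Long> requests = new ArrayList<>();
        Flux.range(1, total)              // stand-in for Flux.from(result.records())
            .hide()                       // disable operator fusion so requests are observable
            .doOnRequest(requests::add)   // record what the source is asked for
            .limitRate(batch)             // never ask for more than `batch` at once
            .blockLast();
        return requests;
    }

    /** Request sizes seen by the source when everything is collected into a list. */
    public static List<Long> demandWithCollectList(int total) {
        List<Long> requests = new ArrayList<>();
        Flux.range(1, total)
            .hide()
            .doOnRequest(requests::add)
            .collectList()                // asks for everything in a single request
            .block();
        return requests;
    }

    public static void main(String[] args) {
        System.out.println("limitRate(4): " + demandWithLimitRate(20, 4));
        System.out.println("collectList:  " + demandWithCollectList(20));
    }
}
```

In the collectList() case the source sees a single request for Long.MAX_VALUE elements, which is effectively "send me everything". With limitRate(4) the source is asked for at most four elements at a time, so a slow stage placed after it never has more than a small batch outstanding.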
The next logical step in understanding reactive Neo4j is exploring how to handle transactional integrity within a reactive stream, especially when multiple operations are involved.