Project Reactor lets you build reactive Java applications, but the real magic isn’t just asynchronous programming; it’s about declaratively managing streams of events over time, allowing your application to be more resilient and responsive under load.
Imagine a web server handling incoming HTTP requests. In a traditional, blocking model, each request might tie up a thread waiting for a database query or an external API call. If you have 100 concurrent requests, you might need 100 threads, which quickly consumes memory and CPU.
// Traditional blocking approach
public class BlockingController {
@GetMapping("/user/{id}")
public User getUser(@PathVariable Long id) {
// This call blocks until the database returns data
User user = database.findUserById(id);
// This call blocks until the external API returns data
Address address = addressService.getAddressForUser(user.getId());
user.setAddress(address);
return user;
}
}
With Project Reactor, you shift to a non-blocking, event-driven paradigm. Instead of threads waiting, you have event loops that efficiently manage many concurrent operations. When an operation is initiated (like a database query), the thread is freed up to do other work. When the operation completes, a callback is triggered, and Reactor handles the rest of the processing.
// Reactive approach with Project Reactor (using Spring WebFlux)
import reactor.core.publisher.Mono;
@RestController
public class ReactiveController {
@GetMapping("/user/{id}")
public Mono<User> getUser(@PathVariable Long id) {
// Initiates non-blocking calls and returns a Mono
return database.findUserByIdReactive(id) // Returns Mono<User>
.flatMap(user -> addressService.getAddressForUserReactive(user.getId()) // Returns Mono<Address>
.map(address -> {
user.setAddress(address);
return user;
}));
}
}
In this reactive example, Mono<User> represents a stream that will emit at most one User item. database.findUserByIdReactive(id) doesn’t block the current thread; it starts the query and returns immediately. The thread is then free to handle other requests. When the database operation completes, Reactor will schedule the flatMap operation to execute. Similarly, addressService.getAddressForUserReactive is also non-blocking.
The core building blocks in Reactor are Flux and Mono.
Flux<T>: Represents a stream of 0 to N items. Think of it like an asynchronousList. It can emit multiple items over time.Mono<T>: Represents a stream of 0 or 1 item. Think of it like an asynchronousOptionalor a single result.
These publishers emit events:
onNext(T value): An item from the stream.onComplete(): The stream has finished successfully.onError(Throwable error): An error occurred.
The key to Reactor’s power is its rich set of operators. These are methods you chain onto Flux and Mono instances to transform, filter, combine, and manage the streams of data. For instance, map transforms each item, filter discards items based on a condition, flatMap transforms an item into another asynchronous stream (crucial for chaining operations), and zip combines multiple streams.
Consider this scenario: you need to fetch user details, their orders, and their preferences, and then combine them into a single response.
Mono<User> userMono = userRepository.findById(userId);
Flux<Order> ordersFlux = orderRepository.findByUserId(userId);
Mono<Preferences> preferencesMono = preferencesRepository.getPreferences(userId);
Mono<UserDetailsDTO> userDetails = Mono.zip(
userMono,
ordersFlux.collectList(), // Collect all orders into a List
preferencesMono
).map(tuple -> {
User user = tuple.getT1();
List<Order> orders = tuple.getT2();
Preferences preferences = tuple.getT3();
return new UserDetailsDTO(user, orders, preferences);
});
// Then subscribe to userDetails to get the final DTO
userDetails.subscribe(
dto -> System.out.println("User Details: " + dto),
error -> System.err.println("Error fetching details: " + error)
);
Here, Mono.zip takes multiple publishers and waits for all of them to emit their single item (or complete, in the case of ordersFlux after collectList). It then bundles these results into a Tuple (e.g., Tuples.of(user, ordersList, preferences)), which we then map into our UserDetailsDTO.
A common pattern you’ll encounter is the need to handle potential nulls or missing data gracefully. Reactor’s defaultIfEmpty operator is perfect for this. If a Mono or Flux completes without emitting any items, defaultIfEmpty will emit a specified default value instead.
For example, if a user has no past orders, orderRepository.findByUserId(userId) might return an empty Flux. If you then call collectList() on this empty Flux, you get an empty List. However, if you wanted to ensure your UserDetailsDTO always has some list of orders, even if empty, you could do this:
Mono<User> userMono = userRepository.findById(userId);
Flux<Order> ordersFlux = orderRepository.findByUserId(userId)
.defaultIfEmpty(new Order("No orders found")); // Provide a dummy order if none exist
Mono<Preferences> preferencesMono = preferencesRepository.getPreferences(userId);
Mono<UserDetailsDTO> userDetails = Mono.zip(
userMono,
ordersFlux.collectList(),
preferencesMono
).map(tuple -> {
User user = tuple.getT1();
List<Order> orders = tuple.getT2();
Preferences preferences = tuple.getT3();
return new UserDetailsDTO(user, orders, preferences);
});
This ensures that tuple.getT2() will never be an empty list if the user exists; it will contain at least one element, even if it’s a placeholder.
The power of reactive programming with Reactor lies in its ability to handle backpressure. When a producer (e.g., a fast API) generates data faster than a consumer can process it, backpressure signals the producer to slow down. Reactor’s publishers and subscribers inherently support this flow control, preventing your application from being overwhelmed. This is managed through the request signals that flow from subscriber to publisher.
When you start writing reactive code, you’ll often hit a point where you have a Mono or Flux but don’t know what to do with it. The most fundamental operation is subscribe(). This is where you "activate" the stream and provide callbacks for onNext, onError, and onComplete. However, subscribe() is also where you might see the next common error if you’re not careful: java.lang.IllegalStateException: block is not allowed on a non-blocking thread. This happens when you try to use a blocking method (like Thread.sleep() or some legacy JDBC calls) within a reactive pipeline, defeating the purpose of non-blocking I/O.