1. Objective:
The objective of this tutorial is to understand what Flux and Mono are in Spring WebFlux and to look at the various ways to create them.
2. Background:
Reactive Programming
Reactor is a Java library built around the Reactive Streams specification, bringing the paradigm of Reactive Programming to the JVM. The basics of Reactive Programming and Project Reactor are covered in more detail in my other blog.
Spring WebFlux
Spring WebFlux is the reactive alternative to the Spring MVC module. It is used to create fully asynchronous, non-blocking applications built on the event-loop execution model.
If you want to develop a web application or REST web service on the non-blocking reactive model, Spring WebFlux is worth a look.
Spring WebFlux is supported on Tomcat, Jetty, Servlet 3.1+ containers, as well as on non-Servlet runtimes such as Netty and Undertow.
Spring WebFlux is built on Project Reactor, an implementation of the Reactive Streams specification. Reactor provides two types:
Mono: implements Publisher and emits 0 or 1 element.
Mono<String> mono = Mono.just("Alex");
Mono<String> emptyMono = Mono.empty();
Flux: implements Publisher and emits 0 to N elements.
Flux<String> flux = Flux.just("A", "B", "C");
Flux<String> fluxFromArray = Flux.fromArray(new String[]{"A", "B", "C"});
Flux<String> fluxFromIterable = Flux.fromIterable(Arrays.asList("A", "B", "C"));
// Nothing is emitted until subscribe() is called
flux.subscribe();
3. Overview of Flux:
A Flux<T> is a Reactive Streams Publisher, augmented with a lot of operators that can be used to generate, transform, orchestrate Flux sequences.
It can emit 0 to n <T> elements (onNext event) then either completes or errors (onComplete and onError terminal events).
If no terminal event is triggered, the Flux is infinite.
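The signal sequence described above (zero or more onNext, then at most one terminal onComplete or onError) can be observed with the JDK alone. The sketch below is not Reactor code: it assumes Java 9 or later and uses java.util.concurrent.Flow, the JDK's copy of the Reactive Streams interfaces, with SubmissionPublisher standing in for a Flux. SignalRecorder and run are hypothetical names introduced only for this illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class (not part of Reactor): records each signal it receives.
class SignalRecorder implements Flow.Subscriber<String> {
    private final List<String> signals = Collections.synchronizedList(new ArrayList<>());
    private final CountDownLatch terminated = new CountDownLatch(1);

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        subscription.request(Long.MAX_VALUE); // ask for everything, i.e. no backpressure
    }

    @Override
    public void onNext(String item) {
        signals.add("onNext(" + item + ")");
    }

    @Override
    public void onError(Throwable error) {
        signals.add("onError");
        terminated.countDown();
    }

    @Override
    public void onComplete() {
        signals.add("onComplete");
        terminated.countDown();
    }

    // Publishes A, B, C, then terminates normally or with an error,
    // and returns the signals in the order the subscriber saw them.
    static List<String> run(boolean fail) {
        SignalRecorder recorder = new SignalRecorder();
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        publisher.subscribe(recorder);
        publisher.submit("A");
        publisher.submit("B");
        publisher.submit("C");
        if (fail) {
            // onError is terminal; items still buffered may be dropped
            publisher.closeExceptionally(new IllegalStateException("boom"));
        } else {
            publisher.close(); // onComplete follows the buffered items
        }
        try {
            if (!recorder.terminated.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("no terminal signal received");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(recorder.signals);
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // [onNext(A), onNext(B), onNext(C), onComplete]
        System.out.println(run(true));  // ends with onError, never with onComplete
    }
}
```

In both runs exactly one terminal signal arrives and nothing follows it, which is the same contract a Flux subscriber relies on.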
Sample usage of Flux with other operators:
Flux.fromIterable(getSomeLongList())
    .delayElements(Duration.ofMillis(100))
    .doOnNext(serviceA::someObserver)
    .map(d -> d * 2)
    .take(3)
    // onErrorResumeWith() was renamed to onErrorResume() in Reactor 3.1
    .onErrorResume(errorHandler::fallback)
    .doAfterTerminate(serviceM::incrementTerminate)
    .subscribe(System.out::println);
4. Most common ways to create a Flux:
There are several ways to create a Flux. The below code snippet presents some of the most common:
1. Creating Flux using just():
Flux<String> fooBarFluxFromStringValues() {
    return Flux.just("foo", "bar");
}
Flux<Integer> fooBarFluxFromIntegerValues() {
    // Pass the integers as separate arguments, not as one String
    return Flux.just(1, 2, 3);
}
2. Creating Flux from List or Iterable:
Flux<String> fooBarFluxFromList() {
    return Flux.fromIterable(Arrays.asList("foo", "bar"));
}
3. Create a Flux from range:
Flux<Integer> fooFromRange() {
    // Emits 5 integers starting at 1: 1, 2, 3, 4, 5
    return Flux.range(1, 5);
}
4. Create a Flux based on time:
The following creates a Flux that emits the sequence 0 through 9, with emissions spaced 100 ms apart. It uses the Flux.interval() factory, which on its own emits indefinitely, and limits it to 10 items with take():
Flux<Long> counter() {
    return Flux.interval(Duration.ofMillis(100))
        .take(10);
}
5. Create a Flux from another Publisher (Flux or Mono):
Flux<String> fluxFromAnother() {
    // Reuses the Flux created in fooBarFluxFromList() above
    return Flux.from(fooBarFluxFromList());
}
6. Creating a Flux from Error:
Flux<String> errorFlux() {
    return Flux.error(new IllegalStateException());
}
5. Overview of Mono
A Mono<T> is a Reactive Streams Publisher, also augmented with a lot of operators that can be used to generate, transform, orchestrate Mono sequences.
It is a specialization of Flux that can emit at most 1 <T> element: a Mono is either valued (complete with element), empty (complete without element) or failed (error).
A Mono<Void> can be used in cases where only the completion signal is interesting (the Reactive Streams equivalent of a Runnable task completing).
Like for Flux, the operators can be used to define an asynchronous pipeline which will be materialized anew for each Subscription.
Sample usage of Mono with other operators:
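Mono.just(1)
    .map(integer -> "foo" + integer)
    // or() expects a Mono of the same element type, so the delay's Long tick is mapped to a String
    .or(Mono.delay(Duration.ofMillis(100)).map(tick -> "timeout"))
    .subscribe(System.out::println);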
6. Most common ways to create a Mono
The factory methods for Mono differ from those of Flux. Even just() differs: Mono.just(T data) accepts a single value, whereas Flux.just(T... data) accepts several. The below code snippet presents some of the most common:
1. Creating Mono using just():
Mono<String> fooMono() {
    return Mono.just("foo");
}
2. Creating an empty Mono:
Mono<String> emptyMono() {
    return Mono.empty();
}
3. Creating a Mono using fromCallable(), with a lambda or a Java 8 method reference:
Mono<String> fromCallable() {
    return Mono.fromCallable(() -> "Hello World!");
}
Using a Java 8 method reference (the :: operator):
Mono<String> fromCallableMethodReference() {
    return Mono.fromCallable(UserService::fetchAnyUser);
}
4. Creating a Mono from Supplier:
Mono<Double> fromSupplier() {
    Random rnd = new Random();
    return Mono.fromSupplier(rnd::nextDouble);
}
5. Creating a Mono from a Future:
Mono<String> monoFromFuture() {
    CompletableFuture<String> helloWorldFuture = api.getHelloWorldAsync();
    return Mono.fromFuture(helloWorldFuture);
}
6. Creating a Mono from another one:
Mono<Double> fromAnother() {
    // Reuses the Mono created in fromSupplier() above
    return Mono.from(fromSupplier());
}
Mono<Integer> fromAnotherFlux() {
    // Mono.from() takes the first element (1) and cancels the source
    return Mono.from(Flux.range(1, 10));
}
As you may have noticed, because a Mono is a stream of 0 or 1 element, it is a natural fit for wrapping Futures, Suppliers, Callables or even Runnables, since a Java method returns at most one value.
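One caveat is worth keeping in mind when wrapping these types. A Supplier or Callable does nothing until it is invoked, but a CompletableFuture returned by a call such as api.getHelloWorldAsync() has usually started its work by the time it is handed to Mono.fromFuture(), whether or not anyone subscribes. The JDK-only sketch below illustrates that difference; EagerVersusLazy and runsBeforeUse are hypothetical names used only here. If lazy behaviour is needed, one option is to defer creating the future itself, for example with Mono.defer(() -> Mono.fromFuture(api.getHelloWorldAsync())).

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical demo class: contrasts an eager CompletableFuture with a lazy Supplier.
class EagerVersusLazy {
    // Returns {futureRuns, supplierRuns} observed without ever calling supplier.get().
    static int[] runsBeforeUse() {
        AtomicInteger futureRuns = new AtomicInteger();
        AtomicInteger supplierRuns = new AtomicInteger();

        // supplyAsync schedules the task immediately, whether or not anyone consumes the result
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            futureRuns.incrementAndGet();
            return "Hello World!";
        });
        // A Supplier is only a recipe: nothing runs until get() is called
        Supplier<String> supplier = () -> {
            supplierRuns.incrementAndGet();
            return "Hello World!";
        };

        future.join(); // only waits for the already-scheduled task; it does not start it
        return new int[] {futureRuns.get(), supplierRuns.get()};
    }

    public static void main(String[] args) {
        int[] runs = runsBeforeUse();
        // prints "future ran 1 time(s), supplier ran 0 time(s)"
        System.out.println("future ran " + runs[0] + " time(s), supplier ran " + runs[1] + " time(s)");
    }
}
```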
7. Creating a Mono from Error:
Mono<String> errorMono() {
    return Mono.error(new IllegalStateException());
}
7. Important points to note
The following points are important to keep in mind when using Mono and Flux.
1. Nothing happens until you subscribe to a Flux or Mono.
2. Avoid synchronous blocking calls in reactive code: Mono.block(), Flux.blockFirst(), Flux.blockLast() (and their Duration-timeout overloads), Flux.toStream() and Flux.toIterable().
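The first point can be illustrated without Reactor on the classpath. The sketch below assumes Java 9 or later and builds a deliberately simplified single-value publisher on the JDK's java.util.concurrent.Flow interfaces, loosely mirroring what Mono.fromCallable() does conceptually. It is not how Reactor implements Mono, and LazyValue, LazinessDemo and Printer are hypothetical names introduced for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified single-value publisher. This is NOT how Reactor
// implements Mono; it skips most Reactive Streams rules and only shows
// that the work is deferred until a subscriber requests data.
class LazyValue<T> implements Flow.Publisher<T> {
    private final Callable<T> source;

    LazyValue(Callable<T> source) {
        this.source = source;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        AtomicBoolean done = new AtomicBoolean();
        subscriber.onSubscribe(new Flow.Subscription() {
            @Override
            public void request(long n) {
                if (n <= 0 || !done.compareAndSet(false, true)) {
                    return; // simplified: ignore invalid or repeated requests
                }
                T value;
                try {
                    value = source.call(); // the work runs here, once per subscription
                } catch (Exception e) {
                    subscriber.onError(e);
                    return;
                }
                subscriber.onNext(value);
                subscriber.onComplete();
            }

            @Override
            public void cancel() {
                done.set(true);
            }
        });
    }
}

class LazinessDemo {
    // Returns the call count before subscribing, after one subscription,
    // and after a second subscription.
    static List<Integer> callCounts() {
        AtomicInteger calls = new AtomicInteger();
        LazyValue<String> lazy = new LazyValue<>(() -> {
            calls.incrementAndGet();
            return "Hello World!";
        });
        List<Integer> counts = new ArrayList<>();
        counts.add(calls.get());       // assembling the publisher ran nothing
        lazy.subscribe(new Printer());
        counts.add(calls.get());       // the first subscription ran the callable
        lazy.subscribe(new Printer());
        counts.add(calls.get());       // each new subscription runs it again
        return counts;
    }

    // Minimal subscriber that requests one item and prints it.
    static class Printer implements Flow.Subscriber<String> {
        @Override public void onSubscribe(Flow.Subscription s) { s.request(1); }
        @Override public void onNext(String item) { System.out.println(item); }
        @Override public void onError(Throwable error) { error.printStackTrace(); }
        @Override public void onComplete() { }
    }

    public static void main(String[] args) {
        System.out.println(callCounts()); // [0, 1, 2]
    }
}
```

The counts show two things at once: building the pipeline executes nothing, and every subscription re-runs the source, which is why a pipeline is said to be materialized anew for each Subscription.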
8. Conclusion:
In this tutorial, we covered what Flux and Mono are in Spring WebFlux and the various ways to create them. In the next tutorial, we will go through the various operators available.