9 min read

To implement a use-case, we need to use a well-known Spring module, Spring Web and Spring Web MVC. Our application will not use the new features of Spring 5, so it will run similarly on Spring Framework 4.x.

This article is an excerpt taken from the book Hands-On Reactive Programming in Spring 5 by Oleh Dokuka and Igor Lozynskyi. This book covers the difference between a reactive system and reactive programming, the basics of reactive programming in Spring 5 and much more.

In this article, you will learn how to bootstrap a Spring application, implement business logic, and much more.

To bootstrap our application, we may configure and download a Gradle project from the Spring Initializer website at start.spring.io. For now, we need to select the preferred Spring Boot version and dependency for the web (the actual dependency identifier in Gradle config will be org.springframework.boot:spring-boot-starter-web), as shown in the following screenshot:

Diagram 2.4 Web-based Spring Initializer simplifies the bootstrapping of a new Spring Boot application

Alternatively, we may generate a new Spring Boot project using cURL and the HTTP API of the Spring Boot Initializer site. The following command will effectively create and download the same empty project with all the desired dependencies:

curl https://start.spring.io/starter.zip \
  -d dependencies=web,actuator \
  -d type=gradle-project \
  -d bootVersion=2.0.2.RELEASE \
  -d groupId=com.example.rpws.chapters \
  -d artifactId=SpringBootAwesome \
  -o SpringBootAwesome.zip

Implementing business logic

We may now outline the design of our system in the following diagram:

Diagram 2.5 Events flow from a temperature sensor to a user

In this use case, the domain model will consist only of the Temperature class with the only double value inside. For simplicity purposes, it is also used as an event object, as shown in the following code:

final class Temperature {
   private final double value;
   // constructor & getter...
}

To simulate the sensor, let’s implement the TemperatureSensor class and decorate it with a @Component annotation to register the Spring bean, as follows:

@Component
public class TemperatureSensor {
   private final ApplicationEventPublisher publisher;              // (1)
   private final Random rnd = new Random();                        // (2)
   private final ScheduledExecutorService executor =               // (3)
           Executors.newSingleThreadScheduledExecutor();
public TemperatureSensor(ApplicationEventPublisher publisher) {
this.publisher = publisher;
}

@PostConstruct
public void startProcessing() { // (4)
this.executor.schedule(this::probe, 1, SECONDS);
}

private void probe() { // (5)
double temperature = 16 + rnd.nextGaussian() * 10;
publisher.publishEvent(new Temperature(temperature));

// schedule the next read after some random delay (0-5 seconds)
executor
.schedule(this::probe, rnd.nextInt(5000), MILLISECONDS); // (5.1)
}
}

So, our simulated temperature sensor only depends on the ApplicationEventPublisher class (1), provided by Spring Framework. This class makes it possible to publish events to the system. It is a requirement to have a random generator (2) to contrive temperatures with some random intervals. An event generation process happens in a separate ScheduledExecutorService (3), where each event’s generation schedules the next round of an event’s generation with a random delay (5.1). All that logic is defined in the probe() method (5).  In turn, the mentioned class has the startProcessing() method annotated with @PostConstruct (4), which is called by Spring Framework when the bean is ready and triggers the whole sequence of random temperature values.

Asynchronous HTTP with Spring Web MVC

The introduced in Servlet 3.0 asynchronous support expands the ability to process an HTTP request in non-container threads. Such a feature is pretty useful for long-running tasks. With those changes, in Spring Web MVC we can return not only a value of type T in @Controller but also a Callable<T> or a DeferredResult<T>. The Callable<T> may be run inside a non-container thread, but still, it would be a blocking call. In contrast, DeferredResult<T> allows an asynchronous response generation on a non-container thread by calling the setResult(T result) method so it could be used within the event-loop.

Starting from version 4.2, Spring Web MVC makes it possible to return ResponseBodyEmitter, which behaves similarly to DeferredResult, but can be used to send multiple objects, where each object is written separately with an instance of a message converter (defined by the HttpMessageConverter interface).

The SseEmitter extends ResponseBodyEmitter and makes it possible to send many outgoing messages for one incoming request in accordance with SSE’s protocol requirements. Alongside ResponseBodyEmitter and SseEmitter, Spring Web MVC also respects the StreamingResponseBody interface. When returned from @Controller, it allows us to send raw data (payload bytes) asynchronously. StreamingResponseBody may be very handy for streaming large files without blocking Servlet threads.

Exposing the SSE (Server Sent Events) endpoint

The next step requires adding the TemperatureController class with the @RestController annotation, which means that the component is used for HTTP communication, as shown in the following code:

@RestController
public class TemperatureController {
   private final Set<SseEmitter> clients =                          // (1)
      new CopyOnWriteArraySet<>();
@RequestMapping(
value = "/temperature-stream", // (2)
method = RequestMethod.GET)
public SseEmitter events(HttpServletRequest request) { // (3)
SseEmitter emitter = new SseEmitter(); // (4)
clients.add(emitter); // (5)

// Remove emitter from clients on error or disconnect
emitter.onTimeout(() -> clients.remove(emitter)); // (6)
emitter.onCompletion(() -> clients.remove(emitter)); // (7)

return emitter; // (8)
}
@Async // (9)
@EventListener // (10)
public void handleMessage(Temperature temperature) { // (11)
List<SseEmitter> deadEmitters = new ArrayList<>(); // (12)
clients.forEach(emitter -> { 
try {
emitter.send(temperature, MediaType.APPLICATION_JSON); // (13)
} catch (Exception ignore) {
deadEmitters.add(emitter); // (14)
}
});
clients.removeAll(deadEmitters); // (15)
}
}

Now, to understand the logic of the TemperatureController class, we need to describe the SseEmitter. Spring Web MVC provides that class with the sole purpose of sending SSE events. When a request-handling method returns the SseEmitter instance, the actual request processing continues until SseEnitter.complete(), an error, or a timeout occurs.

The TemperatureController provides one request handler (3) for the URI /temperature-stream (2) and returns the SseEmitter (8). In the case when a client requests that URI, we create and return the new SseEmitter instance (4) with its previous registration in the list of the active clients (5). Furthermore, the SseEmitter constructor may consume the timeout parameter.

For the clients‘ collection, we may use the CopyOnWriteArraySet class from the java.util.concurrent package (1). Such an implementation allows us to modify the list and iterate over it at the same time. When a web client opens a new SSE session, we add a new emitter to the clients‘ collection. The SseEmitter removes itself from the clients‘ list when it has finished processing or has reached timeout (6) (7).

Now, having a communication channel with clients means that we need to be able to receive events about temperature changes. For that purpose, our class has a handleMessage() method (11). It is decorated with the @EventListener annotation (10) in order to receive events from Spring. This framework will invoke the handleMessage() method only when receiving Temperature events, as this type of method’s argument is known as temperature. The @Async annotation (9) marks a method as a candidate for the asynchronous execution, so it is invoked in the manually configured thread pool. The handleMessage() method receives a new temperature event and asynchronously sends it to all clients in JSON format in parallel for each event (13). Also, when sending to individual emitters, we track all failing ones (14) and remove them from the list of the active clients (15). Such an approach makes it possible to spot clients that are not operational anymore. Unfortunately, SseEmitter does not provide any callback for handling errors, and can be done by handling errors thrown by the send() method only.

Configuring asynchronous support

To run everything, we need an entry point for our application with the following customized methods:

@EnableAsync                                                         // (1)
@SpringBootApplication                                               // (2)
public class Application implements AsyncConfigurer {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}

@Override
public Executor getAsyncExecutor() { // (3)
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();// (4)
executor.setCorePoolSize(2);
executor.setMaxPoolSize(100);
executor.setQueueCapacity(5); // (5) 
executor.initialize();
return executor;
}

@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler(){
return new SimpleAsyncUncaughtExceptionHandler(); // (6)
}
}

As we can see, the example is a Spring Boot application (2), with an asynchronous execution enabled by the @EnableAsync annotation (1). Here, we may configure an exception handler for exceptions thrown from the asynchronous execution (6). That is also where we prepare Executor for asynchronous processing. In our case, we use ThreadPoolTaskExecutor with two core threads that may be increased to up to one hundred threads. It is important to note that without a properly configured queue capacity (5), the thread pool is not able to grow. That is because the SynchronousQueue would be used instead, limiting concurrency.

Building a UI with SSE support

The last thing that we need in order to complete our use case is an HTML page with some JavaScript code to communicate with the server. For the sake of conciseness, we will strip all HTML tags and leave only the minimum that is required to achieve a result, as follows:

<body>
<ul id="events"></ul>
<script type="application/javascript">
function add(message) {
    const el = document.createElement("li");
    el.innerHTML = message;
    document.getElementById("events").appendChild(el);
}
var eventSource = new EventSource("/temperature-stream"); // (1)
eventSource.onmessage = e => { // (2)
const t = JSON.parse(e.data);
const fixed = Number(t.value).toFixed(2);
add('Temperature: ' + fixed + ' C');
}
eventSource.onopen = e => add('Connection opened'); // (3)
eventSource.onerror = e => add('Connection closed'); //
</script>
</body>

Here, we are using the EventSource object pointed at /temperature-stream (1). This handles incoming messages by invoking the onmessage() function (2), error handling, and reaction to the stream opening, which are done in the same fashion (3). We should save this page as index.html and put it in the src/main/resources/static/ folder of our project. By default, Spring Web MVC serves the content of the folder through HTTP. Such behavior could be changed by providing a configuration that extends the WebMvcConfigurerAdapter class.

Verifying application functionality

After rebuilding and completing our application’s startup, we should be able to access the mentioned web page in a browser at the following address: http://localhost:8080 (Spring Web MVC uses port 8080 for the web server as the default one. However, this can be changed in the application.properties file using the configuration line server.port=9090). After a few seconds, we may see the following output:

Connection opened
Temperature: 14.71 C
Temperature: 9.67 C
Temperature: 19.02 C
Connection closed
Connection opened
Temperature: 18.01 C
Temperature: 16.17 C

As we can see, our web page reactively receives events, preserving both client and server resources. It also supports auto-reconnect in the case of network issues or timeouts. As the current solution is not exclusive to JavaScript, we may connect with other clients for example, curl. By running the next command in a terminal, we receive the following stream of raw, but not formatted, events:

> curl http://localhost:8080/temperature-stream
data:{"value":22.33210856124129}
data:{"value":13.83133638119636}

In this article, we learned how to bootstrap a Spring application, implement business logic, and much more. To know more about the difference between a reactive system and reactive programming, check out the book Hands-On Reactive Programming in Spring 5 by Oleh Dokuka and Igor Lozynskyi.

Read Next

Netflix adopts Spring Boot as its core Java framework

Implementing Dependency Injection in Spring [Tutorial]

How to recover deleted data from an Android device [Tutorial]