Implementing a non-blocking cross-service communication with WebClient[Tutorial]

9 min read

The  WebClient is the reactive replacement for the old RestTemplate.  However, in WebClient, we have a functional API that fits better with the reactive approach and offers built-in mapping to Project Reactor types such as Flux or Mono.

This article is an excerpt taken from the book Hands-On Reactive Programming in Spring 5 written by Oleh Dokuka and Igor Lozynskyi. This book covers the difference between a reactive system and reactive programming, the basics of reactive programming in Spring 5 and much more.

In this article, you will understand the basics of non-blocking cross-service communication with WebClient, reactive WebSocket API, server-side WebSocket API, and much more.

WebClient.create("http://localhost/api")                           // (1)
         .get()                                                    // (2)
         .uri("/users/{id}", userId)                               // (3)
         .retrieve()                                               // (4)
         .bodyToMono(User.class)                                   // (5)
         .map(...)                                                 // (6)
         .subscribe();                                             //

In the preceding example, we create a WebClient instance using a factory method called create, shown at point 1. Here, the create method allows us to specify the base URI, which is used internally for all future HTTP calls. Then, in order to start building a call to a remote server, we may execute one of the WebClient methods that sounds like an HTTP method. In the previous example, we used WebClient#get, shown at point (2). Once we call the WebClient#get method, we operate on the request builder instance and can specify the relative path in the uri method, shown at point (3). In addition to the relative path, we can specify headers, cookies, and a request body. However, for simplicity, we have omitted those settings in this case and moved on to composing the request by calling the retrieve or exchange methods. In this example, we use the retrieve method, shown at point (4). This option is useful when we are only interested in retrieving the body and performing further processing. Once the request is set up, we may use one of the methods that help us with the conversion of the response body. Here, we use the bodyToMono method, which converts the incoming payload of the User to Mono, shown at point (5). Finally, we can build the processing flow of the incoming response using the Reactor API, and execute the remote call by calling the subscribe method.

WebClient follows the behavior described in the Reactive Streams specification. This means that only by calling the subscribe method will WebClient wire the connection and start sending the data to the remote server.

Even though, in most cases, the most common response processing is body processing, there are some cases where we need to process the response status, headers, or cookies. For example, let’s build a call to our password checking service and process the response status in a custom way using the WebClient API:

class DefaultPasswordVerificationService                           // (1)
   implements PasswordVerificationService {                        //
final WebClient webClient; // (2)
//
public DefaultPasswordVerificationService( // 
WebClient.Builder webClientBuilder //
) { //
this.webClient = webClientBuilder // (2.1)
.baseUrl("http://localhost:8080") // 
.build(); //
} //

@Override // (3)
public Mono check(String raw, String encoded) { //
return webClient //
.post() // (3.1)
.uri("/check") //
.body(BodyInserters.fromPublisher( // (3.2)
Mono.just(new PasswordDTO(raw, encoded)), //
PasswordDTO.class //
)) //
.exchange() // (3.3)
.flatMap(response -> { // (3.4)
if (response.statusCode().is2xxSuccessful()) { // (3.5)
return Mono.empty(); //
} //
else if(resposne.statusCode() == EXPECTATION_FAILD) { //
return Mono.error( // (3.6)
new BadCredentialsException(...) //
); //
} //
return Mono.error(new IllegalStateException()); //
}); //
} //
} //

The following numbered list describes the preceding code sample:

  1. This is the implementation of the PasswordVerificationService interface.
  2. This is the initialization of the WebClient instance. It is important to note that we use a WebClient instance per class here, so we do not have to initialize a new one on each execution of the check method. Such a technique reduces the need to initialize a new instance of WebClient and decreases the method’s execution time. However, the default implementation of WebClient uses the Reactor-Netty HttpClient, which in default configurations shares a common pool of resources among all the HttpClient instances. Hence, the creation of a new HttpClient instance does not cost that much. Once the constructor of DefaultPasswordVerificationService is called, we start initializing webClient and use a fluent builder, shown at point (2.1), in order to set up the client.
  3. This is the implementation of the check method. Here, we use the webClient instance in order to execute a post request, shown at point (3.1). In addition, we send the body, using the body method, and prepare to insert it using the BodyInserters#fromPublisher factory method, shown in (3.2). We then execute the exchange method at point (3.3), which returns Mono. We may, therefore, process the response using the flatMap operator, shown in (3.4). If the password is verified successfully, as shown at point (3.5), the check method returns Mono.empty. Alternatively, in the case of an EXPECTATION_FAILED(417) status code, we may return the Mono of BadCredentialsExeception, as shown at point (3.6).

As we can see from the previous example, in a case where it is necessary to process the status code, headers, cookies, and other internals of the common HTTP response, the most appropriate method is the exchange method, which returns ClientResponse.

As mentioned, DefaultWebClient uses the Reactor-Netty HttpClient in order to provide asynchronous and non-blocking interaction with the remote server. However, DefaultWebClient is designed to be able to change the underlying HTTP client easily. For that purpose, there is a low-level reactive abstraction around the HTTP connection, which is called org.springframework.http.client.reactive.ClientHttpConnector. By default, DefaultWebClient is preconfigured to use ReactorClientHttpConnector, which is an implementation of the ClientHttpConnector interface. Starting from Spring WebFlux 5.1, there is a JettyClientHttpConnector implementation, which uses the reactive HttpClient from Jetty. In order to change the underlying HTTP client engine, we may use the WebClient.Builder#clientConnector method and pass the desired instance, which might be either a custom implementation or the existing one.

In addition to the useful abstract layer, ClientHttpConnector may be used in a raw format. For example, it may be used for downloading large files, on-the-fly processing, or just simple byte scanning. We will not go into details about ClientHttpConnector; we will leave this for curious readers to look into themselves.

Reactive WebSocket API

We have now covered most of the new features of the new WebFlux module. However, one of the crucial parts of the modern web is a streaming interaction model, where both the client and server can stream messages to each other. In this section, we will look at one of the most well-known duplex protocols for duplex client-server communication, called WebSocket.

Despite the fact that communication over the WebSocket protocol was introduced in the Spring Framework in early 2013 and designed for asynchronous message sending, the actual implementation still has some blocking operations. For instance, both writing data to I/O or reading data from I/O are still blocking operations and therefore both impact on the application’s performance. Therefore, the WebFlux module has introduced an improved version of the infrastructure for WebSocket.

WebFlux offers both client and server infrastructure. We are going to start by analyzing the server-side WebSocket and will then cover the client-side possibilities.

Server-side WebSocket API

WebFlux offers WebSocketHandler as the central interface for handling WebSocket connections. This interface has a method called handle, which accepts WebSocketSession. The WebSocketSession class represents a successful handshake between the client and server and provides access to information, including information about the handshake, session attributes, and the incoming stream of data. In order to learn how to deal with this information, let’s consider the following example of responding to the sender with echo messages:

class EchoWebSocketHandler implements WebSocketHandler {           // (1)
   @Override                                                       // 
   public Mono handle(WebSocketSession session) {            // (2)
      return session                                               // (3)
         .receive()                                                // (4)
         .map(WebSocketMessage::getPayloadAsText)                  // (5)
         .map(tm -> "Echo: " + tm)                                 // (6)
         .map(session::textMessage)                                // (7)
         .as(session::send);                                       // (8)
    }                                                              //
}

As we can see from the previous example, the new WebSocket API is built on top of the reactive types from Project Reactor. Here, at point (1), we provide an implementation of the WebSocketHandler interface and override the handle method at point (2). Then, we use the WebSocketSession#receive method at point (3) in order to build the processing flow of the incoming WebSocketMessage using the Flux API. WebSocketMessage is a wrapper around DataBuffer and provides additional functionalities, such as translating the payload represented in bytes to text in point (5). Once the incoming message is extracted, we prepend to that text the "Echo: " suffix shown at point (6), wrap the new text message in the WebSocketMessage, and send it back to the client using the WebSocketSession#send method. Here, the send method accepts Publisher and returns Mono as the result. Therefore, using the as operator from the Reactor API, we may treat Flux as Mono and use session::send as a transformation function.

Apart from the WebSocketHandler interface implementation, setting up the server-side WebSocket API requires configuring additional HandlerMapping and WebSocketHandlerAdapter instances. Consider the following code as an example of such a configuration:

@Configuration                                                     // (1)
public class WebSocketConfiguration {                              //
@Bean // (2)
public HandlerMapping handlerMapping() { //
SimpleUrlHandlerMapping mapping = //
new SimpleUrlHandlerMapping(); // (2.1)
mapping.setUrlMap(Collections.singletonMap( // (2.2)
"/ws/echo", //
new EchoWebSocketHandler() // 
)); //
mapping.setOrder(-1); // (2.3)
return mapping; //
} //

@Bean // (3)
public HandlerAdapter handlerAdapter() { //
return new WebSocketHandlerAdapter(); //
} //
}

The preceding example can be described as follows:

  1. This is the class that is annotated with @Configuration.
  2. Here, we have the declaration and setup of the HandlerMapping bean. At point (2.1), we create SimpleUrlHandlerMapping, which allows setup path-based mapping, shown at point (2.2), to WebSocketHandler. In order to allow SimpleUrlHandlerMapping to be handled prior to other HandlerMapping instances, it should be a higher priority.
  3. This is the declaration of the HandlerAdapter bean, which is WebSocketHandlerAdapter. Here, WebSocketHandlerAdapter plays the most important role, since it upgrades the HTTP connection to the WebSocket one and then calls the WebSocketHandler#handle method.

Client-side WebSocket API

Unlike the WebSocket module (which is based on WebMVC), WebFlux provides us with client-side support too. In order to send a WebSocket connection request, we have the WebSocketClient class. WebSocketClient has two central methods to execute WebSocket connections, as shown in the following code sample:

public interface WebSocketClient {
   Mono execute(
      URI url,
      WebSocketHandler handler
   );
   Mono execute(
      URI url,
      HttpHeaders headers, 
      WebSocketHandler handler
   );
}

As we can see, WebSocketClient uses the same WebSockeHandler interface in order to process messages from the server and send messages back. There are a few WebSocketClient implementations that are related to the server engine, such as the TomcatWebSocketClient implementation or the JettyWebSocketClient implementation. In the following example, we will look at ReactorNettyWebSocketClient:

WebSocketClient client = new ReactorNettyWebSocketClient();

client.execute(
   URI.create("http://localhost:8080/ws/echo"),
   session -> Flux
      .interval(Duration.ofMillis(100))
      .map(String::valueOf)
      .map(session::textMessage)
      .as(session::send)
);

The preceding example shows how we can use ReactorNettyWebSocketClient to wire a WebSocket connection and start sending periodic messages to the server.

To summarize, we learned the basics of non-blocking cross-service communication with WebClient, reactive WebSocket API, server-side WebSocket API, and much more.

To know more about the reactive system and reactive programming, check out the book, Hands-On Reactive Programming in Spring 5 written by Oleh Dokuka and Igor Lozynskyi. 

Read Next

Getting started with React Hooks by building a counter with useState and useEffect

Implementing Dependency Injection in Swift [Tutorial]

Reactive programming in Swift with RxSwift and RxCocoa [Tutorial]

Share this post

Popular