1. Overview
In this tutorial, we’ll deep dive into Java reactive programming to solve an interesting problem of how to read Flux<DataBuffer>
into a single InputStream
.
2. Request Setup
As a first step to solving the problem of reading Flux<DataBuffer>
into a single InputStream
, we’ll use the Spring reactive WebClient for making a GET
request. Further, we can use one of the public API endpoints hosted by gorest.co.in for such testing scenarios:
|
|
Next, let’s define the getWebClient()
method for getting a new instance of the WebClient
class:
At this point, we’re ready to make a GET
request to the /public/v2/users
endpoint. However, we must get the response body as a Flux<DataBuffer>
object. So, let’s move on to the next section about BodyExtractors
to do precisely this.
3. BodyExtractors
and DataBufferUtils
We can use the toDataBuffers()
method of the BodyExtractors
class available in spring-webflux to extract the response body into Flux<DataBuffer>
.
Let’s go ahead and create body
as an instance of Flux<DataBuffer>
type:
Next, as we require to collect these streams of DataBuffer
into a single InputStream
, a good strategy to achieve this is by using PipedInputStream andPipedOutputStream.
Further, we intend to write to the PipedOutputStream
and eventually read from the PipedInputStream
. So, let’s see how we can create these two connected streams:
We must note that the default size is 1024
bytes. However, we expect that the collected result from the Flux<DataBuffer>
could exceed the default value. Therefore, we need to explicitly specify a larger value for the size, which in this case is 1024*10
.
Finally, we use the write()
utility method available in the DataBufferUtils
class for writing body
as a publisher to outputStream
:
|
|
We must note that we connected inputStream
to outputStream
at the time of declaration. So, we’re good to read from inputStream
. Let’s move on to the next section to see this in action.
4. Reading From the PipedInputStream
First, let’s defined a helper method, readContent()
, to read an InputStream
as a String
object:
|
|
Next, because it’s a typical practice to read the PipedInputStream
in a different thread, let’s create the readContentFromPipedInputStream()
method that internally spawns a new thread to read contents from the PipedInputStream
into a String
object by calling the readContent()
method:
|
|
At this stage, our code is ready to use for a simulation. Let’s see it in action:
As we’re dealing with an asynchronous system, we’re delaying the read by an arbitrary 3 secs before reading from the stream so that we’re able to see the complete response. Additionally, at the time of logging, we’re inserting a newline character to break the long output to multiple lines.
Finally, let’s verify the output generated by the code execution:
|
|
That’s it! It looks like we’ve got it all right.
5. Conclusion
In this article, we used the concept of piped streams and the utility methods available in the BodyExtractors
and DataBufferUtils
classes to read Flux<DataBuffer>
into a single InputStream
.
As always, the complete source code for the tutorial is available over on GitHub.
Reference https://www.baeldung.com/spring-reactive-read-flux-into-inputstream