To update documents in a MongoDB collection, we often use update requests. When the volume of data is large, this can lead to performance issues and excessive consumption of hardware resources.
We will implement a solution that efficiently enriches and updates a large amount of data using Spring Data MongoDB Reactive.
Before reading further, if you are not familiar with the Spring reactive stack and MongoDB, I suggest checking the resources section.
1. EIP content enricher
The Content Enricher Enterprise Integration Pattern (EIP) appends information from an external source to an existing message. It uses information inside the incoming message to perform the enrichment operation.
We will implement a simplified version of the EIP:
- Input message: represented by a MongoDB document.
- Enricher: our application.
- Resource: call to a RESTful API.
- Output message: we will keep only the enriched document.
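The four roles above can be sketched in plain Java, independently of MongoDB and Spring. This is only an illustration: the class name ContentEnricherSketch, the enrich method and the in-memory lookup that stands in for the REST call are all hypothetical and are not part of the tutorial's code.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

/** Illustrative Content Enricher; every name here is hypothetical. */
final class ContentEnricherSketch {

    /**
     * Returns a copy of the input message with the looked-up resource
     * stored under enrichingKey. The lookup receives the input message,
     * so it can use the message's own fields to find the resource.
     */
    static Map<String, Object> enrich(Map<String, Object> message,
                                      String enrichingKey,
                                      Function<Map<String, Object>, Object> resourceLookup) {
        Map<String, Object> enriched = new LinkedHashMap<>(message);
        enriched.put(enrichingKey, resourceLookup.apply(message));
        return enriched;
    }

    public static void main(String[] args) {
        Map<String, Object> address = new LinkedHashMap<>();
        address.put("name", "Lille");

        Map<String, Object> product = new LinkedHashMap<>();
        product.put("provider", "STARLINK");

        // Stand-in for the REST call: always returns the same product.
        Map<String, Object> result = enrich(address, "product", message -> product);
        System.out.println(result); // prints {name=Lille, product={provider=STARLINK}}
    }
}
```

The sketch copies the input instead of mutating it, which is a design choice of the example; the tutorial's service updates the document in place before writing it back.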
1.1. Integration flow
The application will read the address documents, add the product and save the enriched documents to the MongoDB database.
2. Project setup
2.1. Requirements
- Java 1.8+
- Maven 3+
- Docker Compose
- MongoDB Database Tools
2.2. Generation
We generate the project skeleton from Spring Initializr.
2.3. Structure
|
.
│   .gitignore
│   docker-compose.yml
│   pom.xml
│   README.adoc
├───data
│   ├───mongodb
│   │       address.ndjson
│   └───product
│           db.json
└───src
    ├───main
    │   ├───java
    │   │   └───com
    │   │       └───maoudia
    │   │           └───tutorial
    │   │                   Application.java
    │   │                   AppProperties.java
    │   │                   CollectionService.java
    │   │                   NetworkConfig.java
    │   └───resources
    │           application.yml
    └───test
        └───java
            └───com
                └───maoudia
                    └───tutorial
                            CollectionServiceTest.java
|
2.4. Containers
Download the data directory to the root of the project.
We use docker-compose to create the containers needed for this tutorial.
docker-compose.yml
|
services:
  mongodb: #1
    container_name: maoudia-mongodb
    image: mongo:5.0.8
    environment:
      - MONGO_INITDB_DATABASE=test
      - MONGO_INITDB_ROOT_USERNAME=admin
      - MONGO_INITDB_ROOT_PASSWORD=password
    networks:
      - mongodb-network
    ports:
      - 15015:27017
    volumes:
      - ./data/mongodb:/data/mongodb

  mongo-express: #2
    container_name: maoudia-mongo-express
    image: mongo-express:0.54.0
    depends_on:
      - mongodb
    networks:
      - mongodb-network
    environment:
      - ME_CONFIG_MONGODB_SERVER=maoudia-mongodb
      - ME_CONFIG_MONGODB_ADMINUSERNAME=admin
      - ME_CONFIG_MONGODB_ADMINPASSWORD=password
    ports:
      - 1515:8081
    volumes:
      - ./data/mongodb:/data/mongodb

  product-api: #3
    container_name: maoudia-product-api
    image: clue/json-server:latest
    ports:
      - 1519:80
    volumes:
      - ./data/product/db.json:/data/db.json

networks:
  mongodb-network:
    driver: bridge
|
#1: MongoDB initialized with the test database.
#2: MongoExpress is a MongoDB administration interface.
#3: Product API, which is configured from the db.json file.
We start up the services, for example with docker-compose up -d.
2.5. Data initialization
We use a JSON document from the French address database.
Address
|
{
  "id": "59350",
  "type": "municipality",
  "name": "Lille",
  "postcode": [
    "59000",
    "59800",
    "59260",
    "59777",
    "59160"
  ],
  "citycode": "59350",
  "x": 703219.96,
  "y": 7059335.72,
  "lon": 3.045433,
  "lat": 50.630992,
  "population": 234475,
  "city": "Lille",
  "context": "59, Nord, Hauts-de-France",
  "importance": 0.56333
}
|
Import the address collection:
|
mongoimport --uri "mongodb://admin:password@localhost:15015" --authenticationDatabase=admin --db test --collection address ./data/mongodb/address.ndjson
|
Or:
We use MongoExpress, which is available at http://localhost:1515.
Product represents a satellite internet offer.
Product
|
{
  "id": 1,
  "available": true,
  "company": "SPACEX",
  "provider": "STARLINK",
  "type": "SATELLITE"
}
|
The Product API is available at http://localhost:1519.
3. Application
3.1. Configuration
We rename application.properties to application.yml.
application.yml
|
app:
  buffer-max-size: 500
  bulk-size: 100
  collection-name: address
  enriching-key: product
  enriching-uri: http://localhost:1519/products/1

spring:
  main:
    web-application-type: none
  data:
    mongodb:
      database: test
      uri: mongodb://admin:password@localhost:15015
---
spring.config.activate.on-profile: dev
logging:
  level:
    org.mongodb.driver: debug
---
spring.config.activate.on-profile: test
app:
  bulk-size: 2
|
We declare a class which contains application configuration properties.
AppProperties.java
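|
import org.springframework.boot.context.properties.ConfigurationProperties;

// Binds the app.* keys of application.yml, for example app.bulk-size to bulkSize.
@ConfigurationProperties(prefix = "app")
public class AppProperties {

    private int bulkSize;
    private int bufferMaxSize;
    private String collectionName;
    private String enrichingKey;
    private String enrichingUri;

    // Getter and Setter are omitted
}
|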
We create a @Bean for Spring's non-blocking HTTP client.
NetworkConfig.java
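|
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.client.WebClient;

@Configuration
public class NetworkConfig {

    // Shared non-blocking HTTP client used to call the Product API.
    @Bean
    public WebClient client() {
        return WebClient.create();
    }

}
|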
3.2. Implementation
We create a @Service which contains the application business logic.
CollectionService.java
|
import com.mongodb.bulk.BulkWriteResult;
import org.bson.Document;
import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;

@Service
public class CollectionService {

    private final AppProperties properties;
    private final ReactiveMongoTemplate template;
    private final WebClient client;

    public CollectionService(AppProperties properties,
                             ReactiveMongoTemplate template,
                             WebClient client) {
        this.properties = properties;
        this.template = template;
        this.client = client;
    }

    public Flux<BulkWriteResult> enrichAll(String collectionName, String enrichingKey, String enrichingUri) {
        return template.findAll(Document.class, collectionName) // #1
                .onBackpressureBuffer(properties.getBufferMaxSize()) // #2
                .flatMap(document -> enrich(document, enrichingKey, enrichingUri)) // #3
                .map(CollectionService::toReplaceOneModel) // #4
                .window(properties.getBulkSize()) // #5
                .flatMap(replaceOneModelFlux -> bulkWrite(replaceOneModelFlux, collectionName)); // #6
    }

}
|
#1: Creates a stream of documents from the collection.
#2: Limits the maximum number of documents loaded in RAM in case consumption is slower than production. If the maximum buffer size is exceeded, an IllegalStateException is thrown.
#3: Enriches each document asynchronously with the external one.
#4: Creates a ReplaceOneModel from the document.
#5: Groups documents into streams of fixed size. The last stream can be smaller.
#6: Calls the bulk write function.
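The overflow behaviour described in #2 can be illustrated without Reactor. The sketch below is an analogy only: it uses a JDK ArrayBlockingQueue as a stand-in for the bounded buffer, and the class and method names are hypothetical. It is not how the Reactor operator is implemented.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** JDK-only stand-in for a bounded buffer; not the Reactor operator itself. */
final class BoundedBufferSketch {

    /**
     * Pushes `produced` items into a buffer of the given capacity while no
     * consumer drains it, and returns how many items were accepted before
     * the buffer overflowed.
     */
    static int fillUntilOverflow(int capacity, int produced) {
        BlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(capacity);
        int accepted = 0;
        try {
            for (int i = 0; i < produced; i++) {
                buffer.add(i); // throws IllegalStateException when the queue is full
                accepted++;
            }
        } catch (IllegalStateException overflow) {
            // The producer outran the consumer: fail fast instead of growing memory.
        }
        return accepted;
    }

    public static void main(String[] args) {
        System.out.println(fillUntilOverflow(500, 600)); // prints 500
    }
}
```

With a capacity of 500, matching app.buffer-max-size, the 501st pending item triggers the failure, so memory usage stays bounded at the cost of aborting the run.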
The configuration property app.bulk-size can be adjusted according to the project needs and the available hardware resources. The larger its value, the higher the memory consumption and the size of the requests.
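To make the effect of the bulk size concrete, here is a minimal plain-Java sketch of fixed-size grouping. It assumes an in-memory list rather than a reactive stream, and the names BulkSizeSketch and chunk are hypothetical; it only mirrors the grouping rule, including the smaller last group.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative fixed-size grouping; not the Reactor window operator. */
final class BulkSizeSketch {

    /** Splits items into consecutive chunks of at most bulkSize elements. */
    static <T> List<List<T>> chunk(List<T> items, int bulkSize) {
        if (bulkSize <= 0) {
            throw new IllegalArgumentException("bulkSize must be positive");
        }
        List<List<T>> chunks = new ArrayList<>();
        for (int start = 0; start < items.size(); start += bulkSize) {
            int end = Math.min(start + bulkSize, items.size());
            chunks.add(new ArrayList<>(items.subList(start, end)));
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<Integer> documents = new ArrayList<>();
        for (int i = 0; i < 250; i++) {
            documents.add(i);
        }
        for (List<Integer> bulk : chunk(documents, 100)) {
            System.out.println(bulk.size()); // prints 100, then 100, then 50
        }
    }
}
```

For 250 documents and a bulk size of 100, this yields three bulk requests; each request holds at most 100 replacement models in memory at once.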
We create document enrichment functions.
CollectionService.java
|
private Publisher<Document> enrich(Document document, String enrichingKey, String enrichingUri) { // #1
    return getEnrichingDocument(enrichingUri)
            .map(enrichingDocument -> {
                document.put(enrichingKey, enrichingDocument);
                document.put("updatedAt", new Date());
                return document;
            });
}

private Mono<Document> getEnrichingDocument(String enrichingUri) { // #2
    return client.get()
            .uri(URI.create(enrichingUri))
            .retrieve()
            .bodyToMono(Document.class);
}
|
#1: Adds the document retrieved from the HTTP call to the root of the document to be enriched, under the key passed as a parameter.
#2: Retrieves a document from a URI.
MongoDB converts and stores dates in UTC by default.
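The updatedAt value above is a java.util.Date, which holds a number of milliseconds since the Unix epoch and carries no time zone of its own. As a small illustration (the class name UtcDateSketch is hypothetical), the same value can be rendered in UTC like this:

```java
import java.time.Instant;
import java.util.Date;

/** Shows that a java.util.Date is a zone-independent instant. */
final class UtcDateSketch {

    /** Renders a java.util.Date as an ISO-8601 UTC timestamp. */
    static String toUtcString(Date date) {
        // Date wraps milliseconds since the Unix epoch; Instant prints them in UTC.
        Instant instant = date.toInstant();
        return instant.toString();
    }

    public static void main(String[] args) {
        System.out.println(toUtcString(new Date(0L))); // prints 1970-01-01T00:00:00Z
    }
}
```

Any conversion to a local time zone is therefore a display concern on the reading side, not something stored with the document.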
CollectionService.java
|
private static final ReplaceOptions REPLACE_OPTIONS = new ReplaceOptions(); // #1

private static ReplaceOneModel<Document> toReplaceOneModel(Document document) {
    return new ReplaceOneModel<>(
            Filters.eq("_id", document.get("_id")), // #2
            document, // #3
            REPLACE_OPTIONS
    );
}
|
#1: Instantiates the default replacement configuration.
#2: Filter that matches by document identifier.
#3: Replacement content, which is the complete enriched document.
CollectionService.java
|
private static final BulkWriteOptions BULK_WRITE_OPTIONS = new BulkWriteOptions().ordered(false); // #1

private Flux<BulkWriteResult> bulkWrite(Flux<ReplaceOneModel<Document>> updateOneModelFlux, String collectionName) {
    return updateOneModelFlux.collectList() // #2
            // The collected list is the payload of the bulk write, so it must be named and passed on.
            .flatMapMany(updateOneModels -> template.getCollection(collectionName) // #3
                    .flatMapMany(collection -> collection.bulkWrite(updateOneModels, BULK_WRITE_OPTIONS))); // #4
}
|
#1: Instantiates the write options with operation ordering disabled.
#2: Collects the stream into a list.
#3: Retrieves the collection passed as a parameter.
#4: Bulk writes the documents into the MongoDB collection.
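Disabling ordering in #1 changes how a failing operation affects the rest of the batch: an ordered bulk write stops at the first error, while an unordered one still attempts the remaining operations. The following plain-Java simulation illustrates that difference only. It does not talk to MongoDB, the names are hypothetical, and a real unordered bulk write gives no guarantee about execution order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

/** Simulation only: mimics how ordered and unordered batches react to a failure. */
final class BulkOrderingSketch {

    /**
     * Applies each operation in turn and returns those that succeeded.
     * When ordered is true, processing stops at the first failure;
     * when false, the remaining operations are still attempted.
     */
    static <T> List<T> apply(List<T> operations, Predicate<T> succeeds, boolean ordered) {
        List<T> applied = new ArrayList<>();
        for (T operation : operations) {
            if (succeeds.test(operation)) {
                applied.add(operation);
            } else if (ordered) {
                break; // ordered mode: nothing after the failure is attempted
            }
        }
        return applied;
    }

    public static void main(String[] args) {
        List<String> operations = Arrays.asList("a", "b", "FAIL", "c");
        Predicate<String> succeeds = operation -> !operation.equals("FAIL");
        System.out.println(apply(operations, succeeds, true));  // prints [a, b]
        System.out.println(apply(operations, succeeds, false)); // prints [a, b, c]
    }
}
```

Since each replacement here targets a distinct document by its identifier, the operations are independent, which is what makes the unordered mode a reasonable fit.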
Multi-document transactions are supported on replica sets since MongoDB 4.0 and on sharded clusters since MongoDB 4.2. If transactions are enabled, we can use @Transactional or TransactionalOperator to make a method transactional.
We implement the following interfaces:
- CommandLineRunner: runs the enrichment command at application startup.
- ExitCodeGenerator: manages the application system exit code.
Application.java
|
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.ExitCodeGenerator;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.data.mongo.MongoReactiveRepositoriesAutoConfiguration;
import org.springframework.boot.context.properties.ConfigurationPropertiesScan;

@SpringBootApplication(exclude = MongoReactiveRepositoriesAutoConfiguration.class) // #1
@ConfigurationPropertiesScan("com.maoudia.tutorial") // #2
public class Application implements CommandLineRunner, ExitCodeGenerator {

    private static final Logger LOGGER = LoggerFactory.getLogger(Application.class);

    private final AppProperties properties;
    private final CollectionService service;
    // Stays at 255 unless the stream completes (0) or fails (1).
    private int exitCode = 255;

    public static void main(String[] args) {
        System.exit(SpringApplication.exit(SpringApplication.run(Application.class, args)));
    }

    public Application(AppProperties properties, CollectionService service) {
        this.properties = properties;
        this.service = service;
    }

    @Override
    public void run(final String... args) {
        service.enrichAll(properties.getCollectionName(), properties.getEnrichingKey(), properties.getEnrichingUri())
                .doOnSubscribe(unused -> LOGGER.info("------------------< Staring Collection Enriching Command >-------------------")) // #3
                .doOnNext(bulkWriteResult -> LOGGER.info("Bulk write result with {} modified document(s)", bulkWriteResult.getModifiedCount()))
                .doOnError(throwable -> {
                    exitCode = 1;
                    LOGGER.error("Collection enriching failed due to : {}", throwable.getMessage(), throwable);
                })
                .doOnComplete(() -> exitCode = 0)
                .doOnTerminate(() -> LOGGER.info("------------------< Collection Enriching Command Finished >------------------"))
                .blockLast(); // #4
    }

    @Override
    public int getExitCode() {
        return exitCode;
    }

}
|
#1: Disables auto-configuration of repositories, as we only use ReactiveMongoTemplate.
#2: Enables scanning and detection of classes that carry the @ConfigurationProperties annotation.
#3: Subscribing to the stream triggers the processing.
#4: Without a running web server, we have to block on the Publisher in order to trigger the execution and wait until it ends.
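The need to block in #4, together with the exit code handling, can be shown with a JDK-only analogy. The sketch below uses CompletableFuture instead of Reactor, and the class and method names are hypothetical; it only mirrors the idea of waiting for asynchronous work and mapping its outcome to 0 or 1.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Supplier;

/** JDK analogy for "block until the async pipeline ends, then report an exit code". */
final class BlockingRunnerSketch {

    /** Runs the work asynchronously, waits for it, and maps the outcome to an exit code. */
    static int runAndWait(Supplier<Integer> work) {
        int exitCode;
        try {
            // join() blocks the calling thread until the asynchronous work has ended.
            CompletableFuture.supplyAsync(work).join();
            exitCode = 0;
        } catch (CompletionException failure) {
            // The work threw an exception on its own thread.
            exitCode = 1;
        }
        return exitCode;
    }

    public static void main(String[] args) {
        System.out.println(runAndWait(() -> 100)); // prints 0
        System.out.println(runAndWait(() -> {
            throw new IllegalStateException("boom");
        })); // prints 1
    }
}
```

Without the blocking call, the calling thread would return immediately and the outcome of the asynchronous work would never be observed.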
3.3. Demo
We launch the application, for example with mvn spring-boot:run.
Output:
|
...
2022-06-10 00:36:45.152 INFO 7036 --- [ main] com.maoudia.tutorial.Application : Started Application in 2.755 seconds (JVM running for 3.251)
2022-06-10 00:36:45.227 INFO 7036 --- [ main] com.maoudia.tutorial.Application : ------------------< Staring Collection Enriching Command >-------------------
2022-06-10 00:36:45.297 INFO 7036 --- [ main] org.mongodb.driver.cluster : No server chosen by com.mongodb.reactivestreams.client.internal.ClientSessionHelper$$Lambda$543/543409470@4647881c from cluster description ClusterDescription{type=UNKNOWN, connectionMode=SINGLE, serverDescriptions=[ServerDescription{address=localhost:15015, type=UNKNOWN, state=CONNECTING}]}. Waiting for 30000 ms before timing out
2022-06-10 00:36:46.527 INFO 7036 --- [localhost:15015] org.mongodb.driver.connection : Opened connection [connectionId{localValue:1, serverValue:39}] to localhost:15015
2022-06-10 00:36:46.527 INFO 7036 --- [localhost:15015] org.mongodb.driver.connection : Opened connection [connectionId{localValue:2, serverValue:40}] to localhost:15015
2022-06-10 00:36:46.527 INFO 7036 --- [localhost:15015] org.mongodb.driver.cluster : Monitor thread successfully connected to server with description ServerDescription{address=localhost:15015, type=STANDALONE, state=CONNECTED, ok=true, minWireVersion=0, maxWireVersion=13, maxDocumentSize=16777216, logicalSessionTimeoutMinutes=30, roundTripTimeNanos=61576400}
2022-06-10 00:36:46.692 INFO 7036 --- [ntLoopGroup-2-3] org.mongodb.driver.connection : Opened connection [connectionId{localValue:3, serverValue:41}] to localhost:15015
2022-06-10 00:36:48.355 INFO 7036 --- [ntLoopGroup-2-3] com.maoudia.tutorial.Application : Bulk write result with 100 modified document(s)
2022-06-10 00:36:48.482 INFO 7036 --- [ntLoopGroup-2-4] org.mongodb.driver.connection : Opened connection [connectionId{localValue:4, serverValue:42}] to localhost:15015
2022-06-10 00:36:48.562 INFO 7036 --- [ntLoopGroup-2-3] com.maoudia.tutorial.Application : Bulk write result with 100 modified document(s)
2022-06-10 00:36:48.742 INFO 7036 --- [ntLoopGroup-2-3] com.maoudia.tutorial.Application : Bulk write result with 100 modified document(s)
2022-06-10 00:36:48.982 INFO 7036 --- [ntLoopGroup-2-3] com.maoudia.tutorial.Application : Bulk write result with 100 modified document(s)
2022-06-10 00:36:49.222 INFO 7036 --- [ntLoopGroup-2-3] com.maoudia.tutorial.Application : Bulk write result with 100 modified document(s)
2022-06-10 00:36:49.488 INFO 7036 --- [ntLoopGroup-2-4] com.maoudia.tutorial.Application : Bulk write result with 100 modified document(s)
2022-06-10 00:36:49.701 INFO 7036 --- [ntLoopGroup-2-3] com.maoudia.tutorial.Application : Bulk write result with 100 modified document(s)
2022-06-10 00:36:49.852 INFO 7036 --- [ntLoopGroup-2-3] com.maoudia.tutorial.Application : Bulk write result with 100 modified document(s)
2022-06-10 00:36:50.031 INFO 7036 --- [ntLoopGroup-2-3] com.maoudia.tutorial.Application : Bulk write result with 100 modified document(s)
2022-06-10 00:36:50.105 INFO 7036 --- [ntLoopGroup-2-3] com.maoudia.tutorial.Application : Bulk write result with 100 modified document(s)
2022-06-10 00:36:50.106 INFO 7036 --- [ntLoopGroup-2-3] com.maoudia.tutorial.Application : ------------------< Collection Enriching Command Finished >------------------
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 17.315 s
[INFO] Finished at: 2022-06-10T00:36:54+02:00
[INFO] ------------------------------------------------------------------------
Process finished with exit code 0
|
3.4. VisualVM report
VisualVM is a lightweight profiling tool. We use it to get an overview of the threads launched by the application.
Two groups of threads execute operations in parallel; each group forms an event loop.
- MongoDB requests are executed by nioEventLoopGroup.
- HTTP requests are executed by reactor-http-nio.
4. Integration tests
We use JUnit 5 and the Testcontainers MongoDB module for the integration tests. This gives feedback close to the real behaviour of the application, which essentially performs read/write operations.
To keep this tutorial short, we will only write one test.
CollectionServiceTest.java
|
@Profile("test")
@SpringBootTest
@Testcontainers // #1
class CollectionServiceTest {

    @Container
    private static final MongoDBContainer mongoDBContainer = new MongoDBContainer("mongo:5.0.8") // #2
            .withReuse(true);

    @DynamicPropertySource
    private static void setProperties(DynamicPropertyRegistry registry) {
        registry.add("spring.data.mongodb.uri", mongoDBContainer::getReplicaSetUrl); // #3
    }

    @Autowired
    private AppProperties properties;

    @Autowired
    private CollectionService command;

    @Autowired
    private ReactiveMongoTemplate template;

    @Test
    void multipleBulkWriteResultsAreReturned() {
        Document givenDocument1 = new Document();
        givenDocument1.put("_id", "628ea3edb5110304e5e814f6");
        givenDocument1.put("type", "municipality");

        Document givenDocument2 = new Document();
        givenDocument2.put("_id", "628ea3edb5110304e5e814f7");
        givenDocument2.put("type", "street");

        Document givenDocument3 = new Document();
        givenDocument3.put("_id", "628ea3edb5110304e5e814f8");
        givenDocument3.put("type", "housenumber");

        template.insert(Arrays.asList(givenDocument1, givenDocument2, givenDocument3), properties.getCollectionName()).blockLast();

        // With bulk-size 2 (test profile), three documents produce one bulk of 2 and one bulk of 1.
        BulkWriteResult expectedBulkWriteResult1 = BulkWriteResult.acknowledged(WriteRequest.Type.REPLACE, 2, 2, Collections.emptyList(),
                Collections.emptyList());
        BulkWriteResult expectedBulkWriteResult2 = BulkWriteResult.acknowledged(WriteRequest.Type.REPLACE, 1, 1, Collections.emptyList(),
                Collections.emptyList());

        command.enrichAll(properties.getCollectionName(), properties.getEnrichingKey(), properties.getEnrichingUri())
                .as(StepVerifier::create) // #4
                .expectNext(expectedBulkWriteResult1)
                .expectNext(expectedBulkWriteResult2)
                .verifyComplete();
    }

}
|
#1: Adds the Testcontainers JUnit 5 extension.
#2: Starts a MongoDB container.
#3: Sets up the application with the container's URI.
#4: Uses StepVerifier from Reactor Test to assert the output stream.
We launch the integration tests:
|
mvn test -Dspring.profiles.active=test
|
Test results:
|
...
[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 20.563 s - in com.maoudia.tutorial.CollectionServiceTest
[INFO]
[INFO] Results:
[INFO]
[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 32.100 s
[INFO] Finished at: 2022-06-10T01:02:17+02:00
[INFO] ------------------------------------------------------------------------
|
5. Conclusion
In this tutorial, we implemented a complete solution to efficiently enrich and update a MongoDB collection. We have also seen how to write integration tests with JUnit 5 and Testcontainers.
The complete source code is available on GitHub.
In the next chapter of the MongoDB Reactive CLI series, we will add new features and use Picocli to facilitate interactions with the application.
6. Resources
Reference https://www.maoudia.com/blog/bulk-update-with-spring-data-mongodb-reactive/