As a configuration center, Nacos must guarantee high availability of its service nodes. So how does Nacos implement clustering?
The diagram below shows the deployment of a Nacos cluster.
How Nacos clusters work
As a configuration center, Nacos uses a decentralized cluster design: there are no master and slave nodes and no election mechanism. Because of this, a virtual IP (VIP) or load balancer is placed in front of the nodes to provide a single entry point and hot standby.
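For context, a typical cluster setup simply lists all peer nodes in each node's conf/cluster.conf; the VIP or load balancer in front of these addresses is what clients actually connect to. The addresses below are placeholders:

```
# conf/cluster.conf - one "ip:port" entry per Nacos node (example addresses)
192.168.8.16:8848
192.168.8.17:8848
192.168.8.18:8848
```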
Nacos stores its data in two places:
- The MySQL database, where all Nacos nodes share the same copy of the data; data replication is handled by MySQL's own master-slave scheme, which ensures data reliability.
- The local disk of each node, which stores a full copy of the data under the following path:
/data/program/nacos-1/data/config-data/${GROUP}
In the Nacos design, MySQL is the central data repository and the data in MySQL is treated as the source of truth. In addition, at startup Nacos writes a copy of the data in MySQL to each node's local disk.
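For reference, pointing the cluster at a shared MySQL instance is done in each node's conf/application.properties. The following is a sketch with placeholder addresses and credentials, using the property names commonly documented for Nacos 1.x:

```properties
# conf/application.properties - use an external MySQL as the shared data store (placeholder values)
spring.datasource.platform=mysql
db.num=1
db.url.0=jdbc:mysql://192.168.8.100:3306/nacos_config?characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000
db.user=nacos
db.password=nacos
```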
The advantage of this design is better performance: when a client requests a configuration item, the server reads the corresponding file from the local disk and returns it, which is faster than querying the database.
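Conceptually, the read path looks something like the sketch below. This is not the actual Nacos implementation; the class and file layout are simplified, and only the path convention is taken from above:

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Simplified illustration: serve configuration reads from the local dump file instead of MySQL.
public class LocalConfigReader {

    // Base directory of the local dump, following the path convention shown above (hypothetical constant).
    private static final String BASE_DIR = "/data/program/nacos-1/data/config-data";

    /**
     * Returns the configuration content for the given group/dataId,
     * or null if it has not been dumped to disk yet.
     */
    public String readConfig(String group, String dataId) throws IOException {
        Path file = Paths.get(BASE_DIR, group, dataId);
        if (!Files.exists(file)) {
            return null; // in the real flow this would fall back to the database
        }
        return new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
    }
}
```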
When a configuration is changed:
- Nacos saves the changed configuration to the database and then writes it to a local file.
- It then sends an HTTP request to the other nodes in the cluster; on receiving the event, each node dumps the newly written data from MySQL to its own local file.
In addition, when the Nacos server starts, it launches a scheduled task that dumps the full data set to local files every 6 hours.
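The periodic full dump can be pictured as a simple scheduled job, roughly like the sketch below. The dumpAllConfigFromDb method is a stand-in for the real dump logic, not an actual Nacos API:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Simplified illustration of the "dump everything every 6 hours" behaviour described above.
public class FullDumpScheduler {

    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    public void start() {
        // Run once at startup, then every 6 hours.
        scheduler.scheduleWithFixedDelay(this::dumpAllConfigFromDb, 0, 6, TimeUnit.HOURS);
    }

    private void dumpAllConfigFromDb() {
        // In the real implementation this reads all configuration from MySQL
        // and rewrites the files under config-data/ on the local disk.
    }
}
```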
Configuration change synchronization entry point
When a configuration is modified, deleted, or added, a notifyConfigChange event is issued.
```java
@PostMapping
@Secured(action = ActionTypes.WRITE, parser = ConfigResourceParser.class)
public Boolean publishConfig(HttpServletRequest request, HttpServletResponse response,
        @RequestParam(value = "dataId") String dataId, @RequestParam(value = "group") String group,
        @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,
        @RequestParam(value = "content") String content, @RequestParam(value = "tag", required = false) String tag,
        @RequestParam(value = "appName", required = false) String appName,
        @RequestParam(value = "src_user", required = false) String srcUser,
        @RequestParam(value = "config_tags", required = false) String configTags,
        @RequestParam(value = "desc", required = false) String desc,
        @RequestParam(value = "use", required = false) String use,
        @RequestParam(value = "effect", required = false) String effect,
        @RequestParam(value = "type", required = false) String type,
        @RequestParam(value = "schema", required = false) String schema) throws NacosException {
    // ... omitted ...
    if (StringUtils.isBlank(betaIps)) {
        if (StringUtils.isBlank(tag)) {
            persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, true);
            ConfigChangePublisher
                    .notifyConfigChange(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime()));
        } else {
            persistService.insertOrUpdateTag(configInfo, tag, srcIp, srcUser, time, true);
            ConfigChangePublisher.notifyConfigChange(
                    new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime()));
        }
    }
    // ... omitted ...
    return true;
}
```
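For reference, this is the endpoint the Nacos client ends up calling over HTTP when publishing a configuration. A minimal usage sketch with the client SDK, where the server address and configuration content are placeholders:

```java
import java.util.Properties;
import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.exception.NacosException;

public class PublishConfigExample {

    public static void main(String[] args) throws NacosException {
        Properties properties = new Properties();
        // Point the client at the cluster entry (VIP / load balancer) - placeholder address
        properties.put("serverAddr", "192.168.8.16:8848");

        ConfigService configService = NacosFactory.createConfigService(properties);
        // Publish (or update) a configuration; on the server side this arrives at publishConfig()
        boolean ok = configService.publishConfig("log.yaml", "DEFAULT_GROUP", "logging:\n  level: INFO");
        System.out.println("published: " + ok);
    }
}
```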
AsyncNotifyService
Configuration data change events have a dedicated listener, AsyncNotifyService, which handles synchronization after the data changes.
```java
@Autowired
public AsyncNotifyService(ServerMemberManager memberManager) {
    this.memberManager = memberManager;
    // Register ConfigDataChangeEvent to NotifyCenter.
    NotifyCenter.registerToPublisher(ConfigDataChangeEvent.class, NotifyCenter.ringBufferSize);
    // Register a Subscriber to subscribe to ConfigDataChangeEvent.
    NotifyCenter.registerSubscriber(new Subscriber() {

        @Override
        public void onEvent(Event event) {
            // Generate ConfigDataChangeEvent concurrently
            if (event instanceof ConfigDataChangeEvent) {
                ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event;
                long dumpTs = evt.lastModifiedTs;
                String dataId = evt.dataId;
                String group = evt.group;
                String tenant = evt.tenant;
                String tag = evt.tag;
                // Get the list of node IPs in the cluster
                Collection<Member> ipList = memberManager.allMembers();
                // Build a NotifySingleTask for each node in the cluster and add it to the queue.
                Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>();
                for (Member member : ipList) {
                    queue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, member.getAddress(),
                            evt.isBeta));
                }
                // Execute the tasks asynchronously via AsyncTask
                ConfigExecutor.executeAsyncNotify(new AsyncTask(nacosAsyncRestTemplate, queue));
            }
        }

        @Override
        public Class<? extends Event> subscribeType() {
            return ConfigDataChangeEvent.class;
        }
    });
}
```
AsyncTask
```java
@Override
public void run() {
    executeAsyncInvoke();
}

private void executeAsyncInvoke() {
    // Process the tasks in the queue until it is empty
    while (!queue.isEmpty()) {
        NotifySingleTask task = queue.poll(); // Get a task
        String targetIp = task.getTargetIP(); // Get the target IP
        if (memberManager.hasMember(targetIp)) { // Only notify IPs that are still cluster members
            // Check the health status of the target node
            boolean unHealthNeedDelay = memberManager.isUnHealth(targetIp);
            if (unHealthNeedDelay) {
                // The target node is unhealthy: log the event and re-queue the task with a delay
                ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null,
                        task.getLastModified(), InetUtils.getSelfIP(), ConfigTraceService.NOTIFY_EVENT_UNHEALTH,
                        0, task.target);
                // get delay time and set fail count to the task
                asyncTaskExecute(task);
            } else {
                // Build the request headers
                Header header = Header.newInstance();
                header.addParam(NotifyService.NOTIFY_HEADER_LAST_MODIFIED, String.valueOf(task.getLastModified()));
                header.addParam(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP, InetUtils.getSelfIP());
                if (task.isBeta) {
                    header.addParam("isBeta", "true");
                }
                AuthHeaderUtil.addIdentityToHeader(header);
                // Make the remote call via restTemplate; on completion the AsyncNotifyCallBack is invoked
                restTemplate.get(task.url, header, Query.EMPTY, String.class, new AsyncNotifyCallBack(task));
            }
        }
    }
}
```
The target node receives the request
The request URL for data synchronization is: task.url=http://192.168.8.16:8848/nacos/v1/cs/communication/dataChange?dataId=log.yaml&group=DEFAULT_GROUP
```java
@GetMapping("/dataChange")
public Boolean notifyConfigInfo(HttpServletRequest request, @RequestParam("dataId") String dataId,
        @RequestParam("group") String group,
        @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,
        @RequestParam(value = "tag", required = false) String tag) {
    dataId = dataId.trim();
    group = group.trim();
    String lastModified = request.getHeader(NotifyService.NOTIFY_HEADER_LAST_MODIFIED);
    long lastModifiedTs = StringUtils.isEmpty(lastModified) ? -1 : Long.parseLong(lastModified);
    String handleIp = request.getHeader(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP);
    String isBetaStr = request.getHeader("isBeta");
    if (StringUtils.isNotBlank(isBetaStr) && trueStr.equals(isBetaStr)) {
        dumpService.dump(dataId, group, tenant, lastModifiedTs, handleIp, true);
    } else {
        dumpService.dump(dataId, group, tenant, tag, lastModifiedTs, handleIp);
    }
    return true;
}
```
dumpService.dump implements the configuration update. The current task is added to dumpTaskMgr, which manages it:
```java
public void dump(String dataId, String group, String tenant, String tag, long lastModified, String handleIp,
        boolean isBeta) {
    String groupKey = GroupKey2.getKey(dataId, group, tenant);
    String taskKey = String.join("+", dataId, group, tenant, String.valueOf(isBeta), tag);
    dumpTaskMgr.addTask(taskKey, new DumpTask(groupKey, tag, lastModified, handleIp, isBeta));
    DUMP_LOG.info("[dump-task] add task. groupKey={}, taskKey={}", groupKey, taskKey);
}
```
TaskManager.addTask first calls the parent class to add the task, then updates the dump task metrics:
```java
@Override
public void addTask(Object key, AbstractDelayTask newTask) {
    super.addTask(key, newTask);
    MetricsMonitor.getDumpTaskMonitor().set(tasks.size());
}
```
This kind of design is usually implemented with the producer-consumer pattern, so it is not hard to guess that the task is saved in a queue and another thread picks it up and executes it.
NacosDelayTaskExecuteEngine
The parent class of TaskManager is NacosDelayTaskExecuteEngine. This class has a member field, protected final ConcurrentHashMap<Object, AbstractDelayTask> tasks;, which holds delayed tasks of type AbstractDelayTask. Its constructor schedules a recurring job whose concrete task is ProcessRunnable.
```java
public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) {
    super(logger);
    tasks = new ConcurrentHashMap<Object, AbstractDelayTask>(initCapacity);
    processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name));
    processingExecutor
            .scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS);
}
```
ProcessRunnable
```java
private class ProcessRunnable implements Runnable {

    @Override
    public void run() {
        try {
            processTasks();
        } catch (Throwable e) {
            getEngineLog().error(e.toString(), e);
        }
    }
}
```
processTasks
```java
protected void processTasks() {
    // Get the keys of all pending tasks
    Collection<Object> keys = getAllTaskKeys();
    for (Object taskKey : keys) {
        AbstractDelayTask task = removeTask(taskKey);
        if (null == task) {
            continue;
        }
        // Get the task processor; in this scenario a DumpProcessor is returned
        NacosTaskProcessor processor = getProcessor(taskKey);
        if (null == processor) {
            getEngineLog().error("processor not found for task, so discarded. " + task);
            continue;
        }
        try {
            // Re-add the task if processing failed
            if (!processor.process(task)) {
                retryFailedTask(taskKey, task);
            }
        } catch (Throwable e) {
            getEngineLog().error("Nacos task execute error : " + e.toString(), e);
            retryFailedTask(taskKey, task);
        }
    }
}
```
DumpProcessor.process
DumpProcessor.process retrieves the latest data from the database and then updates the local cache and the file on disk.
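The actual DumpProcessor code is not shown here, but its job can be sketched roughly as follows. The class, method, and collaborator names below are illustrative stand-ins, not the real Nacos types:

```java
// Rough illustration of what a dump step does: load the latest row from the database,
// then refresh the in-memory cache and the file on local disk.
public class SimpleDumpStep {

    private final ConfigRepository repository;   // hypothetical DB access abstraction
    private final LocalConfigStore localStore;   // hypothetical disk/memory cache abstraction

    public SimpleDumpStep(ConfigRepository repository, LocalConfigStore localStore) {
        this.repository = repository;
        this.localStore = localStore;
    }

    public boolean process(String dataId, String group, String tenant, long lastModified) {
        // 1. Read the latest configuration from the database (the source of truth)
        String content = repository.findContent(dataId, group, tenant);
        if (content == null) {
            // The configuration was deleted: remove the local copy as well
            localStore.remove(dataId, group, tenant);
            return true;
        }
        // 2. Update the in-memory cache and rewrite the local file
        localStore.save(dataId, group, tenant, content, lastModified);
        return true;
    }

    // Hypothetical collaborators, declared only so the sketch is self-contained.
    interface ConfigRepository {
        String findContent(String dataId, String group, String tenant);
    }

    interface LocalConfigStore {
        void save(String dataId, String group, String tenant, String content, long lastModified);
        void remove(String dataId, String group, String tenant);
    }
}
```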