What is the Hystrix Isolation Policy?
Official documentation: https://github.com/Netflix/Hystrix/wiki/Configuration#executionisolationstrategy
Executing an isolation policy
This property indicates which isolation strategy HystrixCommand.run()
executes with, being one of the following two options.
THREAD
- it is executed on a separate thread, and concurrent requests are limited by the number of threads in the thread pool
SEMAPHORE
- it is executed on the calling thread, and concurrent requests are limited by the semaphore count
Problem
When the isolation policy is THREAD
, there is no way to get the value in ThreadLocal
.
So, when a Feign remote call is made between services, the Feign interceptor cannot obtain the current request object (it is bound to the calling thread), and therefore cannot read the token from the request header.
Solution 1 - Forcibly change the Hystrix isolation policy
Set the isolation policy to SEMAPHORE
1
|
hystrix.command.default.execution.isolation.strategy: SEMAPHORE
|
However, Hystrix officially does not recommend this practice and strongly recommends using THREAD
as the isolation policy.
Thread or Semaphore
The default, and the recommended setting, is to run HystrixCommands using thread isolation (THREAD) and HystrixObservableCommands using semaphore isolation (SEMAPHORE).
Commands executed in threads have an extra layer of protection against latencies beyond what network timeouts can offer.
Generally the only time you should use semaphore isolation for HystrixCommands is when the call is so high volume (hundreds per second, per instance) that the overhead of separate threads is too high; this typically only applies to non-network calls.
Solution 2 - Custom Concurrency Policy
Custom Concurrency Strategy
Just write a class that inherits from HystrixConcurrencyStrategy
and override the wrapCallable
method.
CustomFeignHystrixConcurrencyStrategy
:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
|
package com.lzhpo.common.feign.strategy;
import com.netflix.hystrix.HystrixThreadPoolKey;
import com.netflix.hystrix.HystrixThreadPoolProperties;
import com.netflix.hystrix.strategy.HystrixPlugins;
import com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategy;
import com.netflix.hystrix.strategy.concurrency.HystrixRequestVariable;
import com.netflix.hystrix.strategy.concurrency.HystrixRequestVariableLifecycle;
import com.netflix.hystrix.strategy.eventnotifier.HystrixEventNotifier;
import com.netflix.hystrix.strategy.executionhook.HystrixCommandExecutionHook;
import com.netflix.hystrix.strategy.metrics.HystrixMetricsPublisher;
import com.netflix.hystrix.strategy.properties.HystrixPropertiesStrategy;
import com.netflix.hystrix.strategy.properties.HystrixProperty;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Component;
import org.springframework.web.context.request.RequestAttributes;
import org.springframework.web.context.request.RequestContextHolder;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
 * Hystrix concurrency strategy that hands the caller's Spring {@link RequestAttributes} over to the
 * thread that executes a Hystrix command, so that {@code RequestContextHolder.getRequestAttributes()}
 * is not empty inside the command.
 *
 * <p>On construction the instance reads the plugins currently registered with {@link HystrixPlugins},
 * resets the plugin registry, registers itself as the concurrency strategy and re-registers the other
 * plugins unchanged. The previously registered concurrency strategy is kept as a delegate: thread-pool,
 * queue and request-variable calls are forwarded to it, and its {@code wrapCallable} is chained so that
 * any wrapping it performs is preserved.
 *
 * @author Zhaopo Liu
 */
@Slf4j
@Primary
@Component
public class CustomFeignHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy {

    /**
     * Strategy that was registered before this one. May be {@code null} when the lookup in the
     * constructor failed; the overrides below then fall back to the Hystrix defaults in {@code super}.
     */
    private HystrixConcurrencyStrategy hystrixConcurrencyStrategy;

    /**
     * Registers this instance as the Hystrix concurrency strategy while keeping all other plugins.
     * Any failure is logged and swallowed so that application start-up is not aborted.
     */
    public CustomFeignHystrixConcurrencyStrategy() {
        try {
            this.hystrixConcurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
            if (this.hystrixConcurrencyStrategy instanceof CustomFeignHystrixConcurrencyStrategy) {
                // Another instance of this class is already registered; do not register twice.
                return;
            }
            // Snapshot the other plugins first: HystrixPlugins.reset() below discards all of them.
            HystrixCommandExecutionHook commandExecutionHook = HystrixPlugins.getInstance().getCommandExecutionHook();
            HystrixEventNotifier eventNotifier = HystrixPlugins.getInstance().getEventNotifier();
            HystrixMetricsPublisher metricsPublisher = HystrixPlugins.getInstance().getMetricsPublisher();
            HystrixPropertiesStrategy propertiesStrategy = HystrixPlugins.getInstance().getPropertiesStrategy();
            this.logCurrentStateOfHystrixPlugins(eventNotifier, metricsPublisher, propertiesStrategy);
            HystrixPlugins.reset();
            HystrixPlugins.getInstance().registerConcurrencyStrategy(this);
            HystrixPlugins.getInstance().registerCommandExecutionHook(commandExecutionHook);
            HystrixPlugins.getInstance().registerEventNotifier(eventNotifier);
            HystrixPlugins.getInstance().registerMetricsPublisher(metricsPublisher);
            HystrixPlugins.getInstance().registerPropertiesStrategy(propertiesStrategy);
        } catch (Exception e) {
            log.error("Failed to register custom Feign Hystrix concurrency strategy", e);
        }
    }

    /** Logs, at DEBUG level, the plugins that are registered right before this strategy replaces them. */
    private void logCurrentStateOfHystrixPlugins(HystrixEventNotifier eventNotifier,
            HystrixMetricsPublisher metricsPublisher, HystrixPropertiesStrategy propertiesStrategy) {
        if (log.isDebugEnabled()) {
            log.debug("Current Hystrix plugins configuration is [concurrencyStrategy [{}], eventNotifier [{}], "
                    + "metricPublisher [{}], propertiesStrategy [{}]]",
                    this.hystrixConcurrencyStrategy, eventNotifier, metricsPublisher, propertiesStrategy);
            log.debug("Registering custom Feign Hystrix concurrency strategy.");
        }
    }

    /**
     * Wraps the command so that the request attributes visible on the thread calling this method are
     * bound to the thread that later runs the command.
     *
     * @param callable the command body supplied by Hystrix
     * @return a callable that installs the captured attributes around the (delegate-wrapped) command
     */
    @Override
    public <T> Callable<T> wrapCallable(Callable<T> callable) {
        // Captured here, on the thread that invokes wrapCallable (expected to be the request thread).
        RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes();
        // Keep whatever wrapping the previously registered strategy applies.
        Callable<T> chained = this.hystrixConcurrencyStrategy != null
                ? this.hystrixConcurrencyStrategy.wrapCallable(callable)
                : super.wrapCallable(callable);
        return new WrappedCallable<>(chained, requestAttributes);
    }

    @Override
    public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey, HystrixProperty<Integer> corePoolSize,
            HystrixProperty<Integer> maximumPoolSize, HystrixProperty<Integer> keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue) {
        if (this.hystrixConcurrencyStrategy == null) {
            return super.getThreadPool(threadPoolKey, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        }
        return this.hystrixConcurrencyStrategy.getThreadPool(threadPoolKey, corePoolSize, maximumPoolSize,
                keepAliveTime, unit, workQueue);
    }

    @Override
    public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey,
            HystrixThreadPoolProperties threadPoolProperties) {
        if (this.hystrixConcurrencyStrategy == null) {
            return super.getThreadPool(threadPoolKey, threadPoolProperties);
        }
        return this.hystrixConcurrencyStrategy.getThreadPool(threadPoolKey, threadPoolProperties);
    }

    @Override
    public BlockingQueue<Runnable> getBlockingQueue(int maxQueueSize) {
        if (this.hystrixConcurrencyStrategy == null) {
            return super.getBlockingQueue(maxQueueSize);
        }
        return this.hystrixConcurrencyStrategy.getBlockingQueue(maxQueueSize);
    }

    @Override
    public <T> HystrixRequestVariable<T> getRequestVariable(HystrixRequestVariableLifecycle<T> rv) {
        if (this.hystrixConcurrencyStrategy == null) {
            return super.getRequestVariable(rv);
        }
        return this.hystrixConcurrencyStrategy.getRequestVariable(rv);
    }

    /**
     * Callable decorator that binds a captured {@link RequestAttributes} to the executing thread for
     * the duration of the wrapped call.
     */
    static class WrappedCallable<T> implements Callable<T> {

        private final Callable<T> target;
        private final RequestAttributes requestAttributes;

        public WrappedCallable(Callable<T> target, RequestAttributes requestAttributes) {
            this.target = target;
            this.requestAttributes = requestAttributes;
        }

        /**
         * Runs the wrapped command with the captured request attributes installed.
         *
         * <p>With Feign's Hystrix support enabled ({@code feign.hystrix.enabled=true}) and thread
         * isolation, the command runs on a different thread from the one that received the HTTP
         * request. Request attributes are thread-bound, so without this step the command would see
         * none and could not read the token from the request header.
         *
         * @return the result of the wrapped command
         * @throws Exception whatever the wrapped command throws
         */
        @Override
        public T call() throws Exception {
            try {
                // inheritable = true: also expose the attributes to child threads of this thread.
                RequestContextHolder.setRequestAttributes(requestAttributes, true);
                return target.call();
            } finally {
                // Always clear, so a pooled thread does not keep attributes from a finished request.
                RequestContextHolder.resetRequestAttributes();
            }
        }
    }
}
|
SpringCloud integration with Feign
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
|
package com.lzhpo.common.feign.starter.config;
import feign.RequestInterceptor;
import feign.RequestTemplate;
import java.util.Enumeration;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import javax.servlet.http.HttpServletRequest;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
/**
 * Feign request interceptor that copies the {@code Authorization} header of the servlet request bound
 * to the current thread onto the outgoing Feign request.
 *
 * <p>Nothing is added when no servlet request is bound to the thread or when the inbound request does
 * not carry the header. Header values are intentionally not logged because they contain credentials.
 *
 * @author Zhaopo Liu
 */
@Slf4j
@Configuration
public class CustomFeignConfig implements RequestInterceptor {

    private static final String TOKEN_HEADER = "Authorization";

    /**
     * Propagates the token header to the outgoing request when one is available.
     *
     * @param requestTemplate the outgoing Feign request being built
     */
    @Override
    public void apply(RequestTemplate requestTemplate) {
        HttpServletRequest request = getHttpServletRequest();
        if (request == null) {
            log.debug("No servlet request bound to the current thread; {} header not propagated", TOKEN_HEADER);
            return;
        }
        // getHeader matches the name case-insensitively, unlike a lookup in a map of raw header names.
        String token = request.getHeader(TOKEN_HEADER);
        if (token == null) {
            log.debug("Inbound request has no {} header; nothing to propagate", TOKEN_HEADER);
            return;
        }
        requestTemplate.header(TOKEN_HEADER, token);
    }

    /**
     * Returns the servlet request bound to the current thread.
     *
     * @return the current request, or {@code null} when no servlet-based request attributes are bound
     */
    private HttpServletRequest getHttpServletRequest() {
        return Optional.ofNullable(RequestContextHolder.getRequestAttributes())
                // Guard the cast: the bound attributes are not necessarily servlet-based.
                .filter(ServletRequestAttributes.class::isInstance)
                .map(ServletRequestAttributes.class::cast)
                .map(ServletRequestAttributes::getRequest)
                .orElse(null);
    }
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
package com.lzhpo.common.feign.config;
import com.lzhpo.common.feign.interceptor.LogInterceptor;
import feign.Feign;
import okhttp3.ConnectionPool;
import okhttp3.OkHttpClient;
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.cloud.openfeign.FeignAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.TimeUnit;
/**
 * Supplies the {@link OkHttpClient} bean; declared to be processed before
 * {@link FeignAutoConfiguration} and only when Feign is on the classpath.
 *
 * @author Zhaopo Liu
 */
@Configuration
@ConditionalOnClass(Feign.class)
@AutoConfigureBefore(FeignAutoConfiguration.class)
public class FeignOkHttpConfig {

    /**
     * Builds an OkHttp client with 60-second connect/read/write timeouts, retry on connection
     * failure, a default connection pool and the request/response {@link LogInterceptor}.
     *
     * @return the configured client
     */
    @Bean
    public OkHttpClient okHttpClient() {
        OkHttpClient.Builder builder = new OkHttpClient.Builder();

        // Timeouts: connect, read, write
        builder.connectTimeout(60, TimeUnit.SECONDS);
        builder.readTimeout(60, TimeUnit.SECONDS);
        builder.writeTimeout(60, TimeUnit.SECONDS);

        // Retry automatically when the connection fails, using a default pool
        builder.retryOnConnectionFailure(true);
        builder.connectionPool(new ConnectionPool());

        // Request/response logging
        builder.addInterceptor(new LogInterceptor());

        return builder.build();
    }
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
package com.lzhpo.common.feign.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.client.RestTemplate;
/**
 * Registers a default {@link RestTemplate} bean.
 *
 * @author Zhaopo Liu
 */
@Configuration
public class RestTemplateConfig {

    /**
     * Creates the {@link RestTemplate} using its no-argument constructor.
     *
     * @return a new {@code RestTemplate}
     */
    @Bean
    public RestTemplate restTemplate() {
        RestTemplate template = new RestTemplate();
        return template;
    }
}
|
Log interceptor
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
package com.lzhpo.common.feign.interceptor;
import lombok.extern.slf4j.Slf4j;
import okhttp3.Interceptor;
import okhttp3.Request;
import okhttp3.Response;
import java.io.IOException;
/**
 * OkHttp interceptor that logs each outgoing request and its response, including headers and the
 * elapsed time in milliseconds.
 *
 * @author Zhaopo Liu
 */
@Slf4j
public class LogInterceptor implements Interceptor {

    /**
     * Logs the request, lets the chain proceed, then logs the response and the time it took.
     *
     * @param chain the interceptor chain
     * @return the response produced by the rest of the chain, unmodified
     * @throws IOException if the chain fails to produce a response
     */
    @Override
    public Response intercept(Chain chain) throws IOException {
        final long startedAt = System.nanoTime();
        final Request outgoing = chain.request();
        String requestLine =
                String.format("sending %s request %s%n%s", outgoing.method(), outgoing.url(), outgoing.headers());
        log.info(requestLine);

        final Response incoming = chain.proceed(outgoing);
        final long finishedAt = System.nanoTime();
        // Nanoseconds to milliseconds
        double elapsedMillis = (finishedAt - startedAt) / 1e6d;
        String responseLine = String.format("received response for %s in %.1fms%n%s", incoming.request().url(),
                elapsedMillis, incoming.headers());
        log.info(responseLine);
        return incoming;
    }
}
|
Auto-injection configuration
This code lives in a shared common-feign
module that other services depend on, so it needs a spring.factories
file for Spring Boot auto-configuration to pick it up.
So, configure: /resources/META-INF/spring.factories
1
2
3
4
5
|
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.lzhpo.common.feign.config.CustomFeignConfig,\
com.lzhpo.common.feign.config.FeignOkHttpConfig,\
com.lzhpo.common.feign.config.RestTemplateConfig,\
com.lzhpo.common.feign.strategy.CustomFeignHystrixConcurrencyStrategy
|
–Integrated user service demo–
Configuration file
application.yml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
# feign config
feign:
  httpclient:
    enabled: false
  okhttp:
    enabled: true
  hystrix:
    enabled: true
# hystrix config
hystrix:
  shareSecurityContext: true
  command:
    default:
      execution:
        isolation:
          thread:
            timeoutInMilliseconds: 60000
management:
  endpoints:
    web:
      exposure:
        include: "*"
  endpoint:
    health:
      show-details: ALWAYS
|
User Service Client
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
package com.lzhpo.systemservice.api.feign;
import com.lzhpo.common.constant.SysConst;
import com.lzhpo.common.feign.config.CustomFeignConfig;
import com.lzhpo.common.response.ResultVo;
import com.lzhpo.common.entity.SecurityUser;
import com.lzhpo.systemservice.api.entity.SysRole;
import com.lzhpo.systemservice.api.feign.factory.UserServiceClientFallbackFactory;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import java.util.List;
/**
 * Feign client for the system service's user endpoints. Uses {@link CustomFeignConfig} as its Feign
 * configuration and {@link UserServiceClientFallbackFactory} to build fallbacks.
 *
 * @author Zhaopo Liu
 */
@FeignClient(
        value = SysConst.SYSTEM_SERVICE,
        configuration = CustomFeignConfig.class,
        fallbackFactory = UserServiceClientFallbackFactory.class)
public interface UserServiceClient {

    /**
     * Calls {@code GET /api/user/v1/user/findUserInfoByUsername} with the given user name.
     *
     * @param username sent as the {@code username} request parameter
     * @return the wrapped user information returned by the remote service
     */
    @GetMapping(value = "/api/user/v1/user/findUserInfoByUsername")
    ResultVo<SecurityUser> findUserInfoByUsername(@RequestParam(value = "username") String username);
}
|
User service circuit breaker: Feign fallback implementation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
package com.lzhpo.systemservice.api.feign.fallback;
import com.lzhpo.common.response.ResultVo;
import com.lzhpo.common.entity.SecurityUser;
import com.lzhpo.systemservice.api.entity.SysRole;
import com.lzhpo.systemservice.api.feign.UserServiceClient;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* Log circuit breaker implementation, fallback method called by feign
*
* @author Zhaopo Liu
*/
@Slf4j
@Component
public class UserServiceClientFallbackImpl implements UserServiceClient {
private Throwable throwable;
public Throwable getThrowable() {
return throwable;
}
public void setThrowable(Throwable throwable) {
this.throwable = throwable;
}
@Override
public ResultVo<SecurityUser> findUserInfoByUsername(String username) {
log.error("Query user information based on user name [{}] failed! Exception information: {}", username, throwable.getMessage());
return null;
}
}
|
User circuit breaker factory
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
package com.lzhpo.systemservice.api.feign.factory;
import com.lzhpo.systemservice.api.feign.UserServiceClient;
import com.lzhpo.systemservice.api.feign.fallback.UserServiceClientFallbackImpl;
import feign.hystrix.FallbackFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
* User circuit breaker factory
*
* @author Zhaopo Liu
*/
@Component
@Slf4j
public class UserServiceClientFallbackFactory implements FallbackFactory<UserServiceClient> {
@Override
public UserServiceClient create(Throwable throwable) {
UserServiceClientFallbackImpl userServiceClientFallback = new UserServiceClientFallbackImpl();
userServiceClientFallback.setThrowable(throwable);
log.error("The user circuit breaker detects the feign abnormality, the reason for the abnormality: {}", throwable.getMessage());
return userServiceClientFallback;
}
}
|
Reference
http://www.lzhpo.com/article/167