vlambda Blog

Zuul with Ribbon and Hystrix: route retry mechanism and parameter configuration


I. Zuul's Ribbon-based retry mechanism depends on the version

1. First version: before Spring Cloud Netflix Core 1.2.6.RELEASE

The AbstractRibbonCommand class looks like this:

@Override
protected ClientHttpResponse run() throws Exception {
    final RequestContext context = RequestContext.getCurrentContext();
    RQ request = createRequest();
    // The key line: the default client implementation is RibbonLoadBalancingHttpClient,
    // and the retry logic lives in implementations of the RetryHandler interface.
    RS response = this.client.executeWithLoadBalancer(request);
    .... omitted ....
    return new RibbonHttpResponse(response);
}

That version is no longer in use, so it is not discussed further.

2. Second version: Spring Cloud Netflix Core 1.4.6.RELEASE and later

The AbstractRibbonCommand class looks like this:

@Override
protected ClientHttpResponse run() throws Exception {
    final RequestContext context = RequestContext.getCurrentContext();

    RQ request = createRequest();
    RS response;
    boolean retryableClient = this.client instanceof AbstractLoadBalancingClient
            && ((AbstractLoadBalancingClient) this.client).isClientRetryable((ContextAwareRequest) request);
    // In newer versions, retries go through the client.execute(request, config) branch below.
    // This requires a new dependency so that RetryableRibbonLoadBalancingHttpClient is registered.
    if (retryableClient) {
        response = this.client.execute(request, config);
    } else {
        response = this.client.executeWithLoadBalancer(request, config);
    }
    context.set("ribbonResponse", response);
    .... omitted ....
    return new RibbonHttpResponse(response);
}

2.1 The new dependency to add to the pom file

<!-- Zuul retry -->
<dependency>
    <groupId>org.springframework.retry</groupId>
    <artifactId>spring-retry</artifactId>
</dependency>

With this dependency on the classpath (the condition checks for RetryTemplate), the following RetryableRibbonLoadBalancingHttpClient bean is registered:

@Bean
@ConditionalOnMissingBean(AbstractLoadBalancerAwareClient.class)
@ConditionalOnClass(name = "org.springframework.retry.support.RetryTemplate")
public RetryableRibbonLoadBalancingHttpClient retryableRibbonLoadBalancingHttpClient(
        IClientConfig config, ServerIntrospector serverIntrospector,
        ILoadBalancer loadBalancer, RetryHandler retryHandler,
        LoadBalancedRetryFactory loadBalancedRetryFactory,
        CloseableHttpClient httpClient,
        RibbonLoadBalancerContext ribbonLoadBalancerContext) {
    RetryableRibbonLoadBalancingHttpClient client = new RetryableRibbonLoadBalancingHttpClient(
            httpClient, config, serverIntrospector, loadBalancedRetryFactory);
    client.setLoadBalancer(loadBalancer);
    client.setRetryHandler(retryHandler);
    client.setRibbonLoadBalancerContext(ribbonLoadBalancerContext);
    Monitors.registerObject("Client_" + this.name, client);
    return client;
}




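II. Configuration parameters in detail

1. The most commonly used retry configuration parameters

# Zuul settings, bound to the ZuulProperties class
zuul:
  servlet-path: / # path matched by ZuulServlet, default is /zuul
  # Enable retry for all routes, default is false
  # (this is a property of ZuulProperties itself, so it sits beside routes, not under it)
  retryable: true
  # Route definitions, bound to the ZuulRoute map
  routes:
    route_weiye:
      path: /hwy/**
      url: https://www.baidu.com
    route_users:
      path: /users/**
      service-id: user-service
      # Enable retry for this route only
      retryable: true

# Ribbon settings
ribbon:
  # Max retries on the same instance, not counting the first call, default is 0
  MaxAutoRetries: 1
  # Max number of other instances to retry on, not counting the first instance, default is 1
  MaxAutoRetriesNextServer: 2
  # Ribbon httpclient read timeout (ms)
  ReadTimeout: 30000
  # Ribbon httpclient connect timeout (ms)
  ConnectTimeout: 3000
  # A response whose status code is in this list triggers a retry
  retryableStatusCodes: 404,500,503
  # Whether to retry all operations; if false, only GET requests are retried
  OkToRetryOnAllOperations: true

# Hystrix command execution timeout (ms)
hystrix:
  command:
    default:
      execution:
        isolation:
          thread:
            timeoutInMilliseconds: 19800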

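The settings are explained in detail below.

  • zuul.retryable=true   # enables retry globally for all routes (the flag belongs to ZuulProperties, not to the zuul.routes map)

  • zuul.routes.route_users.retryable=true  # enables retry for a single route

  • ribbon.OkToRetryOnAllOperations=true  # whether to retry all operations; if false, only GET requests are retried. The relevant code:

public class RibbonLoadBalancedRetryPolicy implements LoadBalancedRetryPolicy {
    .... omitted ....
    public boolean canRetry(LoadBalancedRetryContext context) {
        HttpMethod method = context.getRequest().getMethod();
        // GET is always retryable; other methods only when OkToRetryOnAllOperations is true
        return HttpMethod.GET == method || lbContext.isOkToRetryOnAllOperations();
    }
}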
  • ribbon.retryableStatusCodes: 404,500,503   # a response whose status code is in this list triggers a retry

public class RibbonLoadBalancedRetryPolicy implements LoadBalancedRetryPolicy {
    .... omitted ....
    public RibbonLoadBalancedRetryPolicy(String serviceId, RibbonLoadBalancerContext context,
            ServiceInstanceChooser loadBalanceChooser, IClientConfig clientConfig) {
        .... omitted ....
        // Read the comma-separated list and keep every entry that parses as an integer
        String retryableStatusCodesProp = clientConfig
                .getPropertyAsString(RETRYABLE_STATUS_CODES, "");
        String[] retryableStatusCodesArray = retryableStatusCodesProp.split(",");
        for (String code : retryableStatusCodesArray) {
            if (!StringUtils.isEmpty(code)) {
                try {
                    retryableStatusCodes.add(Integer.valueOf(code.trim()));
                } catch (NumberFormatException e) {
                    log.warn("We cant add the status code because the code [ " + code
                            + " ] could not be converted to an integer. ", e);
                }
            }
        }
    }

    @Override
    public boolean retryableStatusCode(int statusCode) {
        return retryableStatusCodes.contains(statusCode);
    }
}


public class RetryableRibbonLoadBalancingHttpClient extends RibbonLoadBalancingHttpClient {

    @Override
    public RibbonApacheHttpResponse execute(final RibbonApacheHttpRequest request,
            final IClientConfig configOverride) throws Exception {
        .... omitted ....
        if (retryPolicy.retryableStatusCode(httpResponse.getStatusLine().getStatusCode())) {
            throw new HttpClientStatusCodeException(
                    RetryableRibbonLoadBalancingHttpClient.this.clientName,
                    httpResponse, HttpClientUtils.createEntity(httpResponse),
                    httpUriRequest.getURI());
        }
    }
}

As the code shows, once httpclient has executed the request and received a response, it checks whether the status code is in the configured list. If it is, an exception is thrown to the layer above, which then retries.
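To make the parsing rule concrete, here is a minimal plain-Java sketch of the same idea. It is a stand-alone model, not the Spring Cloud class: the name RetryableStatusCodesSketch and its methods are hypothetical, and it only mirrors the split, trim and skip-on-parse-error behaviour shown above.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical stand-alone model of the status-code check; not the Spring Cloud class.
class RetryableStatusCodesSketch {

    // Parses a comma-separated property value such as "404,500,503".
    // Blank entries and entries that are not integers are skipped.
    static Set<Integer> parse(String property) {
        Set<Integer> codes = new LinkedHashSet<>();
        for (String code : property.split(",")) {
            String trimmed = code.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            try {
                codes.add(Integer.valueOf(trimmed));
            } catch (NumberFormatException e) {
                // The real policy logs a warning here; the sketch just ignores the entry.
            }
        }
        return codes;
    }

    // A response is retried only when its status code is in the parsed set.
    static boolean shouldRetry(Set<Integer> codes, int statusCode) {
        return codes.contains(statusCode);
    }

    public static void main(String[] args) {
        Set<Integer> codes = parse("404, 500,503,abc");
        System.out.println(shouldRetry(codes, 503));
        System.out.println(shouldRetry(codes, 200));
    }
}
```

An empty property value yields an empty set, so with no retryableStatusCodes configured no response is retried on status code alone.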

  • ribbon.ReadTimeout=30000  # httpclient read timeout

  • ribbon.ConnectTimeout=3000  # httpclient connect timeout

  • hystrix.command.default.execution.isolation.thread.timeoutInMilliseconds=19800  # Hystrix command execution timeout

The relationship among these three timeouts is as follows:

public abstract class AbstractRibbonCommand<LBC extends AbstractLoadBalancerAwareClient<RQ, RS>,
        RQ extends ClientRequest, RS extends HttpResponse>
        extends HystrixCommand<ClientHttpResponse> implements RibbonCommand {

    protected static int getHystrixTimeout(IClientConfig config, String commandKey) {
        int ribbonTimeout = getRibbonTimeout(config, commandKey);
        DynamicPropertyFactory dynamicPropertyFactory = DynamicPropertyFactory
                .getInstance();
        int defaultHystrixTimeout = dynamicPropertyFactory.getIntProperty(
                "hystrix.command.default.execution.isolation.thread.timeoutInMilliseconds",
                0).get();
        int commandHystrixTimeout = dynamicPropertyFactory
                .getIntProperty("hystrix.command." + commandKey
                        + ".execution.isolation.thread.timeoutInMilliseconds", 0)
                .get();
        int hystrixTimeout;
        // Precedence: command-specific value, then the default value, then the Ribbon total
        if (commandHystrixTimeout > 0) {
            hystrixTimeout = commandHystrixTimeout;
        } else if (defaultHystrixTimeout > 0) {
            hystrixTimeout = defaultHystrixTimeout;
        } else {
            hystrixTimeout = ribbonTimeout;
        }
        if (hystrixTimeout < ribbonTimeout) {
            LOGGER.warn("The Hystrix timeout of " + hystrixTimeout + "ms for the command "
                    + commandKey
                    + " is set lower than the combination of the Ribbon read and connect timeout, "
                    + ribbonTimeout + "ms.");
        }
        return hystrixTimeout;
    }

    protected static int getRibbonTimeout(IClientConfig config, String commandKey) {
        int ribbonTimeout;
        if (config == null) {
            ribbonTimeout = RibbonClientConfiguration.DEFAULT_READ_TIMEOUT
                    + RibbonClientConfiguration.DEFAULT_CONNECT_TIMEOUT;
        } else {
            int ribbonReadTimeout = getTimeout(config, commandKey, "ReadTimeout",
                    IClientConfigKey.Keys.ReadTimeout,
                    RibbonClientConfiguration.DEFAULT_READ_TIMEOUT);
            int ribbonConnectTimeout = getTimeout(config, commandKey, "ConnectTimeout",
                    IClientConfigKey.Keys.ConnectTimeout,
                    RibbonClientConfiguration.DEFAULT_CONNECT_TIMEOUT);
            int maxAutoRetries = getTimeout(config, commandKey, "MaxAutoRetries",
                    IClientConfigKey.Keys.MaxAutoRetries,
                    DefaultClientConfigImpl.DEFAULT_MAX_AUTO_RETRIES);
            int maxAutoRetriesNextServer = getTimeout(config, commandKey,
                    "MaxAutoRetriesNextServer",
                    IClientConfigKey.Keys.MaxAutoRetriesNextServer,
                    DefaultClientConfigImpl.DEFAULT_MAX_AUTO_RETRIES_NEXT_SERVER);
            ribbonTimeout = (ribbonReadTimeout + ribbonConnectTimeout)
                    * (maxAutoRetries + 1) * (maxAutoRetriesNextServer + 1);
        }
        return ribbonTimeout;
    }
}

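The overall Ribbon timeout is the read timeout plus the connect timeout, multiplied by (max retries on the same instance + 1) and by (max retries on other instances + 1):

ribbonTimeout = (ribbonReadTimeout + ribbonConnectTimeout) * (maxAutoRetries + 1) * (maxAutoRetriesNextServer + 1);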
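As a worked example, the formula can be evaluated with the sample values from the configuration in this article (ReadTimeout 30000, ConnectTimeout 3000, MaxAutoRetries 1, MaxAutoRetriesNextServer 2). The class and method names below are hypothetical; the sketch only reproduces the arithmetic.

```java
// Hypothetical helper that reproduces the arithmetic of the formula above.
class RibbonTimeoutSketch {

    // All timeouts are in milliseconds.
    static int ribbonTimeout(int readTimeout, int connectTimeout,
                             int maxAutoRetries, int maxAutoRetriesNextServer) {
        return (readTimeout + connectTimeout)
                * (maxAutoRetries + 1)
                * (maxAutoRetriesNextServer + 1);
    }

    public static void main(String[] args) {
        // (30000 + 3000) * (1 + 1) * (2 + 1) = 33000 * 2 * 3
        System.out.println(ribbonTimeout(30000, 3000, 1, 2)); // prints 198000
    }
}
```

With these values the overall Ribbon timeout is 198000 ms, so the sample Hystrix timeout of 19800 ms is lower than it and would trigger the warning in getHystrixTimeout.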


The Hystrix timeout must not be lower than the overall Ribbon timeout; otherwise the Hystrix timeout fires first and cuts the retries short. The source code only logs a warning in that case:

if (hystrixTimeout < ribbonTimeout) {
    LOGGER.warn("The Hystrix timeout of " + hystrixTimeout + "ms for the command "
            + commandKey
            + " is set lower than the combination of the Ribbon read and connect timeout, "
            + ribbonTimeout + "ms.");
}
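The precedence that getHystrixTimeout applies can be modelled in a few lines of plain Java. This is a sketch under the assumption that a value of 0 means "not configured", as in the code quoted earlier; the class and method names are hypothetical.

```java
// Hypothetical model of the timeout precedence in getHystrixTimeout.
class HystrixTimeoutSketch {

    // A value <= 0 is treated as "not configured".
    // Order: command-specific timeout, then the default timeout, then the Ribbon total.
    static int resolve(int commandTimeout, int defaultTimeout, int ribbonTimeout) {
        if (commandTimeout > 0) {
            return commandTimeout;
        }
        if (defaultTimeout > 0) {
            return defaultTimeout;
        }
        return ribbonTimeout;
    }

    // True when Hystrix would time out before Ribbon has used up its retries.
    static boolean cutsRetriesShort(int hystrixTimeout, int ribbonTimeout) {
        return hystrixTimeout < ribbonTimeout;
    }

    public static void main(String[] args) {
        int ribbonTimeout = 198000; // (30000 + 3000) * 2 * 3 from the sample configuration
        int hystrixTimeout = resolve(0, 19800, ribbonTimeout);
        System.out.println(hystrixTimeout);
        System.out.println(cutsRetriesShort(hystrixTimeout, ribbonTimeout));
    }
}
```

If neither Hystrix property is set, the resolved value equals the Ribbon total, which is why leaving the Hystrix timeout unset never triggers the warning.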
  • ribbon.MaxAutoRetries: 1  # max retries on the same instance, not counting the first call, default is 0

  • ribbon.MaxAutoRetriesNextServer: 2  # max number of other instances to retry on, not counting the first instance, default is 1


public class RibbonLoadBalancedRetryPolicy implements LoadBalancedRetryPolicy {
    .... omitted ....
    @Override
    public boolean canRetrySameServer(LoadBalancedRetryContext context) {
        // Check whether the retry count on the same instance is below the configured maximum
        return sameServerCount < lbContext.getRetryHandler().getMaxRetriesOnSameServer()
                && canRetry(context);
    }

    @Override
    public boolean canRetryNextServer(LoadBalancedRetryContext context) {
        // Check whether the count of instance switches is within the configured maximum
        return nextServerCount <= lbContext.getRetryHandler().getMaxRetriesOnNextServer()
                && canRetry(context);
    }

    @Override
    public void registerThrowable(LoadBalancedRetryContext context, Throwable throwable) {
        // Is this a circuit-tripping error? By default: ConnectException and SocketTimeoutException.
        // If so, update the statistics recorded for the instance.
        if (lbContext.getRetryHandler().isCircuitTrippingException(throwable)) {
            updateServerInstanceStats(context);
        }

        // Check whether the retry must switch to another instance
        if (!canRetrySameServer(context) && canRetryNextServer(context)) {
            context.setServiceInstance(loadBalanceChooser.choose(serviceId));
        }

        // Once the same-instance retries are used up, reset the counter and move on to the next instance
        if (sameServerCount >= lbContext.getRetryHandler().getMaxRetriesOnSameServer()
                && canRetry(context)) {
            sameServerCount = 0;
            nextServerCount++;
            if (!canRetryNextServer(context)) {
                context.setExhaustedOnly();
            }
        } else {
            sameServerCount++;
        }
    }
}
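To see how the two counters interact, the following plain-Java sketch replays the counter updates of registerThrowable for a request whose every attempt fails. It is a simplified model with hypothetical names: it assumes canRetry(context) is always true (for example a GET request) and that the surrounding retry loop stops once the policy marks the context as exhausted.

```java
// Hypothetical simulation of the sameServerCount / nextServerCount bookkeeping.
class RetryCounterSketch {

    // Returns the total number of attempts (first call included) when every attempt fails.
    static int countAttempts(int maxAutoRetries, int maxAutoRetriesNextServer) {
        int sameServerCount = 0;
        int nextServerCount = 0;
        int attempts = 0;
        boolean exhausted = false;

        while (!exhausted && nextServerCount <= maxAutoRetriesNextServer) {
            attempts++; // the attempt is made and fails

            // Same bookkeeping as registerThrowable, with canRetry(context) assumed true
            if (sameServerCount >= maxAutoRetries) {
                sameServerCount = 0;
                nextServerCount++; // switch to the next instance
                if (nextServerCount > maxAutoRetriesNextServer) {
                    exhausted = true; // corresponds to context.setExhaustedOnly()
                }
            } else {
                sameServerCount++; // retry on the same instance
            }
        }
        return attempts;
    }

    public static void main(String[] args) {
        System.out.println(countAttempts(1, 2)); // sample configuration
        System.out.println(countAttempts(0, 1)); // Ribbon defaults
    }
}
```

Under these assumptions the sample configuration gives (1 + 1) * (2 + 1) = 6 attempts and the defaults give 2, which matches the multipliers used in the Ribbon timeout formula.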



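III. Custom retry policy

1. Register the retry policy factory

import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.cloud.client.loadbalancer.LoadBalancedRetryFactory;
import org.springframework.cloud.netflix.ribbon.SpringClientFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RibbonLoadBalancedRetryFactoryConfig {

    @Bean
    @ConditionalOnClass(name = "org.springframework.retry.support.RetryTemplate")
    @ConditionalOnMissingBean
    public LoadBalancedRetryFactory loadBalancedRetryPolicyFactory(
            final SpringClientFactory clientFactory) {
        // Register the custom load-balanced retry factory
        return new MyRibbonLoadBalancedRetryFactory(clientFactory);
    }
}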

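2. When MyRibbonLoadBalancedRetryFactory is asked for a retry policy, it creates the custom one

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.client.loadbalancer.LoadBalancedRetryPolicy;
import org.springframework.cloud.client.loadbalancer.ServiceInstanceChooser;
import org.springframework.cloud.netflix.ribbon.RibbonLoadBalancedRetryFactory;
import org.springframework.cloud.netflix.ribbon.RibbonLoadBalancerContext;
import org.springframework.cloud.netflix.ribbon.SpringClientFactory;

public class MyRibbonLoadBalancedRetryFactory extends RibbonLoadBalancedRetryFactory {
    private static final Logger log = LoggerFactory.getLogger(MyRibbonLoadBalancedRetryFactory.class);

    private SpringClientFactory clientFactory;

    public MyRibbonLoadBalancedRetryFactory(SpringClientFactory clientFactory) {
        super(clientFactory);
        this.clientFactory = clientFactory;
    }

    @Override
    public LoadBalancedRetryPolicy createRetryPolicy(String service,
            ServiceInstanceChooser serviceInstanceChooser) {
        RibbonLoadBalancerContext lbContext = this.clientFactory
                .getLoadBalancerContext(service);
        // Create the custom retry policy
        return new MyRibbonLoadBalancedRetryPolicy(service, lbContext,
                serviceInstanceChooser, clientFactory.getClientConfig(service));
    }
}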


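3. The retry policy either extends RibbonLoadBalancedRetryPolicy or implements the LoadBalancedRetryPolicy interface

import java.net.SocketTimeoutException;

import com.netflix.client.config.IClientConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.client.loadbalancer.LoadBalancedRetryContext;
import org.springframework.cloud.client.loadbalancer.ServiceInstanceChooser;
import org.springframework.cloud.netflix.ribbon.RibbonLoadBalancedRetryPolicy;
import org.springframework.cloud.netflix.ribbon.RibbonLoadBalancerContext;
import org.springframework.http.HttpMethod;

public class MyRibbonLoadBalancedRetryPolicy extends RibbonLoadBalancedRetryPolicy {

    private static final Logger log = LoggerFactory.getLogger(MyRibbonLoadBalancedRetryPolicy.class);

    private RibbonLoadBalancerContext lbContext;

    public MyRibbonLoadBalancedRetryPolicy(String serviceId, RibbonLoadBalancerContext context,
            ServiceInstanceChooser loadBalanceChooser, IClientConfig clientConfig) {
        super(serviceId, context, loadBalanceChooser, clientConfig);
        this.lbContext = context;
    }

    @Override
    public boolean canRetry(LoadBalancedRetryContext context) {
        // Custom retry logic
        HttpMethod method = context.getRequest().getMethod();
        Throwable lastThrowable = context.getLastThrowable();
        if (lastThrowable instanceof SocketTimeoutException) {
            String message = lastThrowable.getMessage();
            log.info("--------------------------,{}", message);
            // Do not retry after a read timeout
            if ("Read timed out".equals(message)) {
                return false;
            }
        }
        return HttpMethod.GET == method || HttpMethod.POST == method
                || lbContext.isOkToRetryOnAllOperations();
    }
}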
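The decision made by this custom canRetry can be checked in isolation with a small plain-Java model. The sketch below is hypothetical (string HTTP methods instead of Spring's HttpMethod, no retry context) and only mirrors the rule above: never retry after a read timeout, otherwise retry GET and POST, or anything when OkToRetryOnAllOperations is true. A likely motivation, which the article does not state, is that a read timeout can mean the backend already received the request, so repeating a POST might duplicate it.

```java
import java.net.ConnectException;
import java.net.SocketTimeoutException;

// Hypothetical stand-alone model of the custom retry decision.
class CustomRetryDecisionSketch {

    static boolean canRetry(String httpMethod, Throwable lastThrowable,
                            boolean okToRetryOnAllOperations) {
        // A read timeout is never retried, regardless of method or configuration
        if (lastThrowable instanceof SocketTimeoutException
                && "Read timed out".equals(lastThrowable.getMessage())) {
            return false;
        }
        return "GET".equals(httpMethod) || "POST".equals(httpMethod)
                || okToRetryOnAllOperations;
    }

    public static void main(String[] args) {
        System.out.println(canRetry("POST", new SocketTimeoutException("Read timed out"), true));
        System.out.println(canRetry("POST", new ConnectException("Connection refused"), false));
        System.out.println(canRetry("PUT", null, false));
    }
}
```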


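IV. Configuring Hystrix fallback in Zuul

Zuul exposes the FallbackProvider interface, which you implement to supply a Hystrix fallback response.

An example:

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

import javax.servlet.http.HttpServletRequest;

import com.netflix.zuul.context.RequestContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.netflix.zuul.filters.route.FallbackProvider;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.client.ClientHttpResponse;
import org.springframework.stereotype.Component;

@Component
public class ApiFallbackProvider implements FallbackProvider {
    private static final Logger logger = LoggerFactory.getLogger(ApiFallbackProvider.class);

    @Override
    public String getRoute() {
        // "*" applies the fallback to every forwarded route; to target a single route,
        // return its service id instead, for example return "user-service"
        return "*";
    }

    @Override
    public ClientHttpResponse fallbackResponse(String route, final Throwable cause) {
        // Get the current request context
        RequestContext ctx = RequestContext.getCurrentContext();
        HttpServletRequest request = ctx.getRequest();
        String method = request.getMethod();
        logger.info("============method:{}", method);
        logger.info("============route:{}", route);
        logger.info("=============error:", cause);
        // Build a new response and return it
        return new ClientHttpResponse() {

            @Override
            public HttpStatus getStatusCode() {
                return HttpStatus.OK;
            }

            @Override
            public int getRawStatusCode() {
                return HttpStatus.OK.value();
            }

            @Override
            public String getStatusText() {
                return HttpStatus.OK.getReasonPhrase();
            }

            @Override
            public void close() {}

            @Override
            public InputStream getBody() {
                // Response body
                String bodyText = String.format(
                        "{\"code\": 500,\"message\": \"Service unavailable:%s\"}",
                        cause.getMessage());
                return new ByteArrayInputStream(bodyText.getBytes(StandardCharsets.UTF_8));
            }

            @Override
            public HttpHeaders getHeaders() {
                // Response headers
                HttpHeaders headers = new HttpHeaders();
                headers.setContentType(MediaType.APPLICATION_JSON); // return JSON
                return headers;
            }
        };
    }
}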
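The relationship between a wildcard provider and a route-specific one can be pictured with a small lookup model. This is only a sketch with hypothetical names, not Zuul's implementation; it assumes that a provider registered for a specific route is preferred and that a provider returning "*" (or null) serves as the default for every other route.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of choosing a fallback provider by route name.
class FallbackLookupSketch {

    private final Map<String, String> providersByRoute = new HashMap<>();
    private String defaultProvider;

    // route is what getRoute() would return; providerName stands in for the provider bean.
    void register(String route, String providerName) {
        if (route == null || "*".equals(route)) {
            defaultProvider = providerName; // wildcard: used when nothing more specific matches
        } else {
            providersByRoute.put(route, providerName);
        }
    }

    // Prefer the route-specific provider; otherwise fall back to the wildcard one (may be null).
    String resolve(String route) {
        String provider = providersByRoute.get(route);
        return provider != null ? provider : defaultProvider;
    }

    public static void main(String[] args) {
        FallbackLookupSketch lookup = new FallbackLookupSketch();
        lookup.register("*", "ApiFallbackProvider");
        lookup.register("user-service", "UserFallbackProvider");
        System.out.println(lookup.resolve("user-service"));
        System.out.println(lookup.resolve("order-service"));
    }
}
```

Here UserFallbackProvider and order-service are made-up names used only to exercise the lookup.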