众所周知springcloud ribbon 是用来做负载均衡的。那么它是怎么做到使restTemplate拥有负载均衡呢?带着这个疑问咱们来源码分析。
本文会详细介绍阅读源码的方式。如果从多实现中找到具体实现。(非idea断点方式)
demo 配置文件
server:
port: 8081
spring:
application:
name: spring-cloud-order-service
# 没有依赖eureka注册中心 使用配置文件配置server
spring-cloud-user-service:
ribbon:
listOfServers: localhost:8082,localhost:8083
远程调用demo
@Bean
@LoadBalanced //负载均衡注解
public RestTemplate restTemplate(RestTemplateBuilderrestTemplateBuilder) {
return restTemplateBuilder.build();
}
@Autowired
private RestTemplate restTemplate;
@RequestMapping("/test")
public Object test(){
return restTemplate.getForObject("http://spring-cloud-ser-service/getUserInfo",String.class);
}
上述代码可以看到在注册restTemplate时加上了@LoadBalanced,那么LoadBalanced的作用是什么呢?
/**
* Annotation to mark a RestTemplate or WebClient bean to be configured to use a
* LoadBalancerClient.
* @author Spencer Gibb
*/
@Target({
ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Qualifier
public @interface LoadBalanced {
}
我们可以看到LoadBalanced注解其实是继承了spring注解的Qualifier。
Qualifier相当于一个标记,它可以将所有被Qualifier标记的bean统一管理。有兴趣的同学可以深入研究一下。这里不做具体阐述
LoadBalanced这个注解在哪里用到的呢?让我们来看loadBalanced的自动装配类LoadBalancerAutoConfiguration
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(RestTemplate.class)
@ConditionalOnBean(LoadBalancerClient.class)
@EnableConfigurationProperties(LoadBalancerRetryProperties.class)
public class LoadBalancerAutoConfiguration {
@LoadBalanced
@Autowired(required = false)
private List<RestTemplate> restTemplates = Collections.emptyList();
@Autowired(required = false)
private List<LoadBalancerRequestTransformer> transformers = Collections.emptyList();
// 这里留个伏笔
@Bean
public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(
final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) {
return () -> restTemplateCustomizers.ifAvailable(customizers -> {
for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
for (RestTemplateCustomizer customizer : customizers) {
customizer.customize(restTemplate);
}
}
});
}
// 省略部分代码 ...
可以看到被LoadBalanced所标记的依赖注入是个列表。这个列表里所注入的是所有被@LoadBalanced标记的RestTemplate。
让我们思考一下,如果想让我们自己实现restTemplate拥有负载均衡,那么有什么方案呢?例如拦截器?是的。ribbon也是这么做的。让我们来分析初始化ribbonInterceptor。
还是在LoadBalancerAutoConfiguration我们发现有这样一个装配
@Configuration(proxyBeanMethods = false)
@ConditionalOnMissingClass("org.springframework.retry.suport.RetryTemplate")
static class LoadBalancerInterceptorConfig {
// 初始化LoadBalancerInterceptor
@Bean
public LoadBalancerInterceptor ribbonInterceptor(
LoadBalancerClient loadBalancerClient,
LoadBalancerRequestFactory requestFactory) {
return newLoadBalancerInterceptor(loadBalancerClient,requestFactory);
}
// 将所有restTemplate设置loadBalancerInterceptor
// 这块是RestTemplateCustomizer的匿名内部类
@Bean
@ConditionalOnMissingBean
public RestTemplateCustomizer restTemplateCustomizer(
final LoadBalancerInterceptorloadBalancerInterceptor) {
return restTemplate -> {
List<ClientHttpRequestInterceptor> list = newArrayList<>(
restTemplate.getInterceptors());
list.add(loadBalancerInterceptor);
restTemplate.setInterceptors(list);
};
}
}
这是LoadBalancerAutoConfiguration的内部类。
看到这里我们在反过来看刚刚留下伏笔的Bean
// ObjectProvider<List<RestTemplateCustomizer>> ObjectProvider是延迟加载 为了上述代码加载后这里再加载
@Bean
public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(
final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) {
return () -> restTemplateCustomizers.ifAvailable(customizers -> {
for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
for (RestTemplateCustomizer customizer : customizers) {
customizer.customize(restTemplate);
}
}
});
}
restTemplateCustomizers.ifAvailable() 这个方法作用是如果找到这个RestTemplateCustomizer bean那么就执行其中的方法(lambda)
customizer.customize(restTemplate);这句话其实是调用的restTemplateCustomizer(final LoadBalancerInterceptorloadBalancerInterceptor)这个方法的匿名内部类。这里比较绕。
到这里自动装配基本就告一段落了。那么ribbonInterceptor配置完在哪里去用到呢?
@Autowired
private RestTemplate restTemplate;
@RequestMapping("/test")
public Object test(){
return restTemplate.getForObject("http://spring-cloud-ser-service/getUserInfo",String.class);
}
让我们从RestTemplate.getForObject()方法入手
RestTemplate.getForObject() 会调用doExecute
protected <T> T doExecute(URI url, @Nullable HttpMethod method, @Nullable RequestCallback requestCallback,
@Nullable ResponseExtractor<T> responseExtractor) throws RestClientException {
Assert.notNull(url, "URI is required");
Assert.notNull(method, "HttpMethod is required");
ClientHttpResponse response = null;
try {
// 创建request
ClientHttpRequest request = createRequest(url, method);
if (requestCallback != null) {
requestCallback.doWithRequest(request);
}
// 执行获取响应结果
response = request.execute();
handleResponse(url, method, response);
return (responseExtractor != null ? responseExtractor.extractData(response) : null);
}
catch (IOException ex) {
String resource = url.toString();
String query = url.getRawQuery();
resource = (query != null ? resource.substring(0, resource.indexOf('?')) : resource);
throw new ResourceAccessException("I/O error on " + method.name() +
" request for \"" + resource + "\": " + ex.getMessage(), ex);
}
finally {
if (response != null) {
response.close();
}
}
}
这个方法中有两个主线方法
了解设计模式的同学应该马上就能做出选择,没错是AbstractClientHttpRequest抽象类
@Override
public final ClientHttpResponse execute() throws IOException {
assertNotExecuted();
// 这里又是多个实现类
ClientHttpResponse result = executeInternal(this.headers);
this.executed = true;
return result;
}
// 模板方法
protected abstract ClientHttpResponse executeInternal(HttpHeaders headers, byte[] bufferedOutput)
throws IOException;
上述代码中的executeInternal(this.headers);这块又是多个实现 如下:
那么具体用的是哪个呢?这回咱们在反过来看ClientHttpRequest request = createRequest(url, method);具体返回的是哪个实现类 如下:
org.springframework.http.client.support.HttpAccessor#createRequest
protected ClientHttpRequest createRequest(URI url, HttpMethod method) throws IOException {
ClientHttpRequest request = getRequestFactory().createRequest(url, method);
initialize(request);
if (logger.isDebugEnabled()) {
logger.debug("HTTP " + method.name() + " " + url);
}
return request;
}
// 省略部分代码 ...
getRequestFactory()调用的其实是HttpAccessor子类的InterceptingHttpAccessor中的getRequestFactory()
org.springframework.http.client.support.InterceptingHttpAccessor#getRequestFactory
private final List<ClientHttpRequestInterceptor> interceptors = new ArrayList<>();
// 呼应LoadBalancerAutoConfiguration中的restTemplateCustomizer()方法
public void setInterceptors(List<ClientHttpRequestInterceptor> interceptors) {
Assert.noNullElements(interceptors, "'interceptors' must not contain null elements");
// Take getInterceptors() List as-is when passed in here
if (this.interceptors != interceptors) {
this.interceptors.clear();
this.interceptors.addAll(interceptors);
AnnotationAwareOrderComparator.sort(this.interceptors);
}
}
public ClientHttpRequestFactory getRequestFactory() {
List<ClientHttpRequestInterceptor> interceptors = getInterceptors();
if (!CollectionUtils.isEmpty(interceptors)) {
// 因自动装配这里肯定不会空
ClientHttpRequestFactory factory = this.interceptingRequestFactory;
if (factory == null) {
// 实际创建的request对象(ClientHttpRequest子类)
// 将自动装配的拦截器放list入此类中后面的讲解会用到
factory = new InterceptingClientHttpRequestFactory(super.getRequestFactory(), interceptors);
this.interceptingRequestFactory = factory;
}
return factory;
}
else {
return super.getRequestFactory();
}
}
// 省略部分代码
以上代码会发现看到了拦截器的影子。getInterceptors();现在我们回到createRequest()方法中。为了加深印象。下面的代码块是上一步骤
org.springframework.http.client.support.HttpAccessor#createRequest
protected ClientHttpRequest createRequest(URI url, HttpMethod method) throws IOException {
// createRequest调用了AbstractClientHttpRequestFactoryWrapper.createRequest(url, method);
ClientHttpRequest request = getRequestFactory().createRequest(url, method);
initialize(request);
if (logger.isDebugEnabled()) {
logger.debug("HTTP " + method.name() + " " + url);
}
return request;
}
// 省略部分代码 ...
注: 以上代码的createRequest方法又出现了多个实现类
同理先进AbstractClientHttpRequestFactoryWrapper.createRequest(url, method);
@Override
public final ClientHttpRequest createRequest(URI uri, HttpMethod httpMethod) throws IOException {
// 模板方法InterceptingClientHttpRequestFactory.createRequest(uri, httpMethod, this.requestFactory);
return createRequest(uri, httpMethod, this.requestFactory);
}
上述代码其实就是调用了InterceptingClientHttpRequestFactory.createRequest(uri, httpMethod, this.requestFactory);
看到这里是不是感觉很绕?是的spring的源码确实很绕。但是看多了,就会找到技巧了。
好了让我们回到AbstractClientHttpRequest.execute();
@Override
public final ClientHttpResponse execute() throws IOException {
assertNotExecuted();
ClientHttpResponse result = executeInternal(this.headers);
this.executed = true;
return result;
}
上述代码executeInternal又出现了多个实现
同理我们还是先进入AbstractBufferingClientHttpRequest内部
protected ClientHttpResponse executeInternal(HttpHeaders headers) throws IOException {
byte[] bytes = this.bufferedOutput.toByteArray();
if (headers.getContentLength() < 0) {
headers.setContentLength(bytes.length);
}
ClientHttpResponse result = executeInternal(headers, bytes);
this.bufferedOutput = new ByteArrayOutputStream(0);
return result;
}
// 模板方法
protected abstract ClientHttpResponse executeInternal(HttpHeaders headers, byte[] bufferedOutput)
throws IOException;
executeInternal() 方法会有以下实现类
InterceptingClientHttpRequest这个类有没有很熟悉?没错就是它。它就是createRequest返回的。所以我们进到InterceptingClientHttpRequest.executeInternal(headers, bytes);方法中
@Override
protected final ClientHttpResponse executeInternal(HttpHeaders headers, byte[] bufferedOutput) throws IOException {
InterceptingRequestExecution requestExecution = new InterceptingRequestExecution();
return requestExecution.execute(this, bufferedOutput);
}
// InterceptingClientHttpRequest的内部类
private class InterceptingRequestExecution implements ClientHttpRequestExecution {
private final Iterator<ClientHttpRequestInterceptor> iterator;
public InterceptingRequestExecution() {
this.iterator = interceptors.iterator();
}
// 重点
@Override
public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
// 将createRequest方法中放入的ClientHttpRequestInterceptor遍历
// 不为空执行拦截器
if (this.iterator.hasNext()) {
ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
return nextInterceptor.intercept(request, body, this);
}
// 默认走此方法
else {
HttpMethod method = request.getMethod();
Assert.state(method != null, "No standard HTTP method");
ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), method);
request.getHeaders().forEach((key, value) -> delegate.getHeaders().addAll(key, value));
if (body.length > 0) {
if (delegate instanceof StreamingHttpOutputMessage) {
StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage) delegate;
streamingOutputMessage.setBody(outputStream -> StreamUtils.copy(body, outputStream));
}
else {
StreamUtils.copy(body, delegate.getBody());
}
}
return delegate.execute();
}
}
}
上述重点代码我们会看到遍历后会执行nextInterceptor.intercept(request, body, this);实现类如下
会调用LoadBalancerInterceptor.intercept(request, body, this);为什么呢?
因为LoadBalancerAutoConfiguration自动装配装配的就是LoadBalancerInterceptor。大家可以回顾一下。
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
final ClientHttpRequestExecution execution) throws IOException {
final URI originalUri = request.getURI();
String serviceName = originalUri.getHost();
Assert.state(serviceName != null,
"Request URI does not contain a valid hostname: " + originalUri);
return this.loadBalancer.execute(serviceName,
this.requestFactory.createRequest(request, body, execution));
}
以上代码重点在this.loadBalancer.execute方法我们直接看
org.springframework.cloud.netflix.ribbon.RibbonLoadBalancerClient#execute(java.lang.String, org.springframework.cloud.client.loadbalancer.LoadBalancerRequest, java.lang.Object)
public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint)
throws IOException {
ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
// 负载均衡(重点)
Server server = getServer(loadBalancer, hint);
if (server == null) {
throw new IllegalStateException("No instances available for " + serviceId);
}
RibbonServer ribbonServer = new RibbonServer(serviceId, server,
isSecure(server, serviceId),
serverIntrospector(serviceId).getMetadata(server));
return execute(serviceId, ribbonServer, request);
}
上述代码getServer就是用负载均衡规则来获取服务地址的。
我们直接来看com.netflix.loadbalancer.BaseLoadBalancer#chooseServer
private final static IRule DEFAULT_RULE = new RoundRobinRule();
protected IRule rule = DEFAULT_RULE;
public Server chooseServer(Object key) {
if (counter == null) {
counter = createCounter();
}
counter.increment();
if (rule == null) {
return null;
} else {
try {
// 根据不同规则获取
return rule.choose(key);
} catch (Exception e) {
logger.warn("LoadBalancer [{}]: Error choosing server for key {}", name, key, e);
return null;
}
}
}
上述代码可以看出来默认使用RoundRobinRule轮训规则进行负载
那么它是怎么知道所以注册的机器呢?
public class BaseLoadBalancer extends AbstractLoadBalancer implements
PrimeConnections.PrimeConnectionListener, IClientConfigAware {
private static Logger logger = LoggerFactory
.getLogger(BaseLoadBalancer.class);
private final static IRule DEFAULT_RULE = new RoundRobinRule();
private final static SerialPingStrategy DEFAULT_PING_STRATEGY = new SerialPingStrategy();
private static final String DEFAULT_NAME = "default";
private static final String PREFIX = "LoadBalancer_";
protected IRule rule = DEFAULT_RULE;
protected IPingStrategy pingStrategy = DEFAULT_PING_STRATEGY;
protected IPing ping = null;
// 监听
@Monitor(name = PREFIX + "AllServerList", type = DataSourceType.INFORMATIONAL)
protected volatile List<Server> allServerList = Collections
.synchronizedList(new ArrayList<Server>());
// 监听
@Monitor(name = PREFIX + "UpServerList", type = DataSourceType.INFORMATIONAL)
protected volatile List<Server> upServerList = Collections
.synchronizedList(new ArrayList<Server>());
// 部分代码 ...
上述代码中的两个监听证明了有多少机器是UpServerList和全部机器allServerList
至于如何监听本章节不做阐述。留在注册中心eureka中做说明。
LoadBalancerRequest.intercept()
public ListenableFuture<ClientHttpResponse> intercept(final HttpRequest request,
final byte[] body, final AsyncClientHttpRequestExecution execution)
throws IOException {
final URI originalUri = request.getURI();
String serviceName = originalUri.getHost();
return this.loadBalancer.execute(serviceName,
new LoadBalancerRequest<ListenableFuture<ClientHttpResponse>>() {
@Override
public ListenableFuture<ClientHttpResponse> apply(
final ServiceInstance instance) throws Exception {
HttpRequest serviceRequest = new ServiceRequestWrapper(request,
instance, AsyncLoadBalancerInterceptor.this.loadBalancer);
return execution.executeAsync(serviceRequest, body);
}
});
}
通过以上讲解我们发现其实ribbon就是用的拦截器对restTemplate做了支撑。如果有拦截器。那么就会执行拦截器中的逻辑。如果没有那么就会走正常逻辑(无负载等)下面我为大家整理一下主线流程
文章浏览阅读1k次。前言现在直播平台由于弹幕的存在,主播与观众可以更轻松地进行互动,非常受年轻群众的欢迎。斗鱼TV就是一款非常流行的直播平台,弹幕更是非常火爆。看到有不少主播接入 弹幕语音播报器、 弹幕点歌等模块,这都需要首先连接斗鱼弹幕。经常看到其它编程语言的开发者,分享了他们斗鱼弹幕客户端的代码。.NET当然也能做,还能做得更好(只是不知为何很少见人分享????)。本文将包含以下内容:我将使用斗鱼TV官方公开的弹幕PD..._斗鱼弹幕服务器
文章浏览阅读825次。11-22晚上举行的华创杯决赛。提前看了决赛前十的作品,觉得自己前六的概率应该_华创杯个人心得
文章浏览阅读4.1w次,点赞71次,收藏393次。摘要关联分析,用于发现隐藏在大型数据集中的有意义的联系。这种联系反映一个事物与其他事物之间的相互依存性和关联性。如果两个或者多个事物之间存在一定的关联关系,那么,其中一个事物就能够通过其他事物预测到。超市在运营中保存了交易明细帐数据,考虑根据顾客购买商品的情况,分析商品购买之间的关联,从而为超市提供合理的建议。本次试验主要有两个分析方向,分别是分析商品之间的潜在联系和分析顾客可能还会购..._spssmodeler关联分析
文章浏览阅读1.2k次。MicroPython在官方网站上提供了一个在线测试的环境,可以让我们通过浏览器去运行和体验MicroPython。这个在线演示环境可以运行各种例程,查看各种外设和功能模块,如LED、GPIO、ADC、按键、舵机驱动、延时、数学计算等,可以看到LED的变化,但是不支持I2C、SPI、UART、定时器等硬件功能,因为这个在线演示是通过QEMU进行软件仿真的,并不是真实开发板运行(早期的在线..._pyboard
文章浏览阅读93次。#对于库的基本操作#展示所有的数据库#show databases;#创建一个数据库CREATE DATABASE [IF NOT EXISTS]test42;#删除数据库DROP DATABASE test42;#使用数据库USE test;#对于表的基本操作#修改表名ALTER TABLE student RENAME AS student1;#添加字段//新起一行ALTER TABLE student1 ADD s1 VARCHAR(30);#查看表DESC studen_myslq老版本修改事务语句
文章浏览阅读162次。房价预测是一个经典的机器学习问题,需要充分考虑数据的特点和问题的特点,选择合适的模型和特征,并进行充分的训练和优化,才能得到一个高性能的预测模型。通过学习gitee上的资料和一些博主的博客,我对于数据收集和预处理、特征选择和特征工程、模型选择和训练、模型评估和优化有了整体的认识,并使用Intel Distribution for Python实现了房价预测算法的实现。_oneapi机器学习预测淡水质量
文章浏览阅读1.3k次。堆栈的区别管理方式不同:栈:栈区空间由操作系统分配与释放,用于存储局部变量、函数参数等。堆:堆区空间由程序员自主分配与释放。空间大小不同:栈:栈的大小是固定的,不同的操作系统也不同。window一般为2M,linux下为10M堆:理论上可以分配虚拟地址空间大小的内存。分配效率不同:栈分配空间的效率更高。栈的擦偶哦在硬件层提供支持。分配专门的寄存器来存储栈的地址,压栈出..._栈的大小是固定的吗
文章浏览阅读182次。深度模型有高超的学习(数据拟合)能力,但为了提高模型的泛化能力、让模型在新的数据上也有好的表现,我们需要寻找一些方法干扰模型的训练过程,避免模型“记住”训练数据,这就是正则化。本次报告邀..._魏哲巍清华交叉信息研究院
文章浏览阅读8.5k次。SEO优化扫我一、服务器无法连接远程桌面1、Ping不通IP,网站打不开,不可以远程连接。可能是服务器死机了,或者网络有问题,请尝试Web重启服务器或联系服务商确认。2、Ping正常,网站可以打开,远程桌面无法连接,请尝试Web重启服务器或者联系服务商确认。另外你是否修改了远程桌面端口,而没有在防火墙例外该端口.3、终端服务器超出了最大允许连接数,Windows 2003 系统默认可以同时登陆2个..._windows 开启了远程桌面服务还是无法远程
文章浏览阅读1k次。https://blog.csdn.net/lvoyee/article/details/82017057_spring factories enableautoconfiguration
文章浏览阅读762次。1、JSON解析2、异步请求3、网络通信小结4、黑酷例子5、XML解析6、XML小结7、POST请求8、HTTP底层通信9、HTTP通信小结10、请求超时 & URL转码11、发送JSON给服务器12、多值参数
文章浏览阅读81次。Android UI基础之五大布局 Android的界面是有布局和组件协同完成的,布局好比是建筑里的框架,而组件则相当于建筑里的砖瓦。组件按照布局的要求依次排列,就组成了用户所看见的界面。Android的五大布局分别是LinearLayout(线性布局)、FrameLayout(单帧布局)、RelativeLayout(相对布局)、AbsoluteLayout(绝对布局,An..._android界面元素主要由5部分构成()