前
最近在看dubbo源码,记录一下dubbo的核心流程,即服务暴露和服务引用
正
服务暴露
服务暴露的入口在ServiceConfig#export()
,经过一系列的读取配置后进入doExportUrls()
方法,
在该方法中,先加载注册中心的URL,然后进入doExportUrlsFor1Protocol()
方法中,
该方法中,首先进行对应服务接口的方法的配置,然后跳过一系列的校验判断,到达服务暴露的部分,首先是遍历注册中心url,每一个url生成一个Invoker,创建Exporter,添加到全局变量数组中。
上述创建invoker的代码如下:
// 使用 ProxyFactory 创建 Invoker 对象
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
// 创建 DelegateProviderMetaDataInvoker 对象
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
// 使用 Protocol 暴露 Invoker 对象
Exporter<?> exporter = protocol.export(wrapperInvoker);
// 添加到 `exporters`
exporters.add(exporter);
使用protocal暴露invoker对象
protocol是接口Protocol的实现类,这个接口比较核心,接口内容如下
@SPI("dubbo")
public interface Protocol {
/**
* Get default port when user doesn't config the port.
*
* @return default port
*/
int getDefaultPort();
/**
* Export service for remote invocation: <br>
* 1. Protocol should record request source address after receive a request:
* RpcContext.getContext().setRemoteAddress();<br>
* 2. export() must be idempotent, that is, there's no difference between invoking once and invoking twice when
* export the same URL<br>
* 3. Invoker instance is passed in by the framework, protocol needs not to care <br>
*
* @param <T> Service type
* @param invoker Service invoker
* @return exporter reference for exported service, useful for unexport the service later
* @throws RpcException thrown when error occurs during export the service, for example: port is occupied
*/
/**
* 暴露远程服务:<br>
* 1. 协议在接收请求时,应记录请求来源方地址信息:RpcContext.getContext().setRemoteAddress();<br>
* 2. export() 必须是幂等的,也就是暴露同一个 URL 的 Invoker 两次,和暴露一次没有区别。<br>
* 3. export() 传入的 Invoker 由框架实现并传入,协议不需要关心。<br>
*
* @param <T> 服务的类型
* @param invoker 服务的执行体
* @return exporter 暴露服务的引用,用于取消暴露
* @throws RpcException 当暴露服务出错时抛出,比如端口已占用
*/
@Adaptive
<T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
/**
* Refer a remote service: <br>
* 1. When user calls `invoke()` method of `Invoker` object which's returned from `refer()` call, the protocol
* needs to correspondingly execute `invoke()` method of `Invoker` object <br>
* 2. It's protocol's responsibility to implement `Invoker` which's returned from `refer()`. Generally speaking,
* protocol sends remote request in the `Invoker` implementation. <br>
* 3. When there's check=false set in URL, the implementation must not throw exception but try to recover when
* connection fails.
*
* @param <T> Service type
* @param type Service class
* @param url URL address for the remote service
* @return invoker service's local proxy
* @throws RpcException when there's any error while connecting to the service provider
*/
/**
* 引用远程服务:<br>
* 1. 当用户调用 refer() 所返回的 Invoker 对象的 invoke() 方法时,协议需相应执行同 URL 远端 export() 传入的 Invoker 对象的 invoke() 方法。<br>
* 2. refer() 返回的 Invoker 由协议实现,协议通常需要在此 Invoker 中发送远程请求。<br>
* 3. 当 url 中有设置 check=false 时,连接失败不能抛出异常,并内部自动恢复。<br>
*
* @param <T> 服务的类型
* @param type 服务的类型
* @param url 远程服务的URL地址
* @return invoker 服务的本地代理
* @throws RpcException 当连接服务提供方失败时抛出
*/
@Adaptive
<T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
/**
* Destroy protocol: <br>
* 1. Cancel all services this protocol exports and refers <br>
* 2. Release all occupied resources, for example: connection, port, etc. <br>
* 3. Protocol can continue to export and refer new service even after it's destroyed.
*/
/**
* 释放协议:<br>
* 1. 取消该协议所有已经暴露和引用的服务。<br>
* 2. 释放协议所占用的所有资源,比如连接和端口。<br>
* 3. 协议在释放后,依然能暴露和引用新的服务。<br>
*/
void destroy();
}
实现中中比较重要的有ProtocolFilterWrapper
,RegistryProtocol
,DubboProtocol
,第一个是一个包装类,会创建filter的调用链,或者打到refistryProtocol实例中,第二个主要处理向注册中心注册的工作,主流程在第三个中。
dubboProtocol中有以下几步骤
- 创建dubboExporter,放入缓存的map中
- 处理本地存根的问题
- 启动服务器
- 初始化序列化优化器
DubboExporter内容比较简单,就是一个包装类,内部加了一个unExport()的方法
本地存根先不看
启动服务器,这里会创建一个ExchangeServer实例,然后内部创建Transport实例, 默认实例就是NettyTransport,内部是NettyServer
下面就是netty的通信流程类, 具体的业务接口是channelHandler,真正做业务的实现类还是在DubboProtocol里,其余实现类基本上都是装饰器作用,例如多线程的处理器(ExecutionChannelHandler),多消息处理器(MutiMessageHandler)等等。
nettyServer
可以看到服务暴露最后到了NettyServer(不一定,这个是默认实现),前面说到了,真正的业务处理只有一处,我们详细看下DubboProtocol业务处理方法
核心方法是reply()方法,
它的invoker就是具体的逻辑,找到对应的exporter,然后拿到具体实现类的代理类invoker,调用方法,获取结果,通过netty返回相应。
这部分是服务引用到服务端之后的操作。
服务引用
服务引用的入口在ReferenceConfig#get()
,进入后检查是否有引用ref,如果没有进行初始化init()
init内部,首先还是一些读取配置和校验的工作,然后到ref = createProxy(map)
,创建引用
方法内部主要进行url的判断和读取设置,这个url可能是直连地址的,也可以是注册中心的,然后根据获取的url拿到Invoker,最后到达proxyFactory.getProxy(invoker),创建代理先不说,说拿到Invoker
Invoker的获取在这一行
refprotocol.refer()方法,refprotocol还是接口Protocol的实现类
和服务暴露类似,还是经过
ProtocolFilterWrapper,
RegistryProtocol后到达DubboProtocol,
在
DubboProtocol#refer`方法中,创建了一个DubboInvoker,在这个Invoker中的doInvoke进行真正的调用
doInvoke方法中,区分三种调用,oneway,同步,异步,根据操作都是根据传入的client实例调用request方法
看一下client的实例,回到DubboProtocol类,找到getClient方法,然后根据是否使用共享客户端选择进入getShardClient方法或者直接生成client
看一下getShardClient方法,首先在一个缓存map中寻找,找到了并且没有关闭则增加client引用计数然后返沪,关闭了则移出该client,然后走下去,下面就是加锁创建client
然后看一下直接生成client,该方法会创建一个ExchangeClient实例,该实例底层还是获取一个Exchanger接口的实例。
然后获得Transport的实例,nettyClient,调用channelHandler的方法。
这个channelHandler经过DecoderHandler将请求编码,到达HeaderExchangeHandler,发送消息
Exchangers
这里着重说一下Exchanger接口,它有一个工厂类,是门面模式,两个主要方法bind&connect,如果是服务端创建ExchangerServer,就调用bind,客户端创建ExchangerClient,就调用connect。
而Exchangers内部呢,就会使用Exchanger的实现类HeaderExchanger,这个类内部呢,就会创建Transport实例,服务端就是nettyServer,客户端是nettyClient