从源码全面解析 dubbo 服务订阅的来龙去脉
一、引言
对于 Java
开发者而言,关于 dubbo
,我们一般当做黑盒来进行使用,不需要去打开这个黑盒。
但随着目前程序员行业的发展,我们有必要打开这个黑盒,去探索其中的奥妙。
本期 dubbo
源码解析系列文章,将带你领略 dubbo
源码的奥秘
本期源码文章吸收了之前 Spring
、Kakfa
、JUC
源码文章的教训,将不再一行一行的带大家分析源码,我们将一些不重要的部分当做黑盒处理,以便我们更快、更有效的阅读源码。
虽然现在是互联网寒冬,但乾坤未定,你我皆是黑马!
废话不多说,发车!
二、消费者订阅服务
我们当时留了一个 EnableDubboConfig
注解里面的 ReferenceAnnotationBeanPostProcessor
方法
1、消费端配置
@DubboReference(protocol = "dubbo", timeout = 100)
private IUserService iUserService;
从这个配置我们可以得出一个信息,Spring
不会自动将 IUserService
注入 Bean
工厂中
当然这句话也是一个废话,人家 Dubbo
自定义的注解,Spring
怎么可能扫描到......
而 ReferenceAnnotationBeanPostProcessor
这个方法是消费端扫描 @Reference
使用的
本篇将正式的介绍下消费端是如何订阅我们服务端注册在 Zookeeper
上的服务的
2、扫描注解
我们先看这个类的实现:
public class ReferenceAnnotationBeanPostProcessor extends AbstractAnnotationBeanPostProcessor
implements ApplicationContextAware, BeanFactoryPostProcessor {}
实现了 BeanFactoryPostProcessor
接口,这个时候如果看过博主的 Spring
的源码系列文章,DNA
应该已经开始活动了
没错,基本上这个接口就是为了往我们的 BeanDefinitionMap
里面注册 BeanDefinition
信息的
想必到这里,就算我们不看源码,也能猜到
这个哥们绝对是将 @DubboReference
的注解扫描封装成 BeanDefinition
注册至 BeanDefinitionMap
的
我们直接看源码
@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory){
// 拿到当前Spring工厂所有的bean名称
String[] beanNames = beanFactory.getBeanDefinitionNames();
for (String beanName : beanNames) {
// 获取bean的类型
beanType = beanFactory.getType(beanName);
// 省略一些代码
if (beanType != null) {
// 获取元数据信息
AnnotatedInjectionMetadata metadata = findInjectionMetadata(beanName, beanType, null);
// 解析@DubboReference注解并注册至BeanDefinitionMap中
prepareInjection(metadata);
}
}
}
我们详细看看这个 prepareInjection
方法是如何解析的 @DubboReference
注解并做了一些什么特殊的操作
protected void prepareInjection(AnnotatedInjectionMetadata metadata) throws BeansException {
for (AnnotatedFieldElement fieldElement : metadata.getFieldElements()) {
Class<?> injectedType = fieldElement.field.getType();
// 配置的参数
AnnotationAttributes attributes = fieldElement.attributes;
// 解析&注册
String referenceBeanName = registerReferenceBean(fieldElement.getPropertyName(), injectedType, attributes, fieldElement.field);
}
}
}
这个 registerReferenceBean
里面的逻辑较多,我们只取最关键的,感兴趣的朋友也可以自己去看一看
public String registerReferenceBean(String propertyName, Class<?> injectedType, Map<String, Object> attributes, Member member){
RootBeanDefinition beanDefinition = new RootBeanDefinition();
// ReferenceBean.class.getName() = org.apache.dubbo.config.spring.ReferenceBean
beanDefinition.setBeanClassName(ReferenceBean.class.getName());
beanDefinition.getPropertyValues().add(ReferenceAttributes.ID, referenceBeanName);
// referenceProps = 配置的信息
// interfaceName = com.common.service.IUserService
// interfaceClass = interface com.common.service.IUserService
beanDefinition.setAttribute(Constants.REFERENCE_PROPS, attributes);
beanDefinition.setAttribute(ReferenceAttributes.INTERFACE_CLASS, interfaceClass);
beanDefinition.setAttribute(ReferenceAttributes.INTERFACE_NAME, interfaceName);
GenericBeanDefinition targetDefinition = new GenericBeanDefinition();
targetDefinition.setBeanClass(interfaceClass);
beanDefinition.setDecoratedDefinition(new BeanDefinitionHolder(targetDefinition, referenceBeanName + "_decorated"));
beanDefinition.setAttribute(Constants.OBJECT_TYPE_ATTRIBUTE, interfaceClass);
// 注册至BeanDefinitionMap中
beanDefinitionRegistry.registerBeanDefinition(referenceBeanName, beanDefinition);
referenceBeanManager.registerReferenceKeyAndBeanName(referenceKey, referenceBeanName);
return referenceBeanName;
}
到这里,我们的 ReferenceAnnotationBeanPostProcessor
方法将 @DubboReference
扫描组装成 BeanDefinition
注册到了 BeanDefinitionMap
中
3、创建代理对象
在我们上面注册的时候,有这么一行代码:
beanDefinition.setBeanClassName(ReferenceBean.class.getName());
表明我们当前注册的 Bean
的 Class
类型为 org.apache.dubbo.config.spring.ReferenceBean
当我们的 Spring
去实例化 BeanDefinitionMap
中的对象时,这个时候会调用 ReferenceBean
的 getObject
方法
Spring 在实例化时会获取每一个
BeanDefinition
的 Object,不存在则创建
我们发现,在 ReferenceBean
里面实际上是重写了 getObject
的方法:
public T getObject() {
if (lazyProxy == null) {
createLazyProxy();
}
return (T) lazyProxy;
}
private void createLazyProxy() {
// 创建代理对象
ProxyFactory proxyFactory = new ProxyFactory();
proxyFactory.setTargetSource(new DubboReferenceLazyInitTargetSource());
proxyFactory.addInterface(interfaceClass);
Class<?>[] internalInterfaces = AbstractProxyFactory.getInternalInterfaces();
for (Class<?> anInterface : internalInterfaces) {
proxyFactory.addInterface(anInterface);
}
// 进行动态代理(生成动态代理的对象)
// 这里动态代理用的是JdkDynamicAopProxy
this.lazyProxy = proxyFactory.getProxy(this.beanClassLoader);
}
我们看下 JdkDynamicAopProxy
里面做了什么?
final class JdkDynamicAopProxy implements AopProxy, InvocationHandler, Serializable {
// proxyFactory.setTargetSource(new DubboReferenceLazyInitTargetSource());
// 这里的targetSource = DubboReferenceLazyInitTargetSource
TargetSource targetSource = this.advised.targetSource;
Object target = targetSource.getTarget()
}
public synchronized Object getTarget() throws Exception {
// 第一次为null,未初始化
if (this.lazyTarget == null) {
logger.debug("Initializing lazy target object");
this.lazyTarget = createObject();
}
return this.lazyTarget;
}
我们看下 DubboReferenceLazyInitTargetSource
的 createObject
// 第一次调用时,会初始化该方法
private class DubboReferenceLazyInitTargetSource extends AbstractLazyCreationTargetSource {
@Override
protected Object createObject() throws Exception {
return getCallProxy();
}
@Override
public synchronized Class<?> getTargetClass() {
return getInterfaceClass();
}
}
这个 getCallProxy
就是我们订阅服务的地方
4、订阅服务
当我们初始化完毕之后,在我们第一次调用的时候,会调用 getCallProxy()
该方法,去进行服务的订阅,这里会执行该方法:
private class DubboReferenceLazyInitTargetSource extends AbstractLazyCreationTargetSource {
@Override
protected Object createObject() throws Exception {
return getCallProxy();
}
@Override
public synchronized Class<?> getTargetClass() {
return getInterfaceClass();
}
}
private Object getCallProxy() throws Exception {
synchronized(((DefaultSingletonBeanRegistry)getBeanFactory()).getSingletonMutex()) {
// 获取reference
return referenceConfig.get();
}
}
我们继续向下看,这里会来到 ReferenceConfig
的 init
方法
protected synchronized void init() {
ref = createProxy(referenceParameters);
}
private T createProxy(Map<String, String> referenceParameters) {
// 1、监听注册中心
// 2、本地保存服务
createInvokerForRemote();
URL consumerUrl = new ServiceConfigURL(CONSUMER_PROTOCOL, referenceParameters.get(REGISTER_IP_KEY), 0,
referenceParameters.get(INTERFACE_KEY), referenceParameters);
consumerUrl = consumerUrl.setScopeModel(getScopeModel());
consumerUrl = consumerUrl.setServiceModel(consumerModel);
MetadataUtils.publishServiceDefinition(consumerUrl, consumerModel.getServiceModel(), getApplicationModel());
// 创建代理类
return (T) proxyFactory.getProxy(invoker, ProtocolUtils.isGeneric(generic));
}
4.1 监听注册中心
private void createInvokerForRemote() {
if (urls.size() == 1) {
// URL:
// registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-consumer&dubbo=2.0.2&pid=2532&qos.enable=true®istry=zookeeper&release=3.1.8×tamp=1686063555583
URL curUrl = urls.get(0);
invoker = protocolSPI.refer(interfaceClass, curUrl);
}
}
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
// zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-consumer&dubbo=2.0.2&pid=14952&qos.enable=true&release=3.1.8×tamp=1686063658064
url = getRegistryUrl(url);
Registry registry = getRegistry(url);
if (RegistryService.class.equals(type)) {
return proxyFactory.getInvoker((T) registry, type, url);
}
// group="a,b" or group="*"
Map<String, String> qs = (Map<String, String>) url.getAttribute(REFER_KEY);
String group = qs.get(GROUP_KEY);
if (StringUtils.isNotEmpty(group)) {
if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
return doRefer(Cluster.getCluster(url.getScopeModel(), MergeableCluster.NAME), registry, type, url, qs);
}
}
Cluster cluster = Cluster.getCluster(url.getScopeModel(), qs.get(CLUSTER_KEY));
return doRefer(cluster, registry, type, url, qs);
}
这里的 QS
如下:
我们直接跳到 MigrationRuleHandler
的 doMigrate
中:
public synchronized void doMigrate(MigrationRule rule) {
MigrationStep step = MigrationStep.APPLICATION_FIRST;
float threshold = -1f;
step = rule.getStep(consumerURL);
threshold = rule.getThreshold(consumerURL);
if (refreshInvoker(step, threshold, rule)) {
setMigrationRule(rule);
}
}
在这个 refreshInvoker
里面,会判断当前注册的方式
private boolean refreshInvoker(MigrationStep step, Float threshold, MigrationRule newRule) {
MigrationStep originStep = currentStep;
if ((currentStep == null || currentStep != step) || !currentThreshold.equals(threshold)) {
boolean success = true;
switch (step) {
// 接口&应用
case APPLICATION_FIRST:
migrationInvoker.migrateToApplicationFirstInvoker(newRule);
break;
// 应用
case FORCE_APPLICATION:
success = migrationInvoker.migrateToForceApplicationInvoker(newRule);
break;
// 接口
case FORCE_INTERFACE:
default:
success = migrationInvoker.migrateToForceInterfaceInvoker(newRule);
}
return success;
}
return true;
}
我们本次只讲 接口注册
,我们直接跳到:ZookeeperRegistry
的 doSubscribe
方法
public void doSubscribe(final URL url, final NotifyListener listener) {
List<URL> urls = new ArrayList<>();
for (String path : toCategoriesPath(url)) {
// 创建目录
zkClient.create(path, false, true);
// 增加监听
// 1、/dubbo/com.msb.common.service.IUserService/providers
// 2、/dubbo/com.msb.common.service.IUserService/configurators
// 3、/dubbo/com.msb.common.service.IUserService/routers
List<String> children = zkClient.addChildListener(path, zkListener);
if (children != null) {
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
// 将Zookeeper服务保存
notify(url, listener, urls);
}
4.2 本地保存服务
直接跳到 AbstractRegistry
的 doSaveProperties
方法
- 创建文件
- 将服务端数据存入文件中
public void doSaveProperties(long version) {
File lockfile = null;
// 创建文件
lockfile = new File(file.getAbsolutePath() + ".lock");
tmpProperties = new Properties();
Set<Map.Entry<Object, Object>> entries = properties.entrySet();
for (Map.Entry<Object, Object> entry : entries) {
tmpProperties.setProperty((String) entry.getKey(), (String) entry.getValue());
}
try (FileOutputStream outputFile = new FileOutputStream(file)) {
tmpProperties.store(outputFile, "Dubbo Registry Cache");
}
}
这里存储的数据如下:简单理解,各种服务端的信息
com.common.service.IUserService -> empty://192.168.0.103/com.common.service.IUserService?application=dubbo-consumer&background=false&category=routers&dubbo=2.0.2&interface=com.msb.common.service.IUserService&lazy=true&methods=getUserById&pid=13528&protocol=dubbo&qos.enable=true&release=3.1.8&side=consumer&sticky=false&timeout=100×tamp=1686064969935&unloadClusterRelated=false empty://192.168.0.103/com.msb.common.service.IUserService?application=dubbo-consumer&background=false&category=configurators&dubbo=2.0.2&interface=com.msb.common.service.IUserService&lazy=true&methods=getUserById&pid=13528&protocol=dubbo&qos.enable=true&release=3.1.8&side=consumer&sticky=false&timeout=100×tamp=1686064969935&unloadClusterRelated=false dubbo://192.168.0.103:20883/com.msb.common.service.IUserService?anyhost=true&application=dubbo-provider&background=false&check=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=com.msb.common.service.IUserService&metadata-type=remote&methods=getUserById®ister-mode=inter
如果后续我们的注册中心(Zookeeper)挂掉之后,我们的系统从本地磁盘读取服务信息也可以正常通信。
只是没有办法及时更新服务
4.3 创建动态代理类
private T createProxy(Map<String, String> referenceParameters) {
// 省略代码
// create service proxy
return (T) proxyFactory.getProxy(invoker, ProtocolUtils.isGeneric(generic));
}
这里我们直接跳到 JavassistProxyFactory
的 getProxy
方法
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
这里直接创建代理类,当我们去调用 InvokerInvocationHandler
这个方法
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
if (method.getDeclaringClass() == Object.class) {
return method.invoke(invoker, args);
}
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
if (parameterTypes.length == 0) {
if ("toString".equals(methodName)) {
return invoker.toString();
} else if ("$destroy".equals(methodName)) {
invoker.destroy();
return null;
} else if ("hashCode".equals(methodName)) {
return invoker.hashCode();
}
} else if (parameterTypes.length == 1 && "equals".equals(methodName)) {
return invoker.equals(args[0]);
}
RpcInvocation rpcInvocation = new RpcInvocation(serviceModel, method.getName(), invoker.getInterface().getName(), protocolServiceKey, method.getParameterTypes(), args);
if (serviceModel instanceof ConsumerModel) {
rpcInvocation.put(Constants.CONSUMER_MODEL, serviceModel);
rpcInvocation.put(Constants.METHOD_MODEL, ((ConsumerModel) serviceModel).getMethodModel(method));
}
return InvocationUtil.invoke(invoker, rpcInvocation);
}
三、流程图
四、总结
鲁迅先生曾说:独行难,众行易,和志同道合的人一起进步。彼此毫无保留的分享经验,才是对抗互联网寒冬的最佳选择。
其实很多时候,并不是我们不够努力,很可能就是自己努力的方向不对,如果有一个人能稍微指点你一下,你真的可能会少走几年弯路。
我是爱敲代码的小黄,独角兽企业的Java开发工程师,CSDN博客专家,喜欢后端架构和中间件源码。
我们下期再见。
#实习##面试##Java#我从清晨走过,也拥抱夜晚的星辰,人生没有捷径,你我皆平凡,你好,陌生人,一起共勉。