资讯 小学 初中 高中 语言 会计职称 学历提升 法考 计算机考试 医护考试 建工考试 教育百科
栏目分类:
子分类:
返回
空麓网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
空麓网 > 计算机考试 > 软件开发 > 后端开发 > Java

Dubbo源码篇03---点点直连如何实现及背后原理

Java 更新时间: 发布时间: 计算机考试归档 最新发布

Dubbo源码篇03---点点直连如何实现及背后原理

Dubbo源码篇03---从点点直连探究Complier编译的原理

  • 什么是点点直连
  • 实际需求
    • 如何实现动态编译?
    • 如何发起调用?
      • 点点直连原理
      • 实现点点直连
        • 消费端
        • 提供端
        • 测试
  • 点点直连小结

什么是点点直连

Dubbo正常的请求模型,都是消费端从注册中心拉取服务提供者列表,然后采用适当的负载均衡策略,挑选出一个服务提供者URL,随机发起请求。

但是,Dubbo也给我们提供了一种方式,可以在没有注册中心的时候,直接使用提前设置好的URL发起请求,或者在有注册的中心的时候,绕过注册中心,使用设置好的URL发起请求,这种方式也被称为点点直连。

那么点点直连在实际项目开发过程中,究竟有没有用处呢?

下面我们跟随着实际需求的视角,具体来看看吧。


实际需求


订单系统这边由于入库订单的状态异常,导致该笔订单消息及时无法推送到供应商系统,从而阻碍了该笔订单在供应商侧的功能运转。

为了争取最短时间内恢复这笔订单的功能运转,我们需要尽快修改这条推送记录在数据库的状态,此时我们可能会想到以下几个做法:

  1. 通过编写update语句直接修改线上那条出现问题的记录,但是通常一家公司中的数据订正流程会很繁琐,耗时较长,并非这里的最佳选择
  2. 利用Web服务器后台的日志,重放一遍用户的请求,但是问题在于并不是所有场景都能根据重放用户请求解决,需要根据具体业务场景进行抉择
  3. 在深入理解JVM第三版一书中曾介绍过使用java类加载器提供的热更新能力实现动态调试线上服务的实现,我们能否借鉴这一思路,编写调用DAO层完成记录状态更新的代码,然后通过暴露出来的调试接口,将代码上传,然后利用类加载器提供的热更新技术动态加载类,然后调用调试方法完成订单状态更新呢?

如何实现动态编译?

Java代码从编译到执行的流程如下所示:


开发者编写的“Java 源代码”被编译后变成 class 字节码文件,然后字节码文件被 JVM 加载,直到变成可使用的类。

在这样的开发过程中,动态编译一般有两种方式:

  • 自主编码实现,比如通过 Runtime 调用 javac,或者通过 JavaCompile 调用 run。
  • 调用插件实现,比如使用市面上常用的 groovy-all.jar 插件。

关于热更新技术的原理可以阅读我之前写的两篇文章:

  • JAVA实现代码热更新
  • Groovy实现热部署

出于简单性考虑,本文使用groovy插件实现java代码的动态编译。


如何发起调用?

由于需要将用于修复的代码上传到生产环境的机器上执行,因此每一个生产环境服务都需要对外暴露一个接口,用于接收动态调试请求:

由于修复代码需要上传到生产环境执行,因此为了避免引发不必要的产线事故,我们一般会拿某台机器节点做个小范围的验证,也就是说,这里需要用到一开始讲到的点点直连技术。

那么下一个问题就来了,如何实现点点直连呢?


点点直连原理

Dubbo在ReferenceConfig的父类ReferenceConfigBase类中提供了一个名为Url的字段:

public abstract class ReferenceConfigBase extends AbstractReferenceConfig {        protected String url;    ....

那么该字段的构成规则是怎样的呢? 又是如何起的作用的呢?

下面我们来简单追踪一下url被使用到的地方:

  • 当消费者端服务启动时,会为指定的服务接口创建一个代理,创建代理需要用到的客户端配置参数由ReferenceConfig负责提供,因此创建代理的动作也是在ReferenceConfig内部的createProxy方法内完成的
   //ReferenceConfig   private T createProxy(Map referenceParameters) {        ...          // 是否配置了客户端用户点点直连的Url          if (StringUtils.isNotEmpty(url)) {                //如果消费者端配置了url属性,那么dubbo会认为该rul是一个点对点地址,或者是一个注册中心的地址                parseUrl(referenceParameters);          } else {                // dubbo走从注册中心拉取服务提供者url那套逻辑                if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())) {                    aggregateUrlFromRegistry(referenceParameters);                }         }        createInvokerForRemote();        ...        URL consumerUrl = new ServiceConfigURL(CONSUMER_PROTOCOL, referenceParameters.get(REGISTER_IP_KEY), 0,            referenceParameters.get(INTERFACE_KEY), referenceParameters);        consumerUrl = consumerUrl.setScopeModel(getScopeModel());        consumerUrl = consumerUrl.setServiceModel(consumerModel);        MetadataUtils.publishServiceDefinition(consumerUrl, consumerModel.getServiceModel(), getApplicationModel());        // create service proxy        return (T) proxyFactory.getProxy(invoker, ProtocolUtils.isGeneric(generic));    }
  • referenceParameters保存了客户端各种配置

parseUrl方法负责解析用户配置的点对点直连URL:

    //ReferenceConfig    private void parseUrl(Map referenceParameters) {        //按照空格,;对消费者端设置的url进行切分,这里说明一个url属性中,我们可以通过空格或者;设置多个服务提供者的直连地址        //或者指定一个或者多个专属的注册中心地址        String[] us = SEMICOLON_SPLIT_PATTERN.split(url);        if (ArrayUtils.isNotEmpty(us)) {            for (String u : us) {                //解析当前url字符串,并解析为一个Dubbo提供的URL对象返回                URL url = URL.valueOf(u);                //url内部对象urlAddress对象的path属性--具体看下图                if (StringUtils.isEmpty(url.getPath())) {                    //大部分情况下我们不会指定path,因此一般path值默认为服务接口名                    url = url.setPath(interfaceName);                }                ...                //判断我们设置的是否是一个注册中心地址                if (UrlUtils.isRegistry(url)) {                    //添加进urls集合保存,并且使用REFER_KEY属性表明当前url代表的是注册中心地址                    urls.add(url.putAttribute(REFER_KEY, referenceParameters));                } else {                   //将referenceParameters集合中的参数以&的形式拼接在当前url后面,类比http的请求参数url的拼接方式                   //然后将拼接完整的url添加进urls集合                    URL peerUrl = getScopeModel().getApplicationModel().getBeanFactory().getBean(ClusterUtils.class).mergeUrl(url, referenceParameters);                    peerUrl = peerUrl.putAttribute(PEER_KEY, true);                    urls.add(peerUrl);                }            }        }    }
  • URL.valueOf方法负责解析当前url字符串,并解析为一个Dubbo提供的URL对象返回,格式如下:
  • url.getPath方法返回的是Url内部的urlAddress对象的path属性
    • 不指定path,默认被设置为服务接口名的情况
  • peerUrl拼接得到的结果
dubbo://127.0.0.1:80/dubbo.dubboSpi.HelloService?application=generic-call-consumer&async=true&background=false&generic=true&interface=dubbo.dubboSpi.HelloService¶m=value&pid=4600®ister.ip=192.168.18.131&side=consumer&sticky=false&timeout=7000

从parseUrl方法逻辑可知,dubbo会将客户端各种配置参数以类似http请求参数的url拼接方式组织起来。


createInvokerForRemote方法负责构造发起请求调用的Invoker对象:

    private void createInvokerForRemote() {       //此处我们的urls集合中的url只有一个,有多个逻辑这里跳过不看       //如果urls的长度为1,说明只有一个服务提供者,则直接通过protocolSPI.refer方法创建一个Invoker实例,       //如果这个服务提供者不是注册中心,则使用StaticDirectory对这个Invoker进行包装。       //StaticDirectory是Dubbo框架中的一个类,用于将一组Invoker封装成一个目录,以便消费者调用        if (urls.size() == 1) {            URL curUrl = urls.get(0);            //这里根据urlAddress内部的protocol属性作为key,通过dubbo的SPI机制寻找对应协议的实现类             //这里实际调用的是DubboProtocol的refer方法,因为我们这里urlAddress的protocol值为dubbo            invoker = protocolSPI.refer(interfaceClass, curUrl);            //如果当前url并非指代一个注册中心地址            if (!UrlUtils.isRegistry(curUrl)) {                List> invokers = new ArrayList<>();                invokers.add(invoker);                //默认情况下Cluster会通过Registry拿到一堆服务提供方的IP地址列表后,然后通过一定的路由和负载均衡策略决定具体选择调用哪一个Provider                      invoker = Cluster.getCluster(scopeModel, Cluster.DEFAULT).join(new StaticDirectory(curUrl, invokers), true);            }        } else {           ...        }    }
  • dubbo的Adaptive动态适配机制会为ProtocolSPI类创建一个代理对象,代理对象的refer如下所示:(本系列还未讲到dubbo SPI原理部分,所以这部分大家先了解即可,感兴趣的也可以自行研究一下)
public class Protocol$Adaptive        implements Protocol {            public Invoker refer(Class clazz, URL uRL) throws RpcException {        String string;        if (uRL == null) {            throw new IllegalArgumentException("url == null");        }        string = uRL.getProtocol() == null ? "dubbo" : uRL.getProtocol();        if (string == null) {            throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + uRL + ") use keys([protocol])");        }        ScopeModel scopeModel = ScopeModelUtil.getOrDefault(uRL.getScopeModel(), Protocol.class);        Protocol protocol = scopeModel.getExtensionLoader(Protocol.class).getExtension(string);        return protocol.refer(clazz, uRL);    }    ...

DubboProtocol的refer方法实现如下:

    @Override    public  Invoker refer(Class type, URL url) throws RpcException {        ...         return protocolBindingRefer(type, url);    }    public  Invoker protocolBindingRefer(Class serviceType, URL url) throws RpcException {        ...         // create rpc invoker.        // dubbo总共分为十层,各个层之间的交互主要是通过Inovker完成的,可以理解分层的实现是Invoker套Invoker        //这里只需要知道invoker的doInvoke方法中会完成本层应该做的逻辑        //例如这里DubboInvoker会在protocol层完成相关逻辑处理        DubboInvoker invoker = new DubboInvoker(serviceType, url, getClients(url), invokers);        ...        return invoker;    }    

实现点点直连

通过上面的分析可知,dubbo为我们在客户端配置中提供了一个url参数用来实现点点直连,url的构成规则为:

  • [protocol://][username:password@][host:port]/[path][?k1=v1&k2=v2]
dubbo://127.0.0.1:80/dubbo.dubboSpi.HelloService?application=generic-call-consumer&async=true&background=false&generic=true&interface=dubbo.dubboSpi.HelloService¶m=value&pid=4600®ister.ip=192.168.18.131&side=consumer&sticky=false&timeout=7000

可见dubbo的url 的构成规则,居然和 http 的构成规则如出一辙,那我们试着通过赋值 url 为dubbo://[机器IP结点]:[机器IP提供Dubbo服务的端口],应该就大功告成了。


准备一个页面,填入 5 个字段信息,接口类名、接口方法名、接口方法参数类名、指定的 URL 节点、修复问题的 Java 代码,然后将这 5 个字段通过 HTTP 请求发往 Web 服务器,Web 服务器接收到请求后组装泛化所需对象,最后通过泛化调用的形式完成功能修复。


消费端

  • 负责接收动态调试请求的控制器
@RestControllerpublic class DynamicDebugController {    private static String zookeeperAddress = "zookeeper://" + System.getProperty("zookeeper.address", "127.0.0.1") + ":2181";    @PostMapping("/gateway/dynamic/debug/request")    public Object repairRequest(@RequestBody DynamicDebugRequest dynamicDebugRequest) {        // 将入参的req转为下游方法的入参对象,并发起远程调用        return commonInvoke(dynamicDebugRequest);    }    private Object commonInvoke(DynamicDebugRequest dynamicDebugRequest) {        // 然后试图通过类信息对象想办法获取到该类对应的实例对象        ReferenceConfig referenceConfig =                createReferenceConfig(dynamicDebugRequest.getClassName(), dynamicDebugRequest.getUrl());        // 远程调用        GenericService genericService = referenceConfig.get();        return genericService.$invoke(                dynamicDebugRequest.getMtdName(),                new String[]{dynamicDebugRequest.getParameterTypeName()},                new Object[]{dynamicDebugRequest.getParamsMap()});    }    private static ReferenceConfig createReferenceConfig(String className, String url) {        DubboBootstrap dubboBootstrap = DubboBootstrap.getInstance();        // 设置应用服务名称        ApplicationConfig applicationConfig = new ApplicationConfig();        applicationConfig.setName(dubboBootstrap.getApplicationModel().getApplicationName());        // 设置注册中心的地址        RegistryConfig registryConfig = new RegistryConfig(zookeeperAddress);        ReferenceConfig referenceConfig = new ReferenceConfig<>();        referenceConfig.setApplication(applicationConfig);        referenceConfig.setRegistry(registryConfig);        referenceConfig.setInterface(className);        // 设置泛化调用形式        referenceConfig.setGeneric("true");        // 设置默认超时时间5秒        referenceConfig.setTimeout(5 * 1000);        // 设置点对点连接的地址        referenceConfig.setUrl(url);        return referenceConfig;    }}
  • 承载动态调试请求参数的对象
@Setter@Getterpublic class DynamicDebugRequest {        private String className;        private String mtdName;        private String parameterTypeName;        private String url;        private Map paramsMap;}

提供端

  • 服务提供者的启动类
public class Provider {    private static String zookeeperAddress = "zookeeper://" + System.getProperty("zookeeper.address", "127.0.0.1") + ":2181";    public static void main(String[] args) throws InterruptedException {        //启动内嵌的zk        new EmbeddedZooKeeper(2181, false).start();        //创建ApplicationConfig        ApplicationConfig applicationConfig = new ApplicationConfig();        applicationConfig.setName("dynamic-debug-service-provider");        //创建注册中心配置        RegistryConfig registryConfig = new RegistryConfig();        registryConfig.setAddress(zookeeperAddress);        //新建服务实现类,注意要使用GenericService接收        DynamicDebugService helloService = new DynamicDebugServiceImpl();        //创建服务相关配置        ServiceConfig service = new ServiceConfig<>();        service.setApplication(applicationConfig);        service.setRegistry(registryConfig);        service.setInterface(DynamicDebugService.class);        service.setRef(helloService);        service.export();        new CountDownLatch(1).await();    }}

官方文档Demo提供的EmbeddedZooKeeper类源码,大家copy到自己本地即可

  • 对外暴露的动态调试服务接口
public interface DynamicDebugService {        Object dynamicDebug(Map req);}
  • 服务接口的实现类
public class DynamicDebugServiceImpl implements DynamicDebugService {    private final GroovyClassLoader groovyClassLoader = new GroovyClassLoader();    @SneakyThrows    @Override    public Object dynamicDebug(Map req) {        // 编译 Java 代码,然后变成 JVM 可识别的 Class 对象信息        Class javaClass = compile(req.get("code"));        //和spring结合的扩展思路: 创建实例对象,并经过spring的后置处理        // Object bean = instantiationAndPostProcessBean(javaClass);        //这里没有和spring结合,直接简单实例化即可        Object bean = javaClass.newInstance();        if(!(bean instanceof Function)){            throw new IllegalArgumentException("动态java类并非Function类型");        }        Function, Object> function = (Function) bean;        // 执行单例对象的方法即可        return function.apply(req);    }        private Class compile(String javaCode) {        return groovyClassLoader.parseClass(javaCode);    }        private Object instantiationAndPostProcessBean(Class javaClass) {        return ((DefaultListableBeanFactory) SpringUtil.getBeanFactory()).createBean(javaClass);    }}

测试

测试,首先我们需要准备一个测试类:

public class TestJavaCode implements Function,String> {    @Override    public String apply(Map s) {        System.out.println("执行动态方法: "+s);        return "res: "+s;    }}

该类会作为请求参数传递给动态调试控制器,然后由动态调试控制器通过泛化调用,来调用服务端的动态调试服务接口,最终执行测试的apply方法。

  • 请求参数和请求结果
{    "className":"com.provider.one.DynamicDebugService",    "mtdName":"dynamicDebug",    "parameterTypeName":"java.util.Map",    "url":"dubbo://192.168.154.1:20880/com.provider.one.DynamicDebugService",    "paramsMap": {        "code": "package com.provider.one.code;import java.util.function.Function;public class TestJavaCode implements Function,String> {@Override public String apply(Map s) {System.out.println("执行动态方法: "+s);return "res: "+s;}}"    }}

点点直连小结

哪些应用场景需要用到点点直连呢?

  • 第一,修复生产环境突然Bug事件,通过直连 + 泛化 + 动态代码编译执行,可以轻松临时解决产线棘手的问题。
  • 第二,绕过注册中心直接联调测试,有些公司由于测试环境的复杂性,有时候不得不采用简单的直连方式,来快速联调测试验证功能。
  • 第三,检查服务存活状态,如果需要针对多台机器进行存活检查,那就需要循环调用所有服务的存活检查接口。

点点直连实现简单来说,分为如下几步:

  • 接口类名、接口方法名、接口方法参数类名、业务请求参数,四个维度的数据不能少。
  • 根据接口类名创建 ReferenceConfig 对象,设置 generic = true 、url = 协议 +IP+PORT 两个重要属性,调用 referenceConfig.get 拿到 genericService 泛化对象。
  • 传入接口方法名、接口方法参数类名、业务请求参数,调用 genericService.$invoke 方法拿到响应对象。
转载请注明:文章转载自 http://www.konglu.com/
本文地址:http://www.konglu.com/it/1097023.html
免责声明:

我们致力于保护作者版权,注重分享,被刊用文章【Dubbo源码篇03---点点直连如何实现及背后原理】因无法核实真实出处,未能及时与作者取得联系,或有版权异议的,请联系管理员,我们会立即处理,本文部分文字与图片资源来自于网络,转载此文是出于传递更多信息之目的,若有来源标注错误或侵犯了您的合法权益,请立即通知我们,情况属实,我们会第一时间予以删除,并同时向您表示歉意,谢谢!

我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2023 成都空麓科技有限公司

ICP备案号:蜀ICP备2023000828号-2