【Java】Dubbo底层RPC模拟实现

模拟Dubbo底层RPC实现,Socket通信,实现参数回调功能。


之前也写过自己的RPC框架,就想着去了解一下Dubbo底层核心的RPC并模拟实现一下。发现Dubbo底层的RPC相比我之前更加完善,特别是【参数回调】这个我之前并没有的功能:

RPC为远程方法调用

故,我们要实现的是消费者远程调用服务器的方法。

整体思路:消费者发送要实现的方法信息及参数,服务器反射执行后再发送回消费者。


目录

1.注册中心

2.服务提供者

2.1 上下文中心

2.2 RPC服务端

2.3 代理工厂

3.测试

3.1 服务暴露者

3.2 服务接口

3.3 服务接口实现类

3.4 订单实体类

3.5 消费者示例

3.6 运行结果


1.注册中心

先看注册中心,其作用是保存与他连接的服务暴露者并存储服务暴露者信息。通过长连接连接服务暴露者,如果服务暴露者异常中断,注册中心则会从服务列表中移除该服务。本项目中使用zookeeper


2.服务提供者

先从服务提供者来看,服务提供者先向注册中心注册,需要发送给注册中心他的服务名服务地址(ip:port),方便消费者从注册中心获得服务,连接,调用。

此外,为了让消费者通过接口调用服务暴露者方法,服务暴露者本身需要存储 <接口, 接口实现类模板> hashMap,当消费者通过接口远程调用方法时,匹配到实现类,并反射执行。

2.1 上下文中心

上下文中心的作用是保存隐式参数

何为隐式参数?举个例子来说就是消费者远程调用服务暴露者方法,想要额外传递的参数。这个参数不是该方法所需的参数。

这里用到了ThreadLocal:即,每个线程拥有它自己的LOCAL和SERVER,说白了就是各自的一个map,以键值对的方式存放该线程服务器和消费者的隐式参数。键:参数名  值:参数

/**
 * 隐式上下文参数对象
 */
public class FYContext {

    /**
     * 客户端本地上下文
     */
    private static ThreadLocal<FYContext> LOCAL = new ThreadLocal<FYContext>();

    /**
     * 服务端上下文
     */
    private static ThreadLocal<FYContext> SERVER = new ThreadLocal<FYContext>();

    /**
     * 隐式参数容器
     */
    private final Map<String, String> attachment = new HashMap<String, String>();

    /**
     * 获得本地上下文对象
     * @return
     */
    public static FYContext getContext() {
        FYContext fyContext = LOCAL.get();
        if (fyContext == null) {
            fyContext = new FYContext();
            LOCAL.set(fyContext);
        }
        return fyContext;
    }

    /**
     * 获得服务端上下文对象
     * @return
     */
    public static FYContext getServerContext() {
        FYContext fyContext = SERVER.get();
        if (fyContext == null) {
            fyContext = new FYContext();
            SERVER.set(fyContext);
        }
        return fyContext;
    }

    /**
     * 清楚客户端上下文
     */
    public void removeContext() {
        LOCAL.remove();
    }

    /**
     * 清除服务器上下文
     */
    public void removeServerContext() {
        SERVER.remove();
    }

    /**
     * 存储需要传递的参数
     * @param key 参数键
     * @param value 参数值
     */
    public void setAttachment(String key, String value) {
        attachment.put(key, value);
    }

    /**
     * @return 本地线程所有的隐方参数容器
     */
    public Map<String, String> getAttachments() {
        return attachment;
    }

    /**
     * 获得容器参数值
     * @param key
     * @return
     */
    public String getAttachment(String key) {
        return attachment.get(key);
    }

    /**
     * 接收所有的参数
     * @param attachment 参数容器
     */
    public void setAttachments(Map<String, String> attachment) {
        this.attachment.putAll(attachment);
    }
}

2.2 RPC服务端

rpc服务端就是服务暴露者,说说其需要实现的功能及实现逻辑。

1.向注册机注册服务

2.自身注册接口,创建SocketServer。

3.当有socket连接与之通信,接收需要的接口名,方法名,参数类型,参数值,隐士参数,反射执行,发送回消费者。(这是没有参数回调的情况)

4.当消费者传递来的参数是需要回调的,服务暴露者还需去和消费者“交流”,远程调用消费者的方法来“完善”参数,这里可能有点绕,一会下面有例子来说明。

下面为服务端代码:

/**
 * RPC服务端
 */
public class Server {

    /**
     * zookeeper 服务注册根节点
     */
    public static final String ZOOKEEPER_NOOD_FY = "/FY";

    /**
     * 服务本地注册容器
     */
    private Map<String, Class> serverReigster = new HashMap<String, Class>();

    /**
     * 服务请求接收端
     */
    private ServerSocket serverSocket;

    /**
     * 多线程容器
     */
    private ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    /**
     * 结果标记
     */
    public final static int FLAG_RESULT = 0;

    /**
     * 远程回调标记
     */
    public final static int FLAG_INVOKER = 1;

    /**
     * zk服务地址
     */
    public final static String ZOOKEEPER_ADDRESS = "localhost";

    /**
     * zk服务端口
     */
    public final static int port = 2181;

    /**
     * zk会话超时时间
     */
    public final static int TIMEOUT = 1000 * 3;

    /**
     * 服务提供者地址
     */
    public final static int SERVER_PORT = 10086;

    /**
     * 开启服务
     * @param ip 暴漏地址
     * @param port 暴露端口
     */
    public void start(String ip, int port) throws IOException {
        // 服务端启动配置
        serverSocket = new ServerSocket();
        serverSocket.setReuseAddress(true);
        serverSocket.setReceiveBufferSize(1024 * 10);
        serverSocket.bind(new InetSocketAddress(ip, port));
        // 接收客户端
        Socket client;

        // 循环处理所有接收的客户端
        while ((client = serverSocket.accept()) != null) {
            // 临时变量
            Socket finalClient = client;

            // 多线程处理
            executorService.execute(new Runnable() {

                @Override
                public void run() {
                    ObjectInputStream objectInputStream = null;
                    ObjectOutputStream objectOutputStream = null;

                    try {
                        objectInputStream = new ObjectInputStream(finalClient.getInputStream());
                        objectOutputStream = new ObjectOutputStream(finalClient.getOutputStream());

                        // 开始处理协议
                        while (true) {
                            // 如有中断则跳出
                            if (Thread.currentThread().isInterrupted()) {
                                break;
                            }
                            // 接口名
                            String className = objectInputStream.readUTF();
                            // 方法名
                            String methodName = objectInputStream.readUTF();
                            // 参数类型
                            Class[] params = (Class[]) objectInputStream.readObject();
                            // 参数值
                            Object[] values = (Object[]) objectInputStream.readObject();
                            // 隐式参数
                            Map<String, String> attachements = (Map<String, String>) objectInputStream.readObject();
                            // 设置客户端传入的隐式参数值
                            FYContext.getContext().setAttachments(attachements);

                            /**
                             * 设置参数回调采用参数标志方式
                             * 参数.callback.1
                             */
                            for (int i = 0; i < values.length; i++) {
                                // 判断如果为回调参数,则采用代理
                                if (values[i] instanceof String && values[i].equals(params[i] + ".callback." + i)) {
                                    // 将字符串参数回调值转换成代理
                                    values[i] = BeanProxy.getBean(params[i], objectInputStream, objectOutputStream, i);
                                }
                            }

                            // 获得接口实现类模板
                            Class implementClass = serverReigster.get(className);
                            // 获得类模板实现类实体
                            Object instance = implementClass.newInstance();
                            // 根据协议解析反射回调方法
                            Method method = implementClass.getMethod(methodName, params);
                            // 获取服务端远程调用结果
                            Object result = method.invoke(instance, values);

                            // 输出最终结果标记
                            objectOutputStream.writeInt(FLAG_RESULT);
                            // 将最终结果输给客户端
                            objectOutputStream.writeObject(result);
                            // 发送服务端上下文容器参数
                            objectOutputStream.writeObject(FYContext.getServerContext().getAttachments());

                            /**
                             * 清理上下文
                             * 避免多签名方法导致上下文参数不一致
                             */
                            FYContext.getContext().removeContext();
                            FYContext.getServerContext().removeServerContext();
                        }

                        // 关闭输出流
                        objectOutputStream.close();
                        // 关闭输入流
                        objectInputStream.close();

                    } catch (Exception e) {
                        System.out.println("客户端已关闭");
                        e.printStackTrace();
                    } finally {
                        if (objectInputStream != null) {
                            try {
                                objectInputStream.close();
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                        }
                        if (objectOutputStream != null) {
                            try {
                                objectOutputStream.close();
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                }
            });
        }
    }

    /**
     * 本地服务注册
     * @param className 注册接口名
     * @param implementClass 注册接口实现类
     */
    public void register(String className, Class implementClass) {
        this.serverReigster.put(className, implementClass);
    }

    /**
     * 服务关闭
     */
    public void stop() throws IOException {
        serverSocket.close();
    }
}

2.3 代理工厂

消费者通过接口远程调用服务暴露者的方法当然是用到代理,代理执行对应方法的时候才远程连接Socket通信,同样,服务暴露者回调消费者方法也要用到代理。

所以参数回调二者代理的“交流”过程。不需要参数回调的rpc方法,也不需要服务暴露者的代理对象去远程调用消费者。

二者的到代理对象的唯一不同点是,消费者创建代理对象只需要接口和服务器地址,服务暴露者则额外需要回调的参数的下标,便于消费者对应到参数类型并反射。

/**
 * 创建bean代理工厂
 */
public class BeanProxy {

    public static <T> T getBean(Class interfaceClass, ObjectInputStream objectInputStream, ObjectOutputStream objectOutputStream, Integer paramIndex) {
        return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class[]{interfaceClass}, new InvokerInvocationHandler(objectInputStream, objectOutputStream, interfaceClass, paramIndex));
    }

    public static <T> T getBean(Class interfaceClass, ObjectInputStream objectInputStream, ObjectOutputStream objectOutputStream) {
        return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class[]{interfaceClass}, new InvokerInvocationHandler(objectInputStream, objectOutputStream, interfaceClass));
    }

    public static <T> T getBean(Class interfaceClass, String address, int port) throws Exception {
        // 创建客户端
        Socket client = new Socket(address, port);
        // 创建客户端接收输出流
        ObjectOutputStream objectOutputStream = new ObjectOutputStream(client.getOutputStream());
        // 创建客户端接受输入流
        ObjectInputStream objectInputStream = new ObjectInputStream(client.getInputStream());

        return getBean(interfaceClass, objectInputStream, objectOutputStream);
    }

    /**
     * 反射请求处理方式
     *
     * 公共部分
     *
     * 客户端代理
     * 服务端代理  全部使用公共部分
     */
    private static class InvokerInvocationHandler implements InvocationHandler {

        // 客户端接收服务端输入流
        private ObjectInputStream objectInputStream = null;
        // 客户端接收服务端输出流
        private ObjectOutputStream objectOutputStream = null;
        // 请求接口类
        private Class interfaceClass = null;
        // 参数回调索引位置
        private Integer paramIndex = null;

        /**
         * 客户端代理使用构造方法
         * 构造方法传参
         * @param objectInputStream 输入流
         * @param objectOutputStream 输出流
         * @param interfaceClass 请求接口类
         */
        public InvokerInvocationHandler(ObjectInputStream objectInputStream, ObjectOutputStream objectOutputStream, Class interfaceClass) {
            this.objectInputStream = objectInputStream;
            this.objectOutputStream = objectOutputStream;
            this.interfaceClass = interfaceClass;
        }

        /**
         * 服务端代理使用构造方法
         * 构造方法传参
         * @param objectInputStream 输入流
         * @param objectOutputStream 输出流
         * @param interfaceClass 请求接口类
         * @param paramIndex 参数回调索引位置
         */
        public InvokerInvocationHandler(ObjectInputStream objectInputStream, ObjectOutputStream objectOutputStream, Class interfaceClass, Integer paramIndex) {
            this.objectInputStream = objectInputStream;
            this.objectOutputStream = objectOutputStream;
            this.interfaceClass = interfaceClass;
            this.paramIndex = paramIndex;
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            // 不为空则为服务端动态代理
            if (paramIndex != null) {
                // 服务端动态代理
                // 输出回调标记
                objectOutputStream.writeInt(Server.FLAG_INVOKER);
                // 输出回调第几个标记
                objectOutputStream.writeInt(paramIndex);
            } else {
                // 客户端动态代理
                // 发送请求接口
                objectOutputStream.writeUTF(interfaceClass.getName());
            }
            // 发送方法名
            objectOutputStream.writeUTF(method.getName());
            // 发送方法参数类型
            Class<?>[] paramTypes = method.getParameterTypes();
            objectOutputStream.writeObject(paramTypes);
            // 发送方法参数值
            Map<Integer, Object> callbackMapper = new HashMap<>();

            // 循环替换所有的参数回调的值
            for (int i = 0; i < paramTypes.length; i++) {
                // 获取请求参数类型
                Class<?> instanceClass = args[i].getClass();
                if (Arrays.stream(instanceClass.getInterfaces()).anyMatch(aClass -> aClass.isAssignableFrom(IOrder.BeanListener.class))) {
                    callbackMapper.put(i, args[i]);
                    args[i] = paramTypes[i] + ".callback." + i;
                }
            }

            objectOutputStream.writeObject(args);

            // 非服务端代理--客户端代理
            if (paramIndex == null) {
                // 发送隐式参数
                objectOutputStream.writeObject(FYContext.getContext().getAttachments());
                // 清理数据
                FYContext.getServerContext().removeServerContext();
                FYContext.getContext().removeContext();
            } else {
                return objectInputStream.readObject();
            }


            // 默认获得结果标记
            int flag = Server.FLAG_RESULT;
            Object result = null;
            do {
                // 读取标记
                flag = objectInputStream.readInt();
                // 如果等于回调标记
                if (flag == Server.FLAG_INVOKER) {
                    // 回调第几个参数
                    int callIndex = objectInputStream.readInt();
                    // 调用方法
                    String callMethod = objectInputStream.readUTF();
                    // 参数
                    Class[] params = (Class<?>[]) objectInputStream.readObject();
                    // 参数值
                    Object[] values = (Object[]) objectInputStream.readObject();

                    // 获得回调对象
                    Object instance = callbackMapper.get(callIndex);
                    // 获得回调对象类型
                    Class<?> aClass = instance.getClass();
                    // 获得回调方法
                    Method aMethod = aClass.getMethod(callMethod, params);
                    // 执行回调方法 输出结果
                    objectOutputStream.writeObject(aMethod.invoke(instance, values));

                }
                if (flag == Server.FLAG_RESULT) {
                    result = objectInputStream.readObject();
                    FYContext.getServerContext().setAttachments((Map<String, String>) objectInputStream.readObject());
                }
            } while (flag == Server.FLAG_INVOKER);

            return result;
        }

        @Override
        protected void finalize() throws Throwable {
            objectOutputStream.close();
            objectInputStream.close();
        }
    }
}

3.测试

3.1 服务暴露者

注册服务,注册接口,启动等待消费者请求。

    /**
     * 启动流程
     * @param args
     */
    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {

        /**
         * zk服务注册与发现 容器
         * 通过长连接与服务暴露者 与 服务监听者 保持一致
         *
         * 实现原理:
         *      服务提供者发生异常中断,TCP层会发送中断信号给zk,注册中心收到信号,则移除临时结点,然后通知
         *      服务监听者
         *
         *      服务挂掉是通过结点存在状态进行判断
         */

        // 设置递减锁
        CountDownLatch countDownLatch = new CountDownLatch(1);
        // zookeeper客户端连接
        System.out.println("开始连接zookeeper!");
        ZooKeeper zooKeeper = new ZooKeeper(ZOOKEEPER_ADDRESS, TIMEOUT, new Watcher() {

            public void process(WatchedEvent watchedEvent) {
                // 服务连接成功
                if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
                    countDownLatch.countDown();
                }
            }
        });
        // 等待与zk连接成功
        countDownLatch.await();
        System.out.println("zookeeper 连接成功!");

        // 注册中心注册业务根节点
        if (zooKeeper.exists(Server.ZOOKEEPER_NOOD_FY, false) == null) {
            zooKeeper.create(Server.ZOOKEEPER_NOOD_FY, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }

        // 服务提供者名称
        String serverNodeName = "fy1";
        // 服务提供者地址
        String serverNodeAddress = "localhost" + ":" + SERVER_PORT;

        // 服务提供者zk目录
        String tempServerNodeName = Server.ZOOKEEPER_NOOD_FY.concat("/").concat(serverNodeName);

        // 判断服务提供结点已经存在
        if (zooKeeper.exists(tempServerNodeName, true) != null) {
            // 删除  -1为最新版本
            zooKeeper.delete(tempServerNodeName, -1);
        }

        // 创建最新服务结点数据
        zooKeeper.create(tempServerNodeName, serverNodeAddress.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

        // 创建服务容器
        Server server = new Server();
        // 注册本地服务
        server.register(IOrder.class.getName(), IOrderImpl.class);
        // 开启服务
        System.out.println("开启服务!");
        server.start("localhost", Server.SERVER_PORT);
    }

3.2 服务接口

当然这个接口是消费者和服务暴露者都有的

判断是否参数回调就是判断参数类型是否为BeanListener接口,若是该接口才进行参数值替换,以便于服务暴露者发现

/**
 * 定义订单接口
 */
public interface IOrder {

    /**
     * 根据订单编号查询订单实例
     * @param id 订单编号
     * @return 订单实例
     */
    public Order selectById(Long id);

    /**
     * 根据订单编号异步获取订单
     * @param id 订单编号
     * @param listener 回调
     */
    public void selectById(Long id, BeanListener listener);

    /**
    * 回调***
    */
    public interface BeanListener {
        public String beanNotify(Order order);
    }
}

3.3 服务接口实现类

/**
 * 订单提供者
 */
public class IOrderImpl implements IOrder {

    /**
     * 通过订单编号获取订单
     * @param id 订单编号
     * @return 订单
     */
    @Override
    public Order selectById(Long id) {
        System.out.println("==============selectById=============");
        System.out.println(FYContext.getContext().getAttachment("user"));
        System.out.println(FYContext.getContext().getAttachment("password"));
        FYContext.getServerContext().setAttachment("参数名", "想隐式传递给服务器的内容");
        return new Order(id, "购买了XXXXXX");
    }

    /**
     * 通过订单编号异步获取订单
     * @param id 订单编号
     * @param listener 回调
     */
    @Override
    public void selectById(Long id, BeanListener listener) {
        System.out.println("==============selectById-notify=============");
        System.out.println(FYContext.getContext().getAttachment("user"));
        System.out.println(FYContext.getContext().getAttachment("password"));
        Order order = new Order(id, "购买了XXXXXX-参数回调");
        listener.beanNotify(order);
        FYContext.getServerContext().setAttachment("参数名", "想隐式传递给服务器的内容");
    }
}

3.4 订单实体类

/**
 * 订单实体类
 */
public class Order implements Serializable {
    /**
     * 订单编号
     */
    private long id;

    /**
     * 订单描述
     */
    private String desc;

    @Override
    public String toString() {
        return "Order{" +
                "id=" + id +
                ", desc='" + desc + '\'' +
                '}';
    }

    public Order() {
    }

    public Order(long id, String desc) {
        this.id = id;
        this.desc = desc;
    }

    public long getId() {
        return id;
    }

    public void setId(long id) {
        this.id = id;
    }

    public String getDesc() {
        return desc;
    }

    public void setDesc(String desc) {
        this.desc = desc;
    }
}

3.5 消费者示例

public class Client {
    
    public static void main(String[] args) throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        ZooKeeper zooKeeper = new ZooKeeper("localhost:2181",30*1000, event -> {
            if (event.getState()== Watcher.Event.KeeperState.SyncConnected) {
                countDownLatch.countDown();
            }
        });
        countDownLatch.await();
        List<String> children = zooKeeper.getChildren(Server.ZOOKEEPER_NOOD_FY, false);
        Long i = new Long(5);
        //获得所有的临时子节点
        for (String child : children) {
            //获得子节点上上的统计数据
            Stat stat = new Stat();
            //获得节点数据
            byte[] data = zooKeeper.getData(Server.ZOOKEEPER_NOOD_FY + "/" + child, false, stat);
            //字节转字符串
            String value = new String(data);
            //分割内容
            String[] split = value.split(":");
            //远程调用服务RPC
            IOrder orderImplement = BeanProxy.getBean(IOrder.class, split[0], Integer.parseInt(split[1]));
            FYContext.getContext().setAttachment("user","zhangsan");
            FYContext.getContext().setAttachment("password","123456");
            //调用返回结果
            System.out.println("-----------------rpc调用---------------------");
            System.out.println(orderImplement.selectById(i++));
            System.out.println("---------------rpc调用完成-------------------");
            System.out.println();
            //异步回调
            FYContext.getContext().setAttachment("user","zhangsan");
            FYContext.getContext().setAttachment("password","123456");


            System.out.println("-----------------异步回调--------------------");
            orderImplement.selectById(i, new IOrder.BeanListener() {
                @Override
                public String beanNotify(Order order) {
                    System.out.println("回调!!!");
                    System.out.println(order);
                    return "回调执行结果";
                }
            });
            System.out.println("--------------异步回调完成-------------------");
        }
    }
}

3.6 运行结果

服务暴露者

服务暴露者输出了隐式参数。

消费者

全部评论

相关推荐

09-29 11:19
门头沟学院 Java
点赞 评论 收藏
分享
点赞 评论 收藏
分享
点赞 收藏 评论
分享
牛客网
牛客企业服务