【Java】Dubbo底层RPC模拟实现
模拟Dubbo底层RPC实现,Socket通信,实现参数回调功能。
之前也写过自己的RPC框架,就想着去了解一下Dubbo底层核心的RPC并模拟实现一下。发现Dubbo底层的RPC相比我之前更加完善,特别是【参数回调】这个我之前并没有的功能:
RPC为远程方法调用
故,我们要实现的是消费者远程调用服务器的方法。
整体思路:消费者发送要实现的方法信息及参数,服务器反射执行后再发送回消费者。
目录
1.注册中心
先看注册中心,其作用是保存与他连接的服务暴露者并存储服务暴露者信息。通过长连接连接服务暴露者,如果服务暴露者异常中断,注册中心则会从服务列表中移除该服务。本项目中使用zookeeper。
2.服务提供者
先从服务提供者来看,服务提供者先向注册中心注册,需要发送给注册中心他的服务名和服务地址(ip:port),方便消费者从注册中心获得服务,连接,调用。
此外,为了让消费者通过接口调用服务暴露者方法,服务暴露者本身需要存储 <接口, 接口实现类模板> 的hashMap,当消费者通过接口远程调用方法时,匹配到实现类,并反射执行。
2.1 上下文中心
上下文中心的作用是保存隐式参数。
何为隐式参数?举个例子来说就是消费者远程调用服务暴露者方法,想要额外传递的参数。这个参数不是该方法所需的参数。
这里用到了ThreadLocal:即,每个线程拥有它自己的LOCAL和SERVER,说白了就是各自的一个map,以键值对的方式存放该线程服务器和消费者的隐式参数。键:参数名 值:参数
/**
* 隐式上下文参数对象
*/
public class FYContext {
/**
* 客户端本地上下文
*/
private static ThreadLocal<FYContext> LOCAL = new ThreadLocal<FYContext>();
/**
* 服务端上下文
*/
private static ThreadLocal<FYContext> SERVER = new ThreadLocal<FYContext>();
/**
* 隐式参数容器
*/
private final Map<String, String> attachment = new HashMap<String, String>();
/**
* 获得本地上下文对象
* @return
*/
public static FYContext getContext() {
FYContext fyContext = LOCAL.get();
if (fyContext == null) {
fyContext = new FYContext();
LOCAL.set(fyContext);
}
return fyContext;
}
/**
* 获得服务端上下文对象
* @return
*/
public static FYContext getServerContext() {
FYContext fyContext = SERVER.get();
if (fyContext == null) {
fyContext = new FYContext();
SERVER.set(fyContext);
}
return fyContext;
}
/**
* 清楚客户端上下文
*/
public void removeContext() {
LOCAL.remove();
}
/**
* 清除服务器上下文
*/
public void removeServerContext() {
SERVER.remove();
}
/**
* 存储需要传递的参数
* @param key 参数键
* @param value 参数值
*/
public void setAttachment(String key, String value) {
attachment.put(key, value);
}
/**
* @return 本地线程所有的隐方参数容器
*/
public Map<String, String> getAttachments() {
return attachment;
}
/**
* 获得容器参数值
* @param key
* @return
*/
public String getAttachment(String key) {
return attachment.get(key);
}
/**
* 接收所有的参数
* @param attachment 参数容器
*/
public void setAttachments(Map<String, String> attachment) {
this.attachment.putAll(attachment);
}
}
2.2 RPC服务端
rpc服务端就是服务暴露者,说说其需要实现的功能及实现逻辑。
1.向注册机注册服务
2.自身注册接口,创建SocketServer。
3.当有socket连接与之通信,接收需要的接口名,方法名,参数类型,参数值,隐士参数,反射执行,发送回消费者。(这是没有参数回调的情况)
4.当消费者传递来的参数是需要回调的,服务暴露者还需去和消费者“交流”,远程调用消费者的方法来“完善”参数,这里可能有点绕,一会下面有例子来说明。
下面为服务端代码:
/**
* RPC服务端
*/
public class Server {
/**
* zookeeper 服务注册根节点
*/
public static final String ZOOKEEPER_NOOD_FY = "/FY";
/**
* 服务本地注册容器
*/
private Map<String, Class> serverReigster = new HashMap<String, Class>();
/**
* 服务请求接收端
*/
private ServerSocket serverSocket;
/**
* 多线程容器
*/
private ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
/**
* 结果标记
*/
public final static int FLAG_RESULT = 0;
/**
* 远程回调标记
*/
public final static int FLAG_INVOKER = 1;
/**
* zk服务地址
*/
public final static String ZOOKEEPER_ADDRESS = "localhost";
/**
* zk服务端口
*/
public final static int port = 2181;
/**
* zk会话超时时间
*/
public final static int TIMEOUT = 1000 * 3;
/**
* 服务提供者地址
*/
public final static int SERVER_PORT = 10086;
/**
* 开启服务
* @param ip 暴漏地址
* @param port 暴露端口
*/
public void start(String ip, int port) throws IOException {
// 服务端启动配置
serverSocket = new ServerSocket();
serverSocket.setReuseAddress(true);
serverSocket.setReceiveBufferSize(1024 * 10);
serverSocket.bind(new InetSocketAddress(ip, port));
// 接收客户端
Socket client;
// 循环处理所有接收的客户端
while ((client = serverSocket.accept()) != null) {
// 临时变量
Socket finalClient = client;
// 多线程处理
executorService.execute(new Runnable() {
@Override
public void run() {
ObjectInputStream objectInputStream = null;
ObjectOutputStream objectOutputStream = null;
try {
objectInputStream = new ObjectInputStream(finalClient.getInputStream());
objectOutputStream = new ObjectOutputStream(finalClient.getOutputStream());
// 开始处理协议
while (true) {
// 如有中断则跳出
if (Thread.currentThread().isInterrupted()) {
break;
}
// 接口名
String className = objectInputStream.readUTF();
// 方法名
String methodName = objectInputStream.readUTF();
// 参数类型
Class[] params = (Class[]) objectInputStream.readObject();
// 参数值
Object[] values = (Object[]) objectInputStream.readObject();
// 隐式参数
Map<String, String> attachements = (Map<String, String>) objectInputStream.readObject();
// 设置客户端传入的隐式参数值
FYContext.getContext().setAttachments(attachements);
/**
* 设置参数回调采用参数标志方式
* 参数.callback.1
*/
for (int i = 0; i < values.length; i++) {
// 判断如果为回调参数,则采用代理
if (values[i] instanceof String && values[i].equals(params[i] + ".callback." + i)) {
// 将字符串参数回调值转换成代理
values[i] = BeanProxy.getBean(params[i], objectInputStream, objectOutputStream, i);
}
}
// 获得接口实现类模板
Class implementClass = serverReigster.get(className);
// 获得类模板实现类实体
Object instance = implementClass.newInstance();
// 根据协议解析反射回调方法
Method method = implementClass.getMethod(methodName, params);
// 获取服务端远程调用结果
Object result = method.invoke(instance, values);
// 输出最终结果标记
objectOutputStream.writeInt(FLAG_RESULT);
// 将最终结果输给客户端
objectOutputStream.writeObject(result);
// 发送服务端上下文容器参数
objectOutputStream.writeObject(FYContext.getServerContext().getAttachments());
/**
* 清理上下文
* 避免多签名方法导致上下文参数不一致
*/
FYContext.getContext().removeContext();
FYContext.getServerContext().removeServerContext();
}
// 关闭输出流
objectOutputStream.close();
// 关闭输入流
objectInputStream.close();
} catch (Exception e) {
System.out.println("客户端已关闭");
e.printStackTrace();
} finally {
if (objectInputStream != null) {
try {
objectInputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (objectOutputStream != null) {
try {
objectOutputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
});
}
}
/**
* 本地服务注册
* @param className 注册接口名
* @param implementClass 注册接口实现类
*/
public void register(String className, Class implementClass) {
this.serverReigster.put(className, implementClass);
}
/**
* 服务关闭
*/
public void stop() throws IOException {
serverSocket.close();
}
}
2.3 代理工厂
消费者通过接口远程调用服务暴露者的方法当然是用到代理,代理执行对应方法的时候才远程连接Socket通信,同样,服务暴露者回调消费者方法也要用到代理。
所以参数回调是二者代理的“交流”过程。不需要参数回调的rpc方法,也不需要服务暴露者的代理对象去远程调用消费者。
二者的到代理对象的唯一不同点是,消费者创建代理对象只需要接口和服务器地址,服务暴露者则额外需要回调的参数的下标,便于消费者对应到参数类型并反射。
/**
* 创建bean代理工厂
*/
public class BeanProxy {
public static <T> T getBean(Class interfaceClass, ObjectInputStream objectInputStream, ObjectOutputStream objectOutputStream, Integer paramIndex) {
return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class[]{interfaceClass}, new InvokerInvocationHandler(objectInputStream, objectOutputStream, interfaceClass, paramIndex));
}
public static <T> T getBean(Class interfaceClass, ObjectInputStream objectInputStream, ObjectOutputStream objectOutputStream) {
return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class[]{interfaceClass}, new InvokerInvocationHandler(objectInputStream, objectOutputStream, interfaceClass));
}
public static <T> T getBean(Class interfaceClass, String address, int port) throws Exception {
// 创建客户端
Socket client = new Socket(address, port);
// 创建客户端接收输出流
ObjectOutputStream objectOutputStream = new ObjectOutputStream(client.getOutputStream());
// 创建客户端接受输入流
ObjectInputStream objectInputStream = new ObjectInputStream(client.getInputStream());
return getBean(interfaceClass, objectInputStream, objectOutputStream);
}
/**
* 反射请求处理方式
*
* 公共部分
*
* 客户端代理
* 服务端代理 全部使用公共部分
*/
private static class InvokerInvocationHandler implements InvocationHandler {
// 客户端接收服务端输入流
private ObjectInputStream objectInputStream = null;
// 客户端接收服务端输出流
private ObjectOutputStream objectOutputStream = null;
// 请求接口类
private Class interfaceClass = null;
// 参数回调索引位置
private Integer paramIndex = null;
/**
* 客户端代理使用构造方法
* 构造方法传参
* @param objectInputStream 输入流
* @param objectOutputStream 输出流
* @param interfaceClass 请求接口类
*/
public InvokerInvocationHandler(ObjectInputStream objectInputStream, ObjectOutputStream objectOutputStream, Class interfaceClass) {
this.objectInputStream = objectInputStream;
this.objectOutputStream = objectOutputStream;
this.interfaceClass = interfaceClass;
}
/**
* 服务端代理使用构造方法
* 构造方法传参
* @param objectInputStream 输入流
* @param objectOutputStream 输出流
* @param interfaceClass 请求接口类
* @param paramIndex 参数回调索引位置
*/
public InvokerInvocationHandler(ObjectInputStream objectInputStream, ObjectOutputStream objectOutputStream, Class interfaceClass, Integer paramIndex) {
this.objectInputStream = objectInputStream;
this.objectOutputStream = objectOutputStream;
this.interfaceClass = interfaceClass;
this.paramIndex = paramIndex;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 不为空则为服务端动态代理
if (paramIndex != null) {
// 服务端动态代理
// 输出回调标记
objectOutputStream.writeInt(Server.FLAG_INVOKER);
// 输出回调第几个标记
objectOutputStream.writeInt(paramIndex);
} else {
// 客户端动态代理
// 发送请求接口
objectOutputStream.writeUTF(interfaceClass.getName());
}
// 发送方法名
objectOutputStream.writeUTF(method.getName());
// 发送方法参数类型
Class<?>[] paramTypes = method.getParameterTypes();
objectOutputStream.writeObject(paramTypes);
// 发送方法参数值
Map<Integer, Object> callbackMapper = new HashMap<>();
// 循环替换所有的参数回调的值
for (int i = 0; i < paramTypes.length; i++) {
// 获取请求参数类型
Class<?> instanceClass = args[i].getClass();
if (Arrays.stream(instanceClass.getInterfaces()).anyMatch(aClass -> aClass.isAssignableFrom(IOrder.BeanListener.class))) {
callbackMapper.put(i, args[i]);
args[i] = paramTypes[i] + ".callback." + i;
}
}
objectOutputStream.writeObject(args);
// 非服务端代理--客户端代理
if (paramIndex == null) {
// 发送隐式参数
objectOutputStream.writeObject(FYContext.getContext().getAttachments());
// 清理数据
FYContext.getServerContext().removeServerContext();
FYContext.getContext().removeContext();
} else {
return objectInputStream.readObject();
}
// 默认获得结果标记
int flag = Server.FLAG_RESULT;
Object result = null;
do {
// 读取标记
flag = objectInputStream.readInt();
// 如果等于回调标记
if (flag == Server.FLAG_INVOKER) {
// 回调第几个参数
int callIndex = objectInputStream.readInt();
// 调用方法
String callMethod = objectInputStream.readUTF();
// 参数
Class[] params = (Class<?>[]) objectInputStream.readObject();
// 参数值
Object[] values = (Object[]) objectInputStream.readObject();
// 获得回调对象
Object instance = callbackMapper.get(callIndex);
// 获得回调对象类型
Class<?> aClass = instance.getClass();
// 获得回调方法
Method aMethod = aClass.getMethod(callMethod, params);
// 执行回调方法 输出结果
objectOutputStream.writeObject(aMethod.invoke(instance, values));
}
if (flag == Server.FLAG_RESULT) {
result = objectInputStream.readObject();
FYContext.getServerContext().setAttachments((Map<String, String>) objectInputStream.readObject());
}
} while (flag == Server.FLAG_INVOKER);
return result;
}
@Override
protected void finalize() throws Throwable {
objectOutputStream.close();
objectInputStream.close();
}
}
}
3.测试
3.1 服务暴露者
注册服务,注册接口,启动等待消费者请求。
/**
* 启动流程
* @param args
*/
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
/**
* zk服务注册与发现 容器
* 通过长连接与服务暴露者 与 服务监听者 保持一致
*
* 实现原理:
* 服务提供者发生异常中断,TCP层会发送中断信号给zk,注册中心收到信号,则移除临时结点,然后通知
* 服务监听者
*
* 服务挂掉是通过结点存在状态进行判断
*/
// 设置递减锁
CountDownLatch countDownLatch = new CountDownLatch(1);
// zookeeper客户端连接
System.out.println("开始连接zookeeper!");
ZooKeeper zooKeeper = new ZooKeeper(ZOOKEEPER_ADDRESS, TIMEOUT, new Watcher() {
public void process(WatchedEvent watchedEvent) {
// 服务连接成功
if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
countDownLatch.countDown();
}
}
});
// 等待与zk连接成功
countDownLatch.await();
System.out.println("zookeeper 连接成功!");
// 注册中心注册业务根节点
if (zooKeeper.exists(Server.ZOOKEEPER_NOOD_FY, false) == null) {
zooKeeper.create(Server.ZOOKEEPER_NOOD_FY, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
// 服务提供者名称
String serverNodeName = "fy1";
// 服务提供者地址
String serverNodeAddress = "localhost" + ":" + SERVER_PORT;
// 服务提供者zk目录
String tempServerNodeName = Server.ZOOKEEPER_NOOD_FY.concat("/").concat(serverNodeName);
// 判断服务提供结点已经存在
if (zooKeeper.exists(tempServerNodeName, true) != null) {
// 删除 -1为最新版本
zooKeeper.delete(tempServerNodeName, -1);
}
// 创建最新服务结点数据
zooKeeper.create(tempServerNodeName, serverNodeAddress.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
// 创建服务容器
Server server = new Server();
// 注册本地服务
server.register(IOrder.class.getName(), IOrderImpl.class);
// 开启服务
System.out.println("开启服务!");
server.start("localhost", Server.SERVER_PORT);
}
3.2 服务接口
当然这个接口是消费者和服务暴露者都有的
判断是否参数回调就是判断参数类型是否为BeanListener接口,若是该接口才进行参数值替换,以便于服务暴露者发现
/**
* 定义订单接口
*/
public interface IOrder {
/**
* 根据订单编号查询订单实例
* @param id 订单编号
* @return 订单实例
*/
public Order selectById(Long id);
/**
* 根据订单编号异步获取订单
* @param id 订单编号
* @param listener 回调
*/
public void selectById(Long id, BeanListener listener);
/**
* 回调***
*/
public interface BeanListener {
public String beanNotify(Order order);
}
}
3.3 服务接口实现类
/**
* 订单提供者
*/
public class IOrderImpl implements IOrder {
/**
* 通过订单编号获取订单
* @param id 订单编号
* @return 订单
*/
@Override
public Order selectById(Long id) {
System.out.println("==============selectById=============");
System.out.println(FYContext.getContext().getAttachment("user"));
System.out.println(FYContext.getContext().getAttachment("password"));
FYContext.getServerContext().setAttachment("参数名", "想隐式传递给服务器的内容");
return new Order(id, "购买了XXXXXX");
}
/**
* 通过订单编号异步获取订单
* @param id 订单编号
* @param listener 回调
*/
@Override
public void selectById(Long id, BeanListener listener) {
System.out.println("==============selectById-notify=============");
System.out.println(FYContext.getContext().getAttachment("user"));
System.out.println(FYContext.getContext().getAttachment("password"));
Order order = new Order(id, "购买了XXXXXX-参数回调");
listener.beanNotify(order);
FYContext.getServerContext().setAttachment("参数名", "想隐式传递给服务器的内容");
}
}
3.4 订单实体类
/**
* 订单实体类
*/
public class Order implements Serializable {
/**
* 订单编号
*/
private long id;
/**
* 订单描述
*/
private String desc;
@Override
public String toString() {
return "Order{" +
"id=" + id +
", desc='" + desc + '\'' +
'}';
}
public Order() {
}
public Order(long id, String desc) {
this.id = id;
this.desc = desc;
}
public long getId() {
return id;
}
public void setId(long id) {
this.id = id;
}
public String getDesc() {
return desc;
}
public void setDesc(String desc) {
this.desc = desc;
}
}
3.5 消费者示例
public class Client {
public static void main(String[] args) throws Exception {
CountDownLatch countDownLatch = new CountDownLatch(1);
ZooKeeper zooKeeper = new ZooKeeper("localhost:2181",30*1000, event -> {
if (event.getState()== Watcher.Event.KeeperState.SyncConnected) {
countDownLatch.countDown();
}
});
countDownLatch.await();
List<String> children = zooKeeper.getChildren(Server.ZOOKEEPER_NOOD_FY, false);
Long i = new Long(5);
//获得所有的临时子节点
for (String child : children) {
//获得子节点上上的统计数据
Stat stat = new Stat();
//获得节点数据
byte[] data = zooKeeper.getData(Server.ZOOKEEPER_NOOD_FY + "/" + child, false, stat);
//字节转字符串
String value = new String(data);
//分割内容
String[] split = value.split(":");
//远程调用服务RPC
IOrder orderImplement = BeanProxy.getBean(IOrder.class, split[0], Integer.parseInt(split[1]));
FYContext.getContext().setAttachment("user","zhangsan");
FYContext.getContext().setAttachment("password","123456");
//调用返回结果
System.out.println("-----------------rpc调用---------------------");
System.out.println(orderImplement.selectById(i++));
System.out.println("---------------rpc调用完成-------------------");
System.out.println();
//异步回调
FYContext.getContext().setAttachment("user","zhangsan");
FYContext.getContext().setAttachment("password","123456");
System.out.println("-----------------异步回调--------------------");
orderImplement.selectById(i, new IOrder.BeanListener() {
@Override
public String beanNotify(Order order) {
System.out.println("回调!!!");
System.out.println(order);
return "回调执行结果";
}
});
System.out.println("--------------异步回调完成-------------------");
}
}
}
3.6 运行结果
服务暴露者
服务暴露者输出了隐式参数。
消费者