【Java】手写实现简单的微服务--RPC RMI 框架

目录

什么是RPC和RMI?

个人简单实现思路:​

RPC服务器端启动前的准备工作:

1.RpcBeanDefinition  

2.RpcBeanFactory

3.RpcBeanRegistry

RPC服务器端:

1.RpcServer

2.RpcServerExecutor

RPC客户端:

1.RpcClient

2.RpcClientExecutor

改进,优化,深入措施:


什么是RPC和RMI?

RPC :Remote Procedure Call,远程过程调用

RMI:Remote Method Invocation,远程方法调用

RPC和RMI是有区别的:可以这么理解,RMI就是面向对象的RPC,即,RMI就是用Java实现的RPC

其目的是:利用网络通信,客户端远程调用服务器的方法

其核心思想是:一个端可以通过调用另一个端的方法,实现相关功能。一个端“执行”一个方法,而这个方法的实际执行是在另一端进行的!


个人简单实现思路:

连接方式:TCP  短连接 

上图已经很完整的表现了我的实现思路:

建立服务器的时候,首先要进行的就是包扫描,扫描出可以供远程调用的类,把类的方法“注册”,形成一个个的键值对,当客户端远程调用我服务器的方法的时候,根据其发送过来的“键”,找到对应的方法,再结合其发送过来的参数,利用反射机制执行方法,并由服务器将执行结果发送回客户端,客户端收到结果,断开和服务器的连接。实现短连接的一次远程调用。

实现思路很简单,但在完成代码的时候也踩了不少坑。

当然,还有许多问题要结合代码来说明,所以,上代码:


RPC服务器端启动前的准备工作:

1.RpcBeanDefinition  

因为需要根据客户端发送过来的“消息”来执行方法,所以就必须要把类,对象,方法“捆绑”在一起

	private Class<?> klass;
	private Method method;
	private Object object;

	RpcBeanDefinition() {
	}

	RpcBeanDefinition(Class<?> klass, Method method, Object object) {
		this.klass = klass;
		this.method = method;
		this.object = object;
	}

 以上就是该类的一些主要内容,剩下还有get和set方法,就不多说了

2.RpcBeanFactory

我希望每一个Definition都有其唯一的“号码”,客户端只用发送给我服务器他想要调用方法“号码”,我服务器就能通过收到“号码”来找到这个方法,进而执行,多方便。

所以RpcBeanFactory应运而生,他来负责“号码”为键和RpcBeanDefinition为值的键值对集合——rpcBeanMap

	private final Map<String, RpcBeanDefinition> rpcBeanMap;

	public RpcBeanFactory() {
		rpcBeanMap = new HashMap<>();
	}
	
	/**
	 * 往rpcBeanMap中增加键值对,就是注册方法<br>
	 * 若已经存在该“号码”,则直接返回
	 * @param rpcBeanId
	 * @param rpcBeanDefinition
	 */
	void rpcBeanRegistry(String rpcBeanId, RpcBeanDefinition rpcBeanDefinition) {
		RpcBeanDefinition tmp = getRpcBean(rpcBeanId);
		if (tmp != null) {
			return;
		}
		rpcBeanMap.put(rpcBeanId, rpcBeanDefinition);
	}

	/**
	 * 从rpcBeanMap中获取“值”
	 * @param rpcBeanId
	 * @return
	 */
	RpcBeanDefinition getRpcBean(String rpcBeanId) {
		return rpcBeanMap.get(rpcBeanId);
	}

3.RpcBeanRegistry

这个类的作用在于“注册”rpcBean,就是来生成“号码” rpcBeanId 和 rpcBeanDefination 的!

这个类的主要方法如下:

	/**
	 * 注册:生成 rpcBeanId 和 rpcBeanDefinition <br>
	 * 并将其加入到rpcbeanFactory中的map里
	 * @param rpcBeanFactory
	 * @param klass
	 * @param object
	 */
	private static void doRegist(RpcBeanFactory rpcBeanFactory, Class<?> klass, Object object) {
		Method[] methods = klass.getDeclaredMethods();
		for (Method method : methods) {
			RpcBeanDefinition rpcBeanDefinition = new RpcBeanDefinition(klass, method, object);
			String rpcBeanId = String.valueOf(method.toString().hashCode());
			rpcBeanFactory.rpcBeanRegistry(rpcBeanId, rpcBeanDefinition);
		}
	}

 说说第一个参数:rpcBeanFactory,因为我们只是在服务器启动前扫描注册他,因此,我们要保证其只有一份!

这个方法,通过扫描得到的类,遍历其方法,生成每个方法的唯一“号码”rpcBeanId 和 rpcBeanDefinition,并将其注册进map中

这里的id等于method.toString().hashCode(),经过实验,可以确保其是唯一的。这是最核心的私有方法。

看到这里,想到了一个问题:如果客户端传过来的类是接口呢?又要如何处理接口?为保证id的唯一性,我们一定要知道该接口的实现类。

来,看看该类的其他方法:

	/**
	 * 若该类不是接口
	 * @param rpcBeanFactory
	 * @param klass       类
	 */
	static void registClass(RpcBeanFactory rpcBeanFactory, Class<?> klass) {
		try {
			doRegist(rpcBeanFactory, klass, klass.newInstance());
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	
	/**
	 * 若该类是接口
	 * @param rpcBeanFactory
	 * @param interfaces  接口
	 * @param klass       实现类
	 */
	static void registInterface(RpcBeanFactory rpcBeanFactory, Class<?> interfaces, Class<?> klass) {
		if (!interfaces.isAssignableFrom(klass)) {
			return;
		}
		try {
			doRegist(rpcBeanFactory, interfaces, klass.newInstance());
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	/**
	 * 重载方法
	 * @param rpcBeanFactory
	 * @param interfaces     接口
	 * @param object         实现类的对象
	 */
	static void registInterface(RpcBeanFactory rpcBeanFactory, Class<?> interfaces, Object object) {
		if (!interfaces.isAssignableFrom(object.getClass())) {
			return;
		}
		doRegist(rpcBeanFactory, interfaces, object);
	}

当要注册的类为接口,就一定要知道他的实现类或实现类对象!

当然,判断其是否为接口不是咱们这层应该干的事,由包扫描是判断在分别注册! 


RPC服务器端:

1.RpcServer

这个类是对外提供的一个类,他实现了Runnable接口,因为作为服务器,我一定要有线程来时刻侦听客户端的连接请求,当然开始,停止,以及包扫描工作必不可少。

接着上面的注册,先看包扫描方法

	/**
	 * 包扫描:<br>
	 * 扫描包路径下所有带有Rpc注解的类<br>
	 * 将其method注册<br>
	 * 该方法即客户端可以远程调用的方法
	 * @param packageName
	 */
	public void scanRpcBean(String packageName) {
		new PackageScanner() {	
			@Override
			public void dealClass(Class<?> klass) {
				if (!klass.isAnnotationPresent(Rpc.class)) {
					return;
				}
				Class<?>[] interfaces = klass.getInterfaces();
//				if (interfaces.length - 1 > 0) {
//					for (Class<?> interfase : interfaces) {
//						RpcBeanRegistry.registInterface(rpcBeanFactory, interfase, klass);
//					}
//				} else {
//					RpcBeanRegistry.registClass(rpcBeanFactory, klass);
//				}
				int length = interfaces.length;
				if (length > 0) {
					for (int index = 0; index < length; index++) {
						if (!interfaces[index].equals(Serializable.class)) {
							RpcBeanRegistry.registInterface(rpcBeanFactory, interfaces[index], klass);
						}
					}
				}
				RpcBeanRegistry.registClass(rpcBeanFactory, klass);			
			}
		}.packageScanner(packageName);
	}

包扫描工具传送门>包扫描<

根据包扫描工具。我这里处理的类是过滤了枚举,接口,注解和八大基本类型的

首先,我先判断他是否带有Rpc注解,我只允许客户端远程调用我允许他调用的类。

再判断其接口数量,要知道,要被客户端远程调用的类一定要有其序列号,即,实现 Serializable 接口。如果他还实现了别的接口,那他的接口数一定是大于1的,

写博客的时候发现了自己的一个漏洞,就是在敲上面那句话的时候,代码注释掉的是我以前的做法,遗漏了有些类不用实现Serializable也有序列号的问题,以前做法还没有把本身类注册,唉,还是年轻。

这次先判断接口数量,不管多少我都是要把该类注册进去,遍历接口数组,若是Serializable我不处理,其他接口才处理,完成!

看看这个类还干了什么?让我们启动服务器

	private ServerSocket server;
	private int serverPort;
	private boolean goon;
	private static long count;
	private RpcBeanFactory rpcBeanFactory;

	public RpcServer() {
		this.goon = false;
		rpcBeanFactory = new RpcBeanFactory();
	}

	/**
	 * 启动Rpc服务器<br>
	 * 若未设置服务器端口则抛出异常<br>
	 * 执行侦听服务器连接线程
	 * @throws Exception
	 */
	public void startupRpcServer() throws Exception {
		if (serverPort == 0) {
			throw new RpcServerPortIsNotSetted("RpcSever端口未设置");
		}
		this.server = new ServerSocket(serverPort);
		goon = true;
		new Thread(this,"RPC-Server").start();
	}

	public void setServerPort(int serverPort) {
		this.serverPort = serverPort;
	}

创建服务器对象的时候,new 出了 RpcBeanFactory,保证factory只有一份!

启动服务器即启动侦听客户端连接的线程

	@Override
	public void run() {
		while (goon) {
			try {
				Socket socket = server.accept();
				new RpcServerExecutor(socket, rpcBeanFactory, ++count);
			} catch (Exception e) {
				goon = false;
				e.printStackTrace();
			}
		}
		stop();
	}

每侦听到一个客户端连接,就要new 一个 RpcServerExecutor 来进行和客户端的交流。出现异常时,线程就要停止,并且关闭服务器

	public void stop() {
		if (server != null && !server.isClosed()) {
			try {
				server.close();
			} catch (IOException e) {
				e.printStackTrace();
			} finally {
				server = null;
			}
		}
	}

2.RpcServerExecutor

这个类负责与侦听到的客户端交流,即,处理客户端的远程方法调用的请求

	private Socket socket;
	private ObjectInputStream ois;
	private ObjectOutputStream oos;
	private RpcBeanFactory rpcBeanFactory;
	
	RpcServerExecutor(Socket socket, RpcBeanFactory rpcBeanFactory, long threadId) throws Exception {
		this.socket = socket;
		this.rpcBeanFactory = rpcBeanFactory;
		this.oos = new ObjectOutputStream(socket.getOutputStream());
		this.ois = new ObjectInputStream(socket.getInputStream());
		new Thread(this, "RPC_EXECUTOR_" + threadId).start();
	}

传过来的参数有:侦听到的客户端,rpcBeanFactory,和线程编号,然后启动”交流“线程

	@Override
	public void run() {
		try {
			// 接收rpc客户端发来的id和参数
			String rpcBeanId = ois.readUTF();
			Object[] paras = (Object[]) ois.readObject();
			// 定位并执行方法
			RpcBeanDefinition rpcBeanDefinition = rpcBeanFactory.getRpcBean(rpcBeanId);
			Object object = rpcBeanDefinition.getObject();
			Method method = rpcBeanDefinition.getMethod();			
			Object result = method.invoke(object, paras);
			// 发送执行结果
			oos.writeObject(result);
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			stop();
		}
	}

处理完客户端远程方法调用的请求后,就要断开连接

	void stop() {
	   if (socket != null && !socket.isClosed()) { 
			try {
				socket.close();
			} catch (IOException e) {
				e.printStackTrace();
			} finally {
				socket = null;
			}
	   }
	   if (oos != null) { 
			try {
				oos.close();
			} catch (IOException e) {
				e.printStackTrace();
			} finally {
				oos = null;
			}
	   }
	   if (ois != null) { 
			try {
				ois.close();
			} catch (IOException e) {
				e.printStackTrace();
			} finally {
				ois = null;
			}
	   }
	}

服务器方面就说到这里,来看客户端


RPC客户端:

1.RpcClient

这层是开放给外面使用的,外界可以通过它获得代理对象

	private RpcClientExecutor rpcClientExecutor;

	public RpcClient(String rpcServerIp, int rpcServerPort) {
		this.rpcClientExecutor = new RpcClientExecutor(rpcServerIp, rpcServerPort);
	}

显而易见,每个客户端都应有自己的一个执行者rpcClientExecutor,由他来连接服务器,远程调用方法,这一层只需获得代理对象

	public <T> T getProxy(Class<?> klass) {
		T result = null;
		// 判断klass是否是接口,是则使用jdk代理方式
		if (!klass.isInterface()) {
			// cglib方式
			result = getCglibProxy(klass);
		} else {
			// jdk方式
			result = getJdkProxy(klass);
		}
		return result;
	}

客户端可以通过接口获取代理对象,合理!当klass为接口的时候,就必须要用Jdk方式来获取代理对象了

	/**
	 * 以Jdk方式获得代理
	 * @param klass
	 * @return
	 */
	@SuppressWarnings("unchecked")
	private <T> T getJdkProxy(Class<?> klass) {
		return (T) Proxy.newProxyInstance(
				klass.getClassLoader(), 
				new Class<?>[] {klass}, 
				new InvocationHandler() {		
			@Override
			public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
				// 根据要执行的方法获取Id
				String rpcBeanId = String.valueOf(method.toString().hashCode());
				// 远程调用方法
			 	Object result = rpcClientExecutor.rpcExecutor(rpcBeanId, args);
				return result;
			}
		});
	}
	
	/**
	 * 以Cglib方式获得代理
	 * @param klass
	 * @return
	 */
	@SuppressWarnings("unchecked")
	private <T> T getCglibProxy(Class<?> klass) {
		Enhancer enhancer = new Enhancer();
		enhancer.setSuperclass(klass);
		enhancer.setCallback(new MethodInterceptor() {		
			@Override
			public Object intercept(Object obj, Method method, Object[] args, MethodProxy methodProxy) throws Throwable {
				String rpcBeanId = String.valueOf(method.toString().hashCode());
			 	Object result = rpcClientExecutor.rpcExecutor(rpcBeanId, args);
				return result;
			}
		});
		return (T) enhancer.create();
	}

现在的代理里面没有加***,毕竟这篇博客只是单纯的在说RPC,后期这里还应结合AOP和IOC

2.RpcClientExecutor

这个类来与服务器交互

	private String rpcServerIp;
	private int rpcServerPort;

	public RpcClientExecutor() {
	}

	public RpcClientExecutor(String rpcServerIp, int rpcServerPort) {
		this.rpcServerIp = rpcServerIp;
		this.rpcServerPort = rpcServerPort;
	}

传入Ip和Port来连接Rpc服务器

	public Object rpcExecutor(String rpcBeanId, Object[] paras) throws Exception {
			Socket socket = new Socket(rpcServerIp, rpcServerPort);
			ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());
			// 向rpc服务器发送id和参数
			oos.writeUTF(rpcBeanId);
			oos.writeObject(paras);			
			// 收到服务器发来的结果
			ObjectInputStream ois = new ObjectInputStream(socket.getInputStream());
			Object result = ois.readObject();			
			// 断开连接
			stop(socket, oos, ois);			
			// 返回结果
			return result;
	}

连接Rpc服务器,发送给服务器Id和参数,由服务器来执行方法,并把结果发送回客户端,客户端收到结果后,断开与服务器的连接,一次远程方法调用结束


改进,优化,深入措施:

1.结合AOP IOC

2.在启动服务器,侦听到客户端连接后开启与客户端交流线程,这里应该使用线程池

3.ip和port应用配置文件配置化

4.在网络通讯过程中,有多个客户机,客户机即可做Rpc客户端,也可以做Rpc服务器,若要用一个服务器来”管理“他们,该服务器就相当于一个超级管理员,让他来给各个客户机,子服务器来分配port,来整体合作完成同一个任务,以达到”云“的目的。

全部评论

相关推荐

昨天 09:08
裁应届生,一分钱补偿没有,离职了还脑控你,跟踪你,定位你,丁东服务是搞系每一个人
牛客吹哨人:建议细说...哨哥晚点统一更新到黑名单:不要重蹈覆辙!25届毁意向毁约裁员黑名单https://www.nowcoder.com/discuss/1317104
叮咚买菜稳定性 9人发布 投递叮咚买菜等公司10个岗位 >
点赞 评论 收藏
分享
点赞 收藏 评论
分享
牛客网
牛客企业服务