【JAVA】如何基于Netty实现简单的RPC 框架
如何基于Netty实现简单的RPC 框架
1. 项目模块与依赖
common 模块依赖
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>myRPC</artifactId> <groupId>com.sgg</groupId> <version>1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>common</artifactId> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> </properties> <dependencies> <!--netty依赖 --> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> </dependency> <!--json依赖 --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.80</version> </dependency> <!--lombok依赖 --> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.22</version> </dependency> </dependencies> </project>
rpc-client模块依赖
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>myRPC</artifactId> <groupId>com.sgg</groupId> <version>1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>rpc-client</artifactId> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>com.sgg</groupId> <artifactId>common</artifactId> <version>1.0-SNAPSHOT</version> </dependency> </dependencies> </project>
rpc-server 模块依赖
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>myRPC</artifactId> <groupId>com.sgg</groupId> <version>1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>rpc-server</artifactId> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>com.sgg</groupId> <artifactId>common</artifactId> <version>1.0-SNAPSHOT</version> </dependency> <!--spring相关依赖 --> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-autoconfigure</artifactId> </dependency> </dependencies> </project>
myRPC
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.sgg</groupId> <artifactId>myRPC</artifactId> <packaging>pom</packaging> <version>1.0-SNAPSHOT</version> <modules> <module>common</module> <module>rpc-client</module> <module>rpc-server</module> </modules> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> </properties> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.2.4.RELEASE</version> </parent> </project>
2. common 通用模块
2.1 RpcRequest
package com.sgg.pojo; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; /** * @author sz * @DATE 2022/5/6 21:54 */ @Data @AllArgsConstructor @NoArgsConstructor public class RpcRequest { /** * 全限定类名 */ private String className; /** * 方法名 */ private String methodName; /** * 参数类型 */ private Class<?>[] parameterTypes; /** * 实参 */ private Object[] paramters; }
2.2 RpcResponse
package com.sgg.pojo; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; /** * @author sz * @DATE 2022/5/6 21:54 */ @Data @AllArgsConstructor @NoArgsConstructor public class RpcResponse { //返回状态码 private Integer code; //返回结果 private String result; //错误信息 private String error; }
2.3 User
package com.sgg.pojo; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; /** * @author sz * @DATE 2022/5/6 21:55 */ @Data @AllArgsConstructor @NoArgsConstructor public class User { private Integer id; private String name; }
2.4 UserService
package com.sgg.service; import com.sgg.pojo.User; public interface UserService { User getUserById(Integer id); }
3. rpc-server 服务端模块
3.1 MyServiceRpc
package com.sgg.anno; import java.lang.annotation.*; @Target({ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface MyServiceRpc { }
3.2 MyServerHandler
package com.sgg.handler; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.sgg.anno.MyServiceRpc; import com.sgg.pojo.RpcRequest; import com.sgg.pojo.RpcResponse; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import org.springframework.beans.BeansException; import org.springframework.cglib.reflect.FastClass; import org.springframework.cglib.reflect.FastMethod; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.stereotype.Component; import java.lang.reflect.InvocationTargetException; import java.util.*; /** * @author sz * @DATE 2022/5/6 22:16 */ @Component public class MyServerHandler extends SimpleChannelInboundHandler<String> implements ApplicationContextAware { private static ApplicationContext app; private static HashMap<String, Object> cache = new HashMap<>(); @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { app = applicationContext; //拿到容器中所有标注了@MyServiceRpc 注解的 bean Map<String, Object> beansWithAnnotation = app.getBeansWithAnnotation(MyServiceRpc.class); //拿到bean实现的接口的全限定类名 Set<Map.Entry<String, Object>> entries = beansWithAnnotation.entrySet(); entries.stream().forEach(ent->{ Class<?>[] interfaces = ent.getValue().getClass().getInterfaces(); if (null!=interfaces && interfaces.length != 0){ Arrays.stream(interfaces).forEach(inter->{ cache.put(inter.getName(),ent.getValue()); }); } }); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("客户端连接 : "+ctx.channel().remoteAddress().toString().substring(1)); } @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, String json) throws Exception { //封装结果 RpcResponse rpcResponse = new RpcResponse(); Object result = null; try { //将json字符串转换为RpcRequest 对象 RpcRequest rpcRequest = JSONObject.parseObject(json, RpcRequest.class); //拿到需要调用的类 String className = rpcRequest.getClassName(); Object bean = cache.get(className); //需要调用的方法名 String methodName = rpcRequest.getMethodName(); //方法参数类型 Class<?>[] parameterTypes = rpcRequest.getParameterTypes(); //方法实参 Object[] paramters = rpcRequest.getParamters(); //反射调用方法 FastClass fastClass = FastClass.create(bean.getClass()); FastMethod fastClassMethod = fastClass.getMethod(methodName, parameterTypes); result = fastClassMethod.invoke(bean, paramters); rpcResponse.setCode(200); rpcResponse.setResult((String) result); } catch (Exception e) { e.printStackTrace(); rpcResponse.setCode(400); rpcResponse.setError(e.getMessage()); } //将结果用json字符串写回去 channelHandlerContext.writeAndFlush(JSON.toJSONString(rpcResponse)); } }
3.3 ServerProvider
package com.sgg.provider; import com.sgg.handler.MyServerHandler; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.ServerChannel; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import org.springframework.stereotype.Service; import java.io.Closeable; import java.io.IOException; import java.net.ServerSocket; /** * @author sz * @DATE 2022/5/6 22:07 */ @Service public class ServerProvider implements Closeable { private NioEventLoopGroup boss ; private NioEventLoopGroup work ; public void start(String ip,Integer port) { //创建两个线程组 boss = new NioEventLoopGroup(1); //默认线程数 = CPU数 * 2 work = new NioEventLoopGroup(); //创建启动组手 try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(boss, work) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //解析字符串 ChannelPipeline pipeline = socketChannel.pipeline(); pipeline.addLast(new StringEncoder()); pipeline.addLast(new StringDecoder()); //内容处理 pipeline.addLast(new MyServerHandler()); } }); ChannelFuture channelFuture = serverBootstrap.bind(ip, port).sync(); System.out.println(">>>>>>>服务器启动成功<<<<<<<<"); channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); if (null!=boss){ boss.shutdownGracefully(); } if (null!=boss){ work.shutdownGracefully(); } } } @Override public void close() throws IOException { System.out.println("容器关闭我被调用了"); if (null!=boss){ boss.shutdownGracefully(); } if (null!=boss){ work.shutdownGracefully(); } } }
3.4 UserServiceImpl
package com.sgg.service.impl; import com.sgg.anno.MyServiceRpc; import com.sgg.pojo.User; import com.sgg.service.UserService; import org.springframework.stereotype.Service; import java.util.HashMap; /** * @author sz * @DATE 2022/5/6 22:18 */ @MyServiceRpc @Service public class UserServiceImpl implements UserService { private static HashMap<Integer,User> map = new HashMap(); static { map.put(1,new User(1,"张三")); map.put(2,new User(2,"李四")); } @Override public User getUserById(Integer id) { return map.get(id); } }
3.5 ServerApp
package com.sgg; import com.sgg.provider.ServerProvider; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; /** * @author sz * @DATE 2022/5/6 22:04 */ @SpringBootApplication public class ServerApp implements CommandLineRunner { @Autowired private ServerProvider serverProvider; public static void main(String[] args) { SpringApplication.run(ServerApp.class,args); } @Override public void run(String... args) throws Exception { new Thread(()->{ serverProvider.start("127.0.0.1",9999); }).start(); } }
4. rpc-client 客户端模块
4.1 RpcClient
package com.sgg.client; import com.sgg.handler.MyClientHandler; import io.netty.bootstrap.Bootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; /** * @author sz * @DATE 2022/5/6 22:54 */ @Data @AllArgsConstructor @NoArgsConstructor public class RpcClient { private String ip; private Integer port; public RpcClient(String ip, Integer port) { this.ip = ip; this.port = port; init(); } private NioEventLoopGroup eventLoopGroup; private Channel channel; private MyClientHandler myClientHandler = new MyClientHandler(); private ExecutorService executorService = Executors.newCachedThreadPool(); public Object sendMess(String message) throws ExecutionException, InterruptedException { myClientHandler.setRequestMsg(message); Future submit = executorService.submit(myClientHandler); return submit.get(); } public void init() { //创建线程组 eventLoopGroup = new NioEventLoopGroup(); //创建启动组手 Bootstrap bootstrap = new Bootstrap(); //分组 try { bootstrap.group(eventLoopGroup) .channel(NioSocketChannel.class) .option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); pipeline.addLast(new StringEncoder()); pipeline.addLast(new StringDecoder()); //业务 pipeline.addLast(myClientHandler); } }); channel = bootstrap.connect(ip, port).sync().channel(); } catch (Exception e) { e.printStackTrace(); if (null != channel) { channel.close(); } if (null != eventLoopGroup) { eventLoopGroup.shutdownGracefully(); } } } public void close() { if (null != channel) { channel.close(); } if (null != eventLoopGroup) { eventLoopGroup.shutdownGracefully(); } } }
4.2 MyClientHandler
package com.sgg.handler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import java.util.concurrent.Callable; /** * @author sz * @DATE 2022/5/6 23:04 */ public class MyClientHandler extends SimpleChannelInboundHandler<String> implements Callable { private String requestMsg; private String responseMsg; private ChannelHandlerContext context; public void setRequestMsg(String str) { this.requestMsg = str; } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { this.context = ctx; } @Override protected synchronized void channelRead0(ChannelHandlerContext channelHandlerContext, String str) throws Exception { this.responseMsg = str; //唤醒 notify(); } @Override public synchronized Object call() throws Exception { this.context.writeAndFlush(requestMsg); //线程等待 拿到响应数据 wait(); return responseMsg; } }
4.3 RpcProxy
package com.sgg.proxy; import com.alibaba.fastjson.JSON; import com.sgg.client.RpcClient; import com.sgg.pojo.RpcRequest; import com.sgg.pojo.RpcResponse; import com.sgg.pojo.User; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; /** * @author sz * @DATE 2022/5/6 22:46 */ public class RpcProxy { public static Object createProxy(Class target) { return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[]{target}, (Object proxy, Method method, Object[] args) -> { RpcRequest rpcRequest = new RpcRequest(); //设置类名 rpcRequest.setClassName(method.getDeclaringClass().getName()); //设置方法名 rpcRequest.setMethodName(method.getName()); //设置方法参数类型 rpcRequest.setParameterTypes(method.getParameterTypes()); //设置方法实际参数 rpcRequest.setParamters(args); //发送信息,拿到返回值 RpcClient rpcClient = new RpcClient("127.0.0.1", 9999); String mess = (String) rpcClient.sendMess(JSON.toJSONString(rpcRequest)); //转换为rpcResponse RpcResponse rpcResponse = JSON.parseObject(mess, RpcResponse.class); //拿到返回结果 if (200==rpcResponse.getCode()){ return JSON.parseObject(rpcResponse.getResult(), User.class); } return null; } ); } }
4.4 ClientApp
package com.sgg; import com.sgg.pojo.User; import com.sgg.proxy.RpcProxy; import com.sgg.service.UserService; /** * @author sz * @DATE 2022/5/6 22:44 */ public class ClientApp { public static void main(String[] args) { UserService proxy = (UserService) RpcProxy.createProxy(UserService.class); User userById = proxy.getUserById(2); System.out.println("userById = " + userById); } }#Java开发##Java#