【JAVA】如何基于Netty实现简单的RPC 框架

如何基于Netty实现简单的RPC 框架

1. 项目模块与依赖

common 模块依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>myRPC</artifactId>
        <groupId>com.sgg</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>common</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <!--netty依赖 -->
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
        </dependency>
        <!--json依赖 -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.80</version>
        </dependency>
        <!--lombok依赖 -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.22</version>
        </dependency>
    </dependencies>
</project>

rpc-client模块依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>myRPC</artifactId>
        <groupId>com.sgg</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>rpc-client</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.sgg</groupId>
            <artifactId>common</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
    </dependencies>
</project>

rpc-server 模块依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>myRPC</artifactId>
        <groupId>com.sgg</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>rpc-server</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.sgg</groupId>
            <artifactId>common</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>

        <!--spring相关依赖 -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure</artifactId>
        </dependency>
    </dependencies>

</project>

myRPC

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.sgg</groupId>
    <artifactId>myRPC</artifactId>
    <packaging>pom</packaging>
    <version>1.0-SNAPSHOT</version>
    <modules>
        <module>common</module>
        <module>rpc-client</module>
        <module>rpc-server</module>
    </modules>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.4.RELEASE</version>
    </parent>


</project>

2. common 通用模块

image-20220507102547079

2.1 RpcRequest

package com.sgg.pojo;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author sz
 * @DATE 2022/5/6  21:54
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class RpcRequest {

    /**
     * 全限定类名
     */
    private String className;


    /**
     * 方法名
     */
    private String methodName;

    /**
     * 参数类型
     */
    private Class<?>[] parameterTypes;


    /**
     * 实参
     */
    private Object[] paramters;
}

2.2 RpcResponse

package com.sgg.pojo;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author sz
 * @DATE 2022/5/6  21:54
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class RpcResponse {

    //返回状态码
    private Integer code;
    //返回结果
    private String result;
    //错误信息
    private String error;

}

2.3 User

package com.sgg.pojo;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author sz
 * @DATE 2022/5/6  21:55
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class User {

    private Integer id;
    private String name;

}

2.4 UserService

package com.sgg.service;

import com.sgg.pojo.User;

public interface UserService {
    User getUserById(Integer id);
}

3. rpc-server 服务端模块

image-20220507102807642

3.1 MyServiceRpc

package com.sgg.anno;


import java.lang.annotation.*;

@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface MyServiceRpc {
}

3.2 MyServerHandler

package com.sgg.handler;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.sgg.anno.MyServiceRpc;
import com.sgg.pojo.RpcRequest;
import com.sgg.pojo.RpcResponse;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.springframework.beans.BeansException;
import org.springframework.cglib.reflect.FastClass;
import org.springframework.cglib.reflect.FastMethod;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

import java.lang.reflect.InvocationTargetException;
import java.util.*;

/**
 * @author sz
 * @DATE 2022/5/6  22:16
 */
@Component
public class MyServerHandler extends SimpleChannelInboundHandler<String> implements ApplicationContextAware {

    private static ApplicationContext app;
    private static HashMap<String, Object> cache = new HashMap<>();

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        app = applicationContext;
        //拿到容器中所有标注了@MyServiceRpc 注解的 bean
        Map<String, Object> beansWithAnnotation = app.getBeansWithAnnotation(MyServiceRpc.class);
        //拿到bean实现的接口的全限定类名
        Set<Map.Entry<String, Object>> entries = beansWithAnnotation.entrySet();
        entries.stream().forEach(ent->{
            Class<?>[] interfaces = ent.getValue().getClass().getInterfaces();
            if (null!=interfaces && interfaces.length != 0){
                Arrays.stream(interfaces).forEach(inter->{
                    cache.put(inter.getName(),ent.getValue());
                });
            }
        });
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("客户端连接 : "+ctx.channel().remoteAddress().toString().substring(1));
    }

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String json) throws Exception {
        //封装结果
        RpcResponse rpcResponse = new RpcResponse();

        Object result = null;
        try {
            //将json字符串转换为RpcRequest 对象
            RpcRequest rpcRequest = JSONObject.parseObject(json, RpcRequest.class);
            //拿到需要调用的类
            String className = rpcRequest.getClassName();
            Object bean = cache.get(className);
            //需要调用的方法名
            String methodName = rpcRequest.getMethodName();
            //方法参数类型
            Class<?>[] parameterTypes = rpcRequest.getParameterTypes();
            //方法实参
            Object[] paramters = rpcRequest.getParamters();

            //反射调用方法
            FastClass fastClass = FastClass.create(bean.getClass());
            FastMethod fastClassMethod = fastClass.getMethod(methodName, parameterTypes);
            result = fastClassMethod.invoke(bean, paramters);
            rpcResponse.setCode(200);
            rpcResponse.setResult((String) result);
        } catch (Exception e) {
            e.printStackTrace();
            rpcResponse.setCode(400);
            rpcResponse.setError(e.getMessage());
        }

        //将结果用json字符串写回去
        channelHandlerContext.writeAndFlush(JSON.toJSONString(rpcResponse));
    }


}

3.3 ServerProvider

package com.sgg.provider;

import com.sgg.handler.MyServerHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ServerChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import org.springframework.stereotype.Service;

import java.io.Closeable;
import java.io.IOException;
import java.net.ServerSocket;

/**
 * @author sz
 * @DATE 2022/5/6  22:07
 */
@Service
public class ServerProvider implements Closeable {

    private NioEventLoopGroup boss ;
    private NioEventLoopGroup work ;



    public void start(String ip,Integer port)  {
        //创建两个线程组
        boss = new NioEventLoopGroup(1);
        //默认线程数 = CPU数 * 2
        work = new NioEventLoopGroup();

        //创建启动组手
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(boss, work)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            //解析字符串
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast(new StringEncoder());
                            pipeline.addLast(new StringDecoder());
                            //内容处理
                            pipeline.addLast(new MyServerHandler());
                        }
                    });

            ChannelFuture channelFuture = serverBootstrap.bind(ip, port).sync();
            System.out.println(">>>>>>>服务器启动成功<<<<<<<<");
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
            if (null!=boss){
                boss.shutdownGracefully();
            }
            if (null!=boss){
                work.shutdownGracefully();
            }
        }
    }


    @Override
    public void close() throws IOException {
        System.out.println("容器关闭我被调用了");
        if (null!=boss){
            boss.shutdownGracefully();
        }
        if (null!=boss){
            work.shutdownGracefully();
        }
    }
}

3.4 UserServiceImpl

package com.sgg.service.impl;

import com.sgg.anno.MyServiceRpc;
import com.sgg.pojo.User;
import com.sgg.service.UserService;
import org.springframework.stereotype.Service;

import java.util.HashMap;

/**
 * @author sz
 * @DATE 2022/5/6  22:18
 */
@MyServiceRpc
@Service
public class UserServiceImpl implements UserService {

    private static HashMap<Integer,User> map = new HashMap();

    static {
        map.put(1,new User(1,"张三"));
        map.put(2,new User(2,"李四"));
    }

    @Override
    public User getUserById(Integer id) {
        return map.get(id);
    }

}

3.5 ServerApp

package com.sgg;

import com.sgg.provider.ServerProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @author sz
 * @DATE 2022/5/6  22:04
 */
@SpringBootApplication
public class ServerApp implements CommandLineRunner {

    @Autowired
    private ServerProvider serverProvider;

    public static void main(String[] args) {
        SpringApplication.run(ServerApp.class,args);
    }

    @Override
    public void run(String... args) throws Exception {
        new Thread(()->{
                serverProvider.start("127.0.0.1",9999);
        }).start();
    }
}

4. rpc-client 客户端模块

image-20220507103000153

4.1 RpcClient

package com.sgg.client;

import com.sgg.handler.MyClientHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * @author sz
 * @DATE 2022/5/6  22:54
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class RpcClient {

    private String ip;
    private Integer port;

    public RpcClient(String ip, Integer port) {
        this.ip = ip;
        this.port = port;
        init();
    }

    private NioEventLoopGroup eventLoopGroup;
    private Channel channel;
    private MyClientHandler myClientHandler = new MyClientHandler();
    private ExecutorService executorService = Executors.newCachedThreadPool();

    public Object sendMess(String message) throws ExecutionException, InterruptedException {
        myClientHandler.setRequestMsg(message);
        Future submit = executorService.submit(myClientHandler);
        return submit.get();
    }

   public void init() {
        //创建线程组
        eventLoopGroup = new NioEventLoopGroup();
        //创建启动组手
        Bootstrap bootstrap = new Bootstrap();
        //分组
        try {
            bootstrap.group(eventLoopGroup)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)
                    .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast(new StringEncoder());
                            pipeline.addLast(new StringDecoder());
                            //业务
                            pipeline.addLast(myClientHandler);
                        }
                    });

            channel = bootstrap.connect(ip, port).sync().channel();
        } catch (Exception e) {
            e.printStackTrace();
            if (null != channel) {
                channel.close();
            }
            if (null != eventLoopGroup) {
                eventLoopGroup.shutdownGracefully();
            }
        }
    }

    public void close() {
        if (null != channel) {
            channel.close();
        }
        if (null != eventLoopGroup) {
            eventLoopGroup.shutdownGracefully();
        }
    }
}

4.2 MyClientHandler

package com.sgg.handler;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.util.concurrent.Callable;

/**
 * @author sz
 * @DATE 2022/5/6  23:04
 */
public class MyClientHandler extends SimpleChannelInboundHandler<String> implements Callable {

    private String requestMsg;
    private String responseMsg;

    private ChannelHandlerContext context;

    public void setRequestMsg(String str) {
        this.requestMsg = str;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        this.context = ctx;
    }

    @Override
    protected synchronized void channelRead0(ChannelHandlerContext channelHandlerContext, String str) throws Exception {
        this.responseMsg = str;
        //唤醒
        notify();
    }

    @Override
    public synchronized Object call() throws Exception {
        this.context.writeAndFlush(requestMsg);
        //线程等待  拿到响应数据
        wait();
        return responseMsg;
    }
}

4.3 RpcProxy

package com.sgg.proxy;

import com.alibaba.fastjson.JSON;
import com.sgg.client.RpcClient;
import com.sgg.pojo.RpcRequest;
import com.sgg.pojo.RpcResponse;
import com.sgg.pojo.User;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

/**
 * @author sz
 * @DATE 2022/5/6  22:46
 */
public class RpcProxy {

    public static Object createProxy(Class target) {
        return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
                new Class[]{target},
                (Object proxy, Method method, Object[] args) -> {

                    RpcRequest rpcRequest = new RpcRequest();
                    //设置类名
                    rpcRequest.setClassName(method.getDeclaringClass().getName());
                    //设置方法名
                    rpcRequest.setMethodName(method.getName());
                    //设置方法参数类型
                    rpcRequest.setParameterTypes(method.getParameterTypes());
                    //设置方法实际参数
                    rpcRequest.setParamters(args);

                    //发送信息,拿到返回值
                    RpcClient rpcClient = new RpcClient("127.0.0.1", 9999);
                    String mess = (String) rpcClient.sendMess(JSON.toJSONString(rpcRequest));
                    //转换为rpcResponse
                    RpcResponse rpcResponse = JSON.parseObject(mess, RpcResponse.class);
                    //拿到返回结果
                    if (200==rpcResponse.getCode()){
                        return JSON.parseObject(rpcResponse.getResult(), User.class);
                    }

                    return null;
                }
        );
    }

}

4.4 ClientApp

package com.sgg;

import com.sgg.pojo.User;
import com.sgg.proxy.RpcProxy;
import com.sgg.service.UserService;

/**
 * @author sz
 * @DATE 2022/5/6  22:44
 */

public class ClientApp {

    public static void main(String[] args) {

        UserService proxy = (UserService) RpcProxy.createProxy(UserService.class);
        User userById = proxy.getUserById(2);
        System.out.println("userById = " + userById);
    }

}
#Java开发##Java#
全部评论
学废了学废了🤣
点赞 回复 分享
发布于 2022-05-10 12:35

相关推荐

11-28 18:41
已编辑
厦门大学 C++
百度 云服务go后端开发 24+签字费
点赞 评论 收藏
分享
评论
点赞
15
分享
牛客网
牛客企业服务