Java中使用KCP协议
传统游戏项目一般使用TCP协议进行通信,得益于它的稳定和可靠,不过在网络不稳定的情况下,会出现丢包严重。
不过近期有不少基于UDP的应用层协议,声称对UDP的不可靠进行了改造,这意味着我们既可以享受网络层提供稳定可靠的服务,又可以享受它的速度。
KCP就是这样的一个协议
不过网上说的再天花乱坠,我们也得亲自调研,分析源码和它的机制,并测试它的性能,是否满足项目上线要求。本文从C版本的源码入手理解KCP的机制,再研究各种Java版本的实现
一、KCP协议
原版源码(C代码):
https://github.com/skywind3000/kcp
基于底层协议(一般是UDP)之上,完全在应用层实现类TCP的可靠机制(快速重传,拥塞控制等)
二、KCP特性
KCP实现以下特性,也可参考github中README中对KCP的定义
特性 |
说明 |
源码位置 |
RTO优化 |
超时时间计算优于TCP |
ikcp_update_ack |
选择性重传 |
KCP只重传真正丢失的数据包,TCP会全部重传丢失包之后的全部数据 |
ikcp_parse_fastack,ikcp_flush |
快速重传 |
根据配置,可以在丢失包被跳过一定次数后直接重传,不等RTO超时 |
ikcp_parse_fastack,ikcp_flush |
UNA + ACK |
ARQ模型响应有两种,UNA(此编号前所有包已收到,如TCP),ACK(该编号包已收到),光用UNA将导致全部重传,光用ACK则丢失成本太高,以往协议都是二选其一,而 KCP协议中,除去单独的 ACK包外,所有包都有UNA信息。 |
ikcp_flush(每次update,都发送ACK) |
非延迟ACK |
KCP可配置是否延迟发送ACK |
ikcp_update_ack |
流量控制 |
同TCP的公平退让原则,发送窗口大小由:发送缓存大小、接收端剩余接收缓存大小、丢包退让及慢启动这四要素决定 |
ikcp_input, |
ikcp_flush |
|
|
三、KCP报文
1. 报文解析源码
源码中对报文解析部分代码如下
data = ikcp_decode32u(data, &conv); if (conv != kcp->conv) return -1; data = ikcp_decode8u(data, &cmd); data = ikcp_decode8u(data, &frg); data = ikcp_decode16u(data, &wnd); data = ikcp_decode32u(data, &ts); data = ikcp_decode32u(data, &sn); data = ikcp_decode32u(data, &una); data = ikcp_decode32u(data, &len);
2. 报文定义
报文中标识的定义
名词 |
全称 |
备注 |
作用 |
conv |
conversation id |
会话ID |
每个连接的唯一标识 |
cmd |
command |
命令 |
每个数据包指定逻辑 |
frg |
fragment count |
数据分段序号 |
根据mtu(最大传输单元)和mss(最大报文长度)的数据分段 |
wnd |
window size |
接收窗口大小 |
流量控制 |
ts |
timestamp |
时间戳 |
数据包发送时间记录 |
sn |
serial number |
数据报的序号 |
确保包的有序 |
una |
un-acknowledged serial number |
对端下一个要接收的数据报序号 |
确保包的有序 |
3. 消息类型
KCP报文的四种消息类型
const IUINT32 IKCP_CMD_PUSH = 81; // cmd: push data: 推送数据 const IUINT32 IKCP_CMD_ACK = 82; // cmd: ack: 对推送数据的确认 const IUINT32 IKCP_CMD_WASK = 83; // cmd: window probe (ask): 询问窗口大小 const IUINT32 IKCP_CMD_WINS = 84; // cmd: window size (tell): 回复窗口大小
- 报文结构
四、源码解析
在网络四层模型中,KCP和TCP/UDP(传输层),IP(网络层)等协议有着本质上区别,理论上KCP是属于应用层协议。
KCP并不提供协议实际收发处理,它只是在传输层只上对消息和链接的一层中间管理。
在KCP的源码中,它仅仅包含ikcp.c和ikcp.h两个文件,仅提供KCP的数据管理和数据接口,而用户需要在应用层进行KCP的调度
1. 结构体定义
KCP分包结构KCP对象结构体定义
struct IKCPSEG { struct IQUEUEHEAD node; IUINT32 conv; //用来标记这个seg属于哪个kcp IUINT32 cmd;//这个包的指令是: // 数据 ack 询问/应答窗口大小 IUINT32 frg; //分包时,分包的序号,0为终结 IUINT32 wnd;//发送这个seg的这个端的 窗口大小--> 远端的接收窗口大小 IUINT32 ts; //我不知道为什么要用时间轴,这个都1秒,有什么用 ?? IUINT32 sn;//相当于tcp的ack IUINT32 una;//una 远端等待接收的一个序号 IUINT32 len; //data的长度 IUINT32 resendts;//重发的时间轴 IUINT32 rto;//等于发送端kcp的 rx_rto->由 计算得来 IUINT32 fastack;//ack跳过的次数,用于快速重传 IUINT32 xmit;// fastack resend次数 char data[1];//当malloc时,只需要 malloc(sizeof(IKCPSEG)+datalen) 则,data长=数据长度+1 刚好用来放0 }; struct IKCPCB { //会话ID,最大传输单元,最大分片大小,状态 mss=mtu-sizeof(IKCPSEG) IUINT32 conv, mtu, mss, state; //第一个未接收到的包,待发送的包(可以认为是tcp的ack自增),接收消息的序号-> 用来赋seg的una值 IUINT32 snd_una, snd_nxt, rcv_nxt; //前两个不知道干嘛 拥塞窗口的阈值 用来控制cwnd值变化的 IUINT32 ts_recent, ts_lastack, ssthresh; //这几个变量是用来更新rto的 // rx_rttval 接收ack的浮动值 // rx_srtt 接收ack的平滑值 // rx_rto 计算出来的rto // rx_minrto 最小rto IINT32 rx_rttval, rx_srtt, rx_rto, rx_minrto; //发送队列的窗口大小 //接收队列的窗口大小 //远端的接收队列的窗口大小 //窗口大小 //probe 用来二进制标记 IUINT32 snd_wnd, rcv_wnd, rmt_wnd, cwnd, probe; //时间轴 时间间隔 下一次flush的时间 xmit发射多少次? 看不到有什么地方用到 IUINT32 current, interval, ts_flush, xmit; //接收到的数据seg个数 //需要发送的seg个数 IUINT32 nrcv_buf, nsnd_buf; //接收队列的数据 seg个数 //发送队列的数据 seg个数 IUINT32 nrcv_que, nsnd_que; //是否为nodelay模式:如果开启,rto计算范围更小 //updated 在调用flush时,有没有调用过update IUINT32 nodelay, updated; //请求访问窗口的时间相关 当远程端口大小为0时 IUINT32 ts_probe, probe_wait; IUINT32 dead_link, incr; //发送队列 struct IQUEUEHEAD snd_queue; //接收队列 struct IQUEUEHEAD rcv_queue; //待发送队列 struct IQUEUEHEAD snd_buf; //待接收队列 struct IQUEUEHEAD rcv_buf; //用来缓存自己接收到了多少个ack IUINT32 *acklist; IUINT32 ackcount; IUINT32 ackblock; //用户信息 void *user; //好像就用来操作数据的中转站 char *buffer; //快速重传的阈值 int fastresend; //快速重传的上限 int fastlimit; //是否无视重传等其它设置窗口 //steam模式的话,会将几个小包合并成大包 int nocwnd, stream; int logmask; int (*output)(const char *buf, int len, struct IKCPCB *kcp, void *user); void (*writelog)(const char *log, struct IKCPCB *kcp, void *user); };
2. 接口分析
分析C源码,KCP作为中间管理层,主要提供以下接口
//--------------------------------------------------------------------- // interface //--------------------------------------------------------------------- // create a new kcp control object, 'conv' must equal in two endpoint // from the same connection. 'user' will be passed to the output callback // output callback can be setup like this: 'kcp->output = my_udp_output' // 创建kcp对象,conv必须在两个端之间相同,user会被传递到output回调, // output回调这样设置:kcp->output = my_udp_output ikcpcb* ikcp_create(IUINT32 conv, void *user); // release kcp control object // 释放kcp对象 void ikcp_release(ikcpcb *kcp); // set output callback, which will be invoked by kcp // 设置kcp调用的output回调 void ikcp_setoutput(ikcpcb *kcp, int (*output)(const char *buf, int len, ikcpcb *kcp, void *user)); // user/upper level recv: returns size, returns below zero for EAGAIN // 用户层/上层 接收消息:返回接收长度,数据读取错误返回值小于0 int ikcp_recv(ikcpcb *kcp, char *buffer, int len); // user/upper level send, returns below zero for error // 用户层/上层 发送消息,错误返回值小于0 int ikcp_send(ikcpcb *kcp, const char *buffer, int len); // update state (call it repeatedly, every 10ms-100ms), or you can ask // ikcp_check when to call it again (without ikcp_input/_send calling). // 'current' - current timestamp in millisec. // 更新状态(每10ms-100ms调用一次),或者你可以通过调用ikcp_check, // 来得知什么时候再次调用(不调用ikcp_input/_send) // current - 当前时间戳(毫秒) void ikcp_update(ikcpcb *kcp, IUINT32 current); // Determine when should you invoke ikcp_update: // returns when you should invoke ikcp_update in millisec, if there // is no ikcp_input/_send calling. you can call ikcp_update in that // time, instead of call update repeatly. // Important to reduce unnacessary ikcp_update invoking. use it to // schedule ikcp_update (eg. implementing an epoll-like mechanism, // or optimize ikcp_update when handling massive kcp connections) // 决定你什么时候调用ikcp_update // 返回你多少毫秒后应该调用ikcp_update,如果没有ikcp_input/_send调用,你可以在那个时间 // 调用ikcp_updates来代替自己驱动update调用 // 用于减少不必要的ikcp_update调用。用这个来驱动ikcp_update(比如:实现类epoll的机制, // 或者优化处理大量kcp连接时的ikcp_update调用) IUINT32 ikcp_check(const ikcpcb *kcp, IUINT32 current); // when you received a low level packet (eg. UDP packet), call it // 接收下层数据包(比如:UDP数据包)时调用 int ikcp_input(ikcpcb *kcp, const char *data, long size); // flush pending data // 刷新数据 void ikcp_flush(ikcpcb *kcp); // check the size of next message in the recv queue // 检测接收队列里下条消息的长度 int ikcp_peeksize(const ikcpcb *kcp); // change MTU size, default is 1400 // 修改MTU长度,默认1400 int ikcp_setmtu(ikcpcb *kcp, int mtu); // set maximum window size: sndwnd=32, rcvwnd=32 by default // 设置最大窗口大小,默认值:sndwnd=32, rcvwnd=32 int ikcp_wndsize(ikcpcb *kcp, int sndwnd, int rcvwnd); // get how many packet is waiting to be sent // 获取准备发送的数据包 int ikcp_waitsnd(const ikcpcb *kcp); // fastest: ikcp_nodelay(kcp, 1, 20, 2, 1) // nodelay: 0:disable(default), 1:enable // interval: internal update timer interval in millisec, default is 100ms // resend: 0:disable fast resend(default), 1:enable fast resend // nc: 0:normal congestion control(default), 1:disable congestion control // 快速设置:ikcp_nodelay(kcp, 1, 20, 2, 1) // nodelay:0:使用(默认),1:使用 // interval:update时间(毫秒),默认100ms // resend:0:不适用快速重发(默认), 其他:自己设置值,若设置为2(则2次ACK跨越将会直接重传) // nc:0:正常拥塞控制(默认), 1:不适用拥塞控制 int ikcp_nodelay(ikcpcb *kcp, int nodelay, int interval, int resend, int nc); void ikcp_log(ikcpcb *kcp, int mask, const char *fmt, ...); // setup allocator // 设置kcp allocator void ikcp_allocator(void* (*new_malloc)(size_t), void (*new_free)(void*)); // read conv // 获取conv IUINT32 ikcp_getconv(const void *ptr);
3. 调度逻辑
KCP关键接口:
-
更新(上层驱动KCP状态更新)
ikcp_update:kcp状态更新接口,需要上层进行调度,判断flush时间,满足条件调用ikcp_flush刷新数据,同时也负责对收到数据的kcp端回复ACK消息 -
发送
ikcp_send -> ikcp_update -> ikcp_output ikcp_send:上层调用发送接口,把数据根据mss值进行分片,设置分包编号,放到snd_queue队尾 ikcp_flush:发送数据接口,根据对端窗口大小,拷贝snd_queue的数据到snd_buf,遍历snd_buf,满足条件则调用output回调(调用网络层的发送) -
接收
ikcp_input -> ikcp_update -> ikcp_recv ikcp_input:解析上层输入数据,拷贝rcv_buf到rcv_queue ikcp_recv:数据接收接口,上层从rcv_queue中复制数据到网络层buffer
五、Java版本
目前github上有几个高star的java版本实现,选取最高的三个进行分析
1.https://github.com/szhnet/kcp-netty.git(star:212)
实现原理:
1.KCP逻辑是源码的Java翻译版(一模一样)
2.UkcpServerChannel继承ServerChannel,UkcpServerBootStrap
3.用Boss线程 EventLoopGroup的read事件来驱动KCP逻辑
优点:使用Netty的Boss线程Read事件来驱动KCP,不用while(true)的驱动;使用简单,只需使用指定的ServerChannel和ServerBootStrap来启动Netty
缺点:无明显缺点
2.https://github.com/beykery/jkcp.git(star:172)
实现原理:
1.KCP逻辑是源码的Java翻译版(一模一样)
2.启动指定线程数的KcpThread自定义IO线程池,进行KCP逻辑调度
3.Netty读消息时抛到KcpThread自定义IO线程
// 通过hash选择IO线程处理 InetSocketAddress sender = dp.sender(); int hash = sender.hashCode(); hash = hash < 0 ? -hash : hash; this.workers[hash % workers.length].input(dp);
优点:代码简单明了,容易理解,核心是翻译版源码,外壳套的是Netty+自定义IO线程池
缺点:IO线程池会while(true)的调用KCP的update
3.https://github.com/l42111996/java-Kcp.git(star:187)
实现原理:
1.KCP逻辑是源码的Java翻译版(一模一样)
2.Netty读消息时,扔到定时器,1ms后,抛出任务到自定义IO线程
优点:拥有1的全部优点,也在Netty的读消息,把消息抛到定时器去调用KCP的逻辑,避免了2的无意义的while(true),同时实现功能更全,有上线项目验证(据作者描述)
缺点:Netty相关逻辑完全封装起来,不能修改任何Netty参数(不过源码中对Netty的参数已配置的很好了)
目前看来,第三种实现(
https://github.com/l42111996/java-Kcp.git)是最理想的方式
如果大家感兴趣,后边会对第三种实现进行详细的源码分析