openresty代理websocket流量(apisix)
需求
基于apisix网关代理websocket流量(原生支持),可以管理websocket连接信息(查看连接数据,关闭指定连接)。但apisix本身不支持websocket的流量管理
问题
apisix仅对websocket请求做了验证转发,并未对websocket流量做流量管理,无法从apisix获取websocket上下文。
实现思路
在proxy之前,识别&hold住websockt upgrade请求,基于openresty.wesocket自定义websocket proxy,通过内存共享完成proxy上下文管理。
Apisix插件执行生命周期
代理逻辑代码
local core = require("apisix.core")
local plugin = require("apisix.plugin")
local router = require("apisix.router")
local ngx = ngx
local ipairs = ipairs
local pairs = pairs
local ws_server = require("resty.websocket.server")
local ws_client = require("resty.websocket.client")
-- nginx共享内存订阅,监听关闭信号。共享内存在worker内好使
local close_share_sub = ngx.shared["wesocket_close"]
local _M = {}
-- websocket不同帧类型编码
local code_map = {
["continuation"] = 0x0,
["text"] = 0x1,
["binary"] = 0x2,
["close"] = 0x8,
["ping"] = 0x9,
["pong"] = 0xa,
}
-- apisix插件开发的 钩子方法,通过该方法hold websocket请求
func _M.before_proxy(conf,ctx)
local upgrate = core.request.header(ctx, "upgrade") or ""
if upgrade ~= "websocket" then
return
end
proxy(ctx)
end
-- 代理上下游连接,可以从上下文获取到更多proxy meta data,通过etcd上报到管理端
local func proxy(ctx)
local upstream_url = "ws://" .. ctx.picked_server.upstream_host
local p_client, err = ws_client:new{max_payload_len=65535}
if err then
return err
end
local ok, err = p_client:connect(upstream_url, {
server_name="proxy_websocket",
pool=upstream_url
})
if not ok then
return err
end
local p_server, err = ws_server:new{max_payload_len=65535}
if err then
reutnr err
end
-- 监听服务端流量,转发给客户端
local client_proxy = ngx.thread.spawn(wait, p_client, p_server, false, ctx)
-- 监听客户端流量,转发给服务端 websocket规定客户端发送给服务端数据帧必须mask,防止缓存攻击
local server_proxy = ngx.thread.spawn(wait, p_server, p_client, true, ctx)
local ok = ngx.thread.wait(client_proxy, server_proxy)
if not ok then
p_client:send_close()
p_server:send_close()
end
ngx.thread.spawn(sub_close, p_client,p_server)
end
-- 监听转发上下游请求
local func wait(from_ws, to_ws, flip_masking, ctx)
while true do
local data, typ, err = form_ws:recv_frame(flip_masking)
if data == nil then
to_ws:send_close()
break
else
if typ == "text" then
--获取全局插件,执行自定义websocket log插件
ctx.wb_body = data
plugin.run_global_rules(ctx, router.global_rules, "custom_websocket_log")
end
-- 连续帧信号识别、透传. 参考的文章里连续帧用是否continuation判断是不对的
-- fin=0表示数据帧未结束,fin=1表示数据帧是最后一帧
-- 当有超长帧拆分发送时,
-- 第一帧 fin=0,opcode=text
-- 中间帧 fin=0,opcode=continuation
-- 结束帧 fin=1, opcode=continuation
local fin = true
if err == "again" then
fin = false
end
local bytes, err = to_ws:send_frame(fin, code_map[typ], data, flip_masking)
if typ == "close" then
from_ws:send_close()
break
end
if bytes == nil then
from_ws:send_close()
break
end
end
end
return nil
end
-- 监听共享内存,订阅关闭信号
local function sub_close(p_client, p_server)
while true do
core.sleep(2)
local keys = close_share_sub:get_keys()
local size = #keys
if size ~= 0 then
p_client:send_close()
p_server:send_close()
end
end
end
function _M.websocket_log(conf,ctx)
core.log.error(ctx.wb_body)
end
return _M
local core = require("apisix.core")
local close_share_sub = ngx.shared["wesocket_close"]
local _M = {}
local key = "/public/websocket/close"
-- 发布关闭close信号链接,共享内存只在节点内存生效,节点间close信号可以通过etcd通信
local function close()
-- close_share_sub:set(key, "", 3)
core.etcd.set(key, "", 3)
return core.response.exit(200, {message="success"})
end
function _M.api()
return {
{
methods = {"POST"},
uri = "/apisix/public/websocket/close"
handler = close,
}
}
end
-- 初始化监听etcd 关闭信号
function _M.init()
core.timer.new("etcd_close_watcher", etcd_watch, {
each_ttl = 10,
sleep_fail = 5,
sleep_succ = 5,
check_interval = 1,
})
end
function etcd_watch()
local etcdcli, prefix, err = core.etcd.get_etcd_syncer()
local close_key = prefix .. key
local res , err = etcdcli:readdir(close_key)
if not res then
return
end
if res.status ~=200 || res.status == 404 || not res.body.kvs then
return
end
for _, item in ipairs(kvs) do
close_share__sub:set(item.value, "", 3)
end
end
return _M
参考:
https://gist.github.com/ykst/52205d4d4968298137ea0050c4569170
https://apisix.apache.org/docs/apisix/plugin-develop/
https://apisix.apache.org/zh/docs/apisix/architecture-design/apisix/



