openresty代理websocket流量(apisix)
需求
基于apisix网关代理websocket流量(原生支持),可以管理websocket连接信息(查看连接数据,关闭指定连接)。但apisix本身不支持websocket的流量管理
问题
apisix仅对websocket请求做了验证转发,并未对websocket流量做流量管理,无法从apisix获取websocket上下文。
实现思路
在proxy之前,识别&hold住websockt upgrade请求,基于openresty.wesocket自定义websocket proxy,通过内存共享完成proxy上下文管理。
Apisix插件执行生命周期
代理逻辑代码
local core = require("apisix.core") local plugin = require("apisix.plugin") local router = require("apisix.router") local ngx = ngx local ipairs = ipairs local pairs = pairs local ws_server = require("resty.websocket.server") local ws_client = require("resty.websocket.client") -- nginx共享内存订阅,监听关闭信号。共享内存在worker内好使 local close_share_sub = ngx.shared["wesocket_close"] local _M = {} -- websocket不同帧类型编码 local code_map = { ["continuation"] = 0x0, ["text"] = 0x1, ["binary"] = 0x2, ["close"] = 0x8, ["ping"] = 0x9, ["pong"] = 0xa, } -- apisix插件开发的 钩子方法,通过该方法hold websocket请求 func _M.before_proxy(conf,ctx) local upgrate = core.request.header(ctx, "upgrade") or "" if upgrade ~= "websocket" then return end proxy(ctx) end -- 代理上下游连接,可以从上下文获取到更多proxy meta data,通过etcd上报到管理端 local func proxy(ctx) local upstream_url = "ws://" .. ctx.picked_server.upstream_host local p_client, err = ws_client:new{max_payload_len=65535} if err then return err end local ok, err = p_client:connect(upstream_url, { server_name="proxy_websocket", pool=upstream_url }) if not ok then return err end local p_server, err = ws_server:new{max_payload_len=65535} if err then reutnr err end -- 监听服务端流量,转发给客户端 local client_proxy = ngx.thread.spawn(wait, p_client, p_server, false, ctx) -- 监听客户端流量,转发给服务端 websocket规定客户端发送给服务端数据帧必须mask,防止缓存攻击 local server_proxy = ngx.thread.spawn(wait, p_server, p_client, true, ctx) local ok = ngx.thread.wait(client_proxy, server_proxy) if not ok then p_client:send_close() p_server:send_close() end ngx.thread.spawn(sub_close, p_client,p_server) end -- 监听转发上下游请求 local func wait(from_ws, to_ws, flip_masking, ctx) while true do local data, typ, err = form_ws:recv_frame(flip_masking) if data == nil then to_ws:send_close() break else if typ == "text" then --获取全局插件,执行自定义websocket log插件 ctx.wb_body = data plugin.run_global_rules(ctx, router.global_rules, "custom_websocket_log") end -- 连续帧信号识别、透传. 参考的文章里连续帧用是否continuation判断是不对的 -- fin=0表示数据帧未结束,fin=1表示数据帧是最后一帧 -- 当有超长帧拆分发送时, -- 第一帧 fin=0,opcode=text -- 中间帧 fin=0,opcode=continuation -- 结束帧 fin=1, opcode=continuation local fin = true if err == "again" then fin = false end local bytes, err = to_ws:send_frame(fin, code_map[typ], data, flip_masking) if typ == "close" then from_ws:send_close() break end if bytes == nil then from_ws:send_close() break end end end return nil end -- 监听共享内存,订阅关闭信号 local function sub_close(p_client, p_server) while true do core.sleep(2) local keys = close_share_sub:get_keys() local size = #keys if size ~= 0 then p_client:send_close() p_server:send_close() end end end function _M.websocket_log(conf,ctx) core.log.error(ctx.wb_body) end return _M
local core = require("apisix.core") local close_share_sub = ngx.shared["wesocket_close"] local _M = {} local key = "/public/websocket/close" -- 发布关闭close信号链接,共享内存只在节点内存生效,节点间close信号可以通过etcd通信 local function close() -- close_share_sub:set(key, "", 3) core.etcd.set(key, "", 3) return core.response.exit(200, {message="success"}) end function _M.api() return { { methods = {"POST"}, uri = "/apisix/public/websocket/close" handler = close, } } end -- 初始化监听etcd 关闭信号 function _M.init() core.timer.new("etcd_close_watcher", etcd_watch, { each_ttl = 10, sleep_fail = 5, sleep_succ = 5, check_interval = 1, }) end function etcd_watch() local etcdcli, prefix, err = core.etcd.get_etcd_syncer() local close_key = prefix .. key local res , err = etcdcli:readdir(close_key) if not res then return end if res.status ~=200 || res.status == 404 || not res.body.kvs then return end for _, item in ipairs(kvs) do close_share__sub:set(item.value, "", 3) end end return _M
参考:
https://gist.github.com/ykst/52205d4d4968298137ea0050c4569170
https://apisix.apache.org/docs/apisix/plugin-develop/
https://apisix.apache.org/zh/docs/apisix/architecture-design/apisix/