openresty代理websocket流量(apisix)

需求

基于apisix网关代理websocket流量(原生支持),可以管理websocket连接信息(查看连接数据,关闭指定连接)。但apisix本身不支持websocket的流量管理

问题

apisix仅对websocket请求做了验证转发,并未对websocket流量做流量管理,无法从apisix获取websocket上下文。

实现思路

在proxy之前,识别&hold住websockt upgrade请求,基于openresty.wesocket自定义websocket proxy,通过内存共享完成proxy上下文管理。

Apisix插件执行生命周期

代理逻辑代码

local core = require("apisix.core")
local plugin = require("apisix.plugin")
local router = require("apisix.router")
local ngx = ngx
local ipairs = ipairs
local pairs = pairs
local ws_server = require("resty.websocket.server")
local ws_client = require("resty.websocket.client")
-- nginx共享内存订阅,监听关闭信号。共享内存在worker内好使
local close_share_sub = ngx.shared["wesocket_close"]
local _M = {}
-- websocket不同帧类型编码
local code_map = {
    ["continuation"] = 0x0,
    ["text"] = 0x1,
    ["binary"] = 0x2,
    ["close"] = 0x8,
    ["ping"] = 0x9,
    ["pong"] = 0xa,
}
-- apisix插件开发的 钩子方法,通过该方法hold websocket请求
func _M.before_proxy(conf,ctx)
	local upgrate = core.request.header(ctx, "upgrade") or ""
	if upgrade ~= "websocket" then
    return
	end
	proxy(ctx)
end

-- 代理上下游连接,可以从上下文获取到更多proxy meta data,通过etcd上报到管理端
local func proxy(ctx)
	local upstream_url = "ws://" .. ctx.picked_server.upstream_host
	local p_client, err = ws_client:new{max_payload_len=65535}
	if err then
  	return err
	end
	local ok, err = p_client:connect(upstream_url, {
    server_name="proxy_websocket",
    pool=upstream_url
  })
	if not ok then
  	return err
	end
	local p_server, err = ws_server:new{max_payload_len=65535}
  if err then 
  	reutnr err
  end
  -- 监听服务端流量,转发给客户端
	local client_proxy = ngx.thread.spawn(wait, p_client, p_server, false, ctx)
  -- 监听客户端流量,转发给服务端 websocket规定客户端发送给服务端数据帧必须mask,防止缓存攻击
	local server_proxy = ngx.thread.spawn(wait, p_server, p_client, true, ctx)
	local ok = ngx.thread.wait(client_proxy, server_proxy)
	if not ok then
  	p_client:send_close()
  	p_server:send_close()
	end
	ngx.thread.spawn(sub_close, p_client,p_server)
end

-- 监听转发上下游请求
local func wait(from_ws, to_ws, flip_masking, ctx)
	while true do
  	local data, typ, err = form_ws:recv_frame(flip_masking)
  	if data == nil then
    	to_ws:send_close()
    	break
		else
    	if typ == "text" then
        --获取全局插件,执行自定义websocket log插件
        ctx.wb_body = data
  			plugin.run_global_rules(ctx, router.global_rules, "custom_websocket_log")
			end
    	-- 连续帧信号识别、透传. 参考的文章里连续帧用是否continuation判断是不对的
    	-- fin=0表示数据帧未结束,fin=1表示数据帧是最后一帧
    	-- 当有超长帧拆分发送时,
			-- 第一帧 fin=0,opcode=text
			-- 中间帧 fin=0,opcode=continuation
			-- 结束帧 fin=1, opcode=continuation
    	local fin = true
    	if err == "again" then
  			fin = false
			end
    	local bytes, err = to_ws:send_frame(fin, code_map[typ], data, flip_masking)
    	if typ == "close" then
  			from_ws:send_close()
        break
			end
    	if bytes == nil then
    			from_ws:send_close()
          break
			end
		end
	end
	return nil
end

-- 监听共享内存,订阅关闭信号
local function sub_close(p_client, p_server)
	while true do
  	core.sleep(2)
  	local keys = close_share_sub:get_keys()
  	local size = #keys
  	if size ~= 0 then
  		p_client:send_close()
    	p_server:send_close()
		end
	end
end

function _M.websocket_log(conf,ctx)
	core.log.error(ctx.wb_body)
end

return _M

local core = require("apisix.core")
local close_share_sub = ngx.shared["wesocket_close"]
local _M = {}
local key = "/public/websocket/close"
-- 发布关闭close信号链接,共享内存只在节点内存生效,节点间close信号可以通过etcd通信
local function close()
  -- close_share_sub:set(key, "", 3)
  core.etcd.set(key, "", 3)
  return core.response.exit(200, {message="success"})
end

function _M.api()
  return {
    {
      methods = {"POST"},
      uri = "/apisix/public/websocket/close"
      handler = close,
    }
  }
end

-- 初始化监听etcd 关闭信号
function _M.init()
	core.timer.new("etcd_close_watcher", etcd_watch, {
      each_ttl = 10,
      sleep_fail = 5,
      sleep_succ = 5,
      check_interval = 1,
  })
end

function etcd_watch()
	local etcdcli, prefix, err = core.etcd.get_etcd_syncer()
  local close_key = prefix .. key
  local res , err = etcdcli:readdir(close_key)
  if not res then
    return
  end
  if res.status ~=200 || res.status == 404 || not res.body.kvs then
    return
  end
  for _, item in ipairs(kvs) do
    close_share__sub:set(item.value,  "", 3)
  end
end

return _M

参考:

https://gist.github.com/ykst/52205d4d4968298137ea0050c4569170

https://apisix.apache.org/docs/apisix/plugin-develop/

https://apisix.apache.org/zh/docs/apisix/architecture-design/apisix/

https://github.com/abbshr/abbshr.github.io/issues/47

全部评论
大佬,custom_websocket_log 这个自定义插件是啥
点赞 回复 分享
发布于 2024-12-10 10:36 陕西

相关推荐

2024-12-20 15:55
门头沟学院 数据运营
点赞 评论 收藏
分享
评论
点赞
1
分享
牛客网
牛客企业服务