19-编写类postman后端功能

写在前面

本次章节,由于要完成postman的主要发送接口功能,因此篇幅预计在一万字上下。且有部分代码不进行注释,只在后面针对核心方法进行讲解,对水平要求偏高,望提前知晓!

回顾

接上篇,上次我们在前端项目中,补全了前端的自定义导航栏。要知道,咱们这是一个接口测试平台。一个接口自动化平台,最核心的当然是对api的请求操作,所以咱们刻不容缓,加快进度,趁热打铁,来点干货吧。用aiohttp来协助我们完成接口自动化请求。

技术选型
  1. 异步支持:aiohttp是一个异步的HTTP客户端库,而requests是同步的。在异步编程中,使用异步库可以充分利用单线程处理多个并发请求,提高程序的性能和吞吐量。在高并发场景下,aiohttp通常能够更好地处理大量的并发请求。
  2. 非阻塞IO:aiohttp使用非阻塞的IO模型,在发起请求时不会阻塞主线程,可以在等待响应的同时继续处理其他任务。这在IO密集型的场景下特别有优势,比如在网络请求过程中可以同时进行其他计算任务,提高了程序的运行效率。
  3. Python异步生态系统:aiohttp是Python异步生态系统的一部分,与其他异步库(如asyncio)很好地集成在一起,可以方便地编写异步代码。而requests是同步的,如果在异步环境中使用,可能需要借助额外的库来实现异步功能,增加了复杂性。
  4. 对WebSocket的支持:aiohttp还提供了对WebSocket的原生支持,可以方便地处理WebSocket连接,而requests并不直接支持WebSocket。
  5. Python 3.5+兼容性:aiohttp支持Python 3.5及以上版本,而requests虽然也可以在Python 3.5+上运行,但更早的Python版本可能需要使用较老的requests版本,导致不同Python版本的代码兼容性较差。

需要注意的是,选择使用aiohttp还要考虑项目的实际需求和技术栈。如果项目对异步处理要求较高,有大量的并发请求或需要与异步IO库集成,那么aiohttp可能是更好的选择。但如果项目规模较小,对并发要求不高,且更喜欢简单易用的同步请求库,那么requests也是一个很好的选择。

综上所述,在此技术选型选择aiohttp。

设计思路

由于后期我们不只是需要支持http请求,还有一系列的rpc、ws等请求,因此我准备将核心方法新增一个目录进行管理,名字就叫requestpages。

编码

新增文件abandon-server/src/app/requestpages/AsyncHttpClient.py

import json
import time
from urllib.parse import urlencode

import aiohttp
from aiohttp import FormData


# 定义一个异步请求类AsyncRequest
class AsyncRequest(object):

    def __init__(self, url: str, timeout=15, **kwargs):
        self.url = url
        self.kwargs = kwargs
        self.timeout = aiohttp.ClientTimeout(total=timeout)

    def get_cookie(self, session):
        """
        获取Cookie的方法,接收一个session对象作为参数
        :param session:
        :return:
        """
        cookies = session.cookie_jar.filter_cookies(self.url)
        return {k: v.value for k, v in cookies.items()}

    def get_data(self, kwargs):
        """
        获取请求的数据的方法,接收一个kwargs字典作为参数
        :param kwargs:
        :return:
        """
        if kwargs.get("json") is not None:
            return kwargs.get("json")
        return kwargs.get("data")

    async def invoke(self, method: str):
        """
        发送异步请求的方法,接收一个method字符串作为参数
        :param method: str
        :return:
        """
        start = time.time()
        async with aiohttp.ClientSession(cookie_jar=aiohttp.CookieJar(unsafe=True)) as session:
            async with session.request(method, self.url, timeout=self.timeout, **self.kwargs) as resp:
                if resp.status != 200:
                    # 修复bug,当http状态码不为200的时候给出提示
                    return await self.collect(False, self.get_data(self.kwargs), resp.status, msg="http状态码不为200")
                cost = "%.0fms" % ((time.time() - start) * 1000)
                response, json_format = await AsyncRequest.get_resp(resp)
                cookie = self.get_cookie(session)
                return await self.collect(True, self.get_data(self.kwargs), resp.status, response,
                                          resp.headers, resp.request_info.headers, elapsed=cost,
                                          cookies=cookie, json_format=json_format)

    @staticmethod
    async def client(url: str, timeout=15, **kwargs):
        """
        用于创建AsyncRequest对象并返回
        :param url: str
        :param timeout: int
        :param kwargs:
        :return:
        """
        if not url.startswith(("http://", "https://")):
            raise Exception("请输入正确的url, 记得带上http哦")
        headers = kwargs.get("headers")
        body = kwargs.get("body", {})
        if body is None:
            r = AsyncRequest(url, headers=headers, timeout=timeout)
        elif body.get("body_type") == "none":
            r = AsyncRequest(url, headers=headers, timeout=timeout)
        elif body.get("body_type") == "json":
            if "Content-Type" not in headers:
                headers['Content-Type'] = "application/json; charset=UTF-8"
            try:
                body_data = body["body"]
                body_data = json.loads(json.dumps(body_data))
            except json.JSONDecodeError as e:
                raise Exception(f"json格式不正确: {e}")
            r = AsyncRequest(url, headers=headers, timeout=timeout,
                             json=body_data)
        elif body.get("body_type") == "formdata":
            try:
                body_data = body.get("body", [])
                form_data = FormData()
                for item in body_data:
                    # 如果是文本类型,直接添加key-value
                    if item.get("type") == 'TEXT':
                        form_data.add_field(item.get("key"), item.get("value", ''))
                    # todo 后期可能会改写add_file方法,暂时先注释掉基础写法
                    # else:
                    #     # 如果是文件类型,使用add_file方法添加文件
                    #     file_content = await file.read()
                    #     form_data.add_file(item.get("key"), file_content, filename=item.get("value"))
                r = AsyncRequest(url, headers=headers, data=form_data, timeout=timeout)
            except Exception as e:
                raise Exception(f"解析form-data失败: {str(e)}")
        elif body["body_type"] == "xform":
            body_data = kwargs.get("body", "{}")
            body_encoded = urlencode(body_data)
            r = AsyncRequest(url, headers=headers, data=body_encoded, timeout=timeout)
        else:
            # 暂时未支持其他类型
            r = AsyncRequest(url, headers=headers, timeout=timeout, data=kwargs.get("body", {})["body"])
        return r

    @staticmethod
    async def get_resp(resp):
        """
        用于获取响应的数据,返回响应的文本内容和一个布尔值,表示是否为json格式
        :param resp: str
        :return:
        """
        try:
            data = await resp.json(encoding='utf-8')
            # 说明是json格式
            return json.dumps(data, ensure_ascii=False, indent=4), True
        except:
            data = await resp.text()
            # 说明不是json格式,我们不做loads操作了
            return data, False

    @staticmethod
    def get_request_data(body):
        """
        用于获取请求的数据
        :param body: str
        :return:
        """
        request_body = body
        if isinstance(body, bytes):
            request_body = request_body.decode()
        if isinstance(body, FormData):
            request_body = str(body)
        if isinstance(request_body, str) or request_body is None:
            return request_body
        return json.dumps(request_body, ensure_ascii=False, indent=4)

    @staticmethod
    async def collect(status, request_data, status_code=200, response=None, response_headers=None,
                      request_headers=None, cookies=None, elapsed=None, msg="Request Successful!", **kwargs):
        """
        用于收集http返回的数据,接收多个参数,并将它们封装成字典形式返回
        :param status: 请求状态
        :param request_data: 请求入参
        :param status_code: 状态码
        :param response: 相应
        :param response_headers: 返回header
        :param request_headers:  请求header
        :param cookies:  cookie
        :param elapsed: 耗时
        :param msg: 报错信息
        :return:
        """
        request_headers = json.dumps({k: v for k, v in request_headers.items()} if request_headers is not None else {},
                                     ensure_ascii=False)
        response_headers = json.dumps(
            {k: v for k, v in response_headers.items()} if response_headers is not None else {},
            ensure_ascii=False)
        cookies = {k: v for k, v in cookies.items()} if cookies is not None else {}
        cookies = json.dumps(cookies, ensure_ascii=False)
        return {
            "status": status, "response_data": response, "status_code": status_code,
            "request_data": AsyncRequest.get_request_data(request_data),
            "response_headers": response_headers, "request_headers": request_headers,
            "msg": msg, "elapsed_time": elapsed, "cookies": cookies, **kwargs,
        }

新增HttpRequestForm的结构体,新增abandon-server/src/app/schema/http.py

from typing import Optional, Union
from pydantic import BaseModel, validator

from src.app.exception.error import ParamsError


# 定义一个数据模型,用于接收HTTP请求的相关信息
class HttpRequestForm(BaseModel):
    method: str
    url: str
    # 定义HTTP请求的请求体,可以是字典或列表类型,可选参数,默认为None
    body: Optional[Union[dict, list]] = None
    # 定义HTTP请求的请求头,可以是字典类型,可选参数,默认为一个空字典
    headers: Optional[dict] = {}

    # 使用pydantic的validator装饰器,对method和url字段进行验证
    @validator('method', 'url')
    def name_not_empty(cls, v):
        # 验证方法:检查字符串是否为空或仅包含空格
        if isinstance(v, str) and len(v.strip()) == 0:
            # 如果为空,抛出自定义异常ParamsError,提示不能为空
            raise ParamsError("不能为空")
        # 如果验证通过,返回原始值v
        return v

核心方法讲解

本次功能,核心方法大概分为invoke函数和client

  • 首先讲解的是client函数:

这个函数方法client用于创建AsyncRequest对象并返回。它的作用是根据传入的URL、超时时间和其他参数来创建不同类型的AsyncRequest对象,用于进行异步的HTTP请求。

下面对这个函数方法的具体实现进行详细解释:

  1. async def client(url: str, timeout=15, **kwargs)::这是一个异步函数方法,接收三个参数:url为字符串类型的HTTP请求URL,timeout为超时时间,默认为15秒,**kwargs为可变关键字参数。

  2. 首先,通过url.startswith(("http://", "https://"))来判断URL是否以http://https://开头,如果不是,就抛出异常,提示请输入正确的URL。

  3. 然后,从**kwargs中获取headersbody的值。headers是HTTP请求的头部信息,body是HTTP请求的请求体,它是一个字典类型,默认为空字典{}

  4. 根据body.get("body_type")来判断body中的body_type字段的值,从而决定创建何种类型的AsyncRequest对象:

    a. 如果body_type不存在(即body.get("body_type")返回None),或者body_type的值是"none",则创建一个不带请求体的AsyncRequest对象,只包含URL和请求头。

    b. 如果body_type的值是"json",则首先检查请求头headers中是否包含"Content-Type"字段,如果不包含,则将"Content-Type"设置为"application/json; charset=UTF-8"。然后尝试将body["body"]的值转换为JSON格式,如果转换失败(出现JSONDecodeError异常),则抛出异常提示"json格式不正确"。最终创建一个带有JSON格式的请求体的AsyncRequest对象。

    c. 如果body_type的值是"formdata",则将body["body"]中的数据处理成FormData格式,逐个添加到form_data对象中,形成请求体。其中,如果item的"key"对应的是文本类型,则直接添加key-value到请求体中;如果是文件类型,暂时注释掉"else"部分,后期可能会添加更改的方法。最终创建一个带有FormData格式的请求体的AsyncRequest对象。

    d. 如果body_type的值是"xform",则将kwargs["body"]进行URL编码,得到一个字符串形式的请求体,并创建一个带有x-www-form-urlencoded格式的AsyncRequest对象。

    e. 如果body_type不属于上述几种类型,暂时未支持其他类型,创建一个空的请求体({})的AsyncRequest对象。

  5. 最后,根据不同的body_type的值,创建不同类型的AsyncRequest对象,并将其返回。

  • invoke函数

这个函数方法invoke是用于发送异步请求的方法,它接收一个字符串类型的method参数作为HTTP请求的方法(如GET、POST等),然后进行异步的HTTP请求,并返回请求结果。

下面对这个函数方法的具体实现进行详细解释:

  1. async def invoke(self, method: str)::这是一个异步函数方法,接收一个名为method的字符串参数,表示HTTP请求的方法(GET、POST等)。

  2. 首先,记录请求开始时间start = time.time()

  3. 创建一个异步的ClientSession对象session,用于发送HTTP请求。cookie_jar=aiohttp.CookieJar(unsafe=True)表示使用不安全的CookieJar,即可以在httphttps之间共享cookie。

  4. 使用sessionrequest方法发送HTTP请求,传入methodself.url(请求URL)、timeout(超时时间)和**self.kwargs(其他HTTP请求参数,如请求头和请求体)。

  5. 判断响应的状态码resp.status是否为200,如果不为200,则通过self.collect方法收集请求失败的信息,并返回False表示请求不成功,同时附带HTTP状态码不为200的提示信息。

  6. 如果响应的状态码为200,则计算请求耗时cost = "%.0fms" % ((time.time() - start) * 1000),将耗时信息格式化成毫秒单位的字符串。

  7. 调用AsyncRequest.get_resp(resp)方法,解析响应内容,返回一个元组,第一个元素是响应内容response,第二个元素是一个布尔值json_format,表示响应是否为JSON格式。

  8. 使用sessioncookie_jar属性获取请求中的cookie信息,并将其转换成字典格式,保存在cookie变量中。

  9. 最后,通过self.collect方法收集请求成功的信息,并将请求结果、状态码、响应内容、响应头、请求头、耗时、cookie信息以及是否为JSON格式的标记等信息一并返回。其中,状态为True表示请求成功。

注册http请求相关路由

新增路由相关abandon-server/src/app/routes/request/http.py

from fastapi import APIRouter

from src.app.customized.customized_response import AbandonJSONResponse
from src.app.requestpages.AsyncHttpClient import AsyncRequest
from src.app.schema.http import HttpRequestForm

router = APIRouter(prefix="/request")


@router.post("/http")
async def http_request(data: HttpRequestForm):
    try:
        r = await AsyncRequest.client(data.url, headers=data.headers, body=data.body)
        response = await r.invoke(data.method)
        return AbandonJSONResponse.success(response)
    except Exception as e:
        return AbandonJSONResponse.failed(e)


@router.post("/posttest")
async def post_none():
    return AbandonJSONResponse.success('success')

注册路由

image.png

验证
  1. GET 空接口

image.png

  1. POST 空接口

image.png 3. POST none接口

后续只发curl,不截图了

curl --location 'http://127.0.0.1:9923/request/http' \
--header 'Content-Type: application/json' \
--data '{
    "method": "POST",
    
    "url": "http://127.0.0.1:9923/request/posttest",
    "body": {"body_type": "none"}
    
    
    
}'
  1. POST json接口
curl --location 'http://127.0.0.1:9923/request/http' \
--header 'Content-Type: application/json' \
--data '{
    "method": "POST",
    
    "url": "http://127.0.0.1:9923/request/posttest",
    "headers": {"Content-Type": "application/json"},
    "body": {"body_type": "json", "body":{"data":"2"}}
    
    
    
    
}'
  1. POST form-data接口
curl --location 'http://127.0.0.1:9923/request/http' \
--header 'Content-Type: application/json' \
--data '{
    "method": "POST",
    
    "url": "http://127.0.0.1:9923/request/posttest",
    
    "headers": {"Content-Type": "application/json"},
    
    "body": {"body_type": "formdata", "body":[{"key":"s","value":"v","type":"TEXT"},{"key":"s","value":"v","type":"FILE"}]}
    
}'
  1. POST x-www-form-urlencoded接口
curl --location 'http://127.0.0.1:9923/request/http' \
--header 'Content-Type: application/json' \
--data '{
    "method": "POST",
    
    "url": "http://127.0.0.1:9923/request/posttest",
    
    
    "headers": {"Content-Type": "application/json"},
    
    "body": {"body_type": "xform", "body":{"data":"2"}}
}'
全部评论

相关推荐

评论
点赞
收藏
分享

创作者周榜

更多
牛客网
牛客企业服务