学Mysql得知道的: Client 端调用主流程

一 .前言

出于好奇 , 这一篇来看一下 MySQL Client 端的调用主流程 , 为后续的 MySQL 系列文档开个头

二 . 创建 Connect

以获取连接为例 , 当获取连接时 , 会通过多种方式调用 Spring 的 DataSourceUtils # getConnection

此时还处在 Spring 的业务体系中. Connect 的流程在启动时创建和运行时调用是两个完全不同的流程 , 先来看一 CreateConnect 的主流程

// Step 1 : Connect 的创建入口
public static Connection doGetConnection(DataSource dataSource) throws SQLException {
    
    // ...
    Connection con = fetchConnection(dataSource);
    // ...

}


// Step 2 : 连接池处理
// 如果使用了连接池 , 此时的连接会交给对应连接池来处理
pool = result = new HikariPool(this);


> 以下跳过连接池的相关原理 ,直接看到 com.mysql.cj.jdbc.Driver 的核心处理流程


// Step 3 :  MySQL 驱动入口
public java.sql.Connection connect(String url, Properties info) throws SQLException {

    try {
        if (!ConnectionUrl.acceptsUrl(url)) {
            /*
             * According to JDBC spec:
             * The driver should return "null" if it realizes it is the wrong kind of driver to connect to the given URL. This will be common, as when the
             * JDBC driver manager is asked to connect to a given URL it passes the URL to each loaded driver in turn.
             */
            return null;
        }

        ConnectionUrl conStr = ConnectionUrl.getConnectionUrlInstance(url, info);
        // 根据类型创建不同的连接方式
        switch (conStr.getType()) {
            case SINGLE_CONNECTION:
                return com.mysql.cj.jdbc.ConnectionImpl.getInstance(conStr.getMainHost());

            case LOADBALANCE_CONNECTION:
                return LoadBalancedConnectionProxy.createProxyInstance((LoadbalanceConnectionUrl) conStr);

            case FAILOVER_CONNECTION:
                return FailoverConnectionProxy.createProxyInstance(conStr);

            case REPLICATION_CONNECTION:
                return ReplicationConnectionProxy.createProxyInstance((ReplicationConnectionUrl) conStr);

            default:
                return null;
        }

    } catch (UnsupportedConnectionStringException e) {
        // when Connector/J can't handle this connection string the Driver must return null
        return null;

    } catch (CJException ex) {
        throw ExceptionFactory.createException(UnableToConnectException.class,
                Messages.getString("NonRegisteringDriver.17", new Object[] { ex.toString() }), ex);
    }
}

复制代码

三 . 运行 SQL 流程

这一节来看一下执行 SQL 时的调用流程

在一个 SQL 的生命周期中 , 主要有2个主要的流程 :

  • 流程一 : 基于事务起点的 SET autocommit
  • 流程二 : 真正核心的 SQL 执行语句

3.1 事务的入口

Spring 的事务起点是 TransactionAspectSupport , 进入一系列流程后 , 会进入连接池的处理中 , 这里涉及到 SpringTransaction 的流程 , 可以看看这篇文章 基于 Spring 的事务管理, 这里只是简单过一下

  • Step 1 : TransactionImpl # begin : 由 Begin 开启事务流程
  • Step 2 :ConnectionImpl # setAutoCommit : 开启自动提交流程
  • Step 3 : NativeSession # execSQL : 进入 SQL 执行核心流程
public void begin() {
    if ( !doConnectionsFromProviderHaveAutoCommitDisabled() ) {
        getConnectionForTransactionManagement().setAutoCommit( false );
    }
    status = TransactionStatus.ACTIVE;
}
复制代码

事务会调用setAutoCommit , 其根本也是调用一个execSQL 语句来控制事务

this.session.execSQL(null, autoCommitFlag ? "SET autocommit=1" : "SET autocommit=0", -1, null, false, this.nullStatementResultSetFactory, this.database, null, false);
复制代码

到了这里会直接调用到 execSQL , 而流程二的普通 SQL 语句 , 会由对应的 executeUpdate / executeQuery /executeInternal 发起流程处理

3.2 普通业务执行流程

  • Step 1 : AbstractEntityPersister # insert : 由 Hibernate/JPA 发起的操作流程
  • Step 2 : ResultSetReturnImpl # executeUpdate : 执行 Update 语句 (或者 Query -> executeQuery)
  • Step 3 : HikariProxyPreparedStatement # executeUpdate : 连接池的中间处理 , 后续可以专门看看
  • Step 4 : ClientPreparedStatement # executeUpdate : 由 mysql 驱动接管
  • Step 5 : ClientPreparedStatement # executeUpdateInternal :
  • Step 5 : ClientPreparedStatement # executeInternal : 由底层方法调用抽象类 , 最终调用 execSQL
// 处理 Update 语句 , 核心流程如下 : 
protected long executeUpdateInternal(QueryBindings<?> bindings, boolean isReallyBatch) throws SQLException {

    // 1. 获取 JDBC 连接
    JdbcConnection locallyScopedConn = this.connection;

    // 2. 解析出要发送给 MySQL 的语句包
    Message sendPacket = ((PreparedQuery<?>) this.query).fillSendPacket(bindings);

    // 3. 调用通过处理方法执行 SQL 
    rs = executeInternal(-1, sendPacket, false, false, null, isReallyBatch);

    // 4. 设置处理结果
    this.results = rs;
    
    // 设置更新数量 ,这里会对重复语句进行统计处理
    this.updateCount = rs.getUpdateCount();

    // 5. 获取最后插入的 ID 
    this.lastInsertId = rs.getUpdateID();
        
    // 6. 返回更新数量
    return this.updateCount;

}
复制代码

executeInternal 主流程

protected <M extends Message> ResultSetInternalMethods executeInternal(int maxRowsToRetrieve, M sendPacket, boolean createStreamingResultSet,
        boolean queryIsSelectOnly, ColumnDefinition metadata, boolean isBatch) throws SQLException {

    // Step 1 : 获取连接
    JdbcConnection locallyScopedConnection = this.connection;

    // Step 2 : 获取插入值得绑定关系
    ((PreparedQuery<?>) this.query).getQueryBindings()
                    .setNumberOfExecutions(((PreparedQuery<?>) this.query).getQueryBindings().getNumberOfExecutions() + 1);

    // Step 3 :设置返回结果设置方法
    ResultSetInternalMethods rs;

    // Step 4 : 设置超时方法
    CancelQueryTask timeoutTask = startQueryTimer(this, getTimeoutInMillis());

    // Step 5 : 调用具体的 SQL 执行语句 
    rs = ((NativeSession) locallyScopedConnection.getSession()).execSQL(this, null, maxRowsToRetrieve, (NativePacketPayload) sendPacket,
                        createStreamingResultSet, getResultSetFactory(), this.getCurrentCatalog(), metadata, isBatch);

    // Step 6 : 超时时间处理 ,省略
}
复制代码

3.3 业务处理通用流程

看了入口方法, 现在来看一下 execQuery 的具体处理流程 :

// C- NativeSession
public <T extends Resultset> T execSQL(Query callingQuery, String query, int maxRows, NativePacketPayload packet, boolean streamResults,
        ProtocolEntityFactory<T, NativePacketPayload> resultSetFactory, String catalog, ColumnDefinition cachedMetadata, boolean isBatch) {
    
    //  
    int endOfQueryPacketPosition = endOfQueryPacketPosition = packet.getPosition();

    // 
    long queryStartTime = System.currentTimeMillis();

    // 如果 packet == null , 调用如下处理
    return ((NativeProtocol) this.protocol).sendQueryString(callingQuery, query, encoding, maxRows, streamResults, catalog, cachedMetadata,
                    this::getProfilerEventHandlerInstanceFunction, resultSetFactory);

    // 
    return ((NativeProtocol) this.protocol).sendQueryPacket(callingQuery, packet, maxRows, streamResults, catalog, cachedMetadata,
                this::getProfilerEventHandlerInstanceFunction, resultSetFactory);

}
复制代码

在上文调用 sendQueryPacket 发起了SQL 执行操作

// C- NativeProtocol
public final <T extends Resultset> T sendQueryPacket(Query callingQuery, NativePacketPayload queryPacket, int maxRows, boolean streamResults,
        String catalog, ColumnDefinition cachedMetadata, GetProfilerEventHandlerInstanceFunction getProfilerEventHandlerInstanceFunction,
        ProtocolEntityFactory<T, NativePacketPayload> resultSetFactory) throws IOException {
    this.statementExecutionDepth++;

    byte[] queryBuf = null;
    int oldPacketPosition = 0;
    long queryStartTime = 0;
    long queryEndTime = 0;

    queryBuf = queryPacket.getByteBuffer();
    oldPacketPosition = queryPacket.getPosition(); // save the packet position
    
    // 查询启动时间
    queryStartTime = getCurrentTimeNanosOrMillis();
    
    // 查询语句
    LazyString query = new LazyString(queryBuf, 1, (oldPacketPosition - 1));

    // 发送命令
    NativePacketPayload resultPacket = sendCommand(queryPacket, false, 0);

    // 获取所有的 Result 结果
    T rs = readAllResults(maxRows, streamResults, resultPacket, false, cachedMetadata, resultSetFactory);

    // 反射拦截器
    T interceptedResults = invokeQueryInterceptorsPost(query, callingQuery, rs, false);

    // 返回结果
    return rs;

}
复制代码

3.5 发送命令

// C- NativeProtocol
public final NativePacketPayload sendCommand(Message queryPacket, boolean skipCheck, int timeoutMillis) {

    int command = queryPacket.getByteBuffer()[0];
    this.commandCount++;

    if (this.queryInterceptors != null) {
        NativePacketPayload interceptedPacketPayload = (NativePacketPayload) invokeQueryInterceptorsPre(queryPacket, false);

        if (interceptedPacketPayload != null) {
            return interceptedPacketPayload;
        }
    }

    this.packetReader.resetMessageSequence();
    
    // 获取旧 timeout 时间并且设置新的超时时间
    // PS : 这里的 oldTimeout 在 finally 中会再次设置 soTimeout
    int oldTimeout = 0;
    oldTimeout = this.socketConnection.getMysqlSocket().getSoTimeout();
    this.socketConnection.getMysqlSocket().setSoTimeout(timeoutMillis);

    // 
    checkForOutstandingStreamingData();
    
    // 设置互斥锁
    this.serverSession.setStatusFlags(0, true);

    // 清空输入流
    clearInputStream();
    this.packetSequence = -1;
    
    // 发起包
    send(queryPacket, queryPacket.getPosition());

    // 获取 Return 结果
    // 1. resultPacket = readMessage(this.reusablePacket)
    // 2. checkErrorMessage(resultPacket)
    NativePacketPayload returnPacket = checkErrorMessage(command);

    return returnPacket;
}
复制代码
// C- NativeProtocol
public final void send(Message packet, int packetLen) {
        //....
        
        // 通过 Sender 发送远程包
        this.packetSender.send(packet.getByteBuffer(), packetLen, this.packetSequence);
}
复制代码

C- SimplePacketSender

public void send(byte[] packet, int packetLen, byte packetSequence) throws IOException {
    PacketSplitter packetSplitter = new PacketSplitter(packetLen);
    
    // 持续从远程读取包
    while (packetSplitter.nextPacket()) {
        this.outputStream.write(NativeUtils.encodeMysqlThreeByteInteger(packetSplitter.getPacketLen()));
        this.outputStream.write(packetSequence++);
        this.outputStream.write(packet, packetSplitter.getOffset(), packetSplitter.getPacketLen());
    }
    this.outputStream.flush();
}
复制代码

3.6 解析 Result

Step 1 : packetReader.readMessage

这里的 packetReader 主要包含以下几种实现 MultiPacketReader :

public NativePacketPayload readMessage(Optional<NativePacketPayload> reuse, NativePacketHeader header) throws IOException {

    // 获取长度和 Message 实现对哦下
    int packetLength = header.getMessageSize();
    NativePacketPayload buf = this.packetReader.readMessage(reuse, header);

    // 此处通过 do-while 进行循环获取
    do {
            
            //......
            this.packetReader.readMessage(Optional.of(multiPacket), hdr);
            // 写入 byte 数据
            buf.writeBytes(StringLengthDataType.STRING_FIXED, multiPacket.getByteBuffer(), 0, multiPacketLength);
    // 循环获取 , 直到最大长度 -> MAX_PACKET_SIZE = 256 * 256 * 256 - 1;
    } while (multiPacketLength == NativeConstants.MAX_PACKET_SIZE);

    return buf;
}
复制代码

checkErrorMessage 判断是否为错误返回

public void checkErrorMessage(NativePacketPayload resultPacket) {

    resultPacket.setPosition(0);
    
    // 获取状态嘛
    byte statusCode = (byte) resultPacket.readInteger(IntegerDataType.INT1);

    // Error handling
    // 此处通过状态码判断是否为异常结果
    if (statusCode == (byte) 0xff) {
        // 省略 error 处理环节   


        // 此处会通过状态和异常处理的结果抛出对应的异常 , 该方法没有具体的返回值
        if (xOpen != null) {
            if (xOpen.startsWith("22")) {
                throw new DataTruncationException(errorBuf.toString(), 0, true, false, 0, 0, errno);
            }

            if (errno == MysqlErrorNumbers.ER_MUST_CHANGE_PASSWORD) {
                throw ExceptionFactory.createException(PasswordExpiredException.class, errorBuf.toString(), getExceptionInterceptor());

            } else if (errno == MysqlErrorNumbers.ER_MUST_CHANGE_PASSWORD_LOGIN) {
                throw ExceptionFactory.createException(ClosedOnExpiredPasswordException.class, errorBuf.toString(), getExceptionInterceptor());
            }
        }

        throw ExceptionFactory.createException(errorBuf.toString(), xOpen, errno, false, null, getExceptionInterceptor());

    }
}
复制代码

获取最终返回结果

public <T extends Resultset> T readAllResults(int maxRows, boolean streamResults, NativePacketPayload resultPacket, boolean isBinaryEncoded,
        ColumnDefinition metadata, ProtocolEntityFactory<T, NativePacketPayload> resultSetFactory) throws IOException {

    // 调用具体的实现类获取最终结果 , 结果会被放在 rowData 中     
    T topLevelResultSet = read(Resultset.class, maxRows, streamResults, resultPacket, isBinaryEncoded, metadata, resultSetFactory);


}

// 这里的 Read 有多种 , 后面再来看一看如何实现 Byte 读取的

复制代码

总结

东西不多 , 主要是一些主流程代码 , 其中很多环节都比较模糊 , 主要是为了串联整个流程 , 后续小细节会抽空深入的了解一下.

相对于其他的框架代码 , Mysql 的代码看起来很生涩难懂 , 每个主流程间参杂了很多辅助属性 , 如果业务上出现了问题 , 又不确定最终的执行语句 ,可以考虑在 sendCommand 等方法中添加断点

连接池的使用通常在 ResultSetReturnImpl 和 ClientPreparedStatement 之间进行处理 , 后续可以关注一下



链接:https://juejin.cn/post/7034140455667236894
 

全部评论

相关推荐

孤寡孤寡的牛牛很热情:为什么我2本9硕投了很多,都是简历或者挂,难道那个恶心人的测评真的得认真做吗
点赞 评论 收藏
分享
点赞 收藏 评论
分享
牛客网
牛客企业服务