学Mysql得知道的: Client 端调用主流程
一 .前言
出于好奇 , 这一篇来看一下 MySQL Client 端的调用主流程 , 为后续的 MySQL 系列文档开个头
二 . 创建 Connect
以获取连接为例 , 当获取连接时 , 会通过多种方式调用 Spring 的 DataSourceUtils # getConnection
此时还处在 Spring 的业务体系中. Connect 的流程在启动时创建和运行时调用是两个完全不同的流程 , 先来看一 CreateConnect 的主流程
// Step 1 : Connect 的创建入口
public static Connection doGetConnection(DataSource dataSource) throws SQLException {
// ...
Connection con = fetchConnection(dataSource);
// ...
}
// Step 2 : 连接池处理
// 如果使用了连接池 , 此时的连接会交给对应连接池来处理
pool = result = new HikariPool(this);
> 以下跳过连接池的相关原理 ,直接看到 com.mysql.cj.jdbc.Driver 的核心处理流程
// Step 3 : MySQL 驱动入口
public java.sql.Connection connect(String url, Properties info) throws SQLException {
try {
if (!ConnectionUrl.acceptsUrl(url)) {
/*
* According to JDBC spec:
* The driver should return "null" if it realizes it is the wrong kind of driver to connect to the given URL. This will be common, as when the
* JDBC driver manager is asked to connect to a given URL it passes the URL to each loaded driver in turn.
*/
return null;
}
ConnectionUrl conStr = ConnectionUrl.getConnectionUrlInstance(url, info);
// 根据类型创建不同的连接方式
switch (conStr.getType()) {
case SINGLE_CONNECTION:
return com.mysql.cj.jdbc.ConnectionImpl.getInstance(conStr.getMainHost());
case LOADBALANCE_CONNECTION:
return LoadBalancedConnectionProxy.createProxyInstance((LoadbalanceConnectionUrl) conStr);
case FAILOVER_CONNECTION:
return FailoverConnectionProxy.createProxyInstance(conStr);
case REPLICATION_CONNECTION:
return ReplicationConnectionProxy.createProxyInstance((ReplicationConnectionUrl) conStr);
default:
return null;
}
} catch (UnsupportedConnectionStringException e) {
// when Connector/J can't handle this connection string the Driver must return null
return null;
} catch (CJException ex) {
throw ExceptionFactory.createException(UnableToConnectException.class,
Messages.getString("NonRegisteringDriver.17", new Object[] { ex.toString() }), ex);
}
}
复制代码
三 . 运行 SQL 流程
这一节来看一下执行 SQL 时的调用流程
在一个 SQL 的生命周期中 , 主要有2个主要的流程 :
- 流程一 : 基于事务起点的 SET autocommit
- 流程二 : 真正核心的 SQL 执行语句
3.1 事务的入口
Spring 的事务起点是 TransactionAspectSupport , 进入一系列流程后 , 会进入连接池的处理中 , 这里涉及到 SpringTransaction 的流程 , 可以看看这篇文章 基于 Spring 的事务管理, 这里只是简单过一下
- Step 1 : TransactionImpl # begin : 由 Begin 开启事务流程
- Step 2 :ConnectionImpl # setAutoCommit : 开启自动提交流程
- Step 3 : NativeSession # execSQL : 进入 SQL 执行核心流程
public void begin() {
if ( !doConnectionsFromProviderHaveAutoCommitDisabled() ) {
getConnectionForTransactionManagement().setAutoCommit( false );
}
status = TransactionStatus.ACTIVE;
}
复制代码
事务会调用setAutoCommit , 其根本也是调用一个execSQL 语句来控制事务
this.session.execSQL(null, autoCommitFlag ? "SET autocommit=1" : "SET autocommit=0", -1, null, false, this.nullStatementResultSetFactory, this.database, null, false);
复制代码
到了这里会直接调用到 execSQL , 而流程二的普通 SQL 语句 , 会由对应的 executeUpdate / executeQuery /executeInternal 发起流程处理
3.2 普通业务执行流程
- Step 1 : AbstractEntityPersister # insert : 由 Hibernate/JPA 发起的操作流程
- Step 2 : ResultSetReturnImpl # executeUpdate : 执行 Update 语句 (或者 Query -> executeQuery)
- Step 3 : HikariProxyPreparedStatement # executeUpdate : 连接池的中间处理 , 后续可以专门看看
- Step 4 : ClientPreparedStatement # executeUpdate : 由 mysql 驱动接管
- Step 5 : ClientPreparedStatement # executeUpdateInternal :
- Step 5 : ClientPreparedStatement # executeInternal : 由底层方法调用抽象类 , 最终调用 execSQL
// 处理 Update 语句 , 核心流程如下 :
protected long executeUpdateInternal(QueryBindings<?> bindings, boolean isReallyBatch) throws SQLException {
// 1. 获取 JDBC 连接
JdbcConnection locallyScopedConn = this.connection;
// 2. 解析出要发送给 MySQL 的语句包
Message sendPacket = ((PreparedQuery<?>) this.query).fillSendPacket(bindings);
// 3. 调用通过处理方法执行 SQL
rs = executeInternal(-1, sendPacket, false, false, null, isReallyBatch);
// 4. 设置处理结果
this.results = rs;
// 设置更新数量 ,这里会对重复语句进行统计处理
this.updateCount = rs.getUpdateCount();
// 5. 获取最后插入的 ID
this.lastInsertId = rs.getUpdateID();
// 6. 返回更新数量
return this.updateCount;
}
复制代码
executeInternal 主流程
protected <M extends Message> ResultSetInternalMethods executeInternal(int maxRowsToRetrieve, M sendPacket, boolean createStreamingResultSet,
boolean queryIsSelectOnly, ColumnDefinition metadata, boolean isBatch) throws SQLException {
// Step 1 : 获取连接
JdbcConnection locallyScopedConnection = this.connection;
// Step 2 : 获取插入值得绑定关系
((PreparedQuery<?>) this.query).getQueryBindings()
.setNumberOfExecutions(((PreparedQuery<?>) this.query).getQueryBindings().getNumberOfExecutions() + 1);
// Step 3 :设置返回结果设置方法
ResultSetInternalMethods rs;
// Step 4 : 设置超时方法
CancelQueryTask timeoutTask = startQueryTimer(this, getTimeoutInMillis());
// Step 5 : 调用具体的 SQL 执行语句
rs = ((NativeSession) locallyScopedConnection.getSession()).execSQL(this, null, maxRowsToRetrieve, (NativePacketPayload) sendPacket,
createStreamingResultSet, getResultSetFactory(), this.getCurrentCatalog(), metadata, isBatch);
// Step 6 : 超时时间处理 ,省略
}
复制代码
3.3 业务处理通用流程
看了入口方法, 现在来看一下 execQuery 的具体处理流程 :
// C- NativeSession
public <T extends Resultset> T execSQL(Query callingQuery, String query, int maxRows, NativePacketPayload packet, boolean streamResults,
ProtocolEntityFactory<T, NativePacketPayload> resultSetFactory, String catalog, ColumnDefinition cachedMetadata, boolean isBatch) {
//
int endOfQueryPacketPosition = endOfQueryPacketPosition = packet.getPosition();
//
long queryStartTime = System.currentTimeMillis();
// 如果 packet == null , 调用如下处理
return ((NativeProtocol) this.protocol).sendQueryString(callingQuery, query, encoding, maxRows, streamResults, catalog, cachedMetadata,
this::getProfilerEventHandlerInstanceFunction, resultSetFactory);
//
return ((NativeProtocol) this.protocol).sendQueryPacket(callingQuery, packet, maxRows, streamResults, catalog, cachedMetadata,
this::getProfilerEventHandlerInstanceFunction, resultSetFactory);
}
复制代码
在上文调用 sendQueryPacket 发起了SQL 执行操作
// C- NativeProtocol
public final <T extends Resultset> T sendQueryPacket(Query callingQuery, NativePacketPayload queryPacket, int maxRows, boolean streamResults,
String catalog, ColumnDefinition cachedMetadata, GetProfilerEventHandlerInstanceFunction getProfilerEventHandlerInstanceFunction,
ProtocolEntityFactory<T, NativePacketPayload> resultSetFactory) throws IOException {
this.statementExecutionDepth++;
byte[] queryBuf = null;
int oldPacketPosition = 0;
long queryStartTime = 0;
long queryEndTime = 0;
queryBuf = queryPacket.getByteBuffer();
oldPacketPosition = queryPacket.getPosition(); // save the packet position
// 查询启动时间
queryStartTime = getCurrentTimeNanosOrMillis();
// 查询语句
LazyString query = new LazyString(queryBuf, 1, (oldPacketPosition - 1));
// 发送命令
NativePacketPayload resultPacket = sendCommand(queryPacket, false, 0);
// 获取所有的 Result 结果
T rs = readAllResults(maxRows, streamResults, resultPacket, false, cachedMetadata, resultSetFactory);
// 反射拦截器
T interceptedResults = invokeQueryInterceptorsPost(query, callingQuery, rs, false);
// 返回结果
return rs;
}
复制代码
3.5 发送命令
// C- NativeProtocol
public final NativePacketPayload sendCommand(Message queryPacket, boolean skipCheck, int timeoutMillis) {
int command = queryPacket.getByteBuffer()[0];
this.commandCount++;
if (this.queryInterceptors != null) {
NativePacketPayload interceptedPacketPayload = (NativePacketPayload) invokeQueryInterceptorsPre(queryPacket, false);
if (interceptedPacketPayload != null) {
return interceptedPacketPayload;
}
}
this.packetReader.resetMessageSequence();
// 获取旧 timeout 时间并且设置新的超时时间
// PS : 这里的 oldTimeout 在 finally 中会再次设置 soTimeout
int oldTimeout = 0;
oldTimeout = this.socketConnection.getMysqlSocket().getSoTimeout();
this.socketConnection.getMysqlSocket().setSoTimeout(timeoutMillis);
//
checkForOutstandingStreamingData();
// 设置互斥锁
this.serverSession.setStatusFlags(0, true);
// 清空输入流
clearInputStream();
this.packetSequence = -1;
// 发起包
send(queryPacket, queryPacket.getPosition());
// 获取 Return 结果
// 1. resultPacket = readMessage(this.reusablePacket)
// 2. checkErrorMessage(resultPacket)
NativePacketPayload returnPacket = checkErrorMessage(command);
return returnPacket;
}
复制代码
// C- NativeProtocol
public final void send(Message packet, int packetLen) {
//....
// 通过 Sender 发送远程包
this.packetSender.send(packet.getByteBuffer(), packetLen, this.packetSequence);
}
复制代码
C- SimplePacketSender
public void send(byte[] packet, int packetLen, byte packetSequence) throws IOException {
PacketSplitter packetSplitter = new PacketSplitter(packetLen);
// 持续从远程读取包
while (packetSplitter.nextPacket()) {
this.outputStream.write(NativeUtils.encodeMysqlThreeByteInteger(packetSplitter.getPacketLen()));
this.outputStream.write(packetSequence++);
this.outputStream.write(packet, packetSplitter.getOffset(), packetSplitter.getPacketLen());
}
this.outputStream.flush();
}
复制代码
3.6 解析 Result
Step 1 : packetReader.readMessage
这里的 packetReader 主要包含以下几种实现 MultiPacketReader :
public NativePacketPayload readMessage(Optional<NativePacketPayload> reuse, NativePacketHeader header) throws IOException {
// 获取长度和 Message 实现对哦下
int packetLength = header.getMessageSize();
NativePacketPayload buf = this.packetReader.readMessage(reuse, header);
// 此处通过 do-while 进行循环获取
do {
//......
this.packetReader.readMessage(Optional.of(multiPacket), hdr);
// 写入 byte 数据
buf.writeBytes(StringLengthDataType.STRING_FIXED, multiPacket.getByteBuffer(), 0, multiPacketLength);
// 循环获取 , 直到最大长度 -> MAX_PACKET_SIZE = 256 * 256 * 256 - 1;
} while (multiPacketLength == NativeConstants.MAX_PACKET_SIZE);
return buf;
}
复制代码
checkErrorMessage 判断是否为错误返回
public void checkErrorMessage(NativePacketPayload resultPacket) {
resultPacket.setPosition(0);
// 获取状态嘛
byte statusCode = (byte) resultPacket.readInteger(IntegerDataType.INT1);
// Error handling
// 此处通过状态码判断是否为异常结果
if (statusCode == (byte) 0xff) {
// 省略 error 处理环节
// 此处会通过状态和异常处理的结果抛出对应的异常 , 该方法没有具体的返回值
if (xOpen != null) {
if (xOpen.startsWith("22")) {
throw new DataTruncationException(errorBuf.toString(), 0, true, false, 0, 0, errno);
}
if (errno == MysqlErrorNumbers.ER_MUST_CHANGE_PASSWORD) {
throw ExceptionFactory.createException(PasswordExpiredException.class, errorBuf.toString(), getExceptionInterceptor());
} else if (errno == MysqlErrorNumbers.ER_MUST_CHANGE_PASSWORD_LOGIN) {
throw ExceptionFactory.createException(ClosedOnExpiredPasswordException.class, errorBuf.toString(), getExceptionInterceptor());
}
}
throw ExceptionFactory.createException(errorBuf.toString(), xOpen, errno, false, null, getExceptionInterceptor());
}
}
复制代码
获取最终返回结果
public <T extends Resultset> T readAllResults(int maxRows, boolean streamResults, NativePacketPayload resultPacket, boolean isBinaryEncoded,
ColumnDefinition metadata, ProtocolEntityFactory<T, NativePacketPayload> resultSetFactory) throws IOException {
// 调用具体的实现类获取最终结果 , 结果会被放在 rowData 中
T topLevelResultSet = read(Resultset.class, maxRows, streamResults, resultPacket, isBinaryEncoded, metadata, resultSetFactory);
}
// 这里的 Read 有多种 , 后面再来看一看如何实现 Byte 读取的
复制代码
总结
东西不多 , 主要是一些主流程代码 , 其中很多环节都比较模糊 , 主要是为了串联整个流程 , 后续小细节会抽空深入的了解一下.
相对于其他的框架代码 , Mysql 的代码看起来很生涩难懂 , 每个主流程间参杂了很多辅助属性 , 如果业务上出现了问题 , 又不确定最终的执行语句 ,可以考虑在 sendCommand 等方法中添加断点
连接池的使用通常在 ResultSetReturnImpl 和 ClientPreparedStatement 之间进行处理 , 后续可以关注一下
链接:https://juejin.cn/post/7034140455667236894