MySQL通过源码了解DML执行位置
extern int mysqld_main(int argc, char **argv);

/**
  Process entry point.

  All of the work is delegated to mysqld_main(); whatever status it returns
  is returned from main() unchanged.

  @param argc  number of command-line arguments
  @param argv  command-line argument vector

  @return the value returned by mysqld_main()
*/
int main(int argc, char **argv)
{
  const int exit_status= mysqld_main(argc, argv);
  return exit_status;
}
通过入口函数可以了解到,最主要的函数是mysqld_main,在这个函数中进行了一些资源和变量的初始化工作。
my_progname= argv[0];
argv[0](程序名)会被保存到my_progname变量中
handle_connections_methods();
handle_connections_sockets();
在handle_connections_sockets函数内,主线程会循环监听客户端连接,当连接到达后会创建一个子线程去处理,主线程继续进行监听。
if (!(thd= new THD))
{
(void) mysql_socket_shutdown(new_sock, SHUT_RDWR);
(void) mysql_socket_close(new_sock);
statistic_increment(connection_errors_internal, &LOCK_status);
continue;
}
bool is_unix_sock= (mysql_socket_getfd(sock) == mysql_socket_getfd(unix_sock));
enum_vio_type vio_type= (is_unix_sock ? VIO_TYPE_SOCKET : VIO_TYPE_TCPIP);
uint vio_flags= (is_unix_sock ? VIO_LOCALHOST : 0);
vio_tmp= mysql_socket_vio_new(new_sock, vio_type, vio_flags);
if (!vio_tmp || my_net_init(&thd->net, vio_tmp))
{
/*
Only delete the temporary vio if we didn't already attach it to the
NET object. The destructor in THD will delete any initialized net
structure.
*/
if (vio_tmp && thd->net.vio != vio_tmp)
vio_delete(vio_tmp);
else
{
(void) mysql_socket_shutdown(new_sock, SHUT_RDWR);
(void) mysql_socket_close(new_sock);
}
delete thd;
statistic_increment(connection_errors_internal, &LOCK_status);
continue;
}
init_net_server_extension(thd);
if (mysql_socket_getfd(sock) == mysql_socket_getfd(unix_sock))
thd->security_ctx->set_host((char*) my_localhost);
create_new_thread(thd);
}
在得到消息以后会对thd结构体填充实际内容,然后创建新的线程去进行下一步的操作。
/*
  Initialize scheduler for --thread-handling=one-thread-per-connection.

  Calls scheduler_init(), sets max_threads of
  one_thread_per_connection_scheduler_functions to max_connections, and
  makes thread_scheduler point at that function table.
*/
void one_thread_per_connection_scheduler()
{
/* Common scheduler setup; what it does is not visible in this excerpt. */
scheduler_init();
/* The thread limit of this scheduler is the connection limit. */
one_thread_per_connection_scheduler_functions.max_threads= max_connections;
/* Install this function table as the active thread_scheduler. */
thread_scheduler= &one_thread_per_connection_scheduler_functions;
}
通过宏定义初始化thread_scheduler
MYSQL_CALLBACK(thread_scheduler, add_connection, (thd))
进行子线程的创建
if ((error= mysql_thread_create(key_thread_one_connection,
&thd->real_id, &connection_attrib,
handle_one_connection,
(void*) thd)))
handle_one_connection就是线程函数,其中有一个do_command函数值得关注,注意它是从sql_connect过来的
thd->m_server_idle= true;
packet_length= my_net_read(net);
thd->m_server_idle= false;
几经转折之后来到读取客户端消息的位置,其中的net是消息结构里存在的。它将会对消息包进行读取,解压缩,剥离出头部数据
/**
  Read up to @c size bytes from the socket behind @c vio into @c buf.

  When a read timeout is configured (read_timeout >= 0) the receive is issued
  with VIO_DONTWAIT. If the receive then fails with SOCKET_EAGAIN or
  SOCKET_EWOULDBLOCK, the function waits for a read event through
  vio_socket_io_wait() and tries the receive again.

  @param vio   connection to read from
  @param buf   destination buffer
  @param size  maximum number of bytes to receive

  @return the result of the last mysql_socket_recv() call, or the non-zero
          value returned by vio_socket_io_wait(). The value is held in a
          ssize_t and returned as size_t, so -1 is seen as (size_t) -1.
*/
size_t vio_read(Vio *vio, uchar *buf, size_t size)
{
  ssize_t nread;
  int recv_flags= 0;
  DBUG_ENTER("vio_read");

  /* vio_read_buff and vio_read must not be used on the vio simultaneously. */
  DBUG_ASSERT(vio->read_end == vio->read_pos);

  /* With a timeout enabled, do not block when no data is available. */
  if (vio->read_timeout >= 0)
    recv_flags= VIO_DONTWAIT;

  for (;;)
  {
    int sock_err;

    nread= mysql_socket_recv(vio->mysql_socket, (SOCKBUF_T *)buf, size,
                             recv_flags);
    if (nread != -1)
      break;                              /* the receive did not fail */

    sock_err= socket_errno;

    /* Any failure other than "operation would block" ends the read. */
    if (sock_err != SOCKET_EAGAIN && sock_err != SOCKET_EWOULDBLOCK)
      break;

    /* Wait for input data; a non-zero result is returned to the caller. */
    nread= vio_socket_io_wait(vio, VIO_IO_EVENT_READ);
    if (nread)
      break;
  }

  DBUG_RETURN(nread);
}
对套接字进行读取使用的是recv
/*
  Prototype of recv(), the call used to read from the socket.

  s     - socket to read from
  buf   - buffer that receives the data
  len   - size of buf, as an int
  flags - flags for the call

  NOTE(review): the SOCKET parameter type suggests this is the Winsock
  declaration - confirm against the platform documentation.
*/
int recv(
SOCKET s,
char *buf,
int len,
int flags
);
result= recv(mysql_socket.fd, buf, IF_WIN((int),) n, flags);
command= (enum enum_server_command) (uchar) packet[0];
消息体的第一个字节非常有用,它是标识消息类型的代码。
bool dispatch_command(enum enum_server_command command, THD *thd,
char* packet, uint packet_length)
{
switch (command) {
case COM_INIT_DB: ...
case SQLCOM_EXECUTE:
{
mysql_sql_stmt_execute(thd);
break;
}
case SQLCOM_DEALLOCATE_PREPARE:
{
mysql_sql_stmt_close(thd);
break;
}
/* and so on for 18 other cases */
default:
send_error(thd, ER_UNKNOWN_COM_ERROR);
break;
}
进入分发会发现一个非常大的switch包含:查询,终止进程,睡眠,连接以及一些次要命令
mysql_execute_command(THD *thd)
{
case SQLCOM_REPLACE:
#ifndef DBUG_OFF
if (mysql_bin_log.is_open())
{
/*
Generate an incident log event before writing the real event
to the binary log. We put this event is before the statement
since that makes it simpler to check that the statement was
not executed on the slave (since incidents usually stop the
slave).
Observe that any row events that are generated will be
generated before.
This is only for testing purposes and will not be present in a
release build.
*/
Incident incident= INCIDENT_NONE;
DBUG_PRINT("debug", ("Just before generate_incident()"));
DBUG_EXECUTE_IF("incident_database_resync_on_replace",
incident= INCIDENT_LOST_EVENTS;);
if (incident)
{
Incident_log_event ev(thd, incident);
if (mysql_bin_log.write_incident(&ev, true/*need_lock_log=true*/))
{
res= 1;
break;
}
}
DBUG_PRINT("debug", ("Just after generate_incident()"));
}
#endif
case SQLCOM_INSERT:
{
DBUG_ASSERT(first_table == all_tables && first_table != 0);
/*
Since INSERT DELAYED doesn't support temporary tables, we could
not pre-open temporary tables for SQLCOM_INSERT / SQLCOM_REPLACE.
Open them here instead.
*/
if (first_table->lock_type != TL_WRITE_DELAYED)
{
if ((res= open_temporary_tables(thd, all_tables)))
break;
}
if ((res= insert_precheck(thd, all_tables)))
break;
MYSQL_INSERT_START(thd->query());
res= mysql_insert(thd, all_tables, lex->field_list, lex->many_values,
lex->update_list, lex->value_list,
lex->duplicates, lex->ignore);
MYSQL_INSERT_DONE(res, (ulong) thd->get_row_count_func());
/*
If we have inserted into a VIEW, and the base table has
AUTO_INCREMENT column, but this column is not accessible through
a view, then we should restore LAST_INSERT_ID to the value it
had before the statement.
*/
if (first_table->view && !first_table->contain_auto_increment)
thd->first_successful_insert_id_in_cur_stmt=
thd->first_successful_insert_id_in_prev_stmt;
DBUG_EXECUTE_IF("after_mysql_insert",
{
const char act[]=
"now "
"wait_for signal.continue";
DBUG_ASSERT(opt_debug_sync_timeout > 0);
DBUG_ASSERT(!debug_sync_set_action(current_thd,
STRING_WITH_LEN(act)));
};);
break;
}
case SQLCOM_REPLACE_SELECT:
case SQLCOM_INSERT_SELECT:
通过statement对象,可以得到更细小的指令集
error= write_record(thd, table, &info, &update);
mysql_insert会带我们到达一个更底层的位置
/**
  Write one row through the storage engine, then log it via binlog_log_row().

  The engine-specific work is done by write_row(); this wrapper adds the
  surrounding instrumentation (DBUG hooks, MYSQL_INSERT_ROW_* probes and
  MYSQL_TABLE_IO_WAIT) and the row logging step.

  @param buf  row buffer passed to write_row() and binlog_log_row()

  @return 0 on success; otherwise the non-zero value returned by write_row()
          or binlog_log_row(), or HA_ERR_INTERNAL_ERROR when the
          "inject_error_ha_write_row" debug keyword is active.
*/
int handler::ha_write_row(uchar *buf)
{
  int rc;
  Log_func *row_logger= Write_rows_log_event::binlog_row_logging_function;

  /* Unless this is a temporary table, the lock type must be F_WRLCK. */
  DBUG_ASSERT(table_share->tmp_table != NO_TMP_TABLE ||
              m_lock_type == F_WRLCK);

  DBUG_ENTER("handler::ha_write_row");
  DBUG_EXECUTE_IF("inject_error_ha_write_row",
                  DBUG_RETURN(HA_ERR_INTERNAL_ERROR); );

  MYSQL_INSERT_ROW_START(table_share->db.str, table_share->table_name.str);
  mark_trx_read_write();

  /* The engine's write_row() runs inside the table I/O wait macro. */
  MYSQL_TABLE_IO_WAIT(m_psi, PSI_TABLE_WRITE_ROW, MAX_KEY, 0,
    { rc= write_row(buf); })

  MYSQL_INSERT_ROW_DONE(rc);

  /* A failed write is reported as is and is not logged. */
  if (unlikely(rc))
    DBUG_RETURN(rc);

  rc= binlog_log_row(table, 0, buf, row_logger);
  if (unlikely(rc))
    DBUG_RETURN(rc); /* purecov: inspected */

  DEBUG_SYNC_C("ha_write_row_end");
  DBUG_RETURN(0);
}
其中的write_row是一个虚函数,它将在运行的时候被赋予不同引擎的接口实现
/*
  Default write_row(): performs no write and always returns
  HA_ERR_WRONG_COMMAND. It is declared virtual so that a derived handler
  can supply its own implementation.

  buf - row buffer; not used here (marked MY_ATTRIBUTE((unused))).
*/
virtual int write_row(uchar *buf MY_ATTRIBUTE((unused)))
{
return HA_ERR_WRONG_COMMAND;
}
例如CSV的实现
/**
  Write one row to the table's data file (CSV engine).

  The row is encoded with encode_quote(), written through the share's write
  descriptor, and the local and shared row/length counters are then updated.

  @param buf  row buffer passed to encode_quote()

  @return 0 on success, HA_ERR_CRASHED_ON_USAGE if the share is marked as
          crashed, or -1 if init_tina_writer() or mysql_file_write() returns
          non-zero.
*/
int ha_tina::write_row(uchar * buf)
{
  int row_len;
  DBUG_ENTER("ha_tina::write_row");

  /* No writes while the share is flagged as crashed. */
  if (share->crashed)
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);

  ha_statistic_increment(&SSV::ha_write_count);

  /* presumably encode_quote() leaves the encoded row in `buffer` - confirm */
  row_len= encode_quote(buf);

  /* Initialize the writer if it is not opened yet; non-zero aborts. */
  if (!share->tina_write_opened && init_tina_writer())
    DBUG_RETURN(-1);

  /*
    Write the encoded row. The original note at this point cites concurrent
    readers possibly having changed the file position.
  */
  if (mysql_file_write(share->tina_write_filedes, (uchar*)buffer.ptr(),
                       row_len, MYF(MY_WME | MY_NABP)))
    DBUG_RETURN(-1);

  /* Keep the local copy of the max position in step with our own write. */
  local_saved_data_file_length+= row_len;

  /* Shared info is updated under the share mutex. */
  mysql_mutex_lock(&share->mutex);
  ++share->rows_recorded;
  /* Log tables additionally get their status updated. */
  if (share->is_log_table)
    update_status();
  mysql_mutex_unlock(&share->mutex);

  ++stats.records;
  DBUG_RETURN(0);
}