vlambda博客
学习文章列表

MySQL通过源码了解DML执行位置

吐槽:感觉现在的面试官还挺有意思,一直在问一些类似“地球为什么是圆的”这样的话题。
/* The real server entry point; main() below only forwards to it. */
extern int mysqld_main(int argc, char **argv);

/**
  Process entry point: passes the command-line arguments through unchanged
  and uses the value returned by mysqld_main() as the process exit status.
*/
int main(int argc, char **argv)
{
  const int exit_code= mysqld_main(argc, argv);
  return exit_code;
}

从入口函数可以看出,真正的主函数是mysqld_main,它负责完成各种资源和变量的初始化工作。

/* Remember the program name (argv[0]) in my_progname. */
my_progname= argv[0];

程序名argv[0]会被赋值给my_progname变量保存起来。

#if defined(_WIN32) || defined(HAVE_SMEM) handle_connections_methods();#else handle_connections_sockets();#endif /* _WIN32 || HAVE_SMEM */

handle_connections_sockets函数运行在主线程中,负责监听客户端的连接请求;每当有新连接到达时,就会创建一个子线程去处理该连接,主线程则继续监听。

 /* Excerpt from the body of the connection-accepting loop. Allocate a THD for the new connection; on failure shut down and close the socket, count the error, and continue with the next connection. */
 if (!(thd= new THD)) { (void) mysql_socket_shutdown(new_sock, SHUT_RDWR); (void) mysql_socket_close(new_sock); statistic_increment(connection_errors_internal, &LOCK_status); continue; }
/* Choose the Vio transport from the listening socket: the Unix-domain socket gets VIO_TYPE_SOCKET with VIO_LOCALHOST, anything else VIO_TYPE_TCPIP with no flags. */
bool is_unix_sock= (mysql_socket_getfd(sock) == mysql_socket_getfd(unix_sock)); enum_vio_type vio_type= (is_unix_sock ? VIO_TYPE_SOCKET : VIO_TYPE_TCPIP); uint vio_flags= (is_unix_sock ? VIO_LOCALHOST : 0);
/* Wrap the accepted socket in a Vio object. */
vio_tmp= mysql_socket_vio_new(new_sock, vio_type, vio_flags);
/* Attach the Vio to thd->net. On failure, release whichever resource thd does not yet own, delete the THD, count the error and continue. On success, finish network setup and mark Unix-socket clients with host my_localhost. */
if (!vio_tmp || my_net_init(&thd->net, vio_tmp)) { /* Only delete the temporary vio if we didn't already attach it to the NET object. The destructor in THD will delete any initialized net structure. */ if (vio_tmp && thd->net.vio != vio_tmp) vio_delete(vio_tmp); else { (void) mysql_socket_shutdown(new_sock, SHUT_RDWR); (void) mysql_socket_close(new_sock); } delete thd; statistic_increment(connection_errors_internal, &LOCK_status); continue; } init_net_server_extension(thd); if (mysql_socket_getfd(sock) == mysql_socket_getfd(unix_sock)) thd->security_ctx->set_host((char*) my_localhost);
/* Hand the initialized THD to create_new_thread(). The trailing brace closes the enclosing loop, which is not shown in this excerpt. */
create_new_thread(thd); }

接受新连接后,会创建THD对象并填充实际内容(网络结构、主机信息等),然后调用create_new_thread创建新的线程进行下一步的操作。

/* Initialize scheduler for --thread-handling=one-thread-per-connection*/
#ifndef EMBEDDED_LIBRARYvoid one_thread_per_connection_scheduler(){ scheduler_init(); one_thread_per_connection_scheduler_functions.max_threads= max_connections; thread_scheduler= &one_thread_per_connection_scheduler_functions;}#endif

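在未定义EMBEDDED_LIBRARY(即非嵌入式库)时,这个函数完成调度器的初始化,并把thread_scheduler指向one_thread_per_connection_scheduler_functions,也就是“每个连接一个线程”的调度方式。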

/* Presumably expands to a call of thread_scheduler->add_connection(thd) (confirm the MYSQL_CALLBACK definition). With the one-thread-per-connection scheduler this is where the per-connection thread is created. */
MYSQL_CALLBACK(thread_scheduler, add_connection, (thd))

随后通过MYSQL_CALLBACK调用调度器的add_connection回调,进行子线程的创建:

 /* Start a new thread that runs handle_one_connection((void*) thd). The return code is kept in error; a nonzero value presumably means failure (pthread_create-style), and thd->real_id presumably receives the new thread's id. Confirm both. */
 if ((error= mysql_thread_create(key_thread_one_connection, &thd->real_id, &connection_attrib, handle_one_connection, (void*) thd)))

handle_one_connection就是新线程的线程函数,其中调用的do_command函数值得关注,注意这条调用链是从sql_connect中过来的。

 /* Read the next client packet with my_net_read(). m_server_idle is true only for the duration of that read, presumably so instrumentation can tell idle waiting apart from work (confirm). */
 thd->m_server_idle= true; packet_length= my_net_read(net); thd->m_server_idle= false;

几经转折之后,来到读取客户端消息的位置。其中的net是THD中保存的网络结构(thd->net),my_net_read会对消息包进行读取、必要时解压缩,并剥离出包头数据。

/**
  Read up to `size` bytes from the Vio's socket into `buf`.

  If a read timeout is configured (read_timeout >= 0), the receive is issued
  with VIO_DONTWAIT. When the socket reports EAGAIN/EWOULDBLOCK, the function
  waits with vio_socket_io_wait() and then retries the receive.

  @return the result of mysql_socket_recv() (-1 on a non-retryable error),
          or the nonzero result of vio_socket_io_wait() when waiting fails;
          the value is returned as size_t.
*/
size_t vio_read(Vio *vio, uchar *buf, size_t size)
{
  ssize_t nread;
  int recv_flags;
  DBUG_ENTER("vio_read");

  /* vio_read_buff and vio_read must not be used on the same Vio at once. */
  DBUG_ASSERT(vio->read_end == vio->read_pos);

  /* With a timeout configured, never block inside the receive call itself. */
  recv_flags= (vio->read_timeout >= 0) ? VIO_DONTWAIT : 0;

  for (;;)
  {
    int sock_err;

    nread= mysql_socket_recv(vio->mysql_socket, (SOCKBUF_T *)buf, size,
                             recv_flags);
    if (nread != -1)
      break;                    /* data or EOF: hand it back as-is */

    sock_err= socket_errno;
    if (sock_err != SOCKET_EAGAIN && sock_err != SOCKET_EWOULDBLOCK)
      break;                    /* genuine error: return the -1 */

    /* The read would block: wait for input, then try again. */
    nread= vio_socket_io_wait(vio, VIO_IO_EVENT_READ);
    if (nread)
      break;
  }

  DBUG_RETURN(nread);
}

对套接字进行读取使用的是recv

/* Two snippets on one line. First, the Winsock-style prototype of recv(). Second, the recv() call made when reading the socket, presumably inside mysql_socket_recv(). IF_WIN((int),) presumably applies the (int) cast to the length only on Windows (confirm). */
int recv( SOCKET s, char *buf, int len, int flags);result= recv(mysql_socket.fd, buf, IF_WIN((int),) n, flags);
/* The first byte of the received packet is the command code (enum_server_command). */
command= (enum enum_server_command) (uchar) packet[0];

数据包的第一个字节非常关键,它是标识命令类型的代码(enum_server_command)。

/*
  Simplified excerpt of dispatch_command(). It switches on the command code
  taken from the first packet byte and runs the matching handler; unknown
  codes get ER_UNKNOWN_COM_ERROR via send_error().
  NOTE(review): this reads as illustrative pseudo-code rather than verbatim
  server source. "..." and the "18 other cases" comment elide most cases.
  The labels SQLCOM_EXECUTE / SQLCOM_DEALLOCATE_PREPARE look like SQL
  statement codes rather than enum_server_command (COM_*) values. The
  function's closing brace is not shown. Confirm against the real source.
*/
bool dispatch_command(enum enum_server_command command, THD *thd, char* packet, uint packet_length){ switch (command) { case COM_INIT_DB: ... case SQLCOM_EXECUTE: { mysql_sql_stmt_execute(thd); break; } case SQLCOM_DEALLOCATE_PREPARE: { mysql_sql_stmt_close(thd); break; }
/* and so on for 18 other cases */ default: send_error(thd, ER_UNKNOWN_COM_ERROR); break; }

进入dispatch_command会发现一个非常大的switch,分别处理查询、终止进程、睡眠、连接以及一些次要命令。对于查询命令(COM_QUERY),SQL文本经过解析后会进入mysql_execute_command执行:

/*
  Excerpt from mysql_execute_command(): the INSERT/REPLACE cases of its large
  switch over the statement type. The switch header, the declarations of res,
  lex, first_table and all_tables, and the rest of the function are elided.
  The excerpt stops at the INSERT ... SELECT case labels. Preprocessor
  directives are kept on their own lines; fused mid-line ("case
  SQLCOM_REPLACE:#ifndef DBUG_OFF", "}#endif case ...") they are not
  directives at all.
*/
mysql_execute_command(THD *thd){
  case SQLCOM_REPLACE:
#ifndef DBUG_OFF
    if (mysql_bin_log.is_open())
    {
      /*
        Generate an incident log event before writing the real event to
        the binary log. We put this event before the statement since that
        makes it simpler to check that the statement was not executed on
        the slave (since incidents usually stop the slave).

        Observe that any row events that are generated will be generated
        before.

        This is only for testing purposes and will not be present in a
        release build.
      */
      Incident incident= INCIDENT_NONE;
      DBUG_PRINT("debug", ("Just before generate_incident()"));
      DBUG_EXECUTE_IF("incident_database_resync_on_replace",
                      incident= INCIDENT_LOST_EVENTS;);
      if (incident)
      {
        Incident_log_event ev(thd, incident);
        if (mysql_bin_log.write_incident(&ev, true/*need_lock_log=true*/))
        {
          res= 1;
          break;
        }
      }
      DBUG_PRINT("debug", ("Just after generate_incident()"));
    }
#endif
    /* fall through: REPLACE continues into the INSERT code below */
  case SQLCOM_INSERT:
  {
    DBUG_ASSERT(first_table == all_tables && first_table != 0);

    /*
      Since INSERT DELAYED doesn't support temporary tables, we could
      not pre-open temporary tables for SQLCOM_INSERT / SQLCOM_REPLACE.
      Open them here instead.
    */
    if (first_table->lock_type != TL_WRITE_DELAYED)
    {
      if ((res= open_temporary_tables(thd, all_tables)))
        break;
    }

    if ((res= insert_precheck(thd, all_tables)))
      break;

    MYSQL_INSERT_START(thd->query());
    res= mysql_insert(thd, all_tables, lex->field_list, lex->many_values,
                      lex->update_list, lex->value_list,
                      lex->duplicates, lex->ignore);
    MYSQL_INSERT_DONE(res, (ulong) thd->get_row_count_func());
    /*
      If we have inserted into a VIEW, and the base table has
      AUTO_INCREMENT column, but this column is not accessible through
      a view, then we should restore LAST_INSERT_ID to the value it
      had before the statement.
    */
    if (first_table->view && !first_table->contain_auto_increment)
      thd->first_successful_insert_id_in_cur_stmt=
        thd->first_successful_insert_id_in_prev_stmt;

    DBUG_EXECUTE_IF("after_mysql_insert",
                    {
                      const char act[]=
                        "now "
                        "wait_for signal.continue";
                      DBUG_ASSERT(opt_debug_sync_timeout > 0);
                      DBUG_ASSERT(!debug_sync_set_action(current_thd,
                                                         STRING_WITH_LEN(act)));
                    };);
    break;
  }
  case SQLCOM_REPLACE_SELECT:
  case SQLCOM_INSERT_SELECT:

mysql_execute_command会根据解析得到的语句类型(SQLCOM_INSERT、SQLCOM_REPLACE等)分派到更具体的处理函数,例如INSERT语句会调用mysql_insert。

 /* Inside mysql_insert(): write one row via write_record(); its result is stored in error. */
 error= write_record(thd, table, &info, &update);

mysql_insert内部通过write_record逐行写入数据,进而带我们到达更底层的handler::ha_write_row:

/**
  Engine-independent entry point for inserting one row.

  Asserts that the table is temporary or write-locked (F_WRLCK). It then runs
  the engine's virtual write_row(buf) inside the MYSQL_TABLE_IO_WAIT
  instrumentation wrapper. Only if that succeeds does it pass the row to
  binlog_log_row() with Write_rows_log_event::binlog_row_logging_function.

  @param buf  row buffer, handed unchanged to write_row() and binlog_log_row()
  @return 0 on success; otherwise the nonzero error from write_row() or
          binlog_log_row(), or HA_ERR_INTERNAL_ERROR when the
          "inject_error_ha_write_row" debug keyword is active.
*/
int handler::ha_write_row(uchar *buf){ int error; Log_func *log_func= Write_rows_log_event::binlog_row_logging_function; DBUG_ASSERT(table_share->tmp_table != NO_TMP_TABLE || m_lock_type == F_WRLCK);
/* Debug builds can force a failure here via DBUG_EXECUTE_IF. */
DBUG_ENTER("handler::ha_write_row"); DBUG_EXECUTE_IF("inject_error_ha_write_row", DBUG_RETURN(HA_ERR_INTERNAL_ERROR); );
/* Fire the insert-row start probe, then call mark_trx_read_write(). */
MYSQL_INSERT_ROW_START(table_share->db.str, table_share->table_name.str); mark_trx_read_write();
/* The engine-specific insert (virtual write_row), wrapped as a PSI table-write I/O event. */
MYSQL_TABLE_IO_WAIT(m_psi, PSI_TABLE_WRITE_ROW, MAX_KEY, 0, { error= write_row(buf); })
/* Fire the done probe; stop here if the engine reported an error. */
MYSQL_INSERT_ROW_DONE(error); if (unlikely(error)) DBUG_RETURN(error);
/* The engine write succeeded: hand the row to binlog_log_row(); any error it returns is propagated. */
if (unlikely(error= binlog_log_row(table, 0, buf, log_func))) DBUG_RETURN(error); /* purecov: inspected */
DEBUG_SYNC_C("ha_write_row_end"); DBUG_RETURN(0);}

其中的write_row是handler类的虚函数,运行时会分派到具体存储引擎(handler子类)重写的实现;基类的默认实现只返回HA_ERR_WRONG_COMMAND:

 /**
   Engine hook for inserting one row. This base-class version ignores `buf`
   and reports HA_ERR_WRONG_COMMAND. Engines that support inserts override
   it, for example ha_tina::write_row() for the CSV engine.
 */
 virtual int write_row(uchar *buf MY_ATTRIBUTE((unused)))
 {
   return HA_ERR_WRONG_COMMAND;
 }

例如CSV存储引擎(ha_tina)的实现:

/**
  Insert one row into a CSV (tina) table.

  Encodes the row with encode_quote() and opens the shared writer on first
  use. It then writes the encoded bytes from `buffer` to
  share->tina_write_filedes and updates the local and shared row
  bookkeeping.

  @return 0 on success, HA_ERR_CRASHED_ON_USAGE if the share is marked as
          crashed, or -1 if the writer cannot be opened or the write fails.
*/
int ha_tina::write_row(uchar * buf)
{
  int row_len;
  DBUG_ENTER("ha_tina::write_row");

  if (share->crashed)
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);

  ha_statistic_increment(&SSV::ha_write_count);

  /* Serialize the row; the encoded text is presumably left in `buffer`. */
  row_len= encode_quote(buf);

  /* Open the writer lazily, on the first insert through this share. */
  if (!share->tina_write_opened && init_tina_writer())
    DBUG_RETURN(-1);

  /*
    NOTE(review): the upstream comment here says "use pwrite, as concurrent
    reader could have changed the position", but the call is
    mysql_file_write(). Confirm how init_tina_writer() opens the descriptor.
  */
  if (mysql_file_write(share->tina_write_filedes, (uchar*)buffer.ptr(),
                       row_len, MYF(MY_WME | MY_NABP)))
    DBUG_RETURN(-1);

  /* Extend our local copy of the max file position so we see our own row. */
  local_saved_data_file_length+= row_len;

  /* Shared bookkeeping is updated under share->mutex. */
  mysql_mutex_lock(&share->mutex);
  share->rows_recorded++;
  if (share->is_log_table)
    update_status();            /* keep the log table's status current */
  mysql_mutex_unlock(&share->mutex);

  stats.records++;
  DBUG_RETURN(0);
}