vlambda博客
学习文章列表

MySQL审计数据归档演示

在此博客中,我将演示如何在许多mysql实例之间将审计日志进行合并归档。在后续文章中,我将展示如何通过在该归档文件上创建一个简单的哈希链来扩展此示例–这样您就可以证明是否可以通过任何方式对其进行了修改或污染,以及在何处进行了修改。

在示例代码中,我将使用mysql audit_log_read函数的新扩展功能,并说明为什么mysqlx API可以使某些任务更加简单。这个新的审计阅读功能已在MySQL 8.0.22企业版中发布。示例内容使用SQL和python模式运行MySQL Shell

将展示一些的其他技巧包括:

  1. 从JSON审计数据中提取行–使用JSON_TABLE函数将JSON数据转换为表格式

  2. 将这些行从已审计的数据库插入到审计数据归档的MySQL数据库中。如您所见,mysqlx API将使事情变得更加简单。

一些事实。正如许多DBA可以告诉您的那样,无论是法规阻止还是出于其他安全原因,DBA通常不想(或无法)访问运行MySQL的底层OS服务器。DBA没有SSH!通常从安全角度来看,运行数据库服务的OS上的内容越少越好。

由于安全性、分析等多种原因,最佳做法是经常从MySQL服务器上获取审计数据,并将其收集到一些中央数据存储中,您可以在其中查看所有MySQL服务器上的活动。为什么会这样做?

  1. 易于分析

  2. 防止数据被破坏

  3. 法规要求

  4. 存储管理

当然,可以使用多种方法通过各种产品来执行移动审计数据任务。这只是一种可能的设计模式,可以轻松地进行第三方集成或更改为将数据写入对象存储或某些其他审计数据存储库。

在术语方面,我将合并审计数据的服务器称为“归档服务器”。该服务器将拥有一个帐户,我将称其为“ auditarchiver”,该帐户只能在audit_data表中插入并选择。(它不能更改数据)。

将要提取审计数据的每个服务器都有一个帐户,该帐户通过SQL连接读取审计数据,并从审计文件中读取JSON数据。

首先让我们以管理员身份登录到归档MySQL服务器实例上–我将使用root。整个示例都需要使用mysql shell。它包括用于从目标服务器提取审计数据进行计划批处理归档的python。

步骤1 –审计归档数据库设置。
在归档服务器上创建模式和表
在审计数据归档服务器上

> mysqlsh

\sql\connect root@<archiving server>;create schema audit_archive;
use audit_archive;
CREATE TABLE `audit_data` (`server_uuid` varchar(45) NOT NULL,`id` int NOT NULL,`ts` timestamp NOT NULL,`class` varchar(20) DEFAULT NULL,`event` varchar(80) DEFAULT NULL,`the_account` varchar(80) DEFAULT NULL,`login_ip` varchar(200) DEFAULT NULL,`login_os` varchar(200) DEFAULT NULL,`login_user` varchar(200) DEFAULT NULL,`login_proxy` varchar(200) DEFAULT NULL,`connection_id` varchar(80) DEFAULT NULL,`db` varchar(40) DEFAULT NULL,`status` int DEFAULT NULL,`connection_type` varchar(40) DEFAULT NULL,`connect_os` varchar(40) DEFAULT NULL,`pid` varchar(40) DEFAULT NULL,`_client_name` varchar(80) DEFAULT NULL,`_client_version` varchar(80) DEFAULT NULL,`program_name` varchar(80) DEFAULT NULL,`_platform` varchar(80) DEFAULT NULL,`command` varchar(40) DEFAULT NULL,`sql_command` varchar(40) DEFAULT NULL,`command_status` varchar(40) DEFAULT NULL,`query` varchar(40) DEFAULT NULL,`query_status` int DEFAULT NULL,`start_server_id` varchar(400) DEFAULT NULL,`server_os_version` varchar(100) DEFAULT NULL,`server_mysqlversion` varchar(100) DEFAULT NULL,`args` varchar(80) DEFAULT NULL,`account_host` varchar(80) DEFAULT NULL,`mysql_version` varchar(80) DEFAULT NULL,`the_os` varchar(80) DEFAULT NULL,`the_os_ver` varchar(80) DEFAULT NULL,`server_id` varchar(8) DEFAULT NULL,PRIMARY KEY (`server_uuid`,`id`,`ts`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

步骤2 –在归档服务器上
首先创建帐户。授予auditarchiver帐户选择和插入权限。(没有其他权限)

\connect root@<archiving server>;create user auditarchiver identified by 'Th3Archivista!';grant select, insert on audit_archive.audit_data to auditarchiver;


步骤3 –对于每台服务器读取(拉出)审计数据–创建auditreader帐户

> mysqlsh

\sql\connect root@<servers to pull audit trail>;select @@version;

/ *如果您未运行mysql企业版8.0.22或更高版本,请停止操作并升级* /

create user auditreader identified by 'Pu11Archivister!';GRANT AUDIT_ADMINON *.* TO 'auditreader';

步骤4 –如果尚未安装企业审计,请安装。

安装MySQL企业审计
简单地说,执行位于MySQL share目录下面的SQL - audit_log_filter_linux_install.sql

第5步–编辑/etc/my.cnf –此示例至少需要包含前3行。

[mysqld]plugin-load-add=audit_log.soaudit-log=FORCE_PLUS_PERMANENTaudit-log-format=JSONaudit-log-strategy=SYNCHRONOUS

重新启动服务器

步骤6 –添加审计过滤器并绑定到用户
如果您以前没有添加审计过滤器,则以下内容将记录所有连接的人。这将记录很多内如,如果出于在测试环境中查看这项工作的目的,这是合理的。在生产中,您可能会希望更具选择性。

--- create audit filter - here log everything (this will generate a great deal of data)SELECT audit_log_filter_set_filter('log_all', '{ "filter": { "log": true } }');--- attach filter to accounts - here for all usersSELECT audit_log_filter_set_user('%', 'log_all');

对每个审计的实例重复此操作。

步骤7 –生成一些审计数据活动
以各种用户身份在安装mysql企业审计服务器上运行一些SQL查询。

步骤8 –选择一个可以在批处理模式下调度mysqlsh的服务器

下面是批处理python脚本的工作方式(最后会重复合并后的代码以复制、编辑和运行)。
请更改使用的密码并使用特定的服务器名称等。

首先,我将使用mysqlx API通过自己的会话连接到读取服务器和归档服务器。

将“ localhost”更改为归档服务器的ip /主机名。

archive_session = mysqlx.get_session( {</code> <code>'host': 'localhost', 'port': 33060,</code> <code>'user': 'auditarchiver', 'password': 'Th3Archivista!'} )

将“ localhost”更改为已审计服务器的ip /主机名。

read_session = mysqlx.get_session( {'host': 'localhost', 'port': 33060,'user': 'auditreader', 'password': 'Pu11Archivister!'} )

好了,现在我需要看看我是否有之前的归档数据——这样我就可以指出审计数据中我需要开始读取更新数据的地方。如果归档不包含此实例的数据—我将从日志数据的开头开始
如果归档表不包含此实例的数据(由其server_uuid标识),则在JSON中创建带有“start”的json字符串。start”告诉该功能执行常规日期时间搜索。

但是,如果已经加载了先前的数据,那么我将获得插入的最后一个时间戳和事件ID,并将其用作审计数据的指针–在这种情况下,JSON搜索字符串中没有“start”。

archive_empty = archive_session.run_sql("select count(*) from audit_archive.audit_data limit 1").fetch_one()
if (archive_empty[0] > 0): print("Data in archive getting last event ts and id") search_args = archive_session.run_sql("select id, ts from audit_archive.audit_data order by ts desc, id desc limit 1").fetch_one() x = "set @nextts ='{ \"timestamp\": \"" + str(search_args[1]) + "\",\"id\":" + str(search_args[0]) + ", \"max_array_length\": 100 }'" setnext = read_session.run_sql(x)else: print("The archive is empty - get oldest audit event") read_session.run_sql("set @nextts='{ \"start\": { \"timestamp\": \"2020-01-01\"}, \"max_array_length\": 100 }'")

好的,我们现在为我的搜索条件设置了会话变量– @nextts。

如果要查看JSON搜索字符串
view_nextts = read_session.run_sql("select @nextts")

在下一步中,您将在SQL中看到对audit_log_read组件的调用
AUDIT_LOG_READ(@nextts)

您将看到,我希望在归档中以行形式存储数据——因此我使用JSON_TABLE函数将JSON转换为行。

readaudit = read_session.run_sql(" ""SELECT @@server_uuid as server_uuid, id, ts, class, event, the_account,login_ip,login_os,login_user,login_proxy,connection_id,db, "" status,connection_type,connect_os,pid,_client_name,_client_version, "" program_name,_platform,command,sql_command,command_status,query, "" query_status,start_server_id,server_os_version,server_mysqlversion,args, "" account_host,mysql_version,the_os,the_os_ver,server_id ""FROM ""JSON_TABLE ""( "" AUDIT_LOG_READ(@nextts), "" '$[*]' "" COLUMNS "" ( "" id INT PATH '$.id', "" ts TIMESTAMP PATH '$.timestamp', "" class VARCHAR(20) PATH '$.class', "" event VARCHAR(80) PATH '$.event', "" the_account VARCHAR(80) PATH '$.account', "" login_ip VARCHAR(200) PATH '$.login.ip', "" login_os VARCHAR(200) PATH '$.login.os', "" login_user VARCHAR(200) PATH '$.login.user', "" login_proxy VARCHAR(200) PATH '$.login.proxy', "" connection_id VARCHAR(80) PATH '$.connection_id', "" db VARCHAR(40) PATH '$.connection_data.db', "" status INT PATH '$.connection_data.status', "" connection_type VARCHAR(40) PATH '$.connection_data.connection_type', "" connect_os VARCHAR(40) PATH '$.connection_data.connection_attributes._os', "" pid VARCHAR(40) PATH '$.connection_data.connection_attributes._pid', "" _client_name VARCHAR(80) PATH '$.connection_data.connection_attributes._client_name', "" _client_version VARCHAR(80) PATH '$.connection_data.connection_attributes._client_version', "" program_name VARCHAR(80) PATH '$.connection_data.connection_attributes.program_name', "" _platform VARCHAR(80) PATH '$.connection_data.connection_attributes._platform', "" command VARCHAR(40) PATH '$.general_data.command', "" sql_command VARCHAR(40) PATH '$.general_data.sql_command', "" command_status VARCHAR(40) PATH '$.general_data.status', "" query VARCHAR(40) PATH '$.genera_data.query', "" query_status INT PATH '$.general_data.status', "" start_server_id VARCHAR(400) PATH '$.startup_data.server_id', "" server_os_version VARCHAR(100) PATH '$.startup_data.os_version', "" server_mysqlversion VARCHAR(100) PATH '$.startup_data.mysql_version', "" args VARCHAR(80) PATH '$.startup_data.args', "" account_host VARCHAR(80) PATH '$.account.host', "" mysql_version VARCHAR(80) PATH '$.startup_data.mysql_version', "" the_os VARCHAR(80) PATH '$.startup_data.os', "" the_os_ver VARCHAR(80) PATH '$.startup_data.os_version', "" server_id VARCHAR(80) PATH '$.startup_data.server_id' "" ) "") AS auditdata; ")

现在您可以使用所有JSON并将每个事件存储为JSON数据类型。这也很简单。但在这里,我存储在一个表中。由你决定。

好了–现在作为Auditarchiver –我将保存刚刚提取的数据。
这是mysqlx api非常方便的地方。我可以循环执行结果,并用很少的代码保存到表中。

aschema=archive_session.get_schema('audit_archive')atable=aschema.get_table('audit_data')if (archive_empty[0] > 0): evt = readaudit.fetch_one_object() print("Archive was not empty - skip first duplicate event")else: print("Archive was empty - load all")
evt = readaudit.fetch_one_object()while evt: atable.insert(evt).execute() evt= readaudit.fetch_one_object()

正如您可能已经注意到的那样–我并没有尝试从审计日志中一次提取过多。最大取了100。此外还受到为–audit-log-read-buffer-size设置的缓冲区大小的限制。

参见https://dev.mysql.com/doc/refman/8.0/en/audit-log-reference.html#sysvar_audit_log_read_buffer_size

因此,将此脚本保存到目录中。
cd到目录

现在,您只需以批处理模式运行mysqlsh。
第一次运行

frank@Mike % mysqlsh --py < archiveauditbatch

The archive is empty – get oldest audit event
Archive was empty – load all

下一次运行
frank@Mike % mysqlsh --py < archiveauditbatch
Data in archive getting last event ts and id
Archive was not empty – skip first duplicate event

好的-现在您已经运行了一些测试,使用cron或您喜欢的调度程序创建一个计划的批处理。让它循环直到清空。

第9步–登录到归档服务器,查看数据
select * from audit_archive.audit_data order by server_uuid, id, ts;

运行一些统计

select event, count(event) from audit_archive.audit_data group by event;
select login_user, event, count(event) from audit_archive.audit_data group by login_user, event;
select distinct login_user, sql_command, status, query_status from audit_archive.audit_data ;

最后–这与生产质量代码相差甚远,
我插入了密码,没有为审计服务器提供循环和收集数据等参数。没有检查错误等。重点是演示一些技术来帮助对其进行尝试的人。

在后续博客中-
我将向您展示如何执行哈希链等-这样您就可以证明您的审计数据是不可变的且不受污染。

感谢您使用MySQL。这是完整的批处理脚本 https://mysqlserverteam.com/mysql-audit-data-consolidation-made-simple/archivebatch/

感谢您关注”MySQL解决方案工程师“!