在 AWS 大数据博客中使用 Amazon Athena 与 Spark SQL 处理开源事务表格式
使用 Amazon Athena 搭配 Spark SQL 处理开源交易表格式
主要重点
在这篇文章中,我们将介绍如何使用 Amazon Athena 与 Spark SQL 来创建和管理 Apache Iceberg、Apache Hudi以及 Linux Foundation Delta Lake 这三种开源交易表格式。这些表格式提供了 ACID 交易、数据更新及删除等功能,以及早期仅可用于数据仓库的快照和时间旅行特性。本指南涵盖了如何从 AWS Glue 数据目录创建数据库和表、插入数据、查询数据等操作。
Amazon S3 支持的数据湖,配合 AWS 优越的可用性,非常适合不同数据和分析方法的组合。随著数据湖的扩展和成熟,确保数据与业务事件的持续一致性也愈加重要。越来越多的用户选择开源的交易表格式来处理数据,这些格式可以帮助用户高效地存储数据并简化处理过程。
前提条件
在开始之前,请确保满足以下前提:
确保符合 在 Amazon Athena Spark 上运行 Spark SQL 中提到的所有条件。创建一个名为 sparkblogdb 的数据库,并在 AWS Glue 数据目录中创建名为 noaapq 的表,具体详情请参见上述链接。设置 Athena 工作组使用的 IAM 角色对 Amazon S3 的存取权限,包括读写权限和删除对象的权限,详细信息请参考 Amazon S3 允许对 S3 存储桶中对象的读写访问。下载并导入示例笔记本
如需跟随本指南,请从以下链接下载相关的 Jupyter 笔记本:
教程名称下载链接Iceberg 教程笔记本s3//athenaexamplesuseast1/athenasparksqlblog/notebooks/SparkSQLicebergipynbHudi 教程笔记本s3//athenaexamplesuseast1/athenasparksqlblog/notebooks/SparkSQLhudiipynbDelta 教程笔记本s3//athenaexamplesuseast1/athenasparksqlblog/notebooks/SparkSQLdeltaipynb下载后,请根据 管理笔记本文件 中的 导入笔记本 章节将其导入到 Athena Spark 环境中。
使用 Apache Iceberg 表
使用 Athena 的 Spark 笔记本时,可以直接运行 SQL 查询,而无需使用 PySpark。我们通过在笔记本单元中添加 sql 魔法指令来达成,这样会将整个单元内容解释为一条 SQL 语句。
设置笔记本会话
在 Athena 中使用 Apache Iceberg 时,创建或编辑会话时,请在 Apache Spark 属性 部分选择 Apache Iceberg 选项。其将自动填充设置,如下所示:
详情请参考 编辑会话细节 或 创建笔记本。
在本节中使用的代码可从 SparkSQLicebergipynb 获取。
创建数据库和 Iceberg 表
首先,在 AWS Glue 数据目录中创建名为 icebergdb 的数据库:
sqlsqlCREATE DATABASE icebergdb
接下来,在 icebergdb 数据库中创建名为 noaaiceberg 的 Iceberg 表,并指向 Amazon S3 中将用于加载数据的路径。运行以下语句,并将 s3//ltyourS3bucketgt/ltprefixgt/ 替换为你的 S3 存储桶和前缀:
sqlsqlCREATE TABLE icebergdbnoaaiceberg( station string date string latitude string longitude string elevation string name string temp string tempattributes string dewp string dewpattributes string slp string slpattributes string stp string stpattributes string visib string visibattributes string wdsp string wdspattributes string mxspd string gust string max string maxattributes string min string minattributes string prcp string prcpattributes string sndp string frshtt string)USING icebergPARTITIONED BY (year string)LOCATION s3//ltyourS3bucketgt/ltprefixgt/noaaiceberg/
向表中插入数据
为填充 noaaiceberg Iceberg 表,我们将从之前创建的 sparkblogdbnoaapq Parquet 表中插入数据。可以通过以下 INSERT INTO 语句完成:
sqlsqlINSERT INTO icebergdbnoaaiceberg select from sparkblogdbnoaapq
或者,你可以使用 CREATE TABLE AS SELECT 语句,在一个步骤中创建 Iceberg 表并从源表插入数据:
sqlsqlCREATE TABLE icebergdbnoaaicebergUSING icebergPARTITIONED BY (year)AS SELECT FROM sparkblogdbnoaapq ORDER BY year
查询 Iceberg 表
现在数据已插入 Iceberg 表,我们可以开始分析它。以下查询将找出‘SEATTLE TACOMA AIRPORT WA US’位置的每年最低气温:
sqlsqlselect name year min(MIN) as minimumtemperaturefrom icebergdbnoaaicebergwhere name = SEATTLE TACOMA AIRPORT WA USgroup by 1 2
我们获得的输出如下:
更新 Iceberg 表中的数据
让我们看一下如何更新表中的数据。我们希望将站名‘SEATTLE TACOMA AIRPORT WA US’更改为‘SeaTac’。使用 Spark SQL,我们可以对 Iceberg 表执行以下 UPDATE 语句:
sqlsqlUPDATE icebergdbnoaaicebergSET name = SeaTacWHERE name = SEATTLE TACOMA AIRPORT WA US
然后,我们可以再次运行之前的 SELECT 查询,以查找‘SeaTac’地点的最低录得气温:
sqlsqlselect name year min(MIN) as minimumtemperaturefrom icebergdbnoaaicebergwhere name = SeaTacgroup by 1 2
我们得到的输出如下:
精简数据文件
像 Iceberg 这样的开放表格式通过创建增量变更来运作,并通过清单文件追踪行的版本。更多的数据文件会导致清单文件中存储更多元数据,而小数据文件则通常会造成不必要的元数据,降低查询效率,增加 Amazon S3 存取成本。在 Athena 中对表运行 Iceberg 的 rewritedatafiles 操作将压缩数据文件,将多个小的增量更改文件合并为一组更小的可读优化 Parquet 文件。如下 SQL 可用于运行压缩操作:
sqlsqlCALL sparkcatalogsystemrewritedatafiles(table =gt icebergdbnoaaiceberg strategy=gtsort sortorder =gt zorder(name))
rewritedatafiles 提供选项以指定你的排序策略,这可以帮助重新组织和合并数据。
列出表快照
每次写入、更新、删除、插入和压缩操作在 Iceberg 表上都会创建一个新的快照,同时保留旧数据和元数据以支持快照隔离和时间旅行。要列出 Iceberg 表的快照,则可运行如下 SQL 语句:
sqlsqlSELECT FROM sparkcatalogicebergdbnoaaicebergsnapshots
删除旧快照
建议定期删除快照,以删除不再需要的数据文件,并保持表元数据的较小尺寸。这不会删除仍然被非过期快照所需的文件。在 Athena 的 Spark 环境中,运行下列 SQL 可以删除 icebergdbnoaaiceberg 表中早于特定时间戳的快照:
sqlsqlCALL sparkcatalogsystemexpiresnapshots(icebergdbnoaaiceberg TIMESTAMP 20231130 000000000)
注意,时间戳值以 yyyyMMdd HHmmssfff 格式的字符串表示。命令的输出将显示已删除的数据文件和元数据文件数量。
删除表及数据库
通过运行以下 Spark SQL 语句,可以清理本次实验中创建的 Iceberg 表及相关的数据:
sqlsqlDROP TABLE icebergdbnoaaiceberg PURGE
接著执行以下 Spark SQL 来删除 icebergdb 数据库:
sqlsqlDROP DATABASE icebergdb
想了解更多有关使用 Spark for Athena 操作 Iceberg 表的资讯,请参考 Spark 查询 和 Spark 程序 的 Iceberg 文档。
使用 Apache Hudi 表
接下来,我们将展示如何使用 Spark for Athena 的 SQL 进行 Apache Hudi 的创建、分析和管理工作。
设置笔记本会话
在 Athena 中使用 Apache Hudi 时,在创建或编辑会话时,请在 Apache Spark 属性 部分选择 Apache Hudi 选项,这将自动填充相关设置。
详情请参考 编辑会话细节 或 创建笔记本。
在这一部分中使用的代码可以从 SparkSQLhudiipynb 获取。
创建数据库和 Hudi 表
首先,我们创建一个名为 hudidb 的数据库,将其存储在 AWS Glue 数据目录中,接下来创建 Hudi 表:
sqlsqlCREATE DATABASE hudidb
我们创建指向 Amazon S3 存储路径的 Hudi 表,这是在其中加载数据的地方。注意,该表为 拷贝型copyonwrite,并在表 DDL 中定义为 type=cow。我们定义站名和日期为主键,year 为 preCombineField。此外,该表还依据年份分区。运行以下语句,并将 s3//ltyourS3bucketgt/ltprefixgt/ 替换为你的 S3 存储桶和前缀:

sqlsqlCREATE TABLE hudidbnoaahudi( station string date string latitude string longitude string elevation string name string temp string tempattributes string dewp string dewpattributes string slp string slpattributes string stp string stpattributes string visib string visibattributes string wdsp string wdspattributes string mxspd string gust string max string maxattributes string min string minattributes string prcp string prcpattributes string sndp string frshtt string year string)USING HUDIPARTITIONED BY (year)TBLPROPERTIES( primaryKey = station date preCombineField = year type = cow)LOCATION s3//ltyourS3bucketgt/ltprefixgt/noaahudi/
向表中插入数据
使用 INSERT INTO 语句,我们可以从 sparkblogdbnoaapq 表插入数据到 Hudi 表中:
sqlsqlINSERT INTO hudidbnoaahudi select from sparkblogdbnoaapq
查询 Hudi 表
现在表已创建,让我们查询‘SEATTLE TACOMA AIRPORT WA US’位置的最大记录气温:
sqlsqlselect name year max(MAX) as maximumtemperaturefrom hudidbnoaahudiwhere name = SEATTLE TACOMA AIRPORT WA USgroup by 1 2
更新 Hudi 表中的数据
为了更新表,我们可以将站名从‘SEATTLE TACOMA AIRPORT WA US’更改为‘SeaTac’。可使用其更新UPDATE语句:
sqlsqlUPDATE hudidbnoaahudiSET name = SeaTacWHERE name = SEATTLE TACOMA AIRPORT WA US
然后再次运行 SELECT 查询以查找‘SeaTac’位置的最大记录气温:
sqlsqlselect name year max(MAX) as maximumtemperaturefrom hudidbnoaahudiwhere name = SeaTacgroup by 1 2
运行时间旅行查询
我们可以使用 SQL 的时间旅行查询来分析过去的数据快照。以下查询的例子:
云梯加速器免费sqlsqlselect name year max(MAX) as maximumtemperaturefrom hudidbnoaahudi timestamp as of 20231201 235343100where name = SEATTLE TACOMA AIRPORT WA USgroup by 1 2