[论文翻译]Delta Lake: High-Performance ACID Table Storage over Cloud Object Stores2

Delta lake

博客总算开张了,感觉每天其实花不了多少时间就能整理一下,以后也可以把之前一些Notion里面的笔记整理出来发成博客,希望能坚持下去

Abstract

Amazon S3 等云对象存储是地球上最大、最具成本效益的存储系统之一,使其成为存储大型数据仓库和数据湖的有吸引力的目标。不幸的是,它们作为键值存储的实现使得难以实现 ACID 事务和高性能:metadata 操作(例如 LIST )成本高昂,并且一致性保证有限。在本文中,我们介绍了 Delta Lake,它是最初在 Databricks 开发的云对象存储之上的开源 ACID 表存储层。 Delta Lake 使用压缩为 Apache Parquet 格式的事务日志来为大型表格数据集提供 ACID 属性、time travel 和显着更快的 metadata 操作(例如,能够快速搜索数十亿个表分区以查找与查询相关的分区)。它还利用这种设计来提供高级功能,例如自动数据布局优化、更新插入、缓存和审计日志。 Delta Lake 表可以从 Apache Spark、Hive、Presto、Redshift 和其他系统访问。 Delta Lake 部署在数千个每天处理 EB 级数据的 Databricks 客户中,其中最大的实例管理 EB 级数据集和数十亿个对象。

1. Introduction

Amazon S3 [4] 和 Azure Blob Storage [17] 等云对象存储已成为地球上规模最大、使用最广泛的存储系统之一,为数百万客户保存 EB 级数据 [46]。除了云服务的传统优势,如即用即付计费、规模经济和专家管理[15],云对象存储特别有吸引力,因为它们允许用户分别扩展计算和存储资源:对于例如,用户可以存储 PB 的数据,但只能运行一个集群来对其执行几个小时的查询。因此,许多组织现在使用云对象存储来管理数据仓库和数据湖中的大型结构化数据集。

主要的开源“大数据”系统,包括 Apache Spark、Hive 和 Presto [45, 52, 42],支持使用 Apache Parquet 和 ORC [13, 12] 等文件格式读取和写入云对象存储。包括 AWS Athena、Google BigQuery 和 Redshift Spectrum [1, 29, 39] 在内的商业服务也可以直接查询这些系统和这些开放文件格式。

不幸的是,尽管许多系统支持对云对象存储的读写,但在这些系统上实现高性能和可变的表存储具有挑战性,因此很难在它们上实现数据仓库功能。与 HDFS [5] 等分布式文件系统或 DBMS 中的自定义存储引擎不同,大多数云对象存储只是键值存储,没有跨键一致性保证。它们的性能特征也与分布式文件系统有很大不同,需要特别注意。

在云对象存储中存储关系数据集的最常见方法是使用列式文件格式,例如 Parquet 和 ORC,其中每个表都存储为一组对象(Parquet 或 ORC “文件”),可能按某些字段(例如,每个日期的一组单独对象)聚集成“分区”[45]。只要目标文件适中,这种方法就可以为扫描工作负载提供可接受的性能。但是,它为更复杂的工作负载带来了正确性和性能方面的挑战。首先,因为多对象更新不是原子的,查询之间没有隔离:例如,如果一个查询需要更新表中的多个对象(例如,删除所有表的 Parquet 文件中关于一个用户的记录),读者当查询单独更新每个对象时,将看到部分更新。回滚写入也很困难:如果更新查询崩溃,则表处于损坏状态。其次,对于拥有数百万个对象的大表, metadata 操作成本很高。例如,Parquet 文件包含具有最小/最大统计信息的 footer ,可用于在选择性查询中跳过读取它们。在 HDFS 上读取这样的 footer 可能需要几毫秒,但云对象存储的延迟要高得多,以至于这些数据跳过检查可能比实际查询需要更长的时间。

根据我们与云客户合作的经验,这些一致性和性能问题给企业数据团队带来了重大挑战。大多数企业数据集都在不断更新,因此它们需要原子写入的解决方案;大多数关于用户的数据集需要全表更新以实施隐私政策,例如 GDPR 合规性 [27];甚至纯粹的内部数据集也可能需要更新以修复不正确的数据、合并迟到的记录等。有趣的是,在 Databricks 云服务的最初几年(2014-2016),我们收到的支持升级中约有一半是由于数据由于云存储策略而导致的损坏、一致性或性能问题(例如,撤销崩溃的更新作业的影响,或提高读取数万个对象的查询的性能)。

为了应对这些挑战,我们设计了 Delta Lake,这是一个基于云对象存储的 ACID 表存储层,我们于 2017 年开始向客户提供并于 2019 年开源 [26]。 Delta Lake 的核心思想很简单:我们以 ACID 方式维护有关哪些对象属于 Delta 表一部分的信息,使用本身存储在云对象存储中的预写日志。对象本身是用 Parquet 编码的,这使得从已经可以处理 Parquet 的引擎编写连接器变得容易。这种设计允许客户端以可序列化的方式一次更新多个对象,用另一个对象替换对象的一个子集,等等,同时仍然从对象本身实现高并行读写性能(类似于原始 Parquet)。日志还包含 metadata ,例如每个数据文件的最小/最大统计信息,与“对象存储中的文件”方法相比, metadata 搜索速度快了一个数量级。至关重要的是,我们设计了 Delta Lake,以便所有 metadata 都在底层对象存储中,并且使用针对对象存储的乐观并发 protocol 来实现事务(一些细节因云提供商而异)。这意味着不需要运行服务器来维护 Delta 表的状态;用户只需在运行查询时启动服务器,即可享受计算和存储分开扩展的好处。

基于这种事务性设计,我们还能够在 Delta Lake 中添加传统云数据湖中不具备的多项其他功能,以解决常见的客户痛点,包括:

  • time travel 让用户可以查询时间点快照或回滚对其数据的错误更新。
  • UPSERT、DELETE 和 MERGE 操作,它们有效地重写相关对象以实现对存档数据和合规性工作流的更新(例如,对于 GDPR [27])。
  • 高效的流I/O,通过让流式作业以低延迟将小对象写入表中,然后稍后将它们以事务方式合并为更大的对象以提高性能。 还支持对添加到表中的新数据进行快速“拖尾”读取,因此作业可以将 Delta 表视为消息总线。
  • 缓存:因为Delta 表中的对象及其日志是不可变的,集群节点可以安全地将它们缓存在本地存储上。我们在 Databricks 云服务中利用它来为 Delta 表实现透明的 SSD 缓存。
  • 数据布局优化:我们的云服务包括一项功能,可自动优化表中对象的大小和数据记录的聚类(例如,以 Z-order 存储记录以实现多个维度的局部性),而不会影响正在运行的查询。
  • 架构演变,允许 Delta 继续读取旧的 Parquet 文件,而无需在表的架构更改时重写它们。
  • 基于事务日志的审计日志。

这些功能共同提高了在云对象存储中处理数据的可管理性和性能,并启用了一个“lakehouse”范式,该范式结合了数据仓库和数据湖的关键功能:标准 DBMS 管理功能可直接用于低成本对象存储。事实上,我们发现许多 Databricks 客户可以使用 Delta Lake 简化他们的整体数据架构,通过用 Delta 表替换以前单独的数据湖、数据仓库和流存储系统,为所有这些用例提供适当的功能。图 1 显示了一个极端示例,其中包含对象存储、消息队列和用于不同商业智能团队(每个团队运行自己的计算资源)的两个数据仓库的 data pipeline 被替换为对象存储上的 Delta 表,使用 Delta 的流 I/O 和性能特性来运行 ETL 和 BI。新管道仅使用低成本的对象存储并创建更少的数据副本,从而降低了存储成本和维护开销。

Delta Lake 现在被 Databricks 的大多数大客户使用,每天处理 EB 的数据(大约是我们总工作量的一半)。它还得到了谷歌云、阿里巴巴、腾讯、Fivetran、Informatica、Qlik、Talend 等产品的支持 [50, 26, 33]。在 Databricks 客户中,Delta Lake 的 user case 非常多样化,从传统的 ETL 和数据仓库工作负载到生物信息学、实时网络安全分析(每天数百 TB 的流事件数据)、GDPR 合规性和机器学习的数据管理(管理数百万张图像作为 Delta 表中的记录而不是 S3 对象以获得 ACID 和改进的性能)。我们在第 5 节中详细介绍了这些用例。

lakehouse1

有趣的是,Delta Lake 将 Databricks 的云存储支持问题的比例从一半减少到几乎没有。 它还提高了大多数客户的工作负载性能,在使用其数据布局优化和快速访问统计数据来查询非常高维数据集(例如,网络安全和生物信息学用例)的极端情况下,加速高达 100 倍。 开源 Delta Lake 项目 [26] 包括到 Apache Spark(批处理或流媒体)、Hive、Presto、AWS Athena、Redshift 和 Snowflake 的连接器,并且可以在多个云对象存储或 HDFS 上运行。 在本文的其余部分,我们将介绍 Delta Lake 的动机和设计,以及激发我们设计动机的客户用例和性能实验。

2. Motivation: Characteristics and challenges of object stores

在本节中,我们将描述云对象存储的 API 和性能特征,以解释为什么这些系统上的高效表存储具有挑战性,并概述管理这些系统上的表格数据集的现有方法。

2.1 Object Store APIs

云对象存储,例如 Amazon S3 [4] 和 Azure Blob Storage [17]、Google Cloud Storage [30] 和 OpenStack Swift [38],提供了一个简单但易于扩展的键值存储接口。这些系统允许用户创建每个存储多个对象的存储桶,每个存储桶都是大小不超过几 TB 的二进制 blob(例如,在 S3 上,对象大小的限制为 5 TB [4])。每个对象都由一个字符串键标识。在文件系统路径(例如,warehouse/table1/part1.parquet)之后对键进行建模是很常见的,但与文件系统不同,云对象存储不提供对象或“目录”的廉价重命名。云对象存储还提供 metadata API,例如 S3 的 LIST 操作 [41],它通常可以在给定开始键的情况下,按照键的字典顺序列出存储桶中的可用对象。如果使用文件系统样式的路径,通过在代表该目录前缀的键(例如,warehouse/table1/)处启动 LIST 请求,这使得有效地列出“目录”中的对象成为可能。不幸的是,这些 metadata API 通常很昂贵:例如,S3 的 LIST 每次调用最多只能返回 1000 个键,并且每次调用需要几十到几百毫秒,因此使用顺序执行。

读取对象时,云对象存储通常支持字节范围请求,因此仅读取大对象内的范围(例如,字节 10,000 到 20,000)是有效的。这使得利用将经常访问的值聚集在一起的存储格式成为可能。

更新对象通常需要一次重写整个对象。这些更新可以是原子的,这样读者要么看到新的对象版本,要么看到旧的。一些系统还支持附加到对象 [48]。

一些云供应商还在 blob 存储上实现了分布式文件系统接口,例如 Azure 的 ADLS Gen2 [18],它基于与 Hadoop 的 HDFS 相似的语义(例如,目录和原子重命名)。尽管如此,Delta Lake 解决的许多问题,例如小文件 [36] 和跨多个目录的原子更新,即使在使用分布式文件系统时仍然存在——事实上,多个用户通过 HDFS 运行 Delta Lake。

2.2 Consistency Properties

最流行的云对象存储为每个键提供最终一致性,但没有跨键的一致性保证,这在管理由多个对象组成的数据集时带来了挑战,如简介中所述。特别是,当一个客户端上传一个新对象后,其他客户端不一定能立即看到 LIST 中的对象或进行读取操作。同样,其他客户端可能不会立即看到对现有对象的更新。此外,根据对象存储,即使客户端进行写入也可能不会立即看到新对象。

确切的一致性模型因云提供商而异,并且可能相当复杂。作为一个具体的例子,Amazon S3 为写入新对象的客户端提供了写后读一致性,这意味着 S3 的 GET 等读取操作将在 PUT 之后返回对象内容。但是,有一个例外:如果写入对象的客户端在其 PUT 之前向(不存在的)键发出 GET,则后续 GET 可能不会在一段时间内读取该对象,这很可能是因为 S3 使用了negative caching。此外,S3 的 LIST 操作始终是最终一致的,这意味着 PUT 之后的 LIST 可能不会返回新对象 [40]。其他云对象存储提供更强的保证 [31],但仍然缺乏跨多个键的原子操作。

注意到,现在 S3 已经提供了更强的一致性,见于 https://aws.amazon.com/cn/blogs/aws/amazon-s3-update-strong-read-after-write-consistency/

2.3 Performance Characteristic

根据我们的经验,使用对象存储实现高吞吐量需要在大型顺序 I/O 和并行性之间进行仔细平衡。对于读取,可用的最细粒度的操作是读取顺序字节范围,如前所述。每次读取操作通常会产生至少 5-10 ms 的基本延迟,然后可以大约 50-100 MB/s 的速度读取数据,因此一次操作至少需要读取数百 KB 才能实现至少一半的顺序峰值吞吐量。读取和数兆字节以接近峰值吞吐量。此外,在典型的 VM 配置中,应用程序需要并行运行多个读取以最大化吞吐量。例如,AWS 上最常用于分析的 VM 类型至少具有 10 Gbps 的网络带宽,因此它们需要运行 8-10 个并行读取以充分利用此带宽。

LIST 操作也需要显着的并行性来快速列出大型对象集。例如,S3 的 LIST 操作每个请求最多只能返回 1000 个对象,并且需要几十到几百毫秒,因此客户端需要并行发出数百个 LIST 来列出大桶或“目录”。在我们针对云中 Apache Spark 的优化运行时中,我们有时会在 Spark 集群中的工作节点上并行化 LIST 操作以及驱动程序节点中的线程,以使它们运行得更快。在 Delta Lake 中,有关可用对象的 metadata (包括它们的名称和数据统计信息)存储在 Delta 日志中,但我们也在集群上并行化从该日志中读取。

写操作通常必须替换整个对象(或附加到它),如第 2.1 节所述。这意味着如果一个表需要接收点更新,那么其中的对象应该保持小,这与支持大读取不一致。或者,可以使用日志结构的存储格式。

对表存储的影响。对象存储的性能特征导致分析工作负载的三个注意事项:

  1. 将频繁访问的数据按顺序排列,这通常会导致选择列格式。
  2. 使object变大,但不要太大。大对象会增加更新数据的成本(例如,删除有关一个用户的所有数据),因为它们必须完全重写。
  3. 避免 LIST 操作,并尽可能让这些操作请求字典键范围。

2.4 Existing Approaches for Table Storage

基于对象存储的特性,目前使用三种主要的方法来管理它们上的表格数据集。我们简要概述了这些方法及其挑战。

  1. 文件目录。开源big data stack 以及许多云服务支持的最常见方法是将表存储为对象的集合,通常采用列式格式,例如 Parquet。作为一种改进,可以根据一个或多个属性将记录“划分”到目录中。例如,对于带有日期字段的表,我们可能会为每个日期创建一个单独的对象目录,例如 mytable/date=2020-01-01/obj1 和 mytable/date=2020-01-01/obj2 用于数据从 1 月 1 日开始,然后是 1 月 2 日的 mytable/date=2020-01-02/obj1,依此类推,并根据此字段将传入数据拆分为多个对象。这样的分区降低了 LIST 操作和只访问少数分区的查询的读取成本。

这种方法很有吸引力,因为表“只是一堆对象”,可以从许多工具访问,而无需运行任何额外的数据存储或系统。它起源于 HDFS 上的 Apache Hive [45],与 Parquet、Hive 和其他文件系统上的大数据软件相匹配。
这种方法的挑战。如简介中所述,“仅一堆文件”方法在云对象存储上存在性能和一致性问题。客户遇到的最常见挑战是:

  1. 没有跨多个对象的原子性:任何需要写入或更新多个对象的事务都有部分写入对其他客户端可见的风险。此外,如果此类事务失败,数据将处于损坏状态。
  2. 最终一致性:即使 transaction 成功,客户端也可能会看到一些更新的对象,但看不到其他对象。
  3. 性能差: LIST 以查找与查询相关的对象的开销很大,即使它们通过一个键分区到目录中。此外,访问存储在 Parquet 或 ORC 文件中的每个对象的统计数据非常昂贵,因为它需要对每个功能进行额外的高延迟读取。
  4. 无管理功能:对象存储不实施标准实用程序,例如数据仓库中熟悉的表版本控制或审计日志。
  5. 自定义存储引擎。为云构建的“封闭世界”存储引擎,例如 Snowflake 数据仓库 [23],可以通过在单独的、强一致的服务中管理 metadata 本身来绕过云对象存储的许多一致性挑战,该服务拥有“source of truth”关于哪些 objects 构成了一张 table。在这些引擎中,云对象存储可以被视为dumb block device,并且可以使用标准技术在云对象上实现高效的 metadata 存储、搜索、更新等。然而,这种方法需要运行高度可用的服务来管理 metadata ,这可能很昂贵,在使用外部计算引擎查询数据时会增加开销,并且会将用户锁定到一个提供者。

这种方法的挑战。尽管全新的“封闭世界”设计有很多好处,但我们在使用这种方法时遇到的一些具体挑战是:

  • 所有I/O 操作都需要联系 metadata 服务,这会增加其资源成本并降低性能和可用性。例如,在 Spark 中访问 Snowflake 数据集时,从 Snowflake 的 Spark 连接器读取的数据通过 Snowflake 的服务流式传输,与直接从云对象存储读取相比,性能降低。
  • 与重用现有开放格式(如 Parquet)的方法相比,连接到现有计算引擎需要更多的工程工作来实现。根据我们的经验,数据团队希望对他们的数据使用广泛的计算引擎(例如 Spark、TensorFlow、PyTorch 等),因此使连接器易于实现非常重要。
  • 专有 metadata 服务将用户与特定的服务提供商联系起来,而基于直接访问云存储中的对象的方法使用户能够始终使用不同的技术访问他们的数据。
    Apache Hive ACID [32] 通过使用 Hive Metastore(一种事务性 RDBMS,如 MySQL)在 HDFS 或对象存储上实现了类似的方法,以跟踪保存以 ORC 格式存储的表的更新的多个文件。但是,这种方法受到 Metastore 性能的限制,根据我们的经验,这可能成为具有数百万个对象的表的瓶颈。
  1. Object Stores 中的 metadata 。 DeltaLake 的方法是直接在云对象存储中存储事务日志和 metadata ,并在对象存储操作上使用一组 protocol 来实现可序列化。 然后,表中的数据以 Parquet 格式存储,只要有最小的连接器可用于发现要读取的对象集,就可以轻松地从任何已经支持 Parquet 的软件访问。 尽管我们认为 Delta Lake 是第一个使用这种设计的系统(从 2016 年开始),但现在还有另外两个软件包也支持它——Apache Hudi [8] 和 Apache Iceberg [10]。 Delta Lake 提供了许多这些系统不支持的独特功能,例如 Z-order 集群、缓存和后台优化。 我们在第 8 节中更详细地讨论了这些系统之间的异同。

3. Delta Lack Storage format and access protocols

Delta Lake 表是云对象存储或文件系统上的一个目录,其中包含包含表内容和事务操作日志(偶尔有 checkpoint )的数据对象。 客户端使用我们针对云对象存储特性量身定制的乐观并发控制 protocol 来更新这些数据结构。 在本节中,我们将描述 Delta Lake 的存储格式和这些访问 protocol 。 我们还描述了 Delta Lake 的事务隔离级别,包括表内的可序列化和快照隔离。

3.1 Storage Format

图 2 显示了 Delta 表的存储格式。 每个表都存储在文件系统目录(此处为 mytable)中,或作为对象存储中以相同“目录”键前缀开头的对象。

image-20211117231929209

3.1.1 Data Objects

表内容存储在 Apache Parquet 对象中,可能使用 Hive 的分区命名约定组织到目录中。
例如,在图 2 中,表按日期列分区,因此每个日期的数据对象位于不同的目录中。我们选择 Parquet 作为我们的底层数据格式,因为它面向列,提供多样化的压缩更新,支持半结构化数据的嵌套数据类型,并且已经在许多引擎中实现了高性能。建立在现有的开放文件格式上还确保 Delta Lake 可以继续利用新发布的 Parquet 库更新,并简化与其他引擎的连接器的开发(第 4.8 节)。其他开源格式,如 ORC [12],可能也有类似的工作方式,但 Parquet 在 Spark 中拥有最成熟的支持。
Delta 中的每个数据对象都有一个唯一的名称,通常由作者通过生成 GUID 来选择。但是,哪些对象属于表的每个版本是由事务日志决定的。

3.1.2 Log

日志存储在表内的 _delta_log 子目录中。 它包含一系列 JSON 对象,这些对象具有递增的、用零填充的数字 ID 来存储日志记录,以及特定日志对象的临时 checkpoint ,这些对象以 Parquet 格式汇总到该点的日志。 正如我们在第 3.2 节中讨论的,一些简单的访问 protocol (取决于每个对象存储中可用的原子操作)用于创建新的日志条目或 checkpoint ,并让客户端就事务的顺序达成一致。

每个日志记录对象(例如 000003.json)都包含一组操作,这些操作应用于表的先前版本以生成下一个版本。 可用的操作是:

  • Change Metadata ,metaData 操作更改表的当前 metadata 。表的第一个版本必须包含 metadata 操作。后续的 metadata 操作会完全覆盖表的当前 metadata 。 metadata 是一个数据结构,包含模式、分区列名称(即我们示例中的日期)、数据文件的存储格式(通常是 Parquet,但这提供了可扩展性)和其他配置选项,例如将表标记为仅附加。
  • Add or Remove Files。添加和删除操作分别用于通过添加或删除单个数据对象来修改表中的数据。因此,客户端可以搜索日志以查找所有尚未删除的添加对象,以确定构成该表的对象集。
    • 数据对象的 add record 还可以包括数据统计信息,例如总记录数和每列的最小值/最大值和空值。当表中已存在的路径遇到添加操作时,最新版本的统计信息将替换任何先前版本的统计信息。这可用于在新版本的 Delta Lake 中“升级”具有更多类型统计信息的旧表。
    • Delete record 包括一个时间戳,指示删除发生的时间。在用户指定的保留时间阈值之后,数据对象的物理删除可能会延迟发生。此延迟允许并发读取器继续针对过时的数据快照执行。删除操作应该保留在日志和任何日志 checkpoint 中作为 tombstone ,直到底层数据对象被删除。
    • 添加或删除操作上的 dataChange 标志可以设置为 false 以指示此操作与同一日志记录对象中的其他操作组合时,仅重新排列现有数据或添加统计信息。例如,跟踪事务日志的流式查询可以使用此标志跳过不会影响其结果的操作,例如更改早期数据文件中的排序顺序。
  • Protocol Evolution, protocol 操作用于增加读取或写入给定表所需的 Delta protocol 版本。 我们使用此操作向格式添加新功能,同时指示哪些客户端仍然兼容。
  • Add Provenance Information. 每个日志记录对象还可以在 commitInfo 操作中包含出处信息,例如,记录哪个用户执行了操作。
  • Update Application Transaction IDs. DeltaLake 还为应用程序提供了一种将它们自己的数据包含在日志记录中的方法,这对于实现端到端的事务应用程序非常有用。例如,写入 Delta 表的流处理系统需要知道哪些写入之前已经提交,以实现“ exactly-once ”语义:如果流式作业崩溃,它需要知道哪些写入之前已提交将其放入表中,以便它可以 replay 从其输入流中正确偏移量开始的后续写入。为了支持这个用例,Delta Lake 允许应用程序在其日志记录对象中使用 appId 和 version 字段编写自定义 txn 操作,这些字段可以跟踪特定于应用程序的信息,例如我们示例中输入流中的相应偏移量。通过将此信息与相应的 Delta 添加和删除操作放在同一日志记录中,并以原子方式插入日志,应用程序可以确保 Delta Lake 以原子方式添加新数据并存储其版本字段。每个应用程序可以简单地随机生成其 appId 以接收唯一 ID。我们在 Delta Lake 连接器中使用此工具进行 Spark 结构化流 [14]。

3.1.3 Log CheckPoints

为了性能,需要定期将日志压缩成 checkpoint 。 checkpoint 以 Parquet 格式将所有非冗余操作存储在表的日志中,直到某个日志记录 ID。某些操作集是多余的,可以删除。这些包括:

  • 为同一数据对象添加操作,然后是删除操作。由于数据对象不再是表的一部分,因此可以删除添加。根据表的数据保留配置,删除操作应保留为 tombstone 。具体来说,客户端在删除操作中使用时间戳来决定何时从存储中删除对象。
  • 同一对象的多次添加可以被最后一个替换,因为新添加的只能添加统计信息。
  • 来自同一个appId 的多个 txn 操作可以替换为最新的操作,其中包含其最新版本字段。
  • changeMetadata 和 protocol 操作也可以合并以仅保留最新的 metadata 。

因此, checkpoint 过程的最终结果是一个 Parquet 文件,其中包含为仍在表中的每个对象添加记录,删除已删除但需要保留到保留期到期的对象的记录,以及一个少数的其他记录,例如 txn、 protocol 和 changeMetadata。这个面向列的文件是查询有关表的 metadata 以及根据数据统计查找哪些对象可能包含与选择性查询相关的数据的理想格式。根据我们的经验,使用 Delta Lake checkpoint 查找要为查询读取的对象集几乎总是比使用 LIST 操作和读取对象存储上的 Parquet 文件 footer 更快。

任何客户端都可以尝试创建一个 checkpoint ,直到给定的日志记录 ID,如果成功,应该将它写为相应 ID 的 .parquet 文件。例如,000003.parquet 将代表直到并包括 000003.json 的记录的 checkpoint 。默认情况下,我们的客户端每 10 个事务写入一次 checkpoint 。

最后,访问 Delta Lake 表的客户端需要高效地找到最后一个 checkpoint (以及日志的尾部),而无需列出 _delta_log 目录中的所有对象。 如果 _delta_log/_last_checkpoint 文件比该文件中的当前 ID 新,则 checkpoint 编写者会将其新 checkpoint ID 写入该文件中。 请注意,由于云对象存储的最终一致性问题,_last_checkpoint 文件过时是可以的,因为客户端仍将在此文件中的 ID 之后搜索新的 checkpoint 。

3.2 Access Protocols

Delta Lake 的访问 protocol 旨在让客户端仅使用对象存储上的操作来实现可序列化的事务,尽管对象存储有最终的一致性保证。 使这成为可能的关键选择是日志记录对象,例如 000003.json,是客户端读取特定版本表所需的 “root” 数据结构。 给定这个对象的内容,客户端然后可以从对象存储中查询其他对象,如果由于最终的一致性延迟它们尚不可见,可能会等待,并读取表数据。 对于执行写入的事务,客户端需要一种方法来确保只有一个写入者可以创建下一条日志记录(例如,000003.json),然后可以使用它来实现乐观并发控制。

3.2.1 Reading from Tables

我们首先描述如何对 Delta 表运行只读事务。 这些事务将安全地读取表的某些版本。 只读事务有五个步骤:

  1. 读取表日志目录中的 _last_checkpoint 对象(如果存在)以获取最近的 checkpoint ID。
  2. 使用 LIST 操作,其开始键是最后一个 checkpoint ID(如果存在),否则为 0,在表的日志目录中查找任何更新的 .json 和 .parquet 文件。这提供了一个列表文件,可用于从最近的 checkpoint 开始重建表的状态。 (请注意,由于云对象存储的最终一致性,此 LIST 操作可能会返回一组不连续的对象,例如有 000004.json 和 000006.json 但没有 000005.json。尽管如此,客户端可以使用返回的最大 ID 作为要读取的目标表版本,并等待丢失的对象变为可见。)
  3. 使用上一步中确定的 checkpoint (如果存在)和后续日志记录来重建表的状态——即,具有添加记录但没有相应删除记录的数据对象集及其关联的数据统计信息。我们的格式旨在使此任务可以并行运行:例如,在我们的 Spark 连接器中,我们使用 Spark 作业读取 checkpoint Parquet 文件和日志对象。
  4. 使用统计信息识别与读取查询相关的数据对象文件集。
  5. 查询对象存储以读取相关数据对象,可能跨集群并行。请注意,由于云对象存储的最终一致性,某些工作节点可能无法查询查询计划器在日志中找到的对象;这些可以在很短的时间后简单地重试。

我们注意到该 protocol 旨在容忍每一步的最终一致性。 例如,如果客户端读取 _last_checkpoint 文件的旧版本,它仍然可以在后续 LIST 操作中发现更新的日志文件并重建表的最近快照。 _last_checkpoint 文件仅通过提供最近的 checkpoint ID 来帮助降低 LIST 操作的成本。

同样,客户端可以容忍在列出最近的记录(例如,日志记录 ID 中的间隙)或读取日志中引用的数据对象时的不一致,这些对象在对象存储中可能尚不可见。

3.2.2 Write Transactions

写入数据的事务通常最多分五个步骤进行,具体取决于事务中的操作:

  1. 使用读取 protocol 的第 1-2 步(即,从最后一个 checkpoint ID 向前看)确定最近的日志记录 ID,例如 r。然后事务将读取表版本 r 的数据(如果需要)并尝试写入日志记录 r + 1。
  2. 如果需要,使用与读取 protocol 相同的步骤读取表版本 r 的数据(即,组合先前的 checkpoint 和任何进一步的日志记录,然后读取其中引用的数据对象)。
  3. 将事务旨在添加到表中的任何新数据对象写入正确数据目录中的新文件中,使用 GUID 生成对象名称。这一步可以并行发生。最后,这些对象已准备好在新的日志记录中引用。
  4. 如果没有其他客户端写入此对象,则尝试将事务的日志记录写入 r + 1 .json 日志对象。这一步需要是原子的,我们很快就会讨论如何在各种对象存储中实现这一点。如果步骤失败,可以重试事务;根据查询的语义,客户端还可以重用它在第 3 步中写入的新数据对象,并尝试将它们添加到新日志记录中的表中。
  5. 可选地,为日志记录 r + 1 写入一个新的 .parquet checkpoint 。(实际上,我们的实现默认每 10 条记录执行一次。)然后,在写入完成后,更新 _last_checkpoint 文件以指向 checkpoint r + 1.
    请注意,第五步,即写入 checkpoint ,然后更新 _last_checkpoint 对象,只会影响性能,并且在此步骤中任何地方的客户端故障都不会破坏数据。例如,如果客户端写入 checkpoint 失败,或写入 checkpoint Parquet 对象但未更新 _last_checkpoint,则其他客户端仍然可以使用较早的 checkpoint 读取该表。如果第 4 步成功,事务将自动提交。

**Adding Log Records Atomically.**。在写入 protocol 中很明显,第 4 步,即创建 r + 1 .json 日志记录对象,需要是原子的:只有一个客户端应该成功创建具有该名称的对象。不幸的是,并不是所有的大型存储系统都有一个原子的 put-if-absent 操作,但是我们能够以不同的方式为不同的存储系统实现这个步骤:

  • Google Cloud Storage 和Azure Blob Store 支持原子put-if-absent 操作,因此我们使用它们。
  • 在分布式文件系统(如 HDFS)上,我们使用原子重命名将临时文件重命名为目标名称(例如 000004.json),如果它已经存在则失败。 Azure Data Lake Storage [18] 还提供了一个带有原子重命名的文件系统 API,所以我们在那里使用相同的方法
  • Amazon S3 没有原子“如果不存在则放置”或重命名操作。在 Databricks 服务部署中,我们使用单独的轻量级协调服务来确保只有一个客户端可以为每个日志 ID 添加一条记录。该服务仅用于日志写入(不是读取,也不是数据操作),因此其负载较低。在我们用于 Apache Spark 的开源 Delta Lake 连接器中,我们确保通过同一个 Spark 驱动程序(SparkContext 对象)的写入使用内存状态获得不同的日志记录 ID,这意味着用户仍然可以对 Delta 表进行并发操作在单个 Spark 集群中。我们还提供了一个 API 来插入自定义 LogStore 类,如果用户想要运行一个单独的、高度一致的存储,该类可以使用其他协调机制。

3.3 Available Isolation Levels

鉴于 Delta Lake 的并发控制 protocol ,所有执行写入的事务都是可序列化的,从而导致按日志记录 ID 递增顺序的串行调度。这遵循写入事务的提交 protocol ,其中只有一个事务可以使用每个记录 ID 写入记录。读取事务可以实现快照隔离或可序列化。我们在 3.2.1 节中描述的读取 protocol 只读取表的快照,因此利用该 protocol 的客户端将实现快照隔离,但希望运行可序列化读取(可能在其他可序列化事务之间)的客户端可以执行读取-write 事务执行虚拟写入以实现此目的。在实践中,Delta Lake 连接器实现还将他们为内存中的每个表访问过的最新日志记录 ID 缓存,因此即使客户端使用快照隔离进行读取,客户端也会“读取自己的写入”,并在读取时读取表版本的单调序列进行多次读取。

重要的是,Delta Lake 目前仅支持一张表内的事务。将来也可以扩展对象存储日志设计以管理同一日志中的多个表。

3.4 Transaction Rates

Delta Lake 的写入事务率受到写入新日志记录的 put-if-absent 操作延迟的限制,如第 3.2.2 节所述。与任何乐观并发控制 protocol 一样,写入事务的高速率将导致提交失败。在实践中,写入对象存储的延迟可能是几十到几百毫秒,将写入事务的速率限制为每秒几个事务。然而,我们发现这个速率几乎适用于所有当前的 Delta Lake 应用程序:即使是将流数据摄取到云存储中的应用程序通常也有一些高度并行的作业(例如 Spark Streaming 作业)进行写入,这些作业可以将许多新数据对象批处理在一起一笔 transaction 。如果将来需要更高的速率,我们相信协调对日志的访问的自定义 LogStore,类似于我们的 S3 提交服务,可以提供明显更快的提交时间(例如,通过在低延迟 DBMS 并将其异步写入对象存储)。当然,快照隔离级别的读取事务不会产生争用,因为它们只读取对象存储中的对象,因此可以同时运行任意数量的事务。

4. Higher-Level features in Delta

Delta Lake 的事务设计支持广泛的高级数据管理功能,类似于传统分析 DBMS 中的许多设施。 在本节中,我们将讨论一些最广泛使用的功能以及激发它们的客户用例或痛点。

4.1 Time Travel and Rollbacks

Data engineering pipelines 经常出错,尤其是在从外部系统摄取“脏”数据时,但在传统的数据湖设计中,很难撤消将对象添加到表中的更新。此外,一些工作负载,例如机器学习训练,需要忠实地再现旧版本的数据(例如,在相同数据上比较新旧训练算法)。这两个问题都给 Delta Lake 之前的 Databricks 用户带来了重大挑战,要求他们设计复杂的补救措施来解决 data pipeline 错误或复制数据集。

由于 Delta Lake 的数据对象和日志是不可变的,Delta Lake 可以直接查询数据的过去快照,就像在典型的 MVCC 实现中一样。客户端只需要根据旧的日志记录 ID 读取表状态。为了方便time travel ,Delta Lake 允许用户配置每个表的数据保留间隔,并支持 SQL AS OF 时间戳和 VERSION AS OF commit_id 语法来读取过去的快照。客户端还可以通过 Delta Lake 的 API 发现他们刚刚在操作中读取或写入的提交 ID。例如,我们在 MLflow 开源项目 [51] 中使用这个 API 来自动记录在 ML 训练工作负载期间读取的表版本。

用户发现time travel 对于修复 data pipeline 中的错误特别有用。例如,为了有效地撤消覆盖某些用户数据的更新,分析人员可以使用表的 SQL MERGE 语句针对其以前的版本,如下所示:

1
2
3
MERGE INTO mytable target
USING mytable TIMESTAMP AS OF <old_date> source ON source.userId = target.userId
WHEN MATCHED THEN UPDATE SET *

我们还开发了一个 CLONE 命令,该命令创建一个表的写时复制新版本,从其现有快照之一开始。

4.2 Efficient UPSERT, DELETE and MERGE

企业中的许多分析数据集需要随着时间的推移进行修改。例如,为了遵守 GDPR [27] 等数据隐私法规,企业需要能够按需删除特定用户的数据。即使内部数据集与个人无关,由于上游数据收集中的错误或迟到的数据,旧记录可能需要更新。最后,计算聚合数据集(例如,业务分析师查询的表格摘要)的应用程序将需要随着时间的推移对其进行更新。

在传统的数据湖存储格式中,例如 S3 上的 Parquet 文件目录,很难在不停止并发读取器的情况下执行这些更新。即便如此,更新作业也必须小心执行,因为作业期间的失败将使表处于部分更新状态。使用 Delta Lake,所有这些操作都可以事务性地执行,通过在 Delta 日志中添加和删除新记录来替换任何更新的对象。 Delta Lake 支持标准的 SQL UPSERT、DELETE 和 MERGE 语法。

4.3 Streaming Ingest and Consumption

许多数据团队希望将流式管道部署到 ETL 或实时聚合数据,但传统的云数据湖很难用于此目的。 因此,这些团队部署了单独的流消息总线,例如 Apache Kafka [11] 或 Kinesis [2],这通常会复制数据并增加管理复杂性。
我们设计了 Delta Lake,这样表的日志可以帮助数据生产者和消费者将其视为消息队列,从而在许多场景中不需要单独的消息总线。 这种支持来自三个主要功能:

  • Write Compaction。组织为对象集合的简单数据湖可以轻松插入数据(只需写入新对象),但会在写入延迟和查询性能之间产生令人不快的折衷。如果写入者希望通过写入小对象快速将新记录添加到表中,由于较小的顺序读取和更多的 metadata 操作,读取器最终会变慢。相比之下,Delta Lake 允许用户运行后台进程,以事务方式压缩小数据对象,而不会影响读者。在第 3.1.2 节中描述的压缩文件的日志记录上将 dataChange 标志设置为 false 还允许流消费者在他们已经读取小对象的情况下完全忽略这些压缩操作。因此,流应用程序可以通过写入小对象来快速地相互传输数据,同时对旧数据的查询保持快速。
  • Exactly-OnceStreamingWrites。 Writer 可以使用 3.1.2 节中描述的日志记录中的 txnaction 类型来跟踪他们将哪些数据写入 Delta Lake 表并实现“ exactly-once ”写入。通常,旨在更新外部存储中数据的流处理系统需要某种机制来使其写入具有幂等性,以避免在发生故障后重复写入。这可以通过确保每条记录在覆盖的情况下具有唯一键来完成,或者更一般地说,通过在每次写入时自动更新“上次写入的”记录,然后可以仅用于写入较新的更改。 Delta Lake 通过允许应用程序更新每个事务的 (appId, version) 对来促进后一种模式。我们在结构化流 [14] 连接器中使用此功能来支持任何类型的流计算(追加、聚合、更新插入等)的一次性写入。
  • **Efficient Log Tailing.**。使用 Delta Lake 表作为消息队列所需的最后一个工具是消费者有效查找新写入的机制。幸运的是,日志的存储格式是一系列具有字典序递增 ID 的 .json 对象,使这变得容易:消费者可以简单地从它看到的最后一个日志记录 ID 开始运行对象存储 LIST 操作,以发现新的.日志记录中的 dataChange 标志允许流消费者跳过仅压缩或重新排列现有数据的日志记录,而只读取新的数据对象。通过记住它完成处理的最后一个记录 ID,流应用程序也很容易在 Delta Lake 表中的同一日志记录处停止和重新启动。

结合这三个特性,我们发现许多用户可以完全避免运行单独的消息总线系统,而使用低成本的云对象存储和 Delta 来实现延迟为秒级的流管道。

4.4 Data Layout Optimization

数据布局对分析系统中的查询性能有很大影响,特别是因为许多分析查询是高度选择性的。由于 Delta Lake 可以事务性地更新表示表的数据结构,因此可以在不影响并发操作的情况下支持多种布局优化。例如,后台进程可以压缩数据对象,更改这些对象内的记录顺序,甚至更新辅助数据结构,例如数据统计和索引,而不会影响其他客户端。我们利用这个属性来实现一些数据布局优化功能:

优化命令。用户可以在表上手动运行 OPTIMIZE 命令,在不影响正在进行的事务的情况下压缩小对象,并计算任何丢失的统计信息。默认情况下,此操作旨在使每个数据对象的大小为 1 GB,我们发现该值适合许多工作负载,但用户可以自定义此值。

**Z-Ordering by Multiple Attributes.**。许多数据集会根据多个属性接收高度选择性的查询。例如,我们使用的一个网络安全数据集将网络上发送的数据的信息存储在作为(sourceIp、destIp、time)元组中,每个维度都有高度选择性的查询。一个简单的目录分区方案,如 Apache Hive [45],可以帮助在写入数据后通过几个属性对数据进行分区,但是当使用多个属性时,分区数量变得非常大。 Delta Lake 支持按照给定的属性集以 Z-order [35] 重新组织表中的记录,以实现多个维度的高局部性。 Z 阶曲线是一种易于计算的空间填充曲线,可在所有指定维度上创建局部性。对于在实践中结合这些维度的查询工作负载,它可以显着提高性能,正如我们在第 6 节中展示的那样。用户可以在表上设置 Z-order 规范,然后运行 OPTIMIZE 来移动所需的数据子集(例如,只是最新的记录)沿着选定的属性到 Z 排序的对象中。用户也可以稍后更改顺序。

Z-ordering 与数据统计一起工作,让查询读取更少的数据。特别是,Z-ordering 将倾向于使每个数据对象在每个所选属性中包含一个小范围的可能值,以便在运行选择性查询时可以跳过更多的数据对象。

AUTO OPTIMIZE.。在 Databricks 的云服务上,用户可以在表上设置 AUTO OPTIMIZE 属性,让服务自动压缩新写入的数据对象。

更一般地说,Delta Lake 的设计还允许在更新表时维护索引或计算成本高的统计信息。我们正在探索该领域的几个新功能。

4.5 Caching

许多云用户为临时查询工作负载运行相对较长的集群,可能会根据他们的工作负载自动扩展和缩减集群。 在这些集群中,有机会通过在本地设备上缓存对象存储数据来加速对频繁访问的数据的查询。 例如,AWS i3 实例为每个内核提供 237 GB 的 NVMe SSD 存储,成本比相应的 m5(通用)实例高出约 50%。

在 Databricks,我们构建了一个功能来透明地在集群上缓存 Delta Lake 数据,通过缓存数据和日志对象来加速对这些表的数据和 metadata 查询。 缓存是安全的,因为 Delta Lake 表中的数据、日志和 checkpoint 对象是不可变的。 正如我们在第 6 节中展示的,从缓存中读取可以显着提高查询性能。

4.6 Audit Logging

Delta Lake 的事务日志也可以用于基于 commitInfo 记录的审计日志。 在 Databricks 上,我们为 Spark 集群提供了锁定执行模式,用户自定义函数不能直接访问云存储(或调用 Apache Spark 中的私有 API),这使我们能够确保只有运行时引擎才能写入 commitInfo 记录 ,并确保不可变的审计日志。 用户可以使用 DESCRIBE HISTORY 命令查看 Delta Lake 表的历史记录,如图 3 所示。 Delta Lake 的开源版本中也提供了提交信息日志记录。 审计日志是一种数据安全最佳实践,由于法规的原因,它对许多企业来说越来越具有强制性。

4.7 Schema Evolution and Enforcement

长期维护的数据集通常需要模式更新,但将这些数据集存储为“只是一堆对象”意味着旧对象(例如,旧 Parquet 文件)可能具有“错误”模式。 Delta Lake 可以事务性地执行架构更改,并在需要时随架构更改一起更新底层对象(例如,删除用户不再希望保留的列)。 在事务日志中保留架构更新的历史记录还可以允许使用较旧的 Parquet 对象,而无需针对某些架构更改(例如,添加列)重写它们。 同样重要的是,Delta 客户端确保新写入的数据遵循表的架构。 这些简单的检查发现了许多用户错误,这些错误将数据附加到错误的模式中,当单个作业在使用 Delta Lake 之前只是将 Parquet 文件写入同一目录时,这些错误很难追踪。

4.8 Connectors to Query and ETL Engines

Delta Lake 使用 Apache Spark 的数据源 API [16] 为 Spark SQL 和结构化流提供了成熟的连接器。此外,它目前提供与其他几个系统的只读集成:Apache Hive、Presto、AWS Athena、AWS Redshift 和 Snowflake,使这些系统的用户能够使用熟悉的工具查询 Delta 表并将它们与这些系统中的数据连接起来。最后,包括 Fivetran、Informatica、Qlik 和 Talend 在内的 ETL 和变更数据捕获 (CDC) 工具可以写入 Delta Lake [33, 26]。

一些查询引擎集成使用一种特殊机制,最初用于 Hive 中的符号链接,称为符号链接清单文件。符号链接清单文件是对象存储或文件系统中的文本文件,其中包含应该在目录中可见的路径列表。各种兼容 Hive 的系统可以在读取目录时查找此类清单文件,通常命名为 _symlink_format_manifest,然后将清单文件中指定的路径视为目录的内容。在 Delta Lake 的上下文中,清单文件允许我们通过简单地创建一个列出这些对象的清单文件,将构成表格的 Parquet 数据对象的静态快照公开给支持这种输入格式的读者。该文件可以为每个目录自动写入,这意味着从非分区 Delta 表读取的系统会看到该表的完全一致的只读快照,而从分区表读取的系统会看到每个分区的一致快照目录。要为表生成清单文件,用户需要运行一个简单的 SQL 命令。然后,他们可以将数据作为外部表加载到 Presto、Athena、Redshift 或 Snowflake 中。
在其他情况下,例如 Apache Hive,开源社区使用可用的插件 API 设计了 Delta Lake 连接器。

5. Delta Lake Use Cases

Delta Lake 目前在数以千计的 Databricks 客户以及开源社区中的其他组织中积极使用,每天处理 EB 级数据 [26]。 这些用例跨越各种数据源和应用程序。 Delta Lake 中存储的数据类型包括来自企业 OLTP 系统的变更数据捕获 (CDC) 日志、应用程序日志、时间序列数据、图表、用于报告的聚合表以及用于机器学习 (ML) 的图像或特征数据。 在这些数据上运行的应用程序包括 SQL 工作负载(最常见的应用程序类型)、商业智能、流媒体、数据科学、机器学习和图形分析。 Delta Lake 非常适合大多数使用 Parquet 或 ORC 等结构化存储格式的数据湖应用程序,以及许多传统的数据仓库工作负载。

在这些用例中,我们发现客户经常使用 Delta Lake 来简化他们的企业数据架构,方法是直接针对云对象存储运行更多工作负载并创建一个具有数据湖和 transaction 功能的“ lake house ”系统。例如,考虑一个典型的 data pipeline ,它从多个来源加载记录——例如,来自 OLTP 数据库的 CDC 日志和来自设施的传感器数据——然后将其传递给 ETL 步骤,使派生表可用于数据仓库和数据科学工作负载(如图 1)。传统的实现需要结合消息队列(如 Apache Kafka [11]),用于需要实时计算的任何结果,数据湖用于长期存储,以及数据仓库(如 Redshift [3]),用于需要实时计算的用户需要通过利用索引和快速节点附加存储设备(例如,SSD)来进行快速分析查询。这需要数据的多个副本并不断地将作业摄取到每个系统中。借助 Delta Lake,其中一些存储系统可以根据工作负载替换为对象存储表,利用 ACID 事务、流 I/O 和 SSD 缓存等功能重新获得每个专用系统中的一些性能优化。虽然 Delta Lake 显然不能取代我们列出的系统中的所有功能,但我们发现在很多情况下它至少可以取代其中的一部分。 Delta 的连接器(第 4.8 节)还允许从许多现有引擎查询它。
在本节的其余部分,我们详细介绍了几个常见的用例。

5.1 Data Engineering and ETL

许多组织正在将 ETL/ELT 和数据仓库工作负载迁移到云中以简化其管理,而其他组织正在使用来自其他来源的更大数据流来增强传统企业数据源(例如 OLTP 系统中的销售点事件) (例如,网络访问或库存跟踪系统)用于下游数据和机器学习应用程序。这些应用程序都需要一个可靠且易于维护的数据工程/ETL 过程来为它们提供数据。当组织将他们的工作负载部署到云时,我们发现他们中的许多人更喜欢使用云对象存储作为着陆区(数据湖)以最大限度地降低存储成本,然后计算派生数据集并将其加载到更优化的数据仓库中系统(也许带有节点附加存储)。 Delta Lake 的 ACID 事务、UPSERT/MERGE 支持和time travel 功能允许这些组织重用现有的 SQL 查询来直接在对象存储上执行他们的 ETL 过程,并利用熟悉的维护功能,如回滚、time travel 和审计日志。此外,使用单个存储系统 (Delta Lake) 而不是单独的数据湖和仓库,通过消除对单独摄取过程的需要,减少了使新数据可查询的延迟。最后,Delta Lake 对 SQL 和编程 API(通过 Apache Spark)的支持使得使用各种工具编写数据工程管道变得容易。
这种数据工程用例在我们遇到的几乎所有数据和机器学习工作负载中都很常见,涵盖金融服务、医疗保健和媒体等行业。在许多情况下,一旦他们的基本 ETL 管道完成,组织也会将他们的部分数据公开给新的工作负载,这些工作负载可以简单地运行在单独的集群上,使用 Delta Lake 访问相同的对象存储(例如,使用 PySpark 的数据科学工作负载)。其他组织使用 Spark 的结构化流(流 SQL)[14] 等工具将管道的一部分转换为流查询。这些其他工作负载可以轻松地在新的云虚拟机上运行并访问相同的表。

5.2 Data Warehousing and BI

传统数据仓库系统将 ETL/ELT 功能与高效工具相结合,以查询生成的表,以启用交互式查询工作负载,例如商业智能 (BI)。支持这些工作负载的关键技术特性通常是高效的存储格式(例如列格式)、数据访问优化(如集群和索引)、快速存储硬件以及适当优化的查询引擎 [43]。 Delta Lake 可以直接为云对象存储中的表支持所有这些功能,通过组合列格式、数据布局优化、最大-最小统计和 SSD 缓存,所有这些功能都可以通过其事务设计可靠地实现。因此,我们发现大多数 Delta Lake 用户还直接通过 SQL 或通过 Tableau 等 BI 软件针对他们的 Lakehouse 数据集运行临时查询和 BI 工作负载。这个用例非常普遍,以至于 Databricks 为 BI 工作负载开发了一个新的矢量化执行引擎 [21],并对其 Spark 运行时进行了优化。与 ETL 工作负载的情况一样,直接在 Delta Lake 上运行 BI 的优势之一是更容易为分析师提供新数据进行工作,因为数据不需要加载到单独的系统中。

5.3 Compliance and Reproducibility

传统的数据湖存储格式主要是为不可变数据而设计的,但是欧盟的 GDPR [27] 等新的数据隐私法规以及行业最佳实践要求组织有一种有效的方法来删除或更正有关个人用户的数据。 我们已经看到多个行业的组织将现有的云数据集转换为 Delta Lake,以使用其高效的 UPSERT、MERGE 和 DELETE 功能。 用户还利用审计日志功能(第 4.6 节)进行数据治理。
Delta Lake 的time travel 支持对于可重复的数据科学和机器学习也很有用。 我们将 Delta Lake 与 MLflow [51](Databricks 开发的开源模型管理平台)集成,以自动记录使用哪个版本的数据集来训练 ML 模型并让开发人员重新加载它。

5.4 Specialized Use Cases

5.4.1 Computer System Event Data

我们见过的最大的单一用例之一是将 Delta Lake 部署为一家大型技术公司的安全信息和事件管理 (SIEM) 平台。该组织将整个公司范围内的各种计算机系统事件(例如网络上的 TCP 和 UDP 流、身份验证请求、SSH 登录等)记录到一组集中的 Delta Lake 表中,这些表可以跨越 PB 级。然后针对这些表运行多个编程 ETL、SQL、图形分析和机器学习作业,以搜索指示入侵的已知模式(例如,来自用户的可疑登录事件,或导出大量数据的一组服务器)。其中许多是流式作业,以最大限度地减少检测问题的时间。此外,超过 100 名分析师直接查询源和派生的 Delta Lake 表,以调查可疑警报或设计新的自动化监控作业。
这个信息安全用例很有趣,因为它很容易自动收集大量数据(在此部署中每天数百 TB),因为数据必须保留很长时间,以便对新发现的入侵进行取证分析(有时几个月后),并且因为数据需要沿多个维度进行查询。例如,如果分析人员发现某个特定服务器曾经被入侵,她可能希望通过源 IP 地址(以查看攻击者从那里访问的其他服务器)、目标 IP 地址(以查看如何攻击者登录到原始服务器)、时间和任意数量的其他维度(例如,该攻击者获得的员工访问令牌)。为这些多 PB 数据集维护重量级索引结构将非常昂贵,因此该组织使用 Delta Lake 的 ZORDER BY 功能来重新排列 Parquet 对象内的记录,以提供跨多个维度的聚类。因为沿着这些维度的取证查询是高度选择性的(例如,从数百万中寻找一个 IP 地址),Z-ordering 与 Delta Lake 最小/最大基于统计的跳过很好地结合在一起,以显着减少每个查询具有的对象数量阅读。尽管数百名开发人员在 data pipeline 上进行协作,但 Delta Lake 的 AUTO OPTIMIZE 功能、time travel 和 ACID 事务也在保持这些数据集的正确性和快速访问方面发挥了重要作用。

5.4.2 Bioinformatics

生物信息学是我们看到 Delta Lake 广泛用于管理机器生成数据的另一个领域。许多数据源,包括 DNA 测序、RNA 测序、电子病历和医疗设备的时间序列,使生物医学公司能够收集有关患者和疾病的详细信息。这些数据源通常与公共数据集相结合,例如英国生物银行 [44],其中包含 500,000 个人的测序信息和医疗记录。
尽管传统的生物信息学工具使用了自定义数据格式,例如 SAM、BAM 和 VCF [34, 24],但许多组织现在正在将这些数据存储在数据湖格式中,例如 Parquet。大数据基因组学项目 [37] 开创了这种方法。 Delta Lake 通过支持快速多维查询(通过 Z 排序)、ACID 事务以及高效的 UPSERT 和 MERGE,进一步增强了生物信息学工作负载。在某些情况下,这些功能比以前的 Parquet 实现提高了 100 倍以上。 2019 年,Databricks 和 Regeneron 发布了 Glow [28],这是一个使用 Delta 进行存储的基因组数据开源工具包。

5.4.3 Media Datasets for Machine Learning

我们看到的一个更令人惊讶的应用程序是使用 Delta Lake 来管理多媒体数据集,例如上传到需要用于机器学习的网站的一组图像。 尽管图像和其他媒体文件已经以高效的二进制格式编码,但将这些数据集作为云对象存储中数百万个对象的集合进行管理是具有挑战性的,因为每个对象的大小只有几千字节。 对象存储 LIST 操作可能需要几分钟才能运行,而且很难并行读取足够多的对象来提供在 GPU 上运行的机器学习推理作业。 我们已经看到多个组织将这些媒体文件作为 BINARY 记录存储在 Delta 表中,并利用 Delta 进行更快的推理查询、流处理和 ACID 事务。 例如,领先的电子商务和旅游公司正在使用这种方法来管理数百万用户上传的图像。

6. PERFORMANCE EXPERIMENTS

7. DISCUSSION AND LIMITATIONS

我们在 Delta Lake 的经验表明,ACID 事务可以通过云对象存储实现,用于许多企业数据处理工作负载,并且它们可以支持大规模流、批处理和交互式工作负载。 Delta Lake 的设计特别有吸引力,因为它不需要任何其他重量级系统来调解对云存储的访问,这使得部署和从支持 Parquet 的各种查询引擎直接访问变得微不足道。 Delta Lake 对 ACID 的支持可以实现其他强大的性能和管理功能。
尽管如此,Delta Lake 的设计和当前的实现有一些限制,这些限制是未来工作的有趣途径。首先,Delta Lake 目前只提供单表内可序列化的事务,因为每张表都有自己的事务日志。跨多个表共享事务日志将消除此限制,但可能会增加通过乐观并发追加日志记录的争用。对于非常大的事务量,协调器还可以调解对日志的写入访问,而无需成为数据对象的读写路径的一部分。其次,对于流式工作负载,Delta Lake 受到底层云对象存储延迟的限制。例如,使用对象存储操作很难实现毫秒级的流延迟。但是,我们发现,对于用户希望运行并行作业的大型企业工作负载,使用 Delta Lake 表的几秒级延迟是可以接受的。第三,Delta Lake 目前不支持二级索引(每个数据对象的 min-max 统计除外),但我们已经开始对基于 Bloom 过滤器的索引进行原型设计。 Delta 的 ACID 事务允许我们通过更改基本数据以事务方式更新此类索引。

8. Related Work

多个研究和行业项目试图使数据管理系统适应云环境。例如,布兰特纳等人。探索在 S3 上构建 OLTP 数据库系统 [20];附加一致性 [19] 在最终一致的键值存储之上实现因果一致性; AWS Aurora [49] 是一个商业 OLTP DBMS,具有单独扩展的计算和存储层;和 Google BigQuery [29]、AWS Redshift Spectrum [39] 和 Snowflake [23] 是 OLAP DBMS,它们可以独立于存储扩展计算集群,并且可以从云对象存储中读取数据。其他工作,例如关系云项目 [22],考虑如何自动调整 DBMS 引擎以适应弹性、多租户工作负载。
Delta Lake 分享了这些作品的愿景,即利用广泛可用的云基础设施,但针对一组不同的要求。具体来说,大多数以前的 DBMS-on-cloud-storage 系统都需要 DBMS 来调解客户端和存储之间的交互(例如,通过让客户端连接到 Aurora 或 Redshift 前端服务器)。这会产生额外的操作负担(前端节点必须始终运行),以及在通过前端节点流式传输大量数据时可能出现的可扩展性、可用性或成本问题。相比之下,我们设计了 Delta Lake 以便许多独立运行的客户端可以通过云对象存储操作直接协调对表的访问,在大多数情况下不需要单独运行的服务(除了 S3 上日志记录 ID 的轻量级协调器,如上所述在第 3.2.2 节)。这种设计使 Delta Lake 对用户来说操作简单,并确保以与底层对象存储相同的成本进行高度可扩展的读取和写入。此外,该系统与底层云对象存储一样具有高可用性:无需加固或重新启动其他组件即可进行灾难恢复。当然,由于 Delta Lake 目标工作负载的性质,这种设计在这里是可行的:每秒写入事务相对较少但事务大小较大的 OLAP 工作负载,这与我们的乐观并发方法配合得很好。
与 Delta Lake 的设计和目标最接近的系统是 Apache Hudi [8] 和 Apache Iceberg [10],它们都定义了数据格式和访问 protocol ,以在云对象存储上实现事务性操作。这些系统是与 Delta Lake 同时开发的,并不提供其所有功能。例如,这两个系统都没有提供数据布局优化,例如 Delta Lake 的 ZORDER BY(第 4.4 节),应用程序可以使用流输入源有效扫描添加到表中的新记录(第 4.3 节),或支持本地缓存,如Databricks 服务(第 4.5 节)。此外,Apache Hudi 一次仅支持一个写入器(但支持多个读取器)[9]。这两个项目都提供连接到开源引擎(包括 Spark 和 Presto)的连接器,但缺乏连接到商业数据仓库(如 Redshift 和 Snowflake,我们使用清单文件(第 4.8 节)实现)和商业 ETL 工具的连接器。
Apache Hive ACID [32] 也通过对象存储或分布式文件系统实现事务,但它依赖于 Hive 元存储(在 OLTP DBMS 中运行)来跟踪每个表的状态。这会在具有数百万个分区的表中造成瓶颈,并增加用户的操作负担。 Hive ACID 也缺乏对time travel 的支持(第 4.1 节)。 HDFS 上的低延迟存储,例如 HBase [7] 和 Kudu [6],也可以在写入 HDFS 之前组合少量写入,但需要运行单独的分布式系统。
将高性能事务处理和分析处理结合起来还有很长的工作要做,例如 C-Store [43] 和 HTAP 系统。这些系统通常有一个单独的、针对 OLTP 优化的可写存储和一个针对分析优化的长期存储。在我们的工作中,我们试图通过设计直接针对对象存储的并发 protocol 来支持适度的事务率,而无需运行单独的高可用写入存储。

9. Conclusion

我们已经介绍了 Delta Lake,这是一个基于云对象存储的 ACID 表存储层,它为低成本云存储中的数据提供了广泛的类似于 DBMS 的性能和管理功能。 Delta Lake 仅作为一种存储格式和一组客户端访问 protocol 实现,使其操作简单且高度可用,并为客户端提供对对象存储的直接、高带宽访问。 Delta Lake 被数以千计的组织用于每天处理 EB 级的数据,通常会取代涉及多个数据管理系统的更复杂的架构。 它在 https://delta.io 上的 Apache 2 许可下是开源的。

references


本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!