表格型数据集#

pyarrow.dataset 模块提供了高效处理表格型数据集的功能,这些数据集可能比内存大,并包含多个文件。这包括:

  • 支持不同来源和文件格式以及不同文件系统(本地、云端)的统一接口。

  • 源的发现(爬取目录,处理基于目录的分区数据集,基本模式规范化等)

  • 优化读取,具有谓词下推(过滤行)、投影(选择和派生列),以及可选的并行读取功能。

目前支持的文件格式包括 Parquet、Feather/Arrow IPC、CSV 和 ORC(请注意,目前只能读取 ORC 数据集,尚不支持写入)。未来的目标是扩展对其他文件格式和数据源(例如数据库连接)的支持。

对于熟悉现有的 ParquetDataset 用于读取 Parquet 数据集的人来说:pyarrow.dataset 的目标相似,但不特定于 Parquet 格式,也不局限于 Python:相同的数据集 API 也在 R 绑定或 Arrow 中暴露。此外,pyarrow.dataset 还具有改进的性能和新功能(例如,在文件内进行过滤,而不仅仅是基于分区键)。

读取数据集#

对于下面的例子,让我们创建一个包含两个 Parquet 文件的目录的小数据集:

import tempfile
import pathlib
import pyarrow as pa
import pyarrow.parquet as pq
import numpy as np

base = pathlib.Path(tempfile.mkdtemp(prefix="pyarrow-"))
(base / "parquet_dataset").mkdir(exist_ok=True)
# creating an Arrow Table
table = pa.table({'a': range(10), 'b': np.random.randn(10), 'c': [1, 2] * 5})
# writing it into two parquet files
pq.write_table(table.slice(0, 5), base / "parquet_dataset/data1.parquet")
pq.write_table(table.slice(5, 10), base / "parquet_dataset/data2.parquet")

数据集发现#

可以使用 dataset() 函数创建 Dataset 对象。我们可以将包含数据文件的目录路径传递给它:

import pyarrow.dataset as ds

dataset = ds.dataset(base / "parquet_dataset", format="parquet")

dataset
<pyarrow._dataset.FileSystemDataset at 0x7f0e29c6e4a0>

除了搜索基本目录,dataset() 还接受单个文件的路径或文件路径列表。

创建 Dataset 对象并不开始读取数据本身。如果需要,它只会爬取目录以找到所有文件:

dataset.files
['/tmp/pyarrow-c4is7cm7/parquet_dataset/data1.parquet',
 '/tmp/pyarrow-c4is7cm7/parquet_dataset/data2.parquet']

…并推断数据集的模式(默认从第一个文件中):

print(dataset.schema.to_string(show_field_metadata=False))
a: int64
b: double
c: int64

使用 to_table() 方法,我们可以将数据集(或其一部分)读取到 pyarrow Table 中(请注意,根据您的数据集大小,这可能需要大量内存,请参阅下面的过滤/迭代加载):

dataset.to_table()
pyarrow.Table
a: int64
b: double
c: int64
----
a: [[0,1,2,3,4],[5,6,7,8,9]]
b: [[-0.36660657590355145,-1.2677056276752197,0.834977532736649,1.089111028681581,-0.2463050392006342],[-0.8100217218706993,0.4037321895338094,0.5875142419792823,-0.8535172116279354,-0.0036295735816949596]]
c: [[1,2,1,2,1],[2,1,2,1,2]]
# converting to pandas to see the contents of the scanned table
dataset.to_table().to_pandas()
a b c
0 0 -0.366607 1
1 1 -1.267706 2
2 2 0.834978 1
3 3 1.089111 2
4 4 -0.246305 1
5 5 -0.810022 2
6 6 0.403732 1
7 7 0.587514 2
8 8 -0.853517 1
9 9 -0.003630 2

读取不同的文件格式#

上面的示例使用 Parquet 文件作为数据集源,但 Dataset API 提供了跨多种文件格式和文件系统的一致接口。目前支持 Parquet、ORC、Feather / Arrow IPC 和 CSV 文件格式;未来计划支持更多格式。

如果我们将表格保存为 Feather 文件而不是 Parquet 文件:

import pyarrow.feather as feather

feather.write_feather(table, base / "data.feather")

…然后我们可以使用相同的函数读取 Feather 文件,但需要指定格式为 "feather"

dataset = ds.dataset(base / "data.feather", format="feather")
dataset.to_table().to_pandas().head()
a b c
0 0 -0.366607 1
1 1 -1.267706 2
2 2 0.834978 1
3 3 1.089111 2
4 4 -0.246305 1

自定义文件格式#

格式名称为字符串,例如:

ds.dataset(..., format="parquet")

是默认构造的 ParquetFileFormat 的简写:

ds.dataset(..., format=ds.ParquetFileFormat())

FileFormat 对象可以使用关键字进行自定义。例如:

parquet_format = ds.ParquetFileFormat(read_options={'dictionary_columns': ['a']})
ds.dataset(..., format=parquet_format)

将在扫描时配置列 'a' 为字典编码。

过滤数据#

当只需要子集时,为避免读取所有数据,可以使用 columnsfilter 关键字。

columns 关键字可用于仅读取指定的列:

dataset = ds.dataset(base / "parquet_dataset", format="parquet")

dataset.to_table(columns=['a', 'b']).to_pandas()
a b
0 0 -0.366607
1 1 -1.267706
2 2 0.834978
3 3 1.089111
4 4 -0.246305
5 5 -0.810022
6 6 0.403732
7 7 0.587514
8 8 -0.853517
9 9 -0.003630

使用 filter 关键字,不匹配过滤谓词的行将不会包含在返回的表格中。该关键字期望一个引用至少一列的布尔 Expression

dataset.to_table(filter=ds.field('a') >= 7).to_pandas()
a b c
0 7 0.587514 2
1 8 -0.853517 1
2 9 -0.003630 2
dataset.to_table(filter=ds.field('c') == 2).to_pandas()
a b c
0 1 -1.267706 2
1 3 1.089111 2
2 5 -0.810022 2
3 7 0.587514 2
4 9 -0.003630 2

构建这些 Expression 对象最简单的方法是使用 field() 辅助函数。任何列 - 不仅仅是分区列 - 都可以使用 field() 函数(它创建 FieldExpression)来引用。提供了运算符重载来组合过滤器,包括比较(等于、大于/小于等)、集合成员测试和布尔组合(&|~):

ds.field('a') != 3
<pyarrow.compute.Expression (a != 3)>
ds.field('a').isin([1, 2, 3])
<pyarrow.compute.Expression is_in(a, {value_set=int64:[
  1,
  2,
  3
], null_matching_behavior=MATCH})>
(ds.field('a') > ds.field('b')) & (ds.field('b') > 1)
<pyarrow.compute.Expression ((a > b) and (b > 1))>

请注意, Expression 对象不能通过 Python 逻辑运算符 andornot 进行组合。

投影列#

通过传递列名列表,可以使用 columns 关键字来读取数据集的列子集。该关键字还可以与表达式结合使用,进行更复杂的 投影 (projection)。

在这种情况下,我们传递给它一个字典,键是结果列的名称,值是用于构造列值的表达式:

projection = {
    "a_renamed": ds.field("a"),
    "b_as_float32": ds.field("b").cast("float32"),
    "c_1": ds.field("c") == 1,
}
dataset.to_table(columns=projection).to_pandas().head()
a_renamed b_as_float32 c_1
0 0 -0.366607 True
1 1 -1.267706 False
2 2 0.834978 True
3 3 1.089111 False
4 4 -0.246305 True

字典还决定了列选择(只有字典中的键会作为结果表格中的列出现)。如果您想在现有列之外包含一个派生列,您可以根据数据集模式构建字典:

projection = {col: ds.field(col) for col in dataset.schema.names}

projection.update({"b_large": ds.field("b") > 1})

dataset.to_table(columns=projection).to_pandas().head()
a b c b_large
0 0 -0.366607 1 False
1 1 -1.267706 2 False
2 2 0.834978 1 False
3 3 1.089111 2 True
4 4 -0.246305 1 False

读取分区数据#

上面展示了一个由平面目录和文件组成的数据集。然而,数据集可以利用嵌套的目录结构来定义一个分区数据集,其中子目录名称包含了存储在该目录中的数据子集的信息。

例如,按年和月分区的数据集在磁盘上可能看起来像这样:

dataset_name/
  year=2007/
    month=01/
       data0.parquet
       data1.parquet
       ...
    month=02/
       data0.parquet
       data1.parquet
       ...
    month=03/
    ...
  year=2008/
    month=01/
    ...
  ...

上面的分区方案使用的是 “/key=value/” 目录名称,如 Apache Hive 中所见到的。

让我们创建一个小型的分区数据集。write_to_dataset() 函数可以写入这样的类似 Hive 的分区数据集。

table = pa.table({'a': range(10), 'b': np.random.randn(10), 'c': [1, 2] * 5,
                  'part': ['a'] * 5 + ['b'] * 5})


pq.write_to_dataset(table, "parquet_dataset_partitioned",
                    partition_cols=['part'])

上面创建了一个包含两个子目录(“part=a”和“part=b”)的目录,这些目录中写入的 Parquet 文件不再包含“part”列。

使用 dataset() 读取这个数据集时,我们现在指定数据集应该使用类似 Hive 的分区方案,使用 partitioning 关键字:

dataset = ds.dataset("parquet_dataset_partitioned", format="parquet",
                     partitioning="hive")


dataset.files
['parquet_dataset_partitioned/part=a/dc7b7061d7ee42a083f62dbf107519c8-0.parquet',
 'parquet_dataset_partitioned/part=b/dc7b7061d7ee42a083f62dbf107519c8-0.parquet']

尽管分区字段不包含在实际的 Parquet 文件中,但在扫描此数据集时,它们将被添加回结果表格:

dataset.to_table().to_pandas().head(3)
a b c part
0 0 -0.241296 1 a
1 1 0.690334 2 a
2 2 -0.959934 1 a

我们现在可以在分区键上进行过滤,如果不匹配过滤器,它们将完全避免加载文件:

dataset.to_table(filter=ds.field("part") == "b").to_pandas()
a b c part
0 5 0.705458 2 b
1 6 -1.128177 1 b
2 7 -0.308782 2 b
3 8 -0.370452 1 b
4 9 0.606135 2 b

不同的分区方案#

上面的例子使用了类似 Hive 的目录方案,如“/year=2009/month=11/day=15”。我们通过传递 partitioning="hive" 关键字来指定这一点。在这种情况下,分区键的类型是从文件路径中推断出来的。

也可以使用 partitioning() 函数显式定义分区键的模式。例如:

part = ds.partitioning(
    pa.schema([("year", pa.int16()), ("month", pa.int8()), ("day", pa.int32())]),
    flavor="hive"
)
dataset = ds.dataset(..., partitioning=part)

“目录分区”也受支持,其中文件路径中的段表示分区键的值,不包括名称(字段名在段的索引中是隐式的)。例如,给定字段名“year”、“month”和“day”,一个路径可能是“/2019/11/15”。

由于名称不包含在文件路径中,因此在构造目录分区时必须指定这些名称:

part = ds.partitioning(field_names=["year", "month", "day"])

目录分区也支持提供完整的模式,而不是从文件路径推断类型。

手动指定数据集#

dataset() 函数允许轻松创建数据集,查看目录,并爬取所有子目录中的文件和分区信息。然而,有时不需要发现功能,数据集的文件和分区已经已知(例如,当这些信息存储在元数据中时)。在这种情况下,可以明确创建一个数据集,无需任何自动发现或推断。

对于这里的示例,我们将使用一个数据集,其中文件名包含额外的分区信息:

table = pa.table({'col1': range(3), 'col2': np.random.randn(3)})

(base / "parquet_dataset_manual").mkdir(exist_ok=True)

pq.write_table(table, base / "parquet_dataset_manual" / "data_2018.parquet")

pq.write_table(table, base / "parquet_dataset_manual" / "data_2019.parquet")

要从一个文件列表创建数据集,我们需要手动指定路径、模式、格式、文件系统和分区表达式:

from pyarrow import fs

schema = pa.schema([("year", pa.int64()), ("col1", pa.int64()), ("col2", pa.float64())])

dataset = ds.FileSystemDataset.from_paths(
    ["data_2018.parquet", "data_2019.parquet"], schema=schema, format=ds.ParquetFileFormat(),
    filesystem=fs.SubTreeFileSystem(str(base / "parquet_dataset_manual"), fs.LocalFileSystem()),
    partitions=[ds.field('year') == 2018, ds.field('year') == 2019])

由于我们为文件指定了“分区表达式”,这些信息在读取数据时会具体化为列,并可用于过滤:

dataset.to_table().to_pandas()
year col1 col2
0 2018 0 0.563188
1 2018 1 0.985952
2 2018 2 -0.610352
3 2019 0 0.563188
4 2019 1 0.985952
5 2019 2 -0.610352
dataset.to_table(filter=ds.field('year') == 2019).to_pandas()
year col1 col2
0 2019 0 0.563188
1 2019 1 0.985952
2 2019 2 -0.610352

手动列出文件的另一个好处是,文件的顺序控制了数据的顺序。当执行有序读取(或读取到表格)时,返回的行将与给定的文件顺序匹配。这只适用于使用文件列表构造数据集的情况。如果通过扫描目录发现文件,则不保证有特定的顺序。

迭代(内核外或流式)读取#

前面的例子已经展示了如何使用 to_table() 将数据读入表格。如果数据集较小或只需要读取少量数据,这是有用的。数据集 API 包含了额外的方法,可以以流式方式读取和处理大量数据。

最简单的方法是使用 to_batches() 方法。这个方法返回一个记录批次的迭代器。例如,我们可以使用这个方法来计算一列的平均值,而无需将整个列加载到内存中:

import pyarrow.compute as pc

col2_sum = 0

count = 0

for batch in dataset.to_batches(columns=["col2"], filter=~ds.field("col2").is_null()):
    col2_sum += pc.sum(batch.column("col2")).as_py()
    count += batch.num_rows


mean_a = col2_sum/count

自定义批处理大小#

数据集的迭代读取通常称为数据集的“扫描”,pyarrow 使用一个名为 Scanner 的对象来完成这个操作。通过数据集的 to_table()to_batches() 方法,会自动为您创建 Scanner。您传递给这些方法的任何参数都将传递给 Scanner 构造函数。

其中一个参数是 batch_size。这控制了扫描器返回的批次的最大大小。如果数据集由小文件组成,或者这些文件本身由小行组组成,批次的大小可能仍然小于 batch_size。例如,每个行组有 \(10\,000\) 行的 parquet 文件将产生最多 \(10\,000\) 行的批次,除非将 batch_size 设置为较小的值。

默认的批处理大小是一百万行,这通常是一个不错的默认值,但如果您正在读取大量列,您可能想要自定义它。

关于事务和 ACID 保证的说明#

数据集 API 不提供任何事务(transactions)支持或 ACID 保证。这影响读写操作。并发读取是可以的。并发写入或与读取同时进行的写入可能会出现意外行为。可以采用各种方法避免对同一文件进行操作,例如为每个写入器使用唯一的基础名称模板、为新文件使用临时目录,或者存储文件列表而不是依赖于目录发现。

在写入过程中意外终止进程可能会使系统处于不一致的状态。写入调用通常在要写入的字节完全传递给操作系统页面缓存后立即返回。尽管写入操作已经完成,如果在写入调用后立即发生突然断电,文件的部分内容可能会丢失。

大多数文件格式都有魔术数字,这些数字会在最后被写入。这意味着可以安全地检测并丢弃部分文件写入。CSV 文件格式没有这样的概念,部分写入的 CSV 文件可能会被检测为有效。

写入数据集#

数据集 API 还简化了使用 write_dataset() 将数据写入数据集的过程。当你想要对数据进行分区或需要写入大量数据时,这可能会很有用。基本的数据集写入类似于写入表格,不同之处在于你需要指定一个目录而不是文件名。

table = pa.table({"a": range(10), "b": np.random.randn(10), "c": [1, 2] * 5})

ds.write_dataset(table, "sample_dataset", format="parquet")

上面的例子将在我们 sample_dataset 目录中创建一个名为 part-0.parquet 的单个文件。

警告

如果你再次运行这个例子,它将替换现有的 part-0.parquet 文件。要将文件追加到现有数据集中,需要为每次调用 ds.write_dataset 指定一个新的 basename_template,以避免覆盖。

写入分区数据#

可以使用分区对象来指定输出数据应该如何分区。这使用了我们读取数据集时使用的同类型的分区对象。要将上述数据写入分区目录,我们只需指定我们希望数据集如何分区。例如:

part = ds.partitioning(
    pa.schema([("c", pa.int16())]), flavor="hive"
)

ds.write_dataset(table, "partitioned_dataset", format="parquet", partitioning=part)

这将创建两个文件。我们的一半数据将在 dataset_root/c=1 目录中,另一半将在 dataset_root/c=2 目录中。

分区性能考虑#

分区数据集有两个方面影响性能:它增加了文件数量,并且围绕文件创建了目录结构。这两者都有利有弊。根据你的配置和数据集的大小,成本可能会超过收益。

因为分区将数据集分割成多个文件,分区的数据集可以并行读写。然而,每个额外的文件都会在文件系统交互中增加一点处理开销。它还会增加整体数据集大小,因为每个文件都包含一些共享的元数据。例如,每个 parquet 文件包含模式和组级统计信息。分区的数量是文件数量的下限。如果你按日期对一个年度的数据进行分区,你至少会有 \(365\) 个文件。如果你进一步按另一个维度(有 \(1000\) 个唯一值)进行分区,你最多会有 \(365000\) 个文件。这种细粒度的分区通常会导致主要由元数据组成的小文件。

分区数据集创建嵌套文件夹结构,这些允许我们修剪加载扫描的文件。然而,这增加了在数据集中发现文件的开销,因为我们需要递归地“列出目录”来找到数据文件。过于细致的分区在这里可能会引起问题:按日期对一年的数据进行分区将需要 \(365\) 次列表调用来找到所有文件;添加另一个基数为 \(1000\) 的列将使这一数字变为 \(365365\) 次调用。

最优的分区布局将取决于你的数据、访问模式以及哪些系统将读取数据。大多数系统,包括 Arrow,应该能适应一系列文件大小和分区布局,但有些极端情况你应该避免。这些指南可以帮助避免一些已知的最坏情况:

  • 避免使用小于 20MB 和大于 2GB 的文件。

  • 避免使用具有超过 \(10\,000\) 个不同分区的分区布局。

配置写入期间打开的文件#

当将数据写入磁盘时,有几个参数对于优化写入可能很重要,例如每个文件的行数以及在写入期间允许的最大打开文件数。

使用 write_dataset()max_open_files 参数设置最大打开文件数。

如果将 max_open_files 设置为大于 0 的值,则会限制可以保持打开状态的最大文件数。这只适用于写入分区数据集,其中根据分区值将行分派到适当的文件。如果尝试打开太多文件,则会关闭最近最少使用的文件。如果此设置过低,您可能会将数据分散到许多小文件中。

如果你的进程同时使用其他文件处理器,无论是使用数据集扫描器还是其他方式,你可能会遇到系统文件处理器限制。例如,如果你正在扫描包含 \(300\) 个文件的数据集,并且写入 \(900\) 个文件,总共 \(1200\) 个文件可能超过了系统限制。(在 Linux 上,这可能是一个“打开的文件太多”的错误。)你可以减少 max_open_files 设置或者增加系统上的文件处理器限制。默认值是 \(900\),这允许扫描器在达到 Linux 默认限制 \(1024\) 之前打开一些数量的文件。

write_dataset() 中另一个重要的配置是 max_rows_per_file

使用 write_dataset()max_rows_per_files 参数设置每个文件中写入的最大行数。

如果将 max_rows_per_file 设置为大于 0 的值,则这将限制任何单个文件中放置的行数。否则,将没有限制,除非需要关闭文件以遵守 max_open_files,否则将在每个输出目录中创建一个文件。此设置是控制文件大小的主要方式。对于写入大量数据的工作量,如果没有行数上限,文件可能会变得非常大,导致下游阅读器出现内存不足错误。行数和文件大小之间的关系取决于数据集模式以及数据压缩(如果有)的程度。

配置写入期间每个组的行数#

可以配置每组写入磁盘的数据量。这个配置包括一个下限和一个上限。使用 write_dataset()min_rows_per_group 参数定义形成行组所需的最小行数。

备注

如果将 min_rows_per_group 设置为大于 0 的值,则这将导致数据集写入器批处理传入的数据,并且仅在累积了足够的行时才将行组写入磁盘。如果其他选项(如 max_open_filesmax_rows_per_file)强制使用较小的行组大小,则最终的行组大小可能小于这个值。

每个组允许的最大行数由 write_dataset()max_rows_per_group 参数定义。

如果将 max_rows_per_group 设置为大于 0 的值,则数据集写入器可能会将大型传入批次分割成多个行组。如果设置了这个值,那么也应该设置 min_rows_per_group,否则你可能会得到非常小的行组(例如,如果传入的行组大小仅略大于这个值)。

行组是 Parquet 和 IPC/Feather 格式的内置概念,但不影响 JSON 或 CSV。在 Arrow 中回读 Parquet 和 IPC 格式时,行组边界成为记录批次边界,决定了下游阅读器的默认批次大小。此外,Parquet 文件中的行组具有列统计信息,可以帮助阅读器跳过不相关的数据,但可能会增加文件的大小。举一个极端的例子,如果在 Parquet 中设置 max_rows_per_group=1,他们将得到大文件,因为大多数文件将是行组统计信息。

写入大量数据#

上述示例从表中写入数据。如果你正在写入大量数据,可能无法将所有数据加载到单个内存表中。幸运的是, write_dataset() 方法也接受记录批次的可迭代对象。这使得重新分区大型数据集变得非常简单,而无需将整个数据集加载到内存中:

old_part = ds.partitioning(
    pa.schema([("c", pa.int16())]), flavor="hive"
)

new_part = ds.partitioning(
    pa.schema([("c", pa.int16())]), flavor=None
)

input_dataset = ds.dataset("partitioned_dataset", partitioning=old_part)

扫描器可以充当记录批次的迭代器,但您也可以从网络(例如通过 flight)接收数据,来自您自己的扫描,或来自任何其他产生记录批次的方法。此外,您可以直接将数据集传入 write_dataset(),但如果您想要自定义扫描器(例如过滤输入数据集或设置最大批次大小),这种方法非常有用。

scanner = input_dataset.scanner()

ds.write_dataset(scanner, "repartitioned_dataset", format="parquet", partitioning=new_part)

在上述示例运行之后,我们的数据将位于 dataset_root/1dataset_root/2 目录中。在这个简单的例子中,我们没有改变数据的结构(只是目录命名模式),但您也可以使用这种机制来更改用于分区数据集的列。当您预期以特定方式查询数据时,这一点非常有用,您可以利用分区来减少需要读取的数据量。

自定义和检查写入的文件#

默认情况下,数据集 API 将创建名为 “part-i.format”的文件,其中“i”是在写入过程中生成的整数,“format”是在 write_dataset() 调用中指定的文件格式。对于简单的数据集,可能可以知道将创建哪些文件,但对于较大或分区的数据集则不容易。file_visitor 关键字可用于提供一个访问者,当每个文件创建时都会调用该访问者:

def file_visitor(written_file):
    print(f"path={written_file.path}")
    print(f"size={written_file.size} bytes")
    print(f"metadata={written_file.metadata}")
ds.write_dataset(table, "dataset_visited", format="parquet", partitioning=part,
                 file_visitor=file_visitor)
path=dataset_visited/c=1/part-0.parquet
size=979 bytes
metadata=<pyarrow._parquet.FileMetaData object at 0x7f0e29c62a70>
  created_by: parquet-cpp-arrow version 15.0.1
  num_columns: 2
  num_rows: 5
  num_row_groups: 1
  format_version: 2.6
  serialized_size: 0
path=dataset_visited/c=2/part-0.parquet
size=981 bytes
metadata=<pyarrow._parquet.FileMetaData object at 0x7f0e29c62a70>
  created_by: parquet-cpp-arrow version 15.0.1
  num_columns: 2
  num_rows: 5
  num_row_groups: 1
  format_version: 2.6
  serialized_size: 0

这将允许您收集属于数据集的文件名并将它们存储在其他地方,这在您下次需要读取数据时希望避免扫描目录时非常有用。它还可以用来生成其他工具(如 Dask 或 Spark)使用的 _metadata 索引文件,以创建数据集的索引。

在写入期间配置特定格式的参数#

除了所有格式共享的通用选项之外,还有特定于某种格式的格式特定选项。例如,在写入 Parquet 文件时允许截断的时间戳:

parquet_format = ds.ParquetFileFormat()

write_options = parquet_format.make_write_options(allow_truncated_timestamps=True)

ds.write_dataset(table, "sample_dataset2", format="parquet", partitioning=part,
                 file_options=write_options)