pyarrow 快速上手#

Arrow 通过数组(pyarrow.Array)管理数据,这些数组可以组合成表格(pyarrow.Table),以表示表格数据中的列。

Arrow 还支持各种格式,以便将表格数据从磁盘和网络中读取和写入。最常用的格式包括 Parquet(读取和写入 Apache Parquet 格式)和 IPC 格式(流式传输、序列化和 IPC)。

创建数组和表格#

Arrow 中的数组是统一类型的数据集合。这使得 Arrow 能够使用最佳性能的实现来存储数据并对其进行计算。因此,每个数组都包含数据和类型。

import pyarrow as pa

days = pa.array([1, 12, 17, 23, 28], type=pa.int8())
days
<pyarrow.lib.Int8Array object at 0x7fc00127a260>
[
  1,
  12,
  17,
  23,
  28
]

多个数组可以结合在表格中,当附加到列名时,形成表格数据中的列。

months = pa.array([1, 3, 5, 7, 1], type=pa.int8())

years = pa.array([1990, 2000, 1995, 2000, 1995], type=pa.int16())

birthdays_table = pa.table([days, months, years],
                           names=["days", "months", "years"])


birthdays_table
pyarrow.Table
days: int8
months: int8
years: int16
----
days: [[1,12,17,23,28]]
months: [[1,3,5,7,1]]
years: [[1990,2000,1995,2000,1995]]

请参阅 数据类型和内存中的数据模型 以获取更多详细信息。

保存和加载表格#

一旦你有了表格数据,Arrow 就提供了一些现成的功能来保存和恢复 Parquet 这样的常见格式的数据。

import pyarrow.parquet as pq

pq.write_table(birthdays_table, 'birthdays.parquet')

一旦你的数据存储在磁盘上,加载回来只需要一个简单的函数调用,Arrow 对内存和速度进行了大量优化,因此加载数据将尽可能快。

reloaded_birthdays = pq.read_table('birthdays.parquet')

reloaded_birthdays
pyarrow.Table
days: int8
months: int8
years: int16
----
days: [[1,12,17,23,28]]
months: [[1,3,5,7,1]]
years: [[1990,2000,1995,2000,1995]]

在 Arrow 中保存和加载数据通常通过 ParquetIPC 格式Feather 文件格式)、CSVLine-Delimited JSON 格式进行。

执行计算#

Arrow 附带了一组计算函数,可以应用于其数组和表格,因此通过这些计算函数,可以对数据应用转换。

import pyarrow.compute as pc

pc.value_counts(birthdays_table["years"])
<pyarrow.lib.StructArray object at 0x7fc0012dba60>
-- is_valid: all not null
-- child 0 type: int16
  [
    1990,
    2000,
    1995
  ]
-- child 1 type: int64
  [
    1,
    2,
    2
  ]

请参阅计算函数以获取可用计算函数的列表以及如何使用它们的说明。

处理大型数据#

Arrow 还提供了 pyarrow.dataset API 来处理大型数据,它将为您处理将数据分割成较小的块。

%rm -rf .temp/
import pyarrow.dataset as ds

ds.write_dataset(
    birthdays_table, ".temp", format="parquet",
    partitioning=ds.partitioning(
    pa.schema([birthdays_table.schema.field("years")]))
)

加载分区数据集时,将检测到块。

birthdays_dataset = ds.dataset(".temp", format="parquet", partitioning=["years"])

birthdays_dataset.files
['.temp/1990/part-0.parquet',
 '.temp/1995/part-0.parquet',
 '.temp/2000/part-0.parquet']

并且只有在迭代它们时才会懒加载数据块

import datetime

current_year = datetime.datetime.now(datetime.UTC).year

for table_chunk in birthdays_dataset.to_batches():
    print("AGES", pc.subtract(current_year, table_chunk["years"]))
AGES [
  34
]
AGES [
  29,
  29
]
AGES [
  24,
  24
]