数据类型和内存中的数据模型#

Apache Arrow 通过将类型元数据与内存缓冲区组合来定义列式数组数据结构,就像在内存和 IO中解释的那样。这些数据结构通过一系列相互关联的类在 Python 中被暴露出来:

  • 类型元数据:pyarrow.DataType 的实例,描述逻辑数组类型。

  • 模式:pyarrow.Schema 的实例,描述一个命名的类型集合。这些可以被视为类似表格对象中的列类型。

  • 数组:pyarrow.Array 的实例,是由 Arrow Buffer 对象组成的原子性、连续的列式数据结构。

  • 记录批次:pyarrow.RecordBatch 的实例,是一组具有特定模式的 Array 对象的集合。

  • 表格:pyarrow.Table 的实例,是一种逻辑表格数据结构,其中每一列由一个或多个相同类型的 pyarrow.Array 对象组成。

类型元数据#

Apache Arrow 定义了与语言无关的列式数组数据结构。这些包括:

  • 固定长度的原始(primitive)类型:数字、布尔值、日期和时间、固定大小的二进制、十进制以及其他可以放入给定数量的值。

  • 可变长度的原始(primitive)类型:二进制、字符串。

  • 嵌套类型:列表、映射、结构体和联合体。

  • 字典类型:一种编码后的分类类型。

Arrow 中的每个逻辑数据类型都有对应的工厂函数,用于在 Python 中创建该类型对象的实例:

import pyarrow as pa

pa.int32()
DataType(int32)
pa.string()
DataType(string)
pa.binary()
DataType(binary)
pa.binary(10)
FixedSizeBinaryType(fixed_size_binary[10])
pa.timestamp('ms')
TimestampType(timestamp[ms])

使用“逻辑类型”(logical type)这个名称,是因为一种或多种类型的物理存储(physical storage)可能相同。例如,int64float64timestamp[ms] 每个值都占用 64 位。

这些对象是元数据;它们用于描述数组、模式和记录批次中的数据。在 Python 中,它们可以用在输入数据(例如 Python 对象)可能被强制转换为多个 Arrow 类型的函数中。

Field 类型是类型加上名称和可选的用户定义元数据:

t1 = pa.int32()
f0 = pa.field('int32_field', t1)
f0
pyarrow.Field<int32_field: int32>
f0.name, f0.type
('int32_field', DataType(int32))

Arrow 支持嵌套值类型,如列表、映射、结构体和联合体。创建这些时,必须传递类型或字段以指示类型的子项的数据类型。例如,我们可以用以下方式定义 int32 值的列表:

t1 = pa.int32()
t6 = pa.list_(t1)
t6
ListType(list<item: int32>)

结构体是一组命名字段的集合:

t1 = pa.int32()

t2 = pa.string()

t3 = pa.binary()

t4 = pa.binary(10)

t5 = pa.timestamp('ms')

fields = [
    pa.field('s0', t1),
    pa.field('s1', t2),
    pa.field('s2', t4),
    pa.field('s3', t6),
]


t7 = pa.struct(fields)

print(t7)
struct<s0: int32, s1: string, s2: fixed_size_binary[10], s3: list<item: int32>>

为了方便,您可以直接传递 (name, type) 元组而不是 Field 实例:

t8 = pa.struct([('s0', t1), ('s1', t2), ('s2', t4), ('s3', t6)])

print(t8)

t8 == t7
struct<s0: int32, s1: string, s2: fixed_size_binary[10], s3: list<item: int32>>
True

有关数据类型函数的完整列表,请参阅数据类型 API

模式#

Schema 类型与 struct 数组类型类似;它定义了记录批次或表格数据结构中的列名和类型。pyarrow.schema() 工厂函数在 Python 中创建新的 Schema 对象:

my_schema = pa.schema([('field0', t1),
                       ('field1', t2),
                       ('field2', t4),
                       ('field3', t6)])


my_schema
field0: int32
field1: string
field2: fixed_size_binary[10]
field3: list<item: int32>
  child 0, item: int32

在某些应用中,您可能不需要直接创建模式,只需使用嵌入在 IPC 消息 中的模式。

数组#

对于每种数据类型,都有一个相应的数组数据结构用于保存内存缓冲区,这些缓冲区定义了单个连续的列式数组数据块。当您使用 PyArrow 时,这些数据可能来自 IPC 工具,但也可以从各种类型的 Python 序列(列表、NumPy 数组、pandas 数据)创建。

创建数组的简单方法是使用 pyarrow.array,它类似于 numpy.array 函数。默认情况下,PyArrow 会为您推断数据类型:

arr = pa.array([1, 2, None, 3])

arr
<pyarrow.lib.Int64Array object at 0x7f72fb0ee440>
[
  1,
  2,
  null,
  3
]

但您也可以传递特定的数据类型来覆盖类型推断:

pa.array([1, 2], type=pa.uint16())
<pyarrow.lib.UInt16Array object at 0x7f72fb0ee5c0>
[
  1,
  2
]

数组的 type 属性是相应的类型元数据部分:

arr.type
DataType(int64)

每个内存数组都有一个已知的长度和空值计数(如果没有空值,则为 0):

len(arr), arr.null_count
(4, 1)

可以使用常规索引选择标量值。pyarrow.array()None 值转换为 Arrow 空值;返回特殊的 pyarrow.NA 值来表示空值:

arr[0], arr[2]
(<pyarrow.Int64Scalar: 1>, <pyarrow.Int64Scalar: None>)

Arrow 数据是不可变的,因此可以选择值但不能赋值。

数组可以在不复制的情况下进行切片:

arr[1:3]
<pyarrow.lib.Int64Array object at 0x7f733c106680>
[
  2,
  null
]

None 值和 NAN 处理#

如上一节所述,在转换为 pyarrow.Array 时,Python 对象 None 总是被转换为 Arrow 空元素。对于由 Python 对象 float('nan')numpy.nan 表示的浮点数 NaN 值,我们通常在转换过程中将其转换为有效的浮点数值。如果向 pyarrow.array 提供包含 np.nan 的整数输入,将引发 ValueError

为了更好地与 Pandas 兼容,支持将 NaN 值解释为空元素。这在所有 from_pandas 函数上自动启用,并且可以通过将 from_pandas=True 作为函数参数传递给其他转换函数来启用。

列表数组#

pyarrow.array() 能够推断出简单嵌套数据结构(如列表)的类型:

nested_arr = pa.array([[], None, [1, 2], [None, 1]])

print(nested_arr.type)
list<item: int64>

结构体数组#

pyarrow.array() 能够从字典数组中推断出结构体类型的模式:

pa.array([{'x': 1, 'y': True}, {'z': 3.4, 'x': 4}])
<pyarrow.lib.StructArray object at 0x7f72fb0eec80>
-- is_valid: all not null
-- child 0 type: int64
  [
    1,
    4
  ]
-- child 1 type: bool
  [
    true,
    null
  ]
-- child 2 type: double
  [
    null,
    3.4
  ]

结构体数组可以从 Python 字典或元组的序列中初始化。对于元组,您必须显式传递类型:

ty = pa.struct([('x', pa.int8()),
                ('y', pa.bool_())])
pa.array([{'x': 1, 'y': True}, {'x': 2, 'y': False}], type=ty)
<pyarrow.lib.StructArray object at 0x7f72fb0eece0>
-- is_valid: all not null
-- child 0 type: int8
  [
    1,
    2
  ]
-- child 1 type: bool
  [
    true,
    false
  ]
pa.array([(3, True), (4, False)], type=ty)
<pyarrow.lib.StructArray object at 0x7f72fb0eef20>
-- is_valid: all not null
-- child 0 type: int8
  [
    3,
    4
  ]
-- child 1 type: bool
  [
    true,
    false
  ]

初始化结构体数组时,在结构级别和各个字段级别都允许有空值。如果从 Python 字典序列中初始化,缺失的字典键将被视为空值:

pa.array([{'x': 1}, None, {'y': None}], type=ty)
<pyarrow.lib.StructArray object at 0x7f72fb0eefe0>
-- is_valid:
  [
    true,
    false,
    true
  ]
-- child 0 type: int8
  [
    1,
    0,
    null
  ]
-- child 1 type: bool
  [
    null,
    false,
    null
  ]

您还可以从每个结构组件的现有数组构建结构数组。在这种情况下,数据存储将与各个数组共享,不涉及复制:

xs = pa.array([5, 6, 7], type=pa.int16())

ys = pa.array([False, True, True])

arr = pa.StructArray.from_arrays((xs, ys), names=('x', 'y'))
arr.type, arr
(StructType(struct<x: int16, y: bool>),
 <pyarrow.lib.StructArray object at 0x7f72fb0ef040>
 -- is_valid: all not null
 -- child 0 type: int16
   [
     5,
     6,
     7
   ]
 -- child 1 type: bool
   [
     false,
     true,
     true
   ])

映射数组#

可以从元组(键-值对)的列表构建映射数组,但只有在类型显式传递给 pyarrow.array() 函数时才能这样做:

data = [[('x', 1), ('y', 0)], [('a', 2), ('b', 45)]]

ty = pa.map_(pa.string(), pa.int64())

pa.array(data, type=ty)
<pyarrow.lib.MapArray object at 0x7f72fb0ef100>
[
  keys:
  [
    "x",
    "y"
  ]
  values:
  [
    1,
    0
  ],
  keys:
  [
    "a",
    "b"
  ]
  values:
  [
    2,
    45
  ]
]

映射数组也可以从偏移量、键和项数组构建。偏移量代表每个映射的起始位置。请注意,pyarrow.MapArray.keys()pyarrow.MapArray.items() 属性提供扁平化的键和项。要使键和项与它们的行相关联,请使用 pyarrow.ListArray.from_arrays() 构造函数与 pyarrow.MapArray.offsets 属性。

arr = pa.MapArray.from_arrays([0, 2, 3], ['x', 'y', 'z'], [4, 5, 6])

arr.keys
<pyarrow.lib.StringArray object at 0x7f72fb0ef280>
[
  "x",
  "y",
  "z"
]
arr.items
<pyarrow.lib.Int64Array object at 0x7f72fb0eee00>
[
  4,
  5,
  6
]
pa.ListArray.from_arrays(arr.offsets, arr.keys)
<pyarrow.lib.ListArray object at 0x7f72fb0ef4c0>
[
  [
    "x",
    "y"
  ],
  [
    "z"
  ]
]
pa.ListArray.from_arrays(arr.offsets, arr.items)
<pyarrow.lib.ListArray object at 0x7f72fb0ef700>
[
  [
    4,
    5
  ],
  [
    6
  ]
]

联合数组#

联合类型表示嵌套数组类型,其中每个值可以是(且仅是)一组可能类型中的一个。联合数组有两种可能的存储类型:稀疏和密集。

在稀疏联合数组中,每个子数组的长度与结果联合数组相同。它们与一个 int8 类型的“types”数组相连,该数组指示每个值必须从哪个子数组中选择:

xs = pa.array([5, 6, 7])

ys = pa.array([False, False, True])

types = pa.array([0, 1, 1], type=pa.int8())

union_arr = pa.UnionArray.from_sparse(types, [xs, ys])
union_arr.type, union_arr
(SparseUnionType(sparse_union<0: int64=0, 1: bool=1>),
 <pyarrow.lib.UnionArray object at 0x7f72fb0ef160>
 -- is_valid: all not null
 -- type_ids:   [
     0,
     1,
     1
   ]
 -- child 0 type: int64
   [
     5,
     6,
     7
   ]
 -- child 1 type: bool
   [
     false,
     false,
     true
   ])

在密集联合数组中,除了 int8 类型的“types”数组外,还需要传递 int32 类型的“offsets”数组,该数组指示每个值在选定的子数组中的每个偏移量处可以找到。

xs = pa.array([5, 6, 7])

ys = pa.array([False, True])

types = pa.array([0, 1, 1, 0, 0], type=pa.int8())

offsets = pa.array([0, 0, 1, 1, 2], type=pa.int32())

union_arr = pa.UnionArray.from_dense(types, offsets, [xs, ys])

union_arr.type, union_arr
(DenseUnionType(dense_union<0: int64=0, 1: bool=1>),
 <pyarrow.lib.UnionArray object at 0x7f72fb0ef640>
 -- is_valid: all not null
 -- type_ids:   [
     0,
     1,
     1,
     0,
     0
   ]
 -- value_offsets:   [
     0,
     0,
     1,
     1,
     2
   ]
 -- child 0 type: int64
   [
     5,
     6,
     7
   ]
 -- child 1 type: bool
   [
     false,
     true
   ])

字典数组#

PyArrow 中的 Dictionary 类型是一种特殊类型的数组,类似于 R 中的因子或 pandas.Categorical。它允许一个或多个文件或流中的记录批次传输引用共享字典的整数索引,该字典包含逻辑数组中的独特值。这种方法特别常用于字符串以节省内存并提高性能。

在 Apache Arrow 格式中处理字典的方式以及它们在 C++ 和 Python 中的呈现方式略有不同。我们定义了特殊的 DictionaryArray 类型和相应的字典类型。让我们考虑一个例子:

indices = pa.array([0, 1, 0, 1, 2, 0, None, 2])

dictionary = pa.array(['foo', 'bar', 'baz'])

dict_array = pa.DictionaryArray.from_arrays(indices, dictionary)

dict_array
<pyarrow.lib.DictionaryArray object at 0x7f72fb1245f0>

-- dictionary:
  [
    "foo",
    "bar",
    "baz"
  ]
-- indices:
  [
    0,
    1,
    0,
    1,
    2,
    0,
    null,
    2
  ]
dict_array.indices
<pyarrow.lib.Int64Array object at 0x7f72fb0ef3a0>
[
  0,
  1,
  0,
  1,
  2,
  0,
  null,
  2
]
print(dict_array.type)
dict_array.dictionary
dictionary<values=string, indices=int64, ordered=0>
<pyarrow.lib.StringArray object at 0x7f72fb0efb80>
[
  "foo",
  "bar",
  "baz"
]

当在 pandas 中使用 DictionaryArray 时,对应的是 pandas.Categorical

dict_array.to_pandas()
0    foo
1    bar
2    foo
3    bar
4    baz
5    foo
6    NaN
7    baz
dtype: category
Categories (3, object): ['foo', 'bar', 'baz']

记录批次#

在 Apache Arrow 中,记录批次(Record Batch)是一组等长的数组实例。让我们考虑数组集合:

data = [
    pa.array([1, 2, 3, 4]),
    pa.array(['foo', 'bar', 'baz', None]),
    pa.array([True, None, False, True])
]

可以使用 RecordBatch.from_arrays 从数组列表创建记录批次。

batch = pa.RecordBatch.from_arrays(data, ['f0', 'f1', 'f2'])
batch.num_columns, batch.num_rows
(3, 4)
batch.schema
f0: int64
f1: string
f2: bool
batch[1]
<pyarrow.lib.StringArray object at 0x7f72fb0efa00>
[
  "foo",
  "bar",
  "baz",
  null
]

记录批次可以像数组一样进行切片,而无需复制内存。

batch2 = batch.slice(1, 3)

batch2[1]
<pyarrow.lib.StringArray object at 0x7f72fb0efdc0>
[
  "bar",
  "baz",
  null
]

PyArrow 表格#

PyArrow Table 类型不是 Apache Arrow 规范的一部分,而是一个工具,用于帮助处理多个记录批次和数组片段作为单个逻辑数据集。例如,我们可能需要从套接字流中接收多个小记录批次,然后将它们连接成连续的内存以供 NumPy 或 pandas 使用。Table 对象可以在不要求额外内存复制的情况下实现这一点。

考虑上面创建的记录批次,我们可以使用 Table.from_batches 创建一个包含一个或多个批次副本的表:

batches = [batch] * 5

table = pa.Table.from_batches(batches)

table.num_rows, table
(20,
 pyarrow.Table
 f0: int64
 f1: string
 f2: bool
 ----
 f0: [[1,2,3,4],[1,2,3,4],...,[1,2,3,4],[1,2,3,4]]
 f1: [["foo","bar","baz",null],["foo","bar","baz",null],...,["foo","bar","baz",null],["foo","bar","baz",null]]
 f2: [[true,null,false,true],[true,null,false,true],...,[true,null,false,true],[true,null,false,true]])

表格的列是 ChunkedArray 的实例,它是相同类型的一个或多个数组的容器。

c = table[0]

c
<pyarrow.lib.ChunkedArray object at 0x7f72fb0eff40>
[
  [
    1,
    2,
    3,
    4
  ],
  [
    1,
    2,
    3,
    4
  ],
...,
  [
    1,
    2,
    3,
    4
  ],
  [
    1,
    2,
    3,
    4
  ]
]
c.num_chunks, c.chunk(0)
(5,
 <pyarrow.lib.Int64Array object at 0x7f72fb0ef460>
 [
   1,
   2,
   3,
   4
 ])

正如您将在 pandas 部分 看到的,我们可以将这些对象转换为连续的 NumPy 数组以供 pandas 使用:

c.to_pandas()
0     1
1     2
2     3
3     4
4     1
5     2
6     3
7     4
8     1
9     2
10    3
11    4
12    1
13    2
14    3
15    4
16    1
17    2
18    3
19    4
Name: f0, dtype: int64

如果模式相等,还可以使用 pyarrow.concat_tables() 将多个表连接在一起以形成单个表。

tables = [table] * 2

table_all = pa.concat_tables(tables)

print(table_all.num_rows)

c = table_all[0]

c.num_chunks
40
10

这个函数类似于 Table.from_batches,但是它使用表作为输入而不是记录批次。记录批次可以转换为表,但反过来不行,所以如果你的数据已经是表的形式,那么使用 concat_tables()

自定义 Schema 和字段元数据#

Arrow 支持模式级别和字段级别的自定义键值元数据,允许系统插入自己的应用程序定义的元数据以自定义行为。

可以在 pyarrow.Schema.metadata 中访问模式级别的自定义元数据,在 pyarrow.Field.metadata 中访问字段级别的自定义元数据。

请注意,这种元数据在流式处理、序列化和进程间通信(IPC)过程中得以保留。

要自定义现有表的模式元数据,可以使用 pyarrow.Table.replace_schema_metadata() 方法:

table.schema.metadata # empty

table = table.replace_schema_metadata({"f0": "First dose"})

table.schema.metadata
{b'f0': b'First dose'}

要自定义表模式中字段的元数据,可以使用 pyarrow.Field.with_metadata() 方法。

field_f1 = table.schema.field("f1")

field_f1.metadata # empty

field_f1 = field_f1.with_metadata({"f1": "Second dose"})

field_f1.metadata
{b'f1': b'Second dose'}

这两种选项都会创建数据的浅拷贝,并且实际上不会改变不可变的 Schema。要更改表的 Schema 中的元数据,我们在调用 pyarrow.Table.replace_schema_metadata() 时会创建一个新对象。

要更改模式中字段的元数据,我们需要定义一个新模式,并将数据转换为这个新模式:

my_schema2 = pa.schema([
   pa.field('f0', pa.int64(), metadata={"name": "First dose"}),
   pa.field('f1', pa.string(), metadata={"name": "Second dose"}),
   pa.field('f2', pa.bool_())],
   metadata={"f2": "booster"})
t2 = table.cast(my_schema2)

t2.schema.field("f0").metadata
{b'name': b'First dose'}
t2.schema.field("f1").metadata
{b'name': b'Second dose'}
t2.schema.metadata
{b'f2': b'booster'}

元数据键值对在 C++ 实现中是 std::string 对象,而在 Python 中则是字节对象(b'...')。

记录批次读取器#

PyArrow 中的许多函数要么返回要么接受 pyarrow.RecordBatchReader 作为参数。它可以像任何记录批次的可迭代对象一样使用,但在获取任何批次之前还提供了它们共同的模式。

schema = pa.schema([('x', pa.int64())])
def iter_record_batches():
   for i in range(2):
      yield pa.RecordBatch.from_arrays([pa.array([1, 2, 3])], schema=schema)
reader = pa.RecordBatchReader.from_batches(schema, iter_record_batches())
print(reader.schema)
x: int64
for batch in reader:
   print(batch)
pyarrow.RecordBatch
x: int64
----
x: [1,2,3]
pyarrow.RecordBatch
x: int64
----
x: [1,2,3]

它还可以使用 C 流接口在语言之间发送。