计算函数#
Arrow 支持在可能具有不同类型输入的情况下进行逻辑计算操作。
标准的计算操作由 pyarrow.compute
模块提供,可以直接使用:
import pyarrow as pa
import pyarrow.compute as pc
a = pa.array([1, 1, 2, 3])
pc.sum(a)
<pyarrow.Int64Scalar: 7>
分组聚合函数会引发异常,需要通过 pyarrow.Table.group_by()
方法使用。有关更多详细信息,请参阅 Grouped Aggregations。
标准计算函数#
许多计算函数支持数组(分块或不分块)和标量输入,但有些会强制要求使用其中一种。例如,sort_indices
要求其唯一的输入必须是数组。
以下是一些简单示例:
import pyarrow as pa
import pyarrow.compute as pc
a = pa.array([1, 1, 2, 3])
b = pa.array([4, 1, 2, 8])
pc.equal(a, b)
<pyarrow.lib.BooleanArray object at 0x7fc3c23bd540>
[
false,
true,
true,
false
]
x, y = pa.scalar(7.8), pa.scalar(9.3)
pc.multiply(x, y)
<pyarrow.DoubleScalar: 72.54>
这些函数不仅仅能进行元素级的运算。下面是对表格排序的示例:
import pyarrow as pa
import pyarrow.compute as pc
t = pa.table({'x':[1,2,3],'y':[3,2,1]})
i = pc.sort_indices(t, sort_keys=[('y', 'ascending')])
i
<pyarrow.lib.UInt64Array object at 0x7fc3c23bd600>
[
2,
1,
0
]
分组聚合#
PyArrow 支持通过 pyarrow.Table.group_by()
方法对 pyarrow.Table
进行分组聚合。该方法将返回一个分组声明,可以应用哈希聚合函数:
import pyarrow as pa
t = pa.table([
pa.array(["a", "a", "b", "b", "c"]),
pa.array([1, 2, 3, 4, 5]),
], names=["keys", "values"])
t.group_by("keys").aggregate([("values", "sum")])
pyarrow.Table
keys: string
values_sum: int64
----
keys: [["a","b","c"]]
values_sum: [[3,7,5]]
在上一个示例中传递给 aggregate
方法的 "sum"
聚合是 hash_sum
计算函数。
可以通过将它们提供给 aggregate
方法同时执行多个聚合操作:
import pyarrow as pa
t = pa.table([
pa.array(["a", "a", "b", "b", "c"]),
pa.array([1, 2, 3, 4, 5]),
], names=["keys", "values"])
t.group_by("keys").aggregate([
("values", "sum"),
("keys", "count")
])
pyarrow.Table
keys: string
values_sum: int64
keys_count: int64
----
keys: [["a","b","c"]]
values_sum: [[3,7,5]]
keys_count: [[2,2,1]]
还可以为每个聚合函数提供聚合选项,例如我们可以使用 CountOptions
来更改如何计算空值:
import pyarrow as pa
import pyarrow.compute as pc
table_with_nulls = pa.table([
pa.array(["a", "a", "a"]),
pa.array([1, None, None])
], names=["keys", "values"])
table_with_nulls.group_by(["keys"]).aggregate([
("values", "count", pc.CountOptions(mode="all"))
])
pyarrow.Table
keys: string
values_count: int64
----
keys: [["a"]]
values_count: [[3]]
表和数据集拼接#
pyarrow.Table
和 pyarrow.Dataset
都支持通过 pyarrow.Table.join()
和 pyarrow.Dataset.join()
方法进行连接操作。
这些方法接受一个右侧的表或数据集,该数据集将与初始数据集进行连接,并使用两个实体中的一个或多个键来执行连接操作。
默认情况下执行 left outer join
,但可以请求支持的任何连接类型。
基本的连接可以通过提供要连接的表和连接应执行的键来执行。
import pyarrow as pa
table1 = pa.table({'id': [1, 2, 3],
'year': [2020, 2022, 2019]})
table2 = pa.table({'id': [3, 4],
'n_legs': [5, 100],
'animal': ["Brittle stars", "Centipede"]})
joined_table = table1.join(table2, keys="id")
结果将是通过在 id
键上执行左外连接来创建的新表,该新表是将 table1
与 table2
连接起来的结果。
joined_table
pyarrow.Table
id: int64
year: int64
n_legs: int64
animal: string
----
id: [[3,1,2]]
year: [[2019,2020,2022]]
n_legs: [[5,null,null]]
animal: [["Brittle stars",null,null]]
我们可以通过将它们传递给 join_type
参数来执行其他类型的连接,例如 full outer join
:
table1.join(table2, keys='id', join_type="full outer")
pyarrow.Table
id: int64
year: int64
n_legs: int64
animal: string
----
id: [[3,1,2,4]]
year: [[2019,2020,2022,null]]
n_legs: [[5,null,null,100]]
animal: [["Brittle stars",null,null,"Centipede"]]
还可以提供额外的连接键,以便在两个键上进行连接,而不是一个。例如,我们可以向 table2
添加一个 year
列,以便我们可以连接 ('id', 'year')
:
table2_withyear = table2.append_column("year", pa.array([2019, 2022]))
table1.join(table2_withyear, keys=["id", "year"])
pyarrow.Table
id: int64
year: int64
n_legs: int64
animal: string
----
id: [[3,1,2]]
year: [[2019,2020,2022]]
n_legs: [[5,null,null]]
animal: [["Brittle stars",null,null]]
其中只有 id=3
和 year=2019
的条目有数据,其余的将为空。
pyarrow.Dataset.join()
也有相同的功能,因此您可以获取两个数据集并将它们连接起来:
import pyarrow.dataset as ds
ds1 = ds.dataset(table1)
ds2 = ds.dataset(table2)
joined_ds = ds1.join(ds2, keys="id")
结果数据集将是一个包含连接数据的 pyarrow.dataset.InMemoryDataset
。
joined_ds.head(5)
pyarrow.Table
id: int64
year: int64
n_legs: int64
animal: string
----
id: [[3,1,2]]
year: [[2019,2020,2022]]
n_legs: [[5,null,null]]
animal: [["Brittle stars",null,null]]
使用表达式进行过滤#
表格和数据集都可以通过布尔 Expression
进行过滤。
可以从 pyarrow.compute.field()
开始构建表达式。然后,可以将比较和转换应用于一个或多个字段,以构建您关心的过滤器表达式。
大多数计算函数可用于对 field
执行变换。
例如,我们可以构建一个过滤器,找到列 "nums"
中所有偶数行。
import pyarrow.compute as pc
even_filter = (pc.bit_wise_and(pc.field("nums"), pc.scalar(1)) == pc.scalar(0))
备注
过滤器通过执行数字和 1
之间的按位与操作来找到偶数。由于 1
在二进制形式中表示为 00000001
,只有最后一位设置为 1
的数字才会从 bit_wise_and
操作返回非零结果。这样我们就识别出了所有奇数。鉴于我们感兴趣的是偶数,然后我们检查 bit_wise_and
返回的数字是否等于 0
。只有最后一位是 0
的数字才会使 num & 1
的结果返回 0
,由于所有最后一位是 0
的数字都是 2
的倍数,我们将仅过滤出偶数。
一旦我们有了过滤器,我们可以将其提供给 pyarrow.Table.filter()
方法,以便仅过滤出匹配的行:
table = pa.table({'nums': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
'chars': ["a", "b", "c", "d", "e", "f", "g", "h", "i", "l"]})
table.filter(even_filter)
pyarrow.Table
nums: int64
chars: string
----
nums: [[2,4,6,8,10]]
chars: [["b","d","f","h","l"]]
多个过滤器可以使用 &
、|
、~
来连接,以执行与、或和非操作。例如,使用 ~even_filter
实际上将过滤出所有奇数:
table.filter(~even_filter)
pyarrow.Table
nums: int64
chars: string
----
nums: [[1,3,5,7,9]]
chars: [["a","c","e","g","i"]]
可以通过将 even_filter
与 pc.field("nums") > 5
过滤器结合起来,构建一个找到所有大于 5
的偶数的过滤器:
table.filter(even_filter & (pc.field("nums") > 5))
pyarrow.Table
nums: int64
chars: string
----
nums: [[6,8,10]]
chars: [["f","h","l"]]
数据集可以通过 pyarrow.dataset.~Dataset.filter()
方法进行类似的过滤。该方法将返回 pyarrow.dataset.~Dataset
实例,当实际访问数据集的数据时,它将惰性地应用过滤器。
dataset = ds.dataset(table)
filtered = dataset.filter(pc.field("nums") < 5).filter(pc.field("nums") > 2)
filtered.to_table()
pyarrow.Table
nums: int64
chars: string
----
nums: [[3,4]]
chars: [["c","d"]]
用户定义函数#
PyArrow 允许定义和注册自定义计算函数。然后,这些函数可以通过它们注册的函数名在 Python 以及 C++(可能还包括其他包装Arrow C++ 的实现,如 R arrow
包)中调用。
UDF(用户定义函数)支持仅限于标量函数。标量函数是指在数组或标量上执行逐元素操作的函数。通常,标量函数的输出不依赖于参数值的顺序。请注意,这类函数大致对应于 SQL 表达式中使用的函数,或 NumPy 通用函数。
要注册 UDF(用户定义函数),需要定义函数名、函数文档、输入类型和输出类型。使用 pyarrow.compute.register_scalar_function()
进行注册,
import numpy as np
import pyarrow as pa
import pyarrow.compute as pc
function_name = "numpy_gcd"
function_docs = {
"summary": "Calculates the greatest common divisor",
"description":
"Given 'x' and 'y' find the greatest number that divides\n"
"evenly into both x and y."
}
input_types = {
"x" : pa.int64(),
"y" : pa.int64()
}
output_type = pa.int64()
def to_np(val):
if isinstance(val, pa.Scalar):
return val.as_py()
else:
return np.array(val)
def gcd_numpy(ctx, x, y):
np_x = to_np(x)
np_y = to_np(y)
return pa.array(np.gcd(np_x, np_y))
pc.register_scalar_function(gcd_numpy,
function_name,
function_docs,
input_types,
output_type)
用户定义函数的实现总是需要名为 ctx
(如上例中所示)的第一个上下文参数,它是 pyarrow.compute.UdfContext
的一个实例。这个上下文暴露了多个有用的属性,特别是 memory_pool()
,用于在用户定义函数的上下文中进行分配。
您可以使用 pyarrow.compute.call_function()
直接调用用户定义的函数:
pc.call_function("numpy_gcd", [pa.scalar(27), pa.scalar(63)])
<pyarrow.Int64Scalar: 9>
pc.call_function("numpy_gcd", [pa.scalar(27), pa.array([81, 12, 5])])
<pyarrow.lib.Int64Array object at 0x7fc3c2432140>
[
27,
3,
1
]
处理数据集#
更一般地,用户定义函数可以在任何可以通过其名称引用计算函数的地方使用。例如,它们可以使用 pyarrow.dataset.Expression._call()
在数据集的列上调用。
考虑一个实例,数据在一个表格中,我们想要计算一列与标量值 30 的最大公约数(GCD)。我们将重用上面创建的“numpy_gcd”用户定义函数:
import pyarrow.dataset as ds
data_table = pa.table({'category': ['A', 'B', 'C', 'D'], 'value': [90, 630, 1827, 2709]})
dataset = ds.dataset(data_table)
func_args = [pc.scalar(30), ds.field("value")]
dataset.to_table(
columns={
'gcd_value': ds.field('')._call("numpy_gcd", func_args),
'value': ds.field('value'),
'category': ds.field('category')
})
pyarrow.Table
gcd_value: int64
value: int64
category: string
----
gcd_value: [[30,30,3,3]]
value: [[90,630,1827,2709]]
category: [["A","B","C","D"]]
请注意,ds.field('')._call(...)
返回 pyarrow.compute.Expression
。传递给这个函数调用的参数是表达式,而不是标量值(注意 pyarrow.scalar()
和 pyarrow.compute.scalar()
之间的区别,后者产生一个表达式)。当投影操作执行时,这个表达式会被评估。
投影表达式#
在上述示例中,我们使用了一个表达式来向表中添加一个新列(gcd_value
)。向表中添加新的、动态计算的列被称为“投影”(projection),并且在投影表达式中可以使用哪些类型的函数是有限制的。投影函数必须为每个输入行发出一个单一的输出值。该输出值应完全从输入行计算得出,且不应依赖于任何其他行。例如,我们上面用作示例的“numpy_gcd”函数是可用于投影的有效函数。而“累积求和”函数将不是有效函数,因为每个输入行的结果取决于之前的行。“删除空值”函数也将无效,因为它不为某些行发出值。