From 60ebebe29f60b3a10fbaa5c8e8583efb57bcb37a Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Mon, 10 Feb 2025 21:59:26 +0000 Subject: [PATCH 01/11] perf: Use simple null constraints to simplify queries --- bigframes/core/compile/compiled.py | 106 +++++++++++++++++++++- bigframes/core/compile/compiler.py | 11 ++- bigframes/core/compile/isin.py | 71 --------------- bigframes/core/compile/single_column.py | 83 ----------------- bigframes/core/expression.py | 21 +++++ bigframes/core/nodes.py | 115 +++++++++++++++++++----- 6 files changed, 224 insertions(+), 183 deletions(-) delete mode 100644 bigframes/core/compile/isin.py delete mode 100644 bigframes/core/compile/single_column.py diff --git a/bigframes/core/compile/compiled.py b/bigframes/core/compile/compiled.py index 93be998b5b..ad6e846692 100644 --- a/bigframes/core/compile/compiled.py +++ b/bigframes/core/compile/compiled.py @@ -16,7 +16,7 @@ import functools import itertools import typing -from typing import Optional, Sequence +from typing import Literal, Optional, Sequence, Tuple import bigframes_vendored.ibis import bigframes_vendored.ibis.backends.bigquery.backend as ibis_bigquery @@ -323,7 +323,97 @@ def from_pandas( columns=columns, ) - ## Methods that only work with ordering + def join( + self: UnorderedIR, + right: UnorderedIR, + conditions: Tuple[Tuple[str, str], ...], + type: Literal["inner", "outer", "left", "right", "cross"], + *, + join_nulls: bool = True, + ) -> UnorderedIR: + """Join two expressions by column equality. + + Arguments: + left: Expression for left table to join. + left_column_ids: Column IDs (not label) to join by. + right: Expression for right table to join. + right_column_ids: Column IDs (not label) to join by. + how: The type of join to perform. + join_nulls (bool): + If True, will joins NULL keys to each other. + Returns: + The joined expression. The resulting columns will be, in order, + first the coalesced join keys, then, all the left columns, and + finally, all the right columns. + """ + # Shouldn't need to select the column ids explicitly, but it seems that ibis has some + # bug resolving column ids otherwise, potentially because of the "JoinChain" op + left_table = self._to_ibis_expr().select(self.column_ids) + right_table = right._to_ibis_expr().select(right.column_ids) + + keyfunc = value_to_join_key if join_nulls else _as_identity + join_conditions = [ + keyfunc(left_table[left_index]) == keyfunc(right_table[right_index]) + for left_index, right_index in conditions + ] + + combined_table = bigframes_vendored.ibis.join( + left_table, + right_table, + predicates=join_conditions, + how=type, # type: ignore + ) + combined_table = bigframes_vendored.ibis.join( + left_table, + right_table, + predicates=join_conditions, + how=type, # type: ignore + ) + columns = [combined_table[col.get_name()] for col in self.columns] + [ + combined_table[col.get_name()] for col in right.columns + ] + return UnorderedIR( + combined_table, + columns=columns, + ) + + def isin_join( + self: UnorderedIR, + right: UnorderedIR, + indicator_col: str, + conditions: Tuple[str, str], + *, + join_nulls: bool = True, + ) -> UnorderedIR: + """Join two expressions by column equality. + + Arguments: + left: Expression for left table to join. + right: Expression for right table to join. + conditions: Id pairs to compare + Returns: + The joined expression. + """ + left_table = self._to_ibis_expr() + right_table = right._to_ibis_expr() + keyfunc = value_to_join_key if join_nulls else _as_identity + new_column = ( + keyfunc(left_table[conditions[0]]) + .isin(keyfunc(right_table[conditions[1]])) + .name(indicator_col) + ) + + columns = tuple( + itertools.chain( + (left_table[col.get_name()] for col in self.columns), (new_column,) + ) + ) + + return UnorderedIR( + left_table, + columns=columns, + ) + def project_window_op( self, expression: ex.Aggregation, @@ -519,3 +609,15 @@ def _as_identity(value: ibis_types.Value): if value.type().is_float64() or value.type().is_geospatial(): return value.cast(ibis_dtypes.str) return value + + +def value_to_join_key(value: ibis_types.Value): + """Converts nullable values to non-null string SQL will not match null keys together - but pandas does.""" + if not value.type().is_string(): + value = value.cast(ibis_dtypes.str) + + return ( + value.fill_null(ibis_types.literal("$NULL_SENTINEL$")) + if hasattr(value, "fill_null") + else value.fillna(ibis_types.literal("$NULL_SENTINEL$")) + ) diff --git a/bigframes/core/compile/compiler.py b/bigframes/core/compile/compiler.py index ff5f1d61c8..12938523d2 100644 --- a/bigframes/core/compile/compiler.py +++ b/bigframes/core/compile/compiler.py @@ -29,11 +29,9 @@ import bigframes.core.compile.concat as concat_impl import bigframes.core.compile.explode import bigframes.core.compile.ibis_types -import bigframes.core.compile.isin import bigframes.core.compile.scalar_op_compiler import bigframes.core.compile.scalar_op_compiler as compile_scalar import bigframes.core.compile.schema_translator -import bigframes.core.compile.single_column import bigframes.core.expression as ex import bigframes.core.identifiers as ids import bigframes.core.nodes as nodes @@ -130,24 +128,25 @@ def compile_join(self, node: nodes.JoinNode): condition_pairs = tuple( (left.id.sql, right.id.sql) for left, right in node.conditions ) + left_unordered = self.compile_node(node.left_child) right_unordered = self.compile_node(node.right_child) - return bigframes.core.compile.single_column.join_by_column_unordered( - left=left_unordered, + return left_unordered.join( right=right_unordered, type=node.type, conditions=condition_pairs, + join_nulls=node.joins_nulls, ) @_compile_node.register def compile_isin(self, node: nodes.InNode): left_unordered = self.compile_node(node.left_child) right_unordered = self.compile_node(node.right_child) - return bigframes.core.compile.isin.isin_unordered( - left=left_unordered, + return left_unordered.isin_join( right=right_unordered, indicator_col=node.indicator_col.sql, conditions=(node.left_col.id.sql, node.right_col.id.sql), + join_nulls=node.joins_nulls, ) @_compile_node.register diff --git a/bigframes/core/compile/isin.py b/bigframes/core/compile/isin.py deleted file mode 100644 index 29acf9e284..0000000000 --- a/bigframes/core/compile/isin.py +++ /dev/null @@ -1,71 +0,0 @@ -# Copyright 2024 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Helpers to join ArrayValue objects.""" - -from __future__ import annotations - -import itertools -from typing import Tuple - -import bigframes_vendored.ibis.expr.datatypes as ibis_dtypes -import bigframes_vendored.ibis.expr.types as ibis_types - -import bigframes.core.compile.compiled as compiled - - -def isin_unordered( - left: compiled.UnorderedIR, - right: compiled.UnorderedIR, - indicator_col: str, - conditions: Tuple[str, str], -) -> compiled.UnorderedIR: - """Join two expressions by column equality. - - Arguments: - left: Expression for left table to join. - right: Expression for right table to join. - conditions: Id pairs to compare - Returns: - The joined expression. - """ - left_table = left._to_ibis_expr() - right_table = right._to_ibis_expr() - new_column = ( - value_to_join_key(left_table[conditions[0]]) - .isin(value_to_join_key(right_table[conditions[1]])) - .name(indicator_col) - ) - - columns = tuple( - itertools.chain( - (left_table[col.get_name()] for col in left.columns), (new_column,) - ) - ) - - return compiled.UnorderedIR( - left_table, - columns=columns, - ) - - -def value_to_join_key(value: ibis_types.Value): - """Converts nullable values to non-null string SQL will not match null keys together - but pandas does.""" - if not value.type().is_string(): - value = value.cast(ibis_dtypes.str) - return ( - value.fill_null(ibis_types.literal("$NULL_SENTINEL$")) - if hasattr(value, "fill_null") - else value.fillna(ibis_types.literal("$NULL_SENTINEL$")) - ) diff --git a/bigframes/core/compile/single_column.py b/bigframes/core/compile/single_column.py deleted file mode 100644 index 9216051d91..0000000000 --- a/bigframes/core/compile/single_column.py +++ /dev/null @@ -1,83 +0,0 @@ -# Copyright 2023 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Helpers to join ArrayValue objects.""" - -from __future__ import annotations - -from typing import Literal, Tuple - -import bigframes_vendored.ibis.expr.api as ibis_api -import bigframes_vendored.ibis.expr.datatypes as ibis_dtypes -import bigframes_vendored.ibis.expr.types as ibis_types - -import bigframes.core.compile.compiled as compiled - - -def join_by_column_unordered( - left: compiled.UnorderedIR, - right: compiled.UnorderedIR, - conditions: Tuple[Tuple[str, str], ...], - type: Literal["inner", "outer", "left", "right", "cross"], -) -> compiled.UnorderedIR: - """Join two expressions by column equality. - - Arguments: - left: Expression for left table to join. - left_column_ids: Column IDs (not label) to join by. - right: Expression for right table to join. - right_column_ids: Column IDs (not label) to join by. - how: The type of join to perform. - allow_row_identity_join (bool): - If True, allow matching by row identity. Set to False to always - perform a true JOIN in generated SQL. - Returns: - The joined expression. The resulting columns will be, in order, - first the coalesced join keys, then, all the left columns, and - finally, all the right columns. - """ - # Shouldn't need to select the column ids explicitly, but it seems that ibis has some - # bug resolving column ids otherwise, potentially because of the "JoinChain" op - left_table = left._to_ibis_expr().select(left.column_ids) - right_table = right._to_ibis_expr().select(right.column_ids) - join_conditions = [ - value_to_join_key(left_table[left_index]) - == value_to_join_key(right_table[right_index]) - for left_index, right_index in conditions - ] - - combined_table = ibis_api.join( - left_table, - right_table, - predicates=join_conditions, - how=type, # type: ignore - ) - columns = [combined_table[col.get_name()] for col in left.columns] + [ - combined_table[col.get_name()] for col in right.columns - ] - return compiled.UnorderedIR( - combined_table, - columns=columns, - ) - - -def value_to_join_key(value: ibis_types.Value): - """Converts nullable values to non-null string SQL will not match null keys together - but pandas does.""" - if not value.type().is_string(): - value = value.cast(ibis_dtypes.str) - return ( - value.fill_null(ibis_types.literal("$NULL_SENTINEL$")) - if hasattr(value, "fill_null") - else value.fillna(ibis_types.literal("$NULL_SENTINEL$")) - ) diff --git a/bigframes/core/expression.py b/bigframes/core/expression.py index 8621d5d915..afd290827d 100644 --- a/bigframes/core/expression.py +++ b/bigframes/core/expression.py @@ -165,6 +165,10 @@ def expensive(self) -> bool: isinstance(ex, OpExpression) and ex.op.expensive for ex in self.walk() ) + @property + def nullable(self) -> bool: + return True + @property @abc.abstractmethod def column_references(self) -> typing.Tuple[ids.ColumnId, ...]: @@ -248,6 +252,10 @@ def is_const(self) -> bool: def column_references(self) -> typing.Tuple[ids.ColumnId, ...]: return () + @property + def nullable(self) -> bool: + return pd.isna(self.value) # type: ignore + def output_type( self, input_types: dict[ids.ColumnId, bigframes.dtypes.Dtype] ) -> dtypes.ExpressionType: @@ -344,6 +352,11 @@ def column_references(self) -> typing.Tuple[ids.ColumnId, ...]: def is_const(self) -> bool: return False + @property + def nullable(self) -> bool: + # Safe default, need to actually bind input schema to determine + return True + def output_type( self, input_types: dict[ids.ColumnId, bigframes.dtypes.Dtype] ) -> dtypes.ExpressionType: @@ -408,6 +421,14 @@ def is_const(self) -> bool: def children(self): return self.inputs + @property + def nullable(self) -> bool: + # This is very conservative, need to label null properties of individual ops to get more precise + null_free = self.is_identity and not any( + child.nullable for child in self.inputs + ) + return not null_free + def output_type( self, input_types: dict[ids.ColumnId, dtypes.ExpressionType] ) -> dtypes.ExpressionType: diff --git a/bigframes/core/nodes.py b/bigframes/core/nodes.py index 88e084d79c..19e340274d 100644 --- a/bigframes/core/nodes.py +++ b/bigframes/core/nodes.py @@ -51,6 +51,17 @@ class Field: id: bfet_ids.ColumnId dtype: bigframes.dtypes.Dtype + # Best effort, nullable=True if not certain + nullable: bool = True + + def with_nullable(self) -> Field: + return Field(self.id, self.dtype, nullable=True) + + def with_nonnull(self) -> Field: + return Field(self.id, self.dtype, nullable=False) + + def with_id(self, id: bfet_ids.ColumnId) -> Field: + return Field(id, self.dtype, nullable=self.nullable) @dataclasses.dataclass(eq=False, frozen=True) @@ -278,10 +289,15 @@ def defines_namespace(self) -> bool: def get_type(self, id: bfet_ids.ColumnId) -> bigframes.dtypes.Dtype: return self._dtype_lookup[id] + # TODO: Deprecate in favor of field_by_id, and eventually, by rich references @functools.cached_property - def _dtype_lookup(self): + def _dtype_lookup(self) -> dict[bfet_ids.ColumnId, bigframes.dtypes.Dtype]: return {field.id: field.dtype for field in self.fields} + @functools.cached_property + def field_by_id(self) -> Mapping[bfet_ids.ColumnId, Field]: + return {field.id: field for field in self.fields} + class AdditiveNode: """Definition of additive - if you drop added_fields, you end up with the descendent. @@ -456,7 +472,7 @@ def explicitly_ordered(self) -> bool: @property def added_fields(self) -> Tuple[Field, ...]: - return (Field(self.indicator_col, bigframes.dtypes.BOOL_DTYPE),) + return (Field(self.indicator_col, bigframes.dtypes.BOOL_DTYPE, nullable=False),) @property def fields(self) -> Iterable[Field]: @@ -490,6 +506,12 @@ def referenced_ids(self) -> COLUMN_SET: def additive_base(self) -> BigFrameNode: return self.left_child + @property + def joins_nulls(self) -> bool: + left_nullable = self.left_child.field_by_id[self.left_col.id].nullable + right_nullable = self.right_child.field_by_id[self.right_col.id].nullable + return left_nullable and right_nullable + def replace_additive_base(self, node: BigFrameNode): return dataclasses.replace(self, left_child=node) @@ -550,7 +572,23 @@ def explicitly_ordered(self) -> bool: @property def fields(self) -> Iterable[Field]: - return itertools.chain(self.left_child.fields, self.right_child.fields) + left_fields = self.left_child.fields + if self.type in ("right", "outer"): + left_fields = map(lambda x: x.with_nullable(), left_fields) + right_fields = self.right_child.fields + if self.type in ("left", "outer"): + right_fields = map(lambda x: x.with_nullable(), right_fields) + return itertools.chain(left_fields, right_fields) + + @property + def joins_nulls(self) -> bool: + for left_ref, right_ref in self.conditions: + if ( + self.left_child.field_by_id[left_ref.id].nullable + and self.right_child.field_by_id[right_ref.id].nullable + ): + return True + return False @functools.cached_property def variables_introduced(self) -> int: @@ -643,6 +681,7 @@ def explicitly_ordered(self) -> bool: @property def fields(self) -> Iterable[Field]: # TODO: Output names should probably be aligned beforehand or be part of concat definition + # TODO: Handle nullability return ( Field(id, field.dtype) for id, field in zip(self.output_ids, self.children[0].fields) @@ -716,7 +755,9 @@ def explicitly_ordered(self) -> bool: @functools.cached_property def fields(self) -> Iterable[Field]: - return (Field(self.output_id, next(iter(self.start.fields)).dtype),) + return ( + Field(self.output_id, next(iter(self.start.fields)).dtype, nullable=False), + ) @functools.cached_property def variables_introduced(self) -> int: @@ -795,6 +836,7 @@ class ScanList: @dataclasses.dataclass(frozen=True, eq=False) class ReadLocalNode(LeafNode): # TODO: Combine feather_bytes, data_schema, n_rows into a LocalDataDef struct + # TODO: Track nullability for local data feather_bytes: bytes data_schema: schemata.ArraySchema n_rows: int @@ -809,7 +851,8 @@ def fields(self) -> Iterable[Field]: fields = (Field(col_id, dtype) for col_id, dtype, _ in self.scan_list.items) if self.offsets_col is not None: return itertools.chain( - fields, (Field(self.offsets_col, bigframes.dtypes.INT_DTYPE),) + fields, + (Field(self.offsets_col, bigframes.dtypes.INT_DTYPE, nullable=False),), ) return fields @@ -895,6 +938,11 @@ def from_table(table: bq.Table, columns: Sequence[str] = ()) -> GbqTable: else tuple(table.clustering_fields), ) + @property + @functools.cache + def schema_by_id(self): + return {col.name: col for col in self.physical_schema} + @dataclasses.dataclass(frozen=True) class BigqueryDataSource: @@ -937,7 +985,10 @@ def session(self): @property def fields(self) -> Iterable[Field]: - return (Field(col_id, dtype) for col_id, dtype, _ in self.scan_list.items) + return ( + Field(col_id, dtype, self.source.table.schema_by_id[source_id].is_nullable) + for col_id, dtype, source_id in self.scan_list.items + ) @property def relation_ops_created(self) -> int: @@ -1051,9 +1102,7 @@ def non_local(self) -> bool: @property def fields(self) -> Iterable[Field]: - return itertools.chain( - self.child.fields, [Field(self.col_id, bigframes.dtypes.INT_DTYPE)] - ) + return itertools.chain(self.child.fields, self.added_fields) @property def relation_ops_created(self) -> int: @@ -1077,7 +1126,7 @@ def referenced_ids(self) -> COLUMN_SET: @property def added_fields(self) -> Tuple[Field, ...]: - return (Field(self.col_id, bigframes.dtypes.INT_DTYPE),) + return (Field(self.col_id, bigframes.dtypes.INT_DTYPE, nullable=False),) @property def additive_base(self) -> BigFrameNode: @@ -1099,6 +1148,7 @@ def remap_refs( @dataclasses.dataclass(frozen=True, eq=False) class FilterNode(UnaryNode): + # TODO: Infer null constraints from predicate predicate: ex.Expression @property @@ -1268,8 +1318,13 @@ def _validate(self): @functools.cached_property def fields(self) -> Iterable[Field]: + input_fields_by_id = {field.id: field for field in self.child.fields} return tuple( - Field(output, self.child.get_type(ref.id)) + Field( + output, + input_fields_by_id[ref.id].dtype, + input_fields_by_id[ref.id].nullable, + ) for ref, output in self.input_output_pairs ) @@ -1336,10 +1391,22 @@ def _validate(self): @functools.cached_property def added_fields(self) -> Tuple[Field, ...]: input_types = self.child._dtype_lookup - return tuple( - Field(id, bigframes.dtypes.dtype_for_etype(ex.output_type(input_types))) - for ex, id in self.assignments - ) + + fields = [] + for expr, id in self.assignments: + field = Field( + id, + bigframes.dtypes.dtype_for_etype(expr.output_type(input_types)), + nullable=expr.nullable, + ) + # Special case until we get better nullability inference in expression objects themselves + if expr.is_identity and not any( + self.child.field_by_id[id].nullable for id in expr.column_references + ): + field = field.with_nonnull() + fields.append(field) + + return tuple(fields) @property def fields(self) -> Iterable[Field]: @@ -1414,7 +1481,7 @@ def non_local(self) -> bool: @property def fields(self) -> Iterable[Field]: - return (Field(self.col_id, bigframes.dtypes.INT_DTYPE),) + return (Field(self.col_id, bigframes.dtypes.INT_DTYPE, nullable=False),) @property def variables_introduced(self) -> int: @@ -1466,19 +1533,22 @@ def non_local(self) -> bool: @functools.cached_property def fields(self) -> Iterable[Field]: - by_items = ( - Field(ref.id, self.child.get_type(ref.id)) for ref in self.by_column_ids - ) + # TODO: Use child nullability to infer grouping key nullability + by_fields = (self.child.field_by_id[ref.id] for ref in self.by_column_ids) + if self.dropna: + by_fields = (field.with_nonnull() for field in by_fields) + # TODO: Label aggregate ops to determine which are guaranteed non-null agg_items = ( Field( id, bigframes.dtypes.dtype_for_etype( agg.output_type(self.child._dtype_lookup) ), + nullable=True, ) for agg, id in self.aggregations ) - return tuple(itertools.chain(by_items, agg_items)) + return tuple(itertools.chain(by_fields, agg_items)) @property def variables_introduced(self) -> int: @@ -1583,6 +1653,7 @@ def row_count(self) -> Optional[int]: @functools.cached_property def added_field(self) -> Field: input_types = self.child._dtype_lookup + # TODO: Determine if output could be non-null return Field( self.output_name, bigframes.dtypes.dtype_for_etype(self.expression.output_type(input_types)), @@ -1700,6 +1771,7 @@ def fields(self) -> Iterable[Field]: bigframes.dtypes.arrow_dtype_to_bigframes_dtype( self.child.get_type(field.id).pyarrow_dtype.value_type # type: ignore ), + nullable=True, ) if field.id in set(map(lambda x: x.id, self.column_ids)) else field @@ -1707,7 +1779,8 @@ def fields(self) -> Iterable[Field]: ) if self.offsets_col is not None: return itertools.chain( - fields, (Field(self.offsets_col, bigframes.dtypes.INT_DTYPE),) + fields, + (Field(self.offsets_col, bigframes.dtypes.INT_DTYPE, nullable=False),), ) return fields From c57327e0ec59da398e81b8399dcd6ff14cdbbebb Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Tue, 11 Feb 2025 00:35:25 +0000 Subject: [PATCH 02/11] fix test failures --- bigframes/core/nodes.py | 2 +- tests/system/small/core/test_indexers.py | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/bigframes/core/nodes.py b/bigframes/core/nodes.py index 19e340274d..2fa77a217d 100644 --- a/bigframes/core/nodes.py +++ b/bigframes/core/nodes.py @@ -510,7 +510,7 @@ def additive_base(self) -> BigFrameNode: def joins_nulls(self) -> bool: left_nullable = self.left_child.field_by_id[self.left_col.id].nullable right_nullable = self.right_child.field_by_id[self.right_col.id].nullable - return left_nullable and right_nullable + return left_nullable or right_nullable def replace_additive_base(self, node: BigFrameNode): return dataclasses.replace(self, left_child=node) diff --git a/tests/system/small/core/test_indexers.py b/tests/system/small/core/test_indexers.py index 2c670f790d..9fa03864a1 100644 --- a/tests/system/small/core/test_indexers.py +++ b/tests/system/small/core/test_indexers.py @@ -73,7 +73,6 @@ def test_non_string_indexed_struct_series_with_string_key_should_warn(session): "series", [ "string_indexed_struct_series", - "number_series", "string_indexed_number_series", ], ) From 759d4d65d4c5e67ea94211ceae28a017e1030d95 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Wed, 12 Feb 2025 00:41:20 +0000 Subject: [PATCH 03/11] fix dupe code and tuple annotations --- bigframes/core/compile/compiled.py | 20 +++++++------------- bigframes/core/nodes.py | 6 +++--- 2 files changed, 10 insertions(+), 16 deletions(-) diff --git a/bigframes/core/compile/compiled.py b/bigframes/core/compile/compiled.py index ad6e846692..54ba63fb81 100644 --- a/bigframes/core/compile/compiled.py +++ b/bigframes/core/compile/compiled.py @@ -16,7 +16,7 @@ import functools import itertools import typing -from typing import Literal, Optional, Sequence, Tuple +from typing import Literal, Optional, Sequence import bigframes_vendored.ibis import bigframes_vendored.ibis.backends.bigquery.backend as ibis_bigquery @@ -94,7 +94,7 @@ def to_sql( return typing.cast(str, sql) @property - def columns(self) -> typing.Tuple[ibis_types.Value, ...]: + def columns(self) -> tuple[ibis_types.Value, ...]: return self._columns @property @@ -107,7 +107,7 @@ def _ibis_bindings(self) -> dict[str, ibis_types.Value]: def projection( self, - expression_id_pairs: typing.Tuple[typing.Tuple[ex.Expression, str], ...], + expression_id_pairs: tuple[tuple[ex.Expression, str], ...], ) -> UnorderedIR: """Apply an expression to the ArrayValue and assign the output to a column.""" cannot_inline = any(expr.expensive for expr, _ in expression_id_pairs) @@ -126,7 +126,7 @@ def projection( def selection( self, - input_output_pairs: typing.Tuple[typing.Tuple[ex.DerefOp, str], ...], + input_output_pairs: tuple[tuple[ex.DerefOp, str], ...], ) -> UnorderedIR: """Apply an expression to the ArrayValue and assign the output to a column.""" bindings = {col: self._get_ibis_column(col) for col in self.column_ids} @@ -203,7 +203,7 @@ def filter(self, predicate: ex.Expression) -> UnorderedIR: def aggregate( self, - aggregations: typing.Sequence[typing.Tuple[ex.Aggregation, str]], + aggregations: typing.Sequence[tuple[ex.Aggregation, str]], by_column_ids: typing.Sequence[ex.DerefOp] = (), dropna: bool = True, order_by: typing.Sequence[OrderingExpression] = (), @@ -326,7 +326,7 @@ def from_pandas( def join( self: UnorderedIR, right: UnorderedIR, - conditions: Tuple[Tuple[str, str], ...], + conditions: tuple[tuple[str, str], ...], type: Literal["inner", "outer", "left", "right", "cross"], *, join_nulls: bool = True, @@ -357,12 +357,6 @@ def join( for left_index, right_index in conditions ] - combined_table = bigframes_vendored.ibis.join( - left_table, - right_table, - predicates=join_conditions, - how=type, # type: ignore - ) combined_table = bigframes_vendored.ibis.join( left_table, right_table, @@ -381,7 +375,7 @@ def isin_join( self: UnorderedIR, right: UnorderedIR, indicator_col: str, - conditions: Tuple[str, str], + conditions: tuple[str, str], *, join_nulls: bool = True, ) -> UnorderedIR: diff --git a/bigframes/core/nodes.py b/bigframes/core/nodes.py index 46ca288b54..42e3216d84 100644 --- a/bigframes/core/nodes.py +++ b/bigframes/core/nodes.py @@ -59,7 +59,7 @@ def with_nullable(self) -> Field: def with_nonnull(self) -> Field: return Field(self.id, self.dtype, nullable=False) - def with_id(self, id: bfet_ids.ColumnId) -> Field: + def with_id(self, id: identifiers.ColumnId) -> Field: return Field(id, self.dtype, nullable=self.nullable) @@ -290,11 +290,11 @@ def get_type(self, id: identifiers.ColumnId) -> bigframes.dtypes.Dtype: # TODO: Deprecate in favor of field_by_id, and eventually, by rich references @functools.cached_property - def _dtype_lookup(self) -> dict[bfet_ids.ColumnId, bigframes.dtypes.Dtype]: + def _dtype_lookup(self) -> dict[identifiers.ColumnId, bigframes.dtypes.Dtype]: return {field.id: field.dtype for field in self.fields} @functools.cached_property - def field_by_id(self) -> Mapping[bfet_ids.ColumnId, Field]: + def field_by_id(self) -> Mapping[identifiers.ColumnId, Field]: return {field.id: field for field in self.fields} From dd3749a37ef286fb136766866fe9c6815fe7926a Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Wed, 12 Feb 2025 02:30:21 +0000 Subject: [PATCH 04/11] optimize nullsafe joins --- bigframes/core/blocks.py | 1 - bigframes/core/compile/compiled.py | 52 ++++++++++++++++++------------ scripts/tpch_result_verify.py | 2 +- 3 files changed, 33 insertions(+), 22 deletions(-) diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index 8d3732f3fe..10970b24e8 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -2049,7 +2049,6 @@ def concat( def isin(self, other: Block): # TODO: Support multiple other columns and match on label - # TODO: Model as explicit "IN" subquery/join to better allow db to optimize assert len(other.value_columns) == 1 unique_other_values = other.expr.select_columns( [other.value_columns[0]] diff --git a/bigframes/core/compile/compiled.py b/bigframes/core/compile/compiled.py index 54ba63fb81..771f2a2842 100644 --- a/bigframes/core/compile/compiled.py +++ b/bigframes/core/compile/compiled.py @@ -351,11 +351,17 @@ def join( left_table = self._to_ibis_expr().select(self.column_ids) right_table = right._to_ibis_expr().select(right.column_ids) - keyfunc = value_to_join_key if join_nulls else _as_identity join_conditions = [ - keyfunc(left_table[left_index]) == keyfunc(right_table[right_index]) + _join_condition( + left_table[left_index], right_table[right_index], nullsafe=join_nulls + ) for left_index, right_index in conditions ] + if join_nulls: + join_conditions.extend( + (left_table[left_index]).isnull() & (right_table[right_index]).isnull() + for left_index, right_index in conditions + ) combined_table = bigframes_vendored.ibis.join( left_table, @@ -390,12 +396,19 @@ def isin_join( """ left_table = self._to_ibis_expr() right_table = right._to_ibis_expr() - keyfunc = value_to_join_key if join_nulls else _as_identity - new_column = ( - keyfunc(left_table[conditions[0]]) - .isin(keyfunc(right_table[conditions[1]])) - .name(indicator_col) - ) + if join_nulls: # nullsafe isin join must actually use "exists" subquery + cond1 = (left_table[conditions[0]]) == (right_table[conditions[1]]) + cond2 = ( + left_table[conditions[0]].isnull() & right_table[conditions[1]].isnull() + ) + new_column = (cond1 | cond2).any().name(indicator_col) + + else: # Can do simpler "in" subquery + new_column = ( + (left_table[conditions[0]]) + .isin((right_table[conditions[1]])) + .name(indicator_col) + ) columns = tuple( itertools.chain( @@ -598,20 +611,19 @@ def _convert_ordering_to_table_values( return ordering_values +def _join_condition( + lvalue: ibis_types.Value, rvalue: ibis_types.Value, nullsafe: bool +) -> ibis_types.BooleanValue: + if nullsafe: + # BigQuery recognizes this pattern and optimizes it well + return (_as_identity(lvalue) == _as_identity(rvalue)) | ( + lvalue.isnull() & rvalue.isnull() + ) + return _as_identity(lvalue) == _as_identity(rvalue) + + def _as_identity(value: ibis_types.Value): # Some types need to be converted to string to enable groupby if value.type().is_float64() or value.type().is_geospatial(): return value.cast(ibis_dtypes.str) return value - - -def value_to_join_key(value: ibis_types.Value): - """Converts nullable values to non-null string SQL will not match null keys together - but pandas does.""" - if not value.type().is_string(): - value = value.cast(ibis_dtypes.str) - - return ( - value.fill_null(ibis_types.literal("$NULL_SENTINEL$")) - if hasattr(value, "fill_null") - else value.fillna(ibis_types.literal("$NULL_SENTINEL$")) - ) diff --git a/scripts/tpch_result_verify.py b/scripts/tpch_result_verify.py index 0c932f6eac..76f38e4fb0 100644 --- a/scripts/tpch_result_verify.py +++ b/scripts/tpch_result_verify.py @@ -23,7 +23,7 @@ import bigframes project_id = "bigframes-dev-perf" -dataset_id = "tpch_0001g" +dataset_id = "tpch_0100g" dataset = { "line_item_ds": f"bigframes-dev-perf.{dataset_id}.LINEITEM", "region_ds": f"bigframes-dev-perf.{dataset_id}.REGION", From be2f7c0e6d172117e9aa747294e6b70a37c31bb5 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Wed, 12 Feb 2025 02:31:23 +0000 Subject: [PATCH 05/11] revert verify script change --- scripts/tpch_result_verify.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/tpch_result_verify.py b/scripts/tpch_result_verify.py index 76f38e4fb0..0c932f6eac 100644 --- a/scripts/tpch_result_verify.py +++ b/scripts/tpch_result_verify.py @@ -23,7 +23,7 @@ import bigframes project_id = "bigframes-dev-perf" -dataset_id = "tpch_0100g" +dataset_id = "tpch_0001g" dataset = { "line_item_ds": f"bigframes-dev-perf.{dataset_id}.LINEITEM", "region_ds": f"bigframes-dev-perf.{dataset_id}.REGION", From 79cdc09df41f0bbd02f91618ac98e419bad69d87 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Wed, 12 Feb 2025 06:47:48 +0000 Subject: [PATCH 06/11] partial nullsafe join fix --- bigframes/core/compile/compiled.py | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/bigframes/core/compile/compiled.py b/bigframes/core/compile/compiled.py index 771f2a2842..3cb69727df 100644 --- a/bigframes/core/compile/compiled.py +++ b/bigframes/core/compile/compiled.py @@ -357,11 +357,6 @@ def join( ) for left_index, right_index in conditions ] - if join_nulls: - join_conditions.extend( - (left_table[left_index]).isnull() & (right_table[right_index]).isnull() - for left_index, right_index in conditions - ) combined_table = bigframes_vendored.ibis.join( left_table, @@ -616,10 +611,8 @@ def _join_condition( ) -> ibis_types.BooleanValue: if nullsafe: # BigQuery recognizes this pattern and optimizes it well - return (_as_identity(lvalue) == _as_identity(rvalue)) | ( - lvalue.isnull() & rvalue.isnull() - ) - return _as_identity(lvalue) == _as_identity(rvalue) + return ((lvalue) == (rvalue)) | (lvalue.isnull() & rvalue.isnull()) + return (lvalue) == (rvalue) def _as_identity(value: ibis_types.Value): From 0517b59df742df998c4a7c1726ba5c57267b6700 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Wed, 12 Feb 2025 07:44:39 +0000 Subject: [PATCH 07/11] more robust nullsafe join condition --- bigframes/core/compile/compiled.py | 24 +++++++++++++++++++----- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/bigframes/core/compile/compiled.py b/bigframes/core/compile/compiled.py index 3cb69727df..209db77efe 100644 --- a/bigframes/core/compile/compiled.py +++ b/bigframes/core/compile/compiled.py @@ -521,7 +521,7 @@ def _ibis_window_from_spec(self, window_spec: WindowSpec): group_by: typing.List[ibis_types.Value] = ( [ typing.cast( - ibis_types.Column, _as_identity(self._compile_expression(column)) + ibis_types.Column, _as_groupable(self._compile_expression(column)) ) for column in window_spec.grouping_keys ] @@ -610,12 +610,26 @@ def _join_condition( lvalue: ibis_types.Value, rvalue: ibis_types.Value, nullsafe: bool ) -> ibis_types.BooleanValue: if nullsafe: - # BigQuery recognizes this pattern and optimizes it well - return ((lvalue) == (rvalue)) | (lvalue.isnull() & rvalue.isnull()) - return (lvalue) == (rvalue) + if lvalue.type().is_numeric(): + return ( + lvalue.fill_null(ibis_types.literal(0)) + == rvalue.fill_null(ibis_types.literal(1)) + ) & ( + lvalue.fill_null(ibis_types.literal(0)) + == rvalue.fill_null(ibis_types.literal(1)) + ) + else: + return ( + lvalue.cast(ibis_dtypes.str).fill_null(ibis_types.literal("")) + == rvalue.cast(ibis_dtypes.str).fill_null(ibis_types.literal("")) + ) & ( + lvalue.cast(ibis_dtypes.str).fill_null(ibis_types.literal("")) + == rvalue.cast(ibis_dtypes.str).fill_null(ibis_types.literal("")) + ) + return lvalue == rvalue -def _as_identity(value: ibis_types.Value): +def _as_groupable(value: ibis_types.Value): # Some types need to be converted to string to enable groupby if value.type().is_float64() or value.type().is_geospatial(): return value.cast(ibis_dtypes.str) From 61cdf2ba027ddea8ac56b6880b7eba6070f61bd2 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Wed, 12 Feb 2025 08:13:48 +0000 Subject: [PATCH 08/11] fix float join conditions --- bigframes/core/compile/compiled.py | 61 ++++++++++++++++++++---------- 1 file changed, 42 insertions(+), 19 deletions(-) diff --git a/bigframes/core/compile/compiled.py b/bigframes/core/compile/compiled.py index 209db77efe..b7ee728d9c 100644 --- a/bigframes/core/compile/compiled.py +++ b/bigframes/core/compile/compiled.py @@ -392,11 +392,17 @@ def isin_join( left_table = self._to_ibis_expr() right_table = right._to_ibis_expr() if join_nulls: # nullsafe isin join must actually use "exists" subquery - cond1 = (left_table[conditions[0]]) == (right_table[conditions[1]]) - cond2 = ( - left_table[conditions[0]].isnull() & right_table[conditions[1]].isnull() + new_column = ( + ( + _join_condition( + left_table[conditions[0]], + right_table[conditions[1]], + nullsafe=True, + ) + ) + .any() + .name(indicator_col) ) - new_column = (cond1 | cond2).any().name(indicator_col) else: # Can do simpler "in" subquery new_column = ( @@ -606,26 +612,43 @@ def _convert_ordering_to_table_values( return ordering_values +def _string_cast_join_cond( + lvalue: ibis_types.Value, rvalue: ibis_types.Value +) -> ibis_types.Value: + return ( + lvalue.cast(ibis_dtypes.str).fill_null(ibis_types.literal("0")) + == rvalue.cast(ibis_dtypes.str).fill_null(ibis_types.literal("0")) + ) & ( + lvalue.cast(ibis_dtypes.str).fill_null(ibis_types.literal("1")) + == rvalue.cast(ibis_dtypes.str).fill_null(ibis_types.literal("1")) + ) + + +def _numeric_join_cond( + lvalue: ibis_types.Value, rvalue: ibis_types.Value +) -> ibis_types.Value: + return ( + lvalue.fill_null(ibis_types.literal(0)) + == rvalue.fill_null(ibis_types.literal(1)) + ) & ( + lvalue.fill_null(ibis_types.literal(0)) + == rvalue.fill_null(ibis_types.literal(1)) + ) + + def _join_condition( lvalue: ibis_types.Value, rvalue: ibis_types.Value, nullsafe: bool ) -> ibis_types.BooleanValue: + if lvalue.type().is_floating() or rvalue.type().is_floating(): + # Could try to keep in float domain, but need to handle both NaN and Null separately + return _string_cast_join_cond(lvalue, rvalue) + if nullsafe: - if lvalue.type().is_numeric(): - return ( - lvalue.fill_null(ibis_types.literal(0)) - == rvalue.fill_null(ibis_types.literal(1)) - ) & ( - lvalue.fill_null(ibis_types.literal(0)) - == rvalue.fill_null(ibis_types.literal(1)) - ) + # TODO: Define more coalesce constants for non-numeric types + if (lvalue.type().is_numeric()) and (lvalue.type().is_numeric()): + return _numeric_join_cond(lvalue, rvalue) else: - return ( - lvalue.cast(ibis_dtypes.str).fill_null(ibis_types.literal("")) - == rvalue.cast(ibis_dtypes.str).fill_null(ibis_types.literal("")) - ) & ( - lvalue.cast(ibis_dtypes.str).fill_null(ibis_types.literal("")) - == rvalue.cast(ibis_dtypes.str).fill_null(ibis_types.literal("")) - ) + return _string_cast_join_cond(lvalue, rvalue) return lvalue == rvalue From 3b59d59a34ca2f13205428eb62535243a63de1bc Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Wed, 12 Feb 2025 17:41:55 +0000 Subject: [PATCH 09/11] handle float nullsafe join without cast --- bigframes/core/compile/compiled.py | 47 +++++++++++++++++++++--------- 1 file changed, 33 insertions(+), 14 deletions(-) diff --git a/bigframes/core/compile/compiled.py b/bigframes/core/compile/compiled.py index b7ee728d9c..05c90409d6 100644 --- a/bigframes/core/compile/compiled.py +++ b/bigframes/core/compile/compiled.py @@ -626,25 +626,44 @@ def _string_cast_join_cond( def _numeric_join_cond( lvalue: ibis_types.Value, rvalue: ibis_types.Value -) -> ibis_types.Value: - return ( - lvalue.fill_null(ibis_types.literal(0)) - == rvalue.fill_null(ibis_types.literal(1)) - ) & ( - lvalue.fill_null(ibis_types.literal(0)) - == rvalue.fill_null(ibis_types.literal(1)) - ) +) -> ibis_types.BooleanValue: + lvalue1 = lvalue.fill_null(ibis_types.literal(0)) + lvalue2 = lvalue.fill_null(ibis_types.literal(1)) + rvalue1 = rvalue.fill_null(ibis_types.literal(0)) + rvalue2 = rvalue.fill_null(ibis_types.literal(1)) + if lvalue.type().is_floating() and rvalue.type().is_floating(): + # NaN aren't equal so need to coalesce as well with diff constants + lvalue1 = ( + typing.cast(ibis_dtypes.FloatingValue, lvalue) + .isnan() + .ifelse(ibis_types.literal(2), lvalue1) + ) + lvalue2 = ( + typing.cast(ibis_dtypes.FloatingValue, lvalue) + .isnan() + .ifelse(ibis_types.literal(3), lvalue2) + ) + rvalue1 = ( + typing.cast(ibis_dtypes.FloatingValue, rvalue) + .isnan() + .ifelse(ibis_types.literal(2), rvalue1) + ) + rvalue2 = ( + typing.cast(ibis_dtypes.FloatingValue, rvalue) + .isnan() + .ifelse(ibis_types.literal(3), rvalue2) + ) + return (lvalue1 == rvalue1) & (lvalue2 == rvalue2) def _join_condition( lvalue: ibis_types.Value, rvalue: ibis_types.Value, nullsafe: bool -) -> ibis_types.BooleanValue: - if lvalue.type().is_floating() or rvalue.type().is_floating(): - # Could try to keep in float domain, but need to handle both NaN and Null separately - return _string_cast_join_cond(lvalue, rvalue) - +) -> ibis_types.Value: + if (lvalue.type().is_floating()) and (lvalue.type().is_floating()): + # Need to always make safe join condition to handle nan, even if no nulls + return _numeric_join_cond(lvalue, rvalue) if nullsafe: - # TODO: Define more coalesce constants for non-numeric types + # TODO: Define more coalesce constants for non-numeric types to avoid cast if (lvalue.type().is_numeric()) and (lvalue.type().is_numeric()): return _numeric_join_cond(lvalue, rvalue) else: From 9f00e02dfac2a145d4160194efa4a1c73520a5a8 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Wed, 12 Feb 2025 18:35:48 +0000 Subject: [PATCH 10/11] fix typing issues --- bigframes/core/compile/compiled.py | 28 +++++++++++++++------------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/bigframes/core/compile/compiled.py b/bigframes/core/compile/compiled.py index 05c90409d6..6a2b583b28 100644 --- a/bigframes/core/compile/compiled.py +++ b/bigframes/core/compile/compiled.py @@ -613,20 +613,21 @@ def _convert_ordering_to_table_values( def _string_cast_join_cond( - lvalue: ibis_types.Value, rvalue: ibis_types.Value -) -> ibis_types.Value: - return ( + lvalue: ibis_types.Column, rvalue: ibis_types.Column +) -> ibis_types.BooleanColumn: + result = ( lvalue.cast(ibis_dtypes.str).fill_null(ibis_types.literal("0")) == rvalue.cast(ibis_dtypes.str).fill_null(ibis_types.literal("0")) ) & ( lvalue.cast(ibis_dtypes.str).fill_null(ibis_types.literal("1")) == rvalue.cast(ibis_dtypes.str).fill_null(ibis_types.literal("1")) ) + return typing.cast(ibis_types.BooleanColumn, result) def _numeric_join_cond( - lvalue: ibis_types.Value, rvalue: ibis_types.Value -) -> ibis_types.BooleanValue: + lvalue: ibis_types.Column, rvalue: ibis_types.Column +) -> ibis_types.BooleanColumn: lvalue1 = lvalue.fill_null(ibis_types.literal(0)) lvalue2 = lvalue.fill_null(ibis_types.literal(1)) rvalue1 = rvalue.fill_null(ibis_types.literal(0)) @@ -634,31 +635,32 @@ def _numeric_join_cond( if lvalue.type().is_floating() and rvalue.type().is_floating(): # NaN aren't equal so need to coalesce as well with diff constants lvalue1 = ( - typing.cast(ibis_dtypes.FloatingValue, lvalue) + typing.cast(ibis_types.FloatingColumn, lvalue) .isnan() .ifelse(ibis_types.literal(2), lvalue1) ) lvalue2 = ( - typing.cast(ibis_dtypes.FloatingValue, lvalue) + typing.cast(ibis_types.FloatingColumn, lvalue) .isnan() .ifelse(ibis_types.literal(3), lvalue2) ) rvalue1 = ( - typing.cast(ibis_dtypes.FloatingValue, rvalue) + typing.cast(ibis_types.FloatingColumn, rvalue) .isnan() .ifelse(ibis_types.literal(2), rvalue1) ) rvalue2 = ( - typing.cast(ibis_dtypes.FloatingValue, rvalue) + typing.cast(ibis_types.FloatingColumn, rvalue) .isnan() .ifelse(ibis_types.literal(3), rvalue2) ) - return (lvalue1 == rvalue1) & (lvalue2 == rvalue2) + result = (lvalue1 == rvalue1) & (lvalue2 == rvalue2) + return typing.cast(ibis_types.BooleanColumn, result) def _join_condition( - lvalue: ibis_types.Value, rvalue: ibis_types.Value, nullsafe: bool -) -> ibis_types.Value: + lvalue: ibis_types.Column, rvalue: ibis_types.Column, nullsafe: bool +) -> ibis_types.BooleanColumn: if (lvalue.type().is_floating()) and (lvalue.type().is_floating()): # Need to always make safe join condition to handle nan, even if no nulls return _numeric_join_cond(lvalue, rvalue) @@ -668,7 +670,7 @@ def _join_condition( return _numeric_join_cond(lvalue, rvalue) else: return _string_cast_join_cond(lvalue, rvalue) - return lvalue == rvalue + return typing.cast(ibis_types.BooleanColumn, lvalue == rvalue) def _as_groupable(value: ibis_types.Value): From 87f441639cfe3521326b662bdc650758b6d09ea1 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Wed, 12 Feb 2025 20:28:58 +0000 Subject: [PATCH 11/11] remove bad test --- tests/system/small/core/test_indexers.py | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/tests/system/small/core/test_indexers.py b/tests/system/small/core/test_indexers.py index 9fa03864a1..20f1c56185 100644 --- a/tests/system/small/core/test_indexers.py +++ b/tests/system/small/core/test_indexers.py @@ -54,21 +54,6 @@ def string_indexed_number_series(session): ) -def test_non_string_indexed_struct_series_with_string_key_should_warn(session): - s = bpd.Series( - [ - {"project": "pandas", "version": 1}, - ], - dtype=bpd.ArrowDtype( - pa.struct([("project", pa.string()), ("version", pa.int64())]) - ), - session=session, - ) - - with pytest.warns(bigframes.exceptions.BadIndexerKeyWarning): - s["a"] - - @pytest.mark.parametrize( "series", [