diff --git a/bigframes/core/bigframe_node.py b/bigframes/core/bigframe_node.py index 5012e99c9a..32c7f92912 100644 --- a/bigframes/core/bigframe_node.py +++ b/bigframes/core/bigframe_node.py @@ -37,6 +37,17 @@ class Field: id: identifiers.ColumnId dtype: bigframes.dtypes.Dtype + # Best effort, nullable=True if not certain + nullable: bool = True + + def with_nullable(self) -> Field: + return Field(self.id, self.dtype, nullable=True) + + def with_nonnull(self) -> Field: + return Field(self.id, self.dtype, nullable=False) + + def with_id(self, id: identifiers.ColumnId) -> Field: + return Field(id, self.dtype, nullable=self.nullable) @dataclasses.dataclass(eq=False, frozen=True) @@ -274,10 +285,15 @@ def defined_variables(self) -> set[str]: def get_type(self, id: identifiers.ColumnId) -> bigframes.dtypes.Dtype: return self._dtype_lookup[id] + # TODO: Deprecate in favor of field_by_id, and eventually, by rich references @functools.cached_property - def _dtype_lookup(self): + def _dtype_lookup(self) -> dict[identifiers.ColumnId, bigframes.dtypes.Dtype]: return {field.id: field.dtype for field in self.fields} + @functools.cached_property + def field_by_id(self) -> Mapping[identifiers.ColumnId, Field]: + return {field.id: field for field in self.fields} + # Plan algorithms def unique_nodes( self: BigFrameNode, diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index 8d3732f3fe..10970b24e8 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -2049,7 +2049,6 @@ def concat( def isin(self, other: Block): # TODO: Support multiple other columns and match on label - # TODO: Model as explicit "IN" subquery/join to better allow db to optimize assert len(other.value_columns) == 1 unique_other_values = other.expr.select_columns( [other.value_columns[0]] diff --git a/bigframes/core/compile/compiled.py b/bigframes/core/compile/compiled.py index 93be998b5b..6a2b583b28 100644 --- a/bigframes/core/compile/compiled.py +++ 
b/bigframes/core/compile/compiled.py @@ -16,7 +16,7 @@ import functools import itertools import typing -from typing import Optional, Sequence +from typing import Literal, Optional, Sequence import bigframes_vendored.ibis import bigframes_vendored.ibis.backends.bigquery.backend as ibis_bigquery @@ -94,7 +94,7 @@ def to_sql( return typing.cast(str, sql) @property - def columns(self) -> typing.Tuple[ibis_types.Value, ...]: + def columns(self) -> tuple[ibis_types.Value, ...]: return self._columns @property @@ -107,7 +107,7 @@ def _ibis_bindings(self) -> dict[str, ibis_types.Value]: def projection( self, - expression_id_pairs: typing.Tuple[typing.Tuple[ex.Expression, str], ...], + expression_id_pairs: tuple[tuple[ex.Expression, str], ...], ) -> UnorderedIR: """Apply an expression to the ArrayValue and assign the output to a column.""" cannot_inline = any(expr.expensive for expr, _ in expression_id_pairs) @@ -126,7 +126,7 @@ def projection( def selection( self, - input_output_pairs: typing.Tuple[typing.Tuple[ex.DerefOp, str], ...], + input_output_pairs: tuple[tuple[ex.DerefOp, str], ...], ) -> UnorderedIR: """Apply an expression to the ArrayValue and assign the output to a column.""" bindings = {col: self._get_ibis_column(col) for col in self.column_ids} @@ -203,7 +203,7 @@ def filter(self, predicate: ex.Expression) -> UnorderedIR: def aggregate( self, - aggregations: typing.Sequence[typing.Tuple[ex.Aggregation, str]], + aggregations: typing.Sequence[tuple[ex.Aggregation, str]], by_column_ids: typing.Sequence[ex.DerefOp] = (), dropna: bool = True, order_by: typing.Sequence[OrderingExpression] = (), @@ -323,7 +323,105 @@ def from_pandas( columns=columns, ) - ## Methods that only work with ordering + def join( + self: UnorderedIR, + right: UnorderedIR, + conditions: tuple[tuple[str, str], ...], + type: Literal["inner", "outer", "left", "right", "cross"], + *, + join_nulls: bool = True, + ) -> UnorderedIR: + """Join two expressions by column equality. 
Arguments: + right: Expression for right table to join. + conditions: Pairs of (left column id, right column id) to join on by equality. + type: The type of join to perform. + join_nulls (bool): + If True, will join NULL keys to each other.
+ """ + left_table = self._to_ibis_expr() + right_table = right._to_ibis_expr() + if join_nulls: # nullsafe isin join must actually use "exists" subquery + new_column = ( + ( + _join_condition( + left_table[conditions[0]], + right_table[conditions[1]], + nullsafe=True, + ) + ) + .any() + .name(indicator_col) + ) + + else: # Can do simpler "in" subquery + new_column = ( + (left_table[conditions[0]]) + .isin((right_table[conditions[1]])) + .name(indicator_col) + ) + + columns = tuple( + itertools.chain( + (left_table[col.get_name()] for col in self.columns), (new_column,) + ) + ) + + return UnorderedIR( + left_table, + columns=columns, + ) + def project_window_op( self, expression: ex.Aggregation, @@ -429,7 +527,7 @@ def _ibis_window_from_spec(self, window_spec: WindowSpec): group_by: typing.List[ibis_types.Value] = ( [ typing.cast( - ibis_types.Column, _as_identity(self._compile_expression(column)) + ibis_types.Column, _as_groupable(self._compile_expression(column)) ) for column in window_spec.grouping_keys ] @@ -514,7 +612,68 @@ def _convert_ordering_to_table_values( return ordering_values -def _as_identity(value: ibis_types.Value): +def _string_cast_join_cond( + lvalue: ibis_types.Column, rvalue: ibis_types.Column +) -> ibis_types.BooleanColumn: + result = ( + lvalue.cast(ibis_dtypes.str).fill_null(ibis_types.literal("0")) + == rvalue.cast(ibis_dtypes.str).fill_null(ibis_types.literal("0")) + ) & ( + lvalue.cast(ibis_dtypes.str).fill_null(ibis_types.literal("1")) + == rvalue.cast(ibis_dtypes.str).fill_null(ibis_types.literal("1")) + ) + return typing.cast(ibis_types.BooleanColumn, result) + + +def _numeric_join_cond( + lvalue: ibis_types.Column, rvalue: ibis_types.Column +) -> ibis_types.BooleanColumn: + lvalue1 = lvalue.fill_null(ibis_types.literal(0)) + lvalue2 = lvalue.fill_null(ibis_types.literal(1)) + rvalue1 = rvalue.fill_null(ibis_types.literal(0)) + rvalue2 = rvalue.fill_null(ibis_types.literal(1)) + if lvalue.type().is_floating() and 
rvalue.type().is_floating(): + # NaN aren't equal so need to coalesce as well with diff constants + lvalue1 = ( + typing.cast(ibis_types.FloatingColumn, lvalue) + .isnan() + .ifelse(ibis_types.literal(2), lvalue1) + ) + lvalue2 = ( + typing.cast(ibis_types.FloatingColumn, lvalue) + .isnan() + .ifelse(ibis_types.literal(3), lvalue2) + ) + rvalue1 = ( + typing.cast(ibis_types.FloatingColumn, rvalue) + .isnan() + .ifelse(ibis_types.literal(2), rvalue1) + ) + rvalue2 = ( + typing.cast(ibis_types.FloatingColumn, rvalue) + .isnan() + .ifelse(ibis_types.literal(3), rvalue2) + ) + result = (lvalue1 == rvalue1) & (lvalue2 == rvalue2) + return typing.cast(ibis_types.BooleanColumn, result) + + +def _join_condition( + lvalue: ibis_types.Column, rvalue: ibis_types.Column, nullsafe: bool +) -> ibis_types.BooleanColumn: + if (lvalue.type().is_floating()) and (lvalue.type().is_floating()): + # Need to always make safe join condition to handle nan, even if no nulls + return _numeric_join_cond(lvalue, rvalue) + if nullsafe: + # TODO: Define more coalesce constants for non-numeric types to avoid cast + if (lvalue.type().is_numeric()) and (lvalue.type().is_numeric()): + return _numeric_join_cond(lvalue, rvalue) + else: + return _string_cast_join_cond(lvalue, rvalue) + return typing.cast(ibis_types.BooleanColumn, lvalue == rvalue) + + +def _as_groupable(value: ibis_types.Value): # Some types need to be converted to string to enable groupby if value.type().is_float64() or value.type().is_geospatial(): return value.cast(ibis_dtypes.str) diff --git a/bigframes/core/compile/compiler.py b/bigframes/core/compile/compiler.py index 64a0ae265f..0b508b01ab 100644 --- a/bigframes/core/compile/compiler.py +++ b/bigframes/core/compile/compiler.py @@ -29,11 +29,9 @@ import bigframes.core.compile.concat as concat_impl import bigframes.core.compile.explode import bigframes.core.compile.ibis_types -import bigframes.core.compile.isin import bigframes.core.compile.scalar_op_compiler import 
bigframes.core.compile.scalar_op_compiler as compile_scalar import bigframes.core.compile.schema_translator -import bigframes.core.compile.single_column import bigframes.core.expression as ex import bigframes.core.identifiers as ids import bigframes.core.nodes as nodes @@ -130,24 +128,25 @@ def compile_join(self, node: nodes.JoinNode): condition_pairs = tuple( (left.id.sql, right.id.sql) for left, right in node.conditions ) + left_unordered = self.compile_node(node.left_child) right_unordered = self.compile_node(node.right_child) - return bigframes.core.compile.single_column.join_by_column_unordered( - left=left_unordered, + return left_unordered.join( right=right_unordered, type=node.type, conditions=condition_pairs, + join_nulls=node.joins_nulls, ) @_compile_node.register def compile_isin(self, node: nodes.InNode): left_unordered = self.compile_node(node.left_child) right_unordered = self.compile_node(node.right_child) - return bigframes.core.compile.isin.isin_unordered( - left=left_unordered, + return left_unordered.isin_join( right=right_unordered, indicator_col=node.indicator_col.sql, conditions=(node.left_col.id.sql, node.right_col.id.sql), + join_nulls=node.joins_nulls, ) @_compile_node.register diff --git a/bigframes/core/compile/isin.py b/bigframes/core/compile/isin.py deleted file mode 100644 index 29acf9e284..0000000000 --- a/bigframes/core/compile/isin.py +++ /dev/null @@ -1,71 +0,0 @@ -# Copyright 2024 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
-# See the License for the specific language governing permissions and -# limitations under the License. - -"""Helpers to join ArrayValue objects.""" - -from __future__ import annotations - -import itertools -from typing import Tuple - -import bigframes_vendored.ibis.expr.datatypes as ibis_dtypes -import bigframes_vendored.ibis.expr.types as ibis_types - -import bigframes.core.compile.compiled as compiled - - -def isin_unordered( - left: compiled.UnorderedIR, - right: compiled.UnorderedIR, - indicator_col: str, - conditions: Tuple[str, str], -) -> compiled.UnorderedIR: - """Join two expressions by column equality. - - Arguments: - left: Expression for left table to join. - right: Expression for right table to join. - conditions: Id pairs to compare - Returns: - The joined expression. - """ - left_table = left._to_ibis_expr() - right_table = right._to_ibis_expr() - new_column = ( - value_to_join_key(left_table[conditions[0]]) - .isin(value_to_join_key(right_table[conditions[1]])) - .name(indicator_col) - ) - - columns = tuple( - itertools.chain( - (left_table[col.get_name()] for col in left.columns), (new_column,) - ) - ) - - return compiled.UnorderedIR( - left_table, - columns=columns, - ) - - -def value_to_join_key(value: ibis_types.Value): - """Converts nullable values to non-null string SQL will not match null keys together - but pandas does.""" - if not value.type().is_string(): - value = value.cast(ibis_dtypes.str) - return ( - value.fill_null(ibis_types.literal("$NULL_SENTINEL$")) - if hasattr(value, "fill_null") - else value.fillna(ibis_types.literal("$NULL_SENTINEL$")) - ) diff --git a/bigframes/core/compile/single_column.py b/bigframes/core/compile/single_column.py deleted file mode 100644 index 9216051d91..0000000000 --- a/bigframes/core/compile/single_column.py +++ /dev/null @@ -1,83 +0,0 @@ -# Copyright 2023 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the 
License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Helpers to join ArrayValue objects.""" - -from __future__ import annotations - -from typing import Literal, Tuple - -import bigframes_vendored.ibis.expr.api as ibis_api -import bigframes_vendored.ibis.expr.datatypes as ibis_dtypes -import bigframes_vendored.ibis.expr.types as ibis_types - -import bigframes.core.compile.compiled as compiled - - -def join_by_column_unordered( - left: compiled.UnorderedIR, - right: compiled.UnorderedIR, - conditions: Tuple[Tuple[str, str], ...], - type: Literal["inner", "outer", "left", "right", "cross"], -) -> compiled.UnorderedIR: - """Join two expressions by column equality. - - Arguments: - left: Expression for left table to join. - left_column_ids: Column IDs (not label) to join by. - right: Expression for right table to join. - right_column_ids: Column IDs (not label) to join by. - how: The type of join to perform. - allow_row_identity_join (bool): - If True, allow matching by row identity. Set to False to always - perform a true JOIN in generated SQL. - Returns: - The joined expression. The resulting columns will be, in order, - first the coalesced join keys, then, all the left columns, and - finally, all the right columns. 
- """ - # Shouldn't need to select the column ids explicitly, but it seems that ibis has some - # bug resolving column ids otherwise, potentially because of the "JoinChain" op - left_table = left._to_ibis_expr().select(left.column_ids) - right_table = right._to_ibis_expr().select(right.column_ids) - join_conditions = [ - value_to_join_key(left_table[left_index]) - == value_to_join_key(right_table[right_index]) - for left_index, right_index in conditions - ] - - combined_table = ibis_api.join( - left_table, - right_table, - predicates=join_conditions, - how=type, # type: ignore - ) - columns = [combined_table[col.get_name()] for col in left.columns] + [ - combined_table[col.get_name()] for col in right.columns - ] - return compiled.UnorderedIR( - combined_table, - columns=columns, - ) - - -def value_to_join_key(value: ibis_types.Value): - """Converts nullable values to non-null string SQL will not match null keys together - but pandas does.""" - if not value.type().is_string(): - value = value.cast(ibis_dtypes.str) - return ( - value.fill_null(ibis_types.literal("$NULL_SENTINEL$")) - if hasattr(value, "fill_null") - else value.fillna(ibis_types.literal("$NULL_SENTINEL$")) - ) diff --git a/bigframes/core/expression.py b/bigframes/core/expression.py index 8621d5d915..afd290827d 100644 --- a/bigframes/core/expression.py +++ b/bigframes/core/expression.py @@ -165,6 +165,10 @@ def expensive(self) -> bool: isinstance(ex, OpExpression) and ex.op.expensive for ex in self.walk() ) + @property + def nullable(self) -> bool: + return True + @property @abc.abstractmethod def column_references(self) -> typing.Tuple[ids.ColumnId, ...]: @@ -248,6 +252,10 @@ def is_const(self) -> bool: def column_references(self) -> typing.Tuple[ids.ColumnId, ...]: return () + @property + def nullable(self) -> bool: + return pd.isna(self.value) # type: ignore + def output_type( self, input_types: dict[ids.ColumnId, bigframes.dtypes.Dtype] ) -> dtypes.ExpressionType: @@ -344,6 +352,11 @@ def 
column_references(self) -> typing.Tuple[ids.ColumnId, ...]: def is_const(self) -> bool: return False + @property + def nullable(self) -> bool: + # Safe default, need to actually bind input schema to determine + return True + def output_type( self, input_types: dict[ids.ColumnId, bigframes.dtypes.Dtype] ) -> dtypes.ExpressionType: @@ -408,6 +421,14 @@ def is_const(self) -> bool: def children(self): return self.inputs + @property + def nullable(self) -> bool: + # This is very conservative, need to label null properties of individual ops to get more precise + null_free = self.is_identity and not any( + child.nullable for child in self.inputs + ) + return not null_free + def output_type( self, input_types: dict[ids.ColumnId, dtypes.ExpressionType] ) -> dtypes.ExpressionType: diff --git a/bigframes/core/nodes.py b/bigframes/core/nodes.py index b786032a74..e2093e57d9 100644 --- a/bigframes/core/nodes.py +++ b/bigframes/core/nodes.py @@ -216,7 +216,7 @@ def explicitly_ordered(self) -> bool: @property def added_fields(self) -> Tuple[Field, ...]: - return (Field(self.indicator_col, bigframes.dtypes.BOOL_DTYPE),) + return (Field(self.indicator_col, bigframes.dtypes.BOOL_DTYPE, nullable=False),) @property def fields(self) -> Iterable[Field]: @@ -250,6 +250,12 @@ def referenced_ids(self) -> COLUMN_SET: def additive_base(self) -> BigFrameNode: return self.left_child + @property + def joins_nulls(self) -> bool: + left_nullable = self.left_child.field_by_id[self.left_col.id].nullable + right_nullable = self.right_child.field_by_id[self.right_col.id].nullable + return left_nullable or right_nullable + def replace_additive_base(self, node: BigFrameNode): return dataclasses.replace(self, left_child=node) @@ -310,7 +316,23 @@ def explicitly_ordered(self) -> bool: @property def fields(self) -> Iterable[Field]: - return itertools.chain(self.left_child.fields, self.right_child.fields) + left_fields = self.left_child.fields + if self.type in ("right", "outer"): + left_fields = map(lambda 
x: x.with_nullable(), left_fields) + right_fields = self.right_child.fields + if self.type in ("left", "outer"): + right_fields = map(lambda x: x.with_nullable(), right_fields) + return itertools.chain(left_fields, right_fields) + + @property + def joins_nulls(self) -> bool: + for left_ref, right_ref in self.conditions: + if ( + self.left_child.field_by_id[left_ref.id].nullable + and self.right_child.field_by_id[right_ref.id].nullable + ): + return True + return False @functools.cached_property def variables_introduced(self) -> int: @@ -403,6 +425,7 @@ def explicitly_ordered(self) -> bool: @property def fields(self) -> Iterable[Field]: # TODO: Output names should probably be aligned beforehand or be part of concat definition + # TODO: Handle nullability return ( Field(id, field.dtype) for id, field in zip(self.output_ids, self.children[0].fields) @@ -476,7 +499,9 @@ def explicitly_ordered(self) -> bool: @functools.cached_property def fields(self) -> Iterable[Field]: - return (Field(self.output_id, next(iter(self.start.fields)).dtype),) + return ( + Field(self.output_id, next(iter(self.start.fields)).dtype, nullable=False), + ) @functools.cached_property def variables_introduced(self) -> int: @@ -555,6 +580,7 @@ class ScanList: @dataclasses.dataclass(frozen=True, eq=False) class ReadLocalNode(LeafNode): # TODO: Combine feather_bytes, data_schema, n_rows into a LocalDataDef struct + # TODO: Track nullability for local data feather_bytes: bytes data_schema: schemata.ArraySchema n_rows: int @@ -569,7 +595,8 @@ def fields(self) -> Iterable[Field]: fields = (Field(col_id, dtype) for col_id, dtype, _ in self.scan_list.items) if self.offsets_col is not None: return itertools.chain( - fields, (Field(self.offsets_col, bigframes.dtypes.INT_DTYPE),) + fields, + (Field(self.offsets_col, bigframes.dtypes.INT_DTYPE, nullable=False),), ) return fields @@ -655,6 +682,11 @@ def from_table(table: bq.Table, columns: Sequence[str] = ()) -> GbqTable: else tuple(table.clustering_fields), 
) + @property + @functools.cache + def schema_by_id(self): + return {col.name: col for col in self.physical_schema} + @dataclasses.dataclass(frozen=True) class BigqueryDataSource: @@ -697,7 +729,10 @@ def session(self): @property def fields(self) -> Iterable[Field]: - return (Field(col_id, dtype) for col_id, dtype, _ in self.scan_list.items) + return ( + Field(col_id, dtype, self.source.table.schema_by_id[source_id].is_nullable) + for col_id, dtype, source_id in self.scan_list.items + ) @property def relation_ops_created(self) -> int: @@ -808,9 +843,7 @@ def non_local(self) -> bool: @property def fields(self) -> Iterable[Field]: - return itertools.chain( - self.child.fields, [Field(self.col_id, bigframes.dtypes.INT_DTYPE)] - ) + return itertools.chain(self.child.fields, self.added_fields) @property def relation_ops_created(self) -> int: @@ -834,7 +867,7 @@ def referenced_ids(self) -> COLUMN_SET: @property def added_fields(self) -> Tuple[Field, ...]: - return (Field(self.col_id, bigframes.dtypes.INT_DTYPE),) + return (Field(self.col_id, bigframes.dtypes.INT_DTYPE, nullable=False),) @property def additive_base(self) -> BigFrameNode: @@ -856,6 +889,7 @@ def remap_refs( @dataclasses.dataclass(frozen=True, eq=False) class FilterNode(UnaryNode): + # TODO: Infer null constraints from predicate predicate: ex.Expression @property @@ -1025,8 +1059,13 @@ def _validate(self): @functools.cached_property def fields(self) -> Iterable[Field]: + input_fields_by_id = {field.id: field for field in self.child.fields} return tuple( - Field(output, self.child.get_type(ref.id)) + Field( + output, + input_fields_by_id[ref.id].dtype, + input_fields_by_id[ref.id].nullable, + ) for ref, output in self.input_output_pairs ) @@ -1091,10 +1130,22 @@ def _validate(self): @functools.cached_property def added_fields(self) -> Tuple[Field, ...]: input_types = self.child._dtype_lookup - return tuple( - Field(id, bigframes.dtypes.dtype_for_etype(ex.output_type(input_types))) - for ex, id in 
self.assignments - ) + + fields = [] + for expr, id in self.assignments: + field = Field( + id, + bigframes.dtypes.dtype_for_etype(expr.output_type(input_types)), + nullable=expr.nullable, + ) + # Special case until we get better nullability inference in expression objects themselves + if expr.is_identity and not any( + self.child.field_by_id[id].nullable for id in expr.column_references + ): + field = field.with_nonnull() + fields.append(field) + + return tuple(fields) @property def fields(self) -> Iterable[Field]: @@ -1169,7 +1220,7 @@ def non_local(self) -> bool: @property def fields(self) -> Iterable[Field]: - return (Field(self.col_id, bigframes.dtypes.INT_DTYPE),) + return (Field(self.col_id, bigframes.dtypes.INT_DTYPE, nullable=False),) @property def variables_introduced(self) -> int: @@ -1219,19 +1270,22 @@ def non_local(self) -> bool: @functools.cached_property def fields(self) -> Iterable[Field]: - by_items = ( - Field(ref.id, self.child.get_type(ref.id)) for ref in self.by_column_ids - ) + # TODO: Use child nullability to infer grouping key nullability + by_fields = (self.child.field_by_id[ref.id] for ref in self.by_column_ids) + if self.dropna: + by_fields = (field.with_nonnull() for field in by_fields) + # TODO: Label aggregate ops to determine which are guaranteed non-null agg_items = ( Field( id, bigframes.dtypes.dtype_for_etype( agg.output_type(self.child._dtype_lookup) ), + nullable=True, ) for agg, id in self.aggregations ) - return tuple(itertools.chain(by_items, agg_items)) + return tuple(itertools.chain(by_fields, agg_items)) @property def variables_introduced(self) -> int: @@ -1336,6 +1390,7 @@ def row_count(self) -> Optional[int]: @functools.cached_property def added_field(self) -> Field: input_types = self.child._dtype_lookup + # TODO: Determine if output could be non-null return Field( self.output_name, bigframes.dtypes.dtype_for_etype(self.expression.output_type(input_types)), @@ -1453,6 +1508,7 @@ def fields(self) -> Iterable[Field]: 
bigframes.dtypes.arrow_dtype_to_bigframes_dtype( self.child.get_type(field.id).pyarrow_dtype.value_type # type: ignore ), + nullable=True, ) if field.id in set(map(lambda x: x.id, self.column_ids)) else field @@ -1460,7 +1516,8 @@ def fields(self) -> Iterable[Field]: ) if self.offsets_col is not None: return itertools.chain( - fields, (Field(self.offsets_col, bigframes.dtypes.INT_DTYPE),) + fields, + (Field(self.offsets_col, bigframes.dtypes.INT_DTYPE, nullable=False),), ) return fields diff --git a/tests/system/small/core/test_indexers.py b/tests/system/small/core/test_indexers.py index 2c670f790d..20f1c56185 100644 --- a/tests/system/small/core/test_indexers.py +++ b/tests/system/small/core/test_indexers.py @@ -54,26 +54,10 @@ def string_indexed_number_series(session): ) -def test_non_string_indexed_struct_series_with_string_key_should_warn(session): - s = bpd.Series( - [ - {"project": "pandas", "version": 1}, - ], - dtype=bpd.ArrowDtype( - pa.struct([("project", pa.string()), ("version", pa.int64())]) - ), - session=session, - ) - - with pytest.warns(bigframes.exceptions.BadIndexerKeyWarning): - s["a"] - - @pytest.mark.parametrize( "series", [ "string_indexed_struct_series", - "number_series", "string_indexed_number_series", ], )