Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion bigframes/core/bigframe_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,17 @@
class Field:
id: identifiers.ColumnId
dtype: bigframes.dtypes.Dtype
# Best effort, nullable=True if not certain
nullable: bool = True

def with_nullable(self) -> Field:
return Field(self.id, self.dtype, nullable=True)

def with_nonnull(self) -> Field:
return Field(self.id, self.dtype, nullable=False)

def with_id(self, id: identifiers.ColumnId) -> Field:
return Field(id, self.dtype, nullable=self.nullable)


@dataclasses.dataclass(eq=False, frozen=True)
Expand Down Expand Up @@ -274,10 +285,15 @@ def defined_variables(self) -> set[str]:
def get_type(self, id: identifiers.ColumnId) -> bigframes.dtypes.Dtype:
return self._dtype_lookup[id]

# TODO: Deprecate in favor of field_by_id, and eventually, by rich references
@functools.cached_property
def _dtype_lookup(self):
def _dtype_lookup(self) -> dict[identifiers.ColumnId, bigframes.dtypes.Dtype]:
return {field.id: field.dtype for field in self.fields}

@functools.cached_property
def field_by_id(self) -> Mapping[identifiers.ColumnId, Field]:
return {field.id: field for field in self.fields}

# Plan algorithms
def unique_nodes(
self: BigFrameNode,
Expand Down
1 change: 0 additions & 1 deletion bigframes/core/blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2049,7 +2049,6 @@ def concat(

def isin(self, other: Block):
# TODO: Support multiple other columns and match on label
# TODO: Model as explicit "IN" subquery/join to better allow db to optimize
assert len(other.value_columns) == 1
unique_other_values = other.expr.select_columns(
[other.value_columns[0]]
Expand Down
175 changes: 167 additions & 8 deletions bigframes/core/compile/compiled.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import functools
import itertools
import typing
from typing import Optional, Sequence
from typing import Literal, Optional, Sequence

import bigframes_vendored.ibis
import bigframes_vendored.ibis.backends.bigquery.backend as ibis_bigquery
Expand Down Expand Up @@ -94,7 +94,7 @@ def to_sql(
return typing.cast(str, sql)

@property
def columns(self) -> typing.Tuple[ibis_types.Value, ...]:
def columns(self) -> tuple[ibis_types.Value, ...]:
return self._columns

@property
Expand All @@ -107,7 +107,7 @@ def _ibis_bindings(self) -> dict[str, ibis_types.Value]:

def projection(
self,
expression_id_pairs: typing.Tuple[typing.Tuple[ex.Expression, str], ...],
expression_id_pairs: tuple[tuple[ex.Expression, str], ...],
) -> UnorderedIR:
"""Apply an expression to the ArrayValue and assign the output to a column."""
cannot_inline = any(expr.expensive for expr, _ in expression_id_pairs)
Expand All @@ -126,7 +126,7 @@ def projection(

def selection(
self,
input_output_pairs: typing.Tuple[typing.Tuple[ex.DerefOp, str], ...],
input_output_pairs: tuple[tuple[ex.DerefOp, str], ...],
) -> UnorderedIR:
"""Apply an expression to the ArrayValue and assign the output to a column."""
bindings = {col: self._get_ibis_column(col) for col in self.column_ids}
Expand Down Expand Up @@ -203,7 +203,7 @@ def filter(self, predicate: ex.Expression) -> UnorderedIR:

def aggregate(
self,
aggregations: typing.Sequence[typing.Tuple[ex.Aggregation, str]],
aggregations: typing.Sequence[tuple[ex.Aggregation, str]],
by_column_ids: typing.Sequence[ex.DerefOp] = (),
dropna: bool = True,
order_by: typing.Sequence[OrderingExpression] = (),
Expand Down Expand Up @@ -323,7 +323,105 @@ def from_pandas(
columns=columns,
)

## Methods that only work with ordering
def join(
self: UnorderedIR,
right: UnorderedIR,
conditions: tuple[tuple[str, str], ...],
type: Literal["inner", "outer", "left", "right", "cross"],
*,
join_nulls: bool = True,
) -> UnorderedIR:
"""Join two expressions by column equality.

Arguments:
left: Expression for left table to join.
left_column_ids: Column IDs (not label) to join by.
right: Expression for right table to join.
right_column_ids: Column IDs (not label) to join by.
how: The type of join to perform.
join_nulls (bool):
If True, will joins NULL keys to each other.
Returns:
The joined expression. The resulting columns will be, in order,
first the coalesced join keys, then, all the left columns, and
finally, all the right columns.
"""
# Shouldn't need to select the column ids explicitly, but it seems that ibis has some
# bug resolving column ids otherwise, potentially because of the "JoinChain" op
left_table = self._to_ibis_expr().select(self.column_ids)
right_table = right._to_ibis_expr().select(right.column_ids)

join_conditions = [
_join_condition(
left_table[left_index], right_table[right_index], nullsafe=join_nulls
)
for left_index, right_index in conditions
]

combined_table = bigframes_vendored.ibis.join(
left_table,
right_table,
predicates=join_conditions,
how=type, # type: ignore
)
columns = [combined_table[col.get_name()] for col in self.columns] + [
combined_table[col.get_name()] for col in right.columns
]
return UnorderedIR(
combined_table,
columns=columns,
)

def isin_join(
self: UnorderedIR,
right: UnorderedIR,
indicator_col: str,
conditions: tuple[str, str],
*,
join_nulls: bool = True,
) -> UnorderedIR:
"""Join two expressions by column equality.

Arguments:
left: Expression for left table to join.
right: Expression for right table to join.
conditions: Id pairs to compare
Returns:
The joined expression.
"""
left_table = self._to_ibis_expr()
right_table = right._to_ibis_expr()
if join_nulls: # nullsafe isin join must actually use "exists" subquery
new_column = (
(
_join_condition(
left_table[conditions[0]],
right_table[conditions[1]],
nullsafe=True,
)
)
.any()
.name(indicator_col)
)

else: # Can do simpler "in" subquery
new_column = (
(left_table[conditions[0]])
.isin((right_table[conditions[1]]))
.name(indicator_col)
)

columns = tuple(
itertools.chain(
(left_table[col.get_name()] for col in self.columns), (new_column,)
)
)

return UnorderedIR(
left_table,
columns=columns,
)

def project_window_op(
self,
expression: ex.Aggregation,
Expand Down Expand Up @@ -429,7 +527,7 @@ def _ibis_window_from_spec(self, window_spec: WindowSpec):
group_by: typing.List[ibis_types.Value] = (
[
typing.cast(
ibis_types.Column, _as_identity(self._compile_expression(column))
ibis_types.Column, _as_groupable(self._compile_expression(column))
)
for column in window_spec.grouping_keys
]
Expand Down Expand Up @@ -514,7 +612,68 @@ def _convert_ordering_to_table_values(
return ordering_values


def _as_identity(value: ibis_types.Value):
def _string_cast_join_cond(
lvalue: ibis_types.Column, rvalue: ibis_types.Column
) -> ibis_types.BooleanColumn:
result = (
lvalue.cast(ibis_dtypes.str).fill_null(ibis_types.literal("0"))
== rvalue.cast(ibis_dtypes.str).fill_null(ibis_types.literal("0"))
) & (
lvalue.cast(ibis_dtypes.str).fill_null(ibis_types.literal("1"))
== rvalue.cast(ibis_dtypes.str).fill_null(ibis_types.literal("1"))
)
return typing.cast(ibis_types.BooleanColumn, result)


def _numeric_join_cond(
lvalue: ibis_types.Column, rvalue: ibis_types.Column
) -> ibis_types.BooleanColumn:
lvalue1 = lvalue.fill_null(ibis_types.literal(0))
lvalue2 = lvalue.fill_null(ibis_types.literal(1))
rvalue1 = rvalue.fill_null(ibis_types.literal(0))
rvalue2 = rvalue.fill_null(ibis_types.literal(1))
if lvalue.type().is_floating() and rvalue.type().is_floating():
# NaN aren't equal so need to coalesce as well with diff constants
lvalue1 = (
typing.cast(ibis_types.FloatingColumn, lvalue)
.isnan()
.ifelse(ibis_types.literal(2), lvalue1)
)
lvalue2 = (
typing.cast(ibis_types.FloatingColumn, lvalue)
.isnan()
.ifelse(ibis_types.literal(3), lvalue2)
)
rvalue1 = (
typing.cast(ibis_types.FloatingColumn, rvalue)
.isnan()
.ifelse(ibis_types.literal(2), rvalue1)
)
rvalue2 = (
typing.cast(ibis_types.FloatingColumn, rvalue)
.isnan()
.ifelse(ibis_types.literal(3), rvalue2)
)
result = (lvalue1 == rvalue1) & (lvalue2 == rvalue2)
return typing.cast(ibis_types.BooleanColumn, result)


def _join_condition(
lvalue: ibis_types.Column, rvalue: ibis_types.Column, nullsafe: bool
) -> ibis_types.BooleanColumn:
if (lvalue.type().is_floating()) and (lvalue.type().is_floating()):
# Need to always make safe join condition to handle nan, even if no nulls
return _numeric_join_cond(lvalue, rvalue)
if nullsafe:
# TODO: Define more coalesce constants for non-numeric types to avoid cast
if (lvalue.type().is_numeric()) and (lvalue.type().is_numeric()):
return _numeric_join_cond(lvalue, rvalue)
else:
return _string_cast_join_cond(lvalue, rvalue)
return typing.cast(ibis_types.BooleanColumn, lvalue == rvalue)


def _as_groupable(value: ibis_types.Value):
# Some types need to be converted to string to enable groupby
if value.type().is_float64() or value.type().is_geospatial():
return value.cast(ibis_dtypes.str)
Expand Down
11 changes: 5 additions & 6 deletions bigframes/core/compile/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,9 @@
import bigframes.core.compile.concat as concat_impl
import bigframes.core.compile.explode
import bigframes.core.compile.ibis_types
import bigframes.core.compile.isin
import bigframes.core.compile.scalar_op_compiler
import bigframes.core.compile.scalar_op_compiler as compile_scalar
import bigframes.core.compile.schema_translator
import bigframes.core.compile.single_column
import bigframes.core.expression as ex
import bigframes.core.identifiers as ids
import bigframes.core.nodes as nodes
Expand Down Expand Up @@ -130,24 +128,25 @@ def compile_join(self, node: nodes.JoinNode):
condition_pairs = tuple(
(left.id.sql, right.id.sql) for left, right in node.conditions
)

left_unordered = self.compile_node(node.left_child)
right_unordered = self.compile_node(node.right_child)
return bigframes.core.compile.single_column.join_by_column_unordered(
left=left_unordered,
return left_unordered.join(
right=right_unordered,
type=node.type,
conditions=condition_pairs,
join_nulls=node.joins_nulls,
)

@_compile_node.register
def compile_isin(self, node: nodes.InNode):
left_unordered = self.compile_node(node.left_child)
right_unordered = self.compile_node(node.right_child)
return bigframes.core.compile.isin.isin_unordered(
left=left_unordered,
return left_unordered.isin_join(
right=right_unordered,
indicator_col=node.indicator_col.sql,
conditions=(node.left_col.id.sql, node.right_col.id.sql),
join_nulls=node.joins_nulls,
)

@_compile_node.register
Expand Down
71 changes: 0 additions & 71 deletions bigframes/core/compile/isin.py

This file was deleted.

Loading