From e5643825f6fbfd9526b128dab32057653f297538 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Tue, 15 Apr 2025 01:02:36 +0000 Subject: [PATCH 1/6] feat: Support write api as loading option --- bigframes/core/local_data.py | 76 +++++++++++++++++---- bigframes/core/utils.py | 3 + bigframes/session/__init__.py | 5 ++ bigframes/session/clients.py | 31 +++++++++ bigframes/session/loader.py | 63 +++++++++++++++-- setup.py | 2 + tests/system/small/test_dataframe.py | 3 +- tests/system/small/test_session.py | 68 ++++++------------ third_party/bigframes_vendored/constants.py | 6 +- 9 files changed, 190 insertions(+), 67 deletions(-) diff --git a/bigframes/core/local_data.py b/bigframes/core/local_data.py index 70b1741af7..25b462f70f 100644 --- a/bigframes/core/local_data.py +++ b/bigframes/core/local_data.py @@ -96,27 +96,65 @@ def from_pyarrow(self, table: pa.Table) -> ManagedArrowTable: schemata.ArraySchema(tuple(fields)), ) - def to_parquet( + def to_arrow( self, - dst: Union[str, io.IOBase], *, offsets_col: Optional[str] = None, geo_format: Literal["wkb", "wkt"] = "wkt", duration_type: Literal["int", "duration"] = "duration", json_type: Literal["string"] = "string", - ): - pa_table = self.data - if offsets_col is not None: - pa_table = pa_table.append_column( - offsets_col, pa.array(range(pa_table.num_rows), type=pa.int64()) - ) + ) -> tuple[pa.Schema, Iterable[pa.RecordBatch]]: if geo_format != "wkt": raise NotImplementedError(f"geo format {geo_format} not yet implemented") - if duration_type != "duration": - raise NotImplementedError( - f"duration as {duration_type} not yet implemented" - ) assert json_type == "string" + + batches = self.data.to_batches() + schema = self.data.schema + if duration_type == "int": + + @_recursive_map_types + def durations_to_ints(type: pa.DataType) -> pa.DataType: + if pa.types.is_duration(type): + return pa.int64() + return type + + schema = pa.schema( + pa.field(field.name, durations_to_ints(field.type)) + for field in self.data.schema + ) + + # Can't use RecordBatch.cast until set higher min pyarrow version + def convert_batch(batch: pa.RecordBatch) -> pa.RecordBatch: + return pa.record_batch( + [arr.cast(type) for arr, type in zip(batch.columns, schema.types)], + schema=schema, + ) + + batches = map(convert_batch, batches) + + if offsets_col is not None: + return schema.append(pa.field(offsets_col, pa.int64())), _append_offsets( + batches, offsets_col + ) + else: + return schema, batches + + def to_parquet( + self, + dst: Union[str, io.IOBase], + *, + offsets_col: Optional[str] = None, + geo_format: Literal["wkb", "wkt"] = "wkt", + duration_type: Literal["int", "duration"] = "duration", + json_type: Literal["string"] = "string", + ): + schema, batches = self.to_arrow( + offsets_col=offsets_col, + geo_format=geo_format, + duration_type=duration_type, + json_type=json_type, + ) + pa_table = pa.Table.from_batches(batches, schema=schema) pyarrow.parquet.write_table(pa_table, where=dst) def itertuples( @@ -329,3 +367,17 @@ def _physical_type_replacements(dtype: pa.DataType) -> pa.DataType: if dtype in _ARROW_MANAGED_STORAGE_OVERRIDES: return _ARROW_MANAGED_STORAGE_OVERRIDES[dtype] return dtype + + +def _append_offsets( + batches: Iterable[pa.RecordBatch], offsets_col_name: str +) -> Iterable[pa.RecordBatch]: + offset = 0 + for batch in batches: + offsets = pa.array(range(offset, offset + batch.num_rows), type=pa.int64()) + batch_w_offsets = pa.record_batch( + [*batch.columns, offsets], + schema=batch.schema.append(pa.field(offsets_col_name, pa.int64())), + ) 
+ offset += batch.num_rows + yield batch_w_offsets diff --git a/bigframes/core/utils.py b/bigframes/core/utils.py index ee09fc69cb..9731857ea0 100644 --- a/bigframes/core/utils.py +++ b/bigframes/core/utils.py @@ -142,6 +142,9 @@ def label_to_identifier(label: typing.Hashable, strict: bool = False) -> str: identifier = re.sub(r"[^a-zA-Z0-9_]", "", identifier) if not identifier: identifier = "id" + elif identifier[0].isdigit(): + # first character must be letter or underscore + identifier = "_" + identifier return identifier diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index a27094952f..3b4a723a03 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -255,6 +255,7 @@ def __init__( session=self, bqclient=self._clients_provider.bqclient, storage_manager=self._temp_storage_manager, + write_client=self._clients_provider.bqstoragewriteclient, default_index_type=self._default_index_type, scan_index_uniqueness=self._strictly_ordered, force_total_order=self._strictly_ordered, @@ -805,6 +806,10 @@ def _read_pandas( return self._loader.read_pandas( pandas_dataframe, method="stream", api_name=api_name ) + elif write_engine == "bigquery_write": + return self._loader.read_pandas( + pandas_dataframe, method="write", api_name=api_name + ) else: raise ValueError(f"Got unexpected write_engine '{write_engine}'") diff --git a/bigframes/session/clients.py b/bigframes/session/clients.py index 86be8bd897..5ef974d565 100644 --- a/bigframes/session/clients.py +++ b/bigframes/session/clients.py @@ -134,6 +134,9 @@ def __init__( self._bqstoragereadclient: Optional[ google.cloud.bigquery_storage_v1.BigQueryReadClient ] = None + self._bqstoragewriteclient: Optional[ + google.cloud.bigquery_storage_v1.BigQueryWriteClient + ] = None self._cloudfunctionsclient: Optional[ google.cloud.functions_v2.FunctionServiceClient ] = None @@ -238,6 +241,34 @@ def bqstoragereadclient(self): return self._bqstoragereadclient + @property + def bqstoragewriteclient(self): + if not self._bqstoragewriteclient: + bqstorage_options = None + if "bqstoragewriteclient" in self._client_endpoints_override: + bqstorage_options = google.api_core.client_options.ClientOptions( + api_endpoint=self._client_endpoints_override["bqstoragewriteclient"] + ) + elif self._use_regional_endpoints: + bqstorage_options = google.api_core.client_options.ClientOptions( + api_endpoint=_BIGQUERYSTORAGE_REGIONAL_ENDPOINT.format( + location=self._location + ) + ) + + bqstorage_info = google.api_core.gapic_v1.client_info.ClientInfo( + user_agent=self._application_name + ) + self._bqstoragewriteclient = ( + google.cloud.bigquery_storage_v1.BigQueryWriteClient( + client_info=bqstorage_info, + client_options=bqstorage_options, + credentials=self._credentials, + ) + ) + + return self._bqstoragewriteclient + @property def cloudfunctionsclient(self): if not self._cloudfunctionsclient: diff --git a/bigframes/session/loader.py b/bigframes/session/loader.py index 2fa6e64baa..211a395bd5 100644 --- a/bigframes/session/loader.py +++ b/bigframes/session/loader.py @@ -23,6 +23,7 @@ import typing from typing import ( Dict, + Generator, Hashable, IO, Iterable, @@ -36,12 +37,13 @@ import bigframes_vendored.constants as constants import bigframes_vendored.pandas.io.gbq as third_party_pandas_gbq import google.api_core.exceptions +from google.cloud import bigquery_storage_v1 import google.cloud.bigquery as bigquery -import google.cloud.bigquery.table +from google.cloud.bigquery_storage_v1 import types as bq_storage_types import 
pandas import pyarrow as pa -from bigframes.core import local_data, utils +from bigframes.core import guid, local_data, utils import bigframes.core as core import bigframes.core.blocks as blocks import bigframes.core.schema as schemata @@ -142,6 +144,7 @@ def __init__( self, session: bigframes.session.Session, bqclient: bigquery.Client, + write_client: bigquery_storage_v1.BigQueryWriteClient, storage_manager: bigframes.session.temporary_storage.TemporaryStorageManager, default_index_type: bigframes.enums.DefaultIndexKind, scan_index_uniqueness: bool, @@ -149,6 +152,7 @@ def __init__( metrics: Optional[bigframes.session.metrics.ExecutionMetrics] = None, ): self._bqclient = bqclient + self._write_client = write_client self._storage_manager = storage_manager self._default_index_type = default_index_type self._scan_index_uniqueness = scan_index_uniqueness @@ -165,7 +169,7 @@ def __init__( def read_pandas( self, pandas_dataframe: pandas.DataFrame, - method: Literal["load", "stream"], + method: Literal["load", "stream", "write"], api_name: str, ) -> dataframe.DataFrame: # TODO: Push this into from_pandas, along with index flag @@ -183,6 +187,8 @@ def read_pandas( array_value = self.load_data(managed_data, api_name=api_name) elif method == "stream": array_value = self.stream_data(managed_data) + elif method == "write": + array_value = self.write_data(managed_data) else: raise ValueError(f"Unsupported read method {method}") @@ -198,7 +204,7 @@ def load_data( self, data: local_data.ManagedArrowTable, api_name: Optional[str] = None ) -> core.ArrayValue: """Load managed data into bigquery""" - ordering_col = "bf_load_job_offsets" + ordering_col = guid.generate_guid("load_offsets_") # JSON support incomplete for item in data.schema.items: @@ -244,7 +250,7 @@ def load_data( def stream_data(self, data: local_data.ManagedArrowTable) -> core.ArrayValue: """Load managed data into bigquery""" - ordering_col = "bf_stream_job_offsets" + ordering_col = guid.generate_guid("stream_offsets_") schema_w_offsets = data.schema.append( schemata.SchemaItem(ordering_col, bigframes.dtypes.INT_DTYPE) ) @@ -277,6 +283,53 @@ def stream_data(self, data: local_data.ManagedArrowTable) -> core.ArrayValue: n_rows=data.data.num_rows, ).drop_columns([ordering_col]) + def write_data(self, data: local_data.ManagedArrowTable) -> core.ArrayValue: + """Load managed data into bigquery""" + ordering_col = guid.generate_guid("stream_offsets_") + schema_w_offsets = data.schema.append( + schemata.SchemaItem(ordering_col, bigframes.dtypes.INT_DTYPE) + ) + bq_schema = schema_w_offsets.to_bigquery(_STREAM_JOB_TYPE_OVERRIDES) + bq_table_ref = self._storage_manager.create_temp_table( + bq_schema, [ordering_col] + ) + + requested_stream = bq_storage_types.stream.WriteStream() + requested_stream.type_ = bq_storage_types.stream.WriteStream.Type.COMMITTED # type: ignore + + stream_request = bq_storage_types.CreateWriteStreamRequest( + parent=bq_table_ref.to_bqstorage(), write_stream=requested_stream + ) + stream = self._write_client.create_write_stream(request=stream_request) + + def request_gen() -> Generator[bq_storage_types.AppendRowsRequest, None, None]: + schema, batches = data.to_arrow( + offsets_col=ordering_col, duration_type="int" + ) + for batch in batches: + request = bq_storage_types.AppendRowsRequest(write_stream=stream.name) + request.arrow_rows.writer_schema.serialized_schema = ( + schema.serialize().to_pybytes() + ) + request.arrow_rows.rows.serialized_record_batch = ( + batch.serialize().to_pybytes() + ) + yield request + + for 
response in self._write_client.append_rows(requests=request_gen()): + if response.row_errors: + raise ValueError( + f"Problem loading at least one row from DataFrame: {response.row_errors}. {constants.FEEDBACK_LINK}" + ) + destination_table = self._bqclient.get_table(bq_table_ref) + return core.ArrayValue.from_table( + table=destination_table, + schema=schema_w_offsets, + session=self._session, + offsets_col=ordering_col, + n_rows=data.data.num_rows, + ).drop_columns([ordering_col]) + def _start_generic_job(self, job: formatting_helpers.GenericJob): if bigframes.options.display.progress_bar is not None: formatting_helpers.wait_for_job( diff --git a/setup.py b/setup.py index 7ee4c2a4d9..5397ae2b4f 100644 --- a/setup.py +++ b/setup.py @@ -42,6 +42,8 @@ "google-cloud-bigtable >=2.24.0", "google-cloud-pubsub >=2.21.4", "google-cloud-bigquery[bqstorage,pandas] >=3.31.0", + # 2.30 needed for arrow support. + "google-cloud-bigquery-storage >= 2.30.0, < 3.0.0", "google-cloud-functions >=1.12.0", "google-cloud-bigquery-connection >=1.12.0", "google-cloud-iam >=2.12.1", diff --git a/tests/system/small/test_dataframe.py b/tests/system/small/test_dataframe.py index 304f2fea4f..16fd6afb2c 100644 --- a/tests/system/small/test_dataframe.py +++ b/tests/system/small/test_dataframe.py @@ -84,6 +84,7 @@ def test_df_construct_pandas_default(scalars_dfs): ("bigquery_inline"), ("bigquery_load"), ("bigquery_streaming"), + ("bigquery_write"), ], ) def test_read_pandas_all_nice_types( @@ -1769,7 +1770,7 @@ def test_len(scalars_dfs): ) @pytest.mark.parametrize( "write_engine", - ["bigquery_load", "bigquery_streaming"], + ["bigquery_load", "bigquery_streaming", "bigquery_write"], ) def test_df_len_local(session, n_rows, write_engine): assert ( diff --git a/tests/system/small/test_session.py b/tests/system/small/test_session.py index c969c4a588..5dbe6d03e7 100644 --- a/tests/system/small/test_session.py +++ b/tests/system/small/test_session.py @@ -38,6 +38,17 @@ import bigframes.ml.linear_model from tests.system import utils +all_write_engines = pytest.mark.parametrize( + "write_engine", + [ + "default", + "bigquery_inline", + "bigquery_load", + "bigquery_streaming", + "bigquery_write", + ], +) + @pytest.fixture(scope="module") def df_and_local_csv(scalars_df_index): @@ -867,10 +878,7 @@ def test_read_pandas_tokyo( # old versions don't support local casting to arrow duration @utils.skip_legacy_pandas -@pytest.mark.parametrize( - "write_engine", - ["default", "bigquery_inline", "bigquery_load", "bigquery_streaming"], -) +@all_write_engines def test_read_pandas_timedelta_dataframes(session, write_engine): pandas_df = pd.DataFrame({"my_col": pd.to_timedelta([1, 2, 3], unit="d")}) @@ -883,10 +891,7 @@ def test_read_pandas_timedelta_dataframes(session, write_engine): pd.testing.assert_frame_equal(actual_result, expected_result) -@pytest.mark.parametrize( - "write_engine", - ["default", "bigquery_inline", "bigquery_load", "bigquery_streaming"], -) +@all_write_engines def test_read_pandas_timedelta_series(session, write_engine): expected_series = pd.Series(pd.to_timedelta([1, 2, 3], unit="d")) @@ -901,10 +906,7 @@ def test_read_pandas_timedelta_series(session, write_engine): ) -@pytest.mark.parametrize( - "write_engine", - ["default", "bigquery_inline", "bigquery_load", "bigquery_streaming"], -) +@all_write_engines def test_read_pandas_timedelta_index(session, write_engine): expected_index = pd.to_timedelta( [1, 2, 3], unit="d" @@ -919,15 +921,7 @@ def test_read_pandas_timedelta_index(session, write_engine): 
pd.testing.assert_index_equal(actual_result, expected_index) -@pytest.mark.parametrize( - ("write_engine"), - [ - pytest.param("default"), - pytest.param("bigquery_load"), - pytest.param("bigquery_streaming"), - pytest.param("bigquery_inline"), - ], -) +@all_write_engines def test_read_pandas_json_dataframes(session, write_engine): json_data = [ "1", @@ -946,15 +940,7 @@ def test_read_pandas_json_dataframes(session, write_engine): pd.testing.assert_frame_equal(actual_result, expected_df, check_index_type=False) -@pytest.mark.parametrize( - ("write_engine"), - [ - pytest.param("default"), - pytest.param("bigquery_load"), - pytest.param("bigquery_streaming"), - pytest.param("bigquery_inline"), - ], -) +@all_write_engines def test_read_pandas_json_series(session, write_engine): json_data = [ "1", @@ -972,15 +958,7 @@ def test_read_pandas_json_series(session, write_engine): ) -@pytest.mark.parametrize( - ("write_engine"), - [ - pytest.param("default"), - pytest.param("bigquery_load"), - pytest.param("bigquery_streaming"), - pytest.param("bigquery_inline", marks=pytest.mark.xfail(raises=ValueError)), - ], -) +@all_write_engines def test_read_pandas_json_index(session, write_engine): json_data = [ "1", @@ -1028,6 +1006,7 @@ def test_read_pandas_w_nested_json_fails(session, write_engine): pytest.param("default"), pytest.param("bigquery_inline"), pytest.param("bigquery_streaming"), + pytest.param("bigquery_write"), ], ) def test_read_pandas_w_nested_json(session, write_engine): @@ -1085,6 +1064,7 @@ def test_read_pandas_w_nested_json_index_fails(session, write_engine): pytest.param("default"), pytest.param("bigquery_inline"), pytest.param("bigquery_streaming"), + pytest.param("bigquery_write"), ], ) def test_read_pandas_w_nested_json_index(session, write_engine): @@ -1105,15 +1085,7 @@ def test_read_pandas_w_nested_json_index(session, write_engine): pd.testing.assert_index_equal(bq_idx, pd_idx) -@pytest.mark.parametrize( - ("write_engine",), - ( - ("default",), - ("bigquery_inline",), - ("bigquery_load",), - ("bigquery_streaming",), - ), -) +@all_write_engines def test_read_csv_for_gcs_file_w_write_engine(session, df_and_gcs_csv, write_engine): scalars_df, path = df_and_gcs_csv diff --git a/third_party/bigframes_vendored/constants.py b/third_party/bigframes_vendored/constants.py index d11d8ba2cb..af87694cd5 100644 --- a/third_party/bigframes_vendored/constants.py +++ b/third_party/bigframes_vendored/constants.py @@ -47,6 +47,10 @@ ) WriteEngineType = Literal[ - "default", "bigquery_inline", "bigquery_load", "bigquery_streaming" + "default", + "bigquery_inline", + "bigquery_load", + "bigquery_streaming", + "bigquery_write", ] VALID_WRITE_ENGINES = typing.get_args(WriteEngineType) From cb524cab017d7092b541df1a7a78bdbaf9e48a07 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Wed, 16 Apr 2025 20:07:28 +0000 Subject: [PATCH 2/6] fix test_get_standardized_ids_indexes --- tests/unit/core/test_bf_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/core/test_bf_utils.py b/tests/unit/core/test_bf_utils.py index cb3b03d988..9b4c4f8742 100644 --- a/tests/unit/core/test_bf_utils.py +++ b/tests/unit/core/test_bf_utils.py @@ -46,7 +46,7 @@ def test_get_standardized_ids_indexes(): assert col_ids == ["duplicate_2"] assert idx_ids == [ "string", - "0", + "_0", utils.UNNAMED_INDEX_ID, "duplicate", "duplicate_1", From dc6b7a889c4bc6e6fcb3cbefee2797848a863990 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Fri, 18 Apr 2025 20:10:01 +0000 Subject: [PATCH 3/6] finalize stream, 
refactor duration-int logic --- bigframes/core/local_data.py | 43 +++++++++++++++++++----------------- bigframes/session/loader.py | 4 ++++ 2 files changed, 27 insertions(+), 20 deletions(-) diff --git a/bigframes/core/local_data.py b/bigframes/core/local_data.py index 476751957b..54d88385b3 100644 --- a/bigframes/core/local_data.py +++ b/bigframes/core/local_data.py @@ -113,26 +113,8 @@ def to_arrow( batches = self.data.to_batches() schema = self.data.schema if duration_type == "int": - - @_recursive_map_types - def durations_to_ints(type: pa.DataType) -> pa.DataType: - if pa.types.is_duration(type): - return pa.int64() - return type - - schema = pa.schema( - pa.field(field.name, durations_to_ints(field.type)) - for field in self.data.schema - ) - - # Can't use RecordBatch.cast until set higher min pyarrow version - def convert_batch(batch: pa.RecordBatch) -> pa.RecordBatch: - return pa.record_batch( - [arr.cast(type) for arr, type in zip(batch.columns, schema.types)], - schema=schema, - ) - - batches = map(convert_batch, batches) + schema = _schema_durations_to_ints(schema) + batches = map(functools.partial(_cast_pa_batch, schema=schema), batches) if offsets_col is not None: return schema.append(pa.field(offsets_col, pa.int64())), _append_offsets( @@ -414,3 +396,24 @@ def _append_offsets( ) offset += batch.num_rows yield batch_w_offsets + + +@_recursive_map_types +def _durations_to_ints(type: pa.DataType) -> pa.DataType: + if pa.types.is_duration(type): + return pa.int64() + return type + + +def _schema_durations_to_ints(schema: pa.Schema) -> pa.Schema: + return pa.schema( + pa.field(field.name, _durations_to_ints(field.type)) for field in schema + ) + + +# TODO: Use RecordBatch.cast once min pyarrow>=16.0 +def _cast_pa_batch(batch: pa.RecordBatch, schema: pa.Schema) -> pa.RecordBatch: + return pa.record_batch( + [arr.cast(type) for arr, type in zip(batch.columns, schema.types)], + schema=schema, + ) diff --git a/bigframes/session/loader.py b/bigframes/session/loader.py index 1576b3ecab..9b530b867c 100644 --- a/bigframes/session/loader.py +++ b/bigframes/session/loader.py @@ -321,6 +321,10 @@ def request_gen() -> Generator[bq_storage_types.AppendRowsRequest, None, None]: raise ValueError( f"Problem loading at least one row from DataFrame: {response.row_errors}. 
{constants.FEEDBACK_LINK}" ) + # This step isn't strictly necessary in COMMITTED mode, but avoids max active stream limits + response = self._write_client.finalize_write_stream(name=stream.name) + assert response.row_count == data.data.num_rows + destination_table = self._bqclient.get_table(bq_table_ref) return core.ArrayValue.from_table( table=destination_table, From 3d2d83e3c5986984b6c873d48ed79be118b107ee Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Fri, 18 Apr 2025 20:14:50 +0000 Subject: [PATCH 4/6] add offsets --- bigframes/session/loader.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/bigframes/session/loader.py b/bigframes/session/loader.py index 9b530b867c..c8b22b9f3a 100644 --- a/bigframes/session/loader.py +++ b/bigframes/session/loader.py @@ -306,14 +306,18 @@ def request_gen() -> Generator[bq_storage_types.AppendRowsRequest, None, None]: schema, batches = data.to_arrow( offsets_col=ordering_col, duration_type="int" ) + offset = 0 for batch in batches: - request = bq_storage_types.AppendRowsRequest(write_stream=stream.name) + request = bq_storage_types.AppendRowsRequest( + write_stream=stream.name, offset=offset + ) request.arrow_rows.writer_schema.serialized_schema = ( schema.serialize().to_pybytes() ) request.arrow_rows.rows.serialized_record_batch = ( batch.serialize().to_pybytes() ) + offset += batch.num_rows yield request for response in self._write_client.append_rows(requests=request_gen()): From 5f22defdade325d8d84523441163168902346fc4 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Wed, 23 Apr 2025 20:05:01 +0000 Subject: [PATCH 5/6] add to read_pandas docstring --- bigframes/session/__init__.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index e4274b5363..6379a6f2e8 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -732,7 +732,9 @@ def read_pandas( workload is such that you exhaust the BigQuery load job quota and your data cannot be embedded in SQL due to size or data type limitations. - + * "bigquery_write": + [Preview] Use the BigQuery Storage Write API. This feature + is in public preview. Returns: An equivalent bigframes.pandas.(DataFrame/Series/Index) object From dc7c71fbe726a4bc5803dcac3af7673c794e62ef Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Thu, 24 Apr 2025 17:53:33 +0000 Subject: [PATCH 6/6] fix batch/schema switcharoo --- bigframes/core/local_data.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bigframes/core/local_data.py b/bigframes/core/local_data.py index bb5f5266e7..2fca5524af 100644 --- a/bigframes/core/local_data.py +++ b/bigframes/core/local_data.py @@ -130,7 +130,7 @@ def to_pyarrow_table( duration_type: Literal["int", "duration"] = "duration", json_type: Literal["string"] = "string", ) -> pa.Table: - batches, schema = self.to_arrow( + schema, batches = self.to_arrow( offsets_col=offsets_col, geo_format=geo_format, duration_type=duration_type,
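
For context, a minimal usage sketch of the engine added in this series. The write_engine value and the session.read_pandas keyword come straight from the tests and docstring changes above; the session setup via bigframes.connect() and the sample data are illustrative assumptions, not part of the change. It relies on the new google-cloud-bigquery-storage >= 2.30.0 pin for Arrow support.

import pandas as pd

import bigframes

# Any existing Session works the same way; connect() is shown here only as
# one way to obtain one (an assumption, not part of this patch series).
session = bigframes.connect()

pandas_df = pd.DataFrame({"my_col": pd.to_timedelta([1, 2, 3], unit="d")})

# "bigquery_write" routes the upload through the BigQuery Storage Write API
# (a COMMITTED write stream carrying serialized Arrow record batches), i.e.
# the loader's new write_data path above. The other accepted values remain
# "default", "bigquery_inline", "bigquery_load", and "bigquery_streaming".
bf_df = session.read_pandas(pandas_df, write_engine="bigquery_write")
print(bf_df.to_pandas())

As with the other engines, the choice only changes how the local frame is uploaded; the resulting DataFrame behaves the same afterwards, which is why the tests above simply add "bigquery_write" to the existing parametrizations.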