Skip to content
Merged
17 changes: 15 additions & 2 deletions bigframes/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
Union,
)

import google.api_core.exceptions
import google.cloud.bigquery as bigquery
import numpy
import pandas
Expand Down Expand Up @@ -2508,7 +2509,14 @@ def to_gbq(
)
if_exists = "replace"

if "." not in destination_table:
table_parts = destination_table.split(".")
default_project = self._block.expr.session.bqclient.project

if len(table_parts) == 2:
destination_dataset = f"{default_project}.{table_parts[0]}"
elif len(table_parts) == 3:
destination_dataset = f"{table_parts[0]}.{table_parts[1]}"
else:
raise ValueError(
f"Got invalid value for destination_table {repr(destination_table)}. "
"Should be of the form 'datasetId.tableId' or 'projectId.datasetId.tableId'."
Expand All @@ -2523,11 +2531,16 @@ def to_gbq(
f"Valid options include None or one of {dispositions.keys()}."
)

try:
self._session.bqclient.get_dataset(destination_dataset)
except google.api_core.exceptions.NotFound:
self._session.bqclient.create_dataset(destination_dataset, exists_ok=True)

job_config = bigquery.QueryJobConfig(
write_disposition=dispositions[if_exists],
destination=bigquery.table.TableReference.from_string(
destination_table,
default_project=self._block.expr.session.bqclient.project,
default_project=default_project,
),
)

Expand Down
15 changes: 15 additions & 0 deletions tests/system/small/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -3683,3 +3683,18 @@ def test_to_pandas_downsampling_option_override(session):
total_memory_bytes = df.memory_usage(deep=True).sum()
total_memory_mb = total_memory_bytes / (1024 * 1024)
assert total_memory_mb == pytest.approx(download_size, rel=0.3)


def test_to_gbq_and_create_dataset(session, scalars_df_index, dataset_id_not_created):
dataset_id = dataset_id_not_created
destination_table = f"{dataset_id}.scalars_df"

result_table = scalars_df_index.to_gbq(destination_table)
assert (
result_table == destination_table
if destination_table
else result_table is not None
)

loaded_scalars_df_index = session.read_gbq(result_table)
assert not loaded_scalars_df_index.empty