diff --git a/bigframes/functions/remote_function.py b/bigframes/functions/remote_function.py
index f24ba1b5fb..b95067983f 100644
--- a/bigframes/functions/remote_function.py
+++ b/bigframes/functions/remote_function.py
@@ -204,9 +204,12 @@ class IbisSignature(NamedTuple):
     output_type: IbisDataType
 
 
-def get_cloud_function_name(function_hash, session_id, uniq_suffix=None):
+def get_cloud_function_name(function_hash, session_id=None, uniq_suffix=None):
     "Get a name for the cloud function for the given user defined function."
-    parts = [_BIGFRAMES_REMOTE_FUNCTION_PREFIX, session_id, function_hash]
+    parts = [_BIGFRAMES_REMOTE_FUNCTION_PREFIX]
+    if session_id:
+        parts.append(session_id)
+    parts.append(function_hash)
     if uniq_suffix:
         parts.append(uniq_suffix)
     return _GCF_FUNCTION_NAME_SEPERATOR.join(parts)
@@ -566,10 +569,13 @@ def provision_bq_remote_function(
         )
 
         # Derive the name of the cloud function underlying the intended BQ
-        # remote function, also collect updated package requirements as
-        # determined in the name resolution
+        # remote function. Use the session id to identify the GCF for unnamed
+        # functions. Named remote functions are treated as persistent
+        # artifacts, so let's keep them independent of the session id, which
+        # also makes their naming more stable for the same udf code.
+        session_id = None if name else self._session.session_id
         cloud_function_name = get_cloud_function_name(
-            function_hash, self._session.session_id, uniq_suffix
+            function_hash, session_id, uniq_suffix
         )
         cf_endpoint = self.get_cloud_function_endpoint(cloud_function_name)
@@ -635,13 +641,12 @@ def get_remote_function_specs(self, remote_function_name):
         )
         try:
             for routine in routines:
+                routine = cast(bigquery.Routine, routine)
                 if routine.reference.routine_id == remote_function_name:
-                    # TODO(shobs): Use first class properties when they are available
-                    # https://github.com/googleapis/python-bigquery/issues/1552
-                    rf_options = routine._properties.get("remoteFunctionOptions")
+                    rf_options = routine.remote_function_options
                     if rf_options:
-                        http_endpoint = rf_options.get("endpoint")
-                        bq_connection = rf_options.get("connection")
+                        http_endpoint = rf_options.endpoint
+                        bq_connection = rf_options.connection
                         if bq_connection:
                             bq_connection = os.path.basename(bq_connection)
                         break
@@ -731,15 +736,15 @@ class _RemoteFunctionSession:
 
     def __init__(self):
         # Session level mapping of remote function artifacts
-        self._temp_session_artifacts: Dict[str, str] = dict()
+        self._temp_artifacts: Dict[str, str] = dict()
 
-        # Lock to synchronize the update of the session level mapping
-        self._session_artifacts_lock = threading.Lock()
+        # Lock to synchronize the update of the session artifacts
+        self._artifacts_lock = threading.Lock()
 
-    def _update_artifacts(self, bqrf_routine: str, gcf_path: str):
+    def _update_temp_artifacts(self, bqrf_routine: str, gcf_path: str):
         """Update remote function artifacts in the current session."""
-        with self._session_artifacts_lock:
-            self._temp_session_artifacts[bqrf_routine] = gcf_path
+        with self._artifacts_lock:
+            self._temp_artifacts[bqrf_routine] = gcf_path
 
     def clean_up(
         self,
@@ -748,8 +753,8 @@ def clean_up(
         session_id: str,
     ):
         """Delete remote function artifacts in the current session."""
-        with self._session_artifacts_lock:
-            for bqrf_routine, gcf_path in self._temp_session_artifacts.items():
+        with self._artifacts_lock:
+            for bqrf_routine, gcf_path in self._temp_artifacts.items():
                 # Let's accept the possibility that the remote function may have
                 # been deleted directly by the user
                 bqclient.delete_routine(bqrf_routine, not_found_ok=True)
@@ -761,7 +766,7 @@ def clean_up(
                 except google.api_core.exceptions.NotFound:
                     pass
 
-        self._temp_session_artifacts.clear()
+        self._temp_artifacts.clear()
 
 # Inspired by @udf decorator implemented in ibis-bigquery package
 # https://github.com/ibis-project/ibis-bigquery/blob/main/ibis_bigquery/udf/__init__.py
@@ -1206,7 +1211,7 @@ def try_delattr(attr):
             # explicit name, we are assuming that the user wants to persist them
             # with that name and would directly manage their lifecycle.
            if created_new and (not name):
-                self._update_artifacts(
+                self._update_temp_artifacts(
                     func.bigframes_remote_function, func.bigframes_cloud_function
                 )
            return func
diff --git a/bigframes/pandas/__init__.py b/bigframes/pandas/__init__.py
index eb990d2393..21f75eb82c 100644
--- a/bigframes/pandas/__init__.py
+++ b/bigframes/pandas/__init__.py
@@ -847,10 +847,28 @@ def clean_up_by_session_id(
 option_context = config.option_context
 """Global :class:`~bigframes._config.option_context` to configure BigQuery DataFrames."""
 
+
 # Session management APIs
-get_global_session = global_session.get_global_session
-close_session = global_session.close_session
-reset_session = global_session.close_session
+def get_global_session():
+    return global_session.get_global_session()
+
+
+get_global_session.__doc__ = global_session.get_global_session.__doc__
+
+
+def close_session():
+    return global_session.close_session()
+
+
+close_session.__doc__ = global_session.close_session.__doc__
+
+
+def reset_session():
+    return global_session.close_session()
+
+
+reset_session.__doc__ = global_session.close_session.__doc__
+
 
 # SQL Compilation uses recursive algorithms on deep trees
 # 10M tree depth should be sufficient to generate any sql that is under bigquery limit
diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py
index ca242d269c..77a20026dd 100644
--- a/bigframes/session/__init__.py
+++ b/bigframes/session/__init__.py
@@ -400,7 +400,7 @@ def _clean_up_tables(self):
     def close(self):
         """Delete resources that were created with this session's session_id.
         This includes BigQuery tables, remote functions and cloud functions
-        serving the remote functions"""
+        serving the remote functions."""
         self._clean_up_tables()
         self._remote_function_session.clean_up(
             self.bqclient, self.cloudfunctionsclient, self.session_id
diff --git a/tests/system/small/test_remote_function.py b/tests/system/small/test_remote_function.py
index d84d520988..c07a0afb44 100644
--- a/tests/system/small/test_remote_function.py
+++ b/tests/system/small/test_remote_function.py
@@ -77,15 +77,27 @@ def bq_cf_connection_location_project_mismatched() -> str:
 
 
 @pytest.fixture(scope="module")
-def session_with_bq_connection(
-    bq_cf_connection, dataset_id_permanent
-) -> bigframes.Session:
+def session_with_bq_connection(bq_cf_connection) -> bigframes.Session:
     session = bigframes.Session(
         bigframes.BigQueryOptions(bq_connection=bq_cf_connection, location="US")
     )
     return session
 
 
+def get_rf_name(func, package_requirements=None, is_row_processor=False):
+    """Get a remote function name for testing given a udf."""
+    # Augment user package requirements with any internal package
+    # requirements
+    package_requirements = rf._get_updated_package_requirements(
+        package_requirements, is_row_processor
+    )
+
+    # Compute a unique hash representing the user code
+    function_hash = rf._get_hash(func, package_requirements)
+
+    return f"bigframes_{function_hash}"
+
+
 @pytest.mark.flaky(retries=2, delay=120)
 def test_remote_function_direct_no_session_param(
     bigquery_client,
@@ -96,8 +108,11 @@ def test_remote_function_direct_no_session_param(
     dataset_id_permanent,
     bq_cf_connection,
 ):
-    @rf.remote_function(
-        [int],
+    def square(x):
+        return x * x
+
+    square = rf.remote_function(
+        int,
         int,
         bigquery_client=bigquery_client,
         bigquery_connection_client=bigqueryconnection_client,
@@ -107,9 +122,8 @@ def test_remote_function_direct_no_session_param(
         bigquery_connection=bq_cf_connection,
         # See e2e tests for tests that actually deploy the Cloud Function.
         reuse=True,
-    )
-    def square(x):
-        return x * x
+        name=get_rf_name(square),
+    )(square)
 
     # Function should still work normally.
     assert square(2) == 4
@@ -153,8 +167,11 @@ def test_remote_function_direct_no_session_param_location_specified(
     dataset_id_permanent,
     bq_cf_connection_location,
 ):
-    @rf.remote_function(
-        [int],
+    def square(x):
+        return x * x
+
+    square = rf.remote_function(
+        int,
         int,
         bigquery_client=bigquery_client,
         bigquery_connection_client=bigqueryconnection_client,
@@ -164,9 +181,8 @@ def test_remote_function_direct_no_session_param_location_specified(
         bigquery_connection=bq_cf_connection_location,
         # See e2e tests for tests that actually deploy the Cloud Function.
         reuse=True,
-    )
-    def square(x):
-        return x * x
+        name=get_rf_name(square),
+    )(square)
 
     # Function should still work normally.
     assert square(2) == 4
@@ -204,13 +220,17 @@ def test_remote_function_direct_no_session_param_location_mismatched(
     dataset_id_permanent,
     bq_cf_connection_location_mismatched,
 ):
+    def square(x):
+        # Not expected to reach this code, as the location of the
+        # connection doesn't match the location of the dataset.
+        return x * x  # pragma: NO COVER
+
     with pytest.raises(
         ValueError,
         match=re.escape("The location does not match BigQuery connection location:"),
     ):
-
-        @rf.remote_function(
-            [int],
+        rf.remote_function(
+            int,
             int,
             bigquery_client=bigquery_client,
             bigquery_connection_client=bigqueryconnection_client,
@@ -220,11 +240,8 @@ def test_remote_function_direct_no_session_param_location_mismatched(
             bigquery_connection=bq_cf_connection_location_mismatched,
             # See e2e tests for tests that actually deploy the Cloud Function.
             reuse=True,
-        )
-        def square(x):
-            # Not expected to reach this code, as the location of the
-            # connection doesn't match the location of the dataset.
-            return x * x  # pragma: NO COVER
+            name=get_rf_name(square),
+        )(square)
 
 
 @pytest.mark.flaky(retries=2, delay=120)
@@ -237,8 +254,11 @@ def test_remote_function_direct_no_session_param_location_project_specified(
     dataset_id_permanent,
     bq_cf_connection_location_project,
 ):
-    @rf.remote_function(
-        [int],
+    def square(x):
+        return x * x
+
+    square = rf.remote_function(
+        int,
         int,
         bigquery_client=bigquery_client,
         bigquery_connection_client=bigqueryconnection_client,
@@ -248,9 +268,8 @@ def test_remote_function_direct_no_session_param_location_project_specified(
         bigquery_connection=bq_cf_connection_location_project,
         # See e2e tests for tests that actually deploy the Cloud Function.
         reuse=True,
-    )
-    def square(x):
-        return x * x
+        name=get_rf_name(square),
+    )(square)
 
     # Function should still work normally.
     assert square(2) == 4
@@ -288,15 +307,19 @@ def test_remote_function_direct_no_session_param_project_mismatched(
     dataset_id_permanent,
     bq_cf_connection_location_project_mismatched,
 ):
+    def square(x):
+        # Not expected to reach this code, as the project of the
+        # connection doesn't match the project of the dataset.
+        return x * x  # pragma: NO COVER
+
     with pytest.raises(
         ValueError,
         match=re.escape(
             "The project_id does not match BigQuery connection gcp_project_id:"
         ),
     ):
-
-        @rf.remote_function(
-            [int],
+        rf.remote_function(
+            int,
             int,
             bigquery_client=bigquery_client,
             bigquery_connection_client=bigqueryconnection_client,
@@ -306,23 +329,25 @@ def test_remote_function_direct_no_session_param_project_mismatched(
             bigquery_connection=bq_cf_connection_location_project_mismatched,
             # See e2e tests for tests that actually deploy the Cloud Function.
             reuse=True,
-        )
-        def square(x):
-            # Not expected to reach this code, as the project of the
-            # connection doesn't match the project of the dataset.
-            return x * x  # pragma: NO COVER
+            name=get_rf_name(square),
+        )(square)
 
 
 @pytest.mark.flaky(retries=2, delay=120)
-def test_remote_function_direct_session_param(session_with_bq_connection, scalars_dfs):
-    @rf.remote_function(
-        [int],
-        int,
-        session=session_with_bq_connection,
-    )
+def test_remote_function_direct_session_param(
+    session_with_bq_connection, scalars_dfs, dataset_id_permanent
+):
     def square(x):
         return x * x
 
+    square = rf.remote_function(
+        int,
+        int,
+        session=session_with_bq_connection,
+        dataset=dataset_id_permanent,
+        name=get_rf_name(square),
+    )(square)
+
     # Function should still work normally.
     assert square(2) == 4
@@ -351,7 +376,12 @@ def square(x):
 
 
 @pytest.mark.flaky(retries=2, delay=120)
-def test_remote_function_via_session_default(session_with_bq_connection, scalars_dfs):
+def test_remote_function_via_session_default(
+    session_with_bq_connection, scalars_dfs, dataset_id_permanent
+):
+    def square(x):
+        return x * x
+
     # Session has bigquery connection initialized via context. Without an
     # explicit dataset the default dataset from the session would be used.
     # Without an explicit bigquery connection, the one present in Session set
@@ -359,9 +389,9 @@ def test_remote_function_via_session_default(
     # the default behavior of reuse=True will take effect. Please note that the
     # udf is same as the one used in other tests in this file so the underlying
     # cloud function would be common and quickly reused.
-    @session_with_bq_connection.remote_function([int], int)
-    def square(x):
-        return x * x
+    square = session_with_bq_connection.remote_function(
+        int, int, dataset_id_permanent, name=get_rf_name(square)
+    )(square)
 
     # Function should still work normally.
     assert square(2) == 4
@@ -394,16 +424,18 @@ def square(x):
 def test_remote_function_via_session_with_overrides(
     session, scalars_dfs, dataset_id_permanent, bq_cf_connection
 ):
-    @session.remote_function(
-        [int],
+    def square(x):
+        return x * x
+
+    square = session.remote_function(
+        int,
         int,
         dataset_id_permanent,
         bq_cf_connection,
         # See e2e tests for tests that actually deploy the Cloud Function.
         reuse=True,
-    )
-    def square(x):
-        return x * x
+        name=get_rf_name(square),
+    )(square)
 
     # Function should still work normally.
     assert square(2) == 4
@@ -433,11 +465,15 @@ def square(x):
 
 
 @pytest.mark.flaky(retries=2, delay=120)
-def test_dataframe_applymap(session_with_bq_connection, scalars_dfs):
+def test_dataframe_applymap(
+    session_with_bq_connection, scalars_dfs, dataset_id_permanent
+):
     def add_one(x):
         return x + 1
 
-    remote_add_one = session_with_bq_connection.remote_function([int], int)(add_one)
+    remote_add_one = session_with_bq_connection.remote_function(
+        [int], int, dataset_id_permanent, name=get_rf_name(add_one)
+    )(add_one)
 
     scalars_df, scalars_pandas_df = scalars_dfs
     int64_cols = ["int64_col", "int64_too"]
@@ -460,11 +496,15 @@ def add_one(x):
 
 
 @pytest.mark.flaky(retries=2, delay=120)
-def test_dataframe_applymap_na_ignore(session_with_bq_connection, scalars_dfs):
+def test_dataframe_applymap_na_ignore(
+    session_with_bq_connection, scalars_dfs, dataset_id_permanent
+):
     def add_one(x):
         return x + 1
 
-    remote_add_one = session_with_bq_connection.remote_function([int], int)(add_one)
+    remote_add_one = session_with_bq_connection.remote_function(
+        [int], int, dataset_id_permanent, name=get_rf_name(add_one)
+    )(add_one)
 
     scalars_df, scalars_pandas_df = scalars_dfs
     int64_cols = ["int64_col", "int64_too"]
@@ -485,7 +525,9 @@ def add_one(x):
 
 
 @pytest.mark.flaky(retries=2, delay=120)
-def test_series_map_bytes(session_with_bq_connection, scalars_dfs):
+def test_series_map_bytes(
+    session_with_bq_connection, scalars_dfs, dataset_id_permanent
+):
     """Check that bytes is supported as input and output."""
     scalars_df, scalars_pandas_df = scalars_dfs
 
@@ -502,8 +544,11 @@ def bytes_to_hex(mybytes: bytes) -> bytes:
         pd.ArrowDtype(pyarrow.binary())
     )
 
+    packages = ["pandas"]
     remote_bytes_to_hex = session_with_bq_connection.remote_function(
-        packages=["pandas"]
+        dataset=dataset_id_permanent,
+        name=get_rf_name(bytes_to_hex, package_requirements=packages),
+        packages=packages,
     )(bytes_to_hex)
     bf_result = scalars_df.bytes_col.map(remote_bytes_to_hex).to_pandas()
 
@@ -541,11 +586,14 @@ def test_skip_bq_connection_check(dataset_id_permanent):
         match=f"Not found: Connection {connection_name}",
     ):
 
-        @session.remote_function([int], int, dataset=dataset_id_permanent)
         def add_one(x):
             # Not expected to reach this code, as the connection doesn't exist.
             return x + 1  # pragma: NO COVER
 
+        session.remote_function(
+            [int], int, dataset=dataset_id_permanent, name=get_rf_name(add_one)
+        )(add_one)
+
 
 @pytest.mark.flaky(retries=2, delay=120)
 def test_read_gbq_function_detects_invalid_function(session, dataset_id):
@@ -570,7 +618,10 @@ def test_read_gbq_function_like_original(
     dataset_id_permanent,
     bq_cf_connection,
 ):
-    @rf.remote_function(
+    def square1(x):
+        return x * x
+
+    square1 = rf.remote_function(
         [int],
         int,
         bigquery_client=bigquery_client,
@@ -580,29 +631,28 @@ def test_read_gbq_function_like_original(
         resource_manager_client=resourcemanager_client,
         bigquery_connection=bq_cf_connection,
         reuse=True,
-    )
-    def square1(x):
-        return x * x
+        name=get_rf_name(square1),
+    )(square1)
 
     # Function should still work normally.
     assert square1(2) == 4
 
     square2 = rf.read_gbq_function(
-        function_name=square1.bigframes_remote_function,
+        function_name=square1.bigframes_remote_function,  # type: ignore
         session=session,
     )
 
     # The newly-created function (square1) should have a remote function AND a
     # cloud function associated with it, while the read-back version (square2)
     # should only have a remote function.
-    assert square1.bigframes_remote_function
-    assert square1.bigframes_cloud_function
+    assert square1.bigframes_remote_function  # type: ignore
+    assert square1.bigframes_cloud_function  # type: ignore
 
     assert square2.bigframes_remote_function
     assert not hasattr(square2, "bigframes_cloud_function")
 
     # They should point to the same function.
-    assert square1.bigframes_remote_function == square2.bigframes_remote_function
+    assert square1.bigframes_remote_function == square2.bigframes_remote_function  # type: ignore
 
     # The result of applying them should be the same.
     int64_col = scalars_df_index["int64_col"]
@@ -743,7 +793,7 @@ def test_read_gbq_function_enforces_explicit_types(
 
 
 @pytest.mark.flaky(retries=2, delay=120)
-def test_df_apply_axis_1(session, scalars_dfs):
+def test_df_apply_axis_1(session, scalars_dfs, dataset_id_permanent):
     columns = [
         "bool_col",
         "int64_col",
@@ -764,6 +814,8 @@ def add_ints(row):
     add_ints_remote = session.remote_function(
         bigframes.series.Series,
         int,
+        dataset_id_permanent,
+        name=get_rf_name(add_ints, is_row_processor=True),
     )(add_ints)
 
     with pytest.warns(
@@ -785,7 +837,7 @@ def add_ints(row):
 
 
 @pytest.mark.flaky(retries=2, delay=120)
-def test_df_apply_axis_1_ordering(session, scalars_dfs):
+def test_df_apply_axis_1_ordering(session, scalars_dfs, dataset_id_permanent):
     columns = ["bool_col", "int64_col", "int64_too", "float64_col", "string_col"]
     ordering_columns = ["bool_col", "int64_col"]
     scalars_df, scalars_pandas_df = scalars_dfs
@@ -793,7 +845,12 @@ def test_df_apply_axis_1_ordering(session, scalars_dfs):
     def add_ints(row):
         return row["int64_col"] + row["int64_too"]
 
-    add_ints_remote = session.remote_function(bigframes.series.Series, int)(add_ints)
+    add_ints_remote = session.remote_function(
+        bigframes.series.Series,
+        int,
+        dataset_id_permanent,
+        name=get_rf_name(add_ints, is_row_processor=True),
+    )(add_ints)
 
     bf_result = (
         scalars_df[columns]
@@ -817,7 +874,7 @@ def add_ints(row):
 
 
 @pytest.mark.flaky(retries=2, delay=120)
-def test_df_apply_axis_1_multiindex(session):
+def test_df_apply_axis_1_multiindex(session, dataset_id_permanent):
     pd_df = pd.DataFrame(
         {"x": [1, 2, 3], "y": [1.5, 3.75, 5], "z": ["pq", "rs", "tu"]},
         index=pd.MultiIndex.from_tuples([("a", 100), ("a", 200), ("b", 300)]),
@@ -827,9 +884,12 @@ def test_df_apply_axis_1_multiindex(session):
     def add_numbers(row):
         return row["x"] + row["y"]
 
-    add_numbers_remote = session.remote_function(bigframes.series.Series, float)(
-        add_numbers
-    )
+    add_numbers_remote = session.remote_function(
+        bigframes.series.Series,
+        float,
+        dataset_id_permanent,
+        name=get_rf_name(add_numbers, is_row_processor=True),
+    )(add_numbers)
 
     bf_result = bf_df.apply(add_numbers_remote, axis=1).to_pandas()
 
     pd_result = pd_df.apply(add_numbers, axis=1)
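
Reviewer notes with illustrative sketches follow; none of this code is part of the change itself.

The core of the remote_function.py change is that named remote functions now omit the session id from the derived Google Cloud Function (GCF) name. Below is a minimal standalone sketch of that naming scheme. The hashing helper is a stand-in: bigframes' internal _get_hash serializes the udf (with cloudpickle) rather than hashing its source text, and PREFIX/SEP merely mirror _BIGFRAMES_REMOTE_FUNCTION_PREFIX and _GCF_FUNCTION_NAME_SEPERATOR.

    # Sketch only: hashing the udf source is an assumption; bigframes'
    # _get_hash pickles the udf before hashing.
    import hashlib
    import inspect

    PREFIX = "bigframes"  # mirrors _BIGFRAMES_REMOTE_FUNCTION_PREFIX
    SEP = "-"  # mirrors _GCF_FUNCTION_NAME_SEPERATOR

    def function_hash(func, package_requirements=None):
        # Same udf code + same requirements => same hash => a stable name.
        payload = inspect.getsource(func) + repr(sorted(package_requirements or []))
        return hashlib.md5(payload.encode()).hexdigest()

    def cloud_function_name(func_hash, session_id=None, uniq_suffix=None):
        # Named (persistent) remote functions skip the session id, so the GCF
        # name stays stable across sessions; unnamed (temporary) ones embed it
        # so Session.close() can locate and delete them.
        parts = [PREFIX]
        if session_id:
            parts.append(session_id)
        parts.append(func_hash)
        if uniq_suffix:
            parts.append(uniq_suffix)
        return SEP.join(parts)

    def square(x):
        return x * x

    h = function_hash(square)
    print(cloud_function_name(h))  # named/persistent: bigframes-<hash>
    print(cloud_function_name(h, session_id="sid123"))  # temporary: bigframes-sid123-<hash>

This is also why the tests can pass name=get_rf_name(func) together with a permanent dataset: the same udf always resolves to the same artifact name, so reruns reuse the deployment instead of creating a new one.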
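The _RemoteFunctionSession renames (_temp_artifacts, _artifacts_lock, _update_temp_artifacts) reflect that the registry tracks only unnamed, session-owned artifacts for deletion at Session.close(). A minimal sketch of that pattern, with the delete operations passed in as callables rather than the real BigQuery and Cloud Functions clients:

    import threading
    from typing import Callable, Dict

    class TempArtifactRegistry:
        """Sketch of the session-scoped registry; not the bigframes class."""

        def __init__(self):
            # BQ routine id -> backing cloud function path, recorded only for
            # unnamed (temporary) remote functions.
            self._temp_artifacts: Dict[str, str] = {}
            # Serializes registrations from concurrent deployments.
            self._artifacts_lock = threading.Lock()

        def update_temp_artifacts(self, bqrf_routine: str, gcf_path: str) -> None:
            with self._artifacts_lock:
                self._temp_artifacts[bqrf_routine] = gcf_path

        def clean_up(
            self,
            delete_routine: Callable[[str], None],
            delete_function: Callable[[str], None],
        ) -> None:
            # Invoked from Session.close(); the real code tolerates artifacts
            # the user already deleted (not_found_ok / NotFound).
            with self._artifacts_lock:
                for bqrf_routine, gcf_path in self._temp_artifacts.items():
                    delete_routine(bqrf_routine)
                    delete_function(gcf_path)
                self._temp_artifacts.clear()

Named functions never enter this registry (the `created_new and (not name)` guard), which is what makes them safe to treat as persistent, cross-session artifacts.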
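Finally, get_remote_function_specs drops the _properties workaround in favor of the first-class Routine.remote_function_options property from google-cloud-bigquery (the property tracked by issue #1552 cited in the removed TODO). A sketch of reading it back, with a placeholder project/dataset; this assumes a google-cloud-bigquery version recent enough to expose the property:

    import os

    from google.cloud import bigquery

    client = bigquery.Client()
    # "my-project.my_dataset" is a placeholder, not taken from the change above.
    for routine in client.list_routines("my-project.my_dataset"):
        options = routine.remote_function_options
        if options:  # set only for BigQuery remote functions
            endpoint = options.endpoint
            connection = os.path.basename(options.connection or "")
            print(routine.reference.routine_id, endpoint, connection)

The cast(bigquery.Routine, routine) added in the diff serves the type checker when accessing this property on items yielded by list_routines.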