Skip to content

Commit b841379

Browse files
shobsijialuoo
andcommitted
feat: upgrade BQ managed udf to preview (#1536)
* feat: upgrade BQ managed `udf` to preview * remove `_udf` footprint, add preview warning * capture references in remote_function * keep capture_references default to the curren tbehavior (True) for remote_function * target e2e tests to python 3.12 * used "udf" instead of "managed function" in the user facing messages * clarify the self-contained udf in the doc * export udf from pandas module --------- Co-authored-by: jialuoo <jialuo@google.com>
1 parent 034d08d commit b841379

File tree

9 files changed

+95
-114
lines changed

9 files changed

+95
-114
lines changed

bigframes/_config/experiment_options.py

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ def __init__(self):
2626
self._semantic_operators: bool = False
2727
self._ai_operators: bool = False
2828
self._blob: bool = False
29-
self._udf: bool = False
3029

3130
@property
3231
def semantic_operators(self) -> bool:
@@ -68,17 +67,3 @@ def blob(self, value: bool):
6867
)
6968
warnings.warn(msg, category=bfe.PreviewWarning)
7069
self._blob = value
71-
72-
@property
73-
def udf(self) -> bool:
74-
return self._udf
75-
76-
@udf.setter
77-
def udf(self, value: bool):
78-
if value is True:
79-
msg = bfe.format_message(
80-
"BigFrames managed function (udf) is still under experiments. "
81-
"It may not work and subject to change in the future."
82-
)
83-
warnings.warn(msg, category=bfe.PreviewWarning)
84-
self._udf = value

bigframes/functions/_function_client.py

Lines changed: 58 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,9 @@
5353
}
5454
)
5555

56+
# BQ managed functions (@udf) currently only support Python 3.11.
57+
_MANAGED_FUNC_PYTHON_VERSION = "python-3.11"
58+
5659

5760
class FunctionClient:
5861
# Wait time (in seconds) for an IAM binding to take effect after creation.
@@ -193,11 +196,22 @@ def provision_bq_managed_function(
193196
name,
194197
packages,
195198
is_row_processor,
199+
*,
200+
capture_references=False,
196201
):
197202
"""Create a BigQuery managed function."""
198-
import cloudpickle
199203

200-
pickled = cloudpickle.dumps(func)
204+
# TODO(b/406283812): Expose the capability to pass down
205+
# capture_references=True in the public udf API.
206+
if (
207+
capture_references
208+
and (python_version := _utils.get_python_version())
209+
!= _MANAGED_FUNC_PYTHON_VERSION
210+
):
211+
raise bf_formatting.create_exception_with_feedback_link(
212+
NotImplementedError,
213+
f"Capturing references for udf is currently supported only in Python version {_MANAGED_FUNC_PYTHON_VERSION}, you are running {python_version}.",
214+
)
201215

202216
# Create BQ managed function.
203217
bq_function_args = []
@@ -209,13 +223,15 @@ def provision_bq_managed_function(
209223
bq_function_args.append(f"{name_} {type_}")
210224

211225
managed_function_options = {
212-
"runtime_version": _utils.get_python_version(),
226+
"runtime_version": _MANAGED_FUNC_PYTHON_VERSION,
213227
"entry_point": "bigframes_handler",
214228
}
215229

216230
# Augment user package requirements with any internal package
217231
# requirements.
218-
packages = _utils._get_updated_package_requirements(packages, is_row_processor)
232+
packages = _utils._get_updated_package_requirements(
233+
packages, is_row_processor, capture_references
234+
)
219235
if packages:
220236
managed_function_options["packages"] = packages
221237
managed_function_options_str = self._format_function_options(
@@ -235,20 +251,45 @@ def provision_bq_managed_function(
235251
persistent_func_id = (
236252
f"`{self._gcp_project_id}.{self._bq_dataset}`.{bq_function_name}"
237253
)
238-
create_function_ddl = textwrap.dedent(
239-
f"""
240-
CREATE OR REPLACE FUNCTION {persistent_func_id}({','.join(bq_function_args)})
241-
RETURNS {bq_function_return_type}
242-
LANGUAGE python
243-
OPTIONS ({managed_function_options_str})
244-
AS r'''
254+
255+
udf_name = func.__name__
256+
if capture_references:
257+
# This code path ensures that if the udf body contains any
258+
# references to variables and/or imports outside the body, they are
259+
# captured as well.
245260
import cloudpickle
246-
udf = cloudpickle.loads({pickled})
247-
def bigframes_handler(*args):
248-
return udf(*args)
249-
'''
250-
"""
251-
).strip()
261+
262+
pickled = cloudpickle.dumps(func)
263+
udf_code = textwrap.dedent(
264+
f"""
265+
import cloudpickle
266+
{udf_name} = cloudpickle.loads({pickled})
267+
"""
268+
)
269+
else:
270+
# This code path ensures that if the udf body is self contained,
271+
# i.e. there are no references to variables or imports outside the
272+
# body.
273+
udf_code = textwrap.dedent(inspect.getsource(func))
274+
udf_code = udf_code[udf_code.index("def") :]
275+
276+
create_function_ddl = (
277+
textwrap.dedent(
278+
f"""
279+
CREATE OR REPLACE FUNCTION {persistent_func_id}({','.join(bq_function_args)})
280+
RETURNS {bq_function_return_type}
281+
LANGUAGE python
282+
OPTIONS ({managed_function_options_str})
283+
AS r'''
284+
__UDF_PLACE_HOLDER__
285+
def bigframes_handler(*args):
286+
return {udf_name}(*args)
287+
'''
288+
"""
289+
)
290+
.strip()
291+
.replace("__UDF_PLACE_HOLDER__", udf_code)
292+
)
252293

253294
self._ensure_dataset_exists()
254295
self._create_bq_function(create_function_ddl)

bigframes/functions/_function_session.py

Lines changed: 10 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,6 @@
5858

5959
from . import _function_client, _utils
6060

61-
# BQ managed functions (@udf) currently only support Python 3.11.
62-
_MANAGED_FUNC_PYTHON_VERSIONS = ("python-3.11",)
63-
6461

6562
class FunctionSession:
6663
"""Session to manage bigframes functions."""
@@ -758,7 +755,13 @@ def udf(
758755
name: Optional[str] = None,
759756
packages: Optional[Sequence[str]] = None,
760757
):
761-
"""Decorator to turn a Python udf into a BigQuery managed function.
758+
"""Decorator to turn a Python user defined function (udf) into a
759+
BigQuery managed function.
760+
761+
.. note::
762+
The udf must be self-contained, i.e. it must not contain any
763+
references to an import or variable defined outside the function
764+
body.
762765
763766
.. note::
764767
Please have following IAM roles enabled for you:
@@ -809,17 +812,8 @@ def udf(
809812
of the form supported in
810813
https://pip.pypa.io/en/stable/reference/requirements-file-format/.
811814
"""
812-
if not bigframes.options.experiments.udf:
813-
raise bf_formatting.create_exception_with_feedback_link(NotImplementedError)
814815

815-
# Check the Python version.
816-
python_version = _utils.get_python_version()
817-
if python_version not in _MANAGED_FUNC_PYTHON_VERSIONS:
818-
raise bf_formatting.create_exception_with_feedback_link(
819-
RuntimeError,
820-
f"Python version {python_version} is not supported yet for "
821-
"BigFrames managed function.",
822-
)
816+
warnings.warn("udf is in preview.", category=bfe.PreviewWarning)
823817

824818
# Some defaults may be used from the session if not provided otherwise.
825819
session = self._resolve_session(session)
@@ -862,7 +856,7 @@ def wrapper(func):
862856
ValueError,
863857
"'input_types' was not set and parameter "
864858
f"'{parameter.name}' is missing a type annotation. "
865-
"Types are required to use managed function.",
859+
"Types are required to use udf.",
866860
)
867861
input_types.append(param_type)
868862
elif not isinstance(input_types, collections.abc.Sequence):
@@ -875,8 +869,7 @@ def wrapper(func):
875869
raise bf_formatting.create_exception_with_feedback_link(
876870
ValueError,
877871
"'output_type' was not set and function is missing a "
878-
"return type annotation. Types are required to use "
879-
"managed function.",
872+
"return type annotation. Types are required to use udf",
880873
)
881874

882875
# The function will actually be receiving a pandas Series, but allow

bigframes/functions/_utils.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,9 +64,12 @@ def get_remote_function_locations(bq_location):
6464

6565

6666
def _get_updated_package_requirements(
67-
package_requirements=None, is_row_processor=False
67+
package_requirements=None, is_row_processor=False, capture_references=True
6868
):
69-
requirements = [f"cloudpickle=={cloudpickle.__version__}"]
69+
requirements = []
70+
if capture_references:
71+
requirements.append(f"cloudpickle=={cloudpickle.__version__}")
72+
7073
if is_row_processor:
7174
# bigframes function will send an entire row of data as json, which
7275
# would be converted to a pandas series and processed Ensure numpy

bigframes/pandas/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -362,4 +362,5 @@ def reset_session():
362362
"get_global_session",
363363
"close_session",
364364
"reset_session",
365+
"udf",
365366
]

bigframes/session/__init__.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1435,7 +1435,13 @@ def udf(
14351435
name: Optional[str] = None,
14361436
packages: Optional[Sequence[str]] = None,
14371437
):
1438-
"""Decorator to turn a Python udf into a BigQuery managed function.
1438+
"""Decorator to turn a Python user defined function (udf) into a
1439+
BigQuery managed function.
1440+
1441+
.. note::
1442+
The udf must be self-contained, i.e. it must not contain any
1443+
references to an import or variable defined outside the function
1444+
body.
14391445
14401446
.. note::
14411447
Please have following IAM roles enabled for you:

noxfile.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,7 @@
6161

6262
# Cloud Run Functions supports Python versions up to 3.12
6363
# https://cloud.google.com/run/docs/runtimes/python
64-
# Managed Python UDF is supported only in Python 3.11
65-
# Let's set the E2E tests version to 3.11 to cover most code paths.
66-
E2E_TEST_PYTHON_VERSION = "3.11"
64+
E2E_TEST_PYTHON_VERSION = "3.12"
6765

6866
UNIT_TEST_PYTHON_VERSIONS = ["3.9", "3.10", "3.11", "3.12", "3.13"]
6967
UNIT_TEST_STANDARD_DEPENDENCIES = [

tests/system/large/functions/test_managed_function.py

Lines changed: 13 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import pytest
1919

2020
import bigframes
21+
import bigframes.exceptions as bfe
2122
import bigframes.pandas as bpd
2223
from tests.system.utils import cleanup_function_assets
2324

@@ -27,8 +28,6 @@
2728
reason="temporarily disable to debug managed udf cleanup in the test project"
2829
)
2930

30-
bpd.options.experiments.udf = True
31-
3231

3332
def test_managed_function_multiply_with_ibis(
3433
session,
@@ -127,21 +126,12 @@ def stringify(x):
127126
cleanup_function_assets(stringify, bigquery_client, ignore_failures=False)
128127

129128

130-
@pytest.mark.parametrize(
131-
"array_dtype",
132-
[
133-
bool,
134-
int,
135-
float,
136-
str,
137-
],
138-
)
139-
def test_managed_function_array_output(session, scalars_dfs, dataset_id, array_dtype):
129+
def test_managed_function_array_output(session, scalars_dfs, dataset_id):
140130
try:
141131

142132
@session.udf(dataset=dataset_id)
143-
def featurize(x: int) -> list[array_dtype]: # type: ignore
144-
return [array_dtype(i) for i in [x, x + 1, x + 2]]
133+
def featurize(x: int) -> list[float]:
134+
return [float(i) for i in [x, x + 1, x + 2]]
145135

146136
scalars_df, scalars_pandas_df = scalars_dfs
147137

@@ -166,7 +156,7 @@ def featurize(x: int) -> list[array_dtype]: # type: ignore
166156

167157
# Test on the function from read_gbq_function.
168158
got = featurize_ref(10)
169-
assert got == [array_dtype(i) for i in [10, 11, 12]]
159+
assert got == [10.0, 11.0, 12.0]
170160

171161
bf_result_gbq = bf_int64_col.apply(featurize_ref).to_pandas()
172162
pandas.testing.assert_series_equal(bf_result_gbq, pd_result, check_dtype=False)
@@ -176,30 +166,18 @@ def featurize(x: int) -> list[array_dtype]: # type: ignore
176166
cleanup_function_assets(featurize, session.bqclient, ignore_failures=False)
177167

178168

179-
@pytest.mark.parametrize(
180-
("typ",),
181-
[
182-
pytest.param(int),
183-
pytest.param(float),
184-
pytest.param(bool),
185-
pytest.param(str),
186-
pytest.param(bytes),
187-
],
188-
)
189169
def test_managed_function_series_apply(
190170
session,
191-
typ,
192171
scalars_dfs,
193172
):
194173
try:
195174

196175
@session.udf()
197-
def foo(x: int) -> typ: # type:ignore
198-
# The bytes() constructor expects a non-negative interger as its arg.
199-
return typ(abs(x))
176+
def foo(x: int) -> bytes:
177+
return bytes(abs(x))
200178

201179
# Function should still work normally.
202-
assert foo(-2) == typ(2)
180+
assert foo(-2) == bytes(2)
203181

204182
assert hasattr(foo, "bigframes_bigquery_function")
205183
assert hasattr(foo, "ibis_node")
@@ -243,26 +221,17 @@ def foo(x: int) -> typ: # type:ignore
243221
cleanup_function_assets(foo, session.bqclient, ignore_failures=False)
244222

245223

246-
@pytest.mark.parametrize(
247-
("typ",),
248-
[
249-
pytest.param(int),
250-
pytest.param(float),
251-
pytest.param(bool),
252-
pytest.param(str),
253-
],
254-
)
255224
def test_managed_function_series_apply_array_output(
256225
session,
257-
typ,
258226
scalars_dfs,
259227
):
260228
try:
261229

262-
@session.udf()
263-
def foo_list(x: int) -> list[typ]: # type:ignore
264-
# The bytes() constructor expects a non-negative interger as its arg.
265-
return [typ(abs(x)), typ(abs(x) + 1)]
230+
with pytest.warns(bfe.PreviewWarning, match="udf is in preview."):
231+
232+
@session.udf()
233+
def foo_list(x: int) -> list[float]:
234+
return [float(abs(x)), float(abs(x) + 1)]
266235

267236
scalars_df, scalars_pandas_df = scalars_dfs
268237

tests/unit/_config/test_experiment_options.py

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -61,18 +61,3 @@ def test_blob_set_true_shows_warning():
6161
options.blob = True
6262

6363
assert options.blob is True
64-
65-
66-
def test_udf_default_false():
67-
options = experiment_options.ExperimentOptions()
68-
69-
assert options.udf is False
70-
71-
72-
def test_udf_set_true_shows_warning():
73-
options = experiment_options.ExperimentOptions()
74-
75-
with pytest.warns(bfe.PreviewWarning):
76-
options.udf = True
77-
78-
assert options.udf is True

0 commit comments

Comments
 (0)