Skip to content

Commit 08ea02c

Browse files
refactor: Separate dataset id generation from temp table management (#1520)
1 parent 34ab9b8 commit 08ea02c

File tree

8 files changed

+63
-50
lines changed

8 files changed

+63
-50
lines changed

bigframes/blob/_functions.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,9 @@ def _output_bq_type(self):
6868

6969
def _create_udf(self):
7070
"""Create Python UDF in BQ. Return name of the UDF."""
71-
udf_name = str(self._session._loader._storage_manager._random_table())
71+
udf_name = str(
72+
self._session._loader._storage_manager.generate_unique_resource_id()
73+
)
7274

7375
func_body = inspect.getsource(self._func)
7476
func_name = self._func.__name__

bigframes/dataframe.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3760,10 +3760,9 @@ def to_gbq(
37603760
)
37613761
if_exists = "replace"
37623762

3763-
temp_table_ref = self._session._temp_storage_manager._random_table(
3764-
# The client code owns this table reference now, so skip_cleanup=True
3765-
# to not clean it up when we close the session.
3766-
skip_cleanup=True,
3763+
# The client code owns this table reference now
3764+
temp_table_ref = (
3765+
self._session._temp_storage_manager.generate_unique_resource_id()
37673766
)
37683767
destination_table = f"{temp_table_ref.project}.{temp_table_ref.dataset_id}.{temp_table_ref.table_id}"
37693768

bigframes/session/__init__.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,7 @@ def __init__(
248248
self._metrics = bigframes.session.metrics.ExecutionMetrics()
249249
self._function_session = bff_session.FunctionSession()
250250
self._temp_storage_manager = (
251-
bigframes.session.temp_storage.TemporaryGbqStorageManager(
251+
bigframes.session.temp_storage.AnonymousDatasetManager(
252252
self._clients_provider.bqclient,
253253
location=self._location,
254254
session_id=self._session_id,
@@ -908,7 +908,7 @@ def read_csv(
908908
engine=engine,
909909
write_engine=write_engine,
910910
)
911-
table = self._temp_storage_manager._random_table()
911+
table = self._temp_storage_manager.allocate_temp_table()
912912

913913
if engine is not None and engine == "bigquery":
914914
if any(param is not None for param in (dtype, names)):
@@ -1054,7 +1054,7 @@ def read_parquet(
10541054
engine=engine,
10551055
write_engine=write_engine,
10561056
)
1057-
table = self._temp_storage_manager._random_table()
1057+
table = self._temp_storage_manager.allocate_temp_table()
10581058

10591059
if engine == "bigquery":
10601060
job_config = bigquery.LoadJobConfig()
@@ -1108,7 +1108,7 @@ def read_json(
11081108
engine=engine,
11091109
write_engine=write_engine,
11101110
)
1111-
table = self._temp_storage_manager._random_table()
1111+
table = self._temp_storage_manager.allocate_temp_table()
11121112

11131113
if engine == "bigquery":
11141114

@@ -1704,7 +1704,7 @@ def _start_query_ml_ddl(
17041704

17051705
def _create_object_table(self, path: str, connection: str) -> str:
17061706
"""Create a random id Object Table from the input path and connection."""
1707-
table = str(self._loader._storage_manager._random_table())
1707+
table = str(self._loader._storage_manager.generate_unique_resource_id())
17081708

17091709
import textwrap
17101710

bigframes/session/executor.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ class BigQueryCachingExecutor(Executor):
195195
def __init__(
196196
self,
197197
bqclient: bigquery.Client,
198-
storage_manager: bigframes.session.temp_storage.TemporaryGbqStorageManager,
198+
storage_manager: bigframes.session.temp_storage.AnonymousDatasetManager,
199199
bqstoragereadclient: google.cloud.bigquery_storage_v1.BigQueryReadClient,
200200
*,
201201
strictly_ordered: bool = True,
@@ -248,7 +248,7 @@ def execute(
248248
job_config = bigquery.QueryJobConfig()
249249
# Use explicit destination to avoid 10GB limit of temporary table
250250
if use_explicit_destination:
251-
destination_table = self.storage_manager.create_temp_table(
251+
destination_table = self.storage_manager.allocate_and_create_temp_table(
252252
array_value.schema.to_bigquery(), cluster_cols=[]
253253
)
254254
job_config.destination = destination_table
@@ -392,7 +392,7 @@ def peek(
392392
job_config = bigquery.QueryJobConfig()
393393
# Use explicit destination to avoid 10GB limit of temporary table
394394
if use_explicit_destination:
395-
destination_table = self.storage_manager.create_temp_table(
395+
destination_table = self.storage_manager.allocate_and_create_temp_table(
396396
array_value.schema.to_bigquery(), cluster_cols=[]
397397
)
398398
job_config.destination = destination_table
@@ -645,7 +645,9 @@ def _sql_as_cached_temp_table(
645645
cluster_cols: Sequence[str],
646646
) -> bigquery.TableReference:
647647
assert len(cluster_cols) <= _MAX_CLUSTER_COLUMNS
648-
temp_table = self.storage_manager.create_temp_table(schema, cluster_cols)
648+
temp_table = self.storage_manager.allocate_and_create_temp_table(
649+
schema, cluster_cols
650+
)
649651

650652
# TODO: Get default job config settings
651653
job_config = cast(

bigframes/session/loader.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ def __init__(
115115
self,
116116
session: bigframes.session.Session,
117117
bqclient: bigquery.Client,
118-
storage_manager: bigframes.session.temp_storage.TemporaryGbqStorageManager,
118+
storage_manager: bigframes.session.temp_storage.AnonymousDatasetManager,
119119
default_index_type: bigframes.enums.DefaultIndexKind,
120120
scan_index_uniqueness: bool,
121121
force_total_order: bool,
@@ -167,7 +167,7 @@ def read_pandas_load_job(
167167

168168
job_config.labels = {"bigframes-api": api_name}
169169

170-
load_table_destination = self._storage_manager._random_table()
170+
load_table_destination = self._storage_manager.allocate_temp_table()
171171
load_job = self._bqclient.load_table_from_dataframe(
172172
pandas_dataframe_copy,
173173
load_table_destination,
@@ -216,7 +216,7 @@ def read_pandas_streaming(
216216
index=True,
217217
)
218218

219-
destination = self._storage_manager.create_temp_table(
219+
destination = self._storage_manager.allocate_and_create_temp_table(
220220
schema,
221221
[ordering_col],
222222
)
@@ -673,7 +673,9 @@ def _query_to_destination(
673673
)
674674
else:
675675
cluster_cols = []
676-
temp_table = self._storage_manager.create_temp_table(schema, cluster_cols)
676+
temp_table = self._storage_manager.allocate_and_create_temp_table(
677+
schema, cluster_cols
678+
)
677679

678680
timeout_ms = configuration.get("jobTimeoutMs") or configuration["query"].get(
679681
"timeoutMs"

bigframes/session/temp_storage.py

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
_TEMP_TABLE_ID_FORMAT = "bqdf{date}_{session_id}_{random_id}"
2525

2626

27-
class TemporaryGbqStorageManager:
27+
class AnonymousDatasetManager:
2828
"""
2929
Responsible for allocating and cleaning up temporary gbq tables used by a BigFrames session.
3030
"""
@@ -46,32 +46,42 @@ def __init__(
4646
)
4747

4848
self.session_id = session_id
49-
self._table_ids: List[str] = []
49+
self._table_ids: List[bigquery.TableReference] = []
5050
self._kms_key = kms_key
5151

52-
def create_temp_table(
52+
def allocate_and_create_temp_table(
5353
self, schema: Sequence[bigquery.SchemaField], cluster_cols: Sequence[str]
5454
) -> bigquery.TableReference:
55-
# Can't set a table in _SESSION as destination via query job API, so we
56-
# run DDL, instead.
55+
"""
56+
Allocates and creates a table in the anonymous dataset.
57+
The table will be cleaned up by clean_up_tables.
58+
"""
5759
expiration = (
5860
datetime.datetime.now(datetime.timezone.utc) + constants.DEFAULT_EXPIRATION
5961
)
6062
table = bf_io_bigquery.create_temp_table(
6163
self.bqclient,
62-
self._random_table(),
64+
self.allocate_temp_table(),
6365
expiration,
6466
schema=schema,
6567
cluster_columns=list(cluster_cols),
6668
kms_key=self._kms_key,
6769
)
6870
return bigquery.TableReference.from_string(table)
6971

70-
def _random_table(self, skip_cleanup: bool = False) -> bigquery.TableReference:
72+
def allocate_temp_table(self) -> bigquery.TableReference:
73+
"""
74+
Allocates a unique table id, but does not create the table.
75+
The table will be cleaned up by clean_up_tables.
76+
"""
77+
table_id = self.generate_unique_resource_id()
78+
self._table_ids.append(table_id)
79+
return table_id
80+
81+
def generate_unique_resource_id(self) -> bigquery.TableReference:
7182
"""Generate a random table ID with BigQuery DataFrames prefix.
7283
73-
The generated ID will be stored and checked for deletion when the
74-
session is closed, unless skip_cleanup is True.
84+
This resource will not be cleaned up by this manager.
7585
7686
Args:
7787
skip_cleanup (bool, default False):
@@ -87,16 +97,9 @@ def _random_table(self, skip_cleanup: bool = False) -> bigquery.TableReference:
8797
table_id = _TEMP_TABLE_ID_FORMAT.format(
8898
date=now.strftime("%Y%m%d"), session_id=self.session_id, random_id=random_id
8999
)
90-
if not skip_cleanup:
91-
self._table_ids.append(table_id)
92100
return self.dataset.table(table_id)
93101

94102
def clean_up_tables(self):
95103
"""Delete tables that were created with this session's session_id."""
96-
client = self.bqclient
97-
project_id = self.dataset.project
98-
dataset_id = self.dataset.dataset_id
99-
100-
for table_id in self._table_ids:
101-
full_id = ".".join([project_id, dataset_id, table_id])
102-
client.delete_table(full_id, not_found_ok=True)
104+
for table_ref in self._table_ids:
105+
self.bqclient.delete_table(table_ref, not_found_ok=True)

tests/system/large/test_session.py

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
import datetime
1616

17+
import google.cloud.bigquery as bigquery
1718
import google.cloud.exceptions
1819
import pytest
1920

@@ -70,10 +71,14 @@ def test_close(session: bigframes.Session):
7071
+ bigframes.constants.DEFAULT_EXPIRATION
7172
)
7273
full_id_1 = bigframes.session._io.bigquery.create_temp_table(
73-
session.bqclient, session._temp_storage_manager._random_table(), expiration
74+
session.bqclient,
75+
session._temp_storage_manager.allocate_temp_table(),
76+
expiration,
7477
)
7578
full_id_2 = bigframes.session._io.bigquery.create_temp_table(
76-
session.bqclient, session._temp_storage_manager._random_table(), expiration
79+
session.bqclient,
80+
session._temp_storage_manager.allocate_temp_table(),
81+
expiration,
7782
)
7883

7984
# check that the tables were actually created
@@ -106,10 +111,14 @@ def test_clean_up_by_session_id():
106111
+ bigframes.constants.DEFAULT_EXPIRATION
107112
)
108113
bigframes.session._io.bigquery.create_temp_table(
109-
session.bqclient, session._temp_storage_manager._random_table(), expiration
114+
session.bqclient,
115+
session._temp_storage_manager.allocate_temp_table(),
116+
expiration,
110117
)
111118
bigframes.session._io.bigquery.create_temp_table(
112-
session.bqclient, session._temp_storage_manager._random_table(), expiration
119+
session.bqclient,
120+
session._temp_storage_manager.allocate_temp_table(),
121+
expiration,
113122
)
114123

115124
# check that some table exists with the expected session_id
@@ -148,15 +157,11 @@ def test_clean_up_via_context_manager(session_creator):
148157
with session_creator() as session:
149158
bqclient = session.bqclient
150159

151-
expiration = (
152-
datetime.datetime.now(datetime.timezone.utc)
153-
+ bigframes.constants.DEFAULT_EXPIRATION
160+
full_id_1 = session._temp_storage_manager.allocate_and_create_temp_table(
161+
[bigquery.SchemaField("a", "INT64")], cluster_cols=[]
154162
)
155-
full_id_1 = bigframes.session._io.bigquery.create_temp_table(
156-
session.bqclient, session._temp_storage_manager._random_table(), expiration
157-
)
158-
full_id_2 = bigframes.session._io.bigquery.create_temp_table(
159-
session.bqclient, session._temp_storage_manager._random_table(), expiration
163+
full_id_2 = session._temp_storage_manager.allocate_and_create_temp_table(
164+
[bigquery.SchemaField("b", "STRING")], cluster_cols=["b"]
160165
)
161166

162167
# check that the tables were actually created

tests/system/small/test_encryption.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ def test_session_load_job(bq_cmek, session_with_bq_cmek):
8989
pytest.skip("no cmek set for testing") # pragma: NO COVER
9090

9191
# Session should have cmek set in the default query and load job configs
92-
load_table = session_with_bq_cmek._temp_storage_manager._random_table()
92+
load_table = session_with_bq_cmek._temp_storage_manager.allocate_temp_table()
9393

9494
df = pandas.DataFrame({"col0": [1, 2, 3]})
9595
load_job_config = bigquery.LoadJobConfig()
@@ -194,7 +194,7 @@ def test_to_gbq(bq_cmek, session_with_bq_cmek, scalars_table_id):
194194

195195
# Write the result to BQ custom table and assert encryption
196196
session_with_bq_cmek.bqclient.get_table(output_table_id)
197-
output_table_ref = session_with_bq_cmek._temp_storage_manager._random_table()
197+
output_table_ref = session_with_bq_cmek._temp_storage_manager.allocate_temp_table()
198198
output_table_id = str(output_table_ref)
199199
df.to_gbq(output_table_id)
200200
output_table = session_with_bq_cmek.bqclient.get_table(output_table_id)

0 commit comments
Comments (0)