Commit 0a9a085

Merge branch 'main' into geo_window_part
2 parents 9401370 + 2818ab9 commit 0a9a085

15 files changed: +451 additions, −91 deletions

bigframes/clients.py

Lines changed: 16 additions & 8 deletions
@@ -94,16 +94,24 @@ def create_bq_connection(
         # https://cloud.google.com/bigquery/docs/reference/standard-sql/remote-functions#grant_permission_on_function
         self._ensure_iam_binding(project_id, service_account_id, iam_role)
 
-    # Introduce retries to accommodate transient errors like etag mismatch,
-    # which can be caused by concurrent operation on the same resource, and
-    # manifests with message like:
-    # google.api_core.exceptions.Aborted: 409 There were concurrent policy
-    # changes. Please retry the whole read-modify-write with exponential
-    # backoff. The request's ETag '\007\006\003,\264\304\337\272' did not match
-    # the current policy's ETag '\007\006\003,\3750&\363'.
+    # Introduce retries to accommodate transient errors like:
+    # (1) Etag mismatch,
+    # which can be caused by concurrent operation on the same resource, and
+    # manifests with message like:
+    # google.api_core.exceptions.Aborted: 409 There were concurrent policy
+    # changes. Please retry the whole read-modify-write with exponential
+    # backoff. The request's ETag '\007\006\003,\264\304\337\272' did not
+    # match the current policy's ETag '\007\006\003,\3750&\363'.
+    # (2) Connection creation,
+    # for which sometimes it takes a bit for its service account to reflect
+    # across APIs (e.g. b/397662004, b/386838767), before which, an attempt
+    # to set an IAM policy for the service account may throw an error like:
+    # google.api_core.exceptions.InvalidArgument: 400 Service account
+    # bqcx-*@gcp-sa-bigquery-condel.iam.gserviceaccount.com does not exist.
     @google.api_core.retry.Retry(
         predicate=google.api_core.retry.if_exception_type(
-            google.api_core.exceptions.Aborted
+            google.api_core.exceptions.Aborted,
+            google.api_core.exceptions.InvalidArgument,
         ),
         initial=10,
         maximum=20,
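For context, a minimal self-contained sketch (not bigframes code) of the retry pattern this change extends: google.api_core.retry.Retry wraps a callable and re-invokes it with exponential backoff whenever the predicate matches the raised exception. The flaky_set_iam_policy function and the small backoff values below are illustrative only; the actual change uses initial=10 and maximum=20 seconds.

import google.api_core.exceptions
import google.api_core.retry

attempts = {"count": 0}

@google.api_core.retry.Retry(
    predicate=google.api_core.retry.if_exception_type(
        google.api_core.exceptions.Aborted,          # e.g. 409 concurrent policy changes
        google.api_core.exceptions.InvalidArgument,  # e.g. 400 service account not yet visible
    ),
    initial=0.1,  # illustrative; the real change waits 10s initially
    maximum=1.0,  # illustrative; the real change caps the wait at 20s
)
def flaky_set_iam_policy():
    # Hypothetical stand-in for the IAM read-modify-write that can hit the
    # transient 409/400 errors described in the diff comments above.
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise google.api_core.exceptions.Aborted("There were concurrent policy changes.")
    return "policy updated"

print(flaky_set_iam_policy())  # retries twice, then prints "policy updated"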

bigframes/core/blocks.py

Lines changed: 101 additions & 19 deletions
@@ -22,6 +22,7 @@
 from __future__ import annotations
 
 import ast
+import copy
 import dataclasses
 import datetime
 import functools
@@ -30,6 +31,7 @@
 import textwrap
 import typing
 from typing import (
+    Any,
     Iterable,
     List,
     Literal,
@@ -49,7 +51,7 @@
 import pyarrow as pa
 
 from bigframes import session
-import bigframes._config.sampling_options as sampling_options
+from bigframes._config import sampling_options
 import bigframes.constants
 import bigframes.core as core
 import bigframes.core.compile.googlesql as googlesql
@@ -535,19 +537,9 @@ def to_pandas(
         Returns:
             pandas.DataFrame, QueryJob
         """
-        if (sampling_method is not None) and (sampling_method not in _SAMPLING_METHODS):
-            raise NotImplementedError(
-                f"The downsampling method {sampling_method} is not implemented, "
-                f"please choose from {','.join(_SAMPLING_METHODS)}."
-            )
-
-        sampling = bigframes.options.sampling.with_max_download_size(max_download_size)
-        if sampling_method is not None:
-            sampling = sampling.with_method(sampling_method).with_random_state(  # type: ignore
-                random_state
-            )
-        else:
-            sampling = sampling.with_disabled()
+        sampling = self._get_sampling_option(
+            max_download_size, sampling_method, random_state
+        )
 
         df, query_job = self._materialize_local(
             materialize_options=MaterializationOptions(
@@ -559,6 +551,27 @@ def to_pandas(
         df.set_axis(self.column_labels, axis=1, copy=False)
         return df, query_job
 
+    def _get_sampling_option(
+        self,
+        max_download_size: Optional[int] = None,
+        sampling_method: Optional[str] = None,
+        random_state: Optional[int] = None,
+    ) -> sampling_options.SamplingOptions:
+
+        if (sampling_method is not None) and (sampling_method not in _SAMPLING_METHODS):
+            raise NotImplementedError(
+                f"The downsampling method {sampling_method} is not implemented, "
+                f"please choose from {','.join(_SAMPLING_METHODS)}."
+            )
+
+        sampling = bigframes.options.sampling.with_max_download_size(max_download_size)
+        if sampling_method is None:
+            return sampling.with_disabled()
+
+        return sampling.with_method(sampling_method).with_random_state(  # type: ignore
+            random_state
+        )
+
     def try_peek(
         self, n: int = 20, force: bool = False, allow_large_results=None
     ) -> typing.Optional[pd.DataFrame]:
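For illustration, a self-contained sketch (toy names, not the bigframes SamplingOptions class itself) of the immutable builder pattern the new _get_sampling_option helper relies on: each with_* call returns a new options object, so the helper either disables sampling or configures a method and random state without mutating the global defaults.

from dataclasses import dataclass, replace
from typing import Optional

@dataclass(frozen=True)
class ToySamplingOptions:
    # Hypothetical stand-in for bigframes' sampling options object.
    max_download_size: Optional[int] = None
    enable_downsampling: bool = False
    sampling_method: Optional[str] = None
    random_state: Optional[int] = None

    def with_max_download_size(self, size: Optional[int]) -> "ToySamplingOptions":
        return replace(self, max_download_size=size)

    def with_method(self, method: str) -> "ToySamplingOptions":
        return replace(self, enable_downsampling=True, sampling_method=method)

    def with_random_state(self, state: Optional[int]) -> "ToySamplingOptions":
        return replace(self, random_state=state)

    def with_disabled(self) -> "ToySamplingOptions":
        return replace(self, enable_downsampling=False)

def get_sampling_option(
    defaults: ToySamplingOptions,
    max_download_size: Optional[int] = None,
    sampling_method: Optional[str] = None,
    random_state: Optional[int] = None,
) -> ToySamplingOptions:
    # Mirrors the control flow of the extracted helper: no method means
    # sampling stays disabled; otherwise configure method and random state.
    sampling = defaults.with_max_download_size(max_download_size)
    if sampling_method is None:
        return sampling.with_disabled()
    return sampling.with_method(sampling_method).with_random_state(random_state)

print(get_sampling_option(ToySamplingOptions(), 500, "uniform", 42))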
@@ -798,11 +811,73 @@ def split(
         return [sliced_block.drop_columns(drop_cols) for sliced_block in sliced_blocks]
 
     def _compute_dry_run(
-        self, value_keys: Optional[Iterable[str]] = None
-    ) -> bigquery.QueryJob:
+        self,
+        value_keys: Optional[Iterable[str]] = None,
+        *,
+        ordered: bool = True,
+        max_download_size: Optional[int] = None,
+        sampling_method: Optional[str] = None,
+        random_state: Optional[int] = None,
+    ) -> typing.Tuple[pd.Series, bigquery.QueryJob]:
+        sampling = self._get_sampling_option(
+            max_download_size, sampling_method, random_state
+        )
+        if sampling.enable_downsampling:
+            raise NotImplementedError("Dry run with sampling is not supported")
+
+        index: List[Any] = []
+        values: List[Any] = []
+
+        index.append("columnCount")
+        values.append(len(self.value_columns))
+        index.append("columnDtypes")
+        values.append(
+            {
+                col: self.expr.get_column_type(self.resolve_label_exact_or_error(col))
+                for col in self.column_labels
+            }
+        )
+
+        index.append("indexLevel")
+        values.append(self.index.nlevels)
+        index.append("indexDtypes")
+        values.append(self.index.dtypes)
+
         expr = self._apply_value_keys_to_expr(value_keys=value_keys)
-        query_job = self.session._executor.dry_run(expr)
-        return query_job
+        query_job = self.session._executor.dry_run(expr, ordered)
+        job_api_repr = copy.deepcopy(query_job._properties)
+
+        job_ref = job_api_repr["jobReference"]
+        for key, val in job_ref.items():
+            index.append(key)
+            values.append(val)
+
+        index.append("jobType")
+        values.append(job_api_repr["configuration"]["jobType"])
+
+        query_config = job_api_repr["configuration"]["query"]
+        for key in ("destinationTable", "useLegacySql"):
+            index.append(key)
+            values.append(query_config.get(key))
+
+        query_stats = job_api_repr["statistics"]["query"]
+        for key in (
+            "referencedTables",
+            "totalBytesProcessed",
+            "cacheHit",
+            "statementType",
+        ):
+            index.append(key)
+            values.append(query_stats.get(key))
+
+        index.append("creationTime")
+        values.append(
+            pd.Timestamp(
+                job_api_repr["statistics"]["creationTime"], unit="ms", tz="UTC"
+            )
+        )
+
+        return pd.Series(values, index=index), query_job
 
     def _apply_value_keys_to_expr(self, value_keys: Optional[Iterable[str]] = None):
         expr = self._expr
@@ -2703,11 +2778,18 @@ def to_pandas(
                 "Cannot materialize index, as this object does not have an index. Set index column(s) using set_index."
             )
         ordered = ordered if ordered is not None else True
+
         df, query_job = self._block.select_columns([]).to_pandas(
-            ordered=ordered, allow_large_results=allow_large_results
+            ordered=ordered,
+            allow_large_results=allow_large_results,
         )
         return df.index, query_job
 
+    def _compute_dry_run(
+        self, *, ordered: bool = True
+    ) -> Tuple[pd.Series, bigquery.QueryJob]:
+        return self._block.select_columns([])._compute_dry_run(ordered=ordered)
+
     def resolve_level(self, level: LevelsType) -> typing.Sequence[str]:
         if utils.is_list_like(level):
             levels = list(level)
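As a standalone illustration of how the expanded _compute_dry_run assembles its result, the sketch below collects dry-run statistics as parallel label/value lists and returns them as a labeled pandas Series. The job_api_repr dict is a hand-written stand-in for a real QueryJob._properties payload; its field names mirror the BigQuery job resource keys read above, and the values are made up.

import pandas as pd

job_api_repr = {
    "jobReference": {"projectId": "my-project", "jobId": "job_123", "location": "US"},
    "configuration": {"jobType": "QUERY", "query": {"useLegacySql": False}},
    "statistics": {
        "creationTime": 1735689600000,  # milliseconds since the epoch
        "query": {
            "totalBytesProcessed": "1048576",
            "cacheHit": False,
            "statementType": "SELECT",
        },
    },
}

index, values = [], []

# Job identity fields become one Series entry each.
for key, val in job_api_repr["jobReference"].items():
    index.append(key)
    values.append(val)

index.append("jobType")
values.append(job_api_repr["configuration"]["jobType"])

# Dry-run statistics of interest, missing keys simply become None.
query_stats = job_api_repr["statistics"]["query"]
for key in ("totalBytesProcessed", "cacheHit", "statementType"):
    index.append(key)
    values.append(query_stats.get(key))

index.append("creationTime")
values.append(pd.Timestamp(job_api_repr["statistics"]["creationTime"], unit="ms", tz="UTC"))

print(pd.Series(values, index=index))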

bigframes/core/indexers.py

Lines changed: 35 additions & 1 deletion
@@ -27,6 +27,7 @@
 import bigframes.core.guid as guid
 import bigframes.core.indexes as indexes
 import bigframes.core.scalar
+import bigframes.core.window_spec as windows
 import bigframes.dataframe
 import bigframes.dtypes
 import bigframes.exceptions as bfe
@@ -477,6 +478,19 @@ def _iloc_getitem_series_or_dataframe(
             Union[bigframes.dataframe.DataFrame, bigframes.series.Series],
             series_or_dataframe.iloc[0:0],
         )
+
+    # Check if both positive index and negative index are necessary
+    if isinstance(key, (bigframes.series.Series, indexes.Index)):
+        # Avoid data download
+        is_key_unisigned = False
+    else:
+        first_sign = key[0] >= 0
+        is_key_unisigned = True
+        for k in key:
+            if (k >= 0) != first_sign:
+                is_key_unisigned = False
+                break
+
     if isinstance(series_or_dataframe, bigframes.series.Series):
         original_series_name = series_or_dataframe.name
         series_name = (
@@ -497,7 +511,27 @@ def _iloc_getitem_series_or_dataframe(
         block = df._block
         # explicitly set index to offsets, reset_index may not generate offsets in some modes
         block, offsets_id = block.promote_offsets("temp_iloc_offsets_")
-        block = block.set_index([offsets_id])
+        pos_block = block.set_index([offsets_id])
+
+        if not is_key_unisigned or key[0] < 0:
+            neg_block, size_col_id = block.apply_window_op(
+                offsets_id,
+                ops.aggregations.SizeUnaryOp(),
+                window_spec=windows.rows(),
+            )
+            neg_block, neg_index_id = neg_block.apply_binary_op(
+                offsets_id, size_col_id, ops.SubOp()
+            )
+
+            neg_block = neg_block.set_index([neg_index_id]).drop_columns(
+                [size_col_id, offsets_id]
+            )
+
+        if is_key_unisigned:
+            block = pos_block if key[0] >= 0 else neg_block
+        else:
+            block = pos_block.concat([neg_block], how="inner")
+
         df = bigframes.dataframe.DataFrame(block)
 
         result = df.loc[key]
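A minimal pandas sketch of the idea behind this change: every row gets a zero-based offset, and a second "negative" index is derived as offset minus the total row count, so that .loc can serve mixed positive/negative iloc-style keys against the union of both views. This is an illustration only; the real implementation builds the size column with a window op (SizeUnaryOp) over BigQuery blocks rather than local data.

import pandas as pd

df = pd.DataFrame({"value": ["a", "b", "c", "d", "e"]})

offsets = pd.RangeIndex(len(df))        # analogous to promote_offsets
size = len(df)                          # analogous to the SizeUnaryOp window result

pos_view = df.set_axis(offsets)         # index: 0, 1, 2, 3, 4
neg_view = df.set_axis(offsets - size)  # index: -5, -4, -3, -2, -1

key = [0, -1, 2]                        # a mixed-sign key needs both views
combined = pd.concat([pos_view, neg_view])
print(combined.loc[key])                # rows "a", "e", "c"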

bigframes/core/indexes/base.py

Lines changed: 37 additions & 7 deletions
@@ -17,7 +17,7 @@
 from __future__ import annotations
 
 import typing
-from typing import Hashable, Literal, Optional, Sequence, Union
+from typing import Hashable, Literal, Optional, overload, Sequence, Union
 
 import bigframes_vendored.constants as constants
 import bigframes_vendored.pandas.core.indexes.base as vendored_pandas_index
@@ -228,15 +228,16 @@ def T(self) -> Index:
         return self.transpose()
 
     @property
-    def query_job(self) -> Optional[bigquery.QueryJob]:
+    def query_job(self) -> bigquery.QueryJob:
         """BigQuery job metadata for the most recent query.
 
         Returns:
             The most recent `QueryJob
             <https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.job.QueryJob>`_.
         """
         if self._query_job is None:
-            self._query_job = self._block._compute_dry_run()
+            _, query_job = self._block._compute_dry_run()
+            self._query_job = query_job
         return self._query_job
 
     def __repr__(self) -> str:
@@ -252,7 +253,8 @@ def __repr__(self) -> str:
         opts = bigframes.options.display
         max_results = opts.max_rows
         if opts.repr_mode == "deferred":
-            return formatter.repr_query_job(self._block._compute_dry_run())
+            _, dry_run_query_job = self._block._compute_dry_run()
+            return formatter.repr_query_job(dry_run_query_job)
 
         pandas_df, _, query_job = self._block.retrieve_repr_request_results(max_results)
         self._query_job = query_job
@@ -490,18 +492,46 @@ def __getitem__(self, key: int) -> typing.Any:
         else:
             raise NotImplementedError(f"Index key not supported {key}")
 
-    def to_pandas(self, *, allow_large_results: Optional[bool] = None) -> pandas.Index:
+    @overload
+    def to_pandas(  # type: ignore[overload-overlap]
+        self,
+        *,
+        allow_large_results: Optional[bool] = ...,
+        dry_run: Literal[False] = ...,
+    ) -> pandas.Index:
+        ...
+
+    @overload
+    def to_pandas(
+        self, *, allow_large_results: Optional[bool] = ..., dry_run: Literal[True] = ...
+    ) -> pandas.Series:
+        ...
+
+    def to_pandas(
+        self, *, allow_large_results: Optional[bool] = None, dry_run: bool = False
+    ) -> pandas.Index | pandas.Series:
         """Gets the Index as a pandas Index.
 
         Args:
             allow_large_results (bool, default None):
                 If not None, overrides the global setting to allow or disallow large query results
                 over the default size limit of 10 GB.
+            dry_run (bool, default False):
+                If this argument is true, this method will not process the data. Instead, it returns
+                a Pandas series containing dtype and the amount of bytes to be processed.
 
         Returns:
-            pandas.Index:
-                A pandas Index with all of the labels from this Index.
+            pandas.Index | pandas.Series:
+                A pandas Index with all of the labels from this Index. If dry run is set to True,
+                returns a Series containing dry run statistics.
         """
+        if dry_run:
+            dry_run_stats, dry_run_job = self._block.index._compute_dry_run(
+                ordered=True
+            )
+            self._query_job = dry_run_job
+            return dry_run_stats
+
         df, query_job = self._block.index.to_pandas(
             ordered=True, allow_large_results=allow_large_results
        )
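A short usage sketch of the dry_run flag added to Index.to_pandas above. It assumes a configured bigframes session; the public table name is a placeholder.

import bigframes.pandas as bpd

df = bpd.read_gbq("bigquery-public-data.usa_names.usa_1910_2013")

# dry_run=True: nothing is materialized; a pandas Series of dry-run statistics
# (column/index dtypes, totalBytesProcessed, cacheHit, ...) comes back instead.
stats = df.index.to_pandas(dry_run=True)
print(stats["totalBytesProcessed"])

# Default (dry_run=False): the index is materialized as a pandas Index.
pandas_index = df.index.to_pandas()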
