From 75a7cb819eb10060ecce46b65debb681392fdf2b Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Wed, 5 Jan 2022 14:36:33 -0600 Subject: [PATCH 1/5] fix: use data project for destination in `to_gbq` --- pandas_gbq/gbq.py | 11 +++++++--- pandas_gbq/load.py | 42 ++++++++++++++++++++++++++++++++++----- tests/unit/test_to_gbq.py | 40 +++++++++++++++++++++++++++++++++++++ 3 files changed, 85 insertions(+), 8 deletions(-) diff --git a/pandas_gbq/gbq.py b/pandas_gbq/gbq.py index 0a18cc3a..931f6add 100644 --- a/pandas_gbq/gbq.py +++ b/pandas_gbq/gbq.py @@ -576,6 +576,7 @@ def load_data( schema=None, progress_bar=True, api_method: str = "load_parquet", + billing_project: Optional[str] = None, ): from pandas_gbq import load @@ -590,6 +591,7 @@ def load_data( schema=schema, location=self.location, api_method=api_method, + billing_project=billing_project, ) if progress_bar and tqdm: chunks = tqdm.tqdm(chunks) @@ -602,8 +604,8 @@ def load_data( except self.http_error as ex: self.process_http_error(ex) - def delete_and_recreate_table(self, dataset_id, table_id, table_schema): - table = _Table(self.project_id, dataset_id, credentials=self.credentials) + def delete_and_recreate_table(self, project_id, dataset_id, table_id, table_schema): + table = _Table(project_id, dataset_id, credentials=self.credentials) table.delete(table_id) table.create(table_id, table_schema) @@ -1127,7 +1129,9 @@ def to_gbq( "'append' or 'replace' data." ) elif if_exists == "replace": - connector.delete_and_recreate_table(dataset_id, table_id, table_schema) + connector.delete_and_recreate_table( + project_id_table, dataset_id, table_id, table_schema + ) else: if not pandas_gbq.schema.schema_is_subset(original_schema, table_schema): raise InvalidSchema( @@ -1156,6 +1160,7 @@ def to_gbq( schema=table_schema, progress_bar=progress_bar, api_method=api_method, + billing_project=project_id, ) diff --git a/pandas_gbq/load.py b/pandas_gbq/load.py index 588a6719..e52952f2 100644 --- a/pandas_gbq/load.py +++ b/pandas_gbq/load.py @@ -114,6 +114,7 @@ def load_parquet( destination_table_ref: bigquery.TableReference, location: Optional[str], schema: Optional[Dict[str, Any]], + billing_project: Optional[str] = None, ): job_config = bigquery.LoadJobConfig() job_config.write_disposition = "WRITE_APPEND" @@ -126,7 +127,11 @@ def load_parquet( try: client.load_table_from_dataframe( - dataframe, destination_table_ref, job_config=job_config, location=location, + dataframe, + destination_table_ref, + job_config=job_config, + location=location, + project=billing_project, ).result() except pyarrow.lib.ArrowInvalid as exc: raise exceptions.ConversionError( @@ -162,6 +167,7 @@ def load_csv_from_dataframe( location: Optional[str], chunksize: Optional[int], schema: Optional[Dict[str, Any]], + billing_project: Optional[str] = None, ): bq_schema = None @@ -171,7 +177,11 @@ def load_csv_from_dataframe( def load_chunk(chunk, job_config): client.load_table_from_dataframe( - chunk, destination_table_ref, job_config=job_config, location=location, + chunk, + destination_table_ref, + job_config=job_config, + location=location, + project=billing_project, ).result() return load_csv(dataframe, chunksize, bq_schema, load_chunk) @@ -184,6 +194,7 @@ def load_csv_from_file( location: Optional[str], chunksize: Optional[int], schema: Optional[Dict[str, Any]], + billing_project: Optional[str] = None, ): """Manually encode a DataFrame to CSV and use the buffer in a load job. 
@@ -204,6 +215,7 @@ def load_chunk(chunk, job_config):
                 destination_table_ref,
                 job_config=job_config,
                 location=location,
+                project=billing_project,
             ).result()
         finally:
             chunk_buffer.close()
@@ -219,19 +231,39 @@ def load_chunks(
     schema=None,
     location=None,
     api_method="load_parquet",
+    billing_project: Optional[str] = None,
 ):
     if api_method == "load_parquet":
-        load_parquet(client, dataframe, destination_table_ref, location, schema)
+        load_parquet(
+            client,
+            dataframe,
+            destination_table_ref,
+            location,
+            schema,
+            billing_project=billing_project,
+        )
         # TODO: yield progress depending on result() with timeout
         return [0]
     elif api_method == "load_csv":
         if FEATURES.bigquery_has_from_dataframe_with_csv:
             return load_csv_from_dataframe(
-                client, dataframe, destination_table_ref, location, chunksize, schema
+                client,
+                dataframe,
+                destination_table_ref,
+                location,
+                chunksize,
+                schema,
+                billing_project=billing_project,
             )
         else:
             return load_csv_from_file(
-                client, dataframe, destination_table_ref, location, chunksize, schema
+                client,
+                dataframe,
+                destination_table_ref,
+                location,
+                chunksize,
+                schema,
+                billing_project=billing_project,
             )
     else:
         raise ValueError(
diff --git a/tests/unit/test_to_gbq.py b/tests/unit/test_to_gbq.py
index e488bdb5..d74cba12 100644
--- a/tests/unit/test_to_gbq.py
+++ b/tests/unit/test_to_gbq.py
@@ -112,6 +112,46 @@ def test_to_gbq_with_if_exists_replace(mock_bigquery_client):
     assert mock_bigquery_client.create_table.called
 
 
+def test_to_gbq_with_if_exists_replace_cross_project(
+    mock_bigquery_client, expected_load_method
+):
+    mock_bigquery_client.get_table.side_effect = (
+        # Initial check
+        google.cloud.bigquery.Table("data-project.my_dataset.my_table"),
+        # Recreate check
+        google.api_core.exceptions.NotFound("my_table"),
+    )
+    gbq.to_gbq(
+        DataFrame([[1]]),
+        "data-project.my_dataset.my_table",
+        project_id="billing-project",
+        if_exists="replace",
+    )
+    # TODO: We can avoid these API calls by using write disposition in the load
+    # job. See: https://github.com/googleapis/python-bigquery-pandas/issues/118
+    assert mock_bigquery_client.delete_table.called
+    args, _ = mock_bigquery_client.delete_table.call_args
+    table_delete: google.cloud.bigquery.TableReference = args[0]
+    assert table_delete.project == "data-project"
+    assert table_delete.dataset_id == "my_dataset"
+    assert table_delete.table_id == "my_table"
+    assert mock_bigquery_client.create_table.called
+    args, _ = mock_bigquery_client.create_table.call_args
+    table_create: google.cloud.bigquery.TableReference = args[0]
+    assert table_create.project == "data-project"
+    assert table_create.dataset_id == "my_dataset"
+    assert table_create.table_id == "my_table"
+
+    # Check that billing project and destination table are set correctly.
+ expected_load_method.assert_called_once() + load_args, load_kwargs = expected_load_method.call_args + table_destination = load_args[1] + assert table_destination.project == "data-project" + assert table_destination.dataset_id == "my_dataset" + assert table_destination.table_id == "my_table" + assert load_kwargs["project"] == "billing-project" + + def test_to_gbq_with_if_exists_unknown(): with pytest.raises(ValueError): gbq.to_gbq( From c5b1864c41b8ac982cff2e1bbb31708c7f2ec486 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Wed, 5 Jan 2022 14:40:04 -0600 Subject: [PATCH 2/5] bump coverage --- owlbot.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/owlbot.py b/owlbot.py index 62c9f3c4..6333f7e5 100644 --- a/owlbot.py +++ b/owlbot.py @@ -33,7 +33,7 @@ templated_files = common.py_library( unit_test_python_versions=["3.7", "3.8", "3.9", "3.10"], system_test_python_versions=["3.7", "3.8", "3.9", "3.10"], - cov_level=94, + cov_level=96, unit_test_extras=extras, system_test_extras=extras, intersphinx_dependencies={ From c1561366f984a71de50ca236cce66e5c6bb99e8b Mon Sep 17 00:00:00 2001 From: Owl Bot Date: Wed, 5 Jan 2022 20:42:16 +0000 Subject: [PATCH 3/5] =?UTF-8?q?=F0=9F=A6=89=20Updates=20from=20OwlBot?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --- .coveragerc | 2 +- noxfile.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.coveragerc b/.coveragerc index 0a3b1cea..d6261761 100644 --- a/.coveragerc +++ b/.coveragerc @@ -22,7 +22,7 @@ omit = google/cloud/__init__.py [report] -fail_under = 94 +fail_under = 96 show_missing = True exclude_lines = # Re-enable the standard pragma diff --git a/noxfile.py b/noxfile.py index 5e41983b..042c1303 100644 --- a/noxfile.py +++ b/noxfile.py @@ -259,7 +259,7 @@ def cover(session): test runs (not system test runs), and then erases coverage data. """ session.install("coverage", "pytest-cov") - session.run("coverage", "report", "--show-missing", "--fail-under=94") + session.run("coverage", "report", "--show-missing", "--fail-under=96") session.run("coverage", "erase") From 98832d07021a7ab6163ae7f4bf9b13468f3964cd Mon Sep 17 00:00:00 2001 From: Owl Bot Date: Wed, 5 Jan 2022 20:43:14 +0000 Subject: [PATCH 4/5] =?UTF-8?q?=F0=9F=A6=89=20Updates=20from=20OwlBot?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --- .coveragerc | 2 +- noxfile.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.coveragerc b/.coveragerc index 0a3b1cea..d6261761 100644 --- a/.coveragerc +++ b/.coveragerc @@ -22,7 +22,7 @@ omit = google/cloud/__init__.py [report] -fail_under = 94 +fail_under = 96 show_missing = True exclude_lines = # Re-enable the standard pragma diff --git a/noxfile.py b/noxfile.py index 5e41983b..042c1303 100644 --- a/noxfile.py +++ b/noxfile.py @@ -259,7 +259,7 @@ def cover(session): test runs (not system test runs), and then erases coverage data. 
""" session.install("coverage", "pytest-cov") - session.run("coverage", "report", "--show-missing", "--fail-under=94") + session.run("coverage", "report", "--show-missing", "--fail-under=96") session.run("coverage", "erase") From 402904f854a42c1a3bb052a989567e8ed28b5f8c Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Wed, 5 Jan 2022 16:38:15 -0600 Subject: [PATCH 5/5] improve test coverage --- pandas_gbq/features.py | 22 +--------------------- pandas_gbq/gbq.py | 36 ++++++------------------------------ tests/unit/test_features.py | 20 +++++++++++++------- tests/unit/test_gbq.py | 20 -------------------- tests/unit/test_to_gbq.py | 19 +++++++++++++++++++ 5 files changed, 39 insertions(+), 78 deletions(-) diff --git a/pandas_gbq/features.py b/pandas_gbq/features.py index 77535041..ad20c640 100644 --- a/pandas_gbq/features.py +++ b/pandas_gbq/features.py @@ -5,9 +5,7 @@ """Module for checking dependency versions and supported features.""" # https://github.com/googleapis/python-bigquery/blob/master/CHANGELOG.md -BIGQUERY_MINIMUM_VERSION = "1.11.1" -BIGQUERY_CLIENT_INFO_VERSION = "1.12.0" -BIGQUERY_BQSTORAGE_VERSION = "1.24.0" +BIGQUERY_MINIMUM_VERSION = "1.27.2" BIGQUERY_ACCURATE_TIMESTAMP_VERSION = "2.6.0" BIGQUERY_FROM_DATAFRAME_CSV_VERSION = "2.6.0" BIGQUERY_SUPPORTS_BIGNUMERIC_VERSION = "2.10.0" @@ -52,15 +50,6 @@ def bigquery_has_accurate_timestamp(self): min_version = pkg_resources.parse_version(BIGQUERY_ACCURATE_TIMESTAMP_VERSION) return self.bigquery_installed_version >= min_version - @property - def bigquery_has_client_info(self): - import pkg_resources - - bigquery_client_info_version = pkg_resources.parse_version( - BIGQUERY_CLIENT_INFO_VERSION - ) - return self.bigquery_installed_version >= bigquery_client_info_version - @property def bigquery_has_bignumeric(self): import pkg_resources @@ -68,15 +57,6 @@ def bigquery_has_bignumeric(self): min_version = pkg_resources.parse_version(BIGQUERY_SUPPORTS_BIGNUMERIC_VERSION) return self.bigquery_installed_version >= min_version - @property - def bigquery_has_bqstorage(self): - import pkg_resources - - bigquery_bqstorage_version = pkg_resources.parse_version( - BIGQUERY_BQSTORAGE_VERSION - ) - return self.bigquery_installed_version >= bigquery_bqstorage_version - @property def bigquery_has_from_dataframe_with_csv(self): import pkg_resources diff --git a/pandas_gbq/gbq.py b/pandas_gbq/gbq.py index c20a54c2..b1659257 100644 --- a/pandas_gbq/gbq.py +++ b/pandas_gbq/gbq.py @@ -30,9 +30,7 @@ from pandas_gbq.exceptions import ( AccessDenied, GenericGBQException, - PerformanceWarning, ) -from pandas_gbq import features from pandas_gbq.features import FEATURES import pandas_gbq.schema import pandas_gbq.timestamp @@ -377,18 +375,11 @@ def get_client(self): client_info = google.api_core.client_info.ClientInfo( user_agent="pandas-{}".format(pandas.__version__) ) - - # In addition to new enough version of google-api-core, a new enough - # version of google-cloud-bigquery is required to populate the - # client_info. 
- if FEATURES.bigquery_has_client_info: - return bigquery.Client( - project=self.project_id, - credentials=self.credentials, - client_info=client_info, - ) - - return bigquery.Client(project=self.project_id, credentials=self.credentials) + return bigquery.Client( + project=self.project_id, + credentials=self.credentials, + client_info=client_info, + ) @staticmethod def process_http_error(ex): @@ -529,27 +520,11 @@ def _download_results( if user_dtypes is None: user_dtypes = {} - if self.use_bqstorage_api and not FEATURES.bigquery_has_bqstorage: - warnings.warn( - ( - "use_bqstorage_api was set, but have google-cloud-bigquery " - "version {}. Requires google-cloud-bigquery version " - "{} or later." - ).format( - FEATURES.bigquery_installed_version, - features.BIGQUERY_BQSTORAGE_VERSION, - ), - PerformanceWarning, - stacklevel=4, - ) - create_bqstorage_client = self.use_bqstorage_api if max_results is not None: create_bqstorage_client = False to_dataframe_kwargs = {} - if FEATURES.bigquery_has_bqstorage: - to_dataframe_kwargs["create_bqstorage_client"] = create_bqstorage_client if FEATURES.bigquery_needs_date_as_object: to_dataframe_kwargs["date_as_object"] = True @@ -560,6 +535,7 @@ def _download_results( df = rows_iter.to_dataframe( dtypes=conversion_dtypes, progress_bar_type=progress_bar_type, + create_bqstorage_client=create_bqstorage_client, **to_dataframe_kwargs, ) except self.http_error as ex: diff --git a/tests/unit/test_features.py b/tests/unit/test_features.py index c810104f..bfe2ea9b 100644 --- a/tests/unit/test_features.py +++ b/tests/unit/test_features.py @@ -16,8 +16,8 @@ def fresh_bigquery_version(monkeypatch): @pytest.mark.parametrize( ["bigquery_version", "expected"], [ - ("1.11.1", False), - ("1.26.0", False), + ("1.27.2", False), + ("1.99.100", False), ("2.5.4", False), ("2.6.0", True), ("2.6.1", True), @@ -34,8 +34,8 @@ def test_bigquery_has_accurate_timestamp(monkeypatch, bigquery_version, expected @pytest.mark.parametrize( ["bigquery_version", "expected"], [ - ("1.11.1", False), - ("1.26.0", False), + ("1.27.2", False), + ("1.99.100", False), ("2.9.999", False), ("2.10.0", True), ("2.12.0", True), @@ -52,8 +52,8 @@ def test_bigquery_has_bignumeric(monkeypatch, bigquery_version, expected): @pytest.mark.parametrize( ["bigquery_version", "expected"], [ - ("1.11.1", False), - ("1.26.0", False), + ("1.27.2", False), + ("1.99.100", False), ("2.5.4", False), ("2.6.0", True), ("2.6.1", True), @@ -69,7 +69,13 @@ def test_bigquery_has_from_dataframe_with_csv(monkeypatch, bigquery_version, exp @pytest.mark.parametrize( ["bigquery_version", "expected"], - [("1.26.0", True), ("2.12.0", True), ("3.0.0", False), ("3.1.0", False)], + [ + ("1.27.2", True), + ("1.99.100", True), + ("2.12.0", True), + ("3.0.0", False), + ("3.1.0", False), + ], ) def test_bigquery_needs_date_as_object(monkeypatch, bigquery_version, expected): import google.cloud.bigquery diff --git a/tests/unit/test_gbq.py b/tests/unit/test_gbq.py index 9748595f..a3cd9e0c 100644 --- a/tests/unit/test_gbq.py +++ b/tests/unit/test_gbq.py @@ -109,25 +109,8 @@ def test__is_query(query_or_table, expected): assert result == expected -def test_GbqConnector_get_client_w_old_bq(monkeypatch, mock_bigquery_client): - gbq._test_google_api_imports() - connector = _make_connector() - monkeypatch.setattr( - type(FEATURES), - "bigquery_has_client_info", - mock.PropertyMock(return_value=False), - ) - - connector.get_client() - - # No client_info argument. 
-    mock_bigquery_client.assert_called_with(credentials=mock.ANY, project=mock.ANY)
-
-
 def test_GbqConnector_get_client_w_new_bq(mock_bigquery_client):
     gbq._test_google_api_imports()
-    if not FEATURES.bigquery_has_client_info:
-        pytest.skip("google-cloud-bigquery missing client_info feature")
     pytest.importorskip("google.api_core.client_info")
 
     connector = _make_connector()
@@ -606,9 +589,6 @@ def test_read_gbq_passes_dtypes(mock_bigquery_client, mock_service_account_crede
 def test_read_gbq_use_bqstorage_api(
     mock_bigquery_client, mock_service_account_credentials
 ):
-    if not FEATURES.bigquery_has_bqstorage:  # pragma: NO COVER
-        pytest.skip("requires BigQuery Storage API")
-
     mock_service_account_credentials.project_id = "service_account_project_id"
     df = gbq.read_gbq(
         "SELECT 1 AS int_col",
diff --git a/tests/unit/test_to_gbq.py b/tests/unit/test_to_gbq.py
index d74cba12..a2fa800c 100644
--- a/tests/unit/test_to_gbq.py
+++ b/tests/unit/test_to_gbq.py
@@ -49,6 +49,25 @@ def test_to_gbq_create_dataset_translates_exception(mock_bigquery_client):
         gbq.to_gbq(DataFrame([[1]]), "my_dataset.my_table", project_id="1234")
 
 
+def test_to_gbq_load_method_translates_exception(
+    mock_bigquery_client, expected_load_method
+):
+    mock_bigquery_client.get_table.side_effect = google.api_core.exceptions.NotFound(
+        "my_table"
+    )
+    expected_load_method.side_effect = google.api_core.exceptions.InternalServerError(
+        "error loading data"
+    )
+
+    with pytest.raises(gbq.GenericGBQException):
+        gbq.to_gbq(
+            DataFrame({"int_col": [1, 2, 3]}),
+            "my_dataset.my_table",
+            project_id="myproj",
+        )
+    expected_load_method.assert_called_once()
+
+
 def test_to_gbq_with_if_exists_append(mock_bigquery_client, expected_load_method):
     from google.cloud.bigquery import SchemaField
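
For illustration, a minimal usage sketch of the cross-project behavior patch 1 enables (the project, dataset, and table names are placeholders echoing the unit tests, not a real setup): the destination table lives in the project named in the table string, while `project_id` only selects the project billed for the load job.

    import pandas
    import pandas_gbq

    df = pandas.DataFrame({"my_col": [1, 2, 3]})

    # The table is created in "data-project"; "billing-project" is the
    # project charged for the load job.
    pandas_gbq.to_gbq(
        df,
        "data-project.my_dataset.my_table",
        project_id="billing-project",
        if_exists="replace",
    )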
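
Under the hood, the patch threads `billing_project` through `load_chunks` into each `load_table_from_dataframe` call via that method's `project` argument, while the destination `TableReference` keeps its own project. A sketch of that call pattern against the google-cloud-bigquery client, assuming application-default credentials and the same placeholder names:

    from google.cloud import bigquery
    import pandas

    client = bigquery.Client(project="billing-project")
    destination = bigquery.TableReference.from_string(
        "data-project.my_dataset.my_table"
    )
    df = pandas.DataFrame({"my_col": [1, 2, 3]})

    # project= overrides the client's default project for this job only,
    # so the job bills to "billing-project" while the table is created
    # in "data-project".
    client.load_table_from_dataframe(
        df, destination, project="billing-project"
    ).result()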
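
Patch 5 can drop the `bigquery_has_client_info` and `bigquery_has_bqstorage` gates because `BIGQUERY_MINIMUM_VERSION` rises to 1.27.2, which postdates both features (the deleted constants pinned them at 1.12.0 and 1.24.0). A sketch of the version-gate pattern the remaining `FEATURES` properties still follow; the standalone helper name here is made up for illustration, but the comparison logic mirrors pandas_gbq/features.py:

    import pkg_resources

    BIGQUERY_MINIMUM_VERSION = "1.27.2"

    def bigquery_meets_minimum(installed_version: str) -> bool:
        # parse_version gives PEP 440-aware comparisons, e.g.
        # "2.10.0" > "2.9.999" even though "10" < "9" as strings.
        min_version = pkg_resources.parse_version(BIGQUERY_MINIMUM_VERSION)
        return pkg_resources.parse_version(installed_version) >= min_version

    assert bigquery_meets_minimum("2.6.0")
    assert not bigquery_meets_minimum("1.26.0")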