From 2871690c21262ac57953c2927ac591a32ae8529c Mon Sep 17 00:00:00 2001 From: Paolo Burelli Date: Fri, 25 Aug 2017 11:28:57 +0200 Subject: [PATCH 1/7] ENH: Added support for partitioned table creation and data insertion (#43) --- pandas_gbq/gbq.py | 159 +++++++++++++++++++++++++++---- pandas_gbq/tests/test_gbq.py | 179 +++++++++++++++++++++++++++++++++++ 2 files changed, 317 insertions(+), 21 deletions(-) diff --git a/pandas_gbq/gbq.py b/pandas_gbq/gbq.py index 0c538662..e5cf1606 100644 --- a/pandas_gbq/gbq.py +++ b/pandas_gbq/gbq.py @@ -5,6 +5,7 @@ import uuid import time import sys +from random import randint import numpy as np @@ -674,6 +675,38 @@ def load_data(self, dataframe, dataset_id, table_id, chunksize): self._print("\n") + def resource(self, dataset_id, table_id): + """Retrieve the resource describing a table + + Obtain from BigQuery complete description + for the table defined by the parameters + + Parameters + ---------- + dataset_id : str + Name of the BigQuery dataset for the table + table_id : str + Name of the BigQuery table + + Returns + ------- + object + Table resource + """ + + try: + from googleapiclient.errors import HttpError + except: + from apiclient.errors import HttpError + + try: + return self.service.tables().get( + projectId=self.project_id, + datasetId=dataset_id, + tableId=table_id).execute() + except HttpError as ex: + self.process_http_error(ex) + def schema(self, dataset_id, table_id): """Retrieve the schema of the table @@ -1046,27 +1079,89 @@ def to_gbq(dataframe, destination_table, project_id, chunksize=10000, table_schema = _generate_bq_schema(dataframe) - # If table exists, check if_exists parameter - if table.exists(table_id): - if if_exists == 'fail': - raise TableCreationError("Could not create the table because it " - "already exists. " - "Change the if_exists parameter to " - "append or replace data.") - elif if_exists == 'replace': - connector.delete_and_recreate_table( - dataset_id, table_id, table_schema) - elif if_exists == 'append': - if not connector.schema_is_subset(dataset_id, - table_id, - table_schema): - raise InvalidSchema("Please verify that the structure and " - "data types in the DataFrame match the " - "schema of the destination table.") + if _Table.contains_partition_decorator(table_id): + root_table_id, partition_id = table_id.rsplit('$', 1) + + if not table.exists(root_table_id): + raise NotFoundException("Could not write to the partition because " + "the table does not exist.") + + table_resource = table.resource(dataset_id, root_table_id) + + if 'timePartitioning' not in table_resource: + raise InvalidSchema("Could not write to the partition because " + "the table is not partitioned.") + + partition_exists = read_gbq("SELECT COUNT(*) AS num_rows FROM {0}" + .format(destination_table), + project_id=project_id, + private_key=private_key)['num_rows'][0] > 0 + + if partition_exists: + if if_exists == 'fail': + raise TableCreationError("Could not create the partition because it " + "already exists. 
" + "Change the if_exists parameter to " + "append or replace data.") + elif if_exists == 'append': + if not connector.schema_is_subset(dataset_id, + root_table_id, + table_schema): + raise InvalidSchema("Please verify that the structure and " + "data types in the DataFrame match the " + "schema of the destination table.") + connector.load_data(dataframe, dataset_id, table_id, chunksize) + + elif if_exists == 'replace': + if not connector.schema_is_subset(dataset_id, + root_table_id, + table_schema): + raise InvalidSchema("Please verify that the structure and " + "data types in the DataFrame match the " + "schema of the destination table.") + + temporary_table_id = '_'.join([root_table_id + '_' + partition_id, str(randint(1, 100000))]) + table.create(temporary_table_id, table_schema) + connector.load_data(dataframe, dataset_id, temporary_table_id, chunksize) + sleep(30) # <- Curses Google!!! + connector.run_query('select * from {0}.{1}'.format(dataset_id, temporary_table_id), configuration={ + 'query': { + 'destinationTable': { + 'projectId': project_id, + 'datasetId': dataset_id, + 'tableId': table_id + }, + 'createDisposition': 'CREATE_IF_NEEDED', + 'writeDisposition': 'WRITE_TRUNCATE', + 'allowLargeResults': True + } + }) + table.delete(temporary_table_id) + + else: + connector.load_data(dataframe, dataset_id, table_id, chunksize) + else: - table.create(table_id, table_schema) + if table.exists(table_id): + if if_exists == 'fail': + raise TableCreationError("Could not create the table because it " + "already exists. " + "Change the if_exists parameter to " + "append or replace data.") + elif if_exists == 'replace': + connector.delete_and_recreate_table( + dataset_id, table_id, table_schema) + elif if_exists == 'append': + if not connector.schema_is_subset(dataset_id, + table_id, + table_schema): + raise InvalidSchema("Please verify that the structure and " + "data types in the DataFrame match the " + "schema of the destination table.") + else: + table.create(table_id, table_schema) - connector.load_data(dataframe, dataset_id, table_id, chunksize) + connector.load_data(dataframe, dataset_id, table_id, chunksize) def generate_bq_schema(df, default_type='STRING'): @@ -1132,11 +1227,16 @@ def exists(self, table_id): true if table exists, otherwise false """ + if _Table.contains_partition_decorator(table_id): + root_table_id, partition_id = table_id.rsplit('$', 1) + else: + root_table_id = table_id + try: self.service.tables().get( projectId=self.project_id, datasetId=self.dataset_id, - tableId=table_id).execute() + tableId=root_table_id).execute() return True except self.http_error as ex: if ex.resp.status == 404: @@ -1144,7 +1244,7 @@ def exists(self, table_id): else: self.process_http_error(ex) - def create(self, table_id, schema): + def create(self, table_id, schema, **kwargs): """ Create a table in Google BigQuery given a table and schema Parameters @@ -1154,6 +1254,15 @@ def create(self, table_id, schema): schema : str Use the generate_bq_schema to generate your table schema from a dataframe. 
+ + **kwargs : Arbitrary keyword arguments + body (dict): table creation extra parameters + For example: + + body = {'timePartitioning': {'type': 'DAY'}} + + For more information see `BigQuery SQL Reference + `__ """ if self.exists(table_id): @@ -1174,6 +1283,10 @@ def create(self, table_id, schema): } } + config = kwargs.get('body') + if config is not None: + body.update(config) + try: self.service.tables().insert( projectId=self.project_id, @@ -1204,6 +1317,10 @@ def delete(self, table_id): if ex.resp.status != 404: self.process_http_error(ex) + @staticmethod + def contains_partition_decorator(table_id): + return "$" in table_id + class _Dataset(GbqConnector): diff --git a/pandas_gbq/tests/test_gbq.py b/pandas_gbq/tests/test_gbq.py index c9ac31dd..6c7cfa7c 100644 --- a/pandas_gbq/tests/test_gbq.py +++ b/pandas_gbq/tests/test_gbq.py @@ -1471,3 +1471,182 @@ def test_upload_data(self): project_id=_get_project_id(), private_key=_get_private_key_contents()) assert result['num_rows'][0] == test_size + + +class TestToPartitionGBQIntegrationWithServiceAccountKeyPath(object): + @classmethod + def setup_class(cls): + # - GLOBAL CLASS FIXTURES - + # put here any instruction you want to execute only *ONCE* *BEFORE* + # executing *ALL* tests described below. + + _skip_if_no_project_id() + _skip_if_no_private_key_path() + + _setup_common() + + def setup_method(self, method): + # - PER-TEST FIXTURES - + # put here any instruction you want to be run *BEFORE* *EVERY* test is + # executed. + + self.dataset_prefix = _get_dataset_prefix_random() + clean_gbq_environment(self.dataset_prefix, _get_private_key_path()) + self.dataset = gbq._Dataset(_get_project_id(), + private_key=_get_private_key_path()) + self.table = gbq._Table(_get_project_id(), self.dataset_prefix + "1", + private_key=_get_private_key_path()) + self.sut = gbq.GbqConnector(_get_project_id(), + private_key=_get_private_key_path()) + self.destination_table = "{0}{1}.{2}".format(self.dataset_prefix, "1", + TABLE_ID) + self.dataset.create(self.dataset_prefix + "1") + + @classmethod + def teardown_class(cls): + # - GLOBAL CLASS FIXTURES - + # put here any instruction you want to execute only *ONCE* *AFTER* + # executing all tests. + pass + + def teardown_method(self, method): + # - PER-TEST FIXTURES - + # put here any instructions you want to be run *AFTER* *EVERY* test is + # executed. + clean_gbq_environment(self.dataset_prefix, _get_private_key_path()) + + def test_create_partitioned_table(self): + test_id = "1" + test_schema = {'fields': [{'name': 'A', 'type': 'FLOAT'}, + {'name': 'B', 'type': 'FLOAT'}, + {'name': 'C', 'type': 'STRING'}, + {'name': 'D', 'type': 'TIMESTAMP'}]} + self.table.create(TABLE_ID + test_id, test_schema, body={'timePartitioning': {'type': 'DAY'}}) + actual = self.sut.resource(self.dataset_prefix + "1", TABLE_ID + test_id) + assert actual['timePartitioning']['type'] == 'DAY' + + def test_upload_data_to_partition(self): + test_id = "2" + test_size = 20001 + df = make_mixed_dataframe_v2(test_size) + + self.table.create(TABLE_ID + test_id, gbq._generate_bq_schema(df), + body={'timePartitioning': {'type': 'DAY'}}) + + partition_name = self.destination_table + test_id + '$' + datetime.today().strftime("%Y%m%d") + + gbq.to_gbq(df, partition_name, _get_project_id(), + chunksize=10000, private_key=_get_private_key_path()) + + sleep(30) # <- Curses Google!!! 
+ + result = gbq.read_gbq("SELECT COUNT(*) AS num_rows FROM {0}" + .format(partition_name), + project_id=_get_project_id(), + private_key=_get_private_key_path()) + assert result['num_rows'][0] == test_size + + def test_upload_data_to_partition_non_existing_table(self): + test_id = "2" + test_size = 20001 + df = make_mixed_dataframe_v2(test_size) + + partition_name = self.destination_table + test_id + '$' + datetime.today().strftime("%Y%m%d") + + with pytest.raises(gbq.NotFoundException): + gbq.to_gbq(df, partition_name, _get_project_id(), + chunksize=10000, private_key=_get_private_key_path()) + + def test_upload_data_to_partition_non_partitioned_table(self): + test_id = "2" + test_size = 20001 + df = make_mixed_dataframe_v2(test_size) + + self.table.create(TABLE_ID + test_id, gbq._generate_bq_schema(df)) + + partition_name = self.destination_table + test_id + '$' + datetime.today().strftime("%Y%m%d") + + with pytest.raises(gbq.InvalidSchema): + gbq.to_gbq(df, partition_name, _get_project_id(), + chunksize=10000, private_key=_get_private_key_path()) + + def test_upload_data_if_partition_exists_fail(self): + test_id = "3" + test_size = 10 + df = make_mixed_dataframe_v2(test_size) + self.table.create(TABLE_ID + test_id, gbq._generate_bq_schema(df), body={'timePartitioning': {'type': 'DAY'}}) + + partition_name = self.destination_table + test_id + '$' + datetime.today().strftime("%Y%m%d") + + gbq.to_gbq(df, partition_name, _get_project_id(), + chunksize=10000, private_key=_get_private_key_path()) + + # Test the default value of if_exists is 'fail' + with pytest.raises(gbq.TableCreationError): + gbq.to_gbq(df, partition_name, _get_project_id(), + private_key=_get_private_key_path()) + + # Test the if_exists parameter with value 'fail' + with pytest.raises(gbq.TableCreationError): + gbq.to_gbq(df, partition_name, _get_project_id(), + if_exists='fail', private_key=_get_private_key_path()) + + def test_upload_data_if_partition_exists_append(self): + test_id = "3" + test_size = 10 + df = make_mixed_dataframe_v2(test_size) + df_different_schema = tm.makeMixedDataFrame() + self.table.create(TABLE_ID + test_id, gbq._generate_bq_schema(df), body={'timePartitioning': {'type': 'DAY'}}) + + partition_name = self.destination_table + test_id + '$' + datetime.today().strftime("%Y%m%d") + + gbq.to_gbq(df, partition_name, _get_project_id(), + chunksize=10000, private_key=_get_private_key_path()) + + # Test the if_exists parameter with value 'append' + gbq.to_gbq(df, partition_name, _get_project_id(), + if_exists='append', private_key=_get_private_key_path()) + + sleep(30) # <- Curses Google!!! 
+ + result = gbq.read_gbq("SELECT COUNT(*) AS num_rows FROM {0}" + .format(partition_name), + project_id=_get_project_id(), + private_key=_get_private_key_path()) + assert result['num_rows'][0] == test_size * 2 + + # Try inserting with a different schema, confirm failure + with pytest.raises(gbq.InvalidSchema): + gbq.to_gbq(df_different_schema, partition_name, + _get_project_id(), if_exists='append', + private_key=_get_private_key_path()) + + def test_upload_data_if_partition_exists_replace(self): + test_id = "4" + test_size = 10 + expected_size = 25 + df = make_mixed_dataframe_v2(test_size) + df2 = make_mixed_dataframe_v2(expected_size) + df_different_schema = tm.makeMixedDataFrame() + self.table.create(TABLE_ID + test_id, gbq._generate_bq_schema(df), body={'timePartitioning': {'type': 'DAY'}}) + + partition_name = self.destination_table + test_id + '$' + datetime.today().strftime("%Y%m%d") + + gbq.to_gbq(df, partition_name, _get_project_id(), + chunksize=10000, private_key=_get_private_key_path()) + + # Test the if_exists parameter with value 'replace' + gbq.to_gbq(df2, partition_name, _get_project_id(), + if_exists='replace', private_key=_get_private_key_path()) + + result = gbq.read_gbq("SELECT COUNT(*) AS num_rows FROM {0}" + .format(partition_name), + project_id=_get_project_id(), + private_key=_get_private_key_path()) + assert result['num_rows'][0] == expected_size + + # Try inserting with a different schema, confirm failure + with pytest.raises(gbq.InvalidSchema): + gbq.to_gbq(df_different_schema, partition_name, + _get_project_id(), if_exists='replace', + private_key=_get_private_key_path()) From dec7b4938c2ca6ede02e673fd2a98c775aa30f92 Mon Sep 17 00:00:00 2001 From: Paolo Burelli Date: Fri, 25 Aug 2017 11:36:10 +0200 Subject: [PATCH 2/7] ENH: Added copy table function --- pandas_gbq/gbq.py | 95 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 95 insertions(+) diff --git a/pandas_gbq/gbq.py b/pandas_gbq/gbq.py index e5cf1606..322a04fb 100644 --- a/pandas_gbq/gbq.py +++ b/pandas_gbq/gbq.py @@ -486,6 +486,101 @@ def process_insert_errors(self, insert_errors): raise StreamingInsertError + def copy(self, source_dataset_id, source_table_id, destination_dataset_id, destination_table_id, **kwargs): + """ Create a table in Google BigQuery given a table and schema + + Parameters + ---------- + source_table_id : str + Name of the source dataset + source_table_id: str + Name of the source table + destination_table_id : str + Name of the destination dataset + destination_table_id: str + Name of the destination table + **kwargs : Arbitrary keyword arguments + configuration (dict): table creation extra parameters + For example: + + configuration = {'copy': {'writeDisposition': 'WRITE_TRUNCATE'}} + + For more information see `BigQuery SQL Reference + `__ + """ + try: + from googleapiclient.errors import HttpError + except: + from apiclient.errors import HttpError + from google.auth.exceptions import RefreshError + + job_collection = self.service.jobs() + + job_config = { + 'copy': { + 'destinationTable': { + 'projectId': self.project_id, + 'datasetId': destination_dataset_id, + 'tableId': destination_table_id + }, + 'sourceTable': { + 'projectId': self.project_id, + 'datasetId': source_dataset_id, + 'tableId': source_table_id + } + } + } + config = kwargs.get('configuration') + if config is not None: + if len(config) != 1: + raise ValueError("Only one job type must be specified, but " + "given {}".format(','.join(config.keys()))) + if 'copy' in config: + if 'destinationTable' in 
config['copy'] or 'sourceTable' in config['copy']: + raise ValueError("source and destination table must be specified as parameters") + + job_config['copy'].update(config['copy']) + else: + raise ValueError("Only 'copy' job type is supported") + + job_data = { + 'configuration': job_config + } + + self._start_timer() + try: + self._print('Requesting copy... ', end="") + job_reply = job_collection.insert( + projectId=self.project_id, body=job_data).execute() + self._print('ok.') + except (RefreshError, ValueError): + if self.private_key: + raise AccessDenied( + "The service account credentials are not valid") + else: + raise AccessDenied( + "The credentials have been revoked or expired, " + "please re-run the application to re-authorize") + except HttpError as ex: + self.process_http_error(ex) + + job_reference = job_reply['jobReference'] + job_id = job_reference['jobId'] + self._print('Job ID: %s\nCopy running...' % job_id) + + while job_reply['status']['state'] == 'RUNNING': + self.print_elapsed_seconds(' Elapsed', 's. Waiting...') + + try: + job_reply = job_collection.get( + projectId=job_reference['projectId'], + jobId=job_id).execute() + except HttpError as ex: + self.process_http_error(ex) + + if self.verbose: + self._print('Copy completed.') + def run_query(self, query, **kwargs): try: from googleapiclient.errors import HttpError From de8a28e4960ed523f3a3e3d9ab0467cd7887b89d Mon Sep 17 00:00:00 2001 From: Paolo Burelli Date: Fri, 25 Aug 2017 11:55:43 +0200 Subject: [PATCH 3/7] DOC: Updated change log information (#43) --- docs/source/changelog.rst | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/docs/source/changelog.rst b/docs/source/changelog.rst index 0f1d6e1b..799fd0ce 100644 --- a/docs/source/changelog.rst +++ b/docs/source/changelog.rst @@ -4,6 +4,11 @@ Changelog 0.2.1 / 2017-??-?? ------------------ +- Added support for replacing date partitioned tables (:issue:`43`), :func:`to_tbq` can be used regularly with a partition decorator +- :func:`GbqConnector.copy` allows to create a copy job +- :func:`_Table.contains_partition_decorator` returns whether the table_id contains a $ character +- :func:`_Table.exists` correctly ignores the partition decorator in case it is provided +- :func:`_Table.create` now supports the possibility to pass extra table resource details, allowing partitioned table creationsphinx-quickstart - :func:`read_gbq` now raises ``QueryTimeout`` if the request exceeds the ``query.timeoutMs`` value specified in the BigQuery configuration. (:issue:`76`) 0.2.0 / 2017-07-24 From e276da7f9ea4ecbb1997e03f2216357472f48e60 Mon Sep 17 00:00:00 2001 From: Paolo Burelli Date: Fri, 25 Aug 2017 12:48:22 +0200 Subject: [PATCH 4/7] CLN: Corrected linting --- pandas_gbq/gbq.py | 93 ++++++++++++++++++++---------------- pandas_gbq/tests/test_gbq.py | 33 ++++++++----- 2 files changed, 73 insertions(+), 53 deletions(-) diff --git a/pandas_gbq/gbq.py b/pandas_gbq/gbq.py index 322a04fb..9abac5b2 100644 --- a/pandas_gbq/gbq.py +++ b/pandas_gbq/gbq.py @@ -15,7 +15,6 @@ def _check_google_client_version(): - try: import pkg_resources @@ -39,7 +38,6 @@ def _check_google_client_version(): def _test_google_api_imports(): - try: import httplib2 # noqa except ImportError as ex: @@ -213,7 +211,7 @@ def __init__(self, project_id, reauth=False, verbose=False, # BQ Queries costs $5 per TB. First 1 TB per month is free # see here for more: https://cloud.google.com/bigquery/pricing - self.query_price_for_TB = 5. / 2**40 # USD/TB + self.query_price_for_TB = 5. 
/ 2 ** 40 # USD/TB def get_credentials(self): if self.private_key: @@ -486,7 +484,8 @@ def process_insert_errors(self, insert_errors): raise StreamingInsertError - def copy(self, source_dataset_id, source_table_id, destination_dataset_id, destination_table_id, **kwargs): + def copy(self, source_dataset_id, source_table_id, destination_dataset_id, + destination_table_id, **kwargs): """ Create a table in Google BigQuery given a table and schema Parameters @@ -495,15 +494,17 @@ def copy(self, source_dataset_id, source_table_id, destination_dataset_id, desti Name of the source dataset source_table_id: str Name of the source table - destination_table_id : str + destination_dataset_id : str Name of the destination dataset destination_table_id: str - Name of the destination table + Name of the destination table **kwargs : Arbitrary keyword arguments configuration (dict): table creation extra parameters For example: - configuration = {'copy': {'writeDisposition': 'WRITE_TRUNCATE'}} + configuration = {'copy': + {'writeDisposition': 'WRITE_TRUNCATE'} + } For more information see `BigQuery SQL Reference `__ @@ -536,8 +537,10 @@ def copy(self, source_dataset_id, source_table_id, destination_dataset_id, desti raise ValueError("Only one job type must be specified, but " "given {}".format(','.join(config.keys()))) if 'copy' in config: - if 'destinationTable' in config['copy'] or 'sourceTable' in config['copy']: - raise ValueError("source and destination table must be specified as parameters") + if 'destinationTable' in config['copy'] or 'sourceTable' in \ + config['copy']: + raise ValueError("source and destination table must " + "be specified as parameters") job_config['copy'].update(config['copy']) else: @@ -1194,8 +1197,8 @@ def to_gbq(dataframe, destination_table, project_id, chunksize=10000, if partition_exists: if if_exists == 'fail': - raise TableCreationError("Could not create the partition because it " - "already exists. " + raise TableCreationError("Could not create the partition " + "because it already exists. " "Change the if_exists parameter to " "append or replace data.") elif if_exists == 'append': @@ -1203,8 +1206,8 @@ def to_gbq(dataframe, destination_table, project_id, chunksize=10000, root_table_id, table_schema): raise InvalidSchema("Please verify that the structure and " - "data types in the DataFrame match the " - "schema of the destination table.") + "data types in the DataFrame match " + "the schema of the destination table.") connector.load_data(dataframe, dataset_id, table_id, chunksize) elif if_exists == 'replace': @@ -1212,25 +1215,32 @@ def to_gbq(dataframe, destination_table, project_id, chunksize=10000, root_table_id, table_schema): raise InvalidSchema("Please verify that the structure and " - "data types in the DataFrame match the " - "schema of the destination table.") + "data types in the DataFrame match " + "the schema of the destination table.") - temporary_table_id = '_'.join([root_table_id + '_' + partition_id, str(randint(1, 100000))]) + temporary_table_id = '_'.join( + [root_table_id + '_' + partition_id, + str(randint(1, 100000))]) table.create(temporary_table_id, table_schema) - connector.load_data(dataframe, dataset_id, temporary_table_id, chunksize) + connector.load_data(dataframe, dataset_id, temporary_table_id, + chunksize) sleep(30) # <- Curses Google!!! 
- connector.run_query('select * from {0}.{1}'.format(dataset_id, temporary_table_id), configuration={ - 'query': { - 'destinationTable': { - 'projectId': project_id, - 'datasetId': dataset_id, - 'tableId': table_id - }, - 'createDisposition': 'CREATE_IF_NEEDED', - 'writeDisposition': 'WRITE_TRUNCATE', - 'allowLargeResults': True - } - }) + connector.run_query('select * from {0}.{1}' + .format(dataset_id, temporary_table_id), + configuration={ + 'query': { + 'destinationTable': { + 'projectId': project_id, + 'datasetId': dataset_id, + 'tableId': table_id + }, + 'createDisposition': + 'CREATE_IF_NEEDED', + 'writeDisposition': + 'WRITE_TRUNCATE', + 'allowLargeResults': True + } + }) table.delete(temporary_table_id) else: @@ -1239,10 +1249,11 @@ def to_gbq(dataframe, destination_table, project_id, chunksize=10000, else: if table.exists(table_id): if if_exists == 'fail': - raise TableCreationError("Could not create the table because it " - "already exists. " - "Change the if_exists parameter to " - "append or replace data.") + raise TableCreationError( + "Could not create the table because it " + "already exists. " + "Change the if_exists parameter to " + "append or replace data.") elif if_exists == 'replace': connector.delete_and_recreate_table( dataset_id, table_id, table_schema) @@ -1251,8 +1262,8 @@ def to_gbq(dataframe, destination_table, project_id, chunksize=10000, table_id, table_schema): raise InvalidSchema("Please verify that the structure and " - "data types in the DataFrame match the " - "schema of the destination table.") + "data types in the DataFrame match " + "the schema of the destination table.") else: table.create(table_id, table_schema) @@ -1297,7 +1308,6 @@ def _generate_bq_schema(df, default_type='STRING'): class _Table(GbqConnector): - def __init__(self, project_id, dataset_id, reauth=False, verbose=False, private_key=None): try: @@ -1347,15 +1357,15 @@ def create(self, table_id, schema, **kwargs): table : str Name of table to be written schema : str - Use the generate_bq_schema to generate your table schema from a - dataframe. - + Use the generate_bq_schema to generate + your table schema from a dataframe. 
+ **kwargs : Arbitrary keyword arguments body (dict): table creation extra parameters For example: - + body = {'timePartitioning': {'type': 'DAY'}} - + For more information see `BigQuery SQL Reference `__ """ @@ -1418,7 +1428,6 @@ def contains_partition_decorator(table_id): class _Dataset(GbqConnector): - def __init__(self, project_id, reauth=False, verbose=False, private_key=None): try: diff --git a/pandas_gbq/tests/test_gbq.py b/pandas_gbq/tests/test_gbq.py index 6c7cfa7c..c1f639b9 100644 --- a/pandas_gbq/tests/test_gbq.py +++ b/pandas_gbq/tests/test_gbq.py @@ -1521,8 +1521,10 @@ def test_create_partitioned_table(self): {'name': 'B', 'type': 'FLOAT'}, {'name': 'C', 'type': 'STRING'}, {'name': 'D', 'type': 'TIMESTAMP'}]} - self.table.create(TABLE_ID + test_id, test_schema, body={'timePartitioning': {'type': 'DAY'}}) - actual = self.sut.resource(self.dataset_prefix + "1", TABLE_ID + test_id) + self.table.create(TABLE_ID + test_id, test_schema, + body={'timePartitioning': {'type': 'DAY'}}) + actual = self.sut.resource(self.dataset_prefix + + "1", TABLE_ID + test_id) assert actual['timePartitioning']['type'] == 'DAY' def test_upload_data_to_partition(self): @@ -1533,7 +1535,8 @@ def test_upload_data_to_partition(self): self.table.create(TABLE_ID + test_id, gbq._generate_bq_schema(df), body={'timePartitioning': {'type': 'DAY'}}) - partition_name = self.destination_table + test_id + '$' + datetime.today().strftime("%Y%m%d") + partition_name = self.destination_table + test_id + \ + '$' + datetime.today().strftime("%Y%m%d") gbq.to_gbq(df, partition_name, _get_project_id(), chunksize=10000, private_key=_get_private_key_path()) @@ -1551,7 +1554,8 @@ def test_upload_data_to_partition_non_existing_table(self): test_size = 20001 df = make_mixed_dataframe_v2(test_size) - partition_name = self.destination_table + test_id + '$' + datetime.today().strftime("%Y%m%d") + partition_name = self.destination_table + test_id + \ + '$' + datetime.today().strftime("%Y%m%d") with pytest.raises(gbq.NotFoundException): gbq.to_gbq(df, partition_name, _get_project_id(), @@ -1564,7 +1568,8 @@ def test_upload_data_to_partition_non_partitioned_table(self): self.table.create(TABLE_ID + test_id, gbq._generate_bq_schema(df)) - partition_name = self.destination_table + test_id + '$' + datetime.today().strftime("%Y%m%d") + partition_name = self.destination_table + test_id + \ + '$' + datetime.today().strftime("%Y%m%d") with pytest.raises(gbq.InvalidSchema): gbq.to_gbq(df, partition_name, _get_project_id(), @@ -1574,9 +1579,11 @@ def test_upload_data_if_partition_exists_fail(self): test_id = "3" test_size = 10 df = make_mixed_dataframe_v2(test_size) - self.table.create(TABLE_ID + test_id, gbq._generate_bq_schema(df), body={'timePartitioning': {'type': 'DAY'}}) + self.table.create(TABLE_ID + test_id, gbq._generate_bq_schema(df), + body={'timePartitioning': {'type': 'DAY'}}) - partition_name = self.destination_table + test_id + '$' + datetime.today().strftime("%Y%m%d") + partition_name = self.destination_table + test_id + \ + '$' + datetime.today().strftime("%Y%m%d") gbq.to_gbq(df, partition_name, _get_project_id(), chunksize=10000, private_key=_get_private_key_path()) @@ -1596,9 +1603,11 @@ def test_upload_data_if_partition_exists_append(self): test_size = 10 df = make_mixed_dataframe_v2(test_size) df_different_schema = tm.makeMixedDataFrame() - self.table.create(TABLE_ID + test_id, gbq._generate_bq_schema(df), body={'timePartitioning': {'type': 'DAY'}}) + self.table.create(TABLE_ID + test_id, gbq._generate_bq_schema(df), + 
body={'timePartitioning': {'type': 'DAY'}}) - partition_name = self.destination_table + test_id + '$' + datetime.today().strftime("%Y%m%d") + partition_name = self.destination_table + test_id + \ + '$' + datetime.today().strftime("%Y%m%d") gbq.to_gbq(df, partition_name, _get_project_id(), chunksize=10000, private_key=_get_private_key_path()) @@ -1628,9 +1637,11 @@ def test_upload_data_if_partition_exists_replace(self): df = make_mixed_dataframe_v2(test_size) df2 = make_mixed_dataframe_v2(expected_size) df_different_schema = tm.makeMixedDataFrame() - self.table.create(TABLE_ID + test_id, gbq._generate_bq_schema(df), body={'timePartitioning': {'type': 'DAY'}}) + self.table.create(TABLE_ID + test_id, gbq._generate_bq_schema(df), + body={'timePartitioning': {'type': 'DAY'}}) - partition_name = self.destination_table + test_id + '$' + datetime.today().strftime("%Y%m%d") + partition_name = self.destination_table + test_id + \ + '$' + datetime.today().strftime("%Y%m%d") gbq.to_gbq(df, partition_name, _get_project_id(), chunksize=10000, private_key=_get_private_key_path()) From 2d217a258d008c28651b529deb14fcf3e0e2ef44 Mon Sep 17 00:00:00 2001 From: Paolo Burelli Date: Tue, 29 Aug 2017 14:15:32 +0200 Subject: [PATCH 5/7] ENH: _Dataset.delete now supports the delete_contents parameter --- docs/source/changelog.rst | 1 + pandas_gbq/gbq.py | 11 ++++++++--- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/docs/source/changelog.rst b/docs/source/changelog.rst index 799fd0ce..26b54d23 100644 --- a/docs/source/changelog.rst +++ b/docs/source/changelog.rst @@ -10,6 +10,7 @@ Changelog - :func:`_Table.exists` correctly ignores the partition decorator in case it is provided - :func:`_Table.create` now supports the possibility to pass extra table resource details, allowing partitioned table creationsphinx-quickstart - :func:`read_gbq` now raises ``QueryTimeout`` if the request exceeds the ``query.timeoutMs`` value specified in the BigQuery configuration. (:issue:`76`) +- :func:`_Dataset.delete` now supports the delete_contents parameter 0.2.0 / 2017-07-24 ------------------ diff --git a/pandas_gbq/gbq.py b/pandas_gbq/gbq.py index 9abac5b2..eea5ee65 100644 --- a/pandas_gbq/gbq.py +++ b/pandas_gbq/gbq.py @@ -1066,7 +1066,7 @@ def read_gbq(query, project_id=None, index_col=None, col_order=None, else: raise InvalidIndexColumn( 'Index column "{0}" does not exist in DataFrame.' - .format(index_col) + .format(index_col) ) # Change the order of columns in the DataFrame based on provided list @@ -1533,13 +1533,17 @@ def create(self, dataset_id): except self.http_error as ex: self.process_http_error(ex) - def delete(self, dataset_id): + def delete(self, dataset_id, delete_contents=False): """ Delete a dataset in Google BigQuery Parameters ---------- dataset : str Name of dataset to be deleted + delete_contents : boolean + If True, delete all the tables in the dataset. + If False and the dataset contains tables, + the request will fail. 
Default is False """ if not self.exists(dataset_id): @@ -1549,7 +1553,8 @@ def delete(self, dataset_id): try: self.service.datasets().delete( datasetId=dataset_id, - projectId=self.project_id).execute() + projectId=self.project_id, + deleteContents=delete_contents).execute() except self.http_error as ex: # Ignore 404 error which may occur if dataset already deleted From d7f743d98593c072ded1914c1b3319fb1b3cd059 Mon Sep 17 00:00:00 2001 From: Paolo Burelli Date: Thu, 31 Aug 2017 21:21:14 +0200 Subject: [PATCH 6/7] BUG: made schema comparison case insensitive --- pandas_gbq/gbq.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/pandas_gbq/gbq.py b/pandas_gbq/gbq.py index eea5ee65..15a787f5 100644 --- a/pandas_gbq/gbq.py +++ b/pandas_gbq/gbq.py @@ -866,10 +866,12 @@ def verify_schema(self, dataset_id, table_id, schema): Whether the schemas match """ - fields_remote = sorted(self.schema(dataset_id, table_id), - key=lambda x: x['name']) + fields_remote = sorted(self.schema(dataset_id, table_id), key=lambda x: x['name']) fields_local = sorted(schema['fields'], key=lambda x: x['name']) + fields_remote = [{'name': f['name'].lower(), 'type': f['type']} for f in fields_remote] + fields_local = [{'name': f['name'].lower(), 'type': f['type']} for f in fields_local] + return fields_remote == fields_local def schema_is_subset(self, dataset_id, table_id, schema): @@ -898,6 +900,9 @@ def schema_is_subset(self, dataset_id, table_id, schema): fields_remote = self.schema(dataset_id, table_id) fields_local = schema['fields'] + fields_remote = [{'name': f['name'].lower(), 'type': f['type']} for f in fields_remote] + fields_local = [{'name': f['name'].lower(), 'type': f['type']} for f in fields_local] + return all(field in fields_remote for field in fields_local) def delete_and_recreate_table(self, dataset_id, table_id, table_schema): From ded898b3c82a8e3cc8507e27f2a4de65d81b2469 Mon Sep 17 00:00:00 2001 From: Paolo Burelli Date: Thu, 31 Aug 2017 21:51:53 +0200 Subject: [PATCH 7/7] BUG: corrected fields order and lines length --- pandas_gbq/gbq.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/pandas_gbq/gbq.py b/pandas_gbq/gbq.py index 15a787f5..97f0420c 100644 --- a/pandas_gbq/gbq.py +++ b/pandas_gbq/gbq.py @@ -866,11 +866,12 @@ def verify_schema(self, dataset_id, table_id, schema): Whether the schemas match """ - fields_remote = sorted(self.schema(dataset_id, table_id), key=lambda x: x['name']) - fields_local = sorted(schema['fields'], key=lambda x: x['name']) - - fields_remote = [{'name': f['name'].lower(), 'type': f['type']} for f in fields_remote] - fields_local = [{'name': f['name'].lower(), 'type': f['type']} for f in fields_local] + fields_remote = sorted([{'name': f['name'].lower(), 'type': f['type']} + for f in self.schema(dataset_id, table_id)], + key=lambda x: x['name']) + fields_local = sorted([{'name': f['name'].lower(), 'type': f['type']} + for f in schema['fields']], + key=lambda x: x['name']) return fields_remote == fields_local @@ -897,11 +898,10 @@ def schema_is_subset(self, dataset_id, table_id, schema): Whether the passed schema is a subset """ - fields_remote = self.schema(dataset_id, table_id) - fields_local = schema['fields'] - - fields_remote = [{'name': f['name'].lower(), 'type': f['type']} for f in fields_remote] - fields_local = [{'name': f['name'].lower(), 'type': f['type']} for f in fields_local] + fields_remote = [{'name': f['name'].lower(), 'type': f['type']} + for f in self.schema(dataset_id, table_id)] + 
+        fields_local = [{'name': f['name'].lower(), 'type': f['type']}
+                        for f in schema['fields']]
 
         return all(field in fields_remote for field in fields_local)
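Usage sketch for reviewers: the partition workflow added in PATCH 1/7 (and reflowed in PATCH 4/7), shown end to end. This is a minimal illustration, not part of the patch itself; the project id 'my-project', the dataset/table names and the key path are hypothetical placeholders, while _Table.create(body=...), _generate_bq_schema and the '$YYYYMMDD' partition decorator accepted by to_gbq are the interfaces introduced by this series.

    from datetime import datetime

    import pandas as pd
    from pandas_gbq import gbq

    df = pd.DataFrame({'A': [1.0, 2.0], 'B': ['x', 'y']})

    # Create a day-partitioned table by passing extra table resource
    # details through the new body keyword argument.
    table = gbq._Table('my-project', 'my_dataset',          # hypothetical names
                       private_key='path/to/key.json')
    table.create('my_table', gbq._generate_bq_schema(df),
                 body={'timePartitioning': {'type': 'DAY'}})

    # Write the DataFrame into today's partition via the '$' decorator;
    # if_exists now governs the partition rather than the whole table.
    partition = 'my_dataset.my_table$' + datetime.today().strftime('%Y%m%d')
    gbq.to_gbq(df, partition, 'my-project', if_exists='append',
               private_key='path/to/key.json')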
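A similar sketch for the copy job added in PATCH 2/7. Dataset and table names are again placeholders; per the docstring, source and destination tables must be given as parameters, and the optional configuration dict may only carry a 'copy' section without sourceTable/destinationTable keys.

    from pandas_gbq import gbq

    connector = gbq.GbqConnector('my-project', private_key='path/to/key.json')

    # Copy my_dataset.my_table onto my_backup.my_table_copy, truncating any
    # rows already present in the destination table.
    connector.copy('my_dataset', 'my_table', 'my_backup', 'my_table_copy',
                   configuration={'copy': {'writeDisposition': 'WRITE_TRUNCATE'}})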
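Finally, a sketch for the _Dataset.delete change in PATCH 5/7: with delete_contents=True the dataset is dropped even if it still holds tables, while the default of False keeps the previous behaviour of failing on a non-empty dataset. The dataset name is a placeholder.

    from pandas_gbq import gbq

    dataset = gbq._Dataset('my-project', private_key='path/to/key.json')
    dataset.delete('my_dataset', delete_contents=True)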