diff --git a/docs/source/changelog.rst b/docs/source/changelog.rst
index 0f1d6e1b..26b54d23 100644
--- a/docs/source/changelog.rst
+++ b/docs/source/changelog.rst
@@ -4,7 +4,13 @@ Changelog
 0.2.1 / 2017-??-??
 ------------------
 
+- Added support for appending to and replacing date-partitioned tables (:issue:`43`); :func:`to_gbq` can now be used with a partition decorator in the destination table name
+- :func:`GbqConnector.copy` creates a table copy job
+- :func:`_Table.contains_partition_decorator` returns whether ``table_id`` contains a partition decorator (a ``$`` character)
+- :func:`_Table.exists` now ignores the partition decorator if one is provided
+- :func:`_Table.create` now accepts extra table resource properties, allowing partitioned tables to be created
 - :func:`read_gbq` now raises ``QueryTimeout`` if the request exceeds the ``query.timeoutMs`` value specified in the BigQuery configuration. (:issue:`76`)
+- :func:`_Dataset.delete` now supports the ``delete_contents`` parameter
 
 0.2.0 / 2017-07-24
 ------------------
diff --git a/pandas_gbq/gbq.py b/pandas_gbq/gbq.py
index 0c538662..97f0420c 100644
--- a/pandas_gbq/gbq.py
+++ b/pandas_gbq/gbq.py
@@ -5,6 +5,7 @@
 import uuid
 import time
 import sys
+from random import randint
 
 import numpy as np
 
@@ -14,7 +15,6 @@
 
 
 def _check_google_client_version():
-
     try:
         import pkg_resources
 
@@ -38,7 +38,6 @@ def _check_google_client_version():
 
 
 def _test_google_api_imports():
-
     try:
         import httplib2  # noqa
     except ImportError as ex:
@@ -212,7 +211,7 @@ def __init__(self, project_id, reauth=False, verbose=False,
 
         # BQ Queries costs $5 per TB. First 1 TB per month is free
         # see here for more: https://cloud.google.com/bigquery/pricing
-        self.query_price_for_TB = 5. / 2**40  # USD/TB
+        self.query_price_for_TB = 5. / 2 ** 40  # USD/TB
 
     def get_credentials(self):
         if self.private_key:
@@ -485,6 +484,106 @@ def process_insert_errors(self, insert_errors):
 
             raise StreamingInsertError
 
+    def copy(self, source_dataset_id, source_table_id, destination_dataset_id,
+             destination_table_id, **kwargs):
+        """ Copy a table in Google BigQuery
+
+        Parameters
+        ----------
+        source_dataset_id : str
+            Name of the source dataset
+        source_table_id : str
+            Name of the source table
+        destination_dataset_id : str
+            Name of the destination dataset
+        destination_table_id : str
+            Name of the destination table
+        **kwargs : Arbitrary keyword arguments
+            configuration (dict): copy job extra parameters
+            For example:
+
+                configuration = {'copy':
+                                 {'writeDisposition': 'WRITE_TRUNCATE'}
+                                }
+
+            For more information see `BigQuery SQL Reference
+            `__
+        """
+        try:
+            from googleapiclient.errors import HttpError
+        except:
+            from apiclient.errors import HttpError
+        from google.auth.exceptions import RefreshError
+
+        job_collection = self.service.jobs()
+
+        job_config = {
+            'copy': {
+                'destinationTable': {
+                    'projectId': self.project_id,
+                    'datasetId': destination_dataset_id,
+                    'tableId': destination_table_id
+                },
+                'sourceTable': {
+                    'projectId': self.project_id,
+                    'datasetId': source_dataset_id,
+                    'tableId': source_table_id
+                }
+            }
+        }
+        config = kwargs.get('configuration')
+        if config is not None:
+            if len(config) != 1:
+                raise ValueError("Only one job type must be specified, but "
+                                 "got {}".format(','.join(config.keys())))
+            if 'copy' in config:
+                if 'destinationTable' in config['copy'] or 'sourceTable' in \
+                        config['copy']:
+                    raise ValueError("source and destination table must "
+                                     "be specified as parameters")
+
+                job_config['copy'].update(config['copy'])
+            else:
+                raise ValueError("Only 
'copy' job type is supported") + + job_data = { + 'configuration': job_config + } + + self._start_timer() + try: + self._print('Requesting copy... ', end="") + job_reply = job_collection.insert( + projectId=self.project_id, body=job_data).execute() + self._print('ok.') + except (RefreshError, ValueError): + if self.private_key: + raise AccessDenied( + "The service account credentials are not valid") + else: + raise AccessDenied( + "The credentials have been revoked or expired, " + "please re-run the application to re-authorize") + except HttpError as ex: + self.process_http_error(ex) + + job_reference = job_reply['jobReference'] + job_id = job_reference['jobId'] + self._print('Job ID: %s\nCopy running...' % job_id) + + while job_reply['status']['state'] == 'RUNNING': + self.print_elapsed_seconds(' Elapsed', 's. Waiting...') + + try: + job_reply = job_collection.get( + projectId=job_reference['projectId'], + jobId=job_id).execute() + except HttpError as ex: + self.process_http_error(ex) + + if self.verbose: + self._print('Copy completed.') + def run_query(self, query, **kwargs): try: from googleapiclient.errors import HttpError @@ -674,6 +773,38 @@ def load_data(self, dataframe, dataset_id, table_id, chunksize): self._print("\n") + def resource(self, dataset_id, table_id): + """Retrieve the resource describing a table + + Obtain from BigQuery complete description + for the table defined by the parameters + + Parameters + ---------- + dataset_id : str + Name of the BigQuery dataset for the table + table_id : str + Name of the BigQuery table + + Returns + ------- + object + Table resource + """ + + try: + from googleapiclient.errors import HttpError + except: + from apiclient.errors import HttpError + + try: + return self.service.tables().get( + projectId=self.project_id, + datasetId=dataset_id, + tableId=table_id).execute() + except HttpError as ex: + self.process_http_error(ex) + def schema(self, dataset_id, table_id): """Retrieve the schema of the table @@ -735,9 +866,12 @@ def verify_schema(self, dataset_id, table_id, schema): Whether the schemas match """ - fields_remote = sorted(self.schema(dataset_id, table_id), + fields_remote = sorted([{'name': f['name'].lower(), 'type': f['type']} + for f in self.schema(dataset_id, table_id)], key=lambda x: x['name']) - fields_local = sorted(schema['fields'], key=lambda x: x['name']) + fields_local = sorted([{'name': f['name'].lower(), 'type': f['type']} + for f in schema['fields']], + key=lambda x: x['name']) return fields_remote == fields_local @@ -764,8 +898,10 @@ def schema_is_subset(self, dataset_id, table_id, schema): Whether the passed schema is a subset """ - fields_remote = self.schema(dataset_id, table_id) - fields_local = schema['fields'] + fields_remote = [{'name': f['name'].lower(), 'type': f['type']} + for f in self.schema(dataset_id, table_id)] + fields_local = [{'name': f['name'].lower(), 'type': f['type']} + for f in schema['fields']] return all(field in fields_remote for field in fields_local) @@ -935,7 +1071,7 @@ def read_gbq(query, project_id=None, index_col=None, col_order=None, else: raise InvalidIndexColumn( 'Index column "{0}" does not exist in DataFrame.' 
- .format(index_col) + .format(index_col) ) # Change the order of columns in the DataFrame based on provided list @@ -1046,27 +1182,97 @@ def to_gbq(dataframe, destination_table, project_id, chunksize=10000, table_schema = _generate_bq_schema(dataframe) - # If table exists, check if_exists parameter - if table.exists(table_id): - if if_exists == 'fail': - raise TableCreationError("Could not create the table because it " - "already exists. " - "Change the if_exists parameter to " - "append or replace data.") - elif if_exists == 'replace': - connector.delete_and_recreate_table( - dataset_id, table_id, table_schema) - elif if_exists == 'append': - if not connector.schema_is_subset(dataset_id, - table_id, - table_schema): - raise InvalidSchema("Please verify that the structure and " - "data types in the DataFrame match the " - "schema of the destination table.") + if _Table.contains_partition_decorator(table_id): + root_table_id, partition_id = table_id.rsplit('$', 1) + + if not table.exists(root_table_id): + raise NotFoundException("Could not write to the partition because " + "the table does not exist.") + + table_resource = table.resource(dataset_id, root_table_id) + + if 'timePartitioning' not in table_resource: + raise InvalidSchema("Could not write to the partition because " + "the table is not partitioned.") + + partition_exists = read_gbq("SELECT COUNT(*) AS num_rows FROM {0}" + .format(destination_table), + project_id=project_id, + private_key=private_key)['num_rows'][0] > 0 + + if partition_exists: + if if_exists == 'fail': + raise TableCreationError("Could not create the partition " + "because it already exists. " + "Change the if_exists parameter to " + "append or replace data.") + elif if_exists == 'append': + if not connector.schema_is_subset(dataset_id, + root_table_id, + table_schema): + raise InvalidSchema("Please verify that the structure and " + "data types in the DataFrame match " + "the schema of the destination table.") + connector.load_data(dataframe, dataset_id, table_id, chunksize) + + elif if_exists == 'replace': + if not connector.schema_is_subset(dataset_id, + root_table_id, + table_schema): + raise InvalidSchema("Please verify that the structure and " + "data types in the DataFrame match " + "the schema of the destination table.") + + temporary_table_id = '_'.join( + [root_table_id + '_' + partition_id, + str(randint(1, 100000))]) + table.create(temporary_table_id, table_schema) + connector.load_data(dataframe, dataset_id, temporary_table_id, + chunksize) + sleep(30) # <- Curses Google!!! + connector.run_query('select * from {0}.{1}' + .format(dataset_id, temporary_table_id), + configuration={ + 'query': { + 'destinationTable': { + 'projectId': project_id, + 'datasetId': dataset_id, + 'tableId': table_id + }, + 'createDisposition': + 'CREATE_IF_NEEDED', + 'writeDisposition': + 'WRITE_TRUNCATE', + 'allowLargeResults': True + } + }) + table.delete(temporary_table_id) + + else: + connector.load_data(dataframe, dataset_id, table_id, chunksize) + else: - table.create(table_id, table_schema) + if table.exists(table_id): + if if_exists == 'fail': + raise TableCreationError( + "Could not create the table because it " + "already exists. 
" + "Change the if_exists parameter to " + "append or replace data.") + elif if_exists == 'replace': + connector.delete_and_recreate_table( + dataset_id, table_id, table_schema) + elif if_exists == 'append': + if not connector.schema_is_subset(dataset_id, + table_id, + table_schema): + raise InvalidSchema("Please verify that the structure and " + "data types in the DataFrame match " + "the schema of the destination table.") + else: + table.create(table_id, table_schema) - connector.load_data(dataframe, dataset_id, table_id, chunksize) + connector.load_data(dataframe, dataset_id, table_id, chunksize) def generate_bq_schema(df, default_type='STRING'): @@ -1107,7 +1313,6 @@ def _generate_bq_schema(df, default_type='STRING'): class _Table(GbqConnector): - def __init__(self, project_id, dataset_id, reauth=False, verbose=False, private_key=None): try: @@ -1132,11 +1337,16 @@ def exists(self, table_id): true if table exists, otherwise false """ + if _Table.contains_partition_decorator(table_id): + root_table_id, partition_id = table_id.rsplit('$', 1) + else: + root_table_id = table_id + try: self.service.tables().get( projectId=self.project_id, datasetId=self.dataset_id, - tableId=table_id).execute() + tableId=root_table_id).execute() return True except self.http_error as ex: if ex.resp.status == 404: @@ -1144,7 +1354,7 @@ def exists(self, table_id): else: self.process_http_error(ex) - def create(self, table_id, schema): + def create(self, table_id, schema, **kwargs): """ Create a table in Google BigQuery given a table and schema Parameters @@ -1152,8 +1362,17 @@ def create(self, table_id, schema): table : str Name of table to be written schema : str - Use the generate_bq_schema to generate your table schema from a - dataframe. + Use the generate_bq_schema to generate + your table schema from a dataframe. + + **kwargs : Arbitrary keyword arguments + body (dict): table creation extra parameters + For example: + + body = {'timePartitioning': {'type': 'DAY'}} + + For more information see `BigQuery SQL Reference + `__ """ if self.exists(table_id): @@ -1174,6 +1393,10 @@ def create(self, table_id, schema): } } + config = kwargs.get('body') + if config is not None: + body.update(config) + try: self.service.tables().insert( projectId=self.project_id, @@ -1204,9 +1427,12 @@ def delete(self, table_id): if ex.resp.status != 404: self.process_http_error(ex) + @staticmethod + def contains_partition_decorator(table_id): + return "$" in table_id + class _Dataset(GbqConnector): - def __init__(self, project_id, reauth=False, verbose=False, private_key=None): try: @@ -1312,13 +1538,17 @@ def create(self, dataset_id): except self.http_error as ex: self.process_http_error(ex) - def delete(self, dataset_id): + def delete(self, dataset_id, delete_contents=False): """ Delete a dataset in Google BigQuery Parameters ---------- dataset : str Name of dataset to be deleted + delete_contents : boolean + If True, delete all the tables in the dataset. + If False and the dataset contains tables, + the request will fail. 
Default is False """ if not self.exists(dataset_id): @@ -1328,7 +1558,8 @@ def delete(self, dataset_id): try: self.service.datasets().delete( datasetId=dataset_id, - projectId=self.project_id).execute() + projectId=self.project_id, + deleteContents=delete_contents).execute() except self.http_error as ex: # Ignore 404 error which may occur if dataset already deleted diff --git a/pandas_gbq/tests/test_gbq.py b/pandas_gbq/tests/test_gbq.py index c9ac31dd..c1f639b9 100644 --- a/pandas_gbq/tests/test_gbq.py +++ b/pandas_gbq/tests/test_gbq.py @@ -1471,3 +1471,193 @@ def test_upload_data(self): project_id=_get_project_id(), private_key=_get_private_key_contents()) assert result['num_rows'][0] == test_size + + +class TestToPartitionGBQIntegrationWithServiceAccountKeyPath(object): + @classmethod + def setup_class(cls): + # - GLOBAL CLASS FIXTURES - + # put here any instruction you want to execute only *ONCE* *BEFORE* + # executing *ALL* tests described below. + + _skip_if_no_project_id() + _skip_if_no_private_key_path() + + _setup_common() + + def setup_method(self, method): + # - PER-TEST FIXTURES - + # put here any instruction you want to be run *BEFORE* *EVERY* test is + # executed. + + self.dataset_prefix = _get_dataset_prefix_random() + clean_gbq_environment(self.dataset_prefix, _get_private_key_path()) + self.dataset = gbq._Dataset(_get_project_id(), + private_key=_get_private_key_path()) + self.table = gbq._Table(_get_project_id(), self.dataset_prefix + "1", + private_key=_get_private_key_path()) + self.sut = gbq.GbqConnector(_get_project_id(), + private_key=_get_private_key_path()) + self.destination_table = "{0}{1}.{2}".format(self.dataset_prefix, "1", + TABLE_ID) + self.dataset.create(self.dataset_prefix + "1") + + @classmethod + def teardown_class(cls): + # - GLOBAL CLASS FIXTURES - + # put here any instruction you want to execute only *ONCE* *AFTER* + # executing all tests. + pass + + def teardown_method(self, method): + # - PER-TEST FIXTURES - + # put here any instructions you want to be run *AFTER* *EVERY* test is + # executed. + clean_gbq_environment(self.dataset_prefix, _get_private_key_path()) + + def test_create_partitioned_table(self): + test_id = "1" + test_schema = {'fields': [{'name': 'A', 'type': 'FLOAT'}, + {'name': 'B', 'type': 'FLOAT'}, + {'name': 'C', 'type': 'STRING'}, + {'name': 'D', 'type': 'TIMESTAMP'}]} + self.table.create(TABLE_ID + test_id, test_schema, + body={'timePartitioning': {'type': 'DAY'}}) + actual = self.sut.resource(self.dataset_prefix + + "1", TABLE_ID + test_id) + assert actual['timePartitioning']['type'] == 'DAY' + + def test_upload_data_to_partition(self): + test_id = "2" + test_size = 20001 + df = make_mixed_dataframe_v2(test_size) + + self.table.create(TABLE_ID + test_id, gbq._generate_bq_schema(df), + body={'timePartitioning': {'type': 'DAY'}}) + + partition_name = self.destination_table + test_id + \ + '$' + datetime.today().strftime("%Y%m%d") + + gbq.to_gbq(df, partition_name, _get_project_id(), + chunksize=10000, private_key=_get_private_key_path()) + + sleep(30) # <- Curses Google!!! 
+ + result = gbq.read_gbq("SELECT COUNT(*) AS num_rows FROM {0}" + .format(partition_name), + project_id=_get_project_id(), + private_key=_get_private_key_path()) + assert result['num_rows'][0] == test_size + + def test_upload_data_to_partition_non_existing_table(self): + test_id = "2" + test_size = 20001 + df = make_mixed_dataframe_v2(test_size) + + partition_name = self.destination_table + test_id + \ + '$' + datetime.today().strftime("%Y%m%d") + + with pytest.raises(gbq.NotFoundException): + gbq.to_gbq(df, partition_name, _get_project_id(), + chunksize=10000, private_key=_get_private_key_path()) + + def test_upload_data_to_partition_non_partitioned_table(self): + test_id = "2" + test_size = 20001 + df = make_mixed_dataframe_v2(test_size) + + self.table.create(TABLE_ID + test_id, gbq._generate_bq_schema(df)) + + partition_name = self.destination_table + test_id + \ + '$' + datetime.today().strftime("%Y%m%d") + + with pytest.raises(gbq.InvalidSchema): + gbq.to_gbq(df, partition_name, _get_project_id(), + chunksize=10000, private_key=_get_private_key_path()) + + def test_upload_data_if_partition_exists_fail(self): + test_id = "3" + test_size = 10 + df = make_mixed_dataframe_v2(test_size) + self.table.create(TABLE_ID + test_id, gbq._generate_bq_schema(df), + body={'timePartitioning': {'type': 'DAY'}}) + + partition_name = self.destination_table + test_id + \ + '$' + datetime.today().strftime("%Y%m%d") + + gbq.to_gbq(df, partition_name, _get_project_id(), + chunksize=10000, private_key=_get_private_key_path()) + + # Test the default value of if_exists is 'fail' + with pytest.raises(gbq.TableCreationError): + gbq.to_gbq(df, partition_name, _get_project_id(), + private_key=_get_private_key_path()) + + # Test the if_exists parameter with value 'fail' + with pytest.raises(gbq.TableCreationError): + gbq.to_gbq(df, partition_name, _get_project_id(), + if_exists='fail', private_key=_get_private_key_path()) + + def test_upload_data_if_partition_exists_append(self): + test_id = "3" + test_size = 10 + df = make_mixed_dataframe_v2(test_size) + df_different_schema = tm.makeMixedDataFrame() + self.table.create(TABLE_ID + test_id, gbq._generate_bq_schema(df), + body={'timePartitioning': {'type': 'DAY'}}) + + partition_name = self.destination_table + test_id + \ + '$' + datetime.today().strftime("%Y%m%d") + + gbq.to_gbq(df, partition_name, _get_project_id(), + chunksize=10000, private_key=_get_private_key_path()) + + # Test the if_exists parameter with value 'append' + gbq.to_gbq(df, partition_name, _get_project_id(), + if_exists='append', private_key=_get_private_key_path()) + + sleep(30) # <- Curses Google!!! 
+ + result = gbq.read_gbq("SELECT COUNT(*) AS num_rows FROM {0}" + .format(partition_name), + project_id=_get_project_id(), + private_key=_get_private_key_path()) + assert result['num_rows'][0] == test_size * 2 + + # Try inserting with a different schema, confirm failure + with pytest.raises(gbq.InvalidSchema): + gbq.to_gbq(df_different_schema, partition_name, + _get_project_id(), if_exists='append', + private_key=_get_private_key_path()) + + def test_upload_data_if_partition_exists_replace(self): + test_id = "4" + test_size = 10 + expected_size = 25 + df = make_mixed_dataframe_v2(test_size) + df2 = make_mixed_dataframe_v2(expected_size) + df_different_schema = tm.makeMixedDataFrame() + self.table.create(TABLE_ID + test_id, gbq._generate_bq_schema(df), + body={'timePartitioning': {'type': 'DAY'}}) + + partition_name = self.destination_table + test_id + \ + '$' + datetime.today().strftime("%Y%m%d") + + gbq.to_gbq(df, partition_name, _get_project_id(), + chunksize=10000, private_key=_get_private_key_path()) + + # Test the if_exists parameter with value 'replace' + gbq.to_gbq(df2, partition_name, _get_project_id(), + if_exists='replace', private_key=_get_private_key_path()) + + result = gbq.read_gbq("SELECT COUNT(*) AS num_rows FROM {0}" + .format(partition_name), + project_id=_get_project_id(), + private_key=_get_private_key_path()) + assert result['num_rows'][0] == expected_size + + # Try inserting with a different schema, confirm failure + with pytest.raises(gbq.InvalidSchema): + gbq.to_gbq(df_different_schema, partition_name, + _get_project_id(), if_exists='replace', + private_key=_get_private_key_path())
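A usage sketch of the partition support introduced above, mirroring the integration tests; the project, dataset, and table names are hypothetical, the dataset is assumed to exist, and credentials are assumed to be configured::

    from datetime import datetime

    import pandas as pd
    from pandas_gbq import gbq

    project_id = 'my-project'    # hypothetical project
    dataset_id = 'my_dataset'    # hypothetical, pre-existing dataset
    table_id = 'my_table'

    # Create a day-partitioned table by passing the extra table resource
    # through the new ``body`` keyword of _Table.create
    schema = {'fields': [{'name': 'value', 'type': 'FLOAT'}]}
    table = gbq._Table(project_id, dataset_id)
    table.create(table_id, schema, body={'timePartitioning': {'type': 'DAY'}})

    # Write a DataFrame into today's partition via a partition decorator;
    # if_exists='replace' overwrites only that partition
    df = pd.DataFrame({'value': [1.0, 2.0, 3.0]})
    destination = '{0}.{1}${2}'.format(dataset_id, table_id,
                                       datetime.today().strftime('%Y%m%d'))
    gbq.to_gbq(df, destination, project_id, if_exists='replace')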