Commit f612f95

Improvements discussed in PR conversation

- Accidentally left a duplicate test in
- Correcting change to schema made by auto-rebase
- Fixing missing assertTrue and reversion to not checking subset on append (both from rebase)
- Replacing assertEqual
- Shortening line to pass flake8
1 parent 9dfd106 commit f612f95

File tree

4 files changed: +148, -9 lines changed


docs/source/changelog.rst

Lines changed: 4 additions & 0 deletions
@@ -9,6 +9,10 @@ Changelog
 
 - All gbq errors will simply be subclasses of ``ValueError`` and no longer inherit from the deprecated ``PandasError``.
 
+0.1.5 / 2017-04-20
+------------------
+- When using ``to_gbq``, if ``if_exists`` is set to ``'append'``, the dataframe needs to contain only a subset of the fields in the BigQuery schema. GH#24
+
 0.1.4 / 2017-03-17
 ------------------
 

docs/source/writing.rst

Lines changed: 1 addition & 1 deletion
@@ -40,7 +40,7 @@ a ``TableCreationError`` if the destination table already exists.
 If the ``if_exists`` argument is set to ``'append'``, the destination dataframe will
 be written to the table using the defined table schema and column types. The
-dataframe must match the destination table in structure and data types.
+dataframe must contain only fields (matching name and type) that are currently in the destination table.
 If the ``if_exists`` argument is set to ``'replace'``, and the existing table has a
 different schema, a delay of 2 minutes will be forced to ensure that the new schema
 has propagated in the Google environment. See
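
To illustrate the relaxed ``'append'`` behaviour this hunk documents, here is a minimal sketch. The project id and table name are hypothetical, and the destination table is assumed to already have a STRING column ``name`` and a FLOAT column ``score``:

    import pandas as pd
    from pandas_gbq import gbq

    df = pd.DataFrame({'name': ['a', 'b'], 'score': [1.0, 2.0]})

    # Hypothetical destination table containing both columns.
    gbq.to_gbq(df, 'my_dataset.my_table', 'my-project', if_exists='append')

    # After this change, an append may supply only a subset of the table's
    # columns; the omitted fields are presumably left NULL in the new rows.
    gbq.to_gbq(df[['name']], 'my_dataset.my_table', 'my-project',
               if_exists='append')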

pandas_gbq/gbq.py

Lines changed: 60 additions & 8 deletions
@@ -557,7 +557,19 @@ def load_data(self, dataframe, dataset_id, table_id, chunksize):
 
         self._print("\n")
 
-    def verify_schema(self, dataset_id, table_id, schema):
+    def schema(self, dataset_id, table_id):
+        """Retrieve the schema of the table
+
+        Obtain from BigQuery the field names and field types
+        for the table defined by the parameters
+
+        :param str dataset_id: Name of the BigQuery dataset for the table
+        :param str table_id: Name of the BigQuery table
+
+        :return: Fields representing the schema
+        :rtype: list of dicts
+        """
+
         try:
             from googleapiclient.errors import HttpError
         except:
@@ -573,15 +585,53 @@ def verify_schema(self, dataset_id, table_id, schema):
                               'type': field_remote['type']}
                              for field_remote in remote_schema['fields']]
 
-            fields_remote = set([json.dumps(field_remote)
-                                 for field_remote in remote_fields])
-            fields_local = set(json.dumps(field_local)
-                               for field_local in schema['fields'])
-
-            return fields_remote == fields_local
+            return remote_fields
         except HttpError as ex:
             self.process_http_error(ex)
 
+    def verify_schema(self, dataset_id, table_id, schema):
+        """Indicate whether schemas match exactly
+
+        Compare the BigQuery table identified in the parameters with
+        the schema passed in and indicate whether they contain the
+        same fields. Order is not considered.
+
+        :param str dataset_id: Name of the BigQuery dataset for the table
+        :param str table_id: Name of the BigQuery table
+        :param list(dict) schema: Schema for comparison. Each item should
+            have a 'name' and a 'type'
+
+        :return: Whether the schemas match
+        :rtype: bool
+        """
+
+        fields_remote = sorted(self.schema(dataset_id, table_id),
+                               key=lambda x: x['name'])
+        fields_local = sorted(schema['fields'], key=lambda x: x['name'])
+
+        return fields_remote == fields_local
+
+    def schema_is_subset(self, dataset_id, table_id, schema):
+        """Indicate whether the schema to be uploaded is a subset
+
+        Compare the BigQuery table identified in the parameters with
+        the schema passed in and indicate whether all fields in the
+        latter are present in the former. Order is not considered.
+
+        :param str dataset_id: Name of the BigQuery dataset for the table
+        :param str table_id: Name of the BigQuery table
+        :param list(dict) schema: Schema for comparison. Each item should
+            have a 'name' and a 'type'
+
+        :return: Whether the passed schema is a subset
+        :rtype: bool
+        """
+
+        fields_remote = self.schema(dataset_id, table_id)
+        fields_local = schema['fields']
+
+        return all(field in fields_remote for field in fields_local)
+
     def delete_and_recreate_table(self, dataset_id, table_id, table_schema):
         delay = 0

@@ -844,7 +894,9 @@ def to_gbq(dataframe, destination_table, project_id, chunksize=10000,
         connector.delete_and_recreate_table(
             dataset_id, table_id, table_schema)
     elif if_exists == 'append':
-        if not connector.verify_schema(dataset_id, table_id, table_schema):
+        if not connector.schema_is_subset(dataset_id,
+                                          table_id,
+                                          table_schema):
             raise InvalidSchema("Please verify that the structure and "
                                 "data types in the DataFrame match the "
                                 "schema of the destination table.")

pandas_gbq/tests/test_gbq.py

Lines changed: 83 additions & 0 deletions
@@ -1071,6 +1071,30 @@ def test_upload_data_if_table_exists_append(self):
                    _get_project_id(), if_exists='append',
                    private_key=_get_private_key_path())
 
+    def test_upload_subset_columns_if_table_exists_append(self):
+        # For pull request #24
+        test_id = "16"
+        test_size = 10
+        df = make_mixed_dataframe_v2(test_size)
+        df_subset_cols = df.iloc[:, :2]
+
+        # Initialize table with sample data
+        gbq.to_gbq(df, self.destination_table + test_id, _get_project_id(),
+                   chunksize=10000, private_key=_get_private_key_path())
+
+        # Test the if_exists parameter with value 'append'
+        gbq.to_gbq(df_subset_cols,
+                   self.destination_table + test_id, _get_project_id(),
+                   if_exists='append', private_key=_get_private_key_path())
+
+        sleep(30)  # wait for BigQuery streaming inserts to become queryable
+
+        result = gbq.read_gbq("SELECT COUNT(*) AS num_rows FROM {0}"
+                              .format(self.destination_table + test_id),
+                              project_id=_get_project_id(),
+                              private_key=_get_private_key_path())
+        assert result['num_rows'][0] == test_size * 2
+
     def test_upload_data_if_table_exists_replace(self):
         test_id = "4"
         test_size = 10
@@ -1258,6 +1282,65 @@ def test_verify_schema_ignores_field_mode(self):
         assert self.sut.verify_schema(
             self.dataset_prefix + "1", TABLE_ID + test_id, test_schema_2)
 
+    def test_retrieve_schema(self):
+        # For pull request #24
+        test_id = "15"
+        test_schema = {'fields': [{'name': 'A', 'type': 'FLOAT'},
+                                  {'name': 'B', 'type': 'FLOAT'},
+                                  {'name': 'C', 'type': 'STRING'},
+                                  {'name': 'D', 'type': 'TIMESTAMP'}]}
+
+        self.table.create(TABLE_ID + test_id, test_schema)
+        actual = self.sut.schema(self.dataset_prefix + "1",
+                                 TABLE_ID + test_id)
+        expected = test_schema['fields']
+        assert expected == actual, 'Expected schema used to create table'
+
+    def test_schema_is_subset_passes_if_subset(self):
+        # For pull request #24
+        test_id = '16'
+        table_name = TABLE_ID + test_id
+        dataset = self.dataset_prefix + '1'
+
+        table_schema = {'fields': [{'name': 'A', 'type': 'FLOAT'},
+                                   {'name': 'B', 'type': 'FLOAT'},
+                                   {'name': 'C', 'type': 'STRING'}]}
+        tested_schema = {'fields': [{'name': 'A', 'type': 'FLOAT'},
+                                    {'name': 'B', 'type': 'FLOAT'}]}
+
+        self.table.create(table_name, table_schema)
+
+        assert self.sut.schema_is_subset(
+            dataset, table_name, tested_schema) is True
+
+    def test_schema_is_subset_fails_if_not_subset(self):
+        # For pull request #24
+        test_id = '17'
+        table_name = TABLE_ID + test_id
+        dataset = self.dataset_prefix + '1'
+
+        table_schema = {'fields': [{'name': 'A', 'type': 'FLOAT'},
+                                   {'name': 'B', 'type': 'FLOAT'},
+                                   {'name': 'C', 'type': 'STRING'}]}
+        tested_schema = {'fields': [{'name': 'A', 'type': 'FLOAT'},
+                                    {'name': 'C', 'type': 'FLOAT'}]}
+
+        self.table.create(table_name, table_schema)
+
+        assert self.sut.schema_is_subset(
+            dataset, table_name, tested_schema) is False
+
     def test_list_dataset(self):
         dataset_id = self.dataset_prefix + "1"
         assert dataset_id in self.dataset.datasets()
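
As a counterpart to the tests above, a hedged sketch of what a caller sees when the subset check fails. The project and table names are hypothetical, and the destination column 'C' is assumed to be STRING, as in test_schema_is_subset_fails_if_not_subset:

    import pandas as pd
    from pandas_gbq import gbq

    df_bad = pd.DataFrame({'C': [1.0, 2.0]})  # FLOAT locally, STRING remotely

    try:
        gbq.to_gbq(df_bad, 'my_dataset.my_table', 'my-project',
                   if_exists='append')
    except gbq.InvalidSchema:
        # schema_is_subset() returned False: 'C' exists in the table but
        # with a different type, so the append is rejected before upload.
        print('append rejected: DataFrame fields are not a subset')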
