Skip to content
This repository was archived by the owner on May 17, 2024. It is now read-only.

Commit 5a4b879

Browse files
committed
attempt to utilize information_schema in databricks
information_schema previously was not available
1 parent 2c3a305 commit 5a4b879

File tree

1 file changed

+36
-17
lines changed

1 file changed

+36
-17
lines changed

data_diff/databases/databricks.py

Lines changed: 36 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -53,6 +53,7 @@ class Dialect(BaseDialect):
5353
"TIMESTAMP_NTZ": Timestamp,
5454
# Text
5555
"STRING": Text,
56+
"VARCHAR": Text,
5657
# Boolean
5758
"BOOLEAN": Boolean,
5859
}
@@ -138,25 +139,43 @@ def create_connection(self):
138139
raise ConnectionError(*e.args) from e
139140

140141
def query_table_schema(self, path: DbPath) -> Dict[str, tuple]:
    """Return the column schema of the table at ``path``.

    The schema is first requested through the parent class implementation
    (the information_schema route, per the warning logged on failure).
    information_schema has not always been available on Databricks, so if
    that attempt raises or yields nothing, the legacy approach is used
    instead: the connector's ``cursor.columns()`` metadata call.

    Args:
        path: Table path; normalized to ``(catalog, schema, table)`` via
            ``_normalize_table_path`` for the fallback lookup.

    Returns:
        Mapping of column name to its schema tuple. In the fallback path
        each tuple is ``(COLUMN_NAME, TYPE_NAME, DECIMAL_DIGITS, None, None)``.

    Raises:
        RuntimeError: If the fallback lookup returns no rows (the table
            does not exist, or has no columns).
    """
    try:
        table_schema = super().query_table_schema(path)
    except Exception:
        # Deliberate best-effort: any failure here just triggers the fallback.
        # Catch Exception (not a bare except) so KeyboardInterrupt/SystemExit
        # still propagate, and keep the traceback so the cause is diagnosable.
        logging.warning(
            "Failed to get schema from information_schema, falling back to legacy approach.",
            exc_info=True,
        )
        table_schema = {}

    if table_schema:
        return table_schema

    catalog, schema, table = self._normalize_table_path(path)

    # Open the connection only when the fallback is actually needed, and close
    # it on every exit path (including failures in cursor()/columns()).
    conn = self.create_connection()
    try:
        with conn.cursor() as cursor:
            cursor.columns(catalog_name=catalog, schema_name=schema, table_name=table)
            rows = cursor.fetchall()
    finally:
        conn.close()

    if not rows:
        raise RuntimeError(f"{self.name}: Table '{'.'.join(path)}' does not exist, or has no columns")

    table_schema = {r.COLUMN_NAME: (r.COLUMN_NAME, r.TYPE_NAME, r.DECIMAL_DIGITS, None, None) for r in rows}
    # Internal invariant: column names must be unique, otherwise the dict
    # comprehension above silently dropped a column.
    assert len(table_schema) == len(rows)
    return table_schema
166+
167+
def select_table_schema(self, path: DbPath) -> str:
    """Build the SQL that reads a table's column metadata from information_schema.

    The query selects, per column: column_name, data_type,
    datetime_precision, numeric_precision and numeric_scale.

    Args:
        path: Table path; normalized to ``(database, schema, name)`` via
            ``_normalize_table_path``.

    Returns:
        The SELECT statement as a string. The ``information_schema.columns``
        relation is prefixed with the database when one is present.
    """
    catalog, schema_name, table_name = self._normalize_table_path(path)

    # Only a truthy catalog is prepended; the two fixed parts are always kept.
    relation = ".".join(part for part in (catalog, "information_schema", "columns") if part)
    selected = "column_name, data_type, datetime_precision, numeric_precision, numeric_scale"

    # NOTE(review): names are interpolated into the SQL text unescaped —
    # confirm callers never pass untrusted table/schema names.
    return (
        f"SELECT {selected} "
        f"FROM {relation} "
        f"WHERE table_name = '{table_name}' AND table_schema = '{schema_name}'"
    )
160179

161180
def _process_table_schema(
162181
self, path: DbPath, raw_schema: Dict[str, tuple], filter_columns: Sequence[str], where: str = None

0 commit comments

Comments
 (0)