From 94b6d93889e293f111694da53c752932f490e763 Mon Sep 17 00:00:00 2001 From: Ilia Pinchuk Date: Sat, 28 Oct 2023 17:10:27 +0600 Subject: [PATCH 1/3] feat: support TOP operator --- data_diff/databases/base.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/data_diff/databases/base.py b/data_diff/databases/base.py index 8caa6817..f415a6cd 100644 --- a/data_diff/databases/base.py +++ b/data_diff/databases/base.py @@ -202,6 +202,7 @@ class BaseDialect(abc.ABC): TYPE_CLASSES: ClassVar[Dict[str, Type[ColType]]] = {} PLACEHOLDER_TABLE = None # Used for Oracle + USE_TOP_INSTEAD_LIMIT: bool = False # True for MsSQL or Teradata def parse_table_name(self, name: str) -> DbPath: "Parse the given table name into a DbPath" @@ -471,7 +472,10 @@ def render_select(self, parent_c: Compiler, elem: Select) -> str: columns = ", ".join(map(compile_fn, elem.columns)) if elem.columns else "*" distinct = "DISTINCT " if elem.distinct else "" optimizer_hints = self.optimizer_hints(elem.optimizer_hints) if elem.optimizer_hints else "" - select = f"SELECT {optimizer_hints}{distinct}{columns}" + if elem.limit_expr is not None and self.USE_TOP_INSTEAD_LIMIT: + select = f"SELECT TOP {elem.limit_expr} {optimizer_hints}{distinct}{columns}" + else: + select = f"SELECT {optimizer_hints}{distinct}{columns}" if elem.table: select += " FROM " + self.compile(c, elem.table) @@ -491,7 +495,7 @@ def render_select(self, parent_c: Compiler, elem: Select) -> str: if elem.order_by_exprs: select += " ORDER BY " + ", ".join(map(compile_fn, elem.order_by_exprs)) - if elem.limit_expr is not None: + if elem.limit_expr is not None and not self.USE_TOP_INSTEAD_LIMIT: has_order_by = bool(elem.order_by_exprs) select += " " + self.offset_limit(0, elem.limit_expr, has_order_by=has_order_by) From 066545220d95d4e1e0dc487beef8a5ec8f76d69c Mon Sep 17 00:00:00 2001 From: Ilia Pinchuk Date: Thu, 9 Nov 2023 02:50:21 +0600 Subject: [PATCH 2/3] feat: use a separate render function for limited query --- data_diff/databases/base.py | 23 +++++++++++------------ data_diff/databases/mssql.py | 10 +++++++--- data_diff/databases/oracle.py | 10 +++++++--- tests/test_query.py | 11 ++++++++--- 4 files changed, 33 insertions(+), 21 deletions(-) diff --git a/data_diff/databases/base.py b/data_diff/databases/base.py index f415a6cd..9dc03909 100644 --- a/data_diff/databases/base.py +++ b/data_diff/databases/base.py @@ -202,7 +202,6 @@ class BaseDialect(abc.ABC): TYPE_CLASSES: ClassVar[Dict[str, Type[ColType]]] = {} PLACEHOLDER_TABLE = None # Used for Oracle - USE_TOP_INSTEAD_LIMIT: bool = False # True for MsSQL or Teradata def parse_table_name(self, name: str) -> DbPath: "Parse the given table name into a DbPath" @@ -472,10 +471,7 @@ def render_select(self, parent_c: Compiler, elem: Select) -> str: columns = ", ".join(map(compile_fn, elem.columns)) if elem.columns else "*" distinct = "DISTINCT " if elem.distinct else "" optimizer_hints = self.optimizer_hints(elem.optimizer_hints) if elem.optimizer_hints else "" - if elem.limit_expr is not None and self.USE_TOP_INSTEAD_LIMIT: - select = f"SELECT TOP {elem.limit_expr} {optimizer_hints}{distinct}{columns}" - else: - select = f"SELECT {optimizer_hints}{distinct}{columns}" + select = f"SELECT {optimizer_hints}{distinct}{columns}" if elem.table: select += " FROM " + self.compile(c, elem.table) @@ -495,9 +491,9 @@ def render_select(self, parent_c: Compiler, elem: Select) -> str: if elem.order_by_exprs: select += " ORDER BY " + ", ".join(map(compile_fn, elem.order_by_exprs)) - if elem.limit_expr is not None and not self.USE_TOP_INSTEAD_LIMIT: + if elem.limit_expr is not None: has_order_by = bool(elem.order_by_exprs) - select += " " + self.offset_limit(0, elem.limit_expr, has_order_by=has_order_by) + select = self.limit_select(select_query=select, offset=0, limit=elem.limit_expr, has_order_by=has_order_by) if parent_c.in_select: select = f"({select}) {c.new_unique_name()}" @@ -609,14 +605,17 @@ def render_inserttotable(self, c: Compiler, elem: InsertToTable) -> str: return f"INSERT INTO {self.compile(c, elem.path)}{columns} {expr}" - def offset_limit( - self, offset: Optional[int] = None, limit: Optional[int] = None, has_order_by: Optional[bool] = None + def limit_select( + self, + select_query: str, + offset: Optional[int] = None, + limit: Optional[int] = None, + has_order_by: Optional[bool] = None, ) -> str: - "Provide SQL fragment for limit and offset inside a select" if offset: raise NotImplementedError("No support for OFFSET in query") - return f"LIMIT {limit}" + return f"SELECT * FROM ({select_query}) AS LIMITED_SELECT LIMIT {limit}" def concat(self, items: List[str]) -> str: "Provide SQL for concatenating a bunch of columns into a string" @@ -1107,7 +1106,7 @@ def _query_cursor(self, c, sql_code: str) -> QueryResult: return result except Exception as _e: # logger.exception(e) - # logger.error(f'Caused by SQL: {sql_code}') + # logger.error(f"Caused by SQL: {sql_code}") raise def _query_conn(self, conn, sql_code: Union[str, ThreadLocalInterpreter]) -> QueryResult: diff --git a/data_diff/databases/mssql.py b/data_diff/databases/mssql.py index 7f039cc6..b6ffa25c 100644 --- a/data_diff/databases/mssql.py +++ b/data_diff/databases/mssql.py @@ -110,8 +110,12 @@ def is_distinct_from(self, a: str, b: str) -> str: # See: https://stackoverflow.com/a/18684859/857383 return f"(({a}<>{b} OR {a} IS NULL OR {b} IS NULL) AND NOT({a} IS NULL AND {b} IS NULL))" - def offset_limit( - self, offset: Optional[int] = None, limit: Optional[int] = None, has_order_by: Optional[bool] = None + def limit_select( + self, + select_query: str, + offset: Optional[int] = None, + limit: Optional[int] = None, + has_order_by: Optional[bool] = None, ) -> str: if offset: raise NotImplementedError("No support for OFFSET in query") @@ -121,7 +125,7 @@ def offset_limit( result += "ORDER BY 1" result += f" OFFSET 0 ROWS FETCH NEXT {limit} ROWS ONLY" - return result + return f"SELECT * FROM ({select_query}) AS LIMITED_SELECT {result}" def constant_values(self, rows) -> str: values = ", ".join("(%s)" % ", ".join(self._constant_value(v) for v in row) for row in rows) diff --git a/data_diff/databases/oracle.py b/data_diff/databases/oracle.py index a8b8b75b..32bd30ef 100644 --- a/data_diff/databases/oracle.py +++ b/data_diff/databases/oracle.py @@ -64,13 +64,17 @@ def quote(self, s: str): def to_string(self, s: str): return f"cast({s} as varchar(1024))" - def offset_limit( - self, offset: Optional[int] = None, limit: Optional[int] = None, has_order_by: Optional[bool] = None + def limit_select( + self, + select_query: str, + offset: Optional[int] = None, + limit: Optional[int] = None, + has_order_by: Optional[bool] = None, ) -> str: if offset: raise NotImplementedError("No support for OFFSET in query") - return f"FETCH NEXT {limit} ROWS ONLY" + return f"SELECT * FROM ({select_query}) FETCH NEXT {limit} ROWS ONLY" def concat(self, items: List[str]) -> str: joined_exprs = " || ".join(items) diff --git a/tests/test_query.py b/tests/test_query.py index 69900b4b..c175d138 100644 --- a/tests/test_query.py +++ b/tests/test_query.py @@ -50,11 +50,16 @@ def current_database(self) -> str: def current_schema(self) -> str: return "current_schema()" - def offset_limit( - self, offset: Optional[int] = None, limit: Optional[int] = None, has_order_by: Optional[bool] = None + def limit_select( + self, + select_query: str, + offset: Optional[int] = None, + limit: Optional[int] = None, + has_order_by: Optional[bool] = None, ) -> str: x = offset and f"OFFSET {offset}", limit and f"LIMIT {limit}" - return " ".join(filter(None, x)) + result = " ".join(filter(None, x)) + return f"SELECT * FROM ({select_query}) AS LIMITED_SELECT {result}" def explain_as_text(self, query: str) -> str: return f"explain {query}" From 0bd24a3cabbb9dfdc7c3080c27c34a3cc4605500 Mon Sep 17 00:00:00 2001 From: Ilia Pinchuk Date: Thu, 9 Nov 2023 03:00:53 +0600 Subject: [PATCH 3/3] feat: update tests --- tests/test_query.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/tests/test_query.py b/tests/test_query.py index c175d138..0625a75d 100644 --- a/tests/test_query.py +++ b/tests/test_query.py @@ -197,7 +197,7 @@ def test_funcs(self): t = table("a") q = c.compile(t.order_by(Random()).limit(10)) - self.assertEqual(q, "SELECT * FROM a ORDER BY random() LIMIT 10") + self.assertEqual(q, "SELECT * FROM (SELECT * FROM a ORDER BY random()) AS LIMITED_SELECT LIMIT 10") q = c.compile(t.select(coalesce(this.a, this.b))) self.assertEqual(q, "SELECT COALESCE(a, b) FROM a") @@ -215,7 +215,7 @@ def test_select_distinct(self): # selects stay apart q = c.compile(t.limit(10).select(this.b, distinct=True)) - self.assertEqual(q, "SELECT DISTINCT b FROM (SELECT * FROM a LIMIT 10) tmp1") + self.assertEqual(q, "SELECT DISTINCT b FROM (SELECT * FROM (SELECT * FROM a) AS LIMITED_SELECT LIMIT 10) tmp1") q = c.compile(t.select(this.b, distinct=True).select(distinct=False)) self.assertEqual(q, "SELECT * FROM (SELECT DISTINCT b FROM a) tmp2") @@ -231,7 +231,9 @@ def test_select_with_optimizer_hints(self): self.assertEqual(q, "SELECT /*+ PARALLEL(a 16) */ b FROM a WHERE (b > 10)") q = c.compile(t.limit(10).select(this.b, optimizer_hints="PARALLEL(a 16)")) - self.assertEqual(q, "SELECT /*+ PARALLEL(a 16) */ b FROM (SELECT * FROM a LIMIT 10) tmp1") + self.assertEqual( + q, "SELECT /*+ PARALLEL(a 16) */ b FROM (SELECT * FROM (SELECT * FROM a) AS LIMITED_SELECT LIMIT 10) tmp1" + ) q = c.compile(t.select(this.a).group_by(this.b).agg(this.c).select(optimizer_hints="PARALLEL(a 16)")) self.assertEqual(