From e6fac1e53284b473a5df9907b1c7693b6cba0839 Mon Sep 17 00:00:00 2001
From: Sung Won Chung
Date: Mon, 13 Nov 2023 11:19:09 -0800
Subject: [PATCH 01/23] v0 of concurrency

---
 data_diff/dbt.py   | 16 ++++++++++++----
 data_diff/utils.py | 28 ++++++++++++++--------------
 2 files changed, 26 insertions(+), 18 deletions(-)

diff --git a/data_diff/dbt.py b/data_diff/dbt.py
index bf36c4fc..9ec0a42b 100644
--- a/data_diff/dbt.py
+++ b/data_diff/dbt.py
@@ -145,7 +145,8 @@ def dbt_diff(
                     )
                     diff_threads.append(diff_thread)
                 else:
-                    _local_diff(diff_vars, json_output)
+                    diff_thread = run_as_daemon(_local_diff, diff_vars, json_output, log_status_handler)
+                    diff_threads.append(diff_thread)
             else:
                 if json_output:
                     print(
@@ -265,7 +266,11 @@ def _get_prod_path_from_manifest(model, prod_manifest) -> Union[Tuple[str, str,
     ]
     return prod_database, prod_schema, prod_alias


-def _local_diff(diff_vars: TDiffVars, json_output: bool = False) -> None:
+def _local_diff(
+    diff_vars: TDiffVars, json_output: bool = False, log_status_handler: Optional[LogStatusHandler] = None
+) -> None:
+    if log_status_handler:
+        log_status_handler.diff_started(diff_vars.dev_path[-1])
     dev_qualified_str = ".".join(diff_vars.dev_path)
     prod_qualified_str = ".".join(diff_vars.prod_path)
     diff_output_str = _diff_output_base(dev_qualified_str, prod_qualified_str)
@@ -373,6 +378,9 @@ def _local_diff(diff_vars: TDiffVars, json_output: bool = False) -> None:
         diff_output_str += no_differences_template()
         rich.print(diff_output_str)

+    if log_status_handler:
+        log_status_handler.diff_finished(diff_vars.dev_path[-1])
+

 def _initialize_api() -> Optional[DatafoldAPI]:
     datafold_host = os.environ.get("DATAFOLD_HOST")
@@ -406,7 +414,7 @@ def _cloud_diff(
     log_status_handler: Optional[LogStatusHandler] = None,
 ) -> None:
     if log_status_handler:
-        log_status_handler.cloud_diff_started(diff_vars.dev_path[-1])
+        log_status_handler.diff_started(diff_vars.dev_path[-1])
     diff_output_str = _diff_output_base(".".join(diff_vars.dev_path), ".".join(diff_vars.prod_path))
     payload = TCloudApiDataDiff(
         data_source1_id=datasource_id,
@@ -476,7 +484,7 @@ def _cloud_diff(
             rich.print(diff_output_str)

         if log_status_handler:
-            log_status_handler.cloud_diff_finished(diff_vars.dev_path[-1])
+            log_status_handler.diff_finished(diff_vars.dev_path[-1])
     except BaseException as ex:  # Catch KeyboardInterrupt too
         error = ex
     finally:
diff --git a/data_diff/utils.py b/data_diff/utils.py
index ee4a0f17..b9045cc1 100644
--- a/data_diff/utils.py
+++ b/data_diff/utils.py
@@ -485,31 +485,31 @@ def __init__(self):
         super().__init__()
         self.status = Status("")
         self.prefix = ""
-        self.cloud_diff_status = {}
+        self.diff_status = {}

     def emit(self, record):
         log_entry = self.format(record)
-        if self.cloud_diff_status:
-            self._update_cloud_status(log_entry)
+        if self.diff_status:
+            self._update_diff_status(log_entry)
         else:
             self.status.update(self.prefix + log_entry)

     def set_prefix(self, prefix_string):
         self.prefix = prefix_string

-    def cloud_diff_started(self, model_name):
-        self.cloud_diff_status[model_name] = "[yellow]In Progress[/]"
-        self._update_cloud_status()
+    def diff_started(self, model_name):
+        self.diff_status[model_name] = "[yellow]In Progress[/]"
+        self._update_diff_status()

-    def cloud_diff_finished(self, model_name):
-        self.cloud_diff_status[model_name] = "[green]Finished [/]"
-        self._update_cloud_status()
+    def diff_finished(self, model_name):
+        self.diff_status[model_name] = "[green]Finished [/]"
+        self._update_diff_status()

-    def _update_cloud_status(self, log=None):
-        cloud_status_string = "\n"
-        for model_name, status in self.cloud_diff_status.items():
-            cloud_status_string += f"{status} {model_name}\n"
-        self.status.update(f"{cloud_status_string}{log or ''}")
+    def _update_diff_status(self, log=None):
+        status_string = "\n"
+        for model_name, status in self.diff_status.items():
+            status_string += f"{status} {model_name}\n"
+        self.status.update(f"{status_string}{log or ''}")


 class UnknownMeta(type):

From 6529027c50cd0702662b5f83bd620aac6bdfc671 Mon Sep 17 00:00:00 2001
From: Sung Won Chung
Date: Mon, 13 Nov 2023 15:28:51 -0800
Subject: [PATCH 02/23] concurrent logging

---
 data_diff/__init__.py           |  1 +
 data_diff/databases/_connect.py |  4 +++-
 data_diff/databases/base.py     |  9 ++++---
 data_diff/dbt.py                |  1 +
 data_diff/dbt_parser.py         |  2 +-
 data_diff/joindiff_tables.py    | 42 ++++++++++++++++++---------------
 6 files changed, 35 insertions(+), 24 deletions(-)

diff --git a/data_diff/__init__.py b/data_diff/__init__.py
index 4ae223fb..a0b1311b 100644
--- a/data_diff/__init__.py
+++ b/data_diff/__init__.py
@@ -32,6 +32,7 @@ def connect_to_table(
     if isinstance(key_columns, str):
         key_columns = (key_columns,)

+    # TODO: maybe change args here
     db: Database = connect(db_info, thread_count=thread_count)

     if isinstance(table_name, str):
diff --git a/data_diff/databases/_connect.py b/data_diff/databases/_connect.py
index be55cc2d..ddf215eb 100644
--- a/data_diff/databases/_connect.py
+++ b/data_diff/databases/_connect.py
@@ -299,7 +299,8 @@ class Connect_SetUTC(Connect):
     def _connection_created(self, db):
         db = super()._connection_created(db)
         try:
-            db.query(db.dialect.set_timezone_to_utc())
+            # TODO: this is the correct place to show those logs
+            db.query(db.dialect.set_timezone_to_utc(), diff_output_str="hello set utc")
         except NotImplementedError:
             logging.debug(
                 f"Database '{db}' does not allow setting timezone. We recommend making sure it's set to 'UTC'."
             )
         return db


+# TODO: maybe change args here
 connect = Connect_SetUTC(DATABASE_BY_SCHEME)
diff --git a/data_diff/databases/base.py b/data_diff/databases/base.py
index 9dc03909..ccc7531d 100644
--- a/data_diff/databases/base.py
+++ b/data_diff/databases/base.py
@@ -900,7 +900,7 @@ def name(self):
     def compile(self, sql_ast):
         return self.dialect.compile(Compiler(self), sql_ast)

-    def query(self, sql_ast: Union[Expr, Generator], res_type: type = None):
+    def query(self, sql_ast: Union[Expr, Generator], res_type: type = None, diff_output_str: str = None):
         """Query the given SQL code/AST, and attempt to convert the result to type 'res_type'

         If given a generator, it will execute all the yielded sql queries with the same thread and cursor.
@@ -925,7 +925,10 @@ def query(self, sql_ast: Union[Expr, Generator], res_type: type = None):
         if sql_code is SKIP:
             return SKIP

-        logger.debug("Running SQL (%s):\n%s", self.name, sql_code)
+        if diff_output_str:
+            logger.debug("Running SQL (%s): %s \n%s", self.name, diff_output_str, sql_code)
+        else:
+            logger.debug("Running SQL (%s):\n%s", self.name, sql_code)

         if self._interactive and isinstance(sql_ast, Select):
             explained_sql = self.compile(Explain(sql_ast))
@@ -991,7 +994,7 @@ def query_table_schema(self, path: DbPath) -> Dict[str, tuple]:
         Note: This method exists instead of select_table_schema(), just because not all databases support
               accessing the schema using a SQL query.
""" - rows = self.query(self.select_table_schema(path), list) + rows = self.query(self.select_table_schema(path), list, path) if not rows: raise RuntimeError(f"{self.name}: Table '{'.'.join(path)}' does not exist, or has no columns") diff --git a/data_diff/dbt.py b/data_diff/dbt.py index 9ec0a42b..b0872a51 100644 --- a/data_diff/dbt.py +++ b/data_diff/dbt.py @@ -275,6 +275,7 @@ def _local_diff( prod_qualified_str = ".".join(diff_vars.prod_path) diff_output_str = _diff_output_base(dev_qualified_str, prod_qualified_str) + # TODO: figure out how to get the dbt model name in the diff output for "running sql logs" table1 = connect_to_table( diff_vars.connection, prod_qualified_str, tuple(diff_vars.primary_keys), diff_vars.threads ) diff --git a/data_diff/dbt_parser.py b/data_diff/dbt_parser.py index 4b6124d5..bc8f00d6 100644 --- a/data_diff/dbt_parser.py +++ b/data_diff/dbt_parser.py @@ -456,7 +456,7 @@ def get_pk_from_model(self, node, unique_columns: dict, pk_tag: str) -> List[str if node.unique_id in unique_columns: from_uniq = unique_columns.get(node.unique_id) if from_uniq is not None: - logger.debug("Found PKs via Uniqueness tests: " + str(from_uniq)) + logger.debug(f"Found PKs via Uniqueness tests [{node.name}]: {str(from_uniq)}") return list(from_uniq) except (KeyError, IndexError, TypeError) as e: diff --git a/data_diff/joindiff_tables.py b/data_diff/joindiff_tables.py index 6fadc5d8..4e3c163d 100644 --- a/data_diff/joindiff_tables.py +++ b/data_diff/joindiff_tables.py @@ -162,7 +162,7 @@ def _diff_tables_root(self, table1: TableSegment, table2: TableSegment, info_tre yield from self._diff_segments(None, table1, table2, info_tree, None) else: yield from self._bisect_and_diff_tables(table1, table2, info_tree) - logger.info("Diffing complete") + logger.info(f"Diffing complete: {table1.table_path} <> {table2.table_path}") if self.materialize_to_table: logger.info("Materialized diff to table '%s'.", ".".join(self.materialize_to_table)) @@ -193,8 +193,8 @@ def _diff_segments( partial(self._collect_stats, 1, table1, info_tree), partial(self._collect_stats, 2, table2, info_tree), partial(self._test_null_keys, table1, table2), - partial(self._sample_and_count_exclusive, db, diff_rows, a_cols, b_cols), - partial(self._count_diff_per_column, db, diff_rows, list(a_cols), is_diff_cols), + partial(self._sample_and_count_exclusive, db, diff_rows, a_cols, b_cols, table1, table2), + partial(self._count_diff_per_column, db, diff_rows, list(a_cols), is_diff_cols, table1, table2), partial( self._materialize_diff, db, @@ -205,8 +205,8 @@ def _diff_segments( else None, ): assert len(a_cols) == len(b_cols) - logger.debug("Querying for different rows") - diff = db.query(diff_rows, list) + logger.debug(f"Querying for different rows: {table1.table_path}") + diff = db.query(diff_rows, list, table1.table_path) info_tree.info.set_diff(diff, schema=tuple(diff_rows.schema.items())) for is_xa, is_xb, *x in diff: if is_xa and is_xb: @@ -227,7 +227,7 @@ def _diff_segments( yield "+", tuple(b_row) def _test_duplicate_keys(self, table1: TableSegment, table2: TableSegment): - logger.debug("Testing for duplicate keys") + logger.debug(f"Testing for duplicate keys: {table1.table_path} <> {table2.table_path}") # Test duplicate keys for ts in [table1, table2]: @@ -240,16 +240,16 @@ def _test_duplicate_keys(self, table1: TableSegment, table2: TableSegment): unvalidated = list(set(key_columns) - set(unique)) if unvalidated: - logger.info(f"Validating that the are no duplicate keys in columns: {unvalidated}") + 
logger.info(f"Validating that the are no duplicate keys in columns: {unvalidated} for {ts.table_path}") # Validate that there are no duplicate keys self.stats["validated_unique_keys"] = self.stats.get("validated_unique_keys", []) + [unvalidated] q = t.select(total=Count(), total_distinct=Count(Concat(this[unvalidated]), distinct=True)) - total, total_distinct = ts.database.query(q, tuple) + total, total_distinct = ts.database.query(q, tuple, ts.table_path) if total != total_distinct: raise ValueError("Duplicate primary keys") def _test_null_keys(self, table1, table2): - logger.debug("Testing for null keys") + logger.debug(f"Testing for null keys: {table1.table_path} <> {table2.table_path}") # Test null keys for ts in [table1, table2]: @@ -257,7 +257,7 @@ def _test_null_keys(self, table1, table2): key_columns = ts.key_columns q = t.select(*this[key_columns]).where(or_(this[k] == None for k in key_columns)) - nulls = ts.database.query(q, list) + nulls = ts.database.query(q, list, ts.table_path) if nulls: if self.skip_null_keys: logger.warning( @@ -267,7 +267,7 @@ def _test_null_keys(self, table1, table2): raise ValueError(f"NULL values in one or more primary keys of {ts.table_path}") def _collect_stats(self, i, table_seg: TableSegment, info_tree: InfoTree): - logger.debug(f"Collecting stats for table #{i}") + logger.debug(f"Collecting stats for table #{i}: {table_seg.table_path}") db = table_seg.database # Metrics @@ -288,7 +288,7 @@ def _collect_stats(self, i, table_seg: TableSegment, info_tree: InfoTree): ) col_exprs["count"] = Count() - res = db.query(table_seg.make_select().select(**col_exprs), tuple) + res = db.query(table_seg.make_select().select(**col_exprs), tuple, table_seg.table_path) for col_name, value in safezip(col_exprs, res): if value is not None: @@ -303,7 +303,7 @@ def _collect_stats(self, i, table_seg: TableSegment, info_tree: InfoTree): else: self.stats[stat_name] = value - logger.debug("Done collecting stats for table #%s", i) + logger.debug("Done collecting stats for table #%s: %s", i, table_seg.table_path) def _create_outer_join(self, table1, table2): db = table1.database @@ -334,23 +334,27 @@ def _create_outer_join(self, table1, table2): diff_rows = all_rows.where(or_(this[c] == 1 for c in is_diff_cols)) return diff_rows, a_cols, b_cols, is_diff_cols, all_rows - def _count_diff_per_column(self, db, diff_rows, cols, is_diff_cols): - logger.debug("Counting differences per column") - is_diff_cols_counts = db.query(diff_rows.select(sum_(this[c]) for c in is_diff_cols), tuple) + def _count_diff_per_column(self, db, diff_rows, cols, is_diff_cols, table1=None, table2=None): + logger.debug(f"Counting differences per column: {table1.table_path} <> {table2.table_path}") + is_diff_cols_counts = db.query( + diff_rows.select(sum_(this[c]) for c in is_diff_cols), tuple, f"{table1.table_path} <> {table2.table_path}" + ) diff_counts = {} for name, count in safezip(cols, is_diff_cols_counts): diff_counts[name] = diff_counts.get(name, 0) + (count or 0) self.stats["diff_counts"] = diff_counts - def _sample_and_count_exclusive(self, db, diff_rows, a_cols, b_cols): + def _sample_and_count_exclusive(self, db, diff_rows, a_cols, b_cols, table1=None, table2=None): if isinstance(db, (Oracle, MsSQL)): exclusive_rows_query = diff_rows.where((this.is_exclusive_a == 1) | (this.is_exclusive_b == 1)) else: exclusive_rows_query = diff_rows.where(this.is_exclusive_a | this.is_exclusive_b) if not self.sample_exclusive_rows: - logger.debug("Counting exclusive rows") - self.stats["exclusive_count"] = 
db.query(exclusive_rows_query.count(), int) + logger.debug(f"Counting exclusive rows: {table1.table_path} <> {table2.table_path}") + self.stats["exclusive_count"] = db.query( + exclusive_rows_query.count(), int, f"{table1.table_path} <> {table2.table_path}" + ) return logger.info("Counting and sampling exclusive rows") From 7ef9370234111e48573a9dd8c62ffea6de93d152 Mon Sep 17 00:00:00 2001 From: Sung Won Chung Date: Tue, 14 Nov 2023 16:21:08 -0800 Subject: [PATCH 03/23] remove todo --- data_diff/__init__.py | 1 - data_diff/databases/_connect.py | 3 +-- data_diff/dbt.py | 1 - 3 files changed, 1 insertion(+), 4 deletions(-) diff --git a/data_diff/__init__.py b/data_diff/__init__.py index a0b1311b..4ae223fb 100644 --- a/data_diff/__init__.py +++ b/data_diff/__init__.py @@ -32,7 +32,6 @@ def connect_to_table( if isinstance(key_columns, str): key_columns = (key_columns,) - # TODO: maybe change args here db: Database = connect(db_info, thread_count=thread_count) if isinstance(table_name, str): diff --git a/data_diff/databases/_connect.py b/data_diff/databases/_connect.py index ddf215eb..01acac8d 100644 --- a/data_diff/databases/_connect.py +++ b/data_diff/databases/_connect.py @@ -299,8 +299,7 @@ class Connect_SetUTC(Connect): def _connection_created(self, db): db = super()._connection_created(db) try: - # TODO: this is the correct place to show those logs - db.query(db.dialect.set_timezone_to_utc(), diff_output_str="hello set utc") + db.query(db.dialect.set_timezone_to_utc()) except NotImplementedError: logging.debug( f"Database '{db}' does not allow setting timezone. We recommend making sure it's set to 'UTC'." diff --git a/data_diff/dbt.py b/data_diff/dbt.py index b0872a51..9ec0a42b 100644 --- a/data_diff/dbt.py +++ b/data_diff/dbt.py @@ -275,7 +275,6 @@ def _local_diff( prod_qualified_str = ".".join(diff_vars.prod_path) diff_output_str = _diff_output_base(dev_qualified_str, prod_qualified_str) - # TODO: figure out how to get the dbt model name in the diff output for "running sql logs" table1 = connect_to_table( diff_vars.connection, prod_qualified_str, tuple(diff_vars.primary_keys), diff_vars.threads ) From fb5d8ec4c5750d41d0b7aa1015345f15de4eaf4e Mon Sep 17 00:00:00 2001 From: Sung Won Chung Date: Tue, 14 Nov 2023 16:22:33 -0800 Subject: [PATCH 04/23] remove todo --- data_diff/databases/_connect.py | 1 - 1 file changed, 1 deletion(-) diff --git a/data_diff/databases/_connect.py b/data_diff/databases/_connect.py index 01acac8d..be55cc2d 100644 --- a/data_diff/databases/_connect.py +++ b/data_diff/databases/_connect.py @@ -307,5 +307,4 @@ def _connection_created(self, db): return db -# TODO: maybe change args here connect = Connect_SetUTC(DATABASE_BY_SCHEME) From a56b2bebebe30b42447f37c68e025e219ed141d8 Mon Sep 17 00:00:00 2001 From: Sung Won Chung Date: Tue, 14 Nov 2023 16:27:17 -0800 Subject: [PATCH 05/23] better var name --- data_diff/databases/base.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/data_diff/databases/base.py b/data_diff/databases/base.py index 3eea8e27..af560bdd 100644 --- a/data_diff/databases/base.py +++ b/data_diff/databases/base.py @@ -925,7 +925,7 @@ def name(self): def compile(self, sql_ast): return self.dialect.compile(Compiler(self), sql_ast) - def query(self, sql_ast: Union[Expr, Generator], res_type: type = None, diff_output_str: str = None): + def query(self, sql_ast: Union[Expr, Generator], res_type: type = None, log_message: str = None): """Query the given SQL code/AST, and attempt to convert the result to type 'res_type' If given a 
generator, it will execute all the yielded sql queries with the same thread and cursor. @@ -950,8 +950,8 @@ def query(self, sql_ast: Union[Expr, Generator], res_type: type = None, diff_out if sql_code is SKIP: return SKIP - if diff_output_str: - logger.debug("Running SQL (%s): %s \n%s", self.name, diff_output_str, sql_code) + if log_message: + logger.debug("Running SQL (%s): %s \n%s", self.name, log_message, sql_code) else: logger.debug("Running SQL (%s):\n%s", self.name, sql_code) From 9aa13e3f1376b8cd1fc209953579e3f86681f9e7 Mon Sep 17 00:00:00 2001 From: Sung Won Chung Date: Tue, 14 Nov 2023 16:32:15 -0800 Subject: [PATCH 06/23] add node name to logger --- data_diff/dbt_parser.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/data_diff/dbt_parser.py b/data_diff/dbt_parser.py index bc8f00d6..ed6bbd0a 100644 --- a/data_diff/dbt_parser.py +++ b/data_diff/dbt_parser.py @@ -446,12 +446,12 @@ def get_pk_from_model(self, node, unique_columns: dict, pk_tag: str) -> List[str from_meta = [name for name, params in node.columns.items() if pk_tag in params.meta] or None if from_meta: - logger.debug("Found PKs via META: " + str(from_meta)) + logger.debug("Found PKs via META [{node.name}]: " + str(from_meta)) return from_meta from_tags = [name for name, params in node.columns.items() if pk_tag in params.tags] or None if from_tags: - logger.debug("Found PKs via Tags: " + str(from_tags)) + logger.debug("Found PKs via Tags [{node.name}]: " + str(from_tags)) return from_tags if node.unique_id in unique_columns: from_uniq = unique_columns.get(node.unique_id) From 05c79211faff2236bbcb29870a0a1a511d1de094 Mon Sep 17 00:00:00 2001 From: Sung Won Chung Date: Wed, 15 Nov 2023 13:50:08 -0800 Subject: [PATCH 07/23] format string logs --- data_diff/dbt_parser.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/data_diff/dbt_parser.py b/data_diff/dbt_parser.py index ed6bbd0a..0d864a57 100644 --- a/data_diff/dbt_parser.py +++ b/data_diff/dbt_parser.py @@ -446,12 +446,12 @@ def get_pk_from_model(self, node, unique_columns: dict, pk_tag: str) -> List[str from_meta = [name for name, params in node.columns.items() if pk_tag in params.meta] or None if from_meta: - logger.debug("Found PKs via META [{node.name}]: " + str(from_meta)) + logger.debug(f"Found PKs via META [{node.name}]: " + str(from_meta)) return from_meta from_tags = [name for name, params in node.columns.items() if pk_tag in params.tags] or None if from_tags: - logger.debug("Found PKs via Tags [{node.name}]: " + str(from_tags)) + logger.debug(f"Found PKs via Tags [{node.name}]: " + str(from_tags)) return from_tags if node.unique_id in unique_columns: from_uniq = unique_columns.get(node.unique_id) From be3f631d4e7801bc243324f5397db5e22afbd9fd Mon Sep 17 00:00:00 2001 From: Sung Won Chung Date: Wed, 15 Nov 2023 14:19:27 -0800 Subject: [PATCH 08/23] add optional logger param --- tests/test_dbt.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/test_dbt.py b/tests/test_dbt.py index c281b6fb..20f5ed33 100644 --- a/tests/test_dbt.py +++ b/tests/test_dbt.py @@ -407,7 +407,7 @@ def test_diff_is_not_cloud(self, mock_print, mock_dbt_parser, mock_cloud_diff, m mock_dbt_parser_inst.get_models.assert_called_once() mock_dbt_parser_inst.set_connection.assert_called_once() mock_cloud_diff.assert_not_called() - mock_local_diff.assert_called_once_with(diff_vars, False) + mock_local_diff.assert_called_once_with(diff_vars, False, None) mock_print.assert_not_called() @patch("data_diff.dbt._get_diff_vars") 
@@ -481,7 +481,7 @@ def test_diff_only_prod_db(self, mock_print, mock_dbt_parser, mock_cloud_diff, m
         mock_dbt_parser_inst.get_models.assert_called_once()
         mock_dbt_parser_inst.set_connection.assert_called_once()
         mock_cloud_diff.assert_not_called()
-        mock_local_diff.assert_called_once_with(diff_vars, False)
+        mock_local_diff.assert_called_once_with(diff_vars, False, None)
         mock_print.assert_not_called()

     @patch("data_diff.dbt._get_diff_vars")
@@ -518,7 +518,7 @@ def test_diff_only_prod_schema(
         mock_dbt_parser_inst.get_models.assert_called_once()
         mock_dbt_parser_inst.set_connection.assert_called_once()
         mock_cloud_diff.assert_not_called()
-        mock_local_diff.assert_called_once_with(diff_vars, False)
+        mock_local_diff.assert_called_once_with(diff_vars, False, None)
         mock_print.assert_not_called()

     @patch("data_diff.dbt._initialize_api")

From 88592adbf08a3e0efa022a50dcf644ba7fc4c074 Mon Sep 17 00:00:00 2001
From: Sung Won Chung
Date: Fri, 17 Nov 2023 15:24:08 -0800
Subject: [PATCH 09/23] avoid extra threads

---
 data_diff/dbt.py | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/data_diff/dbt.py b/data_diff/dbt.py
index 9ec0a42b..9ea1dfaf 100644
--- a/data_diff/dbt.py
+++ b/data_diff/dbt.py
@@ -275,10 +275,8 @@ def _local_diff(
     prod_qualified_str = ".".join(diff_vars.prod_path)
     diff_output_str = _diff_output_base(dev_qualified_str, prod_qualified_str)

-    table1 = connect_to_table(
-        diff_vars.connection, prod_qualified_str, tuple(diff_vars.primary_keys), diff_vars.threads
-    )
-    table2 = connect_to_table(diff_vars.connection, dev_qualified_str, tuple(diff_vars.primary_keys), diff_vars.threads)
+    table1 = connect_to_table(diff_vars.connection, prod_qualified_str, tuple(diff_vars.primary_keys))
+    table2 = connect_to_table(diff_vars.connection, dev_qualified_str, tuple(diff_vars.primary_keys))

     try:
         table1_columns = table1.get_schema()

From e5ee4df29b1d7e91bb93afba1ddcc1c68cd6bda1 Mon Sep 17 00:00:00 2001
From: Sung Won Chung
Date: Fri, 17 Nov 2023 16:20:03 -0800
Subject: [PATCH 10/23] use thread pools

---
 data_diff/dbt.py | 19 ++++++-------------
 1 file changed, 6 insertions(+), 13 deletions(-)

diff --git a/data_diff/dbt.py b/data_diff/dbt.py
index 9ea1dfaf..ab073188 100644
--- a/data_diff/dbt.py
+++ b/data_diff/dbt.py
@@ -8,6 +8,7 @@
 import pydantic
 import rich
 from rich.prompt import Prompt
+from concurrent.futures import ThreadPoolExecutor

 from data_diff.errors import (
     DataDiffCustomSchemaNoConfigError,
@@ -80,7 +81,6 @@ def dbt_diff(
     production_schema_flag: Optional[str] = None,
 ) -> None:
     print_version_info()
-    diff_threads = []
     set_entrypoint_name(os.getenv("DATAFOLD_TRIGGERED_BY", "CLI-dbt"))
     dbt_parser = DbtParser(profiles_dir_override, project_dir_override, state)
     models = dbt_parser.get_models(dbt_selection)
@@ -112,7 +112,9 @@ def dbt_diff(
     else:
         dbt_parser.set_connection()

-    with log_status_handler.status if log_status_handler else nullcontext():
+    with log_status_handler.status if log_status_handler else nullcontext(), ThreadPoolExecutor(
+        max_workers=dbt_parser.threads
+    ) as executor:
         for model in models:
             if log_status_handler:
                 log_status_handler.set_prefix(f"Diffing {model.alias} \n")
@@ -140,13 +142,9 @@ def dbt_diff(

             if diff_vars.primary_keys:
                 if is_cloud:
-                    diff_thread = run_as_daemon(
-                        _cloud_diff, diff_vars, config.datasource_id, api, org_meta, log_status_handler
-                    )
-                    diff_threads.append(diff_thread)
+                    executor.submit(_cloud_diff, diff_vars, config.datasource_id, api, org_meta, log_status_handler)
                 else:
-                    diff_thread = run_as_daemon(_local_diff, diff_vars, json_output, log_status_handler)
+                    executor.submit(_local_diff, diff_vars, json_output, log_status_handler)
             else:
                 if json_output:
                     print(
@@ -166,11 +164,6 @@ def dbt_diff(
                     + "Skipped due to unknown primary key. Add uniqueness tests, meta, or tags.\n"
                 )

-    # wait for all threads
-    if diff_threads:
-        for thread in diff_threads:
-            thread.join()
-
     _extension_notification()

From 76d1302455ea00eb6328121382722c0b8611b943 Mon Sep 17 00:00:00 2001
From: Sung Won Chung
Date: Fri, 17 Nov 2023 16:41:05 -0800
Subject: [PATCH 11/23] not multithreaded at the connection level anymore

---
 tests/test_dbt.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/tests/test_dbt.py b/tests/test_dbt.py
index 20f5ed33..bcde9258 100644
--- a/tests/test_dbt.py
+++ b/tests/test_dbt.py
@@ -93,8 +93,8 @@ def test_local_diff(self, mock_diff_tables):
         )
         self.assertEqual(len(mock_diff_tables.call_args[1]["extra_columns"]), 2)
         self.assertEqual(mock_connect.call_count, 2)
-        mock_connect.assert_any_call(connection, ".".join(dev_qualified_list), tuple(expected_primary_keys), threads)
-        mock_connect.assert_any_call(connection, ".".join(prod_qualified_list), tuple(expected_primary_keys), threads)
+        mock_connect.assert_any_call(connection, ".".join(dev_qualified_list), tuple(expected_primary_keys))
+        mock_connect.assert_any_call(connection, ".".join(prod_qualified_list), tuple(expected_primary_keys))
         mock_diff.get_stats_string.assert_called_once()

     @patch("data_diff.dbt.diff_tables")
@@ -180,8 +180,8 @@ def test_local_diff_no_diffs(self, mock_diff_tables):
         )
         self.assertEqual(len(mock_diff_tables.call_args[1]["extra_columns"]), 2)
         self.assertEqual(mock_connect.call_count, 2)
-        mock_connect.assert_any_call(connection, ".".join(dev_qualified_list), tuple(expected_primary_keys), None)
-        mock_connect.assert_any_call(connection, ".".join(prod_qualified_list), tuple(expected_primary_keys), None)
+        mock_connect.assert_any_call(connection, ".".join(dev_qualified_list), tuple(expected_primary_keys))
+        mock_connect.assert_any_call(connection, ".".join(prod_qualified_list), tuple(expected_primary_keys))
         mock_diff.get_stats_string.assert_not_called()

     @patch("data_diff.dbt.rich.print")

From 5812a169129565fbd8848c49cc60054c10b683e1 Mon Sep 17 00:00:00 2001
From: Sung Won Chung
Date: Mon, 20 Nov 2023 11:26:06 -0800
Subject: [PATCH 12/23] show errors as they happen

---
 data_diff/dbt.py | 17 ++++++++++++++---
 1 file changed, 14 insertions(+), 3 deletions(-)

diff --git a/data_diff/dbt.py b/data_diff/dbt.py
index ab073188..99ff18af 100644
--- a/data_diff/dbt.py
+++ b/data_diff/dbt.py
@@ -8,7 +8,7 @@
 import pydantic
 import rich
 from rich.prompt import Prompt
-from concurrent.futures import ThreadPoolExecutor
+from concurrent.futures import ThreadPoolExecutor, as_completed

 from data_diff.errors import (
     DataDiffCustomSchemaNoConfigError,
@@ -112,6 +112,8 @@ def dbt_diff(
     else:
         dbt_parser.set_connection()

+    futures = []
+
     with log_status_handler.status if log_status_handler else nullcontext(), ThreadPoolExecutor(
         max_workers=dbt_parser.threads
     ) as executor:
         for model in models:
             if log_status_handler:
                 log_status_handler.set_prefix(f"Diffing {model.alias} \n")
@@ -142,9 +144,12 @@ def dbt_diff(

             if diff_vars.primary_keys:
                 if is_cloud:
-                    executor.submit(_cloud_diff, diff_vars, config.datasource_id, api, org_meta, log_status_handler)
+                    future = executor.submit(
+                        _cloud_diff, diff_vars, config.datasource_id, api, org_meta, log_status_handler
+                    )
                 else:
-                    executor.submit(_local_diff, diff_vars, json_output, log_status_handler)
+                    future = executor.submit(_local_diff, diff_vars, json_output, log_status_handler)
+                futures.append(future)
             else:
                 if json_output:
                     print(
@@ -164,6 +169,12 @@ def dbt_diff(
                     + "Skipped due to unknown primary key. Add uniqueness tests, meta, or tags.\n"
                 )

+    for future in as_completed(futures):
+        try:
+            future.result()  # if error occurred, it will be raised here
+        except Exception as e:
+            logger.error(f"An error occurred during the execution of a diff task: {model.unique_id} - {e}")
+
     _extension_notification()

From 8f6eb1b2b4bc0c5448e56e151774745699419c34 Mon Sep 17 00:00:00 2001
From: Sung Won Chung
Date: Mon, 20 Nov 2023 14:24:39 -0800
Subject: [PATCH 13/23] show full stacktrace on error

---
 data_diff/dbt.py | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/data_diff/dbt.py b/data_diff/dbt.py
index 99ff18af..ff3c8afe 100644
--- a/data_diff/dbt.py
+++ b/data_diff/dbt.py
@@ -9,6 +9,7 @@
 import rich
 from rich.prompt import Prompt
 from concurrent.futures import ThreadPoolExecutor, as_completed
+import traceback

 from data_diff.errors import (
     DataDiffCustomSchemaNoConfigError,
@@ -173,7 +174,9 @@ def dbt_diff(
         try:
             future.result()  # if error occurred, it will be raised here
         except Exception as e:
-            logger.error(f"An error occurred during the execution of a diff task: {model.unique_id} - {e}")
+            tb_str = traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__)
+            tb_str = "".join(tb_str)  # Format traceback string from list
+            logger.error(f"An error occurred during the execution of a diff task: {model.unique_id} - {e}\n{tb_str}")

     _extension_notification()

From 30deafaf30e6e221aca0253f0872b6b53474bd89 Mon Sep 17 00:00:00 2001
From: Sung Won Chung
Date: Mon, 20 Nov 2023 14:29:36 -0800
Subject: [PATCH 14/23] rearrange trace

---
 data_diff/dbt.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/data_diff/dbt.py b/data_diff/dbt.py
index ff3c8afe..a8c6fd6f 100644
--- a/data_diff/dbt.py
+++ b/data_diff/dbt.py
@@ -176,7 +176,7 @@ def dbt_diff(
         except Exception as e:
             tb_str = traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__)
             tb_str = "".join(tb_str)  # Format traceback string from list
-            logger.error(f"An error occurred during the execution of a diff task: {model.unique_id} - {e}\n{tb_str}")
+            logger.error(f"{tb_str}\n An error occurred during the execution of a diff task: {model.unique_id} - {e}")

     _extension_notification()

From 6fc9900ca3925cd0fb34f91d844439d3f17ac53b Mon Sep 17 00:00:00 2001
From: Sung Won Chung
Date: Mon, 20 Nov 2023 14:35:10 -0800
Subject: [PATCH 15/23] more logs for debugging

---
 data_diff/databases/base.py | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/data_diff/databases/base.py b/data_diff/databases/base.py
index af560bdd..d7096303 100644
--- a/data_diff/databases/base.py
+++ b/data_diff/databases/base.py
@@ -1019,7 +1019,7 @@ def query_table_schema(self, path: DbPath) -> Dict[str, tuple]:
         Note: This method exists instead of select_table_schema(), just because not all databases support
               accessing the schema using a SQL query.
""" - rows = self.query(self.select_table_schema(path), list, path) + rows = self.query(self.select_table_schema(path), list, log_message=path) if not rows: raise RuntimeError(f"{self.name}: Table '{'.'.join(path)}' does not exist, or has no columns") @@ -1041,7 +1041,7 @@ def query_table_unique_columns(self, path: DbPath) -> List[str]: """Query the table for its unique columns for table in 'path', and return {column}""" if not self.SUPPORTS_UNIQUE_CONSTAINT: raise NotImplementedError("This database doesn't support 'unique' constraints") - res = self.query(self.select_table_unique_columns(path), List[str]) + res = self.query(self.select_table_unique_columns(path), List[str], log_message=path) return list(res) def _process_table_schema( @@ -1083,7 +1083,9 @@ def _refine_coltypes( fields = [Code(self.dialect.normalize_uuid(self.dialect.quote(c), String_UUID())) for c in text_columns] samples_by_row = self.query( - table(*table_path).select(*fields).where(Code(where) if where else SKIP).limit(sample_size), list + table(*table_path).select(*fields).where(Code(where) if where else SKIP).limit(sample_size), + list, + log_message=table_path, ) if not samples_by_row: raise ValueError(f"Table {table_path} is empty.") From 8fe43def3cf74138228b5215d56091355da5b4af Mon Sep 17 00:00:00 2001 From: Sung Won Chung Date: Mon, 20 Nov 2023 14:56:43 -0800 Subject: [PATCH 16/23] update for threads mocking --- tests/test_dbt.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/tests/test_dbt.py b/tests/test_dbt.py index bcde9258..31af99eb 100644 --- a/tests/test_dbt.py +++ b/tests/test_dbt.py @@ -248,6 +248,7 @@ def test_diff_is_cloud( where = "a_string" config = TDatadiffConfig(prod_database="prod_db", prod_schema="prod_schema", datasource_id=1) mock_dbt_parser_inst = Mock() + mock_dbt_parser_inst.threads = threads mock_model = Mock() mock_api.get_data_source.return_value = TCloudApiDataSource(id=1, type="snowflake", name="snowflake") mock_initialize_api.return_value = mock_api @@ -386,6 +387,7 @@ def test_diff_is_not_cloud(self, mock_print, mock_dbt_parser, mock_cloud_diff, m threads = None where = "a_string" mock_dbt_parser_inst = Mock() + mock_dbt_parser_inst.threads = threads mock_dbt_parser.return_value = mock_dbt_parser_inst mock_model = Mock() mock_dbt_parser_inst.get_models.return_value = [mock_model] @@ -423,6 +425,7 @@ def test_diff_state_model_dne( threads = None where = "a_string" mock_dbt_parser_inst = Mock() + mock_dbt_parser_inst.threads = threads mock_dbt_parser.return_value = mock_dbt_parser_inst mock_model = Mock() mock_dbt_parser_inst.get_models.return_value = [mock_model] @@ -460,6 +463,7 @@ def test_diff_only_prod_db(self, mock_print, mock_dbt_parser, mock_cloud_diff, m threads = None where = "a_string" mock_dbt_parser_inst = Mock() + mock_dbt_parser_inst.threads = threads mock_dbt_parser.return_value = mock_dbt_parser_inst mock_model = Mock() mock_dbt_parser_inst.get_models.return_value = [mock_model] @@ -497,6 +501,7 @@ def test_diff_only_prod_schema( threads = None where = "a_string" mock_dbt_parser_inst = Mock() + mock_dbt_parser_inst.threads = threads mock_dbt_parser.return_value = mock_dbt_parser_inst mock_model = Mock() mock_dbt_parser_inst.get_models.return_value = [mock_model] @@ -543,6 +548,7 @@ def test_diff_is_cloud_no_pks( mock_model = Mock() connection = {} threads = None + mock_dbt_parser_inst.threads = threads where = "a_string" config = TDatadiffConfig(prod_database="prod_db", prod_schema="prod_schema", datasource_id=1) mock_api = Mock() @@ -584,6 +590,7 @@ def 
test_diff_not_is_cloud_no_pks( threads = None where = "a_string" mock_dbt_parser_inst = Mock() + mock_dbt_parser_inst.threads = threads mock_dbt_parser.return_value = mock_dbt_parser_inst mock_model = Mock() mock_dbt_parser_inst.get_models.return_value = [mock_model] From 12bebcc2afac655846fba4a91cf1c4066f9fc20b Mon Sep 17 00:00:00 2001 From: Sung Won Chung Date: Mon, 20 Nov 2023 15:03:24 -0800 Subject: [PATCH 17/23] clear log params --- data_diff/joindiff_tables.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/data_diff/joindiff_tables.py b/data_diff/joindiff_tables.py index 4e3c163d..e79cb271 100644 --- a/data_diff/joindiff_tables.py +++ b/data_diff/joindiff_tables.py @@ -206,7 +206,7 @@ def _diff_segments( ): assert len(a_cols) == len(b_cols) logger.debug(f"Querying for different rows: {table1.table_path}") - diff = db.query(diff_rows, list, table1.table_path) + diff = db.query(diff_rows, list, log_message=table1.table_path) info_tree.info.set_diff(diff, schema=tuple(diff_rows.schema.items())) for is_xa, is_xb, *x in diff: if is_xa and is_xb: @@ -244,7 +244,7 @@ def _test_duplicate_keys(self, table1: TableSegment, table2: TableSegment): # Validate that there are no duplicate keys self.stats["validated_unique_keys"] = self.stats.get("validated_unique_keys", []) + [unvalidated] q = t.select(total=Count(), total_distinct=Count(Concat(this[unvalidated]), distinct=True)) - total, total_distinct = ts.database.query(q, tuple, ts.table_path) + total, total_distinct = ts.database.query(q, tuple, log_message=ts.table_path) if total != total_distinct: raise ValueError("Duplicate primary keys") @@ -257,7 +257,7 @@ def _test_null_keys(self, table1, table2): key_columns = ts.key_columns q = t.select(*this[key_columns]).where(or_(this[k] == None for k in key_columns)) - nulls = ts.database.query(q, list, ts.table_path) + nulls = ts.database.query(q, list, log_message=ts.table_path) if nulls: if self.skip_null_keys: logger.warning( @@ -288,7 +288,7 @@ def _collect_stats(self, i, table_seg: TableSegment, info_tree: InfoTree): ) col_exprs["count"] = Count() - res = db.query(table_seg.make_select().select(**col_exprs), tuple, table_seg.table_path) + res = db.query(table_seg.make_select().select(**col_exprs), tuple, log_message=table_seg.table_path) for col_name, value in safezip(col_exprs, res): if value is not None: @@ -337,7 +337,9 @@ def _create_outer_join(self, table1, table2): def _count_diff_per_column(self, db, diff_rows, cols, is_diff_cols, table1=None, table2=None): logger.debug(f"Counting differences per column: {table1.table_path} <> {table2.table_path}") is_diff_cols_counts = db.query( - diff_rows.select(sum_(this[c]) for c in is_diff_cols), tuple, f"{table1.table_path} <> {table2.table_path}" + diff_rows.select(sum_(this[c]) for c in is_diff_cols), + tuple, + log_message=f"{table1.table_path} <> {table2.table_path}", ) diff_counts = {} for name, count in safezip(cols, is_diff_cols_counts): @@ -353,7 +355,7 @@ def _sample_and_count_exclusive(self, db, diff_rows, a_cols, b_cols, table1=None if not self.sample_exclusive_rows: logger.debug(f"Counting exclusive rows: {table1.table_path} <> {table2.table_path}") self.stats["exclusive_count"] = db.query( - exclusive_rows_query.count(), int, f"{table1.table_path} <> {table2.table_path}" + exclusive_rows_query.count(), int, log_message=f"{table1.table_path} <> {table2.table_path}" ) return From 57a9eec0b93e8b118e52f6418f0aafa0d185e95e Mon Sep 17 00:00:00 2001 From: Sung Won Chung Date: Mon, 20 Nov 2023 17:22:19 -0800 
Subject: [PATCH 18/23] remove extra space

---
 data_diff/dbt.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/data_diff/dbt.py b/data_diff/dbt.py
index a8c6fd6f..0037353b 100644
--- a/data_diff/dbt.py
+++ b/data_diff/dbt.py
@@ -176,7 +176,7 @@ def dbt_diff(
         except Exception as e:
             tb_str = traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__)
             tb_str = "".join(tb_str)  # Format traceback string from list
-            logger.error(f"{tb_str}\n An error occurred during the execution of a diff task: {model.unique_id} - {e}")
+            logger.error(f"{tb_str}\nAn error occurred during the execution of a diff task: {model.unique_id} - {e}")

     _extension_notification()

From 5cd12ff769abb2b90fb32926739323e028a3604e Mon Sep 17 00:00:00 2001
From: Sung Won Chung
Date: Tue, 5 Dec 2023 11:32:36 -0800
Subject: [PATCH 19/23] remove long traceback

---
 data_diff/dbt.py | 5 +----
 1 file changed, 1 insertion(+), 4 deletions(-)

diff --git a/data_diff/dbt.py b/data_diff/dbt.py
index 0037353b..99ff18af 100644
--- a/data_diff/dbt.py
+++ b/data_diff/dbt.py
@@ -9,7 +9,6 @@
 import rich
 from rich.prompt import Prompt
 from concurrent.futures import ThreadPoolExecutor, as_completed
-import traceback

 from data_diff.errors import (
     DataDiffCustomSchemaNoConfigError,
@@ -174,9 +173,7 @@ def dbt_diff(
         try:
             future.result()  # if error occurred, it will be raised here
         except Exception as e:
-            tb_str = traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__)
-            tb_str = "".join(tb_str)  # Format traceback string from list
-            logger.error(f"{tb_str}\nAn error occurred during the execution of a diff task: {model.unique_id} - {e}")
+            logger.error(f"An error occurred during the execution of a diff task: {model.unique_id} - {e}")

     _extension_notification()

From 6d41f2af421af8d8a7d66b18625ffd07aa52f4f1 Mon Sep 17 00:00:00 2001
From: Sung Won Chung
Date: Tue, 5 Dec 2023 13:16:29 -0800
Subject: [PATCH 20/23] Ensure log_message is optional

Co-authored-by: Dan Lawin
---
 data_diff/databases/base.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/data_diff/databases/base.py b/data_diff/databases/base.py
index 47e6b6fe..bf165461 100644
--- a/data_diff/databases/base.py
+++ b/data_diff/databases/base.py
@@ -931,7 +931,7 @@ def name(self):
     def compile(self, sql_ast):
         return self.dialect.compile(Compiler(self), sql_ast)

-    def query(self, sql_ast: Union[Expr, Generator], res_type: type = None, log_message: str = None):
+    def query(self, sql_ast: Union[Expr, Generator], res_type: type = None, log_message: Optional[str] = None):
         """Query the given SQL code/AST, and attempt to convert the result to type 'res_type'

         If given a generator, it will execute all the yielded sql queries with the same thread and cursor.
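[Editor's note, not part of the patch series: patches 02, 05, 15, 17, and 20 converge on a single optional log_message argument to Database.query(), so SQL debug lines emitted by concurrent workers can be traced back to a table. A minimal stand-alone sketch of that logging contract, with run_query as a hypothetical stand-in for the real method:

    import logging
    from typing import Optional

    logger = logging.getLogger(__name__)

    def run_query(db_name: str, sql_code: str, log_message: Optional[str] = None) -> None:
        # Caller-supplied context (e.g. a table path) is appended to the
        # "Running SQL" line so interleaved thread output stays attributable.
        if log_message:
            logger.debug("Running SQL (%s): %s \n%s", db_name, log_message, sql_code)
        else:
            logger.debug("Running SQL (%s):\n%s", db_name, sql_code)

Passing the context keyword-only at call sites, as in db.query(q, tuple, log_message=ts.table_path), is what patch 17 standardizes.]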
From 02e147fa607e8f67de7602a03586e80bddbae3f5 Mon Sep 17 00:00:00 2001
From: Sung Won Chung
Date: Tue, 5 Dec 2023 13:41:42 -0800
Subject: [PATCH 21/23] map threaded result to proper model id

---
 data_diff/dbt.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/data_diff/dbt.py b/data_diff/dbt.py
index 99ff18af..ef780429 100644
--- a/data_diff/dbt.py
+++ b/data_diff/dbt.py
@@ -112,7 +112,7 @@ def dbt_diff(
     else:
         dbt_parser.set_connection()

-    futures = []
+    futures = {}

     with log_status_handler.status if log_status_handler else nullcontext(), ThreadPoolExecutor(
         max_workers=dbt_parser.threads
     ) as executor:
@@ -149,7 +149,7 @@ def dbt_diff(
                     )
                 else:
                     future = executor.submit(_local_diff, diff_vars, json_output, log_status_handler)
-                futures.append(future)
+                futures[future] = model
             else:
                 if json_output:
                     print(
@@ -170,6 +170,7 @@ def dbt_diff(
                 )

     for future in as_completed(futures):
+        model = futures[future]
         try:
             future.result()  # if error occurred, it will be raised here
         except Exception as e:

From 852dc3e5d96cbc5e2370d1649906dc3cab2c68a1 Mon Sep 17 00:00:00 2001
From: Sung Won Chung
Date: Tue, 5 Dec 2023 14:36:10 -0800
Subject: [PATCH 22/23] explicit type and optional

---
 data_diff/joindiff_tables.py | 21 +++++++++++++++++++--
 data_diff_demo               |  1 +
 datafold-demo-sung           |  1 +
 3 files changed, 21 insertions(+), 2 deletions(-)
 create mode 160000 data_diff_demo
 create mode 160000 datafold-demo-sung

diff --git a/data_diff/joindiff_tables.py b/data_diff/joindiff_tables.py
index e79cb271..8e7fcf30 100644
--- a/data_diff/joindiff_tables.py
+++ b/data_diff/joindiff_tables.py
@@ -334,7 +334,16 @@ def _create_outer_join(self, table1, table2):
         diff_rows = all_rows.where(or_(this[c] == 1 for c in is_diff_cols))
         return diff_rows, a_cols, b_cols, is_diff_cols, all_rows

-    def _count_diff_per_column(self, db, diff_rows, cols, is_diff_cols, table1=None, table2=None):
+    def _count_diff_per_column(
+        self,
+        db,
+        diff_rows,
+        cols,
+        is_diff_cols,
+        table1: Optional[TableSegment] = None,
+        table2: Optional[TableSegment] = None,
+    ):
+        logger.info(type(table1))
         logger.debug(f"Counting differences per column: {table1.table_path} <> {table2.table_path}")
         is_diff_cols_counts = db.query(
             diff_rows.select(sum_(this[c]) for c in is_diff_cols),
             tuple,
             log_message=f"{table1.table_path} <> {table2.table_path}",
         )
         diff_counts = {}
         for name, count in safezip(cols, is_diff_cols_counts):
             diff_counts[name] = diff_counts.get(name, 0) + (count or 0)
         self.stats["diff_counts"] = diff_counts

-    def _sample_and_count_exclusive(self, db, diff_rows, a_cols, b_cols, table1=None, table2=None):
+    def _sample_and_count_exclusive(
+        self,
+        db,
+        diff_rows,
+        a_cols,
+        b_cols,
+        table1: Optional[TableSegment] = None,
+        table2: Optional[TableSegment] = None,
+    ):
         if isinstance(db, (Oracle, MsSQL)):
             exclusive_rows_query = diff_rows.where((this.is_exclusive_a == 1) | (this.is_exclusive_b == 1))
         else:
             exclusive_rows_query = diff_rows.where(this.is_exclusive_a | this.is_exclusive_b)
diff --git a/data_diff_demo b/data_diff_demo
new file mode 160000
index 00000000..d0784e8d
--- /dev/null
+++ b/data_diff_demo
@@ -0,0 +1 @@
+Subproject commit d0784e8de9fc7958f91a599fa454be4f8b09c60d
diff --git a/datafold-demo-sung b/datafold-demo-sung
new file mode 160000
index 00000000..6ebfb06d
--- /dev/null
+++ b/datafold-demo-sung
@@ -0,0 +1 @@
+Subproject commit 6ebfb06d1e0937309384cdb4955e6dbd23387256

From 81d78ddbee377d31cdd61db012c0d2897bb094da Mon Sep 17 00:00:00 2001
From: Sung Won Chung
Date: Tue, 5 Dec 2023 14:36:21 -0800
Subject: [PATCH 23/23] rm submodules again

---
 data_diff_demo     | 1 -
 datafold-demo-sung | 1 -
 2 files changed, 2 deletions(-)
 delete mode 160000 data_diff_demo
 delete mode 160000 datafold-demo-sung

diff --git a/data_diff_demo b/data_diff_demo
deleted file mode 160000
index d0784e8d..00000000
--- a/data_diff_demo
+++ /dev/null
@@ -1 +0,0 @@
-Subproject commit d0784e8de9fc7958f91a599fa454be4f8b09c60d
diff --git a/datafold-demo-sung b/datafold-demo-sung
deleted file mode 160000
index 6ebfb06d..00000000
--- a/datafold-demo-sung
+++ /dev/null
@@ -1 +0,0 @@
-Subproject commit 6ebfb06d1e0937309384cdb4955e6dbd23387256
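[Editor's note, not part of the patch series: taken together, patches 10, 12, and 21 replace the original run_as_daemon/thread.join approach with a ThreadPoolExecutor whose futures are keyed back to their dbt models, so failures surface as they complete and are attributed to the right model. A self-contained sketch of the final shape, with diff_one_model and run_diffs as hypothetical stand-ins for _local_diff/_cloud_diff and dbt_diff:

    import logging
    from concurrent.futures import ThreadPoolExecutor, as_completed
    from typing import List

    logger = logging.getLogger(__name__)

    def diff_one_model(model_name: str) -> None:
        # Stand-in for _local_diff/_cloud_diff; may raise on failure.
        print(f"diffing {model_name}")

    def run_diffs(model_names: List[str], threads: int = 4) -> None:
        futures = {}
        with ThreadPoolExecutor(max_workers=threads) as executor:
            for name in model_names:
                # Key each future by its model so an error can be attributed
                # to the correct model (patch 21); a bare list loses that mapping.
                futures[executor.submit(diff_one_model, name)] = name
            for future in as_completed(futures):
                name = futures[future]
                try:
                    future.result()  # re-raises any exception from the worker
                except Exception as e:
                    logger.error(f"An error occurred during the execution of a diff task: {name} - {e}")

    run_diffs(["model_a", "model_b"])

One design note: because executor.submit returns immediately, it is the as_completed loop that makes errors visible "as they happen" (patch 12), rather than only after all daemon threads were joined.]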