This repository was archived by the owner on May 17, 2024. It is now read-only.

Commit fb1c323

Merge pull request #242 from datafold/joindiff
Join-diff (in-db) + new query builder
2 parents 70ea608 + 4772f06 commit fb1c323

39 files changed: +2741 −768 lines

README.md

Lines changed: 33 additions & 9 deletions
@@ -17,10 +17,20 @@ rows across two different databases.
 * 🔍 Outputs [diff of rows](#example-command-and-output) in detail
 * 🚨 Simple CLI/API to create monitoring and alerts
 * 🔁 Bridges column types of different formats and levels of precision (e.g. Double ⇆ Float ⇆ Decimal)
-* 🔥 Verify 25M+ rows in <10s, and 1B+ rows in ~5min.
+* 🔥 Fast! Verify 25M+ rows in <10s, and 1B+ rows in ~5min.
 * ♾️ Works for tables with 10s of billions of rows

-**data-diff** splits the table into smaller segments, then checksums each
+data-diff can diff tables within the same database, or across different databases.
+
+**Same-DB Diff**: Uses an outer-join to diff the rows as efficiently and accurately as possible.
+
+Supports materializing the diff results to a database table.
+
+Can also collect various extra statistics about the tables.
+
+**Cross-DB Diff**: Employs a divide and conquer algorithm based on hashing, optimized for few changes.
+
+data-diff splits the table into smaller segments, then checksums each
 segment in both databases. When the checksums for a segment aren't equal, it
 will further divide that segment into yet smaller segments, checksumming those
 until it gets to the differing row(s). See [Technical Explanation][tech-explain] for more
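The cross-DB checksum-and-bisect idea can be made concrete with a toy, self-contained sketch. This is illustrative only, not data-diff's implementation: the real tool segments by ranges of the key column and computes the checksums inside each database, so only checksums (not rows) cross the network for matching segments.

```python
import hashlib

def checksum(rows):
    # Stand-in for an in-database checksum aggregate over a segment.
    h = hashlib.md5()
    for r in rows:
        h.update(repr(r).encode())
    return h.digest()

def hashdiff(t1, t2, lo, hi, factor=4, threshold=8):
    """Yield ('-'/'+', (key, value)) for rows differing between t1 and t2
    (dicts of key -> value), considering only keys in [lo, hi)."""
    seg1 = [(k, t1[k]) for k in sorted(t1) if lo <= k < hi]
    seg2 = [(k, t2[k]) for k in sorted(t2) if lo <= k < hi]
    if checksum(seg1) == checksum(seg2):
        return  # identical segment: settled with one checksum per side
    if max(len(seg1), len(seg2)) <= threshold:
        # Small enough: "download" both segments and compare row by row.
        d1, d2 = dict(seg1), dict(seg2)
        for k in sorted(set(d1) | set(d2)):
            if d1.get(k) != d2.get(k):
                if k in d1:
                    yield ("-", (k, d1[k]))
                if k in d2:
                    yield ("+", (k, d2[k]))
        return
    # Checksums disagree: bisect the key range into `factor` sub-segments.
    step = max(1, -(-(hi - lo) // factor))  # ceil((hi - lo) / factor)
    for start in range(lo, hi, step):
        yield from hashdiff(t1, t2, start, min(start + step, hi), factor, threshold)

table1 = {i: f"row {i}" for i in range(1, 101)}
table2 = {k: v for k, v in table1.items() if k != 7}  # hard-delete row 7
table2[42] = "row 42 (updated)"                       # update row 42
print(sorted(hashdiff(table1, table2, 1, 101)))
# → [('+', (42, 'row 42 (updated)')), ('-', (7, 'row 7')), ('-', (42, 'row 42'))]
```

Because identical segments are dismissed with a single checksum comparison, the amount of data transferred grows with the number of differences rather than with table size, which is why this mode is optimized for few changes.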
@@ -69,8 +79,8 @@ better than MySQL.
   may span a half-dozen systems, without verifying each intermediate datastore
   it's extremely difficult to track down where a row got lost.
 * **Detecting hard deletes for an `updated_at`-based pipeline**. If you're
-  copying data to your warehouse based on an `updated_at`-style column, then
-  you'll miss hard-deletes that **data-diff** can find for you.
+  copying data to your warehouse based on an `updated_at`-style column, data-diff
+  can find any hard-deletes that you might have missed.
 * **Make your replication self-healing.** You can use **data-diff** to
   self-heal by using the diff output to write/update rows in the target
   database.
@@ -128,9 +138,9 @@ $ data-diff \
 | PostgreSQL >=10 | `postgresql://<user>:<password>@<host>:5432/<database>` | 💚 |
 | MySQL | `mysql://<user>:<password>@<hostname>:3306/<database>` | 💚 |
 | Snowflake | `"snowflake://<user>[:<password>]@<account>/<database>/<SCHEMA>?warehouse=<WAREHOUSE>&role=<role>[&authenticator=externalbrowser]"` | 💚 |
+| BigQuery | `bigquery://<project>/<dataset>` | 💚 |
+| Redshift | `redshift://<username>:<password>@<hostname>:5439/<database>` | 💚 |
 | Oracle | `oracle://<username>:<password>@<hostname>/database` | 💛 |
-| BigQuery | `bigquery://<project>/<dataset>` | 💛 |
-| Redshift | `redshift://<username>:<password>@<hostname>:5439/<database>` | 💛 |
 | Presto | `presto://<username>:<password>@<hostname>:8080/<database>` | 💛 |
 | Databricks | `databricks://<http_path>:<access_token>@<server_hostname>/<catalog>/<schema>` | 💛 |
 | Trino | `trino://<username>:<password>@<hostname>:8080/<database>` | 💛 |
@@ -141,6 +151,8 @@ $ data-diff \
 | Pinot | | 📝 |
 | Druid | | 📝 |
 | Kafka | | 📝 |
+| DuckDB | | 📝 |
+| SQLite | | 📝 |

 * 💚: Implemented and thoroughly tested.
 * 💛: Implemented, but not thoroughly tested yet.
@@ -217,7 +229,7 @@ may be case-sensitive. This is the case for the Snowflake schema and table names.
 Options:

 - `--help` - Show help message and exit.
-- `-k` or `--key-column` - Name of the primary key column
+- `-k` or `--key-columns` - Name of the primary key column. If none provided, default is 'id'.
 - `-t` or `--update-column` - Name of updated_at/last_updated column
 - `-c` or `--columns` - Names of extra columns to compare. Can be used more than once in the same command.
   Accepts a name or a pattern like in SQL.
@@ -232,12 +244,24 @@ Options:
   Example: `--min-age=5min` ignores rows from the last 5 minutes.
   Valid units: `d, days, h, hours, min, minutes, mon, months, s, seconds, w, weeks, y, years`
 - `--max-age` - Considers only rows younger than specified. See `--min-age`.
-- `--bisection-factor` - Segments per iteration. When set to 2, it performs binary search.
-- `--bisection-threshold` - Minimal bisection threshold. i.e. maximum size of pages to diff locally.
 - `-j` or `--threads` - Number of worker threads to use per database. Default=1.
 - `-w`, `--where` - An additional 'where' expression to restrict the search space.
 - `--conf`, `--run` - Specify the run and configuration from a TOML file. (see below)
 - `--no-tracking` - data-diff sends home anonymous usage data. Use this to disable it.
+- `-a`, `--algorithm` `[auto|joindiff|hashdiff]` - Force algorithm choice
+
+Same-DB diff only:
+- `-m`, `--materialize` - Materialize the diff results into a new table in the database.
+  If a table exists by that name, it will be replaced.
+  Use `%t` in the name to place a timestamp.
+  Example: `-m test_mat_%t`
+- `--assume-unique-key` - Skip validating the uniqueness of the key column during joindiff, which is costly in non-cloud dbs.
+- `--sample-exclusive-rows` - Sample several rows that only appear in one of the tables, but not the other. Use with `-s`.
+
+Cross-DB diff only:
+- `--bisection-threshold` - Minimal size of segment to be split. Smaller segments will be downloaded and compared locally.
+- `--bisection-factor` - Segments per iteration. When set to 2, it performs binary search.


### How to use with a configuration file

data_diff/__init__.py

Lines changed: 70 additions & 21 deletions
@@ -1,41 +1,49 @@
1-
from typing import Tuple, Iterator, Optional, Union
1+
from typing import Sequence, Tuple, Iterator, Optional, Union
22

33
from .tracking import disable_tracking
44
from .databases.connect import connect
55
from .databases.database_types import DbKey, DbTime, DbPath
6-
from .diff_tables import TableSegment, TableDiffer, DEFAULT_BISECTION_THRESHOLD, DEFAULT_BISECTION_FACTOR
6+
from .diff_tables import Algorithm
7+
from .hashdiff_tables import HashDiffer, DEFAULT_BISECTION_THRESHOLD, DEFAULT_BISECTION_FACTOR
8+
from .joindiff_tables import JoinDiffer
9+
from .table_segment import TableSegment
710

811

912
def connect_to_table(
1013
db_info: Union[str, dict],
1114
table_name: Union[DbPath, str],
12-
key_column: str = "id",
15+
key_columns: str = ("id",),
1316
thread_count: Optional[int] = 1,
1417
**kwargs,
15-
):
18+
) -> TableSegment:
1619
"""Connects to the given database, and creates a TableSegment instance
1720
1821
Parameters:
1922
db_info: Either a URI string, or a dict of connection options.
2023
table_name: Name of the table as a string, or a tuple that signifies the path.
21-
key_column: Name of the key column
22-
thread_count: Number of threads for this connection (only if using a threadpooled implementation)
24+
key_columns: Names of the key columns
25+
thread_count: Number of threads for this connection (only if using a threadpooled db implementation)
26+
27+
See Also:
28+
:meth:`connect`
2329
"""
30+
if isinstance(key_columns, str):
31+
key_columns = (key_columns,)
2432

2533
db = connect(db_info, thread_count=thread_count)
2634

2735
if isinstance(table_name, str):
2836
table_name = db.parse_table_name(table_name)
2937

30-
return TableSegment(db, table_name, key_column, **kwargs)
38+
return TableSegment(db, table_name, key_columns, **kwargs)
3139

3240

3341
def diff_tables(
3442
table1: TableSegment,
3543
table2: TableSegment,
3644
*,
3745
# Name of the key column, which uniquely identifies each row (usually id)
38-
key_column: str = None,
46+
key_columns: Sequence[str] = None,
3947
# Name of updated column, which signals that rows changed (usually updated_at or last_update)
4048
update_column: str = None,
4149
# Extra columns to compare
@@ -46,31 +54,63 @@ def diff_tables(
     # Start/end update_column values, used to restrict the segment
     min_update: DbTime = None,
     max_update: DbTime = None,
-    # Into how many segments to bisect per iteration
+    # Algorithm
+    algorithm: Algorithm = Algorithm.HASHDIFF,
+    # Into how many segments to bisect per iteration (hashdiff only)
     bisection_factor: int = DEFAULT_BISECTION_FACTOR,
-    # When should we stop bisecting and compare locally (in row count)
+    # When should we stop bisecting and compare locally (in row count; hashdiff only)
     bisection_threshold: int = DEFAULT_BISECTION_THRESHOLD,
     # Enable/disable threaded diffing. Needed to take advantage of database threads.
     threaded: bool = True,
     # Maximum size of each threadpool. None = auto. Only relevant when threaded is True.
     # There may be many pools, so number of actual threads can be a lot higher.
     max_threadpool_size: Optional[int] = 1,
-    # Enable/disable debug prints
-    debug: bool = False,
 ) -> Iterator:
-    """Efficiently finds the diff between table1 and table2.
+    """Finds the diff between table1 and table2.
+
+    Parameters:
+        key_columns (Tuple[str, ...]): Name of the key column, which uniquely identifies each row (usually id)
+        update_column (str, optional): Name of updated column, which signals that rows changed.
+            Usually updated_at or last_update. Used by `min_update` and `max_update`.
+        extra_columns (Tuple[str, ...], optional): Extra columns to compare
+        min_key (:data:`DbKey`, optional): Lowest key value, used to restrict the segment
+        max_key (:data:`DbKey`, optional): Highest key value, used to restrict the segment
+        min_update (:data:`DbTime`, optional): Lowest update_column value, used to restrict the segment
+        max_update (:data:`DbTime`, optional): Highest update_column value, used to restrict the segment
+        algorithm (:class:`Algorithm`): Which diffing algorithm to use (`HASHDIFF` or `JOINDIFF`)
+        bisection_factor (int): Into how many segments to bisect per iteration. (Used when algorithm is `HASHDIFF`)
+        bisection_threshold (Number): Minimal row count of segment to bisect, otherwise download
+            and compare locally. (Used when algorithm is `HASHDIFF`).
+        threaded (bool): Enable/disable threaded diffing. Needed to take advantage of database threads.
+        max_threadpool_size (int): Maximum size of each threadpool. ``None`` means auto.
+            Only relevant when `threaded` is ``True``.
+            There may be many pools, so number of actual threads can be a lot higher.
+
+    Note:
+        The following parameters are used to override the corresponding attributes of the given :class:`TableSegment` instances:
+        `key_columns`, `update_column`, `extra_columns`, `min_key`, `max_key`.
+        If different values are needed per table, it's possible to omit them here, and instead set
+        them directly when creating each :class:`TableSegment`.

     Example:
         >>> table1 = connect_to_table('postgresql:///', 'Rating', 'id')
         >>> list(diff_tables(table1, table1))
         []

+    See Also:
+        :class:`TableSegment`
+        :class:`HashDiffer`
+        :class:`JoinDiffer`
+
     """
+    if isinstance(key_columns, str):
+        key_columns = (key_columns,)
+
     tables = [table1, table2]
     override_attrs = {
         k: v
         for k, v in dict(
-            key_column=key_column,
+            key_columns=key_columns,
             update_column=update_column,
             extra_columns=extra_columns,
             min_key=min_key,
@@ -83,11 +123,20 @@ def diff_tables(

     segments = [t.new(**override_attrs) for t in tables] if override_attrs else tables

-    differ = TableDiffer(
-        bisection_factor=bisection_factor,
-        bisection_threshold=bisection_threshold,
-        debug=debug,
-        threaded=threaded,
-        max_threadpool_size=max_threadpool_size,
-    )
+    algorithm = Algorithm(algorithm)
+    if algorithm == Algorithm.HASHDIFF:
+        differ = HashDiffer(
+            bisection_factor=bisection_factor,
+            bisection_threshold=bisection_threshold,
+            threaded=threaded,
+            max_threadpool_size=max_threadpool_size,
+        )
+    elif algorithm == Algorithm.JOINDIFF:
+        differ = JoinDiffer(
+            threaded=threaded,
+            max_threadpool_size=max_threadpool_size,
+        )
+    else:
+        raise ValueError(f"Unknown algorithm: {algorithm}")
+
     return differ.diff_tables(*segments)
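The dispatch added above leans on Enum value coercion: calling `Algorithm(algorithm)` returns the member unchanged when given a member, and looks it up by value when given a string, so callers can pass either `Algorithm.JOINDIFF` or `"joindiff"`. A minimal standalone mirror of that pattern (the `Algorithm` class and its string values here are assumed from the CLI's `[auto|joindiff|hashdiff]` choices, not copied from data-diff's source):

```python
from enum import Enum

class Algorithm(Enum):
    # Values assumed to match the CLI choices [auto|joindiff|hashdiff]
    AUTO = "auto"
    JOINDIFF = "joindiff"
    HASHDIFF = "hashdiff"

def pick_differ(algorithm):
    # Member passes through unchanged; a string is looked up by value.
    # An unknown string raises ValueError inside Enum.__call__ itself.
    algorithm = Algorithm(algorithm)
    if algorithm == Algorithm.HASHDIFF:
        return "HashDiffer"
    elif algorithm == Algorithm.JOINDIFF:
        return "JoinDiffer"
    raise ValueError(f"Unknown algorithm: {algorithm}")

print(pick_differ("hashdiff"))          # HashDiffer
print(pick_differ(Algorithm.JOINDIFF))  # JoinDiffer
```

This keeps the public API forgiving (strings from a CLI or config file work directly) while the body of the function only ever deals with enum members.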
