This repository was archived by the owner on May 17, 2024. It is now read-only.

Join-diff (in-db) + new query builder #242

Merged
35 commits, merged on Oct 11, 2022
Commits (35)
75ed605
Query builder package (still incomplete)
erezsh Aug 31, 2022
70c5952
data-diff now uses new 'data_diff.queries' modules instead of 'data_d…
erezsh Sep 2, 2022
e5ace37
Join-diff implementation
erezsh Sep 8, 2022
b6170bf
Integrate joindiff into main
erezsh Sep 10, 2022
becf36c
Refactor diff_tables.TableDiffer -> hashdiff_tables.HashDiffer
erezsh Sep 14, 2022
74f31e8
Adjustments to joindiff implementation
erezsh Sep 14, 2022
686b1f7
refactor tablediffer
erezsh Sep 21, 2022
b830afc
joindiff now working for all major databases:
erezsh Sep 21, 2022
4f441f0
Fix in queries
erezsh Sep 22, 2022
de26e56
Joindiff now supports tracking and bisection
erezsh Sep 22, 2022
7c7e5bd
Added diffing schemas (when same db, for mutual columns)
erezsh Sep 23, 2022
bee5479
Joindiff: Added Interpreter; Fixed exclusive_rows to use temp_table i…
erezsh Sep 23, 2022
4c80e5d
Tracking: Errors now provide more info, with truncated values
erezsh Sep 30, 2022
179ce54
Better docs and docstrings
erezsh Sep 30, 2022
073333c
Refactor joindiff
erezsh Oct 3, 2022
c1e171d
Queries: Derive schemas (WIP)
erezsh Oct 3, 2022
da6c2df
Queries: DDL initial (drop/create table, insert)
erezsh Oct 4, 2022
9f404a0
Queries: Fix in .type
erezsh Oct 5, 2022
5cd424d
Joindiff: Added support to materialize results as tables (-m)
erezsh Oct 3, 2022
733972a
Queries: Ran black
erezsh Oct 4, 2022
00ee415
Joindiff: Ran black
erezsh Oct 4, 2022
78e4c84
Many fixes; Added materialize tests;
erezsh Oct 5, 2022
b18dbcb
black
erezsh Oct 5, 2022
3a09a77
joindiff: docs, refactor
erezsh Oct 6, 2022
978259c
Merge branch 'master' into joindiff
erezsh Oct 6, 2022
90cbfb6
Queries fix
erezsh Oct 6, 2022
ad48f5d
Composite key - initial (WIP);
erezsh Oct 6, 2022
377b4a7
Fixed interactive mode and explain
erezsh Oct 7, 2022
4c16bac
Update README
erezsh Oct 7, 2022
472f422
Added --sample-exclusive-rows switch
erezsh Oct 7, 2022
abaabe8
README: Updated supported database list
erezsh Oct 7, 2022
245aeb6
Updated docs; Ran black
erezsh Oct 7, 2022
e8965fd
Joindiff: Fix stats collections
erezsh Oct 7, 2022
47b9faa
Cleanup and minor fixes (pylint pass)
erezsh Oct 8, 2022
4772f06
Merge branch 'master' into joindiff
erezsh Oct 11, 2022
42 changes: 33 additions & 9 deletions README.md
@@ -17,10 +17,20 @@ rows across two different databases.
* 🔍 Outputs [diff of rows](#example-command-and-output) in detail
* 🚨 Simple CLI/API to create monitoring and alerts
* 🔁 Bridges column types of different formats and levels of precision (e.g. Double ⇆ Float ⇆ Decimal)
* 🔥 Verify 25M+ rows in <10s, and 1B+ rows in ~5min.
* 🔥 Fast! Verify 25M+ rows in <10s, and 1B+ rows in ~5min.
* ♾️ Works for tables with 10s of billions of rows

**data-diff** splits the table into smaller segments, then checksums each
data-diff can diff tables within the same database, or across different databases.

**Same-DB Diff**: Uses an outer-join to diff the rows as efficiently and accurately as possible.

Supports materializing the diff results to a database table.

Can also collect various extra statistics about the tables.
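The outer-join idea behind the same-DB diff can be illustrated with a minimal, self-contained sketch. This is a hypothetical helper emulating FULL OUTER JOIN semantics in plain Python, not data-diff's actual implementation (which runs the join inside the database):

```python
# Minimal sketch of the join-diff idea: a full outer join on the key reveals
# rows exclusive to either side, and rows present in both but with different
# values. Hypothetical helper; not data-diff's actual implementation.

def join_diff(rows_a, rows_b, key="id"):
    """Yield ('-', row) for rows only in A, ('+', row) for rows only in B,
    and ('!', key_value) for rows whose values differ between A and B."""
    index_a = {r[key]: r for r in rows_a}
    index_b = {r[key]: r for r in rows_b}
    for k in index_a.keys() | index_b.keys():  # emulate FULL OUTER JOIN on the key
        a, b = index_a.get(k), index_b.get(k)
        if b is None:
            yield ("-", a)
        elif a is None:
            yield ("+", b)
        elif a != b:
            yield ("!", k)

table1 = [{"id": 1, "v": "x"}, {"id": 2, "v": "y"}]
table2 = [{"id": 2, "v": "z"}, {"id": 3, "v": "x"}]
print(sorted(join_diff(table1, table2), key=str))
```

Because the real join happens in-database, only the differing rows ever leave the server, which is what makes this approach efficient for same-DB comparisons.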

**Cross-DB Diff**: Employs a divide and conquer algorithm based on hashing, optimized for few changes.

data-diff splits the table into smaller segments, then checksums each
segment in both databases. When the checksums for a segment aren't equal, it
will further divide that segment into yet smaller segments, checksumming those
until it gets to the differing row(s). See [Technical Explanation][tech-explain] for more
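The divide-and-conquer strategy described above can be sketched in a few lines. This simplified illustration checksums whole segments and only recurses into segments whose checksums differ; it assumes both sides are sorted by key and index-aligned, and checksums locally, whereas the real tool computes checksums inside each database with SQL:

```python
import hashlib

# Simplified sketch of the hashdiff strategy: checksum whole segments on each
# side; identical checksums mean the segment needs no further work, differing
# checksums trigger recursion into smaller segments until the segment is small
# enough to download and compare row by row. Illustrative only.

def checksum(rows):
    h = hashlib.md5()
    for row in rows:
        h.update(repr(row).encode())
    return h.hexdigest()

def hashdiff(seg_a, seg_b, bisection_factor=2, threshold=2):
    if checksum(seg_a) == checksum(seg_b):
        return []  # identical segment: nothing to download
    if max(len(seg_a), len(seg_b)) <= threshold:
        # small enough: compare locally; symmetric difference = differing rows
        return sorted(set(seg_a) ^ set(seg_b))
    diffs = []
    step = max(1, max(len(seg_a), len(seg_b)) // bisection_factor)
    for i in range(0, max(len(seg_a), len(seg_b)), step):
        diffs += hashdiff(seg_a[i:i + step], seg_b[i:i + step],
                          bisection_factor, threshold)
    return diffs

a = [(i, "val") for i in range(16)]
b = [(i, "val") if i != 5 else (i, "changed") for i in range(16)]
print(hashdiff(a, b))  # [(5, 'changed'), (5, 'val')]
```

With `bisection_factor=2` this degenerates to a binary search for the changed row, matching the behavior the CLI documents for that setting.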
@@ -69,8 +79,8 @@ better than MySQL.
may span a half-dozen systems, without verifying each intermediate datastore
it's extremely difficult to track down where a row got lost.
* **Detecting hard deletes for an `updated_at`-based pipeline**. If you're
copying data to your warehouse based on an `updated_at`-style column, then
you'll miss hard-deletes that **data-diff** can find for you.
copying data to your warehouse based on an `updated_at`-style column, data-diff
can find any hard-deletes that you might have missed.
* **Make your replication self-healing.** You can use **data-diff** to
self-heal by using the diff output to write/update rows in the target
database.
@@ -128,9 +138,9 @@ $ data-diff \
| PostgreSQL >=10 | `postgresql://<user>:<password>@<host>:5432/<database>` | 💚 |
| MySQL | `mysql://<user>:<password>@<hostname>:5432/<database>` | 💚 |
| Snowflake | `"snowflake://<user>[:<password>]@<account>/<database>/<SCHEMA>?warehouse=<WAREHOUSE>&role=<role>[&authenticator=externalbrowser]"` | 💚 |
| BigQuery | `bigquery://<project>/<dataset>` | 💚 |
| Redshift | `redshift://<username>:<password>@<hostname>:5439/<database>` | 💚 |
| Oracle | `oracle://<username>:<password>@<hostname>/database` | 💛 |
| BigQuery | `bigquery://<project>/<dataset>` | 💛 |
| Redshift | `redshift://<username>:<password>@<hostname>:5439/<database>` | 💛 |
| Presto | `presto://<username>:<password>@<hostname>:8080/<database>` | 💛 |
| Databricks | `databricks://<http_path>:<access_token>@<server_hostname>/<catalog>/<schema>` | 💛 |
| Trino | `trino://<username>:<password>@<hostname>:8080/<database>` | 💛 |
@@ -141,6 +151,8 @@ $ data-diff \
| Pinot | | 📝 |
| Druid | | 📝 |
| Kafka | | 📝 |
| DuckDB | | 📝 |
| SQLite | | 📝 |

* 💚: Implemented and thoroughly tested.
* 💛: Implemented, but not thoroughly tested yet.
@@ -217,7 +229,7 @@ may be case-sensitive. This is the case for the Snowflake schema and table names
Options:

- `--help` - Show help message and exit.
- `-k` or `--key-column` - Name of the primary key column
- `-k` or `--key-columns` - Name of the primary key column. If none provided, default is 'id'.
- `-t` or `--update-column` - Name of updated_at/last_updated column
- `-c` or `--columns` - Names of extra columns to compare. Can be used more than once in the same command.
Accepts a name or a pattern like in SQL.
@@ -232,12 +244,24 @@ Options:
Example: `--min-age=5min` ignores rows from the last 5 minutes.
Valid units: `d, days, h, hours, min, minutes, mon, months, s, seconds, w, weeks, y, years`
- `--max-age` - Considers only rows younger than specified. See `--min-age`.
- `--bisection-factor` - Segments per iteration. When set to 2, it performs binary search.
- `--bisection-threshold` - Minimal bisection threshold. i.e. maximum size of pages to diff locally.
- `-j` or `--threads` - Number of worker threads to use per database. Default=1.
- `-w`, `--where` - An additional 'where' expression to restrict the search space.
- `--conf`, `--run` - Specify the run and configuration from a TOML file. (see below)
- `--no-tracking` - data-diff sends home anonymous usage data. Use this to disable it.
- `-a`, `--algorithm` `[auto|joindiff|hashdiff]` - Force algorithm choice

Same-DB diff only:
- `-m`, `--materialize` - Materialize the diff results into a new table in the database.
If a table exists by that name, it will be replaced.
Use `%t` in the name to place a timestamp.
Example: `-m test_mat_%t`
- `--assume-unique-key` - Skip validating the uniqueness of the key column during joindiff, which is costly in non-cloud dbs.
- `--sample-exclusive-rows` - Sample several rows that only appear in one of the tables, but not the other. Use with `-s`.

Cross-DB diff only:
- `--bisection-threshold` - Minimal size of segment to be split. Smaller segments will be downloaded and compared locally.
- `--bisection-factor` - Segments per iteration. When set to 2, it performs binary search.
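The `--min-age`/`--max-age` values combine a number with one of the units listed above (`d, days, h, hours, min, minutes, mon, months, s, seconds, w, weeks, y, years`). A minimal parser sketch for such values — a hypothetical illustration, not data-diff's actual parsing code; months and years are approximated since `timedelta` has no calendar awareness:

```python
import re
from datetime import timedelta

# Hypothetical sketch of parsing "--min-age"-style values such as "5min" or
# "2d", using the unit names documented above. Not data-diff's actual parser.
UNITS = {
    "s": 1, "seconds": 1,
    "min": 60, "minutes": 60,
    "h": 3600, "hours": 3600,
    "d": 86400, "days": 86400,
    "w": 604800, "weeks": 604800,
    "mon": 2592000, "months": 2592000,   # approximated as 30 days
    "y": 31536000, "years": 31536000,    # approximated as 365 days
}

def parse_age(value: str) -> timedelta:
    m = re.fullmatch(r"(\d+)\s*([a-z]+)", value.strip().lower())
    if not m or m.group(2) not in UNITS:
        raise ValueError(f"Invalid age: {value!r}")
    return timedelta(seconds=int(m.group(1)) * UNITS[m.group(2)])

print(parse_age("5min"))  # 0:05:00
```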



### How to use with a configuration file
91 changes: 70 additions & 21 deletions data_diff/__init__.py
@@ -1,41 +1,49 @@
from typing import Tuple, Iterator, Optional, Union
from typing import Sequence, Tuple, Iterator, Optional, Union

from .tracking import disable_tracking
from .databases.connect import connect
from .databases.database_types import DbKey, DbTime, DbPath
from .diff_tables import TableSegment, TableDiffer, DEFAULT_BISECTION_THRESHOLD, DEFAULT_BISECTION_FACTOR
from .diff_tables import Algorithm
from .hashdiff_tables import HashDiffer, DEFAULT_BISECTION_THRESHOLD, DEFAULT_BISECTION_FACTOR
from .joindiff_tables import JoinDiffer
from .table_segment import TableSegment


def connect_to_table(
db_info: Union[str, dict],
table_name: Union[DbPath, str],
key_column: str = "id",
key_columns: Union[str, Sequence[str]] = ("id",),
thread_count: Optional[int] = 1,
**kwargs,
):
) -> TableSegment:
"""Connects to the given database, and creates a TableSegment instance

Parameters:
db_info: Either a URI string, or a dict of connection options.
table_name: Name of the table as a string, or a tuple that signifies the path.
key_column: Name of the key column
thread_count: Number of threads for this connection (only if using a threadpooled implementation)
key_columns: Names of the key columns
thread_count: Number of threads for this connection (only if using a threadpooled db implementation)

See Also:
:meth:`connect`
"""
if isinstance(key_columns, str):
key_columns = (key_columns,)

db = connect(db_info, thread_count=thread_count)

if isinstance(table_name, str):
table_name = db.parse_table_name(table_name)

return TableSegment(db, table_name, key_column, **kwargs)
return TableSegment(db, table_name, key_columns, **kwargs)


def diff_tables(
table1: TableSegment,
table2: TableSegment,
*,
# Name of the key column, which uniquely identifies each row (usually id)
key_column: str = None,
key_columns: Sequence[str] = None,
# Name of updated column, which signals that rows changed (usually updated_at or last_update)
update_column: str = None,
# Extra columns to compare
@@ -46,31 +54,63 @@ def diff_tables(
# Start/end update_column values, used to restrict the segment
min_update: DbTime = None,
max_update: DbTime = None,
# Into how many segments to bisect per iteration
# Algorithm
algorithm: Algorithm = Algorithm.HASHDIFF,
# Into how many segments to bisect per iteration (hashdiff only)
bisection_factor: int = DEFAULT_BISECTION_FACTOR,
# When should we stop bisecting and compare locally (in row count)
# When should we stop bisecting and compare locally (in row count; hashdiff only)
bisection_threshold: int = DEFAULT_BISECTION_THRESHOLD,
# Enable/disable threaded diffing. Needed to take advantage of database threads.
threaded: bool = True,
# Maximum size of each threadpool. None = auto. Only relevant when threaded is True.
# There may be many pools, so number of actual threads can be a lot higher.
max_threadpool_size: Optional[int] = 1,
# Enable/disable debug prints
debug: bool = False,
) -> Iterator:
"""Efficiently finds the diff between table1 and table2.
"""Finds the diff between table1 and table2.

Parameters:
key_columns (Tuple[str, ...]): Names of the key columns, which uniquely identify each row (usually id)
update_column (str, optional): Name of updated column, which signals that rows changed.
Usually updated_at or last_update. Used by `min_update` and `max_update`.
extra_columns (Tuple[str, ...], optional): Extra columns to compare
min_key (:data:`DbKey`, optional): Lowest key value, used to restrict the segment
max_key (:data:`DbKey`, optional): Highest key value, used to restrict the segment
min_update (:data:`DbTime`, optional): Lowest update_column value, used to restrict the segment
max_update (:data:`DbTime`, optional): Highest update_column value, used to restrict the segment
algorithm (:class:`Algorithm`): Which diffing algorithm to use (`HASHDIFF` or `JOINDIFF`)
bisection_factor (int): Into how many segments to bisect per iteration. (Used when algorithm is `HASHDIFF`)
bisection_threshold (Number): Minimal row count of segment to bisect, otherwise download
and compare locally. (Used when algorithm is `HASHDIFF`).
threaded (bool): Enable/disable threaded diffing. Needed to take advantage of database threads.
max_threadpool_size (int): Maximum size of each threadpool. ``None`` means auto.
Only relevant when `threaded` is ``True``.
There may be many pools, so number of actual threads can be a lot higher.

Note:
The following parameters are used to override the corresponding attributes of the given :class:`TableSegment` instances:
`key_columns`, `update_column`, `extra_columns`, `min_key`, `max_key`.
If different values are needed per table, it's possible to omit them here, and instead set
them directly when creating each :class:`TableSegment`.

Example:
>>> table1 = connect_to_table('postgresql:///', 'Rating', 'id')
>>> list(diff_tables(table1, table1))
[]

See Also:
:class:`TableSegment`
:class:`HashDiffer`
:class:`JoinDiffer`

"""
if isinstance(key_columns, str):
key_columns = (key_columns,)

tables = [table1, table2]
override_attrs = {
k: v
for k, v in dict(
key_column=key_column,
key_columns=key_columns,
update_column=update_column,
extra_columns=extra_columns,
min_key=min_key,
Expand All @@ -83,11 +123,20 @@ def diff_tables(

segments = [t.new(**override_attrs) for t in tables] if override_attrs else tables

differ = TableDiffer(
bisection_factor=bisection_factor,
bisection_threshold=bisection_threshold,
debug=debug,
threaded=threaded,
max_threadpool_size=max_threadpool_size,
)
algorithm = Algorithm(algorithm)
if algorithm == Algorithm.HASHDIFF:
differ = HashDiffer(
bisection_factor=bisection_factor,
bisection_threshold=bisection_threshold,
threaded=threaded,
max_threadpool_size=max_threadpool_size,
)
elif algorithm == Algorithm.JOINDIFF:
differ = JoinDiffer(
threaded=threaded,
max_threadpool_size=max_threadpool_size,
)
else:
raise ValueError(f"Unknown algorithm: {algorithm}")

return differ.diff_tables(*segments)
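The algorithm-selection logic above follows a straightforward enum-dispatch pattern: coerce the argument to the `Algorithm` enum (accepting either a member or its string value), then construct the matching differ. A stripped-down standalone sketch, with dummy classes standing in for `HashDiffer`/`JoinDiffer`:

```python
from enum import Enum

# Stripped-down illustration of the dispatch pattern used above.
# The differ classes here are dummies, not the real HashDiffer/JoinDiffer.

class Algorithm(Enum):
    HASHDIFF = "hashdiff"
    JOINDIFF = "joindiff"

class HashDiffer:
    def __init__(self, **options):
        self.options = options

class JoinDiffer:
    def __init__(self, **options):
        self.options = options

def make_differ(algorithm, **options):
    algorithm = Algorithm(algorithm)  # accepts an enum member or its string value
    if algorithm == Algorithm.HASHDIFF:
        return HashDiffer(**options)
    elif algorithm == Algorithm.JOINDIFF:
        return JoinDiffer(**options)
    raise ValueError(f"Unknown algorithm: {algorithm}")

print(type(make_differ("joindiff", threaded=True)).__name__)  # JoinDiffer
```

Calling `Algorithm(algorithm)` up front is what lets the CLI pass plain strings like `"joindiff"` while the API can pass enum members, with invalid values rejected in one place.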