@@ -50,6 +50,7 @@ class Stats:
50
50
def sample(table):
    """Build a query that orders ``table`` by ``Random()`` and limits it to 10 rows.

    Nothing is executed here; the resulting query expression is returned
    for the caller to run.
    """
    shuffled = table.order_by(Random())
    return shuffled.limit(10)
52
52
53
+
53
54
def create_temp_table (c : Compiler , name : str , expr : Expr ):
54
55
db = c .database
55
56
if isinstance (db , BigQuery ):
@@ -67,12 +68,13 @@ def drop_table(db, name: DbPath):
67
68
t = TablePath (name )
68
69
db .query (t .drop (if_exists = True ))
69
70
71
+
70
72
def append_to_table(name: DbPath, expr: Expr):
    """Yield, in order, the steps that append the rows of ``expr`` to table ``name``.

    The generator produces four items:

    1. a create-if-not-exists statement for the table (built from ``expr.schema``),
    2. a commit,
    3. the insert of ``expr`` into the table,
    4. a final commit.

    Each step is built lazily, only when the consumer asks for the next
    item. Presumably the consumer (e.g. ``db.query``) executes every
    yielded item before requesting the following one -- confirm at the
    call site.
    """
    target = TablePath(name, expr.schema)
    commit = " commit"

    yield target.create(if_not_exists=True)  # uses expr.schema
    yield commit
    yield target.insert_expr(expr)
    yield commit
76
78
77
79
78
80
def bool_to_int (x ):
@@ -95,10 +97,7 @@ def _outerjoin(db: Database, a: ITable, b: ITable, keys1: List[str], keys2: List
95
97
r = rightjoin (a , b ).on (* on ).select (is_exclusive_a = False , is_exclusive_b = is_exclusive_b , ** select_fields )
96
98
return l .union (r )
97
99
98
- return (
99
- outerjoin (a , b ).on (* on )
100
- .select (is_exclusive_a = is_exclusive_a , is_exclusive_b = is_exclusive_b , ** select_fields )
101
- )
100
+ return outerjoin (a , b ).on (* on ).select (is_exclusive_a = is_exclusive_a , is_exclusive_b = is_exclusive_b , ** select_fields )
102
101
103
102
104
103
def _slice_tuple (t , * sizes ):
@@ -115,7 +114,6 @@ def json_friendly_value(v):
115
114
return v
116
115
117
116
118
-
119
117
@dataclass
120
118
class JoinDiffer (TableDiffer ):
121
119
"""Finds the diff between two SQL tables in the same database, using JOINs.
@@ -143,11 +141,10 @@ def _diff_tables(self, table1: TableSegment, table2: TableSegment) -> DiffResult
143
141
144
142
table1 , table2 = self ._threaded_call ("with_schema" , [table1 , table2 ])
145
143
146
-
147
144
bg_funcs = [partial (self ._test_duplicate_keys , table1 , table2 )] if self .validate_unique_key else []
148
145
if self .materialize_to_table :
149
146
drop_table (db , self .materialize_to_table )
150
- db .query (' COMMIT' )
147
+ db .query (" COMMIT" )
151
148
152
149
with self ._run_in_background (* bg_funcs ):
153
150
@@ -158,7 +155,16 @@ def _diff_tables(self, table1: TableSegment, table2: TableSegment) -> DiffResult
158
155
yield from self ._bisect_and_diff_tables (table1 , table2 )
159
156
logger .info ("Diffing complete" )
160
157
161
- def _diff_segments (self , ti : ThreadedYielder , table1 : TableSegment , table2 : TableSegment , max_rows : int , level = 0 , segment_index = None , segment_count = None ):
158
+ def _diff_segments (
159
+ self ,
160
+ ti : ThreadedYielder ,
161
+ table1 : TableSegment ,
162
+ table2 : TableSegment ,
163
+ max_rows : int ,
164
+ level = 0 ,
165
+ segment_index = None ,
166
+ segment_count = None ,
167
+ ):
162
168
assert table1 .database is table2 .database
163
169
164
170
if segment_index or table1 .min_key or max_rows :
@@ -172,13 +178,15 @@ def _diff_segments(self, ti: ThreadedYielder, table1: TableSegment, table2: Tabl
172
178
diff_rows , a_cols , b_cols , is_diff_cols = self ._create_outer_join (table1 , table2 )
173
179
174
180
with self ._run_in_background (
175
- partial (self ._collect_stats , 1 , table1 ),
176
- partial (self ._collect_stats , 2 , table2 ),
177
- partial (self ._test_null_keys , table1 , table2 ),
178
- partial (self ._sample_and_count_exclusive , db , diff_rows , a_cols , b_cols ),
179
- partial (self ._count_diff_per_column , db , diff_rows , list (a_cols ), is_diff_cols ),
180
- partial (self ._materialize_diff , db , diff_rows , segment_index = segment_index ) if self .materialize_to_table else None ,
181
- ):
181
+ partial (self ._collect_stats , 1 , table1 ),
182
+ partial (self ._collect_stats , 2 , table2 ),
183
+ partial (self ._test_null_keys , table1 , table2 ),
184
+ partial (self ._sample_and_count_exclusive , db , diff_rows , a_cols , b_cols ),
185
+ partial (self ._count_diff_per_column , db , diff_rows , list (a_cols ), is_diff_cols ),
186
+ partial (self ._materialize_diff , db , diff_rows , segment_index = segment_index )
187
+ if self .materialize_to_table
188
+ else None ,
189
+ ):
182
190
183
191
logger .debug ("Querying for different rows" )
184
192
for is_xa , is_xb , * x in db .query (diff_rows , list ):
@@ -218,7 +226,6 @@ def _test_null_keys(self, table1, table2):
218
226
if nulls :
219
227
raise ValueError (f"NULL values in one or more primary keys" )
220
228
221
-
222
229
def _collect_stats (self , i , table ):
223
230
logger .info (f"Collecting stats for table #{ i } " )
224
231
db = table .database
@@ -265,31 +272,27 @@ def _create_outer_join(self, table1, table2):
265
272
a = table1 ._make_select ()
266
273
b = table2 ._make_select ()
267
274
268
- is_diff_cols = {
269
- f"is_diff_{ c1 } " : bool_to_int (a [c1 ].is_distinct_from (b [c2 ])) for c1 , c2 in safezip (cols1 , cols2 )
270
- }
275
+ is_diff_cols = {f"is_diff_{ c1 } " : bool_to_int (a [c1 ].is_distinct_from (b [c2 ])) for c1 , c2 in safezip (cols1 , cols2 )}
271
276
272
277
a_cols = {f"table1_{ c } " : NormalizeAsString (a [c ]) for c in cols1 }
273
278
b_cols = {f"table2_{ c } " : NormalizeAsString (b [c ]) for c in cols2 }
274
279
275
- diff_rows = (
276
- _outerjoin (db , a , b , keys1 , keys2 , {** is_diff_cols , ** a_cols , ** b_cols })
277
- .where (or_ (this [c ] == 1 for c in is_diff_cols ))
280
+ diff_rows = _outerjoin (db , a , b , keys1 , keys2 , {** is_diff_cols , ** a_cols , ** b_cols }).where (
281
+ or_ (this [c ] == 1 for c in is_diff_cols )
278
282
)
279
283
return diff_rows , a_cols , b_cols , is_diff_cols
280
284
281
-
282
285
def _count_diff_per_column(self, db, diff_rows, cols, is_diff_cols):
    """Sum the per-column diff flags of ``diff_rows`` and record them in ``self.stats``.

    Args:
        db: Database used to run the aggregate query.
        diff_rows: Query expression exposing one column per entry of
            ``is_diff_cols``.
        cols: Names to report the counts under; paired positionally with
            ``is_diff_cols`` via ``safezip``.
        is_diff_cols: Names of the flag columns to sum.

    Side effects:
        Stores a ``{name: count}`` dict under ``self.stats["diff_counts"]``.
    """
    logger.info("Counting differences per column")
    is_diff_cols_counts = db.query(diff_rows.select(sum_(this[c]) for c in is_diff_cols), tuple)

    diff_counts = {}
    for name, count in safezip(cols, is_diff_cols_counts):
        # `count or 0` guards against a sum that comes back as None
        # (presumably a SQL NULL when there are no rows to aggregate).
        # Names that repeat in `cols` accumulate into the same entry.
        diff_counts[name] = diff_counts.get(name, 0) + (count or 0)

    # Key has no leading whitespace, matching the other stats keys
    # ("exclusive_count", "exclusive_sample").
    self.stats["diff_counts"] = diff_counts
289
292
290
293
def _sample_and_count_exclusive (self , db , diff_rows , a_cols , b_cols ):
291
294
if isinstance (db , Oracle ):
292
- exclusive_rows_query = diff_rows .where ((this .is_exclusive_a == 1 ) | (this .is_exclusive_b == 1 ))
295
+ exclusive_rows_query = diff_rows .where ((this .is_exclusive_a == 1 ) | (this .is_exclusive_b == 1 ))
293
296
else :
294
297
exclusive_rows_query = diff_rows .where (this .is_exclusive_a | this .is_exclusive_b )
295
298
@@ -299,16 +302,17 @@ def _sample_and_count_exclusive(self, db, diff_rows, a_cols, b_cols):
299
302
return
300
303
301
304
logger .info ("Counting and sampling exclusive rows" )
305
+
302
306
def exclusive_rows (expr ):
303
307
c = Compiler (db )
304
308
name = c .new_unique_table_name ("temp_table" )
305
309
yield create_temp_table (c , name , expr .limit (self .write_limit ))
306
310
exclusive_rows = table (name , schema = expr .source_table .schema )
307
311
308
312
count = yield exclusive_rows .count ()
309
- self .stats ["exclusive_count" ] = self .stats .get (' exclusive_count' , 0 ) + count [0 ][0 ]
313
+ self .stats ["exclusive_count" ] = self .stats .get (" exclusive_count" , 0 ) + count [0 ][0 ]
310
314
sample_rows = yield sample (exclusive_rows .select (* this [list (a_cols )], * this [list (b_cols )]))
311
- self .stats ["exclusive_sample" ] = self .stats .get (' exclusive_sample' , []) + sample_rows
315
+ self .stats ["exclusive_sample" ] = self .stats .get (" exclusive_sample" , []) + sample_rows
312
316
313
317
# Only drops if create table succeeded (meaning, the table didn't already exist)
314
318
yield f"drop table { c .quote (name )} "
@@ -321,4 +325,3 @@ def _materialize_diff(self, db, diff_rows, segment_index=None):
321
325
322
326
db .query (append_to_table (self .materialize_to_table , diff_rows .limit (self .write_limit )))
323
327
logger .info (f"Materialized diff to table '{ '.' .join (self .materialize_to_table )} '." )
324
-
0 commit comments