From 4db13dd51e5185c815b8f6ff5ef769789d4a33fe Mon Sep 17 00:00:00 2001 From: sunsingerus Date: Thu, 16 Nov 2017 02:14:17 +0300 Subject: [PATCH 1/8] test script --- README.md | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/README.md b/README.md index 5da5e9c..8b57571 100644 --- a/README.md +++ b/README.md @@ -861,4 +861,25 @@ for file in $(ls *.csv|sort|head -n 100); do rm -f ontime.csv i=$((i+1)) done + +#!/bin/bash +files_to_import_num=3 +i=1 +for file in $(ls /mnt/nas/work/ontime/*.csv|sort|head -n $files_to_import_num); do + echo "$i. Prepare $file" + rm -f ontime + ln -s $file ontime + echo "$i. Import $file" + time mysqlimport \ + --ignore-lines=1 \ + --fields-terminated-by=, \ + --fields-enclosed-by=\" \ + --local \ + -u root \ + airline ontime + rm -f ontime + i=$((i+1)) +done + + ``` From 670047eb69efadb35982bbb13f654456afa00139 Mon Sep 17 00:00:00 2001 From: sunsingerus Date: Thu, 16 Nov 2017 11:40:12 +0300 Subject: [PATCH 2/8] speed report --- src/reader/mysqlreader.py | 45 ++++++++++++++++++++++++++------------- 1 file changed, 30 insertions(+), 15 deletions(-) diff --git a/src/reader/mysqlreader.py b/src/reader/mysqlreader.py index f5589c5..92bc995 100644 --- a/src/reader/mysqlreader.py +++ b/src/reader/mysqlreader.py @@ -69,15 +69,30 @@ def __init__( resume_stream=self.resume_stream, ) + def speed_report(self, start, rows_num, now=None): + # time to calc stat + if now is None: + now = time.time() + window_size = now - start + rows_per_sec = rows_num / window_size + logging.info( + 'rows_per_sec:%f for last %d rows %f sec', + rows_per_sec, + rows_num, + window_size + ) + def read(self): start_timestamp = int(time.time()) # fetch events try: - prev_stat_time = time.time() - rows_num = 0 - while True: logging.debug('Check events in binlog stream') + + start = time.time() + rows_num = 0 + + # fetch available events from MySQL for mysql_event in self.binlog_stream: if isinstance(mysql_event, WriteRowsEvent): if self.subscribers('WriteRowsEvent'): @@ -100,22 +115,22 @@ def read(self): event.table = mysql_event.table event.row = row['values'] self.notify('WriteRowsEvent.EachRow', event=event) + + if rows_num % 100000 == 0: + # speed report each N rows + self.speed_report(start, rows_num) else: # skip non-insert events pass - now = time.time() - if now > prev_stat_time + 60: - # time to calc stat - window_size = now - prev_stat_time - rows_per_sec = rows_num / window_size - logging.info( - 'rows_per_sec:%f for last %f sec', - rows_per_sec, - window_size - ) - prev_stat_time = now - rows_num = 0 + # all events fetched (or none of them available) + + if rows_num > 0: + # we have some rows processed + now = time.time() + if now > start + 60: + # and processing was long enough + self.speed_report(start, rows_num, now) if not self.blocking: break # while True From 3afa7a8445e77c5a002beb488468788181ee6f26 Mon Sep 17 00:00:00 2001 From: sunsingerus Date: Thu, 16 Nov 2017 11:48:03 +0300 Subject: [PATCH 3/8] speed report --- src/reader/mysqlreader.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/reader/mysqlreader.py b/src/reader/mysqlreader.py index 92bc995..9dc3f2a 100644 --- a/src/reader/mysqlreader.py +++ b/src/reader/mysqlreader.py @@ -91,6 +91,7 @@ def read(self): start = time.time() rows_num = 0 + rows_num_since_interim_speed_report = 0 # fetch available events from MySQL for mysql_event in self.binlog_stream: @@ -99,6 +100,7 @@ def read(self): self.write_rows_event_num += 1 logging.debug('WriteRowsEvent #%d rows: %d', self.write_rows_event_num, len(mysql_event.rows)) rows_num += len(mysql_event.rows) + rows_num_since_interim_speed_report += len(mysql_event.rows) event = Event() event.schema = mysql_event.schema event.table = mysql_event.table @@ -110,15 +112,17 @@ def read(self): logging.debug('WriteRowsEvent.EachRow #%d', self.write_rows_event_each_row_num) for row in mysql_event.rows: rows_num += 1 + rows_num_since_interim_speed_report += 1 event = Event() event.schema = mysql_event.schema event.table = mysql_event.table event.row = row['values'] self.notify('WriteRowsEvent.EachRow', event=event) - if rows_num % 100000 == 0: + if rows_num_since_interim_speed_report >= 100000: # speed report each N rows self.speed_report(start, rows_num) + rows_num_since_interim_speed_report = 0 else: # skip non-insert events pass From 6675cd1dc230bcf02fe35bfe27343dd565ca2a0c Mon Sep 17 00:00:00 2001 From: sunsingerus Date: Mon, 20 Nov 2017 13:31:00 +0300 Subject: [PATCH 4/8] min and max rows per event --- run_ontime.sh | 7 +++++-- src/reader/mysqlreader.py | 42 +++++++++++++++++++++++++++++---------- 2 files changed, 37 insertions(+), 12 deletions(-) diff --git a/run_ontime.sh b/run_ontime.sh index 520ad4a..5f814e3 100755 --- a/run_ontime.sh +++ b/run_ontime.sh @@ -2,7 +2,10 @@ sudo bash -c "echo 1 > /proc/sys/net/ipv4/tcp_tw_reuse" -python3.6 main.py ${*:1} \ +PYTHON=python3.6 +PYTHON=/home/user/pypy3.5-5.9-beta-linux_x86_64-portable/bin/pypy + +$PYTHON main.py ${*:1} \ --src-resume \ --src-wait \ --nice-pause=1 \ @@ -13,7 +16,7 @@ python3.6 main.py ${*:1} \ --csvpool \ --csvpool-file-path-prefix=qwe_ \ --mempool-max-flush-interval=60 \ - --mempool-max-events-num=100000 + --mempool-max-events-num=1000 # --mempool # --mempool-max-events-num=3 diff --git a/src/reader/mysqlreader.py b/src/reader/mysqlreader.py index 9dc3f2a..351dae7 100644 --- a/src/reader/mysqlreader.py +++ b/src/reader/mysqlreader.py @@ -69,17 +69,21 @@ def __init__( resume_stream=self.resume_stream, ) - def speed_report(self, start, rows_num, now=None): + def performance_report(self, start, rows_num, rows_per_event_min=-1, rows_per_event_max=-1, now=None): # time to calc stat + if now is None: now = time.time() + window_size = now - start rows_per_sec = rows_num / window_size logging.info( - 'rows_per_sec:%f for last %d rows %f sec', + 'rows_per_sec:%f rows_per_event_min: %d rows_per_event_max: %d for last %d rows %f sec', rows_per_sec, + rows_per_event_min, + rows_per_event_max, rows_num, - window_size + window_size, ) def read(self): @@ -91,16 +95,27 @@ def read(self): start = time.time() rows_num = 0 - rows_num_since_interim_speed_report = 0 + rows_num_since_interim_performance_report = 0 + rows_per_event = 0 + rows_per_event_min = 0 + rows_per_event_max = 0 + # fetch available events from MySQL for mysql_event in self.binlog_stream: if isinstance(mysql_event, WriteRowsEvent): + + rows_per_event = len(mysql_event.rows) + if rows_per_event < rows_per_event_min: + rows_per_event_min = rows_per_event + if rows_per_event > rows_per_event_max: + rows_per_event_max = rows_per_event + if self.subscribers('WriteRowsEvent'): self.write_rows_event_num += 1 logging.debug('WriteRowsEvent #%d rows: %d', self.write_rows_event_num, len(mysql_event.rows)) rows_num += len(mysql_event.rows) - rows_num_since_interim_speed_report += len(mysql_event.rows) + rows_num_since_interim_performance_report += len(mysql_event.rows) event = Event() event.schema = mysql_event.schema event.table = mysql_event.table @@ -112,17 +127,24 @@ def read(self): logging.debug('WriteRowsEvent.EachRow #%d', self.write_rows_event_each_row_num) for row in mysql_event.rows: rows_num += 1 - rows_num_since_interim_speed_report += 1 + rows_num_since_interim_performance_report += 1 event = Event() event.schema = mysql_event.schema event.table = mysql_event.table event.row = row['values'] self.notify('WriteRowsEvent.EachRow', event=event) - if rows_num_since_interim_speed_report >= 100000: + if rows_num_since_interim_performance_report >= 100000: # speed report each N rows - self.speed_report(start, rows_num) - rows_num_since_interim_speed_report = 0 + self.performance_report( + start=start, + rows_num=rows_num, + rows_per_event_min=rows_per_event_min, + rows_per_event_max=rows_per_event_max, + ) + rows_num_since_interim_performance_report = 0 + rows_per_event_min = 0 + rows_per_event_max = 0 else: # skip non-insert events pass @@ -134,7 +156,7 @@ def read(self): now = time.time() if now > start + 60: # and processing was long enough - self.speed_report(start, rows_num, now) + self.performance_report(start, rows_num, now) if not self.blocking: break # while True From fcbb052bbec7df334a56f2359e21a152007d0bde Mon Sep 17 00:00:00 2001 From: sunsingerus Date: Mon, 20 Nov 2017 13:44:57 +0300 Subject: [PATCH 5/8] min/max and windows size --- src/pool/bbpool.py | 19 +++++++++------- src/reader/mysqlreader.py | 48 +++++++++++++++++++++------------------ 2 files changed, 37 insertions(+), 30 deletions(-) diff --git a/src/pool/bbpool.py b/src/pool/bbpool.py index 82cda55..47b9295 100644 --- a/src/pool/bbpool.py +++ b/src/pool/bbpool.py @@ -159,14 +159,17 @@ def rotate_belt(self, belt_index, flush=False): # have previous time - meaning this is at least second rotate # can calculate belt speed window_size = now - self.prev_time - buckets_per_sec = (self.buckets_count - self.prev_buckets_count)/window_size - items_per_sec = (self.items_count - self.prev_items_count) / window_size - logging.info( - 'buckets_per_sec:%f items_per_sec:%f for last %d sec', - buckets_per_sec, - items_per_sec, - window_size - ) + if window_size > 0: + buckets_per_sec = (self.buckets_count - self.prev_buckets_count)/window_size + items_per_sec = (self.items_count - self.prev_items_count) / window_size + logging.info( + 'PERF - buckets_per_sec:%f items_per_sec:%f for last %d sec', + buckets_per_sec, + items_per_sec, + window_size + ) + else: + logging.info("PERF - buckets window size=0 can not calc performance for this window") self.prev_time = now self.prev_buckets_count = self.buckets_count diff --git a/src/reader/mysqlreader.py b/src/reader/mysqlreader.py index 351dae7..4f2ff4d 100644 --- a/src/reader/mysqlreader.py +++ b/src/reader/mysqlreader.py @@ -69,22 +69,26 @@ def __init__( resume_stream=self.resume_stream, ) - def performance_report(self, start, rows_num, rows_per_event_min=-1, rows_per_event_max=-1, now=None): + def performance_report(self, start, rows_num, rows_num_per_event_min=None, rows_num_per_event_max=None, now=None): # time to calc stat if now is None: now = time.time() window_size = now - start - rows_per_sec = rows_num / window_size - logging.info( - 'rows_per_sec:%f rows_per_event_min: %d rows_per_event_max: %d for last %d rows %f sec', - rows_per_sec, - rows_per_event_min, - rows_per_event_max, - rows_num, - window_size, - ) + if window_size > 0: + rows_per_sec = rows_num / window_size + logging.info( + 'PERF - rows_per_sec:%f rows_per_event_min: %d rows_per_event_max: %d for last %d rows %f sec', + rows_per_sec, + rows_num_per_event_min if rows_num_per_event_min is not None else -1, + rows_num_per_event_max if rows_num_per_event_max is not None else -1, + rows_num, + window_size, + ) + else: + logging.info("PERF - rows window size=0 can not calc performance for this window") + def read(self): start_timestamp = int(time.time()) @@ -96,20 +100,20 @@ def read(self): start = time.time() rows_num = 0 rows_num_since_interim_performance_report = 0 - rows_per_event = 0 - rows_per_event_min = 0 - rows_per_event_max = 0 + rows_num_per_event = 0 + rows_num_per_event_min = None + rows_num_per_event_max = None # fetch available events from MySQL for mysql_event in self.binlog_stream: if isinstance(mysql_event, WriteRowsEvent): - rows_per_event = len(mysql_event.rows) - if rows_per_event < rows_per_event_min: - rows_per_event_min = rows_per_event - if rows_per_event > rows_per_event_max: - rows_per_event_max = rows_per_event + rows_num_per_event = len(mysql_event.rows) + if (rows_num_per_event_min is None) or (rows_num_per_event < rows_num_per_event_min): + rows_num_per_event_min = rows_num_per_event + if (rows_num_per_event_max is None) or (rows_num_per_event > rows_num_per_event_max): + rows_num_per_event_max = rows_num_per_event if self.subscribers('WriteRowsEvent'): self.write_rows_event_num += 1 @@ -139,12 +143,12 @@ def read(self): self.performance_report( start=start, rows_num=rows_num, - rows_per_event_min=rows_per_event_min, - rows_per_event_max=rows_per_event_max, + rows_num_per_event_min=rows_num_per_event_min, + rows_num_per_event_max=rows_num_per_event_max, ) rows_num_since_interim_performance_report = 0 - rows_per_event_min = 0 - rows_per_event_max = 0 + rows_num_per_event_min = None + rows_num_per_event_max = None else: # skip non-insert events pass From 7fdd9e2d7684c2454e2a570946dd7ebf55afb591 Mon Sep 17 00:00:00 2001 From: sunsingerus Date: Mon, 20 Nov 2017 14:04:55 +0300 Subject: [PATCH 6/8] float timestamp --- src/pool/bbpool.py | 44 ++++++++++++++++++++++---------------------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/src/pool/bbpool.py b/src/pool/bbpool.py index 47b9295..ab83770 100644 --- a/src/pool/bbpool.py +++ b/src/pool/bbpool.py @@ -31,8 +31,8 @@ class BBPool(Pool): # 'key.2': UNIX TIMESTAMP } - buckets_count = 0 - items_count = 0; + buckets_num_total = 0 + items_num_total = 0; prev_time = None prev_buckets_count = 0 @@ -98,7 +98,7 @@ def flush(self, key=None): def rotate_belt(self, belt_index, flush=False): """Try to rotate belt""" - now = int(time.time()) + now = time.time() if flush: # explicit flush requested @@ -129,25 +129,25 @@ def rotate_belt(self, belt_index, flush=False): # too many buckets on the belt # time to rotate belt and flush the most-right-bucket - buckets_num = len(self.belts[belt_index]) - last_bucket_size = len(self.belts[belt_index][buckets_num-1]) + buckets_on_belt_num = len(self.belts[belt_index]) + most_right_bucket_size = len(self.belts[belt_index][buckets_on_belt_num-1]) - self.buckets_count += 1 - self.items_count += last_bucket_size + self.buckets_num_total += 1 + self.items_num_total += most_right_bucket_size - logging.info('rot now:%d bktcnt:%d bktcontentcnt: %d index:%s reason:%s bktsonbelt:%d bktsize:%d beltnum:%d', - now, - self.buckets_count, - self.items_count, - str(belt_index), - rotate_reason, - buckets_num, - last_bucket_size, - len(self.belts) - ) + logging.info('rot now:%f bktttl:%d bktitemsttl: %d index:%s reason:%s bktsonbelt:%d bktsize:%d beltnum:%d', + now, + self.buckets_num_total, + self.items_num_total, + str(belt_index), + rotate_reason, + buckets_on_belt_num, + most_right_bucket_size, + len(self.belts), + ) # time to flush data for specified key - self.writer_builder.param('csv_file_path_suffix_parts', [str(now), str(self.buckets_count)]) + self.writer_builder.param('csv_file_path_suffix_parts', [str(int(now)), str(self.buckets_num_total)]) writer = self.writer_builder.get() writer.insert(self.belts[belt_index].pop()) writer.close() @@ -160,8 +160,8 @@ def rotate_belt(self, belt_index, flush=False): # can calculate belt speed window_size = now - self.prev_time if window_size > 0: - buckets_per_sec = (self.buckets_count - self.prev_buckets_count)/window_size - items_per_sec = (self.items_count - self.prev_items_count) / window_size + buckets_per_sec = (self.buckets_num_total - self.prev_buckets_count) / window_size + items_per_sec = (self.items_num_total - self.prev_items_count) / window_size logging.info( 'PERF - buckets_per_sec:%f items_per_sec:%f for last %d sec', buckets_per_sec, @@ -172,8 +172,8 @@ def rotate_belt(self, belt_index, flush=False): logging.info("PERF - buckets window size=0 can not calc performance for this window") self.prev_time = now - self.prev_buckets_count = self.buckets_count - self.prev_items_count = self.items_count + self.prev_buckets_count = self.buckets_num_total + self.prev_items_count = self.items_num_total # belt rotated return True From 2ca1ce680b67b350d85192f796287eb930446e04 Mon Sep 17 00:00:00 2001 From: sunsingerus Date: Mon, 20 Nov 2017 14:36:53 +0300 Subject: [PATCH 7/8] minor logging --- run_ontime.sh | 1 + src/writer/chcsvwriter.py | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/run_ontime.sh b/run_ontime.sh index 5f814e3..837ed97 100755 --- a/run_ontime.sh +++ b/run_ontime.sh @@ -10,6 +10,7 @@ $PYTHON main.py ${*:1} \ --src-wait \ --nice-pause=1 \ --log-level=info \ + --log-file=ontime.log \ --src-host=127.0.0.1 \ --src-user=root \ --dst-host=127.0.0.1 \ diff --git a/src/writer/chcsvwriter.py b/src/writer/chcsvwriter.py index d9390fb..9ca27d7 100644 --- a/src/writer/chcsvwriter.py +++ b/src/writer/chcsvwriter.py @@ -58,7 +58,7 @@ def insert(self, event_or_events=None): sql, ) -# print('running:', bash) + logging.info('starting %s', bash) os.system(bash) pass From 607ef888d3a1366987f03391140fddd9ddd4aabd Mon Sep 17 00:00:00 2001 From: sunsingerus Date: Mon, 20 Nov 2017 14:54:45 +0300 Subject: [PATCH 8/8] new csv writer each time --- src/pool/bbpool.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pool/bbpool.py b/src/pool/bbpool.py index ab83770..e628ce5 100644 --- a/src/pool/bbpool.py +++ b/src/pool/bbpool.py @@ -148,7 +148,7 @@ def rotate_belt(self, belt_index, flush=False): # time to flush data for specified key self.writer_builder.param('csv_file_path_suffix_parts', [str(int(now)), str(self.buckets_num_total)]) - writer = self.writer_builder.get() + writer = self.writer_builder.new() writer.insert(self.belts[belt_index].pop()) writer.close() writer.push()