diff --git a/clickhouse_mysql/reader/mysqlreader.py b/clickhouse_mysql/reader/mysqlreader.py
index a5d371b..2371325 100644
--- a/clickhouse_mysql/reader/mysqlreader.py
+++ b/clickhouse_mysql/reader/mysqlreader.py
@@ -3,6 +3,7 @@
 import time
 import logging
+import sys
 
 from pymysqlreplication import BinLogStreamReader
 from pymysqlreplication.row_event import WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent
@@ -109,52 +110,56 @@ def read(self):
 
         # fetch available events from MySQL
-        for mysql_event in self.binlog_stream:
-            if isinstance(mysql_event, WriteRowsEvent):
-
-                rows_num_per_event = len(mysql_event.rows)
-                if (rows_num_per_event_min is None) or (rows_num_per_event < rows_num_per_event_min):
-                    rows_num_per_event_min = rows_num_per_event
-                if (rows_num_per_event_max is None) or (rows_num_per_event > rows_num_per_event_max):
-                    rows_num_per_event_max = rows_num_per_event
-
-                if self.subscribers('WriteRowsEvent'):
-                    self.write_rows_event_num += 1
-                    logging.debug('WriteRowsEvent #%d rows: %d', self.write_rows_event_num, len(mysql_event.rows))
-                    rows_num += len(mysql_event.rows)
-                    rows_num_since_interim_performance_report += len(mysql_event.rows)
-                    event = Event()
-                    event.schema = mysql_event.schema
-                    event.table = mysql_event.table
-                    event.mysql_event = mysql_event
-                    self.notify('WriteRowsEvent', event=event)
-
-                if self.subscribers('WriteRowsEvent.EachRow'):
-                    self.write_rows_event_each_row_num += 1
-                    logging.debug('WriteRowsEvent.EachRow #%d', self.write_rows_event_each_row_num)
-                    for row in mysql_event.rows:
-                        rows_num += 1
-                        rows_num_since_interim_performance_report += 1
+        try:
+            for mysql_event in self.binlog_stream:
+                if isinstance(mysql_event, WriteRowsEvent):
+
+                    rows_num_per_event = len(mysql_event.rows)
+                    if (rows_num_per_event_min is None) or (rows_num_per_event < rows_num_per_event_min):
+                        rows_num_per_event_min = rows_num_per_event
+                    if (rows_num_per_event_max is None) or (rows_num_per_event > rows_num_per_event_max):
+                        rows_num_per_event_max = rows_num_per_event
+
+                    if self.subscribers('WriteRowsEvent'):
+                        self.write_rows_event_num += 1
+                        logging.debug('WriteRowsEvent #%d rows: %d', self.write_rows_event_num, len(mysql_event.rows))
+                        rows_num += len(mysql_event.rows)
+                        rows_num_since_interim_performance_report += len(mysql_event.rows)
                         event = Event()
                         event.schema = mysql_event.schema
                         event.table = mysql_event.table
-                        event.row = row['values']
-                        self.notify('WriteRowsEvent.EachRow', event=event)
-
-                if rows_num_since_interim_performance_report >= 100000:
-                    # speed report each N rows
-                    self.performance_report(
-                        start=start,
-                        rows_num=rows_num,
-                        rows_num_per_event_min=rows_num_per_event_min,
-                        rows_num_per_event_max=rows_num_per_event_max,
-                    )
-                    rows_num_since_interim_performance_report = 0
-                    rows_num_per_event_min = None
-                    rows_num_per_event_max = None
-            else:
-                # skip non-insert events
-                pass
+                        event.mysql_event = mysql_event
+                        self.notify('WriteRowsEvent', event=event)
+
+                    if self.subscribers('WriteRowsEvent.EachRow'):
+                        self.write_rows_event_each_row_num += 1
+                        logging.debug('WriteRowsEvent.EachRow #%d', self.write_rows_event_each_row_num)
+                        for row in mysql_event.rows:
+                            rows_num += 1
+                            rows_num_since_interim_performance_report += 1
+                            event = Event()
+                            event.schema = mysql_event.schema
+                            event.table = mysql_event.table
+                            event.row = row['values']
+                            self.notify('WriteRowsEvent.EachRow', event=event)
+
+                    if rows_num_since_interim_performance_report >= 100000:
+                        # speed report each N rows
+                        self.performance_report(
+                            start=start,
+                            rows_num=rows_num,
+                            rows_num_per_event_min=rows_num_per_event_min,
+                            rows_num_per_event_max=rows_num_per_event_max,
+                        )
+                        rows_num_since_interim_performance_report = 0
+                        rows_num_per_event_min = None
+                        rows_num_per_event_max = None
+                else:
+                    # skip non-insert events
+                    pass
+        except Exception as ex:
+            logging.critical(ex)
+            sys.exit(1)
 
         # all events fetched (or none of them available)
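Note on the hunk above: the existing event loop is reindented under a `try` block so that any exception raised while iterating the binlog stream is logged at critical level and terminates the process via `sys.exit(1)`, instead of surfacing as an unhandled traceback. Below is a minimal standalone sketch of the same pattern; it assumes a local MySQL server with row-based binlog enabled, and the connection settings and `server_id` are placeholders, not values taken from this patch:

```python
import logging
import sys

from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import WriteRowsEvent

# placeholder connection settings - adjust for your MySQL instance
stream = BinLogStreamReader(
    connection_settings={'host': '127.0.0.1', 'port': 3306, 'user': 'root', 'passwd': ''},
    server_id=1,                  # must be unique among replication clients
    only_events=[WriteRowsEvent],
    blocking=True,                # wait for new events, as --src-wait does
    resume_stream=True,           # continue from the current position, as --src-resume does
)

try:
    for event in stream:
        for row in event.rows:
            print(event.schema, event.table, row['values'])
except Exception as ex:
    # same failure policy as the patch: log and exit
    # instead of leaving the reader in an undefined state
    logging.critical(ex)
    sys.exit(1)
finally:
    stream.close()
```

One trade-off of this policy: exiting on the first error makes transient failures (e.g. a dropped connection) fatal; a longer-running reader might instead reconnect and resume. Note also that `except Exception` does not catch `KeyboardInterrupt`, so Ctrl-C still stops the reader normally.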
diff --git a/dev_run.sh b/dev_run.sh
new file mode 100755
index 0000000..69df7ec
--- /dev/null
+++ b/dev_run.sh
@@ -0,0 +1,38 @@
+#!/bin/bash
+
+# ugly stub to work around insufficient sockets (enables TIME_WAIT reuse)
+#sudo bash -c "echo 1 > /proc/sys/net/ipv4/tcp_tw_reuse"
+
+# run data reader with the specified Python version
+
+PYTHON="python3"
+
+CH_MYSQL="-m clickhouse_mysql.main"
+
+if [ ! -d "clickhouse_mysql" ]; then
+    # no clickhouse_mysql dir available - step out of examples dir
+    cd ..
+fi
+
+$PYTHON $CH_MYSQL ${*:1} \
+    --src-resume \
+    --src-wait \
+    --nice-pause=1 \
+    --log-level=info \
+    --src-host=127.0.0.1 \
+    --src-user=root \
+    --dst-host=127.0.0.1 \
+    --csvpool \
+    --csvpool-file-path-prefix=qwe_ \
+    --mempool-max-flush-interval=60 \
+    --mempool-max-events-num=10000
+
+# --log-file=ontime.log \
+# --mempool
+# --mempool-max-events-num=3
+# --mempool-max-flush-interval=30
+# --dst-file=dst.csv
+# --dst-schema=db
+# --dst-table=datatypes
+# --csvpool-keep-files
+# --log-level=info \
diff --git a/examples/airline_ontime_data_download.sh b/examples/airline_ontime_data_download.sh
index 8d9d3a9..ee4cd94 100755
--- a/examples/airline_ontime_data_download.sh
+++ b/examples/airline_ontime_data_download.sh
@@ -4,11 +4,17 @@
 ZIP_FILES_DIR=$(pwd)"/zip"
 CSV_FILES_DIR=$(pwd)"/csv"
 
+FROM_YEAR=1987
+TO_YEAR=2017
+
+FROM_MONTH=1
+TO_MONTH=12
+
 echo "Check required commands availability"
-if command -v wget && command -v unzip && command -v clickhouse-client && command -v wc && command -v awk; then
+if command -v wget && command -v unzip; then
     echo "Looks like all required commands are available"
 else
-    echo "Please ensure availability of: wget && unzip && clickhouse-client && wc && awk"
+    echo "Please ensure availability of: wget && unzip"
     exit 1
 fi
@@ -23,8 +29,8 @@
 
 echo "Download files into $ZIP_FILES_DIR"
-for year in `seq 1987 2017`; do
-    for month in `seq 1 12`; do
+for year in `seq $FROM_YEAR $TO_YEAR`; do
+    for month in `seq $FROM_MONTH $TO_MONTH`; do
         FILE_NAME="On_Time_On_Time_Performance_${year}_${month}.zip"
         wget -O "$ZIP_FILES_DIR/$FILE_NAME" "http://transtats.bts.gov/PREZIP/$FILE_NAME"
     done
 done
@@ -32,6 +38,7 @@ done
 
 echo "Unzip dataset"
 
+echo "Create dir $CSV_FILES_DIR for unzipped CSV files"
 mkdir -p "$CSV_FILES_DIR"
 if [ ! -d "$CSV_FILES_DIR" ]; then
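The download range is now parameterized: FROM_YEAR/TO_YEAR and FROM_MONTH/TO_MONTH are plain shell variables, so fetching a subset of the dataset only requires editing the top of the script. For readers who prefer to drive the download from Python, here is a rough equivalent of the parameterized loop; it uses the same defaults, URL scheme, and destination layout as the script, and is an illustration, not part of the patch:

```python
import os
import urllib.request

# defaults mirror the shell script; narrow the ranges for a partial download
FROM_YEAR, TO_YEAR = 1987, 2017
FROM_MONTH, TO_MONTH = 1, 12

zip_files_dir = os.path.join(os.getcwd(), 'zip')
os.makedirs(zip_files_dir, exist_ok=True)

for year in range(FROM_YEAR, TO_YEAR + 1):
    for month in range(FROM_MONTH, TO_MONTH + 1):
        file_name = 'On_Time_On_Time_Performance_{}_{}.zip'.format(year, month)
        url = 'http://transtats.bts.gov/PREZIP/' + file_name
        # same destination layout as the shell script: ./zip/<file>
        urllib.request.urlretrieve(url, os.path.join(zip_files_dir, file_name))
```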
-d "$CSV_FILES_DIR" ]; then diff --git a/examples/datatypes.sh b/examples/datatypes.sh index 8633145..677cdb7 100755 --- a/examples/datatypes.sh +++ b/examples/datatypes.sh @@ -20,10 +20,10 @@ $PYTHON $CH_MYSQL ${*:1} \ --csvpool \ --csvpool-file-path-prefix=qwe_ \ --csv-column-default-value \ - date_1=2000-01-01 \ - datetime_1=2000-01-01\ 01:02:03 \ - time_1=2001-01-01\ 01:02:03 \ - timestamp_1=2002-01-01\ 01:02:03 \ + date_1=2000-01-01 \ + datetime_1=2000-01-01\ 01:02:03 \ + time_1=2001-01-01\ 01:02:03 \ + timestamp_1=2002-01-01\ 01:02:03 \ --mempool-max-flush-interval=600 \ --mempool-max-events-num=900000 diff --git a/package_source_distr.sh b/package_source_distr.sh new file mode 100755 index 0000000..07306e9 --- /dev/null +++ b/package_source_distr.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +python3 setup.py sdist diff --git a/package_wheels_distr.sh b/package_wheels_distr.sh new file mode 100755 index 0000000..780d4f3 --- /dev/null +++ b/package_wheels_distr.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +python3 setup.py bdist_wheel