From 06a19d0d3a8fbf66580243d2cb86726c22d1265e Mon Sep 17 00:00:00 2001 From: Alejandro Del Amo Date: Tue, 10 Aug 2021 18:59:00 +0200 Subject: [PATCH 01/18] Added support to insert/update and CSV --- .gitignore | 3 +- clickhouse_mysql/clioptions.py | 23 +++ clickhouse_mysql/config.py | 13 +- clickhouse_mysql/event/event.py | 6 +- clickhouse_mysql/reader/mysqlreader.py | 4 +- clickhouse_mysql/writer/chcsvwriter.py | 222 ---------------------- clickhouse_mysql/writer/chwriter.py | 144 ++++++++------- clickhouse_mysql/writer/csvwriter.py | 159 +++++++++++++++- clickhouse_mysql/writer/poolwriter.py | 6 + clickhouse_mysql/writer/processwriter.py | 8 +- clickhouse_mysql/writer/tbcsvwriter.py | 223 +++++++++++++++++++++++ init/dump-tables.sh | 132 -------------- init/first-processing.sh | 18 -- init/run-listeners.sh | 130 ------------- init/stop-listeners.sh | 23 --- init/tb_tables.config | 9 - notes.txt | 3 + run-local.sh | 1 + run-test.sh | 1 + 19 files changed, 509 insertions(+), 619 deletions(-) delete mode 100644 clickhouse_mysql/writer/chcsvwriter.py create mode 100644 clickhouse_mysql/writer/tbcsvwriter.py delete mode 100755 init/dump-tables.sh delete mode 100755 init/first-processing.sh delete mode 100755 init/run-listeners.sh delete mode 100755 init/stop-listeners.sh delete mode 100644 init/tb_tables.config create mode 100644 notes.txt create mode 100755 run-local.sh create mode 100755 run-test.sh diff --git a/.gitignore b/.gitignore index c44086a..1b8fd9a 100644 --- a/.gitignore +++ b/.gitignore @@ -46,4 +46,5 @@ _build # Tinibird bl-* -out-* \ No newline at end of file +out-* +.e diff --git a/clickhouse_mysql/clioptions.py b/clickhouse_mysql/clioptions.py index 4be23a2..c46897e 100644 --- a/clickhouse_mysql/clioptions.py +++ b/clickhouse_mysql/clioptions.py @@ -93,6 +93,10 @@ class CLIOptions(Options): # # general app section # + + 'tb_host': 'https://ui.tinybird.co', + 'tb_token': None, + 'config_file': '/etc/clickhouse-mysql/clickhouse-mysql.conf', 'log_file': None, 'log_level': None, @@ -171,6 +175,20 @@ def options(self): # # general app section # + argparser.add_argument( + '--tb-host', + type=str, + default=self.default_options['tb_host'], + help='Tinybird host' + ) + + argparser.add_argument( + '--tb-token', + type=str, + default=self.default_options['tb_token'], + help='Tinybird host' + ) + argparser.add_argument( '--config-file', type=str, @@ -508,6 +526,11 @@ def options(self): # # general app section # + + 'tb_host': args.tb_host, + 'tb_token': args.tb_token, + + 'config_file': args.config_file, 'log_file': args.log_file, 'log_level': args.log_level, diff --git a/clickhouse_mysql/config.py b/clickhouse_mysql/config.py index e4551c8..1fc9030 100644 --- a/clickhouse_mysql/config.py +++ b/clickhouse_mysql/config.py @@ -6,7 +6,7 @@ from clickhouse_mysql.writer.chwriter import CHWriter from clickhouse_mysql.writer.csvwriter import CSVWriter -from clickhouse_mysql.writer.chcsvwriter import CHCSVWriter +from clickhouse_mysql.writer.tbcsvwriter import TBCSVWriter from clickhouse_mysql.writer.poolwriter import PoolWriter from clickhouse_mysql.writer.processwriter import ProcessWriter from clickhouse_mysql.objectbuilder import ObjectBuilder @@ -61,6 +61,10 @@ def __init__(self): # # # + 'tinybird': { + 'host': self.options['tb_host'], + 'token': self.options['tb_token'], + }, 'app': { 'config_file': self.options['config_file'], 'log_file': self.options['log_file'], @@ -359,8 +363,11 @@ def writer_builder_csvpool(self): 'dst_table': self.config['writer']['file']['dst_table'], 
'dst_table_prefix': self.config['writer']['file']['dst_table_prefix'], 'next_writer_builder': ObjectBuilder( - class_name=CHCSVWriter, - constructor_params=self.config['writer']['clickhouse'] + class_name=TBCSVWriter, + constructor_params={ + 'tb_host': self.config['tinybird']['host'], + 'tb_token': self.config['tinybird']['token'] + } ), 'converter_builder': self.converter_builder(CONVERTER_CSV), }) diff --git a/clickhouse_mysql/event/event.py b/clickhouse_mysql/event/event.py index e018e57..e38f80b 100644 --- a/clickhouse_mysql/event/event.py +++ b/clickhouse_mysql/event/event.py @@ -64,7 +64,11 @@ def __next__(self): if self.pymysqlreplication_event is not None: # in native replication event actual data are in row['values'] dict item - return item['values'] + if 'after_values' in item: + return item['after_values'] + else: + return item['values'] + else: # local-kept data return item diff --git a/clickhouse_mysql/reader/mysqlreader.py b/clickhouse_mysql/reader/mysqlreader.py index 659ab77..040bac0 100644 --- a/clickhouse_mysql/reader/mysqlreader.py +++ b/clickhouse_mysql/reader/mysqlreader.py @@ -316,7 +316,7 @@ def process_update_rows_event(self, mysql_event): return # statistics - #self.stat_write_rows_event_calc_rows_num_min_max(rows_num_per_event=len(mysql_event.rows)) + self.stat_write_rows_event_calc_rows_num_min_max(rows_num_per_event=len(mysql_event.rows)) if self.subscribers('UpdateRowsEvent'): # dispatch event to subscribers @@ -330,7 +330,7 @@ def process_update_rows_event(self, mysql_event): event.table = mysql_event.table event.pymysqlreplication_event = mysql_event - #self.process_first_event(event=event) + self.process_first_event(event=event) self.notify('UpdateRowsEvent', event=event) # self.stat_write_rows_event_finalyse() diff --git a/clickhouse_mysql/writer/chcsvwriter.py b/clickhouse_mysql/writer/chcsvwriter.py deleted file mode 100644 index 88571c3..0000000 --- a/clickhouse_mysql/writer/chcsvwriter.py +++ /dev/null @@ -1,222 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -import os -import logging -import shlex - -from clickhouse_mysql.writer.writer import Writer -from clickhouse_mysql.tableprocessor import TableProcessor - - -class CHCSVWriter(Writer): - """Write into ClickHouse via CSV file and clickhouse-client tool""" - - dst_schema = None - dst_table = None - dst_distribute = None - - host = None - port = None - user = None - password = None - - def __init__( - self, - connection_settings, - dst_schema=None, - dst_table=None, - dst_table_prefix=None, - dst_distribute=False, - ): - if dst_distribute and dst_schema is not None: - dst_schema += "_all" - if dst_distribute and dst_table is not None: - dst_table += "_all" - logging.info( - "CHCSWriter() connection_settings={} dst_schema={} dst_table={}".format(connection_settings, dst_schema, - dst_table)) - self.host = connection_settings['host'] - self.port = connection_settings['port'] - self.user = connection_settings['user'] - self.password = connection_settings['password'] - self.dst_schema = dst_schema - self.dst_table = dst_table - self.dst_table_prefix = dst_table_prefix - self.dst_distribute = dst_distribute - - def insert(self, event_or_events=None): - # event_or_events = [ - # event: { - # row: {'id': 3, 'a': 3} - # }, - # event: { - # row: {'id': 3, 'a': 3} - # }, - # ] - - events = self.listify(event_or_events) - if len(events) < 1: - logging.warning('No events to insert. 
class: %s', __class__) - return - - # assume we have at least one Event - - logging.debug('class:%s insert %d rows', __class__, len(events)) - - for event in events: - schema = self.dst_schema if self.dst_schema else event.schema - table = None - if self.dst_distribute: - table = TableProcessor.create_distributed_table_name(db=event.schema, table=event.table) - else: - table = self.dst_table if self.dst_table else event.table - if self.dst_schema: - table = TableProcessor.create_migrated_table_name(prefix=self.dst_table_prefix, table=table) - - sql = 'INSERT INTO `{0}`.`{1}` ({2}) FORMAT CSV'.format( - schema, - table, - ', '.join(map(lambda column: '`%s`' % column, event.fieldnames)), - ) - - choptions = "" - if self.host: - choptions += " --host=" + shlex.quote(self.host) - if self.port: - choptions += " --port=" + str(self.port) - if self.user: - choptions += " --user=" + shlex.quote(self.user) - if self.password: - choptions += " --password=" + shlex.quote(self.password) - bash = "tail -n +2 '{0}' | clickhouse-client {1} --query='{2}'".format( - event.filename, - choptions, - sql, - ) - - logging.info('starting clickhouse-client process') - logging.debug('starting %s', bash) - os.system(bash) - - pass - - def deleteRow(self, event_or_events=None): - # event_or_events = [ - # event: { - # row: {'id': 3, 'a': 3} - # }, - # event: { - # row: {'id': 3, 'a': 3} - # }, - # ] - - events = self.listify(event_or_events) - if len(events) < 1: - logging.warning('No events to delete. class: %s', __class__) - return - - # assume we have at least one Event - - logging.debug('class:%s delete %d rows', __class__, len(events)) - - for event in events: - schema = self.dst_schema if self.dst_schema else event.schema - table = None - if self.dst_distribute: - table = TableProcessor.create_distributed_table_name(db=event.schema, table=event.table) - else: - table = self.dst_table if self.dst_table else event.table - if self.dst_schema: - table = TableProcessor.create_migrated_table_name(prefix=self.dst_table_prefix, table=table) - - sql = 'ALTER TABLE `{0}`.`{1}` DELETE WHERE {2} = {3} '.format( - schema, - table, - ' AND '.join(map(lambda column: '`%s`' % column, event.fieldnames)), - ) - - choptions = "" - if self.host: - choptions += " --host=" + shlex.quote(self.host) - if self.port: - choptions += " --port=" + str(self.port) - if self.user: - choptions += " --user=" + shlex.quote(self.user) - if self.password: - choptions += " --password=" + shlex.quote(self.password) - bash = "tail -n +2 '{0}' | clickhouse-client {1} --query='{2}'".format( - event.filename, - choptions, - sql, - ) - - logging.info('starting clickhouse-client process for delete operation') - logging.debug('starting %s', bash) - os.system(bash) - - pass - - def update(self, event_or_events=None): - # event_or_events = [ - # event: { - # row: {'id': 3, 'a': 3} - # }, - # event: { - # row: {'id': 3, 'a': 3} - # }, - # ] - - logging.info('starting clickhouse-client process for update operation') - - events = self.listify(event_or_events) - if len(events) < 1: - logging.warning('No events to update. 
class: %s', __class__) - return - - # assume we have at least one Event - - logging.debug('class:%s update %d rows', __class__, len(events)) - - for event in events: - schema = self.dst_schema if self.dst_schema else event.schema - table = None - if self.dst_distribute: - table = TableProcessor.create_distributed_table_name(db=event.schema, table=event.table) - else: - table = self.dst_table if self.dst_table else event.table - if self.dst_schema: - table = TableProcessor.create_migrated_table_name(prefix=self.dst_table_prefix, table=table) - - sql = 'INSERT INTO `{0}`.`{1}` ({2}) FORMAT CSV'.format( - schema, - table, - ', '.join(map(lambda column: '`%s`' % column, event.fieldnames)), - ) - - sql = 'ALTER TABLE `{0}`.`{1}` UPDATE {3}'.format( - schema, - table, - ', '.join(map(lambda column, value: '`%s`=`%s' % column, event.fieldnames, event.fieldnames)) - ) - - choptions = "" - if self.host: - choptions += " --host=" + shlex.quote(self.host) - if self.port: - choptions += " --port=" + str(self.port) - if self.user: - choptions += " --user=" + shlex.quote(self.user) - if self.password: - choptions += " --password=" + shlex.quote(self.password) - bash = "tail -n +2 '{0}' | clickhouse-client {1} --query='{2}'".format( - event.filename, - choptions, - sql, - ) - - logging.info('starting clickhouse-client process') - logging.debug('starting %s', bash) - os.system(bash) - - pass diff --git a/clickhouse_mysql/writer/chwriter.py b/clickhouse_mysql/writer/chwriter.py index 6cca8ef..c43ec42 100644 --- a/clickhouse_mysql/writer/chwriter.py +++ b/clickhouse_mysql/writer/chwriter.py @@ -68,15 +68,25 @@ def insert(self, event_or_events=None): event_converted = None for event in events: if not event.verify: - logging.warning('Event verification failed. Skip one event. Event: %s Class: %s', event.meta(), __class__) - continue # for event + logging.warning( + 'Event verification failed. Skip one event. 
Event: %s Class: %s', event.meta(), __class__) + continue # for event event_converted = self.convert(event) for row in event_converted: + # These columns are added to identify the last change (tb_upd) and the kind of operation performed + # 0 - INSERT, 1 - UPDATE, 2 - DELETE + row['tb_upd'] = datetime.datetime.now() + row['operation'] = 0 + for key in row.keys(): - # we need to convert Decimal value to str value for suitable for table structure - if type(row[key]) == Decimal: + # we need to convert Decimal or timedelta value to str value for suitable for table structure + if type(row[key]) == [Decimal, datetime.timedelta]: row[key] = str(row[key]) + + # These columns are added to identify the last change (tb_upd) and when a row is deleted (1) + # row['tb_upd'] = datetime.datetime.now() + # row['operation'] = 0 rows.append(row) logging.debug('class:%s insert %d row(s)', __class__, len(rows)) @@ -86,13 +96,16 @@ def insert(self, event_or_events=None): schema = self.dst_schema if self.dst_schema else event_converted.schema table = None if self.dst_distribute: - table = TableProcessor.create_distributed_table_name(db=event_converted.schema, table=event_converted.table) + table = TableProcessor.create_distributed_table_name( + db=event_converted.schema, table=event_converted.table) else: table = self.dst_table if self.dst_table else event_converted.table if self.dst_schema: - table = TableProcessor.create_migrated_table_name(prefix=self.dst_table_prefix, table=table) + table = TableProcessor.create_migrated_table_name( + prefix=self.dst_table_prefix, table=table) - logging.debug("schema={} table={} self.dst_schema={} self.dst_table={}".format(schema, table, self.dst_schema, self.dst_table)) + logging.debug("schema={} table={} self.dst_schema={} self.dst_table={}".format( + schema, table, self.dst_schema, self.dst_table)) # and INSERT converted rows @@ -103,6 +116,7 @@ def insert(self, event_or_events=None): table, ', '.join(map(lambda column: '`%s`' % column, rows[0].keys())) ) + logging.debug(f"CHWRITER QUERY INSERT: {sql}") self.client.execute(sql, rows) except Exception as ex: logging.critical('QUERY FAILED') @@ -138,7 +152,6 @@ def delete_row(self, event_or_events): rows = [] event_converted = None - pk = None for event in events: if not event.verify: logging.warning('Event verification failed. Skip one event. 
Event: %s Class: %s', event.meta(), @@ -146,12 +159,20 @@ def delete_row(self, event_or_events): continue # for event event_converted = self.convert(event) - pk = event_converted.pymysqlreplication_event.primary_key for row in event_converted: + # These columns are added to identify the last change (tb_upd) and the kind of operation performed + # 0 - INSERT, 1 - UPDATE, 2 - DELETE + row['tb_upd'] = datetime.datetime.now() + row['operation'] = 2 + for key in row.keys(): - # we need to convert Decimal value to str value for suitable for table structure - if type(row[key]) == Decimal: + # we need to convert Decimal or timedelta value to str value for suitable for table structure + if type(row[key]) in [Decimal, datetime.timedelta]: row[key] = str(row[key]) + + # These columns are added to identify the last change (tb_upd) and when a row is deleted (1) + # row['tb_upd'] = datetime.datetime.now() + # row['operation'] = 2 rows.append(row) logging.debug('class:%s delete %d row(s)', __class__, len(rows)) @@ -161,37 +182,45 @@ def delete_row(self, event_or_events): schema = self.dst_schema if self.dst_schema else event_converted.schema table = None if self.dst_distribute: - table = TableProcessor.create_distributed_table_name(db=event_converted.schema, table=event_converted.table) + table = TableProcessor.create_distributed_table_name( + db=event_converted.schema, table=event_converted.table) else: table = self.dst_table if self.dst_table else event_converted.table if self.dst_schema: - table = TableProcessor.create_migrated_table_name(prefix=self.dst_table_prefix, table=table) + table = TableProcessor.create_migrated_table_name( + prefix=self.dst_table_prefix, table=table) logging.debug("schema={} table={} self.dst_schema={} self.dst_table={}".format(schema, table, self.dst_schema, self.dst_table)) # and DELETE converted rows - sql = '' - # try: - # sql = 'ALTER TABLE `{0}`.`{1}` DELETE WHERE {2} = {3} '.format( - # schema, - # table, - # ' AND '.join(map(lambda column: '`%s`' % column, event.fieldnames)), - # ) - # self.client.execute(sql, rows) + # These columns are added to identify the last change (tb_upd) and the kind of operation performed + # 0 - INSERT, 1 - UPDATE, 2 - DELETE + rows[0]['tb_upd'] = datetime.datetime.now() + rows[0]['operation'] = 2 sql = '' try: - sql = 'ALTER TABLE `{0}`.`{1}` DELETE WHERE {2}'.format( + sql = 'INSERT INTO `{0}`.`{1}` ({2}) VALUES'.format( schema, table, - ' and '.join(filter(None, map( - lambda column, value: "" if column != pk else self.get_data_format(column, value), - row.keys(), row.values()))) + ', '.join(map(lambda column: '`%s`' % column, rows[0].keys())) ) + logging.debug(f"CHWRITER QUERY DELETE: {sql}") + self.client.execute(sql, rows) - self.client.execute(sql) + # sql = '' + # try: + # sql = 'ALTER TABLE `{0}`.`{1}` DELETE WHERE {2}'.format( + # schema, + # table, + # ' and '.join(filter(None, map( + # lambda column, value: "" if column != pk else self.get_data_format(column, value), + # row.keys(), row.values()))) + # ) + # + # self.client.execute(sql) except Exception as ex: logging.critical('QUERY FAILED') @@ -204,6 +233,7 @@ def delete_row(self, event_or_events): """ Get string format pattern for update and delete operations """ + def get_data_format(self, column, value): t = type(value) if t == str: @@ -245,7 +275,6 @@ def update(self, event_or_events): rows = [] event_converted = None - pk = None for event in events: if not event.verify: logging.warning('Event verification failed. Skip one event. 
Event: %s Class: %s', event.meta(), @@ -253,18 +282,18 @@ def update(self, event_or_events): continue # for event event_converted = self.convert(event) - pk = [event_converted.pymysqlreplication_event.primary_key] - if event_converted.table == 'assets': - pk.append('name') - pk.append('title_id') - pk.append('company_id') - pk.append('asset_type_enumeration_entry_id') for row in event_converted.pymysqlreplication_event.rows: + for key in row['after_values'].keys(): - # we need to convert Decimal value to str value for suitable for table structure - if type(row['after_values'][key]) == Decimal: - row['after_values'][key] = str(row['after_values'][key]) - rows.append(row) + # we need to convert Decimal or timedelta value to str value for suitable for table structure + if type(row['after_values'][key]) in [Decimal, datetime.timedelta]: + row['after_values'][key] = str( + row['after_values'][key]) + + # These columns are added to identify the last change (tb_upd) and when a row is deleted (1) + row['after_values']['tb_upd'] = datetime.datetime.now() + row['after_values']['operation'] = 1 + rows.append(row['after_values']) logging.debug('class:%s update %d row(s)', __class__, len(rows)) @@ -273,55 +302,42 @@ def update(self, event_or_events): schema = self.dst_schema if self.dst_schema else event_converted.schema table = None if self.dst_distribute: - table = TableProcessor.create_distributed_table_name(db=event_converted.schema, table=event_converted.table) + table = TableProcessor.create_distributed_table_name( + db=event_converted.schema, table=event_converted.table) else: table = self.dst_table if self.dst_table else event_converted.table if self.dst_schema: - table = TableProcessor.create_migrated_table_name(prefix=self.dst_table_prefix, table=table) + table = TableProcessor.create_migrated_table_name( + prefix=self.dst_table_prefix, table=table) logging.debug("schema={} table={} self.dst_schema={} self.dst_table={}".format(schema, table, self.dst_schema, self.dst_table)) # and UPDATE converted rows - # improve performance updating just those fields which have actually changed - updated_values = dict(set(row['after_values'].items()).difference(set(row['before_values'].items()))) - + + # These columns are added to identify the last change (tb_upd) and when a row is deleted (1) + rows[0]['tb_upd'] = datetime.datetime.now() + rows[0]['operation'] = 1 + sql = '' try: - # sql = 'ALTER TABLE `{0}`.`{1}` UPDATE {2}, `tb_upd`={3} where {4}'.format( - # schema, - # table, - # ', '.join(filter(None, map(lambda column, value: "" if column in pk or value is None else self.get_data_format(column, value), row['after_values'].keys(), row['after_values'].values()))), - # "'%s'" % datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), - # ' and '.join(filter(None, map( - # lambda column, value: "" if column not in pk or value is None else self.get_data_format(column, value), - # row['before_values'].keys(), row['before_values'].values()))) - # ) - - sql = 'ALTER TABLE `{0}`.`{1}` UPDATE {2}, `tb_upd`={3} where {4}'.format( + sql = 'INSERT INTO `{0}`.`{1}` ({2}) VALUES'.format( schema, table, - ', '.join(filter(None, map(lambda column, value: "" if column in pk or value is None else self.get_data_format(column, value), updated_values.keys(), updated_values.values()))), - "'%s'" % datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), - ' and '.join(filter(None, map( - lambda column, value: "" if column not in pk or value is None else self.get_data_format(column, value), - row['before_values'].keys(), 
row['before_values'].values()))) + ', '.join(map(lambda column: '`%s`' % column, rows[0].keys())) ) - - logging.debug("SQL UPDATE: \n\n " + sql + "\n\n") - - self.client.execute(sql) + logging.debug(f"CHWRITER QUERY UPDATE: {sql}") + self.client.execute(sql, rows) except Exception as ex: logging.critical('QUERY FAILED') logging.critical('ex={}'.format(ex)) logging.critical('sql={}'.format(sql)) + logging.critical('data={}'.format(rows)) # sys.exit(0) # all DONE - - if __name__ == '__main__': connection_settings = { 'host': '192.168.74.230', diff --git a/clickhouse_mysql/writer/csvwriter.py b/clickhouse_mysql/writer/csvwriter.py index 18cfda6..00c6eaf 100644 --- a/clickhouse_mysql/writer/csvwriter.py +++ b/clickhouse_mysql/writer/csvwriter.py @@ -11,6 +11,10 @@ from clickhouse_mysql.writer.writer import Writer from clickhouse_mysql.event.event import Event +import datetime + +from pymysqlreplication.row_event import WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent + class CSVWriter(Writer): """Write CSV files""" @@ -89,6 +93,7 @@ def open(self): # open file for write-at-the-end mode self.file = open(self.path, 'a+') + def insert(self, event_or_events): # event_or_events = [ # event: { @@ -118,7 +123,14 @@ def insert(self, event_or_events): logging.warning('Event verification failed. Skip insert(). Event: %s Class: %s', event.meta(), __class__) return - self.fieldnames = sorted(self.convert(copy.copy(event.first_row())).keys()) + event_converted = self.convert(event) + rows = event_converted.pymysqlreplication_event.rows + headers = list(rows[0]['values'].keys()) + headers.append('operation') + headers.append('tb_upd') + + # self.fieldnames = sorted(self.convert(copy.copy(event.first_row())).keys()) + self.fieldnames = headers if self.dst_schema is None: self.dst_schema = event.schema if self.dst_table is None: @@ -132,21 +144,148 @@ def insert(self, event_or_events): if not event.verify: logging.warning('Event verification failed. Skip one event. Event: %s Class: %s', event.meta(), __class__) continue # for event - for row in event: - self.writer.writerow(self.convert(row)) + self.generate_row(event) + + def delete_row(self, event_or_events): + + # event_or_events = [ + # event: { + # row: {'id': 3, 'a': 3} + # }, + # event: { + # row: {'id': 3, 'a': 3} + # }, + # ] - def deleteRow(self, event_or_events): - """ - TODO - """ logging.debug("Delete CSV Writer") + events = self.listify(event_or_events) + if len(events) < 1: + logging.warning('No events to delete. class: %s', __class__) + return + + # assume we have at least one Event + + logging.debug('class:%s delete %d events', __class__, len(events)) + + if not self.opened(): + self.open() + + if not self.writer: + # pick any event from the list + event = events[0] + if not event.verify: + logging.warning('Event verification failed. Skip insert(). Event: %s Class: %s', event.meta(), __class__) + return + + event_converted = self.convert(event) + rows = event_converted.pymysqlreplication_event.rows + headers = list(rows[0]['values'].keys()) + headers.append('operation') + headers.append('tb_upd') + + self.fieldnames = headers + if self.dst_schema is None: + self.dst_schema = event.schema + if self.dst_table is None: + self.dst_table = event.table + + self.writer = csv.DictWriter(self.file, fieldnames=self.fieldnames) + if not self.header_written: + self.writer.writeheader() + + for event in events: + if not event.verify: + logging.warning('Event verification failed. Skip one event. 
Event: %s Class: %s', event.meta(), __class__) + continue # for event + self.generate_row(event) + + + def update(self, event_or_events): - """ - TODO - """ + + # event_or_events = [ + # event: { + # row: { + # 'before_values': {'id': 3, 'a': 3}, + # 'after_values': {'id': 3, 'a': 2} + # } + # }, + # event: { + # row: { + # 'before_values': {'id': 2, 'a': 3}, + # 'after_values': {'id': 2, 'a': 2} + # } + # }, + # ] + logging.debug("Update CSV Writer") + events = self.listify(event_or_events) + if len(events) < 1: + logging.warning('No events to update. class: %s', __class__) + return + + # assume we have at least one Event + + logging.debug('class:%s updated %d events', __class__, len(events)) + + if not self.opened(): + self.open() + + if not self.writer: + # pick any event from the list + event = events[0] + if not event.verify: + logging.warning('Event verification failed. Skip insert(). Event: %s Class: %s', event.meta(), __class__) + return + + event_converted = self.convert(event) + rows = event_converted.pymysqlreplication_event.rows + headers = list(rows[0]['after_values'].keys()) + headers.append('operation') + headers.append('tb_upd') + + # self.fieldnames = sorted(headers) + self.fieldnames = headers + if self.dst_schema is None: + self.dst_schema = event.schema + if self.dst_table is None: + self.dst_table = event.table + + self.writer = csv.DictWriter(self.file, fieldnames=self.fieldnames) + if not self.header_written: + self.writer.writeheader() + + for event in events: + if not event.verify: + logging.warning('Event verification failed. Skip one event. Event: %s Class: %s', event.meta(), __class__) + continue # for event + + event_converted = self.convert(event) + self.generate_row(event_converted) + + + def generate_row(self, event): + """ When using mempool or csvpool events are cached so you can receive different kind of events in the same list. 
These events should be handled in a different way """ + + if isinstance(event.pymysqlreplication_event, WriteRowsEvent): + for row in event: + row['tb_upd'] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + row['operation'] = 0 + self.writer.writerow(self.convert(row)) + elif isinstance(event.pymysqlreplication_event, DeleteRowsEvent): + for row in event: + row['tb_upd'] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + row['operation'] = 2 + self.writer.writerow(self.convert(row)) + elif isinstance(event.pymysqlreplication_event, UpdateRowsEvent): + for row in event.pymysqlreplication_event.rows: + row['after_values']['tb_upd'] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + row['after_values']['operation'] = 1 + self.writer.writerow(self.convert(row['after_values'])) + + def push(self): if not self.next_writer_builder or not self.fieldnames: return diff --git a/clickhouse_mysql/writer/poolwriter.py b/clickhouse_mysql/writer/poolwriter.py index 129f05a..071a6ef 100644 --- a/clickhouse_mysql/writer/poolwriter.py +++ b/clickhouse_mysql/writer/poolwriter.py @@ -38,11 +38,17 @@ def insert(self, event_or_events): self.pool.insert(event_or_events) + # TODO delete if delete_row works def delete(self, event_or_events): """Insert delete data into Pool""" logging.debug('class:%s delete', __class__) self.pool.insert(event_or_events) + def delete_row(self, event_or_events): + """Insert delete data into Pool""" + logging.debug('class:%s delete', __class__) + self.pool.insert(event_or_events) + def update(self, event_or_events): """Insert update data into Pool""" logging.debug('class:%s update', __class__) diff --git a/clickhouse_mysql/writer/processwriter.py b/clickhouse_mysql/writer/processwriter.py index 8177345..b3584f2 100644 --- a/clickhouse_mysql/writer/processwriter.py +++ b/clickhouse_mysql/writer/processwriter.py @@ -40,22 +40,22 @@ def processDelete(self, event_or_events=None): logging.debug('class:%s process()', __class__) writer = self.next_writer_builder.get() - writer.deleteRow(event_or_events) + writer.delete_row(event_or_events) writer.close() writer.push() writer.destroy() - logging.debug('class:%s process() done', __class__) + logging.debug('class:%s processDelete() done', __class__) def processUpdate(self, event_or_events=None): """Separate process body to be run""" logging.debug('class:%s process()', __class__) writer = self.next_writer_builder.get() - writer.delete(event_or_events) + writer.update(event_or_events) writer.close() writer.push() writer.destroy() - logging.debug('class:%s process() done', __class__) + logging.debug('class:%s processUpdate() done', __class__) def insert(self, event_or_events=None): # event_or_events = [ diff --git a/clickhouse_mysql/writer/tbcsvwriter.py b/clickhouse_mysql/writer/tbcsvwriter.py new file mode 100644 index 0000000..60482f0 --- /dev/null +++ b/clickhouse_mysql/writer/tbcsvwriter.py @@ -0,0 +1,223 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import os +import logging +import shlex + +from clickhouse_mysql.writer.writer import Writer +from clickhouse_mysql.tableprocessor import TableProcessor + +import requests +from requests_toolbelt.multipart.encoder import MultipartEncoder +import json + +class TBCSVWriter(Writer): + """Write into Tinybird via CSV file""" + + dst_schema = None + dst_table = None + dst_distribute = None + + tb_host = None + tb_token = None + + def __init__( + self, + tb_host, + tb_token, + dst_schema=None, + dst_table=None, + dst_table_prefix=None, + dst_distribute=False, + ): + # if 
dst_distribute and dst_schema is not None: + # dst_schema += "_all" + # if dst_distribute and dst_table is not None: + # dst_table += "_all" + # logging.info( + # "CHCSWriter() connection_settings={} dst_schema={} dst_table={}".format(connection_settings, dst_schema, + # dst_table)) + self.tb_host = tb_host + self.tb_token = tb_token + + if self.tb_host is None or self.tb_token is None: + logging.critical(f" Host: {self.tb_host} or token {self.tb_token} is missing") + return None + + self.dst_schema = dst_schema + self.dst_table = dst_table + self.dst_table_prefix = dst_table_prefix + self.dst_distribute = dst_distribute + + def insert(self, event_or_events=None): + # event_or_events = [ + # event: { + # row: {'id': 3, 'a': 3} + # }, + # event: { + # row: {'id': 3, 'a': 3} + # }, + # ] + + events = self.listify(event_or_events) + if len(events) < 1: + logging.warning('No events to insert. class: %s', __class__) + return + + # assume we have at least one Event + + logging.debug('class:%s insert %d rows', __class__, len(events)) + + for event in events: + #schema = self.dst_schema if self.dst_schema else event.schema + #table = self.dst_table if self.dst_table else event.table + + params = { + 'name': self.dst_table, + 'mode': 'append' + } + + f = open(event.filename, 'rb') + m = MultipartEncoder(fields={'csv': ('csv', f, 'text/csv')}) + + url = f"{self.tb_host}/v0/datasources" + + response = requests.post(url, data=m, + headers={'Authorization': 'Bearer ' + self.tb_token, 'Content-Type': m.content_type}, + params=params + ) + + # logging.debug(response.text) + if response.status_code == 200: + json_object = json.loads(response.content) + logging.debug(f"Import id: {json_object['import_id']}") + # logging.debug(f"Response: {json.dumps(json_object, indent=2)}") + + else: + logging.debug(f"ERROR {response.text}") + + pass + + def deleteRow(self, event_or_events=None): + # event_or_events = [ + # event: { + # row: {'id': 3, 'a': 3} + # }, + # event: { + # row: {'id': 3, 'a': 3} + # }, + # ] + + # events = self.listify(event_or_events) + # if len(events) < 1: + # logging.warning('No events to delete. 
class: %s', __class__) + # return + + # # assume we have at least one Event + + # logging.debug('class:%s delete %d rows', __class__, len(events)) + + # for event in events: + # schema = self.dst_schema if self.dst_schema else event.schema + # table = None + # if self.dst_distribute: + # table = TableProcessor.create_distributed_table_name(db=event.schema, table=event.table) + # else: + # table = self.dst_table if self.dst_table else event.table + # if self.dst_schema: + # table = TableProcessor.create_migrated_table_name(prefix=self.dst_table_prefix, table=table) + + # sql = 'ALTER TABLE `{0}`.`{1}` DELETE WHERE {2} = {3} '.format( + # schema, + # table, + # ' AND '.join(map(lambda column: '`%s`' % column, event.fieldnames)), + # ) + + # choptions = "" + # if self.host: + # choptions += " --host=" + shlex.quote(self.host) + # if self.port: + # choptions += " --port=" + str(self.port) + # if self.user: + # choptions += " --user=" + shlex.quote(self.user) + # if self.password: + # choptions += " --password=" + shlex.quote(self.password) + # bash = "tail -n +2 '{0}' | clickhouse-client {1} --query='{2}'".format( + # event.filename, + # choptions, + # sql, + # ) + + # logging.info('starting clickhouse-client process for delete operation') + # logging.debug('starting %s', bash) + # os.system(bash) + + logging.debug("CHCSVWriter: delete row") + pass + + def update(self, event_or_events=None): + # event_or_events = [ + # event: { + # row: {'id': 3, 'a': 3} + # }, + # event: { + # row: {'id': 3, 'a': 3} + # }, + # ] + + # logging.info('starting clickhouse-client process for update operation') + + # events = self.listify(event_or_events) + # if len(events) < 1: + # logging.warning('No events to update. class: %s', __class__) + # return + + # # assume we have at least one Event + + # logging.debug('class:%s update %d rows', __class__, len(events)) + + # for event in events: + # schema = self.dst_schema if self.dst_schema else event.schema + # table = None + # if self.dst_distribute: + # table = TableProcessor.create_distributed_table_name(db=event.schema, table=event.table) + # else: + # table = self.dst_table if self.dst_table else event.table + # if self.dst_schema: + # table = TableProcessor.create_migrated_table_name(prefix=self.dst_table_prefix, table=table) + + # sql = 'INSERT INTO `{0}`.`{1}` ({2}) FORMAT CSV'.format( + # schema, + # table, + # ', '.join(map(lambda column: '`%s`' % column, event.fieldnames)), + # ) + + # sql = 'ALTER TABLE `{0}`.`{1}` UPDATE {3}'.format( + # schema, + # table, + # ', '.join(map(lambda column, value: '`%s`=`%s' % column, event.fieldnames, event.fieldnames)) + # ) + + # choptions = "" + # if self.host: + # choptions += " --host=" + shlex.quote(self.host) + # if self.port: + # choptions += " --port=" + str(self.port) + # if self.user: + # choptions += " --user=" + shlex.quote(self.user) + # if self.password: + # choptions += " --password=" + shlex.quote(self.password) + # bash = "tail -n +2 '{0}' | clickhouse-client {1} --query='{2}'".format( + # event.filename, + # choptions, + # sql, + # ) + + # logging.info('starting clickhouse-client process') + # logging.debug('starting %s', bash) + # os.system(bash) + + logging.debug("CHCSVWriter: delete row") + + pass diff --git a/init/dump-tables.sh b/init/dump-tables.sh deleted file mode 100755 index 25eb02b..0000000 --- a/init/dump-tables.sh +++ /dev/null @@ -1,132 +0,0 @@ -#!/bin/bash - -if [ "$#" -ne 1 ]; then - echo "Usage: $0 " - exit -1 -fi - -DUMP_PATH=$1 - -source tb_tables.config - -########### -### titles 
-########### - -echo "Dumping titles" -mysqldump --host=127.0.0.1 --port=3307 --user=tinybird --password=goo7eu9AeS3i --single-transaction --quick movida_preproduction titles > $DUMP_PATH/titles.sql - -echo "use $TB_DATABASE;" > $DUMP_PATH/titles-insert-tb.sql -cat $DUMP_PATH/titles.sql | grep "INSERT INTO" >> $DUMP_PATH/titles-insert-tb.sql -sed -i 's/INSERT INTO `titles` VALUES/INSERT INTO `t_8a192b9c7ece4572a5a2fc9858e26d5c` (`id`, `name`, `licensor_id`, `created_at`, `updated_at`, `company_id`, `series_id`, `external_id`, `poster_file_name`, `poster_content_type`, `poster_file_size`, `poster_updated_at`, `episode_number`, `dirty_episode_number`, `rights_count`, `blackouts_count`, `denied_rights_count`, `images_count`, `cover_image_id`, `title_type`, `metadata_updated_at`, `promoted_content_id`, `promoted_content_type`, `soft_destroyed`, `credits_count`, `translated_attributes`, `rules_count`, `discarded`, `episode_reference_id`, `brand_id`) VALUES/g' $DUMP_PATH/titles-insert-tb.sql - -echo "Truncate titles table" -echo "truncate $TB_DATABASE.$TITLES_TABLE" | ~/tinybird/bin/ch/ch-20.7.2.30/ClickHouse/build/programs/clickhouse-client -mn - -echo "Loading titles into CH" -cat $DUMP_PATH/titles-insert-tb.sql | ~/tinybird/bin/ch/ch-20.7.2.30/ClickHouse/build/programs/clickhouse-client -mn -echo "Titles loaded" - -read -p "Press enter to continue" - -########### -### assets -########### - -echo "Dumping assets" -mysqldump --host=127.0.0.1 --port=3307 --user=tinybird --password=goo7eu9AeS3i --single-transaction --quick movida_preproduction assets > $DUMP_PATH/assets.sql - -echo "use $TB_DATABASE;" > $DUMP_PATH/assets-insert-tb.sql -cat $DUMP_PATH/assets.sql | grep "INSERT INTO" >> $DUMP_PATH/assets-insert-tb.sql -sed -i 's/INSERT INTO `assets` VALUES/INSERT INTO `t_4c03fdeb4e3e4db784ead40b06ec8617` (`id`, `name`, `title_id`, `created_at`, `updated_at`, `description`, `runtime_in_milliseconds`, `metadata_updated_at`, `company_id`, `asset_type_enumeration_entry_id`, `external_id`) VALUES/g' $DUMP_PATH/assets-insert-tb.sql - -echo "Truncate assets table" -echo "truncate $TB_DATABASE.$ASSETS_TABLE" | ~/tinybird/bin/ch/ch-20.7.2.30/ClickHouse/build/programs/clickhouse-client -mn - -echo "Loading assets into CH" -cat $DUMP_PATH/assets-insert-tb.sql | ~/tinybird/bin/ch/ch-20.7.2.30/ClickHouse/build/programs/clickhouse-client -mn -echo "Assets loaded" - -read -p "Press enter to continue" - -####################### -### Collection-entries -####################### - -echo "Dumping collection-entries" -mysqldump --host=127.0.0.1 --port=3307 --user=tinybird --password=goo7eu9AeS3i --single-transaction --quick movida_preproduction collection_entries > $DUMP_PATH/collections.sql - -echo "use $TB_DATABASE;" > $DUMP_PATH/collections-insert-tb.sql -cat $DUMP_PATH/collections.sql | grep "INSERT INTO" >> $DUMP_PATH/collections-insert-tb.sql -sed -i 's/INSERT INTO `collection_entries` VALUES/INSERT INTO `t_3dd7b323438943c687bd4e13a0e181a1` (`collection_id`, `title_id`, `id`, `position`) VALUES/g' $DUMP_PATH/collections-insert-tb.sql - -echo "Truncate collections table" -echo "truncate $TB_DATABASE.$COLLECTIONS_TABLE" | ~/tinybird/bin/ch/ch-20.7.2.30/ClickHouse/build/programs/clickhouse-client -mn - -echo "Loading collection-entries into CH" -cat $DUMP_PATH/collections-insert-tb.sql | ~/tinybird/bin/ch/ch-20.7.2.30/ClickHouse/build/programs/clickhouse-client -mn -echo "Collection-entries loaded" - -read -p "Press enter to continue" - -############## -### Features -############## - -echo "Dumping features" 
-mysqldump --host=127.0.0.1 --port=3307 --user=tinybird --password=goo7eu9AeS3i --single-transaction --quick movida_preproduction features > $DUMP_PATH/features.sql - -echo "use $TB_DATABASE;" > $DUMP_PATH/features-insert-tb.sql -read -p "Press enter to continue use" -cat $DUMP_PATH/features.sql | grep "INSERT INTO" >> $DUMP_PATH/features-insert-tb.sql -read -p "Press enter to continue insert" -sed -i 's/INSERT INTO `features` VALUES/INSERT INTO `t_23f41723e0eb480088cbb1c8f890a38c` (`id`, `name`, `enabled`, `company_id`, `created_at`, `updated_at`) VALUES/g' $DUMP_PATH/features-insert-tb.sql -read -p "Press enter to continue sed" -echo "Truncate features table" -echo "truncate $TB_DATABASE.$FEATURES_TABLE" | ~/tinybird/bin/ch/ch-20.7.2.30/ClickHouse/build/programs/clickhouse-client -mn - -echo "Loading features into CH" -cat $DUMP_PATH/features-insert-tb.sql | ~/tinybird/bin/ch/ch-20.7.2.30/ClickHouse/build/programs/clickhouse-client -mn -echo "Features loaded" - -read -p "Press enter to continue" - -############## -### Platforms -############## - -echo "Dumping platforms" -mysqldump --host=127.0.0.1 --port=3307 --user=tinybird --password=goo7eu9AeS3i --single-transaction --quick movida_preproduction platforms > $DUMP_PATH/platforms.sql - -echo "use $TB_DATABASE;" > $DUMP_PATH/platforms-insert-tb.sql -cat $DUMP_PATH/platforms.sql | grep "INSERT INTO" >> $DUMP_PATH/platforms-insert-tb.sql -sed -i 's/INSERT INTO `platforms` VALUES/INSERT INTO `t_83f598dc74254de68216a7c7735caffb` (`id`, `company_id`, `name`, `created_at`, `updated_at`, `sequence_service_titles_url`, `_deprecated_sequence_template_name`, `_deprecated_owned`, `sequence_template_url`, `metadata_constant_name`, `outlet_id`, `automatic_publication_enabled`, `metadata_updated_at`, `granted_categories`, `external_id`, `timezone`) VALUES/g' $DUMP_PATH/platforms-insert-tb.sql - -echo "Truncate platforms table" -echo "truncate $TB_DATABASE.$PLATFORMS_TABLE" | ~/tinybird/bin/ch/ch-20.7.2.30/ClickHouse/build/programs/clickhouse-client -mn - -echo "Loading platforms into CH" -cat $DUMP_PATH/platforms-insert-tb.sql | ~/tinybird/bin/ch/ch-20.7.2.30/ClickHouse/build/programs/clickhouse-client -mn -echo "Platforms loaded" - -read -p "Press enter to continue" - -################# -### Schedulings -################# - -echo "Dumping schedulings" -mysqldump --host=127.0.0.1 --port=3307 --user=tinybird --password=goo7eu9AeS3i --single-transaction --quick movida_preproduction schedulings > $DUMP_PATH/schedulings.sql - -echo "use $TB_DATABASE;" > $DUMP_PATH/schedulings-insert-tb.sql -cat $DUMP_PATH/schedulings.sql | grep "INSERT INTO" >> $DUMP_PATH/schedulings-insert-tb.sql -sed -i 's/INSERT INTO `schedulings` VALUES/INSERT INTO `t_b5e541d4e73d4301ba736c427bd667c5` (`id`, `title_id`, `put_up`, `take_down`, `created_at`, `updated_at`, `cleared`, `platform_id`, `rule_id`, `workflow_offset`, `sequence_asset_url`, `sequence_asset_name`, `workflow_sent`, `status`, `asset_id`, `rule_asset_id`, `title_group_id`, `workflow_web_url`, `_deprecated_publication_status`, `published_at`, `_prev_put_up`, `_prev_take_down`, `_pending_simulation`, `workflow_template_url`, `original_draft_scheduling_id`, `playlist_id`, `updating_playlist`, `workflow_job_url`, `workflow_status`, `conflict_types`, `metadata_updated_at`, `company_id`, `cached_title_episode_number`, `metadata_status`, `publication_status`, `publication_status_updated_at`, `metadata_status_updated_at`, `external_id`, `disabled_at`, `scheduling_type`, `overridden_rule_attributes`, `update_in_progress`, 
`metadata_error_digest`) VALUES/g' $DUMP_PATH/schedulings-insert-tb.sql - -echo "Truncate schedulings table" -echo "truncate $TB_DATABASE.$SCHEDULINGS_TABLE" | ~/tinybird/bin/ch/ch-20.7.2.30/ClickHouse/build/programs/clickhouse-client -mn - -echo "Loading schedulings into CH" -cat $DUMP_PATH/schedulings-insert-tb.sql | ~/tinybird/bin/ch/ch-20.7.2.30/ClickHouse/build/programs/clickhouse-client -mn -echo "Schedulings loaded" - -echo "Process finished!" \ No newline at end of file diff --git a/init/first-processing.sh b/init/first-processing.sh deleted file mode 100755 index 7daa44c..0000000 --- a/init/first-processing.sh +++ /dev/null @@ -1,18 +0,0 @@ -#!/bin/bash - -if [ "$#" -ne 1 ]; then - echo "Usage: $0 " - exit -1 -fi - -echo "Generate binlog timelog" -./run-listener.sh -./stop-listeners.sh - -echo "Generating dumps and loading data ..." -./dump-tables.sh $1 - -echo "Starting listeners" -./run-listener.sh - -echo "Done!" \ No newline at end of file diff --git a/init/run-listeners.sh b/init/run-listeners.sh deleted file mode 100755 index 21a7502..0000000 --- a/init/run-listeners.sh +++ /dev/null @@ -1,130 +0,0 @@ -#!/bin/bash - -LOG_LEVEL=debug - -SOURCE_HOST=127.0.0.1 -SOURCE_PORT=3307 -DESTINATION_HOST=127.0.0.1 -SOURCE_USER=tinybird -SOURCE_PASSWD=goo7eu9AeS3i - -PID_LOG_FILE=/tmp/listeners-pid.log - -source tb_tables.config - -############################################################ -# Run a process to synchronize MySQL table using binlog. -# -# $1 --> Source schema -# $2 --> Source table -# $3 --> Destination schema -# $4 --> Destination table -# $5 --> Server id -# $6 --> Log file -# $7 --> Binlog position file -# -############################################################# -function run_listener() { - - (clickhouse-mysql --src-server-id=$5 --src-wait --src-resume --binlog-position-file $7 --nice-pause=1 --src-host=$SOURCE_HOST --src-port=$SOURCE_PORT --src-user=$SOURCE_USER --src-password=$SOURCE_PASSWD --src-schemas=$1 --src-tables=$2 --dst-host=$DESTINATION_HOST --dst-schema=$3 --dst-table=$4 --log-level=$LOG_LEVEL --pump-data 2>> $6)& - -} - -function run_schedulings() { - if [ $binlog == "true" ]; then - rm "bl-pos-collections" - fi - - run_listener "movida_preproduction" "schedulings" "$TB_DATABASE" "$SCHEDULINGS_TABLE" "91" "out-schedulings.log" "bl-pos-schedulings" - echo $! > $PID_LOG_FILE - -} - -function run_platforms() { - if [ $binlog == "true" ]; then - rm "bl-pos-collections" - fi - - run_listener "movida_preproduction" "platforms" "$TB_DATABASE" "$PLATFORMS_TABLE" "92" "out-platforms.log" "bl-pos-platforms" - echo $! >> $PID_LOG_FILE - -} - -function run_titles() { - if [ $binlog == "true" ]; then - rm "bl-pos-collections" - fi - - run_listener "movida_preproduction" "titles" "$TB_DATABASE" "$TITLES_TABLE" "93" "out-titles.log" "bl-pos-titles" - echo $! >> $PID_LOG_FILE -} - -function run_assets() { - if [ $binlog == "true" ]; then - rm "bl-pos-collections" - fi - - run_listener "movida_preproduction" "assets" "$TB_DATABASE" "$ASSETS_TABLE" "94" "out-assets.log" "bl-pos-assets" - echo $! >> $PID_LOG_FILE -} - -function run_features() { - if [ $binlog == "true" ]; then - rm "bl-pos-collections" - fi - - run_listener "movida_preproduction" "features" "$TB_DATABASE" "$FEATURES_TABLE" "95" "out-features.log" "bl-pos-features" - echo $! 
>> $PID_LOG_FILE -} - -function run_collections() { - if [ $binlog == "true" ]; then - rm "bl-pos-collections" - fi - - run_listener "movida_preproduction" "collection_entries" "$TB_DATABASE" "$COLLECTIONS_TABLE" "96" "out-collections.log" "bl-pos-collections" - echo $! >> $PID_LOG_FILE -} - -function usage { - echo "usage: $0 -d datasource [-b clean_binlog]" - echo " -d datasource datasource to syn. Use all for synchronizing all available datasources." - echo " - all" - echo " - schedulings" - echo " - platforms" - echo " - titles" - echo " - assets" - echo " - features" - echo " - collections" - echo " -b clean_binlog clean binlog before running (true | false) False by default" - exit -1 -} - -datasource="NONE" -while getopts d:b: flag -do - case "${flag}" in - d) datasource=${OPTARG};; - b) binlog=${OPTARG};; - esac -done - -case "${datasource}" in - NONE) usage;; - all) run_schedulings binlog - run_platforms binlog - run_titles binlog - run_assets binlog - run_features binlog - run_collections binlog - ;; - schedulings) run_schedulings binlog;; - platforms) run_platforms binlog;; - titles) run_titles binlog;; - assets) run_assets binlog;; - features) run_features binlog;; - collections) run_collections binlog;; - *) usage;; -esac - -echo "PID processes in $PID_LOG_FILE" \ No newline at end of file diff --git a/init/stop-listeners.sh b/init/stop-listeners.sh deleted file mode 100755 index 582e97c..0000000 --- a/init/stop-listeners.sh +++ /dev/null @@ -1,23 +0,0 @@ -#!/bin/bash - -PID_LOG_FILE=/tmp/listeners-pid.log - -count_processes() { - echo `ps aux | grep clickhouse-mysql-data-reader | wc -l` -} - -total_before=$(count_processes) - -while IFS= read -r line -do - echo "$line" - kill $line -done < "$PID_LOG_FILE" - -total_after=$(count_processes) - -procs=`echo "$total_after - 1" | bc` - -if [ $total_after -gt 1 ]; then - echo "You still have $procs processes running" -fi \ No newline at end of file diff --git a/init/tb_tables.config b/init/tb_tables.config deleted file mode 100644 index be59079..0000000 --- a/init/tb_tables.config +++ /dev/null @@ -1,9 +0,0 @@ -#!/bin/bash - -TB_DATABASE='d_073c5e' -TITLES_TABLE='t_8a192b9c7ece4572a5a2fc9858e26d5c' -ASSETS_TABLE='t_4c03fdeb4e3e4db784ead40b06ec8617' -COLLECTIONS_TABLE='t_3dd7b323438943c687bd4e13a0e181a1' -FEATURES_TABLE='t_23f41723e0eb480088cbb1c8f890a38c' -PLATFORMS_TABLE='t_83f598dc74254de68216a7c7735caffb' -SCHEDULINGS_TABLE='t_b5e541d4e73d4301ba736c427bd667c5' \ No newline at end of file diff --git a/notes.txt b/notes.txt new file mode 100644 index 0000000..20da4c9 --- /dev/null +++ b/notes.txt @@ -0,0 +1,3 @@ +# Add delete field + +awk -F"," 'BEGIN { OFS = "," } {$45="0"; print}' test.csv > test-out.csv \ No newline at end of file diff --git a/run-local.sh b/run-local.sh new file mode 100755 index 0000000..55d6224 --- /dev/null +++ b/run-local.sh @@ -0,0 +1 @@ +clickhouse-mysql --src-server=33 --src-wait --src-resume --binlog-position-file=bl-pos-schedulings --nice-pause=1 --src-host=127.0.1 --src-port=3307 --src-user=tinybird --src-password=eequush8Oo3aip1Eenae --src-schemas=movida_production --src-tables=schedulings --dst-host=127.0.0.1 --dst-schema=d_66a1b9 --dst-table=t_dc3499dc95544d98888ab410855356ba --log-level=debug --pump-data --csvpool --dst-file=/home/ygnuss/tinybird/dev/clickhouse-mysql-data-reader/sched-log.csv \ No newline at end of file diff --git a/run-test.sh b/run-test.sh new file mode 100755 index 0000000..791d109 --- /dev/null +++ b/run-test.sh @@ -0,0 +1 @@ +(clickhouse-mysql --src-server=33 --src-wait 
--src-resume --binlog-position-file=bl-pos-schedulings --nice-pause=1 --src-host=127.0.1 --src-port=3307 --src-user=tinybird --src-password=eequush8Oo3aip1Eenae --src-schemas=movida_production --src-tables=schedulings --dst-host=127.0.0.1 --dst-schema=d_66a1b9 --dst-table=t_dc3499dc95544d98888ab410855356ba --log-level=debug --pump-data --mempool --mempool-max-flush-interval=60 --mempool_max_rows_num=1000 --csvpool --dst-file=/home/deploy/clickhouse-mysql-data-reader/sched-log.csv 2>> out-listener.log)& From b5c568f0781fae67524cba85eea5a605546e766b Mon Sep 17 00:00:00 2001 From: Alejandro Del Amo Date: Tue, 10 Aug 2021 19:22:15 +0200 Subject: [PATCH 02/18] Fix missing config --- clickhouse_mysql/config.py | 3 ++- clickhouse_mysql/writer/tbcsvwriter.py | 5 ++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/clickhouse_mysql/config.py b/clickhouse_mysql/config.py index 1fc9030..3103c87 100644 --- a/clickhouse_mysql/config.py +++ b/clickhouse_mysql/config.py @@ -366,7 +366,8 @@ def writer_builder_csvpool(self): class_name=TBCSVWriter, constructor_params={ 'tb_host': self.config['tinybird']['host'], - 'tb_token': self.config['tinybird']['token'] + 'tb_token': self.config['tinybird']['token'], + 'dst_table': self.config['writer']['clickhouse']['dst_table'] } ), 'converter_builder': self.converter_builder(CONVERTER_CSV), diff --git a/clickhouse_mysql/writer/tbcsvwriter.py b/clickhouse_mysql/writer/tbcsvwriter.py index 60482f0..f2a6d50 100644 --- a/clickhouse_mysql/writer/tbcsvwriter.py +++ b/clickhouse_mysql/writer/tbcsvwriter.py @@ -71,10 +71,9 @@ def insert(self, event_or_events=None): for event in events: #schema = self.dst_schema if self.dst_schema else event.schema - #table = self.dst_table if self.dst_table else event.table - + table = self.dst_table if self.dst_table else event.table params = { - 'name': self.dst_table, + 'name': table, 'mode': 'append' } From f149b5f27b743cd8235484e90d8b5ee92706676f Mon Sep 17 00:00:00 2001 From: Alejandro Del Amo Date: Tue, 7 Sep 2021 19:52:00 +0300 Subject: [PATCH 03/18] Added shutdown gracefully --- clickhouse_mysql/pumper.py | 16 ++++++++++-- clickhouse_mysql/reader/mysqlreader.py | 21 ++++++++++------ clickhouse_mysql/reader/reader.py | 3 +++ clickhouse_mysql/writer/poolwriter.py | 8 ++++-- clickhouse_mysql/writer/tbcsvwriter.py | 35 +++++++++++++------------- setup.py | 2 ++ 6 files changed, 56 insertions(+), 29 deletions(-) diff --git a/clickhouse_mysql/pumper.py b/clickhouse_mysql/pumper.py index 0b5d0c3..b1ec9df 100644 --- a/clickhouse_mysql/pumper.py +++ b/clickhouse_mysql/pumper.py @@ -2,18 +2,26 @@ # -*- coding: utf-8 -*- +from clickhouse_mysql.reader.reader import Reader +from clickhouse_mysql.writer.writer import Writer +import signal + + class Pumper(object): """ Pump data - read data from reader and push into writer """ - reader = None - writer = None + reader: Reader = None + writer: Writer = None def __init__(self, reader=None, writer=None): self.reader = reader self.writer = writer + signal.signal(signal.SIGINT, self.exit_gracefully) + signal.signal(signal.SIGTERM, self.exit_gracefully) + if self.reader: # subscribe on reader's event notifications self.reader.subscribe({ @@ -60,6 +68,10 @@ def update_rows_event(self, event=None): :param event: """ self.writer.update(event) + + def exit_gracefully(self): + self.reader.close() + self.writer.close() if __name__ == '__main__': diff --git a/clickhouse_mysql/reader/mysqlreader.py b/clickhouse_mysql/reader/mysqlreader.py index 040bac0..79fb0d2 100644 --- 
a/clickhouse_mysql/reader/mysqlreader.py +++ b/clickhouse_mysql/reader/mysqlreader.py @@ -29,6 +29,7 @@ class MySQLReader(Reader): resume_stream = None binlog_stream = None nice_pause = 0 + exit_gracefully = False write_rows_event_num = 0 write_rows_event_each_row_num = 0; @@ -389,7 +390,7 @@ def read(self): # fetch events try: - while True: + while not self.exit_gracefully: logging.debug('Check events in binlog stream') self.init_fetch_loop() @@ -423,10 +424,6 @@ def read(self): # after event processed, we need to handle current binlog position self.process_binlog_position(self.binlog_stream.log_file, self.binlog_stream.log_pos) - except KeyboardInterrupt: - # pass SIGINT further - logging.info("SIGINT received. Pass it further.") - raise except Exception as ex: if self.blocking: # we'd like to continue waiting for data @@ -454,8 +451,6 @@ def read(self): self.notify('ReaderIdleEvent') - except KeyboardInterrupt: - logging.info("SIGINT received. Time to exit.") except Exception as ex: logging.warning("Got an exception, handle it") logging.warning(ex) @@ -473,6 +468,18 @@ def read(self): logging.info('len %d', end_timestamp - self.start_timestamp) + def close(self): + self.exit_gracefully = True + try: + self.binlog_stream.close() + except Exception as ex: + logging.warning("Unable to close binlog stream correctly") + logging.warning(ex) + + logging.info("MySQL reader closed") + + + if __name__ == '__main__': connection_settings = { 'host': '127.0.0.1', diff --git a/clickhouse_mysql/reader/reader.py b/clickhouse_mysql/reader/reader.py index c4f5246..107d04a 100644 --- a/clickhouse_mysql/reader/reader.py +++ b/clickhouse_mysql/reader/reader.py @@ -33,3 +33,6 @@ def __init__(self, converter=None, callbacks={}): def read(self): pass + + def close(self): + pass diff --git a/clickhouse_mysql/writer/poolwriter.py b/clickhouse_mysql/writer/poolwriter.py index 071a6ef..5c06fc4 100644 --- a/clickhouse_mysql/writer/poolwriter.py +++ b/clickhouse_mysql/writer/poolwriter.py @@ -37,7 +37,6 @@ def insert(self, event_or_events): logging.debug('class:%s insert', __class__) self.pool.insert(event_or_events) - # TODO delete if delete_row works def delete(self, event_or_events): """Insert delete data into Pool""" @@ -54,10 +53,15 @@ def update(self, event_or_events): logging.debug('class:%s update', __class__) self.pool.insert(event_or_events) - def flush(self): self.pool.flush() + + def close(self): + self.pool.flush() + logging.info("Closed PoolWriter") + + if __name__ == '__main__': path = 'file.csv' diff --git a/clickhouse_mysql/writer/tbcsvwriter.py b/clickhouse_mysql/writer/tbcsvwriter.py index f2a6d50..dca3819 100644 --- a/clickhouse_mysql/writer/tbcsvwriter.py +++ b/clickhouse_mysql/writer/tbcsvwriter.py @@ -77,24 +77,23 @@ def insert(self, event_or_events=None): 'mode': 'append' } - f = open(event.filename, 'rb') - m = MultipartEncoder(fields={'csv': ('csv', f, 'text/csv')}) - - url = f"{self.tb_host}/v0/datasources" - - response = requests.post(url, data=m, - headers={'Authorization': 'Bearer ' + self.tb_token, 'Content-Type': m.content_type}, - params=params - ) - - # logging.debug(response.text) - if response.status_code == 200: - json_object = json.loads(response.content) - logging.debug(f"Import id: {json_object['import_id']}") - # logging.debug(f"Response: {json.dumps(json_object, indent=2)}") - - else: - logging.debug(f"ERROR {response.text}") + with open(event.filename, 'rb') as f: + m = MultipartEncoder(fields={'csv': ('csv', f, 'text/csv')}) + url = f"{self.tb_host}/v0/datasources" + + 
response = requests.post(url, data=m, + headers={'Authorization': 'Bearer ' + self.tb_token, 'Content-Type': m.content_type}, + params=params + ) + + # logging.debug(response.text) + if response.status_code == 200: + json_object = json.loads(response.content) + logging.debug(f"Import id: {json_object['import_id']}") + # logging.debug(f"Response: {json.dumps(json_object, indent=2)}") + + else: + logging.debug(f"ERROR {response.text}") pass diff --git a/setup.py b/setup.py index f5be528..a8b39e2 100644 --- a/setup.py +++ b/setup.py @@ -79,6 +79,8 @@ 'clickhouse-driver', 'configobj', 'setuptools', + 'requests_toolbelt', + 'requests' ], # cross-platform support for pip to create the appropriate form of executable From 9ee1ddf6c4b9ae0d70124d3624af70857b7718b3 Mon Sep 17 00:00:00 2001 From: Alejandro Del Amo Date: Tue, 7 Sep 2021 20:24:29 +0300 Subject: [PATCH 04/18] Moved signal handler to main --- clickhouse_mysql/main.py | 5 +++++ clickhouse_mysql/pumper.py | 3 --- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/clickhouse_mysql/main.py b/clickhouse_mysql/main.py index 662fd4d..d751573 100644 --- a/clickhouse_mysql/main.py +++ b/clickhouse_mysql/main.py @@ -1,6 +1,7 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- +import signal import sys import multiprocessing as mp import logging @@ -145,6 +146,10 @@ def run(self): reader=self.config.reader(), writer=self.config.writer(), ) + + signal.signal(signal.SIGINT, pumper.exit_gracefully) + signal.signal(signal.SIGTERM, pumper.exit_gracefully) + pumper.run() except Exception as ex: diff --git a/clickhouse_mysql/pumper.py b/clickhouse_mysql/pumper.py index b1ec9df..6aadef9 100644 --- a/clickhouse_mysql/pumper.py +++ b/clickhouse_mysql/pumper.py @@ -19,9 +19,6 @@ def __init__(self, reader=None, writer=None): self.reader = reader self.writer = writer - signal.signal(signal.SIGINT, self.exit_gracefully) - signal.signal(signal.SIGTERM, self.exit_gracefully) - if self.reader: # subscribe on reader's event notifications self.reader.subscribe({ From 60e732ed13b3f1efef9a9a815cc49cfb90b2e23f Mon Sep 17 00:00:00 2001 From: Alejandro Del Amo Date: Wed, 8 Sep 2021 10:52:46 +0300 Subject: [PATCH 05/18] Fix issue with MySQL graceful exit --- clickhouse_mysql/pumper.py | 2 +- clickhouse_mysql/reader/mysqlreader.py | 18 ++++++------------ 2 files changed, 7 insertions(+), 13 deletions(-) diff --git a/clickhouse_mysql/pumper.py b/clickhouse_mysql/pumper.py index 6aadef9..7cad8e3 100644 --- a/clickhouse_mysql/pumper.py +++ b/clickhouse_mysql/pumper.py @@ -66,7 +66,7 @@ def update_rows_event(self, event=None): """ self.writer.update(event) - def exit_gracefully(self): + def exit_gracefully(self, sig, frame): self.reader.close() self.writer.close() diff --git a/clickhouse_mysql/reader/mysqlreader.py b/clickhouse_mysql/reader/mysqlreader.py index 79fb0d2..71f0e9a 100644 --- a/clickhouse_mysql/reader/mysqlreader.py +++ b/clickhouse_mysql/reader/mysqlreader.py @@ -429,11 +429,11 @@ def read(self): # we'd like to continue waiting for data # report and continue cycle logging.warning("Got an exception, skip it in blocking mode") - logging.warning(ex) + logging.exception(ex) else: # do not continue, report error and exit logging.critical("Got an exception, abort it in non-blocking mode") - logging.critical(ex) + logging.exception(ex) sys.exit(1) # all events fetched (or none of them available) @@ -450,16 +450,16 @@ def read(self): time.sleep(self.nice_pause) self.notify('ReaderIdleEvent') - except Exception as ex: logging.warning("Got an exception, handle it") - 
logging.warning(ex) + logging.exception(ex) try: self.binlog_stream.close() + logging.info("Stop reading from MySQL") except Exception as ex: logging.warning("Unable to close binlog stream correctly") - logging.warning(ex) + logging.exception(ex) end_timestamp = int(time.time()) @@ -470,13 +470,7 @@ def read(self): def close(self): self.exit_gracefully = True - try: - self.binlog_stream.close() - except Exception as ex: - logging.warning("Unable to close binlog stream correctly") - logging.warning(ex) - - logging.info("MySQL reader closed") + logging.info("MySQL should stop in the next loop") From 522ef1e17df6b4a66463dc85fb6ecc2237a771c1 Mon Sep 17 00:00:00 2001 From: Alejandro Del Amo Date: Wed, 8 Sep 2021 12:29:04 +0300 Subject: [PATCH 06/18] Add retry in case of 429 --- clickhouse_mysql/writer/tbcsvwriter.py | 55 +++++++++++++++----------- 1 file changed, 33 insertions(+), 22 deletions(-) diff --git a/clickhouse_mysql/writer/tbcsvwriter.py b/clickhouse_mysql/writer/tbcsvwriter.py index dca3819..b490bc5 100644 --- a/clickhouse_mysql/writer/tbcsvwriter.py +++ b/clickhouse_mysql/writer/tbcsvwriter.py @@ -4,6 +4,7 @@ import os import logging import shlex +import time from clickhouse_mysql.writer.writer import Writer from clickhouse_mysql.tableprocessor import TableProcessor @@ -50,6 +51,37 @@ def __init__( self.dst_table_prefix = dst_table_prefix self.dst_distribute = dst_distribute + + def uploadCSV(self, table, filename): + params = { + 'name': table, + 'mode': 'append' + } + + with open(filename, 'rb') as f: + m = MultipartEncoder(fields={'csv': ('csv', f, 'text/csv')}) + url = f"{self.tb_host}/v0/datasources" + + response = requests.post(url, data=m, + headers={'Authorization': 'Bearer ' + self.tb_token, 'Content-Type': m.content_type}, + params=params + ) + + # logging.debug(response.text) + if response.status_code == 200: + json_object = json.loads(response.content) + logging.debug(f"Import id: {json_object['import_id']}") + elif response.status_code == 429: + logging.error(f"Too many requests retrying in {response.headers['Retry-After']} seconds", response) + time.sleep(response.headers['Retry-After']) + self.uploadCSV(table, filename) + + else: + logging.error(response.text) + + + + def insert(self, event_or_events=None): # event_or_events = [ # event: { @@ -72,28 +104,7 @@ def insert(self, event_or_events=None): for event in events: #schema = self.dst_schema if self.dst_schema else event.schema table = self.dst_table if self.dst_table else event.table - params = { - 'name': table, - 'mode': 'append' - } - - with open(event.filename, 'rb') as f: - m = MultipartEncoder(fields={'csv': ('csv', f, 'text/csv')}) - url = f"{self.tb_host}/v0/datasources" - - response = requests.post(url, data=m, - headers={'Authorization': 'Bearer ' + self.tb_token, 'Content-Type': m.content_type}, - params=params - ) - - # logging.debug(response.text) - if response.status_code == 200: - json_object = json.loads(response.content) - logging.debug(f"Import id: {json_object['import_id']}") - # logging.debug(f"Response: {json.dumps(json_object, indent=2)}") - - else: - logging.debug(f"ERROR {response.text}") + self.uploadCSV(table, event.filename) pass From 28d442ef205c36e52151714af7ff5939b1c281ac Mon Sep 17 00:00:00 2001 From: Alejandro Del Amo Date: Wed, 8 Sep 2021 12:51:02 +0300 Subject: [PATCH 07/18] Graceful exit only stops reader --- clickhouse_mysql/pumper.py | 1 - clickhouse_mysql/reader/mysqlreader.py | 1 + clickhouse_mysql/writer/poolwriter.py | 5 ----- 3 files changed, 1 insertion(+), 6 deletions(-) 
diff --git a/clickhouse_mysql/pumper.py b/clickhouse_mysql/pumper.py index 7cad8e3..ffffc83 100644 --- a/clickhouse_mysql/pumper.py +++ b/clickhouse_mysql/pumper.py @@ -68,7 +68,6 @@ def update_rows_event(self, event=None): def exit_gracefully(self, sig, frame): self.reader.close() - self.writer.close() if __name__ == '__main__': diff --git a/clickhouse_mysql/reader/mysqlreader.py b/clickhouse_mysql/reader/mysqlreader.py index 71f0e9a..3149489 100644 --- a/clickhouse_mysql/reader/mysqlreader.py +++ b/clickhouse_mysql/reader/mysqlreader.py @@ -470,6 +470,7 @@ def read(self): def close(self): self.exit_gracefully = True + self.nice_pause = 0 logging.info("MySQL should stop in the next loop") diff --git a/clickhouse_mysql/writer/poolwriter.py b/clickhouse_mysql/writer/poolwriter.py index 5c06fc4..303ed84 100644 --- a/clickhouse_mysql/writer/poolwriter.py +++ b/clickhouse_mysql/writer/poolwriter.py @@ -57,11 +57,6 @@ def flush(self): self.pool.flush() - def close(self): - self.pool.flush() - logging.info("Closed PoolWriter") - - if __name__ == '__main__': path = 'file.csv' From 2ab8dd95211725c0697ecf1eab426b800d8b3f1d Mon Sep 17 00:00:00 2001 From: Alejandro Del Amo Date: Wed, 8 Sep 2021 13:29:56 +0300 Subject: [PATCH 08/18] cast retry-after to int --- clickhouse_mysql/writer/tbcsvwriter.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clickhouse_mysql/writer/tbcsvwriter.py b/clickhouse_mysql/writer/tbcsvwriter.py index b490bc5..f3cc194 100644 --- a/clickhouse_mysql/writer/tbcsvwriter.py +++ b/clickhouse_mysql/writer/tbcsvwriter.py @@ -73,7 +73,7 @@ def uploadCSV(self, table, filename): logging.debug(f"Import id: {json_object['import_id']}") elif response.status_code == 429: logging.error(f"Too many requests retrying in {response.headers['Retry-After']} seconds", response) - time.sleep(response.headers['Retry-After']) + time.sleep(int(response.headers['Retry-After'])) self.uploadCSV(table, filename) else: From 684fc7765b3864f7df121ebc8002058f0a1b76c6 Mon Sep 17 00:00:00 2001 From: Alejandro Del Amo Date: Wed, 8 Sep 2021 14:33:15 +0300 Subject: [PATCH 09/18] Improve retry in TB CSV --- clickhouse_mysql/writer/tbcsvwriter.py | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/clickhouse_mysql/writer/tbcsvwriter.py b/clickhouse_mysql/writer/tbcsvwriter.py index f3cc194..1803ff9 100644 --- a/clickhouse_mysql/writer/tbcsvwriter.py +++ b/clickhouse_mysql/writer/tbcsvwriter.py @@ -52,7 +52,8 @@ def __init__( self.dst_distribute = dst_distribute - def uploadCSV(self, table, filename): + def uploadCSV(self, table, filename, tries=1): + limit_of_retries=3 params = { 'name': table, 'mode': 'append' @@ -68,19 +69,23 @@ def uploadCSV(self, table, filename): ) # logging.debug(response.text) + logging.info(response.json()) if response.status_code == 200: json_object = json.loads(response.content) logging.debug(f"Import id: {json_object['import_id']}") elif response.status_code == 429: - logging.error(f"Too many requests retrying in {response.headers['Retry-After']} seconds", response) - time.sleep(int(response.headers['Retry-After'])) - self.uploadCSV(table, filename) - - else: - logging.error(response.text) - - - + retry_after = int(response.headers['Retry-After']) + tries + logging.error(f"Too many requests retrying in {retry_after} seconds to upload {filename } to {table}") + time.sleep(retry_after) + self.uploadCSV(table, filename, tries+1) + else: + # In case of error let's retry only + logging.exception(response.json()) + time.sleep(tries) + 
logging.info(f"Retrying { tries } of { limit_of_retries }") + if tries > limit_of_retries: + return + self.uploadCSV(self, table, filename, tries + 1) def insert(self, event_or_events=None): # event_or_events = [ From f0b47b04097f54467959fda4728f09e0b81f8b38 Mon Sep 17 00:00:00 2001 From: Alejandro Del Amo Date: Wed, 8 Sep 2021 16:58:08 +0300 Subject: [PATCH 10/18] If received SIGINT break the loop --- clickhouse_mysql/reader/mysqlreader.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/clickhouse_mysql/reader/mysqlreader.py b/clickhouse_mysql/reader/mysqlreader.py index 3149489..4286792 100644 --- a/clickhouse_mysql/reader/mysqlreader.py +++ b/clickhouse_mysql/reader/mysqlreader.py @@ -404,8 +404,9 @@ def read(self): # fetch available events from MySQL for mysql_event in self.binlog_stream: - # new event has come - # check what to do with it + + if self.exit_gracefully: + break logging.debug( 'Got Event ' + self.binlog_stream.log_file + ":" + str(self.binlog_stream.log_pos)) From 0c1a4b92251f7ea69c7c0c036e31fd3af94e4b4e Mon Sep 17 00:00:00 2001 From: Alejandro Del Amo Date: Wed, 8 Sep 2021 17:09:31 +0300 Subject: [PATCH 11/18] Improve logging when reading binlog pos --- clickhouse_mysql/config.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/clickhouse_mysql/config.py b/clickhouse_mysql/config.py index 3103c87..929cf5b 100644 --- a/clickhouse_mysql/config.py +++ b/clickhouse_mysql/config.py @@ -1,6 +1,7 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- +import logging from clickhouse_mysql.reader.mysqlreader import MySQLReader from clickhouse_mysql.reader.csvreader import CSVReader @@ -50,10 +51,11 @@ def __init__(self): log_file, log_pos )) - except: + except Exception as e: + logging.exception(e) log_file = None log_pos = None - print("can't read binlog position from file {}".format( + logging.info("can't read binlog position from file {}".format( self.options['binlog_position_file'], )) # build application config out of aggregated options From 6e8c047b9634a14cd2722f2a43122e9603a5e70d Mon Sep 17 00:00:00 2001 From: Alejandro Del Amo Date: Wed, 8 Sep 2021 17:51:42 +0300 Subject: [PATCH 12/18] Validate that binlog file exist before reading it --- clickhouse_mysql/config.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/clickhouse_mysql/config.py b/clickhouse_mysql/config.py index 929cf5b..217aa0b 100644 --- a/clickhouse_mysql/config.py +++ b/clickhouse_mysql/config.py @@ -2,6 +2,7 @@ # -*- coding: utf-8 -*- import logging +import os from clickhouse_mysql.reader.mysqlreader import MySQLReader from clickhouse_mysql.reader.csvreader import CSVReader @@ -40,7 +41,7 @@ def __init__(self): log_file = None log_pos = None - if self.options['binlog_position_file'] and self.options.get_bool('src_resume'): + if self.options['binlog_position_file'] and self.options.get_bool('src_resume') and os.path.exists(self.options['binlog_position_file']): try: with open(self.options['binlog_position_file'], 'r') as f: position = f.read() @@ -52,9 +53,9 @@ def __init__(self): log_pos )) except Exception as e: - logging.exception(e) log_file = None log_pos = None + logging.exception(e) logging.info("can't read binlog position from file {}".format( self.options['binlog_position_file'], )) From 03afe7c621a3df76a07e2d41e0febc200eff12a8 Mon Sep 17 00:00:00 2001 From: Alejandro Del Amo Date: Mon, 13 Sep 2021 13:18:35 +0300 Subject: [PATCH 13/18] remove local and test script --- run-local.sh | 1 - run-test.sh | 1 - 2 files changed, 2 deletions(-) 
delete mode 100755 run-local.sh delete mode 100755 run-test.sh diff --git a/run-local.sh b/run-local.sh deleted file mode 100755 index 55d6224..0000000 --- a/run-local.sh +++ /dev/null @@ -1 +0,0 @@ -clickhouse-mysql --src-server=33 --src-wait --src-resume --binlog-position-file=bl-pos-schedulings --nice-pause=1 --src-host=127.0.1 --src-port=3307 --src-user=tinybird --src-password=eequush8Oo3aip1Eenae --src-schemas=movida_production --src-tables=schedulings --dst-host=127.0.0.1 --dst-schema=d_66a1b9 --dst-table=t_dc3499dc95544d98888ab410855356ba --log-level=debug --pump-data --csvpool --dst-file=/home/ygnuss/tinybird/dev/clickhouse-mysql-data-reader/sched-log.csv \ No newline at end of file diff --git a/run-test.sh b/run-test.sh deleted file mode 100755 index 791d109..0000000 --- a/run-test.sh +++ /dev/null @@ -1 +0,0 @@ -(clickhouse-mysql --src-server=33 --src-wait --src-resume --binlog-position-file=bl-pos-schedulings --nice-pause=1 --src-host=127.0.1 --src-port=3307 --src-user=tinybird --src-password=eequush8Oo3aip1Eenae --src-schemas=movida_production --src-tables=schedulings --dst-host=127.0.0.1 --dst-schema=d_66a1b9 --dst-table=t_dc3499dc95544d98888ab410855356ba --log-level=debug --pump-data --mempool --mempool-max-flush-interval=60 --mempool_max_rows_num=1000 --csvpool --dst-file=/home/deploy/clickhouse-mysql-data-reader/sched-log.csv 2>> out-listener.log)& From 34c53c05252e67e06d094d41757d65c29bcbd510 Mon Sep 17 00:00:00 2001 From: Alejandro Del Amo Date: Tue, 14 Sep 2021 14:50:54 +0300 Subject: [PATCH 14/18] Added try/catch in upload to retry in case of error --- clickhouse_mysql/writer/tbcsvwriter.py | 80 +++++++++++++++----------- 1 file changed, 46 insertions(+), 34 deletions(-) diff --git a/clickhouse_mysql/writer/tbcsvwriter.py b/clickhouse_mysql/writer/tbcsvwriter.py index 1803ff9..f7a28f6 100644 --- a/clickhouse_mysql/writer/tbcsvwriter.py +++ b/clickhouse_mysql/writer/tbcsvwriter.py @@ -1,18 +1,16 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- -import os import logging -import shlex import time from clickhouse_mysql.writer.writer import Writer -from clickhouse_mysql.tableprocessor import TableProcessor import requests from requests_toolbelt.multipart.encoder import MultipartEncoder import json + class TBCSVWriter(Writer): """Write into Tinybird via CSV file""" @@ -43,7 +41,8 @@ def __init__( self.tb_token = tb_token if self.tb_host is None or self.tb_token is None: - logging.critical(f" Host: {self.tb_host} or token {self.tb_token} is missing") + logging.critical( + f" Host: {self.tb_host} or token {self.tb_token} is missing") return None self.dst_schema = dst_schema @@ -51,41 +50,54 @@ def __init__( self.dst_table_prefix = dst_table_prefix self.dst_distribute = dst_distribute - def uploadCSV(self, table, filename, tries=1): - limit_of_retries=3 + limit_of_retries = 3 params = { 'name': table, 'mode': 'append' } - with open(filename, 'rb') as f: - m = MultipartEncoder(fields={'csv': ('csv', f, 'text/csv')}) - url = f"{self.tb_host}/v0/datasources" - - response = requests.post(url, data=m, - headers={'Authorization': 'Bearer ' + self.tb_token, 'Content-Type': m.content_type}, - params=params - ) - - # logging.debug(response.text) - logging.info(response.json()) - if response.status_code == 200: - json_object = json.loads(response.content) - logging.debug(f"Import id: {json_object['import_id']}") - elif response.status_code == 429: - retry_after = int(response.headers['Retry-After']) + tries - logging.error(f"Too many requests retrying in {retry_after} seconds to upload 
{filename } to {table}") - time.sleep(retry_after) - self.uploadCSV(table, filename, tries+1) - else: - # In case of error let's retry only - logging.exception(response.json()) - time.sleep(tries) - logging.info(f"Retrying { tries } of { limit_of_retries }") - if tries > limit_of_retries: - return - self.uploadCSV(self, table, filename, tries + 1) + try: + with open(filename, 'rb') as f: + m = MultipartEncoder(fields={'csv': ('csv', f, 'text/csv')}) + url = f"{self.tb_host}/v0/datasources" + + response = requests.post( + url, + data=m, + headers={ + 'Authorization': 'Bearer ' + self.tb_token, + 'Content-Type': m.content_type + }, + params=params) + + # logging.debug(response.text) + logging.info(response.json()) + if response.status_code == 200: + json_object = json.loads(response.content) + logging.debug(f"Import id: {json_object['import_id']}") + elif response.status_code == 429: + retry_after = int(response.headers['Retry-After']) + tries + logging.error( + f"Too many requests retrying in {retry_after} seconds to upload {filename } to {table}") + time.sleep(retry_after) + self.uploadCSV(table, filename, tries+1) + else: + # In case of error let's retry only + logging.exception(response.json()) + time.sleep(tries) + logging.info(f"Retrying { tries } of { limit_of_retries }") + if tries > limit_of_retries: + return + self.uploadCSV(table, filename, tries + 1) + except Exception as e: + logging.exception(e) + # We wait tries^2 sec to try again + time.sleep(tries * tries) + logging.info(f"Retrying { tries } of { limit_of_retries }") + if tries > limit_of_retries: + return + self.uploadCSV(table, filename, tries + 1) def insert(self, event_or_events=None): # event_or_events = [ @@ -166,7 +178,7 @@ def deleteRow(self, event_or_events=None): # logging.info('starting clickhouse-client process for delete operation') # logging.debug('starting %s', bash) # os.system(bash) - + logging.debug("CHCSVWriter: delete row") pass From be4ee99bdd4f88e5956da3ce3a2a1b1b1a9768a9 Mon Sep 17 00:00:00 2001 From: Alejandro Del Amo Date: Thu, 23 Sep 2021 10:22:14 +0300 Subject: [PATCH 15/18] Changed CSV writer to use QUOTE_ALL --- clickhouse_mysql/writer/csvwriter.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/clickhouse_mysql/writer/csvwriter.py b/clickhouse_mysql/writer/csvwriter.py index 00c6eaf..c0e14a1 100644 --- a/clickhouse_mysql/writer/csvwriter.py +++ b/clickhouse_mysql/writer/csvwriter.py @@ -4,7 +4,6 @@ import csv import os.path import logging -import copy import time import uuid @@ -136,9 +135,9 @@ def insert(self, event_or_events): if self.dst_table is None: self.dst_table = event.table - self.writer = csv.DictWriter(self.file, fieldnames=self.fieldnames) + self.writer = csv.writer(self.file, quoting=csv.QUOTE_ALL) if not self.header_written: - self.writer.writeheader() + self.writer.writerow(self.fieldnames) for event in events: if not event.verify: @@ -190,9 +189,9 @@ def delete_row(self, event_or_events): if self.dst_table is None: self.dst_table = event.table - self.writer = csv.DictWriter(self.file, fieldnames=self.fieldnames) + self.writer = csv.writer(self.file, quoting=csv.QUOTE_ALL) if not self.header_written: - self.writer.writeheader() + self.writer.writerow(self.fieldnames) for event in events: if not event.verify: @@ -253,9 +252,9 @@ def update(self, event_or_events): if self.dst_table is None: self.dst_table = event.table - self.writer = csv.DictWriter(self.file, fieldnames=self.fieldnames) + self.writer = csv.writer(self.file, quoting=csv.QUOTE_ALL) if 
not self.header_written: - self.writer.writeheader() + self.writer.writerow(self.fieldnames) for event in events: if not event.verify: From 72a10e312bada8c4c6a22449fa05f5c9bfe9a474 Mon Sep 17 00:00:00 2001 From: Alejandro Del Amo Date: Thu, 23 Sep 2021 11:02:10 +0300 Subject: [PATCH 16/18] Undo last change and add QUOTE_ALL to DictWriter --- clickhouse_mysql/writer/csvwriter.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/clickhouse_mysql/writer/csvwriter.py b/clickhouse_mysql/writer/csvwriter.py index c0e14a1..b9cf762 100644 --- a/clickhouse_mysql/writer/csvwriter.py +++ b/clickhouse_mysql/writer/csvwriter.py @@ -4,6 +4,7 @@ import csv import os.path import logging +import copy import time import uuid @@ -135,9 +136,9 @@ def insert(self, event_or_events): if self.dst_table is None: self.dst_table = event.table - self.writer = csv.writer(self.file, quoting=csv.QUOTE_ALL) + self.writer = csv.DictWriter(self.file, fieldnames=self.fieldnames, quoting=csv.QUOTE_ALL) if not self.header_written: - self.writer.writerow(self.fieldnames) + self.writer.writeheader() for event in events: if not event.verify: @@ -189,9 +190,9 @@ def delete_row(self, event_or_events): if self.dst_table is None: self.dst_table = event.table - self.writer = csv.writer(self.file, quoting=csv.QUOTE_ALL) + self.writer = csv.DictWriter(self.file, fieldnames=self.fieldnames, quoting=csv.QUOTE_ALL) if not self.header_written: - self.writer.writerow(self.fieldnames) + self.writer.writeheader() for event in events: if not event.verify: @@ -252,9 +253,9 @@ def update(self, event_or_events): if self.dst_table is None: self.dst_table = event.table - self.writer = csv.writer(self.file, quoting=csv.QUOTE_ALL) + self.writer = csv.DictWriter(self.file, fieldnames=self.fieldnames, quoting=csv.QUOTE_ALL) if not self.header_written: - self.writer.writerow(self.fieldnames) + self.writer.writeheader() for event in events: if not event.verify: From 3389ec8380a6a3e9315fdc2b878ffd3d8fd66b4a Mon Sep 17 00:00:00 2001 From: Alejandro Del Amo Date: Sat, 25 Sep 2021 18:12:49 +0300 Subject: [PATCH 17/18] Avoiding flushing with every round --- .flake8 | 13 +++++++++++++ clickhouse_mysql/pumper.py | 4 ++-- clickhouse_mysql/reader/mysqlreader.py | 15 ++++++--------- 3 files changed, 21 insertions(+), 11 deletions(-) create mode 100644 .flake8 diff --git a/.flake8 b/.flake8 new file mode 100644 index 0000000..420a518 --- /dev/null +++ b/.flake8 @@ -0,0 +1,13 @@ +[flake8] +ignore = + ; except + E722, + ; inline regex + W605, + ; long lines + E501, + ; too complex + C901 +max-complexity = 10 +max-line-length = 120 +application-import-names = flake8 \ No newline at end of file diff --git a/clickhouse_mysql/pumper.py b/clickhouse_mysql/pumper.py index ffffc83..245f9c2 100644 --- a/clickhouse_mysql/pumper.py +++ b/clickhouse_mysql/pumper.py @@ -26,7 +26,7 @@ def __init__(self, reader=None, writer=None): 'UpdateRowsEvent': self.update_rows_event, 'DeleteRowsEvent': self.delete_rows_event, # 'WriteRowsEvent.EachRow': self.write_rows_event_each_row, - 'ReaderIdleEvent': self.reader_idle_event, + # 'ReaderIdleEvent': self.reader_idle_event, }) def run(self): @@ -65,7 +65,7 @@ def update_rows_event(self, event=None): :param event: """ self.writer.update(event) - + def exit_gracefully(self, sig, frame): self.reader.close() diff --git a/clickhouse_mysql/reader/mysqlreader.py b/clickhouse_mysql/reader/mysqlreader.py index 4286792..5cb6c5c 100644 --- a/clickhouse_mysql/reader/mysqlreader.py +++ 
b/clickhouse_mysql/reader/mysqlreader.py @@ -12,7 +12,6 @@ from clickhouse_mysql.event.event import Event from clickhouse_mysql.tableprocessor import TableProcessor from clickhouse_mysql.util import Util -from pymysqlreplication.event import QueryEvent, RotateEvent, FormatDescriptionEvent class MySQLReader(Reader): @@ -32,7 +31,7 @@ class MySQLReader(Reader): exit_gracefully = False write_rows_event_num = 0 - write_rows_event_each_row_num = 0; + write_rows_event_each_row_num = 0 binlog_position_file = None @@ -323,7 +322,7 @@ def process_update_rows_event(self, mysql_event): # dispatch event to subscribers # statistics - #self.stat_write_rows_event_all_rows(mysql_event=mysql_event) + # self.stat_write_rows_event_all_rows(mysql_event=mysql_event) # dispatch Event event = Event() @@ -356,13 +355,13 @@ def process_delete_rows_event(self, mysql_event): return # statistics - #self.stat_write_rows_event_calc_rows_num_min_max(rows_num_per_event=len(mysql_event.rows)) + # self.stat_write_rows_event_calc_rows_num_min_max(rows_num_per_event=len(mysql_event.rows)) if self.subscribers('DeleteRowsEvent'): # dispatch event to subscribers # statistics - #self.stat_write_rows_event_all_rows(mysql_event=mysql_event) + # self.stat_write_rows_event_all_rows(mysql_event=mysql_event) # dispatch Event event = Event() @@ -422,8 +421,8 @@ def read(self): # skip other unhandled events pass - # after event processed, we need to handle current binlog position - self.process_binlog_position(self.binlog_stream.log_file, self.binlog_stream.log_pos) + # after event processed, we need to handle current binlog position + self.process_binlog_position(self.binlog_stream.log_file, self.binlog_stream.log_pos) except Exception as ex: if self.blocking: @@ -468,12 +467,10 @@ def read(self): logging.info('end %d', end_timestamp) logging.info('len %d', end_timestamp - self.start_timestamp) - def close(self): self.exit_gracefully = True self.nice_pause = 0 logging.info("MySQL should stop in the next loop") - if __name__ == '__main__': From 410adb378ca8e001229d6dd62ed0ef2df92ed54b Mon Sep 17 00:00:00 2001 From: Alejandro Del Amo Date: Wed, 6 Oct 2021 12:28:02 +0200 Subject: [PATCH 18/18] Disabled Verify to avoid SSL checking --- clickhouse_mysql/writer/tbcsvwriter.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/clickhouse_mysql/writer/tbcsvwriter.py b/clickhouse_mysql/writer/tbcsvwriter.py index f7a28f6..9684f25 100644 --- a/clickhouse_mysql/writer/tbcsvwriter.py +++ b/clickhouse_mysql/writer/tbcsvwriter.py @@ -69,7 +69,8 @@ def uploadCSV(self, table, filename, tries=1): 'Authorization': 'Bearer ' + self.tb_token, 'Content-Type': m.content_type }, - params=params) + params=params, + verify=False) # logging.debug(response.text) logging.info(response.json()) @@ -81,7 +82,7 @@ def uploadCSV(self, table, filename, tries=1): logging.error( f"Too many requests retrying in {retry_after} seconds to upload {filename } to {table}") time.sleep(retry_after) - self.uploadCSV(table, filename, tries+1) + self.uploadCSV(table, filename, tries + 1) else: # In case of error let's retry only logging.exception(response.json())
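Note: the following is an illustrative, self-contained sketch (not part of the patch series above) of the upload-with-retry behaviour that the tbcsvwriter patches converge on by PATCH 14/18 and 18/18: append a CSV file to a Tinybird Data Source via /v0/datasources, honour Retry-After on HTTP 429, and cap retries on other failures. The host, token, and helper names used here are assumptions for the sketch, not values taken from the patches.

# Illustrative sketch only -- assumed host/token, hypothetical helper name.
import json
import logging
import time

import requests
from requests_toolbelt.multipart.encoder import MultipartEncoder

TB_HOST = "https://api.tinybird.co"   # assumed endpoint for this sketch
TB_TOKEN = "<token>"                  # assumed; supply a real token
LIMIT_OF_RETRIES = 3


def upload_csv(table, filename, tries=1):
    """Append `filename` to the `table` Data Source, retrying on failure."""
    params = {"name": table, "mode": "append"}
    try:
        with open(filename, "rb") as f:
            # Stream the file as multipart form data while it is still open.
            m = MultipartEncoder(fields={"csv": ("csv", f, "text/csv")})
            response = requests.post(
                f"{TB_HOST}/v0/datasources",
                data=m,
                headers={
                    "Authorization": f"Bearer {TB_TOKEN}",
                    "Content-Type": m.content_type,
                },
                params=params,
            )
        if response.status_code == 200:
            logging.debug("Import id: %s", json.loads(response.content)["import_id"])
            return
        if response.status_code == 429:
            # Rate limited: wait as long as the API asks, plus a little
            # extra per attempt, then try again.
            retry_after = int(response.headers["Retry-After"]) + tries
            time.sleep(retry_after)
            return upload_csv(table, filename, tries + 1)
        # Any other HTTP error: log it and fall through to the capped retry.
        logging.error(response.text)
    except Exception:
        logging.exception("upload of %s to %s failed", filename, table)
    if tries > LIMIT_OF_RETRIES:
        return
    # Linear backoff between attempts.
    time.sleep(tries)
    upload_csv(table, filename, tries + 1)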