From 749efa813711f37d87a9623eb6c2943c58342302 Mon Sep 17 00:00:00 2001 From: ygnuss Date: Tue, 9 Mar 2021 13:58:48 +0100 Subject: [PATCH 01/12] Initial support for updates --- clickhouse_mysql/event/event.py | 3 + clickhouse_mysql/pool/bbpool.py | 14 +- clickhouse_mysql/pumper.py | 18 ++- clickhouse_mysql/reader/mysqlreader.py | 120 ++++++++++++++--- clickhouse_mysql/reader/reader.py | 7 + clickhouse_mysql/writer/chcsvwriter.py | 124 ++++++++++++++++- clickhouse_mysql/writer/chwriter.py | 164 +++++++++++++++++++++++ clickhouse_mysql/writer/csvwriter.py | 12 ++ clickhouse_mysql/writer/poolwriter.py | 12 ++ clickhouse_mysql/writer/processwriter.py | 66 +++++++++ clickhouse_mysql/writer/writer.py | 22 +++ 11 files changed, 539 insertions(+), 23 deletions(-) diff --git a/clickhouse_mysql/event/event.py b/clickhouse_mysql/event/event.py index 836f3d2..e018e57 100644 --- a/clickhouse_mysql/event/event.py +++ b/clickhouse_mysql/event/event.py @@ -28,6 +28,9 @@ class Event(object): # table name table = None + # primary key + primary_key = None + # /path/to/csv/file.csv filename = None diff --git a/clickhouse_mysql/pool/bbpool.py b/clickhouse_mysql/pool/bbpool.py index f15c268..c36265b 100644 --- a/clickhouse_mysql/pool/bbpool.py +++ b/clickhouse_mysql/pool/bbpool.py @@ -6,6 +6,7 @@ from clickhouse_mysql.pool.pool import Pool from clickhouse_mysql.objectbuilder import ObjectBuilder +from pymysqlreplication.row_event import WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent # Buckets Belts' Index Generator @@ -149,7 +150,18 @@ def rotate_belt(self, belt_index, flush=False): # time to flush data for specified key #self.writer_builder.param('csv_file_path_suffix_parts', [str(int(now)), str(self.buckets_num_total)]) writer = self.writer_builder.new() - writer.insert(self.belts[belt_index].pop()) + item = self.belts[belt_index].pop() + # process event based on its type + if isinstance(item[0].pymysqlreplication_event, WriteRowsEvent): + writer.insert(item) + elif isinstance(item[0].pymysqlreplication_event, DeleteRowsEvent): + writer.delete(item) + elif isinstance(item[0].pymysqlreplication_event, UpdateRowsEvent): + writer.update(item) + else: + # skip other unhandled events + pass + # writer.insert(self.belts[belt_index].pop()) writer.close() writer.push() writer.destroy() diff --git a/clickhouse_mysql/pumper.py b/clickhouse_mysql/pumper.py index e75bc34..a868938 100644 --- a/clickhouse_mysql/pumper.py +++ b/clickhouse_mysql/pumper.py @@ -11,7 +11,6 @@ class Pumper(object): writer = None def __init__(self, reader=None, writer=None): - self.reader = reader self.writer = writer @@ -19,6 +18,8 @@ def __init__(self, reader=None, writer=None): # subscribe on reader's event notifications self.reader.subscribe({ 'WriteRowsEvent': self.write_rows_event, + 'UpdateRowsEvent': self.update_rows_event, + 'DeleteRowsEvent': self.delete_rows_event, # 'WriteRowsEvent.EachRow': self.write_rows_event_each_row, 'ReaderIdleEvent': self.reader_idle_event, }) @@ -46,5 +47,20 @@ def reader_idle_event(self): """ self.writer.flush() + def delete_rows_event(self, event=None): + """ + DeleteRowsEvent handler + :param event: + """ + self.writer.delete(event) + + def update_rows_event(self, event=None): + """ + UpdateRowsEvent handler + :param event: + """ + self.writer.update(event) + + if __name__ == '__main__': print("pumper") diff --git a/clickhouse_mysql/reader/mysqlreader.py b/clickhouse_mysql/reader/mysqlreader.py index f21e8b1..659ab77 100644 --- a/clickhouse_mysql/reader/mysqlreader.py +++ 
b/clickhouse_mysql/reader/mysqlreader.py @@ -12,7 +12,7 @@ from clickhouse_mysql.event.event import Event from clickhouse_mysql.tableprocessor import TableProcessor from clickhouse_mysql.util import Util -#from pymysqlreplication.event import QueryEvent, RotateEvent, FormatDescriptionEvent +from pymysqlreplication.event import QueryEvent, RotateEvent, FormatDescriptionEvent class MySQLReader(Reader): @@ -56,13 +56,15 @@ def __init__( self.server_id = server_id self.log_file = log_file self.log_pos = log_pos - self.schemas = None if not TableProcessor.extract_dbs(schemas, Util.join_lists(tables, tables_prefixes)) else TableProcessor.extract_dbs(schemas, Util.join_lists(tables, tables_prefixes)) + self.schemas = None if not TableProcessor.extract_dbs(schemas, Util.join_lists(tables, + tables_prefixes)) else TableProcessor.extract_dbs( + schemas, Util.join_lists(tables, tables_prefixes)) self.tables = None if tables is None else TableProcessor.extract_tables(tables) self.tables_prefixes = None if tables_prefixes is None else TableProcessor.extract_tables(tables_prefixes) self.blocking = blocking self.resume_stream = resume_stream self.nice_pause = nice_pause - self.binlog_position_file=binlog_position_file + self.binlog_position_file = binlog_position_file logging.info("raw dbs list. len()=%d", 0 if schemas is None else len(schemas)) if schemas is not None: @@ -86,7 +88,8 @@ def __init__( if tables_prefixes is not None: for table in tables_prefixes: logging.info(table) - logging.info("normalised tables-prefixes list. len()=%d", 0 if self.tables_prefixes is None else len(self.tables_prefixes)) + logging.info("normalised tables-prefixes list. len()=%d", + 0 if self.tables_prefixes is None else len(self.tables_prefixes)) if self.tables_prefixes is not None: for table in self.tables_prefixes: logging.info(table) @@ -101,21 +104,21 @@ def __init__( # we are interested in reading CH-repeatable events only only_events=[ # Possible events - #BeginLoadQueryEvent, + # BeginLoadQueryEvent, DeleteRowsEvent, - #ExecuteLoadQueryEvent, - #FormatDescriptionEvent, - #GtidEvent, - #HeartbeatLogEvent, - #IntvarEvent - #NotImplementedEvent, - #QueryEvent, - #RotateEvent, - #StopEvent, - #TableMapEvent, + # ExecuteLoadQueryEvent, + # FormatDescriptionEvent, + # GtidEvent, + # HeartbeatLogEvent, + # IntvarEvent + # NotImplementedEvent, + # QueryEvent, + # RotateEvent, + # StopEvent, + # TableMapEvent, UpdateRowsEvent, WriteRowsEvent, - #XidEvent, + # XidEvent, ], only_schemas=self.schemas, # in case we have any prefixes - this means we need to listen to all tables within specified schemas @@ -245,6 +248,9 @@ def process_write_rows_event(self, mysql_event): :param mysql_event: WriteRowsEvent instance :return: """ + + logging.debug("Received insert event for table: " + mysql_event.table) + if self.tables_prefixes: # we have prefixes specified # need to find whether current event is produced by table in 'looking-into-tables' list @@ -294,10 +300,81 @@ def process_write_rows_event(self, mysql_event): self.stat_write_rows_event_finalyse() def process_update_rows_event(self, mysql_event): - logging.info("Skip update rows") + + logging.debug("Received update event for table: " + mysql_event.table + " Schema: " + mysql_event.schema) + + # for row in mysql_event.rows: + # for key in row['before_values']: + # logging.debug("\t *%s:%s=>%s" % (key, row["before_values"][key], row["after_values"][key])) + + if self.tables_prefixes: + # we have prefixes specified + # need to find whether current event is produced by table in 
'looking-into-tables' list + if not self.is_table_listened(mysql_event.table): + # this table is not listened + # processing is over - just skip event + return + + # statistics + #self.stat_write_rows_event_calc_rows_num_min_max(rows_num_per_event=len(mysql_event.rows)) + + if self.subscribers('UpdateRowsEvent'): + # dispatch event to subscribers + + # statistics + #self.stat_write_rows_event_all_rows(mysql_event=mysql_event) + + # dispatch Event + event = Event() + event.schema = mysql_event.schema + event.table = mysql_event.table + event.pymysqlreplication_event = mysql_event + + #self.process_first_event(event=event) + self.notify('UpdateRowsEvent', event=event) + + # self.stat_write_rows_event_finalyse() + + # logging.info("Skip update rows") def process_delete_rows_event(self, mysql_event): - logging.info("Skip delete rows") + logging.debug("Received delete event for table: " + mysql_event.table) + + """ + for row in mysql_event.rows: + for key in row['values']: + logging.debug("\t *", key, ":", row["values"][key]) + """ + + if self.tables_prefixes: + # we have prefixes specified + # need to find whether current event is produced by table in 'looking-into-tables' list + if not self.is_table_listened(mysql_event.table): + # this table is not listened + # processing is over - just skip event + return + + # statistics + #self.stat_write_rows_event_calc_rows_num_min_max(rows_num_per_event=len(mysql_event.rows)) + + if self.subscribers('DeleteRowsEvent'): + # dispatch event to subscribers + + # statistics + #self.stat_write_rows_event_all_rows(mysql_event=mysql_event) + + # dispatch Event + event = Event() + event.schema = mysql_event.schema + event.table = mysql_event.table + event.pymysqlreplication_event = mysql_event + + self.process_first_event(event=event) + self.notify('DeleteRowsEvent', event=event) + + # self.stat_write_rows_event_finalyse() + + # logging.info("Skip delete rows") def process_binlog_position(self, file, pos): if self.binlog_position_file: @@ -321,14 +398,16 @@ def read(self): self.stat_init_fetch_loop() try: - logging.debug('Pre-start binlog position: ' + self.binlog_stream.log_file + ":" + str(self.binlog_stream.log_pos) if self.binlog_stream.log_pos is not None else "undef") + logging.debug('Pre-start binlog position: ' + self.binlog_stream.log_file + ":" + str( + self.binlog_stream.log_pos) if self.binlog_stream.log_pos is not None else "undef") # fetch available events from MySQL for mysql_event in self.binlog_stream: # new event has come # check what to do with it - logging.debug('Got Event ' + self.binlog_stream.log_file + ":" + str(self.binlog_stream.log_pos)) + logging.debug( + 'Got Event ' + self.binlog_stream.log_file + ":" + str(self.binlog_stream.log_pos)) # process event based on its type if isinstance(mysql_event, WriteRowsEvent): @@ -393,6 +472,7 @@ def read(self): logging.info('end %d', end_timestamp) logging.info('len %d', end_timestamp - self.start_timestamp) + if __name__ == '__main__': connection_settings = { 'host': '127.0.0.1', diff --git a/clickhouse_mysql/reader/reader.py b/clickhouse_mysql/reader/reader.py index 379cf5f..c4f5246 100644 --- a/clickhouse_mysql/reader/reader.py +++ b/clickhouse_mysql/reader/reader.py @@ -18,6 +18,13 @@ class Reader(Observable): # called when Reader has no data to read 'ReaderIdleEvent': [], + + # called on each DeleteRowsEvent + 'DeleteRowsEvent': [], + + # called on each UpdateRowsEvent + 'UpdateRowsEvent': [], + } def __init__(self, converter=None, callbacks={}): diff --git 
a/clickhouse_mysql/writer/chcsvwriter.py b/clickhouse_mysql/writer/chcsvwriter.py index caea56e..88571c3 100644 --- a/clickhouse_mysql/writer/chcsvwriter.py +++ b/clickhouse_mysql/writer/chcsvwriter.py @@ -33,7 +33,9 @@ def __init__( dst_schema += "_all" if dst_distribute and dst_table is not None: dst_table += "_all" - logging.info("CHCSWriter() connection_settings={} dst_schema={} dst_table={}".format(connection_settings, dst_schema, dst_table)) + logging.info( + "CHCSWriter() connection_settings={} dst_schema={} dst_table={}".format(connection_settings, dst_schema, + dst_table)) self.host = connection_settings['host'] self.port = connection_settings['port'] self.user = connection_settings['user'] @@ -98,3 +100,123 @@ def insert(self, event_or_events=None): os.system(bash) pass + + def deleteRow(self, event_or_events=None): + # event_or_events = [ + # event: { + # row: {'id': 3, 'a': 3} + # }, + # event: { + # row: {'id': 3, 'a': 3} + # }, + # ] + + events = self.listify(event_or_events) + if len(events) < 1: + logging.warning('No events to delete. class: %s', __class__) + return + + # assume we have at least one Event + + logging.debug('class:%s delete %d rows', __class__, len(events)) + + for event in events: + schema = self.dst_schema if self.dst_schema else event.schema + table = None + if self.dst_distribute: + table = TableProcessor.create_distributed_table_name(db=event.schema, table=event.table) + else: + table = self.dst_table if self.dst_table else event.table + if self.dst_schema: + table = TableProcessor.create_migrated_table_name(prefix=self.dst_table_prefix, table=table) + + sql = 'ALTER TABLE `{0}`.`{1}` DELETE WHERE {2} = {3} '.format( + schema, + table, + ' AND '.join(map(lambda column: '`%s`' % column, event.fieldnames)), + ) + + choptions = "" + if self.host: + choptions += " --host=" + shlex.quote(self.host) + if self.port: + choptions += " --port=" + str(self.port) + if self.user: + choptions += " --user=" + shlex.quote(self.user) + if self.password: + choptions += " --password=" + shlex.quote(self.password) + bash = "tail -n +2 '{0}' | clickhouse-client {1} --query='{2}'".format( + event.filename, + choptions, + sql, + ) + + logging.info('starting clickhouse-client process for delete operation') + logging.debug('starting %s', bash) + os.system(bash) + + pass + + def update(self, event_or_events=None): + # event_or_events = [ + # event: { + # row: {'id': 3, 'a': 3} + # }, + # event: { + # row: {'id': 3, 'a': 3} + # }, + # ] + + logging.info('starting clickhouse-client process for update operation') + + events = self.listify(event_or_events) + if len(events) < 1: + logging.warning('No events to update. 
class: %s', __class__) + return + + # assume we have at least one Event + + logging.debug('class:%s update %d rows', __class__, len(events)) + + for event in events: + schema = self.dst_schema if self.dst_schema else event.schema + table = None + if self.dst_distribute: + table = TableProcessor.create_distributed_table_name(db=event.schema, table=event.table) + else: + table = self.dst_table if self.dst_table else event.table + if self.dst_schema: + table = TableProcessor.create_migrated_table_name(prefix=self.dst_table_prefix, table=table) + + sql = 'INSERT INTO `{0}`.`{1}` ({2}) FORMAT CSV'.format( + schema, + table, + ', '.join(map(lambda column: '`%s`' % column, event.fieldnames)), + ) + + sql = 'ALTER TABLE `{0}`.`{1}` UPDATE {3}'.format( + schema, + table, + ', '.join(map(lambda column, value: '`%s`=`%s' % column, event.fieldnames, event.fieldnames)) + ) + + choptions = "" + if self.host: + choptions += " --host=" + shlex.quote(self.host) + if self.port: + choptions += " --port=" + str(self.port) + if self.user: + choptions += " --user=" + shlex.quote(self.user) + if self.password: + choptions += " --password=" + shlex.quote(self.password) + bash = "tail -n +2 '{0}' | clickhouse-client {1} --query='{2}'".format( + event.filename, + choptions, + sql, + ) + + logging.info('starting clickhouse-client process') + logging.debug('starting %s', bash) + os.system(bash) + + pass diff --git a/clickhouse_mysql/writer/chwriter.py b/clickhouse_mysql/writer/chwriter.py index 587d48f..46ad105 100644 --- a/clickhouse_mysql/writer/chwriter.py +++ b/clickhouse_mysql/writer/chwriter.py @@ -112,6 +112,170 @@ def insert(self, event_or_events=None): # all DONE + def deleteRow(self, event_or_events): + # event_or_events = [ + # event: { + # row: {'id': 3, 'a': 3} + # }, + # event: { + # row: {'id': 3, 'a': 3} + # }, + # ] + + logging.debug("Delete CHWriter") + + events = self.listify(event_or_events) + if len(events) < 1: + logging.warning('No events to insert. class: %s', __class__) + return + + # assume we have at least one Event + + logging.debug('class:%s delete %d event(s)', __class__, len(events)) + + # verify and converts events and consolidate converted rows from all events into one batch + + rows = [] + event_converted = None + for event in events: + if not event.verify: + logging.warning('Event verification failed. Skip one event. 
Event: %s Class: %s', event.meta(), + __class__) + continue # for event + + event_converted = self.convert(event) + for row in event_converted: + for key in row.keys(): + # we need to convert Decimal value to str value for suitable for table structure + if type(row[key]) == Decimal: + row[key] = str(row[key]) + rows.append(row) + + logging.debug('class:%s delete %d row(s)', __class__, len(rows)) + + # determine target schema.table + + schema = self.dst_schema if self.dst_schema else event_converted.schema + table = None + if self.dst_distribute: + table = TableProcessor.create_distributed_table_name(db=event_converted.schema, table=event_converted.table) + else: + table = self.dst_table if self.dst_table else event_converted.table + if self.dst_schema: + table = TableProcessor.create_migrated_table_name(prefix=self.dst_table_prefix, table=table) + + logging.debug("schema={} table={} self.dst_schema={} self.dst_table={}".format(schema, table, self.dst_schema, + self.dst_table)) + + # and DELETE converted rows + + sql = '' + try: + sql = 'ALTER TABLE `{0}`.`{1}` DELETE WHERE {2} = {3} '.format( + schema, + table, + ' AND '.join(map(lambda column: '`%s`' % column, event.fieldnames)), + ) + self.client.execute(sql, rows) + except Exception as ex: + logging.critical('QUERY FAILED') + logging.critical('ex={}'.format(ex)) + logging.critical('sql={}'.format(sql)) + sys.exit(0) + + # all DONE + + """ + Get string format pattern for update and delete operations + """ + def get_data_format(self, column, value): + t = type(value) + if t == str: + return "`%s`='%s'" % (column, value) + else: + # int, float + return "`%s`=%s" % (column, value) + + def update(self, event_or_events): + # event_or_events = [ + # event: { + # row: {'id': 3, 'a': 3} + # }, + # event: { + # row: {'id': 3, 'a': 3} + # }, + # ] + + logging.debug("Update CHWriter") + + events = self.listify(event_or_events) + if len(events) < 1: + logging.warning('No events to update. class: %s', __class__) + return + + # assume we have at least one Event + + logging.debug('class:%s update %d event(s)', __class__, len(events)) + + # verify and converts events and consolidate converted rows from all events into one batch + + rows = [] + event_converted = None + pk = None + for event in events: + if not event.verify: + logging.warning('Event verification failed. Skip one event. 
Event: %s Class: %s', event.meta(), + __class__) + continue # for event + + event_converted = self.convert(event) + pk = event_converted.pymysqlreplication_event.primary_key + for row in event_converted.pymysqlreplication_event.rows: + for key in row['after_values'].keys(): + # we need to convert Decimal value to str value for suitable for table structure + if type(row['after_values'][key]) == Decimal: + row['after_values'][key] = str(row['after_values'][key]) + rows.append(row) + + logging.debug('class:%s update %d row(s)', __class__, len(rows)) + + # determine target schema.table + + schema = self.dst_schema if self.dst_schema else event_converted.schema + table = None + if self.dst_distribute: + table = TableProcessor.create_distributed_table_name(db=event_converted.schema, table=event_converted.table) + else: + table = self.dst_table if self.dst_table else event_converted.table + if self.dst_schema: + table = TableProcessor.create_migrated_table_name(prefix=self.dst_table_prefix, table=table) + + logging.debug("schema={} table={} self.dst_schema={} self.dst_table={}".format(schema, table, self.dst_schema, + self.dst_table)) + + # and UPDATE converted rows + + sql = '' + try: + sql = 'ALTER TABLE `{0}`.`{1}` UPDATE {2} where {3}'.format( + schema, + table, + ', '.join(filter(None, map(lambda column, value: "" if column == pk else self.get_data_format(column, value), row['after_values'].keys(), row['after_values'].values()))), + ' and '.join(map(lambda column, value: self.get_data_format(column, value), row['before_values'].keys(), row['before_values'].values())) + ) + + # sql = "ALTER TABLE `test`.`animals` UPDATE `name`='pajaroTO', `position`=1 where `id`=1 and `name`='oso'" + self.client.execute(sql) + except Exception as ex: + logging.critical('QUERY FAILED') + logging.critical('ex={}'.format(ex)) + logging.critical('sql={}'.format(sql)) + # sys.exit(0) + + # all DONE + + + if __name__ == '__main__': connection_settings = { diff --git a/clickhouse_mysql/writer/csvwriter.py b/clickhouse_mysql/writer/csvwriter.py index 4ff9081..18cfda6 100644 --- a/clickhouse_mysql/writer/csvwriter.py +++ b/clickhouse_mysql/writer/csvwriter.py @@ -135,6 +135,18 @@ def insert(self, event_or_events): for row in event: self.writer.writerow(self.convert(row)) + def deleteRow(self, event_or_events): + """ + TODO + """ + logging.debug("Delete CSV Writer") + + def update(self, event_or_events): + """ + TODO + """ + logging.debug("Update CSV Writer") + def push(self): if not self.next_writer_builder or not self.fieldnames: return diff --git a/clickhouse_mysql/writer/poolwriter.py b/clickhouse_mysql/writer/poolwriter.py index b49e011..129f05a 100644 --- a/clickhouse_mysql/writer/poolwriter.py +++ b/clickhouse_mysql/writer/poolwriter.py @@ -37,6 +37,18 @@ def insert(self, event_or_events): logging.debug('class:%s insert', __class__) self.pool.insert(event_or_events) + + def delete(self, event_or_events): + """Insert delete data into Pool""" + logging.debug('class:%s delete', __class__) + self.pool.insert(event_or_events) + + def update(self, event_or_events): + """Insert update data into Pool""" + logging.debug('class:%s update', __class__) + self.pool.insert(event_or_events) + + def flush(self): self.pool.flush() diff --git a/clickhouse_mysql/writer/processwriter.py b/clickhouse_mysql/writer/processwriter.py index 226b72b..8177345 100644 --- a/clickhouse_mysql/writer/processwriter.py +++ b/clickhouse_mysql/writer/processwriter.py @@ -35,6 +35,28 @@ def process(self, event_or_events=None): writer.destroy() 
logging.debug('class:%s process() done', __class__) + def processDelete(self, event_or_events=None): + """Separate process body to be run""" + + logging.debug('class:%s process()', __class__) + writer = self.next_writer_builder.get() + writer.deleteRow(event_or_events) + writer.close() + writer.push() + writer.destroy() + logging.debug('class:%s process() done', __class__) + + def processUpdate(self, event_or_events=None): + """Separate process body to be run""" + + logging.debug('class:%s process()', __class__) + writer = self.next_writer_builder.get() + writer.delete(event_or_events) + writer.close() + writer.push() + writer.destroy() + logging.debug('class:%s process() done', __class__) + def insert(self, event_or_events=None): # event_or_events = [ # event: { @@ -57,6 +79,50 @@ def insert(self, event_or_events=None): logging.debug('class:%s insert done', __class__) pass + def delete(self, event_or_events=None): + # event_or_events = [ + # event: { + # row: {'id': 3, 'a': 3} + # }, + # event: { + # row: {'id': 3, 'a': 3} + # }, + # ] + + # start separated process with event_or_events to be inserted + + logging.debug('class:%s delete', __class__) + process = mp.Process(target=self.processDelete, args=(event_or_events,)) + + logging.debug('class:%s delete.process.start()', __class__) + process.start() + + #process.join() + logging.debug('class:%s delete done', __class__) + pass + + def update(self, event_or_events=None): + # event_or_events = [ + # event: { + # row: {'id': 3, 'a': 3} + # }, + # event: { + # row: {'id': 3, 'a': 3} + # }, + # ] + + # start separated process with event_or_events to be inserted + + logging.debug('class:%s update', __class__) + process = mp.Process(target=self.processUpdate, args=(event_or_events,)) + + logging.debug('class:%s update.process.start()', __class__) + process.start() + + #process.join() + logging.debug('class:%s update done', __class__) + pass + def flush(self): pass diff --git a/clickhouse_mysql/writer/writer.py b/clickhouse_mysql/writer/writer.py index 11f788c..3be276b 100644 --- a/clickhouse_mysql/writer/writer.py +++ b/clickhouse_mysql/writer/writer.py @@ -55,6 +55,28 @@ def insert(self, event_or_events=None): # ] pass + def update(self, event_or_events=None): + # event_or_events = [ + # event: { + # row: {'id': 3, 'a': 3} + # }, + # event: { + # row: {'id': 3, 'a': 3} + # }, + # ] + pass + + def delete(self, event_or_events=None): + # event_or_events = [ + # event: { + # row: {'id': 3, 'a': 3} + # }, + # event: { + # row: {'id': 3, 'a': 3} + # }, + # ] + pass + def flush(self): pass From 31b60bdd0671890632f8b505e676268f94502264 Mon Sep 17 00:00:00 2001 From: ygnuss Date: Wed, 10 Mar 2021 11:43:51 +0100 Subject: [PATCH 02/12] Fixed some errors with null values (None) for update events --- clickhouse_mysql/writer/chwriter.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/clickhouse_mysql/writer/chwriter.py b/clickhouse_mysql/writer/chwriter.py index 46ad105..5774091 100644 --- a/clickhouse_mysql/writer/chwriter.py +++ b/clickhouse_mysql/writer/chwriter.py @@ -10,7 +10,7 @@ from clickhouse_mysql.writer.writer import Writer from clickhouse_mysql.tableprocessor import TableProcessor -from clickhouse_mysql.event.event import Event +import datetime class CHWriter(Writer): @@ -108,7 +108,8 @@ def insert(self, event_or_events=None): logging.critical('QUERY FAILED') logging.critical('ex={}'.format(ex)) logging.critical('sql={}'.format(sql)) - sys.exit(0) + logging.critical('data={}'.format(rows)) + # sys.exit(0) # 
all DONE @@ -181,7 +182,7 @@ def deleteRow(self, event_or_events): logging.critical('QUERY FAILED') logging.critical('ex={}'.format(ex)) logging.critical('sql={}'.format(sql)) - sys.exit(0) + # sys.exit(0) # all DONE @@ -190,7 +191,7 @@ def deleteRow(self, event_or_events): """ def get_data_format(self, column, value): t = type(value) - if t == str: + if t == str or t is datetime.datetime: return "`%s`='%s'" % (column, value) else: # int, float @@ -260,11 +261,12 @@ def update(self, event_or_events): sql = 'ALTER TABLE `{0}`.`{1}` UPDATE {2} where {3}'.format( schema, table, - ', '.join(filter(None, map(lambda column, value: "" if column == pk else self.get_data_format(column, value), row['after_values'].keys(), row['after_values'].values()))), - ' and '.join(map(lambda column, value: self.get_data_format(column, value), row['before_values'].keys(), row['before_values'].values())) + ', '.join(filter(None, map(lambda column, value: "" if column == pk or value is None else self.get_data_format(column, value), row['after_values'].keys(), row['after_values'].values()))), + ' and '.join(filter(None, map( + lambda column, value: "" if column != pk or value is None else self.get_data_format(column, value), + row['before_values'].keys(), row['before_values'].values()))) ) - # sql = "ALTER TABLE `test`.`animals` UPDATE `name`='pajaroTO', `position`=1 where `id`=1 and `name`='oso'" self.client.execute(sql) except Exception as ex: logging.critical('QUERY FAILED') From d52d82925fe212c1f58495a46f69227982591095 Mon Sep 17 00:00:00 2001 From: ygnuss Date: Wed, 10 Mar 2021 11:44:09 +0100 Subject: [PATCH 03/12] Added requirements file --- requirements.txt | 108 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 108 insertions(+) create mode 100644 requirements.txt diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..bce8d28 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,108 @@ +appdirs==1.4.4 +apturl==0.5.2 +astroid==2.4.2 +basictracer==3.1.0 +blinker==1.4 +Brlapi==0.7.0 +cachetools==4.0.0 +certifi==2020.12.5 +chardet==3.0.4 +chrome-gnome-shell==0.0.0 +Click==7.0 +clickhouse-driver==0.0.10 +clickhouse-toolset @ file:///tmp/clickhouse_toolset-0.9.dev0-cp38-cp38-linux_x86_64.whl +colorama==0.4.3 +command-not-found==0.3 +configobj==5.0.6 +crcmod==1.7 +cryptography==3.0 +cupshelpers==1.0 +datasketch==1.2.10 +dbus-python==1.2.16 +defer==1.0.6 +distlib==0.3.1 +distro==1.5.0 +distro-info===0.23ubuntu1 +dlib==19.16.0 +filelock==3.0.12 +httplib2==0.18.1 +humanfriendly==8.2 +idna==2.6 +importlib-metadata==1.6.0 +isort==5.7.0 +jeepney==0.4.3 +keyring==21.3.0 +language-selector==0.1 +launchpadlib==1.10.13 +lazr.restfulclient==0.14.2 +lazr.uri==1.0.5 +lazy-object-proxy==1.4.3 +louis==3.14.0 +macaroonbakery==1.3.1 +Markdown==3.2.1 +mccabe==0.6.1 +more-itertools==4.2.0 +msal==1.5.0 +netifaces==0.10.4 +numpy==1.15.0 +oauthlib==3.1.0 +olefile==0.46 +opencv-python==4.4.0.46 +opentracing==2.0.0 +passlib==1.7.1 +pexpect==4.6.0 +Pillow==7.2.0 +powerline-status==2.8.1 +protobuf==3.12.3 +psutil==5.6.3 +psycopg2-binary==2.8.5 +pycairo==1.16.2 +pycups==2.0.1 +pycurl==7.43.0.6 +Pygments==2.3.1 +PyGObject==3.38.0 +PyJWT==1.6.4 +pylint==2.6.0 +pymacaroons==0.13.0 +PyMySQL==1.0.2 +PyNaCl==1.4.0 +pyRFC3339==1.1 +python-apt==2.1.3+ubuntu1.3 +python-dateutil==2.8.1 +python-debian==0.1.37 +pytz==2020.1 +pyxdg==0.26 +PyYAML==5.3.1 +rangehttpserver==1.2.0 +redis==3.2.1 +reportlab==3.5.47 +requests==2.18.4 +requests-toolbelt==0.9.1 +requests-unixsocket==0.2.0 +screen-resolution-extra==0.0.0 
+SecretStorage==3.1.2 +simplejson==3.17.0 +six==1.15.0 +streaming-form-data==1.1.0 +systemd-python==234 +tabulate==0.8.3 +terminator==1.92 +-e git+git@gitlab.com:tinybird/analytics.git@0d13783b7e38c0decc97ac06901e8ce7b804221e#egg=tinybird +tinybird-cli==1.0.0b12 +TLPUI==1.3.1.post3 +toml==0.10.2 +toposort==1.5 +tornado==5.1.1 +tornado-opentracing==1.0.1 +torngithub==0.2.0 +ubuntu-advantage-tools==24.4 +ubuntu-drivers-common==0.0.0 +ufw==0.36 +unattended-upgrades==0.1 +urllib3==1.22 +vboxapi==1.0 +virtualenv==20.0.29+ds +wadllib==1.3.4 +wrapt==1.12.1 +xkit==0.0.0 +zipp==1.0.0 From 7ba28c4ee333ac06773878a810da6ca2ee9f789a Mon Sep 17 00:00:00 2001 From: ygnuss Date: Wed, 10 Mar 2021 12:02:05 +0100 Subject: [PATCH 04/12] Fixed handling delete events --- clickhouse_mysql/pumper.py | 2 +- clickhouse_mysql/writer/chwriter.py | 33 +++++++++++++++++++++++------ clickhouse_mysql/writer/writer.py | 12 ++++++++--- 3 files changed, 37 insertions(+), 10 deletions(-) diff --git a/clickhouse_mysql/pumper.py b/clickhouse_mysql/pumper.py index a868938..0b5d0c3 100644 --- a/clickhouse_mysql/pumper.py +++ b/clickhouse_mysql/pumper.py @@ -52,7 +52,7 @@ def delete_rows_event(self, event=None): DeleteRowsEvent handler :param event: """ - self.writer.delete(event) + self.writer.delete_row(event) def update_rows_event(self, event=None): """ diff --git a/clickhouse_mysql/writer/chwriter.py b/clickhouse_mysql/writer/chwriter.py index 5774091..5a243ae 100644 --- a/clickhouse_mysql/writer/chwriter.py +++ b/clickhouse_mysql/writer/chwriter.py @@ -113,7 +113,7 @@ def insert(self, event_or_events=None): # all DONE - def deleteRow(self, event_or_events): + def delete_row(self, event_or_events): # event_or_events = [ # event: { # row: {'id': 3, 'a': 3} @@ -138,6 +138,7 @@ def deleteRow(self, event_or_events): rows = [] event_converted = None + pk = None for event in events: if not event.verify: logging.warning('Event verification failed. Skip one event. 
Event: %s Class: %s', event.meta(), @@ -145,6 +146,7 @@ def deleteRow(self, event_or_events): continue # for event event_converted = self.convert(event) + pk = event_converted.pymysqlreplication_event.primary_key for row in event_converted: for key in row.keys(): # we need to convert Decimal value to str value for suitable for table structure @@ -170,14 +172,27 @@ def deleteRow(self, event_or_events): # and DELETE converted rows + sql = '' + # try: + # sql = 'ALTER TABLE `{0}`.`{1}` DELETE WHERE {2} = {3} '.format( + # schema, + # table, + # ' AND '.join(map(lambda column: '`%s`' % column, event.fieldnames)), + # ) + # self.client.execute(sql, rows) + sql = '' try: - sql = 'ALTER TABLE `{0}`.`{1}` DELETE WHERE {2} = {3} '.format( + sql = 'ALTER TABLE `{0}`.`{1}` DELETE WHERE {2}'.format( schema, table, - ' AND '.join(map(lambda column: '`%s`' % column, event.fieldnames)), + ' and '.join(filter(None, map( + lambda column, value: "" if column != pk else self.get_data_format(column, value), + row.keys(), row.values()))) ) - self.client.execute(sql, rows) + + self.client.execute(sql) + except Exception as ex: logging.critical('QUERY FAILED') logging.critical('ex={}'.format(ex)) @@ -200,10 +215,16 @@ def get_data_format(self, column, value): def update(self, event_or_events): # event_or_events = [ # event: { - # row: {'id': 3, 'a': 3} + # row: { + # 'before_values': {'id': 3, 'a': 3}, + # 'after_values': {'id': 3, 'a': 2} + # } # }, # event: { - # row: {'id': 3, 'a': 3} + # row: { + # 'before_values': {'id': 2, 'a': 3}, + # 'after_values': {'id': 2, 'a': 2} + # } # }, # ] diff --git a/clickhouse_mysql/writer/writer.py b/clickhouse_mysql/writer/writer.py index 3be276b..1bfaeb0 100644 --- a/clickhouse_mysql/writer/writer.py +++ b/clickhouse_mysql/writer/writer.py @@ -58,15 +58,21 @@ def insert(self, event_or_events=None): def update(self, event_or_events=None): # event_or_events = [ # event: { - # row: {'id': 3, 'a': 3} + # row: { + # 'before_values': {'id': 3, 'a': 3}, + # 'after_values': {'id': 3, 'a': 2} + # } # }, # event: { - # row: {'id': 3, 'a': 3} + # row: { + # 'before_values': {'id': 2, 'a': 3}, + # 'after_values': {'id': 2, 'a': 2} + # } # }, # ] pass - def delete(self, event_or_events=None): + def delete_row(self, event_or_events=None): # event_or_events = [ # event: { # row: {'id': 3, 'a': 3} From 76512c3c1762c16be3342e9da2d99dd375b39bbe Mon Sep 17 00:00:00 2001 From: ygnuss Date: Wed, 10 Mar 2021 16:50:47 +0100 Subject: [PATCH 05/12] Added audit column for tracking changes in Clickhouse --- clickhouse_mysql/writer/chwriter.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/clickhouse_mysql/writer/chwriter.py b/clickhouse_mysql/writer/chwriter.py index 5a243ae..fbe9613 100644 --- a/clickhouse_mysql/writer/chwriter.py +++ b/clickhouse_mysql/writer/chwriter.py @@ -206,7 +206,9 @@ def delete_row(self, event_or_events): """ def get_data_format(self, column, value): t = type(value) - if t == str or t is datetime.datetime: + if t == str: + return "`%s`='%s'" % (column, value.replace("'", "\\'")) + elif t is datetime.datetime: return "`%s`='%s'" % (column, value) else: # int, float @@ -279,10 +281,11 @@ def update(self, event_or_events): sql = '' try: - sql = 'ALTER TABLE `{0}`.`{1}` UPDATE {2} where {3}'.format( + sql = 'ALTER TABLE `{0}`.`{1}` UPDATE {2}, `tb_upd`={3} where {4}'.format( schema, table, ', '.join(filter(None, map(lambda column, value: "" if column == pk or value is None else self.get_data_format(column, value), row['after_values'].keys(), 
row['after_values'].values()))), + "'%s'" % datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), ' and '.join(filter(None, map( lambda column, value: "" if column != pk or value is None else self.get_data_format(column, value), row['before_values'].keys(), row['before_values'].values()))) From 7e27baedb951d6c6533152095ebcc9327b0f8e3d Mon Sep 17 00:00:00 2001 From: ygnuss Date: Tue, 16 Mar 2021 09:39:17 +0100 Subject: [PATCH 06/12] Added scripts for running listeners --- run-listeners.sh | 51 +++++++++++++++++++++++++++++++++++++++++++++++ stop-listeners.sh | 23 +++++++++++++++++++++ tb_tables.config | 9 +++++++++ 3 files changed, 83 insertions(+) create mode 100755 run-listeners.sh create mode 100755 stop-listeners.sh create mode 100644 tb_tables.config diff --git a/run-listeners.sh b/run-listeners.sh new file mode 100755 index 0000000..60a5996 --- /dev/null +++ b/run-listeners.sh @@ -0,0 +1,51 @@ +#!/bin/bash + +LOG_LEVEL=debug + +SOURCE_HOST=127.0.0.1 +SOURCE_PORT=3307 +DESTINATION_HOST=127.0.0.1 +SOURCE_USER=tinybird +SOURCE_PASSWD=goo7eu9AeS3i + +PID_LOG_FILE=/tmp/listeners-pid.log + +source tb_tables.config + +############################################################ +# Run a process to synchronize MySQL table using binlog. +# +# $1 --> Source schema +# $2 --> Source table +# $3 --> Destination schema +# $4 --> Destination table +# $5 --> Server id +# $6 --> Log file +# $7 --> Binlog position file +# +############################################################# +run_listener() { + + (clickhouse-mysql --src-server-id=$5 --src-wait --src-resume --binlog-position-file $7 --nice-pause=1 --src-host=$SOURCE_HOST --src-port=$SOURCE_PORT --src-user=$SOURCE_USER --src-password=$SOURCE_PASSWD --src-schemas=$1 --src-tables=$2 --dst-host=$DESTINATION_HOST --dst-schema=$3 --dst-table=$4 --log-level=$LOG_LEVEL --pump-data 2>> $6)& + +} + +run_listener "movida_preproduction" "schedulings" "$TB_DATABASE" "$SCHEDULINGS_TABLE" "91" "out-schedulings.log" "bl-pos-schedulings" +echo $! > $PID_LOG_FILE + +run_listener "movida_preproduction" "platforms" "$TB_DATABASE" "$PLATFORMS_TABLE" "92" "out-platforms.log" "bl-pos-platforms" +echo $! >> $PID_LOG_FILE + +run_listener "movida_preproduction" "titles" "$TB_DATABASE" "$TITLES_TABLE" "93" "out-titles.log" "bl-pos-titles" +echo $! >> $PID_LOG_FILE + +run_listener "movida_preproduction" "assets" "$TB_DATABASE" "$ASSETS_TABLE" "94" "out-assets.log" "bl-pos-assets" +echo $! >> $PID_LOG_FILE + +run_listener "movida_preproduction" "features" "$TB_DATABASE" "$FEATURES_TABLE" "95" "out-features.log" "bl-pos-features" +echo $! >> $PID_LOG_FILE + +run_listener "movida_preproduction" "collection_entries" "$TB_DATABASE" "$COLLECTIONS_TABLE" "96" "out-collections.log" "bl-pos-collections" +echo $! 
>> $PID_LOG_FILE + +echo "PID processes in $PID_LOG_FILE" \ No newline at end of file diff --git a/stop-listeners.sh b/stop-listeners.sh new file mode 100755 index 0000000..582e97c --- /dev/null +++ b/stop-listeners.sh @@ -0,0 +1,23 @@ +#!/bin/bash + +PID_LOG_FILE=/tmp/listeners-pid.log + +count_processes() { + echo `ps aux | grep clickhouse-mysql-data-reader | wc -l` +} + +total_before=$(count_processes) + +while IFS= read -r line +do + echo "$line" + kill $line +done < "$PID_LOG_FILE" + +total_after=$(count_processes) + +procs=`echo "$total_after - 1" | bc` + +if [ $total_after -gt 1 ]; then + echo "You still have $procs processes running" +fi \ No newline at end of file diff --git a/tb_tables.config b/tb_tables.config new file mode 100644 index 0000000..be59079 --- /dev/null +++ b/tb_tables.config @@ -0,0 +1,9 @@ +#!/bin/bash + +TB_DATABASE='d_073c5e' +TITLES_TABLE='t_8a192b9c7ece4572a5a2fc9858e26d5c' +ASSETS_TABLE='t_4c03fdeb4e3e4db784ead40b06ec8617' +COLLECTIONS_TABLE='t_3dd7b323438943c687bd4e13a0e181a1' +FEATURES_TABLE='t_23f41723e0eb480088cbb1c8f890a38c' +PLATFORMS_TABLE='t_83f598dc74254de68216a7c7735caffb' +SCHEDULINGS_TABLE='t_b5e541d4e73d4301ba736c427bd667c5' \ No newline at end of file From da6b5e3d96dd6d95a121f4c72fb8aed3e53208dd Mon Sep 17 00:00:00 2001 From: ygnuss Date: Tue, 16 Mar 2021 09:47:22 +0100 Subject: [PATCH 07/12] Added dumper script and moved all init scripts to init folder --- init/dump-tables.sh | 132 ++++++++++++++++++++ run-listeners.sh => init/run-listeners.sh | 0 stop-listeners.sh => init/stop-listeners.sh | 0 tb_tables.config => init/tb_tables.config | 0 4 files changed, 132 insertions(+) create mode 100755 init/dump-tables.sh rename run-listeners.sh => init/run-listeners.sh (100%) rename stop-listeners.sh => init/stop-listeners.sh (100%) rename tb_tables.config => init/tb_tables.config (100%) diff --git a/init/dump-tables.sh b/init/dump-tables.sh new file mode 100755 index 0000000..25eb02b --- /dev/null +++ b/init/dump-tables.sh @@ -0,0 +1,132 @@ +#!/bin/bash + +if [ "$#" -ne 1 ]; then + echo "Usage: $0 " + exit -1 +fi + +DUMP_PATH=$1 + +source tb_tables.config + +########### +### titles +########### + +echo "Dumping titles" +mysqldump --host=127.0.0.1 --port=3307 --user=tinybird --password=goo7eu9AeS3i --single-transaction --quick movida_preproduction titles > $DUMP_PATH/titles.sql + +echo "use $TB_DATABASE;" > $DUMP_PATH/titles-insert-tb.sql +cat $DUMP_PATH/titles.sql | grep "INSERT INTO" >> $DUMP_PATH/titles-insert-tb.sql +sed -i 's/INSERT INTO `titles` VALUES/INSERT INTO `t_8a192b9c7ece4572a5a2fc9858e26d5c` (`id`, `name`, `licensor_id`, `created_at`, `updated_at`, `company_id`, `series_id`, `external_id`, `poster_file_name`, `poster_content_type`, `poster_file_size`, `poster_updated_at`, `episode_number`, `dirty_episode_number`, `rights_count`, `blackouts_count`, `denied_rights_count`, `images_count`, `cover_image_id`, `title_type`, `metadata_updated_at`, `promoted_content_id`, `promoted_content_type`, `soft_destroyed`, `credits_count`, `translated_attributes`, `rules_count`, `discarded`, `episode_reference_id`, `brand_id`) VALUES/g' $DUMP_PATH/titles-insert-tb.sql + +echo "Truncate titles table" +echo "truncate $TB_DATABASE.$TITLES_TABLE" | ~/tinybird/bin/ch/ch-20.7.2.30/ClickHouse/build/programs/clickhouse-client -mn + +echo "Loading titles into CH" +cat $DUMP_PATH/titles-insert-tb.sql | ~/tinybird/bin/ch/ch-20.7.2.30/ClickHouse/build/programs/clickhouse-client -mn +echo "Titles loaded" + +read -p "Press enter to continue" + +########### +### assets 
+########### + +echo "Dumping assets" +mysqldump --host=127.0.0.1 --port=3307 --user=tinybird --password=goo7eu9AeS3i --single-transaction --quick movida_preproduction assets > $DUMP_PATH/assets.sql + +echo "use $TB_DATABASE;" > $DUMP_PATH/assets-insert-tb.sql +cat $DUMP_PATH/assets.sql | grep "INSERT INTO" >> $DUMP_PATH/assets-insert-tb.sql +sed -i 's/INSERT INTO `assets` VALUES/INSERT INTO `t_4c03fdeb4e3e4db784ead40b06ec8617` (`id`, `name`, `title_id`, `created_at`, `updated_at`, `description`, `runtime_in_milliseconds`, `metadata_updated_at`, `company_id`, `asset_type_enumeration_entry_id`, `external_id`) VALUES/g' $DUMP_PATH/assets-insert-tb.sql + +echo "Truncate assets table" +echo "truncate $TB_DATABASE.$ASSETS_TABLE" | ~/tinybird/bin/ch/ch-20.7.2.30/ClickHouse/build/programs/clickhouse-client -mn + +echo "Loading assets into CH" +cat $DUMP_PATH/assets-insert-tb.sql | ~/tinybird/bin/ch/ch-20.7.2.30/ClickHouse/build/programs/clickhouse-client -mn +echo "Assets loaded" + +read -p "Press enter to continue" + +####################### +### Collection-entries +####################### + +echo "Dumping collection-entries" +mysqldump --host=127.0.0.1 --port=3307 --user=tinybird --password=goo7eu9AeS3i --single-transaction --quick movida_preproduction collection_entries > $DUMP_PATH/collections.sql + +echo "use $TB_DATABASE;" > $DUMP_PATH/collections-insert-tb.sql +cat $DUMP_PATH/collections.sql | grep "INSERT INTO" >> $DUMP_PATH/collections-insert-tb.sql +sed -i 's/INSERT INTO `collection_entries` VALUES/INSERT INTO `t_3dd7b323438943c687bd4e13a0e181a1` (`collection_id`, `title_id`, `id`, `position`) VALUES/g' $DUMP_PATH/collections-insert-tb.sql + +echo "Truncate collections table" +echo "truncate $TB_DATABASE.$COLLECTIONS_TABLE" | ~/tinybird/bin/ch/ch-20.7.2.30/ClickHouse/build/programs/clickhouse-client -mn + +echo "Loading collection-entries into CH" +cat $DUMP_PATH/collections-insert-tb.sql | ~/tinybird/bin/ch/ch-20.7.2.30/ClickHouse/build/programs/clickhouse-client -mn +echo "Collection-entries loaded" + +read -p "Press enter to continue" + +############## +### Features +############## + +echo "Dumping features" +mysqldump --host=127.0.0.1 --port=3307 --user=tinybird --password=goo7eu9AeS3i --single-transaction --quick movida_preproduction features > $DUMP_PATH/features.sql + +echo "use $TB_DATABASE;" > $DUMP_PATH/features-insert-tb.sql +read -p "Press enter to continue use" +cat $DUMP_PATH/features.sql | grep "INSERT INTO" >> $DUMP_PATH/features-insert-tb.sql +read -p "Press enter to continue insert" +sed -i 's/INSERT INTO `features` VALUES/INSERT INTO `t_23f41723e0eb480088cbb1c8f890a38c` (`id`, `name`, `enabled`, `company_id`, `created_at`, `updated_at`) VALUES/g' $DUMP_PATH/features-insert-tb.sql +read -p "Press enter to continue sed" +echo "Truncate features table" +echo "truncate $TB_DATABASE.$FEATURES_TABLE" | ~/tinybird/bin/ch/ch-20.7.2.30/ClickHouse/build/programs/clickhouse-client -mn + +echo "Loading features into CH" +cat $DUMP_PATH/features-insert-tb.sql | ~/tinybird/bin/ch/ch-20.7.2.30/ClickHouse/build/programs/clickhouse-client -mn +echo "Features loaded" + +read -p "Press enter to continue" + +############## +### Platforms +############## + +echo "Dumping platforms" +mysqldump --host=127.0.0.1 --port=3307 --user=tinybird --password=goo7eu9AeS3i --single-transaction --quick movida_preproduction platforms > $DUMP_PATH/platforms.sql + +echo "use $TB_DATABASE;" > $DUMP_PATH/platforms-insert-tb.sql +cat $DUMP_PATH/platforms.sql | grep "INSERT INTO" >> 
$DUMP_PATH/platforms-insert-tb.sql +sed -i 's/INSERT INTO `platforms` VALUES/INSERT INTO `t_83f598dc74254de68216a7c7735caffb` (`id`, `company_id`, `name`, `created_at`, `updated_at`, `sequence_service_titles_url`, `_deprecated_sequence_template_name`, `_deprecated_owned`, `sequence_template_url`, `metadata_constant_name`, `outlet_id`, `automatic_publication_enabled`, `metadata_updated_at`, `granted_categories`, `external_id`, `timezone`) VALUES/g' $DUMP_PATH/platforms-insert-tb.sql + +echo "Truncate platforms table" +echo "truncate $TB_DATABASE.$PLATFORMS_TABLE" | ~/tinybird/bin/ch/ch-20.7.2.30/ClickHouse/build/programs/clickhouse-client -mn + +echo "Loading platforms into CH" +cat $DUMP_PATH/platforms-insert-tb.sql | ~/tinybird/bin/ch/ch-20.7.2.30/ClickHouse/build/programs/clickhouse-client -mn +echo "Platforms loaded" + +read -p "Press enter to continue" + +################# +### Schedulings +################# + +echo "Dumping schedulings" +mysqldump --host=127.0.0.1 --port=3307 --user=tinybird --password=goo7eu9AeS3i --single-transaction --quick movida_preproduction schedulings > $DUMP_PATH/schedulings.sql + +echo "use $TB_DATABASE;" > $DUMP_PATH/schedulings-insert-tb.sql +cat $DUMP_PATH/schedulings.sql | grep "INSERT INTO" >> $DUMP_PATH/schedulings-insert-tb.sql +sed -i 's/INSERT INTO `schedulings` VALUES/INSERT INTO `t_b5e541d4e73d4301ba736c427bd667c5` (`id`, `title_id`, `put_up`, `take_down`, `created_at`, `updated_at`, `cleared`, `platform_id`, `rule_id`, `workflow_offset`, `sequence_asset_url`, `sequence_asset_name`, `workflow_sent`, `status`, `asset_id`, `rule_asset_id`, `title_group_id`, `workflow_web_url`, `_deprecated_publication_status`, `published_at`, `_prev_put_up`, `_prev_take_down`, `_pending_simulation`, `workflow_template_url`, `original_draft_scheduling_id`, `playlist_id`, `updating_playlist`, `workflow_job_url`, `workflow_status`, `conflict_types`, `metadata_updated_at`, `company_id`, `cached_title_episode_number`, `metadata_status`, `publication_status`, `publication_status_updated_at`, `metadata_status_updated_at`, `external_id`, `disabled_at`, `scheduling_type`, `overridden_rule_attributes`, `update_in_progress`, `metadata_error_digest`) VALUES/g' $DUMP_PATH/schedulings-insert-tb.sql + +echo "Truncate schedulings table" +echo "truncate $TB_DATABASE.$SCHEDULINGS_TABLE" | ~/tinybird/bin/ch/ch-20.7.2.30/ClickHouse/build/programs/clickhouse-client -mn + +echo "Loading schedulings into CH" +cat $DUMP_PATH/schedulings-insert-tb.sql | ~/tinybird/bin/ch/ch-20.7.2.30/ClickHouse/build/programs/clickhouse-client -mn +echo "Schedulings loaded" + +echo "Process finished!" 
\ No newline at end of file diff --git a/run-listeners.sh b/init/run-listeners.sh similarity index 100% rename from run-listeners.sh rename to init/run-listeners.sh diff --git a/stop-listeners.sh b/init/stop-listeners.sh similarity index 100% rename from stop-listeners.sh rename to init/stop-listeners.sh diff --git a/tb_tables.config b/init/tb_tables.config similarity index 100% rename from tb_tables.config rename to init/tb_tables.config From 922e8768afbd2e54e967a5ed281fc514efbc25e6 Mon Sep 17 00:00:00 2001 From: ygnuss Date: Tue, 16 Mar 2021 09:48:04 +0100 Subject: [PATCH 08/12] Updated gitignore to ignore out log files and binlog checkpoint files --- .gitignore | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/.gitignore b/.gitignore index eb18180..c44086a 100644 --- a/.gitignore +++ b/.gitignore @@ -43,3 +43,7 @@ _build # Pyenv .python-version + +# Tinibird +bl-* +out-* \ No newline at end of file From cbda47621e39f4a2809681547f18ae0268493a79 Mon Sep 17 00:00:00 2001 From: ygnuss Date: Tue, 16 Mar 2021 09:49:05 +0100 Subject: [PATCH 09/12] Fix to handle multivaluated keys in asset table. This needs to be fixed to be generic. I've added this just to continue with the PoC --- clickhouse_mysql/writer/chwriter.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/clickhouse_mysql/writer/chwriter.py b/clickhouse_mysql/writer/chwriter.py index fbe9613..ab29ec4 100644 --- a/clickhouse_mysql/writer/chwriter.py +++ b/clickhouse_mysql/writer/chwriter.py @@ -253,7 +253,12 @@ def update(self, event_or_events): continue # for event event_converted = self.convert(event) - pk = event_converted.pymysqlreplication_event.primary_key + pk = [event_converted.pymysqlreplication_event.primary_key] + if event_converted.table == 'assets': + pk.append('name') + pk.append('title_id') + pk.append('company_id') + pk.append('asset_type_enumeration_entry_id') for row in event_converted.pymysqlreplication_event.rows: for key in row['after_values'].keys(): # we need to convert Decimal value to str value for suitable for table structure @@ -284,10 +289,10 @@ def update(self, event_or_events): sql = 'ALTER TABLE `{0}`.`{1}` UPDATE {2}, `tb_upd`={3} where {4}'.format( schema, table, - ', '.join(filter(None, map(lambda column, value: "" if column == pk or value is None else self.get_data_format(column, value), row['after_values'].keys(), row['after_values'].values()))), + ', '.join(filter(None, map(lambda column, value: "" if column in pk or value is None else self.get_data_format(column, value), row['after_values'].keys(), row['after_values'].values()))), "'%s'" % datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), ' and '.join(filter(None, map( - lambda column, value: "" if column != pk or value is None else self.get_data_format(column, value), + lambda column, value: "" if column not in pk or value is None else self.get_data_format(column, value), row['before_values'].keys(), row['before_values'].values()))) ) From 7f78d394b2dae563b94954fff2bba6e4883ce4b1 Mon Sep 17 00:00:00 2001 From: ygnuss Date: Tue, 16 Mar 2021 09:51:45 +0100 Subject: [PATCH 10/12] Added first-processing script to initialize database without loosing data --- init/first-processing.sh | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) create mode 100755 init/first-processing.sh diff --git a/init/first-processing.sh b/init/first-processing.sh new file mode 100755 index 0000000..7daa44c --- /dev/null +++ b/init/first-processing.sh @@ -0,0 +1,18 @@ +#!/bin/bash + +if [ "$#" -ne 1 ]; then + echo "Usage: $0 " + 
exit -1 +fi + +echo "Generate binlog timelog" +./run-listener.sh +./stop-listeners.sh + +echo "Generating dumps and loading data ..." +./dump-tables.sh $1 + +echo "Starting listeners" +./run-listener.sh + +echo "Done!" \ No newline at end of file From e90ec12113e6b456f22fe4048dd06dcf40bda7ea Mon Sep 17 00:00:00 2001 From: ygnuss Date: Mon, 12 Apr 2021 10:57:56 +0200 Subject: [PATCH 11/12] fix: Updated run script to be more flexible and support running just one process or all --- init/run-listeners.sh | 105 ++++++++++++++++++++++++++++++++++++------ 1 file changed, 92 insertions(+), 13 deletions(-) diff --git a/init/run-listeners.sh b/init/run-listeners.sh index 60a5996..21a7502 100755 --- a/init/run-listeners.sh +++ b/init/run-listeners.sh @@ -24,28 +24,107 @@ source tb_tables.config # $7 --> Binlog position file # ############################################################# -run_listener() { +function run_listener() { (clickhouse-mysql --src-server-id=$5 --src-wait --src-resume --binlog-position-file $7 --nice-pause=1 --src-host=$SOURCE_HOST --src-port=$SOURCE_PORT --src-user=$SOURCE_USER --src-password=$SOURCE_PASSWD --src-schemas=$1 --src-tables=$2 --dst-host=$DESTINATION_HOST --dst-schema=$3 --dst-table=$4 --log-level=$LOG_LEVEL --pump-data 2>> $6)& } -run_listener "movida_preproduction" "schedulings" "$TB_DATABASE" "$SCHEDULINGS_TABLE" "91" "out-schedulings.log" "bl-pos-schedulings" -echo $! > $PID_LOG_FILE +function run_schedulings() { + if [ $binlog == "true" ]; then + rm "bl-pos-collections" + fi -run_listener "movida_preproduction" "platforms" "$TB_DATABASE" "$PLATFORMS_TABLE" "92" "out-platforms.log" "bl-pos-platforms" -echo $! >> $PID_LOG_FILE + run_listener "movida_preproduction" "schedulings" "$TB_DATABASE" "$SCHEDULINGS_TABLE" "91" "out-schedulings.log" "bl-pos-schedulings" + echo $! > $PID_LOG_FILE -run_listener "movida_preproduction" "titles" "$TB_DATABASE" "$TITLES_TABLE" "93" "out-titles.log" "bl-pos-titles" -echo $! >> $PID_LOG_FILE +} + +function run_platforms() { + if [ $binlog == "true" ]; then + rm "bl-pos-collections" + fi + + run_listener "movida_preproduction" "platforms" "$TB_DATABASE" "$PLATFORMS_TABLE" "92" "out-platforms.log" "bl-pos-platforms" + echo $! >> $PID_LOG_FILE + +} + +function run_titles() { + if [ $binlog == "true" ]; then + rm "bl-pos-collections" + fi + + run_listener "movida_preproduction" "titles" "$TB_DATABASE" "$TITLES_TABLE" "93" "out-titles.log" "bl-pos-titles" + echo $! >> $PID_LOG_FILE +} + +function run_assets() { + if [ $binlog == "true" ]; then + rm "bl-pos-collections" + fi -run_listener "movida_preproduction" "assets" "$TB_DATABASE" "$ASSETS_TABLE" "94" "out-assets.log" "bl-pos-assets" -echo $! >> $PID_LOG_FILE + run_listener "movida_preproduction" "assets" "$TB_DATABASE" "$ASSETS_TABLE" "94" "out-assets.log" "bl-pos-assets" + echo $! >> $PID_LOG_FILE +} + +function run_features() { + if [ $binlog == "true" ]; then + rm "bl-pos-collections" + fi + + run_listener "movida_preproduction" "features" "$TB_DATABASE" "$FEATURES_TABLE" "95" "out-features.log" "bl-pos-features" + echo $! >> $PID_LOG_FILE +} + +function run_collections() { + if [ $binlog == "true" ]; then + rm "bl-pos-collections" + fi + + run_listener "movida_preproduction" "collection_entries" "$TB_DATABASE" "$COLLECTIONS_TABLE" "96" "out-collections.log" "bl-pos-collections" + echo $! >> $PID_LOG_FILE +} + +function usage { + echo "usage: $0 -d datasource [-b clean_binlog]" + echo " -d datasource datasource to syn. Use all for synchronizing all available datasources." 
+ echo " - all" + echo " - schedulings" + echo " - platforms" + echo " - titles" + echo " - assets" + echo " - features" + echo " - collections" + echo " -b clean_binlog clean binlog before running (true | false) False by default" + exit -1 +} -run_listener "movida_preproduction" "features" "$TB_DATABASE" "$FEATURES_TABLE" "95" "out-features.log" "bl-pos-features" -echo $! >> $PID_LOG_FILE +datasource="NONE" +while getopts d:b: flag +do + case "${flag}" in + d) datasource=${OPTARG};; + b) binlog=${OPTARG};; + esac +done -run_listener "movida_preproduction" "collection_entries" "$TB_DATABASE" "$COLLECTIONS_TABLE" "96" "out-collections.log" "bl-pos-collections" -echo $! >> $PID_LOG_FILE +case "${datasource}" in + NONE) usage;; + all) run_schedulings binlog + run_platforms binlog + run_titles binlog + run_assets binlog + run_features binlog + run_collections binlog + ;; + schedulings) run_schedulings binlog;; + platforms) run_platforms binlog;; + titles) run_titles binlog;; + assets) run_assets binlog;; + features) run_features binlog;; + collections) run_collections binlog;; + *) usage;; +esac echo "PID processes in $PID_LOG_FILE" \ No newline at end of file From a9c0cfdadbe07107cbcf22743fd81a1e2c414be8 Mon Sep 17 00:00:00 2001 From: ygnuss Date: Thu, 20 May 2021 13:30:01 +0200 Subject: [PATCH 12/12] Changed update to include in alter table command just those columns which actually change --- clickhouse_mysql/writer/chwriter.py | 18 ++++- requirements.txt | 112 ++-------------------------- 2 files changed, 21 insertions(+), 109 deletions(-) diff --git a/clickhouse_mysql/writer/chwriter.py b/clickhouse_mysql/writer/chwriter.py index ab29ec4..6cca8ef 100644 --- a/clickhouse_mysql/writer/chwriter.py +++ b/clickhouse_mysql/writer/chwriter.py @@ -283,19 +283,33 @@ def update(self, event_or_events): self.dst_table)) # and UPDATE converted rows - + # improve performance updating just those fields which have actually changed + updated_values = dict(set(row['after_values'].items()).difference(set(row['before_values'].items()))) + sql = '' try: + # sql = 'ALTER TABLE `{0}`.`{1}` UPDATE {2}, `tb_upd`={3} where {4}'.format( + # schema, + # table, + # ', '.join(filter(None, map(lambda column, value: "" if column in pk or value is None else self.get_data_format(column, value), row['after_values'].keys(), row['after_values'].values()))), + # "'%s'" % datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + # ' and '.join(filter(None, map( + # lambda column, value: "" if column not in pk or value is None else self.get_data_format(column, value), + # row['before_values'].keys(), row['before_values'].values()))) + # ) + sql = 'ALTER TABLE `{0}`.`{1}` UPDATE {2}, `tb_upd`={3} where {4}'.format( schema, table, - ', '.join(filter(None, map(lambda column, value: "" if column in pk or value is None else self.get_data_format(column, value), row['after_values'].keys(), row['after_values'].values()))), + ', '.join(filter(None, map(lambda column, value: "" if column in pk or value is None else self.get_data_format(column, value), updated_values.keys(), updated_values.values()))), "'%s'" % datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), ' and '.join(filter(None, map( lambda column, value: "" if column not in pk or value is None else self.get_data_format(column, value), row['before_values'].keys(), row['before_values'].values()))) ) + logging.debug("SQL UPDATE: \n\n " + sql + "\n\n") + self.client.execute(sql) except Exception as ex: logging.critical('QUERY FAILED') diff --git a/requirements.txt 
b/requirements.txt index bce8d28..da4173a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,108 +1,6 @@ -appdirs==1.4.4 -apturl==0.5.2 -astroid==2.4.2 -basictracer==3.1.0 -blinker==1.4 -Brlapi==0.7.0 -cachetools==4.0.0 -certifi==2020.12.5 -chardet==3.0.4 -chrome-gnome-shell==0.0.0 -Click==7.0 -clickhouse-driver==0.0.10 -clickhouse-toolset @ file:///tmp/clickhouse_toolset-0.9.dev0-cp38-cp38-linux_x86_64.whl -colorama==0.4.3 -command-not-found==0.3 -configobj==5.0.6 -crcmod==1.7 -cryptography==3.0 -cupshelpers==1.0 -datasketch==1.2.10 -dbus-python==1.2.16 -defer==1.0.6 -distlib==0.3.1 -distro==1.5.0 -distro-info===0.23ubuntu1 -dlib==19.16.0 -filelock==3.0.12 -httplib2==0.18.1 -humanfriendly==8.2 -idna==2.6 -importlib-metadata==1.6.0 -isort==5.7.0 -jeepney==0.4.3 -keyring==21.3.0 -language-selector==0.1 -launchpadlib==1.10.13 -lazr.restfulclient==0.14.2 -lazr.uri==1.0.5 -lazy-object-proxy==1.4.3 -louis==3.14.0 -macaroonbakery==1.3.1 -Markdown==3.2.1 -mccabe==0.6.1 -more-itertools==4.2.0 -msal==1.5.0 -netifaces==0.10.4 -numpy==1.15.0 -oauthlib==3.1.0 -olefile==0.46 -opencv-python==4.4.0.46 -opentracing==2.0.0 -passlib==1.7.1 -pexpect==4.6.0 -Pillow==7.2.0 -powerline-status==2.8.1 -protobuf==3.12.3 -psutil==5.6.3 -psycopg2-binary==2.8.5 -pycairo==1.16.2 -pycups==2.0.1 -pycurl==7.43.0.6 -Pygments==2.3.1 -PyGObject==3.38.0 -PyJWT==1.6.4 -pylint==2.6.0 -pymacaroons==0.13.0 +clickhouse-driver==0.2.0 +mysql-replication==0.23 +mysqlclient==2.0.3 PyMySQL==1.0.2 -PyNaCl==1.4.0 -pyRFC3339==1.1 -python-apt==2.1.3+ubuntu1.3 -python-dateutil==2.8.1 -python-debian==0.1.37 -pytz==2020.1 -pyxdg==0.26 -PyYAML==5.3.1 -rangehttpserver==1.2.0 -redis==3.2.1 -reportlab==3.5.47 -requests==2.18.4 -requests-toolbelt==0.9.1 -requests-unixsocket==0.2.0 -screen-resolution-extra==0.0.0 -SecretStorage==3.1.2 -simplejson==3.17.0 -six==1.15.0 -streaming-form-data==1.1.0 -systemd-python==234 -tabulate==0.8.3 -terminator==1.92 --e git+git@gitlab.com:tinybird/analytics.git@0d13783b7e38c0decc97ac06901e8ce7b804221e#egg=tinybird -tinybird-cli==1.0.0b12 -TLPUI==1.3.1.post3 -toml==0.10.2 -toposort==1.5 -tornado==5.1.1 -tornado-opentracing==1.0.1 -torngithub==0.2.0 -ubuntu-advantage-tools==24.4 -ubuntu-drivers-common==0.0.0 -ufw==0.36 -unattended-upgrades==0.1 -urllib3==1.22 -vboxapi==1.0 -virtualenv==20.0.29+ds -wadllib==1.3.4 -wrapt==1.12.1 -xkit==0.0.0 -zipp==1.0.0 +pytz==2021.1 +tzlocal==2.1
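
For reference, the mutation that the final commit emits for an UpdateRowsEvent is easiest to see in isolation. The sketch below is a minimal, self-contained illustration of the same approach taken by chwriter.update(): only non-key columns whose value actually changed go into the SET list, the primary key from the row's before-image drives the WHERE clause, and a `tb_upd` audit timestamp is appended. The helper names (build_update_sql, quote_value) are hypothetical and not part of the codebase; the quoting here is deliberately simplified.

import datetime


def quote_value(value):
    """Render a Python value as a ClickHouse literal (simplified)."""
    if value is None:
        return 'NULL'
    if isinstance(value, (int, float)):
        return str(value)
    # str, datetime, Decimal: quote and escape embedded single quotes
    return "'%s'" % str(value).replace("'", "\\'")


def build_update_sql(schema, table, primary_key, before_values, after_values):
    """Build one ALTER TABLE ... UPDATE mutation for a single updated row."""
    # update only the non-key columns whose value actually changed
    assignments = [
        '`%s`=%s' % (column, quote_value(value))
        for column, value in after_values.items()
        if column not in primary_key and value != before_values.get(column)
    ]
    # audit column recording when the replicated update was applied
    assignments.append("`tb_upd`='%s'" % datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
    # the row to mutate is identified by its primary key from the before-image
    where_clause = ' AND '.join(
        '`%s`=%s' % (column, quote_value(before_values[column])) for column in primary_key
    )
    return 'ALTER TABLE `%s`.`%s` UPDATE %s WHERE %s' % (
        schema, table, ', '.join(assignments), where_clause)


if __name__ == '__main__':
    print(build_update_sql(
        schema='test',
        table='animals',
        primary_key=['id'],
        before_values={'id': 1, 'name': 'oso', 'position': 2},
        after_values={'id': 1, 'name': 'pajaroTO', 'position': 1},
    ))
    # ALTER TABLE `test`.`animals` UPDATE `name`='pajaroTO', `position`=1,
    #   `tb_upd`='2021-05-20 13:30:01' WHERE `id`=1

Run against the sample row from the commented-out query in chwriter.py, this yields an UPDATE that touches only `name` and `position` plus the audit column, keyed on `id` — the same shape of statement the writer passes to clickhouse-driver's client.execute().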