diff --git a/main.py b/main.py index 9006574..77d6190 100755 --- a/main.py +++ b/main.py @@ -6,6 +6,7 @@ from src.daemon import Daemon import sys +import multiprocessing as mp if sys.version_info[0] < 3: @@ -17,6 +18,7 @@ class Main(Daemon): config = None def __init__(self): + mp.set_start_method('forkserver') self.config = CLIOpts.config() super().__init__(pidfile=self.config.pid_file()) diff --git a/run.sh b/run.sh index fd8218e..0162783 100755 --- a/run.sh +++ b/run.sh @@ -1,13 +1,22 @@ #!/bin/bash python3 main.py \ - --src-resume --src-wait \ - --src-host=127.0.0.1 --src-user=reader --src-password=qwerty \ - --dst-host=192.168.74.251 \ - --dst-db=db --dst-table=datatypes \ - --mempool --mempool-max-events-num=3 --mempool-max-flush-interval=30 \ - --csvpool --csvpool-file-path-prefix=qwe \ - --csv-column-default-value date_1=2000-01-01 datetime_1=2000-01-01\ 01:02:03 time_1=2001-01-01\ 01:02:03 timestamp_1=2002-01-01\ 01:02:03 + --src-resume \ + --src-wait \ + --src-host=127.0.0.1 \ + --src-user=reader \ + --src-password=qwerty \ + --dst-host=192.168.74.251 \ + --csvpool \ + --csvpool-file-path-prefix=qwe_ \ + --csv-column-default-value date_1=2000-01-01 datetime_1=2000-01-01\ 01:02:03 time_1=2001-01-01\ 01:02:03 timestamp_1=2002-01-01\ 01:02:03 \ + --mempool-max-flush-interval=600 \ + --mempool-max-events-num=900000 -# --dst-file=dst.csv +# --mempool +# --mempool-max-events-num=3 +# --mempool-max-flush-interval=30 +# --dst-file=dst.csv +# --dst-schema=db +# --dst-table=datatypes # --csvpool-keep-files diff --git a/src/cliopts.py b/src/cliopts.py index fc78332..9d36566 100644 --- a/src/cliopts.py +++ b/src/cliopts.py @@ -52,7 +52,7 @@ def config(): argparser.add_argument( '--dry', action='store_true', - help='Dry mode - do not do anything that can harm.' + help='Dry mode - do not do anything that can harm. ' 'Useful for debugging.' ) argparser.add_argument( @@ -74,30 +74,30 @@ def config(): argparser.add_argument( '--mempool-max-events-num', type=int, - default=1000, - help='max events num to pool before batch write' + default=100000, + help='Max events number to pool - triggering pool flush' ) argparser.add_argument( '--mempool-max-flush-interval', type=int, default=60, - help='max seconds num between flushes' + help='Max seconds number between pool flushes' ) argparser.add_argument( '--csvpool', action='store_true', - help='Cache data in csv files.' + help='Cache data in CSV pool files on disk. Requires memory pooling, thus enables --mempool even if it is not explicitly specified' ) argparser.add_argument( '--csvpool-file-path-prefix', type=str, - default='/tmp/csvpool', - help='file path prefix to CSV pool files' + default='/tmp/csvpool_', + help='File path prefix to CSV pool files' ) argparser.add_argument( '--csvpool-keep-files', action='store_true', - help='Keep pool csv files.' + help='Keep CSV pool files. Useful for debugging' ) argparser.add_argument( @@ -156,7 +156,7 @@ def config(): '--src-file', type=str, default=None, - help='Source file tp read data from' + help='Source file to read data from' ) argparser.add_argument( @@ -190,10 +190,10 @@ def config(): help='Password to be used when writing to dst' ) argparser.add_argument( - '--dst-db', + '--dst-schema', type=str, default=None, - help='Database to be used when writing to dst' + help='Database/schema to be used when writing to dst' ) argparser.add_argument( '--dst-table', @@ -220,7 +220,7 @@ def config(): 'dry': args.dry, 'daemon': args.daemon, 'pid_file': args.pid_file, - 'mempool': args.mempool, + 'mempool': args.mempool or args.csvpool, # csvpool assumes mempool to be enabled 'mempool-max-events-num': args.mempool_max_events_num, 'mempool-max-flush-interval': args.mempool_max_flush_interval, 'csvpool': args.csvpool, @@ -262,7 +262,7 @@ def config(): 'user': args.dst_user, 'password': args.dst_password, }, - 'dst_db': args.dst_db, + 'dst_schema': args.dst_schema, 'dst_table': args.dst_table, }, 'file': { @@ -270,7 +270,7 @@ def config(): 'csv_file_path_prefix': args.csvpool_file_path_prefix, 'csv_file_path_suffix_parts': [], 'csv_keep_file': args.csvpool_keep_files, - 'dst_db': args.dst_db, + 'dst_schema': args.dst_schema, 'dst_table': args.dst_table, }, }, diff --git a/src/config.py b/src/config.py index 2c8b487..06e19c9 100644 --- a/src/config.py +++ b/src/config.py @@ -8,8 +8,11 @@ from .writer.csvwriter import CSVWriter from .writer.chcsvwriter import CHCSVWriter from .writer.poolwriter import PoolWriter +from .writer.processwriter import ProcessWriter +from .objectbuilder import ObjectBuilder from .converter.csvwriteconverter import CSVWriteConverter +from .converter.chwriteconverter import CHWriteConverter class Config(object): @@ -40,30 +43,47 @@ def reader(self): else: return MySQLReader(**self.config['reader-config']['mysql']) - def writer_class(self): + def converter_builder(self): + if not self.config['converter-config']['csv']['column_default_value']: + # no default values for CSV columns provided + return None + return ObjectBuilder( + instance=CSVWriteConverter( + defaults=self.config['converter-config']['csv']['column_default_value'] + )) + + def writer_builder(self): if self.config['app-config']['csvpool']: - return CSVWriter, { - **self.config['writer-config']['file'], - 'next': CHCSVWriter(**self.config['writer-config']['clickhouse']['connection_settings']), - 'converter': CSVWriteConverter(defaults=self.config['converter-config']['csv']['column_default_value']) if self.config['converter-config']['csv']['column_default_value'] else None, - } + return ObjectBuilder(class_name=ProcessWriter, constructor_params={ + 'next_writer_builder': ObjectBuilder(class_name=CSVWriter, constructor_params={ + **self.config['writer-config']['file'], + 'next_writer_builder': ObjectBuilder( + class_name=CHCSVWriter, + constructor_params=self.config['writer-config']['clickhouse']['connection_settings'] + ), + 'converter_builder': self.converter_builder(), + }) + }) elif self.config['writer-config']['file']['csv_file_path']: - return CSVWriter, self.config['writer-config']['file'] + return ObjectBuilder(class_name=CSVWriter, constructor_params={ + **self.config['writer-config']['file'], + 'converter_builder': self.converter_builder(), + }) else: - return CHWriter, self.config['writer-config']['clickhouse'] + return ObjectBuilder(class_name=CHWriter, constructor_params={ + **self.config['writer-config']['clickhouse'], + 'converter_builder': ObjectBuilder(instance=CHWriteConverter()), + }) def writer(self): - writer_class, writer_params = self.writer_class() - if self.config['app-config']['mempool']: return PoolWriter( - writer_class=writer_class, - writer_params=writer_params, + writer_builder=self.writer_builder(), max_pool_size=self.config['app-config']['mempool-max-events-num'], max_flush_interval=self.config['app-config']['mempool-max-flush-interval'], ) else: - return writer_class(**writer_params) + return self.writer_builder().get() diff --git a/src/converter/chwriteconverter.py b/src/converter/chwriteconverter.py index 4fb6df5..0499077 100644 --- a/src/converter/chwriteconverter.py +++ b/src/converter/chwriteconverter.py @@ -28,26 +28,27 @@ class CHWriteConverter(Converter): def convert(self, event): columns_to_delete = [] - for column_name in event.row: -# print(column_name, row['values'][column_name], type(row['values'][column_name])) - if self.delete_empty_columns and (event.row[column_name] is None): -# print("Skip None value for column", column_name) - columns_to_delete.append(column_name) + for column in event.row: + if (event.row[column] is None) and self.delete_empty_columns: + # include empty column to the list of to be deleted columns + columns_to_delete.append(column) + # move to next column continue for t in self.types_to_convert: - if isinstance(event.row[column_name], t): -# print("Converting column", column_name, "of type", type(event.row[column_name]), -# event.row[column_name]) - event.row[column_name] = str(event.row[column_name]) -# print("res", event.row[column_name]) + if isinstance(event.row[column], t): +# print("Converting column", column, "of type", type(event.row[column]), +# event.row[column]) + event.row[column] = str(event.row[column]) +# print("res", event.row[column]) break else: -# print("Using asis column", column_name, "of type", type(event.row[column_name])) +# print("Using asis column", column, "of type", type(event.row[column])) pass - for column_to_delete in columns_to_delete: - event.row.pop(column_to_delete) + # delete columns according to the list + for column in columns_to_delete: + event.row.pop(column) return event diff --git a/src/converter/csvwriteconverter.py b/src/converter/csvwriteconverter.py index 41d07d2..c820e14 100644 --- a/src/converter/csvwriteconverter.py +++ b/src/converter/csvwriteconverter.py @@ -6,17 +6,25 @@ class CSVWriteConverter(Converter): + # default values for columns - dict defaults = None - def __init__(self, defaults={}): + def __init__(self, defaults=None): self.defaults = defaults - def convert(self, event): + # no defaults - nothing to convert if not self.defaults: return event + # defaults are empty - nothing to convert + if len(self.defaults) < 1: + return event + + # have defaults for column in event.row: - if column in self.defaults and event.row[column] is None: + # replace None column with default value + if event.row[column] is None and column in self.defaults: event.row[column] = self.defaults[column] + return event diff --git a/src/objectbuilder.py b/src/objectbuilder.py new file mode 100644 index 0000000..08e6762 --- /dev/null +++ b/src/objectbuilder.py @@ -0,0 +1,31 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + + +class ObjectBuilder(object): + + class_name = None + constructor_params = None + instance = None + + def __init__(self, class_name=None, constructor_params=None, instance=None): + self.class_name = class_name + self.constructor_params = constructor_params + self.instance = instance + + def param(self, name, value): + if not self.constructor_params: + self.constructor_params = {} + self.constructor_params[name] = value + + def get(self): + if not self.class_name: + # no class name - return instance, it may be None + return self.instance + + # have class name + + if self.constructor_params: + return self.class_name(**self.constructor_params) + else: + return self.class_name() diff --git a/src/pool/bbpool.py b/src/pool/bbpool.py index 29d74f5..76a8b94 100644 --- a/src/pool/bbpool.py +++ b/src/pool/bbpool.py @@ -4,12 +4,13 @@ import time from .pool import Pool +from ..objectbuilder import ObjectBuilder -# Buckets Belts' Index Builder -class BBIndexBuilder(object): +# Buckets Belts' Index Generator +class BBIndexGenerator(object): - def build(self, item): + def generate(self, item): # build key of the belt on which to place item return str(item.schema) + '.' + str(item.table) @@ -33,17 +34,15 @@ class BBPool(Pool): def __init__( self, - writer_class=None, - writer_params={}, - key_builder_class=None, + writer_builder=None, + key_builder=None, max_bucket_size=10000, max_belt_size=1, max_interval_between_rotations=60, ): super().__init__( - writer_class=writer_class, - writer_params=writer_params, - key_builder_class=BBIndexBuilder, + writer_builder=writer_builder, + key_builder=ObjectBuilder(class_name=BBIndexGenerator), max_bucket_size=max_bucket_size, max_belt_size=max_belt_size, max_interval_between_rotations=max_interval_between_rotations, @@ -56,7 +55,7 @@ def create_belt(self, belt_index): def insert(self, item): # which belt we'll insert item? - belt_index = self.key_builder.build(item) + belt_index = self.key_generator.generate(item) # register belt if not yet if belt_index not in self.belts: @@ -90,17 +89,17 @@ def flush(self, key=None): def rotate_belt(self, belt_index, flush=False): now = int(time.time()) need_rotation = True if flush else False - rotate_by = "FLUSH" + rotate_reason = "FLUSH" if len(self.belts[belt_index][0]) >= self.max_bucket_size: # 0-index bucket is full need_rotation = True - rotate_by = "SIZE" + rotate_reason = "SIZE" elif now >= self.belts_rotated_at[belt_index] + self.max_interval_between_rotations: # time interval reached need_rotation = True - rotate_by = "TIME" + rotate_reason = "TIME" if not need_rotation: # belt not rotated @@ -122,11 +121,11 @@ def rotate_belt(self, belt_index, flush=False): buckets_num = len(self.belts[belt_index]) last_bucket_size = len(self.belts[belt_index][buckets_num-1]) - print(now, self.buckets_count, 'rotating belt', belt_index, 'rotate by', rotate_by, 'buckets_num', buckets_num, 'last bucket size', last_bucket_size, 'belts:', len(self.belts)) + print('rotating belt. now:', now, 'bucket number:', self.buckets_count, 'index:', belt_index, 'reason:', rotate_reason, 'buckets on belt:', buckets_num, 'last bucket size:', last_bucket_size, 'belts count:', len(self.belts)) # time to flush data for specified key - self.writer_params['csv_file_path_suffix_parts'] = [str(now), str(self.buckets_count)] - writer = self.writer_class(**self.writer_params) + self.writer_builder.param('csv_file_path_suffix_parts', [str(now), str(self.buckets_count)]) + writer = self.writer_builder.get() writer.insert(self.belts[belt_index].pop()) writer.close() writer.push() diff --git a/src/pool/pool.py b/src/pool/pool.py index f62a0a2..9a0b402 100644 --- a/src/pool/pool.py +++ b/src/pool/pool.py @@ -4,30 +4,26 @@ class Pool(object): - writer_class = None - writer_params = None - - key_builder_class = None + writer_builder = None key_builder = None + key_generator = None + max_bucket_size = None max_belt_size = None max_interval_between_rotations = None def __init__( self, - writer_class=None, - writer_params={}, - key_builder_class=None, + writer_builder=None, + key_builder=None, max_bucket_size=10000, max_belt_size=1, max_interval_between_rotations=60, ): - self.writer_class = writer_class - self.writer_params = writer_params - - self.key_builder_class = key_builder_class - self.key_builder = self.key_builder_class() + self.writer_builder = writer_builder + self.key_builder = key_builder + self.key_generator = self.key_builder.get() self.max_bucket_size = max_bucket_size self.max_belt_size = max_belt_size diff --git a/src/pumper.py b/src/pumper.py index ef61736..03f5d31 100644 --- a/src/pumper.py +++ b/src/pumper.py @@ -1,6 +1,7 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- +import time class Pumper(object): diff --git a/src/reader/mysqlreader.py b/src/reader/mysqlreader.py index 9ac1dbe..88044db 100644 --- a/src/reader/mysqlreader.py +++ b/src/reader/mysqlreader.py @@ -88,7 +88,6 @@ def read(self): # blocking self.fire('ReaderIdleEvent') - time.sleep(1) except KeyboardInterrupt: pass diff --git a/src/writer/chcsvwriter.py b/src/writer/chcsvwriter.py index a386788..e350b2f 100644 --- a/src/writer/chcsvwriter.py +++ b/src/writer/chcsvwriter.py @@ -1,9 +1,12 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- +from .writer import Writer + import os +import time -class CHCSVWriter(object): +class CHCSVWriter(Writer): host = None port = None @@ -26,7 +29,11 @@ def insert(self, event_or_events=None): # }, # ] - for event in event_or_events: + events = self.listify(event_or_events) + if len(events) < 1: + return + + for event in events: sql = 'INSERT INTO `{0}`.`{1}` ({2}) FORMAT CSV'.format( event.schema, event.table, @@ -48,7 +55,7 @@ def insert(self, event_or_events=None): sql, ) - print('running:', bash) +# print('running:', bash) os.system(bash) pass diff --git a/src/writer/chwriter.py b/src/writer/chwriter.py index c331630..4625e6d 100644 --- a/src/writer/chwriter.py +++ b/src/writer/chwriter.py @@ -3,19 +3,26 @@ from clickhouse_driver.client import Client from .writer import Writer -from ..event.event import Event -from ..converter.chwriteconverter import CHWriteConverter class CHWriter(Writer): client = None - dst_db = None + dst_schema = None dst_table = None - def __init__(self, connection_settings, dst_db, dst_table): + def __init__( + self, + connection_settings, + dst_schema=None, + dst_table=None, + next_writer_builder=None, + converter_builder=None, + ): + super().__init__(next_writer_builder=next_writer_builder, converter_builder=converter_builder) + self.client = Client(**connection_settings) - self.dst_db = dst_db + self.dst_schema = dst_schema self.dst_table = dst_table def insert(self, event_or_events=None): @@ -28,29 +35,18 @@ def insert(self, event_or_events=None): # }, # ] - if event_or_events is None: - # nothing to insert at all + events = self.listify(event_or_events) + if len(events) < 1: return - elif isinstance(event_or_events, list): - if len(event_or_events) < 1: - # list is empty - nothing to insert - return - - else: - # event_or_events is instance of Event - event_or_events = [event_or_events] - - converter = CHWriteConverter() - values = [] - ev = None - for event in event_or_events: - ev = converter.convert(event) - values.append(ev.row) + event_converted = None + for event in events: + event_converted = self.convert(event) + values.append(event_converted.row) - schema = self.dst_db if self.dst_db else ev.schema - table = self.dst_table if self.dst_table else ev.table + schema = self.dst_schema if self.dst_schema else event_converted.schema + table = self.dst_table if self.dst_table else event_converted.table try: sql = 'INSERT INTO `{0}`.`{1}` ({2}) VALUES'.format( diff --git a/src/writer/csvwriter.py b/src/writer/csvwriter.py index f7cd044..6ea1f04 100644 --- a/src/writer/csvwriter.py +++ b/src/writer/csvwriter.py @@ -12,11 +12,10 @@ class CSVWriter(Writer): file = None path = None writer = None - dst_db = None + dst_schema = None dst_table = None fieldnames = None header_written = False - converter = None path_prefix = None path_suffix_parts = [] delete = False @@ -27,21 +26,21 @@ def __init__( csv_file_path_prefix=None, csv_file_path_suffix_parts=[], csv_keep_file=False, - dst_db=None, + dst_schema=None, dst_table=None, - next=None, - converter=None, + next_writer_builder=None, + converter_builder=None, ): + super().__init__(next_writer_builder=next_writer_builder, converter_builder=converter_builder) + self.path = csv_file_path self.path_prefix = csv_file_path_prefix self.path_suffix_parts = csv_file_path_suffix_parts - self.dst_db = dst_db + self.dst_schema = dst_schema self.dst_table = dst_table - self.next = next - self.converter = converter if self.path is None: - self.path = self.path_prefix + '_' + '_'.join(self.path_suffix_parts) + '.csv' + self.path = self.path_prefix + '_'.join(self.path_suffix_parts) + '.csv' self.delete = not csv_keep_file def __del__(self): @@ -69,41 +68,37 @@ def insert(self, event_or_events): # }, # ] - if event_or_events is None: - # nothing to insert at all + events = self.listify(event_or_events) + if len(events) < 1: return - elif isinstance(event_or_events, list): - if len(event_or_events) < 1: - # list is empty - nothing to insert - return - - else: - # event_or_events is instance of Event - event_or_events = [event_or_events] - if not self.opened(): self.open() if not self.writer: - self.fieldnames = sorted(event_or_events[0].row.keys()) + self.fieldnames = sorted(events[0].row.keys()) + if self.dst_schema is None: + self.dst_schema = events[0].schema + if self.dst_table is None: + self.dst_table = events[0].table + self.writer = csv.DictWriter(self.file, fieldnames=self.fieldnames) if not self.header_written: self.writer.writeheader() - for event in event_or_events: - self.writer.writerow(self.converter.convert(event).row if self.converter else event.row) + for event in events: + self.writer.writerow(self.convert(event).row) def push(self): - if not self.next: + if not self.next_writer_builder: return event = Event() - event.schema = self.dst_db + event.schema = self.dst_schema event.table = self.dst_table event.file = self.path event.fieldnames = self.fieldnames - self.next.insert([event]) + self.next_writer_builder.get().insert(event) def close(self): if self.opened(): diff --git a/src/writer/poolwriter.py b/src/writer/poolwriter.py index 3beaab4..84413bb 100644 --- a/src/writer/poolwriter.py +++ b/src/writer/poolwriter.py @@ -8,26 +8,22 @@ class PoolWriter(Writer): - writer_class = None - writer_params = None + writer_builder = None max_pool_size = None pool = None def __init__( self, - writer_class=None, - writer_params={}, + writer_builder=None, max_pool_size=10000, max_flush_interval=60 ): - self.writer_class = writer_class - self.writer_params = writer_params + self.writer_builder = writer_builder self.max_pool_size = max_pool_size self.max_flush_interval = max_flush_interval self.pool = BBPool( - writer_class=self.writer_class, - writer_params=self.writer_params, + writer_builder=self.writer_builder, max_bucket_size=self.max_pool_size, max_interval_between_rotations=self.max_flush_interval, ) diff --git a/src/writer/processwriter.py b/src/writer/processwriter.py new file mode 100644 index 0000000..93db354 --- /dev/null +++ b/src/writer/processwriter.py @@ -0,0 +1,59 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +from .writer import Writer +import multiprocessing as mp + + +class ProcessWriter(Writer): + + args = None + + def __init__(self, **kwargs): + next_writer_builder = kwargs.pop('next_writer_builder', None) + converter_builder = kwargs.pop('converter_builder', None) + super().__init__(next_writer_builder=next_writer_builder, converter_builder=converter_builder) + for arg in kwargs: + self.next_writer_builder.param(arg, kwargs[arg]) + + def opened(self): + pass + + def open(self): + pass + + def process(self, event_or_events=None): + writer = self.next_writer_builder.get() + writer.insert(event_or_events) + writer.close() + writer.push() + writer.destroy() + + def insert(self, event_or_events=None): + # event_or_events = [ + # event: { + # row: {'id': 3, 'a': 3} + # }, + # event: { + # row: {'id': 3, 'a': 3} + # }, + # ] + process = mp.Process(target=self.process, args=(event_or_events,)) + #print('Start Process') + process.start() + #print('Join Process') + #process.join() + #print('Done Process') + pass + + def flush(self): + pass + + def push(self): + pass + + def destroy(self): + pass + + def close(self): + pass diff --git a/src/writer/writer.py b/src/writer/writer.py index b6159c7..6fabd2f 100644 --- a/src/writer/writer.py +++ b/src/writer/writer.py @@ -4,7 +4,16 @@ class Writer(object): - next = None + next_writer_builder = None + converter_builder = None + + def __init__( + self, + next_writer_builder=None, + converter_builder=None + ): + self.next_writer_builder = next_writer_builder + self.converter_builder = converter_builder def opened(self): pass @@ -12,6 +21,28 @@ def opened(self): def open(self): pass + def listify(self, obj_or_list): + """Ensure list""" + + if obj_or_list is None: + # no value - return empty list + return [] + + elif isinstance(obj_or_list, list): + if len(obj_or_list) < 1: + # list is empty - nothing to do + return [] + else: + # list is good + return obj_or_list + + else: + # event_or_events is an object + return [obj_or_list] + + def convert(self, event): + return self.converter_builder.get().convert(event) if self.converter_builder else event + def insert(self, event_or_events=None): # event_or_events = [ # event: {