Skip to content

Commit ba4be81

Browse files
authored
Merge pull request #26 from sunsingerus/master
pooling writers
2 parents 84bc7a1 + 65836b7 commit ba4be81

File tree

14 files changed

+245
-18
lines changed

14 files changed

+245
-18
lines changed

run.sh

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
#!/bin/bash
2+
3+
python3 main.py \
4+
--src-resume --src-wait \
5+
--src-host=127.0.0.1 --src-user=reader --src-password=qwerty \
6+
--dst-host=192.168.74.251 \
7+
--dst-db=db --dst-table=datatypes \
8+
--mempool --mempool-max-events-num=3 --mempool-max-flush-interval=30 \
9+
--csvpool --csvpool-file-path-prefix=qwe \
10+
--csv-column-default-value date_1=2000-01-01 datetime_1=2000-01-01\ 01:02:03 time_1=2001-01-01\ 01:02:03 timestamp_1=2002-01-01\ 01:02:03
11+
12+
# --dst-file=dst.csv
13+
# --csvpool-keep-files

src/cliopts.py

Lines changed: 60 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,31 @@
77

88
class CLIOpts(object):
99

10+
@staticmethod
11+
def join(lists_to_join):
    """
    Merge several lists of 'column=value' strings into one dict.

    :param lists_to_join: list of lists of 'column=value' strings, like
        [['a=b', 'c=d'], ['e=f', 'z=x'], ]
        Anything that is not a list (for example None) produces None.
    :return: dict like {'a': 'b', 'c': 'd', 'e': 'f', 'z': 'x'},
        or None when there is nothing to return
    :raises ValueError: when an item contains no '=' separator
    """
    if not isinstance(lists_to_join, list):
        return None

    res = {}
    for lst in lists_to_join:
        # lst = ['a=b', 'c=d']
        for column_value_pair in lst:
            # column_value_pair = 'a=b'
            # Cut on the FIRST '=' only, so the value part may itself
            # contain '=' characters (ex.: 'expr=x=y' -> 'expr': 'x=y')
            column, separator, value = column_value_pair.partition('=')
            if not separator:
                raise ValueError(
                    "expected 'column=value', got %r" % (column_value_pair,)
                )
            # the same column specified twice - the last value wins
            res[column] = value

    # dict {
    #   'col1': 'value1',
    #   'col2': 'value2',
    # }
    # empty dict is reported as None
    return res or None
34+
1035
@staticmethod
1136
def config():
1237
"""
@@ -58,12 +83,22 @@ def config():
5883
default=60,
5984
help='max seconds num between flushes'
6085
)
86+
argparser.add_argument(
87+
'--csvpool',
88+
action='store_true',
89+
help='Cache data in csv files.'
90+
)
6191
argparser.add_argument(
6292
'--csvpool-file-path-prefix',
6393
type=str,
64-
default=None,
94+
default='/tmp/csvpool',
6595
help='file path prefix to CSV pool files'
6696
)
97+
argparser.add_argument(
98+
'--csvpool-keep-files',
99+
action='store_true',
100+
help='Keep pool csv files.'
101+
)
67102

68103
argparser.add_argument(
69104
'--src-server-id',
@@ -167,6 +202,15 @@ def config():
167202
help='Table to be used when writing to dst'
168203
)
169204

205+
argparser.add_argument(
206+
'--csv-column-default-value',
207+
type=str,
208+
nargs='*',
209+
action='append',
210+
default=None,
211+
help='Table to be used when writing to dst'
212+
)
213+
170214
args = argparser.parse_args()
171215

172216
# build options
@@ -179,7 +223,16 @@ def config():
179223
'mempool': args.mempool,
180224
'mempool-max-events-num': args.mempool_max_events_num,
181225
'mempool-max-flush-interval': args.mempool_max_flush_interval,
182-
'csvpool_file_path_prefix': args.csvpool_file_path_prefix,
226+
'csvpool': args.csvpool,
227+
},
228+
229+
'converter-config': {
230+
'clickhouse': {
231+
232+
},
233+
'csv': {
234+
'column_default_value': CLIOpts.join(args.csv_column_default_value),
235+
},
183236
},
184237

185238
'reader-config': {
@@ -214,6 +267,11 @@ def config():
214267
},
215268
'file': {
216269
'csv_file_path': args.dst_file,
270+
'csv_file_path_prefix': args.csvpool_file_path_prefix,
271+
'csv_file_path_suffix_parts': [],
272+
'csv_keep_file': args.csvpool_keep_files,
273+
'dst_db': args.dst_db,
274+
'dst_table': args.dst_table,
217275
},
218276
},
219277
})

src/config.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,14 @@
33

44
from .reader.mysqlreader import MySQLReader
55
from .reader.csvreader import CSVReader
6+
67
from .writer.chwriter import CHWriter
78
from .writer.csvwriter import CSVWriter
9+
from .writer.chcsvwriter import CHCSVWriter
810
from .writer.poolwriter import PoolWriter
911

12+
from .converter.csvwriteconverter import CSVWriteConverter
13+
1014

1115
class Config(object):
1216

@@ -37,8 +41,17 @@ def reader(self):
3741
return MySQLReader(**self.config['reader-config']['mysql'])
3842

3943
def writer_class(self):
40-
if self.config['writer-config']['file']['csv_file_path']:
44+
45+
if self.config['app-config']['csvpool']:
46+
return CSVWriter, {
47+
**self.config['writer-config']['file'],
48+
'next': CHCSVWriter(**self.config['writer-config']['clickhouse']['connection_settings']),
49+
'converter': CSVWriteConverter(defaults=self.config['converter-config']['csv']['column_default_value']) if self.config['converter-config']['csv']['column_default_value'] else None,
50+
}
51+
52+
elif self.config['writer-config']['file']['csv_file_path']:
4153
return CSVWriter, self.config['writer-config']['file']
54+
4255
else:
4356
return CHWriter, self.config['writer-config']['clickhouse']
4457

src/converter/chdatatypeconverter.py renamed to src/converter/chwriteconverter.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
import decimal
88

99

10-
class CHDataTypeConverter(Converter):
10+
class CHWriteConverter(Converter):
1111

1212
delete_empty_columns = False
1313

src/converter/csvemptyvalueconverter.py renamed to src/converter/csvreadconverter.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import ast
66

77

8-
class CSVEmptyValueConverter(Converter):
8+
class CSVReadConverter(Converter):
99

1010
def convert(self, event):
1111
for column in event.row:

src/converter/csvwriteconverter.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
#!/usr/bin/env python
2+
# -*- coding: utf-8 -*-
3+
4+
from .converter import Converter
5+
6+
7+
class CSVWriteConverter(Converter):
    """
    Converter which replaces None column values with configured defaults.
    """

    # dict {'column name': 'default value'} or None
    defaults = None

    def __init__(self, defaults=None):
        """
        :param defaults: dict {'column name': 'default value'} applied to
            columns whose value is None; None or empty dict disables
            the substitution
        """
        # A fresh dict per instance - a `{}` literal in the signature would be
        # one object shared by all instances created without arguments
        self.defaults = {} if defaults is None else defaults

    def convert(self, event):
        """
        Fill None-valued columns of event.row with default values, in place.

        :param event: object with a dict-like `row` attribute
        :return: the same event object
        """
        # nothing to substitute, or the event carries no row at all
        if not self.defaults or not event.row:
            return event

        for column in event.row:
            # only None values are replaced, other values are left untouched
            if column in self.defaults and event.row[column] is None:
                event.row[column] = self.defaults[column]

        return event

src/event/event.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,3 +12,8 @@ class Event(object):
1212

1313
# {'id':1, 'col1':1}
1414
row = None
15+
16+
file = None
17+
18+
# ['id', 'col1', 'col2']
19+
fieldnames = None

src/pool/bbpool.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ class BBPool(Pool):
2929
# 'key.2': UNIX TIMESTAMP
3030
}
3131

32+
buckets_count = 0
33+
3234
def __init__(
3335
self,
3436
writer_class=None,
@@ -115,14 +117,20 @@ def rotate_belt(self, belt_index, flush=False):
115117

116118
while len(self.belts[belt_index]) > buckets_num_left_on_belt:
117119
# too many buckets on the belt
120+
# time to rotate belt and flush the most-right-bucket
121+
self.buckets_count += 1
118122

119123
buckets_num = len(self.belts[belt_index])
120124
last_bucket_size = len(self.belts[belt_index][buckets_num-1])
121-
print(now, 'rotating belt', belt_index, 'rotate by', rotate_by, 'buckets_num', buckets_num, 'last bucket size', last_bucket_size, 'belts:', len(self.belts))
125+
print(now, self.buckets_count, 'rotating belt', belt_index, 'rotate by', rotate_by, 'buckets_num', buckets_num, 'last bucket size', last_bucket_size, 'belts:', len(self.belts))
122126

123127
# time to flush data for specified key
128+
self.writer_params['csv_file_path_suffix_parts'] = [str(now), str(self.buckets_count)]
124129
writer = self.writer_class(**self.writer_params)
125130
writer.insert(self.belts[belt_index].pop())
131+
writer.close()
132+
writer.push()
133+
writer.destroy()
126134
del writer
127135

128136
# belt rotated

src/reader/csvreader.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
from .reader import Reader
55
from ..event.event import Event
6-
from ..converter.csvemptyvalueconverter import CSVEmptyValueConverter
6+
from ..converter.csvreadconverter import CSVReadConverter
77
import csv
88
import os
99

@@ -17,8 +17,8 @@ class CSVReader(Reader):
1717
has_header = False
1818
reader = None
1919

20-
def __init__(self, csv_file_path, callbacks={}):
21-
super().__init__(callbacks=callbacks)
20+
def __init__(self, csv_file_path, converter=None, callbacks={}):
21+
super().__init__(converter=converter, callbacks=callbacks)
2222

2323
self.csv_file_path = csv_file_path
2424
self.csvfile = open(self.csv_file_path)
@@ -44,8 +44,7 @@ def read(self):
4444
self.fire('WriteRowsEvent', event=event)
4545
for row in self.reader:
4646
event.row = row
47-
converter = CSVEmptyValueConverter()
48-
self.fire('WriteRowsEvent.EachRow', event=converter.convert(event))
47+
self.fire('WriteRowsEvent.EachRow', event=self.converter.convert(event) if self.converter else event)
4948
except KeyboardInterrupt:
5049
pass
5150

src/reader/reader.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
class Reader(object):
66

7+
converter = None
8+
79
callbacks = {
810
# called on each WriteRowsEvent
911
'WriteRowsEvent': [],
@@ -15,7 +17,8 @@ class Reader(object):
1517
'ReaderIdleEvent': [],
1618
}
1719

18-
def __init__(self, callbacks={}):
20+
def __init__(self, converter=None, callbacks={}):
21+
self.converter = converter
1922
self.subscribe(callbacks)
2023

2124
def subscribe(self, callbacks):

src/writer/chcsvwriter.py

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
#!/usr/bin/env python
2+
# -*- coding: utf-8 -*-
3+
4+
import os
5+
6+
class CHCSVWriter(object):
    """
    Writer which loads CSV files into ClickHouse by piping them
    into the `clickhouse-client` command line tool.
    """

    # connection options for clickhouse-client, falsy values are not passed
    host = None
    port = None
    user = None
    password = None

    def __init__(self, host=None, port=None, user=None, password=None):
        """
        :param host: value for clickhouse-client --host
        :param port: value for clickhouse-client --port
        :param user: value for clickhouse-client --user
        :param password: value for clickhouse-client --password
        """
        self.host = host
        self.port = port
        self.user = user
        self.password = password

    def _build_command(self, event):
        """
        Build the shell pipeline which feeds event.file to clickhouse-client.

        :param event: object with schema, table, fieldnames and file attributes
        :return: string with the shell command
        """
        # local import keeps this class self-contained
        import shlex

        sql = 'INSERT INTO `{0}`.`{1}` ({2}) FORMAT CSV'.format(
            event.schema,
            event.table,
            ', '.join('`%s`' % column for column in event.fieldnames),
        )

        command = ['clickhouse-client']
        if self.host:
            command.append('--host=' + str(self.host))
        if self.port:
            command.append('--port=' + str(self.port))
        if self.user:
            command.append('--user=' + str(self.user))
        if self.password:
            command.append('--password=' + str(self.password))
        command.append('--query=' + sql)

        # Each word is quoted separately, so spaces, quotes and other shell
        # metacharacters inside the password, file path etc. can not break
        # the command or inject additional commands.
        # `tail -n +2` skips the first line of the file
        # (presumably the CSV header - verify against the CSV writer)
        return 'tail -n +2 {0} | {1}'.format(
            shlex.quote(str(event.file)),
            ' '.join(shlex.quote(word) for word in command),
        )

    def insert(self, event_or_events=None):
        """
        Run one clickhouse-client import per event.

        :param event_or_events: one event or an iterable of events, like
            [
                event: {
                    row: {'id': 3, 'a': 3}
                },
                event: {
                    row: {'id': 3, 'a': 3}
                },
            ]
            None means nothing to insert.
        """
        if event_or_events is None:
            return

        try:
            events = iter(event_or_events)
        except TypeError:
            # not iterable - treat it as one single event
            events = iter([event_or_events])

        for event in events:
            bash = self._build_command(event)

            # NOTE(review): the printed command includes the password in
            # clear text - consider masking it in the output
            print('running:', bash)
            status = os.system(bash)
            if status != 0:
                # best-effort: report the failure and go on with next event
                print('command failed with status:', status)

src/writer/chwriter.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from clickhouse_driver.client import Client
55
from .writer import Writer
66
from ..event.event import Event
7-
from ..converter.chdatatypeconverter import CHDataTypeConverter
7+
from ..converter.chwriteconverter import CHWriteConverter
88

99

1010
class CHWriter(Writer):
@@ -41,7 +41,7 @@ def insert(self, event_or_events=None):
4141
# event_or_events is instance of Event
4242
event_or_events = [event_or_events]
4343

44-
converter = CHDataTypeConverter()
44+
converter = CHWriteConverter()
4545

4646
values = []
4747
ev = None

0 commit comments

Comments
 (0)