Skip to content

Commit 84bc7a1

Browse files
authored
Merge pull request #25 from sunsingerus/master
pool chain
2 parents 6f3712b + 646fe4f commit 84bc7a1

File tree

10 files changed

+259
-166
lines changed

10 files changed

+259
-166
lines changed

src/cliopts.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,12 @@ def config():
5858
default=60,
5959
help='max seconds num between flushes'
6060
)
61+
argparser.add_argument(
62+
'--csvpool-file-path-prefix',
63+
type=str,
64+
default=None,
65+
help='file path prefix to CSV pool files'
66+
)
6167

6268
argparser.add_argument(
6369
'--src-server-id',
@@ -173,6 +179,7 @@ def config():
173179
'mempool': args.mempool,
174180
'mempool-max-events-num': args.mempool_max_events_num,
175181
'mempool-max-flush-interval': args.mempool_max_flush_interval,
182+
'csvpool_file_path_prefix': args.csvpool_file_path_prefix,
176183
},
177184

178185
'reader-config': {

src/pool.py

Lines changed: 0 additions & 100 deletions
This file was deleted.

src/pool/bbpool.py

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
#!/usr/bin/env python
2+
# -*- coding: utf-8 -*-
3+
4+
import time
5+
6+
from .pool import Pool
7+
8+
9+
# Buckets Belts' Index Builder
10+
class BBIndexBuilder(object):
11+
12+
def build(self, item):
13+
# build key of the belt on which to place item
14+
return str(item.schema) + '.' + str(item.table)
15+
16+
17+
# Buckets Belts Pool
18+
class BBPool(Pool):
19+
20+
# buckets on the belts
21+
belts = {
22+
# pour data into 0-index bucket
23+
# 'key.1': [[item,], [item, item, item,], [item, item, item,]]
24+
# 'key.2': [[item,], [item, item, item,], [item, item, item,]]
25+
}
26+
27+
belts_rotated_at = {
28+
# 'key.1': UNIX TIMESTAMP
29+
# 'key.2': UNIX TIMESTAMP
30+
}
31+
32+
def __init__(
33+
self,
34+
writer_class=None,
35+
writer_params={},
36+
key_builder_class=None,
37+
max_bucket_size=10000,
38+
max_belt_size=1,
39+
max_interval_between_rotations=60,
40+
):
41+
super().__init__(
42+
writer_class=writer_class,
43+
writer_params=writer_params,
44+
key_builder_class=BBIndexBuilder,
45+
max_bucket_size=max_bucket_size,
46+
max_belt_size=max_belt_size,
47+
max_interval_between_rotations=max_interval_between_rotations,
48+
)
49+
50+
def create_belt(self, belt_index):
51+
# create belt with one empty bucket
52+
self.belts[belt_index] = [[]]
53+
self.belts_rotated_at[belt_index] = int(time.time())
54+
55+
def insert(self, item):
56+
# which belt we'll insert item?
57+
belt_index = self.key_builder.build(item)
58+
59+
# register belt if not yet
60+
if belt_index not in self.belts:
61+
self.create_belt(belt_index)
62+
63+
# append item to the 0-indexed bucket of the specified belt
64+
self.belts[belt_index][0].append(item)
65+
66+
# may be bucket is already full
67+
if len(self.belts[belt_index][0]) >= self.max_bucket_size:
68+
# bucket full, rotate the belt
69+
self.rotate_belt(belt_index)
70+
71+
def flush(self, key=None):
72+
belt_index = key
73+
empty_belts_indexes = []
74+
75+
if belt_index is None:
76+
for b_index in self.belts:
77+
if self.rotate_belt(b_index, flush=True):
78+
empty_belts_indexes.append(b_index)
79+
else:
80+
if self.rotate_belt(belt_index, flush=True):
81+
empty_belts_indexes.append(belt_index)
82+
83+
# delete belt
84+
for b_index in empty_belts_indexes:
85+
self.belts.pop(b_index)
86+
self.belts_rotated_at.pop(b_index)
87+
88+
def rotate_belt(self, belt_index, flush=False):
89+
now = int(time.time())
90+
need_rotation = True if flush else False
91+
rotate_by = "FLUSH"
92+
93+
if len(self.belts[belt_index][0]) >= self.max_bucket_size:
94+
# 0-index bucket is full
95+
need_rotation = True
96+
rotate_by = "SIZE"
97+
98+
elif now >= self.belts_rotated_at[belt_index] + self.max_interval_between_rotations:
99+
# time interval reached
100+
need_rotation = True
101+
rotate_by = "TIME"
102+
103+
if not need_rotation:
104+
# belt not rotated
105+
return False
106+
107+
# belts needs rotation
108+
109+
# insert empty bucket into the beginning of the belt
110+
self.belts[belt_index].insert(0, [])
111+
self.belts_rotated_at[belt_index] = now
112+
113+
# in case we flush belt we'll keep one just inserted empty bucket
114+
buckets_num_left_on_belt = 1 if flush else self.max_belt_size
115+
116+
while len(self.belts[belt_index]) > buckets_num_left_on_belt:
117+
# too many buckets on the belt
118+
119+
buckets_num = len(self.belts[belt_index])
120+
last_bucket_size = len(self.belts[belt_index][buckets_num-1])
121+
print(now, 'rotating belt', belt_index, 'rotate by', rotate_by, 'buckets_num', buckets_num, 'last bucket size', last_bucket_size, 'belts:', len(self.belts))
122+
123+
# time to flush data for specified key
124+
writer = self.writer_class(**self.writer_params)
125+
writer.insert(self.belts[belt_index].pop())
126+
del writer
127+
128+
# belt rotated
129+
return True

src/pool/pool.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
#!/usr/bin/env python
2+
# -*- coding: utf-8 -*-
3+
4+
5+
class Pool(object):
6+
7+
writer_class = None
8+
writer_params = None
9+
10+
key_builder_class = None
11+
key_builder = None
12+
13+
max_bucket_size = None
14+
max_belt_size = None
15+
max_interval_between_rotations = None
16+
17+
def __init__(
18+
self,
19+
writer_class=None,
20+
writer_params={},
21+
key_builder_class=None,
22+
max_bucket_size=10000,
23+
max_belt_size=1,
24+
max_interval_between_rotations=60,
25+
):
26+
self.writer_class = writer_class
27+
self.writer_params = writer_params
28+
29+
self.key_builder_class = key_builder_class
30+
self.key_builder = self.key_builder_class()
31+
32+
self.max_bucket_size = max_bucket_size
33+
self.max_belt_size = max_belt_size
34+
self.max_interval_between_rotations = max_interval_between_rotations
35+
36+
def insert(self, item):
37+
pass
38+
39+
def flush(self, key=None):
40+
pass

src/pumper.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ def write_rows_event(self, event=None):
2727
pass
2828

2929
def write_rows_event_each_row(self, event=None):
30-
self.writer.insert(event=event)
30+
self.writer.insert(event)
3131

3232
def reader_idle_event(self):
3333
self.writer.flush()

src/reader/mysqlreader.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,9 @@ def read(self):
7474
event.table = mysql_event.table
7575
self.fire('WriteRowsEvent', event=event)
7676
for row in mysql_event.rows:
77+
event = Event()
78+
event.schema = mysql_event.schema
79+
event.table = mysql_event.table
7780
event.row = row['values']
7881
self.fire('WriteRowsEvent.EachRow', event=event)
7982
else:
@@ -85,11 +88,15 @@ def read(self):
8588

8689
# blocking
8790
self.fire('ReaderIdleEvent')
91+
time.sleep(1)
8892

8993
except KeyboardInterrupt:
9094
pass
9195

92-
self.binlog_stream.close()
96+
try:
97+
self.binlog_stream.close()
98+
except:
99+
pass
93100
end_timestamp = int(time.time())
94101

95102
print('start', start_timestamp)

0 commit comments

Comments
 (0)