Skip to content

Commit 646fe4f

Browse files
committed
csv append files and csv pooler
1 parent 0357947 commit 646fe4f

File tree

3 files changed

+15
-4
lines changed

3 files changed

+15
-4
lines changed

src/pumper.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ def write_rows_event(self, event=None):
2727
pass
2828

2929
def write_rows_event_each_row(self, event=None):
30-
self.writer.insert(event_or_events=event)
30+
self.writer.insert(event)
3131

3232
def reader_idle_event(self):
3333
self.writer.flush()

src/reader/mysqlreader.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,9 @@ def read(self):
7474
event.table = mysql_event.table
7575
self.fire('WriteRowsEvent', event=event)
7676
for row in mysql_event.rows:
77+
event = Event()
78+
event.schema = mysql_event.schema
79+
event.table = mysql_event.table
7780
event.row = row['values']
7881
self.fire('WriteRowsEvent.EachRow', event=event)
7982
else:

src/writer/csvwriter.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,15 @@
44
from .writer import Writer
55
from ..event.event import Event
66
import csv
7+
import os.path
78

89

910
class CSVWriter(Writer):
1011

1112
file = None
1213
path = None
1314
writer = None
15+
header_written = False
1416

1517
def __init__(self, csv_file_path):
1618
self.path = csv_file_path
@@ -20,7 +22,12 @@ def opened(self):
2022

2123
def open(self):
2224
if not self.opened():
23-
self.file = open(self.path, 'w')
25+
# do not write header to already existing file
26+
# assume it was written earlier
27+
if os.path.isfile(self.path):
28+
self.header_written = True
29+
# open file for write-at-the-end mode
30+
self.file = open(self.path, 'a+')
2431

2532
def insert(self, event_or_events):
2633
# event_or_events = [
@@ -49,8 +56,9 @@ def insert(self, event_or_events):
4956
self.open()
5057

5158
if not self.writer:
52-
self.writer = csv.DictWriter(self.file, fieldnames=event_or_events[0].row.keys())
53-
self.writer.writeheader()
59+
self.writer = csv.DictWriter(self.file, fieldnames=sorted(event_or_events[0].row.keys()))
60+
if not self.header_written:
61+
self.writer.writeheader()
5462

5563
for event in event_or_events:
5664
self.writer.writerow(event.row)

0 commit comments

Comments
 (0)