Skip to content

Commit 686718b

Browse files
authored
support batch for bulk (#94)
1 parent 8ca851e commit 686718b

File tree

12 files changed

+180
-59
lines changed

12 files changed

+180
-59
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ build-elasticsearch:
66
go build -o bin/go-mysql-elasticsearch ./cmd/go-mysql-elasticsearch
77

88
test:
9-
go test --race ./...
9+
go test -timeout 1m --race ./...
1010

1111
clean:
1212
go clean -i ./...

cmd/go-mysql-elasticsearch/main.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,8 +82,14 @@ func main() {
8282
return
8383
}
8484

85-
r.Run()
85+
r.Start()
86+
87+
select {
88+
case n := <-sc:
89+
log.Infof("receive signal %v, closing", n)
90+
case <-r.Ctx().Done():
91+
log.Infof("context is done with %v, closing", r.Ctx().Err())
92+
}
8693

87-
<-sc
8894
r.Close()
8995
}

etc/river.toml

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ es_addr = "127.0.0.1:9200"
1010
# Path to store data, like master.info.
1111
# It must be set (non-empty) to support resuming sync from a breakpoint.
1212
# TODO: support other storage, like etcd.
13-
data_dir = ""
13+
data_dir = "./var"
1414

1515
# Inner Http status address
1616
stat_addr = "127.0.0.1:12800"
@@ -25,6 +25,12 @@ flavor = "mysql"
2525
# if not set or empty, ignore mysqldump.
2626
mysqldump = "mysqldump"
2727

28+
# minimum number of items that triggers a bulk insert
29+
bulk_size = 128
30+
31+
# force a flush of pending requests after this interval, even if fewer than bulk_size items are queued
32+
flush_bulk_time = "200ms"
33+
2834
# MySQL data source
2935
[[source]]
3036
schema = "test"

glide.lock

Lines changed: 3 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

glide.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import:
77
- package: github.com/satori/go.uuid
88
version: ^1.1.0
99
- package: github.com/siddontang/go-mysql
10-
version: 2d709954ac23decc41a0168d33339e5ab64b9c1a
10+
version: d6523c91375d1d31abfbc9e5dce3a70e15dbb2bb
1111
subpackages:
1212
- canal
1313
- client

river/config.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package river
22

33
import (
44
"io/ioutil"
5+
"time"
56

67
"github.com/BurntSushi/toml"
78
"github.com/juju/errors"
@@ -30,6 +31,10 @@ type Config struct {
3031
Sources []SourceConfig `toml:"source"`
3132

3233
Rules []*Rule `toml:"rule"`
34+
35+
BulkSize int `toml:"bulk_size"`
36+
37+
FlushBulkTime TomlDuration `toml:"flush_bulk_time"`
3338
}
3439

3540
func NewConfigWithFile(name string) (*Config, error) {
@@ -51,3 +56,13 @@ func NewConfig(data string) (*Config, error) {
5156

5257
return &c, nil
5358
}
59+
60+
// TomlDuration wraps time.Duration so that a duration can be given in the
// TOML configuration as a human-readable string such as "200ms".
type TomlDuration struct {
	time.Duration
}

// UnmarshalText parses text with time.ParseDuration and stores the result in d.
//
// If text is not a valid duration the parse error is returned and d is left
// unchanged, so a malformed value never silently replaces an existing
// duration with zero.
func (d *TomlDuration) UnmarshalText(text []byte) error {
	parsed, err := time.ParseDuration(string(text))
	if err != nil {
		return err
	}

	d.Duration = parsed
	return nil
}

river/river.go

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@ import (
77

88
"github.com/juju/errors"
99
"github.com/ngaut/log"
10-
"github.com/siddontang/go-mysql/canal"
11-
1210
"github.com/siddontang/go-mysql-elasticsearch/elastic"
11+
"github.com/siddontang/go-mysql/canal"
12+
"golang.org/x/net/context"
1313
)
1414

1515
// In Elasticsearch, a river is a pluggable service that runs within Elasticsearch, pulling data and then indexing it into Elasticsearch.
@@ -22,24 +22,27 @@ type River struct {
2222

2323
rules map[string]*Rule
2424

25-
quit chan struct{}
26-
wg sync.WaitGroup
25+
ctx context.Context
26+
cancel context.CancelFunc
27+
28+
wg sync.WaitGroup
2729

2830
es *elastic.Client
2931

3032
st *stat
3133

3234
master *masterInfo
35+
36+
syncCh chan interface{}
3337
}
3438

3539
func NewRiver(c *Config) (*River, error) {
3640
r := new(River)
3741

3842
r.c = c
39-
40-
r.quit = make(chan struct{})
41-
4243
r.rules = make(map[string]*Rule)
44+
r.syncCh = make(chan interface{}, 4096)
45+
r.ctx, r.cancel = context.WithCancel(context.Background())
4346

4447
var err error
4548
if r.master, err = loadMasterInfo(c.DataDir); err != nil {
@@ -239,7 +242,10 @@ func ruleKey(schema string, table string) string {
239242
return fmt.Sprintf("%s:%s", schema, table)
240243
}
241244

242-
func (r *River) Run() error {
245+
func (r *River) Start() error {
246+
r.wg.Add(1)
247+
go r.syncLoop()
248+
243249
pos := r.master.Position()
244250
if err := r.canal.StartFrom(pos); err != nil {
245251
log.Errorf("start canal err %v", err)
@@ -249,9 +255,14 @@ func (r *River) Run() error {
249255
return nil
250256
}
251257

258+
// Ctx returns the river's context (r.ctx). Callers can wait on its Done
// channel to learn that the river has been cancelled.
func (r *River) Ctx() context.Context {
259+
return r.ctx
260+
}
261+
252262
func (r *River) Close() {
253263
log.Infof("closing river")
254-
close(r.quit)
264+
265+
r.cancel()
255266

256267
r.canal.Close()
257268

river/river_extra_test.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ func (s *riverTestSuite) setupExtra(c *C) (r *River) {
4646
cfg.DumpExec = "mysqldump"
4747

4848
cfg.StatAddr = "127.0.0.1:12800"
49+
cfg.BulkSize = 1
50+
cfg.FlushBulkTime = TomlDuration{3 * time.Millisecond}
4951

5052
os.RemoveAll(cfg.DataDir)
5153

@@ -111,15 +113,14 @@ func (s *riverTestSuite) TestRiverWithParent(c *C) {
111113

112114
s.testPrepareExtraData(c)
113115

114-
go river.Run()
116+
river.Start()
115117

116-
<-river.canal.WaitDumpDone()
118+
testWaitSyncDone(c, river)
117119

118120
s.testElasticExtraExists(c, "1", "1", true)
119121

120122
s.testExecute(c, "DELETE FROM test_river_extra WHERE id = ?", 1)
121-
err := river.canal.CatchMasterPos(3 * time.Second)
122-
c.Assert(err, IsNil)
123+
testWaitSyncDone(c, river)
123124

124125
s.testElasticExtraExists(c, "1", "1", false)
125126
}

river/river_test.go

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@ func (s *riverTestSuite) SetUpSuite(c *C) {
6767
cfg.DumpExec = "mysqldump"
6868

6969
cfg.StatAddr = "127.0.0.1:12800"
70+
cfg.BulkSize = 1
71+
cfg.FlushBulkTime = TomlDuration{3 * time.Millisecond}
7072

7173
os.RemoveAll(cfg.DataDir)
7274

@@ -177,17 +179,29 @@ func (s *riverTestSuite) testElasticGet(c *C, id string) *elastic.Response {
177179
return r
178180
}
179181

180-
func (s *riverTestSuite) testWaitSyncDone(c *C) {
181-
err := s.r.canal.CatchMasterPos(10 * time.Second)
182+
func testWaitSyncDone(c *C, r *River) {
183+
<-r.canal.WaitDumpDone()
184+
185+
err := r.canal.CatchMasterPos(10 * time.Second)
182186
c.Assert(err, IsNil)
187+
188+
for i := 0; i < 1000; i++ {
189+
if len(r.syncCh) == 0 {
190+
return
191+
}
192+
193+
time.Sleep(10 * time.Millisecond)
194+
}
195+
196+
c.Fatalf("wait 1s but still have %d items to be synced", len(r.syncCh))
183197
}
184198

185199
func (s *riverTestSuite) TestRiver(c *C) {
186200
s.testPrepareData(c)
187201

188-
go s.r.Run()
202+
s.r.Start()
189203

190-
<-s.r.canal.WaitDumpDone()
204+
testWaitSyncDone(c, s.r)
191205

192206
var r *elastic.Response
193207
r = s.testElasticGet(c, "1")
@@ -219,7 +233,7 @@ func (s *riverTestSuite) TestRiver(c *C) {
219233
s.testExecute(c, fmt.Sprintf("UPDATE %s SET title = ? WHERE id = ?", table), "hello", 5+i)
220234
}
221235

222-
s.testWaitSyncDone(c)
236+
testWaitSyncDone(c, s.r)
223237

224238
r = s.testElasticGet(c, "1")
225239
c.Assert(r.Found, Equals, false)

river/sync.go

Lines changed: 84 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"fmt"
66
"reflect"
77
"strings"
8+
"time"
89

910
"github.com/juju/errors"
1011
"github.com/ngaut/log"
@@ -25,6 +26,11 @@ const (
2526
fieldTypeList = "list"
2627
)
2728

29+
// posSaver is the message sent on River.syncCh to ask syncLoop to persist a
// replication position. pos is the position to save; when force is true
// syncLoop flushes and saves immediately, otherwise it only does so if more
// than 3 seconds have passed since the last save.
type posSaver struct {
30+
pos mysql.Position
31+
force bool
32+
}
33+
2834
type eventHandler struct {
2935
r *River
3036
}
@@ -35,15 +41,19 @@ func (h *eventHandler) OnRotate(e *replication.RotateEvent) error {
3541
uint32(e.Position),
3642
}
3743

38-
return h.r.master.Save(pos)
44+
h.r.syncCh <- posSaver{pos, true}
45+
46+
return h.r.ctx.Err()
3947
}
4048

4149
func (h *eventHandler) OnDDL(nextPos mysql.Position, _ *replication.QueryEvent) error {
42-
return h.r.master.Save(nextPos)
50+
h.r.syncCh <- posSaver{nextPos, true}
51+
return h.r.ctx.Err()
4352
}
4453

4554
func (h *eventHandler) OnXID(nextPos mysql.Position) error {
46-
return h.r.master.Save(nextPos)
55+
h.r.syncCh <- posSaver{nextPos, false}
56+
return h.r.ctx.Err()
4757
}
4858

4959
func (h *eventHandler) OnRow(e *canal.RowsEvent) error {
@@ -62,20 +72,88 @@ func (h *eventHandler) OnRow(e *canal.RowsEvent) error {
6272
case canal.UpdateAction:
6373
reqs, err = h.r.makeUpdateRequest(rule, e.Rows)
6474
default:
65-
return errors.Errorf("invalid rows action %s", e.Action)
75+
err = errors.Errorf("invalid rows action %s", e.Action)
6676
}
6777

6878
if err != nil {
69-
return errors.Errorf("make %s ES request err %v", e.Action, err)
79+
h.r.cancel()
80+
return errors.Errorf("make %s ES request err %v, close sync", e.Action, err)
7081
}
7182

72-
return h.r.doBulk(reqs)
83+
h.r.syncCh <- reqs
84+
85+
return h.r.ctx.Err()
7386
}
7487

7588
// String returns the fixed name identifying this event handler.
func (h *eventHandler) String() string {
7689
return "ESRiverEventHandler"
7790
}
7891

92+
func (r *River) syncLoop() {
93+
bulkSize := r.c.BulkSize
94+
if bulkSize == 0 {
95+
bulkSize = 128
96+
}
97+
98+
interval := r.c.FlushBulkTime.Duration
99+
if interval == 0 {
100+
interval = 200 * time.Millisecond
101+
}
102+
103+
ticker := time.NewTicker(interval)
104+
defer ticker.Stop()
105+
defer r.wg.Done()
106+
107+
lastSavedTime := time.Now()
108+
reqs := make([]*elastic.BulkRequest, 0, 1024)
109+
110+
var pos mysql.Position
111+
112+
for {
113+
needFlush := false
114+
needSavePos := false
115+
116+
select {
117+
case v := <-r.syncCh:
118+
switch v := v.(type) {
119+
case posSaver:
120+
now := time.Now()
121+
if v.force || now.Sub(lastSavedTime) > 3*time.Second {
122+
lastSavedTime = now
123+
needFlush = true
124+
needSavePos = true
125+
pos = v.pos
126+
}
127+
case []*elastic.BulkRequest:
128+
reqs = append(reqs, v...)
129+
needFlush = len(reqs) >= bulkSize
130+
}
131+
case <-ticker.C:
132+
needFlush = true
133+
case <-r.ctx.Done():
134+
return
135+
}
136+
137+
if needFlush {
138+
// TODO: retry some times?
139+
if err := r.doBulk(reqs); err != nil {
140+
log.Errorf("do ES bulk err %v, close sync", err)
141+
r.cancel()
142+
return
143+
}
144+
reqs = reqs[0:0]
145+
}
146+
147+
if needSavePos {
148+
if err := r.master.Save(pos); err != nil {
149+
log.Errorf("save sync position %s err %v, close sync", pos, err)
150+
r.cancel()
151+
return
152+
}
153+
}
154+
}
155+
}
156+
79157
// for insert and delete
80158
func (r *River) makeRequest(rule *Rule, action string, rows [][]interface{}) ([]*elastic.BulkRequest, error) {
81159
reqs := make([]*elastic.BulkRequest, 0, len(rows))

0 commit comments

Comments
 (0)