Skip to content

support batch for bulk #94

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 3, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ build-elasticsearch:
go build -o bin/go-mysql-elasticsearch ./cmd/go-mysql-elasticsearch

test:
go test --race ./...
go test -timeout 1m --race ./...

clean:
go clean -i ./...
Expand Down
10 changes: 8 additions & 2 deletions cmd/go-mysql-elasticsearch/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,14 @@ func main() {
return
}

r.Run()
r.Start()

select {
case n := <-sc:
log.Infof("receive signal %v, closing", n)
case <-r.Ctx().Done():
log.Infof("context is done with %v, closing", r.Ctx().Err())
}

<-sc
r.Close()
}
8 changes: 7 additions & 1 deletion etc/river.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ es_addr = "127.0.0.1:9200"
# Path to store data, like master.info. It must be set to a non-empty
# value to support breakpoint resume syncing.
# TODO: support other storage, like etcd.
data_dir = ""
data_dir = "./var"

# Inner Http status address
stat_addr = "127.0.0.1:12800"
Expand All @@ -25,6 +25,12 @@ flavor = "mysql"
# if not set or empty, ignore mysqldump.
mysqldump = "mysqldump"

# Minimal number of items to buffer before issuing one bulk request.
bulk_size = 128

# Force flush the pending requests after this interval, even if fewer than bulk_size items are buffered.
flush_bulk_time = "200ms"

# MySQL data source
[[source]]
schema = "test"
Expand Down
6 changes: 3 additions & 3 deletions glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion glide.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import:
- package: github.com/satori/go.uuid
version: ^1.1.0
- package: github.com/siddontang/go-mysql
version: 2d709954ac23decc41a0168d33339e5ab64b9c1a
version: d6523c91375d1d31abfbc9e5dce3a70e15dbb2bb
subpackages:
- canal
- client
Expand Down
15 changes: 15 additions & 0 deletions river/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package river

import (
"io/ioutil"
"time"

"github.com/BurntSushi/toml"
"github.com/juju/errors"
Expand Down Expand Up @@ -30,6 +31,10 @@ type Config struct {
Sources []SourceConfig `toml:"source"`

Rules []*Rule `toml:"rule"`

BulkSize int `toml:"bulk_size"`

FlushBulkTime TomlDuration `toml:"flush_bulk_time"`
}

func NewConfigWithFile(name string) (*Config, error) {
Expand All @@ -51,3 +56,13 @@ func NewConfig(data string) (*Config, error) {

return &c, nil
}

type TomlDuration struct {
time.Duration
}

func (d *TomlDuration) UnmarshalText(text []byte) error {
var err error
d.Duration, err = time.ParseDuration(string(text))
return err
}
29 changes: 20 additions & 9 deletions river/river.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (

"github.com/juju/errors"
"github.com/ngaut/log"
"github.com/siddontang/go-mysql/canal"

"github.com/siddontang/go-mysql-elasticsearch/elastic"
"github.com/siddontang/go-mysql/canal"
"golang.org/x/net/context"
)

// In Elasticsearch, river is a pluggable service within Elasticsearch pulling data then indexing it into Elasticsearch.
Expand All @@ -22,24 +22,27 @@ type River struct {

rules map[string]*Rule

quit chan struct{}
wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc

wg sync.WaitGroup

es *elastic.Client

st *stat

master *masterInfo

syncCh chan interface{}
}

func NewRiver(c *Config) (*River, error) {
r := new(River)

r.c = c

r.quit = make(chan struct{})

r.rules = make(map[string]*Rule)
r.syncCh = make(chan interface{}, 4096)
r.ctx, r.cancel = context.WithCancel(context.Background())

var err error
if r.master, err = loadMasterInfo(c.DataDir); err != nil {
Expand Down Expand Up @@ -239,7 +242,10 @@ func ruleKey(schema string, table string) string {
return fmt.Sprintf("%s:%s", schema, table)
}

func (r *River) Run() error {
func (r *River) Start() error {
r.wg.Add(1)
go r.syncLoop()

pos := r.master.Position()
if err := r.canal.StartFrom(pos); err != nil {
log.Errorf("start canal err %v", err)
Expand All @@ -249,9 +255,14 @@ func (r *River) Run() error {
return nil
}

// Ctx returns the river's context. It is done once Close is called
// or once the sync loop cancels it after a bulk/save error, so callers
// can select on Ctx().Done() to detect shutdown.
func (r *River) Ctx() context.Context {
	return r.ctx
}

func (r *River) Close() {
log.Infof("closing river")
close(r.quit)

r.cancel()

r.canal.Close()

Expand Down
9 changes: 5 additions & 4 deletions river/river_extra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ func (s *riverTestSuite) setupExtra(c *C) (r *River) {
cfg.DumpExec = "mysqldump"

cfg.StatAddr = "127.0.0.1:12800"
cfg.BulkSize = 1
cfg.FlushBulkTime = TomlDuration{3 * time.Millisecond}

os.RemoveAll(cfg.DataDir)

Expand Down Expand Up @@ -111,15 +113,14 @@ func (s *riverTestSuite) TestRiverWithParent(c *C) {

s.testPrepareExtraData(c)

go river.Run()
river.Start()

<-river.canal.WaitDumpDone()
testWaitSyncDone(c, river)

s.testElasticExtraExists(c, "1", "1", true)

s.testExecute(c, "DELETE FROM test_river_extra WHERE id = ?", 1)
err := river.canal.CatchMasterPos(3 * time.Second)
c.Assert(err, IsNil)
testWaitSyncDone(c, river)

s.testElasticExtraExists(c, "1", "1", false)
}
24 changes: 19 additions & 5 deletions river/river_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ func (s *riverTestSuite) SetUpSuite(c *C) {
cfg.DumpExec = "mysqldump"

cfg.StatAddr = "127.0.0.1:12800"
cfg.BulkSize = 1
cfg.FlushBulkTime = TomlDuration{3 * time.Millisecond}

os.RemoveAll(cfg.DataDir)

Expand Down Expand Up @@ -177,17 +179,29 @@ func (s *riverTestSuite) testElasticGet(c *C, id string) *elastic.Response {
return r
}

func (s *riverTestSuite) testWaitSyncDone(c *C) {
err := s.r.canal.CatchMasterPos(10 * time.Second)
func testWaitSyncDone(c *C, r *River) {
<-r.canal.WaitDumpDone()

err := r.canal.CatchMasterPos(10 * time.Second)
c.Assert(err, IsNil)

for i := 0; i < 1000; i++ {
if len(r.syncCh) == 0 {
return
}

time.Sleep(10 * time.Millisecond)
}

c.Fatalf("wait 1s but still have %d items to be synced", len(r.syncCh))
}

func (s *riverTestSuite) TestRiver(c *C) {
s.testPrepareData(c)

go s.r.Run()
s.r.Start()

<-s.r.canal.WaitDumpDone()
testWaitSyncDone(c, s.r)

var r *elastic.Response
r = s.testElasticGet(c, "1")
Expand Down Expand Up @@ -219,7 +233,7 @@ func (s *riverTestSuite) TestRiver(c *C) {
s.testExecute(c, fmt.Sprintf("UPDATE %s SET title = ? WHERE id = ?", table), "hello", 5+i)
}

s.testWaitSyncDone(c)
testWaitSyncDone(c, s.r)

r = s.testElasticGet(c, "1")
c.Assert(r.Found, Equals, false)
Expand Down
90 changes: 84 additions & 6 deletions river/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"reflect"
"strings"
"time"

"github.com/juju/errors"
"github.com/ngaut/log"
Expand All @@ -25,6 +26,11 @@ const (
fieldTypeList = "list"
)

// posSaver is sent on River.syncCh to ask syncLoop to persist a binlog
// position. force bypasses syncLoop's 3-second save throttle (used for
// rotate and DDL events, which must be saved promptly).
type posSaver struct {
	pos   mysql.Position
	force bool
}

// eventHandler receives binlog events from canal (OnRotate, OnDDL,
// OnXID, OnRow) and forwards them to the river's sync channel.
type eventHandler struct {
	r *River
}
Expand All @@ -35,15 +41,19 @@ func (h *eventHandler) OnRotate(e *replication.RotateEvent) error {
uint32(e.Position),
}

return h.r.master.Save(pos)
h.r.syncCh <- posSaver{pos, true}

return h.r.ctx.Err()
}

// OnDDL queues a forced position save after a DDL statement so the new
// binlog position is flushed and persisted promptly by syncLoop. It
// returns the context error so canal stops once the river is closed.
//
// NOTE(review): the previous synchronous h.r.master.Save(nextPos) call
// was left interleaved above the new channel send in the diff, which
// would have made the batching path unreachable; it is removed here.
func (h *eventHandler) OnDDL(nextPos mysql.Position, _ *replication.QueryEvent) error {
	h.r.syncCh <- posSaver{nextPos, true}
	return h.r.ctx.Err()
}

// OnXID queues a (non-forced) position save at transaction commit;
// syncLoop throttles these saves to at most one every 3 seconds. It
// returns the context error so canal stops once the river is closed.
//
// NOTE(review): the previous synchronous h.r.master.Save(nextPos) call
// was left interleaved above the new channel send in the diff, which
// would have made the batching path unreachable; it is removed here.
func (h *eventHandler) OnXID(nextPos mysql.Position) error {
	h.r.syncCh <- posSaver{nextPos, false}
	return h.r.ctx.Err()
}

func (h *eventHandler) OnRow(e *canal.RowsEvent) error {
Expand All @@ -62,20 +72,88 @@ func (h *eventHandler) OnRow(e *canal.RowsEvent) error {
case canal.UpdateAction:
reqs, err = h.r.makeUpdateRequest(rule, e.Rows)
default:
return errors.Errorf("invalid rows action %s", e.Action)
err = errors.Errorf("invalid rows action %s", e.Action)
}

if err != nil {
return errors.Errorf("make %s ES request err %v", e.Action, err)
h.r.cancel()
return errors.Errorf("make %s ES request err %v, close sync", e.Action, err)
}

return h.r.doBulk(reqs)
h.r.syncCh <- reqs

return h.r.ctx.Err()
}

// String names this handler in canal's log output.
func (h *eventHandler) String() string {
	const handlerName = "ESRiverEventHandler"
	return handlerName
}

// syncLoop is the single consumer of r.syncCh. It batches Elasticsearch
// bulk requests until bulk_size items accumulate or flush_bulk_time
// elapses, and persists the binlog position on request. It runs until
// r.ctx is cancelled, and cancels r.ctx itself if a bulk write or a
// position save fails. Started by Start; r.wg tracks its lifetime.
func (r *River) syncLoop() {
	// Fall back to the documented defaults when the config is unset.
	bulkSize := r.c.BulkSize
	if bulkSize == 0 {
		bulkSize = 128
	}

	interval := r.c.FlushBulkTime.Duration
	if interval == 0 {
		interval = 200 * time.Millisecond
	}

	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	defer r.wg.Done()

	lastSavedTime := time.Now()
	reqs := make([]*elastic.BulkRequest, 0, 1024)

	var pos mysql.Position

	for {
		needFlush := false
		needSavePos := false

		select {
		case v := <-r.syncCh:
			switch v := v.(type) {
			case posSaver:
				now := time.Now()
				// Throttle saves to once per 3s unless the event
				// (rotate/DDL) forces an immediate save.
				if v.force || now.Sub(lastSavedTime) > 3*time.Second {
					lastSavedTime = now
					// Flush first so the saved position never runs
					// ahead of what has been written to ES.
					needFlush = true
					needSavePos = true
					pos = v.pos
				}
			case []*elastic.BulkRequest:
				reqs = append(reqs, v...)
				needFlush = len(reqs) >= bulkSize
			}
		case <-ticker.C:
			// Periodic flush even if fewer than bulkSize items arrived.
			needFlush = true
		case <-r.ctx.Done():
			return
		}

		if needFlush {
			// TODO: retry some times?
			if err := r.doBulk(reqs); err != nil {
				log.Errorf("do ES bulk err %v, close sync", err)
				r.cancel()
				return
			}
			// Keep the backing array; just reset the length.
			reqs = reqs[0:0]
		}

		if needSavePos {
			if err := r.master.Save(pos); err != nil {
				log.Errorf("save sync position %s err %v, close sync", pos, err)
				r.cancel()
				return
			}
		}
	}
}

// for insert and delete
func (r *River) makeRequest(rule *Rule, action string, rows [][]interface{}) ([]*elastic.BulkRequest, error) {
reqs := make([]*elastic.BulkRequest, 0, len(rows))
Expand Down
Loading