diff --git a/Makefile b/Makefile index 9e77c1d0..bf734a8e 100644 --- a/Makefile +++ b/Makefile @@ -6,7 +6,7 @@ build-elasticsearch: go build -o bin/go-mysql-elasticsearch ./cmd/go-mysql-elasticsearch test: - go test --race ./... + go test -timeout 1m --race ./... clean: go clean -i ./... diff --git a/cmd/go-mysql-elasticsearch/main.go b/cmd/go-mysql-elasticsearch/main.go index 66e09269..ca6fad21 100644 --- a/cmd/go-mysql-elasticsearch/main.go +++ b/cmd/go-mysql-elasticsearch/main.go @@ -82,8 +82,14 @@ func main() { return } - r.Run() + r.Start() + + select { + case n := <-sc: + log.Infof("receive signal %v, closing", n) + case <-r.Ctx().Done(): + log.Infof("context is done with %v, closing", r.Ctx().Err()) + } - <-sc r.Close() } diff --git a/etc/river.toml b/etc/river.toml index 288bbdfe..52b316f9 100644 --- a/etc/river.toml +++ b/etc/river.toml @@ -10,7 +10,7 @@ es_addr = "127.0.0.1:9200" # Path to store data, like master.info, if not set or empty, # we must use this to support breakpoint resume syncing. # TODO: support other storage, like etcd. -data_dir = "" +data_dir = "./var" # Inner Http status address stat_addr = "127.0.0.1:12800" @@ -25,6 +25,12 @@ flavor = "mysql" # if not set or empty, ignore mysqldump. mysqldump = "mysqldump" +# minimal items to be inserted in one bulk +bulk_size = 128 + +# force flush the pending requests if we don't have enough items >= bulk_size +flush_bulk_time = "200ms" + # MySQL data source [[source]] schema = "test" diff --git a/glide.lock b/glide.lock index 94d24e82..97ef0512 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ -hash: dd9273e17c15d604139c7d1a56a22f6519ae844853cf08ee73dfed909ad74e89 -updated: 2017-04-02T21:48:34.314184617+08:00 +hash: 68b4d3bea2f3f7ec895a26c34d5b6ebb1d736f1e066140638e33980f4796c902 +updated: 2017-04-03T14:22:14.851008781+08:00 imports: - name: github.com/BurntSushi/toml version: 056c9bc7be7190eaa7715723883caffa5f8fa3e4 @@ -16,7 +16,7 @@ imports: - ioutil2 - sync2 - name: github.com/siddontang/go-mysql - version: 2d709954ac23decc41a0168d33339e5ab64b9c1a + version: d6523c91375d1d31abfbc9e5dce3a70e15dbb2bb subpackages: - canal - client diff --git a/glide.yaml b/glide.yaml index 2a6dfe41..89a9558b 100644 --- a/glide.yaml +++ b/glide.yaml @@ -7,7 +7,7 @@ import: - package: github.com/satori/go.uuid version: ^1.1.0 - package: github.com/siddontang/go-mysql - version: 2d709954ac23decc41a0168d33339e5ab64b9c1a + version: d6523c91375d1d31abfbc9e5dce3a70e15dbb2bb subpackages: - canal - client diff --git a/river/config.go b/river/config.go index 168f0463..c21687ca 100644 --- a/river/config.go +++ b/river/config.go @@ -2,6 +2,7 @@ package river import ( "io/ioutil" + "time" "github.com/BurntSushi/toml" "github.com/juju/errors" @@ -30,6 +31,10 @@ type Config struct { Sources []SourceConfig `toml:"source"` Rules []*Rule `toml:"rule"` + + BulkSize int `toml:"bulk_size"` + + FlushBulkTime TomlDuration `toml:"flush_bulk_time"` } func NewConfigWithFile(name string) (*Config, error) { @@ -51,3 +56,13 @@ func NewConfig(data string) (*Config, error) { return &c, nil } + +type TomlDuration struct { + time.Duration +} + +func (d *TomlDuration) UnmarshalText(text []byte) error { + var err error + d.Duration, err = time.ParseDuration(string(text)) + return err +} diff --git a/river/river.go b/river/river.go index 733dced4..979b07db 100644 --- a/river/river.go +++ b/river/river.go @@ -7,9 +7,9 @@ import ( "github.com/juju/errors" "github.com/ngaut/log" - "github.com/siddontang/go-mysql/canal" - "github.com/siddontang/go-mysql-elasticsearch/elastic" + "github.com/siddontang/go-mysql/canal" + "golang.org/x/net/context" ) // In Elasticsearch, river is a pluggable service within Elasticsearch pulling data then indexing it into Elasticsearch. @@ -22,24 +22,27 @@ type River struct { rules map[string]*Rule - quit chan struct{} - wg sync.WaitGroup + ctx context.Context + cancel context.CancelFunc + + wg sync.WaitGroup es *elastic.Client st *stat master *masterInfo + + syncCh chan interface{} } func NewRiver(c *Config) (*River, error) { r := new(River) r.c = c - - r.quit = make(chan struct{}) - r.rules = make(map[string]*Rule) + r.syncCh = make(chan interface{}, 4096) + r.ctx, r.cancel = context.WithCancel(context.Background()) var err error if r.master, err = loadMasterInfo(c.DataDir); err != nil { @@ -239,7 +242,10 @@ func ruleKey(schema string, table string) string { return fmt.Sprintf("%s:%s", schema, table) } -func (r *River) Run() error { +func (r *River) Start() error { + r.wg.Add(1) + go r.syncLoop() + pos := r.master.Position() if err := r.canal.StartFrom(pos); err != nil { log.Errorf("start canal err %v", err) @@ -249,9 +255,14 @@ func (r *River) Run() error { return nil } +func (r *River) Ctx() context.Context { + return r.ctx +} + func (r *River) Close() { log.Infof("closing river") - close(r.quit) + + r.cancel() r.canal.Close() diff --git a/river/river_extra_test.go b/river/river_extra_test.go index 910f7f2e..84b4c894 100644 --- a/river/river_extra_test.go +++ b/river/river_extra_test.go @@ -46,6 +46,8 @@ func (s *riverTestSuite) setupExtra(c *C) (r *River) { cfg.DumpExec = "mysqldump" cfg.StatAddr = "127.0.0.1:12800" + cfg.BulkSize = 1 + cfg.FlushBulkTime = TomlDuration{3 * time.Millisecond} os.RemoveAll(cfg.DataDir) @@ -111,15 +113,14 @@ func (s *riverTestSuite) TestRiverWithParent(c *C) { s.testPrepareExtraData(c) - go river.Run() + river.Start() - <-river.canal.WaitDumpDone() + testWaitSyncDone(c, river) s.testElasticExtraExists(c, "1", "1", true) s.testExecute(c, "DELETE FROM test_river_extra WHERE id = ?", 1) - err := river.canal.CatchMasterPos(3 * time.Second) - c.Assert(err, IsNil) + testWaitSyncDone(c, river) s.testElasticExtraExists(c, "1", "1", false) } diff --git a/river/river_test.go b/river/river_test.go index f50c031b..f66845cc 100644 --- a/river/river_test.go +++ b/river/river_test.go @@ -67,6 +67,8 @@ func (s *riverTestSuite) SetUpSuite(c *C) { cfg.DumpExec = "mysqldump" cfg.StatAddr = "127.0.0.1:12800" + cfg.BulkSize = 1 + cfg.FlushBulkTime = TomlDuration{3 * time.Millisecond} os.RemoveAll(cfg.DataDir) @@ -177,17 +179,29 @@ func (s *riverTestSuite) testElasticGet(c *C, id string) *elastic.Response { return r } -func (s *riverTestSuite) testWaitSyncDone(c *C) { - err := s.r.canal.CatchMasterPos(10 * time.Second) +func testWaitSyncDone(c *C, r *River) { + <-r.canal.WaitDumpDone() + + err := r.canal.CatchMasterPos(10 * time.Second) c.Assert(err, IsNil) + + for i := 0; i < 1000; i++ { + if len(r.syncCh) == 0 { + return + } + + time.Sleep(10 * time.Millisecond) + } + + c.Fatalf("wait 1s but still have %d items to be synced", len(r.syncCh)) } func (s *riverTestSuite) TestRiver(c *C) { s.testPrepareData(c) - go s.r.Run() + s.r.Start() - <-s.r.canal.WaitDumpDone() + testWaitSyncDone(c, s.r) var r *elastic.Response r = s.testElasticGet(c, "1") @@ -219,7 +233,7 @@ func (s *riverTestSuite) TestRiver(c *C) { s.testExecute(c, fmt.Sprintf("UPDATE %s SET title = ? WHERE id = ?", table), "hello", 5+i) } - s.testWaitSyncDone(c) + testWaitSyncDone(c, s.r) r = s.testElasticGet(c, "1") c.Assert(r.Found, Equals, false) diff --git a/river/sync.go b/river/sync.go index 05f0892d..2c6cb667 100644 --- a/river/sync.go +++ b/river/sync.go @@ -5,6 +5,7 @@ import ( "fmt" "reflect" "strings" + "time" "github.com/juju/errors" "github.com/ngaut/log" @@ -25,6 +26,11 @@ const ( fieldTypeList = "list" ) +type posSaver struct { + pos mysql.Position + force bool +} + type eventHandler struct { r *River } @@ -35,15 +41,19 @@ func (h *eventHandler) OnRotate(e *replication.RotateEvent) error { uint32(e.Position), } - return h.r.master.Save(pos) + h.r.syncCh <- posSaver{pos, true} + + return h.r.ctx.Err() } func (h *eventHandler) OnDDL(nextPos mysql.Position, _ *replication.QueryEvent) error { - return h.r.master.Save(nextPos) + h.r.syncCh <- posSaver{nextPos, true} + return h.r.ctx.Err() } func (h *eventHandler) OnXID(nextPos mysql.Position) error { - return h.r.master.Save(nextPos) + h.r.syncCh <- posSaver{nextPos, false} + return h.r.ctx.Err() } func (h *eventHandler) OnRow(e *canal.RowsEvent) error { @@ -62,20 +72,88 @@ func (h *eventHandler) OnRow(e *canal.RowsEvent) error { case canal.UpdateAction: reqs, err = h.r.makeUpdateRequest(rule, e.Rows) default: - return errors.Errorf("invalid rows action %s", e.Action) + err = errors.Errorf("invalid rows action %s", e.Action) } if err != nil { - return errors.Errorf("make %s ES request err %v", e.Action, err) + h.r.cancel() + return errors.Errorf("make %s ES request err %v, close sync", e.Action, err) } - return h.r.doBulk(reqs) + h.r.syncCh <- reqs + + return h.r.ctx.Err() } func (h *eventHandler) String() string { return "ESRiverEventHandler" } +func (r *River) syncLoop() { + bulkSize := r.c.BulkSize + if bulkSize == 0 { + bulkSize = 128 + } + + interval := r.c.FlushBulkTime.Duration + if interval == 0 { + interval = 200 * time.Millisecond + } + + ticker := time.NewTicker(interval) + defer ticker.Stop() + defer r.wg.Done() + + lastSavedTime := time.Now() + reqs := make([]*elastic.BulkRequest, 0, 1024) + + var pos mysql.Position + + for { + needFlush := false + needSavePos := false + + select { + case v := <-r.syncCh: + switch v := v.(type) { + case posSaver: + now := time.Now() + if v.force || now.Sub(lastSavedTime) > 3*time.Second { + lastSavedTime = now + needFlush = true + needSavePos = true + pos = v.pos + } + case []*elastic.BulkRequest: + reqs = append(reqs, v...) + needFlush = len(reqs) >= bulkSize + } + case <-ticker.C: + needFlush = true + case <-r.ctx.Done(): + return + } + + if needFlush { + // TODO: retry some times? + if err := r.doBulk(reqs); err != nil { + log.Errorf("do ES bulk err %v, close sync", err) + r.cancel() + return + } + reqs = reqs[0:0] + } + + if needSavePos { + if err := r.master.Save(pos); err != nil { + log.Errorf("save sync position %s err %v, close sync", pos, err) + r.cancel() + return + } + } + } +} + // for insert and delete func (r *River) makeRequest(rule *Rule, action string, rows [][]interface{}) ([]*elastic.BulkRequest, error) { reqs := make([]*elastic.BulkRequest, 0, len(rows)) diff --git a/vendor/github.com/siddontang/go-mysql/canal/canal.go b/vendor/github.com/siddontang/go-mysql/canal/canal.go index d9b94f5a..4d668776 100644 --- a/vendor/github.com/siddontang/go-mysql/canal/canal.go +++ b/vendor/github.com/siddontang/go-mysql/canal/canal.go @@ -15,12 +15,9 @@ import ( "github.com/siddontang/go-mysql/mysql" "github.com/siddontang/go-mysql/replication" "github.com/siddontang/go-mysql/schema" - "github.com/siddontang/go/sync2" "golang.org/x/net/context" ) -var errCanalClosed = errors.New("canal was closed") - // Canal can sync your MySQL data into everywhere, like Elasticsearch, Redis, etc... // MySQL must open row format for binlog type Canal struct { @@ -43,8 +40,6 @@ type Canal struct { tableLock sync.RWMutex tables map[string]*schema.Table - closed sync2.AtomicBool - ctx context.Context cancel context.CancelFunc } @@ -52,7 +47,6 @@ type Canal struct { func NewCanal(cfg *Config) (*Canal, error) { c := new(Canal) c.cfg = cfg - c.closed.Set(false) c.ctx, c.cancel = context.WithCancel(context.Background()) @@ -139,43 +133,35 @@ func (c *Canal) StartFrom(pos mysql.Position) error { } func (c *Canal) run() error { - defer c.wg.Done() + defer func() { + c.wg.Done() + c.cancel() + }() + + err := c.tryDump() + close(c.dumpDoneCh) - if err := c.tryDump(); err != nil { + if err != nil { log.Errorf("canal dump mysql err: %v", err) return errors.Trace(err) } - close(c.dumpDoneCh) - - if err := c.startSyncBinlog(); err != nil { - if !c.isClosed() { - log.Errorf("canal start sync binlog err: %v", err) - } + if err = c.startSyncBinlog(); err != nil { + log.Errorf("canal start sync binlog err: %v", err) return errors.Trace(err) } return nil } -func (c *Canal) isClosed() bool { - return c.closed.Get() -} - func (c *Canal) Close() { - log.Infof("close canal") + log.Infof("closing canal") c.m.Lock() defer c.m.Unlock() c.cancel() - if c.isClosed() { - return - } - - c.closed.Set(true) - c.connLock.Lock() c.conn.Close() c.conn = nil @@ -193,6 +179,10 @@ func (c *Canal) WaitDumpDone() <-chan struct{} { return c.dumpDoneCh } +func (c *Canal) Ctx() context.Context { + return c.ctx +} + func (c *Canal) GetTable(db string, table string) (*schema.Table, error) { key := fmt.Sprintf("%s.%s", db, table) c.tableLock.RLock() diff --git a/vendor/github.com/siddontang/go-mysql/canal/dump.go b/vendor/github.com/siddontang/go-mysql/canal/dump.go index 0fc7eb78..68d42ce6 100644 --- a/vendor/github.com/siddontang/go-mysql/canal/dump.go +++ b/vendor/github.com/siddontang/go-mysql/canal/dump.go @@ -24,8 +24,8 @@ func (h *dumpParseHandler) BinLog(name string, pos uint64) error { } func (h *dumpParseHandler) Data(db string, table string, values []string) error { - if h.c.isClosed() { - return errCanalClosed + if err := h.c.ctx.Err(); err != nil { + return err } tableInfo, err := h.c.GetTable(db, table)