From 8719e13c820411fa52a20b9920c29ca03073eabe Mon Sep 17 00:00:00 2001 From: WangXiangUSTC Date: Wed, 29 Mar 2017 14:50:18 +0800 Subject: [PATCH 01/18] Update river.go it looks that you have already support for multiple Primary key, but forget remove the code. --- river/river.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/river/river.go b/river/river.go index d9c66801..b91cbb53 100644 --- a/river/river.go +++ b/river/river.go @@ -221,12 +221,6 @@ func (r *River) prepareRule() error { if rule.TableInfo, err = r.canal.GetTable(rule.Schema, rule.Table); err != nil { return errors.Trace(err) } - - // table must have a PK for one column, multi columns may be supported later. - - if len(rule.TableInfo.PKColumns) != 1 { - return errors.Errorf("%s.%s must have a PK for a column", rule.Schema, rule.Table) - } } return nil From e5ff5dc96dabaa19ca36e02c57a4e08e912a5924 Mon Sep 17 00:00:00 2001 From: WangXiangUSTC Date: Fri, 31 Mar 2017 22:40:43 +0800 Subject: [PATCH 02/18] if have only one request, use bulk will waste resource of elasticsearch --- river/sync.go | 42 ++++++++++++++++++++++++++++++++---------- 1 file changed, 32 insertions(+), 10 deletions(-) diff --git a/river/sync.go b/river/sync.go index ef975f43..4551139f 100644 --- a/river/sync.go +++ b/river/sync.go @@ -325,19 +325,41 @@ func (r *River) getParentID(rule *Rule, row []interface{}, columnName string) (s } func (r *River) doBulk(reqs []*elastic.BulkRequest) error { + flag := true + var err error if len(reqs) == 0 { return nil } - - if resp, err := r.es.Bulk(reqs); err != nil { - log.Errorf("sync docs err %v after binlog %s", err, r.canal.SyncedPosition()) - return errors.Trace(err) - } else if resp.Errors { - for i := 0; i < len(resp.Items); i++ { - for action, item := range resp.Items[i] { - if len(item.Error) > 0 { - log.Errorf("%s index: %s, type: %s, id: %s, status: %d, error: %s", - action, item.Index, item.Type, item.ID, item.Status, item.Error) + if len(reqs) == 1{ + switch reqs[0].Action { + case "index": + err = r.es.Update(reqs[0].Index, reqs[0].Type, reqs[0].ID, reqs[0].Data) + case "delete": + err = r.es.Delete(reqs[0].Index, reqs[0].Type, reqs[0].ID) + case "update": + err = r.es.Update(reqs[0].Index, reqs[0].Type, reqs[0].ID, reqs[0].Data) + default: + flag = false + err = nil + } + if flag { + if err != nil { + log.Errorf("sync docs err %v after binlog %s", err, r.canal.SyncedPosition()) + return errors.Trace(err) + } + return nil + } + } else if len(reqs) > 1 || (!flag){ + if resp, err := r.es.Bulk(reqs); err != nil { + log.Errorf("sync docs err %v after binlog %s", err, r.canal.SyncedPosition()) + return errors.Trace(err) + } else if resp.Errors { + for i := 0; i < len(resp.Items); i++ { + for action, item := range resp.Items[i] { + if len(item.Error) > 0 { + log.Errorf("%s index: %s, type: %s, id: %s, status: %d, error: %s", + action, item.Index, item.Type, item.ID, item.Status, item.Error) + } } } } From 74e0c36076b4b8fa3bd8db078fa9a04f2186c3f5 Mon Sep 17 00:00:00 2001 From: WangXiangUSTC Date: Wed, 12 Apr 2017 17:52:08 +0800 Subject: [PATCH 03/18] Update river.go --- river/river.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/river/river.go b/river/river.go index cac1aa0e..a9db22d3 100644 --- a/river/river.go +++ b/river/river.go @@ -221,13 +221,11 @@ func (r *River) prepareRule() error { if rule.TableInfo, err = r.canal.GetTable(rule.Schema, rule.Table); err != nil { return errors.Trace(err) } -<<<<<<< HEAD -======= if len(rule.TableInfo.PKColumns) == 0 { return errors.Errorf("%s.%s must have a PK for a column", rule.Schema, rule.Table) } ->>>>>>> 4f21bc6ec1c64547ee05c698941ae8158b785760 + } return nil From 45c069ab9cbfae38f92f40bdf48e70c551923b2a Mon Sep 17 00:00:00 2001 From: WangXiangUSTC Date: Thu, 13 Apr 2017 20:42:44 +0800 Subject: [PATCH 04/18] Update README.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index ffacd7bc..7c43a6f3 100644 --- a/README.md +++ b/README.md @@ -23,7 +23,7 @@ It uses `mysqldump` to fetch the origin data at first, then syncs data increment + binlog format must be **row**. + binlog row image must be **full** for MySQL, you may lost some field data if you update PK data in MySQL with minimal or noblob binlog row image. MariaDB only supports full row image. + Can not alter table format at runtime. -+ MySQL table which will be synced must have a PK(primary key), multi columns PK is allowed now, e,g, if the PKs is (a, b), we will use "a:b" as the key. The PK data will be used as "id" in Elasticsearch. ++ MySQL table which will be synced should have a PK(primary key), multi columns PK is allowed now, e,g, if the PKs is (a, b), we will use "a:b" as the key. The PK data will be used as "id" in Elasticsearch. And you can also config the id's constituent part. + You should create the associated mappings in Elasticsearch first, I don't think using the default mapping is a wise decision, you must know how to search accurately. + `mysqldump` must exist in the same node with go-mysql-elasticsearch, if not, go-mysql-elasticsearch will try to sync binlog only. + Don't change too many rows at same time in one SQL. From 02ec489e96424a6e6753265d8d95643076feebae Mon Sep 17 00:00:00 2001 From: WangXiangUSTC Date: Thu, 13 Apr 2017 20:44:02 +0800 Subject: [PATCH 05/18] Update README.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 7c43a6f3..78dbffe2 100644 --- a/README.md +++ b/README.md @@ -23,7 +23,7 @@ It uses `mysqldump` to fetch the origin data at first, then syncs data increment + binlog format must be **row**. + binlog row image must be **full** for MySQL, you may lost some field data if you update PK data in MySQL with minimal or noblob binlog row image. MariaDB only supports full row image. + Can not alter table format at runtime. -+ MySQL table which will be synced should have a PK(primary key), multi columns PK is allowed now, e,g, if the PKs is (a, b), we will use "a:b" as the key. The PK data will be used as "id" in Elasticsearch. And you can also config the id's constituent part. ++ MySQL table which will be synced should have a PK(primary key), multi columns PK is allowed now, e,g, if the PKs is (a, b), we will use "a:b" as the key. The PK data will be used as "id" in Elasticsearch. And you can also config the id's constituent part with other column. + You should create the associated mappings in Elasticsearch first, I don't think using the default mapping is a wise decision, you must know how to search accurately. + `mysqldump` must exist in the same node with go-mysql-elasticsearch, if not, go-mysql-elasticsearch will try to sync binlog only. + Don't change too many rows at same time in one SQL. From 6c32bdc81532bc4c00d840d17b19f5c0d02cae05 Mon Sep 17 00:00:00 2001 From: WangXiangUSTC Date: Thu, 13 Apr 2017 23:35:34 +0800 Subject: [PATCH 06/18] update commit version of go-mysql --- glide.lock | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/glide.lock b/glide.lock index 9c2cfbc1..5328ed51 100644 --- a/glide.lock +++ b/glide.lock @@ -16,7 +16,7 @@ imports: - ioutil2 - sync2 - name: github.com/siddontang/go-mysql - version: 3ca161ffa3b5844340e534d4b3cdc03b60f32610 + version: dd8c4fc06b092a5563dcd68431c857dd21a36d0a subpackages: - canal - client From 4b0ba49529c2745e76a180e096d19d73bd32209b Mon Sep 17 00:00:00 2001 From: WangXiangUSTC Date: Sat, 29 Apr 2017 01:01:27 +0800 Subject: [PATCH 07/18] Update glide.lock --- glide.lock | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/glide.lock b/glide.lock index 5328ed51..73252e14 100644 --- a/glide.lock +++ b/glide.lock @@ -15,8 +15,8 @@ imports: - hack - ioutil2 - sync2 -- name: github.com/siddontang/go-mysql - version: dd8c4fc06b092a5563dcd68431c857dd21a36d0a +- name: github.com/WangXiangUSTC/go-mysql + version: fddc32296e000e7d07ca11c49225f8ae4c03f915 subpackages: - canal - client From 4ef3830c0eb48523c86bdeeb158110816d27cd88 Mon Sep 17 00:00:00 2001 From: WangXiangUSTC <347249478@qq.com> Date: Sun, 30 Apr 2017 16:03:15 +0800 Subject: [PATCH 08/18] add set of charcter --- river/config.go | 1 + river/river.go | 1 + 2 files changed, 2 insertions(+) diff --git a/river/config.go b/river/config.go index c21687ca..c6f626f3 100644 --- a/river/config.go +++ b/river/config.go @@ -17,6 +17,7 @@ type Config struct { MyAddr string `toml:"my_addr"` MyUser string `toml:"my_user"` MyPassword string `toml:"my_pass"` + MyCharset string `toml:"my_charset"` ESAddr string `toml:"es_addr"` diff --git a/river/river.go b/river/river.go index 1e056002..fc349b63 100644 --- a/river/river.go +++ b/river/river.go @@ -79,6 +79,7 @@ func (r *River) newCanal() error { cfg.Addr = r.c.MyAddr cfg.User = r.c.MyUser cfg.Password = r.c.MyPassword + cfg.Charset = r.c.MyCharset cfg.Flavor = r.c.Flavor cfg.ServerID = r.c.ServerID From e1e147213a91ed51ebf3a7e75a8d3cad129c6c31 Mon Sep 17 00:00:00 2001 From: WangXiangUSTC Date: Fri, 5 May 2017 20:31:58 +0800 Subject: [PATCH 09/18] update sync.go --- river/sync.go | 44 +++++++++++--------------------------------- 1 file changed, 11 insertions(+), 33 deletions(-) diff --git a/river/sync.go b/river/sync.go index aea2fd07..4217f851 100644 --- a/river/sync.go +++ b/river/sync.go @@ -432,45 +432,23 @@ func (r *River) getParentID(rule *Rule, row []interface{}, columnName string) (s } func (r *River) doBulk(reqs []*elastic.BulkRequest) error { - flag := true - var err error if len(reqs) == 0 { return nil } - if len(reqs) == 1{ - switch reqs[0].Action { - case "index": - err = r.es.Update(reqs[0].Index, reqs[0].Type, reqs[0].ID, reqs[0].Data) - case "delete": - err = r.es.Delete(reqs[0].Index, reqs[0].Type, reqs[0].ID) - case "update": - err = r.es.Update(reqs[0].Index, reqs[0].Type, reqs[0].ID, reqs[0].Data) - default: - flag = false - err = nil - } - if flag { - if err != nil { - log.Errorf("sync docs err %v after binlog %s", err, r.canal.SyncedPosition()) - return errors.Trace(err) - } - return nil - } - } else if len(reqs) > 1 || (!flag){ - if resp, err := r.es.Bulk(reqs); err != nil { - log.Errorf("sync docs err %v after binlog %s", err, r.canal.SyncedPosition()) - return errors.Trace(err) - } else if resp.Errors { - for i := 0; i < len(resp.Items); i++ { - for action, item := range resp.Items[i] { - if len(item.Error) > 0 { - log.Errorf("%s index: %s, type: %s, id: %s, status: %d, error: %s", - action, item.Index, item.Type, item.ID, item.Status, item.Error) - } + + if resp, err := r.es.Bulk(reqs); err != nil { + log.Errorf("sync docs err %v after binlog %s", err, r.canal.SyncedPosition()) + return errors.Trace(err) + } else if resp.Errors { + for i := 0; i < len(resp.Items); i++ { + for action, item := range resp.Items[i] { + if len(item.Error) > 0 { + log.Errorf("%s index: %s, type: %s, id: %s, status: %d, error: %s", + action, item.Index, item.Type, item.ID, item.Status, item.Error) } } } } - return nil + return nil } From 56cb5d7e279534f708dbcc78b1f1f1ec8d4a270c Mon Sep 17 00:00:00 2001 From: WangXiangUSTC Date: Fri, 5 May 2017 20:53:32 +0800 Subject: [PATCH 10/18] update gp-mysql version --- glide.lock | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/glide.lock b/glide.lock index 3c6a5f9a..c1c7fe56 100644 --- a/glide.lock +++ b/glide.lock @@ -16,7 +16,7 @@ imports: - ioutil2 - sync2 - name: github.com/siddontang/go-mysql - version: 3fef0652795ca3a9d47e4af1a2f6d885e574e9bb + version: ead11cac47bd127a8c667efa07f171a9143d8a25 subpackages: - canal - client From e9659253b0295a334f91f0b2a69ddc1d66f439a6 Mon Sep 17 00:00:00 2001 From: WangXiangUSTC Date: Fri, 5 May 2017 20:54:25 +0800 Subject: [PATCH 11/18] update go-mysql version --- glide.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/glide.yaml b/glide.yaml index 888c5acc..e84692f4 100644 --- a/glide.yaml +++ b/glide.yaml @@ -7,7 +7,7 @@ import: - package: github.com/satori/go.uuid version: ^1.1.0 - package: github.com/siddontang/go-mysql - version: 3fef0652795ca3a9d47e4af1a2f6d885e574e9bb + version: ead11cac47bd127a8c667efa07f171a9143d8a25 subpackages: - canal - client From cdc4543150c0e123ef8cfc8816aa37cb15e62caf Mon Sep 17 00:00:00 2001 From: WangXiangUSTC Date: Fri, 5 May 2017 20:56:14 +0800 Subject: [PATCH 12/18] add config of mysql charset --- etc/river.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/etc/river.toml b/etc/river.toml index 9b02ac02..0b9d5d68 100644 --- a/etc/river.toml +++ b/etc/river.toml @@ -3,6 +3,7 @@ my_addr = "127.0.0.1:3306" my_user = "root" my_pass = "" +my_charset = "utf8" # Elasticsearch address es_addr = "127.0.0.1:9200" From 570317cbd00db0fd6e863516502b6ae30bd91962 Mon Sep 17 00:00:00 2001 From: WangXiangUSTC Date: Fri, 5 May 2017 20:59:08 +0800 Subject: [PATCH 13/18] add test of mysql charset --- river/river_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/river/river_test.go b/river/river_test.go index 38184da9..0cc7e28d 100644 --- a/river/river_test.go +++ b/river/river_test.go @@ -59,6 +59,7 @@ func (s *riverTestSuite) SetUpSuite(c *C) { cfg.MyAddr = *my_addr cfg.MyUser = "root" cfg.MyPassword = "" + cfg.MyCharset = "utf8" cfg.ESAddr = *es_addr cfg.ServerID = 1001 @@ -121,7 +122,7 @@ func (s *riverTestSuite) TestConfig(c *C) { my_addr = "127.0.0.1:3306" my_user = "root" my_pass = "" - +my_charset = "utf8" es_addr = "127.0.0.1:9200" data_dir = "./var" From 43d24a4419b3aabdcbd820bbbdd9d39cbd2e98d2 Mon Sep 17 00:00:00 2001 From: WangXiangUSTC <347249478@qq.com> Date: Sun, 7 May 2017 13:16:39 +0800 Subject: [PATCH 14/18] update go-mysql vendor --- .../github.com/siddontang/go-mysql/canal/canal.go | 4 ++++ .../siddontang/go-mysql/canal/config.go | 6 ++++-- .../github.com/siddontang/go-mysql/dump/dump.go | 13 ++++++++++++- .../go-mysql/replication/binlogsyncer.go | 15 ++++++++++++++- 4 files changed, 34 insertions(+), 4 deletions(-) diff --git a/vendor/github.com/siddontang/go-mysql/canal/canal.go b/vendor/github.com/siddontang/go-mysql/canal/canal.go index 4d668776..aa5cd65d 100644 --- a/vendor/github.com/siddontang/go-mysql/canal/canal.go +++ b/vendor/github.com/siddontang/go-mysql/canal/canal.go @@ -101,6 +101,9 @@ func (c *Canal) prepareDumper() error { c.dumper.AddTables(tableDB, tables...) } + charset := c.cfg.Charset + c.dumper.SetCharset(charset) + for _, ignoreTable := range c.cfg.Dump.IgnoreTables { if seps := strings.Split(ignoreTable, ","); len(seps) == 2 { c.dumper.AddIgnoreTables(seps[0], seps[1]) @@ -266,6 +269,7 @@ func (c *Canal) prepareSyncer() error { Port: uint16(port), User: c.cfg.User, Password: c.cfg.Password, + Charset : c.cfg.Charset, } c.syncer = replication.NewBinlogSyncer(&cfg) diff --git a/vendor/github.com/siddontang/go-mysql/canal/config.go b/vendor/github.com/siddontang/go-mysql/canal/config.go index 50606c8c..8f01d321 100644 --- a/vendor/github.com/siddontang/go-mysql/canal/config.go +++ b/vendor/github.com/siddontang/go-mysql/canal/config.go @@ -4,7 +4,7 @@ import ( "io/ioutil" "math/rand" "time" - + "github.com/siddontang/go-mysql/mysql" "github.com/BurntSushi/toml" "github.com/juju/errors" ) @@ -19,7 +19,7 @@ type DumpConfig struct { TableDB string `toml:"table_db"` Databases []string `toml:"dbs"` - + // Ignore table format is db.table IgnoreTables []string `toml:"ignore_tables"` @@ -32,6 +32,7 @@ type Config struct { User string `toml:"user"` Password string `toml:"password"` + Charset string `toml:"charset"` ServerID uint32 `toml:"server_id"` Flavor string `toml:"flavor"` @@ -65,6 +66,7 @@ func NewDefaultConfig() *Config { c.User = "root" c.Password = "" + c.Charset = mysql.DEFAULT_CHARSET rand.Seed(time.Now().Unix()) c.ServerID = uint32(rand.Intn(1000)) + 1001 diff --git a/vendor/github.com/siddontang/go-mysql/dump/dump.go b/vendor/github.com/siddontang/go-mysql/dump/dump.go index a6ff2091..da3e5b3b 100644 --- a/vendor/github.com/siddontang/go-mysql/dump/dump.go +++ b/vendor/github.com/siddontang/go-mysql/dump/dump.go @@ -6,7 +6,7 @@ import ( "os" "os/exec" "strings" - + . "github.com/siddontang/go-mysql/mysql" "github.com/juju/errors" ) @@ -25,6 +25,8 @@ type Dumper struct { Databases []string + Charset string + IgnoreTables map[string][]string ErrOut io.Writer @@ -47,6 +49,7 @@ func NewDumper(executionPath string, addr string, user string, password string) d.Password = password d.Tables = make([]string, 0, 16) d.Databases = make([]string, 0, 16) + d.Charset = DEFAULT_CHARSET d.IgnoreTables = make(map[string][]string) d.ErrOut = os.Stderr @@ -54,6 +57,10 @@ func NewDumper(executionPath string, addr string, user string, password string) return d, nil } +func (d *Dumper) SetCharset(charset string) { + d.Charset = charset +} + func (d *Dumper) SetErrOut(o io.Writer) { d.ErrOut = o } @@ -133,6 +140,10 @@ func (d *Dumper) Dump(w io.Writer) error { w.Write([]byte(fmt.Sprintf("USE `%s`;\n", d.TableDB))) } + if len(d.Charset) != 0 { + args = append(args, fmt.Sprintf("--default-character-set=%s", d.Charset)) + } + cmd := exec.Command(d.ExecutionPath, args...) cmd.Stderr = d.ErrOut diff --git a/vendor/github.com/siddontang/go-mysql/replication/binlogsyncer.go b/vendor/github.com/siddontang/go-mysql/replication/binlogsyncer.go index b85e7cd2..8f69814a 100644 --- a/vendor/github.com/siddontang/go-mysql/replication/binlogsyncer.go +++ b/vendor/github.com/siddontang/go-mysql/replication/binlogsyncer.go @@ -40,6 +40,9 @@ type BinlogSyncerConfig struct { // If not set, use os.Hostname() instead. Localhost string + // Charset is for MySQL client character set + Charset string + // SemiSyncEnabled enables semi-sync or not. SemiSyncEnabled bool @@ -52,6 +55,8 @@ type BinlogSyncerConfig struct { // Use replication.Time structure for timestamp and datetime. // We will use Local location for timestamp and UTC location for datatime. ParseTime bool + + LogLevel string } // BinlogSyncer syncs binlog event from server. @@ -76,6 +81,11 @@ type BinlogSyncer struct { // NewBinlogSyncer creates the BinlogSyncer with cfg. func NewBinlogSyncer(cfg *BinlogSyncerConfig) *BinlogSyncer { + if cfg.LogLevel == "" { + cfg.LogLevel = "info" + } + log.SetLevelByString(cfg.LogLevel) + log.Infof("create BinlogSyncer with config %v", cfg) b := new(BinlogSyncer) @@ -143,6 +153,9 @@ func (b *BinlogSyncer) registerSlave() error { if err != nil { return errors.Trace(err) } + if len(b.cfg.Charset) != 0 { + b.c.SetCharset(b.cfg.Charset) + } //for mysql 5.6+, binlog has a crc32 checksum //before mysql 5.6, this will not work, don't matter.:-) @@ -556,7 +569,7 @@ func (b *BinlogSyncer) parseEvent(s *BinlogStreamer, data []byte) error { data = data[2:] } - e, err := b.parser.parse(data) + e, err := b.parser.Parse(data) if err != nil { return errors.Trace(err) } From 4bdeb151d007568af3d7504ee9426998b4258148 Mon Sep 17 00:00:00 2001 From: WangXiangUSTC <347249478@qq.com> Date: Sun, 7 May 2017 13:28:38 +0800 Subject: [PATCH 15/18] update go-mysql vendor --- .../siddontang/go-mysql/replication/parser.go | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/vendor/github.com/siddontang/go-mysql/replication/parser.go b/vendor/github.com/siddontang/go-mysql/replication/parser.go index 6f727295..056a17b5 100644 --- a/vendor/github.com/siddontang/go-mysql/replication/parser.go +++ b/vendor/github.com/siddontang/go-mysql/replication/parser.go @@ -56,10 +56,10 @@ func (p *BinlogParser) ParseFile(name string, offset int64, onEvent OnEventFunc) return errors.Errorf("seek %s to %d error %v", name, offset, err) } - return p.parseReader(f, onEvent) + return p.ParseReader(f, onEvent) } -func (p *BinlogParser) parseReader(r io.Reader, onEvent OnEventFunc) error { +func (p *BinlogParser) ParseReader(r io.Reader, onEvent OnEventFunc) error { p.Reset() var err error @@ -212,7 +212,13 @@ func (p *BinlogParser) parseEvent(h *EventHeader, data []byte) (Event, error) { return e, nil } -func (p *BinlogParser) parse(data []byte) (*BinlogEvent, error) { +// Given the bytes for a a binary log event: return the decoded event. +// With the exception of the FORMAT_DESCRIPTION_EVENT event type +// there must have previously been passed a FORMAT_DESCRIPTION_EVENT +// into the parser for this to work properly on any given event. +// Passing a new FORMAT_DESCRIPTION_EVENT into the parser will replace +// an existing one. +func (p *BinlogParser) Parse(data []byte) (*BinlogEvent, error) { rawData := data h, err := p.parseHeader(data) From 945ddb25fe914a58ba847310f1fc7c1b6b75ef70 Mon Sep 17 00:00:00 2001 From: WangXiangUSTC Date: Sun, 7 May 2017 13:33:34 +0800 Subject: [PATCH 16/18] Update sync.go --- river/sync.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/river/sync.go b/river/sync.go index 4217f851..5f914d08 100644 --- a/river/sync.go +++ b/river/sync.go @@ -450,5 +450,5 @@ func (r *River) doBulk(reqs []*elastic.BulkRequest) error { } } - return nil + return nil } From 82de3f84309f00cb835b60a20bbfaac6a39e199d Mon Sep 17 00:00:00 2001 From: WangXiangUSTC <347249478@qq.com> Date: Tue, 13 Jun 2017 23:24:48 +0800 Subject: [PATCH 17/18] add assert of response code, and fix the bug of parent test --- elastic/client_test.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/elastic/client_test.go b/elastic/client_test.go index 74214317..dd4547a3 100644 --- a/elastic/client_test.go +++ b/elastic/client_test.go @@ -53,6 +53,7 @@ func (s *elasticTestSuite) TestSimple(c *C) { r, err := s.c.Get(index, docType, "1") c.Assert(err, IsNil) + c.Assert(r.Code, Equals, 200) c.Assert(r.ID, Equals, "1") err = s.c.Delete(index, docType, "1") @@ -78,6 +79,7 @@ func (s *elasticTestSuite) TestSimple(c *C) { resp, err := s.c.IndexTypeBulk(index, docType, items) c.Assert(err, IsNil) + c.Assert(resp.Code, Equals, 200) c.Assert(resp.Errors, Equals, false) for i := 0; i < 10; i++ { @@ -90,6 +92,7 @@ func (s *elasticTestSuite) TestSimple(c *C) { resp, err = s.c.IndexTypeBulk(index, docType, items) c.Assert(err, IsNil) + c.Assert(resp.Code, Equals, 200) c.Assert(resp.Errors, Equals, false) } @@ -112,16 +115,20 @@ func (s *elasticTestSuite) TestParent(c *C) { resp, err := s.c.IndexTypeBulk(index, docType, items) c.Assert(err, IsNil) + c.Assert(resp.Code, Equals, 200) c.Assert(resp.Errors, Equals, false) - for i := 0; i < 10; i++ { id := fmt.Sprintf("%d", i) req := new(BulkRequest) + req.Index = index + req.Type = docType req.Action = ActionDelete req.ID = id + req.Parent = "1" items[i] = req } - resp, err = s.c.IndexTypeBulk(index, docType, items) + resp, err = s.c.Bulk(items) c.Assert(err, IsNil) + c.Assert(resp.Code, Equals, 200) c.Assert(resp.Errors, Equals, false) } From c9f4ab14b655ca3b92bbe7527d26afe73ec541ee Mon Sep 17 00:00:00 2001 From: WangXiangUSTC Date: Wed, 14 Jun 2017 09:40:33 +0800 Subject: [PATCH 18/18] add create mapping --- elastic/client_test.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/elastic/client_test.go b/elastic/client_test.go index dd4547a3..00b72ee5 100644 --- a/elastic/client_test.go +++ b/elastic/client_test.go @@ -100,9 +100,18 @@ func (s *elasticTestSuite) TestSimple(c *C) { func (s *elasticTestSuite) TestParent(c *C) { index := "dummy" docType := "comment" + ParentType := "parent" + mapping := map[string]interface{}{ + docType: map[string]interface{}{ + "_parent": map[string]string{"type": ParentType}, + }, + } + err := s.c.CreateMapping(index, docType, mapping) + c.Assert(err, IsNil) + items := make([]*BulkRequest, 10) - + for i := 0; i < 10; i++ { id := fmt.Sprintf("%d", i) req := new(BulkRequest)