From 8719e13c820411fa52a20b9920c29ca03073eabe Mon Sep 17 00:00:00 2001 From: WangXiangUSTC Date: Wed, 29 Mar 2017 14:50:18 +0800 Subject: [PATCH 01/20] Update river.go it looks that you have already support for multiple Primary key, but forget remove the code. --- river/river.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/river/river.go b/river/river.go index d9c66801..b91cbb53 100644 --- a/river/river.go +++ b/river/river.go @@ -221,12 +221,6 @@ func (r *River) prepareRule() error { if rule.TableInfo, err = r.canal.GetTable(rule.Schema, rule.Table); err != nil { return errors.Trace(err) } - - // table must have a PK for one column, multi columns may be supported later. - - if len(rule.TableInfo.PKColumns) != 1 { - return errors.Errorf("%s.%s must have a PK for a column", rule.Schema, rule.Table) - } } return nil From e5ff5dc96dabaa19ca36e02c57a4e08e912a5924 Mon Sep 17 00:00:00 2001 From: WangXiangUSTC Date: Fri, 31 Mar 2017 22:40:43 +0800 Subject: [PATCH 02/20] if have only one request, use bulk will waste resource of elasticsearch --- river/sync.go | 42 ++++++++++++++++++++++++++++++++---------- 1 file changed, 32 insertions(+), 10 deletions(-) diff --git a/river/sync.go b/river/sync.go index ef975f43..4551139f 100644 --- a/river/sync.go +++ b/river/sync.go @@ -325,19 +325,41 @@ func (r *River) getParentID(rule *Rule, row []interface{}, columnName string) (s } func (r *River) doBulk(reqs []*elastic.BulkRequest) error { + flag := true + var err error if len(reqs) == 0 { return nil } - - if resp, err := r.es.Bulk(reqs); err != nil { - log.Errorf("sync docs err %v after binlog %s", err, r.canal.SyncedPosition()) - return errors.Trace(err) - } else if resp.Errors { - for i := 0; i < len(resp.Items); i++ { - for action, item := range resp.Items[i] { - if len(item.Error) > 0 { - log.Errorf("%s index: %s, type: %s, id: %s, status: %d, error: %s", - action, item.Index, item.Type, item.ID, item.Status, item.Error) + if len(reqs) == 1{ + switch reqs[0].Action { + case "index": + err = r.es.Update(reqs[0].Index, reqs[0].Type, reqs[0].ID, reqs[0].Data) + case "delete": + err = r.es.Delete(reqs[0].Index, reqs[0].Type, reqs[0].ID) + case "update": + err = r.es.Update(reqs[0].Index, reqs[0].Type, reqs[0].ID, reqs[0].Data) + default: + flag = false + err = nil + } + if flag { + if err != nil { + log.Errorf("sync docs err %v after binlog %s", err, r.canal.SyncedPosition()) + return errors.Trace(err) + } + return nil + } + } else if len(reqs) > 1 || (!flag){ + if resp, err := r.es.Bulk(reqs); err != nil { + log.Errorf("sync docs err %v after binlog %s", err, r.canal.SyncedPosition()) + return errors.Trace(err) + } else if resp.Errors { + for i := 0; i < len(resp.Items); i++ { + for action, item := range resp.Items[i] { + if len(item.Error) > 0 { + log.Errorf("%s index: %s, type: %s, id: %s, status: %d, error: %s", + action, item.Index, item.Type, item.ID, item.Status, item.Error) + } } } } From 04f3bfdcd2bb6361550ac3d5a2fa9b54a5f97b0d Mon Sep 17 00:00:00 2001 From: WangXiangUSTC Date: Wed, 12 Apr 2017 19:12:51 +0800 Subject: [PATCH 03/20] can config es id's constituent part --- etc/river.toml | 3 + river/river.go | 4 +- river/rule.go | 2 + river/sync.go | 83 +++++++++---------- .../siddontang/go-mysql/canal/rows.go | 10 +++ 5 files changed, 53 insertions(+), 49 deletions(-) diff --git a/etc/river.toml b/etc/river.toml index 52b316f9..44b047c8 100644 --- a/etc/river.toml +++ b/etc/river.toml @@ -58,6 +58,8 @@ schema = "test" table = "t" index = "test" type = "t" +# Default will use the mysql's 
primary key as es's id, if set id will use the id's column value as id +id = ["id", "tags"] # Wildcard table rule, the wildcard table must be in source tables # All tables which match the wildcard format will be synced to ES index `test` and type `t`. @@ -113,3 +115,4 @@ type = "tfilter" # Only sync following columns filter = ["id", "name"] + diff --git a/river/river.go b/river/river.go index 545501c3..d23e840b 100644 --- a/river/river.go +++ b/river/river.go @@ -212,6 +212,7 @@ func (r *River) prepareRule() error { rr.Index = rule.Index rr.Type = rule.Type rr.Parent = rule.Parent + rr.Id = rule.Id rr.FieldMapping = rule.FieldMapping } } else { @@ -229,13 +230,10 @@ func (r *River) prepareRule() error { if rule.TableInfo, err = r.canal.GetTable(rule.Schema, rule.Table); err != nil { return errors.Trace(err) } -<<<<<<< HEAD -======= if len(rule.TableInfo.PKColumns) == 0 { return errors.Errorf("%s.%s must have a PK for a column", rule.Schema, rule.Table) } ->>>>>>> 4f21bc6ec1c64547ee05c698941ae8158b785760 } return nil diff --git a/river/rule.go b/river/rule.go index f8500126..3e7aaf6d 100644 --- a/river/rule.go +++ b/river/rule.go @@ -13,6 +13,7 @@ type Rule struct { Index string `toml:"index"` Type string `toml:"type"` Parent string `toml:"parent"` + Id []string `toml:"id"` // Default, a MySQL table field name is mapped to Elasticsearch field name. // Sometimes, you want to use different name, e.g, the MySQL file name is title, @@ -33,6 +34,7 @@ func newDefaultRule(schema string, table string) *Rule { r.Table = table r.Index = table r.Type = table + r.Id = id r.FieldMapping = make(map[string]string) return r diff --git a/river/sync.go b/river/sync.go index 2b47b170..782c1f6e 100644 --- a/river/sync.go +++ b/river/sync.go @@ -384,27 +384,40 @@ func (r *River) makeUpdateReqData(req *elastic.BulkRequest, rule *Rule, } } -// Get primary keys in one row and format them into a string -// PK must not be nil +// If Id in toml file is none, get primary keys in one row and format them into a string, and PK must not be nil +// Else get the Id's column in one row and format them into a string func (r *River) getDocID(rule *Rule, row []interface{}) (string, error) { - pks, err := canal.GetPKValues(rule.TableInfo, row) - if err != nil { - return "", err + var id []interface{} + var err error + if rule.Id == nil { + id, err = canal.GetPKValues(rule.TableInfo, row) + if err != nil { + return "", err + } + } else { + id = make([]interface{}, 0, len(rule.Id)) + for _, column := range rule.Id { + value, err := canal.GetFieldValue(rule.TableInfo, column, row) + if err != nil { + return "", err + } + id = append(id, value) + } } var buf bytes.Buffer - sep := "" - for i, value := range pks { - if value == nil { - return "", errors.Errorf("The %ds PK value is nil", i) - } + sep := "" + for i, value := range id { + if value == nil { + return "", errors.Errorf("The %ds Id or PK value is nil", i) + } - buf.WriteString(fmt.Sprintf("%s%v", sep, value)) - sep = ":" - } + buf.WriteString(fmt.Sprintf("%s%v", sep, value)) + sep = ":" + } - return buf.String(), nil + return buf.String(), nil } func (r *River) getParentID(rule *Rule, row []interface{}, columnName string) (string, error) { @@ -417,41 +430,19 @@ func (r *River) getParentID(rule *Rule, row []interface{}, columnName string) (s } func (r *River) doBulk(reqs []*elastic.BulkRequest) error { - flag := true - var err error if len(reqs) == 0 { return nil } - if len(reqs) == 1{ - switch reqs[0].Action { - case "index": - err = r.es.Update(reqs[0].Index, reqs[0].Type, 
reqs[0].ID, reqs[0].Data) - case "delete": - err = r.es.Delete(reqs[0].Index, reqs[0].Type, reqs[0].ID) - case "update": - err = r.es.Update(reqs[0].Index, reqs[0].Type, reqs[0].ID, reqs[0].Data) - default: - flag = false - err = nil - } - if flag { - if err != nil { - log.Errorf("sync docs err %v after binlog %s", err, r.canal.SyncedPosition()) - return errors.Trace(err) - } - return nil - } - } else if len(reqs) > 1 || (!flag){ - if resp, err := r.es.Bulk(reqs); err != nil { - log.Errorf("sync docs err %v after binlog %s", err, r.canal.SyncedPosition()) - return errors.Trace(err) - } else if resp.Errors { - for i := 0; i < len(resp.Items); i++ { - for action, item := range resp.Items[i] { - if len(item.Error) > 0 { - log.Errorf("%s index: %s, type: %s, id: %s, status: %d, error: %s", - action, item.Index, item.Type, item.ID, item.Status, item.Error) - } + + if resp, err := r.es.Bulk(reqs); err != nil { + log.Errorf("sync docs err %v after binlog %s", err, r.canal.SyncedPosition()) + return errors.Trace(err) + } else if resp.Errors { + for i := 0; i < len(resp.Items); i++ { + for action, item := range resp.Items[i] { + if len(item.Error) > 0 { + log.Errorf("%s index: %s, type: %s, id: %s, status: %d, error: %s", + action, item.Index, item.Type, item.ID, item.Status, item.Error) } } } diff --git a/vendor/github.com/siddontang/go-mysql/canal/rows.go b/vendor/github.com/siddontang/go-mysql/canal/rows.go index 5c5e467f..2e0afa81 100644 --- a/vendor/github.com/siddontang/go-mysql/canal/rows.go +++ b/vendor/github.com/siddontang/go-mysql/canal/rows.go @@ -53,6 +53,16 @@ func GetPKValues(table *schema.Table, row []interface{}) ([]interface{}, error) return values, nil } +// Get term column's value +func GetColumnValue(table *schema.Table, column string, row []interface{}) (interface{}, error) { + index := table.FindColumn(column) + if index == -1 { + return nil, errors.Errorf("table %s has no column name %s", table, column) + } + + return row[index], nil +} + // String implements fmt.Stringer interface. 
func (r *RowsEvent) String() string { return fmt.Sprintf("%s %s %v", r.Action, r.Table, r.Rows) From b5e4d00a89b2b399966dd9b822ada9c3561aab3e Mon Sep 17 00:00:00 2001 From: WangXiangUSTC Date: Wed, 12 Apr 2017 19:24:52 +0800 Subject: [PATCH 04/20] can config es id's constituent part --- river/rule.go | 1 - river/sync.go | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/river/rule.go b/river/rule.go index 3e7aaf6d..3720405e 100644 --- a/river/rule.go +++ b/river/rule.go @@ -34,7 +34,6 @@ func newDefaultRule(schema string, table string) *Rule { r.Table = table r.Index = table r.Type = table - r.Id = id r.FieldMapping = make(map[string]string) return r diff --git a/river/sync.go b/river/sync.go index 782c1f6e..e625603b 100644 --- a/river/sync.go +++ b/river/sync.go @@ -397,7 +397,7 @@ func (r *River) getDocID(rule *Rule, row []interface{}) (string, error) { } else { id = make([]interface{}, 0, len(rule.Id)) for _, column := range rule.Id { - value, err := canal.GetFieldValue(rule.TableInfo, column, row) + value, err := canal.GetColumnValue(rule.TableInfo, column, row) if err != nil { return "", err } From 81f71b48886d289d6e860cd8100582f8695296d6 Mon Sep 17 00:00:00 2001 From: WangXiangUSTC Date: Wed, 12 Apr 2017 19:29:47 +0800 Subject: [PATCH 05/20] can config es id's constituent part --- river/sync.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/river/sync.go b/river/sync.go index e625603b..316222ab 100644 --- a/river/sync.go +++ b/river/sync.go @@ -407,17 +407,17 @@ func (r *River) getDocID(rule *Rule, row []interface{}) (string, error) { var buf bytes.Buffer - sep := "" - for i, value := range id { - if value == nil { - return "", errors.Errorf("The %ds Id or PK value is nil", i) - } + sep := "" + for i, value := range id { + if value == nil { + return "", errors.Errorf("The %ds Id or PK value is nil", i) + } - buf.WriteString(fmt.Sprintf("%s%v", sep, value)) - sep = ":" - } + buf.WriteString(fmt.Sprintf("%s%v", sep, value)) + sep = ":" + } - return buf.String(), nil + return buf.String(), nil } func (r *River) getParentID(rule *Rule, row []interface{}, columnName string) (string, error) { From b40c48afe1813217234a0b19575684355d422bfb Mon Sep 17 00:00:00 2001 From: WangXiangUSTC Date: Wed, 12 Apr 2017 20:16:35 +0800 Subject: [PATCH 06/20] change id to ids --- river/sync.go | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/river/sync.go b/river/sync.go index 316222ab..976d0659 100644 --- a/river/sync.go +++ b/river/sync.go @@ -387,28 +387,30 @@ func (r *River) makeUpdateReqData(req *elastic.BulkRequest, rule *Rule, // If Id in toml file is none, get primary keys in one row and format them into a string, and PK must not be nil // Else get the Id's column in one row and format them into a string func (r *River) getDocID(rule *Rule, row []interface{}) (string, error) { - var id []interface{} - var err error + var ( + ids []interface{} + err error + ) if rule.Id == nil { - id, err = canal.GetPKValues(rule.TableInfo, row) + ids, err = canal.GetPKValues(rule.TableInfo, row) if err != nil { return "", err } } else { - id = make([]interface{}, 0, len(rule.Id)) + ids = make([]interface{}, 0, len(rule.Id)) for _, column := range rule.Id { value, err := canal.GetColumnValue(rule.TableInfo, column, row) if err != nil { return "", err } - id = append(id, value) + ids = append(ids, value) } } var buf bytes.Buffer sep := "" - for i, value := range id { + for i, value := range ids { if value == nil { return "", 
errors.Errorf("The %ds Id or PK value is nil", i) } From 58ecf1741387bf793e9c58e978c25be45ecd8239 Mon Sep 17 00:00:00 2001 From: WangXiangUSTC Date: Wed, 12 Apr 2017 20:18:32 +0800 Subject: [PATCH 07/20] change Id to ID --- river/rule.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/river/rule.go b/river/rule.go index 3720405e..a5ffc85b 100644 --- a/river/rule.go +++ b/river/rule.go @@ -13,7 +13,7 @@ type Rule struct { Index string `toml:"index"` Type string `toml:"type"` Parent string `toml:"parent"` - Id []string `toml:"id"` + ID []string `toml:"id"` // Default, a MySQL table field name is mapped to Elasticsearch field name. // Sometimes, you want to use different name, e.g, the MySQL file name is title, From a76d30fb43c21f7d8545690078da69afe3d5382f Mon Sep 17 00:00:00 2001 From: WangXiangUSTC Date: Wed, 12 Apr 2017 20:20:07 +0800 Subject: [PATCH 08/20] change Id to ID --- river/river.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/river/river.go b/river/river.go index d23e840b..1e056002 100644 --- a/river/river.go +++ b/river/river.go @@ -212,7 +212,7 @@ func (r *River) prepareRule() error { rr.Index = rule.Index rr.Type = rule.Type rr.Parent = rule.Parent - rr.Id = rule.Id + rr.ID = rule.ID rr.FieldMapping = rule.FieldMapping } } else { From 0130a1719bb738d9e712b45838abd39263d34be7 Mon Sep 17 00:00:00 2001 From: WangXiangUSTC Date: Wed, 12 Apr 2017 20:21:27 +0800 Subject: [PATCH 09/20] change Id to ID --- river/sync.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/river/sync.go b/river/sync.go index 976d0659..bed7faad 100644 --- a/river/sync.go +++ b/river/sync.go @@ -391,7 +391,7 @@ func (r *River) getDocID(rule *Rule, row []interface{}) (string, error) { ids []interface{} err error ) - if rule.Id == nil { + if rule.ID == nil { ids, err = canal.GetPKValues(rule.TableInfo, row) if err != nil { return "", err From 2c0bab907196f23400e638537e47fbbb591b0714 Mon Sep 17 00:00:00 2001 From: WangXiangUSTC Date: Wed, 12 Apr 2017 20:23:32 +0800 Subject: [PATCH 10/20] change Id to ID --- river/sync.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/river/sync.go b/river/sync.go index bed7faad..5f914d08 100644 --- a/river/sync.go +++ b/river/sync.go @@ -384,8 +384,8 @@ func (r *River) makeUpdateReqData(req *elastic.BulkRequest, rule *Rule, } } -// If Id in toml file is none, get primary keys in one row and format them into a string, and PK must not be nil -// Else get the Id's column in one row and format them into a string +// If id in toml file is none, get primary keys in one row and format them into a string, and PK must not be nil +// Else get the ID's column in one row and format them into a string func (r *River) getDocID(rule *Rule, row []interface{}) (string, error) { var ( ids []interface{} @@ -397,8 +397,8 @@ func (r *River) getDocID(rule *Rule, row []interface{}) (string, error) { return "", err } } else { - ids = make([]interface{}, 0, len(rule.Id)) - for _, column := range rule.Id { + ids = make([]interface{}, 0, len(rule.ID)) + for _, column := range rule.ID { value, err := canal.GetColumnValue(rule.TableInfo, column, row) if err != nil { return "", err @@ -412,7 +412,7 @@ func (r *River) getDocID(rule *Rule, row []interface{}) (string, error) { sep := "" for i, value := range ids { if value == nil { - return "", errors.Errorf("The %ds Id or PK value is nil", i) + return "", errors.Errorf("The %ds id or PK value is nil", i) } buf.WriteString(fmt.Sprintf("%s%v", sep, value)) From 
2f0e10fcaa53d62c5df3d2c673885d2f4f546e98 Mon Sep 17 00:00:00 2001 From: WangXiangUSTC Date: Wed, 12 Apr 2017 20:45:41 +0800 Subject: [PATCH 11/20] add test of id --- river/river_test.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/river/river_test.go b/river/river_test.go index f66845cc..786a7d77 100644 --- a/river/river_test.go +++ b/river/river_test.go @@ -81,6 +81,14 @@ func (s *riverTestSuite) SetUpSuite(c *C) { Type: "river", FieldMapping: map[string]string{"title": "es_title", "mylist": "es_mylist,list"}, }, + + &Rule{Schema: "test", + Table: "test_river", + Index: "river", + Type: "river", + ID: []string{"id", "title"}, + FieldMapping: map[string]string{"title": "es_title", "mylist": "es_mylist,list"}, + }, &Rule{Schema: "test", Table: "test_river_[0-9]{4}", @@ -128,6 +136,7 @@ table = "test_river" index = "river" type = "river" parent = "pid" +id = ["id", "title"] [rule.field] title = "es_title" From 3d1d5c7b70c285b5f73cebfbee4a8224ba98bee4 Mon Sep 17 00:00:00 2001 From: WangXiangUSTC Date: Wed, 12 Apr 2017 20:55:29 +0800 Subject: [PATCH 12/20] add test of id --- river/river_test.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/river/river_test.go b/river/river_test.go index 786a7d77..e4589879 100644 --- a/river/river_test.go +++ b/river/river_test.go @@ -130,6 +130,17 @@ schema = "test" tables = ["test_river", "test_river_[0-9]{4}"] +[[rule]] +schema = "test" +table = "test_river" +index = "river" +type = "river" +parent = "pid" + + [rule.field] + title = "es_title" + mylist = "es_mylist,list" + [[rule]] schema = "test" table = "test_river" @@ -217,6 +228,9 @@ func (s *riverTestSuite) TestRiver(c *C) { c.Assert(r.Found, Equals, true) c.Assert(r.Source["tenum"], Equals, "e1") c.Assert(r.Source["tset"], Equals, "a,b") + + r = s.testElasticGet(c, "1:first") + c.Assert(r.Found, Equals, true) r = s.testElasticGet(c, "100") c.Assert(r.Found, Equals, false) From abdf23917308a165c01bc28f75b0ded35ac4bfd7 Mon Sep 17 00:00:00 2001 From: WangXiangUSTC Date: Wed, 12 Apr 2017 21:01:22 +0800 Subject: [PATCH 13/20] add test of id --- river/river_test.go | 23 +++-------------------- 1 file changed, 3 insertions(+), 20 deletions(-) diff --git a/river/river_test.go b/river/river_test.go index e4589879..85cf7aca 100644 --- a/river/river_test.go +++ b/river/river_test.go @@ -75,13 +75,6 @@ func (s *riverTestSuite) SetUpSuite(c *C) { cfg.Sources = []SourceConfig{SourceConfig{Schema: "test", Tables: []string{"test_river", "test_river_[0-9]{4}"}}} cfg.Rules = []*Rule{ - &Rule{Schema: "test", - Table: "test_river", - Index: "river", - Type: "river", - FieldMapping: map[string]string{"title": "es_title", "mylist": "es_mylist,list"}, - }, - &Rule{Schema: "test", Table: "test_river", Index: "river", @@ -130,16 +123,6 @@ schema = "test" tables = ["test_river", "test_river_[0-9]{4}"] -[[rule]] -schema = "test" -table = "test_river" -index = "river" -type = "river" -parent = "pid" - - [rule.field] - title = "es_title" - mylist = "es_mylist,list" [[rule]] schema = "test" @@ -224,13 +207,13 @@ func (s *riverTestSuite) TestRiver(c *C) { testWaitSyncDone(c, s.r) var r *elastic.Response - r = s.testElasticGet(c, "1") + r = s.testElasticGet(c, "1:first") c.Assert(r.Found, Equals, true) c.Assert(r.Source["tenum"], Equals, "e1") c.Assert(r.Source["tset"], Equals, "a,b") - r = s.testElasticGet(c, "1:first") - c.Assert(r.Found, Equals, true) + r = s.testElasticGet(c, "1") + c.Assert(r.Found, Equals, false) r = s.testElasticGet(c, "100") c.Assert(r.Found, Equals, false) From 
c537142005ae481dce01d47627eae27a3bd10242 Mon Sep 17 00:00:00 2001 From: WangXiangUSTC Date: Wed, 12 Apr 2017 21:13:29 +0800 Subject: [PATCH 14/20] modify test about id --- river/river_test.go | 49 ++++++++++++++++++++++++++++++++------------- 1 file changed, 35 insertions(+), 14 deletions(-) diff --git a/river/river_test.go b/river/river_test.go index 85cf7aca..0311baea 100644 --- a/river/river_test.go +++ b/river/river_test.go @@ -79,6 +79,13 @@ func (s *riverTestSuite) SetUpSuite(c *C) { Table: "test_river", Index: "river", Type: "river", + FieldMapping: map[string]string{"title": "es_title", "mylist": "es_mylist,list"}, + }, + + &Rule{Schema: "test", + Table: "test_river", + Index: "river", + Type: "river_id", ID: []string{"id", "title"}, FieldMapping: map[string]string{"title": "es_title", "mylist": "es_mylist,list"}, }, @@ -123,12 +130,22 @@ schema = "test" tables = ["test_river", "test_river_[0-9]{4}"] - [[rule]] schema = "test" table = "test_river" index = "river" type = "river" +parent = "pid" + + [rule.field] + title = "es_title" + mylist = "es_mylist,list" + +[[rule]] +schema = "test" +table = "test_river" +index = "river" +type = "river_id" parent = "pid" id = ["id", "title"] @@ -172,9 +189,8 @@ func (s *riverTestSuite) testPrepareData(c *C) { } } -func (s *riverTestSuite) testElasticGet(c *C, id string) *elastic.Response { +func (s *riverTestSuite) testElasticGet(c *C, docType string, id string) *elastic.Response { index := "river" - docType := "river" r, err := s.r.es.Get(index, docType, id) c.Assert(err, IsNil) @@ -207,19 +223,24 @@ func (s *riverTestSuite) TestRiver(c *C) { testWaitSyncDone(c, s.r) var r *elastic.Response - r = s.testElasticGet(c, "1:first") + r = s.testElasticGet(c, "river", "1") c.Assert(r.Found, Equals, true) c.Assert(r.Source["tenum"], Equals, "e1") c.Assert(r.Source["tset"], Equals, "a,b") - r = s.testElasticGet(c, "1") + r = s.testElasticGet(c, "river_id", "1:first") + c.Assert(r.Found, Equals, true) + c.Assert(r.Source["tenum"], Equals, "e1") + c.Assert(r.Source["tset"], Equals, "a,b") + + r = s.testElasticGet(c, "river_id", "1:second") c.Assert(r.Found, Equals, false) - - r = s.testElasticGet(c, "100") + + r = s.testElasticGet(c, "river", "100") c.Assert(r.Found, Equals, false) for i := 0; i < 10; i++ { - r = s.testElasticGet(c, fmt.Sprintf("%d", 5+i)) + r = s.testElasticGet(c, "river", fmt.Sprintf("%d", 5+i)) c.Assert(r.Found, Equals, true) c.Assert(r.Source["es_title"], Equals, "abc") } @@ -241,10 +262,10 @@ func (s *riverTestSuite) TestRiver(c *C) { testWaitSyncDone(c, s.r) - r = s.testElasticGet(c, "1") + r = s.testElasticGet(c, "river", "1") c.Assert(r.Found, Equals, false) - r = s.testElasticGet(c, "2") + r = s.testElasticGet(c, "river", "2") c.Assert(r.Found, Equals, true) c.Assert(r.Source["es_title"], Equals, "second 2") c.Assert(r.Source["tenum"], Equals, "e3") @@ -252,21 +273,21 @@ func (s *riverTestSuite) TestRiver(c *C) { c.Assert(r.Source["es_mylist"], DeepEquals, []interface{}{"a", "b", "c"}) c.Assert(r.Source["tbit"], Equals, float64(1)) - r = s.testElasticGet(c, "4") + r = s.testElasticGet(c, "river", "4") c.Assert(r.Found, Equals, true) c.Assert(r.Source["tenum"], Equals, "") c.Assert(r.Source["tset"], Equals, "a,b,c") c.Assert(r.Source["tbit"], Equals, float64(0)) - r = s.testElasticGet(c, "3") + r = s.testElasticGet(c, "river", "3") c.Assert(r.Found, Equals, false) - r = s.testElasticGet(c, "30") + r = s.testElasticGet(c, "river", "30") c.Assert(r.Found, Equals, true) c.Assert(r.Source["es_title"], Equals, "second 30") for i := 0; 
i < 10; i++ { - r = s.testElasticGet(c, fmt.Sprintf("%d", 5+i)) + r = s.testElasticGet(c, "river", fmt.Sprintf("%d", 5+i)) c.Assert(r.Found, Equals, true) c.Assert(r.Source["es_title"], Equals, "hello") } From 4058633052530a5cafdcff9ffe3f5387fb4faad6 Mon Sep 17 00:00:00 2001 From: WangXiangUSTC Date: Thu, 13 Apr 2017 20:35:39 +0800 Subject: [PATCH 15/20] modify test about id --- river/river_test.go | 43 +++++++++++++++++++++---------------------- 1 file changed, 21 insertions(+), 22 deletions(-) diff --git a/river/river_test.go b/river/river_test.go index 0311baea..95692a22 100644 --- a/river/river_test.go +++ b/river/river_test.go @@ -47,6 +47,7 @@ func (s *riverTestSuite) SetUpSuite(c *C) { s.testExecute(c, "DROP TABLE IF EXISTS test_river") s.testExecute(c, fmt.Sprintf(schema, "test_river")) + s.testExecute(c, fmt.Sprintf(schema, "test_for_id")) for i := 0; i < 10; i++ { table := fmt.Sprintf("test_river_%04d", i) @@ -83,9 +84,9 @@ func (s *riverTestSuite) SetUpSuite(c *C) { }, &Rule{Schema: "test", - Table: "test_river", + Table: "test_for_id", Index: "river", - Type: "river_id", + Type: "river", ID: []string{"id", "title"}, FieldMapping: map[string]string{"title": "es_title", "mylist": "es_mylist,list"}, }, @@ -141,18 +142,19 @@ parent = "pid" title = "es_title" mylist = "es_mylist,list" + [[rule]] schema = "test" -table = "test_river" +table = "test_for_id" index = "river" -type = "river_id" +type = "river" parent = "pid" id = ["id", "title"] - [rule.field] title = "es_title" mylist = "es_mylist,list" + [[rule]] schema = "test" table = "test_river_[0-9]{4}" @@ -182,15 +184,17 @@ func (s *riverTestSuite) testPrepareData(c *C) { s.testExecute(c, "INSERT INTO test_river (id, title, content, tenum, tset) VALUES (?, ?, ?, ?, ?)", 2, "second", "hello mysql 2", "e2", "b,c") s.testExecute(c, "INSERT INTO test_river (id, title, content, tenum, tset) VALUES (?, ?, ?, ?, ?)", 3, "third", "hello elaticsearch 3", "e3", "c") s.testExecute(c, "INSERT INTO test_river (id, title, content, tenum, tset, tbit) VALUES (?, ?, ?, ?, ?, ?)", 4, "fouth", "hello go-mysql-elasticserach 4", "e1", "a,b,c", 0) - + s.testExecute(c, "INSERT INTO test_for_id (id, title, content, tenum, tset) VALUES (?, ?, ?, ?, ?)", 1, "first", "hello go 1", "e1", "a,b") + for i := 0; i < 10; i++ { table := fmt.Sprintf("test_river_%04d", i) s.testExecute(c, fmt.Sprintf("INSERT INTO %s (id, title, content, tenum, tset) VALUES (?, ?, ?, ?, ?)", table), 5+i, "abc", "hello", "e1", "a,b,c") } } -func (s *riverTestSuite) testElasticGet(c *C, docType string, id string) *elastic.Response { +func (s *riverTestSuite) testElasticGet(c *C, id string) *elastic.Response { index := "river" + docType := "river" r, err := s.r.es.Get(index, docType, id) c.Assert(err, IsNil) @@ -223,24 +227,19 @@ func (s *riverTestSuite) TestRiver(c *C) { testWaitSyncDone(c, s.r) var r *elastic.Response - r = s.testElasticGet(c, "river", "1") + r = s.testElasticGet(c, "1") c.Assert(r.Found, Equals, true) c.Assert(r.Source["tenum"], Equals, "e1") c.Assert(r.Source["tset"], Equals, "a,b") - r = s.testElasticGet(c, "river_id", "1:first") + r = s.testElasticGet(c, "1:first") c.Assert(r.Found, Equals, true) - c.Assert(r.Source["tenum"], Equals, "e1") - c.Assert(r.Source["tset"], Equals, "a,b") - - r = s.testElasticGet(c, "river_id", "1:second") - c.Assert(r.Found, Equals, false) - r = s.testElasticGet(c, "river", "100") + r = s.testElasticGet(c, "100") c.Assert(r.Found, Equals, false) for i := 0; i < 10; i++ { - r = s.testElasticGet(c, "river", fmt.Sprintf("%d", 5+i)) + r = 
s.testElasticGet(c, fmt.Sprintf("%d", 5+i))
 		c.Assert(r.Found, Equals, true)
 		c.Assert(r.Source["es_title"], Equals, "abc")
 	}
@@ -262,10 +261,10 @@ func (s *riverTestSuite) TestRiver(c *C) {
 
 	testWaitSyncDone(c, s.r)
 
-	r = s.testElasticGet(c, "river", "1")
+	r = s.testElasticGet(c, "1")
 	c.Assert(r.Found, Equals, false)
 
-	r = s.testElasticGet(c, "river", "2")
+	r = s.testElasticGet(c, "2")
 	c.Assert(r.Found, Equals, true)
 	c.Assert(r.Source["es_title"], Equals, "second 2")
 	c.Assert(r.Source["tenum"], Equals, "e3")
 	c.Assert(r.Source["tset"], Equals, "b,c")
 	c.Assert(r.Source["es_mylist"], DeepEquals, []interface{}{"a", "b", "c"})
 	c.Assert(r.Source["tbit"], Equals, float64(1))
 
-	r = s.testElasticGet(c, "river", "4")
+	r = s.testElasticGet(c, "4")
 	c.Assert(r.Found, Equals, true)
 	c.Assert(r.Source["tenum"], Equals, "")
 	c.Assert(r.Source["tset"], Equals, "a,b,c")
 	c.Assert(r.Source["tbit"], Equals, float64(0))
 
-	r = s.testElasticGet(c, "river", "3")
+	r = s.testElasticGet(c, "3")
 	c.Assert(r.Found, Equals, false)
 
-	r = s.testElasticGet(c, "river", "30")
+	r = s.testElasticGet(c, "30")
 	c.Assert(r.Found, Equals, true)
 	c.Assert(r.Source["es_title"], Equals, "second 30")
 
 	for i := 0; i < 10; i++ {
-		r = s.testElasticGet(c, "river", fmt.Sprintf("%d", 5+i))
+		r = s.testElasticGet(c, fmt.Sprintf("%d", 5+i))
 		c.Assert(r.Found, Equals, true)
 		c.Assert(r.Source["es_title"], Equals, "hello")
 	}
From ecf40e330e5a6cbaae0665bbd1ae27b8adfe3a7a Mon Sep 17 00:00:00 2001
From: WangXiangUSTC
Date: Thu, 13 Apr 2017 20:46:15 +0800
Subject: [PATCH 16/20] Update README.md

---
 README.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/README.md b/README.md
index ffacd7bc..78dbffe2 100644
--- a/README.md
+++ b/README.md
@@ -23,7 +23,7 @@ It uses `mysqldump` to fetch the origin data at first, then syncs data increment
 + binlog format must be **row**.
 + binlog row image must be **full** for MySQL, you may lost some field data if you update PK data in MySQL with minimal or noblob binlog row image. MariaDB only supports full row image.
 + Can not alter table format at runtime.
-+ MySQL table which will be synced must have a PK(primary key), multi columns PK is allowed now, e,g, if the PKs is (a, b), we will use "a:b" as the key. The PK data will be used as "id" in Elasticsearch.
++ MySQL table which will be synced should have a PK (primary key); a multi-column PK is allowed now, e.g., if the PK is (a, b), we will use "a:b" as the key. The PK data will be used as the "id" in Elasticsearch. You can also configure which other columns make up the id.
 + You should create the associated mappings in Elasticsearch first, I don't think using the default mapping is a wise decision, you must know how to search accurately.
 + `mysqldump` must exist in the same node with go-mysql-elasticsearch, if not, go-mysql-elasticsearch will try to sync binlog only.
 + Don't change too many rows at same time in one SQL.
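The id format described in the README change above comes down to joining the chosen column values with ":". The following is a minimal, standalone Go sketch of that behaviour only; the helper name formatDocID is hypothetical, while the series' real getDocID in river/sync.go (shown in the earlier patches) works on a *Rule and a row slice.

package main

import (
	"bytes"
	"errors"
	"fmt"
)

// formatDocID joins the chosen column values with ":" to build the
// Elasticsearch document id, mirroring the format described above:
// a PK (or configured id columns) of (a, b) becomes the id "a:b".
// A nil value is rejected, since the id must always be present.
func formatDocID(values []interface{}) (string, error) {
	var buf bytes.Buffer
	sep := ""
	for _, v := range values {
		if v == nil {
			return "", errors.New("id column value is nil")
		}
		buf.WriteString(fmt.Sprintf("%s%v", sep, v))
		sep = ":"
	}
	return buf.String(), nil
}

func main() {
	id, err := formatDocID([]interface{}{1, "first"})
	if err != nil {
		panic(err)
	}
	fmt.Println(id) // prints "1:first", the doc id asserted in the tests above
}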
From f2c690b1ca70c570c275e164b73a118eab1b3363 Mon Sep 17 00:00:00 2001
From: WangXiangUSTC
Date: Thu, 13 Apr 2017 20:56:13 +0800
Subject: [PATCH 17/20] add rule of id

---
 etc/river.toml | 21 +++++++++++++++++++--
 1 file changed, 19 insertions(+), 2 deletions(-)

diff --git a/etc/river.toml b/etc/river.toml
index 44b047c8..9b02ac02 100644
--- a/etc/river.toml
+++ b/etc/river.toml
@@ -58,8 +58,6 @@ schema = "test"
 table = "t"
 index = "test"
 type = "t"
-# Default will use the mysql's primary key as es's id, if set id will use the id's column value as id
-id = ["id", "tags"]
 
 # Wildcard table rule, the wildcard table must be in source tables
 # All tables which match the wildcard format will be synced to ES index `test` and type `t`.
@@ -116,3 +114,22 @@ type = "tfilter"
 # Only sync following columns
 filter = ["id", "name"]
 
+# id rule
+#
+# desc tid_[0-9]{4};
+# +----------+--------------+------+-----+---------+-------+
+# | Field    | Type         | Null | Key | Default | Extra |
+# +----------+--------------+------+-----+---------+-------+
+# | id       | int(11)      | NO   | PRI | NULL    |       |
+# | tag      | varchar(256) | YES  |     | NULL    |       |
+# | desc     | varchar(256) | YES  |     | NULL    |       |
+# +----------+--------------+------+-----+---------+-------+
+#
+[[rule]]
+schema = "test"
+table = "tid_[0-9]{4}"
+index = "test"
+type = "t"
+# The es doc's id will be `id`:`tag`
+# This is useful for merging multiple tables into one type when these tables share the same PK
+id = ["id", "tag"]
From ae47147941a259fb7b23161740cef9307388b063 Mon Sep 17 00:00:00 2001
From: WangXiangUSTC
Date: Thu, 13 Apr 2017 21:03:34 +0800
Subject: [PATCH 18/20] Update river_test.go

---
 river/river_test.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/river/river_test.go b/river/river_test.go
index 95692a22..1008a4b9 100644
--- a/river/river_test.go
+++ b/river/river_test.go
@@ -73,7 +73,7 @@ func (s *riverTestSuite) SetUpSuite(c *C) {
 
 	os.RemoveAll(cfg.DataDir)
 
-	cfg.Sources = []SourceConfig{SourceConfig{Schema: "test", Tables: []string{"test_river", "test_river_[0-9]{4}"}}}
+	cfg.Sources = []SourceConfig{SourceConfig{Schema: "test", Tables: []string{"test_river", "test_river_[0-9]{4}", "test_for_id"}}}
 
 	cfg.Rules = []*Rule{
 		&Rule{Schema: "test",
@@ -170,7 +170,7 @@ type = "river"
 	cfg, err := NewConfig(str)
 	c.Assert(err, IsNil)
 	c.Assert(cfg.Sources, HasLen, 1)
-	c.Assert(cfg.Sources[0].Tables, HasLen, 2)
+	c.Assert(cfg.Sources[0].Tables, HasLen, 3)
 	c.Assert(cfg.Rules, HasLen, 2)
 }
 
From 73a8be6e631dc6920021b7bbcbde93b1511a380c Mon Sep 17 00:00:00 2001
From: WangXiangUSTC
Date: Thu, 13 Apr 2017 21:31:27 +0800
Subject: [PATCH 19/20] Update river_test.go

---
 river/river_test.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/river/river_test.go b/river/river_test.go
index 1008a4b9..38184da9 100644
--- a/river/river_test.go
+++ b/river/river_test.go
@@ -129,7 +129,7 @@ data_dir = "./var"
 
 [[source]]
 schema = "test"
-tables = ["test_river", "test_river_[0-9]{4}"]
+tables = ["test_river", "test_river_[0-9]{4}", "test_for_id"]
 
 [[rule]]
 schema = "test"
@@ -171,7 +171,7 @@ type = "river"
 	c.Assert(err, IsNil)
 	c.Assert(cfg.Sources, HasLen, 1)
 	c.Assert(cfg.Sources[0].Tables, HasLen, 3)
-	c.Assert(cfg.Rules, HasLen, 2)
+	c.Assert(cfg.Rules, HasLen, 3)
 }
 
 func (s *riverTestSuite) testExecute(c *C, query string, args ...interface{}) {
From ecac9006420926bcfe02ea4caaa95aa7512404da Mon Sep 17 00:00:00 2001
From: WangXiangUSTC
Date: Thu, 13 Apr 2017 23:37:11 +0800
Subject: [PATCH 20/20] update go-mysql commit version

---
 glide.lock | 2 +-
 1 file 
changed, 1 insertion(+), 1 deletion(-) diff --git a/glide.lock b/glide.lock index 97ef0512..736343ca 100644 --- a/glide.lock +++ b/glide.lock @@ -16,7 +16,7 @@ imports: - ioutil2 - sync2 - name: github.com/siddontang/go-mysql - version: d6523c91375d1d31abfbc9e5dce3a70e15dbb2bb + version: dd8c4fc06b092a5563dcd68431c857dd21a36d0a subpackages: - canal - client
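Taken together, the series makes the Elasticsearch document id configurable per rule: when id is set in river.toml, those column values are looked up by name and joined with ":"; otherwise the primary-key values are used as before. The sketch below recaps that flow under simplified, hypothetical types (table, rule and docID are illustration-only stand-ins; the real code uses schema.Table, river's Rule, and the canal GetPKValues/GetColumnValue helpers shown in the earlier patches).

package main

import (
	"fmt"
	"strings"
)

// Simplified stand-ins for schema.Table and river's Rule, for illustration only.
type table struct{ columns []string }

// findColumn returns the index of the named column, or -1 if it does not exist.
func (t *table) findColumn(name string) int {
	for i, c := range t.columns {
		if c == name {
			return i
		}
	}
	return -1
}

type rule struct {
	ID []string // columns configured via `id = [...]` in river.toml
	PK []int    // primary-key column indexes, used when ID is empty
}

// docID shows the decision this series adds: use the configured id columns
// when present, otherwise fall back to the primary key, and join the values
// with ":".
func docID(r *rule, t *table, row []interface{}) (string, error) {
	var values []interface{}
	if len(r.ID) == 0 {
		for _, idx := range r.PK {
			values = append(values, row[idx])
		}
	} else {
		for _, name := range r.ID {
			idx := t.findColumn(name)
			if idx == -1 {
				return "", fmt.Errorf("table has no column %s", name)
			}
			values = append(values, row[idx])
		}
	}
	parts := make([]string, 0, len(values))
	for _, v := range values {
		if v == nil {
			return "", fmt.Errorf("id column value is nil")
		}
		parts = append(parts, fmt.Sprintf("%v", v))
	}
	return strings.Join(parts, ":"), nil
}

func main() {
	t := &table{columns: []string{"id", "tag", "desc"}}
	r := &rule{ID: []string{"id", "tag"}}
	id, err := docID(r, t, []interface{}{1, "golang", "a post"})
	if err != nil {
		panic(err)
	}
	fmt.Println(id) // "1:golang", matching the `id = ["id", "tag"]` example rule
}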