Can config ES id's constituent part #100

Merged · 22 commits · Apr 14, 2017
2 changes: 1 addition & 1 deletion README.md
@@ -23,7 +23,7 @@ It uses `mysqldump` to fetch the origin data at first, then syncs data increment
+ binlog format must be **row**.
+ binlog row image must be **full** for MySQL; you may lose some field data if you update PK data in MySQL with a minimal or noblob binlog row image. MariaDB only supports the full row image.
+ Cannot alter table format at runtime.
-+ A MySQL table that will be synced must have a PK (primary key); multi-column PKs are allowed, e.g., if the PK is (a, b), "a:b" will be used as the key. The PK data will be used as the "id" in Elasticsearch.
++ A MySQL table that will be synced should have a PK (primary key); multi-column PKs are allowed, e.g., if the PK is (a, b), "a:b" will be used as the key. The PK data will be used as the "id" in Elasticsearch. You can also configure the id's constituent parts from other columns.
+ You should create the associated mappings in Elasticsearch first; using the default mapping is not a wise decision, because you must know how to search accurately.
+ `mysqldump` must exist on the same node as go-mysql-elasticsearch; if not, go-mysql-elasticsearch will try to sync the binlog only.
+ Don't change too many rows at the same time in one SQL statement.
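As an aside, here is a minimal, self-contained sketch of the key construction described above (illustrative only, not code from this PR; the `joinID` helper and the values are made up). It mirrors the join that `getDocID` in `river/sync.go` performs:

```go
package main

import (
	"bytes"
	"fmt"
)

// joinID joins the PK (or configured id column) values of one row with ":",
// the same way the river builds the Elasticsearch doc id.
func joinID(values []interface{}) string {
	var buf bytes.Buffer
	sep := ""
	for _, v := range values {
		buf.WriteString(fmt.Sprintf("%s%v", sep, v))
		sep = ":"
	}
	return buf.String()
}

func main() {
	// A row whose PK is (a, b) = (1, "first") gets doc id "1:first".
	fmt.Println(joinID([]interface{}{1, "first"}))
}
```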
20 changes: 20 additions & 0 deletions etc/river.toml
@@ -113,3 +113,23 @@ type = "tfilter"

# Only sync following columns
filter = ["id", "name"]

# id rule
#
# desc tid_[0-9]{4};
# +----------+--------------+------+-----+---------+-------+
# | Field | Type | Null | Key | Default | Extra |
# +----------+--------------+------+-----+---------+-------+
# | id | int(11) | NO | PRI | NULL | |
# | tag | varchar(256) | YES | | NULL | |
# | desc | varchar(256) | YES | | NULL | |
# +----------+--------------+------+-----+---------+-------+
#
[[rule]]
schema = "test"
table = "tid_[0-9]{4}"
index = "test"
type = "t"
# The ES doc's id will be `id`:`tag`
# This is useful for merging multiple tables into one type when these tables share the same PK
id = ["id", "tag"]
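With this rule, a row in a table matching `tid_[0-9]{4}` that has, say, id = 1 and tag = "x" (hypothetical values) is indexed with _id `1:x`. Because every matching table is written into the same index and type, composing the id from an extra column keeps rows from different tables that share the same PK value from overwriting each other; the test changes below assert the same pattern with the id `1:first`.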
2 changes: 1 addition & 1 deletion glide.lock


1 change: 1 addition & 0 deletions river/river.go
@@ -212,6 +212,7 @@ func (r *River) prepareRule() error {
rr.Index = rule.Index
rr.Type = rule.Type
rr.Parent = rule.Parent
rr.ID = rule.ID
rr.FieldMapping = rule.FieldMapping
}
} else {
38 changes: 32 additions & 6 deletions river/river_test.go
@@ -47,6 +47,7 @@ func (s *riverTestSuite) SetUpSuite(c *C) {

s.testExecute(c, "DROP TABLE IF EXISTS test_river")
s.testExecute(c, fmt.Sprintf(schema, "test_river"))
s.testExecute(c, fmt.Sprintf(schema, "test_for_id"))

for i := 0; i < 10; i++ {
table := fmt.Sprintf("test_river_%04d", i)
@@ -72,7 +73,7 @@ func (s *riverTestSuite) SetUpSuite(c *C) {

os.RemoveAll(cfg.DataDir)

-	cfg.Sources = []SourceConfig{SourceConfig{Schema: "test", Tables: []string{"test_river", "test_river_[0-9]{4}"}}}
+	cfg.Sources = []SourceConfig{SourceConfig{Schema: "test", Tables: []string{"test_river", "test_river_[0-9]{4}", "test_for_id"}}}

cfg.Rules = []*Rule{
&Rule{Schema: "test",
@@ -81,6 +82,14 @@ func (s *riverTestSuite) SetUpSuite(c *C) {
Type: "river",
FieldMapping: map[string]string{"title": "es_title", "mylist": "es_mylist,list"},
},

&Rule{Schema: "test",
Table: "test_for_id",
Index: "river",
Type: "river",
ID: []string{"id", "title"},
FieldMapping: map[string]string{"title": "es_title", "mylist": "es_mylist,list"},
},

&Rule{Schema: "test",
Table: "test_river_[0-9]{4}",
@@ -120,7 +129,7 @@ data_dir = "./var"
[[source]]
schema = "test"

-tables = ["test_river", "test_river_[0-9]{4}"]
+tables = ["test_river", "test_river_[0-9]{4}", "test_for_id"]

[[rule]]
schema = "test"
@@ -133,6 +142,19 @@ parent = "pid"
title = "es_title"
mylist = "es_mylist,list"


[[rule]]
schema = "test"
table = "test_for_id"
index = "river"
type = "river"
parent = "pid"
id = ["id", "title"]
[rule.field]
title = "es_title"
mylist = "es_mylist,list"


[[rule]]
schema = "test"
table = "test_river_[0-9]{4}"
@@ -148,8 +170,8 @@ type = "river"
cfg, err := NewConfig(str)
c.Assert(err, IsNil)
c.Assert(cfg.Sources, HasLen, 1)
-	c.Assert(cfg.Sources[0].Tables, HasLen, 2)
-	c.Assert(cfg.Rules, HasLen, 2)
+	c.Assert(cfg.Sources[0].Tables, HasLen, 3)
+	c.Assert(cfg.Rules, HasLen, 3)
}

func (s *riverTestSuite) testExecute(c *C, query string, args ...interface{}) {
@@ -162,7 +184,8 @@ func (s *riverTestSuite) testExecute(c *C, query string, args ...interface{}) {
s.testExecute(c, "INSERT INTO test_river (id, title, content, tenum, tset) VALUES (?, ?, ?, ?, ?)", 2, "second", "hello mysql 2", "e2", "b,c")
s.testExecute(c, "INSERT INTO test_river (id, title, content, tenum, tset) VALUES (?, ?, ?, ?, ?)", 3, "third", "hello elaticsearch 3", "e3", "c")
s.testExecute(c, "INSERT INTO test_river (id, title, content, tenum, tset, tbit) VALUES (?, ?, ?, ?, ?, ?)", 4, "fouth", "hello go-mysql-elasticserach 4", "e1", "a,b,c", 0)

s.testExecute(c, "INSERT INTO test_for_id (id, title, content, tenum, tset) VALUES (?, ?, ?, ?, ?)", 1, "first", "hello go 1", "e1", "a,b")

for i := 0; i < 10; i++ {
table := fmt.Sprintf("test_river_%04d", i)
s.testExecute(c, fmt.Sprintf("INSERT INTO %s (id, title, content, tenum, tset) VALUES (?, ?, ?, ?, ?)", table), 5+i, "abc", "hello", "e1", "a,b,c")
@@ -208,7 +231,10 @@ func (s *riverTestSuite) TestRiver(c *C) {
c.Assert(r.Found, Equals, true)
c.Assert(r.Source["tenum"], Equals, "e1")
c.Assert(r.Source["tset"], Equals, "a,b")


r = s.testElasticGet(c, "1:first")
c.Assert(r.Found, Equals, true)

r = s.testElasticGet(c, "100")
c.Assert(r.Found, Equals, false)

1 change: 1 addition & 0 deletions river/rule.go
@@ -13,6 +13,7 @@ type Rule struct {
Index string `toml:"index"`
Type string `toml:"type"`
Parent string `toml:"parent"`
ID []string `toml:"id"`

// By default, a MySQL table field name is mapped to the Elasticsearch field name.
// Sometimes, you want to use a different name, e.g., the MySQL field name is title,
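For a sense of how the new field gets populated, here is a minimal sketch of decoding a rule that uses the new `id` array. This assumes the BurntSushi/toml package, which this project uses for its config; the trimmed Rule struct below omits the other fields:

```go
package main

import (
	"fmt"

	"github.com/BurntSushi/toml"
)

// Rule is a trimmed copy of the struct above, just enough to show
// how the new `id` array decodes into Rule.ID.
type Rule struct {
	Schema string   `toml:"schema"`
	Table  string   `toml:"table"`
	Index  string   `toml:"index"`
	Type   string   `toml:"type"`
	ID     []string `toml:"id"`
}

func main() {
	data := `
schema = "test"
table = "tid_[0-9]{4}"
index = "test"
type = "t"
id = ["id", "tag"]
`
	var r Rule
	if _, err := toml.Decode(data, &r); err != nil {
		panic(err)
	}
	fmt.Println(r.ID) // prints [id tag]; stays nil when `id` is absent, keeping the PK behavior
}
```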
29 changes: 22 additions & 7 deletions river/sync.go
@@ -384,20 +384,35 @@ func (r *River) makeUpdateReqData(req *elastic.BulkRequest, rule *Rule,
}
}

-// Get primary keys in one row and format them into a string
-// PK must not be nil
+// If id is not configured in the TOML file, get the primary keys in one row and format them into a string; the PK must not be nil.
+// Otherwise, get the configured id columns in one row and format them into a string.
func (r *River) getDocID(rule *Rule, row []interface{}) (string, error) {
-	pks, err := canal.GetPKValues(rule.TableInfo, row)
-	if err != nil {
-		return "", err
-	}
+	var (
+		ids []interface{}
+		err error
+	)
+	if rule.ID == nil {
+		ids, err = canal.GetPKValues(rule.TableInfo, row)
+		if err != nil {
+			return "", err
+		}
+	} else {
+		ids = make([]interface{}, 0, len(rule.ID))
+		for _, column := range rule.ID {
+			value, err := canal.GetColumnValue(rule.TableInfo, column, row)
+			if err != nil {
+				return "", err
+			}
+			ids = append(ids, value)
+		}
+	}

	var buf bytes.Buffer

	sep := ""
-	for i, value := range pks {
+	for i, value := range ids {
		if value == nil {
-			return "", errors.Errorf("The %ds PK value is nil", i)
+			return "", errors.Errorf("The %ds id or PK value is nil", i)
		}

		buf.WriteString(fmt.Sprintf("%s%v", sep, value))
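Net effect: when `rule.ID` is nil, `getDocID` behaves exactly as before and builds the doc id from the PK values; when `id` is configured, each listed column is fetched with `canal.GetColumnValue` (presumably the helper added by the vendored `canal/rows.go` change below) and joined with the same `:` separator, and a nil value in any id or PK column still aborts the request with an error.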
10 changes: 10 additions & 0 deletions vendor/github.com/siddontang/go-mysql/canal/rows.go
