Skip to content

Commit c48878b

Browse files
WangXiangUSTCsiddontang
authored andcommitted
can config es id's constituent part (#100)
1 parent 686718b commit c48878b

File tree

8 files changed

+88
-15
lines changed

8 files changed

+88
-15
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ It uses `mysqldump` to fetch the origin data at first, then syncs data increment
2323
+ binlog format must be **row**.
2424
+ binlog row image must be **full** for MySQL; you may lose some field data if you update PK data in MySQL with a minimal or noblob binlog row image. MariaDB only supports full row image.
2525
+ Can not alter table format at runtime.
26-
+ MySQL table which will be synced must have a PK(primary key), multi columns PK is allowed now, e,g, if the PKs is (a, b), we will use "a:b" as the key. The PK data will be used as "id" in Elasticsearch.
26+
+ A MySQL table which will be synced should have a PK (primary key). Multi-column PKs are allowed, e.g. if the PK is (a, b), we will use "a:b" as the key. The PK data will be used as the "id" in Elasticsearch. You can also configure the id's constituent parts to use other columns.
2727
+ You should create the associated mappings in Elasticsearch first, I don't think using the default mapping is a wise decision, you must know how to search accurately.
2828
+ `mysqldump` must exist on the same node as go-mysql-elasticsearch; if not, go-mysql-elasticsearch will try to sync binlog only.
2929
+ Don't change too many rows at the same time in one SQL statement.

etc/river.toml

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,3 +113,23 @@ type = "tfilter"
113113

114114
# Only sync following columns
115115
filter = ["id", "name"]
116+
117+
# id rule
118+
#
119+
# desc tid_[0-9]{4};
120+
# +----------+--------------+------+-----+---------+-------+
121+
# | Field | Type | Null | Key | Default | Extra |
122+
# +----------+--------------+------+-----+---------+-------+
123+
# | id | int(11) | NO | PRI | NULL | |
124+
# | tag | varchar(256) | YES | | NULL | |
125+
# | desc | varchar(256) | YES | | NULL | |
126+
# +----------+--------------+------+-----+---------+-------+
127+
#
128+
[[rule]]
129+
schema = "test"
130+
table = "tid_[0-9]{4}"
131+
index = "test"
132+
type = "t"
133+
# The es doc's id will be `id`:`tag`
134+
# It is useful for merging multiple tables into one type when these tables have the same PK
135+
id = ["id", "tag"]

glide.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

river/river.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,7 @@ func (r *River) prepareRule() error {
212212
rr.Index = rule.Index
213213
rr.Type = rule.Type
214214
rr.Parent = rule.Parent
215+
rr.ID = rule.ID
215216
rr.FieldMapping = rule.FieldMapping
216217
}
217218
} else {

river/river_test.go

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ func (s *riverTestSuite) SetUpSuite(c *C) {
4747

4848
s.testExecute(c, "DROP TABLE IF EXISTS test_river")
4949
s.testExecute(c, fmt.Sprintf(schema, "test_river"))
50+
s.testExecute(c, fmt.Sprintf(schema, "test_for_id"))
5051

5152
for i := 0; i < 10; i++ {
5253
table := fmt.Sprintf("test_river_%04d", i)
@@ -72,7 +73,7 @@ func (s *riverTestSuite) SetUpSuite(c *C) {
7273

7374
os.RemoveAll(cfg.DataDir)
7475

75-
cfg.Sources = []SourceConfig{SourceConfig{Schema: "test", Tables: []string{"test_river", "test_river_[0-9]{4}"}}}
76+
cfg.Sources = []SourceConfig{SourceConfig{Schema: "test", Tables: []string{"test_river", "test_river_[0-9]{4}", "test_for_id"}}}
7677

7778
cfg.Rules = []*Rule{
7879
&Rule{Schema: "test",
@@ -81,6 +82,14 @@ func (s *riverTestSuite) SetUpSuite(c *C) {
8182
Type: "river",
8283
FieldMapping: map[string]string{"title": "es_title", "mylist": "es_mylist,list"},
8384
},
85+
86+
&Rule{Schema: "test",
87+
Table: "test_for_id",
88+
Index: "river",
89+
Type: "river",
90+
ID: []string{"id", "title"},
91+
FieldMapping: map[string]string{"title": "es_title", "mylist": "es_mylist,list"},
92+
},
8493

8594
&Rule{Schema: "test",
8695
Table: "test_river_[0-9]{4}",
@@ -120,7 +129,7 @@ data_dir = "./var"
120129
[[source]]
121130
schema = "test"
122131
123-
tables = ["test_river", "test_river_[0-9]{4}"]
132+
tables = ["test_river", "test_river_[0-9]{4}", "test_for_id"]
124133
125134
[[rule]]
126135
schema = "test"
@@ -133,6 +142,19 @@ parent = "pid"
133142
title = "es_title"
134143
mylist = "es_mylist,list"
135144
145+
146+
[[rule]]
147+
schema = "test"
148+
table = "test_for_id"
149+
index = "river"
150+
type = "river"
151+
parent = "pid"
152+
id = ["id", "title"]
153+
[rule.field]
154+
title = "es_title"
155+
mylist = "es_mylist,list"
156+
157+
136158
[[rule]]
137159
schema = "test"
138160
table = "test_river_[0-9]{4}"
@@ -148,8 +170,8 @@ type = "river"
148170
cfg, err := NewConfig(str)
149171
c.Assert(err, IsNil)
150172
c.Assert(cfg.Sources, HasLen, 1)
151-
c.Assert(cfg.Sources[0].Tables, HasLen, 2)
152-
c.Assert(cfg.Rules, HasLen, 2)
173+
c.Assert(cfg.Sources[0].Tables, HasLen, 3)
174+
c.Assert(cfg.Rules, HasLen, 3)
153175
}
154176

155177
func (s *riverTestSuite) testExecute(c *C, query string, args ...interface{}) {
@@ -162,7 +184,8 @@ func (s *riverTestSuite) testPrepareData(c *C) {
162184
s.testExecute(c, "INSERT INTO test_river (id, title, content, tenum, tset) VALUES (?, ?, ?, ?, ?)", 2, "second", "hello mysql 2", "e2", "b,c")
163185
s.testExecute(c, "INSERT INTO test_river (id, title, content, tenum, tset) VALUES (?, ?, ?, ?, ?)", 3, "third", "hello elaticsearch 3", "e3", "c")
164186
s.testExecute(c, "INSERT INTO test_river (id, title, content, tenum, tset, tbit) VALUES (?, ?, ?, ?, ?, ?)", 4, "fouth", "hello go-mysql-elasticserach 4", "e1", "a,b,c", 0)
165-
187+
s.testExecute(c, "INSERT INTO test_for_id (id, title, content, tenum, tset) VALUES (?, ?, ?, ?, ?)", 1, "first", "hello go 1", "e1", "a,b")
188+
166189
for i := 0; i < 10; i++ {
167190
table := fmt.Sprintf("test_river_%04d", i)
168191
s.testExecute(c, fmt.Sprintf("INSERT INTO %s (id, title, content, tenum, tset) VALUES (?, ?, ?, ?, ?)", table), 5+i, "abc", "hello", "e1", "a,b,c")
@@ -208,7 +231,10 @@ func (s *riverTestSuite) TestRiver(c *C) {
208231
c.Assert(r.Found, Equals, true)
209232
c.Assert(r.Source["tenum"], Equals, "e1")
210233
c.Assert(r.Source["tset"], Equals, "a,b")
211-
234+
235+
r = s.testElasticGet(c, "1:first")
236+
c.Assert(r.Found, Equals, true)
237+
212238
r = s.testElasticGet(c, "100")
213239
c.Assert(r.Found, Equals, false)
214240

river/rule.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ type Rule struct {
1313
Index string `toml:"index"`
1414
Type string `toml:"type"`
1515
Parent string `toml:"parent"`
16+
ID []string `toml:"id"`
1617

1718
// Default, a MySQL table field name is mapped to Elasticsearch field name.
1819
// Sometimes, you want to use a different name, e.g. the MySQL field name is title,

river/sync.go

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -384,20 +384,35 @@ func (r *River) makeUpdateReqData(req *elastic.BulkRequest, rule *Rule,
384384
}
385385
}
386386

387-
// Get primary keys in one row and format them into a string
388-
// PK must not be nil
387+
// If no id is configured in the toml file, get the primary key values in one row and format them into a string; PK values must not be nil.
388+
// Otherwise, get the values of the configured id columns in one row and format them into a string.
389389
func (r *River) getDocID(rule *Rule, row []interface{}) (string, error) {
390-
pks, err := canal.GetPKValues(rule.TableInfo, row)
391-
if err != nil {
392-
return "", err
390+
var (
391+
ids []interface{}
392+
err error
393+
)
394+
if rule.ID == nil {
395+
ids, err = canal.GetPKValues(rule.TableInfo, row)
396+
if err != nil {
397+
return "", err
398+
}
399+
} else {
400+
ids = make([]interface{}, 0, len(rule.ID))
401+
for _, column := range rule.ID {
402+
value, err := canal.GetColumnValue(rule.TableInfo, column, row)
403+
if err != nil {
404+
return "", err
405+
}
406+
ids = append(ids, value)
407+
}
393408
}
394409

395410
var buf bytes.Buffer
396411

397412
sep := ""
398-
for i, value := range pks {
413+
for i, value := range ids {
399414
if value == nil {
400-
return "", errors.Errorf("The %ds PK value is nil", i)
415+
return "", errors.Errorf("The %ds id or PK value is nil", i)
401416
}
402417

403418
buf.WriteString(fmt.Sprintf("%s%v", sep, value))

vendor/github.com/siddontang/go-mysql/canal/rows.go

Lines changed: 10 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)