Skip to content

add support of parse json field #111

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 71 commits into from
Jun 26, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
71 commits
Select commit Hold shift + click to select a range
8719e13
Update river.go
Mar 29, 2017
527dd68
Merge branch 'master' of https://github.com/siddontang/go-mysql-elast…
WangXiangUSTC Mar 31, 2017
e5ff5dc
if have only one request, use bulk will waste resource of elasticsearch
WangXiangUSTC Mar 31, 2017
74e0c36
Update river.go
WangXiangUSTC Apr 12, 2017
45c069a
Update README.md
WangXiangUSTC Apr 13, 2017
02ec489
Update README.md
WangXiangUSTC Apr 13, 2017
6c32bdc
update commit version of go-mysql
WangXiangUSTC Apr 13, 2017
4b0ba49
Update glide.lock
WangXiangUSTC Apr 28, 2017
4ef3830
add set of charcter
Apr 30, 2017
9b91277
update commit version
WangXiangUSTC May 5, 2017
e1e1472
update sync.go
WangXiangUSTC May 5, 2017
56cb5d7
update gp-mysql version
WangXiangUSTC May 5, 2017
e965925
update go-mysql version
WangXiangUSTC May 5, 2017
cdc4543
add config of mysql charset
WangXiangUSTC May 5, 2017
570317c
add test of mysql charset
WangXiangUSTC May 5, 2017
43d24a4
update go-mysql vendor
May 7, 2017
4bdeb15
update go-mysql vendor
May 7, 2017
945ddb2
Update sync.go
WangXiangUSTC May 7, 2017
cc7b314
add user and password of elasticsearch
WangXiangUSTC May 17, 2017
dc8a118
format code
WangXiangUSTC May 17, 2017
f8e619f
Update river.toml
WangXiangUSTC May 17, 2017
d516e6c
Update river_test.go
WangXiangUSTC May 17, 2017
230b029
Update client_test.go
WangXiangUSTC May 17, 2017
84dd57b
Update river_test.go
WangXiangUSTC May 17, 2017
8c8bceb
add config struct for elasticsearch
WangXiangUSTC May 19, 2017
7513258
Merge branch 'master' of https://github.com/WangXiangUSTC/go-mysql-el…
WangXiangUSTC May 19, 2017
fbe71a9
Update client_test.go
WangXiangUSTC May 19, 2017
1e0d0d4
Update client.go
WangXiangUSTC May 19, 2017
0629e8b
Update river_test.go
WangXiangUSTC May 19, 2017
912e9a8
Update river.go
WangXiangUSTC May 19, 2017
5e237e0
if http response code is not 2XX, return error
WangXiangUSTC May 29, 2017
5691ecd
Update client_test.go
WangXiangUSTC May 29, 2017
37d7961
Update client_test.go
WangXiangUSTC May 29, 2017
1b94f32
Update river_test.go
WangXiangUSTC May 29, 2017
e136df6
Update river_test.go
WangXiangUSTC May 29, 2017
72235fd
Update river_extra_test.go
WangXiangUSTC May 29, 2017
351742e
Update client_test.go
WangXiangUSTC Jun 3, 2017
66315f3
modify test code
Jun 4, 2017
1e21b3c
modify code about create mapping
Jun 6, 2017
7d38113
Merge branch 'master' into master
WangXiangUSTC Jun 6, 2017
7a7786d
judge http error in sync.go
Jun 18, 2017
5534119
Merge branch 'master' into master
WangXiangUSTC Jun 18, 2017
767e1c7
judge http response status
Jun 18, 2017
92ec5ad
fix bug in test code
Jun 18, 2017
dc17f52
fix bug in test code
Jun 18, 2017
b2b1222
Update river_test.go
WangXiangUSTC Jun 19, 2017
9ce43fb
Update sync.go
WangXiangUSTC Jun 19, 2017
c98d665
Update river_test.go
WangXiangUSTC Jun 19, 2017
447dc82
Update client.go
WangXiangUSTC Jun 19, 2017
f5b2279
Update client_test.go
WangXiangUSTC Jun 19, 2017
0ca37e3
Update client.go
WangXiangUSTC Jun 19, 2017
4f403db
Update client.go
WangXiangUSTC Jun 19, 2017
6c68f84
Update client.go
WangXiangUSTC Jun 19, 2017
6f3c080
Update client.go
WangXiangUSTC Jun 20, 2017
8ab1723
add support of parse json field
Jun 23, 2017
8ec6e48
Merge remote-tracking branch 'siddontang/master'
Jun 23, 2017
154366a
add test of json field
Jun 25, 2017
eb4b28d
fix the bug of test
Jun 25, 2017
5a4d850
fix the bug of test
Jun 25, 2017
e417e92
fix the bug of test
Jun 25, 2017
deee9d1
Update river_test.go
WangXiangUSTC Jun 25, 2017
feb0ad5
try to support mysql 5.7 in travis ci
Jun 25, 2017
923f83a
try to support mysql 5.7 in travis ci
Jun 25, 2017
f2ab181
try to support mysql 5.7 in travis ci
Jun 25, 2017
85c23cf
try to support mysql 5.7 in travis ci
Jun 25, 2017
500e4d4
try to support mysql 5.7 in travis ci
Jun 25, 2017
c82a1d6
try to support mysql 5.7 in travis ci
Jun 25, 2017
e4126d6
try to support mysql 5.7 in travis ci
Jun 25, 2017
daafd0f
try to support mysql 5.7 in travis ci
Jun 25, 2017
0accf56
Update river_test.go
WangXiangUSTC Jun 25, 2017
629cc8e
Update river_test.go
WangXiangUSTC Jun 25, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .travis.install-mysql-5.7.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
mysql --version
echo mysql-apt-config mysql-apt-config/select-server select mysql-5.7 | sudo debconf-set-selections
wget https://dev.mysql.com/get/mysql-apt-config_0.8.6-1_all.deb
sudo dpkg --install mysql-apt-config_0.8.6-1_all.deb
sudo apt-get update -q
sudo apt-get install -q -y -o Dpkg::Options::=--force-confnew mysql-server
sudo mysql_upgrade --force
mysql --version
3 changes: 3 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ go:

services:
- elasticsearch
- mysql

before_install:
- go install -race std

# stop mysql and use row-based format binlog
- "bash .travis.install-mysql-5.7.sh"
- "sudo /etc/init.d/mysql stop || true"
- "echo '[mysqld]' | sudo tee /etc/mysql/conf.d/replication.cnf"
- "echo 'server-id=1' | sudo tee -a /etc/mysql/conf.d/replication.cnf"
Expand All @@ -25,3 +27,4 @@ before_install:

script:
- go test --race ./...

49 changes: 41 additions & 8 deletions river/river_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,19 @@ func (s *riverTestSuite) SetUpSuite(c *C) {
PRIMARY KEY(id)) ENGINE=INNODB;
`

schema_json := `
CREATE TABLE IF NOT EXISTS %s (
id INT,
info JSON,
PRIMARY KEY(id)) ENGINE=INNODB;
`

s.testExecute(c, "DROP TABLE IF EXISTS test_river")
s.testExecute(c, "DROP TABLE IF EXISTS test_for_id")
s.testExecute(c, "DROP TABLE IF EXISTS test_for_json")
s.testExecute(c, fmt.Sprintf(schema, "test_river"))
s.testExecute(c, fmt.Sprintf(schema, "test_for_id"))
s.testExecute(c, fmt.Sprintf(schema_json, "test_for_json"))

for i := 0; i < 10; i++ {
table := fmt.Sprintf("test_river_%04d", i)
Expand All @@ -74,7 +84,7 @@ func (s *riverTestSuite) SetUpSuite(c *C) {

os.RemoveAll(cfg.DataDir)

cfg.Sources = []SourceConfig{SourceConfig{Schema: "test", Tables: []string{"test_river", "test_river_[0-9]{4}", "test_for_id"}}}
cfg.Sources = []SourceConfig{SourceConfig{Schema: "test", Tables: []string{"test_river", "test_river_[0-9]{4}", "test_for_id", "test_for_json"}}}

cfg.Rules = []*Rule{
&Rule{Schema: "test",
Expand All @@ -83,7 +93,7 @@ func (s *riverTestSuite) SetUpSuite(c *C) {
Type: "river",
FieldMapping: map[string]string{"title": "es_title", "mylist": "es_mylist,list"},
},

&Rule{Schema: "test",
Table: "test_for_id",
Index: "river",
Expand All @@ -98,6 +108,12 @@ func (s *riverTestSuite) SetUpSuite(c *C) {
Type: "river",
FieldMapping: map[string]string{"title": "es_title", "mylist": "es_mylist,list"},
},

&Rule{Schema: "test",
Table: "test_for_json",
Index: "river",
Type: "river",
},
}

s.r, err = NewRiver(cfg)
Expand Down Expand Up @@ -131,7 +147,7 @@ data_dir = "./var"
[[source]]
schema = "test"

tables = ["test_river", "test_river_[0-9]{4}", "test_for_id"]
tables = ["test_river", "test_river_[0-9]{4}", "test_for_id", "test_for_json"]

[[rule]]
schema = "test"
Expand Down Expand Up @@ -167,13 +183,18 @@ type = "river"
title = "es_title"
mylist = "es_mylist,list"

[[rule]]
schema = "test"
table = "test_for_json"
index = "river"
type = "river"
`

cfg, err := NewConfig(str)
c.Assert(err, IsNil)
c.Assert(cfg.Sources, HasLen, 1)
c.Assert(cfg.Sources[0].Tables, HasLen, 3)
c.Assert(cfg.Rules, HasLen, 3)
c.Assert(cfg.Sources[0].Tables, HasLen, 4)
c.Assert(cfg.Rules, HasLen, 4)
}

func (s *riverTestSuite) testExecute(c *C, query string, args ...interface{}) {
Expand All @@ -187,7 +208,8 @@ func (s *riverTestSuite) testPrepareData(c *C) {
s.testExecute(c, "INSERT INTO test_river (id, title, content, tenum, tset) VALUES (?, ?, ?, ?, ?)", 3, "third", "hello elaticsearch 3", "e3", "c")
s.testExecute(c, "INSERT INTO test_river (id, title, content, tenum, tset, tbit) VALUES (?, ?, ?, ?, ?, ?)", 4, "fouth", "hello go-mysql-elasticserach 4", "e1", "a,b,c", 0)
s.testExecute(c, "INSERT INTO test_for_id (id, title, content, tenum, tset) VALUES (?, ?, ?, ?, ?)", 1, "first", "hello go 1", "e1", "a,b")

s.testExecute(c, "INSERT INTO test_for_json (id, info) VALUES (?, ?)", 9200, "{\"first\": \"a\", \"second\": \"b\"}")

for i := 0; i < 10; i++ {
table := fmt.Sprintf("test_river_%04d", i)
s.testExecute(c, fmt.Sprintf("INSERT INTO %s (id, title, content, tenum, tset) VALUES (?, ?, ?, ?, ?)", table), 5+i, "abc", "hello", "e1", "a,b,c")
Expand Down Expand Up @@ -233,10 +255,21 @@ func (s *riverTestSuite) TestRiver(c *C) {
c.Assert(r.Found, Equals, true)
c.Assert(r.Source["tenum"], Equals, "e1")
c.Assert(r.Source["tset"], Equals, "a,b")

r = s.testElasticGet(c, "1:first")
c.Assert(r.Found, Equals, true)


r = s.testElasticGet(c, "9200")
c.Assert(r.Found, Equals, true)
switch v := r.Source["info"].(type) {
case map[string]interface{}:
c.Assert(v["first"], Equals, "a")
c.Assert(v["second"], Equals, "b")
default:
c.Assert(v, Equals, nil)
c.Assert(true, Equals, false)
}

r = s.testElasticGet(c, "100")
c.Assert(r.Found, Equals, false)

Expand Down
13 changes: 13 additions & 0 deletions river/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"reflect"
"strings"
"time"
"encoding/json"

"github.com/juju/errors"
"github.com/ngaut/log"
Expand Down Expand Up @@ -290,6 +291,18 @@ func (r *River) makeReqColumnData(col *schema.TableColumn, value interface{}) in
case []byte:
return string(value[:])
}
case schema.TYPE_JSON:
var f interface{}
var err error
switch v := value.(type) {
case string:
err = json.Unmarshal([]byte(v), &f)
case []byte:
err = json.Unmarshal(v, &f)
}
if err == nil && f != nil {
return f
}
}

return value
Expand Down