Skip to content

add config of elasticsearch's user and password #104

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 54 commits into from
Jun 20, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
8719e13
Update river.go
Mar 29, 2017
527dd68
Merge branch 'master' of https://github.com/siddontang/go-mysql-elast…
WangXiangUSTC Mar 31, 2017
e5ff5dc
if have only one request, use bulk will waste resource of elasticsearch
WangXiangUSTC Mar 31, 2017
74e0c36
Update river.go
WangXiangUSTC Apr 12, 2017
45c069a
Update README.md
WangXiangUSTC Apr 13, 2017
02ec489
Update README.md
WangXiangUSTC Apr 13, 2017
6c32bdc
update commit version of go-mysql
WangXiangUSTC Apr 13, 2017
4b0ba49
Update glide.lock
WangXiangUSTC Apr 28, 2017
4ef3830
add set of charcter
Apr 30, 2017
9b91277
update commit version
WangXiangUSTC May 5, 2017
e1e1472
update sync.go
WangXiangUSTC May 5, 2017
56cb5d7
update gp-mysql version
WangXiangUSTC May 5, 2017
e965925
update go-mysql version
WangXiangUSTC May 5, 2017
cdc4543
add config of mysql charset
WangXiangUSTC May 5, 2017
570317c
add test of mysql charset
WangXiangUSTC May 5, 2017
43d24a4
update go-mysql vendor
May 7, 2017
4bdeb15
update go-mysql vendor
May 7, 2017
945ddb2
Update sync.go
WangXiangUSTC May 7, 2017
cc7b314
add user and password of elasticsearch
WangXiangUSTC May 17, 2017
dc8a118
format code
WangXiangUSTC May 17, 2017
f8e619f
Update river.toml
WangXiangUSTC May 17, 2017
d516e6c
Update river_test.go
WangXiangUSTC May 17, 2017
230b029
Update client_test.go
WangXiangUSTC May 17, 2017
84dd57b
Update river_test.go
WangXiangUSTC May 17, 2017
8c8bceb
add config struct for elasticsearch
WangXiangUSTC May 19, 2017
7513258
Merge branch 'master' of https://github.com/WangXiangUSTC/go-mysql-el…
WangXiangUSTC May 19, 2017
fbe71a9
Update client_test.go
WangXiangUSTC May 19, 2017
1e0d0d4
Update client.go
WangXiangUSTC May 19, 2017
0629e8b
Update river_test.go
WangXiangUSTC May 19, 2017
912e9a8
Update river.go
WangXiangUSTC May 19, 2017
5e237e0
if http response code is not 2XX, return error
WangXiangUSTC May 29, 2017
5691ecd
Update client_test.go
WangXiangUSTC May 29, 2017
37d7961
Update client_test.go
WangXiangUSTC May 29, 2017
1b94f32
Update river_test.go
WangXiangUSTC May 29, 2017
e136df6
Update river_test.go
WangXiangUSTC May 29, 2017
72235fd
Update river_extra_test.go
WangXiangUSTC May 29, 2017
351742e
Update client_test.go
WangXiangUSTC Jun 3, 2017
66315f3
modify test code
Jun 4, 2017
1e21b3c
modify code about create mapping
Jun 6, 2017
7d38113
Merge branch 'master' into master
WangXiangUSTC Jun 6, 2017
7a7786d
judge http error in sync.go
Jun 18, 2017
5534119
Merge branch 'master' into master
WangXiangUSTC Jun 18, 2017
767e1c7
judge http response status
Jun 18, 2017
92ec5ad
fix bug in test code
Jun 18, 2017
dc17f52
fix bug in test code
Jun 18, 2017
b2b1222
Update river_test.go
WangXiangUSTC Jun 19, 2017
9ce43fb
Update sync.go
WangXiangUSTC Jun 19, 2017
c98d665
Update river_test.go
WangXiangUSTC Jun 19, 2017
447dc82
Update client.go
WangXiangUSTC Jun 19, 2017
f5b2279
Update client_test.go
WangXiangUSTC Jun 19, 2017
0ca37e3
Update client.go
WangXiangUSTC Jun 19, 2017
4f403db
Update client.go
WangXiangUSTC Jun 19, 2017
6c68f84
Update client.go
WangXiangUSTC Jun 19, 2017
6f3c080
Update client.go
WangXiangUSTC Jun 20, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 31 additions & 15 deletions elastic/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,25 @@ import (
// Because we only need some very simple usages.
type Client struct {
Addr string
User string
Password string

c *http.Client
}

func NewClient(addr string) *Client {
type ClientConfig struct {
Addr string
User string
Password string
}


func NewClient(conf *ClientConfig) *Client {
c := new(Client)

c.Addr = addr
c.Addr = conf.Addr
c.User = conf.User
c.Password = conf.Password

c.c = &http.Client{}

Expand Down Expand Up @@ -134,20 +145,28 @@ type BulkResponseItem struct {
Found bool `json:"found"`
}

func (c *Client) Do(method string, url string, body map[string]interface{}) (*Response, error) {
bodyData, err := json.Marshal(body)
func (c *Client) DoRequest(method string, url string, body *bytes.Buffer) (*http.Response, error) {
req, err := http.NewRequest(method, url, body)
if err != nil {
return nil, errors.Trace(err)
}
if len(c.User) > 0 && len(c.Password) > 0 {
req.SetBasicAuth(c.User, c.Password)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we meet the case that user is not empty but the password is?

}
resp, err := c.c.Do(req)

buf := bytes.NewBuffer(bodyData)
return resp, err
}

req, err := http.NewRequest(method, url, buf)
func (c *Client) Do(method string, url string, body map[string]interface{}) (*Response, error) {
bodyData, err := json.Marshal(body)
if err != nil {
return nil, errors.Trace(err)
}

resp, err := c.c.Do(req)
buf := bytes.NewBuffer(bodyData)

resp, err := c.DoRequest(method, url, buf)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -178,12 +197,7 @@ func (c *Client) DoBulk(url string, items []*BulkRequest) (*BulkResponse, error)
}
}

req, err := http.NewRequest("POST", url, &buf)
if err != nil {
return nil, errors.Trace(err)
}

resp, err := c.c.Do(req)
resp, err := c.DoRequest("POST", url, &buf)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -214,13 +228,15 @@ func (c *Client) CreateMapping(index string, docType string, mapping map[string]
return errors.Trace(err)
}

// index doesn't exist, create index first
if r.Code != http.StatusOK {
// if index doesn't exist, will get 404 not found, create index first
if r.Code == http.StatusNotFound {
_, err = c.Do("PUT", reqUrl, nil)

if err != nil {
return errors.Trace(err)
}
} else if r.Code != http.StatusOK {
return errors.Errorf("Error: %s, code: %d", http.StatusText(r.Code), r.Code)
}

reqUrl = fmt.Sprintf("http://%s/%s/%s/_mapping", c.Addr,
Expand Down
10 changes: 6 additions & 4 deletions elastic/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@ type elasticTestSuite struct {
var _ = Suite(&elasticTestSuite{})

func (s *elasticTestSuite) SetUpSuite(c *C) {
s.c = NewClient(fmt.Sprintf("%s:%d", *host, *port))
cfg := new(ClientConfig)
cfg.Addr = fmt.Sprintf("%s:%d", *host, *port)
cfg.User = ""
cfg.Password = ""
s.c = NewClient(cfg)
}

func (s *elasticTestSuite) TearDownSuite(c *C) {
Expand Down Expand Up @@ -59,9 +63,6 @@ func (s *elasticTestSuite) TestSimple(c *C) {
err = s.c.Delete(index, docType, "1")
c.Assert(err, IsNil)

err = s.c.Delete(index, docType, "1")
c.Assert(err, IsNil)

exists, err = s.c.Exists(index, docType, "1")
c.Assert(err, IsNil)
c.Assert(exists, Equals, false)
Expand Down Expand Up @@ -126,6 +127,7 @@ func (s *elasticTestSuite) TestParent(c *C) {
c.Assert(err, IsNil)
c.Assert(resp.Code, Equals, 200)
c.Assert(resp.Errors, Equals, false)

for i := 0; i < 10; i++ {
id := fmt.Sprintf("%d", i)
req := new(BulkRequest)
Expand Down
3 changes: 3 additions & 0 deletions etc/river.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ my_charset = "utf8"

# Elasticsearch address
es_addr = "127.0.0.1:9200"
# Elasticsearch user and password, maybe set by shield, nginx, or x-pack
es_user = ""
es_pass = ""

# Path to store data, like master.info, if not set or empty,
# we must use this to support breakpoint resume syncing.
Expand Down
4 changes: 3 additions & 1 deletion river/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ type Config struct {
MyPassword string `toml:"my_pass"`
MyCharset string `toml:"my_charset"`

ESAddr string `toml:"es_addr"`
ESAddr string `toml:"es_addr"`
ESUser string `toml:"es_user"`
ESPassword string `toml:"es_pass"`

StatAddr string `toml:"stat_addr"`

Expand Down
7 changes: 6 additions & 1 deletion river/river.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,12 @@ func NewRiver(c *Config) (*River, error) {
return nil, errors.Trace(err)
}

r.es = elastic.NewClient(r.c.ESAddr)
cfg := new(elastic.ClientConfig)
cfg.Addr = r.c.ESAddr
cfg.User = r.c.ESUser
cfg.Password = r.c.ESPassword
r.es = elastic.NewClient(cfg)


r.st = &stat{r: r}
go r.st.Run(r.c.StatAddr)
Expand Down
3 changes: 2 additions & 1 deletion river/river_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ my_user = "root"
my_pass = ""
my_charset = "utf8"
es_addr = "127.0.0.1:9200"

es_user = ""
es_pass = ""
data_dir = "./var"

[[source]]
Expand Down
2 changes: 1 addition & 1 deletion river/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ func (r *River) doBulk(reqs []*elastic.BulkRequest) error {
if resp, err := r.es.Bulk(reqs); err != nil {
log.Errorf("sync docs err %v after binlog %s", err, r.canal.SyncedPosition())
return errors.Trace(err)
} else if resp.Errors {
} else if resp.Code / 100 == 2 || resp.Errors {
for i := 0; i < len(resp.Items); i++ {
for action, item := range resp.Items[i] {
if len(item.Error) > 0 {
Expand Down