Skip to content

Commit 4515d5b

Browse files
WangXiangUSTCsiddontang
authored andcommitted
add config of elasticsearch's user and password (#104)
1 parent 98c1450 commit 4515d5b

File tree

7 files changed

+52
-23
lines changed

7 files changed

+52
-23
lines changed

elastic/client.go

Lines changed: 31 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,25 @@ import (
1515
// Because we only need some very simple usages.
1616
type Client struct {
1717
Addr string
18+
User string
19+
Password string
1820

1921
c *http.Client
2022
}
2123

22-
func NewClient(addr string) *Client {
24+
type ClientConfig struct {
25+
Addr string
26+
User string
27+
Password string
28+
}
29+
30+
31+
func NewClient(conf *ClientConfig) *Client {
2332
c := new(Client)
2433

25-
c.Addr = addr
34+
c.Addr = conf.Addr
35+
c.User = conf.User
36+
c.Password = conf.Password
2637

2738
c.c = &http.Client{}
2839

@@ -134,20 +145,28 @@ type BulkResponseItem struct {
134145
Found bool `json:"found"`
135146
}
136147

137-
func (c *Client) Do(method string, url string, body map[string]interface{}) (*Response, error) {
138-
bodyData, err := json.Marshal(body)
148+
func (c *Client) DoRequest(method string, url string, body *bytes.Buffer) (*http.Response, error) {
149+
req, err := http.NewRequest(method, url, body)
139150
if err != nil {
140151
return nil, errors.Trace(err)
141152
}
153+
if len(c.User) > 0 && len(c.Password) > 0 {
154+
req.SetBasicAuth(c.User, c.Password)
155+
}
156+
resp, err := c.c.Do(req)
142157

143-
buf := bytes.NewBuffer(bodyData)
158+
return resp, err
159+
}
144160

145-
req, err := http.NewRequest(method, url, buf)
161+
func (c *Client) Do(method string, url string, body map[string]interface{}) (*Response, error) {
162+
bodyData, err := json.Marshal(body)
146163
if err != nil {
147164
return nil, errors.Trace(err)
148165
}
149166

150-
resp, err := c.c.Do(req)
167+
buf := bytes.NewBuffer(bodyData)
168+
169+
resp, err := c.DoRequest(method, url, buf)
151170
if err != nil {
152171
return nil, errors.Trace(err)
153172
}
@@ -178,12 +197,7 @@ func (c *Client) DoBulk(url string, items []*BulkRequest) (*BulkResponse, error)
178197
}
179198
}
180199

181-
req, err := http.NewRequest("POST", url, &buf)
182-
if err != nil {
183-
return nil, errors.Trace(err)
184-
}
185-
186-
resp, err := c.c.Do(req)
200+
resp, err := c.DoRequest("POST", url, &buf)
187201
if err != nil {
188202
return nil, errors.Trace(err)
189203
}
@@ -214,13 +228,15 @@ func (c *Client) CreateMapping(index string, docType string, mapping map[string]
214228
return errors.Trace(err)
215229
}
216230

217-
// index doesn't exist, create index first
218-
if r.Code != http.StatusOK {
231+
// if index doesn't exist, will get 404 not found, create index first
232+
if r.Code == http.StatusNotFound {
219233
_, err = c.Do("PUT", reqUrl, nil)
220234

221235
if err != nil {
222236
return errors.Trace(err)
223237
}
238+
} else if r.Code != http.StatusOK {
239+
return errors.Errorf("Error: %s, code: %d", http.StatusText(r.Code), r.Code)
224240
}
225241

226242
reqUrl = fmt.Sprintf("http://%s/%s/%s/_mapping", c.Addr,

elastic/client_test.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,11 @@ type elasticTestSuite struct {
2222
var _ = Suite(&elasticTestSuite{})
2323

2424
func (s *elasticTestSuite) SetUpSuite(c *C) {
25-
s.c = NewClient(fmt.Sprintf("%s:%d", *host, *port))
25+
cfg := new(ClientConfig)
26+
cfg.Addr = fmt.Sprintf("%s:%d", *host, *port)
27+
cfg.User = ""
28+
cfg.Password = ""
29+
s.c = NewClient(cfg)
2630
}
2731

2832
func (s *elasticTestSuite) TearDownSuite(c *C) {
@@ -59,9 +63,6 @@ func (s *elasticTestSuite) TestSimple(c *C) {
5963
err = s.c.Delete(index, docType, "1")
6064
c.Assert(err, IsNil)
6165

62-
err = s.c.Delete(index, docType, "1")
63-
c.Assert(err, IsNil)
64-
6566
exists, err = s.c.Exists(index, docType, "1")
6667
c.Assert(err, IsNil)
6768
c.Assert(exists, Equals, false)
@@ -126,6 +127,7 @@ func (s *elasticTestSuite) TestParent(c *C) {
126127
c.Assert(err, IsNil)
127128
c.Assert(resp.Code, Equals, 200)
128129
c.Assert(resp.Errors, Equals, false)
130+
129131
for i := 0; i < 10; i++ {
130132
id := fmt.Sprintf("%d", i)
131133
req := new(BulkRequest)

etc/river.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@ my_charset = "utf8"
77

88
# Elasticsearch address
99
es_addr = "127.0.0.1:9200"
10+
# Elasticsearch user and password, maybe set by shield, nginx, or x-pack
11+
es_user = ""
12+
es_pass = ""
1013

1114
# Path to store data, like master.info, if not set or empty,
1215
# we must use this to support breakpoint resume syncing.

river/config.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@ type Config struct {
1919
MyPassword string `toml:"my_pass"`
2020
MyCharset string `toml:"my_charset"`
2121

22-
ESAddr string `toml:"es_addr"`
22+
ESAddr string `toml:"es_addr"`
23+
ESUser string `toml:"es_user"`
24+
ESPassword string `toml:"es_pass"`
2325

2426
StatAddr string `toml:"stat_addr"`
2527

river/river.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,12 @@ func NewRiver(c *Config) (*River, error) {
6666
return nil, errors.Trace(err)
6767
}
6868

69-
r.es = elastic.NewClient(r.c.ESAddr)
69+
cfg := new(elastic.ClientConfig)
70+
cfg.Addr = r.c.ESAddr
71+
cfg.User = r.c.ESUser
72+
cfg.Password = r.c.ESPassword
73+
r.es = elastic.NewClient(cfg)
74+
7075

7176
r.st = &stat{r: r}
7277
go r.st.Run(r.c.StatAddr)

river/river_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,8 @@ my_user = "root"
124124
my_pass = ""
125125
my_charset = "utf8"
126126
es_addr = "127.0.0.1:9200"
127-
127+
es_user = ""
128+
es_pass = ""
128129
data_dir = "./var"
129130
130131
[[source]]

river/sync.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -439,7 +439,7 @@ func (r *River) doBulk(reqs []*elastic.BulkRequest) error {
439439
if resp, err := r.es.Bulk(reqs); err != nil {
440440
log.Errorf("sync docs err %v after binlog %s", err, r.canal.SyncedPosition())
441441
return errors.Trace(err)
442-
} else if resp.Errors {
442+
} else if resp.Code / 100 == 2 || resp.Errors {
443443
for i := 0; i < len(resp.Items); i++ {
444444
for action, item := range resp.Items[i] {
445445
if len(item.Error) > 0 {

0 commit comments

Comments
 (0)