diff --git a/elastic/client.go b/elastic/client.go index 72d11452..25935321 100644 --- a/elastic/client.go +++ b/elastic/client.go @@ -15,14 +15,25 @@ import ( // Because we only need some very simple usages. type Client struct { Addr string + User string + Password string c *http.Client } -func NewClient(addr string) *Client { +type ClientConfig struct { + Addr string + User string + Password string +} + + +func NewClient(conf *ClientConfig) *Client { c := new(Client) - c.Addr = addr + c.Addr = conf.Addr + c.User = conf.User + c.Password = conf.Password c.c = &http.Client{} @@ -134,20 +145,28 @@ type BulkResponseItem struct { Found bool `json:"found"` } -func (c *Client) Do(method string, url string, body map[string]interface{}) (*Response, error) { - bodyData, err := json.Marshal(body) +func (c *Client) DoRequest(method string, url string, body *bytes.Buffer) (*http.Response, error) { + req, err := http.NewRequest(method, url, body) if err != nil { return nil, errors.Trace(err) } + if len(c.User) > 0 && len(c.Password) > 0 { + req.SetBasicAuth(c.User, c.Password) + } + resp, err := c.c.Do(req) - buf := bytes.NewBuffer(bodyData) + return resp, err +} - req, err := http.NewRequest(method, url, buf) +func (c *Client) Do(method string, url string, body map[string]interface{}) (*Response, error) { + bodyData, err := json.Marshal(body) if err != nil { return nil, errors.Trace(err) } - resp, err := c.c.Do(req) + buf := bytes.NewBuffer(bodyData) + + resp, err := c.DoRequest(method, url, buf) if err != nil { return nil, errors.Trace(err) } @@ -178,12 +197,7 @@ func (c *Client) DoBulk(url string, items []*BulkRequest) (*BulkResponse, error) } } - req, err := http.NewRequest("POST", url, &buf) - if err != nil { - return nil, errors.Trace(err) - } - - resp, err := c.c.Do(req) + resp, err := c.DoRequest("POST", url, &buf) if err != nil { return nil, errors.Trace(err) } @@ -214,13 +228,15 @@ func (c *Client) CreateMapping(index string, docType string, mapping map[string] return errors.Trace(err) } - // index doesn't exist, create index first - if r.Code != http.StatusOK { + // if index doesn't exist, will get 404 not found, create index first + if r.Code == http.StatusNotFound { _, err = c.Do("PUT", reqUrl, nil) if err != nil { return errors.Trace(err) } + } else if r.Code != http.StatusOK { + return errors.Errorf("Error: %s, code: %d", http.StatusText(r.Code), r.Code) } reqUrl = fmt.Sprintf("http://%s/%s/%s/_mapping", c.Addr, diff --git a/elastic/client_test.go b/elastic/client_test.go index 00b72ee5..a9650b98 100644 --- a/elastic/client_test.go +++ b/elastic/client_test.go @@ -22,7 +22,11 @@ type elasticTestSuite struct { var _ = Suite(&elasticTestSuite{}) func (s *elasticTestSuite) SetUpSuite(c *C) { - s.c = NewClient(fmt.Sprintf("%s:%d", *host, *port)) + cfg := new(ClientConfig) + cfg.Addr = fmt.Sprintf("%s:%d", *host, *port) + cfg.User = "" + cfg.Password = "" + s.c = NewClient(cfg) } func (s *elasticTestSuite) TearDownSuite(c *C) { @@ -59,9 +63,6 @@ func (s *elasticTestSuite) TestSimple(c *C) { err = s.c.Delete(index, docType, "1") c.Assert(err, IsNil) - err = s.c.Delete(index, docType, "1") - c.Assert(err, IsNil) - exists, err = s.c.Exists(index, docType, "1") c.Assert(err, IsNil) c.Assert(exists, Equals, false) @@ -126,6 +127,7 @@ func (s *elasticTestSuite) TestParent(c *C) { c.Assert(err, IsNil) c.Assert(resp.Code, Equals, 200) c.Assert(resp.Errors, Equals, false) + for i := 0; i < 10; i++ { id := fmt.Sprintf("%d", i) req := new(BulkRequest) diff --git a/etc/river.toml b/etc/river.toml index 0b9d5d68..553b0062 100644 --- a/etc/river.toml +++ b/etc/river.toml @@ -7,6 +7,9 @@ my_charset = "utf8" # Elasticsearch address es_addr = "127.0.0.1:9200" +# Elasticsearch user and password, maybe set by shield, nginx, or x-pack +es_user = "" +es_pass = "" # Path to store data, like master.info, if not set or empty, # we must use this to support breakpoint resume syncing. diff --git a/river/config.go b/river/config.go index c6f626f3..a5777dca 100644 --- a/river/config.go +++ b/river/config.go @@ -19,7 +19,9 @@ type Config struct { MyPassword string `toml:"my_pass"` MyCharset string `toml:"my_charset"` - ESAddr string `toml:"es_addr"` + ESAddr string `toml:"es_addr"` + ESUser string `toml:"es_user"` + ESPassword string `toml:"es_pass"` StatAddr string `toml:"stat_addr"` diff --git a/river/river.go b/river/river.go index 61e7121f..d60507b4 100644 --- a/river/river.go +++ b/river/river.go @@ -66,7 +66,12 @@ func NewRiver(c *Config) (*River, error) { return nil, errors.Trace(err) } - r.es = elastic.NewClient(r.c.ESAddr) + cfg := new(elastic.ClientConfig) + cfg.Addr = r.c.ESAddr + cfg.User = r.c.ESUser + cfg.Password = r.c.ESPassword + r.es = elastic.NewClient(cfg) + r.st = &stat{r: r} go r.st.Run(r.c.StatAddr) diff --git a/river/river_test.go b/river/river_test.go index 0cc7e28d..3a32c649 100644 --- a/river/river_test.go +++ b/river/river_test.go @@ -124,7 +124,8 @@ my_user = "root" my_pass = "" my_charset = "utf8" es_addr = "127.0.0.1:9200" - +es_user = "" +es_pass = "" data_dir = "./var" [[source]] diff --git a/river/sync.go b/river/sync.go index 5f914d08..2c48d1a4 100644 --- a/river/sync.go +++ b/river/sync.go @@ -439,7 +439,7 @@ func (r *River) doBulk(reqs []*elastic.BulkRequest) error { if resp, err := r.es.Bulk(reqs); err != nil { log.Errorf("sync docs err %v after binlog %s", err, r.canal.SyncedPosition()) return errors.Trace(err) - } else if resp.Errors { + } else if resp.Code / 100 == 2 || resp.Errors { for i := 0; i < len(resp.Items); i++ { for action, item := range resp.Items[i] { if len(item.Error) > 0 {