diff --git a/elastic/client.go b/elastic/client.go index 659a5871..24bbe3f1 100644 --- a/elastic/client.go +++ b/elastic/client.go @@ -144,6 +144,20 @@ type BulkResponseItem struct { Found bool `json:"found"` } +type MappingResponse struct { + Code int + Mapping Mapping +} + +type Mapping map[string]struct { + Mappings map[string]struct { + Properties map[string]struct { + Type string `json:"type"` + Fields interface{} `json:"fields"` + } `json:"properties"` + } `json:"mappings"` +} + func (c *Client) DoRequest(method string, url string, body *bytes.Buffer) (*http.Response, error) { req, err := http.NewRequest(method, url, body) if err != nil { @@ -249,6 +263,34 @@ func (c *Client) CreateMapping(index string, docType string, mapping map[string] return errors.Trace(err) } +func (c *Client) GetMapping(index string, docType string) (*MappingResponse, error){ + reqUrl := fmt.Sprintf("http://%s/%s/%s/_mapping", c.Addr, + url.QueryEscape(index), + url.QueryEscape(docType)) + buf := bytes.NewBuffer(nil) + resp, err := c.DoRequest("GET", reqUrl, buf) + + if err != nil { + return nil, errors.Trace(err) + } + + defer resp.Body.Close() + + data, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, errors.Trace(err) + } + + ret := new(MappingResponse) + err = json.Unmarshal(data, &ret.Mapping) + if err != nil { + return nil, errors.Trace(err) + } + + ret.Code = resp.StatusCode + return ret, errors.Trace(err) +} + func (c *Client) DeleteIndex(index string) error { reqUrl := fmt.Sprintf("http://%s/%s", c.Addr, url.QueryEscape(index)) diff --git a/river/river_test.go b/river/river_test.go index 98e11636..1cb177c5 100644 --- a/river/river_test.go +++ b/river/river_test.go @@ -10,6 +10,7 @@ import ( . "github.com/pingcap/check" "github.com/siddontang/go-mysql-elasticsearch/elastic" "github.com/siddontang/go-mysql/client" + "github.com/siddontang/go-mysql/mysql" ) var my_addr = flag.String("my_addr", "127.0.0.1:3306", "MySQL addr") @@ -42,6 +43,7 @@ func (s *riverTestSuite) SetUpSuite(c *C) { tenum ENUM("e1", "e2", "e3"), tset SET("a", "b", "c"), tbit BIT(1) default 1, + tdatetime DATETIME DEFAULT NULL, PRIMARY KEY(id)) ENGINE=INNODB; ` @@ -214,6 +216,9 @@ func (s *riverTestSuite) testPrepareData(c *C) { table := fmt.Sprintf("test_river_%04d", i) s.testExecute(c, fmt.Sprintf("INSERT INTO %s (id, title, content, tenum, tset) VALUES (?, ?, ?, ?, ?)", table), 5+i, "abc", "hello", "e1", "a,b,c") } + + datetime := time.Now().Format(mysql.TimeFormat) + s.testExecute(c, "INSERT INTO test_river (id, title, content, tenum, tset, tdatetime) VALUES (?, ?, ?, ?, ?, ?)", 16, "test datetime", "hello go 16", "e1", "a,b", datetime) } func (s *riverTestSuite) testElasticGet(c *C, id string) *elastic.Response { @@ -226,6 +231,17 @@ func (s *riverTestSuite) testElasticGet(c *C, id string) *elastic.Response { return r } +func (s *riverTestSuite) testElasticMapping(c *C) *elastic.MappingResponse { + index := "river" + docType := "river" + + r, err := s.r.es.GetMapping(index, docType) + c.Assert(err, IsNil) + + c.Assert(r.Mapping[index].Mappings[docType].Properties["tdatetime"].Type, Equals, "date") + return r +} + func testWaitSyncDone(c *C, r *River) { <-r.canal.WaitDumpDone() @@ -250,6 +266,10 @@ func (s *riverTestSuite) TestRiver(c *C) { testWaitSyncDone(c, s.r) + var mr *elastic.MappingResponse + mr = s.testElasticMapping(c) + c.Assert(mr.Code, Equals, 200) + var r *elastic.Response r = s.testElasticGet(c, "1") c.Assert(r.Found, Equals, true) diff --git a/river/sync.go b/river/sync.go index 8d556b1b..01c0c586 100644 --- a/river/sync.go +++ b/river/sync.go @@ -311,6 +311,12 @@ func (r *River) makeReqColumnData(col *schema.TableColumn, value interface{}) in if err == nil && f != nil { return f } + case schema.TYPE_DATETIME: + switch v := value.(type) { + case string: + vt, _ := time.ParseInLocation(mysql.TimeFormat, string(v), time.Local) + return vt.Format(time.RFC3339) + } } return value