Skip to content

Commit e14f509

Browse files
hentersiddontang
authored andcommitted
support TYPE_DATETIME to es date (RFC3339 UTC) (#143)
1 parent 9b77acc commit e14f509

File tree

3 files changed

+68
-0
lines changed

3 files changed

+68
-0
lines changed

elastic/client.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,20 @@ type BulkResponseItem struct {
144144
Found bool `json:"found"`
145145
}
146146

147+
type MappingResponse struct {
148+
Code int
149+
Mapping Mapping
150+
}
151+
152+
type Mapping map[string]struct {
153+
Mappings map[string]struct {
154+
Properties map[string]struct {
155+
Type string `json:"type"`
156+
Fields interface{} `json:"fields"`
157+
} `json:"properties"`
158+
} `json:"mappings"`
159+
}
160+
147161
func (c *Client) DoRequest(method string, url string, body *bytes.Buffer) (*http.Response, error) {
148162
req, err := http.NewRequest(method, url, body)
149163
req.Header.Add("Content-Type", "application/json")
@@ -250,6 +264,34 @@ func (c *Client) CreateMapping(index string, docType string, mapping map[string]
250264
return errors.Trace(err)
251265
}
252266

267+
func (c *Client) GetMapping(index string, docType string) (*MappingResponse, error){
268+
reqUrl := fmt.Sprintf("http://%s/%s/%s/_mapping", c.Addr,
269+
url.QueryEscape(index),
270+
url.QueryEscape(docType))
271+
buf := bytes.NewBuffer(nil)
272+
resp, err := c.DoRequest("GET", reqUrl, buf)
273+
274+
if err != nil {
275+
return nil, errors.Trace(err)
276+
}
277+
278+
defer resp.Body.Close()
279+
280+
data, err := ioutil.ReadAll(resp.Body)
281+
if err != nil {
282+
return nil, errors.Trace(err)
283+
}
284+
285+
ret := new(MappingResponse)
286+
err = json.Unmarshal(data, &ret.Mapping)
287+
if err != nil {
288+
return nil, errors.Trace(err)
289+
}
290+
291+
ret.Code = resp.StatusCode
292+
return ret, errors.Trace(err)
293+
}
294+
253295
func (c *Client) DeleteIndex(index string) error {
254296
reqUrl := fmt.Sprintf("http://%s/%s", c.Addr,
255297
url.QueryEscape(index))

river/river_test.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
. "github.com/pingcap/check"
1111
"github.com/siddontang/go-mysql-elasticsearch/elastic"
1212
"github.com/siddontang/go-mysql/client"
13+
"github.com/siddontang/go-mysql/mysql"
1314
)
1415

1516
var my_addr = flag.String("my_addr", "127.0.0.1:3306", "MySQL addr")
@@ -42,6 +43,7 @@ func (s *riverTestSuite) SetUpSuite(c *C) {
4243
tenum ENUM("e1", "e2", "e3"),
4344
tset SET("a", "b", "c"),
4445
tbit BIT(1) default 1,
46+
tdatetime DATETIME DEFAULT NULL,
4547
PRIMARY KEY(id)) ENGINE=INNODB;
4648
`
4749

@@ -214,6 +216,9 @@ func (s *riverTestSuite) testPrepareData(c *C) {
214216
table := fmt.Sprintf("test_river_%04d", i)
215217
s.testExecute(c, fmt.Sprintf("INSERT INTO %s (id, title, content, tenum, tset) VALUES (?, ?, ?, ?, ?)", table), 5+i, "abc", "hello", "e1", "a,b,c")
216218
}
219+
220+
datetime := time.Now().Format(mysql.TimeFormat)
221+
s.testExecute(c, "INSERT INTO test_river (id, title, content, tenum, tset, tdatetime) VALUES (?, ?, ?, ?, ?, ?)", 16, "test datetime", "hello go 16", "e1", "a,b", datetime)
217222
}
218223

219224
func (s *riverTestSuite) testElasticGet(c *C, id string) *elastic.Response {
@@ -226,6 +231,17 @@ func (s *riverTestSuite) testElasticGet(c *C, id string) *elastic.Response {
226231
return r
227232
}
228233

234+
func (s *riverTestSuite) testElasticMapping(c *C) *elastic.MappingResponse {
235+
index := "river"
236+
docType := "river"
237+
238+
r, err := s.r.es.GetMapping(index, docType)
239+
c.Assert(err, IsNil)
240+
241+
c.Assert(r.Mapping[index].Mappings[docType].Properties["tdatetime"].Type, Equals, "date")
242+
return r
243+
}
244+
229245
func testWaitSyncDone(c *C, r *River) {
230246
<-r.canal.WaitDumpDone()
231247

@@ -250,6 +266,10 @@ func (s *riverTestSuite) TestRiver(c *C) {
250266

251267
testWaitSyncDone(c, s.r)
252268

269+
var mr *elastic.MappingResponse
270+
mr = s.testElasticMapping(c)
271+
c.Assert(mr.Code, Equals, 200)
272+
253273
var r *elastic.Response
254274
r = s.testElasticGet(c, "1")
255275
c.Assert(r.Found, Equals, true)

river/sync.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -311,6 +311,12 @@ func (r *River) makeReqColumnData(col *schema.TableColumn, value interface{}) in
311311
if err == nil && f != nil {
312312
return f
313313
}
314+
case schema.TYPE_DATETIME:
315+
switch v := value.(type) {
316+
case string:
317+
vt, _ := time.ParseInLocation(mysql.TimeFormat, string(v), time.Local)
318+
return vt.Format(time.RFC3339)
319+
}
314320
}
315321

316322
return value

0 commit comments

Comments
 (0)