diff --git a/elastic/client_test.go b/elastic/client_test.go index 74214317..00b72ee5 100644 --- a/elastic/client_test.go +++ b/elastic/client_test.go @@ -53,6 +53,7 @@ func (s *elasticTestSuite) TestSimple(c *C) { r, err := s.c.Get(index, docType, "1") c.Assert(err, IsNil) + c.Assert(r.Code, Equals, 200) c.Assert(r.ID, Equals, "1") err = s.c.Delete(index, docType, "1") @@ -78,6 +79,7 @@ func (s *elasticTestSuite) TestSimple(c *C) { resp, err := s.c.IndexTypeBulk(index, docType, items) c.Assert(err, IsNil) + c.Assert(resp.Code, Equals, 200) c.Assert(resp.Errors, Equals, false) for i := 0; i < 10; i++ { @@ -90,6 +92,7 @@ func (s *elasticTestSuite) TestSimple(c *C) { resp, err = s.c.IndexTypeBulk(index, docType, items) c.Assert(err, IsNil) + c.Assert(resp.Code, Equals, 200) c.Assert(resp.Errors, Equals, false) } @@ -97,9 +100,18 @@ func (s *elasticTestSuite) TestSimple(c *C) { func (s *elasticTestSuite) TestParent(c *C) { index := "dummy" docType := "comment" + ParentType := "parent" + mapping := map[string]interface{}{ + docType: map[string]interface{}{ + "_parent": map[string]string{"type": ParentType}, + }, + } + err := s.c.CreateMapping(index, docType, mapping) + c.Assert(err, IsNil) + items := make([]*BulkRequest, 10) - + for i := 0; i < 10; i++ { id := fmt.Sprintf("%d", i) req := new(BulkRequest) @@ -112,16 +124,20 @@ func (s *elasticTestSuite) TestParent(c *C) { resp, err := s.c.IndexTypeBulk(index, docType, items) c.Assert(err, IsNil) + c.Assert(resp.Code, Equals, 200) c.Assert(resp.Errors, Equals, false) - for i := 0; i < 10; i++ { id := fmt.Sprintf("%d", i) req := new(BulkRequest) + req.Index = index + req.Type = docType req.Action = ActionDelete req.ID = id + req.Parent = "1" items[i] = req } - resp, err = s.c.IndexTypeBulk(index, docType, items) + resp, err = s.c.Bulk(items) c.Assert(err, IsNil) + c.Assert(resp.Code, Equals, 200) c.Assert(resp.Errors, Equals, false) } diff --git a/etc/river.toml b/etc/river.toml index 9b02ac02..0b9d5d68 100644 --- a/etc/river.toml +++ b/etc/river.toml @@ -3,6 +3,7 @@ my_addr = "127.0.0.1:3306" my_user = "root" my_pass = "" +my_charset = "utf8" # Elasticsearch address es_addr = "127.0.0.1:9200" diff --git a/glide.lock b/glide.lock index 3c6a5f9a..c1c7fe56 100644 --- a/glide.lock +++ b/glide.lock @@ -16,7 +16,7 @@ imports: - ioutil2 - sync2 - name: github.com/siddontang/go-mysql - version: 3fef0652795ca3a9d47e4af1a2f6d885e574e9bb + version: ead11cac47bd127a8c667efa07f171a9143d8a25 subpackages: - canal - client diff --git a/glide.yaml b/glide.yaml index 888c5acc..e84692f4 100644 --- a/glide.yaml +++ b/glide.yaml @@ -7,7 +7,7 @@ import: - package: github.com/satori/go.uuid version: ^1.1.0 - package: github.com/siddontang/go-mysql - version: 3fef0652795ca3a9d47e4af1a2f6d885e574e9bb + version: ead11cac47bd127a8c667efa07f171a9143d8a25 subpackages: - canal - client diff --git a/river/config.go b/river/config.go index c21687ca..c6f626f3 100644 --- a/river/config.go +++ b/river/config.go @@ -17,6 +17,7 @@ type Config struct { MyAddr string `toml:"my_addr"` MyUser string `toml:"my_user"` MyPassword string `toml:"my_pass"` + MyCharset string `toml:"my_charset"` ESAddr string `toml:"es_addr"` diff --git a/river/river.go b/river/river.go index 1e056002..61e7121f 100644 --- a/river/river.go +++ b/river/river.go @@ -79,6 +79,7 @@ func (r *River) newCanal() error { cfg.Addr = r.c.MyAddr cfg.User = r.c.MyUser cfg.Password = r.c.MyPassword + cfg.Charset = r.c.MyCharset cfg.Flavor = r.c.Flavor cfg.ServerID = r.c.ServerID @@ -234,6 +235,7 @@ func (r *River) prepareRule() error { if len(rule.TableInfo.PKColumns) == 0 { return errors.Errorf("%s.%s must have a PK for a column", rule.Schema, rule.Table) } + } return nil diff --git a/river/river_test.go b/river/river_test.go index 38184da9..0cc7e28d 100644 --- a/river/river_test.go +++ b/river/river_test.go @@ -59,6 +59,7 @@ func (s *riverTestSuite) SetUpSuite(c *C) { cfg.MyAddr = *my_addr cfg.MyUser = "root" cfg.MyPassword = "" + cfg.MyCharset = "utf8" cfg.ESAddr = *es_addr cfg.ServerID = 1001 @@ -121,7 +122,7 @@ func (s *riverTestSuite) TestConfig(c *C) { my_addr = "127.0.0.1:3306" my_user = "root" my_pass = "" - +my_charset = "utf8" es_addr = "127.0.0.1:9200" data_dir = "./var" diff --git a/vendor/github.com/siddontang/go-mysql/canal/canal.go b/vendor/github.com/siddontang/go-mysql/canal/canal.go index 4d668776..aa5cd65d 100644 --- a/vendor/github.com/siddontang/go-mysql/canal/canal.go +++ b/vendor/github.com/siddontang/go-mysql/canal/canal.go @@ -101,6 +101,9 @@ func (c *Canal) prepareDumper() error { c.dumper.AddTables(tableDB, tables...) } + charset := c.cfg.Charset + c.dumper.SetCharset(charset) + for _, ignoreTable := range c.cfg.Dump.IgnoreTables { if seps := strings.Split(ignoreTable, ","); len(seps) == 2 { c.dumper.AddIgnoreTables(seps[0], seps[1]) @@ -266,6 +269,7 @@ func (c *Canal) prepareSyncer() error { Port: uint16(port), User: c.cfg.User, Password: c.cfg.Password, + Charset : c.cfg.Charset, } c.syncer = replication.NewBinlogSyncer(&cfg) diff --git a/vendor/github.com/siddontang/go-mysql/canal/config.go b/vendor/github.com/siddontang/go-mysql/canal/config.go index 50606c8c..8f01d321 100644 --- a/vendor/github.com/siddontang/go-mysql/canal/config.go +++ b/vendor/github.com/siddontang/go-mysql/canal/config.go @@ -4,7 +4,7 @@ import ( "io/ioutil" "math/rand" "time" - + "github.com/siddontang/go-mysql/mysql" "github.com/BurntSushi/toml" "github.com/juju/errors" ) @@ -19,7 +19,7 @@ type DumpConfig struct { TableDB string `toml:"table_db"` Databases []string `toml:"dbs"` - + // Ignore table format is db.table IgnoreTables []string `toml:"ignore_tables"` @@ -32,6 +32,7 @@ type Config struct { User string `toml:"user"` Password string `toml:"password"` + Charset string `toml:"charset"` ServerID uint32 `toml:"server_id"` Flavor string `toml:"flavor"` @@ -65,6 +66,7 @@ func NewDefaultConfig() *Config { c.User = "root" c.Password = "" + c.Charset = mysql.DEFAULT_CHARSET rand.Seed(time.Now().Unix()) c.ServerID = uint32(rand.Intn(1000)) + 1001 diff --git a/vendor/github.com/siddontang/go-mysql/dump/dump.go b/vendor/github.com/siddontang/go-mysql/dump/dump.go index a6ff2091..da3e5b3b 100644 --- a/vendor/github.com/siddontang/go-mysql/dump/dump.go +++ b/vendor/github.com/siddontang/go-mysql/dump/dump.go @@ -6,7 +6,7 @@ import ( "os" "os/exec" "strings" - + . "github.com/siddontang/go-mysql/mysql" "github.com/juju/errors" ) @@ -25,6 +25,8 @@ type Dumper struct { Databases []string + Charset string + IgnoreTables map[string][]string ErrOut io.Writer @@ -47,6 +49,7 @@ func NewDumper(executionPath string, addr string, user string, password string) d.Password = password d.Tables = make([]string, 0, 16) d.Databases = make([]string, 0, 16) + d.Charset = DEFAULT_CHARSET d.IgnoreTables = make(map[string][]string) d.ErrOut = os.Stderr @@ -54,6 +57,10 @@ func NewDumper(executionPath string, addr string, user string, password string) return d, nil } +func (d *Dumper) SetCharset(charset string) { + d.Charset = charset +} + func (d *Dumper) SetErrOut(o io.Writer) { d.ErrOut = o } @@ -133,6 +140,10 @@ func (d *Dumper) Dump(w io.Writer) error { w.Write([]byte(fmt.Sprintf("USE `%s`;\n", d.TableDB))) } + if len(d.Charset) != 0 { + args = append(args, fmt.Sprintf("--default-character-set=%s", d.Charset)) + } + cmd := exec.Command(d.ExecutionPath, args...) cmd.Stderr = d.ErrOut diff --git a/vendor/github.com/siddontang/go-mysql/replication/binlogsyncer.go b/vendor/github.com/siddontang/go-mysql/replication/binlogsyncer.go index b85e7cd2..8f69814a 100644 --- a/vendor/github.com/siddontang/go-mysql/replication/binlogsyncer.go +++ b/vendor/github.com/siddontang/go-mysql/replication/binlogsyncer.go @@ -40,6 +40,9 @@ type BinlogSyncerConfig struct { // If not set, use os.Hostname() instead. Localhost string + // Charset is for MySQL client character set + Charset string + // SemiSyncEnabled enables semi-sync or not. SemiSyncEnabled bool @@ -52,6 +55,8 @@ type BinlogSyncerConfig struct { // Use replication.Time structure for timestamp and datetime. // We will use Local location for timestamp and UTC location for datatime. ParseTime bool + + LogLevel string } // BinlogSyncer syncs binlog event from server. @@ -76,6 +81,11 @@ type BinlogSyncer struct { // NewBinlogSyncer creates the BinlogSyncer with cfg. func NewBinlogSyncer(cfg *BinlogSyncerConfig) *BinlogSyncer { + if cfg.LogLevel == "" { + cfg.LogLevel = "info" + } + log.SetLevelByString(cfg.LogLevel) + log.Infof("create BinlogSyncer with config %v", cfg) b := new(BinlogSyncer) @@ -143,6 +153,9 @@ func (b *BinlogSyncer) registerSlave() error { if err != nil { return errors.Trace(err) } + if len(b.cfg.Charset) != 0 { + b.c.SetCharset(b.cfg.Charset) + } //for mysql 5.6+, binlog has a crc32 checksum //before mysql 5.6, this will not work, don't matter.:-) @@ -556,7 +569,7 @@ func (b *BinlogSyncer) parseEvent(s *BinlogStreamer, data []byte) error { data = data[2:] } - e, err := b.parser.parse(data) + e, err := b.parser.Parse(data) if err != nil { return errors.Trace(err) } diff --git a/vendor/github.com/siddontang/go-mysql/replication/parser.go b/vendor/github.com/siddontang/go-mysql/replication/parser.go index 6f727295..056a17b5 100644 --- a/vendor/github.com/siddontang/go-mysql/replication/parser.go +++ b/vendor/github.com/siddontang/go-mysql/replication/parser.go @@ -56,10 +56,10 @@ func (p *BinlogParser) ParseFile(name string, offset int64, onEvent OnEventFunc) return errors.Errorf("seek %s to %d error %v", name, offset, err) } - return p.parseReader(f, onEvent) + return p.ParseReader(f, onEvent) } -func (p *BinlogParser) parseReader(r io.Reader, onEvent OnEventFunc) error { +func (p *BinlogParser) ParseReader(r io.Reader, onEvent OnEventFunc) error { p.Reset() var err error @@ -212,7 +212,13 @@ func (p *BinlogParser) parseEvent(h *EventHeader, data []byte) (Event, error) { return e, nil } -func (p *BinlogParser) parse(data []byte) (*BinlogEvent, error) { +// Given the bytes for a a binary log event: return the decoded event. +// With the exception of the FORMAT_DESCRIPTION_EVENT event type +// there must have previously been passed a FORMAT_DESCRIPTION_EVENT +// into the parser for this to work properly on any given event. +// Passing a new FORMAT_DESCRIPTION_EVENT into the parser will replace +// an existing one. +func (p *BinlogParser) Parse(data []byte) (*BinlogEvent, error) { rawData := data h, err := p.parseHeader(data)