diff --git a/elastic/client_test.go b/elastic/client_test.go index a9650b98..e73084b6 100644 --- a/elastic/client_test.go +++ b/elastic/client_test.go @@ -5,7 +5,7 @@ import ( "fmt" "testing" - . "gopkg.in/check.v1" + . "github.com/pingcap/check" ) var host = flag.String("host", "127.0.0.1", "Elasticsearch host") @@ -104,15 +104,15 @@ func (s *elasticTestSuite) TestParent(c *C) { ParentType := "parent" mapping := map[string]interface{}{ - docType: map[string]interface{}{ - "_parent": map[string]string{"type": ParentType}, + docType: map[string]interface{}{ + "_parent": map[string]string{"type": ParentType}, }, } - err := s.c.CreateMapping(index, docType, mapping) + err := s.c.CreateMapping(index, docType, mapping) c.Assert(err, IsNil) - + items := make([]*BulkRequest, 10) - + for i := 0; i < 10; i++ { id := fmt.Sprintf("%d", i) req := new(BulkRequest) diff --git a/etc/river.toml b/etc/river.toml index 553b0062..fed3d3e5 100644 --- a/etc/river.toml +++ b/etc/river.toml @@ -29,6 +29,10 @@ flavor = "mysql" # if not set or empty, ignore mysqldump. mysqldump = "mysqldump" +# if we have no privilege to use mysqldump with --master-data, +# we must skip it. +#skip_master_data = false + # minimal items to be inserted in one bulk bulk_size = 128 diff --git a/glide.lock b/glide.lock index c1c7fe56..f29f5cfb 100644 --- a/glide.lock +++ b/glide.lock @@ -1,12 +1,14 @@ -hash: c686d0a27cf308a19feac59dedb96533a1ccc93d9fdb126199fec8b495c6de4b -updated: 2017-04-18T22:33:52.637105842+08:00 +hash: 15e32db533c048351476925d28ad5c486e652ff31e71fa5445c791ccce1f815e +updated: 2017-07-15T21:44:41.838612712+08:00 imports: - name: github.com/BurntSushi/toml version: 056c9bc7be7190eaa7715723883caffa5f8fa3e4 - name: github.com/juju/errors version: 6f54ff6318409d31ff16261533ce2c8381a4fd5d - name: github.com/ngaut/log - version: cec23d3e10b016363780d894a0eb732a12c06e02 + version: d2af3a61f64d093457fb23b25d20f4ce3cd551ce +- name: github.com/pingcap/check + version: ce8a2f822ab1e245a4eefcef2996531c79c943f1 - name: github.com/satori/go.uuid version: 879c5887cd475cd7864858769793b2ceb0d44feb - name: github.com/siddontang/go @@ -16,7 +18,7 @@ imports: - ioutil2 - sync2 - name: github.com/siddontang/go-mysql - version: ead11cac47bd127a8c667efa07f171a9143d8a25 + version: 96156dbfdf7556c67eb0889781c072f93295078d subpackages: - canal - client @@ -26,9 +28,7 @@ imports: - replication - schema - name: golang.org/x/net - version: 6acef71eb69611914f7a30939ea9f6e194c78172 + version: f01ecb60fe3835d80d9a0b7b2bf24b228c89260e subpackages: - context -- name: gopkg.in/check.v1 - version: 11d3bc7aa68e238947792f30573146a3231fc0f1 testImports: [] diff --git a/glide.yaml b/glide.yaml index e84692f4..19b60262 100644 --- a/glide.yaml +++ b/glide.yaml @@ -7,7 +7,7 @@ import: - package: github.com/satori/go.uuid version: ^1.1.0 - package: github.com/siddontang/go-mysql - version: ead11cac47bd127a8c667efa07f171a9143d8a25 + version: 96156dbfdf7556c67eb0889781c072f93295078d subpackages: - canal - client @@ -22,5 +22,5 @@ import: - hack - ioutil2 - sync2 -- package: gopkg.in/check.v1 - version: 11d3bc7aa68e238947792f30573146a3231fc0f1 +- package: github.com/pingcap/check + version: ce8a2f822ab1e245a4eefcef2996531c79c943f1 diff --git a/river/config.go b/river/config.go index a5777dca..9d9fcd68 100644 --- a/river/config.go +++ b/river/config.go @@ -29,7 +29,8 @@ type Config struct { Flavor string `toml:"flavor"` DataDir string `toml:"data_dir"` - DumpExec string `toml:"mysqldump"` + DumpExec string `toml:"mysqldump"` + SkipMasterData bool `toml:"skip_master_data"` Sources []SourceConfig `toml:"source"` diff --git a/river/river.go b/river/river.go index d60507b4..aefab5ef 100644 --- a/river/river.go +++ b/river/river.go @@ -72,7 +72,6 @@ func NewRiver(c *Config) (*River, error) { cfg.Password = r.c.ESPassword r.es = elastic.NewClient(cfg) - r.st = &stat{r: r} go r.st.Run(r.c.StatAddr) @@ -90,6 +89,7 @@ func (r *River) newCanal() error { cfg.ServerID = r.c.ServerID cfg.Dump.ExecutionPath = r.c.DumpExec cfg.Dump.DiscardErr = false + cfg.Dump.SkipMasterData = r.c.SkipMasterData var err error r.canal, err = canal.NewCanal(cfg) diff --git a/river/river_extra_test.go b/river/river_extra_test.go index 84b4c894..e2d47481 100644 --- a/river/river_extra_test.go +++ b/river/river_extra_test.go @@ -7,7 +7,7 @@ import ( "os" "time" - . "gopkg.in/check.v1" + . "github.com/pingcap/check" ) func (s *riverTestSuite) setupExtra(c *C) (r *River) { diff --git a/river/river_test.go b/river/river_test.go index 5c555a24..98e11636 100644 --- a/river/river_test.go +++ b/river/river_test.go @@ -7,9 +7,9 @@ import ( "testing" "time" + . "github.com/pingcap/check" "github.com/siddontang/go-mysql-elasticsearch/elastic" "github.com/siddontang/go-mysql/client" - . "gopkg.in/check.v1" ) var my_addr = flag.String("my_addr", "127.0.0.1:3306", "MySQL addr") @@ -110,9 +110,9 @@ func (s *riverTestSuite) SetUpSuite(c *C) { }, &Rule{Schema: "test", - Table: "test_for_json", - Index: "river", - Type: "river", + Table: "test_for_json", + Index: "river", + Type: "river", }, } @@ -263,11 +263,11 @@ func (s *riverTestSuite) TestRiver(c *C) { c.Assert(r.Found, Equals, true) switch v := r.Source["info"].(type) { case map[string]interface{}: - c.Assert(v["first"], Equals, "a") - c.Assert(v["second"], Equals, "b") + c.Assert(v["first"], Equals, "a") + c.Assert(v["second"], Equals, "b") default: c.Assert(v, Equals, nil) - c.Assert(true, Equals, false) + c.Assert(true, Equals, false) } r = s.testElasticGet(c, "100") diff --git a/vendor/github.com/ngaut/log/crash_darwin.go b/vendor/github.com/ngaut/log/crash_darwin.go new file mode 100644 index 00000000..14ce5590 --- /dev/null +++ b/vendor/github.com/ngaut/log/crash_darwin.go @@ -0,0 +1,18 @@ +// +build darwin + +package log + +import ( + "log" + "os" + "syscall" +) + +func CrashLog(file string) { + f, err := os.OpenFile(file, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666) + if err != nil { + log.Println(err.Error()) + } else { + syscall.Dup2(int(f.Fd()), 2) + } +} diff --git a/vendor/github.com/ngaut/log/crash_unix.go b/vendor/github.com/ngaut/log/crash_unix.go index 37f407de..5b9bc23a 100644 --- a/vendor/github.com/ngaut/log/crash_unix.go +++ b/vendor/github.com/ngaut/log/crash_unix.go @@ -1,4 +1,4 @@ -// +build freebsd openbsd netbsd dragonfly darwin linux +// +build freebsd openbsd netbsd dragonfly linux package log @@ -13,6 +13,6 @@ func CrashLog(file string) { if err != nil { log.Println(err.Error()) } else { - syscall.Dup2(int(f.Fd()), 2) + syscall.Dup3(int(f.Fd()), 2, 0) } } diff --git a/vendor/gopkg.in/check.v1/benchmark.go b/vendor/github.com/pingcap/check/benchmark.go similarity index 100% rename from vendor/gopkg.in/check.v1/benchmark.go rename to vendor/github.com/pingcap/check/benchmark.go diff --git a/vendor/gopkg.in/check.v1/check.go b/vendor/github.com/pingcap/check/check.go similarity index 100% rename from vendor/gopkg.in/check.v1/check.go rename to vendor/github.com/pingcap/check/check.go diff --git a/vendor/gopkg.in/check.v1/checkers.go b/vendor/github.com/pingcap/check/checkers.go similarity index 100% rename from vendor/gopkg.in/check.v1/checkers.go rename to vendor/github.com/pingcap/check/checkers.go diff --git a/vendor/github.com/pingcap/check/checkers2.go b/vendor/github.com/pingcap/check/checkers2.go new file mode 100644 index 00000000..c09bcdc5 --- /dev/null +++ b/vendor/github.com/pingcap/check/checkers2.go @@ -0,0 +1,131 @@ +// Extensions to the go-check unittest framework. +// +// NOTE: see https://github.com/go-check/check/pull/6 for reasons why these +// checkers live here. +package check + +import ( + "bytes" + "reflect" +) + +// ----------------------------------------------------------------------- +// IsTrue / IsFalse checker. + +type isBoolValueChecker struct { + *CheckerInfo + expected bool +} + +func (checker *isBoolValueChecker) Check( + params []interface{}, + names []string) ( + result bool, + error string) { + + obtained, ok := params[0].(bool) + if !ok { + return false, "Argument to " + checker.Name + " must be bool" + } + + return obtained == checker.expected, "" +} + +// The IsTrue checker verifies that the obtained value is true. +// +// For example: +// +// c.Assert(value, IsTrue) +// +var IsTrue Checker = &isBoolValueChecker{ + &CheckerInfo{Name: "IsTrue", Params: []string{"obtained"}}, + true, +} + +// The IsFalse checker verifies that the obtained value is false. +// +// For example: +// +// c.Assert(value, IsFalse) +// +var IsFalse Checker = &isBoolValueChecker{ + &CheckerInfo{Name: "IsFalse", Params: []string{"obtained"}}, + false, +} + +// ----------------------------------------------------------------------- +// BytesEquals checker. + +type bytesEquals struct{} + +func (b *bytesEquals) Check(params []interface{}, names []string) (bool, string) { + if len(params) != 2 { + return false, "BytesEqual takes 2 bytestring arguments" + } + b1, ok1 := params[0].([]byte) + b2, ok2 := params[1].([]byte) + + if !(ok1 && ok2) { + return false, "Arguments to BytesEqual must both be bytestrings" + } + + return bytes.Equal(b1, b2), "" +} + +func (b *bytesEquals) Info() *CheckerInfo { + return &CheckerInfo{ + Name: "BytesEquals", + Params: []string{"bytes_one", "bytes_two"}, + } +} + +// BytesEquals checker compares two bytes sequence using bytes.Equal. +// +// For example: +// +// c.Assert(b, BytesEquals, []byte("bar")) +// +// Main difference between DeepEquals and BytesEquals is that BytesEquals treats +// `nil` as empty byte sequence while DeepEquals doesn't. +// +// c.Assert(nil, BytesEquals, []byte("")) // succeeds +// c.Assert(nil, DeepEquals, []byte("")) // fails +var BytesEquals = &bytesEquals{} + +// ----------------------------------------------------------------------- +// HasKey checker. + +type hasKey struct{} + +func (h *hasKey) Check(params []interface{}, names []string) (bool, string) { + if len(params) != 2 { + return false, "HasKey takes 2 arguments: a map and a key" + } + + mapValue := reflect.ValueOf(params[0]) + if mapValue.Kind() != reflect.Map { + return false, "First argument to HasKey must be a map" + } + + keyValue := reflect.ValueOf(params[1]) + if !keyValue.Type().AssignableTo(mapValue.Type().Key()) { + return false, "Second argument must be assignable to the map key type" + } + + return mapValue.MapIndex(keyValue).IsValid(), "" +} + +func (h *hasKey) Info() *CheckerInfo { + return &CheckerInfo{ + Name: "HasKey", + Params: []string{"obtained", "key"}, + } +} + +// The HasKey checker verifies that the obtained map contains the given key. +// +// For example: +// +// c.Assert(myMap, HasKey, "foo") +// +var HasKey = &hasKey{} diff --git a/vendor/github.com/pingcap/check/compare.go b/vendor/github.com/pingcap/check/compare.go new file mode 100644 index 00000000..7005cba7 --- /dev/null +++ b/vendor/github.com/pingcap/check/compare.go @@ -0,0 +1,161 @@ +package check + +import ( + "bytes" + "fmt" + "reflect" + "time" +) + +type compareFunc func(v1 interface{}, v2 interface{}) (bool, error) + +type valueCompare struct { + Name string + + Func compareFunc + + Operator string +} + +// v1 and v2 must have the same type +// return >0 if v1 > v2 +// return 0 if v1 = v2 +// return <0 if v1 < v2 +// now we only support int, uint, float64, string and []byte comparison +func compare(v1 interface{}, v2 interface{}) (int, error) { + value1 := reflect.ValueOf(v1) + value2 := reflect.ValueOf(v2) + + switch v1.(type) { + case int, int8, int16, int32, int64: + a1 := value1.Int() + a2 := value2.Int() + if a1 > a2 { + return 1, nil + } else if a1 == a2 { + return 0, nil + } + return -1, nil + case uint, uint8, uint16, uint32, uint64: + a1 := value1.Uint() + a2 := value2.Uint() + if a1 > a2 { + return 1, nil + } else if a1 == a2 { + return 0, nil + } + return -1, nil + case float32, float64: + a1 := value1.Float() + a2 := value2.Float() + if a1 > a2 { + return 1, nil + } else if a1 == a2 { + return 0, nil + } + return -1, nil + case string: + a1 := value1.String() + a2 := value2.String() + if a1 > a2 { + return 1, nil + } else if a1 == a2 { + return 0, nil + } + return -1, nil + case []byte: + a1 := value1.Bytes() + a2 := value2.Bytes() + return bytes.Compare(a1, a2), nil + case time.Time: + a1 := v1.(time.Time) + a2 := v2.(time.Time) + if a1.After(a2) { + return 1, nil + } else if a1.Equal(a2) { + return 0, nil + } + return -1, nil + case time.Duration: + a1 := v1.(time.Duration) + a2 := v2.(time.Duration) + if a1 > a2 { + return 1, nil + } else if a1 == a2 { + return 0, nil + } + return -1, nil + default: + return 0, fmt.Errorf("type %T is not supported now", v1) + } +} + +func less(v1 interface{}, v2 interface{}) (bool, error) { + n, err := compare(v1, v2) + if err != nil { + return false, err + } + + return n < 0, nil +} + +func lessEqual(v1 interface{}, v2 interface{}) (bool, error) { + n, err := compare(v1, v2) + if err != nil { + return false, err + } + + return n <= 0, nil +} + +func greater(v1 interface{}, v2 interface{}) (bool, error) { + n, err := compare(v1, v2) + if err != nil { + return false, err + } + + return n > 0, nil +} + +func greaterEqual(v1 interface{}, v2 interface{}) (bool, error) { + n, err := compare(v1, v2) + if err != nil { + return false, err + } + + return n >= 0, nil +} + +func (v *valueCompare) Check(params []interface{}, names []string) (bool, string) { + if len(params) != 2 { + return false, fmt.Sprintf("%s needs 2 arguments", v.Name) + } + + v1 := params[0] + v2 := params[1] + v1Type := reflect.TypeOf(v1) + v2Type := reflect.TypeOf(v2) + + if v1Type.Kind() != v2Type.Kind() { + return false, fmt.Sprintf("%s needs two same type, but %s != %s", v.Name, v1Type.Kind(), v2Type.Kind()) + } + + b, err := v.Func(v1, v2) + if err != nil { + return false, fmt.Sprintf("%s check err %v", v.Name, err) + } + + return b, "" +} + +func (v *valueCompare) Info() *CheckerInfo { + return &CheckerInfo{ + Name: v.Name, + Params: []string{"compare_one", "compare_two"}, + } +} + +var Less = &valueCompare{Name: "Less", Func: less, Operator: "<"} +var LessEqual = &valueCompare{Name: "LessEqual", Func: lessEqual, Operator: "<="} +var Greater = &valueCompare{Name: "Greater", Func: greater, Operator: ">"} +var GreaterEqual = &valueCompare{Name: "GreaterEqual", Func: greaterEqual, Operator: ">="} diff --git a/vendor/gopkg.in/check.v1/helpers.go b/vendor/github.com/pingcap/check/helpers.go similarity index 100% rename from vendor/gopkg.in/check.v1/helpers.go rename to vendor/github.com/pingcap/check/helpers.go diff --git a/vendor/gopkg.in/check.v1/printer.go b/vendor/github.com/pingcap/check/printer.go similarity index 100% rename from vendor/gopkg.in/check.v1/printer.go rename to vendor/github.com/pingcap/check/printer.go diff --git a/vendor/gopkg.in/check.v1/run.go b/vendor/github.com/pingcap/check/run.go similarity index 100% rename from vendor/gopkg.in/check.v1/run.go rename to vendor/github.com/pingcap/check/run.go diff --git a/vendor/github.com/siddontang/go-mysql/canal/canal.go b/vendor/github.com/siddontang/go-mysql/canal/canal.go index aa5cd65d..d7685b9f 100644 --- a/vendor/github.com/siddontang/go-mysql/canal/canal.go +++ b/vendor/github.com/siddontang/go-mysql/canal/canal.go @@ -101,8 +101,10 @@ func (c *Canal) prepareDumper() error { c.dumper.AddTables(tableDB, tables...) } - charset := c.cfg.Charset - c.dumper.SetCharset(charset) + charset := c.cfg.Charset + c.dumper.SetCharset(charset) + + c.dumper.SkipMasterData(c.cfg.Dump.SkipMasterData) for _, ignoreTable := range c.cfg.Dump.IgnoreTables { if seps := strings.Split(ignoreTable, ","); len(seps) == 2 { @@ -269,7 +271,7 @@ func (c *Canal) prepareSyncer() error { Port: uint16(port), User: c.cfg.User, Password: c.cfg.Password, - Charset : c.cfg.Charset, + Charset: c.cfg.Charset, } c.syncer = replication.NewBinlogSyncer(&cfg) diff --git a/vendor/github.com/siddontang/go-mysql/canal/config.go b/vendor/github.com/siddontang/go-mysql/canal/config.go index 8f01d321..567e6d6d 100644 --- a/vendor/github.com/siddontang/go-mysql/canal/config.go +++ b/vendor/github.com/siddontang/go-mysql/canal/config.go @@ -4,9 +4,10 @@ import ( "io/ioutil" "math/rand" "time" - "github.com/siddontang/go-mysql/mysql" + "github.com/BurntSushi/toml" "github.com/juju/errors" + "github.com/siddontang/go-mysql/mysql" ) type DumpConfig struct { @@ -19,12 +20,16 @@ type DumpConfig struct { TableDB string `toml:"table_db"` Databases []string `toml:"dbs"` - + // Ignore table format is db.table IgnoreTables []string `toml:"ignore_tables"` // If true, discard error msg, else, output to stderr DiscardErr bool `toml:"discard_err"` + + // Set true to skip --master-data if we have no privilege to do + // 'FLUSH TABLES WITH READ LOCK' + SkipMasterData bool `toml:"skip_master_data"` } type Config struct { @@ -74,6 +79,7 @@ func NewDefaultConfig() *Config { c.Dump.ExecutionPath = "mysqldump" c.Dump.DiscardErr = true + c.Dump.SkipMasterData = false return c } diff --git a/vendor/github.com/siddontang/go-mysql/canal/dump.go b/vendor/github.com/siddontang/go-mysql/canal/dump.go index 68d42ce6..5e0d2152 100644 --- a/vendor/github.com/siddontang/go-mysql/canal/dump.go +++ b/vendor/github.com/siddontang/go-mysql/canal/dump.go @@ -106,6 +106,16 @@ func (c *Canal) tryDump() error { h := &dumpParseHandler{c: c} + if c.cfg.Dump.SkipMasterData { + pos, err := c.GetMasterPos() + if err != nil { + return errors.Trace(err) + } + log.Infof("skip master data, get current binlog position %v", pos) + h.name = pos.Name + h.pos = uint64(pos.Pos) + } + start := time.Now() log.Info("try dump MySQL and parse") if err := c.dumper.DumpAndParse(h); err != nil { diff --git a/vendor/github.com/siddontang/go-mysql/canal/sync.go b/vendor/github.com/siddontang/go-mysql/canal/sync.go index e22bd6e8..3e028e96 100644 --- a/vendor/github.com/siddontang/go-mysql/canal/sync.go +++ b/vendor/github.com/siddontang/go-mysql/canal/sync.go @@ -134,14 +134,23 @@ func (c *Canal) WaitUntilPos(pos mysql.Position, timeout time.Duration) error { return nil } -func (c *Canal) CatchMasterPos(timeout time.Duration) error { +func (c *Canal) GetMasterPos() (mysql.Position, error) { rr, err := c.Execute("SHOW MASTER STATUS") if err != nil { - return errors.Trace(err) + return mysql.Position{"", 0}, errors.Trace(err) } name, _ := rr.GetString(0, 0) pos, _ := rr.GetInt(0, 1) - return c.WaitUntilPos(mysql.Position{name, uint32(pos)}, timeout) + return mysql.Position{name, uint32(pos)}, nil +} + +func (c *Canal) CatchMasterPos(timeout time.Duration) error { + pos, err := c.GetMasterPos() + if err != nil { + return errors.Trace(err) + } + + return c.WaitUntilPos(pos, timeout) } diff --git a/vendor/github.com/siddontang/go-mysql/dump/dump.go b/vendor/github.com/siddontang/go-mysql/dump/dump.go index da3e5b3b..1a4bfd3e 100644 --- a/vendor/github.com/siddontang/go-mysql/dump/dump.go +++ b/vendor/github.com/siddontang/go-mysql/dump/dump.go @@ -6,8 +6,9 @@ import ( "os" "os/exec" "strings" - . "github.com/siddontang/go-mysql/mysql" + "github.com/juju/errors" + . "github.com/siddontang/go-mysql/mysql" ) // Unlick mysqldump, Dumper is designed for parsing and syning data easily. @@ -30,6 +31,8 @@ type Dumper struct { IgnoreTables map[string][]string ErrOut io.Writer + + masterDataSkipped bool } func NewDumper(executionPath string, addr string, user string, password string) (*Dumper, error) { @@ -51,6 +54,7 @@ func NewDumper(executionPath string, addr string, user string, password string) d.Databases = make([]string, 0, 16) d.Charset = DEFAULT_CHARSET d.IgnoreTables = make(map[string][]string) + d.masterDataSkipped = false d.ErrOut = os.Stderr @@ -65,6 +69,11 @@ func (d *Dumper) SetErrOut(o io.Writer) { d.ErrOut = o } +// In some cloud MySQL, we have no privilege to use `--master-data`. +func (d *Dumper) SkipMasterData(v bool) { + d.masterDataSkipped = v +} + func (d *Dumper) AddDatabases(dbs ...string) { d.Databases = append(d.Databases, dbs...) } @@ -104,7 +113,10 @@ func (d *Dumper) Dump(w io.Writer) error { args = append(args, fmt.Sprintf("--user=%s", d.User)) args = append(args, fmt.Sprintf("--password=%s", d.Password)) - args = append(args, "--master-data") + if !d.masterDataSkipped { + args = append(args, "--master-data") + } + args = append(args, "--single-transaction") args = append(args, "--skip-lock-tables") @@ -158,7 +170,7 @@ func (d *Dumper) DumpAndParse(h ParseHandler) error { done := make(chan error, 1) go func() { - err := Parse(r, h) + err := Parse(r, h, !d.masterDataSkipped) r.CloseWithError(err) done <- err }() diff --git a/vendor/github.com/siddontang/go-mysql/dump/parser.go b/vendor/github.com/siddontang/go-mysql/dump/parser.go index 69f65c7b..bd2a1577 100644 --- a/vendor/github.com/siddontang/go-mysql/dump/parser.go +++ b/vendor/github.com/siddontang/go-mysql/dump/parser.go @@ -34,7 +34,7 @@ func init() { // Parse the dump data with Dumper generate. // It can not parse all the data formats with mysqldump outputs -func Parse(r io.Reader, h ParseHandler) error { +func Parse(r io.Reader, h ParseHandler, parseBinlogPos bool) error { rb := bufio.NewReaderSize(r, 1024*16) var db string @@ -50,7 +50,7 @@ func Parse(r io.Reader, h ParseHandler) error { line = line[0 : len(line)-1] - if !binlogParsed { + if parseBinlogPos && !binlogParsed { if m := binlogExp.FindAllStringSubmatch(line, -1); len(m) == 1 { name := m[0][1] pos, err := strconv.ParseUint(m[0][2], 10, 64) diff --git a/vendor/github.com/siddontang/go-mysql/mysql/mysql_gtid.go b/vendor/github.com/siddontang/go-mysql/mysql/mysql_gtid.go index c54ded09..0f424426 100644 --- a/vendor/github.com/siddontang/go-mysql/mysql/mysql_gtid.go +++ b/vendor/github.com/siddontang/go-mysql/mysql/mysql_gtid.go @@ -291,11 +291,13 @@ type MysqlGTIDSet struct { func ParseMysqlGTIDSet(str string) (GTIDSet, error) { s := new(MysqlGTIDSet) + s.Sets = make(map[string]*UUIDSet) + if str == "" { + return s, nil + } sp := strings.Split(str, ",") - s.Sets = make(map[string]*UUIDSet, len(sp)) - //todo, handle redundant same uuid for i := 0; i < len(sp); i++ { if set, err := ParseUUIDSet(sp[i]); err != nil { @@ -334,6 +336,9 @@ func DecodeMysqlGTIDSet(data []byte) (*MysqlGTIDSet, error) { } func (s *MysqlGTIDSet) AddSet(set *UUIDSet) { + if set == nil { + return + } sid := set.SID.String() o, ok := s.Sets[sid] if ok { diff --git a/vendor/github.com/siddontang/go-mysql/replication/binlogsyncer.go b/vendor/github.com/siddontang/go-mysql/replication/binlogsyncer.go index 8f69814a..a06dbd74 100644 --- a/vendor/github.com/siddontang/go-mysql/replication/binlogsyncer.go +++ b/vendor/github.com/siddontang/go-mysql/replication/binlogsyncer.go @@ -4,6 +4,7 @@ import ( "crypto/tls" "encoding/binary" "fmt" + "net" "os" "sync" "time" @@ -57,6 +58,9 @@ type BinlogSyncerConfig struct { ParseTime bool LogLevel string + + // RecvBufferSize sets the size in bytes of the operating system's receive buffer associated with the connection. + RecvBufferSize int } // BinlogSyncer syncs binlog event from server. @@ -154,7 +158,13 @@ func (b *BinlogSyncer) registerSlave() error { return errors.Trace(err) } if len(b.cfg.Charset) != 0 { - b.c.SetCharset(b.cfg.Charset) + b.c.SetCharset(b.cfg.Charset) + } + + if b.cfg.RecvBufferSize > 0 { + if tcp, ok := b.c.Conn.Conn.(*net.TCPConn); ok { + tcp.SetReadBuffer(b.cfg.RecvBufferSize) + } } //for mysql 5.6+, binlog has a crc32 checksum diff --git a/vendor/github.com/siddontang/go-mysql/replication/parser.go b/vendor/github.com/siddontang/go-mysql/replication/parser.go index 056a17b5..0e22a187 100644 --- a/vendor/github.com/siddontang/go-mysql/replication/parser.go +++ b/vendor/github.com/siddontang/go-mysql/replication/parser.go @@ -60,8 +60,6 @@ func (p *BinlogParser) ParseFile(name string, offset int64, onEvent OnEventFunc) } func (p *BinlogParser) ParseReader(r io.Reader, onEvent OnEventFunc) error { - p.Reset() - var err error var n int64 @@ -102,7 +100,10 @@ func (p *BinlogParser) ParseReader(r io.Reader, onEvent OnEventFunc) error { var e Event e, err = p.parseEvent(h, data) if err != nil { - break + if _, ok := err.(errMissingTableMapEvent); ok { + continue + } + return errors.Trace(err) } if err = onEvent(&BinlogEvent{rawData, h, e}); err != nil { diff --git a/vendor/github.com/siddontang/go-mysql/replication/row_event.go b/vendor/github.com/siddontang/go-mysql/replication/row_event.go index a6032641..7cb8241f 100644 --- a/vendor/github.com/siddontang/go-mysql/replication/row_event.go +++ b/vendor/github.com/siddontang/go-mysql/replication/row_event.go @@ -15,6 +15,8 @@ import ( "github.com/siddontang/go/hack" ) +type errMissingTableMapEvent error + type TableMapEvent struct { tableIDSize int @@ -259,7 +261,11 @@ func (e *RowsEvent) Decode(data []byte) error { var ok bool e.Table, ok = e.tables[e.TableID] if !ok { - return errors.Errorf("invalid table id %d, no correspond table map event", e.TableID) + if len(e.tables) > 0 { + return errors.Errorf("invalid table id %d, no corresponding table map event", e.TableID) + } else { + return errMissingTableMapEvent(errors.Errorf("invalid table id %d, no corresponding table map event", e.TableID)) + } } var err error diff --git a/vendor/golang.org/x/net/context/context.go b/vendor/golang.org/x/net/context/context.go index 77b64d0c..f143ed6a 100644 --- a/vendor/golang.org/x/net/context/context.go +++ b/vendor/golang.org/x/net/context/context.go @@ -7,7 +7,7 @@ // and between processes. // // Incoming requests to a server should create a Context, and outgoing calls to -// servers should accept a Context. The chain of function calls between must +// servers should accept a Context. The chain of function calls between must // propagate the Context, optionally replacing it with a modified copy created // using WithDeadline, WithTimeout, WithCancel, or WithValue. // @@ -16,14 +16,14 @@ // propagation: // // Do not store Contexts inside a struct type; instead, pass a Context -// explicitly to each function that needs it. The Context should be the first +// explicitly to each function that needs it. The Context should be the first // parameter, typically named ctx: // // func DoSomething(ctx context.Context, arg Arg) error { // // ... use ctx ... // } // -// Do not pass a nil Context, even if a function permits it. Pass context.TODO +// Do not pass a nil Context, even if a function permits it. Pass context.TODO // if you are unsure about which Context to use. // // Use context Values only for request-scoped data that transits processes and @@ -36,12 +36,7 @@ // Contexts. package context // import "golang.org/x/net/context" -import ( - "errors" - "fmt" - "sync" - "time" -) +import "time" // A Context carries a deadline, a cancelation signal, and other values across // API boundaries. @@ -49,13 +44,13 @@ import ( // Context's methods may be called by multiple goroutines simultaneously. type Context interface { // Deadline returns the time when work done on behalf of this context - // should be canceled. Deadline returns ok==false when no deadline is - // set. Successive calls to Deadline return the same results. + // should be canceled. Deadline returns ok==false when no deadline is + // set. Successive calls to Deadline return the same results. Deadline() (deadline time.Time, ok bool) // Done returns a channel that's closed when work done on behalf of this - // context should be canceled. Done may return nil if this context can - // never be canceled. Successive calls to Done return the same value. + // context should be canceled. Done may return nil if this context can + // never be canceled. Successive calls to Done return the same value. // // WithCancel arranges for Done to be closed when cancel is called; // WithDeadline arranges for Done to be closed when the deadline @@ -66,7 +61,7 @@ type Context interface { // // // Stream generates values with DoSomething and sends them to out // // until DoSomething returns an error or ctx.Done is closed. - // func Stream(ctx context.Context, out <-chan Value) error { + // func Stream(ctx context.Context, out chan<- Value) error { // for { // v, err := DoSomething(ctx) // if err != nil { @@ -84,24 +79,24 @@ type Context interface { // a Done channel for cancelation. Done() <-chan struct{} - // Err returns a non-nil error value after Done is closed. Err returns + // Err returns a non-nil error value after Done is closed. Err returns // Canceled if the context was canceled or DeadlineExceeded if the - // context's deadline passed. No other values for Err are defined. + // context's deadline passed. No other values for Err are defined. // After Done is closed, successive calls to Err return the same value. Err() error // Value returns the value associated with this context for key, or nil - // if no value is associated with key. Successive calls to Value with + // if no value is associated with key. Successive calls to Value with // the same key returns the same result. // // Use context values only for request-scoped data that transits // processes and API boundaries, not for passing optional parameters to // functions. // - // A key identifies a specific value in a Context. Functions that wish + // A key identifies a specific value in a Context. Functions that wish // to store values in Context typically allocate a key in a global // variable then use that key as the argument to context.WithValue and - // Context.Value. A key can be any type that supports equality; + // Context.Value. A key can be any type that supports equality; // packages should define keys as an unexported type to avoid // collisions. // @@ -120,7 +115,7 @@ type Context interface { // // This prevents collisions with keys defined in other packages. // type key int // - // // userKey is the key for user.User values in Contexts. It is + // // userKey is the key for user.User values in Contexts. It is // // unexported; clients use user.NewContext and user.FromContext // // instead of using this key directly. // var userKey key = 0 @@ -138,57 +133,15 @@ type Context interface { Value(key interface{}) interface{} } -// Canceled is the error returned by Context.Err when the context is canceled. -var Canceled = errors.New("context canceled") - -// DeadlineExceeded is the error returned by Context.Err when the context's -// deadline passes. -var DeadlineExceeded = errors.New("context deadline exceeded") - -// An emptyCtx is never canceled, has no values, and has no deadline. It is not -// struct{}, since vars of this type must have distinct addresses. -type emptyCtx int - -func (*emptyCtx) Deadline() (deadline time.Time, ok bool) { - return -} - -func (*emptyCtx) Done() <-chan struct{} { - return nil -} - -func (*emptyCtx) Err() error { - return nil -} - -func (*emptyCtx) Value(key interface{}) interface{} { - return nil -} - -func (e *emptyCtx) String() string { - switch e { - case background: - return "context.Background" - case todo: - return "context.TODO" - } - return "unknown empty Context" -} - -var ( - background = new(emptyCtx) - todo = new(emptyCtx) -) - // Background returns a non-nil, empty Context. It is never canceled, has no -// values, and has no deadline. It is typically used by the main function, +// values, and has no deadline. It is typically used by the main function, // initialization, and tests, and as the top-level Context for incoming // requests. func Background() Context { return background } -// TODO returns a non-nil, empty Context. Code should use context.TODO when +// TODO returns a non-nil, empty Context. Code should use context.TODO when // it's unclear which Context to use or it is not yet available (because the // surrounding function has not yet been extended to accept a Context // parameter). TODO is recognized by static analysis tools that determine @@ -201,247 +154,3 @@ func TODO() Context { // A CancelFunc does not wait for the work to stop. // After the first call, subsequent calls to a CancelFunc do nothing. type CancelFunc func() - -// WithCancel returns a copy of parent with a new Done channel. The returned -// context's Done channel is closed when the returned cancel function is called -// or when the parent context's Done channel is closed, whichever happens first. -// -// Canceling this context releases resources associated with it, so code should -// call cancel as soon as the operations running in this Context complete. -func WithCancel(parent Context) (ctx Context, cancel CancelFunc) { - c := newCancelCtx(parent) - propagateCancel(parent, &c) - return &c, func() { c.cancel(true, Canceled) } -} - -// newCancelCtx returns an initialized cancelCtx. -func newCancelCtx(parent Context) cancelCtx { - return cancelCtx{ - Context: parent, - done: make(chan struct{}), - } -} - -// propagateCancel arranges for child to be canceled when parent is. -func propagateCancel(parent Context, child canceler) { - if parent.Done() == nil { - return // parent is never canceled - } - if p, ok := parentCancelCtx(parent); ok { - p.mu.Lock() - if p.err != nil { - // parent has already been canceled - child.cancel(false, p.err) - } else { - if p.children == nil { - p.children = make(map[canceler]bool) - } - p.children[child] = true - } - p.mu.Unlock() - } else { - go func() { - select { - case <-parent.Done(): - child.cancel(false, parent.Err()) - case <-child.Done(): - } - }() - } -} - -// parentCancelCtx follows a chain of parent references until it finds a -// *cancelCtx. This function understands how each of the concrete types in this -// package represents its parent. -func parentCancelCtx(parent Context) (*cancelCtx, bool) { - for { - switch c := parent.(type) { - case *cancelCtx: - return c, true - case *timerCtx: - return &c.cancelCtx, true - case *valueCtx: - parent = c.Context - default: - return nil, false - } - } -} - -// removeChild removes a context from its parent. -func removeChild(parent Context, child canceler) { - p, ok := parentCancelCtx(parent) - if !ok { - return - } - p.mu.Lock() - if p.children != nil { - delete(p.children, child) - } - p.mu.Unlock() -} - -// A canceler is a context type that can be canceled directly. The -// implementations are *cancelCtx and *timerCtx. -type canceler interface { - cancel(removeFromParent bool, err error) - Done() <-chan struct{} -} - -// A cancelCtx can be canceled. When canceled, it also cancels any children -// that implement canceler. -type cancelCtx struct { - Context - - done chan struct{} // closed by the first cancel call. - - mu sync.Mutex - children map[canceler]bool // set to nil by the first cancel call - err error // set to non-nil by the first cancel call -} - -func (c *cancelCtx) Done() <-chan struct{} { - return c.done -} - -func (c *cancelCtx) Err() error { - c.mu.Lock() - defer c.mu.Unlock() - return c.err -} - -func (c *cancelCtx) String() string { - return fmt.Sprintf("%v.WithCancel", c.Context) -} - -// cancel closes c.done, cancels each of c's children, and, if -// removeFromParent is true, removes c from its parent's children. -func (c *cancelCtx) cancel(removeFromParent bool, err error) { - if err == nil { - panic("context: internal error: missing cancel error") - } - c.mu.Lock() - if c.err != nil { - c.mu.Unlock() - return // already canceled - } - c.err = err - close(c.done) - for child := range c.children { - // NOTE: acquiring the child's lock while holding parent's lock. - child.cancel(false, err) - } - c.children = nil - c.mu.Unlock() - - if removeFromParent { - removeChild(c.Context, c) - } -} - -// WithDeadline returns a copy of the parent context with the deadline adjusted -// to be no later than d. If the parent's deadline is already earlier than d, -// WithDeadline(parent, d) is semantically equivalent to parent. The returned -// context's Done channel is closed when the deadline expires, when the returned -// cancel function is called, or when the parent context's Done channel is -// closed, whichever happens first. -// -// Canceling this context releases resources associated with it, so code should -// call cancel as soon as the operations running in this Context complete. -func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc) { - if cur, ok := parent.Deadline(); ok && cur.Before(deadline) { - // The current deadline is already sooner than the new one. - return WithCancel(parent) - } - c := &timerCtx{ - cancelCtx: newCancelCtx(parent), - deadline: deadline, - } - propagateCancel(parent, c) - d := deadline.Sub(time.Now()) - if d <= 0 { - c.cancel(true, DeadlineExceeded) // deadline has already passed - return c, func() { c.cancel(true, Canceled) } - } - c.mu.Lock() - defer c.mu.Unlock() - if c.err == nil { - c.timer = time.AfterFunc(d, func() { - c.cancel(true, DeadlineExceeded) - }) - } - return c, func() { c.cancel(true, Canceled) } -} - -// A timerCtx carries a timer and a deadline. It embeds a cancelCtx to -// implement Done and Err. It implements cancel by stopping its timer then -// delegating to cancelCtx.cancel. -type timerCtx struct { - cancelCtx - timer *time.Timer // Under cancelCtx.mu. - - deadline time.Time -} - -func (c *timerCtx) Deadline() (deadline time.Time, ok bool) { - return c.deadline, true -} - -func (c *timerCtx) String() string { - return fmt.Sprintf("%v.WithDeadline(%s [%s])", c.cancelCtx.Context, c.deadline, c.deadline.Sub(time.Now())) -} - -func (c *timerCtx) cancel(removeFromParent bool, err error) { - c.cancelCtx.cancel(false, err) - if removeFromParent { - // Remove this timerCtx from its parent cancelCtx's children. - removeChild(c.cancelCtx.Context, c) - } - c.mu.Lock() - if c.timer != nil { - c.timer.Stop() - c.timer = nil - } - c.mu.Unlock() -} - -// WithTimeout returns WithDeadline(parent, time.Now().Add(timeout)). -// -// Canceling this context releases resources associated with it, so code should -// call cancel as soon as the operations running in this Context complete: -// -// func slowOperationWithTimeout(ctx context.Context) (Result, error) { -// ctx, cancel := context.WithTimeout(ctx, 100*time.Millisecond) -// defer cancel() // releases resources if slowOperation completes before timeout elapses -// return slowOperation(ctx) -// } -func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) { - return WithDeadline(parent, time.Now().Add(timeout)) -} - -// WithValue returns a copy of parent in which the value associated with key is -// val. -// -// Use context Values only for request-scoped data that transits processes and -// APIs, not for passing optional parameters to functions. -func WithValue(parent Context, key interface{}, val interface{}) Context { - return &valueCtx{parent, key, val} -} - -// A valueCtx carries a key-value pair. It implements Value for that key and -// delegates all other calls to the embedded Context. -type valueCtx struct { - Context - key, val interface{} -} - -func (c *valueCtx) String() string { - return fmt.Sprintf("%v.WithValue(%#v, %#v)", c.Context, c.key, c.val) -} - -func (c *valueCtx) Value(key interface{}) interface{} { - if c.key == key { - return c.val - } - return c.Context.Value(key) -} diff --git a/vendor/golang.org/x/net/context/go17.go b/vendor/golang.org/x/net/context/go17.go new file mode 100644 index 00000000..d20f52b7 --- /dev/null +++ b/vendor/golang.org/x/net/context/go17.go @@ -0,0 +1,72 @@ +// Copyright 2016 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// +build go1.7 + +package context + +import ( + "context" // standard library's context, as of Go 1.7 + "time" +) + +var ( + todo = context.TODO() + background = context.Background() +) + +// Canceled is the error returned by Context.Err when the context is canceled. +var Canceled = context.Canceled + +// DeadlineExceeded is the error returned by Context.Err when the context's +// deadline passes. +var DeadlineExceeded = context.DeadlineExceeded + +// WithCancel returns a copy of parent with a new Done channel. The returned +// context's Done channel is closed when the returned cancel function is called +// or when the parent context's Done channel is closed, whichever happens first. +// +// Canceling this context releases resources associated with it, so code should +// call cancel as soon as the operations running in this Context complete. +func WithCancel(parent Context) (ctx Context, cancel CancelFunc) { + ctx, f := context.WithCancel(parent) + return ctx, CancelFunc(f) +} + +// WithDeadline returns a copy of the parent context with the deadline adjusted +// to be no later than d. If the parent's deadline is already earlier than d, +// WithDeadline(parent, d) is semantically equivalent to parent. The returned +// context's Done channel is closed when the deadline expires, when the returned +// cancel function is called, or when the parent context's Done channel is +// closed, whichever happens first. +// +// Canceling this context releases resources associated with it, so code should +// call cancel as soon as the operations running in this Context complete. +func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc) { + ctx, f := context.WithDeadline(parent, deadline) + return ctx, CancelFunc(f) +} + +// WithTimeout returns WithDeadline(parent, time.Now().Add(timeout)). +// +// Canceling this context releases resources associated with it, so code should +// call cancel as soon as the operations running in this Context complete: +// +// func slowOperationWithTimeout(ctx context.Context) (Result, error) { +// ctx, cancel := context.WithTimeout(ctx, 100*time.Millisecond) +// defer cancel() // releases resources if slowOperation completes before timeout elapses +// return slowOperation(ctx) +// } +func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) { + return WithDeadline(parent, time.Now().Add(timeout)) +} + +// WithValue returns a copy of parent in which the value associated with key is +// val. +// +// Use context Values only for request-scoped data that transits processes and +// APIs, not for passing optional parameters to functions. +func WithValue(parent Context, key interface{}, val interface{}) Context { + return context.WithValue(parent, key, val) +} diff --git a/vendor/golang.org/x/net/context/pre_go17.go b/vendor/golang.org/x/net/context/pre_go17.go new file mode 100644 index 00000000..0f35592d --- /dev/null +++ b/vendor/golang.org/x/net/context/pre_go17.go @@ -0,0 +1,300 @@ +// Copyright 2014 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// +build !go1.7 + +package context + +import ( + "errors" + "fmt" + "sync" + "time" +) + +// An emptyCtx is never canceled, has no values, and has no deadline. It is not +// struct{}, since vars of this type must have distinct addresses. +type emptyCtx int + +func (*emptyCtx) Deadline() (deadline time.Time, ok bool) { + return +} + +func (*emptyCtx) Done() <-chan struct{} { + return nil +} + +func (*emptyCtx) Err() error { + return nil +} + +func (*emptyCtx) Value(key interface{}) interface{} { + return nil +} + +func (e *emptyCtx) String() string { + switch e { + case background: + return "context.Background" + case todo: + return "context.TODO" + } + return "unknown empty Context" +} + +var ( + background = new(emptyCtx) + todo = new(emptyCtx) +) + +// Canceled is the error returned by Context.Err when the context is canceled. +var Canceled = errors.New("context canceled") + +// DeadlineExceeded is the error returned by Context.Err when the context's +// deadline passes. +var DeadlineExceeded = errors.New("context deadline exceeded") + +// WithCancel returns a copy of parent with a new Done channel. The returned +// context's Done channel is closed when the returned cancel function is called +// or when the parent context's Done channel is closed, whichever happens first. +// +// Canceling this context releases resources associated with it, so code should +// call cancel as soon as the operations running in this Context complete. +func WithCancel(parent Context) (ctx Context, cancel CancelFunc) { + c := newCancelCtx(parent) + propagateCancel(parent, c) + return c, func() { c.cancel(true, Canceled) } +} + +// newCancelCtx returns an initialized cancelCtx. +func newCancelCtx(parent Context) *cancelCtx { + return &cancelCtx{ + Context: parent, + done: make(chan struct{}), + } +} + +// propagateCancel arranges for child to be canceled when parent is. +func propagateCancel(parent Context, child canceler) { + if parent.Done() == nil { + return // parent is never canceled + } + if p, ok := parentCancelCtx(parent); ok { + p.mu.Lock() + if p.err != nil { + // parent has already been canceled + child.cancel(false, p.err) + } else { + if p.children == nil { + p.children = make(map[canceler]bool) + } + p.children[child] = true + } + p.mu.Unlock() + } else { + go func() { + select { + case <-parent.Done(): + child.cancel(false, parent.Err()) + case <-child.Done(): + } + }() + } +} + +// parentCancelCtx follows a chain of parent references until it finds a +// *cancelCtx. This function understands how each of the concrete types in this +// package represents its parent. +func parentCancelCtx(parent Context) (*cancelCtx, bool) { + for { + switch c := parent.(type) { + case *cancelCtx: + return c, true + case *timerCtx: + return c.cancelCtx, true + case *valueCtx: + parent = c.Context + default: + return nil, false + } + } +} + +// removeChild removes a context from its parent. +func removeChild(parent Context, child canceler) { + p, ok := parentCancelCtx(parent) + if !ok { + return + } + p.mu.Lock() + if p.children != nil { + delete(p.children, child) + } + p.mu.Unlock() +} + +// A canceler is a context type that can be canceled directly. The +// implementations are *cancelCtx and *timerCtx. +type canceler interface { + cancel(removeFromParent bool, err error) + Done() <-chan struct{} +} + +// A cancelCtx can be canceled. When canceled, it also cancels any children +// that implement canceler. +type cancelCtx struct { + Context + + done chan struct{} // closed by the first cancel call. + + mu sync.Mutex + children map[canceler]bool // set to nil by the first cancel call + err error // set to non-nil by the first cancel call +} + +func (c *cancelCtx) Done() <-chan struct{} { + return c.done +} + +func (c *cancelCtx) Err() error { + c.mu.Lock() + defer c.mu.Unlock() + return c.err +} + +func (c *cancelCtx) String() string { + return fmt.Sprintf("%v.WithCancel", c.Context) +} + +// cancel closes c.done, cancels each of c's children, and, if +// removeFromParent is true, removes c from its parent's children. +func (c *cancelCtx) cancel(removeFromParent bool, err error) { + if err == nil { + panic("context: internal error: missing cancel error") + } + c.mu.Lock() + if c.err != nil { + c.mu.Unlock() + return // already canceled + } + c.err = err + close(c.done) + for child := range c.children { + // NOTE: acquiring the child's lock while holding parent's lock. + child.cancel(false, err) + } + c.children = nil + c.mu.Unlock() + + if removeFromParent { + removeChild(c.Context, c) + } +} + +// WithDeadline returns a copy of the parent context with the deadline adjusted +// to be no later than d. If the parent's deadline is already earlier than d, +// WithDeadline(parent, d) is semantically equivalent to parent. The returned +// context's Done channel is closed when the deadline expires, when the returned +// cancel function is called, or when the parent context's Done channel is +// closed, whichever happens first. +// +// Canceling this context releases resources associated with it, so code should +// call cancel as soon as the operations running in this Context complete. +func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc) { + if cur, ok := parent.Deadline(); ok && cur.Before(deadline) { + // The current deadline is already sooner than the new one. + return WithCancel(parent) + } + c := &timerCtx{ + cancelCtx: newCancelCtx(parent), + deadline: deadline, + } + propagateCancel(parent, c) + d := deadline.Sub(time.Now()) + if d <= 0 { + c.cancel(true, DeadlineExceeded) // deadline has already passed + return c, func() { c.cancel(true, Canceled) } + } + c.mu.Lock() + defer c.mu.Unlock() + if c.err == nil { + c.timer = time.AfterFunc(d, func() { + c.cancel(true, DeadlineExceeded) + }) + } + return c, func() { c.cancel(true, Canceled) } +} + +// A timerCtx carries a timer and a deadline. It embeds a cancelCtx to +// implement Done and Err. It implements cancel by stopping its timer then +// delegating to cancelCtx.cancel. +type timerCtx struct { + *cancelCtx + timer *time.Timer // Under cancelCtx.mu. + + deadline time.Time +} + +func (c *timerCtx) Deadline() (deadline time.Time, ok bool) { + return c.deadline, true +} + +func (c *timerCtx) String() string { + return fmt.Sprintf("%v.WithDeadline(%s [%s])", c.cancelCtx.Context, c.deadline, c.deadline.Sub(time.Now())) +} + +func (c *timerCtx) cancel(removeFromParent bool, err error) { + c.cancelCtx.cancel(false, err) + if removeFromParent { + // Remove this timerCtx from its parent cancelCtx's children. + removeChild(c.cancelCtx.Context, c) + } + c.mu.Lock() + if c.timer != nil { + c.timer.Stop() + c.timer = nil + } + c.mu.Unlock() +} + +// WithTimeout returns WithDeadline(parent, time.Now().Add(timeout)). +// +// Canceling this context releases resources associated with it, so code should +// call cancel as soon as the operations running in this Context complete: +// +// func slowOperationWithTimeout(ctx context.Context) (Result, error) { +// ctx, cancel := context.WithTimeout(ctx, 100*time.Millisecond) +// defer cancel() // releases resources if slowOperation completes before timeout elapses +// return slowOperation(ctx) +// } +func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) { + return WithDeadline(parent, time.Now().Add(timeout)) +} + +// WithValue returns a copy of parent in which the value associated with key is +// val. +// +// Use context Values only for request-scoped data that transits processes and +// APIs, not for passing optional parameters to functions. +func WithValue(parent Context, key interface{}, val interface{}) Context { + return &valueCtx{parent, key, val} +} + +// A valueCtx carries a key-value pair. It implements Value for that key and +// delegates all other calls to the embedded Context. +type valueCtx struct { + Context + key, val interface{} +} + +func (c *valueCtx) String() string { + return fmt.Sprintf("%v.WithValue(%#v, %#v)", c.Context, c.key, c.val) +} + +func (c *valueCtx) Value(key interface{}) interface{} { + if c.key == key { + return c.val + } + return c.Context.Value(key) +} diff --git a/vendor/gopkg.in/check.v1/LICENSE b/vendor/gopkg.in/check.v1/LICENSE deleted file mode 100644 index 545cf2d3..00000000 --- a/vendor/gopkg.in/check.v1/LICENSE +++ /dev/null @@ -1,25 +0,0 @@ -Gocheck - A rich testing framework for Go - -Copyright (c) 2010-2013 Gustavo Niemeyer - -All rights reserved. - -Redistribution and use in source and binary forms, with or without -modification, are permitted provided that the following conditions are met: - -1. Redistributions of source code must retain the above copyright notice, this - list of conditions and the following disclaimer. -2. Redistributions in binary form must reproduce the above copyright notice, - this list of conditions and the following disclaimer in the documentation - and/or other materials provided with the distribution. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND -ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED -WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE -DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR -ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES -(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; -LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND -ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS -SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.