diff options
Diffstat (limited to 'vendor/github.com/go-redis/redis')
20 files changed, 651 insertions, 309 deletions
diff --git a/vendor/github.com/go-redis/redis/.travis.yml b/vendor/github.com/go-redis/redis/.travis.yml index f8e0d652e..f4666c593 100644 --- a/vendor/github.com/go-redis/redis/.travis.yml +++ b/vendor/github.com/go-redis/redis/.travis.yml @@ -5,14 +5,14 @@ services: - redis-server go: - - 1.4 - - 1.7 - - 1.8 + - 1.4.x + - 1.7.x + - 1.8.x - tip matrix: allow_failures: - - go: 1.4 + - go: 1.4.x - go: tip install: diff --git a/vendor/github.com/go-redis/redis/README.md b/vendor/github.com/go-redis/redis/README.md index f3c61795e..fd036496d 100644 --- a/vendor/github.com/go-redis/redis/README.md +++ b/vendor/github.com/go-redis/redis/README.md @@ -102,7 +102,7 @@ Some corner cases: vals, err := client.ZInterStore("out", redis.ZStore{Weights: []int64{2, 3}}, "zset1", "zset2").Result() EVAL "return {KEYS[1],ARGV[1]}" 1 "key" "hello" - vals, err := client.Eval("return {KEYS[1],ARGV[1]}", []string{"key"}, []string{"hello"}).Result() + vals, err := client.Eval("return {KEYS[1],ARGV[1]}", []string{"key"}, "hello").Result() ## Benchmark diff --git a/vendor/github.com/go-redis/redis/cluster.go b/vendor/github.com/go-redis/redis/cluster.go index f758b01b9..647a25be3 100644 --- a/vendor/github.com/go-redis/redis/cluster.go +++ b/vendor/github.com/go-redis/redis/cluster.go @@ -28,18 +28,19 @@ type ClusterOptions struct { // Default is 16. MaxRedirects int - // Enables read queries for a connection to a Redis Cluster slave node. + // Enables read-only commands on slave nodes. ReadOnly bool - - // Enables routing read-only queries to the closest master or slave node. + // Allows routing read-only commands to the closest master or slave node. RouteByLatency bool // Following options are copied from Options struct. OnConnect func(*Conn) error - MaxRetries int - Password string + MaxRetries int + MinRetryBackoff time.Duration + MaxRetryBackoff time.Duration + Password string DialTimeout time.Duration ReadTimeout time.Duration @@ -62,6 +63,19 @@ func (opt *ClusterOptions) init() { if opt.RouteByLatency { opt.ReadOnly = true } + + switch opt.MinRetryBackoff { + case -1: + opt.MinRetryBackoff = 0 + case 0: + opt.MinRetryBackoff = 8 * time.Millisecond + } + switch opt.MaxRetryBackoff { + case -1: + opt.MaxRetryBackoff = 0 + case 0: + opt.MaxRetryBackoff = 512 * time.Millisecond + } } func (opt *ClusterOptions) clientOptions() *Options { @@ -70,9 +84,11 @@ func (opt *ClusterOptions) clientOptions() *Options { return &Options{ OnConnect: opt.OnConnect, - MaxRetries: opt.MaxRetries, - Password: opt.Password, - ReadOnly: opt.ReadOnly, + MaxRetries: opt.MaxRetries, + MinRetryBackoff: opt.MinRetryBackoff, + MaxRetryBackoff: opt.MaxRetryBackoff, + Password: opt.Password, + readOnly: opt.ReadOnly, DialTimeout: opt.DialTimeout, ReadTimeout: opt.ReadTimeout, @@ -91,7 +107,9 @@ func (opt *ClusterOptions) clientOptions() *Options { type clusterNode struct { Client *Client Latency time.Duration - loading time.Time + + loading time.Time + generation uint32 } func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode { @@ -122,6 +140,17 @@ func (n *clusterNode) Loading() bool { return !n.loading.IsZero() && time.Since(n.loading) < time.Minute } +func (n *clusterNode) Generation() uint32 { + return n.generation +} + +func (n *clusterNode) SetGeneration(gen uint32) { + if gen < n.generation { + panic("gen < n.generation") + } + n.generation = gen +} + //------------------------------------------------------------------------------ type clusterNodes struct { @@ -131,6 +160,8 @@ type clusterNodes struct { addrs []string nodes map[string]*clusterNode closed bool + + generation uint32 } func newClusterNodes(opt *ClusterOptions) *clusterNodes { @@ -161,6 +192,39 @@ func (c *clusterNodes) Close() error { return firstErr } +func (c *clusterNodes) NextGeneration() uint32 { + c.generation++ + return c.generation +} + +// GC removes unused nodes. +func (c *clusterNodes) GC(generation uint32) error { + var collected []*clusterNode + c.mu.Lock() + for i := 0; i < len(c.addrs); { + addr := c.addrs[i] + node := c.nodes[addr] + if node.Generation() >= generation { + i++ + continue + } + + c.addrs = append(c.addrs[:i], c.addrs[i+1:]...) + delete(c.nodes, addr) + collected = append(collected, node) + } + c.mu.Unlock() + + var firstErr error + for _, node := range collected { + if err := node.Client.Close(); err != nil && firstErr == nil { + firstErr = err + } + } + + return firstErr +} + func (c *clusterNodes) All() ([]*clusterNode, error) { c.mu.RLock() defer c.mu.RUnlock() @@ -176,7 +240,7 @@ func (c *clusterNodes) All() ([]*clusterNode, error) { return nodes, nil } -func (c *clusterNodes) Get(addr string) (*clusterNode, error) { +func (c *clusterNodes) GetOrCreate(addr string) (*clusterNode, error) { var node *clusterNode var ok bool @@ -223,7 +287,7 @@ func (c *clusterNodes) Random() (*clusterNode, error) { var nodeErr error for i := 0; i <= c.opt.MaxRedirects; i++ { n := rand.Intn(len(addrs)) - node, err := c.Get(addrs[n]) + node, err := c.GetOrCreate(addrs[n]) if err != nil { return nil, err } @@ -239,30 +303,45 @@ func (c *clusterNodes) Random() (*clusterNode, error) { //------------------------------------------------------------------------------ type clusterState struct { - nodes *clusterNodes + nodes *clusterNodes + masters []*clusterNode + slaves []*clusterNode + slots [][]*clusterNode + + generation uint32 } func newClusterState(nodes *clusterNodes, slots []ClusterSlot, origin string) (*clusterState, error) { c := clusterState{ - nodes: nodes, + nodes: nodes, + generation: nodes.NextGeneration(), + slots: make([][]*clusterNode, hashtag.SlotNumber), } isLoopbackOrigin := isLoopbackAddr(origin) for _, slot := range slots { var nodes []*clusterNode - for _, slotNode := range slot.Nodes { + for i, slotNode := range slot.Nodes { addr := slotNode.Addr if !isLoopbackOrigin && isLoopbackAddr(addr) { addr = origin } - node, err := c.nodes.Get(addr) + node, err := c.nodes.GetOrCreate(addr) if err != nil { return nil, err } + + node.SetGeneration(c.generation) nodes = append(nodes, node) + + if i == 0 { + c.masters = appendNode(c.masters, node) + } else { + c.slaves = appendNode(c.slaves, node) + } } for i := slot.Start; i <= slot.End; i++ { @@ -327,7 +406,7 @@ func (c *clusterState) slotClosestNode(slot int) (*clusterNode, error) { } func (c *clusterState) slotNodes(slot int) []*clusterNode { - if slot < len(c.slots) { + if slot >= 0 && slot < len(c.slots) { return c.slots[slot] } return nil @@ -348,7 +427,7 @@ type ClusterClient struct { cmdsInfoOnce internal.Once cmdsInfo map[string]*CommandInfo - // Reports where slots reloading is in progress. + // Reports whether slots reloading is in progress. reloading uint32 } @@ -365,12 +444,12 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient { // Add initial nodes. for _, addr := range opt.Addrs { - _, _ = c.nodes.Get(addr) + _, _ = c.nodes.GetOrCreate(addr) } // Preload cluster slots. for i := 0; i < 10; i++ { - state, err := c.reloadSlots() + state, err := c.reloadState() if err == nil { c._state.Store(state) break @@ -394,7 +473,7 @@ func (c *ClusterClient) state() *clusterState { if v != nil { return v.(*clusterState) } - c.lazyReloadSlots() + c.lazyReloadState() return nil } @@ -476,6 +555,10 @@ func (c *ClusterClient) Process(cmd Cmder) error { var ask bool for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ { + if attempt > 0 { + time.Sleep(node.Client.retryBackoff(attempt)) + } + if ask { pipe := node.Client.Pipeline() pipe.Process(NewCmd("ASKING")) @@ -487,19 +570,20 @@ func (c *ClusterClient) Process(cmd Cmder) error { err = node.Client.Process(cmd) } - // If there is no (real) error - we are done. + // If there is no error - we are done. if err == nil { return nil } // If slave is loading - read from master. if c.opt.ReadOnly && internal.IsLoadingError(err) { + // TODO: race node.loading = time.Now() continue } // On network errors try random node. - if internal.IsRetryableError(err) { + if internal.IsRetryableError(err) || internal.IsClusterDownError(err) { node, err = c.nodes.Random() if err != nil { cmd.setErr(err) @@ -516,11 +600,11 @@ func (c *ClusterClient) Process(cmd Cmder) error { if state != nil && slot >= 0 { master, _ := state.slotMasterNode(slot) if moved && (master == nil || master.Client.getAddr() != addr) { - c.lazyReloadSlots() + c.lazyReloadState() } } - node, err = c.nodes.Get(addr) + node, err = c.nodes.GetOrCreate(addr) if err != nil { cmd.setErr(err) return err @@ -535,17 +619,17 @@ func (c *ClusterClient) Process(cmd Cmder) error { return cmd.Err() } -// ForEachNode concurrently calls the fn on each ever known node in the cluster. +// ForEachMaster concurrently calls the fn on each master node in the cluster. // It returns the first error if any. -func (c *ClusterClient) ForEachNode(fn func(client *Client) error) error { - nodes, err := c.nodes.All() - if err != nil { - return err +func (c *ClusterClient) ForEachMaster(fn func(client *Client) error) error { + state := c.state() + if state == nil { + return errNilClusterState } var wg sync.WaitGroup errCh := make(chan error, 1) - for _, node := range nodes { + for _, master := range state.masters { wg.Add(1) go func(node *clusterNode) { defer wg.Done() @@ -556,7 +640,7 @@ func (c *ClusterClient) ForEachNode(fn func(client *Client) error) error { default: } } - }(node) + }(master) } wg.Wait() @@ -568,28 +652,17 @@ func (c *ClusterClient) ForEachNode(fn func(client *Client) error) error { } } -// ForEachMaster concurrently calls the fn on each master node in the cluster. +// ForEachSlave concurrently calls the fn on each slave node in the cluster. // It returns the first error if any. -func (c *ClusterClient) ForEachMaster(fn func(client *Client) error) error { +func (c *ClusterClient) ForEachSlave(fn func(client *Client) error) error { state := c.state() if state == nil { return errNilClusterState } var wg sync.WaitGroup - visited := make(map[*clusterNode]struct{}) errCh := make(chan error, 1) - for _, nodes := range state.slots { - if len(nodes) == 0 { - continue - } - - master := nodes[0] - if _, ok := visited[master]; ok { - continue - } - visited[master] = struct{}{} - + for _, slave := range state.slaves { wg.Add(1) go func(node *clusterNode) { defer wg.Done() @@ -600,7 +673,7 @@ func (c *ClusterClient) ForEachMaster(fn func(client *Client) error) error { default: } } - }(master) + }(slave) } wg.Wait() @@ -612,16 +685,64 @@ func (c *ClusterClient) ForEachMaster(fn func(client *Client) error) error { } } +// ForEachNode concurrently calls the fn on each known node in the cluster. +// It returns the first error if any. +func (c *ClusterClient) ForEachNode(fn func(client *Client) error) error { + state := c.state() + if state == nil { + return errNilClusterState + } + + var wg sync.WaitGroup + errCh := make(chan error, 1) + worker := func(node *clusterNode) { + defer wg.Done() + err := fn(node.Client) + if err != nil { + select { + case errCh <- err: + default: + } + } + } + + for _, node := range state.masters { + wg.Add(1) + go worker(node) + } + for _, node := range state.slaves { + wg.Add(1) + go worker(node) + } + + wg.Wait() + select { + case err := <-errCh: + return err + default: + return nil + } +} + // PoolStats returns accumulated connection pool stats. func (c *ClusterClient) PoolStats() *PoolStats { var acc PoolStats - nodes, err := c.nodes.All() - if err != nil { + state := c.state() + if state == nil { return &acc } - for _, node := range nodes { + for _, node := range state.masters { + s := node.Client.connPool.Stats() + acc.Requests += s.Requests + acc.Hits += s.Hits + acc.Timeouts += s.Timeouts + acc.TotalConns += s.TotalConns + acc.FreeConns += s.FreeConns + } + + for _, node := range state.slaves { s := node.Client.connPool.Stats() acc.Requests += s.Requests acc.Hits += s.Hits @@ -629,33 +750,42 @@ func (c *ClusterClient) PoolStats() *PoolStats { acc.TotalConns += s.TotalConns acc.FreeConns += s.FreeConns } + return &acc } -func (c *ClusterClient) lazyReloadSlots() { +func (c *ClusterClient) lazyReloadState() { if !atomic.CompareAndSwapUint32(&c.reloading, 0, 1) { return } go func() { - for i := 0; i < 1000; i++ { - state, err := c.reloadSlots() + defer atomic.StoreUint32(&c.reloading, 0) + + var state *clusterState + for { + var err error + state, err = c.reloadState() if err == pool.ErrClosed { - break + return } - if err == nil { - c._state.Store(state) - break + + if err != nil { + time.Sleep(time.Millisecond) + continue } - time.Sleep(time.Millisecond) + + c._state.Store(state) + break } time.Sleep(3 * time.Second) - atomic.StoreUint32(&c.reloading, 0) + c.nodes.GC(state.generation) }() } -func (c *ClusterClient) reloadSlots() (*clusterState, error) { +// Not thread-safe. +func (c *ClusterClient) reloadState() (*clusterState, error) { node, err := c.nodes.Random() if err != nil { return nil, err @@ -720,14 +850,14 @@ func (c *ClusterClient) pipelineExec(cmds []Cmder) error { failedCmds := make(map[*clusterNode][]Cmder) for node, cmds := range cmdsMap { - cn, _, err := node.Client.conn() + cn, _, err := node.Client.getConn() if err != nil { setCmdsErr(cmds, err) continue } err = c.pipelineProcessCmds(cn, cmds, failedCmds) - node.Client.putConn(cn, err) + node.Client.releaseConn(cn, err) } if len(failedCmds) == 0 { @@ -799,9 +929,9 @@ func (c *ClusterClient) pipelineReadCmds( func (c *ClusterClient) checkMovedErr(cmd Cmder, failedCmds map[*clusterNode][]Cmder) error { moved, ask, addr := internal.IsMovedError(cmd.Err()) if moved { - c.lazyReloadSlots() + c.lazyReloadState() - node, err := c.nodes.Get(addr) + node, err := c.nodes.GetOrCreate(addr) if err != nil { return err } @@ -809,7 +939,7 @@ func (c *ClusterClient) checkMovedErr(cmd Cmder, failedCmds map[*clusterNode][]C failedCmds[node] = append(failedCmds[node], cmd) } if ask { - node, err := c.nodes.Get(addr) + node, err := c.nodes.GetOrCreate(addr) if err != nil { return err } @@ -855,14 +985,14 @@ func (c *ClusterClient) txPipelineExec(cmds []Cmder) error { failedCmds := make(map[*clusterNode][]Cmder) for node, cmds := range cmdsMap { - cn, _, err := node.Client.conn() + cn, _, err := node.Client.getConn() if err != nil { setCmdsErr(cmds, err) continue } err = c.txPipelineProcessCmds(node, cn, cmds, failedCmds) - node.Client.putConn(cn, err) + node.Client.releaseConn(cn, err) } if len(failedCmds) == 0 { @@ -966,6 +1096,56 @@ func (c *ClusterClient) txPipelineReadQueued( return firstErr } +func (c *ClusterClient) pubSub(channels []string) *PubSub { + opt := c.opt.clientOptions() + + var node *clusterNode + return &PubSub{ + opt: opt, + + newConn: func(channels []string) (*pool.Conn, error) { + if node == nil { + var slot int + if len(channels) > 0 { + slot = hashtag.Slot(channels[0]) + } else { + slot = -1 + } + + masterNode, err := c.state().slotMasterNode(slot) + if err != nil { + return nil, err + } + node = masterNode + } + return node.Client.newConn() + }, + closeConn: func(cn *pool.Conn) error { + return node.Client.connPool.CloseConn(cn) + }, + } +} + +// Subscribe subscribes the client to the specified channels. +// Channels can be omitted to create empty subscription. +func (c *ClusterClient) Subscribe(channels ...string) *PubSub { + pubsub := c.pubSub(channels) + if len(channels) > 0 { + _ = pubsub.Subscribe(channels...) + } + return pubsub +} + +// PSubscribe subscribes the client to the given patterns. +// Patterns can be omitted to create empty subscription. +func (c *ClusterClient) PSubscribe(channels ...string) *PubSub { + pubsub := c.pubSub(channels) + if len(channels) > 0 { + _ = pubsub.PSubscribe(channels...) + } + return pubsub +} + func isLoopbackAddr(addr string) bool { host, _, err := net.SplitHostPort(addr) if err != nil { @@ -979,3 +1159,12 @@ func isLoopbackAddr(addr string) bool { return ip.IsLoopback() } + +func appendNode(nodes []*clusterNode, node *clusterNode) []*clusterNode { + for _, n := range nodes { + if n == node { + return nodes + } + } + return append(nodes, node) +} diff --git a/vendor/github.com/go-redis/redis/cluster_test.go b/vendor/github.com/go-redis/redis/cluster_test.go index 3a69255a4..324bd1ce1 100644 --- a/vendor/github.com/go-redis/redis/cluster_test.go +++ b/vendor/github.com/go-redis/redis/cluster_test.go @@ -75,7 +75,7 @@ func startCluster(scenario *clusterScenario) error { scenario.nodeIds[pos] = info[:40] } - // Meet cluster nodes + // Meet cluster nodes. for _, client := range scenario.clients { err := client.ClusterMeet("127.0.0.1", scenario.ports[0]).Err() if err != nil { @@ -83,7 +83,7 @@ func startCluster(scenario *clusterScenario) error { } } - // Bootstrap masters + // Bootstrap masters. slots := []int{0, 5000, 10000, 16384} for pos, master := range scenario.masters() { err := master.ClusterAddSlotsRange(slots[pos], slots[pos+1]-1).Err() @@ -92,7 +92,7 @@ func startCluster(scenario *clusterScenario) error { } } - // Bootstrap slaves + // Bootstrap slaves. for idx, slave := range scenario.slaves() { masterId := scenario.nodeIds[idx] @@ -115,7 +115,7 @@ func startCluster(scenario *clusterScenario) error { } } - // Wait until all nodes have consistent info + // Wait until all nodes have consistent info. for _, client := range scenario.clients { err := eventually(func() error { res, err := client.ClusterSlots().Result() @@ -189,62 +189,6 @@ var _ = Describe("ClusterClient", func() { var client *redis.ClusterClient assertClusterClient := func() { - It("should CLUSTER SLOTS", func() { - res, err := client.ClusterSlots().Result() - Expect(err).NotTo(HaveOccurred()) - Expect(res).To(HaveLen(3)) - - wanted := []redis.ClusterSlot{ - {0, 4999, []redis.ClusterNode{{"", "127.0.0.1:8220"}, {"", "127.0.0.1:8223"}}}, - {5000, 9999, []redis.ClusterNode{{"", "127.0.0.1:8221"}, {"", "127.0.0.1:8224"}}}, - {10000, 16383, []redis.ClusterNode{{"", "127.0.0.1:8222"}, {"", "127.0.0.1:8225"}}}, - } - Expect(assertSlotsEqual(res, wanted)).NotTo(HaveOccurred()) - }) - - It("should CLUSTER NODES", func() { - res, err := client.ClusterNodes().Result() - Expect(err).NotTo(HaveOccurred()) - Expect(len(res)).To(BeNumerically(">", 400)) - }) - - It("should CLUSTER INFO", func() { - res, err := client.ClusterInfo().Result() - Expect(err).NotTo(HaveOccurred()) - Expect(res).To(ContainSubstring("cluster_known_nodes:6")) - }) - - It("should CLUSTER KEYSLOT", func() { - hashSlot, err := client.ClusterKeySlot("somekey").Result() - Expect(err).NotTo(HaveOccurred()) - Expect(hashSlot).To(Equal(int64(hashtag.Slot("somekey")))) - }) - - It("should CLUSTER COUNT-FAILURE-REPORTS", func() { - n, err := client.ClusterCountFailureReports(cluster.nodeIds[0]).Result() - Expect(err).NotTo(HaveOccurred()) - Expect(n).To(Equal(int64(0))) - }) - - It("should CLUSTER COUNTKEYSINSLOT", func() { - n, err := client.ClusterCountKeysInSlot(10).Result() - Expect(err).NotTo(HaveOccurred()) - Expect(n).To(Equal(int64(0))) - }) - - It("should CLUSTER SAVECONFIG", func() { - res, err := client.ClusterSaveConfig().Result() - Expect(err).NotTo(HaveOccurred()) - Expect(res).To(Equal("OK")) - }) - - It("should CLUSTER SLAVES", func() { - nodesList, err := client.ClusterSlaves(cluster.nodeIds[0]).Result() - Expect(err).NotTo(HaveOccurred()) - Expect(nodesList).Should(ContainElement(ContainSubstring("slave"))) - Expect(nodesList).Should(HaveLen(1)) - }) - It("should GET/SET/DEL", func() { val, err := client.Get("A").Result() Expect(err).To(Equal(redis.Nil)) @@ -254,55 +198,24 @@ var _ = Describe("ClusterClient", func() { Expect(err).NotTo(HaveOccurred()) Expect(val).To(Equal("OK")) - val, err = client.Get("A").Result() - Expect(err).NotTo(HaveOccurred()) - Expect(val).To(Equal("VALUE")) + Eventually(func() string { + return client.Get("A").Val() + }).Should(Equal("VALUE")) cnt, err := client.Del("A").Result() Expect(err).NotTo(HaveOccurred()) Expect(cnt).To(Equal(int64(1))) }) - It("returns pool stats", func() { - Expect(client.PoolStats()).To(BeAssignableToTypeOf(&redis.PoolStats{})) - }) - - It("removes idle connections", func() { - stats := client.PoolStats() - Expect(stats.TotalConns).NotTo(BeZero()) - Expect(stats.FreeConns).NotTo(BeZero()) - - time.Sleep(2 * time.Second) - - stats = client.PoolStats() - Expect(stats.TotalConns).To(BeZero()) - Expect(stats.FreeConns).To(BeZero()) - }) - It("follows redirects", func() { Expect(client.Set("A", "VALUE", 0).Err()).NotTo(HaveOccurred()) slot := hashtag.Slot("A") - Expect(client.SwapSlotNodes(slot)).To(Equal([]string{"127.0.0.1:8224", "127.0.0.1:8221"})) + client.SwapSlotNodes(slot) - val, err := client.Get("A").Result() - Expect(err).NotTo(HaveOccurred()) - Expect(val).To(Equal("VALUE")) - }) - - It("returns an error when there are no attempts left", func() { - opt := redisClusterOptions() - opt.MaxRedirects = -1 - client := cluster.clusterClient(opt) - - slot := hashtag.Slot("A") - Expect(client.SwapSlotNodes(slot)).To(Equal([]string{"127.0.0.1:8224", "127.0.0.1:8221"})) - - err := client.Get("A").Err() - Expect(err).To(HaveOccurred()) - Expect(err.Error()).To(ContainSubstring("MOVED")) - - Expect(client.Close()).NotTo(HaveOccurred()) + Eventually(func() string { + return client.Get("A").Val() + }).Should(Equal("VALUE")) }) It("distributes keys", func() { @@ -311,9 +224,14 @@ var _ = Describe("ClusterClient", func() { Expect(err).NotTo(HaveOccurred()) } - wanted := []string{"keys=31", "keys=29", "keys=40"} - for i, master := range cluster.masters() { - Expect(master.Info().Val()).To(ContainSubstring(wanted[i])) + for _, master := range cluster.masters() { + Eventually(func() string { + return master.Info("keyspace").Val() + }, 5*time.Second).Should(Or( + ContainSubstring("keys=31"), + ContainSubstring("keys=29"), + ContainSubstring("keys=40"), + )) } }) @@ -330,9 +248,14 @@ var _ = Describe("ClusterClient", func() { Expect(err).NotTo(HaveOccurred()) } - wanted := []string{"keys=31", "keys=29", "keys=40"} - for i, master := range cluster.masters() { - Expect(master.Info().Val()).To(ContainSubstring(wanted[i])) + for _, master := range cluster.masters() { + Eventually(func() string { + return master.Info("keyspace").Val() + }, 5*time.Second).Should(Or( + ContainSubstring("keys=31"), + ContainSubstring("keys=29"), + ContainSubstring("keys=40"), + )) } }) @@ -419,7 +342,8 @@ var _ = Describe("ClusterClient", func() { Expect(get.Val()).To(Equal(key + "_value")) ttl := cmds[(i*2)+1].(*redis.DurationCmd) - Expect(ttl.Val()).To(BeNumerically("~", time.Duration(i+1)*time.Hour, time.Second)) + dur := time.Duration(i+1) * time.Hour + Expect(ttl.Val()).To(BeNumerically("~", dur, 5*time.Second)) } }) @@ -447,7 +371,7 @@ var _ = Describe("ClusterClient", func() { }) } - Describe("Pipeline", func() { + Describe("with Pipeline", func() { BeforeEach(func() { pipe = client.Pipeline().(*redis.Pipeline) }) @@ -459,7 +383,7 @@ var _ = Describe("ClusterClient", func() { assertPipeline() }) - Describe("TxPipeline", func() { + Describe("with TxPipeline", func() { BeforeEach(func() { pipe = client.TxPipeline().(*redis.Pipeline) }) @@ -472,6 +396,76 @@ var _ = Describe("ClusterClient", func() { }) }) + It("supports PubSub", func() { + pubsub := client.Subscribe("mychannel") + defer pubsub.Close() + + Eventually(func() error { + _, err := client.Publish("mychannel", "hello").Result() + if err != nil { + return err + } + + msg, err := pubsub.ReceiveTimeout(time.Second) + if err != nil { + return err + } + + _, ok := msg.(*redis.Message) + if !ok { + return fmt.Errorf("got %T, wanted *redis.Message", msg) + } + + return nil + }, 30*time.Second).ShouldNot(HaveOccurred()) + }) + } + + Describe("ClusterClient", func() { + BeforeEach(func() { + opt = redisClusterOptions() + client = cluster.clusterClient(opt) + + _ = client.ForEachMaster(func(master *redis.Client) error { + return master.FlushDB().Err() + }) + }) + + AfterEach(func() { + Expect(client.Close()).NotTo(HaveOccurred()) + }) + + It("returns pool stats", func() { + Expect(client.PoolStats()).To(BeAssignableToTypeOf(&redis.PoolStats{})) + }) + + It("removes idle connections", func() { + stats := client.PoolStats() + Expect(stats.TotalConns).NotTo(BeZero()) + Expect(stats.FreeConns).NotTo(BeZero()) + + time.Sleep(2 * time.Second) + + stats = client.PoolStats() + Expect(stats.TotalConns).To(BeZero()) + Expect(stats.FreeConns).To(BeZero()) + }) + + It("returns an error when there are no attempts left", func() { + opt := redisClusterOptions() + opt.MaxRedirects = -1 + client := cluster.clusterClient(opt) + + slot := hashtag.Slot("A") + client.SwapSlotNodes(slot) + + err := client.Get("A").Err() + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("MOVED")) + + Expect(client.Close()).NotTo(HaveOccurred()) + }) + It("calls fn for every master node", func() { for i := 0; i < 10; i++ { Expect(client.Set(strconv.Itoa(i), "", 0).Err()).NotTo(HaveOccurred()) @@ -483,14 +477,72 @@ var _ = Describe("ClusterClient", func() { Expect(err).NotTo(HaveOccurred()) for _, client := range cluster.masters() { - keys, err := client.Keys("*").Result() + size, err := client.DBSize().Result() Expect(err).NotTo(HaveOccurred()) - Expect(keys).To(HaveLen(0)) + Expect(size).To(Equal(int64(0))) } }) - } - Describe("default ClusterClient", func() { + It("should CLUSTER SLOTS", func() { + res, err := client.ClusterSlots().Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res).To(HaveLen(3)) + + wanted := []redis.ClusterSlot{ + {0, 4999, []redis.ClusterNode{{"", "127.0.0.1:8220"}, {"", "127.0.0.1:8223"}}}, + {5000, 9999, []redis.ClusterNode{{"", "127.0.0.1:8221"}, {"", "127.0.0.1:8224"}}}, + {10000, 16383, []redis.ClusterNode{{"", "127.0.0.1:8222"}, {"", "127.0.0.1:8225"}}}, + } + Expect(assertSlotsEqual(res, wanted)).NotTo(HaveOccurred()) + }) + + It("should CLUSTER NODES", func() { + res, err := client.ClusterNodes().Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(res)).To(BeNumerically(">", 400)) + }) + + It("should CLUSTER INFO", func() { + res, err := client.ClusterInfo().Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res).To(ContainSubstring("cluster_known_nodes:6")) + }) + + It("should CLUSTER KEYSLOT", func() { + hashSlot, err := client.ClusterKeySlot("somekey").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(hashSlot).To(Equal(int64(hashtag.Slot("somekey")))) + }) + + It("should CLUSTER COUNT-FAILURE-REPORTS", func() { + n, err := client.ClusterCountFailureReports(cluster.nodeIds[0]).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(n).To(Equal(int64(0))) + }) + + It("should CLUSTER COUNTKEYSINSLOT", func() { + n, err := client.ClusterCountKeysInSlot(10).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(n).To(Equal(int64(0))) + }) + + It("should CLUSTER SAVECONFIG", func() { + res, err := client.ClusterSaveConfig().Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res).To(Equal("OK")) + }) + + It("should CLUSTER SLAVES", func() { + nodesList, err := client.ClusterSlaves(cluster.nodeIds[0]).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(nodesList).Should(ContainElement(ContainSubstring("slave"))) + Expect(nodesList).Should(HaveLen(1)) + }) + + assertClusterClient() + }) + + Describe("ClusterClient failover", func() { BeforeEach(func() { opt = redisClusterOptions() client = cluster.clusterClient(opt) @@ -498,6 +550,13 @@ var _ = Describe("ClusterClient", func() { _ = client.ForEachMaster(func(master *redis.Client) error { return master.FlushDB().Err() }) + + _ = client.ForEachSlave(func(slave *redis.Client) error { + Eventually(func() int64 { + return client.DBSize().Val() + }, 30*time.Second).Should(Equal(int64(0))) + return slave.ClusterFailover().Err() + }) }) AfterEach(func() { @@ -645,14 +704,14 @@ var _ = Describe("ClusterClient timeout", func() { testTimeout() }) - Context("network timeout", func() { + Context("ClientPause timeout", func() { const pause = time.Second BeforeEach(func() { opt := redisClusterOptions() - opt.ReadTimeout = 100 * time.Millisecond - opt.WriteTimeout = 100 * time.Millisecond - opt.MaxRedirects = 1 + opt.ReadTimeout = pause / 10 + opt.WriteTimeout = pause / 10 + opt.MaxRedirects = -1 client = cluster.clusterClient(opt) err := client.ForEachNode(func(client *redis.Client) error { @@ -662,11 +721,12 @@ var _ = Describe("ClusterClient timeout", func() { }) AfterEach(func() { - Eventually(func() error { - return client.ForEachNode(func(client *redis.Client) error { + client.ForEachNode(func(client *redis.Client) error { + Eventually(func() error { return client.Ping().Err() - }) - }, 2*pause).ShouldNot(HaveOccurred()) + }, 2*pause).ShouldNot(HaveOccurred()) + return nil + }) }) testTimeout() diff --git a/vendor/github.com/go-redis/redis/command.go b/vendor/github.com/go-redis/redis/command.go index 361661adf..0e5b2016e 100644 --- a/vendor/github.com/go-redis/redis/command.go +++ b/vendor/github.com/go-redis/redis/command.go @@ -799,7 +799,9 @@ type GeoRadiusQuery struct { WithGeoHash bool Count int // Can be ASC or DESC. Default is no sort order. - Sort string + Sort string + Store string + StoreDist string } type GeoLocationCmd struct { @@ -817,20 +819,28 @@ func NewGeoLocationCmd(q *GeoRadiusQuery, args ...interface{}) *GeoLocationCmd { args = append(args, "km") } if q.WithCoord { - args = append(args, "WITHCOORD") + args = append(args, "withcoord") } if q.WithDist { - args = append(args, "WITHDIST") + args = append(args, "withdist") } if q.WithGeoHash { - args = append(args, "WITHHASH") + args = append(args, "withhash") } if q.Count > 0 { - args = append(args, "COUNT", q.Count) + args = append(args, "count", q.Count) } if q.Sort != "" { args = append(args, q.Sort) } + if q.Store != "" { + args = append(args, "store") + args = append(args, q.Store) + } + if q.StoreDist != "" { + args = append(args, "storedist") + args = append(args, q.StoreDist) + } return &GeoLocationCmd{ baseCmd: baseCmd{_args: args}, q: q, diff --git a/vendor/github.com/go-redis/redis/commands.go b/vendor/github.com/go-redis/redis/commands.go index 4ea78777c..83b3824f8 100644 --- a/vendor/github.com/go-redis/redis/commands.go +++ b/vendor/github.com/go-redis/redis/commands.go @@ -159,6 +159,7 @@ type Cmdable interface { ZIncrXX(key string, member Z) *FloatCmd ZCard(key string) *IntCmd ZCount(key, min, max string) *IntCmd + ZLexCount(key, min, max string) *IntCmd ZIncrBy(key string, increment float64, member string) *FloatCmd ZInterStore(destination string, store ZStore, keys ...string) *IntCmd ZRange(key string, start, stop int64) *StringSliceCmd @@ -190,7 +191,7 @@ type Cmdable interface { ConfigGet(parameter string) *SliceCmd ConfigResetStat() *StatusCmd ConfigSet(parameter, value string) *StatusCmd - DbSize() *IntCmd + DBSize() *IntCmd FlushAll() *StatusCmd FlushAllAsync() *StatusCmd FlushDB() *StatusCmd @@ -234,7 +235,9 @@ type Cmdable interface { GeoAdd(key string, geoLocation ...*GeoLocation) *IntCmd GeoPos(key string, members ...string) *GeoPosCmd GeoRadius(key string, longitude, latitude float64, query *GeoRadiusQuery) *GeoLocationCmd + GeoRadiusRO(key string, longitude, latitude float64, query *GeoRadiusQuery) *GeoLocationCmd GeoRadiusByMember(key, member string, query *GeoRadiusQuery) *GeoLocationCmd + GeoRadiusByMemberRO(key, member string, query *GeoRadiusQuery) *GeoLocationCmd GeoDist(key string, member1, member2, unit string) *FloatCmd GeoHash(key string, members ...string) *StringSliceCmd Command() *CommandsInfoCmd @@ -1350,6 +1353,12 @@ func (c *cmdable) ZCount(key, min, max string) *IntCmd { return cmd } +func (c *cmdable) ZLexCount(key, min, max string) *IntCmd { + cmd := NewIntCmd("zlexcount", key, min, max) + c.process(cmd) + return cmd +} + func (c *cmdable) ZIncrBy(key string, increment float64, member string) *FloatCmd { cmd := NewFloatCmd("zincrby", key, increment, member) c.process(cmd) @@ -1675,7 +1684,12 @@ func (c *cmdable) ConfigSet(parameter, value string) *StatusCmd { return cmd } +// Deperecated. Use DBSize instead. func (c *cmdable) DbSize() *IntCmd { + return c.DBSize() +} + +func (c *cmdable) DBSize() *IntCmd { cmd := NewIntCmd("dbsize") c.process(cmd) return cmd @@ -1695,9 +1709,7 @@ func (c *cmdable) FlushAllAsync() *StatusCmd { // Deprecated. Use FlushDB instead. func (c *cmdable) FlushDb() *StatusCmd { - cmd := NewStatusCmd("flushdb") - c.process(cmd) - return cmd + return c.FlushDB() } func (c *cmdable) FlushDB() *StatusCmd { @@ -2061,12 +2073,24 @@ func (c *cmdable) GeoRadius(key string, longitude, latitude float64, query *GeoR return cmd } +func (c *cmdable) GeoRadiusRO(key string, longitude, latitude float64, query *GeoRadiusQuery) *GeoLocationCmd { + cmd := NewGeoLocationCmd(query, "georadius_ro", key, longitude, latitude) + c.process(cmd) + return cmd +} + func (c *cmdable) GeoRadiusByMember(key, member string, query *GeoRadiusQuery) *GeoLocationCmd { cmd := NewGeoLocationCmd(query, "georadiusbymember", key, member) c.process(cmd) return cmd } +func (c *cmdable) GeoRadiusByMemberRO(key, member string, query *GeoRadiusQuery) *GeoLocationCmd { + cmd := NewGeoLocationCmd(query, "georadiusbymember_ro", key, member) + c.process(cmd) + return cmd +} + func (c *cmdable) GeoDist(key string, member1, member2, unit string) *FloatCmd { if unit == "" { unit = "km" diff --git a/vendor/github.com/go-redis/redis/commands_test.go b/vendor/github.com/go-redis/redis/commands_test.go index e8cdb205e..4298cba68 100644 --- a/vendor/github.com/go-redis/redis/commands_test.go +++ b/vendor/github.com/go-redis/redis/commands_test.go @@ -139,10 +139,10 @@ var _ = Describe("Commands", func() { Expect(configSet.Val()).To(Equal("OK")) }) - It("should DbSize", func() { - dbSize := client.DbSize() - Expect(dbSize.Err()).NotTo(HaveOccurred()) - Expect(dbSize.Val()).To(Equal(int64(0))) + It("should DBSize", func() { + size, err := client.DBSize().Result() + Expect(err).NotTo(HaveOccurred()) + Expect(size).To(Equal(int64(0))) }) It("should Info", func() { @@ -2176,20 +2176,24 @@ var _ = Describe("Commands", func() { }) It("should ZCount", func() { - zAdd := client.ZAdd("zset", redis.Z{1, "one"}) - Expect(zAdd.Err()).NotTo(HaveOccurred()) - zAdd = client.ZAdd("zset", redis.Z{2, "two"}) - Expect(zAdd.Err()).NotTo(HaveOccurred()) - zAdd = client.ZAdd("zset", redis.Z{3, "three"}) - Expect(zAdd.Err()).NotTo(HaveOccurred()) + err := client.ZAdd("zset", redis.Z{1, "one"}).Err() + Expect(err).NotTo(HaveOccurred()) + err = client.ZAdd("zset", redis.Z{2, "two"}).Err() + Expect(err).NotTo(HaveOccurred()) + err = client.ZAdd("zset", redis.Z{3, "three"}).Err() + Expect(err).NotTo(HaveOccurred()) - zCount := client.ZCount("zset", "-inf", "+inf") - Expect(zCount.Err()).NotTo(HaveOccurred()) - Expect(zCount.Val()).To(Equal(int64(3))) + count, err := client.ZCount("zset", "-inf", "+inf").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(count).To(Equal(int64(3))) - zCount = client.ZCount("zset", "(1", "3") - Expect(zCount.Err()).NotTo(HaveOccurred()) - Expect(zCount.Val()).To(Equal(int64(2))) + count, err = client.ZCount("zset", "(1", "3").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(count).To(Equal(int64(2))) + + count, err = client.ZLexCount("zset", "-", "+").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(count).To(Equal(int64(3))) }) It("should ZIncrBy", func() { diff --git a/vendor/github.com/go-redis/redis/example_test.go b/vendor/github.com/go-redis/redis/example_test.go index 319ea0ca2..7e04cd487 100644 --- a/vendor/github.com/go-redis/redis/example_test.go +++ b/vendor/github.com/go-redis/redis/example_test.go @@ -122,13 +122,13 @@ func ExampleClient_Set() { } func ExampleClient_Incr() { - if err := client.Incr("counter").Err(); err != nil { + result, err := client.Incr("counter").Result() + if err != nil { panic(err) } - n, err := client.Get("counter").Int64() - fmt.Println(n, err) - // Output: 1 <nil> + fmt.Println(result) + // Output: 1 } func ExampleClient_BLPop() { diff --git a/vendor/github.com/go-redis/redis/export_test.go b/vendor/github.com/go-redis/redis/export_test.go index b88e41be9..3b7965d79 100644 --- a/vendor/github.com/go-redis/redis/export_test.go +++ b/vendor/github.com/go-redis/redis/export_test.go @@ -28,8 +28,9 @@ func (c *ClusterClient) SlotAddrs(slot int) []string { } // SwapSlot swaps a slot's master/slave address for testing MOVED redirects. -func (c *ClusterClient) SwapSlotNodes(slot int) []string { +func (c *ClusterClient) SwapSlotNodes(slot int) { nodes := c.state().slots[slot] - nodes[0], nodes[1] = nodes[1], nodes[0] - return c.SlotAddrs(slot) + if len(nodes) == 2 { + nodes[0], nodes[1] = nodes[1], nodes[0] + } } diff --git a/vendor/github.com/go-redis/redis/internal/errors.go b/vendor/github.com/go-redis/redis/internal/error.go index c93e00818..90f6503a1 100644 --- a/vendor/github.com/go-redis/redis/internal/errors.go +++ b/vendor/github.com/go-redis/redis/internal/error.go @@ -67,9 +67,9 @@ func IsMovedError(err error) (moved bool, ask bool, addr string) { } func IsLoadingError(err error) bool { - return strings.HasPrefix(err.Error(), "LOADING") + return strings.HasPrefix(err.Error(), "LOADING ") } -func IsExecAbortError(err error) bool { - return strings.HasPrefix(err.Error(), "EXECABORT") +func IsClusterDownError(err error) bool { + return strings.HasPrefix(err.Error(), "CLUSTERDOWN ") } diff --git a/vendor/github.com/go-redis/redis/internal/internal.go b/vendor/github.com/go-redis/redis/internal/internal.go index fb4efa5f0..ad3fc3c9f 100644 --- a/vendor/github.com/go-redis/redis/internal/internal.go +++ b/vendor/github.com/go-redis/redis/internal/internal.go @@ -5,19 +5,20 @@ import ( "time" ) -const retryBackoff = 8 * time.Millisecond - // Retry backoff with jitter sleep to prevent overloaded conditions during intervals // https://www.awsarchitectureblog.com/2015/03/backoff.html -func RetryBackoff(retry int, maxRetryBackoff time.Duration) time.Duration { +func RetryBackoff(retry int, minBackoff, maxBackoff time.Duration) time.Duration { if retry < 0 { retry = 0 } - backoff := retryBackoff << uint(retry) - if backoff > maxRetryBackoff { - backoff = maxRetryBackoff + backoff := minBackoff << uint(retry) + if backoff > maxBackoff || backoff < minBackoff { + backoff = maxBackoff } + if backoff == 0 { + return 0 + } return time.Duration(rand.Int63n(int64(backoff))) } diff --git a/vendor/github.com/go-redis/redis/internal/internal_test.go b/vendor/github.com/go-redis/redis/internal/internal_test.go index 5c7000e1e..56ff611e1 100644 --- a/vendor/github.com/go-redis/redis/internal/internal_test.go +++ b/vendor/github.com/go-redis/redis/internal/internal_test.go @@ -2,15 +2,16 @@ package internal import ( "testing" - . "github.com/onsi/gomega" "time" + + . "github.com/onsi/gomega" ) func TestRetryBackoff(t *testing.T) { RegisterTestingT(t) - - for i := -1; i<= 8; i++ { - backoff := RetryBackoff(i, 512*time.Millisecond) + + for i := -1; i <= 16; i++ { + backoff := RetryBackoff(i, time.Millisecond, 512*time.Millisecond) Expect(backoff >= 0).To(BeTrue()) Expect(backoff <= 512*time.Millisecond).To(BeTrue()) } diff --git a/vendor/github.com/go-redis/redis/internal/pool/pool.go b/vendor/github.com/go-redis/redis/internal/pool/pool.go index a4e650847..25e78aa3c 100644 --- a/vendor/github.com/go-redis/redis/internal/pool/pool.go +++ b/vendor/github.com/go-redis/redis/internal/pool/pool.go @@ -119,6 +119,10 @@ func (p *ConnPool) NewConn() (*Conn, error) { func (p *ConnPool) tryDial() { for { + if p.closed() { + return + } + conn, err := p.opt.Dialer() if err != nil { p.setLastDialError(err) diff --git a/vendor/github.com/go-redis/redis/options.go b/vendor/github.com/go-redis/redis/options.go index cd6fa981a..dea045453 100644 --- a/vendor/github.com/go-redis/redis/options.go +++ b/vendor/github.com/go-redis/redis/options.go @@ -37,9 +37,11 @@ type Options struct { // Maximum number of retries before giving up. // Default is to not retry failed commands. MaxRetries int - + // Minimum backoff between each retry. + // Default is 8 milliseconds; -1 disables backoff. + MinRetryBackoff time.Duration // Maximum backoff between each retry. - // Default is 512 seconds; -1 disables backoff. + // Default is 512 milliseconds; -1 disables backoff. MaxRetryBackoff time.Duration // Dial timeout for establishing new connections. @@ -71,7 +73,7 @@ type Options struct { IdleCheckFrequency time.Duration // Enables read only queries on slave nodes. - ReadOnly bool + readOnly bool // TLS Config to use. When set TLS will be negotiated. TLSConfig *tls.Config @@ -118,6 +120,13 @@ func (opt *Options) init() { if opt.IdleCheckFrequency == 0 { opt.IdleCheckFrequency = time.Minute } + + switch opt.MinRetryBackoff { + case -1: + opt.MinRetryBackoff = 0 + case 0: + opt.MinRetryBackoff = 8 * time.Millisecond + } switch opt.MaxRetryBackoff { case -1: opt.MaxRetryBackoff = 0 diff --git a/vendor/github.com/go-redis/redis/pubsub.go b/vendor/github.com/go-redis/redis/pubsub.go index 4872b4e88..4a5c65f57 100644 --- a/vendor/github.com/go-redis/redis/pubsub.go +++ b/vendor/github.com/go-redis/redis/pubsub.go @@ -17,7 +17,10 @@ import ( // PubSub automatically resubscribes to the channels and patterns // when Redis becomes unavailable. type PubSub struct { - base baseClient + opt *Options + + newConn func([]string) (*pool.Conn, error) + closeConn func(*pool.Conn) error mu sync.Mutex cn *pool.Conn @@ -30,12 +33,12 @@ type PubSub struct { func (c *PubSub) conn() (*pool.Conn, error) { c.mu.Lock() - cn, err := c._conn() + cn, err := c._conn(nil) c.mu.Unlock() return cn, err } -func (c *PubSub) _conn() (*pool.Conn, error) { +func (c *PubSub) _conn(channels []string) (*pool.Conn, error) { if c.closed { return nil, pool.ErrClosed } @@ -44,20 +47,13 @@ func (c *PubSub) _conn() (*pool.Conn, error) { return c.cn, nil } - cn, err := c.base.connPool.NewConn() + cn, err := c.newConn(channels) if err != nil { return nil, err } - if !cn.Inited { - if err := c.base.initConn(cn); err != nil { - _ = c.base.connPool.CloseConn(cn) - return nil, err - } - } - if err := c.resubscribe(cn); err != nil { - _ = c.base.connPool.CloseConn(cn) + _ = c.closeConn(cn) return nil, err } @@ -88,24 +84,24 @@ func (c *PubSub) _subscribe(cn *pool.Conn, redisCmd string, channels ...string) } cmd := NewSliceCmd(args...) - cn.SetWriteTimeout(c.base.opt.WriteTimeout) + cn.SetWriteTimeout(c.opt.WriteTimeout) return writeCmd(cn, cmd) } -func (c *PubSub) putConn(cn *pool.Conn, err error) { - if !internal.IsBadConn(err, true) { - return - } - +func (c *PubSub) releaseConn(cn *pool.Conn, err error) { c.mu.Lock() - if c.cn == cn { - _ = c.closeConn() - } + c._releaseConn(cn, err) c.mu.Unlock() } -func (c *PubSub) closeConn() error { - err := c.base.connPool.CloseConn(c.cn) +func (c *PubSub) _releaseConn(cn *pool.Conn, err error) { + if internal.IsBadConn(err, true) && c.cn == cn { + _ = c.closeTheCn() + } +} + +func (c *PubSub) closeTheCn() error { + err := c.closeConn(c.cn) c.cn = nil return err } @@ -120,7 +116,7 @@ func (c *PubSub) Close() error { c.closed = true if c.cn != nil { - return c.closeConn() + return c.closeTheCn() } return nil } @@ -166,13 +162,13 @@ func (c *PubSub) PUnsubscribe(patterns ...string) error { } func (c *PubSub) subscribe(redisCmd string, channels ...string) error { - cn, err := c._conn() + cn, err := c._conn(channels) if err != nil { return err } err = c._subscribe(cn, redisCmd, channels...) - c.putConn(cn, err) + c._releaseConn(cn, err) return err } @@ -188,9 +184,9 @@ func (c *PubSub) Ping(payload ...string) error { return err } - cn.SetWriteTimeout(c.base.opt.WriteTimeout) + cn.SetWriteTimeout(c.opt.WriteTimeout) err = writeCmd(cn, cmd) - c.putConn(cn, err) + c.releaseConn(cn, err) return err } @@ -283,7 +279,7 @@ func (c *PubSub) ReceiveTimeout(timeout time.Duration) (interface{}, error) { cn.SetReadTimeout(timeout) err = c.cmd.readReply(cn) - c.putConn(cn, err) + c.releaseConn(cn, err) if err != nil { return nil, err } diff --git a/vendor/github.com/go-redis/redis/pubsub_test.go b/vendor/github.com/go-redis/redis/pubsub_test.go index e8589f461..1d9dfcb99 100644 --- a/vendor/github.com/go-redis/redis/pubsub_test.go +++ b/vendor/github.com/go-redis/redis/pubsub_test.go @@ -159,9 +159,9 @@ var _ = Describe("PubSub", func() { { msgi, err := pubsub.ReceiveTimeout(time.Second) Expect(err).NotTo(HaveOccurred()) - subscr := msgi.(*redis.Message) - Expect(subscr.Channel).To(Equal("mychannel")) - Expect(subscr.Payload).To(Equal("hello")) + msg := msgi.(*redis.Message) + Expect(msg.Channel).To(Equal("mychannel")) + Expect(msg.Payload).To(Equal("hello")) } { @@ -294,6 +294,22 @@ var _ = Describe("PubSub", func() { Expect(stats.Hits).To(Equal(uint32(1))) }) + It("returns an error when subscribe fails", func() { + pubsub := client.Subscribe() + defer pubsub.Close() + + pubsub.SetNetConn(&badConn{ + readErr: io.EOF, + writeErr: io.EOF, + }) + + err := pubsub.Subscribe("mychannel") + Expect(err).To(MatchError("EOF")) + + err = pubsub.Subscribe("mychannel") + Expect(err).NotTo(HaveOccurred()) + }) + expectReceiveMessageOnError := func(pubsub *redis.PubSub) { pubsub.SetNetConn(&badConn{ readErr: io.EOF, @@ -384,8 +400,11 @@ var _ = Describe("PubSub", func() { pubsub := client.Subscribe() defer pubsub.Close() + var wg sync.WaitGroup + wg.Add(1) go func() { defer GinkgoRecover() + defer wg.Done() time.Sleep(2 * timeout) @@ -402,5 +421,7 @@ var _ = Describe("PubSub", func() { Expect(err).NotTo(HaveOccurred()) Expect(msg.Channel).To(Equal("mychannel")) Expect(msg.Payload).To(Equal("hello")) + + wg.Wait() }) }) diff --git a/vendor/github.com/go-redis/redis/redis.go b/vendor/github.com/go-redis/redis/redis.go index 9812daf66..b18973cdb 100644 --- a/vendor/github.com/go-redis/redis/redis.go +++ b/vendor/github.com/go-redis/redis/redis.go @@ -21,7 +21,23 @@ func (c *baseClient) String() string { return fmt.Sprintf("Redis<%s db:%d>", c.getAddr(), c.opt.DB) } -func (c *baseClient) conn() (*pool.Conn, bool, error) { +func (c *baseClient) newConn() (*pool.Conn, error) { + cn, err := c.connPool.NewConn() + if err != nil { + return nil, err + } + + if !cn.Inited { + if err := c.initConn(cn); err != nil { + _ = c.connPool.CloseConn(cn) + return nil, err + } + } + + return cn, nil +} + +func (c *baseClient) getConn() (*pool.Conn, bool, error) { cn, isNew, err := c.connPool.Get() if err != nil { return nil, false, err @@ -37,7 +53,7 @@ func (c *baseClient) conn() (*pool.Conn, bool, error) { return cn, isNew, nil } -func (c *baseClient) putConn(cn *pool.Conn, err error) bool { +func (c *baseClient) releaseConn(cn *pool.Conn, err error) bool { if internal.IsBadConn(err, false) { _ = c.connPool.Remove(cn) return false @@ -52,7 +68,7 @@ func (c *baseClient) initConn(cn *pool.Conn) error { if c.opt.Password == "" && c.opt.DB == 0 && - !c.opt.ReadOnly && + !c.opt.readOnly && c.opt.OnConnect == nil { return nil } @@ -75,7 +91,7 @@ func (c *baseClient) initConn(cn *pool.Conn) error { pipe.Select(c.opt.DB) } - if c.opt.ReadOnly { + if c.opt.readOnly { pipe.ReadOnly() } @@ -91,13 +107,6 @@ func (c *baseClient) initConn(cn *pool.Conn) error { return nil } -func (c *baseClient) Process(cmd Cmder) error { - if c.process != nil { - return c.process(cmd) - } - return c.defaultProcess(cmd) -} - // WrapProcess replaces the process func. It takes a function createWrapper // which is supplied by the user. createWrapper takes the old process func as // an input and returns the new wrapper process func. createWrapper should @@ -106,13 +115,20 @@ func (c *baseClient) WrapProcess(fn func(oldProcess func(cmd Cmder) error) func( c.process = fn(c.defaultProcess) } +func (c *baseClient) Process(cmd Cmder) error { + if c.process != nil { + return c.process(cmd) + } + return c.defaultProcess(cmd) +} + func (c *baseClient) defaultProcess(cmd Cmder) error { - for i := 0; i <= c.opt.MaxRetries; i++ { - if i > 0 { - time.Sleep(internal.RetryBackoff(i, c.opt.MaxRetryBackoff)) + for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ { + if attempt > 0 { + time.Sleep(c.retryBackoff(attempt)) } - cn, _, err := c.conn() + cn, _, err := c.getConn() if err != nil { cmd.setErr(err) if internal.IsRetryableError(err) { @@ -123,7 +139,7 @@ func (c *baseClient) defaultProcess(cmd Cmder) error { cn.SetWriteTimeout(c.opt.WriteTimeout) if err := writeCmd(cn, cmd); err != nil { - c.putConn(cn, err) + c.releaseConn(cn, err) cmd.setErr(err) if internal.IsRetryableError(err) { continue @@ -133,7 +149,7 @@ func (c *baseClient) defaultProcess(cmd Cmder) error { cn.SetReadTimeout(c.cmdTimeout(cmd)) err = cmd.readReply(cn) - c.putConn(cn, err) + c.releaseConn(cn, err) if err != nil && internal.IsRetryableError(err) { continue } @@ -144,6 +160,10 @@ func (c *baseClient) defaultProcess(cmd Cmder) error { return cmd.Err() } +func (c *baseClient) retryBackoff(attempt int) time.Duration { + return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff) +} + func (c *baseClient) cmdTimeout(cmd Cmder) time.Duration { if timeout := cmd.readTimeout(); timeout != nil { return *timeout @@ -179,14 +199,14 @@ func (c *baseClient) pipelineExecer(p pipelineProcessor) pipelineExecer { return func(cmds []Cmder) error { var firstErr error for i := 0; i <= c.opt.MaxRetries; i++ { - cn, _, err := c.conn() + cn, _, err := c.getConn() if err != nil { setCmdsErr(cmds, err) return err } canRetry, err := p(cn, cmds) - c.putConn(cn, err) + c.releaseConn(cn, err) if err == nil { return nil } @@ -375,10 +395,12 @@ func (c *Client) TxPipeline() Pipeliner { func (c *Client) pubSub() *PubSub { return &PubSub{ - base: baseClient{ - opt: c.opt, - connPool: c.connPool, + opt: c.opt, + + newConn: func(channels []string) (*pool.Conn, error) { + return c.newConn() }, + closeConn: c.connPool.CloseConn, } } diff --git a/vendor/github.com/go-redis/redis/ring.go b/vendor/github.com/go-redis/redis/ring.go index be9251096..72d52bf75 100644 --- a/vendor/github.com/go-redis/redis/ring.go +++ b/vendor/github.com/go-redis/redis/ring.go @@ -423,7 +423,7 @@ func (c *Ring) pipelineExec(cmds []Cmder) (firstErr error) { continue } - cn, _, err := shard.Client.conn() + cn, _, err := shard.Client.getConn() if err != nil { setCmdsErr(cmds, err) if firstErr == nil { @@ -433,7 +433,7 @@ func (c *Ring) pipelineExec(cmds []Cmder) (firstErr error) { } canRetry, err := shard.Client.pipelineProcessCmds(cn, cmds) - shard.Client.putConn(cn, err) + shard.Client.releaseConn(cn, err) if err == nil { continue } diff --git a/vendor/github.com/go-redis/redis/sentinel.go b/vendor/github.com/go-redis/redis/sentinel.go index ed6e7ffb3..3bfdb4a3f 100644 --- a/vendor/github.com/go-redis/redis/sentinel.go +++ b/vendor/github.com/go-redis/redis/sentinel.go @@ -112,10 +112,12 @@ func newSentinel(opt *Options) *sentinelClient { func (c *sentinelClient) PubSub() *PubSub { return &PubSub{ - base: baseClient{ - opt: c.opt, - connPool: c.connPool, + opt: c.opt, + + newConn: func(channels []string) (*pool.Conn, error) { + return c.newConn() }, + closeConn: c.connPool.CloseConn, } } @@ -149,14 +151,6 @@ func (d *sentinelFailover) Close() error { return d.resetSentinel() } -func (d *sentinelFailover) dial() (net.Conn, error) { - addr, err := d.MasterAddr() - if err != nil { - return nil, err - } - return net.DialTimeout("tcp", addr, d.opt.DialTimeout) -} - func (d *sentinelFailover) Pool() *pool.ConnPool { d.poolOnce.Do(func() { d.opt.Dialer = d.dial @@ -165,6 +159,14 @@ func (d *sentinelFailover) Pool() *pool.ConnPool { return d.pool } +func (d *sentinelFailover) dial() (net.Conn, error) { + addr, err := d.MasterAddr() + if err != nil { + return nil, err + } + return net.DialTimeout("tcp", addr, d.opt.DialTimeout) +} + func (d *sentinelFailover) MasterAddr() (string, error) { d.mu.Lock() defer d.mu.Unlock() diff --git a/vendor/github.com/go-redis/redis/universal.go b/vendor/github.com/go-redis/redis/universal.go index 02ed51abd..4aa579fa4 100644 --- a/vendor/github.com/go-redis/redis/universal.go +++ b/vendor/github.com/go-redis/redis/universal.go @@ -17,12 +17,11 @@ type UniversalOptions struct { // Only single-node and failover clients. DB int + // Only cluster clients. + // Enables read only queries on slave nodes. - // Only cluster and single-node clients. ReadOnly bool - // Only cluster clients. - MaxRedirects int RouteByLatency bool @@ -93,7 +92,6 @@ func (o *UniversalOptions) simple() *Options { return &Options{ Addr: addr, DB: o.DB, - ReadOnly: o.ReadOnly, MaxRetries: o.MaxRetries, Password: o.Password, |