From 961c04cae992eadb42d286d2f85f8a675bdc68c8 Mon Sep 17 00:00:00 2001 From: Christopher Speller Date: Mon, 29 Jan 2018 14:17:40 -0800 Subject: Upgrading server dependancies (#8154) --- vendor/github.com/go-redis/redis/cluster.go | 76 ++++++++++++++++++++++------- 1 file changed, 59 insertions(+), 17 deletions(-) (limited to 'vendor/github.com/go-redis/redis/cluster.go') diff --git a/vendor/github.com/go-redis/redis/cluster.go b/vendor/github.com/go-redis/redis/cluster.go index c81fc1d57..accdb3d27 100644 --- a/vendor/github.com/go-redis/redis/cluster.go +++ b/vendor/github.com/go-redis/redis/cluster.go @@ -226,7 +226,7 @@ func (c *clusterNodes) NextGeneration() uint32 { } // GC removes unused nodes. -func (c *clusterNodes) GC(generation uint32) error { +func (c *clusterNodes) GC(generation uint32) { var collected []*clusterNode c.mu.Lock() for i := 0; i < len(c.addrs); { @@ -243,14 +243,11 @@ func (c *clusterNodes) GC(generation uint32) error { } c.mu.Unlock() - var firstErr error - for _, node := range collected { - if err := node.Client.Close(); err != nil && firstErr == nil { - firstErr = err + time.AfterFunc(time.Minute, func() { + for _, node := range collected { + _ = node.Client.Close() } - } - - return firstErr + }) } func (c *clusterNodes) All() ([]*clusterNode, error) { @@ -533,16 +530,22 @@ func (c *ClusterClient) cmdInfo(name string) *CommandInfo { return info } +func cmdSlot(cmd Cmder, pos int) int { + if pos == 0 { + return hashtag.RandomSlot() + } + firstKey := cmd.stringArg(pos) + return hashtag.Slot(firstKey) +} + func (c *ClusterClient) cmdSlot(cmd Cmder) int { cmdInfo := c.cmdInfo(cmd.Name()) - firstKey := cmd.stringArg(cmdFirstKeyPos(cmd, cmdInfo)) - return hashtag.Slot(firstKey) + return cmdSlot(cmd, cmdFirstKeyPos(cmd, cmdInfo)) } func (c *ClusterClient) cmdSlotAndNode(state *clusterState, cmd Cmder) (int, *clusterNode, error) { cmdInfo := c.cmdInfo(cmd.Name()) - firstKey := cmd.stringArg(cmdFirstKeyPos(cmd, cmdInfo)) - slot := hashtag.Slot(firstKey) + slot := cmdSlot(cmd, cmdFirstKeyPos(cmd, cmdInfo)) if cmdInfo != nil && cmdInfo.ReadOnly && c.opt.ReadOnly { if c.opt.RouteByLatency { @@ -590,6 +593,10 @@ func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error { break } + if internal.IsRetryableError(err, true) { + continue + } + moved, ask, addr := internal.IsMovedError(err) if moved || ask { c.lazyReloadState() @@ -600,6 +607,13 @@ func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error { continue } + if err == pool.ErrClosed { + node, err = state.slotMasterNode(slot) + if err != nil { + return err + } + } + return err } @@ -635,10 +649,10 @@ func (c *ClusterClient) Process(cmd Cmder) error { if ask { pipe := node.Client.Pipeline() - pipe.Process(NewCmd("ASKING")) - pipe.Process(cmd) + _ = pipe.Process(NewCmd("ASKING")) + _ = pipe.Process(cmd) _, err = pipe.Exec() - pipe.Close() + _ = pipe.Close() ask = false } else { err = node.Client.Process(cmd) @@ -679,6 +693,14 @@ func (c *ClusterClient) Process(cmd Cmder) error { continue } + if err == pool.ErrClosed { + _, node, err = c.cmdSlotAndNode(state, cmd) + if err != nil { + cmd.setErr(err) + return err + } + } + break } @@ -915,7 +937,11 @@ func (c *ClusterClient) pipelineExec(cmds []Cmder) error { for node, cmds := range cmdsMap { cn, _, err := node.Client.getConn() if err != nil { - setCmdsErr(cmds, err) + if err == pool.ErrClosed { + c.remapCmds(cmds, failedCmds) + } else { + setCmdsErr(cmds, err) + } continue } @@ -955,6 +981,18 @@ func (c *ClusterClient) mapCmdsByNode(cmds []Cmder) (map[*clusterNode][]Cmder, e return cmdsMap, nil } +func (c *ClusterClient) remapCmds(cmds []Cmder, failedCmds map[*clusterNode][]Cmder) { + remappedCmds, err := c.mapCmdsByNode(cmds) + if err != nil { + setCmdsErr(cmds, err) + return + } + + for node, cmds := range remappedCmds { + failedCmds[node] = cmds + } +} + func (c *ClusterClient) pipelineProcessCmds( node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder, ) error { @@ -1061,7 +1099,11 @@ func (c *ClusterClient) txPipelineExec(cmds []Cmder) error { for node, cmds := range cmdsMap { cn, _, err := node.Client.getConn() if err != nil { - setCmdsErr(cmds, err) + if err == pool.ErrClosed { + c.remapCmds(cmds, failedCmds) + } else { + setCmdsErr(cmds, err) + } continue } -- cgit v1.2.3-1-g7c22