Skip to content

Commit 487feeb

Browse files
Joris Minjatvmihailenco
authored andcommitted
Add latency based routing to Redis Cluster client.
1 parent 3972f28 commit 487feeb

File tree

11 files changed

+757
-506
lines changed

11 files changed

+757
-506
lines changed

cluster.go

Lines changed: 240 additions & 156 deletions
Large diffs are not rendered by default.

cluster_client_test.go

Lines changed: 30 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,21 @@ import (
66
)
77

88
func (c *ClusterClient) SlotAddrs(slot int) []string {
9-
return c.slotAddrs(slot)
9+
var addrs []string
10+
for _, n := range c.slotNodes(slot) {
11+
addrs = append(addrs, n.Addr)
12+
}
13+
return addrs
1014
}
1115

1216
// SwapSlot swaps a slot's master/slave address
1317
// for testing MOVED redirects
14-
func (c *ClusterClient) SwapSlot(pos int) []string {
15-
c.slotsMx.Lock()
16-
defer c.slotsMx.Unlock()
17-
c.slots[pos][0], c.slots[pos][1] = c.slots[pos][1], c.slots[pos][0]
18-
return c.slots[pos]
18+
func (c *ClusterClient) SwapSlotNodes(slot int) []string {
19+
c.mu.Lock()
20+
nodes := c.slots[slot]
21+
nodes[0], nodes[1] = nodes[1], nodes[0]
22+
c.mu.Unlock()
23+
return c.SlotAddrs(slot)
1924
}
2025

2126
var _ = Describe("ClusterClient", func() {
@@ -42,19 +47,26 @@ var _ = Describe("ClusterClient", func() {
4247

4348
It("should initialize", func() {
4449
Expect(subject.addrs).To(HaveLen(3))
45-
Expect(subject.slots).To(HaveLen(16384))
4650
})
4751

4852
It("should update slots cache", func() {
4953
populate()
50-
Expect(subject.slots[0]).To(Equal([]string{"127.0.0.1:7000", "127.0.0.1:7004"}))
51-
Expect(subject.slots[4095]).To(Equal([]string{"127.0.0.1:7000", "127.0.0.1:7004"}))
52-
Expect(subject.slots[4096]).To(Equal([]string{"127.0.0.1:7001", "127.0.0.1:7005"}))
53-
Expect(subject.slots[8191]).To(Equal([]string{"127.0.0.1:7001", "127.0.0.1:7005"}))
54-
Expect(subject.slots[8192]).To(Equal([]string{"127.0.0.1:7002", "127.0.0.1:7006"}))
55-
Expect(subject.slots[12287]).To(Equal([]string{"127.0.0.1:7002", "127.0.0.1:7006"}))
56-
Expect(subject.slots[12288]).To(Equal([]string{"127.0.0.1:7003", "127.0.0.1:7007"}))
57-
Expect(subject.slots[16383]).To(Equal([]string{"127.0.0.1:7003", "127.0.0.1:7007"}))
54+
Expect(subject.slots[0][0].Addr).To(Equal("127.0.0.1:7000"))
55+
Expect(subject.slots[0][1].Addr).To(Equal("127.0.0.1:7004"))
56+
Expect(subject.slots[4095][0].Addr).To(Equal("127.0.0.1:7000"))
57+
Expect(subject.slots[4095][1].Addr).To(Equal("127.0.0.1:7004"))
58+
Expect(subject.slots[4096][0].Addr).To(Equal("127.0.0.1:7001"))
59+
Expect(subject.slots[4096][1].Addr).To(Equal("127.0.0.1:7005"))
60+
Expect(subject.slots[8191][0].Addr).To(Equal("127.0.0.1:7001"))
61+
Expect(subject.slots[8191][1].Addr).To(Equal("127.0.0.1:7005"))
62+
Expect(subject.slots[8192][0].Addr).To(Equal("127.0.0.1:7002"))
63+
Expect(subject.slots[8192][1].Addr).To(Equal("127.0.0.1:7006"))
64+
Expect(subject.slots[12287][0].Addr).To(Equal("127.0.0.1:7002"))
65+
Expect(subject.slots[12287][1].Addr).To(Equal("127.0.0.1:7006"))
66+
Expect(subject.slots[12288][0].Addr).To(Equal("127.0.0.1:7003"))
67+
Expect(subject.slots[12288][1].Addr).To(Equal("127.0.0.1:7007"))
68+
Expect(subject.slots[16383][0].Addr).To(Equal("127.0.0.1:7003"))
69+
Expect(subject.slots[16383][1].Addr).To(Equal("127.0.0.1:7007"))
5870
Expect(subject.addrs).To(Equal([]string{
5971
"127.0.0.1:6379",
6072
"127.0.0.1:7003",
@@ -71,11 +83,9 @@ var _ = Describe("ClusterClient", func() {
7183
It("should close", func() {
7284
populate()
7385
Expect(subject.Close()).NotTo(HaveOccurred())
74-
Expect(subject.clients).To(BeEmpty())
75-
Expect(subject.slots[0]).To(BeEmpty())
76-
Expect(subject.slots[8191]).To(BeEmpty())
77-
Expect(subject.slots[8192]).To(BeEmpty())
78-
Expect(subject.slots[16383]).To(BeEmpty())
86+
Expect(subject.addrs).To(BeEmpty())
87+
Expect(subject.nodes).To(BeEmpty())
88+
Expect(subject.slots).To(BeEmpty())
7989
Expect(subject.Ping().Err().Error()).To(Equal("redis: client is closed"))
8090
})
8191
})

cluster_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -301,7 +301,7 @@ var _ = Describe("Cluster", func() {
301301
})
302302

303303
It("should CLUSTER READONLY", func() {
304-
res, err := cluster.primary().Readonly().Result()
304+
res, err := cluster.primary().ReadOnly().Result()
305305
Expect(err).NotTo(HaveOccurred())
306306
Expect(res).To(Equal("OK"))
307307
})
@@ -353,15 +353,15 @@ var _ = Describe("Cluster", func() {
353353
Expect(client.Set("A", "VALUE", 0).Err()).NotTo(HaveOccurred())
354354

355355
slot := hashtag.Slot("A")
356-
Expect(client.SwapSlot(slot)).To(Equal([]string{"127.0.0.1:8224", "127.0.0.1:8221"}))
356+
Expect(client.SwapSlotNodes(slot)).To(Equal([]string{"127.0.0.1:8224", "127.0.0.1:8221"}))
357357

358358
val, err := client.Get("A").Result()
359359
Expect(err).NotTo(HaveOccurred())
360360
Expect(val).To(Equal("VALUE"))
361361

362362
Eventually(func() []string {
363363
return client.SlotAddrs(slot)
364-
}, "5s").Should(Equal([]string{"127.0.0.1:8221", "127.0.0.1:8224"}))
364+
}, "10s").Should(Equal([]string{"127.0.0.1:8221", "127.0.0.1:8224"}))
365365
})
366366

367367
It("should return error when there are no attempts left", func() {
@@ -371,7 +371,7 @@ var _ = Describe("Cluster", func() {
371371
})
372372

373373
slot := hashtag.Slot("A")
374-
Expect(client.SwapSlot(slot)).To(Equal([]string{"127.0.0.1:8224", "127.0.0.1:8221"}))
374+
Expect(client.SwapSlotNodes(slot)).To(Equal([]string{"127.0.0.1:8224", "127.0.0.1:8221"}))
375375

376376
err := client.Get("A").Err()
377377
Expect(err).To(HaveOccurred())
@@ -435,7 +435,7 @@ var _ = Describe("Cluster", func() {
435435

436436
It("performs multi-pipelines", func() {
437437
slot := hashtag.Slot("A")
438-
Expect(client.SwapSlot(slot)).To(Equal([]string{"127.0.0.1:8224", "127.0.0.1:8221"}))
438+
Expect(client.SwapSlotNodes(slot)).To(Equal([]string{"127.0.0.1:8224", "127.0.0.1:8221"}))
439439

440440
pipe := client.Pipeline()
441441
defer pipe.Close()

0 commit comments

Comments
 (0)