Skip to content

Commit e12fada

Browse files
committed
finish with gossip and token ranges
1 parent 49d8543 commit e12fada

File tree

7 files changed

+112
-218
lines changed

7 files changed

+112
-218
lines changed

mutexes/distributed-db/app/app.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,9 @@ func New() (*App, error) {
2424
return nil, fmt.Errorf("need at least 1 node to talk to")
2525
}
2626

27-
2827
addr := fmt.Sprintf("localhost:%d", *port)
2928
nodes.CurrentNode = addr
30-
tokens := models.NewTokens(nodes, 5)
29+
tokens := models.NewTokens(nodes, 256)
3130
cacheRepo := repositories.NewCache()
3231
httpClient := clients.NewHTTP(addr)
3332
svc := services.NewCache(cacheRepo, httpClient, tokens)

mutexes/distributed-db/clients/http.go

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,23 +46,29 @@ func (c *HTTPClient) Get(node string, key string) (models.CacheItem, error) {
4646
return cacheItem[0], nil
4747
}
4848

49-
func (c *HTTPClient) Gossip(node string, nodes []string, tokensChecksum string) error {
49+
func (c *HTTPClient) Gossip(node string, nodes []string, tokensChecksum string) ([]string, error) {
5050
body := models.GossipRequest{
5151
Nodes: nodes,
5252
CreatedAt: time.Now().UTC(),
5353
TokensChecksum: tokensChecksum,
5454
}
5555
req, err := c.makeRequest(http.MethodPost, c.url(node, "gossip"), body)
5656
if err != nil {
57-
return err
57+
return []string{}, err
5858
}
5959

60-
_, err = c.httpClient.Do(req)
60+
res, err := c.httpClient.Do(req)
61+
if err != nil {
62+
return []string{}, err
63+
}
64+
65+
var gossipRes models.GossipResponse
66+
err = json.NewDecoder(res.Body).Decode(&gossipRes)
6167
if err != nil {
62-
return err
68+
return []string{}, err
6369
}
6470

65-
return nil
71+
return gossipRes.Nodes, nil
6672
}
6773

6874
func (c *HTTPClient) Tokens(node string) (models.TokenMappings, error) {

mutexes/distributed-db/controllers/tokens.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import (
99
)
1010

1111
type tokensGetter interface {
12-
GetTokens() map[uint64]string
12+
GetTokens() map[int]string
1313
}
1414

1515
func tokens(svc tokensGetter) http.HandlerFunc {

mutexes/distributed-db/models/flags.go

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import (
55
)
66

77
type Nodes struct {
8-
Map map[string]struct{}
8+
Map map[string]struct{}
99
CurrentNode string
1010
}
1111

@@ -18,11 +18,21 @@ func (n Nodes) String() string {
1818
return strings.Join(n.List(len(n.Map)), ",")
1919
}
2020

21-
func (n Nodes) Add(node string) {
22-
if node == n.CurrentNode {
23-
return
21+
func (n Nodes) Add(nodes ...string) {
22+
for _, node := range nodes {
23+
if node == n.CurrentNode {
24+
return
25+
}
26+
n.Map[node] = struct{}{}
27+
}
28+
}
29+
30+
func (n Nodes) WithCurrentNode() map[string]struct{} {
31+
nodes := map[string]struct{}{n.CurrentNode: {}}
32+
for s := range n.Map {
33+
nodes[s] = struct{}{}
2434
}
25-
n.Map[node] = struct{}{}
35+
return nodes
2636
}
2737

2838
func (n Nodes) List(a int) []string {

mutexes/distributed-db/models/tokens.go

Lines changed: 63 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -6,49 +6,50 @@ import (
66
"fmt"
77
"hash/fnv"
88
"log"
9+
"math"
910
"math/rand"
11+
"reflect"
1012
"sort"
1113
"time"
1214
)
1315

14-
type TokenMappings map[uint64]string
16+
type TokenMappings map[int]string
1517

1618
type Tokens struct {
1719
Mappings TokenMappings
1820
Nodes Nodes
19-
ranges []uint64
21+
ranges []int
2022
numberOfTokenRanges int
2123
}
2224

23-
func NewTokens(nodes Nodes, numberOfTokenRanges int) Tokens {
24-
numberOfNodes := len(nodes.Map)
25-
tokenRange := uint64(200 / numberOfNodes / numberOfTokenRanges)
26-
ranges := make([]uint64, 0, numberOfNodes*numberOfTokenRanges)
27-
for i := 0; i < numberOfNodes; i++ {
25+
func NewTokens(nodes Nodes, numberOfTokenRanges int) *Tokens {
26+
nodeList := append([]string{nodes.CurrentNode}, nodes.List(len(nodes.Map))...)
27+
tokenRange := math.MaxInt / len(nodeList) / numberOfTokenRanges
28+
ranges := make([]int, 0, len(nodeList)*numberOfTokenRanges)
29+
for i := 0; i < len(nodeList); i++ {
2830
for j := numberOfTokenRanges * i; j < numberOfTokenRanges*(i+1); j++ {
29-
r := tokenRange * uint64(j+1)
31+
r := tokenRange * (j + 1)
3032
// produces a sorted ranges slice => needed for searching
3133
ranges = append(ranges, r)
3234
}
3335
}
3436

35-
randomRanges := append([]uint64{}, ranges...)
37+
randomRanges := append([]int{}, ranges...)
3638
rand.Seed(time.Now().UnixNano())
3739
rand.Shuffle(len(randomRanges), func(i, j int) {
3840
randomRanges[i], randomRanges[j] = randomRanges[j], randomRanges[i]
3941
})
4042

41-
i, mappings := 0, map[uint64]string{}
42-
nodeList := nodes.List(len(nodes.Map))
43+
i, mappings := 0, map[int]string{}
4344
for _, r := range randomRanges {
4445
mappings[r] = nodeList[i]
4546
i++
46-
if i == numberOfNodes {
47+
if i == len(nodeList) {
4748
i = 0
4849
}
4950
}
5051

51-
tokens := Tokens{
52+
tokens := &Tokens{
5253
Mappings: mappings,
5354
Nodes: nodes,
5455
ranges: ranges,
@@ -57,38 +58,64 @@ func NewTokens(nodes Nodes, numberOfTokenRanges int) Tokens {
5758
return tokens
5859
}
5960

60-
func (t *Tokens) GetNode(token uint64) string {
61-
idx := sort.Search(len(t.ranges)-1, func(i int) bool {
62-
return t.ranges[i] >= token
63-
})
61+
func (t *Tokens) GetNode(token int) string {
62+
idx := sort.SearchInts(t.ranges, token)
6463
node := t.Mappings[t.ranges[idx]]
6564
return node
6665
}
6766

68-
func (t *Tokens) AddNode(node string) {
69-
_, ok := t.Nodes.Map[node]
70-
if ok || node == t.Nodes.CurrentNode {
67+
func (t *Tokens) Merge(mappings map[int]string) {
68+
newMappings := map[int]string{}
69+
nodes := t.Nodes.WithCurrentNode()
70+
ranges := t.ranges
71+
m1, m2 := mappings, t.Mappings
72+
if len(mappings) < len(t.Mappings) {
73+
m1 = t.Mappings
74+
m2 = mappings
75+
}
76+
77+
newNodes := map[string]struct{}{}
78+
newRanges := make([]int, 0)
79+
for _, s := range mappings {
80+
newNodes[s] = struct{}{}
81+
}
82+
for r := range m1 {
83+
newRanges = append(newRanges, r)
84+
}
85+
sort.Ints(newRanges)
86+
ranges = newRanges
87+
nodes = newNodes
88+
89+
if reflect.DeepEqual(nodes, t.Nodes.Map) {
7190
return
7291
}
73-
tokenRange := uint64(200 / len(t.Nodes.Map) / t.numberOfTokenRanges)
74-
newRanges := make([]uint64, 0, t.numberOfTokenRanges)
75-
for i := 0; i < len(t.ranges)+t.numberOfTokenRanges; i++ {
76-
if i < len(t.ranges) {
77-
r := t.ranges[i]
78-
decrement := r - tokenRange*(uint64(i+1))
79-
newRange := r - decrement
80-
srv := t.Mappings[r]
8192

82-
delete(t.Mappings, r)
83-
t.Mappings[newRange] = srv
84-
t.ranges[i] = newRange
85-
} else {
86-
newRange := tokenRange * uint64(i+1)
87-
t.Mappings[newRange] = node
88-
newRanges = append(newRanges, newRange)
93+
for s := range nodes {
94+
_, ok := t.Nodes.WithCurrentNode()[s]
95+
if ok {
96+
delete(nodes, s)
8997
}
9098
}
91-
t.ranges = append(t.ranges, newRanges...)
99+
100+
i := 0
101+
numberOfNodes := len(nodes) + len(t.Nodes.WithCurrentNode())
102+
tokenRange := math.MaxInt / numberOfNodes / t.numberOfTokenRanges
103+
m1Nodes := map[string]struct{}{}
104+
for r, s := range m1 {
105+
m1Nodes[s] = struct{}{}
106+
factor := sort.SearchInts(ranges, r) + 1
107+
newMappings[factor*tokenRange] = s
108+
i++
109+
}
110+
for _, s := range m2 {
111+
_, ok := m1Nodes[s]
112+
if ok {
113+
continue
114+
}
115+
i++
116+
newMappings[(i)*tokenRange] = s
117+
}
118+
t.Mappings = newMappings
92119
}
93120

94121
func (t *Tokens) Checksum() string {

mutexes/distributed-db/playground/tokens/main.go

Lines changed: 0 additions & 141 deletions
This file was deleted.

0 commit comments

Comments
 (0)