Skip to content

Commit 898d064

Browse files
committed
finish with the gossip up and down statuses
1 parent 6f37192 commit 898d064

File tree

10 files changed

+169
-75
lines changed

10 files changed

+169
-75
lines changed

mutexes/distributed-db/app/app.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,22 +16,23 @@ import (
1616
)
1717

1818
func New() (*App, error) {
19-
nodes := models.Nodes{Map: map[string]struct{}{}}
19+
nodesMap := models.NodesMap{}
2020
port := flag.Int("port", 8080, "the port of the running server")
2121
dataDir := flag.String("data", "", "the data directory of the running server")
22-
flag.Var(&nodes, "node", "the list of nodes to talk to")
22+
flag.Var(&nodesMap, "node", "the list of nodes to talk to")
2323

2424
flag.Parse()
2525

26-
addr := fmt.Sprintf("localhost:%d", *port)
27-
if len(nodes.Map) < 1 {
26+
if len(nodesMap) < 1 {
2827
return nil, fmt.Errorf("need at least 1 node to talk to")
2928
}
29+
30+
addr := fmt.Sprintf("localhost:%d", *port)
3031
if *dataDir == "" {
3132
*dataDir = fmt.Sprintf(".data/%s", addr)
3233
}
3334

34-
nodes.CurrentNode = addr
35+
nodes := models.NewNodes(addr, nodesMap)
3536
tokens := models.NewTokens(nodes, 256)
3637
cacheRepo := repositories.NewCache(*dataDir)
3738
httpClient := clients.NewHTTP(addr)

mutexes/distributed-db/clients/http.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,25 +68,25 @@ func (c *HTTPClient) Get(node string, keys []string) ([]models.CacheItem, error)
6868
return cacheItems, nil
6969
}
7070

71-
func (c *HTTPClient) Gossip(node string, nodes []string, tokensChecksum string) ([]string, error) {
71+
func (c *HTTPClient) Gossip(node string, nodes models.NodesMap, tokensChecksum string) (models.NodesMap, error) {
7272
body := models.GossipRequest{
7373
Nodes: nodes,
7474
TokensChecksum: tokensChecksum,
7575
}
7676
req, err := c.makeRequest(http.MethodPost, c.url(node, "gossip"), body)
7777
if err != nil {
78-
return []string{}, err
78+
return models.NodesMap{}, err
7979
}
8080

8181
res, err := c.httpClient.Do(req)
8282
if err != nil {
83-
return []string{}, err
83+
return models.NodesMap{}, err
8484
}
8585

8686
var gossipRes models.GossipResponse
8787
err = json.NewDecoder(res.Body).Decode(&gossipRes)
8888
if err != nil {
89-
return []string{}, err
89+
return models.NodesMap{}, err
9090
}
9191

9292
return gossipRes.Nodes, nil

mutexes/distributed-db/controllers/gossip.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import (
99
)
1010

1111
type tokensUpdater interface {
12-
UpdateTokens(node string, newNodes []string, tokensChecksum string) ([]string, error)
12+
UpdateTokens(node string, newNodes models.NodesMap, tokensChecksum string) (oldNodes models.NodesMap, err error)
1313
}
1414

1515
func gossip(svc tokensUpdater) http.HandlerFunc {
Lines changed: 9 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,51 +1,21 @@
11
package models
22

33
import (
4+
"fmt"
45
"strings"
56
)
67

7-
type Nodes struct {
8-
Map map[string]struct{}
9-
CurrentNode string
10-
}
8+
type NodesMap map[string]int
119

12-
func (n Nodes) Set(s string) error {
13-
n.Add(s)
10+
func (m NodesMap) Set(element string) error {
11+
m[element] = NodeStatusUp
1412
return nil
1513
}
1614

17-
func (n Nodes) String() string {
18-
return strings.Join(n.List(len(n.Map)), ",")
19-
}
20-
21-
func (n Nodes) Add(nodes ...string) {
22-
for _, node := range nodes {
23-
if node == n.CurrentNode {
24-
continue
25-
}
26-
// update some kind of UpdateAt field for nodes health check
27-
// change from map[string]struct{} to map[string]int => states: Up/Down
28-
// a node is Down if it hasn't sent a gossip request in 10 seconds
29-
n.Map[node] = struct{}{}
30-
}
31-
}
32-
33-
func (n Nodes) WithCurrentNode() map[string]struct{} {
34-
nodes := map[string]struct{}{n.CurrentNode: {}}
35-
for s := range n.Map {
36-
nodes[s] = struct{}{}
37-
}
38-
return nodes
39-
}
40-
41-
func (n Nodes) List(a int) []string {
42-
i, keys := 0, make([]string, 0, len(n.Map))
43-
for k := range n.Map {
44-
if i == a {
45-
break
46-
}
47-
keys = append(keys, k)
48-
i++
15+
func (m NodesMap) String() string {
16+
list := make([]string, 0, len(m))
17+
for node, status := range m {
18+
list = append(list, fmt.Sprintf("%s:%d", node, status))
4919
}
50-
return keys
20+
return strings.Join(list, ",")
5121
}
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
package models
2+
3+
import (
4+
"log"
5+
"time"
6+
)
7+
8+
const (
9+
NodeStatusUp = 1
10+
NodeStatusDown = 0
11+
gossipDeadline = 15 * time.Second
12+
retryPeriod = time.Minute
13+
maxFails = 5
14+
)
15+
16+
func NewNodes(currentNode string, nodeMap NodesMap) *Nodes {
17+
nodes := &Nodes{
18+
current: currentNode,
19+
nodesStatus: NodesMap{},
20+
gossipStatus: map[string]time.Time{},
21+
nodeFails: map[string]int{},
22+
nodeRetries: map[string]time.Time{},
23+
}
24+
nodes.Set(nodeMap)
25+
go nodes.retry()
26+
return nodes
27+
}
28+
29+
type Nodes struct {
30+
current string
31+
nodesStatus NodesMap
32+
gossipStatus map[string]time.Time
33+
nodeFails map[string]int
34+
nodeRetries map[string]time.Time
35+
}
36+
37+
func (n Nodes) Current() string {
38+
return n.current
39+
}
40+
41+
func (n Nodes) Fail(node string) {
42+
n.nodeFails[node]++
43+
if n.nodeFails[node] >= maxFails {
44+
log.Printf("node: %s is down, retryin in: %v", node, retryPeriod)
45+
n.nodesStatus[node] = NodeStatusDown
46+
n.nodeRetries[node] = time.Now().UTC()
47+
}
48+
}
49+
50+
func (n Nodes) retry() {
51+
for {
52+
time.Sleep(time.Second)
53+
for node, lastTried := range n.nodeRetries {
54+
now := time.Now().UTC()
55+
if now.Sub(lastTried) >= retryPeriod {
56+
log.Printf("retrying gossip on node: %s", node)
57+
n.nodeFails[node] = 0
58+
n.nodesStatus[node] = NodeStatusUp
59+
delete(n.nodeRetries, node)
60+
n.gossipStatus[node] = time.Now().UTC()
61+
}
62+
}
63+
}
64+
}
65+
66+
func (n Nodes) Gossip(node string) {
67+
n.gossipStatus[node] = time.Now().UTC()
68+
}
69+
70+
func (n Nodes) Set(nodesStatus NodesMap) {
71+
for node, status := range nodesStatus {
72+
if node == n.current {
73+
continue
74+
}
75+
n.nodesStatus[node] = status
76+
}
77+
}
78+
79+
func (n Nodes) Map() NodesMap {
80+
nodes := NodesMap{n.current: NodeStatusUp}
81+
for node, lastGossip := range n.gossipStatus {
82+
now := time.Now().UTC()
83+
if now.Sub(lastGossip) < gossipDeadline {
84+
nodes[node] = NodeStatusUp
85+
continue
86+
}
87+
nodes[node] = NodeStatusDown
88+
}
89+
return nodes
90+
}
91+
92+
func (n Nodes) List(x int) []string {
93+
return n.list(x, false)
94+
}
95+
96+
func (n Nodes) ListAll() []string {
97+
return n.list(len(n.nodesStatus), false)
98+
}
99+
100+
func (n Nodes) ListActive(x int) []string {
101+
return n.list(x, true)
102+
}
103+
104+
func (n Nodes) list(x int, filterByNodeStatusUp bool) []string {
105+
i, nodeList := 0, make([]string, 0, len(n.nodesStatus))
106+
for node, status := range n.nodesStatus {
107+
if i == x {
108+
break
109+
}
110+
111+
if filterByNodeStatusUp && status != NodeStatusUp {
112+
continue
113+
}
114+
115+
nodeList = append(nodeList, node)
116+
i++
117+
}
118+
return nodeList
119+
}

mutexes/distributed-db/models/requests.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,6 @@ type SetRequest struct {
1616
}
1717

1818
type GossipRequest struct {
19-
Nodes []string `json:"nodes"`
20-
TokensChecksum string `json:"tokens_checksum"`
19+
Nodes map[string]int `json:"nodes"`
20+
TokensChecksum string `json:"tokens_checksum"`
2121
}

mutexes/distributed-db/models/responses.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package models
22

33
type GossipResponse struct {
4-
Nodes []string `json:"nodes"`
4+
Nodes map[string]int `json:"nodes"`
55
}
66

77
type TokensResponse struct {

mutexes/distributed-db/models/tokens.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,13 @@ type TokenMappings map[int]string
1717

1818
type Tokens struct {
1919
Mappings TokenMappings
20-
Nodes Nodes
20+
Nodes *Nodes
2121
ranges []int
2222
numberOfTokenRanges int
2323
}
2424

25-
func NewTokens(nodes Nodes, numberOfTokenRanges int) *Tokens {
26-
nodeList := append([]string{nodes.CurrentNode}, nodes.List(len(nodes.Map))...)
25+
func NewTokens(nodes *Nodes, numberOfTokenRanges int) *Tokens {
26+
nodeList := append([]string{nodes.Current()}, nodes.ListAll()...)
2727
tokenRange := math.MaxInt / len(nodeList) / numberOfTokenRanges
2828
ranges := make([]int, 0, len(nodeList)*numberOfTokenRanges)
2929
for i := 0; i < len(nodeList); i++ {
@@ -67,18 +67,18 @@ func (t *Tokens) GetNode(key string) string {
6767

6868
func (t *Tokens) Merge(mappings map[int]string) {
6969
newMappings := map[int]string{}
70-
nodes := t.Nodes.WithCurrentNode()
70+
nodes := t.Nodes.Map()
7171
ranges := t.ranges
7272
m1, m2 := mappings, t.Mappings
7373
if len(mappings) < len(t.Mappings) {
7474
m1 = t.Mappings
7575
m2 = mappings
7676
}
7777

78-
newNodes := map[string]struct{}{}
78+
newNodes := map[string]int{}
7979
newRanges := make([]int, 0)
8080
for _, s := range mappings {
81-
newNodes[s] = struct{}{}
81+
newNodes[s] = NodeStatusUp
8282
}
8383
for r := range m1 {
8484
newRanges = append(newRanges, r)
@@ -92,14 +92,14 @@ func (t *Tokens) Merge(mappings map[int]string) {
9292
}
9393

9494
for s := range nodes {
95-
_, ok := t.Nodes.WithCurrentNode()[s]
95+
_, ok := t.Nodes.Map()[s]
9696
if ok {
9797
delete(nodes, s)
9898
}
9999
}
100100

101101
i := 0
102-
numberOfNodes := len(nodes) + len(t.Nodes.WithCurrentNode())
102+
numberOfNodes := len(nodes) + len(t.Nodes.Map())
103103
tokenRange := math.MaxInt / numberOfNodes / t.numberOfTokenRanges
104104
m1Nodes := map[string]struct{}{}
105105
for r, s := range m1 {

mutexes/distributed-db/services/cache.go

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ type CacheRepository interface {
1616
type HTTPClient interface {
1717
Set(node, key, value string) (models.CacheItem, error)
1818
Get(node string, keys []string) ([]models.CacheItem, error)
19-
Gossip(node string, nodes []string, tokensChecksum string) (oldNodes []string, err error)
19+
Gossip(node string, newNodes models.NodesMap, tokensChecksum string) (oldNodes models.NodesMap, err error)
2020
Tokens(node string) (models.TokenMappings, error)
2121
}
2222

@@ -55,7 +55,7 @@ func (svc CacheSvc) Get(keys []string) []models.CacheItem {
5555

5656
cacheItems := make([]models.CacheItem, 0)
5757
for node, sums := range nodeToSums {
58-
if node == svc.tokens.Nodes.CurrentNode {
58+
if node == svc.tokens.Nodes.Current() {
5959
items := svc.cacheRepo.Get(sums)
6060
for _, item := range items {
6161
item.Node = node
@@ -81,7 +81,7 @@ func (svc CacheSvc) Get(keys []string) []models.CacheItem {
8181

8282
func (svc CacheSvc) Set(key, value string) (models.CacheItem, error) {
8383
node := svc.tokens.GetNode(key)
84-
if node == svc.tokens.Nodes.CurrentNode {
84+
if node == svc.tokens.Nodes.Current() {
8585
item := svc.cacheRepo.Set(key, value)
8686
item.Node = node
8787
return item, nil
@@ -91,33 +91,37 @@ func (svc CacheSvc) Set(key, value string) (models.CacheItem, error) {
9191
}
9292

9393
func (svc CacheSvc) Gossip() {
94-
nodes := svc.tokens.Nodes.List(2)
95-
log.Println("gossiping to:", strings.Join(nodes, ","))
94+
nodes := svc.tokens.Nodes.ListActive(2)
95+
if len(nodes) == 0 {
96+
return
97+
}
9698

97-
allNodes := append([]string{}, svc.tokens.Nodes.List(len(svc.tokens.Nodes.Map))...)
98-
allNodes = append(allNodes, svc.tokens.Nodes.CurrentNode)
99+
log.Println("gossiping to:", strings.Join(nodes, ","))
99100
for _, node := range nodes {
100-
oldNodes, err := svc.httpClient.Gossip(node, allNodes, svc.tokens.Checksum())
101+
oldNodes, err := svc.httpClient.Gossip(node, svc.tokens.Nodes.Map(), svc.tokens.Checksum())
101102
if err != nil {
102103
log.Printf("could not make http call for gossip: %v", err)
104+
svc.tokens.Nodes.Fail(node)
103105
continue
104106
}
105107

106-
svc.tokens.Nodes.Add(oldNodes...)
108+
svc.tokens.Nodes.Set(oldNodes)
107109
}
108110
}
109111

110-
func (svc CacheSvc) UpdateTokens(node string, newNodes []string, tokensChecksum string) ([]string, error) {
111-
oldNodes := svc.tokens.Nodes.List(len(svc.tokens.Nodes.Map))
112+
func (svc CacheSvc) UpdateTokens(node string, newNodes models.NodesMap, tokensChecksum string) (models.NodesMap, error) {
113+
svc.tokens.Nodes.Gossip(node)
114+
svc.tokens.Nodes.Set(newNodes)
115+
112116
if svc.tokens.Checksum() != tokensChecksum {
113-
svc.tokens.Nodes.Add(newNodes...)
114117
tokens, err := svc.httpClient.Tokens(node)
115118
if err != nil {
116-
return []string{}, err
119+
return models.NodesMap{}, err
117120
}
118121
svc.tokens.Merge(tokens)
119122
}
120-
return oldNodes, nil
123+
124+
return svc.tokens.Nodes.Map(), nil
121125
}
122126

123127
func (svc CacheSvc) GetTokens() map[int]string {

0 commit comments

Comments
 (0)