Skip to content

Commit ed643d2

Browse files
committed
adjust set and get operations in the distributed database example
1 parent e12fada commit ed643d2

File tree

8 files changed

+121
-37
lines changed

8 files changed

+121
-37
lines changed

mutexes/distributed-db/clients/http.go

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"encoding/json"
66
"net/http"
77
"net/url"
8-
"time"
98

109
"distributed-db/models"
1110
)
@@ -23,11 +22,12 @@ type HTTPClient struct {
2322
httpClient *http.Client
2423
}
2524

26-
func (c *HTTPClient) Get(node string, key string) (models.CacheItem, error) {
27-
body := models.GetRequest{
28-
Keys: []string{key},
25+
func (c *HTTPClient) Set(node string, key, value string) (models.CacheItem, error) {
26+
body := models.SetRequest{
27+
Key: key,
28+
Value: value,
2929
}
30-
req, err := c.makeRequest(http.MethodGet, c.url(node, "get"), body)
30+
req, err := c.makeRequest(http.MethodPost, c.url(node, "set"), body)
3131
if err != nil {
3232
return models.CacheItem{}, err
3333
}
@@ -37,19 +37,40 @@ func (c *HTTPClient) Get(node string, key string) (models.CacheItem, error) {
3737
return models.CacheItem{}, err
3838
}
3939

40-
var cacheItem []models.CacheItem
41-
err = json.NewDecoder(res.Body).Decode(&cacheItem)
40+
var item models.CacheItem
41+
err = json.NewDecoder(res.Body).Decode(&item)
4242
if err != nil {
4343
return models.CacheItem{}, err
4444
}
45+
item.Node = node
46+
47+
return item, nil
48+
}
49+
50+
func (c *HTTPClient) Get(node string, keys []string) ([]models.CacheItem, error) {
51+
body := models.GetRequest{Keys: keys}
52+
req, err := c.makeRequest(http.MethodGet, c.url(node, "get"), body)
53+
if err != nil {
54+
return []models.CacheItem{}, err
55+
}
56+
57+
res, err := c.httpClient.Do(req)
58+
if err != nil {
59+
return []models.CacheItem{}, err
60+
}
61+
62+
var cacheItems []models.CacheItem
63+
err = json.NewDecoder(res.Body).Decode(&cacheItems)
64+
if err != nil {
65+
return []models.CacheItem{}, err
66+
}
4567

46-
return cacheItem[0], nil
68+
return cacheItems, nil
4769
}
4870

4971
func (c *HTTPClient) Gossip(node string, nodes []string, tokensChecksum string) ([]string, error) {
5072
body := models.GossipRequest{
5173
Nodes: nodes,
52-
CreatedAt: time.Now().UTC(),
5374
TokensChecksum: tokensChecksum,
5475
}
5576
req, err := c.makeRequest(http.MethodPost, c.url(node, "gossip"), body)

mutexes/distributed-db/controllers/set.go

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import (
99
)
1010

1111
type cacheSetter interface {
12-
Set(key, value string)
12+
Set(key, value string) (models.CacheItem, error)
1313
}
1414

1515
func set(svc cacheSetter) http.HandlerFunc {
@@ -22,9 +22,18 @@ func set(svc cacheSetter) http.HandlerFunc {
2222
return
2323
}
2424

25-
svc.Set(req.Key, req.Value)
25+
item, err := svc.Set(req.Key, req.Value)
26+
if err != nil {
27+
log.Printf("could not store cache item: %v", err)
28+
w.WriteHeader(http.StatusInternalServerError)
29+
return
30+
}
2631

27-
log.Printf("successfully stored record with key: %s", req.Key)
28-
w.WriteHeader(http.StatusOK)
32+
log.Printf("successfully stored record with key: %s on: %s", item.Key, item.Node)
33+
w.Header().Set("Content-Type", "application/json")
34+
err = json.NewEncoder(w).Encode(item)
35+
if err != nil {
36+
log.Printf("could not encode set response: %v", err)
37+
}
2938
}
3039
}

mutexes/distributed-db/models/cache.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,5 @@ type CacheItem struct {
88
Key string `json:"key"`
99
Value string `json:"value"`
1010
UpdatedAt time.Time `json:"updated_at"`
11+
Node string `json:"node,omitempty"`
1112
}

mutexes/distributed-db/models/flags.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,11 @@ func (n Nodes) String() string {
2121
func (n Nodes) Add(nodes ...string) {
2222
for _, node := range nodes {
2323
if node == n.CurrentNode {
24-
return
24+
continue
2525
}
26+
// update some kind of UpdateAt field for nodes health check
27+
// change from map[string]struct{} to map[string]int => states: Up/Down
28+
// a node is Down if it hasn't sent a gossip request in 10 seconds
2629
n.Map[node] = struct{}{}
2730
}
2831
}

mutexes/distributed-db/models/requests.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,5 @@
11
package models
22

3-
import (
4-
"time"
5-
)
6-
73
type GetRequest struct {
84
Keys []string `json:"keys"`
95
// how many reads before returning
@@ -20,7 +16,6 @@ type SetRequest struct {
2016
}
2117

2218
type GossipRequest struct {
23-
CreatedAt time.Time `json:"created_at"`
2419
Nodes []string `json:"nodes"`
2520
TokensChecksum string `json:"tokens_checksum"`
2621
}

mutexes/distributed-db/models/tokens.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,9 @@ func NewTokens(nodes Nodes, numberOfTokenRanges int) *Tokens {
5858
return tokens
5959
}
6060

61-
func (t *Tokens) GetNode(token int) string {
62-
idx := sort.SearchInts(t.ranges, token)
61+
func (t *Tokens) GetNode(key string) string {
62+
token := HashKey(key)
63+
idx := sort.SearchInts(t.ranges, int(token))
6364
node := t.Mappings[t.ranges[idx]]
6465
return node
6566
}
@@ -128,7 +129,7 @@ func (t *Tokens) Checksum() string {
128129
return fmt.Sprintf("%x", sum)
129130
}
130131

131-
func hash(s string) uint64 {
132+
func HashKey(s string) uint64 {
132133
h := fnv.New64a()
133134
_, _ = h.Write([]byte(s))
134135
return h.Sum64()
Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package repositories
22

33
import (
4+
"fmt"
45
"sync"
56
"time"
67

@@ -18,41 +19,44 @@ type Cache struct {
1819
data map[string]models.CacheItem
1920
}
2021

21-
func (c *Cache) Set(key, value string) {
22+
func (c *Cache) Set(key, value string) models.CacheItem {
2223
c.mu.Lock()
2324
defer c.mu.Unlock()
2425

25-
cacheValue := models.CacheItem{
26+
item := models.CacheItem{
2627
Key: key,
2728
Value: value,
2829
UpdatedAt: time.Now().UTC(),
2930
}
30-
c.data[key] = cacheValue
31+
32+
sum := fmt.Sprintf("%d", models.HashKey(key))
33+
c.data[sum] = item
34+
return item
3135
}
3236

3337
func (c *Cache) Get(key string) *models.CacheItem {
3438
c.mu.RLock()
3539
defer c.mu.RUnlock()
3640

37-
val, ok := c.data[key]
41+
item, ok := c.data[key]
3842
if !ok {
3943
return nil
4044
}
4145

42-
return &val
46+
return &item
4347
}
4448

4549
func (c *Cache) GetMany(keys []string) []models.CacheItem {
4650
c.mu.RLock()
4751
defer c.mu.RUnlock()
4852

49-
values := make([]models.CacheItem, 0)
50-
for _, k := range keys {
51-
v, ok := c.data[k]
53+
items := make([]models.CacheItem, 0)
54+
for _, key := range keys {
55+
item, ok := c.data[key]
5256
if ok {
53-
values = append(values, v)
57+
items = append(items, item)
5458
}
5559
}
5660

57-
return values
61+
return items
5862
}

mutexes/distributed-db/services/cache.go

Lines changed: 55 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package services
22

33
import (
4+
"fmt"
45
"log"
56
"strings"
67

@@ -10,11 +11,12 @@ import (
1011
type CacheRepository interface {
1112
Get(key string) *models.CacheItem
1213
GetMany(keys []string) []models.CacheItem
13-
Set(key, value string)
14+
Set(key, value string) models.CacheItem
1415
}
1516

1617
type HTTPClient interface {
17-
Get(node string, key string) (models.CacheItem, error)
18+
Set(node, key, value string) (models.CacheItem, error)
19+
Get(node string, keys []string) ([]models.CacheItem, error)
1820
Gossip(node string, nodes []string, tokensChecksum string) (oldNodes []string, err error)
1921
Tokens(node string) (models.TokenMappings, error)
2022
}
@@ -34,11 +36,59 @@ type CacheSvc struct {
3436
}
3537

3638
func (svc CacheSvc) Get(keys []string) []models.CacheItem {
37-
return svc.cacheRepo.GetMany(keys)
39+
keyToNode := map[string]string{}
40+
sumToNode := map[string]string{}
41+
for _, key := range keys {
42+
sum := fmt.Sprintf("%d", models.HashKey(key))
43+
node := svc.tokens.GetNode(key)
44+
sumToNode[sum] = node
45+
keyToNode[key] = node
46+
}
47+
48+
nodeToSums := map[string][]string{}
49+
for sum, node := range sumToNode {
50+
nodeToSums[node] = append(nodeToSums[node], sum)
51+
}
52+
nodeToKeys := map[string][]string{}
53+
for key, node := range keyToNode {
54+
nodeToKeys[node] = append(nodeToKeys[node], key)
55+
}
56+
57+
cacheItems := make([]models.CacheItem, 0)
58+
for node, sums := range nodeToSums {
59+
if node == svc.tokens.Nodes.CurrentNode {
60+
items := svc.cacheRepo.GetMany(sums)
61+
for _, item := range items {
62+
item.Node = node
63+
cacheItems = append(cacheItems, item)
64+
}
65+
continue
66+
}
67+
68+
nodeKeys := nodeToKeys[node]
69+
items, err := svc.httpClient.Get(node, nodeKeys)
70+
if err != nil {
71+
log.Printf("could not get cache items from node: %s, %v", node, err)
72+
}
73+
74+
for _, item := range items {
75+
item.Node = node
76+
cacheItems = append(cacheItems, item)
77+
}
78+
}
79+
80+
return cacheItems
3881
}
3982

40-
func (svc CacheSvc) Set(key, value string) {
41-
svc.cacheRepo.Set(key, value)
83+
func (svc CacheSvc) Set(key, value string) (models.CacheItem, error) {
84+
node := svc.tokens.GetNode(key)
85+
if node == svc.tokens.Nodes.CurrentNode {
86+
item := svc.cacheRepo.Set(key, value)
87+
item.Node = node
88+
return item, nil
89+
}
90+
91+
return svc.httpClient.Set(node, key, value)
4292
}
4393

4494
func (svc CacheSvc) Gossip() {

0 commit comments

Comments
 (0)