Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
storeset: convert strict mode stuff into a separate type
Signed-off-by: Giedrius Statkevičius <giedriuswork@gmail.com>
  • Loading branch information
GiedriusS committed Mar 27, 2020
commit ddab7c9a9f443070233950dc18d814340d499e74
8 changes: 6 additions & 2 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,10 @@ func runQuery(
)

staticStores, dynamicStores := dns.FilterNodes(storeAddrs...)
evictionPolicy := query.NormalMode
if strictMode {
evictionPolicy = query.StrictMode
}

var (
stores = query.NewStoreSet(
Expand Down Expand Up @@ -268,7 +272,7 @@ func runQuery(
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
return runutil.Repeat(5*time.Second, ctx.Done(), func() error {
stores.Update(ctx, strictMode)
stores.Update(ctx, evictionPolicy)
return nil
})
}, func(error) {
Expand Down Expand Up @@ -300,7 +304,7 @@ func runQuery(
continue
}
fileSDCache.Update(update)
stores.Update(ctxUpdate, strictMode)
stores.Update(ctxUpdate, evictionPolicy)
dnsProvider.Resolve(ctxUpdate, append(fileSDCache.Addresses(), dynamicStores...))
case <-ctxUpdate.Done():
return nil
Expand Down
25 changes: 20 additions & 5 deletions pkg/query/storeset.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,21 @@ type StoreSpec interface {
Static() bool
}

// QuerierEvictionPolicy defines what logic should we use when trying to determine
// which queriers we should remove if they do not pass the health check anymore.
type QuerierEvictionPolicy int

const (
// NormalMode is a querier eviction policy when we immediately
// remove a querier if it doesn't pass the health check.
NormalMode QuerierEvictionPolicy = iota
// StrictMode is a querier eviction policy when we do not remove
// queriers which were statically defined even if they have not passed
// the health check i.e. no service discovery mechanism was used (file
// or DNS sd).
StrictMode
)

type StoreStatus struct {
Name string
LastCheck time.Time
Expand Down Expand Up @@ -328,8 +343,8 @@ func newStoreAPIStats() map[component.StoreAPI]map[string]int {
}

// Update updates the store set. It fetches current list of store specs from function and updates the fresh metadata
// from all stores. If strictMode is true then it keeps around statically defined nodes.
func (s *StoreSet) Update(ctx context.Context, strictMode bool) {
// from all stores. The policy defines how we treat nodes which do not pass the health check.
func (s *StoreSet) Update(ctx context.Context, policy QuerierEvictionPolicy) {
s.updateMtx.Lock()
defer s.updateMtx.Unlock()

Expand All @@ -342,7 +357,7 @@ func (s *StoreSet) Update(ctx context.Context, strictMode bool) {

level.Debug(s.logger).Log("msg", "starting updating storeAPIs", "cachedStores", len(stores))

activeStores := s.getActiveStores(ctx, stores, strictMode)
activeStores := s.getActiveStores(ctx, stores, policy)
level.Debug(s.logger).Log("msg", "checked requested storeAPIs", "activeStores", len(activeStores), "cachedStores", len(stores))

stats := newStoreAPIStats()
Expand Down Expand Up @@ -392,7 +407,7 @@ func (s *StoreSet) Update(ctx context.Context, strictMode bool) {
s.cleanUpStoreStatuses(stores)
}

func (s *StoreSet) getActiveStores(ctx context.Context, stores map[string]*storeRef, strictMode bool) map[string]*storeRef {
func (s *StoreSet) getActiveStores(ctx context.Context, stores map[string]*storeRef, policy QuerierEvictionPolicy) map[string]*storeRef {
var (
unique = make(map[string]struct{})
activeStores = make(map[string]*storeRef, len(stores))
Expand Down Expand Up @@ -439,7 +454,7 @@ func (s *StoreSet) getActiveStores(ctx context.Context, stores map[string]*store
s.updateStoreStatus(st, err)
level.Warn(s.logger).Log("msg", "update of store node failed", "err", errors.Wrap(err, "getting metadata"), "address", addr)

if !(strictMode && spec.Static()) {
if !(policy == StrictMode && spec.Static()) {
return
}

Expand Down
24 changes: 12 additions & 12 deletions pkg/query/storeset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,8 @@ func TestStoreSet_Update(t *testing.T) {
defer storeSet.Close()

// Should not matter how many of these we run.
storeSet.Update(context.Background(), false)
storeSet.Update(context.Background(), false)
storeSet.Update(context.Background(), NormalMode)
storeSet.Update(context.Background(), NormalMode)
testutil.Equals(t, 2, len(storeSet.stores))
testutil.Equals(t, 3, len(storeSet.storeStatuses))

Expand All @@ -219,14 +219,14 @@ func TestStoreSet_Update(t *testing.T) {
// Remove address from discovered and reset last check, which should ensure cleanup of status on next update.
storeSet.storeStatuses[discoveredStoreAddr[2]].LastCheck = time.Now().Add(-4 * time.Minute)
discoveredStoreAddr = discoveredStoreAddr[:len(discoveredStoreAddr)-2]
storeSet.Update(context.Background(), false)
storeSet.Update(context.Background(), NormalMode)
testutil.Equals(t, 2, len(storeSet.storeStatuses))

stores.CloseOne(discoveredStoreAddr[0])
delete(expected[component.Sidecar], fmt.Sprintf("{a=\"b\"},{addr=\"%s\"}", discoveredStoreAddr[0]))

// We expect Update to tear down store client for closed store server.
storeSet.Update(context.Background(), false)
storeSet.Update(context.Background(), NormalMode)
testutil.Equals(t, 1, len(storeSet.stores), "only one service should respond just fine, so we expect one client to be ready.")
testutil.Equals(t, 2, len(storeSet.storeStatuses))

Expand Down Expand Up @@ -453,7 +453,7 @@ func TestStoreSet_Update(t *testing.T) {
discoveredStoreAddr = append(discoveredStoreAddr, stores2.StoreAddresses()...)

// New stores should be loaded.
storeSet.Update(context.Background(), false)
storeSet.Update(context.Background(), NormalMode)
testutil.Equals(t, 1+len(stores2.srvs), len(storeSet.stores))

// Check stats.
Expand Down Expand Up @@ -533,8 +533,8 @@ func TestStoreSet_Update_NoneAvailable(t *testing.T) {
storeSet.gRPCInfoCallTimeout = 2 * time.Second

// Should not matter how many of these we run.
storeSet.Update(context.Background(), false)
storeSet.Update(context.Background(), false)
storeSet.Update(context.Background(), NormalMode)
storeSet.Update(context.Background(), NormalMode)
testutil.Assert(t, len(storeSet.stores) == 0, "none of services should respond just fine, so we expect no client to be ready.")

// Leak test will ensure that we don't keep client connection around.
Expand Down Expand Up @@ -598,7 +598,7 @@ func TestQuerierStrict(t *testing.T) {
storeSet.gRPCInfoCallTimeout = 1 * time.Second

// Initial update.
storeSet.Update(context.Background(), true)
storeSet.Update(context.Background(), StrictMode)
testutil.Equals(t, 2, len(storeSet.stores), "one client must be available for a running store node")

// The store is statically defined + strict mode is enabled
Expand All @@ -611,9 +611,9 @@ func TestQuerierStrict(t *testing.T) {
st.Close()

// Update again many times. Should not matter WRT the static one.
storeSet.Update(context.Background(), true)
storeSet.Update(context.Background(), true)
storeSet.Update(context.Background(), true)
storeSet.Update(context.Background(), StrictMode)
storeSet.Update(context.Background(), StrictMode)
storeSet.Update(context.Background(), StrictMode)

// Check that the information is the same.
testutil.Equals(t, 1, len(storeSet.stores), "one client must remain available for a store node that is down")
Expand All @@ -622,6 +622,6 @@ func TestQuerierStrict(t *testing.T) {
testutil.NotOk(t, storeSet.storeStatuses[staticStoreAddr].LastError)

// Now let's turn off strict mode. The node should disappear.
storeSet.Update(context.Background(), false)
storeSet.Update(context.Background(), NormalMode)
testutil.Equals(t, 0, len(storeSet.stores), "there are still some stores even though strict mode is off")
}