Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 33 additions & 3 deletions pkg/query/endpointset.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,18 @@ type EndpointStatus struct {
MaxTime int64 `json:"maxTime"`
}

func newFailedConnectStatus(name string, lastCheck time.Time) EndpointStatus {
return EndpointStatus{
Name: name,
LastCheck: lastCheck,
LabelSets: make([]labels.Labels, 0),
LastError: &stringError{originalErr: errors.New("failed to connect")},
ComponentType: component.UnknownStoreAPI,
MinTime: math.MinInt64,
MaxTime: math.MaxInt32,
}
}

// endpointSetNodeCollector is a metric collector reporting the number of available storeAPIs for Querier.
// A Collector is required as we want atomic updates for all 'thanos_store_nodes_grpc_connections' series.
// TODO(hitanshu-mehta) Currently,only collecting metrics of storeEndpoints. Make this struct generic.
Expand Down Expand Up @@ -295,6 +307,8 @@ type EndpointSet struct {
endpointsMtx sync.RWMutex
endpoints map[string]*endpointRef
endpointsMetric *endpointSetNodeCollector

failedConnectStatuses []EndpointStatus
}

// nowFunc is a function that returns time.Time.
Expand Down Expand Up @@ -354,9 +368,10 @@ func (e *EndpointSet) Update(ctx context.Context) {
level.Debug(e.logger).Log("msg", "starting to update API endpoints", "cachedEndpoints", len(e.endpoints))

var (
newRefs = make(map[string]*endpointRef)
existingRefs = make(map[string]*endpointRef)
staleRefs = make(map[string]*endpointRef)
newRefs = make(map[string]*endpointRef)
existingRefs = make(map[string]*endpointRef)
staleRefs = make(map[string]*endpointRef)
failedConnects = make([]EndpointStatus, 0)

wg sync.WaitGroup
mu sync.Mutex
Expand Down Expand Up @@ -389,6 +404,9 @@ func (e *EndpointSet) Update(ctx context.Context) {

newRef, err := e.newEndpointRef(ctx, spec)
if err != nil {
if spec.isStrictStatic {
failedConnects = append(failedConnects, newFailedConnectStatus(spec.Addr(), time.Now()))
}
level.Warn(e.logger).Log("msg", "new endpoint creation failed", "err", err, "address", spec.Addr())
return
}
Expand Down Expand Up @@ -430,6 +448,7 @@ func (e *EndpointSet) Update(ctx context.Context) {
er.Close()
delete(e.endpoints, addr)
}
e.failedConnectStatuses = failedConnects
level.Debug(e.logger).Log("msg", "updated endpoints", "activeEndpoints", len(e.endpoints))

// Update stats.
Expand Down Expand Up @@ -524,6 +543,16 @@ func (e *EndpointSet) GetStoreClients() []store.Client {
er.mtx.RUnlock()
}
}

e.endpointsMtx.RLock()
defer e.endpointsMtx.RUnlock()
for _, status := range e.failedConnectStatuses {
stores = append(stores, &endpointRef{
StoreClient: storepb.NewDisconnectedClient(),
addr: status.Name,
})
}

return stores
}

Expand Down Expand Up @@ -620,6 +649,7 @@ func (e *EndpointSet) GetEndpointStatus() []EndpointStatus {
statuses = append(statuses, *status)
}
}
statuses = append(statuses, e.failedConnectStatuses...)

sort.Slice(statuses, func(i, j int) bool {
return statuses[i].Name < statuses[j].Name
Expand Down
16 changes: 16 additions & 0 deletions pkg/query/endpointset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/prometheus/prometheus/model/labels"

"github.com/thanos-io/thanos/pkg/store"

"golang.org/x/sync/errgroup"
Expand All @@ -26,6 +27,7 @@ import (
"github.com/efficientgo/core/testutil"
"github.com/pkg/errors"
promtestutil "github.com/prometheus/client_golang/prometheus/testutil"

"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/info/infopb"
"github.com/thanos-io/thanos/pkg/store/labelpb"
Expand Down Expand Up @@ -566,6 +568,20 @@ func TestEndpointSetUpdate_StrictEndpointMetadata(t *testing.T) {
testutil.Equals(t, info.Store.MaxTime, endpointSet.endpoints[addr].metadata.Store.MaxTime)
}

func TestEndpointSetUpdate_ConnectToStrictEndpoint(t *testing.T) {
addr := "0.0.0.0:1234"
endpointSet := makeEndpointSet([]string{addr}, true, time.Now)
defer endpointSet.Close()

endpointSet.Update(context.Background())
testutil.Equals(t, 1, len(endpointSet.GetStoreClients()))
testutil.Equals(t, 1, len(endpointSet.GetEndpointStatus()))

client := endpointSet.GetStoreClients()[0]
_, err := client.Series(context.Background(), &storepb.SeriesRequest{})
testutil.NotOk(t, err)
}

func TestEndpointSetUpdate_PruneInactiveEndpoints(t *testing.T) {
testCases := []struct {
name string
Expand Down
35 changes: 35 additions & 0 deletions pkg/store/storepb/disconnected_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package storepb

import (
"context"

"github.com/pkg/errors"
"google.golang.org/grpc"
)

var ErrStoreNotConnected = errors.New("store is not connected")

type disconnectedClient struct{}

func NewDisconnectedClient() *disconnectedClient {
return &disconnectedClient{}
}

func (d disconnectedClient) Info(ctx context.Context, in *InfoRequest, opts ...grpc.CallOption) (*InfoResponse, error) {
return nil, ErrStoreNotConnected
}

func (d disconnectedClient) Series(ctx context.Context, in *SeriesRequest, opts ...grpc.CallOption) (Store_SeriesClient, error) {
return nil, ErrStoreNotConnected
}

func (d disconnectedClient) LabelNames(ctx context.Context, in *LabelNamesRequest, opts ...grpc.CallOption) (*LabelNamesResponse, error) {
return nil, ErrStoreNotConnected
}

func (d disconnectedClient) LabelValues(ctx context.Context, in *LabelValuesRequest, opts ...grpc.CallOption) (*LabelValuesResponse, error) {
return nil, ErrStoreNotConnected
}