diff --git a/pkg/query/endpointset.go b/pkg/query/endpointset.go
index 8c5d41c820f..e6c9beddd26 100644
--- a/pkg/query/endpointset.go
+++ b/pkg/query/endpointset.go
@@ -189,6 +189,22 @@ type EndpointStatus struct {
 	MaxTime       int64               `json:"maxTime"`
 }
 
+// newFailedConnectStatus builds the EndpointStatus reported for an endpoint
+// named name that could not be connected to at lastCheck. The status carries a
+// fixed "failed to connect" error, an empty (non-nil) label set list, the
+// unknown store component type and the full int64 [MinTime, MaxTime] range.
+func newFailedConnectStatus(name string, lastCheck time.Time) EndpointStatus {
+	return EndpointStatus{
+		Name:          name,
+		LastCheck:     lastCheck,
+		LabelSets:     make([]labels.Labels, 0),
+		LastError:     &stringError{originalErr: errors.New("failed to connect")},
+		ComponentType: component.UnknownStoreAPI,
+		MinTime:       math.MinInt64,
+		MaxTime:       math.MaxInt64,
+	}
+}
+
 // endpointSetNodeCollector is a metric collector reporting the number of available storeAPIs for Querier.
 // A Collector is required as we want atomic updates for all 'thanos_store_nodes_grpc_connections' series.
 // TODO(hitanshu-mehta) Currently,only collecting metrics of storeEndpoints. Make this struct generic.
@@ -295,6 +311,8 @@ type EndpointSet struct {
 	endpointsMtx    sync.RWMutex
 	endpoints       map[string]*endpointRef
 	endpointsMetric *endpointSetNodeCollector
+
+	failedConnectStatuses []EndpointStatus
 }
 
 // nowFunc is a function that returns time.Time.
@@ -354,9 +368,10 @@ func (e *EndpointSet) Update(ctx context.Context) {
 	level.Debug(e.logger).Log("msg", "starting to update API endpoints", "cachedEndpoints", len(e.endpoints))
 
 	var (
-		newRefs      = make(map[string]*endpointRef)
-		existingRefs = make(map[string]*endpointRef)
-		staleRefs    = make(map[string]*endpointRef)
+		newRefs        = make(map[string]*endpointRef)
+		existingRefs   = make(map[string]*endpointRef)
+		staleRefs      = make(map[string]*endpointRef)
+		failedConnects = make([]EndpointStatus, 0)
 
 		wg sync.WaitGroup
 		mu sync.Mutex
@@ -389,6 +404,14 @@ func (e *EndpointSet) Update(ctx context.Context) {
 
 			newRef, err := e.newEndpointRef(ctx, spec)
 			if err != nil {
+				if spec.isStrictStatic {
+					// This path appears to run once per endpoint spec concurrently (wg/mu are
+					// declared above), so appends to the shared slice are serialized with mu.
+					// NOTE(review): consider the EndpointSet clock (nowFunc) instead of time.Now() — confirm.
+					mu.Lock()
+					failedConnects = append(failedConnects, newFailedConnectStatus(spec.Addr(), time.Now()))
+					mu.Unlock()
+				}
 				level.Warn(e.logger).Log("msg", "new endpoint creation failed", "err", err, "address", spec.Addr())
 				return
 			}
@@ -430,6 +453,7 @@ func (e *EndpointSet) Update(ctx context.Context) {
 		er.Close()
 		delete(e.endpoints, addr)
 	}
+	e.failedConnectStatuses = failedConnects
 	level.Debug(e.logger).Log("msg", "updated endpoints", "activeEndpoints", len(e.endpoints))
 
 	// Update stats.
@@ -524,6 +548,16 @@ func (e *EndpointSet) GetStoreClients() []store.Client {
 			er.mtx.RUnlock()
 		}
 	}
+
+	e.endpointsMtx.RLock()
+	defer e.endpointsMtx.RUnlock()
+	for _, status := range e.failedConnectStatuses {
+		stores = append(stores, &endpointRef{
+			StoreClient: storepb.NewDisconnectedClient(),
+			addr:        status.Name,
+		})
+	}
+
 	return stores
 }
 
@@ -620,6 +654,7 @@ func (e *EndpointSet) GetEndpointStatus() []EndpointStatus {
 			statuses = append(statuses, *status)
 		}
 	}
 
+	statuses = append(statuses, e.failedConnectStatuses...)
sort.Slice(statuses, func(i, j int) bool { return statuses[i].Name < statuses[j].Name diff --git a/pkg/query/endpointset_test.go b/pkg/query/endpointset_test.go index d41d2a76d7d..a340fc585ed 100644 --- a/pkg/query/endpointset_test.go +++ b/pkg/query/endpointset_test.go @@ -17,6 +17,7 @@ import ( "github.com/stretchr/testify/require" "github.com/prometheus/prometheus/model/labels" + "github.com/thanos-io/thanos/pkg/store" "golang.org/x/sync/errgroup" @@ -26,6 +27,7 @@ import ( "github.com/efficientgo/core/testutil" "github.com/pkg/errors" promtestutil "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/info/infopb" "github.com/thanos-io/thanos/pkg/store/labelpb" @@ -566,6 +568,20 @@ func TestEndpointSetUpdate_StrictEndpointMetadata(t *testing.T) { testutil.Equals(t, info.Store.MaxTime, endpointSet.endpoints[addr].metadata.Store.MaxTime) } +func TestEndpointSetUpdate_ConnectToStrictEndpoint(t *testing.T) { + addr := "0.0.0.0:1234" + endpointSet := makeEndpointSet([]string{addr}, true, time.Now) + defer endpointSet.Close() + + endpointSet.Update(context.Background()) + testutil.Equals(t, 1, len(endpointSet.GetStoreClients())) + testutil.Equals(t, 1, len(endpointSet.GetEndpointStatus())) + + client := endpointSet.GetStoreClients()[0] + _, err := client.Series(context.Background(), &storepb.SeriesRequest{}) + testutil.NotOk(t, err) +} + func TestEndpointSetUpdate_PruneInactiveEndpoints(t *testing.T) { testCases := []struct { name string diff --git a/pkg/store/storepb/disconnected_client.go b/pkg/store/storepb/disconnected_client.go new file mode 100644 index 00000000000..444f58ff61d --- /dev/null +++ b/pkg/store/storepb/disconnected_client.go @@ -0,0 +1,35 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. 
+ +package storepb + +import ( + "context" + + "github.com/pkg/errors" + "google.golang.org/grpc" +) + +var ErrStoreNotConnected = errors.New("store is not connected") + +type disconnectedClient struct{} + +func NewDisconnectedClient() *disconnectedClient { + return &disconnectedClient{} +} + +func (d disconnectedClient) Info(ctx context.Context, in *InfoRequest, opts ...grpc.CallOption) (*InfoResponse, error) { + return nil, ErrStoreNotConnected +} + +func (d disconnectedClient) Series(ctx context.Context, in *SeriesRequest, opts ...grpc.CallOption) (Store_SeriesClient, error) { + return nil, ErrStoreNotConnected +} + +func (d disconnectedClient) LabelNames(ctx context.Context, in *LabelNamesRequest, opts ...grpc.CallOption) (*LabelNamesResponse, error) { + return nil, ErrStoreNotConnected +} + +func (d disconnectedClient) LabelValues(ctx context.Context, in *LabelValuesRequest, opts ...grpc.CallOption) (*LabelValuesResponse, error) { + return nil, ErrStoreNotConnected +}