Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
store: proxy: add test for deadlocking problem
Signed-off-by: Giedrius Statkevičius <giedriuswork@gmail.com>
  • Loading branch information
GiedriusS committed Apr 10, 2020
commit fb39b30fbf02f7740d60df0e0d809b40e1335c67
66 changes: 65 additions & 1 deletion pkg/store/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,60 @@ func TestProxyStore_SeriesSlowStores(t *testing.T) {
expectedErr error
expectedWarningsLen int
}{
{
title: "partial response disabled; 1st errors out after some delay; 2nd store is fast",
storeAPIs: []Client{
&testClient{
StoreClient: &mockedStoreAPI{
RespSeries: []*storepb.SeriesResponse{
storepb.NewWarnSeriesResponse(errors.New("warning")),
storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{1, 1}, {2, 2}, {3, 3}}),
},
RespDuration: 2 * time.Second,
SlowSeriesIndex: 1,
injectedError: errors.New("test"),
injectedErrorIndex: 1,
},
labelSets: []storepb.LabelSet{{Labels: []storepb.Label{{Name: "ext", Value: "1"}}}},
minTime: 1,
maxTime: 300,
},
&testClient{
StoreClient: &mockedStoreAPI{
RespSeries: []*storepb.SeriesResponse{
storepb.NewWarnSeriesResponse(errors.New("warning")),
storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{4, 1}, {5, 2}, {6, 3}}),
storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{4, 1}, {5, 2}, {6, 3}}),
storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{4, 1}, {5, 2}, {6, 3}}),
storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{4, 1}, {5, 2}, {6, 3}}),
storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{4, 1}, {5, 2}, {6, 3}}),

storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{4, 1}, {5, 2}, {6, 3}}),
storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{4, 1}, {5, 2}, {6, 3}}),
storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{4, 1}, {5, 2}, {6, 3}}),
storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{4, 1}, {5, 2}, {6, 3}}),
storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{4, 1}, {5, 2}, {6, 3}}),

storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{4, 1}, {5, 2}, {6, 3}}),
storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{4, 1}, {5, 2}, {6, 3}}),
storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{4, 1}, {5, 2}, {6, 3}}),
storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{4, 1}, {5, 2}, {6, 3}}),
storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{4, 1}, {5, 2}, {6, 3}}),
},
},
labelSets: []storepb.LabelSet{{Labels: []storepb.Label{{Name: "ext", Value: "1"}}}},
minTime: 1,
maxTime: 300,
},
},
req: &storepb.SeriesRequest{
MinTime: 1,
MaxTime: 300,
Matchers: []storepb.LabelMatcher{{Name: "ext", Value: "1", Type: storepb.LabelMatcher_EQ}},
PartialResponseDisabled: true,
},
expectedErr: errors.New("test: receive series from test: test"),
},
{
title: "partial response disabled; 1st store is slow, 2nd store is fast;",
storeAPIs: []Client{
Expand Down Expand Up @@ -1349,6 +1403,10 @@ type mockedStoreAPI struct {
LastSeriesReq *storepb.SeriesRequest
LastLabelValuesReq *storepb.LabelValuesRequest
LastLabelNamesReq *storepb.LabelNamesRequest

// injectedError will be injected into Recv() if not nil.
injectedError error
injectedErrorIndex int
}

func (s *mockedStoreAPI) Info(ctx context.Context, req *storepb.InfoRequest, _ ...grpc.CallOption) (*storepb.InfoResponse, error) {
Expand All @@ -1358,7 +1416,7 @@ func (s *mockedStoreAPI) Info(ctx context.Context, req *storepb.InfoRequest, _ .
func (s *mockedStoreAPI) Series(ctx context.Context, req *storepb.SeriesRequest, _ ...grpc.CallOption) (storepb.Store_SeriesClient, error) {
s.LastSeriesReq = req

return &StoreSeriesClient{ctx: ctx, respSet: s.RespSeries, respDur: s.RespDuration, slowSeriesIndex: s.SlowSeriesIndex}, s.RespError
return &StoreSeriesClient{injectedErrorIndex: s.injectedErrorIndex, injectedError: s.injectedError, ctx: ctx, respSet: s.RespSeries, respDur: s.RespDuration, slowSeriesIndex: s.SlowSeriesIndex}, s.RespError
}

func (s *mockedStoreAPI) LabelNames(ctx context.Context, req *storepb.LabelNamesRequest, _ ...grpc.CallOption) (*storepb.LabelNamesResponse, error) {
Expand All @@ -1382,12 +1440,18 @@ type StoreSeriesClient struct {
respSet []*storepb.SeriesResponse
respDur time.Duration
slowSeriesIndex int

injectedError error
injectedErrorIndex int
}

func (c *StoreSeriesClient) Recv() (*storepb.SeriesResponse, error) {
if c.respDur != 0 && (c.slowSeriesIndex == c.i || c.slowSeriesIndex == 0) {
time.Sleep(c.respDur)
}
if c.injectedError != nil && (c.injectedErrorIndex == c.i || c.injectedErrorIndex == 0) {
return nil, c.injectedError
}

if c.i >= len(c.respSet) {
return nil, io.EOF
Expand Down