diff --git a/balancer/pickfirst/pickfirst.go b/balancer/pickfirst/pickfirst.go index ea8899818c22..b15c10e46b0a 100644 --- a/balancer/pickfirst/pickfirst.go +++ b/balancer/pickfirst/pickfirst.go @@ -169,7 +169,7 @@ func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState addrs = state.ResolverState.Addresses if cfg.ShuffleAddressList { addrs = append([]resolver.Address{}, addrs...) - rand.Shuffle(len(addrs), func(i, j int) { addrs[i], addrs[j] = addrs[j], addrs[i] }) + internal.RandShuffle(len(addrs), func(i, j int) { addrs[i], addrs[j] = addrs[j], addrs[i] }) } } diff --git a/balancer/pickfirst/pickfirst_ext_test.go b/balancer/pickfirst/pickfirst_ext_test.go index 81691fc806ab..207fc4316f98 100644 --- a/balancer/pickfirst/pickfirst_ext_test.go +++ b/balancer/pickfirst/pickfirst_ext_test.go @@ -20,6 +20,7 @@ package pickfirst_test import ( "context" + "encoding/json" "errors" "fmt" "strings" @@ -28,11 +29,14 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/backoff" + "google.golang.org/grpc/balancer" + pfbalancer "google.golang.org/grpc/balancer/pickfirst" pfinternal "google.golang.org/grpc/balancer/pickfirst/internal" "google.golang.org/grpc/codes" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/internal" + "google.golang.org/grpc/internal/balancer/stub" "google.golang.org/grpc/internal/channelz" "google.golang.org/grpc/internal/grpctest" "google.golang.org/grpc/internal/stubserver" @@ -463,6 +467,85 @@ func (s) TestPickFirst_ShuffleAddressList(t *testing.T) { } } +// Tests the PF LB policy with shuffling enabled. It explicitly unsets the +// Endpoints field in the resolver update to test the shuffling of the +// Addresses. +func (s) TestPickFirst_ShuffleAddressListNoEndpoints(t *testing.T) { + // Install a shuffler that always reverses two entries. + origShuf := pfinternal.RandShuffle + defer func() { pfinternal.RandShuffle = origShuf }() + pfinternal.RandShuffle = func(n int, f func(int, int)) { + if n != 2 { + t.Errorf("Shuffle called with n=%v; want 2", n) + return + } + f(0, 1) // reverse the two addresses + } + + pfBuilder := balancer.Get(pfbalancer.Name) + shuffleConfig, err := pfBuilder.(balancer.ConfigParser).ParseConfig(json.RawMessage(`{ "shuffleAddressList": true }`)) + if err != nil { + t.Fatal(err) + } + noShuffleConfig, err := pfBuilder.(balancer.ConfigParser).ParseConfig(json.RawMessage(`{ "shuffleAddressList": false }`)) + if err != nil { + t.Fatal(err) + } + var activeCfg serviceconfig.LoadBalancingConfig + + bf := stub.BalancerFuncs{ + Init: func(bd *stub.BalancerData) { + bd.ChildBalancer = pfBuilder.Build(bd.ClientConn, bd.BuildOptions) + }, + Close: func(bd *stub.BalancerData) { + bd.ChildBalancer.Close() + }, + UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error { + ccs.BalancerConfig = activeCfg + ccs.ResolverState.Endpoints = nil + return bd.ChildBalancer.UpdateClientConnState(ccs) + }, + } + + stub.Register(t.Name(), bf) + svcCfg := fmt.Sprintf(`{ "loadBalancingConfig": [{%q: {}}] }`, t.Name()) + // Set up our backends. + cc, r, backends := setupPickFirst(t, 2, grpc.WithDefaultServiceConfig(svcCfg)) + addrs := stubBackendsToResolverAddrs(backends) + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + // Push an update with both addresses and shuffling disabled. We should + // connect to backend 0. + activeCfg = noShuffleConfig + resolverState := resolver.State{Addresses: addrs} + r.UpdateState(resolverState) + if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil { + t.Fatal(err) + } + + // Send a config with shuffling enabled. This will reverse the addresses, + // but the channel should still be connected to backend 0. + activeCfg = shuffleConfig + r.UpdateState(resolverState) + if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil { + t.Fatal(err) + } + + // Send a resolver update with no addresses. This should push the channel + // into TransientFailure. + r.UpdateState(resolver.State{}) + testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure) + + // Send the same config as last time with shuffling enabled. Since we are + // not connected to backend 0, we should connect to backend 1. + r.UpdateState(resolverState) + if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[1]); err != nil { + t.Fatal(err) + } +} + // Test config parsing with the env var turned on and off for various scenarios. func (s) TestPickFirst_ParseConfig_Success(t *testing.T) { // Install a shuffler that always reverses two entries. diff --git a/balancer/pickfirst/pickfirstleaf/pickfirstleaf.go b/balancer/pickfirst/pickfirstleaf/pickfirstleaf.go index 67f315a0dbc4..f5f5afec30a4 100644 --- a/balancer/pickfirst/pickfirstleaf/pickfirstleaf.go +++ b/balancer/pickfirst/pickfirstleaf/pickfirstleaf.go @@ -283,7 +283,7 @@ func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState newAddrs = state.ResolverState.Addresses if cfg.ShuffleAddressList { newAddrs = append([]resolver.Address{}, newAddrs...) - internal.RandShuffle(len(endpoints), func(i, j int) { endpoints[i], endpoints[j] = endpoints[j], endpoints[i] }) + internal.RandShuffle(len(newAddrs), func(i, j int) { newAddrs[i], newAddrs[j] = newAddrs[j], newAddrs[i] }) } }