Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion balancer/pickfirst/pickfirst.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState
addrs = state.ResolverState.Addresses
if cfg.ShuffleAddressList {
addrs = append([]resolver.Address{}, addrs...)
rand.Shuffle(len(addrs), func(i, j int) { addrs[i], addrs[j] = addrs[j], addrs[i] })
internal.RandShuffle(len(addrs), func(i, j int) { addrs[i], addrs[j] = addrs[j], addrs[i] })
}
}

Expand Down
83 changes: 83 additions & 0 deletions balancer/pickfirst/pickfirst_ext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package pickfirst_test

import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"
Expand All @@ -28,11 +29,14 @@ import (

"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/balancer"
pfbalancer "google.golang.org/grpc/balancer/pickfirst"
pfinternal "google.golang.org/grpc/balancer/pickfirst/internal"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/balancer/stub"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/internal/stubserver"
Expand Down Expand Up @@ -463,6 +467,85 @@ func (s) TestPickFirst_ShuffleAddressList(t *testing.T) {
}
}

// Tests the PF LB policy with shuffling enabled. It explicitly unsets the
// Endpoints field in the resolver update to test the shuffling of the
// Addresses.
func (s) TestPickFirst_ShuffleAddressListNoEndpoints(t *testing.T) {
// Install a shuffler that always reverses two entries.
origShuf := pfinternal.RandShuffle
defer func() { pfinternal.RandShuffle = origShuf }()
pfinternal.RandShuffle = func(n int, f func(int, int)) {
if n != 2 {
t.Errorf("Shuffle called with n=%v; want 2", n)
return
}
f(0, 1) // reverse the two addresses
}

pfBuilder := balancer.Get(pfbalancer.Name)
shuffleConfig, err := pfBuilder.(balancer.ConfigParser).ParseConfig(json.RawMessage(`{ "shuffleAddressList": true }`))
if err != nil {
t.Fatal(err)
}
noShuffleConfig, err := pfBuilder.(balancer.ConfigParser).ParseConfig(json.RawMessage(`{ "shuffleAddressList": false }`))
if err != nil {
t.Fatal(err)
}
var activeCfg serviceconfig.LoadBalancingConfig

bf := stub.BalancerFuncs{
Init: func(bd *stub.BalancerData) {
bd.ChildBalancer = pfBuilder.Build(bd.ClientConn, bd.BuildOptions)
},
Close: func(bd *stub.BalancerData) {
bd.ChildBalancer.Close()
},
UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
ccs.BalancerConfig = activeCfg
ccs.ResolverState.Endpoints = nil
return bd.ChildBalancer.UpdateClientConnState(ccs)
},
}

stub.Register(t.Name(), bf)
svcCfg := fmt.Sprintf(`{ "loadBalancingConfig": [{%q: {}}] }`, t.Name())
// Set up our backends.
cc, r, backends := setupPickFirst(t, 2, grpc.WithDefaultServiceConfig(svcCfg))
addrs := stubBackendsToResolverAddrs(backends)

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()

// Push an update with both addresses and shuffling disabled. We should
// connect to backend 0.
activeCfg = noShuffleConfig
resolverState := resolver.State{Addresses: addrs}
r.UpdateState(resolverState)
if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
t.Fatal(err)
}

// Send a config with shuffling enabled. This will reverse the addresses,
// but the channel should still be connected to backend 0.
activeCfg = shuffleConfig
r.UpdateState(resolverState)
if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
t.Fatal(err)
}

// Send a resolver update with no addresses. This should push the channel
// into TransientFailure.
r.UpdateState(resolver.State{})
testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure)

// Send the same config as last time with shuffling enabled. Since we are
// not connected to backend 0, we should connect to backend 1.
r.UpdateState(resolverState)
if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[1]); err != nil {
t.Fatal(err)
}
}

// Test config parsing with the env var turned on and off for various scenarios.
func (s) TestPickFirst_ParseConfig_Success(t *testing.T) {
// Install a shuffler that always reverses two entries.
Expand Down
2 changes: 1 addition & 1 deletion balancer/pickfirst/pickfirstleaf/pickfirstleaf.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState
newAddrs = state.ResolverState.Addresses
if cfg.ShuffleAddressList {
newAddrs = append([]resolver.Address{}, newAddrs...)
internal.RandShuffle(len(endpoints), func(i, j int) { endpoints[i], endpoints[j] = endpoints[j], endpoints[i] })
internal.RandShuffle(len(newAddrs), func(i, j int) { newAddrs[i], newAddrs[j] = newAddrs[j], newAddrs[i] })
}
}

Expand Down