Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Fix shuffling of addresses
  • Loading branch information
arjan-bal committed Sep 25, 2025
commit 27b6b6819a042cd109b29b71bafdb1e464c9b37d
83 changes: 83 additions & 0 deletions balancer/pickfirst/pickfirst_ext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package pickfirst_test

import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"
Expand All @@ -28,11 +29,14 @@ import (

"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/balancer"
pfbalancer "google.golang.org/grpc/balancer/pickfirst"
pfinternal "google.golang.org/grpc/balancer/pickfirst/internal"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/balancer/stub"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/internal/stubserver"
Expand Down Expand Up @@ -463,6 +467,85 @@ func (s) TestPickFirst_ShuffleAddressList(t *testing.T) {
}
}

// Tests the PF LB policy with shuffling enabled. It explicitly unsets the
// Endpoints field in the resolver update to test the shuffling of the
// Addresses.
func (s) TestPickFirst_ShuffleAddressListNoEndpoints(t *testing.T) {
// Install a shuffler that always reverses two entries.
origShuf := pfinternal.RandShuffle
defer func() { pfinternal.RandShuffle = origShuf }()
pfinternal.RandShuffle = func(n int, f func(int, int)) {
if n != 2 {
t.Errorf("Shuffle called with n=%v; want 2", n)
return
}
f(0, 1) // reverse the two addresses
}

pfBuilder := balancer.Get(pfbalancer.Name)
shffleConfig, err := pfBuilder.(balancer.ConfigParser).ParseConfig(json.RawMessage(`{ "shuffleAddressList": true }`))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lol, here and in the next statement, prefer adding the u back into shuffle. Saving one character in the variable name is not going to be very useful :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to using complete words.

if err != nil {
t.Fatal(err)
}
noShffleConfig, err := pfBuilder.(balancer.ConfigParser).ParseConfig(json.RawMessage(`{ "shuffleAddressList": false }`))
if err != nil {
t.Fatal(err)
}
activeCfg := noShffleConfig
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: remove this line since this assignment is anyways happening on line 521.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to only declaring the variable, but not initializing here.


bf := stub.BalancerFuncs{
Init: func(bd *stub.BalancerData) {
bd.ChildBalancer = pfBuilder.Build(bd.ClientConn, bd.BuildOptions)
},
Close: func(bd *stub.BalancerData) {
bd.ChildBalancer.Close()
},
UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
ccs.BalancerConfig = activeCfg
ccs.ResolverState.Endpoints = nil
return bd.ChildBalancer.UpdateClientConnState(ccs)
},
}

stub.Register(t.Name(), bf)
svcCfg := fmt.Sprintf(`{ "loadBalancingConfig": [{%q: {}}] }`, t.Name())
// Set up our backends.
cc, r, backends := setupPickFirst(t, 2, grpc.WithDefaultServiceConfig(svcCfg))
addrs := stubBackendsToResolverAddrs(backends)

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()

// Push an update with both addresses and shuffling disabled. We should
// connect to backend 0.
activeCfg = noShffleConfig
resolverState := resolver.State{Addresses: addrs}
r.UpdateState(resolverState)
if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
t.Fatal(err)
}

// Send a config with shuffling enabled. This will reverse the addresses,
// but the channel should still be connected to backend 0.
activeCfg = shffleConfig
r.UpdateState(resolverState)
if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
t.Fatal(err)
}

// Send a resolver update with no addresses. This should push the channel
// into TransientFailure.
r.UpdateState(resolver.State{})
testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure)

// Send the same config as last time with shuffling enabled. Since we are
// not connected to backend 0, we should connect to backend 1.
r.UpdateState(resolverState)
if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[1]); err != nil {
t.Fatal(err)
}
}

// Test config parsing with the env var turned on and off for various scenarios.
func (s) TestPickFirst_ParseConfig_Success(t *testing.T) {
// Install a shuffler that always reverses two entries.
Expand Down
2 changes: 1 addition & 1 deletion balancer/pickfirst/pickfirstleaf/pickfirstleaf.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState
newAddrs = state.ResolverState.Addresses
if cfg.ShuffleAddressList {
newAddrs = append([]resolver.Address{}, newAddrs...)
internal.RandShuffle(len(endpoints), func(i, j int) { endpoints[i], endpoints[j] = endpoints[j], endpoints[i] })
internal.RandShuffle(len(newAddrs), func(i, j int) { newAddrs[i], newAddrs[j] = newAddrs[j], newAddrs[i] })
}
}

Expand Down
Loading