Refactored code
Signed-off-by: Waleed Malik <[email protected]>
ahmedwaleedmalik committed Aug 16, 2023
commit 39e5ceec87bc0985767a35390e044b4dbd7935f0
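
This commit threads the caller's `context.Context` through the GCE and OpenStack providers and the e2e helpers instead of minting fresh `context.Background()`/`context.TODO()` contexts deep in the call stack, so cancellation and deadlines propagate end to end. A minimal sketch of the pattern — the names are illustrative stand-ins, not the actual machine-controller types:

```go
// Sketch of the refactor: a previously ignored context parameter is now
// accepted and forwarded through the whole call chain.
package main

import (
	"context"
	"fmt"
	"time"
)

// Before: the exported method ignored its context, and helpers created
// their own with context.Background().
//
//	func (p *Provider) Get(_ context.Context, name string) (string, error) {
//		return p.get(name)
//	}

type Provider struct{}

// After: the caller's context is passed down explicitly.
func (p *Provider) Get(ctx context.Context, name string) (string, error) {
	return p.get(ctx, name)
}

func (p *Provider) get(ctx context.Context, name string) (string, error) {
	// Downstream work now observes the caller's cancellation and deadline.
	select {
	case <-time.After(10 * time.Millisecond):
		return "instance-" + name, nil
	case <-ctx.Done():
		return "", ctx.Err()
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	inst, err := (&Provider{}).Get(ctx, "worker-0")
	fmt.Println(inst, err)
}
```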
26 changes: 13 additions & 13 deletions pkg/cloudprovider/provider/gce/provider.go
@@ -144,18 +144,18 @@ func (p *Provider) Validate(_ context.Context, _ *zap.SugaredLogger, spec cluste
}

// Get retrieves a node instance that is associated with the given machine.
-func (p *Provider) Get(_ context.Context, _ *zap.SugaredLogger, machine *clusterv1alpha1.Machine, _ *cloudprovidertypes.ProviderData) (instance.Instance, error) {
-	return p.get(machine)
+func (p *Provider) Get(ctx context.Context, _ *zap.SugaredLogger, machine *clusterv1alpha1.Machine, _ *cloudprovidertypes.ProviderData) (instance.Instance, error) {
+	return p.get(ctx, machine)
}

-func (p *Provider) get(machine *clusterv1alpha1.Machine) (*googleInstance, error) {
+func (p *Provider) get(ctx context.Context, machine *clusterv1alpha1.Machine) (*googleInstance, error) {
// Read configuration.
cfg, err := newConfig(p.resolver, machine.Spec.ProviderSpec)
if err != nil {
return nil, newError(common.InvalidConfigurationMachineError, errMachineSpec, err)
}
// Connect to Google compute.
-	svc, err := connectComputeService(cfg)
+	svc, err := connectComputeService(ctx, cfg)
if err != nil {
return nil, newError(common.InvalidConfigurationMachineError, errConnect, err)
}
@@ -218,7 +218,7 @@ func (p *Provider) Create(ctx context.Context, log *zap.SugaredLogger, machine *
return nil, newError(common.InvalidConfigurationMachineError, errMachineSpec, err)
}
// Connect to Google compute.
-	svc, err := connectComputeService(cfg)
+	svc, err := connectComputeService(ctx, cfg)
if err != nil {
return nil, newError(common.InvalidConfigurationMachineError, errConnect, err)
}
@@ -295,7 +295,7 @@ func (p *Provider) Create(ctx context.Context, log *zap.SugaredLogger, machine *
if err != nil {
return nil, newError(common.InvalidConfigurationMachineError, errInsertInstance, err)
}
-	err = svc.waitZoneOperation(cfg, op.Name)
+	err = svc.waitZoneOperation(ctx, cfg, op.Name)
if err != nil {
return nil, newError(common.InvalidConfigurationMachineError, errInsertInstance, err)
}
@@ -304,14 +304,14 @@ func (p *Provider) Create(ctx context.Context, log *zap.SugaredLogger, machine *
}

// Cleanup deletes the instance associated with the machine and all associated resources.
-func (p *Provider) Cleanup(_ context.Context, _ *zap.SugaredLogger, machine *clusterv1alpha1.Machine, _ *cloudprovidertypes.ProviderData) (bool, error) {
+func (p *Provider) Cleanup(ctx context.Context, _ *zap.SugaredLogger, machine *clusterv1alpha1.Machine, _ *cloudprovidertypes.ProviderData) (bool, error) {
// Read configuration.
cfg, err := newConfig(p.resolver, machine.Spec.ProviderSpec)
if err != nil {
return false, newError(common.InvalidConfigurationMachineError, errMachineSpec, err)
}
// Connect to Google compute.
-	svc, err := connectComputeService(cfg)
+	svc, err := connectComputeService(ctx, cfg)
if err != nil {
return false, newError(common.InvalidConfigurationMachineError, errConnect, err)
}
@@ -326,7 +326,7 @@ func (p *Provider) Cleanup(_ context.Context, _ *zap.SugaredLogger, machine *clu
}
return false, newError(common.InvalidConfigurationMachineError, errDeleteInstance, err)
}
-	err = svc.waitZoneOperation(cfg, op.Name)
+	err = svc.waitZoneOperation(ctx, cfg, op.Name)
if err != nil {
return false, newError(common.InvalidConfigurationMachineError, errDeleteInstance, err)
}
@@ -355,19 +355,19 @@ func (p *Provider) MachineMetricsLabels(machine *clusterv1alpha1.Machine) (map[s

// MigrateUID updates the UID of an instance after the controller migrates types
// and the UID of the machine object changed.
-func (p *Provider) MigrateUID(_ context.Context, _ *zap.SugaredLogger, machine *clusterv1alpha1.Machine, newUID types.UID) error {
+func (p *Provider) MigrateUID(ctx context.Context, _ *zap.SugaredLogger, machine *clusterv1alpha1.Machine, newUID types.UID) error {
// Read configuration.
cfg, err := newConfig(p.resolver, machine.Spec.ProviderSpec)
if err != nil {
return newError(common.InvalidConfigurationMachineError, errMachineSpec, err)
}
// Connect to Google compute.
-	svc, err := connectComputeService(cfg)
+	svc, err := connectComputeService(ctx, cfg)
if err != nil {
return newError(common.InvalidConfigurationMachineError, errConnect, err)
}
// Retrieve instance.
-	inst, err := p.get(machine)
+	inst, err := p.get(ctx, machine)
if err != nil {
if errors.Is(err, cloudprovidererrors.ErrInstanceNotFound) {
return nil
@@ -389,7 +389,7 @@ func (p *Provider) MigrateUID(_ context.Context, _ *zap.SugaredLogger, machine *
if err != nil {
return newError(common.InvalidConfigurationMachineError, errSetLabels, err)
}
-	err = svc.waitZoneOperation(cfg, op.Name)
+	err = svc.waitZoneOperation(ctx, cfg, op.Name)
if err != nil {
return newError(common.InvalidConfigurationMachineError, errSetLabels, err)
}
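
The provider methods above now forward `ctx` into `connectComputeService`; the next file shows that function's signature change. A hedged sketch of what a context-aware connect helper looks like — the static token source is a placeholder assumption, while the real code builds its `TokenSource` from the provider configuration:

```go
// Sketch: both the oauth2 HTTP client and the Compute service are built
// from the caller's ctx instead of context.Background().
package main

import (
	"context"
	"fmt"

	"golang.org/x/oauth2"
	compute "google.golang.org/api/compute/v1"
	"google.golang.org/api/option"
)

func connectCompute(ctx context.Context, ts oauth2.TokenSource) (*compute.Service, error) {
	// The HTTP client inherits ctx, so in-flight API calls are cancelled
	// together with the caller.
	client := oauth2.NewClient(ctx, ts)
	svc, err := compute.NewService(ctx, option.WithHTTPClient(client))
	if err != nil {
		return nil, fmt.Errorf("cannot connect to Google Cloud: %w", err)
	}
	return svc, nil
}

func main() {
	ctx := context.Background()
	// Placeholder credentials for the sketch only.
	ts := oauth2.StaticTokenSource(&oauth2.Token{AccessToken: "placeholder"})
	svc, err := connectCompute(ctx, ts)
	fmt.Println(svc != nil, err)
}
```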
14 changes: 7 additions & 7 deletions pkg/cloudprovider/provider/gce/service.go
@@ -55,11 +55,11 @@ type service struct {
}

// connectComputeService establishes a service connection to the Compute Engine.
-func connectComputeService(cfg *config) (*service, error) {
+func connectComputeService(ctx context.Context, cfg *config) (*service, error) {
if cfg.clientConfig != nil &&
cfg.clientConfig.TokenSource != nil {
-		client := oauth2.NewClient(context.Background(), cfg.clientConfig.TokenSource)
-		svc, err := compute.NewService(context.Background(), option.WithHTTPClient(client))
+		client := oauth2.NewClient(ctx, cfg.clientConfig.TokenSource)
+		svc, err := compute.NewService(ctx, option.WithHTTPClient(client))
if err != nil {
return nil, fmt.Errorf("cannot connect to Google Cloud: %w", err)
}
@@ -139,18 +139,18 @@ func (svc *service) attachedDisks(cfg *config) ([]*compute.AttachedDisk, error)
}

// waitZoneOperation waits for a GCE operation in a zone to be completed or timed out.
-func (svc *service) waitZoneOperation(cfg *config, opName string) error {
-	return svc.waitOperation(func() (*compute.Operation, error) {
+func (svc *service) waitZoneOperation(ctx context.Context, cfg *config, opName string) error {
+	return svc.waitOperation(ctx, func() (*compute.Operation, error) {
return svc.ZoneOperations.Get(cfg.projectID, cfg.zone, opName).Do()
})
}

// waitOperation waits for a GCE operation to be completed or timed out.
-func (svc *service) waitOperation(refreshOperation func() (*compute.Operation, error)) error {
+func (svc *service) waitOperation(ctx context.Context, refreshOperation func() (*compute.Operation, error)) error {
var op *compute.Operation
var err error

-	return wait.PollUntilContextTimeout(context.TODO(), pollInterval, pollTimeout, false, func(ctx context.Context) (bool, error) {
+	return wait.PollUntilContextTimeout(ctx, pollInterval, pollTimeout, false, func(ctx context.Context) (bool, error) {
op, err = refreshOperation()
if err != nil {
return false, err
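
`waitOperation` now drives its poll loop from the caller's context rather than `context.TODO()`, so cancelling the reconcile also cancels the GCE operation wait. A small sketch of the idea, with assumed `pollInterval`/`pollTimeout` values and a stubbed status function:

```go
// Sketch of a context-aware operation wait using the same apimachinery
// helper the real code uses.
package main

import (
	"context"
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

const (
	pollInterval = 100 * time.Millisecond // assumed value for the sketch
	pollTimeout  = 5 * time.Second        // assumed value for the sketch
)

// waitOperation polls refreshStatus until it reports "DONE", the caller's
// context is cancelled, or pollTimeout elapses.
func waitOperation(ctx context.Context, refreshStatus func() (string, error)) error {
	return wait.PollUntilContextTimeout(ctx, pollInterval, pollTimeout, false, func(ctx context.Context) (bool, error) {
		status, err := refreshStatus()
		if err != nil {
			return false, err
		}
		return status == "DONE", nil
	})
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	calls := 0
	err := waitOperation(ctx, func() (string, error) {
		calls++
		if calls < 3 {
			return "RUNNING", nil
		}
		return "DONE", nil
	})
	fmt.Println(err) // <nil> after three polls
}
```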
10 changes: 5 additions & 5 deletions pkg/cloudprovider/provider/openstack/provider.go
@@ -64,7 +64,7 @@ const (
type clientGetterFunc func(c *Config) (*gophercloud.ProviderClient, error)

// portReadinessWaiterFunc waits for the port with the given ID to be available.
-type portReadinessWaiterFunc func(instanceLog *zap.SugaredLogger, netClient *gophercloud.ServiceClient, serverID string, networkID string, instanceReadyCheckPeriod time.Duration, instanceReadyCheckTimeout time.Duration) error
+type portReadinessWaiterFunc func(ctx context.Context, instanceLog *zap.SugaredLogger, netClient *gophercloud.ServiceClient, serverID string, networkID string, instanceReadyCheckPeriod time.Duration, instanceReadyCheckTimeout time.Duration) error

type provider struct {
configVarResolver *providerconfig.ConfigVarResolver
@@ -556,7 +556,7 @@ func (p *provider) Validate(_ context.Context, _ *zap.SugaredLogger, spec cluste
return nil
}

-func (p *provider) Create(_ context.Context, log *zap.SugaredLogger, machine *clusterv1alpha1.Machine, data *cloudprovidertypes.ProviderData, userdata string) (instance.Instance, error) {
+func (p *provider) Create(ctx context.Context, log *zap.SugaredLogger, machine *clusterv1alpha1.Machine, data *cloudprovidertypes.ProviderData, userdata string) (instance.Instance, error) {
cfg, _, _, err := p.getConfig(machine.Spec.ProviderSpec)
if err != nil {
return nil, cloudprovidererrors.TerminalError{
@@ -672,7 +672,7 @@ func (p *provider) Create(_ context.Context, log *zap.SugaredLogger, machine *cl
if cfg.FloatingIPPool != "" {
instanceLog := log.With("instance", server.ID)

-		if err := p.portReadinessWaiter(instanceLog, netClient, server.ID, network.ID, cfg.InstanceReadyCheckPeriod, cfg.InstanceReadyCheckTimeout); err != nil {
+		if err := p.portReadinessWaiter(ctx, instanceLog, netClient, server.ID, network.ID, cfg.InstanceReadyCheckPeriod, cfg.InstanceReadyCheckTimeout); err != nil {
instanceLog.Infow("Port for instance did not became active", zap.Error(err))
}

@@ -686,7 +686,7 @@ func (p *provider) Create(_ context.Context, log *zap.SugaredLogger, machine *cl
return &osInstance{server: &server}, nil
}

-func waitForPort(instanceLog *zap.SugaredLogger, netClient *gophercloud.ServiceClient, serverID string, networkID string, checkPeriod time.Duration, checkTimeout time.Duration) error {
+func waitForPort(ctx context.Context, instanceLog *zap.SugaredLogger, netClient *gophercloud.ServiceClient, serverID string, networkID string, checkPeriod time.Duration, checkTimeout time.Duration) error {
started := time.Now()
instanceLog.Info("Waiting for the port to become active...")

@@ -705,7 +705,7 @@ func waitForPort(instanceLog *zap.SugaredLogger, netClient *gophercloud.ServiceC
return port.Status == "ACTIVE", nil
}

-	if err := wait.PollUntilContextTimeout(context.Background(), checkPeriod, checkTimeout, false, portIsReady); err != nil {
+	if err := wait.PollUntilContextTimeout(ctx, checkPeriod, checkTimeout, false, portIsReady); err != nil {
if wait.Interrupted(err) {
// In case we have a timeout, include the timeout details
return fmt.Errorf("instance port became not active after %f seconds", checkTimeout.Seconds())
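
`waitForPort` follows the same pattern and keeps the `wait.Interrupted` check so a timeout is reported distinctly from other poll failures. A sketch in which the port lookup is stubbed — the real code queries the OpenStack networking API through gophercloud:

```go
// Sketch: context-aware port readiness wait with explicit timeout reporting.
package main

import (
	"context"
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func waitForPort(ctx context.Context, portStatus func() (string, error), period, timeout time.Duration) error {
	portIsReady := func(ctx context.Context) (bool, error) {
		status, err := portStatus()
		if err != nil {
			return false, nil // treat lookup errors as transient and retry
		}
		return status == "ACTIVE", nil
	}
	if err := wait.PollUntilContextTimeout(ctx, period, timeout, false, portIsReady); err != nil {
		if wait.Interrupted(err) {
			// In case of a timeout, include the timeout details.
			return fmt.Errorf("instance port became not active after %f seconds", timeout.Seconds())
		}
		return err
	}
	return nil
}

func main() {
	err := waitForPort(context.Background(),
		func() (string, error) { return "DOWN", nil }, // stub: never ACTIVE
		50*time.Millisecond, 200*time.Millisecond)
	fmt.Println(err) // timeout error after ~0.2 seconds
}
```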
2 changes: 1 addition & 1 deletion pkg/cloudprovider/provider/openstack/provider_test.go
@@ -282,7 +282,7 @@ func TestCreateServer(t *testing.T) {
return pc.ProviderClient, nil
},
// mock server readiness checker
-		portReadinessWaiter: func(*zap.SugaredLogger, *gophercloud.ServiceClient, string, string, time.Duration, time.Duration) error {
+		portReadinessWaiter: func(context.Context, *zap.SugaredLogger, *gophercloud.ServiceClient, string, string, time.Duration, time.Duration) error {
return nil
},
}
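
The test change is mechanical but worth noting: `portReadinessWaiter` is a function-typed field, so adding `context.Context` to `portReadinessWaiterFunc` forces every stub to be updated to the new signature. A simplified sketch with stand-in types:

```go
// Sketch of why the mock must change with the function type.
package main

import (
	"context"
	"fmt"
	"time"
)

// Simplified stand-in for the provider's portReadinessWaiterFunc.
type portReadinessWaiterFunc func(ctx context.Context, serverID, networkID string, period, timeout time.Duration) error

type provider struct {
	portReadinessWaiter portReadinessWaiterFunc
}

func main() {
	p := &provider{
		// mock readiness checker: the stub's signature must now include
		// context.Context, mirroring the test diff above
		portReadinessWaiter: func(context.Context, string, string, time.Duration, time.Duration) error {
			return nil
		},
	}
	fmt.Println(p.portReadinessWaiter(context.Background(), "srv", "net", time.Second, time.Minute))
}
```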
2 changes: 1 addition & 1 deletion test/e2e/provisioning/all_e2e_test.go
@@ -233,7 +233,7 @@ C8QmzsMaZhk+mVFr1sGy

// wait for deployments to roll out
for _, deployment := range deployments {
-		if err := wait.PollUntilContextTimeout(context.Background(), 3*time.Second, 30*time.Second, false, func(ctx context.Context) (bool, error) {
+		if err := wait.PollUntilContextTimeout(ctx, 3*time.Second, 30*time.Second, false, func(ctx context.Context) (bool, error) {
d := &appsv1.Deployment{}
key := types.NamespacedName{Namespace: ns, Name: deployment}

24 changes: 13 additions & 11 deletions test/e2e/provisioning/deploymentscenario.go
@@ -34,10 +34,12 @@ func verifyCreateUpdateAndDelete(kubeConfig, manifestPath string, parameters []s
if err != nil {
return err
}
+	ctx := context.Background()
+
// This test inherently relies on replicas being one so we enforce that
machineDeployment.Spec.Replicas = getInt32Ptr(1)

-	machineDeployment, err = createAndAssure(machineDeployment, client, timeout)
+	machineDeployment, err = createAndAssure(ctx, machineDeployment, client, timeout)
if err != nil {
return fmt.Errorf("failed to verify creation of node for MachineDeployment: %w", err)
}
@@ -50,7 +52,7 @@

klog.Infof("Waiting for second MachineSet to appear after updating MachineDeployment %s", machineDeployment.Name)
var machineSets []clusterv1alpha1.MachineSet
-	if err := wait.PollUntilContextTimeout(context.Background(), 5*time.Second, timeout, false, func(ctx context.Context) (bool, error) {
+	if err := wait.PollUntilContextTimeout(ctx, 5*time.Second, timeout, false, func(ctx context.Context) (bool, error) {
machineSets, err = getMatchingMachineSets(machineDeployment, client)
if err != nil {
return false, err
@@ -79,7 +81,7 @@ func verifyCreateUpdateAndDelete(kubeConfig, manifestPath string, parameters []s
oldMachineSet = machineSets[1]
}
var machines []clusterv1alpha1.Machine
-	if err := wait.PollUntilContextTimeout(context.Background(), 5*time.Second, timeout, false, func(ctx context.Context) (bool, error) {
+	if err := wait.PollUntilContextTimeout(ctx, 5*time.Second, timeout, false, func(ctx context.Context) (bool, error) {
machines, err = getMatchingMachinesForMachineset(&newestMachineSet, client)
if err != nil {
return false, err
@@ -94,18 +96,18 @@ func verifyCreateUpdateAndDelete(kubeConfig, manifestPath string, parameters []s
klog.Infof("New MachineSet %s appeared with %v machines", newestMachineSet.Name, len(machines))

klog.Infof("Waiting for new MachineSet %s to get a ready node", newestMachineSet.Name)
-	if err := wait.PollUntilContextTimeout(context.Background(), 5*time.Second, timeout, false, func(ctx context.Context) (bool, error) {
-		return hasMachineReadyNode(&machines[0], client)
+	if err := wait.PollUntilContextTimeout(ctx, 5*time.Second, timeout, false, func(ctx context.Context) (bool, error) {
+		return hasMachineReadyNode(ctx, &machines[0], client)
}); err != nil {
return err
}
klog.Infof("Found ready node for MachineSet %s", newestMachineSet.Name)

klog.Infof("Waiting for old MachineSet %s to be scaled down and have no associated machines",
oldMachineSet.Name)
-	if err := wait.PollUntilContextTimeout(context.Background(), 5*time.Second, timeout, false, func(ctx context.Context) (bool, error) {
+	if err := wait.PollUntilContextTimeout(ctx, 5*time.Second, timeout, false, func(ctx context.Context) (bool, error) {
machineSet := &clusterv1alpha1.MachineSet{}
-		if err := client.Get(context.Background(), types.NamespacedName{Namespace: oldMachineSet.Namespace, Name: oldMachineSet.Name}, machineSet); err != nil {
+		if err := client.Get(ctx, types.NamespacedName{Namespace: oldMachineSet.Namespace, Name: oldMachineSet.Name}, machineSet); err != nil {
return false, err
}
if *machineSet.Spec.Replicas != int32(0) {
@@ -130,7 +132,7 @@ func verifyCreateUpdateAndDelete(kubeConfig, manifestPath string, parameters []s
klog.Infof("Successfully set replicas of MachineDeployment %s to 0", machineDeployment.Name)

klog.Infof("Waiting for MachineDeployment %s to not have any associated machines", machineDeployment.Name)
-	if err := wait.PollUntilContextTimeout(context.Background(), 5*time.Second, timeout, false, func(ctx context.Context) (bool, error) {
+	if err := wait.PollUntilContextTimeout(ctx, 5*time.Second, timeout, false, func(ctx context.Context) (bool, error) {
machines, err := getMatchingMachines(machineDeployment, client)
return len(machines) == 0, err
}); err != nil {
@@ -139,11 +141,11 @@ func verifyCreateUpdateAndDelete(kubeConfig, manifestPath string, parameters []s
klog.Infof("Successfully waited for MachineDeployment %s to not have any associated machines", machineDeployment.Name)

klog.Infof("Deleting MachineDeployment %s and waiting for it to disappear", machineDeployment.Name)
-	if err := client.Delete(context.Background(), machineDeployment); err != nil {
+	if err := client.Delete(ctx, machineDeployment); err != nil {
return fmt.Errorf("failed to delete MachineDeployment %s: %w", machineDeployment.Name, err)
}
-	if err := wait.PollUntilContextTimeout(context.Background(), 5*time.Second, timeout, false, func(ctx context.Context) (bool, error) {
-		err = client.Get(context.Background(), types.NamespacedName{Namespace: machineDeployment.Namespace, Name: machineDeployment.Name}, &clusterv1alpha1.MachineDeployment{})
+	if err := wait.PollUntilContextTimeout(ctx, 5*time.Second, timeout, false, func(ctx context.Context) (bool, error) {
+		err = client.Get(ctx, types.NamespacedName{Namespace: machineDeployment.Namespace, Name: machineDeployment.Name}, &clusterv1alpha1.MachineDeployment{})
if kerrors.IsNotFound(err) {
return true, nil
}
18 changes: 10 additions & 8 deletions test/e2e/provisioning/verify.go
@@ -61,7 +61,9 @@ func verifyCreateAndDelete(kubeConfig, manifestPath string, parameters []string,
return err
}

-	machineDeployment, err = createAndAssure(machineDeployment, client, timeout)
+	ctx := context.Background()
+
+	machineDeployment, err = createAndAssure(ctx, machineDeployment, client, timeout)
if err != nil {
return fmt.Errorf("failed to verify creation of node for MachineDeployment: %w", err)
}
@@ -138,7 +140,7 @@ func prepare(kubeConfig, manifestPath string, parameters []string) (ctrlruntimec
return client, manifest, nil
}

-func createAndAssure(machineDeployment *clusterv1alpha1.MachineDeployment, client ctrlruntimeclient.Client, timeout time.Duration) (*clusterv1alpha1.MachineDeployment, error) {
+func createAndAssure(ctx context.Context, machineDeployment *clusterv1alpha1.MachineDeployment, client ctrlruntimeclient.Client, timeout time.Duration) (*clusterv1alpha1.MachineDeployment, error) {
// we expect that no node for machine exists in the cluster
err := assureNodeForMachineDeployment(machineDeployment, client, false)
if err != nil {
@@ -151,7 +153,7 @@ func createAndAssure(machineDeployment *clusterv1alpha1.MachineDeployment, clien
// needs longer to validate a MachineDeployment than the kube-apiserver is willing to wait.
// In real world scenarios this is not that critical, but for tests we need to pay closer
// attention and retry the creation a few times.
-	err = wait.PollUntilContextTimeout(context.Background(), 3*time.Second, 180*time.Second, false, func(ctx context.Context) (bool, error) {
+	err = wait.PollUntilContextTimeout(ctx, 3*time.Second, 180*time.Second, false, func(ctx context.Context) (bool, error) {
err := client.Create(ctx, machineDeployment)
if err != nil {
klog.Warningf("Creation of %q failed, retrying: %v", machineDeployment.Name, err)
@@ -167,7 +169,7 @@ func createAndAssure(machineDeployment *clusterv1alpha1.MachineDeployment, clien
klog.Infof("MachineDeployment %q created", machineDeployment.Name)

var pollErr error
-	err = wait.PollUntilContextTimeout(context.Background(), machineReadyCheckPeriod, timeout, false, func(ctx context.Context) (bool, error) {
+	err = wait.PollUntilContextTimeout(ctx, machineReadyCheckPeriod, timeout, false, func(ctx context.Context) (bool, error) {
pollErr = assureNodeForMachineDeployment(machineDeployment, client, true)
if pollErr == nil {
return true, nil
@@ -180,13 +182,13 @@ func createAndAssure(machineDeployment *clusterv1alpha1.MachineDeployment, clien
klog.Infof("Found a node for MachineDeployment %s", machineDeployment.Name)

klog.Infof("Waiting for node of MachineDeployment %s to become ready", machineDeployment.Name)
-	err = wait.PollUntilContextTimeout(context.Background(), machineReadyCheckPeriod, timeout, false, func(ctx context.Context) (bool, error) {
+	err = wait.PollUntilContextTimeout(ctx, machineReadyCheckPeriod, timeout, false, func(ctx context.Context) (bool, error) {
machines, pollErr := getMatchingMachines(machineDeployment, client)
if pollErr != nil || len(machines) < 1 {
return false, nil
}
for _, machine := range machines {
-			hasReadyNode, pollErr := hasMachineReadyNode(&machine, client)
+			hasReadyNode, pollErr := hasMachineReadyNode(ctx, &machine, client)
if err != nil {
return false, pollErr
}
@@ -202,9 +204,9 @@ func createAndAssure(machineDeployment *clusterv1alpha1.MachineDeployment, clien
return machineDeployment, nil
}

-func hasMachineReadyNode(machine *clusterv1alpha1.Machine, client ctrlruntimeclient.Client) (bool, error) {
+func hasMachineReadyNode(ctx context.Context, machine *clusterv1alpha1.Machine, client ctrlruntimeclient.Client) (bool, error) {
nodes := &corev1.NodeList{}
-	if err := client.List(context.Background(), nodes); err != nil {
+	if err := client.List(ctx, nodes); err != nil {
return false, fmt.Errorf("failed to list nodes: %w", err)
}
for _, node := range nodes.Items {
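
Across the e2e helpers, the commit converges on one context created at the top of each scenario and passed to every controller-runtime client call and poll, rather than scattering `context.Background()`. A sketch of that shape using a fake client — the real tests build their client from the test kubeconfig:

```go
// Sketch: a single ctx threaded through controller-runtime client calls.
package main

import (
	"context"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/runtime"
	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
	ctrlruntimeclient "sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/client/fake"
)

// listNodes mirrors the hasMachineReadyNode pattern: the caller's ctx is
// passed to client.List instead of creating a fresh context.
func listNodes(ctx context.Context, client ctrlruntimeclient.Client) (int, error) {
	nodes := &corev1.NodeList{}
	if err := client.List(ctx, nodes); err != nil {
		return 0, fmt.Errorf("failed to list nodes: %w", err)
	}
	return len(nodes.Items), nil
}

func main() {
	scheme := runtime.NewScheme()
	_ = clientgoscheme.AddToScheme(scheme)
	client := fake.NewClientBuilder().WithScheme(scheme).Build()

	// One context at the top of the scenario, reused everywhere below.
	ctx := context.Background()
	n, err := listNodes(ctx, client)
	fmt.Println(n, err)
}
```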