diff --git a/system_tests/system_test.go b/system_tests/system_test.go index b61f52726..9c36a4cbe 100644 --- a/system_tests/system_test.go +++ b/system_tests/system_test.go @@ -54,9 +54,11 @@ var _ = Describe("Operator", func() { Expect(createRabbitmqCluster(ctx, rmqClusterClient, cluster)).To(Succeed()) waitForRabbitmqRunning(cluster) - hostname = kubernetesNodeIp(ctx, clientSet) + hostname = kubernetesNodeIp(ctx, clientSet, cluster) port = rabbitmqNodePort(ctx, clientSet, cluster, "management") + fmt.Printf("hostname: " + hostname + " port: " + port) + var err error username, password, err = getUsernameAndPassword(ctx, clientSet, cluster.Namespace, cluster.Name) Expect(err).NotTo(HaveOccurred()) @@ -278,9 +280,11 @@ CONSOLE_LOG=new` waitForRabbitmqRunning(cluster) - hostname = kubernetesNodeIp(ctx, clientSet) + hostname = kubernetesNodeIp(ctx, clientSet, cluster) port = rabbitmqNodePort(ctx, clientSet, cluster, "management") + fmt.Printf("hostname: " + hostname + " port: " + port) + var err error username, password, err = getUsernameAndPassword(ctx, clientSet, cluster.Namespace, cluster.Name) Expect(err).NotTo(HaveOccurred()) @@ -404,8 +408,9 @@ CONSOLE_LOG=new` It("works", func() { username, password, err := getUsernameAndPassword(ctx, clientSet, cluster.Namespace, cluster.Name) - hostname := kubernetesNodeIp(ctx, clientSet) + hostname := kubernetesNodeIp(ctx, clientSet, cluster) port := rabbitmqNodePort(ctx, clientSet, cluster, "management") + fmt.Printf("hostname: " + hostname + " port: " + port) Expect(err).NotTo(HaveOccurred()) assertHttpReady(hostname, port) @@ -462,7 +467,7 @@ CONSOLE_LOG=new` // Passing a single hostname for certificate creation // the AMPQS client is connecting using the same hostname - hostname = kubernetesNodeIp(ctx, clientSet) + hostname = kubernetesNodeIp(ctx, clientSet, cluster) caFilePath, caCert, caKey = createTLSSecret("rabbitmq-tls-test-secret", namespace, hostname) // Update RabbitmqCluster with TLS secret name @@ -591,7 +596,8 @@ CONSOLE_LOG=new` BeforeEach(func() { instanceName := "mqtt-stomp-stream" cluster = newRabbitmqCluster(namespace, instanceName) - cluster.Spec.Service.Type = "NodePort" + + //cluster.Spec.Service.Type = "LoadBalancer" cluster.Spec.Rabbitmq.AdditionalPlugins = []rabbitmqv1beta1.Plugin{ "rabbitmq_mqtt", "rabbitmq_web_mqtt", @@ -611,7 +617,7 @@ CONSOLE_LOG=new` waitForPortReadiness(cluster, 1883) // mqtt waitForPortReadiness(cluster, 61613) // stomp - hostname = kubernetesNodeIp(ctx, clientSet) + hostname = kubernetesNodeIp(ctx, clientSet, cluster) var err error username, password, err = getUsernameAndPassword(ctx, clientSet, namespace, instanceName) Expect(err).NotTo(HaveOccurred()) diff --git a/system_tests/utils.go b/system_tests/utils.go index 7f9e1b5e9..d53a98899 100644 --- a/system_tests/utils.go +++ b/system_tests/utils.go @@ -61,6 +61,7 @@ import ( const podCreationTimeout = 10 * time.Minute const portReadinessTimeout = 1 * time.Minute +const serviceReadinessTimeout = 1 * time.Minute const k8sQueryTimeout = 1 * time.Minute type featureFlag struct { @@ -395,15 +396,22 @@ func getUsernameAndPassword(ctx context.Context, clientset *kubernetes.Clientset } func newRabbitmqCluster(namespace, instanceName string) *rabbitmqv1beta1.RabbitmqCluster { + serviceType := rabbitmqv1beta1.RabbitmqClusterServiceSpec{ + Type: "NodePort", + } + if os.Getenv("SYSTEM_TESTS_MODE") == "LoadBalancer" { + serviceType = rabbitmqv1beta1.RabbitmqClusterServiceSpec{ + Type: "LoadBalancer", + } + } + cluster := &rabbitmqv1beta1.RabbitmqCluster{ ObjectMeta: metav1.ObjectMeta{ Name: instanceName, Namespace: namespace, }, Spec: rabbitmqv1beta1.RabbitmqClusterSpec{ - Service: rabbitmqv1beta1.RabbitmqClusterServiceSpec{ - Type: "NodePort", - }, + Service: serviceType, // run system tests with low resources so that they can run on GitHub actions Resources: &corev1.ResourceRequirements{ Requests: map[corev1.ResourceName]k8sresource.Quantity{ @@ -483,25 +491,46 @@ func statefulSetPodName(cluster *rabbitmqv1beta1.RabbitmqCluster, index int) str * Helper function to fetch a Kubernetes Node IP. Node IPs are necessary * to access NodePort type services. */ -func kubernetesNodeIp(ctx context.Context, clientSet *kubernetes.Clientset) string { - nodes, err := clientSet.CoreV1().Nodes().List(ctx, metav1.ListOptions{}) - ExpectWithOffset(1, err).NotTo(HaveOccurred()) - ExpectWithOffset(1, nodes).ToNot(BeNil()) - ExpectWithOffset(1, len(nodes.Items)).To(BeNumerically(">", 0)) +func kubernetesNodeIp(ctx context.Context, clientSet *kubernetes.Clientset, rabbitmqCluster *rabbitmqv1beta1.RabbitmqCluster) string { var nodeIp string - for _, address := range nodes.Items[0].Status.Addresses { - // There are no order guarantees in this array. An Internal IP might come - // before an external IP or hostname. We want to return an external IP if - // available, or the internal IP. - // We don't want to return a hostname because it might not be resolvable by - // our local DNS - switch address.Type { - case corev1.NodeExternalIP: - return address.Address - case corev1.NodeInternalIP: - nodeIp = address.Address + + if os.Getenv("SYSTEM_TESTS_MODE") == "LoadBalancer" { + var output []byte + EventuallyWithOffset(2, func() string { + output, _ = kubectl( + "-n", + rabbitmqCluster.Namespace, + "get", + "services", + rabbitmqCluster.Name, + "-ojsonpath='{.status.loadBalancer.ingress[0].ip}'", + ) + return string(output) + }, serviceReadinessTimeout, 3).ShouldNot(Equal("")) + nodeIp = string(output) + + } else { + nodes, err := clientSet.CoreV1().Nodes().List(ctx, metav1.ListOptions{}) + ExpectWithOffset(1, err).NotTo(HaveOccurred()) + ExpectWithOffset(1, nodes).ToNot(BeNil()) + ExpectWithOffset(1, len(nodes.Items)).To(BeNumerically(">", 0)) + + + for _, address := range nodes.Items[0].Status.Addresses { + // There are no order guarantees in this array. An Internal IP might come + // before an external IP or hostname. We want to return an external IP if + // available, or the internal IP. + // We don't want to return a hostname because it might not be resolvable by + // our local DNS + switch address.Type { + case corev1.NodeExternalIP: + return address.Address + case corev1.NodeInternalIP: + nodeIp = address.Address + } } + return nodeIp } // we did not find an external IP // we might return empty or the internal IP @@ -531,14 +560,17 @@ func containsPort(ports []corev1.ServicePort, portName string) bool { } func rabbitmqNodePort(ctx context.Context, clientSet *kubernetes.Clientset, cluster *rabbitmqv1beta1.RabbitmqCluster, portName string) string { + + + var err error service, err := clientSet.CoreV1().Services(cluster.Namespace). - Get(ctx, cluster.ChildResourceName(""), metav1.GetOptions{}) + Get(ctx, cluster.ChildResourceName(""), metav1.GetOptions{}) ExpectWithOffset(1, err).NotTo(HaveOccurred()) for _, port := range service.Spec.Ports { if port.Name == portName { - return strconv.Itoa(int(port.NodePort)) + return strconv.Itoa(int(port.Port)) } } @@ -600,6 +632,23 @@ func waitForRabbitmqRunningWithOffset(cluster *rabbitmqv1beta1.RabbitmqCluster, }, podCreationTimeout, 1).Should(Equal("'True'")) ExpectWithOffset(callStackOffset, err).NotTo(HaveOccurred()) + + if os.Getenv("SYSTEM_TESTS_MODE") == "LoadBalancer" { + var output []byte + EventuallyWithOffset(2, func() string { + output, _ = kubectl( + "-n", + cluster.Namespace, + "get", + "services", + cluster.Name, + "-ojsonpath='{.status.loadBalancer.ingress[0].ip}'", + ) + return string(output) + }, serviceReadinessTimeout, 3).ShouldNot(Equal("")) + } + + } func waitForPortConnectivity(cluster *rabbitmqv1beta1.RabbitmqCluster) {