From bcaa9bb5629b14f719c3d750076af246939f2f1d Mon Sep 17 00:00:00 2001 From: Amit Mor Date: Fri, 23 Jul 2021 17:34:38 +0300 Subject: [PATCH 01/29] bugfix for sns topic deletion upon termination --- pubsub/aws/snssqs/snssqs.go | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/pubsub/aws/snssqs/snssqs.go b/pubsub/aws/snssqs/snssqs.go index 79892dcbda..746b562abd 100644 --- a/pubsub/aws/snssqs/snssqs.go +++ b/pubsub/aws/snssqs/snssqs.go @@ -70,6 +70,7 @@ func NewSnsSqs(l logger.Logger) pubsub.PubSub { return &snsSqs{ logger: l, subscriptions: []*string{}, + pattern: regexp.MustCompile("[^a-zA-Z0-9_\\-]+"), } } @@ -220,7 +221,7 @@ func (s *snsSqs) Init(metadata pubsub.Metadata) error { } func (s *snsSqs) createTopic(topic string) (string, string, error) { - hashedName := nameToHash(topic) + hashedName := s.nameToValidName(topic) createTopicResponse, err := s.snsClient.CreateTopic(&sns.CreateTopicInput{ Name: aws.String(hashedName), Tags: []*sns.Tag{{Key: aws.String(awsSnsTopicNameKey), Value: aws.String(topic)}}, @@ -491,11 +492,7 @@ func (s *snsSqs) Subscribe(req pubsub.SubscribeRequest, handler pubsub.Handler) } func (s *snsSqs) Close() error { - for _, sub := range s.subscriptions { - s.snsClient.Unsubscribe(&sns.UnsubscribeInput{ - SubscriptionArn: sub, - }) - } + s.logger.Debugf("Closing sns-sqs pubsub component. This is NOOP") return nil } From 62ac6b01d3a1fe12e5acd28cf556cf751b78fd7a Mon Sep 17 00:00:00 2001 From: Amit Mor Date: Fri, 23 Jul 2021 17:40:02 +0300 Subject: [PATCH 02/29] Revert "bugfix for sns topic deletion upon termination" This reverts commit bcaa9bb5629b14f719c3d750076af246939f2f1d. --- pubsub/aws/snssqs/snssqs.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/pubsub/aws/snssqs/snssqs.go b/pubsub/aws/snssqs/snssqs.go index 746b562abd..79892dcbda 100644 --- a/pubsub/aws/snssqs/snssqs.go +++ b/pubsub/aws/snssqs/snssqs.go @@ -70,7 +70,6 @@ func NewSnsSqs(l logger.Logger) pubsub.PubSub { return &snsSqs{ logger: l, subscriptions: []*string{}, - pattern: regexp.MustCompile("[^a-zA-Z0-9_\\-]+"), } } @@ -221,7 +220,7 @@ func (s *snsSqs) Init(metadata pubsub.Metadata) error { } func (s *snsSqs) createTopic(topic string) (string, string, error) { - hashedName := s.nameToValidName(topic) + hashedName := nameToHash(topic) createTopicResponse, err := s.snsClient.CreateTopic(&sns.CreateTopicInput{ Name: aws.String(hashedName), Tags: []*sns.Tag{{Key: aws.String(awsSnsTopicNameKey), Value: aws.String(topic)}}, @@ -492,7 +491,11 @@ func (s *snsSqs) Subscribe(req pubsub.SubscribeRequest, handler pubsub.Handler) } func (s *snsSqs) Close() error { - s.logger.Debugf("Closing sns-sqs pubsub component. This is NOOP") + for _, sub := range s.subscriptions { + s.snsClient.Unsubscribe(&sns.UnsubscribeInput{ + SubscriptionArn: sub, + }) + } return nil } From de891cf96d568702ef5196107d3c8944fd8a7404 Mon Sep 17 00:00:00 2001 From: Amit Mor Date: Fri, 23 Jul 2021 17:48:26 +0300 Subject: [PATCH 03/29] wip on normalizing queue/topic names --- pubsub/aws/snssqs/snssqs.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/pubsub/aws/snssqs/snssqs.go b/pubsub/aws/snssqs/snssqs.go index 79892dcbda..25a8a46be5 100644 --- a/pubsub/aws/snssqs/snssqs.go +++ b/pubsub/aws/snssqs/snssqs.go @@ -8,6 +8,7 @@ import ( "fmt" "strconv" "strings" + "regexp" "github.com/aws/aws-sdk-go/aws" sns "github.com/aws/aws-sdk-go/service/sns" @@ -70,6 +71,7 @@ func NewSnsSqs(l logger.Logger) pubsub.PubSub { return &snsSqs{ logger: l, subscriptions: []*string{}, + pattern: regexp.MustCompile("[^a-zA-Z0-9_\\-]+"), } } @@ -102,6 +104,14 @@ func nameToHash(name string) string { return fmt.Sprintf("%x", h.Sum(nil)) } +// normalize topic name to conform with: +// https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_CreateQueue.html +func (s *snsSqs) nameToValidName(name string) string { + replacedName := s.pattern.ReplaceAllString(name, "") + replacedName[] +} + + func (s *snsSqs) getSnsSqsMetatdata(metadata pubsub.Metadata) (*snsSqsMetadata, error) { md := snsSqsMetadata{} props := metadata.Properties From d4d2df7057adf8485e93145e5ea79a3eb41c253d Mon Sep 17 00:00:00 2001 From: Amit Mor Date: Sat, 24 Jul 2021 14:55:19 +0300 Subject: [PATCH 04/29] sanitize queue and topic names --- pubsub/aws/snssqs/snssqs.go | 31 ++++++++++++++++++------------- pubsub/aws/snssqs/snssqs_test.go | 11 +++++++++++ 2 files changed, 29 insertions(+), 13 deletions(-) diff --git a/pubsub/aws/snssqs/snssqs.go b/pubsub/aws/snssqs/snssqs.go index 25a8a46be5..5780c82f48 100644 --- a/pubsub/aws/snssqs/snssqs.go +++ b/pubsub/aws/snssqs/snssqs.go @@ -67,11 +67,13 @@ const ( awsSnsTopicNameKey = "dapr-topic-name" ) +var awsSnsSqsTopicAllowCharsRe = regexp.MustCompile("[^a-zA-Z0-9_\\-]+") + + func NewSnsSqs(l logger.Logger) pubsub.PubSub { return &snsSqs{ logger: l, subscriptions: []*string{}, - pattern: regexp.MustCompile("[^a-zA-Z0-9_\\-]+"), } } @@ -104,14 +106,16 @@ func nameToHash(name string) string { return fmt.Sprintf("%x", h.Sum(nil)) } -// normalize topic name to conform with: +// normalize topic/queue name to conform with: // https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_CreateQueue.html -func (s *snsSqs) nameToValidName(name string) string { - replacedName := s.pattern.ReplaceAllString(name, "") - replacedName[] +func nameToValidName(name string) string { + replacedName := awsSnsSqsTopicAllowCharsRe.ReplaceAllString(name, "") + if len(replacedName) > 80 { + replacedName = replacedName[:80] + } + return replacedName } - func (s *snsSqs) getSnsSqsMetatdata(metadata pubsub.Metadata) (*snsSqsMetadata, error) { md := snsSqsMetadata{} props := metadata.Properties @@ -230,7 +234,7 @@ func (s *snsSqs) Init(metadata pubsub.Metadata) error { } func (s *snsSqs) createTopic(topic string) (string, string, error) { - hashedName := nameToHash(topic) + hashedName := nameToValidName(topic) createTopicResponse, err := s.snsClient.CreateTopic(&sns.CreateTopicInput{ Name: aws.String(hashedName), Tags: []*sns.Tag{{Key: aws.String(awsSnsTopicNameKey), Value: aws.String(topic)}}, @@ -271,7 +275,7 @@ func (s *snsSqs) getOrCreateTopic(topic string) (string, error) { func (s *snsSqs) createQueue(queueName string) (*sqsQueueInfo, error) { createQueueResponse, err := s.sqsClient.CreateQueue(&sqs.CreateQueueInput{ - QueueName: aws.String(nameToHash(queueName)), + QueueName: aws.String(nameToValidName(queueName)), Tags: map[string]*string{awsSqsQueueNameKey: aws.String(queueName)}, }) if err != nil { @@ -501,11 +505,12 @@ func (s *snsSqs) Subscribe(req pubsub.SubscribeRequest, handler pubsub.Handler) } func (s *snsSqs) Close() error { - for _, sub := range s.subscriptions { - s.snsClient.Unsubscribe(&sns.UnsubscribeInput{ - SubscriptionArn: sub, - }) - } + s.logger.Debugf("Close was called and is now NOOP") + // for _, sub := range s.subscriptions { + // s.snsClient.Unsubscribe(&sns.UnsubscribeInput{ + // SubscriptionArn: sub, + // }) + // } return nil } diff --git a/pubsub/aws/snssqs/snssqs_test.go b/pubsub/aws/snssqs/snssqs_test.go index 3979df4001..aaabb05362 100644 --- a/pubsub/aws/snssqs/snssqs_test.go +++ b/pubsub/aws/snssqs/snssqs_test.go @@ -255,3 +255,14 @@ func Test_nameToHash(t *testing.T) { fmt.Sprintf("Invalid character %s in hashed name", string(c))) } } + +func Test_replaceInvalidQueueNameChars(t *testing.T) { + r := require.New(t) + + s := `Some invalid name // for an AWS resource &*()*&&^Some invalid name // for an AWS resource &*()*&&^Some invalid + name // for an AWS resource &*()*&&^Some invalid name // for an AWS resource &*()*&&^Some invalid name // for an + AWS resource &*()*&&^Some invalid name // for an AWS resource &*()*&&^` + v := nameToValidName(s) + r.Equal(80, len(v)) + r.Equal("SomeinvalidnameforanAWSresourceSomeinvalidnameforanAWSresourceSomeinvalidnamefor", v) +} From d3e08e72fdb98179b9d80ed25192c5e567be1cae Mon Sep 17 00:00:00 2001 From: Amit Mor Date: Sat, 24 Jul 2021 15:53:09 +0300 Subject: [PATCH 05/29] sanitized names. bugfix for close --- pubsub/aws/snssqs/snssqs.go | 46 +++++++++++++------------------- pubsub/aws/snssqs/snssqs_test.go | 29 +++----------------- 2 files changed, 23 insertions(+), 52 deletions(-) diff --git a/pubsub/aws/snssqs/snssqs.go b/pubsub/aws/snssqs/snssqs.go index 5780c82f48..f520b0ed00 100644 --- a/pubsub/aws/snssqs/snssqs.go +++ b/pubsub/aws/snssqs/snssqs.go @@ -2,7 +2,6 @@ package snssqs import ( "context" - "crypto/sha256" "encoding/json" "errors" "fmt" @@ -21,8 +20,8 @@ import ( type snsSqs struct { // key is the topic name, value is the ARN of the topic topics map[string]string - // key is the hashed topic name, value is the actual topic name - topicHash map[string]string + // key is the sanitized topic name, value is the actual topic name + topicSanitized map[string]string // key is the topic name, value holds the ARN of the queue and its url queues map[string]*sqsQueueInfo snsClient *sns.SNS @@ -67,7 +66,7 @@ const ( awsSnsTopicNameKey = "dapr-topic-name" ) -var awsSnsSqsTopicAllowCharsRe = regexp.MustCompile("[^a-zA-Z0-9_\\-]+") +var awsSnsSqsAllowedCharsRe = regexp.MustCompile("[^a-zA-Z0-9_\\-]+") func NewSnsSqs(l logger.Logger) pubsub.PubSub { @@ -97,23 +96,16 @@ func parseInt64(input string, propertyName string) (int64, error) { return int64(number), nil } -// take a name and hash it for compatibility with AWS resource names -// the output is fixed at 64 characters -func nameToHash(name string) string { - h := sha256.New() - h.Write([]byte(name)) - return fmt.Sprintf("%x", h.Sum(nil)) -} - -// normalize topic/queue name to conform with: -// https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_CreateQueue.html -func nameToValidName(name string) string { - replacedName := awsSnsSqsTopicAllowCharsRe.ReplaceAllString(name, "") - if len(replacedName) > 80 { - replacedName = replacedName[:80] +// sanitize topic/queue name to conform with: +// https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/quotas-queues.html +func nameToAWSSanitizedName(name string) string { + sanitizedName := awsSnsSqsAllowedCharsRe.ReplaceAllString(name, "") + if len(sanitizedName) > 80 { + sanitizedName = sanitizedName[:80] } - return replacedName + + return sanitizedName } func (s *snsSqs) getSnsSqsMetatdata(metadata pubsub.Metadata) (*snsSqsMetadata, error) { @@ -221,7 +213,7 @@ func (s *snsSqs) Init(metadata pubsub.Metadata) error { // both Publish and Subscribe need reference the topic ARN // track these ARNs in this map s.topics = make(map[string]string) - s.topicHash = make(map[string]string) + s.topicSanitized = make(map[string]string) s.queues = make(map[string]*sqsQueueInfo) sess, err := aws_auth.GetClient(md.AccessKey, md.SecretKey, md.SessionToken, md.Region, md.Endpoint) if err != nil { @@ -234,16 +226,16 @@ func (s *snsSqs) Init(metadata pubsub.Metadata) error { } func (s *snsSqs) createTopic(topic string) (string, string, error) { - hashedName := nameToValidName(topic) + sanitizedName := nameToAWSSanitizedName(topic) createTopicResponse, err := s.snsClient.CreateTopic(&sns.CreateTopicInput{ - Name: aws.String(hashedName), + Name: aws.String(sanitizedName), Tags: []*sns.Tag{{Key: aws.String(awsSnsTopicNameKey), Value: aws.String(topic)}}, }) if err != nil { return "", "", err } - return *(createTopicResponse.TopicArn), hashedName, nil + return *(createTopicResponse.TopicArn), sanitizedName, nil } // get the topic ARN from the topics map. If it doesn't exist in the map, try to fetch it from AWS, if it doesn't exist @@ -259,7 +251,7 @@ func (s *snsSqs) getOrCreateTopic(topic string) (string, error) { s.logger.Debugf("No topic ARN found for %s\n Creating topic instead.", topic) - topicArn, hashedName, err := s.createTopic(topic) + topicArn, sanitizedName, err := s.createTopic(topic) if err != nil { s.logger.Errorf("error creating new topic %s: %v", topic, err) @@ -268,14 +260,14 @@ func (s *snsSqs) getOrCreateTopic(topic string) (string, error) { // record topic ARN s.topics[topic] = topicArn - s.topicHash[hashedName] = topic + s.topicSanitized[sanitizedName] = topic return topicArn, nil } func (s *snsSqs) createQueue(queueName string) (*sqsQueueInfo, error) { createQueueResponse, err := s.sqsClient.CreateQueue(&sqs.CreateQueueInput{ - QueueName: aws.String(nameToValidName(queueName)), + QueueName: aws.String(nameToAWSSanitizedName(queueName)), Tags: map[string]*string{awsSqsQueueNameKey: aws.String(queueName)}, }) if err != nil { @@ -411,7 +403,7 @@ func (s *snsSqs) handleMessage(message *sqs.Message, queueInfo *sqsQueueInfo, ha } topic := parseTopicArn(messageBody.TopicArn) - topic = s.topicHash[topic] + topic = s.topicSanitized[topic] err = handler(context.Background(), &pubsub.NewMessage{ Data: []byte(messageBody.Message), Topic: topic, diff --git a/pubsub/aws/snssqs/snssqs_test.go b/pubsub/aws/snssqs/snssqs_test.go index aaabb05362..b5a61caf37 100644 --- a/pubsub/aws/snssqs/snssqs_test.go +++ b/pubsub/aws/snssqs/snssqs_test.go @@ -1,8 +1,6 @@ package snssqs import ( - "fmt" - "strings" "testing" "github.com/dapr/components-contrib/pubsub" @@ -236,33 +234,14 @@ func Test_parseInt64(t *testing.T) { r.Error(err) } -func Test_nameToHash(t *testing.T) { - r := require.New(t) - - // This string is too long and contains invalid character for either an SQS queue or an SNS topic - hashedName := nameToHash(` - Some invalid name // for an AWS resource &*()*&&^Some invalid name // for an AWS resource &*()*&&^Some invalid - name // for an AWS resource &*()*&&^Some invalid name // for an AWS resource &*()*&&^Some invalid name // for an - AWS resource &*()*&&^Some invalid name // for an AWS resource &*()*&&^ - `) - - r.Equal(64, len(hashedName)) - // Output is only expected to contain lower case characters representing valid hexadecimal numerals - for _, c := range hashedName { - r.True( - strings.ContainsAny( - "abcdef0123456789", string(c)), - fmt.Sprintf("Invalid character %s in hashed name", string(c))) - } -} -func Test_replaceInvalidQueueNameChars(t *testing.T) { +func Test_replaceNameToAWSSanitizedName(t *testing.T) { r := require.New(t) - s := `Some invalid name // for an AWS resource &*()*&&^Some invalid name // for an AWS resource &*()*&&^Some invalid + s := `Some_invalid-name // for an AWS resource &*()*&&^Some invalid name // for an AWS resource &*()*&&^Some invalid name // for an AWS resource &*()*&&^Some invalid name // for an AWS resource &*()*&&^Some invalid name // for an AWS resource &*()*&&^Some invalid name // for an AWS resource &*()*&&^` - v := nameToValidName(s) + v := nameToAWSSanitizedName(s) r.Equal(80, len(v)) - r.Equal("SomeinvalidnameforanAWSresourceSomeinvalidnameforanAWSresourceSomeinvalidnamefor", v) + r.Equal("Some_invalid-nameforanAWSresourceSomeinvalidnameforanAWSresourceSomeinvalidnamef", v) } From 5c9949b40a4f79afa70d54b5af19b1d941594a00 Mon Sep 17 00:00:00 2001 From: Maarten Mulders Date: Mon, 19 Jul 2021 03:44:38 +0200 Subject: [PATCH 06/29] # This is a combination of 4 commits. # This is the 1st commit message: Improve error message in case of missing property (#1012) Co-authored-by: Artur Souza # This is the commit message #2: Remove vestigial pubsub/nats code (#1024) The pubsub/nats component was replaced by pubsub/natsstreaming as part of https://github.com/dapr/dapr/pull/2003, but the corresponding code in dapr/components-contrib was not removed, so this change removes it. # This is the commit message #3: bugfix for sns topic deletion upon termination # This is the commit message #4: Revert "bugfix for sns topic deletion upon termination" This reverts commit bcaa9bb5629b14f719c3d750076af246939f2f1d. --- bindings/smtp/smtp.go | 2 +- pubsub/nats/metadata.go | 11 ----- pubsub/nats/nats.go | 99 ---------------------------------------- pubsub/nats/nats_test.go | 69 ---------------------------- 4 files changed, 1 insertion(+), 180 deletions(-) delete mode 100644 pubsub/nats/metadata.go delete mode 100644 pubsub/nats/nats.go delete mode 100644 pubsub/nats/nats_test.go diff --git a/bindings/smtp/smtp.go b/bindings/smtp/smtp.go index 28385a5f14..d71aefb69c 100644 --- a/bindings/smtp/smtp.go +++ b/bindings/smtp/smtp.go @@ -63,7 +63,7 @@ func (s *Mailer) Invoke(req *bindings.InvokeRequest) (*bindings.InvokeResponse, // Merge config metadata with request metadata metadata := s.metadata.mergeWithRequestMetadata(req) if metadata.EmailFrom == "" { - return nil, fmt.Errorf("smtp binding error: fromEmail property not supplied in configuration- or request-metadata") + return nil, fmt.Errorf("smtp binding error: emailFrom property not supplied in configuration- or request-metadata") } if metadata.EmailTo == "" { return nil, fmt.Errorf("smtp binding error: emailTo property not supplied in configuration- or request-metadata") diff --git a/pubsub/nats/metadata.go b/pubsub/nats/metadata.go deleted file mode 100644 index e48fa1bb9a..0000000000 --- a/pubsub/nats/metadata.go +++ /dev/null @@ -1,11 +0,0 @@ -// ------------------------------------------------------------ -// Copyright (c) Microsoft Corporation and Dapr Contributors. -// Licensed under the MIT License. -// ------------------------------------------------------------ - -package nats - -type metadata struct { - natsURL string - natsQueueGroupName string -} diff --git a/pubsub/nats/nats.go b/pubsub/nats/nats.go deleted file mode 100644 index 84693debc3..0000000000 --- a/pubsub/nats/nats.go +++ /dev/null @@ -1,99 +0,0 @@ -// ------------------------------------------------------------ -// Copyright (c) Microsoft Corporation and Dapr Contributors. -// Licensed under the MIT License. -// ------------------------------------------------------------ - -package nats - -import ( - "context" - "errors" - "fmt" - - "github.com/dapr/components-contrib/pubsub" - "github.com/dapr/kit/logger" - nats "github.com/nats-io/nats.go" -) - -const ( - natsURL = "natsURL" - consumerID = "consumerID" -) - -type natsPubSub struct { - metadata metadata - natsConn *nats.Conn - - logger logger.Logger -} - -// NewNATSPubSub returns a new NATS pub-sub implementation -func NewNATSPubSub(logger logger.Logger) pubsub.PubSub { - return &natsPubSub{logger: logger} -} - -func parseNATSMetadata(meta pubsub.Metadata) (metadata, error) { - m := metadata{} - if val, ok := meta.Properties[natsURL]; ok && val != "" { - m.natsURL = val - } else { - return m, errors.New("nats error: missing nats URL") - } - - if val, ok := meta.Properties[consumerID]; ok && val != "" { - m.natsQueueGroupName = val - } else { - return m, errors.New("nats error: missing queue name") - } - - return m, nil -} - -func (n *natsPubSub) Init(metadata pubsub.Metadata) error { - m, err := parseNATSMetadata(metadata) - if err != nil { - return err - } - - n.metadata = m - natsConn, err := nats.Connect(m.natsURL) - if err != nil { - return fmt.Errorf("nats: error connecting to nats at %s: %s", m.natsURL, err) - } - n.logger.Debugf("connected to nats at %s", m.natsURL) - - n.natsConn = natsConn - - return nil -} - -func (n *natsPubSub) Publish(req *pubsub.PublishRequest) error { - err := n.natsConn.Publish(req.Topic, req.Data) - if err != nil { - return fmt.Errorf("nats: error from publish: %s", err) - } - - return nil -} - -func (n *natsPubSub) Subscribe(req pubsub.SubscribeRequest, handler pubsub.Handler) error { - sub, err := n.natsConn.QueueSubscribe(req.Topic, n.metadata.natsQueueGroupName, func(natsMsg *nats.Msg) { - handler(context.Background(), &pubsub.NewMessage{Topic: req.Topic, Data: natsMsg.Data}) - }) - if err != nil { - n.logger.Warnf("nats: error subscribe: %s", err) - } - n.logger.Debugf("nats: subscribed to subject %s with queue group %s", sub.Subject, sub.Queue) - - return nil -} - -func (n *natsPubSub) Close() error { - n.natsConn.Close() - - return nil -} - -func (n *natsPubSub) Features() []pubsub.Feature { - return nil -} diff --git a/pubsub/nats/nats_test.go b/pubsub/nats/nats_test.go deleted file mode 100644 index dfe805ef5f..0000000000 --- a/pubsub/nats/nats_test.go +++ /dev/null @@ -1,69 +0,0 @@ -// ------------------------------------------------------------ -// Copyright (c) Microsoft Corporation and Dapr Contributors. -// Licensed under the MIT License. -// ------------------------------------------------------------ - -package nats - -import ( - "errors" - "testing" - - "github.com/dapr/components-contrib/pubsub" - "github.com/stretchr/testify/assert" -) - -func TestParseNATSMetadata(t *testing.T) { - t.Run("metadata is correct", func(t *testing.T) { - fakeProperties := map[string]string{ - natsURL: "foonats1", - consumerID: "fooq1", - } - fakeMetaData := pubsub.Metadata{ - Properties: fakeProperties, - } - - // act - m, err := parseNATSMetadata(fakeMetaData) - - // assert - assert.NoError(t, err) - assert.NotEmpty(t, m.natsURL) - assert.NotEmpty(t, m.natsQueueGroupName) - assert.Equal(t, fakeProperties[natsURL], m.natsURL) - assert.Equal(t, fakeProperties[consumerID], m.natsQueueGroupName) - }) - - t.Run("queue is not given", func(t *testing.T) { - fakeProperties := map[string]string{ - natsURL: "foonats2", - consumerID: "", - } - - fakeMetaData := pubsub.Metadata{ - Properties: fakeProperties, - } - - // act - m, err := parseNATSMetadata(fakeMetaData) - // assert - assert.Error(t, errors.New("nats error: missing queue name"), err) - assert.Equal(t, fakeProperties[natsURL], m.natsURL) - assert.Empty(t, m.natsQueueGroupName) - }) - - t.Run("nats url is not given", func(t *testing.T) { - fakeProperties := map[string]string{ - natsURL: "", - consumerID: "fooq2", - } - fakeMetaData := pubsub.Metadata{ - Properties: fakeProperties, - } - // act - m, err := parseNATSMetadata(fakeMetaData) - // assert - assert.Error(t, errors.New("nats error: missing nats URL"), err) - assert.Empty(t, m.natsURL) - }) -} From 408fdcdcadf2df71da084b86894d85ef67d033ae Mon Sep 17 00:00:00 2001 From: Amit Mor Date: Mon, 26 Jul 2021 20:55:53 +0300 Subject: [PATCH 07/29] removed debug message --- pubsub/aws/snssqs/snssqs.go | 7 ------- 1 file changed, 7 deletions(-) diff --git a/pubsub/aws/snssqs/snssqs.go b/pubsub/aws/snssqs/snssqs.go index f520b0ed00..2ef8f5c82a 100644 --- a/pubsub/aws/snssqs/snssqs.go +++ b/pubsub/aws/snssqs/snssqs.go @@ -497,13 +497,6 @@ func (s *snsSqs) Subscribe(req pubsub.SubscribeRequest, handler pubsub.Handler) } func (s *snsSqs) Close() error { - s.logger.Debugf("Close was called and is now NOOP") - // for _, sub := range s.subscriptions { - // s.snsClient.Unsubscribe(&sns.UnsubscribeInput{ - // SubscriptionArn: sub, - // }) - // } - return nil } From f3bee8a17760f2f62366be6ad708eb7b307ab7de Mon Sep 17 00:00:00 2001 From: Amit Mor Date: Mon, 26 Jul 2021 21:40:55 +0300 Subject: [PATCH 08/29] raw string abort --- pubsub/aws/snssqs/snssqs.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pubsub/aws/snssqs/snssqs.go b/pubsub/aws/snssqs/snssqs.go index 2ef8f5c82a..9dc4cda8d6 100644 --- a/pubsub/aws/snssqs/snssqs.go +++ b/pubsub/aws/snssqs/snssqs.go @@ -73,6 +73,10 @@ func NewSnsSqs(l logger.Logger) pubsub.PubSub { return &snsSqs{ logger: l, subscriptions: []*string{}, +<<<<<<< Updated upstream +======= + pattern: regexp.MustCompile(`[^a-zA-Z0-9_\-]+`), +>>>>>>> Stashed changes } } From be550bf93e2b0e3f018c7a5ca558989fb6bf1680 Mon Sep 17 00:00:00 2001 From: Amit Mor Date: Mon, 26 Jul 2021 21:55:45 +0300 Subject: [PATCH 09/29] merge issues solved --- pubsub/aws/snssqs/snssqs.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/pubsub/aws/snssqs/snssqs.go b/pubsub/aws/snssqs/snssqs.go index 9dc4cda8d6..397f79b01c 100644 --- a/pubsub/aws/snssqs/snssqs.go +++ b/pubsub/aws/snssqs/snssqs.go @@ -66,17 +66,13 @@ const ( awsSnsTopicNameKey = "dapr-topic-name" ) -var awsSnsSqsAllowedCharsRe = regexp.MustCompile("[^a-zA-Z0-9_\\-]+") +var awsSnsSqsAllowedCharsRe = regexp.MustCompile(`[^a-zA-Z0-9_\-]+`) func NewSnsSqs(l logger.Logger) pubsub.PubSub { return &snsSqs{ logger: l, subscriptions: []*string{}, -<<<<<<< Updated upstream -======= - pattern: regexp.MustCompile(`[^a-zA-Z0-9_\-]+`), ->>>>>>> Stashed changes } } From 7b7dc935c3305f23b9c5601e58564079d38d1f8b Mon Sep 17 00:00:00 2001 From: Amit Mor Date: Mon, 26 Jul 2021 22:32:19 +0300 Subject: [PATCH 10/29] wip --- pubsub/aws/snssqs/snssqs.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/pubsub/aws/snssqs/snssqs.go b/pubsub/aws/snssqs/snssqs.go index 397f79b01c..e89917f615 100644 --- a/pubsub/aws/snssqs/snssqs.go +++ b/pubsub/aws/snssqs/snssqs.go @@ -39,7 +39,8 @@ type sqsQueueInfo struct { type snsSqsMetadata struct { // name of the queue for this application. The is provided by the runtime as "consumerID" sqsQueueName string - + // name of the dead letter queue for this application + sqsDeadLettersQueueName string // aws endpoint for the component to use. Endpoint string // access key to use for accessing sqs/sns @@ -54,6 +55,8 @@ type snsSqsMetadata struct { // amount of time in seconds that a message is hidden from receive requests after it is sent to a subscriber. Default: 10 messageVisibilityTimeout int64 // number of times to resend a message after processing of that message fails before removing that message from the queue. Default: 10 + // if sqsDeadLettersQueueName is set to a value, then the messageRetryLimit value would be used as the threshold number of times after + // which an unprocessed message would be removed and placed in the dead-letters queue messageRetryLimit int64 // amount of time to await receipt of a message before making another request. Default: 1 messageWaitTimeSeconds int64 @@ -167,6 +170,10 @@ func (s *snsSqs) getSnsSqsMetatdata(metadata pubsub.Metadata) (*snsSqsMetadata, md.messageRetryLimit = retryLimit } + if val, ok := getAliasedProperty([]string{"sqsDeadLettersQueueName"}, metadata); ok { + md.sqsDeadLettersQueueName = val + } + if val, ok := props["messageWaitTimeSeconds"]; !ok { md.messageWaitTimeSeconds = 1 } else { From cb0f2e5466eb65246868b2b76def097b065dde6b Mon Sep 17 00:00:00 2001 From: Amit Mor Date: Mon, 26 Jul 2021 23:59:21 +0300 Subject: [PATCH 11/29] gofmt+remove regex and use byte iter --- pubsub/aws/snssqs/snssqs.go | 30 +++++++++++++++++------------- pubsub/aws/snssqs/snssqs_test.go | 1 - 2 files changed, 17 insertions(+), 14 deletions(-) diff --git a/pubsub/aws/snssqs/snssqs.go b/pubsub/aws/snssqs/snssqs.go index 9dc4cda8d6..1c8a7a332e 100644 --- a/pubsub/aws/snssqs/snssqs.go +++ b/pubsub/aws/snssqs/snssqs.go @@ -7,7 +7,6 @@ import ( "fmt" "strconv" "strings" - "regexp" "github.com/aws/aws-sdk-go/aws" sns "github.com/aws/aws-sdk-go/service/sns" @@ -66,17 +65,10 @@ const ( awsSnsTopicNameKey = "dapr-topic-name" ) -var awsSnsSqsAllowedCharsRe = regexp.MustCompile("[^a-zA-Z0-9_\\-]+") - - func NewSnsSqs(l logger.Logger) pubsub.PubSub { return &snsSqs{ logger: l, subscriptions: []*string{}, -<<<<<<< Updated upstream -======= - pattern: regexp.MustCompile(`[^a-zA-Z0-9_\-]+`), ->>>>>>> Stashed changes } } @@ -100,16 +92,28 @@ func parseInt64(input string, propertyName string) (int64, error) { return int64(number), nil } - // sanitize topic/queue name to conform with: // https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/quotas-queues.html func nameToAWSSanitizedName(name string) string { - sanitizedName := awsSnsSqsAllowedCharsRe.ReplaceAllString(name, "") - if len(sanitizedName) > 80 { - sanitizedName = sanitizedName[:80] + s := []byte(name) + + j := 0 + for _, b := range s { + if ('a' <= b && b <= 'z') || + ('A' <= b && b <= 'Z') || + ('0' <= b && b <= '9') || + (b == '-') || + (b == '_') { + s[j] = b + j++ + + if j == 80 { + break + } + } } - return sanitizedName + return string(s[:j]) } func (s *snsSqs) getSnsSqsMetatdata(metadata pubsub.Metadata) (*snsSqsMetadata, error) { diff --git a/pubsub/aws/snssqs/snssqs_test.go b/pubsub/aws/snssqs/snssqs_test.go index b5a61caf37..835100bc83 100644 --- a/pubsub/aws/snssqs/snssqs_test.go +++ b/pubsub/aws/snssqs/snssqs_test.go @@ -234,7 +234,6 @@ func Test_parseInt64(t *testing.T) { r.Error(err) } - func Test_replaceNameToAWSSanitizedName(t *testing.T) { r := require.New(t) From dd0c6e24d69c3ce1e57b2bc85135d211ab304318 Mon Sep 17 00:00:00 2001 From: Amit Mor Date: Thu, 29 Jul 2021 00:42:34 +0300 Subject: [PATCH 12/29] wip. first impl of dead-letters queue config --- pubsub/aws/snssqs/snssqs.go | 90 ++++++++++++++++++++++++++++++++----- 1 file changed, 79 insertions(+), 11 deletions(-) diff --git a/pubsub/aws/snssqs/snssqs.go b/pubsub/aws/snssqs/snssqs.go index 3ef005b9a5..adecc5be1f 100644 --- a/pubsub/aws/snssqs/snssqs.go +++ b/pubsub/aws/snssqs/snssqs.go @@ -38,7 +38,7 @@ type sqsQueueInfo struct { type snsSqsMetadata struct { // name of the queue for this application. The is provided by the runtime as "consumerID" sqsQueueName string - // name of the dead letter queue for this application + // name of the dead letter queue for this application sqsDeadLettersQueueName string // aws endpoint for the component to use. Endpoint string @@ -54,9 +54,10 @@ type snsSqsMetadata struct { // amount of time in seconds that a message is hidden from receive requests after it is sent to a subscriber. Default: 10 messageVisibilityTimeout int64 // number of times to resend a message after processing of that message fails before removing that message from the queue. Default: 10 - // if sqsDeadLettersQueueName is set to a value, then the messageRetryLimit value would be used as the threshold number of times after - // which an unprocessed message would be removed and placed in the dead-letters queue messageRetryLimit int64 + // if sqsDeadLettersQueueName is set to a value, then the deadLettersMaxReceives defines the number of times a message is received + // before it is moved to the dead-letters queue. This value must be smaller than messageRetryLimit + deadLettersMaxReceives int64 // amount of time to await receipt of a message before making another request. Default: 1 messageWaitTimeSeconds int64 // maximum number of messages to receive from the queue at a time. Default: 10, Maximum: 10 @@ -180,6 +181,20 @@ func (s *snsSqs) getSnsSqsMetatdata(metadata pubsub.Metadata) (*snsSqsMetadata, if val, ok := getAliasedProperty([]string{"sqsDeadLettersQueueName"}, metadata); ok { md.sqsDeadLettersQueueName = val + + if val, ok = getAliasedProperty([]string{"deadLettersMaxReceives"}, metadata); !ok { + md.deadLettersMaxReceives = md.messageRetryLimit + } else { + deadLettersMaxReceives, err := parseInt64(val, "deadLettersMaxReceives") + if err != nil { + return nil, err + } + if deadLettersMaxReceives > md.messageRetryLimit { + return nil, errors.New("deadLettersMaxReceives must be less than or equal to messageRetryLimit") + } + + md.deadLettersMaxReceives = deadLettersMaxReceives + } } if val, ok := props["messageWaitTimeSeconds"]; !ok { @@ -384,7 +399,7 @@ func (s *snsSqs) acknowledgeMessage(queueURL string, receiptHandle *string) erro return err } -func (s *snsSqs) handleMessage(message *sqs.Message, queueInfo *sqsQueueInfo, handler pubsub.Handler) error { +func (s *snsSqs) handleMessage(message *sqs.Message, queueInfo, deadLettersQueueInfo *sqsQueueInfo, handler pubsub.Handler) error { // if this message has been received > x times, delete from queue, it's borked recvCount, ok := message.Attributes[sqs.MessageSystemAttributeNameApproximateReceiveCount] @@ -398,16 +413,20 @@ func (s *snsSqs) handleMessage(message *sqs.Message, queueInfo *sqsQueueInfo, ha return fmt.Errorf("error parsing ApproximateReceiveCount from message: %v", message) } - // if we are over the allowable retry limit, delete the message from the queue - // TODO dead letter queue - if recvCountInt >= s.metadata.messageRetryLimit { + // if we are over the allowable retry limit, and there is no dead-letters queue, delete the message from the queue. + if deadLettersQueueInfo == nil && recvCountInt >= s.metadata.messageRetryLimit { if innerErr := s.acknowledgeMessage(queueInfo.url, message.ReceiptHandle); innerErr != nil { return fmt.Errorf("error acknowledging message after receiving the message too many times: %v", innerErr) } - return fmt.Errorf( "message received greater than %v times, deleting this message without further processing", s.metadata.messageRetryLimit) } + // ... else, there is no need to actively do something if we reached the limit defined in deadLettersMaxReceives as the message had + // already been moved to the dead-letters queue by SQS + if deadLettersQueueInfo != nil && recvCountInt >= s.metadata.deadLettersMaxReceives { + s.logger.Warnf( + "message received greater than %v times, moving this message without further processing to dead-letters queue: %v", s.metadata.deadLettersMaxReceives, s.metadata.sqsDeadLettersQueueName) + } // otherwise try to handle the message var messageBody snsMessage @@ -432,7 +451,7 @@ func (s *snsSqs) handleMessage(message *sqs.Message, queueInfo *sqsQueueInfo, ha return s.acknowledgeMessage(queueInfo.url, message.ReceiptHandle) } -func (s *snsSqs) consumeSubscription(queueInfo *sqsQueueInfo, handler pubsub.Handler) { +func (s *snsSqs) consumeSubscription(queueInfo, deadLettersQueueInfo *sqsQueueInfo, handler pubsub.Handler) { go func() { for { messageResponse, err := s.sqsClient.ReceiveMessage(&sqs.ReceiveMessageInput{ @@ -461,7 +480,7 @@ func (s *snsSqs) consumeSubscription(queueInfo *sqsQueueInfo, handler pubsub.Han s.logger.Debugf("%v message(s) received", len(messageResponse.Messages)) for _, m := range messageResponse.Messages { - if err := s.handleMessage(m, queueInfo, handler); err != nil { + if err := s.handleMessage(m, queueInfo, deadLettersQueueInfo, handler); err != nil { s.logger.Error(err) } } @@ -469,6 +488,39 @@ func (s *snsSqs) consumeSubscription(queueInfo *sqsQueueInfo, handler pubsub.Han }() } +func (s *snsSqs) createDeadLettersQueue() (*sqsQueueInfo, error) { + var deadLettersQueueInfo *sqsQueueInfo + deadLettersQueueInfo, err := s.getOrCreateQueue(s.metadata.sqsDeadLettersQueueName) + if err != nil { + s.logger.Errorf("error retrieving SQS dead-letter queue: %v", err) + + return nil, err + } + + return deadLettersQueueInfo, nil +} + +func (s *snsSqs) updateQueueAttributesWithDeadLetters(queueInfo, deadLettersQueueInfo *sqsQueueInfo, sqsSetQueueAttributesInput *sqs.SetQueueAttributesInput) error { + policy := map[string]string{ + "deadLetterTargetArn": deadLettersQueueInfo.arn, + "maxReceiveCount": strconv.FormatInt(s.metadata.deadLettersMaxReceives, 10), + } + + b, err := json.Marshal(policy) + if err != nil { + s.logger.Errorf("error marshalling dead-letters queue policy: %v", err) + + return err + } + + sqsSetQueueAttributesInput.QueueUrl = &queueInfo.url + sqsSetQueueAttributesInput.Attributes = map[string]*string{ + sqs.QueueAttributeNameRedrivePolicy: aws.String(string(b)), + } + + return nil +} + func (s *snsSqs) Subscribe(req pubsub.SubscribeRequest, handler pubsub.Handler) error { // subscribers declare a topic ARN // and declare a SQS queue to use @@ -489,6 +541,22 @@ func (s *snsSqs) Subscribe(req pubsub.SubscribeRequest, handler pubsub.Handler) return err } + var deadLettersQueueInfo *sqsQueueInfo = nil + sqsSetQueueAttributesInput := &sqs.SetQueueAttributesInput{} + + if len(s.metadata.sqsDeadLettersQueueName) > 0 { + var err error + deadLettersQueueInfo, err = s.createDeadLettersQueue() + if err != nil { + s.logger.Errorf("error creating dead-letter queue: %v", err) + + return err + } + + s.updateQueueAttributesWithDeadLetters(queueInfo, deadLettersQueueInfo, sqsSetQueueAttributesInput) + } + + s.sqsClient.SetQueueAttributesRequest(sqsSetQueueAttributesInput) // subscription creation is idempotent. Subscriptions are unique by topic/queue subscribeOutput, err := s.snsClient.Subscribe(&sns.SubscribeInput{ Attributes: nil, @@ -506,7 +574,7 @@ func (s *snsSqs) Subscribe(req pubsub.SubscribeRequest, handler pubsub.Handler) s.subscriptions = append(s.subscriptions, subscribeOutput.SubscriptionArn) s.logger.Debugf("Subscribed to topic %s: %v", req.Topic, subscribeOutput) - s.consumeSubscription(queueInfo, handler) + s.consumeSubscription(queueInfo, deadLettersQueueInfo, handler) return nil } From 6c3e28343f97fbe62328d23a1f6170437f1765c6 Mon Sep 17 00:00:00 2001 From: Amit Mor Date: Thu, 29 Jul 2021 01:06:24 +0300 Subject: [PATCH 13/29] wip. refactor and fallback values --- pubsub/aws/snssqs/snssqs.go | 37 ++++++++++++++++++++++++------------- 1 file changed, 24 insertions(+), 13 deletions(-) diff --git a/pubsub/aws/snssqs/snssqs.go b/pubsub/aws/snssqs/snssqs.go index adecc5be1f..3d5e5cc1f8 100644 --- a/pubsub/aws/snssqs/snssqs.go +++ b/pubsub/aws/snssqs/snssqs.go @@ -65,8 +65,9 @@ type snsSqsMetadata struct { } const ( - awsSqsQueueNameKey = "dapr-queue-name" - awsSnsTopicNameKey = "dapr-topic-name" + awsSqsQueueNameKey = "dapr-queue-name" + awsSnsTopicNameKey = "dapr-topic-name" + awsSqsDeadLettersQueueName = "dapr-deadletters" ) func NewSnsSqs(l logger.Logger) pubsub.PubSub { @@ -181,20 +182,29 @@ func (s *snsSqs) getSnsSqsMetatdata(metadata pubsub.Metadata) (*snsSqsMetadata, if val, ok := getAliasedProperty([]string{"sqsDeadLettersQueueName"}, metadata); ok { md.sqsDeadLettersQueueName = val + } - if val, ok = getAliasedProperty([]string{"deadLettersMaxReceives"}, metadata); !ok { - md.deadLettersMaxReceives = md.messageRetryLimit - } else { - deadLettersMaxReceives, err := parseInt64(val, "deadLettersMaxReceives") - if err != nil { - return nil, err - } - if deadLettersMaxReceives > md.messageRetryLimit { - return nil, errors.New("deadLettersMaxReceives must be less than or equal to messageRetryLimit") - } + if val, ok := getAliasedProperty([]string{"deadLettersMaxReceives"}, metadata); !ok { + md.deadLettersMaxReceives = md.messageRetryLimit + } else { + // fallback: use default dead-letters queue name if deadLettersMaxReceives is defined but the sqsDeadLettersQueueName isn't + if len(md.sqsDeadLettersQueueName) == 0 { + md.sqsDeadLettersQueueName = awsSqsDeadLettersQueueName + } - md.deadLettersMaxReceives = deadLettersMaxReceives + deadLettersMaxReceives, err := parseInt64(val, "deadLettersMaxReceives") + if err != nil { + return nil, err + } + + // validate: if deadLettersMaxReceives is greater than messageRetryLimit, the message would be deleted by daprd before + // SQS has the opportunity to move the message to the dead-letters queue, so we reject this + if deadLettersMaxReceives > md.messageRetryLimit { + return nil, errors.New("deadLettersMaxReceives must be less than or equal to messageRetryLimit") } + + // assign: used provided configuration + md.deadLettersMaxReceives = deadLettersMaxReceives } if val, ok := props["messageWaitTimeSeconds"]; !ok { @@ -418,6 +428,7 @@ func (s *snsSqs) handleMessage(message *sqs.Message, queueInfo, deadLettersQueue if innerErr := s.acknowledgeMessage(queueInfo.url, message.ReceiptHandle); innerErr != nil { return fmt.Errorf("error acknowledging message after receiving the message too many times: %v", innerErr) } + return fmt.Errorf( "message received greater than %v times, deleting this message without further processing", s.metadata.messageRetryLimit) } From b4692c96b28e0df34c39f4ede780e2f5d55e3c38 Mon Sep 17 00:00:00 2001 From: Amit Mor Date: Thu, 29 Jul 2021 18:05:52 +0300 Subject: [PATCH 14/29] integration test wip --- pubsub/aws/snssqs/snssqs_integ_test.go | 31 ++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) create mode 100644 pubsub/aws/snssqs/snssqs_integ_test.go diff --git a/pubsub/aws/snssqs/snssqs_integ_test.go b/pubsub/aws/snssqs/snssqs_integ_test.go new file mode 100644 index 0000000000..a02d4de3a3 --- /dev/null +++ b/pubsub/aws/snssqs/snssqs_integ_test.go @@ -0,0 +1,31 @@ +package snssqs + +import ( + "os" + "testing" + + "github.com/dapr/components-contrib/snssqs" + "github.com/dapr/kit/logger" + "github.com/stretchr/testify/assert" +) + +// TestIntegrationGetSecret requires AWS specific environments for authentication AWS_DEFAULT_REGION AWS_ACCESS_KEY_ID, +// AWS_SECRET_ACCESS_KkEY and AWS_SESSION_TOKEN +func TestIntegrationCreateAllSnsSqs(t *testing.T) { + ss := NewSnsSqs(logger.NewLogger("test")) + err := ss.Init(snssqs.Metadata{ + Properties: map[string]string{ + "Region": os.Getenv("AWS_DEFAULT_REGION"), + "AccessKey": os.Getenv("AWS_ACCESS_KEY_ID"), + "SecretKey": os.Getenv("AWS_SECRET_ACCESS_KEY"), + "SessionToken": os.Getenv("AWS_SESSION_TOKEN"), + }, + }) + assert.Nil(t, err) + response, err := sm.GetSecret(secretstores.GetSecretRequest{ + Name: secretName, + Metadata: map[string]string{}, + }) + assert.Nil(t, err) + assert.NotNil(t, response) +} From 58ad8a9410003432c122c809a4f596ae932d8267 Mon Sep 17 00:00:00 2001 From: Amit Mor Date: Thu, 29 Jul 2021 21:29:51 +0300 Subject: [PATCH 15/29] wip integration test --- pubsub/aws/snssqs/snssqs_integ_test.go | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/pubsub/aws/snssqs/snssqs_integ_test.go b/pubsub/aws/snssqs/snssqs_integ_test.go index a02d4de3a3..972e2a6058 100644 --- a/pubsub/aws/snssqs/snssqs_integ_test.go +++ b/pubsub/aws/snssqs/snssqs_integ_test.go @@ -4,7 +4,7 @@ import ( "os" "testing" - "github.com/dapr/components-contrib/snssqs" + "github.com/dapr/components-contrib/pubsub" "github.com/dapr/kit/logger" "github.com/stretchr/testify/assert" ) @@ -12,8 +12,8 @@ import ( // TestIntegrationGetSecret requires AWS specific environments for authentication AWS_DEFAULT_REGION AWS_ACCESS_KEY_ID, // AWS_SECRET_ACCESS_KkEY and AWS_SESSION_TOKEN func TestIntegrationCreateAllSnsSqs(t *testing.T) { - ss := NewSnsSqs(logger.NewLogger("test")) - err := ss.Init(snssqs.Metadata{ + snsq := NewSnsSqs(logger.NewLogger("test")) + err := snsq.Init(snssqs.snsSqsMetadata{ Properties: map[string]string{ "Region": os.Getenv("AWS_DEFAULT_REGION"), "AccessKey": os.Getenv("AWS_ACCESS_KEY_ID"), @@ -22,10 +22,8 @@ func TestIntegrationCreateAllSnsSqs(t *testing.T) { }, }) assert.Nil(t, err) - response, err := sm.GetSecret(secretstores.GetSecretRequest{ - Name: secretName, - Metadata: map[string]string{}, - }) + // snsq.Subscribe() + assert.Nil(t, err) assert.NotNil(t, response) } From 46bbbc7184b57bc02db13c84a257a54568a1bf79 Mon Sep 17 00:00:00 2001 From: Amit Mor Date: Fri, 30 Jul 2021 09:04:04 +0300 Subject: [PATCH 16/29] wip integration --- pubsub/aws/snssqs/snssqs_integ_test.go | 109 +++++++++++++++++++++++-- 1 file changed, 103 insertions(+), 6 deletions(-) diff --git a/pubsub/aws/snssqs/snssqs_integ_test.go b/pubsub/aws/snssqs/snssqs_integ_test.go index 972e2a6058..60c77fd409 100644 --- a/pubsub/aws/snssqs/snssqs_integ_test.go +++ b/pubsub/aws/snssqs/snssqs_integ_test.go @@ -1,19 +1,53 @@ package snssqs import ( + "context" "os" "testing" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/sqs" "github.com/dapr/components-contrib/pubsub" "github.com/dapr/kit/logger" "github.com/stretchr/testify/assert" ) -// TestIntegrationGetSecret requires AWS specific environments for authentication AWS_DEFAULT_REGION AWS_ACCESS_KEY_ID, -// AWS_SECRET_ACCESS_KkEY and AWS_SESSION_TOKEN -func TestIntegrationCreateAllSnsSqs(t *testing.T) { +func TestMain(m *testing.M) { + setup() + code := m.Run() + teardown() + os.Exit(code) +} + +type testFixture struct { + topicName string + deadLettersQueueName string + queueName string +} + +func getFixture() *testFixture { + return &testFixture{ + topicName: "dapr-sns-test-topic", + deadLettersQueueName: "dapr-sqs-test-deadletters-queue", + queueName: "dapr-sqs-test-queue", + } +} + +func setupTestCase(t *testing.T) func(t *testing.T) { + t.Log("setup test case") + return func(t *testing.T) { + t.Log("teardown test case") + } +} + +func snsSqsTest(t *testing.T) func(t *testing.T) { + t.Log("setup sub test") + sess := session.Must(session.NewSessionWithOptions(session.Options{ + SharedConfigState: session.SharedConfigEnable, + })) + snsq := NewSnsSqs(logger.NewLogger("test")) - err := snsq.Init(snssqs.snsSqsMetadata{ + err := snsq.Init(pubsub.Metadata{ Properties: map[string]string{ "Region": os.Getenv("AWS_DEFAULT_REGION"), "AccessKey": os.Getenv("AWS_ACCESS_KEY_ID"), @@ -21,9 +55,72 @@ func TestIntegrationCreateAllSnsSqs(t *testing.T) { "SessionToken": os.Getenv("AWS_SESSION_TOKEN"), }, }) + + fixture := getFixture() + // subscriber listens to SQS queue + req := pubsub.SubscribeRequest{Topic: fixture.queueName} + handler := func(ctx context.Context, msg *pubsub.NewMessage) error { + return nil + } + + err = snsq.Subscribe(req, handler) + assert.Nil(t, err) + + var topicArn *sqs.GetQueueUrlOutput + topicArn, err = getQueueURL(sess, &fixture.queueName) assert.Nil(t, err) - // snsq.Subscribe() + assert.NotNil(t, topicArn) + + // tear down callback + return func(t *testing.T) { + t.Log("teardown sub test") + + } +} + +func TestSnsSqs(t *testing.T) { + cases := []struct { + name string + a int + b int + expected int + }{ + {"add", 2, 2, 4}, + {"minus", 0, -2, -2}, + {"zero", 0, 0, 0}, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + teardownSnsSqsTest := snsSqsTest(t) + defer teardownSnsSqsTest(t) + + // result := Sum(tc.a, tc.b) + // if result != tc.expected { + // t.Fatalf("expected sum %v, but got %v", tc.expected, result) + // } + }) + } +} + +func getQueueURL(sess *session.Session, queueName *string) (*sqs.GetQueueUrlOutput, error) { + // Create an SQS service client + svc := sqs.New(sess) + + result, err := svc.GetQueueUrl(&sqs.GetQueueUrlInput{ + QueueName: queueName, + }) + if err != nil { + return nil, err + } + + return result, nil +} + +// TestIntegrationGetSecret requires AWS specific environments for authentication AWS_DEFAULT_REGION AWS_ACCESS_KEY_ID, +// AWS_SECRET_ACCESS_KkEY and AWS_SESSION_TOKEN +func TestIntegrationCreateAllSnsSqs(t *testing.T) { assert.Nil(t, err) - assert.NotNil(t, response) + // assert.NotNil(t, response) } From 3cfa7045f6b5315ae3f679cbcd1d473342617157 Mon Sep 17 00:00:00 2001 From: Amit Mor Date: Wed, 4 Aug 2021 14:02:50 +0300 Subject: [PATCH 17/29] wip on testing --- pubsub/aws/snssqs/snssqs.go | 2 +- pubsub/aws/snssqs/snssqs_integ_test.go | 90 +++++++++++++++++++++----- 2 files changed, 75 insertions(+), 17 deletions(-) diff --git a/pubsub/aws/snssqs/snssqs.go b/pubsub/aws/snssqs/snssqs.go index 3d5e5cc1f8..6b1fb637af 100644 --- a/pubsub/aws/snssqs/snssqs.go +++ b/pubsub/aws/snssqs/snssqs.go @@ -65,7 +65,7 @@ type snsSqsMetadata struct { } const ( - awsSqsQueueNameKey = "dapr-queue-name" + awsSqsQueueNameKey = "dapr-worker-name" awsSnsTopicNameKey = "dapr-topic-name" awsSqsDeadLettersQueueName = "dapr-deadletters" ) diff --git a/pubsub/aws/snssqs/snssqs_integ_test.go b/pubsub/aws/snssqs/snssqs_integ_test.go index 60c77fd409..a56bd4cd52 100644 --- a/pubsub/aws/snssqs/snssqs_integ_test.go +++ b/pubsub/aws/snssqs/snssqs_integ_test.go @@ -2,10 +2,14 @@ package snssqs import ( "context" + "fmt" "os" "testing" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/sns" "github.com/aws/aws-sdk-go/service/sqs" "github.com/dapr/components-contrib/pubsub" "github.com/dapr/kit/logger" @@ -13,9 +17,8 @@ import ( ) func TestMain(m *testing.M) { - setup() + code := m.Run() - teardown() os.Exit(code) } @@ -33,6 +36,27 @@ func getFixture() *testFixture { } } +func newAWSSession(cfg *s3UploaderConfig) *session.Session { + var mySession *session.Session + sessionCfg := aws.NewConfig() + // Create a S3 client with additional configuration + if len(cfg.EndpointURL) != 0 { + sessionCfg.Endpoint = &cfg.EndpointURL + sessionCfg.Region = &cfg.Region + sessionCfg.Credentials = credentials.NewStaticCredentials("my-key", "my-secret", "") + sessionCfg.DisableSSL = aws.Bool(true) + + opts := session.Options{} + opts.Profile = "minio" + opts.Config = *sessionCfg + mySession = session.Must(session.NewSessionWithOptions(opts)) + } else { + sessionCfg.Region = aws.String(endpoints.UsEast1RegionID) + mySession = session.Must(session.NewSession(sessionCfg)) + } + return mySession +} + func setupTestCase(t *testing.T) func(t *testing.T) { t.Log("setup test case") return func(t *testing.T) { @@ -41,38 +65,70 @@ func setupTestCase(t *testing.T) func(t *testing.T) { } func snsSqsTest(t *testing.T) func(t *testing.T) { - t.Log("setup sub test") + fixture := getFixture() sess := session.Must(session.NewSessionWithOptions(session.Options{ SharedConfigState: session.SharedConfigEnable, + })) - snsq := NewSnsSqs(logger.NewLogger("test")) - err := snsq.Init(pubsub.Metadata{ + t.Log("setup sub test") + + snssqsClient := NewSnsSqs(logger.NewLogger("test")) + err := snssqsClient.Init(pubsub.Metadata{ Properties: map[string]string{ - "Region": os.Getenv("AWS_DEFAULT_REGION"), - "AccessKey": os.Getenv("AWS_ACCESS_KEY_ID"), - "SecretKey": os.Getenv("AWS_SECRET_ACCESS_KEY"), - "SessionToken": os.Getenv("AWS_SESSION_TOKEN"), + "region": os.Getenv("AWS_DEFAULT_REGION"), + "accessKey": os.Getenv("AWS_ACCESS_KEY_ID"), + "secretKey": os.Getenv("AWS_SECRET_ACCESS_KEY"), + "endpoint": os.Getenv("AWS_ENDPOINT_URL"), + "consumerID": fixture.queueName, }, }) - fixture := getFixture() // subscriber listens to SQS queue req := pubsub.SubscribeRequest{Topic: fixture.queueName} + msgs := make([]*pubsub.NewMessage, 1) handler := func(ctx context.Context, msg *pubsub.NewMessage) error { + msgs = append(msgs, msg) + return nil } - err = snsq.Subscribe(req, handler) + err = snssqsClient.Subscribe(req, handler) assert.Nil(t, err) - var topicArn *sqs.GetQueueUrlOutput - topicArn, err = getQueueURL(sess, &fixture.queueName) + var queueURL *sqs.GetQueueUrlOutput + queueURL, err = getQueueURL(sess, &fixture.queueName) + assert.Nil(t, err) + assert.NotNil(t, queueURL) + + publishReq := &pubsub.PublishRequest{Topic: fixture.topicName, PubsubName: "test", Data: []byte("string")} + err = snssqsClient.Publish(publishReq) assert.Nil(t, err) - assert.NotNil(t, topicArn) // tear down callback return func(t *testing.T) { + + svc := sqs.New(sess) + _, err := svc.DeleteQueue(&sqs.DeleteQueueInput{ + QueueUrl: queueURL.QueueUrl, + }) + + assert.Nil(t, err) + + snsSvc := sns.New(sess) + result, err := snsSvc.ListSubscriptions(nil) + if err != nil { + fmt.Println(err.Error()) + + os.Exit(1) + } + + for _, s := range result.Subscriptions { + fmt.Println(*s.SubscriptionArn) + fmt.Println(" " + *s.TopicArn) + fmt.Println("") + } + t.Log("teardown sub test") } @@ -107,8 +163,10 @@ func getQueueURL(sess *session.Session, queueName *string) (*sqs.GetQueueUrlOutp // Create an SQS service client svc := sqs.New(sess) + endpointAccountId := "000000000000" result, err := svc.GetQueueUrl(&sqs.GetQueueUrlInput{ - QueueName: queueName, + QueueName: queueName, + QueueOwnerAWSAccountId: &endpointAccountId, }) if err != nil { return nil, err @@ -121,6 +179,6 @@ func getQueueURL(sess *session.Session, queueName *string) (*sqs.GetQueueUrlOutp // AWS_SECRET_ACCESS_KkEY and AWS_SESSION_TOKEN func TestIntegrationCreateAllSnsSqs(t *testing.T) { - assert.Nil(t, err) + // assert.Nil(t, err) // assert.NotNil(t, response) } From aace0a151f4f6555e6bc6ed94fb4b9d5baca40b4 Mon Sep 17 00:00:00 2001 From: Amit Mor Date: Sun, 8 Aug 2021 21:53:06 +0300 Subject: [PATCH 18/29] wip --- pubsub/aws/snssqs/snssqs_integ_test.go | 299 ++++++++++++++++--------- 1 file changed, 198 insertions(+), 101 deletions(-) diff --git a/pubsub/aws/snssqs/snssqs_integ_test.go b/pubsub/aws/snssqs/snssqs_integ_test.go index a56bd4cd52..0179a59014 100644 --- a/pubsub/aws/snssqs/snssqs_integ_test.go +++ b/pubsub/aws/snssqs/snssqs_integ_test.go @@ -11,81 +11,153 @@ import ( "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/sns" "github.com/aws/aws-sdk-go/service/sqs" + "github.com/aws/aws-sdk-go/service/sts" "github.com/dapr/components-contrib/pubsub" "github.com/dapr/kit/logger" "github.com/stretchr/testify/assert" ) -func TestMain(m *testing.M) { +type testFixture struct { + name string + endpoint string + region string + profile string + topicName string + deadLettersQueueName string + deadLettersMaxReceives string + queueName string + accessKey string + secretKey string +} +func TestMain(m *testing.M) { code := m.Run() os.Exit(code) } -type testFixture struct { - topicName string - deadLettersQueueName string - queueName string -} - func getFixture() *testFixture { return &testFixture{ - topicName: "dapr-sns-test-topic", - deadLettersQueueName: "dapr-sqs-test-deadletters-queue", - queueName: "dapr-sqs-test-queue", + region: os.Getenv("AWS_DEFAULT_REGION"), + accessKey: os.Getenv("AWS_ACCESS_KEY_ID"), + secretKey: os.Getenv("AWS_SECRET_ACCESS_KEY"), + endpoint: os.Getenv("AWS_ENDPOINT_URL"), + profile: "minio", + topicName: "dapr-sns-test-topic", + deadLettersQueueName: "dapr-sqs-test-deadletters-queue", + deadLettersMaxReceives: "9", + queueName: "dapr-sqs-test-queue", } } -func newAWSSession(cfg *s3UploaderConfig) *session.Session { +func newAWSSession(cfg *testFixture) *session.Session { + // run localstack and use the endpoint url: http://localhost:4566 by using the following cmd + // SERVICES=sns,sqs,sts DEBUG=1 localstack start var mySession *session.Session - sessionCfg := aws.NewConfig() - // Create a S3 client with additional configuration - if len(cfg.EndpointURL) != 0 { - sessionCfg.Endpoint = &cfg.EndpointURL - sessionCfg.Region = &cfg.Region - sessionCfg.Credentials = credentials.NewStaticCredentials("my-key", "my-secret", "") + sessionCfg := aws.NewConfig() + // Create a client with additional configuration + if len(cfg.endpoint) != 0 { + sessionCfg.Endpoint = &cfg.endpoint + sessionCfg.Region = &cfg.region + sessionCfg.Credentials = credentials.NewStaticCredentials(cfg.accessKey, cfg.secretKey, "") sessionCfg.DisableSSL = aws.Bool(true) - opts := session.Options{} - opts.Profile = "minio" - opts.Config = *sessionCfg + opts := session.Options{Profile: cfg.profile, Config: *sessionCfg} mySession = session.Must(session.NewSessionWithOptions(opts)) } else { - sessionCfg.Region = aws.String(endpoints.UsEast1RegionID) - mySession = session.Must(session.NewSession(sessionCfg)) + sessionCfg.Region = aws.String(cfg.region) + opts := session.Options{SharedConfigState: session.SharedConfigEnable, Config: *sessionCfg} + mySession = session.Must(session.NewSessionWithOptions(opts)) } + return mySession } -func setupTestCase(t *testing.T) func(t *testing.T) { - t.Log("setup test case") - return func(t *testing.T) { - t.Log("teardown test case") +func setupTest(t *testing.T, fixture *testFixture) (pubsub.PubSub, *session.Session) { + sess := newAWSSession(fixture) + assert.NotNil(t, sess) + t.Log("setup test") + + snssqsClient := NewSnsSqs(logger.NewLogger("test")) + assert.NotNil(t, snssqsClient) + + props := map[string]string{ + "region": fixture.region, + "accessKey": fixture.accessKey, + "secretKey": fixture.secretKey, + "endpoint": fixture.endpoint, + "consumerID": fixture.queueName, } + + if len(fixture.deadLettersQueueName) > 0 { + props["sqsDeadLettersQueueName"] = fixture.deadLettersQueueName + } + if len(fixture.deadLettersMaxReceives) > 0 { + props["deadLettersMaxReceives"] = fixture.deadLettersMaxReceives + } + + pubsubMetadata := pubsub.Metadata{Properties: props} + + err := snssqsClient.Init(pubsubMetadata) + assert.Nil(t, err) + + return snssqsClient, sess } -func snsSqsTest(t *testing.T) func(t *testing.T) { - fixture := getFixture() - sess := session.Must(session.NewSessionWithOptions(session.Options{ - SharedConfigState: session.SharedConfigEnable, - - })) +func getAccountId(sess *session.Session) (*sts.GetCallerIdentityOutput, error) { + svc := sts.New(sess) + input := &sts.GetCallerIdentityInput{} - t.Log("setup sub test") + result, err := svc.GetCallerIdentity(input) + if err != nil { + return nil, err + } - snssqsClient := NewSnsSqs(logger.NewLogger("test")) - err := snssqsClient.Init(pubsub.Metadata{ - Properties: map[string]string{ - "region": os.Getenv("AWS_DEFAULT_REGION"), - "accessKey": os.Getenv("AWS_ACCESS_KEY_ID"), - "secretKey": os.Getenv("AWS_SECRET_ACCESS_KEY"), - "endpoint": os.Getenv("AWS_ENDPOINT_URL"), - "consumerID": fixture.queueName, - }, + return result, err +} + +func getQueueUrl(sess *session.Session, queueName *string) (*sqs.GetQueueUrlOutput, error) { + // Get the account ID + accountResult, aErr := getAccountId(sess) + if aErr != nil { + return nil, aErr + } + + // Create an SQS service client + svc := sqs.New(sess) + result, err := svc.GetQueueUrl(&sqs.GetQueueUrlInput{ + QueueName: queueName, + QueueOwnerAWSAccountId: accountResult.Account, }) + if err != nil { + return nil, err + } + + return result, nil +} + +func teardownSns(t *testing.T, sess *session.Session, fixture *testFixture) { + snsSvc := sns.New(sess) + result, err := snsSvc.ListTopics(nil) + assert.Nil(t, err) + assert.NotNil(t, result) + + var accountId *sts.GetCallerIdentityOutput + accountId, err = getAccountId(sess) + assert.Nil(t, err) + assert.NotNil(t, accountId) - // subscriber listens to SQS queue - req := pubsub.SubscribeRequest{Topic: fixture.queueName} + lookupTopicArn := fmt.Sprintf("arn:aws:sns:%v:%v:%v", fixture.region, *accountId.Account, fixture.topicName) + for _, topic := range result.Topics { + if *topic.TopicArn == lookupTopicArn { + snsSvc.DeleteTopic(&sns.DeleteTopicInput{TopicArn: topic.TopicArn}) + } + } +} + +func snsSqsTest(t *testing.T, sess *session.Session, snssqsClient pubsub.PubSub, fixture *testFixture) func(t *testing.T) { + // subscriber registers to listen to (SNS) topic which eventually land on the fixture.queueName + // over (SQS) queue + req := pubsub.SubscribeRequest{Topic: fixture.topicName} msgs := make([]*pubsub.NewMessage, 1) handler := func(ctx context.Context, msg *pubsub.NewMessage) error { msgs = append(msgs, msg) @@ -93,92 +165,117 @@ func snsSqsTest(t *testing.T) func(t *testing.T) { return nil } - err = snssqsClient.Subscribe(req, handler) + err := snssqsClient.Subscribe(req, handler) assert.Nil(t, err) var queueURL *sqs.GetQueueUrlOutput - queueURL, err = getQueueURL(sess, &fixture.queueName) + queueURL, err = getQueueUrl(sess, &fixture.queueName) assert.Nil(t, err) assert.NotNil(t, queueURL) publishReq := &pubsub.PublishRequest{Topic: fixture.topicName, PubsubName: "test", Data: []byte("string")} err = snssqsClient.Publish(publishReq) assert.Nil(t, err) + assert.Len(t, msgs, 1) // tear down callback return func(t *testing.T) { - - svc := sqs.New(sess) - _, err := svc.DeleteQueue(&sqs.DeleteQueueInput{ + sqsSvc := sqs.New(sess) + _, err := sqsSvc.DeleteQueue(&sqs.DeleteQueueInput{ QueueUrl: queueURL.QueueUrl, }) + assert.Nil(t, err) + teardownSns(t, sess, fixture) + t.Log("teardown test") + } +} + +func snsSqsDeadlettersTest(t *testing.T, sess *session.Session, snssqsClient pubsub.PubSub, fixture *testFixture) func(t *testing.T) { + // subscriber's handlers always fails to process message forcing dead letters queue to + req := pubsub.SubscribeRequest{Topic: fixture.topicName} + handler := func(ctx context.Context, msg *pubsub.NewMessage) error { + return fmt.Errorf("failure to receive - dead letters tests") + } + + err := snssqsClient.Subscribe(req, handler) + assert.Nil(t, err) + + var queueURL *sqs.GetQueueUrlOutput + queueURL, err = getQueueUrl(sess, &fixture.queueName) + assert.Nil(t, err) + assert.NotNil(t, queueURL) + + publishReq := &pubsub.PublishRequest{Topic: fixture.topicName, PubsubName: "test", Data: []byte("string")} + err = snssqsClient.Publish(publishReq) + assert.Nil(t, err) + + // tear down callback + return func(t *testing.T) { + sqsSvc := sqs.New(sess) + dlQueueURL, err := getQueueUrl(sess, &fixture.deadLettersQueueName) assert.Nil(t, err) - snsSvc := sns.New(sess) - result, err := snsSvc.ListSubscriptions(nil) - if err != nil { - fmt.Println(err.Error()) + var output *sqs.ReceiveMessageOutput + output, err = sqsSvc.ReceiveMessage(&sqs.ReceiveMessageInput{QueueUrl: dlQueueURL.QueueUrl}) + assert.Nil(t, err) + assert.NotNil(t, output) - os.Exit(1) - } + _, err = sqsSvc.DeleteQueue(&sqs.DeleteQueueInput{ + QueueUrl: queueURL.QueueUrl, + }) + assert.Nil(t, err) - for _, s := range result.Subscriptions { - fmt.Println(*s.SubscriptionArn) - fmt.Println(" " + *s.TopicArn) - fmt.Println("") - } + _, err = sqsSvc.DeleteQueue(&sqs.DeleteQueueInput{ + QueueUrl: dlQueueURL.QueueUrl, + }) + assert.Nil(t, err) - t.Log("teardown sub test") + teardownSns(t, sess, fixture) + t.Log("teardown test") } } func TestSnsSqs(t *testing.T) { - cases := []struct { - name string - a int - b int - expected int - }{ - {"add", 2, 2, 4}, - {"minus", 0, -2, -2}, - {"zero", 0, 0, 0}, - } - - for _, tc := range cases { + fixtures := []testFixture{ + { + name: "with dead letters", + region: os.Getenv("AWS_DEFAULT_REGION"), + accessKey: os.Getenv("AWS_ACCESS_KEY_ID"), + secretKey: os.Getenv("AWS_SECRET_ACCESS_KEY"), + endpoint: os.Getenv("AWS_ENDPOINT_URL"), + profile: "minio", + topicName: "dapr-sns-test-topic", + deadLettersQueueName: "dapr-sqs-test-deadletters-queue", + deadLettersMaxReceives: "1", + queueName: "dapr-sqs-test-queue", + }, + { + name: "without dead letters", + region: os.Getenv("AWS_DEFAULT_REGION"), + accessKey: os.Getenv("AWS_ACCESS_KEY_ID"), + secretKey: os.Getenv("AWS_SECRET_ACCESS_KEY"), + endpoint: os.Getenv("AWS_ENDPOINT_URL"), + profile: "minio", + topicName: "dapr-sns-test-topic", + queueName: "dapr-sqs-test-queue", + }, + } + + for _, tc := range fixtures { t.Run(tc.name, func(t *testing.T) { - teardownSnsSqsTest := snsSqsTest(t) + client, sess := setupTest(t, &tc) + teardownSnsSqsTest := snsSqsTest(t, sess, client, &tc) defer teardownSnsSqsTest(t) - - // result := Sum(tc.a, tc.b) - // if result != tc.expected { - // t.Fatalf("expected sum %v, but got %v", tc.expected, result) - // } }) } -} - -func getQueueURL(sess *session.Session, queueName *string) (*sqs.GetQueueUrlOutput, error) { - // Create an SQS service client - svc := sqs.New(sess) - endpointAccountId := "000000000000" - result, err := svc.GetQueueUrl(&sqs.GetQueueUrlInput{ - QueueName: queueName, - QueueOwnerAWSAccountId: &endpointAccountId, - }) - if err != nil { - return nil, err + for _, tc := range fixtures { + t.Run(tc.name, func(t *testing.T) { + client, sess := setupTest(t, &tc) + teardownSnsSqsTest := snsSqsDeadlettersTest(t, sess, client, &tc) + defer teardownSnsSqsTest(t) + }) } - - return result, nil -} - -// TestIntegrationGetSecret requires AWS specific environments for authentication AWS_DEFAULT_REGION AWS_ACCESS_KEY_ID, -// AWS_SECRET_ACCESS_KkEY and AWS_SESSION_TOKEN -func TestIntegrationCreateAllSnsSqs(t *testing.T) { - - // assert.Nil(t, err) - // assert.NotNil(t, response) } From f36e268be50d620384117d0eb0e0b9b19217b845 Mon Sep 17 00:00:00 2001 From: Amit Mor Date: Sun, 8 Aug 2021 22:18:24 +0300 Subject: [PATCH 19/29] still buggy but wip! --- pubsub/aws/snssqs/snssqs_integ_test.go | 48 +++++++++++++++----------- 1 file changed, 28 insertions(+), 20 deletions(-) diff --git a/pubsub/aws/snssqs/snssqs_integ_test.go b/pubsub/aws/snssqs/snssqs_integ_test.go index 0179a59014..190de76737 100644 --- a/pubsub/aws/snssqs/snssqs_integ_test.go +++ b/pubsub/aws/snssqs/snssqs_integ_test.go @@ -135,9 +135,29 @@ func getQueueUrl(sess *session.Session, queueName *string) (*sqs.GetQueueUrlOutp return result, nil } +func teardownSqs(t *testing.T, sess *session.Session, fixture *testFixture) { + svc := sqs.New(sess) + + queueUrl, err := getQueueUrl(sess, &fixture.queueName) + _, err = svc.DeleteQueue(&sqs.DeleteQueueInput{ + QueueUrl: queueUrl.QueueUrl, + }) + assert.Nil(t, err) + + var dlQueueUrl *sqs.GetQueueUrlOutput + dlQueueUrl, err = getQueueUrl(sess, &fixture.deadLettersQueueName) + if err != nil { + return + } + + svc.DeleteQueue(&sqs.DeleteQueueInput{ + QueueUrl: dlQueueUrl.QueueUrl, + }) +} + func teardownSns(t *testing.T, sess *session.Session, fixture *testFixture) { - snsSvc := sns.New(sess) - result, err := snsSvc.ListTopics(nil) + svc := sns.New(sess) + result, err := svc.ListTopics(nil) assert.Nil(t, err) assert.NotNil(t, result) @@ -149,7 +169,8 @@ func teardownSns(t *testing.T, sess *session.Session, fixture *testFixture) { lookupTopicArn := fmt.Sprintf("arn:aws:sns:%v:%v:%v", fixture.region, *accountId.Account, fixture.topicName) for _, topic := range result.Topics { if *topic.TopicArn == lookupTopicArn { - snsSvc.DeleteTopic(&sns.DeleteTopicInput{TopicArn: topic.TopicArn}) + _, err = svc.DeleteTopic(&sns.DeleteTopicInput{TopicArn: topic.TopicArn}) + assert.Nil(t, err) } } } @@ -180,12 +201,7 @@ func snsSqsTest(t *testing.T, sess *session.Session, snssqsClient pubsub.PubSub, // tear down callback return func(t *testing.T) { - sqsSvc := sqs.New(sess) - _, err := sqsSvc.DeleteQueue(&sqs.DeleteQueueInput{ - QueueUrl: queueURL.QueueUrl, - }) - assert.Nil(t, err) - + teardownSqs(t, sess, fixture) teardownSns(t, sess, fixture) t.Log("teardown test") } @@ -219,18 +235,10 @@ func snsSqsDeadlettersTest(t *testing.T, sess *session.Session, snssqsClient pub var output *sqs.ReceiveMessageOutput output, err = sqsSvc.ReceiveMessage(&sqs.ReceiveMessageInput{QueueUrl: dlQueueURL.QueueUrl}) assert.Nil(t, err) - assert.NotNil(t, output) - - _, err = sqsSvc.DeleteQueue(&sqs.DeleteQueueInput{ - QueueUrl: queueURL.QueueUrl, - }) - assert.Nil(t, err) - - _, err = sqsSvc.DeleteQueue(&sqs.DeleteQueueInput{ - QueueUrl: dlQueueURL.QueueUrl, - }) - assert.Nil(t, err) + assert.NotNil(t, output.Messages) + assert.Len(t, output.Messages, 1) + teardownSqs(t, sess, fixture) teardownSns(t, sess, fixture) t.Log("teardown test") From eaf769e961b5ff719b549178953c9dbb8af402fc Mon Sep 17 00:00:00 2001 From: Amit Mor Date: Mon, 9 Aug 2021 00:14:02 +0300 Subject: [PATCH 20/29] bugfix in dlq creation --- pubsub/aws/snssqs/snssqs.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/pubsub/aws/snssqs/snssqs.go b/pubsub/aws/snssqs/snssqs.go index 6b1fb637af..b5cd2f2cc5 100644 --- a/pubsub/aws/snssqs/snssqs.go +++ b/pubsub/aws/snssqs/snssqs.go @@ -567,7 +567,13 @@ func (s *snsSqs) Subscribe(req pubsub.SubscribeRequest, handler pubsub.Handler) s.updateQueueAttributesWithDeadLetters(queueInfo, deadLettersQueueInfo, sqsSetQueueAttributesInput) } - s.sqsClient.SetQueueAttributesRequest(sqsSetQueueAttributesInput) + // apply the dead letters queue attributes to the current queue + _, aerr := s.sqsClient.SetQueueAttributes(sqsSetQueueAttributesInput) + if aerr != nil { + s.logger.Errorf("error updating queue attributes with dead-letter queue: %v", aerr) + + return aerr + } // subscription creation is idempotent. Subscriptions are unique by topic/queue subscribeOutput, err := s.snsClient.Subscribe(&sns.SubscribeInput{ Attributes: nil, From e88ae910288233a05f21619df7030f4b2039e44c Mon Sep 17 00:00:00 2001 From: Amit Mor Date: Mon, 9 Aug 2021 00:51:28 +0300 Subject: [PATCH 21/29] working. still bug in subscription clean up --- pubsub/aws/snssqs/snssqs.go | 34 ++++++++++++-------- pubsub/aws/snssqs/snssqs_integ_test.go | 43 ++++++++++++++++---------- 2 files changed, 48 insertions(+), 29 deletions(-) diff --git a/pubsub/aws/snssqs/snssqs.go b/pubsub/aws/snssqs/snssqs.go index b5cd2f2cc5..3b2ee4f03c 100644 --- a/pubsub/aws/snssqs/snssqs.go +++ b/pubsub/aws/snssqs/snssqs.go @@ -511,7 +511,7 @@ func (s *snsSqs) createDeadLettersQueue() (*sqsQueueInfo, error) { return deadLettersQueueInfo, nil } -func (s *snsSqs) updateQueueAttributesWithDeadLetters(queueInfo, deadLettersQueueInfo *sqsQueueInfo, sqsSetQueueAttributesInput *sqs.SetQueueAttributesInput) error { +func (s *snsSqs) createQueueAttributesWithDeadLetters(queueInfo, deadLettersQueueInfo *sqsQueueInfo) (*sqs.SetQueueAttributesInput, error) { policy := map[string]string{ "deadLetterTargetArn": deadLettersQueueInfo.arn, "maxReceiveCount": strconv.FormatInt(s.metadata.deadLettersMaxReceives, 10), @@ -521,15 +521,17 @@ func (s *snsSqs) updateQueueAttributesWithDeadLetters(queueInfo, deadLettersQueu if err != nil { s.logger.Errorf("error marshalling dead-letters queue policy: %v", err) - return err + return nil, err } - sqsSetQueueAttributesInput.QueueUrl = &queueInfo.url - sqsSetQueueAttributesInput.Attributes = map[string]*string{ - sqs.QueueAttributeNameRedrivePolicy: aws.String(string(b)), + sqsSetQueueAttributesInput := &sqs.SetQueueAttributesInput{ + QueueUrl: &queueInfo.url, + Attributes: map[string]*string{ + sqs.QueueAttributeNameRedrivePolicy: aws.String(string(b)), + }, } - return nil + return sqsSetQueueAttributesInput, nil } func (s *snsSqs) Subscribe(req pubsub.SubscribeRequest, handler pubsub.Handler) error { @@ -553,7 +555,6 @@ func (s *snsSqs) Subscribe(req pubsub.SubscribeRequest, handler pubsub.Handler) } var deadLettersQueueInfo *sqsQueueInfo = nil - sqsSetQueueAttributesInput := &sqs.SetQueueAttributesInput{} if len(s.metadata.sqsDeadLettersQueueName) > 0 { var err error @@ -564,16 +565,23 @@ func (s *snsSqs) Subscribe(req pubsub.SubscribeRequest, handler pubsub.Handler) return err } - s.updateQueueAttributesWithDeadLetters(queueInfo, deadLettersQueueInfo, sqsSetQueueAttributesInput) + var sqsSetQueueAttributesInput *sqs.SetQueueAttributesInput + sqsSetQueueAttributesInput, err = s.createQueueAttributesWithDeadLetters(queueInfo, deadLettersQueueInfo) + if err != nil { + s.logger.Errorf("error creatubg queue attributes for dead-letter queue: %v", err) + + return err + } + _, err = s.sqsClient.SetQueueAttributes(sqsSetQueueAttributesInput) + if err != nil { + s.logger.Errorf("error updating queue attributes with dead-letter queue: %v", err) + + return err + } } // apply the dead letters queue attributes to the current queue - _, aerr := s.sqsClient.SetQueueAttributes(sqsSetQueueAttributesInput) - if aerr != nil { - s.logger.Errorf("error updating queue attributes with dead-letter queue: %v", aerr) - return aerr - } // subscription creation is idempotent. Subscriptions are unique by topic/queue subscribeOutput, err := s.snsClient.Subscribe(&sns.SubscribeInput{ Attributes: nil, diff --git a/pubsub/aws/snssqs/snssqs_integ_test.go b/pubsub/aws/snssqs/snssqs_integ_test.go index 190de76737..1c80876ad6 100644 --- a/pubsub/aws/snssqs/snssqs_integ_test.go +++ b/pubsub/aws/snssqs/snssqs_integ_test.go @@ -4,7 +4,9 @@ import ( "context" "fmt" "os" + "strconv" "testing" + "time" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/credentials" @@ -147,7 +149,7 @@ func teardownSqs(t *testing.T, sess *session.Session, fixture *testFixture) { var dlQueueUrl *sqs.GetQueueUrlOutput dlQueueUrl, err = getQueueUrl(sess, &fixture.deadLettersQueueName) if err != nil { - return + return } svc.DeleteQueue(&sqs.DeleteQueueInput{ @@ -203,6 +205,7 @@ func snsSqsTest(t *testing.T, sess *session.Session, snssqsClient pubsub.PubSub, return func(t *testing.T) { teardownSqs(t, sess, fixture) teardownSns(t, sess, fixture) + t.Log("teardown test") } } @@ -232,8 +235,9 @@ func snsSqsDeadlettersTest(t *testing.T, sess *session.Session, snssqsClient pub dlQueueURL, err := getQueueUrl(sess, &fixture.deadLettersQueueName) assert.Nil(t, err) + waitTimeSeconds := int64(10) var output *sqs.ReceiveMessageOutput - output, err = sqsSvc.ReceiveMessage(&sqs.ReceiveMessageInput{QueueUrl: dlQueueURL.QueueUrl}) + output, err = sqsSvc.ReceiveMessage(&sqs.ReceiveMessageInput{QueueUrl: dlQueueURL.QueueUrl, WaitTimeSeconds: &waitTimeSeconds}) assert.Nil(t, err) assert.NotNil(t, output.Messages) assert.Len(t, output.Messages, 1) @@ -246,19 +250,9 @@ func snsSqsDeadlettersTest(t *testing.T, sess *session.Session, snssqsClient pub } func TestSnsSqs(t *testing.T) { + timestamp := strconv.FormatInt(time.Now().UTC().UnixNano(), 10) fixtures := []testFixture{ - { - name: "with dead letters", - region: os.Getenv("AWS_DEFAULT_REGION"), - accessKey: os.Getenv("AWS_ACCESS_KEY_ID"), - secretKey: os.Getenv("AWS_SECRET_ACCESS_KEY"), - endpoint: os.Getenv("AWS_ENDPOINT_URL"), - profile: "minio", - topicName: "dapr-sns-test-topic", - deadLettersQueueName: "dapr-sqs-test-deadletters-queue", - deadLettersMaxReceives: "1", - queueName: "dapr-sqs-test-queue", - }, + { name: "without dead letters", region: os.Getenv("AWS_DEFAULT_REGION"), @@ -266,8 +260,8 @@ func TestSnsSqs(t *testing.T) { secretKey: os.Getenv("AWS_SECRET_ACCESS_KEY"), endpoint: os.Getenv("AWS_ENDPOINT_URL"), profile: "minio", - topicName: "dapr-sns-test-topic", - queueName: "dapr-sqs-test-queue", + topicName: fmt.Sprintf("dapr-sns-test-topic-%v", timestamp), + queueName: fmt.Sprintf("dapr-sqs-test-queue-%v", timestamp), }, } @@ -279,6 +273,20 @@ func TestSnsSqs(t *testing.T) { }) } + timestamp = strconv.FormatInt(time.Now().UTC().UnixNano(), 10) + fixtures = []testFixture{{ + name: "with dead letters", + region: os.Getenv("AWS_DEFAULT_REGION"), + accessKey: os.Getenv("AWS_ACCESS_KEY_ID"), + secretKey: os.Getenv("AWS_SECRET_ACCESS_KEY"), + endpoint: os.Getenv("AWS_ENDPOINT_URL"), + profile: "minio", + topicName: fmt.Sprintf("dapr-sns-test-topic-%v", timestamp), + deadLettersQueueName: fmt.Sprintf("dapr-sqs-test-deadletters-queue-%v", timestamp), + queueName: fmt.Sprintf("dapr-sqs-test-queue-%v", timestamp), + deadLettersMaxReceives: "1", + }} + for _, tc := range fixtures { t.Run(tc.name, func(t *testing.T) { client, sess := setupTest(t, &tc) @@ -287,3 +295,6 @@ func TestSnsSqs(t *testing.T) { }) } } + +// TODO split the above to 2 tests +// TODO delete subscription not working From 06c2ff2d16e0af8515fd4a44bb805097eff5c645 Mon Sep 17 00:00:00 2001 From: Amit Mor Date: Mon, 9 Aug 2021 20:36:11 +0300 Subject: [PATCH 22/29] Update snssqs_integ_test.go --- pubsub/aws/snssqs/snssqs_integ_test.go | 71 ++++++++++++-------------- 1 file changed, 34 insertions(+), 37 deletions(-) diff --git a/pubsub/aws/snssqs/snssqs_integ_test.go b/pubsub/aws/snssqs/snssqs_integ_test.go index 1c80876ad6..f470cd3960 100644 --- a/pubsub/aws/snssqs/snssqs_integ_test.go +++ b/pubsub/aws/snssqs/snssqs_integ_test.go @@ -32,25 +32,6 @@ type testFixture struct { secretKey string } -func TestMain(m *testing.M) { - code := m.Run() - os.Exit(code) -} - -func getFixture() *testFixture { - return &testFixture{ - region: os.Getenv("AWS_DEFAULT_REGION"), - accessKey: os.Getenv("AWS_ACCESS_KEY_ID"), - secretKey: os.Getenv("AWS_SECRET_ACCESS_KEY"), - endpoint: os.Getenv("AWS_ENDPOINT_URL"), - profile: "minio", - topicName: "dapr-sns-test-topic", - deadLettersQueueName: "dapr-sqs-test-deadletters-queue", - deadLettersMaxReceives: "9", - queueName: "dapr-sqs-test-queue", - } -} - func newAWSSession(cfg *testFixture) *session.Session { // run localstack and use the endpoint url: http://localhost:4566 by using the following cmd // SERVICES=sns,sqs,sts DEBUG=1 localstack start @@ -141,6 +122,9 @@ func teardownSqs(t *testing.T, sess *session.Session, fixture *testFixture) { svc := sqs.New(sess) queueUrl, err := getQueueUrl(sess, &fixture.queueName) + assert.Nil(t, err) + assert.NotNil(t, queueUrl) + _, err = svc.DeleteQueue(&sqs.DeleteQueueInput{ QueueUrl: queueUrl.QueueUrl, }) @@ -148,6 +132,8 @@ func teardownSqs(t *testing.T, sess *session.Session, fixture *testFixture) { var dlQueueUrl *sqs.GetQueueUrlOutput dlQueueUrl, err = getQueueUrl(sess, &fixture.deadLettersQueueName) + // err would exist if no dead-letter queue exist, which might be the case + // in some tests if err != nil { return } @@ -171,6 +157,10 @@ func teardownSns(t *testing.T, sess *session.Session, fixture *testFixture) { lookupTopicArn := fmt.Sprintf("arn:aws:sns:%v:%v:%v", fixture.region, *accountId.Account, fixture.topicName) for _, topic := range result.Topics { if *topic.TopicArn == lookupTopicArn { + // deletes topic + // currently there is a bug in aws-go-sdk that results in the subscription not being + // deleted along with the topic (as should be) so the subscription needs + // to be manually deleted _, err = svc.DeleteTopic(&sns.DeleteTopicInput{TopicArn: topic.TopicArn}) assert.Nil(t, err) } @@ -249,20 +239,25 @@ func snsSqsDeadlettersTest(t *testing.T, sess *session.Session, snssqsClient pub } } +func TestMain(m *testing.M) { + code := m.Run() + os.Exit(code) +} + func TestSnsSqs(t *testing.T) { timestamp := strconv.FormatInt(time.Now().UTC().UnixNano(), 10) fixtures := []testFixture{ - { name: "without dead letters", region: os.Getenv("AWS_DEFAULT_REGION"), accessKey: os.Getenv("AWS_ACCESS_KEY_ID"), secretKey: os.Getenv("AWS_SECRET_ACCESS_KEY"), endpoint: os.Getenv("AWS_ENDPOINT_URL"), - profile: "minio", + profile: os.Getenv("AWS_PROFILE"), topicName: fmt.Sprintf("dapr-sns-test-topic-%v", timestamp), queueName: fmt.Sprintf("dapr-sqs-test-queue-%v", timestamp), }, + // expand to other fixtures if needed } for _, tc := range fixtures { @@ -272,20 +267,25 @@ func TestSnsSqs(t *testing.T) { defer teardownSnsSqsTest(t) }) } +} - timestamp = strconv.FormatInt(time.Now().UTC().UnixNano(), 10) - fixtures = []testFixture{{ - name: "with dead letters", - region: os.Getenv("AWS_DEFAULT_REGION"), - accessKey: os.Getenv("AWS_ACCESS_KEY_ID"), - secretKey: os.Getenv("AWS_SECRET_ACCESS_KEY"), - endpoint: os.Getenv("AWS_ENDPOINT_URL"), - profile: "minio", - topicName: fmt.Sprintf("dapr-sns-test-topic-%v", timestamp), - deadLettersQueueName: fmt.Sprintf("dapr-sqs-test-deadletters-queue-%v", timestamp), - queueName: fmt.Sprintf("dapr-sqs-test-queue-%v", timestamp), - deadLettersMaxReceives: "1", - }} +func TestSnsSqsWithDLQ(t *testing.T) { + timestamp := strconv.FormatInt(time.Now().UTC().UnixNano(), 10) + fixtures := []testFixture{ + { + name: "with dead letters", + region: os.Getenv("AWS_DEFAULT_REGION"), + accessKey: os.Getenv("AWS_ACCESS_KEY_ID"), + secretKey: os.Getenv("AWS_SECRET_ACCESS_KEY"), + endpoint: os.Getenv("AWS_ENDPOINT_URL"), + profile: os.Getenv("AWS_PROFILE"), + topicName: fmt.Sprintf("dapr-sns-test-topic-%v", timestamp), + deadLettersQueueName: fmt.Sprintf("dapr-sqs-test-deadletters-queue-%v", timestamp), + queueName: fmt.Sprintf("dapr-sqs-test-queue-%v", timestamp), + deadLettersMaxReceives: "1", + }, + // expand to other fixtures if needed + } for _, tc := range fixtures { t.Run(tc.name, func(t *testing.T) { @@ -295,6 +295,3 @@ func TestSnsSqs(t *testing.T) { }) } } - -// TODO split the above to 2 tests -// TODO delete subscription not working From 207c2034a7df02fb7048adac29d2a9b330be4c39 Mon Sep 17 00:00:00 2001 From: Amit Mor Date: Mon, 9 Aug 2021 22:03:29 +0300 Subject: [PATCH 23/29] golangci-lint fixes --- pubsub/aws/snssqs/snssqs.go | 3 +- pubsub/aws/snssqs/snssqs_integ_test.go | 44 +++++++++++++------------- 2 files changed, 24 insertions(+), 23 deletions(-) diff --git a/pubsub/aws/snssqs/snssqs.go b/pubsub/aws/snssqs/snssqs.go index 3b2ee4f03c..11e0ebb180 100644 --- a/pubsub/aws/snssqs/snssqs.go +++ b/pubsub/aws/snssqs/snssqs.go @@ -547,7 +547,8 @@ func (s *snsSqs) Subscribe(req pubsub.SubscribeRequest, handler pubsub.Handler) } // this is the ID of the application, it is supplied via runtime as "consumerID" - queueInfo, err := s.getOrCreateQueue(s.metadata.sqsQueueName) + var queueInfo *sqsQueueInfo + queueInfo, err = s.getOrCreateQueue(s.metadata.sqsQueueName) if err != nil { s.logger.Errorf("error retrieving SQS queue: %v", err) diff --git a/pubsub/aws/snssqs/snssqs_integ_test.go b/pubsub/aws/snssqs/snssqs_integ_test.go index f470cd3960..86acef6ef9 100644 --- a/pubsub/aws/snssqs/snssqs_integ_test.go +++ b/pubsub/aws/snssqs/snssqs_integ_test.go @@ -86,7 +86,7 @@ func setupTest(t *testing.T, fixture *testFixture) (pubsub.PubSub, *session.Sess return snssqsClient, sess } -func getAccountId(sess *session.Session) (*sts.GetCallerIdentityOutput, error) { +func getAccountID(sess *session.Session) (*sts.GetCallerIdentityOutput, error) { svc := sts.New(sess) input := &sts.GetCallerIdentityInput{} @@ -98,9 +98,9 @@ func getAccountId(sess *session.Session) (*sts.GetCallerIdentityOutput, error) { return result, err } -func getQueueUrl(sess *session.Session, queueName *string) (*sqs.GetQueueUrlOutput, error) { +func getQueueURL(sess *session.Session, queueName *string) (*sqs.GetQueueUrlOutput, error) { // Get the account ID - accountResult, aErr := getAccountId(sess) + accountResult, aErr := getAccountID(sess) if aErr != nil { return nil, aErr } @@ -121,17 +121,17 @@ func getQueueUrl(sess *session.Session, queueName *string) (*sqs.GetQueueUrlOutp func teardownSqs(t *testing.T, sess *session.Session, fixture *testFixture) { svc := sqs.New(sess) - queueUrl, err := getQueueUrl(sess, &fixture.queueName) + queueURL, err := getQueueURL(sess, &fixture.queueName) assert.Nil(t, err) - assert.NotNil(t, queueUrl) + assert.NotNil(t, queueURL) _, err = svc.DeleteQueue(&sqs.DeleteQueueInput{ - QueueUrl: queueUrl.QueueUrl, + QueueUrl: queueURL.QueueUrl, }) assert.Nil(t, err) - var dlQueueUrl *sqs.GetQueueUrlOutput - dlQueueUrl, err = getQueueUrl(sess, &fixture.deadLettersQueueName) + var dlQueueURL *sqs.GetQueueUrlOutput + dlQueueURL, err = getQueueURL(sess, &fixture.deadLettersQueueName) // err would exist if no dead-letter queue exist, which might be the case // in some tests if err != nil { @@ -139,7 +139,7 @@ func teardownSqs(t *testing.T, sess *session.Session, fixture *testFixture) { } svc.DeleteQueue(&sqs.DeleteQueueInput{ - QueueUrl: dlQueueUrl.QueueUrl, + QueueUrl: dlQueueURL.QueueUrl, }) } @@ -149,12 +149,12 @@ func teardownSns(t *testing.T, sess *session.Session, fixture *testFixture) { assert.Nil(t, err) assert.NotNil(t, result) - var accountId *sts.GetCallerIdentityOutput - accountId, err = getAccountId(sess) + var accountID *sts.GetCallerIdentityOutput + accountID, err = getAccountID(sess) assert.Nil(t, err) - assert.NotNil(t, accountId) + assert.NotNil(t, accountID) - lookupTopicArn := fmt.Sprintf("arn:aws:sns:%v:%v:%v", fixture.region, *accountId.Account, fixture.topicName) + lookupTopicArn := fmt.Sprintf("arn:aws:sns:%v:%v:%v", fixture.region, *accountID.Account, fixture.topicName) for _, topic := range result.Topics { if *topic.TopicArn == lookupTopicArn { // deletes topic @@ -182,7 +182,7 @@ func snsSqsTest(t *testing.T, sess *session.Session, snssqsClient pubsub.PubSub, assert.Nil(t, err) var queueURL *sqs.GetQueueUrlOutput - queueURL, err = getQueueUrl(sess, &fixture.queueName) + queueURL, err = getQueueURL(sess, &fixture.queueName) assert.Nil(t, err) assert.NotNil(t, queueURL) @@ -211,7 +211,7 @@ func snsSqsDeadlettersTest(t *testing.T, sess *session.Session, snssqsClient pub assert.Nil(t, err) var queueURL *sqs.GetQueueUrlOutput - queueURL, err = getQueueUrl(sess, &fixture.queueName) + queueURL, err = getQueueURL(sess, &fixture.queueName) assert.Nil(t, err) assert.NotNil(t, queueURL) @@ -222,7 +222,7 @@ func snsSqsDeadlettersTest(t *testing.T, sess *session.Session, snssqsClient pub // tear down callback return func(t *testing.T) { sqsSvc := sqs.New(sess) - dlQueueURL, err := getQueueUrl(sess, &fixture.deadLettersQueueName) + dlQueueURL, err := getQueueURL(sess, &fixture.deadLettersQueueName) assert.Nil(t, err) waitTimeSeconds := int64(10) @@ -246,7 +246,7 @@ func TestMain(m *testing.M) { func TestSnsSqs(t *testing.T) { timestamp := strconv.FormatInt(time.Now().UTC().UnixNano(), 10) - fixtures := []testFixture{ + fixtures := []*testFixture{ { name: "without dead letters", region: os.Getenv("AWS_DEFAULT_REGION"), @@ -262,8 +262,8 @@ func TestSnsSqs(t *testing.T) { for _, tc := range fixtures { t.Run(tc.name, func(t *testing.T) { - client, sess := setupTest(t, &tc) - teardownSnsSqsTest := snsSqsTest(t, sess, client, &tc) + client, sess := setupTest(t, tc) + teardownSnsSqsTest := snsSqsTest(t, sess, client, tc) defer teardownSnsSqsTest(t) }) } @@ -271,7 +271,7 @@ func TestSnsSqs(t *testing.T) { func TestSnsSqsWithDLQ(t *testing.T) { timestamp := strconv.FormatInt(time.Now().UTC().UnixNano(), 10) - fixtures := []testFixture{ + fixtures := []*testFixture{ { name: "with dead letters", region: os.Getenv("AWS_DEFAULT_REGION"), @@ -289,8 +289,8 @@ func TestSnsSqsWithDLQ(t *testing.T) { for _, tc := range fixtures { t.Run(tc.name, func(t *testing.T) { - client, sess := setupTest(t, &tc) - teardownSnsSqsTest := snsSqsDeadlettersTest(t, sess, client, &tc) + client, sess := setupTest(t, tc) + teardownSnsSqsTest := snsSqsDeadlettersTest(t, sess, client, tc) defer teardownSnsSqsTest(t) }) } From 2b29a40311a1b7975073793889bbf8c3592fe5ee Mon Sep 17 00:00:00 2001 From: Amit Mor Date: Tue, 10 Aug 2021 14:58:21 +0300 Subject: [PATCH 24/29] golangci-lint refactoring --- pubsub/aws/snssqs/snssqs.go | 35 +++++++++++++------------- pubsub/aws/snssqs/snssqs_integ_test.go | 15 +++++------ 2 files changed, 25 insertions(+), 25 deletions(-) diff --git a/pubsub/aws/snssqs/snssqs.go b/pubsub/aws/snssqs/snssqs.go index 11e0ebb180..0174353030 100644 --- a/pubsub/aws/snssqs/snssqs.go +++ b/pubsub/aws/snssqs/snssqs.go @@ -91,7 +91,7 @@ func getAliasedProperty(aliases []string, metadata pubsub.Metadata) (string, boo func parseInt64(input string, propertyName string) (int64, error) { number, err := strconv.Atoi(input) if err != nil { - return -1, fmt.Errorf("parsing %s failed with: %v", propertyName, err) + return -1, fmt.Errorf("parsing %s failed with: %w", propertyName, err) } return int64(number), nil @@ -444,7 +444,7 @@ func (s *snsSqs) handleMessage(message *sqs.Message, queueInfo, deadLettersQueue err = json.Unmarshal([]byte(*(message.Body)), &messageBody) if err != nil { - return fmt.Errorf("error unmarshalling message: %v", err) + return fmt.Errorf("error unmarshalling message: %w", err) } topic := parseTopicArn(messageBody.TopicArn) @@ -455,7 +455,7 @@ func (s *snsSqs) handleMessage(message *sqs.Message, queueInfo, deadLettersQueue }) if err != nil { - return fmt.Errorf("error handling message: %v", err) + return fmt.Errorf("error handling message: %w", err) } // otherwise, there was no error, acknowledge the message @@ -555,29 +555,28 @@ func (s *snsSqs) Subscribe(req pubsub.SubscribeRequest, handler pubsub.Handler) return err } - var deadLettersQueueInfo *sqsQueueInfo = nil - + var deadLettersQueueInfo *sqsQueueInfo if len(s.metadata.sqsDeadLettersQueueName) > 0 { - var err error - deadLettersQueueInfo, err = s.createDeadLettersQueue() - if err != nil { - s.logger.Errorf("error creating dead-letter queue: %v", err) + var derr error + deadLettersQueueInfo, derr = s.createDeadLettersQueue() + if derr != nil { + s.logger.Errorf("error creating dead-letter queue: %v", derr) - return err + return derr } var sqsSetQueueAttributesInput *sqs.SetQueueAttributesInput - sqsSetQueueAttributesInput, err = s.createQueueAttributesWithDeadLetters(queueInfo, deadLettersQueueInfo) - if err != nil { - s.logger.Errorf("error creatubg queue attributes for dead-letter queue: %v", err) + sqsSetQueueAttributesInput, derr = s.createQueueAttributesWithDeadLetters(queueInfo, deadLettersQueueInfo) + if derr != nil { + s.logger.Errorf("error creatubg queue attributes for dead-letter queue: %v", derr) - return err + return derr } - _, err = s.sqsClient.SetQueueAttributes(sqsSetQueueAttributesInput) - if err != nil { - s.logger.Errorf("error updating queue attributes with dead-letter queue: %v", err) + _, derr = s.sqsClient.SetQueueAttributes(sqsSetQueueAttributesInput) + if derr != nil { + s.logger.Errorf("error updating queue attributes with dead-letter queue: %v", derr) - return err + return derr } } diff --git a/pubsub/aws/snssqs/snssqs_integ_test.go b/pubsub/aws/snssqs/snssqs_integ_test.go index 86acef6ef9..ae7748eab4 100644 --- a/pubsub/aws/snssqs/snssqs_integ_test.go +++ b/pubsub/aws/snssqs/snssqs_integ_test.go @@ -9,6 +9,7 @@ import ( "time" "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/client" "github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/sns" @@ -86,7 +87,7 @@ func setupTest(t *testing.T, fixture *testFixture) (pubsub.PubSub, *session.Sess return snssqsClient, sess } -func getAccountID(sess *session.Session) (*sts.GetCallerIdentityOutput, error) { +func getAccountID(sess client.ConfigProvider) (*sts.GetCallerIdentityOutput, error) { svc := sts.New(sess) input := &sts.GetCallerIdentityInput{} @@ -98,7 +99,7 @@ func getAccountID(sess *session.Session) (*sts.GetCallerIdentityOutput, error) { return result, err } -func getQueueURL(sess *session.Session, queueName *string) (*sqs.GetQueueUrlOutput, error) { +func getQueueURL(sess client.ConfigProvider, queueName *string) (*sqs.GetQueueUrlOutput, error) { // Get the account ID accountResult, aErr := getAccountID(sess) if aErr != nil { @@ -118,7 +119,7 @@ func getQueueURL(sess *session.Session, queueName *string) (*sqs.GetQueueUrlOutp return result, nil } -func teardownSqs(t *testing.T, sess *session.Session, fixture *testFixture) { +func teardownSqs(t *testing.T, sess client.ConfigProvider, fixture *testFixture) { svc := sqs.New(sess) queueURL, err := getQueueURL(sess, &fixture.queueName) @@ -143,7 +144,7 @@ func teardownSqs(t *testing.T, sess *session.Session, fixture *testFixture) { }) } -func teardownSns(t *testing.T, sess *session.Session, fixture *testFixture) { +func teardownSns(t *testing.T, sess client.ConfigProvider, fixture *testFixture) { svc := sns.New(sess) result, err := svc.ListTopics(nil) assert.Nil(t, err) @@ -167,11 +168,11 @@ func teardownSns(t *testing.T, sess *session.Session, fixture *testFixture) { } } -func snsSqsTest(t *testing.T, sess *session.Session, snssqsClient pubsub.PubSub, fixture *testFixture) func(t *testing.T) { +func snsSqsTest(t *testing.T, sess client.ConfigProvider, snssqsClient pubsub.PubSub, fixture *testFixture) func(t *testing.T) { // subscriber registers to listen to (SNS) topic which eventually land on the fixture.queueName // over (SQS) queue req := pubsub.SubscribeRequest{Topic: fixture.topicName} - msgs := make([]*pubsub.NewMessage, 1) + msgs := []*pubsub.NewMessage{} handler := func(ctx context.Context, msg *pubsub.NewMessage) error { msgs = append(msgs, msg) @@ -200,7 +201,7 @@ func snsSqsTest(t *testing.T, sess *session.Session, snssqsClient pubsub.PubSub, } } -func snsSqsDeadlettersTest(t *testing.T, sess *session.Session, snssqsClient pubsub.PubSub, fixture *testFixture) func(t *testing.T) { +func snsSqsDeadlettersTest(t *testing.T, sess client.ConfigProvider, snssqsClient pubsub.PubSub, fixture *testFixture) func(t *testing.T) { // subscriber's handlers always fails to process message forcing dead letters queue to req := pubsub.SubscribeRequest{Topic: fixture.topicName} handler := func(ctx context.Context, msg *pubsub.NewMessage) error { From 98222d9ce115596949075b40375c9e0910cbf271 Mon Sep 17 00:00:00 2001 From: Amit Mor Date: Tue, 10 Aug 2021 15:13:21 +0300 Subject: [PATCH 25/29] trying to skip running integrations for snssqs --- pubsub/aws/snssqs/snssqs_integ_test.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pubsub/aws/snssqs/snssqs_integ_test.go b/pubsub/aws/snssqs/snssqs_integ_test.go index ae7748eab4..05a3b565ac 100644 --- a/pubsub/aws/snssqs/snssqs_integ_test.go +++ b/pubsub/aws/snssqs/snssqs_integ_test.go @@ -1,3 +1,9 @@ +// ------------------------------------------------------------ +// Copyright (c) Microsoft Corporation and Dapr Contributors. +// Licensed under the MIT License. +// ------------------------------------------------------------ +// +build integration + package snssqs import ( From a4b5c2696cc008050d2312460b165d12f3425756 Mon Sep 17 00:00:00 2001 From: Amit Mor Date: Tue, 10 Aug 2021 16:51:30 +0300 Subject: [PATCH 26/29] testing * skip integration test if no AWS related envvars are set (skip in CI) * parallel testing in unittests --- pubsub/aws/snssqs/snssqs.go | 2 +- pubsub/aws/snssqs/snssqs_integ_test.go | 102 +++++++++++++------------ pubsub/aws/snssqs/snssqs_test.go | 11 +++ 3 files changed, 66 insertions(+), 49 deletions(-) diff --git a/pubsub/aws/snssqs/snssqs.go b/pubsub/aws/snssqs/snssqs.go index 0174353030..5b39c2efae 100644 --- a/pubsub/aws/snssqs/snssqs.go +++ b/pubsub/aws/snssqs/snssqs.go @@ -426,7 +426,7 @@ func (s *snsSqs) handleMessage(message *sqs.Message, queueInfo, deadLettersQueue // if we are over the allowable retry limit, and there is no dead-letters queue, delete the message from the queue. if deadLettersQueueInfo == nil && recvCountInt >= s.metadata.messageRetryLimit { if innerErr := s.acknowledgeMessage(queueInfo.url, message.ReceiptHandle); innerErr != nil { - return fmt.Errorf("error acknowledging message after receiving the message too many times: %v", innerErr) + return fmt.Errorf("error acknowledging message after receiving the message too many times: %w", innerErr) } return fmt.Errorf( diff --git a/pubsub/aws/snssqs/snssqs_integ_test.go b/pubsub/aws/snssqs/snssqs_integ_test.go index 05a3b565ac..159d27944d 100644 --- a/pubsub/aws/snssqs/snssqs_integ_test.go +++ b/pubsub/aws/snssqs/snssqs_integ_test.go @@ -37,6 +37,28 @@ type testFixture struct { queueName string accessKey string secretKey string + sessionToken string +} + +func getDefaultTestFixture(withDLQ bool, dlqMaxReceives ...string) *testFixture { + timestamp := strconv.FormatInt(time.Now().UTC().UnixNano(), 10) + fixture := &testFixture{ + region: os.Getenv("AWS_DEFAULT_REGION"), + accessKey: os.Getenv("AWS_ACCESS_KEY_ID"), + secretKey: os.Getenv("AWS_SECRET_ACCESS_KEY"), + endpoint: os.Getenv("AWS_ENDPOINT_URL"), + profile: os.Getenv("AWS_PROFILE"), + sessionToken: os.Getenv("AWS_SESSION_TOKEN"), + topicName: fmt.Sprintf("dapr-sns-test-topic-%v", timestamp), + queueName: fmt.Sprintf("dapr-sqs-test-queue-%v", timestamp), + } + if withDLQ { + fixture.deadLettersQueueName = fmt.Sprintf("dapr-sqs-test-deadletters-queue-%v", timestamp) + } + if len(dlqMaxReceives) > 0 { + fixture.deadLettersMaxReceives = dlqMaxReceives[0] + } + return fixture } func newAWSSession(cfg *testFixture) *session.Session { @@ -71,11 +93,12 @@ func setupTest(t *testing.T, fixture *testFixture) (pubsub.PubSub, *session.Sess assert.NotNil(t, snssqsClient) props := map[string]string{ - "region": fixture.region, - "accessKey": fixture.accessKey, - "secretKey": fixture.secretKey, - "endpoint": fixture.endpoint, - "consumerID": fixture.queueName, + "region": fixture.region, + "accessKey": fixture.accessKey, + "secretKey": fixture.secretKey, + "endpoint": fixture.endpoint, + "sessionToken": fixture.sessionToken, + "consumerID": fixture.queueName, } if len(fixture.deadLettersQueueName) > 0 { @@ -196,6 +219,8 @@ func snsSqsTest(t *testing.T, sess client.ConfigProvider, snssqsClient pubsub.Pu publishReq := &pubsub.PublishRequest{Topic: fixture.topicName, PubsubName: "test", Data: []byte("string")} err = snssqsClient.Publish(publishReq) assert.Nil(t, err) + // delay between send/recv in sqs + time.Sleep(5 * time.Second) assert.Len(t, msgs, 1) // tear down callback @@ -252,53 +277,34 @@ func TestMain(m *testing.M) { } func TestSnsSqs(t *testing.T) { - timestamp := strconv.FormatInt(time.Now().UTC().UnixNano(), 10) - fixtures := []*testFixture{ - { - name: "without dead letters", - region: os.Getenv("AWS_DEFAULT_REGION"), - accessKey: os.Getenv("AWS_ACCESS_KEY_ID"), - secretKey: os.Getenv("AWS_SECRET_ACCESS_KEY"), - endpoint: os.Getenv("AWS_ENDPOINT_URL"), - profile: os.Getenv("AWS_PROFILE"), - topicName: fmt.Sprintf("dapr-sns-test-topic-%v", timestamp), - queueName: fmt.Sprintf("dapr-sqs-test-queue-%v", timestamp), - }, - // expand to other fixtures if needed + t.Parallel() + + fixture := getDefaultTestFixture(false) + if (len(fixture.accessKey) == 0 && len(fixture.secretKey) == 0) && len(fixture.sessionToken) == 0 { + t.Skip( + `environment variables of either AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY + or AWS_SESSION_TOKEN must be set in order to run these integration + tests + `) } + client, sess := setupTest(t, fixture) + teardownSnsSqsTest := snsSqsTest(t, sess, client, fixture) + defer teardownSnsSqsTest(t) - for _, tc := range fixtures { - t.Run(tc.name, func(t *testing.T) { - client, sess := setupTest(t, tc) - teardownSnsSqsTest := snsSqsTest(t, sess, client, tc) - defer teardownSnsSqsTest(t) - }) - } } func TestSnsSqsWithDLQ(t *testing.T) { - timestamp := strconv.FormatInt(time.Now().UTC().UnixNano(), 10) - fixtures := []*testFixture{ - { - name: "with dead letters", - region: os.Getenv("AWS_DEFAULT_REGION"), - accessKey: os.Getenv("AWS_ACCESS_KEY_ID"), - secretKey: os.Getenv("AWS_SECRET_ACCESS_KEY"), - endpoint: os.Getenv("AWS_ENDPOINT_URL"), - profile: os.Getenv("AWS_PROFILE"), - topicName: fmt.Sprintf("dapr-sns-test-topic-%v", timestamp), - deadLettersQueueName: fmt.Sprintf("dapr-sqs-test-deadletters-queue-%v", timestamp), - queueName: fmt.Sprintf("dapr-sqs-test-queue-%v", timestamp), - deadLettersMaxReceives: "1", - }, - // expand to other fixtures if needed - } - - for _, tc := range fixtures { - t.Run(tc.name, func(t *testing.T) { - client, sess := setupTest(t, tc) - teardownSnsSqsTest := snsSqsDeadlettersTest(t, sess, client, tc) - defer teardownSnsSqsTest(t) - }) + t.Parallel() + + fixture := getDefaultTestFixture(true, "1") + if (len(fixture.accessKey) == 0 && len(fixture.secretKey) == 0) && len(fixture.sessionToken) == 0 { + t.Skip( + `environment variables of either AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY + or AWS_SESSION_TOKEN must be set in order to run these integration + tests + `) } + client, sess := setupTest(t, fixture) + teardownSnsSqsTest := snsSqsDeadlettersTest(t, sess, client, fixture) + defer teardownSnsSqsTest(t) } diff --git a/pubsub/aws/snssqs/snssqs_test.go b/pubsub/aws/snssqs/snssqs_test.go index 835100bc83..b5f900c9bf 100644 --- a/pubsub/aws/snssqs/snssqs_test.go +++ b/pubsub/aws/snssqs/snssqs_test.go @@ -9,6 +9,7 @@ import ( ) func Test_parseTopicArn(t *testing.T) { + t.Parallel() // no further guarantees are made about this function r := require.New(t) r.Equal("qqnoob", parseTopicArn("arn:aws:sqs:us-east-1:000000000000:qqnoob")) @@ -16,6 +17,7 @@ func Test_parseTopicArn(t *testing.T) { // Verify that all metadata ends up in the correct spot func Test_getSnsSqsMetatdata_AllConfiguration(t *testing.T) { + t.Parallel() r := require.New(t) l := logger.NewLogger("SnsSqs unit test") l.SetOutputLevel(logger.DebugLevel) @@ -51,6 +53,7 @@ func Test_getSnsSqsMetatdata_AllConfiguration(t *testing.T) { } func Test_getSnsSqsMetatdata_defaults(t *testing.T) { + t.Parallel() r := require.New(t) l := logger.NewLogger("SnsSqs unit test") l.SetOutputLevel(logger.DebugLevel) @@ -80,6 +83,7 @@ func Test_getSnsSqsMetatdata_defaults(t *testing.T) { } func Test_getSnsSqsMetatdata_legacyaliases(t *testing.T) { + t.Parallel() r := require.New(t) l := logger.NewLogger("SnsSqs unit test") l.SetOutputLevel(logger.DebugLevel) @@ -108,6 +112,7 @@ func Test_getSnsSqsMetatdata_legacyaliases(t *testing.T) { } func Test_getSnsSqsMetatdata_invalidMessageVisibility(t *testing.T) { + t.Parallel() r := require.New(t) l := logger.NewLogger("SnsSqs unit test") l.SetOutputLevel(logger.DebugLevel) @@ -130,6 +135,7 @@ func Test_getSnsSqsMetatdata_invalidMessageVisibility(t *testing.T) { } func Test_getSnsSqsMetatdata_invalidMessageRetryLimit(t *testing.T) { + t.Parallel() r := require.New(t) l := logger.NewLogger("SnsSqs unit test") l.SetOutputLevel(logger.DebugLevel) @@ -152,6 +158,7 @@ func Test_getSnsSqsMetatdata_invalidMessageRetryLimit(t *testing.T) { } func Test_getSnsSqsMetatdata_invalidWaitTimeSecondsTooLow(t *testing.T) { + t.Parallel() r := require.New(t) l := logger.NewLogger("SnsSqs unit test") l.SetOutputLevel(logger.DebugLevel) @@ -174,6 +181,7 @@ func Test_getSnsSqsMetatdata_invalidWaitTimeSecondsTooLow(t *testing.T) { } func Test_getSnsSqsMetatdata_invalidMessageMaxNumberTooHigh(t *testing.T) { + t.Parallel() r := require.New(t) l := logger.NewLogger("SnsSqs unit test") l.SetOutputLevel(logger.DebugLevel) @@ -196,6 +204,7 @@ func Test_getSnsSqsMetatdata_invalidMessageMaxNumberTooHigh(t *testing.T) { } func Test_getSnsSqsMetatdata_invalidMessageMaxNumberTooLow(t *testing.T) { + t.Parallel() r := require.New(t) l := logger.NewLogger("SnsSqs unit test") l.SetOutputLevel(logger.DebugLevel) @@ -218,6 +227,7 @@ func Test_getSnsSqsMetatdata_invalidMessageMaxNumberTooLow(t *testing.T) { } func Test_parseInt64(t *testing.T) { + t.Parallel() r := require.New(t) number, err := parseInt64("applesauce", "propertyName") r.EqualError(err, "parsing propertyName failed with: strconv.Atoi: parsing \"applesauce\": invalid syntax") @@ -235,6 +245,7 @@ func Test_parseInt64(t *testing.T) { } func Test_replaceNameToAWSSanitizedName(t *testing.T) { + t.Parallel() r := require.New(t) s := `Some_invalid-name // for an AWS resource &*()*&&^Some invalid name // for an AWS resource &*()*&&^Some invalid From d17746f6638ccaa4da7a1811a0de793f388bba38 Mon Sep 17 00:00:00 2001 From: Amit Mor Date: Thu, 12 Aug 2021 19:10:44 +0300 Subject: [PATCH 27/29] code review fixes * not using implicit maxReceives * maxReceives renamed * unittest refactor --- pubsub/aws/snssqs/snssqs.go | 43 +++--- pubsub/aws/snssqs/snssqs_integ_test.go | 43 +++--- pubsub/aws/snssqs/snssqs_test.go | 196 +++++++++++++------------ 3 files changed, 140 insertions(+), 142 deletions(-) diff --git a/pubsub/aws/snssqs/snssqs.go b/pubsub/aws/snssqs/snssqs.go index 5b39c2efae..dfae45059f 100644 --- a/pubsub/aws/snssqs/snssqs.go +++ b/pubsub/aws/snssqs/snssqs.go @@ -55,9 +55,9 @@ type snsSqsMetadata struct { messageVisibilityTimeout int64 // number of times to resend a message after processing of that message fails before removing that message from the queue. Default: 10 messageRetryLimit int64 - // if sqsDeadLettersQueueName is set to a value, then the deadLettersMaxReceives defines the number of times a message is received + // if sqsDeadLettersQueueName is set to a value, then the messageReceiveLimit defines the number of times a message is received // before it is moved to the dead-letters queue. This value must be smaller than messageRetryLimit - deadLettersMaxReceives int64 + messageReceiveLimit int64 // amount of time to await receipt of a message before making another request. Default: 1 messageWaitTimeSeconds int64 // maximum number of messages to receive from the queue at a time. Default: 10, Maximum: 10 @@ -65,11 +65,11 @@ type snsSqsMetadata struct { } const ( - awsSqsQueueNameKey = "dapr-worker-name" - awsSnsTopicNameKey = "dapr-topic-name" - awsSqsDeadLettersQueueName = "dapr-deadletters" + awsSqsQueueNameKey = "dapr-worker-name" + awsSnsTopicNameKey = "dapr-topic-name" ) +// NewSnsSqs - construct a new snssqs dapr component func NewSnsSqs(l logger.Logger) pubsub.PubSub { return &snsSqs{ logger: l, @@ -184,27 +184,18 @@ func (s *snsSqs) getSnsSqsMetatdata(metadata pubsub.Metadata) (*snsSqsMetadata, md.sqsDeadLettersQueueName = val } - if val, ok := getAliasedProperty([]string{"deadLettersMaxReceives"}, metadata); !ok { - md.deadLettersMaxReceives = md.messageRetryLimit - } else { - // fallback: use default dead-letters queue name if deadLettersMaxReceives is defined but the sqsDeadLettersQueueName isn't - if len(md.sqsDeadLettersQueueName) == 0 { - md.sqsDeadLettersQueueName = awsSqsDeadLettersQueueName - } - - deadLettersMaxReceives, err := parseInt64(val, "deadLettersMaxReceives") + if val, ok := getAliasedProperty([]string{"messageReceiveLimit"}, metadata); ok { + messageReceiveLimit, err := parseInt64(val, "messageReceiveLimit") if err != nil { return nil, err } - - // validate: if deadLettersMaxReceives is greater than messageRetryLimit, the message would be deleted by daprd before - // SQS has the opportunity to move the message to the dead-letters queue, so we reject this - if deadLettersMaxReceives > md.messageRetryLimit { - return nil, errors.New("deadLettersMaxReceives must be less than or equal to messageRetryLimit") - } - // assign: used provided configuration - md.deadLettersMaxReceives = deadLettersMaxReceives + md.messageReceiveLimit = messageReceiveLimit + } + + // XOR on having either a valid messageReceiveLimit and invalid sqsDeadLettersQueueName, and vice versa + if (md.messageReceiveLimit > 0 || len(md.sqsDeadLettersQueueName) > 0) && !(md.messageReceiveLimit > 0 && len(md.sqsDeadLettersQueueName) > 0) { + return nil, errors.New("to use SQS dead letters queue, messageReceiveLimit and sqsDeadLettersQueueName must both be set to a value") } if val, ok := props["messageWaitTimeSeconds"]; !ok { @@ -432,11 +423,11 @@ func (s *snsSqs) handleMessage(message *sqs.Message, queueInfo, deadLettersQueue return fmt.Errorf( "message received greater than %v times, deleting this message without further processing", s.metadata.messageRetryLimit) } - // ... else, there is no need to actively do something if we reached the limit defined in deadLettersMaxReceives as the message had + // ... else, there is no need to actively do something if we reached the limit defined in messageReceiveLimit as the message had // already been moved to the dead-letters queue by SQS - if deadLettersQueueInfo != nil && recvCountInt >= s.metadata.deadLettersMaxReceives { + if deadLettersQueueInfo != nil && recvCountInt >= s.metadata.messageReceiveLimit { s.logger.Warnf( - "message received greater than %v times, moving this message without further processing to dead-letters queue: %v", s.metadata.deadLettersMaxReceives, s.metadata.sqsDeadLettersQueueName) + "message received greater than %v times, moving this message without further processing to dead-letters queue: %v", s.metadata.messageReceiveLimit, s.metadata.sqsDeadLettersQueueName) } // otherwise try to handle the message @@ -514,7 +505,7 @@ func (s *snsSqs) createDeadLettersQueue() (*sqsQueueInfo, error) { func (s *snsSqs) createQueueAttributesWithDeadLetters(queueInfo, deadLettersQueueInfo *sqsQueueInfo) (*sqs.SetQueueAttributesInput, error) { policy := map[string]string{ "deadLetterTargetArn": deadLettersQueueInfo.arn, - "maxReceiveCount": strconv.FormatInt(s.metadata.deadLettersMaxReceives, 10), + "maxReceiveCount": strconv.FormatInt(s.metadata.messageReceiveLimit, 10), } b, err := json.Marshal(policy) diff --git a/pubsub/aws/snssqs/snssqs_integ_test.go b/pubsub/aws/snssqs/snssqs_integ_test.go index 159d27944d..ea4e95b7b4 100644 --- a/pubsub/aws/snssqs/snssqs_integ_test.go +++ b/pubsub/aws/snssqs/snssqs_integ_test.go @@ -2,7 +2,7 @@ // Copyright (c) Microsoft Corporation and Dapr Contributors. // Licensed under the MIT License. // ------------------------------------------------------------ -// +build integration +// +build e2e package snssqs @@ -27,20 +27,20 @@ import ( ) type testFixture struct { - name string - endpoint string - region string - profile string - topicName string - deadLettersQueueName string - deadLettersMaxReceives string - queueName string - accessKey string - secretKey string - sessionToken string + name string + endpoint string + region string + profile string + topicName string + deadLettersQueueName string + messageReceiveLimit string + queueName string + accessKey string + secretKey string + sessionToken string } -func getDefaultTestFixture(withDLQ bool, dlqMaxReceives ...string) *testFixture { +func getDefaultTestFixture(messageReceiveLimits ...string) *testFixture { timestamp := strconv.FormatInt(time.Now().UTC().UnixNano(), 10) fixture := &testFixture{ region: os.Getenv("AWS_DEFAULT_REGION"), @@ -52,12 +52,12 @@ func getDefaultTestFixture(withDLQ bool, dlqMaxReceives ...string) *testFixture topicName: fmt.Sprintf("dapr-sns-test-topic-%v", timestamp), queueName: fmt.Sprintf("dapr-sqs-test-queue-%v", timestamp), } - if withDLQ { + + if len(messageReceiveLimits) > 0 { fixture.deadLettersQueueName = fmt.Sprintf("dapr-sqs-test-deadletters-queue-%v", timestamp) + fixture.messageReceiveLimit = messageReceiveLimits[0] } - if len(dlqMaxReceives) > 0 { - fixture.deadLettersMaxReceives = dlqMaxReceives[0] - } + return fixture } @@ -104,8 +104,8 @@ func setupTest(t *testing.T, fixture *testFixture) (pubsub.PubSub, *session.Sess if len(fixture.deadLettersQueueName) > 0 { props["sqsDeadLettersQueueName"] = fixture.deadLettersQueueName } - if len(fixture.deadLettersMaxReceives) > 0 { - props["deadLettersMaxReceives"] = fixture.deadLettersMaxReceives + if len(fixture.messageReceiveLimit) > 0 { + props["messageReceiveLimit"] = fixture.messageReceiveLimit } pubsubMetadata := pubsub.Metadata{Properties: props} @@ -279,7 +279,7 @@ func TestMain(m *testing.M) { func TestSnsSqs(t *testing.T) { t.Parallel() - fixture := getDefaultTestFixture(false) + fixture := getDefaultTestFixture() if (len(fixture.accessKey) == 0 && len(fixture.secretKey) == 0) && len(fixture.sessionToken) == 0 { t.Skip( `environment variables of either AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY @@ -290,13 +290,12 @@ func TestSnsSqs(t *testing.T) { client, sess := setupTest(t, fixture) teardownSnsSqsTest := snsSqsTest(t, sess, client, fixture) defer teardownSnsSqsTest(t) - } func TestSnsSqsWithDLQ(t *testing.T) { t.Parallel() - fixture := getDefaultTestFixture(true, "1") + fixture := getDefaultTestFixture("1") if (len(fixture.accessKey) == 0 && len(fixture.secretKey) == 0) && len(fixture.sessionToken) == 0 { t.Skip( `environment variables of either AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY diff --git a/pubsub/aws/snssqs/snssqs_test.go b/pubsub/aws/snssqs/snssqs_test.go index b5f900c9bf..ba011b2c8b 100644 --- a/pubsub/aws/snssqs/snssqs_test.go +++ b/pubsub/aws/snssqs/snssqs_test.go @@ -8,6 +8,11 @@ import ( "github.com/stretchr/testify/require" ) +type testUnitFixture struct { + metadata pubsub.Metadata + name string +} + func Test_parseTopicArn(t *testing.T) { t.Parallel() // no further guarantees are made about this function @@ -32,10 +37,12 @@ func Test_getSnsSqsMetatdata_AllConfiguration(t *testing.T) { "secretKey": "s", "sessionToken": "t", "region": "r", + "sqsDeadLettersQueueName": "q", "messageVisibilityTimeout": "2", "messageRetryLimit": "3", "messageWaitTimeSeconds": "4", "messageMaxNumber": "5", + "messageReceiveLimit": "6", }}) r.NoError(err) @@ -46,10 +53,12 @@ func Test_getSnsSqsMetatdata_AllConfiguration(t *testing.T) { r.Equal("s", md.SecretKey) r.Equal("t", md.SessionToken) r.Equal("r", md.Region) + r.Equal("q", md.sqsDeadLettersQueueName) r.Equal(int64(2), md.messageVisibilityTimeout) r.Equal(int64(3), md.messageRetryLimit) r.Equal(int64(4), md.messageWaitTimeSeconds) r.Equal(int64(5), md.messageMaxNumber) + r.Equal(int64(6), md.messageReceiveLimit) } func Test_getSnsSqsMetatdata_defaults(t *testing.T) { @@ -111,119 +120,118 @@ func Test_getSnsSqsMetatdata_legacyaliases(t *testing.T) { r.Equal(int64(10), md.messageMaxNumber) } -func Test_getSnsSqsMetatdata_invalidMessageVisibility(t *testing.T) { +func testMetadataParsingShouldFail(t *testing.T, metadata pubsub.Metadata, l logger.Logger) { t.Parallel() r := require.New(t) - l := logger.NewLogger("SnsSqs unit test") - l.SetOutputLevel(logger.DebugLevel) - ps := snsSqs{ - logger: l, - } - - md, err := ps.getSnsSqsMetatdata(pubsub.Metadata{Properties: map[string]string{ - "consumerID": "consumer", - "Endpoint": "endpoint", - "AccessKey": "acctId", - "SecretKey": "secret", - "awsToken": "token", - "Region": "region", - "messageVisibilityTimeout": "-100", - }}) - - r.Error(err) - r.Nil(md) -} -func Test_getSnsSqsMetatdata_invalidMessageRetryLimit(t *testing.T) { - t.Parallel() - r := require.New(t) - l := logger.NewLogger("SnsSqs unit test") - l.SetOutputLevel(logger.DebugLevel) ps := snsSqs{ logger: l, } - md, err := ps.getSnsSqsMetatdata(pubsub.Metadata{Properties: map[string]string{ - "consumerID": "consumer", - "Endpoint": "endpoint", - "AccessKey": "acctId", - "SecretKey": "secret", - "awsToken": "token", - "Region": "region", - "messageRetryLimit": "-100", - }}) + md, err := ps.getSnsSqsMetatdata(metadata) r.Error(err) r.Nil(md) } -func Test_getSnsSqsMetatdata_invalidWaitTimeSecondsTooLow(t *testing.T) { +func Test_getSnsSqsMetatdata_invalidMetadataSetup(t *testing.T) { t.Parallel() - r := require.New(t) - l := logger.NewLogger("SnsSqs unit test") - l.SetOutputLevel(logger.DebugLevel) - ps := snsSqs{ - logger: l, - } - md, err := ps.getSnsSqsMetatdata(pubsub.Metadata{Properties: map[string]string{ - "consumerID": "consumer", - "Endpoint": "endpoint", - "AccessKey": "acctId", - "SecretKey": "secret", - "awsToken": "token", - "Region": "region", - "messageWaitTimeSeconds": "0", - }}) - - r.Error(err) - r.Nil(md) -} - -func Test_getSnsSqsMetatdata_invalidMessageMaxNumberTooHigh(t *testing.T) { - t.Parallel() - r := require.New(t) - l := logger.NewLogger("SnsSqs unit test") - l.SetOutputLevel(logger.DebugLevel) - ps := snsSqs{ - logger: l, + fixtures := []testUnitFixture{ + { + metadata: pubsub.Metadata{Properties: map[string]string{ + "consumerID": "consumer", + "Endpoint": "endpoint", + "AccessKey": "acctId", + "SecretKey": "secret", + "awsToken": "token", + "Region": "region", + "messageReceiveLimit": "100", + }}, + name: "deadletters receive limit without deadletters queue name", + }, + { + metadata: pubsub.Metadata{Properties: map[string]string{ + "consumerID": "consumer", + "Endpoint": "endpoint", + "AccessKey": "acctId", + "SecretKey": "secret", + "awsToken": "token", + "Region": "region", + "sqsDeadLettersQueueName": "my-queue", + }}, + name: "deadletters message queue without deadletters receive limit", + }, + { + metadata: pubsub.Metadata{Properties: map[string]string{ + "consumerID": "consumer", + "Endpoint": "endpoint", + "AccessKey": "acctId", + "SecretKey": "secret", + "awsToken": "token", + "Region": "region", + "messageMaxNumber": "-100", + }}, + name: "illigal message max number (negative, too low)", + }, + { + metadata: pubsub.Metadata{Properties: map[string]string{ + "consumerID": "consumer", + "Endpoint": "endpoint", + "AccessKey": "acctId", + "SecretKey": "secret", + "awsToken": "token", + "Region": "region", + "messageMaxNumber": "100", + }}, + name: "illigal message max number (too high)", + }, + { + metadata: pubsub.Metadata{Properties: map[string]string{ + "consumerID": "consumer", + "Endpoint": "endpoint", + "AccessKey": "acctId", + "SecretKey": "secret", + "awsToken": "token", + "Region": "region", + "messageWaitTimeSeconds": "0", + }}, + name: "invalid wait time seconds (too low)", + }, + { + metadata: pubsub.Metadata{Properties: map[string]string{ + "consumerID": "consumer", + "Endpoint": "endpoint", + "AccessKey": "acctId", + "SecretKey": "secret", + "awsToken": "token", + "Region": "region", + "messageVisibilityTimeout": "-100", + }}, + name: "invalid message visibility", + }, + { + metadata: pubsub.Metadata{Properties: map[string]string{ + "consumerID": "consumer", + "Endpoint": "endpoint", + "AccessKey": "acctId", + "SecretKey": "secret", + "awsToken": "token", + "Region": "region", + "messageRetryLimit": "-100", + }}, + name: "invalid message retry limit", + }, } - md, err := ps.getSnsSqsMetatdata(pubsub.Metadata{Properties: map[string]string{ - "consumerID": "consumer", - "Endpoint": "endpoint", - "AccessKey": "acctId", - "SecretKey": "secret", - "awsToken": "token", - "Region": "region", - "messageMaxNumber": "100", - }}) - - r.Error(err) - r.Nil(md) -} - -func Test_getSnsSqsMetatdata_invalidMessageMaxNumberTooLow(t *testing.T) { - t.Parallel() - r := require.New(t) l := logger.NewLogger("SnsSqs unit test") l.SetOutputLevel(logger.DebugLevel) - ps := snsSqs{ - logger: l, - } - - md, err := ps.getSnsSqsMetatdata(pubsub.Metadata{Properties: map[string]string{ - "consumerID": "consumer", - "Endpoint": "endpoint", - "AccessKey": "acctId", - "SecretKey": "secret", - "awsToken": "token", - "Region": "region", - "messageMaxNumber": "-100", - }}) - r.Error(err) - r.Nil(md) + for _, tc := range fixtures { + t.Run(tc.name, func(t *testing.T) { + testMetadataParsingShouldFail(t, tc.metadata, l) + }) + } } func Test_parseInt64(t *testing.T) { From fc738057ad4fd9bcca5c679c7a3b8503d5f1f046 Mon Sep 17 00:00:00 2001 From: Amit Mor Date: Fri, 13 Aug 2021 10:06:50 +0300 Subject: [PATCH 28/29] Update snssqs.go --- pubsub/aws/snssqs/snssqs.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pubsub/aws/snssqs/snssqs.go b/pubsub/aws/snssqs/snssqs.go index dfae45059f..0acfb39111 100644 --- a/pubsub/aws/snssqs/snssqs.go +++ b/pubsub/aws/snssqs/snssqs.go @@ -69,7 +69,7 @@ const ( awsSnsTopicNameKey = "dapr-topic-name" ) -// NewSnsSqs - construct a new snssqs dapr component +// NewSnsSqs - constructor for a new snssqs dapr component func NewSnsSqs(l logger.Logger) pubsub.PubSub { return &snsSqs{ logger: l, From eba168c5aa3db6714b6b4e8cc1830fd666b6a984 Mon Sep 17 00:00:00 2001 From: Amit Mor Date: Fri, 13 Aug 2021 23:46:33 +0300 Subject: [PATCH 29/29] integ removed, renaming back of const --- pubsub/aws/snssqs/snssqs.go | 2 +- pubsub/aws/snssqs/snssqs_integ_test.go | 309 ------------------------- 2 files changed, 1 insertion(+), 310 deletions(-) delete mode 100644 pubsub/aws/snssqs/snssqs_integ_test.go diff --git a/pubsub/aws/snssqs/snssqs.go b/pubsub/aws/snssqs/snssqs.go index 0acfb39111..0116fe2bc1 100644 --- a/pubsub/aws/snssqs/snssqs.go +++ b/pubsub/aws/snssqs/snssqs.go @@ -65,7 +65,7 @@ type snsSqsMetadata struct { } const ( - awsSqsQueueNameKey = "dapr-worker-name" + awsSqsQueueNameKey = "dapr-queue-name" awsSnsTopicNameKey = "dapr-topic-name" ) diff --git a/pubsub/aws/snssqs/snssqs_integ_test.go b/pubsub/aws/snssqs/snssqs_integ_test.go deleted file mode 100644 index ea4e95b7b4..0000000000 --- a/pubsub/aws/snssqs/snssqs_integ_test.go +++ /dev/null @@ -1,309 +0,0 @@ -// ------------------------------------------------------------ -// Copyright (c) Microsoft Corporation and Dapr Contributors. -// Licensed under the MIT License. -// ------------------------------------------------------------ -// +build e2e - -package snssqs - -import ( - "context" - "fmt" - "os" - "strconv" - "testing" - "time" - - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/client" - "github.com/aws/aws-sdk-go/aws/credentials" - "github.com/aws/aws-sdk-go/aws/session" - "github.com/aws/aws-sdk-go/service/sns" - "github.com/aws/aws-sdk-go/service/sqs" - "github.com/aws/aws-sdk-go/service/sts" - "github.com/dapr/components-contrib/pubsub" - "github.com/dapr/kit/logger" - "github.com/stretchr/testify/assert" -) - -type testFixture struct { - name string - endpoint string - region string - profile string - topicName string - deadLettersQueueName string - messageReceiveLimit string - queueName string - accessKey string - secretKey string - sessionToken string -} - -func getDefaultTestFixture(messageReceiveLimits ...string) *testFixture { - timestamp := strconv.FormatInt(time.Now().UTC().UnixNano(), 10) - fixture := &testFixture{ - region: os.Getenv("AWS_DEFAULT_REGION"), - accessKey: os.Getenv("AWS_ACCESS_KEY_ID"), - secretKey: os.Getenv("AWS_SECRET_ACCESS_KEY"), - endpoint: os.Getenv("AWS_ENDPOINT_URL"), - profile: os.Getenv("AWS_PROFILE"), - sessionToken: os.Getenv("AWS_SESSION_TOKEN"), - topicName: fmt.Sprintf("dapr-sns-test-topic-%v", timestamp), - queueName: fmt.Sprintf("dapr-sqs-test-queue-%v", timestamp), - } - - if len(messageReceiveLimits) > 0 { - fixture.deadLettersQueueName = fmt.Sprintf("dapr-sqs-test-deadletters-queue-%v", timestamp) - fixture.messageReceiveLimit = messageReceiveLimits[0] - } - - return fixture -} - -func newAWSSession(cfg *testFixture) *session.Session { - // run localstack and use the endpoint url: http://localhost:4566 by using the following cmd - // SERVICES=sns,sqs,sts DEBUG=1 localstack start - var mySession *session.Session - sessionCfg := aws.NewConfig() - // Create a client with additional configuration - if len(cfg.endpoint) != 0 { - sessionCfg.Endpoint = &cfg.endpoint - sessionCfg.Region = &cfg.region - sessionCfg.Credentials = credentials.NewStaticCredentials(cfg.accessKey, cfg.secretKey, "") - sessionCfg.DisableSSL = aws.Bool(true) - - opts := session.Options{Profile: cfg.profile, Config: *sessionCfg} - mySession = session.Must(session.NewSessionWithOptions(opts)) - } else { - sessionCfg.Region = aws.String(cfg.region) - opts := session.Options{SharedConfigState: session.SharedConfigEnable, Config: *sessionCfg} - mySession = session.Must(session.NewSessionWithOptions(opts)) - } - - return mySession -} - -func setupTest(t *testing.T, fixture *testFixture) (pubsub.PubSub, *session.Session) { - sess := newAWSSession(fixture) - assert.NotNil(t, sess) - t.Log("setup test") - - snssqsClient := NewSnsSqs(logger.NewLogger("test")) - assert.NotNil(t, snssqsClient) - - props := map[string]string{ - "region": fixture.region, - "accessKey": fixture.accessKey, - "secretKey": fixture.secretKey, - "endpoint": fixture.endpoint, - "sessionToken": fixture.sessionToken, - "consumerID": fixture.queueName, - } - - if len(fixture.deadLettersQueueName) > 0 { - props["sqsDeadLettersQueueName"] = fixture.deadLettersQueueName - } - if len(fixture.messageReceiveLimit) > 0 { - props["messageReceiveLimit"] = fixture.messageReceiveLimit - } - - pubsubMetadata := pubsub.Metadata{Properties: props} - - err := snssqsClient.Init(pubsubMetadata) - assert.Nil(t, err) - - return snssqsClient, sess -} - -func getAccountID(sess client.ConfigProvider) (*sts.GetCallerIdentityOutput, error) { - svc := sts.New(sess) - input := &sts.GetCallerIdentityInput{} - - result, err := svc.GetCallerIdentity(input) - if err != nil { - return nil, err - } - - return result, err -} - -func getQueueURL(sess client.ConfigProvider, queueName *string) (*sqs.GetQueueUrlOutput, error) { - // Get the account ID - accountResult, aErr := getAccountID(sess) - if aErr != nil { - return nil, aErr - } - - // Create an SQS service client - svc := sqs.New(sess) - result, err := svc.GetQueueUrl(&sqs.GetQueueUrlInput{ - QueueName: queueName, - QueueOwnerAWSAccountId: accountResult.Account, - }) - if err != nil { - return nil, err - } - - return result, nil -} - -func teardownSqs(t *testing.T, sess client.ConfigProvider, fixture *testFixture) { - svc := sqs.New(sess) - - queueURL, err := getQueueURL(sess, &fixture.queueName) - assert.Nil(t, err) - assert.NotNil(t, queueURL) - - _, err = svc.DeleteQueue(&sqs.DeleteQueueInput{ - QueueUrl: queueURL.QueueUrl, - }) - assert.Nil(t, err) - - var dlQueueURL *sqs.GetQueueUrlOutput - dlQueueURL, err = getQueueURL(sess, &fixture.deadLettersQueueName) - // err would exist if no dead-letter queue exist, which might be the case - // in some tests - if err != nil { - return - } - - svc.DeleteQueue(&sqs.DeleteQueueInput{ - QueueUrl: dlQueueURL.QueueUrl, - }) -} - -func teardownSns(t *testing.T, sess client.ConfigProvider, fixture *testFixture) { - svc := sns.New(sess) - result, err := svc.ListTopics(nil) - assert.Nil(t, err) - assert.NotNil(t, result) - - var accountID *sts.GetCallerIdentityOutput - accountID, err = getAccountID(sess) - assert.Nil(t, err) - assert.NotNil(t, accountID) - - lookupTopicArn := fmt.Sprintf("arn:aws:sns:%v:%v:%v", fixture.region, *accountID.Account, fixture.topicName) - for _, topic := range result.Topics { - if *topic.TopicArn == lookupTopicArn { - // deletes topic - // currently there is a bug in aws-go-sdk that results in the subscription not being - // deleted along with the topic (as should be) so the subscription needs - // to be manually deleted - _, err = svc.DeleteTopic(&sns.DeleteTopicInput{TopicArn: topic.TopicArn}) - assert.Nil(t, err) - } - } -} - -func snsSqsTest(t *testing.T, sess client.ConfigProvider, snssqsClient pubsub.PubSub, fixture *testFixture) func(t *testing.T) { - // subscriber registers to listen to (SNS) topic which eventually land on the fixture.queueName - // over (SQS) queue - req := pubsub.SubscribeRequest{Topic: fixture.topicName} - msgs := []*pubsub.NewMessage{} - handler := func(ctx context.Context, msg *pubsub.NewMessage) error { - msgs = append(msgs, msg) - - return nil - } - - err := snssqsClient.Subscribe(req, handler) - assert.Nil(t, err) - - var queueURL *sqs.GetQueueUrlOutput - queueURL, err = getQueueURL(sess, &fixture.queueName) - assert.Nil(t, err) - assert.NotNil(t, queueURL) - - publishReq := &pubsub.PublishRequest{Topic: fixture.topicName, PubsubName: "test", Data: []byte("string")} - err = snssqsClient.Publish(publishReq) - assert.Nil(t, err) - // delay between send/recv in sqs - time.Sleep(5 * time.Second) - assert.Len(t, msgs, 1) - - // tear down callback - return func(t *testing.T) { - teardownSqs(t, sess, fixture) - teardownSns(t, sess, fixture) - - t.Log("teardown test") - } -} - -func snsSqsDeadlettersTest(t *testing.T, sess client.ConfigProvider, snssqsClient pubsub.PubSub, fixture *testFixture) func(t *testing.T) { - // subscriber's handlers always fails to process message forcing dead letters queue to - req := pubsub.SubscribeRequest{Topic: fixture.topicName} - handler := func(ctx context.Context, msg *pubsub.NewMessage) error { - return fmt.Errorf("failure to receive - dead letters tests") - } - - err := snssqsClient.Subscribe(req, handler) - assert.Nil(t, err) - - var queueURL *sqs.GetQueueUrlOutput - queueURL, err = getQueueURL(sess, &fixture.queueName) - assert.Nil(t, err) - assert.NotNil(t, queueURL) - - publishReq := &pubsub.PublishRequest{Topic: fixture.topicName, PubsubName: "test", Data: []byte("string")} - err = snssqsClient.Publish(publishReq) - assert.Nil(t, err) - - // tear down callback - return func(t *testing.T) { - sqsSvc := sqs.New(sess) - dlQueueURL, err := getQueueURL(sess, &fixture.deadLettersQueueName) - assert.Nil(t, err) - - waitTimeSeconds := int64(10) - var output *sqs.ReceiveMessageOutput - output, err = sqsSvc.ReceiveMessage(&sqs.ReceiveMessageInput{QueueUrl: dlQueueURL.QueueUrl, WaitTimeSeconds: &waitTimeSeconds}) - assert.Nil(t, err) - assert.NotNil(t, output.Messages) - assert.Len(t, output.Messages, 1) - - teardownSqs(t, sess, fixture) - teardownSns(t, sess, fixture) - - t.Log("teardown test") - } -} - -func TestMain(m *testing.M) { - code := m.Run() - os.Exit(code) -} - -func TestSnsSqs(t *testing.T) { - t.Parallel() - - fixture := getDefaultTestFixture() - if (len(fixture.accessKey) == 0 && len(fixture.secretKey) == 0) && len(fixture.sessionToken) == 0 { - t.Skip( - `environment variables of either AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY - or AWS_SESSION_TOKEN must be set in order to run these integration - tests - `) - } - client, sess := setupTest(t, fixture) - teardownSnsSqsTest := snsSqsTest(t, sess, client, fixture) - defer teardownSnsSqsTest(t) -} - -func TestSnsSqsWithDLQ(t *testing.T) { - t.Parallel() - - fixture := getDefaultTestFixture("1") - if (len(fixture.accessKey) == 0 && len(fixture.secretKey) == 0) && len(fixture.sessionToken) == 0 { - t.Skip( - `environment variables of either AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY - or AWS_SESSION_TOKEN must be set in order to run these integration - tests - `) - } - client, sess := setupTest(t, fixture) - teardownSnsSqsTest := snsSqsDeadlettersTest(t, sess, client, fixture) - defer teardownSnsSqsTest(t) -}