Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
bcaa9bb
bugfix for sns topic deletion upon termination
Jul 23, 2021
62ac6b0
Revert "bugfix for sns topic deletion upon termination"
Jul 23, 2021
de891cf
wip on normalizing queue/topic names
Jul 23, 2021
d4d2df7
sanitize queue and topic names
Jul 24, 2021
d3e08e7
sanitized names. bugfix for close
Jul 24, 2021
5c9949b
# This is a combination of 4 commits.
mthmulders Jul 19, 2021
f300ab3
Merge branch 'sns-sqs-topics' of https://github.com/amimimor/componen…
Jul 26, 2021
408fdcd
removed debug message
Jul 26, 2021
f3bee8a
raw string abort
Jul 26, 2021
be550bf
merge issues solved
Jul 26, 2021
7b7dc93
wip
Jul 26, 2021
cb0f2e5
gofmt+remove regex and use byte iter
Jul 26, 2021
1c82e85
Merge branch 'sns-sqs-topics' into dead-letters
Jul 27, 2021
dd0c6e2
wip. first impl of dead-letters queue config
Jul 28, 2021
6c3e283
wip. refactor and fallback values
Jul 28, 2021
e38572a
Merge remote-tracking branch 'upstream/master' into dead-letters
Jul 29, 2021
b4692c9
integration test wip
Jul 29, 2021
58ad8a9
wip integration test
Jul 29, 2021
46bbbc7
wip integration
Jul 30, 2021
43b6e9e
Merge remote-tracking branch 'upstream/master' into dead-letters
Aug 1, 2021
582dfb8
Merge remote-tracking branch 'upstream/master' into dead-letters
Aug 4, 2021
3cfa704
wip on testing
Aug 4, 2021
c5301a6
Merge remote-tracking branch 'upstream/master' into dead-letters
Aug 8, 2021
aace0a1
wip
Aug 8, 2021
f36e268
still buggy but wip!
Aug 8, 2021
eaf769e
bugfix in dlq creation
Aug 8, 2021
e88ae91
working. still bug in subscription clean up
Aug 8, 2021
06c2ff2
Update snssqs_integ_test.go
Aug 9, 2021
207c203
golangci-lint fixes
Aug 9, 2021
2b29a40
golangci-lint refactoring
Aug 10, 2021
98222d9
trying to skip running integrations for snssqs
Aug 10, 2021
a4b5c26
testing
Aug 10, 2021
39ca9ba
Merge remote-tracking branch 'upstream/master' into dead-letters
Aug 12, 2021
d17746f
code review fixes
Aug 12, 2021
fc73805
Update snssqs.go
Aug 13, 2021
aca1b3e
Merge remote-tracking branch 'upstream/master' into dead-letters
Aug 13, 2021
ec5c12f
Merge branch 'master' into dead-letters
artursouza Aug 13, 2021
eba168c
integ removed, renaming back of const
Aug 13, 2021
811c4b2
Merge branch 'dead-letters' of https://github.com/amimimor/components…
Aug 13, 2021
2722455
Merge branch 'master' into dead-letters
artursouza Aug 14, 2021
8f4f6ab
Merge branch 'master' into dead-letters
dapr-bot Aug 14, 2021
e5d5b37
Merge branch 'master' into dead-letters
dapr-bot Aug 14, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 29 additions & 22 deletions pubsub/aws/snssqs/snssqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ package snssqs

import (
"context"
"crypto/sha256"
"encoding/json"
"errors"
"fmt"
"strconv"
"strings"
"regexp"

"github.com/aws/aws-sdk-go/aws"
sns "github.com/aws/aws-sdk-go/service/sns"
Expand All @@ -20,8 +20,8 @@ import (
type snsSqs struct {
// key is the topic name, value is the ARN of the topic
topics map[string]string
// key is the hashed topic name, value is the actual topic name
topicHash map[string]string
// key is the sanitized topic name, value is the actual topic name
topicSanitized map[string]string
// key is the topic name, value holds the ARN of the queue and its url
queues map[string]*sqsQueueInfo
snsClient *sns.SNS
Expand Down Expand Up @@ -66,6 +66,9 @@ const (
awsSnsTopicNameKey = "dapr-topic-name"
)

var awsSnsSqsAllowedCharsRe = regexp.MustCompile("[^a-zA-Z0-9_\\-]+")


func NewSnsSqs(l logger.Logger) pubsub.PubSub {
return &snsSqs{
logger: l,
Expand Down Expand Up @@ -93,13 +96,16 @@ func parseInt64(input string, propertyName string) (int64, error) {
return int64(number), nil
}

// take a name and hash it for compatibility with AWS resource names
// the output is fixed at 64 characters
func nameToHash(name string) string {
h := sha256.New()
h.Write([]byte(name))

return fmt.Sprintf("%x", h.Sum(nil))
// sanitize topic/queue name to conform with:
// https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/quotas-queues.html
func nameToAWSSanitizedName(name string) string {
sanitizedName := awsSnsSqsAllowedCharsRe.ReplaceAllString(name, "")
if len(sanitizedName) > 80 {
sanitizedName = sanitizedName[:80]
}

return sanitizedName
}

func (s *snsSqs) getSnsSqsMetatdata(metadata pubsub.Metadata) (*snsSqsMetadata, error) {
Expand Down Expand Up @@ -207,7 +213,7 @@ func (s *snsSqs) Init(metadata pubsub.Metadata) error {
// both Publish and Subscribe need reference the topic ARN
// track these ARNs in this map
s.topics = make(map[string]string)
s.topicHash = make(map[string]string)
s.topicSanitized = make(map[string]string)
s.queues = make(map[string]*sqsQueueInfo)
sess, err := aws_auth.GetClient(md.AccessKey, md.SecretKey, md.SessionToken, md.Region, md.Endpoint)
if err != nil {
Expand All @@ -220,16 +226,16 @@ func (s *snsSqs) Init(metadata pubsub.Metadata) error {
}

func (s *snsSqs) createTopic(topic string) (string, string, error) {
hashedName := nameToHash(topic)
sanitizedName := nameToAWSSanitizedName(topic)
createTopicResponse, err := s.snsClient.CreateTopic(&sns.CreateTopicInput{
Name: aws.String(hashedName),
Name: aws.String(sanitizedName),
Tags: []*sns.Tag{{Key: aws.String(awsSnsTopicNameKey), Value: aws.String(topic)}},
})
if err != nil {
return "", "", err
}

return *(createTopicResponse.TopicArn), hashedName, nil
return *(createTopicResponse.TopicArn), sanitizedName, nil
}

// get the topic ARN from the topics map. If it doesn't exist in the map, try to fetch it from AWS, if it doesn't exist
Expand All @@ -245,7 +251,7 @@ func (s *snsSqs) getOrCreateTopic(topic string) (string, error) {

s.logger.Debugf("No topic ARN found for %s\n Creating topic instead.", topic)

topicArn, hashedName, err := s.createTopic(topic)
topicArn, sanitizedName, err := s.createTopic(topic)
if err != nil {
s.logger.Errorf("error creating new topic %s: %v", topic, err)

Expand All @@ -254,14 +260,14 @@ func (s *snsSqs) getOrCreateTopic(topic string) (string, error) {

// record topic ARN
s.topics[topic] = topicArn
s.topicHash[hashedName] = topic
s.topicSanitized[sanitizedName] = topic

return topicArn, nil
}

func (s *snsSqs) createQueue(queueName string) (*sqsQueueInfo, error) {
createQueueResponse, err := s.sqsClient.CreateQueue(&sqs.CreateQueueInput{
QueueName: aws.String(nameToHash(queueName)),
QueueName: aws.String(nameToAWSSanitizedName(queueName)),
Tags: map[string]*string{awsSqsQueueNameKey: aws.String(queueName)},
})
if err != nil {
Expand Down Expand Up @@ -397,7 +403,7 @@ func (s *snsSqs) handleMessage(message *sqs.Message, queueInfo *sqsQueueInfo, ha
}

topic := parseTopicArn(messageBody.TopicArn)
topic = s.topicHash[topic]
topic = s.topicSanitized[topic]
err = handler(context.Background(), &pubsub.NewMessage{
Data: []byte(messageBody.Message),
Topic: topic,
Expand Down Expand Up @@ -491,11 +497,12 @@ func (s *snsSqs) Subscribe(req pubsub.SubscribeRequest, handler pubsub.Handler)
}

func (s *snsSqs) Close() error {
for _, sub := range s.subscriptions {
s.snsClient.Unsubscribe(&sns.UnsubscribeInput{
SubscriptionArn: sub,
})
}
s.logger.Debugf("Close was called and is now NOOP")
// for _, sub := range s.subscriptions {
// s.snsClient.Unsubscribe(&sns.UnsubscribeInput{
// SubscriptionArn: sub,
// })
// }

return nil
}
Expand Down
24 changes: 7 additions & 17 deletions pubsub/aws/snssqs/snssqs_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package snssqs

import (
"fmt"
"strings"
"testing"

"github.com/dapr/components-contrib/pubsub"
Expand Down Expand Up @@ -236,22 +234,14 @@ func Test_parseInt64(t *testing.T) {
r.Error(err)
}

func Test_nameToHash(t *testing.T) {

func Test_replaceNameToAWSSanitizedName(t *testing.T) {
r := require.New(t)

// This string is too long and contains invalid character for either an SQS queue or an SNS topic
hashedName := nameToHash(`
Some invalid name // for an AWS resource &*()*&&^Some invalid name // for an AWS resource &*()*&&^Some invalid
s := `Some_invalid-name // for an AWS resource &*()*&&^Some invalid name // for an AWS resource &*()*&&^Some invalid
name // for an AWS resource &*()*&&^Some invalid name // for an AWS resource &*()*&&^Some invalid name // for an
AWS resource &*()*&&^Some invalid name // for an AWS resource &*()*&&^
`)

r.Equal(64, len(hashedName))
// Output is only expected to contain lower case characters representing valid hexadecimal numerals
for _, c := range hashedName {
r.True(
strings.ContainsAny(
"abcdef0123456789", string(c)),
fmt.Sprintf("Invalid character %s in hashed name", string(c)))
}
AWS resource &*()*&&^Some invalid name // for an AWS resource &*()*&&^`
v := nameToAWSSanitizedName(s)
r.Equal(80, len(v))
r.Equal("Some_invalid-nameforanAWSresourceSomeinvalidnameforanAWSresourceSomeinvalidnamef", v)
}