Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
bcaa9bb
bugfix for sns topic deletion upon termination
Jul 23, 2021
62ac6b0
Revert "bugfix for sns topic deletion upon termination"
Jul 23, 2021
de891cf
wip on normalizing queue/topic names
Jul 23, 2021
d4d2df7
sanitize queue and topic names
Jul 24, 2021
d3e08e7
sanitized names. bugfix for close
Jul 24, 2021
5c9949b
# This is a combination of 4 commits.
mthmulders Jul 19, 2021
f300ab3
Merge branch 'sns-sqs-topics' of https://github.com/amimimor/componen…
Jul 26, 2021
408fdcd
removed debug message
Jul 26, 2021
f3bee8a
raw string abort
Jul 26, 2021
be550bf
merge issues solved
Jul 26, 2021
7b7dc93
wip
Jul 26, 2021
cb0f2e5
gofmt+remove regex and use byte iter
Jul 26, 2021
1c82e85
Merge branch 'sns-sqs-topics' into dead-letters
Jul 27, 2021
dd0c6e2
wip. first impl of dead-letters queue config
Jul 28, 2021
6c3e283
wip. refactor and fallback values
Jul 28, 2021
e38572a
Merge remote-tracking branch 'upstream/master' into dead-letters
Jul 29, 2021
b4692c9
integration test wip
Jul 29, 2021
58ad8a9
wip integration test
Jul 29, 2021
46bbbc7
wip integration
Jul 30, 2021
43b6e9e
Merge remote-tracking branch 'upstream/master' into dead-letters
Aug 1, 2021
582dfb8
Merge remote-tracking branch 'upstream/master' into dead-letters
Aug 4, 2021
3cfa704
wip on testing
Aug 4, 2021
c5301a6
Merge remote-tracking branch 'upstream/master' into dead-letters
Aug 8, 2021
aace0a1
wip
Aug 8, 2021
f36e268
still buggy but wip!
Aug 8, 2021
eaf769e
bugfix in dlq creation
Aug 8, 2021
e88ae91
working. still bug in subscription clean up
Aug 8, 2021
06c2ff2
Update snssqs_integ_test.go
Aug 9, 2021
207c203
golangci-lint fixes
Aug 9, 2021
2b29a40
golangci-lint refactoring
Aug 10, 2021
98222d9
trying to skip running integrations for snssqs
Aug 10, 2021
a4b5c26
testing
Aug 10, 2021
39ca9ba
Merge remote-tracking branch 'upstream/master' into dead-letters
Aug 12, 2021
d17746f
code review fixes
Aug 12, 2021
fc73805
Update snssqs.go
Aug 13, 2021
aca1b3e
Merge remote-tracking branch 'upstream/master' into dead-letters
Aug 13, 2021
ec5c12f
Merge branch 'master' into dead-letters
artursouza Aug 13, 2021
eba168c
integ removed, renaming back of const
Aug 13, 2021
811c4b2
Merge branch 'dead-letters' of https://github.com/amimimor/components…
Aug 13, 2021
2722455
Merge branch 'master' into dead-letters
artursouza Aug 14, 2021
8f4f6ab
Merge branch 'master' into dead-letters
dapr-bot Aug 14, 2021
e5d5b37
Merge branch 'master' into dead-letters
dapr-bot Aug 14, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 104 additions & 13 deletions pubsub/aws/snssqs/snssqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ type sqsQueueInfo struct {
type snsSqsMetadata struct {
// name of the queue for this application. The is provided by the runtime as "consumerID"
sqsQueueName string

// name of the dead letter queue for this application
sqsDeadLettersQueueName string
// aws endpoint for the component to use.
Endpoint string
// access key to use for accessing sqs/sns
Expand All @@ -54,6 +55,9 @@ type snsSqsMetadata struct {
messageVisibilityTimeout int64
// number of times to resend a message after processing of that message fails before removing that message from the queue. Default: 10
messageRetryLimit int64
// if sqsDeadLettersQueueName is set to a value, then the messageReceiveLimit defines the number of times a message is received
// before it is moved to the dead-letters queue. This value must be smaller than messageRetryLimit
messageReceiveLimit int64
// amount of time to await receipt of a message before making another request. Default: 1
messageWaitTimeSeconds int64
// maximum number of messages to receive from the queue at a time. Default: 10, Maximum: 10
Expand All @@ -65,6 +69,7 @@ const (
awsSnsTopicNameKey = "dapr-topic-name"
)

// NewSnsSqs - constructor for a new snssqs dapr component
func NewSnsSqs(l logger.Logger) pubsub.PubSub {
return &snsSqs{
logger: l,
Expand All @@ -86,7 +91,7 @@ func getAliasedProperty(aliases []string, metadata pubsub.Metadata) (string, boo
func parseInt64(input string, propertyName string) (int64, error) {
number, err := strconv.Atoi(input)
if err != nil {
return -1, fmt.Errorf("parsing %s failed with: %v", propertyName, err)
return -1, fmt.Errorf("parsing %s failed with: %w", propertyName, err)
}

return int64(number), nil
Expand Down Expand Up @@ -175,6 +180,24 @@ func (s *snsSqs) getSnsSqsMetatdata(metadata pubsub.Metadata) (*snsSqsMetadata,
md.messageRetryLimit = retryLimit
}

if val, ok := getAliasedProperty([]string{"sqsDeadLettersQueueName"}, metadata); ok {
md.sqsDeadLettersQueueName = val
}

if val, ok := getAliasedProperty([]string{"messageReceiveLimit"}, metadata); ok {
messageReceiveLimit, err := parseInt64(val, "messageReceiveLimit")
if err != nil {
return nil, err
}
// assign: used provided configuration
md.messageReceiveLimit = messageReceiveLimit
}

// XOR on having either a valid messageReceiveLimit and invalid sqsDeadLettersQueueName, and vice versa
if (md.messageReceiveLimit > 0 || len(md.sqsDeadLettersQueueName) > 0) && !(md.messageReceiveLimit > 0 && len(md.sqsDeadLettersQueueName) > 0) {
return nil, errors.New("to use SQS dead letters queue, messageReceiveLimit and sqsDeadLettersQueueName must both be set to a value")
}

if val, ok := props["messageWaitTimeSeconds"]; !ok {
md.messageWaitTimeSeconds = 1
} else {
Expand Down Expand Up @@ -377,7 +400,7 @@ func (s *snsSqs) acknowledgeMessage(queueURL string, receiptHandle *string) erro
return err
}

func (s *snsSqs) handleMessage(message *sqs.Message, queueInfo *sqsQueueInfo, handler pubsub.Handler) error {
func (s *snsSqs) handleMessage(message *sqs.Message, queueInfo, deadLettersQueueInfo *sqsQueueInfo, handler pubsub.Handler) error {
// if this message has been received > x times, delete from queue, it's borked
recvCount, ok := message.Attributes[sqs.MessageSystemAttributeNameApproximateReceiveCount]

Expand All @@ -391,23 +414,28 @@ func (s *snsSqs) handleMessage(message *sqs.Message, queueInfo *sqsQueueInfo, ha
return fmt.Errorf("error parsing ApproximateReceiveCount from message: %v", message)
}

// if we are over the allowable retry limit, delete the message from the queue
// TODO dead letter queue
if recvCountInt >= s.metadata.messageRetryLimit {
// if we are over the allowable retry limit, and there is no dead-letters queue, delete the message from the queue.
if deadLettersQueueInfo == nil && recvCountInt >= s.metadata.messageRetryLimit {
if innerErr := s.acknowledgeMessage(queueInfo.url, message.ReceiptHandle); innerErr != nil {
return fmt.Errorf("error acknowledging message after receiving the message too many times: %v", innerErr)
return fmt.Errorf("error acknowledging message after receiving the message too many times: %w", innerErr)
}

return fmt.Errorf(
"message received greater than %v times, deleting this message without further processing", s.metadata.messageRetryLimit)
}
// ... else, there is no need to actively do something if we reached the limit defined in messageReceiveLimit as the message had
// already been moved to the dead-letters queue by SQS
if deadLettersQueueInfo != nil && recvCountInt >= s.metadata.messageReceiveLimit {
s.logger.Warnf(
"message received greater than %v times, moving this message without further processing to dead-letters queue: %v", s.metadata.messageReceiveLimit, s.metadata.sqsDeadLettersQueueName)
}

// otherwise try to handle the message
var messageBody snsMessage
err = json.Unmarshal([]byte(*(message.Body)), &messageBody)

if err != nil {
return fmt.Errorf("error unmarshalling message: %v", err)
return fmt.Errorf("error unmarshalling message: %w", err)
}

topic := parseTopicArn(messageBody.TopicArn)
Expand All @@ -418,14 +446,14 @@ func (s *snsSqs) handleMessage(message *sqs.Message, queueInfo *sqsQueueInfo, ha
})

if err != nil {
return fmt.Errorf("error handling message: %v", err)
return fmt.Errorf("error handling message: %w", err)
}

// otherwise, there was no error, acknowledge the message
return s.acknowledgeMessage(queueInfo.url, message.ReceiptHandle)
}

func (s *snsSqs) consumeSubscription(queueInfo *sqsQueueInfo, handler pubsub.Handler) {
func (s *snsSqs) consumeSubscription(queueInfo, deadLettersQueueInfo *sqsQueueInfo, handler pubsub.Handler) {
go func() {
for {
messageResponse, err := s.sqsClient.ReceiveMessage(&sqs.ReceiveMessageInput{
Expand Down Expand Up @@ -454,14 +482,49 @@ func (s *snsSqs) consumeSubscription(queueInfo *sqsQueueInfo, handler pubsub.Han
s.logger.Debugf("%v message(s) received", len(messageResponse.Messages))

for _, m := range messageResponse.Messages {
if err := s.handleMessage(m, queueInfo, handler); err != nil {
if err := s.handleMessage(m, queueInfo, deadLettersQueueInfo, handler); err != nil {
s.logger.Error(err)
}
}
}
}()
}

func (s *snsSqs) createDeadLettersQueue() (*sqsQueueInfo, error) {
var deadLettersQueueInfo *sqsQueueInfo
deadLettersQueueInfo, err := s.getOrCreateQueue(s.metadata.sqsDeadLettersQueueName)
if err != nil {
s.logger.Errorf("error retrieving SQS dead-letter queue: %v", err)

return nil, err
}

return deadLettersQueueInfo, nil
}

func (s *snsSqs) createQueueAttributesWithDeadLetters(queueInfo, deadLettersQueueInfo *sqsQueueInfo) (*sqs.SetQueueAttributesInput, error) {
policy := map[string]string{
"deadLetterTargetArn": deadLettersQueueInfo.arn,
"maxReceiveCount": strconv.FormatInt(s.metadata.messageReceiveLimit, 10),
}

b, err := json.Marshal(policy)
if err != nil {
s.logger.Errorf("error marshalling dead-letters queue policy: %v", err)

return nil, err
}

sqsSetQueueAttributesInput := &sqs.SetQueueAttributesInput{
QueueUrl: &queueInfo.url,
Attributes: map[string]*string{
sqs.QueueAttributeNameRedrivePolicy: aws.String(string(b)),
},
}

return sqsSetQueueAttributesInput, nil
}

func (s *snsSqs) Subscribe(req pubsub.SubscribeRequest, handler pubsub.Handler) error {
// subscribers declare a topic ARN
// and declare a SQS queue to use
Expand All @@ -475,13 +538,41 @@ func (s *snsSqs) Subscribe(req pubsub.SubscribeRequest, handler pubsub.Handler)
}

// this is the ID of the application, it is supplied via runtime as "consumerID"
queueInfo, err := s.getOrCreateQueue(s.metadata.sqsQueueName)
var queueInfo *sqsQueueInfo
queueInfo, err = s.getOrCreateQueue(s.metadata.sqsQueueName)
if err != nil {
s.logger.Errorf("error retrieving SQS queue: %v", err)

return err
}

var deadLettersQueueInfo *sqsQueueInfo
if len(s.metadata.sqsDeadLettersQueueName) > 0 {
var derr error
deadLettersQueueInfo, derr = s.createDeadLettersQueue()
if derr != nil {
s.logger.Errorf("error creating dead-letter queue: %v", derr)

return derr
}

var sqsSetQueueAttributesInput *sqs.SetQueueAttributesInput
sqsSetQueueAttributesInput, derr = s.createQueueAttributesWithDeadLetters(queueInfo, deadLettersQueueInfo)
if derr != nil {
s.logger.Errorf("error creatubg queue attributes for dead-letter queue: %v", derr)

return derr
}
_, derr = s.sqsClient.SetQueueAttributes(sqsSetQueueAttributesInput)
if derr != nil {
s.logger.Errorf("error updating queue attributes with dead-letter queue: %v", derr)

return derr
}
}

// apply the dead letters queue attributes to the current queue

// subscription creation is idempotent. Subscriptions are unique by topic/queue
subscribeOutput, err := s.snsClient.Subscribe(&sns.SubscribeInput{
Attributes: nil,
Expand All @@ -499,7 +590,7 @@ func (s *snsSqs) Subscribe(req pubsub.SubscribeRequest, handler pubsub.Handler)
s.subscriptions = append(s.subscriptions, subscribeOutput.SubscriptionArn)
s.logger.Debugf("Subscribed to topic %s: %v", req.Topic, subscribeOutput)

s.consumeSubscription(queueInfo, handler)
s.consumeSubscription(queueInfo, deadLettersQueueInfo, handler)

return nil
}
Expand Down
Loading