-
Notifications
You must be signed in to change notification settings - Fork 543
pubsub/aws/snssqs to support SQS dead-letter queue #1066
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 32 commits
Commits
Show all changes
42 commits
Select commit
Hold shift + click to select a range
bcaa9bb
bugfix for sns topic deletion upon termination
62ac6b0
Revert "bugfix for sns topic deletion upon termination"
de891cf
wip on normalizing queue/topic names
d4d2df7
sanitize queue and topic names
d3e08e7
sanitized names. bugfix for close
5c9949b
# This is a combination of 4 commits.
mthmulders f300ab3
Merge branch 'sns-sqs-topics' of https://github.com/amimimor/componen…
408fdcd
removed debug message
f3bee8a
raw string abort
be550bf
merge issues solved
7b7dc93
wip
cb0f2e5
gofmt+remove regex and use byte iter
1c82e85
Merge branch 'sns-sqs-topics' into dead-letters
dd0c6e2
wip. first impl of dead-letters queue config
6c3e283
wip. refactor and fallback values
e38572a
Merge remote-tracking branch 'upstream/master' into dead-letters
b4692c9
integration test wip
58ad8a9
wip integration test
46bbbc7
wip integration
43b6e9e
Merge remote-tracking branch 'upstream/master' into dead-letters
582dfb8
Merge remote-tracking branch 'upstream/master' into dead-letters
3cfa704
wip on testing
c5301a6
Merge remote-tracking branch 'upstream/master' into dead-letters
aace0a1
wip
f36e268
still buggy but wip!
eaf769e
bugfix in dlq creation
e88ae91
working. still bug in subscription clean up
06c2ff2
Update snssqs_integ_test.go
207c203
golangci-lint fixes
2b29a40
golangci-lint refactoring
98222d9
trying to skip running integrations for snssqs
a4b5c26
testing
39ca9ba
Merge remote-tracking branch 'upstream/master' into dead-letters
d17746f
code review fixes
fc73805
Update snssqs.go
aca1b3e
Merge remote-tracking branch 'upstream/master' into dead-letters
ec5c12f
Merge branch 'master' into dead-letters
artursouza eba168c
integ removed, renaming back of const
811c4b2
Merge branch 'dead-letters' of https://github.com/amimimor/components…
2722455
Merge branch 'master' into dead-letters
artursouza 8f4f6ab
Merge branch 'master' into dead-letters
dapr-bot e5d5b37
Merge branch 'master' into dead-letters
dapr-bot File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -38,7 +38,8 @@ type sqsQueueInfo struct { | |
| type snsSqsMetadata struct { | ||
| // name of the queue for this application. The is provided by the runtime as "consumerID" | ||
| sqsQueueName string | ||
|
|
||
| // name of the dead letter queue for this application | ||
| sqsDeadLettersQueueName string | ||
| // aws endpoint for the component to use. | ||
| Endpoint string | ||
| // access key to use for accessing sqs/sns | ||
|
|
@@ -54,15 +55,19 @@ type snsSqsMetadata struct { | |
| messageVisibilityTimeout int64 | ||
| // number of times to resend a message after processing of that message fails before removing that message from the queue. Default: 10 | ||
| messageRetryLimit int64 | ||
| // if sqsDeadLettersQueueName is set to a value, then the deadLettersMaxReceives defines the number of times a message is received | ||
| // before it is moved to the dead-letters queue. This value must be smaller than messageRetryLimit | ||
| deadLettersMaxReceives int64 | ||
| // amount of time to await receipt of a message before making another request. Default: 1 | ||
| messageWaitTimeSeconds int64 | ||
| // maximum number of messages to receive from the queue at a time. Default: 10, Maximum: 10 | ||
| messageMaxNumber int64 | ||
| } | ||
|
|
||
| const ( | ||
| awsSqsQueueNameKey = "dapr-queue-name" | ||
| awsSnsTopicNameKey = "dapr-topic-name" | ||
| awsSqsQueueNameKey = "dapr-worker-name" | ||
| awsSnsTopicNameKey = "dapr-topic-name" | ||
| awsSqsDeadLettersQueueName = "dapr-deadletters" | ||
| ) | ||
|
|
||
| func NewSnsSqs(l logger.Logger) pubsub.PubSub { | ||
|
|
@@ -86,7 +91,7 @@ func getAliasedProperty(aliases []string, metadata pubsub.Metadata) (string, boo | |
| func parseInt64(input string, propertyName string) (int64, error) { | ||
| number, err := strconv.Atoi(input) | ||
| if err != nil { | ||
| return -1, fmt.Errorf("parsing %s failed with: %v", propertyName, err) | ||
| return -1, fmt.Errorf("parsing %s failed with: %w", propertyName, err) | ||
| } | ||
|
|
||
| return int64(number), nil | ||
|
|
@@ -175,6 +180,33 @@ func (s *snsSqs) getSnsSqsMetatdata(metadata pubsub.Metadata) (*snsSqsMetadata, | |
| md.messageRetryLimit = retryLimit | ||
| } | ||
|
|
||
| if val, ok := getAliasedProperty([]string{"sqsDeadLettersQueueName"}, metadata); ok { | ||
| md.sqsDeadLettersQueueName = val | ||
| } | ||
|
|
||
| if val, ok := getAliasedProperty([]string{"deadLettersMaxReceives"}, metadata); !ok { | ||
| md.deadLettersMaxReceives = md.messageRetryLimit | ||
|
||
| } else { | ||
| // fallback: use default dead-letters queue name if deadLettersMaxReceives is defined but the sqsDeadLettersQueueName isn't | ||
| if len(md.sqsDeadLettersQueueName) == 0 { | ||
| md.sqsDeadLettersQueueName = awsSqsDeadLettersQueueName | ||
| } | ||
|
|
||
| deadLettersMaxReceives, err := parseInt64(val, "deadLettersMaxReceives") | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
| // validate: if deadLettersMaxReceives is greater than messageRetryLimit, the message would be deleted by daprd before | ||
| // SQS has the opportunity to move the message to the dead-letters queue, so we reject this | ||
| if deadLettersMaxReceives > md.messageRetryLimit { | ||
| return nil, errors.New("deadLettersMaxReceives must be less than or equal to messageRetryLimit") | ||
| } | ||
|
|
||
| // assign: used provided configuration | ||
| md.deadLettersMaxReceives = deadLettersMaxReceives | ||
| } | ||
|
|
||
| if val, ok := props["messageWaitTimeSeconds"]; !ok { | ||
| md.messageWaitTimeSeconds = 1 | ||
| } else { | ||
|
|
@@ -377,7 +409,7 @@ func (s *snsSqs) acknowledgeMessage(queueURL string, receiptHandle *string) erro | |
| return err | ||
| } | ||
|
|
||
| func (s *snsSqs) handleMessage(message *sqs.Message, queueInfo *sqsQueueInfo, handler pubsub.Handler) error { | ||
| func (s *snsSqs) handleMessage(message *sqs.Message, queueInfo, deadLettersQueueInfo *sqsQueueInfo, handler pubsub.Handler) error { | ||
| // if this message has been received > x times, delete from queue, it's borked | ||
| recvCount, ok := message.Attributes[sqs.MessageSystemAttributeNameApproximateReceiveCount] | ||
|
|
||
|
|
@@ -391,23 +423,28 @@ func (s *snsSqs) handleMessage(message *sqs.Message, queueInfo *sqsQueueInfo, ha | |
| return fmt.Errorf("error parsing ApproximateReceiveCount from message: %v", message) | ||
| } | ||
|
|
||
| // if we are over the allowable retry limit, delete the message from the queue | ||
| // TODO dead letter queue | ||
| if recvCountInt >= s.metadata.messageRetryLimit { | ||
| // if we are over the allowable retry limit, and there is no dead-letters queue, delete the message from the queue. | ||
| if deadLettersQueueInfo == nil && recvCountInt >= s.metadata.messageRetryLimit { | ||
| if innerErr := s.acknowledgeMessage(queueInfo.url, message.ReceiptHandle); innerErr != nil { | ||
| return fmt.Errorf("error acknowledging message after receiving the message too many times: %v", innerErr) | ||
| return fmt.Errorf("error acknowledging message after receiving the message too many times: %w", innerErr) | ||
| } | ||
|
|
||
| return fmt.Errorf( | ||
| "message received greater than %v times, deleting this message without further processing", s.metadata.messageRetryLimit) | ||
| } | ||
| // ... else, there is no need to actively do something if we reached the limit defined in deadLettersMaxReceives as the message had | ||
| // already been moved to the dead-letters queue by SQS | ||
| if deadLettersQueueInfo != nil && recvCountInt >= s.metadata.deadLettersMaxReceives { | ||
| s.logger.Warnf( | ||
| "message received greater than %v times, moving this message without further processing to dead-letters queue: %v", s.metadata.deadLettersMaxReceives, s.metadata.sqsDeadLettersQueueName) | ||
| } | ||
|
|
||
| // otherwise try to handle the message | ||
| var messageBody snsMessage | ||
| err = json.Unmarshal([]byte(*(message.Body)), &messageBody) | ||
|
|
||
| if err != nil { | ||
| return fmt.Errorf("error unmarshalling message: %v", err) | ||
| return fmt.Errorf("error unmarshalling message: %w", err) | ||
| } | ||
|
|
||
| topic := parseTopicArn(messageBody.TopicArn) | ||
|
|
@@ -418,14 +455,14 @@ func (s *snsSqs) handleMessage(message *sqs.Message, queueInfo *sqsQueueInfo, ha | |
| }) | ||
|
|
||
| if err != nil { | ||
| return fmt.Errorf("error handling message: %v", err) | ||
| return fmt.Errorf("error handling message: %w", err) | ||
| } | ||
|
|
||
| // otherwise, there was no error, acknowledge the message | ||
| return s.acknowledgeMessage(queueInfo.url, message.ReceiptHandle) | ||
| } | ||
|
|
||
| func (s *snsSqs) consumeSubscription(queueInfo *sqsQueueInfo, handler pubsub.Handler) { | ||
| func (s *snsSqs) consumeSubscription(queueInfo, deadLettersQueueInfo *sqsQueueInfo, handler pubsub.Handler) { | ||
| go func() { | ||
| for { | ||
| messageResponse, err := s.sqsClient.ReceiveMessage(&sqs.ReceiveMessageInput{ | ||
|
|
@@ -454,14 +491,49 @@ func (s *snsSqs) consumeSubscription(queueInfo *sqsQueueInfo, handler pubsub.Han | |
| s.logger.Debugf("%v message(s) received", len(messageResponse.Messages)) | ||
|
|
||
| for _, m := range messageResponse.Messages { | ||
| if err := s.handleMessage(m, queueInfo, handler); err != nil { | ||
| if err := s.handleMessage(m, queueInfo, deadLettersQueueInfo, handler); err != nil { | ||
| s.logger.Error(err) | ||
| } | ||
| } | ||
| } | ||
| }() | ||
| } | ||
|
|
||
| func (s *snsSqs) createDeadLettersQueue() (*sqsQueueInfo, error) { | ||
| var deadLettersQueueInfo *sqsQueueInfo | ||
| deadLettersQueueInfo, err := s.getOrCreateQueue(s.metadata.sqsDeadLettersQueueName) | ||
| if err != nil { | ||
| s.logger.Errorf("error retrieving SQS dead-letter queue: %v", err) | ||
|
|
||
| return nil, err | ||
| } | ||
|
|
||
| return deadLettersQueueInfo, nil | ||
| } | ||
|
|
||
| func (s *snsSqs) createQueueAttributesWithDeadLetters(queueInfo, deadLettersQueueInfo *sqsQueueInfo) (*sqs.SetQueueAttributesInput, error) { | ||
| policy := map[string]string{ | ||
| "deadLetterTargetArn": deadLettersQueueInfo.arn, | ||
| "maxReceiveCount": strconv.FormatInt(s.metadata.deadLettersMaxReceives, 10), | ||
| } | ||
|
|
||
| b, err := json.Marshal(policy) | ||
| if err != nil { | ||
| s.logger.Errorf("error marshalling dead-letters queue policy: %v", err) | ||
|
|
||
| return nil, err | ||
| } | ||
|
|
||
| sqsSetQueueAttributesInput := &sqs.SetQueueAttributesInput{ | ||
| QueueUrl: &queueInfo.url, | ||
| Attributes: map[string]*string{ | ||
| sqs.QueueAttributeNameRedrivePolicy: aws.String(string(b)), | ||
| }, | ||
| } | ||
|
|
||
| return sqsSetQueueAttributesInput, nil | ||
| } | ||
|
|
||
| func (s *snsSqs) Subscribe(req pubsub.SubscribeRequest, handler pubsub.Handler) error { | ||
| // subscribers declare a topic ARN | ||
| // and declare a SQS queue to use | ||
|
|
@@ -475,13 +547,41 @@ func (s *snsSqs) Subscribe(req pubsub.SubscribeRequest, handler pubsub.Handler) | |
| } | ||
|
|
||
| // this is the ID of the application, it is supplied via runtime as "consumerID" | ||
| queueInfo, err := s.getOrCreateQueue(s.metadata.sqsQueueName) | ||
| var queueInfo *sqsQueueInfo | ||
| queueInfo, err = s.getOrCreateQueue(s.metadata.sqsQueueName) | ||
| if err != nil { | ||
| s.logger.Errorf("error retrieving SQS queue: %v", err) | ||
|
|
||
| return err | ||
| } | ||
|
|
||
| var deadLettersQueueInfo *sqsQueueInfo | ||
| if len(s.metadata.sqsDeadLettersQueueName) > 0 { | ||
| var derr error | ||
| deadLettersQueueInfo, derr = s.createDeadLettersQueue() | ||
| if derr != nil { | ||
| s.logger.Errorf("error creating dead-letter queue: %v", derr) | ||
|
|
||
| return derr | ||
| } | ||
|
|
||
| var sqsSetQueueAttributesInput *sqs.SetQueueAttributesInput | ||
| sqsSetQueueAttributesInput, derr = s.createQueueAttributesWithDeadLetters(queueInfo, deadLettersQueueInfo) | ||
| if derr != nil { | ||
| s.logger.Errorf("error creatubg queue attributes for dead-letter queue: %v", derr) | ||
|
|
||
| return derr | ||
| } | ||
| _, derr = s.sqsClient.SetQueueAttributes(sqsSetQueueAttributesInput) | ||
| if derr != nil { | ||
| s.logger.Errorf("error updating queue attributes with dead-letter queue: %v", derr) | ||
|
|
||
| return derr | ||
| } | ||
| } | ||
|
|
||
| // apply the dead letters queue attributes to the current queue | ||
|
|
||
| // subscription creation is idempotent. Subscriptions are unique by topic/queue | ||
| subscribeOutput, err := s.snsClient.Subscribe(&sns.SubscribeInput{ | ||
| Attributes: nil, | ||
|
|
@@ -499,7 +599,7 @@ func (s *snsSqs) Subscribe(req pubsub.SubscribeRequest, handler pubsub.Handler) | |
| s.subscriptions = append(s.subscriptions, subscribeOutput.SubscriptionArn) | ||
| s.logger.Debugf("Subscribed to topic %s: %v", req.Topic, subscribeOutput) | ||
|
|
||
| s.consumeSubscription(queueInfo, handler) | ||
| s.consumeSubscription(queueInfo, deadLettersQueueInfo, handler) | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can this be renamed to messageReceiveLimit?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure. I borrowed it from the AWS official param. but your suggestion makes sense