Skip to content
Prev Previous commit
Next Next commit
remove SQS request params from Terminator spec
  • Loading branch information
cjerad committed Apr 5, 2022
commit 0407de64ed7c467ef4a30662b30e8adb70e3ec3b
18 changes: 0 additions & 18 deletions src/api/v1alpha1/terminator_logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,25 +27,7 @@ func (t *TerminatorSpec) MarshalLogObject(enc zapcore.ObjectEncoder) error {
}

func (s SQSSpec) MarshalLogObject(enc zapcore.ObjectEncoder) error {
enc.AddArray("attributeNames", zapcore.ArrayMarshalerFunc(func(enc zapcore.ArrayEncoder) error {
for _, attrName := range s.AttributeNames {
enc.AppendString(attrName)
}
return nil
}))

enc.AddInt64("maxNumberOfMessages", s.MaxNumberOfMessages)

enc.AddArray("messageAttributeNames", zapcore.ArrayMarshalerFunc(func(enc zapcore.ArrayEncoder) error {
for _, attrName := range s.MessageAttributeNames {
enc.AppendString(attrName)
}
return nil
}))

enc.AddString("queueURL", s.QueueURL)
enc.AddInt64("visibilityTimeoutSeconds", s.VisibilityTimeoutSeconds)
enc.AddInt64("waitTimeSeconds", s.WaitTimeSeconds)
return nil
}

Expand Down
7 changes: 1 addition & 6 deletions src/api/v1alpha1/terminator_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,7 @@ type TerminatorSpec struct {
// SQSSpec defines inputs to SQS "receive messages" requests.
type SQSSpec struct {
// https://pkg.go.dev/github.com/aws/[email protected]/service/sqs#ReceiveMessageInput
AttributeNames []string `json:"attributeNames,omitempty"`
MaxNumberOfMessages int64 `json:"maxNumberOfMessages,omitempty"`
MessageAttributeNames []string `json:"messageAttributeNames,omitempty"`
QueueURL string `json:"queueURL,omitempty"`
VisibilityTimeoutSeconds int64 `json:"visibilityTimeoutSeconds,omitempty"`
WaitTimeSeconds int64 `json:"waitTimeSeconds,omitempty"`
QueueURL string `json:"queueURL,omitempty"`
}

// DrainSpec defines inputs to the cordon and drain operations.
Expand Down
39 changes: 0 additions & 39 deletions src/api/v1alpha1/terminator_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package v1alpha1
import (
"context"
"net/url"
"strings"

"github.com/aws/aws-sdk-go/service/sqs"

Expand All @@ -45,46 +44,8 @@ func (t *TerminatorSpec) validate() (errs *apis.FieldError) {
}

func (s *SQSSpec) validate() (errs *apis.FieldError) {
for _, attrName := range s.AttributeNames {
if !knownSQSAttributeNames.Has(attrName) {
errs = errs.Also(apis.ErrInvalidValue(attrName, "attributeNames"))
}
}

// https://github.com/aws/aws-sdk-go/blob/v1.38.55/service/sqs/api.go#L3996-L3999
if s.MaxNumberOfMessages < 1 || 10 < s.MaxNumberOfMessages {
errs = errs.Also(apis.ErrInvalidValue(s.MaxNumberOfMessages, "maxNumberOfMessages", "must be in range 1-10"))
}

// https://github.com/aws/aws-sdk-go/blob/v1.38.55/service/sqs/api.go#L4001-L4021
//
// Simple checks are done below. More indepth checks are left to the SQS client/service.
for _, attrName := range s.MessageAttributeNames {
if len(attrName) > 256 {
errs = errs.Also(apis.ErrInvalidValue(attrName, "messageAttributeNames", "must be 256 characters or less"))
}

lcAttrName := strings.ToLower(attrName)
if strings.HasPrefix(lcAttrName, "aws") || strings.HasPrefix(lcAttrName, "amazon") {
errs = errs.Also(apis.ErrInvalidValue(attrName, "messageAttributeNames", `must not use reserved prefixes "AWS" or "Amazon"`))
}

if strings.HasPrefix(attrName, ".") || strings.HasSuffix(attrName, ".") {
errs = errs.Also(apis.ErrInvalidValue(attrName, "messageAttributeNames", "must not begin or end with a period (.)"))
}
}

if _, err := url.Parse(s.QueueURL); err != nil {
errs = errs.Also(apis.ErrInvalidValue(s.QueueURL, "queueURL", "must be a valid URL"))
}

if s.VisibilityTimeoutSeconds < 0 {
errs = errs.Also(apis.ErrInvalidValue(s.VisibilityTimeoutSeconds, "visibilityTimeoutSeconds", "must be zero or greater"))
}

if s.WaitTimeSeconds < 0 {
errs = errs.Also(apis.ErrInvalidValue(s.WaitTimeSeconds, "waitTimeSeconds", "must be zero or greater"))
}

return errs
}
10 changes: 0 additions & 10 deletions src/api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -40,82 +40,6 @@ spec:
description: AWS SQS queue configuration.
type: object
properties:
attributeNames:
description: |
A list of attributes that need to be returned along with each message. These
attributes include:

* All – Returns all values.

* ApproximateFirstReceiveTimestamp – Returns the time the message was
first received from the queue (epoch time (http://en.wikipedia.org/wiki/Unix_time)
in milliseconds).

* ApproximateReceiveCount – Returns the number of times a message has
been received across all queues but not deleted.

* AWSTraceHeader – Returns the AWS X-Ray trace header string.

* SenderId For an IAM user, returns the IAM user ID, for example ABCDEFGHI1JKLMNOPQ23R.
For an IAM role, returns the IAM role ID, for example ABCDE1F2GH3I4JK5LMNOP:i-a123b456.

* SentTimestamp – Returns the time the message was sent to the queue
(epoch time (http://en.wikipedia.org/wiki/Unix_time) in milliseconds).

* MessageDeduplicationId – Returns the value provided by the producer
that calls the SendMessage action.

* MessageGroupId – Returns the value provided by the producer that calls
the SendMessage action. Messages with the same MessageGroupId are returned
in sequence.

* SequenceNumber – Returns the value provided by Amazon SQS.
type: array
items:
type: string
{{- with .Values.terminator.defaults.sqs.attributeNames }}
default:
{{- toYaml . | nindent 22 }}
{{- end }}
maxNumberOfMessages:
description: |
The maximum number of messages to return. Amazon SQS never returns more messages
than this value (however, fewer messages might be returned). Valid values:
1 to 10.
type: integer
format: int64
{{- with .Values.terminator.defaults.sqs.maxNumberOfMessages }}
default: {{ . }}
{{- end }}
messageAttributeNames:
description: |
The name of the message attribute, where N is the index.

* The name can contain alphanumeric characters and the underscore (_),
hyphen (-), and period (.).

* The name is case-sensitive and must be unique among all attribute names
for the message.

* The name must not start with AWS-reserved prefixes such as AWS. or Amazon.
(or any casing variants).

* The name must not start or end with a period (.), and it should not
have periods in succession (..).

* The name can be up to 256 characters long.

When using ReceiveMessage, you can send a list of attribute names to receive,
or you can return all of the attributes by specifying All or .* in your request.
You can also use all message attributes starting with a prefix, for example
bar.*.
type: array
items:
type: string
{{- with .Values.terminator.defaults.sqs.messageAttributeNames }}
default:
{{- toYaml . | nindent 22 }}
{{- end }}
queueURL:
description: |
The URL of the Amazon SQS queue from which messages are received.
Expand All @@ -124,33 +48,6 @@ spec:

* QueueURL is a required field
type: string
visibilityTimeoutSeconds:
description: |
The duration (in seconds) that the received messages are hidden from subsequent
retrieve requests after being retrieved by a ReceiveMessage request.
type: integer
format: int64
{{- with .Values.terminator.defaults.sqs.visibilityTimeoutSeconds }}
default: {{ . }}
{{- end }}
waitTimeSeconds:
description: |
The duration (in seconds) for which the call waits for a message to arrive
in the queue before returning. If a message is available, the call returns
sooner than WaitTimeSeconds. If no messages are available and the wait time
expires, the call returns successfully with an empty list of messages.

To avoid HTTP errors, ensure that the HTTP response timeout for ReceiveMessage
requests is longer than the WaitTimeSeconds parameter. For example, with
the Java SDK, you can set HTTP transport settings using the NettyNioAsyncHttpClient
(https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/nio/netty/NettyNioAsyncHttpClient.html)
for asynchronous clients, or the ApacheHttpClient (https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/apache/ApacheHttpClient.html)
for synchronous clients.
type: integer
format: int64
{{- with .Values.terminator.defaults.sqs.waitTimeSeconds }}
default: {{ . }}
{{- end }}
drain:
description: TBD
type: object
Expand Down
8 changes: 0 additions & 8 deletions src/charts/aws-node-termination-handler-2/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,6 @@ annotations: {}

terminator:
defaults:
sqs:
attributeNames:
- SentTimestamp
maxNumberOfMessages: 10
messageAttributeNames:
- All
visibilityTimeoutSeconds: 20
waitTimeSeconds: 20
drain:
force: true
gracePeriodSeconds: -1
Expand Down
20 changes: 9 additions & 11 deletions src/pkg/terminator/adapter/sqsclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,18 +46,16 @@ type (

func (s SQSMessageClientBuilder) NewSQSClient(terminator *v1alpha1.Terminator) (terminator.SQSClient, error) {
receiveMessageInput := sqs.ReceiveMessageInput{
MaxNumberOfMessages: aws.Int64(terminator.Spec.SQS.MaxNumberOfMessages),
QueueUrl: aws.String(terminator.Spec.SQS.QueueURL),
VisibilityTimeout: aws.Int64(terminator.Spec.SQS.VisibilityTimeoutSeconds),
WaitTimeSeconds: aws.Int64(terminator.Spec.SQS.WaitTimeSeconds),
}
receiveMessageInput.AttributeNames = make([]*string, len(terminator.Spec.SQS.AttributeNames))
for i, attrName := range terminator.Spec.SQS.AttributeNames {
receiveMessageInput.AttributeNames[i] = aws.String(attrName)
}
receiveMessageInput.MessageAttributeNames = make([]*string, len(terminator.Spec.SQS.MessageAttributeNames))
for i, attrName := range terminator.Spec.SQS.MessageAttributeNames {
receiveMessageInput.MessageAttributeNames[i] = aws.String(attrName)
MaxNumberOfMessages: aws.Int64(10),
VisibilityTimeout: aws.Int64(20), // Seconds
WaitTimeSeconds: aws.Int64(20), // Seconds, maximum for long polling
AttributeNames: []*string{
aws.String(sqs.MessageSystemAttributeNameSentTimestamp),
},
MessageAttributeNames: []*string{
aws.String(sqs.QueueAttributeNameAll),
},
}

deleteMessageInput := sqs.DeleteMessageInput{
Expand Down
15 changes: 5 additions & 10 deletions src/test/reconciliation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1195,26 +1195,21 @@ var _ = Describe("Reconciliation", func() {

When("getting messages from a terminator's SQS queue", func() {
const (
maxNumberOfMessages = int64(4)
visibilityTimeoutSeconds = int64(17)
waitTimeSeconds = int64(31)
maxNumberOfMessages = int64(10)
visibilityTimeoutSeconds = int64(20)
waitTimeSeconds = int64(20)
)
var (
attributeNames = []string{"TestAttributeName1", "TestAttributeName2"}
messageAttributeNames = []string{"TestMsgAttributeName1", "TestMsgAttributeName2"}
attributeNames = []string{sqs.MessageSystemAttributeNameSentTimestamp}
messageAttributeNames = []string{sqs.QueueAttributeNameAll}
input *sqs.ReceiveMessageInput
)

BeforeEach(func() {
terminator, found := terminators[terminatorNamespaceName]
Expect(found).To(BeTrue())

terminator.Spec.SQS.MaxNumberOfMessages = maxNumberOfMessages
terminator.Spec.SQS.QueueURL = queueURL
terminator.Spec.SQS.VisibilityTimeoutSeconds = visibilityTimeoutSeconds
terminator.Spec.SQS.WaitTimeSeconds = waitTimeSeconds
terminator.Spec.SQS.AttributeNames = append([]string{}, attributeNames...)
terminator.Spec.SQS.MessageAttributeNames = append([]string{}, messageAttributeNames...)

defaultReceiveSQSMessageFunc := receiveSQSMessageFunc
receiveSQSMessageFunc = func(ctx aws.Context, in *sqs.ReceiveMessageInput, options ...awsrequest.Option) (*sqs.ReceiveMessageOutput, error) {
Expand Down