Skip to content
Prev Previous commit
Next Next commit
tidy up interface organization
  • Loading branch information
cjerad committed Apr 5, 2022
commit c4ee5fb39a54dc7ddd01aecdd67e7304ffc99d78
86 changes: 24 additions & 62 deletions src/cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ import (
nodename "github.com/aws/aws-node-termination-handler/pkg/node/name"
"github.com/aws/aws-node-termination-handler/pkg/sqsmessage"
"github.com/aws/aws-node-termination-handler/pkg/terminator"
terminatoradapter "github.com/aws/aws-node-termination-handler/pkg/terminator/adapter"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/ec2metadata"
Expand Down Expand Up @@ -146,71 +147,32 @@ func main() {
logger.Fatal("failed to create EC2 client")
}

nodeGetter, err := node.NewGetter(kubeClient)
if err != nil {
logger.With("error", err).Fatal("failed to create node getter")
}
nodeNameGetter, err := nodename.NewGetter(ec2Client)
if err != nil {
logger.With("error", err).Fatal("failed to create node name getter")
}

asgTerminateEventV1Parser, err := asgterminateeventv1.NewParser(asgClient)
if err != nil {
logger.With("error", err).Fatal("failed to create ASG instance-terminate lifecycle event v1 parser")
}
asgTerminateEventV2Parser, err := asgterminateeventv2.NewParser(asgClient)
if err != nil {
logger.With("error", err).Fatal("failed to create ASG instance-terminate lifecycle event v2 parser")
}
sqsMessageParser, err := terminator.NewSQSMessageParser(event.NewParser(
asgTerminateEventV1Parser,
asgTerminateEventV2Parser,
rebalancerecommendationeventv0.NewParser(),
scheduledchangeeventv1.NewParser(),
spotinterruptioneventv1.NewParser(),
statechangeeventv1.NewParser(),
))
if err != nil {
logger.With("error", err).Fatal("failed to create SQS message parser")
}

terminatorGetter, err := terminator.NewGetter(kubeClient)
if err != nil {
logger.With("error", err).Fatal("failed to create terminator getter")
}

sqsMessageClient, err := sqsmessage.NewClient(sqsClient)
if err != nil {
logger.With("error", err).Fatal("failed to create SQS message client")
}
terminatorSQSClientBuilder, err := terminator.NewSQSClientBuilder(sqsMessageClient)
if err != nil {
logger.With("error", err).Fatal("failed to create terminator SQS message client builder")
}

cordonDrainerBuilder, err := kubectlcordondrainer.NewBuilder(
clientSet,
kubectlcordondrainer.DefaultCordoner,
kubectlcordondrainer.DefaultDrainer,
eventParser := event.NewAggregatedParser(
asgterminateeventv1.Parser{ASGLifecycleActionCompleter: asgClient},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what do the versions (v0, v1, v2) correspond to? Is it necessary to have both asgterminateeventv1 and asgterminateeventv2?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It corresponds to the value of the "version" field from the EventBridge message (stored in AWSMetadata.Version). Each event type has a versioned schema defined in EventBridge, e.g. here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahh gotcha, I feel like I'm getting lost in the versions here: CRD/API version, app version, module/pkg version, and now EventBridge schema version.

With that said, I'm still leaning towards removing the version (v0, v1) from the project directories/file paths. I don't think the schema version needs to be exposed in the project, but instead handled within an event's Parse logic. For example with asgterminate event, Parse would try to unmarshal into v1 struct first.. if that fails try v2.

  • Another idea (related to my other comment) is a "super" struct AsgTerminate that is a superset of all fields, unmarshal into that, and then check field(s) for version.

Last concern with breaking out by schema version is if v3 is released tomorrow with very minimal changes, then we need to create a new directory and set of files that would be mostly copy/paste which I don't think is necessary.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if v3 is released tomorrow with very minimal changes [emphasis mine]

It is also possible that new versions would include new and important data fields, or require new API calls.

I think the question is: what should NTH do if it encounters a newer version number of a known event type?

  1. Ignore it
    • This is the currently implemented behavior; an error is logged
  2. Handle it the same as the previous version
    • Should probably log a warning about the unknown version in case there is a failure later

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the question is: what should NTH do if it encounters a newer version number of a known event type?

  1. Ignore it
    This is the currently implemented behavior; an error is logged
  2. Handle it the same as the previous version
    Should probably log a warning about the unknown version in case there is a failure later

I'd go with number 2 and make it a best effort to handle it (or maybe configurable?). However, I think regardless of how we implement unknown version handling we don't need to expose the version at the package level.

asgterminateeventv2.Parser{ASGLifecycleActionCompleter: asgClient},
rebalancerecommendationeventv0.Parser{},
scheduledchangeeventv1.Parser{},
spotinterruptioneventv1.Parser{},
statechangeeventv1.Parser{},
)
if err != nil {
logger.With("error", err).Fatal("failed to create kubectl cordon/drain client")
}
terminatorCordonDrainerBuilder, err := terminator.NewCordonDrainerBuilder(cordonDrainerBuilder)
if err != nil {
logger.With("error", err).Fatal("failed to create terminator cordon/drain client")
}

rec := terminator.Reconciler{
Name: "terminator",
RequeueInterval: time.Duration(10) * time.Second,
NodeGetter: nodeGetter,
NodeNameGetter: nodeNameGetter,
SQSClientBuilder: terminatorSQSClientBuilder,
SQSMessageParser: sqsMessageParser,
Getter: terminatorGetter,
CordonDrainerBuilder: terminatorCordonDrainerBuilder,
Name: "terminator",
RequeueInterval: time.Duration(10) * time.Second,
NodeGetter: node.Getter{KubeGetter: kubeClient},
NodeNameGetter: nodename.Getter{EC2InstancesDescriber: ec2Client},
SQSClientBuilder: terminatoradapter.SQSMessageClientBuilder{
SQSMessageClient: sqsmessage.Client{SQSClient: sqsClient},
},
SQSMessageParser: terminatoradapter.EventParser{Parser: eventParser},
Getter: terminatoradapter.Getter{KubeGetter: kubeClient},
CordonDrainerBuilder: terminatoradapter.CordonDrainerBuilder{
Builder: kubectlcordondrainer.Builder{
ClientSet: clientSet,
Cordoner: kubectlcordondrainer.DefaultCordoner,
Drainer: kubectlcordondrainer.DefaultDrainer,
},
},
}
if err = rec.BuildController(
ctrl.NewControllerManagedBy(mgr).
Expand Down
15 changes: 11 additions & 4 deletions src/pkg/event/parser.go → src/pkg/event/aggregatedparser.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,22 @@ import (
"encoding/json"

"github.com/aws/aws-node-termination-handler/pkg/logging"
"github.com/aws/aws-node-termination-handler/pkg/terminator"
)

type parser []Parser
type (
Parser interface {
Parse(context.Context, string) terminator.Event
}

AggregatedParser []Parser
)

func NewParser(parsers ...Parser) Parser {
return parser(parsers)
func NewAggregatedParser(parsers ...Parser) AggregatedParser {
return AggregatedParser(parsers)
}

func (p parser) Parse(ctx context.Context, str string) Event {
func (p AggregatedParser) Parse(ctx context.Context, str string) terminator.Event {
ctx = logging.WithLogger(ctx, logging.FromContext(ctx).Named("event.parser"))

if str == "" {
Expand Down
14 changes: 3 additions & 11 deletions src/pkg/event/asgterminate/v1/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@ package v1
import (
"context"
"encoding/json"
"fmt"

"github.com/aws/aws-node-termination-handler/pkg/event"
"github.com/aws/aws-node-termination-handler/pkg/logging"
"github.com/aws/aws-node-termination-handler/pkg/terminator"
)

const (
Expand All @@ -32,18 +31,11 @@ const (
acceptedTransition = "autoscaling:EC2_INSTANCE_TERMINATING"
)

type parser struct {
type Parser struct {
ASGLifecycleActionCompleter
}

func NewParser(completer ASGLifecycleActionCompleter) (event.Parser, error) {
if completer == nil {
return nil, fmt.Errorf("argument 'completer' is nil")
}
return parser{ASGLifecycleActionCompleter: completer}, nil
}

func (p parser) Parse(ctx context.Context, str string) event.Event {
func (p Parser) Parse(ctx context.Context, str string) terminator.Event {
ctx = logging.WithLogger(ctx, logging.FromContext(ctx).Named("asgTerminateLifecycleAction.v1"))

evt := EC2InstanceTerminateLifecycleAction{
Expand Down
14 changes: 3 additions & 11 deletions src/pkg/event/asgterminate/v2/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@ package v2
import (
"context"
"encoding/json"
"fmt"

"github.com/aws/aws-node-termination-handler/pkg/event"
"github.com/aws/aws-node-termination-handler/pkg/logging"
"github.com/aws/aws-node-termination-handler/pkg/terminator"
)

const (
Expand All @@ -32,18 +31,11 @@ const (
acceptedTransition = "autoscaling:EC2_INSTANCE_TERMINATING"
)

type parser struct {
type Parser struct {
ASGLifecycleActionCompleter
}

func NewParser(completer ASGLifecycleActionCompleter) (event.Parser, error) {
if completer == nil {
return nil, fmt.Errorf("argument 'completer' is nil")
}
return parser{ASGLifecycleActionCompleter: completer}, nil
}

func (p parser) Parse(ctx context.Context, str string) event.Event {
func (p Parser) Parse(ctx context.Context, str string) terminator.Event {
ctx = logging.WithLogger(ctx, logging.FromContext(ctx).Named("asgTerminateLifecycleAction.v2"))

evt := EC2InstanceTerminateLifecycleAction{
Expand Down
8 changes: 3 additions & 5 deletions src/pkg/event/rebalancerecommendation/v0/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import (
"context"
"encoding/json"

"github.com/aws/aws-node-termination-handler/pkg/event"
"github.com/aws/aws-node-termination-handler/pkg/logging"
"github.com/aws/aws-node-termination-handler/pkg/terminator"
)

const (
Expand All @@ -30,11 +30,9 @@ const (
version = "0"
)

func NewParser() event.Parser {
return event.ParserFunc(parse)
}
type Parser struct{}

func parse(ctx context.Context, str string) event.Event {
func (Parser) Parse(ctx context.Context, str string) terminator.Event {
ctx = logging.WithLogger(ctx, logging.FromContext(ctx).Named("rebalanceRecommendation.v0"))

evt := EC2InstanceRebalanceRecommendation{}
Expand Down
8 changes: 3 additions & 5 deletions src/pkg/event/scheduledchange/v1/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import (
"context"
"encoding/json"

"github.com/aws/aws-node-termination-handler/pkg/event"
"github.com/aws/aws-node-termination-handler/pkg/logging"
"github.com/aws/aws-node-termination-handler/pkg/terminator"
)

const (
Expand All @@ -32,11 +32,9 @@ const (
acceptedEventTypeCategory = "scheduledChange"
)

func NewParser() event.Parser {
return event.ParserFunc(parse)
}
type Parser struct{}

func parse(ctx context.Context, str string) event.Event {
func (Parser) Parse(ctx context.Context, str string) terminator.Event {
ctx = logging.WithLogger(ctx, logging.FromContext(ctx).Named("scheduledChange.v1"))

evt := AWSHealthEvent{}
Expand Down
8 changes: 3 additions & 5 deletions src/pkg/event/spotinterruption/v1/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import (
"context"
"encoding/json"

"github.com/aws/aws-node-termination-handler/pkg/event"
"github.com/aws/aws-node-termination-handler/pkg/logging"
"github.com/aws/aws-node-termination-handler/pkg/terminator"
)

const (
Expand All @@ -30,11 +30,9 @@ const (
version = "1"
)

func NewParser() event.Parser {
return event.ParserFunc(parse)
}
type Parser struct{}

func parse(ctx context.Context, str string) event.Event {
func (Parser) Parse(ctx context.Context, str string) terminator.Event {
ctx = logging.WithLogger(ctx, logging.FromContext(ctx).Named("spotInterruption.v1"))

evt := EC2SpotInstanceInterruptionWarning{}
Expand Down
8 changes: 3 additions & 5 deletions src/pkg/event/statechange/v1/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import (
"encoding/json"
"strings"

"github.com/aws/aws-node-termination-handler/pkg/event"
"github.com/aws/aws-node-termination-handler/pkg/logging"
"github.com/aws/aws-node-termination-handler/pkg/terminator"
)

const (
Expand All @@ -34,11 +34,9 @@ const (

var acceptedStatesList = strings.Split(acceptedStates, ",")

func NewParser() event.Parser {
return event.ParserFunc(parse)
}
type Parser struct{}

func parse(ctx context.Context, str string) event.Event {
func (Parser) Parse(ctx context.Context, str string) terminator.Event {
ctx = logging.WithLogger(ctx, logging.FromContext(ctx).Named("stateChange.v1"))

evt := EC2InstanceStateChangeNotification{}
Expand Down
36 changes: 0 additions & 36 deletions src/pkg/event/types.go

This file was deleted.

14 changes: 5 additions & 9 deletions src/pkg/logging/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,28 +17,24 @@ limitations under the License.
package logging

import (
"io"
"fmt"
"strings"

"go.uber.org/zap"
)

// loggerWriter adapts a logger to an `io.Writer`.
type loggerWriter struct {
// Writer adapts a logger to an `io.Writer`.
type Writer struct {
*zap.SugaredLogger
}

func NewWriter(logger *zap.SugaredLogger) io.Writer {
return loggerWriter{SugaredLogger: logger}
}

// Write converts `buf` to a string and sends it to the underlying logger.
// If the string beings with "warn" or "error" (case in-sensitive) the message
// will be logged at the corresponding level; otherwise the level will be
// "info".
func (w loggerWriter) Write(buf []byte) (int, error) {
func (w Writer) Write(buf []byte) (int, error) {
if w.SugaredLogger == nil {
return len(buf), nil
return 0, fmt.Errorf("Writer's backing logger is nil")
}

msg := string(buf)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,12 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package event
package cordondrain

import (
"context"
)

type ParserFunc func(context.Context, string) Event

func (pf ParserFunc) Parse(ctx context.Context, str string) Event {
return pf(ctx, str)
type Config struct {
Force bool
GracePeriodSeconds int
IgnoreAllDaemonSets bool
DeleteEmptyDirData bool
TimeoutSeconds int
}
Loading