diff --git a/bindings/smtp/smtp.go b/bindings/smtp/smtp.go index 28385a5f14..d71aefb69c 100644 --- a/bindings/smtp/smtp.go +++ b/bindings/smtp/smtp.go @@ -63,7 +63,7 @@ func (s *Mailer) Invoke(req *bindings.InvokeRequest) (*bindings.InvokeResponse, // Merge config metadata with request metadata metadata := s.metadata.mergeWithRequestMetadata(req) if metadata.EmailFrom == "" { - return nil, fmt.Errorf("smtp binding error: fromEmail property not supplied in configuration- or request-metadata") + return nil, fmt.Errorf("smtp binding error: emailFrom property not supplied in configuration- or request-metadata") } if metadata.EmailTo == "" { return nil, fmt.Errorf("smtp binding error: emailTo property not supplied in configuration- or request-metadata") diff --git a/pubsub/azure/servicebus/servicebus.go b/pubsub/azure/servicebus/servicebus.go index 0e2448d5d8..e4cb6e8298 100644 --- a/pubsub/azure/servicebus/servicebus.go +++ b/pubsub/azure/servicebus/servicebus.go @@ -413,7 +413,14 @@ func (a *azureServiceBus) Subscribe(req pubsub.SubscribeRequest, handler pubsub. a.metadata.MaxActiveMessages, a.metadata.MaxActiveMessagesRecoveryInSec) if innerErr != nil { - a.logger.Error(innerErr) + var detachError *amqp.DetachError + var ampqError *amqp.Error + if errors.Is(innerErr, detachError) || + (errors.As(innerErr, &qError) && ampqError.Condition == amqp.ErrorDetachForced) { + a.logger.Debug(innerErr) + } else { + a.logger.Error(innerErr) + } } cancel() // Cancel receive context diff --git a/pubsub/azure/servicebus/subscription.go b/pubsub/azure/servicebus/subscription.go index 1d60e811eb..ae7f6bf837 100644 --- a/pubsub/azure/servicebus/subscription.go +++ b/pubsub/azure/servicebus/subscription.go @@ -3,7 +3,6 @@ package servicebus import ( "context" "fmt" - "strings" "sync" "time" @@ -203,11 +202,7 @@ func (s *subscription) tryRenewLocks() { func (s *subscription) receiveMessage(ctx context.Context, handler azservicebus.HandlerFunc) error { s.logger.Debugf("Waiting to receive message on topic %s", s.topic) if err := s.entity.ReceiveOne(ctx, handler); err != nil { - if strings.Contains(err.Error(), "force detached") { - return nil - } - - return fmt.Errorf("%s error receiving message on topic %s, %s", errorMessagePrefix, s.topic, err) + return fmt.Errorf("%s error receiving message on topic %s, %w", errorMessagePrefix, s.topic, err) } return nil diff --git a/pubsub/nats/metadata.go b/pubsub/nats/metadata.go deleted file mode 100644 index e48fa1bb9a..0000000000 --- a/pubsub/nats/metadata.go +++ /dev/null @@ -1,11 +0,0 @@ -// ------------------------------------------------------------ -// Copyright (c) Microsoft Corporation and Dapr Contributors. -// Licensed under the MIT License. -// ------------------------------------------------------------ - -package nats - -type metadata struct { - natsURL string - natsQueueGroupName string -} diff --git a/pubsub/nats/nats.go b/pubsub/nats/nats.go deleted file mode 100644 index 84693debc3..0000000000 --- a/pubsub/nats/nats.go +++ /dev/null @@ -1,99 +0,0 @@ -// ------------------------------------------------------------ -// Copyright (c) Microsoft Corporation and Dapr Contributors. -// Licensed under the MIT License. -// ------------------------------------------------------------ - -package nats - -import ( - "context" - "errors" - "fmt" - - "github.com/dapr/components-contrib/pubsub" - "github.com/dapr/kit/logger" - nats "github.com/nats-io/nats.go" -) - -const ( - natsURL = "natsURL" - consumerID = "consumerID" -) - -type natsPubSub struct { - metadata metadata - natsConn *nats.Conn - - logger logger.Logger -} - -// NewNATSPubSub returns a new NATS pub-sub implementation -func NewNATSPubSub(logger logger.Logger) pubsub.PubSub { - return &natsPubSub{logger: logger} -} - -func parseNATSMetadata(meta pubsub.Metadata) (metadata, error) { - m := metadata{} - if val, ok := meta.Properties[natsURL]; ok && val != "" { - m.natsURL = val - } else { - return m, errors.New("nats error: missing nats URL") - } - - if val, ok := meta.Properties[consumerID]; ok && val != "" { - m.natsQueueGroupName = val - } else { - return m, errors.New("nats error: missing queue name") - } - - return m, nil -} - -func (n *natsPubSub) Init(metadata pubsub.Metadata) error { - m, err := parseNATSMetadata(metadata) - if err != nil { - return err - } - - n.metadata = m - natsConn, err := nats.Connect(m.natsURL) - if err != nil { - return fmt.Errorf("nats: error connecting to nats at %s: %s", m.natsURL, err) - } - n.logger.Debugf("connected to nats at %s", m.natsURL) - - n.natsConn = natsConn - - return nil -} - -func (n *natsPubSub) Publish(req *pubsub.PublishRequest) error { - err := n.natsConn.Publish(req.Topic, req.Data) - if err != nil { - return fmt.Errorf("nats: error from publish: %s", err) - } - - return nil -} - -func (n *natsPubSub) Subscribe(req pubsub.SubscribeRequest, handler pubsub.Handler) error { - sub, err := n.natsConn.QueueSubscribe(req.Topic, n.metadata.natsQueueGroupName, func(natsMsg *nats.Msg) { - handler(context.Background(), &pubsub.NewMessage{Topic: req.Topic, Data: natsMsg.Data}) - }) - if err != nil { - n.logger.Warnf("nats: error subscribe: %s", err) - } - n.logger.Debugf("nats: subscribed to subject %s with queue group %s", sub.Subject, sub.Queue) - - return nil -} - -func (n *natsPubSub) Close() error { - n.natsConn.Close() - - return nil -} - -func (n *natsPubSub) Features() []pubsub.Feature { - return nil -} diff --git a/pubsub/nats/nats_test.go b/pubsub/nats/nats_test.go deleted file mode 100644 index dfe805ef5f..0000000000 --- a/pubsub/nats/nats_test.go +++ /dev/null @@ -1,69 +0,0 @@ -// ------------------------------------------------------------ -// Copyright (c) Microsoft Corporation and Dapr Contributors. -// Licensed under the MIT License. -// ------------------------------------------------------------ - -package nats - -import ( - "errors" - "testing" - - "github.com/dapr/components-contrib/pubsub" - "github.com/stretchr/testify/assert" -) - -func TestParseNATSMetadata(t *testing.T) { - t.Run("metadata is correct", func(t *testing.T) { - fakeProperties := map[string]string{ - natsURL: "foonats1", - consumerID: "fooq1", - } - fakeMetaData := pubsub.Metadata{ - Properties: fakeProperties, - } - - // act - m, err := parseNATSMetadata(fakeMetaData) - - // assert - assert.NoError(t, err) - assert.NotEmpty(t, m.natsURL) - assert.NotEmpty(t, m.natsQueueGroupName) - assert.Equal(t, fakeProperties[natsURL], m.natsURL) - assert.Equal(t, fakeProperties[consumerID], m.natsQueueGroupName) - }) - - t.Run("queue is not given", func(t *testing.T) { - fakeProperties := map[string]string{ - natsURL: "foonats2", - consumerID: "", - } - - fakeMetaData := pubsub.Metadata{ - Properties: fakeProperties, - } - - // act - m, err := parseNATSMetadata(fakeMetaData) - // assert - assert.Error(t, errors.New("nats error: missing queue name"), err) - assert.Equal(t, fakeProperties[natsURL], m.natsURL) - assert.Empty(t, m.natsQueueGroupName) - }) - - t.Run("nats url is not given", func(t *testing.T) { - fakeProperties := map[string]string{ - natsURL: "", - consumerID: "fooq2", - } - fakeMetaData := pubsub.Metadata{ - Properties: fakeProperties, - } - // act - m, err := parseNATSMetadata(fakeMetaData) - // assert - assert.Error(t, errors.New("nats error: missing nats URL"), err) - assert.Empty(t, m.natsURL) - }) -}