diff --git a/sdk/messaging/azeventhubs/consumer_client_test.go b/sdk/messaging/azeventhubs/consumer_client_test.go index 60646eab7304..5d70759c4b0a 100644 --- a/sdk/messaging/azeventhubs/consumer_client_test.go +++ b/sdk/messaging/azeventhubs/consumer_client_test.go @@ -5,6 +5,7 @@ package azeventhubs_test import ( "context" "fmt" + "net" "strings" "sync" "testing" @@ -17,8 +18,82 @@ import ( "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/test" "github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub" "github.com/stretchr/testify/require" + "nhooyr.io/websocket" ) +func TestConsumerClient_UsingWebSockets(t *testing.T) { + // NOTE: This error is coming from the `nhooyr.io/websocket` package. There's an + // open discussion here: + // https://github.com/nhooyr/websocket/discussions/380 + // + // The frame it's waiting for (at this point) is the other half of the websocket CLOSE handshake. + // I wireshark'd this and confirmed that the frame does arrive, it's just not read by the local + // package. In this context, since the connection has already shut down, this is harmless. + var expectedWSErr = "failed to close WebSocket: failed to read frame header: EOF" + + newWebSocketConnFn := func(ctx context.Context, args azeventhubs.WebSocketConnParams) (net.Conn, error) { + opts := &websocket.DialOptions{ + Subprotocols: []string{"amqp"}, + } + wssConn, _, err := websocket.Dial(ctx, args.Host, opts) + + if err != nil { + return nil, err + } + + return websocket.NetConn(ctx, wssConn, websocket.MessageBinary), nil + } + + testParams := test.GetConnectionParamsForTest(t) + + producerClient, err := azeventhubs.NewProducerClientFromConnectionString(testParams.ConnectionString, testParams.EventHubName, &azeventhubs.ProducerClientOptions{ + NewWebSocketConn: newWebSocketConnFn, + }) + require.NoError(t, err) + + defer func() { + err := producerClient.Close(context.Background()) + require.EqualError(t, err, expectedWSErr) + }() + + partProps, err := producerClient.GetPartitionProperties(context.Background(), "0", nil) + require.NoError(t, err) + + batch, err := producerClient.NewEventDataBatch(context.Background(), &azeventhubs.EventDataBatchOptions{ + PartitionID: to.Ptr("0"), + }) + require.NoError(t, err) + + err = batch.AddEventData(&azeventhubs.EventData{ + Body: []byte("hello world"), + }, nil) + require.NoError(t, err) + + err = producerClient.SendEventDataBatch(context.Background(), batch, nil) + require.NoError(t, err) + + consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString(testParams.ConnectionString, testParams.EventHubName, azeventhubs.DefaultConsumerGroup, &azeventhubs.ConsumerClientOptions{ + NewWebSocketConn: newWebSocketConnFn, + }) + require.NoError(t, err) + + defer func() { + err := consumerClient.Close(context.Background()) + require.EqualError(t, err, expectedWSErr) + }() + + partClient, err := consumerClient.NewPartitionClient("0", &azeventhubs.PartitionClientOptions{ + StartPosition: getStartPosition(partProps), + }) + require.NoError(t, err) + + defer test.RequireClose(t, partClient) + + events, err := partClient.ReceiveEvents(context.Background(), 1, nil) + require.NoError(t, err) + require.Equal(t, []string{"hello world"}, getSortedBodies(events)) +} + func TestConsumerClient_DefaultAzureCredential(t *testing.T) { testParams := test.GetConnectionParamsForTest(t) diff --git a/sdk/messaging/azeventhubs/example_websockets_and_proxies_test.go b/sdk/messaging/azeventhubs/example_websockets_and_proxies_test.go new file mode 100644 index 000000000000..06869312253b --- /dev/null +++ b/sdk/messaging/azeventhubs/example_websockets_and_proxies_test.go @@ -0,0 +1,49 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package azeventhubs_test + +import ( + "context" + "net" + + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs" + "nhooyr.io/websocket" +) + +func ExampleNewClient_usingWebsocketsAndProxies() { + // You can use an HTTP proxy, with websockets, by setting the appropriate HTTP(s)_PROXY + // variable in your environment, as described in the https://pkg.go.dev/net/http#ProxyFromEnvironment + // function. + // + // A proxy is NOT required to use websockets. + newWebSocketConnFn := func(ctx context.Context, args azeventhubs.WebSocketConnParams) (net.Conn, error) { + opts := &websocket.DialOptions{Subprotocols: []string{"amqp"}} + wssConn, _, err := websocket.Dial(ctx, args.Host, opts) + + if err != nil { + return nil, err + } + + return websocket.NetConn(ctx, wssConn, websocket.MessageBinary), nil + } + + // NOTE: If you'd like to authenticate via Azure Active Directory use the + // the `NewProducerClient` or `NewConsumerClient` functions. + consumerClient, err = azeventhubs.NewConsumerClientFromConnectionString("connection-string", "event-hub", azeventhubs.DefaultConsumerGroup, &azeventhubs.ConsumerClientOptions{ + NewWebSocketConn: newWebSocketConnFn, + }) + + if err != nil { + panic(err) + } + + // NOTE: For users of `nhooyr.io/websocket` there's an open discussion here: + // https://github.com/nhooyr/websocket/discussions/380 + // + // An error ("failed to read frame header: EOF") can be returned when the + // websocket connection is closed. This error will be returned from the + // `ConsumerClient.Close` or `ProducerClient.Close` functions and can be + // ignored, as the websocket "close handshake" has already completed. + defer consumerClient.Close(context.TODO()) +} diff --git a/sdk/messaging/azeventhubs/go.mod b/sdk/messaging/azeventhubs/go.mod index c620605aa66a..b83df482a6aa 100644 --- a/sdk/messaging/azeventhubs/go.mod +++ b/sdk/messaging/azeventhubs/go.mod @@ -11,6 +11,7 @@ require ( github.com/golang/mock v1.6.0 github.com/joho/godotenv v1.4.0 github.com/stretchr/testify v1.7.1 + nhooyr.io/websocket v1.8.7 ) require ( @@ -20,6 +21,7 @@ require ( github.com/gofrs/uuid v3.3.0+incompatible // indirect github.com/golang-jwt/jwt v3.2.1+incompatible // indirect github.com/google/uuid v1.1.1 // indirect + github.com/klauspost/compress v1.10.3 // indirect github.com/kylelemons/godebug v1.1.0 // indirect github.com/pkg/browser v0.0.0-20210115035449-ce105d075bb4 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect diff --git a/sdk/messaging/azeventhubs/go.sum b/sdk/messaging/azeventhubs/go.sum index aa41f406b1a4..1436a17a878a 100644 --- a/sdk/messaging/azeventhubs/go.sum +++ b/sdk/messaging/azeventhubs/go.sum @@ -17,6 +17,23 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dnaeon/go-vcr v1.1.0 h1:ReYa/UBrRyQdant9B4fNHGoCNKw6qh6P0fsdGmZpR7c= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= +github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE= +github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI= +github.com/gin-gonic/gin v1.6.3 h1:ahKqKTFpO5KTPHxWZjEdPScmYaGtLo8Y4DMHoEsnp14= +github.com/gin-gonic/gin v1.6.3/go.mod h1:75u5sXoLsGZoRN5Sgbi1eraJ4GU3++wFwWzhwvtwp4M= +github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4= +github.com/go-playground/locales v0.13.0 h1:HyWk6mgj5qFqCT5fjGBuRArbVDfE4hi8+e8ceBS/t7Q= +github.com/go-playground/locales v0.13.0/go.mod h1:taPMhCMXrRLJO55olJkUXHZBHCxTMfnGwq/HNwmWNS8= +github.com/go-playground/universal-translator v0.17.0 h1:icxd5fm+REJzpZx7ZfpaD876Lmtgy7VtROAbHHXk8no= +github.com/go-playground/universal-translator v0.17.0/go.mod h1:UkSxE5sNxxRwHyU+Scu5vgOQjsIJAF8j9muTVoKLVtA= +github.com/go-playground/validator/v10 v10.2.0 h1:KgJ0snyC2R9VXYN2rneOtQcw5aHQB1Vv0sFl1UcHBOY= +github.com/go-playground/validator/v10 v10.2.0/go.mod h1:uOYAAleCW8F/7oMFd6aG0GOhaH6EGOAJShg8Id5JGkI= +github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee h1:s+21KNqlpePfkah2I+gwHF8xmJWRjooY+5248k6m4A0= +github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee/go.mod h1:L0fX3K22YWvt/FAX9NnzrNzcI4wNYi9Yku4O0LKYflo= +github.com/gobwas/pool v0.2.0 h1:QEmUOlnSjWtnpRGHF3SauEiOsy82Cup83Vf2LcMlnc8= +github.com/gobwas/pool v0.2.0/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw= +github.com/gobwas/ws v1.0.2 h1:CoAavW/wd/kulfZmSIBt6p24n4j7tHgNVCjsfHVNUbo= +github.com/gobwas/ws v1.0.2/go.mod h1:szmBTxLgaFppYjEmNtny/v3w89xOydFnnZMcgRRu/EM= github.com/gofrs/uuid v3.3.0+incompatible h1:8K4tyRfvU1CYPgJsveYFQMhpFd/wXNM7iK6rR7UHz84= github.com/gofrs/uuid v3.3.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= github.com/golang-jwt/jwt v3.2.1+incompatible h1:73Z+4BJcrTC+KczS6WvTPvRGOp1WmfEP4Q1lOd9Z/+c= @@ -25,11 +42,23 @@ github.com/golang-jwt/jwt/v4 v4.2.0 h1:besgBTC8w8HjP6NzQdxwKH9Z5oQMZ24ThTrHp3cZ8 github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc= github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw= +github.com/golang/protobuf v1.3.5 h1:F768QJ1E9tib+q5Sc8MkdJi1RxLTbRcTf8LJV56aRls= +github.com/golang/protobuf v1.3.5/go.mod h1:6O5/vntMXwX2lRkT1hjjk0nAC1IDOTvTlVgjlRvqsdk= +github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4= +github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/websocket v1.4.1 h1:q7AeDBpnBk8AogcD4DSag/Ukw/KV+YhzLj2bP5HvKCM= +github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/joho/godotenv v1.4.0 h1:3l4+N6zfMWnkbPEXKng2o2/MR5mSwTrBih4ZEkkz1lg= github.com/joho/godotenv v1.4.0/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= +github.com/json-iterator/go v1.1.9 h1:9yzud/Ht36ygwatGx56VwCZtlI/2AD15T1X2sjSuGns= +github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= +github.com/klauspost/compress v1.10.3 h1:OP96hzwJVBIHYU52pVTI6CczrxPvrGfgqF9N5eTO0Q8= +github.com/klauspost/compress v1.10.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= @@ -37,8 +66,16 @@ github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= +github.com/leodido/go-urn v1.2.0 h1:hpXL4XnriNwQ/ABnpepYM/1vCLWNDfUNts8dX3xTG6Y= +github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII= +github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY= +github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= github.com/microsoft/ApplicationInsights-Go v0.4.4 h1:G4+H9WNs6ygSCe6sUyxRc2U81TI5Es90b2t/MwX5KqY= github.com/microsoft/ApplicationInsights-Go v0.4.4/go.mod h1:fKRUseBqkw6bDiXTs3ESTiU/4YTIHsQS4W3fP2ieF4U= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742 h1:Esafd1046DLDQ0W1YjYsBW+p8U2u7vzgW2SQVmlNazg= +github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/montanaflynn/stats v0.6.6/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt6R8Bnaayow= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.8.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= @@ -48,9 +85,15 @@ github.com/pkg/browser v0.0.0-20210115035449-ce105d075bb4/go.mod h1:N6UoU20jOqgg github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/tedsuo/ifrit v0.0.0-20180802180643-bea94bb476cc/go.mod h1:eyZnKCc955uh98WQvzOm0dgAeLnf2O0Rz0LPoC5ze+0= +github.com/ugorji/go v1.1.7 h1:/68gy2h+1mWMrwZFeD1kQialdSzAb432dtpeJ42ovdo= +github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw= +github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs= +github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= @@ -69,6 +112,7 @@ golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -77,14 +121,18 @@ golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e h1:fLOSk5Q00efkSvAm+4xcoXD+R golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= @@ -92,7 +140,11 @@ gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +nhooyr.io/websocket v1.8.7 h1:usjR2uOr/zjjkVMy0lW+PPohFok7PCow5sDjLgX4P4g= +nhooyr.io/websocket v1.8.7/go.mod h1:B70DZP8IakI65RVQ51MsWP/8jndNma26DVA/nFSCgW0= diff --git a/sdk/messaging/azeventhubs/internal/test/test_helpers.go b/sdk/messaging/azeventhubs/internal/test/test_helpers.go index b3061b0bd7ba..aebb49af26ba 100644 --- a/sdk/messaging/azeventhubs/internal/test/test_helpers.go +++ b/sdk/messaging/azeventhubs/internal/test/test_helpers.go @@ -151,10 +151,10 @@ func mustGetEnvironmentVars(t *testing.T, names []string) map[string]string { return envVars } -func RequireClose(t *testing.T, closeable interface { +func RequireClose[T interface { Close(ctx context.Context) error -}) { - require.NoError(t, closeable.Close(context.Background())) +}](t *testing.T, closeable T) { + require.NoErrorf(t, closeable.Close(context.Background()), "%T closes cleanly", closeable) } // RequireContextHasDefaultTimeout checks that the context has a deadline set, and that it's diff --git a/sdk/messaging/azservicebus/client_test.go b/sdk/messaging/azservicebus/client_test.go index fec66215ae7e..fddbf40a04b9 100644 --- a/sdk/messaging/azservicebus/client_test.go +++ b/sdk/messaging/azservicebus/client_test.go @@ -5,6 +5,7 @@ package azservicebus import ( "context" + "io/ioutil" "net" "net/http" "os" @@ -97,19 +98,23 @@ func TestNewClientWithWebsockets(t *testing.T) { queue, cleanup := createQueue(t, connectionString, nil) defer cleanup() - webSocketCreateCalled := false - client, err := NewClientFromConnectionString(connectionString, &ClientOptions{ NewWebSocketConn: func(ctx context.Context, args NewWebSocketConnArgs) (net.Conn, error) { - webSocketCreateCalled = true - opts := &websocket.DialOptions{Subprotocols: []string{"amqp"}} + transport := http.DefaultTransport.(*http.Transport).Clone() + transport.TLSClientConfig.KeyLogWriter = ioutil.Discard + + opts := &websocket.DialOptions{Subprotocols: []string{"amqp"}, HTTPClient: &http.Client{ + Transport: transport, + }} + wssConn, _, err := websocket.Dial(ctx, args.Host, opts) if err != nil { return nil, err } - return websocket.NetConn(context.Background(), wssConn, websocket.MessageBinary), nil + conn := websocket.NetConn(context.Background(), wssConn, websocket.MessageBinary) + return conn, nil }, }) require.NoError(t, err) @@ -122,18 +127,15 @@ func TestNewClientWithWebsockets(t *testing.T) { }, nil) require.NoError(t, err) - // we have to test this down here since the connection is lazy initialized. - require.True(t, webSocketCreateCalled) - - receiver, err := client.NewReceiverForQueue(queue, &ReceiverOptions{ - ReceiveMode: ReceiveModeReceiveAndDelete, - }) - require.NoError(t, err) - - messages, err := receiver.ReceiveMessages(context.Background(), 1, nil) - require.NoError(t, err) - - require.EqualValues(t, "hello world", string(messages[0].Body)) + // NOTE: This error is coming from the `nhooyr.io/websocket` package. There's an + // open discussion here: + // https://github.com/nhooyr/websocket/discussions/380 + // + // The frame it's waiting for (at this point) is the other half of the websocket CLOSE handshake. + // I wireshark'd this and confirmed that the frame does arrive, it's just not read by the local + // package. In this context, since the connection has already shut down, this is harmless. + var expectedErr = "failed to close WebSocket: failed to read frame header: EOF" + require.EqualError(t, client.Close(context.Background()), expectedErr) } func TestNewClientUsingSharedAccessSignature(t *testing.T) { diff --git a/sdk/messaging/azservicebus/example_client_test.go b/sdk/messaging/azservicebus/example_client_test.go index 4f0b646ef819..f9cdf35c1712 100644 --- a/sdk/messaging/azservicebus/example_client_test.go +++ b/sdk/messaging/azservicebus/example_client_test.go @@ -4,15 +4,12 @@ package azservicebus_test import ( - "context" "fmt" - "net" "time" azlog "github.com/Azure/azure-sdk-for-go/sdk/azcore/log" "github.com/Azure/azure-sdk-for-go/sdk/azidentity" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" - "nhooyr.io/websocket" ) func ExampleNewClient() { @@ -45,27 +42,6 @@ func ExampleNewClientFromConnectionString() { } } -func ExampleNewClient_usingWebsockets() { - // NOTE: If you'd like to authenticate via Azure Active Directory look at - // the `NewClient` function instead. - client, err = azservicebus.NewClientFromConnectionString(connectionString, &azservicebus.ClientOptions{ - NewWebSocketConn: func(ctx context.Context, args azservicebus.NewWebSocketConnArgs) (net.Conn, error) { - opts := &websocket.DialOptions{Subprotocols: []string{"amqp"}} - wssConn, _, err := websocket.Dial(ctx, args.Host, opts) - - if err != nil { - return nil, err - } - - return websocket.NetConn(context.Background(), wssConn, websocket.MessageBinary), nil - }, - }) - - if err != nil { - panic(err) - } -} - func ExampleNewClient_configuringRetries() { // NOTE: If you'd like to authenticate via Azure Active Directory look at // the `NewClient` function instead. diff --git a/sdk/messaging/azservicebus/example_websockets_and_proxies_test.go b/sdk/messaging/azservicebus/example_websockets_and_proxies_test.go new file mode 100644 index 000000000000..124b876c8e8a --- /dev/null +++ b/sdk/messaging/azservicebus/example_websockets_and_proxies_test.go @@ -0,0 +1,49 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package azservicebus_test + +import ( + "context" + "net" + + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" + "nhooyr.io/websocket" +) + +func ExampleNewClient_usingWebsocketsAndProxies() { + // You can use an HTTP proxy, with websockets, by setting the appropriate HTTP(s)_PROXY + // variable in your environment, as described in the https://pkg.go.dev/net/http#ProxyFromEnvironment + // function. + // + // A proxy is NOT required to use websockets. + newWebSocketConnFn := func(ctx context.Context, args azservicebus.NewWebSocketConnArgs) (net.Conn, error) { + opts := &websocket.DialOptions{Subprotocols: []string{"amqp"}} + wssConn, _, err := websocket.Dial(ctx, args.Host, opts) + + if err != nil { + return nil, err + } + + return websocket.NetConn(ctx, wssConn, websocket.MessageBinary), nil + } + + // NOTE: If you'd like to authenticate via Azure Active Directory look at + // the `NewClient` function instead. + client, err = azservicebus.NewClientFromConnectionString(connectionString, &azservicebus.ClientOptions{ + NewWebSocketConn: newWebSocketConnFn, + }) + + if err != nil { + panic(err) + } + + // NOTE: For users of `nhooyr.io/websocket` there's an open discussion here: + // https://github.com/nhooyr/websocket/discussions/380 + // + // An error ("failed to read frame header: EOF") can be returned when the + // websocket connection is closed. This error will be returned from the + // `Client.Close` function and can be ignored, as the websocket "close handshake" + // has already completed. + defer client.Close(context.TODO()) +} diff --git a/sdk/messaging/azservicebus/go.mod b/sdk/messaging/azservicebus/go.mod index 0178ed5c81f2..383c288fbbf7 100644 --- a/sdk/messaging/azservicebus/go.mod +++ b/sdk/messaging/azservicebus/go.mod @@ -11,8 +11,6 @@ require ( ) require ( - // temporary until https://github.com/nhooyr/websocket/pull/310 is merged and released. - github.com/gin-gonic/gin v1.7.7 // indirect // used in tests only github.com/joho/godotenv v1.3.0 @@ -21,7 +19,7 @@ require ( github.com/stretchr/testify v1.7.0 // used in examples only - nhooyr.io/websocket v1.8.6 + nhooyr.io/websocket v1.8.7 ) require github.com/golang/mock v1.6.0 diff --git a/sdk/messaging/azservicebus/go.sum b/sdk/messaging/azservicebus/go.sum index 26aec1fb110a..5b65d5ef3c5f 100644 --- a/sdk/messaging/azservicebus/go.sum +++ b/sdk/messaging/azservicebus/go.sum @@ -15,17 +15,15 @@ github.com/dnaeon/go-vcr v1.1.0 h1:ReYa/UBrRyQdant9B4fNHGoCNKw6qh6P0fsdGmZpR7c= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE= github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI= +github.com/gin-gonic/gin v1.6.3 h1:ahKqKTFpO5KTPHxWZjEdPScmYaGtLo8Y4DMHoEsnp14= github.com/gin-gonic/gin v1.6.3/go.mod h1:75u5sXoLsGZoRN5Sgbi1eraJ4GU3++wFwWzhwvtwp4M= -github.com/gin-gonic/gin v1.7.7 h1:3DoBmSbJbZAWqXJC3SLjAPfutPJJRN1U5pALB7EeTTs= -github.com/gin-gonic/gin v1.7.7/go.mod h1:axIBovoeJpVj8S3BwE0uPMTeReE4+AfFtqpqaZ1qq1U= github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4= github.com/go-playground/locales v0.13.0 h1:HyWk6mgj5qFqCT5fjGBuRArbVDfE4hi8+e8ceBS/t7Q= github.com/go-playground/locales v0.13.0/go.mod h1:taPMhCMXrRLJO55olJkUXHZBHCxTMfnGwq/HNwmWNS8= github.com/go-playground/universal-translator v0.17.0 h1:icxd5fm+REJzpZx7ZfpaD876Lmtgy7VtROAbHHXk8no= github.com/go-playground/universal-translator v0.17.0/go.mod h1:UkSxE5sNxxRwHyU+Scu5vgOQjsIJAF8j9muTVoKLVtA= +github.com/go-playground/validator/v10 v10.2.0 h1:KgJ0snyC2R9VXYN2rneOtQcw5aHQB1Vv0sFl1UcHBOY= github.com/go-playground/validator/v10 v10.2.0/go.mod h1:uOYAAleCW8F/7oMFd6aG0GOhaH6EGOAJShg8Id5JGkI= -github.com/go-playground/validator/v10 v10.4.1 h1:pH2c5ADXtd66mxoE0Zm9SUhxE20r7aM3F26W0hOn+GE= -github.com/go-playground/validator/v10 v10.4.1/go.mod h1:nlOn6nFhuKACm19sB/8EGNn9GlaMV7XkbRSipzJ0Ii4= github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee h1:s+21KNqlpePfkah2I+gwHF8xmJWRjooY+5248k6m4A0= github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee/go.mod h1:L0fX3K22YWvt/FAX9NnzrNzcI4wNYi9Yku4O0LKYflo= github.com/gobwas/pool v0.2.0 h1:QEmUOlnSjWtnpRGHF3SauEiOsy82Cup83Vf2LcMlnc8= @@ -96,7 +94,6 @@ github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLY github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= -golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20220511200225-c6db032c6c88 h1:Tgea0cVUD0ivh5ADBX4WwuI12DUd2to3nCYe2eayMIw= golang.org/x/crypto v0.0.0-20220511200225-c6db032c6c88/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= @@ -146,5 +143,5 @@ gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -nhooyr.io/websocket v1.8.6 h1:s+C3xAMLwGmlI31Nyn/eAehUlZPwfYZu2JXM621Q5/k= -nhooyr.io/websocket v1.8.6/go.mod h1:B70DZP8IakI65RVQ51MsWP/8jndNma26DVA/nFSCgW0= +nhooyr.io/websocket v1.8.7 h1:usjR2uOr/zjjkVMy0lW+PPohFok7PCow5sDjLgX4P4g= +nhooyr.io/websocket v1.8.7/go.mod h1:B70DZP8IakI65RVQ51MsWP/8jndNma26DVA/nFSCgW0=