Skip to content
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
First review
  • Loading branch information
Olshansk committed Sep 4, 2025
commit c422add9ad342f79092a4ca0cdfc7d8a40e2ed2b
15 changes: 14 additions & 1 deletion network/http/errors.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,19 @@
package http

import "errors"
import (
"errors"
"fmt"
"net/http"
)

// Endpoint's backend service returned a non 2xx HTTP status code.
var ErrRelayEndpointHTTPError = errors.New("endpoint returned non 2xx HTTP status code")

// EnsureHTTPSuccess returns an error if the status code is not a 2xx successful status code.
// Otherwise returns nil.
func EnsureHTTPSuccess(statusCode int) error {
if statusCode < http.StatusOK || statusCode >= http.StatusMultipleChoices {
return fmt.Errorf("%w: %d", ErrRelayEndpointHTTPError, statusCode)
}
return nil
}
10 changes: 1 addition & 9 deletions network/http/http_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,21 +262,13 @@ func (h *HTTPClientWithDebugMetrics) readAndValidateResponse(resp *http.Response
}

// Validate HTTP status code
if err := CheckHTTPStatusCode(resp.StatusCode); err != nil {
if err := EnsureHTTPSuccess(resp.StatusCode); err != nil {
return nil, err
}

return responseBody, nil
}

// CheckHTTPStatusCode validates that the HTTP status code is in the range of 2xx.
func CheckHTTPStatusCode(statusCode int) error {
if statusCode < http.StatusOK || statusCode >= http.StatusMultipleChoices {
return fmt.Errorf("%w: %d", ErrRelayEndpointHTTPError, statusCode)
}
return nil
}

// createDetailedHTTPTrace creates comprehensive HTTP tracing using the httptrace library:
// https://pkg.go.dev/net/http/httptrace
// Captures granular timing for every phase of the HTTP request lifecycle to identify bottlenecks.
Expand Down
116 changes: 116 additions & 0 deletions network/http/http_client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package http

import (
"net/http"
"testing"

"github.com/stretchr/testify/require"
)

func TestEnsureHTTPSuccess(t *testing.T) {
tests := []struct {
name string
statusCode int
expectError bool
}{
// 2xx Success codes - should pass
{
name: "200 OK",
statusCode: http.StatusOK,
expectError: false,
},
{
name: "201 Created",
statusCode: http.StatusCreated,
expectError: false,
},
{
name: "202 Accepted",
statusCode: http.StatusAccepted,
expectError: false,
},
{
name: "204 No Content",
statusCode: http.StatusNoContent,
expectError: false,
},
{
name: "299 Last 2xx",
statusCode: 299,
expectError: false,
},
// Non-2xx codes - should fail
{
name: "100 Continue",
statusCode: http.StatusContinue,
expectError: true,
},
{
name: "300 Multiple Choices",
statusCode: http.StatusMultipleChoices,
expectError: true,
},
{
name: "301 Moved Permanently",
statusCode: http.StatusMovedPermanently,
expectError: true,
},
{
name: "400 Bad Request",
statusCode: http.StatusBadRequest,
expectError: true,
},
{
name: "401 Unauthorized",
statusCode: http.StatusUnauthorized,
expectError: true,
},
{
name: "403 Forbidden",
statusCode: http.StatusForbidden,
expectError: true,
},
{
name: "404 Not Found",
statusCode: http.StatusNotFound,
expectError: true,
},
{
name: "429 Too Many Requests",
statusCode: http.StatusTooManyRequests,
expectError: true,
},
{
name: "500 Internal Server Error",
statusCode: http.StatusInternalServerError,
expectError: true,
},
{
name: "502 Bad Gateway",
statusCode: http.StatusBadGateway,
expectError: true,
},
{
name: "503 Service Unavailable",
statusCode: http.StatusServiceUnavailable,
expectError: true,
},
{
name: "504 Gateway Timeout",
statusCode: http.StatusGatewayTimeout,
expectError: true,
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
err := EnsureHTTPSuccess(tc.statusCode)
if tc.expectError {
require.Error(t, err)
require.ErrorIs(t, err, ErrRelayEndpointHTTPError)
} else {
require.NoError(t, err)
}
})
}
}
46 changes: 20 additions & 26 deletions protocol/shannon/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ func (rc *requestContext) setSelectedEndpoint(endpoint endpoint) {
// 2. Network conditions (session rollover periods)
func (rc *requestContext) executeRelayRequestStrategy(payload protocol.Payload) (protocol.Response, error) {
selectedEndpoint := rc.getSelectedEndpoint()
rc.hydratedLogger("executeRelayRequestStrategy")
rc.hydrateLogger("executeRelayRequestStrategy")

switch {
// ** Priority 1: Check Endpoint type **
Expand Down Expand Up @@ -375,7 +375,7 @@ func buildHeaders(payload protocol.Payload) map[string]string {
// - Updates the request context's selectedEndpoint for use by logging, metrics, and data logic.
// TODO_TECHDEBT(@adshmh): This is an interim solution to be replaced with intelligent fallback.
func (rc *requestContext) sendRelayWithFallback(payload protocol.Payload) (protocol.Response, error) {
rc.hydratedLogger("sendRelayWithFallback")
rc.hydrateLogger("sendRelayWithFallback")

// Convert timeout to time.Duration
relayTimeout := time.Duration(maxWaitBeforeFallbackMillisecond) * time.Millisecond
Expand Down Expand Up @@ -436,7 +436,7 @@ func (rc *requestContext) sendRelayToARandomFallbackEndpoint(payload protocol.Pa
return protocol.Response{}, fmt.Errorf("no fallback endpoints available")
}

rc.hydratedLogger("sendRelayToARandomFallbackEndpoint")
rc.hydrateLogger("sendRelayToARandomFallbackEndpoint")

// Select random fallback endpoint:
// - Convert map to slice for random selection
Expand Down Expand Up @@ -475,7 +475,7 @@ func (rc *requestContext) sendRelayToARandomFallbackEndpoint(payload protocol.Pa
// - Captures RelayMinerError data for reporting (but doesn't use it for classification).
// - Required to fulfill the FullNode interface.
func (rc *requestContext) sendProtocolRelay(payload protocol.Payload) (protocol.Response, error) {
rc.hydratedLogger("sendProtocolRelay")
rc.hydrateLogger("sendProtocolRelay")
rc.logger = hydrateLoggerWithPayload(rc.logger, &payload)

selectedEndpoint := rc.getSelectedEndpoint()
Expand All @@ -500,7 +500,7 @@ func (rc *requestContext) sendProtocolRelay(payload protocol.Payload) (protocol.
// Build and sign the relay request
signedRelayReq, err := rc.buildAndSignRelayRequest(payload, app)
if err != nil {
return defaultResponse, fmt.Errorf("SHOULD NEVER HAPPEN: failed to build and sign relay request: %w", err)
return defaultResponse, err
}

// Marshal relay request to bytes
Expand All @@ -526,13 +526,18 @@ func (rc *requestContext) sendProtocolRelay(payload protocol.Payload) (protocol.
if err != nil {
return defaultResponse, err
}
// Hydrate the response with the endpoint address
deserializedResponse.EndpointAddr = selectedEndpoint.Addr()

// Validate the backend service HTTP status code
if err := rc.validateBackendServiceResponse(deserializedResponse); err != nil {
return defaultResponse, err
// Ensure that serialized response contains a valid HTTP status code.
// Do not return non 2xx responses from the endpoint to the client.
responseHTTPStatusCode := deserializedResponse.HTTPStatusCode
if err := pathhttp.EnsureHTTPSuccess(responseHTTPStatusCode); err != nil {
errMsg := fmt.Sprintf("Backend service returned status non-2xx: %d", responseHTTPStatusCode)
rc.logger.Error().Err(err).Msg(errMsg)
return defaultResponse, fmt.Errorf("%w: %s", err, errMsg)
}

deserializedResponse.EndpointAddr = selectedEndpoint.Addr()
return deserializedResponse, nil
}

Expand Down Expand Up @@ -633,17 +638,6 @@ func (rc *requestContext) deserializeRelayResponse(response *servicetypes.RelayR
return deserializedResponse, nil
}

// validateBackendServiceResponse validates that the backend service returned a successful HTTP status code.
// Even though the relay miner responded correctly with a valid Shannon protocol response,
// we should sanction endpoints whose backend services are returning non-2xx status codes.
func (rc *requestContext) validateBackendServiceResponse(response protocol.Response) error {
if err := pathhttp.CheckHTTPStatusCode(response.HTTPStatusCode); err != nil {
return fmt.Errorf("%w: backend service returned status %d",
err, response.HTTPStatusCode)
}
return nil
}

func (rc *requestContext) signRelayRequest(unsignedRelayReq *servicetypes.RelayRequest, app apptypes.Application) (*servicetypes.RelayRequest, error) {
// Verify the relay request's metadata, specifically the session header.
// Note: cannot use the RelayRequest's ValidateBasic() method here, as it looks for a signature in the struct, which has not been added yet at this point.
Expand Down Expand Up @@ -742,7 +736,7 @@ func (rc *requestContext) trackRelayMinerError(relayResponse *servicetypes.Relay
}

relayMinerErr := relayResponse.RelayMinerError
rc.hydratedLogger("trackRelayMinerError")
rc.hydrateLogger("trackRelayMinerError")

// Log RelayMinerError details for visibility
rc.logger.With(
Expand All @@ -765,7 +759,7 @@ func (rc *requestContext) trackRelayMinerError(relayResponse *servicetypes.Relay
// - Records internal error on request for observations.
// - Logs error entry.
func (rc *requestContext) handleInternalError(internalErr error) (protocol.Response, error) {
rc.hydratedLogger("handleInternalError")
rc.hydrateLogger("handleInternalError")

// Log the internal error.
rc.logger.Error().Err(internalErr).Msg("Internal error occurred. This should be investigated as a bug.")
Expand All @@ -787,7 +781,7 @@ func (rc *requestContext) handleEndpointError(
endpointQueryTime time.Time,
endpointErr error,
) (protocol.Response, error) {
rc.hydratedLogger("handleEndpointError")
rc.hydrateLogger("handleEndpointError")
selectedEndpoint := rc.getSelectedEndpoint()
selectedEndpointAddr := selectedEndpoint.Addr()

Expand Down Expand Up @@ -834,7 +828,7 @@ func (rc *requestContext) handleEndpointSuccess(
endpointQueryTime time.Time,
endpointResponse *protocol.Response,
) error {
rc.hydratedLogger("handleEndpointSuccess")
rc.hydrateLogger("handleEndpointSuccess")
rc.logger = rc.logger.With("endpoint_response_payload_len", len(endpointResponse.Bytes))
rc.logger.Debug().Msg("Successfully deserialized the response received from the selected endpoint.")

Expand Down Expand Up @@ -905,14 +899,14 @@ func prepareURLFromPayload(endpointURL string, payload protocol.Payload) string
return url
}

// hydratedLogger:
// hydrateLogger:
// - Enhances the base logger with information from the request context.
// - Includes:
// - Method name
// - Service ID
// - Selected endpoint supplier
// - Selected endpoint URL
func (rc *requestContext) hydratedLogger(methodName string) {
func (rc *requestContext) hydrateLogger(methodName string) {
logger := rc.logger.With(
"method", methodName,
"service_id", rc.serviceID,
Expand Down
11 changes: 5 additions & 6 deletions protocol/shannon/fullnode_lazy.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,15 +291,14 @@ func embedHttpRequest(reqToEmbed *http.Request) (*servicetypes.RelayRequest, err
// - Deserialization is handled here (see sdktypes.DeserializeHTTPResponse below).
//
// Links:
// - Relay miner serializing the service response:
// https://github.com/pokt-network/poktroll/blob/e5024ea5d28cc94d09e531f84701a85cefb9d56f/pkg/relayer/proxy/synchronous.go#L361-L363
// - Relay response validation (potential package for serialization/deserialization):
// https://github.com/pokt-network/poktroll/blob/e5024ea5d28cc94d09e531f84701a85cefb9d56f/x/service/types/relay.go#L68
//
// - Relay miner serializing the service response:
// https://github.com/pokt-network/poktroll/blob/e5024ea5d28cc94d09e531f84701a85cefb9d56f/pkg/relayer/proxy/synchronous.go#L361-L363
// - Relay response validation (potential package for serialization/deserialization):
// https://github.com/pokt-network/poktroll/blob/e5024ea5d28cc94d09e531f84701a85cefb9d56f/x/service/types/relay.go#L68

// deserializeRelayResponse:
// - Uses the Shannon sdk to deserialize the relay response payload received from an endpoint into a protocol.Response.
// - Required because the relay miner (the endpoint serving the relay) returns the HTTP response in serialized format in its payload.

func deserializeRelayResponse(bz []byte) (protocol.Response, error) {
poktHttpResponse, err := sdktypes.DeserializeHTTPResponse(bz)
if err != nil {
Expand Down
16 changes: 9 additions & 7 deletions protocol/shannon/sanctions.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@ func classifyRelayError(logger polylog.Logger, err error) (protocolobservations.
protocolobservations.ShannonSanctionType_SHANNON_SANCTION_UNSPECIFIED
}

// Classify shannon-sdk errors
// Classify the errors and handle them appropriately.
// Errors make come from the SDK, HTTP, internal, etc....
switch {

// HTTP relay errors - check first to handle HTTP-specific classifications
case errors.Is(err, errSendHTTPRelay):
return classifyHttpError(logger, err)
Expand Down Expand Up @@ -110,10 +112,10 @@ func classifyRelayError(logger polylog.Logger, err error) (protocolobservations.
return protocolobservations.ShannonEndpointErrorType_SHANNON_ENDPOINT_ERROR_TIMEOUT,
protocolobservations.ShannonSanctionType_SHANNON_SANCTION_SESSION

// Endpoint's backend service returned a non 2xx HTTP status code.
// Backend service returned non-2xx HTTP status (4xx, 5xx errors)
// Session-level sanction allows recovery when service stabilizes
case pathhttp.ErrRelayEndpointHTTPError:
// TODO_IMPROVE: Make this a sanction that just lasts a few blocks
// Apply session sanction for backend service HTTP errors (502, 503, etc.)
// TODO_IMPROVE(#381): Make this a sanction that just lasts a few blocks
return protocolobservations.ShannonEndpointErrorType_SHANNON_ENDPOINT_ERROR_HTTP_BAD_RESPONSE,
protocolobservations.ShannonSanctionType_SHANNON_SANCTION_SESSION

Expand All @@ -139,10 +141,10 @@ func classifyRelayError(logger polylog.Logger, err error) (protocolobservations.
func classifyHttpError(logger polylog.Logger, err error) (protocolobservations.ShannonEndpointErrorType, protocolobservations.ShannonSanctionType) {
logger = logger.With("error_message", err.Error())

// Endpoint returned non 2xx HTTP Status code
// Backend service returned non-2xx HTTP status code
// Session-level sanction allows temporary failures to recover
if errors.Is(err, pathhttp.ErrRelayEndpointHTTPError) {
// TODO_IMPROVE: Make this a sanction that just lasts a few blocks
// Apply session sanction for backend service HTTP errors (502, 503, etc.)
// TODO_IMPROVE(#381): Make this a sanction that just lasts a few blocks
return protocolobservations.ShannonEndpointErrorType_SHANNON_ENDPOINT_ERROR_HTTP_NON_2XX_STATUS,
protocolobservations.ShannonSanctionType_SHANNON_SANCTION_SESSION
}
Expand Down
Loading